// turul_a2a_aws_lambda/lib.rs
1//! AWS Lambda adapter for turul-a2a.
2//!
3//! # Choosing a runner
4//!
5//! A Lambda binary receives raw event JSON; the adapter owns
6//! classifying that JSON and dispatching to the right A2A handler
7//! method. Adopters pick the runner whose name matches the Lambda's
8//! AWS trigger topology and end `main.rs` with one `.await?` call —
9//! `lambda_runtime` / `lambda_http` / event parsing never appear in
10//! adopter code.
11//!
12//! | Lambda triggers                   | Runner                                         |
13//! |-----------------------------------|------------------------------------------------|
14//! | HTTP only (Function URL / APIGW)  | [`LambdaA2aHandler::run_http_only`]            |
15//! | SQS only (event source mapping)   | [`LambdaA2aHandler::run_sqs_only`] *(`sqs`)*   |
16//! | HTTP **and** SQS on one function  | [`LambdaA2aHandler::run_http_and_sqs`] *(`sqs`)* |
17//! | not sure / just run it            | [`LambdaA2aHandler::run`]                      |
18//!
19//! [`LambdaA2aHandler::run`] is the default "it just works" entry
20//! point: without the `sqs` feature it aliases `run_http_only`; with
21//! `sqs` it aliases `run_http_and_sqs` (permissive, handles either
22//! shape). The explicit runners are **strict** — a non-matching
23//! event shape fails loudly, which is what hardened deployments
24//! and tests want.
25//!
26//! The one-call shortcut is [`LambdaA2aServerBuilder::run`] — it
27//! build-then-runs in a single fluent call. Use `.build()` + a
28//! handler method when the handler is needed for unit tests or
29//! custom middleware.
30//!
31//! ## Composing with non-framework triggers
32//!
33//! Lambdas wired to HTTP **plus** a third trigger the framework does
34//! not know about (EventBridge scheduler, DynamoDB stream, custom
35//! invoke) can't use a named runner directly — the Unknown branch
36//! is adopter-owned. Reach for the public classifier + the HTTP
37//! envelope primitive instead:
38//!
39//! ```ignore
40//! lambda_runtime::run(lambda_runtime::service_fn(
41//!     move |event: lambda_runtime::LambdaEvent<serde_json::Value>| {
42//!         let handler = handler.clone();
43//!         async move {
44//!             let (value, _ctx) = event.into_parts();
45//!             match turul_a2a_aws_lambda::classify_event(&value) {
46//!                 turul_a2a_aws_lambda::LambdaEvent::Http => {
47//!                     handler.handle_http_event_value(value).await
48//!                 }
49//!                 turul_a2a_aws_lambda::LambdaEvent::Sqs => {
50//!                     let sqs = serde_json::from_value(value)?;
51//!                     Ok(serde_json::to_value(handler.handle_sqs(sqs).await)?)
52//!                 }
53//!                 turul_a2a_aws_lambda::LambdaEvent::Unknown => {
54//!                     // Adopter-owned: run your scheduled sweep,
55//!                     // DynamoDB stream consumer, etc.
56//!                     run_adopter_specific_path(value).await
57//!                 }
58//!             }
59//!         }
60//!     }
61//! )).await
62//! ```
63//!
64//! [`LambdaA2aHandler::handle_http_event_value`] is available without
65//! the `sqs` feature; `handle_sqs` is gated on `sqs`. `classify_event`
66//! + `LambdaEvent` are always available.
67//!
68//! ## API Gateway path prefix
69//!
70//! When Lambda sits behind a REST API Gateway with `AWS_PROXY`
71//! integration, the event carries the full stage + resource tree in
72//! the path (e.g. `/stage/agent/message:send`),
73//! which the root-rooted A2A router would 404. Configure the prefix
74//! to strip via [`LambdaA2aServerBuilder::strip_path_prefix`]:
75//!
76//! ```ignore
77//! LambdaA2aServerBuilder::new()
78//!     .executor(my_executor)
79//!     .storage(my_storage)
80//!     .strip_path_prefix("/stage/agent")  // API GW prefix
81//!     .run()
82//!     .await
83//! ```
84//!
85//! The strip applies to `run_http_only` and `run_http_and_sqs`.
86//! SQS events are unaffected. Non-matching paths pass through
87//! unchanged (router still 404s on genuinely unknown paths —
88//! that's the correct failure mode).
89//!
90//! ```ignore
91//! // HTTP-only Lambda (request Lambda in two-Lambda topology):
92//! LambdaA2aServerBuilder::new()
93//!     .executor(my_executor)
94//!     .storage(my_storage)
95//!     .with_sqs_return_immediately(queue_url, sqs)  // enqueues durable jobs
96//!     .build()?
97//!     .run_http_only()
98//!     .await
99//!
100//! // Single-function HTTP+SQS demo:
101//! LambdaA2aServerBuilder::new()
102//!     .executor(my_executor)
103//!     .storage(my_storage)
104//!     .with_sqs_return_immediately(queue_url, sqs)
105//!     .run()            // builder shortcut; dispatches HTTP + SQS
106//!     .await
107//!
108//! // Pure SQS worker — no with_sqs_return_immediately(...) needed
109//! // (it consumes the queue, never enqueues):
110//! LambdaA2aServerBuilder::new()
111//!     .executor(my_executor)
112//!     .storage(my_storage)
113//!     .build()?
114//!     .run_sqs_only()
115//!     .await
116//! ```
117//!
118//! # Adapter internals
119//!
120//! Thin wrapper: converts Lambda events to axum requests, delegates to the
121//! same Router, converts responses back. Per ADR-008/ADR-009:
122//! - Authorizer context mapped via synthetic headers with anti-spoofing
123//! - Streaming supported via durable event store (D3)
124//! - SSE responses are buffered: task executes, events are collected, returned as one response
125//! - `POST /message:stream` executes the task within the Lambda invocation and returns all events
126//! - `GET /tasks/{id}:subscribe` is for tasks that are **not** in a terminal
127//!   state. Within one invocation it emits the initial `Task` snapshot,
128//!   replays stored events via `Last-Event-ID`, and closes when the task
129//!   reaches a terminal state. Subscribing to an already-terminal task
130//!   returns `UnsupportedOperationError` per A2A v1.0 §3.1.6 / ADR-010
131//!   §4.3. For retrieving a terminal task's final state use `GetTask`.
132//!
133//! # Push-notification delivery — external triggers are mandatory
134//!
135//! Push delivery on Lambda (ADR-013) is architecturally different
136//! from the binary server. The request Lambda installed via
137//! [`LambdaA2aServerBuilder`] still constructs a `PushDispatcher`
138//! when `push_delivery_store` is wired, but any `tokio::spawn`
139//! continuation it emits post-return is **opportunistic only** — the
140//! Lambda execution environment may be frozen indefinitely between
141//! invocations, so nothing can depend on that continuation completing
142//! (ADR-013 §4.4).
143//!
144//! Correctness for push delivery on Lambda is carried by:
145//!
146//! 1. The atomic pending-dispatch marker written inside the request
147//!    Lambda's commit transaction (ADR-013 §4.3 — opt in via
148//!    `StorageImpl::with_push_dispatch_enabled(true)`).
149//! 2. [`LambdaStreamRecoveryHandler`] — DynamoDB Streams trigger on
150//!    `a2a_push_pending_dispatches`. DynamoDB backends only.
151//! 3. [`LambdaScheduledRecoveryHandler`] — EventBridge Scheduler
152//!    backstop. **Required for all backends**; it is the sole
153//!    recovery path for SQLite / PostgreSQL / in-memory deployments.
154//!
155//! Without at least the scheduled worker, push delivery on Lambda is
156//! not durable — a marker written on a cold invocation may never be
157//! consumed. The example wiring lives in
158//! `examples/lambda-stream-worker` and
159//! `examples/lambda-scheduled-worker`.
160//!
161//! Lambda streaming is request-scoped (not persistent SSE connections). The durable
162//! event store ensures events survive across invocations. Clients reconnect with
163//! `Last-Event-ID` for continuation.
164//!
165//! # Cross-instance cancellation
166//!
167//! Lambda invocations are stateless and short-lived, so the Lambda
168//! adapter does **not** run the persistent cross-instance cancel
169//! poller that `A2aServer::run()` spawns. Cancellation behaviour on
170//! the Lambda adapter:
171//!
172//! - **Marker writes** — `CancelTask` on a Lambda invocation writes
173//!   the cancel marker to the shared backend (DynamoDB / PostgreSQL).
174//!   This works.
175//! - **Propagation to a live executor on the SAME Lambda invocation** —
176//!   works via the same-instance token-trip path in `core_cancel_task`.
177//! - **Propagation to a live executor on a DIFFERENT Lambda invocation
178//!   (warm container)** — **not currently supported**. There is no
179//!   persistent poller to observe markers written by other
180//!   invocations. A subsequent invocation whose handler reads the
181//!   marker directly may act on it, but that is not a substitute for
182//!   the server runtime's live propagation.
183//!
184//! The builder still **requires** an `A2aCancellationSupervisor`
185//! implementation on the same backend so that marker writes reach
186//! the correct backend and a future polling-adapter variant can
187//! consume them. If a deployment passes a non-matching supervisor,
188//! `build()` rejects the configuration.
189
190mod adapter;
191mod auth;
192mod event;
193mod no_streaming;
194mod scheduled_recovery;
195mod stream_recovery;
196
197#[cfg(feature = "sqs")]
198mod durable;
199
200#[cfg(test)]
201mod builder_tests;
202#[cfg(test)]
203mod scheduled_recovery_tests;
204#[cfg(test)]
205mod stream_recovery_tests;
206
207pub use adapter::{axum_to_lambda_response, lambda_to_axum_request};
208pub use auth::{AuthorizerMapping, LambdaAuthorizerMiddleware};
209pub use event::{LambdaEvent, classify_event};
210pub use no_streaming::NoStreamingLayer;
211pub use scheduled_recovery::{
212    LambdaScheduledRecoveryConfig, LambdaScheduledRecoveryHandler, LambdaScheduledRecoveryResponse,
213};
214pub use stream_recovery::LambdaStreamRecoveryHandler;
215
216#[cfg(feature = "sqs")]
217pub use durable::{SqsDurableExecutorQueue, drive_sqs_batch};
218
219use std::sync::Arc;
220
221use turul_a2a::error::A2aError;
222use turul_a2a::executor::AgentExecutor;
223use turul_a2a::middleware::{A2aMiddleware, MiddlewareStack};
224use turul_a2a::push::A2aPushDeliveryStore;
225use turul_a2a::router::{AppState, build_router};
226use turul_a2a::server::RuntimeConfig;
227use turul_a2a::storage::{
228    A2aAtomicStore, A2aCancellationSupervisor, A2aEventStore, A2aPushNotificationStorage,
229    A2aTaskStorage,
230};
231use turul_a2a::streaming::TaskEventBroker;
232
/// Builder for Lambda A2A handler.
///
/// Use `.storage(my_storage)` to supply a single backend implementing all storage traits.
/// This satisfies ADR-009's same-backend requirement and enables streaming via durable
/// event store.
///
/// For push-notification delivery, wire `.storage()` with a backend that implements
/// [`A2aPushDeliveryStore`] (all four first-party backends do). The atomic store
/// must also opt in via `with_push_dispatch_enabled(true)` (ADR-013 §4.3); the
/// builder rejects configurations where the flag and the delivery store disagree.
pub struct LambdaA2aServerBuilder {
    // Agent executor that runs tasks; required — `build()` errors when absent.
    executor: Option<Arc<dyn AgentExecutor>>,
    // Storage traits. All five must resolve to the same backend (ADR-009);
    // `build()` enforces this by comparing `backend_name()` across them.
    task_storage: Option<Arc<dyn A2aTaskStorage>>,
    push_storage: Option<Arc<dyn A2aPushNotificationStorage>>,
    event_store: Option<Arc<dyn A2aEventStore>>,
    atomic_store: Option<Arc<dyn A2aAtomicStore>>,
    // Required (ADR-012): cancel-marker supervisor on the same backend as
    // task storage; `build()` rejects a missing or mismatched supervisor.
    cancellation_supervisor: Option<Arc<dyn A2aCancellationSupervisor>>,
    // Optional push delivery (ADR-011 §10 / ADR-013). Presence must agree
    // with `atomic_store.push_dispatch_enabled()` — checked in `build()`.
    push_delivery_store: Option<Arc<dyn A2aPushDeliveryStore>>,
    // Middleware, kept in the order `.middleware(...)` was called.
    middleware: Vec<Arc<dyn A2aMiddleware>>,
    // Runtime tuning (timeouts, retry budgets, push settings).
    runtime_config: RuntimeConfig,
    // ADR-018 durable executor queue; when absent, `build()` forces
    // `supports_return_immediately` off.
    durable_executor_queue: Option<Arc<dyn turul_a2a::durable_executor::DurableExecutorQueue>>,
    // HTTP path prefix to strip (e.g. API Gateway stage); validated in `build()`.
    path_prefix: Option<String>,
}
256
257impl LambdaA2aServerBuilder {
258    pub fn new() -> Self {
259        Self {
260            executor: None,
261            task_storage: None,
262            push_storage: None,
263            event_store: None,
264            atomic_store: None,
265            cancellation_supervisor: None,
266            push_delivery_store: None,
267            middleware: vec![],
268            runtime_config: RuntimeConfig::default(),
269            durable_executor_queue: None,
270            path_prefix: None,
271        }
272    }
273
274    /// Configure an HTTP path prefix to strip before the A2A router
275    /// sees the request. Needed when Lambda sits behind an API Gateway
276    /// whose stage + resource tree is forwarded into the function
277    /// (e.g. REST API `AWS_PROXY` integrations surface
278    /// `/stage/agent/message:send` where the A2A
279    /// router expects `/message:send`).
280    ///
281    /// The prefix must start with `/` and must not end with `/`
282    /// (unless it is exactly `/`, which is a no-op). Segment boundaries
283    /// are respected — a prefix of `/dev` will not strip `/devs/...`.
284    /// Paths that do not start with the prefix pass through unchanged
285    /// (the router will 404 on a genuinely unknown path, which is the
286    /// correct failure mode). Applies to both
287    /// [`LambdaA2aHandler::run_http_only`] and
288    /// [`LambdaA2aHandler::run_http_and_sqs`]. SQS events are
289    /// unaffected.
290    pub fn strip_path_prefix(mut self, prefix: impl Into<String>) -> Self {
291        self.path_prefix = Some(prefix.into());
292        self
293    }
294
295    /// Wire a durable executor queue (ADR-018). When set, the
296    /// `return_immediately = true` path in `core_send_message` enqueues
297    /// a [`turul_a2a::durable_executor::QueuedExecutorJob`] on this
298    /// queue instead of spawning the executor locally, and the
299    /// capability flag `RuntimeConfig::supports_return_immediately` is
300    /// turned **back on** as an implementation detail — the capability
301    /// cannot be claimed without supplying the queue.
302    ///
303    /// Adopters should typically reach for the SQS-specific helper
304    /// [`Self::with_sqs_return_immediately`] instead; this generic
305    /// method exists so adopters can inject in-memory fakes for tests
306    /// or provide non-SQS transports (Kinesis, Step Functions task
307    /// token, self-invoke, etc.).
308    pub fn with_durable_executor(
309        mut self,
310        queue: Arc<dyn turul_a2a::durable_executor::DurableExecutorQueue>,
311    ) -> Self {
312        self.durable_executor_queue = Some(queue);
313        self.runtime_config.supports_return_immediately = true;
314        self
315    }
316
317    /// Wire the AWS SQS durable executor queue (ADR-018). Ergonomic
318    /// helper over [`Self::with_durable_executor`] that constructs a
319    /// [`SqsDurableExecutorQueue`] from a `queue_url` + shared SQS
320    /// client. Requires the `sqs` feature.
321    #[cfg(feature = "sqs")]
322    pub fn with_sqs_return_immediately(
323        self,
324        queue_url: impl Into<String>,
325        sqs_client: Arc<aws_sdk_sqs::Client>,
326    ) -> Self {
327        let queue = Arc::new(SqsDurableExecutorQueue::new(queue_url, sqs_client));
328        self.with_durable_executor(queue)
329    }
330
331    pub fn executor(mut self, exec: impl AgentExecutor + 'static) -> Self {
332        self.executor = Some(Arc::new(exec));
333        self
334    }
335
336    /// Set all storage from a single backend instance.
337    ///
338    /// This is the **preferred** method — a single struct implementing all
339    /// storage traits guarantees the same-backend requirement (ADR-009)
340    /// AND ensures the cancellation supervisor reads the same backend
341    /// the cancel marker is written to. A mismatch here (e.g., DynamoDB
342    /// task storage + in-memory cancellation supervisor) silently
343    /// breaks cross-instance cancellation.
344    ///
345    /// ADR-013 §4.3 errata: `.storage()` wires storage traits only. It does
346    /// **not** auto-register the storage as a push-delivery store, even if
347    /// the backend happens to implement [`A2aPushDeliveryStore`]. To opt in
348    /// to push delivery, call [`Self::push_delivery_store`] explicitly and
349    /// call `with_push_dispatch_enabled(true)` on the storage instance
350    /// before passing it here. Non-push deployments need neither.
351    pub fn storage<S>(mut self, storage: S) -> Self
352    where
353        S: A2aTaskStorage
354            + A2aPushNotificationStorage
355            + A2aEventStore
356            + A2aAtomicStore
357            + A2aCancellationSupervisor
358            + Clone
359            + 'static,
360    {
361        self.task_storage = Some(Arc::new(storage.clone()));
362        self.push_storage = Some(Arc::new(storage.clone()));
363        self.event_store = Some(Arc::new(storage.clone()));
364        self.atomic_store = Some(Arc::new(storage.clone()));
365        self.cancellation_supervisor = Some(Arc::new(storage));
366        self
367    }
368
369    /// Set task storage individually. Must be paired with event_store/atomic_store
370    /// AND `cancellation_supervisor()` for same-backend compliance.
371    pub fn task_storage(mut self, s: impl A2aTaskStorage + 'static) -> Self {
372        self.task_storage = Some(Arc::new(s));
373        self
374    }
375
376    /// Set push notification storage individually.
377    pub fn push_storage(mut self, s: impl A2aPushNotificationStorage + 'static) -> Self {
378        self.push_storage = Some(Arc::new(s));
379        self
380    }
381
382    /// Set event store individually. Must match task_storage backend.
383    pub fn event_store(mut self, s: impl A2aEventStore + 'static) -> Self {
384        self.event_store = Some(Arc::new(s));
385        self
386    }
387
388    /// Set atomic store individually. Must match task_storage backend.
389    pub fn atomic_store(mut self, s: impl A2aAtomicStore + 'static) -> Self {
390        self.atomic_store = Some(Arc::new(s));
391        self
392    }
393
394    /// Set the cancellation supervisor individually. Must match
395    /// task_storage backend — otherwise `:cancel` writes the marker to
396    /// one backend and the supervisor reads from another, silently
397    /// breaking cross-instance cancellation. Prefer `.storage()` for
398    /// unified wiring. Consumed by `core_cancel_task` (ADR-012).
399    pub fn cancellation_supervisor(mut self, s: impl A2aCancellationSupervisor + 'static) -> Self {
400        self.cancellation_supervisor = Some(Arc::new(s));
401        self
402    }
403
404    /// Set the push delivery store individually (ADR-011 §10 / ADR-013).
405    ///
406    /// Prefer `.storage()` for unified wiring. The delivery store MUST
407    /// be on the same backend as `.task_storage(...)` — the builder
408    /// rejects mismatches. Passing a delivery store also requires the
409    /// atomic store to have opted in via `with_push_dispatch_enabled(true)`
410    /// (ADR-013 §4.3).
411    pub fn push_delivery_store(mut self, store: impl A2aPushDeliveryStore + 'static) -> Self {
412        self.push_delivery_store = Some(Arc::new(store));
413        self
414    }
415
416    /// Override the runtime configuration (timeouts, retry budgets,
417    /// push tuning). Defaults to [`RuntimeConfig::default()`]. The
418    /// builder validates push settings (claim expiry vs retry horizon,
419    /// ADR-011 §10.3) when `.push_delivery_store(...)` is wired.
420    pub fn runtime_config(mut self, cfg: RuntimeConfig) -> Self {
421        self.runtime_config = cfg;
422        self
423    }
424
425    pub fn middleware(mut self, mw: Arc<dyn A2aMiddleware>) -> Self {
426        self.middleware.push(mw);
427        self
428    }
429
430    pub fn build(self) -> Result<LambdaA2aHandler, A2aError> {
431        let executor = self
432            .executor
433            .ok_or(A2aError::Internal("executor is required".into()))?;
434        let task_storage = self.task_storage.ok_or(A2aError::Internal(
435            "task_storage is required for Lambda".into(),
436        ))?;
437        let push_storage = self.push_storage.ok_or(A2aError::Internal(
438            "push_storage is required for Lambda".into(),
439        ))?;
440        let event_store: Arc<dyn A2aEventStore> = self.event_store.ok_or(A2aError::Internal(
441            "event_store is required for Lambda (use .storage() for unified backend)".into(),
442        ))?;
443        let atomic_store: Arc<dyn A2aAtomicStore> = self.atomic_store.ok_or(A2aError::Internal(
444            "atomic_store is required for Lambda (use .storage() for unified backend)".into(),
445        ))?;
446        // ADR-012: the cancellation supervisor MUST be supplied. No
447        // InMemoryA2aStorage fallback: that would silently break
448        // cross-instance cancellation by reading markers from a
449        // different backend than they were written to.
450        let cancellation_supervisor: Arc<dyn A2aCancellationSupervisor> =
451            self.cancellation_supervisor.ok_or(A2aError::Internal(
452                "cancellation_supervisor is required for Lambda. Use .storage() for unified \
453                 backend wiring, or .cancellation_supervisor(...) alongside the individual \
454                 storage setters. Omitting it would silently break cross-instance \
455                 cancellation in ADR-012."
456                    .into(),
457            ))?;
458
459        // Same-backend enforcement (ADR-009 + ADR-012).
460        let task_backend = task_storage.backend_name();
461        let push_backend = push_storage.backend_name();
462        let event_backend = event_store.backend_name();
463        let atomic_backend = atomic_store.backend_name();
464        let supervisor_backend = cancellation_supervisor.backend_name();
465        if task_backend != push_backend
466            || task_backend != event_backend
467            || task_backend != atomic_backend
468            || task_backend != supervisor_backend
469        {
470            return Err(A2aError::Internal(format!(
471                "Storage backend mismatch: task={task_backend}, push={push_backend}, \
472                 event={event_backend}, atomic={atomic_backend}, \
473                 cancellation_supervisor={supervisor_backend}. \
474                 ADR-009 requires all storage traits to share the same backend. \
475                 ADR-012 requires the cancellation supervisor on the same backend \
476                 so cross-instance cancel markers are observable. \
477                 Use .storage() for unified backend."
478            )));
479        }
480
481        let push_delivery_store = self.push_delivery_store;
482
483        // Push-dispatch consistency (ADR-013 §4.3): the atomic store's
484        // opt-in flag and the presence of `push_delivery_store` MUST
485        // agree. Both-true = push fully wired; both-false = non-push
486        // deployment. Mixed cases are build errors — this mirrors the
487        // main server builder (server/mod.rs).
488        match (
489            push_delivery_store.is_some(),
490            atomic_store.push_dispatch_enabled(),
491        ) {
492            (true, true) | (false, false) => {}
493            (true, false) => {
494                return Err(A2aError::Internal(
495                    "push_delivery_store wired but atomic_store.push_dispatch_enabled() \
496                     is false. Call .with_push_dispatch_enabled(true) on the backend \
497                     storage before passing it to .storage()."
498                        .into(),
499                ));
500            }
501            (false, true) => {
502                return Err(A2aError::Internal(
503                    "atomic_store.push_dispatch_enabled() is true but no \
504                     push_delivery_store is wired. Pending-dispatch markers would be \
505                     written with no consumer, imposing load-bearing infra for no \
506                     benefit. If you need to populate markers for an external \
507                     consumer, open an issue for a distinctly-named opt-in — for now, \
508                     this configuration is rejected."
509                        .into(),
510                ));
511            }
512        }
513
514        // ADR-017 §Decision Bug 1: the Lambda execution environment
515        // freezes after the HTTP response is flushed, so any
516        // `tokio::spawn`'d executor continuation is opportunistic only
517        // (ADR-013 §4.4). Refuse `SendMessageConfiguration.return_immediately
518        // = true` at the `core_send_message` entry point via this
519        // capability flag instead of silently orphaning the executor.
520        //
521        // ADR-018: if a durable executor queue is wired via
522        // [`Self::with_durable_executor`] (or the SQS helper), the
523        // capability is re-enabled because enqueueing is durable
524        // across the Lambda freeze. The explicit re-enable lives in
525        // the builder method so the capability cannot be claimed
526        // without the mechanism.
527        let mut runtime_config = self.runtime_config;
528        if self.durable_executor_queue.is_none() {
529            runtime_config.supports_return_immediately = false;
530        }
531
532        // Push dispatcher wiring (ADR-011 §10 / ADR-013 §7). When
533        // `push_delivery_store` is present, validate its backend matches,
534        // validate the retry-horizon invariant, and construct the
535        // `PushDispatcher`. Under Lambda the dispatcher runs
536        // opportunistically post-return (ADR-013 §4.4); correctness
537        // comes from the atomic marker + stream/scheduler recovery.
538        let push_dispatcher: Option<Arc<turul_a2a::push::PushDispatcher>> =
539            if let Some(delivery) = push_delivery_store.as_ref() {
540                let push_delivery_backend = delivery.backend_name();
541                if task_backend != push_delivery_backend {
542                    return Err(A2aError::Internal(format!(
543                        "Storage backend mismatch: task={task_backend}, \
544                         push_delivery={push_delivery_backend}. \
545                         ADR-009 requires all storage traits to share the same backend."
546                    )));
547                }
548
549                // ADR-011 §10.3: claim expiry must exceed the worst-case
550                // retry horizon so a live claim is not mis-classified as
551                // stale mid-retry.
552                let retry_horizon = runtime_config
553                    .push_backoff_cap
554                    .saturating_mul(runtime_config.push_max_attempts as u32);
555                if runtime_config.push_claim_expiry <= retry_horizon {
556                    return Err(A2aError::Internal(format!(
557                        "push_claim_expiry ({:?}) must be greater than retry horizon \
558                         (push_max_attempts={} * push_backoff_cap={:?} = {:?}). \
559                         Raise push_claim_expiry or lower push_max_attempts/push_backoff_cap.",
560                        runtime_config.push_claim_expiry,
561                        runtime_config.push_max_attempts,
562                        runtime_config.push_backoff_cap,
563                        retry_horizon
564                    )));
565                }
566
567                let mut delivery_cfg = turul_a2a::push::delivery::PushDeliveryConfig::default();
568                delivery_cfg.max_attempts = runtime_config.push_max_attempts as u32;
569                delivery_cfg.backoff_base = runtime_config.push_backoff_base;
570                delivery_cfg.backoff_cap = runtime_config.push_backoff_cap;
571                delivery_cfg.backoff_jitter = runtime_config.push_backoff_jitter;
572                delivery_cfg.request_timeout = runtime_config.push_request_timeout;
573                delivery_cfg.connect_timeout = runtime_config.push_connect_timeout;
574                delivery_cfg.read_timeout = runtime_config.push_read_timeout;
575                delivery_cfg.claim_expiry = runtime_config.push_claim_expiry;
576                delivery_cfg.max_payload_bytes = runtime_config.push_max_payload_bytes;
577                delivery_cfg.allow_insecure_urls = runtime_config.allow_insecure_push_urls;
578
579                let instance_id = format!("a2a-lambda-{}", uuid::Uuid::now_v7());
580                let worker = turul_a2a::push::delivery::PushDeliveryWorker::new(
581                    delivery.clone(),
582                    delivery_cfg,
583                    None,
584                    instance_id,
585                )
586                .map_err(|e| A2aError::Internal(format!("push worker build failed: {e}")))?;
587
588                Some(Arc::new(turul_a2a::push::PushDispatcher::new(
589                    Arc::new(worker),
590                    push_storage.clone(),
591                    task_storage.clone(),
592                )))
593            } else {
594                None
595            };
596
597        let state = AppState {
598            executor,
599            task_storage,
600            push_storage,
601            event_store,
602            atomic_store,
603            event_broker: TaskEventBroker::new(),
604            middleware_stack: Arc::new(MiddlewareStack::new(self.middleware)),
605            runtime_config,
606            in_flight: Arc::new(turul_a2a::server::in_flight::InFlightRegistry::new()),
607            cancellation_supervisor,
608            push_delivery_store,
609            push_dispatcher,
610            durable_executor_queue: self.durable_executor_queue,
611        };
612
613        let router = build_router(state.clone());
614
615        let path_prefix = match self.path_prefix {
616            None => None,
617            Some(p) if p.is_empty() || p == "/" => None,
618            Some(p) => {
619                if !p.starts_with('/') {
620                    return Err(A2aError::InvalidRequest {
621                        message: format!(
622                            "LambdaA2aServerBuilder::strip_path_prefix: prefix must start with '/'; got {p:?}"
623                        ),
624                    });
625                }
626                if p.ends_with('/') {
627                    return Err(A2aError::InvalidRequest {
628                        message: format!(
629                            "LambdaA2aServerBuilder::strip_path_prefix: prefix must not end with '/' (except '/'); got {p:?}"
630                        ),
631                    });
632                }
633                Some(Arc::from(p))
634            }
635        };
636
637        Ok(LambdaA2aHandler {
638            router,
639            state,
640            path_prefix,
641        })
642    }
643}
644
645impl Default for LambdaA2aServerBuilder {
646    fn default() -> Self {
647        Self::new()
648    }
649}
650
/// Lambda handler wrapping the axum Router.
#[derive(Clone)]
pub struct LambdaA2aHandler {
    /// Fully-built A2A router; each incoming HTTP event is dispatched
    /// to a clone of it (one-shot per invocation) in `handle`.
    router: axum::Router,
    /// Held for ADR-018 SQS-event dispatch (`handle_sqs`). Always
    /// populated — the SQS handler method is gated behind the `sqs`
    /// feature but the state is carried unconditionally so callers can
    /// reach it for diagnostics or future non-HTTP / non-SQS handlers.
    #[cfg_attr(not(feature = "sqs"), allow(dead_code))]
    state: AppState,
    /// Leading path segment to strip from each incoming HTTP request
    /// before the A2A router sees it. `None` = no stripping. Set via
    /// [`LambdaA2aServerBuilder::strip_path_prefix`].
    path_prefix: Option<Arc<str>>,
}
666
667impl LambdaA2aHandler {
668    pub fn builder() -> LambdaA2aServerBuilder {
669        LambdaA2aServerBuilder::new()
670    }
671
672    /// Handle a Lambda HTTP request.
673    pub async fn handle(
674        &self,
675        event: lambda_http::Request,
676    ) -> Result<lambda_http::Response<lambda_http::Body>, lambda_http::Error> {
677        let axum_req = lambda_to_axum_request(event)?;
678        let axum_req = match &self.path_prefix {
679            Some(prefix) => adapter::strip_request_path_prefix(axum_req, prefix),
680            None => axum_req,
681        };
682
683        let axum_resp = tower::ServiceExt::oneshot(self.router.clone(), axum_req)
684            .await
685            .map_err(|e| lambda_http::Error::from(format!("Router error: {e}")))?;
686
687        axum_to_lambda_response(axum_resp).await
688    }
689
690    /// Handle a raw Lambda HTTP event delivered as `serde_json::Value`
691    /// and return a response envelope **matching the inbound event
692    /// shape**:
693    ///
694    /// | Inbound shape detected by `lambda_http::LambdaRequest`   | Response envelope                  |
695    /// |----------------------------------------------------------|------------------------------------|
696    /// | API Gateway v1 (REST) — top-level `httpMethod` + `path`  | `ApiGatewayProxyResponse`          |
697    /// | API Gateway v2 / Function URL — `requestContext.http`    | `ApiGatewayV2httpResponse`         |
698    /// | ALB target group — `requestContext.elb`                  | `AlbTargetGroupResponse`           |
699    /// | WebSocket / unrecognised                                  | `Err(...)`                         |
700    ///
701    /// Mismatching envelopes — most commonly a v2 response sent in
702    /// response to a v1 invocation — cause API Gateway to return
703    /// `502 Bad Gateway` regardless of the body, because the response
704    /// fails the gateway's contract validation. This method ensures
705    /// symmetry: response shape is always picked from the inbound
706    /// shape, never hardcoded.
707    ///
708    /// This is the framework's canonical HTTP envelope conversion
709    /// primitive — useful when a single Lambda function routes among
710    /// HTTP and one or more non-framework triggers (EventBridge,
711    /// DynamoDB streams, custom invoke) and the adopter drives
712    /// [`lambda_runtime::run`] with `serde_json::Value` directly.
713    /// Handles the `LambdaRequest` deserialization, the text-vs-base64
714    /// content-type detection, and the response envelope rebuild.
715    ///
716    /// Body encoding: text for `text/*`, `application/json`,
717    /// `application/xml`, `application/javascript`, or anything with
718    /// `charset=`; base64 otherwise.
719    ///
720    /// Path-prefix stripping (see
721    /// [`LambdaA2aServerBuilder::strip_path_prefix`]) is applied
722    /// because dispatch flows through [`Self::handle`]. Adopters who
723    /// only serve HTTP should continue to use [`Self::run_http_only`]
724    /// or [`LambdaA2aServerBuilder::run`]; this entry point exists for
725    /// composition with adopter-owned third triggers.
726    pub async fn handle_http_event_value(
727        &self,
728        value: serde_json::Value,
729    ) -> Result<serde_json::Value, lambda_runtime::Error> {
730        use base64::Engine;
731        use http_body_util::BodyExt;
732
733        let lambda_req: lambda_http::request::LambdaRequest = serde_json::from_value(value)
734            .map_err(|e| lambda_runtime::Error::from(format!("invalid HTTP event: {e}")))?;
735
736        // Capture the inbound shape so we can emit the matching
737        // response envelope. Lost once we convert to `lambda_http::Request`.
738        let shape = match &lambda_req {
739            lambda_http::request::LambdaRequest::ApiGatewayV1(_) => HttpEventShape::ApiGatewayV1,
740            lambda_http::request::LambdaRequest::ApiGatewayV2(_) => HttpEventShape::ApiGatewayV2,
741            lambda_http::request::LambdaRequest::Alb(_) => HttpEventShape::Alb,
742            lambda_http::request::LambdaRequest::WebSocket(_) => {
743                return Err(lambda_runtime::Error::from(
744                    "WebSocket Lambda events are not supported by this adapter",
745                ));
746            }
747            _ => {
748                return Err(lambda_runtime::Error::from(
749                    "unsupported Lambda HTTP event variant",
750                ));
751            }
752        };
753
754        let req: lambda_http::Request = lambda_req.into();
755
756        let resp = self
757            .handle(req)
758            .await
759            .map_err(|e| lambda_runtime::Error::from(format!("handler error: {e}")))?;
760        let (parts, body) = resp.into_parts();
761
762        let bytes = body
763            .collect()
764            .await
765            .map_err(|e| lambda_runtime::Error::from(format!("body collect: {e}")))?
766            .to_bytes();
767
768        let ct = parts
769            .headers
770            .get(http::header::CONTENT_TYPE)
771            .and_then(|v| v.to_str().ok())
772            .unwrap_or("application/octet-stream")
773            .to_ascii_lowercase();
774        let is_text = ct.starts_with("text/")
775            || ct.starts_with("application/json")
776            || ct.starts_with("application/xml")
777            || ct.starts_with("application/javascript")
778            || ct.contains("charset=");
779
780        let (body_str, is_base64) = if bytes.is_empty() {
781            (None, false)
782        } else if is_text {
783            match std::str::from_utf8(&bytes) {
784                Ok(s) => (Some(s.to_string()), false),
785                Err(_) => (
786                    Some(base64::engine::general_purpose::STANDARD.encode(&bytes)),
787                    true,
788                ),
789            }
790        } else {
791            (
792                Some(base64::engine::general_purpose::STANDARD.encode(&bytes)),
793                true,
794            )
795        };
796
797        let status = parts.status.as_u16() as i64;
798        let headers = parts.headers;
799        let body_lambda = body_str.map(aws_lambda_events::encodings::Body::Text);
800
801        match shape {
802            HttpEventShape::ApiGatewayV1 => {
803                let mut api_resp = aws_lambda_events::apigw::ApiGatewayProxyResponse::default();
804                api_resp.status_code = status;
805                api_resp.headers = headers;
806                api_resp.body = body_lambda;
807                api_resp.is_base64_encoded = is_base64;
808                serde_json::to_value(api_resp).map_err(|e| {
809                    lambda_runtime::Error::from(format!("serialise APIGW v1 response: {e}"))
810                })
811            }
812            HttpEventShape::ApiGatewayV2 => {
813                let mut api_resp = aws_lambda_events::apigw::ApiGatewayV2httpResponse::default();
814                api_resp.status_code = status;
815                api_resp.headers = headers;
816                api_resp.body = body_lambda;
817                api_resp.is_base64_encoded = is_base64;
818                serde_json::to_value(api_resp).map_err(|e| {
819                    lambda_runtime::Error::from(format!("serialise APIGW v2 response: {e}"))
820                })
821            }
822            HttpEventShape::Alb => {
823                let mut api_resp = aws_lambda_events::alb::AlbTargetGroupResponse::default();
824                api_resp.status_code = status;
825                api_resp.headers = headers;
826                api_resp.body = body_lambda;
827                api_resp.is_base64_encoded = is_base64;
828                serde_json::to_value(api_resp).map_err(|e| {
829                    lambda_runtime::Error::from(format!("serialise ALB response: {e}"))
830                })
831            }
832        }
833    }
834}
835
/// Inbound HTTP event shape detected by [`LambdaA2aHandler::handle_http_event_value`].
///
/// The response envelope must match the inbound shape — see method
/// docs for the contract.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HttpEventShape {
    /// API Gateway v1 (REST): top-level `httpMethod` + `path`.
    ApiGatewayV1,
    /// API Gateway v2 / Function URL: `requestContext.http`.
    ApiGatewayV2,
    /// ALB target group: `requestContext.elb`.
    Alb,
}
846
847impl LambdaA2aHandler {
    /// Handle an SQS batch from the durable executor queue (ADR-018).
    /// Compiled only with the `sqs` feature.
    ///
    /// Returns `SqsBatchResponse` whose `batch_item_failures` list
    /// tells the event source mapping which records to retry.
    /// Requires `FunctionResponseTypes: ["ReportBatchItemFailures"]`
    /// on the mapping.
    ///
    /// Each record: deserialize envelope → load task → terminal-no-op
    /// check → cancel-marker direct-CANCELED commit → run executor.
    /// See `durable::drive_sqs_batch` for the full semantics.
    #[cfg(feature = "sqs")]
    pub async fn handle_sqs(
        &self,
        event: aws_lambda_events::event::sqs::SqsEvent,
    ) -> aws_lambda_events::event::sqs::SqsBatchResponse {
        // Thin delegator: all batch semantics live in `durable`.
        durable::drive_sqs_batch(&self.state, event).await
    }
865
866    /// Run this handler as a pure HTTP Lambda. Strict: any non-HTTP
867    /// event shape (SQS, DynamoDB stream, scheduler, …) causes
868    /// `lambda_http::run` to return a deserialization error on
869    /// invocation.
870    ///
871    /// Appropriate for Lambdas whose only trigger is a Function URL,
872    /// API Gateway, or ALB. The request Lambda in the two-Lambda
873    /// durable-executor topology is the canonical caller.
874    pub async fn run_http_only(self) -> Result<(), lambda_runtime::Error> {
875        let handler = Arc::new(self);
876        lambda_http::run(lambda_http::service_fn(move |event| {
877            let h = Arc::clone(&handler);
878            async move { h.handle(event).await }
879        }))
880        .await
881    }
882}
883
884// Default entrypoint when the `sqs` feature is off — `.run()` aliases
885// the HTTP-only runner. With the `sqs` feature enabled, `.run()` lives
886// in `durable.rs` and routes HTTP+SQS via the classifier.
#[cfg(not(feature = "sqs"))]
impl LambdaA2aHandler {
    /// Default Lambda runner. Dispatches based on the event shape the
    /// binary receives. Without the `sqs` feature this is HTTP-only
    /// (equivalent to [`LambdaA2aHandler::run_http_only`]).
    pub async fn run(self) -> Result<(), lambda_runtime::Error> {
        // `sqs` off: HTTP is the only shape this binary dispatches.
        self.run_http_only().await
    }
}
896
897impl LambdaA2aServerBuilder {
898    /// Build and run in one call. Sugar for
899    /// `self.build()?.run().await`. Hides the handler from the adopter —
900    /// use `.build()` + `handler.run_*()` instead when the handler is
901    /// needed for unit tests, custom middleware, or an explicit
902    /// topology runner.
903    pub async fn run(self) -> Result<(), lambda_runtime::Error> {
904        let handler = self.build().map_err(|e| {
905            lambda_runtime::Error::from(format!("LambdaA2aServerBuilder::run: {e}"))
906        })?;
907        handler.run().await
908    }
909}
910
911#[cfg(test)]
912mod handle_http_event_value_shape_tests {
913    //! Regression tests for the event-shape symmetry contract on
914    //! [`LambdaA2aHandler::handle_http_event_value`]. The wire-shape
915    //! mismatch (v2 response to a v1 invocation) is what causes API
916    //! Gateway REST integrations to return `502 Bad Gateway`. These
917    //! tests pin the contract: response envelope must always match
918    //! the inbound request envelope.
919
920    use super::*;
921    use std::sync::Arc;
922    use turul_a2a::executor::{AgentExecutor, ExecutionContext};
923    use turul_a2a::server::RuntimeConfig;
924    use turul_a2a::server::in_flight::InFlightRegistry;
925    use turul_a2a::storage::InMemoryA2aStorage;
926    use turul_a2a::streaming::TaskEventBroker;
927    use turul_a2a_types::{Message, Task};
928
    /// Test executor that does no real work: `execute` flips the task
    /// straight to COMPLETED so the HTTP round-trip is deterministic.
    struct NoOpExecutor;

    #[async_trait::async_trait]
    impl AgentExecutor for NoOpExecutor {
        async fn execute(
            &self,
            task: &mut Task,
            _msg: &Message,
            _ctx: &ExecutionContext,
        ) -> Result<(), turul_a2a::error::A2aError> {
            // Round-trip through the proto form to set the terminal state.
            let mut proto = task.as_proto().clone();
            proto.status = Some(turul_a2a_proto::TaskStatus {
                state: turul_a2a_proto::TaskState::Completed.into(),
                message: None,
                timestamp: None,
            });
            *task = Task::try_from(proto).unwrap();
            Ok(())
        }
        /// Minimal card — the tests below only check the serialized
        /// body contains `"name"`.
        fn agent_card(&self) -> turul_a2a_proto::AgentCard {
            turul_a2a_proto::AgentCard {
                name: "test-agent".to_string(),
                ..Default::default()
            }
        }
    }
955
956    fn build_handler() -> LambdaA2aHandler {
957        let s = Arc::new(InMemoryA2aStorage::new());
958        let state = AppState {
959            executor: Arc::new(NoOpExecutor),
960            task_storage: s.clone(),
961            push_storage: s.clone(),
962            event_store: s.clone(),
963            atomic_store: s.clone(),
964            event_broker: TaskEventBroker::new(),
965            middleware_stack: Arc::new(MiddlewareStack::new(vec![])),
966            runtime_config: RuntimeConfig::default(),
967            in_flight: Arc::new(InFlightRegistry::new()),
968            cancellation_supervisor: s.clone(),
969            push_delivery_store: None,
970            push_dispatcher: None,
971            durable_executor_queue: None,
972        };
973        let router = build_router(state.clone());
974        LambdaA2aHandler {
975            router,
976            state,
977            path_prefix: None,
978        }
979    }
980
    /// API Gateway v2 / Function URL event. `requestContext.http.method`
    /// is what `LambdaRequest::ApiGatewayV2` keys off.
    ///
    /// Targets the agent-card path so the round-trip exercises a plain
    /// GET. Wire bytes are part of the contract — do not "clean up"
    /// fields without checking the v2 deserializer.
    fn apigw_v2_agent_card_get() -> serde_json::Value {
        serde_json::json!({
            "version": "2.0",
            "routeKey": "GET /.well-known/agent-card.json",
            "rawPath": "/.well-known/agent-card.json",
            "rawQueryString": "",
            "headers": {"accept": "application/json", "a2a-version": "1.0"},
            "requestContext": {
                "accountId": "000000000000",
                "apiId": "api",
                "domainName": "fake.execute-api",
                "domainPrefix": "fake",
                "http": {
                    "method": "GET",
                    "path": "/.well-known/agent-card.json",
                    "protocol": "HTTP/1.1",
                    "sourceIp": "127.0.0.1",
                    "userAgent": "test"
                },
                "requestId": "rid",
                "routeKey": "GET /.well-known/agent-card.json",
                "stage": "$default",
                "time": "01/Jan/2026:00:00:00 +0000",
                "timeEpoch": 1_735_689_600_000i64
            },
            "isBase64Encoded": false
        })
    }
1011
    /// API Gateway v1 (REST) event. Top-level `httpMethod` + `path` and
    /// `requestContext.identity` (no `requestContext.http`) is what
    /// `LambdaRequest::ApiGatewayV1` keys off. This is the shape REST
    /// API + AWS_PROXY integrations send into the Lambda — the live
    /// fleet topology that 502'd against a hardcoded v2 response.
    ///
    /// `stage` is `$default` here so `lambda_http` does not prepend a
    /// stage prefix — this fixture exists to pin the response-shape
    /// contract, not the path-rewriting behaviour. See the
    /// `strip_path_prefix_*` tests for stage-prefix coverage.
    fn apigw_v1_agent_card_get_with_stage(stage: &str, path: &str) -> serde_json::Value {
        // Non-default stages appear in `requestContext.path` (but not
        // the top-level `path`), mirroring what the gateway sends.
        let request_context_path = if stage == "$default" {
            path.to_string()
        } else {
            format!("/{stage}{path}")
        };
        serde_json::json!({
            "resource": "/{proxy+}",
            "path": path,
            "httpMethod": "GET",
            "headers": {"accept": "application/json", "a2a-version": "1.0"},
            "multiValueHeaders": {},
            "queryStringParameters": null,
            "multiValueQueryStringParameters": null,
            "pathParameters": {"proxy": path.trim_start_matches('/').to_string()},
            "stageVariables": null,
            "requestContext": {
                "accountId": "000000000000",
                "apiId": "abc123",
                "domainName": "fake.execute-api",
                "domainPrefix": "fake",
                "extendedRequestId": "xrid",
                "httpMethod": "GET",
                "identity": {
                    "sourceIp": "127.0.0.1",
                    "userAgent": "test"
                },
                "path": request_context_path,
                "protocol": "HTTP/1.1",
                "requestId": "rid",
                "requestTime": "01/Jan/2026:00:00:00 +0000",
                "requestTimeEpoch": 1_735_689_600_000i64,
                "resourceId": "rsrc",
                "resourcePath": "/{proxy+}",
                "stage": stage
            },
            "body": null,
            "isBase64Encoded": false
        })
    }
1062
    /// Default v1 fixture: `$default` stage, agent-card path.
    fn apigw_v1_agent_card_get() -> serde_json::Value {
        apigw_v1_agent_card_get_with_stage("$default", "/.well-known/agent-card.json")
    }
1066
    /// ALB target-group event. `requestContext.elb` is what
    /// `LambdaRequest::Alb` keys off.
    ///
    /// Same agent-card GET as the API Gateway fixtures, minimally
    /// populated for the ALB deserializer.
    fn alb_agent_card_get() -> serde_json::Value {
        serde_json::json!({
            "requestContext": {
                "elb": {
                    "targetGroupArn": "arn:aws:elasticloadbalancing:ap-southeast-2:000:targetgroup/fake/00"
                }
            },
            "httpMethod": "GET",
            "path": "/.well-known/agent-card.json",
            "queryStringParameters": {},
            "headers": {"accept": "application/json", "a2a-version": "1.0"},
            "body": "",
            "isBase64Encoded": false
        })
    }
1084
    /// REST v1 inbound MUST produce a v1 response shape. This is the
    /// regression for the live-fleet 502.
    #[tokio::test]
    async fn v1_inbound_event_emits_v1_response_envelope() {
        let handler = build_handler();
        let resp_value = handler
            .handle_http_event_value(apigw_v1_agent_card_get())
            .await
            .expect("handle_http_event_value(v1) must succeed");

        // Parses cleanly as v1.
        let v1: aws_lambda_events::apigw::ApiGatewayProxyResponse =
            serde_json::from_value(resp_value.clone())
                .expect("response must deserialize as ApiGatewayProxyResponse (v1)");
        assert_eq!(v1.status_code, 200);
        // Text body (agent-card JSON), not base64.
        match v1.body {
            Some(aws_lambda_events::encodings::Body::Text(s)) => {
                assert!(s.contains("\"name\""), "v1 body shape: {s}");
            }
            other => panic!("expected text body, got {other:?}"),
        }

        // Negative: v1 response JSON must NOT contain the v2-only
        // `cookies` field. Presence of `cookies` is API Gateway REST's
        // 502 trigger.
        assert!(
            resp_value.get("cookies").is_none(),
            "v1 response must not carry v2-only `cookies` field; payload: {resp_value}"
        );
    }
1115
1116    /// API Gateway v2 / Function URL inbound MUST produce a v2 response
1117    /// shape (current behaviour, locked in to prevent the inverse
1118    /// regression).
1119    #[tokio::test]
1120    async fn v2_inbound_event_emits_v2_response_envelope() {
1121        let handler = build_handler();
1122        let resp_value = handler
1123            .handle_http_event_value(apigw_v2_agent_card_get())
1124            .await
1125            .expect("handle_http_event_value(v2) must succeed");
1126
1127        let v2: aws_lambda_events::apigw::ApiGatewayV2httpResponse =
1128            serde_json::from_value(resp_value.clone())
1129                .expect("response must deserialize as ApiGatewayV2httpResponse (v2)");
1130        assert_eq!(v2.status_code, 200);
1131        match v2.body {
1132            Some(aws_lambda_events::encodings::Body::Text(s)) => {
1133                assert!(s.contains("\"name\""), "v2 body shape: {s}");
1134            }
1135            other => panic!("expected text body, got {other:?}"),
1136        }
1137    }
1138
1139    /// ALB inbound MUST produce an ALB response shape.
1140    #[tokio::test]
1141    async fn alb_inbound_event_emits_alb_response_envelope() {
1142        let handler = build_handler();
1143        let resp_value = handler
1144            .handle_http_event_value(alb_agent_card_get())
1145            .await
1146            .expect("handle_http_event_value(alb) must succeed");
1147
1148        let alb: aws_lambda_events::alb::AlbTargetGroupResponse =
1149            serde_json::from_value(resp_value)
1150                .expect("response must deserialize as AlbTargetGroupResponse");
1151        assert_eq!(alb.status_code, 200);
1152    }
1153
    /// Live-fleet end-to-end regression: REST v1 stage-prefixed event
    /// + `strip_path_prefix("/dev")` on the handler routes through
    /// the agent-card endpoint AND emits a v1 response envelope. This
    /// is the exact scenario the live fleet was 502'ing on.
    #[tokio::test]
    async fn v1_stage_prefix_inbound_with_strip_emits_v1_response() {
        // Hand-built handler (mirrors `build_handler`) because this
        // test needs a non-`None` `path_prefix`.
        let s = Arc::new(InMemoryA2aStorage::new());
        let state = AppState {
            executor: Arc::new(NoOpExecutor),
            task_storage: s.clone(),
            push_storage: s.clone(),
            event_store: s.clone(),
            atomic_store: s.clone(),
            event_broker: TaskEventBroker::new(),
            middleware_stack: Arc::new(MiddlewareStack::new(vec![])),
            runtime_config: RuntimeConfig::default(),
            in_flight: Arc::new(InFlightRegistry::new()),
            cancellation_supervisor: s.clone(),
            push_delivery_store: None,
            push_dispatcher: None,
            durable_executor_queue: None,
        };
        let router = build_router(state.clone());
        let handler = LambdaA2aHandler {
            router,
            state,
            path_prefix: Some(Arc::from("/dev")),
        };

        let event = apigw_v1_agent_card_get_with_stage("dev", "/.well-known/agent-card.json");
        let resp_value = handler
            .handle_http_event_value(event)
            .await
            .expect("v1 stage-prefixed event must succeed under strip_path_prefix");

        let v1: aws_lambda_events::apigw::ApiGatewayProxyResponse =
            serde_json::from_value(resp_value)
                .expect("response must deserialize as ApiGatewayProxyResponse");
        assert_eq!(v1.status_code, 200);
        match v1.body {
            Some(aws_lambda_events::encodings::Body::Text(s)) => {
                assert!(s.contains("\"name\""), "v1 stage-prefix body: {s}");
            }
            other => panic!("expected text body, got {other:?}"),
        }
    }
1200
    /// Cross-shape decoding regression: a v1 response JSON carries
    /// `multiValueHeaders` (REST shape) but no `cookies`; a v2 response
    /// carries `cookies` but no v1-only `multiValueHeaders` semantics.
    /// Confirms the two shapes are wire-distinguishable, which is why
    /// the gateway 502s on a mismatch.
    #[tokio::test]
    async fn v1_response_does_not_match_v2_envelope_strictly() {
        let handler = build_handler();
        let v1_value = handler
            .handle_http_event_value(apigw_v1_agent_card_get())
            .await
            .unwrap();

        // Sanity: v1 response JSON has no v2-only `cookies` field.
        assert!(v1_value.get("cookies").is_none());

        let v2_value = handler
            .handle_http_event_value(apigw_v2_agent_card_get())
            .await
            .unwrap();
        // Sanity: v2 response JSON carries the v2 `cookies` field
        // (default empty array — still present on the wire).
        assert!(
            v2_value.get("cookies").is_some(),
            "v2 response should carry the v2-only cookies field"
        );
    }
1228}