// turul_a2a_aws_lambda — crate root (lib.rs)

1//! AWS Lambda adapter for turul-a2a.
2//!
3//! # Choosing a runner
4//!
5//! A Lambda binary receives raw event JSON; the adapter owns
6//! classifying that JSON and dispatching to the right A2A handler
7//! method. Adopters pick the runner whose name matches the Lambda's
8//! AWS trigger topology and end `main.rs` with one `.await?` call —
9//! `lambda_runtime` / `lambda_http` / event parsing never appear in
10//! adopter code.
11//!
12//! | Lambda triggers                   | Runner                                         |
13//! |-----------------------------------|------------------------------------------------|
14//! | HTTP only (Function URL / APIGW)  | [`LambdaA2aHandler::run_http_only`]            |
15//! | SQS only (event source mapping)   | [`LambdaA2aHandler::run_sqs_only`] *(`sqs`)*   |
16//! | HTTP **and** SQS on one function  | [`LambdaA2aHandler::run_http_and_sqs`] *(`sqs`)* |
17//! | not sure / just run it            | [`LambdaA2aHandler::run`]                      |
18//!
19//! [`LambdaA2aHandler::run`] is the default "it just works" entry
20//! point: without the `sqs` feature it aliases `run_http_only`; with
21//! `sqs` it aliases `run_http_and_sqs` (permissive, handles either
22//! shape). The explicit runners are **strict** — a non-matching
23//! event shape fails loudly, which is what hardened deployments
24//! and tests want.
25//!
26//! The one-call shortcut is [`LambdaA2aServerBuilder::run`] — it
27//! build-then-runs in a single fluent call. Use `.build()` + a
28//! handler method when the handler is needed for unit tests or
29//! custom middleware.
30//!
31//! ## Composing with non-framework triggers
32//!
33//! Lambdas wired to HTTP **plus** a third trigger the framework does
34//! not know about (EventBridge scheduler, DynamoDB stream, custom
35//! invoke) can't use a named runner directly — the Unknown branch
36//! is adopter-owned. Reach for the public classifier + the HTTP
37//! envelope primitive instead:
38//!
39//! ```ignore
40//! lambda_runtime::run(lambda_runtime::service_fn(
41//!     move |event: lambda_runtime::LambdaEvent<serde_json::Value>| {
42//!         let handler = handler.clone();
43//!         async move {
44//!             let (value, _ctx) = event.into_parts();
45//!             match turul_a2a_aws_lambda::classify_event(&value) {
46//!                 turul_a2a_aws_lambda::LambdaEvent::Http => {
47//!                     handler.handle_http_event_value(value).await
48//!                 }
49//!                 turul_a2a_aws_lambda::LambdaEvent::Sqs => {
50//!                     let sqs = serde_json::from_value(value)?;
51//!                     Ok(serde_json::to_value(handler.handle_sqs(sqs).await)?)
52//!                 }
53//!                 turul_a2a_aws_lambda::LambdaEvent::Unknown => {
54//!                     // Adopter-owned: run your scheduled sweep,
55//!                     // DynamoDB stream consumer, etc.
56//!                     run_adopter_specific_path(value).await
57//!                 }
58//!             }
59//!         }
60//!     }
61//! )).await
62//! ```
63//!
64//! [`LambdaA2aHandler::handle_http_event_value`] is available without
65//! the `sqs` feature; `handle_sqs` is gated on `sqs`. `classify_event`
66//! + `LambdaEvent` are always available.
67//!
68//! ## API Gateway path prefix
69//!
70//! When Lambda sits behind a REST API Gateway with `AWS_PROXY`
71//! integration, the event carries the full stage + resource tree in
72//! the path (e.g. `/stage/agent/message:send`),
73//! which the root-rooted A2A router would 404. Configure the prefix
74//! to strip via [`LambdaA2aServerBuilder::strip_path_prefix`]:
75//!
76//! ```ignore
77//! LambdaA2aServerBuilder::new()
78//!     .executor(my_executor)
79//!     .storage(my_storage)
80//!     .strip_path_prefix("/stage/agent")  // API GW prefix
81//!     .run()
82//!     .await
83//! ```
84//!
85//! The strip applies to `run_http_only` and `run_http_and_sqs`.
86//! SQS events are unaffected. Non-matching paths pass through
87//! unchanged (router still 404s on genuinely unknown paths —
88//! that's the correct failure mode).
89//!
90//! ```ignore
91//! // HTTP-only Lambda (request Lambda in two-Lambda topology):
92//! LambdaA2aServerBuilder::new()
93//!     .executor(my_executor)
94//!     .storage(my_storage)
95//!     .with_sqs_return_immediately(queue_url, sqs)  // enqueues durable jobs
96//!     .build()?
97//!     .run_http_only()
98//!     .await
99//!
100//! // Single-function HTTP+SQS demo:
101//! LambdaA2aServerBuilder::new()
102//!     .executor(my_executor)
103//!     .storage(my_storage)
104//!     .with_sqs_return_immediately(queue_url, sqs)
105//!     .run()            // builder shortcut; dispatches HTTP + SQS
106//!     .await
107//!
108//! // Pure SQS worker — no with_sqs_return_immediately(...) needed
109//! // (it consumes the queue, never enqueues):
110//! LambdaA2aServerBuilder::new()
111//!     .executor(my_executor)
112//!     .storage(my_storage)
113//!     .build()?
114//!     .run_sqs_only()
115//!     .await
116//! ```
117//!
118//! # Adapter internals
119//!
120//! Thin wrapper: converts Lambda events to axum requests, delegates to
121//! the same Router, converts responses back. Adapter contract:
122//! - Authorizer context mapped via synthetic headers with anti-spoofing.
123//! - SSE responses are buffered: task executes, events are collected,
124//!   returned as one response.
125//! - `POST /message:stream` executes the task within the Lambda
126//!   invocation and returns all events.
127//! - `GET /tasks/{id}:subscribe` is for tasks that are **not** in a
128//!   terminal state. Within one invocation it emits the initial `Task`
129//!   snapshot, replays stored events via `Last-Event-ID`, and closes
130//!   when the task reaches a terminal state. Subscribing to an
131//!   already-terminal task returns `UnsupportedOperationError` per
132//!   A2A v1.0 §3.1.6. For retrieving a terminal task's final state
133//!   use `GetTask`.
134//!
135//! # Push-notification delivery — external triggers are mandatory
136//!
137//! Push delivery on Lambda is architecturally different from the
138//! binary server. The request Lambda installed via
139//! [`LambdaA2aServerBuilder`] still constructs a `PushDispatcher`
140//! when `push_delivery_store` is wired, but any `tokio::spawn`
141//! continuation it emits post-return is **opportunistic only** — the
142//! Lambda execution environment may be frozen indefinitely between
143//! invocations, so nothing can depend on that continuation completing.
144//!
145//! Correctness for push delivery on Lambda is carried by:
146//!
147//! 1. The atomic pending-dispatch marker written inside the request
148//!    Lambda's commit transaction (opt in via
149//!    `StorageImpl::with_push_dispatch_enabled(true)`).
150//! 2. [`LambdaStreamRecoveryHandler`] — DynamoDB Streams trigger on
151//!    `a2a_push_pending_dispatches`. DynamoDB backends only.
152//! 3. [`LambdaScheduledRecoveryHandler`] — EventBridge Scheduler
153//!    backstop. **Required for all backends**; it is the sole
154//!    recovery path for SQLite / PostgreSQL / in-memory deployments.
155//!
156//! Without at least the scheduled worker, push delivery on Lambda is
157//! not durable — a marker written on a cold invocation may never be
158//! consumed. The example wiring lives in
159//! `examples/lambda-stream-worker` and
160//! `examples/lambda-scheduled-worker`.
161//!
162//! Lambda streaming is request-scoped (not persistent SSE connections). The durable
163//! event store ensures events survive across invocations. Clients reconnect with
164//! `Last-Event-ID` for continuation.
165//!
166//! # Cross-instance cancellation
167//!
168//! Lambda invocations are stateless and short-lived, so the Lambda
169//! adapter does **not** run the persistent cross-instance cancel
170//! poller that `A2aServer::run()` spawns. Cancellation behaviour on
171//! the Lambda adapter:
172//!
173//! - **Marker writes** — `CancelTask` on a Lambda invocation writes
174//!   the cancel marker to the shared backend (DynamoDB / PostgreSQL).
175//!   This works.
176//! - **Propagation to a live executor on the SAME Lambda invocation** —
177//!   works via the same-instance token-trip path in `core_cancel_task`.
178//! - **Propagation to a live executor on a DIFFERENT Lambda invocation
179//!   (warm container)** — **not currently supported**. There is no
180//!   persistent poller to observe markers written by other
181//!   invocations. A subsequent invocation whose handler reads the
182//!   marker directly may act on it, but that is not a substitute for
183//!   the server runtime's live propagation.
184//!
185//! The builder still **requires** an `A2aCancellationSupervisor`
186//! implementation on the same backend so that marker writes reach
187//! the correct backend and a future polling-adapter variant can
188//! consume them. If a deployment passes a non-matching supervisor,
189//! `build()` rejects the configuration.
190
191mod adapter;
192mod auth;
193mod event;
194mod no_streaming;
195mod scheduled_recovery;
196mod stream_recovery;
197
198#[cfg(feature = "sqs")]
199mod durable;
200
201#[cfg(test)]
202mod builder_tests;
203#[cfg(test)]
204mod scheduled_recovery_tests;
205#[cfg(test)]
206mod stream_recovery_tests;
207
208pub use adapter::{axum_to_lambda_response, lambda_to_axum_request};
209pub use auth::{AuthorizerMapping, LambdaAuthorizerMiddleware};
210pub use event::{LambdaEvent, classify_event};
211pub use no_streaming::NoStreamingLayer;
212pub use scheduled_recovery::{
213    LambdaScheduledRecoveryConfig, LambdaScheduledRecoveryHandler, LambdaScheduledRecoveryResponse,
214};
215pub use stream_recovery::LambdaStreamRecoveryHandler;
216
217#[cfg(feature = "sqs")]
218pub use durable::{SqsDurableExecutorQueue, drive_sqs_batch};
219
220use std::sync::Arc;
221
222use turul_a2a::error::A2aError;
223use turul_a2a::executor::AgentExecutor;
224use turul_a2a::middleware::{A2aMiddleware, MiddlewareStack};
225use turul_a2a::push::A2aPushDeliveryStore;
226use turul_a2a::router::{AppState, build_router};
227use turul_a2a::server::RuntimeConfig;
228use turul_a2a::storage::{
229    A2aAtomicStore, A2aCancellationSupervisor, A2aEventStore, A2aPushNotificationStorage,
230    A2aTaskStorage,
231};
232use turul_a2a::streaming::TaskEventBroker;
233
/// Builder for Lambda A2A handler.
///
/// Use `.storage(my_storage)` to supply a single backend implementing all storage traits.
/// All four storage traits must share the same backend so streaming, push delivery,
/// and cross-instance cancellation coordinate correctly.
///
/// For push-notification delivery, wire `.storage()` with a backend that implements
/// [`A2aPushDeliveryStore`] (all four first-party backends do). The atomic store
/// must also opt in via `with_push_dispatch_enabled(true)`; the
/// builder rejects configurations where the flag and the delivery store disagree.
pub struct LambdaA2aServerBuilder {
    // Agent executor that runs tasks; `build()` fails if absent.
    executor: Option<Arc<dyn AgentExecutor>>,
    // Task persistence; required, must share a backend with the other stores.
    task_storage: Option<Arc<dyn A2aTaskStorage>>,
    // Push-notification configuration storage; required.
    push_storage: Option<Arc<dyn A2aPushNotificationStorage>>,
    // Durable event store backing streaming / replay; required.
    event_store: Option<Arc<dyn A2aEventStore>>,
    // Atomic store for transactional commits; required. Its
    // `push_dispatch_enabled()` flag must agree with `push_delivery_store`.
    atomic_store: Option<Arc<dyn A2aAtomicStore>>,
    // Cancel-marker reader; required, same backend as task storage.
    cancellation_supervisor: Option<Arc<dyn A2aCancellationSupervisor>>,
    // Optional push delivery store; presence enables PushDispatcher wiring.
    push_delivery_store: Option<Arc<dyn A2aPushDeliveryStore>>,
    // Middleware layers handed to the router's MiddlewareStack at build time.
    middleware: Vec<Arc<dyn A2aMiddleware>>,
    // Runtime tuning; `build()` may clear `supports_return_immediately`.
    runtime_config: RuntimeConfig,
    // Optional durable queue; enables `return_immediately = true` support.
    durable_executor_queue: Option<Arc<dyn turul_a2a::durable_executor::DurableExecutorQueue>>,
    // Optional HTTP path prefix to strip (validated in `build()`).
    path_prefix: Option<String>,
}
257
258impl LambdaA2aServerBuilder {
259    pub fn new() -> Self {
260        Self {
261            executor: None,
262            task_storage: None,
263            push_storage: None,
264            event_store: None,
265            atomic_store: None,
266            cancellation_supervisor: None,
267            push_delivery_store: None,
268            middleware: vec![],
269            runtime_config: RuntimeConfig::default(),
270            durable_executor_queue: None,
271            path_prefix: None,
272        }
273    }
274
275    /// Configure an HTTP path prefix to strip before the A2A router
276    /// sees the request. Needed when Lambda sits behind an API Gateway
277    /// whose stage + resource tree is forwarded into the function
278    /// (e.g. REST API `AWS_PROXY` integrations surface
279    /// `/stage/agent/message:send` where the A2A
280    /// router expects `/message:send`).
281    ///
282    /// The prefix must start with `/` and must not end with `/`
283    /// (unless it is exactly `/`, which is a no-op). Segment boundaries
284    /// are respected — a prefix of `/dev` will not strip `/devs/...`.
285    /// Paths that do not start with the prefix pass through unchanged
286    /// (the router will 404 on a genuinely unknown path, which is the
287    /// correct failure mode). Applies to both
288    /// [`LambdaA2aHandler::run_http_only`] and
289    /// [`LambdaA2aHandler::run_http_and_sqs`]. SQS events are
290    /// unaffected.
291    pub fn strip_path_prefix(mut self, prefix: impl Into<String>) -> Self {
292        self.path_prefix = Some(prefix.into());
293        self
294    }
295
296    /// Wire a durable executor queue. When set, the
297    /// `return_immediately = true` path in `core_send_message` enqueues
298    /// a [`turul_a2a::durable_executor::QueuedExecutorJob`] on this
299    /// queue instead of spawning the executor locally, and the
300    /// capability flag `RuntimeConfig::supports_return_immediately` is
301    /// turned **back on** as an implementation detail — the capability
302    /// cannot be claimed without supplying the queue.
303    ///
304    /// Adopters should typically reach for the SQS-specific helper
305    /// [`Self::with_sqs_return_immediately`] instead; this generic
306    /// method exists so adopters can inject in-memory fakes for tests
307    /// or provide non-SQS transports (Kinesis, Step Functions task
308    /// token, self-invoke, etc.).
309    pub fn with_durable_executor(
310        mut self,
311        queue: Arc<dyn turul_a2a::durable_executor::DurableExecutorQueue>,
312    ) -> Self {
313        self.durable_executor_queue = Some(queue);
314        self.runtime_config.supports_return_immediately = true;
315        self
316    }
317
318    /// Wire the AWS SQS durable executor queue. Ergonomic
319    /// helper over [`Self::with_durable_executor`] that constructs a
320    /// [`SqsDurableExecutorQueue`] from a `queue_url` + shared SQS
321    /// client. Requires the `sqs` feature.
322    #[cfg(feature = "sqs")]
323    pub fn with_sqs_return_immediately(
324        self,
325        queue_url: impl Into<String>,
326        sqs_client: Arc<aws_sdk_sqs::Client>,
327    ) -> Self {
328        let queue = Arc::new(SqsDurableExecutorQueue::new(queue_url, sqs_client));
329        self.with_durable_executor(queue)
330    }
331
332    pub fn executor(mut self, exec: impl AgentExecutor + 'static) -> Self {
333        self.executor = Some(Arc::new(exec));
334        self
335    }
336
337    /// Set all storage from a single backend instance.
338    ///
339    /// This is the **preferred** method — a single struct implementing all
340    /// storage traits guarantees the same-backend requirement
341    /// AND ensures the cancellation supervisor reads the same backend
342    /// the cancel marker is written to. A mismatch here (e.g., DynamoDB
343    /// task storage + in-memory cancellation supervisor) silently
344    /// breaks cross-instance cancellation.
345    ///
346    /// errata: `.storage()` wires storage traits only. It does
347    /// **not** auto-register the storage as a push-delivery store, even if
348    /// the backend happens to implement [`A2aPushDeliveryStore`]. To opt in
349    /// to push delivery, call [`Self::push_delivery_store`] explicitly and
350    /// call `with_push_dispatch_enabled(true)` on the storage instance
351    /// before passing it here. Non-push deployments need neither.
352    pub fn storage<S>(mut self, storage: S) -> Self
353    where
354        S: A2aTaskStorage
355            + A2aPushNotificationStorage
356            + A2aEventStore
357            + A2aAtomicStore
358            + A2aCancellationSupervisor
359            + Clone
360            + 'static,
361    {
362        self.task_storage = Some(Arc::new(storage.clone()));
363        self.push_storage = Some(Arc::new(storage.clone()));
364        self.event_store = Some(Arc::new(storage.clone()));
365        self.atomic_store = Some(Arc::new(storage.clone()));
366        self.cancellation_supervisor = Some(Arc::new(storage));
367        self
368    }
369
370    /// Set task storage individually. Must be paired with event_store/atomic_store
371    /// AND `cancellation_supervisor()` for same-backend compliance.
372    pub fn task_storage(mut self, s: impl A2aTaskStorage + 'static) -> Self {
373        self.task_storage = Some(Arc::new(s));
374        self
375    }
376
377    /// Set push notification storage individually.
378    pub fn push_storage(mut self, s: impl A2aPushNotificationStorage + 'static) -> Self {
379        self.push_storage = Some(Arc::new(s));
380        self
381    }
382
383    /// Set event store individually. Must match task_storage backend.
384    pub fn event_store(mut self, s: impl A2aEventStore + 'static) -> Self {
385        self.event_store = Some(Arc::new(s));
386        self
387    }
388
389    /// Set atomic store individually. Must match task_storage backend.
390    pub fn atomic_store(mut self, s: impl A2aAtomicStore + 'static) -> Self {
391        self.atomic_store = Some(Arc::new(s));
392        self
393    }
394
395    /// Set the cancellation supervisor individually. Must match
396    /// task_storage backend — otherwise `:cancel` writes the marker to
397    /// one backend and the supervisor reads from another, silently
398    /// breaking cross-instance cancellation. Prefer `.storage()` for
399    /// unified wiring. Consumed by `core_cancel_task`.
400    pub fn cancellation_supervisor(mut self, s: impl A2aCancellationSupervisor + 'static) -> Self {
401        self.cancellation_supervisor = Some(Arc::new(s));
402        self
403    }
404
405    /// Set the push delivery store individually.
406    ///
407    /// Prefer `.storage()` for unified wiring. The delivery store MUST
408    /// be on the same backend as `.task_storage(...)` — the builder
409    /// rejects mismatches. Passing a delivery store also requires the
410    /// atomic store to have opted in via `with_push_dispatch_enabled(true)`
411    ///.
412    pub fn push_delivery_store(mut self, store: impl A2aPushDeliveryStore + 'static) -> Self {
413        self.push_delivery_store = Some(Arc::new(store));
414        self
415    }
416
417    /// Override the runtime configuration (timeouts, retry budgets,
418    /// push tuning). Defaults to [`RuntimeConfig::default()`]. The
419    /// builder validates push settings (claim expiry vs retry horizon,
420    /// ) when `.push_delivery_store(...)` is wired.
421    pub fn runtime_config(mut self, cfg: RuntimeConfig) -> Self {
422        self.runtime_config = cfg;
423        self
424    }
425
426    pub fn middleware(mut self, mw: Arc<dyn A2aMiddleware>) -> Self {
427        self.middleware.push(mw);
428        self
429    }
430
    /// Validate the accumulated configuration and construct a
    /// [`LambdaA2aHandler`].
    ///
    /// # Errors
    ///
    /// Returns [`A2aError::Internal`] when a required component is
    /// missing (executor, task/push/event/atomic storage, cancellation
    /// supervisor), when any storage trait reports a different
    /// `backend_name` than the others, when the push-dispatch opt-in
    /// flag and the delivery store disagree, or when
    /// `push_claim_expiry` does not exceed the worst-case push retry
    /// horizon. Returns [`A2aError::InvalidRequest`] for a malformed
    /// `strip_path_prefix` value.
    pub fn build(self) -> Result<LambdaA2aHandler, A2aError> {
        // Required components — each missing piece is a hard build
        // error rather than a silent fallback.
        let executor = self
            .executor
            .ok_or(A2aError::Internal("executor is required".into()))?;
        let task_storage = self.task_storage.ok_or(A2aError::Internal(
            "task_storage is required for Lambda".into(),
        ))?;
        let push_storage = self.push_storage.ok_or(A2aError::Internal(
            "push_storage is required for Lambda".into(),
        ))?;
        let event_store: Arc<dyn A2aEventStore> = self.event_store.ok_or(A2aError::Internal(
            "event_store is required for Lambda (use .storage() for unified backend)".into(),
        ))?;
        let atomic_store: Arc<dyn A2aAtomicStore> = self.atomic_store.ok_or(A2aError::Internal(
            "atomic_store is required for Lambda (use .storage() for unified backend)".into(),
        ))?;
        // The cancellation supervisor MUST be supplied. No
        // InMemoryA2aStorage fallback: that would silently break
        // cross-instance cancellation by reading markers from a
        // different backend than they were written to.
        let cancellation_supervisor: Arc<dyn A2aCancellationSupervisor> =
            self.cancellation_supervisor.ok_or(A2aError::Internal(
                "cancellation_supervisor is required for Lambda. Use .storage() for unified \
                 backend wiring, or .cancellation_supervisor(...) alongside the individual \
                 storage setters. Omitting it silently breaks cross-instance cancellation."
                    .into(),
            ))?;

        // Same-backend enforcement: all storage traits + the
        // cancellation supervisor must report the same `backend_name`.
        let task_backend = task_storage.backend_name();
        let push_backend = push_storage.backend_name();
        let event_backend = event_store.backend_name();
        let atomic_backend = atomic_store.backend_name();
        let supervisor_backend = cancellation_supervisor.backend_name();
        if task_backend != push_backend
            || task_backend != event_backend
            || task_backend != atomic_backend
            || task_backend != supervisor_backend
        {
            return Err(A2aError::Internal(format!(
                "Storage backend mismatch: task={task_backend}, push={push_backend}, \
                 event={event_backend}, atomic={atomic_backend}, \
                 cancellation_supervisor={supervisor_backend}. \
  requires all storage traits to share the same backend. \
  requires the cancellation supervisor on the same backend \
                 so cross-instance cancel markers are observable. \
                 Use .storage() for unified backend."
            )));
        }

        let push_delivery_store = self.push_delivery_store;

        // Push-dispatch consistency: the atomic store's opt-in flag and
        // the presence of `push_delivery_store` MUST agree. Both true =
        // push fully wired; both false = non-push deployment. Mixed
        // cases are build errors — this mirrors the main server builder
        // (server/mod.rs).
        match (
            push_delivery_store.is_some(),
            atomic_store.push_dispatch_enabled(),
        ) {
            (true, true) | (false, false) => {}
            (true, false) => {
                return Err(A2aError::Internal(
                    "push_delivery_store wired but atomic_store.push_dispatch_enabled() \
                     is false. Call .with_push_dispatch_enabled(true) on the backend \
                     storage before passing it to .storage()."
                        .into(),
                ));
            }
            (false, true) => {
                return Err(A2aError::Internal(
                    "atomic_store.push_dispatch_enabled() is true but no \
                     push_delivery_store is wired. Pending-dispatch markers would be \
                     written with no consumer, imposing load-bearing infra for no \
                     benefit. If you need to populate markers for an external \
                     consumer, open an issue for a distinctly-named opt-in — for now, \
                     this configuration is rejected."
                        .into(),
                ));
            }
        }

        // The Lambda execution environment freezes after the HTTP
        // response is flushed, so any `tokio::spawn`'d executor
        // continuation is opportunistic only. Refuse
        // `SendMessageConfiguration.return_immediately = true` at the
        // `core_send_message` entry point via this capability flag
        // instead of silently orphaning the executor.
        //
        // If a durable executor queue is wired via
        // [`Self::with_durable_executor`] (or the SQS helper), the
        // capability stays enabled because enqueueing is durable across
        // the Lambda freeze. The explicit enable lives in the builder
        // method so the capability cannot be claimed without the
        // mechanism.
        let mut runtime_config = self.runtime_config;
        if self.durable_executor_queue.is_none() {
            runtime_config.supports_return_immediately = false;
        }

        // Push dispatcher wiring. When `push_delivery_store` is
        // present: validate its backend matches, validate the
        // retry-horizon invariant, and construct the `PushDispatcher`.
        // Under Lambda the dispatcher runs opportunistically
        // post-return; correctness comes from the atomic marker +
        // stream/scheduler recovery workers.
        let push_dispatcher: Option<Arc<turul_a2a::push::PushDispatcher>> =
            if let Some(delivery) = push_delivery_store.as_ref() {
                let push_delivery_backend = delivery.backend_name();
                if task_backend != push_delivery_backend {
                    return Err(A2aError::Internal(format!(
                        "Storage backend mismatch: task={task_backend}, \
                         push_delivery={push_delivery_backend}. \
  requires all storage traits to share the same backend."
                    )));
                }

                // The claim expiry must exceed the worst-case retry
                // horizon so a live claim is not mis-classified as
                // stale mid-retry.
                let retry_horizon = runtime_config
                    .push_backoff_cap
                    .saturating_mul(runtime_config.push_max_attempts as u32);
                if runtime_config.push_claim_expiry <= retry_horizon {
                    return Err(A2aError::Internal(format!(
                        "push_claim_expiry ({:?}) must be greater than retry horizon \
                         (push_max_attempts={} * push_backoff_cap={:?} = {:?}). \
                         Raise push_claim_expiry or lower push_max_attempts/push_backoff_cap.",
                        runtime_config.push_claim_expiry,
                        runtime_config.push_max_attempts,
                        runtime_config.push_backoff_cap,
                        retry_horizon
                    )));
                }

                // Translate the runtime config into the delivery
                // worker's own config struct, field by field.
                let mut delivery_cfg = turul_a2a::push::delivery::PushDeliveryConfig::default();
                delivery_cfg.max_attempts = runtime_config.push_max_attempts as u32;
                delivery_cfg.backoff_base = runtime_config.push_backoff_base;
                delivery_cfg.backoff_cap = runtime_config.push_backoff_cap;
                delivery_cfg.backoff_jitter = runtime_config.push_backoff_jitter;
                delivery_cfg.request_timeout = runtime_config.push_request_timeout;
                delivery_cfg.connect_timeout = runtime_config.push_connect_timeout;
                delivery_cfg.read_timeout = runtime_config.push_read_timeout;
                delivery_cfg.claim_expiry = runtime_config.push_claim_expiry;
                delivery_cfg.max_payload_bytes = runtime_config.push_max_payload_bytes;
                delivery_cfg.allow_insecure_urls = runtime_config.allow_insecure_push_urls;

                // Per-build worker identity (UUIDv7 for rough time ordering).
                let instance_id = format!("a2a-lambda-{}", uuid::Uuid::now_v7());
                let worker = turul_a2a::push::delivery::PushDeliveryWorker::new(
                    delivery.clone(),
                    delivery_cfg,
                    None,
                    instance_id,
                )
                .map_err(|e| A2aError::Internal(format!("push worker build failed: {e}")))?;

                Some(Arc::new(turul_a2a::push::PushDispatcher::new(
                    Arc::new(worker),
                    push_storage.clone(),
                    task_storage.clone(),
                )))
            } else {
                None
            };

        // Assemble the shared router state from the validated parts.
        let state = AppState {
            executor,
            task_storage,
            push_storage,
            event_store,
            atomic_store,
            event_broker: TaskEventBroker::new(),
            middleware_stack: Arc::new(MiddlewareStack::new(self.middleware)),
            runtime_config,
            in_flight: Arc::new(turul_a2a::server::in_flight::InFlightRegistry::new()),
            cancellation_supervisor,
            push_delivery_store,
            push_dispatcher,
            durable_executor_queue: self.durable_executor_queue,
        };

        let router = build_router(state.clone());

        // Normalize + validate the path prefix: empty and "/" are
        // no-ops; anything else must start with '/' and not end with
        // '/'.
        let path_prefix = match self.path_prefix {
            None => None,
            Some(p) if p.is_empty() || p == "/" => None,
            Some(p) => {
                if !p.starts_with('/') {
                    return Err(A2aError::InvalidRequest {
                        message: format!(
                            "LambdaA2aServerBuilder::strip_path_prefix: prefix must start with '/'; got {p:?}"
                        ),
                    });
                }
                if p.ends_with('/') {
                    return Err(A2aError::InvalidRequest {
                        message: format!(
                            "LambdaA2aServerBuilder::strip_path_prefix: prefix must not end with '/' (except '/'); got {p:?}"
                        ),
                    });
                }
                Some(Arc::from(p))
            }
        };

        Ok(LambdaA2aHandler {
            router,
            state,
            path_prefix,
        })
    }
644}
645
646impl Default for LambdaA2aServerBuilder {
647    fn default() -> Self {
648        Self::new()
649    }
650}
651
/// Lambda handler wrapping the axum Router.
#[derive(Clone)]
pub struct LambdaA2aHandler {
    /// The fully-wired A2A router; each HTTP event is dispatched
    /// through a clone of it (via `tower::ServiceExt::oneshot`).
    router: axum::Router,
    /// Held for SQS-event dispatch (`handle_sqs`). Always
    /// populated — the SQS handler method is gated behind the `sqs`
    /// feature but the state is carried unconditionally so callers can
    /// reach it for diagnostics or future non-HTTP / non-SQS handlers.
    #[cfg_attr(not(feature = "sqs"), allow(dead_code))]
    state: AppState,
    /// Leading path segment to strip from each incoming HTTP request
    /// before the A2A router sees it. `None` = no stripping. Set via
    /// [`LambdaA2aServerBuilder::strip_path_prefix`].
    path_prefix: Option<Arc<str>>,
}
667
668impl LambdaA2aHandler {
669    pub fn builder() -> LambdaA2aServerBuilder {
670        LambdaA2aServerBuilder::new()
671    }
672
673    /// Handle a Lambda HTTP request.
674    pub async fn handle(
675        &self,
676        event: lambda_http::Request,
677    ) -> Result<lambda_http::Response<lambda_http::Body>, lambda_http::Error> {
678        let axum_req = lambda_to_axum_request(event)?;
679        let axum_req = match &self.path_prefix {
680            Some(prefix) => adapter::strip_request_path_prefix(axum_req, prefix),
681            None => axum_req,
682        };
683
684        let axum_resp = tower::ServiceExt::oneshot(self.router.clone(), axum_req)
685            .await
686            .map_err(|e| lambda_http::Error::from(format!("Router error: {e}")))?;
687
688        axum_to_lambda_response(axum_resp).await
689    }
690
691    /// Handle a raw Lambda HTTP event delivered as `serde_json::Value`
692    /// and return a response envelope **matching the inbound event
693    /// shape**:
694    ///
695    /// | Inbound shape detected by `lambda_http::LambdaRequest`   | Response envelope                  |
696    /// |----------------------------------------------------------|------------------------------------|
697    /// | API Gateway v1 (REST) — top-level `httpMethod` + `path`  | `ApiGatewayProxyResponse`          |
698    /// | API Gateway v2 / Function URL — `requestContext.http`    | `ApiGatewayV2httpResponse`         |
699    /// | ALB target group — `requestContext.elb`                  | `AlbTargetGroupResponse`           |
700    /// | WebSocket / unrecognised                                  | `Err(...)`                         |
701    ///
702    /// Mismatching envelopes — most commonly a v2 response sent in
703    /// response to a v1 invocation — cause API Gateway to return
704    /// `502 Bad Gateway` regardless of the body, because the response
705    /// fails the gateway's contract validation. This method ensures
706    /// symmetry: response shape is always picked from the inbound
707    /// shape, never hardcoded.
708    ///
709    /// This is the framework's canonical HTTP envelope conversion
710    /// primitive — useful when a single Lambda function routes among
711    /// HTTP and one or more non-framework triggers (EventBridge,
712    /// DynamoDB streams, custom invoke) and the adopter drives
713    /// [`lambda_runtime::run`] with `serde_json::Value` directly.
714    /// Handles the `LambdaRequest` deserialization, the text-vs-base64
715    /// content-type detection, and the response envelope rebuild.
716    ///
717    /// Body encoding: text for `text/*`, `application/json`,
718    /// `application/xml`, `application/javascript`, or anything with
719    /// `charset=`; base64 otherwise.
720    ///
721    /// Path-prefix stripping (see
722    /// [`LambdaA2aServerBuilder::strip_path_prefix`]) is applied
723    /// because dispatch flows through [`Self::handle`]. Adopters who
724    /// only serve HTTP should continue to use [`Self::run_http_only`]
725    /// or [`LambdaA2aServerBuilder::run`]; this entry point exists for
726    /// composition with adopter-owned third triggers.
727    pub async fn handle_http_event_value(
728        &self,
729        value: serde_json::Value,
730    ) -> Result<serde_json::Value, lambda_runtime::Error> {
731        use base64::Engine;
732        use http_body_util::BodyExt;
733
734        let lambda_req: lambda_http::request::LambdaRequest = serde_json::from_value(value)
735            .map_err(|e| lambda_runtime::Error::from(format!("invalid HTTP event: {e}")))?;
736
737        // Capture the inbound shape so we can emit the matching
738        // response envelope. Lost once we convert to `lambda_http::Request`.
739        let shape = match &lambda_req {
740            lambda_http::request::LambdaRequest::ApiGatewayV1(_) => HttpEventShape::ApiGatewayV1,
741            lambda_http::request::LambdaRequest::ApiGatewayV2(_) => HttpEventShape::ApiGatewayV2,
742            lambda_http::request::LambdaRequest::Alb(_) => HttpEventShape::Alb,
743            lambda_http::request::LambdaRequest::WebSocket(_) => {
744                return Err(lambda_runtime::Error::from(
745                    "WebSocket Lambda events are not supported by this adapter",
746                ));
747            }
748            _ => {
749                return Err(lambda_runtime::Error::from(
750                    "unsupported Lambda HTTP event variant",
751                ));
752            }
753        };
754
755        let req: lambda_http::Request = lambda_req.into();
756
757        let resp = self
758            .handle(req)
759            .await
760            .map_err(|e| lambda_runtime::Error::from(format!("handler error: {e}")))?;
761        let (parts, body) = resp.into_parts();
762
763        let bytes = body
764            .collect()
765            .await
766            .map_err(|e| lambda_runtime::Error::from(format!("body collect: {e}")))?
767            .to_bytes();
768
769        let ct = parts
770            .headers
771            .get(http::header::CONTENT_TYPE)
772            .and_then(|v| v.to_str().ok())
773            .unwrap_or("application/octet-stream")
774            .to_ascii_lowercase();
775        let is_text = ct.starts_with("text/")
776            || ct.starts_with("application/json")
777            || ct.starts_with("application/xml")
778            || ct.starts_with("application/javascript")
779            || ct.contains("charset=");
780
781        let (body_str, is_base64) = if bytes.is_empty() {
782            (None, false)
783        } else if is_text {
784            match std::str::from_utf8(&bytes) {
785                Ok(s) => (Some(s.to_string()), false),
786                Err(_) => (
787                    Some(base64::engine::general_purpose::STANDARD.encode(&bytes)),
788                    true,
789                ),
790            }
791        } else {
792            (
793                Some(base64::engine::general_purpose::STANDARD.encode(&bytes)),
794                true,
795            )
796        };
797
798        let status = parts.status.as_u16() as i64;
799        let headers = parts.headers;
800        let body_lambda = body_str.map(aws_lambda_events::encodings::Body::Text);
801
802        match shape {
803            HttpEventShape::ApiGatewayV1 => {
804                let mut api_resp = aws_lambda_events::apigw::ApiGatewayProxyResponse::default();
805                api_resp.status_code = status;
806                api_resp.headers = headers;
807                api_resp.body = body_lambda;
808                api_resp.is_base64_encoded = is_base64;
809                serde_json::to_value(api_resp).map_err(|e| {
810                    lambda_runtime::Error::from(format!("serialise APIGW v1 response: {e}"))
811                })
812            }
813            HttpEventShape::ApiGatewayV2 => {
814                let mut api_resp = aws_lambda_events::apigw::ApiGatewayV2httpResponse::default();
815                api_resp.status_code = status;
816                api_resp.headers = headers;
817                api_resp.body = body_lambda;
818                api_resp.is_base64_encoded = is_base64;
819                serde_json::to_value(api_resp).map_err(|e| {
820                    lambda_runtime::Error::from(format!("serialise APIGW v2 response: {e}"))
821                })
822            }
823            HttpEventShape::Alb => {
824                let mut api_resp = aws_lambda_events::alb::AlbTargetGroupResponse::default();
825                api_resp.status_code = status;
826                api_resp.headers = headers;
827                api_resp.body = body_lambda;
828                api_resp.is_base64_encoded = is_base64;
829                serde_json::to_value(api_resp).map_err(|e| {
830                    lambda_runtime::Error::from(format!("serialise ALB response: {e}"))
831                })
832            }
833        }
834    }
835}
836
/// Inbound HTTP event shape detected by [`LambdaA2aHandler::handle_http_event_value`].
///
/// The response envelope must match the inbound shape — see method
/// docs for the contract.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HttpEventShape {
    /// API Gateway v1 (REST): top-level `httpMethod` + `path`.
    ApiGatewayV1,
    /// API Gateway v2 / Function URL: `requestContext.http`.
    ApiGatewayV2,
    /// ALB target group: `requestContext.elb`.
    Alb,
}
847
848impl LambdaA2aHandler {
849    /// Handle an SQS batch from the durable executor queue.
850    ///
851    /// Returns `SqsBatchResponse` whose `batch_item_failures` list
852    /// tells the event source mapping which records to retry.
853    /// Requires `FunctionResponseTypes: ["ReportBatchItemFailures"]`
854    /// on the mapping.
855    ///
856    /// Each record: deserialize envelope → load task → terminal-no-op
857    /// check → cancel-marker direct-CANCELED commit → run executor.
858    /// See `durable::drive_sqs_batch` for the full semantics.
859    #[cfg(feature = "sqs")]
860    pub async fn handle_sqs(
861        &self,
862        event: aws_lambda_events::event::sqs::SqsEvent,
863    ) -> aws_lambda_events::event::sqs::SqsBatchResponse {
864        durable::drive_sqs_batch(&self.state, event).await
865    }
866
867    /// Run this handler as a pure HTTP Lambda. Strict: any non-HTTP
868    /// event shape (SQS, DynamoDB stream, scheduler, …) causes
869    /// `lambda_http::run` to return a deserialization error on
870    /// invocation.
871    ///
872    /// Appropriate for Lambdas whose only trigger is a Function URL,
873    /// API Gateway, or ALB. The request Lambda in the two-Lambda
874    /// durable-executor topology is the canonical caller.
875    pub async fn run_http_only(self) -> Result<(), lambda_runtime::Error> {
876        let handler = Arc::new(self);
877        lambda_http::run(lambda_http::service_fn(move |event| {
878            let h = Arc::clone(&handler);
879            async move { h.handle(event).await }
880        }))
881        .await
882    }
883}
884
885// Default entrypoint when the `sqs` feature is off — `.run()` aliases
886// the HTTP-only runner. With the `sqs` feature enabled, `.run()` lives
887// in `durable.rs` and routes HTTP+SQS via the classifier.
888#[cfg(not(feature = "sqs"))]
889impl LambdaA2aHandler {
890    /// Default Lambda runner. Dispatches based on the event shape the
891    /// binary receives. Without the `sqs` feature this is HTTP-only
892    /// (equivalent to [`LambdaA2aHandler::run_http_only`]).
893    pub async fn run(self) -> Result<(), lambda_runtime::Error> {
894        self.run_http_only().await
895    }
896}
897
898impl LambdaA2aServerBuilder {
899    /// Build and run in one call. Sugar for
900    /// `self.build()?.run().await`. Hides the handler from the adopter —
901    /// use `.build()` + `handler.run_*()` instead when the handler is
902    /// needed for unit tests, custom middleware, or an explicit
903    /// topology runner.
904    pub async fn run(self) -> Result<(), lambda_runtime::Error> {
905        let handler = self.build().map_err(|e| {
906            lambda_runtime::Error::from(format!("LambdaA2aServerBuilder::run: {e}"))
907        })?;
908        handler.run().await
909    }
910}
911
#[cfg(test)]
mod handle_http_event_value_shape_tests {
    //! Regression tests for the event-shape symmetry contract on
    //! [`LambdaA2aHandler::handle_http_event_value`]. The wire-shape
    //! mismatch (v2 response to a v1 invocation) is what causes API
    //! Gateway REST integrations to return `502 Bad Gateway`. These
    //! tests pin the contract: response envelope must always match
    //! the inbound request envelope.

    use super::*;
    use std::sync::Arc;
    use turul_a2a::executor::{AgentExecutor, ExecutionContext};
    use turul_a2a::server::RuntimeConfig;
    use turul_a2a::server::in_flight::InFlightRegistry;
    use turul_a2a::storage::InMemoryA2aStorage;
    use turul_a2a::streaming::TaskEventBroker;
    use turul_a2a_types::{Message, Task};

    // Minimal executor: flips the task straight to Completed so the
    // HTTP round-trip under test never blocks on real work.
    struct NoOpExecutor;

    #[async_trait::async_trait]
    impl AgentExecutor for NoOpExecutor {
        async fn execute(
            &self,
            task: &mut Task,
            _msg: &Message,
            _ctx: &ExecutionContext,
        ) -> Result<(), turul_a2a::error::A2aError> {
            let mut proto = task.as_proto().clone();
            proto.status = Some(turul_a2a_proto::TaskStatus {
                state: turul_a2a_proto::TaskState::Completed.into(),
                message: None,
                timestamp: None,
            });
            *task = Task::try_from(proto).unwrap();
            Ok(())
        }
        fn agent_card(&self) -> turul_a2a_proto::AgentCard {
            turul_a2a_proto::AgentCard {
                name: "test-agent".to_string(),
                ..Default::default()
            }
        }
    }

    // Handler with in-memory storage, no middleware, and no path
    // stripping. One `InMemoryA2aStorage` instance serves every
    // storage role in `AppState`.
    fn build_handler() -> LambdaA2aHandler {
        let s = Arc::new(InMemoryA2aStorage::new());
        let state = AppState {
            executor: Arc::new(NoOpExecutor),
            task_storage: s.clone(),
            push_storage: s.clone(),
            event_store: s.clone(),
            atomic_store: s.clone(),
            event_broker: TaskEventBroker::new(),
            middleware_stack: Arc::new(MiddlewareStack::new(vec![])),
            runtime_config: RuntimeConfig::default(),
            in_flight: Arc::new(InFlightRegistry::new()),
            cancellation_supervisor: s.clone(),
            push_delivery_store: None,
            push_dispatcher: None,
            durable_executor_queue: None,
        };
        let router = build_router(state.clone());
        LambdaA2aHandler {
            router,
            state,
            path_prefix: None,
        }
    }

    /// API Gateway v2 / Function URL event. `requestContext.http.method`
    /// is what `LambdaRequest::ApiGatewayV2` keys off.
    fn apigw_v2_agent_card_get() -> serde_json::Value {
        serde_json::json!({
            "version": "2.0",
            "routeKey": "GET /.well-known/agent-card.json",
            "rawPath": "/.well-known/agent-card.json",
            "rawQueryString": "",
            "headers": {"accept": "application/json", "a2a-version": "1.0"},
            "requestContext": {
                "accountId": "000000000000",
                "apiId": "api",
                "domainName": "fake.execute-api",
                "domainPrefix": "fake",
                "http": {
                    "method": "GET",
                    "path": "/.well-known/agent-card.json",
                    "protocol": "HTTP/1.1",
                    "sourceIp": "127.0.0.1",
                    "userAgent": "test"
                },
                "requestId": "rid",
                "routeKey": "GET /.well-known/agent-card.json",
                "stage": "$default",
                "time": "01/Jan/2026:00:00:00 +0000",
                "timeEpoch": 1_735_689_600_000i64
            },
            "isBase64Encoded": false
        })
    }

    /// API Gateway v1 (REST) event. Top-level `httpMethod` + `path` and
    /// `requestContext.identity` (no `requestContext.http`) is what
    /// `LambdaRequest::ApiGatewayV1` keys off. This is the shape REST
    /// API + AWS_PROXY integrations send into the Lambda — the live
    /// fleet topology that 502'd against a hardcoded v2 response.
    ///
    /// `stage` is `$default` here so `lambda_http` does not prepend a
    /// stage prefix — this fixture exists to pin the response-shape
    /// contract, not the path-rewriting behaviour. See the
    /// `strip_path_prefix_*` tests for stage-prefix coverage.
    fn apigw_v1_agent_card_get_with_stage(stage: &str, path: &str) -> serde_json::Value {
        // Real REST invocations carry the stage inside
        // `requestContext.path` but not in the top-level `path`.
        let request_context_path = if stage == "$default" {
            path.to_string()
        } else {
            format!("/{stage}{path}")
        };
        serde_json::json!({
            "resource": "/{proxy+}",
            "path": path,
            "httpMethod": "GET",
            "headers": {"accept": "application/json", "a2a-version": "1.0"},
            "multiValueHeaders": {},
            "queryStringParameters": null,
            "multiValueQueryStringParameters": null,
            "pathParameters": {"proxy": path.trim_start_matches('/').to_string()},
            "stageVariables": null,
            "requestContext": {
                "accountId": "000000000000",
                "apiId": "abc123",
                "domainName": "fake.execute-api",
                "domainPrefix": "fake",
                "extendedRequestId": "xrid",
                "httpMethod": "GET",
                "identity": {
                    "sourceIp": "127.0.0.1",
                    "userAgent": "test"
                },
                "path": request_context_path,
                "protocol": "HTTP/1.1",
                "requestId": "rid",
                "requestTime": "01/Jan/2026:00:00:00 +0000",
                "requestTimeEpoch": 1_735_689_600_000i64,
                "resourceId": "rsrc",
                "resourcePath": "/{proxy+}",
                "stage": stage
            },
            "body": null,
            "isBase64Encoded": false
        })
    }

    fn apigw_v1_agent_card_get() -> serde_json::Value {
        apigw_v1_agent_card_get_with_stage("$default", "/.well-known/agent-card.json")
    }

    /// ALB target-group event. `requestContext.elb` is what
    /// `LambdaRequest::Alb` keys off.
    fn alb_agent_card_get() -> serde_json::Value {
        serde_json::json!({
            "requestContext": {
                "elb": {
                    "targetGroupArn": "arn:aws:elasticloadbalancing:ap-southeast-2:000:targetgroup/fake/00"
                }
            },
            "httpMethod": "GET",
            "path": "/.well-known/agent-card.json",
            "queryStringParameters": {},
            "headers": {"accept": "application/json", "a2a-version": "1.0"},
            "body": "",
            "isBase64Encoded": false
        })
    }

    /// REST v1 inbound MUST produce a v1 response shape. This is the
    /// regression for the live-fleet 502.
    #[tokio::test]
    async fn v1_inbound_event_emits_v1_response_envelope() {
        let handler = build_handler();
        let resp_value = handler
            .handle_http_event_value(apigw_v1_agent_card_get())
            .await
            .expect("handle_http_event_value(v1) must succeed");

        // Parses cleanly as v1.
        let v1: aws_lambda_events::apigw::ApiGatewayProxyResponse =
            serde_json::from_value(resp_value.clone())
                .expect("response must deserialize as ApiGatewayProxyResponse (v1)");
        assert_eq!(v1.status_code, 200);
        match v1.body {
            Some(aws_lambda_events::encodings::Body::Text(s)) => {
                assert!(s.contains("\"name\""), "v1 body shape: {s}");
            }
            other => panic!("expected text body, got {other:?}"),
        }

        // Negative: v1 response JSON must NOT contain the v2-only
        // `cookies` field. Presence of `cookies` is API Gateway REST's
        // 502 trigger.
        assert!(
            resp_value.get("cookies").is_none(),
            "v1 response must not carry v2-only `cookies` field; payload: {resp_value}"
        );
    }

    /// API Gateway v2 / Function URL inbound MUST produce a v2 response
    /// shape (current behaviour, locked in to prevent the inverse
    /// regression).
    #[tokio::test]
    async fn v2_inbound_event_emits_v2_response_envelope() {
        let handler = build_handler();
        let resp_value = handler
            .handle_http_event_value(apigw_v2_agent_card_get())
            .await
            .expect("handle_http_event_value(v2) must succeed");

        let v2: aws_lambda_events::apigw::ApiGatewayV2httpResponse =
            serde_json::from_value(resp_value.clone())
                .expect("response must deserialize as ApiGatewayV2httpResponse (v2)");
        assert_eq!(v2.status_code, 200);
        match v2.body {
            Some(aws_lambda_events::encodings::Body::Text(s)) => {
                assert!(s.contains("\"name\""), "v2 body shape: {s}");
            }
            other => panic!("expected text body, got {other:?}"),
        }
    }

    /// ALB inbound MUST produce an ALB response shape.
    #[tokio::test]
    async fn alb_inbound_event_emits_alb_response_envelope() {
        let handler = build_handler();
        let resp_value = handler
            .handle_http_event_value(alb_agent_card_get())
            .await
            .expect("handle_http_event_value(alb) must succeed");

        let alb: aws_lambda_events::alb::AlbTargetGroupResponse =
            serde_json::from_value(resp_value)
                .expect("response must deserialize as AlbTargetGroupResponse");
        assert_eq!(alb.status_code, 200);
    }

    /// Live-fleet end-to-end regression: REST v1 stage-prefixed event
    /// + `strip_path_prefix("/dev")` on the handler routes through
    /// the agent-card endpoint AND emits a v1 response envelope. This
    /// is the exact scenario the live fleet was 502'ing on.
    #[tokio::test]
    async fn v1_stage_prefix_inbound_with_strip_emits_v1_response() {
        // Built by hand (not via `build_handler`) because this test
        // needs `path_prefix` populated.
        let s = Arc::new(InMemoryA2aStorage::new());
        let state = AppState {
            executor: Arc::new(NoOpExecutor),
            task_storage: s.clone(),
            push_storage: s.clone(),
            event_store: s.clone(),
            atomic_store: s.clone(),
            event_broker: TaskEventBroker::new(),
            middleware_stack: Arc::new(MiddlewareStack::new(vec![])),
            runtime_config: RuntimeConfig::default(),
            in_flight: Arc::new(InFlightRegistry::new()),
            cancellation_supervisor: s.clone(),
            push_delivery_store: None,
            push_dispatcher: None,
            durable_executor_queue: None,
        };
        let router = build_router(state.clone());
        let handler = LambdaA2aHandler {
            router,
            state,
            path_prefix: Some(Arc::from("/dev")),
        };

        let event = apigw_v1_agent_card_get_with_stage("dev", "/.well-known/agent-card.json");
        let resp_value = handler
            .handle_http_event_value(event)
            .await
            .expect("v1 stage-prefixed event must succeed under strip_path_prefix");

        let v1: aws_lambda_events::apigw::ApiGatewayProxyResponse =
            serde_json::from_value(resp_value)
                .expect("response must deserialize as ApiGatewayProxyResponse");
        assert_eq!(v1.status_code, 200);
        match v1.body {
            Some(aws_lambda_events::encodings::Body::Text(s)) => {
                assert!(s.contains("\"name\""), "v1 stage-prefix body: {s}");
            }
            other => panic!("expected text body, got {other:?}"),
        }
    }

    /// Cross-shape decoding regression: a v1 response JSON carries
    /// `multiValueHeaders` (REST shape) but no `cookies`; a v2 response
    /// carries `cookies` but no v1-only `multiValueHeaders` semantics.
    /// Confirms the two shapes are wire-distinguishable, which is why
    /// the gateway 502s on a mismatch.
    #[tokio::test]
    async fn v1_response_does_not_match_v2_envelope_strictly() {
        let handler = build_handler();
        let v1_value = handler
            .handle_http_event_value(apigw_v1_agent_card_get())
            .await
            .unwrap();

        // Sanity: v1 response JSON has no v2-only `cookies` field.
        assert!(v1_value.get("cookies").is_none());

        let v2_value = handler
            .handle_http_event_value(apigw_v2_agent_card_get())
            .await
            .unwrap();
        // Sanity: v2 response JSON carries the v2 `cookies` field
        // (default empty array — still present on the wire).
        assert!(
            v2_value.get("cookies").is_some(),
            "v2 response should carry the v2-only cookies field"
        );
    }
}