turul_a2a_aws_lambda/lib.rs
1//! AWS Lambda adapter for turul-a2a.
2//!
3//! # Choosing a runner
4//!
5//! A Lambda binary receives raw event JSON; the adapter owns
6//! classifying that JSON and dispatching to the right A2A handler
7//! method. Adopters pick the runner whose name matches the Lambda's
8//! AWS trigger topology and end `main.rs` with one `.await?` call —
9//! `lambda_runtime` / `lambda_http` / event parsing never appear in
10//! adopter code.
11//!
12//! | Lambda triggers | Runner |
13//! |-----------------------------------|------------------------------------------------|
14//! | HTTP only (Function URL / APIGW) | [`LambdaA2aHandler::run_http_only`] |
15//! | SQS only (event source mapping) | [`LambdaA2aHandler::run_sqs_only`] *(`sqs`)* |
16//! | HTTP **and** SQS on one function | [`LambdaA2aHandler::run_http_and_sqs`] *(`sqs`)* |
17//! | not sure / just run it | [`LambdaA2aHandler::run`] |
18//!
19//! [`LambdaA2aHandler::run`] is the default "it just works" entry
20//! point: without the `sqs` feature it aliases `run_http_only`; with
21//! `sqs` it aliases `run_http_and_sqs` (permissive, handles either
22//! shape). The explicit runners are **strict** — a non-matching
23//! event shape fails loudly, which is what hardened deployments
24//! and tests want.
25//!
//! The one-call shortcut is [`LambdaA2aServerBuilder::run`] — it
//! builds the handler and runs it in a single fluent call. Use
//! `.build()` plus a handler method when the handler itself is needed
//! for unit tests or custom middleware.
30//!
31//! ## Composing with non-framework triggers
32//!
33//! Lambdas wired to HTTP **plus** a third trigger the framework does
34//! not know about (EventBridge scheduler, DynamoDB stream, custom
35//! invoke) can't use a named runner directly — the Unknown branch
36//! is adopter-owned. Reach for the public classifier + the HTTP
37//! envelope primitive instead:
38//!
39//! ```ignore
40//! lambda_runtime::run(lambda_runtime::service_fn(
41//! move |event: lambda_runtime::LambdaEvent<serde_json::Value>| {
42//! let handler = handler.clone();
43//! async move {
44//! let (value, _ctx) = event.into_parts();
45//! match turul_a2a_aws_lambda::classify_event(&value) {
46//! turul_a2a_aws_lambda::LambdaEvent::Http => {
47//! handler.handle_http_event_value(value).await
48//! }
49//! turul_a2a_aws_lambda::LambdaEvent::Sqs => {
50//! let sqs = serde_json::from_value(value)?;
51//! Ok(serde_json::to_value(handler.handle_sqs(sqs).await)?)
52//! }
53//! turul_a2a_aws_lambda::LambdaEvent::Unknown => {
54//! // Adopter-owned: run your scheduled sweep,
55//! // DynamoDB stream consumer, etc.
56//! run_adopter_specific_path(value).await
57//! }
58//! }
59//! }
60//! }
61//! )).await
62//! ```
63//!
//! [`LambdaA2aHandler::handle_http_event_value`] is available without
//! the `sqs` feature; `handle_sqs` is gated on `sqs`. [`classify_event`]
//! and [`LambdaEvent`] are always available.
67//!
68//! ## API Gateway path prefix
69//!
70//! When Lambda sits behind a REST API Gateway with `AWS_PROXY`
71//! integration, the event carries the full stage + resource tree in
72//! the path (e.g. `/stage/agent/message:send`),
73//! which the root-rooted A2A router would 404. Configure the prefix
74//! to strip via [`LambdaA2aServerBuilder::strip_path_prefix`]:
75//!
76//! ```ignore
77//! LambdaA2aServerBuilder::new()
78//! .executor(my_executor)
79//! .storage(my_storage)
80//! .strip_path_prefix("/stage/agent") // API GW prefix
81//! .run()
82//! .await
83//! ```
84//!
85//! The strip applies to `run_http_only` and `run_http_and_sqs`.
86//! SQS events are unaffected. Non-matching paths pass through
87//! unchanged (router still 404s on genuinely unknown paths —
88//! that's the correct failure mode).
//!
//! Runner selection for the common topologies (independent of the
//! path-prefix setting):
//!
90//! ```ignore
91//! // HTTP-only Lambda (request Lambda in two-Lambda topology):
92//! LambdaA2aServerBuilder::new()
93//! .executor(my_executor)
94//! .storage(my_storage)
95//! .with_sqs_return_immediately(queue_url, sqs) // enqueues durable jobs
96//! .build()?
97//! .run_http_only()
98//! .await
99//!
100//! // Single-function HTTP+SQS demo:
101//! LambdaA2aServerBuilder::new()
102//! .executor(my_executor)
103//! .storage(my_storage)
104//! .with_sqs_return_immediately(queue_url, sqs)
105//! .run() // builder shortcut; dispatches HTTP + SQS
106//! .await
107//!
108//! // Pure SQS worker — no with_sqs_return_immediately(...) needed
109//! // (it consumes the queue, never enqueues):
110//! LambdaA2aServerBuilder::new()
111//! .executor(my_executor)
112//! .storage(my_storage)
113//! .build()?
114//! .run_sqs_only()
115//! .await
116//! ```
117//!
118//! # Adapter internals
119//!
120//! Thin wrapper: converts Lambda events to axum requests, delegates to
121//! the same Router, converts responses back. Adapter contract:
122//! - Authorizer context mapped via synthetic headers with anti-spoofing.
123//! - SSE responses are buffered: task executes, events are collected,
124//! returned as one response.
125//! - `POST /message:stream` executes the task within the Lambda
126//! invocation and returns all events.
127//! - `GET /tasks/{id}:subscribe` is for tasks that are **not** in a
128//! terminal state. Within one invocation it emits the initial `Task`
129//! snapshot, replays stored events via `Last-Event-ID`, and closes
130//! when the task reaches a terminal state. Subscribing to an
131//! already-terminal task returns `UnsupportedOperationError` per
132//! A2A v1.0 §3.1.6. For retrieving a terminal task's final state
133//! use `GetTask`.
134//!
135//! # Push-notification delivery — external triggers are mandatory
136//!
137//! Push delivery on Lambda is architecturally different from the
138//! binary server. The request Lambda installed via
139//! [`LambdaA2aServerBuilder`] still constructs a `PushDispatcher`
140//! when `push_delivery_store` is wired, but any `tokio::spawn`
141//! continuation it emits post-return is **opportunistic only** — the
142//! Lambda execution environment may be frozen indefinitely between
143//! invocations, so nothing can depend on that continuation completing.
144//!
145//! Correctness for push delivery on Lambda is carried by:
146//!
147//! 1. The atomic pending-dispatch marker written inside the request
148//! Lambda's commit transaction (opt in via
149//! `StorageImpl::with_push_dispatch_enabled(true)`).
150//! 2. [`LambdaStreamRecoveryHandler`] — DynamoDB Streams trigger on
151//! `a2a_push_pending_dispatches`. DynamoDB backends only.
152//! 3. [`LambdaScheduledRecoveryHandler`] — EventBridge Scheduler
153//! backstop. **Required for all backends**; it is the sole
154//! recovery path for SQLite / PostgreSQL / in-memory deployments.
155//!
156//! Without at least the scheduled worker, push delivery on Lambda is
157//! not durable — a marker written on a cold invocation may never be
158//! consumed. The example wiring lives in
159//! `examples/lambda-stream-worker` and
160//! `examples/lambda-scheduled-worker`.
161//!
162//! Lambda streaming is request-scoped (not persistent SSE connections). The durable
163//! event store ensures events survive across invocations. Clients reconnect with
164//! `Last-Event-ID` for continuation.
165//!
166//! # Cross-instance cancellation
167//!
168//! Lambda invocations are stateless and short-lived, so the Lambda
169//! adapter does **not** run the persistent cross-instance cancel
170//! poller that `A2aServer::run()` spawns. Cancellation behaviour on
171//! the Lambda adapter:
172//!
173//! - **Marker writes** — `CancelTask` on a Lambda invocation writes
174//! the cancel marker to the shared backend (DynamoDB / PostgreSQL).
175//! This works.
176//! - **Propagation to a live executor on the SAME Lambda invocation** —
177//! works via the same-instance token-trip path in `core_cancel_task`.
178//! - **Propagation to a live executor on a DIFFERENT Lambda invocation
179//! (warm container)** — **not currently supported**. There is no
180//! persistent poller to observe markers written by other
181//! invocations. A subsequent invocation whose handler reads the
182//! marker directly may act on it, but that is not a substitute for
183//! the server runtime's live propagation.
184//!
185//! The builder still **requires** an `A2aCancellationSupervisor`
186//! implementation on the same backend so that marker writes reach
187//! the correct backend and a future polling-adapter variant can
188//! consume them. If a deployment passes a non-matching supervisor,
189//! `build()` rejects the configuration.
190
191mod adapter;
192mod auth;
193mod event;
194mod no_streaming;
195mod scheduled_recovery;
196mod stream_recovery;
197
198#[cfg(feature = "sqs")]
199mod durable;
200
201#[cfg(test)]
202mod builder_tests;
203#[cfg(test)]
204mod scheduled_recovery_tests;
205#[cfg(test)]
206mod stream_recovery_tests;
207
208pub use adapter::{axum_to_lambda_response, lambda_to_axum_request};
209pub use auth::{AuthorizerMapping, LambdaAuthorizerMiddleware};
210pub use event::{LambdaEvent, classify_event};
211pub use no_streaming::NoStreamingLayer;
212pub use scheduled_recovery::{
213 LambdaScheduledRecoveryConfig, LambdaScheduledRecoveryHandler, LambdaScheduledRecoveryResponse,
214};
215pub use stream_recovery::LambdaStreamRecoveryHandler;
216
217#[cfg(feature = "sqs")]
218pub use durable::{SqsDurableExecutorQueue, drive_sqs_batch};
219
220use std::sync::Arc;
221
222use turul_a2a::error::A2aError;
223use turul_a2a::executor::AgentExecutor;
224use turul_a2a::middleware::{A2aMiddleware, MiddlewareStack};
225use turul_a2a::push::A2aPushDeliveryStore;
226use turul_a2a::router::{AppState, build_router};
227use turul_a2a::server::RuntimeConfig;
228use turul_a2a::storage::{
229 A2aAtomicStore, A2aCancellationSupervisor, A2aEventStore, A2aPushNotificationStorage,
230 A2aTaskStorage,
231};
232use turul_a2a::streaming::TaskEventBroker;
233
/// Builder for [`LambdaA2aHandler`].
///
/// Use `.storage(my_storage)` to supply a single backend implementing all
/// storage traits. `build()` rejects configurations in which the task, push,
/// event and atomic stores or the cancellation supervisor report different
/// backend names, so streaming and cancellation markers stay on one backend.
///
/// Push-notification delivery is opt-in. `.storage()` does **not** register
/// a delivery store; call `.push_delivery_store(...)` explicitly with a
/// backend implementing [`A2aPushDeliveryStore`]. The atomic store
/// must also opt in via `with_push_dispatch_enabled(true)`; the
/// builder rejects configurations where the flag and the delivery store disagree.
pub struct LambdaA2aServerBuilder {
    /// Agent executor; required by `build()`.
    executor: Option<Arc<dyn AgentExecutor>>,
    /// Task storage; required by `build()`.
    task_storage: Option<Arc<dyn A2aTaskStorage>>,
    /// Push-notification config storage; required by `build()`.
    push_storage: Option<Arc<dyn A2aPushNotificationStorage>>,
    /// Event store; required by `build()`.
    event_store: Option<Arc<dyn A2aEventStore>>,
    /// Atomic store; required by `build()`.
    atomic_store: Option<Arc<dyn A2aAtomicStore>>,
    /// Cancellation supervisor; required by `build()` (no fallback).
    cancellation_supervisor: Option<Arc<dyn A2aCancellationSupervisor>>,
    /// Optional push delivery store; its presence must agree with the
    /// atomic store's `push_dispatch_enabled()` flag.
    push_delivery_store: Option<Arc<dyn A2aPushDeliveryStore>>,
    /// Middleware in registration order; wrapped in a `MiddlewareStack` by `build()`.
    middleware: Vec<Arc<dyn A2aMiddleware>>,
    /// Runtime tuning; `build()` forces `supports_return_immediately` off
    /// when no durable executor queue is wired.
    runtime_config: RuntimeConfig,
    /// Optional durable executor queue (see `with_durable_executor`).
    durable_executor_queue: Option<Arc<dyn turul_a2a::durable_executor::DurableExecutorQueue>>,
    /// Raw, not yet validated path prefix; validated and normalised by `build()`.
    path_prefix: Option<String>,
}
257
258impl LambdaA2aServerBuilder {
259 pub fn new() -> Self {
260 Self {
261 executor: None,
262 task_storage: None,
263 push_storage: None,
264 event_store: None,
265 atomic_store: None,
266 cancellation_supervisor: None,
267 push_delivery_store: None,
268 middleware: vec![],
269 runtime_config: RuntimeConfig::default(),
270 durable_executor_queue: None,
271 path_prefix: None,
272 }
273 }
274
275 /// Configure an HTTP path prefix to strip before the A2A router
276 /// sees the request. Needed when Lambda sits behind an API Gateway
277 /// whose stage + resource tree is forwarded into the function
278 /// (e.g. REST API `AWS_PROXY` integrations surface
279 /// `/stage/agent/message:send` where the A2A
280 /// router expects `/message:send`).
281 ///
282 /// The prefix must start with `/` and must not end with `/`
283 /// (unless it is exactly `/`, which is a no-op). Segment boundaries
284 /// are respected — a prefix of `/dev` will not strip `/devs/...`.
285 /// Paths that do not start with the prefix pass through unchanged
286 /// (the router will 404 on a genuinely unknown path, which is the
287 /// correct failure mode). Applies to both
288 /// [`LambdaA2aHandler::run_http_only`] and
289 /// [`LambdaA2aHandler::run_http_and_sqs`]. SQS events are
290 /// unaffected.
291 pub fn strip_path_prefix(mut self, prefix: impl Into<String>) -> Self {
292 self.path_prefix = Some(prefix.into());
293 self
294 }
295
296 /// Wire a durable executor queue. When set, the
297 /// `return_immediately = true` path in `core_send_message` enqueues
298 /// a [`turul_a2a::durable_executor::QueuedExecutorJob`] on this
299 /// queue instead of spawning the executor locally, and the
300 /// capability flag `RuntimeConfig::supports_return_immediately` is
301 /// turned **back on** as an implementation detail — the capability
302 /// cannot be claimed without supplying the queue.
303 ///
304 /// Adopters should typically reach for the SQS-specific helper
305 /// [`Self::with_sqs_return_immediately`] instead; this generic
306 /// method exists so adopters can inject in-memory fakes for tests
307 /// or provide non-SQS transports (Kinesis, Step Functions task
308 /// token, self-invoke, etc.).
309 pub fn with_durable_executor(
310 mut self,
311 queue: Arc<dyn turul_a2a::durable_executor::DurableExecutorQueue>,
312 ) -> Self {
313 self.durable_executor_queue = Some(queue);
314 self.runtime_config.supports_return_immediately = true;
315 self
316 }
317
/// Wire the AWS SQS durable executor queue.
///
/// Convenience wrapper over [`Self::with_durable_executor`]: builds a
/// [`SqsDurableExecutorQueue`] from `queue_url` and the shared SQS
/// client, then registers it. Requires the `sqs` feature.
#[cfg(feature = "sqs")]
pub fn with_sqs_return_immediately(
    self,
    queue_url: impl Into<String>,
    sqs_client: Arc<aws_sdk_sqs::Client>,
) -> Self {
    let sqs_queue: Arc<dyn turul_a2a::durable_executor::DurableExecutorQueue> =
        Arc::new(SqsDurableExecutorQueue::new(queue_url, sqs_client));
    self.with_durable_executor(sqs_queue)
}
331
332 pub fn executor(mut self, exec: impl AgentExecutor + 'static) -> Self {
333 self.executor = Some(Arc::new(exec));
334 self
335 }
336
337 /// Set all storage from a single backend instance.
338 ///
339 /// This is the **preferred** method — a single struct implementing all
340 /// storage traits guarantees the same-backend requirement
341 /// AND ensures the cancellation supervisor reads the same backend
342 /// the cancel marker is written to. A mismatch here (e.g., DynamoDB
343 /// task storage + in-memory cancellation supervisor) silently
344 /// breaks cross-instance cancellation.
345 ///
346 /// errata: `.storage()` wires storage traits only. It does
347 /// **not** auto-register the storage as a push-delivery store, even if
348 /// the backend happens to implement [`A2aPushDeliveryStore`]. To opt in
349 /// to push delivery, call [`Self::push_delivery_store`] explicitly and
350 /// call `with_push_dispatch_enabled(true)` on the storage instance
351 /// before passing it here. Non-push deployments need neither.
352 pub fn storage<S>(mut self, storage: S) -> Self
353 where
354 S: A2aTaskStorage
355 + A2aPushNotificationStorage
356 + A2aEventStore
357 + A2aAtomicStore
358 + A2aCancellationSupervisor
359 + Clone
360 + 'static,
361 {
362 self.task_storage = Some(Arc::new(storage.clone()));
363 self.push_storage = Some(Arc::new(storage.clone()));
364 self.event_store = Some(Arc::new(storage.clone()));
365 self.atomic_store = Some(Arc::new(storage.clone()));
366 self.cancellation_supervisor = Some(Arc::new(storage));
367 self
368 }
369
370 /// Set task storage individually. Must be paired with event_store/atomic_store
371 /// AND `cancellation_supervisor()` for same-backend compliance.
372 pub fn task_storage(mut self, s: impl A2aTaskStorage + 'static) -> Self {
373 self.task_storage = Some(Arc::new(s));
374 self
375 }
376
377 /// Set push notification storage individually.
378 pub fn push_storage(mut self, s: impl A2aPushNotificationStorage + 'static) -> Self {
379 self.push_storage = Some(Arc::new(s));
380 self
381 }
382
383 /// Set event store individually. Must match task_storage backend.
384 pub fn event_store(mut self, s: impl A2aEventStore + 'static) -> Self {
385 self.event_store = Some(Arc::new(s));
386 self
387 }
388
389 /// Set atomic store individually. Must match task_storage backend.
390 pub fn atomic_store(mut self, s: impl A2aAtomicStore + 'static) -> Self {
391 self.atomic_store = Some(Arc::new(s));
392 self
393 }
394
395 /// Set the cancellation supervisor individually. Must match
396 /// task_storage backend — otherwise `:cancel` writes the marker to
397 /// one backend and the supervisor reads from another, silently
398 /// breaking cross-instance cancellation. Prefer `.storage()` for
399 /// unified wiring. Consumed by `core_cancel_task`.
400 pub fn cancellation_supervisor(mut self, s: impl A2aCancellationSupervisor + 'static) -> Self {
401 self.cancellation_supervisor = Some(Arc::new(s));
402 self
403 }
404
405 /// Set the push delivery store individually.
406 ///
407 /// Prefer `.storage()` for unified wiring. The delivery store MUST
408 /// be on the same backend as `.task_storage(...)` — the builder
409 /// rejects mismatches. Passing a delivery store also requires the
410 /// atomic store to have opted in via `with_push_dispatch_enabled(true)`
411 ///.
412 pub fn push_delivery_store(mut self, store: impl A2aPushDeliveryStore + 'static) -> Self {
413 self.push_delivery_store = Some(Arc::new(store));
414 self
415 }
416
417 /// Override the runtime configuration (timeouts, retry budgets,
418 /// push tuning). Defaults to [`RuntimeConfig::default()`]. The
419 /// builder validates push settings (claim expiry vs retry horizon,
420 /// ) when `.push_delivery_store(...)` is wired.
421 pub fn runtime_config(mut self, cfg: RuntimeConfig) -> Self {
422 self.runtime_config = cfg;
423 self
424 }
425
426 pub fn middleware(mut self, mw: Arc<dyn A2aMiddleware>) -> Self {
427 self.middleware.push(mw);
428 self
429 }
430
431 pub fn build(self) -> Result<LambdaA2aHandler, A2aError> {
432 let executor = self
433 .executor
434 .ok_or(A2aError::Internal("executor is required".into()))?;
435 let task_storage = self.task_storage.ok_or(A2aError::Internal(
436 "task_storage is required for Lambda".into(),
437 ))?;
438 let push_storage = self.push_storage.ok_or(A2aError::Internal(
439 "push_storage is required for Lambda".into(),
440 ))?;
441 let event_store: Arc<dyn A2aEventStore> = self.event_store.ok_or(A2aError::Internal(
442 "event_store is required for Lambda (use .storage() for unified backend)".into(),
443 ))?;
444 let atomic_store: Arc<dyn A2aAtomicStore> = self.atomic_store.ok_or(A2aError::Internal(
445 "atomic_store is required for Lambda (use .storage() for unified backend)".into(),
446 ))?;
447 // the cancellation supervisor MUST be supplied. No
448 // InMemoryA2aStorage fallback: that would silently break
449 // cross-instance cancellation by reading markers from a
450 // different backend than they were written to.
451 let cancellation_supervisor: Arc<dyn A2aCancellationSupervisor> =
452 self.cancellation_supervisor.ok_or(A2aError::Internal(
453 "cancellation_supervisor is required for Lambda. Use .storage() for unified \
454 backend wiring, or .cancellation_supervisor(...) alongside the individual \
455 storage setters. Omitting it silently breaks cross-instance cancellation."
456 .into(),
457 ))?;
458
459 // Same-backend enforcement: all storage traits + the
460 // cancellation supervisor must report the same `backend_name`.
461 let task_backend = task_storage.backend_name();
462 let push_backend = push_storage.backend_name();
463 let event_backend = event_store.backend_name();
464 let atomic_backend = atomic_store.backend_name();
465 let supervisor_backend = cancellation_supervisor.backend_name();
466 if task_backend != push_backend
467 || task_backend != event_backend
468 || task_backend != atomic_backend
469 || task_backend != supervisor_backend
470 {
471 return Err(A2aError::Internal(format!(
472 "Storage backend mismatch: task={task_backend}, push={push_backend}, \
473 event={event_backend}, atomic={atomic_backend}, \
474 cancellation_supervisor={supervisor_backend}. \
475 requires all storage traits to share the same backend. \
476 requires the cancellation supervisor on the same backend \
477 so cross-instance cancel markers are observable. \
478 Use .storage() for unified backend."
479 )));
480 }
481
482 let push_delivery_store = self.push_delivery_store;
483
484 // Push-dispatch consistency: the atomic store's
485 // opt-in flag and the presence of `push_delivery_store` MUST
486 // agree. Both-true = push fully wired; both-false = non-push
487 // deployment. Mixed cases are build errors — this mirrors the
488 // main server builder (server/mod.rs).
489 match (
490 push_delivery_store.is_some(),
491 atomic_store.push_dispatch_enabled(),
492 ) {
493 (true, true) | (false, false) => {}
494 (true, false) => {
495 return Err(A2aError::Internal(
496 "push_delivery_store wired but atomic_store.push_dispatch_enabled() \
497 is false. Call .with_push_dispatch_enabled(true) on the backend \
498 storage before passing it to .storage()."
499 .into(),
500 ));
501 }
502 (false, true) => {
503 return Err(A2aError::Internal(
504 "atomic_store.push_dispatch_enabled() is true but no \
505 push_delivery_store is wired. Pending-dispatch markers would be \
506 written with no consumer, imposing load-bearing infra for no \
507 benefit. If you need to populate markers for an external \
508 consumer, open an issue for a distinctly-named opt-in — for now, \
509 this configuration is rejected."
510 .into(),
511 ));
512 }
513 }
514
515 // §Decision Bug 1: the Lambda execution environment
516 // freezes after the HTTP response is flushed, so any
517 // `tokio::spawn`'d executor continuation is opportunistic only
518 //. Refuse `SendMessageConfiguration.return_immediately
519 // = true` at the `core_send_message` entry point via this
520 // capability flag instead of silently orphaning the executor.
521 //
522 // if a durable executor queue is wired via
523 // [`Self::with_durable_executor`] (or the SQS helper), the
524 // capability is re-enabled because enqueueing is durable
525 // across the Lambda freeze. The explicit re-enable lives in
526 // the builder method so the capability cannot be claimed
527 // without the mechanism.
528 let mut runtime_config = self.runtime_config;
529 if self.durable_executor_queue.is_none() {
530 runtime_config.supports_return_immediately = false;
531 }
532
533 // Push dispatcher wiring. When
534 // `push_delivery_store` is present, validate its backend matches,
535 // validate the retry-horizon invariant, and construct the
536 // `PushDispatcher`. Under Lambda the dispatcher runs
537 // opportunistically post-return; correctness
538 // comes from the atomic marker + stream/scheduler recovery.
539 let push_dispatcher: Option<Arc<turul_a2a::push::PushDispatcher>> =
540 if let Some(delivery) = push_delivery_store.as_ref() {
541 let push_delivery_backend = delivery.backend_name();
542 if task_backend != push_delivery_backend {
543 return Err(A2aError::Internal(format!(
544 "Storage backend mismatch: task={task_backend}, \
545 push_delivery={push_delivery_backend}. \
546 requires all storage traits to share the same backend."
547 )));
548 }
549
550 // claim expiry must exceed the worst-case
551 // retry horizon so a live claim is not mis-classified as
552 // stale mid-retry.
553 let retry_horizon = runtime_config
554 .push_backoff_cap
555 .saturating_mul(runtime_config.push_max_attempts as u32);
556 if runtime_config.push_claim_expiry <= retry_horizon {
557 return Err(A2aError::Internal(format!(
558 "push_claim_expiry ({:?}) must be greater than retry horizon \
559 (push_max_attempts={} * push_backoff_cap={:?} = {:?}). \
560 Raise push_claim_expiry or lower push_max_attempts/push_backoff_cap.",
561 runtime_config.push_claim_expiry,
562 runtime_config.push_max_attempts,
563 runtime_config.push_backoff_cap,
564 retry_horizon
565 )));
566 }
567
568 let mut delivery_cfg = turul_a2a::push::delivery::PushDeliveryConfig::default();
569 delivery_cfg.max_attempts = runtime_config.push_max_attempts as u32;
570 delivery_cfg.backoff_base = runtime_config.push_backoff_base;
571 delivery_cfg.backoff_cap = runtime_config.push_backoff_cap;
572 delivery_cfg.backoff_jitter = runtime_config.push_backoff_jitter;
573 delivery_cfg.request_timeout = runtime_config.push_request_timeout;
574 delivery_cfg.connect_timeout = runtime_config.push_connect_timeout;
575 delivery_cfg.read_timeout = runtime_config.push_read_timeout;
576 delivery_cfg.claim_expiry = runtime_config.push_claim_expiry;
577 delivery_cfg.max_payload_bytes = runtime_config.push_max_payload_bytes;
578 delivery_cfg.allow_insecure_urls = runtime_config.allow_insecure_push_urls;
579
580 let instance_id = format!("a2a-lambda-{}", uuid::Uuid::now_v7());
581 let worker = turul_a2a::push::delivery::PushDeliveryWorker::new(
582 delivery.clone(),
583 delivery_cfg,
584 None,
585 instance_id,
586 )
587 .map_err(|e| A2aError::Internal(format!("push worker build failed: {e}")))?;
588
589 Some(Arc::new(turul_a2a::push::PushDispatcher::new(
590 Arc::new(worker),
591 push_storage.clone(),
592 task_storage.clone(),
593 )))
594 } else {
595 None
596 };
597
598 let state = AppState {
599 executor,
600 task_storage,
601 push_storage,
602 event_store,
603 atomic_store,
604 event_broker: TaskEventBroker::new(),
605 middleware_stack: Arc::new(MiddlewareStack::new(self.middleware)),
606 runtime_config,
607 in_flight: Arc::new(turul_a2a::server::in_flight::InFlightRegistry::new()),
608 cancellation_supervisor,
609 push_delivery_store,
610 push_dispatcher,
611 durable_executor_queue: self.durable_executor_queue,
612 };
613
614 let router = build_router(state.clone());
615
616 let path_prefix = match self.path_prefix {
617 None => None,
618 Some(p) if p.is_empty() || p == "/" => None,
619 Some(p) => {
620 if !p.starts_with('/') {
621 return Err(A2aError::InvalidRequest {
622 message: format!(
623 "LambdaA2aServerBuilder::strip_path_prefix: prefix must start with '/'; got {p:?}"
624 ),
625 });
626 }
627 if p.ends_with('/') {
628 return Err(A2aError::InvalidRequest {
629 message: format!(
630 "LambdaA2aServerBuilder::strip_path_prefix: prefix must not end with '/' (except '/'); got {p:?}"
631 ),
632 });
633 }
634 Some(Arc::from(p))
635 }
636 };
637
638 Ok(LambdaA2aHandler {
639 router,
640 state,
641 path_prefix,
642 })
643 }
644}
645
646impl Default for LambdaA2aServerBuilder {
647 fn default() -> Self {
648 Self::new()
649 }
650}
651
/// Lambda handler wrapping the axum Router.
///
/// Produced by [`LambdaA2aServerBuilder::build`]. Cloning is supported so
/// the handler can be moved into per-invocation closures.
#[derive(Clone)]
pub struct LambdaA2aHandler {
    /// Router built from `state` by `build_router`; each HTTP request is
    /// dispatched through a clone of it.
    router: axum::Router,
    /// Held for SQS-event dispatch (`handle_sqs`). Always
    /// populated — the SQS handler method is gated behind the `sqs`
    /// feature but the state is carried unconditionally so callers can
    /// reach it for diagnostics or future non-HTTP / non-SQS handlers.
    #[cfg_attr(not(feature = "sqs"), allow(dead_code))]
    state: AppState,
    /// Leading path prefix (possibly several segments, e.g.
    /// `/stage/agent`) to strip from each incoming HTTP request
    /// before the A2A router sees it. `None` = no stripping. Set via
    /// [`LambdaA2aServerBuilder::strip_path_prefix`] and validated by
    /// `build()`.
    path_prefix: Option<Arc<str>>,
}
667
668impl LambdaA2aHandler {
669 pub fn builder() -> LambdaA2aServerBuilder {
670 LambdaA2aServerBuilder::new()
671 }
672
673 /// Handle a Lambda HTTP request.
674 pub async fn handle(
675 &self,
676 event: lambda_http::Request,
677 ) -> Result<lambda_http::Response<lambda_http::Body>, lambda_http::Error> {
678 let axum_req = lambda_to_axum_request(event)?;
679 let axum_req = match &self.path_prefix {
680 Some(prefix) => adapter::strip_request_path_prefix(axum_req, prefix),
681 None => axum_req,
682 };
683
684 let axum_resp = tower::ServiceExt::oneshot(self.router.clone(), axum_req)
685 .await
686 .map_err(|e| lambda_http::Error::from(format!("Router error: {e}")))?;
687
688 axum_to_lambda_response(axum_resp).await
689 }
690
691 /// Handle a raw Lambda HTTP event delivered as `serde_json::Value`
692 /// and return a response envelope **matching the inbound event
693 /// shape**:
694 ///
695 /// | Inbound shape detected by `lambda_http::LambdaRequest` | Response envelope |
696 /// |----------------------------------------------------------|------------------------------------|
697 /// | API Gateway v1 (REST) — top-level `httpMethod` + `path` | `ApiGatewayProxyResponse` |
698 /// | API Gateway v2 / Function URL — `requestContext.http` | `ApiGatewayV2httpResponse` |
699 /// | ALB target group — `requestContext.elb` | `AlbTargetGroupResponse` |
700 /// | WebSocket / unrecognised | `Err(...)` |
701 ///
702 /// Mismatching envelopes — most commonly a v2 response sent in
703 /// response to a v1 invocation — cause API Gateway to return
704 /// `502 Bad Gateway` regardless of the body, because the response
705 /// fails the gateway's contract validation. This method ensures
706 /// symmetry: response shape is always picked from the inbound
707 /// shape, never hardcoded.
708 ///
709 /// This is the framework's canonical HTTP envelope conversion
710 /// primitive — useful when a single Lambda function routes among
711 /// HTTP and one or more non-framework triggers (EventBridge,
712 /// DynamoDB streams, custom invoke) and the adopter drives
713 /// [`lambda_runtime::run`] with `serde_json::Value` directly.
714 /// Handles the `LambdaRequest` deserialization, the text-vs-base64
715 /// content-type detection, and the response envelope rebuild.
716 ///
717 /// Body encoding: text for `text/*`, `application/json`,
718 /// `application/xml`, `application/javascript`, or anything with
719 /// `charset=`; base64 otherwise.
720 ///
721 /// Path-prefix stripping (see
722 /// [`LambdaA2aServerBuilder::strip_path_prefix`]) is applied
723 /// because dispatch flows through [`Self::handle`]. Adopters who
724 /// only serve HTTP should continue to use [`Self::run_http_only`]
725 /// or [`LambdaA2aServerBuilder::run`]; this entry point exists for
726 /// composition with adopter-owned third triggers.
727 pub async fn handle_http_event_value(
728 &self,
729 value: serde_json::Value,
730 ) -> Result<serde_json::Value, lambda_runtime::Error> {
731 use base64::Engine;
732 use http_body_util::BodyExt;
733
734 let lambda_req: lambda_http::request::LambdaRequest = serde_json::from_value(value)
735 .map_err(|e| lambda_runtime::Error::from(format!("invalid HTTP event: {e}")))?;
736
737 // Capture the inbound shape so we can emit the matching
738 // response envelope. Lost once we convert to `lambda_http::Request`.
739 let shape = match &lambda_req {
740 lambda_http::request::LambdaRequest::ApiGatewayV1(_) => HttpEventShape::ApiGatewayV1,
741 lambda_http::request::LambdaRequest::ApiGatewayV2(_) => HttpEventShape::ApiGatewayV2,
742 lambda_http::request::LambdaRequest::Alb(_) => HttpEventShape::Alb,
743 lambda_http::request::LambdaRequest::WebSocket(_) => {
744 return Err(lambda_runtime::Error::from(
745 "WebSocket Lambda events are not supported by this adapter",
746 ));
747 }
748 _ => {
749 return Err(lambda_runtime::Error::from(
750 "unsupported Lambda HTTP event variant",
751 ));
752 }
753 };
754
755 let req: lambda_http::Request = lambda_req.into();
756
757 let resp = self
758 .handle(req)
759 .await
760 .map_err(|e| lambda_runtime::Error::from(format!("handler error: {e}")))?;
761 let (parts, body) = resp.into_parts();
762
763 let bytes = body
764 .collect()
765 .await
766 .map_err(|e| lambda_runtime::Error::from(format!("body collect: {e}")))?
767 .to_bytes();
768
769 let ct = parts
770 .headers
771 .get(http::header::CONTENT_TYPE)
772 .and_then(|v| v.to_str().ok())
773 .unwrap_or("application/octet-stream")
774 .to_ascii_lowercase();
775 let is_text = ct.starts_with("text/")
776 || ct.starts_with("application/json")
777 || ct.starts_with("application/xml")
778 || ct.starts_with("application/javascript")
779 || ct.contains("charset=");
780
781 let (body_str, is_base64) = if bytes.is_empty() {
782 (None, false)
783 } else if is_text {
784 match std::str::from_utf8(&bytes) {
785 Ok(s) => (Some(s.to_string()), false),
786 Err(_) => (
787 Some(base64::engine::general_purpose::STANDARD.encode(&bytes)),
788 true,
789 ),
790 }
791 } else {
792 (
793 Some(base64::engine::general_purpose::STANDARD.encode(&bytes)),
794 true,
795 )
796 };
797
798 let status = parts.status.as_u16() as i64;
799 let headers = parts.headers;
800 let body_lambda = body_str.map(aws_lambda_events::encodings::Body::Text);
801
802 match shape {
803 HttpEventShape::ApiGatewayV1 => {
804 let mut api_resp = aws_lambda_events::apigw::ApiGatewayProxyResponse::default();
805 api_resp.status_code = status;
806 api_resp.headers = headers;
807 api_resp.body = body_lambda;
808 api_resp.is_base64_encoded = is_base64;
809 serde_json::to_value(api_resp).map_err(|e| {
810 lambda_runtime::Error::from(format!("serialise APIGW v1 response: {e}"))
811 })
812 }
813 HttpEventShape::ApiGatewayV2 => {
814 let mut api_resp = aws_lambda_events::apigw::ApiGatewayV2httpResponse::default();
815 api_resp.status_code = status;
816 api_resp.headers = headers;
817 api_resp.body = body_lambda;
818 api_resp.is_base64_encoded = is_base64;
819 serde_json::to_value(api_resp).map_err(|e| {
820 lambda_runtime::Error::from(format!("serialise APIGW v2 response: {e}"))
821 })
822 }
823 HttpEventShape::Alb => {
824 let mut api_resp = aws_lambda_events::alb::AlbTargetGroupResponse::default();
825 api_resp.status_code = status;
826 api_resp.headers = headers;
827 api_resp.body = body_lambda;
828 api_resp.is_base64_encoded = is_base64;
829 serde_json::to_value(api_resp).map_err(|e| {
830 lambda_runtime::Error::from(format!("serialise ALB response: {e}"))
831 })
832 }
833 }
834 }
835}
836
/// Which HTTP envelope an inbound Lambda event arrived in, as detected by
/// [`LambdaA2aHandler::handle_http_event_value`].
///
/// The handler records this before converting the event into a plain HTTP
/// request, then uses it to pick the response envelope, so the reply always
/// has the same shape as the invocation.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum HttpEventShape {
    /// Matched `LambdaRequest::ApiGatewayV1`; answered with an
    /// `ApiGatewayProxyResponse`.
    ApiGatewayV1,
    /// Matched `LambdaRequest::ApiGatewayV2`; answered with an
    /// `ApiGatewayV2httpResponse`.
    ApiGatewayV2,
    /// Matched `LambdaRequest::Alb`; answered with an
    /// `AlbTargetGroupResponse`.
    Alb,
}
847
848impl LambdaA2aHandler {
849 /// Handle an SQS batch from the durable executor queue.
850 ///
851 /// Returns `SqsBatchResponse` whose `batch_item_failures` list
852 /// tells the event source mapping which records to retry.
853 /// Requires `FunctionResponseTypes: ["ReportBatchItemFailures"]`
854 /// on the mapping.
855 ///
856 /// Each record: deserialize envelope → load task → terminal-no-op
857 /// check → cancel-marker direct-CANCELED commit → run executor.
858 /// See `durable::drive_sqs_batch` for the full semantics.
859 #[cfg(feature = "sqs")]
860 pub async fn handle_sqs(
861 &self,
862 event: aws_lambda_events::event::sqs::SqsEvent,
863 ) -> aws_lambda_events::event::sqs::SqsBatchResponse {
864 durable::drive_sqs_batch(&self.state, event).await
865 }
866
867 /// Run this handler as a pure HTTP Lambda. Strict: any non-HTTP
868 /// event shape (SQS, DynamoDB stream, scheduler, …) causes
869 /// `lambda_http::run` to return a deserialization error on
870 /// invocation.
871 ///
872 /// Appropriate for Lambdas whose only trigger is a Function URL,
873 /// API Gateway, or ALB. The request Lambda in the two-Lambda
874 /// durable-executor topology is the canonical caller.
875 pub async fn run_http_only(self) -> Result<(), lambda_runtime::Error> {
876 let handler = Arc::new(self);
877 lambda_http::run(lambda_http::service_fn(move |event| {
878 let h = Arc::clone(&handler);
879 async move { h.handle(event).await }
880 }))
881 .await
882 }
883}
884
885// Default entrypoint when the `sqs` feature is off — `.run()` aliases
886// the HTTP-only runner. With the `sqs` feature enabled, `.run()` lives
887// in `durable.rs` and routes HTTP+SQS via the classifier.
888#[cfg(not(feature = "sqs"))]
889impl LambdaA2aHandler {
890 /// Default Lambda runner. Dispatches based on the event shape the
891 /// binary receives. Without the `sqs` feature this is HTTP-only
892 /// (equivalent to [`LambdaA2aHandler::run_http_only`]).
893 pub async fn run(self) -> Result<(), lambda_runtime::Error> {
894 self.run_http_only().await
895 }
896}
897
898impl LambdaA2aServerBuilder {
899 /// Build and run in one call. Sugar for
900 /// `self.build()?.run().await`. Hides the handler from the adopter —
901 /// use `.build()` + `handler.run_*()` instead when the handler is
902 /// needed for unit tests, custom middleware, or an explicit
903 /// topology runner.
904 pub async fn run(self) -> Result<(), lambda_runtime::Error> {
905 let handler = self.build().map_err(|e| {
906 lambda_runtime::Error::from(format!("LambdaA2aServerBuilder::run: {e}"))
907 })?;
908 handler.run().await
909 }
910}
911
#[cfg(test)]
mod handle_http_event_value_shape_tests {
    //! Regression tests for the event-shape symmetry contract on
    //! [`LambdaA2aHandler::handle_http_event_value`]. The wire-shape
    //! mismatch (v2 response to a v1 invocation) is what causes API
    //! Gateway REST integrations to return `502 Bad Gateway`. These
    //! tests pin the contract: response envelope must always match
    //! the inbound request envelope.

    use super::*;
    use std::sync::Arc;
    use turul_a2a::executor::{AgentExecutor, ExecutionContext};
    use turul_a2a::server::RuntimeConfig;
    use turul_a2a::server::in_flight::InFlightRegistry;
    use turul_a2a::storage::InMemoryA2aStorage;
    use turul_a2a::streaming::TaskEventBroker;
    use turul_a2a_types::{Message, Task};

    /// Executor that marks every task `Completed` without doing any work.
    /// The tests here only hit the agent-card endpoint, so the executor
    /// exists to satisfy `AppState` and to supply the card.
    struct NoOpExecutor;

    #[async_trait::async_trait]
    impl AgentExecutor for NoOpExecutor {
        async fn execute(
            &self,
            task: &mut Task,
            _msg: &Message,
            _ctx: &ExecutionContext,
        ) -> Result<(), turul_a2a::error::A2aError> {
            let mut proto = task.as_proto().clone();
            proto.status = Some(turul_a2a_proto::TaskStatus {
                state: turul_a2a_proto::TaskState::Completed.into(),
                message: None,
                timestamp: None,
            });
            *task = Task::try_from(proto).unwrap();
            Ok(())
        }

        fn agent_card(&self) -> turul_a2a_proto::AgentCard {
            turul_a2a_proto::AgentCard {
                name: "test-agent".to_string(),
                ..Default::default()
            }
        }
    }

    /// Build a handler over in-memory storage with the given
    /// `path_prefix`. One storage instance backs every storage role.
    fn build_handler_with_prefix(path_prefix: Option<&str>) -> LambdaA2aHandler {
        let s = Arc::new(InMemoryA2aStorage::new());
        let state = AppState {
            executor: Arc::new(NoOpExecutor),
            task_storage: s.clone(),
            push_storage: s.clone(),
            event_store: s.clone(),
            atomic_store: s.clone(),
            event_broker: TaskEventBroker::new(),
            middleware_stack: Arc::new(MiddlewareStack::new(vec![])),
            runtime_config: RuntimeConfig::default(),
            in_flight: Arc::new(InFlightRegistry::new()),
            cancellation_supervisor: s.clone(),
            push_delivery_store: None,
            push_dispatcher: None,
            durable_executor_queue: None,
        };
        let router = build_router(state.clone());
        LambdaA2aHandler {
            router,
            state,
            path_prefix: path_prefix.map(Arc::<str>::from),
        }
    }

    /// Handler with no path prefix configured.
    fn build_handler() -> LambdaA2aHandler {
        build_handler_with_prefix(None)
    }

    /// Unwrap a response body that must be plain text; panics with the
    /// actual value otherwise.
    fn expect_text_body(body: Option<aws_lambda_events::encodings::Body>) -> String {
        match body {
            Some(aws_lambda_events::encodings::Body::Text(s)) => s,
            other => panic!("expected text body, got {other:?}"),
        }
    }

    /// API Gateway v2 / Function URL event. `requestContext.http.method`
    /// is what `LambdaRequest::ApiGatewayV2` keys off.
    fn apigw_v2_agent_card_get() -> serde_json::Value {
        serde_json::json!({
            "version": "2.0",
            "routeKey": "GET /.well-known/agent-card.json",
            "rawPath": "/.well-known/agent-card.json",
            "rawQueryString": "",
            "headers": {"accept": "application/json", "a2a-version": "1.0"},
            "requestContext": {
                "accountId": "000000000000",
                "apiId": "api",
                "domainName": "fake.execute-api",
                "domainPrefix": "fake",
                "http": {
                    "method": "GET",
                    "path": "/.well-known/agent-card.json",
                    "protocol": "HTTP/1.1",
                    "sourceIp": "127.0.0.1",
                    "userAgent": "test"
                },
                "requestId": "rid",
                "routeKey": "GET /.well-known/agent-card.json",
                "stage": "$default",
                "time": "01/Jan/2026:00:00:00 +0000",
                "timeEpoch": 1_735_689_600_000i64
            },
            "isBase64Encoded": false
        })
    }

    /// API Gateway v1 (REST) event. Top-level `httpMethod` + `path` and
    /// `requestContext.identity` (no `requestContext.http`) is what
    /// `LambdaRequest::ApiGatewayV1` keys off. This is the shape REST
    /// API + AWS_PROXY integrations send into the Lambda — the live
    /// fleet topology that 502'd against a hardcoded v2 response.
    ///
    /// `requestContext.path` is `path` itself when `stage` is
    /// `$default`, and `/{stage}{path}` for any other stage.
    fn apigw_v1_agent_card_get_with_stage(stage: &str, path: &str) -> serde_json::Value {
        let request_context_path = if stage == "$default" {
            path.to_string()
        } else {
            format!("/{stage}{path}")
        };
        serde_json::json!({
            "resource": "/{proxy+}",
            "path": path,
            "httpMethod": "GET",
            "headers": {"accept": "application/json", "a2a-version": "1.0"},
            "multiValueHeaders": {},
            "queryStringParameters": null,
            "multiValueQueryStringParameters": null,
            "pathParameters": {"proxy": path.trim_start_matches('/').to_string()},
            "stageVariables": null,
            "requestContext": {
                "accountId": "000000000000",
                "apiId": "abc123",
                "domainName": "fake.execute-api",
                "domainPrefix": "fake",
                "extendedRequestId": "xrid",
                "httpMethod": "GET",
                "identity": {
                    "sourceIp": "127.0.0.1",
                    "userAgent": "test"
                },
                "path": request_context_path,
                "protocol": "HTTP/1.1",
                "requestId": "rid",
                "requestTime": "01/Jan/2026:00:00:00 +0000",
                "requestTimeEpoch": 1_735_689_600_000i64,
                "resourceId": "rsrc",
                "resourcePath": "/{proxy+}",
                "stage": stage
            },
            "body": null,
            "isBase64Encoded": false
        })
    }

    /// REST v1 agent-card GET on the `$default` stage.
    ///
    /// `stage` is `$default` here so `lambda_http` does not prepend a
    /// stage prefix — this fixture exists to pin the response-shape
    /// contract, not the path-rewriting behaviour. See the
    /// `strip_path_prefix_*` tests for stage-prefix coverage.
    fn apigw_v1_agent_card_get() -> serde_json::Value {
        apigw_v1_agent_card_get_with_stage("$default", "/.well-known/agent-card.json")
    }

    /// ALB target-group event. `requestContext.elb` is what
    /// `LambdaRequest::Alb` keys off.
    fn alb_agent_card_get() -> serde_json::Value {
        serde_json::json!({
            "requestContext": {
                "elb": {
                    "targetGroupArn": "arn:aws:elasticloadbalancing:ap-southeast-2:000:targetgroup/fake/00"
                }
            },
            "httpMethod": "GET",
            "path": "/.well-known/agent-card.json",
            "queryStringParameters": {},
            "headers": {"accept": "application/json", "a2a-version": "1.0"},
            "body": "",
            "isBase64Encoded": false
        })
    }

    /// REST v1 inbound MUST produce a v1 response shape. This is the
    /// regression for the live-fleet 502.
    #[tokio::test]
    async fn v1_inbound_event_emits_v1_response_envelope() {
        let handler = build_handler();
        let resp_value = handler
            .handle_http_event_value(apigw_v1_agent_card_get())
            .await
            .expect("handle_http_event_value(v1) must succeed");

        // Parses cleanly as v1.
        let v1: aws_lambda_events::apigw::ApiGatewayProxyResponse =
            serde_json::from_value(resp_value.clone())
                .expect("response must deserialize as ApiGatewayProxyResponse (v1)");
        assert_eq!(v1.status_code, 200);
        let s = expect_text_body(v1.body);
        assert!(s.contains("\"name\""), "v1 body shape: {s}");

        // Negative: v1 response JSON must NOT contain the v2-only
        // `cookies` field. Presence of `cookies` is API Gateway REST's
        // 502 trigger.
        assert!(
            resp_value.get("cookies").is_none(),
            "v1 response must not carry v2-only `cookies` field; payload: {resp_value}"
        );
    }

    /// API Gateway v2 / Function URL inbound MUST produce a v2 response
    /// shape (current behaviour, locked in to prevent the inverse
    /// regression).
    #[tokio::test]
    async fn v2_inbound_event_emits_v2_response_envelope() {
        let handler = build_handler();
        let resp_value = handler
            .handle_http_event_value(apigw_v2_agent_card_get())
            .await
            .expect("handle_http_event_value(v2) must succeed");

        let v2: aws_lambda_events::apigw::ApiGatewayV2httpResponse =
            serde_json::from_value(resp_value.clone())
                .expect("response must deserialize as ApiGatewayV2httpResponse (v2)");
        assert_eq!(v2.status_code, 200);
        let s = expect_text_body(v2.body);
        assert!(s.contains("\"name\""), "v2 body shape: {s}");
    }

    /// ALB inbound MUST produce an ALB response shape.
    #[tokio::test]
    async fn alb_inbound_event_emits_alb_response_envelope() {
        let handler = build_handler();
        let resp_value = handler
            .handle_http_event_value(alb_agent_card_get())
            .await
            .expect("handle_http_event_value(alb) must succeed");

        let alb: aws_lambda_events::alb::AlbTargetGroupResponse =
            serde_json::from_value(resp_value)
                .expect("response must deserialize as AlbTargetGroupResponse");
        assert_eq!(alb.status_code, 200);
    }

    /// Live-fleet end-to-end regression: REST v1 stage-prefixed event
    /// + `strip_path_prefix("/dev")` on the handler routes through
    /// the agent-card endpoint AND emits a v1 response envelope. This
    /// is the exact scenario the live fleet was 502'ing on.
    #[tokio::test]
    async fn v1_stage_prefix_inbound_with_strip_emits_v1_response() {
        let handler = build_handler_with_prefix(Some("/dev"));

        let event = apigw_v1_agent_card_get_with_stage("dev", "/.well-known/agent-card.json");
        let resp_value = handler
            .handle_http_event_value(event)
            .await
            .expect("v1 stage-prefixed event must succeed under strip_path_prefix");

        let v1: aws_lambda_events::apigw::ApiGatewayProxyResponse =
            serde_json::from_value(resp_value)
                .expect("response must deserialize as ApiGatewayProxyResponse");
        assert_eq!(v1.status_code, 200);
        let s = expect_text_body(v1.body);
        assert!(s.contains("\"name\""), "v1 stage-prefix body: {s}");
    }

    /// Cross-shape decoding regression: a v1 response JSON carries
    /// `multiValueHeaders` (REST shape) but no `cookies`; a v2 response
    /// carries `cookies` but no v1-only `multiValueHeaders` semantics.
    /// Confirms the two shapes are wire-distinguishable, which is why
    /// the gateway 502s on a mismatch.
    #[tokio::test]
    async fn v1_response_does_not_match_v2_envelope_strictly() {
        let handler = build_handler();
        let v1_value = handler
            .handle_http_event_value(apigw_v1_agent_card_get())
            .await
            .unwrap();

        // Sanity: v1 response JSON has no v2-only `cookies` field.
        assert!(v1_value.get("cookies").is_none());

        let v2_value = handler
            .handle_http_event_value(apigw_v2_agent_card_get())
            .await
            .unwrap();
        // Sanity: v2 response JSON carries the v2 `cookies` field
        // (default empty array — still present on the wire).
        assert!(
            v2_value.get("cookies").is_some(),
            "v2 response should carry the v2-only cookies field"
        );
    }
}