// turul_a2a_aws_lambda/lib.rs
1//! AWS Lambda adapter for turul-a2a.
2//!
3//! # Choosing a runner
4//!
5//! A Lambda binary receives raw event JSON; the adapter owns
6//! classifying that JSON and dispatching to the right A2A handler
7//! method. Adopters pick the runner whose name matches the Lambda's
8//! AWS trigger topology and end `main.rs` with one `.await?` call —
9//! `lambda_runtime` / `lambda_http` / event parsing never appear in
10//! adopter code.
11//!
12//! | Lambda triggers | Runner |
13//! |-----------------------------------|------------------------------------------------|
14//! | HTTP only (Function URL / APIGW) | [`LambdaA2aHandler::run_http_only`] |
15//! | SQS only (event source mapping) | [`LambdaA2aHandler::run_sqs_only`] *(`sqs`)* |
16//! | HTTP **and** SQS on one function | [`LambdaA2aHandler::run_http_and_sqs`] *(`sqs`)* |
17//! | not sure / just run it | [`LambdaA2aHandler::run`] |
18//!
19//! [`LambdaA2aHandler::run`] is the default "it just works" entry
20//! point: without the `sqs` feature it aliases `run_http_only`; with
21//! `sqs` it aliases `run_http_and_sqs` (permissive, handles either
22//! shape). The explicit runners are **strict** — a non-matching
23//! event shape fails loudly, which is what hardened deployments
24//! and tests want.
25//!
26//! The one-call shortcut is [`LambdaA2aServerBuilder::run`] — it
27//! build-then-runs in a single fluent call. Use `.build()` + a
28//! handler method when the handler is needed for unit tests or
29//! custom middleware.
30//!
31//! ## Composing with non-framework triggers
32//!
33//! Lambdas wired to HTTP **plus** a third trigger the framework does
34//! not know about (EventBridge scheduler, DynamoDB stream, custom
35//! invoke) can't use a named runner directly — the Unknown branch
36//! is adopter-owned. Reach for the public classifier + the HTTP
37//! envelope primitive instead:
38//!
39//! ```ignore
40//! lambda_runtime::run(lambda_runtime::service_fn(
41//! move |event: lambda_runtime::LambdaEvent<serde_json::Value>| {
42//! let handler = handler.clone();
43//! async move {
44//! let (value, _ctx) = event.into_parts();
45//! match turul_a2a_aws_lambda::classify_event(&value) {
46//! turul_a2a_aws_lambda::LambdaEvent::Http => {
47//! handler.handle_http_event_value(value).await
48//! }
49//! turul_a2a_aws_lambda::LambdaEvent::Sqs => {
50//! let sqs = serde_json::from_value(value)?;
51//! Ok(serde_json::to_value(handler.handle_sqs(sqs).await)?)
52//! }
53//! turul_a2a_aws_lambda::LambdaEvent::Unknown => {
54//! // Adopter-owned: run your scheduled sweep,
55//! // DynamoDB stream consumer, etc.
56//! run_adopter_specific_path(value).await
57//! }
58//! }
59//! }
60//! }
61//! )).await
62//! ```
63//!
64//! [`LambdaA2aHandler::handle_http_event_value`] is available without
65//! the `sqs` feature; `handle_sqs` is gated on `sqs`. `classify_event`
66//! + `LambdaEvent` are always available.
67//!
68//! ## API Gateway path prefix
69//!
70//! When Lambda sits behind a REST API Gateway with `AWS_PROXY`
71//! integration, the event carries the full stage + resource tree in
72//! the path (e.g. `/stage/agent/message:send`),
73//! which the root-rooted A2A router would 404. Configure the prefix
74//! to strip via [`LambdaA2aServerBuilder::strip_path_prefix`]:
75//!
76//! ```ignore
77//! LambdaA2aServerBuilder::new()
78//! .executor(my_executor)
79//! .storage(my_storage)
80//! .strip_path_prefix("/stage/agent") // API GW prefix
81//! .run()
82//! .await
83//! ```
84//!
85//! The strip applies to `run_http_only` and `run_http_and_sqs`.
86//! SQS events are unaffected. Non-matching paths pass through
87//! unchanged (router still 404s on genuinely unknown paths —
88//! that's the correct failure mode).
89//!
90//! ```ignore
91//! // HTTP-only Lambda (request Lambda in two-Lambda topology):
92//! LambdaA2aServerBuilder::new()
93//! .executor(my_executor)
94//! .storage(my_storage)
95//! .with_sqs_return_immediately(queue_url, sqs) // enqueues durable jobs
96//! .build()?
97//! .run_http_only()
98//! .await
99//!
100//! // Single-function HTTP+SQS demo:
101//! LambdaA2aServerBuilder::new()
102//! .executor(my_executor)
103//! .storage(my_storage)
104//! .with_sqs_return_immediately(queue_url, sqs)
105//! .run() // builder shortcut; dispatches HTTP + SQS
106//! .await
107//!
108//! // Pure SQS worker — no with_sqs_return_immediately(...) needed
109//! // (it consumes the queue, never enqueues):
110//! LambdaA2aServerBuilder::new()
111//! .executor(my_executor)
112//! .storage(my_storage)
113//! .build()?
114//! .run_sqs_only()
115//! .await
116//! ```
117//!
118//! # Adapter internals
119//!
120//! Thin wrapper: converts Lambda events to axum requests, delegates to the
121//! same Router, converts responses back. Per ADR-008/ADR-009:
122//! - Authorizer context mapped via synthetic headers with anti-spoofing
123//! - Streaming supported via durable event store (D3)
124//! - SSE responses are buffered: task executes, events are collected, returned as one response
125//! - `POST /message:stream` executes the task within the Lambda invocation and returns all events
126//! - `GET /tasks/{id}:subscribe` is for tasks that are **not** in a terminal
127//! state. Within one invocation it emits the initial `Task` snapshot,
128//! replays stored events via `Last-Event-ID`, and closes when the task
129//! reaches a terminal state. Subscribing to an already-terminal task
130//! returns `UnsupportedOperationError` per A2A v1.0 §3.1.6 / ADR-010
131//! §4.3. For retrieving a terminal task's final state use `GetTask`.
132//!
133//! # Push-notification delivery — external triggers are mandatory
134//!
135//! Push delivery on Lambda (ADR-013) is architecturally different
136//! from the binary server. The request Lambda installed via
137//! [`LambdaA2aServerBuilder`] still constructs a `PushDispatcher`
138//! when `push_delivery_store` is wired, but any `tokio::spawn`
139//! continuation it emits post-return is **opportunistic only** — the
140//! Lambda execution environment may be frozen indefinitely between
141//! invocations, so nothing can depend on that continuation completing
142//! (ADR-013 §4.4).
143//!
144//! Correctness for push delivery on Lambda is carried by:
145//!
146//! 1. The atomic pending-dispatch marker written inside the request
147//! Lambda's commit transaction (ADR-013 §4.3 — opt in via
148//! `StorageImpl::with_push_dispatch_enabled(true)`).
149//! 2. [`LambdaStreamRecoveryHandler`] — DynamoDB Streams trigger on
150//! `a2a_push_pending_dispatches`. DynamoDB backends only.
151//! 3. [`LambdaScheduledRecoveryHandler`] — EventBridge Scheduler
152//! backstop. **Required for all backends**; it is the sole
153//! recovery path for SQLite / PostgreSQL / in-memory deployments.
154//!
155//! Without at least the scheduled worker, push delivery on Lambda is
156//! not durable — a marker written on a cold invocation may never be
157//! consumed. The example wiring lives in
158//! `examples/lambda-stream-worker` and
159//! `examples/lambda-scheduled-worker`.
160//!
161//! Lambda streaming is request-scoped (not persistent SSE connections). The durable
162//! event store ensures events survive across invocations. Clients reconnect with
163//! `Last-Event-ID` for continuation.
164//!
165//! # Cross-instance cancellation
166//!
167//! Lambda invocations are stateless and short-lived, so the Lambda
168//! adapter does **not** run the persistent cross-instance cancel
169//! poller that `A2aServer::run()` spawns. Cancellation behaviour on
170//! the Lambda adapter:
171//!
172//! - **Marker writes** — `CancelTask` on a Lambda invocation writes
173//! the cancel marker to the shared backend (DynamoDB / PostgreSQL).
174//! This works.
175//! - **Propagation to a live executor on the SAME Lambda invocation** —
176//! works via the same-instance token-trip path in `core_cancel_task`.
177//! - **Propagation to a live executor on a DIFFERENT Lambda invocation
178//! (warm container)** — **not currently supported**. There is no
179//! persistent poller to observe markers written by other
180//! invocations. A subsequent invocation whose handler reads the
181//! marker directly may act on it, but that is not a substitute for
182//! the server runtime's live propagation.
183//!
184//! The builder still **requires** an `A2aCancellationSupervisor`
185//! implementation on the same backend so that marker writes reach
186//! the correct backend and a future polling-adapter variant can
187//! consume them. If a deployment passes a non-matching supervisor,
188//! `build()` rejects the configuration.
189
190mod adapter;
191mod auth;
192mod event;
193mod no_streaming;
194mod scheduled_recovery;
195mod stream_recovery;
196
197#[cfg(feature = "sqs")]
198mod durable;
199
200#[cfg(test)]
201mod builder_tests;
202#[cfg(test)]
203mod scheduled_recovery_tests;
204#[cfg(test)]
205mod stream_recovery_tests;
206
207pub use adapter::{axum_to_lambda_response, lambda_to_axum_request};
208pub use auth::{AuthorizerMapping, LambdaAuthorizerMiddleware};
209pub use event::{LambdaEvent, classify_event};
210pub use no_streaming::NoStreamingLayer;
211pub use scheduled_recovery::{
212 LambdaScheduledRecoveryConfig, LambdaScheduledRecoveryHandler, LambdaScheduledRecoveryResponse,
213};
214pub use stream_recovery::LambdaStreamRecoveryHandler;
215
216#[cfg(feature = "sqs")]
217pub use durable::{SqsDurableExecutorQueue, drive_sqs_batch};
218
219use std::sync::Arc;
220
221use turul_a2a::error::A2aError;
222use turul_a2a::executor::AgentExecutor;
223use turul_a2a::middleware::{A2aMiddleware, MiddlewareStack};
224use turul_a2a::push::A2aPushDeliveryStore;
225use turul_a2a::router::{AppState, build_router};
226use turul_a2a::server::RuntimeConfig;
227use turul_a2a::storage::{
228 A2aAtomicStore, A2aCancellationSupervisor, A2aEventStore, A2aPushNotificationStorage,
229 A2aTaskStorage,
230};
231use turul_a2a::streaming::TaskEventBroker;
232
/// Builder for Lambda A2A handler.
///
/// Use `.storage(my_storage)` to supply a single backend implementing all storage traits.
/// This satisfies ADR-009's same-backend requirement and enables streaming via durable
/// event store.
///
/// For push-notification delivery, wire `.storage()` with a backend that implements
/// [`A2aPushDeliveryStore`] (all four first-party backends do). The atomic store
/// must also opt in via `with_push_dispatch_enabled(true)` (ADR-013 §4.3); the
/// builder rejects configurations where the flag and the delivery store disagree.
pub struct LambdaA2aServerBuilder {
    /// Agent executor that runs tasks. Required; `build()` errors without it.
    executor: Option<Arc<dyn AgentExecutor>>,
    /// Task storage backend. Required; must share a backend with all other stores (ADR-009).
    task_storage: Option<Arc<dyn A2aTaskStorage>>,
    /// Push-notification configuration storage. Required.
    push_storage: Option<Arc<dyn A2aPushNotificationStorage>>,
    /// Durable event store backing buffered streaming. Required.
    event_store: Option<Arc<dyn A2aEventStore>>,
    /// Atomic store; `build()` requires its `push_dispatch_enabled()` flag to
    /// agree with the presence of `push_delivery_store` (ADR-013 §4.3). Required.
    atomic_store: Option<Arc<dyn A2aAtomicStore>>,
    /// Cancellation supervisor on the same backend as `task_storage`
    /// (ADR-012). Required — no in-memory fallback.
    cancellation_supervisor: Option<Arc<dyn A2aCancellationSupervisor>>,
    /// Optional push-delivery store (ADR-011 §10 / ADR-013); when set,
    /// `build()` constructs a `PushDispatcher`.
    push_delivery_store: Option<Arc<dyn A2aPushDeliveryStore>>,
    /// Middleware entries, applied in registration order via `MiddlewareStack`.
    middleware: Vec<Arc<dyn A2aMiddleware>>,
    /// Runtime tuning; `build()` forces `supports_return_immediately` off
    /// unless a durable executor queue is wired (ADR-018).
    runtime_config: RuntimeConfig,
    /// Optional durable executor queue (ADR-018) enabling
    /// `return_immediately = true` under Lambda.
    durable_executor_queue: Option<Arc<dyn turul_a2a::durable_executor::DurableExecutorQueue>>,
    /// Optional HTTP path prefix to strip (API Gateway stage prefix);
    /// validated in `build()`.
    path_prefix: Option<String>,
}
256
257impl LambdaA2aServerBuilder {
258 pub fn new() -> Self {
259 Self {
260 executor: None,
261 task_storage: None,
262 push_storage: None,
263 event_store: None,
264 atomic_store: None,
265 cancellation_supervisor: None,
266 push_delivery_store: None,
267 middleware: vec![],
268 runtime_config: RuntimeConfig::default(),
269 durable_executor_queue: None,
270 path_prefix: None,
271 }
272 }
273
274 /// Configure an HTTP path prefix to strip before the A2A router
275 /// sees the request. Needed when Lambda sits behind an API Gateway
276 /// whose stage + resource tree is forwarded into the function
277 /// (e.g. REST API `AWS_PROXY` integrations surface
278 /// `/stage/agent/message:send` where the A2A
279 /// router expects `/message:send`).
280 ///
281 /// The prefix must start with `/` and must not end with `/`
282 /// (unless it is exactly `/`, which is a no-op). Segment boundaries
283 /// are respected — a prefix of `/dev` will not strip `/devs/...`.
284 /// Paths that do not start with the prefix pass through unchanged
285 /// (the router will 404 on a genuinely unknown path, which is the
286 /// correct failure mode). Applies to both
287 /// [`LambdaA2aHandler::run_http_only`] and
288 /// [`LambdaA2aHandler::run_http_and_sqs`]. SQS events are
289 /// unaffected.
290 pub fn strip_path_prefix(mut self, prefix: impl Into<String>) -> Self {
291 self.path_prefix = Some(prefix.into());
292 self
293 }
294
295 /// Wire a durable executor queue (ADR-018). When set, the
296 /// `return_immediately = true` path in `core_send_message` enqueues
297 /// a [`turul_a2a::durable_executor::QueuedExecutorJob`] on this
298 /// queue instead of spawning the executor locally, and the
299 /// capability flag `RuntimeConfig::supports_return_immediately` is
300 /// turned **back on** as an implementation detail — the capability
301 /// cannot be claimed without supplying the queue.
302 ///
303 /// Adopters should typically reach for the SQS-specific helper
304 /// [`Self::with_sqs_return_immediately`] instead; this generic
305 /// method exists so adopters can inject in-memory fakes for tests
306 /// or provide non-SQS transports (Kinesis, Step Functions task
307 /// token, self-invoke, etc.).
308 pub fn with_durable_executor(
309 mut self,
310 queue: Arc<dyn turul_a2a::durable_executor::DurableExecutorQueue>,
311 ) -> Self {
312 self.durable_executor_queue = Some(queue);
313 self.runtime_config.supports_return_immediately = true;
314 self
315 }
316
317 /// Wire the AWS SQS durable executor queue (ADR-018). Ergonomic
318 /// helper over [`Self::with_durable_executor`] that constructs a
319 /// [`SqsDurableExecutorQueue`] from a `queue_url` + shared SQS
320 /// client. Requires the `sqs` feature.
321 #[cfg(feature = "sqs")]
322 pub fn with_sqs_return_immediately(
323 self,
324 queue_url: impl Into<String>,
325 sqs_client: Arc<aws_sdk_sqs::Client>,
326 ) -> Self {
327 let queue = Arc::new(SqsDurableExecutorQueue::new(queue_url, sqs_client));
328 self.with_durable_executor(queue)
329 }
330
331 pub fn executor(mut self, exec: impl AgentExecutor + 'static) -> Self {
332 self.executor = Some(Arc::new(exec));
333 self
334 }
335
336 /// Set all storage from a single backend instance.
337 ///
338 /// This is the **preferred** method — a single struct implementing all
339 /// storage traits guarantees the same-backend requirement (ADR-009)
340 /// AND ensures the cancellation supervisor reads the same backend
341 /// the cancel marker is written to. A mismatch here (e.g., DynamoDB
342 /// task storage + in-memory cancellation supervisor) silently
343 /// breaks cross-instance cancellation.
344 ///
345 /// ADR-013 §4.3 errata: `.storage()` wires storage traits only. It does
346 /// **not** auto-register the storage as a push-delivery store, even if
347 /// the backend happens to implement [`A2aPushDeliveryStore`]. To opt in
348 /// to push delivery, call [`Self::push_delivery_store`] explicitly and
349 /// call `with_push_dispatch_enabled(true)` on the storage instance
350 /// before passing it here. Non-push deployments need neither.
351 pub fn storage<S>(mut self, storage: S) -> Self
352 where
353 S: A2aTaskStorage
354 + A2aPushNotificationStorage
355 + A2aEventStore
356 + A2aAtomicStore
357 + A2aCancellationSupervisor
358 + Clone
359 + 'static,
360 {
361 self.task_storage = Some(Arc::new(storage.clone()));
362 self.push_storage = Some(Arc::new(storage.clone()));
363 self.event_store = Some(Arc::new(storage.clone()));
364 self.atomic_store = Some(Arc::new(storage.clone()));
365 self.cancellation_supervisor = Some(Arc::new(storage));
366 self
367 }
368
369 /// Set task storage individually. Must be paired with event_store/atomic_store
370 /// AND `cancellation_supervisor()` for same-backend compliance.
371 pub fn task_storage(mut self, s: impl A2aTaskStorage + 'static) -> Self {
372 self.task_storage = Some(Arc::new(s));
373 self
374 }
375
376 /// Set push notification storage individually.
377 pub fn push_storage(mut self, s: impl A2aPushNotificationStorage + 'static) -> Self {
378 self.push_storage = Some(Arc::new(s));
379 self
380 }
381
382 /// Set event store individually. Must match task_storage backend.
383 pub fn event_store(mut self, s: impl A2aEventStore + 'static) -> Self {
384 self.event_store = Some(Arc::new(s));
385 self
386 }
387
388 /// Set atomic store individually. Must match task_storage backend.
389 pub fn atomic_store(mut self, s: impl A2aAtomicStore + 'static) -> Self {
390 self.atomic_store = Some(Arc::new(s));
391 self
392 }
393
394 /// Set the cancellation supervisor individually. Must match
395 /// task_storage backend — otherwise `:cancel` writes the marker to
396 /// one backend and the supervisor reads from another, silently
397 /// breaking cross-instance cancellation. Prefer `.storage()` for
398 /// unified wiring. Consumed by `core_cancel_task` (ADR-012).
399 pub fn cancellation_supervisor(mut self, s: impl A2aCancellationSupervisor + 'static) -> Self {
400 self.cancellation_supervisor = Some(Arc::new(s));
401 self
402 }
403
404 /// Set the push delivery store individually (ADR-011 §10 / ADR-013).
405 ///
406 /// Prefer `.storage()` for unified wiring. The delivery store MUST
407 /// be on the same backend as `.task_storage(...)` — the builder
408 /// rejects mismatches. Passing a delivery store also requires the
409 /// atomic store to have opted in via `with_push_dispatch_enabled(true)`
410 /// (ADR-013 §4.3).
411 pub fn push_delivery_store(mut self, store: impl A2aPushDeliveryStore + 'static) -> Self {
412 self.push_delivery_store = Some(Arc::new(store));
413 self
414 }
415
416 /// Override the runtime configuration (timeouts, retry budgets,
417 /// push tuning). Defaults to [`RuntimeConfig::default()`]. The
418 /// builder validates push settings (claim expiry vs retry horizon,
419 /// ADR-011 §10.3) when `.push_delivery_store(...)` is wired.
420 pub fn runtime_config(mut self, cfg: RuntimeConfig) -> Self {
421 self.runtime_config = cfg;
422 self
423 }
424
425 pub fn middleware(mut self, mw: Arc<dyn A2aMiddleware>) -> Self {
426 self.middleware.push(mw);
427 self
428 }
429
430 pub fn build(self) -> Result<LambdaA2aHandler, A2aError> {
431 let executor = self
432 .executor
433 .ok_or(A2aError::Internal("executor is required".into()))?;
434 let task_storage = self.task_storage.ok_or(A2aError::Internal(
435 "task_storage is required for Lambda".into(),
436 ))?;
437 let push_storage = self.push_storage.ok_or(A2aError::Internal(
438 "push_storage is required for Lambda".into(),
439 ))?;
440 let event_store: Arc<dyn A2aEventStore> = self.event_store.ok_or(A2aError::Internal(
441 "event_store is required for Lambda (use .storage() for unified backend)".into(),
442 ))?;
443 let atomic_store: Arc<dyn A2aAtomicStore> = self.atomic_store.ok_or(A2aError::Internal(
444 "atomic_store is required for Lambda (use .storage() for unified backend)".into(),
445 ))?;
446 // ADR-012: the cancellation supervisor MUST be supplied. No
447 // InMemoryA2aStorage fallback: that would silently break
448 // cross-instance cancellation by reading markers from a
449 // different backend than they were written to.
450 let cancellation_supervisor: Arc<dyn A2aCancellationSupervisor> =
451 self.cancellation_supervisor.ok_or(A2aError::Internal(
452 "cancellation_supervisor is required for Lambda. Use .storage() for unified \
453 backend wiring, or .cancellation_supervisor(...) alongside the individual \
454 storage setters. Omitting it would silently break cross-instance \
455 cancellation in ADR-012."
456 .into(),
457 ))?;
458
459 // Same-backend enforcement (ADR-009 + ADR-012).
460 let task_backend = task_storage.backend_name();
461 let push_backend = push_storage.backend_name();
462 let event_backend = event_store.backend_name();
463 let atomic_backend = atomic_store.backend_name();
464 let supervisor_backend = cancellation_supervisor.backend_name();
465 if task_backend != push_backend
466 || task_backend != event_backend
467 || task_backend != atomic_backend
468 || task_backend != supervisor_backend
469 {
470 return Err(A2aError::Internal(format!(
471 "Storage backend mismatch: task={task_backend}, push={push_backend}, \
472 event={event_backend}, atomic={atomic_backend}, \
473 cancellation_supervisor={supervisor_backend}. \
474 ADR-009 requires all storage traits to share the same backend. \
475 ADR-012 requires the cancellation supervisor on the same backend \
476 so cross-instance cancel markers are observable. \
477 Use .storage() for unified backend."
478 )));
479 }
480
481 let push_delivery_store = self.push_delivery_store;
482
483 // Push-dispatch consistency (ADR-013 §4.3): the atomic store's
484 // opt-in flag and the presence of `push_delivery_store` MUST
485 // agree. Both-true = push fully wired; both-false = non-push
486 // deployment. Mixed cases are build errors — this mirrors the
487 // main server builder (server/mod.rs).
488 match (
489 push_delivery_store.is_some(),
490 atomic_store.push_dispatch_enabled(),
491 ) {
492 (true, true) | (false, false) => {}
493 (true, false) => {
494 return Err(A2aError::Internal(
495 "push_delivery_store wired but atomic_store.push_dispatch_enabled() \
496 is false. Call .with_push_dispatch_enabled(true) on the backend \
497 storage before passing it to .storage()."
498 .into(),
499 ));
500 }
501 (false, true) => {
502 return Err(A2aError::Internal(
503 "atomic_store.push_dispatch_enabled() is true but no \
504 push_delivery_store is wired. Pending-dispatch markers would be \
505 written with no consumer, imposing load-bearing infra for no \
506 benefit. If you need to populate markers for an external \
507 consumer, open an issue for a distinctly-named opt-in — for now, \
508 this configuration is rejected."
509 .into(),
510 ));
511 }
512 }
513
514 // ADR-017 §Decision Bug 1: the Lambda execution environment
515 // freezes after the HTTP response is flushed, so any
516 // `tokio::spawn`'d executor continuation is opportunistic only
517 // (ADR-013 §4.4). Refuse `SendMessageConfiguration.return_immediately
518 // = true` at the `core_send_message` entry point via this
519 // capability flag instead of silently orphaning the executor.
520 //
521 // ADR-018: if a durable executor queue is wired via
522 // [`Self::with_durable_executor`] (or the SQS helper), the
523 // capability is re-enabled because enqueueing is durable
524 // across the Lambda freeze. The explicit re-enable lives in
525 // the builder method so the capability cannot be claimed
526 // without the mechanism.
527 let mut runtime_config = self.runtime_config;
528 if self.durable_executor_queue.is_none() {
529 runtime_config.supports_return_immediately = false;
530 }
531
532 // Push dispatcher wiring (ADR-011 §10 / ADR-013 §7). When
533 // `push_delivery_store` is present, validate its backend matches,
534 // validate the retry-horizon invariant, and construct the
535 // `PushDispatcher`. Under Lambda the dispatcher runs
536 // opportunistically post-return (ADR-013 §4.4); correctness
537 // comes from the atomic marker + stream/scheduler recovery.
538 let push_dispatcher: Option<Arc<turul_a2a::push::PushDispatcher>> =
539 if let Some(delivery) = push_delivery_store.as_ref() {
540 let push_delivery_backend = delivery.backend_name();
541 if task_backend != push_delivery_backend {
542 return Err(A2aError::Internal(format!(
543 "Storage backend mismatch: task={task_backend}, \
544 push_delivery={push_delivery_backend}. \
545 ADR-009 requires all storage traits to share the same backend."
546 )));
547 }
548
549 // ADR-011 §10.3: claim expiry must exceed the worst-case
550 // retry horizon so a live claim is not mis-classified as
551 // stale mid-retry.
552 let retry_horizon = runtime_config
553 .push_backoff_cap
554 .saturating_mul(runtime_config.push_max_attempts as u32);
555 if runtime_config.push_claim_expiry <= retry_horizon {
556 return Err(A2aError::Internal(format!(
557 "push_claim_expiry ({:?}) must be greater than retry horizon \
558 (push_max_attempts={} * push_backoff_cap={:?} = {:?}). \
559 Raise push_claim_expiry or lower push_max_attempts/push_backoff_cap.",
560 runtime_config.push_claim_expiry,
561 runtime_config.push_max_attempts,
562 runtime_config.push_backoff_cap,
563 retry_horizon
564 )));
565 }
566
567 let mut delivery_cfg = turul_a2a::push::delivery::PushDeliveryConfig::default();
568 delivery_cfg.max_attempts = runtime_config.push_max_attempts as u32;
569 delivery_cfg.backoff_base = runtime_config.push_backoff_base;
570 delivery_cfg.backoff_cap = runtime_config.push_backoff_cap;
571 delivery_cfg.backoff_jitter = runtime_config.push_backoff_jitter;
572 delivery_cfg.request_timeout = runtime_config.push_request_timeout;
573 delivery_cfg.connect_timeout = runtime_config.push_connect_timeout;
574 delivery_cfg.read_timeout = runtime_config.push_read_timeout;
575 delivery_cfg.claim_expiry = runtime_config.push_claim_expiry;
576 delivery_cfg.max_payload_bytes = runtime_config.push_max_payload_bytes;
577 delivery_cfg.allow_insecure_urls = runtime_config.allow_insecure_push_urls;
578
579 let instance_id = format!("a2a-lambda-{}", uuid::Uuid::now_v7());
580 let worker = turul_a2a::push::delivery::PushDeliveryWorker::new(
581 delivery.clone(),
582 delivery_cfg,
583 None,
584 instance_id,
585 )
586 .map_err(|e| A2aError::Internal(format!("push worker build failed: {e}")))?;
587
588 Some(Arc::new(turul_a2a::push::PushDispatcher::new(
589 Arc::new(worker),
590 push_storage.clone(),
591 task_storage.clone(),
592 )))
593 } else {
594 None
595 };
596
597 let state = AppState {
598 executor,
599 task_storage,
600 push_storage,
601 event_store,
602 atomic_store,
603 event_broker: TaskEventBroker::new(),
604 middleware_stack: Arc::new(MiddlewareStack::new(self.middleware)),
605 runtime_config,
606 in_flight: Arc::new(turul_a2a::server::in_flight::InFlightRegistry::new()),
607 cancellation_supervisor,
608 push_delivery_store,
609 push_dispatcher,
610 durable_executor_queue: self.durable_executor_queue,
611 };
612
613 let router = build_router(state.clone());
614
615 let path_prefix = match self.path_prefix {
616 None => None,
617 Some(p) if p.is_empty() || p == "/" => None,
618 Some(p) => {
619 if !p.starts_with('/') {
620 return Err(A2aError::InvalidRequest {
621 message: format!(
622 "LambdaA2aServerBuilder::strip_path_prefix: prefix must start with '/'; got {p:?}"
623 ),
624 });
625 }
626 if p.ends_with('/') {
627 return Err(A2aError::InvalidRequest {
628 message: format!(
629 "LambdaA2aServerBuilder::strip_path_prefix: prefix must not end with '/' (except '/'); got {p:?}"
630 ),
631 });
632 }
633 Some(Arc::from(p))
634 }
635 };
636
637 Ok(LambdaA2aHandler {
638 router,
639 state,
640 path_prefix,
641 })
642 }
643}
644
645impl Default for LambdaA2aServerBuilder {
646 fn default() -> Self {
647 Self::new()
648 }
649}
650
/// Lambda handler wrapping the axum Router.
#[derive(Clone)]
pub struct LambdaA2aHandler {
    /// The assembled A2A router; cloned for each one-shot HTTP dispatch
    /// in `handle`.
    router: axum::Router,
    /// Held for ADR-018 SQS-event dispatch (`handle_sqs`). Always
    /// populated — the SQS handler method is gated behind the `sqs`
    /// feature but the state is carried unconditionally so callers can
    /// reach it for diagnostics or future non-HTTP / non-SQS handlers.
    #[cfg_attr(not(feature = "sqs"), allow(dead_code))]
    state: AppState,
    /// Leading path segment to strip from each incoming HTTP request
    /// before the A2A router sees it. `None` = no stripping. Set via
    /// [`LambdaA2aServerBuilder::strip_path_prefix`].
    path_prefix: Option<Arc<str>>,
}
666
667impl LambdaA2aHandler {
    /// Convenience constructor for the builder — equivalent to
    /// [`LambdaA2aServerBuilder::new`].
    pub fn builder() -> LambdaA2aServerBuilder {
        LambdaA2aServerBuilder::new()
    }
671
672 /// Handle a Lambda HTTP request.
673 pub async fn handle(
674 &self,
675 event: lambda_http::Request,
676 ) -> Result<lambda_http::Response<lambda_http::Body>, lambda_http::Error> {
677 let axum_req = lambda_to_axum_request(event)?;
678 let axum_req = match &self.path_prefix {
679 Some(prefix) => adapter::strip_request_path_prefix(axum_req, prefix),
680 None => axum_req,
681 };
682
683 let axum_resp = tower::ServiceExt::oneshot(self.router.clone(), axum_req)
684 .await
685 .map_err(|e| lambda_http::Error::from(format!("Router error: {e}")))?;
686
687 axum_to_lambda_response(axum_resp).await
688 }
689
690 /// Handle a raw Lambda HTTP event delivered as `serde_json::Value`
691 /// and return a response envelope **matching the inbound event
692 /// shape**:
693 ///
694 /// | Inbound shape detected by `lambda_http::LambdaRequest` | Response envelope |
695 /// |----------------------------------------------------------|------------------------------------|
696 /// | API Gateway v1 (REST) — top-level `httpMethod` + `path` | `ApiGatewayProxyResponse` |
697 /// | API Gateway v2 / Function URL — `requestContext.http` | `ApiGatewayV2httpResponse` |
698 /// | ALB target group — `requestContext.elb` | `AlbTargetGroupResponse` |
699 /// | WebSocket / unrecognised | `Err(...)` |
700 ///
701 /// Mismatching envelopes — most commonly a v2 response sent in
702 /// response to a v1 invocation — cause API Gateway to return
703 /// `502 Bad Gateway` regardless of the body, because the response
704 /// fails the gateway's contract validation. This method ensures
705 /// symmetry: response shape is always picked from the inbound
706 /// shape, never hardcoded.
707 ///
708 /// This is the framework's canonical HTTP envelope conversion
709 /// primitive — useful when a single Lambda function routes among
710 /// HTTP and one or more non-framework triggers (EventBridge,
711 /// DynamoDB streams, custom invoke) and the adopter drives
712 /// [`lambda_runtime::run`] with `serde_json::Value` directly.
713 /// Handles the `LambdaRequest` deserialization, the text-vs-base64
714 /// content-type detection, and the response envelope rebuild.
715 ///
716 /// Body encoding: text for `text/*`, `application/json`,
717 /// `application/xml`, `application/javascript`, or anything with
718 /// `charset=`; base64 otherwise.
719 ///
720 /// Path-prefix stripping (see
721 /// [`LambdaA2aServerBuilder::strip_path_prefix`]) is applied
722 /// because dispatch flows through [`Self::handle`]. Adopters who
723 /// only serve HTTP should continue to use [`Self::run_http_only`]
724 /// or [`LambdaA2aServerBuilder::run`]; this entry point exists for
725 /// composition with adopter-owned third triggers.
726 pub async fn handle_http_event_value(
727 &self,
728 value: serde_json::Value,
729 ) -> Result<serde_json::Value, lambda_runtime::Error> {
730 use base64::Engine;
731 use http_body_util::BodyExt;
732
733 let lambda_req: lambda_http::request::LambdaRequest = serde_json::from_value(value)
734 .map_err(|e| lambda_runtime::Error::from(format!("invalid HTTP event: {e}")))?;
735
736 // Capture the inbound shape so we can emit the matching
737 // response envelope. Lost once we convert to `lambda_http::Request`.
738 let shape = match &lambda_req {
739 lambda_http::request::LambdaRequest::ApiGatewayV1(_) => HttpEventShape::ApiGatewayV1,
740 lambda_http::request::LambdaRequest::ApiGatewayV2(_) => HttpEventShape::ApiGatewayV2,
741 lambda_http::request::LambdaRequest::Alb(_) => HttpEventShape::Alb,
742 lambda_http::request::LambdaRequest::WebSocket(_) => {
743 return Err(lambda_runtime::Error::from(
744 "WebSocket Lambda events are not supported by this adapter",
745 ));
746 }
747 _ => {
748 return Err(lambda_runtime::Error::from(
749 "unsupported Lambda HTTP event variant",
750 ));
751 }
752 };
753
754 let req: lambda_http::Request = lambda_req.into();
755
756 let resp = self
757 .handle(req)
758 .await
759 .map_err(|e| lambda_runtime::Error::from(format!("handler error: {e}")))?;
760 let (parts, body) = resp.into_parts();
761
762 let bytes = body
763 .collect()
764 .await
765 .map_err(|e| lambda_runtime::Error::from(format!("body collect: {e}")))?
766 .to_bytes();
767
768 let ct = parts
769 .headers
770 .get(http::header::CONTENT_TYPE)
771 .and_then(|v| v.to_str().ok())
772 .unwrap_or("application/octet-stream")
773 .to_ascii_lowercase();
774 let is_text = ct.starts_with("text/")
775 || ct.starts_with("application/json")
776 || ct.starts_with("application/xml")
777 || ct.starts_with("application/javascript")
778 || ct.contains("charset=");
779
780 let (body_str, is_base64) = if bytes.is_empty() {
781 (None, false)
782 } else if is_text {
783 match std::str::from_utf8(&bytes) {
784 Ok(s) => (Some(s.to_string()), false),
785 Err(_) => (
786 Some(base64::engine::general_purpose::STANDARD.encode(&bytes)),
787 true,
788 ),
789 }
790 } else {
791 (
792 Some(base64::engine::general_purpose::STANDARD.encode(&bytes)),
793 true,
794 )
795 };
796
797 let status = parts.status.as_u16() as i64;
798 let headers = parts.headers;
799 let body_lambda = body_str.map(aws_lambda_events::encodings::Body::Text);
800
801 match shape {
802 HttpEventShape::ApiGatewayV1 => {
803 let mut api_resp = aws_lambda_events::apigw::ApiGatewayProxyResponse::default();
804 api_resp.status_code = status;
805 api_resp.headers = headers;
806 api_resp.body = body_lambda;
807 api_resp.is_base64_encoded = is_base64;
808 serde_json::to_value(api_resp).map_err(|e| {
809 lambda_runtime::Error::from(format!("serialise APIGW v1 response: {e}"))
810 })
811 }
812 HttpEventShape::ApiGatewayV2 => {
813 let mut api_resp = aws_lambda_events::apigw::ApiGatewayV2httpResponse::default();
814 api_resp.status_code = status;
815 api_resp.headers = headers;
816 api_resp.body = body_lambda;
817 api_resp.is_base64_encoded = is_base64;
818 serde_json::to_value(api_resp).map_err(|e| {
819 lambda_runtime::Error::from(format!("serialise APIGW v2 response: {e}"))
820 })
821 }
822 HttpEventShape::Alb => {
823 let mut api_resp = aws_lambda_events::alb::AlbTargetGroupResponse::default();
824 api_resp.status_code = status;
825 api_resp.headers = headers;
826 api_resp.body = body_lambda;
827 api_resp.is_base64_encoded = is_base64;
828 serde_json::to_value(api_resp).map_err(|e| {
829 lambda_runtime::Error::from(format!("serialise ALB response: {e}"))
830 })
831 }
832 }
833 }
834}
835
/// Inbound HTTP event shape detected by [`LambdaA2aHandler::handle_http_event_value`].
///
/// The response envelope must match the inbound shape — see method
/// docs for the contract.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HttpEventShape {
    /// API Gateway REST (v1 / AWS_PROXY) event — keyed off the
    /// top-level `httpMethod` + `path` with `requestContext.identity`.
    ApiGatewayV1,
    /// API Gateway HTTP API (v2) / Function URL event — keyed off
    /// `requestContext.http.method`.
    ApiGatewayV2,
    /// ALB target-group event — keyed off `requestContext.elb`.
    Alb,
}
846
847impl LambdaA2aHandler {
848 /// Handle an SQS batch from the durable executor queue (ADR-018).
849 ///
850 /// Returns `SqsBatchResponse` whose `batch_item_failures` list
851 /// tells the event source mapping which records to retry.
852 /// Requires `FunctionResponseTypes: ["ReportBatchItemFailures"]`
853 /// on the mapping.
854 ///
855 /// Each record: deserialize envelope → load task → terminal-no-op
856 /// check → cancel-marker direct-CANCELED commit → run executor.
857 /// See `durable::drive_sqs_batch` for the full semantics.
858 #[cfg(feature = "sqs")]
859 pub async fn handle_sqs(
860 &self,
861 event: aws_lambda_events::event::sqs::SqsEvent,
862 ) -> aws_lambda_events::event::sqs::SqsBatchResponse {
863 durable::drive_sqs_batch(&self.state, event).await
864 }
865
866 /// Run this handler as a pure HTTP Lambda. Strict: any non-HTTP
867 /// event shape (SQS, DynamoDB stream, scheduler, …) causes
868 /// `lambda_http::run` to return a deserialization error on
869 /// invocation.
870 ///
871 /// Appropriate for Lambdas whose only trigger is a Function URL,
872 /// API Gateway, or ALB. The request Lambda in the two-Lambda
873 /// durable-executor topology is the canonical caller.
874 pub async fn run_http_only(self) -> Result<(), lambda_runtime::Error> {
875 let handler = Arc::new(self);
876 lambda_http::run(lambda_http::service_fn(move |event| {
877 let h = Arc::clone(&handler);
878 async move { h.handle(event).await }
879 }))
880 .await
881 }
882}
883
884// Default entrypoint when the `sqs` feature is off — `.run()` aliases
885// the HTTP-only runner. With the `sqs` feature enabled, `.run()` lives
886// in `durable.rs` and routes HTTP+SQS via the classifier.
887#[cfg(not(feature = "sqs"))]
888impl LambdaA2aHandler {
889 /// Default Lambda runner. Dispatches based on the event shape the
890 /// binary receives. Without the `sqs` feature this is HTTP-only
891 /// (equivalent to [`LambdaA2aHandler::run_http_only`]).
892 pub async fn run(self) -> Result<(), lambda_runtime::Error> {
893 self.run_http_only().await
894 }
895}
896
897impl LambdaA2aServerBuilder {
898 /// Build and run in one call. Sugar for
899 /// `self.build()?.run().await`. Hides the handler from the adopter —
900 /// use `.build()` + `handler.run_*()` instead when the handler is
901 /// needed for unit tests, custom middleware, or an explicit
902 /// topology runner.
903 pub async fn run(self) -> Result<(), lambda_runtime::Error> {
904 let handler = self.build().map_err(|e| {
905 lambda_runtime::Error::from(format!("LambdaA2aServerBuilder::run: {e}"))
906 })?;
907 handler.run().await
908 }
909}
910
#[cfg(test)]
mod handle_http_event_value_shape_tests {
    //! Regression tests for the event-shape symmetry contract on
    //! [`LambdaA2aHandler::handle_http_event_value`]. The wire-shape
    //! mismatch (v2 response to a v1 invocation) is what causes API
    //! Gateway REST integrations to return `502 Bad Gateway`. These
    //! tests pin the contract: response envelope must always match
    //! the inbound request envelope.

    use super::*;
    use std::sync::Arc;
    use turul_a2a::executor::{AgentExecutor, ExecutionContext};
    use turul_a2a::server::RuntimeConfig;
    use turul_a2a::server::in_flight::InFlightRegistry;
    use turul_a2a::storage::InMemoryA2aStorage;
    use turul_a2a::streaming::TaskEventBroker;
    use turul_a2a_types::{Message, Task};

    /// Executor that marks every task COMPLETED without doing any work.
    struct NoOpExecutor;

    #[async_trait::async_trait]
    impl AgentExecutor for NoOpExecutor {
        async fn execute(
            &self,
            task: &mut Task,
            _msg: &Message,
            _ctx: &ExecutionContext,
        ) -> Result<(), turul_a2a::error::A2aError> {
            let mut proto = task.as_proto().clone();
            proto.status = Some(turul_a2a_proto::TaskStatus {
                state: turul_a2a_proto::TaskState::Completed.into(),
                message: None,
                timestamp: None,
            });
            *task = Task::try_from(proto).unwrap();
            Ok(())
        }
        fn agent_card(&self) -> turul_a2a_proto::AgentCard {
            turul_a2a_proto::AgentCard {
                name: "test-agent".to_string(),
                ..Default::default()
            }
        }
    }

    /// Build a handler over in-memory storage with an optional stripped
    /// path prefix. Single construction point shared by the plain and
    /// stage-prefixed tests so the two setups cannot drift apart.
    fn build_handler_with_prefix(path_prefix: Option<&str>) -> LambdaA2aHandler {
        let s = Arc::new(InMemoryA2aStorage::new());
        let state = AppState {
            executor: Arc::new(NoOpExecutor),
            task_storage: s.clone(),
            push_storage: s.clone(),
            event_store: s.clone(),
            atomic_store: s.clone(),
            event_broker: TaskEventBroker::new(),
            middleware_stack: Arc::new(MiddlewareStack::new(vec![])),
            runtime_config: RuntimeConfig::default(),
            in_flight: Arc::new(InFlightRegistry::new()),
            cancellation_supervisor: s.clone(),
            push_delivery_store: None,
            push_dispatcher: None,
            durable_executor_queue: None,
        };
        let router = build_router(state.clone());
        LambdaA2aHandler {
            router,
            state,
            path_prefix: path_prefix.map(Arc::from),
        }
    }

    /// Handler with no path-prefix stripping — the common case.
    fn build_handler() -> LambdaA2aHandler {
        build_handler_with_prefix(None)
    }

    /// API Gateway v2 / Function URL event. `requestContext.http.method`
    /// is what `LambdaRequest::ApiGatewayV2` keys off.
    fn apigw_v2_agent_card_get() -> serde_json::Value {
        serde_json::json!({
            "version": "2.0",
            "routeKey": "GET /.well-known/agent-card.json",
            "rawPath": "/.well-known/agent-card.json",
            "rawQueryString": "",
            "headers": {"accept": "application/json", "a2a-version": "1.0"},
            "requestContext": {
                "accountId": "000000000000",
                "apiId": "api",
                "domainName": "fake.execute-api",
                "domainPrefix": "fake",
                "http": {
                    "method": "GET",
                    "path": "/.well-known/agent-card.json",
                    "protocol": "HTTP/1.1",
                    "sourceIp": "127.0.0.1",
                    "userAgent": "test"
                },
                "requestId": "rid",
                "routeKey": "GET /.well-known/agent-card.json",
                "stage": "$default",
                "time": "01/Jan/2026:00:00:00 +0000",
                "timeEpoch": 1_735_689_600_000i64
            },
            "isBase64Encoded": false
        })
    }

    /// API Gateway v1 (REST) event. Top-level `httpMethod` + `path` and
    /// `requestContext.identity` (no `requestContext.http`) is what
    /// `LambdaRequest::ApiGatewayV1` keys off. This is the shape REST
    /// API + AWS_PROXY integrations send into the Lambda — the live
    /// fleet topology that 502'd against a hardcoded v2 response.
    ///
    /// With `stage == "$default"`, `lambda_http` does not prepend a
    /// stage prefix — that shape pins the response-shape contract, not
    /// the path-rewriting behaviour. Non-`$default` stages prepend
    /// `/{stage}` to the `requestContext.path`; see the
    /// `strip_path_prefix_*` tests for stage-prefix coverage.
    fn apigw_v1_agent_card_get_with_stage(stage: &str, path: &str) -> serde_json::Value {
        let request_context_path = if stage == "$default" {
            path.to_string()
        } else {
            format!("/{stage}{path}")
        };
        serde_json::json!({
            "resource": "/{proxy+}",
            "path": path,
            "httpMethod": "GET",
            "headers": {"accept": "application/json", "a2a-version": "1.0"},
            "multiValueHeaders": {},
            "queryStringParameters": null,
            "multiValueQueryStringParameters": null,
            "pathParameters": {"proxy": path.trim_start_matches('/').to_string()},
            "stageVariables": null,
            "requestContext": {
                "accountId": "000000000000",
                "apiId": "abc123",
                "domainName": "fake.execute-api",
                "domainPrefix": "fake",
                "extendedRequestId": "xrid",
                "httpMethod": "GET",
                "identity": {
                    "sourceIp": "127.0.0.1",
                    "userAgent": "test"
                },
                "path": request_context_path,
                "protocol": "HTTP/1.1",
                "requestId": "rid",
                "requestTime": "01/Jan/2026:00:00:00 +0000",
                "requestTimeEpoch": 1_735_689_600_000i64,
                "resourceId": "rsrc",
                "resourcePath": "/{proxy+}",
                "stage": stage
            },
            "body": null,
            "isBase64Encoded": false
        })
    }

    fn apigw_v1_agent_card_get() -> serde_json::Value {
        apigw_v1_agent_card_get_with_stage("$default", "/.well-known/agent-card.json")
    }

    /// ALB target-group event. `requestContext.elb` is what
    /// `LambdaRequest::Alb` keys off.
    fn alb_agent_card_get() -> serde_json::Value {
        serde_json::json!({
            "requestContext": {
                "elb": {
                    "targetGroupArn": "arn:aws:elasticloadbalancing:ap-southeast-2:000:targetgroup/fake/00"
                }
            },
            "httpMethod": "GET",
            "path": "/.well-known/agent-card.json",
            "queryStringParameters": {},
            "headers": {"accept": "application/json", "a2a-version": "1.0"},
            "body": "",
            "isBase64Encoded": false
        })
    }

    /// REST v1 inbound MUST produce a v1 response shape. This is the
    /// regression for the live-fleet 502.
    #[tokio::test]
    async fn v1_inbound_event_emits_v1_response_envelope() {
        let handler = build_handler();
        let resp_value = handler
            .handle_http_event_value(apigw_v1_agent_card_get())
            .await
            .expect("handle_http_event_value(v1) must succeed");

        // Parses cleanly as v1.
        let v1: aws_lambda_events::apigw::ApiGatewayProxyResponse =
            serde_json::from_value(resp_value.clone())
                .expect("response must deserialize as ApiGatewayProxyResponse (v1)");
        assert_eq!(v1.status_code, 200);
        match v1.body {
            Some(aws_lambda_events::encodings::Body::Text(s)) => {
                assert!(s.contains("\"name\""), "v1 body shape: {s}");
            }
            other => panic!("expected text body, got {other:?}"),
        }

        // Negative: v1 response JSON must NOT contain the v2-only
        // `cookies` field. Presence of `cookies` is API Gateway REST's
        // 502 trigger.
        assert!(
            resp_value.get("cookies").is_none(),
            "v1 response must not carry v2-only `cookies` field; payload: {resp_value}"
        );
    }

    /// API Gateway v2 / Function URL inbound MUST produce a v2 response
    /// shape (current behaviour, locked in to prevent the inverse
    /// regression).
    #[tokio::test]
    async fn v2_inbound_event_emits_v2_response_envelope() {
        let handler = build_handler();
        let resp_value = handler
            .handle_http_event_value(apigw_v2_agent_card_get())
            .await
            .expect("handle_http_event_value(v2) must succeed");

        let v2: aws_lambda_events::apigw::ApiGatewayV2httpResponse =
            serde_json::from_value(resp_value.clone())
                .expect("response must deserialize as ApiGatewayV2httpResponse (v2)");
        assert_eq!(v2.status_code, 200);
        match v2.body {
            Some(aws_lambda_events::encodings::Body::Text(s)) => {
                assert!(s.contains("\"name\""), "v2 body shape: {s}");
            }
            other => panic!("expected text body, got {other:?}"),
        }
    }

    /// ALB inbound MUST produce an ALB response shape.
    #[tokio::test]
    async fn alb_inbound_event_emits_alb_response_envelope() {
        let handler = build_handler();
        let resp_value = handler
            .handle_http_event_value(alb_agent_card_get())
            .await
            .expect("handle_http_event_value(alb) must succeed");

        let alb: aws_lambda_events::alb::AlbTargetGroupResponse =
            serde_json::from_value(resp_value)
                .expect("response must deserialize as AlbTargetGroupResponse");
        assert_eq!(alb.status_code, 200);
    }

    /// Live-fleet end-to-end regression: REST v1 stage-prefixed event
    /// + `strip_path_prefix("/dev")` on the handler routes through
    /// the agent-card endpoint AND emits a v1 response envelope. This
    /// is the exact scenario the live fleet was 502'ing on.
    #[tokio::test]
    async fn v1_stage_prefix_inbound_with_strip_emits_v1_response() {
        let handler = build_handler_with_prefix(Some("/dev"));

        let event = apigw_v1_agent_card_get_with_stage("dev", "/.well-known/agent-card.json");
        let resp_value = handler
            .handle_http_event_value(event)
            .await
            .expect("v1 stage-prefixed event must succeed under strip_path_prefix");

        let v1: aws_lambda_events::apigw::ApiGatewayProxyResponse =
            serde_json::from_value(resp_value)
                .expect("response must deserialize as ApiGatewayProxyResponse");
        assert_eq!(v1.status_code, 200);
        match v1.body {
            Some(aws_lambda_events::encodings::Body::Text(s)) => {
                assert!(s.contains("\"name\""), "v1 stage-prefix body: {s}");
            }
            other => panic!("expected text body, got {other:?}"),
        }
    }

    /// Cross-shape decoding regression: a v1 response JSON carries
    /// `multiValueHeaders` (REST shape) but no `cookies`; a v2 response
    /// carries `cookies` but no v1-only `multiValueHeaders` semantics.
    /// Confirms the two shapes are wire-distinguishable, which is why
    /// the gateway 502s on a mismatch.
    #[tokio::test]
    async fn v1_response_does_not_match_v2_envelope_strictly() {
        let handler = build_handler();
        let v1_value = handler
            .handle_http_event_value(apigw_v1_agent_card_get())
            .await
            .unwrap();

        // Sanity: v1 response JSON has no v2-only `cookies` field.
        assert!(v1_value.get("cookies").is_none());

        let v2_value = handler
            .handle_http_event_value(apigw_v2_agent_card_get())
            .await
            .unwrap();
        // Sanity: v2 response JSON carries the v2 `cookies` field
        // (default empty array — still present on the wire).
        assert!(
            v2_value.get("cookies").is_some(),
            "v2 response should carry the v2-only cookies field"
        );
    }
}