// turul_a2a_aws_lambda/lib.rs
//! AWS Lambda adapter for turul-a2a.
//!
//! Thin wrapper: converts Lambda events to axum requests, delegates to the
//! same Router, converts responses back. Per ADR-008/ADR-009:
//! - Authorizer context mapped via synthetic headers with anti-spoofing
//! - Streaming supported via durable event store (D3)
//! - SSE responses are buffered: task executes, events are collected, returned as one response
//! - `POST /message:stream` executes the task within the Lambda invocation and returns all events
//! - `GET /tasks/{id}:subscribe` is for tasks that are **not** in a terminal
//!   state. Within one invocation it emits the initial `Task` snapshot,
//!   replays stored events via `Last-Event-ID`, and closes when the task
//!   reaches a terminal state. Subscribing to an already-terminal task
//!   returns `UnsupportedOperationError` per A2A v1.0 §3.1.6 / ADR-010
//!   §4.3. For retrieving a terminal task's final state use `GetTask`.
//!
//! # Push-notification delivery — external triggers are mandatory
//!
//! Push delivery on Lambda (ADR-013) is architecturally different
//! from the binary server. The request Lambda installed via
//! [`LambdaA2aServerBuilder`] still constructs a `PushDispatcher`
//! when `push_delivery_store` is wired, but any `tokio::spawn`
//! continuation it emits post-return is **opportunistic only** — the
//! Lambda execution environment may be frozen indefinitely between
//! invocations, so nothing can depend on that continuation completing
//! (ADR-013 §4.4).
//!
//! Correctness for push delivery on Lambda is carried by:
//!
//! 1. The atomic pending-dispatch marker written inside the request
//!    Lambda's commit transaction (ADR-013 §4.3 — opt in via
//!    `StorageImpl::with_push_dispatch_enabled(true)`).
//! 2. [`LambdaStreamRecoveryHandler`] — DynamoDB Streams trigger on
//!    `a2a_push_pending_dispatches`. DynamoDB backends only.
//! 3. [`LambdaScheduledRecoveryHandler`] — EventBridge Scheduler
//!    backstop. **Required for all backends**; it is the sole
//!    recovery path for SQLite / PostgreSQL / in-memory deployments.
//!
//! Without at least the scheduled worker, push delivery on Lambda is
//! not durable — a marker written on a cold invocation may never be
//! consumed. The example wiring lives in
//! `examples/lambda-stream-worker` and
//! `examples/lambda-scheduled-worker`.
//!
//! Lambda streaming is request-scoped (not persistent SSE connections). The durable
//! event store ensures events survive across invocations. Clients reconnect with
//! `Last-Event-ID` for continuation.
//!
//! # Cross-instance cancellation
//!
//! Lambda invocations are stateless and short-lived, so the Lambda
//! adapter does **not** run the persistent cross-instance cancel
//! poller that `A2aServer::run()` spawns. Cancellation behaviour on
//! the Lambda adapter:
//!
//! - **Marker writes** — `CancelTask` on a Lambda invocation writes
//!   the cancel marker to the shared backend (DynamoDB / PostgreSQL).
//!   This works.
//! - **Propagation to a live executor on the SAME Lambda invocation** —
//!   works via the same-instance token-trip path in `core_cancel_task`.
//! - **Propagation to a live executor on a DIFFERENT Lambda invocation
//!   (warm container)** — **not currently supported**. There is no
//!   persistent poller to observe markers written by other
//!   invocations. A subsequent invocation whose handler reads the
//!   marker directly may act on it, but that is not a substitute for
//!   the server runtime's live propagation.
//!
//! The builder still **requires** an `A2aCancellationSupervisor`
//! implementation on the same backend so that marker writes reach
//! the correct backend and a future polling-adapter variant can
//! consume them. If a deployment passes a non-matching supervisor,
//! `build()` rejects the configuration.

73mod adapter;
74mod auth;
75mod no_streaming;
76mod scheduled_recovery;
77mod stream_recovery;
78
79#[cfg(feature = "sqs")]
80mod durable;
81
82#[cfg(test)]
83mod builder_tests;
84#[cfg(test)]
85mod scheduled_recovery_tests;
86#[cfg(test)]
87mod stream_recovery_tests;
88
89pub use adapter::{axum_to_lambda_response, lambda_to_axum_request};
90pub use auth::{AuthorizerMapping, LambdaAuthorizerMiddleware};
91pub use no_streaming::NoStreamingLayer;
92pub use scheduled_recovery::{
93 LambdaScheduledRecoveryConfig, LambdaScheduledRecoveryHandler, LambdaScheduledRecoveryResponse,
94};
95pub use stream_recovery::LambdaStreamRecoveryHandler;
96
97#[cfg(feature = "sqs")]
98pub use durable::{LambdaEvent, SqsDurableExecutorQueue, classify_event, drive_sqs_batch};
99
100use std::sync::Arc;
101
102use turul_a2a::error::A2aError;
103use turul_a2a::executor::AgentExecutor;
104use turul_a2a::middleware::{A2aMiddleware, MiddlewareStack};
105use turul_a2a::push::A2aPushDeliveryStore;
106use turul_a2a::router::{AppState, build_router};
107use turul_a2a::server::RuntimeConfig;
108use turul_a2a::storage::{
109 A2aAtomicStore, A2aCancellationSupervisor, A2aEventStore, A2aPushNotificationStorage,
110 A2aTaskStorage,
111};
112use turul_a2a::streaming::TaskEventBroker;
113
/// Builder for Lambda A2A handler.
///
/// Use `.storage(my_storage)` to supply a single backend implementing all storage traits.
/// This satisfies ADR-009's same-backend requirement and enables streaming via durable
/// event store.
///
/// For push-notification delivery, wire `.storage()` with a backend that implements
/// [`A2aPushDeliveryStore`] (all four first-party backends do). The atomic store
/// must also opt in via `with_push_dispatch_enabled(true)` (ADR-013 §4.3); the
/// builder rejects configurations where the flag and the delivery store disagree.
pub struct LambdaA2aServerBuilder {
    // Agent logic driven by the router; required by `build()`.
    executor: Option<Arc<dyn AgentExecutor>>,
    // Storage traits. All must report the same `backend_name()`;
    // `build()` enforces this (ADR-009).
    task_storage: Option<Arc<dyn A2aTaskStorage>>,
    push_storage: Option<Arc<dyn A2aPushNotificationStorage>>,
    event_store: Option<Arc<dyn A2aEventStore>>,
    atomic_store: Option<Arc<dyn A2aAtomicStore>>,
    // Required (ADR-012); must share the task-storage backend so cancel
    // markers are written to and read from the same place.
    cancellation_supervisor: Option<Arc<dyn A2aCancellationSupervisor>>,
    // Optional push-delivery opt-in (ADR-013); its presence must agree
    // with `atomic_store.push_dispatch_enabled()` — `build()` rejects
    // mixed configurations.
    push_delivery_store: Option<Arc<dyn A2aPushDeliveryStore>>,
    // Middleware collected in registration order and handed to
    // `MiddlewareStack` in `build()`.
    middleware: Vec<Arc<dyn A2aMiddleware>>,
    // Timeouts, retry budgets, push tuning; push settings are validated
    // in `build()` when a delivery store is wired.
    runtime_config: RuntimeConfig,
    // ADR-018: durable queue that re-enables `return_immediately`.
    durable_executor_queue: Option<Arc<dyn turul_a2a::durable_executor::DurableExecutorQueue>>,
}
136
137impl LambdaA2aServerBuilder {
138 pub fn new() -> Self {
139 Self {
140 executor: None,
141 task_storage: None,
142 push_storage: None,
143 event_store: None,
144 atomic_store: None,
145 cancellation_supervisor: None,
146 push_delivery_store: None,
147 middleware: vec![],
148 runtime_config: RuntimeConfig::default(),
149 durable_executor_queue: None,
150 }
151 }
152
153 /// Wire a durable executor queue (ADR-018). When set, the
154 /// `return_immediately = true` path in `core_send_message` enqueues
155 /// a [`turul_a2a::durable_executor::QueuedExecutorJob`] on this
156 /// queue instead of spawning the executor locally, and the
157 /// capability flag `RuntimeConfig::supports_return_immediately` is
158 /// turned **back on** as an implementation detail — the capability
159 /// cannot be claimed without supplying the queue.
160 ///
161 /// Adopters should typically reach for the SQS-specific helper
162 /// [`Self::with_sqs_return_immediately`] instead; this generic
163 /// method exists so adopters can inject in-memory fakes for tests
164 /// or provide non-SQS transports (Kinesis, Step Functions task
165 /// token, self-invoke, etc.).
166 pub fn with_durable_executor(
167 mut self,
168 queue: Arc<dyn turul_a2a::durable_executor::DurableExecutorQueue>,
169 ) -> Self {
170 self.durable_executor_queue = Some(queue);
171 self.runtime_config.supports_return_immediately = true;
172 self
173 }
174
175 /// Wire the AWS SQS durable executor queue (ADR-018). Ergonomic
176 /// helper over [`Self::with_durable_executor`] that constructs a
177 /// [`SqsDurableExecutorQueue`] from a `queue_url` + shared SQS
178 /// client. Requires the `sqs` feature.
179 #[cfg(feature = "sqs")]
180 pub fn with_sqs_return_immediately(
181 self,
182 queue_url: impl Into<String>,
183 sqs_client: Arc<aws_sdk_sqs::Client>,
184 ) -> Self {
185 let queue = Arc::new(SqsDurableExecutorQueue::new(queue_url, sqs_client));
186 self.with_durable_executor(queue)
187 }
188
189 pub fn executor(mut self, exec: impl AgentExecutor + 'static) -> Self {
190 self.executor = Some(Arc::new(exec));
191 self
192 }
193
194 /// Set all storage from a single backend instance.
195 ///
196 /// This is the **preferred** method — a single struct implementing all
197 /// storage traits guarantees the same-backend requirement (ADR-009)
198 /// AND ensures the cancellation supervisor reads the same backend
199 /// the cancel marker is written to. A mismatch here (e.g., DynamoDB
200 /// task storage + in-memory cancellation supervisor) silently
201 /// breaks cross-instance cancellation.
202 ///
203 /// ADR-013 §4.3 errata: `.storage()` wires storage traits only. It does
204 /// **not** auto-register the storage as a push-delivery store, even if
205 /// the backend happens to implement [`A2aPushDeliveryStore`]. To opt in
206 /// to push delivery, call [`Self::push_delivery_store`] explicitly and
207 /// call `with_push_dispatch_enabled(true)` on the storage instance
208 /// before passing it here. Non-push deployments need neither.
209 pub fn storage<S>(mut self, storage: S) -> Self
210 where
211 S: A2aTaskStorage
212 + A2aPushNotificationStorage
213 + A2aEventStore
214 + A2aAtomicStore
215 + A2aCancellationSupervisor
216 + Clone
217 + 'static,
218 {
219 self.task_storage = Some(Arc::new(storage.clone()));
220 self.push_storage = Some(Arc::new(storage.clone()));
221 self.event_store = Some(Arc::new(storage.clone()));
222 self.atomic_store = Some(Arc::new(storage.clone()));
223 self.cancellation_supervisor = Some(Arc::new(storage));
224 self
225 }
226
227 /// Set task storage individually. Must be paired with event_store/atomic_store
228 /// AND `cancellation_supervisor()` for same-backend compliance.
229 pub fn task_storage(mut self, s: impl A2aTaskStorage + 'static) -> Self {
230 self.task_storage = Some(Arc::new(s));
231 self
232 }
233
234 /// Set push notification storage individually.
235 pub fn push_storage(mut self, s: impl A2aPushNotificationStorage + 'static) -> Self {
236 self.push_storage = Some(Arc::new(s));
237 self
238 }
239
240 /// Set event store individually. Must match task_storage backend.
241 pub fn event_store(mut self, s: impl A2aEventStore + 'static) -> Self {
242 self.event_store = Some(Arc::new(s));
243 self
244 }
245
246 /// Set atomic store individually. Must match task_storage backend.
247 pub fn atomic_store(mut self, s: impl A2aAtomicStore + 'static) -> Self {
248 self.atomic_store = Some(Arc::new(s));
249 self
250 }
251
252 /// Set the cancellation supervisor individually. Must match
253 /// task_storage backend — otherwise `:cancel` writes the marker to
254 /// one backend and the supervisor reads from another, silently
255 /// breaking cross-instance cancellation. Prefer `.storage()` for
256 /// unified wiring. Consumed by `core_cancel_task` (ADR-012).
257 pub fn cancellation_supervisor(mut self, s: impl A2aCancellationSupervisor + 'static) -> Self {
258 self.cancellation_supervisor = Some(Arc::new(s));
259 self
260 }
261
262 /// Set the push delivery store individually (ADR-011 §10 / ADR-013).
263 ///
264 /// Prefer `.storage()` for unified wiring. The delivery store MUST
265 /// be on the same backend as `.task_storage(...)` — the builder
266 /// rejects mismatches. Passing a delivery store also requires the
267 /// atomic store to have opted in via `with_push_dispatch_enabled(true)`
268 /// (ADR-013 §4.3).
269 pub fn push_delivery_store(mut self, store: impl A2aPushDeliveryStore + 'static) -> Self {
270 self.push_delivery_store = Some(Arc::new(store));
271 self
272 }
273
274 /// Override the runtime configuration (timeouts, retry budgets,
275 /// push tuning). Defaults to [`RuntimeConfig::default()`]. The
276 /// builder validates push settings (claim expiry vs retry horizon,
277 /// ADR-011 §10.3) when `.push_delivery_store(...)` is wired.
278 pub fn runtime_config(mut self, cfg: RuntimeConfig) -> Self {
279 self.runtime_config = cfg;
280 self
281 }
282
283 pub fn middleware(mut self, mw: Arc<dyn A2aMiddleware>) -> Self {
284 self.middleware.push(mw);
285 self
286 }
287
288 pub fn build(self) -> Result<LambdaA2aHandler, A2aError> {
289 let executor = self
290 .executor
291 .ok_or(A2aError::Internal("executor is required".into()))?;
292 let task_storage = self.task_storage.ok_or(A2aError::Internal(
293 "task_storage is required for Lambda".into(),
294 ))?;
295 let push_storage = self.push_storage.ok_or(A2aError::Internal(
296 "push_storage is required for Lambda".into(),
297 ))?;
298 let event_store: Arc<dyn A2aEventStore> = self.event_store.ok_or(A2aError::Internal(
299 "event_store is required for Lambda (use .storage() for unified backend)".into(),
300 ))?;
301 let atomic_store: Arc<dyn A2aAtomicStore> = self.atomic_store.ok_or(A2aError::Internal(
302 "atomic_store is required for Lambda (use .storage() for unified backend)".into(),
303 ))?;
304 // ADR-012: the cancellation supervisor MUST be supplied. No
305 // InMemoryA2aStorage fallback: that would silently break
306 // cross-instance cancellation by reading markers from a
307 // different backend than they were written to.
308 let cancellation_supervisor: Arc<dyn A2aCancellationSupervisor> =
309 self.cancellation_supervisor.ok_or(A2aError::Internal(
310 "cancellation_supervisor is required for Lambda. Use .storage() for unified \
311 backend wiring, or .cancellation_supervisor(...) alongside the individual \
312 storage setters. Omitting it would silently break cross-instance \
313 cancellation in ADR-012."
314 .into(),
315 ))?;
316
317 // Same-backend enforcement (ADR-009 + ADR-012).
318 let task_backend = task_storage.backend_name();
319 let push_backend = push_storage.backend_name();
320 let event_backend = event_store.backend_name();
321 let atomic_backend = atomic_store.backend_name();
322 let supervisor_backend = cancellation_supervisor.backend_name();
323 if task_backend != push_backend
324 || task_backend != event_backend
325 || task_backend != atomic_backend
326 || task_backend != supervisor_backend
327 {
328 return Err(A2aError::Internal(format!(
329 "Storage backend mismatch: task={task_backend}, push={push_backend}, \
330 event={event_backend}, atomic={atomic_backend}, \
331 cancellation_supervisor={supervisor_backend}. \
332 ADR-009 requires all storage traits to share the same backend. \
333 ADR-012 requires the cancellation supervisor on the same backend \
334 so cross-instance cancel markers are observable. \
335 Use .storage() for unified backend."
336 )));
337 }
338
339 let push_delivery_store = self.push_delivery_store;
340
341 // Push-dispatch consistency (ADR-013 §4.3): the atomic store's
342 // opt-in flag and the presence of `push_delivery_store` MUST
343 // agree. Both-true = push fully wired; both-false = non-push
344 // deployment. Mixed cases are build errors — this mirrors the
345 // main server builder (server/mod.rs).
346 match (
347 push_delivery_store.is_some(),
348 atomic_store.push_dispatch_enabled(),
349 ) {
350 (true, true) | (false, false) => {}
351 (true, false) => {
352 return Err(A2aError::Internal(
353 "push_delivery_store wired but atomic_store.push_dispatch_enabled() \
354 is false. Call .with_push_dispatch_enabled(true) on the backend \
355 storage before passing it to .storage()."
356 .into(),
357 ));
358 }
359 (false, true) => {
360 return Err(A2aError::Internal(
361 "atomic_store.push_dispatch_enabled() is true but no \
362 push_delivery_store is wired. Pending-dispatch markers would be \
363 written with no consumer, imposing load-bearing infra for no \
364 benefit. If you need to populate markers for an external \
365 consumer, open an issue for a distinctly-named opt-in — for now, \
366 this configuration is rejected."
367 .into(),
368 ));
369 }
370 }
371
372 // ADR-017 §Decision Bug 1: the Lambda execution environment
373 // freezes after the HTTP response is flushed, so any
374 // `tokio::spawn`'d executor continuation is opportunistic only
375 // (ADR-013 §4.4). Refuse `SendMessageConfiguration.return_immediately
376 // = true` at the `core_send_message` entry point via this
377 // capability flag instead of silently orphaning the executor.
378 //
379 // ADR-018: if a durable executor queue is wired via
380 // [`Self::with_durable_executor`] (or the SQS helper), the
381 // capability is re-enabled because enqueueing is durable
382 // across the Lambda freeze. The explicit re-enable lives in
383 // the builder method so the capability cannot be claimed
384 // without the mechanism.
385 let mut runtime_config = self.runtime_config;
386 if self.durable_executor_queue.is_none() {
387 runtime_config.supports_return_immediately = false;
388 }
389
390 // Push dispatcher wiring (ADR-011 §10 / ADR-013 §7). When
391 // `push_delivery_store` is present, validate its backend matches,
392 // validate the retry-horizon invariant, and construct the
393 // `PushDispatcher`. Under Lambda the dispatcher runs
394 // opportunistically post-return (ADR-013 §4.4); correctness
395 // comes from the atomic marker + stream/scheduler recovery.
396 let push_dispatcher: Option<Arc<turul_a2a::push::PushDispatcher>> =
397 if let Some(delivery) = push_delivery_store.as_ref() {
398 let push_delivery_backend = delivery.backend_name();
399 if task_backend != push_delivery_backend {
400 return Err(A2aError::Internal(format!(
401 "Storage backend mismatch: task={task_backend}, \
402 push_delivery={push_delivery_backend}. \
403 ADR-009 requires all storage traits to share the same backend."
404 )));
405 }
406
407 // ADR-011 §10.3: claim expiry must exceed the worst-case
408 // retry horizon so a live claim is not mis-classified as
409 // stale mid-retry.
410 let retry_horizon = runtime_config
411 .push_backoff_cap
412 .saturating_mul(runtime_config.push_max_attempts as u32);
413 if runtime_config.push_claim_expiry <= retry_horizon {
414 return Err(A2aError::Internal(format!(
415 "push_claim_expiry ({:?}) must be greater than retry horizon \
416 (push_max_attempts={} * push_backoff_cap={:?} = {:?}). \
417 Raise push_claim_expiry or lower push_max_attempts/push_backoff_cap.",
418 runtime_config.push_claim_expiry,
419 runtime_config.push_max_attempts,
420 runtime_config.push_backoff_cap,
421 retry_horizon
422 )));
423 }
424
425 let mut delivery_cfg = turul_a2a::push::delivery::PushDeliveryConfig::default();
426 delivery_cfg.max_attempts = runtime_config.push_max_attempts as u32;
427 delivery_cfg.backoff_base = runtime_config.push_backoff_base;
428 delivery_cfg.backoff_cap = runtime_config.push_backoff_cap;
429 delivery_cfg.backoff_jitter = runtime_config.push_backoff_jitter;
430 delivery_cfg.request_timeout = runtime_config.push_request_timeout;
431 delivery_cfg.connect_timeout = runtime_config.push_connect_timeout;
432 delivery_cfg.read_timeout = runtime_config.push_read_timeout;
433 delivery_cfg.claim_expiry = runtime_config.push_claim_expiry;
434 delivery_cfg.max_payload_bytes = runtime_config.push_max_payload_bytes;
435 delivery_cfg.allow_insecure_urls = runtime_config.allow_insecure_push_urls;
436
437 let instance_id = format!("a2a-lambda-{}", uuid::Uuid::now_v7());
438 let worker = turul_a2a::push::delivery::PushDeliveryWorker::new(
439 delivery.clone(),
440 delivery_cfg,
441 None,
442 instance_id,
443 )
444 .map_err(|e| A2aError::Internal(format!("push worker build failed: {e}")))?;
445
446 Some(Arc::new(turul_a2a::push::PushDispatcher::new(
447 Arc::new(worker),
448 push_storage.clone(),
449 task_storage.clone(),
450 )))
451 } else {
452 None
453 };
454
455 let state = AppState {
456 executor,
457 task_storage,
458 push_storage,
459 event_store,
460 atomic_store,
461 event_broker: TaskEventBroker::new(),
462 middleware_stack: Arc::new(MiddlewareStack::new(self.middleware)),
463 runtime_config,
464 in_flight: Arc::new(turul_a2a::server::in_flight::InFlightRegistry::new()),
465 cancellation_supervisor,
466 push_delivery_store,
467 push_dispatcher,
468 durable_executor_queue: self.durable_executor_queue,
469 };
470
471 let router = build_router(state.clone());
472
473 Ok(LambdaA2aHandler { router, state })
474 }
475}
476
477impl Default for LambdaA2aServerBuilder {
478 fn default() -> Self {
479 Self::new()
480 }
481}
482
/// Lambda handler wrapping the axum Router.
#[derive(Clone)]
pub struct LambdaA2aHandler {
    // Router built once in `build()`; `handle()` clones it per
    // invocation for `oneshot` dispatch.
    router: axum::Router,
    /// Held for ADR-018 SQS-event dispatch (`handle_sqs`). Always
    /// populated — the SQS handler method is gated behind the `sqs`
    /// feature but the state is carried unconditionally so callers can
    /// reach it for diagnostics or future non-HTTP / non-SQS handlers.
    #[cfg_attr(not(feature = "sqs"), allow(dead_code))]
    state: AppState,
}
494
495impl LambdaA2aHandler {
496 pub fn builder() -> LambdaA2aServerBuilder {
497 LambdaA2aServerBuilder::new()
498 }
499
500 /// Handle a Lambda HTTP request.
501 pub async fn handle(
502 &self,
503 event: lambda_http::Request,
504 ) -> Result<lambda_http::Response<lambda_http::Body>, lambda_http::Error> {
505 let axum_req = lambda_to_axum_request(event)?;
506
507 let axum_resp = tower::ServiceExt::oneshot(self.router.clone(), axum_req)
508 .await
509 .map_err(|e| lambda_http::Error::from(format!("Router error: {e}")))?;
510
511 axum_to_lambda_response(axum_resp).await
512 }
513
514 /// Handle an SQS batch from the durable executor queue (ADR-018).
515 ///
516 /// Returns `SqsBatchResponse` whose `batch_item_failures` list
517 /// tells the event source mapping which records to retry.
518 /// Requires `FunctionResponseTypes: ["ReportBatchItemFailures"]`
519 /// on the mapping.
520 ///
521 /// Each record: deserialize envelope → load task → terminal-no-op
522 /// check → cancel-marker direct-CANCELED commit → run executor.
523 /// See `durable::drive_sqs_batch` for the full semantics.
524 #[cfg(feature = "sqs")]
525 pub async fn handle_sqs(
526 &self,
527 event: aws_lambda_events::event::sqs::SqsEvent,
528 ) -> aws_lambda_events::event::sqs::SqsBatchResponse {
529 durable::drive_sqs_batch(&self.state, event).await
530 }
531}