// turul_a2a_aws_lambda — lib.rs
//! AWS Lambda adapter for turul-a2a.
//!
//! Thin wrapper: converts Lambda events to axum requests, delegates to the
//! same Router, converts responses back. Per ADR-008/ADR-009:
//! - Authorizer context mapped via synthetic headers with anti-spoofing
//! - Streaming supported via durable event store (D3)
//! - SSE responses are buffered: task executes, events are collected, returned as one response
//! - `POST /message:stream` executes the task within the Lambda invocation and returns all events
//! - `GET /tasks/{id}:subscribe` is for tasks that are **not** in a terminal
//!   state. Within one invocation it emits the initial `Task` snapshot,
//!   replays stored events via `Last-Event-ID`, and closes when the task
//!   reaches a terminal state. Subscribing to an already-terminal task
//!   returns `UnsupportedOperationError` per A2A v1.0 §3.1.6 / ADR-010
//!   §4.3. For retrieving a terminal task's final state use `GetTask`.
//!
//! # Push-notification delivery — external triggers are mandatory
//!
//! Push delivery on Lambda (ADR-013) is architecturally different
//! from the binary server. The request Lambda installed via
//! [`LambdaA2aServerBuilder`] still constructs a `PushDispatcher`
//! when `push_delivery_store` is wired, but any `tokio::spawn`
//! continuation it emits post-return is **opportunistic only** — the
//! Lambda execution environment may be frozen indefinitely between
//! invocations, so nothing can depend on that continuation completing
//! (ADR-013 §4.4).
//!
//! Correctness for push delivery on Lambda is carried by:
//!
//! 1. The atomic pending-dispatch marker written inside the request
//!    Lambda's commit transaction (ADR-013 §4.3 — opt in via
//!    `StorageImpl::with_push_dispatch_enabled(true)`).
//! 2. [`LambdaStreamRecoveryHandler`] — DynamoDB Streams trigger on
//!    `a2a_push_pending_dispatches`. DynamoDB backends only.
//! 3. [`LambdaScheduledRecoveryHandler`] — EventBridge Scheduler
//!    backstop. **Required for all backends**; it is the sole
//!    recovery path for SQLite / PostgreSQL / in-memory deployments.
//!
//! Without at least the scheduled worker, push delivery on Lambda is
//! not durable — a marker written on a cold invocation may never be
//! consumed. The example wiring lives in
//! `examples/lambda-stream-worker` and
//! `examples/lambda-scheduled-worker`.
//!
//! Lambda streaming is request-scoped (not persistent SSE connections). The durable
//! event store ensures events survive across invocations. Clients reconnect with
//! `Last-Event-ID` for continuation.
//!
//! # Cross-instance cancellation
//!
//! Lambda invocations are stateless and short-lived, so the Lambda
//! adapter does **not** run the persistent cross-instance cancel
//! poller that `A2aServer::run()` spawns. Cancellation behaviour on
//! the Lambda adapter:
//!
//! - **Marker writes** — `CancelTask` on a Lambda invocation writes
//!   the cancel marker to the shared backend (DynamoDB / PostgreSQL).
//!   This works.
//! - **Propagation to a live executor on the SAME Lambda invocation** —
//!   works via the same-instance token-trip path in `core_cancel_task`.
//! - **Propagation to a live executor on a DIFFERENT Lambda invocation
//!   (warm container)** — **not currently supported**. There is no
//!   persistent poller to observe markers written by other
//!   invocations. A subsequent invocation whose handler reads the
//!   marker directly may act on it, but that is not a substitute for
//!   the server runtime's live propagation.
//!
//! The builder still **requires** an `A2aCancellationSupervisor`
//! implementation on the same backend so that marker writes reach
//! the correct backend and a future polling-adapter variant can
//! consume them. If a deployment passes a non-matching supervisor,
//! `build()` rejects the configuration.
mod adapter;
mod auth;
mod no_streaming;
mod scheduled_recovery;
mod stream_recovery;

#[cfg(test)]
mod builder_tests;
#[cfg(test)]
mod scheduled_recovery_tests;
#[cfg(test)]
mod stream_recovery_tests;

pub use adapter::{axum_to_lambda_response, lambda_to_axum_request};
pub use auth::{AuthorizerMapping, LambdaAuthorizerMiddleware};
pub use no_streaming::NoStreamingLayer;
pub use scheduled_recovery::{
    LambdaScheduledRecoveryConfig, LambdaScheduledRecoveryHandler, LambdaScheduledRecoveryResponse,
};
pub use stream_recovery::LambdaStreamRecoveryHandler;

use std::sync::Arc;

use turul_a2a::error::A2aError;
use turul_a2a::executor::AgentExecutor;
use turul_a2a::middleware::{A2aMiddleware, MiddlewareStack};
use turul_a2a::push::A2aPushDeliveryStore;
use turul_a2a::router::{AppState, build_router};
use turul_a2a::server::RuntimeConfig;
use turul_a2a::storage::{
    A2aAtomicStore, A2aCancellationSupervisor, A2aEventStore, A2aPushNotificationStorage,
    A2aTaskStorage,
};
use turul_a2a::streaming::TaskEventBroker;
107
/// Builder for Lambda A2A handler.
///
/// Use `.storage(my_storage)` to supply a single backend implementing all storage traits.
/// This satisfies ADR-009's same-backend requirement and enables streaming via durable
/// event store.
///
/// For push-notification delivery, wire `.storage()` with a backend that implements
/// [`A2aPushDeliveryStore`] (all four first-party backends do). The atomic store
/// must also opt in via `with_push_dispatch_enabled(true)` (ADR-013 §4.3); the
/// builder rejects configurations where the flag and the delivery store disagree.
pub struct LambdaA2aServerBuilder {
    // Agent executor that runs task logic. Required; `build()` errors without it.
    executor: Option<Arc<dyn AgentExecutor>>,
    // Task storage backend. Required; must share a backend with all stores below.
    task_storage: Option<Arc<dyn A2aTaskStorage>>,
    // Push-notification *config* storage. Required.
    push_storage: Option<Arc<dyn A2aPushNotificationStorage>>,
    // Durable event store used for request-scoped streaming replay. Required.
    event_store: Option<Arc<dyn A2aEventStore>>,
    // Atomic store; also carries the push-dispatch opt-in flag (ADR-013 §4.3). Required.
    atomic_store: Option<Arc<dyn A2aAtomicStore>>,
    // Cancel-marker supervisor (ADR-012). Required — no in-memory fallback.
    cancellation_supervisor: Option<Arc<dyn A2aCancellationSupervisor>>,
    // Push *delivery* store (ADR-011 §10 / ADR-013). Optional; opts in to push delivery.
    push_delivery_store: Option<Arc<dyn A2aPushDeliveryStore>>,
    // Middleware applied in insertion order via `MiddlewareStack`.
    middleware: Vec<Arc<dyn A2aMiddleware>>,
    // Timeouts / retry budgets / push tuning; defaults to `RuntimeConfig::default()`.
    runtime_config: RuntimeConfig,
}
129
130impl LambdaA2aServerBuilder {
131    pub fn new() -> Self {
132        Self {
133            executor: None,
134            task_storage: None,
135            push_storage: None,
136            event_store: None,
137            atomic_store: None,
138            cancellation_supervisor: None,
139            push_delivery_store: None,
140            middleware: vec![],
141            runtime_config: RuntimeConfig::default(),
142        }
143    }
144
145    pub fn executor(mut self, exec: impl AgentExecutor + 'static) -> Self {
146        self.executor = Some(Arc::new(exec));
147        self
148    }
149
150    /// Set all storage from a single backend instance.
151    ///
152    /// This is the **preferred** method — a single struct implementing all
153    /// storage traits guarantees the same-backend requirement (ADR-009)
154    /// AND ensures the cancellation supervisor reads the same backend
155    /// the cancel marker is written to. A mismatch here (e.g., DynamoDB
156    /// task storage + in-memory cancellation supervisor) silently
157    /// breaks cross-instance cancellation.
158    ///
159    /// ADR-013 §4.3 errata: `.storage()` wires storage traits only. It does
160    /// **not** auto-register the storage as a push-delivery store, even if
161    /// the backend happens to implement [`A2aPushDeliveryStore`]. To opt in
162    /// to push delivery, call [`Self::push_delivery_store`] explicitly and
163    /// call `with_push_dispatch_enabled(true)` on the storage instance
164    /// before passing it here. Non-push deployments need neither.
165    pub fn storage<S>(mut self, storage: S) -> Self
166    where
167        S: A2aTaskStorage
168            + A2aPushNotificationStorage
169            + A2aEventStore
170            + A2aAtomicStore
171            + A2aCancellationSupervisor
172            + Clone
173            + 'static,
174    {
175        self.task_storage = Some(Arc::new(storage.clone()));
176        self.push_storage = Some(Arc::new(storage.clone()));
177        self.event_store = Some(Arc::new(storage.clone()));
178        self.atomic_store = Some(Arc::new(storage.clone()));
179        self.cancellation_supervisor = Some(Arc::new(storage));
180        self
181    }
182
183    /// Set task storage individually. Must be paired with event_store/atomic_store
184    /// AND `cancellation_supervisor()` for same-backend compliance.
185    pub fn task_storage(mut self, s: impl A2aTaskStorage + 'static) -> Self {
186        self.task_storage = Some(Arc::new(s));
187        self
188    }
189
190    /// Set push notification storage individually.
191    pub fn push_storage(mut self, s: impl A2aPushNotificationStorage + 'static) -> Self {
192        self.push_storage = Some(Arc::new(s));
193        self
194    }
195
196    /// Set event store individually. Must match task_storage backend.
197    pub fn event_store(mut self, s: impl A2aEventStore + 'static) -> Self {
198        self.event_store = Some(Arc::new(s));
199        self
200    }
201
202    /// Set atomic store individually. Must match task_storage backend.
203    pub fn atomic_store(mut self, s: impl A2aAtomicStore + 'static) -> Self {
204        self.atomic_store = Some(Arc::new(s));
205        self
206    }
207
208    /// Set the cancellation supervisor individually. Must match
209    /// task_storage backend — otherwise `:cancel` writes the marker to
210    /// one backend and the supervisor reads from another, silently
211    /// breaking cross-instance cancellation. Prefer `.storage()` for
212    /// unified wiring. Consumed by `core_cancel_task` (ADR-012).
213    pub fn cancellation_supervisor(mut self, s: impl A2aCancellationSupervisor + 'static) -> Self {
214        self.cancellation_supervisor = Some(Arc::new(s));
215        self
216    }
217
218    /// Set the push delivery store individually (ADR-011 §10 / ADR-013).
219    ///
220    /// Prefer `.storage()` for unified wiring. The delivery store MUST
221    /// be on the same backend as `.task_storage(...)` — the builder
222    /// rejects mismatches. Passing a delivery store also requires the
223    /// atomic store to have opted in via `with_push_dispatch_enabled(true)`
224    /// (ADR-013 §4.3).
225    pub fn push_delivery_store(mut self, store: impl A2aPushDeliveryStore + 'static) -> Self {
226        self.push_delivery_store = Some(Arc::new(store));
227        self
228    }
229
230    /// Override the runtime configuration (timeouts, retry budgets,
231    /// push tuning). Defaults to [`RuntimeConfig::default()`]. The
232    /// builder validates push settings (claim expiry vs retry horizon,
233    /// ADR-011 §10.3) when `.push_delivery_store(...)` is wired.
234    pub fn runtime_config(mut self, cfg: RuntimeConfig) -> Self {
235        self.runtime_config = cfg;
236        self
237    }
238
239    pub fn middleware(mut self, mw: Arc<dyn A2aMiddleware>) -> Self {
240        self.middleware.push(mw);
241        self
242    }
243
244    pub fn build(self) -> Result<LambdaA2aHandler, A2aError> {
245        let executor = self
246            .executor
247            .ok_or(A2aError::Internal("executor is required".into()))?;
248        let task_storage = self.task_storage.ok_or(A2aError::Internal(
249            "task_storage is required for Lambda".into(),
250        ))?;
251        let push_storage = self.push_storage.ok_or(A2aError::Internal(
252            "push_storage is required for Lambda".into(),
253        ))?;
254        let event_store: Arc<dyn A2aEventStore> = self.event_store.ok_or(A2aError::Internal(
255            "event_store is required for Lambda (use .storage() for unified backend)".into(),
256        ))?;
257        let atomic_store: Arc<dyn A2aAtomicStore> = self.atomic_store.ok_or(A2aError::Internal(
258            "atomic_store is required for Lambda (use .storage() for unified backend)".into(),
259        ))?;
260        // ADR-012: the cancellation supervisor MUST be supplied. No
261        // InMemoryA2aStorage fallback: that would silently break
262        // cross-instance cancellation by reading markers from a
263        // different backend than they were written to.
264        let cancellation_supervisor: Arc<dyn A2aCancellationSupervisor> =
265            self.cancellation_supervisor.ok_or(A2aError::Internal(
266                "cancellation_supervisor is required for Lambda. Use .storage() for unified \
267                 backend wiring, or .cancellation_supervisor(...) alongside the individual \
268                 storage setters. Omitting it would silently break cross-instance \
269                 cancellation in ADR-012."
270                    .into(),
271            ))?;
272
273        // Same-backend enforcement (ADR-009 + ADR-012).
274        let task_backend = task_storage.backend_name();
275        let push_backend = push_storage.backend_name();
276        let event_backend = event_store.backend_name();
277        let atomic_backend = atomic_store.backend_name();
278        let supervisor_backend = cancellation_supervisor.backend_name();
279        if task_backend != push_backend
280            || task_backend != event_backend
281            || task_backend != atomic_backend
282            || task_backend != supervisor_backend
283        {
284            return Err(A2aError::Internal(format!(
285                "Storage backend mismatch: task={task_backend}, push={push_backend}, \
286                 event={event_backend}, atomic={atomic_backend}, \
287                 cancellation_supervisor={supervisor_backend}. \
288                 ADR-009 requires all storage traits to share the same backend. \
289                 ADR-012 requires the cancellation supervisor on the same backend \
290                 so cross-instance cancel markers are observable. \
291                 Use .storage() for unified backend."
292            )));
293        }
294
295        let push_delivery_store = self.push_delivery_store;
296
297        // Push-dispatch consistency (ADR-013 §4.3): the atomic store's
298        // opt-in flag and the presence of `push_delivery_store` MUST
299        // agree. Both-true = push fully wired; both-false = non-push
300        // deployment. Mixed cases are build errors — this mirrors the
301        // main server builder (server/mod.rs).
302        match (
303            push_delivery_store.is_some(),
304            atomic_store.push_dispatch_enabled(),
305        ) {
306            (true, true) | (false, false) => {}
307            (true, false) => {
308                return Err(A2aError::Internal(
309                    "push_delivery_store wired but atomic_store.push_dispatch_enabled() \
310                     is false. Call .with_push_dispatch_enabled(true) on the backend \
311                     storage before passing it to .storage()."
312                        .into(),
313                ));
314            }
315            (false, true) => {
316                return Err(A2aError::Internal(
317                    "atomic_store.push_dispatch_enabled() is true but no \
318                     push_delivery_store is wired. Pending-dispatch markers would be \
319                     written with no consumer, imposing load-bearing infra for no \
320                     benefit. If you need to populate markers for an external \
321                     consumer, open an issue for a distinctly-named opt-in — for now, \
322                     this configuration is rejected."
323                        .into(),
324                ));
325            }
326        }
327
328        let runtime_config = self.runtime_config;
329
330        // Push dispatcher wiring (ADR-011 §10 / ADR-013 §7). When
331        // `push_delivery_store` is present, validate its backend matches,
332        // validate the retry-horizon invariant, and construct the
333        // `PushDispatcher`. Under Lambda the dispatcher runs
334        // opportunistically post-return (ADR-013 §4.4); correctness
335        // comes from the atomic marker + stream/scheduler recovery.
336        let push_dispatcher: Option<Arc<turul_a2a::push::PushDispatcher>> =
337            if let Some(delivery) = push_delivery_store.as_ref() {
338                let push_delivery_backend = delivery.backend_name();
339                if task_backend != push_delivery_backend {
340                    return Err(A2aError::Internal(format!(
341                        "Storage backend mismatch: task={task_backend}, \
342                         push_delivery={push_delivery_backend}. \
343                         ADR-009 requires all storage traits to share the same backend."
344                    )));
345                }
346
347                // ADR-011 §10.3: claim expiry must exceed the worst-case
348                // retry horizon so a live claim is not mis-classified as
349                // stale mid-retry.
350                let retry_horizon = runtime_config
351                    .push_backoff_cap
352                    .saturating_mul(runtime_config.push_max_attempts as u32);
353                if runtime_config.push_claim_expiry <= retry_horizon {
354                    return Err(A2aError::Internal(format!(
355                        "push_claim_expiry ({:?}) must be greater than retry horizon \
356                         (push_max_attempts={} * push_backoff_cap={:?} = {:?}). \
357                         Raise push_claim_expiry or lower push_max_attempts/push_backoff_cap.",
358                        runtime_config.push_claim_expiry,
359                        runtime_config.push_max_attempts,
360                        runtime_config.push_backoff_cap,
361                        retry_horizon
362                    )));
363                }
364
365                let mut delivery_cfg = turul_a2a::push::delivery::PushDeliveryConfig::default();
366                delivery_cfg.max_attempts = runtime_config.push_max_attempts as u32;
367                delivery_cfg.backoff_base = runtime_config.push_backoff_base;
368                delivery_cfg.backoff_cap = runtime_config.push_backoff_cap;
369                delivery_cfg.backoff_jitter = runtime_config.push_backoff_jitter;
370                delivery_cfg.request_timeout = runtime_config.push_request_timeout;
371                delivery_cfg.connect_timeout = runtime_config.push_connect_timeout;
372                delivery_cfg.read_timeout = runtime_config.push_read_timeout;
373                delivery_cfg.claim_expiry = runtime_config.push_claim_expiry;
374                delivery_cfg.max_payload_bytes = runtime_config.push_max_payload_bytes;
375                delivery_cfg.allow_insecure_urls = runtime_config.allow_insecure_push_urls;
376
377                let instance_id = format!("a2a-lambda-{}", uuid::Uuid::now_v7());
378                let worker = turul_a2a::push::delivery::PushDeliveryWorker::new(
379                    delivery.clone(),
380                    delivery_cfg,
381                    None,
382                    instance_id,
383                )
384                .map_err(|e| A2aError::Internal(format!("push worker build failed: {e}")))?;
385
386                Some(Arc::new(turul_a2a::push::PushDispatcher::new(
387                    Arc::new(worker),
388                    push_storage.clone(),
389                    task_storage.clone(),
390                )))
391            } else {
392                None
393            };
394
395        let state = AppState {
396            executor,
397            task_storage,
398            push_storage,
399            event_store,
400            atomic_store,
401            event_broker: TaskEventBroker::new(),
402            middleware_stack: Arc::new(MiddlewareStack::new(self.middleware)),
403            runtime_config,
404            in_flight: Arc::new(turul_a2a::server::in_flight::InFlightRegistry::new()),
405            cancellation_supervisor,
406            push_delivery_store,
407            push_dispatcher,
408        };
409
410        let router = build_router(state);
411
412        Ok(LambdaA2aHandler { router })
413    }
414}
415
416impl Default for LambdaA2aServerBuilder {
417    fn default() -> Self {
418        Self::new()
419    }
420}
421
/// Lambda handler wrapping the axum Router.
///
/// `Clone` so the handler can be captured by the Lambda service closure
/// for each invocation.
#[derive(Clone)]
pub struct LambdaA2aHandler {
    // Fully wired router built by `LambdaA2aServerBuilder::build`; each
    // invocation drives it once (see `handle`).
    router: axum::Router,
}
427
428impl LambdaA2aHandler {
429    pub fn builder() -> LambdaA2aServerBuilder {
430        LambdaA2aServerBuilder::new()
431    }
432
433    /// Handle a Lambda HTTP request.
434    pub async fn handle(
435        &self,
436        event: lambda_http::Request,
437    ) -> Result<lambda_http::Response<lambda_http::Body>, lambda_http::Error> {
438        let axum_req = lambda_to_axum_request(event)?;
439
440        let axum_resp = tower::ServiceExt::oneshot(self.router.clone(), axum_req)
441            .await
442            .map_err(|e| lambda_http::Error::from(format!("Router error: {e}")))?;
443
444        axum_to_lambda_response(axum_resp).await
445    }
446}