Skip to main content

turul_a2a_aws_lambda/
lib.rs

1//! AWS Lambda adapter for turul-a2a.
2//!
3//! Thin wrapper: converts Lambda events to axum requests, delegates to the
4//! same Router, converts responses back. Per ADR-008/ADR-009:
5//! - Authorizer context mapped via synthetic headers with anti-spoofing
6//! - Streaming supported via durable event store (D3)
7//! - SSE responses are buffered: task executes, events are collected, returned as one response
8//! - `POST /message:stream` executes the task within the Lambda invocation and returns all events
9//! - `GET /tasks/{id}:subscribe` replays stored events (works best for terminal tasks)
10//!
11//! Lambda streaming is request-scoped (not persistent SSE connections). The durable
12//! event store ensures events survive across invocations. Clients reconnect with
13//! `Last-Event-ID` for continuation.
14
15mod adapter;
16mod auth;
17mod no_streaming;
18
19pub use adapter::{lambda_to_axum_request, axum_to_lambda_response};
20pub use auth::{AuthorizerMapping, LambdaAuthorizerMiddleware};
21pub use no_streaming::NoStreamingLayer;
22
23use std::sync::Arc;
24
25use turul_a2a::error::A2aError;
26use turul_a2a::executor::AgentExecutor;
27use turul_a2a::middleware::{A2aMiddleware, MiddlewareStack};
28use turul_a2a::router::{build_router, AppState};
29use turul_a2a::storage::{
30    A2aAtomicStore, A2aEventStore, A2aPushNotificationStorage, A2aTaskStorage,
31};
32use turul_a2a::streaming::TaskEventBroker;
33
34/// Builder for Lambda A2A handler.
35///
36/// Use `.storage(my_storage)` to supply a single backend implementing all storage traits.
37/// This satisfies ADR-009's same-backend requirement and enables streaming via durable
38/// event store.
39pub struct LambdaA2aServerBuilder {
40    executor: Option<Arc<dyn AgentExecutor>>,
41    task_storage: Option<Arc<dyn A2aTaskStorage>>,
42    push_storage: Option<Arc<dyn A2aPushNotificationStorage>>,
43    event_store: Option<Arc<dyn A2aEventStore>>,
44    atomic_store: Option<Arc<dyn A2aAtomicStore>>,
45    middleware: Vec<Arc<dyn A2aMiddleware>>,
46}
47
48impl LambdaA2aServerBuilder {
49    pub fn new() -> Self {
50        Self {
51            executor: None,
52            task_storage: None,
53            push_storage: None,
54            event_store: None,
55            atomic_store: None,
56            middleware: vec![],
57        }
58    }
59
60    pub fn executor(mut self, exec: impl AgentExecutor + 'static) -> Self {
61        self.executor = Some(Arc::new(exec));
62        self
63    }
64
65    /// Set all storage from a single backend instance.
66    ///
67    /// This is the **preferred** method — a single struct implementing all storage
68    /// traits guarantees the same-backend requirement (ADR-009).
69    pub fn storage<S>(mut self, storage: S) -> Self
70    where
71        S: A2aTaskStorage
72            + A2aPushNotificationStorage
73            + A2aEventStore
74            + A2aAtomicStore
75            + Clone
76            + 'static,
77    {
78        self.task_storage = Some(Arc::new(storage.clone()));
79        self.push_storage = Some(Arc::new(storage.clone()));
80        self.event_store = Some(Arc::new(storage.clone()));
81        self.atomic_store = Some(Arc::new(storage));
82        self
83    }
84
85    /// Set task storage individually. Must be paired with event_store/atomic_store
86    /// for same-backend compliance.
87    pub fn task_storage(mut self, s: impl A2aTaskStorage + 'static) -> Self {
88        self.task_storage = Some(Arc::new(s));
89        self
90    }
91
92    /// Set push notification storage individually.
93    pub fn push_storage(mut self, s: impl A2aPushNotificationStorage + 'static) -> Self {
94        self.push_storage = Some(Arc::new(s));
95        self
96    }
97
98    /// Set event store individually. Must match task_storage backend.
99    pub fn event_store(mut self, s: impl A2aEventStore + 'static) -> Self {
100        self.event_store = Some(Arc::new(s));
101        self
102    }
103
104    /// Set atomic store individually. Must match task_storage backend.
105    pub fn atomic_store(mut self, s: impl A2aAtomicStore + 'static) -> Self {
106        self.atomic_store = Some(Arc::new(s));
107        self
108    }
109
110    pub fn middleware(mut self, mw: Arc<dyn A2aMiddleware>) -> Self {
111        self.middleware.push(mw);
112        self
113    }
114
115    pub fn build(self) -> Result<LambdaA2aHandler, A2aError> {
116        let executor = self.executor.ok_or(A2aError::Internal(
117            "executor is required".into(),
118        ))?;
119        let task_storage = self.task_storage.ok_or(A2aError::Internal(
120            "task_storage is required for Lambda".into(),
121        ))?;
122        let push_storage = self.push_storage.ok_or(A2aError::Internal(
123            "push_storage is required for Lambda".into(),
124        ))?;
125        let event_store: Arc<dyn A2aEventStore> = self.event_store.ok_or(A2aError::Internal(
126            "event_store is required for Lambda (use .storage() for unified backend)".into(),
127        ))?;
128        let atomic_store: Arc<dyn A2aAtomicStore> = self.atomic_store.ok_or(A2aError::Internal(
129            "atomic_store is required for Lambda (use .storage() for unified backend)".into(),
130        ))?;
131
132        // Same-backend enforcement (ADR-009)
133        let task_backend = task_storage.backend_name();
134        let push_backend = push_storage.backend_name();
135        let event_backend = event_store.backend_name();
136        let atomic_backend = atomic_store.backend_name();
137        if task_backend != push_backend
138            || task_backend != event_backend
139            || task_backend != atomic_backend
140        {
141            return Err(A2aError::Internal(format!(
142                "Storage backend mismatch: task={task_backend}, push={push_backend}, \
143                 event={event_backend}, atomic={atomic_backend}. \
144                 ADR-009 requires all storage traits to share the same backend. \
145                 Use .storage() for unified backend."
146            )));
147        }
148
149        let state = AppState {
150            executor,
151            task_storage,
152            push_storage,
153            event_store,
154            atomic_store,
155            event_broker: TaskEventBroker::new(),
156            middleware_stack: Arc::new(MiddlewareStack::new(self.middleware)),
157        };
158
159        let router = build_router(state);
160
161        Ok(LambdaA2aHandler { router })
162    }
163}
164
165impl Default for LambdaA2aServerBuilder {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
171/// Lambda handler wrapping the axum Router.
172#[derive(Clone)]
173pub struct LambdaA2aHandler {
174    router: axum::Router,
175}
176
177impl LambdaA2aHandler {
178    pub fn builder() -> LambdaA2aServerBuilder {
179        LambdaA2aServerBuilder::new()
180    }
181
182    /// Handle a Lambda HTTP request.
183    pub async fn handle(
184        &self,
185        event: lambda_http::Request,
186    ) -> Result<lambda_http::Response<lambda_http::Body>, lambda_http::Error> {
187        let axum_req = lambda_to_axum_request(event)?;
188
189        let axum_resp = tower::ServiceExt::oneshot(self.router.clone(), axum_req)
190            .await
191            .map_err(|e| lambda_http::Error::from(format!("Router error: {e}")))?;
192
193        axum_to_lambda_response(axum_resp).await
194    }
195}