turul_a2a_aws_lambda/
lib.rs1mod adapter;
16mod auth;
17mod no_streaming;
18
19pub use adapter::{lambda_to_axum_request, axum_to_lambda_response};
20pub use auth::{AuthorizerMapping, LambdaAuthorizerMiddleware};
21pub use no_streaming::NoStreamingLayer;
22
23use std::sync::Arc;
24
25use turul_a2a::error::A2aError;
26use turul_a2a::executor::AgentExecutor;
27use turul_a2a::middleware::{A2aMiddleware, MiddlewareStack};
28use turul_a2a::router::{build_router, AppState};
29use turul_a2a::storage::{
30 A2aAtomicStore, A2aEventStore, A2aPushNotificationStorage, A2aTaskStorage,
31};
32use turul_a2a::streaming::TaskEventBroker;
33
34pub struct LambdaA2aServerBuilder {
40 executor: Option<Arc<dyn AgentExecutor>>,
41 task_storage: Option<Arc<dyn A2aTaskStorage>>,
42 push_storage: Option<Arc<dyn A2aPushNotificationStorage>>,
43 event_store: Option<Arc<dyn A2aEventStore>>,
44 atomic_store: Option<Arc<dyn A2aAtomicStore>>,
45 middleware: Vec<Arc<dyn A2aMiddleware>>,
46}
47
48impl LambdaA2aServerBuilder {
49 pub fn new() -> Self {
50 Self {
51 executor: None,
52 task_storage: None,
53 push_storage: None,
54 event_store: None,
55 atomic_store: None,
56 middleware: vec![],
57 }
58 }
59
60 pub fn executor(mut self, exec: impl AgentExecutor + 'static) -> Self {
61 self.executor = Some(Arc::new(exec));
62 self
63 }
64
65 pub fn storage<S>(mut self, storage: S) -> Self
70 where
71 S: A2aTaskStorage
72 + A2aPushNotificationStorage
73 + A2aEventStore
74 + A2aAtomicStore
75 + Clone
76 + 'static,
77 {
78 self.task_storage = Some(Arc::new(storage.clone()));
79 self.push_storage = Some(Arc::new(storage.clone()));
80 self.event_store = Some(Arc::new(storage.clone()));
81 self.atomic_store = Some(Arc::new(storage));
82 self
83 }
84
85 pub fn task_storage(mut self, s: impl A2aTaskStorage + 'static) -> Self {
88 self.task_storage = Some(Arc::new(s));
89 self
90 }
91
92 pub fn push_storage(mut self, s: impl A2aPushNotificationStorage + 'static) -> Self {
94 self.push_storage = Some(Arc::new(s));
95 self
96 }
97
98 pub fn event_store(mut self, s: impl A2aEventStore + 'static) -> Self {
100 self.event_store = Some(Arc::new(s));
101 self
102 }
103
104 pub fn atomic_store(mut self, s: impl A2aAtomicStore + 'static) -> Self {
106 self.atomic_store = Some(Arc::new(s));
107 self
108 }
109
110 pub fn middleware(mut self, mw: Arc<dyn A2aMiddleware>) -> Self {
111 self.middleware.push(mw);
112 self
113 }
114
115 pub fn build(self) -> Result<LambdaA2aHandler, A2aError> {
116 let executor = self.executor.ok_or(A2aError::Internal(
117 "executor is required".into(),
118 ))?;
119 let task_storage = self.task_storage.ok_or(A2aError::Internal(
120 "task_storage is required for Lambda".into(),
121 ))?;
122 let push_storage = self.push_storage.ok_or(A2aError::Internal(
123 "push_storage is required for Lambda".into(),
124 ))?;
125 let event_store: Arc<dyn A2aEventStore> = self.event_store.ok_or(A2aError::Internal(
126 "event_store is required for Lambda (use .storage() for unified backend)".into(),
127 ))?;
128 let atomic_store: Arc<dyn A2aAtomicStore> = self.atomic_store.ok_or(A2aError::Internal(
129 "atomic_store is required for Lambda (use .storage() for unified backend)".into(),
130 ))?;
131
132 let task_backend = task_storage.backend_name();
134 let push_backend = push_storage.backend_name();
135 let event_backend = event_store.backend_name();
136 let atomic_backend = atomic_store.backend_name();
137 if task_backend != push_backend
138 || task_backend != event_backend
139 || task_backend != atomic_backend
140 {
141 return Err(A2aError::Internal(format!(
142 "Storage backend mismatch: task={task_backend}, push={push_backend}, \
143 event={event_backend}, atomic={atomic_backend}. \
144 ADR-009 requires all storage traits to share the same backend. \
145 Use .storage() for unified backend."
146 )));
147 }
148
149 let state = AppState {
150 executor,
151 task_storage,
152 push_storage,
153 event_store,
154 atomic_store,
155 event_broker: TaskEventBroker::new(),
156 middleware_stack: Arc::new(MiddlewareStack::new(self.middleware)),
157 };
158
159 let router = build_router(state);
160
161 Ok(LambdaA2aHandler { router })
162 }
163}
164
165impl Default for LambdaA2aServerBuilder {
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
171#[derive(Clone)]
173pub struct LambdaA2aHandler {
174 router: axum::Router,
175}
176
177impl LambdaA2aHandler {
178 pub fn builder() -> LambdaA2aServerBuilder {
179 LambdaA2aServerBuilder::new()
180 }
181
182 pub async fn handle(
184 &self,
185 event: lambda_http::Request,
186 ) -> Result<lambda_http::Response<lambda_http::Body>, lambda_http::Error> {
187 let axum_req = lambda_to_axum_request(event)?;
188
189 let axum_resp = tower::ServiceExt::oneshot(self.router.clone(), axum_req)
190 .await
191 .map_err(|e| lambda_http::Error::from(format!("Router error: {e}")))?;
192
193 axum_to_lambda_response(axum_resp).await
194 }
195}