a2a_protocol_server/handler/
mod.rs1mod event_processing;
23mod helpers;
24mod lifecycle;
25mod limits;
26mod messaging;
27mod push_config;
28mod shutdown;
29
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33
34use a2a_protocol_types::agent_card::AgentCard;
35use a2a_protocol_types::task::TaskId;
36
37use crate::executor::AgentExecutor;
38use crate::interceptor::ServerInterceptorChain;
39use crate::metrics::Metrics;
40use crate::push::{PushConfigStore, PushSender};
41use crate::store::TaskStore;
42use crate::streaming::{EventQueueManager, InMemoryQueueReader};
43use crate::tenant_config::PerTenantConfig;
44use crate::tenant_resolver::TenantResolver;
45
46pub use limits::HandlerLimits;
47
48pub use a2a_protocol_types::responses::SendMessageResponse;
50
51pub struct RequestHandler {
66 pub(crate) executor: Arc<dyn AgentExecutor>,
67 pub(crate) task_store: Arc<dyn TaskStore>,
68 pub(crate) push_config_store: Arc<dyn PushConfigStore>,
69 pub(crate) push_sender: Option<Arc<dyn PushSender>>,
70 pub(crate) event_queue_manager: EventQueueManager,
71 pub(crate) interceptors: ServerInterceptorChain,
72 pub(crate) agent_card: Option<AgentCard>,
73 pub(crate) executor_timeout: Option<Duration>,
74 pub(crate) metrics: Arc<dyn Metrics>,
75 pub(crate) limits: HandlerLimits,
76 pub(crate) tenant_resolver: Option<Arc<dyn TenantResolver>>,
77 pub(crate) tenant_config: Option<PerTenantConfig>,
78 pub(crate) cancellation_tokens: Arc<tokio::sync::RwLock<HashMap<TaskId, CancellationEntry>>>,
80 pub(crate) context_locks:
84 Arc<tokio::sync::RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>>,
85}
86
87#[derive(Debug, Clone)]
89pub(crate) struct CancellationEntry {
90 pub(crate) token: tokio_util::sync::CancellationToken,
92 pub(crate) created_at: Instant,
94}
95
96impl RequestHandler {
97 #[must_use]
102 pub fn tenant_resolver(&self) -> Option<&dyn TenantResolver> {
103 self.tenant_resolver.as_deref()
104 }
105
106 #[must_use]
111 pub const fn tenant_config(&self) -> Option<&PerTenantConfig> {
112 self.tenant_config.as_ref()
113 }
114}
115
116impl std::fmt::Debug for RequestHandler {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 f.debug_struct("RequestHandler")
119 .field("push_sender", &self.push_sender.is_some())
120 .field("event_queue_manager", &self.event_queue_manager)
121 .field("interceptors", &self.interceptors)
122 .field("agent_card", &self.agent_card.is_some())
123 .field("metrics", &"<dyn Metrics>")
124 .field("tenant_resolver", &self.tenant_resolver.is_some())
125 .field("tenant_config", &self.tenant_config)
126 .finish_non_exhaustive()
127 }
128}
129
130#[derive(Debug)]
132#[allow(clippy::large_enum_variant)]
133pub enum SendMessageResult {
134 Response(SendMessageResponse),
136 Stream(InMemoryQueueReader),
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent_executor;
    use crate::builder::RequestHandlerBuilder;
    use crate::tenant_config::{PerTenantConfig, TenantLimits};
    use crate::tenant_resolver::HeaderTenantResolver;

    /// Executor stub whose body immediately returns `Ok(())`.
    struct DummyExecutor;
    agent_executor!(DummyExecutor, |_ctx, _queue| async { Ok(()) });

    /// A handler built without tenant options exposes no resolver.
    #[test]
    fn default_build_has_no_tenant_resolver() {
        let built = RequestHandlerBuilder::new(DummyExecutor).build();
        let handler = built.expect("default build should succeed");
        let resolver = handler.tenant_resolver();
        assert!(
            resolver.is_none(),
            "default handler should have no tenant resolver"
        );
    }

    /// A handler built without tenant options exposes no tenant config.
    #[test]
    fn default_build_has_no_tenant_config() {
        let built = RequestHandlerBuilder::new(DummyExecutor).build();
        let handler = built.expect("default build should succeed");
        let tenant_cfg = handler.tenant_config();
        assert!(
            tenant_cfg.is_none(),
            "default handler should have no tenant config"
        );
    }

    /// `tenant_resolver()` yields `Some` once a resolver is supplied.
    #[test]
    fn tenant_resolver_returns_some_when_configured() {
        let builder = RequestHandlerBuilder::new(DummyExecutor)
            .with_tenant_resolver(HeaderTenantResolver::default());
        let handler = builder.build().expect("build with tenant resolver");
        let resolver = handler.tenant_resolver();
        assert!(
            resolver.is_some(),
            "should return Some when a resolver was configured"
        );
    }

    /// `tenant_resolver()` yields `None` when no resolver is supplied.
    #[test]
    fn tenant_resolver_returns_none_when_not_configured() {
        let built = RequestHandlerBuilder::new(DummyExecutor).build();
        let handler = built.expect("default build");
        let resolver = handler.tenant_resolver();
        assert!(
            resolver.is_none(),
            "should return None when no resolver was configured"
        );
    }

    /// `tenant_config()` yields `Some` once a config is supplied.
    #[test]
    fn tenant_config_returns_some_when_configured() {
        let default_limits = TenantLimits::builder().rate_limit_rps(50).build();
        let config = PerTenantConfig::builder()
            .default_limits(default_limits)
            .build();

        let builder = RequestHandlerBuilder::new(DummyExecutor).with_tenant_config(config);
        let handler = builder.build().expect("build with tenant config");
        let tenant_cfg = handler.tenant_config();
        assert!(
            tenant_cfg.is_some(),
            "should return Some when tenant config was provided"
        );
    }

    /// `tenant_config()` yields `None` when no config is supplied.
    #[test]
    fn tenant_config_returns_none_when_not_configured() {
        let built = RequestHandlerBuilder::new(DummyExecutor).build();
        let handler = built.expect("default build");
        let tenant_cfg = handler.tenant_config();
        assert!(
            tenant_cfg.is_none(),
            "should return None when no tenant config was provided"
        );
    }

    /// Default limits and a per-tenant override are both readable back
    /// through the handler's config accessor.
    #[test]
    fn tenant_config_preserves_values() {
        let default_limits = TenantLimits::builder().rate_limit_rps(100).build();
        let vip_limits = TenantLimits::builder().rate_limit_rps(500).build();
        let config = PerTenantConfig::builder()
            .default_limits(default_limits)
            .with_override("vip", vip_limits)
            .build();

        let builder = RequestHandlerBuilder::new(DummyExecutor).with_tenant_config(config);
        let handler = builder.build().expect("build with per-tenant overrides");

        let stored = handler.tenant_config().expect("config should be Some");
        // The overridden tenant sees its own limit; any other tenant id gets
        // the default.
        assert_eq!(stored.get("vip").rate_limit_rps, Some(500));
        assert_eq!(stored.get("unknown-tenant").rate_limit_rps, Some(100));
    }

    /// A resolver and a config can be supplied together; both accessors
    /// then yield `Some`.
    #[test]
    fn handler_with_both_tenant_fields() {
        let default_limits = TenantLimits::builder().rate_limit_rps(10).build();
        let config = PerTenantConfig::builder()
            .default_limits(default_limits)
            .build();

        let handler = RequestHandlerBuilder::new(DummyExecutor)
            .with_tenant_resolver(HeaderTenantResolver::default())
            .with_tenant_config(config)
            .build()
            .expect("build with both tenant resolver and config");

        assert!(handler.tenant_resolver().is_some());
        assert!(handler.tenant_config().is_some());
    }

    /// Debug-formatting a default handler succeeds and names the struct.
    #[test]
    fn debug_impl_does_not_panic() {
        let built = RequestHandlerBuilder::new(DummyExecutor).build();
        let handler = built.expect("default build");
        let debug = format!("{handler:?}");
        let names_struct = debug.contains("RequestHandler");
        assert!(names_struct, "Debug output should contain struct name");
    }

    /// Debug output reports resolver presence as a boolean.
    #[test]
    fn debug_shows_tenant_resolver_presence() {
        let without = RequestHandlerBuilder::new(DummyExecutor).build().unwrap();
        let with = RequestHandlerBuilder::new(DummyExecutor)
            .with_tenant_resolver(HeaderTenantResolver::default())
            .build()
            .unwrap();

        let dbg_without = format!("{without:?}");
        let dbg_with = format!("{with:?}");

        let reports_absent = dbg_without.contains("tenant_resolver: false");
        let reports_present = dbg_with.contains("tenant_resolver: true");

        assert!(
            reports_absent,
            "should show false when no resolver: {dbg_without}"
        );
        assert!(
            reports_present,
            "should show true when resolver configured: {dbg_with}"
        );
    }

    /// A task response can be wrapped in the `Response` variant.
    #[test]
    fn send_message_result_response_variant() {
        use a2a_protocol_types::responses::SendMessageResponse;
        use a2a_protocol_types::task::{Task, TaskState, TaskStatus};

        let status = TaskStatus::new(TaskState::Completed);
        let task = Task {
            id: "t1".into(),
            context_id: "c1".into(),
            status,
            artifacts: None,
            history: None,
            metadata: None,
        };
        let response = SendMessageResponse::Task(task);
        let result = SendMessageResult::Response(response);
        assert!(matches!(result, SendMessageResult::Response(_)));
    }
}