1use std::sync::Arc;
13use std::time::Duration;
14
15use a2a_protocol_types::agent_card::AgentCard;
16
17use crate::error::ServerResult;
18use crate::executor::AgentExecutor;
19use crate::handler::{HandlerLimits, RequestHandler};
20use crate::interceptor::{ServerInterceptor, ServerInterceptorChain};
21use crate::metrics::{Metrics, NoopMetrics};
22use crate::push::{InMemoryPushConfigStore, PushConfigStore, PushSender};
23use crate::store::{InMemoryTaskStore, TaskStore, TaskStoreConfig};
24use crate::streaming::EventQueueManager;
25use crate::tenant_config::PerTenantConfig;
26use crate::tenant_resolver::TenantResolver;
27
28pub struct RequestHandlerBuilder {
46 executor: Arc<dyn AgentExecutor>,
47 task_store: Option<Arc<dyn TaskStore>>,
48 task_store_config: TaskStoreConfig,
49 push_config_store: Option<Arc<dyn PushConfigStore>>,
50 push_sender: Option<Arc<dyn PushSender>>,
51 interceptors: ServerInterceptorChain,
52 agent_card: Option<AgentCard>,
53 executor_timeout: Option<Duration>,
54 event_queue_capacity: Option<usize>,
55 max_event_size: Option<usize>,
56 max_concurrent_streams: Option<usize>,
57 event_queue_write_timeout: Option<Duration>,
58 metrics: Arc<dyn Metrics>,
59 handler_limits: HandlerLimits,
60 tenant_resolver: Option<Arc<dyn TenantResolver>>,
61 tenant_config: Option<PerTenantConfig>,
62}
63
64impl RequestHandlerBuilder {
65 #[must_use]
69 pub fn new(executor: impl AgentExecutor) -> Self {
70 Self {
71 executor: Arc::new(executor),
72 task_store: None,
73 task_store_config: TaskStoreConfig::default(),
74 push_config_store: None,
75 push_sender: None,
76 interceptors: ServerInterceptorChain::new(),
77 agent_card: None,
78 executor_timeout: None,
79 event_queue_capacity: None,
80 max_event_size: None,
81 max_concurrent_streams: None,
82 event_queue_write_timeout: None,
83 metrics: Arc::new(NoopMetrics),
84 handler_limits: HandlerLimits::default(),
85 tenant_resolver: None,
86 tenant_config: None,
87 }
88 }
89
90 #[must_use]
92 pub fn with_task_store(mut self, store: impl TaskStore + 'static) -> Self {
93 self.task_store = Some(Arc::new(store));
94 self
95 }
96
97 #[must_use]
102 pub fn with_task_store_arc(mut self, store: Arc<dyn TaskStore>) -> Self {
103 self.task_store = Some(store);
104 self
105 }
106
107 #[must_use]
115 pub fn with_task_store_config(mut self, config: TaskStoreConfig) -> Self {
116 debug_assert!(
117 self.task_store.is_none(),
118 "with_task_store_config() called after with_task_store(); \
119 the config will be ignored because a custom store was already set"
120 );
121 self.task_store_config = config;
122 self
123 }
124
125 #[must_use]
127 pub fn with_push_config_store(mut self, store: impl PushConfigStore + 'static) -> Self {
128 self.push_config_store = Some(Arc::new(store));
129 self
130 }
131
132 #[must_use]
134 pub fn with_push_sender(mut self, sender: impl PushSender + 'static) -> Self {
135 self.push_sender = Some(Arc::new(sender));
136 self
137 }
138
139 #[must_use]
141 pub fn with_interceptor(mut self, interceptor: impl ServerInterceptor + 'static) -> Self {
142 self.interceptors.push(Arc::new(interceptor));
143 self
144 }
145
146 #[must_use]
151 pub const fn with_executor_timeout(mut self, timeout: Duration) -> Self {
152 self.executor_timeout = Some(timeout);
153 self
154 }
155
156 #[must_use]
158 pub fn with_agent_card(mut self, card: AgentCard) -> Self {
159 self.agent_card = Some(card);
160 self
161 }
162
163 #[must_use]
171 pub const fn with_event_queue_capacity(mut self, capacity: usize) -> Self {
172 self.event_queue_capacity = Some(capacity);
173 self
174 }
175
176 #[must_use]
181 pub const fn with_max_event_size(mut self, max_event_size: usize) -> Self {
182 self.max_event_size = Some(max_event_size);
183 self
184 }
185
186 #[must_use]
191 pub const fn with_max_concurrent_streams(mut self, max: usize) -> Self {
192 self.max_concurrent_streams = Some(max);
193 self
194 }
195
196 #[must_use]
201 pub const fn with_event_queue_write_timeout(mut self, timeout: Duration) -> Self {
202 self.event_queue_write_timeout = Some(timeout);
203 self
204 }
205
206 #[must_use]
210 pub const fn with_handler_limits(mut self, limits: HandlerLimits) -> Self {
211 self.handler_limits = limits;
212 self
213 }
214
215 #[must_use]
219 pub fn with_metrics(mut self, metrics: impl Metrics + 'static) -> Self {
220 self.metrics = Arc::new(metrics);
221 self
222 }
223
224 #[must_use]
233 pub fn with_tenant_resolver(mut self, resolver: impl TenantResolver) -> Self {
234 self.tenant_resolver = Some(Arc::new(resolver));
235 self
236 }
237
238 #[must_use]
247 pub fn with_tenant_config(mut self, config: PerTenantConfig) -> Self {
248 self.tenant_config = Some(config);
249 self
250 }
251
252 #[allow(clippy::too_many_lines)]
260 pub fn build(self) -> ServerResult<RequestHandler> {
261 if let Some(ref card) = self.agent_card {
263 if card.supported_interfaces.is_empty() {
264 return Err(crate::error::ServerError::InvalidParams(
265 "agent card must have at least one supported interface".into(),
266 ));
267 }
268 }
269
270 if let Some(timeout) = self.executor_timeout {
272 if timeout.is_zero() {
273 return Err(crate::error::ServerError::InvalidParams(
274 "executor timeout must be greater than zero".into(),
275 ));
276 }
277 }
278
279 if self.handler_limits.max_id_length == 0 {
281 return Err(crate::error::ServerError::InvalidParams(
282 "max_id_length must be greater than zero".into(),
283 ));
284 }
285 if self.handler_limits.max_metadata_size == 0 {
286 return Err(crate::error::ServerError::InvalidParams(
287 "max_metadata_size must be greater than zero".into(),
288 ));
289 }
290 if self.handler_limits.push_delivery_timeout.is_zero() {
291 return Err(crate::error::ServerError::InvalidParams(
292 "push_delivery_timeout must be greater than zero".into(),
293 ));
294 }
295
296 Ok(RequestHandler {
297 executor: self.executor,
298 task_store: self.task_store.unwrap_or_else(|| {
299 Arc::new(InMemoryTaskStore::with_config(self.task_store_config))
300 }),
301 push_config_store: self
302 .push_config_store
303 .unwrap_or_else(|| Arc::new(InMemoryPushConfigStore::new())),
304 push_sender: self.push_sender,
305 event_queue_manager: {
306 let mut mgr = self
307 .event_queue_capacity
308 .map_or_else(EventQueueManager::new, EventQueueManager::with_capacity);
309 if let Some(max_size) = self.max_event_size {
310 mgr = mgr.with_max_event_size(max_size);
311 }
312 if let Some(timeout) = self.event_queue_write_timeout {
313 mgr = mgr.with_write_timeout(timeout);
314 }
315 if let Some(max_streams) = self.max_concurrent_streams {
316 mgr = mgr.with_max_concurrent_queues(max_streams);
317 }
318 mgr = mgr.with_metrics(Arc::clone(&self.metrics));
319 mgr
320 },
321 interceptors: self.interceptors,
322 agent_card: self.agent_card,
323 executor_timeout: self.executor_timeout,
324 metrics: self.metrics,
325 limits: self.handler_limits,
326 tenant_resolver: self.tenant_resolver,
327 tenant_config: self.tenant_config,
328 cancellation_tokens: Arc::new(tokio::sync::RwLock::new(
329 std::collections::HashMap::new(),
330 )),
331 context_locks: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
332 })
333 }
334}
335
336impl std::fmt::Debug for RequestHandlerBuilder {
337 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
338 f.debug_struct("RequestHandlerBuilder")
339 .field("executor", &"<dyn AgentExecutor>")
340 .field("task_store", &self.task_store.is_some())
341 .field("task_store_config", &self.task_store_config)
342 .field("push_config_store", &self.push_config_store.is_some())
343 .field("push_sender", &self.push_sender.is_some())
344 .field("interceptors", &self.interceptors)
345 .field("agent_card", &self.agent_card.is_some())
346 .field("executor_timeout", &self.executor_timeout)
347 .field("event_queue_capacity", &self.event_queue_capacity)
348 .field("max_event_size", &self.max_event_size)
349 .field("max_concurrent_streams", &self.max_concurrent_streams)
350 .field("event_queue_write_timeout", &self.event_queue_write_timeout)
351 .field("metrics", &"<dyn Metrics>")
352 .field("handler_limits", &self.handler_limits)
353 .field("tenant_resolver", &self.tenant_resolver.is_some())
354 .field("tenant_config", &self.tenant_config)
355 .finish()
356 }
357}
358
359#[cfg(test)]
360mod tests {
361 use super::*;
362 use crate::agent_executor;
363
364 struct TestExecutor;
365
366 agent_executor!(TestExecutor, |_ctx, _queue| async { Ok(()) });
367
368 #[test]
369 fn builder_defaults_build_ok() {
370 let handler = RequestHandlerBuilder::new(TestExecutor).build();
371 let h = handler.expect("default builder should succeed");
372 assert!(
373 h.tenant_resolver().is_none(),
374 "default builder should have no tenant resolver"
375 );
376 assert!(
377 h.tenant_config().is_none(),
378 "default builder should have no tenant config"
379 );
380 }
381
382 #[test]
383 fn builder_zero_executor_timeout_errors() {
384 let result = RequestHandlerBuilder::new(TestExecutor)
385 .with_executor_timeout(Duration::ZERO)
386 .build();
387 assert!(result.is_err());
388 }
389
390 #[test]
391 fn builder_empty_agent_card_interfaces_errors() {
392 use a2a_protocol_types::{AgentCapabilities, AgentCard};
393
394 let card = AgentCard {
395 url: None,
396 name: "empty".into(),
397 version: "1.0".into(),
398 description: "No interfaces".into(),
399 supported_interfaces: vec![],
400 provider: None,
401 icon_url: None,
402 documentation_url: None,
403 capabilities: AgentCapabilities::none(),
404 security_schemes: None,
405 security_requirements: None,
406 default_input_modes: vec![],
407 default_output_modes: vec![],
408 skills: vec![],
409 signatures: None,
410 };
411
412 let result = RequestHandlerBuilder::new(TestExecutor)
413 .with_agent_card(card)
414 .build();
415 assert!(result.is_err());
416 }
417
418 #[test]
419 fn builder_with_all_options() {
420 use a2a_protocol_types::{AgentCapabilities, AgentCard, AgentInterface};
421
422 let card = AgentCard {
423 url: None,
424 name: "test".into(),
425 version: "1.0".into(),
426 description: "Test agent".into(),
427 supported_interfaces: vec![AgentInterface {
428 url: "http://localhost:8080".into(),
429 protocol_binding: "JSONRPC".into(),
430 protocol_version: "1.0.0".into(),
431 tenant: None,
432 }],
433 provider: None,
434 icon_url: None,
435 documentation_url: None,
436 capabilities: AgentCapabilities::none(),
437 security_schemes: None,
438 security_requirements: None,
439 default_input_modes: vec![],
440 default_output_modes: vec![],
441 skills: vec![],
442 signatures: None,
443 };
444
445 let result = RequestHandlerBuilder::new(TestExecutor)
446 .with_agent_card(card)
447 .with_executor_timeout(Duration::from_secs(30))
448 .with_event_queue_capacity(128)
449 .with_max_event_size(1024 * 1024)
450 .with_max_concurrent_streams(10)
451 .with_handler_limits(HandlerLimits::default().with_max_id_length(2048))
452 .build();
453 let h = result.expect("builder with all options should succeed");
454 assert!(h.tenant_resolver().is_none(), "no tenant resolver set");
455 }
456
457 #[test]
458 fn builder_with_tenant_resolver_and_config() {
459 use crate::tenant_config::{PerTenantConfig, TenantLimits};
460 use crate::tenant_resolver::HeaderTenantResolver;
461
462 let handler = RequestHandlerBuilder::new(TestExecutor)
463 .with_tenant_resolver(HeaderTenantResolver::default())
464 .with_tenant_config(
465 PerTenantConfig::builder()
466 .default_limits(TenantLimits::builder().rate_limit_rps(100).build())
467 .with_override(
468 "premium",
469 TenantLimits::builder().rate_limit_rps(1000).build(),
470 )
471 .build(),
472 )
473 .build();
474 let handler = handler.expect("builder with tenant resolver and config should succeed");
475 assert!(handler.tenant_resolver().is_some());
476 assert!(handler.tenant_config().is_some());
477 assert_eq!(
478 handler
479 .tenant_config()
480 .unwrap()
481 .get("premium")
482 .rate_limit_rps,
483 Some(1000)
484 );
485 assert_eq!(
486 handler
487 .tenant_config()
488 .unwrap()
489 .get("unknown")
490 .rate_limit_rps,
491 Some(100)
492 );
493 }
494
495 #[test]
496 fn builder_without_tenant_fields() {
497 let handler = RequestHandlerBuilder::new(TestExecutor).build().unwrap();
498 assert!(handler.tenant_resolver().is_none());
499 assert!(handler.tenant_config().is_none());
500 }
501
502 #[test]
503 fn builder_debug_does_not_panic() {
504 let builder = RequestHandlerBuilder::new(TestExecutor);
505 let debug = format!("{builder:?}");
506 assert!(debug.contains("RequestHandlerBuilder"));
507 }
508
509 #[test]
510 fn builder_with_push_config_store_builds_ok() {
511 use crate::push::InMemoryPushConfigStore;
512 let result = RequestHandlerBuilder::new(TestExecutor)
513 .with_push_config_store(InMemoryPushConfigStore::new())
514 .build();
515 let _h = result.expect("builder with push config store should succeed");
516 }
517
518 #[test]
519 fn builder_with_event_queue_write_timeout_builds_ok() {
520 let result = RequestHandlerBuilder::new(TestExecutor)
521 .with_event_queue_write_timeout(Duration::from_secs(10))
522 .build();
523 let _h = result.expect("builder with event queue write timeout should succeed");
524 }
525
526 #[test]
527 fn builder_zero_max_id_length_errors() {
528 let result = RequestHandlerBuilder::new(TestExecutor)
529 .with_handler_limits(HandlerLimits::default().with_max_id_length(0))
530 .build();
531 assert!(result.is_err(), "zero max_id_length should be rejected");
532 }
533
534 #[test]
535 fn builder_zero_max_metadata_size_errors() {
536 let result = RequestHandlerBuilder::new(TestExecutor)
537 .with_handler_limits(HandlerLimits::default().with_max_metadata_size(0))
538 .build();
539 assert!(result.is_err(), "zero max_metadata_size should be rejected");
540 }
541
542 #[test]
543 fn builder_zero_push_delivery_timeout_errors() {
544 let result = RequestHandlerBuilder::new(TestExecutor)
545 .with_handler_limits(
546 HandlerLimits::default().with_push_delivery_timeout(Duration::ZERO),
547 )
548 .build();
549 assert!(
550 result.is_err(),
551 "zero push_delivery_timeout should be rejected"
552 );
553 }
554}