1use std::sync::Arc;
13use std::time::Duration;
14
15use a2a_protocol_types::agent_card::AgentCard;
16
17use crate::error::ServerResult;
18use crate::executor::AgentExecutor;
19use crate::handler::{HandlerLimits, RequestHandler};
20use crate::interceptor::{ServerInterceptor, ServerInterceptorChain};
21use crate::metrics::{Metrics, NoopMetrics};
22use crate::push::{InMemoryPushConfigStore, PushConfigStore, PushSender};
23use crate::store::{InMemoryTaskStore, TaskStore, TaskStoreConfig};
24use crate::streaming::EventQueueManager;
25use crate::tenant_config::PerTenantConfig;
26use crate::tenant_resolver::TenantResolver;
27
28pub struct RequestHandlerBuilder {
46 executor: Arc<dyn AgentExecutor>,
47 task_store: Option<Arc<dyn TaskStore>>,
48 task_store_config: TaskStoreConfig,
49 push_config_store: Option<Arc<dyn PushConfigStore>>,
50 push_sender: Option<Arc<dyn PushSender>>,
51 interceptors: ServerInterceptorChain,
52 agent_card: Option<AgentCard>,
53 executor_timeout: Option<Duration>,
54 event_queue_capacity: Option<usize>,
55 max_event_size: Option<usize>,
56 max_concurrent_streams: Option<usize>,
57 event_queue_write_timeout: Option<Duration>,
58 metrics: Arc<dyn Metrics>,
59 handler_limits: HandlerLimits,
60 tenant_resolver: Option<Arc<dyn TenantResolver>>,
61 tenant_config: Option<PerTenantConfig>,
62}
63
64impl RequestHandlerBuilder {
65 #[must_use]
69 pub fn new(executor: impl AgentExecutor) -> Self {
70 Self {
71 executor: Arc::new(executor),
72 task_store: None,
73 task_store_config: TaskStoreConfig::default(),
74 push_config_store: None,
75 push_sender: None,
76 interceptors: ServerInterceptorChain::new(),
77 agent_card: None,
78 executor_timeout: None,
79 event_queue_capacity: None,
80 max_event_size: None,
81 max_concurrent_streams: None,
82 event_queue_write_timeout: None,
83 metrics: Arc::new(NoopMetrics),
84 handler_limits: HandlerLimits::default(),
85 tenant_resolver: None,
86 tenant_config: None,
87 }
88 }
89
90 #[must_use]
92 pub fn with_task_store(mut self, store: impl TaskStore + 'static) -> Self {
93 self.task_store = Some(Arc::new(store));
94 self
95 }
96
97 #[must_use]
102 pub fn with_task_store_arc(mut self, store: Arc<dyn TaskStore>) -> Self {
103 self.task_store = Some(store);
104 self
105 }
106
107 #[must_use]
115 pub fn with_task_store_config(mut self, config: TaskStoreConfig) -> Self {
116 debug_assert!(
117 self.task_store.is_none(),
118 "with_task_store_config() called after with_task_store(); \
119 the config will be ignored because a custom store was already set"
120 );
121 self.task_store_config = config;
122 self
123 }
124
125 #[must_use]
127 pub fn with_push_config_store(mut self, store: impl PushConfigStore + 'static) -> Self {
128 self.push_config_store = Some(Arc::new(store));
129 self
130 }
131
132 #[must_use]
134 pub fn with_push_sender(mut self, sender: impl PushSender + 'static) -> Self {
135 self.push_sender = Some(Arc::new(sender));
136 self
137 }
138
139 #[must_use]
141 pub fn with_interceptor(mut self, interceptor: impl ServerInterceptor + 'static) -> Self {
142 self.interceptors.push(Arc::new(interceptor));
143 self
144 }
145
146 #[must_use]
151 pub const fn with_executor_timeout(mut self, timeout: Duration) -> Self {
152 self.executor_timeout = Some(timeout);
153 self
154 }
155
156 #[must_use]
158 pub fn with_agent_card(mut self, card: AgentCard) -> Self {
159 self.agent_card = Some(card);
160 self
161 }
162
163 #[must_use]
168 pub const fn with_event_queue_capacity(mut self, capacity: usize) -> Self {
169 self.event_queue_capacity = Some(capacity);
170 self
171 }
172
173 #[must_use]
178 pub const fn with_max_event_size(mut self, max_event_size: usize) -> Self {
179 self.max_event_size = Some(max_event_size);
180 self
181 }
182
183 #[must_use]
188 pub const fn with_max_concurrent_streams(mut self, max: usize) -> Self {
189 self.max_concurrent_streams = Some(max);
190 self
191 }
192
193 #[must_use]
198 pub const fn with_event_queue_write_timeout(mut self, timeout: Duration) -> Self {
199 self.event_queue_write_timeout = Some(timeout);
200 self
201 }
202
203 #[must_use]
207 pub const fn with_handler_limits(mut self, limits: HandlerLimits) -> Self {
208 self.handler_limits = limits;
209 self
210 }
211
212 #[must_use]
216 pub fn with_metrics(mut self, metrics: impl Metrics + 'static) -> Self {
217 self.metrics = Arc::new(metrics);
218 self
219 }
220
221 #[must_use]
230 pub fn with_tenant_resolver(mut self, resolver: impl TenantResolver) -> Self {
231 self.tenant_resolver = Some(Arc::new(resolver));
232 self
233 }
234
235 #[must_use]
244 pub fn with_tenant_config(mut self, config: PerTenantConfig) -> Self {
245 self.tenant_config = Some(config);
246 self
247 }
248
249 #[allow(clippy::too_many_lines)]
257 pub fn build(self) -> ServerResult<RequestHandler> {
258 if let Some(ref card) = self.agent_card {
260 if card.supported_interfaces.is_empty() {
261 return Err(crate::error::ServerError::InvalidParams(
262 "agent card must have at least one supported interface".into(),
263 ));
264 }
265 }
266
267 if let Some(timeout) = self.executor_timeout {
269 if timeout.is_zero() {
270 return Err(crate::error::ServerError::InvalidParams(
271 "executor timeout must be greater than zero".into(),
272 ));
273 }
274 }
275
276 if self.handler_limits.max_id_length == 0 {
278 return Err(crate::error::ServerError::InvalidParams(
279 "max_id_length must be greater than zero".into(),
280 ));
281 }
282 if self.handler_limits.max_metadata_size == 0 {
283 return Err(crate::error::ServerError::InvalidParams(
284 "max_metadata_size must be greater than zero".into(),
285 ));
286 }
287 if self.handler_limits.push_delivery_timeout.is_zero() {
288 return Err(crate::error::ServerError::InvalidParams(
289 "push_delivery_timeout must be greater than zero".into(),
290 ));
291 }
292
293 Ok(RequestHandler {
294 executor: self.executor,
295 task_store: self.task_store.unwrap_or_else(|| {
296 Arc::new(InMemoryTaskStore::with_config(self.task_store_config))
297 }),
298 push_config_store: self
299 .push_config_store
300 .unwrap_or_else(|| Arc::new(InMemoryPushConfigStore::new())),
301 push_sender: self.push_sender,
302 event_queue_manager: {
303 let mut mgr = self
304 .event_queue_capacity
305 .map_or_else(EventQueueManager::new, EventQueueManager::with_capacity);
306 if let Some(max_size) = self.max_event_size {
307 mgr = mgr.with_max_event_size(max_size);
308 }
309 if let Some(timeout) = self.event_queue_write_timeout {
310 mgr = mgr.with_write_timeout(timeout);
311 }
312 if let Some(max_streams) = self.max_concurrent_streams {
313 mgr = mgr.with_max_concurrent_queues(max_streams);
314 }
315 mgr = mgr.with_metrics(Arc::clone(&self.metrics));
316 mgr
317 },
318 interceptors: self.interceptors,
319 agent_card: self.agent_card,
320 executor_timeout: self.executor_timeout,
321 metrics: self.metrics,
322 limits: self.handler_limits,
323 tenant_resolver: self.tenant_resolver,
324 tenant_config: self.tenant_config,
325 cancellation_tokens: Arc::new(tokio::sync::RwLock::new(
326 std::collections::HashMap::new(),
327 )),
328 context_locks: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
329 })
330 }
331}
332
333impl std::fmt::Debug for RequestHandlerBuilder {
334 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
335 f.debug_struct("RequestHandlerBuilder")
336 .field("executor", &"<dyn AgentExecutor>")
337 .field("task_store", &self.task_store.is_some())
338 .field("task_store_config", &self.task_store_config)
339 .field("push_config_store", &self.push_config_store.is_some())
340 .field("push_sender", &self.push_sender.is_some())
341 .field("interceptors", &self.interceptors)
342 .field("agent_card", &self.agent_card.is_some())
343 .field("executor_timeout", &self.executor_timeout)
344 .field("event_queue_capacity", &self.event_queue_capacity)
345 .field("max_event_size", &self.max_event_size)
346 .field("max_concurrent_streams", &self.max_concurrent_streams)
347 .field("event_queue_write_timeout", &self.event_queue_write_timeout)
348 .field("metrics", &"<dyn Metrics>")
349 .field("handler_limits", &self.handler_limits)
350 .field("tenant_resolver", &self.tenant_resolver.is_some())
351 .field("tenant_config", &self.tenant_config)
352 .finish()
353 }
354}
355
356#[cfg(test)]
357mod tests {
358 use super::*;
359 use crate::agent_executor;
360
361 struct TestExecutor;
362
363 agent_executor!(TestExecutor, |_ctx, _queue| async { Ok(()) });
364
365 #[test]
366 fn builder_defaults_build_ok() {
367 let handler = RequestHandlerBuilder::new(TestExecutor).build();
368 let h = handler.expect("default builder should succeed");
369 assert!(
370 h.tenant_resolver().is_none(),
371 "default builder should have no tenant resolver"
372 );
373 assert!(
374 h.tenant_config().is_none(),
375 "default builder should have no tenant config"
376 );
377 }
378
379 #[test]
380 fn builder_zero_executor_timeout_errors() {
381 let result = RequestHandlerBuilder::new(TestExecutor)
382 .with_executor_timeout(Duration::ZERO)
383 .build();
384 assert!(result.is_err());
385 }
386
387 #[test]
388 fn builder_empty_agent_card_interfaces_errors() {
389 use a2a_protocol_types::{AgentCapabilities, AgentCard};
390
391 let card = AgentCard {
392 url: None,
393 name: "empty".into(),
394 version: "1.0".into(),
395 description: "No interfaces".into(),
396 supported_interfaces: vec![],
397 provider: None,
398 icon_url: None,
399 documentation_url: None,
400 capabilities: AgentCapabilities::none(),
401 security_schemes: None,
402 security_requirements: None,
403 default_input_modes: vec![],
404 default_output_modes: vec![],
405 skills: vec![],
406 signatures: None,
407 };
408
409 let result = RequestHandlerBuilder::new(TestExecutor)
410 .with_agent_card(card)
411 .build();
412 assert!(result.is_err());
413 }
414
415 #[test]
416 fn builder_with_all_options() {
417 use a2a_protocol_types::{AgentCapabilities, AgentCard, AgentInterface};
418
419 let card = AgentCard {
420 url: None,
421 name: "test".into(),
422 version: "1.0".into(),
423 description: "Test agent".into(),
424 supported_interfaces: vec![AgentInterface {
425 url: "http://localhost:8080".into(),
426 protocol_binding: "JSONRPC".into(),
427 protocol_version: "1.0.0".into(),
428 tenant: None,
429 }],
430 provider: None,
431 icon_url: None,
432 documentation_url: None,
433 capabilities: AgentCapabilities::none(),
434 security_schemes: None,
435 security_requirements: None,
436 default_input_modes: vec![],
437 default_output_modes: vec![],
438 skills: vec![],
439 signatures: None,
440 };
441
442 let result = RequestHandlerBuilder::new(TestExecutor)
443 .with_agent_card(card)
444 .with_executor_timeout(Duration::from_secs(30))
445 .with_event_queue_capacity(128)
446 .with_max_event_size(1024 * 1024)
447 .with_max_concurrent_streams(10)
448 .with_handler_limits(HandlerLimits::default().with_max_id_length(2048))
449 .build();
450 let h = result.expect("builder with all options should succeed");
451 assert!(h.tenant_resolver().is_none(), "no tenant resolver set");
452 }
453
454 #[test]
455 fn builder_with_tenant_resolver_and_config() {
456 use crate::tenant_config::{PerTenantConfig, TenantLimits};
457 use crate::tenant_resolver::HeaderTenantResolver;
458
459 let handler = RequestHandlerBuilder::new(TestExecutor)
460 .with_tenant_resolver(HeaderTenantResolver::default())
461 .with_tenant_config(
462 PerTenantConfig::builder()
463 .default_limits(TenantLimits::builder().rate_limit_rps(100).build())
464 .with_override(
465 "premium",
466 TenantLimits::builder().rate_limit_rps(1000).build(),
467 )
468 .build(),
469 )
470 .build();
471 let handler = handler.expect("builder with tenant resolver and config should succeed");
472 assert!(handler.tenant_resolver().is_some());
473 assert!(handler.tenant_config().is_some());
474 assert_eq!(
475 handler
476 .tenant_config()
477 .unwrap()
478 .get("premium")
479 .rate_limit_rps,
480 Some(1000)
481 );
482 assert_eq!(
483 handler
484 .tenant_config()
485 .unwrap()
486 .get("unknown")
487 .rate_limit_rps,
488 Some(100)
489 );
490 }
491
492 #[test]
493 fn builder_without_tenant_fields() {
494 let handler = RequestHandlerBuilder::new(TestExecutor).build().unwrap();
495 assert!(handler.tenant_resolver().is_none());
496 assert!(handler.tenant_config().is_none());
497 }
498
499 #[test]
500 fn builder_debug_does_not_panic() {
501 let builder = RequestHandlerBuilder::new(TestExecutor);
502 let debug = format!("{builder:?}");
503 assert!(debug.contains("RequestHandlerBuilder"));
504 }
505
506 #[test]
507 fn builder_with_push_config_store_builds_ok() {
508 use crate::push::InMemoryPushConfigStore;
509 let result = RequestHandlerBuilder::new(TestExecutor)
510 .with_push_config_store(InMemoryPushConfigStore::new())
511 .build();
512 let _h = result.expect("builder with push config store should succeed");
513 }
514
515 #[test]
516 fn builder_with_event_queue_write_timeout_builds_ok() {
517 let result = RequestHandlerBuilder::new(TestExecutor)
518 .with_event_queue_write_timeout(Duration::from_secs(10))
519 .build();
520 let _h = result.expect("builder with event queue write timeout should succeed");
521 }
522
523 #[test]
524 fn builder_zero_max_id_length_errors() {
525 let result = RequestHandlerBuilder::new(TestExecutor)
526 .with_handler_limits(HandlerLimits::default().with_max_id_length(0))
527 .build();
528 assert!(result.is_err(), "zero max_id_length should be rejected");
529 }
530
531 #[test]
532 fn builder_zero_max_metadata_size_errors() {
533 let result = RequestHandlerBuilder::new(TestExecutor)
534 .with_handler_limits(HandlerLimits::default().with_max_metadata_size(0))
535 .build();
536 assert!(result.is_err(), "zero max_metadata_size should be rejected");
537 }
538
539 #[test]
540 fn builder_zero_push_delivery_timeout_errors() {
541 let result = RequestHandlerBuilder::new(TestExecutor)
542 .with_handler_limits(
543 HandlerLimits::default().with_push_delivery_timeout(Duration::ZERO),
544 )
545 .build();
546 assert!(
547 result.is_err(),
548 "zero push_delivery_timeout should be rejected"
549 );
550 }
551}