a2a_protocol_server/
builder.rs1use std::sync::Arc;
11use std::time::Duration;
12
13use a2a_protocol_types::agent_card::AgentCard;
14
15use crate::error::ServerResult;
16use crate::executor::AgentExecutor;
17use crate::handler::RequestHandler;
18use crate::interceptor::{ServerInterceptor, ServerInterceptorChain};
19use crate::metrics::{Metrics, NoopMetrics};
20use crate::push::{InMemoryPushConfigStore, PushConfigStore, PushSender};
21use crate::store::{InMemoryTaskStore, TaskStore, TaskStoreConfig};
22use crate::streaming::EventQueueManager;
23
24pub struct RequestHandlerBuilder {
40 executor: Arc<dyn AgentExecutor>,
41 task_store: Option<Box<dyn TaskStore>>,
42 task_store_config: TaskStoreConfig,
43 push_config_store: Option<Box<dyn PushConfigStore>>,
44 push_sender: Option<Box<dyn PushSender>>,
45 interceptors: ServerInterceptorChain,
46 agent_card: Option<AgentCard>,
47 executor_timeout: Option<Duration>,
48 event_queue_capacity: Option<usize>,
49 max_event_size: Option<usize>,
50 max_concurrent_streams: Option<usize>,
51 metrics: Box<dyn Metrics>,
52}
53
54impl RequestHandlerBuilder {
55 #[must_use]
59 pub fn new(executor: impl AgentExecutor) -> Self {
60 Self {
61 executor: Arc::new(executor),
62 task_store: None,
63 task_store_config: TaskStoreConfig::default(),
64 push_config_store: None,
65 push_sender: None,
66 interceptors: ServerInterceptorChain::new(),
67 agent_card: None,
68 executor_timeout: None,
69 event_queue_capacity: None,
70 max_event_size: None,
71 max_concurrent_streams: None,
72 metrics: Box::new(NoopMetrics),
73 }
74 }
75
76 #[must_use]
78 pub fn with_task_store(mut self, store: impl TaskStore + 'static) -> Self {
79 self.task_store = Some(Box::new(store));
80 self
81 }
82
83 #[must_use]
87 pub const fn with_task_store_config(mut self, config: TaskStoreConfig) -> Self {
88 self.task_store_config = config;
89 self
90 }
91
92 #[must_use]
94 pub fn with_push_config_store(mut self, store: impl PushConfigStore + 'static) -> Self {
95 self.push_config_store = Some(Box::new(store));
96 self
97 }
98
99 #[must_use]
101 pub fn with_push_sender(mut self, sender: impl PushSender + 'static) -> Self {
102 self.push_sender = Some(Box::new(sender));
103 self
104 }
105
106 #[must_use]
108 pub fn with_interceptor(mut self, interceptor: impl ServerInterceptor + 'static) -> Self {
109 self.interceptors.push(Arc::new(interceptor));
110 self
111 }
112
113 #[must_use]
118 pub const fn with_executor_timeout(mut self, timeout: Duration) -> Self {
119 self.executor_timeout = Some(timeout);
120 self
121 }
122
123 #[must_use]
125 pub fn with_agent_card(mut self, card: AgentCard) -> Self {
126 self.agent_card = Some(card);
127 self
128 }
129
130 #[must_use]
135 pub const fn with_event_queue_capacity(mut self, capacity: usize) -> Self {
136 self.event_queue_capacity = Some(capacity);
137 self
138 }
139
140 #[must_use]
145 pub const fn with_max_event_size(mut self, max_event_size: usize) -> Self {
146 self.max_event_size = Some(max_event_size);
147 self
148 }
149
150 #[must_use]
155 pub const fn with_max_concurrent_streams(mut self, max: usize) -> Self {
156 self.max_concurrent_streams = Some(max);
157 self
158 }
159
160 #[must_use]
164 pub fn with_metrics(mut self, metrics: impl Metrics + 'static) -> Self {
165 self.metrics = Box::new(metrics);
166 self
167 }
168
169 pub fn build(self) -> ServerResult<RequestHandler> {
177 if let Some(ref card) = self.agent_card {
179 if card.supported_interfaces.is_empty() {
180 return Err(crate::error::ServerError::InvalidParams(
181 "agent card must have at least one supported interface".into(),
182 ));
183 }
184 }
185
186 if let Some(timeout) = self.executor_timeout {
188 if timeout.is_zero() {
189 return Err(crate::error::ServerError::InvalidParams(
190 "executor timeout must be greater than zero".into(),
191 ));
192 }
193 }
194
195 Ok(RequestHandler {
196 executor: self.executor,
197 task_store: self.task_store.unwrap_or_else(|| {
198 Box::new(InMemoryTaskStore::with_config(self.task_store_config))
199 }),
200 push_config_store: self
201 .push_config_store
202 .unwrap_or_else(|| Box::new(InMemoryPushConfigStore::new())),
203 push_sender: self.push_sender,
204 event_queue_manager: {
205 let mut mgr = self
206 .event_queue_capacity
207 .map_or_else(EventQueueManager::new, EventQueueManager::with_capacity);
208 if let Some(max_size) = self.max_event_size {
209 mgr = mgr.with_max_event_size(max_size);
210 }
211 if let Some(max_streams) = self.max_concurrent_streams {
212 mgr = mgr.with_max_concurrent_queues(max_streams);
213 }
214 mgr
215 },
216 interceptors: self.interceptors,
217 agent_card: self.agent_card,
218 executor_timeout: self.executor_timeout,
219 metrics: self.metrics,
220 cancellation_tokens: Arc::new(tokio::sync::RwLock::new(
221 std::collections::HashMap::new(),
222 )),
223 })
224 }
225}
226
227impl std::fmt::Debug for RequestHandlerBuilder {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 f.debug_struct("RequestHandlerBuilder")
230 .field("executor", &"<dyn AgentExecutor>")
231 .field("task_store", &self.task_store.is_some())
232 .field("task_store_config", &self.task_store_config)
233 .field("push_config_store", &self.push_config_store.is_some())
234 .field("push_sender", &self.push_sender.is_some())
235 .field("interceptors", &self.interceptors)
236 .field("agent_card", &self.agent_card.is_some())
237 .field("executor_timeout", &self.executor_timeout)
238 .field("event_queue_capacity", &self.event_queue_capacity)
239 .field("max_event_size", &self.max_event_size)
240 .field("max_concurrent_streams", &self.max_concurrent_streams)
241 .field("metrics", &"<dyn Metrics>")
242 .finish()
243 }
244}