Skip to main content

a2a_protocol_server/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Tom F.
3
4//! Builder for [`RequestHandler`].
5//!
6//! [`RequestHandlerBuilder`] provides a fluent API for constructing a
7//! [`RequestHandler`] with optional stores, push sender, interceptors,
8//! and agent card.
9
10use std::sync::Arc;
11use std::time::Duration;
12
13use a2a_protocol_types::agent_card::AgentCard;
14
15use crate::error::ServerResult;
16use crate::executor::AgentExecutor;
17use crate::handler::RequestHandler;
18use crate::interceptor::{ServerInterceptor, ServerInterceptorChain};
19use crate::metrics::{Metrics, NoopMetrics};
20use crate::push::{InMemoryPushConfigStore, PushConfigStore, PushSender};
21use crate::store::{InMemoryTaskStore, TaskStore, TaskStoreConfig};
22use crate::streaming::EventQueueManager;
23
24/// Fluent builder for [`RequestHandler`].
25///
26/// # Required
27///
28/// - `executor`: Any [`AgentExecutor`] implementation (passed as a concrete
29///   type; the builder erases it to `Arc<dyn AgentExecutor>` during
30///   [`build`](Self::build)).
31///
32/// # Optional (with defaults)
33///
34/// - `task_store`: defaults to [`InMemoryTaskStore`].
35/// - `push_config_store`: defaults to [`InMemoryPushConfigStore`].
36/// - `push_sender`: defaults to `None`.
37/// - `interceptors`: defaults to an empty chain.
38/// - `agent_card`: defaults to `None`.
39pub struct RequestHandlerBuilder {
40    executor: Arc<dyn AgentExecutor>,
41    task_store: Option<Box<dyn TaskStore>>,
42    task_store_config: TaskStoreConfig,
43    push_config_store: Option<Box<dyn PushConfigStore>>,
44    push_sender: Option<Box<dyn PushSender>>,
45    interceptors: ServerInterceptorChain,
46    agent_card: Option<AgentCard>,
47    executor_timeout: Option<Duration>,
48    event_queue_capacity: Option<usize>,
49    max_event_size: Option<usize>,
50    max_concurrent_streams: Option<usize>,
51    metrics: Box<dyn Metrics>,
52}
53
54impl RequestHandlerBuilder {
55    /// Creates a new builder with the given executor.
56    ///
57    /// The executor is type-erased to `Arc<dyn AgentExecutor>`.
58    #[must_use]
59    pub fn new(executor: impl AgentExecutor) -> Self {
60        Self {
61            executor: Arc::new(executor),
62            task_store: None,
63            task_store_config: TaskStoreConfig::default(),
64            push_config_store: None,
65            push_sender: None,
66            interceptors: ServerInterceptorChain::new(),
67            agent_card: None,
68            executor_timeout: None,
69            event_queue_capacity: None,
70            max_event_size: None,
71            max_concurrent_streams: None,
72            metrics: Box::new(NoopMetrics),
73        }
74    }
75
76    /// Sets a custom task store.
77    #[must_use]
78    pub fn with_task_store(mut self, store: impl TaskStore + 'static) -> Self {
79        self.task_store = Some(Box::new(store));
80        self
81    }
82
83    /// Configures the default [`InMemoryTaskStore`] with custom TTL and capacity settings.
84    ///
85    /// This is ignored if a custom task store is set via [`with_task_store`](Self::with_task_store).
86    #[must_use]
87    pub const fn with_task_store_config(mut self, config: TaskStoreConfig) -> Self {
88        self.task_store_config = config;
89        self
90    }
91
92    /// Sets a custom push configuration store.
93    #[must_use]
94    pub fn with_push_config_store(mut self, store: impl PushConfigStore + 'static) -> Self {
95        self.push_config_store = Some(Box::new(store));
96        self
97    }
98
99    /// Sets a push notification sender.
100    #[must_use]
101    pub fn with_push_sender(mut self, sender: impl PushSender + 'static) -> Self {
102        self.push_sender = Some(Box::new(sender));
103        self
104    }
105
106    /// Adds a server interceptor to the chain.
107    #[must_use]
108    pub fn with_interceptor(mut self, interceptor: impl ServerInterceptor + 'static) -> Self {
109        self.interceptors.push(Arc::new(interceptor));
110        self
111    }
112
113    /// Sets a timeout for executor execution.
114    ///
115    /// If the executor does not complete within this duration, the task is
116    /// marked as failed with a timeout error.
117    #[must_use]
118    pub const fn with_executor_timeout(mut self, timeout: Duration) -> Self {
119        self.executor_timeout = Some(timeout);
120        self
121    }
122
123    /// Sets the agent card for discovery responses.
124    #[must_use]
125    pub fn with_agent_card(mut self, card: AgentCard) -> Self {
126        self.agent_card = Some(card);
127        self
128    }
129
130    /// Sets the event queue channel capacity for streaming.
131    ///
132    /// Defaults to 64 items. Higher values allow more events to be buffered
133    /// before backpressure is applied.
134    #[must_use]
135    pub const fn with_event_queue_capacity(mut self, capacity: usize) -> Self {
136        self.event_queue_capacity = Some(capacity);
137        self
138    }
139
140    /// Sets the maximum serialized event size in bytes.
141    ///
142    /// Events exceeding this size are rejected to prevent OOM conditions.
143    /// Defaults to 16 MiB.
144    #[must_use]
145    pub const fn with_max_event_size(mut self, max_event_size: usize) -> Self {
146        self.max_event_size = Some(max_event_size);
147        self
148    }
149
150    /// Sets the maximum number of concurrent streaming event queues.
151    ///
152    /// Limits memory usage from concurrent streams. When the limit is reached,
153    /// new streaming requests will fail.
154    #[must_use]
155    pub const fn with_max_concurrent_streams(mut self, max: usize) -> Self {
156        self.max_concurrent_streams = Some(max);
157        self
158    }
159
160    /// Sets a metrics observer for handler activity.
161    ///
162    /// Defaults to [`NoopMetrics`] which discards all events.
163    #[must_use]
164    pub fn with_metrics(mut self, metrics: impl Metrics + 'static) -> Self {
165        self.metrics = Box::new(metrics);
166        self
167    }
168
169    /// Builds the [`RequestHandler`].
170    ///
171    /// # Errors
172    ///
173    /// Returns [`ServerError::InvalidParams`](crate::error::ServerError::InvalidParams) if the configuration is invalid:
174    /// - Agent card with empty `supported_interfaces`
175    /// - Zero executor timeout (would cause immediate timeouts)
176    pub fn build(self) -> ServerResult<RequestHandler> {
177        // Validate agent card if provided.
178        if let Some(ref card) = self.agent_card {
179            if card.supported_interfaces.is_empty() {
180                return Err(crate::error::ServerError::InvalidParams(
181                    "agent card must have at least one supported interface".into(),
182                ));
183            }
184        }
185
186        // Validate executor timeout is not zero.
187        if let Some(timeout) = self.executor_timeout {
188            if timeout.is_zero() {
189                return Err(crate::error::ServerError::InvalidParams(
190                    "executor timeout must be greater than zero".into(),
191                ));
192            }
193        }
194
195        Ok(RequestHandler {
196            executor: self.executor,
197            task_store: self.task_store.unwrap_or_else(|| {
198                Box::new(InMemoryTaskStore::with_config(self.task_store_config))
199            }),
200            push_config_store: self
201                .push_config_store
202                .unwrap_or_else(|| Box::new(InMemoryPushConfigStore::new())),
203            push_sender: self.push_sender,
204            event_queue_manager: {
205                let mut mgr = self
206                    .event_queue_capacity
207                    .map_or_else(EventQueueManager::new, EventQueueManager::with_capacity);
208                if let Some(max_size) = self.max_event_size {
209                    mgr = mgr.with_max_event_size(max_size);
210                }
211                if let Some(max_streams) = self.max_concurrent_streams {
212                    mgr = mgr.with_max_concurrent_queues(max_streams);
213                }
214                mgr
215            },
216            interceptors: self.interceptors,
217            agent_card: self.agent_card,
218            executor_timeout: self.executor_timeout,
219            metrics: self.metrics,
220            cancellation_tokens: Arc::new(tokio::sync::RwLock::new(
221                std::collections::HashMap::new(),
222            )),
223        })
224    }
225}
226
227impl std::fmt::Debug for RequestHandlerBuilder {
228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229        f.debug_struct("RequestHandlerBuilder")
230            .field("executor", &"<dyn AgentExecutor>")
231            .field("task_store", &self.task_store.is_some())
232            .field("task_store_config", &self.task_store_config)
233            .field("push_config_store", &self.push_config_store.is_some())
234            .field("push_sender", &self.push_sender.is_some())
235            .field("interceptors", &self.interceptors)
236            .field("agent_card", &self.agent_card.is_some())
237            .field("executor_timeout", &self.executor_timeout)
238            .field("event_queue_capacity", &self.event_queue_capacity)
239            .field("max_event_size", &self.max_event_size)
240            .field("max_concurrent_streams", &self.max_concurrent_streams)
241            .field("metrics", &"<dyn Metrics>")
242            .finish()
243    }
244}