a2a_protocol_server/streaming/event_queue.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Tom F.
3
4//! Event queue for server-side streaming.
5//!
6//! The executor writes [`StreamResponse`] events to an [`EventQueueWriter`];
7//! the HTTP layer reads them from an [`EventQueueReader`] and serializes them
8//! as SSE frames.
9//!
10//! [`InMemoryQueueWriter`] and [`InMemoryQueueReader`] are backed by a
11//! bounded `tokio::sync::mpsc` channel.
12
13use std::collections::HashMap;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::Arc;
17
18use a2a_protocol_types::error::{A2aError, A2aResult};
19use a2a_protocol_types::events::StreamResponse;
20use a2a_protocol_types::task::TaskId;
21use tokio::sync::{mpsc, RwLock};
22
/// Default channel capacity for event queues.
///
/// Bounds how many unread events can buffer before a `write` call has to
/// wait for the reader to drain the channel.
pub const DEFAULT_QUEUE_CAPACITY: usize = 64;

/// Default maximum serialized event size in bytes (16 MiB).
///
/// Events whose JSON serialization exceeds this are rejected by the
/// in-memory writer to guard against unbounded memory use.
pub const DEFAULT_MAX_EVENT_SIZE: usize = 16 * 1024 * 1024;

/// Default write timeout for event queue sends (5 seconds).
///
/// Prevents executor from blocking indefinitely on a slow/disconnected client.
pub const DEFAULT_WRITE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
33
34// ── EventQueueWriter ─────────────────────────────────────────────────────────
35
/// Trait for writing streaming events.
///
/// Object-safe; used as `&dyn EventQueueWriter` in the executor. The methods
/// return boxed futures (rather than using `async fn`) to keep the trait
/// usable behind `dyn`.
pub trait EventQueueWriter: Send + Sync + 'static {
    /// Writes a streaming event to the queue.
    ///
    /// # Errors
    ///
    /// Returns an [`A2aError`] if the event cannot be accepted — for
    /// example, because the receiver has been dropped.
    fn write<'a>(
        &'a self,
        event: StreamResponse,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;

    /// Signals that no more events will be written.
    ///
    /// # Errors
    ///
    /// Returns an [`A2aError`] if closing fails.
    fn close<'a>(&'a self) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;
}
57
58// ── EventQueueReader ─────────────────────────────────────────────────────────
59
/// Trait for reading streaming events.
///
/// NOT object-safe; always used as a concrete type internally, never behind
/// `dyn`. The boxed-future signature exists only to give the return type a
/// name in the trait.
pub trait EventQueueReader: Send + 'static {
    /// Reads the next event, returning `None` when the stream is closed
    /// (no event will ever arrive again).
    fn read(
        &mut self,
    ) -> Pin<Box<dyn Future<Output = Option<A2aResult<StreamResponse>>> + Send + '_>>;
}
70
71// ── InMemoryQueueWriter ──────────────────────────────────────────────────────
72
/// In-memory [`EventQueueWriter`] backed by an `mpsc` channel sender.
///
/// Enforces a maximum serialized event size to prevent OOM from oversized
/// events written by executors, and a write timeout to prevent blocking
/// indefinitely on slow clients (PR-1).
#[derive(Debug, Clone)]
pub struct InMemoryQueueWriter {
    /// Sending half of the bounded channel; carries events (wrapped in
    /// `A2aResult`) to the matching [`InMemoryQueueReader`].
    tx: mpsc::Sender<A2aResult<StreamResponse>>,
    /// Maximum serialized event size in bytes; larger events are rejected.
    max_event_size: usize,
    /// Write timeout — prevents executor from blocking if client is slow.
    write_timeout: std::time::Duration,
}
86
87#[allow(clippy::manual_async_fn)]
88impl EventQueueWriter for InMemoryQueueWriter {
89 fn write<'a>(
90 &'a self,
91 event: StreamResponse,
92 ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
93 Box::pin(async move {
94 // Check serialized event size to prevent OOM from oversized events.
95 let serialized_size = serde_json::to_string(&event).map(|s| s.len()).unwrap_or(0);
96 if serialized_size > self.max_event_size {
97 return Err(A2aError::internal(format!(
98 "event size {} bytes exceeds maximum {} bytes",
99 serialized_size, self.max_event_size
100 )));
101 }
102 // Apply write timeout to prevent blocking on slow clients (PR-1).
103 tokio::time::timeout(self.write_timeout, self.tx.send(Ok(event)))
104 .await
105 .map_err(|_| A2aError::internal("event queue write timed out"))?
106 .map_err(|_| A2aError::internal("event queue receiver dropped"))
107 })
108 }
109
110 fn close<'a>(&'a self) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
111 Box::pin(async move {
112 // Dropping all senders closes the channel. Since `InMemoryQueueWriter`
113 // is `Clone`, we cannot truly close here — but we can drop our permit
114 // by sending nothing and letting the reader see `None` when all
115 // senders are dropped. As a convention, send a synthetic close signal
116 // isn't needed; the spawned task will drop its writer clone.
117 Ok(())
118 })
119 }
120}
121
122// ── InMemoryQueueReader ──────────────────────────────────────────────────────
123
/// In-memory [`EventQueueReader`] backed by an `mpsc` channel receiver.
///
/// Created together with an [`InMemoryQueueWriter`]; yields the events that
/// writer sends, then `None` once every writer clone has been dropped and
/// the buffer is drained.
#[derive(Debug)]
pub struct InMemoryQueueReader {
    /// Receiving half of the bounded channel.
    rx: mpsc::Receiver<A2aResult<StreamResponse>>,
}
129
130impl EventQueueReader for InMemoryQueueReader {
131 fn read(
132 &mut self,
133 ) -> Pin<Box<dyn Future<Output = Option<A2aResult<StreamResponse>>> + Send + '_>> {
134 Box::pin(async move { self.rx.recv().await })
135 }
136}
137
138// ── Constructor ──────────────────────────────────────────────────────────────
139
140/// Creates a new in-memory event queue pair with the default capacity and
141/// default max event size.
142#[must_use]
143pub fn new_in_memory_queue() -> (InMemoryQueueWriter, InMemoryQueueReader) {
144 new_in_memory_queue_with_options(DEFAULT_QUEUE_CAPACITY, DEFAULT_MAX_EVENT_SIZE)
145}
146
147/// Creates a new in-memory event queue pair with the specified capacity
148/// and default max event size.
149#[must_use]
150pub fn new_in_memory_queue_with_capacity(
151 capacity: usize,
152) -> (InMemoryQueueWriter, InMemoryQueueReader) {
153 new_in_memory_queue_with_options(capacity, DEFAULT_MAX_EVENT_SIZE)
154}
155
156/// Creates a new in-memory event queue pair with the specified capacity
157/// and maximum event size.
158#[must_use]
159pub fn new_in_memory_queue_with_options(
160 capacity: usize,
161 max_event_size: usize,
162) -> (InMemoryQueueWriter, InMemoryQueueReader) {
163 let (tx, rx) = mpsc::channel(capacity);
164 (
165 InMemoryQueueWriter {
166 tx,
167 max_event_size,
168 write_timeout: DEFAULT_WRITE_TIMEOUT,
169 },
170 InMemoryQueueReader { rx },
171 )
172}
173
174// ── EventQueueManager ────────────────────────────────────────────────────────
175
/// Manages event queues for active tasks.
///
/// Each task has at most one queue. The first `get_or_create` for a task
/// creates the queue and hands out both the writer and the single reader;
/// later calls for the same task return the existing writer with no reader
/// (the reader was already given out at creation time).
#[derive(Debug, Clone)]
pub struct EventQueueManager {
    /// Active writers keyed by task. Shared behind `Arc<RwLock<…>>` so
    /// clones of the manager operate on the same set of queues.
    writers: Arc<RwLock<HashMap<TaskId, Arc<InMemoryQueueWriter>>>>,
    /// Channel capacity for new event queues.
    capacity: usize,
    /// Maximum serialized event size in bytes.
    max_event_size: usize,
    /// Maximum number of concurrent event queues. `None` means no limit.
    max_concurrent_queues: Option<usize>,
}
191
192impl Default for EventQueueManager {
193 fn default() -> Self {
194 Self {
195 writers: Arc::default(),
196 capacity: DEFAULT_QUEUE_CAPACITY,
197 max_event_size: DEFAULT_MAX_EVENT_SIZE,
198 max_concurrent_queues: None,
199 }
200 }
201}
202
203impl EventQueueManager {
    /// Creates a new, empty event queue manager with default capacity.
    ///
    /// Equivalent to [`Default::default`].
    ///
    /// # Examples
    ///
    /// ```
    /// use a2a_protocol_server::EventQueueManager;
    ///
    /// let manager = EventQueueManager::new();
    /// ```
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
217
218 /// Creates a new event queue manager with the specified channel capacity.
219 #[must_use]
220 pub fn with_capacity(capacity: usize) -> Self {
221 Self {
222 writers: Arc::default(),
223 capacity,
224 max_event_size: DEFAULT_MAX_EVENT_SIZE,
225 max_concurrent_queues: None,
226 }
227 }
228
    /// Sets the maximum serialized event size (builder-style; consumes and
    /// returns `self`).
    ///
    /// Events exceeding this size (in serialized bytes) will be rejected with
    /// an error to prevent OOM conditions.
    #[must_use]
    pub const fn with_max_event_size(mut self, max_event_size: usize) -> Self {
        self.max_event_size = max_event_size;
        self
    }
238
    /// Sets the maximum number of concurrent event queues (builder-style;
    /// consumes and returns `self`).
    ///
    /// When the limit is reached, `get_or_create` for a new task returns a
    /// `None` reader together with a writer whose receiving half has already
    /// been dropped, so attempts to use that writer fail — signalling
    /// capacity exhaustion without tracking a new queue.
    #[must_use]
    pub const fn with_max_concurrent_queues(mut self, max: usize) -> Self {
        self.max_concurrent_queues = Some(max);
        self
    }
248
249 /// Returns the writer for the given task, creating a new queue if none
250 /// exists.
251 ///
252 /// If a queue already exists, the returned reader is `None` (the original
253 /// reader was given out at creation time). If a new queue is created, both
254 /// the writer and reader are returned.
255 ///
256 /// If `max_concurrent_queues` is set and the limit is reached, returns
257 /// the writer with `None` reader (same as existing queue case).
258 pub async fn get_or_create(
259 &self,
260 task_id: &TaskId,
261 ) -> (Arc<InMemoryQueueWriter>, Option<InMemoryQueueReader>) {
262 let mut map = self.writers.write().await;
263 #[allow(clippy::option_if_let_else)]
264 let result = if let Some(existing) = map.get(task_id) {
265 (Arc::clone(existing), None)
266 } else if self
267 .max_concurrent_queues
268 .is_some_and(|max| map.len() >= max)
269 {
270 // Concurrent queue limit reached — create a disconnected writer
271 // so the caller gets an error when trying to use it.
272 let (writer, _reader) =
273 new_in_memory_queue_with_options(self.capacity, self.max_event_size);
274 (Arc::new(writer), None)
275 } else {
276 let (writer, reader) =
277 new_in_memory_queue_with_options(self.capacity, self.max_event_size);
278 let writer = Arc::new(writer);
279 map.insert(task_id.clone(), Arc::clone(&writer));
280 (writer, Some(reader))
281 };
282 drop(map);
283 result
284 }
285
286 /// Removes and drops the event queue for the given task.
287 pub async fn destroy(&self, task_id: &TaskId) {
288 let mut map = self.writers.write().await;
289 map.remove(task_id);
290 drop(map);
291 }
292
293 /// Returns the number of active event queues.
294 pub async fn active_count(&self) -> usize {
295 let map = self.writers.read().await;
296 map.len()
297 }
298
299 /// Removes all event queues, causing all readers to see EOF.
300 pub async fn destroy_all(&self) {
301 let mut map = self.writers.write().await;
302 map.clear();
303 }
304}