Skip to main content

a2a_protocol_server/streaming/
event_queue.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Tom F.
3
4//! Event queue for server-side streaming.
5//!
6//! The executor writes [`StreamResponse`] events to an [`EventQueueWriter`];
7//! the HTTP layer reads them from an [`EventQueueReader`] and serializes them
8//! as SSE frames.
9//!
10//! [`InMemoryQueueWriter`] and [`InMemoryQueueReader`] are backed by a
11//! bounded `tokio::sync::mpsc` channel.
12
13use std::collections::HashMap;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::Arc;
17
18use a2a_protocol_types::error::{A2aError, A2aResult};
19use a2a_protocol_types::events::StreamResponse;
20use a2a_protocol_types::task::TaskId;
21use tokio::sync::{mpsc, RwLock};
22
23/// Default channel capacity for event queues.
24pub const DEFAULT_QUEUE_CAPACITY: usize = 64;
25
26/// Default maximum event size in bytes (16 MiB).
27pub const DEFAULT_MAX_EVENT_SIZE: usize = 16 * 1024 * 1024;
28
29/// Default write timeout for event queue sends (5 seconds).
30///
31/// Prevents executor from blocking indefinitely on a slow/disconnected client.
32pub const DEFAULT_WRITE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
33
34// ── EventQueueWriter ─────────────────────────────────────────────────────────
35
36/// Trait for writing streaming events.
37///
38/// Object-safe; used as `&dyn EventQueueWriter` in the executor.
39pub trait EventQueueWriter: Send + Sync + 'static {
40    /// Writes a streaming event to the queue.
41    ///
42    /// # Errors
43    ///
44    /// Returns an [`A2aError`] if the receiver has been dropped.
45    fn write<'a>(
46        &'a self,
47        event: StreamResponse,
48    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;
49
50    /// Signals that no more events will be written.
51    ///
52    /// # Errors
53    ///
54    /// Returns an [`A2aError`] if closing fails.
55    fn close<'a>(&'a self) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;
56}
57
58// ── EventQueueReader ─────────────────────────────────────────────────────────
59
60/// Trait for reading streaming events.
61///
62/// NOT object-safe (used as a concrete type internally). The `async fn` is
63/// fine because this trait is never used behind `dyn`.
64pub trait EventQueueReader: Send + 'static {
65    /// Reads the next event, returning `None` when the stream is closed.
66    fn read(
67        &mut self,
68    ) -> Pin<Box<dyn Future<Output = Option<A2aResult<StreamResponse>>> + Send + '_>>;
69}
70
71// ── InMemoryQueueWriter ──────────────────────────────────────────────────────
72
73/// In-memory [`EventQueueWriter`] backed by an `mpsc` channel sender.
74///
75/// Enforces a maximum serialized event size to prevent OOM from oversized
76/// events written by executors, and a write timeout to prevent blocking
77/// indefinitely on slow clients (PR-1).
78#[derive(Debug, Clone)]
79pub struct InMemoryQueueWriter {
80    tx: mpsc::Sender<A2aResult<StreamResponse>>,
81    /// Maximum serialized event size in bytes.
82    max_event_size: usize,
83    /// Write timeout — prevents executor from blocking if client is slow.
84    write_timeout: std::time::Duration,
85}
86
87#[allow(clippy::manual_async_fn)]
88impl EventQueueWriter for InMemoryQueueWriter {
89    fn write<'a>(
90        &'a self,
91        event: StreamResponse,
92    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
93        Box::pin(async move {
94            // Check serialized event size to prevent OOM from oversized events.
95            let serialized_size = serde_json::to_string(&event).map(|s| s.len()).unwrap_or(0);
96            if serialized_size > self.max_event_size {
97                return Err(A2aError::internal(format!(
98                    "event size {} bytes exceeds maximum {} bytes",
99                    serialized_size, self.max_event_size
100                )));
101            }
102            // Apply write timeout to prevent blocking on slow clients (PR-1).
103            tokio::time::timeout(self.write_timeout, self.tx.send(Ok(event)))
104                .await
105                .map_err(|_| A2aError::internal("event queue write timed out"))?
106                .map_err(|_| A2aError::internal("event queue receiver dropped"))
107        })
108    }
109
110    fn close<'a>(&'a self) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
111        Box::pin(async move {
112            // Dropping all senders closes the channel. Since `InMemoryQueueWriter`
113            // is `Clone`, we cannot truly close here — but we can drop our permit
114            // by sending nothing and letting the reader see `None` when all
115            // senders are dropped. As a convention, send a synthetic close signal
116            // isn't needed; the spawned task will drop its writer clone.
117            Ok(())
118        })
119    }
120}
121
122// ── InMemoryQueueReader ──────────────────────────────────────────────────────
123
124/// In-memory [`EventQueueReader`] backed by an `mpsc` channel receiver.
125#[derive(Debug)]
126pub struct InMemoryQueueReader {
127    rx: mpsc::Receiver<A2aResult<StreamResponse>>,
128}
129
130impl EventQueueReader for InMemoryQueueReader {
131    fn read(
132        &mut self,
133    ) -> Pin<Box<dyn Future<Output = Option<A2aResult<StreamResponse>>> + Send + '_>> {
134        Box::pin(async move { self.rx.recv().await })
135    }
136}
137
138// ── Constructor ──────────────────────────────────────────────────────────────
139
140/// Creates a new in-memory event queue pair with the default capacity and
141/// default max event size.
142#[must_use]
143pub fn new_in_memory_queue() -> (InMemoryQueueWriter, InMemoryQueueReader) {
144    new_in_memory_queue_with_options(DEFAULT_QUEUE_CAPACITY, DEFAULT_MAX_EVENT_SIZE)
145}
146
147/// Creates a new in-memory event queue pair with the specified capacity
148/// and default max event size.
149#[must_use]
150pub fn new_in_memory_queue_with_capacity(
151    capacity: usize,
152) -> (InMemoryQueueWriter, InMemoryQueueReader) {
153    new_in_memory_queue_with_options(capacity, DEFAULT_MAX_EVENT_SIZE)
154}
155
156/// Creates a new in-memory event queue pair with the specified capacity
157/// and maximum event size.
158#[must_use]
159pub fn new_in_memory_queue_with_options(
160    capacity: usize,
161    max_event_size: usize,
162) -> (InMemoryQueueWriter, InMemoryQueueReader) {
163    let (tx, rx) = mpsc::channel(capacity);
164    (
165        InMemoryQueueWriter {
166            tx,
167            max_event_size,
168            write_timeout: DEFAULT_WRITE_TIMEOUT,
169        },
170        InMemoryQueueReader { rx },
171    )
172}
173
174// ── EventQueueManager ────────────────────────────────────────────────────────
175
176/// Manages event queues for active tasks.
177///
178/// Each task can have at most one active writer. When a client subscribes
179/// (or resubscribes), the manager returns the existing writer and a fresh
180/// reader, or creates both if none exists.
181#[derive(Debug, Clone)]
182pub struct EventQueueManager {
183    writers: Arc<RwLock<HashMap<TaskId, Arc<InMemoryQueueWriter>>>>,
184    /// Channel capacity for new event queues.
185    capacity: usize,
186    /// Maximum serialized event size in bytes.
187    max_event_size: usize,
188    /// Maximum number of concurrent event queues. `None` means no limit.
189    max_concurrent_queues: Option<usize>,
190}
191
192impl Default for EventQueueManager {
193    fn default() -> Self {
194        Self {
195            writers: Arc::default(),
196            capacity: DEFAULT_QUEUE_CAPACITY,
197            max_event_size: DEFAULT_MAX_EVENT_SIZE,
198            max_concurrent_queues: None,
199        }
200    }
201}
202
203impl EventQueueManager {
204    /// Creates a new, empty event queue manager with default capacity.
205    ///
206    /// # Examples
207    ///
208    /// ```
209    /// use a2a_protocol_server::EventQueueManager;
210    ///
211    /// let manager = EventQueueManager::new();
212    /// ```
213    #[must_use]
214    pub fn new() -> Self {
215        Self::default()
216    }
217
218    /// Creates a new event queue manager with the specified channel capacity.
219    #[must_use]
220    pub fn with_capacity(capacity: usize) -> Self {
221        Self {
222            writers: Arc::default(),
223            capacity,
224            max_event_size: DEFAULT_MAX_EVENT_SIZE,
225            max_concurrent_queues: None,
226        }
227    }
228
229    /// Creates a new event queue manager with the specified maximum event size.
230    ///
231    /// Events exceeding this size (in serialized bytes) will be rejected with
232    /// an error to prevent OOM conditions.
233    #[must_use]
234    pub const fn with_max_event_size(mut self, max_event_size: usize) -> Self {
235        self.max_event_size = max_event_size;
236        self
237    }
238
239    /// Sets the maximum number of concurrent event queues.
240    ///
241    /// When the limit is reached, new queue creation will return an error
242    /// reader (`None`) to signal capacity exhaustion.
243    #[must_use]
244    pub const fn with_max_concurrent_queues(mut self, max: usize) -> Self {
245        self.max_concurrent_queues = Some(max);
246        self
247    }
248
249    /// Returns the writer for the given task, creating a new queue if none
250    /// exists.
251    ///
252    /// If a queue already exists, the returned reader is `None` (the original
253    /// reader was given out at creation time). If a new queue is created, both
254    /// the writer and reader are returned.
255    ///
256    /// If `max_concurrent_queues` is set and the limit is reached, returns
257    /// the writer with `None` reader (same as existing queue case).
258    pub async fn get_or_create(
259        &self,
260        task_id: &TaskId,
261    ) -> (Arc<InMemoryQueueWriter>, Option<InMemoryQueueReader>) {
262        let mut map = self.writers.write().await;
263        #[allow(clippy::option_if_let_else)]
264        let result = if let Some(existing) = map.get(task_id) {
265            (Arc::clone(existing), None)
266        } else if self
267            .max_concurrent_queues
268            .is_some_and(|max| map.len() >= max)
269        {
270            // Concurrent queue limit reached — create a disconnected writer
271            // so the caller gets an error when trying to use it.
272            let (writer, _reader) =
273                new_in_memory_queue_with_options(self.capacity, self.max_event_size);
274            (Arc::new(writer), None)
275        } else {
276            let (writer, reader) =
277                new_in_memory_queue_with_options(self.capacity, self.max_event_size);
278            let writer = Arc::new(writer);
279            map.insert(task_id.clone(), Arc::clone(&writer));
280            (writer, Some(reader))
281        };
282        drop(map);
283        result
284    }
285
286    /// Removes and drops the event queue for the given task.
287    pub async fn destroy(&self, task_id: &TaskId) {
288        let mut map = self.writers.write().await;
289        map.remove(task_id);
290        drop(map);
291    }
292
293    /// Returns the number of active event queues.
294    pub async fn active_count(&self) -> usize {
295        let map = self.writers.read().await;
296        map.len()
297    }
298
299    /// Removes all event queues, causing all readers to see EOF.
300    pub async fn destroy_all(&self) {
301        let mut map = self.writers.write().await;
302        map.clear();
303    }
304}