a2a_protocol_server/streaming/event_queue/
in_memory.rs1use std::future::Future;
19use std::pin::Pin;
20
21use a2a_protocol_types::error::{A2aError, A2aResult};
22use a2a_protocol_types::events::StreamResponse;
23use tokio::sync::{broadcast, mpsc};
24
25use super::{EventQueueReader, EventQueueWriter};
26
27struct CountingWriter(usize);
33
34impl std::io::Write for CountingWriter {
35 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
36 self.0 += buf.len();
37 Ok(buf.len())
38 }
39
40 fn flush(&mut self) -> std::io::Result<()> {
41 Ok(())
42 }
43}
44
45#[derive(Debug, Clone)]
57pub struct InMemoryQueueWriter {
58 tx: broadcast::Sender<A2aResult<StreamResponse>>,
59 persistence_tx: Option<mpsc::Sender<A2aResult<StreamResponse>>>,
63 max_event_size: usize,
65 #[allow(dead_code)]
67 write_timeout: std::time::Duration,
68}
69
70impl InMemoryQueueWriter {
71 pub(super) const fn new(
73 tx: broadcast::Sender<A2aResult<StreamResponse>>,
74 max_event_size: usize,
75 write_timeout: std::time::Duration,
76 ) -> Self {
77 Self {
78 tx,
79 persistence_tx: None,
80 max_event_size,
81 write_timeout,
82 }
83 }
84
85 pub(super) const fn new_with_persistence(
87 tx: broadcast::Sender<A2aResult<StreamResponse>>,
88 persistence_tx: mpsc::Sender<A2aResult<StreamResponse>>,
89 max_event_size: usize,
90 write_timeout: std::time::Duration,
91 ) -> Self {
92 Self {
93 tx,
94 persistence_tx: Some(persistence_tx),
95 max_event_size,
96 write_timeout,
97 }
98 }
99
100 #[must_use]
105 pub fn subscribe(&self) -> InMemoryQueueReader {
106 InMemoryQueueReader {
107 rx: self.tx.subscribe(),
108 }
109 }
110}
111
112#[allow(clippy::manual_async_fn)]
113impl EventQueueWriter for InMemoryQueueWriter {
114 fn write<'a>(
115 &'a self,
116 event: StreamResponse,
117 ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
118 Box::pin(async move {
119 let serialized_size = {
124 let mut counter = CountingWriter(0);
125 serde_json::to_writer(&mut counter, &event)
126 .map_err(|e| A2aError::internal(format!("event serialization failed: {e}")))?;
127 counter.0
128 };
129 if serialized_size > self.max_event_size {
130 return Err(A2aError::internal(format!(
131 "event size {serialized_size} bytes exceeds maximum {} bytes",
132 self.max_event_size
133 )));
134 }
135 if let Some(ref persistence_tx) = self.persistence_tx {
138 if let Err(_e) = persistence_tx.send(Ok(event.clone())).await {
139 trace_warn!("persistence channel closed, event not persisted");
140 }
141 }
142 self.tx
143 .send(Ok(event))
144 .map(|_| ())
145 .map_err(|_| A2aError::internal("event queue: no active receivers"))
146 })
147 }
148
149 fn close<'a>(&'a self) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
150 Box::pin(async move {
151 Ok(())
154 })
155 }
156}
157
158#[derive(Debug)]
165pub struct InMemoryQueueReader {
166 rx: broadcast::Receiver<A2aResult<StreamResponse>>,
167}
168
169impl InMemoryQueueReader {
170 pub(crate) const fn new(rx: broadcast::Receiver<A2aResult<StreamResponse>>) -> Self {
172 Self { rx }
173 }
174}
175
176impl EventQueueReader for InMemoryQueueReader {
177 fn read(
178 &mut self,
179 ) -> Pin<Box<dyn Future<Output = Option<A2aResult<StreamResponse>>> + Send + '_>> {
180 Box::pin(async move {
181 loop {
182 match self.rx.recv().await {
183 Ok(event) => return Some(event),
184 Err(broadcast::error::RecvError::Lagged(_n)) => {
185 trace_warn!(
186 dropped_events = _n,
187 "event queue reader lagged, {_n} events skipped"
188 );
189 }
190 Err(broadcast::error::RecvError::Closed) => return None,
191 }
192 }
193 })
194 }
195}
196
197#[cfg(test)]
198mod tests {
199 use super::*;
200 use crate::streaming::event_queue::{
201 new_in_memory_queue, new_in_memory_queue_with_options, DEFAULT_MAX_EVENT_SIZE,
202 DEFAULT_WRITE_TIMEOUT,
203 };
204 use a2a_protocol_types::events::{StreamResponse, TaskStatusUpdateEvent};
205 use a2a_protocol_types::task::{ContextId, TaskId, TaskState, TaskStatus};
206
207 fn make_status_event(task_id: &str, state: TaskState) -> StreamResponse {
209 StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
210 task_id: TaskId::new(task_id),
211 context_id: ContextId::new("ctx-test"),
212 status: TaskStatus {
213 state,
214 message: None,
215 timestamp: None,
216 },
217 metadata: None,
218 })
219 }
220
221 #[tokio::test]
224 async fn write_then_read_single_event() {
225 let (writer, mut reader) = new_in_memory_queue();
226 let event = make_status_event("t1", TaskState::Working);
227
228 writer.write(event).await.expect("write should succeed");
229 drop(writer);
230
231 let received = reader.read().await;
232 assert!(received.is_some(), "reader should return the written event");
233 let result = received.unwrap();
234 let event = result.expect("event should be Ok");
235 match &event {
236 StreamResponse::StatusUpdate(evt) => {
237 assert_eq!(
238 evt.status.state,
239 TaskState::Working,
240 "should be Working event"
241 );
242 }
243 other => panic!("expected StatusUpdate, got: {other:?}"),
244 }
245
246 let eof = reader.read().await;
248 assert!(
249 eof.is_none(),
250 "reader should return None after writer is dropped"
251 );
252 }
253
254 #[tokio::test]
255 async fn write_multiple_events_read_in_order() {
256 let (writer, mut reader) = new_in_memory_queue();
257
258 let e1 = make_status_event("t1", TaskState::Working);
259 let e2 = make_status_event("t1", TaskState::Completed);
260
261 writer.write(e1).await.expect("first write should succeed");
262 writer.write(e2).await.expect("second write should succeed");
263 drop(writer);
264
265 let r1 = reader.read().await.expect("should read first event");
267 let sr1 = r1.expect("first event should be Ok");
268 match &sr1 {
269 StreamResponse::StatusUpdate(evt) => {
270 assert_eq!(
271 evt.status.state,
272 TaskState::Working,
273 "first event should be Working"
274 );
275 }
276 other => panic!("expected StatusUpdate, got: {other:?}"),
277 }
278
279 let r2 = reader.read().await.expect("should read second event");
281 let sr2 = r2.expect("second event should be Ok");
282 match &sr2 {
283 StreamResponse::StatusUpdate(evt) => {
284 assert_eq!(
285 evt.status.state,
286 TaskState::Completed,
287 "second event should be Completed"
288 );
289 }
290 other => panic!("expected StatusUpdate, got: {other:?}"),
291 }
292
293 assert!(
295 reader.read().await.is_none(),
296 "should be EOF after all events"
297 );
298 }
299
300 #[tokio::test]
303 async fn read_returns_none_on_empty_closed_queue() {
304 let (writer, mut reader) = new_in_memory_queue();
305 drop(writer); let result = reader.read().await;
308 assert!(
309 result.is_none(),
310 "reading from an empty closed queue should return None"
311 );
312 }
313
314 #[tokio::test]
315 async fn write_after_all_readers_dropped_returns_error() {
316 let (writer, reader) = new_in_memory_queue();
317 drop(reader);
318
319 let result = writer
320 .write(make_status_event("t1", TaskState::Working))
321 .await;
322 assert!(
323 result.is_err(),
324 "writing with no active receivers should return an error"
325 );
326 }
327
328 #[tokio::test]
329 async fn close_is_no_op_and_succeeds() {
330 let (writer, _reader) = new_in_memory_queue();
331 let result = writer.close().await;
332 assert!(result.is_ok(), "close() should succeed");
333 }
334
335 #[tokio::test]
338 async fn subscribe_creates_independent_reader() {
339 let (writer, mut reader1) = new_in_memory_queue();
340 let mut reader2 = writer.subscribe();
341
342 let event = make_status_event("t1", TaskState::Working);
343 writer.write(event).await.expect("write should succeed");
344 drop(writer);
345
346 let r1 = reader1.read().await;
348 assert!(r1.is_some(), "reader1 should receive the event");
349
350 let r2 = reader2.read().await;
351 assert!(r2.is_some(), "reader2 should receive the event");
352
353 assert!(reader1.read().await.is_none(), "reader1 should see EOF");
355 assert!(reader2.read().await.is_none(), "reader2 should see EOF");
356 }
357
358 #[tokio::test]
359 async fn subscriber_only_sees_events_after_subscribe() {
360 let (writer, mut reader1) = new_in_memory_queue();
361
362 writer
364 .write(make_status_event("t1", TaskState::Submitted))
365 .await
366 .expect("write should succeed");
367
368 let mut reader2 = writer.subscribe();
370
371 writer
373 .write(make_status_event("t1", TaskState::Working))
374 .await
375 .expect("write should succeed");
376 drop(writer);
377
378 let r1a = reader1
380 .read()
381 .await
382 .expect("reader1 should see first event");
383 let evt1a = r1a.expect("first event should be Ok");
384 assert!(
385 matches!(&evt1a, StreamResponse::StatusUpdate(e) if e.status.state == TaskState::Submitted),
386 "reader1 first event should be Submitted"
387 );
388 let r1b = reader1
389 .read()
390 .await
391 .expect("reader1 should see second event");
392 let evt_1b = r1b.expect("second event should be Ok");
393 assert!(
394 matches!(&evt_1b, StreamResponse::StatusUpdate(e) if e.status.state == TaskState::Working),
395 "reader1 second event should be Working"
396 );
397 assert!(reader1.read().await.is_none());
398
399 let r2a = reader2
401 .read()
402 .await
403 .expect("reader2 should see second event");
404 let evt2a = r2a.expect("event should be Ok");
405 assert!(
406 matches!(&evt2a, StreamResponse::StatusUpdate(e) if e.status.state == TaskState::Working),
407 "reader2 should see Working event"
408 );
409 assert!(
410 reader2.read().await.is_none(),
411 "reader2 should see EOF after the one event it received"
412 );
413 }
414
415 #[tokio::test]
418 async fn oversized_event_is_rejected() {
419 let (writer, _reader) = new_in_memory_queue_with_options(
421 16,
422 10, DEFAULT_WRITE_TIMEOUT,
424 );
425
426 let event = make_status_event("t1", TaskState::Working);
427 let result = writer.write(event).await;
428 assert!(
429 result.is_err(),
430 "event exceeding max_event_size should be rejected"
431 );
432 let err = result.unwrap_err();
433 let msg = format!("{err}");
434 assert!(
435 msg.contains("exceeds maximum"),
436 "error message should mention size limit, got: {msg}"
437 );
438 }
439
440 #[test]
442 fn counting_writer_flush_is_noop() {
443 use std::io::Write;
444 let mut cw = super::CountingWriter(0);
445 cw.write_all(b"hello").unwrap();
446 assert_eq!(cw.0, 5);
447 cw.flush().unwrap();
449 assert_eq!(cw.0, 5, "flush should not change the count");
450 }
451
452 #[tokio::test]
453 async fn event_within_size_limit_is_accepted() {
454 let (writer, mut reader) =
456 new_in_memory_queue_with_options(16, DEFAULT_MAX_EVENT_SIZE, DEFAULT_WRITE_TIMEOUT);
457
458 let event = make_status_event("t1", TaskState::Working);
459 writer
460 .write(event)
461 .await
462 .expect("event within size limit should be accepted");
463 drop(writer);
464
465 let r = reader.read().await;
466 assert!(r.is_some(), "reader should receive the event");
467 }
468}