a2a_protocol_server/streaming/event_queue/
in_memory.rs1use std::future::Future;
19use std::pin::Pin;
20
21use a2a_protocol_types::error::{A2aError, A2aResult};
22use a2a_protocol_types::events::StreamResponse;
23use tokio::sync::{broadcast, mpsc};
24
25use super::{EventQueueReader, EventQueueWriter};
26
27struct CountingWriter(usize);
33
34impl std::io::Write for CountingWriter {
35 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
36 self.0 += buf.len();
37 Ok(buf.len())
38 }
39
40 fn flush(&mut self) -> std::io::Result<()> {
41 Ok(())
42 }
43}
44
45#[derive(Debug, Clone)]
57pub struct InMemoryQueueWriter {
58 tx: broadcast::Sender<A2aResult<StreamResponse>>,
59 persistence_tx: Option<mpsc::Sender<A2aResult<StreamResponse>>>,
63 max_event_size: usize,
65 #[allow(dead_code)]
67 write_timeout: std::time::Duration,
68}
69
70impl InMemoryQueueWriter {
71 pub(super) const fn new(
73 tx: broadcast::Sender<A2aResult<StreamResponse>>,
74 max_event_size: usize,
75 write_timeout: std::time::Duration,
76 ) -> Self {
77 Self {
78 tx,
79 persistence_tx: None,
80 max_event_size,
81 write_timeout,
82 }
83 }
84
85 pub(super) const fn new_with_persistence(
87 tx: broadcast::Sender<A2aResult<StreamResponse>>,
88 persistence_tx: mpsc::Sender<A2aResult<StreamResponse>>,
89 max_event_size: usize,
90 write_timeout: std::time::Duration,
91 ) -> Self {
92 Self {
93 tx,
94 persistence_tx: Some(persistence_tx),
95 max_event_size,
96 write_timeout,
97 }
98 }
99
100 #[must_use]
105 pub fn subscribe(&self) -> InMemoryQueueReader {
106 InMemoryQueueReader::new(self.tx.subscribe())
107 }
108
109 pub(crate) fn raw_subscribe(&self) -> broadcast::Receiver<A2aResult<StreamResponse>> {
114 self.tx.subscribe()
115 }
116}
117
118#[allow(clippy::manual_async_fn)]
119impl EventQueueWriter for InMemoryQueueWriter {
120 fn write<'a>(
121 &'a self,
122 event: StreamResponse,
123 ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
124 Box::pin(async move {
125 let serialized_size = {
130 let mut counter = CountingWriter(0);
131 serde_json::to_writer(&mut counter, &event)
132 .map_err(|e| A2aError::internal(format!("event serialization failed: {e}")))?;
133 counter.0
134 };
135 if serialized_size > self.max_event_size {
136 return Err(A2aError::internal(format!(
137 "event size {serialized_size} bytes exceeds maximum {} bytes",
138 self.max_event_size
139 )));
140 }
141 if let Some(ref persistence_tx) = self.persistence_tx {
144 if let Err(_e) = persistence_tx.send(Ok(event.clone())).await {
145 trace_warn!("persistence channel closed, event not persisted");
146 }
147 }
148 self.tx
149 .send(Ok(event))
150 .map(|_| ())
151 .map_err(|_| A2aError::internal("event queue: no active receivers"))
152 })
153 }
154
155 fn close<'a>(&'a self) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
156 Box::pin(async move {
157 Ok(())
160 })
161 }
162}
163
164#[derive(Debug)]
175pub struct InMemoryQueueReader {
176 rx: broadcast::Receiver<A2aResult<StreamResponse>>,
177 pending_first: Option<A2aResult<StreamResponse>>,
178}
179
180impl InMemoryQueueReader {
181 pub(crate) const fn new(rx: broadcast::Receiver<A2aResult<StreamResponse>>) -> Self {
183 Self {
184 rx,
185 pending_first: None,
186 }
187 }
188
189 pub fn set_first_event(&mut self, event: StreamResponse) {
191 self.pending_first = Some(Ok(event));
192 }
193
194 pub(crate) const fn with_first_event(
196 rx: broadcast::Receiver<A2aResult<StreamResponse>>,
197 first: StreamResponse,
198 ) -> Self {
199 Self {
200 rx,
201 pending_first: Some(Ok(first)),
202 }
203 }
204}
205
206impl EventQueueReader for InMemoryQueueReader {
207 fn read(
208 &mut self,
209 ) -> Pin<Box<dyn Future<Output = Option<A2aResult<StreamResponse>>> + Send + '_>> {
210 Box::pin(async move {
211 if let Some(first) = self.pending_first.take() {
214 return Some(first);
215 }
216 loop {
217 match self.rx.recv().await {
218 Ok(event) => return Some(event),
219 Err(broadcast::error::RecvError::Lagged(_n)) => {
220 trace_warn!(
221 dropped_events = _n,
222 "event queue reader lagged, {_n} events skipped"
223 );
224 }
225 Err(broadcast::error::RecvError::Closed) => return None,
226 }
227 }
228 })
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use crate::streaming::event_queue::{
236 new_in_memory_queue, new_in_memory_queue_with_options, DEFAULT_MAX_EVENT_SIZE,
237 DEFAULT_WRITE_TIMEOUT,
238 };
239 use a2a_protocol_types::events::{StreamResponse, TaskStatusUpdateEvent};
240 use a2a_protocol_types::task::{ContextId, TaskId, TaskState, TaskStatus};
241
242 fn make_status_event(task_id: &str, state: TaskState) -> StreamResponse {
244 StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
245 task_id: TaskId::new(task_id),
246 context_id: ContextId::new("ctx-test"),
247 status: TaskStatus {
248 state,
249 message: None,
250 timestamp: None,
251 },
252 metadata: None,
253 })
254 }
255
256 #[tokio::test]
259 async fn write_then_read_single_event() {
260 let (writer, mut reader) = new_in_memory_queue();
261 let event = make_status_event("t1", TaskState::Working);
262
263 writer.write(event).await.expect("write should succeed");
264 drop(writer);
265
266 let received = reader.read().await;
267 assert!(received.is_some(), "reader should return the written event");
268 let result = received.unwrap();
269 let event = result.expect("event should be Ok");
270 match &event {
271 StreamResponse::StatusUpdate(evt) => {
272 assert_eq!(
273 evt.status.state,
274 TaskState::Working,
275 "should be Working event"
276 );
277 }
278 other => panic!("expected StatusUpdate, got: {other:?}"),
279 }
280
281 let eof = reader.read().await;
283 assert!(
284 eof.is_none(),
285 "reader should return None after writer is dropped"
286 );
287 }
288
289 #[tokio::test]
290 async fn write_multiple_events_read_in_order() {
291 let (writer, mut reader) = new_in_memory_queue();
292
293 let e1 = make_status_event("t1", TaskState::Working);
294 let e2 = make_status_event("t1", TaskState::Completed);
295
296 writer.write(e1).await.expect("first write should succeed");
297 writer.write(e2).await.expect("second write should succeed");
298 drop(writer);
299
300 let r1 = reader.read().await.expect("should read first event");
302 let sr1 = r1.expect("first event should be Ok");
303 match &sr1 {
304 StreamResponse::StatusUpdate(evt) => {
305 assert_eq!(
306 evt.status.state,
307 TaskState::Working,
308 "first event should be Working"
309 );
310 }
311 other => panic!("expected StatusUpdate, got: {other:?}"),
312 }
313
314 let r2 = reader.read().await.expect("should read second event");
316 let sr2 = r2.expect("second event should be Ok");
317 match &sr2 {
318 StreamResponse::StatusUpdate(evt) => {
319 assert_eq!(
320 evt.status.state,
321 TaskState::Completed,
322 "second event should be Completed"
323 );
324 }
325 other => panic!("expected StatusUpdate, got: {other:?}"),
326 }
327
328 assert!(
330 reader.read().await.is_none(),
331 "should be EOF after all events"
332 );
333 }
334
335 #[tokio::test]
338 async fn read_returns_none_on_empty_closed_queue() {
339 let (writer, mut reader) = new_in_memory_queue();
340 drop(writer); let result = reader.read().await;
343 assert!(
344 result.is_none(),
345 "reading from an empty closed queue should return None"
346 );
347 }
348
349 #[tokio::test]
350 async fn write_after_all_readers_dropped_returns_error() {
351 let (writer, reader) = new_in_memory_queue();
352 drop(reader);
353
354 let result = writer
355 .write(make_status_event("t1", TaskState::Working))
356 .await;
357 assert!(
358 result.is_err(),
359 "writing with no active receivers should return an error"
360 );
361 }
362
363 #[tokio::test]
364 async fn close_is_no_op_and_succeeds() {
365 let (writer, _reader) = new_in_memory_queue();
366 let result = writer.close().await;
367 assert!(result.is_ok(), "close() should succeed");
368 }
369
370 #[tokio::test]
373 async fn subscribe_creates_independent_reader() {
374 let (writer, mut reader1) = new_in_memory_queue();
375 let mut reader2 = writer.subscribe();
376
377 let event = make_status_event("t1", TaskState::Working);
378 writer.write(event).await.expect("write should succeed");
379 drop(writer);
380
381 let r1 = reader1.read().await;
383 assert!(r1.is_some(), "reader1 should receive the event");
384
385 let r2 = reader2.read().await;
386 assert!(r2.is_some(), "reader2 should receive the event");
387
388 assert!(reader1.read().await.is_none(), "reader1 should see EOF");
390 assert!(reader2.read().await.is_none(), "reader2 should see EOF");
391 }
392
393 #[tokio::test]
394 async fn subscriber_only_sees_events_after_subscribe() {
395 let (writer, mut reader1) = new_in_memory_queue();
396
397 writer
399 .write(make_status_event("t1", TaskState::Submitted))
400 .await
401 .expect("write should succeed");
402
403 let mut reader2 = writer.subscribe();
405
406 writer
408 .write(make_status_event("t1", TaskState::Working))
409 .await
410 .expect("write should succeed");
411 drop(writer);
412
413 let r1a = reader1
415 .read()
416 .await
417 .expect("reader1 should see first event");
418 let evt1a = r1a.expect("first event should be Ok");
419 assert!(
420 matches!(&evt1a, StreamResponse::StatusUpdate(e) if e.status.state == TaskState::Submitted),
421 "reader1 first event should be Submitted"
422 );
423 let r1b = reader1
424 .read()
425 .await
426 .expect("reader1 should see second event");
427 let evt_1b = r1b.expect("second event should be Ok");
428 assert!(
429 matches!(&evt_1b, StreamResponse::StatusUpdate(e) if e.status.state == TaskState::Working),
430 "reader1 second event should be Working"
431 );
432 assert!(reader1.read().await.is_none());
433
434 let r2a = reader2
436 .read()
437 .await
438 .expect("reader2 should see second event");
439 let evt2a = r2a.expect("event should be Ok");
440 assert!(
441 matches!(&evt2a, StreamResponse::StatusUpdate(e) if e.status.state == TaskState::Working),
442 "reader2 should see Working event"
443 );
444 assert!(
445 reader2.read().await.is_none(),
446 "reader2 should see EOF after the one event it received"
447 );
448 }
449
450 #[tokio::test]
453 async fn oversized_event_is_rejected() {
454 let (writer, _reader) = new_in_memory_queue_with_options(
456 16,
457 10, DEFAULT_WRITE_TIMEOUT,
459 );
460
461 let event = make_status_event("t1", TaskState::Working);
462 let result = writer.write(event).await;
463 assert!(
464 result.is_err(),
465 "event exceeding max_event_size should be rejected"
466 );
467 let err = result.unwrap_err();
468 let msg = format!("{err}");
469 assert!(
470 msg.contains("exceeds maximum"),
471 "error message should mention size limit, got: {msg}"
472 );
473 }
474
475 #[test]
477 fn counting_writer_flush_is_noop() {
478 use std::io::Write;
479 let mut cw = super::CountingWriter(0);
480 cw.write_all(b"hello").unwrap();
481 assert_eq!(cw.0, 5);
482 cw.flush().unwrap();
484 assert_eq!(cw.0, 5, "flush should not change the count");
485 }
486
487 #[tokio::test]
488 async fn event_within_size_limit_is_accepted() {
489 let (writer, mut reader) =
491 new_in_memory_queue_with_options(16, DEFAULT_MAX_EVENT_SIZE, DEFAULT_WRITE_TIMEOUT);
492
493 let event = make_status_event("t1", TaskState::Working);
494 writer
495 .write(event)
496 .await
497 .expect("event within size limit should be accepted");
498 drop(writer);
499
500 let r = reader.read().await;
501 assert!(r.is_some(), "reader should receive the event");
502 }
503}