a2a_protocol_server/streaming/event_queue/
in_memory.rs1use std::future::Future;
19use std::pin::Pin;
20
21use a2a_protocol_types::error::{A2aError, A2aResult};
22use a2a_protocol_types::events::StreamResponse;
23use tokio::sync::{broadcast, mpsc};
24
25use super::{EventQueueReader, EventQueueWriter};
26
/// A [`std::io::Write`] sink that stores nothing and only tallies how many
/// bytes have been handed to it.
///
/// The single field is the running byte total.
struct CountingWriter(usize);

impl std::io::Write for CountingWriter {
    /// Adds `buf.len()` to the running total and reports the whole buffer as
    /// written; the bytes themselves are discarded.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let accepted = buf.len();
        let Self(total) = self;
        *total += accepted;
        Ok(accepted)
    }

    /// Nothing is buffered, so flushing always succeeds and changes nothing.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
44
45#[derive(Debug, Clone)]
57pub struct InMemoryQueueWriter {
58 tx: broadcast::Sender<A2aResult<StreamResponse>>,
59 persistence_tx: Option<mpsc::Sender<A2aResult<StreamResponse>>>,
63 max_event_size: usize,
65 #[allow(dead_code)]
67 write_timeout: std::time::Duration,
68}
69
70impl InMemoryQueueWriter {
71 pub(super) const fn new(
73 tx: broadcast::Sender<A2aResult<StreamResponse>>,
74 max_event_size: usize,
75 write_timeout: std::time::Duration,
76 ) -> Self {
77 Self {
78 tx,
79 persistence_tx: None,
80 max_event_size,
81 write_timeout,
82 }
83 }
84
85 pub(super) const fn new_with_persistence(
87 tx: broadcast::Sender<A2aResult<StreamResponse>>,
88 persistence_tx: mpsc::Sender<A2aResult<StreamResponse>>,
89 max_event_size: usize,
90 write_timeout: std::time::Duration,
91 ) -> Self {
92 Self {
93 tx,
94 persistence_tx: Some(persistence_tx),
95 max_event_size,
96 write_timeout,
97 }
98 }
99
100 #[must_use]
105 pub fn subscribe(&self) -> InMemoryQueueReader {
106 InMemoryQueueReader::new(self.tx.subscribe())
107 }
108
109 pub(crate) fn raw_subscribe(&self) -> broadcast::Receiver<A2aResult<StreamResponse>> {
114 self.tx.subscribe()
115 }
116}
117
118#[allow(clippy::manual_async_fn)]
119impl EventQueueWriter for InMemoryQueueWriter {
120 fn write<'a>(
121 &'a self,
122 event: StreamResponse,
123 ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
124 Box::pin(async move {
125 let serialized_size = {
130 let mut counter = CountingWriter(0);
131 serde_json::to_writer(&mut counter, &event)
132 .map_err(|e| A2aError::internal(format!("event serialization failed: {e}")))?;
133 counter.0
134 };
135 if serialized_size > self.max_event_size {
136 return Err(A2aError::internal(format!(
137 "event size {serialized_size} bytes exceeds maximum {} bytes",
138 self.max_event_size
139 )));
140 }
141 if let Some(ref persistence_tx) = self.persistence_tx {
144 if let Err(_e) = persistence_tx.send(Ok(event.clone())).await {
145 trace_warn!("persistence channel closed, event not persisted");
146 }
147 }
148 match self.tx.send(Ok(event)) {
157 Ok(_) => Ok(()),
158 Err(_) if self.persistence_tx.is_some() => {
159 trace_warn!("no live event subscribers; event persisted only");
160 Ok(())
161 }
162 Err(_) => Err(A2aError::internal("event queue: no active receivers")),
163 }
164 })
165 }
166
167 fn close<'a>(&'a self) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
168 Box::pin(async move {
169 Ok(())
172 })
173 }
174}
175
176#[derive(Debug)]
187pub struct InMemoryQueueReader {
188 rx: broadcast::Receiver<A2aResult<StreamResponse>>,
189 pending_first: Option<A2aResult<StreamResponse>>,
190}
191
192impl InMemoryQueueReader {
193 pub(crate) const fn new(rx: broadcast::Receiver<A2aResult<StreamResponse>>) -> Self {
195 Self {
196 rx,
197 pending_first: None,
198 }
199 }
200
201 pub fn set_first_event(&mut self, event: StreamResponse) {
203 self.pending_first = Some(Ok(event));
204 }
205
206 pub(crate) const fn with_first_event(
208 rx: broadcast::Receiver<A2aResult<StreamResponse>>,
209 first: StreamResponse,
210 ) -> Self {
211 Self {
212 rx,
213 pending_first: Some(Ok(first)),
214 }
215 }
216}
217
218impl EventQueueReader for InMemoryQueueReader {
219 fn read(
220 &mut self,
221 ) -> Pin<Box<dyn Future<Output = Option<A2aResult<StreamResponse>>> + Send + '_>> {
222 Box::pin(async move {
223 if let Some(first) = self.pending_first.take() {
226 return Some(first);
227 }
228 loop {
229 match self.rx.recv().await {
230 Ok(event) => return Some(event),
231 Err(broadcast::error::RecvError::Lagged(_n)) => {
232 trace_warn!(
233 dropped_events = _n,
234 "event queue reader lagged, {_n} events skipped"
235 );
236 }
237 Err(broadcast::error::RecvError::Closed) => return None,
238 }
239 }
240 })
241 }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use crate::streaming::event_queue::{
        new_in_memory_queue, new_in_memory_queue_with_options, DEFAULT_MAX_EVENT_SIZE,
        DEFAULT_WRITE_TIMEOUT,
    };
    use a2a_protocol_types::events::{StreamResponse, TaskStatusUpdateEvent};
    use a2a_protocol_types::task::{ContextId, TaskId, TaskState, TaskStatus};

    /// Builds a status-update event for `task_id` in `state`, using the fixed
    /// context id `ctx-test` and no message, timestamp or metadata.
    fn make_status_event(task_id: &str, state: TaskState) -> StreamResponse {
        let status = TaskStatus {
            state,
            message: None,
            timestamp: None,
        };
        let update = TaskStatusUpdateEvent {
            task_id: TaskId::new(task_id),
            context_id: ContextId::new("ctx-test"),
            status,
            metadata: None,
        };
        StreamResponse::StatusUpdate(update)
    }

    /// With a persistence channel attached, a write succeeds even when the
    /// only reader has been dropped, and the event arrives on that channel.
    #[tokio::test]
    async fn write_with_no_subscribers_succeeds_when_persistence_attached() {
        let (writer, reader, mut persistence_rx) =
            crate::streaming::event_queue::new_in_memory_queue_with_persistence(
                8,
                1024 * 1024,
                std::time::Duration::from_secs(1),
            );
        drop(reader);

        let outcome = writer
            .write(make_status_event("t1", TaskState::Working))
            .await;
        outcome.expect("write must succeed with persistence attached");

        let persisted = persistence_rx
            .recv()
            .await
            .expect("persistence channel should have the event")
            .expect("event should be Ok");
        if let StreamResponse::StatusUpdate(evt) = &persisted {
            assert_eq!(evt.status.state, TaskState::Working);
        } else {
            let other = &persisted;
            panic!("expected StatusUpdate, got: {other:?}");
        }
    }

    /// Without a persistence channel, writing with no readers is an error.
    #[tokio::test]
    async fn write_with_no_subscribers_fails_without_persistence() {
        let (writer, reader) = new_in_memory_queue();
        drop(reader);

        let event = make_status_event("t1", TaskState::Working);
        let outcome = writer.write(event).await;
        assert!(
            outcome.is_err(),
            "sync-mode write with no receivers must fail"
        );
    }

    /// A single written event is read back, followed by end-of-stream once
    /// the writer is gone.
    #[tokio::test]
    async fn write_then_read_single_event() {
        let (writer, mut reader) = new_in_memory_queue();

        writer
            .write(make_status_event("t1", TaskState::Working))
            .await
            .expect("write should succeed");
        drop(writer);

        let maybe_item = reader.read().await;
        assert!(
            maybe_item.is_some(),
            "reader should return the written event"
        );
        let delivered = maybe_item.unwrap().expect("event should be Ok");
        if let StreamResponse::StatusUpdate(evt) = &delivered {
            assert_eq!(
                evt.status.state,
                TaskState::Working,
                "should be Working event"
            );
        } else {
            let other = &delivered;
            panic!("expected StatusUpdate, got: {other:?}");
        }

        let after_close = reader.read().await;
        assert!(
            after_close.is_none(),
            "reader should return None after writer is dropped"
        );
    }

    /// Two events written back to back are read in the order they were written.
    #[tokio::test]
    async fn write_multiple_events_read_in_order() {
        let (writer, mut reader) = new_in_memory_queue();

        let working = make_status_event("t1", TaskState::Working);
        let completed = make_status_event("t1", TaskState::Completed);
        writer
            .write(working)
            .await
            .expect("first write should succeed");
        writer
            .write(completed)
            .await
            .expect("second write should succeed");
        drop(writer);

        let first = reader
            .read()
            .await
            .expect("should read first event")
            .expect("first event should be Ok");
        match &first {
            StreamResponse::StatusUpdate(evt) => assert_eq!(
                evt.status.state,
                TaskState::Working,
                "first event should be Working"
            ),
            other => panic!("expected StatusUpdate, got: {other:?}"),
        }

        let second = reader
            .read()
            .await
            .expect("should read second event")
            .expect("second event should be Ok");
        match &second {
            StreamResponse::StatusUpdate(evt) => assert_eq!(
                evt.status.state,
                TaskState::Completed,
                "second event should be Completed"
            ),
            other => panic!("expected StatusUpdate, got: {other:?}"),
        }

        let tail = reader.read().await;
        assert!(tail.is_none(), "should be EOF after all events");
    }

    /// Reading from a queue whose writer was dropped before any write yields `None`.
    #[tokio::test]
    async fn read_returns_none_on_empty_closed_queue() {
        let (writer, mut reader) = new_in_memory_queue();
        drop(writer);

        let outcome = reader.read().await;
        assert!(
            outcome.is_none(),
            "reading from an empty closed queue should return None"
        );
    }

    /// Writing after every reader has been dropped reports an error.
    #[tokio::test]
    async fn write_after_all_readers_dropped_returns_error() {
        let (writer, reader) = new_in_memory_queue();
        drop(reader);

        let event = make_status_event("t1", TaskState::Working);
        let outcome = writer.write(event).await;
        assert!(
            outcome.is_err(),
            "writing with no active receivers should return an error"
        );
    }

    /// `close()` resolves successfully.
    #[tokio::test]
    async fn close_is_no_op_and_succeeds() {
        let (writer, _reader) = new_in_memory_queue();
        let outcome = writer.close().await;
        assert!(outcome.is_ok(), "close() should succeed");
    }

    /// A reader created through `subscribe()` receives the same event as the
    /// original reader, and both see end-of-stream afterwards.
    #[tokio::test]
    async fn subscribe_creates_independent_reader() {
        let (writer, mut reader1) = new_in_memory_queue();
        let mut reader2 = writer.subscribe();

        writer
            .write(make_status_event("t1", TaskState::Working))
            .await
            .expect("write should succeed");
        drop(writer);

        let from_first = reader1.read().await;
        assert!(from_first.is_some(), "reader1 should receive the event");

        let from_second = reader2.read().await;
        assert!(from_second.is_some(), "reader2 should receive the event");

        assert!(reader1.read().await.is_none(), "reader1 should see EOF");
        assert!(reader2.read().await.is_none(), "reader2 should see EOF");
    }

    /// A reader subscribed after the first write only receives later events.
    #[tokio::test]
    async fn subscriber_only_sees_events_after_subscribe() {
        let (writer, mut reader1) = new_in_memory_queue();

        let submitted = make_status_event("t1", TaskState::Submitted);
        writer.write(submitted).await.expect("write should succeed");

        // Subscribed between the two writes on purpose.
        let mut reader2 = writer.subscribe();

        let working = make_status_event("t1", TaskState::Working);
        writer.write(working).await.expect("write should succeed");
        drop(writer);

        let evt1a = reader1
            .read()
            .await
            .expect("reader1 should see first event")
            .expect("first event should be Ok");
        assert!(
            matches!(&evt1a, StreamResponse::StatusUpdate(e) if e.status.state == TaskState::Submitted),
            "reader1 first event should be Submitted"
        );

        let evt_1b = reader1
            .read()
            .await
            .expect("reader1 should see second event")
            .expect("second event should be Ok");
        assert!(
            matches!(&evt_1b, StreamResponse::StatusUpdate(e) if e.status.state == TaskState::Working),
            "reader1 second event should be Working"
        );
        assert!(reader1.read().await.is_none());

        let evt2a = reader2
            .read()
            .await
            .expect("reader2 should see second event")
            .expect("event should be Ok");
        assert!(
            matches!(&evt2a, StreamResponse::StatusUpdate(e) if e.status.state == TaskState::Working),
            "reader2 should see Working event"
        );

        let tail = reader2.read().await;
        assert!(
            tail.is_none(),
            "reader2 should see EOF after the one event it received"
        );
    }

    /// An event whose serialized form exceeds the configured limit is
    /// rejected with a message that mentions the limit.
    #[tokio::test]
    async fn oversized_event_is_rejected() {
        let (writer, _reader) = new_in_memory_queue_with_options(16, 10, DEFAULT_WRITE_TIMEOUT);

        let outcome = writer
            .write(make_status_event("t1", TaskState::Working))
            .await;
        assert!(
            outcome.is_err(),
            "event exceeding max_event_size should be rejected"
        );

        let err = outcome.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("exceeds maximum"),
            "error message should mention size limit, got: {msg}"
        );
    }

    /// Flushing the counting writer leaves its byte total untouched.
    #[test]
    fn counting_writer_flush_is_noop() {
        use std::io::Write;

        let mut counter = super::CountingWriter(0);
        counter.write_all(b"hello").unwrap();
        assert_eq!(counter.0, 5);

        counter.flush().unwrap();
        assert_eq!(counter.0, 5, "flush should not change the count");
    }

    /// An event within the default size limit is accepted and delivered.
    #[tokio::test]
    async fn event_within_size_limit_is_accepted() {
        let (writer, mut reader) =
            new_in_memory_queue_with_options(16, DEFAULT_MAX_EVENT_SIZE, DEFAULT_WRITE_TIMEOUT);

        let outcome = writer
            .write(make_status_event("t1", TaskState::Working))
            .await;
        outcome.expect("event within size limit should be accepted");
        drop(writer);

        let delivered = reader.read().await;
        assert!(delivered.is_some(), "reader should receive the event");
    }
}