autoagents_telemetry/
fanout.rs1use autoagents_core::utils::BoxEventStream;
2use autoagents_protocol::Event;
3use futures_util::StreamExt;
4use tokio::sync::broadcast;
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
7
8pub struct EventFanout {
10 tx: broadcast::Sender<Event>,
11 _task: JoinHandle<()>,
12}
13
14impl EventFanout {
15 pub fn new(mut event_stream: BoxEventStream<Event>, buffer: usize) -> Self {
17 let (tx, _) = broadcast::channel(buffer);
18 let tx_clone = tx.clone();
19 let task = tokio::spawn(async move {
20 while let Some(event) = event_stream.next().await {
21 let _ = tx_clone.send(event);
22 }
23 });
24
25 Self { tx, _task: task }
26 }
27
28 pub fn subscribe(&self) -> BoxEventStream<Event> {
30 let rx = self.tx.subscribe();
31 let stream = BroadcastStream::new(rx)
32 .filter_map(|item: Result<Event, BroadcastStreamRecvError>| async move { item.ok() });
33 Box::pin(stream)
34 }
35}
36
37#[cfg(test)]
38mod tests {
39 use super::*;
40 use futures_util::StreamExt;
41 use tokio_stream::iter;
42
43 #[tokio::test]
44 async fn test_event_fanout_forwards_events() {
45 let event = Event::TaskStarted {
46 sub_id: autoagents_protocol::SubmissionId::new_v4(),
47 actor_id: autoagents_protocol::ActorID::new_v4(),
48 actor_name: "agent".to_string(),
49 task_description: "task".to_string(),
50 };
51 let stream = Box::pin(iter(vec![event.clone()]));
52 let fanout = EventFanout::new(stream, 8);
53 let mut rx = fanout.subscribe();
54
55 let received = rx.next().await.expect("event");
56 match received {
57 Event::TaskStarted { actor_name, .. } => {
58 assert_eq!(actor_name, "agent");
59 }
60 other => panic!("unexpected event: {other:?}"),
61 }
62 }
63}