Skip to main content

autoagents_telemetry/
fanout.rs

1use autoagents_core::utils::BoxEventStream;
2use autoagents_protocol::Event;
3use futures_util::StreamExt;
4use tokio::sync::broadcast;
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
7
8/// Broadcasts a single event stream to multiple subscribers.
9pub struct EventFanout {
10    tx: broadcast::Sender<Event>,
11    _task: JoinHandle<()>,
12}
13
14impl EventFanout {
15    /// Spawn a background task that forwards events into a broadcast channel.
16    pub fn new(mut event_stream: BoxEventStream<Event>, buffer: usize) -> Self {
17        let (tx, _) = broadcast::channel(buffer);
18        let tx_clone = tx.clone();
19        let task = tokio::spawn(async move {
20            while let Some(event) = event_stream.next().await {
21                let _ = tx_clone.send(event);
22            }
23        });
24
25        Self { tx, _task: task }
26    }
27
28    /// Create a new stream receiver over the broadcast channel.
29    pub fn subscribe(&self) -> BoxEventStream<Event> {
30        let rx = self.tx.subscribe();
31        let stream = BroadcastStream::new(rx)
32            .filter_map(|item: Result<Event, BroadcastStreamRecvError>| async move { item.ok() });
33        Box::pin(stream)
34    }
35}
36
37#[cfg(test)]
38mod tests {
39    use super::*;
40    use futures_util::StreamExt;
41    use tokio_stream::iter;
42
43    #[tokio::test]
44    async fn test_event_fanout_forwards_events() {
45        let event = Event::TaskStarted {
46            sub_id: autoagents_protocol::SubmissionId::new_v4(),
47            actor_id: autoagents_protocol::ActorID::new_v4(),
48            actor_name: "agent".to_string(),
49            task_description: "task".to_string(),
50        };
51        let stream = Box::pin(iter(vec![event.clone()]));
52        let fanout = EventFanout::new(stream, 8);
53        let mut rx = fanout.subscribe();
54
55        let received = rx.next().await.expect("event");
56        match received {
57            Event::TaskStarted { actor_name, .. } => {
58                assert_eq!(actor_name, "agent");
59            }
60            other => panic!("unexpected event: {other:?}"),
61        }
62    }
63}