Skip to main content

somatize_runtime/
event_bus.rs

1//! Broadcast event bus for runtime observability.
2//!
3//! Emits [`Event`]s (node started/completed/failed, cache hits, run lifecycle)
4//! to all subscribers via a tokio broadcast channel.
5
6use somatize_core::event::Event;
7use tokio::sync::broadcast;
8
9/// Async event bus for broadcasting execution events to multiple subscribers.
10///
11/// Uses tokio's broadcast channel internally. Subscribers receive all events
12/// emitted after they subscribe. Events are cloned for each subscriber.
13pub struct EventBus {
14    sender: broadcast::Sender<Event>,
15}
16
17impl EventBus {
18    /// Create a new event bus with the given channel capacity.
19    pub fn new(capacity: usize) -> Self {
20        let (sender, _) = broadcast::channel(capacity);
21        Self { sender }
22    }
23
24    /// Emit an event to all subscribers.
25    /// Returns the number of receivers that received the event.
26    /// If there are no subscribers, the event is silently dropped.
27    pub fn emit(&self, event: Event) -> usize {
28        self.sender.send(event).unwrap_or(0)
29    }
30
31    /// Subscribe to receive events.
32    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
33        self.sender.subscribe()
34    }
35
36    /// Number of active subscribers.
37    pub fn subscriber_count(&self) -> usize {
38        self.sender.receiver_count()
39    }
40}
41
42impl Default for EventBus {
43    fn default() -> Self {
44        Self::new(256)
45    }
46}
47
48#[cfg(test)]
49mod tests {
50    use super::*;
51    use somatize_core::event::PlanSummary;
52    use std::time::Duration;
53
54    #[tokio::test]
55    async fn emit_without_subscribers_succeeds() {
56        let bus = EventBus::new(16);
57        let count = bus.emit(Event::RunStarted {
58            run_id: "r1".into(),
59            plan_summary: PlanSummary {
60                total_nodes: 1,
61                cached_nodes: 0,
62                parallel_branches: 0,
63            },
64        });
65        assert_eq!(count, 0);
66    }
67
68    #[tokio::test]
69    async fn subscriber_receives_events() {
70        let bus = EventBus::new(16);
71        let mut rx = bus.subscribe();
72
73        bus.emit(Event::RunStarted {
74            run_id: "r1".into(),
75            plan_summary: PlanSummary {
76                total_nodes: 2,
77                cached_nodes: 0,
78                parallel_branches: 0,
79            },
80        });
81        bus.emit(Event::RunCompleted {
82            run_id: "r1".into(),
83            duration: Duration::from_millis(100),
84        });
85
86        let e1 = rx.recv().await.unwrap();
87        assert!(matches!(e1, Event::RunStarted { .. }));
88
89        let e2 = rx.recv().await.unwrap();
90        assert!(matches!(e2, Event::RunCompleted { .. }));
91    }
92
93    #[tokio::test]
94    async fn multiple_subscribers() {
95        let bus = EventBus::new(16);
96        let mut rx1 = bus.subscribe();
97        let mut rx2 = bus.subscribe();
98
99        assert_eq!(bus.subscriber_count(), 2);
100
101        bus.emit(Event::RunCompleted {
102            run_id: "r1".into(),
103            duration: Duration::from_secs(1),
104        });
105
106        let e1 = rx1.recv().await.unwrap();
107        let e2 = rx2.recv().await.unwrap();
108        assert!(matches!(e1, Event::RunCompleted { .. }));
109        assert!(matches!(e2, Event::RunCompleted { .. }));
110    }
111
112    #[tokio::test]
113    async fn subscriber_after_emit_misses_earlier_events() {
114        let bus = EventBus::new(16);
115        bus.emit(Event::RunCompleted {
116            run_id: "r1".into(),
117            duration: Duration::from_secs(1),
118        });
119
120        let mut rx = bus.subscribe();
121        bus.emit(Event::RunCompleted {
122            run_id: "r2".into(),
123            duration: Duration::from_secs(2),
124        });
125
126        let event = rx.recv().await.unwrap();
127        if let Event::RunCompleted { run_id, .. } = event {
128            assert_eq!(run_id, "r2"); // only sees r2, not r1
129        } else {
130            panic!("wrong event type");
131        }
132    }
133}