use somatize_core::event::Event;
use tokio::sync::broadcast;
pub struct EventBus {
sender: broadcast::Sender<Event>,
}
impl EventBus {
pub fn new(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self { sender }
}
pub fn emit(&self, event: Event) -> usize {
self.sender.send(event).unwrap_or(0)
}
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.sender.subscribe()
}
pub fn subscriber_count(&self) -> usize {
self.sender.receiver_count()
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new(256)
}
}
#[cfg(test)]
mod tests {
use super::*;
use somatize_core::event::PlanSummary;
use std::time::Duration;
#[tokio::test]
async fn emit_without_subscribers_succeeds() {
let bus = EventBus::new(16);
let count = bus.emit(Event::RunStarted {
run_id: "r1".into(),
plan_summary: PlanSummary {
total_nodes: 1,
cached_nodes: 0,
parallel_branches: 0,
},
});
assert_eq!(count, 0);
}
#[tokio::test]
async fn subscriber_receives_events() {
let bus = EventBus::new(16);
let mut rx = bus.subscribe();
bus.emit(Event::RunStarted {
run_id: "r1".into(),
plan_summary: PlanSummary {
total_nodes: 2,
cached_nodes: 0,
parallel_branches: 0,
},
});
bus.emit(Event::RunCompleted {
run_id: "r1".into(),
duration: Duration::from_millis(100),
});
let e1 = rx.recv().await.unwrap();
assert!(matches!(e1, Event::RunStarted { .. }));
let e2 = rx.recv().await.unwrap();
assert!(matches!(e2, Event::RunCompleted { .. }));
}
#[tokio::test]
async fn multiple_subscribers() {
let bus = EventBus::new(16);
let mut rx1 = bus.subscribe();
let mut rx2 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 2);
bus.emit(Event::RunCompleted {
run_id: "r1".into(),
duration: Duration::from_secs(1),
});
let e1 = rx1.recv().await.unwrap();
let e2 = rx2.recv().await.unwrap();
assert!(matches!(e1, Event::RunCompleted { .. }));
assert!(matches!(e2, Event::RunCompleted { .. }));
}
#[tokio::test]
async fn subscriber_after_emit_misses_earlier_events() {
let bus = EventBus::new(16);
bus.emit(Event::RunCompleted {
run_id: "r1".into(),
duration: Duration::from_secs(1),
});
let mut rx = bus.subscribe();
bus.emit(Event::RunCompleted {
run_id: "r2".into(),
duration: Duration::from_secs(2),
});
let event = rx.recv().await.unwrap();
if let Event::RunCompleted { run_id, .. } = event {
assert_eq!(run_id, "r2"); } else {
panic!("wrong event type");
}
}
}