Skip to main content

sparrow/runtime/
event_bus.rs

1use tokio::sync::broadcast;
2
3use crate::event::Event;
4
5// ─── Event bus ──────────────────────────────────────────────────────────────────
6
7/// Centralized pub/sub event distribution.
8/// Every surface and the recorder subscribe to this.
9pub struct EventBus {
10    /// Global broadcast channel for all events
11    tx: broadcast::Sender<Event>,
12}
13
14impl EventBus {
15    pub fn new(capacity: usize) -> Self {
16        let (tx, _) = broadcast::channel(capacity);
17        Self { tx }
18    }
19
20    /// Publish an event to all subscribers
21    pub fn publish(&self, event: Event) {
22        let _ = self.tx.send(event);
23    }
24
25    /// Subscribe to all events
26    pub fn subscribe_all(&self) -> broadcast::Receiver<Event> {
27        self.tx.subscribe()
28    }
29
30    /// Subscribe with a filter (by run_id or event type)
31    pub fn subscribe_filtered(&self, _filter: &EventFilter) -> broadcast::Receiver<Event> {
32        // For M4, we return the global channel.
33        // Filtering happens on the receiver side.
34        self.tx.subscribe()
35    }
36
37    /// Create a filtered subscription by run_id
38    pub fn subscribe_run(&self, _run_id: &crate::event::RunId) -> broadcast::Receiver<Event> {
39        self.tx.subscribe()
40        // Note: actual filtering is done by the receiver checking event.run field
41    }
42
43    /// Active subscriber count
44    pub fn subscriber_count(&self) -> usize {
45        self.tx.receiver_count()
46    }
47}
48
49impl Default for EventBus {
50    fn default() -> Self {
51        Self::new(1024)
52    }
53}
54
55impl Clone for EventBus {
56    fn clone(&self) -> Self {
57        Self {
58            tx: self.tx.clone(),
59        }
60    }
61}
62
63// ─── Event filter ───────────────────────────────────────────────────────────────
64
/// Receiver-side selection criteria for events coming off the bus.
#[derive(Debug, Clone)]
pub struct EventFilter {
    /// When `Some`, only events whose run id equals this value are accepted.
    pub run_id: Option<String>,
    /// Accepted event variant names (e.g. `"Message"`); empty means any type.
    pub event_types: Vec<String>,
}
70
71impl EventFilter {
72    pub fn by_run(run_id: &str) -> Self {
73        Self {
74            run_id: Some(run_id.to_string()),
75            event_types: vec![],
76        }
77    }
78
79    pub fn matches(&self, event: &Event) -> bool {
80        // Check run_id filter
81        if let Some(ref rid) = self.run_id {
82            let event_run = match event {
83                Event::RunStarted { run, .. } => &run.0,
84                Event::RouteSelected { run, .. } => &run.0,
85                Event::ModelSwitched { run, .. } => &run.0,
86                Event::ThinkingDelta { run, .. } => &run.0,
87                Event::ReasoningDelta { run, .. } => &run.0,
88                Event::Message { run, .. } => &run.0,
89                Event::ToolUseProposed { run, .. } => &run.0,
90                Event::ApprovalRequested { run, .. } => &run.0,
91                Event::ApprovalResolved { run, .. } => &run.0,
92                Event::ToolUseStarted { run, .. } => &run.0,
93                Event::ToolOutput { run, .. } => &run.0,
94                Event::DiffProposed { run, .. } => &run.0,
95                Event::DiffApplied { run, .. } => &run.0,
96                Event::TestResult { run, .. } => &run.0,
97                Event::AgentSpawned { run, .. } => &run.0,
98                Event::AgentStatus { run, .. } => &run.0,
99                Event::CheckpointCreated { run, .. } => &run.0,
100                Event::SkillLearned { run, .. } => &run.0,
101                Event::CostUpdate { run, .. } => &run.0,
102                Event::TokenUsage { run, .. } => &run.0,
103                Event::TokenUsageEstimated { run, .. } => &run.0,
104                Event::AutonomyChanged { run, .. } => &run.0,
105                Event::RunFinished { run, .. } => &run.0,
106                Event::Error { run, .. } => &run.0,
107                Event::Compacted { run, .. } => &run.0,
108            };
109            if event_run != rid {
110                return false;
111            }
112        }
113
114        // Check event type filter
115        if !self.event_types.is_empty() {
116            let event_type = match event {
117                Event::RunStarted { .. } => "RunStarted",
118                Event::RouteSelected { .. } => "RouteSelected",
119                Event::ModelSwitched { .. } => "ModelSwitched",
120                Event::ThinkingDelta { .. } => "ThinkingDelta",
121                Event::ReasoningDelta { .. } => "ReasoningDelta",
122                Event::Message { .. } => "Message",
123                Event::ToolUseProposed { .. } => "ToolUseProposed",
124                Event::ApprovalRequested { .. } => "ApprovalRequested",
125                Event::ApprovalResolved { .. } => "ApprovalResolved",
126                Event::ToolUseStarted { .. } => "ToolUseStarted",
127                Event::ToolOutput { .. } => "ToolOutput",
128                Event::DiffProposed { .. } => "DiffProposed",
129                Event::DiffApplied { .. } => "DiffApplied",
130                Event::TestResult { .. } => "TestResult",
131                Event::AgentSpawned { .. } => "AgentSpawned",
132                Event::AgentStatus { .. } => "AgentStatus",
133                Event::CheckpointCreated { .. } => "CheckpointCreated",
134                Event::SkillLearned { .. } => "SkillLearned",
135                Event::CostUpdate { .. } => "CostUpdate",
136                Event::TokenUsage { .. } => "TokenUsage",
137                Event::TokenUsageEstimated { .. } => "TokenUsageEstimated",
138                Event::AutonomyChanged { .. } => "AutonomyChanged",
139                Event::RunFinished { .. } => "RunFinished",
140                Event::Error { .. } => "Error",
141                Event::Compacted { .. } => "Compacted",
142            };
143            if !self.event_types.contains(&event_type.to_string()) {
144                return false;
145            }
146        }
147
148        true
149    }
150}