1use tokio::sync::broadcast;
2
3use crate::event::Event;
4
5pub struct EventBus {
10 tx: broadcast::Sender<Event>,
12}
13
14impl EventBus {
15 pub fn new(capacity: usize) -> Self {
16 let (tx, _) = broadcast::channel(capacity);
17 Self { tx }
18 }
19
20 pub fn publish(&self, event: Event) {
22 let _ = self.tx.send(event);
23 }
24
25 pub fn subscribe_all(&self) -> broadcast::Receiver<Event> {
27 self.tx.subscribe()
28 }
29
30 pub fn subscribe_filtered(&self, _filter: &EventFilter) -> broadcast::Receiver<Event> {
32 self.tx.subscribe()
35 }
36
37 pub fn subscribe_run(&self, _run_id: &crate::event::RunId) -> broadcast::Receiver<Event> {
39 self.tx.subscribe()
40 }
42
43 pub fn subscriber_count(&self) -> usize {
45 self.tx.receiver_count()
46 }
47}
48
49impl Default for EventBus {
50 fn default() -> Self {
51 Self::new(1024)
52 }
53}
54
55impl Clone for EventBus {
56 fn clone(&self) -> Self {
57 Self {
58 tx: self.tx.clone(),
59 }
60 }
61}
62
63#[derive(Debug, Clone)]
66pub struct EventFilter {
67 pub run_id: Option<String>,
68 pub event_types: Vec<String>,
69}
70
71impl EventFilter {
72 pub fn by_run(run_id: &str) -> Self {
73 Self {
74 run_id: Some(run_id.to_string()),
75 event_types: vec![],
76 }
77 }
78
79 pub fn matches(&self, event: &Event) -> bool {
80 if let Some(ref rid) = self.run_id {
82 let event_run = match event {
83 Event::RunStarted { run, .. } => &run.0,
84 Event::RouteSelected { run, .. } => &run.0,
85 Event::ModelSwitched { run, .. } => &run.0,
86 Event::ThinkingDelta { run, .. } => &run.0,
87 Event::ReasoningDelta { run, .. } => &run.0,
88 Event::Message { run, .. } => &run.0,
89 Event::ToolUseProposed { run, .. } => &run.0,
90 Event::ApprovalRequested { run, .. } => &run.0,
91 Event::ApprovalResolved { run, .. } => &run.0,
92 Event::ToolUseStarted { run, .. } => &run.0,
93 Event::ToolOutput { run, .. } => &run.0,
94 Event::DiffProposed { run, .. } => &run.0,
95 Event::DiffApplied { run, .. } => &run.0,
96 Event::TestResult { run, .. } => &run.0,
97 Event::AgentSpawned { run, .. } => &run.0,
98 Event::AgentStatus { run, .. } => &run.0,
99 Event::CheckpointCreated { run, .. } => &run.0,
100 Event::SkillLearned { run, .. } => &run.0,
101 Event::CostUpdate { run, .. } => &run.0,
102 Event::TokenUsage { run, .. } => &run.0,
103 Event::TokenUsageEstimated { run, .. } => &run.0,
104 Event::AutonomyChanged { run, .. } => &run.0,
105 Event::RunFinished { run, .. } => &run.0,
106 Event::Error { run, .. } => &run.0,
107 Event::Compacted { run, .. } => &run.0,
108 };
109 if event_run != rid {
110 return false;
111 }
112 }
113
114 if !self.event_types.is_empty() {
116 let event_type = match event {
117 Event::RunStarted { .. } => "RunStarted",
118 Event::RouteSelected { .. } => "RouteSelected",
119 Event::ModelSwitched { .. } => "ModelSwitched",
120 Event::ThinkingDelta { .. } => "ThinkingDelta",
121 Event::ReasoningDelta { .. } => "ReasoningDelta",
122 Event::Message { .. } => "Message",
123 Event::ToolUseProposed { .. } => "ToolUseProposed",
124 Event::ApprovalRequested { .. } => "ApprovalRequested",
125 Event::ApprovalResolved { .. } => "ApprovalResolved",
126 Event::ToolUseStarted { .. } => "ToolUseStarted",
127 Event::ToolOutput { .. } => "ToolOutput",
128 Event::DiffProposed { .. } => "DiffProposed",
129 Event::DiffApplied { .. } => "DiffApplied",
130 Event::TestResult { .. } => "TestResult",
131 Event::AgentSpawned { .. } => "AgentSpawned",
132 Event::AgentStatus { .. } => "AgentStatus",
133 Event::CheckpointCreated { .. } => "CheckpointCreated",
134 Event::SkillLearned { .. } => "SkillLearned",
135 Event::CostUpdate { .. } => "CostUpdate",
136 Event::TokenUsage { .. } => "TokenUsage",
137 Event::TokenUsageEstimated { .. } => "TokenUsageEstimated",
138 Event::AutonomyChanged { .. } => "AutonomyChanged",
139 Event::RunFinished { .. } => "RunFinished",
140 Event::Error { .. } => "Error",
141 Event::Compacted { .. } => "Compacted",
142 };
143 if !self.event_types.contains(&event_type.to_string()) {
144 return false;
145 }
146 }
147
148 true
149 }
150}