1use crate::{Event, Hat, HatId};
8use std::collections::BTreeMap;
9
10type Observer = Box<dyn Fn(&Event) + Send + 'static>;
12
13#[derive(Default)]
15pub struct EventBus {
16 hats: BTreeMap<HatId, Hat>,
18
19 pending: BTreeMap<HatId, Vec<Event>>,
21
22 human_pending: Vec<Event>,
24
25 observers: Vec<Observer>,
28}
29
30impl EventBus {
31 pub fn new() -> Self {
33 Self::default()
34 }
35
36 pub fn add_observer<F>(&mut self, observer: F)
43 where
44 F: Fn(&Event) + Send + 'static,
45 {
46 self.observers.push(Box::new(observer));
47 }
48
49 #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
54 pub fn set_observer<F>(&mut self, observer: F)
55 where
56 F: Fn(&Event) + Send + 'static,
57 {
58 self.observers.clear();
59 self.observers.push(Box::new(observer));
60 }
61
62 pub fn clear_observers(&mut self) {
64 self.observers.clear();
65 }
66
67 pub fn register(&mut self, hat: Hat) {
69 let id = hat.id.clone();
70 self.hats.insert(id.clone(), hat);
71 self.pending.entry(id).or_default();
72 }
73
74 #[allow(clippy::needless_pass_by_value)] pub fn publish(&mut self, event: Event) -> Vec<HatId> {
80 for observer in &self.observers {
82 observer(&event);
83 }
84
85 if event.topic.as_str().starts_with("human.") {
86 self.human_pending.push(event);
87 return Vec::new();
88 }
89
90 let mut recipients = Vec::new();
91
92 if let Some(ref target) = event.target {
94 if self.hats.contains_key(target) {
95 self.pending
96 .entry(target.clone())
97 .or_default()
98 .push(event.clone());
99 recipients.push(target.clone());
100 }
101 return recipients;
102 }
103
104 let mut specific_recipients = Vec::new();
110 let mut fallback_recipients = Vec::new();
111
112 for (id, hat) in &self.hats {
113 if hat.has_specific_subscription(&event.topic) {
114 specific_recipients.push(id.clone());
116 } else if hat.is_subscribed(&event.topic) {
117 fallback_recipients.push(id.clone());
119 }
120 }
121
122 let chosen_recipients = if specific_recipients.is_empty() {
124 fallback_recipients
125 } else {
126 specific_recipients
127 };
128
129 for id in chosen_recipients {
130 self.pending
131 .entry(id.clone())
132 .or_default()
133 .push(event.clone());
134 recipients.push(id);
135 }
136
137 recipients
138 }
139
140 pub fn take_pending(&mut self, hat_id: &HatId) -> Vec<Event> {
142 self.pending.remove(hat_id).unwrap_or_default()
143 }
144
145 pub fn take_human_pending(&mut self) -> Vec<Event> {
147 std::mem::take(&mut self.human_pending)
148 }
149
150 pub fn peek_pending(&self, hat_id: &HatId) -> Option<&Vec<Event>> {
152 self.pending.get(hat_id)
153 }
154
155 pub fn peek_human_pending(&self) -> &[Event] {
157 &self.human_pending
158 }
159
160 pub fn has_pending(&self) -> bool {
162 !self.human_pending.is_empty() || self.pending.values().any(|events| !events.is_empty())
163 }
164
165 pub fn has_human_pending(&self) -> bool {
167 !self.human_pending.is_empty()
168 }
169
170 pub fn next_hat_with_pending(&self) -> Option<&HatId> {
173 self.pending
174 .iter()
175 .find(|(_, events)| !events.is_empty())
176 .map(|(id, _)| id)
177 }
178
179 pub fn get_hat(&self, id: &HatId) -> Option<&Hat> {
181 self.hats.get(id)
182 }
183
184 pub fn hat_ids(&self) -> impl Iterator<Item = &HatId> {
186 self.hats.keys()
187 }
188}
189
190#[cfg(test)]
191mod tests {
192 use super::*;
193
194 #[test]
195 fn test_publish_to_subscriber() {
196 let mut bus = EventBus::new();
197
198 let hat = Hat::new("impl", "Implementer").subscribe("task.*");
199 bus.register(hat);
200
201 let event = Event::new("task.start", "Start implementing");
202 let recipients = bus.publish(event);
203
204 assert_eq!(recipients.len(), 1);
205 assert_eq!(recipients[0].as_str(), "impl");
206 }
207
208 #[test]
209 fn test_no_match() {
210 let mut bus = EventBus::new();
211
212 let hat = Hat::new("impl", "Implementer").subscribe("task.*");
213 bus.register(hat);
214
215 let event = Event::new("review.done", "Review complete");
216 let recipients = bus.publish(event);
217
218 assert!(recipients.is_empty());
219 }
220
221 #[test]
222 fn test_direct_target() {
223 let mut bus = EventBus::new();
224
225 let impl_hat = Hat::new("impl", "Implementer").subscribe("task.*");
226 let review_hat = Hat::new("reviewer", "Reviewer").subscribe("impl.*");
227 bus.register(impl_hat);
228 bus.register(review_hat);
229
230 let event = Event::new("handoff", "Please review").with_target("reviewer");
232 let recipients = bus.publish(event);
233
234 assert_eq!(recipients.len(), 1);
235 assert_eq!(recipients[0].as_str(), "reviewer");
236 }
237
238 #[test]
239 fn test_take_pending() {
240 let mut bus = EventBus::new();
241
242 let hat = Hat::new("impl", "Implementer").subscribe("*");
243 bus.register(hat);
244
245 bus.publish(Event::new("task.start", "Start"));
246 bus.publish(Event::new("task.continue", "Continue"));
247
248 let hat_id = HatId::new("impl");
249 let events = bus.take_pending(&hat_id);
250
251 assert_eq!(events.len(), 2);
252 assert!(bus.take_pending(&hat_id).is_empty());
253 }
254
255 #[test]
256 fn test_human_events_use_separate_queue() {
257 let mut bus = EventBus::new();
258
259 let hat = Hat::new("ralph", "Ralph").subscribe("*");
260 bus.register(hat);
261
262 bus.publish(Event::new("human.interact", "question"));
263 bus.publish(Event::new("human.response", "hello"));
264 bus.publish(Event::new("human.guidance", "note"));
265
266 assert_eq!(bus.peek_human_pending().len(), 3);
267 assert_eq!(
268 bus.peek_pending(&HatId::new("ralph"))
269 .map(|events| events.len())
270 .unwrap_or(0),
271 0
272 );
273
274 let taken = bus.take_human_pending();
275 assert_eq!(taken.len(), 3);
276 assert!(!bus.has_human_pending());
277 }
278
279 #[test]
280 fn test_self_routing_allowed() {
281 let mut bus = EventBus::new();
285
286 let planner = Hat::new("planner", "Planner").subscribe("build.done");
287 bus.register(planner);
288
289 let event = Event::new("build.done", "Done").with_source("planner");
291 let recipients = bus.publish(event);
292
293 assert_eq!(recipients.len(), 1);
295 assert_eq!(recipients[0].as_str(), "planner");
296 }
297
298 #[test]
299 fn test_observer_receives_all_events() {
300 use std::sync::{Arc, Mutex};
301
302 let mut bus = EventBus::new();
303 let observed: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
304
305 let observed_clone = Arc::clone(&observed);
306 bus.add_observer(move |event| {
307 observed_clone.lock().unwrap().push(event.payload.clone());
308 });
309
310 let hat = Hat::new("impl", "Implementer").subscribe("task.*");
311 bus.register(hat);
312
313 bus.publish(Event::new("task.start", "Start"));
315 bus.publish(Event::new("other.event", "Other")); bus.publish(Event::new("task.done", "Done"));
317
318 let captured = observed.lock().unwrap();
319 assert_eq!(captured.len(), 3);
320 assert_eq!(captured[0], "Start");
321 assert_eq!(captured[1], "Other");
322 assert_eq!(captured[2], "Done");
323 }
324
325 #[test]
326 fn test_multiple_observers() {
327 use std::sync::{Arc, Mutex};
328
329 let mut bus = EventBus::new();
330 let observer1_count = Arc::new(Mutex::new(0));
331 let observer2_count = Arc::new(Mutex::new(0));
332
333 let count1 = Arc::clone(&observer1_count);
334 bus.add_observer(move |_| {
335 *count1.lock().unwrap() += 1;
336 });
337
338 let count2 = Arc::clone(&observer2_count);
339 bus.add_observer(move |_| {
340 *count2.lock().unwrap() += 1;
341 });
342
343 bus.publish(Event::new("test", "1"));
344 bus.publish(Event::new("test", "2"));
345
346 assert_eq!(*observer1_count.lock().unwrap(), 2);
348 assert_eq!(*observer2_count.lock().unwrap(), 2);
349 }
350
351 #[test]
352 fn test_clear_observers() {
353 use std::sync::{Arc, Mutex};
354
355 let mut bus = EventBus::new();
356 let count = Arc::new(Mutex::new(0));
357
358 let count_clone = Arc::clone(&count);
359 bus.add_observer(move |_| {
360 *count_clone.lock().unwrap() += 1;
361 });
362
363 bus.publish(Event::new("test", "1"));
364 assert_eq!(*count.lock().unwrap(), 1);
365
366 bus.clear_observers();
367 bus.publish(Event::new("test", "2"));
368 assert_eq!(*count.lock().unwrap(), 1); }
370
371 #[test]
372 fn test_peek_pending_does_not_consume() {
373 let mut bus = EventBus::new();
374
375 let hat = Hat::new("impl", "Implementer").subscribe("*");
376 bus.register(hat);
377
378 bus.publish(Event::new("task.start", "Start"));
379 bus.publish(Event::new("task.continue", "Continue"));
380
381 let hat_id = HatId::new("impl");
382
383 let peeked = bus.peek_pending(&hat_id);
385 assert!(peeked.is_some());
386 assert_eq!(peeked.unwrap().len(), 2);
387
388 let peeked_again = bus.peek_pending(&hat_id);
390 assert!(peeked_again.is_some());
391 assert_eq!(peeked_again.unwrap().len(), 2);
392
393 let taken = bus.take_pending(&hat_id);
395 assert_eq!(taken.len(), 2);
396
397 let peeked_after_take = bus.peek_pending(&hat_id);
399 assert!(peeked_after_take.is_none() || peeked_after_take.unwrap().is_empty());
400 }
401}