1use crate::{Event, Hat, HatId};
8use std::collections::HashMap;
9
10type Observer = Box<dyn Fn(&Event) + Send + 'static>;
12
13#[derive(Default)]
15pub struct EventBus {
16 hats: HashMap<HatId, Hat>,
18
19 pending: HashMap<HatId, Vec<Event>>,
21
22 observers: Vec<Observer>,
25}
26
27impl EventBus {
28 pub fn new() -> Self {
30 Self::default()
31 }
32
33 pub fn add_observer<F>(&mut self, observer: F)
40 where
41 F: Fn(&Event) + Send + 'static,
42 {
43 self.observers.push(Box::new(observer));
44 }
45
46 #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
51 pub fn set_observer<F>(&mut self, observer: F)
52 where
53 F: Fn(&Event) + Send + 'static,
54 {
55 self.observers.clear();
56 self.observers.push(Box::new(observer));
57 }
58
59 pub fn clear_observers(&mut self) {
61 self.observers.clear();
62 }
63
64 pub fn register(&mut self, hat: Hat) {
66 let id = hat.id.clone();
67 self.hats.insert(id.clone(), hat);
68 self.pending.entry(id).or_default();
69 }
70
71 #[allow(clippy::needless_pass_by_value)] pub fn publish(&mut self, event: Event) -> Vec<HatId> {
77 for observer in &self.observers {
79 observer(&event);
80 }
81
82 let mut recipients = Vec::new();
83
84 if let Some(ref target) = event.target {
86 if self.hats.contains_key(target) {
87 self.pending
88 .entry(target.clone())
89 .or_default()
90 .push(event.clone());
91 recipients.push(target.clone());
92 }
93 return recipients;
94 }
95
96 let mut specific_recipients = Vec::new();
102 let mut fallback_recipients = Vec::new();
103
104 for (id, hat) in &self.hats {
105 if hat.has_specific_subscription(&event.topic) {
106 specific_recipients.push(id.clone());
108 } else if hat.is_subscribed(&event.topic) {
109 fallback_recipients.push(id.clone());
111 }
112 }
113
114 let chosen_recipients = if specific_recipients.is_empty() {
116 fallback_recipients
117 } else {
118 specific_recipients
119 };
120
121 for id in chosen_recipients {
122 self.pending
123 .entry(id.clone())
124 .or_default()
125 .push(event.clone());
126 recipients.push(id);
127 }
128
129 recipients
130 }
131
132 pub fn take_pending(&mut self, hat_id: &HatId) -> Vec<Event> {
134 self.pending.remove(hat_id).unwrap_or_default()
135 }
136
137 pub fn peek_pending(&self, hat_id: &HatId) -> Option<&Vec<Event>> {
139 self.pending.get(hat_id)
140 }
141
142 pub fn has_pending(&self) -> bool {
144 self.pending.values().any(|events| !events.is_empty())
145 }
146
147 pub fn next_hat_with_pending(&self) -> Option<&HatId> {
149 self.pending
150 .iter()
151 .find(|(_, events)| !events.is_empty())
152 .map(|(id, _)| id)
153 }
154
155 pub fn get_hat(&self, id: &HatId) -> Option<&Hat> {
157 self.hats.get(id)
158 }
159
160 pub fn hat_ids(&self) -> impl Iterator<Item = &HatId> {
162 self.hats.keys()
163 }
164}
165
166#[cfg(test)]
167mod tests {
168 use super::*;
169
170 #[test]
171 fn test_publish_to_subscriber() {
172 let mut bus = EventBus::new();
173
174 let hat = Hat::new("impl", "Implementer").subscribe("task.*");
175 bus.register(hat);
176
177 let event = Event::new("task.start", "Start implementing");
178 let recipients = bus.publish(event);
179
180 assert_eq!(recipients.len(), 1);
181 assert_eq!(recipients[0].as_str(), "impl");
182 }
183
184 #[test]
185 fn test_no_match() {
186 let mut bus = EventBus::new();
187
188 let hat = Hat::new("impl", "Implementer").subscribe("task.*");
189 bus.register(hat);
190
191 let event = Event::new("review.done", "Review complete");
192 let recipients = bus.publish(event);
193
194 assert!(recipients.is_empty());
195 }
196
197 #[test]
198 fn test_direct_target() {
199 let mut bus = EventBus::new();
200
201 let impl_hat = Hat::new("impl", "Implementer").subscribe("task.*");
202 let review_hat = Hat::new("reviewer", "Reviewer").subscribe("impl.*");
203 bus.register(impl_hat);
204 bus.register(review_hat);
205
206 let event = Event::new("handoff", "Please review").with_target("reviewer");
208 let recipients = bus.publish(event);
209
210 assert_eq!(recipients.len(), 1);
211 assert_eq!(recipients[0].as_str(), "reviewer");
212 }
213
214 #[test]
215 fn test_take_pending() {
216 let mut bus = EventBus::new();
217
218 let hat = Hat::new("impl", "Implementer").subscribe("*");
219 bus.register(hat);
220
221 bus.publish(Event::new("task.start", "Start"));
222 bus.publish(Event::new("task.continue", "Continue"));
223
224 let hat_id = HatId::new("impl");
225 let events = bus.take_pending(&hat_id);
226
227 assert_eq!(events.len(), 2);
228 assert!(bus.take_pending(&hat_id).is_empty());
229 }
230
231 #[test]
232 fn test_self_routing_allowed() {
233 let mut bus = EventBus::new();
237
238 let planner = Hat::new("planner", "Planner").subscribe("build.done");
239 bus.register(planner);
240
241 let event = Event::new("build.done", "Done").with_source("planner");
243 let recipients = bus.publish(event);
244
245 assert_eq!(recipients.len(), 1);
247 assert_eq!(recipients[0].as_str(), "planner");
248 }
249
250 #[test]
251 fn test_observer_receives_all_events() {
252 use std::sync::{Arc, Mutex};
253
254 let mut bus = EventBus::new();
255 let observed: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
256
257 let observed_clone = Arc::clone(&observed);
258 bus.add_observer(move |event| {
259 observed_clone.lock().unwrap().push(event.payload.clone());
260 });
261
262 let hat = Hat::new("impl", "Implementer").subscribe("task.*");
263 bus.register(hat);
264
265 bus.publish(Event::new("task.start", "Start"));
267 bus.publish(Event::new("other.event", "Other")); bus.publish(Event::new("task.done", "Done"));
269
270 let captured = observed.lock().unwrap();
271 assert_eq!(captured.len(), 3);
272 assert_eq!(captured[0], "Start");
273 assert_eq!(captured[1], "Other");
274 assert_eq!(captured[2], "Done");
275 }
276
277 #[test]
278 fn test_multiple_observers() {
279 use std::sync::{Arc, Mutex};
280
281 let mut bus = EventBus::new();
282 let observer1_count = Arc::new(Mutex::new(0));
283 let observer2_count = Arc::new(Mutex::new(0));
284
285 let count1 = Arc::clone(&observer1_count);
286 bus.add_observer(move |_| {
287 *count1.lock().unwrap() += 1;
288 });
289
290 let count2 = Arc::clone(&observer2_count);
291 bus.add_observer(move |_| {
292 *count2.lock().unwrap() += 1;
293 });
294
295 bus.publish(Event::new("test", "1"));
296 bus.publish(Event::new("test", "2"));
297
298 assert_eq!(*observer1_count.lock().unwrap(), 2);
300 assert_eq!(*observer2_count.lock().unwrap(), 2);
301 }
302
303 #[test]
304 fn test_clear_observers() {
305 use std::sync::{Arc, Mutex};
306
307 let mut bus = EventBus::new();
308 let count = Arc::new(Mutex::new(0));
309
310 let count_clone = Arc::clone(&count);
311 bus.add_observer(move |_| {
312 *count_clone.lock().unwrap() += 1;
313 });
314
315 bus.publish(Event::new("test", "1"));
316 assert_eq!(*count.lock().unwrap(), 1);
317
318 bus.clear_observers();
319 bus.publish(Event::new("test", "2"));
320 assert_eq!(*count.lock().unwrap(), 1); }
322
323 #[test]
324 fn test_peek_pending_does_not_consume() {
325 let mut bus = EventBus::new();
326
327 let hat = Hat::new("impl", "Implementer").subscribe("*");
328 bus.register(hat);
329
330 bus.publish(Event::new("task.start", "Start"));
331 bus.publish(Event::new("task.continue", "Continue"));
332
333 let hat_id = HatId::new("impl");
334
335 let peeked = bus.peek_pending(&hat_id);
337 assert!(peeked.is_some());
338 assert_eq!(peeked.unwrap().len(), 2);
339
340 let peeked_again = bus.peek_pending(&hat_id);
342 assert!(peeked_again.is_some());
343 assert_eq!(peeked_again.unwrap().len(), 2);
344
345 let taken = bus.take_pending(&hat_id);
347 assert_eq!(taken.len(), 2);
348
349 let peeked_after_take = bus.peek_pending(&hat_id);
351 assert!(peeked_after_take.is_none() || peeked_after_take.unwrap().is_empty());
352 }
353}