Skip to main content

ralph_proto/
event_bus.rs

1//! Event bus for pub/sub messaging.
2//!
3//! The event bus routes events to subscribed hats based on topic patterns.
4//! Multiple observers can be added to receive all published events for
5//! recording, TUI updates, and benchmarking purposes.
6
7use crate::{Event, Hat, HatId};
8use std::collections::BTreeMap;
9
10/// Type alias for the observer callback function.
11type Observer = Box<dyn Fn(&Event) + Send + 'static>;
12
13/// Central pub/sub hub for routing events between hats.
14#[derive(Default)]
15pub struct EventBus {
16    /// Registered hats indexed by ID.
17    hats: BTreeMap<HatId, Hat>,
18
19    /// Pending events for each hat.
20    pending: BTreeMap<HatId, Vec<Event>>,
21
22    /// Pending human interaction events (human.*).
23    human_pending: Vec<Event>,
24
25    /// Observers that receive all published events.
26    /// Multiple observers can be registered (e.g., session recorder + TUI).
27    observers: Vec<Observer>,
28}
29
30impl EventBus {
31    /// Creates a new empty event bus.
32    pub fn new() -> Self {
33        Self::default()
34    }
35
36    /// Adds an observer that receives all published events.
37    ///
38    /// Multiple observers can be added (e.g., session recorder + TUI).
39    /// Each observer is called before events are routed to subscribers.
40    /// This enables recording sessions by subscribing to the event stream
41    /// without modifying the routing logic.
42    pub fn add_observer<F>(&mut self, observer: F)
43    where
44        F: Fn(&Event) + Send + 'static,
45    {
46        self.observers.push(Box::new(observer));
47    }
48
49    /// Sets a single observer, clearing any existing observers.
50    ///
51    /// Prefer `add_observer` when multiple observers are needed.
52    /// This method is kept for backwards compatibility.
53    #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
54    pub fn set_observer<F>(&mut self, observer: F)
55    where
56        F: Fn(&Event) + Send + 'static,
57    {
58        self.observers.clear();
59        self.observers.push(Box::new(observer));
60    }
61
62    /// Clears all observer callbacks.
63    pub fn clear_observers(&mut self) {
64        self.observers.clear();
65    }
66
67    /// Registers a hat with the event bus.
68    pub fn register(&mut self, hat: Hat) {
69        let id = hat.id.clone();
70        self.hats.insert(id.clone(), hat);
71        self.pending.entry(id).or_default();
72    }
73
74    /// Publishes an event to all subscribed hats.
75    ///
76    /// Returns the list of hat IDs that received the event.
77    /// If an observer is set, it receives the event before routing.
78    #[allow(clippy::needless_pass_by_value)] // Event is cloned to multiple recipients
79    pub fn publish(&mut self, event: Event) -> Vec<HatId> {
80        // Notify all observers before routing
81        for observer in &self.observers {
82            observer(&event);
83        }
84
85        if event.topic.as_str().starts_with("human.") {
86            self.human_pending.push(event);
87            return Vec::new();
88        }
89
90        let mut recipients = Vec::new();
91
92        // If there's a direct target, route only to that hat
93        if let Some(ref target) = event.target {
94            if self.hats.contains_key(target) {
95                self.pending
96                    .entry(target.clone())
97                    .or_default()
98                    .push(event.clone());
99                recipients.push(target.clone());
100            }
101            return recipients;
102        }
103
104        // Route with priority: specific subscriptions > fallback wildcards
105        // Per spec: "If event has subscriber → Select that hat's backend"
106        //           "If no subscriber → Select Ralph's backend (cli.backend)"
107
108        // First, find hats with specific (non-global-wildcard) subscriptions
109        let mut specific_recipients = Vec::new();
110        let mut fallback_recipients = Vec::new();
111
112        for (id, hat) in &self.hats {
113            if hat.has_specific_subscription(&event.topic) {
114                // Hat has a specific subscription for this topic
115                specific_recipients.push(id.clone());
116            } else if hat.is_subscribed(&event.topic) {
117                // Hat matches only via global wildcard (fallback)
118                fallback_recipients.push(id.clone());
119            }
120        }
121
122        // Use specific subscribers if any, otherwise fall back to wildcard handlers
123        let chosen_recipients = if specific_recipients.is_empty() {
124            fallback_recipients
125        } else {
126            specific_recipients
127        };
128
129        for id in chosen_recipients {
130            self.pending
131                .entry(id.clone())
132                .or_default()
133                .push(event.clone());
134            recipients.push(id);
135        }
136
137        recipients
138    }
139
140    /// Takes all pending events for a hat.
141    pub fn take_pending(&mut self, hat_id: &HatId) -> Vec<Event> {
142        self.pending.remove(hat_id).unwrap_or_default()
143    }
144
145    /// Takes all pending human interaction events.
146    pub fn take_human_pending(&mut self) -> Vec<Event> {
147        std::mem::take(&mut self.human_pending)
148    }
149
150    /// Returns a reference to pending events for a hat without consuming them.
151    pub fn peek_pending(&self, hat_id: &HatId) -> Option<&Vec<Event>> {
152        self.pending.get(hat_id)
153    }
154
155    /// Returns a reference to pending human interaction events without consuming them.
156    pub fn peek_human_pending(&self) -> &[Event] {
157        &self.human_pending
158    }
159
160    /// Checks if there are any pending events for any hat.
161    pub fn has_pending(&self) -> bool {
162        !self.human_pending.is_empty() || self.pending.values().any(|events| !events.is_empty())
163    }
164
165    /// Checks if there are any pending human interaction events.
166    pub fn has_human_pending(&self) -> bool {
167        !self.human_pending.is_empty()
168    }
169
170    /// Returns the next hat with pending events.
171    /// BTreeMap iteration is already sorted by key.
172    pub fn next_hat_with_pending(&self) -> Option<&HatId> {
173        self.pending
174            .iter()
175            .find(|(_, events)| !events.is_empty())
176            .map(|(id, _)| id)
177    }
178
179    /// Gets a hat by ID.
180    pub fn get_hat(&self, id: &HatId) -> Option<&Hat> {
181        self.hats.get(id)
182    }
183
184    /// Returns all registered hat IDs.
185    pub fn hat_ids(&self) -> impl Iterator<Item = &HatId> {
186        self.hats.keys()
187    }
188}
189
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Builds a bus with the given hats registered in order.
    fn bus_with_hats(hats: Vec<Hat>) -> EventBus {
        let mut bus = EventBus::new();
        for hat in hats {
            bus.register(hat);
        }
        bus
    }

    /// Asserts that exactly one hat received the event and that it is `expected`.
    fn assert_sole_recipient(recipients: &[HatId], expected: &str) {
        assert_eq!(recipients.len(), 1);
        assert_eq!(recipients[0].as_str(), expected);
    }

    #[test]
    fn test_publish_to_subscriber() {
        let mut bus = bus_with_hats(vec![Hat::new("impl", "Implementer").subscribe("task.*")]);

        let delivered = bus.publish(Event::new("task.start", "Start implementing"));

        assert_sole_recipient(&delivered, "impl");
    }

    #[test]
    fn test_no_match() {
        let mut bus = bus_with_hats(vec![Hat::new("impl", "Implementer").subscribe("task.*")]);

        let delivered = bus.publish(Event::new("review.done", "Review complete"));

        assert_eq!(delivered.len(), 0);
    }

    #[test]
    fn test_direct_target() {
        let mut bus = bus_with_hats(vec![
            Hat::new("impl", "Implementer").subscribe("task.*"),
            Hat::new("reviewer", "Reviewer").subscribe("impl.*"),
        ]);

        // A targeted event ignores subscriptions: nobody subscribes to "handoff".
        let handoff = Event::new("handoff", "Please review").with_target("reviewer");
        let delivered = bus.publish(handoff);

        assert_sole_recipient(&delivered, "reviewer");
    }

    #[test]
    fn test_take_pending() {
        let mut bus = bus_with_hats(vec![Hat::new("impl", "Implementer").subscribe("*")]);
        let impl_id = HatId::new("impl");

        bus.publish(Event::new("task.start", "Start"));
        bus.publish(Event::new("task.continue", "Continue"));

        // First take drains both events; a second take finds nothing.
        assert_eq!(bus.take_pending(&impl_id).len(), 2);
        assert_eq!(bus.take_pending(&impl_id).len(), 0);
    }

    #[test]
    fn test_human_events_use_separate_queue() {
        let mut bus = bus_with_hats(vec![Hat::new("ralph", "Ralph").subscribe("*")]);

        bus.publish(Event::new("human.interact", "question"));
        bus.publish(Event::new("human.response", "hello"));
        bus.publish(Event::new("human.guidance", "note"));

        // All three land in the human queue, none in Ralph's own queue.
        assert_eq!(bus.peek_human_pending().len(), 3);
        let ralph_queue_len = bus
            .peek_pending(&HatId::new("ralph"))
            .map_or(0, Vec::len);
        assert_eq!(ralph_queue_len, 0);

        assert_eq!(bus.take_human_pending().len(), 3);
        assert!(!bus.has_human_pending());
    }

    #[test]
    fn test_self_routing_allowed() {
        // Self-routing is allowed to handle LLM non-determinism.
        // Spec acceptance criteria: planner emits build.done (even though builder "should"),
        // event routes back to planner, planner continues (no source-based blocking).
        let mut bus = bus_with_hats(vec![Hat::new("planner", "Planner").subscribe("build.done")]);

        // The planner is both the source and the only subscriber.
        let from_planner = Event::new("build.done", "Done").with_source("planner");
        let delivered = bus.publish(from_planner);

        // No source filtering: the event comes straight back to the planner.
        assert_sole_recipient(&delivered, "planner");
    }

    #[test]
    fn test_observer_receives_all_events() {
        let mut bus = EventBus::new();
        let log = Arc::new(Mutex::new(Vec::<String>::new()));

        let sink = Arc::clone(&log);
        bus.add_observer(move |event| {
            sink.lock().unwrap().push(event.payload.clone());
        });

        bus.register(Hat::new("impl", "Implementer").subscribe("task.*"));

        // The middle event has no subscriber; the observer must still see it.
        bus.publish(Event::new("task.start", "Start"));
        bus.publish(Event::new("other.event", "Other"));
        bus.publish(Event::new("task.done", "Done"));

        let seen = log.lock().unwrap();
        assert_eq!(*seen, ["Start", "Other", "Done"]);
    }

    #[test]
    fn test_multiple_observers() {
        let mut bus = EventBus::new();
        let first_hits = Arc::new(Mutex::new(0));
        let second_hits = Arc::new(Mutex::new(0));

        for hits in [&first_hits, &second_hits] {
            let hits = Arc::clone(hits);
            bus.add_observer(move |_| {
                *hits.lock().unwrap() += 1;
            });
        }

        bus.publish(Event::new("test", "1"));
        bus.publish(Event::new("test", "2"));

        // Each observer is called once per published event.
        assert_eq!(*first_hits.lock().unwrap(), 2);
        assert_eq!(*second_hits.lock().unwrap(), 2);
    }

    #[test]
    fn test_clear_observers() {
        let mut bus = EventBus::new();
        let hits = Arc::new(Mutex::new(0));

        let hits_in_observer = Arc::clone(&hits);
        bus.add_observer(move |_| {
            *hits_in_observer.lock().unwrap() += 1;
        });

        bus.publish(Event::new("test", "1"));
        assert_eq!(*hits.lock().unwrap(), 1);

        // Once observers are cleared, further publishes leave the count alone.
        bus.clear_observers();
        bus.publish(Event::new("test", "2"));
        assert_eq!(*hits.lock().unwrap(), 1);
    }

    #[test]
    fn test_peek_pending_does_not_consume() {
        let mut bus = bus_with_hats(vec![Hat::new("impl", "Implementer").subscribe("*")]);
        let impl_id = HatId::new("impl");

        bus.publish(Event::new("task.start", "Start"));
        bus.publish(Event::new("task.continue", "Continue"));

        // Peeking twice in a row reports the same two events each time.
        for _ in 0..2 {
            assert_eq!(bus.peek_pending(&impl_id).map(Vec::len), Some(2));
        }

        // Taking consumes them...
        assert_eq!(bus.take_pending(&impl_id).len(), 2);

        // ...so a later peek finds either no queue or an empty one.
        assert!(bus.peek_pending(&impl_id).map_or(true, Vec::is_empty));
    }
}
401}