ralph_proto/
event_bus.rs

1//! Event bus for pub/sub messaging.
2//!
3//! The event bus routes events to subscribed hats based on topic patterns.
4//! Multiple observers can be added to receive all published events for
5//! recording, TUI updates, and benchmarking purposes.
6
7use crate::{Event, Hat, HatId};
8use std::collections::HashMap;
9
10/// Type alias for the observer callback function.
11type Observer = Box<dyn Fn(&Event) + Send + 'static>;
12
13/// Central pub/sub hub for routing events between hats.
14#[derive(Default)]
15pub struct EventBus {
16    /// Registered hats indexed by ID.
17    hats: HashMap<HatId, Hat>,
18
19    /// Pending events for each hat.
20    pending: HashMap<HatId, Vec<Event>>,
21
22    /// Observers that receive all published events.
23    /// Multiple observers can be registered (e.g., session recorder + TUI).
24    observers: Vec<Observer>,
25}
26
27impl EventBus {
28    /// Creates a new empty event bus.
29    pub fn new() -> Self {
30        Self::default()
31    }
32
33    /// Adds an observer that receives all published events.
34    ///
35    /// Multiple observers can be added (e.g., session recorder + TUI).
36    /// Each observer is called before events are routed to subscribers.
37    /// This enables recording sessions by subscribing to the event stream
38    /// without modifying the routing logic.
39    pub fn add_observer<F>(&mut self, observer: F)
40    where
41        F: Fn(&Event) + Send + 'static,
42    {
43        self.observers.push(Box::new(observer));
44    }
45
46    /// Sets a single observer, clearing any existing observers.
47    ///
48    /// Prefer `add_observer` when multiple observers are needed.
49    /// This method is kept for backwards compatibility.
50    #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
51    pub fn set_observer<F>(&mut self, observer: F)
52    where
53        F: Fn(&Event) + Send + 'static,
54    {
55        self.observers.clear();
56        self.observers.push(Box::new(observer));
57    }
58
59    /// Clears all observer callbacks.
60    pub fn clear_observers(&mut self) {
61        self.observers.clear();
62    }
63
64    /// Registers a hat with the event bus.
65    pub fn register(&mut self, hat: Hat) {
66        let id = hat.id.clone();
67        self.hats.insert(id.clone(), hat);
68        self.pending.entry(id).or_default();
69    }
70
71    /// Publishes an event to all subscribed hats.
72    ///
73    /// Returns the list of hat IDs that received the event.
74    /// If an observer is set, it receives the event before routing.
75    #[allow(clippy::needless_pass_by_value)] // Event is cloned to multiple recipients
76    pub fn publish(&mut self, event: Event) -> Vec<HatId> {
77        // Notify all observers before routing
78        for observer in &self.observers {
79            observer(&event);
80        }
81
82        let mut recipients = Vec::new();
83
84        // If there's a direct target, route only to that hat
85        if let Some(ref target) = event.target {
86            if self.hats.contains_key(target) {
87                self.pending
88                    .entry(target.clone())
89                    .or_default()
90                    .push(event.clone());
91                recipients.push(target.clone());
92            }
93            return recipients;
94        }
95
96        // Route with priority: specific subscriptions > fallback wildcards
97        // Per spec: "If event has subscriber → Select that hat's backend"
98        //           "If no subscriber → Select Ralph's backend (cli.backend)"
99
100        // First, find hats with specific (non-global-wildcard) subscriptions
101        let mut specific_recipients = Vec::new();
102        let mut fallback_recipients = Vec::new();
103
104        for (id, hat) in &self.hats {
105            if hat.has_specific_subscription(&event.topic) {
106                // Hat has a specific subscription for this topic
107                specific_recipients.push(id.clone());
108            } else if hat.is_subscribed(&event.topic) {
109                // Hat matches only via global wildcard (fallback)
110                fallback_recipients.push(id.clone());
111            }
112        }
113
114        // Use specific subscribers if any, otherwise fall back to wildcard handlers
115        let chosen_recipients = if specific_recipients.is_empty() {
116            fallback_recipients
117        } else {
118            specific_recipients
119        };
120
121        for id in chosen_recipients {
122            self.pending
123                .entry(id.clone())
124                .or_default()
125                .push(event.clone());
126            recipients.push(id);
127        }
128
129        recipients
130    }
131
132    /// Takes all pending events for a hat.
133    pub fn take_pending(&mut self, hat_id: &HatId) -> Vec<Event> {
134        self.pending.remove(hat_id).unwrap_or_default()
135    }
136
137    /// Returns a reference to pending events for a hat without consuming them.
138    pub fn peek_pending(&self, hat_id: &HatId) -> Option<&Vec<Event>> {
139        self.pending.get(hat_id)
140    }
141
142    /// Checks if there are any pending events for any hat.
143    pub fn has_pending(&self) -> bool {
144        self.pending.values().any(|events| !events.is_empty())
145    }
146
147    /// Returns the next hat with pending events.
148    pub fn next_hat_with_pending(&self) -> Option<&HatId> {
149        self.pending
150            .iter()
151            .find(|(_, events)| !events.is_empty())
152            .map(|(id, _)| id)
153    }
154
155    /// Gets a hat by ID.
156    pub fn get_hat(&self, id: &HatId) -> Option<&Hat> {
157        self.hats.get(id)
158    }
159
160    /// Returns all registered hat IDs.
161    pub fn hat_ids(&self) -> impl Iterator<Item = &HatId> {
162        self.hats.keys()
163    }
164}
165
166#[cfg(test)]
167mod tests {
168    use super::*;
169
170    #[test]
171    fn test_publish_to_subscriber() {
172        let mut bus = EventBus::new();
173
174        let hat = Hat::new("impl", "Implementer").subscribe("task.*");
175        bus.register(hat);
176
177        let event = Event::new("task.start", "Start implementing");
178        let recipients = bus.publish(event);
179
180        assert_eq!(recipients.len(), 1);
181        assert_eq!(recipients[0].as_str(), "impl");
182    }
183
184    #[test]
185    fn test_no_match() {
186        let mut bus = EventBus::new();
187
188        let hat = Hat::new("impl", "Implementer").subscribe("task.*");
189        bus.register(hat);
190
191        let event = Event::new("review.done", "Review complete");
192        let recipients = bus.publish(event);
193
194        assert!(recipients.is_empty());
195    }
196
197    #[test]
198    fn test_direct_target() {
199        let mut bus = EventBus::new();
200
201        let impl_hat = Hat::new("impl", "Implementer").subscribe("task.*");
202        let review_hat = Hat::new("reviewer", "Reviewer").subscribe("impl.*");
203        bus.register(impl_hat);
204        bus.register(review_hat);
205
206        // Direct target bypasses subscription matching
207        let event = Event::new("handoff", "Please review").with_target("reviewer");
208        let recipients = bus.publish(event);
209
210        assert_eq!(recipients.len(), 1);
211        assert_eq!(recipients[0].as_str(), "reviewer");
212    }
213
214    #[test]
215    fn test_take_pending() {
216        let mut bus = EventBus::new();
217
218        let hat = Hat::new("impl", "Implementer").subscribe("*");
219        bus.register(hat);
220
221        bus.publish(Event::new("task.start", "Start"));
222        bus.publish(Event::new("task.continue", "Continue"));
223
224        let hat_id = HatId::new("impl");
225        let events = bus.take_pending(&hat_id);
226
227        assert_eq!(events.len(), 2);
228        assert!(bus.take_pending(&hat_id).is_empty());
229    }
230
231    #[test]
232    fn test_self_routing_allowed() {
233        // Self-routing is allowed to handle LLM non-determinism.
234        // Spec acceptance criteria: planner emits build.done (even though builder "should"),
235        // event routes back to planner, planner continues (no source-based blocking).
236        let mut bus = EventBus::new();
237
238        let planner = Hat::new("planner", "Planner").subscribe("build.done");
239        bus.register(planner);
240
241        // Planner emits build.done (wrong hat, but LLMs are non-deterministic)
242        let event = Event::new("build.done", "Done").with_source("planner");
243        let recipients = bus.publish(event);
244
245        // Event SHOULD route back to planner (self-routing allowed, no source filtering)
246        assert_eq!(recipients.len(), 1);
247        assert_eq!(recipients[0].as_str(), "planner");
248    }
249
250    #[test]
251    fn test_observer_receives_all_events() {
252        use std::sync::{Arc, Mutex};
253
254        let mut bus = EventBus::new();
255        let observed: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
256
257        let observed_clone = Arc::clone(&observed);
258        bus.add_observer(move |event| {
259            observed_clone.lock().unwrap().push(event.payload.clone());
260        });
261
262        let hat = Hat::new("impl", "Implementer").subscribe("task.*");
263        bus.register(hat);
264
265        // Publish events - observer should see all regardless of routing
266        bus.publish(Event::new("task.start", "Start"));
267        bus.publish(Event::new("other.event", "Other")); // No subscriber
268        bus.publish(Event::new("task.done", "Done"));
269
270        let captured = observed.lock().unwrap();
271        assert_eq!(captured.len(), 3);
272        assert_eq!(captured[0], "Start");
273        assert_eq!(captured[1], "Other");
274        assert_eq!(captured[2], "Done");
275    }
276
277    #[test]
278    fn test_multiple_observers() {
279        use std::sync::{Arc, Mutex};
280
281        let mut bus = EventBus::new();
282        let observer1_count = Arc::new(Mutex::new(0));
283        let observer2_count = Arc::new(Mutex::new(0));
284
285        let count1 = Arc::clone(&observer1_count);
286        bus.add_observer(move |_| {
287            *count1.lock().unwrap() += 1;
288        });
289
290        let count2 = Arc::clone(&observer2_count);
291        bus.add_observer(move |_| {
292            *count2.lock().unwrap() += 1;
293        });
294
295        bus.publish(Event::new("test", "1"));
296        bus.publish(Event::new("test", "2"));
297
298        // Both observers should have received both events
299        assert_eq!(*observer1_count.lock().unwrap(), 2);
300        assert_eq!(*observer2_count.lock().unwrap(), 2);
301    }
302
303    #[test]
304    fn test_clear_observers() {
305        use std::sync::{Arc, Mutex};
306
307        let mut bus = EventBus::new();
308        let count = Arc::new(Mutex::new(0));
309
310        let count_clone = Arc::clone(&count);
311        bus.add_observer(move |_| {
312            *count_clone.lock().unwrap() += 1;
313        });
314
315        bus.publish(Event::new("test", "1"));
316        assert_eq!(*count.lock().unwrap(), 1);
317
318        bus.clear_observers();
319        bus.publish(Event::new("test", "2"));
320        assert_eq!(*count.lock().unwrap(), 1); // Still 1, observers cleared
321    }
322
323    #[test]
324    fn test_peek_pending_does_not_consume() {
325        let mut bus = EventBus::new();
326
327        let hat = Hat::new("impl", "Implementer").subscribe("*");
328        bus.register(hat);
329
330        bus.publish(Event::new("task.start", "Start"));
331        bus.publish(Event::new("task.continue", "Continue"));
332
333        let hat_id = HatId::new("impl");
334
335        // Peek at pending events
336        let peeked = bus.peek_pending(&hat_id);
337        assert!(peeked.is_some());
338        assert_eq!(peeked.unwrap().len(), 2);
339
340        // Peek again - should still be there
341        let peeked_again = bus.peek_pending(&hat_id);
342        assert!(peeked_again.is_some());
343        assert_eq!(peeked_again.unwrap().len(), 2);
344
345        // Now take them - should consume
346        let taken = bus.take_pending(&hat_id);
347        assert_eq!(taken.len(), 2);
348
349        // Peek after take - should be empty
350        let peeked_after_take = bus.peek_pending(&hat_id);
351        assert!(peeked_after_take.is_none() || peeked_after_take.unwrap().is_empty());
352    }
353}