use crate::{Event, Hat, HatId};
use std::collections::HashMap;
/// Boxed observer callback: called with a shared reference to an event.
///
/// The closure must be `Send` and `'static` so it can be stored inside the bus.
type Observer = Box<dyn Fn(&Event) + Send + 'static>;
/// In-memory event bus: keeps registered hats, one queue of undelivered
/// events per hat, a separate queue for `human.*` events, and a list of
/// observers that see every published event.
#[derive(Default)]
pub struct EventBus {
/// Registered hats, keyed by their id.
hats: HashMap<HatId, Hat>,
/// Events queued for each hat that have not been taken yet.
pending: HashMap<HatId, Vec<Event>>,
/// Events whose topic starts with `human.`; these never enter a hat queue.
human_pending: Vec<Event>,
/// Callbacks invoked with every published event, in the order they were added.
observers: Vec<Observer>,
}
impl EventBus {
pub fn new() -> Self {
Self::default()
}
pub fn add_observer<F>(&mut self, observer: F)
where
F: Fn(&Event) + Send + 'static,
{
self.observers.push(Box::new(observer));
}
#[deprecated(since = "2.0.0", note = "Use add_observer instead")]
pub fn set_observer<F>(&mut self, observer: F)
where
F: Fn(&Event) + Send + 'static,
{
self.observers.clear();
self.observers.push(Box::new(observer));
}
pub fn clear_observers(&mut self) {
self.observers.clear();
}
pub fn register(&mut self, hat: Hat) {
let id = hat.id.clone();
self.hats.insert(id.clone(), hat);
self.pending.entry(id).or_default();
}
#[allow(clippy::needless_pass_by_value)] pub fn publish(&mut self, event: Event) -> Vec<HatId> {
for observer in &self.observers {
observer(&event);
}
if event.topic.as_str().starts_with("human.") {
self.human_pending.push(event);
return Vec::new();
}
let mut recipients = Vec::new();
if let Some(ref target) = event.target {
if self.hats.contains_key(target) {
self.pending
.entry(target.clone())
.or_default()
.push(event.clone());
recipients.push(target.clone());
}
return recipients;
}
let mut specific_recipients = Vec::new();
let mut fallback_recipients = Vec::new();
for (id, hat) in &self.hats {
if hat.has_specific_subscription(&event.topic) {
specific_recipients.push(id.clone());
} else if hat.is_subscribed(&event.topic) {
fallback_recipients.push(id.clone());
}
}
let chosen_recipients = if specific_recipients.is_empty() {
fallback_recipients
} else {
specific_recipients
};
for id in chosen_recipients {
self.pending
.entry(id.clone())
.or_default()
.push(event.clone());
recipients.push(id);
}
recipients
}
pub fn take_pending(&mut self, hat_id: &HatId) -> Vec<Event> {
self.pending.remove(hat_id).unwrap_or_default()
}
pub fn take_human_pending(&mut self) -> Vec<Event> {
std::mem::take(&mut self.human_pending)
}
pub fn peek_pending(&self, hat_id: &HatId) -> Option<&Vec<Event>> {
self.pending.get(hat_id)
}
pub fn peek_human_pending(&self) -> &[Event] {
&self.human_pending
}
pub fn has_pending(&self) -> bool {
!self.human_pending.is_empty() || self.pending.values().any(|events| !events.is_empty())
}
pub fn has_human_pending(&self) -> bool {
!self.human_pending.is_empty()
}
pub fn next_hat_with_pending(&self) -> Option<&HatId> {
self.pending
.iter()
.find(|(_, events)| !events.is_empty())
.map(|(id, _)| id)
}
pub fn get_hat(&self, id: &HatId) -> Option<&Hat> {
self.hats.get(id)
}
pub fn hat_ids(&self) -> impl Iterator<Item = &HatId> {
self.hats.keys()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// A hat subscribed to `task.*` is the single recipient of `task.start`.
    #[test]
    fn test_publish_to_subscriber() {
        let mut bus = EventBus::new();
        bus.register(Hat::new("impl", "Implementer").subscribe("task.*"));

        let delivered_to = bus.publish(Event::new("task.start", "Start implementing"));

        assert_eq!(delivered_to.len(), 1);
        assert_eq!(delivered_to[0].as_str(), "impl");
    }

    /// An event nobody subscribes to has no recipients.
    #[test]
    fn test_no_match() {
        let mut bus = EventBus::new();
        bus.register(Hat::new("impl", "Implementer").subscribe("task.*"));

        let delivered_to = bus.publish(Event::new("review.done", "Review complete"));

        assert!(delivered_to.is_empty());
    }

    /// A targeted event goes to its target even though the target does not
    /// subscribe to the topic.
    #[test]
    fn test_direct_target() {
        let mut bus = EventBus::new();
        bus.register(Hat::new("impl", "Implementer").subscribe("task.*"));
        bus.register(Hat::new("reviewer", "Reviewer").subscribe("impl.*"));

        let handoff = Event::new("handoff", "Please review").with_target("reviewer");
        let delivered_to = bus.publish(handoff);

        assert_eq!(delivered_to.len(), 1);
        assert_eq!(delivered_to[0].as_str(), "reviewer");
    }

    /// `take_pending` drains the queue: a second call yields nothing.
    #[test]
    fn test_take_pending() {
        let mut bus = EventBus::new();
        bus.register(Hat::new("impl", "Implementer").subscribe("*"));
        bus.publish(Event::new("task.start", "Start"));
        bus.publish(Event::new("task.continue", "Continue"));

        let id = HatId::new("impl");
        let drained = bus.take_pending(&id);
        assert_eq!(drained.len(), 2);

        let drained_again = bus.take_pending(&id);
        assert!(drained_again.is_empty());
    }

    /// `human.*` events land in the human queue and never in a hat queue.
    #[test]
    fn test_human_events_use_separate_queue() {
        let mut bus = EventBus::new();
        bus.register(Hat::new("ralph", "Ralph").subscribe("*"));

        bus.publish(Event::new("human.interact", "question"));
        bus.publish(Event::new("human.response", "hello"));
        bus.publish(Event::new("human.guidance", "note"));

        assert_eq!(bus.peek_human_pending().len(), 3);

        let ralph_queue_len = bus
            .peek_pending(&HatId::new("ralph"))
            .map_or(0, |queue| queue.len());
        assert_eq!(ralph_queue_len, 0);

        let drained = bus.take_human_pending();
        assert_eq!(drained.len(), 3);
        assert!(!bus.has_human_pending());
    }

    /// A hat may receive an event whose source is that same hat.
    #[test]
    fn test_self_routing_allowed() {
        let mut bus = EventBus::new();
        bus.register(Hat::new("planner", "Planner").subscribe("build.done"));

        let own_event = Event::new("build.done", "Done").with_source("planner");
        let delivered_to = bus.publish(own_event);

        assert_eq!(delivered_to.len(), 1);
        assert_eq!(delivered_to[0].as_str(), "planner");
    }

    /// Observers see every published event, including ones no hat receives.
    #[test]
    fn test_observer_receives_all_events() {
        let mut bus = EventBus::new();
        let payloads: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));

        let sink = Arc::clone(&payloads);
        bus.add_observer(move |event| {
            sink.lock().unwrap().push(event.payload.clone());
        });

        bus.register(Hat::new("impl", "Implementer").subscribe("task.*"));

        bus.publish(Event::new("task.start", "Start"));
        bus.publish(Event::new("other.event", "Other"));
        bus.publish(Event::new("task.done", "Done"));

        let seen = payloads.lock().unwrap();
        assert_eq!(seen.len(), 3);
        assert_eq!(seen[0], "Start");
        assert_eq!(seen[1], "Other");
        assert_eq!(seen[2], "Done");
    }

    /// Every registered observer is called once per published event.
    #[test]
    fn test_multiple_observers() {
        let mut bus = EventBus::new();
        let first_hits = Arc::new(Mutex::new(0));
        let second_hits = Arc::new(Mutex::new(0));

        let first = Arc::clone(&first_hits);
        bus.add_observer(move |_| {
            *first.lock().unwrap() += 1;
        });

        let second = Arc::clone(&second_hits);
        bus.add_observer(move |_| {
            *second.lock().unwrap() += 1;
        });

        bus.publish(Event::new("test", "1"));
        bus.publish(Event::new("test", "2"));

        assert_eq!(*first_hits.lock().unwrap(), 2);
        assert_eq!(*second_hits.lock().unwrap(), 2);
    }

    /// After `clear_observers`, published events no longer reach the observer.
    #[test]
    fn test_clear_observers() {
        let mut bus = EventBus::new();
        let hits = Arc::new(Mutex::new(0));

        let counter = Arc::clone(&hits);
        bus.add_observer(move |_| {
            *counter.lock().unwrap() += 1;
        });

        bus.publish(Event::new("test", "1"));
        assert_eq!(*hits.lock().unwrap(), 1);

        bus.clear_observers();
        bus.publish(Event::new("test", "2"));

        // The count is unchanged: the second event was not observed.
        assert_eq!(*hits.lock().unwrap(), 1);
    }

    /// `peek_pending` leaves the queue intact; only `take_pending` empties it.
    #[test]
    fn test_peek_pending_does_not_consume() {
        let mut bus = EventBus::new();
        bus.register(Hat::new("impl", "Implementer").subscribe("*"));
        bus.publish(Event::new("task.start", "Start"));
        bus.publish(Event::new("task.continue", "Continue"));

        let id = HatId::new("impl");

        let first_look = bus.peek_pending(&id);
        assert!(first_look.is_some());
        assert_eq!(first_look.unwrap().len(), 2);

        let second_look = bus.peek_pending(&id);
        assert!(second_look.is_some());
        assert_eq!(second_look.unwrap().len(), 2);

        let drained = bus.take_pending(&id);
        assert_eq!(drained.len(), 2);

        // Either the queue entry is gone or it is present but empty.
        let after_take = bus.peek_pending(&id);
        assert!(after_take.map_or(true, |queue| queue.is_empty()));
    }
}