event_bus/
lib.rs

1#[macro_use]
2extern crate lazy_static;
3#[macro_use]
4extern crate log;
5
6use std::sync::{Mutex, RwLock};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::collections::HashMap;
9use std::any::{TypeId, Any};
10
11#[macro_export]
12macro_rules! subscribe_event {
13    ($b:expr, $h:expr) => {
14        $crate::subscribe_event($b, $h, 0)
15    };
16    ($b:expr, $h:expr, $p:expr) => {
17        $crate::subscribe_event($b, $h, $p)
18    };
19}
20
21#[macro_export]
22macro_rules! dispatch_event {
23    ($b:expr, $e:expr) => {
24        $crate::dispatch_event($b, $e)
25    };
26}
27
28lazy_static! {
29    static ref EVENT_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
30    static ref EVENT_ID_MAP: Mutex<HashMap<TypeId, usize>> = Mutex::new(HashMap::new());
31
32    // Map (bus name, set of handlers + priority)
33    static ref EVENT_HANDLER_MAP: RwLock<HashMap<
34        String, // bus name
35        RwLock<HashMap<
36            usize, // event id
37            Box<dyn Any + Send + Sync + 'static>, // event handlers
38        >>
39    >> = RwLock::new(HashMap::new());
40}
41
42pub trait Event: 'static {
43    fn cancellable(&self) -> bool {
44        false
45    }
46
47    fn cancelled(&self) -> bool {
48        false
49    }
50
51    fn set_cancelled(&mut self, _cancel: bool) {
52        panic!("Cannot cancel event that is not cancellable!");
53    }
54
55    fn cancel(&mut self) {
56        self.set_cancelled(true);
57    }
58}
59
60pub struct EventBus {
61    name: String,
62}
63
64struct EventHandlers<T: Event>(Vec<(usize, Box<dyn Fn(&mut T) + Send + Sync + 'static>)>);
65
66impl<T: Event> Default for EventHandlers<T> {
67    fn default() -> Self {
68        EventHandlers(vec![])
69    }
70}
71
72impl EventBus {
73    pub fn new<S: Into<String>>(name: S) -> EventBus {
74        let name = name.into();
75        let mut map = EVENT_HANDLER_MAP.write()
76            .expect("Failed to get write guard on handler map");
77
78        // ensure bus doesn't already exist
79        if map.contains_key(&name) {
80            panic!("Event bus named '{}' already exists!", name);
81        }
82
83        // insert bus into handlers map
84        map.entry(name.clone()).or_insert_with(||
85            RwLock::new(HashMap::new())
86        );
87
88        EventBus { name }
89    }
90}
91
92impl Drop for EventBus {
93    fn drop(&mut self) {
94        EVENT_HANDLER_MAP.write()
95            .expect("Failed to get write guard on handler map")
96            .remove(&self.name);
97    }
98}
99
100pub fn dispatch_event<T: Event>(bus: &str, event: &mut T) {
101    let event_id = get_event_id::<T>();
102    let map = EVENT_HANDLER_MAP.read()
103        .expect("Failed to get read guard on handler map");
104
105    if map.contains_key(bus) {
106        let event_id_map = map.get(bus).unwrap()
107            .read().expect("Failed to get read guard on event id map");
108
109        if let Some(handlers) = event_id_map.get(&event_id) {
110            let handlers = handlers.downcast_ref::<EventHandlers<T>>().unwrap();
111            let cancellable = event.cancellable();
112
113            for handler in handlers.0.iter().rev() {
114                handler.1(event);
115
116                if cancellable && event.cancelled() {
117                    break;
118                }
119            }
120        }
121    } else {
122        warn!("Cannot dispatch event on invalid bus: '{}'", bus);
123    }
124}
125
126pub fn subscribe_event<T: Event, H: Fn(&mut T) + Send + Sync + 'static>(bus: &str, handler: H, priority: usize) {
127    let event_id = get_event_id::<T>();
128    let map = EVENT_HANDLER_MAP.read()
129        .expect("Failed to get read guard on handler map");
130
131    if map.contains_key(bus) {
132        let mut event_id_map = map.get(bus).unwrap()
133            .write().expect("Failed to get write guard on event id map");
134
135        let handlers = event_id_map
136            .entry(event_id)
137            .or_insert(Box::new(EventHandlers::<T>::default()))
138            .downcast_mut::<EventHandlers<T>>()
139            .unwrap();
140
141            // et pos = match vec.binary_search_by(|probe| probe.0.cmp(&priority)) { Ok(p) => p, Err(p) => p };
142        let pos = match handlers.0.binary_search_by(|probe| probe.0.cmp(&priority)) {
143            Ok(pos) => pos,
144            Err(pos) => pos,
145        };
146
147        handlers.0.insert(pos, (priority, Box::new(handler)));
148    } else {
149        warn!("Cannot subscribe on invalid bus: '{}'", bus);
150    }
151}
152
153fn get_event_id<T: Event>() -> usize {
154    *EVENT_ID_MAP.lock()
155        .expect("Failed to lock event id map")
156        .entry(TypeId::of::<T>()).or_insert_with(||
157            EVENT_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
158        )
159}