use std::{hash::Hash, sync::Arc};
use crate::prelude::{BusRef, Error, EventEmitter};
pub struct EventBus<E, V> {
bus: Arc<BusRef<E, V>>,
}
impl<E, V> EventBus<E, V> {
pub fn unbound() -> Self {
Self {
bus: Arc::new(BusRef::unbound()),
}
}
pub fn bound(limit: usize) -> Self {
Self {
bus: Arc::new(BusRef::bound(limit)),
}
}
pub fn disconnected(&self) -> bool {
let bus_lock = self.bus_ref();
bus_lock.disconnected()
}
fn bus_ref(&self) -> &BusRef<E, V> {
self.bus.as_ref()
}
pub fn event_count(&self) -> usize {
self.bus_ref().event_count()
}
}
impl<E, V> EventEmitter<E, V> for EventBus<E, V>
where
E: Eq + Hash,
{
fn on<F>(&self, event: E, f: F) -> Result<(), Error>
where
F: Fn(&BusRef<E, V>, Option<&V>) + 'static,
{
self.bus_ref().on(event, f)
}
fn emit(&self, event: E) -> Result<(), Error> {
self.emit_with_value(event, None)
}
fn emit_with_value(&self, event: E, value: Option<&V>) -> Result<(), Error> {
self.bus_ref().emit_with_value(event, value)
}
}
impl<E, V> Clone for EventBus<E, V> {
fn clone(&self) -> Self {
Self {
bus: Arc::clone(&self.bus),
}
}
}
unsafe impl<E, V> Send for EventBus<E, V> where E: Send {}
unsafe impl<E, V> Sync for EventBus<E, V> where E: Send {}
#[cfg(test)]
mod test {
use super::*;
use std::{cell::RefCell, rc::Rc, sync::Mutex};
#[derive(PartialEq, Eq, Hash)]
enum EventType {
Start,
Stop,
}
#[derive(Debug, PartialEq)]
enum Status {
Stopped,
Started,
}
#[test]
fn create_bus() {
let _bus: EventBus<u8, ()> = EventBus::unbound();
}
#[test]
fn listen_emit_api() {
let bus: EventBus<EventType, ()> = EventBus::unbound();
let status = Rc::new(RefCell::new(Status::Stopped));
let status_closure = Rc::clone(&status);
let status_closure_2 = Rc::clone(&status);
bus.on(EventType::Start, move |_, _| {
*status_closure.borrow_mut() = Status::Started;
})
.unwrap();
bus.on(EventType::Stop, move |_, _| {
*status_closure_2.borrow_mut() = Status::Stopped;
})
.unwrap();
bus.emit(EventType::Start)
.expect("Failed to emit START event");
assert_eq!(*status.borrow(), Status::Started);
assert_eq!(bus.event_count(), 1);
bus.emit(EventType::Stop)
.expect("Failed to emit STOP event");
assert_eq!(*status.borrow(), Status::Stopped);
assert_eq!(bus.event_count(), 2);
}
#[test]
fn listen_emit_api_repeat() {
let bus: EventBus<u8, ()> = EventBus::unbound();
let status = Rc::new(RefCell::new(0));
let status2 = Rc::clone(&status);
bus.on(1u8, move |_, _| {
*status2.borrow_mut() += 1;
})
.unwrap();
for _ in 0..4 {
bus.emit(1).expect("Failed to emit");
}
assert_eq!(*status.borrow(), 4);
assert_eq!(bus.event_count(), 4);
}
#[test]
fn threaded_on() {
let bus: EventBus<EventType, ()> = EventBus::unbound();
let bus_clone = bus.clone();
let status = Arc::new(Mutex::new(Status::Stopped));
let final_status = Arc::clone(&status);
let t1 = std::thread::spawn(move || {
bus_clone
.on(EventType::Start, move |_, _| {
let mut status_lock = status.lock().unwrap();
*status_lock = Status::Started;
})
.unwrap();
});
t1.join().unwrap();
bus.emit(EventType::Start).unwrap();
let final_status_lock = final_status.lock().unwrap();
assert_eq!(*final_status_lock, Status::Started)
}
#[test]
fn threaded_emit() {
let bus: EventBus<EventType, ()> = EventBus::unbound();
let bus_clone = bus.clone();
let status = Arc::new(Mutex::new(Status::Stopped));
let final_status = Arc::clone(&status);
let t1 = std::thread::spawn(move || {
bus.emit(EventType::Start).unwrap();
});
bus_clone
.on(EventType::Start, move |_, _| {
let mut status_lock = status.lock().unwrap();
*status_lock = Status::Started;
})
.unwrap();
t1.join().unwrap();
let final_status_lock = final_status.lock().unwrap();
assert_eq!(*final_status_lock, Status::Started)
}
#[test]
fn with_data() {
let bus: EventBus<EventType, u8> = EventBus::unbound();
let status: Rc<RefCell<Option<u8>>> = Rc::new(RefCell::new(None));
let status_closure = Rc::clone(&status);
bus.on(EventType::Start, move |_, startup_data: Option<&u8>| {
*status_closure.borrow_mut() = Some(*startup_data.unwrap());
})
.unwrap();
bus.emit_with_value(EventType::Start, Some(&123))
.expect("Failed to emit");
assert_eq!(*status.borrow(), Some(123));
assert_eq!(bus.event_count(), 1);
}
#[test]
#[should_panic]
fn exceed_bounds() {
let bus: EventBus<EventType, u8> = EventBus::bound(5);
for i in 0..10 {
bus.emit_with_value(EventType::Start, Some(&i))
.expect("Failed to emit");
}
}
#[test]
fn re_emit() {
let bus: EventBus<EventType, u8> = EventBus::unbound();
let status: Rc<RefCell<Option<u8>>> = Rc::new(RefCell::new(None));
let status_closure = Rc::clone(&status);
let status_closure_2 = Rc::clone(&status);
bus.on(EventType::Start, move |inner_bus, startup_data| {
*status_closure.borrow_mut() = Some(*startup_data.unwrap());
inner_bus
.emit(EventType::Stop)
.expect("Cannot emit STOP event");
})
.unwrap();
bus.on(EventType::Stop, move |_, _| {
*status_closure_2.borrow_mut() = None;
})
.unwrap();
bus.emit_with_value(EventType::Start, Some(&123))
.expect("Failed to emit");
assert_eq!(*status.borrow(), None);
assert_eq!(bus.event_count(), 2);
}
}