use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use crossbeam_channel::{Receiver, Sender, unbounded};
use crate::signals::StructuralSignal;
/// Message carried on the internal channel from an `EventBus` handle to the
/// dispatcher thread.
#[derive(Clone, Debug)]
enum EventBusMessage {
/// A signal that the dispatcher should pass to every registered handler.
Signal(StructuralSignal),
/// Tells the dispatcher loop to stop receiving and exit.
Shutdown,
}
/// A boxed callback that receives an owned `StructuralSignal`. It must be
/// `Send + 'static` because it is invoked from the dispatcher thread.
type Handler = Box<dyn Fn(StructuralSignal) + Send + 'static>;
/// Shared, mutex-protected table of handlers keyed by the id returned from
/// registration.
type Handlers = Arc<Mutex<HashMap<usize, Handler>>>;
/// Cheaply cloneable handle to an event bus. Cloning only bumps the reference
/// count of the shared inner state, so every clone talks to the same
/// dispatcher and the same handler table.
#[derive(Clone)]
pub(crate) struct EventBus {
// Shared state behind the handle.
inner: Arc<EventBusInner>,
}
/// State shared by every `EventBus` clone.
struct EventBusInner {
// Source of handler ids; incremented on every registration.
next_id: AtomicUsize,
// Handler table, also held by the dispatcher thread.
handlers: Handlers,
// Sending half of the channel the dispatcher thread receives from.
signal_tx: Sender<EventBusMessage>,
// Set to `true` when shutdown is requested; the dispatcher checks it before
// delivering each signal.
shutdown: Arc<AtomicBool>,
// Flag + condition variable the dispatcher uses to announce that its loop has
// ended; `shutdown()` blocks on it.
shutdown_complete: Arc<(Mutex<bool>, Condvar)>,
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
impl EventBus {
    /// Creates a bus and spawns the background thread that runs
    /// `dispatcher_loop` for it.
    pub(crate) fn new() -> Self {
        let (signal_tx, signal_rx) = unbounded();
        let handlers: Handlers = Arc::new(Mutex::new(HashMap::new()));
        let shutdown = Arc::new(AtomicBool::new(false));
        let shutdown_complete = Arc::new((Mutex::new(false), Condvar::new()));

        // The handle keeps one reference to each piece of shared state; the
        // originals are moved into the dispatcher thread below.
        let inner = Arc::new(EventBusInner {
            next_id: AtomicUsize::new(0),
            handlers: Arc::clone(&handlers),
            signal_tx,
            shutdown: Arc::clone(&shutdown),
            shutdown_complete: Arc::clone(&shutdown_complete),
        });

        thread::spawn(move || {
            dispatcher_loop(signal_rx, handlers, shutdown, shutdown_complete);
        });

        Self { inner }
    }

    /// Queues `signal` for delivery to the registered handlers.
    ///
    /// A failed send (the receiving side is gone) is deliberately ignored.
    pub(crate) fn emit(&self, signal: StructuralSignal) {
        let message = EventBusMessage::Signal(signal);
        let _ = self.inner.signal_tx.send(message);
    }

    /// Adds `handler` to the handler table and returns the id under which it
    /// was stored; pass that id to [`EventBus::unregister`] to remove it.
    ///
    /// # Panics
    ///
    /// Panics if the handlers mutex is poisoned.
    pub(crate) fn register<F>(&self, handler: F) -> usize
    where
        F: Fn(StructuralSignal) + Send + 'static,
    {
        let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
        let mut table = self
            .inner
            .handlers
            .lock()
            .expect("handlers mutex poisoned");
        table.insert(id, Box::new(handler));
        id
    }

    /// Removes the handler stored under `id`.
    ///
    /// Returns `true` if a handler was present, `false` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the handlers mutex is poisoned.
    pub(crate) fn unregister(&self, id: usize) -> bool {
        let mut table = self
            .inner
            .handlers
            .lock()
            .expect("handlers mutex poisoned");
        // `removed` is declared after `table`, so the handler is dropped
        // before the lock is released.
        let removed = table.remove(&id);
        removed.is_some()
    }

    /// Requests shutdown and blocks until the dispatcher thread reports that
    /// its loop has ended.
    ///
    /// # Panics
    ///
    /// Panics if the completion mutex is poisoned.
    pub(crate) fn shutdown(&self) {
        let inner = &self.inner;
        inner.shutdown.store(true, Ordering::SeqCst);
        // Wake the dispatcher if it is parked in `recv`; a send failure means
        // it has already gone away, which is fine.
        let _ = inner.signal_tx.send(EventBusMessage::Shutdown);

        let (done_lock, done_cvar) = &*inner.shutdown_complete;
        let guard = done_lock.lock().unwrap();
        let _finished = done_cvar.wait_while(guard, |done| !*done).unwrap();
    }

    /// Reports whether shutdown has been requested on this bus.
    pub(crate) fn is_shutdown(&self) -> bool {
        self.inner.shutdown.load(Ordering::SeqCst)
    }

    /// Registers a handler that forwards every signal into a fresh unbounded
    /// channel, returning the handler id together with the receiving end.
    pub(crate) fn subscribe(&self) -> (usize, Receiver<StructuralSignal>) {
        let (tx, rx) = unbounded::<StructuralSignal>();
        let forward = move |signal: StructuralSignal| {
            // The subscriber may have dropped its receiver; nothing to do then.
            let _ = tx.send(signal);
        };
        let id = self.register(forward);
        (id, rx)
    }

    /// Removes a subscription created by [`EventBus::subscribe`]; same result
    /// as [`EventBus::unregister`].
    pub(crate) fn unsubscribe(&self, id: usize) -> bool {
        self.unregister(id)
    }
}
/// Body of the dispatcher thread: receives messages from `rx` and passes each
/// signal to every registered handler.
///
/// The loop ends when a `Shutdown` message arrives, when the `shutdown` flag
/// is observed set after receiving a signal (that signal is not delivered),
/// or when every sender has been dropped.
///
/// `shutdown_complete` is flagged and its waiters are notified on *every*
/// exit path, including unwinding caused by a panicking handler or a poisoned
/// handlers mutex, so a thread blocked in `EventBus::shutdown` is always
/// released.
///
/// Handlers are invoked while the handlers mutex is held; a handler that
/// locks the same table (e.g. by registering on the bus) will not make
/// progress.
///
/// # Panics
///
/// Panics if the handlers mutex is poisoned.
fn dispatcher_loop(
    rx: Receiver<EventBusMessage>,
    handlers: Handlers,
    shutdown: Arc<AtomicBool>,
    shutdown_complete: Arc<(Mutex<bool>, Condvar)>,
) {
    /// Marks the dispatcher as finished and wakes all waiters when dropped.
    struct CompletionGuard(Arc<(Mutex<bool>, Condvar)>);

    impl Drop for CompletionGuard {
        fn drop(&mut self) {
            let (lock, cvar) = &*self.0;
            // This may run during unwinding, where a second panic would abort
            // the process, so recover the guard from a poisoned mutex instead
            // of unwrapping.
            let mut completed = lock
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            *completed = true;
            cvar.notify_all();
        }
    }

    // Declared first so it is dropped last, after the handlers lock has been
    // released.
    let _completion = CompletionGuard(shutdown_complete);

    while let Ok(message) = rx.recv() {
        let signal = match message {
            EventBusMessage::Signal(signal) => signal,
            EventBusMessage::Shutdown => break,
        };
        // A shutdown request takes priority over signals still in the queue.
        if shutdown.load(Ordering::SeqCst) {
            break;
        }
        let registry = handlers.lock().expect("handlers mutex poisoned");
        for handler in registry.values() {
            handler(signal.clone());
        }
    }
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;
    use std::time::Duration;

    use super::*;
    use crate::signals::{ClipboardSignal, StructuralSignal};

    /// Upper bound on how long a test waits for the dispatcher thread. The
    /// waits return as soon as the signal arrives, so this only matters when
    /// a test is about to fail.
    const DISPATCH_TIMEOUT: Duration = Duration::from_secs(5);

    /// A subscriber's channel receives an emitted signal.
    #[test]
    fn broadcasts_to_subscribers() {
        let bus = EventBus::new();
        let (_, rx) = bus.subscribe();
        bus.emit(StructuralSignal::Clipboard(ClipboardSignal::text(
            "hello",
            "test-app".to_string(),
        )));
        let signal = rx
            .recv_timeout(DISPATCH_TIMEOUT)
            .expect("expected signal");
        match signal {
            StructuralSignal::Clipboard(signal) => assert_eq!(signal.size_bytes, 5),
            _ => panic!("expected clipboard signal"),
        }
    }

    /// After unsubscribing, a subscriber no longer receives signals.
    #[test]
    fn unsubscribes_cleanly() {
        let bus = EventBus::new();
        let (id, rx) = bus.subscribe();
        // A second subscription that stays registered tells us when the
        // dispatcher has processed the signal, without sleeping.
        let (_, sentinel_rx) = bus.subscribe();
        assert!(bus.unsubscribe(id));
        bus.emit(StructuralSignal::Clipboard(ClipboardSignal::text(
            "test",
            "test-app".to_string(),
        )));
        sentinel_rx
            .recv_timeout(DISPATCH_TIMEOUT)
            .expect("sentinel subscriber should receive the signal");
        assert!(rx.try_recv().is_err());
    }

    /// A handler added with `register` is invoked once per emitted signal.
    #[test]
    fn register_handler_receives_signals() {
        let bus = EventBus::new();
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = Arc::clone(&counter);
        let (done_tx, done_rx) = unbounded();
        bus.register(move |_signal| {
            counter_clone.fetch_add(1, Ordering::SeqCst);
            let _ = done_tx.send(());
        });
        bus.emit(StructuralSignal::Clipboard(ClipboardSignal::text(
            "test",
            "test-app".to_string(),
        )));
        done_rx
            .recv_timeout(DISPATCH_TIMEOUT)
            .expect("handler should have been invoked");
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// `shutdown` returns once the dispatcher has stopped and the bus then
    /// reports itself as shut down.
    #[test]
    fn shutdown_completes_cleanly() {
        let bus = EventBus::new();
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = Arc::clone(&counter);
        let (done_tx, done_rx) = unbounded();
        bus.register(move |_signal| {
            counter_clone.fetch_add(1, Ordering::SeqCst);
            let _ = done_tx.send(());
        });
        bus.emit(StructuralSignal::Clipboard(ClipboardSignal::text(
            "test",
            "test-app".to_string(),
        )));
        // Make sure the signal was dispatched before asking for shutdown;
        // signals still queued at shutdown time are not delivered.
        done_rx
            .recv_timeout(DISPATCH_TIMEOUT)
            .expect("handler should have been invoked");
        bus.shutdown();
        assert!(bus.is_shutdown());
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }
}