use std::any::{Any, TypeId};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock};
use log::warn;
const MAX_QUEUE_SIZE: usize = 1000;
pub trait Event: Any + Send + Sync + 'static {
fn as_any(&self) -> &dyn Any;
fn type_name(&self) -> &'static str;
}
impl<T: Any + Send + Sync + 'static> Event for T {
fn as_any(&self) -> &dyn Any {
self
}
fn type_name(&self) -> &'static str {
std::any::type_name::<T>()
}
}
type Callback = Arc<dyn Fn(&dyn Any) + Send + Sync>;
pub type BoxedEvent = Box<dyn Event>;
#[derive(Clone)]
pub struct EventBus {
subscribers: Arc<RwLock<HashMap<TypeId, Vec<Callback>>>>,
queue: Arc<Mutex<VecDeque<BoxedEvent>>>,
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
impl EventBus {
pub fn new() -> Self {
Self {
subscribers: Arc::new(RwLock::new(HashMap::new())),
queue: Arc::new(Mutex::new(VecDeque::new())),
}
}
pub fn subscribe<E, F>(&self, callback: F)
where
E: Event,
F: Fn(&E) + Send + Sync + 'static,
{
let type_id = TypeId::of::<E>();
let wrapped: Callback = Arc::new(move |any: &dyn Any| {
if let Some(event) = any.downcast_ref::<E>() {
callback(event);
}
});
self.subscribers
.write()
.unwrap_or_else(|e| e.into_inner())
.entry(type_id)
.or_default()
.push(wrapped);
}
pub fn emit<E: Event + Clone>(&self, event: E) {
let type_id = TypeId::of::<E>();
let callbacks = {
let subs = self.subscribers.read().unwrap_or_else(|e| e.into_inner());
subs.get(&type_id).cloned()
};
if let Some(cbs) = callbacks {
for cb in cbs {
cb(&event);
}
}
let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
if queue.len() >= MAX_QUEUE_SIZE {
let evict_count = queue.len() / 2;
warn!("EventBus queue full ({} events), evicting oldest {}", queue.len(), evict_count);
for _ in 0..evict_count {
queue.pop_front();
}
}
queue.push_back(Box::new(event));
}
pub fn emit_boxed(&self, event: BoxedEvent) {
let type_id = (*event).type_id();
let callbacks = {
let subs = self.subscribers.read().unwrap_or_else(|e| e.into_inner());
subs.get(&type_id).cloned()
};
if let Some(cbs) = callbacks {
for cb in cbs {
cb((*event).as_any());
}
}
let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
if queue.len() >= MAX_QUEUE_SIZE {
let evict_count = queue.len() / 2;
warn!("EventBus queue full ({} events), evicting oldest {}", queue.len(), evict_count);
for _ in 0..evict_count {
queue.pop_front();
}
}
queue.push_back(event);
}
pub fn poll(&self) -> Vec<BoxedEvent> {
let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
queue.drain(..).collect()
}
pub fn emitter(&self) -> EventEmitter {
EventEmitter {
subscribers: Arc::clone(&self.subscribers),
queue: Arc::clone(&self.queue),
}
}
pub fn unsubscribe_all<E: Event>(&self) {
self.subscribers.write().unwrap_or_else(|e| e.into_inner()).remove(&TypeId::of::<E>());
}
pub fn clear(&self) {
self.subscribers.write().unwrap_or_else(|e| e.into_inner()).clear();
self.queue.lock().unwrap_or_else(|e| e.into_inner()).clear();
}
pub fn has_subscribers<E: Event>(&self) -> bool {
self.subscribers
.read()
.unwrap_or_else(|e| e.into_inner())
.get(&TypeId::of::<E>())
.map(|v| !v.is_empty())
.unwrap_or(false)
}
pub fn queue_len(&self) -> usize {
self.queue.lock().unwrap_or_else(|e| e.into_inner()).len()
}
}
#[derive(Clone)]
pub struct EventEmitter {
subscribers: Arc<RwLock<HashMap<TypeId, Vec<Callback>>>>,
queue: Arc<Mutex<VecDeque<BoxedEvent>>>,
}
impl std::fmt::Debug for EventEmitter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventEmitter")
.field("subscriber_types", &self.subscribers.read().map(|s| s.len()).unwrap_or(0))
.field("queue_len", &self.queue.lock().map(|q| q.len()).unwrap_or(0))
.finish()
}
}
impl EventEmitter {
pub fn emit<E: Event + Clone>(&self, event: E) {
let type_id = TypeId::of::<E>();
let callbacks = {
let subs = self.subscribers.read().unwrap_or_else(|e| e.into_inner());
subs.get(&type_id).cloned()
};
if let Some(cbs) = callbacks {
for cb in cbs {
cb(&event);
}
}
let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
if queue.len() >= MAX_QUEUE_SIZE {
let evict_count = queue.len() / 2;
warn!("EventEmitter queue full ({} events), evicting oldest {}", queue.len(), evict_count);
for _ in 0..evict_count {
queue.pop_front();
}
}
queue.push_back(Box::new(event));
}
pub fn emit_boxed(&self, event: BoxedEvent) {
let type_id = (*event).type_id();
let callbacks = {
let subs = self.subscribers.read().unwrap_or_else(|e| e.into_inner());
subs.get(&type_id).cloned()
};
if let Some(cbs) = callbacks {
for cb in cbs {
cb((*event).as_any());
}
}
let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
if queue.len() >= MAX_QUEUE_SIZE {
let evict_count = queue.len() / 2;
warn!("EventEmitter queue full ({} events), evicting oldest {}", queue.len(), evict_count);
for _ in 0..evict_count {
queue.pop_front();
}
}
queue.push_back(event);
}
}
#[derive(Clone, Default, Debug)]
pub struct CompEventEmitter {
inner: Option<EventEmitter>,
}
impl CompEventEmitter {
pub fn dummy() -> Self {
Self { inner: None }
}
pub fn from_emitter(emitter: EventEmitter) -> Self {
Self { inner: Some(emitter) }
}
pub fn emit<E: Event + Clone>(&self, event: E) {
if let Some(ref emitter) = self.inner {
emitter.emit(event);
}
}
}
#[inline]
pub fn downcast_event<E: Event>(event: &BoxedEvent) -> Option<&E> {
(**event).as_any().downcast_ref::<E>()
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicI32, Ordering};
#[derive(Clone, Debug)]
struct TestEvent { value: i32 }
#[derive(Clone, Debug)]
struct OtherEvent { msg: String }
#[test]
fn test_subscribe_emit_immediate() {
let bus = EventBus::new();
let counter = Arc::new(AtomicI32::new(0));
let c = Arc::clone(&counter);
bus.subscribe::<TestEvent, _>(move |e| {
c.fetch_add(e.value, Ordering::SeqCst);
});
bus.emit(TestEvent { value: 10 });
assert_eq!(counter.load(Ordering::SeqCst), 10);
bus.emit(TestEvent { value: 5 });
assert_eq!(counter.load(Ordering::SeqCst), 15);
}
#[test]
fn test_emit_queues_for_poll() {
let bus = EventBus::new();
bus.emit(TestEvent { value: 1 });
bus.emit(TestEvent { value: 2 });
bus.emit(OtherEvent { msg: "hello".into() });
let events = bus.poll();
assert_eq!(events.len(), 3);
assert_eq!(bus.poll().len(), 0);
}
#[test]
fn test_multiple_subscribers() {
let bus = EventBus::new();
let counter1 = Arc::new(AtomicI32::new(0));
let counter2 = Arc::new(AtomicI32::new(0));
let c1 = Arc::clone(&counter1);
bus.subscribe::<TestEvent, _>(move |e| {
c1.fetch_add(e.value, Ordering::SeqCst);
});
let c2 = Arc::clone(&counter2);
bus.subscribe::<TestEvent, _>(move |e| {
c2.fetch_add(e.value * 2, Ordering::SeqCst);
});
bus.emit(TestEvent { value: 10 });
assert_eq!(counter1.load(Ordering::SeqCst), 10);
assert_eq!(counter2.load(Ordering::SeqCst), 20);
}
#[test]
fn test_emitter_handle() {
let bus = EventBus::new();
let counter = Arc::new(AtomicI32::new(0));
let c = Arc::clone(&counter);
bus.subscribe::<TestEvent, _>(move |e| {
c.fetch_add(e.value, Ordering::SeqCst);
});
let emitter = bus.emitter();
emitter.emit(TestEvent { value: 42 });
assert_eq!(counter.load(Ordering::SeqCst), 42);
assert_eq!(bus.poll().len(), 1);
}
#[test]
fn test_unsubscribe() {
let bus = EventBus::new();
let counter = Arc::new(AtomicI32::new(0));
let c = Arc::clone(&counter);
bus.subscribe::<TestEvent, _>(move |e| {
c.fetch_add(e.value, Ordering::SeqCst);
});
bus.emit(TestEvent { value: 10 });
assert_eq!(counter.load(Ordering::SeqCst), 10);
bus.unsubscribe_all::<TestEvent>();
bus.emit(TestEvent { value: 10 });
assert_eq!(counter.load(Ordering::SeqCst), 10);
assert_eq!(bus.poll().len(), 2);
}
#[test]
fn test_downcast() {
let bus = EventBus::new();
bus.emit(TestEvent { value: 42 });
for ev in bus.poll() {
if let Some(e) = downcast_event::<TestEvent>(&ev) {
assert_eq!(e.value, 42);
}
}
}
}