mod handler;
mod sender;
pub use sender::EventSender;
use std::{any::TypeId, collections::HashMap, sync::Arc};
use reovim_arch::sync::{ArcSwap, Mutex};
use handler::{ContextHandlerFn, HandlerFn, HandlerType, RegisteredHandler};
use super::{
channel::{BoundedReceiver, BoundedSender, bounded},
context::{DispatchResult, HandlerContext},
event::{DynEvent, Event, EventResult, TargetedEvent},
scope::EventScope,
subscription::{Subscription, SubscriptionId},
};
struct ChannelInner {
tx: BoundedSender<DynEvent>,
rx: Mutex<Option<BoundedReceiver<DynEvent>>>,
}
struct EventBusInner {
handlers: ArcSwap<HashMap<TypeId, Vec<RegisteredHandler>>>,
queue: Mutex<Vec<DynEvent>>,
channel: Option<ChannelInner>,
}
#[derive(Clone)]
pub struct EventBus {
inner: Arc<EventBusInner>,
}
impl EventBus {
#[must_use]
pub fn new() -> Self {
Self {
inner: Arc::new(EventBusInner {
handlers: ArcSwap::from_pointee(HashMap::new()),
queue: Mutex::new(Vec::new()),
channel: None,
}),
}
}
#[must_use]
pub fn new_with_channel(capacity: usize) -> Self {
let (tx, rx) = bounded(capacity);
Self {
inner: Arc::new(EventBusInner {
handlers: ArcSwap::from_pointee(HashMap::new()),
queue: Mutex::new(Vec::new()),
channel: Some(ChannelInner {
tx,
rx: Mutex::new(Some(rx)),
}),
}),
}
}
#[must_use]
pub fn sender(&self) -> Option<EventSender> {
self.inner
.channel
.as_ref()
.map(|c| EventSender { tx: c.tx.clone() })
}
#[must_use]
pub fn take_receiver(&self) -> Option<BoundedReceiver<DynEvent>> {
self.inner.channel.as_ref()?.rx.lock().take()
}
#[cfg_attr(coverage_nightly, coverage(off))]
pub fn subscribe<E, F>(&self, priority: u32, handler: F) -> Subscription
where
E: Event,
F: Fn(&E) -> EventResult + Send + Sync + 'static,
{
let type_id = TypeId::of::<E>();
let sub_id = SubscriptionId::new();
let wrapped_handler: HandlerFn = Arc::new(move |dyn_event: &DynEvent| {
dyn_event
.downcast_ref::<E>()
.map_or(EventResult::NotHandled, &handler)
});
let registered = RegisteredHandler {
id: sub_id,
priority,
handler: HandlerType::Simple(wrapped_handler),
};
self.inner.handlers.rcu(|current| {
let mut new_map = (**current).clone();
let handlers = new_map.entry(type_id).or_default();
handlers.push(registered.clone());
handlers.sort_by_key(|h| h.priority);
new_map
});
let inner = Arc::clone(&self.inner);
let unsubscribe = move || {
inner.handlers.rcu(|current| {
let mut new_map = (**current).clone();
if let Some(handlers) = new_map.get_mut(&type_id) {
handlers.retain(|h| h.id != sub_id);
if handlers.is_empty() {
new_map.remove(&type_id);
}
}
new_map
});
};
Subscription::new::<E>(sub_id, unsubscribe)
}
#[cfg_attr(coverage_nightly, coverage(off))]
pub fn subscribe_with_context<E, F>(&self, priority: u32, handler: F) -> Subscription
where
E: Event,
F: Fn(&E, &mut HandlerContext) -> EventResult + Send + Sync + 'static,
{
let type_id = TypeId::of::<E>();
let sub_id = SubscriptionId::new();
let wrapped_handler: ContextHandlerFn = Arc::new(move |dyn_event: &DynEvent, ctx| {
dyn_event
.downcast_ref::<E>()
.map_or(EventResult::NotHandled, |e| handler(e, ctx))
});
let registered = RegisteredHandler {
id: sub_id,
priority,
handler: HandlerType::WithContext(wrapped_handler),
};
self.inner.handlers.rcu(|current| {
let mut new_map = (**current).clone();
let handlers = new_map.entry(type_id).or_default();
handlers.push(registered.clone());
handlers.sort_by_key(|h| h.priority);
new_map
});
let inner = Arc::clone(&self.inner);
let unsubscribe = move || {
inner.handlers.rcu(|current| {
let mut new_map = (**current).clone();
if let Some(handlers) = new_map.get_mut(&type_id) {
handlers.retain(|h| h.id != sub_id);
if handlers.is_empty() {
new_map.remove(&type_id);
}
}
new_map
});
};
Subscription::new::<E>(sub_id, unsubscribe)
}
pub fn subscribe_targeted<E, F>(&self, target: &str, priority: u32, handler: F) -> Subscription
where
E: TargetedEvent,
F: Fn(&E, &mut HandlerContext) -> EventResult + Send + Sync + 'static,
{
let target = target.to_owned();
self.subscribe_with_context::<E, _>(priority, move |event, ctx| {
if event.target() == target {
handler(event, ctx)
} else {
EventResult::NotHandled
}
})
}
pub fn emit<E: Event>(&self, event: E) -> EventResult {
let dyn_event = DynEvent::new(event);
self.dispatch(&dyn_event)
}
pub fn emit_scoped<E: Event>(&self, event: E, scope: &EventScope) -> EventResult {
let dyn_event = DynEvent::new(event).with_scope(scope.clone());
let result = self.dispatch(&dyn_event);
scope.decrement();
result
}
pub fn emit_async<E: Event>(&self, event: E) {
let dyn_event = DynEvent::new(event);
self.inner.queue.lock().push(dyn_event);
}
pub fn emit_async_scoped<E: Event>(&self, event: E, scope: &EventScope) {
scope.increment();
let dyn_event = DynEvent::new(event).with_scope(scope.clone());
self.inner.queue.lock().push(dyn_event);
}
#[must_use]
pub fn process_queue(&self) -> usize {
let events: Vec<_> = {
let mut queue = self.inner.queue.lock();
std::mem::take(&mut *queue)
};
let count = events.len();
for mut event in events {
let scope = event.take_scope();
let _result = self.dispatch(&event);
if let Some(scope) = scope {
scope.decrement();
}
}
count
}
#[must_use]
pub fn dispatch(&self, event: &DynEvent) -> EventResult {
let handlers = self.inner.handlers.load();
let type_id = event.type_id();
let Some(handlers) = handlers.get(&type_id) else {
return EventResult::NotHandled;
};
let mut result = EventResult::NotHandled;
let mut temp_ctx = HandlerContext::new();
for handler in handlers {
let handler_result = match &handler.handler {
HandlerType::Simple(h) => h(event),
HandlerType::WithContext(h) => h(event, &mut temp_ctx),
};
match handler_result {
EventResult::Consumed => {
return EventResult::Consumed;
}
EventResult::Handled => {
result = EventResult::Handled;
}
EventResult::NotHandled => {
}
}
}
result
}
pub fn dispatch_with_context(
&self,
event: &DynEvent,
ctx: &mut HandlerContext,
) -> DispatchResult {
let handlers = self.inner.handlers.load();
let type_id = event.type_id();
let Some(handlers) = handlers.get(&type_id) else {
return DispatchResult::not_handled();
};
let mut result = EventResult::NotHandled;
for handler in handlers {
let handler_result = match &handler.handler {
HandlerType::Simple(h) => h(event),
HandlerType::WithContext(h) => h(event, ctx),
};
match handler_result {
EventResult::Consumed => {
return DispatchResult::new(EventResult::Consumed, ctx);
}
EventResult::Handled => {
result = EventResult::Handled;
}
EventResult::NotHandled => {
}
}
}
DispatchResult::new(result, ctx)
}
#[must_use]
pub fn handler_count<E: Event>(&self) -> usize {
let handlers = self.inner.handlers.load();
handlers.get(&TypeId::of::<E>()).map_or(0, Vec::len)
}
#[must_use]
pub fn total_handler_count(&self) -> usize {
let handlers = self.inner.handlers.load();
handlers.values().map(Vec::len).sum()
}
#[must_use]
pub fn queue_len(&self) -> usize {
self.inner.queue.lock().len()
}
#[must_use]
pub fn queue_is_empty(&self) -> bool {
self.inner.queue.lock().is_empty()
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for EventBus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let handlers = self.inner.handlers.load();
f.debug_struct("EventBus")
.field("event_types", &handlers.len())
.field("total_handlers", &self.total_handler_count())
.field("queue_len", &self.queue_len())
.finish()
}
}