use std::sync::{
atomic::{AtomicU64, Ordering},
Arc, OnceLock, RwLock,
};
use std::time::{SystemTime, UNIX_EPOCH};
use dashmap::{DashMap, DashSet};
use serde_json::Value;
#[derive(Debug, Clone)]
pub struct DispatchEvent {
pub event_type: String,
pub data: Value,
pub seq: u64,
}
impl DispatchEvent {
pub fn new(event_type: String, data: Value) -> Self {
Self { event_type, data, seq: next_event_seq() }
}
}
static EVENT_SEQ: OnceLock<AtomicU64> = OnceLock::new();
fn event_seq_counter() -> &'static AtomicU64 {
EVENT_SEQ.get_or_init(|| {
let seed = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos() as u64).unwrap_or(0);
AtomicU64::new(seed)
})
}
pub fn next_event_seq() -> u64 {
event_seq_counter().fetch_add(1, Ordering::Relaxed)
}
pub type EventCallback = Arc<dyn Fn(DispatchEvent) + Send + Sync>;
#[derive(Clone)]
struct EventListener {
id: String,
callback: EventCallback,
}
static LISTENER_COUNTER: AtomicU64 = AtomicU64::new(0);
fn generate_listener_id() -> String {
let id = LISTENER_COUNTER.fetch_add(1, Ordering::SeqCst);
format!("listener_{}", id)
}
pub struct EventEmitter {
private_events: Arc<DashMap<String, Vec<EventListener>>>,
any_events: Arc<RwLock<Vec<EventListener>>>,
unhandled_events: Arc<RwLock<Vec<EventListener>>>,
handled_types: Arc<DashSet<String>>,
}
pub struct EventSubscription {
id: String,
emitter: EventEmitter,
}
impl EventSubscription {
pub fn id(&self) -> &str {
&self.id
}
pub fn detach(self) -> String {
let id = self.id.clone();
std::mem::forget(self);
id
}
}
impl Drop for EventSubscription {
fn drop(&mut self) {
self.emitter.remove_listener(&self.id);
}
}
impl EventEmitter {
pub fn new() -> Self {
Self {
private_events: Arc::new(DashMap::new()),
any_events: Arc::new(RwLock::new(Vec::new())),
unhandled_events: Arc::new(RwLock::new(Vec::new())),
handled_types: Arc::new(DashSet::new()),
}
}
pub async fn on_event<F>(&self, event_name: &str, callback: F) -> EventSubscription
where
F: Fn(DispatchEvent) + Send + Sync + 'static,
{
let id = generate_listener_id();
let listener = EventListener { id: id.clone(), callback: Arc::new(callback) };
self.private_events.entry(event_name.to_string()).or_default().push(listener);
self.handled_types.insert(event_name.to_string());
EventSubscription { id, emitter: self.clone() }
}
pub async fn on_events<F>(&self, event_names: &str, callback: F) -> Vec<EventSubscription>
where
F: Fn(DispatchEvent) + Send + Sync + Clone + 'static,
{
let mut subscriptions = Vec::new();
for name in event_names.split_whitespace() {
let sub = self.on_event(name, callback.clone()).await;
subscriptions.push(sub);
}
subscriptions
}
pub async fn on_any_event<F>(&self, callback: F) -> EventSubscription
where
F: Fn(DispatchEvent) + Send + Sync + 'static,
{
let id = generate_listener_id();
let listener = EventListener { id: id.clone(), callback: Arc::new(callback) };
self.any_events.write().unwrap_or_else(|e| e.into_inner()).push(listener);
EventSubscription { id, emitter: self.clone() }
}
pub async fn on_unhandled_event<F>(&self, callback: F) -> EventSubscription
where
F: Fn(DispatchEvent) + Send + Sync + 'static,
{
let id = generate_listener_id();
let listener = EventListener { id: id.clone(), callback: Arc::new(callback) };
self.unhandled_events.write().unwrap_or_else(|e| e.into_inner()).push(listener);
EventSubscription { id, emitter: self.clone() }
}
pub fn remove_listener(&self, listener_id: &str) -> bool {
for mut r in self.private_events.iter_mut() {
let listeners = r.value_mut();
if let Some(pos) = listeners.iter().position(|l| l.id == listener_id) {
listeners.remove(pos);
return true;
}
}
{
let mut any = self.any_events.write().unwrap_or_else(|e| e.into_inner());
if let Some(pos) = any.iter().position(|l| l.id == listener_id) {
any.remove(pos);
return true;
}
}
{
let mut unhandled = self.unhandled_events.write().unwrap_or_else(|e| e.into_inner());
if let Some(pos) = unhandled.iter().position(|l| l.id == listener_id) {
unhandled.remove(pos);
return true;
}
}
false
}
pub async fn off_event(&self, listener_id: &str) -> bool {
self.remove_listener(listener_id)
}
pub async fn off_event_by_name(&self, event_names: &str) {
for name in event_names.split_whitespace() {
self.private_events.remove(name);
self.handled_types.remove(name);
}
}
pub async fn off_all(&self, event_name: &str) {
self.private_events.remove(event_name);
self.handled_types.remove(event_name);
}
pub async fn dispatch(&self, event: DispatchEvent) {
let event_type = event.event_type.clone();
let specific_listeners = self.private_events.get(&event_type).map(|l| l.clone());
if let Some(listeners) = specific_listeners {
for listener in listeners {
let ev = event.clone();
let task_name = format!("dispatch::event_handler::{}", event_type);
let _ = tokio::task::Builder::new().name(&task_name).spawn(async move {
(listener.callback)(ev);
});
}
}
let any_listeners = self.any_events.read().unwrap_or_else(|e| e.into_inner()).clone();
for listener in any_listeners {
let ev = event.clone();
let _ = tokio::task::Builder::new().name("dispatch::event_handler::any").spawn(async move {
(listener.callback)(ev);
});
}
if !self.handled_types.contains(&event_type) {
let unhandled_listeners = self.unhandled_events.read().unwrap_or_else(|e| e.into_inner()).clone();
for listener in unhandled_listeners {
let ev = event.clone();
let _ = tokio::task::Builder::new().name("dispatch::event_handler::unhandled").spawn(async move {
(listener.callback)(ev);
});
}
}
}
pub async fn has_listeners(&self, event_name: &str) -> bool {
self.private_events.get(event_name).map(|l| !l.is_empty()).unwrap_or(false)
}
pub async fn listener_count(&self, event_name: &str) -> usize {
self.private_events.get(event_name).map(|l| l.len()).unwrap_or(0)
}
}
impl Default for EventEmitter {
fn default() -> Self {
Self::new()
}
}
impl Clone for EventEmitter {
fn clone(&self) -> Self {
Self {
private_events: Arc::clone(&self.private_events),
any_events: Arc::clone(&self.any_events),
unhandled_events: Arc::clone(&self.unhandled_events),
handled_types: Arc::clone(&self.handled_types),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn next_event_seq_is_strictly_monotonic() {
let a = next_event_seq();
let b = next_event_seq();
let c = next_event_seq();
assert!(b > a, "seq must be strictly increasing: {a} → {b}");
assert!(c > b, "seq must be strictly increasing: {b} → {c}");
}
#[test]
fn dispatch_event_new_assigns_increasing_seqs() {
let e1 = DispatchEvent::new("A".to_string(), Value::Null);
let e2 = DispatchEvent::new("B".to_string(), Value::Null);
assert!(e2.seq > e1.seq);
}
#[test]
fn seed_is_close_to_wall_clock() {
let seq = next_event_seq();
const Y2K_NANOS: u64 = 946_684_800_000_000_000; const Y2100_NANOS: u64 = 4_102_444_800_000_000_000; assert!(seq > Y2K_NANOS, "seq seemed to seed below year-2000 wall-clock: {seq}");
assert!(seq < Y2100_NANOS, "seq seemed to seed above year-2100 wall-clock: {seq}");
}
}