use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use dactor::{ClusterError, ClusterEvent, ClusterEvents, SubscriptionId};
/// Registered subscriber callbacks, keyed by the [`SubscriptionId`] returned from `subscribe`.
///
/// Each callback is held in an `Arc` so that `emit` can clone the handles into a snapshot
/// instead of invoking them while the map is locked.
type SubscriberMap = HashMap<SubscriptionId, Arc<dyn Fn(ClusterEvent) + Send + Sync>>;
/// Callback-based cluster event bus; implements [`ClusterEvents`].
///
/// Both fields are `Arc`s, so a clone of this value is another handle to the same subscriber
/// map and the same id counter rather than an independent bus.
#[derive(Clone)]
pub struct RactorClusterEvents {
/// Subscriber callbacks by subscription id, guarded by a mutex.
subscribers: Arc<Mutex<SubscriberMap>>,
/// Counter supplying the raw value for the next subscription id; `new` starts it at 1.
next_id: Arc<AtomicU64>,
}
impl RactorClusterEvents {
    /// Creates an event bus with no subscribers.
    ///
    /// The subscription id counter starts at 1, so the first subscription gets raw id 1.
    pub fn new() -> Self {
        Self {
            subscribers: Arc::new(Mutex::new(HashMap::new())),
            next_id: Arc::new(AtomicU64::new(1)),
        }
    }

    /// Delivers `event` to every subscriber registered at the time of the call.
    ///
    /// The subscriber list is snapshotted under the lock and the callbacks are then invoked on
    /// the calling thread with the lock released, so a callback may subscribe, unsubscribe or
    /// emit on this same bus. Subscriptions added or removed during delivery do not change the
    /// snapshot already being iterated.
    ///
    /// Each callback receives its own clone of `event`. A panic that unwinds out of a callback
    /// is caught and discarded so the remaining subscribers still receive the event.
    pub fn emit(&self, event: ClusterEvent) {
        let snapshot: Vec<_> = {
            // Poisoning only records that some thread panicked while holding the guard; the
            // map itself is still structurally valid. Recover the guard instead of panicking,
            // otherwise a single poison would make every later `emit` panic forever.
            let subs = self
                .subscribers
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            subs.values().cloned().collect()
        };

        for sub in snapshot {
            let event_clone = event.clone();
            // Isolate subscribers from one another: the result is deliberately ignored because
            // a failing callback must not stop delivery to the rest.
            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                sub(event_clone);
            }));
        }
    }
}
impl Default for RactorClusterEvents {
fn default() -> Self {
Self::new()
}
}
impl ClusterEvents for RactorClusterEvents {
    /// Registers `on_event` to be invoked for every event subsequently passed to `emit`.
    ///
    /// Returns the id to hand to `unsubscribe` later. Ids are taken from a monotonically
    /// increasing counter shared by all clones of this bus. This implementation never returns
    /// `Err`.
    fn subscribe(
        &self,
        on_event: Box<dyn Fn(ClusterEvent) + Send + Sync>,
    ) -> Result<SubscriptionId, ClusterError> {
        let id = SubscriptionId::from_raw(self.next_id.fetch_add(1, Ordering::SeqCst));

        // Do the Box -> Arc conversion before locking so the critical section is only the
        // map insert.
        let callback: Arc<dyn Fn(ClusterEvent) + Send + Sync> = Arc::from(on_event);

        // A poisoned mutex still guards a structurally valid map; recover the guard rather
        // than panicking inside a method that reports failure through `Result`.
        let mut subs = self
            .subscribers
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        subs.insert(id, callback);
        Ok(id)
    }

    /// Removes the subscription identified by `id`.
    ///
    /// Unknown ids are ignored and still yield `Ok(())`. An `emit` that has already taken its
    /// snapshot may still invoke the callback once more. This implementation never returns
    /// `Err`.
    fn unsubscribe(&self, id: SubscriptionId) -> Result<(), ClusterError> {
        // The guard is a temporary of this statement, so the lock is released before
        // `removed` is dropped below.
        let removed = self
            .subscribers
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .remove(&id);

        // Drop the callback outside the lock: state captured by the closure may run arbitrary
        // `Drop` code, which would deadlock if it re-entered this bus while the guard was held
        // and would poison the mutex if it panicked.
        drop(removed);
        Ok(())
    }
}