use std::collections::HashMap;
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, PoisonError};

use dactor::{ClusterError, ClusterEvent, ClusterEvents, SubscriptionId};
6
7type SubscriberMap = HashMap<SubscriptionId, Arc<dyn Fn(ClusterEvent) + Send + Sync>>;
8
9#[derive(Clone)]
15pub struct RactorClusterEvents {
16 subscribers: Arc<Mutex<SubscriberMap>>,
17 next_id: Arc<AtomicU64>,
18}
19
20impl RactorClusterEvents {
21 pub fn new() -> Self {
23 Self {
24 subscribers: Arc::new(Mutex::new(HashMap::new())),
25 next_id: Arc::new(AtomicU64::new(1)),
26 }
27 }
28
29 pub fn emit(&self, event: ClusterEvent) {
38 let snapshot: Vec<_> = {
39 let subs = self.subscribers.lock().unwrap();
40 subs.values().cloned().collect()
41 };
42 for sub in snapshot {
43 let event_clone = event.clone();
44 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
45 sub(event_clone);
46 }));
47 }
48 }
49}
50
51impl Default for RactorClusterEvents {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57impl ClusterEvents for RactorClusterEvents {
58 fn subscribe(
59 &self,
60 on_event: Box<dyn Fn(ClusterEvent) + Send + Sync>,
61 ) -> Result<SubscriptionId, ClusterError> {
62 let id = SubscriptionId::from_raw(self.next_id.fetch_add(1, Ordering::SeqCst));
63 let mut subs = self.subscribers.lock().unwrap();
64 subs.insert(id, Arc::from(on_event));
65 Ok(id)
66 }
67
68 fn unsubscribe(&self, id: SubscriptionId) -> Result<(), ClusterError> {
69 let mut subs = self.subscribers.lock().unwrap();
70 subs.remove(&id);
71 Ok(())
72 }
73}