Skip to main content

dactor_ractor/
cluster.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::{Arc, Mutex};
4
5use dactor::{ClusterError, ClusterEvent, ClusterEvents, SubscriptionId};
6
7type SubscriberMap = HashMap<SubscriptionId, Arc<dyn Fn(ClusterEvent) + Send + Sync>>;
8
9/// A dactor `ClusterEvents` implementation for the ractor adapter.
10///
11/// Provides a callback-based subscription system. In production, an
12/// integration with `ractor_cluster` would feed real membership changes
13/// into this subsystem via [`RactorClusterEvents::emit`].
14#[derive(Clone)]
15pub struct RactorClusterEvents {
16    subscribers: Arc<Mutex<SubscriberMap>>,
17    next_id: Arc<AtomicU64>,
18}
19
20impl RactorClusterEvents {
21    /// Create a new cluster events subsystem.
22    pub fn new() -> Self {
23        Self {
24            subscribers: Arc::new(Mutex::new(HashMap::new())),
25            next_id: Arc::new(AtomicU64::new(1)),
26        }
27    }
28
29    /// Emit a cluster event, notifying all subscribers.
30    ///
31    /// Callbacks are snapshot-cloned before invocation so that subscribers
32    /// may safely call `subscribe` or `unsubscribe` from within a callback
33    /// without deadlocking.
34    ///
35    /// In production, this would be driven by `ractor_cluster` membership
36    /// change notifications. For testing, call this directly.
37    pub fn emit(&self, event: ClusterEvent) {
38        let snapshot: Vec<_> = {
39            let subs = self.subscribers.lock().unwrap();
40            subs.values().cloned().collect()
41        };
42        for sub in snapshot {
43            let event_clone = event.clone();
44            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
45                sub(event_clone);
46            }));
47        }
48    }
49}
50
51impl Default for RactorClusterEvents {
52    fn default() -> Self {
53        Self::new()
54    }
55}
56
57impl ClusterEvents for RactorClusterEvents {
58    fn subscribe(
59        &self,
60        on_event: Box<dyn Fn(ClusterEvent) + Send + Sync>,
61    ) -> Result<SubscriptionId, ClusterError> {
62        let id = SubscriptionId::from_raw(self.next_id.fetch_add(1, Ordering::SeqCst));
63        let mut subs = self.subscribers.lock().unwrap();
64        subs.insert(id, Arc::from(on_event));
65        Ok(id)
66    }
67
68    fn unsubscribe(&self, id: SubscriptionId) -> Result<(), ClusterError> {
69        let mut subs = self.subscribers.lock().unwrap();
70        subs.remove(&id);
71        Ok(())
72    }
73}