Skip to main content

dactor/
cluster.rs

1use crate::errors::ClusterError;
2use crate::node::NodeId;
3
4/// Events emitted by the cluster membership system.
5///
6/// Subscribe via [`ClusterEvents::subscribe`] to react to topology changes
7/// such as scaling, failover, or planned maintenance.
8#[derive(Debug, Clone, PartialEq, Eq)]
9#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
10#[non_exhaustive]
11pub enum ClusterEvent {
12    /// A new node has joined the cluster and is ready to receive messages.
13    NodeJoined(NodeId),
14    /// A node has left the cluster (gracefully or due to failure).
15    NodeLeft(NodeId),
16    /// A node attempted to join but was rejected during the version handshake.
17    ///
18    /// This event is emitted when a connecting node fails the compatibility
19    /// check (different wire protocol version or adapter) or when the
20    /// handshake transport call itself fails. The rejected node does **not**
21    /// appear in the cluster's node list.
22    NodeRejected {
23        /// The node that was rejected.
24        node_id: NodeId,
25        /// Why the node was rejected.
26        reason: NodeRejectionReason,
27        /// Human-readable detail message.
28        detail: String,
29    },
30}
31
/// Why a node was refused admission to the cluster.
///
/// This is the **cluster-level** reason carried by
/// [`ClusterEvent::NodeRejected`]. It is a separate type from the
/// handshake-level (wire protocol) reason
/// [`RejectionReason`](crate::system_actors::RejectionReason); a [`From`]
/// implementation converts a handshake reason into this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum NodeRejectionReason {
    /// The peer speaks a wire protocol with a different MAJOR version (Category 1).
    IncompatibleProtocol,
    /// The peer runs on a different actor framework adapter.
    IncompatibleAdapter,
    /// The handshake call failed at the transport level: a timeout, a network
    /// error, or no response from the remote node.
    ConnectionFailed,
}
51
52impl std::fmt::Display for NodeRejectionReason {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        match self {
55            NodeRejectionReason::IncompatibleProtocol => {
56                write!(f, "incompatible wire protocol")
57            }
58            NodeRejectionReason::IncompatibleAdapter => {
59                write!(f, "incompatible adapter")
60            }
61            NodeRejectionReason::ConnectionFailed => {
62                write!(f, "connection failed")
63            }
64        }
65    }
66}
67
68impl From<crate::system_actors::RejectionReason> for NodeRejectionReason {
69    fn from(reason: crate::system_actors::RejectionReason) -> Self {
70        match reason {
71            crate::system_actors::RejectionReason::IncompatibleProtocol => {
72                NodeRejectionReason::IncompatibleProtocol
73            }
74            crate::system_actors::RejectionReason::IncompatibleAdapter => {
75                NodeRejectionReason::IncompatibleAdapter
76            }
77        }
78    }
79}
80
/// Opaque handle returned by [`ClusterEvents::subscribe`]; pass it to
/// [`ClusterEvents::unsubscribe`] to cancel that subscription.
///
/// Ids are totally ordered by their raw value, so implementations can keep
/// subscribers in ordered collections and dispatch deterministically.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SubscriptionId(pub(crate) u64);

impl SubscriptionId {
    /// Create a `SubscriptionId` from a raw integer.
    ///
    /// Intended for adapter crates implementing [`ClusterEvents`], which
    /// cannot build the type directly because its field is crate-private.
    #[must_use]
    pub const fn from_raw(id: u64) -> Self {
        Self(id)
    }

    /// Return the raw integer behind this id; the inverse of
    /// [`from_raw`](Self::from_raw).
    ///
    /// Lets adapter crates map an id received in
    /// [`ClusterEvents::unsubscribe`] back to their own bookkeeping key.
    #[must_use]
    pub const fn as_raw(&self) -> u64 {
        self.0
    }
}
95
96/// Subscription to cluster membership events.
97pub trait ClusterEvents: Send + Sync + 'static {
98    /// Subscribe to cluster membership changes. The callback is invoked for
99    /// each [`ClusterEvent`] (`NodeJoined`, `NodeLeft`, `NodeRejected`).
100    /// Returns a [`SubscriptionId`] that can be used to cancel the
101    /// subscription.
102    fn subscribe(
103        &self,
104        on_event: Box<dyn Fn(ClusterEvent) + Send + Sync>,
105    ) -> Result<SubscriptionId, ClusterError>;
106
107    /// Remove a previously registered subscription. Idempotent.
108    fn unsubscribe(&self, id: SubscriptionId) -> Result<(), ClusterError>;
109}
110
111// ---------------------------------------------------------------------------
112// ClusterEventEmitter
113// ---------------------------------------------------------------------------
114
115/// In-process event emitter for cluster membership changes.
116///
117/// Manages subscriber callbacks and dispatches [`ClusterEvent`]s to all
118/// active subscribers. Used by adapter runtimes to notify actors of
119/// topology changes.
120pub struct ClusterEventEmitter {
121    next_id: u64,
122    subscribers: std::collections::HashMap<SubscriptionId, Box<dyn Fn(ClusterEvent) + Send + Sync>>,
123}
124
125impl ClusterEventEmitter {
126    /// Create a new emitter with no subscribers.
127    pub fn new() -> Self {
128        Self {
129            next_id: 1,
130            subscribers: std::collections::HashMap::new(),
131        }
132    }
133
134    /// Subscribe to cluster events. Returns a subscription ID.
135    pub fn subscribe(
136        &mut self,
137        on_event: Box<dyn Fn(ClusterEvent) + Send + Sync>,
138    ) -> SubscriptionId {
139        let id = SubscriptionId(self.next_id);
140        self.next_id += 1;
141        self.subscribers.insert(id, on_event);
142        id
143    }
144
145    /// Remove a subscription. Idempotent.
146    pub fn unsubscribe(&mut self, id: SubscriptionId) {
147        self.subscribers.remove(&id);
148    }
149
150    /// Emit an event to all subscribers.
151    pub fn emit(&self, event: ClusterEvent) {
152        for callback in self.subscribers.values() {
153            callback(event.clone());
154        }
155    }
156
157    /// Number of active subscribers.
158    pub fn subscriber_count(&self) -> usize {
159        self.subscribers.len()
160    }
161}
162
163impl Default for ClusterEventEmitter {
164    fn default() -> Self {
165        Self::new()
166    }
167}
168
169// ---------------------------------------------------------------------------
170// AdapterCluster trait (R4: Connection management)
171// ---------------------------------------------------------------------------
172
173/// Connection management for adapter runtimes.
174///
175/// Adapters implement this trait to wire cluster discovery, node connections,
176/// and health monitoring into their runtime. The dactor framework calls
177/// these methods during startup and when topology changes are detected.
178#[async_trait::async_trait]
179pub trait AdapterCluster: Send + Sync + 'static {
180    /// Connect to a remote node. Called when discovery finds a new peer
181    /// or when reconnecting after failure.
182    async fn connect(&self, node: &NodeId) -> Result<(), ClusterError>;
183
184    /// Disconnect from a remote node. Called on graceful shutdown or when
185    /// removing a node from the cluster.
186    async fn disconnect(&self, node: &NodeId) -> Result<(), ClusterError>;
187
188    /// Reconnect to a node (disconnect + connect). Used for recovery after
189    /// transient failures.
190    async fn reconnect(&self, node: &NodeId) -> Result<(), ClusterError> {
191        self.disconnect(node).await?;
192        self.connect(node).await
193    }
194
195    /// Check if a node is currently reachable.
196    async fn is_reachable(&self, node: &NodeId) -> bool;
197
198    /// Get the list of currently connected nodes.
199    async fn connected_nodes(&self) -> Vec<NodeId>;
200}
201
202// ---------------------------------------------------------------------------
203// HealthChecker trait (C5: Health delegation)
204// ---------------------------------------------------------------------------
205
/// Outcome of a single node health check.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum HealthStatus {
    /// Node is healthy and responsive.
    Healthy,
    /// Node is unhealthy or unresponsive.
    Unhealthy {
        /// Description of the health issue.
        reason: String,
    },
    /// Health check timed out.
    Timeout,
}

impl HealthStatus {
    /// Returns `true` only for [`HealthStatus::Healthy`].
    ///
    /// Both [`HealthStatus::Unhealthy`] and [`HealthStatus::Timeout`] count
    /// as not healthy.
    #[must_use]
    pub fn is_healthy(&self) -> bool {
        matches!(self, HealthStatus::Healthy)
    }
}
219
220/// Trait for checking the health of a remote node.
221///
222/// Adapters delegate health checks to their underlying provider's
223/// mechanism (e.g., gRPC health check, TCP ping, heartbeat).
224#[async_trait::async_trait]
225pub trait HealthChecker: Send + Sync + 'static {
226    /// Check the health of a remote node.
227    async fn check(&self, node: &NodeId) -> HealthStatus;
228}
229
230/// Called when a node becomes unreachable. Adapters implement this to
231/// handle the failure (e.g., mark actors as stopped, notify watchers).
232#[async_trait::async_trait]
233pub trait UnreachableHandler: Send + Sync + 'static {
234    /// Called when a node is determined to be unreachable.
235    async fn on_node_unreachable(&self, node: &NodeId);
236}
237
238// ---------------------------------------------------------------------------
239// Tests
240// ---------------------------------------------------------------------------
241
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, Mutex};

    /// Build a `NodeId` from a string literal.
    fn node(name: &'static str) -> NodeId {
        NodeId(name.into())
    }

    /// Build a `NodeRejected` event from its three parts.
    fn rejected(
        name: &'static str,
        reason: NodeRejectionReason,
        detail: &'static str,
    ) -> ClusterEvent {
        ClusterEvent::NodeRejected {
            node_id: node(name),
            reason,
            detail: detail.into(),
        }
    }

    /// Subscribe a callback that adds `step` to a fresh shared counter on
    /// every event; returns the subscription id together with the counter.
    fn counting_subscriber(
        emitter: &mut ClusterEventEmitter,
        step: u64,
    ) -> (SubscriptionId, Arc<AtomicU64>) {
        let counter = Arc::new(AtomicU64::new(0));
        let sink = Arc::clone(&counter);
        let id = emitter.subscribe(Box::new(move |_| {
            sink.fetch_add(step, Ordering::SeqCst);
        }));
        (id, counter)
    }

    /// Subscribe a callback that appends every received event to a shared log.
    fn recording_subscriber(emitter: &mut ClusterEventEmitter) -> Arc<Mutex<Vec<ClusterEvent>>> {
        let log = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&log);
        emitter.subscribe(Box::new(move |event| {
            sink.lock().unwrap().push(event);
        }));
        log
    }

    #[test]
    fn cluster_event_emitter_subscribe_and_emit() {
        let mut emitter = ClusterEventEmitter::new();
        let (_id, hits) = counting_subscriber(&mut emitter, 1);
        assert_eq!(emitter.subscriber_count(), 1);

        emitter.emit(ClusterEvent::NodeJoined(node("n1")));
        emitter.emit(ClusterEvent::NodeLeft(node("n1")));

        assert_eq!(hits.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn cluster_event_emitter_unsubscribe() {
        let mut emitter = ClusterEventEmitter::new();
        let (id, hits) = counting_subscriber(&mut emitter, 1);

        emitter.emit(ClusterEvent::NodeJoined(node("n1")));
        assert_eq!(hits.load(Ordering::SeqCst), 1);

        emitter.unsubscribe(id);
        assert_eq!(emitter.subscriber_count(), 0);

        // Events emitted after unsubscribing must not reach the old callback.
        emitter.emit(ClusterEvent::NodeJoined(node("n2")));
        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn cluster_event_emitter_multiple_subscribers() {
        let mut emitter = ClusterEventEmitter::new();
        let (_, first) = counting_subscriber(&mut emitter, 1);
        let (_, second) = counting_subscriber(&mut emitter, 10);

        emitter.emit(ClusterEvent::NodeJoined(node("n1")));

        assert_eq!(first.load(Ordering::SeqCst), 1);
        assert_eq!(second.load(Ordering::SeqCst), 10);
    }

    #[test]
    fn cluster_event_emitter_captures_event_data() {
        let mut emitter = ClusterEventEmitter::new();
        let log = recording_subscriber(&mut emitter);

        emitter.emit(ClusterEvent::NodeJoined(node("alpha")));
        emitter.emit(ClusterEvent::NodeLeft(node("beta")));

        let events = log.lock().unwrap();
        assert_eq!(
            *events,
            vec![
                ClusterEvent::NodeJoined(node("alpha")),
                ClusterEvent::NodeLeft(node("beta")),
            ]
        );
    }

    #[test]
    fn health_status_variants() {
        assert_eq!(HealthStatus::Healthy, HealthStatus::Healthy);

        let unhealthy = HealthStatus::Unhealthy {
            reason: "connection refused".into(),
        };
        assert!(matches!(unhealthy, HealthStatus::Unhealthy { .. }));

        assert_eq!(HealthStatus::Timeout, HealthStatus::Timeout);
    }

    #[test]
    fn subscription_id_from_raw() {
        assert_eq!(SubscriptionId::from_raw(42), SubscriptionId(42));
    }

    // -- NodeRejected / NodeRejectionReason tests --

    #[test]
    fn node_rejected_event_construction() {
        let event = rejected(
            "bad-node",
            NodeRejectionReason::IncompatibleProtocol,
            "wire 1.0 vs 0.2",
        );
        if let ClusterEvent::NodeRejected {
            node_id,
            reason,
            detail,
        } = &event
        {
            assert_eq!(node_id, &node("bad-node"));
            assert_eq!(*reason, NodeRejectionReason::IncompatibleProtocol);
            assert!(detail.contains("1.0"));
        } else {
            panic!("expected NodeRejected");
        }
    }

    #[test]
    fn node_rejected_emitted_to_subscribers() {
        let mut emitter = ClusterEventEmitter::new();
        let log = recording_subscriber(&mut emitter);

        emitter.emit(rejected(
            "rejected-node",
            NodeRejectionReason::IncompatibleAdapter,
            "kameo vs ractor",
        ));

        let events = log.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            ClusterEvent::NodeRejected {
                reason: NodeRejectionReason::IncompatibleAdapter,
                ..
            }
        ));
    }

    #[test]
    fn node_rejection_reason_from_handshake_rejection() {
        use crate::system_actors::RejectionReason;

        assert_eq!(
            NodeRejectionReason::from(RejectionReason::IncompatibleProtocol),
            NodeRejectionReason::IncompatibleProtocol
        );
        assert_eq!(
            NodeRejectionReason::from(RejectionReason::IncompatibleAdapter),
            NodeRejectionReason::IncompatibleAdapter
        );
    }

    #[test]
    fn node_rejection_reason_display() {
        let cases = [
            (
                NodeRejectionReason::IncompatibleProtocol,
                "incompatible wire protocol",
            ),
            (NodeRejectionReason::IncompatibleAdapter, "incompatible adapter"),
            (NodeRejectionReason::ConnectionFailed, "connection failed"),
        ];
        for (reason, expected) in cases.iter() {
            assert_eq!(reason.to_string(), *expected);
        }
    }

    #[test]
    fn node_rejection_reason_connection_failed() {
        let event = rejected(
            "unreachable",
            NodeRejectionReason::ConnectionFailed,
            "transport error: connection refused",
        );
        assert!(matches!(
            event,
            ClusterEvent::NodeRejected {
                reason: NodeRejectionReason::ConnectionFailed,
                ..
            }
        ));
    }

    #[test]
    fn node_rejected_equality() {
        let left = rejected("n1", NodeRejectionReason::IncompatibleProtocol, "test");
        let right = rejected("n1", NodeRejectionReason::IncompatibleProtocol, "test");
        assert_eq!(left, right);
    }
}