1use crate::errors::ClusterError;
2use crate::node::NodeId;
3
4#[derive(Debug, Clone, PartialEq, Eq)]
9#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
10#[non_exhaustive]
11pub enum ClusterEvent {
12 NodeJoined(NodeId),
14 NodeLeft(NodeId),
16 NodeRejected {
23 node_id: NodeId,
25 reason: NodeRejectionReason,
27 detail: String,
29 },
30}
31
/// Category carried by a `ClusterEvent::NodeRejected` event.
///
/// The enum is `#[non_exhaustive]`, so matches written outside this crate
/// need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum NodeRejectionReason {
    /// Displayed as "incompatible wire protocol".
    IncompatibleProtocol,
    /// Displayed as "incompatible adapter".
    IncompatibleAdapter,
    /// Displayed as "connection failed".
    ConnectionFailed,
}

impl std::fmt::Display for NodeRejectionReason {
    /// Writes a short lowercase description of the reason.
    ///
    /// Width and padding flags on the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match *self {
            Self::IncompatibleProtocol => "incompatible wire protocol",
            Self::IncompatibleAdapter => "incompatible adapter",
            Self::ConnectionFailed => "connection failed",
        };
        f.write_str(text)
    }
}
67
68impl From<crate::system_actors::RejectionReason> for NodeRejectionReason {
69 fn from(reason: crate::system_actors::RejectionReason) -> Self {
70 match reason {
71 crate::system_actors::RejectionReason::IncompatibleProtocol => {
72 NodeRejectionReason::IncompatibleProtocol
73 }
74 crate::system_actors::RejectionReason::IncompatibleAdapter => {
75 NodeRejectionReason::IncompatibleAdapter
76 }
77 }
78 }
79}
80
/// Handle identifying one registered cluster-event callback.
///
/// Wraps a raw `u64`; the inner value is only visible inside this crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SubscriptionId(pub(crate) u64);

impl SubscriptionId {
    /// Wraps a raw numeric id in a `SubscriptionId` without any validation.
    pub fn from_raw(id: u64) -> Self {
        SubscriptionId(id)
    }
}
95
96pub trait ClusterEvents: Send + Sync + 'static {
98 fn subscribe(
103 &self,
104 on_event: Box<dyn Fn(ClusterEvent) + Send + Sync>,
105 ) -> Result<SubscriptionId, ClusterError>;
106
107 fn unsubscribe(&self, id: SubscriptionId) -> Result<(), ClusterError>;
109}
110
111pub struct ClusterEventEmitter {
121 next_id: u64,
122 subscribers: std::collections::HashMap<SubscriptionId, Box<dyn Fn(ClusterEvent) + Send + Sync>>,
123}
124
125impl ClusterEventEmitter {
126 pub fn new() -> Self {
128 Self {
129 next_id: 1,
130 subscribers: std::collections::HashMap::new(),
131 }
132 }
133
134 pub fn subscribe(
136 &mut self,
137 on_event: Box<dyn Fn(ClusterEvent) + Send + Sync>,
138 ) -> SubscriptionId {
139 let id = SubscriptionId(self.next_id);
140 self.next_id += 1;
141 self.subscribers.insert(id, on_event);
142 id
143 }
144
145 pub fn unsubscribe(&mut self, id: SubscriptionId) {
147 self.subscribers.remove(&id);
148 }
149
150 pub fn emit(&self, event: ClusterEvent) {
152 for callback in self.subscribers.values() {
153 callback(event.clone());
154 }
155 }
156
157 pub fn subscriber_count(&self) -> usize {
159 self.subscribers.len()
160 }
161}
162
163impl Default for ClusterEventEmitter {
164 fn default() -> Self {
165 Self::new()
166 }
167}
168
169#[async_trait::async_trait]
179pub trait AdapterCluster: Send + Sync + 'static {
180 async fn connect(&self, node: &NodeId) -> Result<(), ClusterError>;
183
184 async fn disconnect(&self, node: &NodeId) -> Result<(), ClusterError>;
187
188 async fn reconnect(&self, node: &NodeId) -> Result<(), ClusterError> {
191 self.disconnect(node).await?;
192 self.connect(node).await
193 }
194
195 async fn is_reachable(&self, node: &NodeId) -> bool;
197
198 async fn connected_nodes(&self) -> Vec<NodeId>;
200}
201
/// Result of a single health check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthStatus {
    /// The check passed.
    Healthy,

    /// The check failed.
    Unhealthy {
        /// Text describing why the check failed.
        reason: String,
    },

    /// The check timed out.
    Timeout,
}
219
220#[async_trait::async_trait]
225pub trait HealthChecker: Send + Sync + 'static {
226 async fn check(&self, node: &NodeId) -> HealthStatus;
228}
229
230#[async_trait::async_trait]
233pub trait UnreachableHandler: Send + Sync + 'static {
234 async fn on_node_unreachable(&self, node: &NodeId);
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, Mutex};

    /// Builds a `NodeId` from a string literal.
    fn node(name: &'static str) -> NodeId {
        NodeId(name.into())
    }

    /// Builds a `NodeRejected` event from its three parts.
    fn rejected(name: &'static str, reason: NodeRejectionReason, detail: &str) -> ClusterEvent {
        ClusterEvent::NodeRejected {
            node_id: node(name),
            reason,
            detail: detail.into(),
        }
    }

    /// Subscribes a callback that adds `step` to a fresh counter per event.
    fn subscribe_counter(
        emitter: &mut ClusterEventEmitter,
        step: u64,
    ) -> (SubscriptionId, Arc<AtomicU64>) {
        let counter = Arc::new(AtomicU64::new(0));
        let sink = Arc::clone(&counter);
        let id = emitter.subscribe(Box::new(move |_event| {
            sink.fetch_add(step, Ordering::SeqCst);
        }));
        (id, counter)
    }

    /// Subscribes a callback that stores every event it receives.
    fn subscribe_recorder(emitter: &mut ClusterEventEmitter) -> Arc<Mutex<Vec<ClusterEvent>>> {
        let seen = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&seen);
        emitter.subscribe(Box::new(move |event| {
            sink.lock().unwrap().push(event);
        }));
        seen
    }

    #[test]
    fn cluster_event_emitter_subscribe_and_emit() {
        let mut emitter = ClusterEventEmitter::new();
        let (_id, hits) = subscribe_counter(&mut emitter, 1);

        assert_eq!(emitter.subscriber_count(), 1);

        emitter.emit(ClusterEvent::NodeJoined(node("n1")));
        emitter.emit(ClusterEvent::NodeLeft(node("n1")));

        assert_eq!(hits.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn cluster_event_emitter_unsubscribe() {
        let mut emitter = ClusterEventEmitter::new();
        let (id, hits) = subscribe_counter(&mut emitter, 1);

        emitter.emit(ClusterEvent::NodeJoined(node("n1")));
        assert_eq!(hits.load(Ordering::SeqCst), 1);

        emitter.unsubscribe(id);
        assert_eq!(emitter.subscriber_count(), 0);

        // With the subscription gone the counter must stay where it was.
        emitter.emit(ClusterEvent::NodeJoined(node("n2")));
        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn cluster_event_emitter_multiple_subscribers() {
        let mut emitter = ClusterEventEmitter::new();
        let (_, ones) = subscribe_counter(&mut emitter, 1);
        let (_, tens) = subscribe_counter(&mut emitter, 10);

        emitter.emit(ClusterEvent::NodeJoined(node("n1")));

        assert_eq!(ones.load(Ordering::SeqCst), 1);
        assert_eq!(tens.load(Ordering::SeqCst), 10);
    }

    #[test]
    fn cluster_event_emitter_captures_event_data() {
        let mut emitter = ClusterEventEmitter::new();
        let seen = subscribe_recorder(&mut emitter);

        emitter.emit(ClusterEvent::NodeJoined(node("alpha")));
        emitter.emit(ClusterEvent::NodeLeft(node("beta")));

        let events = seen.lock().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0], ClusterEvent::NodeJoined(node("alpha")));
        assert_eq!(events[1], ClusterEvent::NodeLeft(node("beta")));
    }

    #[test]
    fn health_status_variants() {
        assert_eq!(HealthStatus::Healthy, HealthStatus::Healthy);

        let unhealthy = HealthStatus::Unhealthy {
            reason: "connection refused".into(),
        };
        assert!(matches!(unhealthy, HealthStatus::Unhealthy { .. }));

        assert_eq!(HealthStatus::Timeout, HealthStatus::Timeout);
    }

    #[test]
    fn subscription_id_from_raw() {
        assert_eq!(SubscriptionId::from_raw(42), SubscriptionId(42));
    }

    #[test]
    fn node_rejected_event_construction() {
        let event = rejected(
            "bad-node",
            NodeRejectionReason::IncompatibleProtocol,
            "wire 1.0 vs 0.2",
        );

        if let ClusterEvent::NodeRejected {
            node_id,
            reason,
            detail,
        } = &event
        {
            assert_eq!(node_id, &node("bad-node"));
            assert_eq!(*reason, NodeRejectionReason::IncompatibleProtocol);
            assert!(detail.contains("1.0"));
        } else {
            panic!("expected NodeRejected");
        }
    }

    #[test]
    fn node_rejected_emitted_to_subscribers() {
        let mut emitter = ClusterEventEmitter::new();
        let seen = subscribe_recorder(&mut emitter);

        emitter.emit(rejected(
            "rejected-node",
            NodeRejectionReason::IncompatibleAdapter,
            "kameo vs ractor",
        ));

        let events = seen.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            ClusterEvent::NodeRejected {
                reason: NodeRejectionReason::IncompatibleAdapter,
                ..
            }
        ));
    }

    #[test]
    fn node_rejection_reason_from_handshake_rejection() {
        use crate::system_actors::RejectionReason;

        assert_eq!(
            NodeRejectionReason::from(RejectionReason::IncompatibleProtocol),
            NodeRejectionReason::IncompatibleProtocol
        );
        assert_eq!(
            NodeRejectionReason::from(RejectionReason::IncompatibleAdapter),
            NodeRejectionReason::IncompatibleAdapter
        );
    }

    #[test]
    fn node_rejection_reason_display() {
        let expected = [
            (
                NodeRejectionReason::IncompatibleProtocol,
                "incompatible wire protocol",
            ),
            (
                NodeRejectionReason::IncompatibleAdapter,
                "incompatible adapter",
            ),
            (NodeRejectionReason::ConnectionFailed, "connection failed"),
        ];

        for &(reason, text) in &expected {
            assert_eq!(reason.to_string(), text);
        }
    }

    #[test]
    fn node_rejection_reason_connection_failed() {
        let event = rejected(
            "unreachable",
            NodeRejectionReason::ConnectionFailed,
            "transport error: connection refused",
        );
        assert!(matches!(
            event,
            ClusterEvent::NodeRejected {
                reason: NodeRejectionReason::ConnectionFailed,
                ..
            }
        ));
    }

    #[test]
    fn node_rejected_equality() {
        let build = || rejected("n1", NodeRejectionReason::IncompatibleProtocol, "test");
        let a = build();
        let b = build();
        assert_eq!(a, b);
    }
}