use crate::errors::ClusterError;
use crate::node::NodeId;
/// Notification payload handed to cluster event subscribers.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum ClusterEvent {
/// The node identified by the payload joined.
NodeJoined(NodeId),
/// The node identified by the payload left.
NodeLeft(NodeId),
/// A node was rejected.
NodeRejected {
/// Identifier of the rejected node.
node_id: NodeId,
/// Machine-readable category of the rejection.
reason: NodeRejectionReason,
/// Free-form text accompanying `reason`; the tests in this file use
/// strings such as "wire 1.0 vs 0.2".
detail: String,
},
}
/// Category attached to [`ClusterEvent::NodeRejected`].
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum NodeRejectionReason {
    /// Displayed as `incompatible wire protocol`.
    IncompatibleProtocol,
    /// Displayed as `incompatible adapter`.
    IncompatibleAdapter,
    /// Displayed as `connection failed`.
    ConnectionFailed,
}

impl std::fmt::Display for NodeRejectionReason {
    /// Writes the fixed, lowercase description of the reason.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the text first, then emit it with a single write.
        let text = match *self {
            Self::IncompatibleProtocol => "incompatible wire protocol",
            Self::IncompatibleAdapter => "incompatible adapter",
            Self::ConnectionFailed => "connection failed",
        };
        f.write_str(text)
    }
}
impl From<crate::system_actors::RejectionReason> for NodeRejectionReason {
    /// Converts a `system_actors::RejectionReason` into the variant of the
    /// same name on this enum.
    fn from(reason: crate::system_actors::RejectionReason) -> Self {
        match reason {
            crate::system_actors::RejectionReason::IncompatibleProtocol => Self::IncompatibleProtocol,
            crate::system_actors::RejectionReason::IncompatibleAdapter => Self::IncompatibleAdapter,
        }
    }
}
/// Handle returned when subscribing to cluster events and passed back to
/// unsubscribe.
///
/// Wraps a raw `u64`; the field is only visible inside this crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SubscriptionId(pub(crate) u64);

impl SubscriptionId {
    /// Wraps the raw value `id` in a `SubscriptionId`.
    pub fn from_raw(id: u64) -> Self {
        SubscriptionId(id)
    }
}
/// Subscription interface for [`ClusterEvent`] notifications.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait ClusterEvents: Send + Sync + 'static {
/// Registers `on_event`, a boxed callback that takes a [`ClusterEvent`] by
/// value.
///
/// Returns the [`SubscriptionId`] for the registration, or a `ClusterError`;
/// the failure conditions are defined by the implementor.
fn subscribe(
&self,
on_event: Box<dyn Fn(ClusterEvent) + Send + Sync>,
) -> Result<SubscriptionId, ClusterError>;
/// Cancels the subscription identified by `id`.
///
/// Returns a `ClusterError` on failure; the failure conditions are defined
/// by the implementor.
fn unsubscribe(&self, id: SubscriptionId) -> Result<(), ClusterError>;
}
/// In-memory registry of [`ClusterEvent`] callbacks.
///
/// Callbacks are stored in a `BTreeMap` keyed by the raw subscription id.
/// Ids are issued in increasing order, so iterating the map visits callbacks
/// in the order they were subscribed, which makes delivery order
/// deterministic.
pub struct ClusterEventEmitter {
    /// Raw id that the next call to `subscribe` will hand out.
    next_id: u64,
    /// Registered callbacks, keyed by raw subscription id.
    subscribers: std::collections::BTreeMap<u64, Box<dyn Fn(ClusterEvent) + Send + Sync>>,
}

impl ClusterEventEmitter {
    /// Creates an emitter with no subscribers. The first id issued is `1`.
    pub fn new() -> Self {
        Self {
            next_id: 1,
            subscribers: std::collections::BTreeMap::new(),
        }
    }

    /// Registers `on_event` and returns the id that identifies it.
    ///
    /// Each call consumes a fresh id; ids are never reused by this emitter.
    pub fn subscribe(
        &mut self,
        on_event: Box<dyn Fn(ClusterEvent) + Send + Sync>,
    ) -> SubscriptionId {
        let id = SubscriptionId(self.next_id);
        self.next_id += 1;
        self.subscribers.insert(id.0, on_event);
        id
    }

    /// Removes the callback registered under `id`.
    ///
    /// Unknown ids are ignored.
    pub fn unsubscribe(&mut self, id: SubscriptionId) {
        self.subscribers.remove(&id.0);
    }

    /// Calls every registered callback with its own clone of `event`.
    ///
    /// Callbacks run synchronously on the calling thread, in the order in
    /// which they were subscribed.
    pub fn emit(&self, event: ClusterEvent) {
        // `BTreeMap::values` iterates in ascending key order, i.e. oldest
        // subscription first.
        for callback in self.subscribers.values() {
            callback(event.clone());
        }
    }

    /// Returns the number of currently registered callbacks.
    pub fn subscriber_count(&self) -> usize {
        self.subscribers.len()
    }
}

impl Default for ClusterEventEmitter {
    /// Equivalent to [`ClusterEventEmitter::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Asynchronous connection management towards other nodes.
///
/// Implementors must be `Send + Sync + 'static`.
#[async_trait::async_trait]
pub trait AdapterCluster: Send + Sync + 'static {
    /// Connects to `node`.
    async fn connect(&self, node: &NodeId) -> Result<(), ClusterError>;

    /// Disconnects from `node`.
    async fn disconnect(&self, node: &NodeId) -> Result<(), ClusterError>;

    /// Disconnects from `node` and then connects to it again.
    ///
    /// The default implementation returns the `disconnect` error unchanged
    /// and does not attempt `connect` when disconnecting fails.
    async fn reconnect(&self, node: &NodeId) -> Result<(), ClusterError> {
        if let Err(err) = self.disconnect(node).await {
            return Err(err);
        }
        self.connect(node).await
    }

    /// Reports whether `node` is reachable, as judged by the implementor.
    async fn is_reachable(&self, node: &NodeId) -> bool;

    /// Returns the nodes the implementor reports as connected.
    async fn connected_nodes(&self) -> Vec<NodeId>;
}
/// Result type returned by [`HealthChecker::check`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthStatus {
/// The check reported the node as healthy.
Healthy,
/// The check reported the node as unhealthy.
Unhealthy {
/// Free-form explanation supplied by the checker; the tests in this file
/// use "connection refused".
reason: String,
},
/// Presumably the check did not complete in time; the exact condition is
/// decided by `HealthChecker` implementors, which are not in this file.
Timeout,
}
/// Asynchronous health probe for a single node.
///
/// Implementors must be `Send + Sync + 'static`.
#[async_trait::async_trait]
pub trait HealthChecker: Send + Sync + 'static {
/// Checks `node` and reports the outcome as a [`HealthStatus`].
async fn check(&self, node: &NodeId) -> HealthStatus;
}
/// Asynchronous hook that receives a node identifier.
///
/// Implementors must be `Send + Sync + 'static`.
#[async_trait::async_trait]
pub trait UnreachableHandler: Send + Sync + 'static {
/// Called with the affected `node`. Presumably invoked once a node is
/// considered unreachable; the caller and its criteria are outside this file.
async fn on_node_unreachable(&self, node: &NodeId);
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, Mutex};

    /// Builds a `NodeId` from a string literal.
    macro_rules! node {
        ($name:expr) => {
            NodeId($name.into())
        };
    }

    /// Boxes a callback that adds `step` to `counter` on every invocation.
    fn bump_on_event<E: 'static>(
        counter: &Arc<AtomicU64>,
        step: u64,
    ) -> Box<dyn Fn(E) + Send + Sync> {
        let counter = Arc::clone(counter);
        Box::new(move |_event| {
            counter.fetch_add(step, Ordering::SeqCst);
        })
    }

    /// Boxes a callback that appends every received value to `log`.
    fn record_into<E: Send + 'static>(log: &Arc<Mutex<Vec<E>>>) -> Box<dyn Fn(E) + Send + Sync> {
        let log = Arc::clone(log);
        Box::new(move |event| {
            log.lock().unwrap().push(event);
        })
    }

    /// A single subscriber is invoked once per emitted event.
    #[test]
    fn cluster_event_emitter_subscribe_and_emit() {
        let hits = Arc::new(AtomicU64::new(0));
        let mut emitter = ClusterEventEmitter::new();

        let _subscription = emitter.subscribe(bump_on_event(&hits, 1));
        assert_eq!(emitter.subscriber_count(), 1);

        emitter.emit(ClusterEvent::NodeJoined(node!("n1")));
        emitter.emit(ClusterEvent::NodeLeft(node!("n1")));
        assert_eq!(hits.load(Ordering::SeqCst), 2);
    }

    /// After unsubscribing, the callback no longer receives events.
    #[test]
    fn cluster_event_emitter_unsubscribe() {
        let hits = Arc::new(AtomicU64::new(0));
        let mut emitter = ClusterEventEmitter::new();
        let subscription = emitter.subscribe(bump_on_event(&hits, 1));

        emitter.emit(ClusterEvent::NodeJoined(node!("n1")));
        assert_eq!(hits.load(Ordering::SeqCst), 1);

        emitter.unsubscribe(subscription);
        assert_eq!(emitter.subscriber_count(), 0);

        emitter.emit(ClusterEvent::NodeJoined(node!("n2")));
        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    /// Every registered subscriber sees each event.
    #[test]
    fn cluster_event_emitter_multiple_subscribers() {
        let by_one = Arc::new(AtomicU64::new(0));
        let by_ten = Arc::new(AtomicU64::new(0));
        let mut emitter = ClusterEventEmitter::new();
        emitter.subscribe(bump_on_event(&by_one, 1));
        emitter.subscribe(bump_on_event(&by_ten, 10));

        emitter.emit(ClusterEvent::NodeJoined(node!("n1")));

        assert_eq!(by_one.load(Ordering::SeqCst), 1);
        assert_eq!(by_ten.load(Ordering::SeqCst), 10);
    }

    /// The callback receives the emitted events themselves, in emission order.
    #[test]
    fn cluster_event_emitter_captures_event_data() {
        let seen = Arc::new(Mutex::new(Vec::new()));
        let mut emitter = ClusterEventEmitter::new();
        emitter.subscribe(record_into(&seen));

        emitter.emit(ClusterEvent::NodeJoined(node!("alpha")));
        emitter.emit(ClusterEvent::NodeLeft(node!("beta")));

        let seen = seen.lock().unwrap();
        assert_eq!(
            *seen,
            vec![
                ClusterEvent::NodeJoined(node!("alpha")),
                ClusterEvent::NodeLeft(node!("beta")),
            ]
        );
    }

    /// Each `HealthStatus` variant can be built and compared.
    #[test]
    fn health_status_variants() {
        assert_eq!(HealthStatus::Healthy, HealthStatus::Healthy);

        let unhealthy = HealthStatus::Unhealthy {
            reason: "connection refused".into(),
        };
        assert!(matches!(unhealthy, HealthStatus::Unhealthy { .. }));

        assert_eq!(HealthStatus::Timeout, HealthStatus::Timeout);
    }

    /// `from_raw` wraps the given value unchanged.
    #[test]
    fn subscription_id_from_raw() {
        assert_eq!(SubscriptionId::from_raw(42), SubscriptionId(42));
    }

    /// All fields of `NodeRejected` round-trip through construction.
    #[test]
    fn node_rejected_event_construction() {
        let event = ClusterEvent::NodeRejected {
            node_id: node!("bad-node"),
            reason: NodeRejectionReason::IncompatibleProtocol,
            detail: "wire 1.0 vs 0.2".into(),
        };

        if let ClusterEvent::NodeRejected {
            node_id,
            reason,
            detail,
        } = &event
        {
            assert_eq!(node_id, &node!("bad-node"));
            assert_eq!(*reason, NodeRejectionReason::IncompatibleProtocol);
            assert!(detail.contains("1.0"));
        } else {
            panic!("expected NodeRejected");
        }
    }

    /// `NodeRejected` events reach subscribers like any other event.
    #[test]
    fn node_rejected_emitted_to_subscribers() {
        let seen = Arc::new(Mutex::new(Vec::new()));
        let mut emitter = ClusterEventEmitter::new();
        emitter.subscribe(record_into(&seen));

        emitter.emit(ClusterEvent::NodeRejected {
            node_id: node!("rejected-node"),
            reason: NodeRejectionReason::IncompatibleAdapter,
            detail: "kameo vs ractor".into(),
        });

        let seen = seen.lock().unwrap();
        assert_eq!(seen.len(), 1);
        assert!(matches!(
            &seen[0],
            ClusterEvent::NodeRejected {
                reason: NodeRejectionReason::IncompatibleAdapter,
                ..
            }
        ));
    }

    /// `system_actors::RejectionReason` converts to the same-named variant.
    #[test]
    fn node_rejection_reason_from_handshake_rejection() {
        use crate::system_actors::RejectionReason;

        assert_eq!(
            NodeRejectionReason::from(RejectionReason::IncompatibleProtocol),
            NodeRejectionReason::IncompatibleProtocol
        );
        assert_eq!(
            NodeRejectionReason::from(RejectionReason::IncompatibleAdapter),
            NodeRejectionReason::IncompatibleAdapter
        );
    }

    /// `Display` output for every rejection reason.
    #[test]
    fn node_rejection_reason_display() {
        let expectations = [
            (
                NodeRejectionReason::IncompatibleProtocol,
                "incompatible wire protocol",
            ),
            (
                NodeRejectionReason::IncompatibleAdapter,
                "incompatible adapter",
            ),
            (NodeRejectionReason::ConnectionFailed, "connection failed"),
        ];

        for (reason, text) in &expectations {
            assert_eq!(reason.to_string(), *text);
        }
    }

    /// `ConnectionFailed` can be carried by a `NodeRejected` event.
    #[test]
    fn node_rejection_reason_connection_failed() {
        let event = ClusterEvent::NodeRejected {
            node_id: node!("unreachable"),
            reason: NodeRejectionReason::ConnectionFailed,
            detail: "transport error: connection refused".into(),
        };

        assert!(matches!(
            event,
            ClusterEvent::NodeRejected {
                reason: NodeRejectionReason::ConnectionFailed,
                ..
            }
        ));
    }

    /// Two `NodeRejected` events with equal fields compare equal.
    #[test]
    fn node_rejected_equality() {
        let build = || ClusterEvent::NodeRejected {
            node_id: node!("n1"),
            reason: NodeRejectionReason::IncompatibleProtocol,
            detail: "test".into(),
        };

        assert_eq!(build(), build());
    }
}