use std::sync::Arc;
use atomr_core::actor::Address;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use crate::member::{Member, MemberStatus};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum ClusterEvent {
MemberJoined(Member),
MemberWeaklyUp(Member),
MemberUp(Member),
MemberLeft(Member),
MemberExited(Member),
MemberDowned(Member),
MemberRemoved(Member, MemberStatus),
UnreachableMember(Member),
ReachableMember(Member),
LeaderChanged {
from: Option<Address>,
to: Option<Address>,
},
ClusterShuttingDown,
Convergence(bool),
}
impl ClusterEvent {
pub fn from_status_transition(member: Member, old: MemberStatus) -> Option<ClusterEvent> {
let new = member.status;
if old == new {
return None;
}
Some(match new {
MemberStatus::Joining => ClusterEvent::MemberJoined(member),
MemberStatus::WeaklyUp => ClusterEvent::MemberWeaklyUp(member),
MemberStatus::Up => ClusterEvent::MemberUp(member),
MemberStatus::Leaving => ClusterEvent::MemberLeft(member),
MemberStatus::Exiting => ClusterEvent::MemberExited(member),
MemberStatus::Down => ClusterEvent::MemberDowned(member),
MemberStatus::Removed => ClusterEvent::MemberRemoved(member, old),
})
}
}
type Subscriber = Arc<dyn Fn(&ClusterEvent) + Send + Sync + 'static>;
#[derive(Default, Clone)]
pub struct ClusterEventBus {
inner: Arc<RwLock<Vec<Subscriber>>>,
}
impl ClusterEventBus {
pub fn new() -> Self {
Self::default()
}
pub fn subscribe<F>(&self, callback: F) -> SubscriptionHandle
where
F: Fn(&ClusterEvent) + Send + Sync + 'static,
{
let cb: Subscriber = Arc::new(callback);
let mut subs = self.inner.write();
subs.push(cb.clone());
SubscriptionHandle {
bus: self.inner.clone(),
id: Arc::as_ptr(&cb) as *const () as usize,
anchor: cb,
}
}
pub fn publish(&self, event: ClusterEvent) {
let subs = self.inner.read().clone();
for s in &subs {
s(&event);
}
}
pub fn subscriber_count(&self) -> usize {
self.inner.read().len()
}
}
pub struct SubscriptionHandle {
bus: Arc<RwLock<Vec<Subscriber>>>,
id: usize,
anchor: Subscriber,
}
impl Drop for SubscriptionHandle {
fn drop(&mut self) {
let mut subs = self.bus.write();
subs.retain(|s| Arc::as_ptr(s) as *const () as usize != self.id);
let _ = &self.anchor;
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
#[test]
fn publish_delivers_to_subscribers() {
let bus = ClusterEventBus::new();
let n = Arc::new(AtomicU32::new(0));
let n2 = n.clone();
let _h = bus.subscribe(move |_| {
n2.fetch_add(1, Ordering::SeqCst);
});
bus.publish(ClusterEvent::ClusterShuttingDown);
bus.publish(ClusterEvent::Convergence(true));
assert_eq!(n.load(Ordering::SeqCst), 2);
}
#[test]
fn drop_unsubscribes() {
let bus = ClusterEventBus::new();
let n = Arc::new(AtomicU32::new(0));
let n2 = n.clone();
let h = bus.subscribe(move |_| {
n2.fetch_add(1, Ordering::SeqCst);
});
assert_eq!(bus.subscriber_count(), 1);
drop(h);
assert_eq!(bus.subscriber_count(), 0);
bus.publish(ClusterEvent::Convergence(false));
assert_eq!(n.load(Ordering::SeqCst), 0);
}
#[test]
fn leader_changed_carries_old_and_new() {
let bus = ClusterEventBus::new();
let captured = Arc::new(parking_lot::Mutex::new(None));
let c2 = captured.clone();
let _h = bus.subscribe(move |e| {
*c2.lock() = Some(e.clone());
});
bus.publish(ClusterEvent::LeaderChanged { from: None, to: Some(Address::local("a")) });
let got = captured.lock().clone();
assert!(matches!(got, Some(ClusterEvent::LeaderChanged { from: None, to: Some(_) })));
}
}