1use std::sync::Arc;
15
16use atomr_core::actor::Address;
17use parking_lot::RwLock;
18use serde::{Deserialize, Serialize};
19
20use crate::member::{Member, MemberStatus};
21
/// Events describing changes in cluster membership and cluster-wide state.
///
/// The membership variants are produced by
/// [`ClusterEvent::from_status_transition`]; the remaining variants are not
/// constructed in this file and are presumably emitted by other parts of the
/// crate (TODO confirm against the publishers).
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum ClusterEvent {
    /// The member's status changed to `Joining`.
    MemberJoined(Member),
    /// The member's status changed to `WeaklyUp`.
    MemberWeaklyUp(Member),
    /// The member's status changed to `Up`.
    MemberUp(Member),
    /// The member's status changed to `Leaving`.
    MemberLeft(Member),
    /// The member's status changed to `Exiting`.
    MemberExited(Member),
    /// The member's status changed to `Removed`; the second field is the
    /// status the member had before the transition.
    MemberRemoved(Member, MemberStatus),
    /// The member is considered unreachable. `from_status_transition` also
    /// emits this variant when a member's status changes to `Down`.
    UnreachableMember(Member),
    /// Counterpart of `UnreachableMember`; presumably signals that the member
    /// is reachable again (not constructed in this file — TODO confirm).
    ReachableMember(Member),
    /// The cluster leader changed. Judging by the field names, `from` is the
    /// previous leader and `to` the new one; either may be `None`.
    LeaderChanged { from: Option<Address>, to: Option<Address> },
    /// Presumably signals that the cluster is shutting down (not constructed
    /// in this file — TODO confirm).
    ClusterShuttingDown,
    /// Carries a convergence flag; presumably `true` when the cluster view
    /// has converged (TODO confirm against the publisher).
    Convergence(bool),
}
38
39impl ClusterEvent {
40 pub fn from_status_transition(member: Member, old: MemberStatus) -> Option<ClusterEvent> {
45 let new = member.status;
46 if old == new {
47 return None;
48 }
49 Some(match new {
50 MemberStatus::Joining => ClusterEvent::MemberJoined(member),
51 MemberStatus::WeaklyUp => ClusterEvent::MemberWeaklyUp(member),
52 MemberStatus::Up => ClusterEvent::MemberUp(member),
53 MemberStatus::Leaving => ClusterEvent::MemberLeft(member),
54 MemberStatus::Exiting => ClusterEvent::MemberExited(member),
55 MemberStatus::Down => ClusterEvent::UnreachableMember(member),
56 MemberStatus::Removed => ClusterEvent::MemberRemoved(member, old),
57 })
58 }
59}
60
/// A registered callback: shared via `Arc`, callable from any thread, and
/// invoked with a reference to each published [`ClusterEvent`].
type Subscriber = Arc<dyn Fn(&ClusterEvent) + Send + Sync + 'static>;
62
/// In-process publish/subscribe hub for [`ClusterEvent`]s.
///
/// Cloning the bus is cheap and shares state: `inner` is an `Arc`, so every
/// clone publishes to, and registers on, the same subscriber list.
#[derive(Default, Clone)]
pub struct ClusterEventBus {
    /// Registered callbacks in subscription order, guarded by a read/write lock.
    inner: Arc<RwLock<Vec<Subscriber>>>,
}
68
69impl ClusterEventBus {
70 pub fn new() -> Self {
71 Self::default()
72 }
73
74 pub fn subscribe<F>(&self, callback: F) -> SubscriptionHandle
77 where
78 F: Fn(&ClusterEvent) + Send + Sync + 'static,
79 {
80 let cb: Subscriber = Arc::new(callback);
81 let mut subs = self.inner.write();
82 subs.push(cb.clone());
83 SubscriptionHandle {
84 bus: self.inner.clone(),
85 id: Arc::as_ptr(&cb) as *const () as usize,
87 anchor: cb,
88 }
89 }
90
91 pub fn publish(&self, event: ClusterEvent) {
94 let subs = self.inner.read().clone();
95 for s in &subs {
96 s(&event);
97 }
98 }
99
100 pub fn subscriber_count(&self) -> usize {
102 self.inner.read().len()
103 }
104}
105
106pub struct SubscriptionHandle {
109 bus: Arc<RwLock<Vec<Subscriber>>>,
110 id: usize,
111 anchor: Subscriber,
113}
114
115impl Drop for SubscriptionHandle {
116 fn drop(&mut self) {
117 let mut subs = self.bus.write();
118 subs.retain(|s| Arc::as_ptr(s) as *const () as usize != self.id);
119 let _ = &self.anchor;
121 }
122}
123
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// A live subscriber is invoked once per published event.
    #[test]
    fn publish_delivers_to_subscribers() {
        let hits = Arc::new(AtomicU32::new(0));
        let hits_in_callback = Arc::clone(&hits);
        let bus = ClusterEventBus::new();
        let _subscription = bus.subscribe(move |_| {
            hits_in_callback.fetch_add(1, Ordering::SeqCst);
        });

        bus.publish(ClusterEvent::ClusterShuttingDown);
        bus.publish(ClusterEvent::Convergence(true));

        assert_eq!(hits.load(Ordering::SeqCst), 2);
    }

    /// Dropping the handle removes the subscriber, so later events are not
    /// delivered to it.
    #[test]
    fn drop_unsubscribes() {
        let hits = Arc::new(AtomicU32::new(0));
        let hits_in_callback = Arc::clone(&hits);
        let bus = ClusterEventBus::new();
        let subscription = bus.subscribe(move |_| {
            hits_in_callback.fetch_add(1, Ordering::SeqCst);
        });
        assert_eq!(bus.subscriber_count(), 1);

        drop(subscription);
        assert_eq!(bus.subscriber_count(), 0);

        bus.publish(ClusterEvent::Convergence(false));
        assert_eq!(hits.load(Ordering::SeqCst), 0);
    }

    /// A `LeaderChanged` event reaches the subscriber with both of its
    /// fields intact.
    #[test]
    fn leader_changed_carries_old_and_new() {
        let last_seen = Arc::new(parking_lot::Mutex::new(None));
        let sink = Arc::clone(&last_seen);
        let bus = ClusterEventBus::new();
        let _subscription = bus.subscribe(move |event| {
            *sink.lock() = Some(event.clone());
        });

        bus.publish(ClusterEvent::LeaderChanged { from: None, to: Some(Address::local("a")) });

        let observed = last_seen.lock().clone();
        assert!(matches!(
            observed,
            Some(ClusterEvent::LeaderChanged { from: None, to: Some(_) })
        ));
    }
}