1use std::sync::Arc;
15
16use atomr_core::actor::Address;
17use parking_lot::RwLock;
18use serde::{Deserialize, Serialize};
19
20use crate::member::{Member, MemberStatus};
21
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
24#[non_exhaustive]
25pub enum ClusterEvent {
26 MemberJoined(Member),
27 MemberWeaklyUp(Member),
28 MemberUp(Member),
29 MemberLeft(Member),
30 MemberExited(Member),
31 MemberDowned(Member),
36 MemberRemoved(Member, MemberStatus),
37 UnreachableMember(Member),
38 ReachableMember(Member),
39 LeaderChanged {
40 from: Option<Address>,
41 to: Option<Address>,
42 },
43 ClusterShuttingDown,
44 Convergence(bool),
45}
46
47impl ClusterEvent {
48 pub fn from_status_transition(member: Member, old: MemberStatus) -> Option<ClusterEvent> {
53 let new = member.status;
54 if old == new {
55 return None;
56 }
57 Some(match new {
58 MemberStatus::Joining => ClusterEvent::MemberJoined(member),
59 MemberStatus::WeaklyUp => ClusterEvent::MemberWeaklyUp(member),
60 MemberStatus::Up => ClusterEvent::MemberUp(member),
61 MemberStatus::Leaving => ClusterEvent::MemberLeft(member),
62 MemberStatus::Exiting => ClusterEvent::MemberExited(member),
63 MemberStatus::Down => ClusterEvent::MemberDowned(member),
64 MemberStatus::Removed => ClusterEvent::MemberRemoved(member, old),
65 })
66 }
67}
68
69type Subscriber = Arc<dyn Fn(&ClusterEvent) + Send + Sync + 'static>;
70
71#[derive(Default, Clone)]
73pub struct ClusterEventBus {
74 inner: Arc<RwLock<Vec<Subscriber>>>,
75}
76
77impl ClusterEventBus {
78 pub fn new() -> Self {
79 Self::default()
80 }
81
82 pub fn subscribe<F>(&self, callback: F) -> SubscriptionHandle
85 where
86 F: Fn(&ClusterEvent) + Send + Sync + 'static,
87 {
88 let cb: Subscriber = Arc::new(callback);
89 let mut subs = self.inner.write();
90 subs.push(cb.clone());
91 SubscriptionHandle {
92 bus: self.inner.clone(),
93 id: Arc::as_ptr(&cb) as *const () as usize,
95 anchor: cb,
96 }
97 }
98
99 pub fn publish(&self, event: ClusterEvent) {
102 let subs = self.inner.read().clone();
103 for s in &subs {
104 s(&event);
105 }
106 }
107
108 pub fn subscriber_count(&self) -> usize {
110 self.inner.read().len()
111 }
112}
113
114pub struct SubscriptionHandle {
117 bus: Arc<RwLock<Vec<Subscriber>>>,
118 id: usize,
119 anchor: Subscriber,
121}
122
123impl Drop for SubscriptionHandle {
124 fn drop(&mut self) {
125 let mut subs = self.bus.write();
126 subs.retain(|s| Arc::as_ptr(s) as *const () as usize != self.id);
127 let _ = &self.anchor;
129 }
130}
131
132#[cfg(test)]
133mod tests {
134 use super::*;
135 use std::sync::atomic::{AtomicU32, Ordering};
136
137 #[test]
138 fn publish_delivers_to_subscribers() {
139 let bus = ClusterEventBus::new();
140 let n = Arc::new(AtomicU32::new(0));
141 let n2 = n.clone();
142 let _h = bus.subscribe(move |_| {
143 n2.fetch_add(1, Ordering::SeqCst);
144 });
145 bus.publish(ClusterEvent::ClusterShuttingDown);
146 bus.publish(ClusterEvent::Convergence(true));
147 assert_eq!(n.load(Ordering::SeqCst), 2);
148 }
149
150 #[test]
151 fn drop_unsubscribes() {
152 let bus = ClusterEventBus::new();
153 let n = Arc::new(AtomicU32::new(0));
154 let n2 = n.clone();
155 let h = bus.subscribe(move |_| {
156 n2.fetch_add(1, Ordering::SeqCst);
157 });
158 assert_eq!(bus.subscriber_count(), 1);
159 drop(h);
160 assert_eq!(bus.subscriber_count(), 0);
161 bus.publish(ClusterEvent::Convergence(false));
162 assert_eq!(n.load(Ordering::SeqCst), 0);
163 }
164
165 #[test]
166 fn leader_changed_carries_old_and_new() {
167 let bus = ClusterEventBus::new();
168 let captured = Arc::new(parking_lot::Mutex::new(None));
169 let c2 = captured.clone();
170 let _h = bus.subscribe(move |e| {
171 *c2.lock() = Some(e.clone());
172 });
173 bus.publish(ClusterEvent::LeaderChanged { from: None, to: Some(Address::local("a")) });
174 let got = captured.lock().clone();
175 assert!(matches!(got, Some(ClusterEvent::LeaderChanged { from: None, to: Some(_) })));
176 }
177}