1use std::sync::Arc;
16
17use atomr_core::actor::Address;
18use parking_lot::RwLock;
19use serde::{Deserialize, Serialize};
20
21use crate::member::{Member, MemberStatus};
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
25#[non_exhaustive]
26pub enum ClusterEvent {
27 MemberJoined(Member),
28 MemberUp(Member),
29 MemberLeft(Member),
30 MemberExited(Member),
31 MemberRemoved(Member, MemberStatus),
32 UnreachableMember(Member),
33 ReachableMember(Member),
34 LeaderChanged { from: Option<Address>, to: Option<Address> },
35 ClusterShuttingDown,
36 Convergence(bool),
37}
38
39type Subscriber = Arc<dyn Fn(&ClusterEvent) + Send + Sync + 'static>;
40
41#[derive(Default, Clone)]
43pub struct ClusterEventBus {
44 inner: Arc<RwLock<Vec<Subscriber>>>,
45}
46
47impl ClusterEventBus {
48 pub fn new() -> Self {
49 Self::default()
50 }
51
52 pub fn subscribe<F>(&self, callback: F) -> SubscriptionHandle
55 where
56 F: Fn(&ClusterEvent) + Send + Sync + 'static,
57 {
58 let cb: Subscriber = Arc::new(callback);
59 let mut subs = self.inner.write();
60 subs.push(cb.clone());
61 SubscriptionHandle {
62 bus: self.inner.clone(),
63 id: Arc::as_ptr(&cb) as *const () as usize,
65 anchor: cb,
66 }
67 }
68
69 pub fn publish(&self, event: ClusterEvent) {
72 let subs = self.inner.read().clone();
73 for s in &subs {
74 s(&event);
75 }
76 }
77
78 pub fn subscriber_count(&self) -> usize {
80 self.inner.read().len()
81 }
82}
83
84pub struct SubscriptionHandle {
87 bus: Arc<RwLock<Vec<Subscriber>>>,
88 id: usize,
89 anchor: Subscriber,
91}
92
93impl Drop for SubscriptionHandle {
94 fn drop(&mut self) {
95 let mut subs = self.bus.write();
96 subs.retain(|s| Arc::as_ptr(s) as *const () as usize != self.id);
97 let _ = &self.anchor;
99 }
100}
101
102#[cfg(test)]
103mod tests {
104 use super::*;
105 use std::sync::atomic::{AtomicU32, Ordering};
106
107 #[test]
108 fn publish_delivers_to_subscribers() {
109 let bus = ClusterEventBus::new();
110 let n = Arc::new(AtomicU32::new(0));
111 let n2 = n.clone();
112 let _h = bus.subscribe(move |_| {
113 n2.fetch_add(1, Ordering::SeqCst);
114 });
115 bus.publish(ClusterEvent::ClusterShuttingDown);
116 bus.publish(ClusterEvent::Convergence(true));
117 assert_eq!(n.load(Ordering::SeqCst), 2);
118 }
119
120 #[test]
121 fn drop_unsubscribes() {
122 let bus = ClusterEventBus::new();
123 let n = Arc::new(AtomicU32::new(0));
124 let n2 = n.clone();
125 let h = bus.subscribe(move |_| {
126 n2.fetch_add(1, Ordering::SeqCst);
127 });
128 assert_eq!(bus.subscriber_count(), 1);
129 drop(h);
130 assert_eq!(bus.subscriber_count(), 0);
131 bus.publish(ClusterEvent::Convergence(false));
132 assert_eq!(n.load(Ordering::SeqCst), 0);
133 }
134
135 #[test]
136 fn leader_changed_carries_old_and_new() {
137 let bus = ClusterEventBus::new();
138 let captured = Arc::new(parking_lot::Mutex::new(None));
139 let c2 = captured.clone();
140 let _h = bus.subscribe(move |e| {
141 *c2.lock() = Some(e.clone());
142 });
143 bus.publish(ClusterEvent::LeaderChanged { from: None, to: Some(Address::local("a")) });
144 let got = captured.lock().clone();
145 assert!(matches!(got, Some(ClusterEvent::LeaderChanged { from: None, to: Some(_) })));
146 }
147}