Skip to main content

atomr_cluster/
events.rs

1//! Cluster events bus.
2//!
3//! Events are published when membership transitions, leader changes, or
4//! reachability flips. Subscribers register a single callback that is
5//! invoked with every [`ClusterEvent`] (matching on the variants they care
6//! about) and receive each event in publish order.
7//!
8//! The bus is a thin `RwLock<Vec<callback>>` rather than an actor
9//! because subscribers are typically a handful of long-lived objects
10//! (telemetry probes, sharding region, pubsub mediator) and the
11//! contention model is "rare write, rare read." Phase 13 may move it
12//! behind a real actor if profiling justifies it.
13
14use std::sync::Arc;
15
16use atomr_core::actor::Address;
17use parking_lot::RwLock;
18use serde::{Deserialize, Serialize};
19
20use crate::member::{Member, MemberStatus};
21
22/// Event variants published on [`ClusterEventBus`].
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
24#[non_exhaustive]
25pub enum ClusterEvent {
26    MemberJoined(Member),
27    MemberWeaklyUp(Member),
28    MemberUp(Member),
29    MemberLeft(Member),
30    MemberExited(Member),
31    /// Emitted when a member transitions to `Down` via an explicit
32    /// `DaemonCmd::Down` (the operator-initiated downing path). The
33    /// member is still present in `MembershipState`; the next leader
34    /// tick promotes it to `Removed` and emits `MemberRemoved`.
35    MemberDowned(Member),
36    MemberRemoved(Member, MemberStatus),
37    UnreachableMember(Member),
38    ReachableMember(Member),
39    LeaderChanged {
40        from: Option<Address>,
41        to: Option<Address>,
42    },
43    ClusterShuttingDown,
44    Convergence(bool),
45}
46
47impl ClusterEvent {
48    /// Translate a status transition `(old, new)` into the
49    /// corresponding `ClusterEvent`, mirroring
50    /// `ClusterCoreDaemon` event-emission rules. Returns `None` when
51    /// the transition is a no-op (`old == new`).
52    pub fn from_status_transition(member: Member, old: MemberStatus) -> Option<ClusterEvent> {
53        let new = member.status;
54        if old == new {
55            return None;
56        }
57        Some(match new {
58            MemberStatus::Joining => ClusterEvent::MemberJoined(member),
59            MemberStatus::WeaklyUp => ClusterEvent::MemberWeaklyUp(member),
60            MemberStatus::Up => ClusterEvent::MemberUp(member),
61            MemberStatus::Leaving => ClusterEvent::MemberLeft(member),
62            MemberStatus::Exiting => ClusterEvent::MemberExited(member),
63            MemberStatus::Down => ClusterEvent::MemberDowned(member),
64            MemberStatus::Removed => ClusterEvent::MemberRemoved(member, old),
65        })
66    }
67}
68
69type Subscriber = Arc<dyn Fn(&ClusterEvent) + Send + Sync + 'static>;
70
71/// In-process cluster events bus.
72#[derive(Default, Clone)]
73pub struct ClusterEventBus {
74    inner: Arc<RwLock<Vec<Subscriber>>>,
75}
76
77impl ClusterEventBus {
78    pub fn new() -> Self {
79        Self::default()
80    }
81
82    /// Register a subscriber that fires on every event. Returns a
83    /// handle whose `Drop` removes the subscription.
84    pub fn subscribe<F>(&self, callback: F) -> SubscriptionHandle
85    where
86        F: Fn(&ClusterEvent) + Send + Sync + 'static,
87    {
88        let cb: Subscriber = Arc::new(callback);
89        let mut subs = self.inner.write();
90        subs.push(cb.clone());
91        SubscriptionHandle {
92            bus: self.inner.clone(),
93            // Use the Arc pointer identity to find this subscription on drop.
94            id: Arc::as_ptr(&cb) as *const () as usize,
95            anchor: cb,
96        }
97    }
98
99    /// Publish an event to all current subscribers, synchronously,
100    /// in registration order. Subscribers must not block.
101    pub fn publish(&self, event: ClusterEvent) {
102        let subs = self.inner.read().clone();
103        for s in &subs {
104            s(&event);
105        }
106    }
107
108    /// Number of currently registered subscribers.
109    pub fn subscriber_count(&self) -> usize {
110        self.inner.read().len()
111    }
112}
113
114/// RAII handle returned by [`ClusterEventBus::subscribe`]. Dropping it
115/// removes the corresponding subscriber.
116pub struct SubscriptionHandle {
117    bus: Arc<RwLock<Vec<Subscriber>>>,
118    id: usize,
119    /// Keeps the `Arc` alive so the pointer identity matches on drop.
120    anchor: Subscriber,
121}
122
123impl Drop for SubscriptionHandle {
124    fn drop(&mut self) {
125        let mut subs = self.bus.write();
126        subs.retain(|s| Arc::as_ptr(s) as *const () as usize != self.id);
127        // anchor goes out of scope after retain, so the Arc count drops.
128        let _ = &self.anchor;
129    }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// A live subscriber is invoked once per published event.
    #[test]
    fn publish_delivers_to_subscribers() {
        let hits = Arc::new(AtomicU32::new(0));
        let hits_in_cb = Arc::clone(&hits);
        let bus = ClusterEventBus::new();
        let _subscription = bus.subscribe(move |_| {
            hits_in_cb.fetch_add(1, Ordering::SeqCst);
        });

        bus.publish(ClusterEvent::ClusterShuttingDown);
        bus.publish(ClusterEvent::Convergence(true));

        assert_eq!(hits.load(Ordering::SeqCst), 2);
    }

    /// Dropping the handle removes the subscriber, so later events are
    /// not delivered to it.
    #[test]
    fn drop_unsubscribes() {
        let hits = Arc::new(AtomicU32::new(0));
        let hits_in_cb = Arc::clone(&hits);
        let bus = ClusterEventBus::new();
        let subscription = bus.subscribe(move |_| {
            hits_in_cb.fetch_add(1, Ordering::SeqCst);
        });
        assert_eq!(bus.subscriber_count(), 1);

        drop(subscription);
        assert_eq!(bus.subscriber_count(), 0);

        bus.publish(ClusterEvent::Convergence(false));
        assert_eq!(hits.load(Ordering::SeqCst), 0);
    }

    /// `LeaderChanged` reaches the subscriber with both endpoints intact.
    #[test]
    fn leader_changed_carries_old_and_new() {
        let slot = Arc::new(parking_lot::Mutex::new(None));
        let slot_in_cb = Arc::clone(&slot);
        let bus = ClusterEventBus::new();
        let _subscription = bus.subscribe(move |event| {
            *slot_in_cb.lock() = Some(event.clone());
        });

        bus.publish(ClusterEvent::LeaderChanged {
            from: None,
            to: Some(Address::local("a")),
        });

        let seen = slot.lock().clone();
        assert!(matches!(
            seen,
            Some(ClusterEvent::LeaderChanged { from: None, to: Some(_) })
        ));
    }
}