Skip to main content

atomr_cluster/
events.rs

1//! Cluster events bus.
2//!
3//! Phase 6 of `docs/full-port-plan.md`. Akka.NET parity:
4//! `Cluster/ClusterEvent.cs`. Events are published when membership
5//! transitions, leader changes, or reachability flips. Subscribers
6//! register a per-event-class callback (or a multi-class one via the
7//! [`ClusterEvent`] enum) and receive each event in publish order.
8//!
9//! The bus is a thin `RwLock<Vec<callback>>` rather than an actor
10//! because subscribers are typically a handful of long-lived objects
11//! (telemetry probes, sharding region, pubsub mediator) and the
12//! contention model is "rare write, rare read." Phase 13 may move it
13//! behind a real actor if profiling justifies it.
14
15use std::sync::Arc;
16
17use atomr_core::actor::Address;
18use parking_lot::RwLock;
19use serde::{Deserialize, Serialize};
20
21use crate::member::{Member, MemberStatus};
22
23/// Event variants published on [`ClusterEventBus`].
24#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
25#[non_exhaustive]
26pub enum ClusterEvent {
27    MemberJoined(Member),
28    MemberUp(Member),
29    MemberLeft(Member),
30    MemberExited(Member),
31    MemberRemoved(Member, MemberStatus),
32    UnreachableMember(Member),
33    ReachableMember(Member),
34    LeaderChanged { from: Option<Address>, to: Option<Address> },
35    ClusterShuttingDown,
36    Convergence(bool),
37}
38
39type Subscriber = Arc<dyn Fn(&ClusterEvent) + Send + Sync + 'static>;
40
41/// In-process cluster events bus.
42#[derive(Default, Clone)]
43pub struct ClusterEventBus {
44    inner: Arc<RwLock<Vec<Subscriber>>>,
45}
46
47impl ClusterEventBus {
48    pub fn new() -> Self {
49        Self::default()
50    }
51
52    /// Register a subscriber that fires on every event. Returns a
53    /// handle whose `Drop` removes the subscription.
54    pub fn subscribe<F>(&self, callback: F) -> SubscriptionHandle
55    where
56        F: Fn(&ClusterEvent) + Send + Sync + 'static,
57    {
58        let cb: Subscriber = Arc::new(callback);
59        let mut subs = self.inner.write();
60        subs.push(cb.clone());
61        SubscriptionHandle {
62            bus: self.inner.clone(),
63            // Use the Arc pointer identity to find this subscription on drop.
64            id: Arc::as_ptr(&cb) as *const () as usize,
65            anchor: cb,
66        }
67    }
68
69    /// Publish an event to all current subscribers, synchronously,
70    /// in registration order. Subscribers must not block.
71    pub fn publish(&self, event: ClusterEvent) {
72        let subs = self.inner.read().clone();
73        for s in &subs {
74            s(&event);
75        }
76    }
77
78    /// Number of currently registered subscribers.
79    pub fn subscriber_count(&self) -> usize {
80        self.inner.read().len()
81    }
82}
83
84/// RAII handle returned by [`ClusterEventBus::subscribe`]. Dropping it
85/// removes the corresponding subscriber.
86pub struct SubscriptionHandle {
87    bus: Arc<RwLock<Vec<Subscriber>>>,
88    id: usize,
89    /// Keeps the `Arc` alive so the pointer identity matches on drop.
90    anchor: Subscriber,
91}
92
93impl Drop for SubscriptionHandle {
94    fn drop(&mut self) {
95        let mut subs = self.bus.write();
96        subs.retain(|s| Arc::as_ptr(s) as *const () as usize != self.id);
97        // anchor goes out of scope after retain, so the Arc count drops.
98        let _ = &self.anchor;
99    }
100}
101
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Each published event reaches a live subscriber exactly once.
    #[test]
    fn publish_delivers_to_subscribers() {
        let bus = ClusterEventBus::new();
        let hits = Arc::new(AtomicU32::new(0));
        let counter = Arc::clone(&hits);
        let _subscription = bus.subscribe(move |_| {
            counter.fetch_add(1, Ordering::SeqCst);
        });

        bus.publish(ClusterEvent::ClusterShuttingDown);
        bus.publish(ClusterEvent::Convergence(true));

        assert_eq!(hits.load(Ordering::SeqCst), 2);
    }

    /// Dropping the handle removes the subscriber, so later events are
    /// not delivered to it.
    #[test]
    fn drop_unsubscribes() {
        let bus = ClusterEventBus::new();
        let hits = Arc::new(AtomicU32::new(0));
        let counter = Arc::clone(&hits);
        let subscription = bus.subscribe(move |_| {
            counter.fetch_add(1, Ordering::SeqCst);
        });
        assert_eq!(bus.subscriber_count(), 1);

        drop(subscription);
        assert_eq!(bus.subscriber_count(), 0);

        bus.publish(ClusterEvent::Convergence(false));
        assert_eq!(hits.load(Ordering::SeqCst), 0);
    }

    /// A `LeaderChanged` event arrives with both `from` and `to` intact.
    #[test]
    fn leader_changed_carries_old_and_new() {
        let bus = ClusterEventBus::new();
        let captured = Arc::new(parking_lot::Mutex::new(None));
        let sink = Arc::clone(&captured);
        let _subscription = bus.subscribe(move |event| {
            *sink.lock() = Some(event.clone());
        });

        bus.publish(ClusterEvent::LeaderChanged { from: None, to: Some(Address::local("a")) });

        let observed = captured.lock().clone();
        assert!(matches!(
            observed,
            Some(ClusterEvent::LeaderChanged { from: None, to: Some(_) })
        ));
    }
}