Skip to main content

atomr_cluster_tools/
cluster_singleton.rs

1//! ClusterSingletonManager / Proxy — one logical actor across the cluster.
2//!
3//! Phase 7.C of `docs/full-port-plan.md`. Handover protocol:
4//!
5//! ```text
6//! Active(here) ── oldest changed ──► HandingOver ── handover ack ──► Inactive
7//! Inactive ────── elected oldest ──► Starting     ── started   ──► Active(here)
8//! Inactive ────── observed remote ──► Active(remote)
9//! ```
10//!
//! While the singleton is not `Active` (i.e. `Inactive`, `Starting` or
//! `HandingOver`), messages submitted via the [`ClusterSingletonProxy`]
//! are buffered (up to `buffer_size`) and flushed once it becomes `Active`.
14
15use std::collections::VecDeque;
16use std::sync::Arc;
17
18use parking_lot::RwLock;
19
20use atomr_core::actor::UntypedActorRef;
21
22/// Singleton lifecycle state.
23#[derive(Debug, Clone, PartialEq, Eq)]
24#[non_exhaustive]
25pub enum SingletonState {
26    /// No singleton known yet.
27    Inactive,
28    /// We are about to become the singleton, but haven't started it.
29    Starting,
30    /// The singleton lives at this ref.
31    Active { ref_: UntypedActorRef, here: bool },
32    /// We were the singleton; new oldest member is taking over.
33    HandingOver,
34}
35
36/// Buffered envelope used during handover. The payload is type-erased
37/// (Box<dyn Any>) because the proxy doesn't know the concrete message
38/// type at the API boundary; recipients downcast on flush. Phase 13
39/// will replace this with typed-per-(M) buffering.
40type BufferedMsg = Box<dyn FnOnce(&UntypedActorRef) + Send + 'static>;
41
42/// Decides which node hosts the singleton based on oldest up-member —
43/// a hook is provided so tests can simulate handover without wiring
44/// the full cluster.
45pub struct ClusterSingletonManager {
46    state: RwLock<SingletonState>,
47    buffer: parking_lot::Mutex<VecDeque<BufferedMsg>>,
48    buffer_size: usize,
49    /// Count of messages dropped because the buffer was full.
50    drops: parking_lot::Mutex<u64>,
51}
52
53impl Default for ClusterSingletonManager {
54    fn default() -> Self {
55        Self {
56            state: RwLock::new(SingletonState::Inactive),
57            buffer: parking_lot::Mutex::new(VecDeque::new()),
58            buffer_size: 1_000,
59            drops: parking_lot::Mutex::new(0),
60        }
61    }
62}
63
64impl ClusterSingletonManager {
65    pub fn new() -> Arc<Self> {
66        Arc::new(Self::default())
67    }
68
69    /// Construct with a custom proxy buffer size.
70    pub fn with_buffer_size(size: usize) -> Arc<Self> {
71        Arc::new(Self { buffer_size: size, ..Self::default() })
72    }
73
74    pub fn state(&self) -> SingletonState {
75        self.state.read().clone()
76    }
77
78    /// Mark `r` as the local singleton (we won the election).
79    /// Flushes any messages that were buffered during handover.
80    pub fn set_active_here(&self, r: UntypedActorRef) {
81        *self.state.write() = SingletonState::Active { ref_: r.clone(), here: true };
82        self.flush(&r);
83    }
84
85    /// Mark `r` as the remote singleton (some other node is hosting it).
86    pub fn set_active_remote(&self, r: UntypedActorRef) {
87        *self.state.write() = SingletonState::Active { ref_: r.clone(), here: false };
88        self.flush(&r);
89    }
90
91    /// Begin handover (we used to be Active(here)).
92    pub fn begin_handover(&self) {
93        *self.state.write() = SingletonState::HandingOver;
94    }
95
96    /// Begin starting (we were elected as the new oldest).
97    pub fn begin_starting(&self) {
98        *self.state.write() = SingletonState::Starting;
99    }
100
101    /// Forget the current singleton entirely.
102    pub fn clear(&self) {
103        *self.state.write() = SingletonState::Inactive;
104    }
105
106    pub fn current(&self) -> Option<UntypedActorRef> {
107        match &*self.state.read() {
108            SingletonState::Active { ref_, .. } => Some(ref_.clone()),
109            _ => None,
110        }
111    }
112
113    /// Buffer `deliver` for replay once the singleton becomes
114    /// `Active`. Used by the proxy when the singleton isn't yet
115    /// reachable. Returns `true` if buffered, `false` if the buffer
116    /// was full (in which case the caller can route to DeadLetters).
117    fn buffer_or_deliver<F>(&self, deliver: F) -> bool
118    where
119        F: FnOnce(&UntypedActorRef) + Send + 'static,
120    {
121        if let Some(r) = self.current() {
122            deliver(&r);
123            return true;
124        }
125        let mut q = self.buffer.lock();
126        if q.len() >= self.buffer_size {
127            *self.drops.lock() += 1;
128            return false;
129        }
130        q.push_back(Box::new(deliver));
131        true
132    }
133
134    fn flush(&self, target: &UntypedActorRef) {
135        let mut q = self.buffer.lock();
136        while let Some(deliver) = q.pop_front() {
137            deliver(target);
138        }
139    }
140
141    /// Number of currently-buffered messages (waiting for handover to
142    /// complete).
143    pub fn buffered(&self) -> usize {
144        self.buffer.lock().len()
145    }
146
147    /// Total number of messages dropped due to buffer-full overflow.
148    pub fn drops(&self) -> u64 {
149        *self.drops.lock()
150    }
151}
152
153/// Proxy that routes messages to the current singleton, buffering
154/// during handover.
155pub struct ClusterSingletonProxy {
156    pub manager: Arc<ClusterSingletonManager>,
157}
158
159impl ClusterSingletonProxy {
160    pub fn new(manager: Arc<ClusterSingletonManager>) -> Self {
161        Self { manager }
162    }
163
164    pub fn singleton(&self) -> Option<UntypedActorRef> {
165        self.manager.current()
166    }
167
168    /// Schedule `deliver` against the singleton. If `Active`, runs
169    /// immediately; if `Inactive`/`Starting`/`HandingOver`, buffers
170    /// for replay. Returns `false` if the buffer was full.
171    pub fn send<F>(&self, deliver: F) -> bool
172    where
173        F: FnOnce(&UntypedActorRef) + Send + 'static,
174    {
175        self.manager.buffer_or_deliver(deliver)
176    }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use atomr_core::actor::Inbox;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Delivery closure that only counts how many times it ran.
    fn counting(calls: &Arc<AtomicU32>) -> impl FnOnce(&UntypedActorRef) + Send + 'static {
        let c = Arc::clone(calls);
        move |_r: &UntypedActorRef| {
            c.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn proxy_routes_to_current_singleton() {
        let mgr = ClusterSingletonManager::new();
        let inbox = Inbox::<u32>::new("singleton");
        let target = inbox.actor_ref().as_untyped();
        mgr.set_active_here(target.clone());
        let proxy = ClusterSingletonProxy::new(mgr);
        // Must be exactly the ref the manager was activated with, not just
        // "some" ref.
        assert_eq!(proxy.singleton(), Some(target));
    }

    #[test]
    fn handover_state_transitions() {
        let mgr = ClusterSingletonManager::new();
        assert!(matches!(mgr.state(), SingletonState::Inactive));
        mgr.begin_starting();
        assert!(matches!(mgr.state(), SingletonState::Starting));
        let inbox = Inbox::<u32>::new("s");
        mgr.set_active_here(inbox.actor_ref().as_untyped());
        assert!(matches!(mgr.state(), SingletonState::Active { here: true, .. }));
        mgr.begin_handover();
        assert!(matches!(mgr.state(), SingletonState::HandingOver));
        assert!(mgr.current().is_none());
        mgr.clear();
        assert!(matches!(mgr.state(), SingletonState::Inactive));
        assert!(mgr.current().is_none());
    }

    // Nothing in this test awaits, so a plain #[test] suffices; no async
    // runtime is required.
    #[test]
    fn proxy_buffers_during_handover_and_flushes_after() {
        let mgr = ClusterSingletonManager::new();
        let proxy = ClusterSingletonProxy::new(mgr.clone());

        let calls = Arc::new(AtomicU32::new(0));
        // Send 3 messages while inactive — all buffered.
        for _ in 0..3 {
            assert!(proxy.send(counting(&calls)));
        }
        assert_eq!(mgr.buffered(), 3);
        assert_eq!(calls.load(Ordering::SeqCst), 0);

        // Become active → buffer flushes.
        let inbox = Inbox::<u32>::new("s");
        mgr.set_active_here(inbox.actor_ref().as_untyped());
        assert_eq!(mgr.buffered(), 0);
        assert_eq!(calls.load(Ordering::SeqCst), 3);

        // After active, send delivers immediately and is not buffered.
        assert!(proxy.send(counting(&calls)));
        assert_eq!(mgr.buffered(), 0);
        assert_eq!(calls.load(Ordering::SeqCst), 4);
    }

    #[test]
    fn proxy_buffers_while_handing_over_and_flushes_to_new_host() {
        let mgr = ClusterSingletonManager::new();
        let proxy = ClusterSingletonProxy::new(mgr.clone());
        let old_host = Inbox::<u32>::new("old-host");
        mgr.set_active_here(old_host.actor_ref().as_untyped());
        mgr.begin_handover();
        assert!(proxy.singleton().is_none());

        let calls = Arc::new(AtomicU32::new(0));
        assert!(proxy.send(counting(&calls)));
        assert_eq!(mgr.buffered(), 1);
        assert_eq!(calls.load(Ordering::SeqCst), 0);

        let new_host = Inbox::<u32>::new("new-host");
        mgr.set_active_remote(new_host.actor_ref().as_untyped());
        assert_eq!(mgr.buffered(), 0);
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn full_buffer_drops_and_counts_overflow() {
        let mgr = ClusterSingletonManager::with_buffer_size(2);
        let proxy = ClusterSingletonProxy::new(mgr.clone());
        assert!(proxy.send(|_| {}));
        assert!(proxy.send(|_| {}));
        assert_eq!(mgr.drops(), 0);
        // Third should overflow.
        assert!(!proxy.send(|_| {}));
        assert_eq!(mgr.drops(), 1);
        assert_eq!(mgr.buffered(), 2);
    }

    #[test]
    fn set_active_remote_marks_here_false() {
        let mgr = ClusterSingletonManager::new();
        let inbox = Inbox::<u32>::new("remote-host");
        mgr.set_active_remote(inbox.actor_ref().as_untyped());
        match mgr.state() {
            SingletonState::Active { here, .. } => assert!(!here),
            _ => panic!("expected active-remote"),
        }
    }
}