Skip to main content

rakka_cluster_tools/
cluster_singleton.rs

1//! ClusterSingletonManager / Proxy — one logical actor across the cluster.
2//! akka.net: `Akka.Cluster.Tools/Singleton/`.
3//!
4//! Phase 7.C of `docs/full-port-plan.md`. Handover protocol:
5//!
6//! ```text
7//! Active(here) ── oldest changed ──► HandingOver ── handover ack ──► Inactive
8//! Inactive ────── elected oldest ──► Starting     ── started   ──► Active(here)
9//! Inactive ────── observed remote ──► Active(remote)
10//! ```
11//!
//! While the singleton is not `Active` (`Inactive`, `Starting` or
//! `HandingOver`), messages submitted via the [`ClusterSingletonProxy`]
//! are buffered (up to `buffer_size`) and flushed once it is `Active` again.
15
16use std::collections::VecDeque;
17use std::sync::Arc;
18
19use parking_lot::RwLock;
20
21use rakka_core::actor::UntypedActorRef;
22
23/// Singleton lifecycle state.
24#[derive(Debug, Clone, PartialEq, Eq)]
25#[non_exhaustive]
26pub enum SingletonState {
27    /// No singleton known yet.
28    Inactive,
29    /// We are about to become the singleton, but haven't started it.
30    Starting,
31    /// The singleton lives at this ref.
32    Active { ref_: UntypedActorRef, here: bool },
33    /// We were the singleton; new oldest member is taking over.
34    HandingOver,
35}
36
37/// Buffered envelope used during handover. The payload is type-erased
38/// (Box<dyn Any>) because the proxy doesn't know the concrete message
39/// type at the API boundary; recipients downcast on flush. Phase 13
40/// will replace this with typed-per-(M) buffering.
41type BufferedMsg = Box<dyn FnOnce(&UntypedActorRef) + Send + 'static>;
42
43/// Decides which node hosts the singleton based on oldest up-member —
44/// a hook is provided so tests can simulate handover without wiring
45/// the full cluster.
46pub struct ClusterSingletonManager {
47    state: RwLock<SingletonState>,
48    buffer: parking_lot::Mutex<VecDeque<BufferedMsg>>,
49    buffer_size: usize,
50    /// Count of messages dropped because the buffer was full.
51    drops: parking_lot::Mutex<u64>,
52}
53
54impl Default for ClusterSingletonManager {
55    fn default() -> Self {
56        Self {
57            state: RwLock::new(SingletonState::Inactive),
58            buffer: parking_lot::Mutex::new(VecDeque::new()),
59            buffer_size: 1_000,
60            drops: parking_lot::Mutex::new(0),
61        }
62    }
63}
64
65impl ClusterSingletonManager {
66    pub fn new() -> Arc<Self> {
67        Arc::new(Self::default())
68    }
69
70    /// Construct with a custom proxy buffer size.
71    pub fn with_buffer_size(size: usize) -> Arc<Self> {
72        Arc::new(Self { buffer_size: size, ..Self::default() })
73    }
74
75    pub fn state(&self) -> SingletonState {
76        self.state.read().clone()
77    }
78
79    /// Mark `r` as the local singleton (we won the election).
80    /// Flushes any messages that were buffered during handover.
81    pub fn set_active_here(&self, r: UntypedActorRef) {
82        *self.state.write() = SingletonState::Active { ref_: r.clone(), here: true };
83        self.flush(&r);
84    }
85
86    /// Mark `r` as the remote singleton (some other node is hosting it).
87    pub fn set_active_remote(&self, r: UntypedActorRef) {
88        *self.state.write() = SingletonState::Active { ref_: r.clone(), here: false };
89        self.flush(&r);
90    }
91
92    /// Begin handover (we used to be Active(here)).
93    pub fn begin_handover(&self) {
94        *self.state.write() = SingletonState::HandingOver;
95    }
96
97    /// Begin starting (we were elected as the new oldest).
98    pub fn begin_starting(&self) {
99        *self.state.write() = SingletonState::Starting;
100    }
101
102    /// Forget the current singleton entirely.
103    pub fn clear(&self) {
104        *self.state.write() = SingletonState::Inactive;
105    }
106
107    pub fn current(&self) -> Option<UntypedActorRef> {
108        match &*self.state.read() {
109            SingletonState::Active { ref_, .. } => Some(ref_.clone()),
110            _ => None,
111        }
112    }
113
114    /// Buffer `deliver` for replay once the singleton becomes
115    /// `Active`. Used by the proxy when the singleton isn't yet
116    /// reachable. Returns `true` if buffered, `false` if the buffer
117    /// was full (in which case the caller can route to DeadLetters).
118    fn buffer_or_deliver<F>(&self, deliver: F) -> bool
119    where
120        F: FnOnce(&UntypedActorRef) + Send + 'static,
121    {
122        if let Some(r) = self.current() {
123            deliver(&r);
124            return true;
125        }
126        let mut q = self.buffer.lock();
127        if q.len() >= self.buffer_size {
128            *self.drops.lock() += 1;
129            return false;
130        }
131        q.push_back(Box::new(deliver));
132        true
133    }
134
135    fn flush(&self, target: &UntypedActorRef) {
136        let mut q = self.buffer.lock();
137        while let Some(deliver) = q.pop_front() {
138            deliver(target);
139        }
140    }
141
142    /// Number of currently-buffered messages (waiting for handover to
143    /// complete).
144    pub fn buffered(&self) -> usize {
145        self.buffer.lock().len()
146    }
147
148    /// Total number of messages dropped due to buffer-full overflow.
149    pub fn drops(&self) -> u64 {
150        *self.drops.lock()
151    }
152}
153
154/// Proxy that routes messages to the current singleton, buffering
155/// during handover.
156pub struct ClusterSingletonProxy {
157    pub manager: Arc<ClusterSingletonManager>,
158}
159
160impl ClusterSingletonProxy {
161    pub fn new(manager: Arc<ClusterSingletonManager>) -> Self {
162        Self { manager }
163    }
164
165    pub fn singleton(&self) -> Option<UntypedActorRef> {
166        self.manager.current()
167    }
168
169    /// Schedule `deliver` against the singleton. If `Active`, runs
170    /// immediately; if `Inactive`/`Starting`/`HandingOver`, buffers
171    /// for replay. Returns `false` if the buffer was full.
172    pub fn send<F>(&self, deliver: F) -> bool
173    where
174        F: FnOnce(&UntypedActorRef) + Send + 'static,
175    {
176        self.manager.buffer_or_deliver(deliver)
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use rakka_core::actor::Inbox;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// The proxy reports no singleton until one is activated, then
    /// exposes it.
    #[test]
    fn proxy_routes_to_current_singleton() {
        let mgr = ClusterSingletonManager::new();
        let proxy = ClusterSingletonProxy::new(mgr.clone());
        assert!(proxy.singleton().is_none());

        let inbox = Inbox::<u32>::new("singleton");
        mgr.set_active_here(inbox.actor_ref().as_untyped());
        assert!(proxy.singleton().is_some());
    }

    /// Walks Inactive → Starting → Active(here) → HandingOver → Inactive.
    #[test]
    fn handover_state_transitions() {
        let mgr = ClusterSingletonManager::new();
        assert!(matches!(mgr.state(), SingletonState::Inactive));
        mgr.begin_starting();
        assert!(matches!(mgr.state(), SingletonState::Starting));
        let inbox = Inbox::<u32>::new("s");
        mgr.set_active_here(inbox.actor_ref().as_untyped());
        assert!(matches!(mgr.state(), SingletonState::Active { here: true, .. }));
        mgr.begin_handover();
        assert!(matches!(mgr.state(), SingletonState::HandingOver));
        mgr.clear();
        assert!(matches!(mgr.state(), SingletonState::Inactive));
        assert!(mgr.current().is_none());
    }

    /// Deliveries sent before any activation are buffered and replayed
    /// on activation. The test is fully synchronous, so it needs no
    /// async runtime.
    #[test]
    fn proxy_buffers_during_handover_and_flushes_after() {
        let mgr = ClusterSingletonManager::new();
        let proxy = ClusterSingletonProxy::new(mgr.clone());

        let calls = Arc::new(AtomicU32::new(0));
        // Send 3 messages while inactive — all buffered.
        for _ in 0..3 {
            let c = calls.clone();
            assert!(proxy.send(move |_r| {
                c.fetch_add(1, Ordering::SeqCst);
            }));
        }
        assert_eq!(mgr.buffered(), 3);
        assert_eq!(calls.load(Ordering::SeqCst), 0);

        // Become active → buffer flushes.
        let inbox = Inbox::<u32>::new("s");
        mgr.set_active_here(inbox.actor_ref().as_untyped());
        assert_eq!(mgr.buffered(), 0);
        assert_eq!(calls.load(Ordering::SeqCst), 3);

        // After active, send delivers immediately and reports success.
        let c2 = calls.clone();
        assert!(proxy.send(move |_| {
            c2.fetch_add(1, Ordering::SeqCst);
        }));
        assert_eq!(mgr.buffered(), 0);
        assert_eq!(calls.load(Ordering::SeqCst), 4);
    }

    /// A delivery sent while `HandingOver` is parked and replayed once a
    /// remote singleton is announced.
    #[test]
    fn handover_buffers_then_flushes_to_remote() {
        let mgr = ClusterSingletonManager::new();
        let proxy = ClusterSingletonProxy::new(mgr.clone());

        let local = Inbox::<u32>::new("local");
        mgr.set_active_here(local.actor_ref().as_untyped());
        mgr.begin_handover();
        assert!(proxy.singleton().is_none());

        let calls = Arc::new(AtomicU32::new(0));
        let c = calls.clone();
        assert!(proxy.send(move |_| {
            c.fetch_add(1, Ordering::SeqCst);
        }));
        assert_eq!(mgr.buffered(), 1);
        assert_eq!(calls.load(Ordering::SeqCst), 0);

        let remote = Inbox::<u32>::new("remote");
        mgr.set_active_remote(remote.actor_ref().as_untyped());
        assert_eq!(mgr.buffered(), 0);
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    /// Once the buffer is at capacity, further sends are rejected and
    /// counted.
    #[test]
    fn full_buffer_drops_and_counts_overflow() {
        let mgr = ClusterSingletonManager::with_buffer_size(2);
        let proxy = ClusterSingletonProxy::new(mgr.clone());
        assert!(proxy.send(|_| {}));
        assert!(proxy.send(|_| {}));
        // Third should overflow.
        assert!(!proxy.send(|_| {}));
        assert_eq!(mgr.drops(), 1);
        assert_eq!(mgr.buffered(), 2);
    }

    /// `set_active_remote` records the singleton with `here == false`.
    #[test]
    fn set_active_remote_marks_here_false() {
        let mgr = ClusterSingletonManager::new();
        let inbox = Inbox::<u32>::new("remote-host");
        mgr.set_active_remote(inbox.actor_ref().as_untyped());
        match mgr.state() {
            SingletonState::Active { here, .. } => assert!(!here),
            _ => panic!("expected active-remote"),
        }
    }
}