rakka_cluster_tools/
cluster_singleton.rs1use std::collections::VecDeque;
17use std::sync::Arc;
18
19use parking_lot::RwLock;
20
21use rakka_core::actor::UntypedActorRef;
22
23#[derive(Debug, Clone, PartialEq, Eq)]
25#[non_exhaustive]
26pub enum SingletonState {
27 Inactive,
29 Starting,
31 Active { ref_: UntypedActorRef, here: bool },
33 HandingOver,
35}
36
37type BufferedMsg = Box<dyn FnOnce(&UntypedActorRef) + Send + 'static>;
42
43pub struct ClusterSingletonManager {
47 state: RwLock<SingletonState>,
48 buffer: parking_lot::Mutex<VecDeque<BufferedMsg>>,
49 buffer_size: usize,
50 drops: parking_lot::Mutex<u64>,
52}
53
54impl Default for ClusterSingletonManager {
55 fn default() -> Self {
56 Self {
57 state: RwLock::new(SingletonState::Inactive),
58 buffer: parking_lot::Mutex::new(VecDeque::new()),
59 buffer_size: 1_000,
60 drops: parking_lot::Mutex::new(0),
61 }
62 }
63}
64
65impl ClusterSingletonManager {
66 pub fn new() -> Arc<Self> {
67 Arc::new(Self::default())
68 }
69
70 pub fn with_buffer_size(size: usize) -> Arc<Self> {
72 Arc::new(Self { buffer_size: size, ..Self::default() })
73 }
74
75 pub fn state(&self) -> SingletonState {
76 self.state.read().clone()
77 }
78
79 pub fn set_active_here(&self, r: UntypedActorRef) {
82 *self.state.write() = SingletonState::Active { ref_: r.clone(), here: true };
83 self.flush(&r);
84 }
85
86 pub fn set_active_remote(&self, r: UntypedActorRef) {
88 *self.state.write() = SingletonState::Active { ref_: r.clone(), here: false };
89 self.flush(&r);
90 }
91
92 pub fn begin_handover(&self) {
94 *self.state.write() = SingletonState::HandingOver;
95 }
96
97 pub fn begin_starting(&self) {
99 *self.state.write() = SingletonState::Starting;
100 }
101
102 pub fn clear(&self) {
104 *self.state.write() = SingletonState::Inactive;
105 }
106
107 pub fn current(&self) -> Option<UntypedActorRef> {
108 match &*self.state.read() {
109 SingletonState::Active { ref_, .. } => Some(ref_.clone()),
110 _ => None,
111 }
112 }
113
114 fn buffer_or_deliver<F>(&self, deliver: F) -> bool
119 where
120 F: FnOnce(&UntypedActorRef) + Send + 'static,
121 {
122 if let Some(r) = self.current() {
123 deliver(&r);
124 return true;
125 }
126 let mut q = self.buffer.lock();
127 if q.len() >= self.buffer_size {
128 *self.drops.lock() += 1;
129 return false;
130 }
131 q.push_back(Box::new(deliver));
132 true
133 }
134
135 fn flush(&self, target: &UntypedActorRef) {
136 let mut q = self.buffer.lock();
137 while let Some(deliver) = q.pop_front() {
138 deliver(target);
139 }
140 }
141
142 pub fn buffered(&self) -> usize {
145 self.buffer.lock().len()
146 }
147
148 pub fn drops(&self) -> u64 {
150 *self.drops.lock()
151 }
152}
153
154pub struct ClusterSingletonProxy {
157 pub manager: Arc<ClusterSingletonManager>,
158}
159
160impl ClusterSingletonProxy {
161 pub fn new(manager: Arc<ClusterSingletonManager>) -> Self {
162 Self { manager }
163 }
164
165 pub fn singleton(&self) -> Option<UntypedActorRef> {
166 self.manager.current()
167 }
168
169 pub fn send<F>(&self, deliver: F) -> bool
173 where
174 F: FnOnce(&UntypedActorRef) + Send + 'static,
175 {
176 self.manager.buffer_or_deliver(deliver)
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use super::*;
183 use rakka_core::actor::Inbox;
184 use std::sync::atomic::{AtomicU32, Ordering};
185
186 #[test]
187 fn proxy_routes_to_current_singleton() {
188 let mgr = ClusterSingletonManager::new();
189 let inbox = Inbox::<u32>::new("singleton");
190 mgr.set_active_here(inbox.actor_ref().as_untyped());
191 let proxy = ClusterSingletonProxy::new(mgr);
192 assert!(proxy.singleton().is_some());
193 }
194
195 #[test]
196 fn handover_state_transitions() {
197 let mgr = ClusterSingletonManager::new();
198 assert!(matches!(mgr.state(), SingletonState::Inactive));
199 mgr.begin_starting();
200 assert!(matches!(mgr.state(), SingletonState::Starting));
201 let inbox = Inbox::<u32>::new("s");
202 mgr.set_active_here(inbox.actor_ref().as_untyped());
203 assert!(matches!(mgr.state(), SingletonState::Active { here: true, .. }));
204 mgr.begin_handover();
205 assert!(matches!(mgr.state(), SingletonState::HandingOver));
206 }
207
208 #[tokio::test]
209 async fn proxy_buffers_during_handover_and_flushes_after() {
210 let mgr = ClusterSingletonManager::new();
211 let proxy = ClusterSingletonProxy::new(mgr.clone());
212
213 let calls = Arc::new(AtomicU32::new(0));
214 for _ in 0..3 {
216 let c = calls.clone();
217 assert!(proxy.send(move |_r| {
218 c.fetch_add(1, Ordering::SeqCst);
219 }));
220 }
221 assert_eq!(mgr.buffered(), 3);
222 assert_eq!(calls.load(Ordering::SeqCst), 0);
223
224 let inbox = Inbox::<u32>::new("s");
226 mgr.set_active_here(inbox.actor_ref().as_untyped());
227 assert_eq!(mgr.buffered(), 0);
228 assert_eq!(calls.load(Ordering::SeqCst), 3);
229
230 let c2 = calls.clone();
232 proxy.send(move |_| {
233 c2.fetch_add(1, Ordering::SeqCst);
234 });
235 assert_eq!(calls.load(Ordering::SeqCst), 4);
236 }
237
238 #[test]
239 fn full_buffer_drops_and_counts_overflow() {
240 let mgr = ClusterSingletonManager::with_buffer_size(2);
241 let proxy = ClusterSingletonProxy::new(mgr.clone());
242 assert!(proxy.send(|_| {}));
243 assert!(proxy.send(|_| {}));
244 assert!(!proxy.send(|_| {}));
246 assert_eq!(mgr.drops(), 1);
247 assert_eq!(mgr.buffered(), 2);
248 }
249
250 #[test]
251 fn set_active_remote_marks_here_false() {
252 let mgr = ClusterSingletonManager::new();
253 let inbox = Inbox::<u32>::new("remote-host");
254 mgr.set_active_remote(inbox.actor_ref().as_untyped());
255 match mgr.state() {
256 SingletonState::Active { here, .. } => assert!(!here),
257 _ => panic!("expected active-remote"),
258 }
259 }
260}