// atomr_cluster_tools/cluster_singleton.rs

use std::collections::VecDeque;
use std::sync::Arc;

use parking_lot::RwLock;

use atomr_core::actor::UntypedActorRef;
/// Lifecycle of the cluster-wide singleton as observed by this node.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum SingletonState {
    /// No singleton instance is known.
    Inactive,
    /// A singleton instance is being started; its location is not yet known.
    Starting,
    /// A singleton is running. `here` is true when it runs on this node.
    Active { ref_: UntypedActorRef, here: bool },
    /// The singleton is migrating between nodes; sends are buffered meanwhile.
    HandingOver,
}

/// A deferred delivery: a closure invoked with the singleton's ref once one
/// becomes available.
type BufferedMsg = Box<dyn FnOnce(&UntypedActorRef) + Send + 'static>;

/// Node-local bookkeeping for a cluster singleton: tracks where the singleton
/// currently lives and parks deliveries while no instance is active.
pub struct ClusterSingletonManager {
    // Current lifecycle state; read-heavy (every send checks it), hence RwLock.
    state: RwLock<SingletonState>,
    // Deliveries parked while the singleton is unavailable, in FIFO order.
    buffer: parking_lot::Mutex<VecDeque<BufferedMsg>>,
    // Maximum number of parked deliveries before new ones are dropped.
    buffer_size: usize,
    // Count of deliveries dropped because the buffer was full.
    drops: parking_lot::Mutex<u64>,
}

53impl Default for ClusterSingletonManager {
54 fn default() -> Self {
55 Self {
56 state: RwLock::new(SingletonState::Inactive),
57 buffer: parking_lot::Mutex::new(VecDeque::new()),
58 buffer_size: 1_000,
59 drops: parking_lot::Mutex::new(0),
60 }
61 }
62}
63
64impl ClusterSingletonManager {
65 pub fn new() -> Arc<Self> {
66 Arc::new(Self::default())
67 }
68
69 pub fn with_buffer_size(size: usize) -> Arc<Self> {
71 Arc::new(Self { buffer_size: size, ..Self::default() })
72 }
73
74 pub fn state(&self) -> SingletonState {
75 self.state.read().clone()
76 }
77
78 pub fn set_active_here(&self, r: UntypedActorRef) {
81 *self.state.write() = SingletonState::Active { ref_: r.clone(), here: true };
82 self.flush(&r);
83 }
84
85 pub fn set_active_remote(&self, r: UntypedActorRef) {
87 *self.state.write() = SingletonState::Active { ref_: r.clone(), here: false };
88 self.flush(&r);
89 }
90
91 pub fn begin_handover(&self) {
93 *self.state.write() = SingletonState::HandingOver;
94 }
95
96 pub fn begin_starting(&self) {
98 *self.state.write() = SingletonState::Starting;
99 }
100
101 pub fn clear(&self) {
103 *self.state.write() = SingletonState::Inactive;
104 }
105
106 pub fn current(&self) -> Option<UntypedActorRef> {
107 match &*self.state.read() {
108 SingletonState::Active { ref_, .. } => Some(ref_.clone()),
109 _ => None,
110 }
111 }
112
113 fn buffer_or_deliver<F>(&self, deliver: F) -> bool
118 where
119 F: FnOnce(&UntypedActorRef) + Send + 'static,
120 {
121 if let Some(r) = self.current() {
122 deliver(&r);
123 return true;
124 }
125 let mut q = self.buffer.lock();
126 if q.len() >= self.buffer_size {
127 *self.drops.lock() += 1;
128 return false;
129 }
130 q.push_back(Box::new(deliver));
131 true
132 }
133
134 fn flush(&self, target: &UntypedActorRef) {
135 let mut q = self.buffer.lock();
136 while let Some(deliver) = q.pop_front() {
137 deliver(target);
138 }
139 }
140
141 pub fn buffered(&self) -> usize {
144 self.buffer.lock().len()
145 }
146
147 pub fn drops(&self) -> u64 {
149 *self.drops.lock()
150 }
151}
152
/// Thin routing facade over a shared [`ClusterSingletonManager`]: callers
/// send through the proxy without caring where the singleton currently lives.
pub struct ClusterSingletonProxy {
    /// Shared manager this proxy routes through.
    pub manager: Arc<ClusterSingletonManager>,
}

159impl ClusterSingletonProxy {
160 pub fn new(manager: Arc<ClusterSingletonManager>) -> Self {
161 Self { manager }
162 }
163
164 pub fn singleton(&self) -> Option<UntypedActorRef> {
165 self.manager.current()
166 }
167
168 pub fn send<F>(&self, deliver: F) -> bool
172 where
173 F: FnOnce(&UntypedActorRef) + Send + 'static,
174 {
175 self.manager.buffer_or_deliver(deliver)
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use atomr_core::actor::Inbox;
    use std::sync::atomic::{AtomicU32, Ordering};

    // Once a singleton is active, the proxy resolves it.
    #[test]
    fn proxy_routes_to_current_singleton() {
        let mgr = ClusterSingletonManager::new();
        let inbox = Inbox::<u32>::new("singleton");
        mgr.set_active_here(inbox.actor_ref().as_untyped());
        let proxy = ClusterSingletonProxy::new(mgr);
        assert!(proxy.singleton().is_some());
    }

    // Walks the full state machine: Inactive -> Starting -> Active{here} -> HandingOver.
    #[test]
    fn handover_state_transitions() {
        let mgr = ClusterSingletonManager::new();
        assert!(matches!(mgr.state(), SingletonState::Inactive));
        mgr.begin_starting();
        assert!(matches!(mgr.state(), SingletonState::Starting));
        let inbox = Inbox::<u32>::new("s");
        mgr.set_active_here(inbox.actor_ref().as_untyped());
        assert!(matches!(mgr.state(), SingletonState::Active { here: true, .. }));
        mgr.begin_handover();
        assert!(matches!(mgr.state(), SingletonState::HandingOver));
    }

    // Sends while Inactive are buffered (not run), then all flush on activation,
    // and a send after activation is delivered immediately.
    #[tokio::test]
    async fn proxy_buffers_during_handover_and_flushes_after() {
        let mgr = ClusterSingletonManager::new();
        let proxy = ClusterSingletonProxy::new(mgr.clone());

        let calls = Arc::new(AtomicU32::new(0));
        for _ in 0..3 {
            let c = calls.clone();
            assert!(proxy.send(move |_r| {
                c.fetch_add(1, Ordering::SeqCst);
            }));
        }
        // Buffered, not yet executed.
        assert_eq!(mgr.buffered(), 3);
        assert_eq!(calls.load(Ordering::SeqCst), 0);

        let inbox = Inbox::<u32>::new("s");
        mgr.set_active_here(inbox.actor_ref().as_untyped());
        // Activation drained the buffer and ran every parked closure.
        assert_eq!(mgr.buffered(), 0);
        assert_eq!(calls.load(Ordering::SeqCst), 3);

        // With an active singleton, delivery is synchronous.
        let c2 = calls.clone();
        proxy.send(move |_| {
            c2.fetch_add(1, Ordering::SeqCst);
        });
        assert_eq!(calls.load(Ordering::SeqCst), 4);
    }

    // Overflow past buffer_size returns false, increments drops, and leaves
    // the buffer at capacity.
    #[test]
    fn full_buffer_drops_and_counts_overflow() {
        let mgr = ClusterSingletonManager::with_buffer_size(2);
        let proxy = ClusterSingletonProxy::new(mgr.clone());
        assert!(proxy.send(|_| {}));
        assert!(proxy.send(|_| {}));
        assert!(!proxy.send(|_| {}));
        assert_eq!(mgr.drops(), 1);
        assert_eq!(mgr.buffered(), 2);
    }

    // A remote activation records here == false.
    #[test]
    fn set_active_remote_marks_here_false() {
        let mgr = ClusterSingletonManager::new();
        let inbox = Inbox::<u32>::new("remote-host");
        mgr.set_active_remote(inbox.actor_ref().as_untyped());
        match mgr.state() {
            SingletonState::Active { here, .. } => assert!(!here),
            _ => panic!("expected active-remote"),
        }
    }
}