use std::collections::VecDeque;
use std::sync::Arc;
use parking_lot::RwLock;
use atomr_core::actor::UntypedActorRef;
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum SingletonState {
Inactive,
Starting,
Active { ref_: UntypedActorRef, here: bool },
HandingOver,
}
type BufferedMsg = Box<dyn FnOnce(&UntypedActorRef) + Send + 'static>;
pub struct ClusterSingletonManager {
state: RwLock<SingletonState>,
buffer: parking_lot::Mutex<VecDeque<BufferedMsg>>,
buffer_size: usize,
drops: parking_lot::Mutex<u64>,
}
impl Default for ClusterSingletonManager {
fn default() -> Self {
Self {
state: RwLock::new(SingletonState::Inactive),
buffer: parking_lot::Mutex::new(VecDeque::new()),
buffer_size: 1_000,
drops: parking_lot::Mutex::new(0),
}
}
}
impl ClusterSingletonManager {
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
}
pub fn with_buffer_size(size: usize) -> Arc<Self> {
Arc::new(Self { buffer_size: size, ..Self::default() })
}
pub fn state(&self) -> SingletonState {
self.state.read().clone()
}
pub fn set_active_here(&self, r: UntypedActorRef) {
*self.state.write() = SingletonState::Active { ref_: r.clone(), here: true };
self.flush(&r);
}
pub fn set_active_remote(&self, r: UntypedActorRef) {
*self.state.write() = SingletonState::Active { ref_: r.clone(), here: false };
self.flush(&r);
}
pub fn begin_handover(&self) {
*self.state.write() = SingletonState::HandingOver;
}
pub fn begin_starting(&self) {
*self.state.write() = SingletonState::Starting;
}
pub fn clear(&self) {
*self.state.write() = SingletonState::Inactive;
}
pub fn current(&self) -> Option<UntypedActorRef> {
match &*self.state.read() {
SingletonState::Active { ref_, .. } => Some(ref_.clone()),
_ => None,
}
}
fn buffer_or_deliver<F>(&self, deliver: F) -> bool
where
F: FnOnce(&UntypedActorRef) + Send + 'static,
{
if let Some(r) = self.current() {
deliver(&r);
return true;
}
let mut q = self.buffer.lock();
if q.len() >= self.buffer_size {
*self.drops.lock() += 1;
return false;
}
q.push_back(Box::new(deliver));
true
}
fn flush(&self, target: &UntypedActorRef) {
let mut q = self.buffer.lock();
while let Some(deliver) = q.pop_front() {
deliver(target);
}
}
pub fn buffered(&self) -> usize {
self.buffer.lock().len()
}
pub fn drops(&self) -> u64 {
*self.drops.lock()
}
}
pub struct ClusterSingletonProxy {
pub manager: Arc<ClusterSingletonManager>,
}
impl ClusterSingletonProxy {
pub fn new(manager: Arc<ClusterSingletonManager>) -> Self {
Self { manager }
}
pub fn singleton(&self) -> Option<UntypedActorRef> {
self.manager.current()
}
pub fn send<F>(&self, deliver: F) -> bool
where
F: FnOnce(&UntypedActorRef) + Send + 'static,
{
self.manager.buffer_or_deliver(deliver)
}
}
#[cfg(test)]
mod tests {
use super::*;
use atomr_core::actor::Inbox;
use std::sync::atomic::{AtomicU32, Ordering};
#[test]
fn proxy_routes_to_current_singleton() {
let mgr = ClusterSingletonManager::new();
let inbox = Inbox::<u32>::new("singleton");
mgr.set_active_here(inbox.actor_ref().as_untyped());
let proxy = ClusterSingletonProxy::new(mgr);
assert!(proxy.singleton().is_some());
}
#[test]
fn handover_state_transitions() {
let mgr = ClusterSingletonManager::new();
assert!(matches!(mgr.state(), SingletonState::Inactive));
mgr.begin_starting();
assert!(matches!(mgr.state(), SingletonState::Starting));
let inbox = Inbox::<u32>::new("s");
mgr.set_active_here(inbox.actor_ref().as_untyped());
assert!(matches!(mgr.state(), SingletonState::Active { here: true, .. }));
mgr.begin_handover();
assert!(matches!(mgr.state(), SingletonState::HandingOver));
}
#[tokio::test]
async fn proxy_buffers_during_handover_and_flushes_after() {
let mgr = ClusterSingletonManager::new();
let proxy = ClusterSingletonProxy::new(mgr.clone());
let calls = Arc::new(AtomicU32::new(0));
for _ in 0..3 {
let c = calls.clone();
assert!(proxy.send(move |_r| {
c.fetch_add(1, Ordering::SeqCst);
}));
}
assert_eq!(mgr.buffered(), 3);
assert_eq!(calls.load(Ordering::SeqCst), 0);
let inbox = Inbox::<u32>::new("s");
mgr.set_active_here(inbox.actor_ref().as_untyped());
assert_eq!(mgr.buffered(), 0);
assert_eq!(calls.load(Ordering::SeqCst), 3);
let c2 = calls.clone();
proxy.send(move |_| {
c2.fetch_add(1, Ordering::SeqCst);
});
assert_eq!(calls.load(Ordering::SeqCst), 4);
}
#[test]
fn full_buffer_drops_and_counts_overflow() {
let mgr = ClusterSingletonManager::with_buffer_size(2);
let proxy = ClusterSingletonProxy::new(mgr.clone());
assert!(proxy.send(|_| {}));
assert!(proxy.send(|_| {}));
assert!(!proxy.send(|_| {}));
assert_eq!(mgr.drops(), 1);
assert_eq!(mgr.buffered(), 2);
}
#[test]
fn set_active_remote_marks_here_false() {
let mgr = ClusterSingletonManager::new();
let inbox = Inbox::<u32>::new("remote-host");
mgr.set_active_remote(inbox.actor_ref().as_untyped());
match mgr.state() {
SingletonState::Active { here, .. } => assert!(!here),
_ => panic!("expected active-remote"),
}
}
}