Skip to main content

atomr_coordination/
lib.rs

1//! atomr-coordination./ `Lease` API.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Instant;
6
7use async_trait::async_trait;
8use parking_lot::Mutex;
9use thiserror::Error;
10
11#[derive(Debug, Error)]
12pub enum LeaseError {
13    #[error("lease already held by {0}")]
14    AlreadyHeld(String),
15    #[error("lease not held")]
16    NotHeld,
17}
18
19#[async_trait]
20pub trait Lease: Send + Sync + 'static {
21    async fn acquire(&self, owner: &str) -> Result<bool, LeaseError>;
22    async fn release(&self, owner: &str) -> Result<(), LeaseError>;
23    async fn check_lease(&self) -> Option<String>;
24}
25
26#[derive(Default)]
27pub struct InMemoryLease {
28    inner: Mutex<Option<(String, Instant)>>,
29}
30
31impl InMemoryLease {
32    pub fn new() -> Arc<Self> {
33        Arc::new(Self::default())
34    }
35}
36
37#[async_trait]
38impl Lease for InMemoryLease {
39    async fn acquire(&self, owner: &str) -> Result<bool, LeaseError> {
40        let mut guard = self.inner.lock();
41        match guard.as_ref() {
42            Some((current, _)) if current == owner => Ok(true),
43            Some((current, _)) => Err(LeaseError::AlreadyHeld(current.clone())),
44            None => {
45                *guard = Some((owner.to_string(), Instant::now()));
46                Ok(true)
47            }
48        }
49    }
50
51    async fn release(&self, owner: &str) -> Result<(), LeaseError> {
52        let mut guard = self.inner.lock();
53        match guard.as_ref() {
54            Some((current, _)) if current == owner => {
55                *guard = None;
56                Ok(())
57            }
58            _ => Err(LeaseError::NotHeld),
59        }
60    }
61
62    async fn check_lease(&self) -> Option<String> {
63        self.inner.lock().as_ref().map(|(s, _)| s.clone())
64    }
65}
66
67#[derive(Default)]
68pub struct LeaseRegistry {
69    leases: Mutex<HashMap<String, Arc<InMemoryLease>>>,
70}
71
72impl LeaseRegistry {
73    pub fn new() -> Self {
74        Self::default()
75    }
76
77    pub fn get_or_create(&self, name: &str) -> Arc<InMemoryLease> {
78        self.leases.lock().entry(name.to_string()).or_default().clone()
79    }
80}
81
82#[cfg(test)]
83mod tests {
84    use super::*;
85
86    #[tokio::test]
87    async fn acquire_release_cycle() {
88        let l = InMemoryLease::new();
89        assert!(l.acquire("me").await.unwrap());
90        assert_eq!(l.check_lease().await.as_deref(), Some("me"));
91        l.release("me").await.unwrap();
92        assert!(l.check_lease().await.is_none());
93    }
94
95    #[tokio::test]
96    async fn second_owner_rejected() {
97        let l = InMemoryLease::new();
98        l.acquire("a").await.unwrap();
99        matches!(l.acquire("b").await.unwrap_err(), LeaseError::AlreadyHeld(_));
100    }
101}