atomr_coordination/
lib.rs1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Instant;
6
7use async_trait::async_trait;
8use parking_lot::Mutex;
9use thiserror::Error;
10
11#[derive(Debug, Error)]
12pub enum LeaseError {
13 #[error("lease already held by {0}")]
14 AlreadyHeld(String),
15 #[error("lease not held")]
16 NotHeld,
17}
18
19#[async_trait]
20pub trait Lease: Send + Sync + 'static {
21 async fn acquire(&self, owner: &str) -> Result<bool, LeaseError>;
22 async fn release(&self, owner: &str) -> Result<(), LeaseError>;
23 async fn check_lease(&self) -> Option<String>;
24}
25
26#[derive(Default)]
27pub struct InMemoryLease {
28 inner: Mutex<Option<(String, Instant)>>,
29}
30
31impl InMemoryLease {
32 pub fn new() -> Arc<Self> {
33 Arc::new(Self::default())
34 }
35}
36
37#[async_trait]
38impl Lease for InMemoryLease {
39 async fn acquire(&self, owner: &str) -> Result<bool, LeaseError> {
40 let mut guard = self.inner.lock();
41 match guard.as_ref() {
42 Some((current, _)) if current == owner => Ok(true),
43 Some((current, _)) => Err(LeaseError::AlreadyHeld(current.clone())),
44 None => {
45 *guard = Some((owner.to_string(), Instant::now()));
46 Ok(true)
47 }
48 }
49 }
50
51 async fn release(&self, owner: &str) -> Result<(), LeaseError> {
52 let mut guard = self.inner.lock();
53 match guard.as_ref() {
54 Some((current, _)) if current == owner => {
55 *guard = None;
56 Ok(())
57 }
58 _ => Err(LeaseError::NotHeld),
59 }
60 }
61
62 async fn check_lease(&self) -> Option<String> {
63 self.inner.lock().as_ref().map(|(s, _)| s.clone())
64 }
65}
66
67#[derive(Default)]
68pub struct LeaseRegistry {
69 leases: Mutex<HashMap<String, Arc<InMemoryLease>>>,
70}
71
72impl LeaseRegistry {
73 pub fn new() -> Self {
74 Self::default()
75 }
76
77 pub fn get_or_create(&self, name: &str) -> Arc<InMemoryLease> {
78 self.leases.lock().entry(name.to_string()).or_default().clone()
79 }
80}
81
82#[cfg(test)]
83mod tests {
84 use super::*;
85
86 #[tokio::test]
87 async fn acquire_release_cycle() {
88 let l = InMemoryLease::new();
89 assert!(l.acquire("me").await.unwrap());
90 assert_eq!(l.check_lease().await.as_deref(), Some("me"));
91 l.release("me").await.unwrap();
92 assert!(l.check_lease().await.is_none());
93 }
94
95 #[tokio::test]
96 async fn second_owner_rejected() {
97 let l = InMemoryLease::new();
98 l.acquire("a").await.unwrap();
99 matches!(l.acquire("b").await.unwrap_err(), LeaseError::AlreadyHeld(_));
100 }
101}