atomr_persistence/
recovery_permitter.rs1use std::sync::Arc;
17
18use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
19
20#[derive(Clone)]
22pub struct RecoveryPermitter {
23 sem: Arc<Semaphore>,
24 capacity: usize,
25}
26
27impl RecoveryPermitter {
28 pub fn new(max_concurrent: usize) -> Self {
31 assert!(max_concurrent >= 1, "max_concurrent must be ≥ 1");
32 Self { sem: Arc::new(Semaphore::new(max_concurrent)), capacity: max_concurrent }
33 }
34
35 pub fn capacity(&self) -> usize {
37 self.capacity
38 }
39
40 pub fn available(&self) -> usize {
43 self.sem.available_permits()
44 }
45
46 pub fn in_flight(&self) -> usize {
49 self.capacity - self.available()
50 }
51
52 pub async fn acquire(&self) -> Option<OwnedSemaphorePermit> {
58 self.sem.clone().acquire_owned().await.ok()
59 }
60
61 pub fn try_acquire(&self) -> Result<OwnedSemaphorePermit, TryAcquireError> {
64 self.sem.clone().try_acquire_owned()
65 }
66
67 pub fn close(&self) {
71 self.sem.close();
72 }
73}
74
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Once every permit is taken, a further `acquire` must wait until one is
    /// released, and the counters must track held permits throughout.
    #[tokio::test]
    async fn capacity_bounds_concurrent_acquires() {
        let p = RecoveryPermitter::new(2);
        assert_eq!(p.capacity(), 2);
        assert_eq!(p.available(), 2);

        let permit_a = p.acquire().await.unwrap();
        let permit_b = p.acquire().await.unwrap();
        assert_eq!(p.available(), 0);
        assert_eq!(p.in_flight(), 2);

        // A third acquire through a clone must block while both permits are held.
        let p2 = p.clone();
        let h = tokio::spawn(async move { p2.acquire().await });
        // Give the spawned task time to reach the semaphore and start waiting.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(!h.is_finished());

        // Releasing one permit lets the waiter through.
        drop(permit_a);
        let permit_c = h.await.unwrap().unwrap();
        assert_eq!(p.in_flight(), 2);

        drop(permit_b);
        drop(permit_c);
        assert_eq!(p.in_flight(), 0);
    }

    /// `try_acquire` never waits: with the only permit held it reports
    /// `NoPermits` instead of blocking.
    #[tokio::test]
    async fn try_acquire_returns_immediately() {
        let p = RecoveryPermitter::new(1);
        let _held = p.try_acquire().unwrap();
        assert!(matches!(p.try_acquire(), Err(TryAcquireError::NoPermits)));
    }

    /// Closing the permitter wakes a pending `acquire` with `None` and makes
    /// later `try_acquire` calls report `Closed`.
    #[tokio::test]
    async fn close_returns_none_for_pending() {
        let p = RecoveryPermitter::new(1);
        let _held = p.acquire().await.unwrap();
        let p2 = p.clone();
        let h = tokio::spawn(async move { p2.acquire().await });
        // Let the spawned task start waiting before the semaphore is closed.
        tokio::time::sleep(Duration::from_millis(10)).await;
        p.close();
        let r = h.await.unwrap();
        assert!(r.is_none());
        assert!(matches!(p.try_acquire(), Err(TryAcquireError::Closed)));
    }

    /// A zero-sized permitter is rejected by the constructor's own assertion,
    /// not by some unrelated panic.
    #[test]
    #[should_panic(expected = "max_concurrent must be ≥ 1")]
    fn zero_capacity_panics() {
        let _ = RecoveryPermitter::new(0);
    }
}