atomr_persistence/
recovery_permitter.rs1use std::sync::Arc;
19
20use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
21
22#[derive(Clone)]
24pub struct RecoveryPermitter {
25 sem: Arc<Semaphore>,
26 capacity: usize,
27}
28
29impl RecoveryPermitter {
30 pub fn new(max_concurrent: usize) -> Self {
33 assert!(max_concurrent >= 1, "max_concurrent must be ≥ 1");
34 Self { sem: Arc::new(Semaphore::new(max_concurrent)), capacity: max_concurrent }
35 }
36
37 pub fn capacity(&self) -> usize {
39 self.capacity
40 }
41
42 pub fn available(&self) -> usize {
45 self.sem.available_permits()
46 }
47
48 pub fn in_flight(&self) -> usize {
51 self.capacity - self.available()
52 }
53
54 pub async fn acquire(&self) -> Option<OwnedSemaphorePermit> {
60 self.sem.clone().acquire_owned().await.ok()
61 }
62
63 pub fn try_acquire(&self) -> Result<OwnedSemaphorePermit, TryAcquireError> {
66 self.sem.clone().try_acquire_owned()
67 }
68
69 pub fn close(&self) {
73 self.sem.close();
74 }
75}
76
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Once every permit is handed out, a further `acquire` stays pending
    /// until one of the outstanding permits is dropped.
    #[tokio::test]
    async fn capacity_bounds_concurrent_acquires() {
        let permitter = RecoveryPermitter::new(2);
        assert_eq!(permitter.capacity(), 2);
        assert_eq!(permitter.available(), 2);

        let first = permitter.acquire().await.unwrap();
        let second = permitter.acquire().await.unwrap();
        assert_eq!(permitter.available(), 0);
        assert_eq!(permitter.in_flight(), 2);

        // A third acquire, issued through a clone on another task, has to wait.
        let waiter = {
            let shared = permitter.clone();
            tokio::spawn(async move { shared.acquire().await })
        };
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(!waiter.is_finished());

        // Releasing one permit lets the waiting task through.
        drop(first);
        let third = waiter.await.unwrap().unwrap();
        assert_eq!(permitter.in_flight(), 2);

        drop(second);
        drop(third);
        assert_eq!(permitter.in_flight(), 0);
    }

    /// `try_acquire` fails right away instead of waiting when nothing is free.
    #[tokio::test]
    async fn try_acquire_returns_immediately() {
        let permitter = RecoveryPermitter::new(1);
        let _only_permit = permitter.try_acquire().unwrap();

        let second_attempt = permitter.try_acquire();
        assert!(second_attempt.is_err());
    }

    /// Closing the permitter resolves an already-pending `acquire` to `None`.
    #[tokio::test]
    async fn close_returns_none_for_pending() {
        let permitter = RecoveryPermitter::new(1);
        let _only_permit = permitter.acquire().await.unwrap();

        let waiter = {
            let shared = permitter.clone();
            tokio::spawn(async move { shared.acquire().await })
        };
        // Give the spawned task time to start waiting on the semaphore.
        tokio::time::sleep(Duration::from_millis(10)).await;

        permitter.close();
        let outcome = waiter.await.unwrap();
        assert!(outcome.is_none());
    }

    /// A capacity of zero is rejected by the constructor's assertion.
    #[test]
    #[should_panic]
    fn zero_capacity_panics() {
        let _ = RecoveryPermitter::new(0);
    }
}