Skip to main content

atomr_persistence/
recovery_permitter.rs

//! `RecoveryPermitter` — bounded concurrent recoveries.
//!
//! Configured via `atomr.persistence.max-concurrent-recoveries`.
//!
//! Without a permitter, every actor that starts up immediately triggers a
//! journal replay; thousands of restart-storming actors can DoS the journal
//! backend. The permitter caps the number of in-flight recoveries so that
//! late arrivals wait their turn (tokio's semaphore is fair, so waiters are
//! served in the order they asked).
//!
//! Implementation note: a thin wrapper around
//! [`tokio::sync::Semaphore`] so it integrates cleanly with the
//! `Eventsourced::recover` driver. Held permits use the standard
//! `OwnedSemaphorePermit` so callers can `drop` them mid-method
//! (e.g. before running a long `recovery_completed` user hook).
15
16use std::sync::Arc;
17
18use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
19
20/// Bounded in-flight recovery counter.
21#[derive(Clone)]
22pub struct RecoveryPermitter {
23    sem: Arc<Semaphore>,
24    capacity: usize,
25}
26
27impl RecoveryPermitter {
28    /// Create a permitter that allows up to `max_concurrent` parallel
29    /// recoveries.
30    pub fn new(max_concurrent: usize) -> Self {
31        assert!(max_concurrent >= 1, "max_concurrent must be ≥ 1");
32        Self { sem: Arc::new(Semaphore::new(max_concurrent)), capacity: max_concurrent }
33    }
34
35    /// Maximum permits ever issued (i.e. construction-time capacity).
36    pub fn capacity(&self) -> usize {
37        self.capacity
38    }
39
40    /// Permits currently available — i.e. how many *more* recoveries
41    /// could begin right now without blocking.
42    pub fn available(&self) -> usize {
43        self.sem.available_permits()
44    }
45
46    /// Permits currently held by callers (waiting on or driving a
47    /// recovery).
48    pub fn in_flight(&self) -> usize {
49        self.capacity - self.available()
50    }
51
52    /// Block until a permit is available.
53    ///
54    /// Returns `None` if the permitter has been
55    /// [`close`d](RecoveryPermitter::close), so callers can map the
56    /// result onto `EventsourcedError::PermitDenied` cleanly.
57    pub async fn acquire(&self) -> Option<OwnedSemaphorePermit> {
58        self.sem.clone().acquire_owned().await.ok()
59    }
60
61    /// Try to acquire a permit without waiting. Returns `Err(_)` if
62    /// no permit is available *right now*.
63    pub fn try_acquire(&self) -> Result<OwnedSemaphorePermit, TryAcquireError> {
64        self.sem.clone().try_acquire_owned()
65    }
66
67    /// Drain the permitter — pending and future
68    /// [`acquire`s](RecoveryPermitter::acquire) return `None` so
69    /// shutdown can short-circuit.
70    pub fn close(&self) {
71        self.sem.close();
72    }
73}
74
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn capacity_bounds_concurrent_acquires() {
        let p = RecoveryPermitter::new(2);
        assert_eq!(p.capacity(), 2);
        assert_eq!(p.available(), 2);

        let permit_a = p.acquire().await.unwrap();
        let permit_b = p.acquire().await.unwrap();
        assert_eq!(p.available(), 0);
        assert_eq!(p.in_flight(), 2);

        // The third acquire must wait until permit_a is dropped. The sleep
        // only gives the spawned task a chance to register as a waiter.
        let p2 = p.clone();
        let h = tokio::spawn(async move { p2.acquire().await });
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(!h.is_finished()); // still waiting
        drop(permit_a);
        let permit_c = h.await.unwrap().unwrap();
        assert_eq!(p.in_flight(), 2);

        drop(permit_b);
        drop(permit_c);
        assert_eq!(p.in_flight(), 0);
    }

    #[tokio::test]
    async fn try_acquire_returns_immediately() {
        let p = RecoveryPermitter::new(1);
        let _held = p.try_acquire().unwrap();
        // The permitter is still open, so the failure must be `NoPermits`.
        assert!(matches!(p.try_acquire(), Err(TryAcquireError::NoPermits)));
    }

    #[tokio::test]
    async fn close_returns_none_for_pending() {
        let p = RecoveryPermitter::new(1);
        let _held = p.acquire().await.unwrap();
        let p2 = p.clone();
        let h = tokio::spawn(async move { p2.acquire().await });
        tokio::time::sleep(Duration::from_millis(10)).await;
        p.close();
        let r = h.await.unwrap();
        assert!(r.is_none());
    }

    #[tokio::test]
    async fn close_rejects_future_acquires() {
        // Permits are available, but a closed permitter hands out none.
        let p = RecoveryPermitter::new(2);
        p.close();
        assert!(p.acquire().await.is_none());
        assert!(matches!(p.try_acquire(), Err(TryAcquireError::Closed)));
    }

    #[tokio::test]
    async fn permits_held_across_close_are_released_on_drop() {
        let p = RecoveryPermitter::new(1);
        let held = p.acquire().await.unwrap();
        p.close();
        assert_eq!(p.in_flight(), 1);
        drop(held);
        assert_eq!(p.in_flight(), 0);
    }

    #[test]
    #[should_panic(expected = "max_concurrent must be")]
    fn zero_capacity_panics() {
        let _ = RecoveryPermitter::new(0);
    }
}