Skip to main content

atomr_persistence/
recovery_permitter.rs

1//! `RecoveryPermitter` — bounded concurrent recoveries.
2//!
3//! Phase 11 of `docs/full-port-plan.md`. Akka.NET parity:
4//! `Akka.Persistence.RecoveryPermitter` (config:
5//! `akka.persistence.max-concurrent-recoveries`).
6//!
7//! Without a permitter, every actor that starts up triggers a journal
8//! replay; thousands of restart-storming actors can DoS the journal
9//! backend. The permitter bounds the in-flight recovery count so
10//! late arrivers wait their turn.
11//!
12//! Implementation note: a thin wrapper around
13//! [`tokio::sync::Semaphore`] so it integrates cleanly with the
14//! `Eventsourced::recover` driver. Held permits use the standard
15//! `OwnedSemaphorePermit` so callers can `drop` them mid-method
16//! (e.g. before running a long `recovery_completed` user hook).
17
18use std::sync::Arc;
19
20use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
21
22/// Bounded in-flight recovery counter.
23#[derive(Clone)]
24pub struct RecoveryPermitter {
25    sem: Arc<Semaphore>,
26    capacity: usize,
27}
28
29impl RecoveryPermitter {
30    /// Create a permitter that allows up to `max_concurrent` parallel
31    /// recoveries.
32    pub fn new(max_concurrent: usize) -> Self {
33        assert!(max_concurrent >= 1, "max_concurrent must be ≥ 1");
34        Self { sem: Arc::new(Semaphore::new(max_concurrent)), capacity: max_concurrent }
35    }
36
37    /// Maximum permits ever issued (i.e. construction-time capacity).
38    pub fn capacity(&self) -> usize {
39        self.capacity
40    }
41
42    /// Permits currently available — i.e. how many *more* recoveries
43    /// could begin right now without blocking.
44    pub fn available(&self) -> usize {
45        self.sem.available_permits()
46    }
47
48    /// Permits currently held by callers (waiting on or driving a
49    /// recovery).
50    pub fn in_flight(&self) -> usize {
51        self.capacity - self.available()
52    }
53
54    /// Block until a permit is available.
55    ///
56    /// Returns `None` if the permitter has been
57    /// [`close`d](RecoveryPermitter::close), so callers can map the
58    /// result onto `EventsourcedError::PermitDenied` cleanly.
59    pub async fn acquire(&self) -> Option<OwnedSemaphorePermit> {
60        self.sem.clone().acquire_owned().await.ok()
61    }
62
63    /// Try to acquire a permit without waiting. Returns `Err(_)` if
64    /// no permit is available *right now*.
65    pub fn try_acquire(&self) -> Result<OwnedSemaphorePermit, TryAcquireError> {
66        self.sem.clone().try_acquire_owned()
67    }
68
69    /// Drain the permitter — pending and future
70    /// [`acquire`s](RecoveryPermitter::acquire) return `None` so
71    /// shutdown can short-circuit.
72    pub fn close(&self) {
73        self.sem.close();
74    }
75}
76
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn capacity_bounds_concurrent_acquires() {
        let p = RecoveryPermitter::new(2);
        assert_eq!(p.capacity(), 2);
        assert_eq!(p.available(), 2);

        let permit_a = p.acquire().await.unwrap();
        let permit_b = p.acquire().await.unwrap();
        assert_eq!(p.available(), 0);
        assert_eq!(p.in_flight(), 2);

        // A third acquire must wait. Dropping permit_a after a tick
        // releases a slot so the spawned task can proceed.
        let p2 = p.clone();
        let h = tokio::spawn(async move { p2.acquire().await });
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(!h.is_finished()); // still waiting
        drop(permit_a);
        let permit_c = h.await.unwrap().unwrap();
        assert_eq!(p.in_flight(), 2);

        drop(permit_b);
        drop(permit_c);
        assert_eq!(p.in_flight(), 0);
    }

    #[tokio::test]
    async fn try_acquire_returns_immediately() {
        let p = RecoveryPermitter::new(1);
        let _held = p.try_acquire().unwrap();
        assert!(p.try_acquire().is_err());
    }

    #[tokio::test]
    async fn close_returns_none_for_pending() {
        let p = RecoveryPermitter::new(1);
        let _held = p.acquire().await.unwrap();
        let p2 = p.clone();
        let h = tokio::spawn(async move { p2.acquire().await });
        tokio::time::sleep(Duration::from_millis(10)).await;
        p.close();
        let r = h.await.unwrap();
        assert!(r.is_none());
    }

    // A closed permitter must refuse *future* acquires too, even while
    // permits are still nominally available.
    #[tokio::test]
    async fn close_rejects_future_acquires() {
        let p = RecoveryPermitter::new(1);
        p.close();
        assert!(p.acquire().await.is_none());
        assert!(matches!(p.try_acquire(), Err(TryAcquireError::Closed)));
    }

    #[test]
    #[should_panic]
    fn zero_capacity_panics() {
        let _ = RecoveryPermitter::new(0);
    }
}