Skip to main content

monocoque_core/
backpressure.rs

1//! Backpressure: `BytePermits`
2//!
3//! Byte-based flow control for write pumps.
4//!
5//! Design principle:
6//! - Backpressure scales with **bytes**, not message count
7//! - One giant message should not starve other connections
8//! - Pluggable: `NoOp` (default) → Semaphore → dynamic policy
9//!
10//! Usage:
11//! ```rust,ignore
12//! let permits = SemaphorePermits::new(10 * 1024 * 1024); // 10MB limit
13//! let permit = permits.acquire(n_bytes).await;
14//! writer.write(buf).await;
15//! drop(permit); // releases automatically
16//! ```
17
18use async_trait::async_trait;
19use parking_lot::{Condvar, Mutex};
20use std::sync::Arc;
21
22/// Backpressure permit trait.
23///
24/// Implementations control write pump flow based on byte counts.
25#[async_trait]
26pub trait BytePermits: Send + Sync {
27    /// Acquire permission to write `n_bytes`.
28    ///
29    /// This may block if the system is under memory pressure.
30    async fn acquire(&self, n_bytes: usize) -> Permit;
31}
32
/// Mutable bookkeeping shared by a byte semaphore and its outstanding permits.
struct SemInner {
    /// Upper bound of the budget. Oversized requests are clamped to this value
    /// so that they can always be satisfied eventually instead of deadlocking.
    max_bytes: usize,
    /// Bytes that are currently unclaimed and may be handed out.
    available: usize,
}
39
40/// RAII permit guard.
41///
42/// Releases the permit when dropped.
43pub struct Permit {
44    inner: Option<PermitInner>,
45}
46
47enum PermitInner {
48    /// Byte-counting semaphore backed by parking_lot primitives (usable in drop).
49    ByteSem(Arc<(Mutex<SemInner>, Condvar)>, usize),
50    NoOp,
51}
52
53impl Drop for Permit {
54    fn drop(&mut self) {
55        match self.inner.take() {
56            Some(PermitInner::ByteSem(inner, n_bytes)) => {
57                let (mutex, condvar) = &*inner;
58                let mut guard = mutex.lock();
59                guard.available += n_bytes;
60                condvar.notify_all();
61            }
62            Some(PermitInner::NoOp) | None => {}
63        }
64    }
65}
66
67impl Permit {
68    pub(crate) const fn noop() -> Self {
69        Self {
70            inner: Some(PermitInner::NoOp),
71        }
72    }
73
74    fn byte_sem(inner: Arc<(Mutex<SemInner>, Condvar)>, n_bytes: usize) -> Self {
75        Self {
76            inner: Some(PermitInner::ByteSem(inner, n_bytes)),
77        }
78    }
79}
80
/// Backpressure policy that never applies any backpressure (Phase 0).
///
/// Every request is granted on the spot. Intended as the starting point:
/// keep it until memory pressure actually becomes a problem.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoOpPermits;
87
88#[async_trait]
89impl BytePermits for NoOpPermits {
90    async fn acquire(&self, _n_bytes: usize) -> Permit {
91        Permit::noop()
92    }
93}
94
95/// Semaphore-based backpressure implementation.
96///
97/// Enforces a maximum number of bytes that can be buffered at once.
98/// When the limit is reached, `acquire()` will block until space is available.
99/// Acquires all N bytes in a single atomic operation (O(1), not O(N)).
100///
101/// # Example
102///
103/// ```
104/// use monocoque_core::backpressure::{BytePermits, SemaphorePermits};
105///
106/// # compio::runtime::Runtime::new().unwrap().block_on(async {
107/// // Allow up to 10MB of buffered data
108/// let permits = SemaphorePermits::new(10 * 1024 * 1024);
109///
110/// // Acquire permit for 1KB write
111/// let permit = permits.acquire(1024).await;
112/// // ... perform write ...
113/// drop(permit); // releases 1024 bytes back to the pool
114/// # });
115/// ```
116#[derive(Clone)]
117pub struct SemaphorePermits {
118    inner: Arc<(Mutex<SemInner>, Condvar)>,
119}
120
121impl SemaphorePermits {
122    /// Create a new semaphore-based backpressure controller.
123    ///
124    /// # Arguments
125    ///
126    /// * `max_bytes` - Maximum number of bytes that can be buffered
127    #[must_use]
128    pub fn new(max_bytes: usize) -> Self {
129        Self {
130            inner: Arc::new((
131                Mutex::new(SemInner {
132                    available: max_bytes,
133                    max_bytes,
134                }),
135                Condvar::new(),
136            )),
137        }
138    }
139}
140
141#[async_trait]
142impl BytePermits for SemaphorePermits {
143    async fn acquire(&self, n_bytes: usize) -> Permit {
144        if n_bytes == 0 {
145            return Permit::noop();
146        }
147
148        // Blocking wait is performed on a dedicated thread so we don't block
149        // the async executor. parking_lot::Condvar::wait is synchronous and
150        // safe to use here because SemInner uses parking_lot::Mutex.
151        let inner = self.inner.clone();
152        let actual = compio::runtime::spawn_blocking(move || {
153            let (mutex, condvar) = &*inner;
154            let mut guard = mutex.lock();
155            // Clamp to max_bytes so a single oversized message never deadlocks:
156            // the message will consume the entire capacity instead of waiting
157            // forever for capacity that can never exist.
158            let claim = n_bytes.min(guard.max_bytes);
159            // Wait until enough capacity is available.
160            while guard.available < claim {
161                condvar.wait(&mut guard);
162            }
163            guard.available -= claim;
164            // Return the actual bytes claimed so the Permit releases the right amount.
165            claim
166        })
167        .await;
168
169        Permit::byte_sem(self.inner.clone(), actual)
170    }
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes currently unclaimed in `permits`.
    ///
    /// Reads the private state directly so the tests can check the byte
    /// accounting itself, not merely that `acquire` returns.
    fn available(permits: &SemaphorePermits) -> usize {
        permits.inner.0.lock().available
    }

    #[test]
    fn noop_permits_always_succeed() {
        let permits = NoOpPermits;
        let rt = compio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            // Neither call may block, whatever the requested size.
            let p1 = permits.acquire(1024).await;
            let p2 = permits.acquire(1_000_000).await;
            assert!(matches!(&p1.inner, Some(PermitInner::NoOp)));
            assert!(matches!(&p2.inner, Some(PermitInner::NoOp)));
        });
    }

    #[test]
    fn semaphore_permits_enforce_limit() {
        let permits = SemaphorePermits::new(1024);
        let rt = compio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            // Taking the whole budget leaves nothing behind.
            let p1 = permits.acquire(1024).await;
            assert_eq!(available(&permits), 0);

            // Another acquire would block at this point, so release first.
            drop(p1);
            assert_eq!(available(&permits), 1024);

            // Two halves together use up the budget again.
            let _p2 = permits.acquire(512).await;
            assert_eq!(available(&permits), 512);
            let _p3 = permits.acquire(512).await;
            assert_eq!(available(&permits), 0);
        });
    }

    #[test]
    fn semaphore_permits_release_on_drop() {
        let permits = SemaphorePermits::new(1000);
        let rt = compio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            {
                let _p1 = permits.acquire(500).await;
                let _p2 = permits.acquire(500).await;
                // Full capacity used
                assert_eq!(available(&permits), 0);
            } // Permits dropped here

            assert_eq!(available(&permits), 1000);

            // The whole budget can be taken again after the drops.
            let _p3 = permits.acquire(1000).await;
            assert_eq!(available(&permits), 0);
        });
    }

    #[test]
    fn semaphore_permits_oversized_acquire_does_not_deadlock() {
        // A single acquire larger than max_bytes must complete (clamped to
        // max_bytes) rather than wait forever for capacity that cannot exist.
        let permits = SemaphorePermits::new(1024);
        let rt = compio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            let permit = permits.acquire(2048).await; // 2x max, must not deadlock
            assert_eq!(available(&permits), 0);

            // Only the clamped amount is handed back, never the requested one.
            drop(permit);
            assert_eq!(available(&permits), 1024);

            // After the release, the full budget can be acquired again.
            let _p = permits.acquire(1024).await;
            assert_eq!(available(&permits), 0);
        });
    }

    #[test]
    fn semaphore_permits_zero_byte_acquire_is_noop() {
        let permits = SemaphorePermits::new(16);
        let rt = compio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            // Exhaust the budget; a zero-byte request must still go through.
            let _all = permits.acquire(16).await;
            let none = permits.acquire(0).await;
            assert!(matches!(&none.inner, Some(PermitInner::NoOp)));
            assert_eq!(available(&permits), 0);

            // Dropping the zero-byte permit must not touch the accounting.
            drop(none);
            assert_eq!(available(&permits), 0);
        });
    }

    #[test]
    fn semaphore_permits_single_atomic_acquire() {
        // A large block is claimed in one step, so exactly the requested
        // amount must be missing from the budget afterwards.
        let permits = SemaphorePermits::new(1024 * 1024); // 1MB
        let rt = compio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            let permit = permits.acquire(512 * 1024).await; // 512KB
            assert_eq!(available(&permits), 512 * 1024);
            drop(permit);
            assert_eq!(available(&permits), 1024 * 1024);
        });
    }
}