// monocoque_core/backpressure.rs
//! Backpressure: `BytePermits`
//!
//! Byte-based flow control for write pumps.
//!
//! Design principles:
//! - Backpressure scales with **bytes**, not message count
//! - One giant message should not starve other connections
//! - Pluggable: `NoOp` (default) → Semaphore → dynamic policy
//!
//! Usage:
//! ```rust,ignore
//! let permits = SemaphorePermits::new(10 * 1024 * 1024); // 10 MB limit
//! let permit = permits.acquire(n_bytes).await;
//! writer.write(buf).await;
//! drop(permit); // returns the bytes to the pool; also happens when `permit` goes out of scope
//! ```
17
18use async_trait::async_trait;
19use parking_lot::{Condvar, Mutex};
20use std::sync::Arc;
21
22/// Backpressure permit trait.
23///
24/// Implementations control write pump flow based on byte counts.
25#[async_trait]
26pub trait BytePermits: Send + Sync {
27 /// Acquire permission to write `n_bytes`.
28 ///
29 /// This may block if the system is under memory pressure.
30 async fn acquire(&self, n_bytes: usize) -> Permit;
31}
32
/// Bookkeeping shared by all handles and permits of one byte semaphore.
struct SemInner {
    /// Bytes that can currently be claimed.
    available: usize,
    /// Configured ceiling. Requests above it are clamped to this value so an
    /// oversized acquire can still be satisfied instead of waiting forever.
    max_bytes: usize,
}
39
40/// RAII permit guard.
41///
42/// Releases the permit when dropped.
43pub struct Permit {
44 inner: Option<PermitInner>,
45}
46
47enum PermitInner {
48 /// Byte-counting semaphore backed by parking_lot primitives (usable in drop).
49 ByteSem(Arc<(Mutex<SemInner>, Condvar)>, usize),
50 NoOp,
51}
52
53impl Drop for Permit {
54 fn drop(&mut self) {
55 match self.inner.take() {
56 Some(PermitInner::ByteSem(inner, n_bytes)) => {
57 let (mutex, condvar) = &*inner;
58 let mut guard = mutex.lock();
59 guard.available += n_bytes;
60 condvar.notify_all();
61 }
62 Some(PermitInner::NoOp) | None => {}
63 }
64 }
65}
66
67impl Permit {
68 pub(crate) const fn noop() -> Self {
69 Self {
70 inner: Some(PermitInner::NoOp),
71 }
72 }
73
74 fn byte_sem(inner: Arc<(Mutex<SemInner>, Condvar)>, n_bytes: usize) -> Self {
75 Self {
76 inner: Some(PermitInner::ByteSem(inner, n_bytes)),
77 }
78 }
79}
80
/// Backpressure policy that never applies backpressure (Phase 0).
///
/// Every request is granted at once. Intended as the default until memory
/// pressure becomes a real concern.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoOpPermits;
87
88#[async_trait]
89impl BytePermits for NoOpPermits {
90 async fn acquire(&self, _n_bytes: usize) -> Permit {
91 Permit::noop()
92 }
93}
94
95/// Semaphore-based backpressure implementation.
96///
97/// Enforces a maximum number of bytes that can be buffered at once.
98/// When the limit is reached, `acquire()` will block until space is available.
99/// Acquires all N bytes in a single atomic operation (O(1), not O(N)).
100///
101/// # Example
102///
103/// ```
104/// use monocoque_core::backpressure::{BytePermits, SemaphorePermits};
105///
106/// # compio::runtime::Runtime::new().unwrap().block_on(async {
107/// // Allow up to 10MB of buffered data
108/// let permits = SemaphorePermits::new(10 * 1024 * 1024);
109///
110/// // Acquire permit for 1KB write
111/// let permit = permits.acquire(1024).await;
112/// // ... perform write ...
113/// drop(permit); // releases 1024 bytes back to the pool
114/// # });
115/// ```
116#[derive(Clone)]
117pub struct SemaphorePermits {
118 inner: Arc<(Mutex<SemInner>, Condvar)>,
119}
120
121impl SemaphorePermits {
122 /// Create a new semaphore-based backpressure controller.
123 ///
124 /// # Arguments
125 ///
126 /// * `max_bytes` - Maximum number of bytes that can be buffered
127 #[must_use]
128 pub fn new(max_bytes: usize) -> Self {
129 Self {
130 inner: Arc::new((
131 Mutex::new(SemInner {
132 available: max_bytes,
133 max_bytes,
134 }),
135 Condvar::new(),
136 )),
137 }
138 }
139}
140
141#[async_trait]
142impl BytePermits for SemaphorePermits {
143 async fn acquire(&self, n_bytes: usize) -> Permit {
144 if n_bytes == 0 {
145 return Permit::noop();
146 }
147
148 // Blocking wait is performed on a dedicated thread so we don't block
149 // the async executor. parking_lot::Condvar::wait is synchronous and
150 // safe to use here because SemInner uses parking_lot::Mutex.
151 let inner = self.inner.clone();
152 let actual = compio::runtime::spawn_blocking(move || {
153 let (mutex, condvar) = &*inner;
154 let mut guard = mutex.lock();
155 // Clamp to max_bytes so a single oversized message never deadlocks:
156 // the message will consume the entire capacity instead of waiting
157 // forever for capacity that can never exist.
158 let claim = n_bytes.min(guard.max_bytes);
159 // Wait until enough capacity is available.
160 while guard.available < claim {
161 condvar.wait(&mut guard);
162 }
163 guard.available -= claim;
164 // Return the actual bytes claimed so the Permit releases the right amount.
165 claim
166 })
167 .await;
168
169 Permit::byte_sem(self.inner.clone(), actual)
170 }
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes currently unclaimed in `permits`; lets the tests check the
    /// accounting instead of merely checking that nothing hangs.
    fn available(permits: &SemaphorePermits) -> usize {
        permits.inner.0.lock().available
    }

    #[test]
    fn noop_permits_always_succeed() {
        let permits = NoOpPermits;
        let rt = compio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            // Neither call may block, regardless of size.
            let _p1 = permits.acquire(1024).await;
            let _p2 = permits.acquire(1_000_000).await;
        });
    }

    #[test]
    fn semaphore_permits_enforce_limit() {
        let permits = SemaphorePermits::new(1024);
        let rt = compio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            // Taking the full capacity empties the pool.
            let p1 = permits.acquire(1024).await;
            assert_eq!(available(&permits), 0);

            // A further acquire would block here, so release first and check
            // that the capacity comes back.
            drop(p1);
            assert_eq!(available(&permits), 1024);

            let _p2 = permits.acquire(512).await;
            assert_eq!(available(&permits), 512);
            let _p3 = permits.acquire(512).await;
            assert_eq!(available(&permits), 0);
        });
    }

    #[test]
    fn semaphore_permits_release_on_drop() {
        let permits = SemaphorePermits::new(1000);
        let rt = compio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            {
                let _p1 = permits.acquire(500).await;
                let _p2 = permits.acquire(500).await;
                // Full capacity used.
                assert_eq!(available(&permits), 0);
            } // Both permits dropped here.
            assert_eq!(available(&permits), 1000);

            // The whole pool can be claimed again after the drop.
            let _p3 = permits.acquire(1000).await;
            assert_eq!(available(&permits), 0);
        });
    }

    #[test]
    fn semaphore_permits_oversized_acquire_does_not_deadlock() {
        // A single acquire larger than max_bytes must complete (clamped to
        // max_bytes) rather than wait for capacity that can never exist.
        let permits = SemaphorePermits::new(1024);
        let rt = compio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            let permit = permits.acquire(2048).await; // 2x max: must not deadlock
            assert_eq!(available(&permits), 0);

            // The permit must give back the clamped amount, not the 2048
            // that was asked for, or the pool would grow past its limit.
            drop(permit);
            assert_eq!(available(&permits), 1024);

            let _p = permits.acquire(1024).await;
            assert_eq!(available(&permits), 0);
        });
    }

    #[test]
    fn semaphore_permits_single_atomic_acquire() {
        // A large request is claimed in one step, not byte by byte.
        let capacity = 1024 * 1024; // 1 MB
        let permits = SemaphorePermits::new(capacity);
        let rt = compio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            let permit = permits.acquire(512 * 1024).await; // 512 KB
            assert_eq!(available(&permits), capacity - 512 * 1024);
            drop(permit);
            assert_eq!(available(&permits), capacity);
        });
    }

    #[test]
    fn semaphore_permits_zero_byte_acquire_claims_nothing() {
        let permits = SemaphorePermits::new(16);
        let rt = compio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            let permit = permits.acquire(0).await;
            assert_eq!(available(&permits), 16);
            drop(permit);
            assert_eq!(available(&permits), 16);
        });
    }

    #[test]
    fn semaphore_permits_clones_share_one_pool() {
        let permits = SemaphorePermits::new(100);
        let other = permits.clone();
        let rt = compio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            // A reservation made through one handle is visible through the other.
            let permit = permits.acquire(40).await;
            assert_eq!(available(&other), 60);
            drop(permit);
            assert_eq!(available(&other), 100);
        });
    }
}