procwire_client/backpressure.rs

//! Backpressure handling for write operations.
//!
//! This module provides backpressure management to prevent memory exhaustion
//! when the write side is slower than the producer side. It tracks pending
//! frames and provides mechanisms to limit the number of in-flight writes.
//!
//! # Usage
//!
//! The [`BackpressureController`] is used internally by the writer task to
//! track pending writes. Handlers interact with backpressure through the
//! [`WriterHandle`](crate::writer::WriterHandle), which automatically waits
//! or rejects when backpressure is active.
//!
//! # Configuration
//!
//! - `max_pending`: Maximum number of pending frames (default: 1024)
//! - Timeout: How long to wait when backpressure is active (default: 5s)
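//!
//! # Example
//!
//! A minimal sketch of driving the controller directly, with crate and module
//! paths assumed (`procwire_client::backpressure`); in normal use the
//! [`WriterHandle`](crate::writer::WriterHandle) performs these steps for you:
//!
//! ```no_run
//! use std::time::Duration;
//! use procwire_client::backpressure::BackpressureController;
//!
//! # async fn example() -> procwire_client::error::Result<()> {
//! // Allow at most 256 in-flight frames, waiting up to 1s for a slot.
//! let ctrl = BackpressureController::with_timeout(256, Duration::from_secs(1));
//!
//! // Reserve a slot before enqueueing a frame; fails with `BackpressureTimeout`
//! // if the queue stays full past the timeout.
//! ctrl.reserve().await?;
//!
//! // ... hand the frame to the writer task ...
//!
//! // Release the slot once the frame has been written.
//! ctrl.release();
//! # Ok(())
//! # }
//! ```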

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::error::{ProcwireError, Result};

/// Default maximum pending frames before backpressure kicks in.
pub const DEFAULT_MAX_PENDING: usize = 1024;

/// Default backpressure timeout (how long to wait for space to become available).
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

/// Interval between backpressure checks.
const CHECK_INTERVAL: Duration = Duration::from_micros(100);

/// Backpressure controller for managing write queue pressure.
///
/// This controller uses atomic operations for lock-free tracking of
/// pending frame counts. It can be shared across multiple threads/tasks.
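///
/// # Example
///
/// A small sketch (module path assumed): clones share the same underlying
/// counter, so a reservation made through one handle is visible to the other.
///
/// ```
/// use procwire_client::backpressure::BackpressureController;
///
/// let ctrl = BackpressureController::new(4);
/// let shared = ctrl.clone();
///
/// ctrl.try_reserve().unwrap();
/// assert_eq!(shared.pending_count(), 1);
/// assert_eq!(shared.available_capacity(), 3);
/// ```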
#[derive(Debug)]
pub struct BackpressureController {
    /// Current pending frame count.
    pending: Arc<AtomicUsize>,
    /// Maximum allowed pending frames.
    max_pending: usize,
    /// Timeout for waiting on backpressure.
    timeout: Duration,
}

impl BackpressureController {
    /// Create a new backpressure controller with specified limit.
    pub fn new(max_pending: usize) -> Self {
        Self {
            pending: Arc::new(AtomicUsize::new(0)),
            max_pending,
            timeout: DEFAULT_TIMEOUT,
        }
    }

    /// Create a controller with custom timeout.
    pub fn with_timeout(max_pending: usize, timeout: Duration) -> Self {
        Self {
            pending: Arc::new(AtomicUsize::new(0)),
            max_pending,
            timeout,
        }
    }

    /// Create from an existing atomic counter (for sharing with writer task).
    pub fn from_shared(pending: Arc<AtomicUsize>, max_pending: usize, timeout: Duration) -> Self {
        Self {
            pending,
            max_pending,
            timeout,
        }
    }

    /// Get a clone of the pending counter Arc (for sharing).
    pub fn pending_counter(&self) -> Arc<AtomicUsize> {
        self.pending.clone()
    }

    /// Check if we can accept more frames without blocking.
    #[inline]
    pub fn can_accept(&self) -> bool {
        self.pending.load(Ordering::Acquire) < self.max_pending
    }

    /// Check if backpressure is currently active.
    #[inline]
    pub fn is_active(&self) -> bool {
        self.pending.load(Ordering::Acquire) >= self.max_pending
    }

    /// Get current pending count.
    #[inline]
    pub fn pending_count(&self) -> usize {
        self.pending.load(Ordering::Acquire)
    }

    /// Get maximum pending limit.
    #[inline]
    pub fn max_pending(&self) -> usize {
        self.max_pending
    }

    /// Get available capacity.
    #[inline]
    pub fn available_capacity(&self) -> usize {
        let current = self.pending.load(Ordering::Acquire);
        self.max_pending.saturating_sub(current)
    }

    /// Try to reserve a slot without blocking.
    ///
    /// Returns `Ok(())` if reserved, `Err(BackpressureTimeout)` if at capacity.
    pub fn try_reserve(&self) -> Result<()> {
        let current = self.pending.load(Ordering::Acquire);
        if current >= self.max_pending {
            return Err(ProcwireError::BackpressureTimeout);
        }

        // Note: the load above and this fetch_add are not one atomic step, so the
        // count can briefly overshoot `max_pending` under contention (soft limit).
        self.pending.fetch_add(1, Ordering::AcqRel);
        Ok(())
    }

    /// Reserve a slot, waiting if necessary.
    ///
    /// Returns `Err(BackpressureTimeout)` if the timeout is reached.
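    ///
    /// # Example
    ///
    /// A sketch of handling the timeout (crate paths assumed):
    ///
    /// ```no_run
    /// # use procwire_client::backpressure::BackpressureController;
    /// # async fn enqueue_frame(ctrl: &BackpressureController) {
    /// if ctrl.reserve().await.is_err() {
    ///     // Backpressure did not clear within the timeout: shed load,
    ///     // retry later, or surface the error to the caller.
    /// }
    /// # }
    /// ```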
    pub async fn reserve(&self) -> Result<()> {
        // Fast path: try immediate reservation
        if self.pending.load(Ordering::Acquire) < self.max_pending {
            self.pending.fetch_add(1, Ordering::AcqRel);
            return Ok(());
        }

        // Slow path: wait for space
        self.wait_and_reserve().await
    }

    /// Wait for backpressure to clear and then reserve.
    async fn wait_and_reserve(&self) -> Result<()> {
        let start = Instant::now();

        loop {
            let current = self.pending.load(Ordering::Acquire);
            if current < self.max_pending {
                self.pending.fetch_add(1, Ordering::AcqRel);
                return Ok(());
            }

            if start.elapsed() > self.timeout {
                return Err(ProcwireError::BackpressureTimeout);
            }

            tokio::time::sleep(CHECK_INTERVAL).await;
        }
    }

    /// Release a slot (called after frame is written).
    #[inline]
    pub fn release(&self) {
        self.pending.fetch_sub(1, Ordering::Release);
    }

    /// Release multiple slots at once (for batch writes).
    #[inline]
    pub fn release_many(&self, count: usize) {
        self.pending.fetch_sub(count, Ordering::Release);
    }

    /// Reset the pending count (use with caution).
    pub fn reset(&self) {
        self.pending.store(0, Ordering::Release);
    }
}

impl Default for BackpressureController {
    fn default() -> Self {
        Self::new(DEFAULT_MAX_PENDING)
    }
}

impl Clone for BackpressureController {
    fn clone(&self) -> Self {
        Self {
            pending: self.pending.clone(),
            max_pending: self.max_pending,
            timeout: self.timeout,
        }
    }
}

/// Guard that automatically releases a backpressure slot on drop.
///
/// Useful for RAII-style backpressure management.
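///
/// # Example
///
/// A sketch (module path assumed): the slot reserved below is released
/// automatically when the guard is dropped at the end of the scope.
///
/// ```
/// use procwire_client::backpressure::{BackpressureController, BackpressureGuard};
///
/// let ctrl = BackpressureController::new(4);
/// ctrl.try_reserve().unwrap();
///
/// {
///     let _guard = BackpressureGuard::new(ctrl.clone());
///     // ... write the frame ...
/// } // guard dropped here, slot released
///
/// assert_eq!(ctrl.pending_count(), 0);
/// ```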
pub struct BackpressureGuard {
    controller: BackpressureController,
    released: bool,
}

impl BackpressureGuard {
    /// Create a guard that will release on drop.
    pub fn new(controller: BackpressureController) -> Self {
        Self {
            controller,
            released: false,
        }
    }

    /// Manually release the slot.
    pub fn release(mut self) {
        if !self.released {
            self.controller.release();
            self.released = true;
        }
    }

    /// Disarm the guard (don't release on drop).
    ///
    /// Use this when the slot will be released by another mechanism.
    pub fn disarm(&mut self) {
        self.released = true;
    }
}

impl Drop for BackpressureGuard {
    fn drop(&mut self) {
        if !self.released {
            self.controller.release();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_controller_creation() {
        let ctrl = BackpressureController::new(100);
        assert_eq!(ctrl.max_pending(), 100);
        assert_eq!(ctrl.pending_count(), 0);
        assert!(ctrl.can_accept());
        assert!(!ctrl.is_active());
    }

    #[test]
    fn test_controller_default() {
        let ctrl = BackpressureController::default();
        assert_eq!(ctrl.max_pending(), DEFAULT_MAX_PENDING);
    }

    #[test]
    fn test_try_reserve_success() {
        let ctrl = BackpressureController::new(10);

        for _ in 0..10 {
            assert!(ctrl.try_reserve().is_ok());
        }

        assert_eq!(ctrl.pending_count(), 10);
        assert!(ctrl.is_active());
    }

    #[test]
    fn test_try_reserve_at_capacity() {
        let ctrl = BackpressureController::new(5);

        // Fill to capacity
        for _ in 0..5 {
            ctrl.try_reserve().unwrap();
        }

        // Should fail now
        let result = ctrl.try_reserve();
        assert!(matches!(result, Err(ProcwireError::BackpressureTimeout)));
    }

    #[test]
    fn test_release() {
        let ctrl = BackpressureController::new(10);

        ctrl.try_reserve().unwrap();
        ctrl.try_reserve().unwrap();
        assert_eq!(ctrl.pending_count(), 2);

        ctrl.release();
        assert_eq!(ctrl.pending_count(), 1);

        ctrl.release();
        assert_eq!(ctrl.pending_count(), 0);
    }

    #[test]
    fn test_release_many() {
        let ctrl = BackpressureController::new(100);

        for _ in 0..50 {
            ctrl.try_reserve().unwrap();
        }
        assert_eq!(ctrl.pending_count(), 50);

        ctrl.release_many(30);
        assert_eq!(ctrl.pending_count(), 20);
    }

    #[test]
    fn test_available_capacity() {
        let ctrl = BackpressureController::new(100);

        assert_eq!(ctrl.available_capacity(), 100);

        ctrl.try_reserve().unwrap();
        assert_eq!(ctrl.available_capacity(), 99);

        for _ in 0..50 {
            ctrl.try_reserve().unwrap();
        }
        assert_eq!(ctrl.available_capacity(), 49);
    }

    #[test]
    fn test_clone_shares_state() {
        let ctrl1 = BackpressureController::new(10);
        let ctrl2 = ctrl1.clone();

        ctrl1.try_reserve().unwrap();
        assert_eq!(ctrl2.pending_count(), 1);

        ctrl2.try_reserve().unwrap();
        assert_eq!(ctrl1.pending_count(), 2);
    }

    #[test]
    fn test_from_shared() {
        let pending = Arc::new(AtomicUsize::new(5));
        let ctrl = BackpressureController::from_shared(pending.clone(), 10, Duration::from_secs(1));

        assert_eq!(ctrl.pending_count(), 5);
        assert!(!ctrl.is_active());

        pending.store(10, Ordering::SeqCst);
        assert!(ctrl.is_active());
    }

    #[tokio::test]
    async fn test_reserve_immediate() {
        let ctrl = BackpressureController::new(10);

        ctrl.reserve().await.unwrap();
        assert_eq!(ctrl.pending_count(), 1);
    }

    #[tokio::test]
    async fn test_reserve_timeout() {
        let ctrl = BackpressureController::with_timeout(1, Duration::from_millis(10));

        // Fill to capacity
        ctrl.try_reserve().unwrap();

        // Should timeout
        let start = Instant::now();
        let result = ctrl.reserve().await;
        let elapsed = start.elapsed();

        assert!(matches!(result, Err(ProcwireError::BackpressureTimeout)));
        assert!(elapsed >= Duration::from_millis(10));
    }

    #[tokio::test]
    async fn test_reserve_wait_success() {
        let ctrl = BackpressureController::with_timeout(1, Duration::from_secs(1));

        // Fill to capacity
        ctrl.try_reserve().unwrap();

        // Spawn task to release after delay
        let ctrl_clone = ctrl.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ctrl_clone.release();
        });

        // Should succeed after release
        let result = ctrl.reserve().await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_reset() {
        let ctrl = BackpressureController::new(100);

        for _ in 0..50 {
            ctrl.try_reserve().unwrap();
        }
        assert_eq!(ctrl.pending_count(), 50);

        ctrl.reset();
        assert_eq!(ctrl.pending_count(), 0);
    }

    #[test]
    fn test_guard_release_on_drop() {
        let ctrl = BackpressureController::new(10);
        ctrl.try_reserve().unwrap();

        {
            let _guard = BackpressureGuard::new(ctrl.clone());
            assert_eq!(ctrl.pending_count(), 1);
        }

        // Should be released after guard drops
        assert_eq!(ctrl.pending_count(), 0);
    }

    #[test]
    fn test_guard_manual_release() {
        let ctrl = BackpressureController::new(10);
        ctrl.try_reserve().unwrap();

        let guard = BackpressureGuard::new(ctrl.clone());
        assert_eq!(ctrl.pending_count(), 1);

        guard.release();
        assert_eq!(ctrl.pending_count(), 0);
    }

    #[test]
    fn test_guard_disarm() {
        let ctrl = BackpressureController::new(10);
        ctrl.try_reserve().unwrap();

        {
            let mut guard = BackpressureGuard::new(ctrl.clone());
            guard.disarm();
        }

        // Should NOT be released (disarmed)
        assert_eq!(ctrl.pending_count(), 1);
    }
}