procwire_client/
backpressure.rs1use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use crate::error::{ProcwireError, Result};
24
25pub const DEFAULT_MAX_PENDING: usize = 1024;
27
28pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
30
31const CHECK_INTERVAL: Duration = Duration::from_micros(100);
33
34#[derive(Debug)]
39pub struct BackpressureController {
40 pending: Arc<AtomicUsize>,
42 max_pending: usize,
44 timeout: Duration,
46}
47
48impl BackpressureController {
49 pub fn new(max_pending: usize) -> Self {
51 Self {
52 pending: Arc::new(AtomicUsize::new(0)),
53 max_pending,
54 timeout: DEFAULT_TIMEOUT,
55 }
56 }
57
58 pub fn with_timeout(max_pending: usize, timeout: Duration) -> Self {
60 Self {
61 pending: Arc::new(AtomicUsize::new(0)),
62 max_pending,
63 timeout,
64 }
65 }
66
67 pub fn from_shared(pending: Arc<AtomicUsize>, max_pending: usize, timeout: Duration) -> Self {
69 Self {
70 pending,
71 max_pending,
72 timeout,
73 }
74 }
75
76 pub fn pending_counter(&self) -> Arc<AtomicUsize> {
78 self.pending.clone()
79 }
80
81 #[inline]
83 pub fn can_accept(&self) -> bool {
84 self.pending.load(Ordering::Acquire) < self.max_pending
85 }
86
87 #[inline]
89 pub fn is_active(&self) -> bool {
90 self.pending.load(Ordering::Acquire) >= self.max_pending
91 }
92
93 #[inline]
95 pub fn pending_count(&self) -> usize {
96 self.pending.load(Ordering::Acquire)
97 }
98
99 #[inline]
101 pub fn max_pending(&self) -> usize {
102 self.max_pending
103 }
104
105 #[inline]
107 pub fn available_capacity(&self) -> usize {
108 let current = self.pending.load(Ordering::Acquire);
109 self.max_pending.saturating_sub(current)
110 }
111
112 pub fn try_reserve(&self) -> Result<()> {
116 let current = self.pending.load(Ordering::Acquire);
117 if current >= self.max_pending {
118 return Err(ProcwireError::BackpressureTimeout);
119 }
120
121 self.pending.fetch_add(1, Ordering::AcqRel);
123 Ok(())
124 }
125
126 pub async fn reserve(&self) -> Result<()> {
130 if self.pending.load(Ordering::Acquire) < self.max_pending {
132 self.pending.fetch_add(1, Ordering::AcqRel);
133 return Ok(());
134 }
135
136 self.wait_and_reserve().await
138 }
139
140 async fn wait_and_reserve(&self) -> Result<()> {
142 let start = Instant::now();
143
144 loop {
145 let current = self.pending.load(Ordering::Acquire);
146 if current < self.max_pending {
147 self.pending.fetch_add(1, Ordering::AcqRel);
148 return Ok(());
149 }
150
151 if start.elapsed() > self.timeout {
152 return Err(ProcwireError::BackpressureTimeout);
153 }
154
155 tokio::time::sleep(CHECK_INTERVAL).await;
156 }
157 }
158
159 #[inline]
161 pub fn release(&self) {
162 self.pending.fetch_sub(1, Ordering::Release);
163 }
164
165 #[inline]
167 pub fn release_many(&self, count: usize) {
168 self.pending.fetch_sub(count, Ordering::Release);
169 }
170
171 pub fn reset(&self) {
173 self.pending.store(0, Ordering::Release);
174 }
175}
176
177impl Default for BackpressureController {
178 fn default() -> Self {
179 Self::new(DEFAULT_MAX_PENDING)
180 }
181}
182
183impl Clone for BackpressureController {
184 fn clone(&self) -> Self {
185 Self {
186 pending: self.pending.clone(),
187 max_pending: self.max_pending,
188 timeout: self.timeout,
189 }
190 }
191}
192
193pub struct BackpressureGuard {
197 controller: BackpressureController,
198 released: bool,
199}
200
201impl BackpressureGuard {
202 pub fn new(controller: BackpressureController) -> Self {
204 Self {
205 controller,
206 released: false,
207 }
208 }
209
210 pub fn release(mut self) {
212 if !self.released {
213 self.controller.release();
214 self.released = true;
215 }
216 }
217
218 pub fn disarm(&mut self) {
222 self.released = true;
223 }
224}
225
226impl Drop for BackpressureGuard {
227 fn drop(&mut self) {
228 if !self.released {
229 self.controller.release();
230 }
231 }
232}
233
234#[cfg(test)]
235mod tests {
236 use super::*;
237
238 #[test]
239 fn test_controller_creation() {
240 let ctrl = BackpressureController::new(100);
241 assert_eq!(ctrl.max_pending(), 100);
242 assert_eq!(ctrl.pending_count(), 0);
243 assert!(ctrl.can_accept());
244 assert!(!ctrl.is_active());
245 }
246
247 #[test]
248 fn test_controller_default() {
249 let ctrl = BackpressureController::default();
250 assert_eq!(ctrl.max_pending(), DEFAULT_MAX_PENDING);
251 }
252
253 #[test]
254 fn test_try_reserve_success() {
255 let ctrl = BackpressureController::new(10);
256
257 for _ in 0..10 {
258 assert!(ctrl.try_reserve().is_ok());
259 }
260
261 assert_eq!(ctrl.pending_count(), 10);
262 assert!(ctrl.is_active());
263 }
264
265 #[test]
266 fn test_try_reserve_at_capacity() {
267 let ctrl = BackpressureController::new(5);
268
269 for _ in 0..5 {
271 ctrl.try_reserve().unwrap();
272 }
273
274 let result = ctrl.try_reserve();
276 assert!(matches!(result, Err(ProcwireError::BackpressureTimeout)));
277 }
278
279 #[test]
280 fn test_release() {
281 let ctrl = BackpressureController::new(10);
282
283 ctrl.try_reserve().unwrap();
284 ctrl.try_reserve().unwrap();
285 assert_eq!(ctrl.pending_count(), 2);
286
287 ctrl.release();
288 assert_eq!(ctrl.pending_count(), 1);
289
290 ctrl.release();
291 assert_eq!(ctrl.pending_count(), 0);
292 }
293
294 #[test]
295 fn test_release_many() {
296 let ctrl = BackpressureController::new(100);
297
298 for _ in 0..50 {
299 ctrl.try_reserve().unwrap();
300 }
301 assert_eq!(ctrl.pending_count(), 50);
302
303 ctrl.release_many(30);
304 assert_eq!(ctrl.pending_count(), 20);
305 }
306
307 #[test]
308 fn test_available_capacity() {
309 let ctrl = BackpressureController::new(100);
310
311 assert_eq!(ctrl.available_capacity(), 100);
312
313 ctrl.try_reserve().unwrap();
314 assert_eq!(ctrl.available_capacity(), 99);
315
316 for _ in 0..50 {
317 ctrl.try_reserve().unwrap();
318 }
319 assert_eq!(ctrl.available_capacity(), 49);
320 }
321
322 #[test]
323 fn test_clone_shares_state() {
324 let ctrl1 = BackpressureController::new(10);
325 let ctrl2 = ctrl1.clone();
326
327 ctrl1.try_reserve().unwrap();
328 assert_eq!(ctrl2.pending_count(), 1);
329
330 ctrl2.try_reserve().unwrap();
331 assert_eq!(ctrl1.pending_count(), 2);
332 }
333
334 #[test]
335 fn test_from_shared() {
336 let pending = Arc::new(AtomicUsize::new(5));
337 let ctrl = BackpressureController::from_shared(pending.clone(), 10, Duration::from_secs(1));
338
339 assert_eq!(ctrl.pending_count(), 5);
340 assert!(!ctrl.is_active());
341
342 pending.store(10, Ordering::SeqCst);
343 assert!(ctrl.is_active());
344 }
345
346 #[tokio::test]
347 async fn test_reserve_immediate() {
348 let ctrl = BackpressureController::new(10);
349
350 ctrl.reserve().await.unwrap();
351 assert_eq!(ctrl.pending_count(), 1);
352 }
353
354 #[tokio::test]
355 async fn test_reserve_timeout() {
356 let ctrl = BackpressureController::with_timeout(1, Duration::from_millis(10));
357
358 ctrl.try_reserve().unwrap();
360
361 let start = Instant::now();
363 let result = ctrl.reserve().await;
364 let elapsed = start.elapsed();
365
366 assert!(matches!(result, Err(ProcwireError::BackpressureTimeout)));
367 assert!(elapsed >= Duration::from_millis(10));
368 }
369
370 #[tokio::test]
371 async fn test_reserve_wait_success() {
372 let ctrl = BackpressureController::with_timeout(1, Duration::from_secs(1));
373
374 ctrl.try_reserve().unwrap();
376
377 let ctrl_clone = ctrl.clone();
379 tokio::spawn(async move {
380 tokio::time::sleep(Duration::from_millis(10)).await;
381 ctrl_clone.release();
382 });
383
384 let result = ctrl.reserve().await;
386 assert!(result.is_ok());
387 }
388
389 #[test]
390 fn test_reset() {
391 let ctrl = BackpressureController::new(100);
392
393 for _ in 0..50 {
394 ctrl.try_reserve().unwrap();
395 }
396 assert_eq!(ctrl.pending_count(), 50);
397
398 ctrl.reset();
399 assert_eq!(ctrl.pending_count(), 0);
400 }
401
402 #[test]
403 fn test_guard_release_on_drop() {
404 let ctrl = BackpressureController::new(10);
405 ctrl.try_reserve().unwrap();
406
407 {
408 let _guard = BackpressureGuard::new(ctrl.clone());
409 assert_eq!(ctrl.pending_count(), 1);
410 }
411
412 assert_eq!(ctrl.pending_count(), 0);
414 }
415
416 #[test]
417 fn test_guard_manual_release() {
418 let ctrl = BackpressureController::new(10);
419 ctrl.try_reserve().unwrap();
420
421 let guard = BackpressureGuard::new(ctrl.clone());
422 assert_eq!(ctrl.pending_count(), 1);
423
424 guard.release();
425 assert_eq!(ctrl.pending_count(), 0);
426 }
427
428 #[test]
429 fn test_guard_disarm() {
430 let ctrl = BackpressureController::new(10);
431 ctrl.try_reserve().unwrap();
432
433 {
434 let mut guard = BackpressureGuard::new(ctrl.clone());
435 guard.disarm();
436 }
437
438 assert_eq!(ctrl.pending_count(), 1);
440 }
441}