1use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
34use std::sync::Arc;
35
36#[derive(Debug, Clone)]
38pub struct BackpressureConfig {
39 pub exclusive_credits: usize,
42
43 pub floating_credits: usize,
46
47 pub overflow_strategy: OverflowStrategy,
49
50 pub high_watermark: f64,
53
54 pub low_watermark: f64,
57}
58
59impl Default for BackpressureConfig {
60 fn default() -> Self {
61 Self {
62 exclusive_credits: 4,
63 floating_credits: 8,
64 overflow_strategy: OverflowStrategy::Block,
65 high_watermark: 0.8,
66 low_watermark: 0.5,
67 }
68 }
69}
70
71impl BackpressureConfig {
72 #[must_use]
74 pub fn builder() -> BackpressureConfigBuilder {
75 BackpressureConfigBuilder::default()
76 }
77
78 #[must_use]
80 pub fn total_credits(&self) -> usize {
81 self.exclusive_credits + self.floating_credits
82 }
83}
84
85#[derive(Debug, Default)]
87pub struct BackpressureConfigBuilder {
88 exclusive_credits: Option<usize>,
89 floating_credits: Option<usize>,
90 overflow_strategy: Option<OverflowStrategy>,
91 high_watermark: Option<f64>,
92 low_watermark: Option<f64>,
93}
94
95impl BackpressureConfigBuilder {
96 #[must_use]
98 pub fn exclusive_credits(mut self, credits: usize) -> Self {
99 self.exclusive_credits = Some(credits);
100 self
101 }
102
103 #[must_use]
105 pub fn floating_credits(mut self, credits: usize) -> Self {
106 self.floating_credits = Some(credits);
107 self
108 }
109
110 #[must_use]
112 pub fn overflow_strategy(mut self, strategy: OverflowStrategy) -> Self {
113 self.overflow_strategy = Some(strategy);
114 self
115 }
116
117 #[must_use]
119 pub fn high_watermark(mut self, watermark: f64) -> Self {
120 self.high_watermark = Some(watermark.clamp(0.0, 1.0));
121 self
122 }
123
124 #[must_use]
126 pub fn low_watermark(mut self, watermark: f64) -> Self {
127 self.low_watermark = Some(watermark.clamp(0.0, 1.0));
128 self
129 }
130
131 #[must_use]
133 pub fn build(self) -> BackpressureConfig {
134 BackpressureConfig {
135 exclusive_credits: self.exclusive_credits.unwrap_or(4).min(u16::MAX as usize),
136 floating_credits: self.floating_credits.unwrap_or(8).min(u16::MAX as usize),
137 overflow_strategy: self.overflow_strategy.unwrap_or(OverflowStrategy::Block),
138 high_watermark: self.high_watermark.unwrap_or(0.8),
139 low_watermark: self.low_watermark.unwrap_or(0.5),
140 }
141 }
142}
143
/// Policy applied by a credit gate when a request exceeds the available credits.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OverflowStrategy {
    /// The failed attempt is reported as `CreditAcquireResult::WouldBlock`,
    /// leaving it to the caller to wait and retry.
    Block,

    /// The failed attempt is counted as dropped items and reported as
    /// `CreditAcquireResult::Dropped`.
    Drop,

    /// NOTE(review): the acquire path in this file treats this variant exactly
    /// like `Block` (it reports `WouldBlock`); confirm whether a distinct
    /// error result was intended.
    Error,
}
159
/// Outcome of a non-blocking attempt to take credits from a gate.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CreditAcquireResult {
    /// The requested credits were deducted from the pool.
    Acquired,
    /// Not enough credits were available and nothing was deducted; the caller
    /// may retry later.
    WouldBlock,
    /// Not enough credits were available and the gate's overflow strategy is
    /// `Drop`, so the request was recorded as dropped.
    Dropped,
}
170
171#[derive(Debug)]
175pub struct CreditGate {
176 available: AtomicI64,
178 max_credits: usize,
180 config: BackpressureConfig,
182 metrics: CreditMetrics,
184}
185
186impl CreditGate {
187 #[must_use]
189 pub fn new(config: BackpressureConfig) -> Self {
190 let max_credits = config.total_credits();
191 Self {
192 #[allow(clippy::cast_possible_wrap)] available: AtomicI64::new(max_credits as i64),
194 max_credits,
195 config,
196 metrics: CreditMetrics::new(),
197 }
198 }
199
200 pub fn try_acquire(&self) -> CreditAcquireResult {
204 self.try_acquire_n(1)
205 }
206
207 pub fn try_acquire_n(&self, n: usize) -> CreditAcquireResult {
209 let n = i64::try_from(n).unwrap_or(i64::MAX);
210
211 let mut current = self.available.load(Ordering::Acquire);
213 loop {
214 if current < n {
215 self.metrics.record_blocked();
217 return match self.config.overflow_strategy {
218 OverflowStrategy::Drop => {
219 self.metrics.record_dropped(u64::try_from(n).unwrap_or(0));
220 CreditAcquireResult::Dropped
221 }
222 OverflowStrategy::Block | OverflowStrategy::Error => {
224 CreditAcquireResult::WouldBlock
225 }
226 };
227 }
228
229 match self.available.compare_exchange_weak(
230 current,
231 current - n,
232 Ordering::AcqRel,
233 Ordering::Acquire,
234 ) {
235 Ok(_) => {
236 self.metrics.record_acquired(u64::try_from(n).unwrap_or(0));
237 return CreditAcquireResult::Acquired;
238 }
239 Err(actual) => current = actual,
240 }
241 }
242 }
243
244 pub fn acquire_blocking(&self, n: usize) {
248 loop {
249 match self.try_acquire_n(n) {
250 CreditAcquireResult::Acquired | CreditAcquireResult::Dropped => return,
252 CreditAcquireResult::WouldBlock => {
253 std::hint::spin_loop();
255 }
256 }
257 }
258 }
259
260 pub fn release(&self, n: usize) {
264 let n = i64::try_from(n).unwrap_or(i64::MAX);
265 let prev = self.available.fetch_add(n, Ordering::Release);
266
267 let new_val = prev + n;
269 if new_val > {
270 #[allow(clippy::cast_possible_wrap)]
271 let max = self.max_credits as i64;
272 max
273 } {
274 let _ = self.available.compare_exchange(
276 new_val,
277 {
278 #[allow(clippy::cast_possible_wrap)]
279 let max = self.max_credits as i64;
280 max
281 },
282 Ordering::AcqRel,
283 Ordering::Relaxed,
284 );
285 }
286
287 self.metrics.record_released(u64::try_from(n).unwrap_or(0));
288 }
289
290 #[must_use]
292 pub fn available(&self) -> usize {
293 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
294 let val = self.available.load(Ordering::Relaxed).max(0) as usize;
296 val
297 }
298
299 #[must_use]
301 pub fn max_credits(&self) -> usize {
302 self.max_credits
303 }
304
305 #[must_use]
307 pub fn is_backpressured(&self) -> bool {
308 #[allow(clippy::cast_precision_loss)] let available = self.available() as f64;
310 #[allow(clippy::cast_precision_loss)] let max = self.max_credits as f64;
312 (available / max) < (1.0 - self.config.high_watermark)
313 }
314
315 #[must_use]
317 pub fn is_recovered(&self) -> bool {
318 #[allow(clippy::cast_precision_loss)] let available = self.available() as f64;
320 #[allow(clippy::cast_precision_loss)] let max = self.max_credits as f64;
322 (available / max) >= (1.0 - self.config.low_watermark)
323 }
324
325 #[must_use]
327 pub fn config(&self) -> &BackpressureConfig {
328 &self.config
329 }
330
331 #[must_use]
333 pub fn metrics(&self) -> &CreditMetrics {
334 &self.metrics
335 }
336
337 pub fn reset(&self) {
339 #[allow(clippy::cast_possible_wrap)] self.available
341 .store(self.max_credits as i64, Ordering::Release);
342 self.metrics.reset();
343 }
344}
345
/// Monotonic activity counters maintained by a credit gate.
///
/// Each counter is an independent atomic, so a set of reads is not guaranteed
/// to reflect one single instant.
#[derive(Debug)]
pub struct CreditMetrics {
    /// Total credits handed out by successful acquisitions.
    credits_acquired: AtomicU64,
    /// Total credits passed to release calls.
    credits_released: AtomicU64,
    /// Number of acquisition attempts that found too few credits.
    times_blocked: AtomicU64,
    /// Total items recorded as dropped.
    items_dropped: AtomicU64,
}
358
359impl CreditMetrics {
360 fn new() -> Self {
362 Self {
363 credits_acquired: AtomicU64::new(0),
364 credits_released: AtomicU64::new(0),
365 times_blocked: AtomicU64::new(0),
366 items_dropped: AtomicU64::new(0),
367 }
368 }
369
370 fn record_acquired(&self, n: u64) {
371 self.credits_acquired.fetch_add(n, Ordering::Relaxed);
372 }
373
374 fn record_released(&self, n: u64) {
375 self.credits_released.fetch_add(n, Ordering::Relaxed);
376 }
377
378 fn record_blocked(&self) {
379 self.times_blocked.fetch_add(1, Ordering::Relaxed);
380 }
381
382 fn record_dropped(&self, n: u64) {
383 self.items_dropped.fetch_add(n, Ordering::Relaxed);
384 }
385
386 fn reset(&self) {
388 self.credits_acquired.store(0, Ordering::Relaxed);
389 self.credits_released.store(0, Ordering::Relaxed);
390 self.times_blocked.store(0, Ordering::Relaxed);
391 self.items_dropped.store(0, Ordering::Relaxed);
392 }
393
394 #[must_use]
396 pub fn credits_acquired(&self) -> u64 {
397 self.credits_acquired.load(Ordering::Relaxed)
398 }
399
400 #[must_use]
402 pub fn credits_released(&self) -> u64 {
403 self.credits_released.load(Ordering::Relaxed)
404 }
405
406 #[must_use]
408 pub fn times_blocked(&self) -> u64 {
409 self.times_blocked.load(Ordering::Relaxed)
410 }
411
412 #[must_use]
414 pub fn items_dropped(&self) -> u64 {
415 self.items_dropped.load(Ordering::Relaxed)
416 }
417
418 #[must_use]
420 pub fn snapshot(&self) -> CreditMetricsSnapshot {
421 CreditMetricsSnapshot {
422 credits_acquired: self.credits_acquired(),
423 credits_released: self.credits_released(),
424 times_blocked: self.times_blocked(),
425 items_dropped: self.items_dropped(),
426 }
427 }
428}
429
/// Plain-data copy of the credit counters taken at some point in time.
#[derive(Debug, Clone, Copy)]
pub struct CreditMetricsSnapshot {
    /// Total credits handed out by successful acquisitions.
    pub credits_acquired: u64,
    /// Total credits passed to release calls.
    pub credits_released: u64,
    /// Number of acquisition attempts that found too few credits.
    pub times_blocked: u64,
    /// Total items recorded as dropped.
    pub items_dropped: u64,
}

impl CreditMetricsSnapshot {
    /// Returns `credits_acquired - credits_released` as a signed value.
    ///
    /// The result is negative when more credits were released than acquired.
    /// The difference is computed in `u64` and then converted, so counters
    /// above `i64::MAX` saturate at `i64::MAX` / `i64::MIN` instead of
    /// wrapping to a bogus value.
    #[must_use]
    pub fn credits_in_flight(&self) -> i64 {
        if self.credits_acquired >= self.credits_released {
            let outstanding = self.credits_acquired - self.credits_released;
            i64::try_from(outstanding).unwrap_or(i64::MAX)
        } else {
            let surplus = self.credits_released - self.credits_acquired;
            // A surplus that does not fit in `i64` saturates at `i64::MIN`.
            i64::try_from(surplus).map_or(i64::MIN, |value| -value)
        }
    }
}
451
452#[derive(Debug)]
457pub struct CreditChannel {
458 gate: Arc<CreditGate>,
460 backlog: Arc<AtomicU64>,
463}
464
465impl CreditChannel {
466 #[must_use]
468 pub fn new(config: BackpressureConfig) -> Self {
469 Self {
470 gate: Arc::new(CreditGate::new(config)),
471 backlog: Arc::new(AtomicU64::new(0)),
472 }
473 }
474
475 #[must_use]
477 pub fn sender(&self) -> CreditSender {
478 CreditSender {
479 gate: Arc::clone(&self.gate),
480 backlog: Arc::clone(&self.backlog),
481 }
482 }
483
484 #[must_use]
486 pub fn receiver(&self) -> CreditReceiver {
487 CreditReceiver {
488 gate: Arc::clone(&self.gate),
489 }
490 }
491
492 #[must_use]
494 pub fn gate(&self) -> &CreditGate {
495 &self.gate
496 }
497}
498
499#[derive(Debug, Clone)]
501pub struct CreditSender {
502 gate: Arc<CreditGate>,
503 backlog: Arc<AtomicU64>,
505}
506
507impl CreditSender {
508 #[must_use]
510 pub fn try_send(&self) -> CreditAcquireResult {
511 self.gate.try_acquire()
512 }
513
514 pub fn send_blocking(&self) {
516 self.gate.acquire_blocking(1);
517 }
518
519 pub fn set_backlog(&self, size: u64) {
521 self.backlog.store(size, Ordering::Relaxed);
522 }
523
524 #[must_use]
526 pub fn backlog(&self) -> u64 {
527 self.backlog.load(Ordering::Relaxed)
528 }
529
530 #[must_use]
532 pub fn available_credits(&self) -> usize {
533 self.gate.available()
534 }
535
536 #[must_use]
538 pub fn is_backpressured(&self) -> bool {
539 self.gate.is_backpressured()
540 }
541}
542
543#[derive(Debug, Clone)]
545pub struct CreditReceiver {
546 gate: Arc<CreditGate>,
547}
548
549impl CreditReceiver {
550 pub fn release(&self, n: usize) {
552 self.gate.release(n);
553 }
554
555 #[must_use]
557 pub fn available_credits(&self) -> usize {
558 self.gate.available()
559 }
560
561 #[must_use]
563 pub fn is_recovered(&self) -> bool {
564 self.gate.is_recovered()
565 }
566
567 #[must_use]
569 pub fn metrics(&self) -> &CreditMetrics {
570 self.gate.metrics()
571 }
572}
573
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a gate with the given pool split and overflow strategy; the
    /// watermarks keep their default values.
    fn gate_with(exclusive: usize, floating: usize, strategy: OverflowStrategy) -> CreditGate {
        CreditGate::new(BackpressureConfig {
            exclusive_credits: exclusive,
            floating_credits: floating,
            overflow_strategy: strategy,
            ..BackpressureConfig::default()
        })
    }

    #[test]
    fn test_credit_gate_basic() {
        let gate = gate_with(2, 2, OverflowStrategy::Block);

        assert_eq!(gate.available(), 4);
        assert_eq!(gate.max_credits(), 4);

        // Drain the whole pool one credit at a time.
        for _ in 0..4 {
            assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
        }
        assert_eq!(gate.available(), 0);

        // An empty pool refuses further requests.
        assert_eq!(gate.try_acquire(), CreditAcquireResult::WouldBlock);

        // Returned credits become available again.
        gate.release(2);
        assert_eq!(gate.available(), 2);

        assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
        assert_eq!(gate.available(), 1);
    }

    #[test]
    fn test_credit_gate_drop_strategy() {
        let gate = gate_with(1, 0, OverflowStrategy::Drop);

        assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
        assert_eq!(gate.try_acquire(), CreditAcquireResult::Dropped);

        assert_eq!(gate.metrics().items_dropped(), 1);
    }

    #[test]
    fn test_credit_gate_batch_acquire() {
        let gate = gate_with(4, 4, OverflowStrategy::Block);

        assert_eq!(gate.try_acquire_n(5), CreditAcquireResult::Acquired);
        assert_eq!(gate.available(), 3);

        // Batches are all-or-nothing: 4 > 3 fails, 3 succeeds.
        assert_eq!(gate.try_acquire_n(4), CreditAcquireResult::WouldBlock);
        assert_eq!(gate.try_acquire_n(3), CreditAcquireResult::Acquired);
        assert_eq!(gate.available(), 0);
    }

    #[test]
    fn test_credit_channel() {
        let channel = CreditChannel::new(BackpressureConfig::default());
        let (sender, receiver) = (channel.sender(), channel.receiver());

        assert_eq!(sender.try_send(), CreditAcquireResult::Acquired);

        sender.set_backlog(10);
        assert_eq!(sender.backlog(), 10);

        // Returning the single credit refills the pool completely.
        receiver.release(1);
        assert_eq!(receiver.available_credits(), channel.gate().max_credits());
    }

    #[test]
    fn test_backpressure_watermarks() {
        let gate = CreditGate::new(BackpressureConfig {
            exclusive_credits: 10,
            floating_credits: 0,
            high_watermark: 0.8,
            low_watermark: 0.5,
            ..BackpressureConfig::default()
        });

        // Full pool: no backpressure, fully recovered.
        assert!(!gate.is_backpressured());
        assert!(gate.is_recovered());

        // Take 9 of 10 credits: 10% available is below the 20% threshold.
        for _ in 0..9 {
            let _ = gate.try_acquire();
        }
        assert!(gate.is_backpressured());
        assert!(!gate.is_recovered());

        // Back to 5 of 10 credits: exactly at the recovery threshold.
        gate.release(4);
        assert!(!gate.is_backpressured());
        assert!(gate.is_recovered());
    }

    #[test]
    fn test_metrics_snapshot() {
        let gate = CreditGate::new(BackpressureConfig::default());

        for _ in 0..2 {
            let _ = gate.try_acquire();
        }
        gate.release(1);

        let snapshot = gate.metrics().snapshot();
        assert_eq!(snapshot.credits_acquired, 2);
        assert_eq!(snapshot.credits_released, 1);
        assert_eq!(snapshot.credits_in_flight(), 1);
    }

    #[test]
    fn test_config_builder() {
        let built = BackpressureConfig::builder()
            .exclusive_credits(8)
            .floating_credits(16)
            .overflow_strategy(OverflowStrategy::Drop)
            .high_watermark(0.9)
            .low_watermark(0.6)
            .build();

        assert_eq!(built.exclusive_credits, 8);
        assert_eq!(built.floating_credits, 16);
        assert_eq!(built.overflow_strategy, OverflowStrategy::Drop);
        assert!((built.high_watermark - 0.9).abs() < f64::EPSILON);
        assert!((built.low_watermark - 0.6).abs() < f64::EPSILON);
        assert_eq!(built.total_credits(), 24);
    }

    #[test]
    fn test_concurrent_acquire_release() {
        use std::sync::Arc;
        use std::thread;

        let gate = Arc::new(gate_with(100, 0, OverflowStrategy::Block));

        let acquiring_gate = Arc::clone(&gate);
        let releasing_gate = Arc::clone(&gate);

        // One thread makes 1000 acquisition attempts and counts the successes.
        let sender = thread::spawn(move || {
            (0..1000)
                .filter(|_| {
                    let granted =
                        acquiring_gate.try_acquire() == CreditAcquireResult::Acquired;
                    std::hint::spin_loop();
                    granted
                })
                .count()
        });

        // The other thread releases 500 credits, one at a time.
        let receiver = thread::spawn(move || {
            let mut released = 0;
            for _ in 0..500 {
                releasing_gate.release(1);
                released += 1;
                std::hint::spin_loop();
            }
            released
        });

        let acquired = sender.join().unwrap();
        let released = receiver.join().unwrap();

        assert!(acquired > 0);
        assert_eq!(released, 500);
    }
}