use serde::Serialize;
use std::sync::{
atomic::{AtomicU32, AtomicU64, Ordering},
Arc,
};
/// Tunable limits that drive the admission decisions of
/// [`BackpressureController`].
///
/// The controller compares the utilization it is given (clamped to
/// `0.0..=1.0`) against the two thresholds, and the current queue depth
/// against `max_queue_depth`.
#[derive(Clone, Debug)]
pub struct BackpressureConfig {
    /// Utilization at or above which a request is queued rather than accepted.
    pub queue_threshold: f64,
    /// Utilization at or above which a request is rejected.
    pub reject_threshold: f64,
    /// Queue depth at or above which a request is rejected, whatever the
    /// utilization.
    pub max_queue_depth: u32,
    /// Base retry hint, in milliseconds, returned with a rejection. The
    /// controller multiplies it by `1 + queue_depth / 10`.
    pub base_retry_after_ms: u64,
}
impl Default for BackpressureConfig {
fn default() -> Self {
BackpressureConfig {
queue_threshold: 0.70,
reject_threshold: 0.90,
max_queue_depth: 100,
base_retry_after_ms: 100,
}
}
}
/// Outcome of a single admission check.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum BackpressureDecision {
    /// The request may proceed immediately.
    Accept,
    /// The request was counted against the queue depth and should wait.
    Queue,
    /// The request was turned away.
    Reject {
        /// Suggested delay, in milliseconds, before the caller retries.
        retry_after_ms: u64,
    },
}
/// Serializable snapshot of a controller's counters, as produced by
/// `BackpressureController::stats`.
#[derive(Clone, Debug, Serialize)]
pub struct BackpressureStats {
    /// Queue depth at the time the snapshot was taken.
    pub queue_depth: u32,
    /// Number of requests that received an `Accept` decision.
    pub total_accepted: u64,
    /// Number of requests that received a `Queue` decision.
    pub total_queued: u64,
    /// Number of requests that received a `Reject` decision.
    pub total_rejected: u64,
    /// `total_rejected` divided by the sum of all three totals; `0.0` when no
    /// decision has been recorded yet.
    pub rejection_rate: f64,
}
/// Admission controller that turns a utilization reading into an
/// accept / queue / reject decision and keeps running totals of each outcome.
///
/// All counters are atomics held behind `Arc`, so cloning the controller
/// yields a handle that shares the same queue depth and totals as the
/// original rather than an independent copy.
#[derive(Clone, Debug)]
pub struct BackpressureController {
    /// Thresholds and limits applied to every decision.
    config: BackpressureConfig,
    /// Number of requests currently counted as queued.
    queue_depth: Arc<AtomicU32>,
    /// Running count of `Accept` decisions.
    total_accepted: Arc<AtomicU64>,
    /// Running count of `Queue` decisions.
    total_queued: Arc<AtomicU64>,
    /// Running count of `Reject` decisions.
    total_rejected: Arc<AtomicU64>,
}
impl BackpressureController {
    /// Creates a controller with an empty queue and all counters at zero.
    pub fn new(config: BackpressureConfig) -> Self {
        Self {
            config,
            queue_depth: Arc::new(AtomicU32::new(0)),
            total_accepted: Arc::new(AtomicU64::new(0)),
            total_queued: Arc::new(AtomicU64::new(0)),
            total_rejected: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Decides what to do with one incoming request given the current
    /// `utilization`, and records the outcome in the matching counter.
    ///
    /// `utilization` is clamped to `0.0..=1.0` before being compared:
    ///
    /// * at or above `reject_threshold`, or when the queue depth has reached
    ///   `max_queue_depth`, the request is rejected with a retry hint of
    ///   `base_retry_after_ms * (1 + depth / 10)` (saturating);
    /// * otherwise, at or above `queue_threshold`, the queue depth is
    ///   incremented and the request is queued;
    /// * otherwise the request is accepted.
    ///
    /// `f64::clamp` passes NaN through and NaN compares false against both
    /// thresholds, so a NaN reading is accepted unless the queue is full.
    pub fn should_accept_request(&self, utilization: f64) -> BackpressureDecision {
        let utilization = utilization.clamp(0.0, 1.0);
        let max_depth = self.config.max_queue_depth;
        let depth = self.queue_depth.load(Ordering::Acquire);

        if utilization >= self.config.reject_threshold || depth >= max_depth {
            return self.reject(depth);
        }

        if utilization >= self.config.queue_threshold {
            // Claim the slot atomically: the depth is only bumped while it is
            // still below the limit. A separate load followed by an
            // unconditional `fetch_add` would let concurrent callers that all
            // observed `depth < max_depth` push the queue past its limit.
            let claimed =
                self.queue_depth
                    .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                        if current < max_depth {
                            Some(current + 1)
                        } else {
                            None
                        }
                    });
            return match claimed {
                Ok(_) => {
                    self.total_queued.fetch_add(1, Ordering::Relaxed);
                    BackpressureDecision::Queue
                }
                // The queue filled up between the initial load and the claim.
                Err(full_depth) => self.reject(full_depth),
            };
        }

        self.total_accepted.fetch_add(1, Ordering::Relaxed);
        BackpressureDecision::Accept
    }

    /// Counts a rejection and builds the `Reject` decision for the observed
    /// queue `depth`: the retry hint grows by one base interval for every ten
    /// queued requests.
    fn reject(&self, depth: u32) -> BackpressureDecision {
        self.total_rejected.fetch_add(1, Ordering::Relaxed);
        let backoff_multiplier = 1u64 + u64::from(depth) / 10;
        let retry_after_ms = self
            .config
            .base_retry_after_ms
            .saturating_mul(backoff_multiplier);
        BackpressureDecision::Reject { retry_after_ms }
    }

    /// Records that one queued request has left the queue.
    ///
    /// The depth is decremented without ever going below zero; calling this
    /// on an empty queue is a no-op.
    pub fn record_dequeue(&self) {
        // `checked_sub` yields `None` at zero, which makes `fetch_update`
        // leave the value untouched and return `Err`; that outcome is the
        // expected saturating case, so the result is deliberately ignored.
        let _ = self
            .queue_depth
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |depth| {
                depth.checked_sub(1)
            });
    }

    /// Currently does nothing: the body is empty.
    ///
    /// NOTE(review): presumably a hook for completion accounting; confirm the
    /// intended behavior with the callers.
    pub fn record_completion(&self) {}

    /// Returns the queue depth as of this call.
    pub fn current_queue_depth(&self) -> u32 {
        self.queue_depth.load(Ordering::Relaxed)
    }

    /// Returns a snapshot of the counters together with the rejection rate
    /// (`rejected / (accepted + queued + rejected)`, or `0.0` when no decision
    /// has been made).
    ///
    /// Each counter is read with its own load, so under concurrent traffic
    /// the fields of one snapshot may not correspond to a single instant.
    pub fn stats(&self) -> BackpressureStats {
        let accepted = self.total_accepted.load(Ordering::Relaxed);
        let queued = self.total_queued.load(Ordering::Relaxed);
        let rejected = self.total_rejected.load(Ordering::Relaxed);
        let total = accepted + queued + rejected;
        let rejection_rate = if total == 0 {
            0.0
        } else {
            rejected as f64 / total as f64
        };

        BackpressureStats {
            queue_depth: self.queue_depth.load(Ordering::Relaxed),
            total_accepted: accepted,
            total_queued: queued,
            total_rejected: rejected,
            rejection_rate,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a controller that uses the default configuration.
    fn make_controller() -> BackpressureController {
        BackpressureController::new(BackpressureConfig::default())
    }

    /// Extracts the retry hint from a decision, failing the test if the
    /// decision is anything other than `Reject`.
    fn retry_after_of(decision: BackpressureDecision) -> u64 {
        match decision {
            BackpressureDecision::Reject { retry_after_ms } => retry_after_ms,
            other => panic!("Expected Reject, got {:?}", other),
        }
    }

    #[test]
    fn test_default_config_values() {
        let cfg = BackpressureConfig::default();
        assert!((cfg.queue_threshold - 0.70).abs() < f64::EPSILON);
        assert!((cfg.reject_threshold - 0.90).abs() < f64::EPSILON);
        assert_eq!(cfg.max_queue_depth, 100);
        assert_eq!(cfg.base_retry_after_ms, 100);
    }

    #[test]
    fn test_accept_when_zero_utilization() {
        let ctrl = make_controller();
        assert_eq!(
            ctrl.should_accept_request(0.0),
            BackpressureDecision::Accept
        );
    }

    #[test]
    fn test_accept_below_queue_threshold() {
        let ctrl = make_controller();
        assert_eq!(
            ctrl.should_accept_request(0.5),
            BackpressureDecision::Accept
        );
    }

    #[test]
    fn test_accept_just_below_queue_threshold() {
        let ctrl = make_controller();
        assert_eq!(
            ctrl.should_accept_request(0.699),
            BackpressureDecision::Accept
        );
    }

    #[test]
    fn test_queue_at_queue_threshold() {
        let ctrl = make_controller();
        assert_eq!(
            ctrl.should_accept_request(0.70),
            BackpressureDecision::Queue
        );
    }

    #[test]
    fn test_queue_between_thresholds() {
        let ctrl = make_controller();
        assert_eq!(
            ctrl.should_accept_request(0.80),
            BackpressureDecision::Queue
        );
    }

    #[test]
    fn test_queue_just_below_reject_threshold() {
        let ctrl = make_controller();
        assert_eq!(
            ctrl.should_accept_request(0.899),
            BackpressureDecision::Queue
        );
    }

    #[test]
    fn test_reject_at_reject_threshold() {
        let ctrl = make_controller();
        match ctrl.should_accept_request(0.90) {
            BackpressureDecision::Reject { .. } => {}
            other => panic!("Expected Reject, got {:?}", other),
        }
    }

    #[test]
    fn test_reject_above_reject_threshold() {
        let ctrl = make_controller();
        match ctrl.should_accept_request(1.0) {
            BackpressureDecision::Reject { .. } => {}
            other => panic!("Expected Reject, got {:?}", other),
        }
    }

    #[test]
    fn test_reject_when_queue_full() {
        let ctrl = BackpressureController::new(BackpressureConfig {
            queue_threshold: 0.70,
            reject_threshold: 0.90,
            max_queue_depth: 3,
            base_retry_after_ms: 100,
        });
        // Fill the queue to its limit of three.
        ctrl.should_accept_request(0.80);
        ctrl.should_accept_request(0.80);
        ctrl.should_accept_request(0.80);
        match ctrl.should_accept_request(0.80) {
            BackpressureDecision::Reject { .. } => {}
            other => panic!("Expected Reject when queue full, got {:?}", other),
        }
    }

    #[test]
    fn test_retry_after_increases_with_queue_depth() {
        let ctrl = BackpressureController::new(BackpressureConfig {
            queue_threshold: 0.70,
            reject_threshold: 0.90,
            max_queue_depth: 1000,
            base_retry_after_ms: 100,
        });

        // Depth 10 -> multiplier 1 + 10 / 10 = 2.
        for _ in 0..10 {
            ctrl.should_accept_request(0.80);
        }
        let first_ms = retry_after_of(ctrl.should_accept_request(0.95));

        // Depth 20 -> multiplier 1 + 20 / 10 = 3.
        for _ in 0..10 {
            ctrl.should_accept_request(0.80);
        }
        let second_ms = retry_after_of(ctrl.should_accept_request(0.95));

        // Pin the exact values so that a constant backoff cannot pass.
        assert_eq!(first_ms, 200);
        assert_eq!(second_ms, 300);
        assert!(
            second_ms > first_ms,
            "retry_after_ms should grow at higher queue depth: {} vs {}",
            second_ms,
            first_ms
        );
    }

    #[test]
    fn test_retry_after_base_when_zero_depth() {
        // A reject threshold of zero makes every request a rejection.
        let ctrl = BackpressureController::new(BackpressureConfig {
            base_retry_after_ms: 50,
            reject_threshold: 0.0,
            ..Default::default()
        });
        match ctrl.should_accept_request(0.0) {
            BackpressureDecision::Reject { retry_after_ms } => {
                assert_eq!(retry_after_ms, 50);
            }
            other => panic!("Expected Reject, got {:?}", other),
        }
    }

    #[test]
    fn test_record_dequeue_decrements() {
        let ctrl = make_controller();
        ctrl.should_accept_request(0.80);
        ctrl.should_accept_request(0.80);
        assert_eq!(ctrl.current_queue_depth(), 2);
        ctrl.record_dequeue();
        assert_eq!(ctrl.current_queue_depth(), 1);
        ctrl.record_dequeue();
        assert_eq!(ctrl.current_queue_depth(), 0);
    }

    #[test]
    fn test_record_dequeue_saturates_at_zero() {
        let ctrl = make_controller();
        assert_eq!(ctrl.current_queue_depth(), 0);
        ctrl.record_dequeue();
        assert_eq!(ctrl.current_queue_depth(), 0);
    }

    #[test]
    fn test_stats_initial_state() {
        let ctrl = make_controller();
        let stats = ctrl.stats();
        assert_eq!(stats.queue_depth, 0);
        assert_eq!(stats.total_accepted, 0);
        assert_eq!(stats.total_queued, 0);
        assert_eq!(stats.total_rejected, 0);
        assert!((stats.rejection_rate - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_stats_counts_accumulate() {
        let ctrl = make_controller();
        ctrl.should_accept_request(0.0);
        ctrl.should_accept_request(0.5);
        ctrl.should_accept_request(0.80);
        ctrl.should_accept_request(0.95);
        let stats = ctrl.stats();
        assert_eq!(stats.total_accepted, 2);
        assert_eq!(stats.total_queued, 1);
        assert_eq!(stats.total_rejected, 1);
        // Only the queued request contributes to the depth.
        assert_eq!(stats.queue_depth, 1);
    }

    #[test]
    fn test_stats_rejection_rate() {
        let ctrl = make_controller();
        ctrl.should_accept_request(0.0);
        ctrl.should_accept_request(0.95);
        let stats = ctrl.stats();
        assert!(
            (stats.rejection_rate - 0.5).abs() < f64::EPSILON,
            "Expected 0.5, got {}",
            stats.rejection_rate
        );
    }

    #[test]
    fn test_stats_rejection_rate_zero_when_no_rejects() {
        let ctrl = make_controller();
        ctrl.should_accept_request(0.0);
        ctrl.should_accept_request(0.5);
        let stats = ctrl.stats();
        assert!((stats.rejection_rate - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_concurrent_decisions_do_not_corrupt_state() {
        use std::thread;

        let ctrl = Arc::new(BackpressureController::new(BackpressureConfig {
            queue_threshold: 0.70,
            reject_threshold: 0.95,
            max_queue_depth: 1000,
            base_retry_after_ms: 10,
        }));

        // Thread `i` reports a utilization of `i / 10` and immediately
        // dequeues whatever it managed to queue.
        let handles: Vec<_> = (0..8)
            .map(|i| {
                let c = Arc::clone(&ctrl);
                thread::spawn(move || {
                    let util = (i as f64) / 10.0;
                    for _ in 0..100 {
                        let decision = c.should_accept_request(util);
                        if decision == BackpressureDecision::Queue {
                            c.record_dequeue();
                        }
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().expect("Thread panicked");
        }

        let stats = ctrl.stats();
        assert_eq!(
            stats.total_accepted + stats.total_queued + stats.total_rejected,
            800,
            "Total decisions should equal 8 threads × 100 requests"
        );
        // Every queued request was dequeued by the thread that queued it.
        assert_eq!(
            stats.queue_depth, 0,
            "Queue should be drained once all threads finish"
        );
    }

    #[test]
    fn test_utilization_above_one_clamped_to_reject() {
        let ctrl = make_controller();
        match ctrl.should_accept_request(2.0) {
            BackpressureDecision::Reject { .. } => {}
            other => panic!("Expected Reject for utilization > 1.0, got {:?}", other),
        }
    }

    #[test]
    fn test_utilization_below_zero_clamped_to_accept() {
        let ctrl = make_controller();
        assert_eq!(
            ctrl.should_accept_request(-0.5),
            BackpressureDecision::Accept
        );
    }
}