use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use crate::traits::BackpressureStrategy;
/// Tracks the fill level of a bounded buffer and accumulates backpressure counters.
///
/// The fill level and every counter are atomics, so all recording methods take `&self`.
#[derive(Debug)]
pub struct BackpressureMonitor {
    /// Strategy supplied at construction; echoed back by [`Self::strategy`] and [`Self::stats`].
    strategy: BackpressureStrategy,
    /// Capacity of the monitored buffer; the denominator of the fill ratio.
    capacity: usize,
    /// Fill level most recently passed to [`Self::update_fill`].
    current_fill: AtomicU64,
    /// Fill ratio at or above which backpressure should be applied (default `0.8`).
    high_watermark: f64,
    /// Fill ratio at or below which the buffer counts as recovered (default `0.5`).
    low_watermark: f64,
    /// Running total of items reported through [`Self::record_dropped`].
    items_dropped: AtomicU64,
    /// Running total of blocked time, in nanoseconds.
    blocked_time_ns: AtomicU64,
    /// Number of calls to [`Self::record_backpressure`].
    backpressure_events: AtomicU64,
}

impl BackpressureMonitor {
    /// Creates a monitor for a buffer of `capacity` slots with watermarks `0.8` / `0.5`
    /// and all counters at zero.
    pub fn new(strategy: BackpressureStrategy, capacity: usize) -> Self {
        Self {
            strategy,
            capacity,
            current_fill: AtomicU64::new(0),
            high_watermark: 0.8,
            low_watermark: 0.5,
            items_dropped: AtomicU64::new(0),
            blocked_time_ns: AtomicU64::new(0),
            backpressure_events: AtomicU64::new(0),
        }
    }

    /// Overrides the watermarks.
    ///
    /// `high` is clamped to `[0.0, 1.0]` and `low` to `[0.0, high]`. A NaN argument
    /// leaves the corresponding watermark at its previous value (still capped so that
    /// `low <= high`) instead of panicking inside `f64::clamp` or poisoning later
    /// comparisons.
    pub fn with_watermarks(mut self, high: f64, low: f64) -> Self {
        if !high.is_nan() {
            self.high_watermark = high.clamp(0.0, 1.0);
        }
        self.low_watermark = if low.is_nan() {
            self.low_watermark.min(self.high_watermark)
        } else {
            low.clamp(0.0, self.high_watermark)
        };
        self
    }

    /// Stores the latest fill level of the monitored buffer.
    pub fn update_fill(&self, current: usize) {
        self.current_fill.store(current as u64, Ordering::Relaxed);
    }

    /// Returns the stored fill level divided by the capacity.
    ///
    /// A zero-capacity buffer can never accept an item, so it is reported as full
    /// (`1.0`) rather than producing NaN or infinity from a division by zero.
    pub fn fill_ratio(&self) -> f64 {
        if self.capacity == 0 {
            return 1.0;
        }
        self.current_fill.load(Ordering::Relaxed) as f64 / self.capacity as f64
    }

    /// Returns `true` when the fill ratio is at or above the high watermark.
    pub fn should_apply_backpressure(&self) -> bool {
        self.fill_ratio() >= self.high_watermark
    }

    /// Returns `true` when the fill ratio is at or below the low watermark.
    pub fn has_recovered(&self) -> bool {
        self.fill_ratio() <= self.low_watermark
    }

    /// Counts one backpressure event.
    pub fn record_backpressure(&self) {
        self.backpressure_events.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` to the dropped-items total.
    pub fn record_dropped(&self, count: u64) {
        self.items_dropped.fetch_add(count, Ordering::Relaxed);
    }

    /// Adds `duration` to the blocked-time total.
    ///
    /// Durations longer than `u64::MAX` nanoseconds are saturated instead of being
    /// silently truncated to their low 64 bits.
    pub fn record_blocked_time(&self, duration: Duration) {
        // The cast is lossless: the value was capped to the u64 range first.
        let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
        self.blocked_time_ns.fetch_add(nanos, Ordering::Relaxed);
    }

    /// Returns a snapshot of the current counters.
    ///
    /// The fill ratio is read once so that `fill_ratio` and `is_under_pressure`
    /// in the snapshot always agree with each other.
    pub fn stats(&self) -> BackpressureStats {
        let ratio = self.fill_ratio();
        BackpressureStats {
            strategy: self.strategy,
            fill_ratio: ratio,
            items_dropped: self.items_dropped.load(Ordering::Relaxed),
            blocked_time_ms: self.blocked_time_ns.load(Ordering::Relaxed) / 1_000_000,
            backpressure_events: self.backpressure_events.load(Ordering::Relaxed),
            is_under_pressure: ratio >= self.high_watermark,
        }
    }

    /// Returns the strategy this monitor was constructed with.
    pub fn strategy(&self) -> BackpressureStrategy {
        self.strategy
    }
}
/// Snapshot of the values a `BackpressureMonitor` exposes through its `stats` method.
#[derive(Debug, Clone)]
pub struct BackpressureStats {
/// Backpressure strategy the monitor was constructed with.
pub strategy: BackpressureStrategy,
/// The monitor's fill ratio when the snapshot was taken.
pub fill_ratio: f64,
/// Total number of items recorded as dropped.
pub items_dropped: u64,
/// Accumulated blocked time in whole milliseconds (the nanosecond total divided by 1_000_000).
pub blocked_time_ms: u64,
/// Number of backpressure events recorded.
pub backpressure_events: u64,
/// Whether the fill ratio was at or above the monitor's high watermark.
pub is_under_pressure: bool,
}
/// Delay controller that steers a producer-side delay toward a target fill ratio.
///
/// The delay is kept in an atomic so it can be adjusted and read through `&self`;
/// adjustments are rate-limited to one per `adjustment_interval`.
#[derive(Debug)]
pub struct AdaptiveBackpressure {
    /// Fill ratio the controller tries to hold (default `0.7`).
    target_fill: f64,
    /// Lower bound for the delay, in nanoseconds. Invariant: `min_delay_ns <= max_delay_ns`.
    min_delay_ns: u64,
    /// Upper bound for the delay, in nanoseconds.
    max_delay_ns: u64,
    /// Delay currently recommended, in nanoseconds.
    current_delay_ns: AtomicU64,
    /// When the delay was last recomputed; gates how often `adjust` does any work.
    last_adjustment: std::sync::Mutex<Instant>,
    /// Minimum time between two effective adjustments.
    adjustment_interval: Duration,
}

impl AdaptiveBackpressure {
    /// Creates a controller with target fill `0.7`, delay bounds `0 ..= 10 ms`,
    /// an initial delay of zero and a 100 ms adjustment interval.
    pub fn new() -> Self {
        Self {
            target_fill: 0.7,
            min_delay_ns: 0,
            max_delay_ns: 10_000_000,
            current_delay_ns: AtomicU64::new(0),
            last_adjustment: std::sync::Mutex::new(Instant::now()),
            adjustment_interval: Duration::from_millis(100),
        }
    }

    /// Sets the target fill ratio, clamped to `[0.1, 0.9]`.
    ///
    /// A NaN `target` is ignored: storing it would turn every later error term into
    /// NaN and pin the computed delay at zero.
    pub fn with_target_fill(mut self, target: f64) -> Self {
        if !target.is_nan() {
            self.target_fill = target.clamp(0.1, 0.9);
        }
        self
    }

    /// Sets the delay bounds.
    ///
    /// Durations beyond `u64::MAX` nanoseconds saturate instead of wrapping, and an
    /// inverted pair (`min > max`) is repaired by lowering `min` to `max`. Without
    /// that, the clamp in [`Self::adjust`] would panic on its `min <= max` assertion.
    /// The current delay is left untouched until the next `adjust` or `reset`.
    pub fn with_delay_bounds(mut self, min: Duration, max: Duration) -> Self {
        let upper = Self::saturating_nanos(max);
        let lower = Self::saturating_nanos(min).min(upper);
        self.min_delay_ns = lower;
        self.max_delay_ns = upper;
        self
    }

    /// Recomputes the delay from the observed fill ratio.
    ///
    /// Does nothing if less than `adjustment_interval` has passed since the last
    /// effective adjustment (or since construction). Otherwise:
    /// * when the delay is zero and the fill is above target, the delay is kick-started
    ///   at `max(max_delay / 10, 1000 ns) * error * 2`;
    /// * in every other case the delay is scaled by `1 + error * 0.5`;
    ///
    /// where `error = current_fill - target_fill`. The result is clamped to the
    /// configured bounds.
    pub fn adjust(&self, current_fill: f64) {
        {
            // The guarded `Instant` has no invariant a panicking thread could break,
            // so a poisoned lock is simply recovered.
            let mut last = self
                .last_adjustment
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            if last.elapsed() < self.adjustment_interval {
                return;
            }
            *last = Instant::now();
        }

        let current_delay = self.current_delay_ns.load(Ordering::Relaxed);
        let error = current_fill - self.target_fill;

        // Float-to-int `as` casts saturate, so negative or NaN products become 0.
        let new_delay = if current_delay == 0 && error > 0.0 {
            let step = (self.max_delay_ns / 10).max(1000);
            (step as f64 * error * 2.0) as u64
        } else {
            (current_delay as f64 * (1.0 + error * 0.5)) as u64
        };

        self.current_delay_ns.store(
            new_delay.clamp(self.min_delay_ns, self.max_delay_ns),
            Ordering::Relaxed,
        );
    }

    /// Returns the delay currently recommended by the controller.
    pub fn current_delay(&self) -> Duration {
        Duration::from_nanos(self.current_delay_ns.load(Ordering::Relaxed))
    }

    /// Resets the delay to the configured minimum.
    pub fn reset(&self) {
        self.current_delay_ns
            .store(self.min_delay_ns, Ordering::Relaxed);
    }

    /// Converts `duration` to whole nanoseconds, saturating at `u64::MAX`.
    fn saturating_nanos(duration: Duration) -> u64 {
        // The cast is lossless: the value was capped to the u64 range first.
        duration.as_nanos().min(u128::from(u64::MAX)) as u64
    }
}

impl Default for AdaptiveBackpressure {
    /// Equivalent to [`AdaptiveBackpressure::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Producer-side helper bundling a fill-level monitor, an optional adaptive delay
/// controller and the backpressure state derived from the latest fill level.
///
/// `Debug` is derived so this public type can be logged and inspected like the
/// other types in this module; every field already implements it.
#[derive(Debug)]
pub struct BackpressureAwareProducer {
    /// Fill-level tracker and statistics counters.
    monitor: BackpressureMonitor,
    /// Adaptive delay controller; `None` until enabled via `with_adaptive`.
    adaptive: Option<AdaptiveBackpressure>,
    /// State computed by the most recent call to `update`.
    state: BackpressureState,
}
/// Pressure state tracked by `BackpressureAwareProducer`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureState {
/// No backpressure in effect; the recommended delay is zero.
Normal,
/// Set when an update sees a fill ratio at or above the high watermark but below 1.0.
SlowingDown,
/// Set when an update sees a fill ratio of 1.0 or more.
Blocked,
/// The fill ratio fell below the high watermark after a pressured state but has not
/// yet dropped to the low watermark.
Recovering,
}
impl BackpressureAwareProducer {
pub fn new(strategy: BackpressureStrategy, capacity: usize) -> Self {
Self {
monitor: BackpressureMonitor::new(strategy, capacity),
adaptive: None,
state: BackpressureState::Normal,
}
}
pub fn with_adaptive(mut self) -> Self {
self.adaptive = Some(AdaptiveBackpressure::new());
self
}
pub fn update(&mut self, fill_level: usize) {
self.monitor.update_fill(fill_level);
let ratio = self.monitor.fill_ratio();
if let Some(ref adaptive) = self.adaptive {
adaptive.adjust(ratio);
}
self.state = if ratio >= 1.0 {
BackpressureState::Blocked
} else if self.monitor.should_apply_backpressure() {
if self.state == BackpressureState::Normal {
self.monitor.record_backpressure();
}
BackpressureState::SlowingDown
} else if self.monitor.has_recovered() {
BackpressureState::Normal
} else if self.state == BackpressureState::SlowingDown {
BackpressureState::Recovering
} else {
self.state
};
}
pub fn state(&self) -> BackpressureState {
self.state
}
pub fn recommended_delay(&self) -> Duration {
match self.state {
BackpressureState::Normal => Duration::ZERO,
BackpressureState::SlowingDown | BackpressureState::Recovering => self
.adaptive
.as_ref()
.map(AdaptiveBackpressure::current_delay)
.unwrap_or(Duration::from_micros(100)),
BackpressureState::Blocked => Duration::from_millis(1),
}
}
pub fn record_dropped(&self, count: u64) {
self.monitor.record_dropped(count);
}
pub fn stats(&self) -> BackpressureStats {
self.monitor.stats()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Default watermarks against a 100-slot buffer: 50 is calm, 80 trips the high
    /// watermark, 40 is below the low watermark.
    #[test]
    fn test_backpressure_monitor() {
        let mon = BackpressureMonitor::new(BackpressureStrategy::Block, 100);

        mon.update_fill(50);
        assert!(!mon.should_apply_backpressure());

        mon.update_fill(80);
        assert!(mon.should_apply_backpressure());

        mon.update_fill(40);
        assert!(mon.has_recovered());
    }

    /// Counters recorded on the monitor show up in the stats snapshot.
    #[test]
    fn test_backpressure_monitor_stats() {
        let mon = BackpressureMonitor::new(BackpressureStrategy::DropOldest, 100);
        mon.record_dropped(5);
        mon.record_backpressure();
        mon.record_blocked_time(Duration::from_millis(100));

        let snapshot = mon.stats();
        assert_eq!(snapshot.items_dropped, 5);
        assert_eq!(snapshot.backpressure_events, 1);
        assert!(snapshot.blocked_time_ms >= 100);
    }

    /// Sustained fill above target eventually yields a non-zero delay. Each round
    /// sleeps past the 100 ms adjustment interval so the next `adjust` takes effect.
    #[test]
    fn test_adaptive_backpressure() {
        let controller = AdaptiveBackpressure::new()
            .with_target_fill(0.5)
            .with_delay_bounds(Duration::ZERO, Duration::from_millis(100));
        assert_eq!(controller.current_delay(), Duration::ZERO);

        let pause = Duration::from_millis(110);
        (0..10).for_each(|_| {
            controller.adjust(0.9);
            std::thread::sleep(pause);
        });

        assert!(controller.current_delay() > Duration::ZERO);
    }

    /// Normal -> SlowingDown -> Normal as the fill level rises and falls.
    #[test]
    fn test_backpressure_aware_producer() {
        let mut prod = BackpressureAwareProducer::new(BackpressureStrategy::Block, 100);

        let steps = [
            (50, BackpressureState::Normal),
            (85, BackpressureState::SlowingDown),
            (40, BackpressureState::Normal),
        ];
        for (fill, expected) in steps {
            prod.update(fill);
            assert_eq!(prod.state(), expected);
        }
    }

    /// A completely full buffer puts the producer into `Blocked`.
    #[test]
    fn test_backpressure_state_blocked() {
        let mut prod = BackpressureAwareProducer::new(BackpressureStrategy::Block, 100);
        prod.update(100);
        assert_eq!(prod.state(), BackpressureState::Blocked);
    }
}