use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackpressureStrategy {
Block,
Drop,
Throttle,
None,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackpressureConfig {
pub strategy: BackpressureStrategy,
pub max_queued_nodes: usize,
pub max_active_nodes: usize,
pub throttle_delay: Duration,
pub high_water_mark: f64,
pub low_water_mark: f64,
}
impl Default for BackpressureConfig {
fn default() -> Self {
Self {
strategy: BackpressureStrategy::Throttle,
max_queued_nodes: 1000,
max_active_nodes: 100,
throttle_delay: Duration::from_millis(100),
high_water_mark: 0.8,
low_water_mark: 0.6,
}
}
}
impl BackpressureConfig {
pub fn new(
strategy: BackpressureStrategy,
max_queued_nodes: usize,
max_active_nodes: usize,
) -> Self {
Self {
strategy,
max_queued_nodes,
max_active_nodes,
..Default::default()
}
}
pub fn unlimited() -> Self {
Self {
strategy: BackpressureStrategy::None,
max_queued_nodes: usize::MAX,
max_active_nodes: usize::MAX,
..Default::default()
}
}
pub fn strict(max_queued: usize, max_active: usize) -> Self {
Self {
strategy: BackpressureStrategy::Block,
max_queued_nodes: max_queued,
max_active_nodes: max_active,
high_water_mark: 0.9,
low_water_mark: 0.7,
..Default::default()
}
}
pub fn high_water_threshold(&self) -> usize {
(self.max_queued_nodes as f64 * self.high_water_mark) as usize
}
pub fn low_water_threshold(&self) -> usize {
(self.max_queued_nodes as f64 * self.low_water_mark) as usize
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackpressureState {
Normal,
Warning,
Active,
}
pub struct BackpressureMonitor {
config: BackpressureConfig,
queued_count: Arc<AtomicUsize>,
active_count: Arc<AtomicUsize>,
total_blocked: Arc<AtomicUsize>,
total_dropped: Arc<AtomicUsize>,
total_throttled: Arc<AtomicUsize>,
}
impl BackpressureMonitor {
pub fn new(config: BackpressureConfig) -> Self {
Self {
config,
queued_count: Arc::new(AtomicUsize::new(0)),
active_count: Arc::new(AtomicUsize::new(0)),
total_blocked: Arc::new(AtomicUsize::new(0)),
total_dropped: Arc::new(AtomicUsize::new(0)),
total_throttled: Arc::new(AtomicUsize::new(0)),
}
}
pub fn state(&self) -> BackpressureState {
let queued = self.queued_count.load(Ordering::Relaxed);
let active = self.active_count.load(Ordering::Relaxed);
if queued >= self.config.max_queued_nodes || active >= self.config.max_active_nodes {
BackpressureState::Active
} else if queued >= self.config.high_water_threshold() {
BackpressureState::Warning
} else {
BackpressureState::Normal
}
}
pub fn should_apply_backpressure(&self) -> bool {
matches!(self.state(), BackpressureState::Active)
}
pub fn record_queued(&self) {
self.queued_count.fetch_add(1, Ordering::Relaxed);
}
pub fn record_dequeued(&self) {
self.queued_count.fetch_sub(1, Ordering::Relaxed);
self.active_count.fetch_add(1, Ordering::Relaxed);
}
pub fn record_completed(&self) {
self.active_count.fetch_sub(1, Ordering::Relaxed);
}
pub fn record_blocked(&self) {
self.total_blocked.fetch_add(1, Ordering::Relaxed);
}
pub fn record_dropped(&self) {
self.total_dropped.fetch_add(1, Ordering::Relaxed);
}
pub fn record_throttled(&self) {
self.total_throttled.fetch_add(1, Ordering::Relaxed);
}
pub fn queued_count(&self) -> usize {
self.queued_count.load(Ordering::Relaxed)
}
pub fn active_count(&self) -> usize {
self.active_count.load(Ordering::Relaxed)
}
pub fn stats(&self) -> BackpressureStats {
BackpressureStats {
queued_count: self.queued_count(),
active_count: self.active_count(),
total_blocked: self.total_blocked.load(Ordering::Relaxed),
total_dropped: self.total_dropped.load(Ordering::Relaxed),
total_throttled: self.total_throttled.load(Ordering::Relaxed),
state: self.state(),
config: self.config.clone(),
}
}
pub fn reset(&self) {
self.queued_count.store(0, Ordering::Relaxed);
self.active_count.store(0, Ordering::Relaxed);
self.total_blocked.store(0, Ordering::Relaxed);
self.total_dropped.store(0, Ordering::Relaxed);
self.total_throttled.store(0, Ordering::Relaxed);
}
pub fn throttle_delay(&self) -> Duration {
self.config.throttle_delay
}
pub fn strategy(&self) -> BackpressureStrategy {
self.config.strategy
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackpressureStats {
pub queued_count: usize,
pub active_count: usize,
pub total_blocked: usize,
pub total_dropped: usize,
pub total_throttled: usize,
pub state: BackpressureState,
pub config: BackpressureConfig,
}
impl BackpressureStats {
pub fn utilization(&self) -> f64 {
let max_total = self.config.max_queued_nodes + self.config.max_active_nodes;
let current_total = self.queued_count + self.active_count;
(current_total as f64) / (max_total as f64)
}
pub fn has_backpressure_events(&self) -> bool {
self.total_blocked > 0 || self.total_dropped > 0 || self.total_throttled > 0
}
pub fn total_backpressure_events(&self) -> usize {
self.total_blocked + self.total_dropped + self.total_throttled
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_backpressure_config_default() {
let config = BackpressureConfig::default();
assert_eq!(config.strategy, BackpressureStrategy::Throttle);
assert_eq!(config.max_queued_nodes, 1000);
assert_eq!(config.max_active_nodes, 100);
}
#[test]
fn test_backpressure_config_unlimited() {
let config = BackpressureConfig::unlimited();
assert_eq!(config.strategy, BackpressureStrategy::None);
assert_eq!(config.max_queued_nodes, usize::MAX);
assert_eq!(config.max_active_nodes, usize::MAX);
}
#[test]
fn test_backpressure_config_strict() {
let config = BackpressureConfig::strict(100, 10);
assert_eq!(config.strategy, BackpressureStrategy::Block);
assert_eq!(config.max_queued_nodes, 100);
assert_eq!(config.max_active_nodes, 10);
}
#[test]
fn test_water_marks() {
let config = BackpressureConfig {
max_queued_nodes: 100,
high_water_mark: 0.8,
low_water_mark: 0.6,
..Default::default()
};
assert_eq!(config.high_water_threshold(), 80);
assert_eq!(config.low_water_threshold(), 60);
}
#[test]
fn test_monitor_state_normal() {
let config = BackpressureConfig::default();
let monitor = BackpressureMonitor::new(config);
assert_eq!(monitor.state(), BackpressureState::Normal);
assert!(!monitor.should_apply_backpressure());
}
#[test]
fn test_monitor_state_warning() {
let config = BackpressureConfig {
max_queued_nodes: 100,
high_water_mark: 0.8,
..Default::default()
};
let monitor = BackpressureMonitor::new(config);
for _ in 0..81 {
monitor.record_queued();
}
assert_eq!(monitor.state(), BackpressureState::Warning);
assert!(!monitor.should_apply_backpressure()); }
#[test]
fn test_monitor_state_active() {
let config = BackpressureConfig {
max_queued_nodes: 100,
..Default::default()
};
let monitor = BackpressureMonitor::new(config);
for _ in 0..100 {
monitor.record_queued();
}
assert_eq!(monitor.state(), BackpressureState::Active);
assert!(monitor.should_apply_backpressure());
}
#[test]
fn test_monitor_queued_and_active() {
let config = BackpressureConfig::default();
let monitor = BackpressureMonitor::new(config);
assert_eq!(monitor.queued_count(), 0);
assert_eq!(monitor.active_count(), 0);
monitor.record_queued();
assert_eq!(monitor.queued_count(), 1);
assert_eq!(monitor.active_count(), 0);
monitor.record_dequeued();
assert_eq!(monitor.queued_count(), 0);
assert_eq!(monitor.active_count(), 1);
monitor.record_completed();
assert_eq!(monitor.queued_count(), 0);
assert_eq!(monitor.active_count(), 0);
}
#[test]
fn test_monitor_backpressure_events() {
let config = BackpressureConfig::default();
let monitor = BackpressureMonitor::new(config);
monitor.record_blocked();
monitor.record_dropped();
monitor.record_throttled();
let stats = monitor.stats();
assert_eq!(stats.total_blocked, 1);
assert_eq!(stats.total_dropped, 1);
assert_eq!(stats.total_throttled, 1);
assert!(stats.has_backpressure_events());
assert_eq!(stats.total_backpressure_events(), 3);
}
#[test]
fn test_monitor_reset() {
let config = BackpressureConfig::default();
let monitor = BackpressureMonitor::new(config);
monitor.record_queued();
monitor.record_blocked();
monitor.record_dropped();
assert_eq!(monitor.queued_count(), 1);
assert_eq!(monitor.stats().total_blocked, 1);
monitor.reset();
assert_eq!(monitor.queued_count(), 0);
assert_eq!(monitor.stats().total_blocked, 0);
assert_eq!(monitor.stats().total_dropped, 0);
}
#[test]
fn test_stats_utilization() {
let config = BackpressureConfig {
max_queued_nodes: 100,
max_active_nodes: 10,
..Default::default()
};
let monitor = BackpressureMonitor::new(config);
for _ in 0..50 {
monitor.record_queued();
}
for _ in 0..5 {
monitor.record_dequeued();
}
let stats = monitor.stats();
assert_eq!(stats.queued_count, 45); assert_eq!(stats.active_count, 5);
let utilization = stats.utilization();
assert!(utilization > 0.44 && utilization < 0.46);
}
}