use super::types::*;
use crate::core::error::Result;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
/// Monitors buffer and resource pressure and produces flow-control actions.
///
/// Each piece of mutable state sits behind its own `Arc<RwLock<..>>`; the
/// async methods of the controller lock the pieces they need.
#[derive(Debug)]
pub struct BackpressureController {
/// Streaming configuration supplied at construction.
config: AdvancedStreamingConfig,
/// Pressure level computed by the most recent monitoring cycle.
pressure_level: Arc<RwLock<PressureLevel>>,
/// Current flow rate together with its history and adaptation bookkeeping.
flow_state: Arc<RwLock<FlowState>>,
/// Accumulated counters and running averages.
metrics: Arc<RwLock<BackpressureMetrics>>,
/// Latest resource usage samples and their recent history.
resource_monitor: Arc<RwLock<ResourceMonitor>>,
/// Configured flow-control strategies.
strategies: Arc<RwLock<FlowControlStrategies>>,
}
/// Discrete backpressure severity, from `None` (lowest) to `Critical` (highest).
///
/// The derived ordering follows declaration order and the explicit
/// discriminants (0 through 4) are used when a level is cast to a number, so
/// variants must stay listed in ascending severity.
///
/// `Eq`, `Ord` and `Hash` are derived in addition to the partial traits so
/// levels can be used with `max`/`min`, sorted, and stored in hashed
/// collections.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PressureLevel {
    /// No backpressure.
    None = 0,
    /// Low pressure.
    Low = 1,
    /// Medium pressure.
    Medium = 2,
    /// High pressure.
    High = 3,
    /// Critical pressure.
    Critical = 4,
}
/// Mutable flow bookkeeping maintained by `BackpressureController`.
#[derive(Debug, Clone)]
pub struct FlowState {
/// Current flow rate; changed as flow actions are applied (units are not
/// established in this file).
pub flow_rate: f32,
/// Rate that `flow_rate` is compared against when deciding to speed up or slow down.
pub target_rate: f32,
/// Buffer fill value; defaults to 0.0 and is not written by the code in this file.
pub buffer_fill: f32,
/// Every action applied to this state so far, in application order.
pub actions_taken: Vec<FlowAction>,
/// Time of the most recent flow-state update.
pub last_adjustment: Instant,
/// Recent `(timestamp, flow_rate)` samples; the controller keeps at most 100.
pub rate_history: VecDeque<(Instant, f32)>,
/// Exponentially smoothed `flow_rate`.
pub smoothed_rate: f32,
/// Despite the name, the controller stores standard deviation divided by mean
/// of `rate_history` here (a relative spread), not the raw variance.
pub flow_variance: f32,
/// Multiplier applied to computed rate changes; the controller raises it after
/// stable increases and lowers it after decreases, pauses and emergencies.
pub adaptation_factor: f32,
}
/// An action emitted by a monitoring cycle.
///
/// The per-variant notes describe what the controller in this file does to
/// its own flow state and metrics when the action is applied; what external
/// consumers do with an action is not visible here.
#[derive(Debug, Clone)]
pub enum FlowAction {
/// Add the given amount to the flow rate.
IncreaseRate(f32),
/// Subtract the given amount from the flow rate, never going below zero.
DecreaseRate(f32),
/// Set the flow rate to zero.
PauseFlow,
/// Set the flow rate to half of the target rate.
ResumeFlow,
/// Counted as one prevented overflow in the metrics.
BufferDrain,
/// Counted as one quality adjustment; the payload is presumably a quality
/// factor (the controller emits 0.3, 0.6 and 1.0) — confirm with consumers.
QualityAdjustment(f32),
/// Multiply the flow rate by `1.0 - intensity`.
AdaptiveThrottle(f32),
/// Load-balancing request; does not change the controller's flow state.
LoadBalance(LoadBalanceAction),
/// Counted as one emergency activation and lowers the adaptation factor.
EmergencyControl,
}
/// Load-balancing request carried by `FlowAction::LoadBalance`.
///
/// This file only emits these values; how each one is carried out is decided
/// by whoever consumes the action.
#[derive(Debug, Clone)]
pub enum LoadBalanceAction {
/// Emitted at medium and high pressure.
Redistribute,
/// Carries a list of names; the controller emits `["critical"]` at critical
/// pressure. Presumably these are priority classes — confirm with consumers.
Prioritize(Vec<String>),
/// Emitted at critical pressure.
DropLowPriority,
/// Emitted at medium pressure.
FairShare,
}
/// Counters and running averages collected by `BackpressureController`.
#[derive(Debug, Clone)]
pub struct BackpressureMetrics {
/// Number of cycles in which the computed pressure level differed from the previous one.
pub pressure_events: usize,
/// Grows by a fixed 100 each time the level changes to anything above `None`;
/// it is an estimate, not a measured duration.
pub time_under_pressure_ms: u64,
/// Number of applied actions that changed the flow rate.
pub flow_adjustments: usize,
/// Number of `BufferDrain` actions applied.
pub overflows_prevented: usize,
/// Number of `QualityAdjustment` actions applied.
pub quality_adjustments: usize,
/// Number of `EmergencyControl` actions applied.
pub emergency_activations: usize,
/// Exponential moving average of the numeric pressure level (0 to 4).
pub average_pressure: f32,
/// Highest pressure level recorded on a level change since creation or reset.
pub max_pressure_level: PressureLevel,
/// Exponential moving average of the per-cycle action score; starts at 1.0.
pub effectiveness_score: f32,
/// Aggregate resource statistics; not updated by the code in this file.
pub resource_stats: ResourceStats,
}
/// Latest resource usage readings plus a bounded history of snapshots.
#[derive(Debug, Clone)]
pub struct ResourceMonitor {
/// Latest CPU reading; divided by 100 when pressure is computed, so
/// presumably a percentage — confirm once a real sampler exists.
pub cpu_usage: f32,
/// Latest memory reading; treated like `cpu_usage` when pressure is computed.
pub memory_usage: f32,
/// Latest network reading; stored but not used in the pressure calculation.
pub network_usage: f32,
/// Latest I/O reading; stored but not used in the pressure calculation.
pub io_ops_per_sec: f32,
/// Recent `(timestamp, snapshot)` pairs; the controller keeps at most 100.
pub usage_history: VecDeque<(Instant, ResourceSnapshot)>,
/// Forecast share of resources expected to be free, between 0.0 and 1.0;
/// stays at 1.0 until 10 snapshots have been collected.
pub availability_forecast: f32,
}
/// One set of resource readings taken during a single monitoring cycle.
#[derive(Debug, Clone)]
pub struct ResourceSnapshot {
/// CPU reading.
pub cpu: f32,
/// Memory reading.
pub memory: f32,
/// Network reading.
pub network: f32,
/// I/O reading.
pub io: f32,
}
/// Aggregate resource statistics.
///
/// NOTE(review): nothing in this file updates these fields after
/// construction — confirm where they are meant to be populated.
#[derive(Debug, Clone)]
pub struct ResourceStats {
/// Average CPU usage; defaults to 0.0.
pub avg_cpu: f32,
/// Peak CPU usage; defaults to 0.0.
pub peak_cpu: f32,
/// Average memory usage; defaults to 0.0.
pub avg_memory: f32,
/// Peak memory usage; defaults to 0.0.
pub peak_memory: f32,
/// Network efficiency; defaults to 1.0.
pub network_efficiency: f32,
}
/// The set of strategies a controller is configured with.
///
/// NOTE(review): the flow-action logic in this file never reads a strategy's
/// parameters — confirm where they are supposed to take effect.
#[derive(Debug, Clone)]
pub struct FlowControlStrategies {
/// Strategy for normal operation.
pub default_strategy: BackpressureStrategy,
/// Strategy for high pressure.
pub high_pressure_strategy: BackpressureStrategy,
/// Strategy for emergencies.
pub emergency_strategy: BackpressureStrategy,
/// Additional strategies keyed by name; not read by the code in this file.
pub custom_strategies: std::collections::HashMap<String, BackpressureStrategy>,
}
/// A configurable way of reacting to backpressure.
///
/// This file only stores and returns these values; the field notes below are
/// read off the names and should be confirmed against the code that applies them.
#[derive(Debug, Clone)]
pub enum BackpressureStrategy {
/// Limit the rate.
RateLimiting {
// presumably the highest permitted rate — TODO confirm units
max_rate: f32,
// presumably the permitted burst size — TODO confirm
burst_size: usize,
// presumably the rate window in milliseconds — TODO confirm
window_ms: u64,
},
/// Buffer data and apply `overflow_action` when the buffer overflows.
Buffering {
// presumably the buffer capacity — TODO confirm units
buffer_size: usize,
overflow_action: OverflowAction,
},
/// Drop data.
Dropping {
// presumably the level above which dropping starts — TODO confirm scale
drop_threshold: f32,
// presumably whether higher-priority data is kept — TODO confirm
priority_preservation: bool,
},
/// Adapt the reaction over time.
Adaptive {
aggressiveness: f32,
adaptation_speed: f32,
},
}
/// What a `BackpressureStrategy::Buffering` strategy does on overflow.
///
/// Only the names are visible in this file; the behaviour behind each variant
/// is implemented elsewhere.
#[derive(Debug, Clone)]
pub enum OverflowAction {
/// Drop the oldest entries.
DropOldest,
/// Drop the newest entries.
DropNewest,
/// Drop low-priority entries.
DropLowPriority,
/// Compress entries.
Compress,
}
/// A `BufferState` extended with trend information derived from successive updates.
#[derive(Debug, Clone)]
pub struct EnhancedBufferState {
/// The most recent plain buffer state.
pub base_state: BufferState,
/// Change in utilization per second between the last two updates.
pub growth_rate: f32,
/// When this value was created or last updated.
pub last_operation: Instant,
/// Raw utilization difference introduced by the last update.
pub pressure_trend: f32,
/// Recent `(timestamp, utilization)` samples; `update` keeps at most 50.
pub utilization_history: VecDeque<(Instant, f32)>,
}
impl From<BufferState> for EnhancedBufferState {
    /// Wraps a plain buffer state, starting with zeroed trend values, the
    /// current time as the last operation, and an empty history.
    fn from(base_state: BufferState) -> Self {
        let created_at = Instant::now();
        let empty_history = VecDeque::with_capacity(50);
        Self {
            base_state,
            growth_rate: 0.0,
            last_operation: created_at,
            pressure_trend: 0.0,
            utilization_history: empty_history,
        }
    }
}
impl EnhancedBufferState {
    /// Maximum number of samples retained in `utilization_history`.
    const HISTORY_LIMIT: usize = 50;

    /// Creates an enhanced state from a plain buffer state.
    pub fn new(base_state: BufferState) -> Self {
        base_state.into()
    }

    /// Replaces the wrapped buffer state and refreshes the derived trend data.
    ///
    /// `pressure_trend` becomes the utilization difference between the new
    /// and previous state. `growth_rate` becomes that difference divided by
    /// the seconds elapsed since the last operation; when no time has elapsed
    /// the previous `growth_rate` is kept. The new utilization is appended to
    /// the history, which is trimmed to `HISTORY_LIMIT` samples.
    pub fn update(&mut self, new_base_state: BufferState) {
        // Read the clock once so the elapsed time, the history entry and
        // `last_operation` all refer to the same instant.
        let now = Instant::now();
        let new_utilization = new_base_state.utilization;
        let delta = new_utilization - self.base_state.utilization;

        let elapsed_secs = now.duration_since(self.last_operation).as_secs_f32();
        if elapsed_secs > 0.0 {
            self.growth_rate = delta / elapsed_secs;
        }
        self.pressure_trend = delta;

        self.utilization_history.push_back((now, new_utilization));
        // The history field is public, so trim until the limit holds rather
        // than assuming at most one extra entry.
        while self.utilization_history.len() > Self::HISTORY_LIMIT {
            self.utilization_history.pop_front();
        }

        self.base_state = new_base_state;
        self.last_operation = now;
    }

    /// Returns the utilization of the wrapped buffer state.
    pub fn utilization(&self) -> f32 {
        self.base_state.utilization
    }
}
impl BackpressureController {
/// Creates a controller with no pressure and default flow state, metrics,
/// resource monitor and strategies.
pub fn new(config: AdvancedStreamingConfig) -> Self {
    let pressure_level = Arc::new(RwLock::new(PressureLevel::None));
    let flow_state = Arc::new(RwLock::new(FlowState::default()));
    let metrics = Arc::new(RwLock::new(BackpressureMetrics::default()));
    let resource_monitor = Arc::new(RwLock::new(ResourceMonitor::default()));
    let strategies = Arc::new(RwLock::new(FlowControlStrategies::default()));
    Self {
        config,
        pressure_level,
        flow_state,
        metrics,
        resource_monitor,
        strategies,
    }
}
/// Runs one monitoring cycle and returns the actions it decided on.
///
/// The cycle refreshes the resource readings, computes the pressure level for
/// `buffer_state`, stores it, records a pressure event when the level
/// changed, chooses flow actions (plus load-balancing actions when
/// `enable_backpressure` is set in the configuration), and finally applies
/// the actions to the flow state and the effectiveness metrics.
///
/// # Errors
///
/// Propagates any error returned by the individual steps.
pub async fn monitor_and_adjust(
    &self,
    buffer_state: &EnhancedBufferState,
) -> Result<Vec<FlowAction>> {
    self.update_resource_monitoring().await?;
    let pressure = self.calculate_advanced_pressure_level(buffer_state).await;

    // NOTE(review): this write guard lives until the function returns, which
    // serializes concurrent cycles end to end — confirm that is intended
    // before narrowing its scope.
    let mut current_pressure = self.pressure_level.write().await;
    let previous_pressure = current_pressure.clone();
    *current_pressure = pressure.clone();

    if pressure != previous_pressure {
        let mut metrics = self.metrics.write().await;
        metrics.pressure_events += 1;
        if pressure > PressureLevel::None {
            // No timestamp of the previous event is tracked, so a fixed
            // 100 ms is charged for each change to a non-`None` level.
            metrics.time_under_pressure_ms += 100;
        }
        if pressure > metrics.max_pressure_level {
            metrics.max_pressure_level = pressure.clone();
        }
    }

    let mut actions = self
        .determine_adaptive_flow_actions(&pressure, buffer_state)
        .await?;
    if self.config.enable_backpressure {
        let load_balance_actions = self.calculate_load_balance_actions(&pressure).await?;
        actions.extend(load_balance_actions);
    }

    self.update_adaptive_flow_state(&actions).await?;
    self.update_effectiveness_metrics(&pressure, &actions).await?;
    Ok(actions)
}
pub async fn monitor_and_adjust_basic(
&self,
buffer_state: &BufferState,
) -> Result<Vec<FlowAction>> {
let enhanced_state = EnhancedBufferState::from(buffer_state.clone());
self.monitor_and_adjust(&enhanced_state).await
}
/// Combines buffer utilization, resource usage, buffer growth and the current
/// adaptation factor into a single pressure level.
///
/// The composite value is
/// `0.4 * utilization + 0.3 * resource + 0.2 * growth + 0.1 * adaptation`,
/// where `resource` is the mean of CPU and memory usage divided by 100.
/// Thresholds: `>= 0.95` critical, `>= 0.85` high, `>= 0.70` medium,
/// `>= 0.50` low, otherwise none.
async fn calculate_advanced_pressure_level(
    &self,
    buffer_state: &EnhancedBufferState,
) -> PressureLevel {
    let buffer_pressure = buffer_state.utilization();

    // `growth_rate` is a utilization delta divided by elapsed seconds, so two
    // updates a few microseconds apart give it an enormous magnitude. Bound it
    // in both directions: otherwise one tiny drop on a nearly full buffer
    // produces a hugely negative term that masks every other signal.
    let growth_pressure = buffer_state.growth_rate.clamp(-1.0, 1.0);

    let resource_pressure = {
        let resource_monitor = self.resource_monitor.read().await;
        let cpu_factor = resource_monitor.cpu_usage / 100.0;
        let memory_factor = resource_monitor.memory_usage / 100.0;
        (cpu_factor + memory_factor) / 2.0
    };
    let adaptation_weight = self.flow_state.read().await.adaptation_factor;

    let composite_pressure = (buffer_pressure * 0.4)
        + (resource_pressure * 0.3)
        + (growth_pressure * 0.2)
        + (adaptation_weight * 0.1);

    if composite_pressure >= 0.95 {
        PressureLevel::Critical
    } else if composite_pressure >= 0.85 {
        PressureLevel::High
    } else if composite_pressure >= 0.70 {
        PressureLevel::Medium
    } else if composite_pressure >= 0.50 {
        PressureLevel::Low
    } else {
        PressureLevel::None
    }
}
/// Chooses the flow actions for the given pressure level.
///
/// * `Critical`: emergency control, pause, buffer drain, quality 0.3 and
///   dropping of low-priority load.
/// * `High`: decrease by the adaptive reduction factor, quality 0.6 and a
///   0.7 throttle.
/// * `Medium`: decrease by half the adaptive reduction factor and
///   redistribute load.
/// * `Low`: a small decrease, only when the rate is above target.
/// * `None`: a safe increase when below target, and quality 1.0 when the
///   smoothed rate is above 90% of target.
///
/// NOTE(review): the configured `FlowControlStrategies` are not consulted
/// here. The previous code selected a strategy per level but never used it,
/// so that dead lookup (and its lock acquisition) was removed.
async fn determine_adaptive_flow_actions(
    &self,
    pressure: &PressureLevel,
    buffer_state: &EnhancedBufferState,
) -> Result<Vec<FlowAction>> {
    let flow_state = self.flow_state.read().await;

    let actions = match pressure {
        PressureLevel::Critical => vec![
            FlowAction::EmergencyControl,
            FlowAction::PauseFlow,
            FlowAction::BufferDrain,
            FlowAction::QualityAdjustment(0.3),
            FlowAction::LoadBalance(LoadBalanceAction::DropLowPriority),
        ],
        PressureLevel::High => {
            let reduction_factor = self
                .calculate_adaptive_reduction_factor(&flow_state, buffer_state)
                .await;
            let reduction = flow_state.flow_rate * reduction_factor;
            vec![
                FlowAction::DecreaseRate(reduction),
                FlowAction::QualityAdjustment(0.6),
                FlowAction::AdaptiveThrottle(0.7),
            ]
        }
        PressureLevel::Medium => {
            let reduction_factor = self
                .calculate_adaptive_reduction_factor(&flow_state, buffer_state)
                .await
                * 0.5;
            let reduction = flow_state.flow_rate * reduction_factor;
            vec![
                FlowAction::DecreaseRate(reduction),
                FlowAction::LoadBalance(LoadBalanceAction::Redistribute),
            ]
        }
        PressureLevel::Low => {
            if flow_state.flow_rate > flow_state.target_rate {
                let reduction = flow_state.flow_rate * 0.1 * flow_state.adaptation_factor;
                vec![FlowAction::DecreaseRate(reduction)]
            } else {
                Vec::new()
            }
        }
        PressureLevel::None => {
            let mut relaxed = Vec::new();
            if flow_state.flow_rate < flow_state.target_rate {
                let increase = self
                    .calculate_safe_increase_rate(&flow_state, buffer_state)
                    .await;
                relaxed.push(FlowAction::IncreaseRate(increase));
            }
            if flow_state.smoothed_rate > flow_state.target_rate * 0.9 {
                relaxed.push(FlowAction::QualityAdjustment(1.0));
            }
            relaxed
        }
    };

    Ok(actions)
}
async fn calculate_adaptive_reduction_factor(
&self,
flow_state: &FlowState,
buffer_state: &EnhancedBufferState,
) -> f32 {
let mut reduction_factor = 0.3;
if flow_state.flow_variance > 0.5 {
reduction_factor += 0.2;
}
if buffer_state.growth_rate > 0.8 {
reduction_factor += 0.3;
}
reduction_factor *= flow_state.adaptation_factor;
reduction_factor.clamp(0.1, 0.8)
}
/// Returns how much the flow rate may be raised in one step.
///
/// The base step is 10% of the gap to the target rate. It is halved when the
/// flow variance exceeds 0.3, scaled by 0.7 when buffer utilization exceeds
/// 0.3, and finally multiplied by the adaptation factor.
async fn calculate_safe_increase_rate(
    &self,
    flow_state: &FlowState,
    buffer_state: &EnhancedBufferState,
) -> f32 {
    let stability_factor = match flow_state.flow_variance > 0.3 {
        true => 0.5,
        false => 1.0,
    };
    let buffer_factor = match buffer_state.utilization() > 0.3 {
        true => 0.7,
        false => 1.0,
    };

    let gap_to_target = flow_state.target_rate - flow_state.flow_rate;
    let step = gap_to_target * 0.1;
    step * stability_factor * buffer_factor * flow_state.adaptation_factor
}
/// Returns the load-balancing actions for the given pressure level.
///
/// Critical pressure drops low-priority load and prioritizes `"critical"`,
/// high pressure redistributes, medium pressure requests a fair share, and
/// lower levels produce no actions.
async fn calculate_load_balance_actions(
    &self,
    pressure: &PressureLevel,
) -> Result<Vec<FlowAction>> {
    let requests = match pressure {
        PressureLevel::Critical => vec![
            LoadBalanceAction::DropLowPriority,
            LoadBalanceAction::Prioritize(vec!["critical".to_string()]),
        ],
        PressureLevel::High => vec![LoadBalanceAction::Redistribute],
        PressureLevel::Medium => vec![LoadBalanceAction::FairShare],
        PressureLevel::Low | PressureLevel::None => Vec::new(),
    };
    Ok(requests.into_iter().map(FlowAction::LoadBalance).collect())
}
/// Records the current rate, refreshes the smoothed rate and relative spread,
/// then applies `actions` to the flow state and metrics.
///
/// The history sample and the smoothing both use the rate from *before* the
/// actions are applied. Every action is appended to `actions_taken`.
async fn update_adaptive_flow_state(&self, actions: &[FlowAction]) -> Result<()> {
    let mut flow_state = self.flow_state.write().await;
    let mut metrics = self.metrics.write().await;
    let now = Instant::now();

    let current_rate = flow_state.flow_rate;
    flow_state.rate_history.push_back((now, current_rate));
    if flow_state.rate_history.len() > 100 {
        flow_state.rate_history.pop_front();
    }

    // Exponential smoothing of the rate.
    let alpha = 0.3;
    flow_state.smoothed_rate =
        alpha * flow_state.flow_rate + (1.0 - alpha) * flow_state.smoothed_rate;

    if flow_state.rate_history.len() > 10 {
        let sample_count = flow_state.rate_history.len() as f32;
        let mean =
            flow_state.rate_history.iter().map(|(_, rate)| *rate).sum::<f32>() / sample_count;
        let variance = flow_state
            .rate_history
            .iter()
            .map(|(_, rate)| (*rate - mean).powi(2))
            .sum::<f32>()
            / sample_count;
        // Standard deviation relative to the mean. After a pause the whole
        // history can be zeros, and dividing by that zero mean would store
        // NaN, which silently disables every later variance comparison.
        flow_state.flow_variance = if mean > 0.0 {
            variance.sqrt() / mean
        } else {
            0.0
        };
    }

    for action in actions {
        match action {
            FlowAction::IncreaseRate(increase) => {
                flow_state.flow_rate += *increase;
                metrics.flow_adjustments += 1;
                // Only grow more aggressive while the flow is stable.
                if flow_state.flow_variance < 0.3 {
                    flow_state.adaptation_factor =
                        (flow_state.adaptation_factor * 1.05).min(2.0);
                }
            }
            FlowAction::DecreaseRate(decrease) => {
                flow_state.flow_rate = (flow_state.flow_rate - *decrease).max(0.0);
                metrics.flow_adjustments += 1;
                flow_state.adaptation_factor = (flow_state.adaptation_factor * 0.98).max(0.5);
            }
            FlowAction::PauseFlow => {
                flow_state.flow_rate = 0.0;
                metrics.flow_adjustments += 1;
                flow_state.adaptation_factor = (flow_state.adaptation_factor * 0.9).max(0.3);
            }
            FlowAction::ResumeFlow => {
                // Resume at half the target rate.
                flow_state.flow_rate = flow_state.target_rate * 0.5;
                metrics.flow_adjustments += 1;
            }
            FlowAction::BufferDrain => {
                metrics.overflows_prevented += 1;
            }
            FlowAction::QualityAdjustment(_) => {
                metrics.quality_adjustments += 1;
            }
            FlowAction::EmergencyControl => {
                metrics.emergency_activations += 1;
                flow_state.adaptation_factor = (flow_state.adaptation_factor * 0.8).max(0.2);
            }
            FlowAction::AdaptiveThrottle(intensity) => {
                flow_state.flow_rate *= 1.0 - *intensity;
                metrics.flow_adjustments += 1;
            }
            FlowAction::LoadBalance(_) => {}
        }
        // NOTE(review): `actions_taken` is never trimmed, so it grows for the
        // lifetime of the controller — confirm whether a cap is wanted.
        flow_state.actions_taken.push(action.clone());
    }

    flow_state.last_adjustment = now;
    Ok(())
}
/// Takes a fresh set of resource readings, stores them as the latest values,
/// appends them to the bounded history (at most 100 entries) and refreshes
/// the availability forecast.
async fn update_resource_monitoring(&self) -> Result<()> {
    let mut monitor = self.resource_monitor.write().await;
    let sampled_at = Instant::now();

    let cpu = self.get_cpu_usage().await;
    let memory = self.get_memory_usage().await;
    let network = self.get_network_usage().await;
    let io = self.get_io_usage().await;

    monitor.cpu_usage = cpu;
    monitor.memory_usage = memory;
    monitor.network_usage = network;
    monitor.io_ops_per_sec = io;

    let snapshot = ResourceSnapshot {
        cpu,
        memory,
        network,
        io,
    };
    monitor.usage_history.push_back((sampled_at, snapshot));
    if monitor.usage_history.len() > 100 {
        monitor.usage_history.pop_front();
    }

    let forecast = self.calculate_resource_forecast(&monitor).await;
    monitor.availability_forecast = forecast;
    Ok(())
}
/// Forecasts the share of resources expected to be free, between 0.0 and 1.0.
///
/// Returns 1.0 until at least 10 snapshots exist. Otherwise the mean of CPU
/// and memory usage over the 10 newest snapshots is extrapolated five steps
/// ahead along its average per-sample change, clamped to `0..=100`, and
/// converted into the remaining fraction.
async fn calculate_resource_forecast(&self, monitor: &ResourceMonitor) -> f32 {
    if monitor.usage_history.len() < 10 {
        return 1.0;
    }

    // Newest sample first: index 0 is the most recent reading.
    let recent_usage: Vec<f32> = monitor
        .usage_history
        .iter()
        .rev()
        .take(10)
        .map(|(_, snapshot)| (snapshot.cpu + snapshot.memory) / 2.0)
        .collect();

    // Each window is `[newer, older]`, so the change over time is
    // `newer - older`. Subtracting the other way round flips the sign and
    // forecasts falling usage exactly when usage is rising.
    let trend = recent_usage.windows(2).map(|w| w[0] - w[1]).sum::<f32>()
        / (recent_usage.len() - 1) as f32;

    let predicted_usage = recent_usage[0] + trend * 5.0;
    (100.0 - predicted_usage.clamp(0.0, 100.0)) / 100.0
}
/// Folds this cycle's outcome into the running effectiveness score and the
/// running average pressure, both smoothed with a factor of 0.1.
///
/// The cycle's score depends on whether the actions contain what is expected
/// for the pressure level, for example an emergency control at critical
/// pressure or a rate increase when there is no pressure.
async fn update_effectiveness_metrics(
    &self,
    pressure: &PressureLevel,
    actions: &[FlowAction],
) -> Result<()> {
    let mut metrics = self.metrics.write().await;

    let action_score = match pressure {
        PressureLevel::Critical
            if actions.iter().any(|a| matches!(a, FlowAction::EmergencyControl)) =>
        {
            1.0
        }
        PressureLevel::Critical => 0.3,
        PressureLevel::High
            if actions.iter().any(|a| matches!(a, FlowAction::DecreaseRate(_))) =>
        {
            0.9
        }
        PressureLevel::High => 0.5,
        PressureLevel::Medium
            if actions.iter().any(|a| {
                matches!(a, FlowAction::DecreaseRate(_) | FlowAction::LoadBalance(_))
            }) =>
        {
            0.8
        }
        PressureLevel::Medium => 0.6,
        PressureLevel::Low if actions.len() <= 2 => 0.9,
        PressureLevel::Low => 0.7,
        PressureLevel::None
            if actions.iter().any(|a| matches!(a, FlowAction::IncreaseRate(_))) =>
        {
            1.0
        }
        PressureLevel::None => 0.8,
    };

    let alpha = 0.1;
    let previous_score = metrics.effectiveness_score;
    metrics.effectiveness_score = alpha * action_score + (1.0 - alpha) * previous_score;

    let pressure_value = pressure.clone() as u8 as f32;
    let previous_average = metrics.average_pressure;
    metrics.average_pressure = alpha * pressure_value + (1.0 - alpha) * previous_average;
    Ok(())
}
/// Placeholder CPU reading: always returns the same constant.
async fn get_cpu_usage(&self) -> f32 {
    const STUB_CPU_USAGE: f32 = 50.0;
    STUB_CPU_USAGE
}
/// Placeholder memory reading: always returns the same constant.
async fn get_memory_usage(&self) -> f32 {
    const STUB_MEMORY_USAGE: f32 = 60.0;
    STUB_MEMORY_USAGE
}
/// Placeholder network reading: always returns the same constant.
async fn get_network_usage(&self) -> f32 {
    const STUB_NETWORK_USAGE: f32 = 30.0;
    STUB_NETWORK_USAGE
}
/// Placeholder I/O reading: always returns the same constant.
async fn get_io_usage(&self) -> f32 {
    const STUB_IO_USAGE: f32 = 40.0;
    STUB_IO_USAGE
}
pub async fn get_pressure_level(&self) -> PressureLevel {
self.pressure_level.read().await.clone()
}
pub async fn get_flow_state(&self) -> FlowState {
self.flow_state.read().await.clone()
}
pub async fn get_metrics(&self) -> BackpressureMetrics {
self.metrics.read().await.clone()
}
pub async fn get_resource_monitor(&self) -> ResourceMonitor {
self.resource_monitor.read().await.clone()
}
/// Replaces the collected metrics with their default values.
pub async fn reset_metrics(&self) {
    let mut guard = self.metrics.write().await;
    let fresh = BackpressureMetrics::default();
    *guard = fresh;
}
/// Replaces the configured strategies with `strategies`.
///
/// Always returns `Ok(())`.
pub async fn configure_strategies(&self, strategies: FlowControlStrategies) -> Result<()> {
    let mut slot = self.strategies.write().await;
    *slot = strategies;
    drop(slot);
    Ok(())
}
pub async fn get_strategies(&self) -> FlowControlStrategies {
self.strategies.read().await.clone()
}
/// Builds a health report from snapshots of the pressure level, metrics and
/// resource monitor.
///
/// Always returns `Ok`.
pub async fn health_check(&self) -> Result<SystemHealth> {
    let pressure_level = self.get_pressure_level().await;
    let metrics = self.get_metrics().await;
    let resource_monitor = self.get_resource_monitor().await;

    let overall_score = self
        .calculate_health_score(&pressure_level, &metrics, &resource_monitor)
        .await;
    let recommendations = self.generate_health_recommendations(&metrics).await;

    let report = SystemHealth {
        overall_score,
        pressure_level,
        effectiveness: metrics.effectiveness_score,
        resource_availability: resource_monitor.availability_forecast,
        recommendations,
    };
    Ok(report)
}
/// Returns an overall health score between 0.0 and 1.0.
///
/// The score weights a per-level pressure score at 40%, the effectiveness
/// score at 30% and the availability forecast at 30%.
async fn calculate_health_score(
    &self,
    pressure: &PressureLevel,
    metrics: &BackpressureMetrics,
    monitor: &ResourceMonitor,
) -> f32 {
    let pressure_score = match pressure {
        PressureLevel::Critical => 0.1,
        PressureLevel::High => 0.3,
        PressureLevel::Medium => 0.6,
        PressureLevel::Low => 0.8,
        PressureLevel::None => 1.0,
    };

    let pressure_part = pressure_score * 0.4;
    let effectiveness_part = metrics.effectiveness_score * 0.3;
    let resource_part = monitor.availability_forecast * 0.3;

    let weighted = pressure_part + effectiveness_part + resource_part;
    weighted.clamp(0.0, 1.0)
}
/// Returns advice for every metric that crosses its warning threshold:
/// effectiveness below 0.7, more than 5 emergency activations, or an average
/// pressure above 2.0. The advice keeps that order.
async fn generate_health_recommendations(&self, metrics: &BackpressureMetrics) -> Vec<String> {
    let checks = [
        (
            metrics.effectiveness_score < 0.7,
            "Consider adjusting flow control strategies for better effectiveness",
        ),
        (
            metrics.emergency_activations > 5,
            "High number of emergency activations - review system capacity",
        ),
        (
            metrics.average_pressure > 2.0,
            "Consistently high pressure levels - consider scaling resources",
        ),
    ];

    checks
        .iter()
        .filter(|(triggered, _)| *triggered)
        .map(|(_, advice)| (*advice).to_string())
        .collect()
}
}
/// Health report produced by `BackpressureController::health_check`.
#[derive(Debug, Clone)]
pub struct SystemHealth {
/// Combined health score between 0.0 and 1.0.
pub overall_score: f32,
/// Pressure level at the time of the check.
pub pressure_level: PressureLevel,
/// Effectiveness score copied from the metrics.
pub effectiveness: f32,
/// Availability forecast copied from the resource monitor.
pub resource_availability: f32,
/// Human-readable advice; empty when no threshold is crossed.
pub recommendations: Vec<String>,
}
impl Default for FlowState {
    /// Starts with the flow, target and smoothed rates all at 10.0, no
    /// history, no variance and a neutral adaptation factor of 1.0.
    fn default() -> Self {
        const INITIAL_RATE: f32 = 10.0;
        Self {
            flow_rate: INITIAL_RATE,
            target_rate: INITIAL_RATE,
            buffer_fill: 0.0,
            actions_taken: Vec::new(),
            last_adjustment: Instant::now(),
            rate_history: VecDeque::with_capacity(100),
            smoothed_rate: INITIAL_RATE,
            flow_variance: 0.0,
            adaptation_factor: 1.0,
        }
    }
}
impl Default for BackpressureMetrics {
    /// All counters start at zero, no pressure has been observed, and the
    /// effectiveness score starts at 1.0.
    fn default() -> Self {
        let resource_stats = ResourceStats::default();
        let max_pressure_level = PressureLevel::None;
        Self {
            // Counters.
            pressure_events: 0,
            time_under_pressure_ms: 0,
            flow_adjustments: 0,
            overflows_prevented: 0,
            quality_adjustments: 0,
            emergency_activations: 0,
            // Running values.
            average_pressure: 0.0,
            max_pressure_level,
            effectiveness_score: 1.0,
            resource_stats,
        }
    }
}
impl Default for ResourceMonitor {
    /// All readings start at zero with an empty history and an availability
    /// forecast of 1.0.
    fn default() -> Self {
        const NO_READING: f32 = 0.0;
        let usage_history = VecDeque::with_capacity(100);
        Self {
            cpu_usage: NO_READING,
            memory_usage: NO_READING,
            network_usage: NO_READING,
            io_ops_per_sec: NO_READING,
            usage_history,
            availability_forecast: 1.0,
        }
    }
}
impl Default for ResourceStats {
    /// Averages and peaks start at zero; network efficiency starts at 1.0.
    fn default() -> Self {
        const UNSET: f32 = 0.0;
        Self {
            avg_cpu: UNSET,
            peak_cpu: UNSET,
            avg_memory: UNSET,
            peak_memory: UNSET,
            network_efficiency: 1.0,
        }
    }
}
impl Default for FlowControlStrategies {
    /// Adaptive by default, rate limiting under high pressure, dropping in an
    /// emergency, and no custom strategies.
    fn default() -> Self {
        let default_strategy = BackpressureStrategy::Adaptive {
            aggressiveness: 0.5,
            adaptation_speed: 0.3,
        };
        let high_pressure_strategy = BackpressureStrategy::RateLimiting {
            max_rate: 5.0,
            burst_size: 10,
            window_ms: 1000,
        };
        let emergency_strategy = BackpressureStrategy::Dropping {
            drop_threshold: 0.9,
            priority_preservation: true,
        };
        Self {
            default_strategy,
            high_pressure_strategy,
            emergency_strategy,
            custom_strategies: std::collections::HashMap::new(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    /// Default streaming configuration used by every controller test.
    fn make_config() -> AdvancedStreamingConfig {
        AdvancedStreamingConfig::default()
    }

    /// Builds a buffer state holding `current` of `max_size`; utilization is
    /// their ratio, or 0.0 for a zero-sized buffer.
    fn make_buffer_state(current: usize, max_size: usize) -> BufferState {
        let utilization = match max_size {
            0 => 0.0,
            _ => current as f32 / max_size as f32,
        };
        BufferState {
            current_size: current,
            max_size,
            utilization,
            pending_chunks: current / 10,
        }
    }

    /// Controller built from the default test configuration.
    fn new_controller() -> BackpressureController {
        BackpressureController::new(make_config())
    }

    // ---- PressureLevel ordering and equality ----

    #[test]
    fn test_pressure_level_none_is_lowest() {
        assert!(
            PressureLevel::None < PressureLevel::Low,
            "None should be less than Low"
        );
        assert!(
            PressureLevel::None < PressureLevel::Critical,
            "None should be less than Critical"
        );
    }

    #[test]
    fn test_pressure_level_critical_is_highest() {
        assert!(PressureLevel::Critical > PressureLevel::High);
        assert!(PressureLevel::Critical > PressureLevel::Medium);
        assert!(PressureLevel::Critical > PressureLevel::Low);
        assert!(PressureLevel::Critical > PressureLevel::None);
    }

    #[test]
    fn test_pressure_level_ordering_invariant() {
        assert!(PressureLevel::Low < PressureLevel::Medium);
        assert!(PressureLevel::Medium < PressureLevel::High);
        assert!(PressureLevel::High < PressureLevel::Critical);
    }

    #[test]
    fn test_pressure_level_equality() {
        assert_eq!(PressureLevel::None, PressureLevel::None);
        assert_eq!(PressureLevel::High, PressureLevel::High);
        assert_ne!(PressureLevel::None, PressureLevel::Critical);
    }

    // ---- Freshly constructed controller ----

    #[tokio::test]
    async fn test_backpressure_controller_new() {
        let controller = new_controller();
        let pressure = controller.get_pressure_level().await;
        assert_eq!(
            pressure,
            PressureLevel::None,
            "new controller should start at PressureLevel::None"
        );
    }

    #[tokio::test]
    async fn test_backpressure_controller_initial_flow_state() {
        let controller = new_controller();
        let flow_state = controller.get_flow_state().await;
        assert!(
            flow_state.flow_rate > 0.0,
            "initial flow_rate should be positive"
        );
        assert!(
            flow_state.target_rate > 0.0,
            "initial target_rate should be positive"
        );
        assert_eq!(
            flow_state.buffer_fill, 0.0,
            "initial buffer_fill should be 0.0"
        );
    }

    #[tokio::test]
    async fn test_backpressure_controller_initial_metrics_zeroed() {
        let controller = new_controller();
        let metrics = controller.get_metrics().await;
        assert_eq!(
            metrics.pressure_events, 0,
            "initial pressure_events should be 0"
        );
        assert_eq!(
            metrics.flow_adjustments, 0,
            "initial flow_adjustments should be 0"
        );
        assert_eq!(metrics.emergency_activations, 0);
        assert!(
            (metrics.effectiveness_score - 1.0).abs() < 1e-3,
            "initial effectiveness_score should be 1.0"
        );
    }

    // ---- Monitoring cycles ----

    #[tokio::test]
    async fn test_monitor_no_pressure_empty_buffer() {
        let controller = new_controller();
        let buffer_state = make_buffer_state(0, 1000);
        let actions = controller
            .monitor_and_adjust_basic(&buffer_state)
            .await
            .expect("monitor_and_adjust_basic should not fail");
        let has_emergency = actions
            .iter()
            .any(|a| matches!(a, FlowAction::EmergencyControl));
        assert!(
            !has_emergency,
            "empty buffer should not trigger EmergencyControl"
        );
    }

    #[tokio::test]
    async fn test_monitor_critical_pressure_high_buffer() {
        let controller = new_controller();
        let buffer_state = make_buffer_state(1000, 1000);
        let _actions = controller
            .monitor_and_adjust_basic(&buffer_state)
            .await
            .expect("monitor_and_adjust_basic should not fail");
        let pressure = controller.get_pressure_level().await;
        assert!(
            pressure >= PressureLevel::Low,
            "100% buffer utilization should result in at least Low pressure, got {:?}",
            pressure
        );
    }

    #[tokio::test]
    async fn test_monitor_returns_decrease_rate_on_high_fill() {
        let controller = new_controller();
        let buffer_state = make_buffer_state(900, 1000);
        let _actions = controller
            .monitor_and_adjust_basic(&buffer_state)
            .await
            .expect("monitor_and_adjust_basic should not fail");
        let pressure = controller.get_pressure_level().await;
        assert!(
            pressure >= PressureLevel::Low,
            "90% buffer utilization should result in at least Low pressure, got {:?}",
            pressure
        );
    }

    // ---- EnhancedBufferState ----

    #[test]
    fn test_enhanced_buffer_state_from_basic() {
        let enhanced: EnhancedBufferState = make_buffer_state(500, 1000).into();
        assert_eq!(enhanced.base_state.current_size, 500);
        assert_eq!(enhanced.base_state.max_size, 1000);
        assert_eq!(
            enhanced.growth_rate, 0.0,
            "initial growth_rate should be 0.0"
        );
        assert_eq!(
            enhanced.pressure_trend, 0.0,
            "initial pressure_trend should be 0.0"
        );
    }

    #[test]
    fn test_enhanced_buffer_state_utilization() {
        let enhanced = EnhancedBufferState::new(make_buffer_state(300, 1000));
        let util = enhanced.utilization();
        assert!(
            (util - 0.3).abs() < 1e-3,
            "utilization should be ~0.3 for 300/1000, got {}",
            util
        );
    }

    #[test]
    fn test_enhanced_buffer_state_update_tracks_trend() {
        let mut enhanced = EnhancedBufferState::new(make_buffer_state(100, 1000));
        enhanced.update(make_buffer_state(500, 1000));
        assert!(
            enhanced.pressure_trend >= 0.0,
            "pressure_trend should be non-negative when utilization increased"
        );
    }

    #[test]
    fn test_enhanced_buffer_state_utilization_history_grows() {
        let mut enhanced = EnhancedBufferState::new(make_buffer_state(100, 1000));
        for step in 1..=5 {
            enhanced.update(make_buffer_state(step * 100, 1000));
        }
        assert_eq!(
            enhanced.utilization_history.len(),
            5,
            "utilization_history should track 5 updates"
        );
    }

    // ---- Defaults ----

    #[test]
    fn test_backpressure_metrics_default() {
        let metrics = BackpressureMetrics::default();
        assert_eq!(metrics.pressure_events, 0);
        assert_eq!(metrics.time_under_pressure_ms, 0);
        assert_eq!(metrics.flow_adjustments, 0);
        assert_eq!(metrics.overflows_prevented, 0);
        assert_eq!(metrics.quality_adjustments, 0);
        assert_eq!(metrics.emergency_activations, 0);
        assert_eq!(metrics.average_pressure, 0.0);
        assert_eq!(metrics.max_pressure_level, PressureLevel::None);
        assert!(
            (metrics.effectiveness_score - 1.0).abs() < 1e-3,
            "default effectiveness_score should be 1.0"
        );
    }

    #[test]
    fn test_backpressure_metrics_effectiveness_in_range() {
        let metrics = BackpressureMetrics {
            effectiveness_score: 0.75,
            ..BackpressureMetrics::default()
        };
        assert!((0.0..=1.0).contains(&metrics.effectiveness_score));
    }

    #[test]
    fn test_flow_state_default() {
        let state = FlowState::default();
        assert!(
            state.flow_rate > 0.0,
            "default flow_rate should be positive"
        );
        assert!(
            state.target_rate > 0.0,
            "default target_rate should be positive"
        );
        assert_eq!(state.buffer_fill, 0.0, "default buffer_fill should be 0.0");
    }

    // ---- Behaviour across cycles ----

    #[tokio::test]
    async fn test_pressure_metric_range_low_utilization() {
        let controller = new_controller();
        let buffer_state = make_buffer_state(100, 1000);
        let _ = controller
            .monitor_and_adjust_basic(&buffer_state)
            .await
            .expect("should not fail");
        let pressure = controller.get_pressure_level().await;
        assert!(
            pressure <= PressureLevel::Medium,
            "10% buffer utilization should result in at most Medium pressure, got {:?}",
            pressure
        );
    }

    #[tokio::test]
    async fn test_reset_metrics_clears_counters() {
        let controller = new_controller();
        let buffer_state = make_buffer_state(900, 1000);
        let _ = controller
            .monitor_and_adjust_basic(&buffer_state)
            .await
            .expect("should not fail");
        controller.reset_metrics().await;
        let metrics = controller.get_metrics().await;
        assert_eq!(
            metrics.pressure_events, 0,
            "reset should clear pressure_events"
        );
        assert_eq!(
            metrics.flow_adjustments, 0,
            "reset should clear flow_adjustments"
        );
    }

    #[tokio::test]
    async fn test_no_oscillation_stable_buffer() {
        let controller = new_controller();
        let buffer_state = make_buffer_state(200, 1000);

        let _ = controller
            .monitor_and_adjust_basic(&buffer_state)
            .await
            .expect("should not fail");
        let p1 = controller.get_pressure_level().await;

        let _ = controller
            .monitor_and_adjust_basic(&buffer_state)
            .await
            .expect("should not fail");
        let p2 = controller.get_pressure_level().await;

        assert_eq!(
            p1, p2,
            "stable buffer utilization should not cause pressure level oscillation"
        );
    }

    #[test]
    fn test_flow_control_strategies_default() {
        let strategies = FlowControlStrategies::default();
        assert!(
            matches!(
                strategies.default_strategy,
                BackpressureStrategy::Adaptive { .. }
            ),
            "default strategy should be Adaptive"
        );
        assert!(
            matches!(
                strategies.high_pressure_strategy,
                BackpressureStrategy::RateLimiting { .. }
            ),
            "high pressure strategy should be RateLimiting"
        );
        assert!(
            matches!(
                strategies.emergency_strategy,
                BackpressureStrategy::Dropping { .. }
            ),
            "emergency strategy should be Dropping"
        );
    }
}