use crate::error::{Result, StreamingError};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackpressureStrategy {
Block,
DropOldest,
DropNewest,
Fail,
Adaptive,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackpressureConfig {
pub strategy: BackpressureStrategy,
pub high_watermark: f64,
pub low_watermark: f64,
pub max_latency: Duration,
pub sample_window: Duration,
pub adaptive: bool,
}
impl Default for BackpressureConfig {
fn default() -> Self {
Self {
strategy: BackpressureStrategy::Block,
high_watermark: 0.8,
low_watermark: 0.2,
max_latency: Duration::from_secs(1),
sample_window: Duration::from_secs(10),
adaptive: true,
}
}
}
#[derive(Debug, Clone)]
pub struct LoadMetrics {
pub buffer_utilization: f64,
pub avg_latency: Duration,
pub peak_latency: Duration,
pub throughput: f64,
pub dropped_elements: u64,
pub backpressure_events: u64,
}
impl Default for LoadMetrics {
fn default() -> Self {
Self {
buffer_utilization: 0.0,
avg_latency: Duration::ZERO,
peak_latency: Duration::ZERO,
throughput: 0.0,
dropped_elements: 0,
backpressure_events: 0,
}
}
}
pub struct BackpressureManager {
config: BackpressureConfig,
metrics: Arc<RwLock<LoadMetrics>>,
buffer_capacity: AtomicUsize,
buffer_size: AtomicUsize,
elements_processed: AtomicU64,
elements_dropped: AtomicU64,
backpressure_events: AtomicU64,
last_sample: Arc<RwLock<Instant>>,
sample_start: Instant,
}
impl BackpressureManager {
pub fn new(config: BackpressureConfig, buffer_capacity: usize) -> Self {
Self {
config,
metrics: Arc::new(RwLock::new(LoadMetrics::default())),
buffer_capacity: AtomicUsize::new(buffer_capacity),
buffer_size: AtomicUsize::new(0),
elements_processed: AtomicU64::new(0),
elements_dropped: AtomicU64::new(0),
backpressure_events: AtomicU64::new(0),
last_sample: Arc::new(RwLock::new(Instant::now())),
sample_start: Instant::now(),
}
}
pub async fn should_apply_backpressure(&self) -> bool {
let utilization = self.buffer_utilization();
utilization >= self.config.high_watermark
}
pub async fn can_release_backpressure(&self) -> bool {
let utilization = self.buffer_utilization();
utilization <= self.config.low_watermark
}
pub async fn handle_element_arrival(&self) -> Result<bool> {
let current_size = self.buffer_size.load(Ordering::Relaxed);
let capacity = self.buffer_capacity.load(Ordering::Relaxed);
if current_size >= capacity {
self.backpressure_events.fetch_add(1, Ordering::Relaxed);
match self.config.strategy {
BackpressureStrategy::Block => {
return Ok(false);
}
BackpressureStrategy::DropOldest | BackpressureStrategy::DropNewest => {
self.elements_dropped.fetch_add(1, Ordering::Relaxed);
return Ok(true);
}
BackpressureStrategy::Fail => {
return Err(StreamingError::BufferFull);
}
BackpressureStrategy::Adaptive => {
if self.should_apply_backpressure().await {
return Ok(false);
}
}
}
}
self.buffer_size.fetch_add(1, Ordering::Relaxed);
Ok(true)
}
pub async fn handle_element_processed(&self, latency: Duration) {
self.buffer_size.fetch_sub(1, Ordering::Relaxed);
self.elements_processed.fetch_add(1, Ordering::Relaxed);
self.update_metrics(latency).await;
}
async fn update_metrics(&self, latency: Duration) {
let now = Instant::now();
let last_sample = *self.last_sample.read().await;
if now.duration_since(last_sample) >= self.config.sample_window {
let mut metrics = self.metrics.write().await;
let mut last = self.last_sample.write().await;
metrics.buffer_utilization = self.buffer_utilization();
metrics.dropped_elements = self.elements_dropped.load(Ordering::Relaxed);
metrics.backpressure_events = self.backpressure_events.load(Ordering::Relaxed);
let elapsed = now.duration_since(self.sample_start).as_secs_f64();
let processed = self.elements_processed.load(Ordering::Relaxed);
metrics.throughput = processed as f64 / elapsed;
if latency > metrics.peak_latency {
metrics.peak_latency = latency;
}
let alpha = 0.1;
let new_latency_secs = latency.as_secs_f64();
let old_latency_secs = metrics.avg_latency.as_secs_f64();
let avg_latency_secs = alpha * new_latency_secs + (1.0 - alpha) * old_latency_secs;
metrics.avg_latency = Duration::from_secs_f64(avg_latency_secs);
*last = now;
}
}
fn buffer_utilization(&self) -> f64 {
let size = self.buffer_size.load(Ordering::Relaxed);
let capacity = self.buffer_capacity.load(Ordering::Relaxed);
if capacity == 0 {
0.0
} else {
size as f64 / capacity as f64
}
}
pub async fn metrics(&self) -> LoadMetrics {
self.metrics.read().await.clone()
}
pub fn set_capacity(&self, capacity: usize) {
self.buffer_capacity.store(capacity, Ordering::Relaxed);
}
pub fn capacity(&self) -> usize {
self.buffer_capacity.load(Ordering::Relaxed)
}
pub fn size(&self) -> usize {
self.buffer_size.load(Ordering::Relaxed)
}
pub async fn reset_metrics(&self) {
let mut metrics = self.metrics.write().await;
*metrics = LoadMetrics::default();
self.elements_processed.store(0, Ordering::Relaxed);
self.elements_dropped.store(0, Ordering::Relaxed);
self.backpressure_events.store(0, Ordering::Relaxed);
}
pub async fn adjust_capacity_adaptive(&self) {
let utilization = self.buffer_utilization();
let metrics = self.metrics().await;
if utilization > self.config.high_watermark && metrics.avg_latency < self.config.max_latency
{
let current = self.buffer_capacity.load(Ordering::Relaxed);
let new_capacity = (current as f64 * 1.2) as usize;
self.buffer_capacity.store(new_capacity, Ordering::Relaxed);
} else if utilization < self.config.low_watermark {
let current = self.buffer_capacity.load(Ordering::Relaxed);
let new_capacity = ((current as f64 * 0.8) as usize).max(64);
self.buffer_capacity.store(new_capacity, Ordering::Relaxed);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_backpressure_manager_creation() {
let config = BackpressureConfig::default();
let manager = BackpressureManager::new(config, 1000);
assert_eq!(manager.capacity(), 1000);
assert_eq!(manager.size(), 0);
}
#[tokio::test]
async fn test_buffer_utilization() {
let config = BackpressureConfig::default();
let manager = BackpressureManager::new(config, 100);
assert_eq!(manager.buffer_utilization(), 0.0);
for _ in 0..50 {
manager
.handle_element_arrival()
.await
.expect("backpressure element arrival should succeed");
}
assert!((manager.buffer_utilization() - 0.5).abs() < 0.01);
}
#[tokio::test]
async fn test_backpressure_application() {
let config = BackpressureConfig {
high_watermark: 0.5,
..Default::default()
};
let manager = BackpressureManager::new(config, 100);
for _ in 0..55 {
manager
.handle_element_arrival()
.await
.expect("backpressure element arrival should succeed");
}
assert!(manager.should_apply_backpressure().await);
}
#[tokio::test]
async fn test_adaptive_capacity_adjustment() {
let config = BackpressureConfig::default();
let manager = BackpressureManager::new(config, 100);
let initial_capacity = manager.capacity();
for _ in 0..95 {
manager
.handle_element_arrival()
.await
.expect("backpressure element arrival should succeed");
}
manager.adjust_capacity_adaptive().await;
let new_capacity = manager.capacity();
assert!(new_capacity > initial_capacity);
}
}