use std::collections::HashMap;
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct BackpressureConfig {
pub max_pending: usize,
pub max_queue_size: usize,
pub high_watermark: f64,
pub low_watermark: f64,
}
impl BackpressureConfig {
pub fn new() -> Self {
Self {
max_pending: 1000,
max_queue_size: 10000,
high_watermark: 0.8,
low_watermark: 0.6,
}
}
pub fn with_max_pending(mut self, max: usize) -> Self {
self.max_pending = max;
self
}
pub fn with_max_queue_size(mut self, max: usize) -> Self {
self.max_queue_size = max;
self
}
pub fn with_high_watermark(mut self, ratio: f64) -> Self {
self.high_watermark = ratio.clamp(0.0, 1.0);
self
}
pub fn with_low_watermark(mut self, ratio: f64) -> Self {
self.low_watermark = ratio.clamp(0.0, 1.0);
self
}
pub fn should_apply_backpressure(&self, pending: usize) -> bool {
pending >= (self.max_pending as f64 * self.high_watermark) as usize
}
pub fn should_release_backpressure(&self, pending: usize) -> bool {
pending <= (self.max_pending as f64 * self.low_watermark) as usize
}
pub fn is_at_capacity(&self, queue_size: usize) -> bool {
queue_size >= self.max_queue_size
}
}
impl Default for BackpressureConfig {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct PoisonMessageDetector {
pub max_failures: u32,
pub failure_window: Duration,
failures: std::sync::Arc<std::sync::Mutex<HashMap<uuid::Uuid, (u32, u64)>>>,
}
impl PoisonMessageDetector {
pub fn new() -> Self {
Self {
max_failures: 5,
failure_window: Duration::from_secs(3600),
failures: std::sync::Arc::new(std::sync::Mutex::new(HashMap::new())),
}
}
pub fn with_max_failures(mut self, max: u32) -> Self {
self.max_failures = max;
self
}
pub fn with_failure_window(mut self, window: Duration) -> Self {
self.failure_window = window;
self
}
pub fn record_failure(&self, task_id: uuid::Uuid) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let mut failures = self.failures.lock().unwrap();
let entry = failures.entry(task_id).or_insert((0, now));
if now - entry.1 > self.failure_window.as_secs() {
*entry = (1, now);
} else {
entry.0 += 1;
entry.1 = now;
}
}
pub fn is_poison(&self, task_id: uuid::Uuid) -> bool {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let failures = self.failures.lock().unwrap();
if let Some((count, last_failure)) = failures.get(&task_id) {
if now - last_failure <= self.failure_window.as_secs() {
return *count >= self.max_failures;
}
}
false
}
pub fn failure_count(&self, task_id: uuid::Uuid) -> u32 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let failures = self.failures.lock().unwrap();
if let Some((count, last_failure)) = failures.get(&task_id) {
if now - last_failure <= self.failure_window.as_secs() {
return *count;
}
}
0
}
pub fn clear_failures(&self, task_id: uuid::Uuid) {
let mut failures = self.failures.lock().unwrap();
failures.remove(&task_id);
}
pub fn clear_all(&self) {
let mut failures = self.failures.lock().unwrap();
failures.clear();
}
}
impl Default for PoisonMessageDetector {
fn default() -> Self {
Self::new()
}
}