use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};
const DEFAULT_WINDOW: Duration = Duration::from_secs(60);
pub const DEFAULT_THRESHOLD: u64 = 1000;
const IDLE_GC_TRIGGER_SIZE: usize = 1024;
const IDLE_GC_WINDOWS: u32 = 10;
struct StreamWindow {
window_start: Instant,
window_drops: u64,
}
impl StreamWindow {
fn new() -> Self {
Self {
window_start: Instant::now(),
window_drops: 0,
}
}
fn record(&mut self, drops: u64, threshold: u64, window: Duration) -> Option<u64> {
let elapsed = self.window_start.elapsed();
if elapsed >= window {
self.window_start = Instant::now();
self.window_drops = drops;
} else {
self.window_drops += drops;
}
if self.window_drops >= threshold {
Some(self.window_drops)
} else {
None
}
}
}
pub struct CdcLagWarner {
windows: Mutex<HashMap<(u64, String), StreamWindow>>,
threshold: u64,
window: Duration,
}
impl CdcLagWarner {
pub fn new(threshold: u64) -> Self {
Self {
windows: Mutex::new(HashMap::new()),
threshold,
window: DEFAULT_WINDOW,
}
}
pub fn record_drops(&self, tenant_id: u64, stream_name: &str, drops: u64, oldest_lsn: u64) {
if drops == 0 {
return;
}
let mut windows = self.windows.lock().unwrap_or_else(|p| p.into_inner());
if windows.len() >= IDLE_GC_TRIGGER_SIZE {
let idle_after = self.window * IDLE_GC_WINDOWS;
windows.retain(|_, w| w.window_start.elapsed() < idle_after);
}
let key = (tenant_id, stream_name.to_string());
let entry = windows.entry(key).or_insert_with(StreamWindow::new);
if let Some(rate) = entry.record(drops, self.threshold, self.window) {
tracing::warn!(
tenant_id,
stream = stream_name,
dropped_in_window = rate,
threshold = self.threshold,
oldest_available_lsn = oldest_lsn,
"CDC stream drop rate exceeded threshold: lagging consumers may miss events"
);
}
}
pub fn remove_stream(&self, tenant_id: u64, stream_name: &str) {
let mut windows = self.windows.lock().unwrap_or_else(|p| p.into_inner());
windows.remove(&(tenant_id, stream_name.to_string()));
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn no_warn_below_threshold() {
let warner = CdcLagWarner::new(1000);
warner.record_drops(1, "orders_stream", 999, 0);
warner.record_drops(1, "orders_stream", 0, 0); }
#[test]
fn warn_at_threshold() {
let warner = CdcLagWarner::new(1);
warner.record_drops(1, "stream_a", 1, 500);
}
#[test]
fn window_resets() {
let mut w = StreamWindow::new();
w.window_start = Instant::now() - Duration::from_secs(61);
let result = w.record(5, 10, Duration::from_secs(60));
assert!(result.is_none());
assert_eq!(w.window_drops, 5);
}
#[test]
fn window_accumulates() {
let mut w = StreamWindow::new();
w.record(3, 10, Duration::from_secs(60));
let result = w.record(8, 10, Duration::from_secs(60));
assert_eq!(result, Some(11));
}
#[test]
fn remove_stream_cleans_up() {
let warner = CdcLagWarner::new(100);
warner.record_drops(1, "s1", 50, 0);
warner.remove_stream(1, "s1");
warner.record_drops(1, "s1", 50, 0);
let windows = warner.windows.lock().unwrap();
let w = windows.get(&(1u64, "s1".to_string())).unwrap();
assert_eq!(w.window_drops, 50);
}
}