use crate::kafka::client::TopicPartition;
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;
use std::time::{Duration, Instant};
/// A single observation of a partition's high watermark.
#[derive(Clone, Copy, Debug)]
struct WatermarkSample {
    /// Monotonic timestamp taken when the sample was recorded.
    at: Instant,
    /// High watermark observed at `at`.
    hwm: i64,
}
/// Computes the average high-watermark growth rate (messages per second) over a window.
///
/// Only the oldest and newest samples in `buf` are compared. Returns `None` when:
/// * the window holds fewer than two samples,
/// * no time elapsed between the oldest and newest sample,
/// * the watermark did not increase,
/// * the watermark difference does not fit in an `i64`, or
/// * the resulting rate is below `min_msgs_per_sec`.
fn compute_rate(buf: &VecDeque<WatermarkSample>, min_msgs_per_sec: f64) -> Option<f64> {
    if buf.len() < 2 {
        return None;
    }
    let (first, last) = (*buf.front()?, *buf.back()?);
    // `checked_sub` rather than `-`: an overflowing difference would panic in debug builds
    // and silently wrap to a meaningless (possibly positive) value in release builds.
    let d_hwm = last.hwm.checked_sub(first.hwm)? as f64;
    let d_t = last.at.saturating_duration_since(first.at).as_secs_f64();
    if d_t <= 0.0 || d_hwm <= 0.0 {
        return None;
    }
    let rate = d_hwm / d_t;
    if rate < min_msgs_per_sec {
        return None;
    }
    Some(rate)
}
/// Keeps a short per-partition history of high-watermark samples so that a lag measured
/// in messages can be turned into an approximate lag in seconds.
pub struct RateSampler {
    /// Upper bound on the number of samples kept per partition (never below 2).
    max_samples: usize,
    /// Samples older than this are dropped whenever new watermarks are recorded.
    max_age: Duration,
    /// Rates below this many messages per second are reported as unavailable.
    min_msgs_per_sec: f64,
    /// Per-partition window of samples, oldest at the front.
    history: Mutex<HashMap<TopicPartition, VecDeque<WatermarkSample>>>,
}
impl RateSampler {
    /// Creates a sampler with no recorded history.
    ///
    /// * `max_samples` — maximum number of samples kept per partition; raised to 2 if
    ///   smaller.
    /// * `max_age` — samples older than this, measured from the time of the latest
    ///   [`record_watermarks`](Self::record_watermarks) call, are discarded.
    /// * `min_msgs_per_sec` — rates below this floor are reported as unavailable.
    pub fn new(max_samples: usize, max_age: Duration, min_msgs_per_sec: f64) -> Self {
        Self {
            history: Mutex::new(HashMap::new()),
            // A rate is derived from two samples, so a smaller window could never yield one.
            max_samples: max_samples.max(2),
            max_age,
            min_msgs_per_sec,
        }
    }

    /// Appends one high-watermark sample, timestamped now, for every partition in
    /// `watermarks`.
    ///
    /// `watermarks` maps each partition to a `(low, high)` pair; only `high` is used.
    /// Partitions missing from `watermarks` are forgotten entirely. For each present
    /// partition:
    /// * samples older than `max_age` are dropped;
    /// * a negative `high` is not a valid offset, so no sample is appended, but the
    ///   partition stays tracked;
    /// * a `high` lower than the newest recorded one means the watermark moved backwards.
    ///   The window restarts from this sample so pre- and post-rewind values are never
    ///   mixed into one rate;
    /// * the window is capped at `max_samples`, evicting the oldest first.
    pub fn record_watermarks(&self, watermarks: &HashMap<TopicPartition, (i64, i64)>) {
        let now = Instant::now();
        // Poisoning is ignored so that a panic on another thread does not disable sampling.
        let mut history = self.history.lock().unwrap_or_else(|p| p.into_inner());
        history.retain(|tp, _| watermarks.contains_key(tp));
        for (tp, &(_low, high)) in watermarks {
            let buf = history.entry(tp.clone()).or_default();
            while let Some(front) = buf.front() {
                if now.saturating_duration_since(front.at) > self.max_age {
                    buf.pop_front();
                } else {
                    break;
                }
            }
            if high < 0 {
                continue;
            }
            if matches!(buf.back(), Some(newest) if high < newest.hwm) {
                buf.clear();
            }
            buf.push_back(WatermarkSample { at: now, hwm: high });
            while buf.len() > self.max_samples {
                buf.pop_front();
            }
        }
    }

    /// Converts a message-count `lag` for `tp` into an approximate lag in seconds.
    ///
    /// A non-positive `lag` yields `Some(0.0)` without consulting history. Otherwise, the
    /// result is `lag` divided by the partition's recent watermark growth rate. Returns
    /// `None` when the partition is untracked or has no usable rate.
    // NOTE(review): presumably only exercised by tests so far, hence the dead-code allowance
    // outside test builds.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn estimate_lag_seconds(&self, tp: &TopicPartition, lag: i64) -> Option<f64> {
        if lag <= 0 {
            return Some(0.0);
        }
        let history = self.history.lock().unwrap_or_else(|p| p.into_inner());
        let rate = compute_rate(history.get(tp)?, self.min_msgs_per_sec)?;
        Some(lag as f64 / rate)
    }

    /// Returns the current rate (messages per second) of every tracked partition that has
    /// a usable one. Partitions without a usable rate are omitted.
    pub fn rates_snapshot(&self) -> HashMap<TopicPartition, f64> {
        let history = self.history.lock().unwrap_or_else(|p| p.into_inner());
        history
            .iter()
            .filter_map(|(tp, buf)| {
                compute_rate(buf, self.min_msgs_per_sec).map(|rate| (tp.clone(), rate))
            })
            .collect()
    }

    /// Number of partitions with a history entry, including partitions whose window is
    /// still too short to yield a rate.
    pub fn tracked_partitions(&self) -> usize {
        self.history.lock().unwrap_or_else(|p| p.into_inner()).len()
    }

    /// Discards the history of every partition.
    // NOTE(review): `dead_code` is allowed presumably because nothing calls this yet.
    #[allow(dead_code)]
    pub fn clear(&self) {
        self.history
            .lock()
            .unwrap_or_else(|p| p.into_inner())
            .clear();
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    fn tp(topic: &str, partition: i32) -> TopicPartition {
        TopicPartition::new(topic, partition)
    }

    #[test]
    fn single_sample_returns_none() {
        let s = RateSampler::new(5, Duration::from_secs(600), 0.01);
        let mut wm = HashMap::new();
        wm.insert(tp("t", 0), (0, 100));
        s.record_watermarks(&wm);
        assert_eq!(s.estimate_lag_seconds(&tp("t", 0), 50), None);
    }

    #[test]
    fn two_samples_compute_rate() {
        let s = RateSampler::new(5, Duration::from_secs(600), 0.01);
        let mut wm1 = HashMap::new();
        wm1.insert(tp("t", 0), (0, 100));
        s.record_watermarks(&wm1);
        std::thread::sleep(Duration::from_millis(250));
        let mut wm2 = HashMap::new();
        wm2.insert(tp("t", 0), (0, 1100));
        s.record_watermarks(&wm2);
        let est = s
            .estimate_lag_seconds(&tp("t", 0), 500)
            .expect("should compute rate");
        // `sleep` never returns early, so 1000 messages arrived over >= 250 ms: the rate is
        // at most 4000 msg/s and 500 messages of lag take at least 0.125 s. The bounds
        // carry a little slack for clock granularity.
        assert!(est.is_finite() && est >= 0.124, "estimate too small: {est}");
        let rates = s.rates_snapshot();
        let rate = rates.get(&tp("t", 0)).copied().expect("rate in snapshot");
        assert!(rate > 0.0 && rate <= 4010.0, "rate out of range: {rate}");
    }

    #[test]
    fn zero_lag_is_zero_seconds() {
        let s = RateSampler::new(5, Duration::from_secs(600), 0.01);
        let mut wm = HashMap::new();
        wm.insert(tp("t", 0), (0, 100));
        s.record_watermarks(&wm);
        assert_eq!(s.estimate_lag_seconds(&tp("t", 0), 0), Some(0.0));
    }

    #[test]
    fn idle_partition_below_min_rate_returns_none() {
        // One message over >= 50 ms is at most 20 msg/s, below the 100 msg/s floor.
        // The watermark must actually advance: an unchanged watermark is rejected by the
        // "no growth" check before the floor is ever consulted.
        let s = RateSampler::new(5, Duration::from_secs(600), 100.0);
        let mut wm1 = HashMap::new();
        wm1.insert(tp("t", 0), (0, 100));
        s.record_watermarks(&wm1);
        std::thread::sleep(Duration::from_millis(50));
        let mut wm2 = HashMap::new();
        wm2.insert(tp("t", 0), (0, 101));
        s.record_watermarks(&wm2);
        assert_eq!(s.estimate_lag_seconds(&tp("t", 0), 10), None);
    }

    #[test]
    fn unchanged_watermark_returns_none() {
        let s = RateSampler::new(5, Duration::from_secs(600), 0.01);
        let mut wm = HashMap::new();
        wm.insert(tp("t", 0), (0, 100));
        s.record_watermarks(&wm);
        s.record_watermarks(&wm);
        assert_eq!(s.estimate_lag_seconds(&tp("t", 0), 10), None);
    }

    #[test]
    fn history_bounded_by_max_samples() {
        let s = RateSampler::new(3, Duration::from_secs(600), 0.01);
        let mut wm = HashMap::new();
        for i in 0..10 {
            wm.insert(tp("t", 0), (0, i));
            s.record_watermarks(&wm);
        }
        let history = s.history.lock().unwrap();
        let buf = history.get(&tp("t", 0)).unwrap();
        assert_eq!(buf.len(), 3, "must be bounded to max_samples");
        let kept: Vec<i64> = buf.iter().map(|sample| sample.hwm).collect();
        assert_eq!(kept, vec![7, 8, 9], "oldest samples are evicted first");
    }

    #[test]
    fn unmonitored_partitions_pruned() {
        let s = RateSampler::new(5, Duration::from_secs(600), 0.01);
        let mut wm = HashMap::new();
        wm.insert(tp("t", 0), (0, 100));
        wm.insert(tp("other", 0), (0, 100));
        s.record_watermarks(&wm);
        assert_eq!(s.tracked_partitions(), 2);
        let mut wm2 = HashMap::new();
        wm2.insert(tp("t", 0), (0, 101));
        s.record_watermarks(&wm2);
        assert_eq!(s.tracked_partitions(), 1);
    }

    #[test]
    fn retention_rewind_returns_none() {
        let s = RateSampler::new(5, Duration::from_secs(600), 0.01);
        let mut wm1 = HashMap::new();
        wm1.insert(tp("t", 0), (0, 1000));
        s.record_watermarks(&wm1);
        std::thread::sleep(Duration::from_millis(30));
        let mut wm2 = HashMap::new();
        wm2.insert(tp("t", 0), (0, 500));
        s.record_watermarks(&wm2);
        assert_eq!(s.estimate_lag_seconds(&tp("t", 0), 100), None);
    }

    #[test]
    fn unknown_partition_has_no_estimate() {
        let s = RateSampler::new(5, Duration::from_secs(600), 0.01);
        assert_eq!(s.estimate_lag_seconds(&tp("missing", 0), 10), None);
        assert_eq!(s.estimate_lag_seconds(&tp("missing", 0), 0), Some(0.0));
    }

    #[test]
    fn rates_snapshot_omits_partitions_without_rate() {
        let s = RateSampler::new(5, Duration::from_secs(600), 0.01);
        let mut wm = HashMap::new();
        wm.insert(tp("t", 0), (0, 100));
        s.record_watermarks(&wm);
        assert_eq!(s.tracked_partitions(), 1);
        assert!(s.rates_snapshot().is_empty());
    }

    #[test]
    fn clear_forgets_all_partitions() {
        let s = RateSampler::new(5, Duration::from_secs(600), 0.01);
        let mut wm = HashMap::new();
        wm.insert(tp("t", 0), (0, 100));
        wm.insert(tp("t", 1), (0, 200));
        s.record_watermarks(&wm);
        assert_eq!(s.tracked_partitions(), 2);
        s.clear();
        assert_eq!(s.tracked_partitions(), 0);
        assert!(s.rates_snapshot().is_empty());
    }
}