use std::time::Duration;
use tokio::time::Instant;
pub const DEFAULT_SILENCE_THRESHOLD: Duration = Duration::from_secs(5);
#[derive(Debug)]
pub struct GapDetector {
topic: String,
last_event_at: Option<Instant>,
silence_threshold: Duration,
total_gap_ms: u64,
gap_count: u64,
}
impl GapDetector {
pub fn new(topic: impl Into<String>, silence_threshold: Duration) -> Self {
Self {
topic: topic.into(),
last_event_at: None,
silence_threshold,
total_gap_ms: 0,
gap_count: 0,
}
}
pub fn with_defaults(topic: impl Into<String>) -> Self {
Self::new(topic, DEFAULT_SILENCE_THRESHOLD)
}
pub fn record_event(&mut self) -> Option<u64> {
let now = Instant::now();
let gap = self.last_event_at.and_then(|prev| {
let elapsed = now.duration_since(prev);
if elapsed > self.silence_threshold {
let gap_ms = elapsed.as_millis() as u64;
self.total_gap_ms += gap_ms;
self.gap_count += 1;
tracing::warn!(
topic = %self.topic,
ingestion_gap_duration_ms = gap_ms,
gap_count = self.gap_count,
total_gap_ms = self.total_gap_ms,
"ingestion.gap_detected"
);
Some(gap_ms)
} else {
None
}
});
self.last_event_at = Some(now);
gap
}
pub fn topic(&self) -> &str {
&self.topic
}
pub fn total_gap_ms(&self) -> u64 {
self.total_gap_ms
}
pub fn gap_count(&self) -> u64 {
self.gap_count
}
pub fn time_since_last_event(&self) -> Option<Duration> {
self.last_event_at.map(|t| t.elapsed())
}
pub fn silence_threshold(&self) -> Duration {
self.silence_threshold
}
}
pub struct GapDetectorSet {
detectors: std::collections::HashMap<String, GapDetector>,
}
impl GapDetectorSet {
pub fn new(topics: &[&str], silence_threshold: Duration) -> Self {
let detectors = topics
.iter()
.map(|&t| (t.to_string(), GapDetector::new(t, silence_threshold)))
.collect();
Self { detectors }
}
pub fn record_event(&mut self, topic: &str) -> Option<u64> {
self.detectors.get_mut(topic).and_then(|d| d.record_event())
}
pub fn stats(&self) -> Vec<GapStats> {
self.detectors
.values()
.map(|d| GapStats {
topic: d.topic().to_string(),
gap_count: d.gap_count(),
total_gap_ms: d.total_gap_ms(),
})
.collect()
}
}
#[derive(Debug, Clone)]
pub struct GapStats {
pub topic: String,
pub gap_count: u64,
pub total_gap_ms: u64,
}