use std::collections::HashMap;
pub use super::{LateDataDecision, LateDataHandler, LateDataPolicy};
#[derive(Debug, Default)]
pub struct AllowedLatenessTracker {
windows: HashMap<String, (i64, i64)>,
}
impl AllowedLatenessTracker {
pub fn new() -> Self {
Self {
windows: HashMap::new(),
}
}
pub fn register(
&mut self,
window_id: impl Into<String>,
window_end_ms: i64,
allowed_lateness_ms: i64,
) {
self.windows
.insert(window_id.into(), (window_end_ms, allowed_lateness_ms));
}
pub fn is_open(&self, window_id: &str, now_ms: i64) -> bool {
match self.windows.get(window_id) {
None => false,
Some(&(end_ms, lateness_ms)) => now_ms < end_ms.saturating_add(lateness_ms),
}
}
pub fn evict_closed(&mut self, now_ms: i64) -> Vec<String> {
let mut evicted = Vec::new();
self.windows.retain(|id, &mut (end_ms, lateness_ms)| {
let still_open = now_ms < end_ms.saturating_add(lateness_ms);
if !still_open {
evicted.push(id.clone());
}
still_open
});
evicted
}
pub fn len(&self) -> usize {
self.windows.len()
}
pub fn is_empty(&self) -> bool {
self.windows.is_empty()
}
}
#[derive(Debug, Default)]
pub struct SideOutputRouter<E> {
channels: HashMap<String, Vec<E>>,
}
impl<E> SideOutputRouter<E> {
pub fn new() -> Self {
Self {
channels: HashMap::new(),
}
}
pub fn push(&mut self, channel: &str, event: E) {
self.channels
.entry(channel.to_string())
.or_default()
.push(event);
}
pub fn drain(&mut self, channel: &str) -> Vec<E> {
self.channels.remove(channel).unwrap_or_default()
}
pub fn len(&self, channel: &str) -> usize {
self.channels.get(channel).map(|v| v.len()).unwrap_or(0)
}
pub fn is_empty(&self, channel: &str) -> bool {
self.channels
.get(channel)
.map(|v| v.is_empty())
.unwrap_or(true)
}
pub fn channels(&self) -> impl Iterator<Item = &String> {
self.channels.keys()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn allowed_lateness_open_until_budget_exhausted() {
let mut t = AllowedLatenessTracker::new();
t.register("w1", 1_000, 500);
assert!(t.is_open("w1", 1_400));
assert!(!t.is_open("w1", 1_500));
assert!(!t.is_open("w1", 2_000));
}
#[test]
fn allowed_lateness_evict_closed() {
let mut t = AllowedLatenessTracker::new();
t.register("a", 100, 100);
t.register("b", 1_000, 100);
let evicted = t.evict_closed(500);
assert!(evicted.contains(&"a".to_string()));
assert!(!evicted.contains(&"b".to_string()));
assert_eq!(t.len(), 1);
}
#[test]
fn side_output_router_pushes_and_drains() {
let mut router: SideOutputRouter<i32> = SideOutputRouter::new();
router.push("late", 1);
router.push("late", 2);
router.push("dlq", 7);
assert_eq!(router.len("late"), 2);
assert_eq!(router.len("dlq"), 1);
let drained = router.drain("late");
assert_eq!(drained, vec![1, 2]);
assert!(router.is_empty("late"));
assert_eq!(router.len("dlq"), 1);
}
#[test]
fn re_export_late_data_handler_is_visible() {
let mut h = LateDataHandler::new(LateDataPolicy::Drop);
let d = h.handle(10, 50);
assert_eq!(d, LateDataDecision::Drop);
}
}