use std::collections::HashMap;
use super::{WindowJoinKey, WindowJoinResult, WindowJoinStats};
/// Window configuration for a tumbling-left / sliding-right join.
///
/// All durations are in milliseconds. The allowed lateness applies to both
/// the left and the right panes.
#[derive(Debug, Clone)]
pub struct TumblingSlidingJoinConfig {
    /// Length of each left tumbling pane.
    pub left_size_ms: i64,
    /// Length of each right sliding pane.
    pub right_size_ms: i64,
    /// Distance between the starts of consecutive right sliding panes.
    pub right_slide_ms: i64,
    /// Extra time past a pane's end during which the pane still accepts
    /// events. Set to `0` by [`TumblingSlidingJoinConfig::new`].
    pub allowed_lateness_ms: i64,
}

impl TumblingSlidingJoinConfig {
    /// Builds a configuration with zero allowed lateness.
    ///
    /// # Panics
    ///
    /// Panics if `left_size_ms`, `right_size_ms` or `right_slide_ms` is not
    /// strictly positive. The arguments are checked in that order.
    pub fn new(left_size_ms: i64, right_size_ms: i64, right_slide_ms: i64) -> Self {
        assert!(0 < left_size_ms, "left_size_ms must be > 0");
        assert!(0 < right_size_ms, "right_size_ms must be > 0");
        assert!(0 < right_slide_ms, "right_slide_ms must be > 0");
        Self {
            allowed_lateness_ms: 0,
            left_size_ms,
            right_size_ms,
            right_slide_ms,
        }
    }

    /// Returns this configuration with `allowed_lateness_ms` replaced.
    ///
    /// The value is stored as given; no validation is performed.
    pub fn with_lateness(self, allowed_lateness_ms: i64) -> Self {
        Self {
            allowed_lateness_ms,
            ..self
        }
    }
}
/// A buffered event paired with its event-time timestamp in milliseconds.
///
/// There is deliberately no trait bound on `E`: `#[derive(Clone)]` already
/// makes the `Clone` impl conditional on `E: Clone`. A bound on the struct
/// itself would force every mention of `StampedEvent<E>` to repeat it.
#[derive(Debug, Clone)]
struct StampedEvent<E> {
    /// Event time of `event`, in milliseconds.
    ts_ms: i64,
    /// The payload as supplied by the caller.
    event: E,
}
/// Equi-join on [`WindowJoinKey`] between a left stream bucketed into
/// tumbling panes and a right stream bucketed into sliding panes.
///
/// Buffered events are indexed first by pane start (ms), then by join key.
/// Each per-key list keeps events in arrival order.
pub struct TumblingSlidingJoin<L: Clone, R: Clone> {
    /// Pane sizes, right slide, and allowed lateness.
    config: TumblingSlidingJoinConfig,
    /// Left buffer: tumbling pane start -> key -> events.
    left: HashMap<i64, HashMap<WindowJoinKey, Vec<StampedEvent<L>>>>,
    /// Right buffer: sliding pane start -> key -> events. A right event is
    /// copied into each still-open sliding pane that contains it.
    right: HashMap<i64, HashMap<WindowJoinKey, Vec<StampedEvent<R>>>>,
    /// Highest watermark accepted so far.
    last_watermark_ms: i64,
    /// Running counters for ingested, joined, dropped and closed items.
    stats: WindowJoinStats,
}
impl<L: Clone, R: Clone> TumblingSlidingJoin<L, R> {
    /// Creates an empty join for `config`.
    ///
    /// The watermark starts at `i64::MIN`, so no pane counts as closed until
    /// [`advance_watermark`](Self::advance_watermark) is called.
    pub fn new(config: TumblingSlidingJoinConfig) -> Self {
        Self {
            config,
            left: HashMap::new(),
            right: HashMap::new(),
            last_watermark_ms: i64::MIN,
            stats: WindowJoinStats::default(),
        }
    }

    /// Watermark at which a pane starting at `pane_start` with length
    /// `size_ms` closes: its end plus the allowed lateness (saturating).
    fn close_at(pane_start: i64, size_ms: i64, lateness_ms: i64) -> i64 {
        pane_start.saturating_add(size_ms).saturating_add(lateness_ms)
    }

    /// Start of the left tumbling pane containing `ts_ms`.
    ///
    /// Euclidean division floors negative timestamps toward negative
    /// infinity, so e.g. `-1` maps to pane `-left_size_ms`.
    fn left_pane(&self, ts_ms: i64) -> i64 {
        let s = self.config.left_size_ms;
        ts_ms.div_euclid(s) * s
    }

    /// Start of the latest right sliding pane that can contain `ts_ms`, i.e.
    /// `ts_ms` rounded down to a multiple of `right_slide_ms`.
    ///
    /// A right event stored at all is stored in this pane. It is the
    /// event's last pane to close: close time grows with pane start, and
    /// `push_right` stores into every open pane. It therefore serves as the
    /// event's canonical copy when `push_left` de-duplicates the per-pane
    /// copies.
    fn canonical_right_pane(&self, ts_ms: i64) -> i64 {
        let slide = self.config.right_slide_ms;
        ts_ms.div_euclid(slide) * slide
    }

    /// Starts of all right sliding panes `[s, s + right_size_ms)` containing
    /// `ts_ms`, in ascending order. Pane starts are multiples of
    /// `right_slide_ms`.
    ///
    /// A start `s = k * slide` qualifies iff `ts_ms - size < s <= ts_ms`. The
    /// result is empty when `right_slide_ms > right_size_ms` and `ts_ms`
    /// falls in a gap between panes.
    fn right_panes_for(&self, ts_ms: i64) -> Vec<i64> {
        let size = self.config.right_size_ms;
        let slide = self.config.right_slide_ms;
        // Smallest k with k * slide > ts_ms - size.
        let first_k = (ts_ms - size).div_euclid(slide) + 1;
        // Largest k with k * slide <= ts_ms.
        let last_k = ts_ms.div_euclid(slide);
        (first_k..=last_k).map(|k| k * slide).collect()
    }

    /// Whether the left pane starting at `pane_start` is closed at the current
    /// watermark.
    fn left_pane_closed(&self, pane_start: i64) -> bool {
        self.last_watermark_ms
            >= Self::close_at(
                pane_start,
                self.config.left_size_ms,
                self.config.allowed_lateness_ms,
            )
    }

    /// Whether the right pane starting at `pane_start` is closed at the current
    /// watermark.
    fn right_pane_closed(&self, pane_start: i64) -> bool {
        self.last_watermark_ms
            >= Self::close_at(
                pane_start,
                self.config.right_size_ms,
                self.config.allowed_lateness_ms,
            )
    }

    /// Ingests a left event and returns its joins with already-buffered right
    /// events.
    ///
    /// The event belongs to the left tumbling pane containing `ts_ms`.
    /// - If that pane is closed, the event is counted in
    ///   `late_events_dropped` and nothing is emitted.
    /// - Otherwise it joins once with every distinct buffered right event of
    ///   the same `key` whose timestamp lies in the same left pane, and is
    ///   then buffered for future right events.
    ///
    /// Every result carries the left pane's end as `pane_end_ms`. Results are
    /// ordered by the right events' canonical pane (ascending), then by the
    /// right events' arrival order.
    pub fn push_left(
        &mut self,
        key: WindowJoinKey,
        ts_ms: i64,
        event: L,
    ) -> Vec<WindowJoinResult<L, R>> {
        let pane_start = self.left_pane(ts_ms);
        if self.left_pane_closed(pane_start) {
            self.stats.late_events_dropped += 1;
            return Vec::new();
        }
        self.stats.left_events += 1;
        let pane_end = pane_start.saturating_add(self.config.left_size_ms);

        let mut emitted: Vec<WindowJoinResult<L, R>> = Vec::new();
        for r_start in self.right_panes_overlapping_left(pane_start) {
            if let Some(events) = self.right.get(&r_start).and_then(|by_key| by_key.get(&key)) {
                for r_ev in events {
                    let in_left_pane = r_ev.ts_ms >= pane_start && r_ev.ts_ms < pane_end;
                    // A right event is copied into every sliding pane that
                    // contains it. Joining only its canonical copy emits each
                    // distinct event exactly once, even when several events
                    // share a timestamp.
                    if in_left_pane && self.canonical_right_pane(r_ev.ts_ms) == r_start {
                        emitted.push(WindowJoinResult {
                            key: key.clone(),
                            left: event.clone(),
                            right: r_ev.event.clone(),
                            pane_end_ms: pane_end,
                        });
                    }
                }
            }
        }

        self.left
            .entry(pane_start)
            .or_default()
            .entry(key)
            .or_default()
            .push(StampedEvent { ts_ms, event });
        self.stats.joined_pairs += emitted.len() as u64;
        emitted
    }

    /// Ingests a right event and returns its joins with already-buffered left
    /// events.
    ///
    /// The event belongs to every right sliding pane containing `ts_ms`.
    /// - If none of those panes is still open, the event is counted in
    ///   `late_events_dropped` and nothing is emitted. This includes the case
    ///   where `ts_ms` falls in a gap because `right_slide_ms > right_size_ms`.
    /// - Otherwise it joins with every buffered left event of the same `key`
    ///   in the left tumbling pane containing `ts_ms`. Results carry that
    ///   pane's end as `pane_end_ms`, in left arrival order.
    /// - A copy is then buffered in each of its still-open sliding panes.
    ///
    /// NOTE(review): right events are retained only until their last sliding
    /// pane closes, which can be before the left pane they join against
    /// closes. A later left event in that left pane will then not see them.
    /// Confirm this is intended.
    pub fn push_right(
        &mut self,
        key: WindowJoinKey,
        ts_ms: i64,
        event: R,
    ) -> Vec<WindowJoinResult<L, R>> {
        let panes = self.right_panes_for(ts_ms);
        let any_open = panes.iter().any(|&s| !self.right_pane_closed(s));
        if !any_open {
            self.stats.late_events_dropped += 1;
            return Vec::new();
        }
        self.stats.right_events += 1;
        let left_pane_start = self.left_pane(ts_ms);
        let left_pane_end = left_pane_start.saturating_add(self.config.left_size_ms);

        let mut emitted = Vec::new();
        if let Some(events) = self
            .left
            .get(&left_pane_start)
            .and_then(|by_key| by_key.get(&key))
        {
            for l_ev in events {
                emitted.push(WindowJoinResult {
                    key: key.clone(),
                    left: l_ev.event.clone(),
                    right: event.clone(),
                    pane_end_ms: left_pane_end,
                });
            }
        }

        for s in panes {
            if !self.right_pane_closed(s) {
                self.right
                    .entry(s)
                    .or_default()
                    .entry(key.clone())
                    .or_default()
                    .push(StampedEvent {
                        ts_ms,
                        event: event.clone(),
                    });
            }
        }
        self.stats.joined_pairs += emitted.len() as u64;
        emitted
    }

    /// Starts of the buffered right panes whose interval
    /// `[r_start, r_start + right_size_ms)` overlaps the left pane
    /// `[left_pane_start, left_pane_start + left_size_ms)`.
    ///
    /// The result is sorted ascending so that join output order does not
    /// depend on `HashMap` iteration order.
    fn right_panes_overlapping_left(&self, left_pane_start: i64) -> Vec<i64> {
        let l_end = left_pane_start.saturating_add(self.config.left_size_ms);
        let r_size = self.config.right_size_ms;
        let mut out: Vec<i64> = self
            .right
            .keys()
            .copied()
            .filter(|&r_start| r_start < l_end && r_start.saturating_add(r_size) > left_pane_start)
            .collect();
        out.sort_unstable();
        out
    }

    /// Advances the watermark to `watermark_ms` and purges closed panes.
    ///
    /// A pane is purged once `watermark_ms` reaches its end plus the allowed
    /// lateness. A watermark lower than the current one is ignored and
    /// returns `0`.
    ///
    /// Returns the number of panes purged across both sides; the same count
    /// is added to `stats.windows_closed`.
    pub fn advance_watermark(&mut self, watermark_ms: i64) -> usize {
        if watermark_ms < self.last_watermark_ms {
            return 0;
        }
        self.last_watermark_ms = watermark_ms;
        let lat = self.config.allowed_lateness_ms;
        let l_size = self.config.left_size_ms;
        let r_size = self.config.right_size_ms;

        let left_before = self.left.len();
        self.left
            .retain(|&start, _| watermark_ms < Self::close_at(start, l_size, lat));
        let right_before = self.right.len();
        self.right
            .retain(|&start, _| watermark_ms < Self::close_at(start, r_size, lat));

        let purged = (left_before - self.left.len()) + (right_before - self.right.len());
        self.stats.windows_closed += purged as u64;
        purged
    }

    /// Running join statistics.
    pub fn stats(&self) -> &WindowJoinStats {
        &self.stats
    }

    /// Number of buffered panes, left plus right.
    ///
    /// Each right sliding pane counts separately, even though a right event
    /// may be copied into several of them.
    pub fn pane_count(&self) -> usize {
        self.left.len() + self.right.len()
    }

    /// Highest watermark accepted so far.
    ///
    /// This is `i64::MIN` until the first call to
    /// [`advance_watermark`](Self::advance_watermark).
    pub fn watermark(&self) -> i64 {
        self.last_watermark_ms
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Join over `&'static str` payloads, as used by every test below.
    type StrJoin = TumblingSlidingJoin<&'static str, &'static str>;

    /// 1 000 ms tumbling left panes; 1 000 ms right panes sliding every 500 ms.
    fn standard_config() -> TumblingSlidingJoinConfig {
        TumblingSlidingJoinConfig::new(1_000, 1_000, 500)
    }

    /// The single join key shared by all events in these tests.
    fn k() -> WindowJoinKey {
        "k".into()
    }

    #[test]
    fn joins_when_event_falls_in_tumbling_pane() {
        let mut join = StrJoin::new(standard_config());
        join.push_left(k(), 200, "L0");
        let results = join.push_right(k(), 600, "R0");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].pane_end_ms, 1_000);
    }

    #[test]
    fn does_not_join_when_outside_tumbling_pane() {
        let mut join = StrJoin::new(standard_config());
        join.push_left(k(), 200, "L0");
        let results = join.push_right(k(), 1_100, "R0");
        assert!(results.is_empty());
    }

    #[test]
    fn left_event_matches_existing_right_in_overlapping_pane() {
        let mut join = StrJoin::new(standard_config());
        let _ = join.push_right(k(), 300, "R0");
        let results = join.push_left(k(), 200, "L0");
        assert_eq!(results.len(), 1);
    }

    #[test]
    fn watermark_purges_left_and_right_panes() {
        let mut join = StrJoin::new(standard_config());
        join.push_left(k(), 100, "L0");
        let _ = join.push_right(k(), 300, "R0");
        assert!(join.pane_count() >= 2);

        let purged = join.advance_watermark(10_000);
        assert!(purged >= 2);
        assert_eq!(join.pane_count(), 0);
    }

    #[test]
    fn late_event_dropped_when_all_target_panes_closed() {
        let mut join = StrJoin::new(standard_config());
        join.advance_watermark(10_000);
        let results = join.push_right(k(), 500, "Late");
        assert!(results.is_empty());
        assert_eq!(join.stats().late_events_dropped, 1);
    }

    #[test]
    fn allowed_lateness_extends_pane_lifetime() {
        let mut join = StrJoin::new(standard_config().with_lateness(2_000));
        join.push_left(k(), 100, "L0");
        join.advance_watermark(2_500);
        let results = join.push_right(k(), 700, "R0");
        assert_eq!(results.len(), 1);
    }
}