Skip to main content

oxirs_stream/window/joins/
session_session.rs

1//! Session × Session watermark-driven join.
2//!
3//! Both sides of the join use *session windows* defined by an inactivity gap.
4//! For a given key, consecutive events on either side are coalesced into one
5//! session as long as the inter-arrival gap is ≤ `gap_ms`.  When the gap is
6//! exceeded a fresh session starts.
7//!
//! Two sessions (one per side) join *iff* they share the same key and their
//! gap-extended intervals (`[first_ts, last_ts + gap_ms]`) overlap.  Output is
//! emitted on session closure: once the watermark reaches
//! `session_end + allowed_lateness` for every buffered session of a key (on
//! both sides), all matched pairs for that key are emitted and its sessions
//! are purged.
12//!
13//! Late events arriving for an already-closed session are dropped and counted.
14
15use std::collections::HashMap;
16
17use super::{WindowJoinKey, WindowJoinResult, WindowJoinStats};
18
19// ─── Config ──────────────────────────────────────────────────────────────────
20
/// Session × session join configuration.
///
/// The same inactivity gap and lateness budget apply to both sides of the join.
#[derive(Debug, Clone)]
pub struct SessionSessionJoinConfig {
    /// Inactivity gap that closes a session (ms).  Must be `> 0`.
    pub gap_ms: i64,
    /// Allowed lateness past `session_end` before purge (ms).  Expected to be
    /// `>= 0`; [`SessionSessionJoinConfig::with_lateness`] enforces this.
    pub allowed_lateness_ms: i64,
}

impl SessionSessionJoinConfig {
    /// Build a config with the supplied gap and zero lateness.
    ///
    /// # Panics
    ///
    /// Panics if `gap_ms` is not strictly positive.
    pub fn new(gap_ms: i64) -> Self {
        assert!(gap_ms > 0, "gap_ms must be > 0");
        Self {
            gap_ms,
            allowed_lateness_ms: 0,
        }
    }

    /// Override allowed lateness.
    ///
    /// Negative values are clamped to `0`.  A negative budget would move the
    /// close instant (`last_ts + gap_ms + allowed_lateness_ms`) *before* the
    /// session's own end, closing sessions early and flagging on-time events
    /// as late.
    pub fn with_lateness(mut self, allowed_lateness_ms: i64) -> Self {
        self.allowed_lateness_ms = allowed_lateness_ms.max(0);
        self
    }
}
46
47// ─── Session state ───────────────────────────────────────────────────────────
48
/// One inactivity-gap session: its timestamp bounds plus the buffered events.
#[derive(Debug, Clone)]
struct Session<E: Clone> {
    /// Smallest event timestamp absorbed so far.
    first_ts_ms: i64,
    /// Largest event timestamp absorbed so far (grows as events are added).
    last_ts_ms: i64,
    /// Buffered `(timestamp, payload)` pairs, in the order they were added.
    events: Vec<(i64, E)>,
}

impl<E: Clone> Session<E> {
    /// Opens a session holding a single event; both bounds start at `ts_ms`.
    fn new(ts_ms: i64, event: E) -> Self {
        let events = vec![(ts_ms, event)];
        Self {
            first_ts_ms: ts_ms,
            last_ts_ms: ts_ms,
            events,
        }
    }

    /// Adds an event, widening either bound when `ts_ms` falls outside it.
    fn extend(&mut self, ts_ms: i64, event: E) {
        self.first_ts_ms = self.first_ts_ms.min(ts_ms);
        self.last_ts_ms = self.last_ts_ms.max(ts_ms);
        self.events.push((ts_ms, event));
    }

    /// Session end: the latest timestamp plus `gap`, saturating at the `i64` limits.
    fn end_ms(&self, gap: i64) -> i64 {
        self.last_ts_ms.saturating_add(gap)
    }

    /// Instant at which the session closes: its end plus `lateness`, saturating.
    fn close_at(&self, gap: i64, lateness: i64) -> i64 {
        let session_end = self.end_ms(gap);
        session_end.saturating_add(lateness)
    }
}
86
87// ─── SessionSessionJoin ──────────────────────────────────────────────────────
88
/// Watermark-driven session × session join operator.
///
/// `L` and `R` are the payload types of the left and right inputs.  Events are
/// buffered into per-key sessions and results are produced only when the
/// watermark is advanced.
pub struct SessionSessionJoin<L: Clone, R: Clone> {
    /// Gap and lateness settings applied to both sides.
    config: SessionSessionJoinConfig,
    /// Buffered left-side sessions, grouped by join key.
    left_sessions: HashMap<WindowJoinKey, Vec<Session<L>>>,
    /// Buffered right-side sessions, grouped by join key.
    right_sessions: HashMap<WindowJoinKey, Vec<Session<R>>>,
    /// Most recent accepted watermark; `i64::MIN` until the first advance.
    last_watermark_ms: i64,
    /// Running counters: accepted events per side, late drops, joined pairs,
    /// and closed sessions.
    stats: WindowJoinStats,
}
97
98impl<L: Clone, R: Clone> SessionSessionJoin<L, R> {
99    /// Create a new join.
100    pub fn new(config: SessionSessionJoinConfig) -> Self {
101        Self {
102            config,
103            left_sessions: HashMap::new(),
104            right_sessions: HashMap::new(),
105            last_watermark_ms: i64::MIN,
106            stats: WindowJoinStats::default(),
107        }
108    }
109
110    fn extend_or_new<E: Clone>(
111        sessions: &mut Vec<Session<E>>,
112        ts_ms: i64,
113        gap: i64,
114        event: E,
115    ) -> bool {
116        // True if we found an existing session within `gap` distance.
117        for s in sessions.iter_mut() {
118            if ts_ms.saturating_sub(s.last_ts_ms).abs() <= gap
119                || ts_ms.saturating_sub(s.first_ts_ms).abs() <= gap
120                || (ts_ms >= s.first_ts_ms && ts_ms <= s.last_ts_ms)
121            {
122                s.extend(ts_ms, event);
123                return true;
124            }
125        }
126        sessions.push(Session::new(ts_ms, event));
127        false
128    }
129
130    fn is_late(&self, ts_ms: i64) -> bool {
131        if self.last_watermark_ms == i64::MIN {
132            return false;
133        }
134        // An event is "late" only if the watermark has already advanced past
135        // both its potential session end + allowed lateness.
136        ts_ms.saturating_add(self.config.gap_ms)
137            < self
138                .last_watermark_ms
139                .saturating_sub(self.config.allowed_lateness_ms)
140    }
141
142    /// Insert a left event.
143    pub fn push_left(&mut self, key: WindowJoinKey, ts_ms: i64, event: L) {
144        if self.is_late(ts_ms) {
145            self.stats.late_events_dropped += 1;
146            return;
147        }
148        self.stats.left_events += 1;
149        let gap = self.config.gap_ms;
150        let entry = self.left_sessions.entry(key).or_default();
151        let _ = Self::extend_or_new(entry, ts_ms, gap, event);
152    }
153
154    /// Insert a right event.
155    pub fn push_right(&mut self, key: WindowJoinKey, ts_ms: i64, event: R) {
156        if self.is_late(ts_ms) {
157            self.stats.late_events_dropped += 1;
158            return;
159        }
160        self.stats.right_events += 1;
161        let gap = self.config.gap_ms;
162        let entry = self.right_sessions.entry(key).or_default();
163        let _ = Self::extend_or_new(entry, ts_ms, gap, event);
164    }
165
166    /// Advance the watermark.  Emits cross-products for pairs of sessions
167    /// (one per side, same key) where *both* sides have closed
168    /// (`end + allowed_lateness ≤ watermark`).  Closed sessions are then
169    /// purged.  If only one side of a key is closed at this watermark, the
170    /// closed session is held until its peer also closes — preventing the
171    /// case where a fast-closing left side would "lose" its potential right
172    /// match.
173    pub fn advance_watermark(&mut self, watermark_ms: i64) -> Vec<WindowJoinResult<L, R>> {
174        if watermark_ms < self.last_watermark_ms {
175            return Vec::new();
176        }
177        self.last_watermark_ms = watermark_ms;
178
179        let gap = self.config.gap_ms;
180        let lat = self.config.allowed_lateness_ms;
181        let mut emitted = Vec::new();
182        let mut purged = 0usize;
183
184        let keys: Vec<WindowJoinKey> = {
185            let mut k: Vec<WindowJoinKey> = self
186                .left_sessions
187                .keys()
188                .chain(self.right_sessions.keys())
189                .cloned()
190                .collect();
191            k.sort();
192            k.dedup();
193            k
194        };
195
196        for key in keys {
197            let left_closed_count = self
198                .left_sessions
199                .get(&key)
200                .map(|v| {
201                    v.iter()
202                        .filter(|s| s.close_at(gap, lat) <= watermark_ms)
203                        .count()
204                })
205                .unwrap_or(0);
206            let right_closed_count = self
207                .right_sessions
208                .get(&key)
209                .map(|v| {
210                    v.iter()
211                        .filter(|s| s.close_at(gap, lat) <= watermark_ms)
212                        .count()
213                })
214                .unwrap_or(0);
215
216            // Only emit + purge when at least one side is fully closed AND
217            // every session on the other side that *could* overlap has also
218            // closed.  We approximate "could overlap" by requiring the entire
219            // other-side bucket to be closed — the standard Flink-style
220            // session-join semantics.
221            let left_total = self.left_sessions.get(&key).map(|v| v.len()).unwrap_or(0);
222            let right_total = self.right_sessions.get(&key).map(|v| v.len()).unwrap_or(0);
223
224            let both_sides_closed = left_closed_count == left_total
225                && right_closed_count == right_total
226                && left_total > 0
227                && right_total > 0;
228            if !both_sides_closed {
229                continue;
230            }
231
232            // Cross product of closed sessions.
233            let lefts: Vec<Session<L>> = self.left_sessions.get(&key).cloned().unwrap_or_default();
234            let rights: Vec<Session<R>> =
235                self.right_sessions.get(&key).cloned().unwrap_or_default();
236            for ls in &lefts {
237                for rs in &rights {
238                    if self.sessions_overlap(ls, rs) {
239                        for (_, l_ev) in &ls.events {
240                            for (_, r_ev) in &rs.events {
241                                emitted.push(WindowJoinResult {
242                                    key: key.clone(),
243                                    left: l_ev.clone(),
244                                    right: r_ev.clone(),
245                                    pane_end_ms: ls.end_ms(gap).max(rs.end_ms(gap)),
246                                });
247                            }
248                        }
249                    }
250                }
251            }
252
253            // Purge closed sessions on both sides.
254            purged += left_total + right_total;
255            self.left_sessions.remove(&key);
256            self.right_sessions.remove(&key);
257        }
258
259        self.stats.joined_pairs += emitted.len() as u64;
260        self.stats.windows_closed += purged as u64;
261        emitted
262    }
263
264    fn sessions_overlap(&self, a: &Session<L>, b: &Session<R>) -> bool {
265        // Two sessions overlap when their gap-extended intervals intersect:
266        // [a.first, a.last + gap] ∩ [b.first, b.last + gap] ≠ ∅
267        let gap = self.config.gap_ms;
268        let a_end = a.last_ts_ms.saturating_add(gap);
269        let b_end = b.last_ts_ms.saturating_add(gap);
270        a.first_ts_ms <= b_end && b.first_ts_ms <= a_end
271    }
272
    /// Statistics snapshot.
    ///
    /// Returns a shared reference to the live counters; the values reflect
    /// every push and watermark advance performed so far.
    pub fn stats(&self) -> &WindowJoinStats {
        &self.stats
    }
277
278    /// Number of buffered (unemitted) sessions across both sides.
279    pub fn session_count(&self) -> usize {
280        self.left_sessions.values().map(|v| v.len()).sum::<usize>()
281            + self.right_sessions.values().map(|v| v.len()).sum::<usize>()
282    }
283
    /// Most recently observed watermark.
    ///
    /// Returns `i64::MIN` if `advance_watermark` has never been called.
    pub fn watermark(&self) -> i64 {
        self.last_watermark_ms
    }
288}
289
290// ─── Tests ───────────────────────────────────────────────────────────────────
291
// Unit tests for the session × session join: emission on closure, overlap
// filtering, key isolation, late-event handling and allowed lateness.
#[cfg(test)]
mod tests {
    use super::*;

    // Overlapping sessions on both sides emit the full event cross product
    // once the watermark has closed both, and the buffers are emptied.
    #[test]
    fn overlapping_sessions_emit_on_close() {
        let cfg = SessionSessionJoinConfig::new(500);
        let mut j: SessionSessionJoin<&str, &str> = SessionSessionJoin::new(cfg);
        // Left session 100, 200, 300 (last_ts=300, end=800).
        j.push_left("k".into(), 100, "L0");
        j.push_left("k".into(), 200, "L1");
        j.push_left("k".into(), 300, "L2");
        // Right session 250, 350 (last_ts=350, end=850). Overlaps left.
        j.push_right("k".into(), 250, "R0");
        j.push_right("k".into(), 350, "R1");
        // Watermark must reach max(800, 850) = 850 to close both.
        let out = j.advance_watermark(900);
        // 3 left × 2 right = 6 pairs.
        assert_eq!(out.len(), 6);
        assert_eq!(j.session_count(), 0);
    }

    // Same key, but the gap-extended intervals are far apart: no output.
    #[test]
    fn non_overlapping_sessions_dont_emit() {
        let cfg = SessionSessionJoinConfig::new(50);
        let mut j: SessionSessionJoin<&str, &str> = SessionSessionJoin::new(cfg);
        j.push_left("k".into(), 100, "L0"); // left session ends 150
        j.push_right("k".into(), 1_000, "R0"); // right session ends 1050
        let out = j.advance_watermark(2_000);
        assert!(out.is_empty());
    }

    // Sessions under different keys are never paired, even if they overlap in time.
    #[test]
    fn separate_keys_dont_join() {
        let cfg = SessionSessionJoinConfig::new(500);
        let mut j: SessionSessionJoin<&str, &str> = SessionSessionJoin::new(cfg);
        j.push_left("a".into(), 100, "La");
        j.push_right("b".into(), 200, "Rb");
        let out = j.advance_watermark(2_000);
        assert!(out.is_empty());
    }

    // An event far behind the watermark is counted as dropped, not buffered.
    #[test]
    fn late_event_after_emit_is_dropped() {
        let cfg = SessionSessionJoinConfig::new(50);
        let mut j: SessionSessionJoin<&str, &str> = SessionSessionJoin::new(cfg);
        j.push_left("k".into(), 100, "L0");
        // Advance well past gap so any future event for ts=100 is "late".
        j.advance_watermark(10_000);
        j.push_left("k".into(), 100, "Late");
        assert_eq!(j.stats.late_events_dropped, 1);
    }

    // With a lateness budget, a session past its end still accepts a peer
    // event and only emits once the budget has been exhausted too.
    #[test]
    fn allowed_lateness_keeps_session_open() {
        let cfg = SessionSessionJoinConfig::new(50).with_lateness(1_000);
        let mut j: SessionSessionJoin<&str, &str> = SessionSessionJoin::new(cfg);
        j.push_left("k".into(), 100, "L0");
        // Watermark past session end (150) but within lateness budget.
        let out = j.advance_watermark(800);
        assert!(out.is_empty());
        // Late right event still accepted.
        j.push_right("k".into(), 120, "R0");
        // Now advance past lateness budget → emits.
        let out = j.advance_watermark(2_000);
        assert_eq!(out.len(), 1);
    }

    // Nothing is emitted while only one side of the key has closed.
    #[test]
    fn watermark_emits_only_closed_sessions() {
        let cfg = SessionSessionJoinConfig::new(100);
        let mut j: SessionSessionJoin<&str, &str> = SessionSessionJoin::new(cfg);
        j.push_left("k".into(), 100, "L0"); // session ends 200
        j.push_right("k".into(), 150, "R0"); // session ends 250
        // wm = 220 → only left closed, right still open. Emit: nothing yet.
        let out = j.advance_watermark(220);
        assert!(out.is_empty());
        // wm = 260 → both closed.
        let out = j.advance_watermark(260);
        assert_eq!(out.len(), 1);
    }
}