oxirs_stream/window/joins/
session_session.rs1use std::collections::HashMap;
16
17use super::{WindowJoinKey, WindowJoinResult, WindowJoinStats};
18
/// Configuration for [`SessionSessionJoin`].
///
/// Construct with [`SessionSessionJoinConfig::new`] and optionally chain
/// [`SessionSessionJoinConfig::with_lateness`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SessionSessionJoinConfig {
    /// Session inactivity gap in milliseconds; must be strictly positive.
    pub gap_ms: i64,
    /// Extra milliseconds a session is retained after its gap has elapsed
    /// before it counts as closed. The same amount is subtracted from the
    /// watermark when deciding whether an incoming event is late. Defaults to 0.
    pub allowed_lateness_ms: i64,
}

impl SessionSessionJoinConfig {
    /// Creates a configuration with the given gap and no allowed lateness.
    ///
    /// # Panics
    ///
    /// Panics if `gap_ms <= 0`.
    pub fn new(gap_ms: i64) -> Self {
        assert!(gap_ms > 0, "gap_ms must be > 0");
        Self {
            gap_ms,
            allowed_lateness_ms: 0,
        }
    }

    /// Returns this configuration with `allowed_lateness_ms` replaced.
    ///
    /// # Panics
    ///
    /// Panics if `allowed_lateness_ms < 0`. A negative lateness would mark
    /// sessions closed before their gap elapsed, while simultaneously making
    /// the late-event check more permissive.
    #[must_use]
    pub fn with_lateness(mut self, allowed_lateness_ms: i64) -> Self {
        assert!(allowed_lateness_ms >= 0, "allowed_lateness_ms must be >= 0");
        self.allowed_lateness_ms = allowed_lateness_ms;
        self
    }
}

/// One buffered session on one side of the join: the events grouped together
/// plus the timestamp span they cover.
#[derive(Debug, Clone)]
struct Session<E: Clone> {
    /// Smallest event timestamp seen in this session (ms).
    first_ts_ms: i64,
    /// Largest event timestamp seen in this session (ms).
    last_ts_ms: i64,
    /// Events in insertion order, each paired with its timestamp.
    events: Vec<(i64, E)>,
}

impl<E: Clone> Session<E> {
    /// Starts a session containing exactly one event.
    fn new(ts_ms: i64, event: E) -> Self {
        let events = vec![(ts_ms, event)];
        Session {
            first_ts_ms: ts_ms,
            last_ts_ms: ts_ms,
            events,
        }
    }

    /// Appends `event`, widening the span when `ts_ms` falls outside it.
    fn extend(&mut self, ts_ms: i64, event: E) {
        self.first_ts_ms = self.first_ts_ms.min(ts_ms);
        self.last_ts_ms = self.last_ts_ms.max(ts_ms);
        self.events.push((ts_ms, event));
    }

    /// Session end: the last timestamp plus `gap`, using saturating addition.
    fn end_ms(&self, gap: i64) -> i64 {
        let last = self.last_ts_ms;
        last.saturating_add(gap)
    }

    /// Watermark value at which this session counts as closed: its end plus
    /// `lateness`, using saturating addition.
    fn close_at(&self, gap: i64, lateness: i64) -> i64 {
        let end = self.end_ms(gap);
        end.saturating_add(lateness)
    }
}

87pub struct SessionSessionJoin<L: Clone, R: Clone> {
91 config: SessionSessionJoinConfig,
92 left_sessions: HashMap<WindowJoinKey, Vec<Session<L>>>,
93 right_sessions: HashMap<WindowJoinKey, Vec<Session<R>>>,
94 last_watermark_ms: i64,
95 stats: WindowJoinStats,
96}
97
98impl<L: Clone, R: Clone> SessionSessionJoin<L, R> {
99 pub fn new(config: SessionSessionJoinConfig) -> Self {
101 Self {
102 config,
103 left_sessions: HashMap::new(),
104 right_sessions: HashMap::new(),
105 last_watermark_ms: i64::MIN,
106 stats: WindowJoinStats::default(),
107 }
108 }
109
110 fn extend_or_new<E: Clone>(
111 sessions: &mut Vec<Session<E>>,
112 ts_ms: i64,
113 gap: i64,
114 event: E,
115 ) -> bool {
116 for s in sessions.iter_mut() {
118 if ts_ms.saturating_sub(s.last_ts_ms).abs() <= gap
119 || ts_ms.saturating_sub(s.first_ts_ms).abs() <= gap
120 || (ts_ms >= s.first_ts_ms && ts_ms <= s.last_ts_ms)
121 {
122 s.extend(ts_ms, event);
123 return true;
124 }
125 }
126 sessions.push(Session::new(ts_ms, event));
127 false
128 }
129
130 fn is_late(&self, ts_ms: i64) -> bool {
131 if self.last_watermark_ms == i64::MIN {
132 return false;
133 }
134 ts_ms.saturating_add(self.config.gap_ms)
137 < self
138 .last_watermark_ms
139 .saturating_sub(self.config.allowed_lateness_ms)
140 }
141
142 pub fn push_left(&mut self, key: WindowJoinKey, ts_ms: i64, event: L) {
144 if self.is_late(ts_ms) {
145 self.stats.late_events_dropped += 1;
146 return;
147 }
148 self.stats.left_events += 1;
149 let gap = self.config.gap_ms;
150 let entry = self.left_sessions.entry(key).or_default();
151 let _ = Self::extend_or_new(entry, ts_ms, gap, event);
152 }
153
154 pub fn push_right(&mut self, key: WindowJoinKey, ts_ms: i64, event: R) {
156 if self.is_late(ts_ms) {
157 self.stats.late_events_dropped += 1;
158 return;
159 }
160 self.stats.right_events += 1;
161 let gap = self.config.gap_ms;
162 let entry = self.right_sessions.entry(key).or_default();
163 let _ = Self::extend_or_new(entry, ts_ms, gap, event);
164 }
165
166 pub fn advance_watermark(&mut self, watermark_ms: i64) -> Vec<WindowJoinResult<L, R>> {
174 if watermark_ms < self.last_watermark_ms {
175 return Vec::new();
176 }
177 self.last_watermark_ms = watermark_ms;
178
179 let gap = self.config.gap_ms;
180 let lat = self.config.allowed_lateness_ms;
181 let mut emitted = Vec::new();
182 let mut purged = 0usize;
183
184 let keys: Vec<WindowJoinKey> = {
185 let mut k: Vec<WindowJoinKey> = self
186 .left_sessions
187 .keys()
188 .chain(self.right_sessions.keys())
189 .cloned()
190 .collect();
191 k.sort();
192 k.dedup();
193 k
194 };
195
196 for key in keys {
197 let left_closed_count = self
198 .left_sessions
199 .get(&key)
200 .map(|v| {
201 v.iter()
202 .filter(|s| s.close_at(gap, lat) <= watermark_ms)
203 .count()
204 })
205 .unwrap_or(0);
206 let right_closed_count = self
207 .right_sessions
208 .get(&key)
209 .map(|v| {
210 v.iter()
211 .filter(|s| s.close_at(gap, lat) <= watermark_ms)
212 .count()
213 })
214 .unwrap_or(0);
215
216 let left_total = self.left_sessions.get(&key).map(|v| v.len()).unwrap_or(0);
222 let right_total = self.right_sessions.get(&key).map(|v| v.len()).unwrap_or(0);
223
224 let both_sides_closed = left_closed_count == left_total
225 && right_closed_count == right_total
226 && left_total > 0
227 && right_total > 0;
228 if !both_sides_closed {
229 continue;
230 }
231
232 let lefts: Vec<Session<L>> = self.left_sessions.get(&key).cloned().unwrap_or_default();
234 let rights: Vec<Session<R>> =
235 self.right_sessions.get(&key).cloned().unwrap_or_default();
236 for ls in &lefts {
237 for rs in &rights {
238 if self.sessions_overlap(ls, rs) {
239 for (_, l_ev) in &ls.events {
240 for (_, r_ev) in &rs.events {
241 emitted.push(WindowJoinResult {
242 key: key.clone(),
243 left: l_ev.clone(),
244 right: r_ev.clone(),
245 pane_end_ms: ls.end_ms(gap).max(rs.end_ms(gap)),
246 });
247 }
248 }
249 }
250 }
251 }
252
253 purged += left_total + right_total;
255 self.left_sessions.remove(&key);
256 self.right_sessions.remove(&key);
257 }
258
259 self.stats.joined_pairs += emitted.len() as u64;
260 self.stats.windows_closed += purged as u64;
261 emitted
262 }
263
264 fn sessions_overlap(&self, a: &Session<L>, b: &Session<R>) -> bool {
265 let gap = self.config.gap_ms;
268 let a_end = a.last_ts_ms.saturating_add(gap);
269 let b_end = b.last_ts_ms.saturating_add(gap);
270 a.first_ts_ms <= b_end && b.first_ts_ms <= a_end
271 }
272
273 pub fn stats(&self) -> &WindowJoinStats {
275 &self.stats
276 }
277
278 pub fn session_count(&self) -> usize {
280 self.left_sessions.values().map(|v| v.len()).sum::<usize>()
281 + self.right_sessions.values().map(|v| v.len()).sum::<usize>()
282 }
283
284 pub fn watermark(&self) -> i64 {
286 self.last_watermark_ms
287 }
288}
289
#[cfg(test)]
mod tests {
    use super::*;

    type StrJoin = SessionSessionJoin<&'static str, &'static str>;

    /// Builds a join over string events with the given gap and lateness.
    fn join_with(gap_ms: i64, lateness_ms: i64) -> StrJoin {
        SessionSessionJoin::new(SessionSessionJoinConfig::new(gap_ms).with_lateness(lateness_ms))
    }

    #[test]
    fn overlapping_sessions_emit_on_close() {
        let mut j = join_with(500, 0);
        for (ts, ev) in [(100, "L0"), (200, "L1"), (300, "L2")] {
            j.push_left("k".into(), ts, ev);
        }
        for (ts, ev) in [(250, "R0"), (350, "R1")] {
            j.push_right("k".into(), ts, ev);
        }
        let out = j.advance_watermark(900);
        // 3 left events x 2 right events, and the key's sessions are gone.
        assert_eq!(out.len(), 6);
        assert_eq!(j.session_count(), 0);
    }

    #[test]
    fn non_overlapping_sessions_dont_emit() {
        let mut j = join_with(50, 0);
        j.push_left("k".into(), 100, "L0");
        j.push_right("k".into(), 1_000, "R0");
        assert!(j.advance_watermark(2_000).is_empty());
    }

    #[test]
    fn separate_keys_dont_join() {
        let mut j = join_with(500, 0);
        j.push_left("a".into(), 100, "La");
        j.push_right("b".into(), 200, "Rb");
        assert!(j.advance_watermark(2_000).is_empty());
    }

    #[test]
    fn late_event_after_emit_is_dropped() {
        let mut j = join_with(50, 0);
        j.push_left("k".into(), 100, "L0");
        let _ = j.advance_watermark(10_000);
        j.push_left("k".into(), 100, "Late");
        assert_eq!(j.stats().late_events_dropped, 1);
    }

    #[test]
    fn allowed_lateness_keeps_session_open() {
        let mut j = join_with(50, 1_000);
        j.push_left("k".into(), 100, "L0");
        // Close point is 100 + 50 + 1_000 = 1_150, so nothing is final at 800.
        assert!(j.advance_watermark(800).is_empty());
        j.push_right("k".into(), 120, "R0");
        assert_eq!(j.advance_watermark(2_000).len(), 1);
    }

    #[test]
    fn watermark_emits_only_closed_sessions() {
        let mut j = join_with(100, 0);
        j.push_left("k".into(), 100, "L0");
        j.push_right("k".into(), 150, "R0");
        // The right session closes at 250, so 220 is too early.
        assert!(j.advance_watermark(220).is_empty());
        assert_eq!(j.advance_watermark(260).len(), 1);
    }
}