1use super::window::{
43 Accumulator, Aggregator, ChangelogRecord, EmitStrategy, LateDataConfig, LateDataMetrics,
44 ResultToI64, WindowId,
45};
46use super::{
47 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
48};
49use crate::state::{StateStore, StateStoreExt};
50use arrow_array::{Array, Int64Array, RecordBatch};
51use arrow_schema::{DataType, Field, Schema, SchemaRef};
52use fxhash::FxHashMap;
53use rkyv::{
54 api::high::{HighDeserializer, HighSerializer, HighValidator},
55 bytecheck::CheckBytes,
56 rancor::Error as RkyvError,
57 ser::allocator::ArenaHandle,
58 util::AlignedVec,
59 Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
60};
61use std::marker::PhantomData;
62use std::sync::atomic::{AtomicU64, Ordering};
63use std::sync::Arc;
64use std::time::Duration;
65
/// Process-wide counter that gives every operator built with
/// `SessionWindowOperator::new` a distinct `session_window_<n>` ID.
static SESSION_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);

/// State-store key prefix for a key's persisted `SessionState`.
const SESSION_STATE_PREFIX: &[u8; 4] = b"ses:";

/// State-store key prefix for a key's persisted session accumulator.
const SESSION_ACC_PREFIX: &[u8; 4] = b"sac:";

/// First byte of every timer key registered by the session operator; the
/// remaining 8 bytes are the big-endian key hash.
const SESSION_TIMER_PREFIX: u8 = 1;
77
/// Bounds and key of one session window, persisted per key in the state store.
///
/// rkyv archives fields in declaration order, so reordering them would change
/// the persisted format.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct SessionState {
    /// Earliest event time folded into the session (inclusive).
    pub start: i64,
    /// Exclusive end bound: the latest event time seen plus the gap.
    pub end: i64,
    /// Raw key bytes extracted from the key column (empty when unkeyed).
    pub key: Vec<u8>,
}
91
92impl SessionState {
93 fn new(timestamp: i64, gap_ms: i64, key: Vec<u8>) -> Self {
95 Self {
96 start: timestamp,
97 end: timestamp + gap_ms,
98 key,
99 }
100 }
101
102 #[must_use]
104 pub fn window_id(&self) -> WindowId {
105 WindowId::new(self.start, self.end)
106 }
107
108 fn contains(&self, timestamp: i64, gap_ms: i64) -> bool {
110 timestamp >= self.start && timestamp < self.end + gap_ms
113 }
114
115 fn extend(&mut self, timestamp: i64, gap_ms: i64) {
117 self.start = self.start.min(timestamp);
118 self.end = self.end.max(timestamp + gap_ms);
119 }
120
121 #[allow(dead_code)]
128 fn merge(&mut self, other: &SessionState) {
129 self.start = self.start.min(other.start);
130 self.end = self.end.max(other.end);
131 }
132}
133
134fn create_session_output_schema() -> SchemaRef {
136 Arc::new(Schema::new(vec![
137 Field::new("session_start", DataType::Int64, false),
138 Field::new("session_end", DataType::Int64, false),
139 Field::new("result", DataType::Int64, false),
140 ]))
141}
142
/// Gap-based session window operator.
///
/// Each key (see `set_key_column`) has at most one open session. The session
/// bounds and the accumulator are persisted in the operator's state store,
/// and a cleanup timer at `session.end + allowed_lateness` emits the final
/// result and deletes the session.
pub struct SessionWindowOperator<A: Aggregator> {
    /// Inactivity gap in milliseconds.
    gap_ms: i64,
    /// Aggregation applied to events within a session.
    aggregator: A,
    /// Extra time after `session.end` before the session's timer fires.
    allowed_lateness_ms: i64,
    /// In-memory cache of sessions keyed by key hash. It is filled lazily from
    /// the state store.
    active_sessions: FxHashMap<u64, SessionState>,
    /// Latest timer timestamp registered per key hash. Timers that fire with any
    /// other timestamp are treated as stale.
    pending_timers: FxHashMap<u64, i64>,
    /// When and how results are emitted.
    emit_strategy: EmitStrategy,
    /// Late-event routing (e.g. side output name).
    late_data_config: LateDataConfig,
    /// Counters for dropped / side-output late events.
    late_data_metrics: LateDataMetrics,
    /// Identifier checked against checkpoints on restore.
    operator_id: String,
    /// Schema of emitted result batches.
    output_schema: SchemaRef,
    /// Column index used to extract the session key. `None` means all events
    /// share one session.
    key_column: Option<usize>,
    /// Ties the accumulator type to the struct without storing one.
    _phantom: PhantomData<A::Acc>,
}
194
195impl<A: Aggregator> SessionWindowOperator<A>
196where
197 A::Acc: Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
198 <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
199 + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
200{
201 pub fn new(gap: Duration, aggregator: A, allowed_lateness: Duration) -> Self {
213 let operator_num = SESSION_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
214 Self {
215 gap_ms: i64::try_from(gap.as_millis()).expect("Gap must fit in i64"),
216 aggregator,
217 allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
218 .expect("Allowed lateness must fit in i64"),
219 active_sessions: FxHashMap::default(),
220 pending_timers: FxHashMap::default(),
221 emit_strategy: EmitStrategy::default(),
222 late_data_config: LateDataConfig::default(),
223 late_data_metrics: LateDataMetrics::new(),
224 operator_id: format!("session_window_{operator_num}"),
225 output_schema: create_session_output_schema(),
226 key_column: None,
227 _phantom: PhantomData,
228 }
229 }
230
231 pub fn with_id(
237 gap: Duration,
238 aggregator: A,
239 allowed_lateness: Duration,
240 operator_id: String,
241 ) -> Self {
242 Self {
243 gap_ms: i64::try_from(gap.as_millis()).expect("Gap must fit in i64"),
244 aggregator,
245 allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
246 .expect("Allowed lateness must fit in i64"),
247 active_sessions: FxHashMap::default(),
248 pending_timers: FxHashMap::default(),
249 emit_strategy: EmitStrategy::default(),
250 late_data_config: LateDataConfig::default(),
251 late_data_metrics: LateDataMetrics::new(),
252 operator_id,
253 output_schema: create_session_output_schema(),
254 key_column: None,
255 _phantom: PhantomData,
256 }
257 }
258
259 pub fn set_key_column(&mut self, column_index: usize) {
263 self.key_column = Some(column_index);
264 }
265
266 #[must_use]
268 pub fn key_column(&self) -> Option<usize> {
269 self.key_column
270 }
271
272 pub fn set_emit_strategy(&mut self, strategy: EmitStrategy) {
274 self.emit_strategy = strategy;
275 }
276
277 #[must_use]
279 pub fn emit_strategy(&self) -> &EmitStrategy {
280 &self.emit_strategy
281 }
282
283 pub fn set_late_data_config(&mut self, config: LateDataConfig) {
285 self.late_data_config = config;
286 }
287
288 #[must_use]
290 pub fn late_data_config(&self) -> &LateDataConfig {
291 &self.late_data_config
292 }
293
294 #[must_use]
296 pub fn late_data_metrics(&self) -> &LateDataMetrics {
297 &self.late_data_metrics
298 }
299
300 pub fn reset_late_data_metrics(&mut self) {
302 self.late_data_metrics.reset();
303 }
304
305 #[must_use]
307 pub fn gap_ms(&self) -> i64 {
308 self.gap_ms
309 }
310
311 #[must_use]
313 pub fn allowed_lateness_ms(&self) -> i64 {
314 self.allowed_lateness_ms
315 }
316
317 #[must_use]
319 pub fn active_session_count(&self) -> usize {
320 self.active_sessions.len()
321 }
322
323 fn extract_key(&self, event: &Event) -> Vec<u8> {
325 use arrow_array::cast::AsArray;
326 use arrow_array::types::Int64Type;
327
328 if let Some(col_idx) = self.key_column {
329 if col_idx < event.data.num_columns() {
330 let column = event.data.column(col_idx);
331 if let Some(array) = column.as_primitive_opt::<Int64Type>() {
332 if !array.is_empty() && !array.is_null(0) {
333 return array.value(0).to_be_bytes().to_vec();
334 }
335 }
336 if let Some(array) = column.as_string_opt::<i32>() {
338 if !array.is_empty() && !array.is_null(0) {
339 return array.value(0).as_bytes().to_vec();
340 }
341 }
342 }
343 }
344 Vec::new()
346 }
347
348 fn key_hash(key: &[u8]) -> u64 {
350 use std::hash::{Hash, Hasher};
351 let mut hasher = fxhash::FxHasher::default();
352 key.hash(&mut hasher);
353 hasher.finish()
354 }
355
356 fn session_state_key(key_hash: u64) -> [u8; 12] {
358 let mut key = [0u8; 12];
359 key[..4].copy_from_slice(SESSION_STATE_PREFIX);
360 key[4..12].copy_from_slice(&key_hash.to_be_bytes());
361 key
362 }
363
364 fn session_acc_key(key_hash: u64) -> [u8; 12] {
366 let mut key = [0u8; 12];
367 key[..4].copy_from_slice(SESSION_ACC_PREFIX);
368 key[4..12].copy_from_slice(&key_hash.to_be_bytes());
369 key
370 }
371
372 fn timer_key(key_hash: u64) -> super::TimerKey {
374 let mut key = super::TimerKey::new();
375 key.push(SESSION_TIMER_PREFIX);
376 key.extend_from_slice(&key_hash.to_be_bytes());
377 key
378 }
379
380 fn key_hash_from_timer(timer_key: &[u8]) -> Option<u64> {
382 if timer_key.len() != 9 || timer_key[0] != SESSION_TIMER_PREFIX {
383 return None;
384 }
385 let hash_bytes: [u8; 8] = timer_key[1..9].try_into().ok()?;
386 Some(u64::from_be_bytes(hash_bytes))
387 }
388
389 fn get_or_create_session(
391 &mut self,
392 key_hash: u64,
393 key: Vec<u8>,
394 timestamp: i64,
395 state: &mut dyn StateStore,
396 ) -> SessionState {
397 if let Some(session) = self.active_sessions.get(&key_hash) {
399 return session.clone();
400 }
401
402 let state_key = Self::session_state_key(key_hash);
404 if let Ok(Some(session)) = state.get_typed::<SessionState>(&state_key) {
405 self.active_sessions.insert(key_hash, session.clone());
406 return session;
407 }
408
409 let session = SessionState::new(timestamp, self.gap_ms, key);
411 self.active_sessions.insert(key_hash, session.clone());
412 session
413 }
414
415 fn get_accumulator(&self, key_hash: u64, state: &dyn StateStore) -> A::Acc {
417 let acc_key = Self::session_acc_key(key_hash);
418 state
419 .get_typed::<A::Acc>(&acc_key)
420 .ok()
421 .flatten()
422 .unwrap_or_else(|| self.aggregator.create_accumulator())
423 }
424
425 fn put_session(
427 key_hash: u64,
428 session: &SessionState,
429 acc: &A::Acc,
430 state: &mut dyn StateStore,
431 ) -> Result<(), OperatorError> {
432 let state_key = Self::session_state_key(key_hash);
433 let acc_key = Self::session_acc_key(key_hash);
434
435 state
436 .put_typed(&state_key, session)
437 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))?;
438 state
439 .put_typed(&acc_key, acc)
440 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))?;
441
442 Ok(())
443 }
444
445 fn delete_session(
447 &mut self,
448 key_hash: u64,
449 state: &mut dyn StateStore,
450 ) -> Result<(), OperatorError> {
451 let state_key = Self::session_state_key(key_hash);
452 let acc_key = Self::session_acc_key(key_hash);
453
454 state
455 .delete(&state_key)
456 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))?;
457 state
458 .delete(&acc_key)
459 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))?;
460
461 self.active_sessions.remove(&key_hash);
462 self.pending_timers.remove(&key_hash);
463
464 Ok(())
465 }
466
467 fn register_timer(&mut self, key_hash: u64, session: &SessionState, ctx: &mut OperatorContext) {
469 let trigger_time = session.end + self.allowed_lateness_ms;
470
471 if let Some(&old_time) = self.pending_timers.get(&key_hash) {
473 if old_time == trigger_time {
474 return; }
476 }
478
479 let timer_key = Self::timer_key(key_hash);
480 ctx.timers
481 .register_timer(trigger_time, Some(timer_key), Some(ctx.operator_index));
482 self.pending_timers.insert(key_hash, trigger_time);
483 }
484
485 fn is_late(&self, timestamp: i64, watermark: i64) -> bool {
487 let potential_cleanup = timestamp + self.gap_ms + self.allowed_lateness_ms;
490 watermark >= potential_cleanup
491 }
492
493 fn create_output(&self, session: &SessionState, acc: &A::Acc) -> Option<Event> {
495 if acc.is_empty() {
496 return None;
497 }
498
499 let result = acc.result();
500 let result_i64 = result.to_i64();
501
502 let batch = RecordBatch::try_new(
503 Arc::clone(&self.output_schema),
504 vec![
505 Arc::new(Int64Array::from(vec![session.start])),
506 Arc::new(Int64Array::from(vec![session.end])),
507 Arc::new(Int64Array::from(vec![result_i64])),
508 ],
509 )
510 .ok()?;
511
512 Some(Event::new(session.end, batch))
513 }
514
515 #[allow(dead_code)]
523 fn find_overlapping_sessions(&self, key_hash: u64, _timestamp: i64) -> Vec<u64> {
524 if self.active_sessions.contains_key(&key_hash) {
527 vec![key_hash]
528 } else {
529 vec![]
530 }
531 }
532}
533
534impl<A: Aggregator> Operator for SessionWindowOperator<A>
535where
536 A::Acc: 'static
537 + Archive
538 + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
539 <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
540 + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
541{
542 fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
543 let event_time = event.timestamp;
544 let mut output = OutputVec::new();
545
546 let emitted_watermark = ctx.watermark_generator.on_event(event_time);
548
549 let current_wm = ctx.watermark_generator.current_watermark();
551 if current_wm > i64::MIN && self.is_late(event_time, current_wm) {
552 if self.emit_strategy.drops_late_data() {
554 self.late_data_metrics.record_dropped();
555 return output;
556 }
557
558 if let Some(side_output_name) = self.late_data_config.side_output() {
559 self.late_data_metrics.record_side_output();
560 output.push(Output::SideOutput {
561 name: side_output_name.to_string(),
562 event: event.clone(),
563 });
564 } else {
565 self.late_data_metrics.record_dropped();
566 output.push(Output::LateEvent(event.clone()));
567 }
568 return output;
569 }
570
571 let key = self.extract_key(event);
573 let key_hash = Self::key_hash(&key);
574
575 let mut session = self.get_or_create_session(key_hash, key.clone(), event_time, ctx.state);
577
578 if session.contains(event_time, self.gap_ms) {
580 session.extend(event_time, self.gap_ms);
582 } else if event_time < session.start {
583 session.extend(event_time, self.gap_ms);
585 } else {
586 if matches!(self.emit_strategy, EmitStrategy::OnUpdate) {
589 let old_acc = self.get_accumulator(key_hash, ctx.state);
590 if let Some(old_event) = self.create_output(&session, &old_acc) {
591 output.push(Output::Event(old_event));
592 }
593 }
594
595 session = SessionState::new(event_time, self.gap_ms, key);
597 let new_acc = self.aggregator.create_accumulator();
599 let _ = Self::put_session(key_hash, &session, &new_acc, ctx.state);
600 }
601
602 let mut acc = self.get_accumulator(key_hash, ctx.state);
604 if let Some(value) = self.aggregator.extract(event) {
605 acc.add(value);
606 }
607
608 if Self::put_session(key_hash, &session, &acc, ctx.state).is_ok() {
610 self.active_sessions.insert(key_hash, session.clone());
611 }
612
613 self.register_timer(key_hash, &session, ctx);
615
616 if let Some(wm) = emitted_watermark {
618 output.push(Output::Watermark(wm.timestamp()));
619 }
620
621 match &self.emit_strategy {
623 EmitStrategy::OnUpdate => {
624 if let Some(event) = self.create_output(&session, &acc) {
625 output.push(Output::Event(event));
626 }
627 }
628 EmitStrategy::Changelog => {
629 if let Some(event) = self.create_output(&session, &acc) {
630 let record = ChangelogRecord::insert(event, ctx.processing_time);
631 output.push(Output::Changelog(record));
632 }
633 }
634 EmitStrategy::OnWatermark
636 | EmitStrategy::Periodic(_)
637 | EmitStrategy::OnWindowClose
638 | EmitStrategy::Final => {}
639 }
640
641 output
642 }
643
644 fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
645 let mut output = OutputVec::new();
646
647 let Some(key_hash) = Self::key_hash_from_timer(&timer.key) else {
649 return output;
650 };
651
652 let Some(expected_time) = self.pending_timers.get(&key_hash) else {
654 return output; };
656
657 if *expected_time != timer.timestamp {
658 return output; }
660
661 let Some(session) = self.active_sessions.get(&key_hash).cloned() else {
663 self.pending_timers.remove(&key_hash);
664 return output;
665 };
666
667 let acc = self.get_accumulator(key_hash, ctx.state);
669
670 if let Some(event) = self.create_output(&session, &acc) {
672 match &self.emit_strategy {
673 EmitStrategy::Changelog => {
674 let record = ChangelogRecord::insert(event, ctx.processing_time);
675 output.push(Output::Changelog(record));
676 }
677 _ => {
678 output.push(Output::Event(event));
679 }
680 }
681 }
682
683 let _ = self.delete_session(key_hash, ctx.state);
685
686 output
687 }
688
689 fn checkpoint(&self) -> OperatorState {
690 let checkpoint_data: Vec<(u64, i64)> =
692 self.pending_timers.iter().map(|(&k, &v)| (k, v)).collect();
693
694 let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
695 .map(|v| v.to_vec())
696 .unwrap_or_default();
697
698 OperatorState {
699 operator_id: self.operator_id.clone(),
700 data,
701 }
702 }
703
704 fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
705 if state.operator_id != self.operator_id {
706 return Err(OperatorError::StateAccessFailed(format!(
707 "Operator ID mismatch: expected {}, got {}",
708 self.operator_id, state.operator_id
709 )));
710 }
711
712 if state.data.is_empty() {
713 return Ok(());
714 }
715
716 let archived = rkyv::access::<rkyv::Archived<Vec<(u64, i64)>>, RkyvError>(&state.data)
717 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
718 let timers: Vec<(u64, i64)> = rkyv::deserialize::<Vec<(u64, i64)>, RkyvError>(archived)
719 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
720
721 self.pending_timers = timers.into_iter().collect();
722 Ok(())
725 }
726}
727
/// Plain counters describing session activity. All fields start at zero via
/// `Default`.
///
/// NOTE(review): nothing in this part of the file updates these counters;
/// presumably they are filled in elsewhere. Confirm the intended semantics of
/// each field.
#[derive(Debug, Clone, Default)]
pub struct SessionMetrics {
    /// Count of sessions created (presumably cumulative).
    pub sessions_created: u64,
    /// Count of sessions closed (presumably cumulative).
    pub sessions_closed: u64,
    /// Count of session merges (presumably cumulative).
    pub sessions_merged: u64,
    /// Number of sessions currently open (presumably a gauge).
    pub active_sessions: u64,
}
740
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::window::{CountAccumulator, CountAggregator, SumAggregator};
    use crate::state::InMemoryStore;
    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
    use arrow_array::{Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};

    /// Builds a one-row event with a single non-nullable Int64 `value` column.
    fn create_test_event(timestamp: i64, value: i64) -> Event {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Int64,
            false,
        )]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
        Event::new(timestamp, batch)
    }

    /// Builds a one-row event with Int64 columns `key` (index 0) and
    /// `value` (index 1).
    fn create_keyed_event(timestamp: i64, key: i64, value: i64) -> Event {
        let schema = Arc::new(Schema::new(vec![
            Field::new("key", DataType::Int64, false),
            Field::new("value", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![key])),
                Arc::new(Int64Array::from(vec![value])),
            ],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    /// Wraps the given services in an `OperatorContext` with zero event and
    /// processing times and operator index 0.
    fn create_test_context<'a>(
        timers: &'a mut TimerService,
        state: &'a mut dyn StateStore,
        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
    ) -> OperatorContext<'a> {
        OperatorContext {
            event_time: 0,
            processing_time: 0,
            timers,
            state,
            watermark_generator: watermark_gen,
            operator_index: 0,
        }
    }

    #[test]
    fn test_session_operator_creation() {
        let aggregator = CountAggregator::new();
        let operator = SessionWindowOperator::new(
            Duration::from_secs(30),
            aggregator,
            Duration::from_secs(60),
        );

        assert_eq!(operator.gap_ms(), 30_000);
        assert_eq!(operator.allowed_lateness_ms(), 60_000);
        assert_eq!(operator.active_session_count(), 0);
        // The default emit strategy is OnWatermark.
        assert_eq!(*operator.emit_strategy(), EmitStrategy::OnWatermark);
    }

    #[test]
    fn test_session_operator_with_id() {
        let aggregator = CountAggregator::new();
        let operator = SessionWindowOperator::with_id(
            Duration::from_secs(30),
            aggregator,
            Duration::from_secs(0),
            "test_session".to_string(),
        );

        assert_eq!(operator.operator_id, "test_session");
    }

    #[test]
    fn test_session_state_creation() {
        let state = SessionState::new(1000, 5000, vec![1, 2, 3]);

        assert_eq!(state.start, 1000);
        // end = timestamp + gap = 1000 + 5000
        assert_eq!(state.end, 6000);
        assert_eq!(state.key, vec![1, 2, 3]);
    }

    #[test]
    fn test_session_state_contains() {
        // Session [1000, 6000) with gap 5000; contains() accepts
        // start <= ts < end + gap, i.e. [1000, 11000).
        let state = SessionState::new(1000, 5000, vec![]);

        assert!(state.contains(1000, 5000));
        assert!(state.contains(3000, 5000));
        assert!(state.contains(5999, 5000));
        // At and past `end`, still inside end + gap.
        assert!(state.contains(6000, 5000));
        assert!(state.contains(10999, 5000));
        // Before start.
        assert!(!state.contains(999, 5000));
        // At end + gap (exclusive upper bound).
        assert!(!state.contains(11000, 5000));
    }

    #[test]
    fn test_session_state_extend() {
        let mut state = SessionState::new(1000, 5000, vec![]);

        // Extending forward moves end to 8000 + 5000.
        state.extend(8000, 5000);
        assert_eq!(state.start, 1000);
        assert_eq!(state.end, 13000);

        // Extending backward moves start only; end never shrinks.
        state.extend(500, 5000);
        assert_eq!(state.start, 500);
        assert_eq!(state.end, 13000);
    }

    #[test]
    fn test_session_state_merge() {
        let mut state1 = SessionState::new(1000, 5000, vec![]);
        let state2 = SessionState::new(8000, 5000, vec![]);

        state1.merge(&state2);
        assert_eq!(state1.start, 1000);
        // Union of [1000, 6000) and [8000, 13000).
        assert_eq!(state1.end, 13000);
    }

    #[test]
    fn test_session_single_event() {
        let aggregator = CountAggregator::new();
        let mut operator = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        let event = create_test_event(500, 1);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event, &mut ctx);
        }

        // One session opened and one cleanup timer registered for it.
        assert_eq!(operator.active_session_count(), 1);
        assert_eq!(operator.pending_timers.len(), 1);
    }

    #[test]
    fn test_session_multiple_events_same_session() {
        let aggregator = CountAggregator::new();
        let mut operator = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Every event is within contains() range of the growing session.
        for ts in [100, 500, 900, 1500] {
            let event = create_test_event(ts, 1);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event, &mut ctx);
        }

        assert_eq!(operator.active_session_count(), 1);

        // No key column is set, so all events use the empty key.
        let key_hash = SessionWindowOperator::<CountAggregator>::key_hash(&[]);
        let acc: CountAccumulator = operator.get_accumulator(key_hash, &state);
        assert_eq!(acc.result(), 4);
    }

    #[test]
    fn test_session_gap_creates_new_session() {
        let aggregator = CountAggregator::new();
        let mut operator = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        );
        operator.set_emit_strategy(EmitStrategy::OnUpdate);

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // First session: [100, 1100).
        let event1 = create_test_event(100, 1);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event1, &mut ctx);
        }

        // 3000 is not below end + gap (2100), so a new session starts.
        let event2 = create_test_event(3000, 1);
        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event2, &mut ctx)
        };

        // At least one Event output is emitted under OnUpdate.
        let event_count = outputs
            .iter()
            .filter(|o| matches!(o, Output::Event(_)))
            .count();
        assert!(event_count >= 1);
    }

    #[test]
    fn test_session_timer_triggers_emission() {
        let aggregator = CountAggregator::new();
        let mut operator = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Session [500, 1500).
        let event = create_test_event(500, 1);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event, &mut ctx);
        }

        // Fire the timer the operator registered, using its exact timestamp.
        let key_hash = SessionWindowOperator::<CountAggregator>::key_hash(&[]);
        let timer_time = *operator.pending_timers.get(&key_hash).unwrap();

        let timer = Timer {
            key: SessionWindowOperator::<CountAggregator>::timer_key(key_hash),
            timestamp: timer_time,
        };

        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.on_timer(timer, &mut ctx)
        };

        assert_eq!(outputs.len(), 1);
        match &outputs[0] {
            Output::Event(e) => {
                // The output event is timestamped at session end (500 + 1000).
                assert_eq!(e.timestamp, 1500);
                // Column 2 is `result`.
                let result = e
                    .data
                    .column(2)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap()
                    .value(0);
                assert_eq!(result, 1);
            }
            _ => panic!("Expected Event output"),
        }

        // The session is removed after emission.
        assert_eq!(operator.active_session_count(), 0);
    }

    #[test]
    fn test_session_keyed_tracking() {
        // Presumably sums column index 1 (`value`) — TODO confirm SumAggregator::new arg.
        let aggregator = SumAggregator::new(1);
        let mut operator = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        );
        // Column 0 holds the Int64 key.
        operator.set_key_column(0);

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Key 1.
        let event1 = create_keyed_event(100, 1, 10);
        let event2 = create_keyed_event(500, 1, 20);

        // Key 2.
        let event3 = create_keyed_event(200, 2, 100);
        let event4 = create_keyed_event(600, 2, 200);

        for event in [event1, event2, event3, event4] {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event, &mut ctx);
        }

        // One session per distinct key.
        assert_eq!(operator.active_session_count(), 2);
    }

    #[test]
    fn test_session_late_event_dropped() {
        let aggregator = CountAggregator::new();
        let mut operator = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

        // Advances the watermark to roughly 10000.
        let event1 = create_test_event(10000, 1);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event1, &mut ctx);
        }

        // 100 + gap + lateness is far below the watermark, so the event is late.
        let late_event = create_test_event(100, 1);
        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&late_event, &mut ctx)
        };

        // With no side output configured, the event comes back as LateEvent and
        // is counted as dropped.
        let is_late = outputs.iter().any(|o| matches!(o, Output::LateEvent(_)));
        assert!(is_late);
        assert_eq!(operator.late_data_metrics().late_events_dropped(), 1);
    }

    #[test]
    fn test_session_late_event_side_output() {
        let aggregator = CountAggregator::new();
        let mut operator = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        );
        operator.set_late_data_config(LateDataConfig::with_side_output("late".to_string()));

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

        // Advances the watermark to roughly 10000.
        let event1 = create_test_event(10000, 1);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event1, &mut ctx);
        }

        // Late event; it is routed to the "late" side output.
        let late_event = create_test_event(100, 1);
        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&late_event, &mut ctx)
        };

        // Pull out the side-output name, if any.
        let side_output = outputs.iter().find_map(|o| {
            if let Output::SideOutput { name, .. } = o {
                Some(name.clone())
            } else {
                None
            }
        });
        assert_eq!(side_output, Some("late".to_string()));
        assert_eq!(operator.late_data_metrics().late_events_side_output(), 1);
    }

    #[test]
    fn test_session_emit_on_update() {
        let aggregator = CountAggregator::new();
        let mut operator = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        );
        operator.set_emit_strategy(EmitStrategy::OnUpdate);

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        let event = create_test_event(500, 1);
        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event, &mut ctx)
        };

        // OnUpdate emits the current aggregate on every processed event.
        let event_count = outputs
            .iter()
            .filter(|o| matches!(o, Output::Event(_)))
            .count();
        assert_eq!(event_count, 1);
    }

    #[test]
    fn test_session_emit_changelog() {
        let aggregator = CountAggregator::new();
        let mut operator = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        );
        operator.set_emit_strategy(EmitStrategy::Changelog);

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        let event = create_test_event(500, 1);
        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event, &mut ctx)
        };

        // Changelog wraps each update in a Changelog record.
        let changelog_count = outputs
            .iter()
            .filter(|o| matches!(o, Output::Changelog(_)))
            .count();
        assert_eq!(changelog_count, 1);
    }

    #[test]
    fn test_session_emit_final_drops_late() {
        let aggregator = CountAggregator::new();
        let mut operator = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        );
        operator.set_emit_strategy(EmitStrategy::Final);

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

        // Advances the watermark to roughly 10000.
        let event1 = create_test_event(10000, 1);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event1, &mut ctx);
        }

        // Final drops late data outright, with no output at all.
        let late_event = create_test_event(100, 1);
        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&late_event, &mut ctx)
        };

        assert!(outputs.is_empty());
        assert_eq!(operator.late_data_metrics().late_events_dropped(), 1);
    }

    #[test]
    fn test_session_checkpoint_restore() {
        let aggregator = CountAggregator::new();
        let mut operator = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator.clone(),
            Duration::from_millis(0),
            "test_op".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Both events land in one session, so there is one pending timer.
        for ts in [100, 500] {
            let event = create_test_event(ts, 1);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event, &mut ctx);
        }

        let checkpoint = operator.checkpoint();

        // Restore into a fresh operator with the same ID.
        let mut restored = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        );
        restored.restore(checkpoint).unwrap();

        assert_eq!(restored.pending_timers.len(), 1);
    }

    #[test]
    fn test_session_stale_timer_ignored() {
        let aggregator = CountAggregator::new();
        let mut operator = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Session [500, 1500); timer at 1500.
        let event1 = create_test_event(500, 1);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event1, &mut ctx);
        }

        let key_hash = SessionWindowOperator::<CountAggregator>::key_hash(&[]);
        let old_timer_time = *operator.pending_timers.get(&key_hash).unwrap();

        // Extends the session to end 2200, which re-registers the timer.
        let event2 = create_test_event(1200, 1);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event2, &mut ctx);
        }

        // Fire the superseded timer.
        let stale_timer = Timer {
            key: SessionWindowOperator::<CountAggregator>::timer_key(key_hash),
            timestamp: old_timer_time,
        };

        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.on_timer(stale_timer, &mut ctx)
        };

        // The stale timer emits nothing and leaves the session open.
        assert!(outputs.is_empty());
        assert_eq!(operator.active_session_count(), 1);
    }

    #[test]
    fn test_session_window_id() {
        let state = SessionState::new(1000, 5000, vec![]);
        let window_id = state.window_id();

        assert_eq!(window_id.start, 1000);
        assert_eq!(window_id.end, 6000);
    }

    #[test]
    fn test_timer_key_roundtrip() {
        let key_hash = 0x1234_5678_9ABC_DEF0u64;
        let timer_key = SessionWindowOperator::<CountAggregator>::timer_key(key_hash);
        let parsed = SessionWindowOperator::<CountAggregator>::key_hash_from_timer(&timer_key);

        assert_eq!(parsed, Some(key_hash));
    }

    #[test]
    fn test_timer_key_invalid() {
        // Wrong prefix byte.
        let invalid1 = vec![0x02, 0, 0, 0, 0, 0, 0, 0, 0];
        assert!(SessionWindowOperator::<CountAggregator>::key_hash_from_timer(&invalid1).is_none());

        // Right prefix, but too short.
        let invalid2 = vec![SESSION_TIMER_PREFIX, 0, 0, 0];
        assert!(SessionWindowOperator::<CountAggregator>::key_hash_from_timer(&invalid2).is_none());
    }

    #[test]
    fn test_session_sum_aggregation() {
        // Presumably sums column index 0 (`value`) — TODO confirm SumAggregator::new arg.
        let aggregator = SumAggregator::new(0);
        let mut operator = SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // All three events fall into one session.
        for (ts, value) in [(100, 10), (500, 20), (800, 30)] {
            let event = create_test_event(ts, value);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event, &mut ctx);
        }

        // Fire the session's current timer to obtain the final result.
        let key_hash = SessionWindowOperator::<SumAggregator>::key_hash(&[]);
        let timer_time = *operator.pending_timers.get(&key_hash).unwrap();
        let timer = Timer {
            key: SessionWindowOperator::<SumAggregator>::timer_key(key_hash),
            timestamp: timer_time,
        };

        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.on_timer(timer, &mut ctx)
        };

        match &outputs[0] {
            Output::Event(e) => {
                let result = e
                    .data
                    .column(2)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap()
                    .value(0);
                // 10 + 20 + 30
                assert_eq!(result, 60);
            }
            _ => panic!("Expected Event output"),
        }
    }

    #[test]
    fn test_session_output_schema() {
        let schema = create_session_output_schema();

        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "session_start");
        assert_eq!(schema.field(1).name(), "session_end");
        assert_eq!(schema.field(2).name(), "result");
    }
}