1use super::window::{
50 Accumulator, Aggregator, ChangelogRecord, EmitStrategy, LateDataConfig, LateDataMetrics,
51 ResultToI64, WindowAssigner, WindowId, WindowIdVec,
52};
53use super::{
54 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
55};
56use crate::state::{StateStore, StateStoreExt};
57use arrow_array::{Int64Array, RecordBatch};
58use arrow_schema::{DataType, Field, Schema, SchemaRef};
59use rkyv::{
60 api::high::{HighDeserializer, HighSerializer, HighValidator},
61 bytecheck::CheckBytes,
62 rancor::Error as RkyvError,
63 ser::allocator::ArenaHandle,
64 util::AlignedVec,
65 Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
66};
67use std::marker::PhantomData;
68use std::sync::atomic::{AtomicU64, Ordering};
69use std::sync::Arc;
70use std::time::Duration;
71
/// Assigns event timestamps to fixed-size, overlapping (sliding) windows.
///
/// Window starts are multiples of `slide_ms` (aligned to timestamp 0) and each
/// window covers the half-open range `[start, start + size_ms)`. Because windows
/// overlap, one event belongs to up to `windows_per_event` windows.
#[derive(Debug, Clone)]
pub struct SlidingWindowAssigner {
    /// Window length in milliseconds (always > 0).
    size_ms: i64,
    /// Distance between consecutive window starts in milliseconds (`0 < slide_ms <= size_ms`).
    slide_ms: i64,
    /// `ceil(size_ms / slide_ms)`: upper bound on the windows a single event falls into.
    windows_per_event: usize,
}

impl SlidingWindowAssigner {
    /// Creates an assigner from a window `size` and a `slide` interval.
    ///
    /// # Panics
    ///
    /// Panics if either duration, in milliseconds, does not fit in `i64`; if either
    /// is shorter than one millisecond; or if `slide` exceeds `size`.
    #[must_use]
    pub fn new(size: Duration, slide: Duration) -> Self {
        let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
        let slide_ms = i64::try_from(slide.as_millis()).expect("Slide interval must fit in i64");

        Self::validate(size_ms, slide_ms);

        let windows_per_event = usize::try_from(Self::ceil_div_positive(size_ms, slide_ms))
            .expect("Windows per event should fit in usize");

        Self {
            size_ms,
            slide_ms,
            windows_per_event,
        }
    }

    /// Creates an assigner from a window size and slide given in milliseconds.
    ///
    /// On targets where the window count does not fit in `usize`, it saturates to
    /// `usize::MAX` instead of panicking.
    ///
    /// # Panics
    ///
    /// Panics if `size_ms` or `slide_ms` is not positive, or if `slide_ms > size_ms`.
    #[must_use]
    pub fn from_millis(size_ms: i64, slide_ms: i64) -> Self {
        Self::validate(size_ms, slide_ms);

        let windows_per_event =
            usize::try_from(Self::ceil_div_positive(size_ms, slide_ms)).unwrap_or(usize::MAX);

        Self {
            size_ms,
            slide_ms,
            windows_per_event,
        }
    }

    /// Window length in milliseconds.
    #[must_use]
    pub fn size_ms(&self) -> i64 {
        self.size_ms
    }

    /// Slide interval in milliseconds.
    #[must_use]
    pub fn slide_ms(&self) -> i64 {
        self.slide_ms
    }

    /// Maximum number of windows a single event can be assigned to.
    #[must_use]
    pub fn windows_per_event(&self) -> usize {
        self.windows_per_event
    }

    /// Asserts the constructor preconditions shared by `new` and `from_millis`.
    fn validate(size_ms: i64, slide_ms: i64) {
        assert!(size_ms > 0, "Window size must be positive");
        assert!(slide_ms > 0, "Slide interval must be positive");
        assert!(
            slide_ms <= size_ms,
            "Slide must not exceed size (use tumbling windows for non-overlapping)"
        );
    }

    /// `ceil(size_ms / slide_ms)` for positive operands.
    ///
    /// The usual `(a + b - 1) / b` form overflows when `a` is close to `i64::MAX`.
    /// This form never leaves the `i64` range.
    #[inline]
    fn ceil_div_positive(size_ms: i64, slide_ms: i64) -> i64 {
        (size_ms - 1) / slide_ms + 1
    }

    /// Start of the latest window that contains `timestamp`. This is the largest
    /// multiple of `slide_ms` that is `<= timestamp`, for negative timestamps too.
    ///
    /// `rem_euclid` yields the floor without the intermediate
    /// `timestamp - slide + 1` term, which could overflow near `i64::MIN`.
    #[inline]
    fn last_window_start(&self, timestamp: i64) -> i64 {
        timestamp - timestamp.rem_euclid(self.slide_ms)
    }
}
209
210impl WindowAssigner for SlidingWindowAssigner {
211 #[inline]
215 fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
216 let mut windows = WindowIdVec::new();
217
218 let last_start = self.last_window_start(timestamp);
220
221 let mut window_start = last_start;
223 while window_start + self.size_ms > timestamp {
224 let window_end = window_start + self.size_ms;
225 windows.push(WindowId::new(window_start, window_end));
226 window_start -= self.slide_ms;
227 }
228
229 windows.reverse();
231 windows
232 }
233
234 fn max_timestamp(&self, window_end: i64) -> i64 {
237 window_end - 1
238 }
239}
240
/// Key prefix that namespaces sliding-window accumulators in the state store.
const WINDOW_STATE_PREFIX: &[u8; 4] = b"slw:";

/// Length of a full state key: the prefix followed by the 16-byte window key.
const WINDOW_STATE_KEY_SIZE: usize = WINDOW_STATE_PREFIX.len() + 16;

/// Monotonic source of suffixes for auto-generated operator ids.
static SLIDING_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
249
250fn create_window_output_schema() -> SchemaRef {
252 Arc::new(Schema::new(vec![
253 Field::new("window_start", DataType::Int64, false),
254 Field::new("window_end", DataType::Int64, false),
255 Field::new("result", DataType::Int64, false),
256 ]))
257}
258
259pub struct SlidingWindowOperator<A: Aggregator> {
291 assigner: SlidingWindowAssigner,
293 aggregator: A,
295 allowed_lateness_ms: i64,
297 registered_windows: std::collections::HashSet<WindowId>,
299 periodic_timer_windows: std::collections::HashSet<WindowId>,
301 emit_strategy: EmitStrategy,
303 late_data_config: LateDataConfig,
305 late_data_metrics: LateDataMetrics,
307 operator_id: String,
309 output_schema: SchemaRef,
311 _phantom: PhantomData<A::Acc>,
313}
314
315impl<A: Aggregator> SlidingWindowOperator<A>
316where
317 A::Acc: Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
318 <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
319 + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
320{
321 pub fn new(assigner: SlidingWindowAssigner, aggregator: A, allowed_lateness: Duration) -> Self {
332 let operator_num = SLIDING_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
333 Self {
334 assigner,
335 aggregator,
336 allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
337 .expect("Allowed lateness must fit in i64"),
338 registered_windows: std::collections::HashSet::new(),
339 periodic_timer_windows: std::collections::HashSet::new(),
340 emit_strategy: EmitStrategy::default(),
341 late_data_config: LateDataConfig::default(),
342 late_data_metrics: LateDataMetrics::new(),
343 operator_id: format!("sliding_window_{operator_num}"),
344 output_schema: create_window_output_schema(),
345 _phantom: PhantomData,
346 }
347 }
348
349 pub fn with_id(
354 assigner: SlidingWindowAssigner,
355 aggregator: A,
356 allowed_lateness: Duration,
357 operator_id: String,
358 ) -> Self {
359 Self {
360 assigner,
361 aggregator,
362 allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
363 .expect("Allowed lateness must fit in i64"),
364 registered_windows: std::collections::HashSet::new(),
365 periodic_timer_windows: std::collections::HashSet::new(),
366 emit_strategy: EmitStrategy::default(),
367 late_data_config: LateDataConfig::default(),
368 late_data_metrics: LateDataMetrics::new(),
369 operator_id,
370 output_schema: create_window_output_schema(),
371 _phantom: PhantomData,
372 }
373 }
374
375 pub fn set_emit_strategy(&mut self, strategy: EmitStrategy) {
377 self.emit_strategy = strategy;
378 }
379
380 #[must_use]
382 pub fn emit_strategy(&self) -> &EmitStrategy {
383 &self.emit_strategy
384 }
385
386 pub fn set_late_data_config(&mut self, config: LateDataConfig) {
388 self.late_data_config = config;
389 }
390
391 #[must_use]
393 pub fn late_data_config(&self) -> &LateDataConfig {
394 &self.late_data_config
395 }
396
397 #[must_use]
399 pub fn late_data_metrics(&self) -> &LateDataMetrics {
400 &self.late_data_metrics
401 }
402
403 pub fn reset_late_data_metrics(&mut self) {
405 self.late_data_metrics.reset();
406 }
407
408 #[must_use]
410 pub fn assigner(&self) -> &SlidingWindowAssigner {
411 &self.assigner
412 }
413
414 #[must_use]
416 pub fn allowed_lateness_ms(&self) -> i64 {
417 self.allowed_lateness_ms
418 }
419
420 #[inline]
422 fn state_key(window_id: &WindowId) -> [u8; WINDOW_STATE_KEY_SIZE] {
423 let mut key = [0u8; WINDOW_STATE_KEY_SIZE];
424 key[..4].copy_from_slice(WINDOW_STATE_PREFIX);
425 let window_key = window_id.to_key_inline();
426 key[4..20].copy_from_slice(&window_key);
427 key
428 }
429
430 fn get_accumulator(&self, window_id: &WindowId, state: &dyn StateStore) -> A::Acc {
432 let key = Self::state_key(window_id);
433 state
434 .get_typed::<A::Acc>(&key)
435 .ok()
436 .flatten()
437 .unwrap_or_else(|| self.aggregator.create_accumulator())
438 }
439
440 fn put_accumulator(
442 window_id: &WindowId,
443 acc: &A::Acc,
444 state: &mut dyn StateStore,
445 ) -> Result<(), OperatorError> {
446 let key = Self::state_key(window_id);
447 state
448 .put_typed(&key, acc)
449 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
450 }
451
452 fn delete_accumulator(
454 window_id: &WindowId,
455 state: &mut dyn StateStore,
456 ) -> Result<(), OperatorError> {
457 let key = Self::state_key(window_id);
458 state
459 .delete(&key)
460 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
461 }
462
463 fn is_late(&self, event_time: i64, watermark: i64) -> bool {
468 let windows = self.assigner.assign_windows(event_time);
470
471 windows.iter().all(|window_id| {
473 let cleanup_time = window_id.end + self.allowed_lateness_ms;
474 watermark >= cleanup_time
475 })
476 }
477
478 fn maybe_register_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
480 if !self.registered_windows.contains(&window_id) {
481 let trigger_time = window_id.end + self.allowed_lateness_ms;
482 ctx.timers.register_timer(
483 trigger_time,
484 Some(window_id.to_key()),
485 Some(ctx.operator_index),
486 );
487 self.registered_windows.insert(window_id);
488 }
489 }
490
491 fn maybe_register_periodic_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
493 if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
494 if !self.periodic_timer_windows.contains(&window_id) {
495 let interval_ms =
496 i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
497 let trigger_time = ctx.processing_time + interval_ms;
498 let key = Self::periodic_timer_key(&window_id);
499 ctx.timers
500 .register_timer(trigger_time, Some(key), Some(ctx.operator_index));
501 self.periodic_timer_windows.insert(window_id);
502 }
503 }
504 }
505
506 #[inline]
508 fn periodic_timer_key(window_id: &WindowId) -> super::TimerKey {
509 let mut key = window_id.to_key();
510 if !key.is_empty() {
511 key[0] |= 0x80;
512 }
513 key
514 }
515
516 #[inline]
518 fn is_periodic_timer_key(key: &[u8]) -> bool {
519 !key.is_empty() && (key[0] & 0x80) != 0
520 }
521
522 #[inline]
524 fn window_id_from_periodic_key(key: &[u8]) -> Option<WindowId> {
525 if key.len() != 16 {
526 return None;
527 }
528 let mut clean_key = [0u8; 16];
529 clean_key.copy_from_slice(key);
530 clean_key[0] &= 0x7F;
531 WindowId::from_key(&clean_key)
532 }
533
534 fn create_intermediate_result(
536 &self,
537 window_id: &WindowId,
538 state: &dyn StateStore,
539 ) -> Option<Event> {
540 let acc = self.get_accumulator(window_id, state);
541
542 if acc.is_empty() {
543 return None;
544 }
545
546 let result = acc.result();
547 let result_i64 = result.to_i64();
548
549 let batch = RecordBatch::try_new(
550 Arc::clone(&self.output_schema),
551 vec![
552 Arc::new(Int64Array::from(vec![window_id.start])),
553 Arc::new(Int64Array::from(vec![window_id.end])),
554 Arc::new(Int64Array::from(vec![result_i64])),
555 ],
556 )
557 .ok()?;
558
559 Some(Event::new(window_id.end, batch))
560 }
561
562 fn handle_periodic_timer(
564 &mut self,
565 window_id: WindowId,
566 ctx: &mut OperatorContext,
567 ) -> OutputVec {
568 let mut output = OutputVec::new();
569
570 if !self.registered_windows.contains(&window_id) {
571 self.periodic_timer_windows.remove(&window_id);
572 return output;
573 }
574
575 if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
576 output.push(Output::Event(event));
577 }
578
579 if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
580 let interval_ms =
581 i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
582 let next_trigger = ctx.processing_time + interval_ms;
583 let window_close_time = window_id.end + self.allowed_lateness_ms;
584 if next_trigger < window_close_time {
585 let key = Self::periodic_timer_key(&window_id);
586 ctx.timers
587 .register_timer(next_trigger, Some(key), Some(ctx.operator_index));
588 }
589 }
590
591 output
592 }
593}
594
595impl<A: Aggregator> Operator for SlidingWindowOperator<A>
596where
597 A::Acc: 'static
598 + Archive
599 + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
600 <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
601 + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
602{
603 fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
604 let event_time = event.timestamp;
605
606 let emitted_watermark = ctx.watermark_generator.on_event(event_time);
608
609 let current_wm = ctx.watermark_generator.current_watermark();
611 if current_wm > i64::MIN && self.is_late(event_time, current_wm) {
612 let mut output = OutputVec::new();
613
614 if self.emit_strategy.drops_late_data() {
616 self.late_data_metrics.record_dropped();
617 return output; }
619
620 if let Some(side_output_name) = self.late_data_config.side_output() {
621 self.late_data_metrics.record_side_output();
622 output.push(Output::SideOutput {
623 name: side_output_name.to_string(),
624 event: event.clone(),
625 });
626 } else {
627 self.late_data_metrics.record_dropped();
628 output.push(Output::LateEvent(event.clone()));
629 }
630 return output;
631 }
632
633 let windows = self.assigner.assign_windows(event_time);
635
636 let mut updated_windows = Vec::new();
638
639 for window_id in &windows {
641 let cleanup_time = window_id.end + self.allowed_lateness_ms;
643 if current_wm > i64::MIN && current_wm >= cleanup_time {
644 continue;
645 }
646
647 if let Some(value) = self.aggregator.extract(event) {
649 let mut acc = self.get_accumulator(window_id, ctx.state);
650 acc.add(value);
651 if Self::put_accumulator(window_id, &acc, ctx.state).is_ok() {
652 updated_windows.push(*window_id);
653 }
654 }
655
656 self.maybe_register_timer(*window_id, ctx);
658
659 if !self.emit_strategy.suppresses_intermediate() {
661 self.maybe_register_periodic_timer(*window_id, ctx);
662 }
663 }
664
665 let mut output = OutputVec::new();
667
668 if let Some(wm) = emitted_watermark {
670 output.push(Output::Watermark(wm.timestamp()));
671 }
672
673 if !updated_windows.is_empty() {
675 match &self.emit_strategy {
676 EmitStrategy::OnUpdate => {
678 for window_id in &updated_windows {
679 if let Some(event) = self.create_intermediate_result(window_id, ctx.state) {
680 output.push(Output::Event(event));
681 }
682 }
683 }
684 EmitStrategy::Changelog => {
686 for window_id in &updated_windows {
687 if let Some(event) = self.create_intermediate_result(window_id, ctx.state) {
688 let record = ChangelogRecord::insert(event, ctx.processing_time);
689 output.push(Output::Changelog(record));
690 }
691 }
692 }
693 EmitStrategy::OnWatermark
695 | EmitStrategy::Periodic(_)
696 | EmitStrategy::OnWindowClose
697 | EmitStrategy::Final => {}
698 }
699 }
700
701 output
702 }
703
704 fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
705 if Self::is_periodic_timer_key(&timer.key) {
707 if self.emit_strategy.suppresses_intermediate() {
709 if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
711 self.periodic_timer_windows.remove(&window_id);
712 }
713 return OutputVec::new();
714 }
715
716 if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
717 return self.handle_periodic_timer(window_id, ctx);
718 }
719 return OutputVec::new();
720 }
721
722 let Some(window_id) = WindowId::from_key(&timer.key) else {
724 return OutputVec::new();
725 };
726
727 let acc = self.get_accumulator(&window_id, ctx.state);
729
730 if acc.is_empty() {
732 let _ = Self::delete_accumulator(&window_id, ctx.state);
733 self.registered_windows.remove(&window_id);
734 self.periodic_timer_windows.remove(&window_id);
735 return OutputVec::new();
736 }
737
738 let result = acc.result();
740
741 let _ = Self::delete_accumulator(&window_id, ctx.state);
743 self.registered_windows.remove(&window_id);
744 self.periodic_timer_windows.remove(&window_id);
745
746 let result_i64 = result.to_i64();
748
749 let batch = RecordBatch::try_new(
751 Arc::clone(&self.output_schema),
752 vec![
753 Arc::new(Int64Array::from(vec![window_id.start])),
754 Arc::new(Int64Array::from(vec![window_id.end])),
755 Arc::new(Int64Array::from(vec![result_i64])),
756 ],
757 );
758
759 let mut output = OutputVec::new();
760 match batch {
761 Ok(data) => {
762 let event = Event::new(window_id.end, data);
763
764 match &self.emit_strategy {
766 EmitStrategy::Changelog => {
768 let record = ChangelogRecord::insert(event, ctx.processing_time);
769 output.push(Output::Changelog(record));
770 }
771 EmitStrategy::OnWatermark
773 | EmitStrategy::Periodic(_)
774 | EmitStrategy::OnUpdate
775 | EmitStrategy::OnWindowClose
776 | EmitStrategy::Final => {
777 output.push(Output::Event(event));
778 }
779 }
780 }
781 Err(e) => {
782 tracing::error!("Failed to create output batch: {e}");
783 }
784 }
785 output
786 }
787
788 fn checkpoint(&self) -> OperatorState {
789 let windows: Vec<_> = self.registered_windows.iter().copied().collect();
790 let periodic_windows: Vec<_> = self.periodic_timer_windows.iter().copied().collect();
791
792 let checkpoint_data = (windows, periodic_windows);
793 let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
794 .map(|v| v.to_vec())
795 .unwrap_or_default();
796
797 OperatorState {
798 operator_id: self.operator_id.clone(),
799 data,
800 }
801 }
802
803 fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
804 if state.operator_id != self.operator_id {
805 return Err(OperatorError::StateAccessFailed(format!(
806 "Operator ID mismatch: expected {}, got {}",
807 self.operator_id, state.operator_id
808 )));
809 }
810
811 if let Ok(archived) =
813 rkyv::access::<rkyv::Archived<(Vec<WindowId>, Vec<WindowId>)>, RkyvError>(&state.data)
814 {
815 if let Ok((windows, periodic_windows)) =
816 rkyv::deserialize::<(Vec<WindowId>, Vec<WindowId>), RkyvError>(archived)
817 {
818 self.registered_windows = windows.into_iter().collect();
819 self.periodic_timer_windows = periodic_windows.into_iter().collect();
820 return Ok(());
821 }
822 }
823
824 let archived = rkyv::access::<rkyv::Archived<Vec<WindowId>>, RkyvError>(&state.data)
826 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
827 let windows: Vec<WindowId> = rkyv::deserialize::<Vec<WindowId>, RkyvError>(archived)
828 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
829
830 self.registered_windows = windows.into_iter().collect();
831 self.periodic_timer_windows = std::collections::HashSet::new();
832 Ok(())
833 }
834}
835
836#[cfg(test)]
837mod tests {
838 use super::*;
839 use crate::operator::window::{CountAccumulator, CountAggregator, SumAggregator};
840 use crate::state::InMemoryStore;
841 use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
842 use arrow_array::{Int64Array, RecordBatch};
843 use arrow_schema::{DataType, Field, Schema};
844
845 fn create_test_event(timestamp: i64, value: i64) -> Event {
846 let schema = Arc::new(Schema::new(vec![Field::new(
847 "value",
848 DataType::Int64,
849 false,
850 )]));
851 let batch =
852 RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
853 Event::new(timestamp, batch)
854 }
855
856 fn create_test_context<'a>(
857 timers: &'a mut TimerService,
858 state: &'a mut dyn StateStore,
859 watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
860 ) -> OperatorContext<'a> {
861 OperatorContext {
862 event_time: 0,
863 processing_time: 0,
864 timers,
865 state,
866 watermark_generator: watermark_gen,
867 operator_index: 0,
868 }
869 }
870
871 #[test]
872 fn test_sliding_assigner_creation() {
873 let assigner = SlidingWindowAssigner::new(Duration::from_secs(60), Duration::from_secs(20));
874
875 assert_eq!(assigner.size_ms(), 60_000);
876 assert_eq!(assigner.slide_ms(), 20_000);
877 assert_eq!(assigner.windows_per_event(), 3); }
879
880 #[test]
881 fn test_sliding_assigner_from_millis() {
882 let assigner = SlidingWindowAssigner::from_millis(1000, 200);
883
884 assert_eq!(assigner.size_ms(), 1000);
885 assert_eq!(assigner.slide_ms(), 200);
886 assert_eq!(assigner.windows_per_event(), 5); }
888
889 #[test]
890 #[should_panic(expected = "Window size must be positive")]
891 fn test_sliding_assigner_zero_size() {
892 let _ = SlidingWindowAssigner::from_millis(0, 100);
893 }
894
895 #[test]
896 #[should_panic(expected = "Slide interval must be positive")]
897 fn test_sliding_assigner_zero_slide() {
898 let _ = SlidingWindowAssigner::from_millis(1000, 0);
899 }
900
901 #[test]
902 #[should_panic(expected = "Slide must not exceed size")]
903 fn test_sliding_assigner_slide_exceeds_size() {
904 let _ = SlidingWindowAssigner::from_millis(100, 200);
905 }
906
907 #[test]
908 fn test_sliding_assigner_basic_assignment() {
909 let assigner = SlidingWindowAssigner::from_millis(60_000, 20_000);
911
912 let windows = assigner.assign_windows(50_000);
914
915 assert_eq!(windows.len(), 3);
916
917 assert!(windows.contains(&WindowId::new(0, 60_000)));
927 assert!(windows.contains(&WindowId::new(20_000, 80_000)));
928 assert!(windows.contains(&WindowId::new(40_000, 100_000)));
929 }
930
931 #[test]
932 fn test_sliding_assigner_boundary_event() {
933 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
934
935 let windows = assigner.assign_windows(1000);
937
938 assert_eq!(windows.len(), 2);
943 assert!(windows.contains(&WindowId::new(500, 1500)));
944 assert!(windows.contains(&WindowId::new(1000, 2000)));
945 }
946
947 #[test]
948 fn test_sliding_assigner_negative_timestamp() {
949 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
950
951 let windows = assigner.assign_windows(-500);
953
954 assert_eq!(windows.len(), 2);
960 assert!(windows.contains(&WindowId::new(-1000, 0)));
961 assert!(windows.contains(&WindowId::new(-500, 500)));
962 }
963
964 #[test]
965 fn test_sliding_assigner_equal_size_and_slide() {
966 let assigner = SlidingWindowAssigner::from_millis(1000, 1000);
968
969 assert_eq!(assigner.windows_per_event(), 1);
970
971 let windows = assigner.assign_windows(500);
972 assert_eq!(windows.len(), 1);
973 assert_eq!(windows[0], WindowId::new(0, 1000));
974 }
975
976 #[test]
977 fn test_sliding_assigner_small_slide() {
978 let assigner = SlidingWindowAssigner::from_millis(1000, 100);
980
981 assert_eq!(assigner.windows_per_event(), 10);
982
983 let windows = assigner.assign_windows(500);
984 assert_eq!(windows.len(), 10);
985 }
986
987 #[test]
988 fn test_sliding_operator_creation() {
989 let assigner = SlidingWindowAssigner::from_millis(1000, 200);
990 let aggregator = CountAggregator::new();
991 let operator = SlidingWindowOperator::new(assigner, aggregator, Duration::from_millis(100));
992
993 assert_eq!(operator.allowed_lateness_ms(), 100);
994 assert_eq!(*operator.emit_strategy(), EmitStrategy::OnWatermark);
995 assert!(operator.late_data_config().should_drop());
996 }
997
998 #[test]
999 fn test_sliding_operator_with_id() {
1000 let assigner = SlidingWindowAssigner::from_millis(1000, 200);
1001 let aggregator = CountAggregator::new();
1002 let operator = SlidingWindowOperator::with_id(
1003 assigner,
1004 aggregator,
1005 Duration::from_millis(0),
1006 "test_sliding".to_string(),
1007 );
1008
1009 assert_eq!(operator.operator_id, "test_sliding");
1010 }
1011
1012 #[test]
1013 fn test_sliding_operator_process_single_event() {
1014 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1015 let aggregator = CountAggregator::new();
1016 let mut operator = SlidingWindowOperator::with_id(
1017 assigner,
1018 aggregator,
1019 Duration::from_millis(0),
1020 "test_op".to_string(),
1021 );
1022
1023 let mut timers = TimerService::new();
1024 let mut state = InMemoryStore::new();
1025 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1026
1027 let event = create_test_event(600, 1);
1030 {
1031 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1032 operator.process(&event, &mut ctx);
1033 }
1034
1035 assert_eq!(operator.registered_windows.len(), 2);
1037 assert!(operator
1038 .registered_windows
1039 .contains(&WindowId::new(0, 1000)));
1040 assert!(operator
1041 .registered_windows
1042 .contains(&WindowId::new(500, 1500)));
1043 }
1044
1045 #[test]
1046 fn test_sliding_operator_accumulates_correctly() {
1047 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1048 let aggregator = CountAggregator::new();
1049 let mut operator = SlidingWindowOperator::with_id(
1050 assigner.clone(),
1051 aggregator,
1052 Duration::from_millis(0),
1053 "test_op".to_string(),
1054 );
1055
1056 let mut timers = TimerService::new();
1057 let mut state = InMemoryStore::new();
1058 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1059
1060 for ts in [100, 600, 800] {
1065 let event = create_test_event(ts, 1);
1066 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1067 operator.process(&event, &mut ctx);
1068 }
1069
1070 let window_0_1000 = WindowId::new(0, 1000);
1072 let acc: CountAccumulator = operator.get_accumulator(&window_0_1000, &state);
1073 assert_eq!(acc.result(), 3);
1074
1075 let window_500_1500 = WindowId::new(500, 1500);
1077 let acc: CountAccumulator = operator.get_accumulator(&window_500_1500, &state);
1078 assert_eq!(acc.result(), 2);
1079 }
1080
1081 #[test]
1082 fn test_sliding_operator_window_trigger() {
1083 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1084 let aggregator = CountAggregator::new();
1085 let mut operator = SlidingWindowOperator::with_id(
1086 assigner,
1087 aggregator,
1088 Duration::from_millis(0),
1089 "test_op".to_string(),
1090 );
1091
1092 let mut timers = TimerService::new();
1093 let mut state = InMemoryStore::new();
1094 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1095
1096 for ts in [100, 200, 300] {
1098 let event = create_test_event(ts, 1);
1099 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1100 operator.process(&event, &mut ctx);
1101 }
1102
1103 let timer = Timer {
1105 key: WindowId::new(0, 1000).to_key(),
1106 timestamp: 1000,
1107 };
1108
1109 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1110 let outputs = operator.on_timer(timer, &mut ctx);
1111
1112 assert_eq!(outputs.len(), 1);
1113 match &outputs[0] {
1114 Output::Event(event) => {
1115 assert_eq!(event.timestamp, 1000);
1116 let result_col = event.data.column(2);
1117 let result_array = result_col.as_any().downcast_ref::<Int64Array>().unwrap();
1118 assert_eq!(result_array.value(0), 3);
1119 }
1120 _ => panic!("Expected Event output"),
1121 }
1122
1123 assert!(!operator
1125 .registered_windows
1126 .contains(&WindowId::new(0, 1000)));
1127 }
1128
1129 #[test]
1130 fn test_sliding_operator_multiple_window_triggers() {
1131 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1132 let aggregator = SumAggregator::new(0);
1133 let mut operator = SlidingWindowOperator::with_id(
1134 assigner,
1135 aggregator,
1136 Duration::from_millis(0),
1137 "test_op".to_string(),
1138 );
1139
1140 let mut timers = TimerService::new();
1141 let mut state = InMemoryStore::new();
1142 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1143
1144 let event = create_test_event(600, 10);
1147 {
1148 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1149 operator.process(&event, &mut ctx);
1150 }
1151
1152 let t1 = Timer {
1154 key: WindowId::new(0, 1000).to_key(),
1155 timestamp: 1000,
1156 };
1157 let outputs1 = {
1158 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1159 operator.on_timer(t1, &mut ctx)
1160 };
1161
1162 assert_eq!(outputs1.len(), 1);
1163 if let Output::Event(e) = &outputs1[0] {
1164 let result = e
1165 .data
1166 .column(2)
1167 .as_any()
1168 .downcast_ref::<Int64Array>()
1169 .unwrap()
1170 .value(0);
1171 assert_eq!(result, 10);
1172 }
1173
1174 let t2 = Timer {
1176 key: WindowId::new(500, 1500).to_key(),
1177 timestamp: 1500,
1178 };
1179 let outputs2 = {
1180 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1181 operator.on_timer(t2, &mut ctx)
1182 };
1183
1184 assert_eq!(outputs2.len(), 1);
1185 if let Output::Event(e) = &outputs2[0] {
1186 let result = e
1187 .data
1188 .column(2)
1189 .as_any()
1190 .downcast_ref::<Int64Array>()
1191 .unwrap()
1192 .value(0);
1193 assert_eq!(result, 10);
1194 }
1195
1196 assert!(operator.registered_windows.is_empty());
1198 }
1199
1200 #[test]
1201 fn test_sliding_operator_late_event() {
1202 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1203 let aggregator = CountAggregator::new();
1204 let mut operator = SlidingWindowOperator::with_id(
1205 assigner,
1206 aggregator,
1207 Duration::from_millis(0),
1208 "test_op".to_string(),
1209 );
1210
1211 let mut timers = TimerService::new();
1212 let mut state = InMemoryStore::new();
1213 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1214
1215 let event1 = create_test_event(2000, 1);
1217 {
1218 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1219 operator.process(&event1, &mut ctx);
1220 }
1221
1222 let late_event = create_test_event(500, 2);
1224 let outputs = {
1225 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1226 operator.process(&late_event, &mut ctx)
1227 };
1228
1229 let is_late = outputs.iter().any(|o| matches!(o, Output::LateEvent(_)));
1231 assert!(is_late);
1232 assert_eq!(operator.late_data_metrics().late_events_dropped(), 1);
1233 }
1234
1235 #[test]
1236 fn test_sliding_operator_late_event_side_output() {
1237 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1238 let aggregator = CountAggregator::new();
1239 let mut operator = SlidingWindowOperator::with_id(
1240 assigner,
1241 aggregator,
1242 Duration::from_millis(0),
1243 "test_op".to_string(),
1244 );
1245
1246 operator.set_late_data_config(LateDataConfig::with_side_output("late".to_string()));
1247
1248 let mut timers = TimerService::new();
1249 let mut state = InMemoryStore::new();
1250 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1251
1252 let event1 = create_test_event(2000, 1);
1254 {
1255 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1256 operator.process(&event1, &mut ctx);
1257 }
1258
1259 let late_event = create_test_event(500, 2);
1261 let outputs = {
1262 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1263 operator.process(&late_event, &mut ctx)
1264 };
1265
1266 let side_output = outputs.iter().find_map(|o| {
1268 if let Output::SideOutput { name, .. } = o {
1269 Some(name.clone())
1270 } else {
1271 None
1272 }
1273 });
1274 assert_eq!(side_output, Some("late".to_string()));
1275 assert_eq!(operator.late_data_metrics().late_events_side_output(), 1);
1276 }
1277
1278 #[test]
1279 fn test_sliding_operator_emit_on_update() {
1280 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1281 let aggregator = CountAggregator::new();
1282 let mut operator = SlidingWindowOperator::with_id(
1283 assigner,
1284 aggregator,
1285 Duration::from_millis(0),
1286 "test_op".to_string(),
1287 );
1288
1289 operator.set_emit_strategy(EmitStrategy::OnUpdate);
1290
1291 let mut timers = TimerService::new();
1292 let mut state = InMemoryStore::new();
1293 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1294
1295 let event = create_test_event(600, 1);
1297 let outputs = {
1298 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1299 operator.process(&event, &mut ctx)
1300 };
1301
1302 let event_count = outputs
1304 .iter()
1305 .filter(|o| matches!(o, Output::Event(_)))
1306 .count();
1307 assert_eq!(event_count, 2);
1308 }
1309
// Checkpoint/restore round trip: both the registered-window set and the set of
// windows with a periodic timer must be reproduced in a freshly built operator
// that uses the same assigner and aggregator configuration.
#[test]
fn test_sliding_operator_checkpoint_restore() {
    // Both operators are built identically so that only restored state differs.
    let build = |assigner: SlidingWindowAssigner, aggregator: CountAggregator| {
        SlidingWindowOperator::with_id(
            assigner,
            aggregator,
            Duration::from_millis(0),
            "test_op".to_string(),
        )
    };

    let assigner = SlidingWindowAssigner::from_millis(1000, 500);
    let aggregator = CountAggregator::new();
    let mut source = build(assigner.clone(), aggregator.clone());

    let first = WindowId::new(0, 1000);
    let second = WindowId::new(500, 1500);
    source.registered_windows.insert(first);
    source.registered_windows.insert(second);
    source.periodic_timer_windows.insert(first);

    let snapshot = source.checkpoint();

    let mut restored = build(assigner, aggregator);
    restored.restore(snapshot).unwrap();

    assert_eq!(restored.registered_windows.len(), 2);
    assert_eq!(restored.periodic_timer_windows.len(), 1);
    for id in [first, second] {
        assert!(restored.registered_windows.contains(&id));
    }
    assert!(restored.periodic_timer_windows.contains(&first));
}
1352
// Firing a timer at the end (1000) of window [0, 1000) that never received any
// events must produce no output at all.
#[test]
fn test_sliding_operator_empty_window_trigger() {
    let mut operator = SlidingWindowOperator::with_id(
        SlidingWindowAssigner::from_millis(1000, 500),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let never_filled = WindowId::new(0, 1000);
    let timer = Timer {
        key: never_filled.to_key(),
        timestamp: 1000,
    };

    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.on_timer(timer, &mut ctx)
    };
    assert!(outputs.is_empty());
}
1380
// Round-trips a window id through the periodic-timer key encoding, and checks
// that a plain window key (`WindowId::to_key`) is not classified as periodic.
#[test]
fn test_sliding_operator_periodic_timer_key() {
    // Local alias to avoid repeating the fully-qualified turbofish path.
    type Op = SlidingWindowOperator<CountAggregator>;

    let window_id = WindowId::new(1000, 2000);

    // Encoding a window as a periodic key must be recognised as periodic...
    let periodic_key = Op::periodic_timer_key(&window_id);
    assert!(Op::is_periodic_timer_key(&periodic_key));

    // ...and decoding it must recover exactly the window that was encoded.
    let extracted = Op::window_id_from_periodic_key(&periodic_key);
    assert_eq!(extracted, Some(window_id));

    // The regular (non-periodic) key for the same window must be rejected.
    // (The original text had the mis-decoded token `®ular_key` here, which
    // does not compile; it was meant to be `&regular_key`.)
    let regular_key = window_id.to_key();
    assert!(!Op::is_periodic_timer_key(&regular_key));
}
1396
// With zero allowed lateness, an event at 800 that arrives after an event at
// 1100 must not re-register window [0, 1000), which the test treats as already
// closed. Window [500, 1500) must still be registered.
#[test]
fn test_sliding_operator_skips_closed_windows() {
    let mut operator = SlidingWindowOperator::with_id(
        SlidingWindowAssigner::from_millis(1000, 500),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

    // The first event presumably advances the watermark past 1000. The second
    // event is then out of order relative to it.
    for ts in [1100, 800] {
        let event = create_test_event(ts, 1);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx);
    }

    let registered = &operator.registered_windows;
    assert!(!registered.contains(&WindowId::new(0, 1000)));
    assert!(registered.contains(&WindowId::new(500, 1500)));
}
1435
// Exercises the assigner through its window-assignment API. Size 1000 / slide
// 500 puts timestamp 600 into two windows, and the last timestamp belonging to
// a window ending at 1000 is 999.
#[test]
fn test_sliding_assigner_window_assigner_trait() {
    let assigner = SlidingWindowAssigner::from_millis(1000, 500);

    assert_eq!(assigner.assign_windows(600).len(), 2);
    assert_eq!(assigner.max_timestamp(1000), 999);
}
1447
// Same shape as the closed-window scenario, but the operator is configured with
// 500 ms of allowed lateness. The out-of-order event at 800 (after one at 1200)
// must therefore not be reported as late, neither as a late event nor as a side
// output, and the late-event metric must stay at zero.
#[test]
fn test_sliding_operator_allowed_lateness() {
    let mut operator = SlidingWindowOperator::with_id(
        SlidingWindowAssigner::from_millis(1000, 500),
        CountAggregator::new(),
        Duration::from_millis(500),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

    let leading = create_test_event(1200, 1);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&leading, &mut ctx);
    }

    let straggler = create_test_event(800, 1);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&straggler, &mut ctx)
    };

    // No output may be either form of late-data routing.
    let none_late = outputs
        .iter()
        .all(|o| !matches!(o, Output::LateEvent(_) | Output::SideOutput { .. }));
    assert!(none_late);
    assert_eq!(operator.late_data_metrics().late_events_total(), 0);
}
1485}