Skip to main content

laminar_core/operator/
sliding_window.rs

1//! # Sliding Window Operators
2//!
3//! Implementation of sliding (hopping) windows for stream processing.
4//!
5//! Sliding windows are fixed-size windows that overlap. Each event can belong
6//! to multiple windows. The window size defines the duration, and the slide
7//! defines how much each window advances.
8//!
9//! ## Example
10//!
11//! ```text
12//! Window size: 1 hour, Slide: 15 minutes
13//!
14//! Window 1: [00:00, 01:00)
15//! Window 2: [00:15, 01:15)
16//! Window 3: [00:30, 01:30)
17//! Window 4: [00:45, 01:45)
18//!
//! An event at 00:40 belongs to windows 1, 2, 3 and to the preceding window [23:45, 00:45)
20//! ```
21//!
22//! ## Performance
23//!
//! - Each event is assigned to at most `ceil(size / slide)` windows (exactly that many when `size` is a multiple of `slide`)
25//! - Uses `SmallVec<[WindowId; 4]>` to avoid heap allocation for common cases
26//! - State is stored per-window, so memory usage scales with active windows
27//!
28//! ## Usage
29//!
30//! ```rust,no_run
31//! use laminar_core::operator::sliding_window::{
32//!     SlidingWindowAssigner, SlidingWindowOperator,
33//! };
34//! use laminar_core::operator::window::CountAggregator;
35//! use std::time::Duration;
36//!
37//! // Create a 1-hour sliding window with 15-minute slide
38//! let assigner = SlidingWindowAssigner::new(
39//!     Duration::from_secs(3600),  // 1 hour window
40//!     Duration::from_secs(900),   // 15 minute slide
41//! );
42//! let operator = SlidingWindowOperator::new(
43//!     assigner,
44//!     CountAggregator::new(),
45//!     Duration::from_secs(60), // 1 minute grace period
46//! );
47//! ```
48
49use super::window::{
50    Accumulator, Aggregator, ChangelogRecord, EmitStrategy, LateDataConfig, LateDataMetrics,
51    ResultToI64, WindowAssigner, WindowId, WindowIdVec,
52};
53use super::{
54    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
55};
56use crate::state::{StateStore, StateStoreExt};
57use arrow_array::{Int64Array, RecordBatch};
58use arrow_schema::{DataType, Field, Schema, SchemaRef};
59use rkyv::{
60    api::high::{HighDeserializer, HighSerializer, HighValidator},
61    bytecheck::CheckBytes,
62    rancor::Error as RkyvError,
63    ser::allocator::ArenaHandle,
64    util::AlignedVec,
65    Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
66};
67use std::marker::PhantomData;
68use std::sync::atomic::{AtomicU64, Ordering};
69use std::sync::Arc;
70use std::time::Duration;
71
/// Sliding window assigner.
///
/// Assigns each event to every overlapping window that contains its timestamp.
/// Window starts are aligned to the epoch (timestamp 0): every window start is
/// a multiple of the slide interval.
///
/// # Parameters
///
/// - `size_ms`: The duration of each window in milliseconds
/// - `slide_ms`: The advance interval between windows in milliseconds
///
/// # Window Assignment
///
/// An event at timestamp `t` belongs to all windows `[start, start + size)`
/// where `start <= t < start + size` and `start` is a multiple of `slide`.
///
/// An event belongs to at most `ceil(size / slide)` windows. The count is
/// exactly `size / slide` when `size` is a multiple of `slide`; otherwise it
/// is either `floor(size / slide)` or `ceil(size / slide)` depending on where
/// the timestamp falls relative to the slide grid.
///
/// # Example
///
/// ```rust,no_run
/// use laminar_core::operator::sliding_window::SlidingWindowAssigner;
/// use std::time::Duration;
///
/// // 1-minute window with 20-second slide
/// let assigner = SlidingWindowAssigner::new(
///     Duration::from_secs(60),
///     Duration::from_secs(20),
/// );
///
/// // Event at t=50s belongs to windows (in seconds): [0,60), [20,80), [40,100)
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SlidingWindowAssigner {
    /// Window size in milliseconds (always positive)
    size_ms: i64,
    /// Slide interval in milliseconds (always positive and `<= size_ms`)
    slide_ms: i64,
    /// Upper bound on the number of windows per event, `ceil(size_ms / slide_ms)`
    /// (cached so it is not recomputed on every query)
    windows_per_event: usize,
}
112
113impl SlidingWindowAssigner {
114    /// Creates a new sliding window assigner.
115    ///
116    /// # Arguments
117    ///
118    /// * `size` - The duration of each window
119    /// * `slide` - The advance interval between windows
120    ///
121    /// # Panics
122    ///
123    /// Panics if:
124    /// - Size is zero or negative
125    /// - Slide is zero or negative
126    /// - Slide is greater than size (use tumbling windows instead)
127    #[must_use]
128    pub fn new(size: Duration, slide: Duration) -> Self {
129        // Ensure size and slide fit in i64
130        let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
131        let slide_ms = i64::try_from(slide.as_millis()).expect("Slide interval must fit in i64");
132
133        assert!(size_ms > 0, "Window size must be positive");
134        assert!(slide_ms > 0, "Slide interval must be positive");
135        assert!(
136            slide_ms <= size_ms,
137            "Slide must not exceed size (use tumbling windows for non-overlapping)"
138        );
139
140        // Calculate the number of windows each event belongs to
141        // This is ceil(size / slide)
142        let windows_per_event = usize::try_from((size_ms + slide_ms - 1) / slide_ms)
143            .expect("Windows per event should fit in usize");
144
145        Self {
146            size_ms,
147            slide_ms,
148            windows_per_event,
149        }
150    }
151
152    /// Creates a new sliding window assigner with sizes in milliseconds.
153    ///
154    /// # Panics
155    ///
156    /// Panics if size or slide is zero/negative, or if slide > size.
157    #[must_use]
158    #[allow(clippy::cast_sign_loss)]
159    pub fn from_millis(size_ms: i64, slide_ms: i64) -> Self {
160        assert!(size_ms > 0, "Window size must be positive");
161        assert!(slide_ms > 0, "Slide interval must be positive");
162        assert!(
163            slide_ms <= size_ms,
164            "Slide must not exceed size (use tumbling windows for non-overlapping)"
165        );
166
167        // Truncation is acceptable: number of windows per event will never exceed reasonable limits
168        let windows_per_event =
169            usize::try_from((size_ms + slide_ms - 1) / slide_ms).unwrap_or(usize::MAX);
170
171        Self {
172            size_ms,
173            slide_ms,
174            windows_per_event,
175        }
176    }
177
178    /// Returns the window size in milliseconds.
179    #[must_use]
180    pub fn size_ms(&self) -> i64 {
181        self.size_ms
182    }
183
184    /// Returns the slide interval in milliseconds.
185    #[must_use]
186    pub fn slide_ms(&self) -> i64 {
187        self.slide_ms
188    }
189
190    /// Returns the number of windows each event belongs to.
191    #[must_use]
192    pub fn windows_per_event(&self) -> usize {
193        self.windows_per_event
194    }
195
196    /// Computes the last window start that could contain this timestamp.
197    ///
198    /// This is the window with the largest start time where start <= timestamp.
199    #[inline]
200    fn last_window_start(&self, timestamp: i64) -> i64 {
201        if timestamp >= 0 {
202            (timestamp / self.slide_ms) * self.slide_ms
203        } else {
204            // For negative timestamps, use floor division
205            ((timestamp - self.slide_ms + 1) / self.slide_ms) * self.slide_ms
206        }
207    }
208}
209
210impl WindowAssigner for SlidingWindowAssigner {
211    /// Assigns a timestamp to all overlapping windows.
212    ///
213    /// Returns windows in order from earliest to latest start time.
214    #[inline]
215    fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
216        let mut windows = WindowIdVec::new();
217
218        // Find the last window that could contain this timestamp
219        let last_start = self.last_window_start(timestamp);
220
221        // Walk backwards through all windows that contain this timestamp
222        let mut window_start = last_start;
223        while window_start + self.size_ms > timestamp {
224            let window_end = window_start + self.size_ms;
225            windows.push(WindowId::new(window_start, window_end));
226            window_start -= self.slide_ms;
227        }
228
229        // Reverse to get windows in chronological order (earliest first)
230        windows.reverse();
231        windows
232    }
233
234    /// Returns the maximum timestamp that could still be assigned to a window
235    /// ending at `window_end`.
236    fn max_timestamp(&self, window_end: i64) -> i64 {
237        window_end - 1
238    }
239}
240
/// Prefix that marks window-accumulator entries in the state store (4 bytes).
const WINDOW_STATE_PREFIX: &[u8; 4] = b"slw:";

/// Length of a window state key: the prefix followed by the 16-byte `WindowId` key.
const WINDOW_STATE_KEY_SIZE: usize = WINDOW_STATE_PREFIX.len() + 16;

/// Process-wide counter that numbers operators created without an explicit ID.
static SLIDING_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
249
250/// Creates the standard window output schema.
251fn create_window_output_schema() -> SchemaRef {
252    Arc::new(Schema::new(vec![
253        Field::new("window_start", DataType::Int64, false),
254        Field::new("window_end", DataType::Int64, false),
255        Field::new("result", DataType::Int64, false),
256    ]))
257}
258
/// Sliding window operator.
///
/// Processes events through overlapping, fixed-size time windows.
/// Each event is assigned to multiple windows based on the slide interval.
/// Results are emitted based on the configured [`EmitStrategy`].
///
/// # Emit Strategies
///
/// - `OnWatermark` (default): Emit when watermark passes window end
/// - `Periodic`: Emit intermediate results at intervals, final on watermark
/// - `OnUpdate`: Emit after every state update (can produce many outputs)
/// - `Changelog`: Emit a [`ChangelogRecord`] after every state update and for
///   the final window result
/// - `OnWindowClose` / `Final`: No intermediate output; only the final result
///   is emitted when the window's timer fires
///
/// # Late Data Handling
///
/// An event is considered late once the watermark has reached
/// `window_end + allowed_lateness` for every window the event belongs to.
/// Late events are handled as follows:
/// - If the emit strategy drops late data, the event is discarded silently
/// - Otherwise, if [`LateDataConfig`] names a side output, the event is routed
///   there
/// - Otherwise the event is emitted as a late-event output and counted as
///   dropped in [`LateDataMetrics`]
///
/// # State Management
///
/// Window state is stored in the operator context's state store using
/// prefixed keys: `slw:<window_id>` - Accumulator state
///
/// # Performance Considerations
///
/// Each event updates up to `ceil(size / slide)` windows. For example:
/// - 1-hour window, 15-minute slide = 4 windows per event
/// - 1-minute window, 10-second slide = 6 windows per event
///
/// State usage scales linearly with active windows.
pub struct SlidingWindowOperator<A: Aggregator> {
    /// Window assigner
    assigner: SlidingWindowAssigner,
    /// Aggregator function
    aggregator: A,
    /// Allowed lateness for late data, in milliseconds
    allowed_lateness_ms: i64,
    /// Windows that already have a final-emission timer (avoids duplicates)
    registered_windows: std::collections::HashSet<WindowId>,
    /// Windows that already have a periodic (intermediate-emission) timer
    periodic_timer_windows: std::collections::HashSet<WindowId>,
    /// Emit strategy for controlling when results are output
    emit_strategy: EmitStrategy,
    /// Late data handling configuration
    late_data_config: LateDataConfig,
    /// Metrics for late data tracking
    late_data_metrics: LateDataMetrics,
    /// Operator ID for checkpointing; `restore` rejects state with a different ID
    operator_id: String,
    /// Cached output schema (avoids allocation on every emit)
    output_schema: SchemaRef,
    /// Phantom data for accumulator type
    _phantom: PhantomData<A::Acc>,
}
314
315impl<A: Aggregator> SlidingWindowOperator<A>
316where
317    A::Acc: Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
318    <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
319        + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
320{
321    /// Creates a new sliding window operator.
322    ///
323    /// # Arguments
324    ///
325    /// * `assigner` - Window assigner for determining window boundaries
326    /// * `aggregator` - Aggregation function to apply within windows
327    /// * `allowed_lateness` - Grace period for late data after window close
328    /// # Panics
329    ///
330    /// Panics if allowed lateness does not fit in i64.
331    pub fn new(assigner: SlidingWindowAssigner, aggregator: A, allowed_lateness: Duration) -> Self {
332        let operator_num = SLIDING_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
333        Self {
334            assigner,
335            aggregator,
336            allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
337                .expect("Allowed lateness must fit in i64"),
338            registered_windows: std::collections::HashSet::new(),
339            periodic_timer_windows: std::collections::HashSet::new(),
340            emit_strategy: EmitStrategy::default(),
341            late_data_config: LateDataConfig::default(),
342            late_data_metrics: LateDataMetrics::new(),
343            operator_id: format!("sliding_window_{operator_num}"),
344            output_schema: create_window_output_schema(),
345            _phantom: PhantomData,
346        }
347    }
348
349    /// Creates a new sliding window operator with a custom operator ID.
350    /// # Panics
351    ///
352    /// Panics if allowed lateness does not fit in i64.
353    pub fn with_id(
354        assigner: SlidingWindowAssigner,
355        aggregator: A,
356        allowed_lateness: Duration,
357        operator_id: String,
358    ) -> Self {
359        Self {
360            assigner,
361            aggregator,
362            allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
363                .expect("Allowed lateness must fit in i64"),
364            registered_windows: std::collections::HashSet::new(),
365            periodic_timer_windows: std::collections::HashSet::new(),
366            emit_strategy: EmitStrategy::default(),
367            late_data_config: LateDataConfig::default(),
368            late_data_metrics: LateDataMetrics::new(),
369            operator_id,
370            output_schema: create_window_output_schema(),
371            _phantom: PhantomData,
372        }
373    }
374
375    /// Sets the emit strategy for this window operator.
376    pub fn set_emit_strategy(&mut self, strategy: EmitStrategy) {
377        self.emit_strategy = strategy;
378    }
379
380    /// Returns the current emit strategy.
381    #[must_use]
382    pub fn emit_strategy(&self) -> &EmitStrategy {
383        &self.emit_strategy
384    }
385
386    /// Sets the late data handling configuration.
387    pub fn set_late_data_config(&mut self, config: LateDataConfig) {
388        self.late_data_config = config;
389    }
390
391    /// Returns the current late data configuration.
392    #[must_use]
393    pub fn late_data_config(&self) -> &LateDataConfig {
394        &self.late_data_config
395    }
396
397    /// Returns the late data metrics.
398    #[must_use]
399    pub fn late_data_metrics(&self) -> &LateDataMetrics {
400        &self.late_data_metrics
401    }
402
403    /// Resets the late data metrics counters.
404    pub fn reset_late_data_metrics(&mut self) {
405        self.late_data_metrics.reset();
406    }
407
408    /// Returns the window assigner.
409    #[must_use]
410    pub fn assigner(&self) -> &SlidingWindowAssigner {
411        &self.assigner
412    }
413
414    /// Returns the allowed lateness in milliseconds.
415    #[must_use]
416    pub fn allowed_lateness_ms(&self) -> i64 {
417        self.allowed_lateness_ms
418    }
419
420    /// Generates the state key for a window's accumulator.
421    #[inline]
422    fn state_key(window_id: &WindowId) -> [u8; WINDOW_STATE_KEY_SIZE] {
423        let mut key = [0u8; WINDOW_STATE_KEY_SIZE];
424        key[..4].copy_from_slice(WINDOW_STATE_PREFIX);
425        let window_key = window_id.to_key_inline();
426        key[4..20].copy_from_slice(&window_key);
427        key
428    }
429
430    /// Gets the accumulator for a window, creating a new one if needed.
431    fn get_accumulator(&self, window_id: &WindowId, state: &dyn StateStore) -> A::Acc {
432        let key = Self::state_key(window_id);
433        state
434            .get_typed::<A::Acc>(&key)
435            .ok()
436            .flatten()
437            .unwrap_or_else(|| self.aggregator.create_accumulator())
438    }
439
440    /// Stores the accumulator for a window.
441    fn put_accumulator(
442        window_id: &WindowId,
443        acc: &A::Acc,
444        state: &mut dyn StateStore,
445    ) -> Result<(), OperatorError> {
446        let key = Self::state_key(window_id);
447        state
448            .put_typed(&key, acc)
449            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
450    }
451
452    /// Deletes the accumulator for a window.
453    fn delete_accumulator(
454        window_id: &WindowId,
455        state: &mut dyn StateStore,
456    ) -> Result<(), OperatorError> {
457        let key = Self::state_key(window_id);
458        state
459            .delete(&key)
460            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
461    }
462
463    /// Checks if an event is late for all possible windows.
464    ///
465    /// An event is late if all windows it would belong to have already closed
466    /// (watermark has passed their end + allowed lateness).
467    fn is_late(&self, event_time: i64, watermark: i64) -> bool {
468        // Get all windows this event would belong to
469        let windows = self.assigner.assign_windows(event_time);
470
471        // Event is late only if ALL its windows have closed
472        windows.iter().all(|window_id| {
473            let cleanup_time = window_id.end + self.allowed_lateness_ms;
474            watermark >= cleanup_time
475        })
476    }
477
478    /// Registers a timer for window triggering if not already registered.
479    fn maybe_register_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
480        if !self.registered_windows.contains(&window_id) {
481            let trigger_time = window_id.end + self.allowed_lateness_ms;
482            ctx.timers.register_timer(
483                trigger_time,
484                Some(window_id.to_key()),
485                Some(ctx.operator_index),
486            );
487            self.registered_windows.insert(window_id);
488        }
489    }
490
491    /// Registers a periodic timer for intermediate emissions.
492    fn maybe_register_periodic_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
493        if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
494            if !self.periodic_timer_windows.contains(&window_id) {
495                let interval_ms =
496                    i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
497                let trigger_time = ctx.processing_time + interval_ms;
498                let key = Self::periodic_timer_key(&window_id);
499                ctx.timers
500                    .register_timer(trigger_time, Some(key), Some(ctx.operator_index));
501                self.periodic_timer_windows.insert(window_id);
502            }
503        }
504    }
505
506    /// Creates a periodic timer key from a window ID.
507    #[inline]
508    fn periodic_timer_key(window_id: &WindowId) -> super::TimerKey {
509        let mut key = window_id.to_key();
510        if !key.is_empty() {
511            key[0] |= 0x80;
512        }
513        key
514    }
515
516    /// Checks if a timer key is for a periodic timer.
517    #[inline]
518    fn is_periodic_timer_key(key: &[u8]) -> bool {
519        !key.is_empty() && (key[0] & 0x80) != 0
520    }
521
522    /// Extracts the window ID from a periodic timer key.
523    #[inline]
524    fn window_id_from_periodic_key(key: &[u8]) -> Option<WindowId> {
525        if key.len() != 16 {
526            return None;
527        }
528        let mut clean_key = [0u8; 16];
529        clean_key.copy_from_slice(key);
530        clean_key[0] &= 0x7F;
531        WindowId::from_key(&clean_key)
532    }
533
534    /// Creates an intermediate result for a window without cleaning up state.
535    fn create_intermediate_result(
536        &self,
537        window_id: &WindowId,
538        state: &dyn StateStore,
539    ) -> Option<Event> {
540        let acc = self.get_accumulator(window_id, state);
541
542        if acc.is_empty() {
543            return None;
544        }
545
546        let result = acc.result();
547        let result_i64 = result.to_i64();
548
549        let batch = RecordBatch::try_new(
550            Arc::clone(&self.output_schema),
551            vec![
552                Arc::new(Int64Array::from(vec![window_id.start])),
553                Arc::new(Int64Array::from(vec![window_id.end])),
554                Arc::new(Int64Array::from(vec![result_i64])),
555            ],
556        )
557        .ok()?;
558
559        Some(Event::new(window_id.end, batch))
560    }
561
562    /// Handles periodic timer expiration for intermediate emissions.
563    fn handle_periodic_timer(
564        &mut self,
565        window_id: WindowId,
566        ctx: &mut OperatorContext,
567    ) -> OutputVec {
568        let mut output = OutputVec::new();
569
570        if !self.registered_windows.contains(&window_id) {
571            self.periodic_timer_windows.remove(&window_id);
572            return output;
573        }
574
575        if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
576            output.push(Output::Event(event));
577        }
578
579        if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
580            let interval_ms =
581                i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
582            let next_trigger = ctx.processing_time + interval_ms;
583            let window_close_time = window_id.end + self.allowed_lateness_ms;
584            if next_trigger < window_close_time {
585                let key = Self::periodic_timer_key(&window_id);
586                ctx.timers
587                    .register_timer(next_trigger, Some(key), Some(ctx.operator_index));
588            }
589        }
590
591        output
592    }
593}
594
595impl<A: Aggregator> Operator for SlidingWindowOperator<A>
596where
597    A::Acc: 'static
598        + Archive
599        + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
600    <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
601        + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
602{
603    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
604        let event_time = event.timestamp;
605
606        // Update watermark with the new event
607        let emitted_watermark = ctx.watermark_generator.on_event(event_time);
608
609        // Check if this event is too late (all windows closed)
610        let current_wm = ctx.watermark_generator.current_watermark();
611        if current_wm > i64::MIN && self.is_late(event_time, current_wm) {
612            let mut output = OutputVec::new();
613
614            // F011B: EMIT FINAL drops late data entirely
615            if self.emit_strategy.drops_late_data() {
616                self.late_data_metrics.record_dropped();
617                return output; // Silently drop - no LateEvent output
618            }
619
620            if let Some(side_output_name) = self.late_data_config.side_output() {
621                self.late_data_metrics.record_side_output();
622                output.push(Output::SideOutput {
623                    name: side_output_name.to_string(),
624                    event: event.clone(),
625                });
626            } else {
627                self.late_data_metrics.record_dropped();
628                output.push(Output::LateEvent(event.clone()));
629            }
630            return output;
631        }
632
633        // Assign event to all overlapping windows
634        let windows = self.assigner.assign_windows(event_time);
635
636        // Track windows that were updated (for OnUpdate and Changelog strategies)
637        let mut updated_windows = Vec::new();
638
639        // Update accumulator for each window
640        for window_id in &windows {
641            // Skip windows that have already closed
642            let cleanup_time = window_id.end + self.allowed_lateness_ms;
643            if current_wm > i64::MIN && current_wm >= cleanup_time {
644                continue;
645            }
646
647            // Extract value for each window (re-extract since Input may not be Clone)
648            if let Some(value) = self.aggregator.extract(event) {
649                let mut acc = self.get_accumulator(window_id, ctx.state);
650                acc.add(value);
651                if Self::put_accumulator(window_id, &acc, ctx.state).is_ok() {
652                    updated_windows.push(*window_id);
653                }
654            }
655
656            // Register timers for this window
657            self.maybe_register_timer(*window_id, ctx);
658
659            // F011B: OnWindowClose and Final suppress intermediate emissions
660            if !self.emit_strategy.suppresses_intermediate() {
661                self.maybe_register_periodic_timer(*window_id, ctx);
662            }
663        }
664
665        // Build output
666        let mut output = OutputVec::new();
667
668        // Emit watermark update if generated
669        if let Some(wm) = emitted_watermark {
670            output.push(Output::Watermark(wm.timestamp()));
671        }
672
673        // F011B: Handle different emit strategies for intermediate emissions
674        if !updated_windows.is_empty() {
675            match &self.emit_strategy {
676                // OnUpdate: emit intermediate result as regular event
677                EmitStrategy::OnUpdate => {
678                    for window_id in &updated_windows {
679                        if let Some(event) = self.create_intermediate_result(window_id, ctx.state) {
680                            output.push(Output::Event(event));
681                        }
682                    }
683                }
684                // Changelog: emit changelog record on every update
685                EmitStrategy::Changelog => {
686                    for window_id in &updated_windows {
687                        if let Some(event) = self.create_intermediate_result(window_id, ctx.state) {
688                            let record = ChangelogRecord::insert(event, ctx.processing_time);
689                            output.push(Output::Changelog(record));
690                        }
691                    }
692                }
693                // Other strategies: no intermediate emission
694                EmitStrategy::OnWatermark
695                | EmitStrategy::Periodic(_)
696                | EmitStrategy::OnWindowClose
697                | EmitStrategy::Final => {}
698            }
699        }
700
701        output
702    }
703
704    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
705        // Check if this is a periodic timer
706        if Self::is_periodic_timer_key(&timer.key) {
707            // F011B: OnWindowClose and Final suppress periodic emissions
708            if self.emit_strategy.suppresses_intermediate() {
709                // Don't emit, just clean up the periodic timer tracking
710                if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
711                    self.periodic_timer_windows.remove(&window_id);
712                }
713                return OutputVec::new();
714            }
715
716            if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
717                return self.handle_periodic_timer(window_id, ctx);
718            }
719            return OutputVec::new();
720        }
721
722        // Parse window ID from timer key (final emission timer)
723        let Some(window_id) = WindowId::from_key(&timer.key) else {
724            return OutputVec::new();
725        };
726
727        // Get the accumulator
728        let acc = self.get_accumulator(&window_id, ctx.state);
729
730        // Skip empty windows
731        if acc.is_empty() {
732            let _ = Self::delete_accumulator(&window_id, ctx.state);
733            self.registered_windows.remove(&window_id);
734            self.periodic_timer_windows.remove(&window_id);
735            return OutputVec::new();
736        }
737
738        // Get the result
739        let result = acc.result();
740
741        // Clean up window state
742        let _ = Self::delete_accumulator(&window_id, ctx.state);
743        self.registered_windows.remove(&window_id);
744        self.periodic_timer_windows.remove(&window_id);
745
746        // Convert result to i64 for the batch
747        let result_i64 = result.to_i64();
748
749        // Create output batch
750        let batch = RecordBatch::try_new(
751            Arc::clone(&self.output_schema),
752            vec![
753                Arc::new(Int64Array::from(vec![window_id.start])),
754                Arc::new(Int64Array::from(vec![window_id.end])),
755                Arc::new(Int64Array::from(vec![result_i64])),
756            ],
757        );
758
759        let mut output = OutputVec::new();
760        match batch {
761            Ok(data) => {
762                let event = Event::new(window_id.end, data);
763
764                // F011B: Emit based on strategy
765                match &self.emit_strategy {
766                    // Changelog: wrap in changelog record for CDC
767                    EmitStrategy::Changelog => {
768                        let record = ChangelogRecord::insert(event, ctx.processing_time);
769                        output.push(Output::Changelog(record));
770                    }
771                    // All other strategies: emit as regular event
772                    EmitStrategy::OnWatermark
773                    | EmitStrategy::Periodic(_)
774                    | EmitStrategy::OnUpdate
775                    | EmitStrategy::OnWindowClose
776                    | EmitStrategy::Final => {
777                        output.push(Output::Event(event));
778                    }
779                }
780            }
781            Err(e) => {
782                tracing::error!("Failed to create output batch: {e}");
783            }
784        }
785        output
786    }
787
788    fn checkpoint(&self) -> OperatorState {
789        let windows: Vec<_> = self.registered_windows.iter().copied().collect();
790        let periodic_windows: Vec<_> = self.periodic_timer_windows.iter().copied().collect();
791
792        let checkpoint_data = (windows, periodic_windows);
793        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
794            .map(|v| v.to_vec())
795            .unwrap_or_default();
796
797        OperatorState {
798            operator_id: self.operator_id.clone(),
799            data,
800        }
801    }
802
803    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
804        if state.operator_id != self.operator_id {
805            return Err(OperatorError::StateAccessFailed(format!(
806                "Operator ID mismatch: expected {}, got {}",
807                self.operator_id, state.operator_id
808            )));
809        }
810
811        // Try to deserialize as the new format (tuple of two vectors)
812        if let Ok(archived) =
813            rkyv::access::<rkyv::Archived<(Vec<WindowId>, Vec<WindowId>)>, RkyvError>(&state.data)
814        {
815            if let Ok((windows, periodic_windows)) =
816                rkyv::deserialize::<(Vec<WindowId>, Vec<WindowId>), RkyvError>(archived)
817            {
818                self.registered_windows = windows.into_iter().collect();
819                self.periodic_timer_windows = periodic_windows.into_iter().collect();
820                return Ok(());
821            }
822        }
823
824        // Fall back to old format for backwards compatibility
825        let archived = rkyv::access::<rkyv::Archived<Vec<WindowId>>, RkyvError>(&state.data)
826            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
827        let windows: Vec<WindowId> = rkyv::deserialize::<Vec<WindowId>, RkyvError>(archived)
828            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
829
830        self.registered_windows = windows.into_iter().collect();
831        self.periodic_timer_windows = std::collections::HashSet::new();
832        Ok(())
833    }
834}
835
836#[cfg(test)]
837mod tests {
838    use super::*;
839    use crate::operator::window::{CountAccumulator, CountAggregator, SumAggregator};
840    use crate::state::InMemoryStore;
841    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
842    use arrow_array::{Int64Array, RecordBatch};
843    use arrow_schema::{DataType, Field, Schema};
844
845    fn create_test_event(timestamp: i64, value: i64) -> Event {
846        let schema = Arc::new(Schema::new(vec![Field::new(
847            "value",
848            DataType::Int64,
849            false,
850        )]));
851        let batch =
852            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
853        Event::new(timestamp, batch)
854    }
855
856    fn create_test_context<'a>(
857        timers: &'a mut TimerService,
858        state: &'a mut dyn StateStore,
859        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
860    ) -> OperatorContext<'a> {
861        OperatorContext {
862            event_time: 0,
863            processing_time: 0,
864            timers,
865            state,
866            watermark_generator: watermark_gen,
867            operator_index: 0,
868        }
869    }
870
871    #[test]
872    fn test_sliding_assigner_creation() {
873        let assigner = SlidingWindowAssigner::new(Duration::from_secs(60), Duration::from_secs(20));
874
875        assert_eq!(assigner.size_ms(), 60_000);
876        assert_eq!(assigner.slide_ms(), 20_000);
877        assert_eq!(assigner.windows_per_event(), 3); // ceil(60/20) = 3
878    }
879
880    #[test]
881    fn test_sliding_assigner_from_millis() {
882        let assigner = SlidingWindowAssigner::from_millis(1000, 200);
883
884        assert_eq!(assigner.size_ms(), 1000);
885        assert_eq!(assigner.slide_ms(), 200);
886        assert_eq!(assigner.windows_per_event(), 5); // ceil(1000/200) = 5
887    }
888
889    #[test]
890    #[should_panic(expected = "Window size must be positive")]
891    fn test_sliding_assigner_zero_size() {
892        let _ = SlidingWindowAssigner::from_millis(0, 100);
893    }
894
895    #[test]
896    #[should_panic(expected = "Slide interval must be positive")]
897    fn test_sliding_assigner_zero_slide() {
898        let _ = SlidingWindowAssigner::from_millis(1000, 0);
899    }
900
901    #[test]
902    #[should_panic(expected = "Slide must not exceed size")]
903    fn test_sliding_assigner_slide_exceeds_size() {
904        let _ = SlidingWindowAssigner::from_millis(100, 200);
905    }
906
907    #[test]
908    fn test_sliding_assigner_basic_assignment() {
909        // 1-minute window with 20-second slide
910        let assigner = SlidingWindowAssigner::from_millis(60_000, 20_000);
911
912        // Event at t=50000 should belong to 3 windows
913        let windows = assigner.assign_windows(50_000);
914
915        assert_eq!(windows.len(), 3);
916
917        // Windows should be (in chronological order):
918        // [20000, 80000), [40000, 100000), [60000, 120000) - but wait, 50000 is NOT in [60000, 120000)
919        // Let me recalculate:
920        // last_window_start(50000) = (50000 / 20000) * 20000 = 40000
921        // Window [40000, 100000) contains 50000? Yes (40000 <= 50000 < 100000)
922        // Window [20000, 80000) contains 50000? Yes (20000 <= 50000 < 80000)
923        // Window [0, 60000) contains 50000? Yes (0 <= 50000 < 60000)
924        // Window [-20000, 40000) contains 50000? No (50000 >= 40000)
925
926        assert!(windows.contains(&WindowId::new(0, 60_000)));
927        assert!(windows.contains(&WindowId::new(20_000, 80_000)));
928        assert!(windows.contains(&WindowId::new(40_000, 100_000)));
929    }
930
931    #[test]
932    fn test_sliding_assigner_boundary_event() {
933        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
934
935        // Event exactly at window boundary (t=1000)
936        let windows = assigner.assign_windows(1000);
937
938        // Should belong to windows starting at 500 and 1000
939        // Window [1000, 2000) contains 1000? Yes
940        // Window [500, 1500) contains 1000? Yes
941        // Window [0, 1000) contains 1000? No (end is exclusive)
942        assert_eq!(windows.len(), 2);
943        assert!(windows.contains(&WindowId::new(500, 1500)));
944        assert!(windows.contains(&WindowId::new(1000, 2000)));
945    }
946
947    #[test]
948    fn test_sliding_assigner_negative_timestamp() {
949        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
950
951        // Event at t=-500
952        let windows = assigner.assign_windows(-500);
953
954        // Should belong to windows containing -500
955        // last_window_start(-500) = floor(-500 / 500) * 500 = -500
956        // Window [-500, 500) contains -500? Yes
957        // Window [-1000, 0) contains -500? Yes
958        // Window [-1500, -500) contains -500? No (end is exclusive)
959        assert_eq!(windows.len(), 2);
960        assert!(windows.contains(&WindowId::new(-1000, 0)));
961        assert!(windows.contains(&WindowId::new(-500, 500)));
962    }
963
964    #[test]
965    fn test_sliding_assigner_equal_size_and_slide() {
966        // When size == slide, should behave like tumbling windows
967        let assigner = SlidingWindowAssigner::from_millis(1000, 1000);
968
969        assert_eq!(assigner.windows_per_event(), 1);
970
971        let windows = assigner.assign_windows(500);
972        assert_eq!(windows.len(), 1);
973        assert_eq!(windows[0], WindowId::new(0, 1000));
974    }
975
976    #[test]
977    fn test_sliding_assigner_small_slide() {
978        // 1 second window, 100ms slide = 10 windows per event
979        let assigner = SlidingWindowAssigner::from_millis(1000, 100);
980
981        assert_eq!(assigner.windows_per_event(), 10);
982
983        let windows = assigner.assign_windows(500);
984        assert_eq!(windows.len(), 10);
985    }
986
987    #[test]
988    fn test_sliding_operator_creation() {
989        let assigner = SlidingWindowAssigner::from_millis(1000, 200);
990        let aggregator = CountAggregator::new();
991        let operator = SlidingWindowOperator::new(assigner, aggregator, Duration::from_millis(100));
992
993        assert_eq!(operator.allowed_lateness_ms(), 100);
994        assert_eq!(*operator.emit_strategy(), EmitStrategy::OnWatermark);
995        assert!(operator.late_data_config().should_drop());
996    }
997
998    #[test]
999    fn test_sliding_operator_with_id() {
1000        let assigner = SlidingWindowAssigner::from_millis(1000, 200);
1001        let aggregator = CountAggregator::new();
1002        let operator = SlidingWindowOperator::with_id(
1003            assigner,
1004            aggregator,
1005            Duration::from_millis(0),
1006            "test_sliding".to_string(),
1007        );
1008
1009        assert_eq!(operator.operator_id, "test_sliding");
1010    }
1011
1012    #[test]
1013    fn test_sliding_operator_process_single_event() {
1014        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1015        let aggregator = CountAggregator::new();
1016        let mut operator = SlidingWindowOperator::with_id(
1017            assigner,
1018            aggregator,
1019            Duration::from_millis(0),
1020            "test_op".to_string(),
1021        );
1022
1023        let mut timers = TimerService::new();
1024        let mut state = InMemoryStore::new();
1025        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1026
1027        // Process event at t=600
1028        // Should belong to windows [0, 1000) and [500, 1500)
1029        let event = create_test_event(600, 1);
1030        {
1031            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1032            operator.process(&event, &mut ctx);
1033        }
1034
1035        // Should have 2 registered windows
1036        assert_eq!(operator.registered_windows.len(), 2);
1037        assert!(operator
1038            .registered_windows
1039            .contains(&WindowId::new(0, 1000)));
1040        assert!(operator
1041            .registered_windows
1042            .contains(&WindowId::new(500, 1500)));
1043    }
1044
1045    #[test]
1046    fn test_sliding_operator_accumulates_correctly() {
1047        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1048        let aggregator = CountAggregator::new();
1049        let mut operator = SlidingWindowOperator::with_id(
1050            assigner.clone(),
1051            aggregator,
1052            Duration::from_millis(0),
1053            "test_op".to_string(),
1054        );
1055
1056        let mut timers = TimerService::new();
1057        let mut state = InMemoryStore::new();
1058        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1059
1060        // Process events at t=100, t=600, t=800
1061        // t=100: belongs to [0, 1000)
1062        // t=600: belongs to [0, 1000), [500, 1500)
1063        // t=800: belongs to [0, 1000), [500, 1500)
1064        for ts in [100, 600, 800] {
1065            let event = create_test_event(ts, 1);
1066            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1067            operator.process(&event, &mut ctx);
1068        }
1069
1070        // Window [0, 1000) should have count = 3
1071        let window_0_1000 = WindowId::new(0, 1000);
1072        let acc: CountAccumulator = operator.get_accumulator(&window_0_1000, &state);
1073        assert_eq!(acc.result(), 3);
1074
1075        // Window [500, 1500) should have count = 2
1076        let window_500_1500 = WindowId::new(500, 1500);
1077        let acc: CountAccumulator = operator.get_accumulator(&window_500_1500, &state);
1078        assert_eq!(acc.result(), 2);
1079    }
1080
1081    #[test]
1082    fn test_sliding_operator_window_trigger() {
1083        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1084        let aggregator = CountAggregator::new();
1085        let mut operator = SlidingWindowOperator::with_id(
1086            assigner,
1087            aggregator,
1088            Duration::from_millis(0),
1089            "test_op".to_string(),
1090        );
1091
1092        let mut timers = TimerService::new();
1093        let mut state = InMemoryStore::new();
1094        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1095
1096        // Process 3 events in window [0, 1000)
1097        for ts in [100, 200, 300] {
1098            let event = create_test_event(ts, 1);
1099            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1100            operator.process(&event, &mut ctx);
1101        }
1102
1103        // Trigger window [0, 1000)
1104        let timer = Timer {
1105            key: WindowId::new(0, 1000).to_key(),
1106            timestamp: 1000,
1107        };
1108
1109        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1110        let outputs = operator.on_timer(timer, &mut ctx);
1111
1112        assert_eq!(outputs.len(), 1);
1113        match &outputs[0] {
1114            Output::Event(event) => {
1115                assert_eq!(event.timestamp, 1000);
1116                let result_col = event.data.column(2);
1117                let result_array = result_col.as_any().downcast_ref::<Int64Array>().unwrap();
1118                assert_eq!(result_array.value(0), 3);
1119            }
1120            _ => panic!("Expected Event output"),
1121        }
1122
1123        // Window should be cleaned up
1124        assert!(!operator
1125            .registered_windows
1126            .contains(&WindowId::new(0, 1000)));
1127    }
1128
1129    #[test]
1130    fn test_sliding_operator_multiple_window_triggers() {
1131        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1132        let aggregator = SumAggregator::new(0);
1133        let mut operator = SlidingWindowOperator::with_id(
1134            assigner,
1135            aggregator,
1136            Duration::from_millis(0),
1137            "test_op".to_string(),
1138        );
1139
1140        let mut timers = TimerService::new();
1141        let mut state = InMemoryStore::new();
1142        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1143
1144        // Process event at t=600 with value 10
1145        // Belongs to [0, 1000) and [500, 1500)
1146        let event = create_test_event(600, 10);
1147        {
1148            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1149            operator.process(&event, &mut ctx);
1150        }
1151
1152        // Trigger first window [0, 1000)
1153        let t1 = Timer {
1154            key: WindowId::new(0, 1000).to_key(),
1155            timestamp: 1000,
1156        };
1157        let outputs1 = {
1158            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1159            operator.on_timer(t1, &mut ctx)
1160        };
1161
1162        assert_eq!(outputs1.len(), 1);
1163        if let Output::Event(e) = &outputs1[0] {
1164            let result = e
1165                .data
1166                .column(2)
1167                .as_any()
1168                .downcast_ref::<Int64Array>()
1169                .unwrap()
1170                .value(0);
1171            assert_eq!(result, 10);
1172        }
1173
1174        // Trigger second window [500, 1500)
1175        let t2 = Timer {
1176            key: WindowId::new(500, 1500).to_key(),
1177            timestamp: 1500,
1178        };
1179        let outputs2 = {
1180            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1181            operator.on_timer(t2, &mut ctx)
1182        };
1183
1184        assert_eq!(outputs2.len(), 1);
1185        if let Output::Event(e) = &outputs2[0] {
1186            let result = e
1187                .data
1188                .column(2)
1189                .as_any()
1190                .downcast_ref::<Int64Array>()
1191                .unwrap()
1192                .value(0);
1193            assert_eq!(result, 10);
1194        }
1195
1196        // Both windows should be cleaned up
1197        assert!(operator.registered_windows.is_empty());
1198    }
1199
1200    #[test]
1201    fn test_sliding_operator_late_event() {
1202        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1203        let aggregator = CountAggregator::new();
1204        let mut operator = SlidingWindowOperator::with_id(
1205            assigner,
1206            aggregator,
1207            Duration::from_millis(0),
1208            "test_op".to_string(),
1209        );
1210
1211        let mut timers = TimerService::new();
1212        let mut state = InMemoryStore::new();
1213        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1214
1215        // Advance watermark to 2000
1216        let event1 = create_test_event(2000, 1);
1217        {
1218            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1219            operator.process(&event1, &mut ctx);
1220        }
1221
1222        // Process late event at t=500 (all windows closed)
1223        let late_event = create_test_event(500, 2);
1224        let outputs = {
1225            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1226            operator.process(&late_event, &mut ctx)
1227        };
1228
1229        // Should emit LateEvent
1230        let is_late = outputs.iter().any(|o| matches!(o, Output::LateEvent(_)));
1231        assert!(is_late);
1232        assert_eq!(operator.late_data_metrics().late_events_dropped(), 1);
1233    }
1234
1235    #[test]
1236    fn test_sliding_operator_late_event_side_output() {
1237        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1238        let aggregator = CountAggregator::new();
1239        let mut operator = SlidingWindowOperator::with_id(
1240            assigner,
1241            aggregator,
1242            Duration::from_millis(0),
1243            "test_op".to_string(),
1244        );
1245
1246        operator.set_late_data_config(LateDataConfig::with_side_output("late".to_string()));
1247
1248        let mut timers = TimerService::new();
1249        let mut state = InMemoryStore::new();
1250        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1251
1252        // Advance watermark
1253        let event1 = create_test_event(2000, 1);
1254        {
1255            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1256            operator.process(&event1, &mut ctx);
1257        }
1258
1259        // Process late event
1260        let late_event = create_test_event(500, 2);
1261        let outputs = {
1262            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1263            operator.process(&late_event, &mut ctx)
1264        };
1265
1266        // Should emit SideOutput
1267        let side_output = outputs.iter().find_map(|o| {
1268            if let Output::SideOutput { name, .. } = o {
1269                Some(name.clone())
1270            } else {
1271                None
1272            }
1273        });
1274        assert_eq!(side_output, Some("late".to_string()));
1275        assert_eq!(operator.late_data_metrics().late_events_side_output(), 1);
1276    }
1277
1278    #[test]
1279    fn test_sliding_operator_emit_on_update() {
1280        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1281        let aggregator = CountAggregator::new();
1282        let mut operator = SlidingWindowOperator::with_id(
1283            assigner,
1284            aggregator,
1285            Duration::from_millis(0),
1286            "test_op".to_string(),
1287        );
1288
1289        operator.set_emit_strategy(EmitStrategy::OnUpdate);
1290
1291        let mut timers = TimerService::new();
1292        let mut state = InMemoryStore::new();
1293        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1294
1295        // Process event - should emit intermediate results for both windows
1296        let event = create_test_event(600, 1);
1297        let outputs = {
1298            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1299            operator.process(&event, &mut ctx)
1300        };
1301
1302        // Should have 2 Event outputs (one per window)
1303        let event_count = outputs
1304            .iter()
1305            .filter(|o| matches!(o, Output::Event(_)))
1306            .count();
1307        assert_eq!(event_count, 2);
1308    }
1309
1310    #[test]
1311    fn test_sliding_operator_checkpoint_restore() {
1312        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1313        let aggregator = CountAggregator::new();
1314        let mut operator = SlidingWindowOperator::with_id(
1315            assigner.clone(),
1316            aggregator.clone(),
1317            Duration::from_millis(0),
1318            "test_op".to_string(),
1319        );
1320
1321        // Register some windows
1322        operator.registered_windows.insert(WindowId::new(0, 1000));
1323        operator.registered_windows.insert(WindowId::new(500, 1500));
1324        operator
1325            .periodic_timer_windows
1326            .insert(WindowId::new(0, 1000));
1327
1328        // Checkpoint
1329        let checkpoint = operator.checkpoint();
1330
1331        // Create new operator and restore
1332        let mut restored = SlidingWindowOperator::with_id(
1333            assigner,
1334            aggregator,
1335            Duration::from_millis(0),
1336            "test_op".to_string(),
1337        );
1338        restored.restore(checkpoint).unwrap();
1339
1340        assert_eq!(restored.registered_windows.len(), 2);
1341        assert_eq!(restored.periodic_timer_windows.len(), 1);
1342        assert!(restored
1343            .registered_windows
1344            .contains(&WindowId::new(0, 1000)));
1345        assert!(restored
1346            .registered_windows
1347            .contains(&WindowId::new(500, 1500)));
1348        assert!(restored
1349            .periodic_timer_windows
1350            .contains(&WindowId::new(0, 1000)));
1351    }
1352
1353    #[test]
1354    fn test_sliding_operator_empty_window_trigger() {
1355        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1356        let aggregator = CountAggregator::new();
1357        let mut operator = SlidingWindowOperator::with_id(
1358            assigner,
1359            aggregator,
1360            Duration::from_millis(0),
1361            "test_op".to_string(),
1362        );
1363
1364        let mut timers = TimerService::new();
1365        let mut state = InMemoryStore::new();
1366        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1367
1368        // Trigger without any events
1369        let timer = Timer {
1370            key: WindowId::new(0, 1000).to_key(),
1371            timestamp: 1000,
1372        };
1373
1374        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1375        let outputs = operator.on_timer(timer, &mut ctx);
1376
1377        // Empty window should produce no output
1378        assert!(outputs.is_empty());
1379    }
1380
1381    #[test]
1382    fn test_sliding_operator_periodic_timer_key() {
1383        let window_id = WindowId::new(1000, 2000);
1384
1385        let periodic_key = SlidingWindowOperator::<CountAggregator>::periodic_timer_key(&window_id);
1386        assert!(SlidingWindowOperator::<CountAggregator>::is_periodic_timer_key(&periodic_key));
1387
1388        let extracted =
1389            SlidingWindowOperator::<CountAggregator>::window_id_from_periodic_key(&periodic_key);
1390        assert_eq!(extracted, Some(window_id));
1391
1392        // Regular key should not be detected as periodic
1393        let regular_key = window_id.to_key();
1394        assert!(!SlidingWindowOperator::<CountAggregator>::is_periodic_timer_key(&regular_key));
1395    }
1396
1397    #[test]
1398    fn test_sliding_operator_skips_closed_windows() {
1399        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1400        let aggregator = CountAggregator::new();
1401        let mut operator = SlidingWindowOperator::with_id(
1402            assigner,
1403            aggregator,
1404            Duration::from_millis(0),
1405            "test_op".to_string(),
1406        );
1407
1408        let mut timers = TimerService::new();
1409        let mut state = InMemoryStore::new();
1410        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1411
1412        // Advance watermark to 1100 (window [0, 1000) is closed, [500, 1500) is open)
1413        let event1 = create_test_event(1100, 1);
1414        {
1415            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1416            operator.process(&event1, &mut ctx);
1417        }
1418
1419        // Process event at t=800 - belongs to [0, 1000) and [500, 1500)
1420        // But [0, 1000) is closed, so should only update [500, 1500)
1421        let event2 = create_test_event(800, 1);
1422        {
1423            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1424            operator.process(&event2, &mut ctx);
1425        }
1426
1427        // Only [500, 1500) and [1000, 2000) should be registered (not [0, 1000))
1428        assert!(!operator
1429            .registered_windows
1430            .contains(&WindowId::new(0, 1000)));
1431        assert!(operator
1432            .registered_windows
1433            .contains(&WindowId::new(500, 1500)));
1434    }
1435
1436    #[test]
1437    fn test_sliding_assigner_window_assigner_trait() {
1438        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1439
1440        // Test the WindowAssigner trait method
1441        let windows = assigner.assign_windows(600);
1442        assert_eq!(windows.len(), 2);
1443
1444        // Test max_timestamp
1445        assert_eq!(assigner.max_timestamp(1000), 999);
1446    }
1447
1448    #[test]
1449    fn test_sliding_operator_allowed_lateness() {
1450        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1451        let aggregator = CountAggregator::new();
1452        let mut operator = SlidingWindowOperator::with_id(
1453            assigner,
1454            aggregator,
1455            Duration::from_millis(500), // 500ms allowed lateness
1456            "test_op".to_string(),
1457        );
1458
1459        let mut timers = TimerService::new();
1460        let mut state = InMemoryStore::new();
1461        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1462
1463        // Advance watermark to 1200
1464        let event1 = create_test_event(1200, 1);
1465        {
1466            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1467            operator.process(&event1, &mut ctx);
1468        }
1469
1470        // Process event at t=800 - window [0, 1000) cleanup is at 1500
1471        // Watermark (1200) < cleanup time (1500), so NOT late
1472        let event2 = create_test_event(800, 1);
1473        let outputs = {
1474            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1475            operator.process(&event2, &mut ctx)
1476        };
1477
1478        // Should NOT be late
1479        let is_late = outputs
1480            .iter()
1481            .any(|o| matches!(o, Output::LateEvent(_) | Output::SideOutput { .. }));
1482        assert!(!is_late);
1483        assert_eq!(operator.late_data_metrics().late_events_total(), 0);
1484    }
1485}