Skip to main content

laminar_core/operator/
session_window.rs

1//! # Session Window Operators
2//!
3//! Implementation of session windows for stream processing.
4//!
5//! Session windows are dynamic windows that group events by activity periods
6//! separated by gaps. Unlike tumbling and sliding windows which have fixed
7//! boundaries, session windows grow with activity and close after inactivity.
8//!
9//! ## Key Characteristics
10//!
11//! - **Dynamic boundaries**: Sessions start with the first event and extend
12//!   with each new event within the gap period
13//! - **Per-key tracking**: Each key maintains independent session state
14//! - **Gap-based closure**: Sessions close when no events arrive within the gap
15//! - **Session merging**: Late data can merge previously separate sessions
16//!
17//! ## Example
18//!
19//! ```text
20//! Gap: 30 seconds
21//!
22//! Events: [t=0] [t=10] [t=20]  ...gap...  [t=100] [t=110]
23//!         |<---- Session 1 ---->|          |<- Session 2 ->|
24//!         [0, 50)                          [100, 140)
25//! ```
26//!
27//! ## Usage
28//!
29//! ```rust,no_run
30//! use laminar_core::operator::session_window::SessionWindowOperator;
31//! use laminar_core::operator::window::CountAggregator;
32//! use std::time::Duration;
33//!
34//! // Create a session window with 30-second gap
35//! let operator = SessionWindowOperator::new(
36//!     Duration::from_secs(30),    // gap timeout
37//!     CountAggregator::new(),
38//!     Duration::from_secs(60),    // allowed lateness
39//! );
40//! ```
41
42use super::window::{
43    Accumulator, Aggregator, ChangelogRecord, EmitStrategy, LateDataConfig, LateDataMetrics,
44    ResultToI64, WindowId,
45};
46use super::{
47    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
48};
49use crate::state::{StateStore, StateStoreExt};
50use arrow_array::{Array, Int64Array, RecordBatch};
51use arrow_schema::{DataType, Field, Schema, SchemaRef};
52use fxhash::FxHashMap;
53use rkyv::{
54    api::high::{HighDeserializer, HighSerializer, HighValidator},
55    bytecheck::CheckBytes,
56    rancor::Error as RkyvError,
57    ser::allocator::ArenaHandle,
58    util::AlignedVec,
59    Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
60};
61use std::marker::PhantomData;
62use std::sync::atomic::{AtomicU64, Ordering};
63use std::sync::Arc;
64use std::time::Duration;
65
/// Four-byte prefix of state-store keys that hold session metadata
/// (followed by the 8-byte big-endian key hash).
const SESSION_STATE_PREFIX: &[u8; 4] = &[b's', b'e', b's', b':'];

/// Four-byte prefix of state-store keys that hold session accumulators
/// (followed by the 8-byte big-endian key hash).
const SESSION_ACC_PREFIX: &[u8; 4] = &[b's', b'a', b'c', b':'];

/// Leading byte of timer keys that schedule session closure.
const SESSION_TIMER_PREFIX: u8 = 1;

/// Process-wide counter used to derive unique default operator IDs.
static SESSION_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0u64);
77
78/// Session state tracking start/end times and key.
79///
80/// This is the metadata for an active session, stored separately from
81/// the accumulator state for efficient updates.
82#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
83pub struct SessionState {
84    /// Session start timestamp (inclusive)
85    pub start: i64,
86    /// Session end timestamp (exclusive, = last event time + gap)
87    pub end: i64,
88    /// Key bytes for this session
89    pub key: Vec<u8>,
90}
91
92impl SessionState {
93    /// Creates a new session state from an event timestamp.
94    fn new(timestamp: i64, gap_ms: i64, key: Vec<u8>) -> Self {
95        Self {
96            start: timestamp,
97            end: timestamp + gap_ms,
98            key,
99        }
100    }
101
102    /// Returns the window ID for this session.
103    #[must_use]
104    pub fn window_id(&self) -> WindowId {
105        WindowId::new(self.start, self.end)
106    }
107
108    /// Checks if a timestamp falls within this session (including gap).
109    fn contains(&self, timestamp: i64, gap_ms: i64) -> bool {
110        // Event is within session if it's between start and end,
111        // or within gap of the current end
112        timestamp >= self.start && timestamp < self.end + gap_ms
113    }
114
115    /// Extends the session to include a new timestamp.
116    fn extend(&mut self, timestamp: i64, gap_ms: i64) {
117        self.start = self.start.min(timestamp);
118        self.end = self.end.max(timestamp + gap_ms);
119    }
120
121    /// Merges another session into this one.
122    ///
123    /// # Future Enhancement
124    ///
125    /// This method is prepared for session merging when late data arrives
126    /// that bridges two previously separate sessions.
127    #[allow(dead_code)]
128    fn merge(&mut self, other: &SessionState) {
129        self.start = self.start.min(other.start);
130        self.end = self.end.max(other.end);
131    }
132}
133
134/// Creates the standard session output schema.
135fn create_session_output_schema() -> SchemaRef {
136    Arc::new(Schema::new(vec![
137        Field::new("session_start", DataType::Int64, false),
138        Field::new("session_end", DataType::Int64, false),
139        Field::new("result", DataType::Int64, false),
140    ]))
141}
142
143/// Session window operator.
144///
145/// Groups events by activity periods separated by gaps. Each unique key
146/// maintains its own session state independently.
147///
148/// # Session Lifecycle
149///
150/// 1. **Start**: First event for a key creates a new session
151/// 2. **Extend**: Events within gap period extend the session
152/// 3. **Close**: Timer fires when gap expires, emitting results
153/// 4. **Merge**: Late data may merge previously separate sessions
154///
155/// # State Management
156///
157/// Session state is stored using prefixed keys:
158/// - `ses:<key_hash>` - Session metadata (start, end, key)
159/// - `sac:<key_hash>` - Accumulator state
160///
161/// # Emit Strategies
162///
163/// - `OnWatermark`: Emit when watermark passes session end
164/// - `OnUpdate`: Emit after every state update
165/// - `OnWindowClose`: Only emit on final closure
166/// - `Changelog`: Emit CDC records with Z-set weights
167/// - `Final`: Suppress all intermediate, drop late data
168pub struct SessionWindowOperator<A: Aggregator> {
169    /// Gap timeout in milliseconds
170    gap_ms: i64,
171    /// Aggregator function
172    aggregator: A,
173    /// Allowed lateness for late data
174    allowed_lateness_ms: i64,
175    /// Active sessions by key hash (in-memory index)
176    active_sessions: FxHashMap<u64, SessionState>,
177    /// Pending timers by key hash
178    pending_timers: FxHashMap<u64, i64>,
179    /// Emit strategy
180    emit_strategy: EmitStrategy,
181    /// Late data configuration
182    late_data_config: LateDataConfig,
183    /// Late data metrics
184    late_data_metrics: LateDataMetrics,
185    /// Operator ID for checkpointing
186    operator_id: String,
187    /// Cached output schema
188    output_schema: SchemaRef,
189    /// Key column index for partitioning (None = global session)
190    key_column: Option<usize>,
191    /// Phantom data for accumulator type
192    _phantom: PhantomData<A::Acc>,
193}
194
195impl<A: Aggregator> SessionWindowOperator<A>
196where
197    A::Acc: Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
198    <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
199        + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
200{
201    /// Creates a new session window operator.
202    ///
203    /// # Arguments
204    ///
205    /// * `gap` - The inactivity gap that closes a session
206    /// * `aggregator` - Aggregation function to apply within sessions
207    /// * `allowed_lateness` - Grace period for late data after session close
208    ///
209    /// # Panics
210    ///
211    /// Panics if gap or allowed lateness does not fit in i64.
212    pub fn new(gap: Duration, aggregator: A, allowed_lateness: Duration) -> Self {
213        let operator_num = SESSION_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
214        Self {
215            gap_ms: i64::try_from(gap.as_millis()).expect("Gap must fit in i64"),
216            aggregator,
217            allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
218                .expect("Allowed lateness must fit in i64"),
219            active_sessions: FxHashMap::default(),
220            pending_timers: FxHashMap::default(),
221            emit_strategy: EmitStrategy::default(),
222            late_data_config: LateDataConfig::default(),
223            late_data_metrics: LateDataMetrics::new(),
224            operator_id: format!("session_window_{operator_num}"),
225            output_schema: create_session_output_schema(),
226            key_column: None,
227            _phantom: PhantomData,
228        }
229    }
230
231    /// Creates a new session window operator with a custom operator ID.
232    ///
233    /// # Panics
234    ///
235    /// Panics if gap or allowed lateness does not fit in i64.
236    pub fn with_id(
237        gap: Duration,
238        aggregator: A,
239        allowed_lateness: Duration,
240        operator_id: String,
241    ) -> Self {
242        Self {
243            gap_ms: i64::try_from(gap.as_millis()).expect("Gap must fit in i64"),
244            aggregator,
245            allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
246                .expect("Allowed lateness must fit in i64"),
247            active_sessions: FxHashMap::default(),
248            pending_timers: FxHashMap::default(),
249            emit_strategy: EmitStrategy::default(),
250            late_data_config: LateDataConfig::default(),
251            late_data_metrics: LateDataMetrics::new(),
252            operator_id,
253            output_schema: create_session_output_schema(),
254            key_column: None,
255            _phantom: PhantomData,
256        }
257    }
258
259    /// Sets the key column for per-key session tracking.
260    ///
261    /// If not set, a single global session is maintained.
262    pub fn set_key_column(&mut self, column_index: usize) {
263        self.key_column = Some(column_index);
264    }
265
266    /// Returns the key column index if set.
267    #[must_use]
268    pub fn key_column(&self) -> Option<usize> {
269        self.key_column
270    }
271
272    /// Sets the emit strategy for this operator.
273    pub fn set_emit_strategy(&mut self, strategy: EmitStrategy) {
274        self.emit_strategy = strategy;
275    }
276
277    /// Returns the current emit strategy.
278    #[must_use]
279    pub fn emit_strategy(&self) -> &EmitStrategy {
280        &self.emit_strategy
281    }
282
283    /// Sets the late data handling configuration.
284    pub fn set_late_data_config(&mut self, config: LateDataConfig) {
285        self.late_data_config = config;
286    }
287
288    /// Returns the current late data configuration.
289    #[must_use]
290    pub fn late_data_config(&self) -> &LateDataConfig {
291        &self.late_data_config
292    }
293
294    /// Returns the late data metrics.
295    #[must_use]
296    pub fn late_data_metrics(&self) -> &LateDataMetrics {
297        &self.late_data_metrics
298    }
299
300    /// Resets the late data metrics counters.
301    pub fn reset_late_data_metrics(&mut self) {
302        self.late_data_metrics.reset();
303    }
304
305    /// Returns the gap timeout in milliseconds.
306    #[must_use]
307    pub fn gap_ms(&self) -> i64 {
308        self.gap_ms
309    }
310
311    /// Returns the allowed lateness in milliseconds.
312    #[must_use]
313    pub fn allowed_lateness_ms(&self) -> i64 {
314        self.allowed_lateness_ms
315    }
316
317    /// Returns the number of active sessions.
318    #[must_use]
319    pub fn active_session_count(&self) -> usize {
320        self.active_sessions.len()
321    }
322
323    /// Extracts the key from an event.
324    fn extract_key(&self, event: &Event) -> Vec<u8> {
325        use arrow_array::cast::AsArray;
326        use arrow_array::types::Int64Type;
327
328        if let Some(col_idx) = self.key_column {
329            if col_idx < event.data.num_columns() {
330                let column = event.data.column(col_idx);
331                if let Some(array) = column.as_primitive_opt::<Int64Type>() {
332                    if !array.is_empty() && !array.is_null(0) {
333                        return array.value(0).to_be_bytes().to_vec();
334                    }
335                }
336                // Try string column
337                if let Some(array) = column.as_string_opt::<i32>() {
338                    if !array.is_empty() && !array.is_null(0) {
339                        return array.value(0).as_bytes().to_vec();
340                    }
341                }
342            }
343        }
344        // Default: global session (empty key)
345        Vec::new()
346    }
347
348    /// Computes a hash for the key.
349    fn key_hash(key: &[u8]) -> u64 {
350        use std::hash::{Hash, Hasher};
351        let mut hasher = fxhash::FxHasher::default();
352        key.hash(&mut hasher);
353        hasher.finish()
354    }
355
356    /// Generates the state key for session metadata.
357    fn session_state_key(key_hash: u64) -> [u8; 12] {
358        let mut key = [0u8; 12];
359        key[..4].copy_from_slice(SESSION_STATE_PREFIX);
360        key[4..12].copy_from_slice(&key_hash.to_be_bytes());
361        key
362    }
363
364    /// Generates the state key for session accumulator.
365    fn session_acc_key(key_hash: u64) -> [u8; 12] {
366        let mut key = [0u8; 12];
367        key[..4].copy_from_slice(SESSION_ACC_PREFIX);
368        key[4..12].copy_from_slice(&key_hash.to_be_bytes());
369        key
370    }
371
372    /// Generates the timer key for session closure.
373    fn timer_key(key_hash: u64) -> super::TimerKey {
374        let mut key = super::TimerKey::new();
375        key.push(SESSION_TIMER_PREFIX);
376        key.extend_from_slice(&key_hash.to_be_bytes());
377        key
378    }
379
380    /// Parses a key hash from a timer key.
381    fn key_hash_from_timer(timer_key: &[u8]) -> Option<u64> {
382        if timer_key.len() != 9 || timer_key[0] != SESSION_TIMER_PREFIX {
383            return None;
384        }
385        let hash_bytes: [u8; 8] = timer_key[1..9].try_into().ok()?;
386        Some(u64::from_be_bytes(hash_bytes))
387    }
388
389    /// Gets or creates a session for a key.
390    fn get_or_create_session(
391        &mut self,
392        key_hash: u64,
393        key: Vec<u8>,
394        timestamp: i64,
395        state: &mut dyn StateStore,
396    ) -> SessionState {
397        // Check in-memory cache first
398        if let Some(session) = self.active_sessions.get(&key_hash) {
399            return session.clone();
400        }
401
402        // Check persistent state
403        let state_key = Self::session_state_key(key_hash);
404        if let Ok(Some(session)) = state.get_typed::<SessionState>(&state_key) {
405            self.active_sessions.insert(key_hash, session.clone());
406            return session;
407        }
408
409        // Create new session
410        let session = SessionState::new(timestamp, self.gap_ms, key);
411        self.active_sessions.insert(key_hash, session.clone());
412        session
413    }
414
415    /// Gets the accumulator for a session.
416    fn get_accumulator(&self, key_hash: u64, state: &dyn StateStore) -> A::Acc {
417        let acc_key = Self::session_acc_key(key_hash);
418        state
419            .get_typed::<A::Acc>(&acc_key)
420            .ok()
421            .flatten()
422            .unwrap_or_else(|| self.aggregator.create_accumulator())
423    }
424
425    /// Stores session state and accumulator.
426    fn put_session(
427        key_hash: u64,
428        session: &SessionState,
429        acc: &A::Acc,
430        state: &mut dyn StateStore,
431    ) -> Result<(), OperatorError> {
432        let state_key = Self::session_state_key(key_hash);
433        let acc_key = Self::session_acc_key(key_hash);
434
435        state
436            .put_typed(&state_key, session)
437            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))?;
438        state
439            .put_typed(&acc_key, acc)
440            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))?;
441
442        Ok(())
443    }
444
445    /// Deletes session state and accumulator.
446    fn delete_session(
447        &mut self,
448        key_hash: u64,
449        state: &mut dyn StateStore,
450    ) -> Result<(), OperatorError> {
451        let state_key = Self::session_state_key(key_hash);
452        let acc_key = Self::session_acc_key(key_hash);
453
454        state
455            .delete(&state_key)
456            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))?;
457        state
458            .delete(&acc_key)
459            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))?;
460
461        self.active_sessions.remove(&key_hash);
462        self.pending_timers.remove(&key_hash);
463
464        Ok(())
465    }
466
467    /// Registers or updates a timer for session closure.
468    fn register_timer(&mut self, key_hash: u64, session: &SessionState, ctx: &mut OperatorContext) {
469        let trigger_time = session.end + self.allowed_lateness_ms;
470
471        // Cancel previous timer if different
472        if let Some(&old_time) = self.pending_timers.get(&key_hash) {
473            if old_time == trigger_time {
474                return; // Timer already set for correct time
475            }
476            // Note: We can't cancel timers, but the handler will check if session still exists
477        }
478
479        let timer_key = Self::timer_key(key_hash);
480        ctx.timers
481            .register_timer(trigger_time, Some(timer_key), Some(ctx.operator_index));
482        self.pending_timers.insert(key_hash, trigger_time);
483    }
484
485    /// Checks if an event is late for its potential session.
486    fn is_late(&self, timestamp: i64, watermark: i64) -> bool {
487        // An event is late if its session would have already closed
488        // Session end = timestamp + gap, cleanup = session end + allowed lateness
489        let potential_cleanup = timestamp + self.gap_ms + self.allowed_lateness_ms;
490        watermark >= potential_cleanup
491    }
492
493    /// Creates an output event from a session.
494    fn create_output(&self, session: &SessionState, acc: &A::Acc) -> Option<Event> {
495        if acc.is_empty() {
496            return None;
497        }
498
499        let result = acc.result();
500        let result_i64 = result.to_i64();
501
502        let batch = RecordBatch::try_new(
503            Arc::clone(&self.output_schema),
504            vec![
505                Arc::new(Int64Array::from(vec![session.start])),
506                Arc::new(Int64Array::from(vec![session.end])),
507                Arc::new(Int64Array::from(vec![result_i64])),
508            ],
509        )
510        .ok()?;
511
512        Some(Event::new(session.end, batch))
513    }
514
515    /// Finds overlapping sessions for potential merging.
516    ///
517    /// # Future Enhancement
518    ///
519    /// This method is prepared for session merging when late data arrives
520    /// that bridges two previously separate sessions. Currently only single
521    /// session per key is supported.
522    #[allow(dead_code)]
523    fn find_overlapping_sessions(&self, key_hash: u64, _timestamp: i64) -> Vec<u64> {
524        // For now, we only support single session per key
525        // Future: Could scan for sessions with overlapping time ranges
526        if self.active_sessions.contains_key(&key_hash) {
527            vec![key_hash]
528        } else {
529            vec![]
530        }
531    }
532}
533
534impl<A: Aggregator> Operator for SessionWindowOperator<A>
535where
536    A::Acc: 'static
537        + Archive
538        + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
539    <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
540        + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
541{
542    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
543        let event_time = event.timestamp;
544        let mut output = OutputVec::new();
545
546        // Update watermark
547        let emitted_watermark = ctx.watermark_generator.on_event(event_time);
548
549        // Check if event is late
550        let current_wm = ctx.watermark_generator.current_watermark();
551        if current_wm > i64::MIN && self.is_late(event_time, current_wm) {
552            // F011B: EMIT FINAL drops late data entirely
553            if self.emit_strategy.drops_late_data() {
554                self.late_data_metrics.record_dropped();
555                return output;
556            }
557
558            if let Some(side_output_name) = self.late_data_config.side_output() {
559                self.late_data_metrics.record_side_output();
560                output.push(Output::SideOutput {
561                    name: side_output_name.to_string(),
562                    event: event.clone(),
563                });
564            } else {
565                self.late_data_metrics.record_dropped();
566                output.push(Output::LateEvent(event.clone()));
567            }
568            return output;
569        }
570
571        // Extract key and compute hash
572        let key = self.extract_key(event);
573        let key_hash = Self::key_hash(&key);
574
575        // Get or create session
576        let mut session = self.get_or_create_session(key_hash, key.clone(), event_time, ctx.state);
577
578        // Check if event extends existing session or needs a new one
579        if session.contains(event_time, self.gap_ms) {
580            // Extend existing session
581            session.extend(event_time, self.gap_ms);
582        } else if event_time < session.start {
583            // Event is before session start - extend backwards
584            session.extend(event_time, self.gap_ms);
585        } else {
586            // Event is after gap - this is a new session
587            // First, emit the old session if OnUpdate strategy
588            if matches!(self.emit_strategy, EmitStrategy::OnUpdate) {
589                let old_acc = self.get_accumulator(key_hash, ctx.state);
590                if let Some(old_event) = self.create_output(&session, &old_acc) {
591                    output.push(Output::Event(old_event));
592                }
593            }
594
595            // Start new session
596            session = SessionState::new(event_time, self.gap_ms, key);
597            // Reset accumulator for new session
598            let new_acc = self.aggregator.create_accumulator();
599            let _ = Self::put_session(key_hash, &session, &new_acc, ctx.state);
600        }
601
602        // Get and update accumulator
603        let mut acc = self.get_accumulator(key_hash, ctx.state);
604        if let Some(value) = self.aggregator.extract(event) {
605            acc.add(value);
606        }
607
608        // Store updated state
609        if Self::put_session(key_hash, &session, &acc, ctx.state).is_ok() {
610            self.active_sessions.insert(key_hash, session.clone());
611        }
612
613        // Register timer for session closure
614        self.register_timer(key_hash, &session, ctx);
615
616        // Emit watermark if generated
617        if let Some(wm) = emitted_watermark {
618            output.push(Output::Watermark(wm.timestamp()));
619        }
620
621        // Handle emit strategy
622        match &self.emit_strategy {
623            EmitStrategy::OnUpdate => {
624                if let Some(event) = self.create_output(&session, &acc) {
625                    output.push(Output::Event(event));
626                }
627            }
628            EmitStrategy::Changelog => {
629                if let Some(event) = self.create_output(&session, &acc) {
630                    let record = ChangelogRecord::insert(event, ctx.processing_time);
631                    output.push(Output::Changelog(record));
632                }
633            }
634            // Other strategies: no intermediate emission
635            EmitStrategy::OnWatermark
636            | EmitStrategy::Periodic(_)
637            | EmitStrategy::OnWindowClose
638            | EmitStrategy::Final => {}
639        }
640
641        output
642    }
643
644    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
645        let mut output = OutputVec::new();
646
647        // Parse key hash from timer
648        let Some(key_hash) = Self::key_hash_from_timer(&timer.key) else {
649            return output;
650        };
651
652        // Check if this timer is still valid
653        let Some(expected_time) = self.pending_timers.get(&key_hash) else {
654            return output; // Timer was cancelled or session no longer exists
655        };
656
657        if *expected_time != timer.timestamp {
658            return output; // Stale timer, session was extended
659        }
660
661        // Get session state
662        let Some(session) = self.active_sessions.get(&key_hash).cloned() else {
663            self.pending_timers.remove(&key_hash);
664            return output;
665        };
666
667        // Get accumulator
668        let acc = self.get_accumulator(key_hash, ctx.state);
669
670        // Create output
671        if let Some(event) = self.create_output(&session, &acc) {
672            match &self.emit_strategy {
673                EmitStrategy::Changelog => {
674                    let record = ChangelogRecord::insert(event, ctx.processing_time);
675                    output.push(Output::Changelog(record));
676                }
677                _ => {
678                    output.push(Output::Event(event));
679                }
680            }
681        }
682
683        // Clean up session
684        let _ = self.delete_session(key_hash, ctx.state);
685
686        output
687    }
688
689    fn checkpoint(&self) -> OperatorState {
690        // Serialize active session key hashes and their timer times
691        let checkpoint_data: Vec<(u64, i64)> =
692            self.pending_timers.iter().map(|(&k, &v)| (k, v)).collect();
693
694        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
695            .map(|v| v.to_vec())
696            .unwrap_or_default();
697
698        OperatorState {
699            operator_id: self.operator_id.clone(),
700            data,
701        }
702    }
703
704    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
705        if state.operator_id != self.operator_id {
706            return Err(OperatorError::StateAccessFailed(format!(
707                "Operator ID mismatch: expected {}, got {}",
708                self.operator_id, state.operator_id
709            )));
710        }
711
712        if state.data.is_empty() {
713            return Ok(());
714        }
715
716        let archived = rkyv::access::<rkyv::Archived<Vec<(u64, i64)>>, RkyvError>(&state.data)
717            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
718        let timers: Vec<(u64, i64)> = rkyv::deserialize::<Vec<(u64, i64)>, RkyvError>(archived)
719            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
720
721        self.pending_timers = timers.into_iter().collect();
722        // Note: active_sessions will be populated lazily from state store
723
724        Ok(())
725    }
726}
727
/// Counters describing session activity, intended for monitoring.
///
/// NOTE(review): the operator implementation in this module does not update
/// these counters. Confirm whether some caller populates them.
#[derive(Clone, Debug, Default)]
pub struct SessionMetrics {
    /// Number of sessions opened so far
    pub sessions_created: u64,
    /// Number of sessions closed so far
    pub sessions_closed: u64,
    /// Number of session merges performed so far
    pub sessions_merged: u64,
    /// Number of sessions currently open
    pub active_sessions: u64,
}
740
741#[cfg(test)]
742mod tests {
743    use super::*;
744    use crate::operator::window::{CountAccumulator, CountAggregator, SumAggregator};
745    use crate::state::InMemoryStore;
746    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
747    use arrow_array::{Int64Array, RecordBatch};
748    use arrow_schema::{DataType, Field, Schema};
749
750    fn create_test_event(timestamp: i64, value: i64) -> Event {
751        let schema = Arc::new(Schema::new(vec![Field::new(
752            "value",
753            DataType::Int64,
754            false,
755        )]));
756        let batch =
757            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
758        Event::new(timestamp, batch)
759    }
760
761    fn create_keyed_event(timestamp: i64, key: i64, value: i64) -> Event {
762        let schema = Arc::new(Schema::new(vec![
763            Field::new("key", DataType::Int64, false),
764            Field::new("value", DataType::Int64, false),
765        ]));
766        let batch = RecordBatch::try_new(
767            schema,
768            vec![
769                Arc::new(Int64Array::from(vec![key])),
770                Arc::new(Int64Array::from(vec![value])),
771            ],
772        )
773        .unwrap();
774        Event::new(timestamp, batch)
775    }
776
777    fn create_test_context<'a>(
778        timers: &'a mut TimerService,
779        state: &'a mut dyn StateStore,
780        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
781    ) -> OperatorContext<'a> {
782        OperatorContext {
783            event_time: 0,
784            processing_time: 0,
785            timers,
786            state,
787            watermark_generator: watermark_gen,
788            operator_index: 0,
789        }
790    }
791
792    #[test]
793    fn test_session_operator_creation() {
794        let aggregator = CountAggregator::new();
795        let operator = SessionWindowOperator::new(
796            Duration::from_secs(30),
797            aggregator,
798            Duration::from_secs(60),
799        );
800
801        assert_eq!(operator.gap_ms(), 30_000);
802        assert_eq!(operator.allowed_lateness_ms(), 60_000);
803        assert_eq!(operator.active_session_count(), 0);
804        assert_eq!(*operator.emit_strategy(), EmitStrategy::OnWatermark);
805    }
806
807    #[test]
808    fn test_session_operator_with_id() {
809        let aggregator = CountAggregator::new();
810        let operator = SessionWindowOperator::with_id(
811            Duration::from_secs(30),
812            aggregator,
813            Duration::from_secs(0),
814            "test_session".to_string(),
815        );
816
817        assert_eq!(operator.operator_id, "test_session");
818    }
819
820    #[test]
821    fn test_session_state_creation() {
822        let state = SessionState::new(1000, 5000, vec![1, 2, 3]);
823
824        assert_eq!(state.start, 1000);
825        assert_eq!(state.end, 6000); // start + gap
826        assert_eq!(state.key, vec![1, 2, 3]);
827    }
828
829    #[test]
830    fn test_session_state_contains() {
831        let state = SessionState::new(1000, 5000, vec![]);
832
833        // Within session
834        assert!(state.contains(1000, 5000));
835        assert!(state.contains(3000, 5000));
836        assert!(state.contains(5999, 5000)); // Just before end
837
838        // Within gap extension
839        assert!(state.contains(6000, 5000));
840        assert!(state.contains(10999, 5000)); // end + gap - 1
841
842        // Outside
843        assert!(!state.contains(999, 5000)); // Before start
844        assert!(!state.contains(11000, 5000)); // After end + gap
845    }
846
847    #[test]
848    fn test_session_state_extend() {
849        let mut state = SessionState::new(1000, 5000, vec![]);
850
851        // Extend forward
852        state.extend(8000, 5000);
853        assert_eq!(state.start, 1000);
854        assert_eq!(state.end, 13000); // 8000 + 5000
855
856        // Extend backward
857        state.extend(500, 5000);
858        assert_eq!(state.start, 500);
859        assert_eq!(state.end, 13000); // Unchanged (max)
860    }
861
862    #[test]
863    fn test_session_state_merge() {
864        let mut state1 = SessionState::new(1000, 5000, vec![]);
865        let state2 = SessionState::new(8000, 5000, vec![]);
866
867        state1.merge(&state2);
868        assert_eq!(state1.start, 1000);
869        assert_eq!(state1.end, 13000); // max(6000, 13000)
870    }
871
872    #[test]
873    fn test_session_single_event() {
874        let aggregator = CountAggregator::new();
875        let mut operator = SessionWindowOperator::with_id(
876            Duration::from_millis(1000),
877            aggregator,
878            Duration::from_millis(0),
879            "test_op".to_string(),
880        );
881
882        let mut timers = TimerService::new();
883        let mut state = InMemoryStore::new();
884        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
885
886        let event = create_test_event(500, 1);
887        {
888            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
889            operator.process(&event, &mut ctx);
890        }
891
892        assert_eq!(operator.active_session_count(), 1);
893        assert_eq!(operator.pending_timers.len(), 1);
894    }
895
896    #[test]
897    fn test_session_multiple_events_same_session() {
898        let aggregator = CountAggregator::new();
899        let mut operator = SessionWindowOperator::with_id(
900            Duration::from_millis(1000),
901            aggregator,
902            Duration::from_millis(0),
903            "test_op".to_string(),
904        );
905
906        let mut timers = TimerService::new();
907        let mut state = InMemoryStore::new();
908        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
909
910        // Events within gap (1000ms)
911        for ts in [100, 500, 900, 1500] {
912            let event = create_test_event(ts, 1);
913            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
914            operator.process(&event, &mut ctx);
915        }
916
917        // All events should be in the same session
918        assert_eq!(operator.active_session_count(), 1);
919
920        // Verify accumulator
921        let key_hash = SessionWindowOperator::<CountAggregator>::key_hash(&[]);
922        let acc: CountAccumulator = operator.get_accumulator(key_hash, &state);
923        assert_eq!(acc.result(), 4);
924    }
925
926    #[test]
927    fn test_session_gap_creates_new_session() {
928        let aggregator = CountAggregator::new();
929        let mut operator = SessionWindowOperator::with_id(
930            Duration::from_millis(1000),
931            aggregator,
932            Duration::from_millis(0),
933            "test_op".to_string(),
934        );
935        operator.set_emit_strategy(EmitStrategy::OnUpdate);
936
937        let mut timers = TimerService::new();
938        let mut state = InMemoryStore::new();
939        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
940
941        // First session
942        let event1 = create_test_event(100, 1);
943        {
944            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
945            operator.process(&event1, &mut ctx);
946        }
947
948        // Gap > 1000ms, should create new session
949        let event2 = create_test_event(3000, 1);
950        let outputs = {
951            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
952            operator.process(&event2, &mut ctx)
953        };
954
955        // Should have emitted old session (OnUpdate) and new session update
956        let event_count = outputs
957            .iter()
958            .filter(|o| matches!(o, Output::Event(_)))
959            .count();
960        assert!(event_count >= 1);
961    }
962
963    #[test]
964    fn test_session_timer_triggers_emission() {
965        let aggregator = CountAggregator::new();
966        let mut operator = SessionWindowOperator::with_id(
967            Duration::from_millis(1000),
968            aggregator,
969            Duration::from_millis(0),
970            "test_op".to_string(),
971        );
972
973        let mut timers = TimerService::new();
974        let mut state = InMemoryStore::new();
975        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
976
977        // Create session
978        let event = create_test_event(500, 1);
979        {
980            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
981            operator.process(&event, &mut ctx);
982        }
983
984        // Get the pending timer
985        let key_hash = SessionWindowOperator::<CountAggregator>::key_hash(&[]);
986        let timer_time = *operator.pending_timers.get(&key_hash).unwrap();
987
988        // Fire timer
989        let timer = Timer {
990            key: SessionWindowOperator::<CountAggregator>::timer_key(key_hash),
991            timestamp: timer_time,
992        };
993
994        let outputs = {
995            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
996            operator.on_timer(timer, &mut ctx)
997        };
998
999        assert_eq!(outputs.len(), 1);
1000        match &outputs[0] {
1001            Output::Event(e) => {
1002                assert_eq!(e.timestamp, 1500); // 500 + gap (1000)
1003                let result = e
1004                    .data
1005                    .column(2)
1006                    .as_any()
1007                    .downcast_ref::<Int64Array>()
1008                    .unwrap()
1009                    .value(0);
1010                assert_eq!(result, 1);
1011            }
1012            _ => panic!("Expected Event output"),
1013        }
1014
1015        // Session should be cleaned up
1016        assert_eq!(operator.active_session_count(), 0);
1017    }
1018
1019    #[test]
1020    fn test_session_keyed_tracking() {
1021        let aggregator = SumAggregator::new(1); // Sum column 1 (value)
1022        let mut operator = SessionWindowOperator::with_id(
1023            Duration::from_millis(1000),
1024            aggregator,
1025            Duration::from_millis(0),
1026            "test_op".to_string(),
1027        );
1028        operator.set_key_column(0); // Key by column 0
1029
1030        let mut timers = TimerService::new();
1031        let mut state = InMemoryStore::new();
1032        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1033
1034        // Events for key 1
1035        let event1 = create_keyed_event(100, 1, 10);
1036        let event2 = create_keyed_event(500, 1, 20);
1037
1038        // Events for key 2
1039        let event3 = create_keyed_event(200, 2, 100);
1040        let event4 = create_keyed_event(600, 2, 200);
1041
1042        for event in [event1, event2, event3, event4] {
1043            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1044            operator.process(&event, &mut ctx);
1045        }
1046
1047        // Should have 2 active sessions (one per key)
1048        assert_eq!(operator.active_session_count(), 2);
1049    }
1050
1051    #[test]
1052    fn test_session_late_event_dropped() {
1053        let aggregator = CountAggregator::new();
1054        let mut operator = SessionWindowOperator::with_id(
1055            Duration::from_millis(1000),
1056            aggregator,
1057            Duration::from_millis(0),
1058            "test_op".to_string(),
1059        );
1060
1061        let mut timers = TimerService::new();
1062        let mut state = InMemoryStore::new();
1063        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1064
1065        // Advance watermark far ahead
1066        let event1 = create_test_event(10000, 1);
1067        {
1068            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1069            operator.process(&event1, &mut ctx);
1070        }
1071
1072        // Process late event
1073        let late_event = create_test_event(100, 1);
1074        let outputs = {
1075            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1076            operator.process(&late_event, &mut ctx)
1077        };
1078
1079        // Should emit LateEvent
1080        let is_late = outputs.iter().any(|o| matches!(o, Output::LateEvent(_)));
1081        assert!(is_late);
1082        assert_eq!(operator.late_data_metrics().late_events_dropped(), 1);
1083    }
1084
1085    #[test]
1086    fn test_session_late_event_side_output() {
1087        let aggregator = CountAggregator::new();
1088        let mut operator = SessionWindowOperator::with_id(
1089            Duration::from_millis(1000),
1090            aggregator,
1091            Duration::from_millis(0),
1092            "test_op".to_string(),
1093        );
1094        operator.set_late_data_config(LateDataConfig::with_side_output("late".to_string()));
1095
1096        let mut timers = TimerService::new();
1097        let mut state = InMemoryStore::new();
1098        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1099
1100        // Advance watermark
1101        let event1 = create_test_event(10000, 1);
1102        {
1103            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1104            operator.process(&event1, &mut ctx);
1105        }
1106
1107        // Process late event
1108        let late_event = create_test_event(100, 1);
1109        let outputs = {
1110            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1111            operator.process(&late_event, &mut ctx)
1112        };
1113
1114        // Should emit SideOutput
1115        let side_output = outputs.iter().find_map(|o| {
1116            if let Output::SideOutput { name, .. } = o {
1117                Some(name.clone())
1118            } else {
1119                None
1120            }
1121        });
1122        assert_eq!(side_output, Some("late".to_string()));
1123        assert_eq!(operator.late_data_metrics().late_events_side_output(), 1);
1124    }
1125
1126    #[test]
1127    fn test_session_emit_on_update() {
1128        let aggregator = CountAggregator::new();
1129        let mut operator = SessionWindowOperator::with_id(
1130            Duration::from_millis(1000),
1131            aggregator,
1132            Duration::from_millis(0),
1133            "test_op".to_string(),
1134        );
1135        operator.set_emit_strategy(EmitStrategy::OnUpdate);
1136
1137        let mut timers = TimerService::new();
1138        let mut state = InMemoryStore::new();
1139        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1140
1141        let event = create_test_event(500, 1);
1142        let outputs = {
1143            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1144            operator.process(&event, &mut ctx)
1145        };
1146
1147        // Should emit intermediate result
1148        let event_count = outputs
1149            .iter()
1150            .filter(|o| matches!(o, Output::Event(_)))
1151            .count();
1152        assert_eq!(event_count, 1);
1153    }
1154
1155    #[test]
1156    fn test_session_emit_changelog() {
1157        let aggregator = CountAggregator::new();
1158        let mut operator = SessionWindowOperator::with_id(
1159            Duration::from_millis(1000),
1160            aggregator,
1161            Duration::from_millis(0),
1162            "test_op".to_string(),
1163        );
1164        operator.set_emit_strategy(EmitStrategy::Changelog);
1165
1166        let mut timers = TimerService::new();
1167        let mut state = InMemoryStore::new();
1168        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1169
1170        let event = create_test_event(500, 1);
1171        let outputs = {
1172            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1173            operator.process(&event, &mut ctx)
1174        };
1175
1176        // Should emit changelog record
1177        let changelog_count = outputs
1178            .iter()
1179            .filter(|o| matches!(o, Output::Changelog(_)))
1180            .count();
1181        assert_eq!(changelog_count, 1);
1182    }
1183
1184    #[test]
1185    fn test_session_emit_final_drops_late() {
1186        let aggregator = CountAggregator::new();
1187        let mut operator = SessionWindowOperator::with_id(
1188            Duration::from_millis(1000),
1189            aggregator,
1190            Duration::from_millis(0),
1191            "test_op".to_string(),
1192        );
1193        operator.set_emit_strategy(EmitStrategy::Final);
1194
1195        let mut timers = TimerService::new();
1196        let mut state = InMemoryStore::new();
1197        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1198
1199        // Advance watermark
1200        let event1 = create_test_event(10000, 1);
1201        {
1202            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1203            operator.process(&event1, &mut ctx);
1204        }
1205
1206        // Process late event - should be silently dropped
1207        let late_event = create_test_event(100, 1);
1208        let outputs = {
1209            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1210            operator.process(&late_event, &mut ctx)
1211        };
1212
1213        // Should NOT emit LateEvent (dropped silently)
1214        assert!(outputs.is_empty());
1215        assert_eq!(operator.late_data_metrics().late_events_dropped(), 1);
1216    }
1217
1218    #[test]
1219    fn test_session_checkpoint_restore() {
1220        let aggregator = CountAggregator::new();
1221        let mut operator = SessionWindowOperator::with_id(
1222            Duration::from_millis(1000),
1223            aggregator.clone(),
1224            Duration::from_millis(0),
1225            "test_op".to_string(),
1226        );
1227
1228        let mut timers = TimerService::new();
1229        let mut state = InMemoryStore::new();
1230        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1231
1232        // Create some sessions
1233        for ts in [100, 500] {
1234            let event = create_test_event(ts, 1);
1235            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1236            operator.process(&event, &mut ctx);
1237        }
1238
1239        // Checkpoint
1240        let checkpoint = operator.checkpoint();
1241
1242        // Create new operator and restore
1243        let mut restored = SessionWindowOperator::with_id(
1244            Duration::from_millis(1000),
1245            aggregator,
1246            Duration::from_millis(0),
1247            "test_op".to_string(),
1248        );
1249        restored.restore(checkpoint).unwrap();
1250
1251        // Pending timers should be restored
1252        assert_eq!(restored.pending_timers.len(), 1);
1253    }
1254
1255    #[test]
1256    fn test_session_stale_timer_ignored() {
1257        let aggregator = CountAggregator::new();
1258        let mut operator = SessionWindowOperator::with_id(
1259            Duration::from_millis(1000),
1260            aggregator,
1261            Duration::from_millis(0),
1262            "test_op".to_string(),
1263        );
1264
1265        let mut timers = TimerService::new();
1266        let mut state = InMemoryStore::new();
1267        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1268
1269        // Create session
1270        let event1 = create_test_event(500, 1);
1271        {
1272            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1273            operator.process(&event1, &mut ctx);
1274        }
1275
1276        let key_hash = SessionWindowOperator::<CountAggregator>::key_hash(&[]);
1277        let old_timer_time = *operator.pending_timers.get(&key_hash).unwrap();
1278
1279        // Extend session
1280        let event2 = create_test_event(1200, 1);
1281        {
1282            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1283            operator.process(&event2, &mut ctx);
1284        }
1285
1286        // Fire stale timer
1287        let stale_timer = Timer {
1288            key: SessionWindowOperator::<CountAggregator>::timer_key(key_hash),
1289            timestamp: old_timer_time,
1290        };
1291
1292        let outputs = {
1293            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1294            operator.on_timer(stale_timer, &mut ctx)
1295        };
1296
1297        // Stale timer should be ignored
1298        assert!(outputs.is_empty());
1299        // Session should still be active
1300        assert_eq!(operator.active_session_count(), 1);
1301    }
1302
1303    #[test]
1304    fn test_session_window_id() {
1305        let state = SessionState::new(1000, 5000, vec![]);
1306        let window_id = state.window_id();
1307
1308        assert_eq!(window_id.start, 1000);
1309        assert_eq!(window_id.end, 6000);
1310    }
1311
1312    #[test]
1313    fn test_timer_key_roundtrip() {
1314        let key_hash = 0x1234_5678_9ABC_DEF0u64;
1315        let timer_key = SessionWindowOperator::<CountAggregator>::timer_key(key_hash);
1316        let parsed = SessionWindowOperator::<CountAggregator>::key_hash_from_timer(&timer_key);
1317
1318        assert_eq!(parsed, Some(key_hash));
1319    }
1320
1321    #[test]
1322    fn test_timer_key_invalid() {
1323        // Wrong prefix
1324        let invalid1 = vec![0x02, 0, 0, 0, 0, 0, 0, 0, 0];
1325        assert!(SessionWindowOperator::<CountAggregator>::key_hash_from_timer(&invalid1).is_none());
1326
1327        // Wrong length
1328        let invalid2 = vec![SESSION_TIMER_PREFIX, 0, 0, 0];
1329        assert!(SessionWindowOperator::<CountAggregator>::key_hash_from_timer(&invalid2).is_none());
1330    }
1331
1332    #[test]
1333    fn test_session_sum_aggregation() {
1334        let aggregator = SumAggregator::new(0);
1335        let mut operator = SessionWindowOperator::with_id(
1336            Duration::from_millis(1000),
1337            aggregator,
1338            Duration::from_millis(0),
1339            "test_op".to_string(),
1340        );
1341
1342        let mut timers = TimerService::new();
1343        let mut state = InMemoryStore::new();
1344        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1345
1346        // Process events with different values
1347        for (ts, value) in [(100, 10), (500, 20), (800, 30)] {
1348            let event = create_test_event(ts, value);
1349            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1350            operator.process(&event, &mut ctx);
1351        }
1352
1353        // Fire timer
1354        let key_hash = SessionWindowOperator::<SumAggregator>::key_hash(&[]);
1355        let timer_time = *operator.pending_timers.get(&key_hash).unwrap();
1356        let timer = Timer {
1357            key: SessionWindowOperator::<SumAggregator>::timer_key(key_hash),
1358            timestamp: timer_time,
1359        };
1360
1361        let outputs = {
1362            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1363            operator.on_timer(timer, &mut ctx)
1364        };
1365
1366        match &outputs[0] {
1367            Output::Event(e) => {
1368                let result = e
1369                    .data
1370                    .column(2)
1371                    .as_any()
1372                    .downcast_ref::<Int64Array>()
1373                    .unwrap()
1374                    .value(0);
1375                assert_eq!(result, 60); // 10 + 20 + 30
1376            }
1377            _ => panic!("Expected Event output"),
1378        }
1379    }
1380
1381    #[test]
1382    fn test_session_output_schema() {
1383        let schema = create_session_output_schema();
1384
1385        assert_eq!(schema.fields().len(), 3);
1386        assert_eq!(schema.field(0).name(), "session_start");
1387        assert_eq!(schema.field(1).name(), "session_end");
1388        assert_eq!(schema.field(2).name(), "result");
1389    }
1390}