Skip to main content

laminar_core/operator/
window.rs

1//! # Window Operators
2//!
3//! Implementation of various window types for stream processing.
4//!
5//! ## Window Types
6//!
7//! - **Tumbling**: Fixed-size, non-overlapping windows (implemented)
8//! - **Sliding**: Fixed-size, overlapping windows (future)
9//! - **Session**: Dynamic windows based on activity gaps (future)
10//!
11//! ## Emit Strategies
12//!
13//! Windows support different emission strategies via [`EmitStrategy`]:
14//!
15//! - `OnWatermark` (default): Emit results when watermark passes window end
16//! - `Periodic`: Emit intermediate results at fixed intervals
17//! - `OnUpdate`: Emit after every state change (most expensive)
18//!
19//! ## Example
20//!
21//! ```rust,no_run
22//! use laminar_core::operator::window::{
23//!     TumblingWindowAssigner, TumblingWindowOperator, CountAggregator, EmitStrategy,
24//! };
25//! use std::time::Duration;
26//!
27//! // Create a 1-minute tumbling window with count aggregation
28//! let assigner = TumblingWindowAssigner::new(Duration::from_secs(60));
29//! let mut operator = TumblingWindowOperator::new(
30//!     assigner,
31//!     CountAggregator::new(),
32//!     Duration::from_secs(5), // 5 second grace period
33//! );
34//!
35//! // Emit intermediate results every 10 seconds
36//! operator.set_emit_strategy(EmitStrategy::Periodic(Duration::from_secs(10)));
37//! ```
38
39use super::{
40    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
41};
42use crate::state::{StateStore, StateStoreExt};
43use arrow_array::{Array as ArrowArray, Int64Array, RecordBatch};
44use arrow_schema::{DataType, Field, Schema, SchemaRef};
45use rkyv::{
46    api::high::{HighDeserializer, HighSerializer, HighValidator},
47    bytecheck::CheckBytes,
48    rancor::Error as RkyvError,
49    ser::allocator::ArenaHandle,
50    util::AlignedVec,
51    Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
52};
53use smallvec::SmallVec;
54use std::marker::PhantomData;
55use std::sync::atomic::{AtomicU64, Ordering};
56use std::sync::Arc;
57use std::time::Duration;
58
/// Policy describing what happens to events that arrive too late.
///
/// An event is late once the window it belongs to has already closed, i.e.
/// once the watermark has moved past `window_end + allowed_lateness`. Such
/// events are either discarded or redirected to a named side output.
///
/// # Example
///
/// ```rust,no_run
/// use laminar_core::operator::window::LateDataConfig;
///
/// // Route late events to a side output called "late_events"
/// let config = LateDataConfig::with_side_output("late_events".to_string());
///
/// // Drop late events (default behavior)
/// let config = LateDataConfig::drop();
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LateDataConfig {
    /// Side output that receives late events; `None` means they are discarded.
    side_output: Option<String>,
}

impl LateDataConfig {
    /// Builds the default policy, under which late events are discarded.
    #[must_use]
    pub fn drop() -> Self {
        Self::default()
    }

    /// Builds a policy that forwards late events to the side output `name`.
    #[must_use]
    pub fn with_side_output(name: String) -> Self {
        let side_output = Some(name);
        Self { side_output }
    }

    /// Name of the configured side output, or `None` when late events are dropped.
    #[must_use]
    pub fn side_output(&self) -> Option<&str> {
        self.side_output.as_deref()
    }

    /// `true` when no side output is configured, meaning late events are discarded.
    #[must_use]
    pub fn should_drop(&self) -> bool {
        self.side_output().is_none()
    }
}
109
/// Counters describing how late events have been handled.
///
/// Each `record_*` call increments `late_events_total` together with exactly
/// one outcome counter (dropped or side output). As long as only these methods
/// mutate the tracker, the total equals the sum of the two outcome counters.
/// Intended for monitoring and alerting.
#[derive(Debug, Clone, Default)]
// Every field intentionally shares the `late_events_` prefix.
#[allow(clippy::struct_field_names)]
pub struct LateDataMetrics {
    /// All late events seen, regardless of outcome.
    late_events_total: u64,
    /// Late events discarded because no side output was configured.
    late_events_dropped: u64,
    /// Late events forwarded to the side output.
    late_events_side_output: u64,
}

impl LateDataMetrics {
    /// Returns a tracker whose counters all start at zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            late_events_total: 0,
            late_events_dropped: 0,
            late_events_side_output: 0,
        }
    }

    /// Total late events recorded so far.
    #[must_use]
    pub fn late_events_total(&self) -> u64 {
        self.late_events_total
    }

    /// Late events that were discarded.
    #[must_use]
    pub fn late_events_dropped(&self) -> u64 {
        self.late_events_dropped
    }

    /// Late events that were forwarded to the side output.
    #[must_use]
    pub fn late_events_side_output(&self) -> u64 {
        self.late_events_side_output
    }

    /// Counts one late event that was discarded.
    pub fn record_dropped(&mut self) {
        self.late_events_total += 1;
        self.late_events_dropped += 1;
    }

    /// Counts one late event that was forwarded to the side output.
    pub fn record_side_output(&mut self) {
        self.late_events_total += 1;
        self.late_events_side_output += 1;
    }

    /// Sets every counter back to zero.
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}
169
/// Controls when a window operator emits its results.
///
/// The variants trade result freshness against processing cost:
/// - `OnWatermark`: cheapest, but highest latency
/// - `Periodic`: a middle ground between freshness and cost
/// - `OnUpdate`: lowest latency, highest overhead
/// - `OnWindowClose` (F011B): intended for append-only sinks
/// - `Changelog` (F011B): emits Z-set weighted records for CDC
/// - `Final` (F011B): suppresses every intermediate result
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum EmitStrategy {
    // --- Original strategies (F011) ---
    /// Emit final results once the watermark passes the window end (default).
    ///
    /// Only one emission per window is produced, which makes this the most
    /// efficient option. Results are complete up to the allowed-lateness
    /// bound; late data arriving within that bound may trigger retractions.
    #[default]
    OnWatermark,

    /// Emit intermediate results on a fixed interval.
    ///
    /// Suited to dashboards and monitoring that need updates before a window
    /// closes; the final result is still emitted on watermark. The wrapped
    /// `Duration` is the time between periodic emissions.
    Periodic(Duration),

    /// Emit an updated result after every state change.
    ///
    /// Every incoming event triggers an emission: lowest latency, highest
    /// overhead. Use with care on high-volume streams.
    OnUpdate,

    // --- Added in F011B ---
    /// Emit only when the watermark passes the window end, never earlier.
    ///
    /// **Required for append-only sinks** (Kafka, S3, Delta Lake, Iceberg).
    /// In contrast to `OnWatermark`, which may emit retractions for late data,
    /// this strategy never emits before the window closes. Late data is
    /// buffered until the next window close, and only final results are emitted.
    ///
    /// SQL: `EMIT ON WINDOW CLOSE`
    OnWindowClose,

    /// Emit changelog records carrying Z-set weights.
    ///
    /// Each emission states its operation and weight:
    /// - insert: `+1`
    /// - delete: `-1`
    /// - update: a retraction pair (`-1` for the old value, `+1` for the new)
    ///
    /// Needed by CDC pipelines, cascading materialized views (F060), and any
    /// downstream consumer that tracks changes.
    ///
    /// SQL: `EMIT CHANGES`
    Changelog,

    /// Suppress every intermediate result and emit only finalized ones.
    ///
    /// Like `OnWindowClose`, but it also suppresses periodic emissions (even if
    /// `Periodic` was configured elsewhere) and late-data retractions. Late data
    /// is dropped entirely once the window has closed. Intended for BI
    /// reporting, where only exact final results matter.
    ///
    /// SQL: `EMIT FINAL`
    Final,
}
246
247impl EmitStrategy {
248    /// Returns true if this strategy requires periodic timer registration.
249    #[must_use]
250    pub fn needs_periodic_timer(&self) -> bool {
251        matches!(self, Self::Periodic(_))
252    }
253
254    /// Returns the periodic interval if this is a periodic strategy.
255    #[must_use]
256    pub fn periodic_interval(&self) -> Option<Duration> {
257        match self {
258            Self::Periodic(d) => Some(*d),
259            _ => None,
260        }
261    }
262
263    /// Returns true if results should be emitted on every update.
264    #[must_use]
265    pub fn emits_on_update(&self) -> bool {
266        matches!(self, Self::OnUpdate)
267    }
268
269    // === F011B Helper Methods ===
270
271    /// Returns true if this strategy emits intermediate results.
272    ///
273    /// Strategies that emit intermediate results (before window close):
274    /// - `OnUpdate`: emits after every state change
275    /// - `Periodic`: emits at fixed intervals
276    ///
277    /// Strategies that do NOT emit intermediate results:
278    /// - `OnWatermark`: waits for watermark
279    /// - `OnWindowClose`: only emits when window closes
280    /// - `Changelog`: depends on trigger, but typically on watermark
281    /// - `Final`: only emits final result
282    #[must_use]
283    pub fn emits_intermediate(&self) -> bool {
284        matches!(self, Self::OnUpdate | Self::Periodic(_))
285    }
286
287    /// Returns true if this strategy requires changelog/Z-set support.
288    ///
289    /// The `Changelog` strategy requires the operator to track previous
290    /// values and emit insert/delete/update records with weights.
291    #[must_use]
292    pub fn requires_changelog(&self) -> bool {
293        matches!(self, Self::Changelog)
294    }
295
296    /// Returns true if this strategy is suitable for append-only sinks.
297    ///
298    /// Append-only sinks (Kafka, S3, Delta Lake, Iceberg) cannot handle
299    /// retractions or updates. Only these strategies are safe:
300    /// - `OnWindowClose`: guarantees single emission per window
301    /// - `Final`: suppresses all intermediate results
302    #[must_use]
303    pub fn is_append_only_compatible(&self) -> bool {
304        matches!(self, Self::OnWindowClose | Self::Final)
305    }
306
307    /// Returns true if late data should generate retractions.
308    ///
309    /// Strategies that generate retractions for late data:
310    /// - `OnWatermark`: may retract previous result
311    /// - `OnUpdate`: immediately emits updated result
312    /// - `Changelog`: emits -old/+new pair
313    ///
314    /// Strategies that do NOT generate retractions:
315    /// - `OnWindowClose`: buffers late data
316    /// - `Final`: drops late data
317    /// - `Periodic`: depends on whether window is still open
318    #[must_use]
319    pub fn generates_retractions(&self) -> bool {
320        matches!(self, Self::OnWatermark | Self::OnUpdate | Self::Changelog)
321    }
322
323    /// Returns true if this strategy should suppress intermediate emissions.
324    ///
325    /// Used to override periodic timers when a suppressing strategy is active.
326    #[must_use]
327    pub fn suppresses_intermediate(&self) -> bool {
328        matches!(self, Self::OnWindowClose | Self::Final)
329    }
330
331    /// Returns true if late data should be dropped entirely.
332    ///
333    /// The `Final` strategy drops late data to ensure only exact,
334    /// finalized results are emitted.
335    #[must_use]
336    pub fn drops_late_data(&self) -> bool {
337        matches!(self, Self::Final)
338    }
339}
340
341/// Unique identifier for a window.
342///
343/// Windows are identified by their start and end timestamps (in milliseconds).
344/// For tumbling windows, these are non-overlapping intervals.
345#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Archive, RkyvSerialize, RkyvDeserialize)]
346pub struct WindowId {
347    /// Window start timestamp (inclusive, in milliseconds)
348    pub start: i64,
349    /// Window end timestamp (exclusive, in milliseconds)
350    pub end: i64,
351}
352
353impl WindowId {
354    /// Creates a new window ID.
355    #[must_use]
356    pub fn new(start: i64, end: i64) -> Self {
357        Self { start, end }
358    }
359
360    /// Returns the window duration in milliseconds.
361    #[must_use]
362    pub fn duration_ms(&self) -> i64 {
363        self.end - self.start
364    }
365
366    /// Converts the window ID to a byte key for state storage.
367    ///
368    /// Uses `TimerKey` (`SmallVec`) which stores the 16-byte key inline,
369    /// avoiding heap allocation on the hot path.
370    #[inline]
371    #[must_use]
372    pub fn to_key(&self) -> super::TimerKey {
373        super::TimerKey::from(self.to_key_inline())
374    }
375
376    /// Converts the window ID to a stack-allocated byte key.
377    ///
378    /// This is the zero-allocation version for Ring 0 hot path operations.
379    /// Returns a fixed-size array that can be used directly with state stores.
380    #[inline]
381    #[must_use]
382    pub fn to_key_inline(&self) -> [u8; 16] {
383        let mut key = [0u8; 16];
384        key[..8].copy_from_slice(&self.start.to_be_bytes());
385        key[8..16].copy_from_slice(&self.end.to_be_bytes());
386        key
387    }
388
389    /// Parses a window ID from a byte key.
390    ///
391    /// # Errors
392    ///
393    /// Returns `None` if the key is not exactly 16 bytes.
394    #[must_use]
395    pub fn from_key(key: &[u8]) -> Option<Self> {
396        if key.len() != 16 {
397            return None;
398        }
399        let start = i64::from_be_bytes(key[0..8].try_into().ok()?);
400        let end = i64::from_be_bytes(key[8..16].try_into().ok()?);
401        Some(Self { start, end })
402    }
403}
404
405/// Collection type for window assignments.
406///
407/// Uses `SmallVec` to avoid heap allocation for common cases:
408/// - 1 window: tumbling windows (most common)
409/// - 2-4 windows: sliding windows with small overlap
410pub type WindowIdVec = SmallVec<[WindowId; 4]>;
411
412// === F011B: Changelog/Z-Set Support ===
413
414/// CDC operation type for changelog records.
415///
416/// These map to Z-set weights:
417/// - `Insert`: +1 weight
418/// - `Delete`: -1 weight
419/// - `UpdateBefore`: -1 weight (first half of update)
420/// - `UpdateAfter`: +1 weight (second half of update)
421#[derive(Debug, Clone, Copy, PartialEq, Eq, Archive, RkyvSerialize, RkyvDeserialize)]
422pub enum CdcOperation {
423    /// Insert a new record (+1 weight)
424    Insert,
425    /// Delete an existing record (-1 weight)
426    Delete,
427    /// Retraction of previous value before update (-1 weight)
428    UpdateBefore,
429    /// New value after update (+1 weight)
430    UpdateAfter,
431}
432
433impl CdcOperation {
434    /// Returns the Z-set weight for this operation.
435    ///
436    /// - Insert/UpdateAfter: +1
437    /// - Delete/UpdateBefore: -1
438    #[must_use]
439    pub fn weight(&self) -> i32 {
440        match self {
441            Self::Insert | Self::UpdateAfter => 1,
442            Self::Delete | Self::UpdateBefore => -1,
443        }
444    }
445
446    /// Returns true if this is an insert-type operation.
447    #[must_use]
448    pub fn is_insert(&self) -> bool {
449        matches!(self, Self::Insert | Self::UpdateAfter)
450    }
451
452    /// Returns true if this is a delete-type operation.
453    #[must_use]
454    pub fn is_delete(&self) -> bool {
455        matches!(self, Self::Delete | Self::UpdateBefore)
456    }
457
458    /// Returns the Debezium-compatible operation code.
459    ///
460    /// - 'c': create (insert)
461    /// - 'd': delete
462    /// - 'u': update (used for both before/after in Debezium)
463    #[must_use]
464    pub fn debezium_op(&self) -> char {
465        match self {
466            Self::Insert => 'c',
467            Self::Delete => 'd',
468            Self::UpdateBefore | Self::UpdateAfter => 'u',
469        }
470    }
471
472    /// Converts the operation to a u8 for compact storage.
473    ///
474    /// Used by `ChangelogRef` to store operation type in a single byte.
475    #[inline]
476    #[must_use]
477    pub fn to_u8(self) -> u8 {
478        match self {
479            Self::Insert => 0,
480            Self::Delete => 1,
481            Self::UpdateBefore => 2,
482            Self::UpdateAfter => 3,
483        }
484    }
485
486    /// Converts from u8 (defaults to Insert for unknown values).
487    #[inline]
488    #[must_use]
489    pub fn from_u8(value: u8) -> Self {
490        match value {
491            1 => Self::Delete,
492            2 => Self::UpdateBefore,
493            3 => Self::UpdateAfter,
494            // 0 and unknown values default to Insert
495            _ => Self::Insert,
496        }
497    }
498}
499
500/// A changelog record with Z-set weight for CDC pipelines.
501///
502/// This wraps an event with metadata needed for change data capture:
503/// - Operation type (insert/delete/update)
504/// - Z-set weight (+1/-1)
505/// - Timestamp of the change
506///
507/// Used by `EmitStrategy::Changelog` to emit structured change records
508/// that can be consumed by downstream systems expecting CDC format.
509///
510/// # Example
511///
512/// ```rust,no_run
513/// use laminar_core::operator::window::{ChangelogRecord, CdcOperation};
514/// use laminar_core::operator::Event;
515/// # use std::sync::Arc;
516/// # use arrow_array::RecordBatch;
517/// # use arrow_schema::Schema;
518/// # let schema = Arc::new(Schema::empty());
519/// # let batch = RecordBatch::new_empty(schema);
520/// # let event = Event::new(0, batch.clone());
521/// # let old_event = event.clone();
522/// # let new_event = event.clone();
523///
524/// // Create an insert record
525/// let record = ChangelogRecord::insert(event, 1000);
526/// assert_eq!(record.operation, CdcOperation::Insert);
527/// assert_eq!(record.weight, 1);
528///
529/// // Create a retraction pair for an update
530/// let (before, after) = ChangelogRecord::update(old_event, new_event, 1000);
531/// assert_eq!(before.weight, -1);  // Retract old
532/// assert_eq!(after.weight, 1);    // Insert new
533/// ```
534#[derive(Debug, Clone)]
535pub struct ChangelogRecord {
536    /// The CDC operation type
537    pub operation: CdcOperation,
538    /// Z-set weight (+1 for insert, -1 for delete)
539    pub weight: i32,
540    /// Timestamp when this change was emitted
541    pub emit_timestamp: i64,
542    /// The event data
543    pub event: Event,
544}
545
546impl ChangelogRecord {
547    /// Creates an insert changelog record.
548    #[must_use]
549    pub fn insert(event: Event, emit_timestamp: i64) -> Self {
550        Self {
551            operation: CdcOperation::Insert,
552            weight: 1,
553            emit_timestamp,
554            event,
555        }
556    }
557
558    /// Creates a delete changelog record.
559    #[must_use]
560    pub fn delete(event: Event, emit_timestamp: i64) -> Self {
561        Self {
562            operation: CdcOperation::Delete,
563            weight: -1,
564            emit_timestamp,
565            event,
566        }
567    }
568
569    /// Creates an update retraction pair (before and after records).
570    ///
571    /// Returns a tuple of (`UpdateBefore`, `UpdateAfter`) records.
572    /// The first should be emitted before the second to properly
573    /// retract the old value.
574    #[must_use]
575    pub fn update(old_event: Event, new_event: Event, emit_timestamp: i64) -> (Self, Self) {
576        let before = Self {
577            operation: CdcOperation::UpdateBefore,
578            weight: -1,
579            emit_timestamp,
580            event: old_event,
581        };
582        let after = Self {
583            operation: CdcOperation::UpdateAfter,
584            weight: 1,
585            emit_timestamp,
586            event: new_event,
587        };
588        (before, after)
589    }
590
591    /// Creates a changelog record from raw parts.
592    #[must_use]
593    pub fn new(operation: CdcOperation, event: Event, emit_timestamp: i64) -> Self {
594        Self {
595            operation,
596            weight: operation.weight(),
597            emit_timestamp,
598            event,
599        }
600    }
601
602    /// Returns true if this is an insert-type record.
603    #[must_use]
604    pub fn is_insert(&self) -> bool {
605        self.operation.is_insert()
606    }
607
608    /// Returns true if this is a delete-type record.
609    #[must_use]
610    pub fn is_delete(&self) -> bool {
611        self.operation.is_delete()
612    }
613}
614
615/// Trait for assigning events to windows.
616pub trait WindowAssigner: Send {
617    /// Assigns an event timestamp to zero or more windows.
618    ///
619    /// For tumbling windows, this returns exactly one window.
620    /// For sliding windows, this may return multiple windows.
621    fn assign_windows(&self, timestamp: i64) -> WindowIdVec;
622
623    /// Returns the maximum timestamp that could still be assigned to a window
624    /// ending at `window_end`.
625    ///
626    /// Used for determining when a window can be safely triggered.
627    fn max_timestamp(&self, window_end: i64) -> i64 {
628        window_end - 1
629    }
630}
631
632/// Tumbling window assigner.
633///
634/// Assigns each event to exactly one non-overlapping window based on its timestamp.
635/// Windows are aligned to epoch (timestamp 0).
636#[derive(Debug, Clone)]
637pub struct TumblingWindowAssigner {
638    /// Window size in milliseconds
639    size_ms: i64,
640}
641
642impl TumblingWindowAssigner {
643    /// Creates a new tumbling window assigner.
644    ///
645    /// # Arguments
646    ///
647    /// * `size` - The duration of each window
648    ///
649    /// # Panics
650    ///
651    /// Panics if the size is zero.
652    #[must_use]
653    pub fn new(size: Duration) -> Self {
654        // Ensure window size fits in i64 and is positive
655        let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
656        assert!(size_ms > 0, "Window size must be positive");
657        Self { size_ms }
658    }
659
660    /// Creates a new tumbling window assigner with size in milliseconds.
661    ///
662    /// # Panics
663    ///
664    /// Panics if the size is zero or negative.
665    #[must_use]
666    pub fn from_millis(size_ms: i64) -> Self {
667        assert!(size_ms > 0, "Window size must be positive");
668        Self { size_ms }
669    }
670
671    /// Returns the window size in milliseconds.
672    #[must_use]
673    pub fn size_ms(&self) -> i64 {
674        self.size_ms
675    }
676
677    /// Assigns a timestamp to a window.
678    ///
679    /// This is the core window assignment function with O(1) complexity.
680    #[inline]
681    #[must_use]
682    pub fn assign(&self, timestamp: i64) -> WindowId {
683        // Handle negative timestamps correctly
684        let window_start = if timestamp >= 0 {
685            (timestamp / self.size_ms) * self.size_ms
686        } else {
687            // For negative timestamps, we need to floor divide
688            ((timestamp - self.size_ms + 1) / self.size_ms) * self.size_ms
689        };
690        let window_end = window_start + self.size_ms;
691        WindowId::new(window_start, window_end)
692    }
693}
694
695impl WindowAssigner for TumblingWindowAssigner {
696    #[inline]
697    fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
698        let mut windows = WindowIdVec::new();
699        windows.push(self.assign(timestamp));
700        windows
701    }
702}
703
/// Converts an aggregation result into an `i64`.
///
/// Needed to build Arrow `RecordBatch` outputs that hold numeric results.
pub trait ResultToI64 {
    /// Returns the result as an `i64`.
    fn to_i64(&self) -> i64;
}

/// Values above `i64::MAX` saturate to `i64::MAX`.
impl ResultToI64 for u64 {
    fn to_i64(&self) -> i64 {
        (*self).try_into().unwrap_or(i64::MAX)
    }
}

/// Identity conversion.
impl ResultToI64 for i64 {
    fn to_i64(&self) -> i64 {
        *self
    }
}

/// A missing result (for example MIN/MAX over no values) is reported as 0.
impl ResultToI64 for Option<i64> {
    fn to_i64(&self) -> i64 {
        self.unwrap_or_default()
    }
}

/// Float results are truncated toward zero, following standard SQL
/// float-to-int behavior. A missing result is reported as 0.
impl ResultToI64 for Option<f64> {
    // `as` truncates toward zero, saturates values outside the `i64` range and
    // maps NaN to 0, so this conversion never panics.
    #[allow(clippy::cast_possible_truncation)]
    fn to_i64(&self) -> i64 {
        match *self {
            Some(value) => value as i64,
            None => 0,
        }
    }
}
737
738/// Accumulator state for aggregations.
739///
740/// This is the state stored per window in the state store.
741/// Different aggregators store different types of accumulators.
742///
743/// Implementors should derive `rkyv::Archive`, `rkyv::Serialize`, and
744/// `rkyv::Deserialize` for zero-copy serialization on the hot path.
745pub trait Accumulator: Default + Clone + Send {
746    /// The input type for the aggregation.
747    type Input;
748    /// The output type produced by the aggregation.
749    type Output: ResultToI64;
750
751    /// Adds a value to the accumulator.
752    fn add(&mut self, value: Self::Input);
753
754    /// Merges another accumulator into this one.
755    fn merge(&mut self, other: &Self);
756
757    /// Extracts the final result from the accumulator.
758    fn result(&self) -> Self::Output;
759
760    /// Returns true if the accumulator is empty (no values added).
761    fn is_empty(&self) -> bool;
762}
763
764/// Trait for window aggregation functions.
765///
766/// Aggregators define how events are combined within a window.
767/// They must be serializable for checkpointing.
768pub trait Aggregator: Send + Clone {
769    /// The accumulator type used by this aggregator.
770    type Acc: Accumulator;
771
772    /// Creates a new empty accumulator.
773    fn create_accumulator(&self) -> Self::Acc;
774
775    /// Extracts a value from an event to be aggregated.
776    ///
777    /// Returns `None` if the event should be skipped.
778    fn extract(&self, event: &Event) -> Option<<Self::Acc as Accumulator>::Input>;
779}
780
781/// Count aggregator - counts the number of events in a window.
782#[derive(Debug, Clone, Default)]
783pub struct CountAggregator;
784
785/// Accumulator for count aggregation.
786#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
787pub struct CountAccumulator {
788    count: u64,
789}
790
791impl CountAggregator {
792    /// Creates a new count aggregator.
793    #[must_use]
794    pub fn new() -> Self {
795        Self
796    }
797}
798
799impl Accumulator for CountAccumulator {
800    type Input = ();
801    type Output = u64;
802
803    fn add(&mut self, _value: ()) {
804        self.count += 1;
805    }
806
807    fn merge(&mut self, other: &Self) {
808        self.count += other.count;
809    }
810
811    fn result(&self) -> u64 {
812        self.count
813    }
814
815    fn is_empty(&self) -> bool {
816        self.count == 0
817    }
818}
819
820impl Aggregator for CountAggregator {
821    type Acc = CountAccumulator;
822
823    fn create_accumulator(&self) -> CountAccumulator {
824        CountAccumulator::default()
825    }
826
827    fn extract(&self, _event: &Event) -> Option<()> {
828        Some(())
829    }
830}
831
832/// Sum aggregator - sums i64 values from events.
833#[derive(Debug, Clone)]
834pub struct SumAggregator {
835    /// Column index to sum (0-based)
836    column_index: usize,
837}
838
839/// Accumulator for sum aggregation.
840#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
841pub struct SumAccumulator {
842    sum: i64,
843    count: u64,
844}
845
846impl SumAggregator {
847    /// Creates a new sum aggregator for the specified column.
848    #[must_use]
849    pub fn new(column_index: usize) -> Self {
850        Self { column_index }
851    }
852}
853
854impl Accumulator for SumAccumulator {
855    type Input = i64;
856    type Output = i64;
857
858    fn add(&mut self, value: i64) {
859        self.sum += value;
860        self.count += 1;
861    }
862
863    fn merge(&mut self, other: &Self) {
864        self.sum += other.sum;
865        self.count += other.count;
866    }
867
868    fn result(&self) -> i64 {
869        self.sum
870    }
871
872    fn is_empty(&self) -> bool {
873        self.count == 0
874    }
875}
876
877impl Aggregator for SumAggregator {
878    type Acc = SumAccumulator;
879
880    fn create_accumulator(&self) -> SumAccumulator {
881        SumAccumulator::default()
882    }
883
884    fn extract(&self, event: &Event) -> Option<i64> {
885        use arrow_array::cast::AsArray;
886        use arrow_array::types::Int64Type;
887
888        let batch = &event.data;
889        if self.column_index >= batch.num_columns() {
890            return None;
891        }
892
893        let column = batch.column(self.column_index);
894        let array = column.as_primitive_opt::<Int64Type>()?;
895
896        // Sum all values in the array
897        Some(array.iter().flatten().sum())
898    }
899}
900
901/// Min aggregator - tracks minimum i64 value.
902#[derive(Debug, Clone)]
903pub struct MinAggregator {
904    column_index: usize,
905}
906
907/// Accumulator for min aggregation.
908#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
909pub struct MinAccumulator {
910    min: Option<i64>,
911}
912
913impl MinAggregator {
914    /// Creates a new min aggregator for the specified column.
915    #[must_use]
916    pub fn new(column_index: usize) -> Self {
917        Self { column_index }
918    }
919}
920
921impl Accumulator for MinAccumulator {
922    type Input = i64;
923    type Output = Option<i64>;
924
925    fn add(&mut self, value: i64) {
926        self.min = Some(self.min.map_or(value, |m| m.min(value)));
927    }
928
929    fn merge(&mut self, other: &Self) {
930        if let Some(other_min) = other.min {
931            self.add(other_min);
932        }
933    }
934
935    fn result(&self) -> Option<i64> {
936        self.min
937    }
938
939    fn is_empty(&self) -> bool {
940        self.min.is_none()
941    }
942}
943
944impl Aggregator for MinAggregator {
945    type Acc = MinAccumulator;
946
947    fn create_accumulator(&self) -> MinAccumulator {
948        MinAccumulator::default()
949    }
950
951    fn extract(&self, event: &Event) -> Option<i64> {
952        use arrow_array::cast::AsArray;
953        use arrow_array::types::Int64Type;
954
955        let batch = &event.data;
956        if self.column_index >= batch.num_columns() {
957            return None;
958        }
959
960        let column = batch.column(self.column_index);
961        let array = column.as_primitive_opt::<Int64Type>()?;
962
963        array.iter().flatten().min()
964    }
965}
966
967/// Max aggregator - tracks maximum i64 value.
968#[derive(Debug, Clone)]
969pub struct MaxAggregator {
970    column_index: usize,
971}
972
973/// Accumulator for max aggregation.
974#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
975pub struct MaxAccumulator {
976    max: Option<i64>,
977}
978
979impl MaxAggregator {
980    /// Creates a new max aggregator for the specified column.
981    #[must_use]
982    pub fn new(column_index: usize) -> Self {
983        Self { column_index }
984    }
985}
986
987impl Accumulator for MaxAccumulator {
988    type Input = i64;
989    type Output = Option<i64>;
990
991    fn add(&mut self, value: i64) {
992        self.max = Some(self.max.map_or(value, |m| m.max(value)));
993    }
994
995    fn merge(&mut self, other: &Self) {
996        if let Some(other_max) = other.max {
997            self.add(other_max);
998        }
999    }
1000
1001    fn result(&self) -> Option<i64> {
1002        self.max
1003    }
1004
1005    fn is_empty(&self) -> bool {
1006        self.max.is_none()
1007    }
1008}
1009
1010impl Aggregator for MaxAggregator {
1011    type Acc = MaxAccumulator;
1012
1013    fn create_accumulator(&self) -> MaxAccumulator {
1014        MaxAccumulator::default()
1015    }
1016
1017    fn extract(&self, event: &Event) -> Option<i64> {
1018        use arrow_array::cast::AsArray;
1019        use arrow_array::types::Int64Type;
1020
1021        let batch = &event.data;
1022        if self.column_index >= batch.num_columns() {
1023            return None;
1024        }
1025
1026        let column = batch.column(self.column_index);
1027        let array = column.as_primitive_opt::<Int64Type>()?;
1028
1029        array.iter().flatten().max()
1030    }
1031}
1032
/// AVG aggregator: computes the arithmetic mean of `i64` values from one column.
#[derive(Debug, Clone)]
pub struct AvgAggregator {
    /// Index of the `Int64` column read from each event batch.
    column_index: usize,
}
1038
1039/// Accumulator for average aggregation.
1040#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1041pub struct AvgAccumulator {
1042    sum: i64,
1043    count: u64,
1044}
1045
1046impl AvgAggregator {
1047    /// Creates a new average aggregator for the specified column.
1048    #[must_use]
1049    pub fn new(column_index: usize) -> Self {
1050        Self { column_index }
1051    }
1052}
1053
1054impl Accumulator for AvgAccumulator {
1055    type Input = i64;
1056    type Output = Option<f64>;
1057
1058    fn add(&mut self, value: i64) {
1059        self.sum += value;
1060        self.count += 1;
1061    }
1062
1063    fn merge(&mut self, other: &Self) {
1064        self.sum += other.sum;
1065        self.count += other.count;
1066    }
1067
1068    // Precision loss is acceptable for arithmetic mean
1069    #[allow(clippy::cast_precision_loss)]
1070    fn result(&self) -> Option<f64> {
1071        if self.count == 0 {
1072            None
1073        } else {
1074            Some(self.sum as f64 / self.count as f64)
1075        }
1076    }
1077
1078    fn is_empty(&self) -> bool {
1079        self.count == 0
1080    }
1081}
1082
1083impl Aggregator for AvgAggregator {
1084    type Acc = AvgAccumulator;
1085
1086    fn create_accumulator(&self) -> AvgAccumulator {
1087        AvgAccumulator::default()
1088    }
1089
1090    fn extract(&self, event: &Event) -> Option<i64> {
1091        use arrow_array::cast::AsArray;
1092        use arrow_array::types::Int64Type;
1093
1094        let batch = &event.data;
1095        if self.column_index >= batch.num_columns() {
1096            return None;
1097        }
1098
1099        let column = batch.column(self.column_index);
1100        let array = column.as_primitive_opt::<Int64Type>()?;
1101
1102        // For average, we add each value individually
1103        array.iter().flatten().next()
1104    }
1105}
1106
1107// FIRST_VALUE / LAST_VALUE Aggregators (F059)
1108
/// `FIRST_VALUE` aggregator: yields the window's value with the earliest
/// event timestamp.
///
/// Ordering is by the event-time column, not by arrival order, so results do
/// not depend on the order in which events reach the operator. If two values
/// share the earliest timestamp, the one that was added first is kept.
///
/// # Example
///
/// ```rust,no_run
/// use laminar_core::operator::window::FirstValueAggregator;
///
/// // Track first price by timestamp
/// let first_price = FirstValueAggregator::new(0, 1); // price col 0, timestamp col 1
/// ```
#[derive(Debug, Clone)]
pub struct FirstValueAggregator {
    /// Index of the `Int64` column holding the reported value.
    value_column_index: usize,
    /// Index of the `Int64` event-time column used for ordering.
    timestamp_column_index: usize,
}
1129
1130/// Accumulator for `FIRST_VALUE` aggregation.
1131///
1132/// Stores the value with the earliest timestamp seen so far.
1133#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1134#[rkyv(compare(PartialEq), derive(Debug))]
1135pub struct FirstValueAccumulator {
1136    /// The first value seen (None if no values yet)
1137    value: Option<i64>,
1138    /// Timestamp of the first value (for merge ordering)
1139    timestamp: Option<i64>,
1140}
1141
1142impl FirstValueAggregator {
1143    /// Creates a new `FIRST_VALUE` aggregator.
1144    ///
1145    /// # Arguments
1146    ///
1147    /// * `value_column_index` - Column to extract value from
1148    /// * `timestamp_column_index` - Column for event timestamp ordering
1149    #[must_use]
1150    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1151        Self {
1152            value_column_index,
1153            timestamp_column_index,
1154        }
1155    }
1156}
1157
1158impl Accumulator for FirstValueAccumulator {
1159    type Input = (i64, i64); // (value, timestamp)
1160    type Output = Option<i64>;
1161
1162    fn add(&mut self, (value, timestamp): (i64, i64)) {
1163        match self.timestamp {
1164            None => {
1165                // First value
1166                self.value = Some(value);
1167                self.timestamp = Some(timestamp);
1168            }
1169            Some(existing_ts) if timestamp < existing_ts => {
1170                // Earlier timestamp - replace
1171                self.value = Some(value);
1172                self.timestamp = Some(timestamp);
1173            }
1174            _ => {
1175                // Later or equal timestamp - keep existing
1176            }
1177        }
1178    }
1179
1180    fn merge(&mut self, other: &Self) {
1181        match (self.timestamp, other.timestamp) {
1182            (None, Some(_)) => {
1183                self.value = other.value;
1184                self.timestamp = other.timestamp;
1185            }
1186            (Some(self_ts), Some(other_ts)) if other_ts < self_ts => {
1187                self.value = other.value;
1188                self.timestamp = other.timestamp;
1189            }
1190            _ => {
1191                // Keep self
1192            }
1193        }
1194    }
1195
1196    fn result(&self) -> Option<i64> {
1197        self.value
1198    }
1199
1200    fn is_empty(&self) -> bool {
1201        self.value.is_none()
1202    }
1203}
1204
1205impl Aggregator for FirstValueAggregator {
1206    type Acc = FirstValueAccumulator;
1207
1208    fn create_accumulator(&self) -> FirstValueAccumulator {
1209        FirstValueAccumulator::default()
1210    }
1211
1212    fn extract(&self, event: &Event) -> Option<(i64, i64)> {
1213        use arrow_array::cast::AsArray;
1214        use arrow_array::types::Int64Type;
1215
1216        let batch = &event.data;
1217        if self.value_column_index >= batch.num_columns()
1218            || self.timestamp_column_index >= batch.num_columns()
1219        {
1220            return None;
1221        }
1222
1223        // Extract value
1224        let value_col = batch.column(self.value_column_index);
1225        let value_array = value_col.as_primitive_opt::<Int64Type>()?;
1226        let value = value_array.iter().flatten().next()?;
1227
1228        // Extract timestamp
1229        let ts_col = batch.column(self.timestamp_column_index);
1230        let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1231        let timestamp = ts_array.iter().flatten().next()?;
1232
1233        Some((value, timestamp))
1234    }
1235}
1236
/// `LAST_VALUE` aggregator: yields the window's value with the latest event
/// timestamp.
///
/// Ordering is by the event-time column, not by arrival order, so results do
/// not depend on the order in which events reach the operator. If two values
/// share the latest timestamp, the one that arrived later wins.
///
/// # Example
///
/// ```rust,no_run
/// use laminar_core::operator::window::LastValueAggregator;
///
/// // Track last (closing) price by timestamp
/// let last_price = LastValueAggregator::new(0, 1); // price col 0, timestamp col 1
/// ```
#[derive(Debug, Clone)]
pub struct LastValueAggregator {
    /// Index of the `Int64` column holding the reported value.
    value_column_index: usize,
    /// Index of the `Int64` event-time column used for ordering.
    timestamp_column_index: usize,
}
1258
1259/// Accumulator for `LAST_VALUE` aggregation.
1260///
1261/// Stores the value with the latest timestamp seen so far.
1262#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1263#[rkyv(compare(PartialEq), derive(Debug))]
1264pub struct LastValueAccumulator {
1265    /// The last value seen (None if no values yet)
1266    value: Option<i64>,
1267    /// Timestamp of the last value (for merge ordering)
1268    timestamp: Option<i64>,
1269}
1270
1271impl LastValueAggregator {
1272    /// Creates a new `LAST_VALUE` aggregator.
1273    ///
1274    /// # Arguments
1275    ///
1276    /// * `value_column_index` - Column to extract value from
1277    /// * `timestamp_column_index` - Column for event timestamp ordering
1278    #[must_use]
1279    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1280        Self {
1281            value_column_index,
1282            timestamp_column_index,
1283        }
1284    }
1285}
1286
1287impl Accumulator for LastValueAccumulator {
1288    type Input = (i64, i64); // (value, timestamp)
1289    type Output = Option<i64>;
1290
1291    fn add(&mut self, (value, timestamp): (i64, i64)) {
1292        match self.timestamp {
1293            None => {
1294                // First value
1295                self.value = Some(value);
1296                self.timestamp = Some(timestamp);
1297            }
1298            Some(existing_ts) if timestamp > existing_ts => {
1299                // Later timestamp - replace
1300                self.value = Some(value);
1301                self.timestamp = Some(timestamp);
1302            }
1303            Some(existing_ts) if timestamp == existing_ts => {
1304                // Same timestamp - keep latest arrival (replace)
1305                self.value = Some(value);
1306            }
1307            _ => {
1308                // Earlier timestamp - keep existing
1309            }
1310        }
1311    }
1312
1313    fn merge(&mut self, other: &Self) {
1314        match (self.timestamp, other.timestamp) {
1315            (None, Some(_)) => {
1316                self.value = other.value;
1317                self.timestamp = other.timestamp;
1318            }
1319            (Some(self_ts), Some(other_ts)) if other_ts > self_ts => {
1320                self.value = other.value;
1321                self.timestamp = other.timestamp;
1322            }
1323            (Some(self_ts), Some(other_ts)) if other_ts == self_ts => {
1324                // Same timestamp - use other (simulate later arrival)
1325                self.value = other.value;
1326            }
1327            _ => {
1328                // Keep self
1329            }
1330        }
1331    }
1332
1333    fn result(&self) -> Option<i64> {
1334        self.value
1335    }
1336
1337    fn is_empty(&self) -> bool {
1338        self.value.is_none()
1339    }
1340}
1341
1342impl Aggregator for LastValueAggregator {
1343    type Acc = LastValueAccumulator;
1344
1345    fn create_accumulator(&self) -> LastValueAccumulator {
1346        LastValueAccumulator::default()
1347    }
1348
1349    fn extract(&self, event: &Event) -> Option<(i64, i64)> {
1350        use arrow_array::cast::AsArray;
1351        use arrow_array::types::Int64Type;
1352
1353        let batch = &event.data;
1354        if self.value_column_index >= batch.num_columns()
1355            || self.timestamp_column_index >= batch.num_columns()
1356        {
1357            return None;
1358        }
1359
1360        // Extract value
1361        let value_col = batch.column(self.value_column_index);
1362        let value_array = value_col.as_primitive_opt::<Int64Type>()?;
1363        let value = value_array.iter().flatten().next()?;
1364
1365        // Extract timestamp
1366        let ts_col = batch.column(self.timestamp_column_index);
1367        let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1368        let timestamp = ts_array.iter().flatten().next()?;
1369
1370        Some((value, timestamp))
1371    }
1372}
1373
1374// FIRST_VALUE / LAST_VALUE for Float64 (F059)
1375
1376/// Accumulator for `FIRST_VALUE` aggregation on f64 values.
1377#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1378#[rkyv(compare(PartialEq), derive(Debug))]
1379pub struct FirstValueF64Accumulator {
1380    /// The first value seen (None if no values yet)
1381    value: Option<i64>, // Store as bits for rkyv compatibility
1382    /// Timestamp of the first value (for merge ordering)
1383    timestamp: Option<i64>,
1384}
1385
1386impl FirstValueF64Accumulator {
1387    /// Gets the result as f64.
1388    #[must_use]
1389    #[allow(clippy::cast_sign_loss)]
1390    pub fn result_f64(&self) -> Option<f64> {
1391        self.value.map(|bits| f64::from_bits(bits as u64))
1392    }
1393}
1394
1395impl Accumulator for FirstValueF64Accumulator {
1396    type Input = (f64, i64); // (value, timestamp)
1397    type Output = Option<f64>;
1398
1399    fn add(&mut self, (value, timestamp): (f64, i64)) {
1400        // SAFETY: We strictly use this as storage bits and convert back via from_bits
1401        #[allow(clippy::cast_possible_wrap)]
1402        let value_bits = value.to_bits() as i64;
1403        match self.timestamp {
1404            None => {
1405                self.value = Some(value_bits);
1406                self.timestamp = Some(timestamp);
1407            }
1408            Some(existing_ts) if timestamp < existing_ts => {
1409                self.value = Some(value_bits);
1410                self.timestamp = Some(timestamp);
1411            }
1412            _ => {}
1413        }
1414    }
1415
1416    fn merge(&mut self, other: &Self) {
1417        match (self.timestamp, other.timestamp) {
1418            (None, Some(_)) => {
1419                self.value = other.value;
1420                self.timestamp = other.timestamp;
1421            }
1422            (Some(self_ts), Some(other_ts)) if other_ts < self_ts => {
1423                self.value = other.value;
1424                self.timestamp = other.timestamp;
1425            }
1426            _ => {}
1427        }
1428    }
1429
1430    #[allow(clippy::cast_sign_loss)]
1431    fn result(&self) -> Option<f64> {
1432        self.value.map(|bits| f64::from_bits(bits as u64))
1433    }
1434
1435    fn is_empty(&self) -> bool {
1436        self.value.is_none()
1437    }
1438}
1439
/// `FIRST_VALUE` aggregator for `Float64` value columns ordered by an `Int64`
/// event-time column.
#[derive(Debug, Clone)]
pub struct FirstValueF64Aggregator {
    /// Index of the `Float64` column holding the reported value.
    value_column_index: usize,
    /// Index of the `Int64` event-time column used for ordering.
    timestamp_column_index: usize,
}
1448
1449impl FirstValueF64Aggregator {
1450    /// Creates a new `FIRST_VALUE` aggregator for f64 columns.
1451    #[must_use]
1452    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1453        Self {
1454            value_column_index,
1455            timestamp_column_index,
1456        }
1457    }
1458}
1459
1460impl Aggregator for FirstValueF64Aggregator {
1461    type Acc = FirstValueF64Accumulator;
1462
1463    fn create_accumulator(&self) -> FirstValueF64Accumulator {
1464        FirstValueF64Accumulator::default()
1465    }
1466
1467    fn extract(&self, event: &Event) -> Option<(f64, i64)> {
1468        use arrow_array::cast::AsArray;
1469        use arrow_array::types::{Float64Type, Int64Type};
1470
1471        let batch = &event.data;
1472        if self.value_column_index >= batch.num_columns()
1473            || self.timestamp_column_index >= batch.num_columns()
1474        {
1475            return None;
1476        }
1477
1478        // Extract value as f64
1479        let value_col = batch.column(self.value_column_index);
1480        let value_array = value_col.as_primitive_opt::<Float64Type>()?;
1481        let value = value_array.iter().flatten().next()?;
1482
1483        // Extract timestamp
1484        let ts_col = batch.column(self.timestamp_column_index);
1485        let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1486        let timestamp = ts_array.iter().flatten().next()?;
1487
1488        Some((value, timestamp))
1489    }
1490}
1491
1492/// Accumulator for `LAST_VALUE` aggregation on f64 values.
1493#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1494#[rkyv(compare(PartialEq), derive(Debug))]
1495pub struct LastValueF64Accumulator {
1496    /// The last value seen (None if no values yet)
1497    value: Option<i64>, // Store as bits for rkyv compatibility
1498    /// Timestamp of the last value (for merge ordering)
1499    timestamp: Option<i64>,
1500}
1501
1502impl LastValueF64Accumulator {
1503    /// Gets the result as f64.
1504    #[must_use]
1505    #[allow(clippy::cast_sign_loss)]
1506    pub fn result_f64(&self) -> Option<f64> {
1507        self.value.map(|bits| f64::from_bits(bits as u64))
1508    }
1509}
1510
1511impl Accumulator for LastValueF64Accumulator {
1512    type Input = (f64, i64); // (value, timestamp)
1513    type Output = Option<f64>;
1514
1515    fn add(&mut self, (value, timestamp): (f64, i64)) {
1516        // SAFETY: We strictly use this as storage bits and convert back via from_bits
1517        #[allow(clippy::cast_possible_wrap)]
1518        let value_bits = value.to_bits() as i64;
1519        match self.timestamp {
1520            None => {
1521                self.value = Some(value_bits);
1522                self.timestamp = Some(timestamp);
1523            }
1524            Some(existing_ts) if timestamp > existing_ts => {
1525                self.value = Some(value_bits);
1526                self.timestamp = Some(timestamp);
1527            }
1528            Some(existing_ts) if timestamp == existing_ts => {
1529                self.value = Some(value_bits);
1530            }
1531            _ => {}
1532        }
1533    }
1534
1535    fn merge(&mut self, other: &Self) {
1536        match (self.timestamp, other.timestamp) {
1537            (None, Some(_)) => {
1538                self.value = other.value;
1539                self.timestamp = other.timestamp;
1540            }
1541            (Some(self_ts), Some(other_ts)) if other_ts > self_ts => {
1542                self.value = other.value;
1543                self.timestamp = other.timestamp;
1544            }
1545            (Some(self_ts), Some(other_ts)) if other_ts == self_ts => {
1546                self.value = other.value;
1547            }
1548            _ => {}
1549        }
1550    }
1551
1552    #[allow(clippy::cast_sign_loss)]
1553    fn result(&self) -> Option<f64> {
1554        self.value.map(|bits| f64::from_bits(bits as u64))
1555    }
1556
1557    fn is_empty(&self) -> bool {
1558        self.value.is_none()
1559    }
1560}
1561
/// `LAST_VALUE` aggregator for `Float64` value columns ordered by an `Int64`
/// event-time column.
#[derive(Debug, Clone)]
pub struct LastValueF64Aggregator {
    /// Index of the `Float64` column holding the reported value.
    value_column_index: usize,
    /// Index of the `Int64` event-time column used for ordering.
    timestamp_column_index: usize,
}
1570
1571impl LastValueF64Aggregator {
1572    /// Creates a new `LAST_VALUE` aggregator for f64 columns.
1573    #[must_use]
1574    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1575        Self {
1576            value_column_index,
1577            timestamp_column_index,
1578        }
1579    }
1580}
1581
1582impl Aggregator for LastValueF64Aggregator {
1583    type Acc = LastValueF64Accumulator;
1584
1585    fn create_accumulator(&self) -> LastValueF64Accumulator {
1586        LastValueF64Accumulator::default()
1587    }
1588
1589    fn extract(&self, event: &Event) -> Option<(f64, i64)> {
1590        use arrow_array::cast::AsArray;
1591        use arrow_array::types::{Float64Type, Int64Type};
1592
1593        let batch = &event.data;
1594        if self.value_column_index >= batch.num_columns()
1595            || self.timestamp_column_index >= batch.num_columns()
1596        {
1597            return None;
1598        }
1599
1600        // Extract value as f64
1601        let value_col = batch.column(self.value_column_index);
1602        let value_array = value_col.as_primitive_opt::<Float64Type>()?;
1603        let value = value_array.iter().flatten().next()?;
1604
1605        // Extract timestamp
1606        let ts_col = batch.column(self.timestamp_column_index);
1607        let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1608        let timestamp = ts_array.iter().flatten().next()?;
1609
1610        Some((value, timestamp))
1611    }
1612}
1613
1614// F074: Composite Aggregator & f64 Type Support
1615
/// Type-erased aggregate result covering the numeric types produced by the
/// dynamic aggregation path.
///
/// [`DynAccumulator::result_scalar`] returns this so that composite
/// aggregation can carry several aggregates with different output types in a
/// single window result. The concrete type is only known at runtime.
///
/// # Example
///
/// ```rust,no_run
/// use laminar_core::operator::window::ScalarResult;
///
/// let r = ScalarResult::Float64(2.75);
/// assert_eq!(r.to_i64_lossy(), 2);
/// assert_eq!(r.to_f64_lossy(), 2.75);
/// ```
#[derive(Debug, Clone, PartialEq)]
pub enum ScalarResult {
    /// A present `i64` value.
    Int64(i64),
    /// A present `f64` value.
    Float64(f64),
    /// A present `u64` value.
    UInt64(u64),
    /// An `i64` value that may be absent.
    OptionalInt64(Option<i64>),
    /// An `f64` value that may be absent.
    OptionalFloat64(Option<f64>),
    /// No value at all.
    Null,
}
1646
1647impl ScalarResult {
1648    /// Converts to i64, truncating floats and saturating unsigned values.
1649    #[must_use]
1650    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
1651    pub fn to_i64_lossy(&self) -> i64 {
1652        match self {
1653            Self::Int64(v) => *v,
1654            Self::Float64(v) => *v as i64,
1655            Self::UInt64(v) => i64::try_from(*v).unwrap_or(i64::MAX),
1656            Self::OptionalInt64(v) => v.unwrap_or(0),
1657            Self::OptionalFloat64(v) => v.map(|f| f as i64).unwrap_or(0),
1658            Self::Null => 0,
1659        }
1660    }
1661
1662    /// Converts to f64, with potential precision loss for large integers.
1663    #[must_use]
1664    #[allow(clippy::cast_precision_loss)]
1665    pub fn to_f64_lossy(&self) -> f64 {
1666        match self {
1667            Self::Int64(v) => *v as f64,
1668            Self::Float64(v) => *v,
1669            Self::UInt64(v) => *v as f64,
1670            Self::OptionalInt64(v) => v.map(|i| i as f64).unwrap_or(0.0),
1671            Self::OptionalFloat64(v) => v.unwrap_or(0.0),
1672            Self::Null => 0.0,
1673        }
1674    }
1675
1676    /// Returns true if this is a null or None value.
1677    #[must_use]
1678    pub fn is_null(&self) -> bool {
1679        matches!(
1680            self,
1681            Self::Null | Self::OptionalInt64(None) | Self::OptionalFloat64(None)
1682        )
1683    }
1684
1685    /// Returns the Arrow [`DataType`] for this result.
1686    #[must_use]
1687    pub fn data_type(&self) -> DataType {
1688        match self {
1689            Self::Int64(_) | Self::OptionalInt64(_) => DataType::Int64,
1690            Self::Float64(_) | Self::OptionalFloat64(_) => DataType::Float64,
1691            Self::UInt64(_) => DataType::UInt64,
1692            Self::Null => DataType::Null,
1693        }
1694    }
1695}
1696
1697/// Dynamic accumulator trait for composite aggregation (F074).
1698///
1699/// Unlike the static [`Accumulator`] trait, this works with events directly
1700/// and returns [`ScalarResult`] for type-flexible output. Used by
1701/// [`CompositeAggregator`] to combine multiple aggregates per window.
1702///
1703/// # Ring Architecture
1704///
1705/// Dynamic dispatch has overhead (~2-5ns per vtable call), so composite
1706/// aggregation is intended for Ring 1 workloads. Ring 0 continues to use
1707/// the static [`Aggregator`] + [`Accumulator`] path.
1708pub trait DynAccumulator: Send {
1709    /// Adds an event to the accumulator.
1710    fn add_event(&mut self, event: &Event);
1711
1712    /// Merges another accumulator of the same type into this one.
1713    ///
1714    /// # Panics
1715    ///
1716    /// May panic if `other` is not the same concrete type.
1717    fn merge_dyn(&mut self, other: &dyn DynAccumulator);
1718
1719    /// Returns the current aggregate result.
1720    fn result_scalar(&self) -> ScalarResult;
1721
1722    /// Returns true if no values have been accumulated.
1723    fn is_empty(&self) -> bool;
1724
1725    /// Creates a boxed clone of this accumulator.
1726    fn clone_box(&self) -> Box<dyn DynAccumulator>;
1727
1728    /// Serializes the accumulator state to bytes (for checkpointing).
1729    fn serialize(&self) -> Vec<u8>;
1730
1731    /// Returns the Arrow field descriptor for this accumulator's output.
1732    fn result_field(&self) -> Field;
1733
1734    /// Returns a type tag for deserialization dispatch.
1735    fn type_tag(&self) -> &'static str;
1736
1737    /// Returns self as `Any` for downcasting (used by `DataFusion` bridge).
1738    fn as_any(&self) -> &dyn std::any::Any;
1739}
1740
1741/// Factory trait for creating [`DynAccumulator`] instances.
1742///
1743/// Each factory corresponds to one aggregate function (e.g., SUM, COUNT).
1744/// The [`CompositeAggregator`] holds multiple factories.
1745pub trait DynAggregatorFactory: Send + Sync {
1746    /// Creates a new empty accumulator.
1747    fn create_accumulator(&self) -> Box<dyn DynAccumulator>;
1748
1749    /// Returns the Arrow field descriptor for results.
1750    fn result_field(&self) -> Field;
1751
1752    /// Creates a boxed clone of this factory.
1753    fn clone_box(&self) -> Box<dyn DynAggregatorFactory>;
1754
1755    /// Returns a type tag for deserialization dispatch.
1756    fn type_tag(&self) -> &'static str;
1757}
1758
1759// ── f64 Aggregators ─────────────────────────────────────────────────────────
1760
/// SUM aggregator over a `Float64` column.
#[derive(Debug, Clone)]
pub struct SumF64Aggregator {
    /// Index of the column whose values are summed.
    column_index: usize,
}
1767
/// Running state for an `f64` sum.
#[derive(Debug, Clone, Default)]
pub struct SumF64Accumulator {
    /// Total of all values seen so far.
    sum: f64,
    /// Number of values folded into `sum`; zero means "no result yet".
    count: u64,
}
1776
1777impl SumF64Aggregator {
1778    /// Creates a new f64 sum aggregator for the specified column.
1779    #[must_use]
1780    pub fn new(column_index: usize) -> Self {
1781        Self { column_index }
1782    }
1783
1784    /// Returns the column index.
1785    #[must_use]
1786    pub fn column_index(&self) -> usize {
1787        self.column_index
1788    }
1789}
1790
1791impl SumF64Accumulator {
1792    /// Returns the current sum.
1793    #[must_use]
1794    pub fn sum(&self) -> f64 {
1795        self.sum
1796    }
1797}
1798
1799impl DynAccumulator for SumF64Accumulator {
1800    fn add_event(&mut self, event: &Event) {
1801        use arrow_array::cast::AsArray;
1802        use arrow_array::types::Float64Type;
1803
1804        // Extract from first column by default (factory sets column_index)
1805        // Note: column_index is embedded in the accumulator at construction
1806        let batch = &event.data;
1807        if batch.num_columns() == 0 {
1808            return;
1809        }
1810        // Try first column as f64
1811        if let Some(array) = batch.column(0).as_primitive_opt::<Float64Type>() {
1812            for val in array.iter().flatten() {
1813                self.sum += val;
1814                self.count += 1;
1815            }
1816        }
1817    }
1818
1819    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
1820        // Downcast via serialize/deserialize for safety
1821        let data = other.serialize();
1822        if data.len() == 16 {
1823            let sum = f64::from_le_bytes(data[..8].try_into().unwrap());
1824            let count = u64::from_le_bytes(data[8..16].try_into().unwrap());
1825            self.sum += sum;
1826            self.count += count;
1827        }
1828    }
1829
1830    fn result_scalar(&self) -> ScalarResult {
1831        if self.count == 0 {
1832            ScalarResult::Null
1833        } else {
1834            ScalarResult::Float64(self.sum)
1835        }
1836    }
1837
1838    fn is_empty(&self) -> bool {
1839        self.count == 0
1840    }
1841
1842    fn clone_box(&self) -> Box<dyn DynAccumulator> {
1843        Box::new(self.clone())
1844    }
1845
1846    fn serialize(&self) -> Vec<u8> {
1847        let mut buf = Vec::with_capacity(16);
1848        buf.extend_from_slice(&self.sum.to_le_bytes());
1849        buf.extend_from_slice(&self.count.to_le_bytes());
1850        buf
1851    }
1852
1853    fn result_field(&self) -> Field {
1854        Field::new("sum_f64", DataType::Float64, true)
1855    }
1856
1857    fn type_tag(&self) -> &'static str {
1858        "sum_f64"
1859    }
1860
1861    fn as_any(&self) -> &dyn std::any::Any {
1862        self
1863    }
1864}
1865
/// Factory that hands out `f64` SUM accumulators bound to one column.
#[derive(Debug, Clone)]
pub struct SumF64Factory {
    /// Index of the column the created accumulators sum.
    column_index: usize,
    /// Name given to the output field.
    field_name: String,
}
1874
1875impl SumF64Factory {
1876    /// Creates a new f64 sum factory.
1877    #[must_use]
1878    pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
1879        Self {
1880            column_index,
1881            field_name: field_name.into(),
1882        }
1883    }
1884}
1885
1886impl DynAggregatorFactory for SumF64Factory {
1887    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
1888        Box::new(SumF64IndexedAccumulator::new(self.column_index))
1889    }
1890
1891    fn result_field(&self) -> Field {
1892        Field::new(&self.field_name, DataType::Float64, true)
1893    }
1894
1895    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
1896        Box::new(self.clone())
1897    }
1898
1899    fn type_tag(&self) -> &'static str {
1900        "sum_f64"
1901    }
1902}
1903
/// `f64` SUM accumulator that knows which column of each event to read.
#[derive(Debug, Clone)]
pub struct SumF64IndexedAccumulator {
    /// Index of the `Float64` column summed from each event.
    column_index: usize,
    /// Total of all values seen so far.
    sum: f64,
    /// Number of values folded into `sum`; zero means "no result yet".
    count: u64,
}
1914
1915impl SumF64IndexedAccumulator {
1916    /// Creates a new indexed sum accumulator.
1917    #[must_use]
1918    pub fn new(column_index: usize) -> Self {
1919        Self {
1920            column_index,
1921            sum: 0.0,
1922            count: 0,
1923        }
1924    }
1925}
1926
1927impl DynAccumulator for SumF64IndexedAccumulator {
1928    fn add_event(&mut self, event: &Event) {
1929        use arrow_array::cast::AsArray;
1930        use arrow_array::types::Float64Type;
1931
1932        let batch = &event.data;
1933        if self.column_index >= batch.num_columns() {
1934            return;
1935        }
1936        if let Some(array) = batch
1937            .column(self.column_index)
1938            .as_primitive_opt::<Float64Type>()
1939        {
1940            for val in array.iter().flatten() {
1941                self.sum += val;
1942                self.count += 1;
1943            }
1944        }
1945    }
1946
1947    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
1948        let data = other.serialize();
1949        if data.len() >= 16 {
1950            let sum = f64::from_le_bytes(data[..8].try_into().unwrap());
1951            let count = u64::from_le_bytes(data[8..16].try_into().unwrap());
1952            self.sum += sum;
1953            self.count += count;
1954        }
1955    }
1956
1957    fn result_scalar(&self) -> ScalarResult {
1958        if self.count == 0 {
1959            ScalarResult::Null
1960        } else {
1961            ScalarResult::Float64(self.sum)
1962        }
1963    }
1964
1965    fn is_empty(&self) -> bool {
1966        self.count == 0
1967    }
1968
1969    fn clone_box(&self) -> Box<dyn DynAccumulator> {
1970        Box::new(self.clone())
1971    }
1972
1973    fn serialize(&self) -> Vec<u8> {
1974        let mut buf = Vec::with_capacity(16);
1975        buf.extend_from_slice(&self.sum.to_le_bytes());
1976        buf.extend_from_slice(&self.count.to_le_bytes());
1977        buf
1978    }
1979
1980    fn result_field(&self) -> Field {
1981        Field::new("sum_f64", DataType::Float64, true)
1982    }
1983
1984    fn type_tag(&self) -> &'static str {
1985        "sum_f64"
1986    }
1987
1988    fn as_any(&self) -> &dyn std::any::Any {
1989        self
1990    }
1991}
1992
/// Factory for `MIN` over a `Float64` column.
#[derive(Debug, Clone)]
pub struct MinF64Factory {
    /// Position of the input column inside each event's batch
    column_index: usize,
    /// Name given to the aggregate's output field
    field_name: String,
}

impl MinF64Factory {
    /// Builds a factory that takes the minimum of column `column_index`,
    /// reported under `field_name`.
    #[must_use]
    pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
        let field_name: String = field_name.into();
        Self {
            field_name,
            column_index,
        }
    }
}
2012
2013impl DynAggregatorFactory for MinF64Factory {
2014    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2015        Box::new(MinF64IndexedAccumulator::new(self.column_index))
2016    }
2017
2018    fn result_field(&self) -> Field {
2019        Field::new(&self.field_name, DataType::Float64, true)
2020    }
2021
2022    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2023        Box::new(self.clone())
2024    }
2025
2026    fn type_tag(&self) -> &'static str {
2027        "min_f64"
2028    }
2029}
2030
/// Running-minimum accumulator over one `Float64` column, addressed by its
/// index within each event's batch.
#[derive(Debug, Clone)]
pub struct MinF64IndexedAccumulator {
    /// Position of the input column inside each event's batch
    column_index: usize,
    /// Smallest value observed so far; `None` while nothing has been seen
    min: Option<f64>,
}

impl MinF64IndexedAccumulator {
    /// Builds an empty accumulator that reads column `column_index`.
    #[must_use]
    pub fn new(column_index: usize) -> Self {
        Self {
            min: None,
            column_index,
        }
    }
}
2050
2051impl DynAccumulator for MinF64IndexedAccumulator {
2052    fn add_event(&mut self, event: &Event) {
2053        use arrow_array::cast::AsArray;
2054        use arrow_array::types::Float64Type;
2055
2056        let batch = &event.data;
2057        if self.column_index >= batch.num_columns() {
2058            return;
2059        }
2060        if let Some(array) = batch
2061            .column(self.column_index)
2062            .as_primitive_opt::<Float64Type>()
2063        {
2064            for val in array.iter().flatten() {
2065                self.min = Some(self.min.map_or(val, |m: f64| m.min(val)));
2066            }
2067        }
2068    }
2069
2070    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2071        let data = other.serialize();
2072        if data.len() >= 9 && data[0] == 1 {
2073            let other_min = f64::from_le_bytes(data[1..9].try_into().unwrap());
2074            self.min = Some(self.min.map_or(other_min, |m: f64| m.min(other_min)));
2075        }
2076    }
2077
2078    fn result_scalar(&self) -> ScalarResult {
2079        ScalarResult::OptionalFloat64(self.min)
2080    }
2081
2082    fn is_empty(&self) -> bool {
2083        self.min.is_none()
2084    }
2085
2086    fn clone_box(&self) -> Box<dyn DynAccumulator> {
2087        Box::new(self.clone())
2088    }
2089
2090    fn serialize(&self) -> Vec<u8> {
2091        match self.min {
2092            Some(v) => {
2093                let mut buf = Vec::with_capacity(9);
2094                buf.push(1); // has value marker
2095                buf.extend_from_slice(&v.to_le_bytes());
2096                buf
2097            }
2098            None => vec![0],
2099        }
2100    }
2101
2102    fn result_field(&self) -> Field {
2103        Field::new("min_f64", DataType::Float64, true)
2104    }
2105
2106    fn type_tag(&self) -> &'static str {
2107        "min_f64"
2108    }
2109
2110    fn as_any(&self) -> &dyn std::any::Any {
2111        self
2112    }
2113}
2114
/// Factory for `MAX` over a `Float64` column.
#[derive(Debug, Clone)]
pub struct MaxF64Factory {
    /// Position of the input column inside each event's batch
    column_index: usize,
    /// Name given to the aggregate's output field
    field_name: String,
}

impl MaxF64Factory {
    /// Builds a factory that takes the maximum of column `column_index`,
    /// reported under `field_name`.
    #[must_use]
    pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
        let field_name: String = field_name.into();
        Self {
            field_name,
            column_index,
        }
    }
}
2134
2135impl DynAggregatorFactory for MaxF64Factory {
2136    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2137        Box::new(MaxF64IndexedAccumulator::new(self.column_index))
2138    }
2139
2140    fn result_field(&self) -> Field {
2141        Field::new(&self.field_name, DataType::Float64, true)
2142    }
2143
2144    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2145        Box::new(self.clone())
2146    }
2147
2148    fn type_tag(&self) -> &'static str {
2149        "max_f64"
2150    }
2151}
2152
/// Running-maximum accumulator over one `Float64` column, addressed by its
/// index within each event's batch.
#[derive(Debug, Clone)]
pub struct MaxF64IndexedAccumulator {
    /// Position of the input column inside each event's batch
    column_index: usize,
    /// Largest value observed so far; `None` while nothing has been seen
    max: Option<f64>,
}

impl MaxF64IndexedAccumulator {
    /// Builds an empty accumulator that reads column `column_index`.
    #[must_use]
    pub fn new(column_index: usize) -> Self {
        Self {
            max: None,
            column_index,
        }
    }
}
2172
2173impl DynAccumulator for MaxF64IndexedAccumulator {
2174    fn add_event(&mut self, event: &Event) {
2175        use arrow_array::cast::AsArray;
2176        use arrow_array::types::Float64Type;
2177
2178        let batch = &event.data;
2179        if self.column_index >= batch.num_columns() {
2180            return;
2181        }
2182        if let Some(array) = batch
2183            .column(self.column_index)
2184            .as_primitive_opt::<Float64Type>()
2185        {
2186            for val in array.iter().flatten() {
2187                self.max = Some(self.max.map_or(val, |m: f64| m.max(val)));
2188            }
2189        }
2190    }
2191
2192    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2193        let data = other.serialize();
2194        if data.len() >= 9 && data[0] == 1 {
2195            let other_max = f64::from_le_bytes(data[1..9].try_into().unwrap());
2196            self.max = Some(self.max.map_or(other_max, |m: f64| m.max(other_max)));
2197        }
2198    }
2199
2200    fn result_scalar(&self) -> ScalarResult {
2201        ScalarResult::OptionalFloat64(self.max)
2202    }
2203
2204    fn is_empty(&self) -> bool {
2205        self.max.is_none()
2206    }
2207
2208    fn clone_box(&self) -> Box<dyn DynAccumulator> {
2209        Box::new(self.clone())
2210    }
2211
2212    fn serialize(&self) -> Vec<u8> {
2213        match self.max {
2214            Some(v) => {
2215                let mut buf = Vec::with_capacity(9);
2216                buf.push(1);
2217                buf.extend_from_slice(&v.to_le_bytes());
2218                buf
2219            }
2220            None => vec![0],
2221        }
2222    }
2223
2224    fn result_field(&self) -> Field {
2225        Field::new("max_f64", DataType::Float64, true)
2226    }
2227
2228    fn type_tag(&self) -> &'static str {
2229        "max_f64"
2230    }
2231
2232    fn as_any(&self) -> &dyn std::any::Any {
2233        self
2234    }
2235}
2236
/// Factory for `AVG` over a `Float64` column.
#[derive(Debug, Clone)]
pub struct AvgF64Factory {
    /// Position of the input column inside each event's batch
    column_index: usize,
    /// Name given to the aggregate's output field
    field_name: String,
}

impl AvgF64Factory {
    /// Builds a factory that averages column `column_index`, reported under
    /// `field_name`.
    #[must_use]
    pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
        let field_name: String = field_name.into();
        Self {
            field_name,
            column_index,
        }
    }
}
2256
2257impl DynAggregatorFactory for AvgF64Factory {
2258    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2259        Box::new(AvgF64IndexedAccumulator::new(self.column_index))
2260    }
2261
2262    fn result_field(&self) -> Field {
2263        Field::new(&self.field_name, DataType::Float64, true)
2264    }
2265
2266    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2267        Box::new(self.clone())
2268    }
2269
2270    fn type_tag(&self) -> &'static str {
2271        "avg_f64"
2272    }
2273}
2274
/// Running-mean accumulator over one `Float64` column, addressed by its
/// index within each event's batch. Stores sum and count; the mean is derived
/// on demand.
#[derive(Debug, Clone)]
pub struct AvgF64IndexedAccumulator {
    /// Position of the input column inside each event's batch
    column_index: usize,
    /// Sum of all non-null values added or merged so far
    sum: f64,
    /// Number of non-null values that contributed to `sum`
    count: u64,
}

impl AvgF64IndexedAccumulator {
    /// Builds an empty accumulator that reads column `column_index`.
    #[must_use]
    pub fn new(column_index: usize) -> Self {
        Self {
            count: 0,
            sum: 0.0,
            column_index,
        }
    }
}
2297
2298impl DynAccumulator for AvgF64IndexedAccumulator {
2299    fn add_event(&mut self, event: &Event) {
2300        use arrow_array::cast::AsArray;
2301        use arrow_array::types::Float64Type;
2302
2303        let batch = &event.data;
2304        if self.column_index >= batch.num_columns() {
2305            return;
2306        }
2307        if let Some(array) = batch
2308            .column(self.column_index)
2309            .as_primitive_opt::<Float64Type>()
2310        {
2311            for val in array.iter().flatten() {
2312                self.sum += val;
2313                self.count += 1;
2314            }
2315        }
2316    }
2317
2318    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2319        let data = other.serialize();
2320        if data.len() >= 16 {
2321            let sum = f64::from_le_bytes(data[..8].try_into().unwrap());
2322            let count = u64::from_le_bytes(data[8..16].try_into().unwrap());
2323            self.sum += sum;
2324            self.count += count;
2325        }
2326    }
2327
2328    // Precision loss is acceptable for arithmetic mean
2329    #[allow(clippy::cast_precision_loss)]
2330    fn result_scalar(&self) -> ScalarResult {
2331        if self.count == 0 {
2332            ScalarResult::Null
2333        } else {
2334            ScalarResult::Float64(self.sum / self.count as f64)
2335        }
2336    }
2337
2338    fn is_empty(&self) -> bool {
2339        self.count == 0
2340    }
2341
2342    fn clone_box(&self) -> Box<dyn DynAccumulator> {
2343        Box::new(self.clone())
2344    }
2345
2346    fn serialize(&self) -> Vec<u8> {
2347        let mut buf = Vec::with_capacity(16);
2348        buf.extend_from_slice(&self.sum.to_le_bytes());
2349        buf.extend_from_slice(&self.count.to_le_bytes());
2350        buf
2351    }
2352
2353    fn result_field(&self) -> Field {
2354        Field::new("avg_f64", DataType::Float64, true)
2355    }
2356
2357    fn type_tag(&self) -> &'static str {
2358        "avg_f64"
2359    }
2360
2361    fn as_any(&self) -> &dyn std::any::Any {
2362        self
2363    }
2364}
2365
2366// ── Count DynAccumulator ────────────────────────────────────────────────────
2367
/// Factory producing row-count accumulators for the [`DynAccumulator`] API.
#[derive(Debug, Clone)]
pub struct CountDynFactory {
    /// Name given to the count's output field
    field_name: String,
}

impl CountDynFactory {
    /// Builds a count factory whose output field is called `field_name`.
    #[must_use]
    pub fn new(field_name: impl Into<String>) -> Self {
        let field_name: String = field_name.into();
        Self { field_name }
    }
}
2384
2385impl DynAggregatorFactory for CountDynFactory {
2386    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2387        Box::new(CountDynAccumulator::default())
2388    }
2389
2390    fn result_field(&self) -> Field {
2391        Field::new(&self.field_name, DataType::Int64, false)
2392    }
2393
2394    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2395        Box::new(self.clone())
2396    }
2397
2398    fn type_tag(&self) -> &'static str {
2399        "count"
2400    }
2401}
2402
/// Row-count accumulator for the [`DynAccumulator`] API; `Default` yields zero.
#[derive(Debug, Clone, Default)]
pub struct CountDynAccumulator {
    /// Total number of rows seen (including rows merged from other accumulators)
    count: u64,
}
2408
2409impl DynAccumulator for CountDynAccumulator {
2410    fn add_event(&mut self, event: &Event) {
2411        let rows = event.data.num_rows();
2412        self.count += rows as u64;
2413    }
2414
2415    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2416        let data = other.serialize();
2417        if data.len() >= 8 {
2418            let count = u64::from_le_bytes(data[..8].try_into().unwrap());
2419            self.count += count;
2420        }
2421    }
2422
2423    fn result_scalar(&self) -> ScalarResult {
2424        ScalarResult::Int64(i64::try_from(self.count).unwrap_or(i64::MAX))
2425    }
2426
2427    fn is_empty(&self) -> bool {
2428        self.count == 0
2429    }
2430
2431    fn clone_box(&self) -> Box<dyn DynAccumulator> {
2432        Box::new(self.clone())
2433    }
2434
2435    fn serialize(&self) -> Vec<u8> {
2436        self.count.to_le_bytes().to_vec()
2437    }
2438
2439    fn result_field(&self) -> Field {
2440        Field::new("count", DataType::Int64, false)
2441    }
2442
2443    fn type_tag(&self) -> &'static str {
2444        "count"
2445    }
2446
2447    fn as_any(&self) -> &dyn std::any::Any {
2448        self
2449    }
2450}
2451
2452// ── FirstValue / LastValue DynAccumulator ───────────────────────────────────
2453
/// Factory for `FIRST_VALUE` over a `Float64` column, ordered by an `Int64`
/// timestamp column, via the [`DynAccumulator`] API.
#[derive(Debug, Clone)]
pub struct FirstValueF64DynFactory {
    /// Position of the `Float64` value column inside each event's batch
    value_column_index: usize,
    /// Position of the `Int64` timestamp column used for ordering
    timestamp_column_index: usize,
    /// Name given to the aggregate's output field
    field_name: String,
}

impl FirstValueF64DynFactory {
    /// Builds a factory that picks the value with the earliest timestamp.
    #[must_use]
    pub fn new(
        value_column_index: usize,
        timestamp_column_index: usize,
        field_name: impl Into<String>,
    ) -> Self {
        let field_name: String = field_name.into();
        Self {
            field_name,
            timestamp_column_index,
            value_column_index,
        }
    }
}
2480
2481impl DynAggregatorFactory for FirstValueF64DynFactory {
2482    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2483        Box::new(FirstValueF64DynAccumulator::new(
2484            self.value_column_index,
2485            self.timestamp_column_index,
2486        ))
2487    }
2488
2489    fn result_field(&self) -> Field {
2490        Field::new(&self.field_name, DataType::Float64, true)
2491    }
2492
2493    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2494        Box::new(self.clone())
2495    }
2496
2497    fn type_tag(&self) -> &'static str {
2498        "first_value_f64"
2499    }
2500}
2501
/// `FIRST_VALUE` accumulator for `Float64` columns via [`DynAccumulator`]:
/// keeps the value paired with the smallest timestamp seen.
#[derive(Debug, Clone)]
pub struct FirstValueF64DynAccumulator {
    value_column_index: usize,
    timestamp_column_index: usize,
    value: Option<f64>,
    timestamp: Option<i64>,
}

impl FirstValueF64DynAccumulator {
    /// Builds an empty accumulator reading values from `value_column_index`
    /// and ordering them by `timestamp_column_index`.
    #[must_use]
    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
        Self {
            timestamp: None,
            value: None,
            timestamp_column_index,
            value_column_index,
        }
    }
}
2523
2524impl DynAccumulator for FirstValueF64DynAccumulator {
2525    fn add_event(&mut self, event: &Event) {
2526        use arrow_array::cast::AsArray;
2527        use arrow_array::types::{Float64Type, Int64Type};
2528
2529        let batch = &event.data;
2530        if self.value_column_index >= batch.num_columns()
2531            || self.timestamp_column_index >= batch.num_columns()
2532        {
2533            return;
2534        }
2535
2536        let val_col = batch.column(self.value_column_index);
2537        let ts_col = batch.column(self.timestamp_column_index);
2538
2539        let Some(val_array) = val_col.as_primitive_opt::<Float64Type>() else {
2540            return;
2541        };
2542        let Some(ts_array) = ts_col.as_primitive_opt::<Int64Type>() else {
2543            return;
2544        };
2545
2546        for i in 0..batch.num_rows() {
2547            if val_array.is_null(i) || ts_array.is_null(i) {
2548                continue;
2549            }
2550            let val = val_array.value(i);
2551            let ts = ts_array.value(i);
2552
2553            match self.timestamp {
2554                None => {
2555                    self.value = Some(val);
2556                    self.timestamp = Some(ts);
2557                }
2558                Some(existing_ts) if ts < existing_ts => {
2559                    self.value = Some(val);
2560                    self.timestamp = Some(ts);
2561                }
2562                _ => {}
2563            }
2564        }
2565    }
2566
2567    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2568        let data = other.serialize();
2569        if data.len() >= 17 && data[0] == 1 {
2570            let other_val = f64::from_le_bytes(data[1..9].try_into().unwrap());
2571            let other_ts = i64::from_le_bytes(data[9..17].try_into().unwrap());
2572            match self.timestamp {
2573                None => {
2574                    self.value = Some(other_val);
2575                    self.timestamp = Some(other_ts);
2576                }
2577                Some(self_ts) if other_ts < self_ts => {
2578                    self.value = Some(other_val);
2579                    self.timestamp = Some(other_ts);
2580                }
2581                _ => {}
2582            }
2583        }
2584    }
2585
2586    fn result_scalar(&self) -> ScalarResult {
2587        ScalarResult::OptionalFloat64(self.value)
2588    }
2589
2590    fn is_empty(&self) -> bool {
2591        self.value.is_none()
2592    }
2593
2594    fn clone_box(&self) -> Box<dyn DynAccumulator> {
2595        Box::new(self.clone())
2596    }
2597
2598    fn serialize(&self) -> Vec<u8> {
2599        match (self.value, self.timestamp) {
2600            (Some(v), Some(ts)) => {
2601                let mut buf = Vec::with_capacity(17);
2602                buf.push(1);
2603                buf.extend_from_slice(&v.to_le_bytes());
2604                buf.extend_from_slice(&ts.to_le_bytes());
2605                buf
2606            }
2607            _ => vec![0],
2608        }
2609    }
2610
2611    fn result_field(&self) -> Field {
2612        Field::new("first_value_f64", DataType::Float64, true)
2613    }
2614
2615    fn type_tag(&self) -> &'static str {
2616        "first_value_f64"
2617    }
2618
2619    fn as_any(&self) -> &dyn std::any::Any {
2620        self
2621    }
2622}
2623
/// Factory for `LAST_VALUE` over a `Float64` column, ordered by an `Int64`
/// timestamp column, via the [`DynAccumulator`] API.
#[derive(Debug, Clone)]
pub struct LastValueF64DynFactory {
    /// Position of the `Float64` value column inside each event's batch
    value_column_index: usize,
    /// Position of the `Int64` timestamp column used for ordering
    timestamp_column_index: usize,
    /// Name given to the aggregate's output field
    field_name: String,
}

impl LastValueF64DynFactory {
    /// Builds a factory that picks the value with the latest timestamp.
    #[must_use]
    pub fn new(
        value_column_index: usize,
        timestamp_column_index: usize,
        field_name: impl Into<String>,
    ) -> Self {
        let field_name: String = field_name.into();
        Self {
            field_name,
            timestamp_column_index,
            value_column_index,
        }
    }
}
2650
2651impl DynAggregatorFactory for LastValueF64DynFactory {
2652    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2653        Box::new(LastValueF64DynAccumulator::new(
2654            self.value_column_index,
2655            self.timestamp_column_index,
2656        ))
2657    }
2658
2659    fn result_field(&self) -> Field {
2660        Field::new(&self.field_name, DataType::Float64, true)
2661    }
2662
2663    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2664        Box::new(self.clone())
2665    }
2666
2667    fn type_tag(&self) -> &'static str {
2668        "last_value_f64"
2669    }
2670}
2671
/// `LAST_VALUE` accumulator for `Float64` columns via [`DynAccumulator`]:
/// keeps the value paired with the largest timestamp seen.
#[derive(Debug, Clone)]
pub struct LastValueF64DynAccumulator {
    value_column_index: usize,
    timestamp_column_index: usize,
    value: Option<f64>,
    timestamp: Option<i64>,
}

impl LastValueF64DynAccumulator {
    /// Builds an empty accumulator reading values from `value_column_index`
    /// and ordering them by `timestamp_column_index`.
    #[must_use]
    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
        Self {
            timestamp: None,
            value: None,
            timestamp_column_index,
            value_column_index,
        }
    }
}
2693
2694impl DynAccumulator for LastValueF64DynAccumulator {
2695    fn add_event(&mut self, event: &Event) {
2696        use arrow_array::cast::AsArray;
2697        use arrow_array::types::{Float64Type, Int64Type};
2698
2699        let batch = &event.data;
2700        if self.value_column_index >= batch.num_columns()
2701            || self.timestamp_column_index >= batch.num_columns()
2702        {
2703            return;
2704        }
2705
2706        let val_col = batch.column(self.value_column_index);
2707        let ts_col = batch.column(self.timestamp_column_index);
2708
2709        let Some(val_array) = val_col.as_primitive_opt::<Float64Type>() else {
2710            return;
2711        };
2712        let Some(ts_array) = ts_col.as_primitive_opt::<Int64Type>() else {
2713            return;
2714        };
2715
2716        for i in 0..batch.num_rows() {
2717            if val_array.is_null(i) || ts_array.is_null(i) {
2718                continue;
2719            }
2720            let val = val_array.value(i);
2721            let ts = ts_array.value(i);
2722
2723            match self.timestamp {
2724                None => {
2725                    self.value = Some(val);
2726                    self.timestamp = Some(ts);
2727                }
2728                Some(existing_ts) if ts >= existing_ts => {
2729                    self.value = Some(val);
2730                    self.timestamp = Some(ts);
2731                }
2732                _ => {}
2733            }
2734        }
2735    }
2736
2737    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2738        let data = other.serialize();
2739        if data.len() >= 17 && data[0] == 1 {
2740            let other_val = f64::from_le_bytes(data[1..9].try_into().unwrap());
2741            let other_ts = i64::from_le_bytes(data[9..17].try_into().unwrap());
2742            match self.timestamp {
2743                None => {
2744                    self.value = Some(other_val);
2745                    self.timestamp = Some(other_ts);
2746                }
2747                Some(self_ts) if other_ts >= self_ts => {
2748                    self.value = Some(other_val);
2749                    self.timestamp = Some(other_ts);
2750                }
2751                _ => {}
2752            }
2753        }
2754    }
2755
2756    fn result_scalar(&self) -> ScalarResult {
2757        ScalarResult::OptionalFloat64(self.value)
2758    }
2759
2760    fn is_empty(&self) -> bool {
2761        self.value.is_none()
2762    }
2763
2764    fn clone_box(&self) -> Box<dyn DynAccumulator> {
2765        Box::new(self.clone())
2766    }
2767
2768    fn serialize(&self) -> Vec<u8> {
2769        match (self.value, self.timestamp) {
2770            (Some(v), Some(ts)) => {
2771                let mut buf = Vec::with_capacity(17);
2772                buf.push(1);
2773                buf.extend_from_slice(&v.to_le_bytes());
2774                buf.extend_from_slice(&ts.to_le_bytes());
2775                buf
2776            }
2777            _ => vec![0],
2778        }
2779    }
2780
2781    fn result_field(&self) -> Field {
2782        Field::new("last_value_f64", DataType::Float64, true)
2783    }
2784
2785    fn type_tag(&self) -> &'static str {
2786        "last_value_f64"
2787    }
2788
2789    fn as_any(&self) -> &dyn std::any::Any {
2790        self
2791    }
2792}
2793
2794// ── Composite Aggregator ────────────────────────────────────────────────────
2795
2796/// Composite aggregator combining multiple [`DynAggregatorFactory`] instances.
2797///
2798/// Produces multi-column output: `window_start, window_end, field_0, field_1, ...`
2799///
2800/// # Example
2801///
2802/// ```rust,no_run
2803/// use laminar_core::operator::window::{
2804///     CompositeAggregator, CountDynFactory, MaxF64Factory, MinF64Factory,
2805/// };
2806///
2807/// let agg = CompositeAggregator::new(vec![
2808///     Box::new(CountDynFactory::new("trade_count")),
2809///     Box::new(MinF64Factory::new(1, "low")),
2810///     Box::new(MaxF64Factory::new(1, "high")),
2811/// ]);
2812/// assert_eq!(agg.num_aggregates(), 3);
2813/// ```
2814pub struct CompositeAggregator {
2815    /// Factories for creating sub-accumulators
2816    factories: Vec<Box<dyn DynAggregatorFactory>>,
2817    /// Cached output schema (built once in constructor)
2818    cached_schema: SchemaRef,
2819}
2820
2821impl std::fmt::Debug for CompositeAggregator {
2822    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2823        f.debug_struct("CompositeAggregator")
2824            .field("num_factories", &self.factories.len())
2825            .finish_non_exhaustive()
2826    }
2827}
2828
2829impl CompositeAggregator {
2830    /// Creates a new composite aggregator from a list of factories.
2831    #[must_use]
2832    pub fn new(factories: Vec<Box<dyn DynAggregatorFactory>>) -> Self {
2833        let mut fields = vec![
2834            Field::new("window_start", DataType::Int64, false),
2835            Field::new("window_end", DataType::Int64, false),
2836        ];
2837        fields.extend(factories.iter().map(|f| f.result_field()));
2838        let cached_schema = Arc::new(Schema::new(fields));
2839        Self {
2840            factories,
2841            cached_schema,
2842        }
2843    }
2844
2845    /// Returns the number of sub-aggregates.
2846    #[must_use]
2847    pub fn num_aggregates(&self) -> usize {
2848        self.factories.len()
2849    }
2850
2851    /// Creates a new composite accumulator with all sub-accumulators.
2852    #[must_use]
2853    pub fn create_accumulator(&self) -> CompositeAccumulator {
2854        let accumulators = self
2855            .factories
2856            .iter()
2857            .map(|f| f.create_accumulator())
2858            .collect();
2859        CompositeAccumulator { accumulators }
2860    }
2861
2862    /// Returns the result fields for all sub-aggregates.
2863    #[must_use]
2864    pub fn result_fields(&self) -> Vec<Field> {
2865        self.factories.iter().map(|f| f.result_field()).collect()
2866    }
2867
2868    /// Creates the output schema: `window_start, window_end, [aggregate fields]`.
2869    #[must_use]
2870    pub fn output_schema(&self) -> SchemaRef {
2871        Arc::clone(&self.cached_schema)
2872    }
2873}
2874
2875impl Clone for CompositeAggregator {
2876    fn clone(&self) -> Self {
2877        let factories: Vec<Box<dyn DynAggregatorFactory>> =
2878            self.factories.iter().map(|f| f.clone_box()).collect();
2879        Self {
2880            cached_schema: Arc::clone(&self.cached_schema),
2881            factories,
2882        }
2883    }
2884}
2885
2886/// Composite accumulator holding multiple [`DynAccumulator`] instances.
2887///
2888/// Fans out each event to all sub-accumulators and collects results
2889/// as a multi-column [`RecordBatch`].
2890pub struct CompositeAccumulator {
2891    /// Sub-accumulators (one per aggregate function)
2892    accumulators: Vec<Box<dyn DynAccumulator>>,
2893}
2894
2895impl std::fmt::Debug for CompositeAccumulator {
2896    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2897        f.debug_struct("CompositeAccumulator")
2898            .field("num_accumulators", &self.accumulators.len())
2899            .finish()
2900    }
2901}
2902
2903impl CompositeAccumulator {
2904    /// Adds an event to all sub-accumulators.
2905    pub fn add_event(&mut self, event: &Event) {
2906        for acc in &mut self.accumulators {
2907            acc.add_event(event);
2908        }
2909    }
2910
2911    /// Merges another composite accumulator into this one.
2912    ///
2913    /// # Panics
2914    ///
2915    /// Panics if the other accumulator has a different number of sub-accumulators.
2916    pub fn merge(&mut self, other: &Self) {
2917        assert_eq!(
2918            self.accumulators.len(),
2919            other.accumulators.len(),
2920            "Cannot merge composite accumulators with different sizes"
2921        );
2922        for (self_acc, other_acc) in self.accumulators.iter_mut().zip(&other.accumulators) {
2923            self_acc.merge_dyn(other_acc.as_ref());
2924        }
2925    }
2926
2927    /// Returns all results as [`ScalarResult`] values.
2928    #[must_use]
2929    pub fn results(&self) -> Vec<ScalarResult> {
2930        self.accumulators
2931            .iter()
2932            .map(|a| a.result_scalar())
2933            .collect()
2934    }
2935
2936    /// Returns true if all sub-accumulators are empty.
2937    #[must_use]
2938    pub fn is_empty(&self) -> bool {
2939        self.accumulators.iter().all(|a| a.is_empty())
2940    }
2941
2942    /// Serializes all sub-accumulators for checkpointing.
2943    #[must_use]
2944    #[allow(clippy::cast_possible_truncation)] // Wire format uses fixed-width integers
2945    pub fn serialize(&self) -> Vec<u8> {
2946        let mut buf = Vec::new();
2947        // Header: number of accumulators (u32)
2948        let n = self.accumulators.len() as u32;
2949        buf.extend_from_slice(&n.to_le_bytes());
2950        for acc in &self.accumulators {
2951            let tag = acc.type_tag();
2952            let tag_bytes = tag.as_bytes();
2953            // Tag length (u16) + tag + data length (u32) + data
2954            buf.extend_from_slice(&(tag_bytes.len() as u16).to_le_bytes());
2955            buf.extend_from_slice(tag_bytes);
2956            let data = acc.serialize();
2957            buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
2958            buf.extend_from_slice(&data);
2959        }
2960        buf
2961    }
2962
2963    /// Creates a multi-column [`RecordBatch`] from the results.
2964    ///
2965    /// The batch has columns: `window_start, window_end, [aggregate results]`.
2966    ///
2967    /// # Errors
2968    ///
2969    /// Returns `None` if the batch cannot be created.
2970    #[must_use]
2971    pub fn to_record_batch(&self, window_id: &WindowId, schema: &SchemaRef) -> Option<RecordBatch> {
2972        use arrow_array::{Float64Array, UInt64Array};
2973
2974        let mut columns: Vec<Arc<dyn arrow_array::Array>> = vec![
2975            Arc::new(Int64Array::from(vec![window_id.start])),
2976            Arc::new(Int64Array::from(vec![window_id.end])),
2977        ];
2978
2979        for result in self.results() {
2980            let col: Arc<dyn arrow_array::Array> = match result {
2981                ScalarResult::Int64(v) => Arc::new(Int64Array::from(vec![v])),
2982                ScalarResult::Float64(v) => Arc::new(Float64Array::from(vec![v])),
2983                ScalarResult::UInt64(v) => Arc::new(UInt64Array::from(vec![v])),
2984                ScalarResult::OptionalInt64(v) => Arc::new(Int64Array::from(vec![v])),
2985                ScalarResult::OptionalFloat64(v) => Arc::new(Float64Array::from(vec![v])),
2986                ScalarResult::Null => Arc::new(Int64Array::new_null(1)),
2987            };
2988            columns.push(col);
2989        }
2990
2991        RecordBatch::try_new(Arc::clone(schema), columns).ok()
2992    }
2993
2994    /// Returns the number of sub-accumulators.
2995    #[must_use]
2996    pub fn num_accumulators(&self) -> usize {
2997        self.accumulators.len()
2998    }
2999}
3000
3001impl Clone for CompositeAccumulator {
3002    fn clone(&self) -> Self {
3003        Self {
3004            accumulators: self.accumulators.iter().map(|a| a.clone_box()).collect(),
3005        }
3006    }
3007}
3008
3009// End F074
3010
/// Prefix placed in front of every window accumulator key in the state store.
const WINDOW_STATE_PREFIX: &[u8; 4] = b"win:";

/// Length of a window accumulator key: the prefix followed by the 16-byte
/// encoded `WindowId` (20 bytes in total).
const WINDOW_STATE_KEY_SIZE: usize = WINDOW_STATE_PREFIX.len() + 16;
3016
3017/// Tumbling window operator.
3018///
3019/// Processes events through non-overlapping, fixed-size time windows.
3020/// Events are assigned to windows based on their timestamps, aggregated,
3021/// and results are emitted based on the configured [`EmitStrategy`].
3022///
3023/// # Emit Strategies
3024///
3025/// - `OnWatermark` (default): Emit when watermark passes window end
3026/// - `Periodic`: Emit intermediate results at intervals, final on watermark
3027/// - `OnUpdate`: Emit after every state update
3028///
3029/// # Late Data Handling
3030///
3031/// Events that arrive after `window_end + allowed_lateness` are considered late.
3032/// Their behavior is controlled by [`LateDataConfig`]:
3033/// - Drop the event (default)
3034/// - Route to a named side output for separate processing
3035///
3036/// # State Management
3037///
3038/// Window state is stored in the operator context's state store using
3039/// prefixed keys:
3040/// - `win:<window_id>` - Accumulator state
3041/// - `meta:<window_id>` - Window metadata (registration status, etc.)
3042///
3043/// # Watermark Triggering
3044///
3045/// Windows are triggered when the watermark advances past `window_end + allowed_lateness`.
3046/// This ensures late data within the grace period is still processed.
3047pub struct TumblingWindowOperator<A: Aggregator> {
3048    /// Window assigner
3049    assigner: TumblingWindowAssigner,
3050    /// Aggregator function
3051    aggregator: A,
3052    /// Allowed lateness for late data
3053    allowed_lateness_ms: i64,
3054    /// Track registered timers to avoid duplicates
3055    registered_windows: std::collections::HashSet<WindowId>,
3056    /// Track windows with registered periodic timers
3057    periodic_timer_windows: std::collections::HashSet<WindowId>,
3058    /// Emit strategy for controlling when results are output
3059    emit_strategy: EmitStrategy,
3060    /// Late data handling configuration
3061    late_data_config: LateDataConfig,
3062    /// Metrics for late data tracking
3063    late_data_metrics: LateDataMetrics,
3064    /// Operator ID for checkpointing
3065    operator_id: String,
3066    /// Cached output schema (avoids allocation on every emit)
3067    output_schema: SchemaRef,
3068    /// Phantom data for accumulator type
3069    _phantom: PhantomData<A::Acc>,
3070}
3071
/// Process-wide sequence number used by `TumblingWindowOperator::new` to derive
/// distinct default operator IDs (`tumbling_window_<n>`).
static OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(u64::MIN);
3074
3075/// Creates the standard window output schema.
3076///
3077/// This schema is used for all window aggregation results.
3078fn create_window_output_schema() -> SchemaRef {
3079    Arc::new(Schema::new(vec![
3080        Field::new("window_start", DataType::Int64, false),
3081        Field::new("window_end", DataType::Int64, false),
3082        Field::new("result", DataType::Int64, false),
3083    ]))
3084}
3085
3086impl<A: Aggregator> TumblingWindowOperator<A>
3087where
3088    A::Acc: Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
3089    <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
3090        + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
3091{
3092    /// Creates a new tumbling window operator.
3093    ///
3094    /// # Arguments
3095    ///
3096    /// * `assigner` - Window assigner for determining window boundaries
3097    /// * `aggregator` - Aggregation function to apply within windows
3098    /// * `allowed_lateness` - Grace period for late data after window close
3099    /// # Panics
3100    ///
3101    /// Panics if allowed lateness does not fit in i64.
3102    #[must_use]
3103    pub fn new(
3104        assigner: TumblingWindowAssigner,
3105        aggregator: A,
3106        allowed_lateness: Duration,
3107    ) -> Self {
3108        let operator_num = OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
3109        Self {
3110            assigner,
3111            aggregator,
3112            // Ensure lateness fits in i64
3113            allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
3114                .expect("Allowed lateness must fit in i64"),
3115            registered_windows: std::collections::HashSet::new(),
3116            periodic_timer_windows: std::collections::HashSet::new(),
3117            emit_strategy: EmitStrategy::default(),
3118            late_data_config: LateDataConfig::default(),
3119            late_data_metrics: LateDataMetrics::new(),
3120            operator_id: format!("tumbling_window_{operator_num}"),
3121            output_schema: create_window_output_schema(),
3122            _phantom: PhantomData,
3123        }
3124    }
3125
3126    /// Creates a new tumbling window operator with a custom operator ID.
3127    /// # Panics
3128    ///
3129    /// Panics if allowed lateness does not fit in i64.
3130    #[must_use]
3131    pub fn with_id(
3132        assigner: TumblingWindowAssigner,
3133        aggregator: A,
3134        allowed_lateness: Duration,
3135        operator_id: String,
3136    ) -> Self {
3137        Self {
3138            assigner,
3139            aggregator,
3140            // Ensure lateness fits in i64
3141            allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
3142                .expect("Allowed lateness must fit in i64"),
3143            registered_windows: std::collections::HashSet::new(),
3144            periodic_timer_windows: std::collections::HashSet::new(),
3145            emit_strategy: EmitStrategy::default(),
3146            late_data_config: LateDataConfig::default(),
3147            late_data_metrics: LateDataMetrics::new(),
3148            operator_id,
3149            output_schema: create_window_output_schema(),
3150            _phantom: PhantomData,
3151        }
3152    }
3153
3154    /// Sets the emit strategy for this window operator.
3155    ///
3156    /// # Arguments
3157    ///
3158    /// * `strategy` - The emit strategy to use
3159    ///
3160    /// # Example
3161    ///
3162    /// ```rust,no_run
3163    /// use laminar_core::operator::window::{
3164    ///     TumblingWindowAssigner, TumblingWindowOperator, CountAggregator, EmitStrategy,
3165    /// };
3166    /// use std::time::Duration;
3167    ///
3168    /// let assigner = TumblingWindowAssigner::new(Duration::from_secs(60));
3169    /// let mut operator = TumblingWindowOperator::new(
3170    ///     assigner,
3171    ///     CountAggregator::new(),
3172    ///     Duration::from_secs(5),
3173    /// );
3174    ///
3175    /// // Emit every 10 seconds instead of waiting for watermark
3176    /// operator.set_emit_strategy(EmitStrategy::Periodic(Duration::from_secs(10)));
3177    /// ```
3178    pub fn set_emit_strategy(&mut self, strategy: EmitStrategy) {
3179        self.emit_strategy = strategy;
3180    }
3181
3182    /// Returns the current emit strategy.
3183    #[must_use]
3184    pub fn emit_strategy(&self) -> &EmitStrategy {
3185        &self.emit_strategy
3186    }
3187
3188    /// Sets the late data handling configuration.
3189    ///
3190    /// # Arguments
3191    ///
3192    /// * `config` - The late data configuration to use
3193    ///
3194    /// # Example
3195    ///
3196    /// ```rust,no_run
3197    /// use laminar_core::operator::window::{
3198    ///     TumblingWindowAssigner, TumblingWindowOperator, CountAggregator, LateDataConfig,
3199    /// };
3200    /// use std::time::Duration;
3201    ///
3202    /// let assigner = TumblingWindowAssigner::new(Duration::from_secs(60));
3203    /// let mut operator = TumblingWindowOperator::new(
3204    ///     assigner,
3205    ///     CountAggregator::new(),
3206    ///     Duration::from_secs(5),
3207    /// );
3208    ///
3209    /// // Route late events to a side output
3210    /// operator.set_late_data_config(LateDataConfig::with_side_output("late_events".to_string()));
3211    /// ```
3212    pub fn set_late_data_config(&mut self, config: LateDataConfig) {
3213        self.late_data_config = config;
3214    }
3215
3216    /// Returns the current late data configuration.
3217    #[must_use]
3218    pub fn late_data_config(&self) -> &LateDataConfig {
3219        &self.late_data_config
3220    }
3221
3222    /// Returns the late data metrics.
3223    ///
3224    /// Use this to monitor late data behavior and set up alerts.
3225    #[must_use]
3226    pub fn late_data_metrics(&self) -> &LateDataMetrics {
3227        &self.late_data_metrics
3228    }
3229
3230    /// Resets the late data metrics counters.
3231    pub fn reset_late_data_metrics(&mut self) {
3232        self.late_data_metrics.reset();
3233    }
3234
3235    /// Returns the window assigner.
3236    #[must_use]
3237    pub fn assigner(&self) -> &TumblingWindowAssigner {
3238        &self.assigner
3239    }
3240
3241    /// Returns the allowed lateness in milliseconds.
3242    #[must_use]
3243    pub fn allowed_lateness_ms(&self) -> i64 {
3244        self.allowed_lateness_ms
3245    }
3246
3247    /// Generates the state key for a window's accumulator.
3248    ///
3249    /// Returns a stack-allocated fixed-size array to avoid heap allocation
3250    /// on the hot path. This is critical for Ring 0 performance.
3251    #[inline]
3252    fn state_key(window_id: &WindowId) -> [u8; WINDOW_STATE_KEY_SIZE] {
3253        let mut key = [0u8; WINDOW_STATE_KEY_SIZE];
3254        key[..4].copy_from_slice(WINDOW_STATE_PREFIX);
3255        let window_key = window_id.to_key_inline();
3256        key[4..20].copy_from_slice(&window_key);
3257        key
3258    }
3259
3260    /// Gets the accumulator for a window, creating a new one if needed.
3261    fn get_accumulator(&self, window_id: &WindowId, state: &dyn StateStore) -> A::Acc {
3262        let key = Self::state_key(window_id);
3263        state
3264            .get_typed::<A::Acc>(&key)
3265            .ok()
3266            .flatten()
3267            .unwrap_or_else(|| self.aggregator.create_accumulator())
3268    }
3269
3270    /// Stores the accumulator for a window.
3271    fn put_accumulator(
3272        window_id: &WindowId,
3273        acc: &A::Acc,
3274        state: &mut dyn StateStore,
3275    ) -> Result<(), OperatorError> {
3276        let key = Self::state_key(window_id);
3277        state
3278            .put_typed(&key, acc)
3279            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
3280    }
3281
3282    /// Deletes the accumulator for a window.
3283    fn delete_accumulator(
3284        window_id: &WindowId,
3285        state: &mut dyn StateStore,
3286    ) -> Result<(), OperatorError> {
3287        let key = Self::state_key(window_id);
3288        state
3289            .delete(&key)
3290            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
3291    }
3292
3293    /// Checks if an event is late (after window close + allowed lateness).
3294    fn is_late(&self, event_time: i64, watermark: i64) -> bool {
3295        let window_id = self.assigner.assign(event_time);
3296        let cleanup_time = window_id.end + self.allowed_lateness_ms;
3297        watermark >= cleanup_time
3298    }
3299
3300    /// Registers a timer for window triggering if not already registered.
3301    fn maybe_register_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
3302        if !self.registered_windows.contains(&window_id) {
3303            // Register timer at window_end + allowed_lateness
3304            let trigger_time = window_id.end + self.allowed_lateness_ms;
3305            ctx.timers.register_timer(
3306                trigger_time,
3307                Some(window_id.to_key()),
3308                Some(ctx.operator_index),
3309            );
3310            self.registered_windows.insert(window_id);
3311        }
3312    }
3313
3314    /// Registers a periodic timer for intermediate emissions.
3315    ///
3316    /// The timer key uses a special encoding to distinguish from final timers:
3317    /// - Final timers: raw `WindowId` bytes (16 bytes)
3318    /// - Periodic timers: `WindowId` with high bit set in first byte
3319    fn maybe_register_periodic_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
3320        if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
3321            if !self.periodic_timer_windows.contains(&window_id) {
3322                // Register first periodic timer at processing_time + interval
3323                let interval_ms =
3324                    i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
3325                let trigger_time = ctx.processing_time + interval_ms;
3326
3327                // Create a key with high bit set to distinguish from final timers
3328                let key = Self::periodic_timer_key(&window_id);
3329
3330                ctx.timers
3331                    .register_timer(trigger_time, Some(key), Some(ctx.operator_index));
3332                self.periodic_timer_windows.insert(window_id);
3333            }
3334        }
3335    }
3336
3337    /// Creates a periodic timer key from a window ID.
3338    ///
3339    /// Uses the high bit of the first byte as a marker to distinguish
3340    /// periodic timers from final watermark timers.
3341    #[inline]
3342    fn periodic_timer_key(window_id: &WindowId) -> super::TimerKey {
3343        let mut key = window_id.to_key();
3344        // Set the high bit of the first byte to mark as periodic
3345        if !key.is_empty() {
3346            key[0] |= 0x80;
3347        }
3348        key
3349    }
3350
3351    /// Checks if a timer key is for a periodic timer.
3352    #[inline]
3353    fn is_periodic_timer_key(key: &[u8]) -> bool {
3354        !key.is_empty() && (key[0] & 0x80) != 0
3355    }
3356
3357    /// Extracts the window ID from a periodic timer key.
3358    #[inline]
3359    fn window_id_from_periodic_key(key: &[u8]) -> Option<WindowId> {
3360        if key.len() != 16 {
3361            return None;
3362        }
3363        let mut clean_key = [0u8; 16];
3364        clean_key.copy_from_slice(key);
3365        // Clear the high bit to get the original window ID
3366        clean_key[0] &= 0x7F;
3367        WindowId::from_key(&clean_key)
3368    }
3369
3370    /// Creates an intermediate result for a window without cleaning up state.
3371    ///
3372    /// Returns `None` if the window is empty.
3373    fn create_intermediate_result(
3374        &self,
3375        window_id: &WindowId,
3376        state: &dyn crate::state::StateStore,
3377    ) -> Option<Event> {
3378        let acc = self.get_accumulator(window_id, state);
3379
3380        if acc.is_empty() {
3381            return None;
3382        }
3383
3384        let result = acc.result();
3385        let result_i64 = result.to_i64();
3386
3387        let batch = RecordBatch::try_new(
3388            Arc::clone(&self.output_schema),
3389            vec![
3390                Arc::new(Int64Array::from(vec![window_id.start])),
3391                Arc::new(Int64Array::from(vec![window_id.end])),
3392                Arc::new(Int64Array::from(vec![result_i64])),
3393            ],
3394        )
3395        .ok()?;
3396
3397        Some(Event::new(window_id.end, batch))
3398    }
3399
3400    /// Handles periodic timer expiration for intermediate emissions.
3401    fn handle_periodic_timer(
3402        &mut self,
3403        window_id: WindowId,
3404        ctx: &mut OperatorContext,
3405    ) -> OutputVec {
3406        let mut output = OutputVec::new();
3407
3408        // Check if window is still valid (not yet closed by watermark)
3409        if !self.registered_windows.contains(&window_id) {
3410            // Window already closed, remove from periodic tracking
3411            self.periodic_timer_windows.remove(&window_id);
3412            return output;
3413        }
3414
3415        // Emit intermediate result
3416        if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
3417            output.push(Output::Event(event));
3418        }
3419
3420        // Schedule next periodic timer if still within window
3421        if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
3422            let interval_ms =
3423                i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
3424            let next_trigger = ctx.processing_time + interval_ms;
3425
3426            // Only schedule if the window hasn't closed yet
3427            let window_close_time = window_id.end + self.allowed_lateness_ms;
3428            if next_trigger < window_close_time {
3429                let key = Self::periodic_timer_key(&window_id);
3430                ctx.timers
3431                    .register_timer(next_trigger, Some(key), Some(ctx.operator_index));
3432            }
3433        }
3434
3435        output
3436    }
3437}
3438
3439impl<A: Aggregator> Operator for TumblingWindowOperator<A>
3440where
3441    A::Acc: 'static
3442        + Archive
3443        + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
3444    <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
3445        + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
3446{
3447    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
3448        let event_time = event.timestamp;
3449
3450        // Update watermark with the new event and get any emitted watermark
3451        let emitted_watermark = ctx.watermark_generator.on_event(event_time);
3452
3453        // Check if this event is too late (beyond allowed lateness)
3454        // Use the current watermark (not just the newly emitted one) for the check
3455        let current_wm = ctx.watermark_generator.current_watermark();
3456        if current_wm > i64::MIN && self.is_late(event_time, current_wm) {
3457            let mut output = OutputVec::new();
3458
3459            // F011B: EMIT FINAL drops late data entirely
3460            if self.emit_strategy.drops_late_data() {
3461                self.late_data_metrics.record_dropped();
3462                return output; // Silently drop - no LateEvent output
3463            }
3464
3465            // Handle late event based on configuration
3466            if let Some(side_output_name) = self.late_data_config.side_output() {
3467                // Route to named side output
3468                self.late_data_metrics.record_side_output();
3469                output.push(Output::SideOutput {
3470                    name: side_output_name.to_string(),
3471                    event: event.clone(),
3472                });
3473            } else {
3474                // No side output configured - emit as LateEvent (may be dropped by downstream)
3475                self.late_data_metrics.record_dropped();
3476                output.push(Output::LateEvent(event.clone()));
3477            }
3478            return output;
3479        }
3480
3481        // Assign event to window
3482        let window_id = self.assigner.assign(event_time);
3483
3484        // Track if state was updated (for OnUpdate and Changelog strategies)
3485        let mut state_updated = false;
3486
3487        // Extract value and update accumulator
3488        if let Some(value) = self.aggregator.extract(event) {
3489            let mut acc = self.get_accumulator(&window_id, ctx.state);
3490            acc.add(value);
3491            if let Err(e) = Self::put_accumulator(&window_id, &acc, ctx.state) {
3492                // Log error but don't fail - we'll retry on next event
3493                tracing::error!("Failed to store window state: {e}");
3494            } else {
3495                state_updated = true;
3496            }
3497        }
3498
3499        // Register timer for this window (watermark-based final emission)
3500        self.maybe_register_timer(window_id, ctx);
3501
3502        // F011B: OnWindowClose and Final suppress intermediate emissions
3503        // Don't register periodic timers for these strategies
3504        if !self.emit_strategy.suppresses_intermediate() {
3505            self.maybe_register_periodic_timer(window_id, ctx);
3506        }
3507
3508        // Emit watermark update if generated
3509        let mut output = OutputVec::new();
3510        if let Some(wm) = emitted_watermark {
3511            output.push(Output::Watermark(wm.timestamp()));
3512        }
3513
3514        // F011B: Handle different emit strategies
3515        if state_updated {
3516            match &self.emit_strategy {
3517                // OnUpdate: emit intermediate result as regular event
3518                EmitStrategy::OnUpdate => {
3519                    if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
3520                        output.push(Output::Event(event));
3521                    }
3522                }
3523                // Changelog: emit changelog record on every update
3524                EmitStrategy::Changelog => {
3525                    if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
3526                        // For intermediate updates in changelog mode, we emit as insert
3527                        // Full CDC support (with retractions) requires F063
3528                        let record = ChangelogRecord::insert(event, ctx.processing_time);
3529                        output.push(Output::Changelog(record));
3530                    }
3531                }
3532                // Other strategies: no intermediate emission
3533                EmitStrategy::OnWatermark
3534                | EmitStrategy::Periodic(_)
3535                | EmitStrategy::OnWindowClose
3536                | EmitStrategy::Final => {}
3537            }
3538        }
3539
3540        output
3541    }
3542
3543    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
3544        // Check if this is a periodic timer (high bit set)
3545        if Self::is_periodic_timer_key(&timer.key) {
3546            // F011B: OnWindowClose and Final suppress periodic emissions
3547            if self.emit_strategy.suppresses_intermediate() {
3548                // Don't emit, just clean up the periodic timer tracking
3549                if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
3550                    self.periodic_timer_windows.remove(&window_id);
3551                }
3552                return OutputVec::new();
3553            }
3554
3555            if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
3556                return self.handle_periodic_timer(window_id, ctx);
3557            }
3558            return OutputVec::new();
3559        }
3560
3561        // Parse window ID from timer key (final emission timer)
3562        let Some(window_id) = WindowId::from_key(&timer.key) else {
3563            return OutputVec::new();
3564        };
3565
3566        // Get the accumulator
3567        let acc = self.get_accumulator(&window_id, ctx.state);
3568
3569        // Skip empty windows
3570        if acc.is_empty() {
3571            // Clean up state
3572            let _ = Self::delete_accumulator(&window_id, ctx.state);
3573            self.registered_windows.remove(&window_id);
3574            self.periodic_timer_windows.remove(&window_id);
3575            return OutputVec::new();
3576        }
3577
3578        // Get the result
3579        let result = acc.result();
3580
3581        // Clean up window state
3582        let _ = Self::delete_accumulator(&window_id, ctx.state);
3583        self.registered_windows.remove(&window_id);
3584        self.periodic_timer_windows.remove(&window_id);
3585
3586        // Convert result to i64 for the batch
3587        let result_i64 = result.to_i64();
3588
3589        // Create output batch using cached schema (avoids ~200ns allocation per emit)
3590        let batch = RecordBatch::try_new(
3591            Arc::clone(&self.output_schema),
3592            vec![
3593                Arc::new(Int64Array::from(vec![window_id.start])),
3594                Arc::new(Int64Array::from(vec![window_id.end])),
3595                Arc::new(Int64Array::from(vec![result_i64])),
3596            ],
3597        );
3598
3599        let mut output = OutputVec::new();
3600        match batch {
3601            Ok(data) => {
3602                let event = Event::new(window_id.end, data);
3603
3604                // F011B: Emit based on strategy
3605                match &self.emit_strategy {
3606                    // Changelog: wrap in changelog record for CDC
3607                    EmitStrategy::Changelog => {
3608                        let record = ChangelogRecord::insert(event, ctx.processing_time);
3609                        output.push(Output::Changelog(record));
3610                    }
3611                    // All other strategies: emit as regular event
3612                    EmitStrategy::OnWatermark
3613                    | EmitStrategy::Periodic(_)
3614                    | EmitStrategy::OnUpdate
3615                    | EmitStrategy::OnWindowClose
3616                    | EmitStrategy::Final => {
3617                        output.push(Output::Event(event));
3618                    }
3619                }
3620            }
3621            Err(e) => {
3622                tracing::error!("Failed to create output batch: {e}");
3623            }
3624        }
3625        output
3626    }
3627
3628    fn checkpoint(&self) -> OperatorState {
3629        // Serialize both registered windows and periodic timer windows using rkyv
3630        let windows: Vec<_> = self.registered_windows.iter().copied().collect();
3631        let periodic_windows: Vec<_> = self.periodic_timer_windows.iter().copied().collect();
3632
3633        // Create a tuple of both sets
3634        let checkpoint_data = (windows, periodic_windows);
3635        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
3636            .map(|v| v.to_vec())
3637            .unwrap_or_default();
3638
3639        OperatorState {
3640            operator_id: self.operator_id.clone(),
3641            data,
3642        }
3643    }
3644
3645    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
3646        if state.operator_id != self.operator_id {
3647            return Err(OperatorError::StateAccessFailed(format!(
3648                "Operator ID mismatch: expected {}, got {}",
3649                self.operator_id, state.operator_id
3650            )));
3651        }
3652
3653        // Try to deserialize as the new format (tuple of two vectors)
3654        if let Ok(archived) =
3655            rkyv::access::<rkyv::Archived<(Vec<WindowId>, Vec<WindowId>)>, RkyvError>(&state.data)
3656        {
3657            if let Ok((windows, periodic_windows)) =
3658                rkyv::deserialize::<(Vec<WindowId>, Vec<WindowId>), RkyvError>(archived)
3659            {
3660                self.registered_windows = windows.into_iter().collect();
3661                self.periodic_timer_windows = periodic_windows.into_iter().collect();
3662                return Ok(());
3663            }
3664        }
3665
3666        // Fall back to old format (single vector) for backwards compatibility
3667        let archived = rkyv::access::<rkyv::Archived<Vec<WindowId>>, RkyvError>(&state.data)
3668            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
3669        let windows: Vec<WindowId> = rkyv::deserialize::<Vec<WindowId>, RkyvError>(archived)
3670            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
3671
3672        self.registered_windows = windows.into_iter().collect();
3673        self.periodic_timer_windows = std::collections::HashSet::new();
3674        Ok(())
3675    }
3676}
3677
3678#[cfg(test)]
3679mod tests {
3680    use super::*;
3681    use crate::state::InMemoryStore;
3682    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
3683    use arrow_array::{Int64Array, RecordBatch};
3684    use arrow_schema::{DataType, Field, Schema};
3685    use std::sync::Arc;
3686
3687    fn create_test_event(timestamp: i64, value: i64) -> Event {
3688        let schema = Arc::new(Schema::new(vec![Field::new(
3689            "value",
3690            DataType::Int64,
3691            false,
3692        )]));
3693        let batch =
3694            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
3695        Event::new(timestamp, batch)
3696    }
3697
3698    fn create_test_context<'a>(
3699        timers: &'a mut TimerService,
3700        state: &'a mut dyn StateStore,
3701        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
3702    ) -> OperatorContext<'a> {
3703        OperatorContext {
3704            event_time: 0,
3705            processing_time: 0,
3706            timers,
3707            state,
3708            watermark_generator: watermark_gen,
3709            operator_index: 0,
3710        }
3711    }
3712
3713    #[test]
3714    fn test_window_id_creation() {
3715        let window = WindowId::new(1000, 2000);
3716        assert_eq!(window.start, 1000);
3717        assert_eq!(window.end, 2000);
3718        assert_eq!(window.duration_ms(), 1000);
3719    }
3720
3721    #[test]
3722    fn test_window_id_serialization() {
3723        let window = WindowId::new(1000, 2000);
3724        let key = window.to_key();
3725        assert_eq!(key.len(), 16);
3726
3727        let restored = WindowId::from_key(&key).unwrap();
3728        assert_eq!(restored, window);
3729    }
3730
3731    #[test]
3732    fn test_tumbling_assigner_positive_timestamps() {
3733        let assigner = TumblingWindowAssigner::from_millis(1000);
3734
3735        // Events at different times within same window
3736        assert_eq!(assigner.assign(0), WindowId::new(0, 1000));
3737        assert_eq!(assigner.assign(500), WindowId::new(0, 1000));
3738        assert_eq!(assigner.assign(999), WindowId::new(0, 1000));
3739
3740        // Event at window boundary goes to next window
3741        assert_eq!(assigner.assign(1000), WindowId::new(1000, 2000));
3742        assert_eq!(assigner.assign(1500), WindowId::new(1000, 2000));
3743    }
3744
3745    #[test]
3746    fn test_tumbling_assigner_negative_timestamps() {
3747        let assigner = TumblingWindowAssigner::from_millis(1000);
3748
3749        // Negative timestamps
3750        assert_eq!(assigner.assign(-1), WindowId::new(-1000, 0));
3751        assert_eq!(assigner.assign(-500), WindowId::new(-1000, 0));
3752        assert_eq!(assigner.assign(-1000), WindowId::new(-1000, 0));
3753        assert_eq!(assigner.assign(-1001), WindowId::new(-2000, -1000));
3754    }
3755
3756    #[test]
3757    fn test_count_aggregator() {
3758        let mut acc = CountAccumulator::default();
3759        assert!(acc.is_empty());
3760        assert_eq!(acc.result(), 0);
3761
3762        acc.add(());
3763        acc.add(());
3764        acc.add(());
3765
3766        assert!(!acc.is_empty());
3767        assert_eq!(acc.result(), 3);
3768    }
3769
3770    #[test]
3771    fn test_sum_accumulator() {
3772        let mut acc = SumAccumulator::default();
3773        acc.add(10);
3774        acc.add(20);
3775        acc.add(30);
3776
3777        assert_eq!(acc.result(), 60);
3778    }
3779
3780    #[test]
3781    fn test_min_accumulator() {
3782        let mut acc = MinAccumulator::default();
3783        assert!(acc.is_empty());
3784        assert_eq!(acc.result(), None);
3785
3786        acc.add(50);
3787        acc.add(10);
3788        acc.add(30);
3789
3790        assert_eq!(acc.result(), Some(10));
3791    }
3792
3793    #[test]
3794    fn test_max_accumulator() {
3795        let mut acc = MaxAccumulator::default();
3796        acc.add(10);
3797        acc.add(50);
3798        acc.add(30);
3799
3800        assert_eq!(acc.result(), Some(50));
3801    }
3802
3803    #[test]
3804    fn test_avg_accumulator() {
3805        let mut acc = AvgAccumulator::default();
3806        acc.add(10);
3807        acc.add(20);
3808        acc.add(30);
3809
3810        let result = acc.result().unwrap();
3811        assert!((result - 20.0).abs() < f64::EPSILON);
3812    }
3813
3814    #[test]
3815    fn test_accumulator_merge() {
3816        let mut acc1 = SumAccumulator::default();
3817        acc1.add(10);
3818        acc1.add(20);
3819
3820        let mut acc2 = SumAccumulator::default();
3821        acc2.add(30);
3822        acc2.add(40);
3823
3824        acc1.merge(&acc2);
3825        assert_eq!(acc1.result(), 100);
3826    }
3827
3828    #[test]
3829    fn test_tumbling_window_operator_basic() {
3830        let assigner = TumblingWindowAssigner::from_millis(1000);
3831        let aggregator = CountAggregator::new();
3832        let mut operator = TumblingWindowOperator::with_id(
3833            assigner,
3834            aggregator,
3835            Duration::from_millis(0),
3836            "test_op".to_string(),
3837        );
3838
3839        let mut timers = TimerService::new();
3840        let mut state = InMemoryStore::new();
3841        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
3842
3843        // Process events in the same window
3844        let event1 = create_test_event(100, 1);
3845        let event2 = create_test_event(500, 2);
3846        let event3 = create_test_event(900, 3);
3847
3848        {
3849            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
3850            operator.process(&event1, &mut ctx);
3851        }
3852        {
3853            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
3854            operator.process(&event2, &mut ctx);
3855        }
3856        {
3857            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
3858            operator.process(&event3, &mut ctx);
3859        }
3860
3861        // Check that a timer was registered
3862        assert_eq!(operator.registered_windows.len(), 1);
3863        assert!(operator
3864            .registered_windows
3865            .contains(&WindowId::new(0, 1000)));
3866    }
3867
3868    #[test]
3869    fn test_tumbling_window_operator_trigger() {
3870        let assigner = TumblingWindowAssigner::from_millis(1000);
3871        let aggregator = CountAggregator::new();
3872        let mut operator = TumblingWindowOperator::with_id(
3873            assigner,
3874            aggregator,
3875            Duration::from_millis(0),
3876            "test_op".to_string(),
3877        );
3878
3879        let mut timers = TimerService::new();
3880        let mut state = InMemoryStore::new();
3881        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
3882
3883        // Process events
3884        for ts in [100, 500, 900] {
3885            let event = create_test_event(ts, 1);
3886            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
3887            operator.process(&event, &mut ctx);
3888        }
3889
3890        // Trigger the window via timer
3891        let timer = Timer {
3892            key: WindowId::new(0, 1000).to_key(),
3893            timestamp: 1000,
3894        };
3895
3896        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
3897        let outputs = operator.on_timer(timer, &mut ctx);
3898
3899        assert_eq!(outputs.len(), 1);
3900        match &outputs[0] {
3901            Output::Event(event) => {
3902                assert_eq!(event.timestamp, 1000); // window end
3903                                                   // Check the result column (count = 3)
3904                let result_col = event.data.column(2);
3905                let result_array = result_col.as_any().downcast_ref::<Int64Array>().unwrap();
3906                assert_eq!(result_array.value(0), 3);
3907            }
3908            _ => panic!("Expected Event output"),
3909        }
3910
3911        // Window should be cleaned up
3912        assert!(operator.registered_windows.is_empty());
3913    }
3914
3915    #[test]
3916    fn test_tumbling_window_multiple_windows() {
3917        let assigner = TumblingWindowAssigner::from_millis(1000);
3918        let aggregator = CountAggregator::new();
3919        let mut operator = TumblingWindowOperator::with_id(
3920            assigner,
3921            aggregator,
3922            Duration::from_millis(0),
3923            "test_op".to_string(),
3924        );
3925
3926        let mut timers = TimerService::new();
3927        let mut state = InMemoryStore::new();
3928        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
3929
3930        // Events in different windows
3931        let events = [
3932            create_test_event(100, 1),  // Window [0, 1000)
3933            create_test_event(500, 2),  // Window [0, 1000)
3934            create_test_event(1100, 3), // Window [1000, 2000)
3935            create_test_event(1500, 4), // Window [1000, 2000)
3936            create_test_event(2500, 5), // Window [2000, 3000)
3937        ];
3938
3939        for event in &events {
3940            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
3941            operator.process(event, &mut ctx);
3942        }
3943
3944        // Should have 3 windows registered
3945        assert_eq!(operator.registered_windows.len(), 3);
3946    }
3947
3948    #[test]
3949    fn test_tumbling_window_checkpoint_restore() {
3950        let assigner = TumblingWindowAssigner::from_millis(1000);
3951        let aggregator = CountAggregator::new();
3952        let mut operator = TumblingWindowOperator::with_id(
3953            assigner.clone(),
3954            aggregator.clone(),
3955            Duration::from_millis(0),
3956            "test_op".to_string(),
3957        );
3958
3959        // Register some windows
3960        operator.registered_windows.insert(WindowId::new(0, 1000));
3961        operator
3962            .registered_windows
3963            .insert(WindowId::new(1000, 2000));
3964
3965        // Checkpoint
3966        let checkpoint = operator.checkpoint();
3967
3968        // Create a new operator and restore
3969        let mut restored_operator = TumblingWindowOperator::with_id(
3970            assigner,
3971            aggregator,
3972            Duration::from_millis(0),
3973            "test_op".to_string(),
3974        );
3975        restored_operator.restore(checkpoint).unwrap();
3976
3977        assert_eq!(restored_operator.registered_windows.len(), 2);
3978        assert!(restored_operator
3979            .registered_windows
3980            .contains(&WindowId::new(0, 1000)));
3981        assert!(restored_operator
3982            .registered_windows
3983            .contains(&WindowId::new(1000, 2000)));
3984    }
3985
3986    #[test]
3987    fn test_sum_aggregator_extraction() {
3988        let aggregator = SumAggregator::new(0);
3989        let event = create_test_event(100, 42);
3990
3991        let extracted = aggregator.extract(&event);
3992        assert_eq!(extracted, Some(42));
3993    }
3994
3995    #[test]
3996    fn test_empty_window_trigger() {
3997        let assigner = TumblingWindowAssigner::from_millis(1000);
3998        let aggregator = CountAggregator::new();
3999        let mut operator = TumblingWindowOperator::with_id(
4000            assigner,
4001            aggregator,
4002            Duration::from_millis(0),
4003            "test_op".to_string(),
4004        );
4005
4006        let mut timers = TimerService::new();
4007        let mut state = InMemoryStore::new();
4008        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
4009
4010        // Trigger without any events
4011        let timer = Timer {
4012            key: WindowId::new(0, 1000).to_key(),
4013            timestamp: 1000,
4014        };
4015
4016        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4017        let outputs = operator.on_timer(timer, &mut ctx);
4018
4019        // Empty window should produce no output
4020        assert!(outputs.is_empty());
4021    }
4022
4023    #[test]
4024    fn test_window_assigner_trait() {
4025        let assigner = TumblingWindowAssigner::from_millis(1000);
4026        let windows = assigner.assign_windows(500);
4027
4028        assert_eq!(windows.len(), 1);
4029        assert_eq!(windows[0], WindowId::new(0, 1000));
4030    }
4031
4032    #[test]
4033    fn test_emit_strategy_default() {
4034        let strategy = EmitStrategy::default();
4035        assert_eq!(strategy, EmitStrategy::OnWatermark);
4036    }
4037
4038    #[test]
4039    fn test_emit_strategy_on_watermark() {
4040        let strategy = EmitStrategy::OnWatermark;
4041        assert!(!strategy.needs_periodic_timer());
4042        assert!(strategy.periodic_interval().is_none());
4043        assert!(!strategy.emits_on_update());
4044    }
4045
4046    #[test]
4047    fn test_emit_strategy_periodic() {
4048        let interval = Duration::from_secs(10);
4049        let strategy = EmitStrategy::Periodic(interval);
4050        assert!(strategy.needs_periodic_timer());
4051        assert_eq!(strategy.periodic_interval(), Some(interval));
4052        assert!(!strategy.emits_on_update());
4053    }
4054
4055    #[test]
4056    fn test_emit_strategy_on_update() {
4057        let strategy = EmitStrategy::OnUpdate;
4058        assert!(!strategy.needs_periodic_timer());
4059        assert!(strategy.periodic_interval().is_none());
4060        assert!(strategy.emits_on_update());
4061    }
4062
4063    #[test]
4064    fn test_window_operator_set_emit_strategy() {
4065        let assigner = TumblingWindowAssigner::from_millis(1000);
4066        let aggregator = CountAggregator::new();
4067        let mut operator = TumblingWindowOperator::with_id(
4068            assigner,
4069            aggregator,
4070            Duration::from_millis(0),
4071            "test_op".to_string(),
4072        );
4073
4074        // Default is OnWatermark
4075        assert_eq!(*operator.emit_strategy(), EmitStrategy::OnWatermark);
4076
4077        // Set to Periodic
4078        operator.set_emit_strategy(EmitStrategy::Periodic(Duration::from_secs(5)));
4079        assert_eq!(
4080            *operator.emit_strategy(),
4081            EmitStrategy::Periodic(Duration::from_secs(5))
4082        );
4083
4084        // Set to OnUpdate
4085        operator.set_emit_strategy(EmitStrategy::OnUpdate);
4086        assert_eq!(*operator.emit_strategy(), EmitStrategy::OnUpdate);
4087    }
4088
4089    #[test]
4090    fn test_emit_on_update_emits_intermediate_results() {
4091        let assigner = TumblingWindowAssigner::from_millis(1000);
4092        let aggregator = CountAggregator::new();
4093        let mut operator = TumblingWindowOperator::with_id(
4094            assigner,
4095            aggregator,
4096            Duration::from_millis(0),
4097            "test_op".to_string(),
4098        );
4099
4100        // Set emit strategy to OnUpdate
4101        operator.set_emit_strategy(EmitStrategy::OnUpdate);
4102
4103        let mut timers = TimerService::new();
4104        let mut state = InMemoryStore::new();
4105        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
4106
4107        // Process first event - should emit intermediate result
4108        let event1 = create_test_event(100, 1);
4109        let outputs1 = {
4110            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4111            operator.process(&event1, &mut ctx)
4112        };
4113
4114        // Should have at least one event output (intermediate result)
4115        let has_event = outputs1.iter().any(|o| matches!(o, Output::Event(_)));
4116        assert!(
4117            has_event,
4118            "OnUpdate should emit intermediate result after first event"
4119        );
4120
4121        // Process second event - should emit another intermediate result
4122        let event2 = create_test_event(500, 2);
4123        let outputs2 = {
4124            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4125            operator.process(&event2, &mut ctx)
4126        };
4127
4128        // Should have intermediate result with count = 2
4129        let event_output = outputs2.iter().find_map(|o| {
4130            if let Output::Event(e) = o {
4131                Some(e)
4132            } else {
4133                None
4134            }
4135        });
4136
4137        assert!(
4138            event_output.is_some(),
4139            "OnUpdate should emit after second event"
4140        );
4141        if let Some(event) = event_output {
4142            let result_col = event.data.column(2);
4143            let result_array = result_col.as_any().downcast_ref::<Int64Array>().unwrap();
4144            assert_eq!(result_array.value(0), 2, "Intermediate count should be 2");
4145        }
4146    }
4147
4148    #[test]
4149    fn test_emit_on_watermark_no_intermediate_results() {
4150        let assigner = TumblingWindowAssigner::from_millis(1000);
4151        let aggregator = CountAggregator::new();
4152        let mut operator = TumblingWindowOperator::with_id(
4153            assigner,
4154            aggregator,
4155            Duration::from_millis(0),
4156            "test_op".to_string(),
4157        );
4158
4159        // Default is OnWatermark - no intermediate emissions
4160        let mut timers = TimerService::new();
4161        let mut state = InMemoryStore::new();
4162        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
4163
4164        // Process events
4165        let event1 = create_test_event(100, 1);
4166        let outputs1 = {
4167            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4168            operator.process(&event1, &mut ctx)
4169        };
4170
4171        // Should NOT have event output (only watermark update if any)
4172        let has_intermediate_event = outputs1.iter().any(|o| matches!(o, Output::Event(_)));
4173        assert!(
4174            !has_intermediate_event,
4175            "OnWatermark should not emit intermediate results"
4176        );
4177    }
4178
4179    #[test]
4180    fn test_checkpoint_restore_with_emit_strategy() {
4181        let assigner = TumblingWindowAssigner::from_millis(1000);
4182        let aggregator = CountAggregator::new();
4183        let mut operator = TumblingWindowOperator::with_id(
4184            assigner.clone(),
4185            aggregator.clone(),
4186            Duration::from_millis(0),
4187            "test_op".to_string(),
4188        );
4189
4190        // Set emit strategy and register some windows
4191        operator.set_emit_strategy(EmitStrategy::Periodic(Duration::from_secs(10)));
4192        operator.registered_windows.insert(WindowId::new(0, 1000));
4193        operator
4194            .periodic_timer_windows
4195            .insert(WindowId::new(0, 1000));
4196
4197        // Checkpoint
4198        let checkpoint = operator.checkpoint();
4199
4200        // Create a new operator and restore
4201        let mut restored_operator = TumblingWindowOperator::with_id(
4202            assigner,
4203            aggregator,
4204            Duration::from_millis(0),
4205            "test_op".to_string(),
4206        );
4207        restored_operator.restore(checkpoint).unwrap();
4208
4209        // Both registered_windows and periodic_timer_windows should be restored
4210        assert_eq!(restored_operator.registered_windows.len(), 1);
4211        assert_eq!(restored_operator.periodic_timer_windows.len(), 1);
4212        assert!(restored_operator
4213            .registered_windows
4214            .contains(&WindowId::new(0, 1000)));
4215        assert!(restored_operator
4216            .periodic_timer_windows
4217            .contains(&WindowId::new(0, 1000)));
4218    }
4219
4220    #[test]
4221    fn test_periodic_timer_key_format() {
4222        // Verify the periodic timer key format
4223        let window_id = WindowId::new(1000, 2000);
4224
4225        // Create periodic key using the helper
4226        let periodic_key =
4227            TumblingWindowOperator::<CountAggregator>::periodic_timer_key(&window_id);
4228
4229        // Periodic key should be 16 bytes (same as window key, but with high bit set)
4230        assert_eq!(periodic_key.len(), 16);
4231
4232        // First byte should have high bit set
4233        assert!(TumblingWindowOperator::<CountAggregator>::is_periodic_timer_key(&periodic_key));
4234
4235        // Extract window ID from periodic key
4236        let extracted =
4237            TumblingWindowOperator::<CountAggregator>::window_id_from_periodic_key(&periodic_key);
4238        assert_eq!(extracted, Some(window_id));
4239
4240        // Regular window key should not be detected as periodic
4241        let regular_key = window_id.to_key();
4242        assert!(!TumblingWindowOperator::<CountAggregator>::is_periodic_timer_key(&regular_key));
4243    }
4244
4245    #[test]
4246    fn test_late_data_config_default() {
4247        let config = LateDataConfig::default();
4248        assert!(config.should_drop());
4249        assert!(config.side_output().is_none());
4250    }
4251
4252    #[test]
4253    fn test_late_data_config_drop() {
4254        let config = LateDataConfig::drop();
4255        assert!(config.should_drop());
4256        assert!(config.side_output().is_none());
4257    }
4258
4259    #[test]
4260    fn test_late_data_config_with_side_output() {
4261        let config = LateDataConfig::with_side_output("late_events".to_string());
4262        assert!(!config.should_drop());
4263        assert_eq!(config.side_output(), Some("late_events"));
4264    }
4265
4266    #[test]
4267    fn test_late_data_metrics_initial() {
4268        let metrics = LateDataMetrics::new();
4269        assert_eq!(metrics.late_events_total(), 0);
4270        assert_eq!(metrics.late_events_dropped(), 0);
4271        assert_eq!(metrics.late_events_side_output(), 0);
4272    }
4273
4274    #[test]
4275    fn test_late_data_metrics_tracking() {
4276        let mut metrics = LateDataMetrics::new();
4277
4278        metrics.record_dropped();
4279        metrics.record_dropped();
4280        metrics.record_side_output();
4281
4282        assert_eq!(metrics.late_events_total(), 3);
4283        assert_eq!(metrics.late_events_dropped(), 2);
4284        assert_eq!(metrics.late_events_side_output(), 1);
4285    }
4286
4287    #[test]
4288    fn test_late_data_metrics_reset() {
4289        let mut metrics = LateDataMetrics::new();
4290
4291        metrics.record_dropped();
4292        metrics.record_side_output();
4293
4294        assert_eq!(metrics.late_events_total(), 2);
4295
4296        metrics.reset();
4297
4298        assert_eq!(metrics.late_events_total(), 0);
4299        assert_eq!(metrics.late_events_dropped(), 0);
4300        assert_eq!(metrics.late_events_side_output(), 0);
4301    }
4302
4303    #[test]
4304    fn test_window_operator_set_late_data_config() {
4305        let assigner = TumblingWindowAssigner::from_millis(1000);
4306        let aggregator = CountAggregator::new();
4307        let mut operator = TumblingWindowOperator::with_id(
4308            assigner,
4309            aggregator,
4310            Duration::from_millis(100),
4311            "test_op".to_string(),
4312        );
4313
4314        // Default is drop
4315        assert!(operator.late_data_config().should_drop());
4316
4317        // Set to side output
4318        operator.set_late_data_config(LateDataConfig::with_side_output("late".to_string()));
4319        assert!(!operator.late_data_config().should_drop());
4320        assert_eq!(operator.late_data_config().side_output(), Some("late"));
4321    }
4322
4323    #[test]
4324    fn test_late_event_dropped_without_side_output() {
4325        let assigner = TumblingWindowAssigner::from_millis(1000);
4326        let aggregator = CountAggregator::new();
4327        let mut operator = TumblingWindowOperator::with_id(
4328            assigner,
4329            aggregator,
4330            Duration::from_millis(0), // No allowed lateness
4331            "test_op".to_string(),
4332        );
4333
4334        // Default: drop late events
4335        let mut timers = TimerService::new();
4336        let mut state = InMemoryStore::new();
4337        // Use a watermark generator with high max lateness so watermarks advance quickly
4338        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
4339
4340        // Process an event to advance the watermark to 1000
4341        let event1 = create_test_event(1000, 1);
4342        {
4343            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4344            operator.process(&event1, &mut ctx);
4345        }
4346
4347        // Process a late event (timestamp 500 when watermark is at 1000)
4348        let late_event = create_test_event(500, 2);
4349        let outputs = {
4350            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4351            operator.process(&late_event, &mut ctx)
4352        };
4353
4354        // Should emit a LateEvent (dropped)
4355        assert!(!outputs.is_empty());
4356        let is_late_event = outputs.iter().any(|o| matches!(o, Output::LateEvent(_)));
4357        assert!(is_late_event, "Expected LateEvent output");
4358
4359        // Metrics should show dropped
4360        assert_eq!(operator.late_data_metrics().late_events_dropped(), 1);
4361        assert_eq!(operator.late_data_metrics().late_events_side_output(), 0);
4362    }
4363
4364    #[test]
4365    fn test_late_event_routed_to_side_output() {
4366        let assigner = TumblingWindowAssigner::from_millis(1000);
4367        let aggregator = CountAggregator::new();
4368        let mut operator = TumblingWindowOperator::with_id(
4369            assigner,
4370            aggregator,
4371            Duration::from_millis(0), // No allowed lateness
4372            "test_op".to_string(),
4373        );
4374
4375        // Configure side output for late events
4376        operator.set_late_data_config(LateDataConfig::with_side_output("late_events".to_string()));
4377
4378        let mut timers = TimerService::new();
4379        let mut state = InMemoryStore::new();
4380        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
4381
4382        // Process an event to advance the watermark to 1000
4383        let event1 = create_test_event(1000, 1);
4384        {
4385            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4386            operator.process(&event1, &mut ctx);
4387        }
4388
4389        // Process a late event
4390        let late_event = create_test_event(500, 2);
4391        let outputs = {
4392            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4393            operator.process(&late_event, &mut ctx)
4394        };
4395
4396        // Should emit a SideOutput
4397        assert!(!outputs.is_empty());
4398        let side_output = outputs.iter().find_map(|o| {
4399            if let Output::SideOutput { name, .. } = o {
4400                Some(name.clone())
4401            } else {
4402                None
4403            }
4404        });
4405        assert_eq!(side_output, Some("late_events".to_string()));
4406
4407        // Metrics should show side output
4408        assert_eq!(operator.late_data_metrics().late_events_dropped(), 0);
4409        assert_eq!(operator.late_data_metrics().late_events_side_output(), 1);
4410    }
4411
4412    #[test]
4413    fn test_event_within_lateness_not_late() {
4414        let assigner = TumblingWindowAssigner::from_millis(1000);
4415        let aggregator = CountAggregator::new();
4416        let mut operator = TumblingWindowOperator::with_id(
4417            assigner,
4418            aggregator,
4419            Duration::from_millis(500), // 500ms allowed lateness
4420            "test_op".to_string(),
4421        );
4422
4423        let mut timers = TimerService::new();
4424        let mut state = InMemoryStore::new();
4425        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
4426
4427        // Process an event to advance the watermark to 1200
4428        // This would close window [0, 1000) at time 1000 + 0 (no lateness from watermark gen)
4429        // But with 500ms allowed lateness, window cleanup is at 1500
4430        let event1 = create_test_event(1200, 1);
4431        {
4432            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4433            operator.process(&event1, &mut ctx);
4434        }
4435
4436        // Process an event for window [0, 1000) at timestamp 800
4437        // Watermark is at 1200, window cleanup time is 1000 + 500 = 1500
4438        // Since 1200 < 1500, the event should NOT be late
4439        let event2 = create_test_event(800, 2);
4440        let outputs = {
4441            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4442            operator.process(&event2, &mut ctx)
4443        };
4444
4445        // Should NOT be a late event - should be processed normally
4446        let is_late_event = outputs
4447            .iter()
4448            .any(|o| matches!(o, Output::LateEvent(_) | Output::SideOutput { .. }));
4449        assert!(
4450            !is_late_event,
4451            "Event within lateness period should not be marked as late"
4452        );
4453
4454        // No late events recorded
4455        assert_eq!(operator.late_data_metrics().late_events_total(), 0);
4456    }
4457
4458    #[test]
4459    fn test_reset_late_data_metrics() {
4460        let assigner = TumblingWindowAssigner::from_millis(1000);
4461        let aggregator = CountAggregator::new();
4462        let mut operator = TumblingWindowOperator::with_id(
4463            assigner,
4464            aggregator,
4465            Duration::from_millis(0),
4466            "test_op".to_string(),
4467        );
4468
4469        let mut timers = TimerService::new();
4470        let mut state = InMemoryStore::new();
4471        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
4472
4473        // Generate a late event
4474        let event1 = create_test_event(1000, 1);
4475        {
4476            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4477            operator.process(&event1, &mut ctx);
4478        }
4479        let late_event = create_test_event(500, 2);
4480        {
4481            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4482            operator.process(&late_event, &mut ctx);
4483        }
4484
4485        assert_eq!(operator.late_data_metrics().late_events_total(), 1);
4486
4487        // Reset metrics
4488        operator.reset_late_data_metrics();
4489
4490        assert_eq!(operator.late_data_metrics().late_events_total(), 0);
4491    }
4492
4493    #[test]
4494    fn test_emit_strategy_helper_methods() {
4495        // OnWatermark
4496        assert!(!EmitStrategy::OnWatermark.emits_intermediate());
4497        assert!(!EmitStrategy::OnWatermark.requires_changelog());
4498        assert!(!EmitStrategy::OnWatermark.is_append_only_compatible());
4499        assert!(EmitStrategy::OnWatermark.generates_retractions());
4500        assert!(!EmitStrategy::OnWatermark.suppresses_intermediate());
4501        assert!(!EmitStrategy::OnWatermark.drops_late_data());
4502
4503        // OnUpdate
4504        assert!(EmitStrategy::OnUpdate.emits_intermediate());
4505        assert!(!EmitStrategy::OnUpdate.requires_changelog());
4506        assert!(!EmitStrategy::OnUpdate.is_append_only_compatible());
4507        assert!(EmitStrategy::OnUpdate.generates_retractions());
4508        assert!(!EmitStrategy::OnUpdate.suppresses_intermediate());
4509
4510        // Periodic
4511        let periodic = EmitStrategy::Periodic(Duration::from_secs(10));
4512        assert!(periodic.emits_intermediate());
4513        assert!(!periodic.requires_changelog());
4514        assert!(!periodic.is_append_only_compatible());
4515        assert!(!periodic.generates_retractions());
4516        assert!(!periodic.suppresses_intermediate());
4517
4518        // OnWindowClose (F011B)
4519        assert!(!EmitStrategy::OnWindowClose.emits_intermediate());
4520        assert!(!EmitStrategy::OnWindowClose.requires_changelog());
4521        assert!(EmitStrategy::OnWindowClose.is_append_only_compatible());
4522        assert!(!EmitStrategy::OnWindowClose.generates_retractions());
4523        assert!(EmitStrategy::OnWindowClose.suppresses_intermediate());
4524        assert!(!EmitStrategy::OnWindowClose.drops_late_data());
4525
4526        // Changelog (F011B)
4527        assert!(!EmitStrategy::Changelog.emits_intermediate());
4528        assert!(EmitStrategy::Changelog.requires_changelog());
4529        assert!(!EmitStrategy::Changelog.is_append_only_compatible());
4530        assert!(EmitStrategy::Changelog.generates_retractions());
4531        assert!(!EmitStrategy::Changelog.suppresses_intermediate());
4532
4533        // Final (F011B)
4534        assert!(!EmitStrategy::Final.emits_intermediate());
4535        assert!(!EmitStrategy::Final.requires_changelog());
4536        assert!(EmitStrategy::Final.is_append_only_compatible());
4537        assert!(!EmitStrategy::Final.generates_retractions());
4538        assert!(EmitStrategy::Final.suppresses_intermediate());
4539        assert!(EmitStrategy::Final.drops_late_data());
4540    }
4541
4542    #[test]
4543    fn test_cdc_operation() {
4544        assert_eq!(CdcOperation::Insert.weight(), 1);
4545        assert_eq!(CdcOperation::Delete.weight(), -1);
4546        assert_eq!(CdcOperation::UpdateBefore.weight(), -1);
4547        assert_eq!(CdcOperation::UpdateAfter.weight(), 1);
4548
4549        assert!(CdcOperation::Insert.is_insert());
4550        assert!(CdcOperation::UpdateAfter.is_insert());
4551        assert!(!CdcOperation::Delete.is_insert());
4552        assert!(!CdcOperation::UpdateBefore.is_insert());
4553
4554        assert!(CdcOperation::Delete.is_delete());
4555        assert!(CdcOperation::UpdateBefore.is_delete());
4556        assert!(!CdcOperation::Insert.is_delete());
4557        assert!(!CdcOperation::UpdateAfter.is_delete());
4558
4559        assert_eq!(CdcOperation::Insert.debezium_op(), 'c');
4560        assert_eq!(CdcOperation::Delete.debezium_op(), 'd');
4561        assert_eq!(CdcOperation::UpdateBefore.debezium_op(), 'u');
4562        assert_eq!(CdcOperation::UpdateAfter.debezium_op(), 'u');
4563    }
4564
4565    #[test]
4566    fn test_changelog_record_insert() {
4567        let event = create_test_event(1000, 42);
4568        let record = ChangelogRecord::insert(event.clone(), 2000);
4569
4570        assert_eq!(record.operation, CdcOperation::Insert);
4571        assert_eq!(record.weight, 1);
4572        assert_eq!(record.emit_timestamp, 2000);
4573        assert_eq!(record.event.timestamp, 1000);
4574        assert!(record.is_insert());
4575        assert!(!record.is_delete());
4576    }
4577
4578    #[test]
4579    fn test_changelog_record_delete() {
4580        let event = create_test_event(1000, 42);
4581        let record = ChangelogRecord::delete(event.clone(), 2000);
4582
4583        assert_eq!(record.operation, CdcOperation::Delete);
4584        assert_eq!(record.weight, -1);
4585        assert_eq!(record.emit_timestamp, 2000);
4586        assert!(record.is_delete());
4587        assert!(!record.is_insert());
4588    }
4589
4590    #[test]
4591    fn test_changelog_record_update() {
4592        let old_event = create_test_event(1000, 10);
4593        let new_event = create_test_event(1000, 20);
4594        let (before, after) = ChangelogRecord::update(old_event, new_event, 2000);
4595
4596        assert_eq!(before.operation, CdcOperation::UpdateBefore);
4597        assert_eq!(before.weight, -1);
4598        assert!(before.is_delete());
4599
4600        assert_eq!(after.operation, CdcOperation::UpdateAfter);
4601        assert_eq!(after.weight, 1);
4602        assert!(after.is_insert());
4603    }
4604
4605    #[test]
4606    fn test_emit_strategy_on_window_close() {
4607        let assigner = TumblingWindowAssigner::from_millis(1000);
4608        let aggregator = CountAggregator::new();
4609        let mut operator = TumblingWindowOperator::with_id(
4610            assigner,
4611            aggregator,
4612            Duration::from_millis(0),
4613            "test_op".to_string(),
4614        );
4615
4616        // Set OnWindowClose strategy
4617        operator.set_emit_strategy(EmitStrategy::OnWindowClose);
4618
4619        let mut timers = TimerService::new();
4620        let mut state = InMemoryStore::new();
4621        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
4622
4623        // Process events - should NOT emit intermediate results
4624        let event1 = create_test_event(100, 1);
4625        let event2 = create_test_event(200, 2);
4626
4627        let outputs1 = {
4628            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4629            operator.process(&event1, &mut ctx)
4630        };
4631        let outputs2 = {
4632            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4633            operator.process(&event2, &mut ctx)
4634        };
4635
4636        // No Event outputs (only watermark updates)
4637        let event_outputs1: Vec<_> = outputs1
4638            .iter()
4639            .filter(|o| matches!(o, Output::Event(_)))
4640            .collect();
4641        let event_outputs2: Vec<_> = outputs2
4642            .iter()
4643            .filter(|o| matches!(o, Output::Event(_)))
4644            .collect();
4645
4646        assert!(
4647            event_outputs1.is_empty(),
4648            "OnWindowClose should not emit intermediate results"
4649        );
4650        assert!(
4651            event_outputs2.is_empty(),
4652            "OnWindowClose should not emit intermediate results"
4653        );
4654    }
4655
4656    #[test]
4657    fn test_emit_strategy_final_drops_late_data() {
4658        let assigner = TumblingWindowAssigner::from_millis(1000);
4659        let aggregator = CountAggregator::new();
4660        let mut operator = TumblingWindowOperator::with_id(
4661            assigner,
4662            aggregator,
4663            Duration::from_millis(0),
4664            "test_op".to_string(),
4665        );
4666
4667        // Set Final strategy
4668        operator.set_emit_strategy(EmitStrategy::Final);
4669
4670        let mut timers = TimerService::new();
4671        let mut state = InMemoryStore::new();
4672        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
4673
4674        // Advance watermark past first window
4675        let event1 = create_test_event(1500, 1);
4676        {
4677            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4678            operator.process(&event1, &mut ctx);
4679        }
4680
4681        // Send late event - should be silently dropped (no LateEvent output)
4682        let late_event = create_test_event(500, 2);
4683        let outputs = {
4684            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4685            operator.process(&late_event, &mut ctx)
4686        };
4687
4688        // Should have NO output at all (silently dropped)
4689        assert!(
4690            outputs.is_empty(),
4691            "EMIT FINAL should silently drop late data"
4692        );
4693        assert_eq!(
4694            operator.late_data_metrics().late_events_dropped(),
4695            1,
4696            "Late event should be recorded as dropped"
4697        );
4698    }
4699
4700    #[test]
4701    fn test_emit_strategy_changelog_emits_records() {
4702        let assigner = TumblingWindowAssigner::from_millis(1000);
4703        let aggregator = CountAggregator::new();
4704        let mut operator = TumblingWindowOperator::with_id(
4705            assigner,
4706            aggregator,
4707            Duration::from_millis(0),
4708            "test_op".to_string(),
4709        );
4710
4711        // Set Changelog strategy
4712        operator.set_emit_strategy(EmitStrategy::Changelog);
4713
4714        let mut timers = TimerService::new();
4715        let mut state = InMemoryStore::new();
4716        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
4717
4718        // Process event - Changelog emits on every update
4719        let event = create_test_event(100, 1);
4720        let outputs = {
4721            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4722            operator.process(&event, &mut ctx)
4723        };
4724
4725        // Should emit Changelog record
4726        let changelog_outputs: Vec<_> = outputs
4727            .iter()
4728            .filter(|o| matches!(o, Output::Changelog(_)))
4729            .collect();
4730
4731        assert_eq!(
4732            changelog_outputs.len(),
4733            1,
4734            "Changelog strategy should emit changelog record on update"
4735        );
4736
4737        if let Output::Changelog(record) = &changelog_outputs[0] {
4738            assert_eq!(record.operation, CdcOperation::Insert);
4739            assert_eq!(record.weight, 1);
4740        } else {
4741            panic!("Expected Changelog output");
4742        }
4743    }
4744
4745    #[test]
4746    fn test_emit_strategy_changelog_on_timer() {
4747        let assigner = TumblingWindowAssigner::from_millis(1000);
4748        let aggregator = CountAggregator::new();
4749        let mut operator = TumblingWindowOperator::with_id(
4750            assigner,
4751            aggregator,
4752            Duration::from_millis(0),
4753            "test_op".to_string(),
4754        );
4755
4756        // Set Changelog strategy
4757        operator.set_emit_strategy(EmitStrategy::Changelog);
4758
4759        let mut timers = TimerService::new();
4760        let mut state = InMemoryStore::new();
4761        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
4762
4763        // Process events to populate window
4764        let event = create_test_event(100, 1);
4765        {
4766            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4767            operator.process(&event, &mut ctx);
4768        }
4769
4770        // Trigger window timer - should emit Changelog
4771        let timer = Timer {
4772            key: WindowId::new(0, 1000).to_key(),
4773            timestamp: 1000,
4774        };
4775
4776        let outputs = {
4777            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
4778            operator.on_timer(timer, &mut ctx)
4779        };
4780
4781        // Should emit Changelog record on final emission
4782        let changelog_outputs: Vec<_> = outputs
4783            .iter()
4784            .filter(|o| matches!(o, Output::Changelog(_)))
4785            .collect();
4786
4787        assert_eq!(
4788            changelog_outputs.len(),
4789            1,
4790            "Changelog strategy should emit changelog record on timer"
4791        );
4792
4793        if let Output::Changelog(record) = &changelog_outputs[0] {
4794            assert_eq!(record.operation, CdcOperation::Insert);
4795        } else {
4796            panic!("Expected Changelog output");
4797        }
4798    }
4799
4800    // FIRST_VALUE / LAST_VALUE Tests (F059)
4801
4802    #[test]
4803    fn test_first_value_single_event() {
4804        let mut acc = FirstValueAccumulator::default();
4805        assert!(acc.is_empty());
4806        assert_eq!(acc.result(), None);
4807
4808        acc.add((100, 1000));
4809        assert!(!acc.is_empty());
4810        assert_eq!(acc.result(), Some(100));
4811    }
4812
4813    #[test]
4814    fn test_first_value_multiple_events() {
4815        let mut acc = FirstValueAccumulator::default();
4816        acc.add((100, 1000)); // timestamp 1000
4817        acc.add((200, 500)); // timestamp 500 (earlier)
4818        acc.add((300, 1500)); // timestamp 1500 (later)
4819
4820        // Earliest timestamp wins
4821        assert_eq!(acc.result(), Some(200));
4822    }
4823
4824    #[test]
4825    fn test_first_value_same_timestamp() {
4826        let mut acc = FirstValueAccumulator::default();
4827        acc.add((100, 1000));
4828        acc.add((200, 1000)); // Same timestamp - keep first
4829
4830        assert_eq!(acc.result(), Some(100));
4831    }
4832
4833    #[test]
4834    fn test_first_value_merge() {
4835        let mut acc1 = FirstValueAccumulator::default();
4836        acc1.add((100, 1000));
4837
4838        let mut acc2 = FirstValueAccumulator::default();
4839        acc2.add((200, 500)); // Earlier
4840
4841        acc1.merge(&acc2);
4842        assert_eq!(acc1.result(), Some(200)); // 500 < 1000
4843    }
4844
4845    #[test]
4846    fn test_first_value_merge_empty() {
4847        let mut acc1 = FirstValueAccumulator::default();
4848        acc1.add((100, 1000));
4849
4850        let acc2 = FirstValueAccumulator::default(); // Empty
4851
4852        acc1.merge(&acc2);
4853        assert_eq!(acc1.result(), Some(100)); // Keep acc1
4854
4855        let mut acc3 = FirstValueAccumulator::default(); // Empty
4856        acc3.merge(&acc1);
4857        assert_eq!(acc3.result(), Some(100)); // Take acc1's value
4858    }
4859
4860    #[test]
4861    fn test_last_value_single_event() {
4862        let mut acc = LastValueAccumulator::default();
4863        assert!(acc.is_empty());
4864        assert_eq!(acc.result(), None);
4865
4866        acc.add((100, 1000));
4867        assert!(!acc.is_empty());
4868        assert_eq!(acc.result(), Some(100));
4869    }
4870
4871    #[test]
4872    fn test_last_value_multiple_events() {
4873        let mut acc = LastValueAccumulator::default();
4874        acc.add((100, 1000));
4875        acc.add((200, 500)); // Earlier - ignored
4876        acc.add((300, 1500)); // Latest timestamp wins
4877
4878        assert_eq!(acc.result(), Some(300));
4879    }
4880
4881    #[test]
4882    fn test_last_value_same_timestamp() {
4883        let mut acc = LastValueAccumulator::default();
4884        acc.add((100, 1000));
4885        acc.add((200, 1000)); // Same timestamp - keep latest arrival
4886
4887        assert_eq!(acc.result(), Some(200));
4888    }
4889
4890    #[test]
4891    fn test_last_value_merge() {
4892        let mut acc1 = LastValueAccumulator::default();
4893        acc1.add((100, 1000));
4894
4895        let mut acc2 = LastValueAccumulator::default();
4896        acc2.add((200, 1500)); // Later
4897
4898        acc1.merge(&acc2);
4899        assert_eq!(acc1.result(), Some(200)); // 1500 > 1000
4900    }
4901
4902    #[test]
4903    fn test_last_value_merge_same_timestamp() {
4904        let mut acc1 = LastValueAccumulator::default();
4905        acc1.add((100, 1000));
4906
4907        let mut acc2 = LastValueAccumulator::default();
4908        acc2.add((200, 1000)); // Same timestamp
4909
4910        acc1.merge(&acc2);
4911        assert_eq!(acc1.result(), Some(200)); // Take other on same timestamp
4912    }
4913
4914    #[test]
4915    fn test_first_value_f64_basic() {
4916        let mut acc = FirstValueF64Accumulator::default();
4917        acc.add((100.5, 1000));
4918        acc.add((200.5, 500)); // Earlier
4919        acc.add((300.5, 1500)); // Later
4920
4921        let result = acc.result().unwrap();
4922        assert!((result - 200.5).abs() < f64::EPSILON);
4923    }
4924
4925    #[test]
4926    fn test_last_value_f64_basic() {
4927        let mut acc = LastValueF64Accumulator::default();
4928        acc.add((100.5, 1000));
4929        acc.add((200.5, 500)); // Earlier
4930        acc.add((300.5, 1500)); // Later
4931
4932        let result = acc.result().unwrap();
4933        assert!((result - 300.5).abs() < f64::EPSILON);
4934    }
4935
4936    #[test]
4937    fn test_first_value_aggregator_extract() {
4938        let aggregator = FirstValueAggregator::new(0, 1);
4939
4940        // Create event with value and timestamp columns
4941        let schema = Arc::new(Schema::new(vec![
4942            Field::new("price", DataType::Int64, false),
4943            Field::new("ts", DataType::Int64, false),
4944        ]));
4945        let batch = RecordBatch::try_new(
4946            schema,
4947            vec![
4948                Arc::new(Int64Array::from(vec![100])),
4949                Arc::new(Int64Array::from(vec![1000])),
4950            ],
4951        )
4952        .unwrap();
4953        let event = Event::new(1000, batch);
4954
4955        let extracted = aggregator.extract(&event);
4956        assert_eq!(extracted, Some((100, 1000)));
4957    }
4958
4959    #[test]
4960    fn test_last_value_aggregator_extract() {
4961        let aggregator = LastValueAggregator::new(0, 1);
4962
4963        let schema = Arc::new(Schema::new(vec![
4964            Field::new("price", DataType::Int64, false),
4965            Field::new("ts", DataType::Int64, false),
4966        ]));
4967        let batch = RecordBatch::try_new(
4968            schema,
4969            vec![
4970                Arc::new(Int64Array::from(vec![100])),
4971                Arc::new(Int64Array::from(vec![1000])),
4972            ],
4973        )
4974        .unwrap();
4975        let event = Event::new(1000, batch);
4976
4977        let extracted = aggregator.extract(&event);
4978        assert_eq!(extracted, Some((100, 1000)));
4979    }
4980
4981    #[test]
4982    fn test_first_value_aggregator_invalid_column() {
4983        let aggregator = FirstValueAggregator::new(5, 6); // Out of bounds
4984
4985        let schema = Arc::new(Schema::new(vec![Field::new(
4986            "price",
4987            DataType::Int64,
4988            false,
4989        )]));
4990        let batch =
4991            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![100]))]).unwrap();
4992        let event = Event::new(1000, batch);
4993
4994        assert_eq!(aggregator.extract(&event), None);
4995    }
4996
4997    #[test]
4998    fn test_ohlc_simulation() {
4999        // Simulate OHLC bar generation:
5000        // Open = FIRST_VALUE(price)
5001        // High = MAX(price)
5002        // Low = MIN(price)
5003        // Close = LAST_VALUE(price)
5004        // Volume = SUM(quantity)
5005
5006        // Trades: (price, timestamp, quantity)
5007        // t=100: price=100, qty=10
5008        // t=200: price=105, qty=5
5009        // t=300: price=98, qty=15
5010        // t=400: price=102, qty=8
5011
5012        let mut first = FirstValueAccumulator::default();
5013        let mut max = MaxAccumulator::default();
5014        let mut min = MinAccumulator::default();
5015        let mut last = LastValueAccumulator::default();
5016        let mut sum = SumAccumulator::default();
5017
5018        // Process trades
5019        first.add((100, 100));
5020        max.add(100);
5021        min.add(100);
5022        last.add((100, 100));
5023        sum.add(10);
5024
5025        first.add((105, 200));
5026        max.add(105);
5027        min.add(105);
5028        last.add((105, 200));
5029        sum.add(5);
5030
5031        first.add((98, 300));
5032        max.add(98);
5033        min.add(98);
5034        last.add((98, 300));
5035        sum.add(15);
5036
5037        first.add((102, 400));
5038        max.add(102);
5039        min.add(102);
5040        last.add((102, 400));
5041        sum.add(8);
5042
5043        // Expected OHLC: open=100, high=105, low=98, close=102, volume=38
5044        assert_eq!(first.result(), Some(100), "Open");
5045        assert_eq!(max.result(), Some(105), "High");
5046        assert_eq!(min.result(), Some(98), "Low");
5047        assert_eq!(last.result(), Some(102), "Close");
5048        assert_eq!(sum.result(), 38, "Volume");
5049    }
5050
5051    #[test]
5052    fn test_first_value_checkpoint_restore() {
5053        let mut acc = FirstValueAccumulator::default();
5054        acc.add((100, 1000));
5055        acc.add((200, 500)); // Earlier - this wins
5056
5057        // Serialize
5058        let bytes = rkyv::to_bytes::<RkyvError>(&acc)
5059            .expect("serialize")
5060            .to_vec();
5061
5062        // Deserialize
5063        let restored =
5064            rkyv::from_bytes::<FirstValueAccumulator, RkyvError>(&bytes).expect("deserialize");
5065
5066        assert_eq!(restored.result(), Some(200));
5067        assert_eq!(restored.timestamp, Some(500));
5068    }
5069
5070    #[test]
5071    fn test_last_value_checkpoint_restore() {
5072        let mut acc = LastValueAccumulator::default();
5073        acc.add((100, 1000));
5074        acc.add((300, 1500)); // Later - this wins
5075
5076        // Serialize
5077        let bytes = rkyv::to_bytes::<RkyvError>(&acc)
5078            .expect("serialize")
5079            .to_vec();
5080
5081        // Deserialize
5082        let restored =
5083            rkyv::from_bytes::<LastValueAccumulator, RkyvError>(&bytes).expect("deserialize");
5084
5085        assert_eq!(restored.result(), Some(300));
5086        assert_eq!(restored.timestamp, Some(1500));
5087    }
5088
5089    // ════════════════════════════════════════════════════════════════════════
5090    // F074: Composite Aggregator & f64 Type Support Tests
5091    // ════════════════════════════════════════════════════════════════════════
5092
5093    // ── ScalarResult tests ──────────────────────────────────────────────────
5094
5095    #[test]
5096    fn test_scalar_result_int64_conversions() {
5097        let r = ScalarResult::Int64(42);
5098        assert_eq!(r.to_i64_lossy(), 42);
5099        assert!((r.to_f64_lossy() - 42.0).abs() < f64::EPSILON);
5100        assert!(!r.is_null());
5101        assert_eq!(r.data_type(), DataType::Int64);
5102    }
5103
5104    #[test]
5105    fn test_scalar_result_float64_conversions() {
5106        let r = ScalarResult::Float64(3.125);
5107        assert_eq!(r.to_i64_lossy(), 3); // truncated
5108        assert!((r.to_f64_lossy() - 3.125).abs() < f64::EPSILON);
5109        assert!(!r.is_null());
5110        assert_eq!(r.data_type(), DataType::Float64);
5111    }
5112
5113    #[test]
5114    fn test_scalar_result_uint64_conversions() {
5115        let r = ScalarResult::UInt64(100);
5116        assert_eq!(r.to_i64_lossy(), 100);
5117        assert!((r.to_f64_lossy() - 100.0).abs() < f64::EPSILON);
5118        assert_eq!(r.data_type(), DataType::UInt64);
5119    }
5120
5121    #[test]
5122    fn test_scalar_result_null_variants() {
5123        assert!(ScalarResult::Null.is_null());
5124        assert!(ScalarResult::OptionalInt64(None).is_null());
5125        assert!(ScalarResult::OptionalFloat64(None).is_null());
5126        assert!(!ScalarResult::OptionalInt64(Some(1)).is_null());
5127        assert!(!ScalarResult::OptionalFloat64(Some(1.0)).is_null());
5128    }
5129
5130    #[test]
5131    fn test_scalar_result_optional_conversions() {
5132        let r = ScalarResult::OptionalFloat64(Some(2.5));
5133        assert_eq!(r.to_i64_lossy(), 2);
5134        assert!((r.to_f64_lossy() - 2.5).abs() < f64::EPSILON);
5135
5136        let r2 = ScalarResult::OptionalInt64(None);
5137        assert_eq!(r2.to_i64_lossy(), 0);
5138        assert!((r2.to_f64_lossy()).abs() < f64::EPSILON);
5139    }
5140
5141    // ── f64 aggregator tests ────────────────────────────────────────────────
5142
5143    fn make_f64_event(values: &[f64]) -> Event {
5144        let schema = Arc::new(Schema::new(vec![Field::new(
5145            "value",
5146            DataType::Float64,
5147            false,
5148        )]));
5149        let batch = RecordBatch::try_new(
5150            schema,
5151            vec![Arc::new(arrow_array::Float64Array::from(values.to_vec()))],
5152        )
5153        .unwrap();
5154        Event::new(1000, batch)
5155    }
5156
5157    #[test]
5158    fn test_sum_f64_accumulator_basic() {
5159        let mut acc = SumF64IndexedAccumulator::new(0);
5160        let event = make_f64_event(&[1.5, 2.5, 3.0]);
5161        acc.add_event(&event);
5162        assert!(!acc.is_empty());
5163        match acc.result_scalar() {
5164            ScalarResult::Float64(v) => assert!((v - 7.0).abs() < f64::EPSILON),
5165            other => panic!("Expected Float64, got {other:?}"),
5166        }
5167    }
5168
5169    #[test]
5170    fn test_sum_f64_accumulator_empty() {
5171        let acc = SumF64IndexedAccumulator::new(0);
5172        assert!(acc.is_empty());
5173        assert!(matches!(acc.result_scalar(), ScalarResult::Null));
5174    }
5175
5176    #[test]
5177    fn test_min_f64_accumulator_basic() {
5178        let mut acc = MinF64IndexedAccumulator::new(0);
5179        let event = make_f64_event(&[3.0, 1.5, 2.5]);
5180        acc.add_event(&event);
5181        match acc.result_scalar() {
5182            ScalarResult::OptionalFloat64(Some(v)) => {
5183                assert!((v - 1.5).abs() < f64::EPSILON);
5184            }
5185            other => panic!("Expected OptionalFloat64(Some), got {other:?}"),
5186        }
5187    }
5188
5189    #[test]
5190    fn test_max_f64_accumulator_basic() {
5191        let mut acc = MaxF64IndexedAccumulator::new(0);
5192        let event = make_f64_event(&[3.0, 1.5, 2.5]);
5193        acc.add_event(&event);
5194        match acc.result_scalar() {
5195            ScalarResult::OptionalFloat64(Some(v)) => {
5196                assert!((v - 3.0).abs() < f64::EPSILON);
5197            }
5198            other => panic!("Expected OptionalFloat64(Some), got {other:?}"),
5199        }
5200    }
5201
5202    #[test]
5203    fn test_avg_f64_accumulator_basic() {
5204        let mut acc = AvgF64IndexedAccumulator::new(0);
5205        let event = make_f64_event(&[1.0, 2.0, 3.0]);
5206        acc.add_event(&event);
5207        match acc.result_scalar() {
5208            ScalarResult::Float64(v) => assert!((v - 2.0).abs() < f64::EPSILON),
5209            other => panic!("Expected Float64, got {other:?}"),
5210        }
5211    }
5212
5213    #[test]
5214    fn test_sum_f64_merge() {
5215        let mut acc1 = SumF64IndexedAccumulator::new(0);
5216        let mut acc2 = SumF64IndexedAccumulator::new(0);
5217        acc1.add_event(&make_f64_event(&[1.0, 2.0]));
5218        acc2.add_event(&make_f64_event(&[3.0, 4.0]));
5219        acc1.merge_dyn(&acc2);
5220        match acc1.result_scalar() {
5221            ScalarResult::Float64(v) => assert!((v - 10.0).abs() < f64::EPSILON),
5222            other => panic!("Expected Float64, got {other:?}"),
5223        }
5224    }
5225
5226    #[test]
5227    fn test_min_f64_merge() {
5228        let mut acc1 = MinF64IndexedAccumulator::new(0);
5229        let mut acc2 = MinF64IndexedAccumulator::new(0);
5230        acc1.add_event(&make_f64_event(&[5.0]));
5231        acc2.add_event(&make_f64_event(&[2.0]));
5232        acc1.merge_dyn(&acc2);
5233        match acc1.result_scalar() {
5234            ScalarResult::OptionalFloat64(Some(v)) => {
5235                assert!((v - 2.0).abs() < f64::EPSILON);
5236            }
5237            other => panic!("Expected OptionalFloat64(Some), got {other:?}"),
5238        }
5239    }
5240
5241    #[test]
5242    fn test_f64_accumulator_serialization() {
5243        let mut acc = SumF64IndexedAccumulator::new(0);
5244        acc.add_event(&make_f64_event(&[1.5, 2.5]));
5245        let data = acc.serialize();
5246        assert_eq!(data.len(), 16); // 8 bytes sum + 8 bytes count
5247    }
5248
5249    // ── Count DynAccumulator tests ──────────────────────────────────────────
5250
5251    #[test]
5252    fn test_count_dyn_accumulator() {
5253        let mut acc = CountDynAccumulator::default();
5254        let event = make_f64_event(&[1.0, 2.0, 3.0]);
5255        acc.add_event(&event);
5256        assert_eq!(acc.result_scalar(), ScalarResult::Int64(3));
5257    }
5258
5259    #[test]
5260    fn test_count_dyn_merge() {
5261        let mut acc1 = CountDynAccumulator::default();
5262        let mut acc2 = CountDynAccumulator::default();
5263        acc1.add_event(&make_f64_event(&[1.0, 2.0]));
5264        acc2.add_event(&make_f64_event(&[3.0]));
5265        acc1.merge_dyn(&acc2);
5266        assert_eq!(acc1.result_scalar(), ScalarResult::Int64(3));
5267    }
5268
5269    // ── FirstValue/LastValue DynAccumulator tests ───────────────────────────
5270
5271    fn make_ts_f64_event(values: &[(f64, i64)]) -> Event {
5272        let schema = Arc::new(Schema::new(vec![
5273            Field::new("value", DataType::Float64, false),
5274            Field::new("timestamp", DataType::Int64, false),
5275        ]));
5276        let vals: Vec<f64> = values.iter().map(|(v, _)| *v).collect();
5277        let tss: Vec<i64> = values.iter().map(|(_, t)| *t).collect();
5278        let batch = RecordBatch::try_new(
5279            schema,
5280            vec![
5281                Arc::new(arrow_array::Float64Array::from(vals)),
5282                Arc::new(Int64Array::from(tss)),
5283            ],
5284        )
5285        .unwrap();
5286        Event::new(1000, batch)
5287    }
5288
5289    #[test]
5290    fn test_first_value_f64_dyn() {
5291        let mut acc = FirstValueF64DynAccumulator::new(0, 1);
5292        acc.add_event(&make_ts_f64_event(&[(10.0, 200), (20.0, 100), (30.0, 300)]));
5293        // Earliest timestamp is 100 → value 20.0
5294        match acc.result_scalar() {
5295            ScalarResult::OptionalFloat64(Some(v)) => {
5296                assert!((v - 20.0).abs() < f64::EPSILON);
5297            }
5298            other => panic!("Expected OptionalFloat64(Some), got {other:?}"),
5299        }
5300    }
5301
5302    #[test]
5303    fn test_last_value_f64_dyn() {
5304        let mut acc = LastValueF64DynAccumulator::new(0, 1);
5305        acc.add_event(&make_ts_f64_event(&[(10.0, 200), (20.0, 100), (30.0, 300)]));
5306        // Latest timestamp is 300 → value 30.0
5307        match acc.result_scalar() {
5308            ScalarResult::OptionalFloat64(Some(v)) => {
5309                assert!((v - 30.0).abs() < f64::EPSILON);
5310            }
5311            other => panic!("Expected OptionalFloat64(Some), got {other:?}"),
5312        }
5313    }
5314
5315    #[test]
5316    fn test_first_value_f64_dyn_merge() {
5317        let mut acc1 = FirstValueF64DynAccumulator::new(0, 1);
5318        let mut acc2 = FirstValueF64DynAccumulator::new(0, 1);
5319        acc1.add_event(&make_ts_f64_event(&[(10.0, 200)]));
5320        acc2.add_event(&make_ts_f64_event(&[(20.0, 50)]));
5321        acc1.merge_dyn(&acc2);
5322        // acc2 has earlier timestamp (50) → value 20.0
5323        match acc1.result_scalar() {
5324            ScalarResult::OptionalFloat64(Some(v)) => {
5325                assert!((v - 20.0).abs() < f64::EPSILON);
5326            }
5327            other => panic!("Expected OptionalFloat64(Some), got {other:?}"),
5328        }
5329    }
5330
5331    // ── CompositeAggregator tests ───────────────────────────────────────────
5332
5333    #[test]
5334    fn test_composite_aggregator_creation() {
5335        let agg = CompositeAggregator::new(vec![
5336            Box::new(CountDynFactory::new("cnt")),
5337            Box::new(SumF64Factory::new(0, "total")),
5338        ]);
5339        assert_eq!(agg.num_aggregates(), 2);
5340    }
5341
5342    #[test]
5343    fn test_composite_aggregator_schema() {
5344        let agg = CompositeAggregator::new(vec![
5345            Box::new(CountDynFactory::new("cnt")),
5346            Box::new(MinF64Factory::new(0, "low")),
5347            Box::new(MaxF64Factory::new(0, "high")),
5348        ]);
5349        let schema = agg.output_schema();
5350        assert_eq!(schema.fields().len(), 5); // window_start, window_end, cnt, low, high
5351        assert_eq!(schema.field(0).name(), "window_start");
5352        assert_eq!(schema.field(1).name(), "window_end");
5353        assert_eq!(schema.field(2).name(), "cnt");
5354        assert_eq!(schema.field(3).name(), "low");
5355        assert_eq!(schema.field(4).name(), "high");
5356    }
5357
5358    #[test]
5359    fn test_composite_accumulator_lifecycle() {
5360        let agg = CompositeAggregator::new(vec![
5361            Box::new(CountDynFactory::new("cnt")),
5362            Box::new(SumF64Factory::new(0, "total")),
5363        ]);
5364        let mut acc = agg.create_accumulator();
5365        assert!(acc.is_empty());
5366        assert_eq!(acc.num_accumulators(), 2);
5367
5368        let event = make_f64_event(&[1.0, 2.0, 3.0]);
5369        acc.add_event(&event);
5370        assert!(!acc.is_empty());
5371
5372        let results = acc.results();
5373        assert_eq!(results.len(), 2);
5374        assert_eq!(results[0], ScalarResult::Int64(3));
5375        match &results[1] {
5376            ScalarResult::Float64(v) => assert!((v - 6.0).abs() < f64::EPSILON),
5377            other => panic!("Expected Float64, got {other:?}"),
5378        }
5379    }
5380
5381    #[test]
5382    fn test_composite_accumulator_merge() {
5383        let agg = CompositeAggregator::new(vec![
5384            Box::new(CountDynFactory::new("cnt")),
5385            Box::new(SumF64Factory::new(0, "total")),
5386        ]);
5387        let mut acc1 = agg.create_accumulator();
5388        let acc2_holder = agg.create_accumulator();
5389        // Fill acc1
5390        acc1.add_event(&make_f64_event(&[1.0, 2.0]));
5391
5392        // We need a mutable acc2 to add events
5393        let mut acc2 = acc2_holder;
5394        acc2.add_event(&make_f64_event(&[3.0, 4.0]));
5395
5396        acc1.merge(&acc2);
5397        let results = acc1.results();
5398        assert_eq!(results[0], ScalarResult::Int64(4));
5399        match &results[1] {
5400            ScalarResult::Float64(v) => assert!((v - 10.0).abs() < f64::EPSILON),
5401            other => panic!("Expected Float64, got {other:?}"),
5402        }
5403    }
5404
5405    #[test]
5406    fn test_composite_accumulator_serialization() {
5407        let agg = CompositeAggregator::new(vec![
5408            Box::new(CountDynFactory::new("cnt")),
5409            Box::new(SumF64Factory::new(0, "total")),
5410        ]);
5411        let mut acc = agg.create_accumulator();
5412        acc.add_event(&make_f64_event(&[1.0, 2.0]));
5413
5414        let bytes = acc.serialize();
5415        // Should be non-empty with header and two accumulator entries
5416        assert!(bytes.len() > 4);
5417        // Header: 4 bytes for count
5418        let n = u32::from_le_bytes(bytes[..4].try_into().unwrap());
5419        assert_eq!(n, 2);
5420    }
5421
5422    #[test]
5423    fn test_composite_accumulator_record_batch() {
5424        let agg = CompositeAggregator::new(vec![
5425            Box::new(CountDynFactory::new("cnt")),
5426            Box::new(MinF64Factory::new(0, "low")),
5427            Box::new(MaxF64Factory::new(0, "high")),
5428        ]);
5429        let schema = agg.output_schema();
5430        let mut acc = agg.create_accumulator();
5431        acc.add_event(&make_f64_event(&[3.0, 1.0, 5.0, 2.0]));
5432
5433        let window_id = WindowId::new(0, 60000);
5434        let batch = acc.to_record_batch(&window_id, &schema).unwrap();
5435
5436        assert_eq!(batch.num_rows(), 1);
5437        assert_eq!(batch.num_columns(), 5);
5438
5439        // window_start = 0
5440        let ws = batch
5441            .column(0)
5442            .as_any()
5443            .downcast_ref::<Int64Array>()
5444            .unwrap();
5445        assert_eq!(ws.value(0), 0);
5446
5447        // window_end = 60000
5448        let we = batch
5449            .column(1)
5450            .as_any()
5451            .downcast_ref::<Int64Array>()
5452            .unwrap();
5453        assert_eq!(we.value(0), 60000);
5454
5455        // count = 4
5456        let cnt = batch
5457            .column(2)
5458            .as_any()
5459            .downcast_ref::<Int64Array>()
5460            .unwrap();
5461        assert_eq!(cnt.value(0), 4);
5462
5463        // min = 1.0
5464        let low = batch
5465            .column(3)
5466            .as_any()
5467            .downcast_ref::<arrow_array::Float64Array>()
5468            .unwrap();
5469        assert!((low.value(0) - 1.0).abs() < f64::EPSILON);
5470
5471        // max = 5.0
5472        let high = batch
5473            .column(4)
5474            .as_any()
5475            .downcast_ref::<arrow_array::Float64Array>()
5476            .unwrap();
5477        assert!((high.value(0) - 5.0).abs() < f64::EPSILON);
5478    }
5479
5480    #[test]
5481    fn test_ohlc_composite_integration() {
5482        // Simulate OHLC query: FIRST(price), MAX(price), MIN(price), LAST(price), SUM(qty), COUNT(*)
5483        let agg = CompositeAggregator::new(vec![
5484            Box::new(FirstValueF64DynFactory::new(0, 1, "open")),
5485            Box::new(MaxF64Factory::new(0, "high")),
5486            Box::new(MinF64Factory::new(0, "low")),
5487            Box::new(LastValueF64DynFactory::new(0, 1, "close")),
5488            Box::new(CountDynFactory::new("trade_count")),
5489        ]);
5490
5491        let mut acc = agg.create_accumulator();
5492
5493        // Trades: (price, timestamp)
5494        let schema = Arc::new(Schema::new(vec![
5495            Field::new("price", DataType::Float64, false),
5496            Field::new("ts", DataType::Int64, false),
5497        ]));
5498
5499        // Trade 1: price=100.0 at t=1000
5500        let batch1 = RecordBatch::try_new(
5501            Arc::clone(&schema),
5502            vec![
5503                Arc::new(arrow_array::Float64Array::from(vec![100.0])),
5504                Arc::new(Int64Array::from(vec![1000])),
5505            ],
5506        )
5507        .unwrap();
5508        acc.add_event(&Event::new(1000, batch1));
5509
5510        // Trade 2: price=105.0 at t=2000
5511        let batch2 = RecordBatch::try_new(
5512            Arc::clone(&schema),
5513            vec![
5514                Arc::new(arrow_array::Float64Array::from(vec![105.0])),
5515                Arc::new(Int64Array::from(vec![2000])),
5516            ],
5517        )
5518        .unwrap();
5519        acc.add_event(&Event::new(2000, batch2));
5520
5521        // Trade 3: price=98.0 at t=3000
5522        let batch3 = RecordBatch::try_new(
5523            Arc::clone(&schema),
5524            vec![
5525                Arc::new(arrow_array::Float64Array::from(vec![98.0])),
5526                Arc::new(Int64Array::from(vec![3000])),
5527            ],
5528        )
5529        .unwrap();
5530        acc.add_event(&Event::new(3000, batch3));
5531
5532        // Trade 4: price=102.0 at t=4000
5533        let batch4 = RecordBatch::try_new(
5534            Arc::clone(&schema),
5535            vec![
5536                Arc::new(arrow_array::Float64Array::from(vec![102.0])),
5537                Arc::new(Int64Array::from(vec![4000])),
5538            ],
5539        )
5540        .unwrap();
5541        acc.add_event(&Event::new(4000, batch4));
5542
5543        let results = acc.results();
5544        // OHLC: Open=100, High=105, Low=98, Close=102, Count=4
5545        match &results[0] {
5546            ScalarResult::OptionalFloat64(Some(v)) => {
5547                assert!((v - 100.0).abs() < f64::EPSILON, "Open should be 100.0");
5548            }
5549            other => panic!("Expected Open=100.0, got {other:?}"),
5550        }
5551        match &results[1] {
5552            ScalarResult::OptionalFloat64(Some(v)) => {
5553                assert!((v - 105.0).abs() < f64::EPSILON, "High should be 105.0");
5554            }
5555            other => panic!("Expected High=105.0, got {other:?}"),
5556        }
5557        match &results[2] {
5558            ScalarResult::OptionalFloat64(Some(v)) => {
5559                assert!((v - 98.0).abs() < f64::EPSILON, "Low should be 98.0");
5560            }
5561            other => panic!("Expected Low=98.0, got {other:?}"),
5562        }
5563        match &results[3] {
5564            ScalarResult::OptionalFloat64(Some(v)) => {
5565                assert!((v - 102.0).abs() < f64::EPSILON, "Close should be 102.0");
5566            }
5567            other => panic!("Expected Close=102.0, got {other:?}"),
5568        }
5569        assert_eq!(results[4], ScalarResult::Int64(4));
5570    }
5571
5572    #[test]
5573    fn test_composite_aggregator_clone() {
5574        let agg = CompositeAggregator::new(vec![
5575            Box::new(CountDynFactory::new("cnt")),
5576            Box::new(SumF64Factory::new(0, "total")),
5577        ]);
5578        let cloned = agg.clone();
5579        assert_eq!(cloned.num_aggregates(), 2);
5580
5581        // Create accumulators from both and verify they produce same results
5582        let mut acc1 = agg.create_accumulator();
5583        let mut acc2 = cloned.create_accumulator();
5584        let event = make_f64_event(&[5.0]);
5585        acc1.add_event(&event);
5586        acc2.add_event(&event);
5587        assert_eq!(acc1.results(), acc2.results());
5588    }
5589
5590    #[test]
5591    fn test_composite_accumulator_clone() {
5592        let agg = CompositeAggregator::new(vec![Box::new(CountDynFactory::new("cnt"))]);
5593        let mut acc = agg.create_accumulator();
5594        acc.add_event(&make_f64_event(&[1.0, 2.0]));
5595
5596        let cloned = acc.clone();
5597        assert_eq!(acc.results(), cloned.results());
5598    }
5599
5600    #[test]
5601    fn test_backward_compat_existing_aggregators_unchanged() {
5602        // Verify existing static-dispatch aggregators still work
5603        let count_agg = CountAggregator::new();
5604        let mut count_acc = count_agg.create_accumulator();
5605        count_acc.add(());
5606        count_acc.add(());
5607        assert_eq!(count_acc.result(), 2);
5608
5609        let sum_agg = SumAggregator::new(0);
5610        let mut sum_acc = sum_agg.create_accumulator();
5611        sum_acc.add(10);
5612        sum_acc.add(20);
5613        assert_eq!(sum_acc.result(), 30);
5614    }
5615
5616    #[test]
5617    fn test_backward_compat_result_to_i64() {
5618        // Verify ResultToI64 still works for existing types
5619        assert_eq!(42u64.to_i64(), 42);
5620        assert_eq!(42i64.to_i64(), 42);
5621        assert_eq!(Some(42i64).to_i64(), 42);
5622        assert_eq!(None::<i64>.to_i64(), 0);
5623    }
5624
5625    #[test]
5626    fn test_backward_compat_window_schema_unchanged() {
5627        let schema = create_window_output_schema();
5628        assert_eq!(schema.fields().len(), 3);
5629        assert_eq!(schema.field(0).name(), "window_start");
5630        assert_eq!(schema.field(1).name(), "window_end");
5631        assert_eq!(schema.field(2).name(), "result");
5632    }
5633
5634    #[test]
5635    fn test_f64_accumulator_out_of_range_column() {
5636        // Column index out of range should not panic
5637        let mut acc = SumF64IndexedAccumulator::new(99);
5638        acc.add_event(&make_f64_event(&[1.0, 2.0]));
5639        assert!(acc.is_empty());
5640    }
5641
5642    #[test]
5643    fn test_f64_factory_types() {
5644        let sum_factory = SumF64Factory::new(0, "total");
5645        assert_eq!(sum_factory.type_tag(), "sum_f64");
5646        assert_eq!(sum_factory.result_field().name(), "total");
5647
5648        let min_factory = MinF64Factory::new(1, "low");
5649        assert_eq!(min_factory.type_tag(), "min_f64");
5650
5651        let max_factory = MaxF64Factory::new(1, "high");
5652        assert_eq!(max_factory.type_tag(), "max_f64");
5653
5654        let avg_factory = AvgF64Factory::new(0, "average");
5655        assert_eq!(avg_factory.type_tag(), "avg_f64");
5656    }
5657}