1use super::{
40 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
41};
42use crate::state::{StateStore, StateStoreExt};
43use arrow_array::{Array as ArrowArray, Int64Array, RecordBatch};
44use arrow_schema::{DataType, Field, Schema, SchemaRef};
45use rkyv::{
46 api::high::{HighDeserializer, HighSerializer, HighValidator},
47 bytecheck::CheckBytes,
48 rancor::Error as RkyvError,
49 ser::allocator::ArenaHandle,
50 util::AlignedVec,
51 Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
52};
53use smallvec::SmallVec;
54use std::marker::PhantomData;
55use std::sync::atomic::{AtomicU64, Ordering};
56use std::sync::Arc;
57use std::time::Duration;
58
/// How late events are handled: either discarded, or routed to a named side output.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LateDataConfig {
    /// Target side output; `None` (the default) means late events are dropped.
    side_output: Option<String>,
}

impl LateDataConfig {
    /// Drop late events. Equivalent to [`LateDataConfig::default`].
    #[must_use]
    pub fn drop() -> Self {
        Self::default()
    }

    /// Route late events to the side output called `name`.
    #[must_use]
    pub fn with_side_output(name: String) -> Self {
        let side_output = Some(name);
        Self { side_output }
    }

    /// Name of the configured side output, or `None` when late events are dropped.
    #[must_use]
    pub fn side_output(&self) -> Option<&str> {
        self.side_output.as_deref()
    }

    /// `true` when no side output is configured.
    #[must_use]
    pub fn should_drop(&self) -> bool {
        self.side_output().is_none()
    }
}
109
/// Counters describing how late events were handled.
///
/// Each `record_*` call bumps `late_events_total` and exactly one outcome counter.
#[derive(Debug, Clone, Default)]
// All fields deliberately share the `late_events_` prefix.
#[allow(clippy::struct_field_names)]
pub struct LateDataMetrics {
    /// Every late event recorded, regardless of outcome.
    late_events_total: u64,
    /// Late events that were discarded.
    late_events_dropped: u64,
    /// Late events that were routed to a side output.
    late_events_side_output: u64,
}

impl LateDataMetrics {
    /// Creates a metrics set with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            late_events_total: 0,
            late_events_dropped: 0,
            late_events_side_output: 0,
        }
    }

    /// Total number of late events recorded.
    #[must_use]
    pub fn late_events_total(&self) -> u64 {
        self.late_events_total
    }

    /// Number of late events that were dropped.
    #[must_use]
    pub fn late_events_dropped(&self) -> u64 {
        self.late_events_dropped
    }

    /// Number of late events sent to a side output.
    #[must_use]
    pub fn late_events_side_output(&self) -> u64 {
        self.late_events_side_output
    }

    /// Records one late event that was dropped.
    pub fn record_dropped(&mut self) {
        self.late_events_total += 1;
        self.late_events_dropped += 1;
    }

    /// Records one late event that was routed to a side output.
    pub fn record_side_output(&mut self) {
        self.late_events_total += 1;
        self.late_events_side_output += 1;
    }

    /// Zeroes every counter.
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}
169
/// When a windowed operator emits results, and what shape of output that implies.
///
/// The properties listed for each variant are exactly those reported by the
/// predicate methods on this type. The emission timing itself is implemented by
/// the operator, not here.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum EmitStrategy {
    /// The default strategy. Emission is presumably driven by watermark progress
    /// (per the name — confirm in the operator). Generates retractions.
    #[default]
    OnWatermark,

    /// Emit on a fixed interval; the only variant that needs a periodic timer.
    /// Emits intermediate results.
    Periodic(Duration),

    /// Emit on every update. Emits intermediate results and generates retractions.
    OnUpdate,

    /// Presumably emits once per window when it closes (per the name). Append-only
    /// compatible and suppresses intermediate results.
    OnWindowClose,

    /// Emit changelog records; the only variant that requires a changelog.
    /// Generates retractions.
    Changelog,

    /// Append-only compatible and suppresses intermediate results (like
    /// `OnWindowClose`), and additionally drops late data.
    Final,
}

impl EmitStrategy {
    /// `true` only for [`EmitStrategy::Periodic`].
    #[must_use]
    pub fn needs_periodic_timer(&self) -> bool {
        self.periodic_interval().is_some()
    }

    /// The timer interval of [`EmitStrategy::Periodic`]; `None` for every other variant.
    #[must_use]
    pub fn periodic_interval(&self) -> Option<Duration> {
        if let Self::Periodic(interval) = self {
            Some(*interval)
        } else {
            None
        }
    }

    /// `true` only for [`EmitStrategy::OnUpdate`].
    #[must_use]
    pub fn emits_on_update(&self) -> bool {
        *self == Self::OnUpdate
    }

    /// `true` for the strategies that emit before a window is final:
    /// `OnUpdate` and `Periodic`.
    #[must_use]
    pub fn emits_intermediate(&self) -> bool {
        self.emits_on_update() || self.needs_periodic_timer()
    }

    /// `true` only for [`EmitStrategy::Changelog`].
    #[must_use]
    pub fn requires_changelog(&self) -> bool {
        *self == Self::Changelog
    }

    /// `true` for `OnWindowClose` and `Final`.
    #[must_use]
    pub fn is_append_only_compatible(&self) -> bool {
        matches!(self, Self::OnWindowClose | Self::Final)
    }

    /// `true` for `OnWatermark`, `OnUpdate` and `Changelog`.
    #[must_use]
    pub fn generates_retractions(&self) -> bool {
        matches!(self, Self::OnWatermark | Self::OnUpdate | Self::Changelog)
    }

    /// `true` for `OnWindowClose` and `Final`. This is currently the same set of
    /// variants as [`EmitStrategy::is_append_only_compatible`].
    #[must_use]
    pub fn suppresses_intermediate(&self) -> bool {
        self.is_append_only_compatible()
    }

    /// `true` only for [`EmitStrategy::Final`].
    #[must_use]
    pub fn drops_late_data(&self) -> bool {
        *self == Self::Final
    }
}
340
341#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Archive, RkyvSerialize, RkyvDeserialize)]
346pub struct WindowId {
347 pub start: i64,
349 pub end: i64,
351}
352
353impl WindowId {
354 #[must_use]
356 pub fn new(start: i64, end: i64) -> Self {
357 Self { start, end }
358 }
359
360 #[must_use]
362 pub fn duration_ms(&self) -> i64 {
363 self.end - self.start
364 }
365
366 #[inline]
371 #[must_use]
372 pub fn to_key(&self) -> super::TimerKey {
373 super::TimerKey::from(self.to_key_inline())
374 }
375
376 #[inline]
381 #[must_use]
382 pub fn to_key_inline(&self) -> [u8; 16] {
383 let mut key = [0u8; 16];
384 key[..8].copy_from_slice(&self.start.to_be_bytes());
385 key[8..16].copy_from_slice(&self.end.to_be_bytes());
386 key
387 }
388
389 #[must_use]
395 pub fn from_key(key: &[u8]) -> Option<Self> {
396 if key.len() != 16 {
397 return None;
398 }
399 let start = i64::from_be_bytes(key[0..8].try_into().ok()?);
400 let end = i64::from_be_bytes(key[8..16].try_into().ok()?);
401 Some(Self { start, end })
402 }
403}
404
405pub type WindowIdVec = SmallVec<[WindowId; 4]>;
411
412#[derive(Debug, Clone, Copy, PartialEq, Eq, Archive, RkyvSerialize, RkyvDeserialize)]
422pub enum CdcOperation {
423 Insert,
425 Delete,
427 UpdateBefore,
429 UpdateAfter,
431}
432
433impl CdcOperation {
434 #[must_use]
439 pub fn weight(&self) -> i32 {
440 match self {
441 Self::Insert | Self::UpdateAfter => 1,
442 Self::Delete | Self::UpdateBefore => -1,
443 }
444 }
445
446 #[must_use]
448 pub fn is_insert(&self) -> bool {
449 matches!(self, Self::Insert | Self::UpdateAfter)
450 }
451
452 #[must_use]
454 pub fn is_delete(&self) -> bool {
455 matches!(self, Self::Delete | Self::UpdateBefore)
456 }
457
458 #[must_use]
464 pub fn debezium_op(&self) -> char {
465 match self {
466 Self::Insert => 'c',
467 Self::Delete => 'd',
468 Self::UpdateBefore | Self::UpdateAfter => 'u',
469 }
470 }
471
472 #[inline]
476 #[must_use]
477 pub fn to_u8(self) -> u8 {
478 match self {
479 Self::Insert => 0,
480 Self::Delete => 1,
481 Self::UpdateBefore => 2,
482 Self::UpdateAfter => 3,
483 }
484 }
485
486 #[inline]
488 #[must_use]
489 pub fn from_u8(value: u8) -> Self {
490 match value {
491 1 => Self::Delete,
492 2 => Self::UpdateBefore,
493 3 => Self::UpdateAfter,
494 _ => Self::Insert,
496 }
497 }
498}
499
500#[derive(Debug, Clone)]
535pub struct ChangelogRecord {
536 pub operation: CdcOperation,
538 pub weight: i32,
540 pub emit_timestamp: i64,
542 pub event: Event,
544}
545
546impl ChangelogRecord {
547 #[must_use]
549 pub fn insert(event: Event, emit_timestamp: i64) -> Self {
550 Self {
551 operation: CdcOperation::Insert,
552 weight: 1,
553 emit_timestamp,
554 event,
555 }
556 }
557
558 #[must_use]
560 pub fn delete(event: Event, emit_timestamp: i64) -> Self {
561 Self {
562 operation: CdcOperation::Delete,
563 weight: -1,
564 emit_timestamp,
565 event,
566 }
567 }
568
569 #[must_use]
575 pub fn update(old_event: Event, new_event: Event, emit_timestamp: i64) -> (Self, Self) {
576 let before = Self {
577 operation: CdcOperation::UpdateBefore,
578 weight: -1,
579 emit_timestamp,
580 event: old_event,
581 };
582 let after = Self {
583 operation: CdcOperation::UpdateAfter,
584 weight: 1,
585 emit_timestamp,
586 event: new_event,
587 };
588 (before, after)
589 }
590
591 #[must_use]
593 pub fn new(operation: CdcOperation, event: Event, emit_timestamp: i64) -> Self {
594 Self {
595 operation,
596 weight: operation.weight(),
597 emit_timestamp,
598 event,
599 }
600 }
601
602 #[must_use]
604 pub fn is_insert(&self) -> bool {
605 self.operation.is_insert()
606 }
607
608 #[must_use]
610 pub fn is_delete(&self) -> bool {
611 self.operation.is_delete()
612 }
613}
614
615pub trait WindowAssigner: Send {
617 fn assign_windows(&self, timestamp: i64) -> WindowIdVec;
622
623 fn max_timestamp(&self, window_end: i64) -> i64 {
628 window_end - 1
629 }
630}
631
632#[derive(Debug, Clone)]
637pub struct TumblingWindowAssigner {
638 size_ms: i64,
640}
641
642impl TumblingWindowAssigner {
643 #[must_use]
653 pub fn new(size: Duration) -> Self {
654 let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
656 assert!(size_ms > 0, "Window size must be positive");
657 Self { size_ms }
658 }
659
660 #[must_use]
666 pub fn from_millis(size_ms: i64) -> Self {
667 assert!(size_ms > 0, "Window size must be positive");
668 Self { size_ms }
669 }
670
671 #[must_use]
673 pub fn size_ms(&self) -> i64 {
674 self.size_ms
675 }
676
677 #[inline]
681 #[must_use]
682 pub fn assign(&self, timestamp: i64) -> WindowId {
683 let window_start = if timestamp >= 0 {
685 (timestamp / self.size_ms) * self.size_ms
686 } else {
687 ((timestamp - self.size_ms + 1) / self.size_ms) * self.size_ms
689 };
690 let window_end = window_start + self.size_ms;
691 WindowId::new(window_start, window_end)
692 }
693}
694
695impl WindowAssigner for TumblingWindowAssigner {
696 #[inline]
697 fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
698 let mut windows = WindowIdVec::new();
699 windows.push(self.assign(timestamp));
700 windows
701 }
702}
703
/// Lossy conversion of an aggregate result to `i64`.
pub trait ResultToI64 {
    /// Returns the value as `i64`. Each implementation below documents how it
    /// handles values that are absent or out of range.
    fn to_i64(&self) -> i64;
}

/// Values above `i64::MAX` clamp to `i64::MAX`.
impl ResultToI64 for u64 {
    fn to_i64(&self) -> i64 {
        match i64::try_from(*self) {
            Ok(value) => value,
            Err(_) => i64::MAX,
        }
    }
}

/// Identity conversion.
impl ResultToI64 for i64 {
    fn to_i64(&self) -> i64 {
        *self
    }
}

/// `None` maps to `0`.
impl ResultToI64 for Option<i64> {
    fn to_i64(&self) -> i64 {
        self.unwrap_or_default()
    }
}

/// Truncates toward zero using Rust's saturating float-to-int cast (NaN becomes
/// `0`). `None` maps to `0`.
impl ResultToI64 for Option<f64> {
    // The lossy cast is the documented intent of this conversion.
    #[allow(clippy::cast_possible_truncation)]
    fn to_i64(&self) -> i64 {
        match *self {
            Some(f) => f as i64,
            None => 0,
        }
    }
}
737
738pub trait Accumulator: Default + Clone + Send {
746 type Input;
748 type Output: ResultToI64;
750
751 fn add(&mut self, value: Self::Input);
753
754 fn merge(&mut self, other: &Self);
756
757 fn result(&self) -> Self::Output;
759
760 fn is_empty(&self) -> bool;
762}
763
764pub trait Aggregator: Send + Clone {
769 type Acc: Accumulator;
771
772 fn create_accumulator(&self) -> Self::Acc;
774
775 fn extract(&self, event: &Event) -> Option<<Self::Acc as Accumulator>::Input>;
779}
780
781#[derive(Debug, Clone, Default)]
783pub struct CountAggregator;
784
785#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
787pub struct CountAccumulator {
788 count: u64,
789}
790
791impl CountAggregator {
792 #[must_use]
794 pub fn new() -> Self {
795 Self
796 }
797}
798
799impl Accumulator for CountAccumulator {
800 type Input = ();
801 type Output = u64;
802
803 fn add(&mut self, _value: ()) {
804 self.count += 1;
805 }
806
807 fn merge(&mut self, other: &Self) {
808 self.count += other.count;
809 }
810
811 fn result(&self) -> u64 {
812 self.count
813 }
814
815 fn is_empty(&self) -> bool {
816 self.count == 0
817 }
818}
819
820impl Aggregator for CountAggregator {
821 type Acc = CountAccumulator;
822
823 fn create_accumulator(&self) -> CountAccumulator {
824 CountAccumulator::default()
825 }
826
827 fn extract(&self, _event: &Event) -> Option<()> {
828 Some(())
829 }
830}
831
832#[derive(Debug, Clone)]
834pub struct SumAggregator {
835 column_index: usize,
837}
838
839#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
841pub struct SumAccumulator {
842 sum: i64,
843 count: u64,
844}
845
846impl SumAggregator {
847 #[must_use]
849 pub fn new(column_index: usize) -> Self {
850 Self { column_index }
851 }
852}
853
854impl Accumulator for SumAccumulator {
855 type Input = i64;
856 type Output = i64;
857
858 fn add(&mut self, value: i64) {
859 self.sum += value;
860 self.count += 1;
861 }
862
863 fn merge(&mut self, other: &Self) {
864 self.sum += other.sum;
865 self.count += other.count;
866 }
867
868 fn result(&self) -> i64 {
869 self.sum
870 }
871
872 fn is_empty(&self) -> bool {
873 self.count == 0
874 }
875}
876
877impl Aggregator for SumAggregator {
878 type Acc = SumAccumulator;
879
880 fn create_accumulator(&self) -> SumAccumulator {
881 SumAccumulator::default()
882 }
883
884 fn extract(&self, event: &Event) -> Option<i64> {
885 use arrow_array::cast::AsArray;
886 use arrow_array::types::Int64Type;
887
888 let batch = &event.data;
889 if self.column_index >= batch.num_columns() {
890 return None;
891 }
892
893 let column = batch.column(self.column_index);
894 let array = column.as_primitive_opt::<Int64Type>()?;
895
896 Some(array.iter().flatten().sum())
898 }
899}
900
901#[derive(Debug, Clone)]
903pub struct MinAggregator {
904 column_index: usize,
905}
906
907#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
909pub struct MinAccumulator {
910 min: Option<i64>,
911}
912
913impl MinAggregator {
914 #[must_use]
916 pub fn new(column_index: usize) -> Self {
917 Self { column_index }
918 }
919}
920
921impl Accumulator for MinAccumulator {
922 type Input = i64;
923 type Output = Option<i64>;
924
925 fn add(&mut self, value: i64) {
926 self.min = Some(self.min.map_or(value, |m| m.min(value)));
927 }
928
929 fn merge(&mut self, other: &Self) {
930 if let Some(other_min) = other.min {
931 self.add(other_min);
932 }
933 }
934
935 fn result(&self) -> Option<i64> {
936 self.min
937 }
938
939 fn is_empty(&self) -> bool {
940 self.min.is_none()
941 }
942}
943
944impl Aggregator for MinAggregator {
945 type Acc = MinAccumulator;
946
947 fn create_accumulator(&self) -> MinAccumulator {
948 MinAccumulator::default()
949 }
950
951 fn extract(&self, event: &Event) -> Option<i64> {
952 use arrow_array::cast::AsArray;
953 use arrow_array::types::Int64Type;
954
955 let batch = &event.data;
956 if self.column_index >= batch.num_columns() {
957 return None;
958 }
959
960 let column = batch.column(self.column_index);
961 let array = column.as_primitive_opt::<Int64Type>()?;
962
963 array.iter().flatten().min()
964 }
965}
966
967#[derive(Debug, Clone)]
969pub struct MaxAggregator {
970 column_index: usize,
971}
972
973#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
975pub struct MaxAccumulator {
976 max: Option<i64>,
977}
978
979impl MaxAggregator {
980 #[must_use]
982 pub fn new(column_index: usize) -> Self {
983 Self { column_index }
984 }
985}
986
987impl Accumulator for MaxAccumulator {
988 type Input = i64;
989 type Output = Option<i64>;
990
991 fn add(&mut self, value: i64) {
992 self.max = Some(self.max.map_or(value, |m| m.max(value)));
993 }
994
995 fn merge(&mut self, other: &Self) {
996 if let Some(other_max) = other.max {
997 self.add(other_max);
998 }
999 }
1000
1001 fn result(&self) -> Option<i64> {
1002 self.max
1003 }
1004
1005 fn is_empty(&self) -> bool {
1006 self.max.is_none()
1007 }
1008}
1009
1010impl Aggregator for MaxAggregator {
1011 type Acc = MaxAccumulator;
1012
1013 fn create_accumulator(&self) -> MaxAccumulator {
1014 MaxAccumulator::default()
1015 }
1016
1017 fn extract(&self, event: &Event) -> Option<i64> {
1018 use arrow_array::cast::AsArray;
1019 use arrow_array::types::Int64Type;
1020
1021 let batch = &event.data;
1022 if self.column_index >= batch.num_columns() {
1023 return None;
1024 }
1025
1026 let column = batch.column(self.column_index);
1027 let array = column.as_primitive_opt::<Int64Type>()?;
1028
1029 array.iter().flatten().max()
1030 }
1031}
1032
1033#[derive(Debug, Clone)]
1035pub struct AvgAggregator {
1036 column_index: usize,
1037}
1038
1039#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1041pub struct AvgAccumulator {
1042 sum: i64,
1043 count: u64,
1044}
1045
1046impl AvgAggregator {
1047 #[must_use]
1049 pub fn new(column_index: usize) -> Self {
1050 Self { column_index }
1051 }
1052}
1053
1054impl Accumulator for AvgAccumulator {
1055 type Input = i64;
1056 type Output = Option<f64>;
1057
1058 fn add(&mut self, value: i64) {
1059 self.sum += value;
1060 self.count += 1;
1061 }
1062
1063 fn merge(&mut self, other: &Self) {
1064 self.sum += other.sum;
1065 self.count += other.count;
1066 }
1067
1068 #[allow(clippy::cast_precision_loss)]
1070 fn result(&self) -> Option<f64> {
1071 if self.count == 0 {
1072 None
1073 } else {
1074 Some(self.sum as f64 / self.count as f64)
1075 }
1076 }
1077
1078 fn is_empty(&self) -> bool {
1079 self.count == 0
1080 }
1081}
1082
1083impl Aggregator for AvgAggregator {
1084 type Acc = AvgAccumulator;
1085
1086 fn create_accumulator(&self) -> AvgAccumulator {
1087 AvgAccumulator::default()
1088 }
1089
1090 fn extract(&self, event: &Event) -> Option<i64> {
1091 use arrow_array::cast::AsArray;
1092 use arrow_array::types::Int64Type;
1093
1094 let batch = &event.data;
1095 if self.column_index >= batch.num_columns() {
1096 return None;
1097 }
1098
1099 let column = batch.column(self.column_index);
1100 let array = column.as_primitive_opt::<Int64Type>()?;
1101
1102 array.iter().flatten().next()
1104 }
1105}
1106
1107#[derive(Debug, Clone)]
1123pub struct FirstValueAggregator {
1124 value_column_index: usize,
1126 timestamp_column_index: usize,
1128}
1129
1130#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1134#[rkyv(compare(PartialEq), derive(Debug))]
1135pub struct FirstValueAccumulator {
1136 value: Option<i64>,
1138 timestamp: Option<i64>,
1140}
1141
1142impl FirstValueAggregator {
1143 #[must_use]
1150 pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1151 Self {
1152 value_column_index,
1153 timestamp_column_index,
1154 }
1155 }
1156}
1157
1158impl Accumulator for FirstValueAccumulator {
1159 type Input = (i64, i64); type Output = Option<i64>;
1161
1162 fn add(&mut self, (value, timestamp): (i64, i64)) {
1163 match self.timestamp {
1164 None => {
1165 self.value = Some(value);
1167 self.timestamp = Some(timestamp);
1168 }
1169 Some(existing_ts) if timestamp < existing_ts => {
1170 self.value = Some(value);
1172 self.timestamp = Some(timestamp);
1173 }
1174 _ => {
1175 }
1177 }
1178 }
1179
1180 fn merge(&mut self, other: &Self) {
1181 match (self.timestamp, other.timestamp) {
1182 (None, Some(_)) => {
1183 self.value = other.value;
1184 self.timestamp = other.timestamp;
1185 }
1186 (Some(self_ts), Some(other_ts)) if other_ts < self_ts => {
1187 self.value = other.value;
1188 self.timestamp = other.timestamp;
1189 }
1190 _ => {
1191 }
1193 }
1194 }
1195
1196 fn result(&self) -> Option<i64> {
1197 self.value
1198 }
1199
1200 fn is_empty(&self) -> bool {
1201 self.value.is_none()
1202 }
1203}
1204
1205impl Aggregator for FirstValueAggregator {
1206 type Acc = FirstValueAccumulator;
1207
1208 fn create_accumulator(&self) -> FirstValueAccumulator {
1209 FirstValueAccumulator::default()
1210 }
1211
1212 fn extract(&self, event: &Event) -> Option<(i64, i64)> {
1213 use arrow_array::cast::AsArray;
1214 use arrow_array::types::Int64Type;
1215
1216 let batch = &event.data;
1217 if self.value_column_index >= batch.num_columns()
1218 || self.timestamp_column_index >= batch.num_columns()
1219 {
1220 return None;
1221 }
1222
1223 let value_col = batch.column(self.value_column_index);
1225 let value_array = value_col.as_primitive_opt::<Int64Type>()?;
1226 let value = value_array.iter().flatten().next()?;
1227
1228 let ts_col = batch.column(self.timestamp_column_index);
1230 let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1231 let timestamp = ts_array.iter().flatten().next()?;
1232
1233 Some((value, timestamp))
1234 }
1235}
1236
1237#[derive(Debug, Clone)]
1252pub struct LastValueAggregator {
1253 value_column_index: usize,
1255 timestamp_column_index: usize,
1257}
1258
1259#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1263#[rkyv(compare(PartialEq), derive(Debug))]
1264pub struct LastValueAccumulator {
1265 value: Option<i64>,
1267 timestamp: Option<i64>,
1269}
1270
1271impl LastValueAggregator {
1272 #[must_use]
1279 pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1280 Self {
1281 value_column_index,
1282 timestamp_column_index,
1283 }
1284 }
1285}
1286
1287impl Accumulator for LastValueAccumulator {
1288 type Input = (i64, i64); type Output = Option<i64>;
1290
1291 fn add(&mut self, (value, timestamp): (i64, i64)) {
1292 match self.timestamp {
1293 None => {
1294 self.value = Some(value);
1296 self.timestamp = Some(timestamp);
1297 }
1298 Some(existing_ts) if timestamp > existing_ts => {
1299 self.value = Some(value);
1301 self.timestamp = Some(timestamp);
1302 }
1303 Some(existing_ts) if timestamp == existing_ts => {
1304 self.value = Some(value);
1306 }
1307 _ => {
1308 }
1310 }
1311 }
1312
1313 fn merge(&mut self, other: &Self) {
1314 match (self.timestamp, other.timestamp) {
1315 (None, Some(_)) => {
1316 self.value = other.value;
1317 self.timestamp = other.timestamp;
1318 }
1319 (Some(self_ts), Some(other_ts)) if other_ts > self_ts => {
1320 self.value = other.value;
1321 self.timestamp = other.timestamp;
1322 }
1323 (Some(self_ts), Some(other_ts)) if other_ts == self_ts => {
1324 self.value = other.value;
1326 }
1327 _ => {
1328 }
1330 }
1331 }
1332
1333 fn result(&self) -> Option<i64> {
1334 self.value
1335 }
1336
1337 fn is_empty(&self) -> bool {
1338 self.value.is_none()
1339 }
1340}
1341
1342impl Aggregator for LastValueAggregator {
1343 type Acc = LastValueAccumulator;
1344
1345 fn create_accumulator(&self) -> LastValueAccumulator {
1346 LastValueAccumulator::default()
1347 }
1348
1349 fn extract(&self, event: &Event) -> Option<(i64, i64)> {
1350 use arrow_array::cast::AsArray;
1351 use arrow_array::types::Int64Type;
1352
1353 let batch = &event.data;
1354 if self.value_column_index >= batch.num_columns()
1355 || self.timestamp_column_index >= batch.num_columns()
1356 {
1357 return None;
1358 }
1359
1360 let value_col = batch.column(self.value_column_index);
1362 let value_array = value_col.as_primitive_opt::<Int64Type>()?;
1363 let value = value_array.iter().flatten().next()?;
1364
1365 let ts_col = batch.column(self.timestamp_column_index);
1367 let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1368 let timestamp = ts_array.iter().flatten().next()?;
1369
1370 Some((value, timestamp))
1371 }
1372}
1373
1374#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1378#[rkyv(compare(PartialEq), derive(Debug))]
1379pub struct FirstValueF64Accumulator {
1380 value: Option<i64>, timestamp: Option<i64>,
1384}
1385
1386impl FirstValueF64Accumulator {
1387 #[must_use]
1389 #[allow(clippy::cast_sign_loss)]
1390 pub fn result_f64(&self) -> Option<f64> {
1391 self.value.map(|bits| f64::from_bits(bits as u64))
1392 }
1393}
1394
1395impl Accumulator for FirstValueF64Accumulator {
1396 type Input = (f64, i64); type Output = Option<f64>;
1398
1399 fn add(&mut self, (value, timestamp): (f64, i64)) {
1400 #[allow(clippy::cast_possible_wrap)]
1402 let value_bits = value.to_bits() as i64;
1403 match self.timestamp {
1404 None => {
1405 self.value = Some(value_bits);
1406 self.timestamp = Some(timestamp);
1407 }
1408 Some(existing_ts) if timestamp < existing_ts => {
1409 self.value = Some(value_bits);
1410 self.timestamp = Some(timestamp);
1411 }
1412 _ => {}
1413 }
1414 }
1415
1416 fn merge(&mut self, other: &Self) {
1417 match (self.timestamp, other.timestamp) {
1418 (None, Some(_)) => {
1419 self.value = other.value;
1420 self.timestamp = other.timestamp;
1421 }
1422 (Some(self_ts), Some(other_ts)) if other_ts < self_ts => {
1423 self.value = other.value;
1424 self.timestamp = other.timestamp;
1425 }
1426 _ => {}
1427 }
1428 }
1429
1430 #[allow(clippy::cast_sign_loss)]
1431 fn result(&self) -> Option<f64> {
1432 self.value.map(|bits| f64::from_bits(bits as u64))
1433 }
1434
1435 fn is_empty(&self) -> bool {
1436 self.value.is_none()
1437 }
1438}
1439
1440#[derive(Debug, Clone)]
1442pub struct FirstValueF64Aggregator {
1443 value_column_index: usize,
1445 timestamp_column_index: usize,
1447}
1448
1449impl FirstValueF64Aggregator {
1450 #[must_use]
1452 pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1453 Self {
1454 value_column_index,
1455 timestamp_column_index,
1456 }
1457 }
1458}
1459
1460impl Aggregator for FirstValueF64Aggregator {
1461 type Acc = FirstValueF64Accumulator;
1462
1463 fn create_accumulator(&self) -> FirstValueF64Accumulator {
1464 FirstValueF64Accumulator::default()
1465 }
1466
1467 fn extract(&self, event: &Event) -> Option<(f64, i64)> {
1468 use arrow_array::cast::AsArray;
1469 use arrow_array::types::{Float64Type, Int64Type};
1470
1471 let batch = &event.data;
1472 if self.value_column_index >= batch.num_columns()
1473 || self.timestamp_column_index >= batch.num_columns()
1474 {
1475 return None;
1476 }
1477
1478 let value_col = batch.column(self.value_column_index);
1480 let value_array = value_col.as_primitive_opt::<Float64Type>()?;
1481 let value = value_array.iter().flatten().next()?;
1482
1483 let ts_col = batch.column(self.timestamp_column_index);
1485 let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1486 let timestamp = ts_array.iter().flatten().next()?;
1487
1488 Some((value, timestamp))
1489 }
1490}
1491
1492#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1494#[rkyv(compare(PartialEq), derive(Debug))]
1495pub struct LastValueF64Accumulator {
1496 value: Option<i64>, timestamp: Option<i64>,
1500}
1501
1502impl LastValueF64Accumulator {
1503 #[must_use]
1505 #[allow(clippy::cast_sign_loss)]
1506 pub fn result_f64(&self) -> Option<f64> {
1507 self.value.map(|bits| f64::from_bits(bits as u64))
1508 }
1509}
1510
1511impl Accumulator for LastValueF64Accumulator {
1512 type Input = (f64, i64); type Output = Option<f64>;
1514
1515 fn add(&mut self, (value, timestamp): (f64, i64)) {
1516 #[allow(clippy::cast_possible_wrap)]
1518 let value_bits = value.to_bits() as i64;
1519 match self.timestamp {
1520 None => {
1521 self.value = Some(value_bits);
1522 self.timestamp = Some(timestamp);
1523 }
1524 Some(existing_ts) if timestamp > existing_ts => {
1525 self.value = Some(value_bits);
1526 self.timestamp = Some(timestamp);
1527 }
1528 Some(existing_ts) if timestamp == existing_ts => {
1529 self.value = Some(value_bits);
1530 }
1531 _ => {}
1532 }
1533 }
1534
1535 fn merge(&mut self, other: &Self) {
1536 match (self.timestamp, other.timestamp) {
1537 (None, Some(_)) => {
1538 self.value = other.value;
1539 self.timestamp = other.timestamp;
1540 }
1541 (Some(self_ts), Some(other_ts)) if other_ts > self_ts => {
1542 self.value = other.value;
1543 self.timestamp = other.timestamp;
1544 }
1545 (Some(self_ts), Some(other_ts)) if other_ts == self_ts => {
1546 self.value = other.value;
1547 }
1548 _ => {}
1549 }
1550 }
1551
1552 #[allow(clippy::cast_sign_loss)]
1553 fn result(&self) -> Option<f64> {
1554 self.value.map(|bits| f64::from_bits(bits as u64))
1555 }
1556
1557 fn is_empty(&self) -> bool {
1558 self.value.is_none()
1559 }
1560}
1561
1562#[derive(Debug, Clone)]
1564pub struct LastValueF64Aggregator {
1565 value_column_index: usize,
1567 timestamp_column_index: usize,
1569}
1570
1571impl LastValueF64Aggregator {
1572 #[must_use]
1574 pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1575 Self {
1576 value_column_index,
1577 timestamp_column_index,
1578 }
1579 }
1580}
1581
1582impl Aggregator for LastValueF64Aggregator {
1583 type Acc = LastValueF64Accumulator;
1584
1585 fn create_accumulator(&self) -> LastValueF64Accumulator {
1586 LastValueF64Accumulator::default()
1587 }
1588
1589 fn extract(&self, event: &Event) -> Option<(f64, i64)> {
1590 use arrow_array::cast::AsArray;
1591 use arrow_array::types::{Float64Type, Int64Type};
1592
1593 let batch = &event.data;
1594 if self.value_column_index >= batch.num_columns()
1595 || self.timestamp_column_index >= batch.num_columns()
1596 {
1597 return None;
1598 }
1599
1600 let value_col = batch.column(self.value_column_index);
1602 let value_array = value_col.as_primitive_opt::<Float64Type>()?;
1603 let value = value_array.iter().flatten().next()?;
1604
1605 let ts_col = batch.column(self.timestamp_column_index);
1607 let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1608 let timestamp = ts_array.iter().flatten().next()?;
1609
1610 Some((value, timestamp))
1611 }
1612}
1613
1614#[derive(Debug, Clone, PartialEq)]
1632pub enum ScalarResult {
1633 Int64(i64),
1635 Float64(f64),
1637 UInt64(u64),
1639 OptionalInt64(Option<i64>),
1641 OptionalFloat64(Option<f64>),
1643 Null,
1645}
1646
1647impl ScalarResult {
1648 #[must_use]
1650 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
1651 pub fn to_i64_lossy(&self) -> i64 {
1652 match self {
1653 Self::Int64(v) => *v,
1654 Self::Float64(v) => *v as i64,
1655 Self::UInt64(v) => i64::try_from(*v).unwrap_or(i64::MAX),
1656 Self::OptionalInt64(v) => v.unwrap_or(0),
1657 Self::OptionalFloat64(v) => v.map(|f| f as i64).unwrap_or(0),
1658 Self::Null => 0,
1659 }
1660 }
1661
1662 #[must_use]
1664 #[allow(clippy::cast_precision_loss)]
1665 pub fn to_f64_lossy(&self) -> f64 {
1666 match self {
1667 Self::Int64(v) => *v as f64,
1668 Self::Float64(v) => *v,
1669 Self::UInt64(v) => *v as f64,
1670 Self::OptionalInt64(v) => v.map(|i| i as f64).unwrap_or(0.0),
1671 Self::OptionalFloat64(v) => v.unwrap_or(0.0),
1672 Self::Null => 0.0,
1673 }
1674 }
1675
1676 #[must_use]
1678 pub fn is_null(&self) -> bool {
1679 matches!(
1680 self,
1681 Self::Null | Self::OptionalInt64(None) | Self::OptionalFloat64(None)
1682 )
1683 }
1684
1685 #[must_use]
1687 pub fn data_type(&self) -> DataType {
1688 match self {
1689 Self::Int64(_) | Self::OptionalInt64(_) => DataType::Int64,
1690 Self::Float64(_) | Self::OptionalFloat64(_) => DataType::Float64,
1691 Self::UInt64(_) => DataType::UInt64,
1692 Self::Null => DataType::Null,
1693 }
1694 }
1695}
1696
1697pub trait DynAccumulator: Send {
1709 fn add_event(&mut self, event: &Event);
1711
1712 fn merge_dyn(&mut self, other: &dyn DynAccumulator);
1718
1719 fn result_scalar(&self) -> ScalarResult;
1721
1722 fn is_empty(&self) -> bool;
1724
1725 fn clone_box(&self) -> Box<dyn DynAccumulator>;
1727
1728 fn serialize(&self) -> Vec<u8>;
1730
1731 fn result_field(&self) -> Field;
1733
1734 fn type_tag(&self) -> &'static str;
1736
1737 fn as_any(&self) -> &dyn std::any::Any;
1739}
1740
1741pub trait DynAggregatorFactory: Send + Sync {
1746 fn create_accumulator(&self) -> Box<dyn DynAccumulator>;
1748
1749 fn result_field(&self) -> Field;
1751
1752 fn clone_box(&self) -> Box<dyn DynAggregatorFactory>;
1754
1755 fn type_tag(&self) -> &'static str;
1757}
1758
1759#[derive(Debug, Clone)]
1763pub struct SumF64Aggregator {
1764 column_index: usize,
1766}
1767
1768#[derive(Debug, Clone, Default)]
1770pub struct SumF64Accumulator {
1771 sum: f64,
1773 count: u64,
1775}
1776
1777impl SumF64Aggregator {
1778 #[must_use]
1780 pub fn new(column_index: usize) -> Self {
1781 Self { column_index }
1782 }
1783
1784 #[must_use]
1786 pub fn column_index(&self) -> usize {
1787 self.column_index
1788 }
1789}
1790
1791impl SumF64Accumulator {
1792 #[must_use]
1794 pub fn sum(&self) -> f64 {
1795 self.sum
1796 }
1797}
1798
1799impl DynAccumulator for SumF64Accumulator {
1800 fn add_event(&mut self, event: &Event) {
1801 use arrow_array::cast::AsArray;
1802 use arrow_array::types::Float64Type;
1803
1804 let batch = &event.data;
1807 if batch.num_columns() == 0 {
1808 return;
1809 }
1810 if let Some(array) = batch.column(0).as_primitive_opt::<Float64Type>() {
1812 for val in array.iter().flatten() {
1813 self.sum += val;
1814 self.count += 1;
1815 }
1816 }
1817 }
1818
1819 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
1820 let data = other.serialize();
1822 if data.len() == 16 {
1823 let sum = f64::from_le_bytes(data[..8].try_into().unwrap());
1824 let count = u64::from_le_bytes(data[8..16].try_into().unwrap());
1825 self.sum += sum;
1826 self.count += count;
1827 }
1828 }
1829
1830 fn result_scalar(&self) -> ScalarResult {
1831 if self.count == 0 {
1832 ScalarResult::Null
1833 } else {
1834 ScalarResult::Float64(self.sum)
1835 }
1836 }
1837
1838 fn is_empty(&self) -> bool {
1839 self.count == 0
1840 }
1841
1842 fn clone_box(&self) -> Box<dyn DynAccumulator> {
1843 Box::new(self.clone())
1844 }
1845
1846 fn serialize(&self) -> Vec<u8> {
1847 let mut buf = Vec::with_capacity(16);
1848 buf.extend_from_slice(&self.sum.to_le_bytes());
1849 buf.extend_from_slice(&self.count.to_le_bytes());
1850 buf
1851 }
1852
1853 fn result_field(&self) -> Field {
1854 Field::new("sum_f64", DataType::Float64, true)
1855 }
1856
1857 fn type_tag(&self) -> &'static str {
1858 "sum_f64"
1859 }
1860
1861 fn as_any(&self) -> &dyn std::any::Any {
1862 self
1863 }
1864}
1865
1866#[derive(Debug, Clone)]
1868pub struct SumF64Factory {
1869 column_index: usize,
1871 field_name: String,
1873}
1874
1875impl SumF64Factory {
1876 #[must_use]
1878 pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
1879 Self {
1880 column_index,
1881 field_name: field_name.into(),
1882 }
1883 }
1884}
1885
1886impl DynAggregatorFactory for SumF64Factory {
1887 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
1888 Box::new(SumF64IndexedAccumulator::new(self.column_index))
1889 }
1890
1891 fn result_field(&self) -> Field {
1892 Field::new(&self.field_name, DataType::Float64, true)
1893 }
1894
1895 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
1896 Box::new(self.clone())
1897 }
1898
1899 fn type_tag(&self) -> &'static str {
1900 "sum_f64"
1901 }
1902}
1903
1904#[derive(Debug, Clone)]
1906pub struct SumF64IndexedAccumulator {
1907 column_index: usize,
1909 sum: f64,
1911 count: u64,
1913}
1914
1915impl SumF64IndexedAccumulator {
1916 #[must_use]
1918 pub fn new(column_index: usize) -> Self {
1919 Self {
1920 column_index,
1921 sum: 0.0,
1922 count: 0,
1923 }
1924 }
1925}
1926
1927impl DynAccumulator for SumF64IndexedAccumulator {
1928 fn add_event(&mut self, event: &Event) {
1929 use arrow_array::cast::AsArray;
1930 use arrow_array::types::Float64Type;
1931
1932 let batch = &event.data;
1933 if self.column_index >= batch.num_columns() {
1934 return;
1935 }
1936 if let Some(array) = batch
1937 .column(self.column_index)
1938 .as_primitive_opt::<Float64Type>()
1939 {
1940 for val in array.iter().flatten() {
1941 self.sum += val;
1942 self.count += 1;
1943 }
1944 }
1945 }
1946
1947 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
1948 let data = other.serialize();
1949 if data.len() >= 16 {
1950 let sum = f64::from_le_bytes(data[..8].try_into().unwrap());
1951 let count = u64::from_le_bytes(data[8..16].try_into().unwrap());
1952 self.sum += sum;
1953 self.count += count;
1954 }
1955 }
1956
1957 fn result_scalar(&self) -> ScalarResult {
1958 if self.count == 0 {
1959 ScalarResult::Null
1960 } else {
1961 ScalarResult::Float64(self.sum)
1962 }
1963 }
1964
1965 fn is_empty(&self) -> bool {
1966 self.count == 0
1967 }
1968
1969 fn clone_box(&self) -> Box<dyn DynAccumulator> {
1970 Box::new(self.clone())
1971 }
1972
1973 fn serialize(&self) -> Vec<u8> {
1974 let mut buf = Vec::with_capacity(16);
1975 buf.extend_from_slice(&self.sum.to_le_bytes());
1976 buf.extend_from_slice(&self.count.to_le_bytes());
1977 buf
1978 }
1979
1980 fn result_field(&self) -> Field {
1981 Field::new("sum_f64", DataType::Float64, true)
1982 }
1983
1984 fn type_tag(&self) -> &'static str {
1985 "sum_f64"
1986 }
1987
1988 fn as_any(&self) -> &dyn std::any::Any {
1989 self
1990 }
1991}
1992
1993#[derive(Debug, Clone)]
1995pub struct MinF64Factory {
1996 column_index: usize,
1998 field_name: String,
2000}
2001
2002impl MinF64Factory {
2003 #[must_use]
2005 pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
2006 Self {
2007 column_index,
2008 field_name: field_name.into(),
2009 }
2010 }
2011}
2012
2013impl DynAggregatorFactory for MinF64Factory {
2014 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2015 Box::new(MinF64IndexedAccumulator::new(self.column_index))
2016 }
2017
2018 fn result_field(&self) -> Field {
2019 Field::new(&self.field_name, DataType::Float64, true)
2020 }
2021
2022 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2023 Box::new(self.clone())
2024 }
2025
2026 fn type_tag(&self) -> &'static str {
2027 "min_f64"
2028 }
2029}
2030
2031#[derive(Debug, Clone)]
2033pub struct MinF64IndexedAccumulator {
2034 column_index: usize,
2036 min: Option<f64>,
2038}
2039
2040impl MinF64IndexedAccumulator {
2041 #[must_use]
2043 pub fn new(column_index: usize) -> Self {
2044 Self {
2045 column_index,
2046 min: None,
2047 }
2048 }
2049}
2050
2051impl DynAccumulator for MinF64IndexedAccumulator {
2052 fn add_event(&mut self, event: &Event) {
2053 use arrow_array::cast::AsArray;
2054 use arrow_array::types::Float64Type;
2055
2056 let batch = &event.data;
2057 if self.column_index >= batch.num_columns() {
2058 return;
2059 }
2060 if let Some(array) = batch
2061 .column(self.column_index)
2062 .as_primitive_opt::<Float64Type>()
2063 {
2064 for val in array.iter().flatten() {
2065 self.min = Some(self.min.map_or(val, |m: f64| m.min(val)));
2066 }
2067 }
2068 }
2069
2070 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2071 let data = other.serialize();
2072 if data.len() >= 9 && data[0] == 1 {
2073 let other_min = f64::from_le_bytes(data[1..9].try_into().unwrap());
2074 self.min = Some(self.min.map_or(other_min, |m: f64| m.min(other_min)));
2075 }
2076 }
2077
2078 fn result_scalar(&self) -> ScalarResult {
2079 ScalarResult::OptionalFloat64(self.min)
2080 }
2081
2082 fn is_empty(&self) -> bool {
2083 self.min.is_none()
2084 }
2085
2086 fn clone_box(&self) -> Box<dyn DynAccumulator> {
2087 Box::new(self.clone())
2088 }
2089
2090 fn serialize(&self) -> Vec<u8> {
2091 match self.min {
2092 Some(v) => {
2093 let mut buf = Vec::with_capacity(9);
2094 buf.push(1); buf.extend_from_slice(&v.to_le_bytes());
2096 buf
2097 }
2098 None => vec![0],
2099 }
2100 }
2101
2102 fn result_field(&self) -> Field {
2103 Field::new("min_f64", DataType::Float64, true)
2104 }
2105
2106 fn type_tag(&self) -> &'static str {
2107 "min_f64"
2108 }
2109
2110 fn as_any(&self) -> &dyn std::any::Any {
2111 self
2112 }
2113}
2114
2115#[derive(Debug, Clone)]
2117pub struct MaxF64Factory {
2118 column_index: usize,
2120 field_name: String,
2122}
2123
2124impl MaxF64Factory {
2125 #[must_use]
2127 pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
2128 Self {
2129 column_index,
2130 field_name: field_name.into(),
2131 }
2132 }
2133}
2134
2135impl DynAggregatorFactory for MaxF64Factory {
2136 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2137 Box::new(MaxF64IndexedAccumulator::new(self.column_index))
2138 }
2139
2140 fn result_field(&self) -> Field {
2141 Field::new(&self.field_name, DataType::Float64, true)
2142 }
2143
2144 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2145 Box::new(self.clone())
2146 }
2147
2148 fn type_tag(&self) -> &'static str {
2149 "max_f64"
2150 }
2151}
2152
2153#[derive(Debug, Clone)]
2155pub struct MaxF64IndexedAccumulator {
2156 column_index: usize,
2158 max: Option<f64>,
2160}
2161
2162impl MaxF64IndexedAccumulator {
2163 #[must_use]
2165 pub fn new(column_index: usize) -> Self {
2166 Self {
2167 column_index,
2168 max: None,
2169 }
2170 }
2171}
2172
2173impl DynAccumulator for MaxF64IndexedAccumulator {
2174 fn add_event(&mut self, event: &Event) {
2175 use arrow_array::cast::AsArray;
2176 use arrow_array::types::Float64Type;
2177
2178 let batch = &event.data;
2179 if self.column_index >= batch.num_columns() {
2180 return;
2181 }
2182 if let Some(array) = batch
2183 .column(self.column_index)
2184 .as_primitive_opt::<Float64Type>()
2185 {
2186 for val in array.iter().flatten() {
2187 self.max = Some(self.max.map_or(val, |m: f64| m.max(val)));
2188 }
2189 }
2190 }
2191
2192 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2193 let data = other.serialize();
2194 if data.len() >= 9 && data[0] == 1 {
2195 let other_max = f64::from_le_bytes(data[1..9].try_into().unwrap());
2196 self.max = Some(self.max.map_or(other_max, |m: f64| m.max(other_max)));
2197 }
2198 }
2199
2200 fn result_scalar(&self) -> ScalarResult {
2201 ScalarResult::OptionalFloat64(self.max)
2202 }
2203
2204 fn is_empty(&self) -> bool {
2205 self.max.is_none()
2206 }
2207
2208 fn clone_box(&self) -> Box<dyn DynAccumulator> {
2209 Box::new(self.clone())
2210 }
2211
2212 fn serialize(&self) -> Vec<u8> {
2213 match self.max {
2214 Some(v) => {
2215 let mut buf = Vec::with_capacity(9);
2216 buf.push(1);
2217 buf.extend_from_slice(&v.to_le_bytes());
2218 buf
2219 }
2220 None => vec![0],
2221 }
2222 }
2223
2224 fn result_field(&self) -> Field {
2225 Field::new("max_f64", DataType::Float64, true)
2226 }
2227
2228 fn type_tag(&self) -> &'static str {
2229 "max_f64"
2230 }
2231
2232 fn as_any(&self) -> &dyn std::any::Any {
2233 self
2234 }
2235}
2236
2237#[derive(Debug, Clone)]
2239pub struct AvgF64Factory {
2240 column_index: usize,
2242 field_name: String,
2244}
2245
2246impl AvgF64Factory {
2247 #[must_use]
2249 pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
2250 Self {
2251 column_index,
2252 field_name: field_name.into(),
2253 }
2254 }
2255}
2256
2257impl DynAggregatorFactory for AvgF64Factory {
2258 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2259 Box::new(AvgF64IndexedAccumulator::new(self.column_index))
2260 }
2261
2262 fn result_field(&self) -> Field {
2263 Field::new(&self.field_name, DataType::Float64, true)
2264 }
2265
2266 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2267 Box::new(self.clone())
2268 }
2269
2270 fn type_tag(&self) -> &'static str {
2271 "avg_f64"
2272 }
2273}
2274
2275#[derive(Debug, Clone)]
2277pub struct AvgF64IndexedAccumulator {
2278 column_index: usize,
2280 sum: f64,
2282 count: u64,
2284}
2285
2286impl AvgF64IndexedAccumulator {
2287 #[must_use]
2289 pub fn new(column_index: usize) -> Self {
2290 Self {
2291 column_index,
2292 sum: 0.0,
2293 count: 0,
2294 }
2295 }
2296}
2297
2298impl DynAccumulator for AvgF64IndexedAccumulator {
2299 fn add_event(&mut self, event: &Event) {
2300 use arrow_array::cast::AsArray;
2301 use arrow_array::types::Float64Type;
2302
2303 let batch = &event.data;
2304 if self.column_index >= batch.num_columns() {
2305 return;
2306 }
2307 if let Some(array) = batch
2308 .column(self.column_index)
2309 .as_primitive_opt::<Float64Type>()
2310 {
2311 for val in array.iter().flatten() {
2312 self.sum += val;
2313 self.count += 1;
2314 }
2315 }
2316 }
2317
2318 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2319 let data = other.serialize();
2320 if data.len() >= 16 {
2321 let sum = f64::from_le_bytes(data[..8].try_into().unwrap());
2322 let count = u64::from_le_bytes(data[8..16].try_into().unwrap());
2323 self.sum += sum;
2324 self.count += count;
2325 }
2326 }
2327
2328 #[allow(clippy::cast_precision_loss)]
2330 fn result_scalar(&self) -> ScalarResult {
2331 if self.count == 0 {
2332 ScalarResult::Null
2333 } else {
2334 ScalarResult::Float64(self.sum / self.count as f64)
2335 }
2336 }
2337
2338 fn is_empty(&self) -> bool {
2339 self.count == 0
2340 }
2341
2342 fn clone_box(&self) -> Box<dyn DynAccumulator> {
2343 Box::new(self.clone())
2344 }
2345
2346 fn serialize(&self) -> Vec<u8> {
2347 let mut buf = Vec::with_capacity(16);
2348 buf.extend_from_slice(&self.sum.to_le_bytes());
2349 buf.extend_from_slice(&self.count.to_le_bytes());
2350 buf
2351 }
2352
2353 fn result_field(&self) -> Field {
2354 Field::new("avg_f64", DataType::Float64, true)
2355 }
2356
2357 fn type_tag(&self) -> &'static str {
2358 "avg_f64"
2359 }
2360
2361 fn as_any(&self) -> &dyn std::any::Any {
2362 self
2363 }
2364}
2365
2366#[derive(Debug, Clone)]
2370pub struct CountDynFactory {
2371 field_name: String,
2373}
2374
2375impl CountDynFactory {
2376 #[must_use]
2378 pub fn new(field_name: impl Into<String>) -> Self {
2379 Self {
2380 field_name: field_name.into(),
2381 }
2382 }
2383}
2384
2385impl DynAggregatorFactory for CountDynFactory {
2386 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2387 Box::new(CountDynAccumulator::default())
2388 }
2389
2390 fn result_field(&self) -> Field {
2391 Field::new(&self.field_name, DataType::Int64, false)
2392 }
2393
2394 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2395 Box::new(self.clone())
2396 }
2397
2398 fn type_tag(&self) -> &'static str {
2399 "count"
2400 }
2401}
2402
2403#[derive(Debug, Clone, Default)]
2405pub struct CountDynAccumulator {
2406 count: u64,
2407}
2408
2409impl DynAccumulator for CountDynAccumulator {
2410 fn add_event(&mut self, event: &Event) {
2411 let rows = event.data.num_rows();
2412 self.count += rows as u64;
2413 }
2414
2415 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2416 let data = other.serialize();
2417 if data.len() >= 8 {
2418 let count = u64::from_le_bytes(data[..8].try_into().unwrap());
2419 self.count += count;
2420 }
2421 }
2422
2423 fn result_scalar(&self) -> ScalarResult {
2424 ScalarResult::Int64(i64::try_from(self.count).unwrap_or(i64::MAX))
2425 }
2426
2427 fn is_empty(&self) -> bool {
2428 self.count == 0
2429 }
2430
2431 fn clone_box(&self) -> Box<dyn DynAccumulator> {
2432 Box::new(self.clone())
2433 }
2434
2435 fn serialize(&self) -> Vec<u8> {
2436 self.count.to_le_bytes().to_vec()
2437 }
2438
2439 fn result_field(&self) -> Field {
2440 Field::new("count", DataType::Int64, false)
2441 }
2442
2443 fn type_tag(&self) -> &'static str {
2444 "count"
2445 }
2446
2447 fn as_any(&self) -> &dyn std::any::Any {
2448 self
2449 }
2450}
2451
2452#[derive(Debug, Clone)]
2456pub struct FirstValueF64DynFactory {
2457 value_column_index: usize,
2459 timestamp_column_index: usize,
2461 field_name: String,
2463}
2464
2465impl FirstValueF64DynFactory {
2466 #[must_use]
2468 pub fn new(
2469 value_column_index: usize,
2470 timestamp_column_index: usize,
2471 field_name: impl Into<String>,
2472 ) -> Self {
2473 Self {
2474 value_column_index,
2475 timestamp_column_index,
2476 field_name: field_name.into(),
2477 }
2478 }
2479}
2480
2481impl DynAggregatorFactory for FirstValueF64DynFactory {
2482 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2483 Box::new(FirstValueF64DynAccumulator::new(
2484 self.value_column_index,
2485 self.timestamp_column_index,
2486 ))
2487 }
2488
2489 fn result_field(&self) -> Field {
2490 Field::new(&self.field_name, DataType::Float64, true)
2491 }
2492
2493 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2494 Box::new(self.clone())
2495 }
2496
2497 fn type_tag(&self) -> &'static str {
2498 "first_value_f64"
2499 }
2500}
2501
2502#[derive(Debug, Clone)]
2504pub struct FirstValueF64DynAccumulator {
2505 value_column_index: usize,
2506 timestamp_column_index: usize,
2507 value: Option<f64>,
2508 timestamp: Option<i64>,
2509}
2510
2511impl FirstValueF64DynAccumulator {
2512 #[must_use]
2514 pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
2515 Self {
2516 value_column_index,
2517 timestamp_column_index,
2518 value: None,
2519 timestamp: None,
2520 }
2521 }
2522}
2523
2524impl DynAccumulator for FirstValueF64DynAccumulator {
2525 fn add_event(&mut self, event: &Event) {
2526 use arrow_array::cast::AsArray;
2527 use arrow_array::types::{Float64Type, Int64Type};
2528
2529 let batch = &event.data;
2530 if self.value_column_index >= batch.num_columns()
2531 || self.timestamp_column_index >= batch.num_columns()
2532 {
2533 return;
2534 }
2535
2536 let val_col = batch.column(self.value_column_index);
2537 let ts_col = batch.column(self.timestamp_column_index);
2538
2539 let Some(val_array) = val_col.as_primitive_opt::<Float64Type>() else {
2540 return;
2541 };
2542 let Some(ts_array) = ts_col.as_primitive_opt::<Int64Type>() else {
2543 return;
2544 };
2545
2546 for i in 0..batch.num_rows() {
2547 if val_array.is_null(i) || ts_array.is_null(i) {
2548 continue;
2549 }
2550 let val = val_array.value(i);
2551 let ts = ts_array.value(i);
2552
2553 match self.timestamp {
2554 None => {
2555 self.value = Some(val);
2556 self.timestamp = Some(ts);
2557 }
2558 Some(existing_ts) if ts < existing_ts => {
2559 self.value = Some(val);
2560 self.timestamp = Some(ts);
2561 }
2562 _ => {}
2563 }
2564 }
2565 }
2566
2567 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2568 let data = other.serialize();
2569 if data.len() >= 17 && data[0] == 1 {
2570 let other_val = f64::from_le_bytes(data[1..9].try_into().unwrap());
2571 let other_ts = i64::from_le_bytes(data[9..17].try_into().unwrap());
2572 match self.timestamp {
2573 None => {
2574 self.value = Some(other_val);
2575 self.timestamp = Some(other_ts);
2576 }
2577 Some(self_ts) if other_ts < self_ts => {
2578 self.value = Some(other_val);
2579 self.timestamp = Some(other_ts);
2580 }
2581 _ => {}
2582 }
2583 }
2584 }
2585
2586 fn result_scalar(&self) -> ScalarResult {
2587 ScalarResult::OptionalFloat64(self.value)
2588 }
2589
2590 fn is_empty(&self) -> bool {
2591 self.value.is_none()
2592 }
2593
2594 fn clone_box(&self) -> Box<dyn DynAccumulator> {
2595 Box::new(self.clone())
2596 }
2597
2598 fn serialize(&self) -> Vec<u8> {
2599 match (self.value, self.timestamp) {
2600 (Some(v), Some(ts)) => {
2601 let mut buf = Vec::with_capacity(17);
2602 buf.push(1);
2603 buf.extend_from_slice(&v.to_le_bytes());
2604 buf.extend_from_slice(&ts.to_le_bytes());
2605 buf
2606 }
2607 _ => vec![0],
2608 }
2609 }
2610
2611 fn result_field(&self) -> Field {
2612 Field::new("first_value_f64", DataType::Float64, true)
2613 }
2614
2615 fn type_tag(&self) -> &'static str {
2616 "first_value_f64"
2617 }
2618
2619 fn as_any(&self) -> &dyn std::any::Any {
2620 self
2621 }
2622}
2623
2624#[derive(Debug, Clone)]
2626pub struct LastValueF64DynFactory {
2627 value_column_index: usize,
2629 timestamp_column_index: usize,
2631 field_name: String,
2633}
2634
2635impl LastValueF64DynFactory {
2636 #[must_use]
2638 pub fn new(
2639 value_column_index: usize,
2640 timestamp_column_index: usize,
2641 field_name: impl Into<String>,
2642 ) -> Self {
2643 Self {
2644 value_column_index,
2645 timestamp_column_index,
2646 field_name: field_name.into(),
2647 }
2648 }
2649}
2650
2651impl DynAggregatorFactory for LastValueF64DynFactory {
2652 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2653 Box::new(LastValueF64DynAccumulator::new(
2654 self.value_column_index,
2655 self.timestamp_column_index,
2656 ))
2657 }
2658
2659 fn result_field(&self) -> Field {
2660 Field::new(&self.field_name, DataType::Float64, true)
2661 }
2662
2663 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2664 Box::new(self.clone())
2665 }
2666
2667 fn type_tag(&self) -> &'static str {
2668 "last_value_f64"
2669 }
2670}
2671
2672#[derive(Debug, Clone)]
2674pub struct LastValueF64DynAccumulator {
2675 value_column_index: usize,
2676 timestamp_column_index: usize,
2677 value: Option<f64>,
2678 timestamp: Option<i64>,
2679}
2680
2681impl LastValueF64DynAccumulator {
2682 #[must_use]
2684 pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
2685 Self {
2686 value_column_index,
2687 timestamp_column_index,
2688 value: None,
2689 timestamp: None,
2690 }
2691 }
2692}
2693
2694impl DynAccumulator for LastValueF64DynAccumulator {
2695 fn add_event(&mut self, event: &Event) {
2696 use arrow_array::cast::AsArray;
2697 use arrow_array::types::{Float64Type, Int64Type};
2698
2699 let batch = &event.data;
2700 if self.value_column_index >= batch.num_columns()
2701 || self.timestamp_column_index >= batch.num_columns()
2702 {
2703 return;
2704 }
2705
2706 let val_col = batch.column(self.value_column_index);
2707 let ts_col = batch.column(self.timestamp_column_index);
2708
2709 let Some(val_array) = val_col.as_primitive_opt::<Float64Type>() else {
2710 return;
2711 };
2712 let Some(ts_array) = ts_col.as_primitive_opt::<Int64Type>() else {
2713 return;
2714 };
2715
2716 for i in 0..batch.num_rows() {
2717 if val_array.is_null(i) || ts_array.is_null(i) {
2718 continue;
2719 }
2720 let val = val_array.value(i);
2721 let ts = ts_array.value(i);
2722
2723 match self.timestamp {
2724 None => {
2725 self.value = Some(val);
2726 self.timestamp = Some(ts);
2727 }
2728 Some(existing_ts) if ts >= existing_ts => {
2729 self.value = Some(val);
2730 self.timestamp = Some(ts);
2731 }
2732 _ => {}
2733 }
2734 }
2735 }
2736
2737 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2738 let data = other.serialize();
2739 if data.len() >= 17 && data[0] == 1 {
2740 let other_val = f64::from_le_bytes(data[1..9].try_into().unwrap());
2741 let other_ts = i64::from_le_bytes(data[9..17].try_into().unwrap());
2742 match self.timestamp {
2743 None => {
2744 self.value = Some(other_val);
2745 self.timestamp = Some(other_ts);
2746 }
2747 Some(self_ts) if other_ts >= self_ts => {
2748 self.value = Some(other_val);
2749 self.timestamp = Some(other_ts);
2750 }
2751 _ => {}
2752 }
2753 }
2754 }
2755
2756 fn result_scalar(&self) -> ScalarResult {
2757 ScalarResult::OptionalFloat64(self.value)
2758 }
2759
2760 fn is_empty(&self) -> bool {
2761 self.value.is_none()
2762 }
2763
2764 fn clone_box(&self) -> Box<dyn DynAccumulator> {
2765 Box::new(self.clone())
2766 }
2767
2768 fn serialize(&self) -> Vec<u8> {
2769 match (self.value, self.timestamp) {
2770 (Some(v), Some(ts)) => {
2771 let mut buf = Vec::with_capacity(17);
2772 buf.push(1);
2773 buf.extend_from_slice(&v.to_le_bytes());
2774 buf.extend_from_slice(&ts.to_le_bytes());
2775 buf
2776 }
2777 _ => vec![0],
2778 }
2779 }
2780
2781 fn result_field(&self) -> Field {
2782 Field::new("last_value_f64", DataType::Float64, true)
2783 }
2784
2785 fn type_tag(&self) -> &'static str {
2786 "last_value_f64"
2787 }
2788
2789 fn as_any(&self) -> &dyn std::any::Any {
2790 self
2791 }
2792}
2793
2794pub struct CompositeAggregator {
2815 factories: Vec<Box<dyn DynAggregatorFactory>>,
2817 cached_schema: SchemaRef,
2819}
2820
2821impl std::fmt::Debug for CompositeAggregator {
2822 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2823 f.debug_struct("CompositeAggregator")
2824 .field("num_factories", &self.factories.len())
2825 .finish_non_exhaustive()
2826 }
2827}
2828
2829impl CompositeAggregator {
2830 #[must_use]
2832 pub fn new(factories: Vec<Box<dyn DynAggregatorFactory>>) -> Self {
2833 let mut fields = vec![
2834 Field::new("window_start", DataType::Int64, false),
2835 Field::new("window_end", DataType::Int64, false),
2836 ];
2837 fields.extend(factories.iter().map(|f| f.result_field()));
2838 let cached_schema = Arc::new(Schema::new(fields));
2839 Self {
2840 factories,
2841 cached_schema,
2842 }
2843 }
2844
2845 #[must_use]
2847 pub fn num_aggregates(&self) -> usize {
2848 self.factories.len()
2849 }
2850
2851 #[must_use]
2853 pub fn create_accumulator(&self) -> CompositeAccumulator {
2854 let accumulators = self
2855 .factories
2856 .iter()
2857 .map(|f| f.create_accumulator())
2858 .collect();
2859 CompositeAccumulator { accumulators }
2860 }
2861
2862 #[must_use]
2864 pub fn result_fields(&self) -> Vec<Field> {
2865 self.factories.iter().map(|f| f.result_field()).collect()
2866 }
2867
2868 #[must_use]
2870 pub fn output_schema(&self) -> SchemaRef {
2871 Arc::clone(&self.cached_schema)
2872 }
2873}
2874
2875impl Clone for CompositeAggregator {
2876 fn clone(&self) -> Self {
2877 let factories: Vec<Box<dyn DynAggregatorFactory>> =
2878 self.factories.iter().map(|f| f.clone_box()).collect();
2879 Self {
2880 cached_schema: Arc::clone(&self.cached_schema),
2881 factories,
2882 }
2883 }
2884}
2885
2886pub struct CompositeAccumulator {
2891 accumulators: Vec<Box<dyn DynAccumulator>>,
2893}
2894
2895impl std::fmt::Debug for CompositeAccumulator {
2896 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2897 f.debug_struct("CompositeAccumulator")
2898 .field("num_accumulators", &self.accumulators.len())
2899 .finish()
2900 }
2901}
2902
2903impl CompositeAccumulator {
2904 pub fn add_event(&mut self, event: &Event) {
2906 for acc in &mut self.accumulators {
2907 acc.add_event(event);
2908 }
2909 }
2910
2911 pub fn merge(&mut self, other: &Self) {
2917 assert_eq!(
2918 self.accumulators.len(),
2919 other.accumulators.len(),
2920 "Cannot merge composite accumulators with different sizes"
2921 );
2922 for (self_acc, other_acc) in self.accumulators.iter_mut().zip(&other.accumulators) {
2923 self_acc.merge_dyn(other_acc.as_ref());
2924 }
2925 }
2926
2927 #[must_use]
2929 pub fn results(&self) -> Vec<ScalarResult> {
2930 self.accumulators
2931 .iter()
2932 .map(|a| a.result_scalar())
2933 .collect()
2934 }
2935
2936 #[must_use]
2938 pub fn is_empty(&self) -> bool {
2939 self.accumulators.iter().all(|a| a.is_empty())
2940 }
2941
2942 #[must_use]
2944 #[allow(clippy::cast_possible_truncation)] pub fn serialize(&self) -> Vec<u8> {
2946 let mut buf = Vec::new();
2947 let n = self.accumulators.len() as u32;
2949 buf.extend_from_slice(&n.to_le_bytes());
2950 for acc in &self.accumulators {
2951 let tag = acc.type_tag();
2952 let tag_bytes = tag.as_bytes();
2953 buf.extend_from_slice(&(tag_bytes.len() as u16).to_le_bytes());
2955 buf.extend_from_slice(tag_bytes);
2956 let data = acc.serialize();
2957 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
2958 buf.extend_from_slice(&data);
2959 }
2960 buf
2961 }
2962
2963 #[must_use]
2971 pub fn to_record_batch(&self, window_id: &WindowId, schema: &SchemaRef) -> Option<RecordBatch> {
2972 use arrow_array::{Float64Array, UInt64Array};
2973
2974 let mut columns: Vec<Arc<dyn arrow_array::Array>> = vec![
2975 Arc::new(Int64Array::from(vec![window_id.start])),
2976 Arc::new(Int64Array::from(vec![window_id.end])),
2977 ];
2978
2979 for result in self.results() {
2980 let col: Arc<dyn arrow_array::Array> = match result {
2981 ScalarResult::Int64(v) => Arc::new(Int64Array::from(vec![v])),
2982 ScalarResult::Float64(v) => Arc::new(Float64Array::from(vec![v])),
2983 ScalarResult::UInt64(v) => Arc::new(UInt64Array::from(vec![v])),
2984 ScalarResult::OptionalInt64(v) => Arc::new(Int64Array::from(vec![v])),
2985 ScalarResult::OptionalFloat64(v) => Arc::new(Float64Array::from(vec![v])),
2986 ScalarResult::Null => Arc::new(Int64Array::new_null(1)),
2987 };
2988 columns.push(col);
2989 }
2990
2991 RecordBatch::try_new(Arc::clone(schema), columns).ok()
2992 }
2993
2994 #[must_use]
2996 pub fn num_accumulators(&self) -> usize {
2997 self.accumulators.len()
2998 }
2999}
3000
3001impl Clone for CompositeAccumulator {
3002 fn clone(&self) -> Self {
3003 Self {
3004 accumulators: self.accumulators.iter().map(|a| a.clone_box()).collect(),
3005 }
3006 }
3007}
3008
3009const WINDOW_STATE_PREFIX: &[u8; 4] = b"win:";
3013
3014const WINDOW_STATE_KEY_SIZE: usize = 4 + 16;
3016
3017pub struct TumblingWindowOperator<A: Aggregator> {
3048 assigner: TumblingWindowAssigner,
3050 aggregator: A,
3052 allowed_lateness_ms: i64,
3054 registered_windows: std::collections::HashSet<WindowId>,
3056 periodic_timer_windows: std::collections::HashSet<WindowId>,
3058 emit_strategy: EmitStrategy,
3060 late_data_config: LateDataConfig,
3062 late_data_metrics: LateDataMetrics,
3064 operator_id: String,
3066 output_schema: SchemaRef,
3068 _phantom: PhantomData<A::Acc>,
3070}
3071
3072static OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
3074
3075fn create_window_output_schema() -> SchemaRef {
3079 Arc::new(Schema::new(vec![
3080 Field::new("window_start", DataType::Int64, false),
3081 Field::new("window_end", DataType::Int64, false),
3082 Field::new("result", DataType::Int64, false),
3083 ]))
3084}
3085
3086impl<A: Aggregator> TumblingWindowOperator<A>
3087where
3088 A::Acc: Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
3089 <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
3090 + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
3091{
3092 #[must_use]
3103 pub fn new(
3104 assigner: TumblingWindowAssigner,
3105 aggregator: A,
3106 allowed_lateness: Duration,
3107 ) -> Self {
3108 let operator_num = OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
3109 Self {
3110 assigner,
3111 aggregator,
3112 allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
3114 .expect("Allowed lateness must fit in i64"),
3115 registered_windows: std::collections::HashSet::new(),
3116 periodic_timer_windows: std::collections::HashSet::new(),
3117 emit_strategy: EmitStrategy::default(),
3118 late_data_config: LateDataConfig::default(),
3119 late_data_metrics: LateDataMetrics::new(),
3120 operator_id: format!("tumbling_window_{operator_num}"),
3121 output_schema: create_window_output_schema(),
3122 _phantom: PhantomData,
3123 }
3124 }
3125
3126 #[must_use]
3131 pub fn with_id(
3132 assigner: TumblingWindowAssigner,
3133 aggregator: A,
3134 allowed_lateness: Duration,
3135 operator_id: String,
3136 ) -> Self {
3137 Self {
3138 assigner,
3139 aggregator,
3140 allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
3142 .expect("Allowed lateness must fit in i64"),
3143 registered_windows: std::collections::HashSet::new(),
3144 periodic_timer_windows: std::collections::HashSet::new(),
3145 emit_strategy: EmitStrategy::default(),
3146 late_data_config: LateDataConfig::default(),
3147 late_data_metrics: LateDataMetrics::new(),
3148 operator_id,
3149 output_schema: create_window_output_schema(),
3150 _phantom: PhantomData,
3151 }
3152 }
3153
3154 pub fn set_emit_strategy(&mut self, strategy: EmitStrategy) {
3179 self.emit_strategy = strategy;
3180 }
3181
3182 #[must_use]
3184 pub fn emit_strategy(&self) -> &EmitStrategy {
3185 &self.emit_strategy
3186 }
3187
3188 pub fn set_late_data_config(&mut self, config: LateDataConfig) {
3213 self.late_data_config = config;
3214 }
3215
3216 #[must_use]
3218 pub fn late_data_config(&self) -> &LateDataConfig {
3219 &self.late_data_config
3220 }
3221
3222 #[must_use]
3226 pub fn late_data_metrics(&self) -> &LateDataMetrics {
3227 &self.late_data_metrics
3228 }
3229
3230 pub fn reset_late_data_metrics(&mut self) {
3232 self.late_data_metrics.reset();
3233 }
3234
3235 #[must_use]
3237 pub fn assigner(&self) -> &TumblingWindowAssigner {
3238 &self.assigner
3239 }
3240
3241 #[must_use]
3243 pub fn allowed_lateness_ms(&self) -> i64 {
3244 self.allowed_lateness_ms
3245 }
3246
3247 #[inline]
3252 fn state_key(window_id: &WindowId) -> [u8; WINDOW_STATE_KEY_SIZE] {
3253 let mut key = [0u8; WINDOW_STATE_KEY_SIZE];
3254 key[..4].copy_from_slice(WINDOW_STATE_PREFIX);
3255 let window_key = window_id.to_key_inline();
3256 key[4..20].copy_from_slice(&window_key);
3257 key
3258 }
3259
3260 fn get_accumulator(&self, window_id: &WindowId, state: &dyn StateStore) -> A::Acc {
3262 let key = Self::state_key(window_id);
3263 state
3264 .get_typed::<A::Acc>(&key)
3265 .ok()
3266 .flatten()
3267 .unwrap_or_else(|| self.aggregator.create_accumulator())
3268 }
3269
3270 fn put_accumulator(
3272 window_id: &WindowId,
3273 acc: &A::Acc,
3274 state: &mut dyn StateStore,
3275 ) -> Result<(), OperatorError> {
3276 let key = Self::state_key(window_id);
3277 state
3278 .put_typed(&key, acc)
3279 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
3280 }
3281
3282 fn delete_accumulator(
3284 window_id: &WindowId,
3285 state: &mut dyn StateStore,
3286 ) -> Result<(), OperatorError> {
3287 let key = Self::state_key(window_id);
3288 state
3289 .delete(&key)
3290 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
3291 }
3292
3293 fn is_late(&self, event_time: i64, watermark: i64) -> bool {
3295 let window_id = self.assigner.assign(event_time);
3296 let cleanup_time = window_id.end + self.allowed_lateness_ms;
3297 watermark >= cleanup_time
3298 }
3299
3300 fn maybe_register_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
3302 if !self.registered_windows.contains(&window_id) {
3303 let trigger_time = window_id.end + self.allowed_lateness_ms;
3305 ctx.timers.register_timer(
3306 trigger_time,
3307 Some(window_id.to_key()),
3308 Some(ctx.operator_index),
3309 );
3310 self.registered_windows.insert(window_id);
3311 }
3312 }
3313
3314 fn maybe_register_periodic_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
3320 if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
3321 if !self.periodic_timer_windows.contains(&window_id) {
3322 let interval_ms =
3324 i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
3325 let trigger_time = ctx.processing_time + interval_ms;
3326
3327 let key = Self::periodic_timer_key(&window_id);
3329
3330 ctx.timers
3331 .register_timer(trigger_time, Some(key), Some(ctx.operator_index));
3332 self.periodic_timer_windows.insert(window_id);
3333 }
3334 }
3335 }
3336
3337 #[inline]
3342 fn periodic_timer_key(window_id: &WindowId) -> super::TimerKey {
3343 let mut key = window_id.to_key();
3344 if !key.is_empty() {
3346 key[0] |= 0x80;
3347 }
3348 key
3349 }
3350
3351 #[inline]
3353 fn is_periodic_timer_key(key: &[u8]) -> bool {
3354 !key.is_empty() && (key[0] & 0x80) != 0
3355 }
3356
3357 #[inline]
3359 fn window_id_from_periodic_key(key: &[u8]) -> Option<WindowId> {
3360 if key.len() != 16 {
3361 return None;
3362 }
3363 let mut clean_key = [0u8; 16];
3364 clean_key.copy_from_slice(key);
3365 clean_key[0] &= 0x7F;
3367 WindowId::from_key(&clean_key)
3368 }
3369
3370 fn create_intermediate_result(
3374 &self,
3375 window_id: &WindowId,
3376 state: &dyn crate::state::StateStore,
3377 ) -> Option<Event> {
3378 let acc = self.get_accumulator(window_id, state);
3379
3380 if acc.is_empty() {
3381 return None;
3382 }
3383
3384 let result = acc.result();
3385 let result_i64 = result.to_i64();
3386
3387 let batch = RecordBatch::try_new(
3388 Arc::clone(&self.output_schema),
3389 vec![
3390 Arc::new(Int64Array::from(vec![window_id.start])),
3391 Arc::new(Int64Array::from(vec![window_id.end])),
3392 Arc::new(Int64Array::from(vec![result_i64])),
3393 ],
3394 )
3395 .ok()?;
3396
3397 Some(Event::new(window_id.end, batch))
3398 }
3399
3400 fn handle_periodic_timer(
3402 &mut self,
3403 window_id: WindowId,
3404 ctx: &mut OperatorContext,
3405 ) -> OutputVec {
3406 let mut output = OutputVec::new();
3407
3408 if !self.registered_windows.contains(&window_id) {
3410 self.periodic_timer_windows.remove(&window_id);
3412 return output;
3413 }
3414
3415 if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
3417 output.push(Output::Event(event));
3418 }
3419
3420 if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
3422 let interval_ms =
3423 i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
3424 let next_trigger = ctx.processing_time + interval_ms;
3425
3426 let window_close_time = window_id.end + self.allowed_lateness_ms;
3428 if next_trigger < window_close_time {
3429 let key = Self::periodic_timer_key(&window_id);
3430 ctx.timers
3431 .register_timer(next_trigger, Some(key), Some(ctx.operator_index));
3432 }
3433 }
3434
3435 output
3436 }
3437}
3438
3439impl<A: Aggregator> Operator for TumblingWindowOperator<A>
3440where
3441 A::Acc: 'static
3442 + Archive
3443 + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
3444 <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
3445 + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
3446{
3447 fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
3448 let event_time = event.timestamp;
3449
3450 let emitted_watermark = ctx.watermark_generator.on_event(event_time);
3452
3453 let current_wm = ctx.watermark_generator.current_watermark();
3456 if current_wm > i64::MIN && self.is_late(event_time, current_wm) {
3457 let mut output = OutputVec::new();
3458
3459 if self.emit_strategy.drops_late_data() {
3461 self.late_data_metrics.record_dropped();
3462 return output; }
3464
3465 if let Some(side_output_name) = self.late_data_config.side_output() {
3467 self.late_data_metrics.record_side_output();
3469 output.push(Output::SideOutput {
3470 name: side_output_name.to_string(),
3471 event: event.clone(),
3472 });
3473 } else {
3474 self.late_data_metrics.record_dropped();
3476 output.push(Output::LateEvent(event.clone()));
3477 }
3478 return output;
3479 }
3480
3481 let window_id = self.assigner.assign(event_time);
3483
3484 let mut state_updated = false;
3486
3487 if let Some(value) = self.aggregator.extract(event) {
3489 let mut acc = self.get_accumulator(&window_id, ctx.state);
3490 acc.add(value);
3491 if let Err(e) = Self::put_accumulator(&window_id, &acc, ctx.state) {
3492 tracing::error!("Failed to store window state: {e}");
3494 } else {
3495 state_updated = true;
3496 }
3497 }
3498
3499 self.maybe_register_timer(window_id, ctx);
3501
3502 if !self.emit_strategy.suppresses_intermediate() {
3505 self.maybe_register_periodic_timer(window_id, ctx);
3506 }
3507
3508 let mut output = OutputVec::new();
3510 if let Some(wm) = emitted_watermark {
3511 output.push(Output::Watermark(wm.timestamp()));
3512 }
3513
3514 if state_updated {
3516 match &self.emit_strategy {
3517 EmitStrategy::OnUpdate => {
3519 if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
3520 output.push(Output::Event(event));
3521 }
3522 }
3523 EmitStrategy::Changelog => {
3525 if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
3526 let record = ChangelogRecord::insert(event, ctx.processing_time);
3529 output.push(Output::Changelog(record));
3530 }
3531 }
3532 EmitStrategy::OnWatermark
3534 | EmitStrategy::Periodic(_)
3535 | EmitStrategy::OnWindowClose
3536 | EmitStrategy::Final => {}
3537 }
3538 }
3539
3540 output
3541 }
3542
3543 fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
3544 if Self::is_periodic_timer_key(&timer.key) {
3546 if self.emit_strategy.suppresses_intermediate() {
3548 if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
3550 self.periodic_timer_windows.remove(&window_id);
3551 }
3552 return OutputVec::new();
3553 }
3554
3555 if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
3556 return self.handle_periodic_timer(window_id, ctx);
3557 }
3558 return OutputVec::new();
3559 }
3560
3561 let Some(window_id) = WindowId::from_key(&timer.key) else {
3563 return OutputVec::new();
3564 };
3565
3566 let acc = self.get_accumulator(&window_id, ctx.state);
3568
3569 if acc.is_empty() {
3571 let _ = Self::delete_accumulator(&window_id, ctx.state);
3573 self.registered_windows.remove(&window_id);
3574 self.periodic_timer_windows.remove(&window_id);
3575 return OutputVec::new();
3576 }
3577
3578 let result = acc.result();
3580
3581 let _ = Self::delete_accumulator(&window_id, ctx.state);
3583 self.registered_windows.remove(&window_id);
3584 self.periodic_timer_windows.remove(&window_id);
3585
3586 let result_i64 = result.to_i64();
3588
3589 let batch = RecordBatch::try_new(
3591 Arc::clone(&self.output_schema),
3592 vec![
3593 Arc::new(Int64Array::from(vec![window_id.start])),
3594 Arc::new(Int64Array::from(vec![window_id.end])),
3595 Arc::new(Int64Array::from(vec![result_i64])),
3596 ],
3597 );
3598
3599 let mut output = OutputVec::new();
3600 match batch {
3601 Ok(data) => {
3602 let event = Event::new(window_id.end, data);
3603
3604 match &self.emit_strategy {
3606 EmitStrategy::Changelog => {
3608 let record = ChangelogRecord::insert(event, ctx.processing_time);
3609 output.push(Output::Changelog(record));
3610 }
3611 EmitStrategy::OnWatermark
3613 | EmitStrategy::Periodic(_)
3614 | EmitStrategy::OnUpdate
3615 | EmitStrategy::OnWindowClose
3616 | EmitStrategy::Final => {
3617 output.push(Output::Event(event));
3618 }
3619 }
3620 }
3621 Err(e) => {
3622 tracing::error!("Failed to create output batch: {e}");
3623 }
3624 }
3625 output
3626 }
3627
3628 fn checkpoint(&self) -> OperatorState {
3629 let windows: Vec<_> = self.registered_windows.iter().copied().collect();
3631 let periodic_windows: Vec<_> = self.periodic_timer_windows.iter().copied().collect();
3632
3633 let checkpoint_data = (windows, periodic_windows);
3635 let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
3636 .map(|v| v.to_vec())
3637 .unwrap_or_default();
3638
3639 OperatorState {
3640 operator_id: self.operator_id.clone(),
3641 data,
3642 }
3643 }
3644
3645 fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
3646 if state.operator_id != self.operator_id {
3647 return Err(OperatorError::StateAccessFailed(format!(
3648 "Operator ID mismatch: expected {}, got {}",
3649 self.operator_id, state.operator_id
3650 )));
3651 }
3652
3653 if let Ok(archived) =
3655 rkyv::access::<rkyv::Archived<(Vec<WindowId>, Vec<WindowId>)>, RkyvError>(&state.data)
3656 {
3657 if let Ok((windows, periodic_windows)) =
3658 rkyv::deserialize::<(Vec<WindowId>, Vec<WindowId>), RkyvError>(archived)
3659 {
3660 self.registered_windows = windows.into_iter().collect();
3661 self.periodic_timer_windows = periodic_windows.into_iter().collect();
3662 return Ok(());
3663 }
3664 }
3665
3666 let archived = rkyv::access::<rkyv::Archived<Vec<WindowId>>, RkyvError>(&state.data)
3668 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
3669 let windows: Vec<WindowId> = rkyv::deserialize::<Vec<WindowId>, RkyvError>(archived)
3670 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
3671
3672 self.registered_windows = windows.into_iter().collect();
3673 self.periodic_timer_windows = std::collections::HashSet::new();
3674 Ok(())
3675 }
3676}
3677
3678#[cfg(test)]
3679mod tests {
3680 use super::*;
3681 use crate::state::InMemoryStore;
3682 use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
3683 use arrow_array::{Int64Array, RecordBatch};
3684 use arrow_schema::{DataType, Field, Schema};
3685 use std::sync::Arc;
3686
3687 fn create_test_event(timestamp: i64, value: i64) -> Event {
3688 let schema = Arc::new(Schema::new(vec![Field::new(
3689 "value",
3690 DataType::Int64,
3691 false,
3692 )]));
3693 let batch =
3694 RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
3695 Event::new(timestamp, batch)
3696 }
3697
3698 fn create_test_context<'a>(
3699 timers: &'a mut TimerService,
3700 state: &'a mut dyn StateStore,
3701 watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
3702 ) -> OperatorContext<'a> {
3703 OperatorContext {
3704 event_time: 0,
3705 processing_time: 0,
3706 timers,
3707 state,
3708 watermark_generator: watermark_gen,
3709 operator_index: 0,
3710 }
3711 }
3712
#[test]
fn test_window_id_creation() {
    let window = WindowId::new(1000, 2000);
    assert_eq!((window.start, window.end), (1000, 2000));
    assert_eq!(window.duration_ms(), 1000);
}
3720
#[test]
fn test_window_id_serialization() {
    let original = WindowId::new(1000, 2000);
    let encoded = original.to_key();
    assert_eq!(encoded.len(), 16);

    let decoded = WindowId::from_key(&encoded).unwrap();
    assert_eq!(decoded, original);
}
3730
#[test]
fn test_tumbling_assigner_positive_timestamps() {
    let assigner = TumblingWindowAssigner::from_millis(1000);

    // (timestamp, expected window start, expected window end)
    let cases = [
        (0, 0, 1000),
        (500, 0, 1000),
        (999, 0, 1000),
        (1000, 1000, 2000),
        (1500, 1000, 2000),
    ];
    for (ts, start, end) in cases {
        assert_eq!(assigner.assign(ts), WindowId::new(start, end));
    }
}
3744
#[test]
fn test_tumbling_assigner_negative_timestamps() {
    let assigner = TumblingWindowAssigner::from_millis(1000);

    // (timestamp, expected window start, expected window end)
    let cases = [
        (-1, -1000, 0),
        (-500, -1000, 0),
        (-1000, -1000, 0),
        (-1001, -2000, -1000),
    ];
    for (ts, start, end) in cases {
        assert_eq!(assigner.assign(ts), WindowId::new(start, end));
    }
}
3755
#[test]
fn test_count_aggregator() {
    let mut counter = CountAccumulator::default();
    assert!(counter.is_empty());
    assert_eq!(counter.result(), 0);

    for _ in 0..3 {
        counter.add(());
    }

    assert!(!counter.is_empty());
    assert_eq!(counter.result(), 3);
}
3769
#[test]
fn test_sum_accumulator() {
    let mut total = SumAccumulator::default();
    for v in [10, 20, 30] {
        total.add(v);
    }
    assert_eq!(total.result(), 60);
}
3779
#[test]
fn test_min_accumulator() {
    let mut minimum = MinAccumulator::default();
    assert!(minimum.is_empty());
    assert_eq!(minimum.result(), None);

    for v in [50, 10, 30] {
        minimum.add(v);
    }
    assert_eq!(minimum.result(), Some(10));
}
3792
#[test]
fn test_max_accumulator() {
    let mut maximum = MaxAccumulator::default();
    for v in [10, 50, 30] {
        maximum.add(v);
    }
    assert_eq!(maximum.result(), Some(50));
}
3802
#[test]
fn test_avg_accumulator() {
    let mut average = AvgAccumulator::default();
    for v in [10, 20, 30] {
        average.add(v);
    }

    let mean = average.result().unwrap();
    assert!((mean - 20.0).abs() < f64::EPSILON);
}
3813
#[test]
fn test_accumulator_merge() {
    let mut left = SumAccumulator::default();
    let mut right = SumAccumulator::default();
    for v in [10, 20] {
        left.add(v);
    }
    for v in [30, 40] {
        right.add(v);
    }

    left.merge(&right);
    assert_eq!(left.result(), 100);
}
3827
#[test]
fn test_tumbling_window_operator_basic() {
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // All three timestamps fall into the window [0, 1000).
    for (ts, value) in [(100, 1), (500, 2), (900, 3)] {
        let event = create_test_event(ts, value);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx);
    }

    assert_eq!(operator.registered_windows.len(), 1);
    assert!(operator.registered_windows.contains(&WindowId::new(0, 1000)));
}
3867
#[test]
fn test_tumbling_window_operator_trigger() {
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    for ts in [100, 500, 900] {
        let event = create_test_event(ts, 1);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx);
    }

    // Fire the close timer of window [0, 1000) by hand.
    let close_timer = Timer {
        key: WindowId::new(0, 1000).to_key(),
        timestamp: 1000,
    };
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.on_timer(close_timer, &mut ctx)
    };

    assert_eq!(outputs.len(), 1);
    let Output::Event(emitted) = &outputs[0] else {
        panic!("Expected Event output");
    };
    assert_eq!(emitted.timestamp, 1000);
    let counts = emitted
        .data
        .column(2)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert_eq!(counts.value(0), 3);

    assert!(operator.registered_windows.is_empty());
}
3914
#[test]
fn test_tumbling_window_multiple_windows() {
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Two events in [0, 1000), two in [1000, 2000), one in [2000, 3000).
    let inputs = [(100, 1), (500, 2), (1100, 3), (1500, 4), (2500, 5)];
    for (ts, value) in inputs {
        let event = create_test_event(ts, value);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx);
    }

    assert_eq!(operator.registered_windows.len(), 3);
}
3947
#[test]
fn test_tumbling_window_checkpoint_restore() {
    let assigner = TumblingWindowAssigner::from_millis(1000);
    let aggregator = CountAggregator::new();
    let tracked = [WindowId::new(0, 1000), WindowId::new(1000, 2000)];

    let mut operator = TumblingWindowOperator::with_id(
        assigner.clone(),
        aggregator.clone(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    operator.registered_windows.extend(tracked);

    let snapshot = operator.checkpoint();

    let mut restored_operator = TumblingWindowOperator::with_id(
        assigner,
        aggregator,
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    restored_operator.restore(snapshot).unwrap();

    assert_eq!(restored_operator.registered_windows.len(), tracked.len());
    for window in &tracked {
        assert!(restored_operator.registered_windows.contains(window));
    }
}
3985
#[test]
fn test_sum_aggregator_extraction() {
    let event = create_test_event(100, 42);
    assert_eq!(SumAggregator::new(0).extract(&event), Some(42));
}
3994
#[test]
fn test_empty_window_trigger() {
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // No events were processed, so closing the window must produce nothing.
    let close_timer = Timer {
        key: WindowId::new(0, 1000).to_key(),
        timestamp: 1000,
    };
    let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
    assert!(operator.on_timer(close_timer, &mut ctx).is_empty());
}
4022
#[test]
fn test_window_assigner_trait() {
    let assigner = TumblingWindowAssigner::from_millis(1000);
    let assigned = assigner.assign_windows(500);

    assert_eq!(assigned.len(), 1);
    assert_eq!(assigned.first(), Some(&WindowId::new(0, 1000)));
}
4031
#[test]
fn test_emit_strategy_default() {
    assert_eq!(EmitStrategy::default(), EmitStrategy::OnWatermark);
}
4037
#[test]
fn test_emit_strategy_on_watermark() {
    let on_watermark = EmitStrategy::OnWatermark;
    assert_eq!(on_watermark.periodic_interval(), None);
    assert!(!on_watermark.needs_periodic_timer());
    assert!(!on_watermark.emits_on_update());
}
4045
#[test]
fn test_emit_strategy_periodic() {
    let every_ten_seconds = Duration::from_secs(10);
    let periodic = EmitStrategy::Periodic(every_ten_seconds);

    assert_eq!(periodic.periodic_interval(), Some(every_ten_seconds));
    assert!(periodic.needs_periodic_timer());
    assert!(!periodic.emits_on_update());
}
4054
#[test]
fn test_emit_strategy_on_update() {
    let on_update = EmitStrategy::OnUpdate;
    assert_eq!(on_update.periodic_interval(), None);
    assert!(!on_update.needs_periodic_timer());
    assert!(on_update.emits_on_update());
}
4062
#[test]
fn test_window_operator_set_emit_strategy() {
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    // A freshly built operator uses the default strategy.
    assert_eq!(*operator.emit_strategy(), EmitStrategy::OnWatermark);

    let five_seconds = Duration::from_secs(5);
    operator.set_emit_strategy(EmitStrategy::Periodic(five_seconds));
    assert_eq!(*operator.emit_strategy(), EmitStrategy::Periodic(five_seconds));

    operator.set_emit_strategy(EmitStrategy::OnUpdate);
    assert_eq!(*operator.emit_strategy(), EmitStrategy::OnUpdate);
}
4088
#[test]
fn test_emit_on_update_emits_intermediate_results() {
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::OnUpdate);

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let first_event = create_test_event(100, 1);
    let first_outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&first_event, &mut ctx)
    };
    assert!(
        first_outputs.iter().any(|o| matches!(o, Output::Event(_))),
        "OnUpdate should emit intermediate result after first event"
    );

    let second_event = create_test_event(500, 2);
    let second_outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&second_event, &mut ctx)
    };

    let latest = second_outputs.iter().find_map(|o| match o {
        Output::Event(e) => Some(e),
        _ => None,
    });
    assert!(latest.is_some(), "OnUpdate should emit after second event");

    if let Some(emitted) = latest {
        let counts = emitted
            .data
            .column(2)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(counts.value(0), 2, "Intermediate count should be 2");
    }
}
4147
#[test]
fn test_emit_on_watermark_no_intermediate_results() {
    // Default strategy (OnWatermark): nothing is emitted until the window closes.
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let event = create_test_event(100, 1);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx)
    };

    let emitted_event = outputs.iter().any(|o| matches!(o, Output::Event(_)));
    assert!(
        !emitted_event,
        "OnWatermark should not emit intermediate results"
    );
}
4178
#[test]
fn test_checkpoint_restore_with_emit_strategy() {
    let assigner = TumblingWindowAssigner::from_millis(1000);
    let aggregator = CountAggregator::new();
    let window = WindowId::new(0, 1000);

    let mut operator = TumblingWindowOperator::with_id(
        assigner.clone(),
        aggregator.clone(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::Periodic(Duration::from_secs(10)));
    operator.registered_windows.insert(window);
    operator.periodic_timer_windows.insert(window);

    let snapshot = operator.checkpoint();

    let mut restored_operator = TumblingWindowOperator::with_id(
        assigner,
        aggregator,
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    restored_operator.restore(snapshot).unwrap();

    // Both the registered and the periodic window sets must survive the round trip.
    assert_eq!(restored_operator.registered_windows.len(), 1);
    assert_eq!(restored_operator.periodic_timer_windows.len(), 1);
    assert!(restored_operator.registered_windows.contains(&window));
    assert!(restored_operator.periodic_timer_windows.contains(&window));
}
4219
/// A periodic-timer key keeps the 16-byte length, is recognized as periodic, and decodes
/// back to its window. The plain window key is not mistaken for a periodic key.
#[test]
fn test_periodic_timer_key_format() {
    let window_id = WindowId::new(1000, 2000);

    let periodic_key =
        TumblingWindowOperator::<CountAggregator>::periodic_timer_key(&window_id);

    assert_eq!(periodic_key.len(), 16);

    assert!(TumblingWindowOperator::<CountAggregator>::is_periodic_timer_key(&periodic_key));

    let extracted =
        TumblingWindowOperator::<CountAggregator>::window_id_from_periodic_key(&periodic_key);
    assert_eq!(extracted, Some(window_id));

    // Previously written as a mis-encoded `®ular_key`, which does not compile.
    let regular_key = window_id.to_key();
    assert!(!TumblingWindowOperator::<CountAggregator>::is_periodic_timer_key(&regular_key));
}
4244
#[test]
fn test_late_data_config_default() {
    let defaults = LateDataConfig::default();
    assert_eq!(defaults.side_output(), None);
    assert!(defaults.should_drop());
}
4251
#[test]
fn test_late_data_config_drop() {
    let dropping = LateDataConfig::drop();
    assert_eq!(dropping.side_output(), None);
    assert!(dropping.should_drop());
}
4258
#[test]
fn test_late_data_config_with_side_output() {
    let routed = LateDataConfig::with_side_output(String::from("late_events"));
    assert_eq!(routed.side_output(), Some("late_events"));
    assert!(!routed.should_drop());
}
4265
#[test]
fn test_late_data_metrics_initial() {
    let fresh = LateDataMetrics::new();
    let counters = (
        fresh.late_events_total(),
        fresh.late_events_dropped(),
        fresh.late_events_side_output(),
    );
    assert_eq!(counters, (0, 0, 0));
}
4273
#[test]
fn test_late_data_metrics_tracking() {
    let mut tracker = LateDataMetrics::new();

    for _ in 0..2 {
        tracker.record_dropped();
    }
    tracker.record_side_output();

    // Every recorded event counts toward the total.
    assert_eq!(tracker.late_events_total(), 3);
    assert_eq!(tracker.late_events_dropped(), 2);
    assert_eq!(tracker.late_events_side_output(), 1);
}
4286
#[test]
fn test_late_data_metrics_reset() {
    let mut tracker = LateDataMetrics::new();
    tracker.record_dropped();
    tracker.record_side_output();
    assert_eq!(tracker.late_events_total(), 2);

    tracker.reset();

    let counters = (
        tracker.late_events_total(),
        tracker.late_events_dropped(),
        tracker.late_events_side_output(),
    );
    assert_eq!(counters, (0, 0, 0));
}
4302
#[test]
fn test_window_operator_set_late_data_config() {
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(100),
        "test_op".to_string(),
    );

    // The default configuration drops late data.
    assert!(operator.late_data_config().should_drop());

    operator.set_late_data_config(LateDataConfig::with_side_output(String::from("late")));
    let config = operator.late_data_config();
    assert!(!config.should_drop());
    assert_eq!(config.side_output(), Some("late"));
}
4322
#[test]
fn test_late_event_dropped_without_side_output() {
    // Zero allowed lateness and a zero out-of-orderness bound.
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

    // Presumably advances the watermark far enough to close window [0, 1000).
    let advancing = create_test_event(1000, 1);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&advancing, &mut ctx);
    }

    let straggler = create_test_event(500, 2);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&straggler, &mut ctx)
    };

    assert!(!outputs.is_empty());
    assert!(
        outputs.iter().any(|o| matches!(o, Output::LateEvent(_))),
        "Expected LateEvent output"
    );

    let metrics = operator.late_data_metrics();
    assert_eq!(metrics.late_events_dropped(), 1);
    assert_eq!(metrics.late_events_side_output(), 0);
}
4363
#[test]
fn test_late_event_routed_to_side_output() {
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    operator.set_late_data_config(LateDataConfig::with_side_output(String::from("late_events")));

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

    let advancing = create_test_event(1000, 1);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&advancing, &mut ctx);
    }

    let straggler = create_test_event(500, 2);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&straggler, &mut ctx)
    };

    assert!(!outputs.is_empty());
    let routed_to = outputs.iter().find_map(|o| match o {
        Output::SideOutput { name, .. } => Some(name.clone()),
        _ => None,
    });
    assert_eq!(routed_to, Some("late_events".to_string()));

    let metrics = operator.late_data_metrics();
    assert_eq!(metrics.late_events_dropped(), 0);
    assert_eq!(metrics.late_events_side_output(), 1);
}
4411
#[test]
fn test_event_within_lateness_not_late() {
    // 500 ms allowed lateness keeps window [0, 1000) open until its cleanup time at 1500.
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(500),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

    let ahead = create_test_event(1200, 1);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&ahead, &mut ctx);
    }

    let behind = create_test_event(800, 2);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&behind, &mut ctx)
    };

    let flagged_late = outputs
        .iter()
        .any(|o| matches!(o, Output::LateEvent(_) | Output::SideOutput { .. }));
    assert!(
        !flagged_late,
        "Event within lateness period should not be marked as late"
    );
    assert_eq!(operator.late_data_metrics().late_events_total(), 0);
}
4457
#[test]
fn test_reset_late_data_metrics() {
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

    // One on-time event followed by one late event.
    for (ts, value) in [(1000, 1), (500, 2)] {
        let event = create_test_event(ts, value);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx);
    }
    assert_eq!(operator.late_data_metrics().late_events_total(), 1);

    operator.reset_late_data_metrics();
    assert_eq!(operator.late_data_metrics().late_events_total(), 0);
}
4492
#[test]
fn test_emit_strategy_helper_methods() {
    // Each row: label, strategy, then the expected results of
    // emits_intermediate, requires_changelog, is_append_only_compatible,
    // generates_retractions and suppresses_intermediate, in that order.
    let expectations = [
        ("OnWatermark", EmitStrategy::OnWatermark, false, false, false, true, false),
        ("OnUpdate", EmitStrategy::OnUpdate, true, false, false, true, false),
        (
            "Periodic",
            EmitStrategy::Periodic(Duration::from_secs(10)),
            true,
            false,
            false,
            false,
            false,
        ),
        ("OnWindowClose", EmitStrategy::OnWindowClose, false, false, true, false, true),
        ("Changelog", EmitStrategy::Changelog, false, true, false, true, false),
        ("Final", EmitStrategy::Final, false, false, true, false, true),
    ];

    for (label, strategy, intermediate, changelog, append_only, retractions, suppresses) in
        expectations
    {
        assert_eq!(strategy.emits_intermediate(), intermediate, "{label}: emits_intermediate");
        assert_eq!(strategy.requires_changelog(), changelog, "{label}: requires_changelog");
        assert_eq!(
            strategy.is_append_only_compatible(),
            append_only,
            "{label}: is_append_only_compatible"
        );
        assert_eq!(
            strategy.generates_retractions(),
            retractions,
            "{label}: generates_retractions"
        );
        assert_eq!(
            strategy.suppresses_intermediate(),
            suppresses,
            "{label}: suppresses_intermediate"
        );
    }

    // Late-data handling: of the strategies checked here, only Final drops.
    assert!(!EmitStrategy::OnWatermark.drops_late_data());
    assert!(!EmitStrategy::OnWindowClose.drops_late_data());
    assert!(EmitStrategy::Final.drops_late_data());
}
4541
#[test]
fn test_cdc_operation() {
    // Insert: positive weight, Debezium "create".
    assert_eq!(CdcOperation::Insert.weight(), 1);
    assert!(CdcOperation::Insert.is_insert());
    assert!(!CdcOperation::Insert.is_delete());
    assert_eq!(CdcOperation::Insert.debezium_op(), 'c');

    // Delete: negative weight, Debezium "delete".
    assert_eq!(CdcOperation::Delete.weight(), -1);
    assert!(!CdcOperation::Delete.is_insert());
    assert!(CdcOperation::Delete.is_delete());
    assert_eq!(CdcOperation::Delete.debezium_op(), 'd');

    // UpdateBefore retracts the old row and counts as a delete.
    assert_eq!(CdcOperation::UpdateBefore.weight(), -1);
    assert!(!CdcOperation::UpdateBefore.is_insert());
    assert!(CdcOperation::UpdateBefore.is_delete());
    assert_eq!(CdcOperation::UpdateBefore.debezium_op(), 'u');

    // UpdateAfter adds the new row and counts as an insert.
    assert_eq!(CdcOperation::UpdateAfter.weight(), 1);
    assert!(CdcOperation::UpdateAfter.is_insert());
    assert!(!CdcOperation::UpdateAfter.is_delete());
    assert_eq!(CdcOperation::UpdateAfter.debezium_op(), 'u');
}
4564
#[test]
fn test_changelog_record_insert() {
    // The event is moved into the record. No clone is needed because the
    // test only inspects it afterwards through `record.event`.
    let event = create_test_event(1000, 42);
    let record = ChangelogRecord::insert(event, 2000);

    assert_eq!(record.operation, CdcOperation::Insert);
    assert_eq!(record.weight, 1);
    assert_eq!(record.emit_timestamp, 2000);
    assert_eq!(record.event.timestamp, 1000);
    assert!(record.is_insert());
    assert!(!record.is_delete());
}
4577
#[test]
fn test_changelog_record_delete() {
    // The event is moved into the record (the previous clone was redundant).
    let event = create_test_event(1000, 42);
    let record = ChangelogRecord::delete(event, 2000);

    assert_eq!(record.operation, CdcOperation::Delete);
    assert_eq!(record.weight, -1);
    assert_eq!(record.emit_timestamp, 2000);
    // Mirror the insert test: the wrapped event keeps its own timestamp.
    assert_eq!(record.event.timestamp, 1000);
    assert!(record.is_delete());
    assert!(!record.is_insert());
}
4589
#[test]
fn test_changelog_record_update() {
    // An update yields a (before, after) pair built from the old and new events.
    let (before, after) = ChangelogRecord::update(
        create_test_event(1000, 10),
        create_test_event(1000, 20),
        2000,
    );

    // Retraction half.
    assert_eq!(before.operation, CdcOperation::UpdateBefore);
    assert_eq!(before.weight, -1);
    assert!(before.is_delete());

    // Insertion half.
    assert_eq!(after.operation, CdcOperation::UpdateAfter);
    assert_eq!(after.weight, 1);
    assert!(after.is_insert());
}
4604
#[test]
fn test_emit_strategy_on_window_close() {
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::OnWindowClose);

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Both timestamps fall in the first 1000 ms window, which stays open,
    // so neither call may produce a regular event output.
    for (timestamp, value) in [(100, 1), (200, 2)] {
        let event = create_test_event(timestamp, value);
        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event, &mut ctx)
        };
        let emitted_event = outputs.iter().any(|o| matches!(o, Output::Event(_)));
        assert!(
            !emitted_event,
            "OnWindowClose should not emit intermediate results"
        );
    }
}
4655
#[test]
fn test_emit_strategy_final_drops_late_data() {
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::Final);

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

    // An on-time event at 1500 moves the watermark ahead of the event at 500.
    let on_time = create_test_event(1500, 1);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&on_time, &mut ctx);
    }

    let late = create_test_event(500, 2);
    let late_outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&late, &mut ctx)
    };

    // Nothing is emitted (no side output), but the drop is still counted.
    assert!(
        late_outputs.is_empty(),
        "EMIT FINAL should silently drop late data"
    );
    assert_eq!(
        operator.late_data_metrics().late_events_dropped(),
        1,
        "Late event should be recorded as dropped"
    );
}
4699
#[test]
fn test_emit_strategy_changelog_emits_records() {
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::Changelog);

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let event = create_test_event(100, 1);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx)
    };

    // Keep only the changelog payloads.
    let records: Vec<_> = outputs
        .iter()
        .filter_map(|o| match o {
            Output::Changelog(record) => Some(record),
            _ => None,
        })
        .collect();

    assert_eq!(
        records.len(),
        1,
        "Changelog strategy should emit changelog record on update"
    );

    // The first update of a window is reported as an insertion.
    let record = records[0];
    assert_eq!(record.operation, CdcOperation::Insert);
    assert_eq!(record.weight, 1);
}
4744
#[test]
fn test_emit_strategy_changelog_on_timer() {
    let mut operator = TumblingWindowOperator::with_id(
        TumblingWindowAssigner::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::Changelog);

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let event = create_test_event(100, 1);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx);
    }

    // Fire the timer for window [0, 1000) at its end timestamp.
    let timer = Timer {
        key: WindowId::new(0, 1000).to_key(),
        timestamp: 1000,
    };
    let fired = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.on_timer(timer, &mut ctx)
    };

    let records: Vec<_> = fired
        .iter()
        .filter_map(|o| match o {
            Output::Changelog(record) => Some(record),
            _ => None,
        })
        .collect();

    assert_eq!(
        records.len(),
        1,
        "Changelog strategy should emit changelog record on timer"
    );
    assert_eq!(records[0].operation, CdcOperation::Insert);
}
4799
#[test]
fn test_first_value_single_event() {
    let mut acc = FirstValueAccumulator::default();

    // A fresh accumulator is empty and has no result.
    assert!(acc.is_empty());
    assert!(acc.result().is_none());

    // One (value, timestamp) sample becomes the result.
    acc.add((100, 1000));
    assert!(!acc.is_empty());
    assert_eq!(acc.result(), Some(100));
}
4812
#[test]
fn test_first_value_multiple_events() {
    let mut acc = FirstValueAccumulator::default();
    // Arrival order differs from timestamp order; the earliest timestamp (500) wins.
    for (value, timestamp) in [(100, 1000), (200, 500), (300, 1500)] {
        acc.add((value, timestamp));
    }
    assert_eq!(acc.result(), Some(200));
}
4823
#[test]
fn test_first_value_same_timestamp() {
    let mut acc = FirstValueAccumulator::default();
    // On a timestamp tie the value added first is kept.
    for (value, timestamp) in [(100, 1000), (200, 1000)] {
        acc.add((value, timestamp));
    }
    assert_eq!(acc.result(), Some(100));
}
4832
#[test]
fn test_first_value_merge() {
    let mut target = FirstValueAccumulator::default();
    target.add((100, 1000));

    let mut other = FirstValueAccumulator::default();
    other.add((200, 500));

    // The merged-in side has the earlier timestamp, so its value is adopted.
    target.merge(&other);
    assert_eq!(target.result(), Some(200));
}
4844
#[test]
fn test_first_value_merge_empty() {
    let mut populated = FirstValueAccumulator::default();
    populated.add((100, 1000));

    // Merging an empty accumulator leaves the existing value untouched.
    let empty = FirstValueAccumulator::default();
    populated.merge(&empty);
    assert_eq!(populated.result(), Some(100));

    // Merging a populated accumulator into an empty one adopts its value.
    let mut receiver = FirstValueAccumulator::default();
    receiver.merge(&populated);
    assert_eq!(receiver.result(), Some(100));
}
4859
#[test]
fn test_last_value_single_event() {
    let mut acc = LastValueAccumulator::default();

    // Nothing has been added yet.
    assert!(acc.is_empty());
    assert!(acc.result().is_none());

    // A single sample is both the first and the last.
    acc.add((100, 1000));
    assert!(!acc.is_empty());
    assert_eq!(acc.result(), Some(100));
}
4870
#[test]
fn test_last_value_multiple_events() {
    let mut acc = LastValueAccumulator::default();
    // The sample with the latest timestamp (1500) wins regardless of arrival order.
    for (value, timestamp) in [(100, 1000), (200, 500), (300, 1500)] {
        acc.add((value, timestamp));
    }
    assert_eq!(acc.result(), Some(300));
}
4880
#[test]
fn test_last_value_same_timestamp() {
    let mut acc = LastValueAccumulator::default();
    // On a timestamp tie the most recently added value is kept.
    for (value, timestamp) in [(100, 1000), (200, 1000)] {
        acc.add((value, timestamp));
    }
    assert_eq!(acc.result(), Some(200));
}
4889
#[test]
fn test_last_value_merge() {
    let mut target = LastValueAccumulator::default();
    target.add((100, 1000));

    let mut other = LastValueAccumulator::default();
    other.add((200, 1500));

    // The merged-in side is newer, so its value replaces the current one.
    target.merge(&other);
    assert_eq!(target.result(), Some(200));
}
4901
#[test]
fn test_last_value_merge_same_timestamp() {
    let mut target = LastValueAccumulator::default();
    target.add((100, 1000));

    let mut other = LastValueAccumulator::default();
    other.add((200, 1000));

    // With equal timestamps, the merged-in value takes precedence.
    target.merge(&other);
    assert_eq!(target.result(), Some(200));
}
4913
#[test]
fn test_first_value_f64_basic() {
    let mut acc = FirstValueF64Accumulator::default();
    for sample in [(100.5, 1000), (200.5, 500), (300.5, 1500)] {
        acc.add(sample);
    }
    // Earliest timestamp (500) carries 200.5.
    let first = acc.result().unwrap();
    assert!((first - 200.5).abs() < f64::EPSILON);
}
4924
#[test]
fn test_last_value_f64_basic() {
    let mut acc = LastValueF64Accumulator::default();
    for sample in [(100.5, 1000), (200.5, 500), (300.5, 1500)] {
        acc.add(sample);
    }
    // Latest timestamp (1500) carries 300.5.
    let last = acc.result().unwrap();
    assert!((last - 300.5).abs() < f64::EPSILON);
}
4935
#[test]
fn test_first_value_aggregator_extract() {
    // Value is read from column 0, timestamp from column 1.
    let aggregator = FirstValueAggregator::new(0, 1);

    let price_field = Field::new("price", DataType::Int64, false);
    let ts_field = Field::new("ts", DataType::Int64, false);
    let schema = Arc::new(Schema::new(vec![price_field, ts_field]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(vec![100])),
            Arc::new(Int64Array::from(vec![1000])),
        ],
    )
    .unwrap();

    let extracted = aggregator.extract(&Event::new(1000, batch));
    assert_eq!(extracted, Some((100, 1000)));
}
4958
#[test]
fn test_last_value_aggregator_extract() {
    // Value is read from column 0, timestamp from column 1.
    let aggregator = LastValueAggregator::new(0, 1);

    let price_field = Field::new("price", DataType::Int64, false);
    let ts_field = Field::new("ts", DataType::Int64, false);
    let schema = Arc::new(Schema::new(vec![price_field, ts_field]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(vec![100])),
            Arc::new(Int64Array::from(vec![1000])),
        ],
    )
    .unwrap();

    let extracted = aggregator.extract(&Event::new(1000, batch));
    assert_eq!(extracted, Some((100, 1000)));
}
4980
#[test]
fn test_first_value_aggregator_invalid_column() {
    // Columns 5 and 6 do not exist in a single-column batch.
    let aggregator = FirstValueAggregator::new(5, 6);

    let schema = Arc::new(Schema::new(vec![Field::new("price", DataType::Int64, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![100]))]).unwrap();

    // Out-of-range columns yield no extracted pair instead of panicking.
    let extracted = aggregator.extract(&Event::new(1000, batch));
    assert!(extracted.is_none());
}
4996
#[test]
fn test_ohlc_simulation() {
    let mut first = FirstValueAccumulator::default();
    let mut max = MaxAccumulator::default();
    let mut min = MinAccumulator::default();
    let mut last = LastValueAccumulator::default();
    let mut sum = SumAccumulator::default();

    // Four trades in timestamp order: (price, timestamp).
    for (price, ts) in [(100, 100), (105, 200), (98, 300), (102, 400)] {
        first.add((price, ts));
        last.add((price, ts));
    }
    for price in [100, 105, 98, 102] {
        max.add(price);
        min.add(price);
    }
    // Per-trade volumes.
    for volume in [10, 5, 15, 8] {
        sum.add(volume);
    }

    assert_eq!(first.result(), Some(100), "Open");
    assert_eq!(max.result(), Some(105), "High");
    assert_eq!(min.result(), Some(98), "Low");
    assert_eq!(last.result(), Some(102), "Close");
    assert_eq!(sum.result(), 38, "Volume");
}
5050
#[test]
fn test_first_value_checkpoint_restore() {
    let mut acc = FirstValueAccumulator::default();
    acc.add((100, 1000));
    acc.add((200, 500));

    // Deserialize directly from the `AlignedVec` returned by `to_bytes`.
    // The previous `.to_vec()` copy into a plain `Vec<u8>` carried no
    // alignment guarantee, while rkyv's validated `from_bytes` expects the
    // buffer to be aligned for the archived type. It was also an extra copy.
    let bytes = rkyv::to_bytes::<RkyvError>(&acc).expect("serialize");
    let restored =
        rkyv::from_bytes::<FirstValueAccumulator, RkyvError>(&bytes).expect("deserialize");

    // Both the selected value and its timestamp survive the round trip.
    assert_eq!(restored.result(), Some(200));
    assert_eq!(restored.timestamp, Some(500));
}
5069
#[test]
fn test_last_value_checkpoint_restore() {
    let mut acc = LastValueAccumulator::default();
    acc.add((100, 1000));
    acc.add((300, 1500));

    // Deserialize directly from the `AlignedVec` returned by `to_bytes`.
    // Copying into a `Vec<u8>` would drop the alignment guarantee that
    // rkyv's validated `from_bytes` relies on, and it adds a needless copy.
    let bytes = rkyv::to_bytes::<RkyvError>(&acc).expect("serialize");
    let restored =
        rkyv::from_bytes::<LastValueAccumulator, RkyvError>(&bytes).expect("deserialize");

    // Both the selected value and its timestamp survive the round trip.
    assert_eq!(restored.result(), Some(300));
    assert_eq!(restored.timestamp, Some(1500));
}
5088
#[test]
fn test_scalar_result_int64_conversions() {
    let value = ScalarResult::Int64(42);

    assert!(!value.is_null());
    assert_eq!(value.data_type(), DataType::Int64);

    // Both lossy conversions are exact for small integers.
    assert_eq!(value.to_i64_lossy(), 42);
    let as_float = value.to_f64_lossy();
    assert!((as_float - 42.0).abs() < f64::EPSILON);
}
5103
#[test]
fn test_scalar_result_float64_conversions() {
    let value = ScalarResult::Float64(3.125);

    assert!(!value.is_null());
    assert_eq!(value.data_type(), DataType::Float64);

    // Integer conversion drops the fractional part; float conversion is exact.
    assert_eq!(value.to_i64_lossy(), 3);
    let as_float = value.to_f64_lossy();
    assert!((as_float - 3.125).abs() < f64::EPSILON);
}
5112
#[test]
fn test_scalar_result_uint64_conversions() {
    let value = ScalarResult::UInt64(100);

    assert_eq!(value.data_type(), DataType::UInt64);
    assert_eq!(value.to_i64_lossy(), 100);
    let as_float = value.to_f64_lossy();
    assert!((as_float - 100.0).abs() < f64::EPSILON);
}
5120
#[test]
fn test_scalar_result_null_variants() {
    // Plain Null and empty optionals all report null.
    for null_like in [
        ScalarResult::Null,
        ScalarResult::OptionalInt64(None),
        ScalarResult::OptionalFloat64(None),
    ] {
        assert!(null_like.is_null());
    }

    // Optionals carrying a value are not null.
    assert!(!ScalarResult::OptionalInt64(Some(1)).is_null());
    assert!(!ScalarResult::OptionalFloat64(Some(1.0)).is_null());
}
5129
#[test]
fn test_scalar_result_optional_conversions() {
    let present = ScalarResult::OptionalFloat64(Some(2.5));
    assert_eq!(present.to_i64_lossy(), 2);
    assert!((present.to_f64_lossy() - 2.5).abs() < f64::EPSILON);

    // An absent value converts to zero in both representations.
    let absent = ScalarResult::OptionalInt64(None);
    assert_eq!(absent.to_i64_lossy(), 0);
    assert!(absent.to_f64_lossy().abs() < f64::EPSILON);
}
5140
5141 fn make_f64_event(values: &[f64]) -> Event {
5144 let schema = Arc::new(Schema::new(vec![Field::new(
5145 "value",
5146 DataType::Float64,
5147 false,
5148 )]));
5149 let batch = RecordBatch::try_new(
5150 schema,
5151 vec![Arc::new(arrow_array::Float64Array::from(values.to_vec()))],
5152 )
5153 .unwrap();
5154 Event::new(1000, batch)
5155 }
5156
#[test]
fn test_sum_f64_accumulator_basic() {
    let mut acc = SumF64IndexedAccumulator::new(0);
    acc.add_event(&make_f64_event(&[1.5, 2.5, 3.0]));
    assert!(!acc.is_empty());

    let result = acc.result_scalar();
    let ScalarResult::Float64(total) = &result else {
        panic!("Expected Float64, got {result:?}");
    };
    assert!((total - 7.0).abs() < f64::EPSILON);
}
5168
#[test]
fn test_sum_f64_accumulator_empty() {
    // Without any input the sum is reported as Null, not 0.0.
    let untouched = SumF64IndexedAccumulator::new(0);
    assert!(untouched.is_empty());
    let result = untouched.result_scalar();
    assert!(matches!(result, ScalarResult::Null));
}
5175
#[test]
fn test_min_f64_accumulator_basic() {
    let mut acc = MinF64IndexedAccumulator::new(0);
    acc.add_event(&make_f64_event(&[3.0, 1.5, 2.5]));

    let result = acc.result_scalar();
    let ScalarResult::OptionalFloat64(Some(lowest)) = &result else {
        panic!("Expected OptionalFloat64(Some), got {result:?}");
    };
    assert!((lowest - 1.5).abs() < f64::EPSILON);
}
5188
#[test]
fn test_max_f64_accumulator_basic() {
    let mut acc = MaxF64IndexedAccumulator::new(0);
    acc.add_event(&make_f64_event(&[3.0, 1.5, 2.5]));

    let result = acc.result_scalar();
    let ScalarResult::OptionalFloat64(Some(highest)) = &result else {
        panic!("Expected OptionalFloat64(Some), got {result:?}");
    };
    assert!((highest - 3.0).abs() < f64::EPSILON);
}
5201
#[test]
fn test_avg_f64_accumulator_basic() {
    let mut acc = AvgF64IndexedAccumulator::new(0);
    acc.add_event(&make_f64_event(&[1.0, 2.0, 3.0]));

    // (1 + 2 + 3) / 3 == 2
    let result = acc.result_scalar();
    let ScalarResult::Float64(mean) = &result else {
        panic!("Expected Float64, got {result:?}");
    };
    assert!((mean - 2.0).abs() < f64::EPSILON);
}
5212
#[test]
fn test_sum_f64_merge() {
    let mut left = SumF64IndexedAccumulator::new(0);
    let mut right = SumF64IndexedAccumulator::new(0);
    left.add_event(&make_f64_event(&[1.0, 2.0]));
    right.add_event(&make_f64_event(&[3.0, 4.0]));

    // Partial sums 3.0 and 7.0 combine to 10.0.
    left.merge_dyn(&right);
    let result = left.result_scalar();
    let ScalarResult::Float64(total) = &result else {
        panic!("Expected Float64, got {result:?}");
    };
    assert!((total - 10.0).abs() < f64::EPSILON);
}
5225
#[test]
fn test_min_f64_merge() {
    let mut left = MinF64IndexedAccumulator::new(0);
    let mut right = MinF64IndexedAccumulator::new(0);
    left.add_event(&make_f64_event(&[5.0]));
    right.add_event(&make_f64_event(&[2.0]));

    // The smaller minimum from the merged side wins.
    left.merge_dyn(&right);
    let result = left.result_scalar();
    let ScalarResult::OptionalFloat64(Some(lowest)) = &result else {
        panic!("Expected OptionalFloat64(Some), got {result:?}");
    };
    assert!((lowest - 2.0).abs() < f64::EPSILON);
}
5240
#[test]
fn test_f64_accumulator_serialization() {
    let mut acc = SumF64IndexedAccumulator::new(0);
    acc.add_event(&make_f64_event(&[1.5, 2.5]));

    // NOTE(review): 16 bytes presumably means two 8-byte fields; confirm
    // against `SumF64IndexedAccumulator::serialize`.
    let encoded = acc.serialize();
    assert_eq!(encoded.len(), 16);
}
5248
#[test]
fn test_count_dyn_accumulator() {
    // Counting is per row: a three-row batch counts as 3.
    let mut counter = CountDynAccumulator::default();
    counter.add_event(&make_f64_event(&[1.0, 2.0, 3.0]));
    assert_eq!(counter.result_scalar(), ScalarResult::Int64(3));
}
5258
#[test]
fn test_count_dyn_merge() {
    let mut left = CountDynAccumulator::default();
    let mut right = CountDynAccumulator::default();
    left.add_event(&make_f64_event(&[1.0, 2.0]));
    right.add_event(&make_f64_event(&[3.0]));

    // 2 + 1 rows.
    left.merge_dyn(&right);
    assert_eq!(left.result_scalar(), ScalarResult::Int64(3));
}
5268
5269 fn make_ts_f64_event(values: &[(f64, i64)]) -> Event {
5272 let schema = Arc::new(Schema::new(vec![
5273 Field::new("value", DataType::Float64, false),
5274 Field::new("timestamp", DataType::Int64, false),
5275 ]));
5276 let vals: Vec<f64> = values.iter().map(|(v, _)| *v).collect();
5277 let tss: Vec<i64> = values.iter().map(|(_, t)| *t).collect();
5278 let batch = RecordBatch::try_new(
5279 schema,
5280 vec![
5281 Arc::new(arrow_array::Float64Array::from(vals)),
5282 Arc::new(Int64Array::from(tss)),
5283 ],
5284 )
5285 .unwrap();
5286 Event::new(1000, batch)
5287 }
5288
#[test]
fn test_first_value_f64_dyn() {
    // Value column 0, timestamp column 1; the row with ts=100 is earliest.
    let mut acc = FirstValueF64DynAccumulator::new(0, 1);
    acc.add_event(&make_ts_f64_event(&[(10.0, 200), (20.0, 100), (30.0, 300)]));

    let result = acc.result_scalar();
    let ScalarResult::OptionalFloat64(Some(first)) = &result else {
        panic!("Expected OptionalFloat64(Some), got {result:?}");
    };
    assert!((first - 20.0).abs() < f64::EPSILON);
}
5301
#[test]
fn test_last_value_f64_dyn() {
    // Value column 0, timestamp column 1; the row with ts=300 is latest.
    let mut acc = LastValueF64DynAccumulator::new(0, 1);
    acc.add_event(&make_ts_f64_event(&[(10.0, 200), (20.0, 100), (30.0, 300)]));

    let result = acc.result_scalar();
    let ScalarResult::OptionalFloat64(Some(last)) = &result else {
        panic!("Expected OptionalFloat64(Some), got {result:?}");
    };
    assert!((last - 30.0).abs() < f64::EPSILON);
}
5314
#[test]
fn test_first_value_f64_dyn_merge() {
    let mut left = FirstValueF64DynAccumulator::new(0, 1);
    let mut right = FirstValueF64DynAccumulator::new(0, 1);
    left.add_event(&make_ts_f64_event(&[(10.0, 200)]));
    right.add_event(&make_ts_f64_event(&[(20.0, 50)]));

    // The merged side has the earlier timestamp (50), so 20.0 is kept.
    left.merge_dyn(&right);
    let result = left.result_scalar();
    let ScalarResult::OptionalFloat64(Some(first)) = &result else {
        panic!("Expected OptionalFloat64(Some), got {result:?}");
    };
    assert!((first - 20.0).abs() < f64::EPSILON);
}
5330
#[test]
fn test_composite_aggregator_creation() {
    // One factory per aggregate column.
    let factories = vec![
        Box::new(CountDynFactory::new("cnt")) as _,
        Box::new(SumF64Factory::new(0, "total")) as _,
    ];
    let composite = CompositeAggregator::new(factories);
    assert_eq!(composite.num_aggregates(), 2);
}
5341
#[test]
fn test_composite_aggregator_schema() {
    let agg = CompositeAggregator::new(vec![
        Box::new(CountDynFactory::new("cnt")),
        Box::new(MinF64Factory::new(0, "low")),
        Box::new(MaxF64Factory::new(0, "high")),
    ]);
    let schema = agg.output_schema();

    // Window bounds come first, then one column per aggregate in declaration order.
    let names: Vec<&str> = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect();
    assert_eq!(names, ["window_start", "window_end", "cnt", "low", "high"]);
}
5357
#[test]
fn test_composite_accumulator_lifecycle() {
    let agg = CompositeAggregator::new(vec![
        Box::new(CountDynFactory::new("cnt")),
        Box::new(SumF64Factory::new(0, "total")),
    ]);

    // A freshly created accumulator has one child per aggregate and no data.
    let mut acc = agg.create_accumulator();
    assert!(acc.is_empty());
    assert_eq!(acc.num_accumulators(), 2);

    acc.add_event(&make_f64_event(&[1.0, 2.0, 3.0]));
    assert!(!acc.is_empty());

    // Results are reported in aggregate order: count, then sum.
    let results = acc.results();
    assert_eq!(results.len(), 2);
    assert_eq!(results[0], ScalarResult::Int64(3));
    let ScalarResult::Float64(total) = &results[1] else {
        panic!("Expected Float64, got {:?}", &results[1]);
    };
    assert!((total - 6.0).abs() < f64::EPSILON);
}
5380
#[test]
fn test_composite_accumulator_merge() {
    let agg = CompositeAggregator::new(vec![
        Box::new(CountDynFactory::new("cnt")),
        Box::new(SumF64Factory::new(0, "total")),
    ]);

    // Two independent partial accumulators over disjoint inputs. They are
    // bound directly instead of going through an intermediate holder variable.
    let mut acc1 = agg.create_accumulator();
    let mut acc2 = agg.create_accumulator();
    acc1.add_event(&make_f64_event(&[1.0, 2.0]));
    acc2.add_event(&make_f64_event(&[3.0, 4.0]));

    acc1.merge(&acc2);
    let results = acc1.results();

    // Check the arity first so a shape regression fails with a clear message
    // instead of an index-out-of-bounds panic.
    assert_eq!(results.len(), 2);
    assert_eq!(results[0], ScalarResult::Int64(4));
    match &results[1] {
        ScalarResult::Float64(v) => assert!((v - 10.0).abs() < f64::EPSILON),
        other => panic!("Expected Float64, got {other:?}"),
    }
}
5404
#[test]
fn test_composite_accumulator_serialization() {
    let agg = CompositeAggregator::new(vec![
        Box::new(CountDynFactory::new("cnt")),
        Box::new(SumF64Factory::new(0, "total")),
    ]);
    let mut acc = agg.create_accumulator();
    acc.add_event(&make_f64_event(&[1.0, 2.0]));

    let bytes = acc.serialize();
    assert!(bytes.len() > 4);

    // The first four bytes decode (little-endian) to 2, presumably the
    // number of child accumulators.
    let header: [u8; 4] = bytes[..4].try_into().unwrap();
    assert_eq!(u32::from_le_bytes(header), 2);
}
5421
#[test]
fn test_composite_accumulator_record_batch() {
    let agg = CompositeAggregator::new(vec![
        Box::new(CountDynFactory::new("cnt")),
        Box::new(MinF64Factory::new(0, "low")),
        Box::new(MaxF64Factory::new(0, "high")),
    ]);
    let schema = agg.output_schema();
    let mut acc = agg.create_accumulator();
    acc.add_event(&make_f64_event(&[3.0, 1.0, 5.0, 2.0]));

    let window_id = WindowId::new(0, 60000);
    let batch = acc.to_record_batch(&window_id, &schema).unwrap();

    // One row: window bounds plus three aggregate columns.
    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.num_columns(), 5);

    // Row-0 readers for the two column types used in this batch.
    let int64_cell = |col: usize| {
        batch
            .column(col)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
            .value(0)
    };
    let f64_cell = |col: usize| {
        batch
            .column(col)
            .as_any()
            .downcast_ref::<arrow_array::Float64Array>()
            .unwrap()
            .value(0)
    };

    assert_eq!(int64_cell(0), 0);
    assert_eq!(int64_cell(1), 60000);
    assert_eq!(int64_cell(2), 4);
    assert!((f64_cell(3) - 1.0).abs() < f64::EPSILON);
    assert!((f64_cell(4) - 5.0).abs() < f64::EPSILON);
}
5479
#[test]
fn test_ohlc_composite_integration() {
    // Price in column 0, trade timestamp in column 1.
    let agg = CompositeAggregator::new(vec![
        Box::new(FirstValueF64DynFactory::new(0, 1, "open")),
        Box::new(MaxF64Factory::new(0, "high")),
        Box::new(MinF64Factory::new(0, "low")),
        Box::new(LastValueF64DynFactory::new(0, 1, "close")),
        Box::new(CountDynFactory::new("trade_count")),
    ]);
    let mut acc = agg.create_accumulator();

    let schema = Arc::new(Schema::new(vec![
        Field::new("price", DataType::Float64, false),
        Field::new("ts", DataType::Int64, false),
    ]));

    // Feed one single-row event per trade, in timestamp order.
    for (price, ts) in [(100.0, 1000), (105.0, 2000), (98.0, 3000), (102.0, 4000)] {
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(arrow_array::Float64Array::from(vec![price])),
                Arc::new(Int64Array::from(vec![ts])),
            ],
        )
        .unwrap();
        acc.add_event(&Event::new(ts, batch));
    }

    let results = acc.results();
    let expected = [
        (0, "Open", 100.0),
        (1, "High", 105.0),
        (2, "Low", 98.0),
        (3, "Close", 102.0),
    ];
    for (idx, label, want) in expected {
        match &results[idx] {
            ScalarResult::OptionalFloat64(Some(v)) => {
                assert!((v - want).abs() < f64::EPSILON, "{label} should be {want:.1}");
            }
            other => panic!("Expected {label}={want:.1}, got {other:?}"),
        }
    }
    assert_eq!(results[4], ScalarResult::Int64(4));
}
5571
#[test]
fn test_composite_aggregator_clone() {
    let original = CompositeAggregator::new(vec![
        Box::new(CountDynFactory::new("cnt")),
        Box::new(SumF64Factory::new(0, "total")),
    ]);
    let copy = original.clone();
    assert_eq!(copy.num_aggregates(), 2);

    // Accumulators from the original and the clone agree on the same input.
    let mut from_original = original.create_accumulator();
    let mut from_copy = copy.create_accumulator();
    let event = make_f64_event(&[5.0]);
    from_original.add_event(&event);
    from_copy.add_event(&event);
    assert_eq!(from_original.results(), from_copy.results());
}
5589
#[test]
fn test_composite_accumulator_clone() {
    let agg = CompositeAggregator::new(vec![Box::new(CountDynFactory::new("cnt"))]);
    let mut populated = agg.create_accumulator();
    populated.add_event(&make_f64_event(&[1.0, 2.0]));

    // A cloned accumulator carries the same accumulated state.
    let duplicate = populated.clone();
    assert_eq!(populated.results(), duplicate.results());
}
5599
#[test]
fn test_backward_compat_existing_aggregators_unchanged() {
    // Count: each unit input adds one.
    let count_agg = CountAggregator::new();
    let mut count_acc = count_agg.create_accumulator();
    for _ in 0..2 {
        count_acc.add(());
    }
    assert_eq!(count_acc.result(), 2);

    // Sum over column 0.
    let sum_agg = SumAggregator::new(0);
    let mut sum_acc = sum_agg.create_accumulator();
    for value in [10, 20] {
        sum_acc.add(value);
    }
    assert_eq!(sum_acc.result(), 30);
}
5615
#[test]
fn test_backward_compat_result_to_i64() {
    // Plain integer results convert unchanged.
    assert_eq!(42i64.to_i64(), 42);
    assert_eq!(42u64.to_i64(), 42);

    // Optional results convert their payload, or 0 when absent.
    assert_eq!(Some(42i64).to_i64(), 42);
    assert_eq!(None::<i64>.to_i64(), 0);
}
5624
#[test]
fn test_backward_compat_window_schema_unchanged() {
    let schema = create_window_output_schema();
    let names: Vec<&str> = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect();
    assert_eq!(names, ["window_start", "window_end", "result"]);
}
5633
#[test]
fn test_f64_accumulator_out_of_range_column() {
    // Column 99 does not exist in a one-column batch, so nothing is accumulated.
    let mut acc = SumF64IndexedAccumulator::new(99);
    let event = make_f64_event(&[1.0, 2.0]);
    acc.add_event(&event);
    assert!(acc.is_empty());
}
5641
#[test]
fn test_f64_factory_types() {
    // Each factory reports a stable type tag; the sum factory also names its output field.
    let sum = SumF64Factory::new(0, "total");
    assert_eq!(sum.type_tag(), "sum_f64");
    assert_eq!(sum.result_field().name(), "total");

    assert_eq!(MinF64Factory::new(1, "low").type_tag(), "min_f64");
    assert_eq!(MaxF64Factory::new(1, "high").type_tag(), "max_f64");
    assert_eq!(AvgF64Factory::new(0, "average").type_tag(), "avg_f64");
}
5657}