1use super::window::{CdcOperation, WindowId};
47use fxhash::FxHashMap;
48use serde::{Deserialize, Serialize};
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59#[repr(C)]
60pub struct ChangelogRef {
61 pub batch_offset: u32,
63 pub row_index: u32,
65 pub weight: i16,
67 operation_raw: u8,
69}
70
71impl ChangelogRef {
72 #[inline]
74 #[must_use]
75 pub fn new(batch_offset: u32, row_index: u32, weight: i16, operation: CdcOperation) -> Self {
76 Self {
77 batch_offset,
78 row_index,
79 weight,
80 operation_raw: operation.to_u8(),
81 }
82 }
83
84 #[inline]
86 #[must_use]
87 pub fn insert(batch_offset: u32, row_index: u32) -> Self {
88 Self::new(batch_offset, row_index, 1, CdcOperation::Insert)
89 }
90
91 #[inline]
93 #[must_use]
94 pub fn delete(batch_offset: u32, row_index: u32) -> Self {
95 Self::new(batch_offset, row_index, -1, CdcOperation::Delete)
96 }
97
98 #[inline]
100 #[must_use]
101 pub fn update_before(batch_offset: u32, row_index: u32) -> Self {
102 Self::new(batch_offset, row_index, -1, CdcOperation::UpdateBefore)
103 }
104
105 #[inline]
107 #[must_use]
108 pub fn update_after(batch_offset: u32, row_index: u32) -> Self {
109 Self::new(batch_offset, row_index, 1, CdcOperation::UpdateAfter)
110 }
111
112 #[inline]
114 #[must_use]
115 pub fn operation(&self) -> CdcOperation {
116 CdcOperation::from_u8(self.operation_raw)
117 }
118
119 #[inline]
121 #[must_use]
122 pub fn is_insert(&self) -> bool {
123 self.weight > 0
124 }
125
126 #[inline]
128 #[must_use]
129 pub fn is_delete(&self) -> bool {
130 self.weight < 0
131 }
132}
133
134pub struct ChangelogBuffer {
157 refs: Vec<ChangelogRef>,
159 len: usize,
161 capacity: usize,
163}
164
165impl ChangelogBuffer {
166 #[must_use]
168 pub fn with_capacity(capacity: usize) -> Self {
169 let mut refs = Vec::with_capacity(capacity);
170 refs.resize(
172 capacity,
173 ChangelogRef {
174 batch_offset: 0,
175 row_index: 0,
176 weight: 0,
177 operation_raw: 0,
178 },
179 );
180 Self {
181 refs,
182 len: 0,
183 capacity,
184 }
185 }
186
187 #[inline]
192 pub fn push(&mut self, changelog_ref: ChangelogRef) -> bool {
193 if self.len < self.capacity {
194 self.refs[self.len] = changelog_ref;
195 self.len += 1;
196 true
197 } else {
198 false }
200 }
201
202 #[inline]
206 pub fn push_retraction(
207 &mut self,
208 batch_offset: u32,
209 old_row_index: u32,
210 new_row_index: u32,
211 ) -> bool {
212 if self.len + 2 <= self.capacity {
213 self.refs[self.len] = ChangelogRef::update_before(batch_offset, old_row_index);
214 self.refs[self.len + 1] = ChangelogRef::update_after(batch_offset, new_row_index);
215 self.len += 2;
216 true
217 } else {
218 false
219 }
220 }
221
222 pub fn drain(&mut self) -> impl Iterator<Item = ChangelogRef> + '_ {
226 let len = self.len;
227 self.len = 0;
228 self.refs[..len].iter().copied()
229 }
230
231 #[inline]
233 #[must_use]
234 pub fn len(&self) -> usize {
235 self.len
236 }
237
238 #[inline]
240 #[must_use]
241 pub fn is_empty(&self) -> bool {
242 self.len == 0
243 }
244
245 #[inline]
247 #[must_use]
248 pub fn is_full(&self) -> bool {
249 self.len >= self.capacity
250 }
251
252 #[inline]
254 #[must_use]
255 pub fn capacity(&self) -> usize {
256 self.capacity
257 }
258
259 #[inline]
261 #[must_use]
262 pub fn available(&self) -> usize {
263 self.capacity.saturating_sub(self.len)
264 }
265
266 #[inline]
268 pub fn clear(&mut self) {
269 self.len = 0;
270 }
271
272 #[must_use]
274 pub fn as_slice(&self) -> &[ChangelogRef] {
275 &self.refs[..self.len]
276 }
277}
278
279impl Default for ChangelogBuffer {
280 fn default() -> Self {
281 Self::with_capacity(1024)
282 }
283}
284
285pub trait RetractableAccumulator: Default + Clone + Send {
300 type Input;
302 type Output;
304
305 fn add(&mut self, value: Self::Input);
307
308 fn retract(&mut self, value: &Self::Input);
315
316 fn merge(&mut self, other: &Self);
318
319 fn result(&self) -> Self::Output;
321
322 fn is_empty(&self) -> bool;
324
325 fn supports_efficient_retraction(&self) -> bool {
331 true
332 }
333
334 fn reset(&mut self);
336}
337
338#[derive(Debug, Clone, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
343pub struct RetractableCountAccumulator {
344 count: i64,
346}
347
348impl RetractableCountAccumulator {
349 #[must_use]
351 pub fn new() -> Self {
352 Self::default()
353 }
354
355 #[must_use]
357 pub fn count(&self) -> i64 {
358 self.count
359 }
360}
361
362impl RetractableAccumulator for RetractableCountAccumulator {
363 type Input = ();
364 type Output = i64;
365
366 #[inline]
367 fn add(&mut self, _value: ()) {
368 self.count += 1;
369 }
370
371 #[inline]
372 fn retract(&mut self, _value: &()) {
373 self.count -= 1;
374 }
375
376 fn merge(&mut self, other: &Self) {
377 self.count += other.count;
378 }
379
380 fn result(&self) -> i64 {
381 self.count
382 }
383
384 fn is_empty(&self) -> bool {
385 self.count == 0
386 }
387
388 fn reset(&mut self) {
389 self.count = 0;
390 }
391}
392
393#[derive(Debug, Clone, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
397pub struct RetractableSumAccumulator {
398 sum: i64,
400 count: i64,
402}
403
404impl RetractableSumAccumulator {
405 #[must_use]
407 pub fn new() -> Self {
408 Self::default()
409 }
410
411 #[must_use]
413 pub fn sum(&self) -> i64 {
414 self.sum
415 }
416}
417
418impl RetractableAccumulator for RetractableSumAccumulator {
419 type Input = i64;
420 type Output = i64;
421
422 #[inline]
423 fn add(&mut self, value: i64) {
424 self.sum += value;
425 self.count += 1;
426 }
427
428 #[inline]
429 fn retract(&mut self, value: &i64) {
430 self.sum -= value;
431 self.count -= 1;
432 }
433
434 fn merge(&mut self, other: &Self) {
435 self.sum += other.sum;
436 self.count += other.count;
437 }
438
439 fn result(&self) -> i64 {
440 self.sum
441 }
442
443 fn is_empty(&self) -> bool {
444 self.count == 0
445 }
446
447 fn reset(&mut self) {
448 self.sum = 0;
449 self.count = 0;
450 }
451}
452
453#[derive(Debug, Clone, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
457pub struct RetractableAvgAccumulator {
458 sum: i64,
460 count: i64,
462}
463
464impl RetractableAvgAccumulator {
465 #[must_use]
467 pub fn new() -> Self {
468 Self::default()
469 }
470
471 #[must_use]
473 pub fn sum(&self) -> i64 {
474 self.sum
475 }
476
477 #[must_use]
479 pub fn count(&self) -> i64 {
480 self.count
481 }
482}
483
484impl RetractableAccumulator for RetractableAvgAccumulator {
485 type Input = i64;
486 type Output = Option<f64>;
487
488 #[inline]
489 fn add(&mut self, value: i64) {
490 self.sum += value;
491 self.count += 1;
492 }
493
494 #[inline]
495 fn retract(&mut self, value: &i64) {
496 self.sum -= value;
497 self.count -= 1;
498 }
499
500 fn merge(&mut self, other: &Self) {
501 self.sum += other.sum;
502 self.count += other.count;
503 }
504
505 #[allow(clippy::cast_precision_loss)]
506 fn result(&self) -> Option<f64> {
507 if self.count > 0 {
508 Some(self.sum as f64 / self.count as f64)
509 } else {
510 None
511 }
512 }
513
514 fn is_empty(&self) -> bool {
515 self.count == 0
516 }
517
518 fn reset(&mut self) {
519 self.sum = 0;
520 self.count = 0;
521 }
522}
523
524#[derive(Debug, Clone, Default)]
532pub struct RetractableMinAccumulator {
533 values: Vec<i64>,
535 cached_min: Option<i64>,
537}
538
539impl RetractableMinAccumulator {
540 #[must_use]
542 pub fn new() -> Self {
543 Self::default()
544 }
545
546 fn recompute_min(&mut self) {
547 self.cached_min = self.values.iter().copied().min();
548 }
549}
550
551impl RetractableAccumulator for RetractableMinAccumulator {
552 type Input = i64;
553 type Output = Option<i64>;
554
555 fn add(&mut self, value: i64) {
556 self.values.push(value);
557 self.cached_min = Some(self.cached_min.map_or(value, |m| m.min(value)));
558 }
559
560 fn retract(&mut self, value: &i64) {
561 if let Some(pos) = self.values.iter().position(|v| v == value) {
562 self.values.swap_remove(pos);
563 if self.cached_min == Some(*value) {
565 self.recompute_min();
566 }
567 }
568 }
569
570 fn merge(&mut self, other: &Self) {
571 self.values.extend(&other.values);
572 self.recompute_min();
573 }
574
575 fn result(&self) -> Option<i64> {
576 self.cached_min
577 }
578
579 fn is_empty(&self) -> bool {
580 self.values.is_empty()
581 }
582
583 fn supports_efficient_retraction(&self) -> bool {
584 false
586 }
587
588 fn reset(&mut self) {
589 self.values.clear();
590 self.cached_min = None;
591 }
592}
593
594#[derive(Debug, Clone, Default)]
598pub struct RetractableMaxAccumulator {
599 values: Vec<i64>,
601 cached_max: Option<i64>,
603}
604
605impl RetractableMaxAccumulator {
606 #[must_use]
608 pub fn new() -> Self {
609 Self::default()
610 }
611
612 fn recompute_max(&mut self) {
613 self.cached_max = self.values.iter().copied().max();
614 }
615}
616
617impl RetractableAccumulator for RetractableMaxAccumulator {
618 type Input = i64;
619 type Output = Option<i64>;
620
621 fn add(&mut self, value: i64) {
622 self.values.push(value);
623 self.cached_max = Some(self.cached_max.map_or(value, |m| m.max(value)));
624 }
625
626 fn retract(&mut self, value: &i64) {
627 if let Some(pos) = self.values.iter().position(|v| v == value) {
628 self.values.swap_remove(pos);
629 if self.cached_max == Some(*value) {
631 self.recompute_max();
632 }
633 }
634 }
635
636 fn merge(&mut self, other: &Self) {
637 self.values.extend(&other.values);
638 self.recompute_max();
639 }
640
641 fn result(&self) -> Option<i64> {
642 self.cached_max
643 }
644
645 fn is_empty(&self) -> bool {
646 self.values.is_empty()
647 }
648
649 fn supports_efficient_retraction(&self) -> bool {
650 false
652 }
653
654 fn reset(&mut self) {
655 self.values.clear();
656 self.cached_max = None;
657 }
658}
659
660#[derive(Debug, Clone)]
664struct EmittedResult {
665 data: Vec<u8>,
667 emit_time: i64,
669 version: u32,
671}
672
673pub struct LateDataRetractionGenerator {
705 emitted_results: FxHashMap<WindowId, EmittedResult>,
707 enabled: bool,
709 retractions_generated: u64,
711 windows_tracked: u64,
713}
714
715impl LateDataRetractionGenerator {
716 #[must_use]
718 pub fn new(enabled: bool) -> Self {
719 Self {
720 emitted_results: FxHashMap::default(),
721 enabled,
722 retractions_generated: 0,
723 windows_tracked: 0,
724 }
725 }
726
727 #[must_use]
729 pub fn disabled() -> Self {
730 Self::new(false)
731 }
732
733 #[must_use]
735 pub fn is_enabled(&self) -> bool {
736 self.enabled
737 }
738
739 pub fn set_enabled(&mut self, enabled: bool) {
741 self.enabled = enabled;
742 }
743
744 pub fn check_retraction(
750 &mut self,
751 window_id: &WindowId,
752 new_data: &[u8],
753 timestamp: i64,
754 ) -> Option<(Vec<u8>, Vec<u8>)> {
755 if !self.enabled {
756 return None;
757 }
758
759 if let Some(prev) = self.emitted_results.get_mut(window_id) {
760 if prev.data != new_data {
761 let old_data = std::mem::replace(&mut prev.data, new_data.to_vec());
762 prev.emit_time = timestamp;
763 prev.version += 1;
764 self.retractions_generated += 1;
765 return Some((old_data, new_data.to_vec()));
766 }
767 } else {
768 self.emitted_results.insert(
769 *window_id,
770 EmittedResult {
771 data: new_data.to_vec(),
772 emit_time: timestamp,
773 version: 1,
774 },
775 );
776 self.windows_tracked += 1;
777 }
778
779 None
780 }
781
782 pub fn check_retraction_ref(
788 &mut self,
789 window_id: &WindowId,
790 new_data: &[u8],
791 timestamp: i64,
792 ) -> Option<Vec<u8>> {
793 if !self.enabled {
794 return None;
795 }
796
797 if let Some(prev) = self.emitted_results.get_mut(window_id) {
798 if prev.data != new_data {
799 let old_data = std::mem::replace(&mut prev.data, new_data.to_vec());
800 prev.emit_time = timestamp;
801 prev.version += 1;
802 self.retractions_generated += 1;
803 return Some(old_data);
804 }
805 } else {
806 self.emitted_results.insert(
807 *window_id,
808 EmittedResult {
809 data: new_data.to_vec(),
810 emit_time: timestamp,
811 version: 1,
812 },
813 );
814 self.windows_tracked += 1;
815 }
816
817 None
818 }
819
820 pub fn cleanup_window(&mut self, window_id: &WindowId) {
824 self.emitted_results.remove(window_id);
825 }
826
827 pub fn cleanup_before_watermark(&mut self, watermark: i64) {
831 self.emitted_results
832 .retain(|window_id, _| window_id.end > watermark);
833 }
834
835 #[must_use]
837 pub fn retractions_generated(&self) -> u64 {
838 self.retractions_generated
839 }
840
841 #[must_use]
843 pub fn windows_tracked(&self) -> usize {
844 self.emitted_results.len()
845 }
846
847 pub fn reset_metrics(&mut self) {
849 self.retractions_generated = 0;
850 self.windows_tracked = 0;
851 }
852
853 pub fn clear(&mut self) {
855 self.emitted_results.clear();
856 self.reset_metrics();
857 }
858}
859
860impl Default for LateDataRetractionGenerator {
861 fn default() -> Self {
862 Self::new(true)
863 }
864}
865
866#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
872pub struct CdcSource {
873 pub name: String,
875 pub db: String,
877 pub table: String,
879 #[serde(default)]
881 pub sequence: u64,
882}
883
884impl CdcSource {
885 #[must_use]
887 pub fn new(name: impl Into<String>, db: impl Into<String>, table: impl Into<String>) -> Self {
888 Self {
889 name: name.into(),
890 db: db.into(),
891 table: table.into(),
892 sequence: 0,
893 }
894 }
895
896 #[must_use]
898 pub fn with_sequence(
899 name: impl Into<String>,
900 db: impl Into<String>,
901 table: impl Into<String>,
902 sequence: u64,
903 ) -> Self {
904 Self {
905 name: name.into(),
906 db: db.into(),
907 table: table.into(),
908 sequence,
909 }
910 }
911
912 pub fn next_sequence(&mut self) -> u64 {
914 self.sequence += 1;
915 self.sequence
916 }
917}
918
919#[derive(Debug, Clone, Serialize, Deserialize)]
957pub struct CdcEnvelope<T> {
958 pub op: String,
960 pub ts_ms: i64,
962 pub source: CdcSource,
964 #[serde(skip_serializing_if = "Option::is_none")]
966 pub before: Option<T>,
967 #[serde(skip_serializing_if = "Option::is_none")]
969 pub after: Option<T>,
970}
971
972impl<T> CdcEnvelope<T> {
973 #[must_use]
975 pub fn insert(after: T, source: CdcSource, ts_ms: i64) -> Self {
976 Self {
977 op: "c".to_string(),
978 ts_ms,
979 source,
980 before: None,
981 after: Some(after),
982 }
983 }
984
985 #[must_use]
987 pub fn delete(before: T, source: CdcSource, ts_ms: i64) -> Self {
988 Self {
989 op: "d".to_string(),
990 ts_ms,
991 source,
992 before: Some(before),
993 after: None,
994 }
995 }
996
997 #[must_use]
999 pub fn update(before: T, after: T, source: CdcSource, ts_ms: i64) -> Self {
1000 Self {
1001 op: "u".to_string(),
1002 ts_ms,
1003 source,
1004 before: Some(before),
1005 after: Some(after),
1006 }
1007 }
1008
1009 #[must_use]
1011 pub fn read(after: T, source: CdcSource, ts_ms: i64) -> Self {
1012 Self {
1013 op: "r".to_string(),
1014 ts_ms,
1015 source,
1016 before: None,
1017 after: Some(after),
1018 }
1019 }
1020
1021 #[must_use]
1023 pub fn is_insert(&self) -> bool {
1024 self.op == "c"
1025 }
1026
1027 #[must_use]
1029 pub fn is_delete(&self) -> bool {
1030 self.op == "d"
1031 }
1032
1033 #[must_use]
1035 pub fn is_update(&self) -> bool {
1036 self.op == "u"
1037 }
1038
1039 #[must_use]
1045 pub fn weight(&self) -> i32 {
1046 match self.op.as_str() {
1047 "c" | "r" => 1,
1048 "d" => -1,
1049 _ => 0,
1051 }
1052 }
1053}
1054
1055impl<T: Serialize> CdcEnvelope<T> {
1056 pub fn to_json(&self) -> Result<String, serde_json::Error> {
1062 serde_json::to_string(self)
1063 }
1064
1065 pub fn to_json_pretty(&self) -> Result<String, serde_json::Error> {
1071 serde_json::to_string_pretty(self)
1072 }
1073
1074 pub fn to_json_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
1080 serde_json::to_vec(self)
1081 }
1082}
1083
1084#[derive(Debug, Clone, Default)]
1098pub struct RetractableFirstValueAccumulator {
1099 entries: Vec<(i64, i64)>,
1101}
1102
1103impl RetractableFirstValueAccumulator {
1104 #[must_use]
1106 pub fn new() -> Self {
1107 Self::default()
1108 }
1109
1110 #[must_use]
1112 pub fn len(&self) -> usize {
1113 self.entries.len()
1114 }
1115
1116 #[must_use]
1118 pub fn is_empty(&self) -> bool {
1119 self.entries.is_empty()
1120 }
1121}
1122
1123impl RetractableAccumulator for RetractableFirstValueAccumulator {
1124 type Input = (i64, i64); type Output = Option<i64>;
1126
1127 fn add(&mut self, (timestamp, value): (i64, i64)) {
1128 let pos = match self.entries.binary_search_by_key(×tamp, |(ts, _)| *ts) {
1131 Ok(mut p) => {
1132 while p < self.entries.len() && self.entries[p].0 == timestamp {
1134 p += 1;
1135 }
1136 p
1137 }
1138 Err(p) => p,
1139 };
1140 self.entries.insert(pos, (timestamp, value));
1141 }
1142
1143 fn retract(&mut self, (timestamp, value): &(i64, i64)) {
1144 if let Some(pos) = self
1146 .entries
1147 .iter()
1148 .position(|(ts, val)| ts == timestamp && val == value)
1149 {
1150 self.entries.remove(pos);
1151 }
1152 }
1153
1154 fn merge(&mut self, other: &Self) {
1155 let mut merged = Vec::with_capacity(self.entries.len() + other.entries.len());
1157 let mut i = 0;
1158 let mut j = 0;
1159 while i < self.entries.len() && j < other.entries.len() {
1160 if self.entries[i].0 <= other.entries[j].0 {
1161 merged.push(self.entries[i]);
1162 i += 1;
1163 } else {
1164 merged.push(other.entries[j]);
1165 j += 1;
1166 }
1167 }
1168 merged.extend_from_slice(&self.entries[i..]);
1169 merged.extend_from_slice(&other.entries[j..]);
1170 self.entries = merged;
1171 }
1172
1173 fn result(&self) -> Option<i64> {
1174 self.entries.first().map(|(_, val)| *val)
1176 }
1177
1178 fn is_empty(&self) -> bool {
1179 self.entries.is_empty()
1180 }
1181
1182 fn supports_efficient_retraction(&self) -> bool {
1183 true
1184 }
1185
1186 fn reset(&mut self) {
1187 self.entries.clear();
1188 }
1189}
1190
1191#[derive(Debug, Clone, Default)]
1198pub struct RetractableLastValueAccumulator {
1199 entries: Vec<(i64, i64)>,
1201}
1202
1203impl RetractableLastValueAccumulator {
1204 #[must_use]
1206 pub fn new() -> Self {
1207 Self::default()
1208 }
1209
1210 #[must_use]
1212 pub fn len(&self) -> usize {
1213 self.entries.len()
1214 }
1215
1216 #[must_use]
1218 pub fn is_empty(&self) -> bool {
1219 self.entries.is_empty()
1220 }
1221}
1222
1223impl RetractableAccumulator for RetractableLastValueAccumulator {
1224 type Input = (i64, i64); type Output = Option<i64>;
1226
1227 fn add(&mut self, (timestamp, value): (i64, i64)) {
1228 let pos = match self.entries.binary_search_by_key(×tamp, |(ts, _)| *ts) {
1230 Ok(mut p) => {
1231 while p < self.entries.len() && self.entries[p].0 == timestamp {
1232 p += 1;
1233 }
1234 p
1235 }
1236 Err(p) => p,
1237 };
1238 self.entries.insert(pos, (timestamp, value));
1239 }
1240
1241 fn retract(&mut self, (timestamp, value): &(i64, i64)) {
1242 if let Some(pos) = self
1243 .entries
1244 .iter()
1245 .position(|(ts, val)| ts == timestamp && val == value)
1246 {
1247 self.entries.remove(pos);
1248 }
1249 }
1250
1251 fn merge(&mut self, other: &Self) {
1252 let mut merged = Vec::with_capacity(self.entries.len() + other.entries.len());
1253 let mut i = 0;
1254 let mut j = 0;
1255 while i < self.entries.len() && j < other.entries.len() {
1256 if self.entries[i].0 <= other.entries[j].0 {
1257 merged.push(self.entries[i]);
1258 i += 1;
1259 } else {
1260 merged.push(other.entries[j]);
1261 j += 1;
1262 }
1263 }
1264 merged.extend_from_slice(&self.entries[i..]);
1265 merged.extend_from_slice(&other.entries[j..]);
1266 self.entries = merged;
1267 }
1268
1269 fn result(&self) -> Option<i64> {
1270 self.entries.last().map(|(_, val)| *val)
1272 }
1273
1274 fn is_empty(&self) -> bool {
1275 self.entries.is_empty()
1276 }
1277
1278 fn supports_efficient_retraction(&self) -> bool {
1279 true
1280 }
1281
1282 fn reset(&mut self) {
1283 self.entries.clear();
1284 }
1285}
1286
1287#[derive(Debug, Clone, Default)]
1292pub struct RetractableFirstValueF64Accumulator {
1293 entries: Vec<(i64, i64)>,
1295}
1296
1297impl RetractableFirstValueF64Accumulator {
1298 #[must_use]
1300 pub fn new() -> Self {
1301 Self::default()
1302 }
1303
1304 #[must_use]
1306 pub fn len(&self) -> usize {
1307 self.entries.len()
1308 }
1309
1310 #[must_use]
1312 pub fn is_empty(&self) -> bool {
1313 self.entries.is_empty()
1314 }
1315
1316 #[must_use]
1318 #[allow(clippy::cast_sign_loss)]
1319 pub fn result_f64(&self) -> Option<f64> {
1320 self.entries
1321 .first()
1322 .map(|(_, bits)| f64::from_bits(*bits as u64))
1323 }
1324}
1325
1326impl RetractableAccumulator for RetractableFirstValueF64Accumulator {
1327 type Input = (i64, f64); type Output = Option<i64>; #[allow(clippy::cast_possible_wrap)]
1331 fn add(&mut self, (timestamp, value): (i64, f64)) {
1332 let value_bits = value.to_bits() as i64;
1333 let pos = match self.entries.binary_search_by_key(×tamp, |(ts, _)| *ts) {
1334 Ok(mut p) => {
1335 while p < self.entries.len() && self.entries[p].0 == timestamp {
1336 p += 1;
1337 }
1338 p
1339 }
1340 Err(p) => p,
1341 };
1342 self.entries.insert(pos, (timestamp, value_bits));
1343 }
1344
1345 fn retract(&mut self, (timestamp, value): &(i64, f64)) {
1346 #[allow(clippy::cast_possible_wrap)]
1347 let value_bits = value.to_bits() as i64;
1348 if let Some(pos) = self
1349 .entries
1350 .iter()
1351 .position(|(ts, val)| *ts == *timestamp && *val == value_bits)
1352 {
1353 self.entries.remove(pos);
1354 }
1355 }
1356
1357 fn merge(&mut self, other: &Self) {
1358 let mut merged = Vec::with_capacity(self.entries.len() + other.entries.len());
1359 let mut i = 0;
1360 let mut j = 0;
1361 while i < self.entries.len() && j < other.entries.len() {
1362 if self.entries[i].0 <= other.entries[j].0 {
1363 merged.push(self.entries[i]);
1364 i += 1;
1365 } else {
1366 merged.push(other.entries[j]);
1367 j += 1;
1368 }
1369 }
1370 merged.extend_from_slice(&self.entries[i..]);
1371 merged.extend_from_slice(&other.entries[j..]);
1372 self.entries = merged;
1373 }
1374
1375 fn result(&self) -> Option<i64> {
1376 self.entries.first().map(|(_, val)| *val)
1377 }
1378
1379 fn is_empty(&self) -> bool {
1380 self.entries.is_empty()
1381 }
1382
1383 fn supports_efficient_retraction(&self) -> bool {
1384 true
1385 }
1386
1387 fn reset(&mut self) {
1388 self.entries.clear();
1389 }
1390}
1391
1392#[derive(Debug, Clone, Default)]
1396pub struct RetractableLastValueF64Accumulator {
1397 entries: Vec<(i64, i64)>,
1399}
1400
1401impl RetractableLastValueF64Accumulator {
1402 #[must_use]
1404 pub fn new() -> Self {
1405 Self::default()
1406 }
1407
1408 #[must_use]
1410 pub fn len(&self) -> usize {
1411 self.entries.len()
1412 }
1413
1414 #[must_use]
1416 pub fn is_empty(&self) -> bool {
1417 self.entries.is_empty()
1418 }
1419
1420 #[must_use]
1422 #[allow(clippy::cast_sign_loss)]
1423 pub fn result_f64(&self) -> Option<f64> {
1424 self.entries
1425 .last()
1426 .map(|(_, bits)| f64::from_bits(*bits as u64))
1427 }
1428}
1429
1430impl RetractableAccumulator for RetractableLastValueF64Accumulator {
1431 type Input = (i64, f64); type Output = Option<i64>; #[allow(clippy::cast_possible_wrap)]
1435 fn add(&mut self, (timestamp, value): (i64, f64)) {
1436 let value_bits = value.to_bits() as i64;
1437 let pos = match self.entries.binary_search_by_key(×tamp, |(ts, _)| *ts) {
1438 Ok(mut p) => {
1439 while p < self.entries.len() && self.entries[p].0 == timestamp {
1440 p += 1;
1441 }
1442 p
1443 }
1444 Err(p) => p,
1445 };
1446 self.entries.insert(pos, (timestamp, value_bits));
1447 }
1448
1449 fn retract(&mut self, (timestamp, value): &(i64, f64)) {
1450 #[allow(clippy::cast_possible_wrap)]
1451 let value_bits = value.to_bits() as i64;
1452 if let Some(pos) = self
1453 .entries
1454 .iter()
1455 .position(|(ts, val)| *ts == *timestamp && *val == value_bits)
1456 {
1457 self.entries.remove(pos);
1458 }
1459 }
1460
1461 fn merge(&mut self, other: &Self) {
1462 let mut merged = Vec::with_capacity(self.entries.len() + other.entries.len());
1463 let mut i = 0;
1464 let mut j = 0;
1465 while i < self.entries.len() && j < other.entries.len() {
1466 if self.entries[i].0 <= other.entries[j].0 {
1467 merged.push(self.entries[i]);
1468 i += 1;
1469 } else {
1470 merged.push(other.entries[j]);
1471 j += 1;
1472 }
1473 }
1474 merged.extend_from_slice(&self.entries[i..]);
1475 merged.extend_from_slice(&other.entries[j..]);
1476 self.entries = merged;
1477 }
1478
1479 fn result(&self) -> Option<i64> {
1480 self.entries.last().map(|(_, val)| *val)
1481 }
1482
1483 fn is_empty(&self) -> bool {
1484 self.entries.is_empty()
1485 }
1486
1487 fn supports_efficient_retraction(&self) -> bool {
1488 true
1489 }
1490
1491 fn reset(&mut self) {
1492 self.entries.clear();
1493 }
1494}
1495
1496#[cfg(test)]
1499mod tests {
1500 use super::*;
1501
1502 #[test]
1505 fn test_changelog_ref_insert() {
1506 let cr = ChangelogRef::insert(10, 5);
1507 assert_eq!(cr.batch_offset, 10);
1508 assert_eq!(cr.row_index, 5);
1509 assert_eq!(cr.weight, 1);
1510 assert_eq!(cr.operation(), CdcOperation::Insert);
1511 assert!(cr.is_insert());
1512 assert!(!cr.is_delete());
1513 }
1514
1515 #[test]
1516 fn test_changelog_ref_delete() {
1517 let cr = ChangelogRef::delete(20, 3);
1518 assert_eq!(cr.batch_offset, 20);
1519 assert_eq!(cr.row_index, 3);
1520 assert_eq!(cr.weight, -1);
1521 assert_eq!(cr.operation(), CdcOperation::Delete);
1522 assert!(!cr.is_insert());
1523 assert!(cr.is_delete());
1524 }
1525
1526 #[test]
1527 fn test_changelog_ref_update() {
1528 let before = ChangelogRef::update_before(5, 1);
1529 let after = ChangelogRef::update_after(5, 2);
1530
1531 assert_eq!(before.weight, -1);
1532 assert_eq!(after.weight, 1);
1533 assert_eq!(before.operation(), CdcOperation::UpdateBefore);
1534 assert_eq!(after.operation(), CdcOperation::UpdateAfter);
1535 }
1536
1537 #[test]
1538 fn test_changelog_ref_size() {
1539 assert!(std::mem::size_of::<ChangelogRef>() <= 16);
1541 }
1542
1543 #[test]
1546 fn test_changelog_buffer_basic() {
1547 let mut buffer = ChangelogBuffer::with_capacity(10);
1548 assert!(buffer.is_empty());
1549 assert_eq!(buffer.capacity(), 10);
1550
1551 assert!(buffer.push(ChangelogRef::insert(0, 0)));
1552 assert!(buffer.push(ChangelogRef::delete(1, 0)));
1553
1554 assert_eq!(buffer.len(), 2);
1555 assert!(!buffer.is_empty());
1556 }
1557
1558 #[test]
1559 fn test_changelog_buffer_full() {
1560 let mut buffer = ChangelogBuffer::with_capacity(2);
1561
1562 assert!(buffer.push(ChangelogRef::insert(0, 0)));
1563 assert!(buffer.push(ChangelogRef::insert(1, 0)));
1564 assert!(!buffer.push(ChangelogRef::insert(2, 0))); assert!(buffer.is_full());
1567 assert_eq!(buffer.available(), 0);
1568 }
1569
1570 #[test]
1571 fn test_changelog_buffer_drain() {
1572 let mut buffer = ChangelogBuffer::with_capacity(10);
1573
1574 for i in 0..5 {
1575 buffer.push(ChangelogRef::insert(i, 0));
1576 }
1577
1578 let drained: Vec<_> = buffer.drain().collect();
1579 assert_eq!(drained.len(), 5);
1580 assert!(buffer.is_empty());
1581
1582 for i in 0..3 {
1584 buffer.push(ChangelogRef::delete(i, 0));
1585 }
1586 assert_eq!(buffer.len(), 3);
1587 }
1588
1589 #[test]
1590 fn test_changelog_buffer_retraction() {
1591 let mut buffer = ChangelogBuffer::with_capacity(10);
1592
1593 assert!(buffer.push_retraction(0, 1, 2));
1594 assert_eq!(buffer.len(), 2);
1595
1596 let refs: Vec<_> = buffer.as_slice().to_vec();
1597 assert_eq!(refs[0].operation(), CdcOperation::UpdateBefore);
1598 assert_eq!(refs[0].row_index, 1);
1599 assert_eq!(refs[1].operation(), CdcOperation::UpdateAfter);
1600 assert_eq!(refs[1].row_index, 2);
1601 }
1602
1603 #[test]
1604 fn test_changelog_buffer_zero_alloc_reuse() {
1605 let mut buffer = ChangelogBuffer::with_capacity(100);
1606
1607 for i in 0..50 {
1609 buffer.push(ChangelogRef::insert(i, 0));
1610 }
1611 let _: Vec<_> = buffer.drain().collect();
1612
1613 for i in 0..50 {
1615 buffer.push(ChangelogRef::insert(i, 0));
1616 }
1617
1618 assert_eq!(buffer.len(), 50);
1619 }
1620
1621 #[test]
1624 fn test_retractable_count() {
1625 let mut agg = RetractableCountAccumulator::default();
1626
1627 agg.add(());
1628 agg.add(());
1629 agg.add(());
1630 assert_eq!(agg.result(), 3);
1631
1632 agg.retract(&());
1633 assert_eq!(agg.result(), 2);
1634
1635 agg.retract(&());
1636 agg.retract(&());
1637 assert_eq!(agg.result(), 0);
1638 }
1639
1640 #[test]
1641 fn test_retractable_count_negative() {
1642 let mut agg = RetractableCountAccumulator::default();
1643
1644 agg.add(());
1645 agg.retract(&());
1646 agg.retract(&()); assert_eq!(agg.result(), -1);
1650 }
1651
1652 #[test]
1653 fn test_retractable_sum() {
1654 let mut agg = RetractableSumAccumulator::default();
1655
1656 agg.add(10);
1657 agg.add(20);
1658 agg.add(30);
1659 assert_eq!(agg.result(), 60);
1660
1661 agg.retract(&20);
1662 assert_eq!(agg.result(), 40);
1663
1664 agg.retract(&10);
1665 agg.retract(&30);
1666 assert_eq!(agg.result(), 0);
1667 }
1668
1669 #[test]
1670 fn test_retractable_sum_merge() {
1671 let mut agg1 = RetractableSumAccumulator::default();
1672 agg1.add(10);
1673 agg1.add(20);
1674
1675 let mut agg2 = RetractableSumAccumulator::default();
1676 agg2.add(30);
1677 agg2.retract(&5);
1678
1679 agg1.merge(&agg2);
1680 assert_eq!(agg1.result(), 55); }
1682
1683 #[test]
1684 fn test_retractable_avg() {
1685 let mut agg = RetractableAvgAccumulator::default();
1686
1687 agg.add(10);
1688 agg.add(20);
1689 agg.add(30);
1690 let avg = agg.result().unwrap();
1691 assert!((avg - 20.0).abs() < f64::EPSILON);
1692
1693 agg.retract(&30);
1694 let avg = agg.result().unwrap();
1695 assert!((avg - 15.0).abs() < f64::EPSILON); }
1697
1698 #[test]
1699 fn test_retractable_avg_empty() {
1700 let mut agg = RetractableAvgAccumulator::default();
1701 assert!(agg.result().is_none());
1702
1703 agg.add(10);
1704 agg.retract(&10);
1705 assert!(agg.result().is_none());
1706 }
1707
1708 #[test]
1709 fn test_retractable_min() {
1710 let mut agg = RetractableMinAccumulator::default();
1711
1712 agg.add(30);
1713 agg.add(10);
1714 agg.add(20);
1715 assert_eq!(agg.result(), Some(10));
1716
1717 agg.retract(&10);
1719 assert_eq!(agg.result(), Some(20));
1720
1721 agg.retract(&30);
1723 assert_eq!(agg.result(), Some(20));
1724
1725 agg.retract(&20);
1726 assert_eq!(agg.result(), None);
1727 }
1728
1729 #[test]
1730 fn test_retractable_max() {
1731 let mut agg = RetractableMaxAccumulator::default();
1732
1733 agg.add(10);
1734 agg.add(30);
1735 agg.add(20);
1736 assert_eq!(agg.result(), Some(30));
1737
1738 agg.retract(&30);
1740 assert_eq!(agg.result(), Some(20));
1741
1742 agg.retract(&20);
1743 agg.retract(&10);
1744 assert_eq!(agg.result(), None);
1745 }
1746
1747 #[test]
1748 fn test_retractable_efficiency_flags() {
1749 let count = RetractableCountAccumulator::default();
1750 let sum = RetractableSumAccumulator::default();
1751 let avg = RetractableAvgAccumulator::default();
1752 let min = RetractableMinAccumulator::default();
1753 let max = RetractableMaxAccumulator::default();
1754
1755 assert!(count.supports_efficient_retraction());
1757 assert!(sum.supports_efficient_retraction());
1758 assert!(avg.supports_efficient_retraction());
1759
1760 assert!(!min.supports_efficient_retraction());
1762 assert!(!max.supports_efficient_retraction());
1763 }
1764
1765 #[test]
1768 fn test_late_data_retraction_first_emission() {
1769 let mut gen = LateDataRetractionGenerator::new(true);
1770 let window_id = WindowId::new(0, 60000);
1771
1772 let result = gen.check_retraction(&window_id, b"count=5", 1000);
1774 assert!(result.is_none());
1775 assert_eq!(gen.windows_tracked(), 1);
1776 }
1777
1778 #[test]
1779 fn test_late_data_retraction_changed_result() {
1780 let mut gen = LateDataRetractionGenerator::new(true);
1781 let window_id = WindowId::new(0, 60000);
1782
1783 gen.check_retraction(&window_id, b"count=5", 1000);
1785
1786 let result = gen.check_retraction(&window_id, b"count=7", 2000);
1788 assert!(result.is_some());
1789
1790 let (old, new) = result.unwrap();
1791 assert_eq!(old, b"count=5");
1792 assert_eq!(new, b"count=7");
1793 assert_eq!(gen.retractions_generated(), 1);
1794 }
1795
1796 #[test]
1797 fn test_late_data_retraction_same_result() {
1798 let mut gen = LateDataRetractionGenerator::new(true);
1799 let window_id = WindowId::new(0, 60000);
1800
1801 gen.check_retraction(&window_id, b"count=5", 1000);
1803
1804 let result = gen.check_retraction(&window_id, b"count=5", 2000);
1806 assert!(result.is_none());
1807 assert_eq!(gen.retractions_generated(), 0);
1808 }
1809
1810 #[test]
1811 fn test_late_data_retraction_disabled() {
1812 let mut gen = LateDataRetractionGenerator::new(false);
1813 let window_id = WindowId::new(0, 60000);
1814
1815 gen.check_retraction(&window_id, b"count=5", 1000);
1816 let result = gen.check_retraction(&window_id, b"count=7", 2000);
1817
1818 assert!(result.is_none());
1820 }
1821
1822 #[test]
1823 fn test_late_data_cleanup() {
1824 let mut gen = LateDataRetractionGenerator::new(true);
1825
1826 let w1 = WindowId::new(0, 1000);
1827 let w2 = WindowId::new(1000, 2000);
1828
1829 gen.check_retraction(&w1, b"a", 100);
1830 gen.check_retraction(&w2, b"b", 200);
1831 assert_eq!(gen.windows_tracked(), 2);
1832
1833 gen.cleanup_window(&w1);
1834 assert_eq!(gen.windows_tracked(), 1);
1835
1836 gen.cleanup_before_watermark(2000);
1837 assert_eq!(gen.windows_tracked(), 0);
1838 }
1839
1840 #[test]
1843 fn test_cdc_envelope_insert() {
1844 let source = CdcSource::new("laminardb", "default", "orders");
1845 let envelope = CdcEnvelope::insert(
1846 serde_json::json!({"id": 1, "amount": 100}),
1847 source,
1848 1_706_140_800_000,
1849 );
1850
1851 assert_eq!(envelope.op, "c");
1852 assert!(envelope.is_insert());
1853 assert!(envelope.before.is_none());
1854 assert!(envelope.after.is_some());
1855 assert_eq!(envelope.weight(), 1);
1856 }
1857
1858 #[test]
1859 fn test_cdc_envelope_delete() {
1860 let source = CdcSource::new("laminardb", "default", "orders");
1861 let envelope = CdcEnvelope::delete(serde_json::json!({"id": 1}), source, 1_706_140_800_000);
1862
1863 assert_eq!(envelope.op, "d");
1864 assert!(envelope.is_delete());
1865 assert!(envelope.before.is_some());
1866 assert!(envelope.after.is_none());
1867 assert_eq!(envelope.weight(), -1);
1868 }
1869
1870 #[test]
1871 fn test_cdc_envelope_update() {
1872 let source = CdcSource::new("laminardb", "default", "orders");
1873 let envelope = CdcEnvelope::update(
1874 serde_json::json!({"id": 1, "amount": 100}),
1875 serde_json::json!({"id": 1, "amount": 150}),
1876 source,
1877 1_706_140_800_000,
1878 );
1879
1880 assert_eq!(envelope.op, "u");
1881 assert!(envelope.is_update());
1882 assert!(envelope.before.is_some());
1883 assert!(envelope.after.is_some());
1884 assert_eq!(envelope.weight(), 0);
1885 }
1886
1887 #[test]
1888 fn test_cdc_envelope_json_serialization() {
1889 let source = CdcSource::new("laminardb", "default", "orders");
1890 let envelope = CdcEnvelope::insert(
1891 serde_json::json!({"id": 1, "amount": 100}),
1892 source,
1893 1_706_140_800_000,
1894 );
1895
1896 let json = envelope.to_json().unwrap();
1897 assert!(json.contains("\"op\":\"c\""));
1898 assert!(json.contains("\"after\""));
1899 assert!(!json.contains("\"before\""));
1900 assert!(json.contains("\"ts_ms\":1706140800000"));
1901 }
1902
1903 #[test]
1904 fn test_cdc_envelope_debezium_compatible() {
1905 let source = CdcSource::with_sequence("laminardb", "test_db", "users", 42);
1906 let envelope = CdcEnvelope::insert(
1907 serde_json::json!({"user_id": 123, "name": "Alice"}),
1908 source,
1909 1_706_140_800_000,
1910 );
1911
1912 let json = envelope.to_json().unwrap();
1913
1914 assert!(json.contains("\"op\":\"c\""));
1916 assert!(json.contains("\"source\""));
1917 assert!(json.contains("\"name\":\"laminardb\""));
1918 assert!(json.contains("\"db\":\"test_db\""));
1919 assert!(json.contains("\"table\":\"users\""));
1920 assert!(json.contains("\"sequence\":42"));
1921 }
1922
1923 #[test]
1924 fn test_cdc_source_sequence() {
1925 let mut source = CdcSource::new("laminardb", "db", "table");
1926 assert_eq!(source.sequence, 0);
1927
1928 assert_eq!(source.next_sequence(), 1);
1929 assert_eq!(source.next_sequence(), 2);
1930 assert_eq!(source.sequence, 2);
1931 }
1932
1933 #[test]
1936 fn test_cdc_operation_roundtrip() {
1937 for op in [
1938 CdcOperation::Insert,
1939 CdcOperation::Delete,
1940 CdcOperation::UpdateBefore,
1941 CdcOperation::UpdateAfter,
1942 ] {
1943 let u8_val = op.to_u8();
1944 let restored = CdcOperation::from_u8(u8_val);
1945 assert_eq!(op, restored);
1946 }
1947 }
1948
1949 #[test]
1950 fn test_cdc_operation_unknown_u8() {
1951 assert_eq!(CdcOperation::from_u8(255), CdcOperation::Insert);
1953 }
1954
1955 #[test]
1962 fn test_retractable_first_value_basic() {
1963 let mut acc = RetractableFirstValueAccumulator::new();
1964 assert!(acc.is_empty());
1965 assert_eq!(acc.result(), None);
1966
1967 acc.add((200, 20));
1969 acc.add((100, 10));
1970 acc.add((300, 30));
1971
1972 assert!(!acc.is_empty());
1973 assert_eq!(acc.len(), 3);
1974 assert_eq!(acc.result(), Some(10));
1976 }
1977
1978 #[test]
1979 fn test_retractable_first_value_retract_non_first() {
1980 let mut acc = RetractableFirstValueAccumulator::new();
1981 acc.add((100, 10));
1982 acc.add((200, 20));
1983 acc.add((300, 30));
1984
1985 acc.retract(&(200, 20));
1987 assert_eq!(acc.len(), 2);
1988 assert_eq!(acc.result(), Some(10));
1989 }
1990
1991 #[test]
1992 fn test_retractable_first_value_retract_first() {
1993 let mut acc = RetractableFirstValueAccumulator::new();
1994 acc.add((100, 10));
1995 acc.add((200, 20));
1996 acc.add((300, 30));
1997
1998 acc.retract(&(100, 10));
2000 assert_eq!(acc.len(), 2);
2001 assert_eq!(acc.result(), Some(20)); }
2003
2004 #[test]
2005 fn test_retractable_first_value_retract_all() {
2006 let mut acc = RetractableFirstValueAccumulator::new();
2007 acc.add((100, 10));
2008 acc.add((200, 20));
2009
2010 acc.retract(&(100, 10));
2011 acc.retract(&(200, 20));
2012 assert!(acc.is_empty());
2013 assert_eq!(acc.result(), None);
2014 }
2015
2016 #[test]
2017 fn test_retractable_first_value_retract_nonexistent() {
2018 let mut acc = RetractableFirstValueAccumulator::new();
2019 acc.add((100, 10));
2020
2021 acc.retract(&(999, 99));
2023 assert_eq!(acc.len(), 1);
2024 assert_eq!(acc.result(), Some(10));
2025 }
2026
2027 #[test]
2028 fn test_retractable_first_value_duplicate_timestamps() {
2029 let mut acc = RetractableFirstValueAccumulator::new();
2030 acc.add((100, 10));
2031 acc.add((100, 20)); assert_eq!(acc.len(), 2);
2034 assert_eq!(acc.result(), Some(10));
2036
2037 acc.retract(&(100, 10));
2039 assert_eq!(acc.result(), Some(20));
2040 }
2041
2042 #[test]
2045 fn test_retractable_last_value_basic() {
2046 let mut acc = RetractableLastValueAccumulator::new();
2047 assert!(acc.is_empty());
2048 assert_eq!(acc.result(), None);
2049
2050 acc.add((100, 10));
2051 acc.add((300, 30));
2052 acc.add((200, 20));
2053
2054 assert_eq!(acc.len(), 3);
2055 assert_eq!(acc.result(), Some(30));
2057 }
2058
2059 #[test]
2060 fn test_retractable_last_value_retract_non_last() {
2061 let mut acc = RetractableLastValueAccumulator::new();
2062 acc.add((100, 10));
2063 acc.add((200, 20));
2064 acc.add((300, 30));
2065
2066 acc.retract(&(200, 20));
2068 assert_eq!(acc.result(), Some(30));
2069 }
2070
2071 #[test]
2072 fn test_retractable_last_value_retract_last() {
2073 let mut acc = RetractableLastValueAccumulator::new();
2074 acc.add((100, 10));
2075 acc.add((200, 20));
2076 acc.add((300, 30));
2077
2078 acc.retract(&(300, 30));
2080 assert_eq!(acc.result(), Some(20)); }
2082
2083 #[test]
2084 fn test_retractable_last_value_retract_all() {
2085 let mut acc = RetractableLastValueAccumulator::new();
2086 acc.add((100, 10));
2087 acc.retract(&(100, 10));
2088 assert!(acc.is_empty());
2089 assert_eq!(acc.result(), None);
2090 }
2091
2092 #[test]
2095 fn test_retractable_first_value_merge() {
2096 let mut acc1 = RetractableFirstValueAccumulator::new();
2097 let mut acc2 = RetractableFirstValueAccumulator::new();
2098
2099 acc1.add((200, 20));
2100 acc1.add((400, 40));
2101 acc2.add((100, 10));
2102 acc2.add((300, 30));
2103
2104 acc1.merge(&acc2);
2105 assert_eq!(acc1.len(), 4);
2106 assert_eq!(acc1.result(), Some(10));
2108 }
2109
2110 #[test]
2111 fn test_retractable_last_value_merge() {
2112 let mut acc1 = RetractableLastValueAccumulator::new();
2113 let mut acc2 = RetractableLastValueAccumulator::new();
2114
2115 acc1.add((100, 10));
2116 acc1.add((300, 30));
2117 acc2.add((200, 20));
2118 acc2.add((400, 40));
2119
2120 acc1.merge(&acc2);
2121 assert_eq!(acc1.len(), 4);
2122 assert_eq!(acc1.result(), Some(40));
2124 }
2125
2126 #[test]
2127 fn test_retractable_first_value_merge_empty() {
2128 let mut acc1 = RetractableFirstValueAccumulator::new();
2129 let acc2 = RetractableFirstValueAccumulator::new();
2130
2131 acc1.add((100, 10));
2132 acc1.merge(&acc2); assert_eq!(acc1.result(), Some(10));
2134
2135 let mut acc3 = RetractableFirstValueAccumulator::new();
2136 let acc4 = RetractableFirstValueAccumulator::new();
2137 acc3.merge(&acc4); assert!(acc3.is_empty());
2139 }
2140
2141 #[test]
2144 fn test_retractable_first_value_reset() {
2145 let mut acc = RetractableFirstValueAccumulator::new();
2146 acc.add((100, 10));
2147 acc.add((200, 20));
2148 assert!(!acc.is_empty());
2149
2150 acc.reset();
2151 assert!(acc.is_empty());
2152 assert_eq!(acc.result(), None);
2153 }
2154
2155 #[test]
2156 fn test_retractable_last_value_reset() {
2157 let mut acc = RetractableLastValueAccumulator::new();
2158 acc.add((100, 10));
2159 acc.reset();
2160 assert!(acc.is_empty());
2161 }
2162
2163 #[test]
2166 fn test_retractable_first_value_f64_basic() {
2167 let mut acc = RetractableFirstValueF64Accumulator::new();
2168 acc.add((200, 20.5));
2169 acc.add((100, 10.5));
2170 acc.add((300, 30.5));
2171
2172 assert_eq!(acc.len(), 3);
2173 let result = acc.result_f64().unwrap();
2175 assert!((result - 10.5).abs() < f64::EPSILON);
2176 }
2177
2178 #[test]
2179 fn test_retractable_first_value_f64_retract() {
2180 let mut acc = RetractableFirstValueF64Accumulator::new();
2181 acc.add((100, 10.5));
2182 acc.add((200, 20.5));
2183
2184 acc.retract(&(100, 10.5));
2186 let result = acc.result_f64().unwrap();
2187 assert!((result - 20.5).abs() < f64::EPSILON);
2188 }
2189
2190 #[test]
2191 fn test_retractable_last_value_f64_basic() {
2192 let mut acc = RetractableLastValueF64Accumulator::new();
2193 acc.add((100, 10.5));
2194 acc.add((300, 30.5));
2195 acc.add((200, 20.5));
2196
2197 let result = acc.result_f64().unwrap();
2198 assert!((result - 30.5).abs() < f64::EPSILON);
2199 }
2200
2201 #[test]
2202 fn test_retractable_last_value_f64_retract() {
2203 let mut acc = RetractableLastValueF64Accumulator::new();
2204 acc.add((100, 10.5));
2205 acc.add((200, 20.5));
2206 acc.add((300, 30.5));
2207
2208 acc.retract(&(300, 30.5));
2209 let result = acc.result_f64().unwrap();
2210 assert!((result - 20.5).abs() < f64::EPSILON);
2211 }
2212
2213 #[test]
2214 fn test_retractable_first_value_f64_merge() {
2215 let mut acc1 = RetractableFirstValueF64Accumulator::new();
2216 let mut acc2 = RetractableFirstValueF64Accumulator::new();
2217 acc1.add((200, 20.5));
2218 acc2.add((100, 10.5));
2219 acc1.merge(&acc2);
2220 let result = acc1.result_f64().unwrap();
2221 assert!((result - 10.5).abs() < f64::EPSILON);
2222 }
2223
2224 #[test]
2225 fn test_retractable_last_value_f64_merge() {
2226 let mut acc1 = RetractableLastValueF64Accumulator::new();
2227 let mut acc2 = RetractableLastValueF64Accumulator::new();
2228 acc1.add((100, 10.5));
2229 acc2.add((300, 30.5));
2230 acc1.merge(&acc2);
2231 let result = acc1.result_f64().unwrap();
2232 assert!((result - 30.5).abs() < f64::EPSILON);
2233 }
2234
2235 #[test]
2238 fn test_retractable_first_value_single_entry() {
2239 let mut acc = RetractableFirstValueAccumulator::new();
2240 acc.add((100, 42));
2241 assert_eq!(acc.result(), Some(42));
2242 acc.retract(&(100, 42));
2243 assert_eq!(acc.result(), None);
2244 }
2245
2246 #[test]
2247 fn test_retractable_last_value_single_entry() {
2248 let mut acc = RetractableLastValueAccumulator::new();
2249 acc.add((100, 42));
2250 assert_eq!(acc.result(), Some(42));
2251 acc.retract(&(100, 42));
2252 assert_eq!(acc.result(), None);
2253 }
2254
2255 #[test]
2256 fn test_retractable_first_value_negative_values() {
2257 let mut acc = RetractableFirstValueAccumulator::new();
2258 acc.add((100, -10));
2259 acc.add((200, -20));
2260 assert_eq!(acc.result(), Some(-10));
2261 }
2262
2263 #[test]
2264 fn test_retractable_supports_efficient_retraction() {
2265 let acc = RetractableFirstValueAccumulator::new();
2266 assert!(acc.supports_efficient_retraction());
2267
2268 let acc2 = RetractableLastValueAccumulator::new();
2269 assert!(acc2.supports_efficient_retraction());
2270
2271 let acc3 = RetractableFirstValueF64Accumulator::new();
2272 assert!(acc3.supports_efficient_retraction());
2273
2274 let acc4 = RetractableLastValueF64Accumulator::new();
2275 assert!(acc4.supports_efficient_retraction());
2276 }
2277
2278 #[test]
2281 fn test_ohlc_retraction_simulation() {
2282 let mut open_acc = RetractableFirstValueAccumulator::new();
2285 let mut close_acc = RetractableLastValueAccumulator::new();
2286
2287 open_acc.add((1000, 100));
2289 close_acc.add((1000, 100));
2290
2291 open_acc.add((2000, 105));
2293 close_acc.add((2000, 105));
2294
2295 open_acc.add((3000, 98));
2297 close_acc.add((3000, 98));
2298
2299 assert_eq!(open_acc.result(), Some(100)); assert_eq!(close_acc.result(), Some(98)); open_acc.retract(&(1000, 100));
2304 close_acc.retract(&(1000, 100));
2305
2306 assert_eq!(open_acc.result(), Some(105));
2308 assert_eq!(close_acc.result(), Some(98));
2310 }
2311
2312 #[test]
2313 fn test_ohlc_retraction_f64_simulation() {
2314 let mut open_acc = RetractableFirstValueF64Accumulator::new();
2315 let mut close_acc = RetractableLastValueF64Accumulator::new();
2316
2317 open_acc.add((1000, 100.50));
2318 close_acc.add((1000, 100.50));
2319 open_acc.add((2000, 105.25));
2320 close_acc.add((2000, 105.25));
2321 open_acc.add((3000, 98.75));
2322 close_acc.add((3000, 98.75));
2323
2324 let open = open_acc.result_f64().unwrap();
2325 let close = close_acc.result_f64().unwrap();
2326 assert!((open - 100.50).abs() < f64::EPSILON);
2327 assert!((close - 98.75).abs() < f64::EPSILON);
2328
2329 open_acc.retract(&(1000, 100.50));
2331 close_acc.retract(&(1000, 100.50));
2332
2333 let open2 = open_acc.result_f64().unwrap();
2334 assert!((open2 - 105.25).abs() < f64::EPSILON);
2335 }
2336}