Skip to main content

laminar_core/operator/
changelog.rs

1//! # Changelog and Retraction Support (F063)
2//!
3//! Z-set style changelog records with integer weights for incremental computation.
4//! This is the foundation for exactly-once sinks, cascading materialized views,
5//! and CDC connectors.
6//!
7//! ## Key Concepts
8//!
9//! - **Z-sets**: Elements have integer weights. Weight > 0 → insert, weight < 0 → delete.
10//! - **Retraction**: Emitting (-old, +new) pairs to correct previous results.
11//! - **CDC Envelope**: Debezium-compatible format for downstream systems.
12//!
13//! ## Ring Architecture
14//!
15//! - **Ring 0**: `ChangelogRef` and `ChangelogBuffer` for zero-allocation hot path
16//! - **Ring 1**: `LateDataRetractionGenerator` and `CdcEnvelope` serialization
17//! - **Ring 2**: Changelog configuration and CDC format selection
18//!
19//! ## Example
20//!
21//! ```rust,no_run
22//! use laminar_core::operator::changelog::{
23//!     ChangelogBuffer, ChangelogRef, RetractableCountAccumulator,
24//!     RetractableAccumulator, CdcEnvelope, CdcSource,
25//! };
26//! use laminar_core::operator::window::CdcOperation;
27//!
28//! // Ring 0: Zero-allocation changelog tracking
29//! let mut buffer = ChangelogBuffer::with_capacity(1024);
30//! buffer.push(ChangelogRef::insert(0, 0));
31//! buffer.push(ChangelogRef::delete(0, 1));
32//!
33//! // Ring 1: Retractable aggregation
34//! let mut agg = RetractableCountAccumulator::default();
35//! agg.add(());
36//! agg.add(());
37//! assert_eq!(agg.result(), 2);
38//! agg.retract(&());
39//! assert_eq!(agg.result(), 1);
40//!
41//! // CDC envelope for sinks
42//! let source = CdcSource::new("laminardb", "default", "orders");
43//! let envelope = CdcEnvelope::insert(serde_json::json!({"id": 1}), source, 1000);
44//! ```
45
46use super::window::{CdcOperation, WindowId};
47use fxhash::FxHashMap;
48use serde::{Deserialize, Serialize};
49
50// Ring 0: Zero-Allocation Types
51
52/// Zero-allocation changelog reference for Ring 0 hot path.
53///
54/// Instead of allocating a full `ChangelogRecord`, this stores
55/// offsets into the event batch with the operation type.
56///
57/// Size: 12 bytes (u32 + u32 + i16 + u8 + padding)
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59#[repr(C)]
60pub struct ChangelogRef {
61    /// Offset into the current batch
62    pub batch_offset: u32,
63    /// Row index within the batch
64    pub row_index: u32,
65    /// Z-set weight (+1 or -1)
66    pub weight: i16,
67    /// Operation type (stored as u8 for compactness)
68    operation_raw: u8,
69}
70
71impl ChangelogRef {
72    /// Creates a new changelog reference.
73    #[inline]
74    #[must_use]
75    pub fn new(batch_offset: u32, row_index: u32, weight: i16, operation: CdcOperation) -> Self {
76        Self {
77            batch_offset,
78            row_index,
79            weight,
80            operation_raw: operation.to_u8(),
81        }
82    }
83
84    /// Creates an insert reference.
85    #[inline]
86    #[must_use]
87    pub fn insert(batch_offset: u32, row_index: u32) -> Self {
88        Self::new(batch_offset, row_index, 1, CdcOperation::Insert)
89    }
90
91    /// Creates a delete reference.
92    #[inline]
93    #[must_use]
94    pub fn delete(batch_offset: u32, row_index: u32) -> Self {
95        Self::new(batch_offset, row_index, -1, CdcOperation::Delete)
96    }
97
98    /// Creates an update-before reference (retraction).
99    #[inline]
100    #[must_use]
101    pub fn update_before(batch_offset: u32, row_index: u32) -> Self {
102        Self::new(batch_offset, row_index, -1, CdcOperation::UpdateBefore)
103    }
104
105    /// Creates an update-after reference.
106    #[inline]
107    #[must_use]
108    pub fn update_after(batch_offset: u32, row_index: u32) -> Self {
109        Self::new(batch_offset, row_index, 1, CdcOperation::UpdateAfter)
110    }
111
112    /// Returns the CDC operation type.
113    #[inline]
114    #[must_use]
115    pub fn operation(&self) -> CdcOperation {
116        CdcOperation::from_u8(self.operation_raw)
117    }
118
119    /// Returns true if this is an insert-type operation.
120    #[inline]
121    #[must_use]
122    pub fn is_insert(&self) -> bool {
123        self.weight > 0
124    }
125
126    /// Returns true if this is a delete-type operation.
127    #[inline]
128    #[must_use]
129    pub fn is_delete(&self) -> bool {
130        self.weight < 0
131    }
132}
133
134/// Ring 0 changelog buffer (pre-allocated, reused per epoch).
135///
136/// This buffer stores changelog references without allocating on the hot path
137/// (after initial warmup). When the buffer is full, it signals backpressure.
138///
139/// # Example
140///
141/// ```rust,no_run
142/// use laminar_core::operator::changelog::{ChangelogBuffer, ChangelogRef};
143///
144/// let mut buffer = ChangelogBuffer::with_capacity(1024);
145///
146/// // Push references (no allocation after warmup)
147/// for i in 0..100 {
148///     buffer.push(ChangelogRef::insert(i, 0));
149/// }
150///
151/// // Drain for Ring 1 processing
152/// for changelog_ref in buffer.drain() {
153///     // Process in Ring 1
154/// }
155/// ```
156pub struct ChangelogBuffer {
157    /// Pre-allocated changelog references
158    refs: Vec<ChangelogRef>,
159    /// Current write position
160    len: usize,
161    /// Capacity
162    capacity: usize,
163}
164
165impl ChangelogBuffer {
166    /// Creates a new buffer with the given capacity.
167    #[must_use]
168    pub fn with_capacity(capacity: usize) -> Self {
169        let mut refs = Vec::with_capacity(capacity);
170        // Pre-warm the buffer to avoid allocations during hot path
171        refs.resize(
172            capacity,
173            ChangelogRef {
174                batch_offset: 0,
175                row_index: 0,
176                weight: 0,
177                operation_raw: 0,
178            },
179        );
180        Self {
181            refs,
182            len: 0,
183            capacity,
184        }
185    }
186
187    /// Pushes a changelog reference (no allocation if under capacity).
188    ///
189    /// Returns `true` if the reference was added, `false` if buffer is full
190    /// (backpressure signal).
191    #[inline]
192    pub fn push(&mut self, changelog_ref: ChangelogRef) -> bool {
193        if self.len < self.capacity {
194            self.refs[self.len] = changelog_ref;
195            self.len += 1;
196            true
197        } else {
198            false // Buffer full - backpressure signal
199        }
200    }
201
202    /// Pushes a retraction pair (update-before, update-after).
203    ///
204    /// Returns `true` if both references were added, `false` if buffer is full.
205    #[inline]
206    pub fn push_retraction(
207        &mut self,
208        batch_offset: u32,
209        old_row_index: u32,
210        new_row_index: u32,
211    ) -> bool {
212        if self.len + 2 <= self.capacity {
213            self.refs[self.len] = ChangelogRef::update_before(batch_offset, old_row_index);
214            self.refs[self.len + 1] = ChangelogRef::update_after(batch_offset, new_row_index);
215            self.len += 2;
216            true
217        } else {
218            false
219        }
220    }
221
222    /// Drains references for Ring 1 processing.
223    ///
224    /// After draining, the buffer is empty but retains its capacity.
225    pub fn drain(&mut self) -> impl Iterator<Item = ChangelogRef> + '_ {
226        let len = self.len;
227        self.len = 0;
228        self.refs[..len].iter().copied()
229    }
230
231    /// Returns current count of references.
232    #[inline]
233    #[must_use]
234    pub fn len(&self) -> usize {
235        self.len
236    }
237
238    /// Returns true if the buffer is empty.
239    #[inline]
240    #[must_use]
241    pub fn is_empty(&self) -> bool {
242        self.len == 0
243    }
244
245    /// Returns true if the buffer is full.
246    #[inline]
247    #[must_use]
248    pub fn is_full(&self) -> bool {
249        self.len >= self.capacity
250    }
251
252    /// Returns the buffer capacity.
253    #[inline]
254    #[must_use]
255    pub fn capacity(&self) -> usize {
256        self.capacity
257    }
258
259    /// Returns available space in the buffer.
260    #[inline]
261    #[must_use]
262    pub fn available(&self) -> usize {
263        self.capacity.saturating_sub(self.len)
264    }
265
266    /// Clears the buffer without deallocating.
267    #[inline]
268    pub fn clear(&mut self) {
269        self.len = 0;
270    }
271
272    /// Returns a slice of the current references.
273    #[must_use]
274    pub fn as_slice(&self) -> &[ChangelogRef] {
275        &self.refs[..self.len]
276    }
277}
278
279impl Default for ChangelogBuffer {
280    fn default() -> Self {
281        Self::with_capacity(1024)
282    }
283}
284
285// Retractable Aggregators
286
287/// Extension trait for accumulators that support retractions.
288///
289/// Retractable accumulators can "un-apply" a value, which is essential for:
290/// - Late data corrections (emit -old, +new pairs)
291/// - Cascading materialized views
292/// - Changelog-based downstream consumers
293///
294/// # Retraction Efficiency
295///
296/// Some aggregators support O(1) retraction (count, sum, avg), while others
297/// may require O(n) recomputation (min, max without value tracking).
298/// Use `supports_efficient_retraction()` to check.
299pub trait RetractableAccumulator: Default + Clone + Send {
300    /// The input type for the aggregation.
301    type Input;
302    /// The output type produced by the aggregation.
303    type Output;
304
305    /// Adds a value to the accumulator.
306    fn add(&mut self, value: Self::Input);
307
308    /// Retracts (un-applies) a value from the accumulator.
309    ///
310    /// This is the inverse of `add`. For example:
311    /// - Count: decrement by 1
312    /// - Sum: subtract the value
313    /// - Avg: update sum and count
314    fn retract(&mut self, value: &Self::Input);
315
316    /// Merges another accumulator into this one.
317    fn merge(&mut self, other: &Self);
318
319    /// Extracts the final result from the accumulator.
320    fn result(&self) -> Self::Output;
321
322    /// Returns true if the accumulator is empty (no values added).
323    fn is_empty(&self) -> bool;
324
325    /// Returns true if this accumulator can efficiently retract.
326    ///
327    /// Some aggregators (like Min/Max without value tracking) may need to
328    /// scan all values on retraction if the retracted value was the current
329    /// min/max.
330    fn supports_efficient_retraction(&self) -> bool {
331        true
332    }
333
334    /// Resets the accumulator to its initial state.
335    fn reset(&mut self);
336}
337
338/// Retractable count accumulator.
339///
340/// Uses signed integer to support negative counts from retractions.
341/// In a correct pipeline, the count should never go negative.
342#[derive(Debug, Clone, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
343pub struct RetractableCountAccumulator {
344    /// Signed count to support retraction
345    count: i64,
346}
347
348impl RetractableCountAccumulator {
349    /// Creates a new count accumulator.
350    #[must_use]
351    pub fn new() -> Self {
352        Self::default()
353    }
354
355    /// Returns the current count (may be negative during retraction).
356    #[must_use]
357    pub fn count(&self) -> i64 {
358        self.count
359    }
360}
361
362impl RetractableAccumulator for RetractableCountAccumulator {
363    type Input = ();
364    type Output = i64;
365
366    #[inline]
367    fn add(&mut self, _value: ()) {
368        self.count += 1;
369    }
370
371    #[inline]
372    fn retract(&mut self, _value: &()) {
373        self.count -= 1;
374    }
375
376    fn merge(&mut self, other: &Self) {
377        self.count += other.count;
378    }
379
380    fn result(&self) -> i64 {
381        self.count
382    }
383
384    fn is_empty(&self) -> bool {
385        self.count == 0
386    }
387
388    fn reset(&mut self) {
389        self.count = 0;
390    }
391}
392
393/// Retractable sum accumulator.
394///
395/// Supports O(1) retraction by simple subtraction.
396#[derive(Debug, Clone, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
397pub struct RetractableSumAccumulator {
398    /// Running sum (signed)
399    sum: i64,
400    /// Count of values for `is_empty` check
401    count: i64,
402}
403
404impl RetractableSumAccumulator {
405    /// Creates a new sum accumulator.
406    #[must_use]
407    pub fn new() -> Self {
408        Self::default()
409    }
410
411    /// Returns the current sum.
412    #[must_use]
413    pub fn sum(&self) -> i64 {
414        self.sum
415    }
416}
417
418impl RetractableAccumulator for RetractableSumAccumulator {
419    type Input = i64;
420    type Output = i64;
421
422    #[inline]
423    fn add(&mut self, value: i64) {
424        self.sum += value;
425        self.count += 1;
426    }
427
428    #[inline]
429    fn retract(&mut self, value: &i64) {
430        self.sum -= value;
431        self.count -= 1;
432    }
433
434    fn merge(&mut self, other: &Self) {
435        self.sum += other.sum;
436        self.count += other.count;
437    }
438
439    fn result(&self) -> i64 {
440        self.sum
441    }
442
443    fn is_empty(&self) -> bool {
444        self.count == 0
445    }
446
447    fn reset(&mut self) {
448        self.sum = 0;
449        self.count = 0;
450    }
451}
452
453/// Retractable average accumulator.
454///
455/// Supports O(1) retraction by updating sum and count.
456#[derive(Debug, Clone, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
457pub struct RetractableAvgAccumulator {
458    /// Running sum
459    sum: i64,
460    /// Count of values
461    count: i64,
462}
463
464impl RetractableAvgAccumulator {
465    /// Creates a new average accumulator.
466    #[must_use]
467    pub fn new() -> Self {
468        Self::default()
469    }
470
471    /// Returns the current sum.
472    #[must_use]
473    pub fn sum(&self) -> i64 {
474        self.sum
475    }
476
477    /// Returns the current count.
478    #[must_use]
479    pub fn count(&self) -> i64 {
480        self.count
481    }
482}
483
484impl RetractableAccumulator for RetractableAvgAccumulator {
485    type Input = i64;
486    type Output = Option<f64>;
487
488    #[inline]
489    fn add(&mut self, value: i64) {
490        self.sum += value;
491        self.count += 1;
492    }
493
494    #[inline]
495    fn retract(&mut self, value: &i64) {
496        self.sum -= value;
497        self.count -= 1;
498    }
499
500    fn merge(&mut self, other: &Self) {
501        self.sum += other.sum;
502        self.count += other.count;
503    }
504
505    #[allow(clippy::cast_precision_loss)]
506    fn result(&self) -> Option<f64> {
507        if self.count > 0 {
508            Some(self.sum as f64 / self.count as f64)
509        } else {
510            None
511        }
512    }
513
514    fn is_empty(&self) -> bool {
515        self.count == 0
516    }
517
518    fn reset(&mut self) {
519        self.sum = 0;
520        self.count = 0;
521    }
522}
523
524/// Retractable min accumulator with value tracking.
525///
526/// This accumulator tracks all values to support efficient retraction.
527/// When the current minimum is retracted, it recomputes from remaining values.
528///
529/// Note: This uses more memory than the basic `MinAccumulator` because it
530/// stores all values. Use only when retraction support is required.
531#[derive(Debug, Clone, Default)]
532pub struct RetractableMinAccumulator {
533    /// All values (for recomputation on retraction)
534    values: Vec<i64>,
535    /// Cached minimum
536    cached_min: Option<i64>,
537}
538
539impl RetractableMinAccumulator {
540    /// Creates a new min accumulator.
541    #[must_use]
542    pub fn new() -> Self {
543        Self::default()
544    }
545
546    fn recompute_min(&mut self) {
547        self.cached_min = self.values.iter().copied().min();
548    }
549}
550
551impl RetractableAccumulator for RetractableMinAccumulator {
552    type Input = i64;
553    type Output = Option<i64>;
554
555    fn add(&mut self, value: i64) {
556        self.values.push(value);
557        self.cached_min = Some(self.cached_min.map_or(value, |m| m.min(value)));
558    }
559
560    fn retract(&mut self, value: &i64) {
561        if let Some(pos) = self.values.iter().position(|v| v == value) {
562            self.values.swap_remove(pos);
563            // Recompute if we removed the minimum
564            if self.cached_min == Some(*value) {
565                self.recompute_min();
566            }
567        }
568    }
569
570    fn merge(&mut self, other: &Self) {
571        self.values.extend(&other.values);
572        self.recompute_min();
573    }
574
575    fn result(&self) -> Option<i64> {
576        self.cached_min
577    }
578
579    fn is_empty(&self) -> bool {
580        self.values.is_empty()
581    }
582
583    fn supports_efficient_retraction(&self) -> bool {
584        // Retraction may require O(n) recomputation
585        false
586    }
587
588    fn reset(&mut self) {
589        self.values.clear();
590        self.cached_min = None;
591    }
592}
593
594/// Retractable max accumulator with value tracking.
595///
596/// Similar to `RetractableMinAccumulator`, tracks all values for retraction support.
597#[derive(Debug, Clone, Default)]
598pub struct RetractableMaxAccumulator {
599    /// All values (for recomputation on retraction)
600    values: Vec<i64>,
601    /// Cached maximum
602    cached_max: Option<i64>,
603}
604
605impl RetractableMaxAccumulator {
606    /// Creates a new max accumulator.
607    #[must_use]
608    pub fn new() -> Self {
609        Self::default()
610    }
611
612    fn recompute_max(&mut self) {
613        self.cached_max = self.values.iter().copied().max();
614    }
615}
616
617impl RetractableAccumulator for RetractableMaxAccumulator {
618    type Input = i64;
619    type Output = Option<i64>;
620
621    fn add(&mut self, value: i64) {
622        self.values.push(value);
623        self.cached_max = Some(self.cached_max.map_or(value, |m| m.max(value)));
624    }
625
626    fn retract(&mut self, value: &i64) {
627        if let Some(pos) = self.values.iter().position(|v| v == value) {
628            self.values.swap_remove(pos);
629            // Recompute if we removed the maximum
630            if self.cached_max == Some(*value) {
631                self.recompute_max();
632            }
633        }
634    }
635
636    fn merge(&mut self, other: &Self) {
637        self.values.extend(&other.values);
638        self.recompute_max();
639    }
640
641    fn result(&self) -> Option<i64> {
642        self.cached_max
643    }
644
645    fn is_empty(&self) -> bool {
646        self.values.is_empty()
647    }
648
649    fn supports_efficient_retraction(&self) -> bool {
650        // Retraction may require O(n) recomputation
651        false
652    }
653
654    fn reset(&mut self) {
655        self.values.clear();
656        self.cached_max = None;
657    }
658}
659
660// Late Data Retraction Generator
661
662/// Tracks previously emitted results for generating late data retractions.
663#[derive(Debug, Clone)]
664struct EmittedResult {
665    /// The emitted data (serialized for comparison)
666    data: Vec<u8>,
667    /// Timestamp when emitted
668    emit_time: i64,
669    /// Number of times re-emitted (for metrics)
670    version: u32,
671}
672
673/// Generates retractions for late data corrections.
674///
675/// When late data arrives and updates an already-emitted window result,
676/// this generator produces:
677/// 1. A retraction (-1 weight) for the old result
678/// 2. An insert (+1 weight) for the new result
679///
680/// # Example
681///
682/// ```rust,no_run
683/// use laminar_core::operator::changelog::LateDataRetractionGenerator;
684/// use laminar_core::operator::window::WindowId;
685///
686/// let mut gen = LateDataRetractionGenerator::new(true);
687/// let window_id = WindowId::new(0, 60000);
688///
689/// // First emission - no retraction needed
690/// let result1 = gen.check_retraction(&window_id, b"count=5", 1000);
691/// assert!(result1.is_none());
692///
693/// // Late data changes result - generates retraction
694/// let result2 = gen.check_retraction(&window_id, b"count=7", 2000);
695/// assert!(result2.is_some());
696/// let (old, new) = result2.unwrap();
697/// assert_eq!(old.as_slice(), b"count=5");
698/// assert_eq!(new.as_slice(), b"count=7");
699///
700/// // Same result - no retraction
701/// let result3 = gen.check_retraction(&window_id, b"count=7", 3000);
702/// assert!(result3.is_none());
703/// ```
704pub struct LateDataRetractionGenerator {
705    /// Previously emitted results (for generating retractions)
706    emitted_results: FxHashMap<WindowId, EmittedResult>,
707    /// Whether retraction generation is enabled
708    enabled: bool,
709    /// Metrics: total retractions generated
710    retractions_generated: u64,
711    /// Metrics: total windows tracked
712    windows_tracked: u64,
713}
714
715impl LateDataRetractionGenerator {
716    /// Creates a new generator.
717    #[must_use]
718    pub fn new(enabled: bool) -> Self {
719        Self {
720            emitted_results: FxHashMap::default(),
721            enabled,
722            retractions_generated: 0,
723            windows_tracked: 0,
724        }
725    }
726
727    /// Creates a disabled generator (no-op).
728    #[must_use]
729    pub fn disabled() -> Self {
730        Self::new(false)
731    }
732
733    /// Returns true if retraction generation is enabled.
734    #[must_use]
735    pub fn is_enabled(&self) -> bool {
736        self.enabled
737    }
738
739    /// Enables or disables retraction generation.
740    pub fn set_enabled(&mut self, enabled: bool) {
741        self.enabled = enabled;
742    }
743
744    /// Checks if we need to generate a retraction for this window.
745    ///
746    /// Returns `Some((old_data, new_data))` if the window was previously
747    /// emitted with different data. Returns `None` if this is the first
748    /// emission or the data hasn't changed.
749    pub fn check_retraction(
750        &mut self,
751        window_id: &WindowId,
752        new_data: &[u8],
753        timestamp: i64,
754    ) -> Option<(Vec<u8>, Vec<u8>)> {
755        if !self.enabled {
756            return None;
757        }
758
759        if let Some(prev) = self.emitted_results.get_mut(window_id) {
760            if prev.data != new_data {
761                let old_data = std::mem::replace(&mut prev.data, new_data.to_vec());
762                prev.emit_time = timestamp;
763                prev.version += 1;
764                self.retractions_generated += 1;
765                return Some((old_data, new_data.to_vec()));
766            }
767        } else {
768            self.emitted_results.insert(
769                *window_id,
770                EmittedResult {
771                    data: new_data.to_vec(),
772                    emit_time: timestamp,
773                    version: 1,
774                },
775            );
776            self.windows_tracked += 1;
777        }
778
779        None
780    }
781
782    /// Checks for retraction and returns borrowed slices (avoiding allocation
783    /// when no retraction is needed).
784    ///
785    /// Returns `Some(old_data)` if retraction is needed. The caller should
786    /// then emit the retraction for `old_data` and insert for `new_data`.
787    pub fn check_retraction_ref(
788        &mut self,
789        window_id: &WindowId,
790        new_data: &[u8],
791        timestamp: i64,
792    ) -> Option<Vec<u8>> {
793        if !self.enabled {
794            return None;
795        }
796
797        if let Some(prev) = self.emitted_results.get_mut(window_id) {
798            if prev.data != new_data {
799                let old_data = std::mem::replace(&mut prev.data, new_data.to_vec());
800                prev.emit_time = timestamp;
801                prev.version += 1;
802                self.retractions_generated += 1;
803                return Some(old_data);
804            }
805        } else {
806            self.emitted_results.insert(
807                *window_id,
808                EmittedResult {
809                    data: new_data.to_vec(),
810                    emit_time: timestamp,
811                    version: 1,
812                },
813            );
814            self.windows_tracked += 1;
815        }
816
817        None
818    }
819
820    /// Cleans up state for closed windows.
821    ///
822    /// Call this when a window is closed to prevent unbounded memory growth.
823    pub fn cleanup_window(&mut self, window_id: &WindowId) {
824        self.emitted_results.remove(window_id);
825    }
826
827    /// Cleans up state for windows that ended before the given watermark.
828    ///
829    /// This should be called periodically to bound memory usage.
830    pub fn cleanup_before_watermark(&mut self, watermark: i64) {
831        self.emitted_results
832            .retain(|window_id, _| window_id.end > watermark);
833    }
834
835    /// Returns the number of retractions generated.
836    #[must_use]
837    pub fn retractions_generated(&self) -> u64 {
838        self.retractions_generated
839    }
840
841    /// Returns the number of windows currently being tracked.
842    #[must_use]
843    pub fn windows_tracked(&self) -> usize {
844        self.emitted_results.len()
845    }
846
847    /// Resets all metrics.
848    pub fn reset_metrics(&mut self) {
849        self.retractions_generated = 0;
850        self.windows_tracked = 0;
851    }
852
853    /// Clears all tracked state.
854    pub fn clear(&mut self) {
855        self.emitted_results.clear();
856        self.reset_metrics();
857    }
858}
859
860impl Default for LateDataRetractionGenerator {
861    fn default() -> Self {
862        Self::new(true)
863    }
864}
865
866// CDC Envelope (Debezium-Compatible)
867
868/// Source metadata for CDC envelope.
869///
870/// Contains information about the origin of the change event.
871#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
872pub struct CdcSource {
873    /// Source name (e.g., "laminardb")
874    pub name: String,
875    /// Database/schema name
876    pub db: String,
877    /// Table/view name
878    pub table: String,
879    /// Sequence number for ordering
880    #[serde(default)]
881    pub sequence: u64,
882}
883
884impl CdcSource {
885    /// Creates a new CDC source.
886    #[must_use]
887    pub fn new(name: impl Into<String>, db: impl Into<String>, table: impl Into<String>) -> Self {
888        Self {
889            name: name.into(),
890            db: db.into(),
891            table: table.into(),
892            sequence: 0,
893        }
894    }
895
896    /// Creates a new CDC source with sequence number.
897    #[must_use]
898    pub fn with_sequence(
899        name: impl Into<String>,
900        db: impl Into<String>,
901        table: impl Into<String>,
902        sequence: u64,
903    ) -> Self {
904        Self {
905            name: name.into(),
906            db: db.into(),
907            table: table.into(),
908            sequence,
909        }
910    }
911
912    /// Increments and returns the sequence number.
913    pub fn next_sequence(&mut self) -> u64 {
914        self.sequence += 1;
915        self.sequence
916    }
917}
918
919/// CDC envelope for sink serialization.
920///
921/// Compatible with Debezium envelope format for interoperability with
922/// downstream systems (Kafka Connect, data lakes, etc.).
923///
924/// # Debezium Operation Codes
925///
926/// - `"c"`: Create (insert)
927/// - `"u"`: Update
928/// - `"d"`: Delete
929/// - `"r"`: Read (snapshot)
930///
931/// # Example
932///
933/// ```rust,no_run
934/// use laminar_core::operator::changelog::{CdcEnvelope, CdcSource};
935/// use serde_json::json;
936///
937/// let source = CdcSource::new("laminardb", "default", "orders");
938///
939/// // Insert
940/// let insert = CdcEnvelope::insert(json!({"id": 1, "amount": 100}), source.clone(), 1000);
941/// assert_eq!(insert.op, "c");
942///
943/// // Delete
944/// let delete = CdcEnvelope::delete(json!({"id": 1}), source.clone(), 2000);
945/// assert_eq!(delete.op, "d");
946///
947/// // Update
948/// let update = CdcEnvelope::update(
949///     json!({"id": 1, "amount": 100}),
950///     json!({"id": 1, "amount": 150}),
951///     source,
952///     3000,
953/// );
954/// assert_eq!(update.op, "u");
955/// ```
956#[derive(Debug, Clone, Serialize, Deserialize)]
957pub struct CdcEnvelope<T> {
958    /// Operation type: "c" (create), "u" (update), "d" (delete), "r" (read/snapshot)
959    pub op: String,
960    /// Timestamp in milliseconds since epoch
961    pub ts_ms: i64,
962    /// Source metadata
963    pub source: CdcSource,
964    /// Value before change (for updates/deletes)
965    #[serde(skip_serializing_if = "Option::is_none")]
966    pub before: Option<T>,
967    /// Value after change (for inserts/updates)
968    #[serde(skip_serializing_if = "Option::is_none")]
969    pub after: Option<T>,
970}
971
972impl<T> CdcEnvelope<T> {
973    /// Creates an insert (create) envelope.
974    #[must_use]
975    pub fn insert(after: T, source: CdcSource, ts_ms: i64) -> Self {
976        Self {
977            op: "c".to_string(),
978            ts_ms,
979            source,
980            before: None,
981            after: Some(after),
982        }
983    }
984
985    /// Creates a delete envelope.
986    #[must_use]
987    pub fn delete(before: T, source: CdcSource, ts_ms: i64) -> Self {
988        Self {
989            op: "d".to_string(),
990            ts_ms,
991            source,
992            before: Some(before),
993            after: None,
994        }
995    }
996
997    /// Creates an update envelope.
998    #[must_use]
999    pub fn update(before: T, after: T, source: CdcSource, ts_ms: i64) -> Self {
1000        Self {
1001            op: "u".to_string(),
1002            ts_ms,
1003            source,
1004            before: Some(before),
1005            after: Some(after),
1006        }
1007    }
1008
1009    /// Creates a read (snapshot) envelope.
1010    #[must_use]
1011    pub fn read(after: T, source: CdcSource, ts_ms: i64) -> Self {
1012        Self {
1013            op: "r".to_string(),
1014            ts_ms,
1015            source,
1016            before: None,
1017            after: Some(after),
1018        }
1019    }
1020
1021    /// Returns true if this is an insert operation.
1022    #[must_use]
1023    pub fn is_insert(&self) -> bool {
1024        self.op == "c"
1025    }
1026
1027    /// Returns true if this is a delete operation.
1028    #[must_use]
1029    pub fn is_delete(&self) -> bool {
1030        self.op == "d"
1031    }
1032
1033    /// Returns true if this is an update operation.
1034    #[must_use]
1035    pub fn is_update(&self) -> bool {
1036        self.op == "u"
1037    }
1038
1039    /// Returns the Z-set weight for this operation.
1040    ///
1041    /// - Insert/Read: +1
1042    /// - Delete: -1
1043    /// - Update: 0 (net effect of -1 for before + +1 for after)
1044    #[must_use]
1045    pub fn weight(&self) -> i32 {
1046        match self.op.as_str() {
1047            "c" | "r" => 1,
1048            "d" => -1,
1049            // "u" (update) and unknown operations have net weight of 0
1050            _ => 0,
1051        }
1052    }
1053}
1054
1055impl<T: Serialize> CdcEnvelope<T> {
1056    /// Serializes the envelope to JSON.
1057    ///
1058    /// # Errors
1059    ///
1060    /// Returns an error if serialization fails.
1061    pub fn to_json(&self) -> Result<String, serde_json::Error> {
1062        serde_json::to_string(self)
1063    }
1064
1065    /// Serializes the envelope to pretty-printed JSON.
1066    ///
1067    /// # Errors
1068    ///
1069    /// Returns an error if serialization fails.
1070    pub fn to_json_pretty(&self) -> Result<String, serde_json::Error> {
1071        serde_json::to_string_pretty(self)
1072    }
1073
1074    /// Serializes the envelope to JSON bytes.
1075    ///
1076    /// # Errors
1077    ///
1078    /// Returns an error if serialization fails.
1079    pub fn to_json_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
1080        serde_json::to_vec(self)
1081    }
1082}
1083
1084// F076: Retractable FIRST/LAST Accumulators
1085
1086/// Retractable `FIRST_VALUE` accumulator for changelog/retraction mode.
1087///
1088/// Stores all `(timestamp, value)` entries sorted by timestamp ascending.
1089/// On retraction, removes the entry and recomputes the first value.
1090/// This is necessary for `EMIT CHANGES` with OHLC queries where the
1091/// open price may need to be retracted.
1092///
1093/// # Ring Architecture
1094///
1095/// This is a Ring 1 structure (allocates). Ring 0 uses the non-retractable
1096/// [`super::window::FirstValueAccumulator`] via the static dispatch path.
1097#[derive(Debug, Clone, Default)]
1098pub struct RetractableFirstValueAccumulator {
1099    /// Sorted entries: `(timestamp, value)`, ascending by timestamp
1100    entries: Vec<(i64, i64)>,
1101}
1102
1103impl RetractableFirstValueAccumulator {
1104    /// Creates a new empty accumulator.
1105    #[must_use]
1106    pub fn new() -> Self {
1107        Self::default()
1108    }
1109
1110    /// Returns the number of stored entries.
1111    #[must_use]
1112    pub fn len(&self) -> usize {
1113        self.entries.len()
1114    }
1115
1116    /// Returns true if no entries are stored.
1117    #[must_use]
1118    pub fn is_empty(&self) -> bool {
1119        self.entries.is_empty()
1120    }
1121}
1122
1123impl RetractableAccumulator for RetractableFirstValueAccumulator {
1124    type Input = (i64, i64); // (timestamp, value)
1125    type Output = Option<i64>;
1126
1127    fn add(&mut self, (timestamp, value): (i64, i64)) {
1128        // Insert in sorted order by timestamp, preserving insertion order
1129        // for duplicate timestamps (append after existing same-timestamp entries)
1130        let pos = match self.entries.binary_search_by_key(&timestamp, |(ts, _)| *ts) {
1131            Ok(mut p) => {
1132                // Skip past all entries with the same timestamp
1133                while p < self.entries.len() && self.entries[p].0 == timestamp {
1134                    p += 1;
1135                }
1136                p
1137            }
1138            Err(p) => p,
1139        };
1140        self.entries.insert(pos, (timestamp, value));
1141    }
1142
1143    fn retract(&mut self, (timestamp, value): &(i64, i64)) {
1144        // Find and remove the exact entry
1145        if let Some(pos) = self
1146            .entries
1147            .iter()
1148            .position(|(ts, val)| ts == timestamp && val == value)
1149        {
1150            self.entries.remove(pos);
1151        }
1152    }
1153
1154    fn merge(&mut self, other: &Self) {
1155        // Merge sorted lists
1156        let mut merged = Vec::with_capacity(self.entries.len() + other.entries.len());
1157        let mut i = 0;
1158        let mut j = 0;
1159        while i < self.entries.len() && j < other.entries.len() {
1160            if self.entries[i].0 <= other.entries[j].0 {
1161                merged.push(self.entries[i]);
1162                i += 1;
1163            } else {
1164                merged.push(other.entries[j]);
1165                j += 1;
1166            }
1167        }
1168        merged.extend_from_slice(&self.entries[i..]);
1169        merged.extend_from_slice(&other.entries[j..]);
1170        self.entries = merged;
1171    }
1172
1173    fn result(&self) -> Option<i64> {
1174        // First entry has the earliest timestamp
1175        self.entries.first().map(|(_, val)| *val)
1176    }
1177
1178    fn is_empty(&self) -> bool {
1179        self.entries.is_empty()
1180    }
1181
1182    fn supports_efficient_retraction(&self) -> bool {
1183        true
1184    }
1185
1186    fn reset(&mut self) {
1187        self.entries.clear();
1188    }
1189}
1190
1191/// Retractable `LAST_VALUE` accumulator for changelog/retraction mode.
1192///
1193/// Stores all `(timestamp, value)` entries sorted by timestamp ascending.
1194/// On retraction, removes the entry and recomputes the last value.
1195/// This is necessary for `EMIT CHANGES` with OHLC queries where the
1196/// close price may need to be retracted.
1197#[derive(Debug, Clone, Default)]
1198pub struct RetractableLastValueAccumulator {
1199    /// Sorted entries: `(timestamp, value)`, ascending by timestamp
1200    entries: Vec<(i64, i64)>,
1201}
1202
1203impl RetractableLastValueAccumulator {
1204    /// Creates a new empty accumulator.
1205    #[must_use]
1206    pub fn new() -> Self {
1207        Self::default()
1208    }
1209
1210    /// Returns the number of stored entries.
1211    #[must_use]
1212    pub fn len(&self) -> usize {
1213        self.entries.len()
1214    }
1215
1216    /// Returns true if no entries are stored.
1217    #[must_use]
1218    pub fn is_empty(&self) -> bool {
1219        self.entries.is_empty()
1220    }
1221}
1222
1223impl RetractableAccumulator for RetractableLastValueAccumulator {
1224    type Input = (i64, i64); // (timestamp, value)
1225    type Output = Option<i64>;
1226
1227    fn add(&mut self, (timestamp, value): (i64, i64)) {
1228        // Insert in sorted order by timestamp, preserving insertion order
1229        let pos = match self.entries.binary_search_by_key(&timestamp, |(ts, _)| *ts) {
1230            Ok(mut p) => {
1231                while p < self.entries.len() && self.entries[p].0 == timestamp {
1232                    p += 1;
1233                }
1234                p
1235            }
1236            Err(p) => p,
1237        };
1238        self.entries.insert(pos, (timestamp, value));
1239    }
1240
1241    fn retract(&mut self, (timestamp, value): &(i64, i64)) {
1242        if let Some(pos) = self
1243            .entries
1244            .iter()
1245            .position(|(ts, val)| ts == timestamp && val == value)
1246        {
1247            self.entries.remove(pos);
1248        }
1249    }
1250
1251    fn merge(&mut self, other: &Self) {
1252        let mut merged = Vec::with_capacity(self.entries.len() + other.entries.len());
1253        let mut i = 0;
1254        let mut j = 0;
1255        while i < self.entries.len() && j < other.entries.len() {
1256            if self.entries[i].0 <= other.entries[j].0 {
1257                merged.push(self.entries[i]);
1258                i += 1;
1259            } else {
1260                merged.push(other.entries[j]);
1261                j += 1;
1262            }
1263        }
1264        merged.extend_from_slice(&self.entries[i..]);
1265        merged.extend_from_slice(&other.entries[j..]);
1266        self.entries = merged;
1267    }
1268
1269    fn result(&self) -> Option<i64> {
1270        // Last entry has the latest timestamp
1271        self.entries.last().map(|(_, val)| *val)
1272    }
1273
1274    fn is_empty(&self) -> bool {
1275        self.entries.is_empty()
1276    }
1277
1278    fn supports_efficient_retraction(&self) -> bool {
1279        true
1280    }
1281
1282    fn reset(&mut self) {
1283        self.entries.clear();
1284    }
1285}
1286
1287/// Retractable `FIRST_VALUE` accumulator for f64 values.
1288///
1289/// Uses `f64::to_bits()` / `f64::from_bits()` for lossless i64 storage
1290/// within the sorted entry list.
1291#[derive(Debug, Clone, Default)]
1292pub struct RetractableFirstValueF64Accumulator {
1293    /// Sorted entries: `(timestamp, value_bits)`, ascending by timestamp
1294    entries: Vec<(i64, i64)>,
1295}
1296
1297impl RetractableFirstValueF64Accumulator {
1298    /// Creates a new empty accumulator.
1299    #[must_use]
1300    pub fn new() -> Self {
1301        Self::default()
1302    }
1303
1304    /// Returns the number of stored entries.
1305    #[must_use]
1306    pub fn len(&self) -> usize {
1307        self.entries.len()
1308    }
1309
1310    /// Returns true if no entries are stored.
1311    #[must_use]
1312    pub fn is_empty(&self) -> bool {
1313        self.entries.is_empty()
1314    }
1315
1316    /// Returns the result as f64.
1317    #[must_use]
1318    #[allow(clippy::cast_sign_loss)]
1319    pub fn result_f64(&self) -> Option<f64> {
1320        self.entries
1321            .first()
1322            .map(|(_, bits)| f64::from_bits(*bits as u64))
1323    }
1324}
1325
1326impl RetractableAccumulator for RetractableFirstValueF64Accumulator {
1327    type Input = (i64, f64); // (timestamp, value)
1328    type Output = Option<i64>; // value_bits for compatibility
1329
1330    #[allow(clippy::cast_possible_wrap)]
1331    fn add(&mut self, (timestamp, value): (i64, f64)) {
1332        let value_bits = value.to_bits() as i64;
1333        let pos = match self.entries.binary_search_by_key(&timestamp, |(ts, _)| *ts) {
1334            Ok(mut p) => {
1335                while p < self.entries.len() && self.entries[p].0 == timestamp {
1336                    p += 1;
1337                }
1338                p
1339            }
1340            Err(p) => p,
1341        };
1342        self.entries.insert(pos, (timestamp, value_bits));
1343    }
1344
1345    fn retract(&mut self, (timestamp, value): &(i64, f64)) {
1346        #[allow(clippy::cast_possible_wrap)]
1347        let value_bits = value.to_bits() as i64;
1348        if let Some(pos) = self
1349            .entries
1350            .iter()
1351            .position(|(ts, val)| *ts == *timestamp && *val == value_bits)
1352        {
1353            self.entries.remove(pos);
1354        }
1355    }
1356
1357    fn merge(&mut self, other: &Self) {
1358        let mut merged = Vec::with_capacity(self.entries.len() + other.entries.len());
1359        let mut i = 0;
1360        let mut j = 0;
1361        while i < self.entries.len() && j < other.entries.len() {
1362            if self.entries[i].0 <= other.entries[j].0 {
1363                merged.push(self.entries[i]);
1364                i += 1;
1365            } else {
1366                merged.push(other.entries[j]);
1367                j += 1;
1368            }
1369        }
1370        merged.extend_from_slice(&self.entries[i..]);
1371        merged.extend_from_slice(&other.entries[j..]);
1372        self.entries = merged;
1373    }
1374
1375    fn result(&self) -> Option<i64> {
1376        self.entries.first().map(|(_, val)| *val)
1377    }
1378
1379    fn is_empty(&self) -> bool {
1380        self.entries.is_empty()
1381    }
1382
1383    fn supports_efficient_retraction(&self) -> bool {
1384        true
1385    }
1386
1387    fn reset(&mut self) {
1388        self.entries.clear();
1389    }
1390}
1391
1392/// Retractable `LAST_VALUE` accumulator for f64 values.
1393///
1394/// Uses `f64::to_bits()` / `f64::from_bits()` for lossless i64 storage.
1395#[derive(Debug, Clone, Default)]
1396pub struct RetractableLastValueF64Accumulator {
1397    /// Sorted entries: `(timestamp, value_bits)`, ascending by timestamp
1398    entries: Vec<(i64, i64)>,
1399}
1400
1401impl RetractableLastValueF64Accumulator {
1402    /// Creates a new empty accumulator.
1403    #[must_use]
1404    pub fn new() -> Self {
1405        Self::default()
1406    }
1407
1408    /// Returns the number of stored entries.
1409    #[must_use]
1410    pub fn len(&self) -> usize {
1411        self.entries.len()
1412    }
1413
1414    /// Returns true if no entries are stored.
1415    #[must_use]
1416    pub fn is_empty(&self) -> bool {
1417        self.entries.is_empty()
1418    }
1419
1420    /// Returns the result as f64.
1421    #[must_use]
1422    #[allow(clippy::cast_sign_loss)]
1423    pub fn result_f64(&self) -> Option<f64> {
1424        self.entries
1425            .last()
1426            .map(|(_, bits)| f64::from_bits(*bits as u64))
1427    }
1428}
1429
1430impl RetractableAccumulator for RetractableLastValueF64Accumulator {
1431    type Input = (i64, f64); // (timestamp, value)
1432    type Output = Option<i64>; // value_bits for compatibility
1433
1434    #[allow(clippy::cast_possible_wrap)]
1435    fn add(&mut self, (timestamp, value): (i64, f64)) {
1436        let value_bits = value.to_bits() as i64;
1437        let pos = match self.entries.binary_search_by_key(&timestamp, |(ts, _)| *ts) {
1438            Ok(mut p) => {
1439                while p < self.entries.len() && self.entries[p].0 == timestamp {
1440                    p += 1;
1441                }
1442                p
1443            }
1444            Err(p) => p,
1445        };
1446        self.entries.insert(pos, (timestamp, value_bits));
1447    }
1448
1449    fn retract(&mut self, (timestamp, value): &(i64, f64)) {
1450        #[allow(clippy::cast_possible_wrap)]
1451        let value_bits = value.to_bits() as i64;
1452        if let Some(pos) = self
1453            .entries
1454            .iter()
1455            .position(|(ts, val)| *ts == *timestamp && *val == value_bits)
1456        {
1457            self.entries.remove(pos);
1458        }
1459    }
1460
1461    fn merge(&mut self, other: &Self) {
1462        let mut merged = Vec::with_capacity(self.entries.len() + other.entries.len());
1463        let mut i = 0;
1464        let mut j = 0;
1465        while i < self.entries.len() && j < other.entries.len() {
1466            if self.entries[i].0 <= other.entries[j].0 {
1467                merged.push(self.entries[i]);
1468                i += 1;
1469            } else {
1470                merged.push(other.entries[j]);
1471                j += 1;
1472            }
1473        }
1474        merged.extend_from_slice(&self.entries[i..]);
1475        merged.extend_from_slice(&other.entries[j..]);
1476        self.entries = merged;
1477    }
1478
1479    fn result(&self) -> Option<i64> {
1480        self.entries.last().map(|(_, val)| *val)
1481    }
1482
1483    fn is_empty(&self) -> bool {
1484        self.entries.is_empty()
1485    }
1486
1487    fn supports_efficient_retraction(&self) -> bool {
1488        true
1489    }
1490
1491    fn reset(&mut self) {
1492        self.entries.clear();
1493    }
1494}
1495
1496// Tests
1497
1498#[cfg(test)]
1499mod tests {
1500    use super::*;
1501
1502    // ChangelogRef Tests
1503
1504    #[test]
1505    fn test_changelog_ref_insert() {
1506        let cr = ChangelogRef::insert(10, 5);
1507        assert_eq!(cr.batch_offset, 10);
1508        assert_eq!(cr.row_index, 5);
1509        assert_eq!(cr.weight, 1);
1510        assert_eq!(cr.operation(), CdcOperation::Insert);
1511        assert!(cr.is_insert());
1512        assert!(!cr.is_delete());
1513    }
1514
1515    #[test]
1516    fn test_changelog_ref_delete() {
1517        let cr = ChangelogRef::delete(20, 3);
1518        assert_eq!(cr.batch_offset, 20);
1519        assert_eq!(cr.row_index, 3);
1520        assert_eq!(cr.weight, -1);
1521        assert_eq!(cr.operation(), CdcOperation::Delete);
1522        assert!(!cr.is_insert());
1523        assert!(cr.is_delete());
1524    }
1525
1526    #[test]
1527    fn test_changelog_ref_update() {
1528        let before = ChangelogRef::update_before(5, 1);
1529        let after = ChangelogRef::update_after(5, 2);
1530
1531        assert_eq!(before.weight, -1);
1532        assert_eq!(after.weight, 1);
1533        assert_eq!(before.operation(), CdcOperation::UpdateBefore);
1534        assert_eq!(after.operation(), CdcOperation::UpdateAfter);
1535    }
1536
1537    #[test]
1538    fn test_changelog_ref_size() {
1539        // Verify compact size
1540        assert!(std::mem::size_of::<ChangelogRef>() <= 16);
1541    }
1542
1543    // ChangelogBuffer Tests
1544
1545    #[test]
1546    fn test_changelog_buffer_basic() {
1547        let mut buffer = ChangelogBuffer::with_capacity(10);
1548        assert!(buffer.is_empty());
1549        assert_eq!(buffer.capacity(), 10);
1550
1551        assert!(buffer.push(ChangelogRef::insert(0, 0)));
1552        assert!(buffer.push(ChangelogRef::delete(1, 0)));
1553
1554        assert_eq!(buffer.len(), 2);
1555        assert!(!buffer.is_empty());
1556    }
1557
1558    #[test]
1559    fn test_changelog_buffer_full() {
1560        let mut buffer = ChangelogBuffer::with_capacity(2);
1561
1562        assert!(buffer.push(ChangelogRef::insert(0, 0)));
1563        assert!(buffer.push(ChangelogRef::insert(1, 0)));
1564        assert!(!buffer.push(ChangelogRef::insert(2, 0))); // Full
1565
1566        assert!(buffer.is_full());
1567        assert_eq!(buffer.available(), 0);
1568    }
1569
1570    #[test]
1571    fn test_changelog_buffer_drain() {
1572        let mut buffer = ChangelogBuffer::with_capacity(10);
1573
1574        for i in 0..5 {
1575            buffer.push(ChangelogRef::insert(i, 0));
1576        }
1577
1578        let drained: Vec<_> = buffer.drain().collect();
1579        assert_eq!(drained.len(), 5);
1580        assert!(buffer.is_empty());
1581
1582        // Buffer can be reused
1583        for i in 0..3 {
1584            buffer.push(ChangelogRef::delete(i, 0));
1585        }
1586        assert_eq!(buffer.len(), 3);
1587    }
1588
1589    #[test]
1590    fn test_changelog_buffer_retraction() {
1591        let mut buffer = ChangelogBuffer::with_capacity(10);
1592
1593        assert!(buffer.push_retraction(0, 1, 2));
1594        assert_eq!(buffer.len(), 2);
1595
1596        let refs: Vec<_> = buffer.as_slice().to_vec();
1597        assert_eq!(refs[0].operation(), CdcOperation::UpdateBefore);
1598        assert_eq!(refs[0].row_index, 1);
1599        assert_eq!(refs[1].operation(), CdcOperation::UpdateAfter);
1600        assert_eq!(refs[1].row_index, 2);
1601    }
1602
1603    #[test]
1604    fn test_changelog_buffer_zero_alloc_reuse() {
1605        let mut buffer = ChangelogBuffer::with_capacity(100);
1606
1607        // First pass
1608        for i in 0..50 {
1609            buffer.push(ChangelogRef::insert(i, 0));
1610        }
1611        let _: Vec<_> = buffer.drain().collect();
1612
1613        // Second pass - should not allocate
1614        for i in 0..50 {
1615            buffer.push(ChangelogRef::insert(i, 0));
1616        }
1617
1618        assert_eq!(buffer.len(), 50);
1619    }
1620
1621    // Retractable Accumulator Tests
1622
1623    #[test]
1624    fn test_retractable_count() {
1625        let mut agg = RetractableCountAccumulator::default();
1626
1627        agg.add(());
1628        agg.add(());
1629        agg.add(());
1630        assert_eq!(agg.result(), 3);
1631
1632        agg.retract(&());
1633        assert_eq!(agg.result(), 2);
1634
1635        agg.retract(&());
1636        agg.retract(&());
1637        assert_eq!(agg.result(), 0);
1638    }
1639
1640    #[test]
1641    fn test_retractable_count_negative() {
1642        let mut agg = RetractableCountAccumulator::default();
1643
1644        agg.add(());
1645        agg.retract(&());
1646        agg.retract(&()); // Extra retraction
1647
1648        // Count can go negative (indicates an error in the pipeline)
1649        assert_eq!(agg.result(), -1);
1650    }
1651
1652    #[test]
1653    fn test_retractable_sum() {
1654        let mut agg = RetractableSumAccumulator::default();
1655
1656        agg.add(10);
1657        agg.add(20);
1658        agg.add(30);
1659        assert_eq!(agg.result(), 60);
1660
1661        agg.retract(&20);
1662        assert_eq!(agg.result(), 40);
1663
1664        agg.retract(&10);
1665        agg.retract(&30);
1666        assert_eq!(agg.result(), 0);
1667    }
1668
1669    #[test]
1670    fn test_retractable_sum_merge() {
1671        let mut agg1 = RetractableSumAccumulator::default();
1672        agg1.add(10);
1673        agg1.add(20);
1674
1675        let mut agg2 = RetractableSumAccumulator::default();
1676        agg2.add(30);
1677        agg2.retract(&5);
1678
1679        agg1.merge(&agg2);
1680        assert_eq!(agg1.result(), 55); // 10 + 20 + 30 - 5
1681    }
1682
1683    #[test]
1684    fn test_retractable_avg() {
1685        let mut agg = RetractableAvgAccumulator::default();
1686
1687        agg.add(10);
1688        agg.add(20);
1689        agg.add(30);
1690        let avg = agg.result().unwrap();
1691        assert!((avg - 20.0).abs() < f64::EPSILON);
1692
1693        agg.retract(&30);
1694        let avg = agg.result().unwrap();
1695        assert!((avg - 15.0).abs() < f64::EPSILON); // (10 + 20) / 2
1696    }
1697
1698    #[test]
1699    fn test_retractable_avg_empty() {
1700        let mut agg = RetractableAvgAccumulator::default();
1701        assert!(agg.result().is_none());
1702
1703        agg.add(10);
1704        agg.retract(&10);
1705        assert!(agg.result().is_none());
1706    }
1707
1708    #[test]
1709    fn test_retractable_min() {
1710        let mut agg = RetractableMinAccumulator::default();
1711
1712        agg.add(30);
1713        agg.add(10);
1714        agg.add(20);
1715        assert_eq!(agg.result(), Some(10));
1716
1717        // Retract the minimum
1718        agg.retract(&10);
1719        assert_eq!(agg.result(), Some(20));
1720
1721        // Retract a non-minimum
1722        agg.retract(&30);
1723        assert_eq!(agg.result(), Some(20));
1724
1725        agg.retract(&20);
1726        assert_eq!(agg.result(), None);
1727    }
1728
1729    #[test]
1730    fn test_retractable_max() {
1731        let mut agg = RetractableMaxAccumulator::default();
1732
1733        agg.add(10);
1734        agg.add(30);
1735        agg.add(20);
1736        assert_eq!(agg.result(), Some(30));
1737
1738        // Retract the maximum
1739        agg.retract(&30);
1740        assert_eq!(agg.result(), Some(20));
1741
1742        agg.retract(&20);
1743        agg.retract(&10);
1744        assert_eq!(agg.result(), None);
1745    }
1746
1747    #[test]
1748    fn test_retractable_efficiency_flags() {
1749        let count = RetractableCountAccumulator::default();
1750        let sum = RetractableSumAccumulator::default();
1751        let avg = RetractableAvgAccumulator::default();
1752        let min = RetractableMinAccumulator::default();
1753        let max = RetractableMaxAccumulator::default();
1754
1755        // Count, sum, avg have O(1) retraction
1756        assert!(count.supports_efficient_retraction());
1757        assert!(sum.supports_efficient_retraction());
1758        assert!(avg.supports_efficient_retraction());
1759
1760        // Min/max may need recomputation
1761        assert!(!min.supports_efficient_retraction());
1762        assert!(!max.supports_efficient_retraction());
1763    }
1764
1765    // LateDataRetractionGenerator Tests
1766
1767    #[test]
1768    fn test_late_data_retraction_first_emission() {
1769        let mut gen = LateDataRetractionGenerator::new(true);
1770        let window_id = WindowId::new(0, 60000);
1771
1772        // First emission - no retraction
1773        let result = gen.check_retraction(&window_id, b"count=5", 1000);
1774        assert!(result.is_none());
1775        assert_eq!(gen.windows_tracked(), 1);
1776    }
1777
1778    #[test]
1779    fn test_late_data_retraction_changed_result() {
1780        let mut gen = LateDataRetractionGenerator::new(true);
1781        let window_id = WindowId::new(0, 60000);
1782
1783        // First emission
1784        gen.check_retraction(&window_id, b"count=5", 1000);
1785
1786        // Late data causes different result - generates retraction
1787        let result = gen.check_retraction(&window_id, b"count=7", 2000);
1788        assert!(result.is_some());
1789
1790        let (old, new) = result.unwrap();
1791        assert_eq!(old, b"count=5");
1792        assert_eq!(new, b"count=7");
1793        assert_eq!(gen.retractions_generated(), 1);
1794    }
1795
1796    #[test]
1797    fn test_late_data_retraction_same_result() {
1798        let mut gen = LateDataRetractionGenerator::new(true);
1799        let window_id = WindowId::new(0, 60000);
1800
1801        // First emission
1802        gen.check_retraction(&window_id, b"count=5", 1000);
1803
1804        // Same result - no retraction
1805        let result = gen.check_retraction(&window_id, b"count=5", 2000);
1806        assert!(result.is_none());
1807        assert_eq!(gen.retractions_generated(), 0);
1808    }
1809
1810    #[test]
1811    fn test_late_data_retraction_disabled() {
1812        let mut gen = LateDataRetractionGenerator::new(false);
1813        let window_id = WindowId::new(0, 60000);
1814
1815        gen.check_retraction(&window_id, b"count=5", 1000);
1816        let result = gen.check_retraction(&window_id, b"count=7", 2000);
1817
1818        // No retraction when disabled
1819        assert!(result.is_none());
1820    }
1821
1822    #[test]
1823    fn test_late_data_cleanup() {
1824        let mut gen = LateDataRetractionGenerator::new(true);
1825
1826        let w1 = WindowId::new(0, 1000);
1827        let w2 = WindowId::new(1000, 2000);
1828
1829        gen.check_retraction(&w1, b"a", 100);
1830        gen.check_retraction(&w2, b"b", 200);
1831        assert_eq!(gen.windows_tracked(), 2);
1832
1833        gen.cleanup_window(&w1);
1834        assert_eq!(gen.windows_tracked(), 1);
1835
1836        gen.cleanup_before_watermark(2000);
1837        assert_eq!(gen.windows_tracked(), 0);
1838    }
1839
1840    // CdcEnvelope Tests
1841
1842    #[test]
1843    fn test_cdc_envelope_insert() {
1844        let source = CdcSource::new("laminardb", "default", "orders");
1845        let envelope = CdcEnvelope::insert(
1846            serde_json::json!({"id": 1, "amount": 100}),
1847            source,
1848            1_706_140_800_000,
1849        );
1850
1851        assert_eq!(envelope.op, "c");
1852        assert!(envelope.is_insert());
1853        assert!(envelope.before.is_none());
1854        assert!(envelope.after.is_some());
1855        assert_eq!(envelope.weight(), 1);
1856    }
1857
1858    #[test]
1859    fn test_cdc_envelope_delete() {
1860        let source = CdcSource::new("laminardb", "default", "orders");
1861        let envelope = CdcEnvelope::delete(serde_json::json!({"id": 1}), source, 1_706_140_800_000);
1862
1863        assert_eq!(envelope.op, "d");
1864        assert!(envelope.is_delete());
1865        assert!(envelope.before.is_some());
1866        assert!(envelope.after.is_none());
1867        assert_eq!(envelope.weight(), -1);
1868    }
1869
1870    #[test]
1871    fn test_cdc_envelope_update() {
1872        let source = CdcSource::new("laminardb", "default", "orders");
1873        let envelope = CdcEnvelope::update(
1874            serde_json::json!({"id": 1, "amount": 100}),
1875            serde_json::json!({"id": 1, "amount": 150}),
1876            source,
1877            1_706_140_800_000,
1878        );
1879
1880        assert_eq!(envelope.op, "u");
1881        assert!(envelope.is_update());
1882        assert!(envelope.before.is_some());
1883        assert!(envelope.after.is_some());
1884        assert_eq!(envelope.weight(), 0);
1885    }
1886
1887    #[test]
1888    fn test_cdc_envelope_json_serialization() {
1889        let source = CdcSource::new("laminardb", "default", "orders");
1890        let envelope = CdcEnvelope::insert(
1891            serde_json::json!({"id": 1, "amount": 100}),
1892            source,
1893            1_706_140_800_000,
1894        );
1895
1896        let json = envelope.to_json().unwrap();
1897        assert!(json.contains("\"op\":\"c\""));
1898        assert!(json.contains("\"after\""));
1899        assert!(!json.contains("\"before\""));
1900        assert!(json.contains("\"ts_ms\":1706140800000"));
1901    }
1902
1903    #[test]
1904    fn test_cdc_envelope_debezium_compatible() {
1905        let source = CdcSource::with_sequence("laminardb", "test_db", "users", 42);
1906        let envelope = CdcEnvelope::insert(
1907            serde_json::json!({"user_id": 123, "name": "Alice"}),
1908            source,
1909            1_706_140_800_000,
1910        );
1911
1912        let json = envelope.to_json().unwrap();
1913
1914        // Verify Debezium-compatible fields
1915        assert!(json.contains("\"op\":\"c\""));
1916        assert!(json.contains("\"source\""));
1917        assert!(json.contains("\"name\":\"laminardb\""));
1918        assert!(json.contains("\"db\":\"test_db\""));
1919        assert!(json.contains("\"table\":\"users\""));
1920        assert!(json.contains("\"sequence\":42"));
1921    }
1922
1923    #[test]
1924    fn test_cdc_source_sequence() {
1925        let mut source = CdcSource::new("laminardb", "db", "table");
1926        assert_eq!(source.sequence, 0);
1927
1928        assert_eq!(source.next_sequence(), 1);
1929        assert_eq!(source.next_sequence(), 2);
1930        assert_eq!(source.sequence, 2);
1931    }
1932
1933    // CdcOperation Tests
1934
1935    #[test]
1936    fn test_cdc_operation_roundtrip() {
1937        for op in [
1938            CdcOperation::Insert,
1939            CdcOperation::Delete,
1940            CdcOperation::UpdateBefore,
1941            CdcOperation::UpdateAfter,
1942        ] {
1943            let u8_val = op.to_u8();
1944            let restored = CdcOperation::from_u8(u8_val);
1945            assert_eq!(op, restored);
1946        }
1947    }
1948
1949    #[test]
1950    fn test_cdc_operation_unknown_u8() {
1951        // Unknown values default to Insert
1952        assert_eq!(CdcOperation::from_u8(255), CdcOperation::Insert);
1953    }
1954
1955    // ════════════════════════════════════════════════════════════════════════
1956    // F076: Retractable FIRST/LAST Accumulator Tests
1957    // ════════════════════════════════════════════════════════════════════════
1958
1959    // ── RetractableFirstValueAccumulator ─────────────────────────────────────
1960
1961    #[test]
1962    fn test_retractable_first_value_basic() {
1963        let mut acc = RetractableFirstValueAccumulator::new();
1964        assert!(acc.is_empty());
1965        assert_eq!(acc.result(), None);
1966
1967        // Add entries out of order
1968        acc.add((200, 20));
1969        acc.add((100, 10));
1970        acc.add((300, 30));
1971
1972        assert!(!acc.is_empty());
1973        assert_eq!(acc.len(), 3);
1974        // First value = earliest timestamp (100) → value 10
1975        assert_eq!(acc.result(), Some(10));
1976    }
1977
1978    #[test]
1979    fn test_retractable_first_value_retract_non_first() {
1980        let mut acc = RetractableFirstValueAccumulator::new();
1981        acc.add((100, 10));
1982        acc.add((200, 20));
1983        acc.add((300, 30));
1984
1985        // Retract a non-first entry → first value unchanged
1986        acc.retract(&(200, 20));
1987        assert_eq!(acc.len(), 2);
1988        assert_eq!(acc.result(), Some(10));
1989    }
1990
1991    #[test]
1992    fn test_retractable_first_value_retract_first() {
1993        let mut acc = RetractableFirstValueAccumulator::new();
1994        acc.add((100, 10));
1995        acc.add((200, 20));
1996        acc.add((300, 30));
1997
1998        // Retract the first entry → next earliest becomes first
1999        acc.retract(&(100, 10));
2000        assert_eq!(acc.len(), 2);
2001        assert_eq!(acc.result(), Some(20)); // ts=200
2002    }
2003
2004    #[test]
2005    fn test_retractable_first_value_retract_all() {
2006        let mut acc = RetractableFirstValueAccumulator::new();
2007        acc.add((100, 10));
2008        acc.add((200, 20));
2009
2010        acc.retract(&(100, 10));
2011        acc.retract(&(200, 20));
2012        assert!(acc.is_empty());
2013        assert_eq!(acc.result(), None);
2014    }
2015
2016    #[test]
2017    fn test_retractable_first_value_retract_nonexistent() {
2018        let mut acc = RetractableFirstValueAccumulator::new();
2019        acc.add((100, 10));
2020
2021        // Retract something that doesn't exist → no effect
2022        acc.retract(&(999, 99));
2023        assert_eq!(acc.len(), 1);
2024        assert_eq!(acc.result(), Some(10));
2025    }
2026
2027    #[test]
2028    fn test_retractable_first_value_duplicate_timestamps() {
2029        let mut acc = RetractableFirstValueAccumulator::new();
2030        acc.add((100, 10));
2031        acc.add((100, 20)); // Same timestamp, different value
2032
2033        assert_eq!(acc.len(), 2);
2034        // First at timestamp 100, first inserted value
2035        assert_eq!(acc.result(), Some(10));
2036
2037        // Retract one → other remains
2038        acc.retract(&(100, 10));
2039        assert_eq!(acc.result(), Some(20));
2040    }
2041
2042    // ── RetractableLastValueAccumulator ──────────────────────────────────────
2043
2044    #[test]
2045    fn test_retractable_last_value_basic() {
2046        let mut acc = RetractableLastValueAccumulator::new();
2047        assert!(acc.is_empty());
2048        assert_eq!(acc.result(), None);
2049
2050        acc.add((100, 10));
2051        acc.add((300, 30));
2052        acc.add((200, 20));
2053
2054        assert_eq!(acc.len(), 3);
2055        // Last value = latest timestamp (300) → value 30
2056        assert_eq!(acc.result(), Some(30));
2057    }
2058
2059    #[test]
2060    fn test_retractable_last_value_retract_non_last() {
2061        let mut acc = RetractableLastValueAccumulator::new();
2062        acc.add((100, 10));
2063        acc.add((200, 20));
2064        acc.add((300, 30));
2065
2066        // Retract a non-last entry → last value unchanged
2067        acc.retract(&(200, 20));
2068        assert_eq!(acc.result(), Some(30));
2069    }
2070
2071    #[test]
2072    fn test_retractable_last_value_retract_last() {
2073        let mut acc = RetractableLastValueAccumulator::new();
2074        acc.add((100, 10));
2075        acc.add((200, 20));
2076        acc.add((300, 30));
2077
2078        // Retract the last entry → next latest becomes last
2079        acc.retract(&(300, 30));
2080        assert_eq!(acc.result(), Some(20)); // ts=200
2081    }
2082
2083    #[test]
2084    fn test_retractable_last_value_retract_all() {
2085        let mut acc = RetractableLastValueAccumulator::new();
2086        acc.add((100, 10));
2087        acc.retract(&(100, 10));
2088        assert!(acc.is_empty());
2089        assert_eq!(acc.result(), None);
2090    }
2091
2092    // ── Merge tests ─────────────────────────────────────────────────────────
2093
2094    #[test]
2095    fn test_retractable_first_value_merge() {
2096        let mut acc1 = RetractableFirstValueAccumulator::new();
2097        let mut acc2 = RetractableFirstValueAccumulator::new();
2098
2099        acc1.add((200, 20));
2100        acc1.add((400, 40));
2101        acc2.add((100, 10));
2102        acc2.add((300, 30));
2103
2104        acc1.merge(&acc2);
2105        assert_eq!(acc1.len(), 4);
2106        // Merged: sorted by timestamp, first = (100, 10)
2107        assert_eq!(acc1.result(), Some(10));
2108    }
2109
2110    #[test]
2111    fn test_retractable_last_value_merge() {
2112        let mut acc1 = RetractableLastValueAccumulator::new();
2113        let mut acc2 = RetractableLastValueAccumulator::new();
2114
2115        acc1.add((100, 10));
2116        acc1.add((300, 30));
2117        acc2.add((200, 20));
2118        acc2.add((400, 40));
2119
2120        acc1.merge(&acc2);
2121        assert_eq!(acc1.len(), 4);
2122        // Last = (400, 40)
2123        assert_eq!(acc1.result(), Some(40));
2124    }
2125
2126    #[test]
2127    fn test_retractable_first_value_merge_empty() {
2128        let mut acc1 = RetractableFirstValueAccumulator::new();
2129        let acc2 = RetractableFirstValueAccumulator::new();
2130
2131        acc1.add((100, 10));
2132        acc1.merge(&acc2); // Merge empty into non-empty
2133        assert_eq!(acc1.result(), Some(10));
2134
2135        let mut acc3 = RetractableFirstValueAccumulator::new();
2136        let acc4 = RetractableFirstValueAccumulator::new();
2137        acc3.merge(&acc4); // Merge empty into empty
2138        assert!(acc3.is_empty());
2139    }
2140
2141    // ── Reset/clear tests ───────────────────────────────────────────────────
2142
2143    #[test]
2144    fn test_retractable_first_value_reset() {
2145        let mut acc = RetractableFirstValueAccumulator::new();
2146        acc.add((100, 10));
2147        acc.add((200, 20));
2148        assert!(!acc.is_empty());
2149
2150        acc.reset();
2151        assert!(acc.is_empty());
2152        assert_eq!(acc.result(), None);
2153    }
2154
2155    #[test]
2156    fn test_retractable_last_value_reset() {
2157        let mut acc = RetractableLastValueAccumulator::new();
2158        acc.add((100, 10));
2159        acc.reset();
2160        assert!(acc.is_empty());
2161    }
2162
2163    // ── f64 variant tests ───────────────────────────────────────────────────
2164
2165    #[test]
2166    fn test_retractable_first_value_f64_basic() {
2167        let mut acc = RetractableFirstValueF64Accumulator::new();
2168        acc.add((200, 20.5));
2169        acc.add((100, 10.5));
2170        acc.add((300, 30.5));
2171
2172        assert_eq!(acc.len(), 3);
2173        // First = earliest timestamp (100) → value 10.5
2174        let result = acc.result_f64().unwrap();
2175        assert!((result - 10.5).abs() < f64::EPSILON);
2176    }
2177
2178    #[test]
2179    fn test_retractable_first_value_f64_retract() {
2180        let mut acc = RetractableFirstValueF64Accumulator::new();
2181        acc.add((100, 10.5));
2182        acc.add((200, 20.5));
2183
2184        // Retract first → next becomes first
2185        acc.retract(&(100, 10.5));
2186        let result = acc.result_f64().unwrap();
2187        assert!((result - 20.5).abs() < f64::EPSILON);
2188    }
2189
2190    #[test]
2191    fn test_retractable_last_value_f64_basic() {
2192        let mut acc = RetractableLastValueF64Accumulator::new();
2193        acc.add((100, 10.5));
2194        acc.add((300, 30.5));
2195        acc.add((200, 20.5));
2196
2197        let result = acc.result_f64().unwrap();
2198        assert!((result - 30.5).abs() < f64::EPSILON);
2199    }
2200
2201    #[test]
2202    fn test_retractable_last_value_f64_retract() {
2203        let mut acc = RetractableLastValueF64Accumulator::new();
2204        acc.add((100, 10.5));
2205        acc.add((200, 20.5));
2206        acc.add((300, 30.5));
2207
2208        acc.retract(&(300, 30.5));
2209        let result = acc.result_f64().unwrap();
2210        assert!((result - 20.5).abs() < f64::EPSILON);
2211    }
2212
2213    #[test]
2214    fn test_retractable_first_value_f64_merge() {
2215        let mut acc1 = RetractableFirstValueF64Accumulator::new();
2216        let mut acc2 = RetractableFirstValueF64Accumulator::new();
2217        acc1.add((200, 20.5));
2218        acc2.add((100, 10.5));
2219        acc1.merge(&acc2);
2220        let result = acc1.result_f64().unwrap();
2221        assert!((result - 10.5).abs() < f64::EPSILON);
2222    }
2223
2224    #[test]
2225    fn test_retractable_last_value_f64_merge() {
2226        let mut acc1 = RetractableLastValueF64Accumulator::new();
2227        let mut acc2 = RetractableLastValueF64Accumulator::new();
2228        acc1.add((100, 10.5));
2229        acc2.add((300, 30.5));
2230        acc1.merge(&acc2);
2231        let result = acc1.result_f64().unwrap();
2232        assert!((result - 30.5).abs() < f64::EPSILON);
2233    }
2234
2235    // ── Edge cases ──────────────────────────────────────────────────────────
2236
2237    #[test]
2238    fn test_retractable_first_value_single_entry() {
2239        let mut acc = RetractableFirstValueAccumulator::new();
2240        acc.add((100, 42));
2241        assert_eq!(acc.result(), Some(42));
2242        acc.retract(&(100, 42));
2243        assert_eq!(acc.result(), None);
2244    }
2245
2246    #[test]
2247    fn test_retractable_last_value_single_entry() {
2248        let mut acc = RetractableLastValueAccumulator::new();
2249        acc.add((100, 42));
2250        assert_eq!(acc.result(), Some(42));
2251        acc.retract(&(100, 42));
2252        assert_eq!(acc.result(), None);
2253    }
2254
2255    #[test]
2256    fn test_retractable_first_value_negative_values() {
2257        let mut acc = RetractableFirstValueAccumulator::new();
2258        acc.add((100, -10));
2259        acc.add((200, -20));
2260        assert_eq!(acc.result(), Some(-10));
2261    }
2262
2263    #[test]
2264    fn test_retractable_supports_efficient_retraction() {
2265        let acc = RetractableFirstValueAccumulator::new();
2266        assert!(acc.supports_efficient_retraction());
2267
2268        let acc2 = RetractableLastValueAccumulator::new();
2269        assert!(acc2.supports_efficient_retraction());
2270
2271        let acc3 = RetractableFirstValueF64Accumulator::new();
2272        assert!(acc3.supports_efficient_retraction());
2273
2274        let acc4 = RetractableLastValueF64Accumulator::new();
2275        assert!(acc4.supports_efficient_retraction());
2276    }
2277
2278    // ── OHLC retraction simulation ──────────────────────────────────────────
2279
2280    #[test]
2281    fn test_ohlc_retraction_simulation() {
2282        // Simulate an OHLC window where trades arrive out of order
2283        // and one needs to be retracted
2284        let mut open_acc = RetractableFirstValueAccumulator::new();
2285        let mut close_acc = RetractableLastValueAccumulator::new();
2286
2287        // Trade 1: price=100 at t=1000
2288        open_acc.add((1000, 100));
2289        close_acc.add((1000, 100));
2290
2291        // Trade 2: price=105 at t=2000
2292        open_acc.add((2000, 105));
2293        close_acc.add((2000, 105));
2294
2295        // Trade 3: price=98 at t=3000
2296        open_acc.add((3000, 98));
2297        close_acc.add((3000, 98));
2298
2299        assert_eq!(open_acc.result(), Some(100)); // Open = earliest
2300        assert_eq!(close_acc.result(), Some(98)); // Close = latest
2301
2302        // Retract trade 1 (correction: it was a bad trade)
2303        open_acc.retract(&(1000, 100));
2304        close_acc.retract(&(1000, 100));
2305
2306        // Open now = trade 2 (earliest remaining)
2307        assert_eq!(open_acc.result(), Some(105));
2308        // Close still = trade 3 (latest)
2309        assert_eq!(close_acc.result(), Some(98));
2310    }
2311
2312    #[test]
2313    fn test_ohlc_retraction_f64_simulation() {
2314        let mut open_acc = RetractableFirstValueF64Accumulator::new();
2315        let mut close_acc = RetractableLastValueF64Accumulator::new();
2316
2317        open_acc.add((1000, 100.50));
2318        close_acc.add((1000, 100.50));
2319        open_acc.add((2000, 105.25));
2320        close_acc.add((2000, 105.25));
2321        open_acc.add((3000, 98.75));
2322        close_acc.add((3000, 98.75));
2323
2324        let open = open_acc.result_f64().unwrap();
2325        let close = close_acc.result_f64().unwrap();
2326        assert!((open - 100.50).abs() < f64::EPSILON);
2327        assert!((close - 98.75).abs() < f64::EPSILON);
2328
2329        // Retract trade at t=1000
2330        open_acc.retract(&(1000, 100.50));
2331        close_acc.retract(&(1000, 100.50));
2332
2333        let open2 = open_acc.result_f64().unwrap();
2334        assert!((open2 - 105.25).abs() < f64::EPSILON);
2335    }
2336}