Skip to main content

laminar_core/operator/
topk.rs

1//! # Streaming Top-K Operator
2//!
//! Bounded sorted buffer of top-K items with retraction-based changelog emission.
4//!
5//! Supports `ORDER BY ... LIMIT N` on unbounded streams by maintaining
6//! a sorted buffer of at most K entries. New events that rank within the
7//! top-K cause eviction of the worst entry and optional rank-change
8//! retractions.
9//!
10//! ## Emit Strategies
11//!
12//! - `OnUpdate`: Emit changelog on every state change (lowest latency)
13//! - `OnWatermark`: Buffer changes, emit on watermark advance
14//! - `Periodic(interval)`: Emit on timer interval
15//!
16//! ## Ring 0 Constraints
17//!
18//! - `entries` pre-allocated to capacity K — no reallocation during `process()`
19//! - Sort keys use `Vec<u8>` memcomparable encoding for zero-branch comparison
20//! - Sorted Vec with binary search: O(log K) search + O(K) shift
21
22use super::window::ChangelogRecord;
23use super::{
24    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
25};
26use arrow_array::{Array, Float64Array, Int64Array, StringArray, TimestampMicrosecondArray};
27use arrow_schema::DataType;
28
/// Configuration for a sort column in the top-K operator.
///
/// Each instance describes one term of the ordering: which column to read,
/// the sort direction, and where NULL values are placed relative to
/// non-NULL values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopKSortColumn {
    /// Column name in the event schema
    pub column_name: String,
    /// Sort in descending order when `true`, ascending when `false`
    pub descending: bool,
    /// Place NULL values before non-NULL values
    pub nulls_first: bool,
}
39
40impl TopKSortColumn {
41    /// Creates a new ascending sort column.
42    #[must_use]
43    pub fn ascending(name: impl Into<String>) -> Self {
44        Self {
45            column_name: name.into(),
46            descending: false,
47            nulls_first: false,
48        }
49    }
50
51    /// Creates a new descending sort column.
52    #[must_use]
53    pub fn descending(name: impl Into<String>) -> Self {
54        Self {
55            column_name: name.into(),
56            descending: true,
57            nulls_first: false,
58        }
59    }
60
61    /// Sets whether nulls should sort first.
62    #[must_use]
63    pub fn with_nulls_first(mut self, nulls_first: bool) -> Self {
64        self.nulls_first = nulls_first;
65        self
66    }
67}
68
/// Emit strategy for the streaming top-K operator.
///
/// Controls whether changelog records are returned directly from
/// `process()` or held in the operator's pending buffer until flushed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopKEmitStrategy {
    /// Emit changelog on every state change (lowest latency, highest volume).
    OnUpdate,
    /// Buffer changes, emit batch when watermark advances.
    ///
    /// NOTE(review): no flush path for this strategy is visible in this part
    /// of the file — confirm where watermark advances are delivered.
    OnWatermark,
    /// Emit on timer at the given interval in microseconds.
    Periodic(i64),
}
79
/// An entry in the top-K buffer.
///
/// Pairs an event with the sort key derived from it, so ranking only ever
/// compares byte strings and never re-reads the event's columns.
#[derive(Debug, Clone)]
struct TopKEntry {
    /// Memcomparable sort key for efficient comparison.
    sort_key: Vec<u8>,
    /// The original event.
    event: Event,
}
88
/// Streaming top-K operator for `ORDER BY ... LIMIT N`.
///
/// Maintains a sorted buffer of at most K entries. Each incoming event
/// is checked against the current worst entry. If better, it replaces
/// the worst and changelog records are emitted.
///
/// Entries are ordered by their memcomparable sort key, smallest key first,
/// so the last entry is always the current worst.
pub struct StreamingTopKOperator {
    /// Operator identifier for checkpointing.
    operator_id: String,
    /// Number of top entries to maintain.
    k: usize,
    /// Sort column specifications.
    sort_columns: Vec<TopKSortColumn>,
    /// Sorted entries (best first). Pre-allocated to capacity K.
    entries: Vec<TopKEntry>,
    /// Emission strategy.
    emit_strategy: TopKEmitStrategy,
    /// Pending changelog records (for OnWatermark/Periodic strategies).
    pending_changes: Vec<ChangelogRecord>,
    /// Monotonic sequence counter for changelog ordering.
    /// Incremented once per event that enters the buffer.
    sequence_counter: u64,
    /// Current watermark value. Starts at `i64::MIN`.
    current_watermark: i64,
}
112
113impl StreamingTopKOperator {
114    /// Creates a new streaming top-K operator.
115    #[must_use]
116    pub fn new(
117        operator_id: String,
118        k: usize,
119        sort_columns: Vec<TopKSortColumn>,
120        emit_strategy: TopKEmitStrategy,
121    ) -> Self {
122        Self {
123            operator_id,
124            k,
125            sort_columns,
126            entries: Vec::with_capacity(k),
127            emit_strategy,
128            pending_changes: Vec::new(),
129            sequence_counter: 0,
130            current_watermark: i64::MIN,
131        }
132    }
133
134    /// Returns the current number of entries in the top-K buffer.
135    #[must_use]
136    pub fn len(&self) -> usize {
137        self.entries.len()
138    }
139
140    /// Returns true if the top-K buffer is empty.
141    #[must_use]
142    pub fn is_empty(&self) -> bool {
143        self.entries.is_empty()
144    }
145
146    /// Returns the current entries as events (best first).
147    #[must_use]
148    pub fn entries(&self) -> Vec<&Event> {
149        self.entries.iter().map(|e| &e.event).collect()
150    }
151
152    /// Returns the current watermark value.
153    #[must_use]
154    pub fn current_watermark(&self) -> i64 {
155        self.current_watermark
156    }
157
158    /// Returns the number of pending changelog records.
159    #[must_use]
160    pub fn pending_changes_count(&self) -> usize {
161        self.pending_changes.len()
162    }
163
164    /// Extracts a memcomparable sort key from an event.
165    fn extract_sort_key(&self, event: &Event) -> Vec<u8> {
166        let batch = &event.data;
167        let schema = batch.schema();
168        let mut key = Vec::new();
169
170        for col_spec in &self.sort_columns {
171            let Ok(col_idx) = schema.index_of(&col_spec.column_name) else {
172                // Column not found — encode as null
173                encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
174                continue;
175            };
176
177            let array = batch.column(col_idx);
178
179            if array.is_null(0) {
180                encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
181                continue;
182            }
183
184            match array.data_type() {
185                DataType::Int64 => {
186                    let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
187                    encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
188                    encode_i64(arr.value(0), col_spec.descending, &mut key);
189                }
190                DataType::Float64 => {
191                    let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
192                    encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
193                    encode_f64(arr.value(0), col_spec.descending, &mut key);
194                }
195                DataType::Utf8 => {
196                    let arr = array.as_any().downcast_ref::<StringArray>().unwrap();
197                    encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
198                    encode_utf8(arr.value(0), col_spec.descending, &mut key);
199                }
200                DataType::Timestamp(_, _) => {
201                    let arr = array
202                        .as_any()
203                        .downcast_ref::<TimestampMicrosecondArray>()
204                        .unwrap();
205                    encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
206                    encode_i64(arr.value(0), col_spec.descending, &mut key);
207                }
208                _ => {
209                    // Unsupported type: treat as null
210                    encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
211                }
212            }
213        }
214
215        key
216    }
217
218    /// Finds the insertion position for a sort key using binary search.
219    /// Returns the index where the key should be inserted to maintain sorted order.
220    fn find_insert_position(&self, sort_key: &[u8]) -> usize {
221        self.entries
222            .binary_search_by(|entry| entry.sort_key.as_slice().cmp(sort_key))
223            .unwrap_or_else(|pos| pos)
224    }
225
226    /// Checks if an event with the given sort key would enter the top-K.
227    fn would_enter_topk(&self, sort_key: &[u8]) -> bool {
228        if self.entries.len() < self.k {
229            return true;
230        }
231        // Compare with the worst (last) entry
232        if let Some(worst) = self.entries.last() {
233            sort_key < worst.sort_key.as_slice()
234        } else {
235            true
236        }
237    }
238
239    /// Processes a single event, returning changelog records for the changes.
240    fn process_event(&mut self, event: &Event, emit_timestamp: i64) -> Vec<ChangelogRecord> {
241        let sort_key = self.extract_sort_key(event);
242
243        if !self.would_enter_topk(&sort_key) {
244            return Vec::new();
245        }
246
247        let insert_pos = self.find_insert_position(&sort_key);
248        let mut changes = Vec::new();
249
250        // Insert the new entry
251        let new_entry = TopKEntry {
252            sort_key,
253            event: event.clone(),
254        };
255        self.entries.insert(insert_pos, new_entry);
256
257        // Generate insert changelog
258        changes.push(ChangelogRecord::insert(event.clone(), emit_timestamp));
259
260        // Generate rank change retractions for entries that shifted down
261        for i in (insert_pos + 1)..self.entries.len().min(self.k) {
262            let shifted_event = &self.entries[i].event;
263            // Emit update: rank changed from (i-1) to i
264            let (before, after) = ChangelogRecord::update(
265                shifted_event.clone(),
266                shifted_event.clone(),
267                emit_timestamp,
268            );
269            changes.push(before);
270            changes.push(after);
271        }
272
273        // Evict worst entry if over capacity
274        if self.entries.len() > self.k {
275            let evicted = self.entries.pop().unwrap();
276            changes.push(ChangelogRecord::delete(evicted.event, emit_timestamp));
277        }
278
279        self.sequence_counter += 1;
280        changes
281    }
282
283    /// Flushes pending changelog records as Output.
284    fn flush_pending(&mut self) -> OutputVec {
285        let mut outputs = OutputVec::new();
286        for record in self.pending_changes.drain(..) {
287            outputs.push(Output::Changelog(record));
288        }
289        outputs
290    }
291}
292
293impl Operator for StreamingTopKOperator {
294    fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
295        let emit_timestamp = event.timestamp;
296        let changes = self.process_event(event, emit_timestamp);
297
298        match &self.emit_strategy {
299            TopKEmitStrategy::OnUpdate => {
300                let mut outputs = OutputVec::new();
301                for record in changes {
302                    outputs.push(Output::Changelog(record));
303                }
304                outputs
305            }
306            TopKEmitStrategy::OnWatermark | TopKEmitStrategy::Periodic(_) => {
307                self.pending_changes.extend(changes);
308                OutputVec::new()
309            }
310        }
311    }
312
313    fn on_timer(&mut self, _timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
314        // For Periodic strategy: flush pending changes on timer
315        match &self.emit_strategy {
316            TopKEmitStrategy::Periodic(_) => self.flush_pending(),
317            _ => OutputVec::new(),
318        }
319    }
320
321    fn checkpoint(&self) -> OperatorState {
322        // Serialize entry count + sort keys + timestamps
323        // For simplicity, serialize as JSON-like format
324        let mut data = Vec::new();
325
326        // Write entry count
327        let count = self.entries.len() as u64;
328        data.extend_from_slice(&count.to_le_bytes());
329
330        // Write watermark
331        data.extend_from_slice(&self.current_watermark.to_le_bytes());
332
333        // Write sequence counter
334        data.extend_from_slice(&self.sequence_counter.to_le_bytes());
335
336        // Write each entry's sort key length + sort key
337        for entry in &self.entries {
338            let key_len = entry.sort_key.len() as u64;
339            data.extend_from_slice(&key_len.to_le_bytes());
340            data.extend_from_slice(&entry.sort_key);
341            data.extend_from_slice(&entry.event.timestamp.to_le_bytes());
342        }
343
344        OperatorState {
345            operator_id: self.operator_id.clone(),
346            data,
347        }
348    }
349
350    #[allow(clippy::cast_possible_truncation)] // Checkpoint wire format uses u64 for counts
351    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
352        if state.data.len() < 24 {
353            return Err(OperatorError::SerializationFailed(
354                "TopK checkpoint data too short".to_string(),
355            ));
356        }
357
358        let mut offset = 0;
359        let count = u64::from_le_bytes(
360            state.data[offset..offset + 8]
361                .try_into()
362                .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
363        ) as usize;
364        offset += 8;
365
366        self.current_watermark = i64::from_le_bytes(
367            state.data[offset..offset + 8]
368                .try_into()
369                .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
370        );
371        offset += 8;
372
373        self.sequence_counter = u64::from_le_bytes(
374            state.data[offset..offset + 8]
375                .try_into()
376                .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
377        );
378        offset += 8;
379
380        // Restore sort keys (events are not fully restored — only sort key metadata)
381        self.entries.clear();
382        for _ in 0..count {
383            if offset + 8 > state.data.len() {
384                return Err(OperatorError::SerializationFailed(
385                    "TopK checkpoint truncated".to_string(),
386                ));
387            }
388            let key_len = u64::from_le_bytes(
389                state.data[offset..offset + 8]
390                    .try_into()
391                    .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
392            ) as usize;
393            offset += 8;
394
395            if offset + key_len + 8 > state.data.len() {
396                return Err(OperatorError::SerializationFailed(
397                    "TopK checkpoint truncated at key".to_string(),
398                ));
399            }
400            let sort_key = state.data[offset..offset + key_len].to_vec();
401            offset += key_len;
402
403            let timestamp = i64::from_le_bytes(
404                state.data[offset..offset + 8]
405                    .try_into()
406                    .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
407            );
408            offset += 8;
409
410            // Create a minimal event placeholder for the restored entry
411            let batch = arrow_array::RecordBatch::new_empty(std::sync::Arc::new(
412                arrow_schema::Schema::empty(),
413            ));
414            self.entries.push(TopKEntry {
415                sort_key,
416                event: Event::new(timestamp, batch),
417            });
418        }
419
420        Ok(())
421    }
422}
423
424// === Sort key encoding helpers ===
425
/// Encodes a null value marker into the sort key.
///
/// Sort keys are always compared in ascending byte order, and a descending
/// column only inverts its *value* bytes. The marker therefore depends on
/// `nulls_first` alone:
///
/// - `nulls_first = true`  => `0x00`, below the non-null marker `0x01`
/// - `nulls_first = false` => `0xFF`, above the non-null marker `0x00`
///
/// `descending` is accepted for signature compatibility and does not affect
/// the marker, so NULL placement is the same for both sort directions.
pub fn encode_null(nulls_first: bool, descending: bool, key: &mut Vec<u8>) {
    // Direction must not move NULLs: inverting the marker for descending
    // columns would turn "nulls first" into "nulls last".
    let _ = descending;
    key.push(if nulls_first { 0x00 } else { 0xFF });
}

/// Encodes a non-null value marker into the sort key.
///
/// Pairs with [`encode_null`]: `0x01` when NULLs sort first (so the NULL
/// marker `0x00` is smaller) and `0x00` when NULLs sort last (so the NULL
/// marker `0xFF` is larger). `descending` is accepted for signature
/// compatibility and does not affect the marker.
pub fn encode_not_null(nulls_first: bool, descending: bool, key: &mut Vec<u8>) {
    let _ = descending;
    key.push(if nulls_first { 0x01 } else { 0x00 });
}
459
/// Appends an order-preserving 8-byte encoding of an `i64` to the key.
///
/// Toggling the sign bit maps `i64::MIN..=i64::MAX` onto `0..=u64::MAX`
/// in the same order; the result is written big-endian so byte-wise
/// comparison matches numeric comparison. For descending order every bit
/// is inverted, which reverses the ordering.
pub fn encode_i64(val: i64, descending: bool, key: &mut Vec<u8>) {
    #[allow(clippy::cast_sign_loss)]
    let biased = (val ^ i64::MIN) as u64;
    let ordered = if descending { !biased } else { biased };
    key.extend_from_slice(&ordered.to_be_bytes());
}
474
/// Appends an order-preserving 8-byte encoding of an `f64` to the key.
///
/// Works on the raw IEEE 754 bit pattern: values with a clear sign bit get
/// the sign bit set, values with the sign bit set get every bit inverted.
/// Written big-endian, this orders negative values below positive ones,
/// `-0.0` below `+0.0`, infinities at the extremes of the finite range, and
/// NaNs beyond the infinity that shares their sign bit. For descending
/// order every bit of the result is inverted.
pub fn encode_f64(val: f64, descending: bool, key: &mut Vec<u8>) {
    const SIGN_BIT: u64 = 1u64 << 63;

    let bits = val.to_bits();
    // XOR with all ones is a full inversion; XOR with the sign bit flips only it.
    let mask = if bits & SIGN_BIT == 0 {
        SIGN_BIT
    } else {
        u64::MAX
    };
    let mut ordered = bits ^ mask;
    if descending {
        ordered = !ordered;
    }
    key.extend_from_slice(&ordered.to_be_bytes());
}
494
/// Encodes a UTF-8 string as memcomparable bytes.
///
/// Appends the string's bytes followed by a `0x00` terminator. So that the
/// terminator stays unambiguous when the string itself contains U+0000,
/// the two lowest byte values are escaped: `0x00` becomes `0x01 0x01` and
/// `0x01` becomes `0x01 0x02`. All other bytes are written unchanged, so
/// strings without those two bytes encode exactly as raw bytes + `0x00`.
/// The escaping preserves byte-wise ordering and guarantees that no encoded
/// string is a prefix of another, which keeps multi-column keys correct.
///
/// For descending order, flips all bits (terminator becomes `0xFF`).
pub fn encode_utf8(val: &str, descending: bool, key: &mut Vec<u8>) {
    const TERMINATOR: u8 = 0x00;
    const ESCAPE: u8 = 0x01;

    // XOR with 0xFF inverts every bit; XOR with 0x00 leaves the byte alone.
    let flip: u8 = if descending { 0xFF } else { 0x00 };

    key.reserve(val.len() + 1);
    for &byte in val.as_bytes() {
        match byte {
            0x00 | 0x01 => {
                key.push(ESCAPE ^ flip);
                key.push((byte + 1) ^ flip);
            }
            _ => key.push(byte ^ flip),
        }
    }
    key.push(TERMINATOR ^ flip);
}
508
509#[cfg(test)]
510#[allow(clippy::cast_possible_wrap)]
511mod tests {
512    use super::super::window::CdcOperation;
513    use super::*;
514    use crate::state::InMemoryStore;
515    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
516    use arrow_array::{Float64Array, Int64Array, RecordBatch, StringArray};
517    use arrow_schema::{DataType, Field, Schema};
518    use std::sync::Arc;
519
520    fn make_event(timestamp: i64, price: f64) -> Event {
521        let schema = Arc::new(Schema::new(vec![Field::new(
522            "price",
523            DataType::Float64,
524            false,
525        )]));
526        let batch =
527            RecordBatch::try_new(schema, vec![Arc::new(Float64Array::from(vec![price]))]).unwrap();
528        Event::new(timestamp, batch)
529    }
530
531    fn make_event_i64(timestamp: i64, value: i64) -> Event {
532        let schema = Arc::new(Schema::new(vec![Field::new(
533            "value",
534            DataType::Int64,
535            false,
536        )]));
537        let batch =
538            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
539        Event::new(timestamp, batch)
540    }
541
542    fn make_event_str(timestamp: i64, name: &str) -> Event {
543        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)]));
544        let batch =
545            RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(vec![name]))]).unwrap();
546        Event::new(timestamp, batch)
547    }
548
549    fn make_multi_column_event(timestamp: i64, category: &str, price: f64) -> Event {
550        let schema = Arc::new(Schema::new(vec![
551            Field::new("category", DataType::Utf8, false),
552            Field::new("price", DataType::Float64, false),
553        ]));
554        let batch = RecordBatch::try_new(
555            schema,
556            vec![
557                Arc::new(StringArray::from(vec![category])),
558                Arc::new(Float64Array::from(vec![price])),
559            ],
560        )
561        .unwrap();
562        Event::new(timestamp, batch)
563    }
564
565    fn create_topk(
566        k: usize,
567        sort_columns: Vec<TopKSortColumn>,
568        strategy: TopKEmitStrategy,
569    ) -> StreamingTopKOperator {
570        StreamingTopKOperator::new("test_topk".to_string(), k, sort_columns, strategy)
571    }
572
573    fn create_test_context<'a>(
574        timers: &'a mut TimerService,
575        state: &'a mut dyn crate::state::StateStore,
576        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
577    ) -> OperatorContext<'a> {
578        OperatorContext {
579            event_time: 0,
580            processing_time: 0,
581            timers,
582            state,
583            watermark_generator: watermark_gen,
584            operator_index: 0,
585        }
586    }
587
588    // --- Sort key encoding tests ---
589
590    #[test]
591    fn test_topk_sort_key_extraction_int64() {
592        let op = create_topk(
593            3,
594            vec![TopKSortColumn::ascending("value")],
595            TopKEmitStrategy::OnUpdate,
596        );
597        let e1 = make_event_i64(1, 100);
598        let e2 = make_event_i64(2, 200);
599        let e3 = make_event_i64(3, -50);
600
601        let k1 = op.extract_sort_key(&e1);
602        let k2 = op.extract_sort_key(&e2);
603        let k3 = op.extract_sort_key(&e3);
604
605        // Ascending: -50 < 100 < 200
606        assert!(k3 < k1);
607        assert!(k1 < k2);
608    }
609
610    #[test]
611    fn test_topk_sort_key_extraction_float64() {
612        let op = create_topk(
613            3,
614            vec![TopKSortColumn::descending("price")],
615            TopKEmitStrategy::OnUpdate,
616        );
617        let e1 = make_event(1, 150.0);
618        let e2 = make_event(2, 200.0);
619        let e3 = make_event(3, 100.0);
620
621        let k1 = op.extract_sort_key(&e1);
622        let k2 = op.extract_sort_key(&e2);
623        let k3 = op.extract_sort_key(&e3);
624
625        // Descending: 200 < 150 < 100 (in sort key order)
626        assert!(k2 < k1);
627        assert!(k1 < k3);
628    }
629
630    #[test]
631    fn test_topk_sort_key_extraction_utf8() {
632        let op = create_topk(
633            3,
634            vec![TopKSortColumn::ascending("name")],
635            TopKEmitStrategy::OnUpdate,
636        );
637        let e1 = make_event_str(1, "apple");
638        let e2 = make_event_str(2, "banana");
639        let e3 = make_event_str(3, "cherry");
640
641        let k1 = op.extract_sort_key(&e1);
642        let k2 = op.extract_sort_key(&e2);
643        let k3 = op.extract_sort_key(&e3);
644
645        // Ascending: apple < banana < cherry
646        assert!(k1 < k2);
647        assert!(k2 < k3);
648    }
649
650    #[test]
651    fn test_topk_sort_key_extraction_timestamp() {
652        let schema = Arc::new(Schema::new(vec![Field::new(
653            "ts",
654            DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
655            false,
656        )]));
657        let batch = RecordBatch::try_new(
658            schema,
659            vec![Arc::new(arrow_array::TimestampMicrosecondArray::from(
660                vec![1000],
661            ))],
662        )
663        .unwrap();
664        let event = Event::new(1, batch);
665
666        let op = create_topk(
667            3,
668            vec![TopKSortColumn::ascending("ts")],
669            TopKEmitStrategy::OnUpdate,
670        );
671        let key = op.extract_sort_key(&event);
672        assert!(!key.is_empty());
673    }
674
675    // --- Insertion tests ---
676
677    #[test]
678    fn test_topk_insert_below_capacity() {
679        let mut op = create_topk(
680            3,
681            vec![TopKSortColumn::descending("price")],
682            TopKEmitStrategy::OnUpdate,
683        );
684
685        let mut timers = TimerService::new();
686        let mut state = InMemoryStore::new();
687        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
688
689        let event = make_event(1, 150.0);
690        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
691        let outputs = op.process(&event, &mut ctx);
692
693        assert_eq!(op.len(), 1);
694        // Should emit an Insert changelog
695        assert!(!outputs.is_empty());
696    }
697
698    #[test]
699    fn test_topk_insert_at_capacity_better_entry() {
700        let mut op = create_topk(
701            2,
702            vec![TopKSortColumn::descending("price")],
703            TopKEmitStrategy::OnUpdate,
704        );
705
706        let mut timers = TimerService::new();
707        let mut state = InMemoryStore::new();
708        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
709
710        // Fill to capacity
711        for (i, price) in [100.0, 150.0].iter().enumerate() {
712            let event = make_event(i as i64, *price);
713            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
714            op.process(&event, &mut ctx);
715        }
716        assert_eq!(op.len(), 2);
717
718        // Insert a better entry (200 > 100, which is worst)
719        let better = make_event(3, 200.0);
720        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
721        let outputs = op.process(&better, &mut ctx);
722
723        assert_eq!(op.len(), 2);
724        // Should have Insert + rank changes + Delete for evicted
725        assert!(outputs.len() >= 2);
726    }
727
728    #[test]
729    fn test_topk_insert_at_capacity_worse_entry() {
730        let mut op = create_topk(
731            2,
732            vec![TopKSortColumn::descending("price")],
733            TopKEmitStrategy::OnUpdate,
734        );
735
736        let mut timers = TimerService::new();
737        let mut state = InMemoryStore::new();
738        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
739
740        // Fill with good entries
741        for (i, price) in [200.0, 150.0].iter().enumerate() {
742            let event = make_event(i as i64, *price);
743            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
744            op.process(&event, &mut ctx);
745        }
746
747        // Insert a worse entry (50 < 150)
748        let worse = make_event(3, 50.0);
749        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
750        let outputs = op.process(&worse, &mut ctx);
751
752        assert_eq!(op.len(), 2);
753        // No emission - entry doesn't enter top-K
754        assert!(outputs.is_empty());
755    }
756
757    #[test]
758    fn test_topk_ascending_order() {
759        let mut op = create_topk(
760            3,
761            vec![TopKSortColumn::ascending("value")],
762            TopKEmitStrategy::OnUpdate,
763        );
764
765        let mut timers = TimerService::new();
766        let mut state = InMemoryStore::new();
767        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
768
769        // Insert values: 30, 10, 20 -> top-3 ascending should be [10, 20, 30]
770        for (i, val) in [30i64, 10, 20].iter().enumerate() {
771            let event = make_event_i64(i as i64, *val);
772            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
773            op.process(&event, &mut ctx);
774        }
775
776        assert_eq!(op.len(), 3);
777        // Entries should be sorted by ascending value
778        let entries = op.entries();
779        let vals: Vec<i64> = entries
780            .iter()
781            .map(|e| {
782                e.data
783                    .column(0)
784                    .as_any()
785                    .downcast_ref::<Int64Array>()
786                    .unwrap()
787                    .value(0)
788            })
789            .collect();
790        assert_eq!(vals, vec![10, 20, 30]);
791    }
792
793    #[test]
794    fn test_topk_descending_order() {
795        let mut op = create_topk(
796            3,
797            vec![TopKSortColumn::descending("price")],
798            TopKEmitStrategy::OnUpdate,
799        );
800
801        let mut timers = TimerService::new();
802        let mut state = InMemoryStore::new();
803        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
804
805        // Insert: 100, 200, 150 -> top-3 descending should have 200 first
806        for (i, price) in [100.0, 200.0, 150.0].iter().enumerate() {
807            let event = make_event(i as i64, *price);
808            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
809            op.process(&event, &mut ctx);
810        }
811
812        let entries = op.entries();
813        let prices: Vec<f64> = entries
814            .iter()
815            .map(|e| {
816                e.data
817                    .column(0)
818                    .as_any()
819                    .downcast_ref::<Float64Array>()
820                    .unwrap()
821                    .value(0)
822            })
823            .collect();
824        assert_eq!(prices, vec![200.0, 150.0, 100.0]);
825    }
826
827    #[test]
828    fn test_topk_multi_column_sort() {
829        let mut op = create_topk(
830            3,
831            vec![
832                TopKSortColumn::ascending("category"),
833                TopKSortColumn::descending("price"),
834            ],
835            TopKEmitStrategy::OnUpdate,
836        );
837
838        let mut timers = TimerService::new();
839        let mut state = InMemoryStore::new();
840        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
841
842        let events = vec![
843            make_multi_column_event(1, "B", 100.0),
844            make_multi_column_event(2, "A", 200.0),
845            make_multi_column_event(3, "A", 150.0),
846        ];
847
848        for event in &events {
849            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
850            op.process(event, &mut ctx);
851        }
852
853        // Sort: category ASC, then price DESC within same category
854        // A/200, A/150, B/100
855        let entries = op.entries();
856        let cats: Vec<&str> = entries
857            .iter()
858            .map(|e| {
859                e.data
860                    .column(0)
861                    .as_any()
862                    .downcast_ref::<StringArray>()
863                    .unwrap()
864                    .value(0)
865            })
866            .collect();
867        assert_eq!(cats, vec!["A", "A", "B"]);
868    }
869
870    #[test]
871    fn test_topk_nulls_first() {
872        let sort_cols = vec![TopKSortColumn::ascending("value").with_nulls_first(true)];
873        let mut op = create_topk(3, sort_cols, TopKEmitStrategy::OnUpdate);
874
875        let mut timers = TimerService::new();
876        let mut state = InMemoryStore::new();
877        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
878
879        // Create a null value event
880        let schema = Arc::new(Schema::new(vec![Field::new(
881            "value",
882            DataType::Int64,
883            true,
884        )]));
885        let null_array = Int64Array::new_null(1);
886        let batch = RecordBatch::try_new(schema, vec![Arc::new(null_array)]).unwrap();
887        let null_event = Event::new(1, batch);
888
889        let val_event = make_event_i64(2, 100);
890
891        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
892        op.process(&val_event, &mut ctx);
893        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
894        op.process(&null_event, &mut ctx);
895
896        // With nulls_first, the null should sort before 100
897        let entries = op.entries();
898        assert_eq!(entries.len(), 2);
899        assert!(entries[0].data.column(0).is_null(0));
900    }
901
902    #[test]
903    fn test_topk_nulls_last() {
904        let sort_cols = vec![TopKSortColumn::ascending("value").with_nulls_first(false)];
905        let mut op = create_topk(3, sort_cols, TopKEmitStrategy::OnUpdate);
906
907        let mut timers = TimerService::new();
908        let mut state = InMemoryStore::new();
909        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
910
911        let schema = Arc::new(Schema::new(vec![Field::new(
912            "value",
913            DataType::Int64,
914            true,
915        )]));
916        let null_array = Int64Array::new_null(1);
917        let batch = RecordBatch::try_new(schema, vec![Arc::new(null_array)]).unwrap();
918        let null_event = Event::new(1, batch);
919
920        let val_event = make_event_i64(2, 100);
921
922        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
923        op.process(&null_event, &mut ctx);
924        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
925        op.process(&val_event, &mut ctx);
926
927        // With nulls_last, the value should sort before null
928        let entries = op.entries();
929        assert_eq!(entries.len(), 2);
930        assert!(!entries[0].data.column(0).is_null(0));
931    }
932
933    // --- Emit strategy tests ---
934
935    #[test]
936    fn test_topk_emit_on_update_insert() {
937        let mut op = create_topk(
938            3,
939            vec![TopKSortColumn::descending("price")],
940            TopKEmitStrategy::OnUpdate,
941        );
942
943        let mut timers = TimerService::new();
944        let mut state = InMemoryStore::new();
945        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
946
947        let event = make_event(1, 150.0);
948        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
949        let outputs = op.process(&event, &mut ctx);
950
951        // Should emit Insert changelog immediately
952        assert_eq!(outputs.len(), 1);
953        match &outputs[0] {
954            Output::Changelog(rec) => {
955                assert_eq!(rec.operation, CdcOperation::Insert);
956                assert_eq!(rec.weight, 1);
957            }
958            _ => panic!("Expected Changelog output"),
959        }
960    }
961
962    #[test]
963    fn test_topk_emit_on_update_eviction() {
964        let mut op = create_topk(
965            1,
966            vec![TopKSortColumn::descending("price")],
967            TopKEmitStrategy::OnUpdate,
968        );
969
970        let mut timers = TimerService::new();
971        let mut state = InMemoryStore::new();
972        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
973
974        // Fill with one entry
975        let event1 = make_event(1, 100.0);
976        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
977        op.process(&event1, &mut ctx);
978
979        // Better entry evicts the first
980        let event2 = make_event(2, 200.0);
981        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
982        let outputs = op.process(&event2, &mut ctx);
983
984        // Should have Insert + Delete (eviction)
985        let mut has_insert = false;
986        let mut has_delete = false;
987        for output in &outputs {
988            if let Output::Changelog(rec) = output {
989                match rec.operation {
990                    CdcOperation::Insert => has_insert = true,
991                    CdcOperation::Delete => has_delete = true,
992                    _ => {}
993                }
994            }
995        }
996        assert!(has_insert);
997        assert!(has_delete);
998    }
999
1000    #[test]
1001    fn test_topk_emit_on_update_rank_change() {
1002        let mut op = create_topk(
1003            3,
1004            vec![TopKSortColumn::descending("price")],
1005            TopKEmitStrategy::OnUpdate,
1006        );
1007
1008        let mut timers = TimerService::new();
1009        let mut state = InMemoryStore::new();
1010        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1011
1012        // Insert two entries
1013        let e1 = make_event(1, 100.0);
1014        let e2 = make_event(2, 200.0);
1015        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1016        op.process(&e1, &mut ctx);
1017        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1018        op.process(&e2, &mut ctx);
1019
1020        // Insert between them: 150 goes between 200 and 100, shifting 100's rank
1021        let e3 = make_event(3, 150.0);
1022        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1023        let outputs = op.process(&e3, &mut ctx);
1024
1025        // Should have Insert for 150 + UpdateBefore/UpdateAfter for 100's rank change
1026        let mut has_insert = false;
1027        let mut has_update_before = false;
1028        let mut has_update_after = false;
1029        for output in &outputs {
1030            if let Output::Changelog(rec) = output {
1031                match rec.operation {
1032                    CdcOperation::Insert => has_insert = true,
1033                    CdcOperation::UpdateBefore => has_update_before = true,
1034                    CdcOperation::UpdateAfter => has_update_after = true,
1035                    CdcOperation::Delete => {}
1036                }
1037            }
1038        }
1039        assert!(has_insert);
1040        assert!(has_update_before);
1041        assert!(has_update_after);
1042    }
1043
1044    #[test]
1045    fn test_topk_emit_on_watermark_batched() {
1046        let mut op = create_topk(
1047            3,
1048            vec![TopKSortColumn::descending("price")],
1049            TopKEmitStrategy::OnWatermark,
1050        );
1051
1052        let mut timers = TimerService::new();
1053        let mut state = InMemoryStore::new();
1054        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1055
1056        // Insert events — should not emit immediately
1057        let e1 = make_event(1, 100.0);
1058        let e2 = make_event(2, 200.0);
1059        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1060        let out1 = op.process(&e1, &mut ctx);
1061        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1062        let out2 = op.process(&e2, &mut ctx);
1063
1064        assert!(out1.is_empty());
1065        assert!(out2.is_empty());
1066        assert!(op.pending_changes_count() > 0);
1067    }
1068
1069    #[test]
1070    fn test_topk_emit_periodic() {
1071        let mut op = create_topk(
1072            3,
1073            vec![TopKSortColumn::descending("price")],
1074            TopKEmitStrategy::Periodic(1000),
1075        );
1076
1077        let mut timers = TimerService::new();
1078        let mut state = InMemoryStore::new();
1079        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1080
1081        // Insert events — buffered
1082        let e1 = make_event(1, 100.0);
1083        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1084        op.process(&e1, &mut ctx);
1085
1086        assert!(op.pending_changes_count() > 0);
1087
1088        // Timer triggers flush
1089        let timer = Timer {
1090            key: smallvec::smallvec![],
1091            timestamp: 1000,
1092        };
1093        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1094        let outputs = op.on_timer(timer, &mut ctx);
1095
1096        assert!(!outputs.is_empty());
1097        assert_eq!(op.pending_changes_count(), 0);
1098    }
1099
1100    // --- Edge case tests ---
1101
1102    #[test]
1103    fn test_topk_empty_heap() {
1104        let op = create_topk(
1105            3,
1106            vec![TopKSortColumn::descending("price")],
1107            TopKEmitStrategy::OnUpdate,
1108        );
1109        assert!(op.is_empty());
1110        assert_eq!(op.len(), 0);
1111    }
1112
1113    #[test]
1114    fn test_topk_k_equals_one() {
1115        let mut op = create_topk(
1116            1,
1117            vec![TopKSortColumn::descending("price")],
1118            TopKEmitStrategy::OnUpdate,
1119        );
1120
1121        let mut timers = TimerService::new();
1122        let mut state = InMemoryStore::new();
1123        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1124
1125        let e1 = make_event(1, 100.0);
1126        let e2 = make_event(2, 200.0);
1127        let e3 = make_event(3, 50.0);
1128
1129        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1130        op.process(&e1, &mut ctx);
1131        assert_eq!(op.len(), 1);
1132
1133        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1134        op.process(&e2, &mut ctx);
1135        assert_eq!(op.len(), 1);
1136
1137        // Verify it kept the best (200)
1138        let entries = op.entries();
1139        let price = entries[0]
1140            .data
1141            .column(0)
1142            .as_any()
1143            .downcast_ref::<Float64Array>()
1144            .unwrap()
1145            .value(0);
1146        assert!((price - 200.0).abs() < f64::EPSILON);
1147
1148        // Worse entry doesn't change anything
1149        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1150        let outputs = op.process(&e3, &mut ctx);
1151        assert!(outputs.is_empty());
1152    }
1153
1154    #[test]
1155    fn test_topk_large_k() {
1156        let mut op = create_topk(
1157            100,
1158            vec![TopKSortColumn::ascending("value")],
1159            TopKEmitStrategy::OnUpdate,
1160        );
1161
1162        let mut timers = TimerService::new();
1163        let mut state = InMemoryStore::new();
1164        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1165
1166        for i in 0..50 {
1167            let event = make_event_i64(i, i * 10);
1168            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1169            op.process(&event, &mut ctx);
1170        }
1171
1172        assert_eq!(op.len(), 50);
1173    }
1174
1175    #[test]
1176    fn test_topk_duplicate_sort_keys() {
1177        let mut op = create_topk(
1178            3,
1179            vec![TopKSortColumn::descending("price")],
1180            TopKEmitStrategy::OnUpdate,
1181        );
1182
1183        let mut timers = TimerService::new();
1184        let mut state = InMemoryStore::new();
1185        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1186
1187        // Insert three events with the same price
1188        for i in 0..3 {
1189            let event = make_event(i, 100.0);
1190            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1191            op.process(&event, &mut ctx);
1192        }
1193
1194        assert_eq!(op.len(), 3);
1195    }
1196
1197    // --- Checkpoint/restore tests ---
1198
1199    #[test]
1200    fn test_topk_checkpoint_roundtrip() {
1201        let mut op = create_topk(
1202            3,
1203            vec![TopKSortColumn::descending("price")],
1204            TopKEmitStrategy::OnUpdate,
1205        );
1206
1207        let mut timers = TimerService::new();
1208        let mut state = InMemoryStore::new();
1209        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1210
1211        for (i, price) in [150.0, 200.0, 100.0].iter().enumerate() {
1212            let event = make_event(i as i64, *price);
1213            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1214            op.process(&event, &mut ctx);
1215        }
1216
1217        let checkpoint = op.checkpoint();
1218        assert_eq!(checkpoint.operator_id, "test_topk");
1219        assert!(!checkpoint.data.is_empty());
1220
1221        // Restore to a new operator
1222        let mut op2 = create_topk(
1223            3,
1224            vec![TopKSortColumn::descending("price")],
1225            TopKEmitStrategy::OnUpdate,
1226        );
1227        op2.restore(checkpoint).unwrap();
1228
1229        assert_eq!(op2.len(), 3);
1230    }
1231
1232    #[test]
1233    fn test_topk_restore_and_continue() {
1234        let mut op = create_topk(
1235            2,
1236            vec![TopKSortColumn::descending("price")],
1237            TopKEmitStrategy::OnUpdate,
1238        );
1239
1240        let mut timers = TimerService::new();
1241        let mut state = InMemoryStore::new();
1242        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1243
1244        let event = make_event(1, 150.0);
1245        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1246        op.process(&event, &mut ctx);
1247
1248        let checkpoint = op.checkpoint();
1249
1250        let mut op2 = create_topk(
1251            2,
1252            vec![TopKSortColumn::descending("price")],
1253            TopKEmitStrategy::OnUpdate,
1254        );
1255        op2.restore(checkpoint).unwrap();
1256
1257        // Should be able to continue processing
1258        assert_eq!(op2.len(), 1);
1259    }
1260
1261    // --- Changelog record tests ---
1262
1263    #[test]
1264    fn test_topk_changelog_record_types() {
1265        let record = ChangelogRecord::insert(make_event(1, 100.0), 1);
1266        assert_eq!(record.operation, CdcOperation::Insert);
1267        assert_eq!(record.weight, 1);
1268
1269        let record = ChangelogRecord::delete(make_event(1, 100.0), 1);
1270        assert_eq!(record.operation, CdcOperation::Delete);
1271        assert_eq!(record.weight, -1);
1272
1273        let (before, after) =
1274            ChangelogRecord::update(make_event(1, 100.0), make_event(2, 200.0), 1);
1275        assert_eq!(before.operation, CdcOperation::UpdateBefore);
1276        assert_eq!(after.operation, CdcOperation::UpdateAfter);
1277    }
1278
1279    #[test]
1280    fn test_topk_no_emission_on_no_change() {
1281        let mut op = create_topk(
1282            2,
1283            vec![TopKSortColumn::descending("price")],
1284            TopKEmitStrategy::OnUpdate,
1285        );
1286
1287        let mut timers = TimerService::new();
1288        let mut state = InMemoryStore::new();
1289        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1290
1291        // Fill to capacity with good entries
1292        let e1 = make_event(1, 200.0);
1293        let e2 = make_event(2, 150.0);
1294        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1295        op.process(&e1, &mut ctx);
1296        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1297        op.process(&e2, &mut ctx);
1298
1299        // Worse entry — no change
1300        let e3 = make_event(3, 50.0);
1301        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1302        let outputs = op.process(&e3, &mut ctx);
1303
1304        assert!(outputs.is_empty());
1305    }
1306}