1use super::window::ChangelogRecord;
23use super::{
24 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
25};
26use arrow_array::{Array, Float64Array, Int64Array, StringArray, TimestampMicrosecondArray};
27use arrow_schema::DataType;
28
/// Describes one column of the ranking order used by the Top-K operator.
///
/// The operator looks the column up by `column_name` in each incoming batch
/// and turns its first-row value into a byte-comparable sort key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopKSortColumn {
    /// Name of the column to look up in the event's schema.
    pub column_name: String,
    /// `true` to rank larger values ahead of smaller ones.
    pub descending: bool,
    /// Null-placement flag forwarded to the null / not-null marker encoders.
    pub nulls_first: bool,
}

impl TopKSortColumn {
    /// Builds an ascending sort column with `nulls_first` disabled.
    #[must_use]
    pub fn ascending(name: impl Into<String>) -> Self {
        let column_name = name.into();
        Self {
            column_name,
            descending: false,
            nulls_first: false,
        }
    }

    /// Builds a descending sort column with `nulls_first` disabled.
    #[must_use]
    pub fn descending(name: impl Into<String>) -> Self {
        Self {
            descending: true,
            ..Self::ascending(name)
        }
    }

    /// Returns the same column with its `nulls_first` flag replaced.
    #[must_use]
    pub fn with_nulls_first(self, nulls_first: bool) -> Self {
        Self {
            nulls_first,
            ..self
        }
    }
}
68
/// Controls when the Top-K operator hands its changelog records downstream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopKEmitStrategy {
    /// Changelog records are returned directly from the `process` call that
    /// produced them.
    OnUpdate,
    /// Changelog records are buffered by `process` instead of being returned.
    OnWatermark,
    /// Changelog records are buffered by `process` and released when a timer
    /// fires. The payload is presumably the emission interval; its unit is
    /// not established in this file (TODO confirm against the scheduler).
    Periodic(i64),
}
79
80#[derive(Debug, Clone)]
82struct TopKEntry {
83 sort_key: Vec<u8>,
85 event: Event,
87}
88
89pub struct StreamingTopKOperator {
95 operator_id: String,
97 k: usize,
99 sort_columns: Vec<TopKSortColumn>,
101 entries: Vec<TopKEntry>,
103 emit_strategy: TopKEmitStrategy,
105 pending_changes: Vec<ChangelogRecord>,
107 sequence_counter: u64,
109 current_watermark: i64,
111}
112
113impl StreamingTopKOperator {
114 #[must_use]
116 pub fn new(
117 operator_id: String,
118 k: usize,
119 sort_columns: Vec<TopKSortColumn>,
120 emit_strategy: TopKEmitStrategy,
121 ) -> Self {
122 Self {
123 operator_id,
124 k,
125 sort_columns,
126 entries: Vec::with_capacity(k),
127 emit_strategy,
128 pending_changes: Vec::new(),
129 sequence_counter: 0,
130 current_watermark: i64::MIN,
131 }
132 }
133
134 #[must_use]
136 pub fn len(&self) -> usize {
137 self.entries.len()
138 }
139
140 #[must_use]
142 pub fn is_empty(&self) -> bool {
143 self.entries.is_empty()
144 }
145
146 #[must_use]
148 pub fn entries(&self) -> Vec<&Event> {
149 self.entries.iter().map(|e| &e.event).collect()
150 }
151
152 #[must_use]
154 pub fn current_watermark(&self) -> i64 {
155 self.current_watermark
156 }
157
158 #[must_use]
160 pub fn pending_changes_count(&self) -> usize {
161 self.pending_changes.len()
162 }
163
164 fn extract_sort_key(&self, event: &Event) -> Vec<u8> {
166 let batch = &event.data;
167 let schema = batch.schema();
168 let mut key = Vec::new();
169
170 for col_spec in &self.sort_columns {
171 let Ok(col_idx) = schema.index_of(&col_spec.column_name) else {
172 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
174 continue;
175 };
176
177 let array = batch.column(col_idx);
178
179 if array.is_null(0) {
180 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
181 continue;
182 }
183
184 match array.data_type() {
185 DataType::Int64 => {
186 let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
187 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
188 encode_i64(arr.value(0), col_spec.descending, &mut key);
189 }
190 DataType::Float64 => {
191 let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
192 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
193 encode_f64(arr.value(0), col_spec.descending, &mut key);
194 }
195 DataType::Utf8 => {
196 let arr = array.as_any().downcast_ref::<StringArray>().unwrap();
197 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
198 encode_utf8(arr.value(0), col_spec.descending, &mut key);
199 }
200 DataType::Timestamp(_, _) => {
201 let arr = array
202 .as_any()
203 .downcast_ref::<TimestampMicrosecondArray>()
204 .unwrap();
205 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
206 encode_i64(arr.value(0), col_spec.descending, &mut key);
207 }
208 _ => {
209 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
211 }
212 }
213 }
214
215 key
216 }
217
218 fn find_insert_position(&self, sort_key: &[u8]) -> usize {
221 self.entries
222 .binary_search_by(|entry| entry.sort_key.as_slice().cmp(sort_key))
223 .unwrap_or_else(|pos| pos)
224 }
225
226 fn would_enter_topk(&self, sort_key: &[u8]) -> bool {
228 if self.entries.len() < self.k {
229 return true;
230 }
231 if let Some(worst) = self.entries.last() {
233 sort_key < worst.sort_key.as_slice()
234 } else {
235 true
236 }
237 }
238
239 fn process_event(&mut self, event: &Event, emit_timestamp: i64) -> Vec<ChangelogRecord> {
241 let sort_key = self.extract_sort_key(event);
242
243 if !self.would_enter_topk(&sort_key) {
244 return Vec::new();
245 }
246
247 let insert_pos = self.find_insert_position(&sort_key);
248 let mut changes = Vec::new();
249
250 let new_entry = TopKEntry {
252 sort_key,
253 event: event.clone(),
254 };
255 self.entries.insert(insert_pos, new_entry);
256
257 changes.push(ChangelogRecord::insert(event.clone(), emit_timestamp));
259
260 for i in (insert_pos + 1)..self.entries.len().min(self.k) {
262 let shifted_event = &self.entries[i].event;
263 let (before, after) = ChangelogRecord::update(
265 shifted_event.clone(),
266 shifted_event.clone(),
267 emit_timestamp,
268 );
269 changes.push(before);
270 changes.push(after);
271 }
272
273 if self.entries.len() > self.k {
275 let evicted = self.entries.pop().unwrap();
276 changes.push(ChangelogRecord::delete(evicted.event, emit_timestamp));
277 }
278
279 self.sequence_counter += 1;
280 changes
281 }
282
283 fn flush_pending(&mut self) -> OutputVec {
285 let mut outputs = OutputVec::new();
286 for record in self.pending_changes.drain(..) {
287 outputs.push(Output::Changelog(record));
288 }
289 outputs
290 }
291}
292
293impl Operator for StreamingTopKOperator {
294 fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
295 let emit_timestamp = event.timestamp;
296 let changes = self.process_event(event, emit_timestamp);
297
298 match &self.emit_strategy {
299 TopKEmitStrategy::OnUpdate => {
300 let mut outputs = OutputVec::new();
301 for record in changes {
302 outputs.push(Output::Changelog(record));
303 }
304 outputs
305 }
306 TopKEmitStrategy::OnWatermark | TopKEmitStrategy::Periodic(_) => {
307 self.pending_changes.extend(changes);
308 OutputVec::new()
309 }
310 }
311 }
312
313 fn on_timer(&mut self, _timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
314 match &self.emit_strategy {
316 TopKEmitStrategy::Periodic(_) => self.flush_pending(),
317 _ => OutputVec::new(),
318 }
319 }
320
321 fn checkpoint(&self) -> OperatorState {
322 let mut data = Vec::new();
325
326 let count = self.entries.len() as u64;
328 data.extend_from_slice(&count.to_le_bytes());
329
330 data.extend_from_slice(&self.current_watermark.to_le_bytes());
332
333 data.extend_from_slice(&self.sequence_counter.to_le_bytes());
335
336 for entry in &self.entries {
338 let key_len = entry.sort_key.len() as u64;
339 data.extend_from_slice(&key_len.to_le_bytes());
340 data.extend_from_slice(&entry.sort_key);
341 data.extend_from_slice(&entry.event.timestamp.to_le_bytes());
342 }
343
344 OperatorState {
345 operator_id: self.operator_id.clone(),
346 data,
347 }
348 }
349
350 #[allow(clippy::cast_possible_truncation)] fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
352 if state.data.len() < 24 {
353 return Err(OperatorError::SerializationFailed(
354 "TopK checkpoint data too short".to_string(),
355 ));
356 }
357
358 let mut offset = 0;
359 let count = u64::from_le_bytes(
360 state.data[offset..offset + 8]
361 .try_into()
362 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
363 ) as usize;
364 offset += 8;
365
366 self.current_watermark = i64::from_le_bytes(
367 state.data[offset..offset + 8]
368 .try_into()
369 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
370 );
371 offset += 8;
372
373 self.sequence_counter = u64::from_le_bytes(
374 state.data[offset..offset + 8]
375 .try_into()
376 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
377 );
378 offset += 8;
379
380 self.entries.clear();
382 for _ in 0..count {
383 if offset + 8 > state.data.len() {
384 return Err(OperatorError::SerializationFailed(
385 "TopK checkpoint truncated".to_string(),
386 ));
387 }
388 let key_len = u64::from_le_bytes(
389 state.data[offset..offset + 8]
390 .try_into()
391 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
392 ) as usize;
393 offset += 8;
394
395 if offset + key_len + 8 > state.data.len() {
396 return Err(OperatorError::SerializationFailed(
397 "TopK checkpoint truncated at key".to_string(),
398 ));
399 }
400 let sort_key = state.data[offset..offset + key_len].to_vec();
401 offset += key_len;
402
403 let timestamp = i64::from_le_bytes(
404 state.data[offset..offset + 8]
405 .try_into()
406 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
407 );
408 offset += 8;
409
410 let batch = arrow_array::RecordBatch::new_empty(std::sync::Arc::new(
412 arrow_schema::Schema::empty(),
413 ));
414 self.entries.push(TopKEntry {
415 sort_key,
416 event: Event::new(timestamp, batch),
417 });
418 }
419
420 Ok(())
421 }
422}
423
/// Appends the one-byte marker for a null value to `key`.
///
/// Sort keys are compared bytewise in ascending order and descending columns
/// are handled by inverting the encoded value, so the marker alone decides
/// where nulls land relative to non-null values:
///
/// * `nulls_first == true`: `0x00`, below the `0x01` written by
///   [`encode_not_null`];
/// * `nulls_first == false`: `0xFF`, above the `0x00` written by
///   [`encode_not_null`].
///
/// `descending` is accepted for signature compatibility and does not affect
/// the marker. Making the marker depend on it would flip null placement for
/// descending columns.
///
/// NOTE(review): keys for descending columns encoded with the previous,
/// direction-dependent markers (for example inside stored checkpoints) are
/// not comparable with keys produced here.
pub fn encode_null(nulls_first: bool, descending: bool, key: &mut Vec<u8>) {
    // Direction is handled by the value encoders, not by the marker.
    let _ = descending;
    key.push(if nulls_first { 0x00 } else { 0xFF });
}

/// Appends the one-byte marker for a non-null value to `key`.
///
/// The marker is `0x01` when `nulls_first` is set (so it sorts above the
/// `0x00` null marker) and `0x00` otherwise (so it sorts below the `0xFF`
/// null marker). See [`encode_null`] for why `descending` is ignored.
pub fn encode_not_null(nulls_first: bool, descending: bool, key: &mut Vec<u8>) {
    // Direction is handled by the value encoders, not by the marker.
    let _ = descending;
    key.push(if nulls_first { 0x01 } else { 0x00 });
}
459
/// Appends an order-preserving, big-endian encoding of `val` to `key`.
///
/// The sign bit is flipped so that bytewise comparison of the eight output
/// bytes matches signed integer comparison. For `descending` columns every
/// bit is inverted, which reverses that order.
pub fn encode_i64(val: i64, descending: bool, key: &mut Vec<u8>) {
    // XOR with i64::MIN flips only the sign bit.
    #[allow(clippy::cast_sign_loss)]
    let biased = (val ^ i64::MIN) as u64;
    let ordered = if descending { !biased } else { biased };
    key.extend_from_slice(&ordered.to_be_bytes());
}
474
/// Appends an order-preserving, big-endian encoding of `val` to `key`.
///
/// Values whose sign bit is clear get the sign bit set; values whose sign bit
/// is set have every bit inverted. Bytewise comparison of the result then
/// follows the numeric order of the floats. For `descending` columns the
/// encoded bits are inverted once more to reverse the order.
pub fn encode_f64(val: f64, descending: bool, key: &mut Vec<u8>) {
    let bits = val.to_bits();
    // Sign bit clear: flip just the sign bit. Sign bit set: flip all bits.
    let mask = if bits >> 63 == 0 { 1u64 << 63 } else { u64::MAX };
    let mut ordered = bits ^ mask;
    if descending {
        ordered = !ordered;
    }
    key.extend_from_slice(&ordered.to_be_bytes());
}
494
/// Appends the bytes of `val` plus a one-byte terminator to `key`.
///
/// Ascending columns store the raw UTF-8 bytes followed by `0x00`;
/// descending columns store every byte inverted followed by `0xFF`. The
/// terminator makes a string sort before (ascending) or after (descending)
/// any longer string that it prefixes.
///
/// Bytes equal to the terminator inside `val` (an embedded NUL) are not
/// escaped.
pub fn encode_utf8(val: &str, descending: bool, key: &mut Vec<u8>) {
    // XOR with 0xFF inverts a byte; XOR with 0x00 leaves it untouched. The
    // same constant doubles as the terminator for that direction.
    let flip: u8 = if descending { 0xFF } else { 0x00 };
    key.extend(val.bytes().map(|byte| byte ^ flip));
    key.push(flip);
}
508
#[cfg(test)]
#[allow(clippy::cast_possible_wrap)]
mod tests {
    use super::super::window::CdcOperation;
    use super::*;
    use crate::state::InMemoryStore;
    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
    use arrow_array::{Float64Array, Int64Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    // ------------------------------------------------------------------
    // Event fixtures
    // ------------------------------------------------------------------

    /// Single-row event with a non-nullable `price: Float64` column.
    fn make_event(timestamp: i64, price: f64) -> Event {
        let field = Field::new("price", DataType::Float64, false);
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![field])),
            vec![Arc::new(Float64Array::from(vec![price]))],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    /// Single-row event with a non-nullable `value: Int64` column.
    fn make_event_i64(timestamp: i64, value: i64) -> Event {
        let field = Field::new("value", DataType::Int64, false);
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![field])),
            vec![Arc::new(Int64Array::from(vec![value]))],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    /// Single-row event with a non-nullable `name: Utf8` column.
    fn make_event_str(timestamp: i64, name: &str) -> Event {
        let field = Field::new("name", DataType::Utf8, false);
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![field])),
            vec![Arc::new(StringArray::from(vec![name]))],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    /// Single-row event with `category: Utf8` and `price: Float64` columns.
    fn make_multi_column_event(timestamp: i64, category: &str, price: f64) -> Event {
        let fields = vec![
            Field::new("category", DataType::Utf8, false),
            Field::new("price", DataType::Float64, false),
        ];
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![
                Arc::new(StringArray::from(vec![category])),
                Arc::new(Float64Array::from(vec![price])),
            ],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    /// Single-row event (timestamp 1) whose nullable `value: Int64` is null.
    fn make_null_i64_event() -> Event {
        let field = Field::new("value", DataType::Int64, true);
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![field])),
            vec![Arc::new(Int64Array::new_null(1))],
        )
        .unwrap();
        Event::new(1, batch)
    }

    // ------------------------------------------------------------------
    // Operator / context fixtures
    // ------------------------------------------------------------------

    fn create_topk(
        k: usize,
        sort_columns: Vec<TopKSortColumn>,
        strategy: TopKEmitStrategy,
    ) -> StreamingTopKOperator {
        StreamingTopKOperator::new("test_topk".to_string(), k, sort_columns, strategy)
    }

    /// Operator ranking by `price`, highest first.
    fn price_topk(k: usize, strategy: TopKEmitStrategy) -> StreamingTopKOperator {
        create_topk(k, vec![TopKSortColumn::descending("price")], strategy)
    }

    fn create_test_context<'a>(
        timers: &'a mut TimerService,
        state: &'a mut dyn crate::state::StateStore,
        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
    ) -> OperatorContext<'a> {
        OperatorContext {
            event_time: 0,
            processing_time: 0,
            timers,
            state,
            watermark_generator: watermark_gen,
            operator_index: 0,
        }
    }

    /// Owns the services an `OperatorContext` borrows, so each test can drive
    /// the operator without repeating the setup.
    struct Rig {
        timers: TimerService,
        state: InMemoryStore,
        watermarks: BoundedOutOfOrdernessGenerator,
    }

    impl Rig {
        fn new() -> Self {
            Self {
                timers: TimerService::new(),
                state: InMemoryStore::new(),
                watermarks: BoundedOutOfOrdernessGenerator::new(0),
            }
        }

        /// Feeds one event through `op` with a fresh context.
        fn process(&mut self, op: &mut StreamingTopKOperator, event: &Event) -> OutputVec {
            let mut ctx =
                create_test_context(&mut self.timers, &mut self.state, &mut self.watermarks);
            op.process(event, &mut ctx)
        }

        /// Fires `timer` on `op` with a fresh context.
        fn fire_timer(&mut self, op: &mut StreamingTopKOperator, timer: Timer) -> OutputVec {
            let mut ctx =
                create_test_context(&mut self.timers, &mut self.state, &mut self.watermarks);
            op.on_timer(timer, &mut ctx)
        }
    }

    // ------------------------------------------------------------------
    // Inspection helpers
    // ------------------------------------------------------------------

    fn first_f64(event: &Event) -> f64 {
        event
            .data
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap()
            .value(0)
    }

    fn first_i64(event: &Event) -> i64 {
        event
            .data
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
            .value(0)
    }

    fn first_str(event: &Event) -> &str {
        event
            .data
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
            .value(0)
    }

    /// `true` when `outputs` contains a changelog record with `wanted`.
    fn emitted(outputs: &OutputVec, wanted: &CdcOperation) -> bool {
        let mut found = false;
        for output in outputs {
            if let Output::Changelog(rec) = output {
                if rec.operation == *wanted {
                    found = true;
                }
            }
        }
        found
    }

    // ------------------------------------------------------------------
    // Sort-key extraction
    // ------------------------------------------------------------------

    #[test]
    fn test_topk_sort_key_extraction_int64() {
        let op = create_topk(
            3,
            vec![TopKSortColumn::ascending("value")],
            TopKEmitStrategy::OnUpdate,
        );

        let k1 = op.extract_sort_key(&make_event_i64(1, 100));
        let k2 = op.extract_sort_key(&make_event_i64(2, 200));
        let k3 = op.extract_sort_key(&make_event_i64(3, -50));

        assert!(k3 < k1);
        assert!(k1 < k2);
    }

    #[test]
    fn test_topk_sort_key_extraction_float64() {
        let op = price_topk(3, TopKEmitStrategy::OnUpdate);

        let k1 = op.extract_sort_key(&make_event(1, 150.0));
        let k2 = op.extract_sort_key(&make_event(2, 200.0));
        let k3 = op.extract_sort_key(&make_event(3, 100.0));

        assert!(k2 < k1);
        assert!(k1 < k3);
    }

    #[test]
    fn test_topk_sort_key_extraction_utf8() {
        let op = create_topk(
            3,
            vec![TopKSortColumn::ascending("name")],
            TopKEmitStrategy::OnUpdate,
        );

        let k1 = op.extract_sort_key(&make_event_str(1, "apple"));
        let k2 = op.extract_sort_key(&make_event_str(2, "banana"));
        let k3 = op.extract_sort_key(&make_event_str(3, "cherry"));

        assert!(k1 < k2);
        assert!(k2 < k3);
    }

    #[test]
    fn test_topk_sort_key_extraction_timestamp() {
        let field = Field::new(
            "ts",
            DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
            false,
        );
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![field])),
            vec![Arc::new(arrow_array::TimestampMicrosecondArray::from(
                vec![1000],
            ))],
        )
        .unwrap();
        let event = Event::new(1, batch);

        let op = create_topk(
            3,
            vec![TopKSortColumn::ascending("ts")],
            TopKEmitStrategy::OnUpdate,
        );
        assert!(!op.extract_sort_key(&event).is_empty());
    }

    // ------------------------------------------------------------------
    // Insertion and ordering
    // ------------------------------------------------------------------

    #[test]
    fn test_topk_insert_below_capacity() {
        let mut op = price_topk(3, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        let outputs = rig.process(&mut op, &make_event(1, 150.0));

        assert_eq!(op.len(), 1);
        assert!(!outputs.is_empty());
    }

    #[test]
    fn test_topk_insert_at_capacity_better_entry() {
        let mut op = price_topk(2, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        for (ts, price) in (0i64..).zip([100.0, 150.0]) {
            rig.process(&mut op, &make_event(ts, price));
        }
        assert_eq!(op.len(), 2);

        let outputs = rig.process(&mut op, &make_event(3, 200.0));

        assert_eq!(op.len(), 2);
        assert!(outputs.len() >= 2);
    }

    #[test]
    fn test_topk_insert_at_capacity_worse_entry() {
        let mut op = price_topk(2, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        for (ts, price) in (0i64..).zip([200.0, 150.0]) {
            rig.process(&mut op, &make_event(ts, price));
        }

        let outputs = rig.process(&mut op, &make_event(3, 50.0));

        assert_eq!(op.len(), 2);
        assert!(outputs.is_empty());
    }

    #[test]
    fn test_topk_ascending_order() {
        let mut op = create_topk(
            3,
            vec![TopKSortColumn::ascending("value")],
            TopKEmitStrategy::OnUpdate,
        );
        let mut rig = Rig::new();

        for (ts, value) in (0i64..).zip([30i64, 10, 20]) {
            rig.process(&mut op, &make_event_i64(ts, value));
        }

        assert_eq!(op.len(), 3);
        let vals: Vec<i64> = op.entries().into_iter().map(first_i64).collect();
        assert_eq!(vals, vec![10, 20, 30]);
    }

    #[test]
    fn test_topk_descending_order() {
        let mut op = price_topk(3, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        for (ts, price) in (0i64..).zip([100.0, 200.0, 150.0]) {
            rig.process(&mut op, &make_event(ts, price));
        }

        let prices: Vec<f64> = op.entries().into_iter().map(first_f64).collect();
        assert_eq!(prices, vec![200.0, 150.0, 100.0]);
    }

    #[test]
    fn test_topk_multi_column_sort() {
        let mut op = create_topk(
            3,
            vec![
                TopKSortColumn::ascending("category"),
                TopKSortColumn::descending("price"),
            ],
            TopKEmitStrategy::OnUpdate,
        );
        let mut rig = Rig::new();

        let events = vec![
            make_multi_column_event(1, "B", 100.0),
            make_multi_column_event(2, "A", 200.0),
            make_multi_column_event(3, "A", 150.0),
        ];
        for event in &events {
            rig.process(&mut op, event);
        }

        let cats: Vec<&str> = op.entries().into_iter().map(first_str).collect();
        assert_eq!(cats, vec!["A", "A", "B"]);
    }

    #[test]
    fn test_topk_nulls_first() {
        let sort_cols = vec![TopKSortColumn::ascending("value").with_nulls_first(true)];
        let mut op = create_topk(3, sort_cols, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        let null_event = make_null_i64_event();
        let val_event = make_event_i64(2, 100);

        rig.process(&mut op, &val_event);
        rig.process(&mut op, &null_event);

        let entries = op.entries();
        assert_eq!(entries.len(), 2);
        assert!(entries[0].data.column(0).is_null(0));
    }

    #[test]
    fn test_topk_nulls_last() {
        let sort_cols = vec![TopKSortColumn::ascending("value").with_nulls_first(false)];
        let mut op = create_topk(3, sort_cols, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        let null_event = make_null_i64_event();
        let val_event = make_event_i64(2, 100);

        rig.process(&mut op, &null_event);
        rig.process(&mut op, &val_event);

        let entries = op.entries();
        assert_eq!(entries.len(), 2);
        assert!(!entries[0].data.column(0).is_null(0));
    }

    // ------------------------------------------------------------------
    // Emission strategies
    // ------------------------------------------------------------------

    #[test]
    fn test_topk_emit_on_update_insert() {
        let mut op = price_topk(3, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        let outputs = rig.process(&mut op, &make_event(1, 150.0));

        assert_eq!(outputs.len(), 1);
        match &outputs[0] {
            Output::Changelog(rec) => {
                assert_eq!(rec.operation, CdcOperation::Insert);
                assert_eq!(rec.weight, 1);
            }
            _ => panic!("Expected Changelog output"),
        }
    }

    #[test]
    fn test_topk_emit_on_update_eviction() {
        let mut op = price_topk(1, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        rig.process(&mut op, &make_event(1, 100.0));
        let outputs = rig.process(&mut op, &make_event(2, 200.0));

        assert!(emitted(&outputs, &CdcOperation::Insert));
        assert!(emitted(&outputs, &CdcOperation::Delete));
    }

    #[test]
    fn test_topk_emit_on_update_rank_change() {
        let mut op = price_topk(3, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        rig.process(&mut op, &make_event(1, 100.0));
        rig.process(&mut op, &make_event(2, 200.0));
        let outputs = rig.process(&mut op, &make_event(3, 150.0));

        assert!(emitted(&outputs, &CdcOperation::Insert));
        assert!(emitted(&outputs, &CdcOperation::UpdateBefore));
        assert!(emitted(&outputs, &CdcOperation::UpdateAfter));
    }

    #[test]
    fn test_topk_emit_on_watermark_batched() {
        let mut op = price_topk(3, TopKEmitStrategy::OnWatermark);
        let mut rig = Rig::new();

        let out1 = rig.process(&mut op, &make_event(1, 100.0));
        let out2 = rig.process(&mut op, &make_event(2, 200.0));

        assert!(out1.is_empty());
        assert!(out2.is_empty());
        assert!(op.pending_changes_count() > 0);
    }

    #[test]
    fn test_topk_emit_periodic() {
        let mut op = price_topk(3, TopKEmitStrategy::Periodic(1000));
        let mut rig = Rig::new();

        rig.process(&mut op, &make_event(1, 100.0));
        assert!(op.pending_changes_count() > 0);

        let timer = Timer {
            key: smallvec::smallvec![],
            timestamp: 1000,
        };
        let outputs = rig.fire_timer(&mut op, timer);

        assert!(!outputs.is_empty());
        assert_eq!(op.pending_changes_count(), 0);
    }

    // ------------------------------------------------------------------
    // Edge cases
    // ------------------------------------------------------------------

    #[test]
    fn test_topk_empty_heap() {
        let op = price_topk(3, TopKEmitStrategy::OnUpdate);
        assert!(op.is_empty());
        assert_eq!(op.len(), 0);
    }

    #[test]
    fn test_topk_k_equals_one() {
        let mut op = price_topk(1, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        rig.process(&mut op, &make_event(1, 100.0));
        assert_eq!(op.len(), 1);

        rig.process(&mut op, &make_event(2, 200.0));
        assert_eq!(op.len(), 1);

        let price = first_f64(op.entries()[0]);
        assert!((price - 200.0).abs() < f64::EPSILON);

        let outputs = rig.process(&mut op, &make_event(3, 50.0));
        assert!(outputs.is_empty());
    }

    #[test]
    fn test_topk_large_k() {
        let mut op = create_topk(
            100,
            vec![TopKSortColumn::ascending("value")],
            TopKEmitStrategy::OnUpdate,
        );
        let mut rig = Rig::new();

        for i in 0..50 {
            rig.process(&mut op, &make_event_i64(i, i * 10));
        }

        assert_eq!(op.len(), 50);
    }

    #[test]
    fn test_topk_duplicate_sort_keys() {
        let mut op = price_topk(3, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        for i in 0..3 {
            rig.process(&mut op, &make_event(i, 100.0));
        }

        assert_eq!(op.len(), 3);
    }

    // ------------------------------------------------------------------
    // Checkpoint / restore
    // ------------------------------------------------------------------

    #[test]
    fn test_topk_checkpoint_roundtrip() {
        let mut op = price_topk(3, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        for (ts, price) in (0i64..).zip([150.0, 200.0, 100.0]) {
            rig.process(&mut op, &make_event(ts, price));
        }

        let checkpoint = op.checkpoint();
        assert_eq!(checkpoint.operator_id, "test_topk");
        assert!(!checkpoint.data.is_empty());

        let mut op2 = price_topk(3, TopKEmitStrategy::OnUpdate);
        op2.restore(checkpoint).unwrap();

        assert_eq!(op2.len(), 3);
    }

    #[test]
    fn test_topk_restore_and_continue() {
        let mut op = price_topk(2, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        rig.process(&mut op, &make_event(1, 150.0));
        let checkpoint = op.checkpoint();

        let mut op2 = price_topk(2, TopKEmitStrategy::OnUpdate);
        op2.restore(checkpoint).unwrap();

        assert_eq!(op2.len(), 1);
    }

    // ------------------------------------------------------------------
    // Changelog semantics
    // ------------------------------------------------------------------

    #[test]
    fn test_topk_changelog_record_types() {
        let inserted = ChangelogRecord::insert(make_event(1, 100.0), 1);
        assert_eq!(inserted.operation, CdcOperation::Insert);
        assert_eq!(inserted.weight, 1);

        let deleted = ChangelogRecord::delete(make_event(1, 100.0), 1);
        assert_eq!(deleted.operation, CdcOperation::Delete);
        assert_eq!(deleted.weight, -1);

        let (before, after) =
            ChangelogRecord::update(make_event(1, 100.0), make_event(2, 200.0), 1);
        assert_eq!(before.operation, CdcOperation::UpdateBefore);
        assert_eq!(after.operation, CdcOperation::UpdateAfter);
    }

    #[test]
    fn test_topk_no_emission_on_no_change() {
        let mut op = price_topk(2, TopKEmitStrategy::OnUpdate);
        let mut rig = Rig::new();

        rig.process(&mut op, &make_event(1, 200.0));
        rig.process(&mut op, &make_event(2, 150.0));
        let outputs = rig.process(&mut op, &make_event(3, 50.0));

        assert!(outputs.is_empty());
    }
}