1use std::cmp::Ordering;
72use std::collections::BinaryHeap;
73
74use sochdb_core::{SochRow, SochValue};
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub enum SortDirection {
83 Ascending,
84 Descending,
85}
86
87#[derive(Debug, Clone)]
89pub struct OrderByColumn {
90 pub column: ColumnRef,
92 pub direction: SortDirection,
94 pub nulls_first: bool,
96}
97
98#[derive(Debug, Clone)]
100pub enum ColumnRef {
101 Index(usize),
103 Name(String),
105}
106
107impl ColumnRef {
108 pub fn resolve(&self, columns: &[String]) -> Option<usize> {
110 match self {
111 ColumnRef::Index(i) => Some(*i),
112 ColumnRef::Name(name) => columns.iter().position(|c| c == name),
113 }
114 }
115}
116
117#[derive(Debug, Clone)]
119pub struct OrderBySpec {
120 pub columns: Vec<OrderByColumn>,
122}
123
124impl OrderBySpec {
125 pub fn single(column: ColumnRef, direction: SortDirection) -> Self {
127 Self {
128 columns: vec![OrderByColumn {
129 column,
130 direction,
131 nulls_first: false,
132 }],
133 }
134 }
135
136 pub fn then_by(mut self, column: ColumnRef, direction: SortDirection) -> Self {
138 self.columns.push(OrderByColumn {
139 column,
140 direction,
141 nulls_first: false,
142 });
143 self
144 }
145
146 pub fn comparator(&self, column_names: &[String]) -> impl Fn(&SochRow, &SochRow) -> Ordering {
148 let resolved: Vec<_> = self
149 .columns
150 .iter()
151 .filter_map(|col| {
152 col.column
153 .resolve(column_names)
154 .map(|idx| (idx, col.direction, col.nulls_first))
155 })
156 .collect();
157
158 move |a: &SochRow, b: &SochRow| {
159 for &(idx, direction, nulls_first) in &resolved {
160 let val_a = a.values.get(idx);
161 let val_b = b.values.get(idx);
162
163 let ordering = compare_values(val_a, val_b, nulls_first);
164
165 if ordering != Ordering::Equal {
166 return match direction {
167 SortDirection::Ascending => ordering,
168 SortDirection::Descending => ordering.reverse(),
169 };
170 }
171 }
172 Ordering::Equal
173 }
174 }
175
176 pub fn matches_index(&self, index_columns: &[(String, SortDirection)]) -> bool {
178 if self.columns.len() > index_columns.len() {
179 return false;
180 }
181
182 self.columns
183 .iter()
184 .zip(index_columns.iter())
185 .all(|(col, (idx_col, idx_dir))| {
186 match &col.column {
187 ColumnRef::Name(name) => name == idx_col && col.direction == *idx_dir,
188 ColumnRef::Index(_) => false, }
190 })
191 }
192}
193
194fn compare_values(a: Option<&SochValue>, b: Option<&SochValue>, nulls_first: bool) -> Ordering {
196 match (a, b) {
197 (None, None) => Ordering::Equal,
198 (None, Some(_)) => {
199 if nulls_first {
200 Ordering::Less
201 } else {
202 Ordering::Greater
203 }
204 }
205 (Some(_), None) => {
206 if nulls_first {
207 Ordering::Greater
208 } else {
209 Ordering::Less
210 }
211 }
212 (Some(SochValue::Null), Some(SochValue::Null)) => Ordering::Equal,
213 (Some(SochValue::Null), Some(_)) => {
214 if nulls_first {
215 Ordering::Less
216 } else {
217 Ordering::Greater
218 }
219 }
220 (Some(_), Some(SochValue::Null)) => {
221 if nulls_first {
222 Ordering::Greater
223 } else {
224 Ordering::Less
225 }
226 }
227 (Some(a), Some(b)) => compare_soch_values(a, b),
228 }
229}
230
231fn compare_soch_values(a: &SochValue, b: &SochValue) -> Ordering {
233 match (a, b) {
234 (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
235 (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
236 (SochValue::Float(a), SochValue::Float(b)) => a.partial_cmp(b).unwrap_or(Ordering::Equal),
237 (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
238 (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
239 _ => Ordering::Equal, }
241}
242
243pub struct TopKHeap<T, F>
257where
258 F: Fn(&T, &T) -> Ordering,
259{
260 heap: BinaryHeap<ComparableWrapper<T, F>>,
262 k: usize,
264 comparator: F,
266 want_smallest: bool,
268}
269
270struct ComparableWrapper<T, F>
272where
273 F: Fn(&T, &T) -> Ordering,
274{
275 value: T,
276 comparator: *const F,
277 inverted: bool,
278}
279
280unsafe impl<T: Send, F> Send for ComparableWrapper<T, F> where F: Fn(&T, &T) -> Ordering {}
282unsafe impl<T: Sync, F> Sync for ComparableWrapper<T, F> where F: Fn(&T, &T) -> Ordering {}
283
284impl<T, F> PartialEq for ComparableWrapper<T, F>
285where
286 F: Fn(&T, &T) -> Ordering,
287{
288 fn eq(&self, other: &Self) -> bool {
289 self.cmp(other) == Ordering::Equal
290 }
291}
292
293impl<T, F> Eq for ComparableWrapper<T, F> where F: Fn(&T, &T) -> Ordering {}
294
295impl<T, F> PartialOrd for ComparableWrapper<T, F>
296where
297 F: Fn(&T, &T) -> Ordering,
298{
299 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
300 Some(self.cmp(other))
301 }
302}
303
304impl<T, F> Ord for ComparableWrapper<T, F>
305where
306 F: Fn(&T, &T) -> Ordering,
307{
308 fn cmp(&self, other: &Self) -> Ordering {
309 let cmp = unsafe { &*self.comparator };
311 let result = cmp(&self.value, &other.value);
312
313 if self.inverted {
314 result.reverse()
315 } else {
316 result
317 }
318 }
319}
320
321impl<T, F> TopKHeap<T, F>
322where
323 F: Fn(&T, &T) -> Ordering,
324{
325 pub fn new(k: usize, comparator: F, want_smallest: bool) -> Self {
331 Self {
332 heap: BinaryHeap::with_capacity(k + 1),
333 k,
334 comparator,
335 want_smallest,
336 }
337 }
338
339 pub fn push(&mut self, value: T) {
343 if self.k == 0 {
344 return;
345 }
346
347 let wrapper = ComparableWrapper {
348 value,
349 comparator: &self.comparator as *const F,
350 inverted: !self.want_smallest,
353 };
354
355 if self.heap.len() < self.k {
356 self.heap.push(wrapper);
357 } else if let Some(top) = self.heap.peek() {
358 let should_replace = if self.want_smallest {
360 (self.comparator)(&wrapper.value, &top.value) == Ordering::Less
362 } else {
363 (self.comparator)(&wrapper.value, &top.value) == Ordering::Greater
365 };
366
367 if should_replace {
368 self.heap.pop();
369 self.heap.push(wrapper);
370 }
371 }
372 }
373
374 pub fn threshold(&self) -> Option<&T> {
378 self.heap.peek().map(|w| &w.value)
379 }
380
381 pub fn is_full(&self) -> bool {
383 self.heap.len() >= self.k
384 }
385
386 pub fn into_sorted_vec(self) -> Vec<T> {
390 let mut values: Vec<_> = self.heap.into_iter().map(|w| w.value).collect();
391 if self.want_smallest {
392 values.sort_by(&self.comparator);
393 } else {
394 values.sort_by(|a, b| (&self.comparator)(b, a));
396 }
397 values
398 }
399
400 pub fn len(&self) -> usize {
402 self.heap.len()
403 }
404
405 pub fn is_empty(&self) -> bool {
407 self.heap.is_empty()
408 }
409}
410
411#[derive(Debug, Clone, Copy, PartialEq, Eq)]
417pub enum ExecutionStrategy {
418 IndexPushdown,
420 StreamingTopK,
422 FullSort,
424}
425
426impl ExecutionStrategy {
427 pub fn choose(has_matching_index: bool, estimated_rows: Option<usize>, limit: usize) -> Self {
429 if has_matching_index {
431 return ExecutionStrategy::IndexPushdown;
432 }
433
434 let n = match estimated_rows {
436 Some(n) if n > 0 => n,
437 _ => return ExecutionStrategy::StreamingTopK,
438 };
439
440 let k = limit;
443
444 if k <= 100 {
445 ExecutionStrategy::StreamingTopK
447 } else if (k as f64) < (n as f64).sqrt() {
448 ExecutionStrategy::StreamingTopK
450 } else {
451 ExecutionStrategy::FullSort
454 }
455 }
456
457 pub fn complexity(&self, n: usize, k: usize) -> String {
459 match self {
460 ExecutionStrategy::IndexPushdown => {
461 format!(
462 "O(log {} + {}) = O({})",
463 n,
464 k,
465 (n as f64).log2() as usize + k
466 )
467 }
468 ExecutionStrategy::StreamingTopK => {
469 let log_k = (k as f64).log2().max(1.0) as usize;
470 format!("O({} * log {}) ≈ O({})", n, k, n * log_k)
471 }
472 ExecutionStrategy::FullSort => {
473 let log_n = (n as f64).log2().max(1.0) as usize;
474 format!("O({} * log {}) ≈ O({})", n, n, n * log_n)
475 }
476 }
477 }
478}
479
480#[derive(Debug, Clone, Default)]
482pub struct OrderByLimitStats {
483 pub strategy: Option<ExecutionStrategy>,
485 pub input_rows: usize,
487 pub output_rows: usize,
489 pub heap_operations: usize,
491 pub comparisons: usize,
493 pub offset_skipped: usize,
495}
496
497pub struct OrderByLimitExecutor {
502 order_by: OrderBySpec,
504 limit: usize,
506 offset: usize,
508 column_names: Vec<String>,
510 strategy: ExecutionStrategy,
512}
513
514impl OrderByLimitExecutor {
515 pub fn new(
517 order_by: OrderBySpec,
518 limit: usize,
519 offset: usize,
520 column_names: Vec<String>,
521 has_matching_index: bool,
522 estimated_rows: Option<usize>,
523 ) -> Self {
524 let effective_limit = limit.saturating_add(offset);
526 let strategy =
527 ExecutionStrategy::choose(has_matching_index, estimated_rows, effective_limit);
528
529 Self {
530 order_by,
531 limit,
532 offset,
533 column_names,
534 strategy,
535 }
536 }
537
538 pub fn strategy(&self) -> ExecutionStrategy {
540 self.strategy
541 }
542
543 pub fn execute<I>(&self, rows: I) -> (Vec<SochRow>, OrderByLimitStats)
550 where
551 I: Iterator<Item = SochRow>,
552 {
553 let mut stats = OrderByLimitStats {
554 strategy: Some(self.strategy),
555 ..Default::default()
556 };
557
558 let effective_limit = self.limit.saturating_add(self.offset);
559
560 let result = match self.strategy {
561 ExecutionStrategy::IndexPushdown => {
562 let collected: Vec<_> = rows.take(effective_limit).collect();
565 stats.input_rows = collected.len();
566 collected
567 }
568 ExecutionStrategy::StreamingTopK => {
569 self.execute_streaming(rows, effective_limit, &mut stats)
570 }
571 ExecutionStrategy::FullSort => {
572 self.execute_full_sort(rows, effective_limit, &mut stats)
573 }
574 };
575
576 let final_result: Vec<_> = result
578 .into_iter()
579 .skip(self.offset)
580 .take(self.limit)
581 .collect();
582
583 stats.offset_skipped = self.offset.min(stats.input_rows);
584 stats.output_rows = final_result.len();
585
586 (final_result, stats)
587 }
588
589 fn execute_streaming<I>(&self, rows: I, k: usize, stats: &mut OrderByLimitStats) -> Vec<SochRow>
591 where
592 I: Iterator<Item = SochRow>,
593 {
594 let comparator = self.order_by.comparator(&self.column_names);
595
596 let mut heap = TopKHeap::new(k, comparator, true);
598
599 for row in rows {
600 stats.input_rows += 1;
601 stats.heap_operations += 1;
602 heap.push(row);
603 }
604
605 heap.into_sorted_vec()
606 }
607
608 fn execute_full_sort<I>(&self, rows: I, k: usize, stats: &mut OrderByLimitStats) -> Vec<SochRow>
610 where
611 I: Iterator<Item = SochRow>,
612 {
613 let comparator = self.order_by.comparator(&self.column_names);
614
615 let mut all_rows: Vec<_> = rows.collect();
616 stats.input_rows = all_rows.len();
617
618 all_rows.sort_by(&comparator);
619
620 all_rows.truncate(k);
621 all_rows
622 }
623}
624
625pub struct IndexAwareTopK<T, F>
635where
636 F: Fn(&T, &T) -> Ordering,
637{
638 current_batch: Vec<T>,
640 result: Vec<T>,
642 k: usize,
644 secondary_cmp: F,
646}
647
648impl<T, F> IndexAwareTopK<T, F>
649where
650 F: Fn(&T, &T) -> Ordering,
651{
652 pub fn new(k: usize, secondary_cmp: F) -> Self {
654 Self {
655 current_batch: Vec::new(),
656 result: Vec::with_capacity(k),
657 k,
658 secondary_cmp,
659 }
660 }
661
662 pub fn push(&mut self, item: T, same_index_key_as_previous: bool) {
667 if !same_index_key_as_previous {
668 self.finalize_batch();
669 }
670
671 self.current_batch.push(item);
672 }
673
674 fn finalize_batch(&mut self) {
676 if self.current_batch.is_empty() {
677 return;
678 }
679
680 self.current_batch.sort_by(&self.secondary_cmp);
682
683 let remaining = self.k.saturating_sub(self.result.len());
685 let to_take = remaining.min(self.current_batch.len());
686
687 self.result.extend(self.current_batch.drain(..to_take));
688 self.current_batch.clear();
689 }
690
691 pub fn is_complete(&self) -> bool {
693 self.result.len() >= self.k
694 }
695
696 pub fn into_result(mut self) -> Vec<T> {
698 self.finalize_batch();
699 self.result
700 }
701}
702
703pub struct SingleColumnTopK {
711 heap: BinaryHeap<SingleColEntry>,
713 k: usize,
715 col_idx: usize,
717 ascending: bool,
719}
720
721struct SingleColEntry {
722 row: SochRow,
723 key: OrderableValue,
724 ascending: bool,
725}
726
727#[derive(Clone)]
729enum OrderableValue {
730 Int(i64),
731 UInt(u64),
732 Float(f64),
733 Text(String),
734 Bool(bool),
735 Null,
736}
737
738impl From<&SochValue> for OrderableValue {
739 fn from(v: &SochValue) -> Self {
740 match v {
741 SochValue::Int(i) => OrderableValue::Int(*i),
742 SochValue::UInt(u) => OrderableValue::UInt(*u),
743 SochValue::Float(f) => OrderableValue::Float(*f),
744 SochValue::Text(s) => OrderableValue::Text(s.clone()),
745 SochValue::Bool(b) => OrderableValue::Bool(*b),
746 SochValue::Null => OrderableValue::Null,
747 _ => OrderableValue::Null,
748 }
749 }
750}
751
752impl PartialEq for OrderableValue {
753 fn eq(&self, other: &Self) -> bool {
754 self.cmp(other) == Ordering::Equal
755 }
756}
757
758impl Eq for OrderableValue {}
759
760impl PartialOrd for OrderableValue {
761 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
762 Some(self.cmp(other))
763 }
764}
765
766impl Ord for OrderableValue {
767 fn cmp(&self, other: &Self) -> Ordering {
768 match (self, other) {
769 (OrderableValue::Null, OrderableValue::Null) => Ordering::Equal,
770 (OrderableValue::Null, _) => Ordering::Greater, (_, OrderableValue::Null) => Ordering::Less,
772 (OrderableValue::Int(a), OrderableValue::Int(b)) => a.cmp(b),
773 (OrderableValue::UInt(a), OrderableValue::UInt(b)) => a.cmp(b),
774 (OrderableValue::Float(a), OrderableValue::Float(b)) => {
775 a.partial_cmp(b).unwrap_or(Ordering::Equal)
776 }
777 (OrderableValue::Text(a), OrderableValue::Text(b)) => a.cmp(b),
778 (OrderableValue::Bool(a), OrderableValue::Bool(b)) => a.cmp(b),
779 _ => Ordering::Equal, }
781 }
782}
783
784impl PartialEq for SingleColEntry {
785 fn eq(&self, other: &Self) -> bool {
786 self.key == other.key
787 }
788}
789
790impl Eq for SingleColEntry {}
791
792impl PartialOrd for SingleColEntry {
793 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
794 Some(self.cmp(other))
795 }
796}
797
798impl Ord for SingleColEntry {
799 fn cmp(&self, other: &Self) -> Ordering {
800 let base = self.key.cmp(&other.key);
801
802 if self.ascending { base } else { base.reverse() }
805 }
806}
807
808impl SingleColumnTopK {
809 pub fn new(k: usize, col_idx: usize, ascending: bool) -> Self {
811 Self {
812 heap: BinaryHeap::with_capacity(k + 1),
813 k,
814 col_idx,
815 ascending,
816 }
817 }
818
819 pub fn push(&mut self, row: SochRow) {
821 if self.k == 0 {
822 return;
823 }
824
825 let key = row
826 .values
827 .get(self.col_idx)
828 .map(OrderableValue::from)
829 .unwrap_or(OrderableValue::Null);
830
831 let entry = SingleColEntry {
832 row,
833 key,
834 ascending: self.ascending,
835 };
836
837 if self.heap.len() < self.k {
838 self.heap.push(entry);
839 } else if let Some(top) = self.heap.peek() {
840 let should_replace = if self.ascending {
841 entry.key < top.key
842 } else {
843 entry.key > top.key
844 };
845
846 if should_replace {
847 self.heap.pop();
848 self.heap.push(entry);
849 }
850 }
851 }
852
853 pub fn into_sorted_vec(self) -> Vec<SochRow> {
855 let mut entries: Vec<_> = self.heap.into_iter().collect();
856
857 entries.sort_by(|a, b| {
858 let base = a.key.cmp(&b.key);
859 if self.ascending { base } else { base.reverse() }
860 });
861
862 entries.into_iter().map(|e| e.row).collect()
863 }
864
865 pub fn len(&self) -> usize {
867 self.heap.len()
868 }
869
870 pub fn is_empty(&self) -> bool {
872 self.heap.is_empty()
873 }
874}
875
876#[cfg(test)]
881mod tests {
882 use super::*;
883
884 fn make_row(values: Vec<SochValue>) -> SochRow {
885 SochRow::new(values)
886 }
887
888 #[test]
889 fn test_strategy_selection() {
890 assert_eq!(
892 ExecutionStrategy::choose(true, Some(1_000_000), 10),
893 ExecutionStrategy::IndexPushdown
894 );
895
896 assert_eq!(
898 ExecutionStrategy::choose(false, Some(1_000_000), 10),
899 ExecutionStrategy::StreamingTopK
900 );
901
902 assert_eq!(
905 ExecutionStrategy::choose(false, Some(1000), 500),
906 ExecutionStrategy::FullSort
907 );
908
909 assert_eq!(
911 ExecutionStrategy::choose(false, Some(100), 90),
912 ExecutionStrategy::StreamingTopK
913 );
914 }
915
916 #[test]
917 fn test_order_by_spec_comparator() {
918 let spec = OrderBySpec::single(
919 ColumnRef::Name("priority".to_string()),
920 SortDirection::Ascending,
921 );
922
923 let columns = vec!["id".to_string(), "priority".to_string(), "name".to_string()];
924 let cmp = spec.comparator(&columns);
925
926 let row1 = make_row(vec![
927 SochValue::Int(1),
928 SochValue::Int(5),
929 SochValue::Text("A".to_string()),
930 ]);
931 let row2 = make_row(vec![
932 SochValue::Int(2),
933 SochValue::Int(3),
934 SochValue::Text("B".to_string()),
935 ]);
936
937 assert_eq!(cmp(&row2, &row1), Ordering::Less);
939 }
940
941 #[test]
942 fn test_topk_heap_ascending() {
943 let cmp = |a: &i32, b: &i32| a.cmp(b);
944 let mut heap = TopKHeap::new(3, cmp, true);
945
946 for i in [5, 2, 8, 1, 9, 3, 7, 4, 6] {
947 heap.push(i);
948 }
949
950 let result = heap.into_sorted_vec();
951 assert_eq!(result, vec![1, 2, 3]);
952 }
953
954 #[test]
955 fn test_topk_heap_descending() {
956 let cmp = |a: &i32, b: &i32| a.cmp(b);
957 let mut heap = TopKHeap::new(3, cmp, false);
958
959 for i in [5, 2, 8, 1, 9, 3, 7, 4, 6] {
960 heap.push(i);
961 }
962
963 let result = heap.into_sorted_vec();
964 assert_eq!(result, vec![9, 8, 7]);
966 }
967
968 #[test]
969 fn test_executor_streaming() {
970 let columns = vec!["priority".to_string(), "name".to_string()];
971 let order_by = OrderBySpec::single(
972 ColumnRef::Name("priority".to_string()),
973 SortDirection::Ascending,
974 );
975
976 let executor = OrderByLimitExecutor::new(
977 order_by,
978 3, 0, columns.clone(),
981 false, Some(10),
983 );
984
985 let rows = vec![
987 make_row(vec![SochValue::Int(5), SochValue::Text("E".to_string())]),
988 make_row(vec![SochValue::Int(3), SochValue::Text("C".to_string())]),
989 make_row(vec![SochValue::Int(1), SochValue::Text("A".to_string())]),
990 make_row(vec![SochValue::Int(4), SochValue::Text("D".to_string())]),
991 make_row(vec![SochValue::Int(2), SochValue::Text("B".to_string())]),
992 ];
993
994 let (result, stats) = executor.execute(rows.into_iter());
995
996 assert_eq!(result.len(), 3);
998 assert_eq!(result[0].values[0], SochValue::Int(1));
999 assert_eq!(result[1].values[0], SochValue::Int(2));
1000 assert_eq!(result[2].values[0], SochValue::Int(3));
1001
1002 assert_eq!(stats.input_rows, 5);
1003 assert_eq!(stats.output_rows, 3);
1004 }
1005
1006 #[test]
1007 fn test_executor_with_offset() {
1008 let columns = vec!["priority".to_string()];
1009 let order_by = OrderBySpec::single(
1010 ColumnRef::Name("priority".to_string()),
1011 SortDirection::Ascending,
1012 );
1013
1014 let executor = OrderByLimitExecutor::new(
1015 order_by,
1016 2, 2, columns,
1019 false,
1020 Some(10),
1021 );
1022
1023 let rows = vec![
1026 make_row(vec![SochValue::Int(5)]),
1027 make_row(vec![SochValue::Int(3)]),
1028 make_row(vec![SochValue::Int(1)]),
1029 make_row(vec![SochValue::Int(4)]),
1030 make_row(vec![SochValue::Int(2)]),
1031 ];
1032
1033 let (result, _) = executor.execute(rows.into_iter());
1034
1035 assert_eq!(result.len(), 2);
1036 assert_eq!(result[0].values[0], SochValue::Int(3));
1037 assert_eq!(result[1].values[0], SochValue::Int(4));
1038 }
1039
1040 #[test]
1041 fn test_single_column_topk() {
1042 let mut topk = SingleColumnTopK::new(3, 0, true); topk.push(make_row(vec![SochValue::Int(5)]));
1045 topk.push(make_row(vec![SochValue::Int(3)]));
1046 topk.push(make_row(vec![SochValue::Int(1)]));
1047 topk.push(make_row(vec![SochValue::Int(4)]));
1048 topk.push(make_row(vec![SochValue::Int(2)]));
1049
1050 let result = topk.into_sorted_vec();
1051
1052 assert_eq!(result.len(), 3);
1053 assert_eq!(result[0].values[0], SochValue::Int(1));
1054 assert_eq!(result[1].values[0], SochValue::Int(2));
1055 assert_eq!(result[2].values[0], SochValue::Int(3));
1056 }
1057
1058 #[test]
1059 fn test_correctness_vs_buggy_implementation() {
1060 let columns = vec!["priority".to_string()];
1062 let order_by = OrderBySpec::single(
1063 ColumnRef::Name("priority".to_string()),
1064 SortDirection::Ascending,
1065 );
1066
1067 let rows: Vec<_> = [5, 2, 8, 1, 9, 3]
1069 .iter()
1070 .map(|&p| make_row(vec![SochValue::Int(p)]))
1071 .collect();
1072
1073 let buggy: Vec<_> = rows.iter().take(3).cloned().collect();
1075 let executor = OrderByLimitExecutor::new(order_by, 3, 0, columns, false, Some(6));
1080 let (correct, _) = executor.execute(rows.into_iter());
1081
1082 assert_eq!(correct[0].values[0], SochValue::Int(1));
1084 assert_eq!(correct[1].values[0], SochValue::Int(2));
1085 assert_eq!(correct[2].values[0], SochValue::Int(3));
1086 }
1087
1088 #[test]
1089 fn test_multi_column_order_by() {
1090 let columns = vec!["priority".to_string(), "created_at".to_string()];
1091
1092 let order_by = OrderBySpec::single(
1093 ColumnRef::Name("priority".to_string()),
1094 SortDirection::Ascending,
1095 )
1096 .then_by(
1097 ColumnRef::Name("created_at".to_string()),
1098 SortDirection::Descending,
1099 );
1100
1101 let executor = OrderByLimitExecutor::new(order_by, 3, 0, columns, false, Some(5));
1102
1103 let rows = vec![
1105 make_row(vec![SochValue::Int(1), SochValue::Int(100)]),
1106 make_row(vec![SochValue::Int(1), SochValue::Int(200)]), make_row(vec![SochValue::Int(2), SochValue::Int(150)]),
1108 make_row(vec![SochValue::Int(1), SochValue::Int(150)]),
1109 make_row(vec![SochValue::Int(3), SochValue::Int(100)]),
1110 ];
1111
1112 let (result, _) = executor.execute(rows.into_iter());
1113
1114 assert_eq!(result.len(), 3);
1117 assert_eq!(result[0].values[0], SochValue::Int(1));
1118 assert_eq!(result[0].values[1], SochValue::Int(200));
1119 assert_eq!(result[1].values[0], SochValue::Int(1));
1120 assert_eq!(result[1].values[1], SochValue::Int(150));
1121 assert_eq!(result[2].values[0], SochValue::Int(1));
1122 assert_eq!(result[2].values[1], SochValue::Int(100));
1123 }
1124}