1use std::cmp::Ordering;
72use std::collections::BinaryHeap;
73
74use sochdb_core::{SochRow, SochValue};
75
/// Direction in which one `ORDER BY` key is sorted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortDirection {
    /// Natural order of the compared values is kept.
    Ascending,
    /// Natural order of the compared values is reversed.
    Descending,
}
86
87#[derive(Debug, Clone)]
89pub struct OrderByColumn {
90 pub column: ColumnRef,
92 pub direction: SortDirection,
94 pub nulls_first: bool,
96}
97
/// Reference to a column, either positional or by name.
#[derive(Debug, Clone)]
pub enum ColumnRef {
    /// Zero-based position of the column inside a row.
    Index(usize),
    /// Column name, looked up in the list of column names at resolve time.
    Name(String),
}

impl ColumnRef {
    /// Turns the reference into a positional index.
    ///
    /// * `Index(i)` is returned unchanged; it is **not** checked against
    ///   `columns.len()`.
    /// * `Name(n)` yields the position of the first entry of `columns` equal
    ///   to `n`, or `None` when no entry matches.
    pub fn resolve(&self, columns: &[String]) -> Option<usize> {
        match self {
            ColumnRef::Name(wanted) => columns
                .iter()
                .enumerate()
                .find_map(|(idx, candidate)| if candidate == wanted { Some(idx) } else { None }),
            ColumnRef::Index(position) => Some(*position),
        }
    }
}
116
117#[derive(Debug, Clone)]
119pub struct OrderBySpec {
120 pub columns: Vec<OrderByColumn>,
122}
123
124impl OrderBySpec {
125 pub fn single(column: ColumnRef, direction: SortDirection) -> Self {
127 Self {
128 columns: vec![OrderByColumn {
129 column,
130 direction,
131 nulls_first: false,
132 }],
133 }
134 }
135
136 pub fn then_by(mut self, column: ColumnRef, direction: SortDirection) -> Self {
138 self.columns.push(OrderByColumn {
139 column,
140 direction,
141 nulls_first: false,
142 });
143 self
144 }
145
146 pub fn comparator(&self, column_names: &[String]) -> impl Fn(&SochRow, &SochRow) -> Ordering {
148 let resolved: Vec<_> = self.columns
149 .iter()
150 .filter_map(|col| {
151 col.column.resolve(column_names).map(|idx| (idx, col.direction, col.nulls_first))
152 })
153 .collect();
154
155 move |a: &SochRow, b: &SochRow| {
156 for &(idx, direction, nulls_first) in &resolved {
157 let val_a = a.values.get(idx);
158 let val_b = b.values.get(idx);
159
160 let ordering = compare_values(val_a, val_b, nulls_first);
161
162 if ordering != Ordering::Equal {
163 return match direction {
164 SortDirection::Ascending => ordering,
165 SortDirection::Descending => ordering.reverse(),
166 };
167 }
168 }
169 Ordering::Equal
170 }
171 }
172
173 pub fn matches_index(&self, index_columns: &[(String, SortDirection)]) -> bool {
175 if self.columns.len() > index_columns.len() {
176 return false;
177 }
178
179 self.columns.iter().zip(index_columns.iter()).all(|(col, (idx_col, idx_dir))| {
180 match &col.column {
181 ColumnRef::Name(name) => name == idx_col && col.direction == *idx_dir,
182 ColumnRef::Index(_) => false, }
184 })
185 }
186}
187
188fn compare_values(a: Option<&SochValue>, b: Option<&SochValue>, nulls_first: bool) -> Ordering {
190 match (a, b) {
191 (None, None) => Ordering::Equal,
192 (None, Some(_)) => if nulls_first { Ordering::Less } else { Ordering::Greater },
193 (Some(_), None) => if nulls_first { Ordering::Greater } else { Ordering::Less },
194 (Some(SochValue::Null), Some(SochValue::Null)) => Ordering::Equal,
195 (Some(SochValue::Null), Some(_)) => if nulls_first { Ordering::Less } else { Ordering::Greater },
196 (Some(_), Some(SochValue::Null)) => if nulls_first { Ordering::Greater } else { Ordering::Less },
197 (Some(a), Some(b)) => compare_soch_values(a, b),
198 }
199}
200
201fn compare_soch_values(a: &SochValue, b: &SochValue) -> Ordering {
203 match (a, b) {
204 (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
205 (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
206 (SochValue::Float(a), SochValue::Float(b)) => a.partial_cmp(b).unwrap_or(Ordering::Equal),
207 (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
208 (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
209 _ => Ordering::Equal, }
211}
212
/// Bounded selection structure that retains the best `k` of all pushed items.
///
/// "Best" means smallest under `comparator` when `want_smallest` is `true`,
/// largest otherwise. Memory use is bounded by `k` items.
///
/// The items live in an implicit binary heap inside a `Vec` whose root is the
/// *worst* retained item, so a new candidate needs one comparison against the
/// root to be rejected. The comparator is always called through `self`, so the
/// structure stays valid when it is moved. The previous design stored a raw
/// pointer to the comparator inside every heap entry, which dangled after a
/// move.
///
/// Which of several equally ranked items is retained is unspecified.
pub struct TopKHeap<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Implicit binary heap; index 0 holds the worst retained item.
    heap: Vec<T>,
    /// Maximum number of items to retain.
    k: usize,
    /// User-supplied ordering of items.
    comparator: F,
    /// `true` keeps the `k` smallest items, `false` the `k` largest.
    want_smallest: bool,
}

impl<T, F> TopKHeap<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Creates an empty selector that retains at most `k` items.
    ///
    /// Only a bounded number of slots is reserved up front, so a very large
    /// `k` (including `usize::MAX` used as "no limit") neither overflows nor
    /// allocates memory that may never be needed.
    pub fn new(k: usize, comparator: F, want_smallest: bool) -> Self {
        const MAX_PREALLOCATED_SLOTS: usize = 4096;

        Self {
            heap: Vec::with_capacity(k.min(MAX_PREALLOCATED_SLOTS)),
            k,
            comparator,
            want_smallest,
        }
    }

    /// Returns `true` when `a` ranks strictly ahead of `b` in the desired output.
    fn is_better(&self, a: &T, b: &T) -> bool {
        let wanted = if self.want_smallest {
            Ordering::Less
        } else {
            Ordering::Greater
        };
        (self.comparator)(a, b) == wanted
    }

    /// Moves the item at `idx` towards the root while it is worse than its parent.
    fn sift_up(&mut self, mut idx: usize) {
        while idx > 0 {
            let parent = (idx - 1) / 2;
            if !self.is_better(&self.heap[parent], &self.heap[idx]) {
                break;
            }
            self.heap.swap(parent, idx);
            idx = parent;
        }
    }

    /// Moves the item at `idx` towards the leaves while a child is worse than it.
    fn sift_down(&mut self, mut idx: usize) {
        let len = self.heap.len();
        loop {
            let left = 2 * idx + 1;
            if left >= len {
                break;
            }
            let right = left + 1;
            // Pick the worse of the two children as the swap candidate.
            let worst = if right < len && self.is_better(&self.heap[left], &self.heap[right]) {
                right
            } else {
                left
            };
            if !self.is_better(&self.heap[idx], &self.heap[worst]) {
                break;
            }
            self.heap.swap(idx, worst);
            idx = worst;
        }
    }

    /// Offers an item to the selector.
    ///
    /// While fewer than `k` items are held the item is always kept. Once full,
    /// the item replaces the current worst item only if it is strictly better;
    /// otherwise it is dropped. With `k == 0` every item is dropped.
    pub fn push(&mut self, value: T) {
        if self.k == 0 {
            return;
        }

        if self.heap.len() < self.k {
            self.heap.push(value);
            self.sift_up(self.heap.len() - 1);
        } else if self.is_better(&value, &self.heap[0]) {
            self.heap[0] = value;
            self.sift_down(0);
        }
    }

    /// Returns the worst item currently retained, if any.
    ///
    /// Once the selector is full, a candidate must be strictly better than
    /// this item to be accepted.
    pub fn threshold(&self) -> Option<&T> {
        self.heap.first()
    }

    /// Returns `true` once `k` items are held (always `true` for `k == 0`).
    pub fn is_full(&self) -> bool {
        self.heap.len() >= self.k
    }

    /// Consumes the selector and returns the retained items, best first.
    ///
    /// The output is ascending under the comparator when `want_smallest` is
    /// `true` and descending otherwise.
    pub fn into_sorted_vec(self) -> Vec<T> {
        let TopKHeap {
            heap: mut values,
            comparator,
            want_smallest,
            k: _,
        } = self;

        if want_smallest {
            values.sort_by(|a, b| comparator(a, b));
        } else {
            values.sort_by(|a, b| comparator(b, a));
        }
        values
    }

    /// Number of items currently retained.
    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// Returns `true` when no item is retained.
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }
}
380
/// How an `ORDER BY ... LIMIT` request is evaluated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionStrategy {
    /// A matching index exists; the row source is used as delivered.
    IndexPushdown,
    /// Rows are streamed through a bounded top-k heap.
    StreamingTopK,
    /// All rows are collected and sorted.
    FullSort,
}

impl ExecutionStrategy {
    /// Picks a strategy.
    ///
    /// * A matching index always wins.
    /// * Without a positive row estimate, streaming top-k is used.
    /// * Otherwise streaming top-k is used when `limit` is at most 100 or is
    ///   below `sqrt(estimated_rows)`; anything larger falls back to a full sort.
    pub fn choose(
        has_matching_index: bool,
        estimated_rows: Option<usize>,
        limit: usize,
    ) -> Self {
        /// Limits up to this size always use the streaming heap.
        const SMALL_LIMIT: usize = 100;

        if has_matching_index {
            return ExecutionStrategy::IndexPushdown;
        }

        let n = match estimated_rows {
            Some(n) if n > 0 => n,
            _ => return ExecutionStrategy::StreamingTopK,
        };

        if limit <= SMALL_LIMIT || (limit as f64) < (n as f64).sqrt() {
            ExecutionStrategy::StreamingTopK
        } else {
            ExecutionStrategy::FullSort
        }
    }

    /// Renders a human-readable cost estimate for `n` input rows and limit `k`.
    ///
    /// Logarithms are base 2 and truncated to an integer; for the heap and
    /// sort strategies the logarithm is clamped to at least 1. The combined
    /// figure saturates at `usize::MAX` instead of overflowing, which
    /// previously panicked in debug builds for very large `n` or `k`.
    pub fn complexity(&self, n: usize, k: usize) -> String {
        match self {
            ExecutionStrategy::IndexPushdown => {
                let log_n = (n as f64).log2() as usize;
                format!("O(log {} + {}) = O({})", n, k, log_n.saturating_add(k))
            }
            ExecutionStrategy::StreamingTopK => {
                let log_k = (k as f64).log2().max(1.0) as usize;
                format!("O({} * log {}) ≈ O({})", n, k, n.saturating_mul(log_k))
            }
            ExecutionStrategy::FullSort => {
                let log_n = (n as f64).log2().max(1.0) as usize;
                format!("O({} * log {}) ≈ O({})", n, n, n.saturating_mul(log_n))
            }
        }
    }
}
448
449#[derive(Debug, Clone, Default)]
451pub struct OrderByLimitStats {
452 pub strategy: Option<ExecutionStrategy>,
454 pub input_rows: usize,
456 pub output_rows: usize,
458 pub heap_operations: usize,
460 pub comparisons: usize,
462 pub offset_skipped: usize,
464}
465
466pub struct OrderByLimitExecutor {
471 order_by: OrderBySpec,
473 limit: usize,
475 offset: usize,
477 column_names: Vec<String>,
479 strategy: ExecutionStrategy,
481}
482
483impl OrderByLimitExecutor {
484 pub fn new(
486 order_by: OrderBySpec,
487 limit: usize,
488 offset: usize,
489 column_names: Vec<String>,
490 has_matching_index: bool,
491 estimated_rows: Option<usize>,
492 ) -> Self {
493 let effective_limit = limit.saturating_add(offset);
495 let strategy = ExecutionStrategy::choose(has_matching_index, estimated_rows, effective_limit);
496
497 Self {
498 order_by,
499 limit,
500 offset,
501 column_names,
502 strategy,
503 }
504 }
505
506 pub fn strategy(&self) -> ExecutionStrategy {
508 self.strategy
509 }
510
511 pub fn execute<I>(&self, rows: I) -> (Vec<SochRow>, OrderByLimitStats)
518 where
519 I: Iterator<Item = SochRow>,
520 {
521 let mut stats = OrderByLimitStats {
522 strategy: Some(self.strategy),
523 ..Default::default()
524 };
525
526 let effective_limit = self.limit.saturating_add(self.offset);
527
528 let result = match self.strategy {
529 ExecutionStrategy::IndexPushdown => {
530 let collected: Vec<_> = rows.take(effective_limit).collect();
533 stats.input_rows = collected.len();
534 collected
535 }
536 ExecutionStrategy::StreamingTopK => {
537 self.execute_streaming(rows, effective_limit, &mut stats)
538 }
539 ExecutionStrategy::FullSort => {
540 self.execute_full_sort(rows, effective_limit, &mut stats)
541 }
542 };
543
544 let final_result: Vec<_> = result
546 .into_iter()
547 .skip(self.offset)
548 .take(self.limit)
549 .collect();
550
551 stats.offset_skipped = self.offset.min(stats.input_rows);
552 stats.output_rows = final_result.len();
553
554 (final_result, stats)
555 }
556
557 fn execute_streaming<I>(
559 &self,
560 rows: I,
561 k: usize,
562 stats: &mut OrderByLimitStats,
563 ) -> Vec<SochRow>
564 where
565 I: Iterator<Item = SochRow>,
566 {
567 let comparator = self.order_by.comparator(&self.column_names);
568
569 let mut heap = TopKHeap::new(k, comparator, true);
571
572 for row in rows {
573 stats.input_rows += 1;
574 stats.heap_operations += 1;
575 heap.push(row);
576 }
577
578 heap.into_sorted_vec()
579 }
580
581 fn execute_full_sort<I>(
583 &self,
584 rows: I,
585 k: usize,
586 stats: &mut OrderByLimitStats,
587 ) -> Vec<SochRow>
588 where
589 I: Iterator<Item = SochRow>,
590 {
591 let comparator = self.order_by.comparator(&self.column_names);
592
593 let mut all_rows: Vec<_> = rows.collect();
594 stats.input_rows = all_rows.len();
595
596 all_rows.sort_by(&comparator);
597
598 all_rows.truncate(k);
599 all_rows
600 }
601}
602
/// Collects the first `k` items of a stream that arrives grouped by an index
/// key, ordering the items inside each group with a secondary comparator.
///
/// The caller says, for every item, whether it shares its index key with the
/// previously pushed item. Items with equal keys form a batch; a batch is
/// sorted with `secondary_cmp` and appended to the result when the next batch
/// starts or when `into_result` is called.
pub struct IndexAwareTopK<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Items of the batch currently being accumulated.
    current_batch: Vec<T>,
    /// Items already selected, in final order.
    result: Vec<T>,
    /// Maximum number of items to return.
    k: usize,
    /// Ordering applied inside a batch.
    secondary_cmp: F,
}

impl<T, F> IndexAwareTopK<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Creates an empty collector for at most `k` items.
    ///
    /// Only a bounded number of result slots is reserved up front, so a huge
    /// `k` (e.g. `usize::MAX` as "no limit") does not trigger a capacity
    /// overflow or an oversized allocation.
    pub fn new(k: usize, secondary_cmp: F) -> Self {
        const MAX_PREALLOCATED_SLOTS: usize = 4096;

        Self {
            current_batch: Vec::new(),
            result: Vec::with_capacity(k.min(MAX_PREALLOCATED_SLOTS)),
            k,
            secondary_cmp,
        }
    }

    /// Adds the next item of the stream.
    ///
    /// `same_index_key_as_previous` must be `false` when `item` starts a new
    /// index-key group; the previous batch is then finalized first. Items that
    /// arrive after the result is complete can never be selected and are
    /// dropped immediately rather than buffered.
    pub fn push(&mut self, item: T, same_index_key_as_previous: bool) {
        if !same_index_key_as_previous {
            self.finalize_batch();
        }

        if self.is_complete() {
            return;
        }

        self.current_batch.push(item);
    }

    /// Sorts the pending batch and moves as many items as still fit into the
    /// result; whatever does not fit is discarded.
    fn finalize_batch(&mut self) {
        if self.current_batch.is_empty() {
            return;
        }

        let remaining = self.k.saturating_sub(self.result.len());
        if remaining > 0 {
            self.current_batch.sort_by(&self.secondary_cmp);
            let to_take = remaining.min(self.current_batch.len());
            self.result.extend(self.current_batch.drain(..to_take));
        }
        self.current_batch.clear();
    }

    /// Returns `true` once `k` items have been selected from finalized batches.
    ///
    /// The batch still being accumulated is not counted.
    pub fn is_complete(&self) -> bool {
        self.result.len() >= self.k
    }

    /// Finalizes the pending batch and returns the selected items.
    pub fn into_result(mut self) -> Vec<T> {
        self.finalize_batch();
        self.result
    }
}
680
681pub struct SingleColumnTopK {
689 heap: BinaryHeap<SingleColEntry>,
691 k: usize,
693 col_idx: usize,
695 ascending: bool,
697}
698
699struct SingleColEntry {
700 row: SochRow,
701 key: OrderableValue,
702 ascending: bool,
703}
704
/// Owned sort key extracted from a cell, used by the single-column top-k path.
#[derive(Clone)]
enum OrderableValue {
    /// Signed integer key.
    Int(i64),
    /// Unsigned integer key.
    UInt(u64),
    /// Floating-point key.
    Float(f64),
    /// Text key.
    Text(String),
    /// Boolean key.
    Bool(bool),
    /// Null key; also used for cells that cannot be converted.
    Null,
}
715
716impl From<&SochValue> for OrderableValue {
717 fn from(v: &SochValue) -> Self {
718 match v {
719 SochValue::Int(i) => OrderableValue::Int(*i),
720 SochValue::UInt(u) => OrderableValue::UInt(*u),
721 SochValue::Float(f) => OrderableValue::Float(*f),
722 SochValue::Text(s) => OrderableValue::Text(s.clone()),
723 SochValue::Bool(b) => OrderableValue::Bool(*b),
724 SochValue::Null => OrderableValue::Null,
725 _ => OrderableValue::Null,
726 }
727 }
728}
729
730impl PartialEq for OrderableValue {
731 fn eq(&self, other: &Self) -> bool {
732 self.cmp(other) == Ordering::Equal
733 }
734}
735
736impl Eq for OrderableValue {}
737
738impl PartialOrd for OrderableValue {
739 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
740 Some(self.cmp(other))
741 }
742}
743
744impl Ord for OrderableValue {
745 fn cmp(&self, other: &Self) -> Ordering {
746 match (self, other) {
747 (OrderableValue::Null, OrderableValue::Null) => Ordering::Equal,
748 (OrderableValue::Null, _) => Ordering::Greater, (_, OrderableValue::Null) => Ordering::Less,
750 (OrderableValue::Int(a), OrderableValue::Int(b)) => a.cmp(b),
751 (OrderableValue::UInt(a), OrderableValue::UInt(b)) => a.cmp(b),
752 (OrderableValue::Float(a), OrderableValue::Float(b)) => {
753 a.partial_cmp(b).unwrap_or(Ordering::Equal)
754 }
755 (OrderableValue::Text(a), OrderableValue::Text(b)) => a.cmp(b),
756 (OrderableValue::Bool(a), OrderableValue::Bool(b)) => a.cmp(b),
757 _ => Ordering::Equal, }
759 }
760}
761
762impl PartialEq for SingleColEntry {
763 fn eq(&self, other: &Self) -> bool {
764 self.key == other.key
765 }
766}
767
768impl Eq for SingleColEntry {}
769
770impl PartialOrd for SingleColEntry {
771 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
772 Some(self.cmp(other))
773 }
774}
775
776impl Ord for SingleColEntry {
777 fn cmp(&self, other: &Self) -> Ordering {
778 let base = self.key.cmp(&other.key);
779
780 if self.ascending {
783 base
784 } else {
785 base.reverse()
786 }
787 }
788}
789
790impl SingleColumnTopK {
791 pub fn new(k: usize, col_idx: usize, ascending: bool) -> Self {
793 Self {
794 heap: BinaryHeap::with_capacity(k + 1),
795 k,
796 col_idx,
797 ascending,
798 }
799 }
800
801 pub fn push(&mut self, row: SochRow) {
803 if self.k == 0 {
804 return;
805 }
806
807 let key = row.values
808 .get(self.col_idx)
809 .map(OrderableValue::from)
810 .unwrap_or(OrderableValue::Null);
811
812 let entry = SingleColEntry {
813 row,
814 key,
815 ascending: self.ascending,
816 };
817
818 if self.heap.len() < self.k {
819 self.heap.push(entry);
820 } else if let Some(top) = self.heap.peek() {
821 let should_replace = if self.ascending {
822 entry.key < top.key
823 } else {
824 entry.key > top.key
825 };
826
827 if should_replace {
828 self.heap.pop();
829 self.heap.push(entry);
830 }
831 }
832 }
833
834 pub fn into_sorted_vec(self) -> Vec<SochRow> {
836 let mut entries: Vec<_> = self.heap.into_iter().collect();
837
838 entries.sort_by(|a, b| {
839 let base = a.key.cmp(&b.key);
840 if self.ascending { base } else { base.reverse() }
841 });
842
843 entries.into_iter().map(|e| e.row).collect()
844 }
845
846 pub fn len(&self) -> usize {
848 self.heap.len()
849 }
850
851 pub fn is_empty(&self) -> bool {
853 self.heap.is_empty()
854 }
855}
856
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps a list of values into a row.
    fn make_row(values: Vec<SochValue>) -> SochRow {
        SochRow::new(values)
    }

    /// Builds single-column integer rows, one per entry of `keys`.
    fn int_rows(keys: &[i64]) -> Vec<SochRow> {
        keys.iter()
            .map(|&key| make_row(vec![SochValue::Int(key)]))
            .collect()
    }

    /// Builds a `(priority, name)` row.
    fn named_row(priority: i64, name: &str) -> SochRow {
        make_row(vec![SochValue::Int(priority), SochValue::Text(name.to_string())])
    }

    /// `ORDER BY priority ASC`.
    fn by_priority_asc() -> OrderBySpec {
        OrderBySpec::single(
            ColumnRef::Name("priority".to_string()),
            SortDirection::Ascending,
        )
    }

    /// Asserts that column 0 of `rows` holds exactly `expected`, in order.
    fn assert_first_column(rows: &[SochRow], expected: &[i64]) {
        assert_eq!(rows.len(), expected.len());
        for (row, &want) in rows.iter().zip(expected) {
            assert_eq!(row.values[0], SochValue::Int(want));
        }
    }

    #[test]
    fn test_strategy_selection() {
        // (has_matching_index, estimated_rows, limit, expected strategy)
        let cases = [
            // A matching index always wins.
            (true, Some(1_000_000), 10, ExecutionStrategy::IndexPushdown),
            // Small limit over many rows: stream through the heap.
            (false, Some(1_000_000), 10, ExecutionStrategy::StreamingTopK),
            // Limit well above sqrt(n): sort everything.
            (false, Some(1000), 500, ExecutionStrategy::FullSort),
            // Limits up to 100 always stream.
            (false, Some(100), 90, ExecutionStrategy::StreamingTopK),
        ];

        for &(has_index, estimated_rows, limit, expected) in &cases {
            assert_eq!(
                ExecutionStrategy::choose(has_index, estimated_rows, limit),
                expected
            );
        }
    }

    #[test]
    fn test_order_by_spec_comparator() {
        let columns = vec!["id".to_string(), "priority".to_string(), "name".to_string()];
        let cmp = by_priority_asc().comparator(&columns);

        let row1 = make_row(vec![
            SochValue::Int(1),
            SochValue::Int(5),
            SochValue::Text("A".to_string()),
        ]);
        let row2 = make_row(vec![
            SochValue::Int(2),
            SochValue::Int(3),
            SochValue::Text("B".to_string()),
        ]);

        // row2 has the lower priority (3 < 5), so it sorts first.
        assert_eq!(cmp(&row2, &row1), Ordering::Less);
    }

    #[test]
    fn test_topk_heap_ascending() {
        let cmp = |a: &i32, b: &i32| a.cmp(b);
        let mut heap = TopKHeap::new(3, cmp, true);

        for i in [5, 2, 8, 1, 9, 3, 7, 4, 6] {
            heap.push(i);
        }

        assert_eq!(heap.into_sorted_vec(), vec![1, 2, 3]);
    }

    #[test]
    fn test_topk_heap_descending() {
        let cmp = |a: &i32, b: &i32| a.cmp(b);
        let mut heap = TopKHeap::new(3, cmp, false);

        for i in [5, 2, 8, 1, 9, 3, 7, 4, 6] {
            heap.push(i);
        }

        assert_eq!(heap.into_sorted_vec(), vec![9, 8, 7]);
    }

    #[test]
    fn test_executor_streaming() {
        let columns = vec!["priority".to_string(), "name".to_string()];
        let executor =
            OrderByLimitExecutor::new(by_priority_asc(), 3, 0, columns, false, Some(10));

        let rows = vec![
            named_row(5, "E"),
            named_row(3, "C"),
            named_row(1, "A"),
            named_row(4, "D"),
            named_row(2, "B"),
        ];

        let (result, stats) = executor.execute(rows.into_iter());

        assert_first_column(&result, &[1, 2, 3]);
        assert_eq!(stats.input_rows, 5);
        assert_eq!(stats.output_rows, 3);
    }

    #[test]
    fn test_executor_with_offset() {
        let columns = vec!["priority".to_string()];
        // LIMIT 2 OFFSET 2 over priorities 1..=5 yields 3 and 4.
        let executor =
            OrderByLimitExecutor::new(by_priority_asc(), 2, 2, columns, false, Some(10));

        let (result, _) = executor.execute(int_rows(&[5, 3, 1, 4, 2]).into_iter());

        assert_first_column(&result, &[3, 4]);
    }

    #[test]
    fn test_single_column_topk() {
        let mut topk = SingleColumnTopK::new(3, 0, true);
        for row in int_rows(&[5, 3, 1, 4, 2]) {
            topk.push(row);
        }

        assert_first_column(&topk.into_sorted_vec(), &[1, 2, 3]);
    }

    #[test]
    fn test_correctness_vs_buggy_implementation() {
        let columns = vec!["priority".to_string()];
        let rows = int_rows(&[5, 2, 8, 1, 9, 3]);

        // The naive approach applies LIMIT before ORDER BY and simply returns
        // the first rows of the input.
        let buggy: Vec<_> = rows.iter().take(3).cloned().collect();
        assert_first_column(&buggy, &[5, 2, 8]);

        let executor =
            OrderByLimitExecutor::new(by_priority_asc(), 3, 0, columns, false, Some(6));
        let (correct, _) = executor.execute(rows.into_iter());

        // The executor returns the three smallest priorities instead.
        assert_first_column(&correct, &[1, 2, 3]);
        assert_ne!(buggy[0].values[0], correct[0].values[0]);
    }

    #[test]
    fn test_multi_column_order_by() {
        let columns = vec!["priority".to_string(), "created_at".to_string()];

        let order_by = by_priority_asc().then_by(
            ColumnRef::Name("created_at".to_string()),
            SortDirection::Descending,
        );

        let executor = OrderByLimitExecutor::new(order_by, 3, 0, columns, false, Some(5));

        let rows = vec![
            make_row(vec![SochValue::Int(1), SochValue::Int(100)]),
            make_row(vec![SochValue::Int(1), SochValue::Int(200)]),
            make_row(vec![SochValue::Int(2), SochValue::Int(150)]),
            make_row(vec![SochValue::Int(1), SochValue::Int(150)]),
            make_row(vec![SochValue::Int(3), SochValue::Int(100)]),
        ];

        let (result, _) = executor.execute(rows.into_iter());

        // All three results have priority 1, newest `created_at` first.
        assert_first_column(&result, &[1, 1, 1]);
        let created_at = [200, 150, 100];
        for (row, &want) in result.iter().zip(&created_at) {
            assert_eq!(row.values[1], SochValue::Int(want));
        }
    }
}