1use std::cmp::Ordering;
69use std::collections::BinaryHeap;
70
71use sochdb_core::{SochRow, SochValue};
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq)]
79pub enum SortDirection {
80 Ascending,
81 Descending,
82}
83
84#[derive(Debug, Clone)]
86pub struct OrderByColumn {
87 pub column: ColumnRef,
89 pub direction: SortDirection,
91 pub nulls_first: bool,
93}
94
95#[derive(Debug, Clone)]
97pub enum ColumnRef {
98 Index(usize),
100 Name(String),
102}
103
104impl ColumnRef {
105 pub fn resolve(&self, columns: &[String]) -> Option<usize> {
107 match self {
108 ColumnRef::Index(i) => Some(*i),
109 ColumnRef::Name(name) => columns.iter().position(|c| c == name),
110 }
111 }
112}
113
114#[derive(Debug, Clone)]
116pub struct OrderBySpec {
117 pub columns: Vec<OrderByColumn>,
119}
120
121impl OrderBySpec {
122 pub fn single(column: ColumnRef, direction: SortDirection) -> Self {
124 Self {
125 columns: vec![OrderByColumn {
126 column,
127 direction,
128 nulls_first: false,
129 }],
130 }
131 }
132
133 pub fn then_by(mut self, column: ColumnRef, direction: SortDirection) -> Self {
135 self.columns.push(OrderByColumn {
136 column,
137 direction,
138 nulls_first: false,
139 });
140 self
141 }
142
143 pub fn comparator(&self, column_names: &[String]) -> impl Fn(&SochRow, &SochRow) -> Ordering {
145 let resolved: Vec<_> = self.columns
146 .iter()
147 .filter_map(|col| {
148 col.column.resolve(column_names).map(|idx| (idx, col.direction, col.nulls_first))
149 })
150 .collect();
151
152 move |a: &SochRow, b: &SochRow| {
153 for &(idx, direction, nulls_first) in &resolved {
154 let val_a = a.values.get(idx);
155 let val_b = b.values.get(idx);
156
157 let ordering = compare_values(val_a, val_b, nulls_first);
158
159 if ordering != Ordering::Equal {
160 return match direction {
161 SortDirection::Ascending => ordering,
162 SortDirection::Descending => ordering.reverse(),
163 };
164 }
165 }
166 Ordering::Equal
167 }
168 }
169
170 pub fn matches_index(&self, index_columns: &[(String, SortDirection)]) -> bool {
172 if self.columns.len() > index_columns.len() {
173 return false;
174 }
175
176 self.columns.iter().zip(index_columns.iter()).all(|(col, (idx_col, idx_dir))| {
177 match &col.column {
178 ColumnRef::Name(name) => name == idx_col && col.direction == *idx_dir,
179 ColumnRef::Index(_) => false, }
181 })
182 }
183}
184
185fn compare_values(a: Option<&SochValue>, b: Option<&SochValue>, nulls_first: bool) -> Ordering {
187 match (a, b) {
188 (None, None) => Ordering::Equal,
189 (None, Some(_)) => if nulls_first { Ordering::Less } else { Ordering::Greater },
190 (Some(_), None) => if nulls_first { Ordering::Greater } else { Ordering::Less },
191 (Some(SochValue::Null), Some(SochValue::Null)) => Ordering::Equal,
192 (Some(SochValue::Null), Some(_)) => if nulls_first { Ordering::Less } else { Ordering::Greater },
193 (Some(_), Some(SochValue::Null)) => if nulls_first { Ordering::Greater } else { Ordering::Less },
194 (Some(a), Some(b)) => compare_soch_values(a, b),
195 }
196}
197
198fn compare_soch_values(a: &SochValue, b: &SochValue) -> Ordering {
200 match (a, b) {
201 (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
202 (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
203 (SochValue::Float(a), SochValue::Float(b)) => a.partial_cmp(b).unwrap_or(Ordering::Equal),
204 (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
205 (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
206 _ => Ordering::Equal, }
208}
209
210pub struct TopKHeap<T, F>
224where
225 F: Fn(&T, &T) -> Ordering,
226{
227 heap: BinaryHeap<ComparableWrapper<T, F>>,
229 k: usize,
231 comparator: F,
233 want_smallest: bool,
235}
236
237struct ComparableWrapper<T, F>
239where
240 F: Fn(&T, &T) -> Ordering,
241{
242 value: T,
243 comparator: *const F,
244 inverted: bool,
245}
246
247unsafe impl<T: Send, F> Send for ComparableWrapper<T, F> where F: Fn(&T, &T) -> Ordering {}
249unsafe impl<T: Sync, F> Sync for ComparableWrapper<T, F> where F: Fn(&T, &T) -> Ordering {}
250
251impl<T, F> PartialEq for ComparableWrapper<T, F>
252where
253 F: Fn(&T, &T) -> Ordering,
254{
255 fn eq(&self, other: &Self) -> bool {
256 self.cmp(other) == Ordering::Equal
257 }
258}
259
260impl<T, F> Eq for ComparableWrapper<T, F> where F: Fn(&T, &T) -> Ordering {}
261
262impl<T, F> PartialOrd for ComparableWrapper<T, F>
263where
264 F: Fn(&T, &T) -> Ordering,
265{
266 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
267 Some(self.cmp(other))
268 }
269}
270
271impl<T, F> Ord for ComparableWrapper<T, F>
272where
273 F: Fn(&T, &T) -> Ordering,
274{
275 fn cmp(&self, other: &Self) -> Ordering {
276 let cmp = unsafe { &*self.comparator };
278 let result = cmp(&self.value, &other.value);
279
280 if self.inverted {
281 result.reverse()
282 } else {
283 result
284 }
285 }
286}
287
288impl<T, F> TopKHeap<T, F>
289where
290 F: Fn(&T, &T) -> Ordering,
291{
292 pub fn new(k: usize, comparator: F, want_smallest: bool) -> Self {
298 Self {
299 heap: BinaryHeap::with_capacity(k + 1),
300 k,
301 comparator,
302 want_smallest,
303 }
304 }
305
306 pub fn push(&mut self, value: T) {
310 if self.k == 0 {
311 return;
312 }
313
314 let wrapper = ComparableWrapper {
315 value,
316 comparator: &self.comparator as *const F,
317 inverted: !self.want_smallest,
320 };
321
322 if self.heap.len() < self.k {
323 self.heap.push(wrapper);
324 } else if let Some(top) = self.heap.peek() {
325 let should_replace = if self.want_smallest {
327 (self.comparator)(&wrapper.value, &top.value) == Ordering::Less
329 } else {
330 (self.comparator)(&wrapper.value, &top.value) == Ordering::Greater
332 };
333
334 if should_replace {
335 self.heap.pop();
336 self.heap.push(wrapper);
337 }
338 }
339 }
340
341 pub fn threshold(&self) -> Option<&T> {
345 self.heap.peek().map(|w| &w.value)
346 }
347
348 pub fn is_full(&self) -> bool {
350 self.heap.len() >= self.k
351 }
352
353 pub fn into_sorted_vec(self) -> Vec<T> {
357 let mut values: Vec<_> = self.heap.into_iter().map(|w| w.value).collect();
358 if self.want_smallest {
359 values.sort_by(&self.comparator);
360 } else {
361 values.sort_by(|a, b| (&self.comparator)(b, a));
363 }
364 values
365 }
366
367 pub fn len(&self) -> usize {
369 self.heap.len()
370 }
371
372 pub fn is_empty(&self) -> bool {
374 self.heap.is_empty()
375 }
376}
377
378#[derive(Debug, Clone, Copy, PartialEq, Eq)]
384pub enum ExecutionStrategy {
385 IndexPushdown,
387 StreamingTopK,
389 FullSort,
391}
392
393impl ExecutionStrategy {
394 pub fn choose(
396 has_matching_index: bool,
397 estimated_rows: Option<usize>,
398 limit: usize,
399 ) -> Self {
400 if has_matching_index {
402 return ExecutionStrategy::IndexPushdown;
403 }
404
405 let n = match estimated_rows {
407 Some(n) if n > 0 => n,
408 _ => return ExecutionStrategy::StreamingTopK,
409 };
410
411 let k = limit;
414
415 if k <= 100 {
416 ExecutionStrategy::StreamingTopK
418 } else if (k as f64) < (n as f64).sqrt() {
419 ExecutionStrategy::StreamingTopK
421 } else {
422 ExecutionStrategy::FullSort
425 }
426 }
427
428 pub fn complexity(&self, n: usize, k: usize) -> String {
430 match self {
431 ExecutionStrategy::IndexPushdown => {
432 format!("O(log {} + {}) = O({})", n, k, (n as f64).log2() as usize + k)
433 }
434 ExecutionStrategy::StreamingTopK => {
435 let log_k = (k as f64).log2().max(1.0) as usize;
436 format!("O({} * log {}) ≈ O({})", n, k, n * log_k)
437 }
438 ExecutionStrategy::FullSort => {
439 let log_n = (n as f64).log2().max(1.0) as usize;
440 format!("O({} * log {}) ≈ O({})", n, n, n * log_n)
441 }
442 }
443 }
444}
445
446#[derive(Debug, Clone, Default)]
448pub struct OrderByLimitStats {
449 pub strategy: Option<ExecutionStrategy>,
451 pub input_rows: usize,
453 pub output_rows: usize,
455 pub heap_operations: usize,
457 pub comparisons: usize,
459 pub offset_skipped: usize,
461}
462
463pub struct OrderByLimitExecutor {
468 order_by: OrderBySpec,
470 limit: usize,
472 offset: usize,
474 column_names: Vec<String>,
476 strategy: ExecutionStrategy,
478}
479
480impl OrderByLimitExecutor {
481 pub fn new(
483 order_by: OrderBySpec,
484 limit: usize,
485 offset: usize,
486 column_names: Vec<String>,
487 has_matching_index: bool,
488 estimated_rows: Option<usize>,
489 ) -> Self {
490 let effective_limit = limit.saturating_add(offset);
492 let strategy = ExecutionStrategy::choose(has_matching_index, estimated_rows, effective_limit);
493
494 Self {
495 order_by,
496 limit,
497 offset,
498 column_names,
499 strategy,
500 }
501 }
502
503 pub fn strategy(&self) -> ExecutionStrategy {
505 self.strategy
506 }
507
508 pub fn execute<I>(&self, rows: I) -> (Vec<SochRow>, OrderByLimitStats)
515 where
516 I: Iterator<Item = SochRow>,
517 {
518 let mut stats = OrderByLimitStats {
519 strategy: Some(self.strategy),
520 ..Default::default()
521 };
522
523 let effective_limit = self.limit.saturating_add(self.offset);
524
525 let result = match self.strategy {
526 ExecutionStrategy::IndexPushdown => {
527 let collected: Vec<_> = rows.take(effective_limit).collect();
530 stats.input_rows = collected.len();
531 collected
532 }
533 ExecutionStrategy::StreamingTopK => {
534 self.execute_streaming(rows, effective_limit, &mut stats)
535 }
536 ExecutionStrategy::FullSort => {
537 self.execute_full_sort(rows, effective_limit, &mut stats)
538 }
539 };
540
541 let final_result: Vec<_> = result
543 .into_iter()
544 .skip(self.offset)
545 .take(self.limit)
546 .collect();
547
548 stats.offset_skipped = self.offset.min(stats.input_rows);
549 stats.output_rows = final_result.len();
550
551 (final_result, stats)
552 }
553
554 fn execute_streaming<I>(
556 &self,
557 rows: I,
558 k: usize,
559 stats: &mut OrderByLimitStats,
560 ) -> Vec<SochRow>
561 where
562 I: Iterator<Item = SochRow>,
563 {
564 let comparator = self.order_by.comparator(&self.column_names);
565
566 let mut heap = TopKHeap::new(k, comparator, true);
568
569 for row in rows {
570 stats.input_rows += 1;
571 stats.heap_operations += 1;
572 heap.push(row);
573 }
574
575 heap.into_sorted_vec()
576 }
577
578 fn execute_full_sort<I>(
580 &self,
581 rows: I,
582 k: usize,
583 stats: &mut OrderByLimitStats,
584 ) -> Vec<SochRow>
585 where
586 I: Iterator<Item = SochRow>,
587 {
588 let comparator = self.order_by.comparator(&self.column_names);
589
590 let mut all_rows: Vec<_> = rows.collect();
591 stats.input_rows = all_rows.len();
592
593 all_rows.sort_by(&comparator);
594
595 all_rows.truncate(k);
596 all_rows
597 }
598}
599
600pub struct IndexAwareTopK<T, F>
610where
611 F: Fn(&T, &T) -> Ordering,
612{
613 current_batch: Vec<T>,
615 result: Vec<T>,
617 k: usize,
619 secondary_cmp: F,
621}
622
623impl<T, F> IndexAwareTopK<T, F>
624where
625 F: Fn(&T, &T) -> Ordering,
626{
627 pub fn new(k: usize, secondary_cmp: F) -> Self {
629 Self {
630 current_batch: Vec::new(),
631 result: Vec::with_capacity(k),
632 k,
633 secondary_cmp,
634 }
635 }
636
637 pub fn push(&mut self, item: T, same_index_key_as_previous: bool) {
642 if !same_index_key_as_previous {
643 self.finalize_batch();
644 }
645
646 self.current_batch.push(item);
647 }
648
649 fn finalize_batch(&mut self) {
651 if self.current_batch.is_empty() {
652 return;
653 }
654
655 self.current_batch.sort_by(&self.secondary_cmp);
657
658 let remaining = self.k.saturating_sub(self.result.len());
660 let to_take = remaining.min(self.current_batch.len());
661
662 self.result.extend(self.current_batch.drain(..to_take));
663 self.current_batch.clear();
664 }
665
666 pub fn is_complete(&self) -> bool {
668 self.result.len() >= self.k
669 }
670
671 pub fn into_result(mut self) -> Vec<T> {
673 self.finalize_batch();
674 self.result
675 }
676}
677
678pub struct SingleColumnTopK {
686 heap: BinaryHeap<SingleColEntry>,
688 k: usize,
690 col_idx: usize,
692 ascending: bool,
694}
695
696struct SingleColEntry {
697 row: SochRow,
698 key: OrderableValue,
699 ascending: bool,
700}
701
702#[derive(Clone)]
704enum OrderableValue {
705 Int(i64),
706 UInt(u64),
707 Float(f64),
708 Text(String),
709 Bool(bool),
710 Null,
711}
712
713impl From<&SochValue> for OrderableValue {
714 fn from(v: &SochValue) -> Self {
715 match v {
716 SochValue::Int(i) => OrderableValue::Int(*i),
717 SochValue::UInt(u) => OrderableValue::UInt(*u),
718 SochValue::Float(f) => OrderableValue::Float(*f),
719 SochValue::Text(s) => OrderableValue::Text(s.clone()),
720 SochValue::Bool(b) => OrderableValue::Bool(*b),
721 SochValue::Null => OrderableValue::Null,
722 _ => OrderableValue::Null,
723 }
724 }
725}
726
727impl PartialEq for OrderableValue {
728 fn eq(&self, other: &Self) -> bool {
729 self.cmp(other) == Ordering::Equal
730 }
731}
732
733impl Eq for OrderableValue {}
734
735impl PartialOrd for OrderableValue {
736 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
737 Some(self.cmp(other))
738 }
739}
740
741impl Ord for OrderableValue {
742 fn cmp(&self, other: &Self) -> Ordering {
743 match (self, other) {
744 (OrderableValue::Null, OrderableValue::Null) => Ordering::Equal,
745 (OrderableValue::Null, _) => Ordering::Greater, (_, OrderableValue::Null) => Ordering::Less,
747 (OrderableValue::Int(a), OrderableValue::Int(b)) => a.cmp(b),
748 (OrderableValue::UInt(a), OrderableValue::UInt(b)) => a.cmp(b),
749 (OrderableValue::Float(a), OrderableValue::Float(b)) => {
750 a.partial_cmp(b).unwrap_or(Ordering::Equal)
751 }
752 (OrderableValue::Text(a), OrderableValue::Text(b)) => a.cmp(b),
753 (OrderableValue::Bool(a), OrderableValue::Bool(b)) => a.cmp(b),
754 _ => Ordering::Equal, }
756 }
757}
758
759impl PartialEq for SingleColEntry {
760 fn eq(&self, other: &Self) -> bool {
761 self.key == other.key
762 }
763}
764
765impl Eq for SingleColEntry {}
766
767impl PartialOrd for SingleColEntry {
768 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
769 Some(self.cmp(other))
770 }
771}
772
773impl Ord for SingleColEntry {
774 fn cmp(&self, other: &Self) -> Ordering {
775 let base = self.key.cmp(&other.key);
776
777 if self.ascending {
780 base
781 } else {
782 base.reverse()
783 }
784 }
785}
786
787impl SingleColumnTopK {
788 pub fn new(k: usize, col_idx: usize, ascending: bool) -> Self {
790 Self {
791 heap: BinaryHeap::with_capacity(k + 1),
792 k,
793 col_idx,
794 ascending,
795 }
796 }
797
798 pub fn push(&mut self, row: SochRow) {
800 if self.k == 0 {
801 return;
802 }
803
804 let key = row.values
805 .get(self.col_idx)
806 .map(OrderableValue::from)
807 .unwrap_or(OrderableValue::Null);
808
809 let entry = SingleColEntry {
810 row,
811 key,
812 ascending: self.ascending,
813 };
814
815 if self.heap.len() < self.k {
816 self.heap.push(entry);
817 } else if let Some(top) = self.heap.peek() {
818 let should_replace = if self.ascending {
819 entry.key < top.key
820 } else {
821 entry.key > top.key
822 };
823
824 if should_replace {
825 self.heap.pop();
826 self.heap.push(entry);
827 }
828 }
829 }
830
831 pub fn into_sorted_vec(self) -> Vec<SochRow> {
833 let mut entries: Vec<_> = self.heap.into_iter().collect();
834
835 entries.sort_by(|a, b| {
836 let base = a.key.cmp(&b.key);
837 if self.ascending { base } else { base.reverse() }
838 });
839
840 entries.into_iter().map(|e| e.row).collect()
841 }
842
843 pub fn len(&self) -> usize {
845 self.heap.len()
846 }
847
848 pub fn is_empty(&self) -> bool {
850 self.heap.is_empty()
851 }
852}
853
854#[cfg(test)]
859mod tests {
860 use super::*;
861
862 fn make_row(values: Vec<SochValue>) -> SochRow {
863 SochRow::new(values)
864 }
865
866 #[test]
867 fn test_strategy_selection() {
868 assert_eq!(
870 ExecutionStrategy::choose(true, Some(1_000_000), 10),
871 ExecutionStrategy::IndexPushdown
872 );
873
874 assert_eq!(
876 ExecutionStrategy::choose(false, Some(1_000_000), 10),
877 ExecutionStrategy::StreamingTopK
878 );
879
880 assert_eq!(
883 ExecutionStrategy::choose(false, Some(1000), 500),
884 ExecutionStrategy::FullSort
885 );
886
887 assert_eq!(
889 ExecutionStrategy::choose(false, Some(100), 90),
890 ExecutionStrategy::StreamingTopK
891 );
892 }
893
894 #[test]
895 fn test_order_by_spec_comparator() {
896 let spec = OrderBySpec::single(
897 ColumnRef::Name("priority".to_string()),
898 SortDirection::Ascending,
899 );
900
901 let columns = vec!["id".to_string(), "priority".to_string(), "name".to_string()];
902 let cmp = spec.comparator(&columns);
903
904 let row1 = make_row(vec![
905 SochValue::Int(1),
906 SochValue::Int(5),
907 SochValue::Text("A".to_string()),
908 ]);
909 let row2 = make_row(vec![
910 SochValue::Int(2),
911 SochValue::Int(3),
912 SochValue::Text("B".to_string()),
913 ]);
914
915 assert_eq!(cmp(&row2, &row1), Ordering::Less);
917 }
918
919 #[test]
920 fn test_topk_heap_ascending() {
921 let cmp = |a: &i32, b: &i32| a.cmp(b);
922 let mut heap = TopKHeap::new(3, cmp, true);
923
924 for i in [5, 2, 8, 1, 9, 3, 7, 4, 6] {
925 heap.push(i);
926 }
927
928 let result = heap.into_sorted_vec();
929 assert_eq!(result, vec![1, 2, 3]);
930 }
931
932 #[test]
933 fn test_topk_heap_descending() {
934 let cmp = |a: &i32, b: &i32| a.cmp(b);
935 let mut heap = TopKHeap::new(3, cmp, false);
936
937 for i in [5, 2, 8, 1, 9, 3, 7, 4, 6] {
938 heap.push(i);
939 }
940
941 let result = heap.into_sorted_vec();
942 assert_eq!(result, vec![9, 8, 7]);
944 }
945
946 #[test]
947 fn test_executor_streaming() {
948 let columns = vec!["priority".to_string(), "name".to_string()];
949 let order_by = OrderBySpec::single(
950 ColumnRef::Name("priority".to_string()),
951 SortDirection::Ascending,
952 );
953
954 let executor = OrderByLimitExecutor::new(
955 order_by,
956 3, 0, columns.clone(),
959 false, Some(10),
961 );
962
963 let rows = vec![
965 make_row(vec![SochValue::Int(5), SochValue::Text("E".to_string())]),
966 make_row(vec![SochValue::Int(3), SochValue::Text("C".to_string())]),
967 make_row(vec![SochValue::Int(1), SochValue::Text("A".to_string())]),
968 make_row(vec![SochValue::Int(4), SochValue::Text("D".to_string())]),
969 make_row(vec![SochValue::Int(2), SochValue::Text("B".to_string())]),
970 ];
971
972 let (result, stats) = executor.execute(rows.into_iter());
973
974 assert_eq!(result.len(), 3);
976 assert_eq!(result[0].values[0], SochValue::Int(1));
977 assert_eq!(result[1].values[0], SochValue::Int(2));
978 assert_eq!(result[2].values[0], SochValue::Int(3));
979
980 assert_eq!(stats.input_rows, 5);
981 assert_eq!(stats.output_rows, 3);
982 }
983
984 #[test]
985 fn test_executor_with_offset() {
986 let columns = vec!["priority".to_string()];
987 let order_by = OrderBySpec::single(
988 ColumnRef::Name("priority".to_string()),
989 SortDirection::Ascending,
990 );
991
992 let executor = OrderByLimitExecutor::new(
993 order_by,
994 2, 2, columns,
997 false,
998 Some(10),
999 );
1000
1001 let rows = vec![
1004 make_row(vec![SochValue::Int(5)]),
1005 make_row(vec![SochValue::Int(3)]),
1006 make_row(vec![SochValue::Int(1)]),
1007 make_row(vec![SochValue::Int(4)]),
1008 make_row(vec![SochValue::Int(2)]),
1009 ];
1010
1011 let (result, _) = executor.execute(rows.into_iter());
1012
1013 assert_eq!(result.len(), 2);
1014 assert_eq!(result[0].values[0], SochValue::Int(3));
1015 assert_eq!(result[1].values[0], SochValue::Int(4));
1016 }
1017
1018 #[test]
1019 fn test_single_column_topk() {
1020 let mut topk = SingleColumnTopK::new(3, 0, true); topk.push(make_row(vec![SochValue::Int(5)]));
1023 topk.push(make_row(vec![SochValue::Int(3)]));
1024 topk.push(make_row(vec![SochValue::Int(1)]));
1025 topk.push(make_row(vec![SochValue::Int(4)]));
1026 topk.push(make_row(vec![SochValue::Int(2)]));
1027
1028 let result = topk.into_sorted_vec();
1029
1030 assert_eq!(result.len(), 3);
1031 assert_eq!(result[0].values[0], SochValue::Int(1));
1032 assert_eq!(result[1].values[0], SochValue::Int(2));
1033 assert_eq!(result[2].values[0], SochValue::Int(3));
1034 }
1035
1036 #[test]
1037 fn test_correctness_vs_buggy_implementation() {
1038 let columns = vec!["priority".to_string()];
1040 let order_by = OrderBySpec::single(
1041 ColumnRef::Name("priority".to_string()),
1042 SortDirection::Ascending,
1043 );
1044
1045 let rows: Vec<_> = [5, 2, 8, 1, 9, 3]
1047 .iter()
1048 .map(|&p| make_row(vec![SochValue::Int(p)]))
1049 .collect();
1050
1051 let buggy: Vec<_> = rows.iter().take(3).cloned().collect();
1053 let executor = OrderByLimitExecutor::new(
1058 order_by,
1059 3,
1060 0,
1061 columns,
1062 false,
1063 Some(6),
1064 );
1065 let (correct, _) = executor.execute(rows.into_iter());
1066
1067 assert_eq!(correct[0].values[0], SochValue::Int(1));
1069 assert_eq!(correct[1].values[0], SochValue::Int(2));
1070 assert_eq!(correct[2].values[0], SochValue::Int(3));
1071 }
1072
1073 #[test]
1074 fn test_multi_column_order_by() {
1075 let columns = vec!["priority".to_string(), "created_at".to_string()];
1076
1077 let order_by = OrderBySpec::single(
1078 ColumnRef::Name("priority".to_string()),
1079 SortDirection::Ascending,
1080 ).then_by(
1081 ColumnRef::Name("created_at".to_string()),
1082 SortDirection::Descending,
1083 );
1084
1085 let executor = OrderByLimitExecutor::new(
1086 order_by,
1087 3,
1088 0,
1089 columns,
1090 false,
1091 Some(5),
1092 );
1093
1094 let rows = vec![
1096 make_row(vec![SochValue::Int(1), SochValue::Int(100)]),
1097 make_row(vec![SochValue::Int(1), SochValue::Int(200)]), make_row(vec![SochValue::Int(2), SochValue::Int(150)]),
1099 make_row(vec![SochValue::Int(1), SochValue::Int(150)]),
1100 make_row(vec![SochValue::Int(3), SochValue::Int(100)]),
1101 ];
1102
1103 let (result, _) = executor.execute(rows.into_iter());
1104
1105 assert_eq!(result.len(), 3);
1108 assert_eq!(result[0].values[0], SochValue::Int(1));
1109 assert_eq!(result[0].values[1], SochValue::Int(200));
1110 assert_eq!(result[1].values[0], SochValue::Int(1));
1111 assert_eq!(result[1].values[1], SochValue::Int(150));
1112 assert_eq!(result[2].values[0], SochValue::Int(1));
1113 assert_eq!(result[2].values[1], SochValue::Int(100));
1114 }
1115}