Skip to main content

reddb_server/storage/query/
sort.rs

1//! Sorting Operations for Query Engine
2//!
3//! Provides sorting, ordering, and limiting capabilities for query results.
4
5use super::value_compare::total_compare_values;
6use crate::storage::schema::Value;
7use std::cmp::Ordering;
8
9/// Sort direction
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
11pub enum Direction {
12    /// Ascending order (smallest first)
13    #[default]
14    Asc,
15    /// Descending order (largest first)
16    Desc,
17}
18
19/// Null handling in sort
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
21pub enum NullsOrder {
22    /// Nulls appear first
23    First,
24    /// Nulls appear last
25    #[default]
26    Last,
27}
28
29/// A single sort key (column + direction)
30#[derive(Debug, Clone)]
31pub struct SortKey {
32    /// Column name to sort by
33    pub column: String,
34    /// Sort direction
35    pub direction: Direction,
36    /// Null handling
37    pub nulls: NullsOrder,
38}
39
40impl SortKey {
41    /// Create a new sort key with ascending order
42    pub fn asc(column: impl Into<String>) -> Self {
43        Self {
44            column: column.into(),
45            direction: Direction::Asc,
46            nulls: NullsOrder::Last,
47        }
48    }
49
50    /// Create a new sort key with descending order
51    pub fn desc(column: impl Into<String>) -> Self {
52        Self {
53            column: column.into(),
54            direction: Direction::Desc,
55            nulls: NullsOrder::Last,
56        }
57    }
58
59    /// Set nulls to appear first
60    pub fn nulls_first(mut self) -> Self {
61        self.nulls = NullsOrder::First;
62        self
63    }
64
65    /// Set nulls to appear last
66    pub fn nulls_last(mut self) -> Self {
67        self.nulls = NullsOrder::Last;
68        self
69    }
70
71    /// Compare two values according to this sort key
72    pub fn compare(&self, a: &Value, b: &Value) -> Ordering {
73        // Handle nulls
74        match (a, b) {
75            (Value::Null, Value::Null) => Ordering::Equal,
76            (Value::Null, _) => match self.nulls {
77                NullsOrder::First => Ordering::Less,
78                NullsOrder::Last => Ordering::Greater,
79            },
80            (_, Value::Null) => match self.nulls {
81                NullsOrder::First => Ordering::Greater,
82                NullsOrder::Last => Ordering::Less,
83            },
84            _ => {
85                let base_order = total_compare_values(a, b);
86                match self.direction {
87                    Direction::Asc => base_order,
88                    Direction::Desc => base_order.reverse(),
89                }
90            }
91        }
92    }
93}
94
95/// Order by specification for queries
96#[derive(Debug, Clone, Default)]
97pub struct OrderBy {
98    /// Sort keys in priority order
99    keys: Vec<SortKey>,
100}
101
102impl OrderBy {
103    /// Create an empty order by
104    pub fn new() -> Self {
105        Self { keys: Vec::new() }
106    }
107
108    /// Add an ascending sort key
109    pub fn asc(mut self, column: impl Into<String>) -> Self {
110        self.keys.push(SortKey::asc(column));
111        self
112    }
113
114    /// Add a descending sort key
115    pub fn desc(mut self, column: impl Into<String>) -> Self {
116        self.keys.push(SortKey::desc(column));
117        self
118    }
119
120    /// Add a sort key
121    pub fn then(mut self, key: SortKey) -> Self {
122        self.keys.push(key);
123        self
124    }
125
126    /// Check if empty
127    pub fn is_empty(&self) -> bool {
128        self.keys.is_empty()
129    }
130
131    /// Get sort keys
132    pub fn keys(&self) -> &[SortKey] {
133        &self.keys
134    }
135
136    /// Compare two rows according to all sort keys
137    ///
138    /// The `get_value` closure takes a row and column name, returning the value.
139    pub fn compare<R>(&self, a: &R, b: &R, get_value: impl Fn(&R, &str) -> Value) -> Ordering {
140        for key in &self.keys {
141            let val_a = get_value(a, &key.column);
142            let val_b = get_value(b, &key.column);
143            let ord = key.compare(&val_a, &val_b);
144            if ord != Ordering::Equal {
145                return ord;
146            }
147        }
148        Ordering::Equal
149    }
150
151    /// Sort a slice of rows in place
152    pub fn sort_rows<R>(&self, rows: &mut [R], get_value: impl Fn(&R, &str) -> Value) {
153        if self.is_empty() {
154            return;
155        }
156        rows.sort_by(|a, b| self.compare(a, b, &get_value));
157    }
158
159    /// Phase 3.2 dispatch entry point. Combines `OrderBy::sort_rows`
160    /// and `incremental_sort_top_k` behind a single callable that
161    /// the planner / executor invokes after deciding the strategy
162    /// via `planner::pathkeys::plan_sort`.
163    ///
164    /// `prefix_keys` is the number of leading sort keys the input
165    /// already satisfies (0 if unknown). `limit` is the LIMIT k for
166    /// top-k early termination, or `None` for an unbounded sort.
167    ///
168    /// Behavior matrix:
169    /// - `prefix_keys == 0` && `limit == None` → full `sort_rows`.
170    /// - `prefix_keys == 0` && `limit == Some(k)` → full sort then
171    ///   truncate to k.
172    /// - `prefix_keys > 0` && `limit == Some(k)` → call
173    ///   `incremental_sort_top_k(prefix_keys, k)`.
174    /// - `prefix_keys > 0` && `limit == None` → walk groups and
175    ///   sort within each, no early termination (still cheaper than
176    ///   full sort because each group is independent).
177    pub fn dispatch_sort<R>(
178        &self,
179        rows: Vec<R>,
180        prefix_keys: usize,
181        limit: Option<usize>,
182        get_value: impl Fn(&R, &str) -> Value,
183    ) -> Vec<R> {
184        if self.is_empty() {
185            return rows;
186        }
187        match (prefix_keys, limit) {
188            (0, None) => {
189                let mut all = rows;
190                self.sort_rows(&mut all, &get_value);
191                all
192            }
193            (0, Some(k)) => {
194                let mut all = rows;
195                self.sort_rows(&mut all, &get_value);
196                all.truncate(k);
197                all
198            }
199            (_, Some(k)) => self.incremental_sort_top_k(rows, prefix_keys, k, get_value),
200            (_, None) => {
201                // Group-by-prefix sort with no early termination.
202                // Equivalent to incremental_sort_top_k with k = usize::MAX.
203                self.incremental_sort_top_k(rows, prefix_keys, usize::MAX, get_value)
204            }
205        }
206    }
207
208    /// Incremental top-K sort.
209    ///
210    /// Fase 4 P3 win: when the upstream operator already returns
211    /// rows in `prefix_keys` order (e.g. an index scan whose key
212    /// is a prefix of the requested ORDER BY), this method walks
213    /// the input in chunks of equal-prefix rows, sorts each chunk
214    /// by the *remaining* keys, emits up to `k` rows total, and
215    /// terminates as soon as the budget is met.
216    ///
217    /// Mirrors PG's `nodeIncrementalSort.c` algorithm, simplified:
218    ///
219    /// 1. Walk `rows` left-to-right, grouping by equal-prefix.
220    /// 2. For each group, full-sort by `self.keys[prefix_keys..]`
221    ///    (the suffix not already covered by upstream order).
222    /// 3. Append at most `k - emitted` rows from the sorted group
223    ///    to the output, then advance to the next group.
224    /// 4. Stop iteration entirely once `emitted == k`.
225    ///
226    /// **Caller contract**: rows MUST already be sorted by
227    /// `self.keys[..prefix_keys]`. Violating this produces wrong
228    /// results — the planner is responsible for verifying input
229    /// pathkey order before choosing this operator.
230    ///
231    /// When `prefix_keys == 0` this degenerates to a regular
232    /// top-k sort using `sort_rows` + truncate. When
233    /// `prefix_keys >= self.keys.len()` the input is already
234    /// fully ordered and the method just truncates.
235    pub fn incremental_sort_top_k<R>(
236        &self,
237        rows: Vec<R>,
238        prefix_keys: usize,
239        k: usize,
240        get_value: impl Fn(&R, &str) -> Value,
241    ) -> Vec<R> {
242        if k == 0 || rows.is_empty() {
243            return Vec::new();
244        }
245        // No prefix order known — fall back to full sort + truncate.
246        if prefix_keys == 0 {
247            let mut all = rows;
248            self.sort_rows(&mut all, &get_value);
249            all.truncate(k);
250            return all;
251        }
252        // Input is already fully ordered — just truncate.
253        if prefix_keys >= self.keys.len() {
254            let mut out = rows;
255            out.truncate(k);
256            return out;
257        }
258
259        let suffix_keys = &self.keys[prefix_keys..];
260        let prefix_slice = &self.keys[..prefix_keys];
261        let mut out: Vec<R> = Vec::with_capacity(k);
262        let mut group: Vec<R> = Vec::new();
263
264        // Closure to flush the current group into `out`, sorting
265        // by suffix keys first. Returns `true` when the budget is
266        // exhausted and the caller should stop iteration.
267        let flush =
268            |group: &mut Vec<R>, out: &mut Vec<R>, get_value: &dyn Fn(&R, &str) -> Value| -> bool {
269                if group.is_empty() {
270                    return false;
271                }
272                // Sort the group by the suffix keys only — prefix is
273                // already equal across the whole group.
274                group.sort_by(|a, b| {
275                    for key in suffix_keys {
276                        let ord =
277                            key.compare(&get_value(a, &key.column), &get_value(b, &key.column));
278                        if ord != Ordering::Equal {
279                            return ord;
280                        }
281                    }
282                    Ordering::Equal
283                });
284                let remaining = k - out.len();
285                if group.len() <= remaining {
286                    out.append(group);
287                } else {
288                    out.extend(group.drain(..remaining));
289                    group.clear();
290                }
291                out.len() >= k
292            };
293
294        // Helper to compare two rows by prefix keys. Inline closure
295        // would shadow `get_value`; use an inner fn-style binding.
296        let prefix_eq = |a: &R, b: &R| -> bool {
297            for key in prefix_slice {
298                if key.compare(&get_value(a, &key.column), &get_value(b, &key.column))
299                    != Ordering::Equal
300                {
301                    return false;
302                }
303            }
304            true
305        };
306
307        // Wrap get_value so the flush closure can take `&dyn Fn`.
308        // The wrapper has a stable address inside this function.
309        let get_value_dyn: &dyn Fn(&R, &str) -> Value = &get_value;
310
311        for row in rows {
312            if let Some(first) = group.first() {
313                if !prefix_eq(first, &row) && flush(&mut group, &mut out, get_value_dyn) {
314                    return out;
315                }
316            }
317            group.push(row);
318        }
319        // Flush the final group.
320        flush(&mut group, &mut out, get_value_dyn);
321        out
322    }
323
324    /// Get all referenced columns
325    pub fn referenced_columns(&self) -> Vec<&str> {
326        self.keys.iter().map(|k| k.column.as_str()).collect()
327    }
328}
329
330/// Query limits (LIMIT and OFFSET)
331#[derive(Debug, Clone, Default)]
332pub struct QueryLimits {
333    /// Maximum number of rows to return
334    pub limit: Option<usize>,
335    /// Number of rows to skip
336    pub offset: usize,
337}
338
339impl QueryLimits {
340    /// Create with no limits
341    pub fn none() -> Self {
342        Self {
343            limit: None,
344            offset: 0,
345        }
346    }
347
348    /// Set limit
349    pub fn limit(mut self, n: usize) -> Self {
350        self.limit = Some(n);
351        self
352    }
353
354    /// Set offset
355    pub fn offset(mut self, n: usize) -> Self {
356        self.offset = n;
357        self
358    }
359
360    /// Apply limits to a vector of results
361    pub fn apply<T>(&self, mut items: Vec<T>) -> Vec<T> {
362        if self.offset >= items.len() {
363            return Vec::new();
364        }
365
366        let start = self.offset;
367        let end = match self.limit {
368            Some(limit) => (start + limit).min(items.len()),
369            None => items.len(),
370        };
371
372        items.drain(start..end).collect()
373    }
374
375    /// Apply limits to an iterator
376    pub fn apply_iter<T: 'static, I: Iterator<Item = T> + 'static>(
377        &self,
378        iter: I,
379    ) -> Box<dyn Iterator<Item = T>> {
380        let iter = iter.skip(self.offset);
381        match self.limit {
382            Some(limit) => Box::new(iter.take(limit)),
383            None => Box::new(iter),
384        }
385    }
386
387    /// Calculate the effective range for pagination
388    pub fn range(&self, total: usize) -> std::ops::Range<usize> {
389        let start = self.offset.min(total);
390        let end = match self.limit {
391            Some(limit) => (start + limit).min(total),
392            None => total,
393        };
394        start..end
395    }
396}
397
398/// Top-K tracking structure for incremental sorting
399pub struct TopK<T> {
400    /// Maximum items to keep
401    k: usize,
402    /// Items stored
403    items: Vec<T>,
404    /// Comparison function
405    compare: Box<dyn Fn(&T, &T) -> Ordering>,
406}
407
408impl<T> TopK<T> {
409    /// Create a new top-k tracker
410    pub fn new<F>(k: usize, compare: F) -> Self
411    where
412        F: Fn(&T, &T) -> Ordering + 'static,
413    {
414        Self {
415            k,
416            items: Vec::with_capacity(k + 1),
417            compare: Box::new(compare),
418        }
419    }
420
421    /// Push an item, maintaining only top k
422    pub fn push(&mut self, item: T) {
423        self.items.push(item);
424        self.items.sort_by(&self.compare);
425        if self.items.len() > self.k {
426            self.items.pop();
427        }
428    }
429
430    /// Get current items
431    pub fn items(&self) -> &[T] {
432        &self.items
433    }
434
435    /// Take the items
436    pub fn into_items(self) -> Vec<T> {
437        self.items
438    }
439
440    /// Number of items
441    pub fn len(&self) -> usize {
442        self.items.len()
443    }
444
445    /// Check if empty
446    pub fn is_empty(&self) -> bool {
447        self.items.is_empty()
448    }
449}
450
451#[cfg(test)]
452mod tests {
453    use super::*;
454
455    #[test]
456    fn test_sort_key_asc() {
457        let key = SortKey::asc("age");
458
459        assert_eq!(
460            key.compare(&Value::Integer(25), &Value::Integer(30)),
461            Ordering::Less
462        );
463        assert_eq!(
464            key.compare(&Value::Integer(30), &Value::Integer(25)),
465            Ordering::Greater
466        );
467        assert_eq!(
468            key.compare(&Value::Integer(25), &Value::Integer(25)),
469            Ordering::Equal
470        );
471    }
472
473    #[test]
474    fn test_sort_key_desc() {
475        let key = SortKey::desc("age");
476
477        assert_eq!(
478            key.compare(&Value::Integer(25), &Value::Integer(30)),
479            Ordering::Greater
480        );
481        assert_eq!(
482            key.compare(&Value::Integer(30), &Value::Integer(25)),
483            Ordering::Less
484        );
485    }
486
487    #[test]
488    fn test_sort_key_nulls_first() {
489        let key = SortKey::asc("age").nulls_first();
490
491        assert_eq!(
492            key.compare(&Value::Null, &Value::Integer(30)),
493            Ordering::Less
494        );
495        assert_eq!(
496            key.compare(&Value::Integer(30), &Value::Null),
497            Ordering::Greater
498        );
499    }
500
501    #[test]
502    fn test_sort_key_nulls_last() {
503        let key = SortKey::asc("age").nulls_last();
504
505        assert_eq!(
506            key.compare(&Value::Null, &Value::Integer(30)),
507            Ordering::Greater
508        );
509        assert_eq!(
510            key.compare(&Value::Integer(30), &Value::Null),
511            Ordering::Less
512        );
513    }
514
515    #[test]
516    fn test_order_by_single() {
517        let order = OrderBy::new().asc("age");
518
519        // Simple row type: (name, age)
520        type Row = (String, i64);
521        let get_value = |row: &Row, col: &str| -> Value {
522            match col {
523                "name" => Value::text(row.0.clone()),
524                "age" => Value::Integer(row.1),
525                _ => Value::Null,
526            }
527        };
528
529        let mut rows = vec![
530            ("Charlie".to_string(), 30),
531            ("Alice".to_string(), 25),
532            ("Bob".to_string(), 35),
533        ];
534
535        order.sort_rows(&mut rows, get_value);
536
537        assert_eq!(rows[0].0, "Alice");
538        assert_eq!(rows[1].0, "Charlie");
539        assert_eq!(rows[2].0, "Bob");
540    }
541
542    #[test]
543    fn test_order_by_multiple() {
544        let order = OrderBy::new().asc("department").desc("salary");
545
546        type Row = (String, String, i64); // (name, department, salary)
547        let get_value = |row: &Row, col: &str| -> Value {
548            match col {
549                "name" => Value::text(row.0.clone()),
550                "department" => Value::text(row.1.clone()),
551                "salary" => Value::Integer(row.2),
552                _ => Value::Null,
553            }
554        };
555
556        let mut rows = vec![
557            ("Alice".to_string(), "Engineering".to_string(), 100000),
558            ("Bob".to_string(), "Engineering".to_string(), 120000),
559            ("Charlie".to_string(), "Sales".to_string(), 90000),
560            ("Diana".to_string(), "Engineering".to_string(), 110000),
561        ];
562
563        order.sort_rows(&mut rows, get_value);
564
565        // Engineering first (alphabetically), then by salary descending
566        assert_eq!(rows[0].0, "Bob"); // Eng, 120k
567        assert_eq!(rows[1].0, "Diana"); // Eng, 110k
568        assert_eq!(rows[2].0, "Alice"); // Eng, 100k
569        assert_eq!(rows[3].0, "Charlie"); // Sales, 90k
570    }
571
572    #[test]
573    fn test_query_limits_apply() {
574        let items: Vec<i32> = (0..10).collect();
575
576        // Limit only
577        let limited = QueryLimits::none().limit(3).apply(items.clone());
578        assert_eq!(limited, vec![0, 1, 2]);
579
580        // Offset only
581        let offset = QueryLimits::none().offset(3).apply(items.clone());
582        assert_eq!(offset, vec![3, 4, 5, 6, 7, 8, 9]);
583
584        // Both
585        let both = QueryLimits::none().offset(2).limit(3).apply(items.clone());
586        assert_eq!(both, vec![2, 3, 4]);
587
588        // Offset beyond length
589        let empty = QueryLimits::none().offset(20).apply(items.clone());
590        assert!(empty.is_empty());
591    }
592
593    #[test]
594    fn test_query_limits_range() {
595        let limits = QueryLimits::none().offset(5).limit(10);
596
597        assert_eq!(limits.range(100), 5..15);
598        assert_eq!(limits.range(8), 5..8); // Limit by total
599        assert_eq!(limits.range(3), 3..3); // Offset beyond total
600    }
601
602    #[test]
603    fn test_top_k() {
604        let mut topk = TopK::new(3, |a: &i32, b: &i32| a.cmp(b));
605
606        topk.push(5);
607        topk.push(2);
608        topk.push(8);
609        topk.push(1);
610        topk.push(9);
611
612        let items = topk.into_items();
613        assert_eq!(items, vec![1, 2, 5]); // Top 3 smallest
614    }
615
616    #[test]
617    fn test_top_k_desc() {
618        let mut topk = TopK::new(3, |a: &i32, b: &i32| b.cmp(a)); // Reverse for largest
619
620        topk.push(5);
621        topk.push(2);
622        topk.push(8);
623        topk.push(1);
624        topk.push(9);
625
626        let items = topk.into_items();
627        assert_eq!(items, vec![9, 8, 5]); // Top 3 largest
628    }
629
630    #[test]
631    fn test_compare_cross_type() {
632        assert_eq!(
633            total_compare_values(&Value::Integer(10), &Value::Float(10.0)),
634            Ordering::Equal
635        );
636        assert_eq!(
637            total_compare_values(&Value::Integer(9), &Value::Float(10.0)),
638            Ordering::Less
639        );
640    }
641
642    #[test]
643    fn test_order_by_empty() {
644        let order = OrderBy::new();
645        assert!(order.is_empty());
646
647        let mut rows = vec![3, 1, 2];
648        order.sort_rows(&mut rows, |r, _| Value::Integer(*r));
649        // Should not change order
650        assert_eq!(rows, vec![3, 1, 2]);
651    }
652
653    #[test]
654    fn test_sort_key_text() {
655        let key = SortKey::asc("name");
656
657        assert_eq!(
658            key.compare(
659                &Value::text("Alice".to_string()),
660                &Value::text("Bob".to_string())
661            ),
662            Ordering::Less
663        );
664    }
665
666    #[test]
667    fn test_sort_key_timestamp() {
668        let key = SortKey::desc("created_at");
669
670        // Later timestamp should come first in desc order
671        assert_eq!(
672            key.compare(&Value::Timestamp(1000), &Value::Timestamp(500)),
673            Ordering::Less // 1000 is "smaller" in desc = comes first
674        );
675    }
676}