Skip to main content

sochdb_query/
topk_executor.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! ORDER BY + LIMIT Optimization with Streaming Top-K
19//!
20//! This module fixes a critical semantic bug in the query execution path and
21//! provides efficient top-K query support for queue-like access patterns.
22//!
23//! ## The Bug: Incorrect ORDER BY + LIMIT Semantics
24//!
25//! The previous implementation applied LIMIT during scan collection, then sorted:
26//!
27//! ```text
28//! ❌ WRONG:
29//!    for row in scan:
30//!        output.push(row)
31//!        if output.len() >= limit:
32//!            break
33//!    output.sort(order_by)  # BUG: Sorting wrong subset!
34//! ```
35//!
36//! This is NOT semantically equivalent to `ORDER BY ... LIMIT K` unless the
37//! scan order matches the requested order (generally false).
38//!
39//! Example of the bug:
40//! ```text
41//! Table: [priority=5, priority=1, priority=3, priority=2, priority=4]
42//! Query: ORDER BY priority ASC LIMIT 1
43//!
44//! Wrong (limit-then-sort): Returns priority=5 (first row)
45//! Correct: Returns priority=1 (minimum)
46//! ```
47//!
48//! ## The Fix: Three Strategies
49//!
50//! | Strategy       | Time       | Space | When to Use                     |
51//! |----------------|------------|-------|----------------------------------|
52//! | IndexPushdown  | O(log N+K) | O(K)  | Ordered index on ORDER BY col   |
53//! | StreamingTopK  | O(N log K) | O(K)  | No index, K << N                |
54//! | FullSort       | O(N log N) | O(N)  | No index, K ≈ N                 |
55//!
56//! ## Streaming Top-K Algorithm
57//!
58//! For ORDER BY col ASC LIMIT K:
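//! 1. Maintain a max-heap holding at most K rows
//! 2. For each row: while the heap holds fewer than K rows, insert it;
//!    otherwise, if row.col < heap.max, evict the max and insert the row
//! 3. At the end, drain the heap and sort the K survivors into output order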
62//!
63//! This gives O(N log K) time and O(K) space, vs O(N log N) and O(N) for full sort.
64//!
65//! ## Queue Optimization
66//!
67//! For "get highest priority task" (ORDER BY priority ASC LIMIT 1):
68//! - With 10K tasks: ~10K comparisons with O(1) memory
69//! - With ordered index: ~14 comparisons (log₂ 10000)
70
71use std::cmp::Ordering;
72use std::collections::BinaryHeap;
73
74use sochdb_core::{SochRow, SochValue};
75
76// ============================================================================
77// OrderBySpec - Sort Specification
78// ============================================================================
79
/// Direction in which a single ORDER BY key is sorted.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SortDirection {
    /// Smallest key first (`ASC`).
    Ascending,
    /// Largest key first (`DESC`).
    Descending,
}
86
87/// A single ORDER BY column specification
88#[derive(Debug, Clone)]
89pub struct OrderByColumn {
90    /// Column index or name
91    pub column: ColumnRef,
92    /// Sort direction
93    pub direction: SortDirection,
94    /// Null handling (NULLS FIRST or NULLS LAST)
95    pub nulls_first: bool,
96}
97
/// Identifies a column either by its position in a row or by its name.
#[derive(Clone, Debug)]
pub enum ColumnRef {
    /// Zero-based position within a row's values.
    Index(usize),
    /// Column name, looked up in a column-name list when resolved.
    Name(String),
}

impl ColumnRef {
    /// Converts this reference into a positional index.
    ///
    /// `Index` is returned unchanged (it is not checked against `columns`).
    /// `Name` resolves to the position of the first entry of `columns` equal
    /// to the name, or `None` if no entry matches.
    pub fn resolve(&self, columns: &[String]) -> Option<usize> {
        let wanted = match self {
            Self::Index(position) => return Some(*position),
            Self::Name(name) => name,
        };
        columns.iter().position(|candidate| candidate == wanted)
    }
}
116
117/// Full ORDER BY specification
118#[derive(Debug, Clone)]
119pub struct OrderBySpec {
120    /// Columns to sort by (in order of priority)
121    pub columns: Vec<OrderByColumn>,
122}
123
124impl OrderBySpec {
125    /// Create from a single column
126    pub fn single(column: ColumnRef, direction: SortDirection) -> Self {
127        Self {
128            columns: vec![OrderByColumn {
129                column,
130                direction,
131                nulls_first: false,
132            }],
133        }
134    }
135
136    /// Add another column to the sort
137    pub fn then_by(mut self, column: ColumnRef, direction: SortDirection) -> Self {
138        self.columns.push(OrderByColumn {
139            column,
140            direction,
141            nulls_first: false,
142        });
143        self
144    }
145
146    /// Create a comparator function for rows
147    pub fn comparator(&self, column_names: &[String]) -> impl Fn(&SochRow, &SochRow) -> Ordering {
148        let resolved: Vec<_> = self
149            .columns
150            .iter()
151            .filter_map(|col| {
152                col.column
153                    .resolve(column_names)
154                    .map(|idx| (idx, col.direction, col.nulls_first))
155            })
156            .collect();
157
158        move |a: &SochRow, b: &SochRow| {
159            for &(idx, direction, nulls_first) in &resolved {
160                let val_a = a.values.get(idx);
161                let val_b = b.values.get(idx);
162
163                let ordering = compare_values(val_a, val_b, nulls_first);
164
165                if ordering != Ordering::Equal {
166                    return match direction {
167                        SortDirection::Ascending => ordering,
168                        SortDirection::Descending => ordering.reverse(),
169                    };
170                }
171            }
172            Ordering::Equal
173        }
174    }
175
176    /// Check if an index matches this ORDER BY spec
177    pub fn matches_index(&self, index_columns: &[(String, SortDirection)]) -> bool {
178        if self.columns.len() > index_columns.len() {
179            return false;
180        }
181
182        self.columns
183            .iter()
184            .zip(index_columns.iter())
185            .all(|(col, (idx_col, idx_dir))| {
186                match &col.column {
187                    ColumnRef::Name(name) => name == idx_col && col.direction == *idx_dir,
188                    ColumnRef::Index(_) => false, // Can't match by index
189                }
190            })
191    }
192}
193
194/// Compare two optional SochValues with null handling
195fn compare_values(a: Option<&SochValue>, b: Option<&SochValue>, nulls_first: bool) -> Ordering {
196    match (a, b) {
197        (None, None) => Ordering::Equal,
198        (None, Some(_)) => {
199            if nulls_first {
200                Ordering::Less
201            } else {
202                Ordering::Greater
203            }
204        }
205        (Some(_), None) => {
206            if nulls_first {
207                Ordering::Greater
208            } else {
209                Ordering::Less
210            }
211        }
212        (Some(SochValue::Null), Some(SochValue::Null)) => Ordering::Equal,
213        (Some(SochValue::Null), Some(_)) => {
214            if nulls_first {
215                Ordering::Less
216            } else {
217                Ordering::Greater
218            }
219        }
220        (Some(_), Some(SochValue::Null)) => {
221            if nulls_first {
222                Ordering::Greater
223            } else {
224                Ordering::Less
225            }
226        }
227        (Some(a), Some(b)) => compare_soch_values(a, b),
228    }
229}
230
231/// Compare two SochValues
232fn compare_soch_values(a: &SochValue, b: &SochValue) -> Ordering {
233    match (a, b) {
234        (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
235        (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
236        (SochValue::Float(a), SochValue::Float(b)) => a.partial_cmp(b).unwrap_or(Ordering::Equal),
237        (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
238        (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
239        _ => Ordering::Equal, // Incompatible types compare as equal
240    }
241}
242
243// ============================================================================
244// TopKHeap - Generic Streaming Top-K
245// ============================================================================
246
/// A bounded heap for streaming top-K selection.
///
/// Keeps the K smallest (or largest) elements seen so far under a
/// caller-supplied comparator, without storing all N elements.
///
/// Internally this is an implicit binary heap stored in a `Vec<T>`. The root
/// (index 0) is always the *boundary* element, i.e. the retained element that
/// a better newcomer would evict. It is the largest one when keeping the
/// smallest K, and the smallest one when keeping the largest K.
///
/// The comparator is owned by the heap and only borrowed for each comparison,
/// so a `TopKHeap` may be moved at any time. A previous version stored a raw
/// `*const F` to its own `comparator` field inside every element. After the
/// heap was moved, later sifts compared elements through a dangling pointer.
///
/// ## Complexity
/// - Push: O(log K)
/// - Drain: O(K log K) to produce sorted output
/// - Space: O(K)
pub struct TopKHeap<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Heap-ordered storage; `heap[0]` is the boundary element.
    heap: Vec<T>,
    /// Maximum number of elements retained.
    k: usize,
    /// Comparator defining the desired output order (Less = comes first).
    comparator: F,
    /// Whether we keep the smallest K (true) or the largest K (false).
    want_smallest: bool,
}

impl<T, F> TopKHeap<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Upper bound on the slots reserved up front.
    ///
    /// `k` may be enormous (e.g. `usize::MAX` for an effectively unbounded
    /// LIMIT). Reserving `k + 1` slots panicked on overflow or aborted on
    /// allocation. The Vec still grows on demand beyond this.
    const PREALLOC_LIMIT: usize = 1024;

    /// Create a new top-K heap.
    ///
    /// - `k`: Number of elements to keep (0 keeps nothing).
    /// - `comparator`: Defines the desired output order (Less = should come first).
    /// - `want_smallest`: If true, keep the K elements that compare as smallest;
    ///   otherwise keep the K largest.
    pub fn new(k: usize, comparator: F, want_smallest: bool) -> Self {
        Self {
            heap: Vec::with_capacity(k.min(Self::PREALLOC_LIMIT)),
            k,
            comparator,
            want_smallest,
        }
    }

    /// Push an element into the heap.
    ///
    /// While fewer than K elements are held the element is always kept.
    /// Afterwards it replaces the boundary only if it is strictly better (a tie
    /// with the boundary keeps the existing element).
    ///
    /// Complexity: O(log K)
    pub fn push(&mut self, value: T) {
        if self.k == 0 {
            return;
        }

        if self.heap.len() < self.k {
            self.heap.push(value);
            let last = self.heap.len() - 1;
            self.sift_up(last);
        } else if self.beats_boundary(&value) {
            // Overwrite (and drop) the evicted boundary, then restore heap order.
            self.heap[0] = value;
            self.sift_down(0);
        }
    }

    /// Get the current boundary value.
    ///
    /// Once the heap is full, this is the value that new elements must beat to
    /// be included. Returns `None` when the heap is empty.
    pub fn threshold(&self) -> Option<&T> {
        self.heap.first()
    }

    /// Check if the heap is at capacity (always true when `k == 0`).
    pub fn is_full(&self) -> bool {
        self.heap.len() >= self.k
    }

    /// Drain the heap into a sorted vector.
    ///
    /// The output is in comparator order when keeping the smallest K, and in
    /// reverse comparator order (largest first) when keeping the largest K.
    ///
    /// Complexity: O(K log K)
    pub fn into_sorted_vec(self) -> Vec<T> {
        let Self {
            mut heap,
            comparator,
            want_smallest,
            ..
        } = self;
        if want_smallest {
            heap.sort_by(&comparator);
        } else {
            heap.sort_by(|a, b| comparator(b, a));
        }
        heap
    }

    /// Current number of elements.
    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// Check if empty.
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }

    /// True if `value` should replace the boundary of a full heap.
    fn beats_boundary(&self, value: &T) -> bool {
        // Only called when len >= k >= 1, so the root exists.
        let ord = (self.comparator)(value, &self.heap[0]);
        if self.want_smallest {
            ord == Ordering::Less
        } else {
            ord == Ordering::Greater
        }
    }

    /// True if `a` belongs closer to the root (is evicted sooner) than `b`.
    fn outranks(&self, a: &T, b: &T) -> bool {
        let ord = (self.comparator)(a, b);
        if self.want_smallest {
            ord == Ordering::Greater
        } else {
            ord == Ordering::Less
        }
    }

    /// Moves the element at `pos` up until its parent outranks it.
    fn sift_up(&mut self, mut pos: usize) {
        while pos > 0 {
            let parent = (pos - 1) / 2;
            if !self.outranks(&self.heap[pos], &self.heap[parent]) {
                break;
            }
            self.heap.swap(pos, parent);
            pos = parent;
        }
    }

    /// Moves the element at `pos` down until it outranks both children.
    fn sift_down(&mut self, mut pos: usize) {
        let len = self.heap.len();
        loop {
            let left = 2 * pos + 1;
            let right = left + 1;
            let mut top = pos;
            if left < len && self.outranks(&self.heap[left], &self.heap[top]) {
                top = left;
            }
            if right < len && self.outranks(&self.heap[right], &self.heap[top]) {
                top = right;
            }
            if top == pos {
                break;
            }
            self.heap.swap(pos, top);
            pos = top;
        }
    }
}
410
411// ============================================================================
412// OrderByLimitExecutor - The Fixed Implementation
413// ============================================================================
414
/// Strategy for ORDER BY + LIMIT execution
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionStrategy {
    /// Use index pushdown (storage provides ordered results)
    IndexPushdown,
    /// Streaming top-K with heap
    StreamingTopK,
    /// Full sort then limit
    FullSort,
}

impl ExecutionStrategy {
    /// Choose a strategy for producing the first `limit` rows in ORDER BY order.
    ///
    /// - A matching ordered index always wins (`IndexPushdown`).
    /// - With no usable estimate (`None` or `Some(0)`), `StreamingTopK` is the
    ///   safe default, because its retained state is bounded by `limit`.
    /// - Otherwise `StreamingTopK` is used when `limit <= 100` or
    ///   `limit < sqrt(estimated_rows)`, and `FullSort` when the limit is a
    ///   large fraction of the input (heap overhead buys nothing there).
    pub fn choose(has_matching_index: bool, estimated_rows: Option<usize>, limit: usize) -> Self {
        if has_matching_index {
            return ExecutionStrategy::IndexPushdown;
        }

        let n = match estimated_rows {
            Some(n) if n > 0 => n,
            _ => return ExecutionStrategy::StreamingTopK,
        };

        // Simple heuristic rather than the exact K*log(K) vs N break-even.
        let k = limit;
        if k <= 100 || (k as f64) < (n as f64).sqrt() {
            ExecutionStrategy::StreamingTopK
        } else {
            ExecutionStrategy::FullSort
        }
    }

    /// Human-readable complexity estimate for `n` input rows and `k` output rows.
    ///
    /// Logarithms are base 2 and truncated. The streaming and full-sort log
    /// factors are clamped to at least 1. The numeric estimate saturates at
    /// `usize::MAX` instead of overflowing, which previously panicked in debug
    /// builds for very large `n` or `k`.
    pub fn complexity(&self, n: usize, k: usize) -> String {
        // log2(0) is -inf; the float->int `as` cast saturates it to 0.
        let log2 = |x: usize| (x as f64).log2() as usize;
        match self {
            ExecutionStrategy::IndexPushdown => {
                format!("O(log {} + {}) = O({})", n, k, log2(n).saturating_add(k))
            }
            ExecutionStrategy::StreamingTopK => {
                let log_k = log2(k).max(1);
                format!("O({} * log {}) ≈ O({})", n, k, n.saturating_mul(log_k))
            }
            ExecutionStrategy::FullSort => {
                let log_n = log2(n).max(1);
                format!("O({} * log {}) ≈ O({})", n, n, n.saturating_mul(log_n))
            }
        }
    }
}
479
480/// Result statistics from ORDER BY + LIMIT execution
481#[derive(Debug, Clone, Default)]
482pub struct OrderByLimitStats {
483    /// Strategy used
484    pub strategy: Option<ExecutionStrategy>,
485    /// Input rows processed
486    pub input_rows: usize,
487    /// Output rows produced
488    pub output_rows: usize,
489    /// Heap operations performed
490    pub heap_operations: usize,
491    /// Comparisons performed
492    pub comparisons: usize,
493    /// Rows skipped by offset
494    pub offset_skipped: usize,
495}
496
497/// Executor for ORDER BY + LIMIT queries
498///
499/// This is the CORRECT implementation that ensures semantic equivalence
500/// with `ORDER BY ... LIMIT K OFFSET M`.
501pub struct OrderByLimitExecutor {
502    /// ORDER BY specification
503    order_by: OrderBySpec,
504    /// LIMIT value
505    limit: usize,
506    /// OFFSET value
507    offset: usize,
508    /// Column names for resolution
509    column_names: Vec<String>,
510    /// Execution strategy
511    strategy: ExecutionStrategy,
512}
513
514impl OrderByLimitExecutor {
515    /// Create a new executor
516    pub fn new(
517        order_by: OrderBySpec,
518        limit: usize,
519        offset: usize,
520        column_names: Vec<String>,
521        has_matching_index: bool,
522        estimated_rows: Option<usize>,
523    ) -> Self {
524        // For OFFSET, we need to fetch limit + offset rows
525        let effective_limit = limit.saturating_add(offset);
526        let strategy =
527            ExecutionStrategy::choose(has_matching_index, estimated_rows, effective_limit);
528
529        Self {
530            order_by,
531            limit,
532            offset,
533            column_names,
534            strategy,
535        }
536    }
537
538    /// Get the chosen strategy
539    pub fn strategy(&self) -> ExecutionStrategy {
540        self.strategy
541    }
542
543    /// Execute on an iterator of rows
544    ///
545    /// This is the main entry point. It:
546    /// 1. Applies the chosen strategy to get top (limit + offset) rows
547    /// 2. Applies offset
548    /// 3. Returns the final result
549    pub fn execute<I>(&self, rows: I) -> (Vec<SochRow>, OrderByLimitStats)
550    where
551        I: Iterator<Item = SochRow>,
552    {
553        let mut stats = OrderByLimitStats {
554            strategy: Some(self.strategy),
555            ..Default::default()
556        };
557
558        let effective_limit = self.limit.saturating_add(self.offset);
559
560        let result = match self.strategy {
561            ExecutionStrategy::IndexPushdown => {
562                // With index, just take the first limit+offset rows
563                // (caller must provide rows in correct order!)
564                let collected: Vec<_> = rows.take(effective_limit).collect();
565                stats.input_rows = collected.len();
566                collected
567            }
568            ExecutionStrategy::StreamingTopK => {
569                self.execute_streaming(rows, effective_limit, &mut stats)
570            }
571            ExecutionStrategy::FullSort => {
572                self.execute_full_sort(rows, effective_limit, &mut stats)
573            }
574        };
575
576        // Apply offset
577        let final_result: Vec<_> = result
578            .into_iter()
579            .skip(self.offset)
580            .take(self.limit)
581            .collect();
582
583        stats.offset_skipped = self.offset.min(stats.input_rows);
584        stats.output_rows = final_result.len();
585
586        (final_result, stats)
587    }
588
589    /// Execute using streaming top-K algorithm
590    fn execute_streaming<I>(&self, rows: I, k: usize, stats: &mut OrderByLimitStats) -> Vec<SochRow>
591    where
592        I: Iterator<Item = SochRow>,
593    {
594        let comparator = self.order_by.comparator(&self.column_names);
595
596        // We want the K smallest according to the comparator
597        let mut heap = TopKHeap::new(k, comparator, true);
598
599        for row in rows {
600            stats.input_rows += 1;
601            stats.heap_operations += 1;
602            heap.push(row);
603        }
604
605        heap.into_sorted_vec()
606    }
607
608    /// Execute using full sort
609    fn execute_full_sort<I>(&self, rows: I, k: usize, stats: &mut OrderByLimitStats) -> Vec<SochRow>
610    where
611        I: Iterator<Item = SochRow>,
612    {
613        let comparator = self.order_by.comparator(&self.column_names);
614
615        let mut all_rows: Vec<_> = rows.collect();
616        stats.input_rows = all_rows.len();
617
618        all_rows.sort_by(&comparator);
619
620        all_rows.truncate(k);
621        all_rows
622    }
623}
624
625// ============================================================================
626// IndexAwareTopK - For Index Pushdown with Partial Match
627// ============================================================================
628
/// Top-K with index awareness.
///
/// When the index only partially matches the ORDER BY (e.g., index on col1
/// but ORDER BY col1, col2), the index supplies the order of the leading key.
/// This type then orders each run of rows sharing that key (a "batch") with the
/// secondary comparator, taking rows batch by batch until K are collected.
pub struct IndexAwareTopK<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Current batch of items with same index key
    current_batch: Vec<T>,
    /// Items already selected, in final order
    result: Vec<T>,
    /// Maximum items to keep
    k: usize,
    /// Secondary comparator (for columns not in index)
    secondary_cmp: F,
}

impl<T, F> IndexAwareTopK<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Upper bound on the slots reserved up front for `result`.
    ///
    /// `k` may be enormous (e.g. an effectively unbounded LIMIT), and reserving
    /// `k` slots would panic or abort on allocation.
    const PREALLOC_LIMIT: usize = 1024;

    /// Create new index-aware top-K
    pub fn new(k: usize, secondary_cmp: F) -> Self {
        Self {
            current_batch: Vec::new(),
            result: Vec::with_capacity(k.min(Self::PREALLOC_LIMIT)),
            k,
            secondary_cmp,
        }
    }

    /// Process an item from an index-ordered scan.
    ///
    /// Items must be provided in index order. When the index key changes
    /// (`same_index_key_as_previous == false`), the previous batch is finalized.
    /// Once K items are final, later items are dropped instead of buffered,
    /// since they can never be selected.
    pub fn push(&mut self, item: T, same_index_key_as_previous: bool) {
        if !same_index_key_as_previous {
            self.finalize_batch();
        }

        if self.is_complete() {
            return;
        }

        self.current_batch.push(item);
    }

    /// Finalize the current batch: sort it by the secondary key and move as
    /// many of its items as still fit into `result`; the rest are dropped.
    fn finalize_batch(&mut self) {
        if self.current_batch.is_empty() {
            return;
        }

        self.current_batch.sort_by(&self.secondary_cmp);

        let remaining = self.k.saturating_sub(self.result.len());
        self.current_batch.truncate(remaining);
        // Moves everything left, leaving the batch empty with its capacity kept.
        self.result.append(&mut self.current_batch);
    }

    /// Check if we have enough results
    pub fn is_complete(&self) -> bool {
        self.result.len() >= self.k
    }

    /// Finalize any pending batch and return the selected items in order.
    pub fn into_result(mut self) -> Vec<T> {
        self.finalize_batch();
        self.result
    }
}
702
703// ============================================================================
704// SingleColumnTopK - Optimized for Single Column
705// ============================================================================
706
707/// Optimized top-K for single-column ORDER BY
708///
709/// Avoids the overhead of multi-column comparison when only one column is involved.
710pub struct SingleColumnTopK {
711    /// The heap
712    heap: BinaryHeap<SingleColEntry>,
713    /// K value
714    k: usize,
715    /// Column index
716    col_idx: usize,
717    /// Whether ascending order
718    ascending: bool,
719}
720
721struct SingleColEntry {
722    row: SochRow,
723    key: OrderableValue,
724    ascending: bool,
725}
726
727/// Wrapper to make SochValue orderable
728#[derive(Clone)]
729enum OrderableValue {
730    Int(i64),
731    UInt(u64),
732    Float(f64),
733    Text(String),
734    Bool(bool),
735    Null,
736}
737
738impl From<&SochValue> for OrderableValue {
739    fn from(v: &SochValue) -> Self {
740        match v {
741            SochValue::Int(i) => OrderableValue::Int(*i),
742            SochValue::UInt(u) => OrderableValue::UInt(*u),
743            SochValue::Float(f) => OrderableValue::Float(*f),
744            SochValue::Text(s) => OrderableValue::Text(s.clone()),
745            SochValue::Bool(b) => OrderableValue::Bool(*b),
746            SochValue::Null => OrderableValue::Null,
747            _ => OrderableValue::Null,
748        }
749    }
750}
751
752impl PartialEq for OrderableValue {
753    fn eq(&self, other: &Self) -> bool {
754        self.cmp(other) == Ordering::Equal
755    }
756}
757
758impl Eq for OrderableValue {}
759
760impl PartialOrd for OrderableValue {
761    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
762        Some(self.cmp(other))
763    }
764}
765
766impl Ord for OrderableValue {
767    fn cmp(&self, other: &Self) -> Ordering {
768        match (self, other) {
769            (OrderableValue::Null, OrderableValue::Null) => Ordering::Equal,
770            (OrderableValue::Null, _) => Ordering::Greater, // NULLS LAST
771            (_, OrderableValue::Null) => Ordering::Less,
772            (OrderableValue::Int(a), OrderableValue::Int(b)) => a.cmp(b),
773            (OrderableValue::UInt(a), OrderableValue::UInt(b)) => a.cmp(b),
774            (OrderableValue::Float(a), OrderableValue::Float(b)) => {
775                a.partial_cmp(b).unwrap_or(Ordering::Equal)
776            }
777            (OrderableValue::Text(a), OrderableValue::Text(b)) => a.cmp(b),
778            (OrderableValue::Bool(a), OrderableValue::Bool(b)) => a.cmp(b),
779            _ => Ordering::Equal, // Incompatible types
780        }
781    }
782}
783
784impl PartialEq for SingleColEntry {
785    fn eq(&self, other: &Self) -> bool {
786        self.key == other.key
787    }
788}
789
790impl Eq for SingleColEntry {}
791
792impl PartialOrd for SingleColEntry {
793    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
794        Some(self.cmp(other))
795    }
796}
797
798impl Ord for SingleColEntry {
799    fn cmp(&self, other: &Self) -> Ordering {
800        let base = self.key.cmp(&other.key);
801
802        // For ascending + max-heap, we want to evict the largest,
803        // so we don't invert. For descending, we invert.
804        if self.ascending { base } else { base.reverse() }
805    }
806}
807
808impl SingleColumnTopK {
809    /// Create new single-column top-K
810    pub fn new(k: usize, col_idx: usize, ascending: bool) -> Self {
811        Self {
812            heap: BinaryHeap::with_capacity(k + 1),
813            k,
814            col_idx,
815            ascending,
816        }
817    }
818
819    /// Push a row
820    pub fn push(&mut self, row: SochRow) {
821        if self.k == 0 {
822            return;
823        }
824
825        let key = row
826            .values
827            .get(self.col_idx)
828            .map(OrderableValue::from)
829            .unwrap_or(OrderableValue::Null);
830
831        let entry = SingleColEntry {
832            row,
833            key,
834            ascending: self.ascending,
835        };
836
837        if self.heap.len() < self.k {
838            self.heap.push(entry);
839        } else if let Some(top) = self.heap.peek() {
840            let should_replace = if self.ascending {
841                entry.key < top.key
842            } else {
843                entry.key > top.key
844            };
845
846            if should_replace {
847                self.heap.pop();
848                self.heap.push(entry);
849            }
850        }
851    }
852
853    /// Drain into sorted vector
854    pub fn into_sorted_vec(self) -> Vec<SochRow> {
855        let mut entries: Vec<_> = self.heap.into_iter().collect();
856
857        entries.sort_by(|a, b| {
858            let base = a.key.cmp(&b.key);
859            if self.ascending { base } else { base.reverse() }
860        });
861
862        entries.into_iter().map(|e| e.row).collect()
863    }
864
865    /// Current count
866    pub fn len(&self) -> usize {
867        self.heap.len()
868    }
869
870    /// Is empty
871    pub fn is_empty(&self) -> bool {
872        self.heap.is_empty()
873    }
874}
875
876// ============================================================================
877// Tests
878// ============================================================================
879
880#[cfg(test)]
881mod tests {
882    use super::*;
883
884    fn make_row(values: Vec<SochValue>) -> SochRow {
885        SochRow::new(values)
886    }
887
888    #[test]
889    fn test_strategy_selection() {
890        // With index: always pushdown
891        assert_eq!(
892            ExecutionStrategy::choose(true, Some(1_000_000), 10),
893            ExecutionStrategy::IndexPushdown
894        );
895
896        // Small K without index: streaming
897        assert_eq!(
898            ExecutionStrategy::choose(false, Some(1_000_000), 10),
899            ExecutionStrategy::StreamingTopK
900        );
901
902        // Large K relative to N (K > sqrt(N) and K > 100): full sort
903        // For N=1000, sqrt(N) ≈ 31.6, so K=500 > sqrt(1000) and K > 100
904        assert_eq!(
905            ExecutionStrategy::choose(false, Some(1000), 500),
906            ExecutionStrategy::FullSort
907        );
908
909        // K <= 100: always streaming even if K > sqrt(N)
910        assert_eq!(
911            ExecutionStrategy::choose(false, Some(100), 90),
912            ExecutionStrategy::StreamingTopK
913        );
914    }
915
916    #[test]
917    fn test_order_by_spec_comparator() {
918        let spec = OrderBySpec::single(
919            ColumnRef::Name("priority".to_string()),
920            SortDirection::Ascending,
921        );
922
923        let columns = vec!["id".to_string(), "priority".to_string(), "name".to_string()];
924        let cmp = spec.comparator(&columns);
925
926        let row1 = make_row(vec![
927            SochValue::Int(1),
928            SochValue::Int(5),
929            SochValue::Text("A".to_string()),
930        ]);
931        let row2 = make_row(vec![
932            SochValue::Int(2),
933            SochValue::Int(3),
934            SochValue::Text("B".to_string()),
935        ]);
936
937        // row2 has lower priority, should come first in ASC
938        assert_eq!(cmp(&row2, &row1), Ordering::Less);
939    }
940
941    #[test]
942    fn test_topk_heap_ascending() {
943        let cmp = |a: &i32, b: &i32| a.cmp(b);
944        let mut heap = TopKHeap::new(3, cmp, true);
945
946        for i in [5, 2, 8, 1, 9, 3, 7, 4, 6] {
947            heap.push(i);
948        }
949
950        let result = heap.into_sorted_vec();
951        assert_eq!(result, vec![1, 2, 3]);
952    }
953
954    #[test]
955    fn test_topk_heap_descending() {
956        let cmp = |a: &i32, b: &i32| a.cmp(b);
957        let mut heap = TopKHeap::new(3, cmp, false);
958
959        for i in [5, 2, 8, 1, 9, 3, 7, 4, 6] {
960            heap.push(i);
961        }
962
963        let result = heap.into_sorted_vec();
964        // Descending order
965        assert_eq!(result, vec![9, 8, 7]);
966    }
967
968    #[test]
969    fn test_executor_streaming() {
970        let columns = vec!["priority".to_string(), "name".to_string()];
971        let order_by = OrderBySpec::single(
972            ColumnRef::Name("priority".to_string()),
973            SortDirection::Ascending,
974        );
975
976        let executor = OrderByLimitExecutor::new(
977            order_by,
978            3, // limit
979            0, // offset
980            columns.clone(),
981            false, // no index
982            Some(10),
983        );
984
985        // Create rows with priorities: 5, 3, 1, 4, 2
986        let rows = vec![
987            make_row(vec![SochValue::Int(5), SochValue::Text("E".to_string())]),
988            make_row(vec![SochValue::Int(3), SochValue::Text("C".to_string())]),
989            make_row(vec![SochValue::Int(1), SochValue::Text("A".to_string())]),
990            make_row(vec![SochValue::Int(4), SochValue::Text("D".to_string())]),
991            make_row(vec![SochValue::Int(2), SochValue::Text("B".to_string())]),
992        ];
993
994        let (result, stats) = executor.execute(rows.into_iter());
995
996        // Should get priorities 1, 2, 3 (the 3 smallest)
997        assert_eq!(result.len(), 3);
998        assert_eq!(result[0].values[0], SochValue::Int(1));
999        assert_eq!(result[1].values[0], SochValue::Int(2));
1000        assert_eq!(result[2].values[0], SochValue::Int(3));
1001
1002        assert_eq!(stats.input_rows, 5);
1003        assert_eq!(stats.output_rows, 3);
1004    }
1005
1006    #[test]
1007    fn test_executor_with_offset() {
1008        let columns = vec!["priority".to_string()];
1009        let order_by = OrderBySpec::single(
1010            ColumnRef::Name("priority".to_string()),
1011            SortDirection::Ascending,
1012        );
1013
1014        let executor = OrderByLimitExecutor::new(
1015            order_by,
1016            2, // limit
1017            2, // offset
1018            columns,
1019            false,
1020            Some(10),
1021        );
1022
1023        // Priorities: 5, 3, 1, 4, 2 → sorted: 1, 2, 3, 4, 5
1024        // With offset 2, limit 2: should get 3, 4
1025        let rows = vec![
1026            make_row(vec![SochValue::Int(5)]),
1027            make_row(vec![SochValue::Int(3)]),
1028            make_row(vec![SochValue::Int(1)]),
1029            make_row(vec![SochValue::Int(4)]),
1030            make_row(vec![SochValue::Int(2)]),
1031        ];
1032
1033        let (result, _) = executor.execute(rows.into_iter());
1034
1035        assert_eq!(result.len(), 2);
1036        assert_eq!(result[0].values[0], SochValue::Int(3));
1037        assert_eq!(result[1].values[0], SochValue::Int(4));
1038    }
1039
1040    #[test]
1041    fn test_single_column_topk() {
1042        let mut topk = SingleColumnTopK::new(3, 0, true); // Column 0, ascending
1043
1044        topk.push(make_row(vec![SochValue::Int(5)]));
1045        topk.push(make_row(vec![SochValue::Int(3)]));
1046        topk.push(make_row(vec![SochValue::Int(1)]));
1047        topk.push(make_row(vec![SochValue::Int(4)]));
1048        topk.push(make_row(vec![SochValue::Int(2)]));
1049
1050        let result = topk.into_sorted_vec();
1051
1052        assert_eq!(result.len(), 3);
1053        assert_eq!(result[0].values[0], SochValue::Int(1));
1054        assert_eq!(result[1].values[0], SochValue::Int(2));
1055        assert_eq!(result[2].values[0], SochValue::Int(3));
1056    }
1057
1058    #[test]
1059    fn test_correctness_vs_buggy_implementation() {
1060        // This test demonstrates the bug in the old implementation
1061        let columns = vec!["priority".to_string()];
1062        let order_by = OrderBySpec::single(
1063            ColumnRef::Name("priority".to_string()),
1064            SortDirection::Ascending,
1065        );
1066
1067        // Rows in scan order: [5, 2, 8, 1, 9, 3]
1068        let rows: Vec<_> = [5, 2, 8, 1, 9, 3]
1069            .iter()
1070            .map(|&p| make_row(vec![SochValue::Int(p)]))
1071            .collect();
1072
1073        // BUGGY: collect first 3, then sort
1074        let buggy: Vec<_> = rows.iter().take(3).cloned().collect();
1075        // buggy contains: [5, 2, 8] → sorted: [2, 5, 8]
1076        // Bug: would return priority 2 as "smallest" but actual min is 1!
1077
1078        // CORRECT: streaming top-K
1079        let executor = OrderByLimitExecutor::new(order_by, 3, 0, columns, false, Some(6));
1080        let (correct, _) = executor.execute(rows.into_iter());
1081
1082        // Correct result: [1, 2, 3]
1083        assert_eq!(correct[0].values[0], SochValue::Int(1));
1084        assert_eq!(correct[1].values[0], SochValue::Int(2));
1085        assert_eq!(correct[2].values[0], SochValue::Int(3));
1086    }
1087
1088    #[test]
1089    fn test_multi_column_order_by() {
1090        let columns = vec!["priority".to_string(), "created_at".to_string()];
1091
1092        let order_by = OrderBySpec::single(
1093            ColumnRef::Name("priority".to_string()),
1094            SortDirection::Ascending,
1095        )
1096        .then_by(
1097            ColumnRef::Name("created_at".to_string()),
1098            SortDirection::Descending,
1099        );
1100
1101        let executor = OrderByLimitExecutor::new(order_by, 3, 0, columns, false, Some(5));
1102
1103        // Rows: (priority, created_at)
1104        let rows = vec![
1105            make_row(vec![SochValue::Int(1), SochValue::Int(100)]),
1106            make_row(vec![SochValue::Int(1), SochValue::Int(200)]), // Same priority, later
1107            make_row(vec![SochValue::Int(2), SochValue::Int(150)]),
1108            make_row(vec![SochValue::Int(1), SochValue::Int(150)]),
1109            make_row(vec![SochValue::Int(3), SochValue::Int(100)]),
1110        ];
1111
1112        let (result, _) = executor.execute(rows.into_iter());
1113
1114        // Should be: priority=1 rows ordered by created_at DESC
1115        // So: (1, 200), (1, 150), (1, 100)
1116        assert_eq!(result.len(), 3);
1117        assert_eq!(result[0].values[0], SochValue::Int(1));
1118        assert_eq!(result[0].values[1], SochValue::Int(200));
1119        assert_eq!(result[1].values[0], SochValue::Int(1));
1120        assert_eq!(result[1].values[1], SochValue::Int(150));
1121        assert_eq!(result[2].values[0], SochValue::Int(1));
1122        assert_eq!(result[2].values[1], SochValue::Int(100));
1123    }
1124}