Skip to main content

sochdb_query/
topk_executor.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! ORDER BY + LIMIT Optimization with Streaming Top-K
19//!
20//! This module fixes a critical semantic bug in the query execution path and
21//! provides efficient top-K query support for queue-like access patterns.
22//!
23//! ## The Bug: Incorrect ORDER BY + LIMIT Semantics
24//!
25//! The previous implementation applied LIMIT during scan collection, then sorted:
26//!
27//! ```text
28//! ❌ WRONG:
29//!    for row in scan:
30//!        output.push(row)
31//!        if output.len() >= limit:
32//!            break
33//!    output.sort(order_by)  # BUG: Sorting wrong subset!
34//! ```
35//!
36//! This is NOT semantically equivalent to `ORDER BY ... LIMIT K` unless the
37//! scan order matches the requested order (generally false).
38//!
39//! Example of the bug:
40//! ```text
41//! Table: [priority=5, priority=1, priority=3, priority=2, priority=4]
42//! Query: ORDER BY priority ASC LIMIT 1
43//!
44//! Wrong (limit-then-sort): Returns priority=5 (first row)
45//! Correct: Returns priority=1 (minimum)
46//! ```
47//!
48//! ## The Fix: Three Strategies
49//!
50//! | Strategy       | Time       | Space | When to Use                     |
51//! |----------------|------------|-------|----------------------------------|
52//! | IndexPushdown  | O(log N+K) | O(K)  | Ordered index on ORDER BY col   |
53//! | StreamingTopK  | O(N log K) | O(K)  | No index, K << N                |
54//! | FullSort       | O(N log N) | O(N)  | No index, K ≈ N                 |
55//!
56//! ## Streaming Top-K Algorithm
57//!
58//! For ORDER BY col ASC LIMIT K:
59//! 1. Maintain a max-heap of size K
//! 2. For each row: while the heap holds fewer than K rows, insert it; otherwise, if row.col < heap.max, evict max and insert row
61//! 3. At end, drain heap in sorted order
62//!
63//! This gives O(N log K) time and O(K) space, vs O(N log N) and O(N) for full sort.
64//!
65//! ## Queue Optimization
66//!
67//! For "get highest priority task" (ORDER BY priority ASC LIMIT 1):
68//! - With 10K tasks: ~10K comparisons with O(1) memory
69//! - With ordered index: ~14 comparisons (log₂ 10000)
70
71use std::cmp::Ordering;
72use std::collections::BinaryHeap;
73
74use sochdb_core::{SochRow, SochValue};
75
76// ============================================================================
77// OrderBySpec - Sort Specification
78// ============================================================================
79
/// Direction in which one ORDER BY key is sorted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortDirection {
    /// Keep the natural order of the compared values (`ASC`).
    Ascending,
    /// Reverse the natural order of the compared values (`DESC`).
    Descending,
}
86
87/// A single ORDER BY column specification
88#[derive(Debug, Clone)]
89pub struct OrderByColumn {
90    /// Column index or name
91    pub column: ColumnRef,
92    /// Sort direction
93    pub direction: SortDirection,
94    /// Null handling (NULLS FIRST or NULLS LAST)
95    pub nulls_first: bool,
96}
97
/// Identifies a column either by its position or by its name.
#[derive(Debug, Clone)]
pub enum ColumnRef {
    /// The column's position within a row.
    Index(usize),
    /// The column's name, to be looked up in a list of column names.
    Name(String),
}

impl ColumnRef {
    /// Turn this reference into a positional index.
    ///
    /// A positional reference is returned as-is; it is *not* checked against
    /// the length of `columns`. A named reference yields the position of the
    /// first entry in `columns` equal to the name, or `None` if there is none.
    pub fn resolve(&self, columns: &[String]) -> Option<usize> {
        let wanted = match self {
            ColumnRef::Index(position) => return Some(*position),
            ColumnRef::Name(name) => name,
        };
        columns.iter().position(|candidate| candidate == wanted)
    }
}
116
117/// Full ORDER BY specification
118#[derive(Debug, Clone)]
119pub struct OrderBySpec {
120    /// Columns to sort by (in order of priority)
121    pub columns: Vec<OrderByColumn>,
122}
123
124impl OrderBySpec {
125    /// Create from a single column
126    pub fn single(column: ColumnRef, direction: SortDirection) -> Self {
127        Self {
128            columns: vec![OrderByColumn {
129                column,
130                direction,
131                nulls_first: false,
132            }],
133        }
134    }
135
136    /// Add another column to the sort
137    pub fn then_by(mut self, column: ColumnRef, direction: SortDirection) -> Self {
138        self.columns.push(OrderByColumn {
139            column,
140            direction,
141            nulls_first: false,
142        });
143        self
144    }
145
146    /// Create a comparator function for rows
147    pub fn comparator(&self, column_names: &[String]) -> impl Fn(&SochRow, &SochRow) -> Ordering {
148        let resolved: Vec<_> = self.columns
149            .iter()
150            .filter_map(|col| {
151                col.column.resolve(column_names).map(|idx| (idx, col.direction, col.nulls_first))
152            })
153            .collect();
154        
155        move |a: &SochRow, b: &SochRow| {
156            for &(idx, direction, nulls_first) in &resolved {
157                let val_a = a.values.get(idx);
158                let val_b = b.values.get(idx);
159                
160                let ordering = compare_values(val_a, val_b, nulls_first);
161                
162                if ordering != Ordering::Equal {
163                    return match direction {
164                        SortDirection::Ascending => ordering,
165                        SortDirection::Descending => ordering.reverse(),
166                    };
167                }
168            }
169            Ordering::Equal
170        }
171    }
172
173    /// Check if an index matches this ORDER BY spec
174    pub fn matches_index(&self, index_columns: &[(String, SortDirection)]) -> bool {
175        if self.columns.len() > index_columns.len() {
176            return false;
177        }
178
179        self.columns.iter().zip(index_columns.iter()).all(|(col, (idx_col, idx_dir))| {
180            match &col.column {
181                ColumnRef::Name(name) => name == idx_col && col.direction == *idx_dir,
182                ColumnRef::Index(_) => false, // Can't match by index
183            }
184        })
185    }
186}
187
188/// Compare two optional SochValues with null handling
189fn compare_values(a: Option<&SochValue>, b: Option<&SochValue>, nulls_first: bool) -> Ordering {
190    match (a, b) {
191        (None, None) => Ordering::Equal,
192        (None, Some(_)) => if nulls_first { Ordering::Less } else { Ordering::Greater },
193        (Some(_), None) => if nulls_first { Ordering::Greater } else { Ordering::Less },
194        (Some(SochValue::Null), Some(SochValue::Null)) => Ordering::Equal,
195        (Some(SochValue::Null), Some(_)) => if nulls_first { Ordering::Less } else { Ordering::Greater },
196        (Some(_), Some(SochValue::Null)) => if nulls_first { Ordering::Greater } else { Ordering::Less },
197        (Some(a), Some(b)) => compare_soch_values(a, b),
198    }
199}
200
201/// Compare two SochValues
202fn compare_soch_values(a: &SochValue, b: &SochValue) -> Ordering {
203    match (a, b) {
204        (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
205        (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
206        (SochValue::Float(a), SochValue::Float(b)) => a.partial_cmp(b).unwrap_or(Ordering::Equal),
207        (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
208        (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
209        _ => Ordering::Equal, // Incompatible types compare as equal
210    }
211}
212
213// ============================================================================
214// TopKHeap - Generic Streaming Top-K
215// ============================================================================
216
/// A bounded heap for streaming top-K selection.
///
/// Keeps the K smallest (or K largest) elements seen so far, as judged by a
/// caller-supplied comparator, without storing all N inputs.
///
/// The heap is a plain array-backed binary heap that calls the comparator
/// directly. It contains no `unsafe` code and no self-referential pointers, so
/// the value may be moved freely between pushes, and `Send`/`Sync` follow
/// automatically from `T` and `F`.
///
/// ## Complexity
/// - Push: O(log K)
/// - Drain: O(K log K) to produce sorted output
/// - Space: O(K), grown lazily for large K
pub struct TopKHeap<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Binary heap laid out in a vector. Index 0 holds the boundary element:
    /// the largest kept element for smallest-K, the smallest kept for largest-K.
    heap: Vec<T>,
    /// Maximum number of elements to keep
    k: usize,
    /// Comparator (defines desired output order)
    comparator: F,
    /// Whether we want smallest K (true) or largest K (false)
    want_smallest: bool,
}

impl<T, F> TopKHeap<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Cap on eager allocation. `k` may be an "unbounded" sentinel such as
    /// `usize::MAX`, which must not translate into an allocation request.
    const MAX_PREALLOCATED: usize = 1024;

    /// Create a new top-K heap
    ///
    /// - `k`: Number of elements to keep
    /// - `comparator`: Defines the desired output order (Less = should come first)
    /// - `want_smallest`: If true, keep the K elements that compare as smallest
    pub fn new(k: usize, comparator: F, want_smallest: bool) -> Self {
        Self {
            heap: Vec::with_capacity(k.min(Self::MAX_PREALLOCATED)),
            k,
            comparator,
            want_smallest,
        }
    }

    /// Whether the element at index `a` must sit above the one at index `b`,
    /// i.e. `a` is closer to being evicted than `b`.
    fn is_worse(&self, a: usize, b: usize) -> bool {
        let ordering = (self.comparator)(&self.heap[a], &self.heap[b]);
        if self.want_smallest {
            ordering == Ordering::Greater
        } else {
            ordering == Ordering::Less
        }
    }

    /// Restore the heap property after appending an element at `child`.
    fn sift_up(&mut self, mut child: usize) {
        while child > 0 {
            let parent = (child - 1) / 2;
            if !self.is_worse(child, parent) {
                break;
            }
            self.heap.swap(child, parent);
            child = parent;
        }
    }

    /// Restore the heap property after replacing the element at `parent`.
    fn sift_down(&mut self, mut parent: usize) {
        let len = self.heap.len();
        loop {
            let left = 2 * parent + 1;
            if left >= len {
                break;
            }
            let right = left + 1;
            let worst = if right < len && self.is_worse(right, left) {
                right
            } else {
                left
            };
            if !self.is_worse(worst, parent) {
                break;
            }
            self.heap.swap(worst, parent);
            parent = worst;
        }
    }

    /// Push an element into the heap
    ///
    /// While fewer than `k` elements are held the value is always kept.
    /// Afterwards it replaces the boundary element only if it strictly beats
    /// it; ties keep the element already held. With `k == 0` nothing is stored.
    ///
    /// Complexity: O(log K)
    pub fn push(&mut self, value: T) {
        if self.k == 0 {
            return;
        }

        if self.heap.len() < self.k {
            self.heap.push(value);
            self.sift_up(self.heap.len() - 1);
            return;
        }

        // For smallest K the newcomer must be Less than the current maximum;
        // for largest K it must be Greater than the current minimum.
        let winning = if self.want_smallest {
            Ordering::Less
        } else {
            Ordering::Greater
        };
        let should_replace = match self.heap.first() {
            Some(boundary) => (self.comparator)(&value, boundary) == winning,
            None => false,
        };

        if should_replace {
            self.heap[0] = value;
            self.sift_down(0);
        }
    }

    /// Get the current boundary value
    ///
    /// Once the heap is full, this is the value that new elements must beat
    /// to be included. Returns `None` while the heap is empty.
    pub fn threshold(&self) -> Option<&T> {
        self.heap.first()
    }

    /// Check if the heap is at capacity
    pub fn is_full(&self) -> bool {
        self.heap.len() >= self.k
    }

    /// Drain the heap into a sorted vector
    ///
    /// Smallest-K results are returned in comparator order; largest-K results
    /// are returned in reverse comparator order (largest first).
    ///
    /// Complexity: O(K log K)
    pub fn into_sorted_vec(self) -> Vec<T> {
        let Self {
            heap: mut values,
            comparator,
            want_smallest,
            ..
        } = self;
        if want_smallest {
            values.sort_by(|a, b| comparator(a, b));
        } else {
            // For largest K, sort in descending order
            values.sort_by(|a, b| comparator(b, a));
        }
        values
    }

    /// Current number of elements
    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// Check if empty
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }
}
380
381// ============================================================================
382// OrderByLimitExecutor - The Fixed Implementation
383// ============================================================================
384
/// Strategy for ORDER BY + LIMIT execution
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionStrategy {
    /// Use index pushdown (storage provides ordered results)
    IndexPushdown,
    /// Streaming top-K with heap
    StreamingTopK,
    /// Full sort then limit
    FullSort,
}

impl ExecutionStrategy {
    /// Choose the optimal strategy
    ///
    /// - `has_matching_index`: an index already yields rows in the requested
    ///   order, so pushdown is always chosen.
    /// - `estimated_rows`: expected input size N; `None` or `Some(0)` means
    ///   unknown, in which case streaming is the safe default.
    /// - `limit`: number of rows K that must be ranked.
    ///
    /// Without an index, streaming is chosen when `K <= 100` or `K < sqrt(N)`;
    /// otherwise a full sort is chosen.
    pub fn choose(
        has_matching_index: bool,
        estimated_rows: Option<usize>,
        limit: usize,
    ) -> Self {
        if has_matching_index {
            return ExecutionStrategy::IndexPushdown;
        }

        let n = match estimated_rows {
            Some(n) if n > 0 => n,
            _ => return ExecutionStrategy::StreamingTopK,
        };

        // Heuristic rather than the exact break-even: a small K, or a K below
        // sqrt(N), keeps the heap cheap relative to sorting everything.
        let k = limit;
        if k <= 100 || (k as f64) < (n as f64).sqrt() {
            ExecutionStrategy::StreamingTopK
        } else {
            // Large K relative to N: full sort may be better
            // (avoids heap overhead when keeping most of the data)
            ExecutionStrategy::FullSort
        }
    }

    /// Get estimated complexity description
    ///
    /// Returns a human-readable string for an input of `n` rows and a limit of
    /// `k`. The numeric estimate saturates at `usize::MAX` instead of
    /// overflowing, so "unbounded" sentinel values are safe to pass.
    pub fn complexity(&self, n: usize, k: usize) -> String {
        match self {
            ExecutionStrategy::IndexPushdown => {
                // A float-to-int cast saturates, so log2(0) = -inf becomes 0.
                let log_n = (n as f64).log2() as usize;
                format!("O(log {} + {}) = O({})", n, k, log_n.saturating_add(k))
            }
            ExecutionStrategy::StreamingTopK => {
                let log_k = (k as f64).log2().max(1.0) as usize;
                format!("O({} * log {}) ≈ O({})", n, k, n.saturating_mul(log_k))
            }
            ExecutionStrategy::FullSort => {
                let log_n = (n as f64).log2().max(1.0) as usize;
                format!("O({} * log {}) ≈ O({})", n, n, n.saturating_mul(log_n))
            }
        }
    }
}
448
449/// Result statistics from ORDER BY + LIMIT execution
450#[derive(Debug, Clone, Default)]
451pub struct OrderByLimitStats {
452    /// Strategy used
453    pub strategy: Option<ExecutionStrategy>,
454    /// Input rows processed
455    pub input_rows: usize,
456    /// Output rows produced
457    pub output_rows: usize,
458    /// Heap operations performed
459    pub heap_operations: usize,
460    /// Comparisons performed
461    pub comparisons: usize,
462    /// Rows skipped by offset
463    pub offset_skipped: usize,
464}
465
466/// Executor for ORDER BY + LIMIT queries
467///
468/// This is the CORRECT implementation that ensures semantic equivalence
469/// with `ORDER BY ... LIMIT K OFFSET M`.
470pub struct OrderByLimitExecutor {
471    /// ORDER BY specification
472    order_by: OrderBySpec,
473    /// LIMIT value
474    limit: usize,
475    /// OFFSET value
476    offset: usize,
477    /// Column names for resolution
478    column_names: Vec<String>,
479    /// Execution strategy
480    strategy: ExecutionStrategy,
481}
482
483impl OrderByLimitExecutor {
484    /// Create a new executor
485    pub fn new(
486        order_by: OrderBySpec,
487        limit: usize,
488        offset: usize,
489        column_names: Vec<String>,
490        has_matching_index: bool,
491        estimated_rows: Option<usize>,
492    ) -> Self {
493        // For OFFSET, we need to fetch limit + offset rows
494        let effective_limit = limit.saturating_add(offset);
495        let strategy = ExecutionStrategy::choose(has_matching_index, estimated_rows, effective_limit);
496        
497        Self {
498            order_by,
499            limit,
500            offset,
501            column_names,
502            strategy,
503        }
504    }
505
506    /// Get the chosen strategy
507    pub fn strategy(&self) -> ExecutionStrategy {
508        self.strategy
509    }
510
511    /// Execute on an iterator of rows
512    ///
513    /// This is the main entry point. It:
514    /// 1. Applies the chosen strategy to get top (limit + offset) rows
515    /// 2. Applies offset
516    /// 3. Returns the final result
517    pub fn execute<I>(&self, rows: I) -> (Vec<SochRow>, OrderByLimitStats)
518    where
519        I: Iterator<Item = SochRow>,
520    {
521        let mut stats = OrderByLimitStats {
522            strategy: Some(self.strategy),
523            ..Default::default()
524        };
525
526        let effective_limit = self.limit.saturating_add(self.offset);
527        
528        let result = match self.strategy {
529            ExecutionStrategy::IndexPushdown => {
530                // With index, just take the first limit+offset rows
531                // (caller must provide rows in correct order!)
532                let collected: Vec<_> = rows.take(effective_limit).collect();
533                stats.input_rows = collected.len();
534                collected
535            }
536            ExecutionStrategy::StreamingTopK => {
537                self.execute_streaming(rows, effective_limit, &mut stats)
538            }
539            ExecutionStrategy::FullSort => {
540                self.execute_full_sort(rows, effective_limit, &mut stats)
541            }
542        };
543
544        // Apply offset
545        let final_result: Vec<_> = result
546            .into_iter()
547            .skip(self.offset)
548            .take(self.limit)
549            .collect();
550        
551        stats.offset_skipped = self.offset.min(stats.input_rows);
552        stats.output_rows = final_result.len();
553        
554        (final_result, stats)
555    }
556
557    /// Execute using streaming top-K algorithm
558    fn execute_streaming<I>(
559        &self,
560        rows: I,
561        k: usize,
562        stats: &mut OrderByLimitStats,
563    ) -> Vec<SochRow>
564    where
565        I: Iterator<Item = SochRow>,
566    {
567        let comparator = self.order_by.comparator(&self.column_names);
568        
569        // We want the K smallest according to the comparator
570        let mut heap = TopKHeap::new(k, comparator, true);
571        
572        for row in rows {
573            stats.input_rows += 1;
574            stats.heap_operations += 1;
575            heap.push(row);
576        }
577
578        heap.into_sorted_vec()
579    }
580
581    /// Execute using full sort
582    fn execute_full_sort<I>(
583        &self,
584        rows: I,
585        k: usize,
586        stats: &mut OrderByLimitStats,
587    ) -> Vec<SochRow>
588    where
589        I: Iterator<Item = SochRow>,
590    {
591        let comparator = self.order_by.comparator(&self.column_names);
592        
593        let mut all_rows: Vec<_> = rows.collect();
594        stats.input_rows = all_rows.len();
595        
596        all_rows.sort_by(&comparator);
597        
598        all_rows.truncate(k);
599        all_rows
600    }
601}
602
603// ============================================================================
604// IndexAwareTopK - For Index Pushdown with Partial Match
605// ============================================================================
606
/// Top-K with index awareness
///
/// When the index only partially matches the ORDER BY (e.g., index on col1
/// but ORDER BY col1, col2), we can still use the index for the first column
/// and apply top-K for the rest.
///
/// Items arrive in index order and are grouped into batches that share an
/// index key. Each batch is sorted by the secondary comparator and appended to
/// the result until `k` items have been collected.
pub struct IndexAwareTopK<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Current batch of items with same index key
    current_batch: Vec<T>,
    /// Best items seen so far
    result: Vec<T>,
    /// Maximum items to keep
    k: usize,
    /// Secondary comparator (for columns not in index)
    secondary_cmp: F,
}

impl<T, F> IndexAwareTopK<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Cap on eager allocation. `k` may be an "unbounded" sentinel such as
    /// `usize::MAX`, which must not translate into an allocation request.
    const MAX_PREALLOCATED: usize = 1024;

    /// Create new index-aware top-K
    pub fn new(k: usize, secondary_cmp: F) -> Self {
        Self {
            current_batch: Vec::new(),
            result: Vec::with_capacity(k.min(Self::MAX_PREALLOCATED)),
            k,
            secondary_cmp,
        }
    }

    /// Process an item from an index-ordered scan
    ///
    /// Items must be provided in index order. When the index key changes,
    /// the previous batch is finalized. Items that arrive after `k` results
    /// have been collected are discarded, because they can never be selected.
    pub fn push(&mut self, item: T, same_index_key_as_previous: bool) {
        if !same_index_key_as_previous {
            self.finalize_batch();
        }

        // `result` only grows, so a full result can never accept this item;
        // buffering it would just consume memory for the rest of the scan.
        if self.is_complete() {
            return;
        }

        self.current_batch.push(item);
    }

    /// Finalize the current batch (sort by secondary key)
    fn finalize_batch(&mut self) {
        if self.current_batch.is_empty() {
            return;
        }

        let remaining = self.k.saturating_sub(self.result.len());
        if remaining > 0 {
            // Sort batch by secondary key, then take as many as we still need
            self.current_batch.sort_by(&self.secondary_cmp);
            let to_take = remaining.min(self.current_batch.len());
            self.result.extend(self.current_batch.drain(..to_take));
        }
        self.current_batch.clear();
    }

    /// Check if we have enough results
    ///
    /// Only finalized batches count; the batch currently being collected is
    /// not included until the index key changes or `into_result` is called.
    pub fn is_complete(&self) -> bool {
        self.result.len() >= self.k
    }

    /// Drain into final result
    pub fn into_result(mut self) -> Vec<T> {
        self.finalize_batch();
        self.result
    }
}
680
681// ============================================================================
682// SingleColumnTopK - Optimized for Single Column
683// ============================================================================
684
685/// Optimized top-K for single-column ORDER BY
686///
687/// Avoids the overhead of multi-column comparison when only one column is involved.
688pub struct SingleColumnTopK {
689    /// The heap
690    heap: BinaryHeap<SingleColEntry>,
691    /// K value
692    k: usize,
693    /// Column index
694    col_idx: usize,
695    /// Whether ascending order
696    ascending: bool,
697}
698
699struct SingleColEntry {
700    row: SochRow,
701    key: OrderableValue,
702    ascending: bool,
703}
704
/// Owned sort key mirroring the orderable `SochValue` variants.
#[derive(Clone)]
enum OrderableValue {
    /// Signed integer key.
    Int(i64),
    /// Unsigned integer key.
    UInt(u64),
    /// Floating-point key.
    Float(f64),
    /// Text key.
    Text(String),
    /// Boolean key.
    Bool(bool),
    /// Null key; also used for values with no orderable counterpart.
    Null,
}
715
716impl From<&SochValue> for OrderableValue {
717    fn from(v: &SochValue) -> Self {
718        match v {
719            SochValue::Int(i) => OrderableValue::Int(*i),
720            SochValue::UInt(u) => OrderableValue::UInt(*u),
721            SochValue::Float(f) => OrderableValue::Float(*f),
722            SochValue::Text(s) => OrderableValue::Text(s.clone()),
723            SochValue::Bool(b) => OrderableValue::Bool(*b),
724            SochValue::Null => OrderableValue::Null,
725            _ => OrderableValue::Null,
726        }
727    }
728}
729
730impl PartialEq for OrderableValue {
731    fn eq(&self, other: &Self) -> bool {
732        self.cmp(other) == Ordering::Equal
733    }
734}
735
736impl Eq for OrderableValue {}
737
738impl PartialOrd for OrderableValue {
739    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
740        Some(self.cmp(other))
741    }
742}
743
744impl Ord for OrderableValue {
745    fn cmp(&self, other: &Self) -> Ordering {
746        match (self, other) {
747            (OrderableValue::Null, OrderableValue::Null) => Ordering::Equal,
748            (OrderableValue::Null, _) => Ordering::Greater, // NULLS LAST
749            (_, OrderableValue::Null) => Ordering::Less,
750            (OrderableValue::Int(a), OrderableValue::Int(b)) => a.cmp(b),
751            (OrderableValue::UInt(a), OrderableValue::UInt(b)) => a.cmp(b),
752            (OrderableValue::Float(a), OrderableValue::Float(b)) => {
753                a.partial_cmp(b).unwrap_or(Ordering::Equal)
754            }
755            (OrderableValue::Text(a), OrderableValue::Text(b)) => a.cmp(b),
756            (OrderableValue::Bool(a), OrderableValue::Bool(b)) => a.cmp(b),
757            _ => Ordering::Equal, // Incompatible types
758        }
759    }
760}
761
762impl PartialEq for SingleColEntry {
763    fn eq(&self, other: &Self) -> bool {
764        self.key == other.key
765    }
766}
767
768impl Eq for SingleColEntry {}
769
770impl PartialOrd for SingleColEntry {
771    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
772        Some(self.cmp(other))
773    }
774}
775
776impl Ord for SingleColEntry {
777    fn cmp(&self, other: &Self) -> Ordering {
778        let base = self.key.cmp(&other.key);
779        
780        // For ascending + max-heap, we want to evict the largest,
781        // so we don't invert. For descending, we invert.
782        if self.ascending {
783            base
784        } else {
785            base.reverse()
786        }
787    }
788}
789
790impl SingleColumnTopK {
791    /// Create new single-column top-K
792    pub fn new(k: usize, col_idx: usize, ascending: bool) -> Self {
793        Self {
794            heap: BinaryHeap::with_capacity(k + 1),
795            k,
796            col_idx,
797            ascending,
798        }
799    }
800
801    /// Push a row
802    pub fn push(&mut self, row: SochRow) {
803        if self.k == 0 {
804            return;
805        }
806
807        let key = row.values
808            .get(self.col_idx)
809            .map(OrderableValue::from)
810            .unwrap_or(OrderableValue::Null);
811
812        let entry = SingleColEntry {
813            row,
814            key,
815            ascending: self.ascending,
816        };
817
818        if self.heap.len() < self.k {
819            self.heap.push(entry);
820        } else if let Some(top) = self.heap.peek() {
821            let should_replace = if self.ascending {
822                entry.key < top.key
823            } else {
824                entry.key > top.key
825            };
826
827            if should_replace {
828                self.heap.pop();
829                self.heap.push(entry);
830            }
831        }
832    }
833
834    /// Drain into sorted vector
835    pub fn into_sorted_vec(self) -> Vec<SochRow> {
836        let mut entries: Vec<_> = self.heap.into_iter().collect();
837        
838        entries.sort_by(|a, b| {
839            let base = a.key.cmp(&b.key);
840            if self.ascending { base } else { base.reverse() }
841        });
842        
843        entries.into_iter().map(|e| e.row).collect()
844    }
845
846    /// Current count
847    pub fn len(&self) -> usize {
848        self.heap.len()
849    }
850
851    /// Is empty
852    pub fn is_empty(&self) -> bool {
853        self.heap.is_empty()
854    }
855}
856
857// ============================================================================
858// Tests
859// ============================================================================
860
861#[cfg(test)]
862mod tests {
863    use super::*;
864
865    fn make_row(values: Vec<SochValue>) -> SochRow {
866        SochRow::new(values)
867    }
868
869    #[test]
870    fn test_strategy_selection() {
871        // With index: always pushdown
872        assert_eq!(
873            ExecutionStrategy::choose(true, Some(1_000_000), 10),
874            ExecutionStrategy::IndexPushdown
875        );
876
877        // Small K without index: streaming
878        assert_eq!(
879            ExecutionStrategy::choose(false, Some(1_000_000), 10),
880            ExecutionStrategy::StreamingTopK
881        );
882
883        // Large K relative to N (K > sqrt(N) and K > 100): full sort
884        // For N=1000, sqrt(N) ≈ 31.6, so K=500 > sqrt(1000) and K > 100
885        assert_eq!(
886            ExecutionStrategy::choose(false, Some(1000), 500),
887            ExecutionStrategy::FullSort
888        );
889        
890        // K <= 100: always streaming even if K > sqrt(N)
891        assert_eq!(
892            ExecutionStrategy::choose(false, Some(100), 90),
893            ExecutionStrategy::StreamingTopK
894        );
895    }
896
897    #[test]
898    fn test_order_by_spec_comparator() {
899        let spec = OrderBySpec::single(
900            ColumnRef::Name("priority".to_string()),
901            SortDirection::Ascending,
902        );
903        
904        let columns = vec!["id".to_string(), "priority".to_string(), "name".to_string()];
905        let cmp = spec.comparator(&columns);
906        
907        let row1 = make_row(vec![
908            SochValue::Int(1),
909            SochValue::Int(5),
910            SochValue::Text("A".to_string()),
911        ]);
912        let row2 = make_row(vec![
913            SochValue::Int(2),
914            SochValue::Int(3),
915            SochValue::Text("B".to_string()),
916        ]);
917        
918        // row2 has lower priority, should come first in ASC
919        assert_eq!(cmp(&row2, &row1), Ordering::Less);
920    }
921
922    #[test]
923    fn test_topk_heap_ascending() {
924        let cmp = |a: &i32, b: &i32| a.cmp(b);
925        let mut heap = TopKHeap::new(3, cmp, true);
926        
927        for i in [5, 2, 8, 1, 9, 3, 7, 4, 6] {
928            heap.push(i);
929        }
930        
931        let result = heap.into_sorted_vec();
932        assert_eq!(result, vec![1, 2, 3]);
933    }
934
935    #[test]
936    fn test_topk_heap_descending() {
937        let cmp = |a: &i32, b: &i32| a.cmp(b);
938        let mut heap = TopKHeap::new(3, cmp, false);
939        
940        for i in [5, 2, 8, 1, 9, 3, 7, 4, 6] {
941            heap.push(i);
942        }
943        
944        let result = heap.into_sorted_vec();
945        // Descending order
946        assert_eq!(result, vec![9, 8, 7]);
947    }
948
#[test]
fn test_executor_streaming() {
    // ORDER BY priority ASC LIMIT 3 over five unsorted rows must return the
    // three smallest priorities in order and report accurate row counts.
    let columns = vec!["priority".to_string(), "name".to_string()];
    let order_by = OrderBySpec::single(
        ColumnRef::Name("priority".to_string()),
        SortDirection::Ascending,
    );

    // `columns` is not needed after this point, so it is moved into the
    // executor instead of being cloned.
    let executor = OrderByLimitExecutor::new(
        order_by,
        3,      // limit
        0,      // offset
        columns,
        false,  // no index
        Some(10), // NOTE(review): presumably an estimated row count; confirm against `new`
    );

    // Create rows with priorities: 5, 3, 1, 4, 2
    let rows = vec![
        make_row(vec![SochValue::Int(5), SochValue::Text("E".to_string())]),
        make_row(vec![SochValue::Int(3), SochValue::Text("C".to_string())]),
        make_row(vec![SochValue::Int(1), SochValue::Text("A".to_string())]),
        make_row(vec![SochValue::Int(4), SochValue::Text("D".to_string())]),
        make_row(vec![SochValue::Int(2), SochValue::Text("B".to_string())]),
    ];

    let (result, stats) = executor.execute(rows.into_iter());

    // Should get priorities 1, 2, 3 (the 3 smallest)
    assert_eq!(result.len(), 3);
    assert_eq!(result[0].values[0], SochValue::Int(1));
    assert_eq!(result[1].values[0], SochValue::Int(2));
    assert_eq!(result[2].values[0], SochValue::Int(3));

    // All five input rows were consumed; only the limited three were emitted.
    assert_eq!(stats.input_rows, 5);
    assert_eq!(stats.output_rows, 3);
}
986
#[test]
fn test_executor_with_offset() {
    // ORDER BY priority ASC LIMIT 2 OFFSET 2 over a single-column table.
    let executor = OrderByLimitExecutor::new(
        OrderBySpec::single(
            ColumnRef::Name("priority".to_string()),
            SortDirection::Ascending,
        ),
        2,      // limit
        2,      // offset
        vec!["priority".to_string()],
        false,
        Some(10),
    );

    // Scan order 5, 3, 1, 4, 2 sorts to 1, 2, 3, 4, 5.
    // Skipping two rows and taking two leaves 3 and 4.
    let rows: Vec<_> = [5, 3, 1, 4, 2]
        .iter()
        .map(|&priority| make_row(vec![SochValue::Int(priority)]))
        .collect();

    let (result, _) = executor.execute(rows.into_iter());

    assert_eq!(result.len(), 2);
    for (idx, &expected) in [3, 4].iter().enumerate() {
        assert_eq!(result[idx].values[0], SochValue::Int(expected));
    }
}
1020
#[test]
fn test_single_column_topk() {
    // Keep 3 rows, keyed on column 0, ascending.
    let mut topk = SingleColumnTopK::new(3, 0, true);

    // Feed five single-column rows in unsorted order.
    for &priority in [5, 3, 1, 4, 2].iter() {
        topk.push(make_row(vec![SochValue::Int(priority)]));
    }

    let result = topk.into_sorted_vec();

    // The three smallest keys come back in ascending order.
    assert_eq!(result.len(), 3);
    for (idx, &expected) in [1, 2, 3].iter().enumerate() {
        assert_eq!(result[idx].values[0], SochValue::Int(expected));
    }
}
1038
#[test]
fn test_correctness_vs_buggy_implementation() {
    // Demonstrates the limit-then-sort bug of the old implementation and
    // checks that the executor returns the true top-K instead.
    let columns = vec!["priority".to_string()];
    let order_by = OrderBySpec::single(
        ColumnRef::Name("priority".to_string()),
        SortDirection::Ascending,
    );

    // Priorities in scan order: [5, 2, 8, 1, 9, 3]
    let scan_order = [5, 2, 8, 1, 9, 3];
    let rows: Vec<_> = scan_order
        .iter()
        .map(|&p| make_row(vec![SochValue::Int(p)]))
        .collect();

    // BUGGY: collect the first 3 scanned priorities, then sort them.
    let mut buggy: Vec<_> = scan_order.iter().take(3).copied().collect();
    buggy.sort_unstable();
    // The truncated scan only ever saw [5, 2, 8], so it reports 2 as the
    // smallest priority even though the actual minimum is 1.
    assert_eq!(buggy, [2, 5, 8]);

    // CORRECT: streaming top-K
    let executor = OrderByLimitExecutor::new(
        order_by,
        3,
        0,
        columns,
        false,
        Some(6),
    );
    let (correct, _) = executor.execute(rows.into_iter());

    // Correct result: [1, 2, 3]
    assert_eq!(correct.len(), 3);
    assert_eq!(correct[0].values[0], SochValue::Int(1));
    assert_eq!(correct[1].values[0], SochValue::Int(2));
    assert_eq!(correct[2].values[0], SochValue::Int(3));

    // The two approaches must disagree on the first (smallest) row.
    assert_ne!(SochValue::Int(buggy[0]), correct[0].values[0]);
}
1075
#[test]
fn test_multi_column_order_by() {
    // ORDER BY priority ASC, created_at DESC LIMIT 3.
    let primary = OrderBySpec::single(
        ColumnRef::Name("priority".to_string()),
        SortDirection::Ascending,
    );
    let order_by = primary.then_by(
        ColumnRef::Name("created_at".to_string()),
        SortDirection::Descending,
    );

    let executor = OrderByLimitExecutor::new(
        order_by,
        3,
        0,
        vec!["priority".to_string(), "created_at".to_string()],
        false,
        Some(5),
    );

    // Each tuple is (priority, created_at); three rows share priority 1.
    let rows: Vec<_> = [(1, 100), (1, 200), (2, 150), (1, 150), (3, 100)]
        .iter()
        .map(|&(priority, created_at)| {
            make_row(vec![SochValue::Int(priority), SochValue::Int(created_at)])
        })
        .collect();

    let (result, _) = executor.execute(rows.into_iter());

    // All three results have priority 1, ordered by created_at descending:
    // (1, 200), (1, 150), (1, 100).
    assert_eq!(result.len(), 3);
    for (idx, &created_at) in [200, 150, 100].iter().enumerate() {
        assert_eq!(result[idx].values[0], SochValue::Int(1));
        assert_eq!(result[idx].values[1], SochValue::Int(created_at));
    }
}
1118}