sochdb_query/
topk_executor.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! ORDER BY + LIMIT Optimization with Streaming Top-K
16//!
17//! This module fixes a critical semantic bug in the query execution path and
18//! provides efficient top-K query support for queue-like access patterns.
19//!
20//! ## The Bug: Incorrect ORDER BY + LIMIT Semantics
21//!
22//! The previous implementation applied LIMIT during scan collection, then sorted:
23//!
24//! ```text
25//! ❌ WRONG:
26//!    for row in scan:
27//!        output.push(row)
28//!        if output.len() >= limit:
29//!            break
30//!    output.sort(order_by)  # BUG: Sorting wrong subset!
31//! ```
32//!
33//! This is NOT semantically equivalent to `ORDER BY ... LIMIT K` unless the
34//! scan order matches the requested order (generally false).
35//!
36//! Example of the bug:
37//! ```text
38//! Table: [priority=5, priority=1, priority=3, priority=2, priority=4]
39//! Query: ORDER BY priority ASC LIMIT 1
40//!
41//! Wrong (limit-then-sort): Returns priority=5 (first row)
42//! Correct: Returns priority=1 (minimum)
43//! ```
44//!
45//! ## The Fix: Three Strategies
46//!
47//! | Strategy       | Time       | Space | When to Use                     |
48//! |----------------|------------|-------|----------------------------------|
49//! | IndexPushdown  | O(log N+K) | O(K)  | Ordered index on ORDER BY col   |
50//! | StreamingTopK  | O(N log K) | O(K)  | No index, K << N                |
51//! | FullSort       | O(N log N) | O(N)  | No index, K ≈ N                 |
52//!
53//! ## Streaming Top-K Algorithm
54//!
55//! For ORDER BY col ASC LIMIT K:
56//! 1. Maintain a max-heap of size K
//! 2. For each row: while the heap holds fewer than K rows, insert it; once
//!    the heap is full, if row.col < heap.max, evict max and insert row
58//! 3. At end, drain heap in sorted order
59//!
60//! This gives O(N log K) time and O(K) space, vs O(N log N) and O(N) for full sort.
61//!
62//! ## Queue Optimization
63//!
64//! For "get highest priority task" (ORDER BY priority ASC LIMIT 1):
65//! - With 10K tasks: ~10K comparisons with O(1) memory
66//! - With ordered index: ~14 comparisons (log₂ 10000)
67
68use std::cmp::Ordering;
69use std::collections::BinaryHeap;
70
71use sochdb_core::{SochRow, SochValue};
72
73// ============================================================================
74// OrderBySpec - Sort Specification
75// ============================================================================
76
/// Sort direction for one ORDER BY column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortDirection {
    /// Keep the natural comparison result (smaller values first).
    Ascending,
    /// Reverse the natural comparison result (larger values first).
    Descending,
}
83
/// A single ORDER BY column specification
///
/// One entry of [`OrderBySpec::columns`]: which column to compare, in which
/// direction, and where NULLs are requested to go.
#[derive(Debug, Clone)]
pub struct OrderByColumn {
    /// Column index or name
    pub column: ColumnRef,
    /// Sort direction
    pub direction: SortDirection,
    /// Null handling (NULLS FIRST or NULLS LAST): `true` requests NULLS FIRST,
    /// `false` requests NULLS LAST. How this combines with `direction` is
    /// decided by `OrderBySpec::comparator`.
    pub nulls_first: bool,
}
94
/// Identifies a column either by position or by name.
#[derive(Debug, Clone)]
pub enum ColumnRef {
    /// Positional reference; used as-is, without consulting any name list
    Index(usize),
    /// Reference by column name; must be looked up in a list of names
    Name(String),
}

impl ColumnRef {
    /// Turn this reference into a positional index.
    ///
    /// - `Index(i)` always yields `Some(i)`; the index is not checked against
    ///   `columns`.
    /// - `Name(n)` yields the position of the first entry of `columns` equal
    ///   to `n`, or `None` when no entry matches.
    pub fn resolve(&self, columns: &[String]) -> Option<usize> {
        let wanted = match self {
            ColumnRef::Index(position) => return Some(*position),
            ColumnRef::Name(wanted) => wanted,
        };

        columns
            .iter()
            .enumerate()
            .find(|(_, candidate)| *candidate == wanted)
            .map(|(position, _)| position)
    }
}
113
114/// Full ORDER BY specification
115#[derive(Debug, Clone)]
116pub struct OrderBySpec {
117    /// Columns to sort by (in order of priority)
118    pub columns: Vec<OrderByColumn>,
119}
120
121impl OrderBySpec {
122    /// Create from a single column
123    pub fn single(column: ColumnRef, direction: SortDirection) -> Self {
124        Self {
125            columns: vec![OrderByColumn {
126                column,
127                direction,
128                nulls_first: false,
129            }],
130        }
131    }
132
133    /// Add another column to the sort
134    pub fn then_by(mut self, column: ColumnRef, direction: SortDirection) -> Self {
135        self.columns.push(OrderByColumn {
136            column,
137            direction,
138            nulls_first: false,
139        });
140        self
141    }
142
143    /// Create a comparator function for rows
144    pub fn comparator(&self, column_names: &[String]) -> impl Fn(&SochRow, &SochRow) -> Ordering {
145        let resolved: Vec<_> = self.columns
146            .iter()
147            .filter_map(|col| {
148                col.column.resolve(column_names).map(|idx| (idx, col.direction, col.nulls_first))
149            })
150            .collect();
151        
152        move |a: &SochRow, b: &SochRow| {
153            for &(idx, direction, nulls_first) in &resolved {
154                let val_a = a.values.get(idx);
155                let val_b = b.values.get(idx);
156                
157                let ordering = compare_values(val_a, val_b, nulls_first);
158                
159                if ordering != Ordering::Equal {
160                    return match direction {
161                        SortDirection::Ascending => ordering,
162                        SortDirection::Descending => ordering.reverse(),
163                    };
164                }
165            }
166            Ordering::Equal
167        }
168    }
169
170    /// Check if an index matches this ORDER BY spec
171    pub fn matches_index(&self, index_columns: &[(String, SortDirection)]) -> bool {
172        if self.columns.len() > index_columns.len() {
173            return false;
174        }
175
176        self.columns.iter().zip(index_columns.iter()).all(|(col, (idx_col, idx_dir))| {
177            match &col.column {
178                ColumnRef::Name(name) => name == idx_col && col.direction == *idx_dir,
179                ColumnRef::Index(_) => false, // Can't match by index
180            }
181        })
182    }
183}
184
185/// Compare two optional SochValues with null handling
186fn compare_values(a: Option<&SochValue>, b: Option<&SochValue>, nulls_first: bool) -> Ordering {
187    match (a, b) {
188        (None, None) => Ordering::Equal,
189        (None, Some(_)) => if nulls_first { Ordering::Less } else { Ordering::Greater },
190        (Some(_), None) => if nulls_first { Ordering::Greater } else { Ordering::Less },
191        (Some(SochValue::Null), Some(SochValue::Null)) => Ordering::Equal,
192        (Some(SochValue::Null), Some(_)) => if nulls_first { Ordering::Less } else { Ordering::Greater },
193        (Some(_), Some(SochValue::Null)) => if nulls_first { Ordering::Greater } else { Ordering::Less },
194        (Some(a), Some(b)) => compare_soch_values(a, b),
195    }
196}
197
198/// Compare two SochValues
199fn compare_soch_values(a: &SochValue, b: &SochValue) -> Ordering {
200    match (a, b) {
201        (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
202        (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
203        (SochValue::Float(a), SochValue::Float(b)) => a.partial_cmp(b).unwrap_or(Ordering::Equal),
204        (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
205        (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
206        _ => Ordering::Equal, // Incompatible types compare as equal
207    }
208}
209
210// ============================================================================
211// TopKHeap - Generic Streaming Top-K
212// ============================================================================
213
/// A bounded heap for streaming top-K selection
///
/// This maintains the K smallest (or largest) elements seen so far,
/// without storing all N elements.
///
/// The heap is an array-backed binary heap ordered directly by the supplied
/// comparator. Elements hold no pointer back to the comparator, so the type
/// contains no `unsafe` code and stays valid when it is moved (returned from
/// a function, boxed, stored in a collection) between pushes.
///
/// ## Complexity
/// - Push: O(log K)
/// - Drain: O(K log K) to produce sorted output
/// - Space: O(K), growing with the number of retained elements
pub struct TopKHeap<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Heap storage; index 0 is the boundary element, i.e. the retained
    /// element that would be evicted first (largest of the K smallest, or
    /// smallest of the K largest)
    heap: Vec<T>,
    /// Maximum size
    k: usize,
    /// Comparator (defines desired output order)
    comparator: F,
    /// Whether we want smallest K (true) or largest K (false)
    want_smallest: bool,
}

impl<T, F> TopKHeap<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Upper bound on the eager allocation. `k` may be "effectively
    /// unlimited" (e.g. `usize::MAX`), so storage beyond this grows on demand.
    const MAX_PREALLOC: usize = 1024;

    /// Create a new top-K heap
    ///
    /// - `k`: Number of elements to keep
    /// - `comparator`: Defines the desired output order (Less = should come first)
    /// - `want_smallest`: If true, keep the K elements that compare as smallest
    pub fn new(k: usize, comparator: F, want_smallest: bool) -> Self {
        Self {
            heap: Vec::with_capacity(k.min(Self::MAX_PREALLOC)),
            k,
            comparator,
            want_smallest,
        }
    }

    /// Heap ordering: `Greater` means "closer to the root", i.e. evicted sooner.
    ///
    /// For smallest-K this is the comparator itself (max-heap); for largest-K
    /// it is the reversed comparator (min-heap).
    fn heap_order(&self, a: &T, b: &T) -> Ordering {
        let natural = (self.comparator)(a, b);
        if self.want_smallest {
            natural
        } else {
            natural.reverse()
        }
    }

    /// Move the element at `pos` towards the root until its parent is not smaller.
    fn sift_up(&mut self, mut pos: usize) {
        while pos > 0 {
            let parent = (pos - 1) / 2;
            if self.heap_order(&self.heap[pos], &self.heap[parent]) != Ordering::Greater {
                break;
            }
            self.heap.swap(pos, parent);
            pos = parent;
        }
    }

    /// Move the element at `pos` towards the leaves until no child outranks it.
    fn sift_down(&mut self, mut pos: usize) {
        let len = self.heap.len();
        loop {
            let left = 2 * pos + 1;
            if left >= len {
                break;
            }
            let right = left + 1;
            let mut top_child = left;
            if right < len
                && self.heap_order(&self.heap[right], &self.heap[left]) == Ordering::Greater
            {
                top_child = right;
            }
            if self.heap_order(&self.heap[top_child], &self.heap[pos]) != Ordering::Greater {
                break;
            }
            self.heap.swap(pos, top_child);
            pos = top_child;
        }
    }

    /// Push an element into the heap
    ///
    /// While fewer than `k` elements are held the value is always kept. Once
    /// full, the value replaces the current boundary only if it is strictly
    /// better than it; ties keep the element that arrived first.
    ///
    /// Complexity: O(log K)
    pub fn push(&mut self, value: T) {
        if self.k == 0 {
            return;
        }

        if self.heap.len() < self.k {
            self.heap.push(value);
            let last = self.heap.len() - 1;
            self.sift_up(last);
            return;
        }

        // Full: index 0 is the boundary. Overwrite it in place and restore
        // the heap property with a single sift instead of pop + push.
        if self.heap_order(&value, &self.heap[0]) == Ordering::Less {
            self.heap[0] = value;
            self.sift_down(0);
        }
    }

    /// Get the current boundary value
    ///
    /// This is the value that new elements must beat to be included.
    pub fn threshold(&self) -> Option<&T> {
        self.heap.first()
    }

    /// Check if the heap is at capacity
    pub fn is_full(&self) -> bool {
        self.heap.len() >= self.k
    }

    /// Drain the heap into a sorted vector
    ///
    /// Smallest-K results are sorted ascending by the comparator; largest-K
    /// results are sorted descending.
    ///
    /// Complexity: O(K log K)
    pub fn into_sorted_vec(self) -> Vec<T> {
        let TopKHeap {
            heap: mut values,
            comparator,
            want_smallest,
            ..
        } = self;

        if want_smallest {
            values.sort_by(|a, b| comparator(a, b));
        } else {
            // For largest K, sort in descending order
            values.sort_by(|a, b| comparator(b, a));
        }
        values
    }

    /// Current number of elements
    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// Check if empty
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }
}
377
378// ============================================================================
379// OrderByLimitExecutor - The Fixed Implementation
380// ============================================================================
381
/// Strategy for ORDER BY + LIMIT execution
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionStrategy {
    /// Use index pushdown (storage provides ordered results)
    IndexPushdown,
    /// Streaming top-K with heap
    StreamingTopK,
    /// Full sort then limit
    FullSort,
}

impl ExecutionStrategy {
    /// Choose the optimal strategy
    ///
    /// - A matching index always selects `IndexPushdown`.
    /// - An unknown or zero row estimate selects `StreamingTopK` (safe default).
    /// - Otherwise `StreamingTopK` is used when `limit <= 100` or
    ///   `limit < sqrt(estimated_rows)`, and `FullSort` in the remaining case.
    pub fn choose(
        has_matching_index: bool,
        estimated_rows: Option<usize>,
        limit: usize,
    ) -> Self {
        // Below this K, streaming is almost always better regardless of N.
        const SMALL_K: usize = 100;

        // If we have a matching index, always use pushdown
        if has_matching_index {
            return ExecutionStrategy::IndexPushdown;
        }

        // If we don't know the row count, use streaming (safe default)
        let n = match estimated_rows {
            Some(n) if n > 0 => n,
            _ => return ExecutionStrategy::StreamingTopK,
        };

        // Heuristic: streaming is better when K is "small" or K < sqrt(N).
        // The exact break-even is closer to K * log(K) < N; this is a simpler proxy.
        let k = limit;
        if k <= SMALL_K || (k as f64) < (n as f64).sqrt() {
            ExecutionStrategy::StreamingTopK
        } else {
            // Large K relative to N: full sort may be better
            // (avoids heap overhead when keeping most of the data)
            ExecutionStrategy::FullSort
        }
    }

    /// Get estimated complexity description
    ///
    /// Returns a human-readable cost formula for `n` input rows and limit `k`.
    /// Logarithms are base 2 and truncated to an integer; for the heap and
    /// sort strategies the logarithm is floored at 1. The numeric estimate
    /// saturates at `usize::MAX` instead of overflowing for very large inputs
    /// (e.g. an "unlimited" `k`).
    pub fn complexity(&self, n: usize, k: usize) -> String {
        match self {
            ExecutionStrategy::IndexPushdown => {
                let log_n = (n as f64).log2() as usize;
                format!("O(log {} + {}) = O({})", n, k, log_n.saturating_add(k))
            }
            ExecutionStrategy::StreamingTopK => {
                let log_k = (k as f64).log2().max(1.0) as usize;
                format!("O({} * log {}) ≈ O({})", n, k, n.saturating_mul(log_k))
            }
            ExecutionStrategy::FullSort => {
                let log_n = (n as f64).log2().max(1.0) as usize;
                format!("O({} * log {}) ≈ O({})", n, n, n.saturating_mul(log_n))
            }
        }
    }
}
445
/// Result statistics from ORDER BY + LIMIT execution
///
/// Every field starts at its `Default` value (`None` / `0`) and is filled in
/// by the executor while it runs.
#[derive(Debug, Clone, Default)]
pub struct OrderByLimitStats {
    /// Strategy used (`None` until an executor records it)
    pub strategy: Option<ExecutionStrategy>,
    /// Input rows processed
    pub input_rows: usize,
    /// Output rows produced
    pub output_rows: usize,
    /// Heap operations performed (the streaming path counts one per row
    /// offered to the heap)
    pub heap_operations: usize,
    /// Comparisons performed
    // NOTE(review): nothing in the visible part of this file updates this
    // counter — confirm whether it is populated elsewhere.
    pub comparisons: usize,
    /// Rows skipped by offset
    pub offset_skipped: usize,
}
462
463/// Executor for ORDER BY + LIMIT queries
464///
465/// This is the CORRECT implementation that ensures semantic equivalence
466/// with `ORDER BY ... LIMIT K OFFSET M`.
467pub struct OrderByLimitExecutor {
468    /// ORDER BY specification
469    order_by: OrderBySpec,
470    /// LIMIT value
471    limit: usize,
472    /// OFFSET value
473    offset: usize,
474    /// Column names for resolution
475    column_names: Vec<String>,
476    /// Execution strategy
477    strategy: ExecutionStrategy,
478}
479
480impl OrderByLimitExecutor {
481    /// Create a new executor
482    pub fn new(
483        order_by: OrderBySpec,
484        limit: usize,
485        offset: usize,
486        column_names: Vec<String>,
487        has_matching_index: bool,
488        estimated_rows: Option<usize>,
489    ) -> Self {
490        // For OFFSET, we need to fetch limit + offset rows
491        let effective_limit = limit.saturating_add(offset);
492        let strategy = ExecutionStrategy::choose(has_matching_index, estimated_rows, effective_limit);
493        
494        Self {
495            order_by,
496            limit,
497            offset,
498            column_names,
499            strategy,
500        }
501    }
502
503    /// Get the chosen strategy
504    pub fn strategy(&self) -> ExecutionStrategy {
505        self.strategy
506    }
507
508    /// Execute on an iterator of rows
509    ///
510    /// This is the main entry point. It:
511    /// 1. Applies the chosen strategy to get top (limit + offset) rows
512    /// 2. Applies offset
513    /// 3. Returns the final result
514    pub fn execute<I>(&self, rows: I) -> (Vec<SochRow>, OrderByLimitStats)
515    where
516        I: Iterator<Item = SochRow>,
517    {
518        let mut stats = OrderByLimitStats {
519            strategy: Some(self.strategy),
520            ..Default::default()
521        };
522
523        let effective_limit = self.limit.saturating_add(self.offset);
524        
525        let result = match self.strategy {
526            ExecutionStrategy::IndexPushdown => {
527                // With index, just take the first limit+offset rows
528                // (caller must provide rows in correct order!)
529                let collected: Vec<_> = rows.take(effective_limit).collect();
530                stats.input_rows = collected.len();
531                collected
532            }
533            ExecutionStrategy::StreamingTopK => {
534                self.execute_streaming(rows, effective_limit, &mut stats)
535            }
536            ExecutionStrategy::FullSort => {
537                self.execute_full_sort(rows, effective_limit, &mut stats)
538            }
539        };
540
541        // Apply offset
542        let final_result: Vec<_> = result
543            .into_iter()
544            .skip(self.offset)
545            .take(self.limit)
546            .collect();
547        
548        stats.offset_skipped = self.offset.min(stats.input_rows);
549        stats.output_rows = final_result.len();
550        
551        (final_result, stats)
552    }
553
554    /// Execute using streaming top-K algorithm
555    fn execute_streaming<I>(
556        &self,
557        rows: I,
558        k: usize,
559        stats: &mut OrderByLimitStats,
560    ) -> Vec<SochRow>
561    where
562        I: Iterator<Item = SochRow>,
563    {
564        let comparator = self.order_by.comparator(&self.column_names);
565        
566        // We want the K smallest according to the comparator
567        let mut heap = TopKHeap::new(k, comparator, true);
568        
569        for row in rows {
570            stats.input_rows += 1;
571            stats.heap_operations += 1;
572            heap.push(row);
573        }
574
575        heap.into_sorted_vec()
576    }
577
578    /// Execute using full sort
579    fn execute_full_sort<I>(
580        &self,
581        rows: I,
582        k: usize,
583        stats: &mut OrderByLimitStats,
584    ) -> Vec<SochRow>
585    where
586        I: Iterator<Item = SochRow>,
587    {
588        let comparator = self.order_by.comparator(&self.column_names);
589        
590        let mut all_rows: Vec<_> = rows.collect();
591        stats.input_rows = all_rows.len();
592        
593        all_rows.sort_by(&comparator);
594        
595        all_rows.truncate(k);
596        all_rows
597    }
598}
599
600// ============================================================================
601// IndexAwareTopK - For Index Pushdown with Partial Match
602// ============================================================================
603
/// Top-K with index awareness
///
/// When the index only partially matches the ORDER BY (e.g., index on col1
/// but ORDER BY col1, col2), we can still use the index for the first column
/// and apply top-K for the rest.
///
/// Items arrive in index order. Consecutive items sharing the same index key
/// form a batch; each finished batch is sorted by the secondary comparator and
/// appended to the result until `k` items have been collected.
pub struct IndexAwareTopK<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Current batch of items with same index key
    current_batch: Vec<T>,
    /// Best items seen so far
    result: Vec<T>,
    /// Maximum items to keep
    k: usize,
    /// Secondary comparator (for columns not in index)
    secondary_cmp: F,
}

impl<T, F> IndexAwareTopK<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    /// Upper bound on the eager allocation. `k` may be "effectively
    /// unlimited", so storage beyond this grows on demand.
    const MAX_PREALLOC: usize = 1024;

    /// Create new index-aware top-K
    ///
    /// - `k`: maximum number of items in the final result
    /// - `secondary_cmp`: order applied among items with the same index key
    pub fn new(k: usize, secondary_cmp: F) -> Self {
        Self {
            current_batch: Vec::new(),
            result: Vec::with_capacity(k.min(Self::MAX_PREALLOC)),
            k,
            secondary_cmp,
        }
    }

    /// Process an item from an index-ordered scan
    ///
    /// Items must be provided in index order. When the index key changes
    /// (`same_index_key_as_previous == false`), the previous batch is
    /// finalized before the item is recorded.
    ///
    /// Once `k` results have been collected, later items cannot enter the
    /// result and are dropped instead of being buffered.
    pub fn push(&mut self, item: T, same_index_key_as_previous: bool) {
        if !same_index_key_as_previous {
            self.finalize_batch();
        }

        // The result only grows when a batch is finalized, so a full result
        // means the open batch is empty and nothing later can be selected.
        if self.is_complete() {
            return;
        }

        self.current_batch.push(item);
    }

    /// Finalize the current batch (sort by secondary key)
    ///
    /// Moves the best items of the batch into the result, up to the remaining
    /// capacity, and discards the rest of the batch.
    fn finalize_batch(&mut self) {
        if self.current_batch.is_empty() {
            return;
        }

        let remaining = self.k.saturating_sub(self.result.len());
        if remaining > 0 {
            // Sort batch by secondary key, then take as many as we still need
            self.current_batch.sort_by(&self.secondary_cmp);
            let to_take = remaining.min(self.current_batch.len());
            self.result.extend(self.current_batch.drain(..to_take));
        }

        self.current_batch.clear();
    }

    /// Check if we have enough results
    ///
    /// This only reflects finalized batches; the batch currently being
    /// collected is not counted until the index key changes.
    pub fn is_complete(&self) -> bool {
        self.result.len() >= self.k
    }

    /// Drain into final result
    ///
    /// Finalizes the batch still being collected and returns at most `k` items.
    pub fn into_result(mut self) -> Vec<T> {
        self.finalize_batch();
        self.result
    }
}
677
678// ============================================================================
679// SingleColumnTopK - Optimized for Single Column
680// ============================================================================
681
682/// Optimized top-K for single-column ORDER BY
683///
684/// Avoids the overhead of multi-column comparison when only one column is involved.
685pub struct SingleColumnTopK {
686    /// The heap
687    heap: BinaryHeap<SingleColEntry>,
688    /// K value
689    k: usize,
690    /// Column index
691    col_idx: usize,
692    /// Whether ascending order
693    ascending: bool,
694}
695
696struct SingleColEntry {
697    row: SochRow,
698    key: OrderableValue,
699    ascending: bool,
700}
701
702/// Wrapper to make SochValue orderable
703#[derive(Clone)]
704enum OrderableValue {
705    Int(i64),
706    UInt(u64),
707    Float(f64),
708    Text(String),
709    Bool(bool),
710    Null,
711}
712
713impl From<&SochValue> for OrderableValue {
714    fn from(v: &SochValue) -> Self {
715        match v {
716            SochValue::Int(i) => OrderableValue::Int(*i),
717            SochValue::UInt(u) => OrderableValue::UInt(*u),
718            SochValue::Float(f) => OrderableValue::Float(*f),
719            SochValue::Text(s) => OrderableValue::Text(s.clone()),
720            SochValue::Bool(b) => OrderableValue::Bool(*b),
721            SochValue::Null => OrderableValue::Null,
722            _ => OrderableValue::Null,
723        }
724    }
725}
726
727impl PartialEq for OrderableValue {
728    fn eq(&self, other: &Self) -> bool {
729        self.cmp(other) == Ordering::Equal
730    }
731}
732
733impl Eq for OrderableValue {}
734
735impl PartialOrd for OrderableValue {
736    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
737        Some(self.cmp(other))
738    }
739}
740
741impl Ord for OrderableValue {
742    fn cmp(&self, other: &Self) -> Ordering {
743        match (self, other) {
744            (OrderableValue::Null, OrderableValue::Null) => Ordering::Equal,
745            (OrderableValue::Null, _) => Ordering::Greater, // NULLS LAST
746            (_, OrderableValue::Null) => Ordering::Less,
747            (OrderableValue::Int(a), OrderableValue::Int(b)) => a.cmp(b),
748            (OrderableValue::UInt(a), OrderableValue::UInt(b)) => a.cmp(b),
749            (OrderableValue::Float(a), OrderableValue::Float(b)) => {
750                a.partial_cmp(b).unwrap_or(Ordering::Equal)
751            }
752            (OrderableValue::Text(a), OrderableValue::Text(b)) => a.cmp(b),
753            (OrderableValue::Bool(a), OrderableValue::Bool(b)) => a.cmp(b),
754            _ => Ordering::Equal, // Incompatible types
755        }
756    }
757}
758
759impl PartialEq for SingleColEntry {
760    fn eq(&self, other: &Self) -> bool {
761        self.key == other.key
762    }
763}
764
765impl Eq for SingleColEntry {}
766
767impl PartialOrd for SingleColEntry {
768    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
769        Some(self.cmp(other))
770    }
771}
772
773impl Ord for SingleColEntry {
774    fn cmp(&self, other: &Self) -> Ordering {
775        let base = self.key.cmp(&other.key);
776        
777        // For ascending + max-heap, we want to evict the largest,
778        // so we don't invert. For descending, we invert.
779        if self.ascending {
780            base
781        } else {
782            base.reverse()
783        }
784    }
785}
786
787impl SingleColumnTopK {
788    /// Create new single-column top-K
789    pub fn new(k: usize, col_idx: usize, ascending: bool) -> Self {
790        Self {
791            heap: BinaryHeap::with_capacity(k + 1),
792            k,
793            col_idx,
794            ascending,
795        }
796    }
797
798    /// Push a row
799    pub fn push(&mut self, row: SochRow) {
800        if self.k == 0 {
801            return;
802        }
803
804        let key = row.values
805            .get(self.col_idx)
806            .map(OrderableValue::from)
807            .unwrap_or(OrderableValue::Null);
808
809        let entry = SingleColEntry {
810            row,
811            key,
812            ascending: self.ascending,
813        };
814
815        if self.heap.len() < self.k {
816            self.heap.push(entry);
817        } else if let Some(top) = self.heap.peek() {
818            let should_replace = if self.ascending {
819                entry.key < top.key
820            } else {
821                entry.key > top.key
822            };
823
824            if should_replace {
825                self.heap.pop();
826                self.heap.push(entry);
827            }
828        }
829    }
830
831    /// Drain into sorted vector
832    pub fn into_sorted_vec(self) -> Vec<SochRow> {
833        let mut entries: Vec<_> = self.heap.into_iter().collect();
834        
835        entries.sort_by(|a, b| {
836            let base = a.key.cmp(&b.key);
837            if self.ascending { base } else { base.reverse() }
838        });
839        
840        entries.into_iter().map(|e| e.row).collect()
841    }
842
843    /// Current count
844    pub fn len(&self) -> usize {
845        self.heap.len()
846    }
847
848    /// Is empty
849    pub fn is_empty(&self) -> bool {
850        self.heap.is_empty()
851    }
852}
853
854// ============================================================================
855// Tests
856// ============================================================================
857
858#[cfg(test)]
859mod tests {
860    use super::*;
861
    /// Test helper: build a row from a list of values via `SochRow::new`.
    fn make_row(values: Vec<SochValue>) -> SochRow {
        SochRow::new(values)
    }
865
866    #[test]
867    fn test_strategy_selection() {
868        // With index: always pushdown
869        assert_eq!(
870            ExecutionStrategy::choose(true, Some(1_000_000), 10),
871            ExecutionStrategy::IndexPushdown
872        );
873
874        // Small K without index: streaming
875        assert_eq!(
876            ExecutionStrategy::choose(false, Some(1_000_000), 10),
877            ExecutionStrategy::StreamingTopK
878        );
879
880        // Large K relative to N (K > sqrt(N) and K > 100): full sort
881        // For N=1000, sqrt(N) ≈ 31.6, so K=500 > sqrt(1000) and K > 100
882        assert_eq!(
883            ExecutionStrategy::choose(false, Some(1000), 500),
884            ExecutionStrategy::FullSort
885        );
886        
887        // K <= 100: always streaming even if K > sqrt(N)
888        assert_eq!(
889            ExecutionStrategy::choose(false, Some(100), 90),
890            ExecutionStrategy::StreamingTopK
891        );
892    }
893
894    #[test]
895    fn test_order_by_spec_comparator() {
896        let spec = OrderBySpec::single(
897            ColumnRef::Name("priority".to_string()),
898            SortDirection::Ascending,
899        );
900        
901        let columns = vec!["id".to_string(), "priority".to_string(), "name".to_string()];
902        let cmp = spec.comparator(&columns);
903        
904        let row1 = make_row(vec![
905            SochValue::Int(1),
906            SochValue::Int(5),
907            SochValue::Text("A".to_string()),
908        ]);
909        let row2 = make_row(vec![
910            SochValue::Int(2),
911            SochValue::Int(3),
912            SochValue::Text("B".to_string()),
913        ]);
914        
915        // row2 has lower priority, should come first in ASC
916        assert_eq!(cmp(&row2, &row1), Ordering::Less);
917    }
918
919    #[test]
920    fn test_topk_heap_ascending() {
921        let cmp = |a: &i32, b: &i32| a.cmp(b);
922        let mut heap = TopKHeap::new(3, cmp, true);
923        
924        for i in [5, 2, 8, 1, 9, 3, 7, 4, 6] {
925            heap.push(i);
926        }
927        
928        let result = heap.into_sorted_vec();
929        assert_eq!(result, vec![1, 2, 3]);
930    }
931
932    #[test]
933    fn test_topk_heap_descending() {
934        let cmp = |a: &i32, b: &i32| a.cmp(b);
935        let mut heap = TopKHeap::new(3, cmp, false);
936        
937        for i in [5, 2, 8, 1, 9, 3, 7, 4, 6] {
938            heap.push(i);
939        }
940        
941        let result = heap.into_sorted_vec();
942        // Descending order
943        assert_eq!(result, vec![9, 8, 7]);
944    }
945
946    #[test]
947    fn test_executor_streaming() {
948        let columns = vec!["priority".to_string(), "name".to_string()];
949        let order_by = OrderBySpec::single(
950            ColumnRef::Name("priority".to_string()),
951            SortDirection::Ascending,
952        );
953        
954        let executor = OrderByLimitExecutor::new(
955            order_by,
956            3,      // limit
957            0,      // offset
958            columns.clone(),
959            false,  // no index
960            Some(10),
961        );
962        
963        // Create rows with priorities: 5, 3, 1, 4, 2
964        let rows = vec![
965            make_row(vec![SochValue::Int(5), SochValue::Text("E".to_string())]),
966            make_row(vec![SochValue::Int(3), SochValue::Text("C".to_string())]),
967            make_row(vec![SochValue::Int(1), SochValue::Text("A".to_string())]),
968            make_row(vec![SochValue::Int(4), SochValue::Text("D".to_string())]),
969            make_row(vec![SochValue::Int(2), SochValue::Text("B".to_string())]),
970        ];
971        
972        let (result, stats) = executor.execute(rows.into_iter());
973        
974        // Should get priorities 1, 2, 3 (the 3 smallest)
975        assert_eq!(result.len(), 3);
976        assert_eq!(result[0].values[0], SochValue::Int(1));
977        assert_eq!(result[1].values[0], SochValue::Int(2));
978        assert_eq!(result[2].values[0], SochValue::Int(3));
979        
980        assert_eq!(stats.input_rows, 5);
981        assert_eq!(stats.output_rows, 3);
982    }
983
984    #[test]
985    fn test_executor_with_offset() {
986        let columns = vec!["priority".to_string()];
987        let order_by = OrderBySpec::single(
988            ColumnRef::Name("priority".to_string()),
989            SortDirection::Ascending,
990        );
991        
992        let executor = OrderByLimitExecutor::new(
993            order_by,
994            2,      // limit
995            2,      // offset
996            columns,
997            false,
998            Some(10),
999        );
1000        
1001        // Priorities: 5, 3, 1, 4, 2 → sorted: 1, 2, 3, 4, 5
1002        // With offset 2, limit 2: should get 3, 4
1003        let rows = vec![
1004            make_row(vec![SochValue::Int(5)]),
1005            make_row(vec![SochValue::Int(3)]),
1006            make_row(vec![SochValue::Int(1)]),
1007            make_row(vec![SochValue::Int(4)]),
1008            make_row(vec![SochValue::Int(2)]),
1009        ];
1010        
1011        let (result, _) = executor.execute(rows.into_iter());
1012        
1013        assert_eq!(result.len(), 2);
1014        assert_eq!(result[0].values[0], SochValue::Int(3));
1015        assert_eq!(result[1].values[0], SochValue::Int(4));
1016    }
1017
1018    #[test]
1019    fn test_single_column_topk() {
1020        let mut topk = SingleColumnTopK::new(3, 0, true); // Column 0, ascending
1021        
1022        topk.push(make_row(vec![SochValue::Int(5)]));
1023        topk.push(make_row(vec![SochValue::Int(3)]));
1024        topk.push(make_row(vec![SochValue::Int(1)]));
1025        topk.push(make_row(vec![SochValue::Int(4)]));
1026        topk.push(make_row(vec![SochValue::Int(2)]));
1027        
1028        let result = topk.into_sorted_vec();
1029        
1030        assert_eq!(result.len(), 3);
1031        assert_eq!(result[0].values[0], SochValue::Int(1));
1032        assert_eq!(result[1].values[0], SochValue::Int(2));
1033        assert_eq!(result[2].values[0], SochValue::Int(3));
1034    }
1035
1036    #[test]
1037    fn test_correctness_vs_buggy_implementation() {
1038        // This test demonstrates the bug in the old implementation
1039        let columns = vec!["priority".to_string()];
1040        let order_by = OrderBySpec::single(
1041            ColumnRef::Name("priority".to_string()),
1042            SortDirection::Ascending,
1043        );
1044        
1045        // Rows in scan order: [5, 2, 8, 1, 9, 3]
1046        let rows: Vec<_> = [5, 2, 8, 1, 9, 3]
1047            .iter()
1048            .map(|&p| make_row(vec![SochValue::Int(p)]))
1049            .collect();
1050        
1051        // BUGGY: collect first 3, then sort
1052        let buggy: Vec<_> = rows.iter().take(3).cloned().collect();
1053        // buggy contains: [5, 2, 8] → sorted: [2, 5, 8]
1054        // Bug: would return priority 2 as "smallest" but actual min is 1!
1055        
1056        // CORRECT: streaming top-K
1057        let executor = OrderByLimitExecutor::new(
1058            order_by,
1059            3,
1060            0,
1061            columns,
1062            false,
1063            Some(6),
1064        );
1065        let (correct, _) = executor.execute(rows.into_iter());
1066        
1067        // Correct result: [1, 2, 3]
1068        assert_eq!(correct[0].values[0], SochValue::Int(1));
1069        assert_eq!(correct[1].values[0], SochValue::Int(2));
1070        assert_eq!(correct[2].values[0], SochValue::Int(3));
1071    }
1072
1073    #[test]
1074    fn test_multi_column_order_by() {
1075        let columns = vec!["priority".to_string(), "created_at".to_string()];
1076        
1077        let order_by = OrderBySpec::single(
1078            ColumnRef::Name("priority".to_string()),
1079            SortDirection::Ascending,
1080        ).then_by(
1081            ColumnRef::Name("created_at".to_string()),
1082            SortDirection::Descending,
1083        );
1084        
1085        let executor = OrderByLimitExecutor::new(
1086            order_by,
1087            3,
1088            0,
1089            columns,
1090            false,
1091            Some(5),
1092        );
1093        
1094        // Rows: (priority, created_at)
1095        let rows = vec![
1096            make_row(vec![SochValue::Int(1), SochValue::Int(100)]),
1097            make_row(vec![SochValue::Int(1), SochValue::Int(200)]), // Same priority, later
1098            make_row(vec![SochValue::Int(2), SochValue::Int(150)]),
1099            make_row(vec![SochValue::Int(1), SochValue::Int(150)]),
1100            make_row(vec![SochValue::Int(3), SochValue::Int(100)]),
1101        ];
1102        
1103        let (result, _) = executor.execute(rows.into_iter());
1104        
1105        // Should be: priority=1 rows ordered by created_at DESC
1106        // So: (1, 200), (1, 150), (1, 100)
1107        assert_eq!(result.len(), 3);
1108        assert_eq!(result[0].values[0], SochValue::Int(1));
1109        assert_eq!(result[0].values[1], SochValue::Int(200));
1110        assert_eq!(result[1].values[0], SochValue::Int(1));
1111        assert_eq!(result[1].values[1], SochValue::Int(150));
1112        assert_eq!(result[2].values[0], SochValue::Int(1));
1113        assert_eq!(result[2].values[1], SochValue::Int(100));
1114    }
1115}