Skip to main content

kimberlite_query/
executor.rs

1//! Query executor: executes query plans against a projection store.
2
3#![allow(clippy::ref_option)]
4#![allow(clippy::trivially_copy_pass_by_ref)]
5#![allow(clippy::items_after_statements)]
6
7use std::cmp::Ordering;
8use std::ops::Bound;
9
// ============================================================================
// Query Execution Constants
// ============================================================================

/// Scan buffer multiplier when ORDER BY is present (needs extra buffer for sorting).
///
/// **Rationale**: Client-side sorting requires loading all candidate rows before
/// applying LIMIT. We over-fetch by 10x to handle common cases where the ORDER BY
/// columns have high cardinality, while still bounding memory usage.
///
/// Applied to the query's LIMIT with `saturating_mul` by the range-scan and
/// index-scan executors.
const SCAN_LIMIT_MULTIPLIER_WITH_SORT: usize = 10;

/// Scan buffer multiplier without ORDER BY (minimal buffering).
///
/// **Rationale**: Without sorting, we can stream results and apply LIMIT incrementally.
/// We fetch 2x the limit to handle edge cases with deleted rows or MVCC conflicts.
///
/// Applied to the query's LIMIT with `saturating_mul` by the range-scan and
/// index-scan executors.
const SCAN_LIMIT_MULTIPLIER_NO_SORT: usize = 2;

/// Default scan limit when no LIMIT clause is specified.
///
/// **Rationale**: Prevents unbounded memory allocation for large tables.
/// Set to 10K based on:
/// - Avg row size ~1KB → ~10MB memory footprint
/// - p99 query latency < 50ms for 10K row scan
/// - Sufficient for most analytical queries
///
/// Used by the range-scan and index-scan executors when the plan carries no LIMIT.
const DEFAULT_SCAN_LIMIT: usize = 10_000;

/// Maximum number of aggregates per query.
///
/// **Rationale**: Prevents `DoS` via memory exhaustion.
/// Each aggregate maintains state (sum, count, min, max) ≈ 64 bytes per group.
/// 100 aggregates × 1000 groups = ~6.4MB state, which is reasonable.
const MAX_AGGREGATES_PER_QUERY: usize = 100;

/// Maximum number of rows produced by a JOIN before aborting.
///
/// **Rationale**: A cross-join of two 1K-row tables yields 1M output rows.
/// Without a bound, an adversary can trigger unbounded memory allocation.
/// 1M rows × ~100 bytes/row ≈ 100 MB — a reasonable ceiling.
///
/// When matched join output grows past this value, `execute_join` returns
/// `QueryError::UnsupportedFeature`.
const MAX_JOIN_OUTPUT_ROWS: usize = 1_000_000;

/// Maximum number of distinct groups in a GROUP BY aggregate.
///
/// **Rationale**: Without a bound, a high-cardinality column causes
/// unbounded `HashMap` growth. 100K groups × ~200 bytes/group ≈ 20 MB.
const MAX_GROUP_COUNT: usize = 100_000;
55
56use bytes::Bytes;
57use kimberlite_store::{Key, ProjectionStore, TableId};
58use kimberlite_types::Offset;
59
60use crate::error::{QueryError, Result};
61use crate::key_encoder::successor_key;
62use crate::plan::{QueryPlan, ScanOrder, SortSpec};
63use crate::schema::{ColumnName, TableDef};
64use crate::value::Value;
65
/// Result of executing a query.
///
/// `columns` is the header and `rows` holds the data. The executors in this
/// module fill `columns` from the plan's `column_names`; under test/`sim`
/// builds `execute_internal` additionally checks that every row has exactly
/// `columns.len()` values.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column names in result order.
    pub columns: Vec<ColumnName>,
    /// Result rows.
    pub rows: Vec<Row>,
}
74
75impl QueryResult {
76    /// Creates an empty result with the given columns.
77    pub fn empty(columns: Vec<ColumnName>) -> Self {
78        Self {
79            columns,
80            rows: vec![],
81        }
82    }
83
84    /// Returns the number of rows.
85    pub fn len(&self) -> usize {
86        self.rows.len()
87    }
88
89    /// Returns true if there are no rows.
90    pub fn is_empty(&self) -> bool {
91        self.rows.is_empty()
92    }
93}
94
/// A single result row: a vector of [`Value`]s, one per result column.
pub type Row = Vec<Value>;
97
98/// Executes an index scan query.
99#[allow(clippy::too_many_arguments)]
100fn execute_index_scan<S: ProjectionStore>(
101    store: &mut S,
102    metadata: &crate::plan::TableMetadata,
103    index_id: u64,
104    start: &Bound<Key>,
105    end: &Bound<Key>,
106    filter: &Option<crate::plan::Filter>,
107    limit: &Option<usize>,
108    order: &ScanOrder,
109    order_by: &Option<crate::plan::SortSpec>,
110    columns: &[usize],
111    column_names: &[ColumnName],
112    position: Option<Offset>,
113) -> Result<QueryResult> {
114    let (start_key, end_key) = bounds_to_range(start, end);
115
116    // Calculate scan limit based on whether client-side sorting is needed
117    let scan_limit = if order_by.is_some() {
118        limit
119            .map(|l| l.saturating_mul(SCAN_LIMIT_MULTIPLIER_WITH_SORT))
120            .unwrap_or(DEFAULT_SCAN_LIMIT)
121    } else {
122        limit
123            .map(|l| l.saturating_mul(SCAN_LIMIT_MULTIPLIER_NO_SORT))
124            .unwrap_or(DEFAULT_SCAN_LIMIT)
125    };
126
127    // Postcondition: scan limit must be positive
128    debug_assert!(scan_limit > 0, "scan_limit must be positive");
129
130    // Calculate index table ID using hash to avoid overflow
131    use std::collections::hash_map::DefaultHasher;
132    use std::hash::{Hash, Hasher};
133
134    let mut hasher = DefaultHasher::new();
135    metadata.table_id.as_u64().hash(&mut hasher);
136    index_id.hash(&mut hasher);
137    let index_table_id = TableId::new(hasher.finish());
138
139    // Scan the index table to get composite keys
140    let index_pairs = match position {
141        Some(pos) => store.scan_at(index_table_id, start_key..end_key, scan_limit, pos)?,
142        None => store.scan(index_table_id, start_key..end_key, scan_limit)?,
143    };
144
145    let mut full_rows = Vec::new();
146    let index_iter: Box<dyn Iterator<Item = &(Key, Bytes)>> = match order {
147        ScanOrder::Ascending => Box::new(index_pairs.iter()),
148        ScanOrder::Descending => Box::new(index_pairs.iter().rev()),
149    };
150
151    for (index_key, _) in index_iter {
152        // Extract primary key from the composite index key
153        let pk_key = extract_pk_from_index_key(index_key, metadata);
154
155        // Fetch the actual row from the base table
156        let bytes_opt = match position {
157            Some(pos) => store.get_at(metadata.table_id, &pk_key, pos)?,
158            None => store.get(metadata.table_id, &pk_key)?,
159        };
160        if let Some(bytes) = bytes_opt {
161            let full_row = decode_row(&bytes, metadata)?;
162
163            // Apply filter
164            if let Some(f) = filter {
165                if !f.matches(&full_row) {
166                    continue;
167                }
168            }
169
170            full_rows.push(full_row);
171
172            // When client-side sorting is needed, don't apply limit during scan
173            if order_by.is_none() {
174                if let Some(lim) = limit {
175                    if full_rows.len() >= *lim {
176                        break;
177                    }
178                }
179            }
180        }
181    }
182
183    // Apply client-side sorting if needed (on full rows before projection)
184    if let Some(sort_spec) = order_by {
185        sort_rows(&mut full_rows, sort_spec);
186    }
187
188    // Apply limit after sorting
189    if let Some(lim) = limit {
190        full_rows.truncate(*lim);
191    }
192
193    // Project columns after sorting and limiting
194    let rows: Vec<Row> = full_rows
195        .iter()
196        .map(|full_row| project_row(full_row, columns))
197        .collect();
198
199    Ok(QueryResult {
200        columns: column_names.to_vec(),
201        rows,
202    })
203}
204
205/// Executes a table scan query.
206#[allow(clippy::too_many_arguments)]
207fn execute_table_scan<S: ProjectionStore>(
208    store: &mut S,
209    metadata: &crate::plan::TableMetadata,
210    filter: &Option<crate::plan::Filter>,
211    limit: &Option<usize>,
212    order: &Option<SortSpec>,
213    columns: &[usize],
214    column_names: &[ColumnName],
215    position: Option<Offset>,
216) -> Result<QueryResult> {
217    // Scan entire table
218    let scan_limit = limit.map(|l| l * 10).unwrap_or(100_000);
219    let pairs = match position {
220        Some(pos) => store.scan_at(metadata.table_id, Key::min()..Key::max(), scan_limit, pos)?,
221        None => store.scan(metadata.table_id, Key::min()..Key::max(), scan_limit)?,
222    };
223
224    let mut full_rows = Vec::new();
225
226    for (_, bytes) in &pairs {
227        let full_row = decode_row(bytes, metadata)?;
228
229        // Apply filter
230        if let Some(f) = filter {
231            if !f.matches(&full_row) {
232                continue;
233            }
234        }
235
236        full_rows.push(full_row);
237    }
238
239    // Apply sort on full rows (before projection)
240    if let Some(sort_spec) = order {
241        sort_rows(&mut full_rows, sort_spec);
242    }
243
244    // Apply limit
245    if let Some(lim) = limit {
246        full_rows.truncate(*lim);
247    }
248
249    // Project columns after sorting and limiting
250    let rows: Vec<Row> = full_rows
251        .iter()
252        .map(|full_row| project_row(full_row, columns))
253        .collect();
254
255    Ok(QueryResult {
256        columns: column_names.to_vec(),
257        rows,
258    })
259}
260
261/// Executes a range scan query.
262#[allow(clippy::too_many_arguments)]
263fn execute_range_scan<S: ProjectionStore>(
264    store: &mut S,
265    metadata: &crate::plan::TableMetadata,
266    start: &Bound<Key>,
267    end: &Bound<Key>,
268    filter: &Option<crate::plan::Filter>,
269    limit: &Option<usize>,
270    order: &ScanOrder,
271    order_by: &Option<crate::plan::SortSpec>,
272    columns: &[usize],
273    column_names: &[ColumnName],
274    position: Option<Offset>,
275) -> Result<QueryResult> {
276    let (start_key, end_key) = bounds_to_range(start, end);
277
278    // Calculate scan limit based on whether client-side sorting is needed
279    let scan_limit = if order_by.is_some() {
280        limit
281            .map(|l| l.saturating_mul(SCAN_LIMIT_MULTIPLIER_WITH_SORT))
282            .unwrap_or(DEFAULT_SCAN_LIMIT)
283    } else {
284        limit
285            .map(|l| l.saturating_mul(SCAN_LIMIT_MULTIPLIER_NO_SORT))
286            .unwrap_or(DEFAULT_SCAN_LIMIT)
287    };
288
289    // Postcondition: scan limit must be positive
290    debug_assert!(scan_limit > 0, "scan_limit must be positive");
291
292    let pairs = match position {
293        Some(pos) => store.scan_at(metadata.table_id, start_key..end_key, scan_limit, pos)?,
294        None => store.scan(metadata.table_id, start_key..end_key, scan_limit)?,
295    };
296
297    let mut full_rows = Vec::new();
298    let row_iter: Box<dyn Iterator<Item = &(Key, Bytes)>> = match order {
299        ScanOrder::Ascending => Box::new(pairs.iter()),
300        ScanOrder::Descending => Box::new(pairs.iter().rev()),
301    };
302
303    for (_, bytes) in row_iter {
304        let full_row = decode_row(bytes, metadata)?;
305
306        // Apply filter
307        if let Some(f) = filter {
308            if !f.matches(&full_row) {
309                continue;
310            }
311        }
312
313        full_rows.push(full_row);
314
315        // When client-side sorting is needed, don't apply limit during scan
316        if order_by.is_none() {
317            if let Some(lim) = limit {
318                if full_rows.len() >= *lim {
319                    break;
320                }
321            }
322        }
323    }
324
325    // Apply client-side sorting if needed (on full rows before projection)
326    if let Some(sort_spec) = order_by {
327        sort_rows(&mut full_rows, sort_spec);
328    }
329
330    // Apply limit after sorting
331    if let Some(lim) = limit {
332        full_rows.truncate(*lim);
333    }
334
335    // Project columns after sorting and limiting
336    let rows: Vec<Row> = full_rows
337        .iter()
338        .map(|full_row| project_row(full_row, columns))
339        .collect();
340
341    Ok(QueryResult {
342        columns: column_names.to_vec(),
343        rows,
344    })
345}
346
347/// Executes a point lookup query.
348fn execute_point_lookup<S: ProjectionStore>(
349    store: &mut S,
350    metadata: &crate::plan::TableMetadata,
351    key: &Key,
352    columns: &[usize],
353    column_names: &[ColumnName],
354    position: Option<Offset>,
355) -> Result<QueryResult> {
356    let result = match position {
357        Some(pos) => store.get_at(metadata.table_id, key, pos)?,
358        None => store.get(metadata.table_id, key)?,
359    };
360    match result {
361        Some(bytes) => {
362            let row = decode_and_project(&bytes, columns, metadata)?;
363            Ok(QueryResult {
364                columns: column_names.to_vec(),
365                rows: vec![row],
366            })
367        }
368        None => Ok(QueryResult::empty(column_names.to_vec())),
369    }
370}
371
/// Internal execution function that handles both current and point-in-time queries.
///
/// Wraps `execute_internal_inner` with property instrumentation:
/// - a `sometimes!` probe recording that a query ran with `position` set, and
/// - under test/`sim` builds only, `always!` checks that the result's column count
///   and every row's width equal the plan's declared column count.
///
/// `position` is `None` for a query against current state and `Some(offset)` for a
/// point-in-time query. Errors from the inner executor are propagated unchanged.
#[allow(clippy::too_many_lines, clippy::used_underscore_binding)]
fn execute_internal<S: ProjectionStore>(
    store: &mut S,
    plan: &QueryPlan,
    _table_def: &TableDef, // Kept for API compatibility, but metadata is now in plans
    position: Option<Offset>,
) -> Result<QueryResult> {
    // SOMETIMES: time-travel query at a specific log position.
    kimberlite_properties::sometimes!(
        position.is_some(),
        "query.time_travel_at_position",
        "query executes at a pinned historical log offset"
    );
    let result = execute_internal_inner(store, plan, _table_def, position)?;

    // ALWAYS: the executed result's column count must equal the plan's declared schema.
    // These checks are compiled in only for tests and the `sim` feature.
    #[cfg(any(test, feature = "sim"))]
    {
        let _expected_cols = plan.column_names().len();
        kimberlite_properties::always!(
            result.columns.len() == _expected_cols,
            "query.result_columns_match_plan",
            "query result column count must equal plan-declared schema column count"
        );
        // ALWAYS: every row must match the column count.
        kimberlite_properties::always!(
            result.rows.iter().all(|r| r.len() == _expected_cols),
            "query.row_width_matches_columns",
            "every result row must have width equal to declared column count"
        );
    }
    Ok(result)
}
406
/// Dispatches a query plan to the executor for its variant.
///
/// Each `QueryPlan` variant is destructured and its fields are forwarded, together
/// with `store` and `position`, to the matching `execute_*` function. `_table_def`
/// is not read here; the executors take table metadata from the plan itself.
#[allow(clippy::too_many_lines)]
fn execute_internal_inner<S: ProjectionStore>(
    store: &mut S,
    plan: &QueryPlan,
    _table_def: &TableDef,
    position: Option<Offset>,
) -> Result<QueryResult> {
    match plan {
        // Single-key fetch.
        QueryPlan::PointLookup {
            metadata,
            key,
            columns,
            column_names,
        } => execute_point_lookup(store, metadata, key, columns, column_names, position),

        // Bounded scan over the base table's key range.
        QueryPlan::RangeScan {
            metadata,
            start,
            end,
            filter,
            limit,
            order,
            order_by,
            columns,
            column_names,
        } => execute_range_scan(
            store,
            metadata,
            start,
            end,
            filter,
            limit,
            order,
            order_by,
            columns,
            column_names,
            position,
        ),

        // Scan over an index table followed by base-table lookups.
        // Any additional fields of this variant are ignored (`..`).
        QueryPlan::IndexScan {
            metadata,
            index_id,
            start,
            end,
            filter,
            limit,
            order,
            order_by,
            columns,
            column_names,
            ..
        } => execute_index_scan(
            store,
            metadata,
            *index_id,
            start,
            end,
            filter,
            limit,
            order,
            order_by,
            columns,
            column_names,
            position,
        ),

        // Scan over the whole base table.
        QueryPlan::TableScan {
            metadata,
            filter,
            limit,
            order,
            columns,
            column_names,
        } => execute_table_scan(
            store,
            metadata,
            filter,
            limit,
            order,
            columns,
            column_names,
            position,
        ),

        // Aggregation over a source plan; `group_by_names` is not needed for execution.
        QueryPlan::Aggregate {
            metadata,
            source,
            group_by_cols,
            group_by_names: _,
            aggregates,
            column_names,
            having,
        } => execute_aggregate(
            store,
            source,
            group_by_cols,
            aggregates,
            column_names,
            metadata,
            having,
            position,
        ),

        // Join of two child plans.
        QueryPlan::Join {
            join_type,
            left,
            right,
            on_conditions,
            columns,
            column_names,
        } => execute_join(
            store,
            join_type,
            left,
            right,
            on_conditions,
            columns,
            column_names,
            position,
        ),

        // Post-processing wrapper (filter, CASE columns, sort, limit) around a source plan.
        QueryPlan::Materialize {
            source,
            filter,
            case_columns,
            order,
            limit,
            column_names,
        } => execute_materialize(
            store,
            source,
            filter,
            case_columns,
            order,
            limit,
            column_names,
            position,
        ),
    }
}
547
548/// Executes a Materialize plan: filter, compute CASE columns, sort, and limit.
549#[allow(clippy::too_many_arguments)]
550fn execute_materialize<S: ProjectionStore>(
551    store: &mut S,
552    source: &QueryPlan,
553    filter: &Option<crate::plan::Filter>,
554    case_columns: &[crate::plan::CaseColumnDef],
555    order: &Option<SortSpec>,
556    limit: &Option<usize>,
557    column_names: &[ColumnName],
558    position: Option<Offset>,
559) -> Result<QueryResult> {
560    // Execute the source plan (e.g., the Join node)
561    let dummy_def = TableDef {
562        table_id: kimberlite_store::TableId::from(0u64),
563        columns: vec![],
564        primary_key: vec![],
565        indexes: vec![],
566    };
567    let mut source_result = execute_internal(store, source, &dummy_def, position)?;
568
569    kimberlite_properties::sometimes!(
570        filter.is_some() || order.is_some() || limit.is_some(),
571        "query.materialize_applies_filter_order_limit",
572        "Materialize wrapper applies at least one of filter, order, or limit"
573    );
574
575    // 1. Apply WHERE filter
576    if let Some(f) = filter {
577        source_result.rows.retain(|row| f.matches(row));
578    }
579
580    // 2. Evaluate CASE WHEN computed columns and append to each row
581    if !case_columns.is_empty() {
582        kimberlite_properties::sometimes!(
583            !source_result.rows.is_empty(),
584            "query.case_when_evaluated",
585            "CASE WHEN computed columns evaluated against at least one row"
586        );
587        for row in &mut source_result.rows {
588            for case_col in case_columns {
589                let val = evaluate_case_column(case_col, row);
590                row.push(val);
591            }
592        }
593    }
594
595    // 3. Apply ORDER BY (client-side sort)
596    if let Some(spec) = order {
597        sort_rows(&mut source_result.rows, spec);
598    }
599
600    // 4. Apply LIMIT
601    if let Some(n) = limit {
602        source_result.rows.truncate(*n);
603    }
604
605    // Return with the declared output column names
606    Ok(QueryResult {
607        columns: column_names.to_vec(),
608        rows: source_result.rows,
609    })
610}
611
612/// Evaluates a CASE WHEN computed column against a row, returning the result value.
613fn evaluate_case_column(case_col: &crate::plan::CaseColumnDef, row: &[Value]) -> Value {
614    for clause in &case_col.when_clauses {
615        if clause.condition.matches(row) {
616            return clause.result.clone();
617        }
618    }
619    case_col.else_value.clone()
620}
621
/// Executes a query plan against the current store state.
///
/// Delegates to `execute_internal` with no pinned log position.
///
/// # Errors
/// Returns whatever error the underlying executor reports.
pub fn execute<S: ProjectionStore>(
    store: &mut S,
    plan: &QueryPlan,
    table_def: &TableDef,
) -> Result<QueryResult> {
    execute_internal(store, plan, table_def, None)
}
630
/// Executes a query plan at a specific log position (point-in-time query).
///
/// Delegates to `execute_internal` with `Some(position)`, which makes the
/// executors use the store's `get_at`/`scan_at` reads.
///
/// # Errors
/// Returns whatever error the underlying executor reports.
pub fn execute_at<S: ProjectionStore>(
    store: &mut S,
    plan: &QueryPlan,
    table_def: &TableDef,
    position: Offset,
) -> Result<QueryResult> {
    execute_internal(store, plan, table_def, Some(position))
}
640
641/// Converts bounds to a range.
642///
643/// The store scan uses a half-open range [start, end), so we need to:
644/// - For Included start: use the key as-is
645/// - For Excluded start: use the successor key (to skip the excluded value)
646/// - For Included end: use successor key (to include the value)
647/// - For Excluded end: use the key as-is
648fn bounds_to_range(start: &Bound<Key>, end: &Bound<Key>) -> (Key, Key) {
649    let start_key = match start {
650        Bound::Included(k) => k.clone(),
651        Bound::Excluded(k) => successor_key(k),
652        Bound::Unbounded => Key::min(),
653    };
654
655    let end_key = match end {
656        Bound::Included(k) => successor_key(k),
657        Bound::Excluded(k) => k.clone(),
658        Bound::Unbounded => Key::max(),
659    };
660
661    (start_key, end_key)
662}
663
664/// Extracts the primary key from a composite index key.
665///
666/// Index keys are structured as: [`index_column_values`...][primary_key_values...]
667/// This function strips the index column values and returns only the primary key portion.
668///
669/// # Assertions
670/// - Index key must be longer than the number of index columns
671/// - Primary key columns must be non-empty
672fn extract_pk_from_index_key(index_key: &Key, metadata: &crate::plan::TableMetadata) -> Key {
673    use crate::key_encoder::{decode_key, encode_key};
674
675    // Decode the full composite key to get all values
676    let all_values = decode_key(index_key);
677
678    // Get the number of primary key columns
679    let pk_count = metadata.primary_key.len();
680
681    // Assertions
682    debug_assert!(pk_count > 0, "primary key columns must be non-empty");
683    debug_assert!(
684        all_values.len() >= pk_count,
685        "index key must contain at least the primary key values"
686    );
687
688    // Extract the last pk_count values (the primary key)
689    // Index key format: [index_col1, index_col2, ..., pk_col1, pk_col2, ...]
690    let pk_values: Vec<Value> = all_values
691        .iter()
692        .skip(all_values.len() - pk_count)
693        .cloned()
694        .collect();
695
696    debug_assert_eq!(
697        pk_values.len(),
698        pk_count,
699        "extracted primary key must have correct number of columns"
700    );
701
702    // Re-encode as a key
703    encode_key(&pk_values)
704}
705
706/// Decodes a JSON row to values using embedded table metadata.
707fn decode_row(bytes: &Bytes, metadata: &crate::plan::TableMetadata) -> Result<Row> {
708    let json: serde_json::Value = serde_json::from_slice(bytes)?;
709
710    let obj = json.as_object().ok_or_else(|| QueryError::TypeMismatch {
711        expected: "object".to_string(),
712        actual: format!("{json:?}"),
713    })?;
714
715    let mut row = Vec::with_capacity(metadata.columns.len());
716
717    for col_def in &metadata.columns {
718        let col_name = col_def.name.as_str();
719        let json_val = obj.get(col_name).unwrap_or(&serde_json::Value::Null);
720        let value = Value::from_json(json_val, col_def.data_type)?;
721        row.push(value);
722    }
723
724    Ok(row)
725}
726
727/// Decodes a JSON row and projects columns (deprecated - use decode_row + project_row).
728fn decode_and_project(
729    bytes: &Bytes,
730    columns: &[usize],
731    metadata: &crate::plan::TableMetadata,
732) -> Result<Row> {
733    let full_row = decode_row(bytes, metadata)?;
734    Ok(project_row(&full_row, columns))
735}
736
737/// Projects a row to selected columns.
738fn project_row(full_row: &[Value], columns: &[usize]) -> Row {
739    // Precondition: column indices must be valid
740    debug_assert!(
741        columns.iter().all(|&idx| idx < full_row.len()),
742        "column index out of bounds: columns={:?}, row_len={}",
743        columns,
744        full_row.len()
745    );
746
747    if columns.is_empty() {
748        // Empty columns means all columns
749        return full_row.to_vec();
750    }
751
752    let projected: Vec<Value> = columns
753        .iter()
754        .map(|&idx| {
755            full_row.get(idx).cloned().unwrap_or_else(|| {
756                // This should never happen due to precondition
757                panic!(
758                    "column index {} out of bounds (row len {})",
759                    idx,
760                    full_row.len()
761                );
762            })
763        })
764        .collect();
765
766    // Postcondition: result has correct length
767    debug_assert_eq!(
768        projected.len(),
769        columns.len(),
770        "projected row length mismatch"
771    );
772
773    projected
774}
775
776/// Sorts rows according to the sort specification.
777fn sort_rows(rows: &mut [Row], spec: &SortSpec) {
778    rows.sort_by(|a, b| {
779        for (col_idx, order) in &spec.columns {
780            let a_val = a.get(*col_idx);
781            let b_val = b.get(*col_idx);
782
783            let cmp = match (a_val, b_val) {
784                (Some(av), Some(bv)) => av.compare(bv).unwrap_or(Ordering::Equal),
785                (None, None) => Ordering::Equal,
786                (None, Some(_)) => Ordering::Less,
787                (Some(_), None) => Ordering::Greater,
788            };
789
790            if cmp != Ordering::Equal {
791                return match order {
792                    ScanOrder::Ascending => cmp,
793                    ScanOrder::Descending => cmp.reverse(),
794                };
795            }
796        }
797        Ordering::Equal
798    });
799}
800
801/// Executes a nested loop join between two tables.
802#[allow(clippy::too_many_arguments)]
803/// Evaluates all join conditions on a concatenated row.
804///
805/// All conditions must be true for the join to match (AND semantics).
806fn evaluate_join_conditions(row: &[Value], conditions: &[crate::plan::JoinCondition]) -> bool {
807    use crate::plan::JoinOp;
808
809    conditions.iter().all(|cond| {
810        // Get values at left and right column indices
811        let left_val = row.get(cond.left_col_idx);
812        let right_val = row.get(cond.right_col_idx);
813
814        // Both values must exist
815        if left_val.is_none() || right_val.is_none() {
816            return false;
817        }
818
819        let left_val = left_val.unwrap();
820        let right_val = right_val.unwrap();
821
822        // Apply comparison operator
823        match cond.op {
824            JoinOp::Eq => left_val == right_val,
825            JoinOp::Lt => left_val.compare(right_val) == Some(std::cmp::Ordering::Less),
826            JoinOp::Le => matches!(
827                left_val.compare(right_val),
828                Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
829            ),
830            JoinOp::Gt => left_val.compare(right_val) == Some(std::cmp::Ordering::Greater),
831            JoinOp::Ge => matches!(
832                left_val.compare(right_val),
833                Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
834            ),
835        }
836    })
837}
838
839fn execute_join<S: ProjectionStore>(
840    store: &mut S,
841    join_type: &crate::parser::JoinType,
842    left: &QueryPlan,
843    right: &QueryPlan,
844    on_conditions: &[crate::plan::JoinCondition],
845    _columns: &[usize],
846    column_names: &[ColumnName],
847    position: Option<Offset>,
848) -> Result<QueryResult> {
849    // Get metadata from child plans for dummy TableDef (each child has its own metadata embedded)
850    let left_metadata = left.metadata().ok_or_else(|| {
851        QueryError::UnsupportedFeature("JOIN child plan missing metadata".to_string())
852    })?;
853    let right_metadata = right.metadata().ok_or_else(|| {
854        QueryError::UnsupportedFeature("JOIN child plan missing metadata".to_string())
855    })?;
856
857    // Create dummy table defs for execute_internal (metadata in plans will be used)
858    let left_table_def = TableDef {
859        table_id: left_metadata.table_id,
860        columns: left_metadata.columns.clone(),
861        primary_key: left_metadata.primary_key.clone(),
862        indexes: vec![], // Not needed for JOIN execution
863    };
864    let right_table_def = TableDef {
865        table_id: right_metadata.table_id,
866        columns: right_metadata.columns.clone(),
867        primary_key: right_metadata.primary_key.clone(),
868        indexes: vec![], // Not needed for JOIN execution
869    };
870
871    // Execute left and right subqueries
872    let left_result = execute_internal(store, left, &left_table_def, position)?;
873    let right_result = execute_internal(store, right, &right_table_def, position)?;
874
875    let mut output_rows = Vec::new();
876
877    match join_type {
878        crate::parser::JoinType::Inner => {
879            // INNER JOIN: only rows that match the join conditions
880            for left_row in &left_result.rows {
881                for right_row in &right_result.rows {
882                    // Build concatenated row: [left_cols..., right_cols...]
883                    let combined_row: Vec<Value> =
884                        left_row.iter().chain(right_row.iter()).cloned().collect();
885
886                    // Evaluate all join conditions
887                    if evaluate_join_conditions(&combined_row, on_conditions) {
888                        output_rows.push(combined_row);
889                        kimberlite_properties::sometimes!(
890                            output_rows.len() > MAX_JOIN_OUTPUT_ROWS,
891                            "query.join_output_row_cap_hit",
892                            "INNER JOIN output hits MAX_JOIN_OUTPUT_ROWS (1M) cap"
893                        );
894                        if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
895                            return Err(QueryError::UnsupportedFeature(format!(
896                                "JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
897                            )));
898                        }
899                    }
900                }
901            }
902        }
903        crate::parser::JoinType::Left => {
904            // LEFT JOIN: include left row with NULLs if no match
905            for left_row in &left_result.rows {
906                let mut matched = false;
907                for right_row in &right_result.rows {
908                    // Build concatenated row
909                    let combined_row: Vec<Value> =
910                        left_row.iter().chain(right_row.iter()).cloned().collect();
911
912                    // Evaluate all join conditions
913                    if evaluate_join_conditions(&combined_row, on_conditions) {
914                        output_rows.push(combined_row);
915                        matched = true;
916                        kimberlite_properties::sometimes!(
917                            output_rows.len() > MAX_JOIN_OUTPUT_ROWS,
918                            "query.left_join_output_row_cap_hit",
919                            "LEFT JOIN output hits MAX_JOIN_OUTPUT_ROWS (1M) cap"
920                        );
921                        if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
922                            return Err(QueryError::UnsupportedFeature(format!(
923                                "JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
924                            )));
925                        }
926                    }
927                }
928
929                // LEFT JOIN: include left row with NULLs if no match
930                if !matched {
931                    let right_nulls = vec![Value::Null; right_result.columns.len()];
932                    let combined_row: Vec<Value> = left_row
933                        .iter()
934                        .cloned()
935                        .chain(right_nulls.into_iter())
936                        .collect();
937                    output_rows.push(combined_row);
938                }
939            }
940        }
941    }
942
943    kimberlite_properties::sometimes!(
944        output_rows.len() > 1,
945        "query.join_produces_multi_row_output",
946        "join execution produces more than one output row"
947    );
948
949    Ok(QueryResult {
950        columns: column_names.to_vec(),
951        rows: output_rows,
952    })
953}
954
955/// Executes an aggregate query with optional grouping.
956fn execute_aggregate<S: ProjectionStore>(
957    store: &mut S,
958    source: &QueryPlan,
959    group_by_cols: &[usize],
960    aggregates: &[crate::parser::AggregateFunction],
961    column_names: &[ColumnName],
962    metadata: &crate::plan::TableMetadata,
963    having: &[crate::parser::HavingCondition],
964    position: Option<Offset>,
965) -> Result<QueryResult> {
966    use std::collections::HashMap;
967
968    // Execute source plan to get all rows
969    // Pass metadata as TableDef for API compatibility (it will be ignored in child plans)
970    let dummy_table_def = TableDef {
971        table_id: metadata.table_id,
972        columns: metadata.columns.clone(),
973        primary_key: metadata.primary_key.clone(),
974        indexes: vec![],
975    };
976    let source_result = execute_internal(store, source, &dummy_table_def, position)?;
977
978    // Build aggregate state grouped by key
979    let mut groups: HashMap<Vec<Value>, AggregateState> = HashMap::new();
980
981    for row in source_result.rows {
982        // Extract group key (values from GROUP BY columns)
983        let group_key: Vec<Value> = if group_by_cols.is_empty() {
984            // No GROUP BY - all rows in one group
985            vec![]
986        } else {
987            group_by_cols
988                .iter()
989                .map(|&idx| row.get(idx).cloned().unwrap_or(Value::Null))
990                .collect()
991        };
992
993        // Guard against unbounded group accumulation (DoS prevention).
994        kimberlite_properties::sometimes!(
995            !groups.contains_key(&group_key) && groups.len() >= MAX_GROUP_COUNT,
996            "query.group_by_cardinality_cap_hit",
997            "GROUP BY hits MAX_GROUP_COUNT (100k) distinct group cap"
998        );
999        if !groups.contains_key(&group_key) && groups.len() >= MAX_GROUP_COUNT {
1000            return Err(QueryError::UnsupportedFeature(format!(
1001                "GROUP BY cardinality exceeds maximum of {MAX_GROUP_COUNT} distinct groups"
1002            )));
1003        }
1004
1005        // Update aggregates for this group
1006        let state = groups.entry(group_key).or_insert_with(AggregateState::new);
1007        state.update(&row, aggregates, metadata)?;
1008    }
1009
1010    // Convert groups to result rows
1011    let group_by_count = group_by_cols.len();
1012    let mut result_rows = Vec::new();
1013    for (group_key, state) in groups {
1014        let agg_values = state.finalize(aggregates);
1015
1016        // Apply HAVING filter: check each condition against aggregate results
1017        if !having.is_empty() && !evaluate_having(having, aggregates, &agg_values, group_by_count) {
1018            continue;
1019        }
1020
1021        let mut result_row = group_key; // Start with GROUP BY columns
1022        result_row.extend(agg_values); // Add aggregate results
1023        result_rows.push(result_row);
1024    }
1025
1026    // If no groups and no GROUP BY, return one row with global aggregates
1027    if result_rows.is_empty() && group_by_cols.is_empty() && having.is_empty() {
1028        let state = AggregateState::new();
1029        let agg_values = state.finalize(aggregates);
1030        result_rows.push(agg_values);
1031    }
1032
1033    Ok(QueryResult {
1034        columns: column_names.to_vec(),
1035        rows: result_rows,
1036    })
1037}
1038
1039/// Evaluates HAVING conditions against aggregate results for a group.
1040///
1041/// Returns true if the group passes all HAVING conditions.
1042fn evaluate_having(
1043    having: &[crate::parser::HavingCondition],
1044    aggregates: &[crate::parser::AggregateFunction],
1045    agg_values: &[Value],
1046    _group_by_count: usize,
1047) -> bool {
1048    having.iter().all(|condition| match condition {
1049        crate::parser::HavingCondition::AggregateComparison {
1050            aggregate,
1051            op,
1052            value,
1053        } => {
1054            // Find the index of this aggregate in the aggregates list
1055            let agg_idx = aggregates.iter().position(|a| a == aggregate);
1056            let Some(idx) = agg_idx else {
1057                return false;
1058            };
1059            let Some(agg_value) = agg_values.get(idx) else {
1060                return false;
1061            };
1062
1063            // Compare using the specified operator
1064            match op {
1065                crate::parser::HavingOp::Eq => agg_value == value,
1066                crate::parser::HavingOp::Lt => agg_value.compare(value) == Some(Ordering::Less),
1067                crate::parser::HavingOp::Le => matches!(
1068                    agg_value.compare(value),
1069                    Some(Ordering::Less | Ordering::Equal)
1070                ),
1071                crate::parser::HavingOp::Gt => agg_value.compare(value) == Some(Ordering::Greater),
1072                crate::parser::HavingOp::Ge => matches!(
1073                    agg_value.compare(value),
1074                    Some(Ordering::Greater | Ordering::Equal)
1075                ),
1076            }
1077        }
1078    })
1079}
1080
1081/// State for computing aggregates over a group of rows.
1082#[derive(Debug, Clone)]
1083struct AggregateState {
1084    count: i64,
1085    non_null_counts: Vec<i64>, // For COUNT(col) - tracks non-NULL values per aggregate
1086    sums: Vec<Option<Value>>,
1087    mins: Vec<Option<Value>>,
1088    maxs: Vec<Option<Value>>,
1089}
1090
1091impl AggregateState {
1092    fn new() -> Self {
1093        Self {
1094            count: 0,
1095            non_null_counts: Vec::new(),
1096            sums: Vec::new(),
1097            mins: Vec::new(),
1098            maxs: Vec::new(),
1099        }
1100    }
1101
1102    fn update(
1103        &mut self,
1104        row: &[Value],
1105        aggregates: &[crate::parser::AggregateFunction],
1106        metadata: &crate::plan::TableMetadata,
1107    ) -> Result<()> {
1108        // Precondition: row must have at least one column
1109        debug_assert!(!row.is_empty(), "row must have at least one column");
1110
1111        // Precondition: enforce maximum aggregates limit to prevent DoS
1112        // Note: aggregates can be empty for DISTINCT queries (deduplication only)
1113        assert!(
1114            aggregates.len() <= MAX_AGGREGATES_PER_QUERY,
1115            "too many aggregates ({} > {})",
1116            aggregates.len(),
1117            MAX_AGGREGATES_PER_QUERY
1118        );
1119
1120        self.count += 1;
1121
1122        // Ensure vectors are sized
1123        while self.sums.len() < aggregates.len() {
1124            self.non_null_counts.push(0);
1125            self.sums.push(None);
1126            self.mins.push(None);
1127            self.maxs.push(None);
1128        }
1129
1130        // Invariant: all vectors must be same length after sizing
1131        debug_assert_eq!(
1132            self.sums.len(),
1133            self.non_null_counts.len(),
1134            "aggregate state vectors out of sync"
1135        );
1136        debug_assert_eq!(self.sums.len(), self.mins.len());
1137        debug_assert_eq!(self.sums.len(), self.maxs.len());
1138
1139        // Helper to find column index
1140        let find_col_idx = |col: &ColumnName| -> usize {
1141            metadata
1142                .columns
1143                .iter()
1144                .position(|c| &c.name == col)
1145                .unwrap_or(0)
1146        };
1147
1148        for (i, agg) in aggregates.iter().enumerate() {
1149            match agg {
1150                crate::parser::AggregateFunction::CountStar => {
1151                    // Already counted above
1152                }
1153                crate::parser::AggregateFunction::Count(col) => {
1154                    // COUNT(col) counts non-NULL values
1155                    let col_idx = find_col_idx(col);
1156                    if let Some(val) = row.get(col_idx) {
1157                        if !val.is_null() {
1158                            self.non_null_counts[i] += 1;
1159                        }
1160                    }
1161                }
1162                crate::parser::AggregateFunction::Sum(col) => {
1163                    let col_idx = find_col_idx(col);
1164                    if let Some(val) = row.get(col_idx) {
1165                        if !val.is_null() {
1166                            self.sums[i] = Some(add_values(&self.sums[i], val)?);
1167                        }
1168                    }
1169                }
1170                crate::parser::AggregateFunction::Avg(col) => {
1171                    // AVG = SUM / COUNT - compute sum here
1172                    let col_idx = find_col_idx(col);
1173                    if let Some(val) = row.get(col_idx) {
1174                        if !val.is_null() {
1175                            self.sums[i] = Some(add_values(&self.sums[i], val)?);
1176                        }
1177                    }
1178                }
1179                crate::parser::AggregateFunction::Min(col) => {
1180                    let col_idx = find_col_idx(col);
1181                    if let Some(val) = row.get(col_idx) {
1182                        if !val.is_null() {
1183                            self.mins[i] = Some(min_value(&self.mins[i], val));
1184                        }
1185                    }
1186                }
1187                crate::parser::AggregateFunction::Max(col) => {
1188                    let col_idx = find_col_idx(col);
1189                    if let Some(val) = row.get(col_idx) {
1190                        if !val.is_null() {
1191                            self.maxs[i] = Some(max_value(&self.maxs[i], val));
1192                        }
1193                    }
1194                }
1195            }
1196        }
1197
1198        // Postcondition: state must match aggregate count after update
1199        debug_assert_eq!(
1200            self.sums.len(),
1201            aggregates.len(),
1202            "aggregate state must match aggregate count after update"
1203        );
1204
1205        Ok(())
1206    }
1207
1208    fn finalize(&self, aggregates: &[crate::parser::AggregateFunction]) -> Vec<Value> {
1209        let mut result = Vec::new();
1210
1211        for (i, agg) in aggregates.iter().enumerate() {
1212            let value = match agg {
1213                crate::parser::AggregateFunction::CountStar => Value::BigInt(self.count),
1214                crate::parser::AggregateFunction::Count(_) => {
1215                    // Use non-NULL count for COUNT(col)
1216                    Value::BigInt(self.non_null_counts.get(i).copied().unwrap_or(0))
1217                }
1218                crate::parser::AggregateFunction::Sum(_) => self
1219                    .sums
1220                    .get(i)
1221                    .and_then(std::clone::Clone::clone)
1222                    .unwrap_or(Value::Null),
1223                crate::parser::AggregateFunction::Avg(_) => {
1224                    // AVG = SUM / COUNT
1225                    if self.count == 0 {
1226                        Value::Null
1227                    } else {
1228                        // NEVER: count guard above must prevent division-by-zero
1229                        // from ever reaching divide_value.
1230                        kimberlite_properties::never!(
1231                            self.count == 0,
1232                            "query.avg_divide_by_zero",
1233                            "AVG divide_value must never be reached with count == 0"
1234                        );
1235                        match self.sums.get(i).and_then(|v| v.as_ref()) {
1236                            Some(sum) => divide_value(sum, self.count).unwrap_or(Value::Null),
1237                            None => Value::Null,
1238                        }
1239                    }
1240                }
1241                crate::parser::AggregateFunction::Min(_) => self
1242                    .mins
1243                    .get(i)
1244                    .and_then(std::clone::Clone::clone)
1245                    .unwrap_or(Value::Null),
1246                crate::parser::AggregateFunction::Max(_) => self
1247                    .maxs
1248                    .get(i)
1249                    .and_then(std::clone::Clone::clone)
1250                    .unwrap_or(Value::Null),
1251            };
1252            result.push(value);
1253        }
1254
1255        result
1256    }
1257}
1258
1259/// Adds two values for SUM aggregates.
1260///
1261/// Uses checked arithmetic to detect integer overflow and return an error
1262/// rather than silently producing a wrapped/incorrect result.
1263fn add_values(a: &Option<Value>, b: &Value) -> Result<Value> {
1264    match a {
1265        None => Ok(b.clone()),
1266        Some(a_val) => match (a_val, b) {
1267            (Value::BigInt(x), Value::BigInt(y)) => {
1268                let checked = x.checked_add(*y);
1269                // SOMETIMES: exercise the overflow-detection path so we know the
1270                // guard is reachable under simulation. NEVER below guarantees a
1271                // Some() result is a true non-overflowing sum.
1272                kimberlite_properties::sometimes!(
1273                    checked.is_none(),
1274                    "query.sum_bigint_overflow_detected",
1275                    "SUM(BIGINT) overflow detected by checked_add"
1276                );
1277                if let Some(sum) = checked {
1278                    // NEVER: a surviving sum must equal wrapping_add with no wrap
1279                    // — i.e. checked_add only returns Some() for in-range results.
1280                    kimberlite_properties::never!(
1281                        sum != x.wrapping_add(*y) || (*x > 0 && *y > 0 && sum < 0)
1282                            || (*x < 0 && *y < 0 && sum > 0),
1283                        "query.sum_bigint_silent_wrap",
1284                        "SUM(BIGINT) checked_add returned Some() for an overflowing result"
1285                    );
1286                    Ok(Value::BigInt(sum))
1287                } else {
1288                    Err(QueryError::TypeMismatch {
1289                        expected: "BigInt (non-overflowing)".to_string(),
1290                        actual: format!("overflow: {x} + {y}"),
1291                    })
1292                }
1293            }
1294            (Value::Integer(x), Value::Integer(y)) => x
1295                .checked_add(*y)
1296                .map(Value::Integer)
1297                .ok_or_else(|| QueryError::TypeMismatch {
1298                    expected: "Integer (non-overflowing)".to_string(),
1299                    actual: format!("overflow: {x} + {y}"),
1300                }),
1301            (Value::SmallInt(x), Value::SmallInt(y)) => x
1302                .checked_add(*y)
1303                .map(Value::SmallInt)
1304                .ok_or_else(|| QueryError::TypeMismatch {
1305                    expected: "SmallInt (non-overflowing)".to_string(),
1306                    actual: format!("overflow: {x} + {y}"),
1307                }),
1308            (Value::TinyInt(x), Value::TinyInt(y)) => x
1309                .checked_add(*y)
1310                .map(Value::TinyInt)
1311                .ok_or_else(|| QueryError::TypeMismatch {
1312                    expected: "TinyInt (non-overflowing)".to_string(),
1313                    actual: format!("overflow: {x} + {y}"),
1314                }),
1315            (Value::Real(x), Value::Real(y)) => Ok(Value::Real(x + y)),
1316            (Value::Decimal(x, sx), Value::Decimal(y, sy)) if sx == sy => x
1317                .checked_add(*y)
1318                .map(|sum| Value::Decimal(sum, *sx))
1319                .ok_or_else(|| QueryError::TypeMismatch {
1320                    expected: "Decimal (non-overflowing)".to_string(),
1321                    actual: format!("overflow: {x} + {y}"),
1322                }),
1323            _ => Err(QueryError::TypeMismatch {
1324                expected: format!("{a_val:?}"),
1325                actual: format!("{b:?}"),
1326            }),
1327        },
1328    }
1329}
1330
1331/// Returns the minimum of two values.
1332fn min_value(a: &Option<Value>, b: &Value) -> Value {
1333    match a {
1334        None => b.clone(),
1335        Some(a_val) => {
1336            if let Some(ord) = a_val.compare(b) {
1337                if ord == Ordering::Less {
1338                    a_val.clone()
1339                } else {
1340                    b.clone()
1341                }
1342            } else {
1343                a_val.clone() // Incomparable types, keep current
1344            }
1345        }
1346    }
1347}
1348
1349/// Returns the maximum of two values.
1350fn max_value(a: &Option<Value>, b: &Value) -> Value {
1351    match a {
1352        None => b.clone(),
1353        Some(a_val) => {
1354            if let Some(ord) = a_val.compare(b) {
1355                if ord == Ordering::Greater {
1356                    a_val.clone()
1357                } else {
1358                    b.clone()
1359                }
1360            } else {
1361                a_val.clone() // Incomparable types, keep current
1362            }
1363        }
1364    }
1365}
1366
1367/// Divides a value by a count for AVG aggregates.
1368///
1369/// Returns `Some(Value::Null)` when `count == 0` to match SQL semantics:
1370/// `AVG` over an empty set is `NULL`.
1371#[allow(clippy::cast_precision_loss)]
1372fn divide_value(val: &Value, count: i64) -> Option<Value> {
1373    // Guard against division-by-zero.  SQL defines AVG() of an empty set as NULL.
1374    if count == 0 {
1375        return Some(Value::Null);
1376    }
1377
1378    match val {
1379        Value::BigInt(x) => Some(Value::Real(*x as f64 / count as f64)),
1380        Value::Integer(x) => Some(Value::Real(f64::from(*x) / count as f64)),
1381        Value::SmallInt(x) => Some(Value::Real(f64::from(*x) / count as f64)),
1382        Value::TinyInt(x) => Some(Value::Real(f64::from(*x) / count as f64)),
1383        Value::Real(x) => Some(Value::Real(x / count as f64)),
1384        Value::Decimal(x, scale) => {
1385            // Convert to float for division
1386            let divisor = 10_i128.pow(u32::from(*scale));
1387            let float_val = *x as f64 / divisor as f64;
1388            Some(Value::Real(float_val / count as f64))
1389        }
1390        _ => None,
1391    }
1392}
1393
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plan::{Filter, FilterCondition, FilterOp};

    /// Projecting a subset of column indices keeps only those columns, in order.
    #[test]
    fn test_project_row() {
        let source_row = vec![
            Value::BigInt(1),
            Value::Text("alice".to_string()),
            Value::BigInt(30),
        ];
        let expected = vec![Value::BigInt(1), Value::BigInt(30)];

        assert_eq!(project_row(&source_row, &[0, 2]), expected);
    }

    /// An empty projection list returns the row unchanged.
    #[test]
    fn test_project_row_all() {
        let source_row = vec![Value::BigInt(1), Value::Text("bob".to_string())];

        assert_eq!(project_row(&source_row, &[]), source_row);
    }

    /// An equality filter on column 0 accepts the matching value and rejects another.
    #[test]
    fn test_filter_matches() {
        let row = vec![Value::BigInt(42), Value::Text("alice".to_string())];

        // Equality filter on the first column against the given literal.
        let first_column_equals = |needle: i64| {
            Filter::single(FilterCondition {
                column_idx: 0,
                op: FilterOp::Eq,
                value: Value::BigInt(needle),
            })
        };

        assert!(first_column_equals(42).matches(&row));
        assert!(!first_column_equals(99).matches(&row));
    }

    /// Ascending sort on column 0 orders rows by that column.
    #[test]
    fn test_sort_rows() {
        let mut rows = vec![
            vec![Value::BigInt(3), Value::Text("c".to_string())],
            vec![Value::BigInt(1), Value::Text("a".to_string())],
            vec![Value::BigInt(2), Value::Text("b".to_string())],
        ];
        let ascending = SortSpec {
            columns: vec![(0, ScanOrder::Ascending)],
        };

        sort_rows(&mut rows, &ascending);

        for (position, expected_key) in [1_i64, 2, 3].into_iter().enumerate() {
            assert_eq!(rows[position][0], Value::BigInt(expected_key));
        }
    }

    /// Descending sort on column 0 orders rows from largest to smallest.
    #[test]
    fn test_sort_rows_descending() {
        let mut rows = vec![
            vec![Value::BigInt(1)],
            vec![Value::BigInt(3)],
            vec![Value::BigInt(2)],
        ];
        let descending = SortSpec {
            columns: vec![(0, ScanOrder::Descending)],
        };

        sort_rows(&mut rows, &descending);

        for (position, expected_key) in [3_i64, 2, 1].into_iter().enumerate() {
            assert_eq!(rows[position][0], Value::BigInt(expected_key));
        }
    }
}