Skip to main content

kimberlite_query/
executor.rs

1//! Query executor: executes query plans against a projection store.
2
3#![allow(clippy::ref_option)]
4#![allow(clippy::trivially_copy_pass_by_ref)]
5#![allow(clippy::items_after_statements)]
6
7use std::cmp::Ordering;
8use std::ops::Bound;
9
// ============================================================================
// Query Execution Constants
// ============================================================================

/// Scan-window multiplier applied to `LIMIT + OFFSET` when the query needs a
/// client-side ORDER BY.
///
/// **Rationale**: rows can only be sorted after they have been fetched, so the
/// scan window is widened well beyond the number of rows requested. A 10x
/// over-fetch keeps the window proportional to the LIMIT while still bounding
/// memory. If the scanned range holds more candidate rows than the window, the
/// sort only ever sees a prefix of that range.
const SCAN_LIMIT_MULTIPLIER_WITH_SORT: usize = 10;

/// Scan-window multiplier applied to `LIMIT + OFFSET` when no client-side sort
/// is needed.
///
/// **Rationale**: unsorted rows are consumed in scan order and collection stops
/// once enough rows are kept, so only modest headroom is required. The 2x
/// factor covers scanned entries that do not produce a row (e.g. deleted rows
/// or MVCC conflicts).
const SCAN_LIMIT_MULTIPLIER_NO_SORT: usize = 2;

/// Scan window used when the query has no LIMIT clause.
///
/// **Rationale**: caps memory for scans over large tables. 10K rows was chosen
/// because:
/// - at ~1KB per row the buffer stays around 10MB;
/// - a 10K-row scan keeps p99 query latency under 50ms;
/// - it is enough for most analytical queries.
const DEFAULT_SCAN_LIMIT: usize = 10_000;

/// Upper bound on the number of aggregate expressions in one query.
///
/// **Rationale**: guards against `DoS` through memory exhaustion. Each aggregate
/// keeps roughly 64 bytes of state (sum, count, min, max) per group, so
/// 100 aggregates across 1000 groups is about 6.4MB.
const MAX_AGGREGATES_PER_QUERY: usize = 100;

/// Upper bound on the number of rows a JOIN may produce before it is aborted.
///
/// **Rationale**: a cross join of two 1K-row tables already yields 1M rows, so
/// an unbounded join lets an adversary force unbounded allocation. At roughly
/// 100 bytes per row, 1M rows is about 100 MB, which is an acceptable ceiling.
const MAX_JOIN_OUTPUT_ROWS: usize = 1_000_000;

/// Upper bound on the number of distinct groups a GROUP BY may create.
///
/// **Rationale**: grouping on a high-cardinality column would otherwise grow
/// the group HashMap without limit. 100K groups at about 200 bytes each is
/// roughly 20 MB.
const MAX_GROUP_COUNT: usize = 100_000;
55
56use bytes::Bytes;
57use kimberlite_store::{Key, ProjectionStore, TableId};
58use kimberlite_types::Offset;
59
60use crate::error::{QueryError, Result};
61use crate::key_encoder::successor_key;
62use crate::plan::{QueryPlan, ScanOrder, SortSpec};
63use crate::schema::{ColumnName, TableDef};
64use crate::value::Value;
65
66/// Result of executing a query.
67#[derive(Debug, Clone)]
68pub struct QueryResult {
69    /// Column names in result order.
70    pub columns: Vec<ColumnName>,
71    /// Result rows.
72    pub rows: Vec<Row>,
73}
74
75impl QueryResult {
76    /// Creates an empty result with the given columns.
77    pub fn empty(columns: Vec<ColumnName>) -> Self {
78        Self {
79            columns,
80            rows: vec![],
81        }
82    }
83
84    /// Returns the number of rows.
85    pub fn len(&self) -> usize {
86        self.rows.len()
87    }
88
89    /// Returns true if there are no rows.
90    pub fn is_empty(&self) -> bool {
91        self.rows.is_empty()
92    }
93}
94
95/// A single result row.
96pub type Row = Vec<Value>;
97
98/// Executes an index scan query.
99#[allow(clippy::too_many_arguments)]
100fn execute_index_scan<S: ProjectionStore>(
101    store: &mut S,
102    metadata: &crate::plan::TableMetadata,
103    index_id: u64,
104    start: &Bound<Key>,
105    end: &Bound<Key>,
106    filter: &Option<crate::plan::Filter>,
107    limit: &Option<usize>,
108    offset: &Option<usize>,
109    order: &ScanOrder,
110    order_by: &Option<crate::plan::SortSpec>,
111    columns: &[usize],
112    column_names: &[ColumnName],
113    position: Option<Offset>,
114) -> Result<QueryResult> {
115    let (start_key, end_key) = bounds_to_range(start, end);
116
117    // For pagination correctness, scan must consider offset+limit together so
118    // we don't truncate the window before skipping.
119    let limit_plus_offset = limit.map(|l| l.saturating_add(offset.unwrap_or(0)));
120
121    // Calculate scan limit based on whether client-side sorting is needed
122    let scan_limit = if order_by.is_some() {
123        limit_plus_offset.map_or(DEFAULT_SCAN_LIMIT, |l| {
124            l.saturating_mul(SCAN_LIMIT_MULTIPLIER_WITH_SORT)
125        })
126    } else {
127        limit_plus_offset.map_or(DEFAULT_SCAN_LIMIT, |l| {
128            l.saturating_mul(SCAN_LIMIT_MULTIPLIER_NO_SORT)
129        })
130    };
131
132    // Postcondition: scan limit must be positive
133    debug_assert!(scan_limit > 0, "scan_limit must be positive");
134
135    // Calculate index table ID using hash to avoid overflow
136    use std::collections::hash_map::DefaultHasher;
137    use std::hash::{Hash, Hasher};
138
139    let mut hasher = DefaultHasher::new();
140    metadata.table_id.as_u64().hash(&mut hasher);
141    index_id.hash(&mut hasher);
142    let index_table_id = TableId::new(hasher.finish());
143
144    // Scan the index table to get composite keys
145    let index_pairs = match position {
146        Some(pos) => store.scan_at(index_table_id, start_key..end_key, scan_limit, pos)?,
147        None => store.scan(index_table_id, start_key..end_key, scan_limit)?,
148    };
149
150    let mut full_rows = Vec::new();
151    let index_iter: Box<dyn Iterator<Item = &(Key, Bytes)>> = match order {
152        ScanOrder::Ascending => Box::new(index_pairs.iter()),
153        ScanOrder::Descending => Box::new(index_pairs.iter().rev()),
154    };
155
156    for (index_key, _) in index_iter {
157        // Extract primary key from the composite index key
158        let pk_key = extract_pk_from_index_key(index_key, metadata);
159
160        // Fetch the actual row from the base table
161        let bytes_opt = match position {
162            Some(pos) => store.get_at(metadata.table_id, &pk_key, pos)?,
163            None => store.get(metadata.table_id, &pk_key)?,
164        };
165        if let Some(bytes) = bytes_opt {
166            let full_row = decode_row(&bytes, metadata)?;
167
168            // Apply filter
169            if let Some(f) = filter {
170                if !f.matches(&full_row) {
171                    continue;
172                }
173            }
174
175            full_rows.push(full_row);
176
177            // When client-side sorting is needed, don't apply limit during scan
178            if order_by.is_none() {
179                if let Some(target) = limit_plus_offset {
180                    if full_rows.len() >= target {
181                        break;
182                    }
183                }
184            }
185        }
186    }
187
188    // Apply client-side sorting if needed (on full rows before projection)
189    if let Some(sort_spec) = order_by {
190        sort_rows(&mut full_rows, sort_spec);
191    }
192
193    apply_offset_and_limit(&mut full_rows, *offset, *limit);
194
195    // Project columns after sorting and limiting
196    let rows: Vec<Row> = full_rows
197        .iter()
198        .map(|full_row| project_row(full_row, columns))
199        .collect();
200
201    Ok(QueryResult {
202        columns: column_names.to_vec(),
203        rows,
204    })
205}
206
207/// Executes a table scan query.
208#[allow(clippy::too_many_arguments)]
209fn execute_table_scan<S: ProjectionStore>(
210    store: &mut S,
211    metadata: &crate::plan::TableMetadata,
212    filter: &Option<crate::plan::Filter>,
213    limit: &Option<usize>,
214    offset: &Option<usize>,
215    order: &Option<SortSpec>,
216    columns: &[usize],
217    column_names: &[ColumnName],
218    position: Option<Offset>,
219) -> Result<QueryResult> {
220    // Scan entire table — must scan past offset before applying limit
221    let limit_plus_offset = limit.map(|l| l.saturating_add(offset.unwrap_or(0)));
222    let scan_limit = limit_plus_offset.map_or(100_000, |l| l.saturating_mul(10));
223    let pairs = match position {
224        Some(pos) => store.scan_at(metadata.table_id, Key::min()..Key::max(), scan_limit, pos)?,
225        None => store.scan(metadata.table_id, Key::min()..Key::max(), scan_limit)?,
226    };
227
228    let mut full_rows = Vec::new();
229
230    for (_, bytes) in &pairs {
231        let full_row = decode_row(bytes, metadata)?;
232
233        // Apply filter
234        if let Some(f) = filter {
235            if !f.matches(&full_row) {
236                continue;
237            }
238        }
239
240        full_rows.push(full_row);
241    }
242
243    // Apply sort on full rows (before projection)
244    if let Some(sort_spec) = order {
245        sort_rows(&mut full_rows, sort_spec);
246    }
247
248    apply_offset_and_limit(&mut full_rows, *offset, *limit);
249
250    // Project columns after sorting and limiting
251    let rows: Vec<Row> = full_rows
252        .iter()
253        .map(|full_row| project_row(full_row, columns))
254        .collect();
255
256    Ok(QueryResult {
257        columns: column_names.to_vec(),
258        rows,
259    })
260}
261
262/// Executes a range scan query.
263#[allow(clippy::too_many_arguments)]
264fn execute_range_scan<S: ProjectionStore>(
265    store: &mut S,
266    metadata: &crate::plan::TableMetadata,
267    start: &Bound<Key>,
268    end: &Bound<Key>,
269    filter: &Option<crate::plan::Filter>,
270    limit: &Option<usize>,
271    offset: &Option<usize>,
272    order: &ScanOrder,
273    order_by: &Option<crate::plan::SortSpec>,
274    columns: &[usize],
275    column_names: &[ColumnName],
276    position: Option<Offset>,
277) -> Result<QueryResult> {
278    let (start_key, end_key) = bounds_to_range(start, end);
279
280    // Pagination: scan must include the offset window before truncating to limit.
281    let limit_plus_offset = limit.map(|l| l.saturating_add(offset.unwrap_or(0)));
282
283    // Calculate scan limit based on whether client-side sorting is needed
284    let scan_limit = if order_by.is_some() {
285        limit_plus_offset.map_or(DEFAULT_SCAN_LIMIT, |l| {
286            l.saturating_mul(SCAN_LIMIT_MULTIPLIER_WITH_SORT)
287        })
288    } else {
289        limit_plus_offset.map_or(DEFAULT_SCAN_LIMIT, |l| {
290            l.saturating_mul(SCAN_LIMIT_MULTIPLIER_NO_SORT)
291        })
292    };
293
294    // Postcondition: scan limit must be positive
295    debug_assert!(scan_limit > 0, "scan_limit must be positive");
296
297    let pairs = match position {
298        Some(pos) => store.scan_at(metadata.table_id, start_key..end_key, scan_limit, pos)?,
299        None => store.scan(metadata.table_id, start_key..end_key, scan_limit)?,
300    };
301
302    let mut full_rows = Vec::new();
303    let row_iter: Box<dyn Iterator<Item = &(Key, Bytes)>> = match order {
304        ScanOrder::Ascending => Box::new(pairs.iter()),
305        ScanOrder::Descending => Box::new(pairs.iter().rev()),
306    };
307
308    for (_, bytes) in row_iter {
309        let full_row = decode_row(bytes, metadata)?;
310
311        // Apply filter
312        if let Some(f) = filter {
313            if !f.matches(&full_row) {
314                continue;
315            }
316        }
317
318        full_rows.push(full_row);
319
320        // When client-side sorting is needed, don't apply limit during scan
321        if order_by.is_none() {
322            if let Some(target) = limit_plus_offset {
323                if full_rows.len() >= target {
324                    break;
325                }
326            }
327        }
328    }
329
330    // Apply client-side sorting if needed (on full rows before projection)
331    if let Some(sort_spec) = order_by {
332        sort_rows(&mut full_rows, sort_spec);
333    }
334
335    apply_offset_and_limit(&mut full_rows, *offset, *limit);
336
337    // Project columns after sorting and limiting
338    let rows: Vec<Row> = full_rows
339        .iter()
340        .map(|full_row| project_row(full_row, columns))
341        .collect();
342
343    Ok(QueryResult {
344        columns: column_names.to_vec(),
345        rows,
346    })
347}
348
/// Applies SQL `OFFSET` and then `LIMIT` to a row buffer, in place.
///
/// The first `offset` rows are removed, and the remainder is cut down to at
/// most `limit` rows. The order matters: `OFFSET 5 LIMIT 10` keeps rows
/// 6..=15, not the first 10 minus 5. An offset at or past the end leaves the
/// buffer empty. Either argument may be `None`, meaning "no offset" or
/// "no limit" respectively.
#[inline]
fn apply_offset_and_limit<T>(rows: &mut Vec<T>, offset: Option<usize>, limit: Option<usize>) {
    // Clamp to the buffer length so an oversized offset simply empties it.
    let skipped = offset.map_or(0, |n| n.min(rows.len()));
    rows.drain(..skipped);

    if let Some(max_rows) = limit {
        rows.truncate(max_rows);
    }
}
367
368/// Executes a point lookup query.
369fn execute_point_lookup<S: ProjectionStore>(
370    store: &mut S,
371    metadata: &crate::plan::TableMetadata,
372    key: &Key,
373    columns: &[usize],
374    column_names: &[ColumnName],
375    position: Option<Offset>,
376) -> Result<QueryResult> {
377    let result = match position {
378        Some(pos) => store.get_at(metadata.table_id, key, pos)?,
379        None => store.get(metadata.table_id, key)?,
380    };
381    match result {
382        Some(bytes) => {
383            let row = decode_and_project(&bytes, columns, metadata)?;
384            Ok(QueryResult {
385                columns: column_names.to_vec(),
386                rows: vec![row],
387            })
388        }
389        None => Ok(QueryResult::empty(column_names.to_vec())),
390    }
391}
392
393/// Internal execution function that handles both current and point-in-time queries.
394#[allow(clippy::too_many_lines, clippy::used_underscore_binding)]
395fn execute_internal<S: ProjectionStore>(
396    store: &mut S,
397    plan: &QueryPlan,
398    _table_def: &TableDef, // Kept for API compatibility, but metadata is now in plans
399    position: Option<Offset>,
400) -> Result<QueryResult> {
401    // SOMETIMES: time-travel query at a specific log position.
402    kimberlite_properties::sometimes!(
403        position.is_some(),
404        "query.time_travel_at_position",
405        "query executes at a pinned historical log offset"
406    );
407    let result = execute_internal_inner(store, plan, _table_def, position)?;
408
409    // ALWAYS: the executed result's column count must equal the plan's declared schema.
410    #[cfg(any(test, feature = "sim"))]
411    {
412        let _expected_cols = plan.column_names().len();
413        kimberlite_properties::always!(
414            result.columns.len() == _expected_cols,
415            "query.result_columns_match_plan",
416            "query result column count must equal plan-declared schema column count"
417        );
418        // ALWAYS: every row must match the column count.
419        kimberlite_properties::always!(
420            result.rows.iter().all(|r| r.len() == _expected_cols),
421            "query.row_width_matches_columns",
422            "every result row must have width equal to declared column count"
423        );
424    }
425    Ok(result)
426}
427
428#[allow(clippy::too_many_lines)]
429fn execute_internal_inner<S: ProjectionStore>(
430    store: &mut S,
431    plan: &QueryPlan,
432    _table_def: &TableDef,
433    position: Option<Offset>,
434) -> Result<QueryResult> {
435    match plan {
436        QueryPlan::PointLookup {
437            metadata,
438            key,
439            columns,
440            column_names,
441        } => execute_point_lookup(store, metadata, key, columns, column_names, position),
442
443        QueryPlan::RangeScan {
444            metadata,
445            start,
446            end,
447            filter,
448            limit,
449            offset,
450            order,
451            order_by,
452            columns,
453            column_names,
454        } => execute_range_scan(
455            store,
456            metadata,
457            start,
458            end,
459            filter,
460            limit,
461            offset,
462            order,
463            order_by,
464            columns,
465            column_names,
466            position,
467        ),
468
469        QueryPlan::IndexScan {
470            metadata,
471            index_id,
472            start,
473            end,
474            filter,
475            limit,
476            offset,
477            order,
478            order_by,
479            columns,
480            column_names,
481            ..
482        } => execute_index_scan(
483            store,
484            metadata,
485            *index_id,
486            start,
487            end,
488            filter,
489            limit,
490            offset,
491            order,
492            order_by,
493            columns,
494            column_names,
495            position,
496        ),
497
498        QueryPlan::TableScan {
499            metadata,
500            filter,
501            limit,
502            offset,
503            order,
504            columns,
505            column_names,
506        } => execute_table_scan(
507            store,
508            metadata,
509            filter,
510            limit,
511            offset,
512            order,
513            columns,
514            column_names,
515            position,
516        ),
517
518        QueryPlan::Aggregate {
519            metadata,
520            source,
521            group_by_cols,
522            group_by_names: _,
523            aggregates,
524            aggregate_filters,
525            column_names,
526            having,
527        } => execute_aggregate(
528            store,
529            source,
530            group_by_cols,
531            aggregates,
532            aggregate_filters,
533            column_names,
534            metadata,
535            having,
536            position,
537        ),
538
539        QueryPlan::Join {
540            join_type,
541            left,
542            right,
543            on_conditions,
544            columns,
545            column_names,
546        } => execute_join(
547            store,
548            join_type,
549            left,
550            right,
551            on_conditions,
552            columns,
553            column_names,
554            position,
555        ),
556
557        QueryPlan::Materialize {
558            source,
559            filter,
560            case_columns,
561            scalar_columns,
562            order,
563            limit,
564            offset,
565            column_names,
566        } => execute_materialize(
567            store,
568            source,
569            filter,
570            case_columns,
571            scalar_columns,
572            order,
573            limit,
574            offset,
575            column_names,
576            position,
577        ),
578    }
579}
580
581/// Executes a Materialize plan: filter, compute CASE columns, sort, offset, and limit.
582#[allow(clippy::too_many_arguments)]
583fn execute_materialize<S: ProjectionStore>(
584    store: &mut S,
585    source: &QueryPlan,
586    filter: &Option<crate::plan::Filter>,
587    case_columns: &[crate::plan::CaseColumnDef],
588    scalar_columns: &[crate::plan::ScalarColumnDef],
589    order: &Option<SortSpec>,
590    limit: &Option<usize>,
591    offset: &Option<usize>,
592    column_names: &[ColumnName],
593    position: Option<Offset>,
594) -> Result<QueryResult> {
595    // Execute the source plan (e.g., the Join node)
596    let dummy_def = TableDef {
597        table_id: kimberlite_store::TableId::from(0u64),
598        columns: vec![],
599        primary_key: vec![],
600        indexes: vec![],
601    };
602    let mut source_result = execute_internal(store, source, &dummy_def, position)?;
603
604    kimberlite_properties::sometimes!(
605        filter.is_some() || order.is_some() || limit.is_some() || offset.is_some(),
606        "query.materialize_applies_filter_order_limit",
607        "Materialize wrapper applies at least one of filter, order, limit, or offset"
608    );
609
610    // 1. Apply WHERE filter
611    if let Some(f) = filter {
612        source_result.rows.retain(|row| f.matches(row));
613    }
614
615    // 2. Evaluate CASE WHEN computed columns and append to each row
616    if !case_columns.is_empty() {
617        kimberlite_properties::sometimes!(
618            !source_result.rows.is_empty(),
619            "query.case_when_evaluated",
620            "CASE WHEN computed columns evaluated against at least one row"
621        );
622        for row in &mut source_result.rows {
623            for case_col in case_columns {
624                let val = evaluate_case_column(case_col, row);
625                row.push(val);
626            }
627        }
628    }
629
630    // 2b. Evaluate scalar-expression projections (v0.5.1) against each
631    // row. Each def carries its source-column layout so
632    // `ScalarExpr::Column(name)` resolves positionally. Evaluation
633    // errors propagate to the caller (e.g., CAST overflow).
634    if !scalar_columns.is_empty() {
635        for row in &mut source_result.rows {
636            for sc in scalar_columns {
637                let ctx = crate::expression::EvalContext::new(&sc.columns, row);
638                let val = crate::expression::evaluate(&sc.expr, &ctx)?;
639                row.push(val);
640            }
641        }
642    }
643
644    // 2c. Post-project: pick final output columns by name. The source
645    // plan may have fetched more columns than the final output shape
646    // declares (scalar projections need access to every referenced
647    // source column). Build an index map from `column_names` onto the
648    // row's current layout: source columns first, then CASE outputs,
649    // then scalar outputs.
650    let source_layout = source_result.columns.clone();
651    let mut full_layout: Vec<ColumnName> = source_layout;
652    full_layout.extend(case_columns.iter().map(|c| c.alias.clone()));
653    full_layout.extend(scalar_columns.iter().map(|c| c.output_name.clone()));
654    let needs_post_project = full_layout.len() != column_names.len()
655        || full_layout
656            .iter()
657            .zip(column_names.iter())
658            .any(|(a, b)| a != b);
659    if needs_post_project {
660        let mut index_map = Vec::with_capacity(column_names.len());
661        for target in column_names {
662            let pos = full_layout
663                .iter()
664                .position(|c| c == target)
665                .ok_or_else(|| QueryError::ColumnNotFound {
666                    table: String::new(),
667                    column: target.to_string(),
668                })?;
669            index_map.push(pos);
670        }
671        for row in &mut source_result.rows {
672            let projected: Vec<Value> = index_map.iter().map(|&i| row[i].clone()).collect();
673            *row = projected;
674        }
675    }
676
677    // 3. Apply ORDER BY (client-side sort)
678    if let Some(spec) = order {
679        sort_rows(&mut source_result.rows, spec);
680    }
681
682    // 4. Apply OFFSET then LIMIT
683    apply_offset_and_limit(&mut source_result.rows, *offset, *limit);
684
685    // Return with the declared output column names
686    Ok(QueryResult {
687        columns: column_names.to_vec(),
688        rows: source_result.rows,
689    })
690}
691
692/// Evaluates a CASE WHEN computed column against a row, returning the result value.
693fn evaluate_case_column(case_col: &crate::plan::CaseColumnDef, row: &[Value]) -> Value {
694    for clause in &case_col.when_clauses {
695        if clause.condition.matches(row) {
696            return clause.result.clone();
697        }
698    }
699    case_col.else_value.clone()
700}
701
702/// Executes a query plan against the current store state.
703pub fn execute<S: ProjectionStore>(
704    store: &mut S,
705    plan: &QueryPlan,
706    table_def: &TableDef,
707) -> Result<QueryResult> {
708    execute_internal(store, plan, table_def, None)
709}
710
711/// Executes a query plan at a specific log position (point-in-time query).
712pub fn execute_at<S: ProjectionStore>(
713    store: &mut S,
714    plan: &QueryPlan,
715    table_def: &TableDef,
716    position: Offset,
717) -> Result<QueryResult> {
718    execute_internal(store, plan, table_def, Some(position))
719}
720
721/// Converts bounds to a range.
722///
723/// The store scan uses a half-open range [start, end), so we need to:
724/// - For Included start: use the key as-is
725/// - For Excluded start: use the successor key (to skip the excluded value)
726/// - For Included end: use successor key (to include the value)
727/// - For Excluded end: use the key as-is
728fn bounds_to_range(start: &Bound<Key>, end: &Bound<Key>) -> (Key, Key) {
729    let start_key = match start {
730        Bound::Included(k) => k.clone(),
731        Bound::Excluded(k) => successor_key(k),
732        Bound::Unbounded => Key::min(),
733    };
734
735    let end_key = match end {
736        Bound::Included(k) => successor_key(k),
737        Bound::Excluded(k) => k.clone(),
738        Bound::Unbounded => Key::max(),
739    };
740
741    (start_key, end_key)
742}
743
744/// Extracts the primary key from a composite index key.
745///
746/// Index keys are structured as: [`index_column_values`...][primary_key_values...]
747/// This function strips the index column values and returns only the primary key portion.
748///
749/// # Assertions
750/// - Index key must be longer than the number of index columns
751/// - Primary key columns must be non-empty
752fn extract_pk_from_index_key(index_key: &Key, metadata: &crate::plan::TableMetadata) -> Key {
753    use crate::key_encoder::{decode_key, encode_key};
754
755    // Decode the full composite key to get all values
756    let all_values = decode_key(index_key);
757
758    // Get the number of primary key columns
759    let pk_count = metadata.primary_key.len();
760
761    // Assertions
762    debug_assert!(pk_count > 0, "primary key columns must be non-empty");
763    debug_assert!(
764        all_values.len() >= pk_count,
765        "index key must contain at least the primary key values"
766    );
767
768    // Extract the last pk_count values (the primary key)
769    // Index key format: [index_col1, index_col2, ..., pk_col1, pk_col2, ...]
770    let pk_values: Vec<Value> = all_values
771        .iter()
772        .skip(all_values.len() - pk_count)
773        .cloned()
774        .collect();
775
776    debug_assert_eq!(
777        pk_values.len(),
778        pk_count,
779        "extracted primary key must have correct number of columns"
780    );
781
782    // Re-encode as a key
783    encode_key(&pk_values)
784}
785
786/// Decodes a JSON row to values using embedded table metadata.
787fn decode_row(bytes: &Bytes, metadata: &crate::plan::TableMetadata) -> Result<Row> {
788    let json: serde_json::Value = serde_json::from_slice(bytes)?;
789
790    let obj = json.as_object().ok_or_else(|| QueryError::TypeMismatch {
791        expected: "object".to_string(),
792        actual: format!("{json:?}"),
793    })?;
794
795    let mut row = Vec::with_capacity(metadata.columns.len());
796
797    for col_def in &metadata.columns {
798        let col_name = col_def.name.as_str();
799        let json_val = obj.get(col_name).unwrap_or(&serde_json::Value::Null);
800        let value = Value::from_json(json_val, col_def.data_type)?;
801        row.push(value);
802    }
803
804    Ok(row)
805}
806
807/// Decodes a JSON row and projects columns (deprecated - use decode_row + project_row).
808fn decode_and_project(
809    bytes: &Bytes,
810    columns: &[usize],
811    metadata: &crate::plan::TableMetadata,
812) -> Result<Row> {
813    let full_row = decode_row(bytes, metadata)?;
814    Ok(project_row(&full_row, columns))
815}
816
817/// Projects a row to selected columns.
818fn project_row(full_row: &[Value], columns: &[usize]) -> Row {
819    // Precondition: column indices must be valid
820    debug_assert!(
821        columns.iter().all(|&idx| idx < full_row.len()),
822        "column index out of bounds: columns={:?}, row_len={}",
823        columns,
824        full_row.len()
825    );
826
827    if columns.is_empty() {
828        // Empty columns means all columns
829        return full_row.to_vec();
830    }
831
832    let projected: Vec<Value> = columns
833        .iter()
834        .map(|&idx| {
835            full_row.get(idx).cloned().unwrap_or_else(|| {
836                // This should never happen due to precondition
837                panic!(
838                    "column index {} out of bounds (row len {})",
839                    idx,
840                    full_row.len()
841                );
842            })
843        })
844        .collect();
845
846    // Postcondition: result has correct length
847    debug_assert_eq!(
848        projected.len(),
849        columns.len(),
850        "projected row length mismatch"
851    );
852
853    projected
854}
855
856/// Sorts rows according to the sort specification.
857fn sort_rows(rows: &mut [Row], spec: &SortSpec) {
858    rows.sort_by(|a, b| {
859        for (col_idx, order) in &spec.columns {
860            let a_val = a.get(*col_idx);
861            let b_val = b.get(*col_idx);
862
863            let cmp = match (a_val, b_val) {
864                (Some(av), Some(bv)) => av.compare(bv).unwrap_or(Ordering::Equal),
865                (None, None) => Ordering::Equal,
866                (None, Some(_)) => Ordering::Less,
867                (Some(_), None) => Ordering::Greater,
868            };
869
870            if cmp != Ordering::Equal {
871                return match order {
872                    ScanOrder::Ascending => cmp,
873                    ScanOrder::Descending => cmp.reverse(),
874                };
875            }
876        }
877        Ordering::Equal
878    });
879}
880
881/// Executes a nested loop join between two tables.
882#[allow(clippy::too_many_arguments)]
883/// Evaluates all join conditions on a concatenated row.
884///
885/// All conditions must be true for the join to match (AND semantics).
886fn evaluate_join_conditions(row: &[Value], conditions: &[crate::plan::JoinCondition]) -> bool {
887    use crate::plan::JoinOp;
888
889    conditions.iter().all(|cond| {
890        // Get values at left and right column indices
891        let left_val = row.get(cond.left_col_idx);
892        let right_val = row.get(cond.right_col_idx);
893
894        // Both values must exist
895        if left_val.is_none() || right_val.is_none() {
896            return false;
897        }
898
899        let left_val = left_val.unwrap();
900        let right_val = right_val.unwrap();
901
902        // Apply comparison operator
903        match cond.op {
904            JoinOp::Eq => left_val == right_val,
905            JoinOp::Lt => left_val.compare(right_val) == Some(std::cmp::Ordering::Less),
906            JoinOp::Le => matches!(
907                left_val.compare(right_val),
908                Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
909            ),
910            JoinOp::Gt => left_val.compare(right_val) == Some(std::cmp::Ordering::Greater),
911            JoinOp::Ge => matches!(
912                left_val.compare(right_val),
913                Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
914            ),
915        }
916    })
917}
918
919fn execute_join<S: ProjectionStore>(
920    store: &mut S,
921    join_type: &crate::parser::JoinType,
922    left: &QueryPlan,
923    right: &QueryPlan,
924    on_conditions: &[crate::plan::JoinCondition],
925    _columns: &[usize],
926    column_names: &[ColumnName],
927    position: Option<Offset>,
928) -> Result<QueryResult> {
929    // Get metadata from child plans for dummy TableDef (each child has its own metadata embedded)
930    let left_metadata = left.metadata().ok_or_else(|| {
931        QueryError::UnsupportedFeature("JOIN child plan missing metadata".to_string())
932    })?;
933    let right_metadata = right.metadata().ok_or_else(|| {
934        QueryError::UnsupportedFeature("JOIN child plan missing metadata".to_string())
935    })?;
936
937    // Create dummy table defs for execute_internal (metadata in plans will be used)
938    let left_table_def = TableDef {
939        table_id: left_metadata.table_id,
940        columns: left_metadata.columns.clone(),
941        primary_key: left_metadata.primary_key.clone(),
942        indexes: vec![], // Not needed for JOIN execution
943    };
944    let right_table_def = TableDef {
945        table_id: right_metadata.table_id,
946        columns: right_metadata.columns.clone(),
947        primary_key: right_metadata.primary_key.clone(),
948        indexes: vec![], // Not needed for JOIN execution
949    };
950
951    // Execute left and right subqueries
952    let left_result = execute_internal(store, left, &left_table_def, position)?;
953    let right_result = execute_internal(store, right, &right_table_def, position)?;
954
955    let mut output_rows = Vec::new();
956
957    match join_type {
958        crate::parser::JoinType::Inner => {
959            // INNER JOIN: only rows that match the join conditions
960            for left_row in &left_result.rows {
961                for right_row in &right_result.rows {
962                    // Build concatenated row: [left_cols..., right_cols...]
963                    let combined_row: Vec<Value> =
964                        left_row.iter().chain(right_row.iter()).cloned().collect();
965
966                    // Evaluate all join conditions
967                    if evaluate_join_conditions(&combined_row, on_conditions) {
968                        output_rows.push(combined_row);
969                        kimberlite_properties::sometimes!(
970                            output_rows.len() > MAX_JOIN_OUTPUT_ROWS,
971                            "query.join_output_row_cap_hit",
972                            "INNER JOIN output hits MAX_JOIN_OUTPUT_ROWS (1M) cap"
973                        );
974                        if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
975                            return Err(QueryError::UnsupportedFeature(format!(
976                                "JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
977                            )));
978                        }
979                    }
980                }
981            }
982        }
983        crate::parser::JoinType::Left => {
984            // LEFT JOIN: include left row with NULLs if no match
985            for left_row in &left_result.rows {
986                let mut matched = false;
987                for right_row in &right_result.rows {
988                    // Build concatenated row
989                    let combined_row: Vec<Value> =
990                        left_row.iter().chain(right_row.iter()).cloned().collect();
991
992                    // Evaluate all join conditions
993                    if evaluate_join_conditions(&combined_row, on_conditions) {
994                        output_rows.push(combined_row);
995                        matched = true;
996                        kimberlite_properties::sometimes!(
997                            output_rows.len() > MAX_JOIN_OUTPUT_ROWS,
998                            "query.left_join_output_row_cap_hit",
999                            "LEFT JOIN output hits MAX_JOIN_OUTPUT_ROWS (1M) cap"
1000                        );
1001                        if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
1002                            return Err(QueryError::UnsupportedFeature(format!(
1003                                "JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
1004                            )));
1005                        }
1006                    }
1007                }
1008
1009                // LEFT JOIN: include left row with NULLs if no match
1010                if !matched {
1011                    let right_nulls = vec![Value::Null; right_result.columns.len()];
1012                    let combined_row: Vec<Value> = left_row
1013                        .iter()
1014                        .cloned()
1015                        .chain(right_nulls.into_iter())
1016                        .collect();
1017                    output_rows.push(combined_row);
1018                }
1019            }
1020        }
1021        crate::parser::JoinType::Right => {
1022            // RIGHT JOIN: mirror of LEFT — include right row with NULLs if no match.
1023            // Output column order is still [left_cols..., right_cols...]; only the
1024            // unmatched-row treatment differs from LEFT.
1025            for right_row in &right_result.rows {
1026                let mut matched = false;
1027                for left_row in &left_result.rows {
1028                    let combined_row: Vec<Value> =
1029                        left_row.iter().chain(right_row.iter()).cloned().collect();
1030                    if evaluate_join_conditions(&combined_row, on_conditions) {
1031                        output_rows.push(combined_row);
1032                        matched = true;
1033                        if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
1034                            return Err(QueryError::UnsupportedFeature(format!(
1035                                "JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
1036                            )));
1037                        }
1038                    }
1039                }
1040                if !matched {
1041                    let left_nulls = vec![Value::Null; left_result.columns.len()];
1042                    let combined_row: Vec<Value> = left_nulls
1043                        .into_iter()
1044                        .chain(right_row.iter().cloned())
1045                        .collect();
1046                    output_rows.push(combined_row);
1047                }
1048            }
1049        }
1050        crate::parser::JoinType::Full => {
1051            // FULL OUTER JOIN: every left row appears at least once (with NULL
1052            // padding if unmatched); every right row appears at least once.
1053            // Implementation: do a LEFT pass then add unmatched right rows.
1054            let mut right_matched = vec![false; right_result.rows.len()];
1055            for left_row in &left_result.rows {
1056                let mut matched = false;
1057                for (rj, right_row) in right_result.rows.iter().enumerate() {
1058                    let combined_row: Vec<Value> =
1059                        left_row.iter().chain(right_row.iter()).cloned().collect();
1060                    if evaluate_join_conditions(&combined_row, on_conditions) {
1061                        output_rows.push(combined_row);
1062                        matched = true;
1063                        right_matched[rj] = true;
1064                        if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
1065                            return Err(QueryError::UnsupportedFeature(format!(
1066                                "JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
1067                            )));
1068                        }
1069                    }
1070                }
1071                if !matched {
1072                    let right_nulls = vec![Value::Null; right_result.columns.len()];
1073                    let combined_row: Vec<Value> = left_row
1074                        .iter()
1075                        .cloned()
1076                        .chain(right_nulls.into_iter())
1077                        .collect();
1078                    output_rows.push(combined_row);
1079                }
1080            }
1081            // Emit right rows with no left match.
1082            for (rj, right_row) in right_result.rows.iter().enumerate() {
1083                if !right_matched[rj] {
1084                    let left_nulls = vec![Value::Null; left_result.columns.len()];
1085                    let combined_row: Vec<Value> = left_nulls
1086                        .into_iter()
1087                        .chain(right_row.iter().cloned())
1088                        .collect();
1089                    output_rows.push(combined_row);
1090                }
1091            }
1092        }
1093        crate::parser::JoinType::Cross => {
1094            // CROSS JOIN: full Cartesian product. No ON predicate. Subject to
1095            // the same row-count cap as other join types — important for a
1096            // compliance database where a runaway cross-join can DoS the node.
1097            let estimated = left_result
1098                .rows
1099                .len()
1100                .saturating_mul(right_result.rows.len());
1101            if estimated > MAX_JOIN_OUTPUT_ROWS {
1102                return Err(QueryError::UnsupportedFeature(format!(
1103                    "CROSS JOIN cardinality {estimated} exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective query"
1104                )));
1105            }
1106            for left_row in &left_result.rows {
1107                for right_row in &right_result.rows {
1108                    let combined_row: Vec<Value> =
1109                        left_row.iter().chain(right_row.iter()).cloned().collect();
1110                    output_rows.push(combined_row);
1111                }
1112            }
1113        }
1114    }
1115
1116    kimberlite_properties::sometimes!(
1117        output_rows.len() > 1,
1118        "query.join_produces_multi_row_output",
1119        "join execution produces more than one output row"
1120    );
1121
1122    Ok(QueryResult {
1123        columns: column_names.to_vec(),
1124        rows: output_rows,
1125    })
1126}
1127
1128/// Executes an aggregate query with optional grouping.
1129#[allow(clippy::too_many_arguments)]
1130fn execute_aggregate<S: ProjectionStore>(
1131    store: &mut S,
1132    source: &QueryPlan,
1133    group_by_cols: &[usize],
1134    aggregates: &[crate::parser::AggregateFunction],
1135    aggregate_filters: &[Option<crate::plan::Filter>],
1136    column_names: &[ColumnName],
1137    metadata: &crate::plan::TableMetadata,
1138    having: &[crate::parser::HavingCondition],
1139    position: Option<Offset>,
1140) -> Result<QueryResult> {
1141    use std::collections::HashMap;
1142
1143    // Execute source plan to get all rows
1144    // Pass metadata as TableDef for API compatibility (it will be ignored in child plans)
1145    let dummy_table_def = TableDef {
1146        table_id: metadata.table_id,
1147        columns: metadata.columns.clone(),
1148        primary_key: metadata.primary_key.clone(),
1149        indexes: vec![],
1150    };
1151    let source_result = execute_internal(store, source, &dummy_table_def, position)?;
1152
1153    // Build aggregate state grouped by key
1154    let mut groups: HashMap<Vec<Value>, AggregateState> = HashMap::new();
1155
1156    for row in source_result.rows {
1157        // Extract group key (values from GROUP BY columns)
1158        let group_key: Vec<Value> = if group_by_cols.is_empty() {
1159            // No GROUP BY - all rows in one group
1160            vec![]
1161        } else {
1162            group_by_cols
1163                .iter()
1164                .map(|&idx| row.get(idx).cloned().unwrap_or(Value::Null))
1165                .collect()
1166        };
1167
1168        // Guard against unbounded group accumulation (DoS prevention).
1169        kimberlite_properties::sometimes!(
1170            !groups.contains_key(&group_key) && groups.len() >= MAX_GROUP_COUNT,
1171            "query.group_by_cardinality_cap_hit",
1172            "GROUP BY hits MAX_GROUP_COUNT (100k) distinct group cap"
1173        );
1174        if !groups.contains_key(&group_key) && groups.len() >= MAX_GROUP_COUNT {
1175            return Err(QueryError::UnsupportedFeature(format!(
1176                "GROUP BY cardinality exceeds maximum of {MAX_GROUP_COUNT} distinct groups"
1177            )));
1178        }
1179
1180        // Update aggregates for this group
1181        let state = groups.entry(group_key).or_insert_with(AggregateState::new);
1182        state.update(&row, aggregates, aggregate_filters, metadata)?;
1183    }
1184
1185    // Convert groups to result rows
1186    let group_by_count = group_by_cols.len();
1187    let mut result_rows = Vec::new();
1188    for (group_key, state) in groups {
1189        let agg_values = state.finalize(aggregates);
1190
1191        // Apply HAVING filter: check each condition against aggregate results
1192        if !having.is_empty() && !evaluate_having(having, aggregates, &agg_values, group_by_count) {
1193            continue;
1194        }
1195
1196        let mut result_row = group_key; // Start with GROUP BY columns
1197        result_row.extend(agg_values); // Add aggregate results
1198        result_rows.push(result_row);
1199    }
1200
1201    // If no groups and no GROUP BY, return one row with global aggregates
1202    if result_rows.is_empty() && group_by_cols.is_empty() && having.is_empty() {
1203        let state = AggregateState::new();
1204        let agg_values = state.finalize(aggregates);
1205        result_rows.push(agg_values);
1206    }
1207
1208    Ok(QueryResult {
1209        columns: column_names.to_vec(),
1210        rows: result_rows,
1211    })
1212}
1213
1214/// Evaluates HAVING conditions against aggregate results for a group.
1215///
1216/// Returns true if the group passes all HAVING conditions.
1217fn evaluate_having(
1218    having: &[crate::parser::HavingCondition],
1219    aggregates: &[crate::parser::AggregateFunction],
1220    agg_values: &[Value],
1221    _group_by_count: usize,
1222) -> bool {
1223    having.iter().all(|condition| match condition {
1224        crate::parser::HavingCondition::AggregateComparison {
1225            aggregate,
1226            op,
1227            value,
1228        } => {
1229            // Find the index of this aggregate in the aggregates list
1230            let agg_idx = aggregates.iter().position(|a| a == aggregate);
1231            let Some(idx) = agg_idx else {
1232                return false;
1233            };
1234            let Some(agg_value) = agg_values.get(idx) else {
1235                return false;
1236            };
1237
1238            // Compare using the specified operator
1239            match op {
1240                crate::parser::HavingOp::Eq => agg_value == value,
1241                crate::parser::HavingOp::Lt => agg_value.compare(value) == Some(Ordering::Less),
1242                crate::parser::HavingOp::Le => matches!(
1243                    agg_value.compare(value),
1244                    Some(Ordering::Less | Ordering::Equal)
1245                ),
1246                crate::parser::HavingOp::Gt => agg_value.compare(value) == Some(Ordering::Greater),
1247                crate::parser::HavingOp::Ge => matches!(
1248                    agg_value.compare(value),
1249                    Some(Ordering::Greater | Ordering::Equal)
1250                ),
1251            }
1252        }
1253    })
1254}
1255
1256/// State for computing aggregates over a group of rows.
1257#[derive(Debug, Clone)]
1258struct AggregateState {
1259    count: i64,
1260    /// Per-aggregate row count, used by `COUNT(*) FILTER (WHERE ...)` so that
1261    /// the result reflects only rows matching that aggregate's filter.
1262    /// Identical to `count` when no filters are present.
1263    per_agg_counts: Vec<i64>,
1264    non_null_counts: Vec<i64>, // For COUNT(col) - tracks non-NULL values per aggregate
1265    sums: Vec<Option<Value>>,
1266    mins: Vec<Option<Value>>,
1267    maxs: Vec<Option<Value>>,
1268}
1269
1270impl AggregateState {
1271    fn new() -> Self {
1272        Self {
1273            count: 0,
1274            per_agg_counts: Vec::new(),
1275            non_null_counts: Vec::new(),
1276            sums: Vec::new(),
1277            mins: Vec::new(),
1278            maxs: Vec::new(),
1279        }
1280    }
1281
1282    fn update(
1283        &mut self,
1284        row: &[Value],
1285        aggregates: &[crate::parser::AggregateFunction],
1286        aggregate_filters: &[Option<crate::plan::Filter>],
1287        metadata: &crate::plan::TableMetadata,
1288    ) -> Result<()> {
1289        // Precondition: row must have at least one column
1290        debug_assert!(!row.is_empty(), "row must have at least one column");
1291
1292        // Precondition: enforce maximum aggregates limit to prevent DoS
1293        // Note: aggregates can be empty for DISTINCT queries (deduplication only)
1294        assert!(
1295            aggregates.len() <= MAX_AGGREGATES_PER_QUERY,
1296            "too many aggregates ({} > {})",
1297            aggregates.len(),
1298            MAX_AGGREGATES_PER_QUERY
1299        );
1300
1301        // CountStar's count is per-aggregate when filters are involved, so we
1302        // track it inside the per-aggregate loop below rather than once here.
1303        let any_filter = aggregate_filters.iter().any(std::option::Option::is_some);
1304        if !any_filter {
1305            self.count += 1;
1306        }
1307
1308        // Ensure vectors are sized
1309        while self.sums.len() < aggregates.len() {
1310            self.non_null_counts.push(0);
1311            self.sums.push(None);
1312            self.mins.push(None);
1313            self.maxs.push(None);
1314            self.per_agg_counts.push(0);
1315        }
1316
1317        // Invariant: all vectors must be same length after sizing
1318        debug_assert_eq!(
1319            self.sums.len(),
1320            self.non_null_counts.len(),
1321            "aggregate state vectors out of sync"
1322        );
1323        debug_assert_eq!(self.sums.len(), self.mins.len());
1324        debug_assert_eq!(self.sums.len(), self.maxs.len());
1325
1326        // Helper to find column index
1327        let find_col_idx = |col: &ColumnName| -> usize {
1328            metadata
1329                .columns
1330                .iter()
1331                .position(|c| &c.name == col)
1332                .unwrap_or(0)
1333        };
1334
1335        for (i, agg) in aggregates.iter().enumerate() {
1336            // Per-aggregate FILTER (WHERE ...): skip this aggregate for this
1337            // row if the filter rejects it. The aggregate sees only the rows
1338            // matching its own filter; other aggregates are independent.
1339            if let Some(Some(filter)) = aggregate_filters.get(i) {
1340                if !filter.matches(row) {
1341                    continue;
1342                }
1343            }
1344            // Track per-aggregate row count so CountStar with FILTER produces
1345            // the per-aggregate count rather than the group total.
1346            self.per_agg_counts[i] += 1;
1347            match agg {
1348                crate::parser::AggregateFunction::CountStar => {
1349                    // Counted above (either globally or per-aggregate).
1350                }
1351                crate::parser::AggregateFunction::Count(col) => {
1352                    // COUNT(col) counts non-NULL values
1353                    let col_idx = find_col_idx(col);
1354                    if let Some(val) = row.get(col_idx) {
1355                        if !val.is_null() {
1356                            self.non_null_counts[i] += 1;
1357                        }
1358                    }
1359                }
1360                crate::parser::AggregateFunction::Sum(col) => {
1361                    let col_idx = find_col_idx(col);
1362                    if let Some(val) = row.get(col_idx) {
1363                        if !val.is_null() {
1364                            self.sums[i] = Some(add_values(&self.sums[i], val)?);
1365                        }
1366                    }
1367                }
1368                crate::parser::AggregateFunction::Avg(col) => {
1369                    // AVG = SUM / COUNT - compute sum here
1370                    let col_idx = find_col_idx(col);
1371                    if let Some(val) = row.get(col_idx) {
1372                        if !val.is_null() {
1373                            self.sums[i] = Some(add_values(&self.sums[i], val)?);
1374                        }
1375                    }
1376                }
1377                crate::parser::AggregateFunction::Min(col) => {
1378                    let col_idx = find_col_idx(col);
1379                    if let Some(val) = row.get(col_idx) {
1380                        if !val.is_null() {
1381                            self.mins[i] = Some(min_value(&self.mins[i], val));
1382                        }
1383                    }
1384                }
1385                crate::parser::AggregateFunction::Max(col) => {
1386                    let col_idx = find_col_idx(col);
1387                    if let Some(val) = row.get(col_idx) {
1388                        if !val.is_null() {
1389                            self.maxs[i] = Some(max_value(&self.maxs[i], val));
1390                        }
1391                    }
1392                }
1393            }
1394        }
1395
1396        // Postcondition: state must match aggregate count after update
1397        debug_assert_eq!(
1398            self.sums.len(),
1399            aggregates.len(),
1400            "aggregate state must match aggregate count after update"
1401        );
1402
1403        Ok(())
1404    }
1405
1406    fn finalize(&self, aggregates: &[crate::parser::AggregateFunction]) -> Vec<Value> {
1407        let mut result = Vec::new();
1408
1409        // For COUNT(*), prefer the per-aggregate count when it differs from
1410        // the global count (which means a FILTER (WHERE ...) is in play).
1411        // Otherwise the per-aggregate count is identical to the global count
1412        // when no filter is present (we keep both writes in sync in `update`).
1413        for (i, agg) in aggregates.iter().enumerate() {
1414            let per_agg_count = self.per_agg_counts.get(i).copied().unwrap_or(self.count);
1415            let value = match agg {
1416                crate::parser::AggregateFunction::CountStar => Value::BigInt(per_agg_count),
1417                crate::parser::AggregateFunction::Count(_) => {
1418                    // Use non-NULL count for COUNT(col)
1419                    Value::BigInt(self.non_null_counts.get(i).copied().unwrap_or(0))
1420                }
1421                crate::parser::AggregateFunction::Sum(_) => self
1422                    .sums
1423                    .get(i)
1424                    .and_then(std::clone::Clone::clone)
1425                    .unwrap_or(Value::Null),
1426                crate::parser::AggregateFunction::Avg(_) => {
1427                    // AVG = SUM / per-aggregate COUNT (so FILTER affects denominator).
1428                    if per_agg_count == 0 {
1429                        Value::Null
1430                    } else {
1431                        // NEVER: per-aggregate-count guard above must prevent
1432                        // division-by-zero from ever reaching divide_value.
1433                        kimberlite_properties::never!(
1434                            per_agg_count == 0,
1435                            "query.avg_divide_by_zero",
1436                            "AVG divide_value must never be reached with per_agg_count == 0"
1437                        );
1438                        match self.sums.get(i).and_then(|v| v.as_ref()) {
1439                            Some(sum) => divide_value(sum, per_agg_count).unwrap_or(Value::Null),
1440                            None => Value::Null,
1441                        }
1442                    }
1443                }
1444                crate::parser::AggregateFunction::Min(_) => self
1445                    .mins
1446                    .get(i)
1447                    .and_then(std::clone::Clone::clone)
1448                    .unwrap_or(Value::Null),
1449                crate::parser::AggregateFunction::Max(_) => self
1450                    .maxs
1451                    .get(i)
1452                    .and_then(std::clone::Clone::clone)
1453                    .unwrap_or(Value::Null),
1454            };
1455            result.push(value);
1456        }
1457
1458        result
1459    }
1460}
1461
1462/// Adds two values for SUM aggregates.
1463///
1464/// Uses checked arithmetic to detect integer overflow and return an error
1465/// rather than silently producing a wrapped/incorrect result.
1466fn add_values(a: &Option<Value>, b: &Value) -> Result<Value> {
1467    match a {
1468        None => Ok(b.clone()),
1469        Some(a_val) => match (a_val, b) {
1470            (Value::BigInt(x), Value::BigInt(y)) => {
1471                let checked = x.checked_add(*y);
1472                // SOMETIMES: exercise the overflow-detection path so we know the
1473                // guard is reachable under simulation. NEVER below guarantees a
1474                // Some() result is a true non-overflowing sum.
1475                kimberlite_properties::sometimes!(
1476                    checked.is_none(),
1477                    "query.sum_bigint_overflow_detected",
1478                    "SUM(BIGINT) overflow detected by checked_add"
1479                );
1480                if let Some(sum) = checked {
1481                    // NEVER: a surviving sum must equal wrapping_add with no wrap
1482                    // — i.e. checked_add only returns Some() for in-range results.
1483                    kimberlite_properties::never!(
1484                        sum != x.wrapping_add(*y)
1485                            || (*x > 0 && *y > 0 && sum < 0)
1486                            || (*x < 0 && *y < 0 && sum > 0),
1487                        "query.sum_bigint_silent_wrap",
1488                        "SUM(BIGINT) checked_add returned Some() for an overflowing result"
1489                    );
1490                    Ok(Value::BigInt(sum))
1491                } else {
1492                    Err(QueryError::TypeMismatch {
1493                        expected: "BigInt (non-overflowing)".to_string(),
1494                        actual: format!("overflow: {x} + {y}"),
1495                    })
1496                }
1497            }
1498            (Value::Integer(x), Value::Integer(y)) => x
1499                .checked_add(*y)
1500                .map(Value::Integer)
1501                .ok_or_else(|| QueryError::TypeMismatch {
1502                    expected: "Integer (non-overflowing)".to_string(),
1503                    actual: format!("overflow: {x} + {y}"),
1504                }),
1505            (Value::SmallInt(x), Value::SmallInt(y)) => x
1506                .checked_add(*y)
1507                .map(Value::SmallInt)
1508                .ok_or_else(|| QueryError::TypeMismatch {
1509                    expected: "SmallInt (non-overflowing)".to_string(),
1510                    actual: format!("overflow: {x} + {y}"),
1511                }),
1512            (Value::TinyInt(x), Value::TinyInt(y)) => x
1513                .checked_add(*y)
1514                .map(Value::TinyInt)
1515                .ok_or_else(|| QueryError::TypeMismatch {
1516                    expected: "TinyInt (non-overflowing)".to_string(),
1517                    actual: format!("overflow: {x} + {y}"),
1518                }),
1519            (Value::Real(x), Value::Real(y)) => Ok(Value::Real(x + y)),
1520            (Value::Decimal(x, sx), Value::Decimal(y, sy)) if sx == sy => x
1521                .checked_add(*y)
1522                .map(|sum| Value::Decimal(sum, *sx))
1523                .ok_or_else(|| QueryError::TypeMismatch {
1524                    expected: "Decimal (non-overflowing)".to_string(),
1525                    actual: format!("overflow: {x} + {y}"),
1526                }),
1527            _ => Err(QueryError::TypeMismatch {
1528                expected: format!("{a_val:?}"),
1529                actual: format!("{b:?}"),
1530            }),
1531        },
1532    }
1533}
1534
1535/// Returns the minimum of two values.
1536fn min_value(a: &Option<Value>, b: &Value) -> Value {
1537    match a {
1538        None => b.clone(),
1539        Some(a_val) => {
1540            if let Some(ord) = a_val.compare(b) {
1541                if ord == Ordering::Less {
1542                    a_val.clone()
1543                } else {
1544                    b.clone()
1545                }
1546            } else {
1547                a_val.clone() // Incomparable types, keep current
1548            }
1549        }
1550    }
1551}
1552
1553/// Returns the maximum of two values.
1554fn max_value(a: &Option<Value>, b: &Value) -> Value {
1555    match a {
1556        None => b.clone(),
1557        Some(a_val) => {
1558            if let Some(ord) = a_val.compare(b) {
1559                if ord == Ordering::Greater {
1560                    a_val.clone()
1561                } else {
1562                    b.clone()
1563                }
1564            } else {
1565                a_val.clone() // Incomparable types, keep current
1566            }
1567        }
1568    }
1569}
1570
1571/// Divides a value by a count for AVG aggregates.
1572///
1573/// Returns `Some(Value::Null)` when `count == 0` to match SQL semantics:
1574/// `AVG` over an empty set is `NULL`.
1575#[allow(clippy::cast_precision_loss)]
1576fn divide_value(val: &Value, count: i64) -> Option<Value> {
1577    // Guard against division-by-zero.  SQL defines AVG() of an empty set as NULL.
1578    if count == 0 {
1579        return Some(Value::Null);
1580    }
1581
1582    match val {
1583        Value::BigInt(x) => Some(Value::Real(*x as f64 / count as f64)),
1584        Value::Integer(x) => Some(Value::Real(f64::from(*x) / count as f64)),
1585        Value::SmallInt(x) => Some(Value::Real(f64::from(*x) / count as f64)),
1586        Value::TinyInt(x) => Some(Value::Real(f64::from(*x) / count as f64)),
1587        Value::Real(x) => Some(Value::Real(x / count as f64)),
1588        Value::Decimal(x, scale) => {
1589            // Convert to float for division
1590            let divisor = 10_i128.pow(u32::from(*scale));
1591            let float_val = *x as f64 / divisor as f64;
1592            Some(Value::Real(float_val / count as f64))
1593        }
1594        _ => None,
1595    }
1596}
1597
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plan::Filter;
    use crate::plan::FilterCondition;
    use crate::plan::FilterOp;

    #[test]
    fn test_project_row() {
        let row = vec![
            Value::BigInt(1),
            Value::Text("alice".to_string()),
            Value::BigInt(30),
        ];

        let projected = project_row(&row, &[0, 2]);
        assert_eq!(projected, vec![Value::BigInt(1), Value::BigInt(30)]);
    }

    #[test]
    fn test_project_row_all() {
        let row = vec![Value::BigInt(1), Value::Text("bob".to_string())];
        let projected = project_row(&row, &[]);
        assert_eq!(projected, row);
    }

    #[test]
    fn test_filter_matches() {
        let row = vec![Value::BigInt(42), Value::Text("alice".to_string())];

        let filter = Filter::single(FilterCondition {
            column_idx: 0,
            op: FilterOp::Eq,
            value: Value::BigInt(42),
        });

        assert!(filter.matches(&row));

        let filter_miss = Filter::single(FilterCondition {
            column_idx: 0,
            op: FilterOp::Eq,
            value: Value::BigInt(99),
        });

        assert!(!filter_miss.matches(&row));
    }

    #[test]
    fn test_sort_rows() {
        let mut rows = vec![
            vec![Value::BigInt(3), Value::Text("c".to_string())],
            vec![Value::BigInt(1), Value::Text("a".to_string())],
            vec![Value::BigInt(2), Value::Text("b".to_string())],
        ];

        let spec = SortSpec {
            columns: vec![(0, ScanOrder::Ascending)],
        };

        sort_rows(&mut rows, &spec);

        assert_eq!(rows[0][0], Value::BigInt(1));
        assert_eq!(rows[1][0], Value::BigInt(2));
        assert_eq!(rows[2][0], Value::BigInt(3));
    }

    #[test]
    fn test_sort_rows_descending() {
        let mut rows = vec![
            vec![Value::BigInt(1)],
            vec![Value::BigInt(3)],
            vec![Value::BigInt(2)],
        ];

        let spec = SortSpec {
            columns: vec![(0, ScanOrder::Descending)],
        };

        sort_rows(&mut rows, &spec);

        assert_eq!(rows[0][0], Value::BigInt(3));
        assert_eq!(rows[1][0], Value::BigInt(2));
        assert_eq!(rows[2][0], Value::BigInt(1));
    }

    #[test]
    fn test_add_values_seeds_and_accumulates() {
        // The first non-NULL input becomes the running total.
        assert!(matches!(
            add_values(&None, &Value::BigInt(7)),
            Ok(Value::BigInt(7))
        ));
        assert!(matches!(
            add_values(&Some(Value::BigInt(40)), &Value::BigInt(2)),
            Ok(Value::BigInt(42))
        ));
        assert!(matches!(
            add_values(&Some(Value::Real(1.5)), &Value::Real(2.0)),
            Ok(Value::Real(x)) if x == 3.5
        ));
    }

    #[test]
    fn test_add_values_bigint_overflow_is_error() {
        // Overflow must surface as an error, never as a wrapped total.
        assert!(add_values(&Some(Value::BigInt(i64::MAX)), &Value::BigInt(1)).is_err());
        assert!(add_values(&Some(Value::BigInt(i64::MIN)), &Value::BigInt(-1)).is_err());
    }

    #[test]
    fn test_add_values_type_mismatch_is_error() {
        assert!(add_values(&Some(Value::BigInt(1)), &Value::Text("x".to_string())).is_err());
    }

    #[test]
    fn test_min_max_value() {
        assert_eq!(min_value(&None, &Value::BigInt(5)), Value::BigInt(5));
        assert_eq!(min_value(&Some(Value::BigInt(3)), &Value::BigInt(1)), Value::BigInt(1));
        assert_eq!(min_value(&Some(Value::BigInt(1)), &Value::BigInt(3)), Value::BigInt(1));
        assert_eq!(max_value(&None, &Value::BigInt(5)), Value::BigInt(5));
        assert_eq!(max_value(&Some(Value::BigInt(3)), &Value::BigInt(1)), Value::BigInt(3));
        assert_eq!(max_value(&Some(Value::BigInt(1)), &Value::BigInt(3)), Value::BigInt(3));
    }

    #[test]
    fn test_divide_value() {
        assert_eq!(divide_value(&Value::BigInt(10), 4), Some(Value::Real(2.5)));
        // AVG over an empty set is NULL, never a division by zero.
        assert_eq!(divide_value(&Value::BigInt(10), 0), Some(Value::Null));
        // Non-numeric inputs cannot be averaged.
        assert_eq!(divide_value(&Value::Text("x".to_string()), 2), None);
    }
}