llkv_executor/
lib.rs

1use arrow::array::{Array, ArrayRef, Int64Builder, RecordBatch};
2use arrow::datatypes::{DataType, Schema};
3use llkv_aggregate::{AggregateAccumulator, AggregateKind, AggregateSpec, AggregateState};
4use llkv_column_map::store::Projection as StoreProjection;
5use llkv_column_map::types::LogicalFieldId;
6use llkv_expr::expr::{Expr as LlkvExpr, Filter, Operator, ScalarExpr};
7use llkv_plan::{
8    AggregateExpr, AggregateFunction, OrderByPlan, OrderSortType, OrderTarget, PlanValue,
9    SelectPlan, SelectProjection,
10};
11use llkv_result::Error;
12use llkv_storage::pager::Pager;
13use llkv_table::table::{
14    RowIdFilter, ScanOrderDirection, ScanOrderSpec, ScanOrderTransform, ScanProjection,
15    ScanStreamOptions,
16};
17use llkv_table::types::FieldId;
18use rustc_hash::FxHashMap;
19use simd_r_drive_entry_handle::EntryHandle;
20use std::fmt;
21use std::ops::Bound;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicU64, Ordering};
24
25pub type ExecutorResult<T> = Result<T, Error>;
26
27mod projections;
28mod schema;
29pub use projections::{build_projected_columns, build_wildcard_projections};
30pub use schema::schema_for_projections;
31
32/// Trait for providing table access to the executor.
33pub trait TableProvider<P>
34where
35    P: Pager<Blob = EntryHandle> + Send + Sync,
36{
37    fn get_table(&self, canonical_name: &str) -> ExecutorResult<Arc<ExecutorTable<P>>>;
38}
39
40/// Query executor that executes SELECT plans.
41pub struct QueryExecutor<P>
42where
43    P: Pager<Blob = EntryHandle> + Send + Sync,
44{
45    provider: Arc<dyn TableProvider<P>>,
46}
47
48impl<P> QueryExecutor<P>
49where
50    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
51{
52    pub fn new(provider: Arc<dyn TableProvider<P>>) -> Self {
53        Self { provider }
54    }
55
56    pub fn execute_select(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
57        self.execute_select_with_filter(plan, None)
58    }
59
60    pub fn execute_select_with_filter(
61        &self,
62        plan: SelectPlan,
63        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
64    ) -> ExecutorResult<SelectExecution<P>> {
65        let table = self.provider.get_table(&plan.table)?;
66        let display_name = plan.table.clone();
67
68        if !plan.aggregates.is_empty() {
69            self.execute_aggregates(table, display_name, plan, row_filter)
70        } else if self.has_computed_aggregates(&plan) {
71            // Handle computed projections that contain embedded aggregates
72            self.execute_computed_aggregates(table, display_name, plan, row_filter)
73        } else {
74            self.execute_projection(table, display_name, plan, row_filter)
75        }
76    }
77
78    /// Check if any computed projections contain aggregate functions
79    fn has_computed_aggregates(&self, plan: &SelectPlan) -> bool {
80        plan.projections.iter().any(|proj| {
81            if let SelectProjection::Computed { expr, .. } = proj {
82                Self::expr_contains_aggregate(expr)
83            } else {
84                false
85            }
86        })
87    }
88
89    /// Recursively check if a scalar expression contains aggregates
90    fn expr_contains_aggregate(expr: &ScalarExpr<String>) -> bool {
91        match expr {
92            ScalarExpr::Aggregate(_) => true,
93            ScalarExpr::Binary { left, right, .. } => {
94                Self::expr_contains_aggregate(left) || Self::expr_contains_aggregate(right)
95            }
96            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => false,
97        }
98    }
99
100    fn execute_projection(
101        &self,
102        table: Arc<ExecutorTable<P>>,
103        display_name: String,
104        plan: SelectPlan,
105        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
106    ) -> ExecutorResult<SelectExecution<P>> {
107        let table_ref = table.as_ref();
108        let projections = if plan.projections.is_empty() {
109            build_wildcard_projections(table_ref)
110        } else {
111            build_projected_columns(table_ref, &plan.projections)?
112        };
113        let schema = schema_for_projections(table_ref, &projections)?;
114
115        let (filter_expr, full_table_scan) = match plan.filter {
116            Some(expr) => (translate_predicate(expr, table_ref.schema.as_ref())?, false),
117            None => {
118                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
119                    Error::InvalidArgumentError(
120                        "table has no columns; cannot perform wildcard scan".into(),
121                    )
122                })?;
123                (full_table_scan_filter(field_id), true)
124            }
125        };
126
127        let options = if let Some(order_plan) = &plan.order_by {
128            let order_spec = resolve_scan_order(table_ref, &projections, order_plan)?;
129            if row_filter.is_some() {
130                tracing::debug!("Applying MVCC row filter with ORDER BY");
131            }
132            ScanStreamOptions {
133                include_nulls: true,
134                order: Some(order_spec),
135                row_id_filter: row_filter.clone(),
136            }
137        } else {
138            if row_filter.is_some() {
139                tracing::debug!("Applying MVCC row filter");
140            }
141            ScanStreamOptions {
142                include_nulls: true,
143                order: None,
144                row_id_filter: row_filter.clone(),
145            }
146        };
147
148        Ok(SelectExecution::new_projection(
149            display_name,
150            schema,
151            table,
152            projections,
153            filter_expr,
154            options,
155            full_table_scan,
156        ))
157    }
158
159    fn execute_aggregates(
160        &self,
161        table: Arc<ExecutorTable<P>>,
162        display_name: String,
163        plan: SelectPlan,
164        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
165    ) -> ExecutorResult<SelectExecution<P>> {
166        let table_ref = table.as_ref();
167        let mut specs: Vec<AggregateSpec> = Vec::with_capacity(plan.aggregates.len());
168        for aggregate in plan.aggregates {
169            match aggregate {
170                AggregateExpr::CountStar { alias } => {
171                    specs.push(AggregateSpec {
172                        alias,
173                        kind: AggregateKind::CountStar,
174                    });
175                }
176                AggregateExpr::Column {
177                    column,
178                    alias,
179                    function,
180                } => {
181                    let col = table_ref.schema.resolve(&column).ok_or_else(|| {
182                        Error::InvalidArgumentError(format!(
183                            "unknown column '{}' in aggregate",
184                            column
185                        ))
186                    })?;
187                    let kind = match function {
188                        AggregateFunction::Count => AggregateKind::CountField {
189                            field_id: col.field_id,
190                        },
191                        AggregateFunction::SumInt64 => {
192                            if col.data_type != DataType::Int64 {
193                                return Err(Error::InvalidArgumentError(
194                                    "SUM currently supports only INTEGER columns".into(),
195                                ));
196                            }
197                            AggregateKind::SumInt64 {
198                                field_id: col.field_id,
199                            }
200                        }
201                        AggregateFunction::MinInt64 => {
202                            if col.data_type != DataType::Int64 {
203                                return Err(Error::InvalidArgumentError(
204                                    "MIN currently supports only INTEGER columns".into(),
205                                ));
206                            }
207                            AggregateKind::MinInt64 {
208                                field_id: col.field_id,
209                            }
210                        }
211                        AggregateFunction::MaxInt64 => {
212                            if col.data_type != DataType::Int64 {
213                                return Err(Error::InvalidArgumentError(
214                                    "MAX currently supports only INTEGER columns".into(),
215                                ));
216                            }
217                            AggregateKind::MaxInt64 {
218                                field_id: col.field_id,
219                            }
220                        }
221                        AggregateFunction::CountNulls => AggregateKind::CountNulls {
222                            field_id: col.field_id,
223                        },
224                    };
225                    specs.push(AggregateSpec { alias, kind });
226                }
227            }
228        }
229
230        if specs.is_empty() {
231            return Err(Error::InvalidArgumentError(
232                "aggregate query requires at least one aggregate expression".into(),
233            ));
234        }
235
236        let had_filter = plan.filter.is_some();
237        let filter_expr = match plan.filter {
238            Some(expr) => translate_predicate(expr, table.schema.as_ref())?,
239            None => {
240                let field_id = table.schema.first_field_id().ok_or_else(|| {
241                    Error::InvalidArgumentError(
242                        "table has no columns; cannot perform aggregate scan".into(),
243                    )
244                })?;
245                full_table_scan_filter(field_id)
246            }
247        };
248
249        // Build projections and track which projection index each spec uses
250        let mut projections = Vec::new();
251        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());
252
253        for spec in &specs {
254            if let Some(field_id) = spec.kind.field_id() {
255                let proj_idx = projections.len();
256                spec_to_projection.push(Some(proj_idx));
257                projections.push(ScanProjection::from(StoreProjection::with_alias(
258                    LogicalFieldId::for_user(table.table.table_id(), field_id),
259                    table
260                        .schema
261                        .column_by_field_id(field_id)
262                        .map(|c| c.name.clone())
263                        .unwrap_or_else(|| format!("col{field_id}")),
264                )));
265            } else {
266                spec_to_projection.push(None);
267            }
268        }
269
270        if projections.is_empty() {
271            let field_id = table.schema.first_field_id().ok_or_else(|| {
272                Error::InvalidArgumentError(
273                    "table has no columns; cannot perform aggregate scan".into(),
274                )
275            })?;
276            projections.push(ScanProjection::from(StoreProjection::with_alias(
277                LogicalFieldId::for_user(table.table.table_id(), field_id),
278                table
279                    .schema
280                    .column_by_field_id(field_id)
281                    .map(|c| c.name.clone())
282                    .unwrap_or_else(|| format!("col{field_id}")),
283            )));
284        }
285
286        let options = ScanStreamOptions {
287            include_nulls: true,
288            order: None,
289            row_id_filter: row_filter.clone(),
290        };
291
292        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
293        // MVCC Note: We cannot use the total_rows shortcut when MVCC visibility filtering
294        // is enabled, because some rows may be invisible due to uncommitted or aborted transactions.
295        // Always scan to apply proper visibility rules.
296        let mut count_star_override: Option<i64> = None;
297        if !had_filter && row_filter.is_none() {
298            // Only use shortcut if no filter AND no MVCC row filtering
299            let total_rows = table.total_rows.load(Ordering::SeqCst);
300            tracing::debug!(
301                "[AGGREGATE] Using COUNT(*) shortcut: total_rows={}",
302                total_rows
303            );
304            if total_rows > i64::MAX as u64 {
305                return Err(Error::InvalidArgumentError(
306                    "COUNT(*) result exceeds supported range".into(),
307                ));
308            }
309            count_star_override = Some(total_rows as i64);
310        } else {
311            tracing::debug!(
312                "[AGGREGATE] NOT using COUNT(*) shortcut: had_filter={}, has_row_filter={}",
313                had_filter,
314                row_filter.is_some()
315            );
316        }
317
318        for (idx, spec) in specs.iter().enumerate() {
319            states.push(AggregateState {
320                alias: spec.alias.clone(),
321                accumulator: AggregateAccumulator::new_with_projection_index(
322                    spec,
323                    spec_to_projection[idx],
324                    count_star_override,
325                )?,
326                override_value: match spec.kind {
327                    AggregateKind::CountStar => {
328                        tracing::debug!(
329                            "[AGGREGATE] CountStar override_value={:?}",
330                            count_star_override
331                        );
332                        count_star_override
333                    }
334                    _ => None,
335                },
336            });
337        }
338
339        let mut error: Option<Error> = None;
340        match table.table.scan_stream(
341            projections,
342            &filter_expr,
343            ScanStreamOptions {
344                row_id_filter: row_filter.clone(),
345                ..options
346            },
347            |batch| {
348                if error.is_some() {
349                    return;
350                }
351                for state in &mut states {
352                    if let Err(err) = state.update(&batch) {
353                        error = Some(err);
354                        return;
355                    }
356                }
357            },
358        ) {
359            Ok(()) => {}
360            Err(llkv_result::Error::NotFound) => {
361                // Treat missing storage keys as an empty result set. This occurs
362                // for freshly created tables that have no persisted chunks yet.
363            }
364            Err(err) => return Err(err),
365        }
366        if let Some(err) = error {
367            return Err(err);
368        }
369
370        let mut fields = Vec::with_capacity(states.len());
371        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(states.len());
372        for state in states {
373            let (field, array) = state.finalize()?;
374            fields.push(field);
375            arrays.push(array);
376        }
377
378        let schema = Arc::new(Schema::new(fields));
379        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
380        Ok(SelectExecution::new_single_batch(
381            display_name,
382            schema,
383            batch,
384        ))
385    }
386
387    /// Execute a query where computed projections contain embedded aggregates
388    /// This extracts aggregates, computes them, then evaluates the scalar expressions
389    fn execute_computed_aggregates(
390        &self,
391        table: Arc<ExecutorTable<P>>,
392        display_name: String,
393        plan: SelectPlan,
394        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
395    ) -> ExecutorResult<SelectExecution<P>> {
396        use arrow::array::Int64Array;
397        use llkv_expr::expr::AggregateCall;
398
399        let table_ref = table.as_ref();
400
401        // First, extract all unique aggregates from the projections
402        let mut aggregate_specs: Vec<(String, AggregateCall<String>)> = Vec::new();
403        for proj in &plan.projections {
404            if let SelectProjection::Computed { expr, .. } = proj {
405                Self::collect_aggregates(expr, &mut aggregate_specs);
406            }
407        }
408
409        // Compute the aggregates using the existing aggregate execution infrastructure
410        let computed_aggregates = self.compute_aggregate_values(
411            table.clone(),
412            &plan.filter,
413            &aggregate_specs,
414            row_filter.clone(),
415        )?;
416
417        // Now build the final projections by evaluating expressions with aggregates substituted
418        let mut fields = Vec::with_capacity(plan.projections.len());
419        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(plan.projections.len());
420
421        for proj in &plan.projections {
422            match proj {
423                SelectProjection::AllColumns => {
424                    return Err(Error::InvalidArgumentError(
425                        "AllColumns projection not supported with computed aggregates".into(),
426                    ));
427                }
428                SelectProjection::Column { name, alias } => {
429                    let col = table_ref.schema.resolve(name).ok_or_else(|| {
430                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
431                    })?;
432                    let field_name = alias.as_ref().unwrap_or(name);
433                    fields.push(arrow::datatypes::Field::new(
434                        field_name,
435                        col.data_type.clone(),
436                        col.nullable,
437                    ));
438                    // For regular columns in an aggregate query, we'd need to handle GROUP BY
439                    // For now, return an error as this is not supported
440                    return Err(Error::InvalidArgumentError(
441                        "Regular columns not supported in aggregate queries without GROUP BY"
442                            .into(),
443                    ));
444                }
445                SelectProjection::Computed { expr, alias } => {
446                    // Evaluate the expression with aggregates substituted
447                    let value = Self::evaluate_expr_with_aggregates(expr, &computed_aggregates)?;
448
449                    fields.push(arrow::datatypes::Field::new(alias, DataType::Int64, false));
450
451                    let array = Arc::new(Int64Array::from(vec![value])) as ArrayRef;
452                    arrays.push(array);
453                }
454            }
455        }
456
457        let schema = Arc::new(Schema::new(fields));
458        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
459        Ok(SelectExecution::new_single_batch(
460            display_name,
461            schema,
462            batch,
463        ))
464    }
465
466    /// Collect all aggregate calls from an expression
467    fn collect_aggregates(
468        expr: &ScalarExpr<String>,
469        aggregates: &mut Vec<(String, llkv_expr::expr::AggregateCall<String>)>,
470    ) {
471        match expr {
472            ScalarExpr::Aggregate(agg) => {
473                // Create a unique key for this aggregate
474                let key = format!("{:?}", agg);
475                if !aggregates.iter().any(|(k, _)| k == &key) {
476                    aggregates.push((key, agg.clone()));
477                }
478            }
479            ScalarExpr::Binary { left, right, .. } => {
480                Self::collect_aggregates(left, aggregates);
481                Self::collect_aggregates(right, aggregates);
482            }
483            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => {}
484        }
485    }
486
487    /// Compute the actual values for the aggregates
488    fn compute_aggregate_values(
489        &self,
490        table: Arc<ExecutorTable<P>>,
491        filter: &Option<llkv_expr::expr::Expr<'static, String>>,
492        aggregate_specs: &[(String, llkv_expr::expr::AggregateCall<String>)],
493        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
494    ) -> ExecutorResult<FxHashMap<String, i64>> {
495        use llkv_expr::expr::AggregateCall;
496
497        let table_ref = table.as_ref();
498        let mut results =
499            FxHashMap::with_capacity_and_hasher(aggregate_specs.len(), Default::default());
500
501        // Build aggregate specs for the aggregator
502        let mut specs: Vec<AggregateSpec> = Vec::new();
503        for (key, agg) in aggregate_specs {
504            let kind = match agg {
505                AggregateCall::CountStar => AggregateKind::CountStar,
506                AggregateCall::Count(col_name) => {
507                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
508                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
509                    })?;
510                    AggregateKind::CountField {
511                        field_id: col.field_id,
512                    }
513                }
514                AggregateCall::Sum(col_name) => {
515                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
516                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
517                    })?;
518                    AggregateKind::SumInt64 {
519                        field_id: col.field_id,
520                    }
521                }
522                AggregateCall::Min(col_name) => {
523                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
524                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
525                    })?;
526                    AggregateKind::MinInt64 {
527                        field_id: col.field_id,
528                    }
529                }
530                AggregateCall::Max(col_name) => {
531                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
532                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
533                    })?;
534                    AggregateKind::MaxInt64 {
535                        field_id: col.field_id,
536                    }
537                }
538                AggregateCall::CountNulls(col_name) => {
539                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
540                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
541                    })?;
542                    AggregateKind::CountNulls {
543                        field_id: col.field_id,
544                    }
545                }
546            };
547            specs.push(AggregateSpec {
548                alias: key.clone(),
549                kind,
550            });
551        }
552
553        // Prepare filter and projections
554        let filter_expr = match filter {
555            Some(expr) => translate_predicate(expr.clone(), table_ref.schema.as_ref())?,
556            None => {
557                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
558                    Error::InvalidArgumentError(
559                        "table has no columns; cannot perform aggregate scan".into(),
560                    )
561                })?;
562                full_table_scan_filter(field_id)
563            }
564        };
565
566        let mut projections: Vec<ScanProjection> = Vec::new();
567        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());
568        let count_star_override: Option<i64> = None;
569
570        for spec in &specs {
571            if let Some(field_id) = spec.kind.field_id() {
572                spec_to_projection.push(Some(projections.len()));
573                projections.push(ScanProjection::from(StoreProjection::with_alias(
574                    LogicalFieldId::for_user(table.table.table_id(), field_id),
575                    table
576                        .schema
577                        .column_by_field_id(field_id)
578                        .map(|c| c.name.clone())
579                        .unwrap_or_else(|| format!("col{field_id}")),
580                )));
581            } else {
582                spec_to_projection.push(None);
583            }
584        }
585
586        if projections.is_empty() {
587            let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
588                Error::InvalidArgumentError(
589                    "table has no columns; cannot perform aggregate scan".into(),
590                )
591            })?;
592            projections.push(ScanProjection::from(StoreProjection::with_alias(
593                LogicalFieldId::for_user(table.table.table_id(), field_id),
594                table
595                    .schema
596                    .column_by_field_id(field_id)
597                    .map(|c| c.name.clone())
598                    .unwrap_or_else(|| format!("col{field_id}")),
599            )));
600        }
601
602        let base_options = ScanStreamOptions {
603            include_nulls: true,
604            order: None,
605            row_id_filter: None,
606        };
607
608        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
609        for (idx, spec) in specs.iter().enumerate() {
610            states.push(AggregateState {
611                alias: spec.alias.clone(),
612                accumulator: AggregateAccumulator::new_with_projection_index(
613                    spec,
614                    spec_to_projection[idx],
615                    count_star_override,
616                )?,
617                override_value: match spec.kind {
618                    AggregateKind::CountStar => count_star_override,
619                    _ => None,
620                },
621            });
622        }
623
624        let mut error: Option<Error> = None;
625        match table.table.scan_stream(
626            projections,
627            &filter_expr,
628            ScanStreamOptions {
629                row_id_filter: row_filter.clone(),
630                ..base_options
631            },
632            |batch| {
633                if error.is_some() {
634                    return;
635                }
636                for state in &mut states {
637                    if let Err(err) = state.update(&batch) {
638                        error = Some(err);
639                        return;
640                    }
641                }
642            },
643        ) {
644            Ok(()) => {}
645            Err(llkv_result::Error::NotFound) => {}
646            Err(err) => return Err(err),
647        }
648        if let Some(err) = error {
649            return Err(err);
650        }
651
652        // Extract the computed values
653        for state in states {
654            let alias = state.alias.clone();
655            let (_field, array) = state.finalize()?;
656
657            // Extract the i64 value from the array
658            let int64_array = array
659                .as_any()
660                .downcast_ref::<arrow::array::Int64Array>()
661                .ok_or_else(|| Error::Internal("Expected Int64Array from aggregate".into()))?;
662
663            if int64_array.len() != 1 {
664                return Err(Error::Internal(format!(
665                    "Expected single value from aggregate, got {}",
666                    int64_array.len()
667                )));
668            }
669
670            let value = if int64_array.is_null(0) {
671                0
672            } else {
673                int64_array.value(0)
674            };
675
676            results.insert(alias, value);
677        }
678
679        Ok(results)
680    }
681
682    /// Evaluate an expression by substituting aggregate values
683    fn evaluate_expr_with_aggregates(
684        expr: &ScalarExpr<String>,
685        aggregates: &FxHashMap<String, i64>,
686    ) -> ExecutorResult<i64> {
687        use llkv_expr::expr::BinaryOp;
688        use llkv_expr::literal::Literal;
689
690        match expr {
691            ScalarExpr::Literal(Literal::Integer(v)) => Ok(*v as i64),
692            ScalarExpr::Literal(Literal::Float(v)) => Ok(*v as i64),
693            ScalarExpr::Literal(Literal::String(_)) => Err(Error::InvalidArgumentError(
694                "String literals not supported in aggregate expressions".into(),
695            )),
696            ScalarExpr::Column(_) => Err(Error::InvalidArgumentError(
697                "Column references not supported in aggregate-only expressions".into(),
698            )),
699            ScalarExpr::Aggregate(agg) => {
700                let key = format!("{:?}", agg);
701                aggregates.get(&key).copied().ok_or_else(|| {
702                    Error::Internal(format!("Aggregate value not found for key: {}", key))
703                })
704            }
705            ScalarExpr::Binary { left, op, right } => {
706                let left_val = Self::evaluate_expr_with_aggregates(left, aggregates)?;
707                let right_val = Self::evaluate_expr_with_aggregates(right, aggregates)?;
708
709                let result = match op {
710                    BinaryOp::Add => left_val.checked_add(right_val),
711                    BinaryOp::Subtract => left_val.checked_sub(right_val),
712                    BinaryOp::Multiply => left_val.checked_mul(right_val),
713                    BinaryOp::Divide => {
714                        if right_val == 0 {
715                            return Err(Error::InvalidArgumentError("Division by zero".into()));
716                        }
717                        left_val.checked_div(right_val)
718                    }
719                    BinaryOp::Modulo => {
720                        if right_val == 0 {
721                            return Err(Error::InvalidArgumentError("Modulo by zero".into()));
722                        }
723                        left_val.checked_rem(right_val)
724                    }
725                };
726
727                result.ok_or_else(|| {
728                    Error::InvalidArgumentError("Arithmetic overflow in expression".into())
729                })
730            }
731        }
732    }
733}
734
735/// Streaming execution handle for SELECT queries.
736#[derive(Clone)]
737pub struct SelectExecution<P>
738where
739    P: Pager<Blob = EntryHandle> + Send + Sync,
740{
741    table_name: String,
742    schema: Arc<Schema>,
743    stream: SelectStream<P>,
744}
745
746#[derive(Clone)]
747enum SelectStream<P>
748where
749    P: Pager<Blob = EntryHandle> + Send + Sync,
750{
751    Projection {
752        table: Arc<ExecutorTable<P>>,
753        projections: Vec<ScanProjection>,
754        filter_expr: LlkvExpr<'static, FieldId>,
755        options: ScanStreamOptions<P>,
756        full_table_scan: bool,
757    },
758    Aggregation {
759        batch: RecordBatch,
760    },
761}
762
763impl<P> SelectExecution<P>
764where
765    P: Pager<Blob = EntryHandle> + Send + Sync,
766{
767    fn new_projection(
768        table_name: String,
769        schema: Arc<Schema>,
770        table: Arc<ExecutorTable<P>>,
771        projections: Vec<ScanProjection>,
772        filter_expr: LlkvExpr<'static, FieldId>,
773        options: ScanStreamOptions<P>,
774        full_table_scan: bool,
775    ) -> Self {
776        Self {
777            table_name,
778            schema,
779            stream: SelectStream::Projection {
780                table,
781                projections,
782                filter_expr,
783                options,
784                full_table_scan,
785            },
786        }
787    }
788
789    fn new_single_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
790        Self {
791            table_name,
792            schema,
793            stream: SelectStream::Aggregation { batch },
794        }
795    }
796
797    pub fn from_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
798        Self::new_single_batch(table_name, schema, batch)
799    }
800
801    pub fn table_name(&self) -> &str {
802        &self.table_name
803    }
804
805    pub fn schema(&self) -> Arc<Schema> {
806        Arc::clone(&self.schema)
807    }
808
809    pub fn stream(
810        self,
811        mut on_batch: impl FnMut(RecordBatch) -> ExecutorResult<()>,
812    ) -> ExecutorResult<()> {
813        let schema = Arc::clone(&self.schema);
814        match self.stream {
815            SelectStream::Projection {
816                table,
817                projections,
818                filter_expr,
819                options,
820                full_table_scan,
821            } => {
822                // Early return for empty tables to avoid ColumnStore data_type() errors
823                let total_rows = table.total_rows.load(Ordering::SeqCst);
824                if total_rows == 0 {
825                    // Empty table - return empty result with correct schema
826                    return Ok(());
827                }
828
829                let mut error: Option<Error> = None;
830                let mut produced = false;
831                let mut produced_rows: u64 = 0;
832                let capture_nulls_first = matches!(options.order, Some(spec) if spec.nulls_first);
833                let include_nulls = options.include_nulls;
834                let has_row_id_filter = options.row_id_filter.is_some();
835                let scan_options = options;
836                let mut buffered_batches: Vec<RecordBatch> = Vec::new();
837                table
838                    .table
839                    .scan_stream(projections, &filter_expr, scan_options, |batch| {
840                        if error.is_some() {
841                            return;
842                        }
843                        produced = true;
844                        produced_rows = produced_rows.saturating_add(batch.num_rows() as u64);
845                        if capture_nulls_first {
846                            buffered_batches.push(batch);
847                        } else if let Err(err) = on_batch(batch) {
848                            error = Some(err);
849                        }
850                    })?;
851                if let Some(err) = error {
852                    return Err(err);
853                }
854                if !produced {
855                    if total_rows > 0 {
856                        for batch in synthesize_null_scan(Arc::clone(&schema), total_rows)? {
857                            on_batch(batch)?;
858                        }
859                    }
860                    return Ok(());
861                }
862                let mut null_batches: Vec<RecordBatch> = Vec::new();
863                // Only synthesize null rows if:
864                // 1. include_nulls is true
865                // 2. This is a full table scan
866                // 3. We produced fewer rows than the total
867                // 4. We DON'T have a row_id_filter (e.g., MVCC filter) that intentionally filtered rows
868                if include_nulls
869                    && full_table_scan
870                    && produced_rows < total_rows
871                    && !has_row_id_filter
872                {
873                    let missing = total_rows - produced_rows;
874                    if missing > 0 {
875                        null_batches = synthesize_null_scan(Arc::clone(&schema), missing)?;
876                    }
877                }
878
879                if capture_nulls_first {
880                    for batch in null_batches {
881                        on_batch(batch)?;
882                    }
883                    for batch in buffered_batches {
884                        on_batch(batch)?;
885                    }
886                } else if !null_batches.is_empty() {
887                    for batch in null_batches {
888                        on_batch(batch)?;
889                    }
890                }
891                Ok(())
892            }
893            SelectStream::Aggregation { batch } => on_batch(batch),
894        }
895    }
896
897    pub fn collect(self) -> ExecutorResult<Vec<RecordBatch>> {
898        let mut batches = Vec::new();
899        self.stream(|batch| {
900            batches.push(batch);
901            Ok(())
902        })?;
903        Ok(batches)
904    }
905
906    pub fn collect_rows(self) -> ExecutorResult<RowBatch> {
907        let schema = self.schema();
908        let mut rows: Vec<Vec<PlanValue>> = Vec::new();
909        self.stream(|batch| {
910            for row_idx in 0..batch.num_rows() {
911                let mut row: Vec<PlanValue> = Vec::with_capacity(batch.num_columns());
912                for col_idx in 0..batch.num_columns() {
913                    let value = llkv_plan::plan_value_from_array(batch.column(col_idx), row_idx)?;
914                    row.push(value);
915                }
916                rows.push(row);
917            }
918            Ok(())
919        })?;
920        let columns = schema
921            .fields()
922            .iter()
923            .map(|field| field.name().to_string())
924            .collect();
925        Ok(RowBatch { columns, rows })
926    }
927
928    pub fn into_rows(self) -> ExecutorResult<Vec<Vec<PlanValue>>> {
929        Ok(self.collect_rows()?.rows)
930    }
931}
932
933impl<P> fmt::Debug for SelectExecution<P>
934where
935    P: Pager<Blob = EntryHandle> + Send + Sync,
936{
937    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
938        f.debug_struct("SelectExecution")
939            .field("table_name", &self.table_name)
940            .field("schema", &self.schema)
941            .finish()
942    }
943}
944
945pub struct ExecutorTable<P>
946where
947    P: Pager<Blob = EntryHandle> + Send + Sync,
948{
949    // underlying physical table from llkv_table
950    pub table: Arc<llkv_table::table::Table<P>>,
951    pub schema: Arc<ExecutorSchema>,
952    pub next_row_id: AtomicU64,
953    pub total_rows: AtomicU64,
954}
955
956pub struct ExecutorSchema {
957    pub columns: Vec<ExecutorColumn>,
958    pub lookup: FxHashMap<String, usize>,
959}
960
961impl ExecutorSchema {
962    pub fn resolve(&self, name: &str) -> Option<&ExecutorColumn> {
963        let normalized = name.to_ascii_lowercase();
964        self.lookup
965            .get(&normalized)
966            .and_then(|idx| self.columns.get(*idx))
967    }
968
969    pub fn first_field_id(&self) -> Option<FieldId> {
970        self.columns.first().map(|col| col.field_id)
971    }
972
973    pub fn column_by_field_id(&self, field_id: FieldId) -> Option<&ExecutorColumn> {
974        self.columns.iter().find(|col| col.field_id == field_id)
975    }
976}
977
978#[derive(Clone)]
979pub struct ExecutorColumn {
980    pub name: String,
981    pub data_type: DataType,
982    pub nullable: bool,
983    pub primary_key: bool,
984    pub field_id: FieldId,
985}
986
// `PlanValue` is imported from llkv-plan as the plan-level value type. This crate
// intentionally does not re-export it or any higher-level prefixed symbols.
989
// Executor-local types carry an explicit `Executor` prefix (`ExecutorColumn`,
// `ExecutorSchema`, `ExecutorTable`) so they do not collide with the Arrow and
// storage types imported above. No shorter `Exec*` aliases are provided, to
// avoid confusion.
994
995pub struct RowBatch {
996    pub columns: Vec<String>,
997    pub rows: Vec<Vec<PlanValue>>,
998}
999
1000fn resolve_scan_order<P>(
1001    table: &ExecutorTable<P>,
1002    projections: &[ScanProjection],
1003    order_plan: &OrderByPlan,
1004) -> ExecutorResult<ScanOrderSpec>
1005where
1006    P: Pager<Blob = EntryHandle> + Send + Sync,
1007{
1008    let (column, field_id) = match &order_plan.target {
1009        OrderTarget::Column(name) => {
1010            let column = table.schema.resolve(name).ok_or_else(|| {
1011                Error::InvalidArgumentError(format!("unknown column '{}' in ORDER BY", name))
1012            })?;
1013            (column, column.field_id)
1014        }
1015        OrderTarget::Index(position) => {
1016            let projection = projections.get(*position).ok_or_else(|| {
1017                Error::InvalidArgumentError(format!(
1018                    "ORDER BY position {} is out of range",
1019                    position + 1
1020                ))
1021            })?;
1022            match projection {
1023                ScanProjection::Column(store_projection) => {
1024                    let field_id = store_projection.logical_field_id.field_id();
1025                    let column = table.schema.column_by_field_id(field_id).ok_or_else(|| {
1026                        Error::InvalidArgumentError(format!(
1027                            "unknown column with field id {field_id} in ORDER BY"
1028                        ))
1029                    })?;
1030                    (column, field_id)
1031                }
1032                ScanProjection::Computed { .. } => {
1033                    return Err(Error::InvalidArgumentError(
1034                        "ORDER BY position referring to computed projection is not supported"
1035                            .into(),
1036                    ));
1037                }
1038            }
1039        }
1040    };
1041
1042    let transform = match order_plan.sort_type {
1043        OrderSortType::Native => match column.data_type {
1044            DataType::Int64 => ScanOrderTransform::IdentityInteger,
1045            DataType::Utf8 => ScanOrderTransform::IdentityUtf8,
1046            ref other => {
1047                return Err(Error::InvalidArgumentError(format!(
1048                    "ORDER BY on column type {:?} is not supported",
1049                    other
1050                )));
1051            }
1052        },
1053        OrderSortType::CastTextToInteger => {
1054            if column.data_type != DataType::Utf8 {
1055                return Err(Error::InvalidArgumentError(
1056                    "ORDER BY CAST expects a text column".into(),
1057                ));
1058            }
1059            ScanOrderTransform::CastUtf8ToInteger
1060        }
1061    };
1062
1063    let direction = if order_plan.ascending {
1064        ScanOrderDirection::Ascending
1065    } else {
1066        ScanOrderDirection::Descending
1067    };
1068
1069    Ok(ScanOrderSpec {
1070        field_id,
1071        direction,
1072        nulls_first: order_plan.nulls_first,
1073        transform,
1074    })
1075}
1076
1077fn full_table_scan_filter(field_id: FieldId) -> LlkvExpr<'static, FieldId> {
1078    LlkvExpr::Pred(Filter {
1079        field_id,
1080        op: Operator::Range {
1081            lower: Bound::Unbounded,
1082            upper: Bound::Unbounded,
1083        },
1084    })
1085}
1086
1087fn synthesize_null_scan(schema: Arc<Schema>, total_rows: u64) -> ExecutorResult<Vec<RecordBatch>> {
1088    let row_count = usize::try_from(total_rows).map_err(|_| {
1089        Error::InvalidArgumentError("table row count exceeds supported in-memory batch size".into())
1090    })?;
1091
1092    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1093    for field in schema.fields() {
1094        match field.data_type() {
1095            DataType::Int64 => {
1096                let mut builder = Int64Builder::with_capacity(row_count);
1097                for _ in 0..row_count {
1098                    builder.append_null();
1099                }
1100                arrays.push(Arc::new(builder.finish()));
1101            }
1102            DataType::Float64 => {
1103                let mut builder = arrow::array::Float64Builder::with_capacity(row_count);
1104                for _ in 0..row_count {
1105                    builder.append_null();
1106                }
1107                arrays.push(Arc::new(builder.finish()));
1108            }
1109            DataType::Utf8 => {
1110                let mut builder = arrow::array::StringBuilder::with_capacity(row_count, 0);
1111                for _ in 0..row_count {
1112                    builder.append_null();
1113                }
1114                arrays.push(Arc::new(builder.finish()));
1115            }
1116            DataType::Date32 => {
1117                let mut builder = arrow::array::Date32Builder::with_capacity(row_count);
1118                for _ in 0..row_count {
1119                    builder.append_null();
1120                }
1121                arrays.push(Arc::new(builder.finish()));
1122            }
1123            other => {
1124                return Err(Error::InvalidArgumentError(format!(
1125                    "unsupported data type in null synthesis: {other:?}"
1126                )));
1127            }
1128        }
1129    }
1130
1131    let batch = RecordBatch::try_new(schema, arrays)?;
1132    Ok(vec![batch])
1133}
1134
1135// Translate predicate from column names to field IDs
1136fn translate_predicate(
1137    expr: llkv_expr::expr::Expr<'static, String>,
1138    schema: &ExecutorSchema,
1139) -> ExecutorResult<llkv_expr::expr::Expr<'static, FieldId>> {
1140    use llkv_expr::expr::Expr;
1141    match expr {
1142        Expr::And(exprs) => {
1143            let translated: Result<Vec<_>, _> = exprs
1144                .into_iter()
1145                .map(|e| translate_predicate(e, schema))
1146                .collect();
1147            Ok(Expr::And(translated?))
1148        }
1149        Expr::Or(exprs) => {
1150            let translated: Result<Vec<_>, _> = exprs
1151                .into_iter()
1152                .map(|e| translate_predicate(e, schema))
1153                .collect();
1154            Ok(Expr::Or(translated?))
1155        }
1156        Expr::Not(inner) => {
1157            let translated = translate_predicate(*inner, schema)?;
1158            Ok(Expr::Not(Box::new(translated)))
1159        }
1160        Expr::Pred(filter) => {
1161            let column = schema.resolve(&filter.field_id).ok_or_else(|| {
1162                Error::InvalidArgumentError(format!("unknown column '{}'", filter.field_id))
1163            })?;
1164            Ok(Expr::Pred(Filter {
1165                field_id: column.field_id,
1166                op: filter.op,
1167            }))
1168        }
1169        Expr::Compare { left, op, right } => Ok(Expr::Compare {
1170            left: translate_scalar(&left, schema)?,
1171            op,
1172            right: translate_scalar(&right, schema)?,
1173        }),
1174    }
1175}
1176
1177// Translate scalar expressions
1178fn translate_scalar(
1179    expr: &ScalarExpr<String>,
1180    schema: &ExecutorSchema,
1181) -> ExecutorResult<ScalarExpr<FieldId>> {
1182    match expr {
1183        ScalarExpr::Literal(lit) => Ok(ScalarExpr::Literal(lit.clone())),
1184        ScalarExpr::Column(name) => {
1185            let column = schema
1186                .resolve(name)
1187                .ok_or_else(|| Error::InvalidArgumentError(format!("unknown column '{}'", name)))?;
1188            Ok(ScalarExpr::Column(column.field_id))
1189        }
1190        ScalarExpr::Binary { left, op, right } => Ok(ScalarExpr::Binary {
1191            left: Box::new(translate_scalar(left, schema)?),
1192            op: *op,
1193            right: Box::new(translate_scalar(right, schema)?),
1194        }),
1195        ScalarExpr::Aggregate(agg) => {
1196            // Translate column names in aggregate calls to field IDs
1197            use llkv_expr::expr::AggregateCall;
1198            let translated_agg = match agg {
1199                AggregateCall::CountStar => AggregateCall::CountStar,
1200                AggregateCall::Count(name) => {
1201                    let column = schema.resolve(name).ok_or_else(|| {
1202                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
1203                    })?;
1204                    AggregateCall::Count(column.field_id)
1205                }
1206                AggregateCall::Sum(name) => {
1207                    let column = schema.resolve(name).ok_or_else(|| {
1208                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
1209                    })?;
1210                    AggregateCall::Sum(column.field_id)
1211                }
1212                AggregateCall::Min(name) => {
1213                    let column = schema.resolve(name).ok_or_else(|| {
1214                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
1215                    })?;
1216                    AggregateCall::Min(column.field_id)
1217                }
1218                AggregateCall::Max(name) => {
1219                    let column = schema.resolve(name).ok_or_else(|| {
1220                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
1221                    })?;
1222                    AggregateCall::Max(column.field_id)
1223                }
1224                AggregateCall::CountNulls(name) => {
1225                    let column = schema.resolve(name).ok_or_else(|| {
1226                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
1227                    })?;
1228                    AggregateCall::CountNulls(column.field_id)
1229                }
1230            };
1231            Ok(ScalarExpr::Aggregate(translated_agg))
1232        }
1233    }
1234}