llkv_executor/
lib.rs

//! Query execution engine for LLKV.
//!
//! This crate provides the query execution layer that sits between the query planner
//! (`llkv-plan`) and the storage layer (`llkv-table`, `llkv-column-map`).
//!
//! # Module Organization
//!
//! - [`translation`]: Expression and projection translation utilities
//! - [`types`]: Core type definitions (tables, schemas, columns)
//! - [`insert`]: INSERT operation support (value coercion)
//! - [`utils`]: Utility functions (time)
//!
//! The [`QueryExecutor`] and [`SelectExecution`] implementations are defined inline
//! in this module for now, but should be extracted to a dedicated `query` module
//! in a future refactoring.
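//!
//! # Example
//!
//! A minimal sketch of driving the executor, assuming a `provider` value that
//! implements [`ExecutorTableProvider`] and a `plan` produced by `llkv-plan`
//! (both are prepared elsewhere; this is illustrative, not a complete program):
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! // `provider` resolves qualified table names to executor table handles.
//! let executor = QueryExecutor::new(Arc::clone(&provider));
//!
//! // Execute a planned SELECT; the result exposes the output schema and
//! // yields Arrow record batches.
//! let execution = executor.execute_select(plan)?;
//! ```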

use arrow::array::{
    Array, ArrayRef, BooleanArray, BooleanBuilder, Float64Array, Int64Array, Int64Builder,
    RecordBatch, StringArray, UInt32Array, new_null_array,
};
use arrow::compute::{
    SortColumn, SortOptions, concat_batches, filter_record_batch, lexsort_to_indices, take,
};
use arrow::datatypes::{DataType, Field, Float64Type, Int64Type, Schema};
use llkv_aggregate::{AggregateAccumulator, AggregateKind, AggregateSpec, AggregateState};
use llkv_column_map::store::Projection as StoreProjection;
use llkv_column_map::types::LogicalFieldId;
use llkv_expr::expr::{AggregateCall, CompareOp, Expr as LlkvExpr, Filter, Operator, ScalarExpr};
use llkv_expr::literal::Literal;
use llkv_expr::typed_predicate::{
    build_bool_predicate, build_fixed_width_predicate, build_var_width_predicate,
};
use llkv_join::cross_join_pair;
use llkv_plan::{
    AggregateExpr, AggregateFunction, CanonicalRow, OrderByPlan, OrderSortType, OrderTarget,
    PlanValue, SelectPlan, SelectProjection,
};
use llkv_result::Error;
use llkv_storage::pager::Pager;
use llkv_table::table::{
    RowIdFilter, ScanOrderDirection, ScanOrderSpec, ScanOrderTransform, ScanProjection,
    ScanStreamOptions,
};
use llkv_table::types::FieldId;
use llkv_table::{NumericArray, NumericArrayMap, NumericKernels, ROW_ID_FIELD_ID};
use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::Ordering;

// ============================================================================
// Module Declarations
// ============================================================================

pub mod insert;
pub mod translation;
pub mod types;
pub mod utils;

// ============================================================================
// Type Aliases and Re-exports
// ============================================================================

/// Result type for executor operations.
pub type ExecutorResult<T> = Result<T, Error>;

pub use insert::{
    build_array_for_column, normalize_insert_value_for_column, resolve_insert_columns,
};
pub use translation::{
    build_projected_columns, build_wildcard_projections, full_table_scan_filter,
    resolve_field_id_from_schema, schema_for_projections, translate_predicate,
    translate_predicate_with, translate_scalar, translate_scalar_with,
};
pub use types::{
    ExecutorColumn, ExecutorMultiColumnUnique, ExecutorRowBatch, ExecutorSchema, ExecutorTable,
    ExecutorTableProvider,
};
pub use utils::current_time_micros;

// ============================================================================
// Query Executor - Implementation
// ============================================================================
// TODO: Extract this implementation into a dedicated query/ module

/// Query executor that executes SELECT plans.
pub struct QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    provider: Arc<dyn ExecutorTableProvider<P>>,
}

impl<P> QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    pub fn new(provider: Arc<dyn ExecutorTableProvider<P>>) -> Self {
        Self { provider }
    }

    pub fn execute_select(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        self.execute_select_with_filter(plan, None)
    }

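    /// Executes a SELECT plan with an optional MVCC row-id filter.
    ///
    /// When `row_filter` is supplied, only row ids it reports as visible are
    /// scanned; this is how transaction visibility (hiding uncommitted or
    /// aborted rows) is enforced during execution.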
    pub fn execute_select_with_filter(
        &self,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        // Handle SELECT without FROM clause (e.g., SELECT 42, SELECT {'a': 1})
        if plan.tables.is_empty() {
            return self.execute_select_without_table(plan);
        }

        // Handle multi-table queries (cross products/joins)
        if plan.tables.len() > 1 {
            return self.execute_cross_product(plan);
        }

        // Single table query
        let table_ref = &plan.tables[0];
        let table = self.provider.get_table(&table_ref.qualified_name())?;
        let display_name = table_ref.qualified_name();

        if !plan.aggregates.is_empty() {
            self.execute_aggregates(table, display_name, plan, row_filter)
        } else if self.has_computed_aggregates(&plan) {
            // Handle computed projections that contain embedded aggregates
            self.execute_computed_aggregates(table, display_name, plan, row_filter)
        } else {
            self.execute_projection(table, display_name, plan, row_filter)
        }
    }

    /// Check if any computed projections contain aggregate functions.
    fn has_computed_aggregates(&self, plan: &SelectPlan) -> bool {
        plan.projections.iter().any(|proj| {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::expr_contains_aggregate(expr)
            } else {
                false
            }
        })
    }

    /// Recursively check if a scalar expression contains aggregates.
    fn expr_contains_aggregate(expr: &ScalarExpr<String>) -> bool {
        match expr {
            ScalarExpr::Aggregate(_) => true,
            ScalarExpr::Binary { left, right, .. } => {
                Self::expr_contains_aggregate(left) || Self::expr_contains_aggregate(right)
            }
            ScalarExpr::GetField { base, .. } => Self::expr_contains_aggregate(base),
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => false,
        }
    }

    /// Execute a SELECT without a FROM clause (e.g., SELECT 42, SELECT {'a': 1}).
    fn execute_select_without_table(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        // Build schema from computed projections
        let mut fields = Vec::new();
        let mut arrays: Vec<ArrayRef> = Vec::new();

        for proj in &plan.projections {
            match proj {
                SelectProjection::Computed { expr, alias } => {
                    // Infer the data type from the expression
                    let (field_name, dtype, array) = match expr {
                        ScalarExpr::Literal(lit) => {
                            let (dtype, array) = Self::literal_to_array(lit)?;
                            (alias.clone(), dtype, array)
                        }
                        _ => {
                            return Err(Error::InvalidArgumentError(
                                "SELECT without FROM only supports literal expressions".into(),
                            ));
                        }
                    };

                    fields.push(Field::new(field_name, dtype, true));
                    arrays.push(array);
                }
                _ => {
                    return Err(Error::InvalidArgumentError(
                        "SELECT without FROM only supports computed projections".into(),
                    ));
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        let mut batch = RecordBatch::try_new(Arc::clone(&schema), arrays)
            .map_err(|e| Error::Internal(format!("failed to create record batch: {}", e)))?;

        if plan.distinct {
            let mut state = DistinctState::default();
            batch = match distinct_filter_batch(batch, &mut state)? {
                Some(filtered) => filtered,
                None => RecordBatch::new_empty(Arc::clone(&schema)),
            };
        }

        let schema = batch.schema();

        Ok(SelectExecution::new_single_batch(
            String::new(), // No table name
            schema,
            batch,
        ))
    }

    /// Convert a Literal to an Arrow array (recursive for nested structs).
    fn literal_to_array(lit: &llkv_expr::literal::Literal) -> ExecutorResult<(DataType, ArrayRef)> {
        use arrow::array::StructArray;

        match lit {
            Literal::Integer(v) => {
                let val = i64::try_from(*v).unwrap_or(0);
                Ok((
                    DataType::Int64,
                    Arc::new(Int64Array::from(vec![val])) as ArrayRef,
                ))
            }
            Literal::Float(v) => Ok((
                DataType::Float64,
                Arc::new(Float64Array::from(vec![*v])) as ArrayRef,
            )),
            Literal::Boolean(v) => Ok((
                DataType::Boolean,
                Arc::new(BooleanArray::from(vec![*v])) as ArrayRef,
            )),
            Literal::String(v) => Ok((
                DataType::Utf8,
                Arc::new(StringArray::from(vec![v.clone()])) as ArrayRef,
            )),
            Literal::Null => Ok((DataType::Null, new_null_array(&DataType::Null, 1))),
            Literal::Struct(struct_fields) => {
                // Build a struct array recursively
                let mut inner_fields = Vec::new();
                let mut inner_arrays = Vec::new();

                for (field_name, field_lit) in struct_fields {
                    let (field_dtype, field_array) = Self::literal_to_array(field_lit)?;
                    inner_fields.push(Field::new(field_name.clone(), field_dtype, true));
                    inner_arrays.push(field_array);
                }

                let struct_array =
                    StructArray::try_new(inner_fields.clone().into(), inner_arrays, None).map_err(
                        |e| Error::Internal(format!("failed to create struct array: {}", e)),
                    )?;

                Ok((
                    DataType::Struct(inner_fields.into()),
                    Arc::new(struct_array) as ArrayRef,
                ))
            }
        }
    }

    /// Execute a cross product query (FROM table1, table2, ...).
    fn execute_cross_product(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        if plan.tables.len() < 2 {
            return Err(Error::InvalidArgumentError(
                "cross product requires at least 2 tables".into(),
            ));
        }

        // Acquire table handles and materialize the base batches for each table.
        let mut tables = Vec::with_capacity(plan.tables.len());
        for table_ref in &plan.tables {
            let qualified_name = table_ref.qualified_name();
            let table = self.provider.get_table(&qualified_name)?;
            tables.push((table_ref.clone(), table));
        }

        let mut staged: Vec<TableCrossProductData> = Vec::with_capacity(tables.len());
        for (table_ref, table) in &tables {
            staged.push(collect_table_data(table_ref, table.as_ref())?);
        }

        let mut staged_iter = staged.into_iter();
        let mut current = staged_iter
            .next()
            .ok_or_else(|| Error::Internal("cross product preparation yielded no tables".into()))?;

        for next in staged_iter {
            current = cross_join_table_batches(current, next)?;
        }

        let TableCrossProductData {
            schema: combined_schema,
            batches: mut combined_batches,
            column_counts,
        } = current;

        let column_lookup_map = build_cross_product_column_lookup(
            combined_schema.as_ref(),
            &plan.tables,
            &column_counts,
        );

        if let Some(filter_expr) = &plan.filter {
            let mut filter_context = CrossProductExpressionContext::new(
                combined_schema.as_ref(),
                column_lookup_map.clone(),
            )?;
            let translated_filter =
                translate_predicate(filter_expr.clone(), filter_context.schema(), |name| {
                    Error::InvalidArgumentError(format!(
                        "column '{}' not found in cross product result",
                        name
                    ))
                })?;

            let mut filtered_batches = Vec::with_capacity(combined_batches.len());
            for batch in combined_batches.into_iter() {
                filter_context.reset();
                let mask = filter_context.evaluate_predicate_mask(&translated_filter, &batch)?;
                let filtered = filter_record_batch(&batch, &mask).map_err(|err| {
                    Error::InvalidArgumentError(format!(
                        "failed to apply cross product filter: {err}"
                    ))
                })?;
                if filtered.num_rows() > 0 {
                    filtered_batches.push(filtered);
                }
            }
            combined_batches = filtered_batches;
        }

        let mut combined_batch = if combined_batches.is_empty() {
            RecordBatch::new_empty(Arc::clone(&combined_schema))
        } else if combined_batches.len() == 1 {
            combined_batches.pop().unwrap()
        } else {
            concat_batches(&combined_schema, &combined_batches).map_err(|e| {
                Error::Internal(format!(
                    "failed to concatenate cross product batches: {}",
                    e
                ))
            })?
        };

        // Apply SELECT projections if specified
        if !plan.projections.is_empty() {
            let mut selected_fields = Vec::new();
            let mut selected_columns = Vec::new();
            let mut expr_context: Option<CrossProductExpressionContext> = None;

            for proj in &plan.projections {
                match proj {
                    SelectProjection::AllColumns => {
                        // Keep all columns
                        selected_fields = combined_schema.fields().iter().cloned().collect();
                        selected_columns = combined_batch.columns().to_vec();
                        break;
                    }
                    SelectProjection::AllColumnsExcept { exclude } => {
                        // Keep all columns except the excluded ones
                        let exclude_lower: Vec<String> =
                            exclude.iter().map(|e| e.to_ascii_lowercase()).collect();

                        for (idx, field) in combined_schema.fields().iter().enumerate() {
                            let field_name_lower = field.name().to_ascii_lowercase();
                            if !exclude_lower.contains(&field_name_lower) {
                                selected_fields.push(field.clone());
                                selected_columns.push(combined_batch.column(idx).clone());
                            }
                        }
                        break;
                    }
                    SelectProjection::Column { name, alias } => {
                        // Find the column by qualified name
                        let col_name = name.to_ascii_lowercase();
                        if let Some(&idx) = column_lookup_map.get(&col_name) {
                            let field = combined_schema.field(idx);
                            let output_name = alias.as_ref().unwrap_or(name).clone();
                            selected_fields.push(Arc::new(arrow::datatypes::Field::new(
                                output_name,
                                field.data_type().clone(),
                                field.is_nullable(),
                            )));
                            selected_columns.push(combined_batch.column(idx).clone());
                        } else {
                            return Err(Error::InvalidArgumentError(format!(
                                "column '{}' not found in cross product result",
                                name
                            )));
                        }
                    }
                    SelectProjection::Computed { expr, alias } => {
                        if expr_context.is_none() {
                            expr_context = Some(CrossProductExpressionContext::new(
                                combined_schema.as_ref(),
                                column_lookup_map.clone(),
                            )?);
                        }
                        let context = expr_context
                            .as_mut()
                            .expect("projection context must be initialized");
                        let evaluated = context.evaluate(expr, &combined_batch)?;
                        let field = Arc::new(arrow::datatypes::Field::new(
                            alias.clone(),
                            evaluated.data_type().clone(),
                            true,
                        ));
                        selected_fields.push(field);
                        selected_columns.push(evaluated);
                    }
                }
            }

            let projected_schema = Arc::new(Schema::new(selected_fields));
            combined_batch = RecordBatch::try_new(projected_schema, selected_columns)
                .map_err(|e| Error::Internal(format!("failed to apply projections: {}", e)))?;
        }

        if plan.distinct {
            let mut state = DistinctState::default();
            let source_schema = combined_batch.schema();
            combined_batch = match distinct_filter_batch(combined_batch, &mut state)? {
                Some(filtered) => filtered,
                None => RecordBatch::new_empty(source_schema),
            };
        }

        let schema = combined_batch.schema();

        let display_name = tables
            .iter()
            .map(|(table_ref, _)| table_ref.qualified_name())
            .collect::<Vec<_>>()
            .join(",");

        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            combined_batch,
        ))
    }

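    /// Executes a plain (non-aggregate) SELECT by streaming projected columns
    /// from the table scan, honoring the plan's filter, ORDER BY, and DISTINCT.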
    fn execute_projection(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let projections = if plan.projections.is_empty() {
            build_wildcard_projections(table_ref)
        } else {
            build_projected_columns(table_ref, &plan.projections)?
        };
        let schema = schema_for_projections(table_ref, &projections)?;

        let (filter_expr, full_table_scan) = match plan.filter {
            Some(expr) => (
                crate::translation::expression::translate_predicate(
                    expr,
                    table_ref.schema.as_ref(),
                    |name| Error::InvalidArgumentError(format!("unknown column '{}'", name)),
                )?,
                false,
            ),
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                (
                    crate::translation::expression::full_table_scan_filter(field_id),
                    true,
                )
            }
        };

        let expanded_order = expand_order_targets(&plan.order_by, &projections)?;
        let physical_order = if let Some(first) = expanded_order.first() {
            Some(resolve_scan_order(table_ref, &projections, first)?)
        } else {
            None
        };

        let options = if let Some(order_spec) = physical_order {
            if row_filter.is_some() {
                tracing::debug!("Applying MVCC row filter with ORDER BY");
            }
            ScanStreamOptions {
                include_nulls: true,
                order: Some(order_spec),
                row_id_filter: row_filter.clone(),
            }
        } else {
            if row_filter.is_some() {
                tracing::debug!("Applying MVCC row filter");
            }
            ScanStreamOptions {
                include_nulls: true,
                order: None,
                row_id_filter: row_filter.clone(),
            }
        };

        Ok(SelectExecution::new_projection(
            display_name,
            schema,
            table,
            projections,
            filter_expr,
            options,
            full_table_scan,
            expanded_order,
            plan.distinct,
        ))
    }

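    /// Executes a SELECT whose projections are all top-level aggregate
    /// expressions (COUNT, SUM, MIN, MAX, ...), producing a single-row batch.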
    fn execute_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let distinct = plan.distinct;
        let mut specs: Vec<AggregateSpec> = Vec::with_capacity(plan.aggregates.len());
        for aggregate in plan.aggregates {
            match aggregate {
                AggregateExpr::CountStar { alias } => {
                    specs.push(AggregateSpec {
                        alias,
                        kind: AggregateKind::CountStar,
                    });
                }
                AggregateExpr::Column {
                    column,
                    alias,
                    function,
                    distinct,
                } => {
                    let col = table_ref.schema.resolve(&column).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column '{}' in aggregate",
                            column
                        ))
                    })?;

                    let kind = match function {
                        AggregateFunction::Count => {
                            if distinct {
                                AggregateKind::CountDistinctField {
                                    field_id: col.field_id,
                                }
                            } else {
                                AggregateKind::CountField {
                                    field_id: col.field_id,
                                }
                            }
                        }
                        AggregateFunction::SumInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "SUM currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::SumInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MinInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MIN currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MinInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MaxInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MAX currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MaxInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::CountNulls => {
                            if distinct {
                                return Err(Error::InvalidArgumentError(
                                    "DISTINCT is not supported for COUNT_NULLS".into(),
                                ));
                            }
                            AggregateKind::CountNulls {
                                field_id: col.field_id,
                            }
                        }
                    };
                    specs.push(AggregateSpec { alias, kind });
                }
            }
        }

        if specs.is_empty() {
            return Err(Error::InvalidArgumentError(
                "aggregate query requires at least one aggregate expression".into(),
            ));
        }

        let had_filter = plan.filter.is_some();
        let filter_expr = match plan.filter {
            Some(expr) => crate::translation::expression::translate_predicate(
                expr,
                table.schema.as_ref(),
                |name| Error::InvalidArgumentError(format!("unknown column '{}'", name)),
            )?,
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                crate::translation::expression::full_table_scan_filter(field_id)
            }
        };

        // Build projections and track which projection index each spec uses
        let mut projections = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                let proj_idx = projections.len();
                spec_to_projection.push(Some(proj_idx));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        if projections.is_empty() {
            let field_id = table.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: row_filter.clone(),
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        // MVCC note: we cannot use the total_rows shortcut when MVCC visibility
        // filtering is enabled, because some rows may be invisible due to
        // uncommitted or aborted transactions. Always scan to apply proper
        // visibility rules.
        let mut count_star_override: Option<i64> = None;
        if !had_filter && row_filter.is_none() {
            // Only use the shortcut when there is neither a filter nor MVCC row filtering.
            let total_rows = table.total_rows.load(Ordering::SeqCst);
            tracing::debug!(
                "[AGGREGATE] Using COUNT(*) shortcut: total_rows={}",
                total_rows
            );
            if total_rows > i64::MAX as u64 {
                return Err(Error::InvalidArgumentError(
                    "COUNT(*) result exceeds supported range".into(),
                ));
            }
            count_star_override = Some(total_rows as i64);
        } else {
            tracing::debug!(
                "[AGGREGATE] NOT using COUNT(*) shortcut: had_filter={}, has_row_filter={}",
                had_filter,
                row_filter.is_some()
            );
        }

        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => {
                        tracing::debug!(
                            "[AGGREGATE] CountStar override_value={:?}",
                            count_star_override
                        );
                        count_star_override
                    }
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            ScanStreamOptions {
                row_id_filter: row_filter.clone(),
                ..options
            },
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => {
                // Treat missing storage keys as an empty result set. This occurs
                // for freshly created tables that have no persisted chunks yet.
            }
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        let mut fields = Vec::with_capacity(states.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(states.len());
        for state in states {
            let (field, array) = state.finalize()?;
            fields.push(field);
            arrays.push(array);
        }

        let schema = Arc::new(Schema::new(fields));
        let mut batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;

        if distinct {
            let mut state = DistinctState::default();
            batch = match distinct_filter_batch(batch, &mut state)? {
                Some(filtered) => filtered,
                None => RecordBatch::new_empty(Arc::clone(&schema)),
            };
        }

        let schema = batch.schema();

        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

    /// Execute a query where computed projections contain embedded aggregates.
    /// This extracts the aggregates, computes them, and then evaluates the
    /// scalar expressions with the aggregate results substituted in.
    fn execute_computed_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let distinct = plan.distinct;

        // First, extract all unique aggregates from the projections
        let mut aggregate_specs: Vec<(String, AggregateCall<String>)> = Vec::new();
        for proj in &plan.projections {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::collect_aggregates(expr, &mut aggregate_specs);
            }
        }

        // Compute the aggregates using the existing aggregate execution infrastructure
        let computed_aggregates = self.compute_aggregate_values(
            table.clone(),
            &plan.filter,
            &aggregate_specs,
            row_filter.clone(),
        )?;

        // Now build the final projections by evaluating expressions with aggregates substituted
        let mut fields = Vec::with_capacity(plan.projections.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(plan.projections.len());

        for proj in &plan.projections {
            match proj {
                SelectProjection::AllColumns | SelectProjection::AllColumnsExcept { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "Wildcard projections not supported with computed aggregates".into(),
                    ));
                }
                SelectProjection::Column { name, .. } => {
                    // Validate that the column exists before rejecting it: regular
                    // columns in an aggregate query would require GROUP BY support,
                    // which is not implemented yet.
                    table_ref.schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    return Err(Error::InvalidArgumentError(
                        "Regular columns not supported in aggregate queries without GROUP BY"
                            .into(),
                    ));
                }
                SelectProjection::Computed { expr, alias } => {
                    // Evaluate the expression with aggregates substituted
                    let value = Self::evaluate_expr_with_aggregates(expr, &computed_aggregates)?;

                    fields.push(arrow::datatypes::Field::new(alias, DataType::Int64, false));

                    let array = Arc::new(Int64Array::from(vec![value])) as ArrayRef;
                    arrays.push(array);
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        let mut batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;

        if distinct {
            let mut state = DistinctState::default();
            batch = match distinct_filter_batch(batch, &mut state)? {
                Some(filtered) => filtered,
                None => RecordBatch::new_empty(Arc::clone(&schema)),
            };
        }

        let schema = batch.schema();

        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

    /// Collect all aggregate calls from an expression.
    fn collect_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &mut Vec<(String, llkv_expr::expr::AggregateCall<String>)>,
    ) {
        match expr {
            ScalarExpr::Aggregate(agg) => {
                // Create a unique key for this aggregate
                let key = format!("{:?}", agg);
                if !aggregates.iter().any(|(k, _)| k == &key) {
                    aggregates.push((key, agg.clone()));
                }
            }
            ScalarExpr::Binary { left, right, .. } => {
                Self::collect_aggregates(left, aggregates);
                Self::collect_aggregates(right, aggregates);
            }
            ScalarExpr::GetField { base, .. } => {
                Self::collect_aggregates(base, aggregates);
            }
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => {}
        }
    }

    /// Compute the actual values for the aggregates.
    fn compute_aggregate_values(
        &self,
        table: Arc<ExecutorTable<P>>,
        filter: &Option<llkv_expr::expr::Expr<'static, String>>,
        aggregate_specs: &[(String, llkv_expr::expr::AggregateCall<String>)],
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<FxHashMap<String, i64>> {
        let table_ref = table.as_ref();
        let mut results =
            FxHashMap::with_capacity_and_hasher(aggregate_specs.len(), Default::default());

        // Build aggregate specs for the aggregator
        let mut specs: Vec<AggregateSpec> = Vec::new();
        for (key, agg) in aggregate_specs {
            let kind = match agg {
                AggregateCall::CountStar => AggregateKind::CountStar,
                AggregateCall::Count(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountField {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Sum(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::SumInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Min(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MinInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Max(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MaxInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::CountNulls(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountNulls {
                        field_id: col.field_id,
                    }
                }
            };
            specs.push(AggregateSpec {
                alias: key.clone(),
                kind,
            });
        }

        // Prepare filter and projections
        let filter_expr = match filter {
            Some(expr) => crate::translation::expression::translate_predicate(
                expr.clone(),
                table_ref.schema.as_ref(),
                |name| Error::InvalidArgumentError(format!("unknown column '{}'", name)),
            )?,
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                crate::translation::expression::full_table_scan_filter(field_id)
            }
        };

        let mut projections: Vec<ScanProjection> = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());
        let count_star_override: Option<i64> = None;

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                spec_to_projection.push(Some(projections.len()));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        if projections.is_empty() {
            let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let base_options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: None,
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => count_star_override,
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            ScanStreamOptions {
                row_id_filter: row_filter.clone(),
                ..base_options
            },
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => {}
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        // Extract the computed values
        for state in states {
            let alias = state.alias.clone();
            let (_field, array) = state.finalize()?;

            // Extract the i64 value from the array
            let int64_array = array
                .as_any()
                .downcast_ref::<arrow::array::Int64Array>()
                .ok_or_else(|| Error::Internal("Expected Int64Array from aggregate".into()))?;

            if int64_array.len() != 1 {
                return Err(Error::Internal(format!(
                    "Expected single value from aggregate, got {}",
                    int64_array.len()
                )));
            }

            let value = if int64_array.is_null(0) {
                0
            } else {
                int64_array.value(0)
            };

            results.insert(alias, value);
        }

        Ok(results)
    }

    /// Evaluate an expression by substituting previously computed aggregate values.
    fn evaluate_expr_with_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &FxHashMap<String, i64>,
    ) -> ExecutorResult<i64> {
        use llkv_expr::expr::BinaryOp;

        match expr {
            // Reject out-of-range integer literals instead of silently wrapping.
            ScalarExpr::Literal(Literal::Integer(v)) => i64::try_from(*v).map_err(|_| {
                Error::InvalidArgumentError(
                    "integer literal out of range for aggregate expression".into(),
                )
            }),
            ScalarExpr::Literal(Literal::Float(v)) => Ok(*v as i64),
            ScalarExpr::Literal(Literal::Boolean(v)) => Ok(if *v { 1 } else { 0 }),
            ScalarExpr::Literal(Literal::String(_)) => Err(Error::InvalidArgumentError(
                "String literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Literal(Literal::Null) => Err(Error::InvalidArgumentError(
                "NULL literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Literal(Literal::Struct(_)) => Err(Error::InvalidArgumentError(
                "Struct literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Column(_) => Err(Error::InvalidArgumentError(
                "Column references not supported in aggregate-only expressions".into(),
            )),
            ScalarExpr::Aggregate(agg) => {
                let key = format!("{:?}", agg);
                aggregates.get(&key).copied().ok_or_else(|| {
                    Error::Internal(format!("Aggregate value not found for key: {}", key))
                })
            }
            ScalarExpr::Binary { left, op, right } => {
                let left_val = Self::evaluate_expr_with_aggregates(left, aggregates)?;
                let right_val = Self::evaluate_expr_with_aggregates(right, aggregates)?;

                let result = match op {
                    BinaryOp::Add => left_val.checked_add(right_val),
                    BinaryOp::Subtract => left_val.checked_sub(right_val),
                    BinaryOp::Multiply => left_val.checked_mul(right_val),
                    BinaryOp::Divide => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Division by zero".into()));
                        }
                        left_val.checked_div(right_val)
                    }
                    BinaryOp::Modulo => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Modulo by zero".into()));
                        }
                        left_val.checked_rem(right_val)
                    }
                };

                result.ok_or_else(|| {
                    Error::InvalidArgumentError("Arithmetic overflow in expression".into())
                })
            }
            ScalarExpr::GetField { .. } => Err(Error::InvalidArgumentError(
                "GetField not supported in aggregate-only expressions".into(),
            )),
        }
    }
}

struct CrossProductExpressionContext {
    schema: Arc<ExecutorSchema>,
    field_id_to_index: FxHashMap<FieldId, usize>,
    numeric_cache: FxHashMap<FieldId, NumericArray>,
    column_cache: FxHashMap<FieldId, ColumnAccessor>,
}

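/// Cheaply cloneable, type-dispatched view over a single column of a
/// cross-product batch. Only the column types currently supported by
/// cross-product filters (Int64, Float64, Boolean, Utf8, and all-null
/// columns) are represented.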
#[derive(Clone)]
enum ColumnAccessor {
    Int64(Arc<Int64Array>),
    Float64(Arc<Float64Array>),
    Boolean(Arc<BooleanArray>),
    Utf8(Arc<StringArray>),
    Null(usize),
}

impl ColumnAccessor {
    fn from_array(array: &ArrayRef) -> ExecutorResult<Self> {
        match array.data_type() {
            DataType::Int64 => {
                let typed = array
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .ok_or_else(|| Error::Internal("expected Int64 array".into()))?
                    .clone();
                Ok(Self::Int64(Arc::new(typed)))
            }
            DataType::Float64 => {
                let typed = array
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .ok_or_else(|| Error::Internal("expected Float64 array".into()))?
                    .clone();
                Ok(Self::Float64(Arc::new(typed)))
            }
            DataType::Boolean => {
                let typed = array
                    .as_any()
                    .downcast_ref::<BooleanArray>()
                    .ok_or_else(|| Error::Internal("expected Boolean array".into()))?
                    .clone();
                Ok(Self::Boolean(Arc::new(typed)))
            }
            DataType::Utf8 => {
                let typed = array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| Error::Internal("expected Utf8 array".into()))?
                    .clone();
                Ok(Self::Utf8(Arc::new(typed)))
            }
            DataType::Null => Ok(Self::Null(array.len())),
            other => Err(Error::InvalidArgumentError(format!(
                "unsupported column type {:?} in cross product filter",
                other
            ))),
        }
    }

    fn len(&self) -> usize {
        match self {
            ColumnAccessor::Int64(array) => array.len(),
            ColumnAccessor::Float64(array) => array.len(),
            ColumnAccessor::Boolean(array) => array.len(),
            ColumnAccessor::Utf8(array) => array.len(),
            ColumnAccessor::Null(len) => *len,
        }
    }

    fn is_null(&self, idx: usize) -> bool {
        match self {
            ColumnAccessor::Int64(array) => array.is_null(idx),
            ColumnAccessor::Float64(array) => array.is_null(idx),
            ColumnAccessor::Boolean(array) => array.is_null(idx),
            ColumnAccessor::Utf8(array) => array.is_null(idx),
            ColumnAccessor::Null(_) => true,
        }
    }

    fn as_array_ref(&self) -> ArrayRef {
        match self {
            ColumnAccessor::Int64(array) => Arc::clone(array) as ArrayRef,
            ColumnAccessor::Float64(array) => Arc::clone(array) as ArrayRef,
            ColumnAccessor::Boolean(array) => Arc::clone(array) as ArrayRef,
            ColumnAccessor::Utf8(array) => Arc::clone(array) as ArrayRef,
            ColumnAccessor::Null(len) => new_null_array(&DataType::Null, *len),
        }
    }
}

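/// Intermediate result of evaluating a scalar expression over a batch: a
/// numeric array (all Arrow integer and float widths are normalized through
/// [`NumericArray`]), a boolean or string array, or an all-null column of a
/// known length.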
#[derive(Clone)]
enum ValueArray {
    Numeric(NumericArray),
    Boolean(Arc<BooleanArray>),
    Utf8(Arc<StringArray>),
    Null(usize),
}

impl ValueArray {
    fn from_array(array: ArrayRef) -> ExecutorResult<Self> {
        match array.data_type() {
            DataType::Boolean => {
                let typed = array
                    .as_any()
                    .downcast_ref::<BooleanArray>()
                    .ok_or_else(|| Error::Internal("expected Boolean array".into()))?
                    .clone();
                Ok(Self::Boolean(Arc::new(typed)))
            }
            DataType::Utf8 => {
                let typed = array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| Error::Internal("expected Utf8 array".into()))?
                    .clone();
                Ok(Self::Utf8(Arc::new(typed)))
            }
            DataType::Null => Ok(Self::Null(array.len())),
            DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64
            | DataType::Float32
            | DataType::Float64 => {
                let numeric = NumericArray::try_from_arrow(&array)?;
                Ok(Self::Numeric(numeric))
            }
            other => Err(Error::InvalidArgumentError(format!(
                "unsupported data type {:?} in cross product expression",
                other
            ))),
        }
    }

    fn len(&self) -> usize {
        match self {
            ValueArray::Numeric(array) => array.len(),
            ValueArray::Boolean(array) => array.len(),
            ValueArray::Utf8(array) => array.len(),
            ValueArray::Null(len) => *len,
        }
    }
}

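// SQL-style three-valued logic helpers: `None` represents the unknown (NULL)
// truth value. `false AND NULL` is `false` and `true OR NULL` is `true`;
// every other combination involving NULL remains unknown.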
fn truth_and(lhs: Option<bool>, rhs: Option<bool>) -> Option<bool> {
    match (lhs, rhs) {
        (Some(false), _) | (_, Some(false)) => Some(false),
        (Some(true), Some(true)) => Some(true),
        (Some(true), None) | (None, Some(true)) | (None, None) => None,
    }
}

fn truth_or(lhs: Option<bool>, rhs: Option<bool>) -> Option<bool> {
    match (lhs, rhs) {
        (Some(true), _) | (_, Some(true)) => Some(true),
        (Some(false), Some(false)) => Some(false),
        (Some(false), None) | (None, Some(false)) | (None, None) => None,
    }
}

fn truth_not(value: Option<bool>) -> Option<bool> {
    value.map(|v| !v)
}

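// Boolean comparisons order `false < true`, matching SQL semantics.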
fn compare_bool(op: CompareOp, lhs: bool, rhs: bool) -> bool {
    let l = lhs as u8;
    let r = rhs as u8;
    match op {
        CompareOp::Eq => lhs == rhs,
        CompareOp::NotEq => lhs != rhs,
        CompareOp::Lt => l < r,
        CompareOp::LtEq => l <= r,
        CompareOp::Gt => l > r,
        CompareOp::GtEq => l >= r,
    }
}

fn compare_str(op: CompareOp, lhs: &str, rhs: &str) -> bool {
    match op {
        CompareOp::Eq => lhs == rhs,
        CompareOp::NotEq => lhs != rhs,
        CompareOp::Lt => lhs < rhs,
        CompareOp::LtEq => lhs <= rhs,
        CompareOp::Gt => lhs > rhs,
        CompareOp::GtEq => lhs >= rhs,
    }
}

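/// Resolves SQL `IN` / `NOT IN` semantics once all candidates were scanned:
/// a match yields TRUE; otherwise a NULL candidate forces UNKNOWN; otherwise
/// FALSE. `negated` flips the definite outcomes but leaves UNKNOWN intact.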
fn finalize_in_list_result(has_match: bool, saw_null: bool, negated: bool) -> Option<bool> {
    if has_match {
        Some(!negated)
    } else if saw_null {
        None
    } else if negated {
        Some(true)
    } else {
        Some(false)
    }
}

fn literal_to_constant_array(literal: &Literal, len: usize) -> ExecutorResult<ArrayRef> {
    match literal {
        Literal::Integer(v) => {
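            // NOTE: integer literals outside the i64 range currently fall back to 0.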
            let value = i64::try_from(*v).unwrap_or(0);
            let values = vec![value; len];
            Ok(Arc::new(Int64Array::from(values)) as ArrayRef)
        }
        Literal::Float(v) => {
            let values = vec![*v; len];
            Ok(Arc::new(Float64Array::from(values)) as ArrayRef)
        }
        Literal::Boolean(v) => {
            let values = vec![Some(*v); len];
            Ok(Arc::new(BooleanArray::from(values)) as ArrayRef)
        }
        Literal::String(v) => {
            let values: Vec<Option<String>> = (0..len).map(|_| Some(v.clone())).collect();
            Ok(Arc::new(StringArray::from(values)) as ArrayRef)
        }
        Literal::Null => Ok(new_null_array(&DataType::Null, len)),
        Literal::Struct(_) => Err(Error::InvalidArgumentError(
            "struct literals are not supported in cross product filters".into(),
        )),
    }
}

impl CrossProductExpressionContext {
    fn new(schema: &Schema, lookup: FxHashMap<String, usize>) -> ExecutorResult<Self> {
        let mut columns = Vec::with_capacity(schema.fields().len());
        let mut field_id_to_index = FxHashMap::default();
        let mut next_field_id: FieldId = 1;

        for (idx, field) in schema.fields().iter().enumerate() {
            if next_field_id == u32::MAX {
                return Err(Error::Internal(
                    "cross product projection exhausted FieldId space".into(),
                ));
            }

            let executor_column = ExecutorColumn {
                name: field.name().clone(),
                data_type: field.data_type().clone(),
                nullable: field.is_nullable(),
                primary_key: false,
                unique: false,
                field_id: next_field_id,
                check_expr: None,
            };
            let field_id = next_field_id;
            next_field_id = next_field_id.saturating_add(1);

            columns.push(executor_column);
            field_id_to_index.insert(field_id, idx);
        }

        Ok(Self {
            schema: Arc::new(ExecutorSchema { columns, lookup }),
            field_id_to_index,
            numeric_cache: FxHashMap::default(),
            column_cache: FxHashMap::default(),
        })
    }

    fn schema(&self) -> &ExecutorSchema {
        self.schema.as_ref()
    }

    fn reset(&mut self) {
        self.numeric_cache.clear();
        self.column_cache.clear();
    }

    fn evaluate(
        &mut self,
        expr: &ScalarExpr<String>,
        batch: &RecordBatch,
    ) -> ExecutorResult<ArrayRef> {
        let translated = translate_scalar(expr, self.schema.as_ref(), |name| {
            Error::InvalidArgumentError(format!(
                "column '{}' not found in cross product result",
                name
            ))
        })?;

        self.evaluate_numeric(&translated, batch)
    }

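    /// Collapses per-row truth values into a filter mask; rows whose
    /// predicate is unknown (NULL) are dropped, matching SQL `WHERE`
    /// semantics.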
    fn evaluate_predicate_mask(
        &mut self,
        expr: &LlkvExpr<'static, FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<BooleanArray> {
        let truths = self.evaluate_predicate_truths(expr, batch)?;
        let mut builder = BooleanBuilder::with_capacity(truths.len());
        for value in truths {
            builder.append_value(value.unwrap_or(false));
        }
        Ok(builder.finish())
    }

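    /// Evaluates a predicate to one `Option<bool>` per row, where `None`
    /// represents the SQL NULL (unknown) truth value.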
    fn evaluate_predicate_truths(
        &mut self,
        expr: &LlkvExpr<'static, FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<Vec<Option<bool>>> {
        match expr {
            LlkvExpr::Literal(value) => Ok(vec![Some(*value); batch.num_rows()]),
            LlkvExpr::And(children) => {
                if children.is_empty() {
                    return Ok(vec![Some(true); batch.num_rows()]);
                }
                let mut result = self.evaluate_predicate_truths(&children[0], batch)?;
                for child in &children[1..] {
                    let next = self.evaluate_predicate_truths(child, batch)?;
                    for (lhs, rhs) in result.iter_mut().zip(next.into_iter()) {
                        *lhs = truth_and(*lhs, rhs);
                    }
                }
                Ok(result)
            }
            LlkvExpr::Or(children) => {
                if children.is_empty() {
                    return Ok(vec![Some(false); batch.num_rows()]);
                }
                let mut result = self.evaluate_predicate_truths(&children[0], batch)?;
                for child in &children[1..] {
                    let next = self.evaluate_predicate_truths(child, batch)?;
                    for (lhs, rhs) in result.iter_mut().zip(next.into_iter()) {
                        *lhs = truth_or(*lhs, rhs);
                    }
                }
                Ok(result)
            }
            LlkvExpr::Not(inner) => {
                let mut values = self.evaluate_predicate_truths(inner, batch)?;
                for value in &mut values {
                    *value = truth_not(*value);
                }
                Ok(values)
            }
            LlkvExpr::Pred(filter) => self.evaluate_filter_truths(filter, batch),
            LlkvExpr::Compare { left, op, right } => {
                self.evaluate_compare_truths(left, *op, right, batch)
            }
            LlkvExpr::InList {
                expr: target,
                list,
                negated,
            } => self.evaluate_in_list_truths(target, list, *negated, batch),
        }
    }

    fn evaluate_filter_truths(
        &mut self,
        filter: &Filter<FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<Vec<Option<bool>>> {
        let accessor = self.column_accessor(filter.field_id, batch)?;
        let len = accessor.len();

        match &filter.op {
            Operator::IsNull => {
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    out.push(Some(accessor.is_null(idx)));
                }
                Ok(out)
            }
            Operator::IsNotNull => {
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    out.push(Some(!accessor.is_null(idx)));
                }
                Ok(out)
            }
            _ => match accessor {
                ColumnAccessor::Int64(array) => {
                    let predicate = build_fixed_width_predicate::<Int64Type>(&filter.op)
                        .map_err(Error::predicate_build)?;
                    let mut out = Vec::with_capacity(len);
                    for idx in 0..len {
                        if array.is_null(idx) {
                            out.push(None);
                        } else {
                            let value = array.value(idx);
                            out.push(Some(predicate.matches(&value)));
                        }
                    }
                    Ok(out)
                }
                ColumnAccessor::Float64(array) => {
                    let predicate = build_fixed_width_predicate::<Float64Type>(&filter.op)
                        .map_err(Error::predicate_build)?;
                    let mut out = Vec::with_capacity(len);
                    for idx in 0..len {
                        if array.is_null(idx) {
                            out.push(None);
                        } else {
                            let value = array.value(idx);
                            out.push(Some(predicate.matches(&value)));
                        }
                    }
                    Ok(out)
                }
                ColumnAccessor::Boolean(array) => {
                    let predicate =
                        build_bool_predicate(&filter.op).map_err(Error::predicate_build)?;
                    let mut out = Vec::with_capacity(len);
                    for idx in 0..len {
                        if array.is_null(idx) {
                            out.push(None);
                        } else {
                            let value = array.value(idx);
                            out.push(Some(predicate.matches(&value)));
                        }
                    }
                    Ok(out)
                }
                ColumnAccessor::Utf8(array) => {
                    let predicate =
                        build_var_width_predicate(&filter.op).map_err(Error::predicate_build)?;
                    let mut out = Vec::with_capacity(len);
                    for idx in 0..len {
                        if array.is_null(idx) {
                            out.push(None);
                        } else {
                            let value = array.value(idx);
                            out.push(Some(predicate.matches(value)));
                        }
                    }
                    Ok(out)
                }
                ColumnAccessor::Null(len) => Ok(vec![None; len]),
            },
        }
    }

    fn evaluate_compare_truths(
        &mut self,
        left: &ScalarExpr<FieldId>,
        op: CompareOp,
        right: &ScalarExpr<FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<Vec<Option<bool>>> {
        let left_values = self.materialize_value_array(left, batch)?;
        let right_values = self.materialize_value_array(right, batch)?;

        if left_values.len() != right_values.len() {
            return Err(Error::Internal(
                "mismatched compare operand lengths in cross product filter".into(),
            ));
        }

        let len = left_values.len();
        match (&left_values, &right_values) {
            (ValueArray::Null(_), _) | (_, ValueArray::Null(_)) => Ok(vec![None; len]),
            (ValueArray::Numeric(lhs), ValueArray::Numeric(rhs)) => {
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    match (lhs.value(idx), rhs.value(idx)) {
                        (Some(lv), Some(rv)) => out.push(Some(NumericKernels::compare(op, lv, rv))),
                        _ => out.push(None),
                    }
                }
                Ok(out)
            }
            (ValueArray::Boolean(lhs), ValueArray::Boolean(rhs)) => {
                let lhs = lhs.as_ref();
                let rhs = rhs.as_ref();
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    if lhs.is_null(idx) || rhs.is_null(idx) {
                        out.push(None);
                    } else {
                        out.push(Some(compare_bool(op, lhs.value(idx), rhs.value(idx))));
                    }
                }
                Ok(out)
            }
            (ValueArray::Utf8(lhs), ValueArray::Utf8(rhs)) => {
                let lhs = lhs.as_ref();
                let rhs = rhs.as_ref();
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    if lhs.is_null(idx) || rhs.is_null(idx) {
                        out.push(None);
                    } else {
                        out.push(Some(compare_str(op, lhs.value(idx), rhs.value(idx))));
                    }
                }
                Ok(out)
            }
            _ => Err(Error::InvalidArgumentError(
                "unsupported comparison between mismatched types in cross product filter".into(),
            )),
        }
    }

    fn evaluate_in_list_truths(
        &mut self,
        target: &ScalarExpr<FieldId>,
        list: &[ScalarExpr<FieldId>],
        negated: bool,
        batch: &RecordBatch,
    ) -> ExecutorResult<Vec<Option<bool>>> {
        let target_values = self.materialize_value_array(target, batch)?;
        let list_values = list
            .iter()
            .map(|expr| self.materialize_value_array(expr, batch))
            .collect::<ExecutorResult<Vec<_>>>()?;

        let len = target_values.len();
        for values in &list_values {
            if values.len() != len {
                return Err(Error::Internal(
                    "mismatched IN list operand lengths in cross product filter".into(),
                ));
            }
        }

        match &target_values {
            ValueArray::Numeric(target_numeric) => {
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    let target_value = match target_numeric.value(idx) {
                        Some(value) => value,
                        None => {
                            out.push(None);
                            continue;
                        }
                    };
                    let mut has_match = false;
                    let mut saw_null = false;
                    for candidate in &list_values {
                        match candidate {
                            ValueArray::Numeric(array) => match array.value(idx) {
                                Some(value) => {
                                    if NumericKernels::compare(CompareOp::Eq, target_value, value) {
                                        has_match = true;
                                        break;
                                    }
                                }
                                None => saw_null = true,
                            },
                            ValueArray::Null(_) => saw_null = true,
                            _ => {
                                return Err(Error::InvalidArgumentError(
                                    "type mismatch in IN list evaluation".into(),
                                ));
                            }
                        }
                    }
                    out.push(finalize_in_list_result(has_match, saw_null, negated));
                }
                Ok(out)
            }
            ValueArray::Boolean(target_bool) => {
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    if target_bool.is_null(idx) {
                        out.push(None);
                        continue;
                    }
                    let target_value = target_bool.value(idx);
                    let mut has_match = false;
                    let mut saw_null = false;
                    for candidate in &list_values {
                        match candidate {
                            ValueArray::Boolean(array) => {
                                if array.is_null(idx) {
                                    saw_null = true;
                                } else if array.value(idx) == target_value {
                                    has_match = true;
                                    break;
                                }
                            }
                            ValueArray::Null(_) => saw_null = true,
                            _ => {
                                return Err(Error::InvalidArgumentError(
                                    "type mismatch in IN list evaluation".into(),
                                ));
                            }
                        }
                    }
                    out.push(finalize_in_list_result(has_match, saw_null, negated));
                }
                Ok(out)
            }
            ValueArray::Utf8(target_utf8) => {
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    if target_utf8.is_null(idx) {
                        out.push(None);
                        continue;
                    }
                    let target_value = target_utf8.value(idx);
                    let mut has_match = false;
                    let mut saw_null = false;
                    for candidate in &list_values {
                        match candidate {
                            ValueArray::Utf8(array) => {
                                if array.is_null(idx) {
                                    saw_null = true;
                                } else if array.value(idx) == target_value {
                                    has_match = true;
                                    break;
                                }
                            }
                            ValueArray::Null(_) => saw_null = true,
                            _ => {
                                return Err(Error::InvalidArgumentError(
                                    "type mismatch in IN list evaluation".into(),
                                ));
                            }
                        }
                    }
                    out.push(finalize_in_list_result(has_match, saw_null, negated));
                }
                Ok(out)
            }
            ValueArray::Null(len) => Ok(vec![None; *len]),
        }
    }

    fn evaluate_numeric(
        &mut self,
        expr: &ScalarExpr<FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<ArrayRef> {
        let mut required = FxHashSet::default();
        collect_field_ids(expr, &mut required);

        let mut arrays = NumericArrayMap::default();
        for field_id in required {
            let numeric = self.numeric_array(field_id, batch)?;
            arrays.insert(field_id, numeric);
        }

        NumericKernels::evaluate_batch(expr, batch.num_rows(), &arrays)
    }

    fn numeric_array(
        &mut self,
        field_id: FieldId,
        batch: &RecordBatch,
    ) -> ExecutorResult<NumericArray> {
        if let Some(existing) = self.numeric_cache.get(&field_id) {
            return Ok(existing.clone());
        }

        let column_index = *self.field_id_to_index.get(&field_id).ok_or_else(|| {
            Error::Internal("field mapping missing during cross product evaluation".into())
        })?;

        let array_ref = batch.column(column_index).clone();
        let numeric = NumericArray::try_from_arrow(&array_ref)?;
        self.numeric_cache.insert(field_id, numeric.clone());
        Ok(numeric)
    }

    fn column_accessor(
        &mut self,
        field_id: FieldId,
        batch: &RecordBatch,
    ) -> ExecutorResult<ColumnAccessor> {
        if let Some(existing) = self.column_cache.get(&field_id) {
            return Ok(existing.clone());
        }

        let column_index = *self.field_id_to_index.get(&field_id).ok_or_else(|| {
            Error::Internal("field mapping missing during cross product evaluation".into())
        })?;

        let accessor = ColumnAccessor::from_array(batch.column(column_index))?;
        self.column_cache.insert(field_id, accessor.clone());
        Ok(accessor)
    }

    fn materialize_scalar_array(
        &mut self,
        expr: &ScalarExpr<FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<ArrayRef> {
        match expr {
            ScalarExpr::Column(field_id) => {
                let accessor = self.column_accessor(*field_id, batch)?;
                Ok(accessor.as_array_ref())
            }
            ScalarExpr::Literal(literal) => literal_to_constant_array(literal, batch.num_rows()),
            ScalarExpr::Binary { .. } => self.evaluate_numeric(expr, batch),
            ScalarExpr::Aggregate(_) => Err(Error::InvalidArgumentError(
                "aggregate expressions are not supported in cross product filters".into(),
            )),
            ScalarExpr::GetField { .. } => Err(Error::InvalidArgumentError(
                "struct field access is not supported in cross product filters".into(),
            )),
        }
    }

    fn materialize_value_array(
        &mut self,
        expr: &ScalarExpr<FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<ValueArray> {
        let array = self.materialize_scalar_array(expr, batch)?;
        ValueArray::from_array(array)
    }
}

// TODO: Move to llkv-aggregate?
fn collect_field_ids(expr: &ScalarExpr<FieldId>, out: &mut FxHashSet<FieldId>) {
    match expr {
        ScalarExpr::Column(fid) => {
            out.insert(*fid);
        }
        ScalarExpr::Binary { left, right, .. } => {
            collect_field_ids(left, out);
            collect_field_ids(right, out);
        }
        ScalarExpr::Aggregate(call) => match call {
            AggregateCall::CountStar => {}
            AggregateCall::Count(fid)
            | AggregateCall::Sum(fid)
            | AggregateCall::Min(fid)
            | AggregateCall::Max(fid)
            | AggregateCall::CountNulls(fid) => {
                out.insert(*fid);
            }
        },
        ScalarExpr::GetField { base, .. } => collect_field_ids(base, out),
        ScalarExpr::Literal(_) => {}
    }
}

// TODO: Move to llkv-table?
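/// Reduces a qualified column name to its trailing `table.column` pair, e.g.
/// `"main.tab1.a"` -> `Some("tab1.a")`; returns `None` when the name has no
/// table qualifier.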
fn table_column_key(name: &str) -> Option<String> {
    let trimmed = name.trim_start_matches('.');
    let mut parts = trimmed.split('.').collect::<Vec<_>>();
    if parts.len() < 2 {
        return None;
    }
    let column = parts.pop()?;
    let table = parts.pop()?;
    Some(format!("{}.{}", table, column))
}

/// Streaming execution handle for SELECT queries.
#[derive(Clone)]
pub struct SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    table_name: String,
    schema: Arc<Schema>,
    stream: SelectStream<P>,
}

#[derive(Clone)]
enum SelectStream<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    Projection {
        table: Arc<ExecutorTable<P>>,
        projections: Vec<ScanProjection>,
        filter_expr: LlkvExpr<'static, FieldId>,
        options: ScanStreamOptions<P>,
        full_table_scan: bool,
        order_by: Vec<OrderByPlan>,
        distinct: bool,
    },
    Aggregation {
        batch: RecordBatch,
    },
}

impl<P> SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    #[allow(clippy::too_many_arguments)]
    fn new_projection(
        table_name: String,
        schema: Arc<Schema>,
        table: Arc<ExecutorTable<P>>,
        projections: Vec<ScanProjection>,
        filter_expr: LlkvExpr<'static, FieldId>,
        options: ScanStreamOptions<P>,
        full_table_scan: bool,
        order_by: Vec<OrderByPlan>,
        distinct: bool,
    ) -> Self {
        Self {
            table_name,
            schema,
            stream: SelectStream::Projection {
                table,
                projections,
                filter_expr,
                options,
                full_table_scan,
                order_by,
                distinct,
            },
        }
    }

    pub fn new_single_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
        Self {
            table_name,
            schema,
            stream: SelectStream::Aggregation { batch },
        }
    }

    pub fn from_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
        Self::new_single_batch(table_name, schema, batch)
    }

    pub fn table_name(&self) -> &str {
        &self.table_name
    }

    pub fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

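    /// Drives execution, invoking `on_batch` once per produced [`RecordBatch`].
    /// The first error, whether raised by the scan or returned from the
    /// callback, aborts the stream and is propagated to the caller.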
    pub fn stream(
        self,
        mut on_batch: impl FnMut(RecordBatch) -> ExecutorResult<()>,
    ) -> ExecutorResult<()> {
        let schema = Arc::clone(&self.schema);
        match self.stream {
            SelectStream::Projection {
                table,
                projections,
                filter_expr,
                options,
                full_table_scan,
                order_by,
                distinct,
            } => {
                // Early return for empty tables to avoid ColumnStore data_type() errors
                let total_rows = table.total_rows.load(Ordering::SeqCst);
                if total_rows == 0 {
                    // Empty table - return empty result with correct schema
                    return Ok(());
                }

                let mut error: Option<Error> = None;
                let mut produced = false;
                let mut produced_rows: u64 = 0;
                let capture_nulls_first = matches!(options.order, Some(spec) if spec.nulls_first);
                let needs_post_sort = order_by.len() > 1;
                let collect_batches = needs_post_sort || capture_nulls_first;
                let include_nulls = options.include_nulls;
                let has_row_id_filter = options.row_id_filter.is_some();
                let mut distinct_state = if distinct {
                    Some(DistinctState::default())
                } else {
                    None
                };
                let scan_options = options;
                let mut buffered_batches: Vec<RecordBatch> = Vec::new();
                table
                    .table
                    .scan_stream(projections, &filter_expr, scan_options, |batch| {
                        if error.is_some() {
                            return;
                        }
                        let mut batch = batch;
                        if let Some(state) = distinct_state.as_mut() {
                            match distinct_filter_batch(batch, state) {
                                Ok(Some(filtered)) => {
                                    batch = filtered;
                                }
                                Ok(None) => {
                                    return;
                                }
                                Err(err) => {
                                    error = Some(err);
                                    return;
                                }
                            }
                        }
                        produced = true;
                        produced_rows = produced_rows.saturating_add(batch.num_rows() as u64);
                        if collect_batches {
                            buffered_batches.push(batch);
                        } else if let Err(err) = on_batch(batch) {
                            error = Some(err);
                        }
                    })?;
                if let Some(err) = error {
                    return Err(err);
                }
                if !produced {
                    // Only synthesize null rows if this was a full table scan.
                    // If there was a filter and it matched no rows, return empty results.
                    if !distinct && full_table_scan && total_rows > 0 {
                        for batch in synthesize_null_scan(Arc::clone(&schema), total_rows)? {
                            on_batch(batch)?;
                        }
                    }
                    return Ok(());
                }
                let mut null_batches: Vec<RecordBatch> = Vec::new();
                // Only synthesize null rows if:
                // 1. include_nulls is true
                // 2. This is a full table scan
                // 3. We produced fewer rows than the total
                // 4. We DON'T have a row_id_filter (e.g., MVCC filter) that intentionally filtered rows
                if !distinct
                    && include_nulls
                    && full_table_scan
                    && produced_rows < total_rows
                    && !has_row_id_filter
                {
                    let missing = total_rows - produced_rows;
                    if missing > 0 {
                        null_batches = synthesize_null_scan(Arc::clone(&schema), missing)?;
                    }
                }

                if collect_batches {
                    if needs_post_sort {
                        if !null_batches.is_empty() {
                            buffered_batches.extend(null_batches);
                        }
                        if !buffered_batches.is_empty() {
                            let combined =
                                concat_batches(&schema, &buffered_batches).map_err(|err| {
                                    Error::InvalidArgumentError(format!(
                                        "failed to concatenate result batches for ORDER BY: {}",
                                        err
                                    ))
                                })?;
                            let sorted_batch =
                                sort_record_batch_with_order(&schema, &combined, &order_by)?;
                            on_batch(sorted_batch)?;
                        }
                    } else if capture_nulls_first {
                        for batch in null_batches {
                            on_batch(batch)?;
                        }
                        for batch in buffered_batches {
                            on_batch(batch)?;
                        }
                    }
                } else if !null_batches.is_empty() {
                    for batch in null_batches {
                        on_batch(batch)?;
                    }
                }
                Ok(())
            }
            SelectStream::Aggregation { batch } => on_batch(batch),
        }
    }

    pub fn collect(self) -> ExecutorResult<Vec<RecordBatch>> {
        let mut batches = Vec::new();
        self.stream(|batch| {
            batches.push(batch);
            Ok(())
        })?;
        Ok(batches)
    }

    pub fn collect_rows(self) -> ExecutorResult<ExecutorRowBatch> {
        let schema = self.schema();
        let mut rows: Vec<Vec<PlanValue>> = Vec::new();
        self.stream(|batch| {
            for row_idx in 0..batch.num_rows() {
                let mut row: Vec<PlanValue> = Vec::with_capacity(batch.num_columns());
                for col_idx in 0..batch.num_columns() {
                    let value = llkv_plan::plan_value_from_array(batch.column(col_idx), row_idx)?;
                    row.push(value);
                }
                rows.push(row);
            }
            Ok(())
        })?;
        let columns = schema
            .fields()
            .iter()
            .map(|field| field.name().to_string())
            .collect();
        Ok(ExecutorRowBatch { columns, rows })
    }

    pub fn into_rows(self) -> ExecutorResult<Vec<Vec<PlanValue>>> {
        Ok(self.collect_rows()?.rows)
    }
}

impl<P> fmt::Debug for SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SelectExecution")
            .field("table_name", &self.table_name)
            .field("schema", &self.schema)
            .finish()
    }
}

// ============================================================================
// Helper Functions
// ============================================================================

fn expand_order_targets(
    order_items: &[OrderByPlan],
    projections: &[ScanProjection],
) -> ExecutorResult<Vec<OrderByPlan>> {
    let mut expanded = Vec::new();

    for item in order_items {
        match &item.target {
            OrderTarget::All => {
                if projections.is_empty() {
                    return Err(Error::InvalidArgumentError(
                        "ORDER BY ALL requires at least one projection".into(),
                    ));
                }

                for (idx, projection) in projections.iter().enumerate() {
                    if matches!(projection, ScanProjection::Computed { .. }) {
                        return Err(Error::InvalidArgumentError(
                            "ORDER BY ALL cannot reference computed projections".into(),
                        ));
                    }

                    let mut clone = item.clone();
                    clone.target = OrderTarget::Index(idx);
                    expanded.push(clone);
                }
            }
            _ => expanded.push(item.clone()),
        }
    }

    Ok(expanded)
}

fn resolve_scan_order<P>(
    table: &ExecutorTable<P>,
    projections: &[ScanProjection],
    order_plan: &OrderByPlan,
) -> ExecutorResult<ScanOrderSpec>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    let (column, field_id) = match &order_plan.target {
        OrderTarget::Column(name) => {
            let column = table.schema.resolve(name).ok_or_else(|| {
                Error::InvalidArgumentError(format!("unknown column '{}' in ORDER BY", name))
            })?;
            (column, column.field_id)
        }
        OrderTarget::Index(position) => {
            let projection = projections.get(*position).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "ORDER BY position {} is out of range",
                    position + 1
                ))
            })?;
            match projection {
                ScanProjection::Column(store_projection) => {
                    let field_id = store_projection.logical_field_id.field_id();
                    let column = table.schema.column_by_field_id(field_id).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column with field id {field_id} in ORDER BY"
                        ))
                    })?;
                    (column, field_id)
                }
                ScanProjection::Computed { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "ORDER BY position referring to a computed projection is not supported"
                            .into(),
                    ));
                }
            }
        }
        OrderTarget::All => {
            return Err(Error::InvalidArgumentError(
                "ORDER BY ALL should be expanded before execution".into(),
            ));
        }
    };

    let transform = match order_plan.sort_type {
        OrderSortType::Native => match column.data_type {
            DataType::Int64 => ScanOrderTransform::IdentityInteger,
            DataType::Utf8 => ScanOrderTransform::IdentityUtf8,
            ref other => {
                return Err(Error::InvalidArgumentError(format!(
                    "ORDER BY on column type {:?} is not supported",
                    other
                )));
            }
        },
        OrderSortType::CastTextToInteger => {
            if column.data_type != DataType::Utf8 {
                return Err(Error::InvalidArgumentError(
                    "ORDER BY CAST expects a text column".into(),
                ));
            }
            ScanOrderTransform::CastUtf8ToInteger
        }
    };

    let direction = if order_plan.ascending {
        ScanOrderDirection::Ascending
    } else {
        ScanOrderDirection::Descending
    };

    Ok(ScanOrderSpec {
        field_id,
        direction,
        nulls_first: order_plan.nulls_first,
        transform,
    })
}

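/// Materializes `total_rows` worth of all-NULL rows for the given schema in a
/// single batch; used when a full table scan must still account for rows
/// whose projected columns carry no stored values.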
fn synthesize_null_scan(schema: Arc<Schema>, total_rows: u64) -> ExecutorResult<Vec<RecordBatch>> {
    let row_count = usize::try_from(total_rows).map_err(|_| {
        Error::InvalidArgumentError("table row count exceeds supported in-memory batch size".into())
    })?;

    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    for field in schema.fields() {
        match field.data_type() {
            DataType::Int64 => {
                let mut builder = Int64Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Float64 => {
                let mut builder = arrow::array::Float64Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Utf8 => {
                let mut builder = arrow::array::StringBuilder::with_capacity(row_count, 0);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Date32 => {
                let mut builder = arrow::array::Date32Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            other => {
                return Err(Error::InvalidArgumentError(format!(
                    "unsupported data type in null synthesis: {other:?}"
                )));
            }
        }
    }

    let batch = RecordBatch::try_new(schema, arrays)?;
    Ok(vec![batch])
}

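/// Intermediate product of cross-join staging: the combined schema, the
/// materialized batches, and the number of columns contributed by each
/// source table (in join order).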
struct TableCrossProductData {
    schema: Arc<Schema>,
    batches: Vec<RecordBatch>,
    column_counts: Vec<usize>,
}

fn collect_table_data<P>(
    table_ref: &llkv_plan::TableRef,
    table: &ExecutorTable<P>,
) -> ExecutorResult<TableCrossProductData>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    if table.schema.columns.is_empty() {
        return Err(Error::InvalidArgumentError(format!(
            "table '{}' has no columns; cross products require at least one column",
            table_ref.qualified_name()
        )));
    }

    let mut projections = Vec::with_capacity(table.schema.columns.len());
    let mut fields = Vec::with_capacity(table.schema.columns.len());

    for column in &table.schema.columns {
        let qualified_name = format!("{}.{}.{}", table_ref.schema, table_ref.table, column.name);
        projections.push(ScanProjection::from(StoreProjection::with_alias(
            LogicalFieldId::for_user(table.table.table_id(), column.field_id),
            qualified_name.clone(),
        )));
        fields.push(Field::new(
            qualified_name,
            column.data_type.clone(),
            column.nullable,
        ));
    }

    let schema = Arc::new(Schema::new(fields));

    let filter_field_id = table.schema.first_field_id().unwrap_or(ROW_ID_FIELD_ID);
    let filter_expr = crate::translation::expression::full_table_scan_filter(filter_field_id);

    let mut raw_batches = Vec::new();
    table.table.scan_stream(
        projections,
        &filter_expr,
        ScanStreamOptions {
            include_nulls: true,
            ..ScanStreamOptions::default()
        },
        |batch| {
            raw_batches.push(batch);
        },
    )?;

    let mut normalized_batches = Vec::with_capacity(raw_batches.len());
    for batch in raw_batches {
        let normalized = RecordBatch::try_new(Arc::clone(&schema), batch.columns().to_vec())
            .map_err(|err| {
                Error::Internal(format!(
                    "failed to align scan batch for table '{}': {}",
                    table_ref.qualified_name(),
                    err
                ))
            })?;
        normalized_batches.push(normalized);
    }

    Ok(TableCrossProductData {
        schema,
        batches: normalized_batches,
        column_counts: vec![table.schema.columns.len()],
    })
}

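/// Builds a case-insensitive name -> column-index map for a cross-product
/// schema. Each column is reachable by its fully qualified name, its name
/// without the leading schema part, its `table.column` form when that pair is
/// unambiguous across the product, and `alias.column` when the source table
/// was aliased.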
fn build_cross_product_column_lookup(
    schema: &Schema,
    tables: &[llkv_plan::TableRef],
    column_counts: &[usize],
) -> FxHashMap<String, usize> {
    debug_assert_eq!(tables.len(), column_counts.len());

    let mut table_column_counts: FxHashMap<String, usize> = FxHashMap::default();
    for field in schema.fields() {
        if let Some(name) = table_column_key(field.name()) {
            *table_column_counts
                .entry(name.to_ascii_lowercase())
                .or_insert(0) += 1;
        }
    }

    let mut lookup = FxHashMap::default();

    for (idx, field) in schema.fields().iter().enumerate() {
        let name = field.name();
        lookup.entry(name.to_ascii_lowercase()).or_insert(idx);

        let trimmed = name.trim_start_matches('.');
        lookup.entry(trimmed.to_ascii_lowercase()).or_insert(idx);

        let mut parts: Vec<&str> = trimmed.split('.').collect();
        if parts.len() >= 2 {
            let column_part = parts.pop().unwrap();
            let column_lower = column_part.to_ascii_lowercase();

            if let Some(table_only) = parts.last() {
                let table_lower = table_only.to_ascii_lowercase();
                if table_column_counts
                    .get(&format!("{}.{}", table_lower, column_lower))
                    .copied()
                    .unwrap_or(0)
                    == 1
                {
                    lookup
                        .entry(format!("{}.{}", table_lower, column_lower))
                        .or_insert(idx);
                }
            }
        }
    }

    let mut offset = 0usize;
    for (table_ref, &count) in tables.iter().zip(column_counts.iter()) {
        if let Some(alias) = &table_ref.alias {
            let alias_lower = alias.to_ascii_lowercase();
            let end = usize::min(schema.fields().len(), offset.saturating_add(count));
            for column_index in offset..end {
                let name = schema.field(column_index).name();
                let trimmed = name.trim_start_matches('.');
                let column_lower = trimmed
                    .rsplit('.')
                    .next()
                    .map(|part| part.to_ascii_lowercase())
                    .unwrap_or_else(|| trimmed.to_ascii_lowercase());
                lookup
                    .entry(format!("{}.{}", alias_lower, column_lower))
                    .or_insert(column_index);
            }
        }
        offset = offset.saturating_add(count);
    }

    lookup
}

fn cross_join_table_batches(
    left: TableCrossProductData,
    right: TableCrossProductData,
) -> ExecutorResult<TableCrossProductData> {
    let TableCrossProductData {
        schema: left_schema,
        batches: left_batches,
        column_counts: mut left_counts,
    } = left;
    let TableCrossProductData {
        schema: right_schema,
        batches: right_batches,
        column_counts: right_counts,
    } = right;

    let combined_fields: Vec<Field> = left_schema
        .fields()
        .iter()
        .chain(right_schema.fields().iter())
        .map(|field| field.as_ref().clone())
        .collect();

    let mut column_counts = Vec::with_capacity(left_counts.len() + right_counts.len());
    column_counts.append(&mut left_counts);
    column_counts.extend(right_counts);

    let combined_schema = Arc::new(Schema::new(combined_fields));

    let left_has_rows = left_batches.iter().any(|batch| batch.num_rows() > 0);
    let right_has_rows = right_batches.iter().any(|batch| batch.num_rows() > 0);

    if !left_has_rows || !right_has_rows {
        return Ok(TableCrossProductData {
            schema: combined_schema,
            batches: Vec::new(),
            column_counts,
        });
    }

    let mut output_batches = Vec::new();
    for left_batch in &left_batches {
        if left_batch.num_rows() == 0 {
            continue;
        }
        for right_batch in &right_batches {
            if right_batch.num_rows() == 0 {
                continue;
            }

            let batch =
                cross_join_pair(left_batch, right_batch, &combined_schema).map_err(|err| {
                    Error::Internal(format!("failed to build cross join batch: {err}"))
                })?;
            output_batches.push(batch);
        }
    }

    Ok(TableCrossProductData {
        schema: combined_schema,
        batches: output_batches,
        column_counts,
    })
}

#[derive(Default)]
struct DistinctState {
    seen: FxHashSet<CanonicalRow>,
}

impl DistinctState {
    fn insert(&mut self, row: CanonicalRow) -> bool {
        self.seen.insert(row)
    }
}

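/// Filters `batch` down to rows whose canonical form has not been seen yet,
/// updating `state` in place. Returns `None` when every row was a duplicate
/// (or the batch was empty).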
fn distinct_filter_batch(
    batch: RecordBatch,
    state: &mut DistinctState,
) -> ExecutorResult<Option<RecordBatch>> {
    if batch.num_rows() == 0 {
        return Ok(None);
    }

    let mut keep_flags = Vec::with_capacity(batch.num_rows());
    let mut keep_count = 0usize;

    for row_idx in 0..batch.num_rows() {
        let row = CanonicalRow::from_batch(&batch, row_idx)?;
        if state.insert(row) {
            keep_flags.push(true);
            keep_count += 1;
        } else {
            keep_flags.push(false);
        }
    }

    if keep_count == 0 {
        return Ok(None);
    }

    if keep_count == batch.num_rows() {
        return Ok(Some(batch));
    }

    let mut builder = BooleanBuilder::with_capacity(batch.num_rows());
    for flag in keep_flags {
        builder.append_value(flag);
    }
    let mask = Arc::new(builder.finish());

    let filtered = filter_record_batch(&batch, &mask).map_err(|err| {
        Error::InvalidArgumentError(format!("failed to apply DISTINCT filter: {err}"))
    })?;

    Ok(Some(filtered))
}

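/// Sorts a fully materialized batch by the given ORDER BY items using Arrow's
/// lexicographic sort, materializing a cast-to-integer key column for
/// `CastTextToInteger` targets (unparseable strings sort as NULL).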
fn sort_record_batch_with_order(
    schema: &Arc<Schema>,
    batch: &RecordBatch,
    order_by: &[OrderByPlan],
) -> ExecutorResult<RecordBatch> {
    if order_by.is_empty() {
        return Ok(batch.clone());
    }

    let mut sort_columns: Vec<SortColumn> = Vec::with_capacity(order_by.len());

    for order in order_by {
        let column_index = match &order.target {
            OrderTarget::Column(name) => schema.index_of(name).map_err(|_| {
                Error::InvalidArgumentError(format!(
                    "ORDER BY references unknown column '{}'",
                    name
                ))
            })?,
            OrderTarget::Index(idx) => {
                if *idx >= batch.num_columns() {
                    return Err(Error::InvalidArgumentError(format!(
                        "ORDER BY position {} is out of bounds for {} columns",
                        idx + 1,
                        batch.num_columns()
                    )));
                }
                *idx
            }
            OrderTarget::All => {
                return Err(Error::InvalidArgumentError(
                    "ORDER BY ALL should be expanded before sorting".into(),
                ));
            }
        };

        let source_array = batch.column(column_index);

        let values: ArrayRef = match order.sort_type {
            OrderSortType::Native => Arc::clone(source_array),
            OrderSortType::CastTextToInteger => {
                let strings = source_array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        Error::InvalidArgumentError(
                            "ORDER BY CAST expects the underlying column to be TEXT".into(),
                        )
                    })?;
                let mut builder = Int64Builder::with_capacity(strings.len());
                for i in 0..strings.len() {
                    if strings.is_null(i) {
                        builder.append_null();
                    } else {
                        match strings.value(i).parse::<i64>() {
                            Ok(value) => builder.append_value(value),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                Arc::new(builder.finish()) as ArrayRef
            }
        };

        let sort_options = SortOptions {
            descending: !order.ascending,
            nulls_first: order.nulls_first,
        };

        sort_columns.push(SortColumn {
            values,
            options: Some(sort_options),
        });
    }

    let indices = lexsort_to_indices(&sort_columns, None).map_err(|err| {
        Error::InvalidArgumentError(format!("failed to compute ORDER BY indices: {err}"))
    })?;

    let perm = indices
        .as_any()
        .downcast_ref::<UInt32Array>()
        .ok_or_else(|| Error::Internal("ORDER BY sorting produced unexpected index type".into()))?;

    let mut reordered_columns: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
    for col_idx in 0..batch.num_columns() {
        let reordered = take(batch.column(col_idx), perm, None).map_err(|err| {
            Error::InvalidArgumentError(format!(
                "failed to apply ORDER BY permutation to column {col_idx}: {err}"
            ))
        })?;
        reordered_columns.push(reordered);
    }

    RecordBatch::try_new(Arc::clone(schema), reordered_columns)
        .map_err(|err| Error::Internal(format!("failed to build reordered ORDER BY batch: {err}")))
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, ArrayRef, Int64Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use llkv_expr::expr::BinaryOp;
    use std::sync::Arc;

    #[test]
    fn cross_product_context_evaluates_expressions() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("main.tab2.a", DataType::Int64, false),
            Field::new("main.tab2.b", DataType::Int64, false),
        ]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
                Arc::new(Int64Array::from(vec![10, 20, 30])) as ArrayRef,
            ],
        )
        .expect("valid batch");

        let lookup = build_cross_product_column_lookup(schema.as_ref(), &[], &[]);
        let mut ctx = CrossProductExpressionContext::new(schema.as_ref(), lookup)
            .expect("context builds from schema");

        let literal_expr: ScalarExpr<String> = ScalarExpr::literal(67);
        let literal = ctx
            .evaluate(&literal_expr, &batch)
            .expect("literal evaluation succeeds");
        let literal_array = literal
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int64 literal result");
        assert_eq!(literal_array.len(), 3);
        assert!(literal_array.iter().all(|value| value == Some(67)));

        let add_expr = ScalarExpr::binary(
            ScalarExpr::column("tab2.a".to_string()),
            BinaryOp::Add,
            ScalarExpr::literal(5),
        );
        let added = ctx
            .evaluate(&add_expr, &batch)
            .expect("column addition succeeds");
        let added_array = added
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int64 addition result");
        assert_eq!(added_array.values(), &[6, 7, 8]);
    }

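    // Sanity checks for the three-valued logic helpers and IN-list
    // finalization defined in this module.
    #[test]
    fn three_valued_logic_follows_sql_truth_tables() {
        assert_eq!(truth_and(Some(false), None), Some(false));
        assert_eq!(truth_and(None, Some(true)), None);
        assert_eq!(truth_or(None, Some(true)), Some(true));
        assert_eq!(truth_or(Some(false), None), None);
        assert_eq!(truth_not(None), None);
        assert_eq!(truth_not(Some(true)), Some(false));

        // A NULL candidate only matters when nothing matched.
        assert_eq!(finalize_in_list_result(true, true, false), Some(true));
        assert_eq!(finalize_in_list_result(false, true, false), None);
        assert_eq!(finalize_in_list_result(false, false, false), Some(false));
        assert_eq!(finalize_in_list_result(false, false, true), Some(true));
        assert_eq!(finalize_in_list_result(true, false, true), Some(false));
    }
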
    #[test]
    fn cross_product_handles_more_than_two_tables() {
        let schema_a = Arc::new(Schema::new(vec![Field::new(
            "main.t1.a",
            DataType::Int64,
            false,
        )]));
        let schema_b = Arc::new(Schema::new(vec![Field::new(
            "main.t2.b",
            DataType::Int64,
            false,
        )]));
        let schema_c = Arc::new(Schema::new(vec![Field::new(
            "main.t3.c",
            DataType::Int64,
            false,
        )]));

        let batch_a = RecordBatch::try_new(
            Arc::clone(&schema_a),
            vec![Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef],
        )
        .expect("valid batch");
        let batch_b = RecordBatch::try_new(
            Arc::clone(&schema_b),
            vec![Arc::new(Int64Array::from(vec![10, 20, 30])) as ArrayRef],
        )
        .expect("valid batch");
        let batch_c = RecordBatch::try_new(
            Arc::clone(&schema_c),
            vec![Arc::new(Int64Array::from(vec![100])) as ArrayRef],
        )
        .expect("valid batch");

        let data_a = TableCrossProductData {
            schema: schema_a,
            batches: vec![batch_a],
            column_counts: vec![1],
        };
        let data_b = TableCrossProductData {
            schema: schema_b,
            batches: vec![batch_b],
            column_counts: vec![1],
        };
        let data_c = TableCrossProductData {
            schema: schema_c,
            batches: vec![batch_c],
            column_counts: vec![1],
        };

        let ab = cross_join_table_batches(data_a, data_b).expect("two-table product");
        assert_eq!(ab.schema.fields().len(), 2);
        assert_eq!(ab.batches.len(), 1);
        assert_eq!(ab.batches[0].num_rows(), 6);

        let abc = cross_join_table_batches(ab, data_c).expect("three-table product");
        assert_eq!(abc.schema.fields().len(), 3);
        assert_eq!(abc.batches.len(), 1);

        let final_batch = &abc.batches[0];
        assert_eq!(final_batch.num_rows(), 6);

        let col_a = final_batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("left column values");
        assert_eq!(col_a.values(), &[1, 1, 1, 2, 2, 2]);

        let col_b = final_batch
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("middle column values");
        assert_eq!(col_b.values(), &[10, 20, 30, 10, 20, 30]);

        let col_c = final_batch
            .column(2)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("right column values");
        assert_eq!(col_c.values(), &[100, 100, 100, 100, 100, 100]);
    }
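
    // Focused checks for the smaller helpers in this module: qualified-name
    // reduction, DISTINCT row filtering, and all-NULL row synthesis.
    #[test]
    fn table_column_key_reduces_to_table_and_column() {
        assert_eq!(table_column_key("main.tab1.a").as_deref(), Some("tab1.a"));
        assert_eq!(table_column_key("tab1.a").as_deref(), Some("tab1.a"));
        assert_eq!(table_column_key("a"), None);
    }

    #[test]
    fn distinct_filter_batch_drops_repeated_rows() {
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int64Array::from(vec![1, 1, 2])) as ArrayRef],
        )
        .expect("valid batch");

        let mut state = DistinctState::default();
        let first = distinct_filter_batch(batch, &mut state)
            .expect("distinct filter succeeds")
            .expect("first batch keeps unseen rows");
        assert_eq!(first.num_rows(), 2);

        let repeat = RecordBatch::try_new(
            schema,
            vec![Arc::new(Int64Array::from(vec![2, 2])) as ArrayRef],
        )
        .expect("valid batch");
        let second = distinct_filter_batch(repeat, &mut state).expect("distinct filter succeeds");
        assert!(second.is_none(), "fully duplicated batch is dropped");
    }

    #[test]
    fn synthesize_null_scan_produces_all_null_rows() {
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, true)]));
        let batches = synthesize_null_scan(schema, 3).expect("null synthesis succeeds");
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 3);
        assert_eq!(batches[0].column(0).null_count(), 3);
    }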
}