llkv_executor/
lib.rs

//! Query execution engine for LLKV.
//!
//! This crate provides the query execution layer that sits between the query planner
//! (`llkv-plan`) and the storage layer (`llkv-table`, `llkv-column-map`).
//!
//! # Module Organization
//!
//! - [`translation`]: Expression and projection translation utilities
//! - [`types`]: Core type definitions (tables, schemas, columns)
//! - [`insert`]: INSERT operation support (value coercion)
//! - [`utils`]: Utility functions (time)
//!
//! The [`QueryExecutor`] and [`SelectExecution`] implementations are defined inline
//! in this module for now, but should be extracted to a dedicated `query` module
//! in a future refactoring.
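//!
//! # Example
//!
//! A minimal usage sketch, assuming a `provider` that implements
//! [`ExecutorTableProvider`] and a prepared [`SelectPlan`] (both constructed elsewhere):
//!
//! ```ignore
//! let executor = QueryExecutor::new(provider);
//! let execution = executor.execute_select(plan)?;
//! execution.stream(|batch| {
//!     println!("{} rows", batch.num_rows());
//!     Ok(())
//! })?;
//! ```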

use arrow::array::{Array, ArrayRef, Int64Builder, RecordBatch, StringArray, UInt32Array};
use arrow::compute::{SortColumn, SortOptions, concat_batches, lexsort_to_indices, take};
use arrow::datatypes::{DataType, Schema};
use llkv_aggregate::{AggregateAccumulator, AggregateKind, AggregateSpec, AggregateState};
use llkv_column_map::store::Projection as StoreProjection;
use llkv_column_map::types::LogicalFieldId;
use llkv_expr::expr::{Expr as LlkvExpr, ScalarExpr};
use llkv_plan::{
    AggregateExpr, AggregateFunction, OrderByPlan, OrderSortType, OrderTarget, PlanValue,
    SelectPlan, SelectProjection,
};
use llkv_result::Error;
use llkv_storage::pager::Pager;
use llkv_table::table::{
    RowIdFilter, ScanOrderDirection, ScanOrderSpec, ScanOrderTransform, ScanProjection,
    ScanStreamOptions,
};
use llkv_table::types::FieldId;
use rustc_hash::FxHashMap;
use simd_r_drive_entry_handle::EntryHandle;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::Ordering;

// ============================================================================
// Module Declarations
// ============================================================================

pub mod insert;
pub mod translation;
pub mod types;
pub mod utils;

// ============================================================================
// Type Aliases and Re-exports
// ============================================================================

/// Result type for executor operations.
pub type ExecutorResult<T> = Result<T, Error>;

pub use insert::{
    build_array_for_column, normalize_insert_value_for_column, resolve_insert_columns,
};
pub use translation::{
    build_projected_columns, build_wildcard_projections, full_table_scan_filter,
    resolve_field_id_from_schema, schema_for_projections, translate_predicate,
    translate_predicate_with, translate_scalar, translate_scalar_with,
};
pub use types::{
    ExecutorColumn, ExecutorMultiColumnUnique, ExecutorRowBatch, ExecutorSchema, ExecutorTable,
    ExecutorTableProvider,
};
pub use utils::current_time_micros;

// ============================================================================
// Query Executor - Implementation
// ============================================================================
// TODO: Extract this implementation into a dedicated query/ module

/// Query executor that executes SELECT plans.
pub struct QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    provider: Arc<dyn ExecutorTableProvider<P>>,
}

impl<P> QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    pub fn new(provider: Arc<dyn ExecutorTableProvider<P>>) -> Self {
        Self { provider }
    }

    pub fn execute_select(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        self.execute_select_with_filter(plan, None)
    }

    pub fn execute_select_with_filter(
        &self,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        // Handle SELECT without FROM clause (e.g., SELECT 42, SELECT {'a': 1})
        if plan.tables.is_empty() {
            return self.execute_select_without_table(plan);
        }

        // Handle multi-table queries (cross products/joins)
        if plan.tables.len() > 1 {
            return self.execute_cross_product(plan);
        }

        // Single table query
        let table_ref = &plan.tables[0];
        let table = self.provider.get_table(&table_ref.qualified_name())?;
        let display_name = table_ref.qualified_name();

        if !plan.aggregates.is_empty() {
            self.execute_aggregates(table, display_name, plan, row_filter)
        } else if self.has_computed_aggregates(&plan) {
            // Handle computed projections that contain embedded aggregates
            self.execute_computed_aggregates(table, display_name, plan, row_filter)
        } else {
            self.execute_projection(table, display_name, plan, row_filter)
        }
    }

    /// Check if any computed projections contain aggregate functions
    fn has_computed_aggregates(&self, plan: &SelectPlan) -> bool {
        plan.projections.iter().any(|proj| {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::expr_contains_aggregate(expr)
            } else {
                false
            }
        })
    }

    /// Recursively check if a scalar expression contains aggregates
    fn expr_contains_aggregate(expr: &ScalarExpr<String>) -> bool {
        match expr {
            ScalarExpr::Aggregate(_) => true,
            ScalarExpr::Binary { left, right, .. } => {
                Self::expr_contains_aggregate(left) || Self::expr_contains_aggregate(right)
            }
            ScalarExpr::GetField { base, .. } => Self::expr_contains_aggregate(base),
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => false,
        }
    }

    /// Execute a SELECT without a FROM clause (e.g., SELECT 42, SELECT {'a': 1})
    fn execute_select_without_table(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        use arrow::array::ArrayRef;
        use arrow::datatypes::Field;

        // Build schema from computed projections
        let mut fields = Vec::new();
        let mut arrays: Vec<ArrayRef> = Vec::new();

        for proj in &plan.projections {
            match proj {
                SelectProjection::Computed { expr, alias } => {
                    // Infer the data type from the expression
                    let (field_name, dtype, array) = match expr {
                        ScalarExpr::Literal(lit) => {
                            let (dtype, array) = Self::literal_to_array(lit)?;
                            (alias.clone(), dtype, array)
                        }
                        _ => {
                            return Err(Error::InvalidArgumentError(
                                "SELECT without FROM only supports literal expressions".into(),
                            ));
                        }
                    };

                    fields.push(Field::new(field_name, dtype, true));
                    arrays.push(array);
                }
                _ => {
                    return Err(Error::InvalidArgumentError(
                        "SELECT without FROM only supports computed projections".into(),
                    ));
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)
            .map_err(|e| Error::Internal(format!("failed to create record batch: {}", e)))?;

        Ok(SelectExecution::new_single_batch(
            String::new(), // No table name
            schema,
            batch,
        ))
    }

    /// Convert a Literal to an Arrow array (recursive for nested structs)
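    ///
    /// For example, a struct literal with one integer field maps to a single-row
    /// `StructArray` whose only child is a nullable `Int64` field.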
    fn literal_to_array(lit: &llkv_expr::literal::Literal) -> ExecutorResult<(DataType, ArrayRef)> {
        use arrow::array::{
            ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray, StructArray,
            new_null_array,
        };
        use arrow::datatypes::{DataType, Field};
        use llkv_expr::literal::Literal;

        match lit {
            Literal::Integer(v) => {
                let val = i64::try_from(*v).unwrap_or(0);
                Ok((
                    DataType::Int64,
                    Arc::new(Int64Array::from(vec![val])) as ArrayRef,
                ))
            }
            Literal::Float(v) => Ok((
                DataType::Float64,
                Arc::new(Float64Array::from(vec![*v])) as ArrayRef,
            )),
            Literal::Boolean(v) => Ok((
                DataType::Boolean,
                Arc::new(BooleanArray::from(vec![*v])) as ArrayRef,
            )),
            Literal::String(v) => Ok((
                DataType::Utf8,
                Arc::new(StringArray::from(vec![v.clone()])) as ArrayRef,
            )),
            Literal::Null => Ok((DataType::Null, new_null_array(&DataType::Null, 1))),
            Literal::Struct(struct_fields) => {
                // Build a struct array recursively
                let mut inner_fields = Vec::new();
                let mut inner_arrays = Vec::new();

                for (field_name, field_lit) in struct_fields {
                    let (field_dtype, field_array) = Self::literal_to_array(field_lit)?;
                    inner_fields.push(Field::new(field_name.clone(), field_dtype, true));
                    inner_arrays.push(field_array);
                }

                let struct_array =
                    StructArray::try_new(inner_fields.clone().into(), inner_arrays, None).map_err(
                        |e| Error::Internal(format!("failed to create struct array: {}", e)),
                    )?;

                Ok((
                    DataType::Struct(inner_fields.into()),
                    Arc::new(struct_array) as ArrayRef,
                ))
            }
        }
    }

    /// Execute a cross product query (FROM table1, table2, ...)
    fn execute_cross_product(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        use arrow::compute::concat_batches;

        if plan.tables.len() < 2 {
            return Err(Error::InvalidArgumentError(
                "cross product requires at least 2 tables".into(),
            ));
        }

        // Get all tables
        let mut tables = Vec::new();
        for table_ref in &plan.tables {
            let qualified_name = table_ref.qualified_name();
            let table = self.provider.get_table(&qualified_name)?;
            tables.push((table_ref.clone(), table));
        }

        // For now, support only 2-table cross product
        if tables.len() > 2 {
            return Err(Error::InvalidArgumentError(
                "cross products with more than 2 tables not yet supported".into(),
            ));
        }

        let (left_ref, left_table) = &tables[0];
        let (right_ref, right_table) = &tables[1];

        // Build the cross product using the llkv-join crate.
        // Passing empty join keys yields the full Cartesian product.
        use llkv_join::{JoinOptions, JoinType, TableJoinExt};

        let mut result_batches = Vec::new();
        left_table.table.join_stream(
            &right_table.table,
            &[], // Empty join keys = cross product
            &JoinOptions {
                join_type: JoinType::Inner,
                ..Default::default()
            },
            |batch| {
                result_batches.push(batch);
            },
        )?;

        // Build combined schema with qualified column names
        let mut combined_fields = Vec::new();

        // Add left table columns with schema.table.column prefix
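        // (e.g. column "id" of table "t1" in schema "main" becomes "main.t1.id")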
        for col in &left_table.schema.columns {
            let qualified_name = format!("{}.{}.{}", left_ref.schema, left_ref.table, col.name);
            combined_fields.push(arrow::datatypes::Field::new(
                qualified_name,
                col.data_type.clone(),
                col.nullable,
            ));
        }

        // Add right table columns with schema.table.column prefix
        for col in &right_table.schema.columns {
            let qualified_name = format!("{}.{}.{}", right_ref.schema, right_ref.table, col.name);
            combined_fields.push(arrow::datatypes::Field::new(
                qualified_name,
                col.data_type.clone(),
                col.nullable,
            ));
        }

        let combined_schema = Arc::new(Schema::new(combined_fields));

        // Combine all result batches with the combined schema (renames columns)
        let mut combined_batch = if result_batches.is_empty() {
            RecordBatch::new_empty(Arc::clone(&combined_schema))
        } else if result_batches.len() == 1 {
            let batch = result_batches.into_iter().next().unwrap();
            // The batch from the join still carries the original column names; apply the qualified schema.
            RecordBatch::try_new(Arc::clone(&combined_schema), batch.columns().to_vec()).map_err(
                |e| {
                    Error::Internal(format!(
                        "failed to create batch with qualified names: {}",
                        e
                    ))
                },
            )?
        } else {
            // First concatenate with original schema
            let original_batch = concat_batches(&result_batches[0].schema(), &result_batches)
                .map_err(|e| Error::Internal(format!("failed to concatenate batches: {}", e)))?;
            // Then apply qualified schema
            RecordBatch::try_new(
                Arc::clone(&combined_schema),
                original_batch.columns().to_vec(),
            )
            .map_err(|e| {
                Error::Internal(format!(
                    "failed to create batch with qualified names: {}",
                    e
                ))
            })?
        };

        // Apply SELECT projections if specified
        if !plan.projections.is_empty() {
            let mut selected_fields = Vec::new();
            let mut selected_columns = Vec::new();

            for proj in &plan.projections {
                match proj {
                    SelectProjection::AllColumns => {
                        // Keep all columns
                        selected_fields = combined_schema.fields().iter().cloned().collect();
                        selected_columns = combined_batch.columns().to_vec();
                        break;
                    }
                    SelectProjection::AllColumnsExcept { exclude } => {
                        // Keep all columns except the excluded ones
                        let exclude_lower: Vec<String> =
                            exclude.iter().map(|e| e.to_ascii_lowercase()).collect();

                        for (idx, field) in combined_schema.fields().iter().enumerate() {
                            let field_name_lower = field.name().to_ascii_lowercase();
                            if !exclude_lower.contains(&field_name_lower) {
                                selected_fields.push(field.clone());
                                selected_columns.push(combined_batch.column(idx).clone());
                            }
                        }
                        break;
                    }
                    SelectProjection::Column { name, alias } => {
                        // Find the column by qualified name
                        let col_name = name.to_ascii_lowercase();
                        if let Some((idx, field)) = combined_schema
                            .fields()
                            .iter()
                            .enumerate()
                            .find(|(_, f)| f.name().to_ascii_lowercase() == col_name)
                        {
                            let output_name = alias.as_ref().unwrap_or(name).clone();
                            selected_fields.push(Arc::new(arrow::datatypes::Field::new(
                                output_name,
                                field.data_type().clone(),
                                field.is_nullable(),
                            )));
                            selected_columns.push(combined_batch.column(idx).clone());
                        } else {
                            return Err(Error::InvalidArgumentError(format!(
                                "column '{}' not found in cross product result",
                                name
                            )));
                        }
                    }
                    SelectProjection::Computed { expr, alias } => {
                        // Handle simple column references (like s1.t1.t)
                        if let ScalarExpr::Column(col_name) = expr {
                            let col_name_lower = col_name.to_ascii_lowercase();
                            if let Some((idx, field)) = combined_schema
                                .fields()
                                .iter()
                                .enumerate()
                                .find(|(_, f)| f.name().to_ascii_lowercase() == col_name_lower)
                            {
                                selected_fields.push(Arc::new(arrow::datatypes::Field::new(
                                    alias.clone(),
                                    field.data_type().clone(),
                                    field.is_nullable(),
                                )));
                                selected_columns.push(combined_batch.column(idx).clone());
                            } else {
                                return Err(Error::InvalidArgumentError(format!(
                                    "column '{}' not found in cross product result",
                                    col_name
                                )));
                            }
                        } else {
                            return Err(Error::InvalidArgumentError(
                                "complex computed projections not yet supported in cross products"
                                    .into(),
                            ));
                        }
                    }
                }
            }

            let projected_schema = Arc::new(Schema::new(selected_fields));
            combined_batch = RecordBatch::try_new(projected_schema, selected_columns)
                .map_err(|e| Error::Internal(format!("failed to apply projections: {}", e)))?;
        }

        Ok(SelectExecution::new_single_batch(
            format!(
                "{},{}",
                left_ref.qualified_name(),
                right_ref.qualified_name()
            ),
            combined_batch.schema(),
            combined_batch,
        ))
    }

    fn execute_projection(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let projections = if plan.projections.is_empty() {
            build_wildcard_projections(table_ref)
        } else {
            build_projected_columns(table_ref, &plan.projections)?
        };
        let schema = schema_for_projections(table_ref, &projections)?;

        let (filter_expr, full_table_scan) = match plan.filter {
            Some(expr) => (
                crate::translation::expression::translate_predicate(
                    expr,
                    table_ref.schema.as_ref(),
                    |name| Error::InvalidArgumentError(format!("unknown column '{}'", name)),
                )?,
                false,
            ),
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                (
                    crate::translation::expression::full_table_scan_filter(field_id),
                    true,
                )
            }
        };

        let expanded_order = expand_order_targets(&plan.order_by, &projections)?;
        let physical_order = if let Some(first) = expanded_order.first() {
            Some(resolve_scan_order(table_ref, &projections, first)?)
        } else {
            None
        };

        let options = if let Some(order_spec) = physical_order {
            if row_filter.is_some() {
                tracing::debug!("Applying MVCC row filter with ORDER BY");
            }
            ScanStreamOptions {
                include_nulls: true,
                order: Some(order_spec),
                row_id_filter: row_filter.clone(),
            }
        } else {
            if row_filter.is_some() {
                tracing::debug!("Applying MVCC row filter");
            }
            ScanStreamOptions {
                include_nulls: true,
                order: None,
                row_id_filter: row_filter.clone(),
            }
        };

        Ok(SelectExecution::new_projection(
            display_name,
            schema,
            table,
            projections,
            filter_expr,
            options,
            full_table_scan,
            expanded_order,
        ))
    }

    fn execute_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let mut specs: Vec<AggregateSpec> = Vec::with_capacity(plan.aggregates.len());
        for aggregate in plan.aggregates {
            match aggregate {
                AggregateExpr::CountStar { alias } => {
                    specs.push(AggregateSpec {
                        alias,
                        kind: AggregateKind::CountStar,
                    });
                }
                AggregateExpr::Column {
                    column,
                    alias,
                    function,
                    distinct,
                } => {
                    let col = table_ref.schema.resolve(&column).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column '{}' in aggregate",
                            column
                        ))
                    })?;

                    let kind = match function {
                        AggregateFunction::Count => {
                            if distinct {
                                AggregateKind::CountDistinctField {
                                    field_id: col.field_id,
                                }
                            } else {
                                AggregateKind::CountField {
                                    field_id: col.field_id,
                                }
                            }
                        }
                        AggregateFunction::SumInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "SUM currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::SumInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MinInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MIN currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MinInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MaxInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MAX currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MaxInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::CountNulls => {
                            if distinct {
                                return Err(Error::InvalidArgumentError(
                                    "DISTINCT is not supported for COUNT_NULLS".into(),
                                ));
                            }
                            AggregateKind::CountNulls {
                                field_id: col.field_id,
                            }
                        }
                    };
                    specs.push(AggregateSpec { alias, kind });
                }
            }
        }

        if specs.is_empty() {
            return Err(Error::InvalidArgumentError(
                "aggregate query requires at least one aggregate expression".into(),
            ));
        }

        let had_filter = plan.filter.is_some();
        let filter_expr = match plan.filter {
            Some(expr) => crate::translation::expression::translate_predicate(
                expr,
                table.schema.as_ref(),
                |name| Error::InvalidArgumentError(format!("unknown column '{}'", name)),
            )?,
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                crate::translation::expression::full_table_scan_filter(field_id)
            }
        };

        // Build projections and track which projection index each spec uses
        let mut projections = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                let proj_idx = projections.len();
                spec_to_projection.push(Some(proj_idx));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        if projections.is_empty() {
            let field_id = table.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: row_filter.clone(),
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        // MVCC Note: We cannot use the total_rows shortcut when MVCC visibility filtering
        // is enabled, because some rows may be invisible due to uncommitted or aborted transactions.
        // Always scan to apply proper visibility rules.
        let mut count_star_override: Option<i64> = None;
        if !had_filter && row_filter.is_none() {
            // Only use shortcut if no filter AND no MVCC row filtering
            let total_rows = table.total_rows.load(Ordering::SeqCst);
            tracing::debug!(
                "[AGGREGATE] Using COUNT(*) shortcut: total_rows={}",
                total_rows
            );
            if total_rows > i64::MAX as u64 {
                return Err(Error::InvalidArgumentError(
                    "COUNT(*) result exceeds supported range".into(),
                ));
            }
            count_star_override = Some(total_rows as i64);
        } else {
            tracing::debug!(
                "[AGGREGATE] NOT using COUNT(*) shortcut: had_filter={}, has_row_filter={}",
                had_filter,
                row_filter.is_some()
            );
        }

        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => {
                        tracing::debug!(
                            "[AGGREGATE] CountStar override_value={:?}",
                            count_star_override
                        );
                        count_star_override
                    }
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            ScanStreamOptions {
                row_id_filter: row_filter.clone(),
                ..options
            },
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => {
                // Treat missing storage keys as an empty result set. This occurs
                // for freshly created tables that have no persisted chunks yet.
            }
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        let mut fields = Vec::with_capacity(states.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(states.len());
        for state in states {
            let (field, array) = state.finalize()?;
            fields.push(field);
            arrays.push(array);
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

    /// Execute a query whose computed projections contain embedded aggregates.
    /// The aggregates are extracted and computed first; the scalar expressions are then
    /// evaluated with the aggregate results substituted in.
    fn execute_computed_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        use arrow::array::Int64Array;
        use llkv_expr::expr::AggregateCall;

        let table_ref = table.as_ref();

        // First, extract all unique aggregates from the projections
        let mut aggregate_specs: Vec<(String, AggregateCall<String>)> = Vec::new();
        for proj in &plan.projections {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::collect_aggregates(expr, &mut aggregate_specs);
            }
        }

        // Compute the aggregates using the existing aggregate execution infrastructure
        let computed_aggregates = self.compute_aggregate_values(
            table.clone(),
            &plan.filter,
            &aggregate_specs,
            row_filter.clone(),
        )?;

        // Now build the final projections by evaluating expressions with aggregates substituted
        let mut fields = Vec::with_capacity(plan.projections.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(plan.projections.len());

        for proj in &plan.projections {
            match proj {
                SelectProjection::AllColumns | SelectProjection::AllColumnsExcept { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "Wildcard projections not supported with computed aggregates".into(),
                    ));
                }
                SelectProjection::Column { name, alias } => {
                    let col = table_ref.schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    let field_name = alias.as_ref().unwrap_or(name);
                    fields.push(arrow::datatypes::Field::new(
                        field_name,
                        col.data_type.clone(),
                        col.nullable,
                    ));
                    // For regular columns in an aggregate query, we'd need to handle GROUP BY
                    // For now, return an error as this is not supported
                    return Err(Error::InvalidArgumentError(
                        "Regular columns not supported in aggregate queries without GROUP BY"
                            .into(),
                    ));
                }
                SelectProjection::Computed { expr, alias } => {
                    // Evaluate the expression with aggregates substituted
                    let value = Self::evaluate_expr_with_aggregates(expr, &computed_aggregates)?;

                    fields.push(arrow::datatypes::Field::new(alias, DataType::Int64, false));

                    let array = Arc::new(Int64Array::from(vec![value])) as ArrayRef;
                    arrays.push(array);
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

    /// Collect all aggregate calls from an expression
    fn collect_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &mut Vec<(String, llkv_expr::expr::AggregateCall<String>)>,
    ) {
        match expr {
            ScalarExpr::Aggregate(agg) => {
                // Create a unique key for this aggregate
                let key = format!("{:?}", agg);
                if !aggregates.iter().any(|(k, _)| k == &key) {
                    aggregates.push((key, agg.clone()));
                }
            }
            ScalarExpr::Binary { left, right, .. } => {
                Self::collect_aggregates(left, aggregates);
                Self::collect_aggregates(right, aggregates);
            }
            ScalarExpr::GetField { base, .. } => {
                Self::collect_aggregates(base, aggregates);
            }
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => {}
        }
    }

    /// Compute the actual values for the aggregates
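    ///
    /// Returns one entry per aggregate, keyed by the debug-formatted key that
    /// [`Self::collect_aggregates`] assigns, so callers can look results up when
    /// evaluating the surrounding expressions.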
    fn compute_aggregate_values(
        &self,
        table: Arc<ExecutorTable<P>>,
        filter: &Option<llkv_expr::expr::Expr<'static, String>>,
        aggregate_specs: &[(String, llkv_expr::expr::AggregateCall<String>)],
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<FxHashMap<String, i64>> {
        use llkv_expr::expr::AggregateCall;

        let table_ref = table.as_ref();
        let mut results =
            FxHashMap::with_capacity_and_hasher(aggregate_specs.len(), Default::default());

        // Build aggregate specs for the aggregator
        let mut specs: Vec<AggregateSpec> = Vec::new();
        for (key, agg) in aggregate_specs {
            let kind = match agg {
                AggregateCall::CountStar => AggregateKind::CountStar,
                AggregateCall::Count(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountField {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Sum(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::SumInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Min(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MinInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Max(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MaxInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::CountNulls(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountNulls {
                        field_id: col.field_id,
                    }
                }
            };
            specs.push(AggregateSpec {
                alias: key.clone(),
                kind,
            });
        }

        // Prepare filter and projections
        let filter_expr = match filter {
            Some(expr) => crate::translation::expression::translate_predicate(
                expr.clone(),
                table_ref.schema.as_ref(),
                |name| Error::InvalidArgumentError(format!("unknown column '{}'", name)),
            )?,
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                crate::translation::expression::full_table_scan_filter(field_id)
            }
        };

        let mut projections: Vec<ScanProjection> = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());
        let count_star_override: Option<i64> = None;

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                spec_to_projection.push(Some(projections.len()));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        if projections.is_empty() {
            let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let base_options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: None,
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => count_star_override,
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            ScanStreamOptions {
                row_id_filter: row_filter.clone(),
                ..base_options
            },
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => {}
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        // Extract the computed values
        for state in states {
            let alias = state.alias.clone();
            let (_field, array) = state.finalize()?;

            // Extract the i64 value from the array
            let int64_array = array
                .as_any()
                .downcast_ref::<arrow::array::Int64Array>()
                .ok_or_else(|| Error::Internal("Expected Int64Array from aggregate".into()))?;

            if int64_array.len() != 1 {
                return Err(Error::Internal(format!(
                    "Expected single value from aggregate, got {}",
                    int64_array.len()
                )));
            }

            let value = if int64_array.is_null(0) {
                0
            } else {
                int64_array.value(0)
            };

            results.insert(alias, value);
        }

        Ok(results)
    }

    /// Evaluate an expression by substituting aggregate values
    fn evaluate_expr_with_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &FxHashMap<String, i64>,
    ) -> ExecutorResult<i64> {
        use llkv_expr::expr::BinaryOp;
        use llkv_expr::literal::Literal;

        match expr {
            ScalarExpr::Literal(Literal::Integer(v)) => Ok(*v as i64),
            ScalarExpr::Literal(Literal::Float(v)) => Ok(*v as i64),
            ScalarExpr::Literal(Literal::Boolean(v)) => Ok(if *v { 1 } else { 0 }),
            ScalarExpr::Literal(Literal::String(_)) => Err(Error::InvalidArgumentError(
                "String literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Literal(Literal::Null) => Err(Error::InvalidArgumentError(
                "NULL literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Literal(Literal::Struct(_)) => Err(Error::InvalidArgumentError(
                "Struct literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Column(_) => Err(Error::InvalidArgumentError(
                "Column references not supported in aggregate-only expressions".into(),
            )),
            ScalarExpr::Aggregate(agg) => {
                let key = format!("{:?}", agg);
                aggregates.get(&key).copied().ok_or_else(|| {
                    Error::Internal(format!("Aggregate value not found for key: {}", key))
                })
            }
            ScalarExpr::Binary { left, op, right } => {
                let left_val = Self::evaluate_expr_with_aggregates(left, aggregates)?;
                let right_val = Self::evaluate_expr_with_aggregates(right, aggregates)?;

                let result = match op {
                    BinaryOp::Add => left_val.checked_add(right_val),
                    BinaryOp::Subtract => left_val.checked_sub(right_val),
                    BinaryOp::Multiply => left_val.checked_mul(right_val),
                    BinaryOp::Divide => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Division by zero".into()));
                        }
                        left_val.checked_div(right_val)
                    }
                    BinaryOp::Modulo => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Modulo by zero".into()));
                        }
                        left_val.checked_rem(right_val)
                    }
                };

                result.ok_or_else(|| {
                    Error::InvalidArgumentError("Arithmetic overflow in expression".into())
                })
            }
            ScalarExpr::GetField { .. } => Err(Error::InvalidArgumentError(
                "GetField not supported in aggregate-only expressions".into(),
            )),
        }
    }
}

/// Streaming execution handle for SELECT queries.
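///
/// Results are produced incrementally via [`SelectExecution::stream`], or materialized
/// eagerly with [`SelectExecution::collect`] / [`SelectExecution::collect_rows`].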
1140#[derive(Clone)]
1141pub struct SelectExecution<P>
1142where
1143    P: Pager<Blob = EntryHandle> + Send + Sync,
1144{
1145    table_name: String,
1146    schema: Arc<Schema>,
1147    stream: SelectStream<P>,
1148}
1149
1150#[derive(Clone)]
1151enum SelectStream<P>
1152where
1153    P: Pager<Blob = EntryHandle> + Send + Sync,
1154{
1155    Projection {
1156        table: Arc<ExecutorTable<P>>,
1157        projections: Vec<ScanProjection>,
1158        filter_expr: LlkvExpr<'static, FieldId>,
1159        options: ScanStreamOptions<P>,
1160        full_table_scan: bool,
1161        order_by: Vec<OrderByPlan>,
1162    },
1163    Aggregation {
1164        batch: RecordBatch,
1165    },
1166}
1167
1168impl<P> SelectExecution<P>
1169where
1170    P: Pager<Blob = EntryHandle> + Send + Sync,
1171{
1172    #[allow(clippy::too_many_arguments)]
1173    fn new_projection(
1174        table_name: String,
1175        schema: Arc<Schema>,
1176        table: Arc<ExecutorTable<P>>,
1177        projections: Vec<ScanProjection>,
1178        filter_expr: LlkvExpr<'static, FieldId>,
1179        options: ScanStreamOptions<P>,
1180        full_table_scan: bool,
1181        order_by: Vec<OrderByPlan>,
1182    ) -> Self {
1183        Self {
1184            table_name,
1185            schema,
1186            stream: SelectStream::Projection {
1187                table,
1188                projections,
1189                filter_expr,
1190                options,
1191                full_table_scan,
1192                order_by,
1193            },
1194        }
1195    }
1196
1197    pub fn new_single_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
1198        Self {
1199            table_name,
1200            schema,
1201            stream: SelectStream::Aggregation { batch },
1202        }
1203    }
1204
1205    pub fn from_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
1206        Self::new_single_batch(table_name, schema, batch)
1207    }
1208
1209    pub fn table_name(&self) -> &str {
1210        &self.table_name
1211    }
1212
1213    pub fn schema(&self) -> Arc<Schema> {
1214        Arc::clone(&self.schema)
1215    }
1216
1217    pub fn stream(
1218        self,
1219        mut on_batch: impl FnMut(RecordBatch) -> ExecutorResult<()>,
1220    ) -> ExecutorResult<()> {
1221        let schema = Arc::clone(&self.schema);
1222        match self.stream {
1223            SelectStream::Projection {
1224                table,
1225                projections,
1226                filter_expr,
1227                options,
1228                full_table_scan,
1229                order_by,
1230            } => {
1231                // Early return for empty tables to avoid ColumnStore data_type() errors
1232                let total_rows = table.total_rows.load(Ordering::SeqCst);
1233                if total_rows == 0 {
1234                    // Empty table - return empty result with correct schema
1235                    return Ok(());
1236                }
1237
1238                let mut error: Option<Error> = None;
1239                let mut produced = false;
1240                let mut produced_rows: u64 = 0;
1241                let capture_nulls_first = matches!(options.order, Some(spec) if spec.nulls_first);
1242                let needs_post_sort = order_by.len() > 1;
1243                let collect_batches = needs_post_sort || capture_nulls_first;
1244                let include_nulls = options.include_nulls;
                let has_row_id_filter = options.row_id_filter.is_some();
                let scan_options = options;
                let mut buffered_batches: Vec<RecordBatch> = Vec::new();
                table
                    .table
                    .scan_stream(projections, &filter_expr, scan_options, |batch| {
                        if error.is_some() {
                            return;
                        }
                        produced = true;
                        produced_rows = produced_rows.saturating_add(batch.num_rows() as u64);
                        if collect_batches {
                            buffered_batches.push(batch);
                        } else if let Err(err) = on_batch(batch) {
                            error = Some(err);
                        }
                    })?;
                if let Some(err) = error {
                    return Err(err);
                }
                if !produced {
                    if total_rows > 0 {
                        for batch in synthesize_null_scan(Arc::clone(&schema), total_rows)? {
                            on_batch(batch)?;
                        }
                    }
                    return Ok(());
                }
                let mut null_batches: Vec<RecordBatch> = Vec::new();
                // Only synthesize null rows if:
                // 1. include_nulls is true
                // 2. This is a full table scan
                // 3. We produced fewer rows than the total
                // 4. We DON'T have a row_id_filter (e.g., MVCC filter) that intentionally filtered rows
                if include_nulls
                    && full_table_scan
                    && produced_rows < total_rows
                    && !has_row_id_filter
                {
                    let missing = total_rows - produced_rows;
                    if missing > 0 {
                        null_batches = synthesize_null_scan(Arc::clone(&schema), missing)?;
                    }
                }

                if collect_batches {
                    if needs_post_sort {
                        if !null_batches.is_empty() {
                            buffered_batches.extend(null_batches);
                        }
                        if !buffered_batches.is_empty() {
                            let combined =
                                concat_batches(&schema, &buffered_batches).map_err(|err| {
                                    Error::InvalidArgumentError(format!(
                                        "failed to concatenate result batches for ORDER BY: {}",
                                        err
                                    ))
                                })?;
                            let sorted_batch =
                                sort_record_batch_with_order(&schema, &combined, &order_by)?;
                            on_batch(sorted_batch)?;
                        }
                    } else if capture_nulls_first {
                        for batch in null_batches {
                            on_batch(batch)?;
                        }
                        for batch in buffered_batches {
                            on_batch(batch)?;
                        }
                    }
                } else if !null_batches.is_empty() {
                    for batch in null_batches {
                        on_batch(batch)?;
                    }
                }
                Ok(())
            }
            SelectStream::Aggregation { batch } => on_batch(batch),
        }
    }

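    /// Drives the stream to completion and buffers every emitted `RecordBatch` in memory.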
    pub fn collect(self) -> ExecutorResult<Vec<RecordBatch>> {
        let mut batches = Vec::new();
        self.stream(|batch| {
            batches.push(batch);
            Ok(())
        })?;
        Ok(batches)
    }

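    /// Materializes the result as rows of [`PlanValue`]s (one `Vec` per row) together with
    /// the projected column names taken from the output schema.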
    pub fn collect_rows(self) -> ExecutorResult<ExecutorRowBatch> {
        let schema = self.schema();
        let mut rows: Vec<Vec<PlanValue>> = Vec::new();
        self.stream(|batch| {
            for row_idx in 0..batch.num_rows() {
                let mut row: Vec<PlanValue> = Vec::with_capacity(batch.num_columns());
                for col_idx in 0..batch.num_columns() {
                    let value = llkv_plan::plan_value_from_array(batch.column(col_idx), row_idx)?;
                    row.push(value);
                }
                rows.push(row);
            }
            Ok(())
        })?;
        let columns = schema
            .fields()
            .iter()
            .map(|field| field.name().to_string())
            .collect();
        Ok(ExecutorRowBatch { columns, rows })
    }

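    /// Convenience wrapper around [`Self::collect_rows`] that discards the column names.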
    pub fn into_rows(self) -> ExecutorResult<Vec<Vec<PlanValue>>> {
        Ok(self.collect_rows()?.rows)
    }
}

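// A minimal consumption sketch, assuming only the `SelectExecution` API defined above.
// `count_result_rows` is a hypothetical helper (not used elsewhere in this crate) showing
// how a caller can drain an execution through `stream` instead of buffering via `collect`.
#[allow(dead_code)]
fn count_result_rows<P>(execution: SelectExecution<P>) -> ExecutorResult<u64>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    let mut total: u64 = 0;
    execution.stream(|batch| {
        // Each callback receives one Arrow `RecordBatch`; here we only tally its rows.
        total += batch.num_rows() as u64;
        Ok(())
    })?;
    Ok(total)
}
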
impl<P> fmt::Debug for SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SelectExecution")
            .field("table_name", &self.table_name)
            .field("schema", &self.schema)
            .finish()
    }
}

// ============================================================================
// Helper Functions
// ============================================================================

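/// Expands `ORDER BY ALL` into one `OrderByPlan` per projected column (by positional index),
/// leaving all other order targets untouched. Computed projections cannot participate in
/// `ORDER BY ALL`, so encountering one is reported as an error.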
fn expand_order_targets(
    order_items: &[OrderByPlan],
    projections: &[ScanProjection],
) -> ExecutorResult<Vec<OrderByPlan>> {
    let mut expanded = Vec::new();

    for item in order_items {
        match &item.target {
            OrderTarget::All => {
                if projections.is_empty() {
                    return Err(Error::InvalidArgumentError(
                        "ORDER BY ALL requires at least one projection".into(),
                    ));
                }

                for (idx, projection) in projections.iter().enumerate() {
                    if matches!(projection, ScanProjection::Computed { .. }) {
                        return Err(Error::InvalidArgumentError(
                            "ORDER BY ALL cannot reference computed projections".into(),
                        ));
                    }

                    let mut clone = item.clone();
                    clone.target = OrderTarget::Index(idx);
                    expanded.push(clone);
                }
            }
            _ => expanded.push(item.clone()),
        }
    }

    Ok(expanded)
}

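/// Maps a single `OrderByPlan` onto a storage-level `ScanOrderSpec`, resolving the target
/// column either by name or by projection index. `ORDER BY ALL` must already have been
/// expanded by `expand_order_targets` before reaching this point.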
fn resolve_scan_order<P>(
    table: &ExecutorTable<P>,
    projections: &[ScanProjection],
    order_plan: &OrderByPlan,
) -> ExecutorResult<ScanOrderSpec>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    let (column, field_id) = match &order_plan.target {
        OrderTarget::Column(name) => {
            let column = table.schema.resolve(name).ok_or_else(|| {
                Error::InvalidArgumentError(format!("unknown column '{}' in ORDER BY", name))
            })?;
            (column, column.field_id)
        }
        OrderTarget::Index(position) => {
            let projection = projections.get(*position).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "ORDER BY position {} is out of range",
                    position + 1
                ))
            })?;
            match projection {
                ScanProjection::Column(store_projection) => {
                    let field_id = store_projection.logical_field_id.field_id();
                    let column = table.schema.column_by_field_id(field_id).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column with field id {field_id} in ORDER BY"
                        ))
                    })?;
                    (column, field_id)
                }
                ScanProjection::Computed { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "ORDER BY position referring to a computed projection is not supported"
                            .into(),
                    ));
                }
            }
        }
        OrderTarget::All => {
            return Err(Error::InvalidArgumentError(
                "ORDER BY ALL should be expanded before execution".into(),
            ));
        }
    };

    let transform = match order_plan.sort_type {
        OrderSortType::Native => match column.data_type {
            DataType::Int64 => ScanOrderTransform::IdentityInteger,
            DataType::Utf8 => ScanOrderTransform::IdentityUtf8,
            ref other => {
                return Err(Error::InvalidArgumentError(format!(
                    "ORDER BY on column type {:?} is not supported",
                    other
                )));
            }
        },
        OrderSortType::CastTextToInteger => {
            if column.data_type != DataType::Utf8 {
                return Err(Error::InvalidArgumentError(
                    "ORDER BY CAST expects a text column".into(),
                ));
            }
            ScanOrderTransform::CastUtf8ToInteger
        }
    };

    let direction = if order_plan.ascending {
        ScanOrderDirection::Ascending
    } else {
        ScanOrderDirection::Descending
    };

    Ok(ScanOrderSpec {
        field_id,
        direction,
        nulls_first: order_plan.nulls_first,
        transform,
    })
}

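/// Builds a single all-null `RecordBatch` with `total_rows` rows matching `schema`. The
/// projection stream uses it to account for rows the column scan did not emit (for example,
/// rows whose projected columns are entirely NULL).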
fn synthesize_null_scan(schema: Arc<Schema>, total_rows: u64) -> ExecutorResult<Vec<RecordBatch>> {
    let row_count = usize::try_from(total_rows).map_err(|_| {
        Error::InvalidArgumentError("table row count exceeds supported in-memory batch size".into())
    })?;

    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    for field in schema.fields() {
        match field.data_type() {
            DataType::Int64 => {
                let mut builder = Int64Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Float64 => {
                let mut builder = arrow::array::Float64Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Utf8 => {
                let mut builder = arrow::array::StringBuilder::with_capacity(row_count, 0);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Date32 => {
                let mut builder = arrow::array::Date32Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            other => {
                return Err(Error::InvalidArgumentError(format!(
                    "unsupported data type in null synthesis: {other:?}"
                )));
            }
        }
    }

    let batch = RecordBatch::try_new(schema, arrays)?;
    Ok(vec![batch])
}
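
// A small illustrative test sketch for the helper above; the module and test names are
// ours and not part of any existing suite. It only exercises `synthesize_null_scan` with
// a two-column schema and checks that every produced value is NULL.
#[cfg(test)]
mod synthesize_null_scan_tests {
    use super::*;
    use arrow::datatypes::Field;

    #[test]
    fn produces_all_null_rows_for_each_field() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let batches = synthesize_null_scan(Arc::clone(&schema), 3).expect("null synthesis");
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.column(0).null_count(), 3);
        assert_eq!(batch.column(1).null_count(), 3);
    }
}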
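/// Sorts a fully materialized `RecordBatch` according to the ORDER BY plan using Arrow's
/// lexicographic sort kernel, optionally reparsing TEXT keys as integers for `CAST`-style
/// ordering. The streaming path calls this after buffering when results must be sorted
/// post-scan (`needs_post_sort`).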
fn sort_record_batch_with_order(
    schema: &Arc<Schema>,
    batch: &RecordBatch,
    order_by: &[OrderByPlan],
) -> ExecutorResult<RecordBatch> {
    if order_by.is_empty() {
        return Ok(batch.clone());
    }

    let mut sort_columns: Vec<SortColumn> = Vec::with_capacity(order_by.len());

    for order in order_by {
        let column_index = match &order.target {
            OrderTarget::Column(name) => schema.index_of(name).map_err(|_| {
                Error::InvalidArgumentError(format!(
                    "ORDER BY references unknown column '{}'",
                    name
                ))
            })?,
            OrderTarget::Index(idx) => {
                if *idx >= batch.num_columns() {
                    return Err(Error::InvalidArgumentError(format!(
                        "ORDER BY position {} is out of bounds for {} columns",
                        idx + 1,
                        batch.num_columns()
                    )));
                }
                *idx
            }
            OrderTarget::All => {
                return Err(Error::InvalidArgumentError(
                    "ORDER BY ALL should be expanded before sorting".into(),
                ));
            }
        };

        let source_array = batch.column(column_index);

        let values: ArrayRef = match order.sort_type {
            OrderSortType::Native => Arc::clone(source_array),
            OrderSortType::CastTextToInteger => {
                let strings = source_array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        Error::InvalidArgumentError(
                            "ORDER BY CAST expects the underlying column to be TEXT".into(),
                        )
                    })?;
                let mut builder = Int64Builder::with_capacity(strings.len());
                for i in 0..strings.len() {
                    if strings.is_null(i) {
                        builder.append_null();
                    } else {
                        match strings.value(i).parse::<i64>() {
                            Ok(value) => builder.append_value(value),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                Arc::new(builder.finish()) as ArrayRef
            }
        };

        let sort_options = SortOptions {
            descending: !order.ascending,
            nulls_first: order.nulls_first,
        };

        sort_columns.push(SortColumn {
            values,
            options: Some(sort_options),
        });
    }

    let indices = lexsort_to_indices(&sort_columns, None).map_err(|err| {
        Error::InvalidArgumentError(format!("failed to compute ORDER BY indices: {err}"))
    })?;

    let perm = indices
        .as_any()
        .downcast_ref::<UInt32Array>()
        .ok_or_else(|| Error::Internal("ORDER BY sorting produced unexpected index type".into()))?;

    let mut reordered_columns: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
    for col_idx in 0..batch.num_columns() {
        let reordered = take(batch.column(col_idx), perm, None).map_err(|err| {
            Error::InvalidArgumentError(format!(
                "failed to apply ORDER BY permutation to column {col_idx}: {err}"
            ))
        })?;
        reordered_columns.push(reordered);
    }

    RecordBatch::try_new(Arc::clone(schema), reordered_columns)
        .map_err(|err| Error::Internal(format!("failed to build reordered ORDER BY batch: {err}")))