llkv_executor/lib.rs

use arrow::array::{Array, ArrayRef, Int64Builder, RecordBatch, StringArray, UInt32Array};
use arrow::compute::{SortColumn, SortOptions, concat_batches, lexsort_to_indices, take};
use arrow::datatypes::{DataType, Schema};
use llkv_aggregate::{AggregateAccumulator, AggregateKind, AggregateSpec, AggregateState};
use llkv_column_map::store::Projection as StoreProjection;
use llkv_column_map::types::LogicalFieldId;
use llkv_expr::expr::{Expr as LlkvExpr, Filter, Operator, ScalarExpr};
use llkv_plan::{
    AggregateExpr, AggregateFunction, OrderByPlan, OrderSortType, OrderTarget, PlanValue,
    SelectPlan, SelectProjection,
};
use llkv_result::Error;
use llkv_storage::pager::Pager;
use llkv_table::table::{
    RowIdFilter, ScanOrderDirection, ScanOrderSpec, ScanOrderTransform, ScanProjection,
    ScanStreamOptions,
};
use llkv_table::types::FieldId;
use rustc_hash::FxHashMap;
use simd_r_drive_entry_handle::EntryHandle;
use std::fmt;
use std::ops::Bound;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

pub type ExecutorResult<T> = Result<T, Error>;

mod projections;
mod schema;
pub use projections::{build_projected_columns, build_wildcard_projections};
pub use schema::schema_for_projections;

/// Trait for providing table access to the executor.
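///
/// A minimal sketch of a provider backed by a pre-registered map; the
/// `StaticProvider` type and its `tables` field are illustrative assumptions,
/// not part of this crate:
///
/// ```ignore
/// struct StaticProvider<P: Pager<Blob = EntryHandle> + Send + Sync> {
///     tables: FxHashMap<String, Arc<ExecutorTable<P>>>,
/// }
///
/// impl<P> TableProvider<P> for StaticProvider<P>
/// where
///     P: Pager<Blob = EntryHandle> + Send + Sync,
/// {
///     fn get_table(&self, canonical_name: &str) -> ExecutorResult<Arc<ExecutorTable<P>>> {
///         self.tables.get(canonical_name).cloned().ok_or(Error::NotFound)
///     }
/// }
/// ```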
pub trait TableProvider<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn get_table(&self, canonical_name: &str) -> ExecutorResult<Arc<ExecutorTable<P>>>;
}

/// Query executor that executes SELECT plans.
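///
/// A sketch of the intended call flow, assuming a `provider` that implements
/// [`TableProvider`] and a `plan` built by `llkv_plan`:
///
/// ```ignore
/// let executor = QueryExecutor::new(Arc::clone(&provider));
/// let execution = executor.execute_select(plan)?;
/// execution.stream(|batch| {
///     println!("got {} rows", batch.num_rows());
///     Ok(())
/// })?;
/// ```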
pub struct QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    provider: Arc<dyn TableProvider<P>>,
}

impl<P> QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    pub fn new(provider: Arc<dyn TableProvider<P>>) -> Self {
        Self { provider }
    }

    pub fn execute_select(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        self.execute_select_with_filter(plan, None)
    }

    pub fn execute_select_with_filter(
        &self,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        // Handle SELECT without FROM clause (e.g., SELECT 42, SELECT {'a': 1})
        if plan.tables.is_empty() {
            return self.execute_select_without_table(plan);
        }

        // Handle multi-table queries (cross products/joins)
        if plan.tables.len() > 1 {
            return self.execute_cross_product(plan);
        }

        // Single table query
        let table_ref = &plan.tables[0];
        let table = self.provider.get_table(&table_ref.qualified_name())?;
        let display_name = table_ref.qualified_name();

        if !plan.aggregates.is_empty() {
            self.execute_aggregates(table, display_name, plan, row_filter)
        } else if self.has_computed_aggregates(&plan) {
            // Handle computed projections that contain embedded aggregates
            self.execute_computed_aggregates(table, display_name, plan, row_filter)
        } else {
            self.execute_projection(table, display_name, plan, row_filter)
        }
    }

    /// Check if any computed projections contain aggregate functions
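    ///
    /// For example, `SELECT count(*) + 1 FROM t` has no top-level aggregate
    /// expression, but its computed projection embeds `count(*)`.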
    fn has_computed_aggregates(&self, plan: &SelectPlan) -> bool {
        plan.projections.iter().any(|proj| {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::expr_contains_aggregate(expr)
            } else {
                false
            }
        })
    }

    /// Recursively check if a scalar expression contains aggregates
    fn expr_contains_aggregate(expr: &ScalarExpr<String>) -> bool {
        match expr {
            ScalarExpr::Aggregate(_) => true,
            ScalarExpr::Binary { left, right, .. } => {
                Self::expr_contains_aggregate(left) || Self::expr_contains_aggregate(right)
            }
            ScalarExpr::GetField { base, .. } => Self::expr_contains_aggregate(base),
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => false,
        }
    }

    /// Execute a SELECT without a FROM clause (e.g., SELECT 42, SELECT {'a': 1})
    fn execute_select_without_table(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        use arrow::datatypes::Field;

        // Build schema from computed projections
        let mut fields = Vec::new();
        let mut arrays: Vec<ArrayRef> = Vec::new();

        for proj in &plan.projections {
            match proj {
                SelectProjection::Computed { expr, alias } => {
                    // Infer the data type from the expression
                    let (field_name, dtype, array) = match expr {
                        ScalarExpr::Literal(lit) => {
                            let (dtype, array) = Self::literal_to_array(lit)?;
                            (alias.clone(), dtype, array)
                        }
                        _ => {
                            return Err(Error::InvalidArgumentError(
                                "SELECT without FROM only supports literal expressions".into(),
                            ));
                        }
                    };

                    fields.push(Field::new(field_name, dtype, true));
                    arrays.push(array);
                }
                _ => {
                    return Err(Error::InvalidArgumentError(
                        "SELECT without FROM only supports computed projections".into(),
                    ));
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)
            .map_err(|e| Error::Internal(format!("failed to create record batch: {}", e)))?;

        Ok(SelectExecution::new_single_batch(
            String::new(), // No table name
            schema,
            batch,
        ))
    }

    /// Convert a Literal to an Arrow array (recursive for nested structs)
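    ///
    /// A sketch of the mapping implemented by the match below:
    ///
    /// ```text
    /// Integer(1)                  -> Int64 [1]
    /// Float(1.5)                  -> Float64 [1.5]
    /// Boolean(true)               -> Boolean [true]
    /// String("a")                 -> Utf8 ["a"]
    /// Null                        -> Null (length 1)
    /// Struct([("a", Integer(1))]) -> Struct({a: Int64}) (length 1)
    /// ```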
    fn literal_to_array(lit: &llkv_expr::literal::Literal) -> ExecutorResult<(DataType, ArrayRef)> {
        use arrow::array::{BooleanArray, Float64Array, Int64Array, StructArray, new_null_array};
        use arrow::datatypes::Field;
        use llkv_expr::literal::Literal;

        match lit {
            Literal::Integer(v) => {
                // Reject out-of-range literals instead of silently mapping them to 0.
                let val = i64::try_from(*v).map_err(|_| {
                    Error::InvalidArgumentError("integer literal out of range for INT64".into())
                })?;
                Ok((
                    DataType::Int64,
                    Arc::new(Int64Array::from(vec![val])) as ArrayRef,
                ))
            }
            Literal::Float(v) => Ok((
                DataType::Float64,
                Arc::new(Float64Array::from(vec![*v])) as ArrayRef,
            )),
            Literal::Boolean(v) => Ok((
                DataType::Boolean,
                Arc::new(BooleanArray::from(vec![*v])) as ArrayRef,
            )),
            Literal::String(v) => Ok((
                DataType::Utf8,
                Arc::new(StringArray::from(vec![v.clone()])) as ArrayRef,
            )),
            Literal::Null => Ok((DataType::Null, new_null_array(&DataType::Null, 1))),
            Literal::Struct(struct_fields) => {
                // Build a struct array recursively
                let mut inner_fields = Vec::new();
                let mut inner_arrays = Vec::new();

                for (field_name, field_lit) in struct_fields {
                    let (field_dtype, field_array) = Self::literal_to_array(field_lit)?;
                    inner_fields.push(Field::new(field_name.clone(), field_dtype, true));
                    inner_arrays.push(field_array);
                }

                let struct_array =
                    StructArray::try_new(inner_fields.clone().into(), inner_arrays, None).map_err(
                        |e| Error::Internal(format!("failed to create struct array: {}", e)),
                    )?;

                Ok((
                    DataType::Struct(inner_fields.into()),
                    Arc::new(struct_array) as ArrayRef,
                ))
            }
        }
    }

    /// Execute a cross product query (FROM table1, table2, ...)
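    ///
    /// With empty join keys the join degenerates to a Cartesian product, so
    /// `SELECT * FROM t1, t2` yields `|t1| * |t2|` rows, with columns renamed
    /// to fully qualified `schema.table.column` names.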
    fn execute_cross_product(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        if plan.tables.len() < 2 {
            return Err(Error::InvalidArgumentError(
                "cross product requires at least 2 tables".into(),
            ));
        }

        // Get all tables
        let mut tables = Vec::new();
        for table_ref in &plan.tables {
            let qualified_name = table_ref.qualified_name();
            let table = self.provider.get_table(&qualified_name)?;
            tables.push((table_ref.clone(), table));
        }

        // For now, support only 2-table cross product
        if tables.len() > 2 {
            return Err(Error::InvalidArgumentError(
                "cross products with more than 2 tables not yet supported".into(),
            ));
        }

        let (left_ref, left_table) = &tables[0];
        let (right_ref, right_table) = &tables[1];

        // Build the cross product using the llkv-join crate. Passing empty
        // join keys yields the Cartesian product.
        use llkv_join::{JoinOptions, JoinType, TableJoinExt};

        let mut result_batches = Vec::new();
        left_table.table.join_stream(
            &right_table.table,
            &[], // Empty join keys = cross product
            &JoinOptions {
                join_type: JoinType::Inner,
                ..Default::default()
            },
            |batch| {
                result_batches.push(batch);
            },
        )?;

        // Build combined schema with qualified column names
        let mut combined_fields = Vec::new();

        // Add left table columns with schema.table.column prefix
        for col in &left_table.schema.columns {
            let qualified_name = format!("{}.{}.{}", left_ref.schema, left_ref.table, col.name);
            combined_fields.push(arrow::datatypes::Field::new(
                qualified_name,
                col.data_type.clone(),
                col.nullable,
            ));
        }

        // Add right table columns with schema.table.column prefix
        for col in &right_table.schema.columns {
            let qualified_name = format!("{}.{}.{}", right_ref.schema, right_ref.table, col.name);
            combined_fields.push(arrow::datatypes::Field::new(
                qualified_name,
                col.data_type.clone(),
                col.nullable,
            ));
        }

        let combined_schema = Arc::new(Schema::new(combined_fields));

        // Combine all result batches with the combined schema (renames columns)
        let mut combined_batch = if result_batches.is_empty() {
            RecordBatch::new_empty(Arc::clone(&combined_schema))
        } else if result_batches.len() == 1 {
            let batch = result_batches.into_iter().next().unwrap();
            // The join output carries the original column names; apply the qualified schema.
            RecordBatch::try_new(Arc::clone(&combined_schema), batch.columns().to_vec()).map_err(
                |e| {
                    Error::Internal(format!(
                        "failed to create batch with qualified names: {}",
                        e
                    ))
                },
            )?
        } else {
            // First concatenate with original schema
            let original_batch = concat_batches(&result_batches[0].schema(), &result_batches)
                .map_err(|e| Error::Internal(format!("failed to concatenate batches: {}", e)))?;
            // Then apply qualified schema
            RecordBatch::try_new(
                Arc::clone(&combined_schema),
                original_batch.columns().to_vec(),
            )
            .map_err(|e| {
                Error::Internal(format!(
                    "failed to create batch with qualified names: {}",
                    e
                ))
            })?
        };

        // Apply SELECT projections if specified
        if !plan.projections.is_empty() {
            let mut selected_fields = Vec::new();
            let mut selected_columns = Vec::new();

            for proj in &plan.projections {
                match proj {
                    SelectProjection::AllColumns => {
                        // Keep all columns
                        selected_fields = combined_schema.fields().iter().cloned().collect();
                        selected_columns = combined_batch.columns().to_vec();
                        break;
                    }
                    SelectProjection::AllColumnsExcept { exclude } => {
                        // Keep all columns except the excluded ones
                        let exclude_lower: Vec<String> =
                            exclude.iter().map(|e| e.to_ascii_lowercase()).collect();

                        for (idx, field) in combined_schema.fields().iter().enumerate() {
                            let field_name_lower = field.name().to_ascii_lowercase();
                            if !exclude_lower.contains(&field_name_lower) {
                                selected_fields.push(field.clone());
                                selected_columns.push(combined_batch.column(idx).clone());
                            }
                        }
                        break;
                    }
                    SelectProjection::Column { name, alias } => {
                        // Find the column by qualified name
                        let col_name = name.to_ascii_lowercase();
                        if let Some((idx, field)) = combined_schema
                            .fields()
                            .iter()
                            .enumerate()
                            .find(|(_, f)| f.name().to_ascii_lowercase() == col_name)
                        {
                            let output_name = alias.as_ref().unwrap_or(name).clone();
                            selected_fields.push(Arc::new(arrow::datatypes::Field::new(
                                output_name,
                                field.data_type().clone(),
                                field.is_nullable(),
                            )));
                            selected_columns.push(combined_batch.column(idx).clone());
                        } else {
                            return Err(Error::InvalidArgumentError(format!(
                                "column '{}' not found in cross product result",
                                name
                            )));
                        }
                    }
                    SelectProjection::Computed { expr, alias } => {
                        // Handle simple column references (like s1.t1.t)
                        if let ScalarExpr::Column(col_name) = expr {
                            let col_name_lower = col_name.to_ascii_lowercase();
                            if let Some((idx, field)) = combined_schema
                                .fields()
                                .iter()
                                .enumerate()
                                .find(|(_, f)| f.name().to_ascii_lowercase() == col_name_lower)
                            {
                                selected_fields.push(Arc::new(arrow::datatypes::Field::new(
                                    alias.clone(),
                                    field.data_type().clone(),
                                    field.is_nullable(),
                                )));
                                selected_columns.push(combined_batch.column(idx).clone());
                            } else {
                                return Err(Error::InvalidArgumentError(format!(
                                    "column '{}' not found in cross product result",
                                    col_name
                                )));
                            }
                        } else {
                            return Err(Error::InvalidArgumentError(
                                "complex computed projections not yet supported in cross products"
                                    .into(),
                            ));
                        }
                    }
                }
            }

            let projected_schema = Arc::new(Schema::new(selected_fields));
            combined_batch = RecordBatch::try_new(projected_schema, selected_columns)
                .map_err(|e| Error::Internal(format!("failed to apply projections: {}", e)))?;
        }

        Ok(SelectExecution::new_single_batch(
            format!(
                "{},{}",
                left_ref.qualified_name(),
                right_ref.qualified_name()
            ),
            combined_batch.schema(),
            combined_batch,
        ))
    }

    fn execute_projection(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let projections = if plan.projections.is_empty() {
            build_wildcard_projections(table_ref)
        } else {
            build_projected_columns(table_ref, &plan.projections)?
        };
        let schema = schema_for_projections(table_ref, &projections)?;

        let (filter_expr, full_table_scan) = match plan.filter {
            Some(expr) => (translate_predicate(expr, table_ref.schema.as_ref())?, false),
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                (full_table_scan_filter(field_id), true)
            }
        };

        let expanded_order = expand_order_targets(&plan.order_by, &projections)?;
        let physical_order = if let Some(first) = expanded_order.first() {
            Some(resolve_scan_order(table_ref, &projections, first)?)
        } else {
            None
        };

        if row_filter.is_some() {
            tracing::debug!(
                "Applying MVCC row filter{}",
                if physical_order.is_some() {
                    " with ORDER BY"
                } else {
                    ""
                }
            );
        }
        let options = ScanStreamOptions {
            include_nulls: true,
            order: physical_order,
            row_id_filter: row_filter.clone(),
        };

        Ok(SelectExecution::new_projection(
            display_name,
            schema,
            table,
            projections,
            filter_expr,
            options,
            full_table_scan,
            expanded_order,
        ))
    }

    fn execute_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let mut specs: Vec<AggregateSpec> = Vec::with_capacity(plan.aggregates.len());
        for aggregate in plan.aggregates {
            match aggregate {
                AggregateExpr::CountStar { alias } => {
                    specs.push(AggregateSpec {
                        alias,
                        kind: AggregateKind::CountStar,
                    });
                }
                AggregateExpr::Column {
                    column,
                    alias,
                    function,
                    distinct,
                } => {
                    let col = table_ref.schema.resolve(&column).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column '{}' in aggregate",
                            column
                        ))
                    })?;
                    let kind = match function {
                        AggregateFunction::Count => {
                            if distinct {
                                AggregateKind::CountDistinctField {
                                    field_id: col.field_id,
                                }
                            } else {
                                AggregateKind::CountField {
                                    field_id: col.field_id,
                                }
                            }
                        }
                        AggregateFunction::SumInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "SUM currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::SumInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MinInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MIN currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MinInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MaxInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MAX currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MaxInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::CountNulls => {
                            if distinct {
                                return Err(Error::InvalidArgumentError(
                                    "DISTINCT is not supported for COUNT_NULLS".into(),
                                ));
                            }
                            AggregateKind::CountNulls {
                                field_id: col.field_id,
                            }
                        }
                    };
                    specs.push(AggregateSpec { alias, kind });
                }
            }
        }

        if specs.is_empty() {
            return Err(Error::InvalidArgumentError(
                "aggregate query requires at least one aggregate expression".into(),
            ));
        }

        let had_filter = plan.filter.is_some();
        let filter_expr = match plan.filter {
            Some(expr) => translate_predicate(expr, table.schema.as_ref())?,
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                full_table_scan_filter(field_id)
            }
        };

        // Build projections and track which projection index each spec uses
        let mut projections = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                let proj_idx = projections.len();
                spec_to_projection.push(Some(proj_idx));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        if projections.is_empty() {
            let field_id = table.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: row_filter.clone(),
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        // MVCC note: the total_rows shortcut below is only valid when no MVCC
        // visibility filtering is in effect, because uncommitted or aborted
        // transactions can make rows invisible. When a row filter is present
        // we must scan and apply the visibility rules instead.
        let mut count_star_override: Option<i64> = None;
        if !had_filter && row_filter.is_none() {
            // Only use shortcut if no filter AND no MVCC row filtering
            let total_rows = table.total_rows.load(Ordering::SeqCst);
            tracing::debug!(
                "[AGGREGATE] Using COUNT(*) shortcut: total_rows={}",
                total_rows
            );
            if total_rows > i64::MAX as u64 {
                return Err(Error::InvalidArgumentError(
                    "COUNT(*) result exceeds supported range".into(),
                ));
            }
            count_star_override = Some(total_rows as i64);
        } else {
            tracing::debug!(
                "[AGGREGATE] NOT using COUNT(*) shortcut: had_filter={}, has_row_filter={}",
                had_filter,
                row_filter.is_some()
            );
        }

        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => {
                        tracing::debug!(
                            "[AGGREGATE] CountStar override_value={:?}",
                            count_star_override
                        );
                        count_star_override
                    }
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            options,
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => {
                // Treat missing storage keys as an empty result set. This occurs
                // for freshly created tables that have no persisted chunks yet.
            }
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        let mut fields = Vec::with_capacity(states.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(states.len());
        for state in states {
            let (field, array) = state.finalize()?;
            fields.push(field);
            arrays.push(array);
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

    /// Execute a query whose computed projections contain embedded aggregates.
    /// This extracts the aggregates, computes them, and then evaluates the
    /// scalar expressions with the results substituted in.
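    ///
    /// For example, `SELECT count(*) * 2 + 1 FROM t` computes `count(*)` once
    /// via [`Self::compute_aggregate_values`], then folds the result through
    /// the arithmetic in [`Self::evaluate_expr_with_aggregates`].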
    fn execute_computed_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        use arrow::array::Int64Array;
        use llkv_expr::expr::AggregateCall;

        let table_ref = table.as_ref();

        // First, extract all unique aggregates from the projections
        let mut aggregate_specs: Vec<(String, AggregateCall<String>)> = Vec::new();
        for proj in &plan.projections {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::collect_aggregates(expr, &mut aggregate_specs);
            }
        }

        // Compute the aggregates using the existing aggregate execution infrastructure
        let computed_aggregates = self.compute_aggregate_values(
            table.clone(),
            &plan.filter,
            &aggregate_specs,
            row_filter.clone(),
        )?;

        // Now build the final projections by evaluating expressions with aggregates substituted
        let mut fields = Vec::with_capacity(plan.projections.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(plan.projections.len());

        for proj in &plan.projections {
            match proj {
                SelectProjection::AllColumns | SelectProjection::AllColumnsExcept { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "Wildcard projections not supported with computed aggregates".into(),
                    ));
                }
                SelectProjection::Column { name, .. } => {
                    // Validate that the column exists so the error message is
                    // precise; bare columns in an aggregate query would require
                    // GROUP BY, which is not supported here yet.
                    table_ref.schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    return Err(Error::InvalidArgumentError(
                        "Regular columns not supported in aggregate queries without GROUP BY"
                            .into(),
                    ));
                }
                SelectProjection::Computed { expr, alias } => {
                    // Evaluate the expression with aggregates substituted
                    let value = Self::evaluate_expr_with_aggregates(expr, &computed_aggregates)?;

                    fields.push(arrow::datatypes::Field::new(alias, DataType::Int64, false));

                    let array = Arc::new(Int64Array::from(vec![value])) as ArrayRef;
                    arrays.push(array);
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

    /// Collect all aggregate calls from an expression
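    ///
    /// Aggregates are keyed by their `Debug` rendering, so syntactically
    /// identical calls (e.g. two occurrences of `count(*)`) are deduplicated
    /// and computed only once.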
    fn collect_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &mut Vec<(String, llkv_expr::expr::AggregateCall<String>)>,
    ) {
        match expr {
            ScalarExpr::Aggregate(agg) => {
                // Create a unique key for this aggregate
                let key = format!("{:?}", agg);
                if !aggregates.iter().any(|(k, _)| k == &key) {
                    aggregates.push((key, agg.clone()));
                }
            }
            ScalarExpr::Binary { left, right, .. } => {
                Self::collect_aggregates(left, aggregates);
                Self::collect_aggregates(right, aggregates);
            }
            ScalarExpr::GetField { base, .. } => {
                Self::collect_aggregates(base, aggregates);
            }
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => {}
        }
    }

    /// Compute the actual values for the aggregates
    fn compute_aggregate_values(
        &self,
        table: Arc<ExecutorTable<P>>,
        filter: &Option<llkv_expr::expr::Expr<'static, String>>,
        aggregate_specs: &[(String, llkv_expr::expr::AggregateCall<String>)],
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<FxHashMap<String, i64>> {
        use llkv_expr::expr::AggregateCall;

        let table_ref = table.as_ref();
        let mut results =
            FxHashMap::with_capacity_and_hasher(aggregate_specs.len(), Default::default());

        // Build aggregate specs for the aggregator
        let mut specs: Vec<AggregateSpec> = Vec::new();
        for (key, agg) in aggregate_specs {
            let kind = match agg {
                AggregateCall::CountStar => AggregateKind::CountStar,
                AggregateCall::Count(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountField {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Sum(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::SumInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Min(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MinInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Max(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MaxInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::CountNulls(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountNulls {
                        field_id: col.field_id,
                    }
                }
            };
            specs.push(AggregateSpec {
                alias: key.clone(),
                kind,
            });
        }

        // Prepare filter and projections
        let filter_expr = match filter {
            Some(expr) => translate_predicate(expr.clone(), table_ref.schema.as_ref())?,
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                full_table_scan_filter(field_id)
            }
        };

        let mut projections: Vec<ScanProjection> = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());
        let count_star_override: Option<i64> = None;

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                spec_to_projection.push(Some(projections.len()));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        if projections.is_empty() {
            let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: row_filter.clone(),
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => count_star_override,
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            options,
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => {}
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        // Extract the computed values
        for state in states {
            let alias = state.alias.clone();
            let (_field, array) = state.finalize()?;

            // Extract the i64 value from the array
            let int64_array = array
                .as_any()
                .downcast_ref::<arrow::array::Int64Array>()
                .ok_or_else(|| Error::Internal("Expected Int64Array from aggregate".into()))?;

            if int64_array.len() != 1 {
                return Err(Error::Internal(format!(
                    "Expected single value from aggregate, got {}",
                    int64_array.len()
                )));
            }

            // A NULL aggregate result (e.g. MIN/MAX over zero matching rows)
            // is folded to 0 so it can participate in integer arithmetic.
            let value = if int64_array.is_null(0) {
                0
            } else {
                int64_array.value(0)
            };

            results.insert(alias, value);
        }

        Ok(results)
    }

    /// Evaluate an expression by substituting aggregate values
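    ///
    /// For example, if `collect_aggregates` produced a key for `count(*)` that
    /// maps to `10`, then `count(*) * 2` evaluates to `20`. Arithmetic is
    /// checked: overflow, division by zero, and modulo by zero are errors.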
    fn evaluate_expr_with_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &FxHashMap<String, i64>,
    ) -> ExecutorResult<i64> {
        use llkv_expr::expr::BinaryOp;
        use llkv_expr::literal::Literal;

        match expr {
            ScalarExpr::Literal(Literal::Integer(v)) => i64::try_from(*v).map_err(|_| {
                Error::InvalidArgumentError("integer literal out of range for INT64".into())
            }),
            // Floats are truncated toward zero for integer expression math.
            ScalarExpr::Literal(Literal::Float(v)) => Ok(*v as i64),
            ScalarExpr::Literal(Literal::Boolean(v)) => Ok(i64::from(*v)),
            ScalarExpr::Literal(Literal::String(_)) => Err(Error::InvalidArgumentError(
                "String literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Literal(Literal::Null) => Err(Error::InvalidArgumentError(
                "NULL literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Literal(Literal::Struct(_)) => Err(Error::InvalidArgumentError(
                "Struct literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Column(_) => Err(Error::InvalidArgumentError(
                "Column references not supported in aggregate-only expressions".into(),
            )),
            ScalarExpr::Aggregate(agg) => {
                let key = format!("{:?}", agg);
                aggregates.get(&key).copied().ok_or_else(|| {
                    Error::Internal(format!("Aggregate value not found for key: {}", key))
                })
            }
            ScalarExpr::Binary { left, op, right } => {
                let left_val = Self::evaluate_expr_with_aggregates(left, aggregates)?;
                let right_val = Self::evaluate_expr_with_aggregates(right, aggregates)?;

                let result = match op {
                    BinaryOp::Add => left_val.checked_add(right_val),
                    BinaryOp::Subtract => left_val.checked_sub(right_val),
                    BinaryOp::Multiply => left_val.checked_mul(right_val),
                    BinaryOp::Divide => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Division by zero".into()));
                        }
                        left_val.checked_div(right_val)
                    }
                    BinaryOp::Modulo => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Modulo by zero".into()));
                        }
                        left_val.checked_rem(right_val)
                    }
                };

                result.ok_or_else(|| {
                    Error::InvalidArgumentError("Arithmetic overflow in expression".into())
                })
            }
            ScalarExpr::GetField { .. } => Err(Error::InvalidArgumentError(
                "GetField not supported in aggregate-only expressions".into(),
            )),
        }
    }
}

/// Streaming execution handle for SELECT queries.
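///
/// For projection queries the underlying scan runs only when
/// [`SelectExecution::stream`] or [`SelectExecution::collect`] is invoked;
/// aggregate results are materialized up front as a single batch. A usage
/// sketch (assuming `execution` came from [`QueryExecutor::execute_select`]):
///
/// ```ignore
/// let execution = executor.execute_select(plan)?;
/// for batch in execution.collect()? {
///     println!("{} rows", batch.num_rows());
/// }
/// ```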
#[derive(Clone)]
pub struct SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    table_name: String,
    schema: Arc<Schema>,
    stream: SelectStream<P>,
}

#[derive(Clone)]
enum SelectStream<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    Projection {
        table: Arc<ExecutorTable<P>>,
        projections: Vec<ScanProjection>,
        filter_expr: LlkvExpr<'static, FieldId>,
        options: ScanStreamOptions<P>,
        full_table_scan: bool,
        order_by: Vec<OrderByPlan>,
    },
    Aggregation {
        batch: RecordBatch,
    },
}

impl<P> SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    #[allow(clippy::too_many_arguments)]
    fn new_projection(
        table_name: String,
        schema: Arc<Schema>,
        table: Arc<ExecutorTable<P>>,
        projections: Vec<ScanProjection>,
        filter_expr: LlkvExpr<'static, FieldId>,
        options: ScanStreamOptions<P>,
        full_table_scan: bool,
        order_by: Vec<OrderByPlan>,
    ) -> Self {
        Self {
            table_name,
            schema,
            stream: SelectStream::Projection {
                table,
                projections,
                filter_expr,
                options,
                full_table_scan,
                order_by,
            },
        }
    }

    pub fn new_single_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
        Self {
            table_name,
            schema,
            stream: SelectStream::Aggregation { batch },
        }
    }

    pub fn from_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
        Self::new_single_batch(table_name, schema, batch)
    }

    pub fn table_name(&self) -> &str {
        &self.table_name
    }

    pub fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

    pub fn stream(
        self,
        mut on_batch: impl FnMut(RecordBatch) -> ExecutorResult<()>,
    ) -> ExecutorResult<()> {
        let schema = Arc::clone(&self.schema);
        match self.stream {
            SelectStream::Projection {
                table,
                projections,
                filter_expr,
                options,
                full_table_scan,
                order_by,
            } => {
                // Early return for empty tables to avoid ColumnStore data_type() errors
                let total_rows = table.total_rows.load(Ordering::SeqCst);
                if total_rows == 0 {
                    // Empty table - return empty result with correct schema
                    return Ok(());
                }

                let mut error: Option<Error> = None;
                let mut produced = false;
                let mut produced_rows: u64 = 0;
                let capture_nulls_first = matches!(options.order, Some(spec) if spec.nulls_first);
                let needs_post_sort = order_by.len() > 1;
                let collect_batches = needs_post_sort || capture_nulls_first;
                let include_nulls = options.include_nulls;
                let has_row_id_filter = options.row_id_filter.is_some();
                let scan_options = options;
                let mut buffered_batches: Vec<RecordBatch> = Vec::new();
                table
                    .table
                    .scan_stream(projections, &filter_expr, scan_options, |batch| {
                        if error.is_some() {
                            return;
                        }
                        produced = true;
                        produced_rows = produced_rows.saturating_add(batch.num_rows() as u64);
                        if collect_batches {
                            buffered_batches.push(batch);
                        } else if let Err(err) = on_batch(batch) {
                            error = Some(err);
                        }
                    })?;
                if let Some(err) = error {
                    return Err(err);
                }
                if !produced {
                    if total_rows > 0 {
                        for batch in synthesize_null_scan(Arc::clone(&schema), total_rows)? {
                            on_batch(batch)?;
                        }
                    }
                    return Ok(());
                }
                let mut null_batches: Vec<RecordBatch> = Vec::new();
                // Only synthesize null rows if:
                // 1. include_nulls is true
                // 2. This is a full table scan
                // 3. We produced fewer rows than the total
                // 4. We DON'T have a row_id_filter (e.g., MVCC filter) that intentionally filtered rows
                if include_nulls
                    && full_table_scan
                    && produced_rows < total_rows
                    && !has_row_id_filter
                {
                    let missing = total_rows - produced_rows;
                    if missing > 0 {
                        null_batches = synthesize_null_scan(Arc::clone(&schema), missing)?;
                    }
                }

                if collect_batches {
                    if needs_post_sort {
                        if !null_batches.is_empty() {
                            buffered_batches.extend(null_batches);
                        }
                        if !buffered_batches.is_empty() {
                            let combined =
                                concat_batches(&schema, &buffered_batches).map_err(|err| {
                                    Error::InvalidArgumentError(format!(
                                        "failed to concatenate result batches for ORDER BY: {}",
                                        err
                                    ))
                                })?;
                            let sorted_batch =
                                sort_record_batch_with_order(&schema, &combined, &order_by)?;
                            on_batch(sorted_batch)?;
                        }
                    } else if capture_nulls_first {
                        for batch in null_batches {
                            on_batch(batch)?;
                        }
                        for batch in buffered_batches {
                            on_batch(batch)?;
                        }
                    }
                } else if !null_batches.is_empty() {
                    for batch in null_batches {
                        on_batch(batch)?;
                    }
                }
                Ok(())
            }
            SelectStream::Aggregation { batch } => on_batch(batch),
        }
    }

    pub fn collect(self) -> ExecutorResult<Vec<RecordBatch>> {
        let mut batches = Vec::new();
        self.stream(|batch| {
            batches.push(batch);
            Ok(())
        })?;
        Ok(batches)
    }

    pub fn collect_rows(self) -> ExecutorResult<RowBatch> {
        let schema = self.schema();
        let mut rows: Vec<Vec<PlanValue>> = Vec::new();
        self.stream(|batch| {
            for row_idx in 0..batch.num_rows() {
                let mut row: Vec<PlanValue> = Vec::with_capacity(batch.num_columns());
                for col_idx in 0..batch.num_columns() {
                    let value = llkv_plan::plan_value_from_array(batch.column(col_idx), row_idx)?;
                    row.push(value);
                }
                rows.push(row);
            }
            Ok(())
        })?;
        let columns = schema
            .fields()
            .iter()
            .map(|field| field.name().to_string())
            .collect();
        Ok(RowBatch { columns, rows })
    }

    pub fn into_rows(self) -> ExecutorResult<Vec<Vec<PlanValue>>> {
        Ok(self.collect_rows()?.rows)
    }
}

impl<P> fmt::Debug for SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SelectExecution")
            .field("table_name", &self.table_name)
            .field("schema", &self.schema)
            .finish()
    }
}

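/// Executor-side handle for a table: pairs the physical [`llkv_table`] table
/// with its resolved executor schema, row-id and row-count bookkeeping, and
/// any multi-column UNIQUE constraints.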
pub struct ExecutorTable<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Underlying physical table from `llkv_table`.
    pub table: Arc<llkv_table::table::Table<P>>,
    pub schema: Arc<ExecutorSchema>,
    pub next_row_id: AtomicU64,
    pub total_rows: AtomicU64,
    pub multi_column_uniques: RwLock<Vec<ExecutorMultiColumnUnique>>,
}

pub struct ExecutorSchema {
    pub columns: Vec<ExecutorColumn>,
    pub lookup: FxHashMap<String, usize>,
}

impl ExecutorSchema {
1338    pub fn resolve(&self, name: &str) -> Option<&ExecutorColumn> {
1339        let normalized = name.to_ascii_lowercase();
1340        self.lookup
1341            .get(&normalized)
1342            .and_then(|idx| self.columns.get(*idx))
1343    }
1344
1345    pub fn first_field_id(&self) -> Option<FieldId> {
1346        self.columns.first().map(|col| col.field_id)
1347    }
1348
1349    pub fn column_by_field_id(&self, field_id: FieldId) -> Option<&ExecutorColumn> {
1350        self.columns.iter().find(|col| col.field_id == field_id)
1351    }
1352}
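
#[cfg(test)]
mod schema_resolve_tests {
    use super::*;

    // A minimal sketch of name resolution: `resolve` lowercases its input,
    // so the lookup map is keyed by lowercase names. The fixture values
    // (field id 7, Int64) are arbitrary, and this assumes `FieldId` is a
    // plain integer id.
    #[test]
    fn resolve_is_case_insensitive() {
        let column = ExecutorColumn {
            name: "Price".to_string(),
            data_type: DataType::Int64,
            nullable: false,
            primary_key: false,
            unique: false,
            field_id: 7,
            check_expr: None,
        };
        let mut lookup = FxHashMap::default();
        lookup.insert("price".to_string(), 0);
        let schema = ExecutorSchema {
            columns: vec![column],
            lookup,
        };
        assert!(schema.resolve("PRICE").is_some());
        assert!(schema.resolve("missing").is_none());
        assert_eq!(schema.first_field_id(), Some(7));
    }
}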

#[derive(Clone)]
pub struct ExecutorColumn {
    pub name: String,
    pub data_type: DataType,
    pub nullable: bool,
    pub primary_key: bool,
    pub unique: bool,
    pub field_id: FieldId,
    pub check_expr: Option<String>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExecutorMultiColumnUnique {
    pub index_name: Option<String>,
    pub column_indices: Vec<usize>,
}

impl<P> ExecutorTable<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    pub fn multi_column_uniques(&self) -> Vec<ExecutorMultiColumnUnique> {
        self.multi_column_uniques.read().unwrap().clone()
    }

    pub fn set_multi_column_uniques(&self, uniques: Vec<ExecutorMultiColumnUnique>) {
        *self.multi_column_uniques.write().unwrap() = uniques;
    }

    pub fn add_multi_column_unique(&self, unique: ExecutorMultiColumnUnique) {
        let mut guard = self.multi_column_uniques.write().unwrap();
        if !guard
            .iter()
            .any(|existing| existing.column_indices == unique.column_indices)
        {
            guard.push(unique);
        }
    }
}
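
// A minimal sketch of the unique-constraint registry (hypothetical caller;
// `table` is an `ExecutorTable` built elsewhere). `add_multi_column_unique`
// deduplicates by `column_indices`, so registering the same column set twice
// keeps a single entry:
//
// table.add_multi_column_unique(ExecutorMultiColumnUnique {
//     index_name: Some("uniq_a_b".to_string()),
//     column_indices: vec![0, 1],
// });
// table.add_multi_column_unique(ExecutorMultiColumnUnique {
//     index_name: None,
//     column_indices: vec![0, 1], // same column set: ignored
// });
// assert_eq!(table.multi_column_uniques().len(), 1);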

// `PlanValue` comes from llkv-plan and is used directly as the plan-level
// value type; higher-level plan symbols are not re-exported here.
//
// Executor-local types are exposed under their full names (`ExecutorColumn`,
// `ExecutorSchema`, `ExecutorTable`) to avoid colliding with the Arrow and
// storage types imported above; no short `Exec*` aliases are provided.

pub struct RowBatch {
    pub columns: Vec<String>,
    pub rows: Vec<Vec<PlanValue>>,
}

fn expand_order_targets(
    order_items: &[OrderByPlan],
    projections: &[ScanProjection],
) -> ExecutorResult<Vec<OrderByPlan>> {
    let mut expanded = Vec::new();

    for item in order_items {
        match &item.target {
            OrderTarget::All => {
                if projections.is_empty() {
                    return Err(Error::InvalidArgumentError(
                        "ORDER BY ALL requires at least one projection".into(),
                    ));
                }

                for (idx, projection) in projections.iter().enumerate() {
                    if matches!(projection, ScanProjection::Computed { .. }) {
                        return Err(Error::InvalidArgumentError(
                            "ORDER BY ALL cannot reference computed projections".into(),
                        ));
                    }

                    let mut clone = item.clone();
                    clone.target = OrderTarget::Index(idx);
                    expanded.push(clone);
                }
            }
            _ => expanded.push(item.clone()),
        }
    }

    Ok(expanded)
}
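
// A minimal illustration (hypothetical values): given three plain column
// projections, one `ORDER BY ALL` item expands into three items targeting
// `OrderTarget::Index(0)`, `Index(1)`, and `Index(2)`, each inheriting the
// original item's direction and null ordering. Any computed projection in
// the list aborts the expansion with an error instead.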

fn resolve_scan_order<P>(
    table: &ExecutorTable<P>,
    projections: &[ScanProjection],
    order_plan: &OrderByPlan,
) -> ExecutorResult<ScanOrderSpec>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    let (column, field_id) = match &order_plan.target {
        OrderTarget::Column(name) => {
            let column = table.schema.resolve(name).ok_or_else(|| {
                Error::InvalidArgumentError(format!("unknown column '{name}' in ORDER BY"))
            })?;
            (column, column.field_id)
        }
        OrderTarget::Index(position) => {
            let projection = projections.get(*position).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "ORDER BY position {} is out of range",
                    position + 1
                ))
            })?;
            match projection {
                ScanProjection::Column(store_projection) => {
                    let field_id = store_projection.logical_field_id.field_id();
                    let column = table.schema.column_by_field_id(field_id).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column with field id {field_id} in ORDER BY"
                        ))
                    })?;
                    (column, field_id)
                }
                ScanProjection::Computed { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "ORDER BY position referring to a computed projection is not supported"
                            .into(),
                    ));
                }
            }
        }
        OrderTarget::All => {
            return Err(Error::InvalidArgumentError(
                "ORDER BY ALL should be expanded before execution".into(),
            ));
        }
    };

    let transform = match order_plan.sort_type {
        OrderSortType::Native => match column.data_type {
            DataType::Int64 => ScanOrderTransform::IdentityInteger,
            DataType::Utf8 => ScanOrderTransform::IdentityUtf8,
            ref other => {
                return Err(Error::InvalidArgumentError(format!(
                    "ORDER BY on column type {other:?} is not supported"
                )));
            }
        },
        OrderSortType::CastTextToInteger => {
            if column.data_type != DataType::Utf8 {
                return Err(Error::InvalidArgumentError(
                    "ORDER BY CAST expects a text column".into(),
                ));
            }
            ScanOrderTransform::CastUtf8ToInteger
        }
    };

    let direction = if order_plan.ascending {
        ScanOrderDirection::Ascending
    } else {
        ScanOrderDirection::Descending
    };

    Ok(ScanOrderSpec {
        field_id,
        direction,
        nulls_first: order_plan.nulls_first,
        transform,
    })
}

fn full_table_scan_filter(field_id: FieldId) -> LlkvExpr<'static, FieldId> {
    LlkvExpr::Pred(Filter {
        field_id,
        op: Operator::Range {
            lower: Bound::Unbounded,
            upper: Bound::Unbounded,
        },
    })
}
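
#[cfg(test)]
mod scan_filter_tests {
    use super::*;

    // A minimal sketch: an unbounded range over any field is the "match
    // every row" predicate used for full scans. Field id 1 is an arbitrary
    // fixture value.
    #[test]
    fn full_scan_filter_is_unbounded_range() {
        let expr = full_table_scan_filter(1);
        assert!(matches!(
            expr,
            LlkvExpr::Pred(Filter {
                op: Operator::Range {
                    lower: Bound::Unbounded,
                    upper: Bound::Unbounded,
                },
                ..
            })
        ));
    }
}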

fn synthesize_null_scan(schema: Arc<Schema>, total_rows: u64) -> ExecutorResult<Vec<RecordBatch>> {
    let row_count = usize::try_from(total_rows).map_err(|_| {
        Error::InvalidArgumentError("table row count exceeds supported in-memory batch size".into())
    })?;

    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    for field in schema.fields() {
        match field.data_type() {
            // `new_null_array` produces an all-null array of the requested
            // length; the explicit whitelist keeps unsupported types failing
            // loudly.
            DataType::Int64 | DataType::Float64 | DataType::Utf8 | DataType::Date32 => {
                arrays.push(arrow::array::new_null_array(field.data_type(), row_count));
            }
            other => {
                return Err(Error::InvalidArgumentError(format!(
                    "unsupported data type in null synthesis: {other:?}"
                )));
            }
        }
    }

    let batch = RecordBatch::try_new(schema, arrays)?;
    Ok(vec![batch])
}
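
#[cfg(test)]
mod null_scan_tests {
    use super::*;
    use arrow::datatypes::Field;

    // A minimal sketch: synthesizing three all-null rows for a two-column
    // schema. Column names and the row count are arbitrary fixtures.
    #[test]
    fn synthesizes_all_null_rows() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let batches = synthesize_null_scan(Arc::clone(&schema), 3).unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 3);
        assert_eq!(batches[0].column(0).null_count(), 3);
        assert_eq!(batches[0].column(1).null_count(), 3);
    }
}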

fn sort_record_batch_with_order(
    schema: &Arc<Schema>,
    batch: &RecordBatch,
    order_by: &[OrderByPlan],
) -> ExecutorResult<RecordBatch> {
    if order_by.is_empty() {
        return Ok(batch.clone());
    }

    let mut sort_columns: Vec<SortColumn> = Vec::with_capacity(order_by.len());

    for order in order_by {
        let column_index = match &order.target {
            OrderTarget::Column(name) => schema.index_of(name).map_err(|_| {
                Error::InvalidArgumentError(format!("ORDER BY references unknown column '{name}'"))
            })?,
            OrderTarget::Index(idx) => {
                if *idx >= batch.num_columns() {
                    return Err(Error::InvalidArgumentError(format!(
                        "ORDER BY position {} is out of bounds for {} columns",
                        idx + 1,
                        batch.num_columns()
                    )));
                }
                *idx
            }
            OrderTarget::All => {
                return Err(Error::InvalidArgumentError(
                    "ORDER BY ALL should be expanded before sorting".into(),
                ));
            }
        };

        let source_array = batch.column(column_index);

        let values: ArrayRef = match order.sort_type {
            OrderSortType::Native => Arc::clone(source_array),
            OrderSortType::CastTextToInteger => {
                let strings = source_array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        Error::InvalidArgumentError(
                            "ORDER BY CAST expects the underlying column to be TEXT".into(),
                        )
                    })?;
                // Parse each string as i64; values that fail to parse sort
                // as NULL.
                let mut builder = Int64Builder::with_capacity(strings.len());
                for i in 0..strings.len() {
                    if strings.is_null(i) {
                        builder.append_null();
                    } else {
                        match strings.value(i).parse::<i64>() {
                            Ok(value) => builder.append_value(value),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                Arc::new(builder.finish()) as ArrayRef
            }
        };

        let sort_options = SortOptions {
            descending: !order.ascending,
            nulls_first: order.nulls_first,
        };

        sort_columns.push(SortColumn {
            values,
            options: Some(sort_options),
        });
    }

    let indices = lexsort_to_indices(&sort_columns, None).map_err(|err| {
        Error::InvalidArgumentError(format!("failed to compute ORDER BY indices: {err}"))
    })?;

    let perm = indices
        .as_any()
        .downcast_ref::<UInt32Array>()
        .ok_or_else(|| Error::Internal("ORDER BY sorting produced unexpected index type".into()))?;

    let mut reordered_columns: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
    for col_idx in 0..batch.num_columns() {
        let reordered = take(batch.column(col_idx), perm, None).map_err(|err| {
            Error::InvalidArgumentError(format!(
                "failed to apply ORDER BY permutation to column {col_idx}: {err}"
            ))
        })?;
        reordered_columns.push(reordered);
    }

    RecordBatch::try_new(Arc::clone(schema), reordered_columns)
        .map_err(|err| Error::Internal(format!("failed to build reordered ORDER BY batch: {err}")))
}
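
#[cfg(test)]
mod order_by_sort_tests {
    use super::*;
    use arrow::array::Int64Array;
    use arrow::datatypes::Field;

    // A minimal sketch of in-memory ORDER BY; this assumes `OrderByPlan`
    // exposes exactly these four public fields (target, sort_type,
    // ascending, nulls_first).
    #[test]
    fn sorts_descending_with_trailing_nulls() {
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, true)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int64Array::from(vec![Some(2), None, Some(9)])) as ArrayRef],
        )
        .unwrap();
        let order = OrderByPlan {
            target: OrderTarget::Index(0),
            sort_type: OrderSortType::Native,
            ascending: false,
            nulls_first: false,
        };
        let sorted = sort_record_batch_with_order(&schema, &batch, &[order]).unwrap();
        let values = sorted
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(values.value(0), 9); // 9, 2, then the null last
        assert_eq!(values.value(1), 2);
        assert!(values.is_null(2));
    }
}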

/// Translate a predicate over column names into one over field ids.
fn translate_predicate(
    expr: llkv_expr::expr::Expr<'static, String>,
    schema: &ExecutorSchema,
) -> ExecutorResult<llkv_expr::expr::Expr<'static, FieldId>> {
    use llkv_expr::expr::Expr;
    match expr {
        Expr::And(exprs) => {
            let translated: Result<Vec<_>, _> = exprs
                .into_iter()
                .map(|e| translate_predicate(e, schema))
                .collect();
            Ok(Expr::And(translated?))
        }
        Expr::Or(exprs) => {
            let translated: Result<Vec<_>, _> = exprs
                .into_iter()
                .map(|e| translate_predicate(e, schema))
                .collect();
            Ok(Expr::Or(translated?))
        }
        Expr::Not(inner) => {
            let translated = translate_predicate(*inner, schema)?;
            Ok(Expr::Not(Box::new(translated)))
        }
        Expr::Pred(filter) => {
            let column = schema.resolve(&filter.field_id).ok_or_else(|| {
                Error::InvalidArgumentError(format!("unknown column '{}'", filter.field_id))
            })?;
            Ok(Expr::Pred(Filter {
                field_id: column.field_id,
                op: filter.op,
            }))
        }
        Expr::Compare { left, op, right } => Ok(Expr::Compare {
            left: translate_scalar(&left, schema)?,
            op,
            right: translate_scalar(&right, schema)?,
        }),
    }
}
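
#[cfg(test)]
mod predicate_translation_tests {
    use super::*;
    use llkv_expr::expr::Expr;

    // A minimal sketch: predicates arrive keyed by column name and leave
    // keyed by field id. The schema fixture (field id 7) is arbitrary and
    // assumes `FieldId` is a plain integer id.
    #[test]
    fn translates_column_name_to_field_id() {
        let column = ExecutorColumn {
            name: "price".to_string(),
            data_type: DataType::Int64,
            nullable: true,
            primary_key: false,
            unique: false,
            field_id: 7,
            check_expr: None,
        };
        let mut lookup = FxHashMap::default();
        lookup.insert("price".to_string(), 0);
        let schema = ExecutorSchema {
            columns: vec![column],
            lookup,
        };

        let expr = Expr::Pred(Filter {
            field_id: "price".to_string(),
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        });
        let translated = translate_predicate(expr, &schema).unwrap();
        assert!(matches!(translated, Expr::Pred(Filter { field_id: 7, .. })));
    }
}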

/// Translate a scalar expression over column names into one over field ids.
fn translate_scalar(
    expr: &ScalarExpr<String>,
    schema: &ExecutorSchema,
) -> ExecutorResult<ScalarExpr<FieldId>> {
    match expr {
        ScalarExpr::Literal(lit) => Ok(ScalarExpr::Literal(lit.clone())),
        ScalarExpr::Column(name) => {
            let column = schema
                .resolve(name)
                .ok_or_else(|| Error::InvalidArgumentError(format!("unknown column '{name}'")))?;
            Ok(ScalarExpr::Column(column.field_id))
        }
        ScalarExpr::Binary { left, op, right } => Ok(ScalarExpr::Binary {
            left: Box::new(translate_scalar(left, schema)?),
            op: *op,
            right: Box::new(translate_scalar(right, schema)?),
        }),
        ScalarExpr::Aggregate(agg) => {
            // Translate column names inside aggregate calls to field ids;
            // the per-variant lookup is factored into one closure.
            use llkv_expr::expr::AggregateCall;
            let resolve = |name: &String| {
                schema
                    .resolve(name)
                    .map(|column| column.field_id)
                    .ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{name}'"))
                    })
            };
            let translated_agg = match agg {
                AggregateCall::CountStar => AggregateCall::CountStar,
                AggregateCall::Count(name) => AggregateCall::Count(resolve(name)?),
                AggregateCall::Sum(name) => AggregateCall::Sum(resolve(name)?),
                AggregateCall::Min(name) => AggregateCall::Min(resolve(name)?),
                AggregateCall::Max(name) => AggregateCall::Max(resolve(name)?),
                AggregateCall::CountNulls(name) => AggregateCall::CountNulls(resolve(name)?),
            };
            Ok(ScalarExpr::Aggregate(translated_agg))
        }
        ScalarExpr::GetField { base, field_name } => Ok(ScalarExpr::GetField {
            base: Box::new(translate_scalar(base, schema)?),
            field_name: field_name.clone(),
        }),
    }
}
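
// Sketch of the scalar side: under the same fixture schema as in
// `predicate_translation_tests`, `ScalarExpr::Column("price".to_string())`
// translates to `ScalarExpr::Column(7)`, and an embedded aggregate such as
// `AggregateCall::Sum("price".to_string())` is rewritten to
// `AggregateCall::Sum(7)` by the same lookup.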