use arrow::array::{
    Array, ArrayRef, BooleanArray, BooleanBuilder, Float64Array, Int64Array, Int64Builder,
    RecordBatch, StringArray, UInt32Array, new_null_array,
};
use arrow::compute::{
    SortColumn, SortOptions, concat_batches, filter_record_batch, lexsort_to_indices, take,
};
use arrow::datatypes::{DataType, Field, Float64Type, Int64Type, Schema};
use llkv_aggregate::{AggregateAccumulator, AggregateKind, AggregateSpec, AggregateState};
use llkv_column_map::store::Projection as StoreProjection;
use llkv_column_map::types::LogicalFieldId;
use llkv_expr::expr::{AggregateCall, CompareOp, Expr as LlkvExpr, Filter, Operator, ScalarExpr};
use llkv_expr::literal::Literal;
use llkv_expr::typed_predicate::{
    build_bool_predicate, build_fixed_width_predicate, build_var_width_predicate,
};
use llkv_join::cross_join_pair;
use llkv_plan::{
    AggregateExpr, AggregateFunction, CanonicalRow, OrderByPlan, OrderSortType, OrderTarget,
    PlanValue, SelectPlan, SelectProjection,
};
use llkv_result::Error;
use llkv_storage::pager::Pager;
use llkv_table::table::{
    RowIdFilter, ScanOrderDirection, ScanOrderSpec, ScanOrderTransform, ScanProjection,
    ScanStreamOptions,
};
use llkv_table::types::FieldId;
use llkv_table::{NumericArray, NumericArrayMap, NumericKernels, ROW_ID_FIELD_ID};
use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::Ordering;

pub mod insert;
pub mod translation;
pub mod types;
pub mod utils;

pub type ExecutorResult<T> = Result<T, Error>;

pub use insert::{
    build_array_for_column, normalize_insert_value_for_column, resolve_insert_columns,
};
pub use translation::{
    build_projected_columns, build_wildcard_projections, full_table_scan_filter,
    resolve_field_id_from_schema, schema_for_projections, translate_predicate,
    translate_predicate_with, translate_scalar, translate_scalar_with,
};
pub use types::{
    ExecutorColumn, ExecutorMultiColumnUnique, ExecutorRowBatch, ExecutorSchema, ExecutorTable,
    ExecutorTableProvider,
};
pub use utils::current_time_micros;

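/// Executes [`SelectPlan`]s against tables resolved through an
/// [`ExecutorTableProvider`].
///
/// A minimal usage sketch (it assumes `provider` is an existing
/// `Arc<dyn ExecutorTableProvider<P>>` and `plan` a prepared [`SelectPlan`]):
///
/// ```ignore
/// let executor = QueryExecutor::new(provider);
/// let execution = executor.execute_select(plan)?;
/// execution.stream(|batch| {
///     println!("{} rows", batch.num_rows());
///     Ok(())
/// })?;
/// ```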
pub struct QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    provider: Arc<dyn ExecutorTableProvider<P>>,
}

impl<P> QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    pub fn new(provider: Arc<dyn ExecutorTableProvider<P>>) -> Self {
        Self { provider }
    }

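    /// Executes a `SELECT` plan with no extra MVCC row filter; shorthand for
    /// [`Self::execute_select_with_filter`] with `row_filter = None`.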
    pub fn execute_select(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        self.execute_select_with_filter(plan, None)
    }

    pub fn execute_select_with_filter(
        &self,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        if plan.tables.is_empty() {
            return self.execute_select_without_table(plan);
        }

        if plan.tables.len() > 1 {
            return self.execute_cross_product(plan);
        }

        let table_ref = &plan.tables[0];
        let table = self.provider.get_table(&table_ref.qualified_name())?;
        let display_name = table_ref.qualified_name();

        if !plan.aggregates.is_empty() {
            self.execute_aggregates(table, display_name, plan, row_filter)
        } else if self.has_computed_aggregates(&plan) {
            self.execute_computed_aggregates(table, display_name, plan, row_filter)
        } else {
            self.execute_projection(table, display_name, plan, row_filter)
        }
    }

    fn has_computed_aggregates(&self, plan: &SelectPlan) -> bool {
        plan.projections.iter().any(|proj| {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::expr_contains_aggregate(expr)
            } else {
                false
            }
        })
    }

    fn expr_contains_aggregate(expr: &ScalarExpr<String>) -> bool {
        match expr {
            ScalarExpr::Aggregate(_) => true,
            ScalarExpr::Binary { left, right, .. } => {
                Self::expr_contains_aggregate(left) || Self::expr_contains_aggregate(right)
            }
            ScalarExpr::GetField { base, .. } => Self::expr_contains_aggregate(base),
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => false,
        }
    }

    fn execute_select_without_table(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        use arrow::array::ArrayRef;
        use arrow::datatypes::Field;

        let mut fields = Vec::new();
        let mut arrays: Vec<ArrayRef> = Vec::new();

        for proj in &plan.projections {
            match proj {
                SelectProjection::Computed { expr, alias } => {
                    let (field_name, dtype, array) = match expr {
                        ScalarExpr::Literal(lit) => {
                            let (dtype, array) = Self::literal_to_array(lit)?;
                            (alias.clone(), dtype, array)
                        }
                        _ => {
                            return Err(Error::InvalidArgumentError(
                                "SELECT without FROM only supports literal expressions".into(),
                            ));
                        }
                    };

                    fields.push(Field::new(field_name, dtype, true));
                    arrays.push(array);
                }
                _ => {
                    return Err(Error::InvalidArgumentError(
                        "SELECT without FROM only supports computed projections".into(),
                    ));
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        let mut batch = RecordBatch::try_new(Arc::clone(&schema), arrays)
            .map_err(|e| Error::Internal(format!("failed to create record batch: {}", e)))?;

        if plan.distinct {
            let mut state = DistinctState::default();
            batch = match distinct_filter_batch(batch, &mut state)? {
                Some(filtered) => filtered,
                None => RecordBatch::new_empty(Arc::clone(&schema)),
            };
        }

        let schema = batch.schema();

        Ok(SelectExecution::new_single_batch(
            String::new(),
            schema,
            batch,
        ))
    }

    fn literal_to_array(lit: &llkv_expr::literal::Literal) -> ExecutorResult<(DataType, ArrayRef)> {
        use arrow::array::{
            ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray, StructArray,
            new_null_array,
        };
        use arrow::datatypes::{DataType, Field};
        use llkv_expr::literal::Literal;

        match lit {
            Literal::Integer(v) => {
                let val = i64::try_from(*v).unwrap_or(0);
                Ok((
                    DataType::Int64,
                    Arc::new(Int64Array::from(vec![val])) as ArrayRef,
                ))
            }
            Literal::Float(v) => Ok((
                DataType::Float64,
                Arc::new(Float64Array::from(vec![*v])) as ArrayRef,
            )),
            Literal::Boolean(v) => Ok((
                DataType::Boolean,
                Arc::new(BooleanArray::from(vec![*v])) as ArrayRef,
            )),
            Literal::String(v) => Ok((
                DataType::Utf8,
                Arc::new(StringArray::from(vec![v.clone()])) as ArrayRef,
            )),
            Literal::Null => Ok((DataType::Null, new_null_array(&DataType::Null, 1))),
            Literal::Struct(struct_fields) => {
                let mut inner_fields = Vec::new();
                let mut inner_arrays = Vec::new();

                for (field_name, field_lit) in struct_fields {
                    let (field_dtype, field_array) = Self::literal_to_array(field_lit)?;
                    inner_fields.push(Field::new(field_name.clone(), field_dtype, true));
                    inner_arrays.push(field_array);
                }

                let struct_array =
                    StructArray::try_new(inner_fields.clone().into(), inner_arrays, None).map_err(
                        |e| Error::Internal(format!("failed to create struct array: {}", e)),
                    )?;

                Ok((
                    DataType::Struct(inner_fields.into()),
                    Arc::new(struct_array) as ArrayRef,
                ))
            }
        }
    }

    fn execute_cross_product(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        use arrow::compute::concat_batches;

        if plan.tables.len() < 2 {
            return Err(Error::InvalidArgumentError(
                "cross product requires at least 2 tables".into(),
            ));
        }

        let mut tables = Vec::with_capacity(plan.tables.len());
        for table_ref in &plan.tables {
            let qualified_name = table_ref.qualified_name();
            let table = self.provider.get_table(&qualified_name)?;
            tables.push((table_ref.clone(), table));
        }

        let mut staged: Vec<TableCrossProductData> = Vec::with_capacity(tables.len());
        for (table_ref, table) in &tables {
            staged.push(collect_table_data(table_ref, table.as_ref())?);
        }

        let mut staged_iter = staged.into_iter();
        let mut current = staged_iter
            .next()
            .ok_or_else(|| Error::Internal("cross product preparation yielded no tables".into()))?;

        for next in staged_iter {
            current = cross_join_table_batches(current, next)?;
        }

        let TableCrossProductData {
            schema: combined_schema,
            batches: mut combined_batches,
            column_counts,
        } = current;

        let column_lookup_map = build_cross_product_column_lookup(
            combined_schema.as_ref(),
            &plan.tables,
            &column_counts,
        );

        if let Some(filter_expr) = &plan.filter {
            let mut filter_context = CrossProductExpressionContext::new(
                combined_schema.as_ref(),
                column_lookup_map.clone(),
            )?;
            let translated_filter =
                translate_predicate(filter_expr.clone(), filter_context.schema(), |name| {
                    Error::InvalidArgumentError(format!(
                        "column '{}' not found in cross product result",
                        name
                    ))
                })?;

            let mut filtered_batches = Vec::with_capacity(combined_batches.len());
            for batch in combined_batches.into_iter() {
                filter_context.reset();
                let mask = filter_context.evaluate_predicate_mask(&translated_filter, &batch)?;
                let filtered = filter_record_batch(&batch, &mask).map_err(|err| {
                    Error::InvalidArgumentError(format!(
                        "failed to apply cross product filter: {err}"
                    ))
                })?;
                if filtered.num_rows() > 0 {
                    filtered_batches.push(filtered);
                }
            }
            combined_batches = filtered_batches;
        }

        let mut combined_batch = if combined_batches.is_empty() {
            RecordBatch::new_empty(Arc::clone(&combined_schema))
        } else if combined_batches.len() == 1 {
            combined_batches.pop().unwrap()
        } else {
            concat_batches(&combined_schema, &combined_batches).map_err(|e| {
                Error::Internal(format!(
                    "failed to concatenate cross product batches: {}",
                    e
                ))
            })?
        };

        if !plan.projections.is_empty() {
            let mut selected_fields = Vec::new();
            let mut selected_columns = Vec::new();
            let mut expr_context: Option<CrossProductExpressionContext> = None;

            for proj in &plan.projections {
                match proj {
                    SelectProjection::AllColumns => {
                        selected_fields = combined_schema.fields().iter().cloned().collect();
                        selected_columns = combined_batch.columns().to_vec();
                        break;
                    }
                    SelectProjection::AllColumnsExcept { exclude } => {
                        let exclude_lower: Vec<String> =
                            exclude.iter().map(|e| e.to_ascii_lowercase()).collect();

                        for (idx, field) in combined_schema.fields().iter().enumerate() {
                            let field_name_lower = field.name().to_ascii_lowercase();
                            if !exclude_lower.contains(&field_name_lower) {
                                selected_fields.push(field.clone());
                                selected_columns.push(combined_batch.column(idx).clone());
                            }
                        }
                        break;
                    }
                    SelectProjection::Column { name, alias } => {
                        let col_name = name.to_ascii_lowercase();
                        if let Some(&idx) = column_lookup_map.get(&col_name) {
                            let field = combined_schema.field(idx);
                            let output_name = alias.as_ref().unwrap_or(name).clone();
                            selected_fields.push(Arc::new(arrow::datatypes::Field::new(
                                output_name,
                                field.data_type().clone(),
                                field.is_nullable(),
                            )));
                            selected_columns.push(combined_batch.column(idx).clone());
                        } else {
                            return Err(Error::InvalidArgumentError(format!(
                                "column '{}' not found in cross product result",
                                name
                            )));
                        }
                    }
                    SelectProjection::Computed { expr, alias } => {
                        if expr_context.is_none() {
                            expr_context = Some(CrossProductExpressionContext::new(
                                combined_schema.as_ref(),
                                column_lookup_map.clone(),
                            )?);
                        }
                        let context = expr_context
                            .as_mut()
                            .expect("projection context must be initialized");
                        let evaluated = context.evaluate(expr, &combined_batch)?;
                        let field = Arc::new(arrow::datatypes::Field::new(
                            alias.clone(),
                            evaluated.data_type().clone(),
                            true,
                        ));
                        selected_fields.push(field);
                        selected_columns.push(evaluated);
                    }
                }
            }

            let projected_schema = Arc::new(Schema::new(selected_fields));
            combined_batch = RecordBatch::try_new(projected_schema, selected_columns)
                .map_err(|e| Error::Internal(format!("failed to apply projections: {}", e)))?;
        }

        if plan.distinct {
            let mut state = DistinctState::default();
            let source_schema = combined_batch.schema();
            combined_batch = match distinct_filter_batch(combined_batch, &mut state)? {
                Some(filtered) => filtered,
                None => RecordBatch::new_empty(source_schema),
            };
        }

        let schema = combined_batch.schema();

        let display_name = tables
            .iter()
            .map(|(table_ref, _)| table_ref.qualified_name())
            .collect::<Vec<_>>()
            .join(",");

        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            combined_batch,
        ))
    }

    fn execute_projection(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let projections = if plan.projections.is_empty() {
            build_wildcard_projections(table_ref)
        } else {
            build_projected_columns(table_ref, &plan.projections)?
        };
        let schema = schema_for_projections(table_ref, &projections)?;

        let (filter_expr, full_table_scan) = match plan.filter {
            Some(expr) => (
                crate::translation::expression::translate_predicate(
                    expr,
                    table_ref.schema.as_ref(),
                    |name| Error::InvalidArgumentError(format!("unknown column '{}'", name)),
                )?,
                false,
            ),
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                (
                    crate::translation::expression::full_table_scan_filter(field_id),
                    true,
                )
            }
        };

        let expanded_order = expand_order_targets(&plan.order_by, &projections)?;
        let physical_order = if let Some(first) = expanded_order.first() {
            Some(resolve_scan_order(table_ref, &projections, first)?)
        } else {
            None
        };

        let options = if let Some(order_spec) = physical_order {
            if row_filter.is_some() {
                tracing::debug!("Applying MVCC row filter with ORDER BY");
            }
            ScanStreamOptions {
                include_nulls: true,
                order: Some(order_spec),
                row_id_filter: row_filter.clone(),
            }
        } else {
            if row_filter.is_some() {
                tracing::debug!("Applying MVCC row filter");
            }
            ScanStreamOptions {
                include_nulls: true,
                order: None,
                row_id_filter: row_filter.clone(),
            }
        };

        Ok(SelectExecution::new_projection(
            display_name,
            schema,
            table,
            projections,
            filter_expr,
            options,
            full_table_scan,
            expanded_order,
            plan.distinct,
        ))
    }

    fn execute_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let distinct = plan.distinct;
        let mut specs: Vec<AggregateSpec> = Vec::with_capacity(plan.aggregates.len());
        for aggregate in plan.aggregates {
            match aggregate {
                AggregateExpr::CountStar { alias } => {
                    specs.push(AggregateSpec {
                        alias,
                        kind: AggregateKind::CountStar,
                    });
                }
                AggregateExpr::Column {
                    column,
                    alias,
                    function,
                    distinct,
                } => {
                    let col = table_ref.schema.resolve(&column).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column '{}' in aggregate",
                            column
                        ))
                    })?;

                    let kind = match function {
                        AggregateFunction::Count => {
                            if distinct {
                                AggregateKind::CountDistinctField {
                                    field_id: col.field_id,
                                }
                            } else {
                                AggregateKind::CountField {
                                    field_id: col.field_id,
                                }
                            }
                        }
                        AggregateFunction::SumInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "SUM currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::SumInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MinInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MIN currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MinInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MaxInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MAX currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MaxInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::CountNulls => {
                            if distinct {
                                return Err(Error::InvalidArgumentError(
                                    "DISTINCT is not supported for COUNT_NULLS".into(),
                                ));
                            }
                            AggregateKind::CountNulls {
                                field_id: col.field_id,
                            }
                        }
                    };
                    specs.push(AggregateSpec { alias, kind });
                }
            }
        }

        if specs.is_empty() {
            return Err(Error::InvalidArgumentError(
                "aggregate query requires at least one aggregate expression".into(),
            ));
        }

        let had_filter = plan.filter.is_some();
        let filter_expr = match plan.filter {
            Some(expr) => crate::translation::expression::translate_predicate(
                expr,
                table.schema.as_ref(),
                |name| Error::InvalidArgumentError(format!("unknown column '{}'", name)),
            )?,
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                crate::translation::expression::full_table_scan_filter(field_id)
            }
        };

        let mut projections = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                let proj_idx = projections.len();
                spec_to_projection.push(Some(proj_idx));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        if projections.is_empty() {
            let field_id = table.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: row_filter.clone(),
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        let mut count_star_override: Option<i64> = None;
        if !had_filter && row_filter.is_none() {
            let total_rows = table.total_rows.load(Ordering::SeqCst);
            tracing::debug!(
                "[AGGREGATE] Using COUNT(*) shortcut: total_rows={}",
                total_rows
            );
            if total_rows > i64::MAX as u64 {
                return Err(Error::InvalidArgumentError(
                    "COUNT(*) result exceeds supported range".into(),
                ));
            }
            count_star_override = Some(total_rows as i64);
        } else {
            tracing::debug!(
                "[AGGREGATE] NOT using COUNT(*) shortcut: had_filter={}, has_row_filter={}",
                had_filter,
                row_filter.is_some()
            );
        }

        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => {
                        tracing::debug!(
                            "[AGGREGATE] CountStar override_value={:?}",
                            count_star_override
                        );
                        count_star_override
                    }
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            ScanStreamOptions {
                row_id_filter: row_filter.clone(),
                ..options
            },
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => {}
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        let mut fields = Vec::with_capacity(states.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(states.len());
        for state in states {
            let (field, array) = state.finalize()?;
            fields.push(field);
            arrays.push(array);
        }

        let schema = Arc::new(Schema::new(fields));
        let mut batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;

        if distinct {
            let mut state = DistinctState::default();
            batch = match distinct_filter_batch(batch, &mut state)? {
                Some(filtered) => filtered,
                None => RecordBatch::new_empty(Arc::clone(&schema)),
            };
        }

        let schema = batch.schema();

        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

    fn execute_computed_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        use arrow::array::Int64Array;
        use llkv_expr::expr::AggregateCall;

        let table_ref = table.as_ref();
        let distinct = plan.distinct;

        let mut aggregate_specs: Vec<(String, AggregateCall<String>)> = Vec::new();
        for proj in &plan.projections {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::collect_aggregates(expr, &mut aggregate_specs);
            }
        }

        let computed_aggregates = self.compute_aggregate_values(
            table.clone(),
            &plan.filter,
            &aggregate_specs,
            row_filter.clone(),
        )?;

        let mut fields = Vec::with_capacity(plan.projections.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(plan.projections.len());

        for proj in &plan.projections {
            match proj {
                SelectProjection::AllColumns | SelectProjection::AllColumnsExcept { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "Wildcard projections not supported with computed aggregates".into(),
                    ));
                }
                SelectProjection::Column { name, alias } => {
                    let col = table_ref.schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    let field_name = alias.as_ref().unwrap_or(name);
                    fields.push(arrow::datatypes::Field::new(
                        field_name,
                        col.data_type.clone(),
                        col.nullable,
                    ));
                    return Err(Error::InvalidArgumentError(
                        "Regular columns not supported in aggregate queries without GROUP BY"
                            .into(),
                    ));
                }
                SelectProjection::Computed { expr, alias } => {
                    let value = Self::evaluate_expr_with_aggregates(expr, &computed_aggregates)?;

                    fields.push(arrow::datatypes::Field::new(alias, DataType::Int64, false));

                    let array = Arc::new(Int64Array::from(vec![value])) as ArrayRef;
                    arrays.push(array);
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        let mut batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;

        if distinct {
            let mut state = DistinctState::default();
            batch = match distinct_filter_batch(batch, &mut state)? {
                Some(filtered) => filtered,
                None => RecordBatch::new_empty(Arc::clone(&schema)),
            };
        }

        let schema = batch.schema();

        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

    fn collect_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &mut Vec<(String, llkv_expr::expr::AggregateCall<String>)>,
    ) {
        match expr {
            ScalarExpr::Aggregate(agg) => {
                let key = format!("{:?}", agg);
                if !aggregates.iter().any(|(k, _)| k == &key) {
                    aggregates.push((key, agg.clone()));
                }
            }
            ScalarExpr::Binary { left, right, .. } => {
                Self::collect_aggregates(left, aggregates);
                Self::collect_aggregates(right, aggregates);
            }
            ScalarExpr::GetField { base, .. } => {
                Self::collect_aggregates(base, aggregates);
            }
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => {}
        }
    }

    fn compute_aggregate_values(
        &self,
        table: Arc<ExecutorTable<P>>,
        filter: &Option<llkv_expr::expr::Expr<'static, String>>,
        aggregate_specs: &[(String, llkv_expr::expr::AggregateCall<String>)],
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<FxHashMap<String, i64>> {
        use llkv_expr::expr::AggregateCall;

        let table_ref = table.as_ref();
        let mut results =
            FxHashMap::with_capacity_and_hasher(aggregate_specs.len(), Default::default());

        let mut specs: Vec<AggregateSpec> = Vec::new();
        for (key, agg) in aggregate_specs {
            let kind = match agg {
                AggregateCall::CountStar => AggregateKind::CountStar,
                AggregateCall::Count(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountField {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Sum(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::SumInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Min(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MinInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Max(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MaxInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::CountNulls(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountNulls {
                        field_id: col.field_id,
                    }
                }
            };
            specs.push(AggregateSpec {
                alias: key.clone(),
                kind,
            });
        }

        let filter_expr = match filter {
            Some(expr) => crate::translation::expression::translate_predicate(
                expr.clone(),
                table_ref.schema.as_ref(),
                |name| Error::InvalidArgumentError(format!("unknown column '{}'", name)),
            )?,
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                crate::translation::expression::full_table_scan_filter(field_id)
            }
        };

        let mut projections: Vec<ScanProjection> = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());
        let count_star_override: Option<i64> = None;

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                spec_to_projection.push(Some(projections.len()));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        if projections.is_empty() {
            let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let base_options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: None,
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => count_star_override,
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            ScanStreamOptions {
                row_id_filter: row_filter.clone(),
                ..base_options
            },
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => {}
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        for state in states {
            let alias = state.alias.clone();
            let (_field, array) = state.finalize()?;

            let int64_array = array
                .as_any()
                .downcast_ref::<arrow::array::Int64Array>()
                .ok_or_else(|| Error::Internal("Expected Int64Array from aggregate".into()))?;

            if int64_array.len() != 1 {
                return Err(Error::Internal(format!(
                    "Expected single value from aggregate, got {}",
                    int64_array.len()
                )));
            }

            let value = if int64_array.is_null(0) {
                0
            } else {
                int64_array.value(0)
            };

            results.insert(alias, value);
        }

        Ok(results)
    }

    fn evaluate_expr_with_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &FxHashMap<String, i64>,
    ) -> ExecutorResult<i64> {
        use llkv_expr::expr::BinaryOp;
        use llkv_expr::literal::Literal;

        match expr {
            ScalarExpr::Literal(Literal::Integer(v)) => Ok(*v as i64),
            ScalarExpr::Literal(Literal::Float(v)) => Ok(*v as i64),
            ScalarExpr::Literal(Literal::Boolean(v)) => Ok(if *v { 1 } else { 0 }),
            ScalarExpr::Literal(Literal::String(_)) => Err(Error::InvalidArgumentError(
                "String literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Literal(Literal::Null) => Err(Error::InvalidArgumentError(
                "NULL literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Literal(Literal::Struct(_)) => Err(Error::InvalidArgumentError(
                "Struct literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Column(_) => Err(Error::InvalidArgumentError(
                "Column references not supported in aggregate-only expressions".into(),
            )),
            ScalarExpr::Aggregate(agg) => {
                let key = format!("{:?}", agg);
                aggregates.get(&key).copied().ok_or_else(|| {
                    Error::Internal(format!("Aggregate value not found for key: {}", key))
                })
            }
            ScalarExpr::Binary { left, op, right } => {
                let left_val = Self::evaluate_expr_with_aggregates(left, aggregates)?;
                let right_val = Self::evaluate_expr_with_aggregates(right, aggregates)?;

                let result = match op {
                    BinaryOp::Add => left_val.checked_add(right_val),
                    BinaryOp::Subtract => left_val.checked_sub(right_val),
                    BinaryOp::Multiply => left_val.checked_mul(right_val),
                    BinaryOp::Divide => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Division by zero".into()));
                        }
                        left_val.checked_div(right_val)
                    }
                    BinaryOp::Modulo => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Modulo by zero".into()));
                        }
                        left_val.checked_rem(right_val)
                    }
                };

                result.ok_or_else(|| {
                    Error::InvalidArgumentError("Arithmetic overflow in expression".into())
                })
            }
            ScalarExpr::GetField { .. } => Err(Error::InvalidArgumentError(
                "GetField not supported in aggregate-only expressions".into(),
            )),
        }
    }
}

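/// Evaluation context for filters and computed projections over cross product
/// batches. Each output column is assigned a synthetic [`FieldId`] (starting
/// at 1) so the shared predicate/scalar translation machinery can be reused;
/// per-batch accessors are cached until `reset` is called for the next batch.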
struct CrossProductExpressionContext {
    schema: Arc<ExecutorSchema>,
    field_id_to_index: FxHashMap<FieldId, usize>,
    numeric_cache: FxHashMap<FieldId, NumericArray>,
    column_cache: FxHashMap<FieldId, ColumnAccessor>,
}

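/// Cheaply cloneable, type-erased view over one column of a cross product
/// batch; Arrow types outside this set are rejected up front in `from_array`.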
#[derive(Clone)]
enum ColumnAccessor {
    Int64(Arc<Int64Array>),
    Float64(Arc<Float64Array>),
    Boolean(Arc<BooleanArray>),
    Utf8(Arc<StringArray>),
    Null(usize),
}

impl ColumnAccessor {
    fn from_array(array: &ArrayRef) -> ExecutorResult<Self> {
        match array.data_type() {
            DataType::Int64 => {
                let typed = array
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .ok_or_else(|| Error::Internal("expected Int64 array".into()))?
                    .clone();
                Ok(Self::Int64(Arc::new(typed)))
            }
            DataType::Float64 => {
                let typed = array
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .ok_or_else(|| Error::Internal("expected Float64 array".into()))?
                    .clone();
                Ok(Self::Float64(Arc::new(typed)))
            }
            DataType::Boolean => {
                let typed = array
                    .as_any()
                    .downcast_ref::<BooleanArray>()
                    .ok_or_else(|| Error::Internal("expected Boolean array".into()))?
                    .clone();
                Ok(Self::Boolean(Arc::new(typed)))
            }
            DataType::Utf8 => {
                let typed = array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| Error::Internal("expected Utf8 array".into()))?
                    .clone();
                Ok(Self::Utf8(Arc::new(typed)))
            }
            DataType::Null => Ok(Self::Null(array.len())),
            other => Err(Error::InvalidArgumentError(format!(
                "unsupported column type {:?} in cross product filter",
                other
            ))),
        }
    }

    fn len(&self) -> usize {
        match self {
            ColumnAccessor::Int64(array) => array.len(),
            ColumnAccessor::Float64(array) => array.len(),
            ColumnAccessor::Boolean(array) => array.len(),
            ColumnAccessor::Utf8(array) => array.len(),
            ColumnAccessor::Null(len) => *len,
        }
    }

    fn is_null(&self, idx: usize) -> bool {
        match self {
            ColumnAccessor::Int64(array) => array.is_null(idx),
            ColumnAccessor::Float64(array) => array.is_null(idx),
            ColumnAccessor::Boolean(array) => array.is_null(idx),
            ColumnAccessor::Utf8(array) => array.is_null(idx),
            ColumnAccessor::Null(_) => true,
        }
    }

    fn as_array_ref(&self) -> ArrayRef {
        match self {
            ColumnAccessor::Int64(array) => Arc::clone(array) as ArrayRef,
            ColumnAccessor::Float64(array) => Arc::clone(array) as ArrayRef,
            ColumnAccessor::Boolean(array) => Arc::clone(array) as ArrayRef,
            ColumnAccessor::Utf8(array) => Arc::clone(array) as ArrayRef,
            ColumnAccessor::Null(len) => new_null_array(&DataType::Null, *len),
        }
    }
}

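/// Normalized operand for comparisons and `IN` lists: all numeric Arrow types
/// collapse into a [`NumericArray`], while booleans, strings, and all-null
/// columns keep dedicated variants.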
#[derive(Clone)]
enum ValueArray {
    Numeric(NumericArray),
    Boolean(Arc<BooleanArray>),
    Utf8(Arc<StringArray>),
    Null(usize),
}

impl ValueArray {
    fn from_array(array: ArrayRef) -> ExecutorResult<Self> {
        match array.data_type() {
            DataType::Boolean => {
                let typed = array
                    .as_any()
                    .downcast_ref::<BooleanArray>()
                    .ok_or_else(|| Error::Internal("expected Boolean array".into()))?
                    .clone();
                Ok(Self::Boolean(Arc::new(typed)))
            }
            DataType::Utf8 => {
                let typed = array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| Error::Internal("expected Utf8 array".into()))?
                    .clone();
                Ok(Self::Utf8(Arc::new(typed)))
            }
            DataType::Null => Ok(Self::Null(array.len())),
            DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64
            | DataType::Float32
            | DataType::Float64 => {
                let numeric = NumericArray::try_from_arrow(&array)?;
                Ok(Self::Numeric(numeric))
            }
            other => Err(Error::InvalidArgumentError(format!(
                "unsupported data type {:?} in cross product expression",
                other
            ))),
        }
    }

    fn len(&self) -> usize {
        match self {
            ValueArray::Numeric(array) => array.len(),
            ValueArray::Boolean(array) => array.len(),
            ValueArray::Utf8(array) => array.len(),
            ValueArray::Null(len) => *len,
        }
    }
}

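/// Three-valued `AND` over SQL truth values, with `None` modelling `NULL`:
/// `FALSE` dominates, and any remaining `NULL` operand makes the result
/// unknown. `truth_or` and `truth_not` below follow the same convention.
///
/// ```ignore
/// assert_eq!(truth_and(Some(false), None), Some(false)); // FALSE AND NULL = FALSE
/// assert_eq!(truth_and(Some(true), None), None);         // TRUE AND NULL = NULL
/// ```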
fn truth_and(lhs: Option<bool>, rhs: Option<bool>) -> Option<bool> {
    match (lhs, rhs) {
        (Some(false), _) | (_, Some(false)) => Some(false),
        (Some(true), Some(true)) => Some(true),
        (Some(true), None) | (None, Some(true)) | (None, None) => None,
    }
}

fn truth_or(lhs: Option<bool>, rhs: Option<bool>) -> Option<bool> {
    match (lhs, rhs) {
        (Some(true), _) | (_, Some(true)) => Some(true),
        (Some(false), Some(false)) => Some(false),
        (Some(false), None) | (None, Some(false)) | (None, None) => None,
    }
}

fn truth_not(value: Option<bool>) -> Option<bool> {
    match value {
        Some(true) => Some(false),
        Some(false) => Some(true),
        None => None,
    }
}

fn compare_bool(op: CompareOp, lhs: bool, rhs: bool) -> bool {
    let l = lhs as u8;
    let r = rhs as u8;
    match op {
        CompareOp::Eq => lhs == rhs,
        CompareOp::NotEq => lhs != rhs,
        CompareOp::Lt => l < r,
        CompareOp::LtEq => l <= r,
        CompareOp::Gt => l > r,
        CompareOp::GtEq => l >= r,
    }
}

fn compare_str(op: CompareOp, lhs: &str, rhs: &str) -> bool {
    match op {
        CompareOp::Eq => lhs == rhs,
        CompareOp::NotEq => lhs != rhs,
        CompareOp::Lt => lhs < rhs,
        CompareOp::LtEq => lhs <= rhs,
        CompareOp::Gt => lhs > rhs,
        CompareOp::GtEq => lhs >= rhs,
    }
}

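/// Folds an `IN` / `NOT IN` probe into a SQL truth value: a match decides the
/// answer immediately, otherwise a `NULL` candidate makes the result unknown,
/// and only a fully non-null miss yields a definite result.
///
/// ```ignore
/// assert_eq!(finalize_in_list_result(false, true, false), None);       // x IN (.., NULL) -> NULL
/// assert_eq!(finalize_in_list_result(true, false, true), Some(false)); // x NOT IN (.., x, ..) -> FALSE
/// ```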
fn finalize_in_list_result(has_match: bool, saw_null: bool, negated: bool) -> Option<bool> {
    if has_match {
        Some(!negated)
    } else if saw_null {
        None
    } else if negated {
        Some(true)
    } else {
        Some(false)
    }
}

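/// Broadcasts a scalar [`Literal`] to a constant Arrow array of length `len`
/// so literal operands can be compared row-wise against real columns.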
fn literal_to_constant_array(literal: &Literal, len: usize) -> ExecutorResult<ArrayRef> {
    match literal {
        Literal::Integer(v) => {
            let value = i64::try_from(*v).unwrap_or(0);
            let values = vec![value; len];
            Ok(Arc::new(Int64Array::from(values)) as ArrayRef)
        }
        Literal::Float(v) => {
            let values = vec![*v; len];
            Ok(Arc::new(Float64Array::from(values)) as ArrayRef)
        }
        Literal::Boolean(v) => {
            let values = vec![Some(*v); len];
            Ok(Arc::new(BooleanArray::from(values)) as ArrayRef)
        }
        Literal::String(v) => {
            let values: Vec<Option<String>> = (0..len).map(|_| Some(v.clone())).collect();
            Ok(Arc::new(StringArray::from(values)) as ArrayRef)
        }
        Literal::Null => Ok(new_null_array(&DataType::Null, len)),
        Literal::Struct(_) => Err(Error::InvalidArgumentError(
            "struct literals are not supported in cross product filters".into(),
        )),
    }
}

impl CrossProductExpressionContext {
    fn new(schema: &Schema, lookup: FxHashMap<String, usize>) -> ExecutorResult<Self> {
        let mut columns = Vec::with_capacity(schema.fields().len());
        let mut field_id_to_index = FxHashMap::default();
        let mut next_field_id: FieldId = 1;

        for (idx, field) in schema.fields().iter().enumerate() {
            if next_field_id == u32::MAX {
                return Err(Error::Internal(
                    "cross product projection exhausted FieldId space".into(),
                ));
            }

            let executor_column = ExecutorColumn {
                name: field.name().clone(),
                data_type: field.data_type().clone(),
                nullable: field.is_nullable(),
                primary_key: false,
                unique: false,
                field_id: next_field_id,
                check_expr: None,
            };
            let field_id = next_field_id;
            next_field_id = next_field_id.saturating_add(1);

            columns.push(executor_column);
            field_id_to_index.insert(field_id, idx);
        }

        Ok(Self {
            schema: Arc::new(ExecutorSchema { columns, lookup }),
            field_id_to_index,
            numeric_cache: FxHashMap::default(),
            column_cache: FxHashMap::default(),
        })
    }

    fn schema(&self) -> &ExecutorSchema {
        self.schema.as_ref()
    }

    fn reset(&mut self) {
        self.numeric_cache.clear();
        self.column_cache.clear();
    }

    fn evaluate(
        &mut self,
        expr: &ScalarExpr<String>,
        batch: &RecordBatch,
    ) -> ExecutorResult<ArrayRef> {
        let translated = translate_scalar(expr, self.schema.as_ref(), |name| {
            Error::InvalidArgumentError(format!(
                "column '{}' not found in cross product result",
                name
            ))
        })?;

        self.evaluate_numeric(&translated, batch)
    }

    fn evaluate_predicate_mask(
        &mut self,
        expr: &LlkvExpr<'static, FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<BooleanArray> {
        let truths = self.evaluate_predicate_truths(expr, batch)?;
        let mut builder = BooleanBuilder::with_capacity(truths.len());
        for value in truths {
            builder.append_value(value.unwrap_or(false));
        }
        Ok(builder.finish())
    }

    fn evaluate_predicate_truths(
        &mut self,
        expr: &LlkvExpr<'static, FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<Vec<Option<bool>>> {
        match expr {
            LlkvExpr::Literal(value) => Ok(vec![Some(*value); batch.num_rows()]),
            LlkvExpr::And(children) => {
                if children.is_empty() {
                    return Ok(vec![Some(true); batch.num_rows()]);
                }
                let mut result = self.evaluate_predicate_truths(&children[0], batch)?;
                for child in &children[1..] {
                    let next = self.evaluate_predicate_truths(child, batch)?;
                    for (lhs, rhs) in result.iter_mut().zip(next.into_iter()) {
                        *lhs = truth_and(*lhs, rhs);
                    }
                }
                Ok(result)
            }
            LlkvExpr::Or(children) => {
                if children.is_empty() {
                    return Ok(vec![Some(false); batch.num_rows()]);
                }
                let mut result = self.evaluate_predicate_truths(&children[0], batch)?;
                for child in &children[1..] {
                    let next = self.evaluate_predicate_truths(child, batch)?;
                    for (lhs, rhs) in result.iter_mut().zip(next.into_iter()) {
                        *lhs = truth_or(*lhs, rhs);
                    }
                }
                Ok(result)
            }
            LlkvExpr::Not(inner) => {
                let mut values = self.evaluate_predicate_truths(inner, batch)?;
                for value in &mut values {
                    *value = truth_not(*value);
                }
                Ok(values)
            }
            LlkvExpr::Pred(filter) => self.evaluate_filter_truths(filter, batch),
            LlkvExpr::Compare { left, op, right } => {
                self.evaluate_compare_truths(left, *op, right, batch)
            }
            LlkvExpr::InList {
                expr: target,
                list,
                negated,
            } => self.evaluate_in_list_truths(target, list, *negated, batch),
        }
    }

    fn evaluate_filter_truths(
        &mut self,
        filter: &Filter<FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<Vec<Option<bool>>> {
        let accessor = self.column_accessor(filter.field_id, batch)?;
        let len = accessor.len();

        match &filter.op {
            Operator::IsNull => {
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    out.push(Some(accessor.is_null(idx)));
                }
                Ok(out)
            }
            Operator::IsNotNull => {
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    out.push(Some(!accessor.is_null(idx)));
                }
                Ok(out)
            }
            _ => match accessor {
                ColumnAccessor::Int64(array) => {
                    let predicate = build_fixed_width_predicate::<Int64Type>(&filter.op)
                        .map_err(Error::predicate_build)?;
                    let mut out = Vec::with_capacity(len);
                    for idx in 0..len {
                        if array.is_null(idx) {
                            out.push(None);
                        } else {
                            let value = array.value(idx);
                            out.push(Some(predicate.matches(&value)));
                        }
                    }
                    Ok(out)
                }
                ColumnAccessor::Float64(array) => {
                    let predicate = build_fixed_width_predicate::<Float64Type>(&filter.op)
                        .map_err(Error::predicate_build)?;
                    let mut out = Vec::with_capacity(len);
                    for idx in 0..len {
                        if array.is_null(idx) {
                            out.push(None);
                        } else {
                            let value = array.value(idx);
                            out.push(Some(predicate.matches(&value)));
                        }
                    }
                    Ok(out)
                }
                ColumnAccessor::Boolean(array) => {
                    let predicate =
                        build_bool_predicate(&filter.op).map_err(Error::predicate_build)?;
                    let mut out = Vec::with_capacity(len);
                    for idx in 0..len {
                        if array.is_null(idx) {
                            out.push(None);
                        } else {
                            let value = array.value(idx);
                            out.push(Some(predicate.matches(&value)));
                        }
                    }
                    Ok(out)
                }
                ColumnAccessor::Utf8(array) => {
                    let predicate =
                        build_var_width_predicate(&filter.op).map_err(Error::predicate_build)?;
                    let mut out = Vec::with_capacity(len);
                    for idx in 0..len {
                        if array.is_null(idx) {
                            out.push(None);
                        } else {
                            let value = array.value(idx);
                            out.push(Some(predicate.matches(value)));
                        }
                    }
                    Ok(out)
                }
                ColumnAccessor::Null(len) => Ok(vec![None; len]),
            },
        }
    }

    fn evaluate_compare_truths(
        &mut self,
        left: &ScalarExpr<FieldId>,
        op: CompareOp,
        right: &ScalarExpr<FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<Vec<Option<bool>>> {
        let left_values = self.materialize_value_array(left, batch)?;
        let right_values = self.materialize_value_array(right, batch)?;

        if left_values.len() != right_values.len() {
            return Err(Error::Internal(
                "mismatched compare operand lengths in cross product filter".into(),
            ));
        }

        let len = left_values.len();
        match (&left_values, &right_values) {
            (ValueArray::Null(_), _) | (_, ValueArray::Null(_)) => Ok(vec![None; len]),
            (ValueArray::Numeric(lhs), ValueArray::Numeric(rhs)) => {
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    match (lhs.value(idx), rhs.value(idx)) {
                        (Some(lv), Some(rv)) => out.push(Some(NumericKernels::compare(op, lv, rv))),
                        _ => out.push(None),
                    }
                }
                Ok(out)
            }
            (ValueArray::Boolean(lhs), ValueArray::Boolean(rhs)) => {
                let lhs = lhs.as_ref();
                let rhs = rhs.as_ref();
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    if lhs.is_null(idx) || rhs.is_null(idx) {
                        out.push(None);
                    } else {
                        out.push(Some(compare_bool(op, lhs.value(idx), rhs.value(idx))));
                    }
                }
                Ok(out)
            }
            (ValueArray::Utf8(lhs), ValueArray::Utf8(rhs)) => {
                let lhs = lhs.as_ref();
                let rhs = rhs.as_ref();
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    if lhs.is_null(idx) || rhs.is_null(idx) {
                        out.push(None);
                    } else {
                        out.push(Some(compare_str(op, lhs.value(idx), rhs.value(idx))));
                    }
                }
                Ok(out)
            }
            _ => Err(Error::InvalidArgumentError(
                "unsupported comparison between mismatched types in cross product filter".into(),
            )),
        }
    }

    fn evaluate_in_list_truths(
        &mut self,
        target: &ScalarExpr<FieldId>,
        list: &[ScalarExpr<FieldId>],
        negated: bool,
        batch: &RecordBatch,
    ) -> ExecutorResult<Vec<Option<bool>>> {
        let target_values = self.materialize_value_array(target, batch)?;
        let list_values = list
            .iter()
            .map(|expr| self.materialize_value_array(expr, batch))
            .collect::<ExecutorResult<Vec<_>>>()?;

        let len = target_values.len();
        for values in &list_values {
            if values.len() != len {
                return Err(Error::Internal(
                    "mismatched IN list operand lengths in cross product filter".into(),
                ));
            }
        }

        match &target_values {
            ValueArray::Numeric(target_numeric) => {
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    let target_value = match target_numeric.value(idx) {
                        Some(value) => value,
                        None => {
                            out.push(None);
                            continue;
                        }
                    };
                    let mut has_match = false;
                    let mut saw_null = false;
                    for candidate in &list_values {
                        match candidate {
                            ValueArray::Numeric(array) => match array.value(idx) {
                                Some(value) => {
                                    if NumericKernels::compare(CompareOp::Eq, target_value, value) {
                                        has_match = true;
                                        break;
                                    }
                                }
                                None => saw_null = true,
                            },
                            ValueArray::Null(_) => saw_null = true,
                            _ => {
                                return Err(Error::InvalidArgumentError(
                                    "type mismatch in IN list evaluation".into(),
                                ));
                            }
                        }
                    }
                    out.push(finalize_in_list_result(has_match, saw_null, negated));
                }
                Ok(out)
            }
            ValueArray::Boolean(target_bool) => {
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    if target_bool.is_null(idx) {
                        out.push(None);
                        continue;
                    }
                    let target_value = target_bool.value(idx);
                    let mut has_match = false;
                    let mut saw_null = false;
                    for candidate in &list_values {
                        match candidate {
                            ValueArray::Boolean(array) => {
                                if array.is_null(idx) {
                                    saw_null = true;
                                } else if array.value(idx) == target_value {
                                    has_match = true;
                                    break;
                                }
                            }
                            ValueArray::Null(_) => saw_null = true,
                            _ => {
                                return Err(Error::InvalidArgumentError(
                                    "type mismatch in IN list evaluation".into(),
                                ));
                            }
                        }
                    }
                    out.push(finalize_in_list_result(has_match, saw_null, negated));
                }
                Ok(out)
            }
            ValueArray::Utf8(target_utf8) => {
                let mut out = Vec::with_capacity(len);
                for idx in 0..len {
                    if target_utf8.is_null(idx) {
                        out.push(None);
                        continue;
                    }
                    let target_value = target_utf8.value(idx);
                    let mut has_match = false;
                    let mut saw_null = false;
                    for candidate in &list_values {
                        match candidate {
                            ValueArray::Utf8(array) => {
                                if array.is_null(idx) {
                                    saw_null = true;
                                } else if array.value(idx) == target_value {
                                    has_match = true;
                                    break;
                                }
                            }
                            ValueArray::Null(_) => saw_null = true,
                            _ => {
                                return Err(Error::InvalidArgumentError(
                                    "type mismatch in IN list evaluation".into(),
                                ));
                            }
                        }
                    }
                    out.push(finalize_in_list_result(has_match, saw_null, negated));
                }
                Ok(out)
            }
            ValueArray::Null(len) => Ok(vec![None; *len]),
        }
    }

    fn evaluate_numeric(
        &mut self,
        expr: &ScalarExpr<FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<ArrayRef> {
        let mut required = FxHashSet::default();
        collect_field_ids(expr, &mut required);

        let mut arrays = NumericArrayMap::default();
        for field_id in required {
            let numeric = self.numeric_array(field_id, batch)?;
            arrays.insert(field_id, numeric);
        }

        NumericKernels::evaluate_batch(expr, batch.num_rows(), &arrays)
    }

    fn numeric_array(
        &mut self,
        field_id: FieldId,
        batch: &RecordBatch,
    ) -> ExecutorResult<NumericArray> {
        if let Some(existing) = self.numeric_cache.get(&field_id) {
            return Ok(existing.clone());
        }

        let column_index = *self.field_id_to_index.get(&field_id).ok_or_else(|| {
            Error::Internal("field mapping missing during cross product evaluation".into())
        })?;

        let array_ref = batch.column(column_index).clone();
        let numeric = NumericArray::try_from_arrow(&array_ref)?;
        self.numeric_cache.insert(field_id, numeric.clone());
        Ok(numeric)
    }

    fn column_accessor(
        &mut self,
        field_id: FieldId,
        batch: &RecordBatch,
    ) -> ExecutorResult<ColumnAccessor> {
        if let Some(existing) = self.column_cache.get(&field_id) {
            return Ok(existing.clone());
        }

        let column_index = *self.field_id_to_index.get(&field_id).ok_or_else(|| {
            Error::Internal("field mapping missing during cross product evaluation".into())
        })?;

        let accessor = ColumnAccessor::from_array(batch.column(column_index))?;
        self.column_cache.insert(field_id, accessor.clone());
        Ok(accessor)
    }

    fn materialize_scalar_array(
        &mut self,
        expr: &ScalarExpr<FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<ArrayRef> {
        match expr {
            ScalarExpr::Column(field_id) => {
                let accessor = self.column_accessor(*field_id, batch)?;
                Ok(accessor.as_array_ref())
            }
            ScalarExpr::Literal(literal) => literal_to_constant_array(literal, batch.num_rows()),
            ScalarExpr::Binary { .. } => self.evaluate_numeric(expr, batch),
            ScalarExpr::Aggregate(_) => Err(Error::InvalidArgumentError(
                "aggregate expressions are not supported in cross product filters".into(),
            )),
            ScalarExpr::GetField { .. } => Err(Error::InvalidArgumentError(
                "struct field access is not supported in cross product filters".into(),
            )),
        }
    }

    fn materialize_value_array(
        &mut self,
        expr: &ScalarExpr<FieldId>,
        batch: &RecordBatch,
    ) -> ExecutorResult<ValueArray> {
        let array = self.materialize_scalar_array(expr, batch)?;
        ValueArray::from_array(array)
    }
}

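/// Collects every [`FieldId`] referenced by `expr` so the evaluator can
/// materialize exactly the numeric columns a computation needs.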
fn collect_field_ids(expr: &ScalarExpr<FieldId>, out: &mut FxHashSet<FieldId>) {
    match expr {
        ScalarExpr::Column(fid) => {
            out.insert(*fid);
        }
        ScalarExpr::Binary { left, right, .. } => {
            collect_field_ids(left, out);
            collect_field_ids(right, out);
        }
        ScalarExpr::Aggregate(call) => match call {
            AggregateCall::CountStar => {}
            AggregateCall::Count(fid)
            | AggregateCall::Sum(fid)
            | AggregateCall::Min(fid)
            | AggregateCall::Max(fid)
            | AggregateCall::CountNulls(fid) => {
                out.insert(*fid);
            }
        },
        ScalarExpr::GetField { base, .. } => collect_field_ids(base, out),
        ScalarExpr::Literal(_) => {}
    }
}

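/// Reduces a dotted, possibly schema-qualified name to its trailing
/// `table.column` pair (e.g. `"main.t.c"` becomes `"t.c"`); returns `None`
/// for bare column names.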
fn table_column_key(name: &str) -> Option<String> {
    let trimmed = name.trim_start_matches('.');
    let mut parts = trimmed.split('.').collect::<Vec<_>>();
    if parts.len() < 2 {
        return None;
    }
    let column = parts.pop()?;
    let table = parts.pop()?;
    Some(format!("{}.{}", table, column))
}

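/// Handle for a prepared `SELECT`: either a lazily-evaluated projection scan
/// or an already-materialized single batch. Consume it with
/// [`SelectExecution::stream`] or [`SelectExecution::collect`].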
#[derive(Clone)]
pub struct SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    table_name: String,
    schema: Arc<Schema>,
    stream: SelectStream<P>,
}

#[derive(Clone)]
enum SelectStream<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    Projection {
        table: Arc<ExecutorTable<P>>,
        projections: Vec<ScanProjection>,
        filter_expr: LlkvExpr<'static, FieldId>,
        options: ScanStreamOptions<P>,
        full_table_scan: bool,
        order_by: Vec<OrderByPlan>,
        distinct: bool,
    },
    Aggregation {
        batch: RecordBatch,
    },
}

impl<P> SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    #[allow(clippy::too_many_arguments)]
    fn new_projection(
        table_name: String,
        schema: Arc<Schema>,
        table: Arc<ExecutorTable<P>>,
        projections: Vec<ScanProjection>,
        filter_expr: LlkvExpr<'static, FieldId>,
        options: ScanStreamOptions<P>,
        full_table_scan: bool,
        order_by: Vec<OrderByPlan>,
        distinct: bool,
    ) -> Self {
        Self {
            table_name,
            schema,
            stream: SelectStream::Projection {
                table,
                projections,
                filter_expr,
                options,
                full_table_scan,
                order_by,
                distinct,
            },
        }
    }

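    /// Wraps an already-materialized [`RecordBatch`] (aggregates, constant
    /// `SELECT`s) so it streams through the same interface as projection
    /// scans.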
    pub fn new_single_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
        Self {
            table_name,
            schema,
            stream: SelectStream::Aggregation { batch },
        }
    }

    pub fn from_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
        Self::new_single_batch(table_name, schema, batch)
    }

    pub fn table_name(&self) -> &str {
        &self.table_name
    }

    pub fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

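    /// Drives the execution, invoking `on_batch` for every produced
    /// [`RecordBatch`]. Projection scans stream batches as they are scanned,
    /// buffering only when a multi-key post-sort or nulls-first ordering
    /// requires it; aggregation results yield exactly one batch.
    ///
    /// A consumption sketch (it assumes `execution` came from
    /// [`QueryExecutor::execute_select`]):
    ///
    /// ```ignore
    /// let mut total = 0usize;
    /// execution.stream(|batch| {
    ///     total += batch.num_rows();
    ///     Ok(())
    /// })?;
    /// ```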
    pub fn stream(
        self,
        mut on_batch: impl FnMut(RecordBatch) -> ExecutorResult<()>,
    ) -> ExecutorResult<()> {
        let schema = Arc::clone(&self.schema);
        match self.stream {
            SelectStream::Projection {
                table,
                projections,
                filter_expr,
                options,
                full_table_scan,
                order_by,
                distinct,
            } => {
                let total_rows = table.total_rows.load(Ordering::SeqCst);
                if total_rows == 0 {
                    return Ok(());
                }

                let mut error: Option<Error> = None;
                let mut produced = false;
                let mut produced_rows: u64 = 0;
                let capture_nulls_first = matches!(options.order, Some(spec) if spec.nulls_first);
                let needs_post_sort = order_by.len() > 1;
                let collect_batches = needs_post_sort || capture_nulls_first;
                let include_nulls = options.include_nulls;
                let has_row_id_filter = options.row_id_filter.is_some();
                let mut distinct_state = if distinct {
                    Some(DistinctState::default())
                } else {
                    None
                };
                let scan_options = options;
                let mut buffered_batches: Vec<RecordBatch> = Vec::new();
                table
                    .table
                    .scan_stream(projections, &filter_expr, scan_options, |batch| {
                        if error.is_some() {
                            return;
                        }
                        let mut batch = batch;
                        if let Some(state) = distinct_state.as_mut() {
                            match distinct_filter_batch(batch, state) {
                                Ok(Some(filtered)) => {
                                    batch = filtered;
                                }
                                Ok(None) => {
                                    return;
                                }
                                Err(err) => {
                                    error = Some(err);
                                    return;
                                }
                            }
                        }
                        produced = true;
                        produced_rows = produced_rows.saturating_add(batch.num_rows() as u64);
                        if collect_batches {
                            buffered_batches.push(batch);
                        } else if let Err(err) = on_batch(batch) {
                            error = Some(err);
                        }
                    })?;
                if let Some(err) = error {
                    return Err(err);
                }
                if !produced {
                    // The scan yielded nothing even though the table has rows;
                    // emit synthesized all-NULL rows so the caller still sees
                    // `total_rows` rows for a full table scan.
                    if !distinct && full_table_scan && total_rows > 0 {
                        for batch in synthesize_null_scan(Arc::clone(&schema), total_rows)? {
                            on_batch(batch)?;
                        }
                    }
                    return Ok(());
                }
                let mut null_batches: Vec<RecordBatch> = Vec::new();
                // The scan returned fewer rows than the table holds; synthesize
                // all-NULL rows for the remainder when NULLs were requested.
                if !distinct
                    && include_nulls
                    && full_table_scan
                    && produced_rows < total_rows
                    && !has_row_id_filter
                {
                    let missing = total_rows - produced_rows;
                    if missing > 0 {
                        null_batches = synthesize_null_scan(Arc::clone(&schema), missing)?;
                    }
                }

                if collect_batches {
                    if needs_post_sort {
                        if !null_batches.is_empty() {
                            buffered_batches.extend(null_batches);
                        }
                        if !buffered_batches.is_empty() {
                            let combined =
                                concat_batches(&schema, &buffered_batches).map_err(|err| {
                                    Error::InvalidArgumentError(format!(
                                        "failed to concatenate result batches for ORDER BY: {}",
                                        err
                                    ))
                                })?;
                            let sorted_batch =
                                sort_record_batch_with_order(&schema, &combined, &order_by)?;
                            on_batch(sorted_batch)?;
                        }
                    } else if capture_nulls_first {
                        for batch in null_batches {
                            on_batch(batch)?;
                        }
                        for batch in buffered_batches {
                            on_batch(batch)?;
                        }
                    }
                } else if !null_batches.is_empty() {
                    for batch in null_batches {
                        on_batch(batch)?;
                    }
                }
                Ok(())
            }
            SelectStream::Aggregation { batch } => on_batch(batch),
        }
    }

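    /// Runs the execution to completion, collecting every batch in order.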
    pub fn collect(self) -> ExecutorResult<Vec<RecordBatch>> {
        let mut batches = Vec::new();
        self.stream(|batch| {
            batches.push(batch);
            Ok(())
        })?;
        Ok(batches)
    }

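    /// Runs the execution and flattens the output into rows of `PlanValue`s
    /// paired with the output column names.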
    pub fn collect_rows(self) -> ExecutorResult<ExecutorRowBatch> {
        let schema = self.schema();
        let mut rows: Vec<Vec<PlanValue>> = Vec::new();
        self.stream(|batch| {
            for row_idx in 0..batch.num_rows() {
                let mut row: Vec<PlanValue> = Vec::with_capacity(batch.num_columns());
                for col_idx in 0..batch.num_columns() {
                    let value = llkv_plan::plan_value_from_array(batch.column(col_idx), row_idx)?;
                    row.push(value);
                }
                rows.push(row);
            }
            Ok(())
        })?;
        let columns = schema
            .fields()
            .iter()
            .map(|field| field.name().to_string())
            .collect();
        Ok(ExecutorRowBatch { columns, rows })
    }

    pub fn into_rows(self) -> ExecutorResult<Vec<Vec<PlanValue>>> {
        Ok(self.collect_rows()?.rows)
    }
}

impl<P> fmt::Debug for SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SelectExecution")
            .field("table_name", &self.table_name)
            .field("schema", &self.schema)
            .finish()
    }
}

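/// Expands `ORDER BY ALL` items into per-column `OrderTarget::Index` entries;
/// all other targets pass through unchanged.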
fn expand_order_targets(
    order_items: &[OrderByPlan],
    projections: &[ScanProjection],
) -> ExecutorResult<Vec<OrderByPlan>> {
    let mut expanded = Vec::new();

    for item in order_items {
        match &item.target {
            OrderTarget::All => {
                if projections.is_empty() {
                    return Err(Error::InvalidArgumentError(
                        "ORDER BY ALL requires at least one projection".into(),
                    ));
                }

                for (idx, projection) in projections.iter().enumerate() {
                    if matches!(projection, ScanProjection::Computed { .. }) {
                        return Err(Error::InvalidArgumentError(
                            "ORDER BY ALL cannot reference computed projections".into(),
                        ));
                    }

                    let mut clone = item.clone();
                    clone.target = OrderTarget::Index(idx);
                    expanded.push(clone);
                }
            }
            _ => expanded.push(item.clone()),
        }
    }

    Ok(expanded)
}

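/// Resolves an `OrderByPlan` into the storage-level `ScanOrderSpec`,
/// validating that the target column exists and supports the requested sort
/// transform.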
fn resolve_scan_order<P>(
    table: &ExecutorTable<P>,
    projections: &[ScanProjection],
    order_plan: &OrderByPlan,
) -> ExecutorResult<ScanOrderSpec>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    let (column, field_id) = match &order_plan.target {
        OrderTarget::Column(name) => {
            let column = table.schema.resolve(name).ok_or_else(|| {
                Error::InvalidArgumentError(format!("unknown column '{}' in ORDER BY", name))
            })?;
            (column, column.field_id)
        }
        OrderTarget::Index(position) => {
            let projection = projections.get(*position).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "ORDER BY position {} is out of range",
                    position + 1
                ))
            })?;
            match projection {
                ScanProjection::Column(store_projection) => {
                    let field_id = store_projection.logical_field_id.field_id();
                    let column = table.schema.column_by_field_id(field_id).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column with field id {field_id} in ORDER BY"
                        ))
                    })?;
                    (column, field_id)
                }
                ScanProjection::Computed { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "ORDER BY position referring to computed projection is not supported"
                            .into(),
                    ));
                }
            }
        }
        OrderTarget::All => {
            return Err(Error::InvalidArgumentError(
                "ORDER BY ALL should be expanded before execution".into(),
            ));
        }
    };

    let transform = match order_plan.sort_type {
        OrderSortType::Native => match column.data_type {
            DataType::Int64 => ScanOrderTransform::IdentityInteger,
            DataType::Utf8 => ScanOrderTransform::IdentityUtf8,
            ref other => {
                return Err(Error::InvalidArgumentError(format!(
                    "ORDER BY on column type {:?} is not supported",
                    other
                )));
            }
        },
        OrderSortType::CastTextToInteger => {
            if column.data_type != DataType::Utf8 {
                return Err(Error::InvalidArgumentError(
                    "ORDER BY CAST expects a text column".into(),
                ));
            }
            ScanOrderTransform::CastUtf8ToInteger
        }
    };

    let direction = if order_plan.ascending {
        ScanOrderDirection::Ascending
    } else {
        ScanOrderDirection::Descending
    };

    Ok(ScanOrderSpec {
        field_id,
        direction,
        nulls_first: order_plan.nulls_first,
        transform,
    })
}

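/// Produces a single batch of `total_rows` all-NULL rows matching `schema`,
/// used to stand in for rows a projection scan skipped entirely.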
fn synthesize_null_scan(schema: Arc<Schema>, total_rows: u64) -> ExecutorResult<Vec<RecordBatch>> {
    let row_count = usize::try_from(total_rows).map_err(|_| {
        Error::InvalidArgumentError("table row count exceeds supported in-memory batch size".into())
    })?;

    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    for field in schema.fields() {
        match field.data_type() {
            // Only the types the executor currently materializes are allowed;
            // anything else is rejected rather than silently nulled.
            DataType::Int64 | DataType::Float64 | DataType::Utf8 | DataType::Date32 => {
                arrays.push(new_null_array(field.data_type(), row_count));
            }
            other => {
                return Err(Error::InvalidArgumentError(format!(
                    "unsupported data type in null synthesis: {other:?}"
                )));
            }
        }
    }

    let batch = RecordBatch::try_new(schema, arrays)?;
    Ok(vec![batch])
}

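/// Materialized scan output for one operand of a cross product, plus the
/// number of columns contributed by each underlying table.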
struct TableCrossProductData {
    schema: Arc<Schema>,
    batches: Vec<RecordBatch>,
    column_counts: Vec<usize>,
}

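/// Scans all columns of `table` (NULLs included) and qualifies each output
/// field as `schema.table.column` so later cross-product lookups remain
/// unambiguous.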
fn collect_table_data<P>(
    table_ref: &llkv_plan::TableRef,
    table: &ExecutorTable<P>,
) -> ExecutorResult<TableCrossProductData>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    if table.schema.columns.is_empty() {
        return Err(Error::InvalidArgumentError(format!(
            "table '{}' has no columns; cross products require at least one column",
            table_ref.qualified_name()
        )));
    }

    let mut projections = Vec::with_capacity(table.schema.columns.len());
    let mut fields = Vec::with_capacity(table.schema.columns.len());

    for column in &table.schema.columns {
        let qualified_name = format!("{}.{}.{}", table_ref.schema, table_ref.table, column.name);
        projections.push(ScanProjection::from(StoreProjection::with_alias(
            LogicalFieldId::for_user(table.table.table_id(), column.field_id),
            qualified_name.clone(),
        )));
        fields.push(Field::new(
            qualified_name,
            column.data_type.clone(),
            column.nullable,
        ));
    }

    let schema = Arc::new(Schema::new(fields));

    let filter_field_id = table.schema.first_field_id().unwrap_or(ROW_ID_FIELD_ID);
    let filter_expr = crate::translation::expression::full_table_scan_filter(filter_field_id);

    let mut raw_batches = Vec::new();
    table.table.scan_stream(
        projections,
        &filter_expr,
        ScanStreamOptions {
            include_nulls: true,
            ..ScanStreamOptions::default()
        },
        |batch| {
            raw_batches.push(batch);
        },
    )?;

    let mut normalized_batches = Vec::with_capacity(raw_batches.len());
    for batch in raw_batches {
        let normalized = RecordBatch::try_new(Arc::clone(&schema), batch.columns().to_vec())
            .map_err(|err| {
                Error::Internal(format!(
                    "failed to align scan batch for table '{}': {}",
                    table_ref.qualified_name(),
                    err
                ))
            })?;
        normalized_batches.push(normalized);
    }

    Ok(TableCrossProductData {
        schema,
        batches: normalized_batches,
        column_counts: vec![table.schema.columns.len()],
    })
}

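/// Builds the case-insensitive name-to-column-index lookup for a cross-product
/// schema. Each column is reachable by its fully qualified name, its
/// schema-trimmed name, its `table.column` form when that pair is unambiguous,
/// and its `alias.column` form when the table has an alias.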
fn build_cross_product_column_lookup(
    schema: &Schema,
    tables: &[llkv_plan::TableRef],
    column_counts: &[usize],
) -> FxHashMap<String, usize> {
    debug_assert_eq!(tables.len(), column_counts.len());

    let mut table_column_counts: FxHashMap<String, usize> = FxHashMap::default();
    for field in schema.fields() {
        if let Some(name) = table_column_key(field.name()) {
            *table_column_counts
                .entry(name.to_ascii_lowercase())
                .or_insert(0) += 1;
        }
    }

    let mut lookup = FxHashMap::default();

    for (idx, field) in schema.fields().iter().enumerate() {
        let name = field.name();
        lookup.entry(name.to_ascii_lowercase()).or_insert(idx);

        let trimmed = name.trim_start_matches('.');
        lookup.entry(trimmed.to_ascii_lowercase()).or_insert(idx);

        let mut parts: Vec<&str> = trimmed.split('.').collect();
        if parts.len() >= 2 {
            let column_part = parts.pop().unwrap();
            let column_lower = column_part.to_ascii_lowercase();

            if let Some(table_only) = parts.last() {
                let table_lower = table_only.to_ascii_lowercase();
                if table_column_counts
                    .get(&format!("{}.{}", table_lower, column_lower))
                    .copied()
                    .unwrap_or(0)
                    == 1
                {
                    lookup
                        .entry(format!("{}.{}", table_lower, column_lower))
                        .or_insert(idx);
                }
            }
        }
    }

    let mut offset = 0usize;
    for (table_ref, &count) in tables.iter().zip(column_counts.iter()) {
        if let Some(alias) = &table_ref.alias {
            let alias_lower = alias.to_ascii_lowercase();
            let end = usize::min(schema.fields().len(), offset.saturating_add(count));
            for column_index in offset..end {
                let name = schema.field(column_index).name();
                let trimmed = name.trim_start_matches('.');
                let column_lower = trimmed
                    .rsplit('.')
                    .next()
                    .map(|part| part.to_ascii_lowercase())
                    .unwrap_or_else(|| trimmed.to_ascii_lowercase());
                lookup
                    .entry(format!("{}.{}", alias_lower, column_lower))
                    .or_insert(column_index);
            }
        }
        offset = offset.saturating_add(count);
    }

    lookup
}

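/// Forms the Cartesian product of two materialized tables, batch by batch. If
/// either side has no rows, the result keeps the combined schema but carries
/// no batches.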
fn cross_join_table_batches(
    left: TableCrossProductData,
    right: TableCrossProductData,
) -> ExecutorResult<TableCrossProductData> {
    let TableCrossProductData {
        schema: left_schema,
        batches: left_batches,
        column_counts: mut left_counts,
    } = left;
    let TableCrossProductData {
        schema: right_schema,
        batches: right_batches,
        column_counts: right_counts,
    } = right;

    let combined_fields: Vec<Field> = left_schema
        .fields()
        .iter()
        .chain(right_schema.fields().iter())
        .map(|field| field.as_ref().clone())
        .collect();

    let mut column_counts = Vec::with_capacity(left_counts.len() + right_counts.len());
    column_counts.append(&mut left_counts);
    column_counts.extend(right_counts);

    let combined_schema = Arc::new(Schema::new(combined_fields));

    let left_has_rows = left_batches.iter().any(|batch| batch.num_rows() > 0);
    let right_has_rows = right_batches.iter().any(|batch| batch.num_rows() > 0);

    if !left_has_rows || !right_has_rows {
        return Ok(TableCrossProductData {
            schema: combined_schema,
            batches: Vec::new(),
            column_counts,
        });
    }

    let mut output_batches = Vec::new();
    for left_batch in &left_batches {
        if left_batch.num_rows() == 0 {
            continue;
        }
        for right_batch in &right_batches {
            if right_batch.num_rows() == 0 {
                continue;
            }

            let batch =
                cross_join_pair(left_batch, right_batch, &combined_schema).map_err(|err| {
                    Error::Internal(format!("failed to build cross join batch: {err}"))
                })?;
            output_batches.push(batch);
        }
    }

    Ok(TableCrossProductData {
        schema: combined_schema,
        batches: output_batches,
        column_counts,
    })
}

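/// Set of canonicalized rows already emitted, shared across batches to
/// implement `SELECT DISTINCT`.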
#[derive(Default)]
struct DistinctState {
    seen: FxHashSet<CanonicalRow>,
}

impl DistinctState {
    fn insert(&mut self, row: CanonicalRow) -> bool {
        self.seen.insert(row)
    }
}

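/// Drops rows from `batch` that were already recorded in `state`. Returns
/// `Ok(None)` when nothing survives, and skips filtering entirely when every
/// row is new.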
fn distinct_filter_batch(
    batch: RecordBatch,
    state: &mut DistinctState,
) -> ExecutorResult<Option<RecordBatch>> {
    if batch.num_rows() == 0 {
        return Ok(None);
    }

    let mut keep_flags = Vec::with_capacity(batch.num_rows());
    let mut keep_count = 0usize;

    for row_idx in 0..batch.num_rows() {
        let row = CanonicalRow::from_batch(&batch, row_idx)?;
        if state.insert(row) {
            keep_flags.push(true);
            keep_count += 1;
        } else {
            keep_flags.push(false);
        }
    }

    if keep_count == 0 {
        return Ok(None);
    }

    if keep_count == batch.num_rows() {
        return Ok(Some(batch));
    }

    let mut builder = BooleanBuilder::with_capacity(batch.num_rows());
    for flag in keep_flags {
        builder.append_value(flag);
    }
    let mask = Arc::new(builder.finish());

    let filtered = filter_record_batch(&batch, &mask).map_err(|err| {
        Error::InvalidArgumentError(format!("failed to apply DISTINCT filter: {err}"))
    })?;

    Ok(Some(filtered))
}

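/// Re-sorts a fully materialized batch according to `order_by` using Arrow's
/// lexicographic sort, then applies the resulting row permutation to every
/// column. `CastTextToInteger` keys are parsed into `Int64` values, with
/// unparseable strings sorting as NULL.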
fn sort_record_batch_with_order(
    schema: &Arc<Schema>,
    batch: &RecordBatch,
    order_by: &[OrderByPlan],
) -> ExecutorResult<RecordBatch> {
    if order_by.is_empty() {
        return Ok(batch.clone());
    }

    let mut sort_columns: Vec<SortColumn> = Vec::with_capacity(order_by.len());

    for order in order_by {
        let column_index = match &order.target {
            OrderTarget::Column(name) => schema.index_of(name).map_err(|_| {
                Error::InvalidArgumentError(format!(
                    "ORDER BY references unknown column '{}'",
                    name
                ))
            })?,
            OrderTarget::Index(idx) => {
                if *idx >= batch.num_columns() {
                    return Err(Error::InvalidArgumentError(format!(
                        "ORDER BY position {} is out of bounds for {} columns",
                        idx + 1,
                        batch.num_columns()
                    )));
                }
                *idx
            }
            OrderTarget::All => {
                return Err(Error::InvalidArgumentError(
                    "ORDER BY ALL should be expanded before sorting".into(),
                ));
            }
        };

        let source_array = batch.column(column_index);

        let values: ArrayRef = match order.sort_type {
            OrderSortType::Native => Arc::clone(source_array),
            OrderSortType::CastTextToInteger => {
                let strings = source_array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        Error::InvalidArgumentError(
                            "ORDER BY CAST expects the underlying column to be TEXT".into(),
                        )
                    })?;
                let mut builder = Int64Builder::with_capacity(strings.len());
                for i in 0..strings.len() {
                    if strings.is_null(i) {
                        builder.append_null();
                    } else {
                        match strings.value(i).parse::<i64>() {
                            Ok(value) => builder.append_value(value),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                Arc::new(builder.finish()) as ArrayRef
            }
        };

        let sort_options = SortOptions {
            descending: !order.ascending,
            nulls_first: order.nulls_first,
        };

        sort_columns.push(SortColumn {
            values,
            options: Some(sort_options),
        });
    }

    let indices = lexsort_to_indices(&sort_columns, None).map_err(|err| {
        Error::InvalidArgumentError(format!("failed to compute ORDER BY indices: {err}"))
    })?;

    let perm = indices
        .as_any()
        .downcast_ref::<UInt32Array>()
        .ok_or_else(|| Error::Internal("ORDER BY sorting produced unexpected index type".into()))?;

    let mut reordered_columns: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
    for col_idx in 0..batch.num_columns() {
        let reordered = take(batch.column(col_idx), perm, None).map_err(|err| {
            Error::InvalidArgumentError(format!(
                "failed to apply ORDER BY permutation to column {col_idx}: {err}"
            ))
        })?;
        reordered_columns.push(reordered);
    }

    RecordBatch::try_new(Arc::clone(schema), reordered_columns)
        .map_err(|err| Error::Internal(format!("failed to build reordered ORDER BY batch: {err}")))
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, ArrayRef, Int64Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use llkv_expr::expr::BinaryOp;
    use std::sync::Arc;

    #[test]
    fn cross_product_context_evaluates_expressions() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("main.tab2.a", DataType::Int64, false),
            Field::new("main.tab2.b", DataType::Int64, false),
        ]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
                Arc::new(Int64Array::from(vec![10, 20, 30])) as ArrayRef,
            ],
        )
        .expect("valid batch");

        let lookup = build_cross_product_column_lookup(schema.as_ref(), &[], &[]);
        let mut ctx = CrossProductExpressionContext::new(schema.as_ref(), lookup)
            .expect("context builds from schema");

        let literal_expr: ScalarExpr<String> = ScalarExpr::literal(67);
        let literal = ctx
            .evaluate(&literal_expr, &batch)
            .expect("literal evaluation succeeds");
        let literal_array = literal
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int64 literal result");
        assert_eq!(literal_array.len(), 3);
        assert!(literal_array.iter().all(|value| value == Some(67)));

        let add_expr = ScalarExpr::binary(
            ScalarExpr::column("tab2.a".to_string()),
            BinaryOp::Add,
            ScalarExpr::literal(5),
        );
        let added = ctx
            .evaluate(&add_expr, &batch)
            .expect("column addition succeeds");
        let added_array = added
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int64 addition result");
        assert_eq!(added_array.values(), &[6, 7, 8]);
    }

    #[test]
    fn cross_product_handles_more_than_two_tables() {
        let schema_a = Arc::new(Schema::new(vec![Field::new(
            "main.t1.a",
            DataType::Int64,
            false,
        )]));
        let schema_b = Arc::new(Schema::new(vec![Field::new(
            "main.t2.b",
            DataType::Int64,
            false,
        )]));
        let schema_c = Arc::new(Schema::new(vec![Field::new(
            "main.t3.c",
            DataType::Int64,
            false,
        )]));

        let batch_a = RecordBatch::try_new(
            Arc::clone(&schema_a),
            vec![Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef],
        )
        .expect("valid batch");
        let batch_b = RecordBatch::try_new(
            Arc::clone(&schema_b),
            vec![Arc::new(Int64Array::from(vec![10, 20, 30])) as ArrayRef],
        )
        .expect("valid batch");
        let batch_c = RecordBatch::try_new(
            Arc::clone(&schema_c),
            vec![Arc::new(Int64Array::from(vec![100])) as ArrayRef],
        )
        .expect("valid batch");

        let data_a = TableCrossProductData {
            schema: schema_a,
            batches: vec![batch_a],
            column_counts: vec![1],
        };
        let data_b = TableCrossProductData {
            schema: schema_b,
            batches: vec![batch_b],
            column_counts: vec![1],
        };
        let data_c = TableCrossProductData {
            schema: schema_c,
            batches: vec![batch_c],
            column_counts: vec![1],
        };

        let ab = cross_join_table_batches(data_a, data_b).expect("two-table product");
        assert_eq!(ab.schema.fields().len(), 2);
        assert_eq!(ab.batches.len(), 1);
        assert_eq!(ab.batches[0].num_rows(), 6);

        let abc = cross_join_table_batches(ab, data_c).expect("three-table product");
        assert_eq!(abc.schema.fields().len(), 3);
        assert_eq!(abc.batches.len(), 1);

        let final_batch = &abc.batches[0];
        assert_eq!(final_batch.num_rows(), 6);

        let col_a = final_batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("left column values");
        assert_eq!(col_a.values(), &[1, 1, 1, 2, 2, 2]);

        let col_b = final_batch
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("middle column values");
        assert_eq!(col_b.values(), &[10, 20, 30, 10, 20, 30]);

        let col_c = final_batch
            .column(2)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("right column values");
        assert_eq!(col_c.values(), &[100, 100, 100, 100, 100, 100]);
    }
}