use arrow::array::{Array, ArrayRef, Int64Builder, RecordBatch, StringArray, UInt32Array};
use arrow::compute::{SortColumn, SortOptions, concat_batches, lexsort_to_indices, take};
use arrow::datatypes::{DataType, Schema};
use llkv_aggregate::{AggregateAccumulator, AggregateKind, AggregateSpec, AggregateState};
use llkv_column_map::store::Projection as StoreProjection;
use llkv_column_map::types::LogicalFieldId;
use llkv_expr::expr::{Expr as LlkvExpr, Filter, Operator, ScalarExpr};
use llkv_plan::{
    AggregateExpr, AggregateFunction, OrderByPlan, OrderSortType, OrderTarget, PlanValue,
    SelectPlan, SelectProjection,
};
use llkv_result::Error;
use llkv_storage::pager::Pager;
use llkv_table::table::{
    RowIdFilter, ScanOrderDirection, ScanOrderSpec, ScanOrderTransform, ScanProjection,
    ScanStreamOptions,
};
use llkv_table::types::FieldId;
use rustc_hash::FxHashMap;
use simd_r_drive_entry_handle::EntryHandle;
use std::fmt;
use std::ops::Bound;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

pub type ExecutorResult<T> = Result<T, Error>;

mod projections;
mod schema;
pub use projections::{build_projected_columns, build_wildcard_projections};
pub use schema::schema_for_projections;

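/// Resolves canonical table names to executor tables.
///
/// A minimal in-memory implementation sketch (the `Catalog` type here is
/// hypothetical, not part of this crate):
///
/// ```ignore
/// struct Catalog<P: Pager<Blob = EntryHandle> + Send + Sync> {
///     // Keyed by canonical (lower-cased, qualified) table name.
///     tables: FxHashMap<String, Arc<ExecutorTable<P>>>,
/// }
///
/// impl<P: Pager<Blob = EntryHandle> + Send + Sync> TableProvider<P> for Catalog<P> {
///     fn get_table(&self, canonical_name: &str) -> ExecutorResult<Arc<ExecutorTable<P>>> {
///         self.tables.get(canonical_name).cloned().ok_or(Error::NotFound)
///     }
/// }
/// ```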
pub trait TableProvider<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn get_table(&self, canonical_name: &str) -> ExecutorResult<Arc<ExecutorTable<P>>>;
}

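/// Executes [`SelectPlan`]s against tables resolved through a
/// [`TableProvider`].
///
/// Usage sketch (assumes a `provider` implementing [`TableProvider`] and a
/// prepared `plan`):
///
/// ```ignore
/// let executor = QueryExecutor::new(provider);
/// let execution = executor.execute_select(plan)?;
/// execution.stream(|batch| {
///     println!("got {} rows", batch.num_rows());
///     Ok(())
/// })?;
/// ```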
pub struct QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    provider: Arc<dyn TableProvider<P>>,
}

impl<P> QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    pub fn new(provider: Arc<dyn TableProvider<P>>) -> Self {
        Self { provider }
    }

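    /// Executes a `SELECT` plan with no extra row-id filter.
    ///
    /// Shorthand for [`Self::execute_select_with_filter`] with
    /// `row_filter = None`; MVCC-style visibility filters are threaded
    /// through the `row_filter` argument of that method.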
    pub fn execute_select(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        self.execute_select_with_filter(plan, None)
    }

    pub fn execute_select_with_filter(
        &self,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        if plan.tables.is_empty() {
            return self.execute_select_without_table(plan);
        }

        if plan.tables.len() > 1 {
            return self.execute_cross_product(plan);
        }

        let table_ref = &plan.tables[0];
        let table = self.provider.get_table(&table_ref.qualified_name())?;
        let display_name = table_ref.qualified_name();

        if !plan.aggregates.is_empty() {
            self.execute_aggregates(table, display_name, plan, row_filter)
        } else if self.has_computed_aggregates(&plan) {
            self.execute_computed_aggregates(table, display_name, plan, row_filter)
        } else {
            self.execute_projection(table, display_name, plan, row_filter)
        }
    }

    fn has_computed_aggregates(&self, plan: &SelectPlan) -> bool {
        plan.projections.iter().any(|proj| {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::expr_contains_aggregate(expr)
            } else {
                false
            }
        })
    }

    fn expr_contains_aggregate(expr: &ScalarExpr<String>) -> bool {
        match expr {
            ScalarExpr::Aggregate(_) => true,
            ScalarExpr::Binary { left, right, .. } => {
                Self::expr_contains_aggregate(left) || Self::expr_contains_aggregate(right)
            }
            ScalarExpr::GetField { base, .. } => Self::expr_contains_aggregate(base),
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => false,
        }
    }

    fn execute_select_without_table(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        use arrow::array::ArrayRef;
        use arrow::datatypes::Field;

        let mut fields = Vec::new();
        let mut arrays: Vec<ArrayRef> = Vec::new();

        for proj in &plan.projections {
            match proj {
                SelectProjection::Computed { expr, alias } => {
                    let (field_name, dtype, array) = match expr {
                        ScalarExpr::Literal(lit) => {
                            let (dtype, array) = Self::literal_to_array(lit)?;
                            (alias.clone(), dtype, array)
                        }
                        _ => {
                            return Err(Error::InvalidArgumentError(
                                "SELECT without FROM only supports literal expressions".into(),
                            ));
                        }
                    };

                    fields.push(Field::new(field_name, dtype, true));
                    arrays.push(array);
                }
                _ => {
                    return Err(Error::InvalidArgumentError(
                        "SELECT without FROM only supports computed projections".into(),
                    ));
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)
            .map_err(|e| Error::Internal(format!("failed to create record batch: {}", e)))?;

        Ok(SelectExecution::new_single_batch(String::new(), schema, batch))
    }

    fn literal_to_array(lit: &llkv_expr::literal::Literal) -> ExecutorResult<(DataType, ArrayRef)> {
        use arrow::array::{
            ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray, StructArray,
            new_null_array,
        };
        use arrow::datatypes::{DataType, Field};
        use llkv_expr::literal::Literal;

        match lit {
            Literal::Integer(v) => {
                let val = i64::try_from(*v).unwrap_or(0);
                Ok((
                    DataType::Int64,
                    Arc::new(Int64Array::from(vec![val])) as ArrayRef,
                ))
            }
            Literal::Float(v) => Ok((
                DataType::Float64,
                Arc::new(Float64Array::from(vec![*v])) as ArrayRef,
            )),
            Literal::Boolean(v) => Ok((
                DataType::Boolean,
                Arc::new(BooleanArray::from(vec![*v])) as ArrayRef,
            )),
            Literal::String(v) => Ok((
                DataType::Utf8,
                Arc::new(StringArray::from(vec![v.clone()])) as ArrayRef,
            )),
            Literal::Null => Ok((DataType::Null, new_null_array(&DataType::Null, 1))),
            Literal::Struct(struct_fields) => {
                let mut inner_fields = Vec::new();
                let mut inner_arrays = Vec::new();

                for (field_name, field_lit) in struct_fields {
                    let (field_dtype, field_array) = Self::literal_to_array(field_lit)?;
                    inner_fields.push(Field::new(field_name.clone(), field_dtype, true));
                    inner_arrays.push(field_array);
                }

                let struct_array =
                    StructArray::try_new(inner_fields.clone().into(), inner_arrays, None).map_err(
                        |e| Error::Internal(format!("failed to create struct array: {}", e)),
                    )?;

                Ok((
                    DataType::Struct(inner_fields.into()),
                    Arc::new(struct_array) as ArrayRef,
                ))
            }
        }
    }

    fn execute_cross_product(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        use arrow::compute::concat_batches;

        if plan.tables.len() < 2 {
            return Err(Error::InvalidArgumentError(
                "cross product requires at least 2 tables".into(),
            ));
        }

        let mut tables = Vec::new();
        for table_ref in &plan.tables {
            let qualified_name = table_ref.qualified_name();
            let table = self.provider.get_table(&qualified_name)?;
            tables.push((table_ref.clone(), table));
        }

        if tables.len() > 2 {
            return Err(Error::InvalidArgumentError(
                "cross products with more than 2 tables not yet supported".into(),
            ));
        }

        let (left_ref, left_table) = &tables[0];
        let (right_ref, right_table) = &tables[1];

        use llkv_join::{JoinOptions, JoinType, TableJoinExt};

        // An inner join with no join keys degenerates to the cross product.
        let mut result_batches = Vec::new();
        left_table.table.join_stream(
            &right_table.table,
            &[],
            &JoinOptions {
                join_type: JoinType::Inner,
                ..Default::default()
            },
            |batch| {
                result_batches.push(batch);
            },
        )?;

        // Qualify output columns as `schema.table.column` so both sides stay
        // distinguishable in the combined schema.
        let mut combined_fields = Vec::new();

        for col in &left_table.schema.columns {
            let qualified_name = format!("{}.{}.{}", left_ref.schema, left_ref.table, col.name);
            combined_fields.push(arrow::datatypes::Field::new(
                qualified_name,
                col.data_type.clone(),
                col.nullable,
            ));
        }

        for col in &right_table.schema.columns {
            let qualified_name = format!("{}.{}.{}", right_ref.schema, right_ref.table, col.name);
            combined_fields.push(arrow::datatypes::Field::new(
                qualified_name,
                col.data_type.clone(),
                col.nullable,
            ));
        }

        let combined_schema = Arc::new(Schema::new(combined_fields));

        let mut combined_batch = if result_batches.is_empty() {
            RecordBatch::new_empty(Arc::clone(&combined_schema))
        } else if result_batches.len() == 1 {
            let batch = result_batches.into_iter().next().unwrap();
            RecordBatch::try_new(Arc::clone(&combined_schema), batch.columns().to_vec()).map_err(
                |e| {
                    Error::Internal(format!(
                        "failed to create batch with qualified names: {}",
                        e
                    ))
                },
            )?
        } else {
            let original_batch = concat_batches(&result_batches[0].schema(), &result_batches)
                .map_err(|e| Error::Internal(format!("failed to concatenate batches: {}", e)))?;
            RecordBatch::try_new(
                Arc::clone(&combined_schema),
                original_batch.columns().to_vec(),
            )
            .map_err(|e| {
                Error::Internal(format!(
                    "failed to create batch with qualified names: {}",
                    e
                ))
            })?
        };

        if !plan.projections.is_empty() {
            let mut selected_fields = Vec::new();
            let mut selected_columns = Vec::new();

            for proj in &plan.projections {
                match proj {
                    SelectProjection::AllColumns => {
                        selected_fields = combined_schema.fields().iter().cloned().collect();
                        selected_columns = combined_batch.columns().to_vec();
                        break;
                    }
                    SelectProjection::AllColumnsExcept { exclude } => {
                        let exclude_lower: Vec<String> =
                            exclude.iter().map(|e| e.to_ascii_lowercase()).collect();

                        for (idx, field) in combined_schema.fields().iter().enumerate() {
                            let field_name_lower = field.name().to_ascii_lowercase();
                            if !exclude_lower.contains(&field_name_lower) {
                                selected_fields.push(field.clone());
                                selected_columns.push(combined_batch.column(idx).clone());
                            }
                        }
                        break;
                    }
                    SelectProjection::Column { name, alias } => {
                        let col_name = name.to_ascii_lowercase();
                        if let Some((idx, field)) = combined_schema
                            .fields()
                            .iter()
                            .enumerate()
                            .find(|(_, f)| f.name().to_ascii_lowercase() == col_name)
                        {
                            let output_name = alias.as_ref().unwrap_or(name).clone();
                            selected_fields.push(Arc::new(arrow::datatypes::Field::new(
                                output_name,
                                field.data_type().clone(),
                                field.is_nullable(),
                            )));
                            selected_columns.push(combined_batch.column(idx).clone());
                        } else {
                            return Err(Error::InvalidArgumentError(format!(
                                "column '{}' not found in cross product result",
                                name
                            )));
                        }
                    }
                    SelectProjection::Computed { expr, alias } => {
                        if let ScalarExpr::Column(col_name) = expr {
                            let col_name_lower = col_name.to_ascii_lowercase();
                            if let Some((idx, field)) = combined_schema
                                .fields()
                                .iter()
                                .enumerate()
                                .find(|(_, f)| f.name().to_ascii_lowercase() == col_name_lower)
                            {
                                selected_fields.push(Arc::new(arrow::datatypes::Field::new(
                                    alias.clone(),
                                    field.data_type().clone(),
                                    field.is_nullable(),
                                )));
                                selected_columns.push(combined_batch.column(idx).clone());
                            } else {
                                return Err(Error::InvalidArgumentError(format!(
                                    "column '{}' not found in cross product result",
                                    col_name
                                )));
                            }
                        } else {
                            return Err(Error::InvalidArgumentError(
                                "complex computed projections not yet supported in cross products"
                                    .into(),
                            ));
                        }
                    }
                }
            }

            let projected_schema = Arc::new(Schema::new(selected_fields));
            combined_batch = RecordBatch::try_new(projected_schema, selected_columns)
                .map_err(|e| Error::Internal(format!("failed to apply projections: {}", e)))?;
        }

        Ok(SelectExecution::new_single_batch(
            format!(
                "{},{}",
                left_ref.qualified_name(),
                right_ref.qualified_name()
            ),
            combined_batch.schema(),
            combined_batch,
        ))
    }

    fn execute_projection(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let projections = if plan.projections.is_empty() {
            build_wildcard_projections(table_ref)
        } else {
            build_projected_columns(table_ref, &plan.projections)?
        };
        let schema = schema_for_projections(table_ref, &projections)?;

        let (filter_expr, full_table_scan) = match plan.filter {
            Some(expr) => (translate_predicate(expr, table_ref.schema.as_ref())?, false),
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                (full_table_scan_filter(field_id), true)
            }
        };

        // Only the first ORDER BY key can be pushed down to the scan; any
        // remaining keys are handled by a post-sort over buffered batches.
        let expanded_order = expand_order_targets(&plan.order_by, &projections)?;
        let physical_order = if let Some(first) = expanded_order.first() {
            Some(resolve_scan_order(table_ref, &projections, first)?)
        } else {
            None
        };

        let options = if let Some(order_spec) = physical_order {
            if row_filter.is_some() {
                tracing::debug!("Applying MVCC row filter with ORDER BY");
            }
            ScanStreamOptions {
                include_nulls: true,
                order: Some(order_spec),
                row_id_filter: row_filter.clone(),
            }
        } else {
            if row_filter.is_some() {
                tracing::debug!("Applying MVCC row filter");
            }
            ScanStreamOptions {
                include_nulls: true,
                order: None,
                row_id_filter: row_filter.clone(),
            }
        };

        Ok(SelectExecution::new_projection(
            display_name,
            schema,
            table,
            projections,
            filter_expr,
            options,
            full_table_scan,
            expanded_order,
        ))
    }

    fn execute_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let mut specs: Vec<AggregateSpec> = Vec::with_capacity(plan.aggregates.len());
        for aggregate in plan.aggregates {
            match aggregate {
                AggregateExpr::CountStar { alias } => {
                    specs.push(AggregateSpec {
                        alias,
                        kind: AggregateKind::CountStar,
                    });
                }
                AggregateExpr::Column {
                    column,
                    alias,
                    function,
                    distinct,
                } => {
                    let col = table_ref.schema.resolve(&column).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column '{}' in aggregate",
                            column
                        ))
                    })?;
                    let kind = match function {
                        AggregateFunction::Count => {
                            if distinct {
                                AggregateKind::CountDistinctField {
                                    field_id: col.field_id,
                                }
                            } else {
                                AggregateKind::CountField {
                                    field_id: col.field_id,
                                }
                            }
                        }
                        AggregateFunction::SumInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "SUM currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::SumInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MinInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MIN currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MinInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MaxInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MAX currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MaxInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::CountNulls => {
                            if distinct {
                                return Err(Error::InvalidArgumentError(
                                    "DISTINCT is not supported for COUNT_NULLS".into(),
                                ));
                            }
                            AggregateKind::CountNulls {
                                field_id: col.field_id,
                            }
                        }
                    };
                    specs.push(AggregateSpec { alias, kind });
                }
            }
        }

        if specs.is_empty() {
            return Err(Error::InvalidArgumentError(
                "aggregate query requires at least one aggregate expression".into(),
            ));
        }

        let had_filter = plan.filter.is_some();
        let filter_expr = match plan.filter {
            Some(expr) => translate_predicate(expr, table.schema.as_ref())?,
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                full_table_scan_filter(field_id)
            }
        };

        let mut projections = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                let proj_idx = projections.len();
                spec_to_projection.push(Some(proj_idx));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        if projections.is_empty() {
            let field_id = table.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: row_filter.clone(),
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        let mut count_star_override: Option<i64> = None;
        // Without a WHERE clause or row filter, COUNT(*) can be answered
        // from the cached row count instead of scanning.
        if !had_filter && row_filter.is_none() {
            let total_rows = table.total_rows.load(Ordering::SeqCst);
            tracing::debug!(
                "[AGGREGATE] Using COUNT(*) shortcut: total_rows={}",
                total_rows
            );
            if total_rows > i64::MAX as u64 {
                return Err(Error::InvalidArgumentError(
                    "COUNT(*) result exceeds supported range".into(),
                ));
            }
            count_star_override = Some(total_rows as i64);
        } else {
            tracing::debug!(
                "[AGGREGATE] NOT using COUNT(*) shortcut: had_filter={}, has_row_filter={}",
                had_filter,
                row_filter.is_some()
            );
        }

        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => {
                        tracing::debug!(
                            "[AGGREGATE] CountStar override_value={:?}",
                            count_star_override
                        );
                        count_star_override
                    }
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            ScanStreamOptions {
                row_id_filter: row_filter.clone(),
                ..options
            },
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            // A missing column store simply yields no rows; the accumulators
            // keep their default (empty) values.
            Err(llkv_result::Error::NotFound) => {}
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        let mut fields = Vec::with_capacity(states.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(states.len());
        for state in states {
            let (field, array) = state.finalize()?;
            fields.push(field);
            arrays.push(array);
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

    fn execute_computed_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        use arrow::array::Int64Array;
        use llkv_expr::expr::AggregateCall;

        let table_ref = table.as_ref();

        // Gather the distinct aggregate calls referenced by the projections.
        let mut aggregate_specs: Vec<(String, AggregateCall<String>)> = Vec::new();
        for proj in &plan.projections {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::collect_aggregates(expr, &mut aggregate_specs);
            }
        }

        let computed_aggregates = self.compute_aggregate_values(
            table.clone(),
            &plan.filter,
            &aggregate_specs,
            row_filter.clone(),
        )?;

        let mut fields = Vec::with_capacity(plan.projections.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(plan.projections.len());

        for proj in &plan.projections {
            match proj {
                SelectProjection::AllColumns | SelectProjection::AllColumnsExcept { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "Wildcard projections not supported with computed aggregates".into(),
                    ));
                }
                SelectProjection::Column { name, .. } => {
                    // Validate that the column exists, but plain columns are
                    // not representable in an aggregate-only result.
                    table_ref.schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    return Err(Error::InvalidArgumentError(
                        "Regular columns not supported in aggregate queries without GROUP BY"
                            .into(),
                    ));
                }
                SelectProjection::Computed { expr, alias } => {
                    let value = Self::evaluate_expr_with_aggregates(expr, &computed_aggregates)?;

                    fields.push(arrow::datatypes::Field::new(alias, DataType::Int64, false));

                    let array = Arc::new(Int64Array::from(vec![value])) as ArrayRef;
                    arrays.push(array);
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

    fn collect_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &mut Vec<(String, llkv_expr::expr::AggregateCall<String>)>,
    ) {
        match expr {
            ScalarExpr::Aggregate(agg) => {
                // Key aggregates by their debug rendering so identical calls
                // across projections are computed only once.
                let key = format!("{:?}", agg);
                if !aggregates.iter().any(|(k, _)| k == &key) {
                    aggregates.push((key, agg.clone()));
                }
            }
            ScalarExpr::Binary { left, right, .. } => {
                Self::collect_aggregates(left, aggregates);
                Self::collect_aggregates(right, aggregates);
            }
            ScalarExpr::GetField { base, .. } => {
                Self::collect_aggregates(base, aggregates);
            }
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => {}
        }
    }

    fn compute_aggregate_values(
        &self,
        table: Arc<ExecutorTable<P>>,
        filter: &Option<llkv_expr::expr::Expr<'static, String>>,
        aggregate_specs: &[(String, llkv_expr::expr::AggregateCall<String>)],
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<FxHashMap<String, i64>> {
        use llkv_expr::expr::AggregateCall;

        let table_ref = table.as_ref();
        let mut results =
            FxHashMap::with_capacity_and_hasher(aggregate_specs.len(), Default::default());

        let mut specs: Vec<AggregateSpec> = Vec::new();
        for (key, agg) in aggregate_specs {
            let kind = match agg {
                AggregateCall::CountStar => AggregateKind::CountStar,
                AggregateCall::Count(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountField {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Sum(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::SumInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Min(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MinInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Max(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MaxInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::CountNulls(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountNulls {
                        field_id: col.field_id,
                    }
                }
            };
            specs.push(AggregateSpec {
                alias: key.clone(),
                kind,
            });
        }

        let filter_expr = match filter {
            Some(expr) => translate_predicate(expr.clone(), table_ref.schema.as_ref())?,
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                full_table_scan_filter(field_id)
            }
        };

        let mut projections: Vec<ScanProjection> = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());
        let count_star_override: Option<i64> = None;

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                spec_to_projection.push(Some(projections.len()));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        if projections.is_empty() {
            let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let base_options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: None,
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => count_star_override,
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            ScanStreamOptions {
                row_id_filter: row_filter.clone(),
                ..base_options
            },
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => {}
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        for state in states {
            let alias = state.alias.clone();
            let (_field, array) = state.finalize()?;

            let int64_array = array
                .as_any()
                .downcast_ref::<arrow::array::Int64Array>()
                .ok_or_else(|| Error::Internal("Expected Int64Array from aggregate".into()))?;

            if int64_array.len() != 1 {
                return Err(Error::Internal(format!(
                    "Expected single value from aggregate, got {}",
                    int64_array.len()
                )));
            }

            let value = if int64_array.is_null(0) {
                0
            } else {
                int64_array.value(0)
            };

            results.insert(alias, value);
        }

        Ok(results)
    }

    fn evaluate_expr_with_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &FxHashMap<String, i64>,
    ) -> ExecutorResult<i64> {
        use llkv_expr::expr::BinaryOp;
        use llkv_expr::literal::Literal;

        match expr {
            ScalarExpr::Literal(Literal::Integer(v)) => Ok(*v as i64),
            ScalarExpr::Literal(Literal::Float(v)) => Ok(*v as i64),
            ScalarExpr::Literal(Literal::Boolean(v)) => Ok(if *v { 1 } else { 0 }),
            ScalarExpr::Literal(Literal::String(_)) => Err(Error::InvalidArgumentError(
                "String literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Literal(Literal::Null) => Err(Error::InvalidArgumentError(
                "NULL literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Literal(Literal::Struct(_)) => Err(Error::InvalidArgumentError(
                "Struct literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Column(_) => Err(Error::InvalidArgumentError(
                "Column references not supported in aggregate-only expressions".into(),
            )),
            ScalarExpr::Aggregate(agg) => {
                let key = format!("{:?}", agg);
                aggregates.get(&key).copied().ok_or_else(|| {
                    Error::Internal(format!("Aggregate value not found for key: {}", key))
                })
            }
            ScalarExpr::Binary { left, op, right } => {
                let left_val = Self::evaluate_expr_with_aggregates(left, aggregates)?;
                let right_val = Self::evaluate_expr_with_aggregates(right, aggregates)?;

                let result = match op {
                    BinaryOp::Add => left_val.checked_add(right_val),
                    BinaryOp::Subtract => left_val.checked_sub(right_val),
                    BinaryOp::Multiply => left_val.checked_mul(right_val),
                    BinaryOp::Divide => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Division by zero".into()));
                        }
                        left_val.checked_div(right_val)
                    }
                    BinaryOp::Modulo => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Modulo by zero".into()));
                        }
                        left_val.checked_rem(right_val)
                    }
                };

                result.ok_or_else(|| {
                    Error::InvalidArgumentError("Arithmetic overflow in expression".into())
                })
            }
            ScalarExpr::GetField { .. } => Err(Error::InvalidArgumentError(
                "GetField not supported in aggregate-only expressions".into(),
            )),
        }
    }
}

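/// A prepared `SELECT` whose batches are produced on demand.
///
/// Projection queries keep the table handle and scan parameters and stream
/// batches lazily through [`SelectExecution::stream`]; aggregate, constant,
/// and cross-product queries are materialized eagerly as a single
/// [`RecordBatch`].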
#[derive(Clone)]
pub struct SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    table_name: String,
    schema: Arc<Schema>,
    stream: SelectStream<P>,
}

#[derive(Clone)]
enum SelectStream<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    Projection {
        table: Arc<ExecutorTable<P>>,
        projections: Vec<ScanProjection>,
        filter_expr: LlkvExpr<'static, FieldId>,
        options: ScanStreamOptions<P>,
        full_table_scan: bool,
        order_by: Vec<OrderByPlan>,
    },
    Aggregation {
        batch: RecordBatch,
    },
}

impl<P> SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    #[allow(clippy::too_many_arguments)]
    fn new_projection(
        table_name: String,
        schema: Arc<Schema>,
        table: Arc<ExecutorTable<P>>,
        projections: Vec<ScanProjection>,
        filter_expr: LlkvExpr<'static, FieldId>,
        options: ScanStreamOptions<P>,
        full_table_scan: bool,
        order_by: Vec<OrderByPlan>,
    ) -> Self {
        Self {
            table_name,
            schema,
            stream: SelectStream::Projection {
                table,
                projections,
                filter_expr,
                options,
                full_table_scan,
                order_by,
            },
        }
    }

    pub fn new_single_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
        Self {
            table_name,
            schema,
            stream: SelectStream::Aggregation { batch },
        }
    }

    pub fn from_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
        Self::new_single_batch(table_name, schema, batch)
    }

    pub fn table_name(&self) -> &str {
        &self.table_name
    }

    pub fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

    pub fn stream(
        self,
        mut on_batch: impl FnMut(RecordBatch) -> ExecutorResult<()>,
    ) -> ExecutorResult<()> {
        let schema = Arc::clone(&self.schema);
        match self.stream {
            SelectStream::Projection {
                table,
                projections,
                filter_expr,
                options,
                full_table_scan,
                order_by,
            } => {
                let total_rows = table.total_rows.load(Ordering::SeqCst);
                if total_rows == 0 {
                    return Ok(());
                }

                let mut error: Option<Error> = None;
                let mut produced = false;
                let mut produced_rows: u64 = 0;
                // Batches must be buffered when a multi-key post-sort is
                // needed or when NULL rows have to be emitted first.
                let capture_nulls_first = matches!(options.order, Some(spec) if spec.nulls_first);
                let needs_post_sort = order_by.len() > 1;
                let collect_batches = needs_post_sort || capture_nulls_first;
                let include_nulls = options.include_nulls;
                let has_row_id_filter = options.row_id_filter.is_some();
                let scan_options = options;
                let mut buffered_batches: Vec<RecordBatch> = Vec::new();
                table
                    .table
                    .scan_stream(projections, &filter_expr, scan_options, |batch| {
                        if error.is_some() {
                            return;
                        }
                        produced = true;
                        produced_rows = produced_rows.saturating_add(batch.num_rows() as u64);
                        if collect_batches {
                            buffered_batches.push(batch);
                        } else if let Err(err) = on_batch(batch) {
                            error = Some(err);
                        }
                    })?;
                if let Some(err) = error {
                    return Err(err);
                }
                if !produced {
                    if total_rows > 0 {
                        for batch in synthesize_null_scan(Arc::clone(&schema), total_rows)? {
                            on_batch(batch)?;
                        }
                    }
                    return Ok(());
                }
                // A full table scan that returned fewer rows than the table
                // holds means the remaining rows are entirely NULL in the
                // projected columns; synthesize them so row counts line up.
                let mut null_batches: Vec<RecordBatch> = Vec::new();
                if include_nulls
                    && full_table_scan
                    && produced_rows < total_rows
                    && !has_row_id_filter
                {
                    let missing = total_rows - produced_rows;
                    if missing > 0 {
                        null_batches = synthesize_null_scan(Arc::clone(&schema), missing)?;
                    }
                }

                if collect_batches {
                    if needs_post_sort {
                        if !null_batches.is_empty() {
                            buffered_batches.extend(null_batches);
                        }
                        if !buffered_batches.is_empty() {
                            let combined =
                                concat_batches(&schema, &buffered_batches).map_err(|err| {
                                    Error::InvalidArgumentError(format!(
                                        "failed to concatenate result batches for ORDER BY: {}",
                                        err
                                    ))
                                })?;
                            let sorted_batch =
                                sort_record_batch_with_order(&schema, &combined, &order_by)?;
                            on_batch(sorted_batch)?;
                        }
                    } else if capture_nulls_first {
                        for batch in null_batches {
                            on_batch(batch)?;
                        }
                        for batch in buffered_batches {
                            on_batch(batch)?;
                        }
                    }
                } else if !null_batches.is_empty() {
                    for batch in null_batches {
                        on_batch(batch)?;
                    }
                }
                Ok(())
            }
            SelectStream::Aggregation { batch } => on_batch(batch),
        }
    }

    pub fn collect(self) -> ExecutorResult<Vec<RecordBatch>> {
        let mut batches = Vec::new();
        self.stream(|batch| {
            batches.push(batch);
            Ok(())
        })?;
        Ok(batches)
    }

    pub fn collect_rows(self) -> ExecutorResult<RowBatch> {
        let schema = self.schema();
        let mut rows: Vec<Vec<PlanValue>> = Vec::new();
        self.stream(|batch| {
            for row_idx in 0..batch.num_rows() {
                let mut row: Vec<PlanValue> = Vec::with_capacity(batch.num_columns());
                for col_idx in 0..batch.num_columns() {
                    let value = llkv_plan::plan_value_from_array(batch.column(col_idx), row_idx)?;
                    row.push(value);
                }
                rows.push(row);
            }
            Ok(())
        })?;
        let columns = schema
            .fields()
            .iter()
            .map(|field| field.name().to_string())
            .collect();
        Ok(RowBatch { columns, rows })
    }

    pub fn into_rows(self) -> ExecutorResult<Vec<Vec<PlanValue>>> {
        Ok(self.collect_rows()?.rows)
    }
}

impl<P> fmt::Debug for SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SelectExecution")
            .field("table_name", &self.table_name)
            .field("schema", &self.schema)
            .finish()
    }
}

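/// A table handle plus the executor-side metadata needed to plan scans:
/// the resolved schema, row-id allocation state, a cached row count, and
/// any multi-column UNIQUE constraints.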
pub struct ExecutorTable<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    pub table: Arc<llkv_table::table::Table<P>>,
    pub schema: Arc<ExecutorSchema>,
    pub next_row_id: AtomicU64,
    pub total_rows: AtomicU64,
    pub multi_column_uniques: RwLock<Vec<ExecutorMultiColumnUnique>>,
}

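/// Column metadata for an [`ExecutorTable`].
///
/// `lookup` maps lower-cased column names to indices into `columns`, which
/// is what makes [`ExecutorSchema::resolve`] case-insensitive.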
pub struct ExecutorSchema {
    pub columns: Vec<ExecutorColumn>,
    pub lookup: FxHashMap<String, usize>,
}

impl ExecutorSchema {
    pub fn resolve(&self, name: &str) -> Option<&ExecutorColumn> {
        let normalized = name.to_ascii_lowercase();
        self.lookup
            .get(&normalized)
            .and_then(|idx| self.columns.get(*idx))
    }

    pub fn first_field_id(&self) -> Option<FieldId> {
        self.columns.first().map(|col| col.field_id)
    }

    pub fn column_by_field_id(&self, field_id: FieldId) -> Option<&ExecutorColumn> {
        self.columns.iter().find(|col| col.field_id == field_id)
    }
}

#[derive(Clone)]
pub struct ExecutorColumn {
    pub name: String,
    pub data_type: DataType,
    pub nullable: bool,
    pub primary_key: bool,
    pub unique: bool,
    pub field_id: FieldId,
    pub check_expr: Option<String>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExecutorMultiColumnUnique {
    pub index_name: Option<String>,
    pub column_indices: Vec<usize>,
}

impl<P> ExecutorTable<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    pub fn multi_column_uniques(&self) -> Vec<ExecutorMultiColumnUnique> {
        self.multi_column_uniques.read().unwrap().clone()
    }

    pub fn set_multi_column_uniques(&self, uniques: Vec<ExecutorMultiColumnUnique>) {
        *self.multi_column_uniques.write().unwrap() = uniques;
    }

    pub fn add_multi_column_unique(&self, unique: ExecutorMultiColumnUnique) {
        let mut guard = self.multi_column_uniques.write().unwrap();
        if !guard
            .iter()
            .any(|existing| existing.column_indices == unique.column_indices)
        {
            guard.push(unique);
        }
    }
}

/// Column names and row values produced by [`SelectExecution::collect_rows`].
pub struct RowBatch {
    pub columns: Vec<String>,
    pub rows: Vec<Vec<PlanValue>>,
}

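/// Expands `ORDER BY ALL` into one [`OrderByPlan`] per projected column.
///
/// Non-`ALL` targets pass through unchanged; for example, `ORDER BY ALL`
/// over three projected columns becomes positional targets `0, 1, 2` with
/// the same direction and null placement on each key.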
fn expand_order_targets(
    order_items: &[OrderByPlan],
    projections: &[ScanProjection],
) -> ExecutorResult<Vec<OrderByPlan>> {
    let mut expanded = Vec::new();

    for item in order_items {
        match &item.target {
            OrderTarget::All => {
                if projections.is_empty() {
                    return Err(Error::InvalidArgumentError(
                        "ORDER BY ALL requires at least one projection".into(),
                    ));
                }

                for (idx, projection) in projections.iter().enumerate() {
                    if matches!(projection, ScanProjection::Computed { .. }) {
                        return Err(Error::InvalidArgumentError(
                            "ORDER BY ALL cannot reference computed projections".into(),
                        ));
                    }

                    let mut clone = item.clone();
                    clone.target = OrderTarget::Index(idx);
                    expanded.push(clone);
                }
            }
            _ => expanded.push(item.clone()),
        }
    }

    Ok(expanded)
}

fn resolve_scan_order<P>(
    table: &ExecutorTable<P>,
    projections: &[ScanProjection],
    order_plan: &OrderByPlan,
) -> ExecutorResult<ScanOrderSpec>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    let (column, field_id) = match &order_plan.target {
        OrderTarget::Column(name) => {
            let column = table.schema.resolve(name).ok_or_else(|| {
                Error::InvalidArgumentError(format!("unknown column '{}' in ORDER BY", name))
            })?;
            (column, column.field_id)
        }
        OrderTarget::Index(position) => {
            let projection = projections.get(*position).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "ORDER BY position {} is out of range",
                    position + 1
                ))
            })?;
            match projection {
                ScanProjection::Column(store_projection) => {
                    let field_id = store_projection.logical_field_id.field_id();
                    let column = table.schema.column_by_field_id(field_id).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column with field id {field_id} in ORDER BY"
                        ))
                    })?;
                    (column, field_id)
                }
                ScanProjection::Computed { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "ORDER BY position referring to computed projection is not supported"
                            .into(),
                    ));
                }
            }
        }
        OrderTarget::All => {
            return Err(Error::InvalidArgumentError(
                "ORDER BY ALL should be expanded before execution".into(),
            ));
        }
    };

    let transform = match order_plan.sort_type {
        OrderSortType::Native => match column.data_type {
            DataType::Int64 => ScanOrderTransform::IdentityInteger,
            DataType::Utf8 => ScanOrderTransform::IdentityUtf8,
            ref other => {
                return Err(Error::InvalidArgumentError(format!(
                    "ORDER BY on column type {:?} is not supported",
                    other
                )));
            }
        },
        OrderSortType::CastTextToInteger => {
            if column.data_type != DataType::Utf8 {
                return Err(Error::InvalidArgumentError(
                    "ORDER BY CAST expects a text column".into(),
                ));
            }
            ScanOrderTransform::CastUtf8ToInteger
        }
    };

    let direction = if order_plan.ascending {
        ScanOrderDirection::Ascending
    } else {
        ScanOrderDirection::Descending
    };

    Ok(ScanOrderSpec {
        field_id,
        direction,
        nulls_first: order_plan.nulls_first,
        transform,
    })
}

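/// Builds a predicate that matches every row: an unbounded range over an
/// existing column serves as the executor's canonical "scan everything"
/// filter.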
fn full_table_scan_filter(field_id: FieldId) -> LlkvExpr<'static, FieldId> {
    LlkvExpr::Pred(Filter {
        field_id,
        op: Operator::Range {
            lower: Bound::Unbounded,
            upper: Bound::Unbounded,
        },
    })
}

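/// Produces `total_rows` rows of all-NULL values matching `schema`, used to
/// backfill rows that a full table scan did not materialize so row counts
/// still line up.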
fn synthesize_null_scan(schema: Arc<Schema>, total_rows: u64) -> ExecutorResult<Vec<RecordBatch>> {
    let row_count = usize::try_from(total_rows).map_err(|_| {
        Error::InvalidArgumentError("table row count exceeds supported in-memory batch size".into())
    })?;

    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    for field in schema.fields() {
        match field.data_type() {
            DataType::Int64 => {
                let mut builder = Int64Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Float64 => {
                let mut builder = arrow::array::Float64Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Utf8 => {
                let mut builder = arrow::array::StringBuilder::with_capacity(row_count, 0);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Date32 => {
                let mut builder = arrow::array::Date32Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            other => {
                return Err(Error::InvalidArgumentError(format!(
                    "unsupported data type in null synthesis: {other:?}"
                )));
            }
        }
    }

    let batch = RecordBatch::try_new(schema, arrays)?;
    Ok(vec![batch])
}

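/// Sorts a fully materialized batch by the given `ORDER BY` keys.
///
/// Each key becomes an Arrow [`SortColumn`] (CAST keys are pre-converted
/// from UTF-8 to `Int64`), [`lexsort_to_indices`] computes the sorted
/// permutation, and [`take`] applies it to every column of the batch.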
fn sort_record_batch_with_order(
    schema: &Arc<Schema>,
    batch: &RecordBatch,
    order_by: &[OrderByPlan],
) -> ExecutorResult<RecordBatch> {
    if order_by.is_empty() {
        return Ok(batch.clone());
    }

    let mut sort_columns: Vec<SortColumn> = Vec::with_capacity(order_by.len());

    for order in order_by {
        let column_index = match &order.target {
            OrderTarget::Column(name) => schema.index_of(name).map_err(|_| {
                Error::InvalidArgumentError(format!(
                    "ORDER BY references unknown column '{}'",
                    name
                ))
            })?,
            OrderTarget::Index(idx) => {
                if *idx >= batch.num_columns() {
                    return Err(Error::InvalidArgumentError(format!(
                        "ORDER BY position {} is out of bounds for {} columns",
                        idx + 1,
                        batch.num_columns()
                    )));
                }
                *idx
            }
            OrderTarget::All => {
                return Err(Error::InvalidArgumentError(
                    "ORDER BY ALL should be expanded before sorting".into(),
                ));
            }
        };

        let source_array = batch.column(column_index);

        let values: ArrayRef = match order.sort_type {
            OrderSortType::Native => Arc::clone(source_array),
            OrderSortType::CastTextToInteger => {
                let strings = source_array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        Error::InvalidArgumentError(
                            "ORDER BY CAST expects the underlying column to be TEXT".into(),
                        )
                    })?;
                // Unparseable strings sort as NULL rather than failing the
                // whole query.
                let mut builder = Int64Builder::with_capacity(strings.len());
                for i in 0..strings.len() {
                    if strings.is_null(i) {
                        builder.append_null();
                    } else {
                        match strings.value(i).parse::<i64>() {
                            Ok(value) => builder.append_value(value),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                Arc::new(builder.finish()) as ArrayRef
            }
        };

        let sort_options = SortOptions {
            descending: !order.ascending,
            nulls_first: order.nulls_first,
        };

        sort_columns.push(SortColumn {
            values,
            options: Some(sort_options),
        });
    }

    let indices = lexsort_to_indices(&sort_columns, None).map_err(|err| {
        Error::InvalidArgumentError(format!("failed to compute ORDER BY indices: {err}"))
    })?;

    let perm = indices
        .as_any()
        .downcast_ref::<UInt32Array>()
        .ok_or_else(|| Error::Internal("ORDER BY sorting produced unexpected index type".into()))?;

    let mut reordered_columns: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
    for col_idx in 0..batch.num_columns() {
        let reordered = take(batch.column(col_idx), perm, None).map_err(|err| {
            Error::InvalidArgumentError(format!(
                "failed to apply ORDER BY permutation to column {col_idx}: {err}"
            ))
        })?;
        reordered_columns.push(reordered);
    }

    RecordBatch::try_new(Arc::clone(schema), reordered_columns)
        .map_err(|err| Error::Internal(format!("failed to build reordered ORDER BY batch: {err}")))
}

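/// Rewrites a name-based predicate into a field-id-based one, resolving
/// column names case-insensitively via [`ExecutorSchema::resolve`]; unknown
/// names surface as [`Error::InvalidArgumentError`].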
fn translate_predicate(
    expr: llkv_expr::expr::Expr<'static, String>,
    schema: &ExecutorSchema,
) -> ExecutorResult<llkv_expr::expr::Expr<'static, FieldId>> {
    use llkv_expr::expr::Expr;
    match expr {
        Expr::And(exprs) => {
            let translated: Result<Vec<_>, _> = exprs
                .into_iter()
                .map(|e| translate_predicate(e, schema))
                .collect();
            Ok(Expr::And(translated?))
        }
        Expr::Or(exprs) => {
            let translated: Result<Vec<_>, _> = exprs
                .into_iter()
                .map(|e| translate_predicate(e, schema))
                .collect();
            Ok(Expr::Or(translated?))
        }
        Expr::Not(inner) => {
            let translated = translate_predicate(*inner, schema)?;
            Ok(Expr::Not(Box::new(translated)))
        }
        Expr::Pred(filter) => {
            let column = schema.resolve(&filter.field_id).ok_or_else(|| {
                Error::InvalidArgumentError(format!("unknown column '{}'", filter.field_id))
            })?;
            Ok(Expr::Pred(Filter {
                field_id: column.field_id,
                op: filter.op,
            }))
        }
        Expr::Compare { left, op, right } => Ok(Expr::Compare {
            left: translate_scalar(&left, schema)?,
            op,
            right: translate_scalar(&right, schema)?,
        }),
    }
}

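/// Rewrites a name-based scalar expression into a field-id-based one,
/// recursing through binary operators, aggregate calls, and struct field
/// accesses.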
fn translate_scalar(
    expr: &ScalarExpr<String>,
    schema: &ExecutorSchema,
) -> ExecutorResult<ScalarExpr<FieldId>> {
    match expr {
        ScalarExpr::Literal(lit) => Ok(ScalarExpr::Literal(lit.clone())),
        ScalarExpr::Column(name) => {
            let column = schema
                .resolve(name)
                .ok_or_else(|| Error::InvalidArgumentError(format!("unknown column '{}'", name)))?;
            Ok(ScalarExpr::Column(column.field_id))
        }
        ScalarExpr::Binary { left, op, right } => Ok(ScalarExpr::Binary {
            left: Box::new(translate_scalar(left, schema)?),
            op: *op,
            right: Box::new(translate_scalar(right, schema)?),
        }),
        ScalarExpr::Aggregate(agg) => {
            use llkv_expr::expr::AggregateCall;
            let translated_agg = match agg {
                AggregateCall::CountStar => AggregateCall::CountStar,
                AggregateCall::Count(name) => {
                    let column = schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    AggregateCall::Count(column.field_id)
                }
                AggregateCall::Sum(name) => {
                    let column = schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    AggregateCall::Sum(column.field_id)
                }
                AggregateCall::Min(name) => {
                    let column = schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    AggregateCall::Min(column.field_id)
                }
                AggregateCall::Max(name) => {
                    let column = schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    AggregateCall::Max(column.field_id)
                }
                AggregateCall::CountNulls(name) => {
                    let column = schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    AggregateCall::CountNulls(column.field_id)
                }
            };
            Ok(ScalarExpr::Aggregate(translated_agg))
        }
        ScalarExpr::GetField { base, field_name } => Ok(ScalarExpr::GetField {
            base: Box::new(translate_scalar(base, schema)?),
            field_name: field_name.clone(),
        }),
    }
}