use arrow::array::{Array, ArrayRef, Int64Builder, RecordBatch, StringArray, UInt32Array};
use arrow::compute::{SortColumn, SortOptions, concat_batches, lexsort_to_indices, take};
use arrow::datatypes::{DataType, Schema};
use llkv_aggregate::{AggregateAccumulator, AggregateKind, AggregateSpec, AggregateState};
use llkv_column_map::store::Projection as StoreProjection;
use llkv_column_map::types::LogicalFieldId;
use llkv_expr::expr::{Expr as LlkvExpr, ScalarExpr};
use llkv_plan::{
    AggregateExpr, AggregateFunction, OrderByPlan, OrderSortType, OrderTarget, PlanValue,
    SelectPlan, SelectProjection,
};
use llkv_result::Error;
use llkv_storage::pager::Pager;
use llkv_table::table::{
    RowIdFilter, ScanOrderDirection, ScanOrderSpec, ScanOrderTransform, ScanProjection,
    ScanStreamOptions,
};
use llkv_table::types::FieldId;
use rustc_hash::FxHashMap;
use simd_r_drive_entry_handle::EntryHandle;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::Ordering;

pub mod insert;
pub mod translation;
pub mod types;
pub mod utils;

pub type ExecutorResult<T> = Result<T, Error>;

pub use insert::{
    build_array_for_column, normalize_insert_value_for_column, resolve_insert_columns,
};
pub use translation::{
    build_projected_columns, build_wildcard_projections, full_table_scan_filter,
    resolve_field_id_from_schema, schema_for_projections, translate_predicate,
    translate_predicate_with, translate_scalar, translate_scalar_with,
};
pub use types::{
    ExecutorColumn, ExecutorMultiColumnUnique, ExecutorRowBatch, ExecutorSchema, ExecutorTable,
    ExecutorTableProvider,
};
pub use utils::current_time_micros;

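/// Executes `SelectPlan`s produced by `llkv-plan` against tables resolved
/// through an [`ExecutorTableProvider`].
///
/// A minimal usage sketch; how `provider` is constructed is hypothetical and
/// depends on the embedding application:
///
/// ```ignore
/// let executor = QueryExecutor::new(provider);
/// let execution = executor.execute_select(plan)?;
/// let batches = execution.collect()?;
/// ```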
pub struct QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    provider: Arc<dyn ExecutorTableProvider<P>>,
}

impl<P> QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    pub fn new(provider: Arc<dyn ExecutorTableProvider<P>>) -> Self {
        Self { provider }
    }

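    /// Entry point for SELECT execution. `execute_select_with_filter`
    /// additionally threads an optional MVCC row-id filter into every scan.
    /// Dispatch order: no FROM clause, multi-table cross product, explicit
    /// aggregates, computed projections containing aggregates, and finally
    /// plain projection scans.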
    pub fn execute_select(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        self.execute_select_with_filter(plan, None)
    }

    pub fn execute_select_with_filter(
        &self,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        if plan.tables.is_empty() {
            return self.execute_select_without_table(plan);
        }

        if plan.tables.len() > 1 {
            return self.execute_cross_product(plan);
        }

        let table_ref = &plan.tables[0];
        let table = self.provider.get_table(&table_ref.qualified_name())?;
        let display_name = table_ref.qualified_name();

        if !plan.aggregates.is_empty() {
            self.execute_aggregates(table, display_name, plan, row_filter)
        } else if self.has_computed_aggregates(&plan) {
            self.execute_computed_aggregates(table, display_name, plan, row_filter)
        } else {
            self.execute_projection(table, display_name, plan, row_filter)
        }
    }

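    /// Returns `true` when any computed projection embeds an aggregate call
    /// (for example `SELECT count(*) + 1 FROM t`), which must take the
    /// computed-aggregate path instead of a plain projection scan.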
    fn has_computed_aggregates(&self, plan: &SelectPlan) -> bool {
        plan.projections.iter().any(|proj| {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::expr_contains_aggregate(expr)
            } else {
                false
            }
        })
    }

    fn expr_contains_aggregate(expr: &ScalarExpr<String>) -> bool {
        match expr {
            ScalarExpr::Aggregate(_) => true,
            ScalarExpr::Binary { left, right, .. } => {
                Self::expr_contains_aggregate(left) || Self::expr_contains_aggregate(right)
            }
            ScalarExpr::GetField { base, .. } => Self::expr_contains_aggregate(base),
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => false,
        }
    }

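    /// Evaluates a table-less query such as `SELECT 1, 'a'`. Only literal
    /// computed projections are supported; each literal becomes a one-row
    /// Arrow array.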
    fn execute_select_without_table(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        use arrow::array::ArrayRef;
        use arrow::datatypes::Field;

        let mut fields = Vec::new();
        let mut arrays: Vec<ArrayRef> = Vec::new();

        for proj in &plan.projections {
            match proj {
                SelectProjection::Computed { expr, alias } => {
                    let (field_name, dtype, array) = match expr {
                        ScalarExpr::Literal(lit) => {
                            let (dtype, array) = Self::literal_to_array(lit)?;
                            (alias.clone(), dtype, array)
                        }
                        _ => {
                            return Err(Error::InvalidArgumentError(
                                "SELECT without FROM only supports literal expressions".into(),
                            ));
                        }
                    };

                    fields.push(Field::new(field_name, dtype, true));
                    arrays.push(array);
                }
                _ => {
                    return Err(Error::InvalidArgumentError(
                        "SELECT without FROM only supports computed projections".into(),
                    ));
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)
            .map_err(|e| Error::Internal(format!("failed to create record batch: {}", e)))?;

        Ok(SelectExecution::new_single_batch(String::new(), schema, batch))
    }

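    /// Converts a literal into a single-element Arrow array together with
    /// its `DataType`. Struct literals recurse field by field into a
    /// `StructArray`.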
    fn literal_to_array(lit: &llkv_expr::literal::Literal) -> ExecutorResult<(DataType, ArrayRef)> {
        use arrow::array::{
            ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray, StructArray,
            new_null_array,
        };
        use arrow::datatypes::{DataType, Field};
        use llkv_expr::literal::Literal;

        match lit {
            Literal::Integer(v) => {
                let val = i64::try_from(*v).unwrap_or(0);
                Ok((
                    DataType::Int64,
                    Arc::new(Int64Array::from(vec![val])) as ArrayRef,
                ))
            }
            Literal::Float(v) => Ok((
                DataType::Float64,
                Arc::new(Float64Array::from(vec![*v])) as ArrayRef,
            )),
            Literal::Boolean(v) => Ok((
                DataType::Boolean,
                Arc::new(BooleanArray::from(vec![*v])) as ArrayRef,
            )),
            Literal::String(v) => Ok((
                DataType::Utf8,
                Arc::new(StringArray::from(vec![v.clone()])) as ArrayRef,
            )),
            Literal::Null => Ok((DataType::Null, new_null_array(&DataType::Null, 1))),
            Literal::Struct(struct_fields) => {
                let mut inner_fields = Vec::new();
                let mut inner_arrays = Vec::new();

                for (field_name, field_lit) in struct_fields {
                    let (field_dtype, field_array) = Self::literal_to_array(field_lit)?;
                    inner_fields.push(Field::new(field_name.clone(), field_dtype, true));
                    inner_arrays.push(field_array);
                }

                let struct_array =
                    StructArray::try_new(inner_fields.clone().into(), inner_arrays, None)
                        .map_err(|e| {
                            Error::Internal(format!("failed to create struct array: {}", e))
                        })?;

                Ok((
                    DataType::Struct(inner_fields.into()),
                    Arc::new(struct_array) as ArrayRef,
                ))
            }
        }
    }

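    /// Executes a two-table cross product (`SELECT ... FROM a, b`) by
    /// streaming a key-less inner join, then re-labelling the combined
    /// columns with fully qualified `schema.table.column` names before
    /// applying any projections.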
    fn execute_cross_product(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        use arrow::compute::concat_batches;

        if plan.tables.len() < 2 {
            return Err(Error::InvalidArgumentError(
                "cross product requires at least 2 tables".into(),
            ));
        }

        let mut tables = Vec::new();
        for table_ref in &plan.tables {
            let qualified_name = table_ref.qualified_name();
            let table = self.provider.get_table(&qualified_name)?;
            tables.push((table_ref.clone(), table));
        }

        if tables.len() > 2 {
            return Err(Error::InvalidArgumentError(
                "cross products with more than 2 tables not yet supported".into(),
            ));
        }

        let (left_ref, left_table) = &tables[0];
        let (right_ref, right_table) = &tables[1];

        use llkv_join::{JoinOptions, JoinType, TableJoinExt};

        // An inner join with no join keys degenerates to the cross product.
        let mut result_batches = Vec::new();
        left_table.table.join_stream(
            &right_table.table,
            &[],
            &JoinOptions {
                join_type: JoinType::Inner,
                ..Default::default()
            },
            |batch| {
                result_batches.push(batch);
            },
        )?;

        let mut combined_fields = Vec::new();

        for col in &left_table.schema.columns {
            let qualified_name = format!("{}.{}.{}", left_ref.schema, left_ref.table, col.name);
            combined_fields.push(arrow::datatypes::Field::new(
                qualified_name,
                col.data_type.clone(),
                col.nullable,
            ));
        }

        for col in &right_table.schema.columns {
            let qualified_name = format!("{}.{}.{}", right_ref.schema, right_ref.table, col.name);
            combined_fields.push(arrow::datatypes::Field::new(
                qualified_name,
                col.data_type.clone(),
                col.nullable,
            ));
        }

        let combined_schema = Arc::new(Schema::new(combined_fields));

        let mut combined_batch = if result_batches.is_empty() {
            RecordBatch::new_empty(Arc::clone(&combined_schema))
        } else if result_batches.len() == 1 {
            let batch = result_batches.into_iter().next().unwrap();
            RecordBatch::try_new(Arc::clone(&combined_schema), batch.columns().to_vec())
                .map_err(|e| {
                    Error::Internal(format!("failed to create batch with qualified names: {}", e))
                })?
        } else {
            let original_batch = concat_batches(&result_batches[0].schema(), &result_batches)
                .map_err(|e| Error::Internal(format!("failed to concatenate batches: {}", e)))?;
            RecordBatch::try_new(Arc::clone(&combined_schema), original_batch.columns().to_vec())
                .map_err(|e| {
                    Error::Internal(format!("failed to create batch with qualified names: {}", e))
                })?
        };

        if !plan.projections.is_empty() {
            let mut selected_fields = Vec::new();
            let mut selected_columns = Vec::new();

            for proj in &plan.projections {
                match proj {
                    SelectProjection::AllColumns => {
                        selected_fields = combined_schema.fields().iter().cloned().collect();
                        selected_columns = combined_batch.columns().to_vec();
                        break;
                    }
                    SelectProjection::AllColumnsExcept { exclude } => {
                        let exclude_lower: Vec<String> =
                            exclude.iter().map(|e| e.to_ascii_lowercase()).collect();

                        for (idx, field) in combined_schema.fields().iter().enumerate() {
                            let field_name_lower = field.name().to_ascii_lowercase();
                            if !exclude_lower.contains(&field_name_lower) {
                                selected_fields.push(field.clone());
                                selected_columns.push(combined_batch.column(idx).clone());
                            }
                        }
                        break;
                    }
                    SelectProjection::Column { name, alias } => {
                        let col_name = name.to_ascii_lowercase();
                        if let Some((idx, field)) = combined_schema
                            .fields()
                            .iter()
                            .enumerate()
                            .find(|(_, f)| f.name().to_ascii_lowercase() == col_name)
                        {
                            let output_name = alias.as_ref().unwrap_or(name).clone();
                            selected_fields.push(Arc::new(arrow::datatypes::Field::new(
                                output_name,
                                field.data_type().clone(),
                                field.is_nullable(),
                            )));
                            selected_columns.push(combined_batch.column(idx).clone());
                        } else {
                            return Err(Error::InvalidArgumentError(format!(
                                "column '{}' not found in cross product result",
                                name
                            )));
                        }
                    }
                    SelectProjection::Computed { expr, alias } => {
                        if let ScalarExpr::Column(col_name) = expr {
                            let col_name_lower = col_name.to_ascii_lowercase();
                            if let Some((idx, field)) = combined_schema
                                .fields()
                                .iter()
                                .enumerate()
                                .find(|(_, f)| f.name().to_ascii_lowercase() == col_name_lower)
                            {
                                selected_fields.push(Arc::new(arrow::datatypes::Field::new(
                                    alias.clone(),
                                    field.data_type().clone(),
                                    field.is_nullable(),
                                )));
                                selected_columns.push(combined_batch.column(idx).clone());
                            } else {
                                return Err(Error::InvalidArgumentError(format!(
                                    "column '{}' not found in cross product result",
                                    col_name
                                )));
                            }
                        } else {
                            return Err(Error::InvalidArgumentError(
                                "complex computed projections not yet supported in cross products"
                                    .into(),
                            ));
                        }
                    }
                }
            }

            let projected_schema = Arc::new(Schema::new(selected_fields));
            combined_batch = RecordBatch::try_new(projected_schema, selected_columns)
                .map_err(|e| Error::Internal(format!("failed to apply projections: {}", e)))?;
        }

        Ok(SelectExecution::new_single_batch(
            format!(
                "{},{}",
                left_ref.qualified_name(),
                right_ref.qualified_name()
            ),
            combined_batch.schema(),
            combined_batch,
        ))
    }

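    /// Builds a streaming projection scan: resolves projections (wildcard or
    /// explicit), translates the WHERE clause into a field-id predicate, and
    /// pushes the first ORDER BY key down into the scan. Any remaining ORDER
    /// BY keys are applied by `SelectExecution::stream` as a post-sort.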
    fn execute_projection(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let projections = if plan.projections.is_empty() {
            build_wildcard_projections(table_ref)
        } else {
            build_projected_columns(table_ref, &plan.projections)?
        };
        let schema = schema_for_projections(table_ref, &projections)?;

        let (filter_expr, full_table_scan) = match plan.filter {
            Some(expr) => (
                crate::translation::expression::translate_predicate(
                    expr,
                    table_ref.schema.as_ref(),
                    |name| Error::InvalidArgumentError(format!("unknown column '{}'", name)),
                )?,
                false,
            ),
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                (
                    crate::translation::expression::full_table_scan_filter(field_id),
                    true,
                )
            }
        };

        let expanded_order = expand_order_targets(&plan.order_by, &projections)?;
        let physical_order = if let Some(first) = expanded_order.first() {
            Some(resolve_scan_order(table_ref, &projections, first)?)
        } else {
            None
        };

        let options = if let Some(order_spec) = physical_order {
            if row_filter.is_some() {
                tracing::debug!("Applying MVCC row filter with ORDER BY");
            }
            ScanStreamOptions {
                include_nulls: true,
                order: Some(order_spec),
                row_id_filter: row_filter.clone(),
            }
        } else {
            if row_filter.is_some() {
                tracing::debug!("Applying MVCC row filter");
            }
            ScanStreamOptions {
                include_nulls: true,
                order: None,
                row_id_filter: row_filter.clone(),
            }
        };

        Ok(SelectExecution::new_projection(
            display_name,
            schema,
            table,
            projections,
            filter_expr,
            options,
            full_table_scan,
            expanded_order,
        ))
    }

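    /// Executes explicit aggregate projections (`SELECT count(*), sum(x)`).
    /// Each aggregate is compiled to an `AggregateSpec`, accumulated over a
    /// single shared scan, and finalized into a one-row batch. When there is
    /// no WHERE clause and no row filter, `COUNT(*)` is answered from the
    /// cached table row count without scanning.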
    fn execute_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let mut specs: Vec<AggregateSpec> = Vec::with_capacity(plan.aggregates.len());
        for aggregate in plan.aggregates {
            match aggregate {
                AggregateExpr::CountStar { alias } => {
                    specs.push(AggregateSpec {
                        alias,
                        kind: AggregateKind::CountStar,
                    });
                }
                AggregateExpr::Column {
                    column,
                    alias,
                    function,
                    distinct,
                } => {
                    let col = table_ref.schema.resolve(&column).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column '{}' in aggregate",
                            column
                        ))
                    })?;

                    let kind = match function {
                        AggregateFunction::Count => {
                            if distinct {
                                AggregateKind::CountDistinctField {
                                    field_id: col.field_id,
                                }
                            } else {
                                AggregateKind::CountField {
                                    field_id: col.field_id,
                                }
                            }
                        }
                        AggregateFunction::SumInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "SUM currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::SumInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MinInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MIN currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MinInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MaxInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MAX currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MaxInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::CountNulls => {
                            if distinct {
                                return Err(Error::InvalidArgumentError(
                                    "DISTINCT is not supported for COUNT_NULLS".into(),
                                ));
                            }
                            AggregateKind::CountNulls {
                                field_id: col.field_id,
                            }
                        }
                    };
                    specs.push(AggregateSpec { alias, kind });
                }
            }
        }

        if specs.is_empty() {
            return Err(Error::InvalidArgumentError(
                "aggregate query requires at least one aggregate expression".into(),
            ));
        }

        let had_filter = plan.filter.is_some();
        let filter_expr = match plan.filter {
            Some(expr) => crate::translation::expression::translate_predicate(
                expr,
                table.schema.as_ref(),
                |name| Error::InvalidArgumentError(format!("unknown column '{}'", name)),
            )?,
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                crate::translation::expression::full_table_scan_filter(field_id)
            }
        };

        // Project only the columns the accumulators actually need, recording
        // which projection index feeds each spec.
        let mut projections = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                let proj_idx = projections.len();
                spec_to_projection.push(Some(proj_idx));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        // The scan API needs at least one projection, so fall back to the
        // first column when every aggregate is COUNT(*)-like.
        if projections.is_empty() {
            let field_id = table.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: row_filter.clone(),
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        let mut count_star_override: Option<i64> = None;
        if !had_filter && row_filter.is_none() {
            let total_rows = table.total_rows.load(Ordering::SeqCst);
            tracing::debug!(
                "[AGGREGATE] Using COUNT(*) shortcut: total_rows={}",
                total_rows
            );
            if total_rows > i64::MAX as u64 {
                return Err(Error::InvalidArgumentError(
                    "COUNT(*) result exceeds supported range".into(),
                ));
            }
            count_star_override = Some(total_rows as i64);
        } else {
            tracing::debug!(
                "[AGGREGATE] NOT using COUNT(*) shortcut: had_filter={}, has_row_filter={}",
                had_filter,
                row_filter.is_some()
            );
        }

        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => {
                        tracing::debug!(
                            "[AGGREGATE] CountStar override_value={:?}",
                            count_star_override
                        );
                        count_star_override
                    }
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            ScanStreamOptions {
                row_id_filter: row_filter.clone(),
                ..options
            },
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            // Missing column data means there is nothing to accumulate; the
            // finalized states then report empty-table results.
            Err(llkv_result::Error::NotFound) => {}
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        let mut fields = Vec::with_capacity(states.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(states.len());
        for state in states {
            let (field, array) = state.finalize()?;
            fields.push(field);
            arrays.push(array);
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

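    /// Executes projections whose expressions embed aggregate calls, such as
    /// `SELECT count(*) * 2 FROM t`. Aggregates are collected from every
    /// computed projection, evaluated in one pass, and then substituted into
    /// the surrounding integer expressions.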
    fn execute_computed_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        use arrow::array::Int64Array;
        use llkv_expr::expr::AggregateCall;

        let table_ref = table.as_ref();

        let mut aggregate_specs: Vec<(String, AggregateCall<String>)> = Vec::new();
        for proj in &plan.projections {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::collect_aggregates(expr, &mut aggregate_specs);
            }
        }

        let computed_aggregates = self.compute_aggregate_values(
            table.clone(),
            &plan.filter,
            &aggregate_specs,
            row_filter.clone(),
        )?;

        let mut fields = Vec::with_capacity(plan.projections.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(plan.projections.len());

        for proj in &plan.projections {
            match proj {
                SelectProjection::AllColumns | SelectProjection::AllColumnsExcept { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "Wildcard projections not supported with computed aggregates".into(),
                    ));
                }
                SelectProjection::Column { name, .. } => {
                    // Validate the column so typos still surface as "unknown
                    // column", but bare columns remain unsupported here.
                    table_ref.schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    return Err(Error::InvalidArgumentError(
                        "Regular columns not supported in aggregate queries without GROUP BY"
                            .into(),
                    ));
                }
                SelectProjection::Computed { expr, alias } => {
                    let value = Self::evaluate_expr_with_aggregates(expr, &computed_aggregates)?;

                    fields.push(arrow::datatypes::Field::new(alias, DataType::Int64, false));

                    let array = Arc::new(Int64Array::from(vec![value])) as ArrayRef;
                    arrays.push(array);
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

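    /// Walks a scalar expression and records every distinct aggregate call.
    /// The `Debug` rendering of the call doubles as its deduplication key,
    /// so `count(*) + count(*)` is computed only once.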
    fn collect_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &mut Vec<(String, llkv_expr::expr::AggregateCall<String>)>,
    ) {
        match expr {
            ScalarExpr::Aggregate(agg) => {
                let key = format!("{:?}", agg);
                if !aggregates.iter().any(|(k, _)| k == &key) {
                    aggregates.push((key, agg.clone()));
                }
            }
            ScalarExpr::Binary { left, right, .. } => {
                Self::collect_aggregates(left, aggregates);
                Self::collect_aggregates(right, aggregates);
            }
            ScalarExpr::GetField { base, .. } => {
                Self::collect_aggregates(base, aggregates);
            }
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => {}
        }
    }

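    /// Evaluates the collected aggregate calls in a single scan and returns
    /// a map from deduplication key to the finalized `i64` value. Mirrors
    /// `execute_aggregates`, but keeps the results keyed for substitution
    /// into the surrounding expressions instead of emitting a batch.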
    fn compute_aggregate_values(
        &self,
        table: Arc<ExecutorTable<P>>,
        filter: &Option<llkv_expr::expr::Expr<'static, String>>,
        aggregate_specs: &[(String, llkv_expr::expr::AggregateCall<String>)],
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<FxHashMap<String, i64>> {
        use llkv_expr::expr::AggregateCall;

        let table_ref = table.as_ref();
        let mut results =
            FxHashMap::with_capacity_and_hasher(aggregate_specs.len(), Default::default());

        let mut specs: Vec<AggregateSpec> = Vec::new();
        for (key, agg) in aggregate_specs {
            let kind = match agg {
                AggregateCall::CountStar => AggregateKind::CountStar,
                AggregateCall::Count(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountField {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Sum(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::SumInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Min(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MinInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Max(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MaxInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::CountNulls(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountNulls {
                        field_id: col.field_id,
                    }
                }
            };
            specs.push(AggregateSpec {
                alias: key.clone(),
                kind,
            });
        }

        let filter_expr = match filter {
            Some(expr) => crate::translation::expression::translate_predicate(
                expr.clone(),
                table_ref.schema.as_ref(),
                |name| Error::InvalidArgumentError(format!("unknown column '{}'", name)),
            )?,
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                crate::translation::expression::full_table_scan_filter(field_id)
            }
        };

        let mut projections: Vec<ScanProjection> = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());
        // Unlike `execute_aggregates`, no COUNT(*) shortcut is taken here.
        let count_star_override: Option<i64> = None;

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                spec_to_projection.push(Some(projections.len()));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        if projections.is_empty() {
            let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let base_options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: None,
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => count_star_override,
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            ScanStreamOptions {
                row_id_filter: row_filter.clone(),
                ..base_options
            },
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => {}
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        for state in states {
            let alias = state.alias.clone();
            let (_field, array) = state.finalize()?;

            let int64_array = array
                .as_any()
                .downcast_ref::<arrow::array::Int64Array>()
                .ok_or_else(|| Error::Internal("Expected Int64Array from aggregate".into()))?;

            if int64_array.len() != 1 {
                return Err(Error::Internal(format!(
                    "Expected single value from aggregate, got {}",
                    int64_array.len()
                )));
            }

            // NULL results (e.g. MIN over an empty table) are coerced to 0.
            let value = if int64_array.is_null(0) {
                0
            } else {
                int64_array.value(0)
            };

            results.insert(alias, value);
        }

        Ok(results)
    }

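    /// Recursively folds a scalar expression to an `i64`, substituting the
    /// previously computed aggregate values and using checked integer
    /// arithmetic so overflow, division by zero, and modulo by zero all
    /// surface as errors.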
    fn evaluate_expr_with_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &FxHashMap<String, i64>,
    ) -> ExecutorResult<i64> {
        use llkv_expr::expr::BinaryOp;
        use llkv_expr::literal::Literal;

        match expr {
            // Reject out-of-range integer literals instead of silently
            // truncating them.
            ScalarExpr::Literal(Literal::Integer(v)) => i64::try_from(*v).map_err(|_| {
                Error::InvalidArgumentError("integer literal out of range for INT64".into())
            }),
            ScalarExpr::Literal(Literal::Float(v)) => Ok(*v as i64),
            ScalarExpr::Literal(Literal::Boolean(v)) => Ok(if *v { 1 } else { 0 }),
            ScalarExpr::Literal(Literal::String(_)) => Err(Error::InvalidArgumentError(
                "String literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Literal(Literal::Null) => Err(Error::InvalidArgumentError(
                "NULL literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Literal(Literal::Struct(_)) => Err(Error::InvalidArgumentError(
                "Struct literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Column(_) => Err(Error::InvalidArgumentError(
                "Column references not supported in aggregate-only expressions".into(),
            )),
            ScalarExpr::Aggregate(agg) => {
                let key = format!("{:?}", agg);
                aggregates.get(&key).copied().ok_or_else(|| {
                    Error::Internal(format!("Aggregate value not found for key: {}", key))
                })
            }
            ScalarExpr::Binary { left, op, right } => {
                let left_val = Self::evaluate_expr_with_aggregates(left, aggregates)?;
                let right_val = Self::evaluate_expr_with_aggregates(right, aggregates)?;

                let result = match op {
                    BinaryOp::Add => left_val.checked_add(right_val),
                    BinaryOp::Subtract => left_val.checked_sub(right_val),
                    BinaryOp::Multiply => left_val.checked_mul(right_val),
                    BinaryOp::Divide => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Division by zero".into()));
                        }
                        left_val.checked_div(right_val)
                    }
                    BinaryOp::Modulo => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Modulo by zero".into()));
                        }
                        left_val.checked_rem(right_val)
                    }
                };

                result.ok_or_else(|| {
                    Error::InvalidArgumentError("Arithmetic overflow in expression".into())
                })
            }
            ScalarExpr::GetField { .. } => Err(Error::InvalidArgumentError(
                "GetField not supported in aggregate-only expressions".into(),
            )),
        }
    }
}

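/// The result of planning a SELECT: either a lazy projection scan or an
/// already materialized single batch (aggregates, table-less queries, cross
/// products). Batches are produced only when `stream`, `collect`, or
/// `collect_rows` is called.
///
/// A minimal consumption sketch:
///
/// ```ignore
/// let execution = executor.execute_select(plan)?;
/// execution.stream(|batch| {
///     println!("{} rows", batch.num_rows());
///     Ok(())
/// })?;
/// ```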
#[derive(Clone)]
pub struct SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    table_name: String,
    schema: Arc<Schema>,
    stream: SelectStream<P>,
}

#[derive(Clone)]
enum SelectStream<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    Projection {
        table: Arc<ExecutorTable<P>>,
        projections: Vec<ScanProjection>,
        filter_expr: LlkvExpr<'static, FieldId>,
        options: ScanStreamOptions<P>,
        full_table_scan: bool,
        order_by: Vec<OrderByPlan>,
    },
    Aggregation {
        batch: RecordBatch,
    },
}

impl<P> SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    #[allow(clippy::too_many_arguments)]
    fn new_projection(
        table_name: String,
        schema: Arc<Schema>,
        table: Arc<ExecutorTable<P>>,
        projections: Vec<ScanProjection>,
        filter_expr: LlkvExpr<'static, FieldId>,
        options: ScanStreamOptions<P>,
        full_table_scan: bool,
        order_by: Vec<OrderByPlan>,
    ) -> Self {
        Self {
            table_name,
            schema,
            stream: SelectStream::Projection {
                table,
                projections,
                filter_expr,
                options,
                full_table_scan,
                order_by,
            },
        }
    }

    pub fn new_single_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
        Self {
            table_name,
            schema,
            stream: SelectStream::Aggregation { batch },
        }
    }

    pub fn from_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
        Self::new_single_batch(table_name, schema, batch)
    }

    pub fn table_name(&self) -> &str {
        &self.table_name
    }

    pub fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

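    /// Drives the execution, invoking `on_batch` for every result batch.
    /// For projection streams this runs the scan, buffers batches when a
    /// multi-key ORDER BY or NULLS FIRST synthesis requires it, and appends
    /// synthesized all-NULL rows when a full table scan visited fewer rows
    /// than the table holds.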
    pub fn stream(
        self,
        mut on_batch: impl FnMut(RecordBatch) -> ExecutorResult<()>,
    ) -> ExecutorResult<()> {
        let schema = Arc::clone(&self.schema);
        match self.stream {
            SelectStream::Projection {
                table,
                projections,
                filter_expr,
                options,
                full_table_scan,
                order_by,
            } => {
                let total_rows = table.total_rows.load(Ordering::SeqCst);
                if total_rows == 0 {
                    return Ok(());
                }

                let mut error: Option<Error> = None;
                let mut produced = false;
                let mut produced_rows: u64 = 0;
                let capture_nulls_first = matches!(options.order, Some(spec) if spec.nulls_first);
                let needs_post_sort = order_by.len() > 1;
                let collect_batches = needs_post_sort || capture_nulls_first;
                let include_nulls = options.include_nulls;
                let has_row_id_filter = options.row_id_filter.is_some();
                let scan_options = options;
                let mut buffered_batches: Vec<RecordBatch> = Vec::new();
                table
                    .table
                    .scan_stream(projections, &filter_expr, scan_options, |batch| {
                        if error.is_some() {
                            return;
                        }
                        produced = true;
                        produced_rows = produced_rows.saturating_add(batch.num_rows() as u64);
                        if collect_batches {
                            buffered_batches.push(batch);
                        } else if let Err(err) = on_batch(batch) {
                            error = Some(err);
                        }
                    })?;
                if let Some(err) = error {
                    return Err(err);
                }
                if !produced {
                    if total_rows > 0 {
                        for batch in synthesize_null_scan(Arc::clone(&schema), total_rows)? {
                            on_batch(batch)?;
                        }
                    }
                    return Ok(());
                }
                // A full table scan can visit fewer rows than the table
                // holds (e.g. when the projected columns carry no stored
                // values for some rows); synthesize the missing rows as
                // all-NULL batches so the row count stays correct.
                let mut null_batches: Vec<RecordBatch> = Vec::new();
                if include_nulls
                    && full_table_scan
                    && produced_rows < total_rows
                    && !has_row_id_filter
                {
                    let missing = total_rows - produced_rows;
                    if missing > 0 {
                        null_batches = synthesize_null_scan(Arc::clone(&schema), missing)?;
                    }
                }

                if collect_batches {
                    if needs_post_sort {
                        if !null_batches.is_empty() {
                            buffered_batches.extend(null_batches);
                        }
                        if !buffered_batches.is_empty() {
                            let combined =
                                concat_batches(&schema, &buffered_batches).map_err(|err| {
                                    Error::InvalidArgumentError(format!(
                                        "failed to concatenate result batches for ORDER BY: {}",
                                        err
                                    ))
                                })?;
                            let sorted_batch =
                                sort_record_batch_with_order(&schema, &combined, &order_by)?;
                            on_batch(sorted_batch)?;
                        }
                    } else if capture_nulls_first {
                        for batch in null_batches {
                            on_batch(batch)?;
                        }
                        for batch in buffered_batches {
                            on_batch(batch)?;
                        }
                    }
                } else if !null_batches.is_empty() {
                    for batch in null_batches {
                        on_batch(batch)?;
                    }
                }
                Ok(())
            }
            SelectStream::Aggregation { batch } => on_batch(batch),
        }
    }

    pub fn collect(self) -> ExecutorResult<Vec<RecordBatch>> {
        let mut batches = Vec::new();
        self.stream(|batch| {
            batches.push(batch);
            Ok(())
        })?;
        Ok(batches)
    }

    pub fn collect_rows(self) -> ExecutorResult<ExecutorRowBatch> {
        let schema = self.schema();
        let mut rows: Vec<Vec<PlanValue>> = Vec::new();
        self.stream(|batch| {
            for row_idx in 0..batch.num_rows() {
                let mut row: Vec<PlanValue> = Vec::with_capacity(batch.num_columns());
                for col_idx in 0..batch.num_columns() {
                    let value = llkv_plan::plan_value_from_array(batch.column(col_idx), row_idx)?;
                    row.push(value);
                }
                rows.push(row);
            }
            Ok(())
        })?;
        let columns = schema
            .fields()
            .iter()
            .map(|field| field.name().to_string())
            .collect();
        Ok(ExecutorRowBatch { columns, rows })
    }

    pub fn into_rows(self) -> ExecutorResult<Vec<Vec<PlanValue>>> {
        Ok(self.collect_rows()?.rows)
    }
}

impl<P> fmt::Debug for SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SelectExecution")
            .field("table_name", &self.table_name)
            .field("schema", &self.schema)
            .finish()
    }
}

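/// Expands `ORDER BY ALL` into one `OrderByPlan` per projected column
/// (computed projections are rejected); all other targets pass through
/// unchanged.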
fn expand_order_targets(
    order_items: &[OrderByPlan],
    projections: &[ScanProjection],
) -> ExecutorResult<Vec<OrderByPlan>> {
    let mut expanded = Vec::new();

    for item in order_items {
        match &item.target {
            OrderTarget::All => {
                if projections.is_empty() {
                    return Err(Error::InvalidArgumentError(
                        "ORDER BY ALL requires at least one projection".into(),
                    ));
                }

                for (idx, projection) in projections.iter().enumerate() {
                    if matches!(projection, ScanProjection::Computed { .. }) {
                        return Err(Error::InvalidArgumentError(
                            "ORDER BY ALL cannot reference computed projections".into(),
                        ));
                    }

                    let mut clone = item.clone();
                    clone.target = OrderTarget::Index(idx);
                    expanded.push(clone);
                }
            }
            _ => expanded.push(item.clone()),
        }
    }

    Ok(expanded)
}

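/// Maps an ORDER BY key onto a physical `ScanOrderSpec` the table scan can
/// honor directly: resolves the target to a column field id, picks the value
/// transform (identity or text-to-integer cast), and sets the direction and
/// NULL placement.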
fn resolve_scan_order<P>(
    table: &ExecutorTable<P>,
    projections: &[ScanProjection],
    order_plan: &OrderByPlan,
) -> ExecutorResult<ScanOrderSpec>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    let (column, field_id) = match &order_plan.target {
        OrderTarget::Column(name) => {
            let column = table.schema.resolve(name).ok_or_else(|| {
                Error::InvalidArgumentError(format!("unknown column '{}' in ORDER BY", name))
            })?;
            (column, column.field_id)
        }
        OrderTarget::Index(position) => {
            let projection = projections.get(*position).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "ORDER BY position {} is out of range",
                    position + 1
                ))
            })?;
            match projection {
                ScanProjection::Column(store_projection) => {
                    let field_id = store_projection.logical_field_id.field_id();
                    let column = table.schema.column_by_field_id(field_id).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column with field id {field_id} in ORDER BY"
                        ))
                    })?;
                    (column, field_id)
                }
                ScanProjection::Computed { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "ORDER BY position referring to computed projection is not supported"
                            .into(),
                    ));
                }
            }
        }
        OrderTarget::All => {
            return Err(Error::InvalidArgumentError(
                "ORDER BY ALL should be expanded before execution".into(),
            ));
        }
    };

    let transform = match order_plan.sort_type {
        OrderSortType::Native => match column.data_type {
            DataType::Int64 => ScanOrderTransform::IdentityInteger,
            DataType::Utf8 => ScanOrderTransform::IdentityUtf8,
            ref other => {
                return Err(Error::InvalidArgumentError(format!(
                    "ORDER BY on column type {:?} is not supported",
                    other
                )));
            }
        },
        OrderSortType::CastTextToInteger => {
            if column.data_type != DataType::Utf8 {
                return Err(Error::InvalidArgumentError(
                    "ORDER BY CAST expects a text column".into(),
                ));
            }
            ScanOrderTransform::CastUtf8ToInteger
        }
    };

    let direction = if order_plan.ascending {
        ScanOrderDirection::Ascending
    } else {
        ScanOrderDirection::Descending
    };

    Ok(ScanOrderSpec {
        field_id,
        direction,
        nulls_first: order_plan.nulls_first,
        transform,
    })
}

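/// Builds `total_rows` worth of all-NULL rows matching `schema`, used to
/// stand in for rows a scan did not materialize. Only the column types the
/// executor currently emits are supported.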
fn synthesize_null_scan(schema: Arc<Schema>, total_rows: u64) -> ExecutorResult<Vec<RecordBatch>> {
    let row_count = usize::try_from(total_rows).map_err(|_| {
        Error::InvalidArgumentError("table row count exceeds supported in-memory batch size".into())
    })?;

    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    for field in schema.fields() {
        match field.data_type() {
            DataType::Int64 => {
                let mut builder = Int64Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Float64 => {
                let mut builder = arrow::array::Float64Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Utf8 => {
                let mut builder = arrow::array::StringBuilder::with_capacity(row_count, 0);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Date32 => {
                let mut builder = arrow::array::Date32Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            other => {
                return Err(Error::InvalidArgumentError(format!(
                    "unsupported data type in null synthesis: {other:?}"
                )));
            }
        }
    }

    let batch = RecordBatch::try_new(schema, arrays)?;
    Ok(vec![batch])
}

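/// Applies a multi-key ORDER BY to a fully materialized batch: builds one
/// `SortColumn` per key (casting TEXT to integers when requested, with
/// unparsable strings sorting as NULL), computes a lexicographic permutation
/// with `lexsort_to_indices`, and reorders every column via `take`.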
fn sort_record_batch_with_order(
    schema: &Arc<Schema>,
    batch: &RecordBatch,
    order_by: &[OrderByPlan],
) -> ExecutorResult<RecordBatch> {
    if order_by.is_empty() {
        return Ok(batch.clone());
    }

    let mut sort_columns: Vec<SortColumn> = Vec::with_capacity(order_by.len());

    for order in order_by {
        let column_index = match &order.target {
            OrderTarget::Column(name) => schema.index_of(name).map_err(|_| {
                Error::InvalidArgumentError(format!(
                    "ORDER BY references unknown column '{}'",
                    name
                ))
            })?,
            OrderTarget::Index(idx) => {
                if *idx >= batch.num_columns() {
                    return Err(Error::InvalidArgumentError(format!(
                        "ORDER BY position {} is out of bounds for {} columns",
                        idx + 1,
                        batch.num_columns()
                    )));
                }
                *idx
            }
            OrderTarget::All => {
                return Err(Error::InvalidArgumentError(
                    "ORDER BY ALL should be expanded before sorting".into(),
                ));
            }
        };

        let source_array = batch.column(column_index);

        let values: ArrayRef = match order.sort_type {
            OrderSortType::Native => Arc::clone(source_array),
            OrderSortType::CastTextToInteger => {
                let strings = source_array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        Error::InvalidArgumentError(
                            "ORDER BY CAST expects the underlying column to be TEXT".into(),
                        )
                    })?;
                let mut builder = Int64Builder::with_capacity(strings.len());
                for i in 0..strings.len() {
                    if strings.is_null(i) {
                        builder.append_null();
                    } else {
                        match strings.value(i).parse::<i64>() {
                            Ok(value) => builder.append_value(value),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                Arc::new(builder.finish()) as ArrayRef
            }
        };

        let sort_options = SortOptions {
            descending: !order.ascending,
            nulls_first: order.nulls_first,
        };

        sort_columns.push(SortColumn {
            values,
            options: Some(sort_options),
        });
    }

    let indices = lexsort_to_indices(&sort_columns, None).map_err(|err| {
        Error::InvalidArgumentError(format!("failed to compute ORDER BY indices: {err}"))
    })?;

    let perm = indices
        .as_any()
        .downcast_ref::<UInt32Array>()
        .ok_or_else(|| Error::Internal("ORDER BY sorting produced unexpected index type".into()))?;

    let mut reordered_columns: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
    for col_idx in 0..batch.num_columns() {
        let reordered = take(batch.column(col_idx), perm, None).map_err(|err| {
            Error::InvalidArgumentError(format!(
                "failed to apply ORDER BY permutation to column {col_idx}: {err}"
            ))
        })?;
        reordered_columns.push(reordered);
    }

    RecordBatch::try_new(Arc::clone(schema), reordered_columns)
        .map_err(|err| Error::Internal(format!("failed to build reordered ORDER BY batch: {err}")))
}