use arrow::array::{Array, ArrayRef, Int64Builder, RecordBatch};
use arrow::datatypes::{DataType, Schema};
use llkv_aggregate::{AggregateAccumulator, AggregateKind, AggregateSpec, AggregateState};
use llkv_column_map::store::Projection as StoreProjection;
use llkv_column_map::types::LogicalFieldId;
use llkv_expr::expr::{Expr as LlkvExpr, Filter, Operator, ScalarExpr};
use llkv_plan::{
    AggregateExpr, AggregateFunction, OrderByPlan, OrderSortType, OrderTarget, PlanValue,
    SelectPlan, SelectProjection,
};
use llkv_result::Error;
use llkv_storage::pager::Pager;
use llkv_table::table::{
    RowIdFilter, ScanOrderDirection, ScanOrderSpec, ScanOrderTransform, ScanProjection,
    ScanStreamOptions,
};
use llkv_table::types::FieldId;
use rustc_hash::FxHashMap;
use simd_r_drive_entry_handle::EntryHandle;
use std::fmt;
use std::ops::Bound;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

pub type ExecutorResult<T> = Result<T, Error>;

mod projections;
mod schema;
pub use projections::{build_projected_columns, build_wildcard_projections};
pub use schema::schema_for_projections;

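/// Resolves a canonical table name to an [`ExecutorTable`] handle.
///
/// Implementations own catalog lookup; the executor itself stays
/// storage-agnostic and only sees the resolved handle.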
pub trait TableProvider<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn get_table(&self, canonical_name: &str) -> ExecutorResult<Arc<ExecutorTable<P>>>;
}

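/// Executes [`SelectPlan`]s against tables resolved through a
/// [`TableProvider`].
///
/// A minimal usage sketch (assumes an existing `provider` implementing
/// [`TableProvider`] and a prepared `plan`):
///
/// ```ignore
/// let executor = QueryExecutor::new(provider);
/// let execution = executor.execute_select(plan)?;
/// for batch in execution.collect()? {
///     // each `batch` is an Arrow RecordBatch
/// }
/// ```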
pub struct QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    provider: Arc<dyn TableProvider<P>>,
}

impl<P> QueryExecutor<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    pub fn new(provider: Arc<dyn TableProvider<P>>) -> Self {
        Self { provider }
    }

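    /// Executes a SELECT plan with no extra row-id filtering applied.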
    pub fn execute_select(&self, plan: SelectPlan) -> ExecutorResult<SelectExecution<P>> {
        self.execute_select_with_filter(plan, None)
    }

    /// Executes a SELECT plan, optionally constraining the scan to row ids
    /// accepted by `row_filter` (used for MVCC visibility).
    pub fn execute_select_with_filter(
        &self,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table = self.provider.get_table(&plan.table)?;
        let display_name = plan.table.clone();

        if !plan.aggregates.is_empty() {
            self.execute_aggregates(table, display_name, plan, row_filter)
        } else if self.has_computed_aggregates(&plan) {
            self.execute_computed_aggregates(table, display_name, plan, row_filter)
        } else {
            self.execute_projection(table, display_name, plan, row_filter)
        }
    }

    /// Returns true when any computed projection embeds an aggregate call.
    fn has_computed_aggregates(&self, plan: &SelectPlan) -> bool {
        plan.projections.iter().any(|proj| {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::expr_contains_aggregate(expr)
            } else {
                false
            }
        })
    }

    fn expr_contains_aggregate(expr: &ScalarExpr<String>) -> bool {
        match expr {
            ScalarExpr::Aggregate(_) => true,
            ScalarExpr::Binary { left, right, .. } => {
                Self::expr_contains_aggregate(left) || Self::expr_contains_aggregate(right)
            }
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => false,
        }
    }

    fn execute_projection(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let projections = if plan.projections.is_empty() {
            build_wildcard_projections(table_ref)
        } else {
            build_projected_columns(table_ref, &plan.projections)?
        };
        let schema = schema_for_projections(table_ref, &projections)?;

        let (filter_expr, full_table_scan) = match plan.filter {
            Some(expr) => (translate_predicate(expr, table_ref.schema.as_ref())?, false),
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                (full_table_scan_filter(field_id), true)
            }
        };

        let options = if let Some(order_plan) = &plan.order_by {
            let order_spec = resolve_scan_order(table_ref, &projections, order_plan)?;
            if row_filter.is_some() {
                tracing::debug!("Applying MVCC row filter with ORDER BY");
            }
            ScanStreamOptions {
                include_nulls: true,
                order: Some(order_spec),
                row_id_filter: row_filter.clone(),
            }
        } else {
            if row_filter.is_some() {
                tracing::debug!("Applying MVCC row filter");
            }
            ScanStreamOptions {
                include_nulls: true,
                order: None,
                row_id_filter: row_filter.clone(),
            }
        };

        Ok(SelectExecution::new_projection(
            display_name,
            schema,
            table,
            projections,
            filter_expr,
            options,
            full_table_scan,
        ))
    }

    fn execute_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        let table_ref = table.as_ref();
        let mut specs: Vec<AggregateSpec> = Vec::with_capacity(plan.aggregates.len());
        for aggregate in plan.aggregates {
            match aggregate {
                AggregateExpr::CountStar { alias } => {
                    specs.push(AggregateSpec {
                        alias,
                        kind: AggregateKind::CountStar,
                    });
                }
                AggregateExpr::Column {
                    column,
                    alias,
                    function,
                } => {
                    let col = table_ref.schema.resolve(&column).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column '{}' in aggregate",
                            column
                        ))
                    })?;
                    let kind = match function {
                        AggregateFunction::Count => AggregateKind::CountField {
                            field_id: col.field_id,
                        },
                        AggregateFunction::SumInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "SUM currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::SumInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MinInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MIN currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MinInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::MaxInt64 => {
                            if col.data_type != DataType::Int64 {
                                return Err(Error::InvalidArgumentError(
                                    "MAX currently supports only INTEGER columns".into(),
                                ));
                            }
                            AggregateKind::MaxInt64 {
                                field_id: col.field_id,
                            }
                        }
                        AggregateFunction::CountNulls => AggregateKind::CountNulls {
                            field_id: col.field_id,
                        },
                    };
                    specs.push(AggregateSpec { alias, kind });
                }
            }
        }

        if specs.is_empty() {
            return Err(Error::InvalidArgumentError(
                "aggregate query requires at least one aggregate expression".into(),
            ));
        }

        let had_filter = plan.filter.is_some();
        let filter_expr = match plan.filter {
            Some(expr) => translate_predicate(expr, table.schema.as_ref())?,
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                full_table_scan_filter(field_id)
            }
        };

        // Build one store projection per aggregate that reads a real column;
        // a pure COUNT(*) needs no input column of its own.
        let mut projections = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                let proj_idx = projections.len();
                spec_to_projection.push(Some(proj_idx));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        if projections.is_empty() {
            let field_id = table.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: row_filter.clone(),
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        let mut count_star_override: Option<i64> = None;
        // COUNT(*) can be answered from the table's row counter when neither
        // a predicate nor an MVCC row filter restricts the scan.
        if !had_filter && row_filter.is_none() {
            let total_rows = table.total_rows.load(Ordering::SeqCst);
            tracing::debug!(
                "[AGGREGATE] Using COUNT(*) shortcut: total_rows={}",
                total_rows
            );
            if total_rows > i64::MAX as u64 {
                return Err(Error::InvalidArgumentError(
                    "COUNT(*) result exceeds supported range".into(),
                ));
            }
            count_star_override = Some(total_rows as i64);
        } else {
            tracing::debug!(
                "[AGGREGATE] NOT using COUNT(*) shortcut: had_filter={}, has_row_filter={}",
                had_filter,
                row_filter.is_some()
            );
        }

        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => {
                        tracing::debug!(
                            "[AGGREGATE] CountStar override_value={:?}",
                            count_star_override
                        );
                        count_star_override
                    }
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            ScanStreamOptions {
                row_id_filter: row_filter.clone(),
                ..options
            },
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            // A NotFound from the scan means there is nothing to aggregate;
            // fall through and finalize the accumulators' initial values.
            Err(llkv_result::Error::NotFound) => {}
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        let mut fields = Vec::with_capacity(states.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(states.len());
        for state in states {
            let (field, array) = state.finalize()?;
            fields.push(field);
            arrays.push(array);
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

    fn execute_computed_aggregates(
        &self,
        table: Arc<ExecutorTable<P>>,
        display_name: String,
        plan: SelectPlan,
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<SelectExecution<P>> {
        use arrow::array::Int64Array;
        use llkv_expr::expr::AggregateCall;

        let table_ref = table.as_ref();

        // Gather every distinct aggregate call referenced by the computed
        // projections so each one is evaluated exactly once.
        let mut aggregate_specs: Vec<(String, AggregateCall<String>)> = Vec::new();
        for proj in &plan.projections {
            if let SelectProjection::Computed { expr, .. } = proj {
                Self::collect_aggregates(expr, &mut aggregate_specs);
            }
        }

        let computed_aggregates = self.compute_aggregate_values(
            table.clone(),
            &plan.filter,
            &aggregate_specs,
            row_filter.clone(),
        )?;

        let mut fields = Vec::with_capacity(plan.projections.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(plan.projections.len());

        for proj in &plan.projections {
            match proj {
                SelectProjection::AllColumns => {
                    return Err(Error::InvalidArgumentError(
                        "AllColumns projection not supported with computed aggregates".into(),
                    ));
                }
                SelectProjection::Column { name, .. } => {
                    // Validate the column exists, then reject it: bare columns
                    // are meaningless alongside aggregates without GROUP BY.
                    table_ref.schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    return Err(Error::InvalidArgumentError(
                        "Regular columns not supported in aggregate queries without GROUP BY"
                            .into(),
                    ));
                }
                SelectProjection::Computed { expr, alias } => {
                    let value = Self::evaluate_expr_with_aggregates(expr, &computed_aggregates)?;

                    fields.push(arrow::datatypes::Field::new(alias, DataType::Int64, false));

                    let array = Arc::new(Int64Array::from(vec![value])) as ArrayRef;
                    arrays.push(array);
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
        Ok(SelectExecution::new_single_batch(
            display_name,
            schema,
            batch,
        ))
    }

    fn collect_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &mut Vec<(String, llkv_expr::expr::AggregateCall<String>)>,
    ) {
        match expr {
            ScalarExpr::Aggregate(agg) => {
                // The Debug rendering doubles as a deduplication key, so the
                // same call is computed only once per query.
                let key = format!("{:?}", agg);
                if !aggregates.iter().any(|(k, _)| k == &key) {
                    aggregates.push((key, agg.clone()));
                }
            }
            ScalarExpr::Binary { left, right, .. } => {
                Self::collect_aggregates(left, aggregates);
                Self::collect_aggregates(right, aggregates);
            }
            ScalarExpr::Column(_) | ScalarExpr::Literal(_) => {}
        }
    }

    fn compute_aggregate_values(
        &self,
        table: Arc<ExecutorTable<P>>,
        filter: &Option<llkv_expr::expr::Expr<'static, String>>,
        aggregate_specs: &[(String, llkv_expr::expr::AggregateCall<String>)],
        row_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
    ) -> ExecutorResult<FxHashMap<String, i64>> {
        use llkv_expr::expr::AggregateCall;

        let table_ref = table.as_ref();
        let mut results =
            FxHashMap::with_capacity_and_hasher(aggregate_specs.len(), Default::default());

        let mut specs: Vec<AggregateSpec> = Vec::new();
        for (key, agg) in aggregate_specs {
            let kind = match agg {
                AggregateCall::CountStar => AggregateKind::CountStar,
                AggregateCall::Count(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountField {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Sum(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::SumInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Min(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MinInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::Max(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::MaxInt64 {
                        field_id: col.field_id,
                    }
                }
                AggregateCall::CountNulls(col_name) => {
                    let col = table_ref.schema.resolve(col_name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", col_name))
                    })?;
                    AggregateKind::CountNulls {
                        field_id: col.field_id,
                    }
                }
            };
            specs.push(AggregateSpec {
                alias: key.clone(),
                kind,
            });
        }

        let filter_expr = match filter {
            Some(expr) => translate_predicate(expr.clone(), table_ref.schema.as_ref())?,
            None => {
                let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform aggregate scan".into(),
                    )
                })?;
                full_table_scan_filter(field_id)
            }
        };

        let mut projections: Vec<ScanProjection> = Vec::new();
        let mut spec_to_projection: Vec<Option<usize>> = Vec::with_capacity(specs.len());
        // No COUNT(*) shortcut here: a row filter may apply, so rows are
        // always counted by scanning.
        let count_star_override: Option<i64> = None;

        for spec in &specs {
            if let Some(field_id) = spec.kind.field_id() {
                spec_to_projection.push(Some(projections.len()));
                projections.push(ScanProjection::from(StoreProjection::with_alias(
                    LogicalFieldId::for_user(table.table.table_id(), field_id),
                    table
                        .schema
                        .column_by_field_id(field_id)
                        .map(|c| c.name.clone())
                        .unwrap_or_else(|| format!("col{field_id}")),
                )));
            } else {
                spec_to_projection.push(None);
            }
        }

        if projections.is_empty() {
            let field_id = table_ref.schema.first_field_id().ok_or_else(|| {
                Error::InvalidArgumentError(
                    "table has no columns; cannot perform aggregate scan".into(),
                )
            })?;
            projections.push(ScanProjection::from(StoreProjection::with_alias(
                LogicalFieldId::for_user(table.table.table_id(), field_id),
                table
                    .schema
                    .column_by_field_id(field_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| format!("col{field_id}")),
            )));
        }

        let base_options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: None,
        };

        let mut states: Vec<AggregateState> = Vec::with_capacity(specs.len());
        for (idx, spec) in specs.iter().enumerate() {
            states.push(AggregateState {
                alias: spec.alias.clone(),
                accumulator: AggregateAccumulator::new_with_projection_index(
                    spec,
                    spec_to_projection[idx],
                    count_star_override,
                )?,
                override_value: match spec.kind {
                    AggregateKind::CountStar => count_star_override,
                    _ => None,
                },
            });
        }

        let mut error: Option<Error> = None;
        match table.table.scan_stream(
            projections,
            &filter_expr,
            ScanStreamOptions {
                row_id_filter: row_filter.clone(),
                ..base_options
            },
            |batch| {
                if error.is_some() {
                    return;
                }
                for state in &mut states {
                    if let Err(err) = state.update(&batch) {
                        error = Some(err);
                        return;
                    }
                }
            },
        ) {
            Ok(()) => {}
            Err(llkv_result::Error::NotFound) => {}
            Err(err) => return Err(err),
        }
        if let Some(err) = error {
            return Err(err);
        }

        // Finalize each accumulator into a single Int64 scalar keyed by the
        // aggregate's deduplication alias.
        for state in states {
            let alias = state.alias.clone();
            let (_field, array) = state.finalize()?;

            let int64_array = array
                .as_any()
                .downcast_ref::<arrow::array::Int64Array>()
                .ok_or_else(|| Error::Internal("Expected Int64Array from aggregate".into()))?;

            if int64_array.len() != 1 {
                return Err(Error::Internal(format!(
                    "Expected single value from aggregate, got {}",
                    int64_array.len()
                )));
            }

            // A null result (e.g. MIN over zero rows) is coerced to 0 here.
            let value = if int64_array.is_null(0) {
                0
            } else {
                int64_array.value(0)
            };

            results.insert(alias, value);
        }

        Ok(results)
    }

    fn evaluate_expr_with_aggregates(
        expr: &ScalarExpr<String>,
        aggregates: &FxHashMap<String, i64>,
    ) -> ExecutorResult<i64> {
        use llkv_expr::expr::BinaryOp;
        use llkv_expr::literal::Literal;

        match expr {
            ScalarExpr::Literal(Literal::Integer(v)) => Ok(*v as i64),
            ScalarExpr::Literal(Literal::Float(v)) => Ok(*v as i64),
            ScalarExpr::Literal(Literal::String(_)) => Err(Error::InvalidArgumentError(
                "String literals not supported in aggregate expressions".into(),
            )),
            ScalarExpr::Column(_) => Err(Error::InvalidArgumentError(
                "Column references not supported in aggregate-only expressions".into(),
            )),
            ScalarExpr::Aggregate(agg) => {
                let key = format!("{:?}", agg);
                aggregates.get(&key).copied().ok_or_else(|| {
                    Error::Internal(format!("Aggregate value not found for key: {}", key))
                })
            }
            ScalarExpr::Binary { left, op, right } => {
                let left_val = Self::evaluate_expr_with_aggregates(left, aggregates)?;
                let right_val = Self::evaluate_expr_with_aggregates(right, aggregates)?;

                let result = match op {
                    BinaryOp::Add => left_val.checked_add(right_val),
                    BinaryOp::Subtract => left_val.checked_sub(right_val),
                    BinaryOp::Multiply => left_val.checked_mul(right_val),
                    BinaryOp::Divide => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Division by zero".into()));
                        }
                        left_val.checked_div(right_val)
                    }
                    BinaryOp::Modulo => {
                        if right_val == 0 {
                            return Err(Error::InvalidArgumentError("Modulo by zero".into()));
                        }
                        left_val.checked_rem(right_val)
                    }
                };

                result.ok_or_else(|| {
                    Error::InvalidArgumentError("Arithmetic overflow in expression".into())
                })
            }
        }
    }
}

/// A prepared SELECT: the output schema plus either a lazily executed
/// projection scan or an already materialized result batch.
#[derive(Clone)]
pub struct SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    table_name: String,
    schema: Arc<Schema>,
    stream: SelectStream<P>,
}

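/// The two execution bodies of a [`SelectExecution`]: a lazy projection scan
/// or a single pre-computed batch (used for aggregates).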
#[derive(Clone)]
enum SelectStream<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    Projection {
        table: Arc<ExecutorTable<P>>,
        projections: Vec<ScanProjection>,
        filter_expr: LlkvExpr<'static, FieldId>,
        options: ScanStreamOptions<P>,
        full_table_scan: bool,
    },
    Aggregation {
        batch: RecordBatch,
    },
}

impl<P> SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn new_projection(
        table_name: String,
        schema: Arc<Schema>,
        table: Arc<ExecutorTable<P>>,
        projections: Vec<ScanProjection>,
        filter_expr: LlkvExpr<'static, FieldId>,
        options: ScanStreamOptions<P>,
        full_table_scan: bool,
    ) -> Self {
        Self {
            table_name,
            schema,
            stream: SelectStream::Projection {
                table,
                projections,
                filter_expr,
                options,
                full_table_scan,
            },
        }
    }

    fn new_single_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
        Self {
            table_name,
            schema,
            stream: SelectStream::Aggregation { batch },
        }
    }

    pub fn from_batch(table_name: String, schema: Arc<Schema>, batch: RecordBatch) -> Self {
        Self::new_single_batch(table_name, schema, batch)
    }

    pub fn table_name(&self) -> &str {
        &self.table_name
    }

    pub fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

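    /// Drives the underlying scan, invoking `on_batch` for every
    /// `RecordBatch` produced. For full-table wildcard scans, rows the
    /// column store did not yield are synthesized as all-null batches so
    /// the output matches the table's total row count.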
    pub fn stream(
        self,
        mut on_batch: impl FnMut(RecordBatch) -> ExecutorResult<()>,
    ) -> ExecutorResult<()> {
        let schema = Arc::clone(&self.schema);
        match self.stream {
            SelectStream::Projection {
                table,
                projections,
                filter_expr,
                options,
                full_table_scan,
            } => {
                let total_rows = table.total_rows.load(Ordering::SeqCst);
                if total_rows == 0 {
                    return Ok(());
                }

                let mut error: Option<Error> = None;
                let mut produced = false;
                let mut produced_rows: u64 = 0;
                // With NULLS FIRST ordering the scan output must be buffered
                // so synthesized null rows can be emitted ahead of it.
                let capture_nulls_first = matches!(options.order, Some(spec) if spec.nulls_first);
                let include_nulls = options.include_nulls;
                let has_row_id_filter = options.row_id_filter.is_some();
                let scan_options = options;
                let mut buffered_batches: Vec<RecordBatch> = Vec::new();
                table
                    .table
                    .scan_stream(projections, &filter_expr, scan_options, |batch| {
                        if error.is_some() {
                            return;
                        }
                        produced = true;
                        produced_rows = produced_rows.saturating_add(batch.num_rows() as u64);
                        if capture_nulls_first {
                            buffered_batches.push(batch);
                        } else if let Err(err) = on_batch(batch) {
                            error = Some(err);
                        }
                    })?;
                if let Some(err) = error {
                    return Err(err);
                }
                if !produced {
                    if total_rows > 0 {
                        for batch in synthesize_null_scan(Arc::clone(&schema), total_rows)? {
                            on_batch(batch)?;
                        }
                    }
                    return Ok(());
                }
                let mut null_batches: Vec<RecordBatch> = Vec::new();
                if include_nulls
                    && full_table_scan
                    && produced_rows < total_rows
                    && !has_row_id_filter
                {
                    let missing = total_rows - produced_rows;
                    if missing > 0 {
                        null_batches = synthesize_null_scan(Arc::clone(&schema), missing)?;
                    }
                }

                if capture_nulls_first {
                    for batch in null_batches {
                        on_batch(batch)?;
                    }
                    for batch in buffered_batches {
                        on_batch(batch)?;
                    }
                } else {
                    for batch in null_batches {
                        on_batch(batch)?;
                    }
                }
                Ok(())
            }
            SelectStream::Aggregation { batch } => on_batch(batch),
        }
    }

    pub fn collect(self) -> ExecutorResult<Vec<RecordBatch>> {
        let mut batches = Vec::new();
        self.stream(|batch| {
            batches.push(batch);
            Ok(())
        })?;
        Ok(batches)
    }

    pub fn collect_rows(self) -> ExecutorResult<RowBatch> {
        let schema = self.schema();
        let mut rows: Vec<Vec<PlanValue>> = Vec::new();
        self.stream(|batch| {
            for row_idx in 0..batch.num_rows() {
                let mut row: Vec<PlanValue> = Vec::with_capacity(batch.num_columns());
                for col_idx in 0..batch.num_columns() {
                    let value = llkv_plan::plan_value_from_array(batch.column(col_idx), row_idx)?;
                    row.push(value);
                }
                rows.push(row);
            }
            Ok(())
        })?;
        let columns = schema
            .fields()
            .iter()
            .map(|field| field.name().to_string())
            .collect();
        Ok(RowBatch { columns, rows })
    }

    pub fn into_rows(self) -> ExecutorResult<Vec<Vec<PlanValue>>> {
        Ok(self.collect_rows()?.rows)
    }
}

impl<P> fmt::Debug for SelectExecution<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SelectExecution")
            .field("table_name", &self.table_name)
            .field("schema", &self.schema)
            .finish()
    }
}

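/// A resolved table handle plus the executor-side metadata needed to plan
/// scans: the column schema, the next row id to assign, and a live count of
/// total rows.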
pub struct ExecutorTable<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    pub table: Arc<llkv_table::table::Table<P>>,
    pub schema: Arc<ExecutorSchema>,
    pub next_row_id: AtomicU64,
    pub total_rows: AtomicU64,
}

pub struct ExecutorSchema {
    pub columns: Vec<ExecutorColumn>,
    pub lookup: FxHashMap<String, usize>,
}

impl ExecutorSchema {
    /// Case-insensitive column lookup by name.
    pub fn resolve(&self, name: &str) -> Option<&ExecutorColumn> {
        let normalized = name.to_ascii_lowercase();
        self.lookup
            .get(&normalized)
            .and_then(|idx| self.columns.get(*idx))
    }

    pub fn first_field_id(&self) -> Option<FieldId> {
        self.columns.first().map(|col| col.field_id)
    }

    pub fn column_by_field_id(&self, field_id: FieldId) -> Option<&ExecutorColumn> {
        self.columns.iter().find(|col| col.field_id == field_id)
    }
}

#[derive(Clone)]
pub struct ExecutorColumn {
    pub name: String,
    pub data_type: DataType,
    pub nullable: bool,
    pub primary_key: bool,
    pub field_id: FieldId,
}

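/// A fully materialized result set: column names plus row-major values.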
pub struct RowBatch {
    pub columns: Vec<String>,
    pub rows: Vec<Vec<PlanValue>>,
}

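/// Translates an ORDER BY plan into a `ScanOrderSpec`, resolving the target
/// column either by name or by projection position.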
fn resolve_scan_order<P>(
    table: &ExecutorTable<P>,
    projections: &[ScanProjection],
    order_plan: &OrderByPlan,
) -> ExecutorResult<ScanOrderSpec>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    let (column, field_id) = match &order_plan.target {
        OrderTarget::Column(name) => {
            let column = table.schema.resolve(name).ok_or_else(|| {
                Error::InvalidArgumentError(format!("unknown column '{}' in ORDER BY", name))
            })?;
            (column, column.field_id)
        }
        OrderTarget::Index(position) => {
            let projection = projections.get(*position).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "ORDER BY position {} is out of range",
                    position + 1
                ))
            })?;
            match projection {
                ScanProjection::Column(store_projection) => {
                    let field_id = store_projection.logical_field_id.field_id();
                    let column = table.schema.column_by_field_id(field_id).ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "unknown column with field id {field_id} in ORDER BY"
                        ))
                    })?;
                    (column, field_id)
                }
                ScanProjection::Computed { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "ORDER BY position referring to computed projection is not supported"
                            .into(),
                    ));
                }
            }
        }
    };

    let transform = match order_plan.sort_type {
        OrderSortType::Native => match column.data_type {
            DataType::Int64 => ScanOrderTransform::IdentityInteger,
            DataType::Utf8 => ScanOrderTransform::IdentityUtf8,
            ref other => {
                return Err(Error::InvalidArgumentError(format!(
                    "ORDER BY on column type {:?} is not supported",
                    other
                )));
            }
        },
        OrderSortType::CastTextToInteger => {
            if column.data_type != DataType::Utf8 {
                return Err(Error::InvalidArgumentError(
                    "ORDER BY CAST expects a text column".into(),
                ));
            }
            ScanOrderTransform::CastUtf8ToInteger
        }
    };

    let direction = if order_plan.ascending {
        ScanOrderDirection::Ascending
    } else {
        ScanOrderDirection::Descending
    };

    Ok(ScanOrderSpec {
        field_id,
        direction,
        nulls_first: order_plan.nulls_first,
        transform,
    })
}

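/// Builds a match-everything predicate over `field_id` (an unbounded range),
/// used when a scan has no WHERE clause.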
fn full_table_scan_filter(field_id: FieldId) -> LlkvExpr<'static, FieldId> {
    LlkvExpr::Pred(Filter {
        field_id,
        op: Operator::Range {
            lower: Bound::Unbounded,
            upper: Bound::Unbounded,
        },
    })
}

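/// Builds a single batch of `total_rows` all-null rows matching `schema`,
/// used to pad full-table scans for rows the column store did not yield.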
fn synthesize_null_scan(schema: Arc<Schema>, total_rows: u64) -> ExecutorResult<Vec<RecordBatch>> {
    let row_count = usize::try_from(total_rows).map_err(|_| {
        Error::InvalidArgumentError("table row count exceeds supported in-memory batch size".into())
    })?;

    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    for field in schema.fields() {
        match field.data_type() {
            DataType::Int64 => {
                let mut builder = Int64Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Float64 => {
                let mut builder = arrow::array::Float64Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Utf8 => {
                let mut builder = arrow::array::StringBuilder::with_capacity(row_count, 0);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            DataType::Date32 => {
                let mut builder = arrow::array::Date32Builder::with_capacity(row_count);
                for _ in 0..row_count {
                    builder.append_null();
                }
                arrays.push(Arc::new(builder.finish()));
            }
            other => {
                return Err(Error::InvalidArgumentError(format!(
                    "unsupported data type in null synthesis: {other:?}"
                )));
            }
        }
    }

    let batch = RecordBatch::try_new(schema, arrays)?;
    Ok(vec![batch])
}

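/// Rewrites a name-based predicate tree into the field-id form the table
/// scan layer expects, resolving every column reference through `schema`.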
fn translate_predicate(
    expr: llkv_expr::expr::Expr<'static, String>,
    schema: &ExecutorSchema,
) -> ExecutorResult<llkv_expr::expr::Expr<'static, FieldId>> {
    use llkv_expr::expr::Expr;
    match expr {
        Expr::And(exprs) => {
            let translated: Result<Vec<_>, _> = exprs
                .into_iter()
                .map(|e| translate_predicate(e, schema))
                .collect();
            Ok(Expr::And(translated?))
        }
        Expr::Or(exprs) => {
            let translated: Result<Vec<_>, _> = exprs
                .into_iter()
                .map(|e| translate_predicate(e, schema))
                .collect();
            Ok(Expr::Or(translated?))
        }
        Expr::Not(inner) => {
            let translated = translate_predicate(*inner, schema)?;
            Ok(Expr::Not(Box::new(translated)))
        }
        Expr::Pred(filter) => {
            let column = schema.resolve(&filter.field_id).ok_or_else(|| {
                Error::InvalidArgumentError(format!("unknown column '{}'", filter.field_id))
            })?;
            Ok(Expr::Pred(Filter {
                field_id: column.field_id,
                op: filter.op,
            }))
        }
        Expr::Compare { left, op, right } => Ok(Expr::Compare {
            left: translate_scalar(&left, schema)?,
            op,
            right: translate_scalar(&right, schema)?,
        }),
    }
}

fn translate_scalar(
    expr: &ScalarExpr<String>,
    schema: &ExecutorSchema,
) -> ExecutorResult<ScalarExpr<FieldId>> {
    match expr {
        ScalarExpr::Literal(lit) => Ok(ScalarExpr::Literal(lit.clone())),
        ScalarExpr::Column(name) => {
            let column = schema
                .resolve(name)
                .ok_or_else(|| Error::InvalidArgumentError(format!("unknown column '{}'", name)))?;
            Ok(ScalarExpr::Column(column.field_id))
        }
        ScalarExpr::Binary { left, op, right } => Ok(ScalarExpr::Binary {
            left: Box::new(translate_scalar(left, schema)?),
            op: *op,
            right: Box::new(translate_scalar(right, schema)?),
        }),
        ScalarExpr::Aggregate(agg) => {
            use llkv_expr::expr::AggregateCall;

            let translated_agg = match agg {
                AggregateCall::CountStar => AggregateCall::CountStar,
                AggregateCall::Count(name) => {
                    let column = schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    AggregateCall::Count(column.field_id)
                }
                AggregateCall::Sum(name) => {
                    let column = schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    AggregateCall::Sum(column.field_id)
                }
                AggregateCall::Min(name) => {
                    let column = schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    AggregateCall::Min(column.field_id)
                }
                AggregateCall::Max(name) => {
                    let column = schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    AggregateCall::Max(column.field_id)
                }
                AggregateCall::CountNulls(name) => {
                    let column = schema.resolve(name).ok_or_else(|| {
                        Error::InvalidArgumentError(format!("unknown column '{}'", name))
                    })?;
                    AggregateCall::CountNulls(column.field_id)
                }
            };
            Ok(ScalarExpr::Aggregate(translated_agg))
        }
    }
}
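
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::Field;

    // A minimal sketch (not exhaustive) checking that `synthesize_null_scan`
    // emits the requested number of all-null rows for a supported schema.
    #[test]
    fn null_scan_pads_with_nulls() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
        let batches = synthesize_null_scan(Arc::clone(&schema), 3).expect("synthesis");
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 3);
        assert_eq!(batches[0].column(0).null_count(), 3);
    }

    // Sketch of name-to-field-id translation; assumes `FieldId` is an
    // integer alias, as its use in `format!("col{field_id}")` suggests.
    #[test]
    fn scalar_columns_translate_to_field_ids() {
        let schema = ExecutorSchema {
            columns: vec![ExecutorColumn {
                name: "a".to_string(),
                data_type: DataType::Int64,
                nullable: true,
                primary_key: false,
                field_id: 7,
            }],
            lookup: FxHashMap::from_iter([("a".to_string(), 0)]),
        };
        // Lookup is case-insensitive: `resolve` lowercases the name first.
        match translate_scalar(&ScalarExpr::Column("A".to_string()), &schema) {
            Ok(ScalarExpr::Column(fid)) => assert_eq!(fid, 7),
            _ => panic!("expected a translated column reference"),
        }
    }
}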