1use alopex_core::kv::KVStore;
2
3use crate::ast::LITERAL_TABLE;
4use crate::catalog::{Catalog, StorageType};
5use crate::executor::evaluator::EvalContext;
6use crate::executor::memory::MemoryPolicy;
7use crate::executor::{ExecutionResult, ExecutorError, QueryRowIterator, Result};
8use crate::planner::logical_plan::LogicalPlan;
9use crate::planner::typed_expr::{Projection, SortExpr};
10use crate::storage::{SqlTxn, SqlValue};
11
12use super::{ColumnInfo, Row};
13
14pub mod aggregate;
15pub mod columnar_scan;
16pub mod iterator;
17mod knn;
18mod project;
19mod scan;
20
21pub use columnar_scan::{ColumnarScanIterator, create_columnar_scan_iterator};
22pub use iterator::{FilterIterator, LimitIterator, RowIterator, ScanIterator, SortIterator};
23pub use scan::create_scan_iterator;
24
25pub fn execute_query<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
37 txn: &mut T,
38 catalog: &C,
39 plan: LogicalPlan,
40) -> Result<ExecutionResult> {
41 execute_query_with_policy(txn, catalog, plan, None)
42}
43
44pub fn execute_query_with_policy<
45 'txn,
46 S: KVStore + 'txn,
47 C: Catalog + ?Sized,
48 T: SqlTxn<'txn, S>,
49>(
50 txn: &mut T,
51 catalog: &C,
52 plan: LogicalPlan,
53 memory: Option<&MemoryPolicy>,
54) -> Result<ExecutionResult> {
55 if let Some((pattern, projection, filter)) = knn::extract_knn_context(&plan) {
56 return knn::execute_knn_query(txn, catalog, &pattern, &projection, filter.as_ref());
57 }
58
59 let (mut iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan, memory)?;
60
61 let mut rows = Vec::new();
63 while let Some(result) = iter.next_row() {
64 rows.push(result?);
65 }
66
67 let result = project::execute_project(rows, &projection, &schema)?;
68 Ok(ExecutionResult::Query(result))
69}
70
71pub fn execute_query_streaming<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
87 txn: &mut T,
88 catalog: &C,
89 plan: LogicalPlan,
90) -> Result<QueryRowIterator<'static>> {
91 execute_query_streaming_with_policy(txn, catalog, plan, None)
92}
93
94pub fn execute_query_streaming_with_policy<
95 'txn,
96 S: KVStore + 'txn,
97 C: Catalog + ?Sized,
98 T: SqlTxn<'txn, S>,
99>(
100 txn: &mut T,
101 catalog: &C,
102 plan: LogicalPlan,
103 memory: Option<&MemoryPolicy>,
104) -> Result<QueryRowIterator<'static>> {
105 if knn::extract_knn_context(&plan).is_some() {
107 let result = execute_query_with_policy(txn, catalog, plan, memory)?;
109 if let ExecutionResult::Query(qr) = result {
110 let column_names: Vec<String> = qr.columns.iter().map(|c| c.name.clone()).collect();
111 let schema: Vec<crate::catalog::ColumnMetadata> = qr
112 .columns
113 .iter()
114 .map(|c| crate::catalog::ColumnMetadata::new(&c.name, c.data_type.clone()))
115 .collect();
116 let rows: Vec<Row> = qr
117 .rows
118 .into_iter()
119 .enumerate()
120 .map(|(i, values)| Row::new(i as u64, values))
121 .collect();
122 let iter = iterator::VecIterator::new(rows, schema.clone());
123 return Ok(QueryRowIterator::new(
124 Box::new(iter),
125 Projection::All(column_names),
126 schema,
127 ));
128 }
129 return Err(ExecutorError::InvalidOperation {
130 operation: "execute_query_streaming".into(),
131 reason: "KNN query did not return Query result".into(),
132 });
133 }
134
135 let (iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan, memory)?;
136
137 Ok(QueryRowIterator::new(iter, projection, schema))
138}
139
140fn build_iterator_pipeline<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
147 txn: &mut T,
148 catalog: &C,
149 plan: LogicalPlan,
150 memory: Option<&MemoryPolicy>,
151) -> Result<(
152 Box<dyn RowIterator>,
153 Projection,
154 Vec<crate::catalog::ColumnMetadata>,
155)> {
156 match plan {
157 LogicalPlan::Scan { table, projection } => {
158 if table == LITERAL_TABLE {
159 let schema = Vec::new();
160 let rows = vec![Row::new(0, Vec::new())];
161 let iter = iterator::VecIterator::new(rows, schema.clone());
162 return Ok((Box::new(iter), projection, schema));
163 }
164 let table_meta = catalog
165 .get_table(&table)
166 .cloned()
167 .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
168
169 if table_meta.storage_options.storage_type == StorageType::Columnar {
170 let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
171 let rows = columnar_scan::execute_columnar_scan(txn, &table_meta, &columnar_scan)?;
172 let schema = table_meta.columns.clone();
173 let iter = iterator::VecIterator::new(rows, schema.clone());
174 return Ok((Box::new(iter), projection, schema));
175 }
176
177 let rows = scan::execute_scan(txn, &table_meta)?;
181 let schema = table_meta.columns.clone();
182
183 let iter = iterator::VecIterator::new(rows, schema.clone());
185 Ok((Box::new(iter), projection, schema))
186 }
187 LogicalPlan::Filter { input, predicate } => {
188 if let LogicalPlan::Scan { table, projection } = input.as_ref()
189 && let Some(table_meta) = catalog.get_table(table)
190 && table_meta.storage_options.storage_type == StorageType::Columnar
191 {
192 let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
193 table_meta,
194 projection.clone(),
195 &predicate,
196 );
197 let rows = columnar_scan::execute_columnar_scan(txn, table_meta, &columnar_scan)?;
198 let schema = table_meta.columns.clone();
199 let iter = iterator::VecIterator::new(rows, schema.clone());
200 return Ok((Box::new(iter), projection.clone(), schema));
201 }
202 let (input_iter, projection, schema) =
203 build_iterator_pipeline(txn, catalog, *input, memory)?;
204 let filter_iter = FilterIterator::new(input_iter, predicate);
205 Ok((Box::new(filter_iter), projection, schema))
206 }
207 LogicalPlan::Aggregate {
208 input,
209 group_keys,
210 aggregates,
211 having,
212 projection,
213 } => {
214 let (input_iter, _projection, _schema) =
215 build_iterator_pipeline(txn, catalog, *input, memory)?;
216 let schema = aggregate::build_aggregate_schema(&group_keys, &aggregates);
217 if let Some(policy) = memory
218 && policy.spill_directory().is_some()
219 {
220 if group_keys.is_empty() {
221 let iter = aggregate::StreamingAggregateIterator::new(
222 input_iter,
223 group_keys,
224 aggregates,
225 having,
226 schema.clone(),
227 );
228 return Ok((Box::new(iter), projection, schema));
229 }
230 let order_by = group_keys
231 .iter()
232 .cloned()
233 .map(|expr| SortExpr {
234 expr,
235 asc: true,
236 nulls_first: false,
237 })
238 .collect::<Vec<_>>();
239 let sort_iter =
240 SortIterator::new_with_policy(input_iter, &order_by, Some(policy.clone()))?;
241 let iter = aggregate::StreamingAggregateIterator::new(
242 Box::new(sort_iter),
243 group_keys,
244 aggregates,
245 having,
246 schema.clone(),
247 );
248 return Ok((Box::new(iter), projection, schema));
249 }
250
251 let mut iter = aggregate::AggregateIterator::new(
252 input_iter,
253 group_keys,
254 aggregates,
255 having,
256 schema.clone(),
257 );
258 if let Some(policy) = memory {
259 iter = iter.with_memory_policy(Some(policy.clone()));
260 }
261 Ok((Box::new(iter), projection, schema))
262 }
263 LogicalPlan::Sort { input, order_by } => {
264 let (input_iter, projection, schema) =
265 build_iterator_pipeline(txn, catalog, *input, memory)?;
266 let sort_iter = if let Some(policy) = memory {
267 SortIterator::new_with_policy(input_iter, &order_by, Some(policy.clone()))?
268 } else {
269 SortIterator::new(input_iter, &order_by)?
270 };
271 Ok((Box::new(sort_iter), projection, schema))
272 }
273 LogicalPlan::Limit {
274 input,
275 limit,
276 offset,
277 } => {
278 let (input_iter, projection, schema) =
279 build_iterator_pipeline(txn, catalog, *input, memory)?;
280 let limit_iter = LimitIterator::new(input_iter, limit, offset);
281 Ok((Box::new(limit_iter), projection, schema))
282 }
283 other => Err(ExecutorError::UnsupportedOperation(format!(
284 "unsupported query plan: {other:?}"
285 ))),
286 }
287}
288
289pub fn build_streaming_pipeline<
301 'a,
302 'txn: 'a,
303 S: KVStore + 'txn,
304 C: Catalog + ?Sized,
305 T: SqlTxn<'txn, S>,
306>(
307 txn: &'a mut T,
308 catalog: &C,
309 plan: LogicalPlan,
310) -> Result<(
311 Box<dyn RowIterator + 'a>,
312 Projection,
313 Vec<crate::catalog::ColumnMetadata>,
314)> {
315 build_streaming_pipeline_with_policy(txn, catalog, plan, None)
316}
317
318pub fn build_streaming_pipeline_with_policy<
319 'a,
320 'txn: 'a,
321 S: KVStore + 'txn,
322 C: Catalog + ?Sized,
323 T: SqlTxn<'txn, S>,
324>(
325 txn: &'a mut T,
326 catalog: &C,
327 plan: LogicalPlan,
328 memory: Option<&MemoryPolicy>,
329) -> Result<(
330 Box<dyn RowIterator + 'a>,
331 Projection,
332 Vec<crate::catalog::ColumnMetadata>,
333)> {
334 build_streaming_pipeline_inner(txn, catalog, plan, memory)
335}
336
337fn build_streaming_pipeline_inner<
339 'a,
340 'txn: 'a,
341 S: KVStore + 'txn,
342 C: Catalog + ?Sized,
343 T: SqlTxn<'txn, S>,
344>(
345 txn: &'a mut T,
346 catalog: &C,
347 plan: LogicalPlan,
348 memory: Option<&MemoryPolicy>,
349) -> Result<(
350 Box<dyn RowIterator + 'a>,
351 Projection,
352 Vec<crate::catalog::ColumnMetadata>,
353)> {
354 match plan {
355 LogicalPlan::Scan { table, projection } => {
356 if table == LITERAL_TABLE {
357 let schema = Vec::new();
358 let rows = vec![Row::new(0, Vec::new())];
359 let iter = iterator::VecIterator::new(rows, schema.clone());
360 return Ok((Box::new(iter), projection, schema));
361 }
362 let table_meta = catalog
363 .get_table(&table)
364 .cloned()
365 .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
366
367 if table_meta.storage_options.storage_type == StorageType::Columnar {
368 let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
370 let schema = table_meta.columns.clone();
371 let iter =
372 columnar_scan::create_columnar_scan_iterator(txn, &table_meta, &columnar_scan)?;
373 return Ok((Box::new(iter), projection, schema));
374 }
375
376 let schema = table_meta.columns.clone();
378 let scan_iter = scan::create_scan_iterator(txn, &table_meta)?;
379 Ok((Box::new(scan_iter), projection, schema))
380 }
381 LogicalPlan::Filter { input, predicate } => {
382 if let LogicalPlan::Scan { table, projection } = input.as_ref()
383 && let Some(table_meta) = catalog.get_table(table)
384 && table_meta.storage_options.storage_type == StorageType::Columnar
385 {
386 let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
388 table_meta,
389 projection.clone(),
390 &predicate,
391 );
392 let schema = table_meta.columns.clone();
393 let iter =
394 columnar_scan::create_columnar_scan_iterator(txn, table_meta, &columnar_scan)?;
395 return Ok((Box::new(iter), projection.clone(), schema));
396 }
397 let (input_iter, projection, schema) =
398 build_streaming_pipeline_inner(txn, catalog, *input, memory)?;
399 let filter_iter = FilterIterator::new(input_iter, predicate);
400 Ok((Box::new(filter_iter), projection, schema))
401 }
402 LogicalPlan::Aggregate {
403 input,
404 group_keys,
405 aggregates,
406 having,
407 projection,
408 } => {
409 let (input_iter, _projection, _schema) =
410 build_streaming_pipeline_inner(txn, catalog, *input, memory)?;
411 let schema = aggregate::build_aggregate_schema(&group_keys, &aggregates);
412 if let Some(policy) = memory
413 && policy.spill_directory().is_some()
414 {
415 if group_keys.is_empty() {
416 let iter = aggregate::StreamingAggregateIterator::new(
417 input_iter,
418 group_keys,
419 aggregates,
420 having,
421 schema.clone(),
422 );
423 return Ok((Box::new(iter), projection, schema));
424 }
425 let order_by = group_keys
426 .iter()
427 .cloned()
428 .map(|expr| SortExpr {
429 expr,
430 asc: true,
431 nulls_first: false,
432 })
433 .collect::<Vec<_>>();
434 let sort_iter =
435 SortIterator::new_with_policy(input_iter, &order_by, Some(policy.clone()))?;
436 let iter = aggregate::StreamingAggregateIterator::new(
437 Box::new(sort_iter),
438 group_keys,
439 aggregates,
440 having,
441 schema.clone(),
442 );
443 return Ok((Box::new(iter), projection, schema));
444 }
445
446 let mut iter = aggregate::AggregateIterator::new(
447 input_iter,
448 group_keys,
449 aggregates,
450 having,
451 schema.clone(),
452 );
453 if let Some(policy) = memory {
454 iter = iter.with_memory_policy(Some(policy.clone()));
455 }
456 Ok((Box::new(iter), projection, schema))
457 }
458 LogicalPlan::Sort { input, order_by } => {
459 let (input_iter, projection, schema) =
460 build_streaming_pipeline_inner(txn, catalog, *input, memory)?;
461 let sort_iter = if let Some(policy) = memory {
462 SortIterator::new_with_policy(input_iter, &order_by, Some(policy.clone()))?
463 } else {
464 SortIterator::new(input_iter, &order_by)?
465 };
466 Ok((Box::new(sort_iter), projection, schema))
467 }
468 LogicalPlan::Limit {
469 input,
470 limit,
471 offset,
472 } => {
473 let (input_iter, projection, schema) =
474 build_streaming_pipeline_inner(txn, catalog, *input, memory)?;
475 let limit_iter = LimitIterator::new(input_iter, limit, offset);
476 Ok((Box::new(limit_iter), projection, schema))
477 }
478 other => Err(ExecutorError::UnsupportedOperation(format!(
479 "unsupported query plan: {other:?}"
480 ))),
481 }
482}
483
484fn eval_expr(expr: &crate::planner::typed_expr::TypedExpr, row: &Row) -> Result<SqlValue> {
486 let ctx = EvalContext::new(&row.values);
487 crate::executor::evaluator::evaluate(expr, &ctx)
488}
489
490fn column_name_from_projection(
492 projected: &crate::planner::typed_expr::ProjectedColumn,
493 idx: usize,
494) -> String {
495 projected
496 .alias
497 .clone()
498 .or_else(|| match &projected.expr.kind {
499 crate::planner::typed_expr::TypedExprKind::ColumnRef { column, .. } => {
500 Some(column.clone())
501 }
502 _ => None,
503 })
504 .unwrap_or_else(|| format!("col_{idx}"))
505}
506
507fn column_info_from_projection(
509 projected: &crate::planner::typed_expr::ProjectedColumn,
510 idx: usize,
511) -> ColumnInfo {
512 ColumnInfo::new(
513 column_name_from_projection(projected, idx),
514 projected.expr.resolved_type.clone(),
515 )
516}
517
518fn column_infos_from_all(
520 schema: &[crate::catalog::ColumnMetadata],
521 names: &[String],
522) -> Result<Vec<ColumnInfo>> {
523 names
524 .iter()
525 .map(|name| {
526 let col = schema
527 .iter()
528 .find(|c| &c.name == name)
529 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
530 Ok(ColumnInfo::new(name.clone(), col.data_type.clone()))
531 })
532 .collect()
533}
534
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::{ColumnMetadata, MemoryCatalog, TableMetadata};
    use crate::executor::ddl::create_table::execute_create_table;
    use crate::planner::typed_expr::TypedExpr;
    use crate::planner::types::ResolvedType;
    use crate::storage::TxnBridge;
    use alopex_core::kv::memory::MemoryKV;
    use std::sync::Arc;

    /// A plain scan over a freshly populated table returns the inserted row
    /// with both projected columns.
    #[test]
    fn execute_query_scan_only_returns_rows() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut catalog = MemoryCatalog::new();

        // Create `users (id, name)` in its own committed transaction.
        let users = TableMetadata::new(
            "users",
            vec![
                ColumnMetadata::new("id", ResolvedType::Integer),
                ColumnMetadata::new("name", ResolvedType::Text),
            ],
        );
        let mut ddl_txn = bridge.begin_write().unwrap();
        execute_create_table(&mut ddl_txn, &mut catalog, users, vec![], false).unwrap();
        ddl_txn.commit().unwrap();

        // Insert (1, 'alice') in a second transaction, which is then queried.
        let mut txn = bridge.begin_write().unwrap();
        let id_value = TypedExpr::literal(
            crate::ast::expr::Literal::Number("1".into()),
            ResolvedType::Integer,
            crate::Span::default(),
        );
        let name_value = TypedExpr::literal(
            crate::ast::expr::Literal::String("alice".into()),
            ResolvedType::Text,
            crate::Span::default(),
        );
        crate::executor::dml::execute_insert(
            &mut txn,
            &catalog,
            "users",
            vec!["id".into(), "name".into()],
            vec![vec![id_value, name_value]],
        )
        .unwrap();

        let plan = LogicalPlan::scan(
            "users".into(),
            Projection::All(vec!["id".into(), "name".into()]),
        );
        let result = execute_query(&mut txn, &catalog, plan).unwrap();

        let query = match result {
            ExecutionResult::Query(q) => q,
            other => panic!("unexpected result {other:?}"),
        };
        assert_eq!(query.rows.len(), 1);
        assert_eq!(query.columns.len(), 2);
        assert_eq!(
            query.rows[0],
            vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]
        );
    }
}