//! SELECT query execution pipeline (alopex_sql/executor/query/mod.rs).
1use alopex_core::kv::KVStore;
2
3use crate::ast::LITERAL_TABLE;
4use crate::catalog::{Catalog, StorageType};
5use crate::executor::evaluator::EvalContext;
6use crate::executor::memory::MemoryPolicy;
7use crate::executor::{ExecutionResult, ExecutorError, QueryRowIterator, Result};
8use crate::planner::logical_plan::LogicalPlan;
9use crate::planner::typed_expr::{Projection, SortExpr};
10use crate::storage::{SqlTxn, SqlValue};
11
12use super::{ColumnInfo, Row};
13
14pub mod aggregate;
15pub mod columnar_scan;
16pub mod iterator;
17mod knn;
18mod project;
19mod scan;
20
21pub use columnar_scan::{ColumnarScanIterator, create_columnar_scan_iterator};
22pub use iterator::{FilterIterator, LimitIterator, RowIterator, ScanIterator, SortIterator};
23pub use scan::create_scan_iterator;
24
25/// Execute a SELECT logical plan and return a query result.
26///
27/// This function uses an iterator-based execution model that processes rows
28/// through a pipeline of operators. This approach:
29/// - Enables early termination for LIMIT queries
30/// - Provides streaming execution after the initial scan
31/// - Allows composable query operators
32///
33/// Note: The Scan stage reads all matching rows into memory, but subsequent
34/// operators (Filter, Sort, Limit) process rows through an iterator pipeline.
35/// Sort operations additionally require materializing all input rows.
36pub fn execute_query<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
37    txn: &mut T,
38    catalog: &C,
39    plan: LogicalPlan,
40) -> Result<ExecutionResult> {
41    execute_query_with_policy(txn, catalog, plan, None)
42}
43
44pub fn execute_query_with_policy<
45    'txn,
46    S: KVStore + 'txn,
47    C: Catalog + ?Sized,
48    T: SqlTxn<'txn, S>,
49>(
50    txn: &mut T,
51    catalog: &C,
52    plan: LogicalPlan,
53    memory: Option<&MemoryPolicy>,
54) -> Result<ExecutionResult> {
55    if let Some((pattern, projection, filter)) = knn::extract_knn_context(&plan) {
56        return knn::execute_knn_query(txn, catalog, &pattern, &projection, filter.as_ref());
57    }
58
59    let (mut iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan, memory)?;
60
61    // Collect rows from iterator and apply projection
62    let mut rows = Vec::new();
63    while let Some(result) = iter.next_row() {
64        rows.push(result?);
65    }
66
67    let result = project::execute_project(rows, &projection, &schema)?;
68    Ok(ExecutionResult::Query(result))
69}
70
71/// Execute a SELECT logical plan and return a streaming query result.
72///
73/// This function returns a `QueryRowIterator` that yields rows one at a time,
74/// enabling true streaming output without materializing all rows upfront.
75///
76/// # FR-7 Streaming Output
77///
78/// This function implements the FR-7 requirement for streaming output.
79/// Rows are yielded through an iterator interface, and projection is applied
80/// on-the-fly as each row is consumed.
81///
82/// # Note
83///
84/// KNN queries currently fall back to the non-streaming path as they require
85/// specialized handling.
86pub fn execute_query_streaming<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
87    txn: &mut T,
88    catalog: &C,
89    plan: LogicalPlan,
90) -> Result<QueryRowIterator<'static>> {
91    execute_query_streaming_with_policy(txn, catalog, plan, None)
92}
93
94pub fn execute_query_streaming_with_policy<
95    'txn,
96    S: KVStore + 'txn,
97    C: Catalog + ?Sized,
98    T: SqlTxn<'txn, S>,
99>(
100    txn: &mut T,
101    catalog: &C,
102    plan: LogicalPlan,
103    memory: Option<&MemoryPolicy>,
104) -> Result<QueryRowIterator<'static>> {
105    // KNN queries not yet supported for streaming - fall back would need different handling
106    if knn::extract_knn_context(&plan).is_some() {
107        // For KNN, we materialize and wrap in VecIterator
108        let result = execute_query_with_policy(txn, catalog, plan, memory)?;
109        if let ExecutionResult::Query(qr) = result {
110            let column_names: Vec<String> = qr.columns.iter().map(|c| c.name.clone()).collect();
111            let schema: Vec<crate::catalog::ColumnMetadata> = qr
112                .columns
113                .iter()
114                .map(|c| crate::catalog::ColumnMetadata::new(&c.name, c.data_type.clone()))
115                .collect();
116            let rows: Vec<Row> = qr
117                .rows
118                .into_iter()
119                .enumerate()
120                .map(|(i, values)| Row::new(i as u64, values))
121                .collect();
122            let iter = iterator::VecIterator::new(rows, schema.clone());
123            return Ok(QueryRowIterator::new(
124                Box::new(iter),
125                Projection::All(column_names),
126                schema,
127            ));
128        }
129        return Err(ExecutorError::InvalidOperation {
130            operation: "execute_query_streaming".into(),
131            reason: "KNN query did not return Query result".into(),
132        });
133    }
134
135    let (iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan, memory)?;
136
137    Ok(QueryRowIterator::new(iter, projection, schema))
138}
139
/// Build an iterator pipeline from a logical plan.
///
/// This recursively constructs a tree of iterators that mirrors the logical plan
/// structure. The scan phase reads rows into memory, then subsequent operators
/// process them through an iterator pipeline enabling streaming execution and
/// early termination.
///
/// Returns the root iterator of the pipeline together with the projection to
/// apply on output rows and the column metadata those rows follow.
///
/// `memory` optionally bounds operator memory usage; when it also provides a
/// spill directory, aggregation switches to a sort-then-stream strategy (see
/// the `Aggregate` arm below).
fn build_iterator_pipeline<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
    txn: &mut T,
    catalog: &C,
    plan: LogicalPlan,
    memory: Option<&MemoryPolicy>,
) -> Result<(
    Box<dyn RowIterator>,
    Projection,
    Vec<crate::catalog::ColumnMetadata>,
)> {
    match plan {
        LogicalPlan::Scan { table, projection } => {
            // The literal pseudo-table (SELECT without a real table) is served
            // as a single empty row with an empty schema.
            if table == LITERAL_TABLE {
                let schema = Vec::new();
                let rows = vec![Row::new(0, Vec::new())];
                let iter = iterator::VecIterator::new(rows, schema.clone());
                return Ok((Box::new(iter), projection, schema));
            }
            let table_meta = catalog
                .get_table(&table)
                .cloned()
                .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;

            // Columnar tables take the columnar scan path; results are
            // materialized and replayed through a VecIterator.
            if table_meta.storage_options.storage_type == StorageType::Columnar {
                let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
                let rows = columnar_scan::execute_columnar_scan(txn, &table_meta, &columnar_scan)?;
                let schema = table_meta.columns.clone();
                let iter = iterator::VecIterator::new(rows, schema.clone());
                return Ok((Box::new(iter), projection, schema));
            }

            // TODO: Currently Scan loads all matching rows into memory before
            // handing them to the iterator. To achieve end-to-end streaming,
            // revisit the transaction lifetime design so that ScanIterator can
            // be used here instead.
            let rows = scan::execute_scan(txn, &table_meta)?;
            let schema = table_meta.columns.clone();

            // Wrap in VecIterator for consistent iterator-based processing
            let iter = iterator::VecIterator::new(rows, schema.clone());
            Ok((Box::new(iter), projection, schema))
        }
        LogicalPlan::Filter { input, predicate } => {
            // Filter directly above a columnar Scan: hand the predicate to the
            // columnar scan builder instead of filtering row-by-row.
            if let LogicalPlan::Scan { table, projection } = input.as_ref()
                && let Some(table_meta) = catalog.get_table(table)
                && table_meta.storage_options.storage_type == StorageType::Columnar
            {
                let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
                    table_meta,
                    projection.clone(),
                    &predicate,
                );
                let rows = columnar_scan::execute_columnar_scan(txn, table_meta, &columnar_scan)?;
                let schema = table_meta.columns.clone();
                let iter = iterator::VecIterator::new(rows, schema.clone());
                return Ok((Box::new(iter), projection.clone(), schema));
            }
            // Generic case: build the child pipeline, then wrap it in a
            // row-level FilterIterator.
            let (input_iter, projection, schema) =
                build_iterator_pipeline(txn, catalog, *input, memory)?;
            let filter_iter = FilterIterator::new(input_iter, predicate);
            Ok((Box::new(filter_iter), projection, schema))
        }
        LogicalPlan::Aggregate {
            input,
            group_keys,
            aggregates,
            having,
            projection,
        } => {
            // The child's projection/schema are replaced by the aggregate
            // output schema, hence the underscore bindings.
            let (input_iter, _projection, _schema) =
                build_iterator_pipeline(txn, catalog, *input, memory)?;
            let schema = aggregate::build_aggregate_schema(&group_keys, &aggregates);
            // With a spill directory configured, use the streaming aggregate
            // path instead of the in-memory AggregateIterator.
            if let Some(policy) = memory
                && policy.spill_directory().is_some()
            {
                if group_keys.is_empty() {
                    // Global aggregate (no GROUP BY): no pre-sort needed.
                    let iter = aggregate::StreamingAggregateIterator::new(
                        input_iter,
                        group_keys,
                        aggregates,
                        having,
                        schema.clone(),
                    );
                    return Ok((Box::new(iter), projection, schema));
                }
                // Pre-sort ascending on every group key (nulls last) so equal
                // keys arrive adjacently for the streaming aggregator; the
                // sort itself may spill under the same policy.
                let order_by = group_keys
                    .iter()
                    .cloned()
                    .map(|expr| SortExpr {
                        expr,
                        asc: true,
                        nulls_first: false,
                    })
                    .collect::<Vec<_>>();
                let sort_iter =
                    SortIterator::new_with_policy(input_iter, &order_by, Some(policy.clone()))?;
                let iter = aggregate::StreamingAggregateIterator::new(
                    Box::new(sort_iter),
                    group_keys,
                    aggregates,
                    having,
                    schema.clone(),
                );
                return Ok((Box::new(iter), projection, schema));
            }

            // Default in-memory aggregation; attach the memory policy (if any)
            // so the iterator can enforce its limits.
            let mut iter = aggregate::AggregateIterator::new(
                input_iter,
                group_keys,
                aggregates,
                having,
                schema.clone(),
            );
            if let Some(policy) = memory {
                iter = iter.with_memory_policy(Some(policy.clone()));
            }
            Ok((Box::new(iter), projection, schema))
        }
        LogicalPlan::Sort { input, order_by } => {
            // Sort materializes its input; with a policy it may spill to disk.
            let (input_iter, projection, schema) =
                build_iterator_pipeline(txn, catalog, *input, memory)?;
            let sort_iter = if let Some(policy) = memory {
                SortIterator::new_with_policy(input_iter, &order_by, Some(policy.clone()))?
            } else {
                SortIterator::new(input_iter, &order_by)?
            };
            Ok((Box::new(sort_iter), projection, schema))
        }
        LogicalPlan::Limit {
            input,
            limit,
            offset,
        } => {
            // LIMIT/OFFSET wraps the child and enables early termination.
            let (input_iter, projection, schema) =
                build_iterator_pipeline(txn, catalog, *input, memory)?;
            let limit_iter = LimitIterator::new(input_iter, limit, offset);
            Ok((Box::new(limit_iter), projection, schema))
        }
        other => Err(ExecutorError::UnsupportedOperation(format!(
            "unsupported query plan: {other:?}"
        ))),
    }
}
288
289/// Build a streaming iterator pipeline from a logical plan (FR-7).
290///
291/// This version uses `ScanIterator` for row-based tables to enable true
292/// streaming without materializing all rows upfront. The returned iterator
293/// has lifetime `'a` tied to the transaction borrow.
294///
295/// # Limitations
296///
297/// - Columnar storage still materializes rows (uses VecIterator)
298/// - Sort operations materialize all input rows
299/// - KNN queries are not supported (use `build_iterator_pipeline` instead)
300pub fn build_streaming_pipeline<
301    'a,
302    'txn: 'a,
303    S: KVStore + 'txn,
304    C: Catalog + ?Sized,
305    T: SqlTxn<'txn, S>,
306>(
307    txn: &'a mut T,
308    catalog: &C,
309    plan: LogicalPlan,
310) -> Result<(
311    Box<dyn RowIterator + 'a>,
312    Projection,
313    Vec<crate::catalog::ColumnMetadata>,
314)> {
315    build_streaming_pipeline_with_policy(txn, catalog, plan, None)
316}
317
318pub fn build_streaming_pipeline_with_policy<
319    'a,
320    'txn: 'a,
321    S: KVStore + 'txn,
322    C: Catalog + ?Sized,
323    T: SqlTxn<'txn, S>,
324>(
325    txn: &'a mut T,
326    catalog: &C,
327    plan: LogicalPlan,
328    memory: Option<&MemoryPolicy>,
329) -> Result<(
330    Box<dyn RowIterator + 'a>,
331    Projection,
332    Vec<crate::catalog::ColumnMetadata>,
333)> {
334    build_streaming_pipeline_inner(txn, catalog, plan, memory)
335}
336
/// Inner implementation of the streaming pipeline builder.
///
/// Mirrors [`build_iterator_pipeline`], but scans stream directly from the
/// transaction (`ScanIterator` for row storage, `ColumnarScanIterator` for
/// columnar storage) instead of materializing rows first; the returned
/// iterator therefore borrows the transaction for `'a`.
fn build_streaming_pipeline_inner<
    'a,
    'txn: 'a,
    S: KVStore + 'txn,
    C: Catalog + ?Sized,
    T: SqlTxn<'txn, S>,
>(
    txn: &'a mut T,
    catalog: &C,
    plan: LogicalPlan,
    memory: Option<&MemoryPolicy>,
) -> Result<(
    Box<dyn RowIterator + 'a>,
    Projection,
    Vec<crate::catalog::ColumnMetadata>,
)> {
    match plan {
        LogicalPlan::Scan { table, projection } => {
            // The literal pseudo-table is served as a single empty row with an
            // empty schema.
            if table == LITERAL_TABLE {
                let schema = Vec::new();
                let rows = vec![Row::new(0, Vec::new())];
                let iter = iterator::VecIterator::new(rows, schema.clone());
                return Ok((Box::new(iter), projection, schema));
            }
            let table_meta = catalog
                .get_table(&table)
                .cloned()
                .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;

            if table_meta.storage_options.storage_type == StorageType::Columnar {
                // Columnar storage: use ColumnarScanIterator for FR-7 streaming
                let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
                let schema = table_meta.columns.clone();
                let iter =
                    columnar_scan::create_columnar_scan_iterator(txn, &table_meta, &columnar_scan)?;
                return Ok((Box::new(iter), projection, schema));
            }

            // Row-based storage: use ScanIterator for true streaming (FR-7)
            let schema = table_meta.columns.clone();
            let scan_iter = scan::create_scan_iterator(txn, &table_meta)?;
            Ok((Box::new(scan_iter), projection, schema))
        }
        LogicalPlan::Filter { input, predicate } => {
            // Filter directly above a columnar Scan: hand the predicate to the
            // columnar scan builder instead of filtering row-by-row.
            if let LogicalPlan::Scan { table, projection } = input.as_ref()
                && let Some(table_meta) = catalog.get_table(table)
                && table_meta.storage_options.storage_type == StorageType::Columnar
            {
                // Columnar storage with filter: use ColumnarScanIterator for FR-7 streaming
                let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
                    table_meta,
                    projection.clone(),
                    &predicate,
                );
                let schema = table_meta.columns.clone();
                let iter =
                    columnar_scan::create_columnar_scan_iterator(txn, table_meta, &columnar_scan)?;
                return Ok((Box::new(iter), projection.clone(), schema));
            }
            // Generic case: build the child pipeline, then wrap it in a
            // row-level FilterIterator.
            let (input_iter, projection, schema) =
                build_streaming_pipeline_inner(txn, catalog, *input, memory)?;
            let filter_iter = FilterIterator::new(input_iter, predicate);
            Ok((Box::new(filter_iter), projection, schema))
        }
        LogicalPlan::Aggregate {
            input,
            group_keys,
            aggregates,
            having,
            projection,
        } => {
            // The child's projection/schema are replaced by the aggregate
            // output schema, hence the underscore bindings.
            let (input_iter, _projection, _schema) =
                build_streaming_pipeline_inner(txn, catalog, *input, memory)?;
            let schema = aggregate::build_aggregate_schema(&group_keys, &aggregates);
            // With a spill directory configured, use the streaming aggregate
            // path instead of the in-memory AggregateIterator.
            if let Some(policy) = memory
                && policy.spill_directory().is_some()
            {
                if group_keys.is_empty() {
                    // Global aggregate (no GROUP BY): no pre-sort needed.
                    let iter = aggregate::StreamingAggregateIterator::new(
                        input_iter,
                        group_keys,
                        aggregates,
                        having,
                        schema.clone(),
                    );
                    return Ok((Box::new(iter), projection, schema));
                }
                // Pre-sort ascending on every group key (nulls last) so equal
                // keys arrive adjacently for the streaming aggregator; the
                // sort itself may spill under the same policy.
                let order_by = group_keys
                    .iter()
                    .cloned()
                    .map(|expr| SortExpr {
                        expr,
                        asc: true,
                        nulls_first: false,
                    })
                    .collect::<Vec<_>>();
                let sort_iter =
                    SortIterator::new_with_policy(input_iter, &order_by, Some(policy.clone()))?;
                let iter = aggregate::StreamingAggregateIterator::new(
                    Box::new(sort_iter),
                    group_keys,
                    aggregates,
                    having,
                    schema.clone(),
                );
                return Ok((Box::new(iter), projection, schema));
            }

            // Default in-memory aggregation; attach the memory policy (if any)
            // so the iterator can enforce its limits.
            let mut iter = aggregate::AggregateIterator::new(
                input_iter,
                group_keys,
                aggregates,
                having,
                schema.clone(),
            );
            if let Some(policy) = memory {
                iter = iter.with_memory_policy(Some(policy.clone()));
            }
            Ok((Box::new(iter), projection, schema))
        }
        LogicalPlan::Sort { input, order_by } => {
            // Sort materializes its input; with a policy it may spill to disk.
            let (input_iter, projection, schema) =
                build_streaming_pipeline_inner(txn, catalog, *input, memory)?;
            let sort_iter = if let Some(policy) = memory {
                SortIterator::new_with_policy(input_iter, &order_by, Some(policy.clone()))?
            } else {
                SortIterator::new(input_iter, &order_by)?
            };
            Ok((Box::new(sort_iter), projection, schema))
        }
        LogicalPlan::Limit {
            input,
            limit,
            offset,
        } => {
            // LIMIT/OFFSET wraps the child and enables early termination.
            let (input_iter, projection, schema) =
                build_streaming_pipeline_inner(txn, catalog, *input, memory)?;
            let limit_iter = LimitIterator::new(input_iter, limit, offset);
            Ok((Box::new(limit_iter), projection, schema))
        }
        other => Err(ExecutorError::UnsupportedOperation(format!(
            "unsupported query plan: {other:?}"
        ))),
    }
}
483
484/// Evaluate a typed expression against a row, returning SqlValue.
485fn eval_expr(expr: &crate::planner::typed_expr::TypedExpr, row: &Row) -> Result<SqlValue> {
486    let ctx = EvalContext::new(&row.values);
487    crate::executor::evaluator::evaluate(expr, &ctx)
488}
489
490/// Build column info name using alias fallback.
491fn column_name_from_projection(
492    projected: &crate::planner::typed_expr::ProjectedColumn,
493    idx: usize,
494) -> String {
495    projected
496        .alias
497        .clone()
498        .or_else(|| match &projected.expr.kind {
499            crate::planner::typed_expr::TypedExprKind::ColumnRef { column, .. } => {
500                Some(column.clone())
501            }
502            _ => None,
503        })
504        .unwrap_or_else(|| format!("col_{idx}"))
505}
506
507/// Build ColumnInfo from projection.
508fn column_info_from_projection(
509    projected: &crate::planner::typed_expr::ProjectedColumn,
510    idx: usize,
511) -> ColumnInfo {
512    ColumnInfo::new(
513        column_name_from_projection(projected, idx),
514        projected.expr.resolved_type.clone(),
515    )
516}
517
518/// Build ColumnInfo for Projection::All using schema.
519fn column_infos_from_all(
520    schema: &[crate::catalog::ColumnMetadata],
521    names: &[String],
522) -> Result<Vec<ColumnInfo>> {
523    names
524        .iter()
525        .map(|name| {
526            let col = schema
527                .iter()
528                .find(|c| &c.name == name)
529                .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
530            Ok(ColumnInfo::new(name.clone(), col.data_type.clone()))
531        })
532        .collect()
533}
534
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::{ColumnMetadata, MemoryCatalog, TableMetadata};
    use crate::executor::ddl::create_table::execute_create_table;
    use crate::planner::typed_expr::TypedExpr;
    use crate::planner::types::ResolvedType;
    use crate::storage::TxnBridge;
    use alopex_core::kv::memory::MemoryKV;
    use std::sync::Arc;

    // End-to-end smoke test over an in-memory KV store: create a `users`
    // table, insert one row, then run a plain Scan plan through
    // `execute_query` and verify both the column count and the row values.
    #[test]
    fn execute_query_scan_only_returns_rows() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut catalog = MemoryCatalog::new();
        let table = TableMetadata::new(
            "users",
            vec![
                ColumnMetadata::new("id", ResolvedType::Integer),
                ColumnMetadata::new("name", ResolvedType::Text),
            ],
        );
        // DDL runs in its own committed transaction so the scan below sees
        // the table.
        let mut ddl_txn = bridge.begin_write().unwrap();
        execute_create_table(&mut ddl_txn, &mut catalog, table.clone(), vec![], false).unwrap();
        ddl_txn.commit().unwrap();

        // Insert the row (1, "alice") and query through the same write
        // transaction.
        let mut txn = bridge.begin_write().unwrap();
        crate::executor::dml::execute_insert(
            &mut txn,
            &catalog,
            "users",
            vec!["id".into(), "name".into()],
            vec![vec![
                TypedExpr::literal(
                    crate::ast::expr::Literal::Number("1".into()),
                    ResolvedType::Integer,
                    crate::Span::default(),
                ),
                TypedExpr::literal(
                    crate::ast::expr::Literal::String("alice".into()),
                    ResolvedType::Text,
                    crate::Span::default(),
                ),
            ]],
        )
        .unwrap();

        let result = execute_query(
            &mut txn,
            &catalog,
            LogicalPlan::scan(
                "users".into(),
                Projection::All(vec!["id".into(), "name".into()]),
            ),
        )
        .unwrap();

        match result {
            ExecutionResult::Query(q) => {
                assert_eq!(q.rows.len(), 1);
                assert_eq!(q.columns.len(), 2);
                assert_eq!(
                    q.rows[0],
                    vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]
                );
            }
            other => panic!("unexpected result {other:?}"),
        }
    }
}