alopex_sql/executor/query/
mod.rs

1use alopex_core::kv::KVStore;
2
3use crate::catalog::{Catalog, StorageType};
4use crate::executor::evaluator::EvalContext;
5use crate::executor::{ExecutionResult, ExecutorError, QueryRowIterator, Result};
6use crate::planner::logical_plan::LogicalPlan;
7use crate::planner::typed_expr::Projection;
8use crate::storage::{SqlTxn, SqlValue};
9
10use super::{ColumnInfo, Row};
11
12pub mod columnar_scan;
13pub mod iterator;
14mod knn;
15mod project;
16mod scan;
17
18pub use columnar_scan::{ColumnarScanIterator, create_columnar_scan_iterator};
19pub use iterator::{FilterIterator, LimitIterator, RowIterator, ScanIterator, SortIterator};
20pub use scan::create_scan_iterator;
21
22/// Execute a SELECT logical plan and return a query result.
23///
24/// This function uses an iterator-based execution model that processes rows
25/// through a pipeline of operators. This approach:
26/// - Enables early termination for LIMIT queries
27/// - Provides streaming execution after the initial scan
28/// - Allows composable query operators
29///
30/// Note: The Scan stage reads all matching rows into memory, but subsequent
31/// operators (Filter, Sort, Limit) process rows through an iterator pipeline.
32/// Sort operations additionally require materializing all input rows.
33pub fn execute_query<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
34    txn: &mut T,
35    catalog: &C,
36    plan: LogicalPlan,
37) -> Result<ExecutionResult> {
38    if let Some((pattern, projection, filter)) = knn::extract_knn_context(&plan) {
39        return knn::execute_knn_query(txn, catalog, &pattern, &projection, filter.as_ref());
40    }
41
42    let (mut iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
43
44    // Collect rows from iterator and apply projection
45    let mut rows = Vec::new();
46    while let Some(result) = iter.next_row() {
47        rows.push(result?);
48    }
49
50    let result = project::execute_project(rows, &projection, &schema)?;
51    Ok(ExecutionResult::Query(result))
52}
53
54/// Execute a SELECT logical plan and return a streaming query result.
55///
56/// This function returns a `QueryRowIterator` that yields rows one at a time,
57/// enabling true streaming output without materializing all rows upfront.
58///
59/// # FR-7 Streaming Output
60///
61/// This function implements the FR-7 requirement for streaming output.
62/// Rows are yielded through an iterator interface, and projection is applied
63/// on-the-fly as each row is consumed.
64///
65/// # Note
66///
67/// KNN queries currently fall back to the non-streaming path as they require
68/// specialized handling.
69pub fn execute_query_streaming<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
70    txn: &mut T,
71    catalog: &C,
72    plan: LogicalPlan,
73) -> Result<QueryRowIterator<'static>> {
74    // KNN queries not yet supported for streaming - fall back would need different handling
75    if knn::extract_knn_context(&plan).is_some() {
76        // For KNN, we materialize and wrap in VecIterator
77        let result = execute_query(txn, catalog, plan)?;
78        if let ExecutionResult::Query(qr) = result {
79            let column_names: Vec<String> = qr.columns.iter().map(|c| c.name.clone()).collect();
80            let schema: Vec<crate::catalog::ColumnMetadata> = qr
81                .columns
82                .iter()
83                .map(|c| crate::catalog::ColumnMetadata::new(&c.name, c.data_type.clone()))
84                .collect();
85            let rows: Vec<Row> = qr
86                .rows
87                .into_iter()
88                .enumerate()
89                .map(|(i, values)| Row::new(i as u64, values))
90                .collect();
91            let iter = iterator::VecIterator::new(rows, schema.clone());
92            return Ok(QueryRowIterator::new(
93                Box::new(iter),
94                Projection::All(column_names),
95                schema,
96            ));
97        }
98        return Err(ExecutorError::InvalidOperation {
99            operation: "execute_query_streaming".into(),
100            reason: "KNN query did not return Query result".into(),
101        });
102    }
103
104    let (iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
105
106    Ok(QueryRowIterator::new(iter, projection, schema))
107}
108
109/// Build an iterator pipeline from a logical plan.
110///
111/// This recursively constructs a tree of iterators that mirrors the logical plan
112/// structure. The scan phase reads rows into memory, then subsequent operators
113/// process them through an iterator pipeline enabling streaming execution and
114/// early termination.
115fn build_iterator_pipeline<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
116    txn: &mut T,
117    catalog: &C,
118    plan: LogicalPlan,
119) -> Result<(
120    Box<dyn RowIterator>,
121    Projection,
122    Vec<crate::catalog::ColumnMetadata>,
123)> {
124    match plan {
125        LogicalPlan::Scan { table, projection } => {
126            let table_meta = catalog
127                .get_table(&table)
128                .cloned()
129                .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
130
131            if table_meta.storage_options.storage_type == StorageType::Columnar {
132                let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
133                let rows = columnar_scan::execute_columnar_scan(txn, &table_meta, &columnar_scan)?;
134                let schema = table_meta.columns.clone();
135                let iter = iterator::VecIterator::new(rows, schema.clone());
136                return Ok((Box::new(iter), projection, schema));
137            }
138
139            // TODO: 現状は Scan で一度全件をメモリに載せてから iterator に渡しています。
140            // 将来ストリーミングを徹底する場合は、ScanIterator を活用できるよう
141            // トランザクションのライフタイム設計を見直すとよいです。
142            let rows = scan::execute_scan(txn, &table_meta)?;
143            let schema = table_meta.columns.clone();
144
145            // Wrap in VecIterator for consistent iterator-based processing
146            let iter = iterator::VecIterator::new(rows, schema.clone());
147            Ok((Box::new(iter), projection, schema))
148        }
149        LogicalPlan::Filter { input, predicate } => {
150            if let LogicalPlan::Scan { table, projection } = input.as_ref()
151                && let Some(table_meta) = catalog.get_table(table)
152                && table_meta.storage_options.storage_type == StorageType::Columnar
153            {
154                let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
155                    table_meta,
156                    projection.clone(),
157                    &predicate,
158                );
159                let rows = columnar_scan::execute_columnar_scan(txn, table_meta, &columnar_scan)?;
160                let schema = table_meta.columns.clone();
161                let iter = iterator::VecIterator::new(rows, schema.clone());
162                return Ok((Box::new(iter), projection.clone(), schema));
163            }
164            let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
165            let filter_iter = FilterIterator::new(input_iter, predicate);
166            Ok((Box::new(filter_iter), projection, schema))
167        }
168        LogicalPlan::Sort { input, order_by } => {
169            let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
170            let sort_iter = SortIterator::new(input_iter, &order_by)?;
171            Ok((Box::new(sort_iter), projection, schema))
172        }
173        LogicalPlan::Limit {
174            input,
175            limit,
176            offset,
177        } => {
178            let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
179            let limit_iter = LimitIterator::new(input_iter, limit, offset);
180            Ok((Box::new(limit_iter), projection, schema))
181        }
182        other => Err(ExecutorError::UnsupportedOperation(format!(
183            "unsupported query plan: {other:?}"
184        ))),
185    }
186}
187
188/// Build a streaming iterator pipeline from a logical plan (FR-7).
189///
190/// This version uses `ScanIterator` for row-based tables to enable true
191/// streaming without materializing all rows upfront. The returned iterator
192/// has lifetime `'a` tied to the transaction borrow.
193///
194/// # Limitations
195///
196/// - Columnar storage still materializes rows (uses VecIterator)
197/// - Sort operations materialize all input rows
198/// - KNN queries are not supported (use `build_iterator_pipeline` instead)
199pub fn build_streaming_pipeline<
200    'a,
201    'txn: 'a,
202    S: KVStore + 'txn,
203    C: Catalog + ?Sized,
204    T: SqlTxn<'txn, S>,
205>(
206    txn: &'a mut T,
207    catalog: &C,
208    plan: LogicalPlan,
209) -> Result<(
210    Box<dyn RowIterator + 'a>,
211    Projection,
212    Vec<crate::catalog::ColumnMetadata>,
213)> {
214    build_streaming_pipeline_inner(txn, catalog, plan)
215}
216
217/// Inner implementation of streaming pipeline builder.
218fn build_streaming_pipeline_inner<
219    'a,
220    'txn: 'a,
221    S: KVStore + 'txn,
222    C: Catalog + ?Sized,
223    T: SqlTxn<'txn, S>,
224>(
225    txn: &'a mut T,
226    catalog: &C,
227    plan: LogicalPlan,
228) -> Result<(
229    Box<dyn RowIterator + 'a>,
230    Projection,
231    Vec<crate::catalog::ColumnMetadata>,
232)> {
233    match plan {
234        LogicalPlan::Scan { table, projection } => {
235            let table_meta = catalog
236                .get_table(&table)
237                .cloned()
238                .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
239
240            if table_meta.storage_options.storage_type == StorageType::Columnar {
241                // Columnar storage: use ColumnarScanIterator for FR-7 streaming
242                let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
243                let schema = table_meta.columns.clone();
244                let iter =
245                    columnar_scan::create_columnar_scan_iterator(txn, &table_meta, &columnar_scan)?;
246                return Ok((Box::new(iter), projection, schema));
247            }
248
249            // Row-based storage: use ScanIterator for true streaming (FR-7)
250            let schema = table_meta.columns.clone();
251            let scan_iter = scan::create_scan_iterator(txn, &table_meta)?;
252            Ok((Box::new(scan_iter), projection, schema))
253        }
254        LogicalPlan::Filter { input, predicate } => {
255            if let LogicalPlan::Scan { table, projection } = input.as_ref()
256                && let Some(table_meta) = catalog.get_table(table)
257                && table_meta.storage_options.storage_type == StorageType::Columnar
258            {
259                // Columnar storage with filter: use ColumnarScanIterator for FR-7 streaming
260                let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
261                    table_meta,
262                    projection.clone(),
263                    &predicate,
264                );
265                let schema = table_meta.columns.clone();
266                let iter =
267                    columnar_scan::create_columnar_scan_iterator(txn, table_meta, &columnar_scan)?;
268                return Ok((Box::new(iter), projection.clone(), schema));
269            }
270            let (input_iter, projection, schema) =
271                build_streaming_pipeline_inner(txn, catalog, *input)?;
272            let filter_iter = FilterIterator::new(input_iter, predicate);
273            Ok((Box::new(filter_iter), projection, schema))
274        }
275        LogicalPlan::Sort { input, order_by } => {
276            let (input_iter, projection, schema) =
277                build_streaming_pipeline_inner(txn, catalog, *input)?;
278            let sort_iter = SortIterator::new(input_iter, &order_by)?;
279            Ok((Box::new(sort_iter), projection, schema))
280        }
281        LogicalPlan::Limit {
282            input,
283            limit,
284            offset,
285        } => {
286            let (input_iter, projection, schema) =
287                build_streaming_pipeline_inner(txn, catalog, *input)?;
288            let limit_iter = LimitIterator::new(input_iter, limit, offset);
289            Ok((Box::new(limit_iter), projection, schema))
290        }
291        other => Err(ExecutorError::UnsupportedOperation(format!(
292            "unsupported query plan: {other:?}"
293        ))),
294    }
295}
296
297/// Evaluate a typed expression against a row, returning SqlValue.
298fn eval_expr(expr: &crate::planner::typed_expr::TypedExpr, row: &Row) -> Result<SqlValue> {
299    let ctx = EvalContext::new(&row.values);
300    crate::executor::evaluator::evaluate(expr, &ctx)
301}
302
303/// Build column info name using alias fallback.
304fn column_name_from_projection(
305    projected: &crate::planner::typed_expr::ProjectedColumn,
306    idx: usize,
307) -> String {
308    projected
309        .alias
310        .clone()
311        .or_else(|| match &projected.expr.kind {
312            crate::planner::typed_expr::TypedExprKind::ColumnRef { column, .. } => {
313                Some(column.clone())
314            }
315            _ => None,
316        })
317        .unwrap_or_else(|| format!("col_{idx}"))
318}
319
320/// Build ColumnInfo from projection.
321fn column_info_from_projection(
322    projected: &crate::planner::typed_expr::ProjectedColumn,
323    idx: usize,
324) -> ColumnInfo {
325    ColumnInfo::new(
326        column_name_from_projection(projected, idx),
327        projected.expr.resolved_type.clone(),
328    )
329}
330
331/// Build ColumnInfo for Projection::All using schema.
332fn column_infos_from_all(
333    schema: &[crate::catalog::ColumnMetadata],
334    names: &[String],
335) -> Result<Vec<ColumnInfo>> {
336    names
337        .iter()
338        .map(|name| {
339            let col = schema
340                .iter()
341                .find(|c| &c.name == name)
342                .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
343            Ok(ColumnInfo::new(name.clone(), col.data_type.clone()))
344        })
345        .collect()
346}
347
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::{ColumnMetadata, MemoryCatalog, TableMetadata};
    use crate::executor::ddl::create_table::execute_create_table;
    use crate::planner::typed_expr::TypedExpr;
    use crate::planner::types::ResolvedType;
    use crate::storage::TxnBridge;
    use alopex_core::kv::memory::MemoryKV;
    use std::sync::Arc;

    /// A bare scan with `Projection::All` over a one-row table returns that
    /// row with both columns.
    #[test]
    fn execute_query_scan_only_returns_rows() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut catalog = MemoryCatalog::new();

        // Create `users(id, name)` in its own committed transaction.
        let users = TableMetadata::new(
            "users",
            vec![
                ColumnMetadata::new("id", ResolvedType::Integer),
                ColumnMetadata::new("name", ResolvedType::Text),
            ],
        );
        let mut ddl_txn = bridge.begin_write().unwrap();
        execute_create_table(&mut ddl_txn, &mut catalog, users, vec![], false).unwrap();
        ddl_txn.commit().unwrap();

        // Insert a single row, then query it back inside the same transaction.
        let id_expr = TypedExpr::literal(
            crate::ast::expr::Literal::Number("1".into()),
            ResolvedType::Integer,
            crate::Span::default(),
        );
        let name_expr = TypedExpr::literal(
            crate::ast::expr::Literal::String("alice".into()),
            ResolvedType::Text,
            crate::Span::default(),
        );
        let mut txn = bridge.begin_write().unwrap();
        crate::executor::dml::execute_insert(
            &mut txn,
            &catalog,
            "users",
            vec!["id".into(), "name".into()],
            vec![vec![id_expr, name_expr]],
        )
        .unwrap();

        let plan = LogicalPlan::scan(
            "users".into(),
            Projection::All(vec!["id".into(), "name".into()]),
        );
        let result = execute_query(&mut txn, &catalog, plan).unwrap();

        let query = match result {
            ExecutionResult::Query(q) => q,
            other => panic!("unexpected result {other:?}"),
        };
        assert_eq!(query.rows.len(), 1);
        assert_eq!(query.columns.len(), 2);
        assert_eq!(
            query.rows[0],
            vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]
        );
    }
}