alopex_sql/executor/query/
mod.rs

1use alopex_core::kv::KVStore;
2
3use crate::catalog::{Catalog, StorageType};
4use crate::executor::evaluator::EvalContext;
5use crate::executor::{ExecutionResult, ExecutorError, QueryRowIterator, Result};
6use crate::planner::logical_plan::LogicalPlan;
7use crate::planner::typed_expr::Projection;
8use crate::storage::{SqlTxn, SqlValue};
9
10use super::{ColumnInfo, Row};
11
12pub mod columnar_scan;
13pub mod iterator;
14mod knn;
15mod project;
16mod scan;
17
18pub use columnar_scan::{ColumnarScanIterator, create_columnar_scan_iterator};
19pub use iterator::{FilterIterator, LimitIterator, RowIterator, ScanIterator, SortIterator};
20pub use scan::create_scan_iterator;
21
22/// Execute a SELECT logical plan and return a query result.
23///
24/// This function uses an iterator-based execution model that processes rows
25/// through a pipeline of operators. This approach:
26/// - Enables early termination for LIMIT queries
27/// - Provides streaming execution after the initial scan
28/// - Allows composable query operators
29///
30/// Note: The Scan stage reads all matching rows into memory, but subsequent
31/// operators (Filter, Sort, Limit) process rows through an iterator pipeline.
32/// Sort operations additionally require materializing all input rows.
33pub fn execute_query<'txn, S: KVStore + 'txn, C: Catalog, T: SqlTxn<'txn, S>>(
34    txn: &mut T,
35    catalog: &C,
36    plan: LogicalPlan,
37) -> Result<ExecutionResult> {
38    if let Some((pattern, projection, filter)) = knn::extract_knn_context(&plan) {
39        return knn::execute_knn_query(txn, catalog, &pattern, &projection, filter.as_ref());
40    }
41
42    let (mut iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
43
44    // Collect rows from iterator and apply projection
45    let mut rows = Vec::new();
46    while let Some(result) = iter.next_row() {
47        rows.push(result?);
48    }
49
50    let result = project::execute_project(rows, &projection, &schema)?;
51    Ok(ExecutionResult::Query(result))
52}
53
54/// Execute a SELECT logical plan and return a streaming query result.
55///
56/// This function returns a `QueryRowIterator` that yields rows one at a time,
57/// enabling true streaming output without materializing all rows upfront.
58///
59/// # FR-7 Streaming Output
60///
61/// This function implements the FR-7 requirement for streaming output.
62/// Rows are yielded through an iterator interface, and projection is applied
63/// on-the-fly as each row is consumed.
64///
65/// # Note
66///
67/// KNN queries currently fall back to the non-streaming path as they require
68/// specialized handling.
69pub fn execute_query_streaming<'txn, S: KVStore + 'txn, C: Catalog, T: SqlTxn<'txn, S>>(
70    txn: &mut T,
71    catalog: &C,
72    plan: LogicalPlan,
73) -> Result<QueryRowIterator<'static>> {
74    // KNN queries not yet supported for streaming - fall back would need different handling
75    if knn::extract_knn_context(&plan).is_some() {
76        // For KNN, we materialize and wrap in VecIterator
77        let result = execute_query(txn, catalog, plan)?;
78        if let ExecutionResult::Query(qr) = result {
79            let column_names: Vec<String> = qr.columns.iter().map(|c| c.name.clone()).collect();
80            let schema: Vec<crate::catalog::ColumnMetadata> = qr
81                .columns
82                .iter()
83                .map(|c| crate::catalog::ColumnMetadata::new(&c.name, c.data_type.clone()))
84                .collect();
85            let rows: Vec<Row> = qr
86                .rows
87                .into_iter()
88                .enumerate()
89                .map(|(i, values)| Row::new(i as u64, values))
90                .collect();
91            let iter = iterator::VecIterator::new(rows, schema.clone());
92            return Ok(QueryRowIterator::new(
93                Box::new(iter),
94                Projection::All(column_names),
95                schema,
96            ));
97        }
98        return Err(ExecutorError::InvalidOperation {
99            operation: "execute_query_streaming".into(),
100            reason: "KNN query did not return Query result".into(),
101        });
102    }
103
104    let (iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
105
106    Ok(QueryRowIterator::new(iter, projection, schema))
107}
108
109/// Build an iterator pipeline from a logical plan.
110///
111/// This recursively constructs a tree of iterators that mirrors the logical plan
112/// structure. The scan phase reads rows into memory, then subsequent operators
113/// process them through an iterator pipeline enabling streaming execution and
114/// early termination.
115fn build_iterator_pipeline<'txn, S: KVStore + 'txn, C: Catalog, T: SqlTxn<'txn, S>>(
116    txn: &mut T,
117    catalog: &C,
118    plan: LogicalPlan,
119) -> Result<(
120    Box<dyn RowIterator>,
121    Projection,
122    Vec<crate::catalog::ColumnMetadata>,
123)> {
124    match plan {
125        LogicalPlan::Scan { table, projection } => {
126            let table_meta = catalog
127                .get_table(&table)
128                .cloned()
129                .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
130
131            if table_meta.storage_options.storage_type == StorageType::Columnar {
132                let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
133                let rows = columnar_scan::execute_columnar_scan(txn, &table_meta, &columnar_scan)?;
134                let schema = table_meta.columns.clone();
135                let iter = iterator::VecIterator::new(rows, schema.clone());
136                return Ok((Box::new(iter), projection, schema));
137            }
138
139            // TODO: 現状は Scan で一度全件をメモリに載せてから iterator に渡しています。
140            // 将来ストリーミングを徹底する場合は、ScanIterator を活用できるよう
141            // トランザクションのライフタイム設計を見直すとよいです。
142            let rows = scan::execute_scan(txn, &table_meta)?;
143            let schema = table_meta.columns.clone();
144
145            // Wrap in VecIterator for consistent iterator-based processing
146            let iter = iterator::VecIterator::new(rows, schema.clone());
147            Ok((Box::new(iter), projection, schema))
148        }
149        LogicalPlan::Filter { input, predicate } => {
150            if let LogicalPlan::Scan { table, projection } = input.as_ref()
151                && let Some(table_meta) = catalog.get_table(table)
152                && table_meta.storage_options.storage_type == StorageType::Columnar
153            {
154                let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
155                    table_meta,
156                    projection.clone(),
157                    &predicate,
158                );
159                let rows = columnar_scan::execute_columnar_scan(txn, table_meta, &columnar_scan)?;
160                let schema = table_meta.columns.clone();
161                let iter = iterator::VecIterator::new(rows, schema.clone());
162                return Ok((Box::new(iter), projection.clone(), schema));
163            }
164            let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
165            let filter_iter = FilterIterator::new(input_iter, predicate);
166            Ok((Box::new(filter_iter), projection, schema))
167        }
168        LogicalPlan::Sort { input, order_by } => {
169            let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
170            let sort_iter = SortIterator::new(input_iter, &order_by)?;
171            Ok((Box::new(sort_iter), projection, schema))
172        }
173        LogicalPlan::Limit {
174            input,
175            limit,
176            offset,
177        } => {
178            let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
179            let limit_iter = LimitIterator::new(input_iter, limit, offset);
180            Ok((Box::new(limit_iter), projection, schema))
181        }
182        other => Err(ExecutorError::UnsupportedOperation(format!(
183            "unsupported query plan: {other:?}"
184        ))),
185    }
186}
187
188/// Build a streaming iterator pipeline from a logical plan (FR-7).
189///
190/// This version uses `ScanIterator` for row-based tables to enable true
191/// streaming without materializing all rows upfront. The returned iterator
192/// has lifetime `'a` tied to the transaction borrow.
193///
194/// # Limitations
195///
196/// - Columnar storage still materializes rows (uses VecIterator)
197/// - Sort operations materialize all input rows
198/// - KNN queries are not supported (use `build_iterator_pipeline` instead)
199pub fn build_streaming_pipeline<'a, 'txn: 'a, S: KVStore + 'txn, C: Catalog, T: SqlTxn<'txn, S>>(
200    txn: &'a mut T,
201    catalog: &C,
202    plan: LogicalPlan,
203) -> Result<(
204    Box<dyn RowIterator + 'a>,
205    Projection,
206    Vec<crate::catalog::ColumnMetadata>,
207)> {
208    build_streaming_pipeline_inner(txn, catalog, plan)
209}
210
211/// Inner implementation of streaming pipeline builder.
212fn build_streaming_pipeline_inner<
213    'a,
214    'txn: 'a,
215    S: KVStore + 'txn,
216    C: Catalog,
217    T: SqlTxn<'txn, S>,
218>(
219    txn: &'a mut T,
220    catalog: &C,
221    plan: LogicalPlan,
222) -> Result<(
223    Box<dyn RowIterator + 'a>,
224    Projection,
225    Vec<crate::catalog::ColumnMetadata>,
226)> {
227    match plan {
228        LogicalPlan::Scan { table, projection } => {
229            let table_meta = catalog
230                .get_table(&table)
231                .cloned()
232                .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
233
234            if table_meta.storage_options.storage_type == StorageType::Columnar {
235                // Columnar storage: use ColumnarScanIterator for FR-7 streaming
236                let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
237                let schema = table_meta.columns.clone();
238                let iter =
239                    columnar_scan::create_columnar_scan_iterator(txn, &table_meta, &columnar_scan)?;
240                return Ok((Box::new(iter), projection, schema));
241            }
242
243            // Row-based storage: use ScanIterator for true streaming (FR-7)
244            let schema = table_meta.columns.clone();
245            let scan_iter = scan::create_scan_iterator(txn, &table_meta)?;
246            Ok((Box::new(scan_iter), projection, schema))
247        }
248        LogicalPlan::Filter { input, predicate } => {
249            if let LogicalPlan::Scan { table, projection } = input.as_ref()
250                && let Some(table_meta) = catalog.get_table(table)
251                && table_meta.storage_options.storage_type == StorageType::Columnar
252            {
253                // Columnar storage with filter: use ColumnarScanIterator for FR-7 streaming
254                let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
255                    table_meta,
256                    projection.clone(),
257                    &predicate,
258                );
259                let schema = table_meta.columns.clone();
260                let iter =
261                    columnar_scan::create_columnar_scan_iterator(txn, table_meta, &columnar_scan)?;
262                return Ok((Box::new(iter), projection.clone(), schema));
263            }
264            let (input_iter, projection, schema) =
265                build_streaming_pipeline_inner(txn, catalog, *input)?;
266            let filter_iter = FilterIterator::new(input_iter, predicate);
267            Ok((Box::new(filter_iter), projection, schema))
268        }
269        LogicalPlan::Sort { input, order_by } => {
270            let (input_iter, projection, schema) =
271                build_streaming_pipeline_inner(txn, catalog, *input)?;
272            let sort_iter = SortIterator::new(input_iter, &order_by)?;
273            Ok((Box::new(sort_iter), projection, schema))
274        }
275        LogicalPlan::Limit {
276            input,
277            limit,
278            offset,
279        } => {
280            let (input_iter, projection, schema) =
281                build_streaming_pipeline_inner(txn, catalog, *input)?;
282            let limit_iter = LimitIterator::new(input_iter, limit, offset);
283            Ok((Box::new(limit_iter), projection, schema))
284        }
285        other => Err(ExecutorError::UnsupportedOperation(format!(
286            "unsupported query plan: {other:?}"
287        ))),
288    }
289}
290
291/// Evaluate a typed expression against a row, returning SqlValue.
292fn eval_expr(expr: &crate::planner::typed_expr::TypedExpr, row: &Row) -> Result<SqlValue> {
293    let ctx = EvalContext::new(&row.values);
294    crate::executor::evaluator::evaluate(expr, &ctx)
295}
296
297/// Build column info name using alias fallback.
298fn column_name_from_projection(
299    projected: &crate::planner::typed_expr::ProjectedColumn,
300    idx: usize,
301) -> String {
302    projected
303        .alias
304        .clone()
305        .or_else(|| match &projected.expr.kind {
306            crate::planner::typed_expr::TypedExprKind::ColumnRef { column, .. } => {
307                Some(column.clone())
308            }
309            _ => None,
310        })
311        .unwrap_or_else(|| format!("col_{idx}"))
312}
313
314/// Build ColumnInfo from projection.
315fn column_info_from_projection(
316    projected: &crate::planner::typed_expr::ProjectedColumn,
317    idx: usize,
318) -> ColumnInfo {
319    ColumnInfo::new(
320        column_name_from_projection(projected, idx),
321        projected.expr.resolved_type.clone(),
322    )
323}
324
325/// Build ColumnInfo for Projection::All using schema.
326fn column_infos_from_all(
327    schema: &[crate::catalog::ColumnMetadata],
328    names: &[String],
329) -> Result<Vec<ColumnInfo>> {
330    names
331        .iter()
332        .map(|name| {
333            let col = schema
334                .iter()
335                .find(|c| &c.name == name)
336                .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
337            Ok(ColumnInfo::new(name.clone(), col.data_type.clone()))
338        })
339        .collect()
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::{ColumnMetadata, MemoryCatalog, TableMetadata};
    use crate::executor::ddl::create_table::execute_create_table;
    use crate::planner::typed_expr::TypedExpr;
    use crate::planner::types::ResolvedType;
    use crate::storage::TxnBridge;
    use alopex_core::kv::memory::MemoryKV;
    use std::sync::Arc;

    /// Creates a two-column `users` table, inserts a single row, and checks
    /// that a plain scan through `execute_query` returns exactly that row.
    #[test]
    fn execute_query_scan_only_returns_rows() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut catalog = MemoryCatalog::new();

        // Register the table in its own committed transaction.
        let columns = vec![
            ColumnMetadata::new("id", ResolvedType::Integer),
            ColumnMetadata::new("name", ResolvedType::Text),
        ];
        let users = TableMetadata::new("users", columns);
        let mut ddl_txn = bridge.begin_write().unwrap();
        execute_create_table(&mut ddl_txn, &mut catalog, users, vec![], false).unwrap();
        ddl_txn.commit().unwrap();

        // Insert one row; the query below runs in the same transaction.
        let mut txn = bridge.begin_write().unwrap();
        let id_value = TypedExpr::literal(
            crate::ast::expr::Literal::Number("1".into()),
            ResolvedType::Integer,
            crate::Span::default(),
        );
        let name_value = TypedExpr::literal(
            crate::ast::expr::Literal::String("alice".into()),
            ResolvedType::Text,
            crate::Span::default(),
        );
        crate::executor::dml::execute_insert(
            &mut txn,
            &catalog,
            "users",
            vec!["id".into(), "name".into()],
            vec![vec![id_value, name_value]],
        )
        .unwrap();

        let plan = LogicalPlan::scan(
            "users".into(),
            Projection::All(vec!["id".into(), "name".into()]),
        );
        let result = execute_query(&mut txn, &catalog, plan).unwrap();

        let query = match result {
            ExecutionResult::Query(query) => query,
            other => panic!("unexpected result {other:?}"),
        };
        assert_eq!(query.rows.len(), 1);
        assert_eq!(query.columns.len(), 2);
        assert_eq!(
            query.rows[0],
            vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]
        );
    }
}
411}