alopex_sql/executor/query/
mod.rs

1use alopex_core::kv::KVStore;
2
3use crate::ast::LITERAL_TABLE;
4use crate::catalog::{Catalog, StorageType};
5use crate::executor::evaluator::EvalContext;
6use crate::executor::{ExecutionResult, ExecutorError, QueryRowIterator, Result};
7use crate::planner::logical_plan::LogicalPlan;
8use crate::planner::typed_expr::Projection;
9use crate::storage::{SqlTxn, SqlValue};
10
11use super::{ColumnInfo, Row};
12
13pub mod columnar_scan;
14pub mod iterator;
15mod knn;
16mod project;
17mod scan;
18
19pub use columnar_scan::{ColumnarScanIterator, create_columnar_scan_iterator};
20pub use iterator::{FilterIterator, LimitIterator, RowIterator, ScanIterator, SortIterator};
21pub use scan::create_scan_iterator;
22
23/// Execute a SELECT logical plan and return a query result.
24///
25/// This function uses an iterator-based execution model that processes rows
26/// through a pipeline of operators. This approach:
27/// - Enables early termination for LIMIT queries
28/// - Provides streaming execution after the initial scan
29/// - Allows composable query operators
30///
31/// Note: The Scan stage reads all matching rows into memory, but subsequent
32/// operators (Filter, Sort, Limit) process rows through an iterator pipeline.
33/// Sort operations additionally require materializing all input rows.
34pub fn execute_query<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
35    txn: &mut T,
36    catalog: &C,
37    plan: LogicalPlan,
38) -> Result<ExecutionResult> {
39    if let Some((pattern, projection, filter)) = knn::extract_knn_context(&plan) {
40        return knn::execute_knn_query(txn, catalog, &pattern, &projection, filter.as_ref());
41    }
42
43    let (mut iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
44
45    // Collect rows from iterator and apply projection
46    let mut rows = Vec::new();
47    while let Some(result) = iter.next_row() {
48        rows.push(result?);
49    }
50
51    let result = project::execute_project(rows, &projection, &schema)?;
52    Ok(ExecutionResult::Query(result))
53}
54
55/// Execute a SELECT logical plan and return a streaming query result.
56///
57/// This function returns a `QueryRowIterator` that yields rows one at a time,
58/// enabling true streaming output without materializing all rows upfront.
59///
60/// # FR-7 Streaming Output
61///
62/// This function implements the FR-7 requirement for streaming output.
63/// Rows are yielded through an iterator interface, and projection is applied
64/// on-the-fly as each row is consumed.
65///
66/// # Note
67///
68/// KNN queries currently fall back to the non-streaming path as they require
69/// specialized handling.
70pub fn execute_query_streaming<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
71    txn: &mut T,
72    catalog: &C,
73    plan: LogicalPlan,
74) -> Result<QueryRowIterator<'static>> {
75    // KNN queries not yet supported for streaming - fall back would need different handling
76    if knn::extract_knn_context(&plan).is_some() {
77        // For KNN, we materialize and wrap in VecIterator
78        let result = execute_query(txn, catalog, plan)?;
79        if let ExecutionResult::Query(qr) = result {
80            let column_names: Vec<String> = qr.columns.iter().map(|c| c.name.clone()).collect();
81            let schema: Vec<crate::catalog::ColumnMetadata> = qr
82                .columns
83                .iter()
84                .map(|c| crate::catalog::ColumnMetadata::new(&c.name, c.data_type.clone()))
85                .collect();
86            let rows: Vec<Row> = qr
87                .rows
88                .into_iter()
89                .enumerate()
90                .map(|(i, values)| Row::new(i as u64, values))
91                .collect();
92            let iter = iterator::VecIterator::new(rows, schema.clone());
93            return Ok(QueryRowIterator::new(
94                Box::new(iter),
95                Projection::All(column_names),
96                schema,
97            ));
98        }
99        return Err(ExecutorError::InvalidOperation {
100            operation: "execute_query_streaming".into(),
101            reason: "KNN query did not return Query result".into(),
102        });
103    }
104
105    let (iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
106
107    Ok(QueryRowIterator::new(iter, projection, schema))
108}
109
110/// Build an iterator pipeline from a logical plan.
111///
112/// This recursively constructs a tree of iterators that mirrors the logical plan
113/// structure. The scan phase reads rows into memory, then subsequent operators
114/// process them through an iterator pipeline enabling streaming execution and
115/// early termination.
116fn build_iterator_pipeline<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
117    txn: &mut T,
118    catalog: &C,
119    plan: LogicalPlan,
120) -> Result<(
121    Box<dyn RowIterator>,
122    Projection,
123    Vec<crate::catalog::ColumnMetadata>,
124)> {
125    match plan {
126        LogicalPlan::Scan { table, projection } => {
127            if table == LITERAL_TABLE {
128                let schema = Vec::new();
129                let rows = vec![Row::new(0, Vec::new())];
130                let iter = iterator::VecIterator::new(rows, schema.clone());
131                return Ok((Box::new(iter), projection, schema));
132            }
133            let table_meta = catalog
134                .get_table(&table)
135                .cloned()
136                .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
137
138            if table_meta.storage_options.storage_type == StorageType::Columnar {
139                let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
140                let rows = columnar_scan::execute_columnar_scan(txn, &table_meta, &columnar_scan)?;
141                let schema = table_meta.columns.clone();
142                let iter = iterator::VecIterator::new(rows, schema.clone());
143                return Ok((Box::new(iter), projection, schema));
144            }
145
146            // TODO: 現状は Scan で一度全件をメモリに載せてから iterator に渡しています。
147            // 将来ストリーミングを徹底する場合は、ScanIterator を活用できるよう
148            // トランザクションのライフタイム設計を見直すとよいです。
149            let rows = scan::execute_scan(txn, &table_meta)?;
150            let schema = table_meta.columns.clone();
151
152            // Wrap in VecIterator for consistent iterator-based processing
153            let iter = iterator::VecIterator::new(rows, schema.clone());
154            Ok((Box::new(iter), projection, schema))
155        }
156        LogicalPlan::Filter { input, predicate } => {
157            if let LogicalPlan::Scan { table, projection } = input.as_ref()
158                && let Some(table_meta) = catalog.get_table(table)
159                && table_meta.storage_options.storage_type == StorageType::Columnar
160            {
161                let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
162                    table_meta,
163                    projection.clone(),
164                    &predicate,
165                );
166                let rows = columnar_scan::execute_columnar_scan(txn, table_meta, &columnar_scan)?;
167                let schema = table_meta.columns.clone();
168                let iter = iterator::VecIterator::new(rows, schema.clone());
169                return Ok((Box::new(iter), projection.clone(), schema));
170            }
171            let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
172            let filter_iter = FilterIterator::new(input_iter, predicate);
173            Ok((Box::new(filter_iter), projection, schema))
174        }
175        LogicalPlan::Sort { input, order_by } => {
176            let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
177            let sort_iter = SortIterator::new(input_iter, &order_by)?;
178            Ok((Box::new(sort_iter), projection, schema))
179        }
180        LogicalPlan::Limit {
181            input,
182            limit,
183            offset,
184        } => {
185            let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
186            let limit_iter = LimitIterator::new(input_iter, limit, offset);
187            Ok((Box::new(limit_iter), projection, schema))
188        }
189        other => Err(ExecutorError::UnsupportedOperation(format!(
190            "unsupported query plan: {other:?}"
191        ))),
192    }
193}
194
195/// Build a streaming iterator pipeline from a logical plan (FR-7).
196///
197/// This version uses `ScanIterator` for row-based tables to enable true
198/// streaming without materializing all rows upfront. The returned iterator
199/// has lifetime `'a` tied to the transaction borrow.
200///
201/// # Limitations
202///
203/// - Columnar storage still materializes rows (uses VecIterator)
204/// - Sort operations materialize all input rows
205/// - KNN queries are not supported (use `build_iterator_pipeline` instead)
206pub fn build_streaming_pipeline<
207    'a,
208    'txn: 'a,
209    S: KVStore + 'txn,
210    C: Catalog + ?Sized,
211    T: SqlTxn<'txn, S>,
212>(
213    txn: &'a mut T,
214    catalog: &C,
215    plan: LogicalPlan,
216) -> Result<(
217    Box<dyn RowIterator + 'a>,
218    Projection,
219    Vec<crate::catalog::ColumnMetadata>,
220)> {
221    build_streaming_pipeline_inner(txn, catalog, plan)
222}
223
224/// Inner implementation of streaming pipeline builder.
225fn build_streaming_pipeline_inner<
226    'a,
227    'txn: 'a,
228    S: KVStore + 'txn,
229    C: Catalog + ?Sized,
230    T: SqlTxn<'txn, S>,
231>(
232    txn: &'a mut T,
233    catalog: &C,
234    plan: LogicalPlan,
235) -> Result<(
236    Box<dyn RowIterator + 'a>,
237    Projection,
238    Vec<crate::catalog::ColumnMetadata>,
239)> {
240    match plan {
241        LogicalPlan::Scan { table, projection } => {
242            if table == LITERAL_TABLE {
243                let schema = Vec::new();
244                let rows = vec![Row::new(0, Vec::new())];
245                let iter = iterator::VecIterator::new(rows, schema.clone());
246                return Ok((Box::new(iter), projection, schema));
247            }
248            let table_meta = catalog
249                .get_table(&table)
250                .cloned()
251                .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
252
253            if table_meta.storage_options.storage_type == StorageType::Columnar {
254                // Columnar storage: use ColumnarScanIterator for FR-7 streaming
255                let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
256                let schema = table_meta.columns.clone();
257                let iter =
258                    columnar_scan::create_columnar_scan_iterator(txn, &table_meta, &columnar_scan)?;
259                return Ok((Box::new(iter), projection, schema));
260            }
261
262            // Row-based storage: use ScanIterator for true streaming (FR-7)
263            let schema = table_meta.columns.clone();
264            let scan_iter = scan::create_scan_iterator(txn, &table_meta)?;
265            Ok((Box::new(scan_iter), projection, schema))
266        }
267        LogicalPlan::Filter { input, predicate } => {
268            if let LogicalPlan::Scan { table, projection } = input.as_ref()
269                && let Some(table_meta) = catalog.get_table(table)
270                && table_meta.storage_options.storage_type == StorageType::Columnar
271            {
272                // Columnar storage with filter: use ColumnarScanIterator for FR-7 streaming
273                let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
274                    table_meta,
275                    projection.clone(),
276                    &predicate,
277                );
278                let schema = table_meta.columns.clone();
279                let iter =
280                    columnar_scan::create_columnar_scan_iterator(txn, table_meta, &columnar_scan)?;
281                return Ok((Box::new(iter), projection.clone(), schema));
282            }
283            let (input_iter, projection, schema) =
284                build_streaming_pipeline_inner(txn, catalog, *input)?;
285            let filter_iter = FilterIterator::new(input_iter, predicate);
286            Ok((Box::new(filter_iter), projection, schema))
287        }
288        LogicalPlan::Sort { input, order_by } => {
289            let (input_iter, projection, schema) =
290                build_streaming_pipeline_inner(txn, catalog, *input)?;
291            let sort_iter = SortIterator::new(input_iter, &order_by)?;
292            Ok((Box::new(sort_iter), projection, schema))
293        }
294        LogicalPlan::Limit {
295            input,
296            limit,
297            offset,
298        } => {
299            let (input_iter, projection, schema) =
300                build_streaming_pipeline_inner(txn, catalog, *input)?;
301            let limit_iter = LimitIterator::new(input_iter, limit, offset);
302            Ok((Box::new(limit_iter), projection, schema))
303        }
304        other => Err(ExecutorError::UnsupportedOperation(format!(
305            "unsupported query plan: {other:?}"
306        ))),
307    }
308}
309
310/// Evaluate a typed expression against a row, returning SqlValue.
311fn eval_expr(expr: &crate::planner::typed_expr::TypedExpr, row: &Row) -> Result<SqlValue> {
312    let ctx = EvalContext::new(&row.values);
313    crate::executor::evaluator::evaluate(expr, &ctx)
314}
315
316/// Build column info name using alias fallback.
317fn column_name_from_projection(
318    projected: &crate::planner::typed_expr::ProjectedColumn,
319    idx: usize,
320) -> String {
321    projected
322        .alias
323        .clone()
324        .or_else(|| match &projected.expr.kind {
325            crate::planner::typed_expr::TypedExprKind::ColumnRef { column, .. } => {
326                Some(column.clone())
327            }
328            _ => None,
329        })
330        .unwrap_or_else(|| format!("col_{idx}"))
331}
332
333/// Build ColumnInfo from projection.
334fn column_info_from_projection(
335    projected: &crate::planner::typed_expr::ProjectedColumn,
336    idx: usize,
337) -> ColumnInfo {
338    ColumnInfo::new(
339        column_name_from_projection(projected, idx),
340        projected.expr.resolved_type.clone(),
341    )
342}
343
344/// Build ColumnInfo for Projection::All using schema.
345fn column_infos_from_all(
346    schema: &[crate::catalog::ColumnMetadata],
347    names: &[String],
348) -> Result<Vec<ColumnInfo>> {
349    names
350        .iter()
351        .map(|name| {
352            let col = schema
353                .iter()
354                .find(|c| &c.name == name)
355                .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
356            Ok(ColumnInfo::new(name.clone(), col.data_type.clone()))
357        })
358        .collect()
359}
360
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::{ColumnMetadata, MemoryCatalog, TableMetadata};
    use crate::executor::ddl::create_table::execute_create_table;
    use crate::planner::typed_expr::TypedExpr;
    use crate::planner::types::ResolvedType;
    use crate::storage::TxnBridge;
    use alopex_core::kv::memory::MemoryKV;
    use std::sync::Arc;

    /// A plain scan over a table holding one inserted row returns that row
    /// with both of its columns.
    #[test]
    fn execute_query_scan_only_returns_rows() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut catalog = MemoryCatalog::new();

        // Create the `users` table in its own committed transaction.
        let users = TableMetadata::new(
            "users",
            vec![
                ColumnMetadata::new("id", ResolvedType::Integer),
                ColumnMetadata::new("name", ResolvedType::Text),
            ],
        );
        let mut ddl_txn = bridge.begin_write().unwrap();
        execute_create_table(&mut ddl_txn, &mut catalog, users, vec![], false).unwrap();
        ddl_txn.commit().unwrap();

        // Insert a single row, then query it from the same write transaction.
        let id_literal = TypedExpr::literal(
            crate::ast::expr::Literal::Number("1".into()),
            ResolvedType::Integer,
            crate::Span::default(),
        );
        let name_literal = TypedExpr::literal(
            crate::ast::expr::Literal::String("alice".into()),
            ResolvedType::Text,
            crate::Span::default(),
        );
        let mut txn = bridge.begin_write().unwrap();
        crate::executor::dml::execute_insert(
            &mut txn,
            &catalog,
            "users",
            vec!["id".into(), "name".into()],
            vec![vec![id_literal, name_literal]],
        )
        .unwrap();

        let plan = LogicalPlan::scan(
            "users".into(),
            Projection::All(vec!["id".into(), "name".into()]),
        );
        let outcome = execute_query(&mut txn, &catalog, plan).unwrap();

        let q = match outcome {
            ExecutionResult::Query(q) => q,
            other => panic!("unexpected result {other:?}"),
        };
        assert_eq!(q.rows.len(), 1);
        assert_eq!(q.columns.len(), 2);
        assert_eq!(
            q.rows[0],
            vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]
        );
    }
}