alopex_sql/executor/query/
mod.rs

1use alopex_core::kv::KVStore;
2
3use crate::catalog::{Catalog, StorageType};
4use crate::executor::evaluator::EvalContext;
5use crate::executor::{ExecutionResult, ExecutorError, Result};
6use crate::planner::logical_plan::LogicalPlan;
7use crate::planner::typed_expr::Projection;
8use crate::storage::{SqlTxn, SqlValue};
9
10use super::{ColumnInfo, Row};
11
12pub mod columnar_scan;
13pub mod iterator;
14mod knn;
15mod project;
16mod scan;
17
18pub use iterator::{FilterIterator, LimitIterator, RowIterator, SortIterator};
19
20/// Execute a SELECT logical plan and return a query result.
21///
22/// This function uses an iterator-based execution model that processes rows
23/// through a pipeline of operators. This approach:
24/// - Enables early termination for LIMIT queries
25/// - Provides streaming execution after the initial scan
26/// - Allows composable query operators
27///
28/// Note: The Scan stage reads all matching rows into memory, but subsequent
29/// operators (Filter, Sort, Limit) process rows through an iterator pipeline.
30/// Sort operations additionally require materializing all input rows.
31pub fn execute_query<'txn, S: KVStore + 'txn, C: Catalog, T: SqlTxn<'txn, S>>(
32    txn: &mut T,
33    catalog: &C,
34    plan: LogicalPlan,
35) -> Result<ExecutionResult> {
36    if let Some((pattern, projection, filter)) = knn::extract_knn_context(&plan) {
37        return knn::execute_knn_query(txn, catalog, &pattern, &projection, filter.as_ref());
38    }
39
40    let (mut iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
41
42    // Collect rows from iterator and apply projection
43    let mut rows = Vec::new();
44    while let Some(result) = iter.next_row() {
45        rows.push(result?);
46    }
47
48    let result = project::execute_project(rows, &projection, &schema)?;
49    Ok(ExecutionResult::Query(result))
50}
51
52/// Build an iterator pipeline from a logical plan.
53///
54/// This recursively constructs a tree of iterators that mirrors the logical plan
55/// structure. The scan phase reads rows into memory, then subsequent operators
56/// process them through an iterator pipeline enabling streaming execution and
57/// early termination.
58fn build_iterator_pipeline<'txn, S: KVStore + 'txn, C: Catalog, T: SqlTxn<'txn, S>>(
59    txn: &mut T,
60    catalog: &C,
61    plan: LogicalPlan,
62) -> Result<(
63    Box<dyn RowIterator>,
64    Projection,
65    Vec<crate::catalog::ColumnMetadata>,
66)> {
67    match plan {
68        LogicalPlan::Scan { table, projection } => {
69            let table_meta = catalog
70                .get_table(&table)
71                .cloned()
72                .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
73
74            if table_meta.storage_options.storage_type == StorageType::Columnar {
75                let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
76                let rows = columnar_scan::execute_columnar_scan(txn, &table_meta, &columnar_scan)?;
77                let schema = table_meta.columns.clone();
78                let iter = iterator::VecIterator::new(rows, schema.clone());
79                return Ok((Box::new(iter), projection, schema));
80            }
81
82            // TODO: 現状は Scan で一度全件をメモリに載せてから iterator に渡しています。
83            // 将来ストリーミングを徹底する場合は、ScanIterator を活用できるよう
84            // トランザクションのライフタイム設計を見直すとよいです。
85            let rows = scan::execute_scan(txn, &table_meta)?;
86            let schema = table_meta.columns.clone();
87
88            // Wrap in VecIterator for consistent iterator-based processing
89            let iter = iterator::VecIterator::new(rows, schema.clone());
90            Ok((Box::new(iter), projection, schema))
91        }
92        LogicalPlan::Filter { input, predicate } => {
93            if let LogicalPlan::Scan { table, projection } = input.as_ref()
94                && let Some(table_meta) = catalog.get_table(table)
95                && table_meta.storage_options.storage_type == StorageType::Columnar
96            {
97                let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
98                    table_meta,
99                    projection.clone(),
100                    &predicate,
101                );
102                let rows = columnar_scan::execute_columnar_scan(txn, table_meta, &columnar_scan)?;
103                let schema = table_meta.columns.clone();
104                let iter = iterator::VecIterator::new(rows, schema.clone());
105                return Ok((Box::new(iter), projection.clone(), schema));
106            }
107            let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
108            let filter_iter = FilterIterator::new(input_iter, predicate);
109            Ok((Box::new(filter_iter), projection, schema))
110        }
111        LogicalPlan::Sort { input, order_by } => {
112            let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
113            let sort_iter = SortIterator::new(input_iter, &order_by)?;
114            Ok((Box::new(sort_iter), projection, schema))
115        }
116        LogicalPlan::Limit {
117            input,
118            limit,
119            offset,
120        } => {
121            let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
122            let limit_iter = LimitIterator::new(input_iter, limit, offset);
123            Ok((Box::new(limit_iter), projection, schema))
124        }
125        other => Err(ExecutorError::UnsupportedOperation(format!(
126            "unsupported query plan: {other:?}"
127        ))),
128    }
129}
130
131/// Evaluate a typed expression against a row, returning SqlValue.
132fn eval_expr(expr: &crate::planner::typed_expr::TypedExpr, row: &Row) -> Result<SqlValue> {
133    let ctx = EvalContext::new(&row.values);
134    crate::executor::evaluator::evaluate(expr, &ctx)
135}
136
137/// Build column info name using alias fallback.
138fn column_name_from_projection(
139    projected: &crate::planner::typed_expr::ProjectedColumn,
140    idx: usize,
141) -> String {
142    projected
143        .alias
144        .clone()
145        .or_else(|| match &projected.expr.kind {
146            crate::planner::typed_expr::TypedExprKind::ColumnRef { column, .. } => {
147                Some(column.clone())
148            }
149            _ => None,
150        })
151        .unwrap_or_else(|| format!("col_{idx}"))
152}
153
154/// Build ColumnInfo from projection.
155fn column_info_from_projection(
156    projected: &crate::planner::typed_expr::ProjectedColumn,
157    idx: usize,
158) -> ColumnInfo {
159    ColumnInfo::new(
160        column_name_from_projection(projected, idx),
161        projected.expr.resolved_type.clone(),
162    )
163}
164
165/// Build ColumnInfo for Projection::All using schema.
166fn column_infos_from_all(
167    schema: &[crate::catalog::ColumnMetadata],
168    names: &[String],
169) -> Result<Vec<ColumnInfo>> {
170    names
171        .iter()
172        .map(|name| {
173            let col = schema
174                .iter()
175                .find(|c| &c.name == name)
176                .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
177            Ok(ColumnInfo::new(name.clone(), col.data_type.clone()))
178        })
179        .collect()
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::{ColumnMetadata, MemoryCatalog, TableMetadata};
    use crate::executor::ddl::create_table::execute_create_table;
    use crate::planner::typed_expr::TypedExpr;
    use crate::planner::types::ResolvedType;
    use crate::storage::TxnBridge;
    use alopex_core::kv::memory::MemoryKV;
    use std::sync::Arc;

    /// A bare scan with `Projection::All` returns the single inserted row with
    /// both columns, read inside the same write transaction that inserted it.
    #[test]
    fn execute_query_scan_only_returns_rows() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut catalog = MemoryCatalog::new();

        // Create `users(id, name)` in its own committed DDL transaction.
        let users = TableMetadata::new(
            "users",
            vec![
                ColumnMetadata::new("id", ResolvedType::Integer),
                ColumnMetadata::new("name", ResolvedType::Text),
            ],
        );
        let mut ddl_txn = bridge.begin_write().unwrap();
        execute_create_table(&mut ddl_txn, &mut catalog, users, vec![], false).unwrap();
        ddl_txn.commit().unwrap();

        // Literal expressions all share the default span.
        let lit = |value, ty| TypedExpr::literal(value, ty, crate::Span::default());

        // Insert one row: (1, "alice").
        let mut txn = bridge.begin_write().unwrap();
        crate::executor::dml::execute_insert(
            &mut txn,
            &catalog,
            "users",
            vec!["id".into(), "name".into()],
            vec![vec![
                lit(
                    crate::ast::expr::Literal::Number("1".into()),
                    ResolvedType::Integer,
                ),
                lit(
                    crate::ast::expr::Literal::String("alice".into()),
                    ResolvedType::Text,
                ),
            ]],
        )
        .unwrap();

        // Query through the same, still-open transaction.
        let plan = LogicalPlan::scan(
            "users".into(),
            Projection::All(vec!["id".into(), "name".into()]),
        );
        let outcome = execute_query(&mut txn, &catalog, plan).unwrap();

        let query = match outcome {
            ExecutionResult::Query(query) => query,
            other => panic!("unexpected result {other:?}"),
        };
        assert_eq!(query.rows.len(), 1);
        assert_eq!(query.columns.len(), 2);
        assert_eq!(
            query.rows[0],
            vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]
        );
    }
}