1use alopex_core::kv::KVStore;
2
3use crate::catalog::{Catalog, StorageType};
4use crate::executor::evaluator::EvalContext;
5use crate::executor::{ExecutionResult, ExecutorError, QueryRowIterator, Result};
6use crate::planner::logical_plan::LogicalPlan;
7use crate::planner::typed_expr::Projection;
8use crate::storage::{SqlTxn, SqlValue};
9
10use super::{ColumnInfo, Row};
11
12pub mod columnar_scan;
13pub mod iterator;
14mod knn;
15mod project;
16mod scan;
17
18pub use columnar_scan::{ColumnarScanIterator, create_columnar_scan_iterator};
19pub use iterator::{FilterIterator, LimitIterator, RowIterator, ScanIterator, SortIterator};
20pub use scan::create_scan_iterator;
21
22pub fn execute_query<'txn, S: KVStore + 'txn, C: Catalog, T: SqlTxn<'txn, S>>(
34 txn: &mut T,
35 catalog: &C,
36 plan: LogicalPlan,
37) -> Result<ExecutionResult> {
38 if let Some((pattern, projection, filter)) = knn::extract_knn_context(&plan) {
39 return knn::execute_knn_query(txn, catalog, &pattern, &projection, filter.as_ref());
40 }
41
42 let (mut iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
43
44 let mut rows = Vec::new();
46 while let Some(result) = iter.next_row() {
47 rows.push(result?);
48 }
49
50 let result = project::execute_project(rows, &projection, &schema)?;
51 Ok(ExecutionResult::Query(result))
52}
53
54pub fn execute_query_streaming<'txn, S: KVStore + 'txn, C: Catalog, T: SqlTxn<'txn, S>>(
70 txn: &mut T,
71 catalog: &C,
72 plan: LogicalPlan,
73) -> Result<QueryRowIterator<'static>> {
74 if knn::extract_knn_context(&plan).is_some() {
76 let result = execute_query(txn, catalog, plan)?;
78 if let ExecutionResult::Query(qr) = result {
79 let column_names: Vec<String> = qr.columns.iter().map(|c| c.name.clone()).collect();
80 let schema: Vec<crate::catalog::ColumnMetadata> = qr
81 .columns
82 .iter()
83 .map(|c| crate::catalog::ColumnMetadata::new(&c.name, c.data_type.clone()))
84 .collect();
85 let rows: Vec<Row> = qr
86 .rows
87 .into_iter()
88 .enumerate()
89 .map(|(i, values)| Row::new(i as u64, values))
90 .collect();
91 let iter = iterator::VecIterator::new(rows, schema.clone());
92 return Ok(QueryRowIterator::new(
93 Box::new(iter),
94 Projection::All(column_names),
95 schema,
96 ));
97 }
98 return Err(ExecutorError::InvalidOperation {
99 operation: "execute_query_streaming".into(),
100 reason: "KNN query did not return Query result".into(),
101 });
102 }
103
104 let (iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
105
106 Ok(QueryRowIterator::new(iter, projection, schema))
107}
108
109fn build_iterator_pipeline<'txn, S: KVStore + 'txn, C: Catalog, T: SqlTxn<'txn, S>>(
116 txn: &mut T,
117 catalog: &C,
118 plan: LogicalPlan,
119) -> Result<(
120 Box<dyn RowIterator>,
121 Projection,
122 Vec<crate::catalog::ColumnMetadata>,
123)> {
124 match plan {
125 LogicalPlan::Scan { table, projection } => {
126 let table_meta = catalog
127 .get_table(&table)
128 .cloned()
129 .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
130
131 if table_meta.storage_options.storage_type == StorageType::Columnar {
132 let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
133 let rows = columnar_scan::execute_columnar_scan(txn, &table_meta, &columnar_scan)?;
134 let schema = table_meta.columns.clone();
135 let iter = iterator::VecIterator::new(rows, schema.clone());
136 return Ok((Box::new(iter), projection, schema));
137 }
138
139 let rows = scan::execute_scan(txn, &table_meta)?;
143 let schema = table_meta.columns.clone();
144
145 let iter = iterator::VecIterator::new(rows, schema.clone());
147 Ok((Box::new(iter), projection, schema))
148 }
149 LogicalPlan::Filter { input, predicate } => {
150 if let LogicalPlan::Scan { table, projection } = input.as_ref()
151 && let Some(table_meta) = catalog.get_table(table)
152 && table_meta.storage_options.storage_type == StorageType::Columnar
153 {
154 let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
155 table_meta,
156 projection.clone(),
157 &predicate,
158 );
159 let rows = columnar_scan::execute_columnar_scan(txn, table_meta, &columnar_scan)?;
160 let schema = table_meta.columns.clone();
161 let iter = iterator::VecIterator::new(rows, schema.clone());
162 return Ok((Box::new(iter), projection.clone(), schema));
163 }
164 let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
165 let filter_iter = FilterIterator::new(input_iter, predicate);
166 Ok((Box::new(filter_iter), projection, schema))
167 }
168 LogicalPlan::Sort { input, order_by } => {
169 let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
170 let sort_iter = SortIterator::new(input_iter, &order_by)?;
171 Ok((Box::new(sort_iter), projection, schema))
172 }
173 LogicalPlan::Limit {
174 input,
175 limit,
176 offset,
177 } => {
178 let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
179 let limit_iter = LimitIterator::new(input_iter, limit, offset);
180 Ok((Box::new(limit_iter), projection, schema))
181 }
182 other => Err(ExecutorError::UnsupportedOperation(format!(
183 "unsupported query plan: {other:?}"
184 ))),
185 }
186}
187
188pub fn build_streaming_pipeline<'a, 'txn: 'a, S: KVStore + 'txn, C: Catalog, T: SqlTxn<'txn, S>>(
200 txn: &'a mut T,
201 catalog: &C,
202 plan: LogicalPlan,
203) -> Result<(
204 Box<dyn RowIterator + 'a>,
205 Projection,
206 Vec<crate::catalog::ColumnMetadata>,
207)> {
208 build_streaming_pipeline_inner(txn, catalog, plan)
209}
210
211fn build_streaming_pipeline_inner<
213 'a,
214 'txn: 'a,
215 S: KVStore + 'txn,
216 C: Catalog,
217 T: SqlTxn<'txn, S>,
218>(
219 txn: &'a mut T,
220 catalog: &C,
221 plan: LogicalPlan,
222) -> Result<(
223 Box<dyn RowIterator + 'a>,
224 Projection,
225 Vec<crate::catalog::ColumnMetadata>,
226)> {
227 match plan {
228 LogicalPlan::Scan { table, projection } => {
229 let table_meta = catalog
230 .get_table(&table)
231 .cloned()
232 .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
233
234 if table_meta.storage_options.storage_type == StorageType::Columnar {
235 let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
237 let schema = table_meta.columns.clone();
238 let iter =
239 columnar_scan::create_columnar_scan_iterator(txn, &table_meta, &columnar_scan)?;
240 return Ok((Box::new(iter), projection, schema));
241 }
242
243 let schema = table_meta.columns.clone();
245 let scan_iter = scan::create_scan_iterator(txn, &table_meta)?;
246 Ok((Box::new(scan_iter), projection, schema))
247 }
248 LogicalPlan::Filter { input, predicate } => {
249 if let LogicalPlan::Scan { table, projection } = input.as_ref()
250 && let Some(table_meta) = catalog.get_table(table)
251 && table_meta.storage_options.storage_type == StorageType::Columnar
252 {
253 let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
255 table_meta,
256 projection.clone(),
257 &predicate,
258 );
259 let schema = table_meta.columns.clone();
260 let iter =
261 columnar_scan::create_columnar_scan_iterator(txn, table_meta, &columnar_scan)?;
262 return Ok((Box::new(iter), projection.clone(), schema));
263 }
264 let (input_iter, projection, schema) =
265 build_streaming_pipeline_inner(txn, catalog, *input)?;
266 let filter_iter = FilterIterator::new(input_iter, predicate);
267 Ok((Box::new(filter_iter), projection, schema))
268 }
269 LogicalPlan::Sort { input, order_by } => {
270 let (input_iter, projection, schema) =
271 build_streaming_pipeline_inner(txn, catalog, *input)?;
272 let sort_iter = SortIterator::new(input_iter, &order_by)?;
273 Ok((Box::new(sort_iter), projection, schema))
274 }
275 LogicalPlan::Limit {
276 input,
277 limit,
278 offset,
279 } => {
280 let (input_iter, projection, schema) =
281 build_streaming_pipeline_inner(txn, catalog, *input)?;
282 let limit_iter = LimitIterator::new(input_iter, limit, offset);
283 Ok((Box::new(limit_iter), projection, schema))
284 }
285 other => Err(ExecutorError::UnsupportedOperation(format!(
286 "unsupported query plan: {other:?}"
287 ))),
288 }
289}
290
291fn eval_expr(expr: &crate::planner::typed_expr::TypedExpr, row: &Row) -> Result<SqlValue> {
293 let ctx = EvalContext::new(&row.values);
294 crate::executor::evaluator::evaluate(expr, &ctx)
295}
296
297fn column_name_from_projection(
299 projected: &crate::planner::typed_expr::ProjectedColumn,
300 idx: usize,
301) -> String {
302 projected
303 .alias
304 .clone()
305 .or_else(|| match &projected.expr.kind {
306 crate::planner::typed_expr::TypedExprKind::ColumnRef { column, .. } => {
307 Some(column.clone())
308 }
309 _ => None,
310 })
311 .unwrap_or_else(|| format!("col_{idx}"))
312}
313
314fn column_info_from_projection(
316 projected: &crate::planner::typed_expr::ProjectedColumn,
317 idx: usize,
318) -> ColumnInfo {
319 ColumnInfo::new(
320 column_name_from_projection(projected, idx),
321 projected.expr.resolved_type.clone(),
322 )
323}
324
325fn column_infos_from_all(
327 schema: &[crate::catalog::ColumnMetadata],
328 names: &[String],
329) -> Result<Vec<ColumnInfo>> {
330 names
331 .iter()
332 .map(|name| {
333 let col = schema
334 .iter()
335 .find(|c| &c.name == name)
336 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
337 Ok(ColumnInfo::new(name.clone(), col.data_type.clone()))
338 })
339 .collect()
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::{ColumnMetadata, MemoryCatalog, TableMetadata};
    use crate::executor::ddl::create_table::execute_create_table;
    use crate::planner::typed_expr::TypedExpr;
    use crate::planner::types::ResolvedType;
    use crate::storage::TxnBridge;
    use alopex_core::kv::memory::MemoryKV;
    use std::sync::Arc;

    /// A plain scan over a table with one inserted row returns exactly that
    /// row, with both columns present.
    #[test]
    fn execute_query_scan_only_returns_rows() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut catalog = MemoryCatalog::new();

        // Create the `users (id, name)` table in its own committed transaction.
        let columns = vec![
            ColumnMetadata::new("id", ResolvedType::Integer),
            ColumnMetadata::new("name", ResolvedType::Text),
        ];
        let table = TableMetadata::new("users", columns);
        let mut ddl_txn = bridge.begin_write().unwrap();
        execute_create_table(&mut ddl_txn, &mut catalog, table, vec![], false).unwrap();
        ddl_txn.commit().unwrap();

        // Insert a single row; the query below runs in the same transaction.
        let id_literal = TypedExpr::literal(
            crate::ast::expr::Literal::Number("1".into()),
            ResolvedType::Integer,
            crate::Span::default(),
        );
        let name_literal = TypedExpr::literal(
            crate::ast::expr::Literal::String("alice".into()),
            ResolvedType::Text,
            crate::Span::default(),
        );
        let mut txn = bridge.begin_write().unwrap();
        crate::executor::dml::execute_insert(
            &mut txn,
            &catalog,
            "users",
            vec!["id".into(), "name".into()],
            vec![vec![id_literal, name_literal]],
        )
        .unwrap();

        let plan = LogicalPlan::scan(
            "users".into(),
            Projection::All(vec!["id".into(), "name".into()]),
        );
        let result = execute_query(&mut txn, &catalog, plan).unwrap();

        let query = match result {
            ExecutionResult::Query(q) => q,
            other => panic!("unexpected result {other:?}"),
        };
        assert_eq!(query.rows.len(), 1);
        assert_eq!(query.columns.len(), 2);
        assert_eq!(
            query.rows[0],
            vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]
        );
    }
}