1use alopex_core::kv::KVStore;
2
3use crate::catalog::{Catalog, StorageType};
4use crate::executor::evaluator::EvalContext;
5use crate::executor::{ExecutionResult, ExecutorError, QueryRowIterator, Result};
6use crate::planner::logical_plan::LogicalPlan;
7use crate::planner::typed_expr::Projection;
8use crate::storage::{SqlTxn, SqlValue};
9
10use super::{ColumnInfo, Row};
11
12pub mod columnar_scan;
13pub mod iterator;
14mod knn;
15mod project;
16mod scan;
17
18pub use columnar_scan::{ColumnarScanIterator, create_columnar_scan_iterator};
19pub use iterator::{FilterIterator, LimitIterator, RowIterator, ScanIterator, SortIterator};
20pub use scan::create_scan_iterator;
21
22pub fn execute_query<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
34 txn: &mut T,
35 catalog: &C,
36 plan: LogicalPlan,
37) -> Result<ExecutionResult> {
38 if let Some((pattern, projection, filter)) = knn::extract_knn_context(&plan) {
39 return knn::execute_knn_query(txn, catalog, &pattern, &projection, filter.as_ref());
40 }
41
42 let (mut iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
43
44 let mut rows = Vec::new();
46 while let Some(result) = iter.next_row() {
47 rows.push(result?);
48 }
49
50 let result = project::execute_project(rows, &projection, &schema)?;
51 Ok(ExecutionResult::Query(result))
52}
53
54pub fn execute_query_streaming<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
70 txn: &mut T,
71 catalog: &C,
72 plan: LogicalPlan,
73) -> Result<QueryRowIterator<'static>> {
74 if knn::extract_knn_context(&plan).is_some() {
76 let result = execute_query(txn, catalog, plan)?;
78 if let ExecutionResult::Query(qr) = result {
79 let column_names: Vec<String> = qr.columns.iter().map(|c| c.name.clone()).collect();
80 let schema: Vec<crate::catalog::ColumnMetadata> = qr
81 .columns
82 .iter()
83 .map(|c| crate::catalog::ColumnMetadata::new(&c.name, c.data_type.clone()))
84 .collect();
85 let rows: Vec<Row> = qr
86 .rows
87 .into_iter()
88 .enumerate()
89 .map(|(i, values)| Row::new(i as u64, values))
90 .collect();
91 let iter = iterator::VecIterator::new(rows, schema.clone());
92 return Ok(QueryRowIterator::new(
93 Box::new(iter),
94 Projection::All(column_names),
95 schema,
96 ));
97 }
98 return Err(ExecutorError::InvalidOperation {
99 operation: "execute_query_streaming".into(),
100 reason: "KNN query did not return Query result".into(),
101 });
102 }
103
104 let (iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
105
106 Ok(QueryRowIterator::new(iter, projection, schema))
107}
108
109fn build_iterator_pipeline<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
116 txn: &mut T,
117 catalog: &C,
118 plan: LogicalPlan,
119) -> Result<(
120 Box<dyn RowIterator>,
121 Projection,
122 Vec<crate::catalog::ColumnMetadata>,
123)> {
124 match plan {
125 LogicalPlan::Scan { table, projection } => {
126 let table_meta = catalog
127 .get_table(&table)
128 .cloned()
129 .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
130
131 if table_meta.storage_options.storage_type == StorageType::Columnar {
132 let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
133 let rows = columnar_scan::execute_columnar_scan(txn, &table_meta, &columnar_scan)?;
134 let schema = table_meta.columns.clone();
135 let iter = iterator::VecIterator::new(rows, schema.clone());
136 return Ok((Box::new(iter), projection, schema));
137 }
138
139 let rows = scan::execute_scan(txn, &table_meta)?;
143 let schema = table_meta.columns.clone();
144
145 let iter = iterator::VecIterator::new(rows, schema.clone());
147 Ok((Box::new(iter), projection, schema))
148 }
149 LogicalPlan::Filter { input, predicate } => {
150 if let LogicalPlan::Scan { table, projection } = input.as_ref()
151 && let Some(table_meta) = catalog.get_table(table)
152 && table_meta.storage_options.storage_type == StorageType::Columnar
153 {
154 let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
155 table_meta,
156 projection.clone(),
157 &predicate,
158 );
159 let rows = columnar_scan::execute_columnar_scan(txn, table_meta, &columnar_scan)?;
160 let schema = table_meta.columns.clone();
161 let iter = iterator::VecIterator::new(rows, schema.clone());
162 return Ok((Box::new(iter), projection.clone(), schema));
163 }
164 let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
165 let filter_iter = FilterIterator::new(input_iter, predicate);
166 Ok((Box::new(filter_iter), projection, schema))
167 }
168 LogicalPlan::Sort { input, order_by } => {
169 let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
170 let sort_iter = SortIterator::new(input_iter, &order_by)?;
171 Ok((Box::new(sort_iter), projection, schema))
172 }
173 LogicalPlan::Limit {
174 input,
175 limit,
176 offset,
177 } => {
178 let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
179 let limit_iter = LimitIterator::new(input_iter, limit, offset);
180 Ok((Box::new(limit_iter), projection, schema))
181 }
182 other => Err(ExecutorError::UnsupportedOperation(format!(
183 "unsupported query plan: {other:?}"
184 ))),
185 }
186}
187
188pub fn build_streaming_pipeline<
200 'a,
201 'txn: 'a,
202 S: KVStore + 'txn,
203 C: Catalog + ?Sized,
204 T: SqlTxn<'txn, S>,
205>(
206 txn: &'a mut T,
207 catalog: &C,
208 plan: LogicalPlan,
209) -> Result<(
210 Box<dyn RowIterator + 'a>,
211 Projection,
212 Vec<crate::catalog::ColumnMetadata>,
213)> {
214 build_streaming_pipeline_inner(txn, catalog, plan)
215}
216
217fn build_streaming_pipeline_inner<
219 'a,
220 'txn: 'a,
221 S: KVStore + 'txn,
222 C: Catalog + ?Sized,
223 T: SqlTxn<'txn, S>,
224>(
225 txn: &'a mut T,
226 catalog: &C,
227 plan: LogicalPlan,
228) -> Result<(
229 Box<dyn RowIterator + 'a>,
230 Projection,
231 Vec<crate::catalog::ColumnMetadata>,
232)> {
233 match plan {
234 LogicalPlan::Scan { table, projection } => {
235 let table_meta = catalog
236 .get_table(&table)
237 .cloned()
238 .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
239
240 if table_meta.storage_options.storage_type == StorageType::Columnar {
241 let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
243 let schema = table_meta.columns.clone();
244 let iter =
245 columnar_scan::create_columnar_scan_iterator(txn, &table_meta, &columnar_scan)?;
246 return Ok((Box::new(iter), projection, schema));
247 }
248
249 let schema = table_meta.columns.clone();
251 let scan_iter = scan::create_scan_iterator(txn, &table_meta)?;
252 Ok((Box::new(scan_iter), projection, schema))
253 }
254 LogicalPlan::Filter { input, predicate } => {
255 if let LogicalPlan::Scan { table, projection } = input.as_ref()
256 && let Some(table_meta) = catalog.get_table(table)
257 && table_meta.storage_options.storage_type == StorageType::Columnar
258 {
259 let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
261 table_meta,
262 projection.clone(),
263 &predicate,
264 );
265 let schema = table_meta.columns.clone();
266 let iter =
267 columnar_scan::create_columnar_scan_iterator(txn, table_meta, &columnar_scan)?;
268 return Ok((Box::new(iter), projection.clone(), schema));
269 }
270 let (input_iter, projection, schema) =
271 build_streaming_pipeline_inner(txn, catalog, *input)?;
272 let filter_iter = FilterIterator::new(input_iter, predicate);
273 Ok((Box::new(filter_iter), projection, schema))
274 }
275 LogicalPlan::Sort { input, order_by } => {
276 let (input_iter, projection, schema) =
277 build_streaming_pipeline_inner(txn, catalog, *input)?;
278 let sort_iter = SortIterator::new(input_iter, &order_by)?;
279 Ok((Box::new(sort_iter), projection, schema))
280 }
281 LogicalPlan::Limit {
282 input,
283 limit,
284 offset,
285 } => {
286 let (input_iter, projection, schema) =
287 build_streaming_pipeline_inner(txn, catalog, *input)?;
288 let limit_iter = LimitIterator::new(input_iter, limit, offset);
289 Ok((Box::new(limit_iter), projection, schema))
290 }
291 other => Err(ExecutorError::UnsupportedOperation(format!(
292 "unsupported query plan: {other:?}"
293 ))),
294 }
295}
296
297fn eval_expr(expr: &crate::planner::typed_expr::TypedExpr, row: &Row) -> Result<SqlValue> {
299 let ctx = EvalContext::new(&row.values);
300 crate::executor::evaluator::evaluate(expr, &ctx)
301}
302
303fn column_name_from_projection(
305 projected: &crate::planner::typed_expr::ProjectedColumn,
306 idx: usize,
307) -> String {
308 projected
309 .alias
310 .clone()
311 .or_else(|| match &projected.expr.kind {
312 crate::planner::typed_expr::TypedExprKind::ColumnRef { column, .. } => {
313 Some(column.clone())
314 }
315 _ => None,
316 })
317 .unwrap_or_else(|| format!("col_{idx}"))
318}
319
320fn column_info_from_projection(
322 projected: &crate::planner::typed_expr::ProjectedColumn,
323 idx: usize,
324) -> ColumnInfo {
325 ColumnInfo::new(
326 column_name_from_projection(projected, idx),
327 projected.expr.resolved_type.clone(),
328 )
329}
330
331fn column_infos_from_all(
333 schema: &[crate::catalog::ColumnMetadata],
334 names: &[String],
335) -> Result<Vec<ColumnInfo>> {
336 names
337 .iter()
338 .map(|name| {
339 let col = schema
340 .iter()
341 .find(|c| &c.name == name)
342 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
343 Ok(ColumnInfo::new(name.clone(), col.data_type.clone()))
344 })
345 .collect()
346}
347
348#[cfg(test)]
349mod tests {
350 use super::*;
351 use crate::catalog::{ColumnMetadata, MemoryCatalog, TableMetadata};
352 use crate::executor::ddl::create_table::execute_create_table;
353 use crate::planner::typed_expr::TypedExpr;
354 use crate::planner::types::ResolvedType;
355 use crate::storage::TxnBridge;
356 use alopex_core::kv::memory::MemoryKV;
357 use std::sync::Arc;
358
359 #[test]
360 fn execute_query_scan_only_returns_rows() {
361 let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
362 let mut catalog = MemoryCatalog::new();
363 let table = TableMetadata::new(
364 "users",
365 vec![
366 ColumnMetadata::new("id", ResolvedType::Integer),
367 ColumnMetadata::new("name", ResolvedType::Text),
368 ],
369 );
370 let mut ddl_txn = bridge.begin_write().unwrap();
371 execute_create_table(&mut ddl_txn, &mut catalog, table.clone(), vec![], false).unwrap();
372 ddl_txn.commit().unwrap();
373
374 let mut txn = bridge.begin_write().unwrap();
375 crate::executor::dml::execute_insert(
376 &mut txn,
377 &catalog,
378 "users",
379 vec!["id".into(), "name".into()],
380 vec![vec![
381 TypedExpr::literal(
382 crate::ast::expr::Literal::Number("1".into()),
383 ResolvedType::Integer,
384 crate::Span::default(),
385 ),
386 TypedExpr::literal(
387 crate::ast::expr::Literal::String("alice".into()),
388 ResolvedType::Text,
389 crate::Span::default(),
390 ),
391 ]],
392 )
393 .unwrap();
394
395 let result = execute_query(
396 &mut txn,
397 &catalog,
398 LogicalPlan::scan(
399 "users".into(),
400 Projection::All(vec!["id".into(), "name".into()]),
401 ),
402 )
403 .unwrap();
404
405 match result {
406 ExecutionResult::Query(q) => {
407 assert_eq!(q.rows.len(), 1);
408 assert_eq!(q.columns.len(), 2);
409 assert_eq!(
410 q.rows[0],
411 vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]
412 );
413 }
414 other => panic!("unexpected result {other:?}"),
415 }
416 }
417}