1use alopex_core::kv::KVStore;
2
3use crate::ast::LITERAL_TABLE;
4use crate::catalog::{Catalog, StorageType};
5use crate::executor::evaluator::EvalContext;
6use crate::executor::{ExecutionResult, ExecutorError, QueryRowIterator, Result};
7use crate::planner::logical_plan::LogicalPlan;
8use crate::planner::typed_expr::Projection;
9use crate::storage::{SqlTxn, SqlValue};
10
11use super::{ColumnInfo, Row};
12
13pub mod columnar_scan;
14pub mod iterator;
15mod knn;
16mod project;
17mod scan;
18
19pub use columnar_scan::{ColumnarScanIterator, create_columnar_scan_iterator};
20pub use iterator::{FilterIterator, LimitIterator, RowIterator, ScanIterator, SortIterator};
21pub use scan::create_scan_iterator;
22
23pub fn execute_query<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
35 txn: &mut T,
36 catalog: &C,
37 plan: LogicalPlan,
38) -> Result<ExecutionResult> {
39 if let Some((pattern, projection, filter)) = knn::extract_knn_context(&plan) {
40 return knn::execute_knn_query(txn, catalog, &pattern, &projection, filter.as_ref());
41 }
42
43 let (mut iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
44
45 let mut rows = Vec::new();
47 while let Some(result) = iter.next_row() {
48 rows.push(result?);
49 }
50
51 let result = project::execute_project(rows, &projection, &schema)?;
52 Ok(ExecutionResult::Query(result))
53}
54
55pub fn execute_query_streaming<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
71 txn: &mut T,
72 catalog: &C,
73 plan: LogicalPlan,
74) -> Result<QueryRowIterator<'static>> {
75 if knn::extract_knn_context(&plan).is_some() {
77 let result = execute_query(txn, catalog, plan)?;
79 if let ExecutionResult::Query(qr) = result {
80 let column_names: Vec<String> = qr.columns.iter().map(|c| c.name.clone()).collect();
81 let schema: Vec<crate::catalog::ColumnMetadata> = qr
82 .columns
83 .iter()
84 .map(|c| crate::catalog::ColumnMetadata::new(&c.name, c.data_type.clone()))
85 .collect();
86 let rows: Vec<Row> = qr
87 .rows
88 .into_iter()
89 .enumerate()
90 .map(|(i, values)| Row::new(i as u64, values))
91 .collect();
92 let iter = iterator::VecIterator::new(rows, schema.clone());
93 return Ok(QueryRowIterator::new(
94 Box::new(iter),
95 Projection::All(column_names),
96 schema,
97 ));
98 }
99 return Err(ExecutorError::InvalidOperation {
100 operation: "execute_query_streaming".into(),
101 reason: "KNN query did not return Query result".into(),
102 });
103 }
104
105 let (iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
106
107 Ok(QueryRowIterator::new(iter, projection, schema))
108}
109
110fn build_iterator_pipeline<'txn, S: KVStore + 'txn, C: Catalog + ?Sized, T: SqlTxn<'txn, S>>(
117 txn: &mut T,
118 catalog: &C,
119 plan: LogicalPlan,
120) -> Result<(
121 Box<dyn RowIterator>,
122 Projection,
123 Vec<crate::catalog::ColumnMetadata>,
124)> {
125 match plan {
126 LogicalPlan::Scan { table, projection } => {
127 if table == LITERAL_TABLE {
128 let schema = Vec::new();
129 let rows = vec![Row::new(0, Vec::new())];
130 let iter = iterator::VecIterator::new(rows, schema.clone());
131 return Ok((Box::new(iter), projection, schema));
132 }
133 let table_meta = catalog
134 .get_table(&table)
135 .cloned()
136 .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
137
138 if table_meta.storage_options.storage_type == StorageType::Columnar {
139 let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
140 let rows = columnar_scan::execute_columnar_scan(txn, &table_meta, &columnar_scan)?;
141 let schema = table_meta.columns.clone();
142 let iter = iterator::VecIterator::new(rows, schema.clone());
143 return Ok((Box::new(iter), projection, schema));
144 }
145
146 let rows = scan::execute_scan(txn, &table_meta)?;
150 let schema = table_meta.columns.clone();
151
152 let iter = iterator::VecIterator::new(rows, schema.clone());
154 Ok((Box::new(iter), projection, schema))
155 }
156 LogicalPlan::Filter { input, predicate } => {
157 if let LogicalPlan::Scan { table, projection } = input.as_ref()
158 && let Some(table_meta) = catalog.get_table(table)
159 && table_meta.storage_options.storage_type == StorageType::Columnar
160 {
161 let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
162 table_meta,
163 projection.clone(),
164 &predicate,
165 );
166 let rows = columnar_scan::execute_columnar_scan(txn, table_meta, &columnar_scan)?;
167 let schema = table_meta.columns.clone();
168 let iter = iterator::VecIterator::new(rows, schema.clone());
169 return Ok((Box::new(iter), projection.clone(), schema));
170 }
171 let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
172 let filter_iter = FilterIterator::new(input_iter, predicate);
173 Ok((Box::new(filter_iter), projection, schema))
174 }
175 LogicalPlan::Sort { input, order_by } => {
176 let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
177 let sort_iter = SortIterator::new(input_iter, &order_by)?;
178 Ok((Box::new(sort_iter), projection, schema))
179 }
180 LogicalPlan::Limit {
181 input,
182 limit,
183 offset,
184 } => {
185 let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
186 let limit_iter = LimitIterator::new(input_iter, limit, offset);
187 Ok((Box::new(limit_iter), projection, schema))
188 }
189 other => Err(ExecutorError::UnsupportedOperation(format!(
190 "unsupported query plan: {other:?}"
191 ))),
192 }
193}
194
195pub fn build_streaming_pipeline<
207 'a,
208 'txn: 'a,
209 S: KVStore + 'txn,
210 C: Catalog + ?Sized,
211 T: SqlTxn<'txn, S>,
212>(
213 txn: &'a mut T,
214 catalog: &C,
215 plan: LogicalPlan,
216) -> Result<(
217 Box<dyn RowIterator + 'a>,
218 Projection,
219 Vec<crate::catalog::ColumnMetadata>,
220)> {
221 build_streaming_pipeline_inner(txn, catalog, plan)
222}
223
224fn build_streaming_pipeline_inner<
226 'a,
227 'txn: 'a,
228 S: KVStore + 'txn,
229 C: Catalog + ?Sized,
230 T: SqlTxn<'txn, S>,
231>(
232 txn: &'a mut T,
233 catalog: &C,
234 plan: LogicalPlan,
235) -> Result<(
236 Box<dyn RowIterator + 'a>,
237 Projection,
238 Vec<crate::catalog::ColumnMetadata>,
239)> {
240 match plan {
241 LogicalPlan::Scan { table, projection } => {
242 if table == LITERAL_TABLE {
243 let schema = Vec::new();
244 let rows = vec![Row::new(0, Vec::new())];
245 let iter = iterator::VecIterator::new(rows, schema.clone());
246 return Ok((Box::new(iter), projection, schema));
247 }
248 let table_meta = catalog
249 .get_table(&table)
250 .cloned()
251 .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
252
253 if table_meta.storage_options.storage_type == StorageType::Columnar {
254 let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
256 let schema = table_meta.columns.clone();
257 let iter =
258 columnar_scan::create_columnar_scan_iterator(txn, &table_meta, &columnar_scan)?;
259 return Ok((Box::new(iter), projection, schema));
260 }
261
262 let schema = table_meta.columns.clone();
264 let scan_iter = scan::create_scan_iterator(txn, &table_meta)?;
265 Ok((Box::new(scan_iter), projection, schema))
266 }
267 LogicalPlan::Filter { input, predicate } => {
268 if let LogicalPlan::Scan { table, projection } = input.as_ref()
269 && let Some(table_meta) = catalog.get_table(table)
270 && table_meta.storage_options.storage_type == StorageType::Columnar
271 {
272 let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
274 table_meta,
275 projection.clone(),
276 &predicate,
277 );
278 let schema = table_meta.columns.clone();
279 let iter =
280 columnar_scan::create_columnar_scan_iterator(txn, table_meta, &columnar_scan)?;
281 return Ok((Box::new(iter), projection.clone(), schema));
282 }
283 let (input_iter, projection, schema) =
284 build_streaming_pipeline_inner(txn, catalog, *input)?;
285 let filter_iter = FilterIterator::new(input_iter, predicate);
286 Ok((Box::new(filter_iter), projection, schema))
287 }
288 LogicalPlan::Sort { input, order_by } => {
289 let (input_iter, projection, schema) =
290 build_streaming_pipeline_inner(txn, catalog, *input)?;
291 let sort_iter = SortIterator::new(input_iter, &order_by)?;
292 Ok((Box::new(sort_iter), projection, schema))
293 }
294 LogicalPlan::Limit {
295 input,
296 limit,
297 offset,
298 } => {
299 let (input_iter, projection, schema) =
300 build_streaming_pipeline_inner(txn, catalog, *input)?;
301 let limit_iter = LimitIterator::new(input_iter, limit, offset);
302 Ok((Box::new(limit_iter), projection, schema))
303 }
304 other => Err(ExecutorError::UnsupportedOperation(format!(
305 "unsupported query plan: {other:?}"
306 ))),
307 }
308}
309
310fn eval_expr(expr: &crate::planner::typed_expr::TypedExpr, row: &Row) -> Result<SqlValue> {
312 let ctx = EvalContext::new(&row.values);
313 crate::executor::evaluator::evaluate(expr, &ctx)
314}
315
316fn column_name_from_projection(
318 projected: &crate::planner::typed_expr::ProjectedColumn,
319 idx: usize,
320) -> String {
321 projected
322 .alias
323 .clone()
324 .or_else(|| match &projected.expr.kind {
325 crate::planner::typed_expr::TypedExprKind::ColumnRef { column, .. } => {
326 Some(column.clone())
327 }
328 _ => None,
329 })
330 .unwrap_or_else(|| format!("col_{idx}"))
331}
332
333fn column_info_from_projection(
335 projected: &crate::planner::typed_expr::ProjectedColumn,
336 idx: usize,
337) -> ColumnInfo {
338 ColumnInfo::new(
339 column_name_from_projection(projected, idx),
340 projected.expr.resolved_type.clone(),
341 )
342}
343
344fn column_infos_from_all(
346 schema: &[crate::catalog::ColumnMetadata],
347 names: &[String],
348) -> Result<Vec<ColumnInfo>> {
349 names
350 .iter()
351 .map(|name| {
352 let col = schema
353 .iter()
354 .find(|c| &c.name == name)
355 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
356 Ok(ColumnInfo::new(name.clone(), col.data_type.clone()))
357 })
358 .collect()
359}
360
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::{ColumnMetadata, MemoryCatalog, TableMetadata};
    use crate::executor::ddl::create_table::execute_create_table;
    use crate::planner::typed_expr::TypedExpr;
    use crate::planner::types::ResolvedType;
    use crate::storage::TxnBridge;
    use alopex_core::kv::memory::MemoryKV;
    use std::sync::Arc;

    /// Creates a two-column `users` table, inserts one row, and checks that a
    /// plain scan through `execute_query` returns that row with both columns.
    #[test]
    fn execute_query_scan_only_returns_rows() {
        let store = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut catalog = MemoryCatalog::new();
        let users = TableMetadata::new(
            "users",
            vec![
                ColumnMetadata::new("id", ResolvedType::Integer),
                ColumnMetadata::new("name", ResolvedType::Text),
            ],
        );

        // The table is created in its own transaction, committed before the
        // insert and the query run.
        let mut ddl_txn = store.begin_write().unwrap();
        execute_create_table(&mut ddl_txn, &mut catalog, users.clone(), vec![], false).unwrap();
        ddl_txn.commit().unwrap();

        // The insert and the scan share one write transaction.
        let mut txn = store.begin_write().unwrap();
        let id_literal = TypedExpr::literal(
            crate::ast::expr::Literal::Number("1".into()),
            ResolvedType::Integer,
            crate::Span::default(),
        );
        let name_literal = TypedExpr::literal(
            crate::ast::expr::Literal::String("alice".into()),
            ResolvedType::Text,
            crate::Span::default(),
        );
        crate::executor::dml::execute_insert(
            &mut txn,
            &catalog,
            "users",
            vec!["id".into(), "name".into()],
            vec![vec![id_literal, name_literal]],
        )
        .unwrap();

        let scan_plan = LogicalPlan::scan(
            "users".into(),
            Projection::All(vec!["id".into(), "name".into()]),
        );
        let outcome = execute_query(&mut txn, &catalog, scan_plan).unwrap();

        match outcome {
            ExecutionResult::Query(q) => {
                assert_eq!(q.rows.len(), 1);
                assert_eq!(q.columns.len(), 2);
                assert_eq!(
                    q.rows[0],
                    vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]
                );
            }
            other => panic!("unexpected result {other:?}"),
        }
    }
}