alopex_sql/executor/query/
mod.rs1use alopex_core::kv::KVStore;
2
3use crate::catalog::{Catalog, StorageType};
4use crate::executor::evaluator::EvalContext;
5use crate::executor::{ExecutionResult, ExecutorError, Result};
6use crate::planner::logical_plan::LogicalPlan;
7use crate::planner::typed_expr::Projection;
8use crate::storage::{SqlTxn, SqlValue};
9
10use super::{ColumnInfo, Row};
11
12pub mod columnar_scan;
13pub mod iterator;
14mod knn;
15mod project;
16mod scan;
17
18pub use iterator::{FilterIterator, LimitIterator, RowIterator, SortIterator};
19
20pub fn execute_query<'txn, S: KVStore + 'txn, C: Catalog, T: SqlTxn<'txn, S>>(
32 txn: &mut T,
33 catalog: &C,
34 plan: LogicalPlan,
35) -> Result<ExecutionResult> {
36 if let Some((pattern, projection, filter)) = knn::extract_knn_context(&plan) {
37 return knn::execute_knn_query(txn, catalog, &pattern, &projection, filter.as_ref());
38 }
39
40 let (mut iter, projection, schema) = build_iterator_pipeline(txn, catalog, plan)?;
41
42 let mut rows = Vec::new();
44 while let Some(result) = iter.next_row() {
45 rows.push(result?);
46 }
47
48 let result = project::execute_project(rows, &projection, &schema)?;
49 Ok(ExecutionResult::Query(result))
50}
51
52fn build_iterator_pipeline<'txn, S: KVStore + 'txn, C: Catalog, T: SqlTxn<'txn, S>>(
59 txn: &mut T,
60 catalog: &C,
61 plan: LogicalPlan,
62) -> Result<(
63 Box<dyn RowIterator>,
64 Projection,
65 Vec<crate::catalog::ColumnMetadata>,
66)> {
67 match plan {
68 LogicalPlan::Scan { table, projection } => {
69 let table_meta = catalog
70 .get_table(&table)
71 .cloned()
72 .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?;
73
74 if table_meta.storage_options.storage_type == StorageType::Columnar {
75 let columnar_scan = columnar_scan::build_columnar_scan(&table_meta, &projection);
76 let rows = columnar_scan::execute_columnar_scan(txn, &table_meta, &columnar_scan)?;
77 let schema = table_meta.columns.clone();
78 let iter = iterator::VecIterator::new(rows, schema.clone());
79 return Ok((Box::new(iter), projection, schema));
80 }
81
82 let rows = scan::execute_scan(txn, &table_meta)?;
86 let schema = table_meta.columns.clone();
87
88 let iter = iterator::VecIterator::new(rows, schema.clone());
90 Ok((Box::new(iter), projection, schema))
91 }
92 LogicalPlan::Filter { input, predicate } => {
93 if let LogicalPlan::Scan { table, projection } = input.as_ref()
94 && let Some(table_meta) = catalog.get_table(table)
95 && table_meta.storage_options.storage_type == StorageType::Columnar
96 {
97 let columnar_scan = columnar_scan::build_columnar_scan_for_filter(
98 table_meta,
99 projection.clone(),
100 &predicate,
101 );
102 let rows = columnar_scan::execute_columnar_scan(txn, table_meta, &columnar_scan)?;
103 let schema = table_meta.columns.clone();
104 let iter = iterator::VecIterator::new(rows, schema.clone());
105 return Ok((Box::new(iter), projection.clone(), schema));
106 }
107 let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
108 let filter_iter = FilterIterator::new(input_iter, predicate);
109 Ok((Box::new(filter_iter), projection, schema))
110 }
111 LogicalPlan::Sort { input, order_by } => {
112 let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
113 let sort_iter = SortIterator::new(input_iter, &order_by)?;
114 Ok((Box::new(sort_iter), projection, schema))
115 }
116 LogicalPlan::Limit {
117 input,
118 limit,
119 offset,
120 } => {
121 let (input_iter, projection, schema) = build_iterator_pipeline(txn, catalog, *input)?;
122 let limit_iter = LimitIterator::new(input_iter, limit, offset);
123 Ok((Box::new(limit_iter), projection, schema))
124 }
125 other => Err(ExecutorError::UnsupportedOperation(format!(
126 "unsupported query plan: {other:?}"
127 ))),
128 }
129}
130
131fn eval_expr(expr: &crate::planner::typed_expr::TypedExpr, row: &Row) -> Result<SqlValue> {
133 let ctx = EvalContext::new(&row.values);
134 crate::executor::evaluator::evaluate(expr, &ctx)
135}
136
137fn column_name_from_projection(
139 projected: &crate::planner::typed_expr::ProjectedColumn,
140 idx: usize,
141) -> String {
142 projected
143 .alias
144 .clone()
145 .or_else(|| match &projected.expr.kind {
146 crate::planner::typed_expr::TypedExprKind::ColumnRef { column, .. } => {
147 Some(column.clone())
148 }
149 _ => None,
150 })
151 .unwrap_or_else(|| format!("col_{idx}"))
152}
153
154fn column_info_from_projection(
156 projected: &crate::planner::typed_expr::ProjectedColumn,
157 idx: usize,
158) -> ColumnInfo {
159 ColumnInfo::new(
160 column_name_from_projection(projected, idx),
161 projected.expr.resolved_type.clone(),
162 )
163}
164
165fn column_infos_from_all(
167 schema: &[crate::catalog::ColumnMetadata],
168 names: &[String],
169) -> Result<Vec<ColumnInfo>> {
170 names
171 .iter()
172 .map(|name| {
173 let col = schema
174 .iter()
175 .find(|c| &c.name == name)
176 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
177 Ok(ColumnInfo::new(name.clone(), col.data_type.clone()))
178 })
179 .collect()
180}
181
182#[cfg(test)]
183mod tests {
184 use super::*;
185 use crate::catalog::{ColumnMetadata, MemoryCatalog, TableMetadata};
186 use crate::executor::ddl::create_table::execute_create_table;
187 use crate::planner::typed_expr::TypedExpr;
188 use crate::planner::types::ResolvedType;
189 use crate::storage::TxnBridge;
190 use alopex_core::kv::memory::MemoryKV;
191 use std::sync::Arc;
192
193 #[test]
194 fn execute_query_scan_only_returns_rows() {
195 let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
196 let mut catalog = MemoryCatalog::new();
197 let table = TableMetadata::new(
198 "users",
199 vec![
200 ColumnMetadata::new("id", ResolvedType::Integer),
201 ColumnMetadata::new("name", ResolvedType::Text),
202 ],
203 );
204 let mut ddl_txn = bridge.begin_write().unwrap();
205 execute_create_table(&mut ddl_txn, &mut catalog, table.clone(), vec![], false).unwrap();
206 ddl_txn.commit().unwrap();
207
208 let mut txn = bridge.begin_write().unwrap();
209 crate::executor::dml::execute_insert(
210 &mut txn,
211 &catalog,
212 "users",
213 vec!["id".into(), "name".into()],
214 vec![vec![
215 TypedExpr::literal(
216 crate::ast::expr::Literal::Number("1".into()),
217 ResolvedType::Integer,
218 crate::Span::default(),
219 ),
220 TypedExpr::literal(
221 crate::ast::expr::Literal::String("alice".into()),
222 ResolvedType::Text,
223 crate::Span::default(),
224 ),
225 ]],
226 )
227 .unwrap();
228
229 let result = execute_query(
230 &mut txn,
231 &catalog,
232 LogicalPlan::scan(
233 "users".into(),
234 Projection::All(vec!["id".into(), "name".into()]),
235 ),
236 )
237 .unwrap();
238
239 match result {
240 ExecutionResult::Query(q) => {
241 assert_eq!(q.rows.len(), 1);
242 assert_eq!(q.columns.len(), 2);
243 assert_eq!(
244 q.rows[0],
245 vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]
246 );
247 }
248 other => panic!("unexpected result {other:?}"),
249 }
250 }
251}