1use super::select_ast::*;
4use crate::{schema::SchemaManager, storage::StorageEngine, Result, TableId, Value};
5use std::sync::Arc;
6
7#[derive(Debug)]
9pub struct SelectOptimizer {
10 #[allow(dead_code)]
11 schema: Arc<SchemaManager>,
12 #[allow(dead_code)]
13 storage: Arc<StorageEngine>,
14}
15
16#[derive(Debug, Clone)]
18pub struct OptimizedQueryPlan {
19 pub statement: SelectStatement,
20 pub execution_steps: Vec<ExecutionStep>,
21 pub sstable_predicates: Vec<SSTablePredicate>,
22 pub aggregation_plan: Option<AggregationPlan>,
23}
24
25#[derive(Debug, Clone)]
27pub enum ExecutionStep {
28 SSTableScan {
29 table: TableId,
30 predicates: Vec<SSTablePredicate>,
31 projection: Vec<String>,
32 },
33 Filter {
34 expression: WhereExpression,
35 },
36 Sort {
37 order_by: OrderByClause,
38 },
39 Aggregate {
40 plan: AggregationPlan,
41 },
42 Limit {
43 count: u64,
44 offset: Option<u64>,
45 },
46 Project {
47 columns: Vec<SelectExpression>,
48 },
49}
50
51#[derive(Debug, Clone)]
53pub struct SSTablePredicate {
54 pub column: String,
55 pub operation: SSTableFilterOp,
56 pub values: Vec<Value>,
57}
58
/// Kind of condition carried by an [`SSTablePredicate`].
///
/// The optimizer in this file only produces `Equal`, `In` and `Range`;
/// `Prefix` and `BloomFilter` are not constructed here.
#[derive(Debug, Clone)]
pub enum SSTableFilterOp {
    /// `column = literal`.
    Equal,
    /// `column BETWEEN start AND end`.
    Range,
    /// `column IN (literal, ...)`.
    In,
    /// Not produced in this file; presumably a key-prefix match (confirm with the scan code).
    Prefix,
    /// Not produced in this file; presumably a bloom-filter membership check (confirm with the scan code).
    BloomFilter,
}
68
69#[derive(Debug, Clone)]
71pub struct AggregationPlan {
72 pub group_by_columns: Vec<String>,
73 pub aggregates: Vec<AggregateComputation>,
74}
75
76#[derive(Debug, Clone)]
78pub struct AggregateComputation {
79 pub function: AggregateType,
80 pub column: String,
81 pub alias: String,
82 pub distinct: bool,
83}
84
85impl SelectOptimizer {
86 pub fn new(schema: Arc<SchemaManager>, storage: Arc<StorageEngine>) -> Self {
88 Self { schema, storage }
89 }
90
91 pub async fn optimize(&self, statement: SelectStatement) -> Result<OptimizedQueryPlan> {
93 let mut plan = OptimizedQueryPlan {
94 statement: statement.clone(),
95 execution_steps: Vec::new(),
96 sstable_predicates: Vec::new(),
97 aggregation_plan: None,
98 };
99
100 let Some(from_clause) = statement.from_clause.as_ref() else {
102 return Ok(plan);
103 };
104 let table_id = match from_clause {
105 FromClause::Table(t) | FromClause::TableAlias(t, _) => t.clone(),
106 };
107
108 if let Some(where_clause) = &statement.where_clause {
109 plan.sstable_predicates = collect_sstable_predicates(where_clause);
110 }
111
112 plan.execution_steps.push(ExecutionStep::SSTableScan {
113 table: table_id,
114 predicates: plan.sstable_predicates.clone(),
115 projection: extract_projection_columns(&statement.select_clause),
116 });
117
118 if let Some(where_clause) = &statement.where_clause {
121 if plan.sstable_predicates.is_empty() {
122 plan.execution_steps.push(ExecutionStep::Filter {
123 expression: where_clause.clone(),
124 });
125 }
126 }
127
128 let needs_aggregation = statement.requires_aggregation();
129 if needs_aggregation {
130 let agg_plan = plan_aggregation(&statement);
131 plan.execution_steps.push(ExecutionStep::Aggregate {
132 plan: agg_plan.clone(),
133 });
134 plan.aggregation_plan = Some(agg_plan);
135 }
136
137 if let Some(order_by) = &statement.order_by {
138 plan.execution_steps.push(ExecutionStep::Sort {
139 order_by: order_by.clone(),
140 });
141 }
142
143 if let Some(limit) = &statement.limit {
144 plan.execution_steps.push(ExecutionStep::Limit {
145 count: limit.count,
146 offset: statement.offset,
147 });
148 }
149
150 if !needs_aggregation {
153 if let SelectClause::Columns(exprs) | SelectClause::Distinct(exprs) =
154 &statement.select_clause
155 {
156 plan.execution_steps.push(ExecutionStep::Project {
157 columns: exprs.clone(),
158 });
159 }
160 }
161
162 Ok(plan)
163 }
164}
165
166fn collect_sstable_predicates(expr: &WhereExpression) -> Vec<SSTablePredicate> {
170 let mut out = Vec::new();
171 fn walk(expr: &WhereExpression, out: &mut Vec<SSTablePredicate>) {
172 match expr {
173 WhereExpression::Comparison(comp) => {
174 if let Some(predicate) = comparison_to_sstable_predicate(comp) {
175 out.push(predicate);
176 }
177 }
178 WhereExpression::And(exprs) => {
179 for e in exprs {
180 walk(e, out);
181 }
182 }
183 WhereExpression::Parentheses(inner) => walk(inner, out),
184 WhereExpression::Or(_) | WhereExpression::Not(_) => {}
185 }
186 }
187 walk(expr, &mut out);
188 out
189}
190
191fn comparison_to_sstable_predicate(comp: &ComparisonExpression) -> Option<SSTablePredicate> {
192 let SelectExpression::Column(col_ref) = &comp.left else {
193 return None;
194 };
195 let column = col_ref.column.clone();
196
197 match (&comp.operator, &comp.right) {
198 (ComparisonOperator::Equal, ComparisonRightSide::Value(value_expr)) => {
199 let value = literal_value(value_expr)?;
200 Some(SSTablePredicate {
201 column,
202 operation: SSTableFilterOp::Equal,
203 values: vec![value],
204 })
205 }
206 (ComparisonOperator::In, ComparisonRightSide::ValueList(value_exprs)) => {
207 let values: Vec<Value> = value_exprs.iter().filter_map(literal_value).collect();
208 (!values.is_empty()).then_some(SSTablePredicate {
209 column,
210 operation: SSTableFilterOp::In,
211 values,
212 })
213 }
214 (ComparisonOperator::Between, ComparisonRightSide::Range(start_expr, end_expr)) => {
215 let start = literal_value(start_expr)?;
216 let end = literal_value(end_expr)?;
217 Some(SSTablePredicate {
218 column,
219 operation: SSTableFilterOp::Range,
220 values: vec![start, end],
221 })
222 }
223 _ => None,
224 }
225}
226
227fn literal_value(expr: &SelectExpression) -> Option<Value> {
228 match expr {
229 SelectExpression::Literal(value) => Some(value.clone()),
230 _ => None,
231 }
232}
233
234fn extract_projection_columns(select_clause: &SelectClause) -> Vec<String> {
235 match select_clause {
236 SelectClause::All => Vec::new(),
237 SelectClause::Columns(exprs) | SelectClause::Distinct(exprs) => {
238 exprs.iter().filter_map(extract_column_name).collect()
239 }
240 }
241}
242
243fn extract_column_name(expr: &SelectExpression) -> Option<String> {
244 match expr {
245 SelectExpression::Column(col_ref) => Some(col_ref.column.clone()),
246 SelectExpression::Aliased(_, alias) => Some(alias.clone()),
247 _ => None,
248 }
249}
250
251fn plan_aggregation(statement: &SelectStatement) -> AggregationPlan {
252 let group_by_columns = statement
253 .group_by
254 .as_ref()
255 .map(|g| g.columns.iter().map(|col| col.column.clone()).collect())
256 .unwrap_or_default();
257
258 let mut aggregates = Vec::new();
259 if let SelectClause::Columns(exprs) = &statement.select_clause {
260 for expr in exprs {
261 if let SelectExpression::Aggregate(agg) = expr {
262 let (column, alias) = aggregate_column_and_alias(agg);
263 aggregates.push(AggregateComputation {
264 function: agg.function.clone(),
265 column,
266 alias,
267 distinct: agg.distinct,
268 });
269 }
270 }
271 }
272
273 AggregationPlan {
274 group_by_columns,
275 aggregates,
276 }
277}
278
279fn aggregate_column_and_alias(agg: &AggregateFunction) -> (String, String) {
283 let references_star = agg.args.is_empty()
284 || agg
285 .args
286 .iter()
287 .any(|arg| matches!(arg, SelectExpression::Column(c) if c.column == "*"));
288
289 if references_star {
290 return ("*".to_string(), format!("{:?}(*)", agg.function));
291 }
292
293 match agg.args.first().and_then(extract_column_name) {
294 Some(col_name) => {
295 let alias = format!("{:?}_{}", agg.function, col_name);
296 (col_name, alias)
297 }
298 None => ("*".to_string(), format!("{:?}", agg.function)),
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{platform::Platform, schema::SchemaManager, storage::StorageEngine, Config};
    use tempfile::TempDir;

    /// `SelectOptimizer::new` must store exactly the handles it was given.
    #[tokio::test]
    async fn test_optimizer_creation() {
        let temp_dir = TempDir::new().expect("temp dir should be creatable");
        let config = Config::default();
        let platform = Arc::new(
            Platform::new(&config)
                .await
                .expect("platform should initialise"),
        );
        let storage = Arc::new(
            StorageEngine::open(
                temp_dir.path(),
                &config,
                platform.clone(),
                #[cfg(feature = "state_machine")]
                None,
            )
            .await
            .expect("storage engine should open"),
        );
        let schema = Arc::new(
            SchemaManager::new(temp_dir.path())
                .await
                .expect("schema manager should initialise"),
        );

        // Go through the public constructor rather than a struct literal.
        let optimizer = SelectOptimizer::new(Arc::clone(&schema), Arc::clone(&storage));

        assert!(Arc::ptr_eq(&optimizer.schema, &schema));
        assert!(Arc::ptr_eq(&optimizer.storage, &storage));
    }
}