Skip to main content

cqlite_core/query/
select_optimizer.rs

1//! Query optimizer for SELECT statements - basic planning and predicate pushdown.
2
3use super::select_ast::*;
4use crate::{schema::SchemaManager, storage::StorageEngine, Result, TableId, Value};
5use std::sync::Arc;
6
/// Query optimizer for SELECT statements.
///
/// Turns a parsed [`SelectStatement`] into an [`OptimizedQueryPlan`] via
/// [`SelectOptimizer::optimize`].
#[derive(Debug)]
pub struct SelectOptimizer {
    // Stored but currently never read, hence `allow(dead_code)`.
    #[allow(dead_code)]
    schema: Arc<SchemaManager>,
    // Stored but currently never read, hence `allow(dead_code)`.
    #[allow(dead_code)]
    storage: Arc<StorageEngine>,
}
15
/// Optimized query execution plan produced by [`SelectOptimizer::optimize`].
#[derive(Debug, Clone)]
pub struct OptimizedQueryPlan {
    /// The statement this plan was built from.
    pub statement: SelectStatement,
    /// Pipeline steps in the order the optimizer emitted them; empty when the
    /// statement has no FROM clause.
    pub execution_steps: Vec<ExecutionStep>,
    /// Predicates extracted from the WHERE clause for SSTable-level pushdown;
    /// the same list is attached to the plan's `SSTableScan` step.
    pub sstable_predicates: Vec<SSTablePredicate>,
    /// Aggregation details, present only when the statement requires aggregation.
    pub aggregation_plan: Option<AggregationPlan>,
}
24
/// Individual execution step of an [`OptimizedQueryPlan`].
#[derive(Debug, Clone)]
pub enum ExecutionStep {
    /// Scan `table`, offering `predicates` for SSTable-level pushdown.
    ///
    /// `projection` lists the column names to read. `SELECT *` produces an empty
    /// list, which presumably means "all columns" to the executor — confirm there.
    SSTableScan {
        table: TableId,
        predicates: Vec<SSTablePredicate>,
        projection: Vec<String>,
    },
    /// Post-scan evaluation of a WHERE expression.
    Filter {
        expression: WhereExpression,
    },
    /// Ordering from the statement's ORDER BY clause.
    Sort {
        order_by: OrderByClause,
    },
    /// Grouping/aggregation described by `plan`.
    Aggregate {
        plan: AggregationPlan,
    },
    /// LIMIT `count` with the statement's optional OFFSET.
    Limit {
        count: u64,
        offset: Option<u64>,
    },
    /// Final projection onto the SELECT list expressions.
    Project {
        columns: Vec<SelectExpression>,
    },
}
50
/// SSTable-level predicate that can be pushed down.
///
/// The layout of `values` depends on `operation` as built by this module:
/// one literal for `Equal`, `[start, end]` (from `BETWEEN`) for `Range`, and
/// the literal candidates for `In`.
#[derive(Debug, Clone)]
pub struct SSTablePredicate {
    /// Name of the column being compared (any table qualifier is not kept).
    pub column: String,
    /// Kind of comparison to apply.
    pub operation: SSTableFilterOp,
    /// Literal operands; see the type-level docs for their layout per operation.
    pub values: Vec<Value>,
}
58
/// SSTable filter operations.
#[derive(Debug, Clone)]
pub enum SSTableFilterOp {
    /// `column = literal`.
    Equal,
    /// `column BETWEEN start AND end`.
    Range,
    /// `column IN (literal, ...)`.
    In,
    /// Not produced by the planner in this module.
    Prefix,
    /// Not produced by the planner in this module.
    BloomFilter,
}
68
/// Aggregation execution plan.
#[derive(Debug, Clone)]
pub struct AggregationPlan {
    /// Column names from the GROUP BY clause; empty when there is none.
    pub group_by_columns: Vec<String>,
    /// Aggregates found in the SELECT list, in list order.
    pub aggregates: Vec<AggregateComputation>,
}
75
/// Individual aggregate computation.
#[derive(Debug, Clone)]
pub struct AggregateComputation {
    /// Aggregate function to apply.
    pub function: AggregateType,
    /// Input column name, or `"*"` for `COUNT(*)`-style / unresolvable arguments.
    pub column: String,
    /// Output name for the aggregate's result.
    pub alias: String,
    /// Whether the aggregate was written with DISTINCT.
    pub distinct: bool,
}
84
85impl SelectOptimizer {
86    /// Create a new query optimizer
87    pub fn new(schema: Arc<SchemaManager>, storage: Arc<StorageEngine>) -> Self {
88        Self { schema, storage }
89    }
90
91    /// Optimize a SELECT statement
92    pub async fn optimize(&self, statement: SelectStatement) -> Result<OptimizedQueryPlan> {
93        let mut plan = OptimizedQueryPlan {
94            statement: statement.clone(),
95            execution_steps: Vec::new(),
96            sstable_predicates: Vec::new(),
97            aggregation_plan: None,
98        };
99
100        // Constant expressions (no FROM) need no execution steps.
101        let Some(from_clause) = statement.from_clause.as_ref() else {
102            return Ok(plan);
103        };
104        let table_id = match from_clause {
105            FromClause::Table(t) | FromClause::TableAlias(t, _) => t.clone(),
106        };
107
108        if let Some(where_clause) = &statement.where_clause {
109            plan.sstable_predicates = collect_sstable_predicates(where_clause);
110        }
111
112        plan.execution_steps.push(ExecutionStep::SSTableScan {
113            table: table_id,
114            predicates: plan.sstable_predicates.clone(),
115            projection: extract_projection_columns(&statement.select_clause),
116        });
117
118        // If we couldn't push any predicates down, keep the original WHERE as
119        // a post-scan filter step.
120        if let Some(where_clause) = &statement.where_clause {
121            if plan.sstable_predicates.is_empty() {
122                plan.execution_steps.push(ExecutionStep::Filter {
123                    expression: where_clause.clone(),
124                });
125            }
126        }
127
128        let needs_aggregation = statement.requires_aggregation();
129        if needs_aggregation {
130            let agg_plan = plan_aggregation(&statement);
131            plan.execution_steps.push(ExecutionStep::Aggregate {
132                plan: agg_plan.clone(),
133            });
134            plan.aggregation_plan = Some(agg_plan);
135        }
136
137        if let Some(order_by) = &statement.order_by {
138            plan.execution_steps.push(ExecutionStep::Sort {
139                order_by: order_by.clone(),
140            });
141        }
142
143        if let Some(limit) = &statement.limit {
144            plan.execution_steps.push(ExecutionStep::Limit {
145                count: limit.count,
146                offset: statement.offset,
147            });
148        }
149
150        // Aggregation already produces the final shape; an explicit Project
151        // step on top would be redundant.
152        if !needs_aggregation {
153            if let SelectClause::Columns(exprs) | SelectClause::Distinct(exprs) =
154                &statement.select_clause
155            {
156                plan.execution_steps.push(ExecutionStep::Project {
157                    columns: exprs.clone(),
158                });
159            }
160        }
161
162        Ok(plan)
163    }
164}
165
166/// Walk a WHERE expression tree, collecting comparisons that can be turned
167/// into SSTable-level predicates. OR/NOT branches are intentionally skipped:
168/// those require capabilities the SSTable filter pushdown doesn't have.
169fn collect_sstable_predicates(expr: &WhereExpression) -> Vec<SSTablePredicate> {
170    let mut out = Vec::new();
171    fn walk(expr: &WhereExpression, out: &mut Vec<SSTablePredicate>) {
172        match expr {
173            WhereExpression::Comparison(comp) => {
174                if let Some(predicate) = comparison_to_sstable_predicate(comp) {
175                    out.push(predicate);
176                }
177            }
178            WhereExpression::And(exprs) => {
179                for e in exprs {
180                    walk(e, out);
181                }
182            }
183            WhereExpression::Parentheses(inner) => walk(inner, out),
184            WhereExpression::Or(_) | WhereExpression::Not(_) => {}
185        }
186    }
187    walk(expr, &mut out);
188    out
189}
190
191fn comparison_to_sstable_predicate(comp: &ComparisonExpression) -> Option<SSTablePredicate> {
192    let SelectExpression::Column(col_ref) = &comp.left else {
193        return None;
194    };
195    let column = col_ref.column.clone();
196
197    match (&comp.operator, &comp.right) {
198        (ComparisonOperator::Equal, ComparisonRightSide::Value(value_expr)) => {
199            let value = literal_value(value_expr)?;
200            Some(SSTablePredicate {
201                column,
202                operation: SSTableFilterOp::Equal,
203                values: vec![value],
204            })
205        }
206        (ComparisonOperator::In, ComparisonRightSide::ValueList(value_exprs)) => {
207            let values: Vec<Value> = value_exprs.iter().filter_map(literal_value).collect();
208            (!values.is_empty()).then_some(SSTablePredicate {
209                column,
210                operation: SSTableFilterOp::In,
211                values,
212            })
213        }
214        (ComparisonOperator::Between, ComparisonRightSide::Range(start_expr, end_expr)) => {
215            let start = literal_value(start_expr)?;
216            let end = literal_value(end_expr)?;
217            Some(SSTablePredicate {
218                column,
219                operation: SSTableFilterOp::Range,
220                values: vec![start, end],
221            })
222        }
223        _ => None,
224    }
225}
226
227fn literal_value(expr: &SelectExpression) -> Option<Value> {
228    match expr {
229        SelectExpression::Literal(value) => Some(value.clone()),
230        _ => None,
231    }
232}
233
234fn extract_projection_columns(select_clause: &SelectClause) -> Vec<String> {
235    match select_clause {
236        SelectClause::All => Vec::new(),
237        SelectClause::Columns(exprs) | SelectClause::Distinct(exprs) => {
238            exprs.iter().filter_map(extract_column_name).collect()
239        }
240    }
241}
242
243fn extract_column_name(expr: &SelectExpression) -> Option<String> {
244    match expr {
245        SelectExpression::Column(col_ref) => Some(col_ref.column.clone()),
246        SelectExpression::Aliased(_, alias) => Some(alias.clone()),
247        _ => None,
248    }
249}
250
251fn plan_aggregation(statement: &SelectStatement) -> AggregationPlan {
252    let group_by_columns = statement
253        .group_by
254        .as_ref()
255        .map(|g| g.columns.iter().map(|col| col.column.clone()).collect())
256        .unwrap_or_default();
257
258    let mut aggregates = Vec::new();
259    if let SelectClause::Columns(exprs) = &statement.select_clause {
260        for expr in exprs {
261            if let SelectExpression::Aggregate(agg) = expr {
262                let (column, alias) = aggregate_column_and_alias(agg);
263                aggregates.push(AggregateComputation {
264                    function: agg.function.clone(),
265                    column,
266                    alias,
267                    distinct: agg.distinct,
268                });
269            }
270        }
271    }
272
273    AggregationPlan {
274        group_by_columns,
275        aggregates,
276    }
277}
278
279/// Resolve `(column, alias)` for an aggregate. `COUNT(*)` and any aggregate
280/// referencing `*` yields `("*", "Func(*)")`; a single named column yields
281/// `(name, "Func_name")`; anything else falls back to `("*", "Func")`.
282fn aggregate_column_and_alias(agg: &AggregateFunction) -> (String, String) {
283    let references_star = agg.args.is_empty()
284        || agg
285            .args
286            .iter()
287            .any(|arg| matches!(arg, SelectExpression::Column(c) if c.column == "*"));
288
289    if references_star {
290        return ("*".to_string(), format!("{:?}(*)", agg.function));
291    }
292
293    match agg.args.first().and_then(extract_column_name) {
294        Some(col_name) => {
295            let alias = format!("{:?}_{}", agg.function, col_name);
296            (col_name, alias)
297        }
298        None => ("*".to_string(), format!("{:?}", agg.function)),
299    }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{platform::Platform, schema::SchemaManager, storage::StorageEngine, Config};
    use tempfile::TempDir;

    /// An optimizer can be built over a freshly opened storage engine and schema manager.
    #[tokio::test]
    async fn test_optimizer_creation() {
        let workdir = TempDir::new().unwrap();
        let cfg = Config::default();
        let platform = Arc::new(Platform::new(&cfg).await.unwrap());

        let engine = StorageEngine::open(
            workdir.path(),
            &cfg,
            platform.clone(),
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .unwrap();
        let schema_manager = SchemaManager::new(workdir.path()).await.unwrap();

        let optimizer = SelectOptimizer::new(Arc::new(schema_manager), Arc::new(engine));
        assert!(std::mem::size_of_val(&optimizer) > 0);
    }
}