Skip to main content

citadel_sql/
planner.rs

1//! Query planner: chooses between seq scan, PK lookup, or index scan.
2
3use crate::encoding::encode_composite_key;
4use crate::parser::{BinOp, Expr};
5use crate::types::{IndexDef, TableSchema, Value};
6
7#[derive(Debug, Clone)]
8pub enum ScanPlan {
9    SeqScan,
10    PkLookup {
11        pk_values: Vec<Value>,
12    },
13    PkRangeScan {
14        start_key: Vec<u8>,
15        range_conds: Vec<(BinOp, Value)>,
16        num_pk_cols: usize,
17    },
18    IndexScan {
19        index_name: String,
20        idx_table: Vec<u8>,
21        prefix: Vec<u8>,
22        num_prefix_cols: usize,
23        range_conds: Vec<(BinOp, Value)>,
24        is_unique: bool,
25        index_columns: Vec<u16>,
26    },
27}
28
29struct SimplePredicate {
30    col_idx: usize,
31    op: BinOp,
32    value: Value,
33}
34
35fn flatten_and(expr: &Expr) -> Vec<&Expr> {
36    match expr {
37        Expr::BinaryOp {
38            left,
39            op: BinOp::And,
40            right,
41        } => {
42            let mut v = flatten_and(left);
43            v.extend(flatten_and(right));
44            v
45        }
46        _ => vec![expr],
47    }
48}
49
50fn is_comparison(op: BinOp) -> bool {
51    matches!(
52        op,
53        BinOp::Eq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq
54    )
55}
56
57fn is_range_op(op: BinOp) -> bool {
58    matches!(op, BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
59}
60
61fn flip_op(op: BinOp) -> BinOp {
62    match op {
63        BinOp::Lt => BinOp::Gt,
64        BinOp::LtEq => BinOp::GtEq,
65        BinOp::Gt => BinOp::Lt,
66        BinOp::GtEq => BinOp::LtEq,
67        other => other,
68    }
69}
70
71fn resolve_column_name(expr: &Expr) -> Option<&str> {
72    match expr {
73        Expr::Column(name) => Some(name.as_str()),
74        Expr::QualifiedColumn { column, .. } => Some(column.as_str()),
75        _ => None,
76    }
77}
78
79fn resolve_literal(expr: &Expr) -> Option<Value> {
80    match expr {
81        Expr::Literal(v) => Some(v.clone()),
82        Expr::Parameter(n) => crate::eval::resolve_scoped_param(*n).ok(),
83        _ => None,
84    }
85}
86
87fn extract_simple_predicate(expr: &Expr, schema: &TableSchema) -> Option<SimplePredicate> {
88    match expr {
89        Expr::BinaryOp { left, op, right } if is_comparison(*op) => {
90            if let (Some(name), Some(val)) = (resolve_column_name(left), resolve_literal(right)) {
91                let col_idx = schema.column_index(name)?;
92                return Some(SimplePredicate {
93                    col_idx,
94                    op: *op,
95                    value: val,
96                });
97            }
98            if let (Some(val), Some(name)) = (resolve_literal(left), resolve_column_name(right)) {
99                let col_idx = schema.column_index(name)?;
100                return Some(SimplePredicate {
101                    col_idx,
102                    op: flip_op(*op),
103                    value: val,
104                });
105            }
106            None
107        }
108        _ => None,
109    }
110}
111
112/// Decompose BETWEEN into two range predicates for planner use.
113fn flatten_between(expr: &Expr, schema: &TableSchema, out: &mut Vec<SimplePredicate>) {
114    match expr {
115        Expr::Between {
116            expr: col_expr,
117            low,
118            high,
119            negated: false,
120        } => {
121            if let (Some(name), Some(lo), Some(hi)) = (
122                resolve_column_name(col_expr),
123                resolve_literal(low),
124                resolve_literal(high),
125            ) {
126                if let Some(col_idx) = schema.column_index(name) {
127                    out.push(SimplePredicate {
128                        col_idx,
129                        op: BinOp::GtEq,
130                        value: lo,
131                    });
132                    out.push(SimplePredicate {
133                        col_idx,
134                        op: BinOp::LtEq,
135                        value: hi,
136                    });
137                }
138            }
139        }
140        Expr::BinaryOp {
141            left,
142            op: BinOp::And,
143            right,
144        } => {
145            flatten_between(left, schema, out);
146            flatten_between(right, schema, out);
147        }
148        _ => {}
149    }
150}
151
152pub fn plan_select(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
153    let where_expr = match where_clause {
154        Some(e) => e,
155        None => return ScanPlan::SeqScan,
156    };
157
158    let predicates = flatten_and(where_expr);
159    let simple: Vec<Option<SimplePredicate>> = predicates
160        .iter()
161        .map(|p| extract_simple_predicate(p, schema))
162        .collect();
163
164    if let Some(plan) = try_pk_lookup(schema, &simple) {
165        return plan;
166    }
167
168    let mut range_preds: Vec<SimplePredicate> = simple
169        .iter()
170        .filter_map(|p| {
171            let p = p.as_ref()?;
172            if is_range_op(p.op) {
173                Some(SimplePredicate {
174                    col_idx: p.col_idx,
175                    op: p.op,
176                    value: p.value.clone(),
177                })
178            } else {
179                None
180            }
181        })
182        .collect();
183    flatten_between(where_expr, schema, &mut range_preds);
184
185    if let Some(plan) = try_pk_range_scan(schema, &range_preds) {
186        return plan;
187    }
188
189    if let Some(plan) = try_best_index(schema, &simple) {
190        return plan;
191    }
192
193    ScanPlan::SeqScan
194}
195
196fn try_pk_range_scan(schema: &TableSchema, range_preds: &[SimplePredicate]) -> Option<ScanPlan> {
197    if schema.primary_key_columns.len() != 1 {
198        return None; // Only single-column PK for now
199    }
200    let pk_col = schema.primary_key_columns[0] as usize;
201    let conds: Vec<(BinOp, Value)> = range_preds
202        .iter()
203        .filter(|p| p.col_idx == pk_col)
204        .map(|p| (p.op, p.value.clone()))
205        .collect();
206    if conds.is_empty() {
207        return None;
208    }
209    let start_key = conds
210        .iter()
211        .filter(|(op, _)| matches!(op, BinOp::GtEq | BinOp::Gt))
212        .map(|(_, v)| encode_composite_key(std::slice::from_ref(v)))
213        .min_by(|a, b| a.cmp(b))
214        .unwrap_or_default();
215    Some(ScanPlan::PkRangeScan {
216        start_key,
217        range_conds: conds,
218        num_pk_cols: 1,
219    })
220}
221
222fn try_pk_lookup(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
223    let pk_cols = &schema.primary_key_columns;
224    let mut pk_values: Vec<Option<Value>> = vec![None; pk_cols.len()];
225
226    for pred in predicates.iter().flatten() {
227        if pred.op == BinOp::Eq {
228            if let Some(pk_pos) = pk_cols.iter().position(|&c| c == pred.col_idx as u16) {
229                pk_values[pk_pos] = Some(pred.value.clone());
230            }
231        }
232    }
233
234    if pk_values.iter().all(|v| v.is_some()) {
235        let values: Vec<Value> = pk_values.into_iter().map(|v| v.unwrap()).collect();
236        Some(ScanPlan::PkLookup { pk_values: values })
237    } else {
238        None
239    }
240}
241
/// Ranking key for candidate index scans; the greater score is preferred.
///
/// The derived ordering compares fields in declaration order: the number of equality-matched
/// columns dominates, then the presence of a range condition, then uniqueness. Reordering
/// the fields therefore changes which index the planner picks.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct IndexScore {
    /// Number of leading index columns matched by an equality predicate.
    num_equality: usize,
    /// Whether a range condition applies to the column after the equality prefix.
    has_range: bool,
    /// Whether the index is declared unique.
    is_unique: bool,
}
248
249fn try_best_index(
250    schema: &TableSchema,
251    predicates: &[Option<SimplePredicate>],
252) -> Option<ScanPlan> {
253    let mut best_score: Option<IndexScore> = None;
254    let mut best_plan: Option<ScanPlan> = None;
255
256    for idx in &schema.indices {
257        if let Some((score, plan)) = try_index_scan(schema, idx, predicates) {
258            if best_score.is_none() || score > *best_score.as_ref().unwrap() {
259                best_score = Some(score);
260                best_plan = Some(plan);
261            }
262        }
263    }
264
265    best_plan
266}
267
268fn try_index_scan(
269    schema: &TableSchema,
270    idx: &IndexDef,
271    predicates: &[Option<SimplePredicate>],
272) -> Option<(IndexScore, ScanPlan)> {
273    let mut used = Vec::new();
274    let mut equality_values: Vec<Value> = Vec::new();
275    let mut range_conds: Vec<(BinOp, Value)> = Vec::new();
276
277    for &col_idx in &idx.columns {
278        let mut found_eq = false;
279        for (i, pred) in predicates.iter().enumerate() {
280            if used.contains(&i) {
281                continue;
282            }
283            if let Some(sp) = pred {
284                if sp.col_idx == col_idx as usize && sp.op == BinOp::Eq {
285                    equality_values.push(sp.value.clone());
286                    used.push(i);
287                    found_eq = true;
288                    break;
289                }
290            }
291        }
292        if !found_eq {
293            for (i, pred) in predicates.iter().enumerate() {
294                if used.contains(&i) {
295                    continue;
296                }
297                if let Some(sp) = pred {
298                    if sp.col_idx == col_idx as usize && is_range_op(sp.op) {
299                        range_conds.push((sp.op, sp.value.clone()));
300                        used.push(i);
301                    }
302                }
303            }
304            break;
305        }
306    }
307
308    if equality_values.is_empty() && range_conds.is_empty() {
309        return None;
310    }
311
312    let score = IndexScore {
313        num_equality: equality_values.len(),
314        has_range: !range_conds.is_empty(),
315        is_unique: idx.unique,
316    };
317
318    let prefix = encode_composite_key(&equality_values);
319    let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
320
321    Some((
322        score,
323        ScanPlan::IndexScan {
324            index_name: idx.name.clone(),
325            idx_table,
326            prefix,
327            num_prefix_cols: equality_values.len(),
328            range_conds,
329            is_unique: idx.unique,
330            index_columns: idx.columns.clone(),
331        },
332    ))
333}
334
335pub fn describe_plan(plan: &ScanPlan, table_schema: &TableSchema) -> String {
336    match plan {
337        ScanPlan::SeqScan => String::new(),
338
339        ScanPlan::PkLookup { pk_values } => {
340            let pk_cols: Vec<&str> = table_schema
341                .primary_key_columns
342                .iter()
343                .map(|&idx| table_schema.columns[idx as usize].name.as_str())
344                .collect();
345            let conditions: Vec<String> = pk_cols
346                .iter()
347                .zip(pk_values.iter())
348                .map(|(col, val)| format!("{col} = {}", format_value(val)))
349                .collect();
350            format!("USING PRIMARY KEY ({})", conditions.join(", "))
351        }
352
353        ScanPlan::PkRangeScan { range_conds, .. } => {
354            let pk_col = &table_schema.columns[table_schema.primary_key_columns[0] as usize].name;
355            let conditions: Vec<String> = range_conds
356                .iter()
357                .map(|(op, val)| format!("{pk_col} {} {}", op_symbol(*op), format_value(val)))
358                .collect();
359            format!("USING PRIMARY KEY RANGE ({})", conditions.join(", "))
360        }
361
362        ScanPlan::IndexScan {
363            index_name,
364            num_prefix_cols,
365            range_conds,
366            index_columns,
367            ..
368        } => {
369            let mut conditions = Vec::new();
370            for &col in index_columns.iter().take(*num_prefix_cols) {
371                let col_idx = col as usize;
372                let col_name = &table_schema.columns[col_idx].name;
373                conditions.push(format!("{col_name} = ?"));
374            }
375            if !range_conds.is_empty() && *num_prefix_cols < index_columns.len() {
376                let col_idx = index_columns[*num_prefix_cols] as usize;
377                let col_name = &table_schema.columns[col_idx].name;
378                for (op, _) in range_conds {
379                    conditions.push(format!("{col_name} {} ?", op_symbol(*op)));
380                }
381            }
382            if conditions.is_empty() {
383                format!("USING INDEX {index_name}")
384            } else {
385                format!("USING INDEX {index_name} ({})", conditions.join(", "))
386            }
387        }
388    }
389}
390
391fn format_value(val: &Value) -> String {
392    match val {
393        Value::Null => "NULL".into(),
394        Value::Integer(i) => i.to_string(),
395        Value::Real(f) => format!("{f}"),
396        Value::Text(s) => format!("'{s}'"),
397        Value::Blob(_) => "BLOB".into(),
398        Value::Boolean(b) => b.to_string(),
399        Value::Date(d) => format!("DATE '{}'", crate::datetime::format_date(*d)),
400        Value::Time(t) => format!("TIME '{}'", crate::datetime::format_time(*t)),
401        Value::Timestamp(t) => format!("TIMESTAMP '{}'", crate::datetime::format_timestamp(*t)),
402        Value::Interval {
403            months,
404            days,
405            micros,
406        } => format!(
407            "INTERVAL '{}'",
408            crate::datetime::format_interval(*months, *days, *micros)
409        ),
410    }
411}
412
413fn op_symbol(op: BinOp) -> &'static str {
414    match op {
415        BinOp::Lt => "<",
416        BinOp::LtEq => "<=",
417        BinOp::Gt => ">",
418        BinOp::GtEq => ">=",
419        BinOp::Eq => "=",
420        BinOp::NotEq => "!=",
421        _ => "?",
422    }
423}
424
425#[cfg(test)]
426#[path = "planner_tests.rs"]
427mod tests;