Skip to main content

citadeldb_sql/
planner.rs

1//! Rule-based query planner with index selection.
2//!
3//! Analyzes WHERE clauses to choose between sequential scan, PK lookup,
4//! or index scan. Uses leftmost-prefix rule for composite indexes.
5
6use crate::encoding::encode_composite_key;
7use crate::parser::{BinOp, Expr};
8use crate::types::{IndexDef, TableSchema, Value};
9
10// ── Scan plan ────────────────────────────────────────────────────────
11
12#[derive(Debug)]
13pub enum ScanPlan {
14    SeqScan,
15    PkLookup {
16        pk_values: Vec<Value>,
17    },
18    IndexScan {
19        index_name: String,
20        idx_table: Vec<u8>,
21        prefix: Vec<u8>,
22        num_prefix_cols: usize,
23        range_conds: Vec<(BinOp, Value)>,
24        is_unique: bool,
25        index_columns: Vec<u16>,
26    },
27}
28
29// ── Simple predicate extraction ──────────────────────────────────────
30
31struct SimplePredicate {
32    col_idx: usize,
33    op: BinOp,
34    value: Value,
35}
36
37fn flatten_and<'a>(expr: &'a Expr) -> Vec<&'a Expr> {
38    match expr {
39        Expr::BinaryOp { left, op: BinOp::And, right } => {
40            let mut v = flatten_and(left);
41            v.extend(flatten_and(right));
42            v
43        }
44        _ => vec![expr],
45    }
46}
47
48fn is_comparison(op: BinOp) -> bool {
49    matches!(op, BinOp::Eq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
50}
51
52fn is_range_op(op: BinOp) -> bool {
53    matches!(op, BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
54}
55
56fn flip_op(op: BinOp) -> BinOp {
57    match op {
58        BinOp::Lt => BinOp::Gt,
59        BinOp::LtEq => BinOp::GtEq,
60        BinOp::Gt => BinOp::Lt,
61        BinOp::GtEq => BinOp::LtEq,
62        other => other,
63    }
64}
65
66fn resolve_column_name<'a>(expr: &'a Expr) -> Option<&'a str> {
67    match expr {
68        Expr::Column(name) => Some(name.as_str()),
69        Expr::QualifiedColumn { column, .. } => Some(column.as_str()),
70        _ => None,
71    }
72}
73
74fn extract_simple_predicate(expr: &Expr, schema: &TableSchema) -> Option<SimplePredicate> {
75    match expr {
76        Expr::BinaryOp { left, op, right } if is_comparison(*op) => {
77            if let (Some(name), Expr::Literal(val)) = (resolve_column_name(left), right.as_ref()) {
78                let col_idx = schema.column_index(name)?;
79                return Some(SimplePredicate { col_idx, op: *op, value: val.clone() });
80            }
81            if let (Expr::Literal(val), Some(name)) = (left.as_ref(), resolve_column_name(right)) {
82                let col_idx = schema.column_index(name)?;
83                return Some(SimplePredicate { col_idx, op: flip_op(*op), value: val.clone() });
84            }
85            None
86        }
87        _ => None,
88    }
89}
90
91// ── Plan selection ───────────────────────────────────────────────────
92
93pub fn plan_select(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
94    let where_expr = match where_clause {
95        Some(e) => e,
96        None => return ScanPlan::SeqScan,
97    };
98
99    let predicates = flatten_and(where_expr);
100    let simple: Vec<Option<SimplePredicate>> = predicates.iter()
101        .map(|p| extract_simple_predicate(p, schema))
102        .collect();
103
104    if let Some(plan) = try_pk_lookup(schema, &simple) {
105        return plan;
106    }
107
108    if let Some(plan) = try_best_index(schema, &simple) {
109        return plan;
110    }
111
112    ScanPlan::SeqScan
113}
114
115fn try_pk_lookup(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
116    let pk_cols = &schema.primary_key_columns;
117    let mut pk_values: Vec<Option<Value>> = vec![None; pk_cols.len()];
118
119    for pred in predicates.iter().flatten() {
120        if pred.op == BinOp::Eq {
121            if let Some(pk_pos) = pk_cols.iter().position(|&c| c == pred.col_idx as u16) {
122                pk_values[pk_pos] = Some(pred.value.clone());
123            }
124        }
125    }
126
127    if pk_values.iter().all(|v| v.is_some()) {
128        let values: Vec<Value> = pk_values.into_iter().map(|v| v.unwrap()).collect();
129        Some(ScanPlan::PkLookup { pk_values: values })
130    } else {
131        None
132    }
133}
134
135// ── Index scoring and selection ──────────────────────────────────────
136
137#[derive(PartialEq, Eq, PartialOrd, Ord)]
138struct IndexScore {
139    num_equality: usize,
140    has_range: bool,
141    is_unique: bool,
142}
143
144fn try_best_index(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
145    let mut best_score: Option<IndexScore> = None;
146    let mut best_plan: Option<ScanPlan> = None;
147
148    for idx in &schema.indices {
149        if let Some((score, plan)) = try_index_scan(schema, idx, predicates) {
150            if best_score.is_none() || score > *best_score.as_ref().unwrap() {
151                best_score = Some(score);
152                best_plan = Some(plan);
153            }
154        }
155    }
156
157    best_plan
158}
159
160fn try_index_scan(
161    schema: &TableSchema,
162    idx: &IndexDef,
163    predicates: &[Option<SimplePredicate>],
164) -> Option<(IndexScore, ScanPlan)> {
165    let mut used = Vec::new();
166    let mut equality_values: Vec<Value> = Vec::new();
167    let mut range_conds: Vec<(BinOp, Value)> = Vec::new();
168
169    for &col_idx in &idx.columns {
170        let mut found_eq = false;
171        for (i, pred) in predicates.iter().enumerate() {
172            if used.contains(&i) { continue; }
173            if let Some(sp) = pred {
174                if sp.col_idx == col_idx as usize && sp.op == BinOp::Eq {
175                    equality_values.push(sp.value.clone());
176                    used.push(i);
177                    found_eq = true;
178                    break;
179                }
180            }
181        }
182        if !found_eq {
183            for (i, pred) in predicates.iter().enumerate() {
184                if used.contains(&i) { continue; }
185                if let Some(sp) = pred {
186                    if sp.col_idx == col_idx as usize && is_range_op(sp.op) {
187                        range_conds.push((sp.op, sp.value.clone()));
188                        used.push(i);
189                    }
190                }
191            }
192            break;
193        }
194    }
195
196    if equality_values.is_empty() && range_conds.is_empty() {
197        return None;
198    }
199
200    let score = IndexScore {
201        num_equality: equality_values.len(),
202        has_range: !range_conds.is_empty(),
203        is_unique: idx.unique,
204    };
205
206    let prefix = encode_composite_key(&equality_values);
207    let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
208
209    Some((score, ScanPlan::IndexScan {
210        index_name: idx.name.clone(),
211        idx_table,
212        prefix,
213        num_prefix_cols: equality_values.len(),
214        range_conds,
215        is_unique: idx.unique,
216        index_columns: idx.columns.clone(),
217    }))
218}
219
220// ── Plan description for EXPLAIN ────────────────────────────────────
221
222pub fn describe_plan(plan: &ScanPlan, table_schema: &TableSchema) -> String {
223    match plan {
224        ScanPlan::SeqScan => String::new(),
225
226        ScanPlan::PkLookup { pk_values } => {
227            let pk_cols: Vec<&str> = table_schema.primary_key_columns.iter()
228                .map(|&idx| table_schema.columns[idx as usize].name.as_str())
229                .collect();
230            let conditions: Vec<String> = pk_cols.iter().zip(pk_values.iter())
231                .map(|(col, val)| format!("{col} = {}", format_value(val)))
232                .collect();
233            format!("USING PRIMARY KEY ({})", conditions.join(", "))
234        }
235
236        ScanPlan::IndexScan { index_name, num_prefix_cols, range_conds, index_columns, .. } => {
237            let mut conditions = Vec::new();
238            for i in 0..*num_prefix_cols {
239                let col_idx = index_columns[i] as usize;
240                let col_name = &table_schema.columns[col_idx].name;
241                conditions.push(format!("{col_name} = ?"));
242            }
243            if !range_conds.is_empty() && *num_prefix_cols < index_columns.len() {
244                let col_idx = index_columns[*num_prefix_cols] as usize;
245                let col_name = &table_schema.columns[col_idx].name;
246                for (op, _) in range_conds {
247                    conditions.push(format!("{col_name} {} ?", op_symbol(*op)));
248                }
249            }
250            if conditions.is_empty() {
251                format!("USING INDEX {index_name}")
252            } else {
253                format!("USING INDEX {index_name} ({})", conditions.join(", "))
254            }
255        }
256    }
257}
258
259fn format_value(val: &Value) -> String {
260    match val {
261        Value::Null => "NULL".into(),
262        Value::Integer(i) => i.to_string(),
263        Value::Real(f) => format!("{f}"),
264        Value::Text(s) => format!("'{s}'"),
265        Value::Blob(_) => "BLOB".into(),
266        Value::Boolean(b) => b.to_string(),
267    }
268}
269
270fn op_symbol(op: BinOp) -> &'static str {
271    match op {
272        BinOp::Lt => "<",
273        BinOp::LtEq => "<=",
274        BinOp::Gt => ">",
275        BinOp::GtEq => ">=",
276        BinOp::Eq => "=",
277        BinOp::NotEq => "!=",
278        _ => "?",
279    }
280}
281
282#[cfg(test)]
283mod tests {
284    use super::*;
285    use crate::types::{ColumnDef, DataType};
286
287    fn test_schema() -> TableSchema {
288        TableSchema {
289            name: "users".into(),
290            columns: vec![
291                ColumnDef { name: "id".into(), data_type: DataType::Integer, nullable: false, position: 0 },
292                ColumnDef { name: "name".into(), data_type: DataType::Text, nullable: true, position: 1 },
293                ColumnDef { name: "age".into(), data_type: DataType::Integer, nullable: true, position: 2 },
294                ColumnDef { name: "email".into(), data_type: DataType::Text, nullable: true, position: 3 },
295            ],
296            primary_key_columns: vec![0],
297            indices: vec![
298                IndexDef { name: "idx_name".into(), columns: vec![1], unique: false },
299                IndexDef { name: "idx_email".into(), columns: vec![3], unique: true },
300                IndexDef { name: "idx_name_age".into(), columns: vec![1, 2], unique: false },
301            ],
302        }
303    }
304
305    #[test]
306    fn no_where_is_seq_scan() {
307        let schema = test_schema();
308        let plan = plan_select(&schema, &None);
309        assert!(matches!(plan, ScanPlan::SeqScan));
310    }
311
312    #[test]
313    fn pk_equality_is_pk_lookup() {
314        let schema = test_schema();
315        let where_clause = Some(Expr::BinaryOp {
316            left: Box::new(Expr::Column("id".into())),
317            op: BinOp::Eq,
318            right: Box::new(Expr::Literal(Value::Integer(42))),
319        });
320        let plan = plan_select(&schema, &where_clause);
321        match plan {
322            ScanPlan::PkLookup { pk_values } => {
323                assert_eq!(pk_values, vec![Value::Integer(42)]);
324            }
325            other => panic!("expected PkLookup, got {other:?}"),
326        }
327    }
328
329    #[test]
330    fn unique_index_equality() {
331        let schema = test_schema();
332        let where_clause = Some(Expr::BinaryOp {
333            left: Box::new(Expr::Column("email".into())),
334            op: BinOp::Eq,
335            right: Box::new(Expr::Literal(Value::Text("alice@test.com".into()))),
336        });
337        let plan = plan_select(&schema, &where_clause);
338        match plan {
339            ScanPlan::IndexScan { index_name, is_unique, num_prefix_cols, .. } => {
340                assert_eq!(index_name, "idx_email");
341                assert!(is_unique);
342                assert_eq!(num_prefix_cols, 1);
343            }
344            other => panic!("expected IndexScan, got {other:?}"),
345        }
346    }
347
348    #[test]
349    fn non_unique_index_equality() {
350        let schema = test_schema();
351        let where_clause = Some(Expr::BinaryOp {
352            left: Box::new(Expr::Column("name".into())),
353            op: BinOp::Eq,
354            right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
355        });
356        let plan = plan_select(&schema, &where_clause);
357        match plan {
358            ScanPlan::IndexScan { index_name, num_prefix_cols, .. } => {
359                assert!(index_name == "idx_name" || index_name == "idx_name_age");
360                assert_eq!(num_prefix_cols, 1);
361            }
362            other => panic!("expected IndexScan, got {other:?}"),
363        }
364    }
365
366    #[test]
367    fn composite_index_full_prefix() {
368        let schema = test_schema();
369        let where_clause = Some(Expr::BinaryOp {
370            left: Box::new(Expr::BinaryOp {
371                left: Box::new(Expr::Column("name".into())),
372                op: BinOp::Eq,
373                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
374            }),
375            op: BinOp::And,
376            right: Box::new(Expr::BinaryOp {
377                left: Box::new(Expr::Column("age".into())),
378                op: BinOp::Eq,
379                right: Box::new(Expr::Literal(Value::Integer(30))),
380            }),
381        });
382        let plan = plan_select(&schema, &where_clause);
383        match plan {
384            ScanPlan::IndexScan { index_name, num_prefix_cols, .. } => {
385                assert_eq!(index_name, "idx_name_age");
386                assert_eq!(num_prefix_cols, 2);
387            }
388            other => panic!("expected IndexScan, got {other:?}"),
389        }
390    }
391
392    #[test]
393    fn range_scan_on_indexed_column() {
394        let schema = test_schema();
395        let where_clause = Some(Expr::BinaryOp {
396            left: Box::new(Expr::Column("name".into())),
397            op: BinOp::Gt,
398            right: Box::new(Expr::Literal(Value::Text("M".into()))),
399        });
400        let plan = plan_select(&schema, &where_clause);
401        match plan {
402            ScanPlan::IndexScan { range_conds, num_prefix_cols, .. } => {
403                assert_eq!(num_prefix_cols, 0);
404                assert_eq!(range_conds.len(), 1);
405                assert_eq!(range_conds[0].0, BinOp::Gt);
406            }
407            other => panic!("expected IndexScan, got {other:?}"),
408        }
409    }
410
411    #[test]
412    fn composite_equality_plus_range() {
413        let schema = test_schema();
414        let where_clause = Some(Expr::BinaryOp {
415            left: Box::new(Expr::BinaryOp {
416                left: Box::new(Expr::Column("name".into())),
417                op: BinOp::Eq,
418                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
419            }),
420            op: BinOp::And,
421            right: Box::new(Expr::BinaryOp {
422                left: Box::new(Expr::Column("age".into())),
423                op: BinOp::Gt,
424                right: Box::new(Expr::Literal(Value::Integer(25))),
425            }),
426        });
427        let plan = plan_select(&schema, &where_clause);
428        match plan {
429            ScanPlan::IndexScan { index_name, num_prefix_cols, range_conds, .. } => {
430                assert_eq!(index_name, "idx_name_age");
431                assert_eq!(num_prefix_cols, 1);
432                assert_eq!(range_conds.len(), 1);
433            }
434            other => panic!("expected IndexScan, got {other:?}"),
435        }
436    }
437
438    #[test]
439    fn or_condition_falls_back_to_seq_scan() {
440        let schema = test_schema();
441        let where_clause = Some(Expr::BinaryOp {
442            left: Box::new(Expr::BinaryOp {
443                left: Box::new(Expr::Column("name".into())),
444                op: BinOp::Eq,
445                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
446            }),
447            op: BinOp::Or,
448            right: Box::new(Expr::BinaryOp {
449                left: Box::new(Expr::Column("name".into())),
450                op: BinOp::Eq,
451                right: Box::new(Expr::Literal(Value::Text("Bob".into()))),
452            }),
453        });
454        let plan = plan_select(&schema, &where_clause);
455        assert!(matches!(plan, ScanPlan::SeqScan));
456    }
457
458    #[test]
459    fn non_indexed_column_is_seq_scan() {
460        let schema = test_schema();
461        let where_clause = Some(Expr::BinaryOp {
462            left: Box::new(Expr::Column("age".into())),
463            op: BinOp::Eq,
464            right: Box::new(Expr::Literal(Value::Integer(30))),
465        });
466        let plan = plan_select(&schema, &where_clause);
467        assert!(matches!(plan, ScanPlan::SeqScan));
468    }
469
470    #[test]
471    fn reversed_literal_column() {
472        let schema = test_schema();
473        let where_clause = Some(Expr::BinaryOp {
474            left: Box::new(Expr::Literal(Value::Integer(42))),
475            op: BinOp::Eq,
476            right: Box::new(Expr::Column("id".into())),
477        });
478        let plan = plan_select(&schema, &where_clause);
479        assert!(matches!(plan, ScanPlan::PkLookup { .. }));
480    }
481
482    #[test]
483    fn reversed_comparison_flips_op() {
484        let schema = test_schema();
485        let where_clause = Some(Expr::BinaryOp {
486            left: Box::new(Expr::Literal(Value::Integer(5))),
487            op: BinOp::Lt,
488            right: Box::new(Expr::Column("name".into())),
489        });
490        let plan = plan_select(&schema, &where_clause);
491        match plan {
492            ScanPlan::IndexScan { range_conds, .. } => {
493                assert_eq!(range_conds[0].0, BinOp::Gt);
494            }
495            other => panic!("expected IndexScan, got {other:?}"),
496        }
497    }
498
499    #[test]
500    fn prefers_unique_index() {
501        let schema = TableSchema {
502            name: "t".into(),
503            columns: vec![
504                ColumnDef { name: "id".into(), data_type: DataType::Integer, nullable: false, position: 0 },
505                ColumnDef { name: "code".into(), data_type: DataType::Text, nullable: false, position: 1 },
506            ],
507            primary_key_columns: vec![0],
508            indices: vec![
509                IndexDef { name: "idx_code".into(), columns: vec![1], unique: false },
510                IndexDef { name: "idx_code_uniq".into(), columns: vec![1], unique: true },
511            ],
512        };
513        let where_clause = Some(Expr::BinaryOp {
514            left: Box::new(Expr::Column("code".into())),
515            op: BinOp::Eq,
516            right: Box::new(Expr::Literal(Value::Text("X".into()))),
517        });
518        let plan = plan_select(&schema, &where_clause);
519        match plan {
520            ScanPlan::IndexScan { index_name, is_unique, .. } => {
521                assert_eq!(index_name, "idx_code_uniq");
522                assert!(is_unique);
523            }
524            other => panic!("expected IndexScan, got {other:?}"),
525        }
526    }
527
528    #[test]
529    fn prefers_more_equality_columns() {
530        let schema = test_schema();
531        let where_clause = Some(Expr::BinaryOp {
532            left: Box::new(Expr::BinaryOp {
533                left: Box::new(Expr::Column("name".into())),
534                op: BinOp::Eq,
535                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
536            }),
537            op: BinOp::And,
538            right: Box::new(Expr::BinaryOp {
539                left: Box::new(Expr::Column("age".into())),
540                op: BinOp::Eq,
541                right: Box::new(Expr::Literal(Value::Integer(30))),
542            }),
543        });
544        let plan = plan_select(&schema, &where_clause);
545        match plan {
546            ScanPlan::IndexScan { index_name, num_prefix_cols, .. } => {
547                assert_eq!(index_name, "idx_name_age");
548                assert_eq!(num_prefix_cols, 2);
549            }
550            other => panic!("expected IndexScan, got {other:?}"),
551        }
552    }
553}