Skip to main content

citadel_sql/
planner.rs

1//! Rule-based query planner with index selection.
2//!
3//! Analyzes WHERE clauses to choose between sequential scan, PK lookup,
4//! or index scan. Uses leftmost-prefix rule for composite indexes.
5
6use crate::encoding::encode_composite_key;
7use crate::parser::{BinOp, Expr};
8use crate::types::{IndexDef, TableSchema, Value};
9
10// ── Scan plan ────────────────────────────────────────────────────────
11
12#[derive(Debug)]
13pub enum ScanPlan {
14    SeqScan,
15    PkLookup {
16        pk_values: Vec<Value>,
17    },
18    IndexScan {
19        index_name: String,
20        idx_table: Vec<u8>,
21        prefix: Vec<u8>,
22        num_prefix_cols: usize,
23        range_conds: Vec<(BinOp, Value)>,
24        is_unique: bool,
25        index_columns: Vec<u16>,
26    },
27}
28
29// ── Simple predicate extraction ──────────────────────────────────────
30
31struct SimplePredicate {
32    col_idx: usize,
33    op: BinOp,
34    value: Value,
35}
36
37fn flatten_and(expr: &Expr) -> Vec<&Expr> {
38    match expr {
39        Expr::BinaryOp {
40            left,
41            op: BinOp::And,
42            right,
43        } => {
44            let mut v = flatten_and(left);
45            v.extend(flatten_and(right));
46            v
47        }
48        _ => vec![expr],
49    }
50}
51
52fn is_comparison(op: BinOp) -> bool {
53    matches!(
54        op,
55        BinOp::Eq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq
56    )
57}
58
59fn is_range_op(op: BinOp) -> bool {
60    matches!(op, BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
61}
62
63fn flip_op(op: BinOp) -> BinOp {
64    match op {
65        BinOp::Lt => BinOp::Gt,
66        BinOp::LtEq => BinOp::GtEq,
67        BinOp::Gt => BinOp::Lt,
68        BinOp::GtEq => BinOp::LtEq,
69        other => other,
70    }
71}
72
73fn resolve_column_name(expr: &Expr) -> Option<&str> {
74    match expr {
75        Expr::Column(name) => Some(name.as_str()),
76        Expr::QualifiedColumn { column, .. } => Some(column.as_str()),
77        _ => None,
78    }
79}
80
81fn extract_simple_predicate(expr: &Expr, schema: &TableSchema) -> Option<SimplePredicate> {
82    match expr {
83        Expr::BinaryOp { left, op, right } if is_comparison(*op) => {
84            if let (Some(name), Expr::Literal(val)) = (resolve_column_name(left), right.as_ref()) {
85                let col_idx = schema.column_index(name)?;
86                return Some(SimplePredicate {
87                    col_idx,
88                    op: *op,
89                    value: val.clone(),
90                });
91            }
92            if let (Expr::Literal(val), Some(name)) = (left.as_ref(), resolve_column_name(right)) {
93                let col_idx = schema.column_index(name)?;
94                return Some(SimplePredicate {
95                    col_idx,
96                    op: flip_op(*op),
97                    value: val.clone(),
98                });
99            }
100            None
101        }
102        _ => None,
103    }
104}
105
106// ── Plan selection ───────────────────────────────────────────────────
107
108pub fn plan_select(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
109    let where_expr = match where_clause {
110        Some(e) => e,
111        None => return ScanPlan::SeqScan,
112    };
113
114    let predicates = flatten_and(where_expr);
115    let simple: Vec<Option<SimplePredicate>> = predicates
116        .iter()
117        .map(|p| extract_simple_predicate(p, schema))
118        .collect();
119
120    if let Some(plan) = try_pk_lookup(schema, &simple) {
121        return plan;
122    }
123
124    if let Some(plan) = try_best_index(schema, &simple) {
125        return plan;
126    }
127
128    ScanPlan::SeqScan
129}
130
131fn try_pk_lookup(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
132    let pk_cols = &schema.primary_key_columns;
133    let mut pk_values: Vec<Option<Value>> = vec![None; pk_cols.len()];
134
135    for pred in predicates.iter().flatten() {
136        if pred.op == BinOp::Eq {
137            if let Some(pk_pos) = pk_cols.iter().position(|&c| c == pred.col_idx as u16) {
138                pk_values[pk_pos] = Some(pred.value.clone());
139            }
140        }
141    }
142
143    if pk_values.iter().all(|v| v.is_some()) {
144        let values: Vec<Value> = pk_values.into_iter().map(|v| v.unwrap()).collect();
145        Some(ScanPlan::PkLookup { pk_values: values })
146    } else {
147        None
148    }
149}
150
151// ── Index scoring and selection ──────────────────────────────────────
152
153#[derive(PartialEq, Eq, PartialOrd, Ord)]
154struct IndexScore {
155    num_equality: usize,
156    has_range: bool,
157    is_unique: bool,
158}
159
160fn try_best_index(
161    schema: &TableSchema,
162    predicates: &[Option<SimplePredicate>],
163) -> Option<ScanPlan> {
164    let mut best_score: Option<IndexScore> = None;
165    let mut best_plan: Option<ScanPlan> = None;
166
167    for idx in &schema.indices {
168        if let Some((score, plan)) = try_index_scan(schema, idx, predicates) {
169            if best_score.is_none() || score > *best_score.as_ref().unwrap() {
170                best_score = Some(score);
171                best_plan = Some(plan);
172            }
173        }
174    }
175
176    best_plan
177}
178
179fn try_index_scan(
180    schema: &TableSchema,
181    idx: &IndexDef,
182    predicates: &[Option<SimplePredicate>],
183) -> Option<(IndexScore, ScanPlan)> {
184    let mut used = Vec::new();
185    let mut equality_values: Vec<Value> = Vec::new();
186    let mut range_conds: Vec<(BinOp, Value)> = Vec::new();
187
188    for &col_idx in &idx.columns {
189        let mut found_eq = false;
190        for (i, pred) in predicates.iter().enumerate() {
191            if used.contains(&i) {
192                continue;
193            }
194            if let Some(sp) = pred {
195                if sp.col_idx == col_idx as usize && sp.op == BinOp::Eq {
196                    equality_values.push(sp.value.clone());
197                    used.push(i);
198                    found_eq = true;
199                    break;
200                }
201            }
202        }
203        if !found_eq {
204            for (i, pred) in predicates.iter().enumerate() {
205                if used.contains(&i) {
206                    continue;
207                }
208                if let Some(sp) = pred {
209                    if sp.col_idx == col_idx as usize && is_range_op(sp.op) {
210                        range_conds.push((sp.op, sp.value.clone()));
211                        used.push(i);
212                    }
213                }
214            }
215            break;
216        }
217    }
218
219    if equality_values.is_empty() && range_conds.is_empty() {
220        return None;
221    }
222
223    let score = IndexScore {
224        num_equality: equality_values.len(),
225        has_range: !range_conds.is_empty(),
226        is_unique: idx.unique,
227    };
228
229    let prefix = encode_composite_key(&equality_values);
230    let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
231
232    Some((
233        score,
234        ScanPlan::IndexScan {
235            index_name: idx.name.clone(),
236            idx_table,
237            prefix,
238            num_prefix_cols: equality_values.len(),
239            range_conds,
240            is_unique: idx.unique,
241            index_columns: idx.columns.clone(),
242        },
243    ))
244}
245
246// ── Plan description for EXPLAIN ────────────────────────────────────
247
248pub fn describe_plan(plan: &ScanPlan, table_schema: &TableSchema) -> String {
249    match plan {
250        ScanPlan::SeqScan => String::new(),
251
252        ScanPlan::PkLookup { pk_values } => {
253            let pk_cols: Vec<&str> = table_schema
254                .primary_key_columns
255                .iter()
256                .map(|&idx| table_schema.columns[idx as usize].name.as_str())
257                .collect();
258            let conditions: Vec<String> = pk_cols
259                .iter()
260                .zip(pk_values.iter())
261                .map(|(col, val)| format!("{col} = {}", format_value(val)))
262                .collect();
263            format!("USING PRIMARY KEY ({})", conditions.join(", "))
264        }
265
266        ScanPlan::IndexScan {
267            index_name,
268            num_prefix_cols,
269            range_conds,
270            index_columns,
271            ..
272        } => {
273            let mut conditions = Vec::new();
274            for &col in index_columns.iter().take(*num_prefix_cols) {
275                let col_idx = col as usize;
276                let col_name = &table_schema.columns[col_idx].name;
277                conditions.push(format!("{col_name} = ?"));
278            }
279            if !range_conds.is_empty() && *num_prefix_cols < index_columns.len() {
280                let col_idx = index_columns[*num_prefix_cols] as usize;
281                let col_name = &table_schema.columns[col_idx].name;
282                for (op, _) in range_conds {
283                    conditions.push(format!("{col_name} {} ?", op_symbol(*op)));
284                }
285            }
286            if conditions.is_empty() {
287                format!("USING INDEX {index_name}")
288            } else {
289                format!("USING INDEX {index_name} ({})", conditions.join(", "))
290            }
291        }
292    }
293}
294
295fn format_value(val: &Value) -> String {
296    match val {
297        Value::Null => "NULL".into(),
298        Value::Integer(i) => i.to_string(),
299        Value::Real(f) => format!("{f}"),
300        Value::Text(s) => format!("'{s}'"),
301        Value::Blob(_) => "BLOB".into(),
302        Value::Boolean(b) => b.to_string(),
303    }
304}
305
306fn op_symbol(op: BinOp) -> &'static str {
307    match op {
308        BinOp::Lt => "<",
309        BinOp::LtEq => "<=",
310        BinOp::Gt => ">",
311        BinOp::GtEq => ">=",
312        BinOp::Eq => "=",
313        BinOp::NotEq => "!=",
314        _ => "?",
315    }
316}
317
318#[cfg(test)]
319mod tests {
320    use super::*;
321    use crate::types::{ColumnDef, DataType};
322
323    fn test_schema() -> TableSchema {
324        TableSchema {
325            name: "users".into(),
326            columns: vec![
327                ColumnDef {
328                    name: "id".into(),
329                    data_type: DataType::Integer,
330                    nullable: false,
331                    position: 0,
332                },
333                ColumnDef {
334                    name: "name".into(),
335                    data_type: DataType::Text,
336                    nullable: true,
337                    position: 1,
338                },
339                ColumnDef {
340                    name: "age".into(),
341                    data_type: DataType::Integer,
342                    nullable: true,
343                    position: 2,
344                },
345                ColumnDef {
346                    name: "email".into(),
347                    data_type: DataType::Text,
348                    nullable: true,
349                    position: 3,
350                },
351            ],
352            primary_key_columns: vec![0],
353            indices: vec![
354                IndexDef {
355                    name: "idx_name".into(),
356                    columns: vec![1],
357                    unique: false,
358                },
359                IndexDef {
360                    name: "idx_email".into(),
361                    columns: vec![3],
362                    unique: true,
363                },
364                IndexDef {
365                    name: "idx_name_age".into(),
366                    columns: vec![1, 2],
367                    unique: false,
368                },
369            ],
370        }
371    }
372
373    #[test]
374    fn no_where_is_seq_scan() {
375        let schema = test_schema();
376        let plan = plan_select(&schema, &None);
377        assert!(matches!(plan, ScanPlan::SeqScan));
378    }
379
380    #[test]
381    fn pk_equality_is_pk_lookup() {
382        let schema = test_schema();
383        let where_clause = Some(Expr::BinaryOp {
384            left: Box::new(Expr::Column("id".into())),
385            op: BinOp::Eq,
386            right: Box::new(Expr::Literal(Value::Integer(42))),
387        });
388        let plan = plan_select(&schema, &where_clause);
389        match plan {
390            ScanPlan::PkLookup { pk_values } => {
391                assert_eq!(pk_values, vec![Value::Integer(42)]);
392            }
393            other => panic!("expected PkLookup, got {other:?}"),
394        }
395    }
396
397    #[test]
398    fn unique_index_equality() {
399        let schema = test_schema();
400        let where_clause = Some(Expr::BinaryOp {
401            left: Box::new(Expr::Column("email".into())),
402            op: BinOp::Eq,
403            right: Box::new(Expr::Literal(Value::Text("alice@test.com".into()))),
404        });
405        let plan = plan_select(&schema, &where_clause);
406        match plan {
407            ScanPlan::IndexScan {
408                index_name,
409                is_unique,
410                num_prefix_cols,
411                ..
412            } => {
413                assert_eq!(index_name, "idx_email");
414                assert!(is_unique);
415                assert_eq!(num_prefix_cols, 1);
416            }
417            other => panic!("expected IndexScan, got {other:?}"),
418        }
419    }
420
421    #[test]
422    fn non_unique_index_equality() {
423        let schema = test_schema();
424        let where_clause = Some(Expr::BinaryOp {
425            left: Box::new(Expr::Column("name".into())),
426            op: BinOp::Eq,
427            right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
428        });
429        let plan = plan_select(&schema, &where_clause);
430        match plan {
431            ScanPlan::IndexScan {
432                index_name,
433                num_prefix_cols,
434                ..
435            } => {
436                assert!(index_name == "idx_name" || index_name == "idx_name_age");
437                assert_eq!(num_prefix_cols, 1);
438            }
439            other => panic!("expected IndexScan, got {other:?}"),
440        }
441    }
442
443    #[test]
444    fn composite_index_full_prefix() {
445        let schema = test_schema();
446        let where_clause = Some(Expr::BinaryOp {
447            left: Box::new(Expr::BinaryOp {
448                left: Box::new(Expr::Column("name".into())),
449                op: BinOp::Eq,
450                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
451            }),
452            op: BinOp::And,
453            right: Box::new(Expr::BinaryOp {
454                left: Box::new(Expr::Column("age".into())),
455                op: BinOp::Eq,
456                right: Box::new(Expr::Literal(Value::Integer(30))),
457            }),
458        });
459        let plan = plan_select(&schema, &where_clause);
460        match plan {
461            ScanPlan::IndexScan {
462                index_name,
463                num_prefix_cols,
464                ..
465            } => {
466                assert_eq!(index_name, "idx_name_age");
467                assert_eq!(num_prefix_cols, 2);
468            }
469            other => panic!("expected IndexScan, got {other:?}"),
470        }
471    }
472
473    #[test]
474    fn range_scan_on_indexed_column() {
475        let schema = test_schema();
476        let where_clause = Some(Expr::BinaryOp {
477            left: Box::new(Expr::Column("name".into())),
478            op: BinOp::Gt,
479            right: Box::new(Expr::Literal(Value::Text("M".into()))),
480        });
481        let plan = plan_select(&schema, &where_clause);
482        match plan {
483            ScanPlan::IndexScan {
484                range_conds,
485                num_prefix_cols,
486                ..
487            } => {
488                assert_eq!(num_prefix_cols, 0);
489                assert_eq!(range_conds.len(), 1);
490                assert_eq!(range_conds[0].0, BinOp::Gt);
491            }
492            other => panic!("expected IndexScan, got {other:?}"),
493        }
494    }
495
496    #[test]
497    fn composite_equality_plus_range() {
498        let schema = test_schema();
499        let where_clause = Some(Expr::BinaryOp {
500            left: Box::new(Expr::BinaryOp {
501                left: Box::new(Expr::Column("name".into())),
502                op: BinOp::Eq,
503                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
504            }),
505            op: BinOp::And,
506            right: Box::new(Expr::BinaryOp {
507                left: Box::new(Expr::Column("age".into())),
508                op: BinOp::Gt,
509                right: Box::new(Expr::Literal(Value::Integer(25))),
510            }),
511        });
512        let plan = plan_select(&schema, &where_clause);
513        match plan {
514            ScanPlan::IndexScan {
515                index_name,
516                num_prefix_cols,
517                range_conds,
518                ..
519            } => {
520                assert_eq!(index_name, "idx_name_age");
521                assert_eq!(num_prefix_cols, 1);
522                assert_eq!(range_conds.len(), 1);
523            }
524            other => panic!("expected IndexScan, got {other:?}"),
525        }
526    }
527
528    #[test]
529    fn or_condition_falls_back_to_seq_scan() {
530        let schema = test_schema();
531        let where_clause = Some(Expr::BinaryOp {
532            left: Box::new(Expr::BinaryOp {
533                left: Box::new(Expr::Column("name".into())),
534                op: BinOp::Eq,
535                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
536            }),
537            op: BinOp::Or,
538            right: Box::new(Expr::BinaryOp {
539                left: Box::new(Expr::Column("name".into())),
540                op: BinOp::Eq,
541                right: Box::new(Expr::Literal(Value::Text("Bob".into()))),
542            }),
543        });
544        let plan = plan_select(&schema, &where_clause);
545        assert!(matches!(plan, ScanPlan::SeqScan));
546    }
547
548    #[test]
549    fn non_indexed_column_is_seq_scan() {
550        let schema = test_schema();
551        let where_clause = Some(Expr::BinaryOp {
552            left: Box::new(Expr::Column("age".into())),
553            op: BinOp::Eq,
554            right: Box::new(Expr::Literal(Value::Integer(30))),
555        });
556        let plan = plan_select(&schema, &where_clause);
557        assert!(matches!(plan, ScanPlan::SeqScan));
558    }
559
560    #[test]
561    fn reversed_literal_column() {
562        let schema = test_schema();
563        let where_clause = Some(Expr::BinaryOp {
564            left: Box::new(Expr::Literal(Value::Integer(42))),
565            op: BinOp::Eq,
566            right: Box::new(Expr::Column("id".into())),
567        });
568        let plan = plan_select(&schema, &where_clause);
569        assert!(matches!(plan, ScanPlan::PkLookup { .. }));
570    }
571
572    #[test]
573    fn reversed_comparison_flips_op() {
574        let schema = test_schema();
575        let where_clause = Some(Expr::BinaryOp {
576            left: Box::new(Expr::Literal(Value::Integer(5))),
577            op: BinOp::Lt,
578            right: Box::new(Expr::Column("name".into())),
579        });
580        let plan = plan_select(&schema, &where_clause);
581        match plan {
582            ScanPlan::IndexScan { range_conds, .. } => {
583                assert_eq!(range_conds[0].0, BinOp::Gt);
584            }
585            other => panic!("expected IndexScan, got {other:?}"),
586        }
587    }
588
589    #[test]
590    fn prefers_unique_index() {
591        let schema = TableSchema {
592            name: "t".into(),
593            columns: vec![
594                ColumnDef {
595                    name: "id".into(),
596                    data_type: DataType::Integer,
597                    nullable: false,
598                    position: 0,
599                },
600                ColumnDef {
601                    name: "code".into(),
602                    data_type: DataType::Text,
603                    nullable: false,
604                    position: 1,
605                },
606            ],
607            primary_key_columns: vec![0],
608            indices: vec![
609                IndexDef {
610                    name: "idx_code".into(),
611                    columns: vec![1],
612                    unique: false,
613                },
614                IndexDef {
615                    name: "idx_code_uniq".into(),
616                    columns: vec![1],
617                    unique: true,
618                },
619            ],
620        };
621        let where_clause = Some(Expr::BinaryOp {
622            left: Box::new(Expr::Column("code".into())),
623            op: BinOp::Eq,
624            right: Box::new(Expr::Literal(Value::Text("X".into()))),
625        });
626        let plan = plan_select(&schema, &where_clause);
627        match plan {
628            ScanPlan::IndexScan {
629                index_name,
630                is_unique,
631                ..
632            } => {
633                assert_eq!(index_name, "idx_code_uniq");
634                assert!(is_unique);
635            }
636            other => panic!("expected IndexScan, got {other:?}"),
637        }
638    }
639
640    #[test]
641    fn prefers_more_equality_columns() {
642        let schema = test_schema();
643        let where_clause = Some(Expr::BinaryOp {
644            left: Box::new(Expr::BinaryOp {
645                left: Box::new(Expr::Column("name".into())),
646                op: BinOp::Eq,
647                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
648            }),
649            op: BinOp::And,
650            right: Box::new(Expr::BinaryOp {
651                left: Box::new(Expr::Column("age".into())),
652                op: BinOp::Eq,
653                right: Box::new(Expr::Literal(Value::Integer(30))),
654            }),
655        });
656        let plan = plan_select(&schema, &where_clause);
657        match plan {
658            ScanPlan::IndexScan {
659                index_name,
660                num_prefix_cols,
661                ..
662            } => {
663                assert_eq!(index_name, "idx_name_age");
664                assert_eq!(num_prefix_cols, 2);
665            }
666            other => panic!("expected IndexScan, got {other:?}"),
667        }
668    }
669}