Skip to main content

citadel_sql/
planner.rs

1//! Rule-based query planner with index selection.
2//!
3//! Analyzes WHERE clauses to choose between sequential scan, PK lookup,
4//! or index scan. Uses leftmost-prefix rule for composite indexes.
5
6use crate::encoding::encode_composite_key;
7use crate::parser::{BinOp, Expr};
8use crate::types::{IndexDef, TableSchema, Value};
9
10// ── Scan plan ────────────────────────────────────────────────────────
11
12#[derive(Debug)]
13pub enum ScanPlan {
14    SeqScan,
15    PkLookup {
16        pk_values: Vec<Value>,
17    },
18    IndexScan {
19        index_name: String,
20        idx_table: Vec<u8>,
21        prefix: Vec<u8>,
22        num_prefix_cols: usize,
23        range_conds: Vec<(BinOp, Value)>,
24        is_unique: bool,
25        index_columns: Vec<u16>,
26    },
27}
28
29// ── Simple predicate extraction ──────────────────────────────────────
30
31struct SimplePredicate {
32    col_idx: usize,
33    op: BinOp,
34    value: Value,
35}
36
37fn flatten_and(expr: &Expr) -> Vec<&Expr> {
38    match expr {
39        Expr::BinaryOp {
40            left,
41            op: BinOp::And,
42            right,
43        } => {
44            let mut v = flatten_and(left);
45            v.extend(flatten_and(right));
46            v
47        }
48        _ => vec![expr],
49    }
50}
51
52fn is_comparison(op: BinOp) -> bool {
53    matches!(
54        op,
55        BinOp::Eq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq
56    )
57}
58
59fn is_range_op(op: BinOp) -> bool {
60    matches!(op, BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
61}
62
63fn flip_op(op: BinOp) -> BinOp {
64    match op {
65        BinOp::Lt => BinOp::Gt,
66        BinOp::LtEq => BinOp::GtEq,
67        BinOp::Gt => BinOp::Lt,
68        BinOp::GtEq => BinOp::LtEq,
69        other => other,
70    }
71}
72
73fn resolve_column_name(expr: &Expr) -> Option<&str> {
74    match expr {
75        Expr::Column(name) => Some(name.as_str()),
76        Expr::QualifiedColumn { column, .. } => Some(column.as_str()),
77        _ => None,
78    }
79}
80
81fn extract_simple_predicate(expr: &Expr, schema: &TableSchema) -> Option<SimplePredicate> {
82    match expr {
83        Expr::BinaryOp { left, op, right } if is_comparison(*op) => {
84            if let (Some(name), Expr::Literal(val)) = (resolve_column_name(left), right.as_ref()) {
85                let col_idx = schema.column_index(name)?;
86                return Some(SimplePredicate {
87                    col_idx,
88                    op: *op,
89                    value: val.clone(),
90                });
91            }
92            if let (Expr::Literal(val), Some(name)) = (left.as_ref(), resolve_column_name(right)) {
93                let col_idx = schema.column_index(name)?;
94                return Some(SimplePredicate {
95                    col_idx,
96                    op: flip_op(*op),
97                    value: val.clone(),
98                });
99            }
100            None
101        }
102        _ => None,
103    }
104}
105
106// ── Plan selection ───────────────────────────────────────────────────
107
108pub fn plan_select(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
109    let where_expr = match where_clause {
110        Some(e) => e,
111        None => return ScanPlan::SeqScan,
112    };
113
114    let predicates = flatten_and(where_expr);
115    let simple: Vec<Option<SimplePredicate>> = predicates
116        .iter()
117        .map(|p| extract_simple_predicate(p, schema))
118        .collect();
119
120    if let Some(plan) = try_pk_lookup(schema, &simple) {
121        return plan;
122    }
123
124    if let Some(plan) = try_best_index(schema, &simple) {
125        return plan;
126    }
127
128    ScanPlan::SeqScan
129}
130
131fn try_pk_lookup(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
132    let pk_cols = &schema.primary_key_columns;
133    let mut pk_values: Vec<Option<Value>> = vec![None; pk_cols.len()];
134
135    for pred in predicates.iter().flatten() {
136        if pred.op == BinOp::Eq {
137            if let Some(pk_pos) = pk_cols.iter().position(|&c| c == pred.col_idx as u16) {
138                pk_values[pk_pos] = Some(pred.value.clone());
139            }
140        }
141    }
142
143    if pk_values.iter().all(|v| v.is_some()) {
144        let values: Vec<Value> = pk_values.into_iter().map(|v| v.unwrap()).collect();
145        Some(ScanPlan::PkLookup { pk_values: values })
146    } else {
147        None
148    }
149}
150
151// ── Index scoring and selection ──────────────────────────────────────
152
153#[derive(PartialEq, Eq, PartialOrd, Ord)]
154struct IndexScore {
155    num_equality: usize,
156    has_range: bool,
157    is_unique: bool,
158}
159
160fn try_best_index(
161    schema: &TableSchema,
162    predicates: &[Option<SimplePredicate>],
163) -> Option<ScanPlan> {
164    let mut best_score: Option<IndexScore> = None;
165    let mut best_plan: Option<ScanPlan> = None;
166
167    for idx in &schema.indices {
168        if let Some((score, plan)) = try_index_scan(schema, idx, predicates) {
169            if best_score.is_none() || score > *best_score.as_ref().unwrap() {
170                best_score = Some(score);
171                best_plan = Some(plan);
172            }
173        }
174    }
175
176    best_plan
177}
178
179fn try_index_scan(
180    schema: &TableSchema,
181    idx: &IndexDef,
182    predicates: &[Option<SimplePredicate>],
183) -> Option<(IndexScore, ScanPlan)> {
184    let mut used = Vec::new();
185    let mut equality_values: Vec<Value> = Vec::new();
186    let mut range_conds: Vec<(BinOp, Value)> = Vec::new();
187
188    for &col_idx in &idx.columns {
189        let mut found_eq = false;
190        for (i, pred) in predicates.iter().enumerate() {
191            if used.contains(&i) {
192                continue;
193            }
194            if let Some(sp) = pred {
195                if sp.col_idx == col_idx as usize && sp.op == BinOp::Eq {
196                    equality_values.push(sp.value.clone());
197                    used.push(i);
198                    found_eq = true;
199                    break;
200                }
201            }
202        }
203        if !found_eq {
204            for (i, pred) in predicates.iter().enumerate() {
205                if used.contains(&i) {
206                    continue;
207                }
208                if let Some(sp) = pred {
209                    if sp.col_idx == col_idx as usize && is_range_op(sp.op) {
210                        range_conds.push((sp.op, sp.value.clone()));
211                        used.push(i);
212                    }
213                }
214            }
215            break;
216        }
217    }
218
219    if equality_values.is_empty() && range_conds.is_empty() {
220        return None;
221    }
222
223    let score = IndexScore {
224        num_equality: equality_values.len(),
225        has_range: !range_conds.is_empty(),
226        is_unique: idx.unique,
227    };
228
229    let prefix = encode_composite_key(&equality_values);
230    let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
231
232    Some((
233        score,
234        ScanPlan::IndexScan {
235            index_name: idx.name.clone(),
236            idx_table,
237            prefix,
238            num_prefix_cols: equality_values.len(),
239            range_conds,
240            is_unique: idx.unique,
241            index_columns: idx.columns.clone(),
242        },
243    ))
244}
245
246// ── Plan description for EXPLAIN ────────────────────────────────────
247
248pub fn describe_plan(plan: &ScanPlan, table_schema: &TableSchema) -> String {
249    match plan {
250        ScanPlan::SeqScan => String::new(),
251
252        ScanPlan::PkLookup { pk_values } => {
253            let pk_cols: Vec<&str> = table_schema
254                .primary_key_columns
255                .iter()
256                .map(|&idx| table_schema.columns[idx as usize].name.as_str())
257                .collect();
258            let conditions: Vec<String> = pk_cols
259                .iter()
260                .zip(pk_values.iter())
261                .map(|(col, val)| format!("{col} = {}", format_value(val)))
262                .collect();
263            format!("USING PRIMARY KEY ({})", conditions.join(", "))
264        }
265
266        ScanPlan::IndexScan {
267            index_name,
268            num_prefix_cols,
269            range_conds,
270            index_columns,
271            ..
272        } => {
273            let mut conditions = Vec::new();
274            for &col in index_columns.iter().take(*num_prefix_cols) {
275                let col_idx = col as usize;
276                let col_name = &table_schema.columns[col_idx].name;
277                conditions.push(format!("{col_name} = ?"));
278            }
279            if !range_conds.is_empty() && *num_prefix_cols < index_columns.len() {
280                let col_idx = index_columns[*num_prefix_cols] as usize;
281                let col_name = &table_schema.columns[col_idx].name;
282                for (op, _) in range_conds {
283                    conditions.push(format!("{col_name} {} ?", op_symbol(*op)));
284                }
285            }
286            if conditions.is_empty() {
287                format!("USING INDEX {index_name}")
288            } else {
289                format!("USING INDEX {index_name} ({})", conditions.join(", "))
290            }
291        }
292    }
293}
294
295fn format_value(val: &Value) -> String {
296    match val {
297        Value::Null => "NULL".into(),
298        Value::Integer(i) => i.to_string(),
299        Value::Real(f) => format!("{f}"),
300        Value::Text(s) => format!("'{s}'"),
301        Value::Blob(_) => "BLOB".into(),
302        Value::Boolean(b) => b.to_string(),
303    }
304}
305
306fn op_symbol(op: BinOp) -> &'static str {
307    match op {
308        BinOp::Lt => "<",
309        BinOp::LtEq => "<=",
310        BinOp::Gt => ">",
311        BinOp::GtEq => ">=",
312        BinOp::Eq => "=",
313        BinOp::NotEq => "!=",
314        _ => "?",
315    }
316}
317
318#[cfg(test)]
319mod tests {
320    use super::*;
321    use crate::types::{ColumnDef, DataType};
322
323    fn col(name: &str, dt: DataType, nullable: bool, pos: u16) -> ColumnDef {
324        ColumnDef {
325            name: name.into(),
326            data_type: dt,
327            nullable,
328            position: pos,
329            default_expr: None,
330            default_sql: None,
331            check_expr: None,
332            check_sql: None,
333            check_name: None,
334        }
335    }
336
337    fn test_schema() -> TableSchema {
338        TableSchema::new(
339            "users".into(),
340            vec![
341                col("id", DataType::Integer, false, 0),
342                col("name", DataType::Text, true, 1),
343                col("age", DataType::Integer, true, 2),
344                col("email", DataType::Text, true, 3),
345            ],
346            vec![0],
347            vec![
348                IndexDef {
349                    name: "idx_name".into(),
350                    columns: vec![1],
351                    unique: false,
352                },
353                IndexDef {
354                    name: "idx_email".into(),
355                    columns: vec![3],
356                    unique: true,
357                },
358                IndexDef {
359                    name: "idx_name_age".into(),
360                    columns: vec![1, 2],
361                    unique: false,
362                },
363            ],
364            vec![],
365            vec![],
366        )
367    }
368
369    #[test]
370    fn no_where_is_seq_scan() {
371        let schema = test_schema();
372        let plan = plan_select(&schema, &None);
373        assert!(matches!(plan, ScanPlan::SeqScan));
374    }
375
376    #[test]
377    fn pk_equality_is_pk_lookup() {
378        let schema = test_schema();
379        let where_clause = Some(Expr::BinaryOp {
380            left: Box::new(Expr::Column("id".into())),
381            op: BinOp::Eq,
382            right: Box::new(Expr::Literal(Value::Integer(42))),
383        });
384        let plan = plan_select(&schema, &where_clause);
385        match plan {
386            ScanPlan::PkLookup { pk_values } => {
387                assert_eq!(pk_values, vec![Value::Integer(42)]);
388            }
389            other => panic!("expected PkLookup, got {other:?}"),
390        }
391    }
392
393    #[test]
394    fn unique_index_equality() {
395        let schema = test_schema();
396        let where_clause = Some(Expr::BinaryOp {
397            left: Box::new(Expr::Column("email".into())),
398            op: BinOp::Eq,
399            right: Box::new(Expr::Literal(Value::Text("alice@test.com".into()))),
400        });
401        let plan = plan_select(&schema, &where_clause);
402        match plan {
403            ScanPlan::IndexScan {
404                index_name,
405                is_unique,
406                num_prefix_cols,
407                ..
408            } => {
409                assert_eq!(index_name, "idx_email");
410                assert!(is_unique);
411                assert_eq!(num_prefix_cols, 1);
412            }
413            other => panic!("expected IndexScan, got {other:?}"),
414        }
415    }
416
417    #[test]
418    fn non_unique_index_equality() {
419        let schema = test_schema();
420        let where_clause = Some(Expr::BinaryOp {
421            left: Box::new(Expr::Column("name".into())),
422            op: BinOp::Eq,
423            right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
424        });
425        let plan = plan_select(&schema, &where_clause);
426        match plan {
427            ScanPlan::IndexScan {
428                index_name,
429                num_prefix_cols,
430                ..
431            } => {
432                assert!(index_name == "idx_name" || index_name == "idx_name_age");
433                assert_eq!(num_prefix_cols, 1);
434            }
435            other => panic!("expected IndexScan, got {other:?}"),
436        }
437    }
438
439    #[test]
440    fn composite_index_full_prefix() {
441        let schema = test_schema();
442        let where_clause = Some(Expr::BinaryOp {
443            left: Box::new(Expr::BinaryOp {
444                left: Box::new(Expr::Column("name".into())),
445                op: BinOp::Eq,
446                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
447            }),
448            op: BinOp::And,
449            right: Box::new(Expr::BinaryOp {
450                left: Box::new(Expr::Column("age".into())),
451                op: BinOp::Eq,
452                right: Box::new(Expr::Literal(Value::Integer(30))),
453            }),
454        });
455        let plan = plan_select(&schema, &where_clause);
456        match plan {
457            ScanPlan::IndexScan {
458                index_name,
459                num_prefix_cols,
460                ..
461            } => {
462                assert_eq!(index_name, "idx_name_age");
463                assert_eq!(num_prefix_cols, 2);
464            }
465            other => panic!("expected IndexScan, got {other:?}"),
466        }
467    }
468
469    #[test]
470    fn range_scan_on_indexed_column() {
471        let schema = test_schema();
472        let where_clause = Some(Expr::BinaryOp {
473            left: Box::new(Expr::Column("name".into())),
474            op: BinOp::Gt,
475            right: Box::new(Expr::Literal(Value::Text("M".into()))),
476        });
477        let plan = plan_select(&schema, &where_clause);
478        match plan {
479            ScanPlan::IndexScan {
480                range_conds,
481                num_prefix_cols,
482                ..
483            } => {
484                assert_eq!(num_prefix_cols, 0);
485                assert_eq!(range_conds.len(), 1);
486                assert_eq!(range_conds[0].0, BinOp::Gt);
487            }
488            other => panic!("expected IndexScan, got {other:?}"),
489        }
490    }
491
492    #[test]
493    fn composite_equality_plus_range() {
494        let schema = test_schema();
495        let where_clause = Some(Expr::BinaryOp {
496            left: Box::new(Expr::BinaryOp {
497                left: Box::new(Expr::Column("name".into())),
498                op: BinOp::Eq,
499                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
500            }),
501            op: BinOp::And,
502            right: Box::new(Expr::BinaryOp {
503                left: Box::new(Expr::Column("age".into())),
504                op: BinOp::Gt,
505                right: Box::new(Expr::Literal(Value::Integer(25))),
506            }),
507        });
508        let plan = plan_select(&schema, &where_clause);
509        match plan {
510            ScanPlan::IndexScan {
511                index_name,
512                num_prefix_cols,
513                range_conds,
514                ..
515            } => {
516                assert_eq!(index_name, "idx_name_age");
517                assert_eq!(num_prefix_cols, 1);
518                assert_eq!(range_conds.len(), 1);
519            }
520            other => panic!("expected IndexScan, got {other:?}"),
521        }
522    }
523
524    #[test]
525    fn or_condition_falls_back_to_seq_scan() {
526        let schema = test_schema();
527        let where_clause = Some(Expr::BinaryOp {
528            left: Box::new(Expr::BinaryOp {
529                left: Box::new(Expr::Column("name".into())),
530                op: BinOp::Eq,
531                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
532            }),
533            op: BinOp::Or,
534            right: Box::new(Expr::BinaryOp {
535                left: Box::new(Expr::Column("name".into())),
536                op: BinOp::Eq,
537                right: Box::new(Expr::Literal(Value::Text("Bob".into()))),
538            }),
539        });
540        let plan = plan_select(&schema, &where_clause);
541        assert!(matches!(plan, ScanPlan::SeqScan));
542    }
543
544    #[test]
545    fn non_indexed_column_is_seq_scan() {
546        let schema = test_schema();
547        let where_clause = Some(Expr::BinaryOp {
548            left: Box::new(Expr::Column("age".into())),
549            op: BinOp::Eq,
550            right: Box::new(Expr::Literal(Value::Integer(30))),
551        });
552        let plan = plan_select(&schema, &where_clause);
553        assert!(matches!(plan, ScanPlan::SeqScan));
554    }
555
556    #[test]
557    fn reversed_literal_column() {
558        let schema = test_schema();
559        let where_clause = Some(Expr::BinaryOp {
560            left: Box::new(Expr::Literal(Value::Integer(42))),
561            op: BinOp::Eq,
562            right: Box::new(Expr::Column("id".into())),
563        });
564        let plan = plan_select(&schema, &where_clause);
565        assert!(matches!(plan, ScanPlan::PkLookup { .. }));
566    }
567
568    #[test]
569    fn reversed_comparison_flips_op() {
570        let schema = test_schema();
571        let where_clause = Some(Expr::BinaryOp {
572            left: Box::new(Expr::Literal(Value::Integer(5))),
573            op: BinOp::Lt,
574            right: Box::new(Expr::Column("name".into())),
575        });
576        let plan = plan_select(&schema, &where_clause);
577        match plan {
578            ScanPlan::IndexScan { range_conds, .. } => {
579                assert_eq!(range_conds[0].0, BinOp::Gt);
580            }
581            other => panic!("expected IndexScan, got {other:?}"),
582        }
583    }
584
585    #[test]
586    fn prefers_unique_index() {
587        let schema = TableSchema::new(
588            "t".into(),
589            vec![
590                col("id", DataType::Integer, false, 0),
591                col("code", DataType::Text, false, 1),
592            ],
593            vec![0],
594            vec![
595                IndexDef {
596                    name: "idx_code".into(),
597                    columns: vec![1],
598                    unique: false,
599                },
600                IndexDef {
601                    name: "idx_code_uniq".into(),
602                    columns: vec![1],
603                    unique: true,
604                },
605            ],
606            vec![],
607            vec![],
608        );
609        let where_clause = Some(Expr::BinaryOp {
610            left: Box::new(Expr::Column("code".into())),
611            op: BinOp::Eq,
612            right: Box::new(Expr::Literal(Value::Text("X".into()))),
613        });
614        let plan = plan_select(&schema, &where_clause);
615        match plan {
616            ScanPlan::IndexScan {
617                index_name,
618                is_unique,
619                ..
620            } => {
621                assert_eq!(index_name, "idx_code_uniq");
622                assert!(is_unique);
623            }
624            other => panic!("expected IndexScan, got {other:?}"),
625        }
626    }
627
628    #[test]
629    fn prefers_more_equality_columns() {
630        let schema = test_schema();
631        let where_clause = Some(Expr::BinaryOp {
632            left: Box::new(Expr::BinaryOp {
633                left: Box::new(Expr::Column("name".into())),
634                op: BinOp::Eq,
635                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
636            }),
637            op: BinOp::And,
638            right: Box::new(Expr::BinaryOp {
639                left: Box::new(Expr::Column("age".into())),
640                op: BinOp::Eq,
641                right: Box::new(Expr::Literal(Value::Integer(30))),
642            }),
643        });
644        let plan = plan_select(&schema, &where_clause);
645        match plan {
646            ScanPlan::IndexScan {
647                index_name,
648                num_prefix_cols,
649                ..
650            } => {
651                assert_eq!(index_name, "idx_name_age");
652                assert_eq!(num_prefix_cols, 2);
653            }
654            other => panic!("expected IndexScan, got {other:?}"),
655        }
656    }
657}