Skip to main content

citadel_sql/
planner.rs

1//! Query planner: chooses between seq scan, PK lookup, or index scan.
2
3use crate::encoding::encode_composite_key;
4use crate::parser::{BinOp, Expr};
5use crate::types::{IndexDef, TableSchema, Value};
6
7// ── Scan plan ────────────────────────────────────────────────────────
8
9#[derive(Debug, Clone)]
10pub enum ScanPlan {
11    SeqScan,
12    PkLookup {
13        pk_values: Vec<Value>,
14    },
15    PkRangeScan {
16        start_key: Vec<u8>,
17        range_conds: Vec<(BinOp, Value)>,
18        num_pk_cols: usize,
19    },
20    IndexScan {
21        index_name: String,
22        idx_table: Vec<u8>,
23        prefix: Vec<u8>,
24        num_prefix_cols: usize,
25        range_conds: Vec<(BinOp, Value)>,
26        is_unique: bool,
27        index_columns: Vec<u16>,
28    },
29}
30
31// ── Simple predicate extraction ──────────────────────────────────────
32
33struct SimplePredicate {
34    col_idx: usize,
35    op: BinOp,
36    value: Value,
37}
38
39fn flatten_and(expr: &Expr) -> Vec<&Expr> {
40    match expr {
41        Expr::BinaryOp {
42            left,
43            op: BinOp::And,
44            right,
45        } => {
46            let mut v = flatten_and(left);
47            v.extend(flatten_and(right));
48            v
49        }
50        _ => vec![expr],
51    }
52}
53
54fn is_comparison(op: BinOp) -> bool {
55    matches!(
56        op,
57        BinOp::Eq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq
58    )
59}
60
61fn is_range_op(op: BinOp) -> bool {
62    matches!(op, BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
63}
64
65fn flip_op(op: BinOp) -> BinOp {
66    match op {
67        BinOp::Lt => BinOp::Gt,
68        BinOp::LtEq => BinOp::GtEq,
69        BinOp::Gt => BinOp::Lt,
70        BinOp::GtEq => BinOp::LtEq,
71        other => other,
72    }
73}
74
75fn resolve_column_name(expr: &Expr) -> Option<&str> {
76    match expr {
77        Expr::Column(name) => Some(name.as_str()),
78        Expr::QualifiedColumn { column, .. } => Some(column.as_str()),
79        _ => None,
80    }
81}
82
83fn extract_simple_predicate(expr: &Expr, schema: &TableSchema) -> Option<SimplePredicate> {
84    match expr {
85        Expr::BinaryOp { left, op, right } if is_comparison(*op) => {
86            if let (Some(name), Expr::Literal(val)) = (resolve_column_name(left), right.as_ref()) {
87                let col_idx = schema.column_index(name)?;
88                return Some(SimplePredicate {
89                    col_idx,
90                    op: *op,
91                    value: val.clone(),
92                });
93            }
94            if let (Expr::Literal(val), Some(name)) = (left.as_ref(), resolve_column_name(right)) {
95                let col_idx = schema.column_index(name)?;
96                return Some(SimplePredicate {
97                    col_idx,
98                    op: flip_op(*op),
99                    value: val.clone(),
100                });
101            }
102            None
103        }
104        _ => None,
105    }
106}
107
108/// Decompose BETWEEN into two range predicates for planner use.
109fn flatten_between(expr: &Expr, schema: &TableSchema, out: &mut Vec<SimplePredicate>) {
110    match expr {
111        Expr::Between {
112            expr: col_expr,
113            low,
114            high,
115            negated: false,
116        } => {
117            if let (Some(name), Expr::Literal(lo), Expr::Literal(hi)) =
118                (resolve_column_name(col_expr), low.as_ref(), high.as_ref())
119            {
120                if let Some(col_idx) = schema.column_index(name) {
121                    out.push(SimplePredicate {
122                        col_idx,
123                        op: BinOp::GtEq,
124                        value: lo.clone(),
125                    });
126                    out.push(SimplePredicate {
127                        col_idx,
128                        op: BinOp::LtEq,
129                        value: hi.clone(),
130                    });
131                }
132            }
133        }
134        Expr::BinaryOp {
135            left,
136            op: BinOp::And,
137            right,
138        } => {
139            flatten_between(left, schema, out);
140            flatten_between(right, schema, out);
141        }
142        _ => {}
143    }
144}
145
146// ── Plan selection ───────────────────────────────────────────────────
147
148pub fn plan_select(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
149    let where_expr = match where_clause {
150        Some(e) => e,
151        None => return ScanPlan::SeqScan,
152    };
153
154    let predicates = flatten_and(where_expr);
155    let simple: Vec<Option<SimplePredicate>> = predicates
156        .iter()
157        .map(|p| extract_simple_predicate(p, schema))
158        .collect();
159
160    if let Some(plan) = try_pk_lookup(schema, &simple) {
161        return plan;
162    }
163
164    // Collect range predicates from both comparisons and BETWEEN
165    let mut range_preds: Vec<SimplePredicate> = simple
166        .iter()
167        .filter_map(|p| {
168            let p = p.as_ref()?;
169            if is_range_op(p.op) {
170                Some(SimplePredicate {
171                    col_idx: p.col_idx,
172                    op: p.op,
173                    value: p.value.clone(),
174                })
175            } else {
176                None
177            }
178        })
179        .collect();
180    flatten_between(where_expr, schema, &mut range_preds);
181
182    if let Some(plan) = try_pk_range_scan(schema, &range_preds) {
183        return plan;
184    }
185
186    if let Some(plan) = try_best_index(schema, &simple) {
187        return plan;
188    }
189
190    ScanPlan::SeqScan
191}
192
193fn try_pk_range_scan(schema: &TableSchema, range_preds: &[SimplePredicate]) -> Option<ScanPlan> {
194    if schema.primary_key_columns.len() != 1 {
195        return None; // Only single-column PK for now
196    }
197    let pk_col = schema.primary_key_columns[0] as usize;
198    let conds: Vec<(BinOp, Value)> = range_preds
199        .iter()
200        .filter(|p| p.col_idx == pk_col)
201        .map(|p| (p.op, p.value.clone()))
202        .collect();
203    if conds.is_empty() {
204        return None;
205    }
206    // Build start key from the tightest lower bound (GtEq or Gt)
207    let start_key = conds
208        .iter()
209        .filter(|(op, _)| matches!(op, BinOp::GtEq | BinOp::Gt))
210        .map(|(_, v)| encode_composite_key(std::slice::from_ref(v)))
211        .min_by(|a, b| a.cmp(b))
212        .unwrap_or_default();
213    Some(ScanPlan::PkRangeScan {
214        start_key,
215        range_conds: conds,
216        num_pk_cols: 1,
217    })
218}
219
220fn try_pk_lookup(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
221    let pk_cols = &schema.primary_key_columns;
222    let mut pk_values: Vec<Option<Value>> = vec![None; pk_cols.len()];
223
224    for pred in predicates.iter().flatten() {
225        if pred.op == BinOp::Eq {
226            if let Some(pk_pos) = pk_cols.iter().position(|&c| c == pred.col_idx as u16) {
227                pk_values[pk_pos] = Some(pred.value.clone());
228            }
229        }
230    }
231
232    if pk_values.iter().all(|v| v.is_some()) {
233        let values: Vec<Value> = pk_values.into_iter().map(|v| v.unwrap()).collect();
234        Some(ScanPlan::PkLookup { pk_values: values })
235    } else {
236        None
237    }
238}
239
240// ── Index scoring and selection ──────────────────────────────────────
241
242#[derive(PartialEq, Eq, PartialOrd, Ord)]
243struct IndexScore {
244    num_equality: usize,
245    has_range: bool,
246    is_unique: bool,
247}
248
249fn try_best_index(
250    schema: &TableSchema,
251    predicates: &[Option<SimplePredicate>],
252) -> Option<ScanPlan> {
253    let mut best_score: Option<IndexScore> = None;
254    let mut best_plan: Option<ScanPlan> = None;
255
256    for idx in &schema.indices {
257        if let Some((score, plan)) = try_index_scan(schema, idx, predicates) {
258            if best_score.is_none() || score > *best_score.as_ref().unwrap() {
259                best_score = Some(score);
260                best_plan = Some(plan);
261            }
262        }
263    }
264
265    best_plan
266}
267
268fn try_index_scan(
269    schema: &TableSchema,
270    idx: &IndexDef,
271    predicates: &[Option<SimplePredicate>],
272) -> Option<(IndexScore, ScanPlan)> {
273    let mut used = Vec::new();
274    let mut equality_values: Vec<Value> = Vec::new();
275    let mut range_conds: Vec<(BinOp, Value)> = Vec::new();
276
277    for &col_idx in &idx.columns {
278        let mut found_eq = false;
279        for (i, pred) in predicates.iter().enumerate() {
280            if used.contains(&i) {
281                continue;
282            }
283            if let Some(sp) = pred {
284                if sp.col_idx == col_idx as usize && sp.op == BinOp::Eq {
285                    equality_values.push(sp.value.clone());
286                    used.push(i);
287                    found_eq = true;
288                    break;
289                }
290            }
291        }
292        if !found_eq {
293            for (i, pred) in predicates.iter().enumerate() {
294                if used.contains(&i) {
295                    continue;
296                }
297                if let Some(sp) = pred {
298                    if sp.col_idx == col_idx as usize && is_range_op(sp.op) {
299                        range_conds.push((sp.op, sp.value.clone()));
300                        used.push(i);
301                    }
302                }
303            }
304            break;
305        }
306    }
307
308    if equality_values.is_empty() && range_conds.is_empty() {
309        return None;
310    }
311
312    let score = IndexScore {
313        num_equality: equality_values.len(),
314        has_range: !range_conds.is_empty(),
315        is_unique: idx.unique,
316    };
317
318    let prefix = encode_composite_key(&equality_values);
319    let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
320
321    Some((
322        score,
323        ScanPlan::IndexScan {
324            index_name: idx.name.clone(),
325            idx_table,
326            prefix,
327            num_prefix_cols: equality_values.len(),
328            range_conds,
329            is_unique: idx.unique,
330            index_columns: idx.columns.clone(),
331        },
332    ))
333}
334
335// ── Plan description for EXPLAIN ────────────────────────────────────
336
337pub fn describe_plan(plan: &ScanPlan, table_schema: &TableSchema) -> String {
338    match plan {
339        ScanPlan::SeqScan => String::new(),
340
341        ScanPlan::PkLookup { pk_values } => {
342            let pk_cols: Vec<&str> = table_schema
343                .primary_key_columns
344                .iter()
345                .map(|&idx| table_schema.columns[idx as usize].name.as_str())
346                .collect();
347            let conditions: Vec<String> = pk_cols
348                .iter()
349                .zip(pk_values.iter())
350                .map(|(col, val)| format!("{col} = {}", format_value(val)))
351                .collect();
352            format!("USING PRIMARY KEY ({})", conditions.join(", "))
353        }
354
355        ScanPlan::PkRangeScan { range_conds, .. } => {
356            let pk_col = &table_schema.columns[table_schema.primary_key_columns[0] as usize].name;
357            let conditions: Vec<String> = range_conds
358                .iter()
359                .map(|(op, val)| format!("{pk_col} {} {}", op_symbol(*op), format_value(val)))
360                .collect();
361            format!("USING PRIMARY KEY RANGE ({})", conditions.join(", "))
362        }
363
364        ScanPlan::IndexScan {
365            index_name,
366            num_prefix_cols,
367            range_conds,
368            index_columns,
369            ..
370        } => {
371            let mut conditions = Vec::new();
372            for &col in index_columns.iter().take(*num_prefix_cols) {
373                let col_idx = col as usize;
374                let col_name = &table_schema.columns[col_idx].name;
375                conditions.push(format!("{col_name} = ?"));
376            }
377            if !range_conds.is_empty() && *num_prefix_cols < index_columns.len() {
378                let col_idx = index_columns[*num_prefix_cols] as usize;
379                let col_name = &table_schema.columns[col_idx].name;
380                for (op, _) in range_conds {
381                    conditions.push(format!("{col_name} {} ?", op_symbol(*op)));
382                }
383            }
384            if conditions.is_empty() {
385                format!("USING INDEX {index_name}")
386            } else {
387                format!("USING INDEX {index_name} ({})", conditions.join(", "))
388            }
389        }
390    }
391}
392
393fn format_value(val: &Value) -> String {
394    match val {
395        Value::Null => "NULL".into(),
396        Value::Integer(i) => i.to_string(),
397        Value::Real(f) => format!("{f}"),
398        Value::Text(s) => format!("'{s}'"),
399        Value::Blob(_) => "BLOB".into(),
400        Value::Boolean(b) => b.to_string(),
401    }
402}
403
404fn op_symbol(op: BinOp) -> &'static str {
405    match op {
406        BinOp::Lt => "<",
407        BinOp::LtEq => "<=",
408        BinOp::Gt => ">",
409        BinOp::GtEq => ">=",
410        BinOp::Eq => "=",
411        BinOp::NotEq => "!=",
412        _ => "?",
413    }
414}
415
416#[cfg(test)]
417mod tests {
418    use super::*;
419    use crate::types::{ColumnDef, DataType};
420
421    fn col(name: &str, dt: DataType, nullable: bool, pos: u16) -> ColumnDef {
422        ColumnDef {
423            name: name.into(),
424            data_type: dt,
425            nullable,
426            position: pos,
427            default_expr: None,
428            default_sql: None,
429            check_expr: None,
430            check_sql: None,
431            check_name: None,
432        }
433    }
434
435    fn test_schema() -> TableSchema {
436        TableSchema::new(
437            "users".into(),
438            vec![
439                col("id", DataType::Integer, false, 0),
440                col("name", DataType::Text, true, 1),
441                col("age", DataType::Integer, true, 2),
442                col("email", DataType::Text, true, 3),
443            ],
444            vec![0],
445            vec![
446                IndexDef {
447                    name: "idx_name".into(),
448                    columns: vec![1],
449                    unique: false,
450                },
451                IndexDef {
452                    name: "idx_email".into(),
453                    columns: vec![3],
454                    unique: true,
455                },
456                IndexDef {
457                    name: "idx_name_age".into(),
458                    columns: vec![1, 2],
459                    unique: false,
460                },
461            ],
462            vec![],
463            vec![],
464        )
465    }
466
467    #[test]
468    fn no_where_is_seq_scan() {
469        let schema = test_schema();
470        let plan = plan_select(&schema, &None);
471        assert!(matches!(plan, ScanPlan::SeqScan));
472    }
473
474    #[test]
475    fn pk_equality_is_pk_lookup() {
476        let schema = test_schema();
477        let where_clause = Some(Expr::BinaryOp {
478            left: Box::new(Expr::Column("id".into())),
479            op: BinOp::Eq,
480            right: Box::new(Expr::Literal(Value::Integer(42))),
481        });
482        let plan = plan_select(&schema, &where_clause);
483        match plan {
484            ScanPlan::PkLookup { pk_values } => {
485                assert_eq!(pk_values, vec![Value::Integer(42)]);
486            }
487            other => panic!("expected PkLookup, got {other:?}"),
488        }
489    }
490
491    #[test]
492    fn unique_index_equality() {
493        let schema = test_schema();
494        let where_clause = Some(Expr::BinaryOp {
495            left: Box::new(Expr::Column("email".into())),
496            op: BinOp::Eq,
497            right: Box::new(Expr::Literal(Value::Text("alice@test.com".into()))),
498        });
499        let plan = plan_select(&schema, &where_clause);
500        match plan {
501            ScanPlan::IndexScan {
502                index_name,
503                is_unique,
504                num_prefix_cols,
505                ..
506            } => {
507                assert_eq!(index_name, "idx_email");
508                assert!(is_unique);
509                assert_eq!(num_prefix_cols, 1);
510            }
511            other => panic!("expected IndexScan, got {other:?}"),
512        }
513    }
514
515    #[test]
516    fn non_unique_index_equality() {
517        let schema = test_schema();
518        let where_clause = Some(Expr::BinaryOp {
519            left: Box::new(Expr::Column("name".into())),
520            op: BinOp::Eq,
521            right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
522        });
523        let plan = plan_select(&schema, &where_clause);
524        match plan {
525            ScanPlan::IndexScan {
526                index_name,
527                num_prefix_cols,
528                ..
529            } => {
530                assert!(index_name == "idx_name" || index_name == "idx_name_age");
531                assert_eq!(num_prefix_cols, 1);
532            }
533            other => panic!("expected IndexScan, got {other:?}"),
534        }
535    }
536
537    #[test]
538    fn composite_index_full_prefix() {
539        let schema = test_schema();
540        let where_clause = Some(Expr::BinaryOp {
541            left: Box::new(Expr::BinaryOp {
542                left: Box::new(Expr::Column("name".into())),
543                op: BinOp::Eq,
544                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
545            }),
546            op: BinOp::And,
547            right: Box::new(Expr::BinaryOp {
548                left: Box::new(Expr::Column("age".into())),
549                op: BinOp::Eq,
550                right: Box::new(Expr::Literal(Value::Integer(30))),
551            }),
552        });
553        let plan = plan_select(&schema, &where_clause);
554        match plan {
555            ScanPlan::IndexScan {
556                index_name,
557                num_prefix_cols,
558                ..
559            } => {
560                assert_eq!(index_name, "idx_name_age");
561                assert_eq!(num_prefix_cols, 2);
562            }
563            other => panic!("expected IndexScan, got {other:?}"),
564        }
565    }
566
567    #[test]
568    fn range_scan_on_indexed_column() {
569        let schema = test_schema();
570        let where_clause = Some(Expr::BinaryOp {
571            left: Box::new(Expr::Column("name".into())),
572            op: BinOp::Gt,
573            right: Box::new(Expr::Literal(Value::Text("M".into()))),
574        });
575        let plan = plan_select(&schema, &where_clause);
576        match plan {
577            ScanPlan::IndexScan {
578                range_conds,
579                num_prefix_cols,
580                ..
581            } => {
582                assert_eq!(num_prefix_cols, 0);
583                assert_eq!(range_conds.len(), 1);
584                assert_eq!(range_conds[0].0, BinOp::Gt);
585            }
586            other => panic!("expected IndexScan, got {other:?}"),
587        }
588    }
589
590    #[test]
591    fn composite_equality_plus_range() {
592        let schema = test_schema();
593        let where_clause = Some(Expr::BinaryOp {
594            left: Box::new(Expr::BinaryOp {
595                left: Box::new(Expr::Column("name".into())),
596                op: BinOp::Eq,
597                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
598            }),
599            op: BinOp::And,
600            right: Box::new(Expr::BinaryOp {
601                left: Box::new(Expr::Column("age".into())),
602                op: BinOp::Gt,
603                right: Box::new(Expr::Literal(Value::Integer(25))),
604            }),
605        });
606        let plan = plan_select(&schema, &where_clause);
607        match plan {
608            ScanPlan::IndexScan {
609                index_name,
610                num_prefix_cols,
611                range_conds,
612                ..
613            } => {
614                assert_eq!(index_name, "idx_name_age");
615                assert_eq!(num_prefix_cols, 1);
616                assert_eq!(range_conds.len(), 1);
617            }
618            other => panic!("expected IndexScan, got {other:?}"),
619        }
620    }
621
622    #[test]
623    fn or_condition_falls_back_to_seq_scan() {
624        let schema = test_schema();
625        let where_clause = Some(Expr::BinaryOp {
626            left: Box::new(Expr::BinaryOp {
627                left: Box::new(Expr::Column("name".into())),
628                op: BinOp::Eq,
629                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
630            }),
631            op: BinOp::Or,
632            right: Box::new(Expr::BinaryOp {
633                left: Box::new(Expr::Column("name".into())),
634                op: BinOp::Eq,
635                right: Box::new(Expr::Literal(Value::Text("Bob".into()))),
636            }),
637        });
638        let plan = plan_select(&schema, &where_clause);
639        assert!(matches!(plan, ScanPlan::SeqScan));
640    }
641
642    #[test]
643    fn non_indexed_column_is_seq_scan() {
644        let schema = test_schema();
645        let where_clause = Some(Expr::BinaryOp {
646            left: Box::new(Expr::Column("age".into())),
647            op: BinOp::Eq,
648            right: Box::new(Expr::Literal(Value::Integer(30))),
649        });
650        let plan = plan_select(&schema, &where_clause);
651        assert!(matches!(plan, ScanPlan::SeqScan));
652    }
653
654    #[test]
655    fn reversed_literal_column() {
656        let schema = test_schema();
657        let where_clause = Some(Expr::BinaryOp {
658            left: Box::new(Expr::Literal(Value::Integer(42))),
659            op: BinOp::Eq,
660            right: Box::new(Expr::Column("id".into())),
661        });
662        let plan = plan_select(&schema, &where_clause);
663        assert!(matches!(plan, ScanPlan::PkLookup { .. }));
664    }
665
666    #[test]
667    fn reversed_comparison_flips_op() {
668        let schema = test_schema();
669        let where_clause = Some(Expr::BinaryOp {
670            left: Box::new(Expr::Literal(Value::Integer(5))),
671            op: BinOp::Lt,
672            right: Box::new(Expr::Column("name".into())),
673        });
674        let plan = plan_select(&schema, &where_clause);
675        match plan {
676            ScanPlan::IndexScan { range_conds, .. } => {
677                assert_eq!(range_conds[0].0, BinOp::Gt);
678            }
679            other => panic!("expected IndexScan, got {other:?}"),
680        }
681    }
682
683    #[test]
684    fn prefers_unique_index() {
685        let schema = TableSchema::new(
686            "t".into(),
687            vec![
688                col("id", DataType::Integer, false, 0),
689                col("code", DataType::Text, false, 1),
690            ],
691            vec![0],
692            vec![
693                IndexDef {
694                    name: "idx_code".into(),
695                    columns: vec![1],
696                    unique: false,
697                },
698                IndexDef {
699                    name: "idx_code_uniq".into(),
700                    columns: vec![1],
701                    unique: true,
702                },
703            ],
704            vec![],
705            vec![],
706        );
707        let where_clause = Some(Expr::BinaryOp {
708            left: Box::new(Expr::Column("code".into())),
709            op: BinOp::Eq,
710            right: Box::new(Expr::Literal(Value::Text("X".into()))),
711        });
712        let plan = plan_select(&schema, &where_clause);
713        match plan {
714            ScanPlan::IndexScan {
715                index_name,
716                is_unique,
717                ..
718            } => {
719                assert_eq!(index_name, "idx_code_uniq");
720                assert!(is_unique);
721            }
722            other => panic!("expected IndexScan, got {other:?}"),
723        }
724    }
725
726    #[test]
727    fn prefers_more_equality_columns() {
728        let schema = test_schema();
729        let where_clause = Some(Expr::BinaryOp {
730            left: Box::new(Expr::BinaryOp {
731                left: Box::new(Expr::Column("name".into())),
732                op: BinOp::Eq,
733                right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
734            }),
735            op: BinOp::And,
736            right: Box::new(Expr::BinaryOp {
737                left: Box::new(Expr::Column("age".into())),
738                op: BinOp::Eq,
739                right: Box::new(Expr::Literal(Value::Integer(30))),
740            }),
741        });
742        let plan = plan_select(&schema, &where_clause);
743        match plan {
744            ScanPlan::IndexScan {
745                index_name,
746                num_prefix_cols,
747                ..
748            } => {
749                assert_eq!(index_name, "idx_name_age");
750                assert_eq!(num_prefix_cols, 2);
751            }
752            other => panic!("expected IndexScan, got {other:?}"),
753        }
754    }
755}