Skip to main content

citadel_sql/
planner.rs

//! Query planner: chooses between seq scan, PK lookup, PK range scan, or index scan.
2
3use crate::encoding::encode_composite_key;
4use crate::parser::{BinOp, Expr};
5use crate::types::{IndexDef, TableSchema, Value};
6
7#[derive(Debug, Clone)]
8pub enum ScanPlan {
9    SeqScan,
10    PkLookup {
11        pk_values: Vec<Value>,
12    },
13    PkRangeScan {
14        start_key: Vec<u8>,
15        range_conds: Vec<(BinOp, Value)>,
16        num_pk_cols: usize,
17    },
18    IndexScan {
19        index_name: String,
20        idx_table: Vec<u8>,
21        prefix: Vec<u8>,
22        num_prefix_cols: usize,
23        range_conds: Vec<(BinOp, Value)>,
24        is_unique: bool,
25        index_columns: Vec<u16>,
26    },
27}
28
29struct SimplePredicate {
30    col_idx: usize,
31    op: BinOp,
32    value: Value,
33}
34
35fn flatten_and(expr: &Expr) -> Vec<&Expr> {
36    match expr {
37        Expr::BinaryOp {
38            left,
39            op: BinOp::And,
40            right,
41        } => {
42            let mut v = flatten_and(left);
43            v.extend(flatten_and(right));
44            v
45        }
46        _ => vec![expr],
47    }
48}
49
50fn is_comparison(op: BinOp) -> bool {
51    matches!(
52        op,
53        BinOp::Eq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq
54    )
55}
56
57fn is_range_op(op: BinOp) -> bool {
58    matches!(op, BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
59}
60
61fn flip_op(op: BinOp) -> BinOp {
62    match op {
63        BinOp::Lt => BinOp::Gt,
64        BinOp::LtEq => BinOp::GtEq,
65        BinOp::Gt => BinOp::Lt,
66        BinOp::GtEq => BinOp::LtEq,
67        other => other,
68    }
69}
70
71fn resolve_column_name(expr: &Expr) -> Option<&str> {
72    match expr {
73        Expr::Column(name) => Some(name.as_str()),
74        Expr::QualifiedColumn { column, .. } => Some(column.as_str()),
75        _ => None,
76    }
77}
78
79fn resolve_literal(expr: &Expr) -> Option<Value> {
80    match expr {
81        Expr::Literal(v) => Some(v.clone()),
82        Expr::Parameter(n) => crate::eval::resolve_scoped_param(*n).ok(),
83        _ => None,
84    }
85}
86
87fn extract_simple_predicate(expr: &Expr, schema: &TableSchema) -> Option<SimplePredicate> {
88    match expr {
89        Expr::BinaryOp { left, op, right } if is_comparison(*op) => {
90            if let (Some(name), Some(val)) = (resolve_column_name(left), resolve_literal(right)) {
91                let col_idx = schema.column_index(name)?;
92                return Some(SimplePredicate {
93                    col_idx,
94                    op: *op,
95                    value: val,
96                });
97            }
98            if let (Some(val), Some(name)) = (resolve_literal(left), resolve_column_name(right)) {
99                let col_idx = schema.column_index(name)?;
100                return Some(SimplePredicate {
101                    col_idx,
102                    op: flip_op(*op),
103                    value: val,
104                });
105            }
106            None
107        }
108        _ => None,
109    }
110}
111
112/// Decompose BETWEEN into two range predicates for planner use.
113fn flatten_between(expr: &Expr, schema: &TableSchema, out: &mut Vec<SimplePredicate>) {
114    match expr {
115        Expr::Between {
116            expr: col_expr,
117            low,
118            high,
119            negated: false,
120        } => {
121            if let (Some(name), Some(lo), Some(hi)) = (
122                resolve_column_name(col_expr),
123                resolve_literal(low),
124                resolve_literal(high),
125            ) {
126                if let Some(col_idx) = schema.column_index(name) {
127                    out.push(SimplePredicate {
128                        col_idx,
129                        op: BinOp::GtEq,
130                        value: lo,
131                    });
132                    out.push(SimplePredicate {
133                        col_idx,
134                        op: BinOp::LtEq,
135                        value: hi,
136                    });
137                }
138            }
139        }
140        Expr::BinaryOp {
141            left,
142            op: BinOp::And,
143            right,
144        } => {
145            flatten_between(left, schema, out);
146            flatten_between(right, schema, out);
147        }
148        _ => {}
149    }
150}
151
152pub fn plan_select(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
153    let where_expr = match where_clause {
154        Some(e) => e,
155        None => return ScanPlan::SeqScan,
156    };
157
158    let predicates = flatten_and(where_expr);
159    let simple: Vec<Option<SimplePredicate>> = predicates
160        .iter()
161        .map(|p| extract_simple_predicate(p, schema))
162        .collect();
163
164    if let Some(plan) = try_pk_lookup(schema, &simple) {
165        return plan;
166    }
167
168    let mut range_preds: Vec<SimplePredicate> = simple
169        .iter()
170        .filter_map(|p| {
171            let p = p.as_ref()?;
172            if is_range_op(p.op) {
173                Some(SimplePredicate {
174                    col_idx: p.col_idx,
175                    op: p.op,
176                    value: p.value.clone(),
177                })
178            } else {
179                None
180            }
181        })
182        .collect();
183    flatten_between(where_expr, schema, &mut range_preds);
184
185    if let Some(plan) = try_pk_range_scan(schema, &range_preds) {
186        return plan;
187    }
188
189    if let Some(plan) = try_best_index(schema, &simple) {
190        return plan;
191    }
192
193    ScanPlan::SeqScan
194}
195
196fn try_pk_range_scan(schema: &TableSchema, range_preds: &[SimplePredicate]) -> Option<ScanPlan> {
197    if schema.primary_key_columns.len() != 1 {
198        return None; // Only single-column PK for now
199    }
200    let pk_col = schema.primary_key_columns[0] as usize;
201    let conds: Vec<(BinOp, Value)> = range_preds
202        .iter()
203        .filter(|p| p.col_idx == pk_col)
204        .map(|p| (p.op, p.value.clone()))
205        .collect();
206    if conds.is_empty() {
207        return None;
208    }
209    let start_key = conds
210        .iter()
211        .filter(|(op, _)| matches!(op, BinOp::GtEq | BinOp::Gt))
212        .map(|(_, v)| encode_composite_key(std::slice::from_ref(v)))
213        .min_by(|a, b| a.cmp(b))
214        .unwrap_or_default();
215    Some(ScanPlan::PkRangeScan {
216        start_key,
217        range_conds: conds,
218        num_pk_cols: 1,
219    })
220}
221
222fn try_pk_lookup(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
223    let pk_cols = &schema.primary_key_columns;
224    let mut pk_values: Vec<Option<Value>> = vec![None; pk_cols.len()];
225
226    for pred in predicates.iter().flatten() {
227        if pred.op == BinOp::Eq {
228            if let Some(pk_pos) = pk_cols.iter().position(|&c| c == pred.col_idx as u16) {
229                pk_values[pk_pos] = Some(pred.value.clone());
230            }
231        }
232    }
233
234    if pk_values.iter().all(|v| v.is_some()) {
235        let values: Vec<Value> = pk_values.into_iter().map(|v| v.unwrap()).collect();
236        Some(ScanPlan::PkLookup { pk_values: values })
237    } else {
238        None
239    }
240}
241
/// Ranking key used to compare candidate index scans.
///
/// The derived ordering compares fields in declaration order, so the number
/// of equality-bound columns dominates, then the presence of a range
/// condition, then uniqueness. Do not reorder the fields.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct IndexScore {
    /// Count of leading index columns bound by equality.
    num_equality: usize,
    /// Whether a range condition applies after the equality prefix.
    has_range: bool,
    /// Whether the index is declared unique.
    is_unique: bool,
}
248
249fn try_best_index(
250    schema: &TableSchema,
251    predicates: &[Option<SimplePredicate>],
252) -> Option<ScanPlan> {
253    let mut best_score: Option<IndexScore> = None;
254    let mut best_plan: Option<ScanPlan> = None;
255
256    for idx in &schema.indices {
257        if let Some((score, plan)) = try_index_scan(schema, idx, predicates) {
258            if best_score.is_none() || score > *best_score.as_ref().unwrap() {
259                best_score = Some(score);
260                best_plan = Some(plan);
261            }
262        }
263    }
264
265    best_plan
266}
267
268fn try_index_scan(
269    schema: &TableSchema,
270    idx: &IndexDef,
271    predicates: &[Option<SimplePredicate>],
272) -> Option<(IndexScore, ScanPlan)> {
273    let mut used = Vec::new();
274    let mut equality_values: Vec<Value> = Vec::new();
275    let mut range_conds: Vec<(BinOp, Value)> = Vec::new();
276
277    for &col_idx in &idx.columns {
278        let mut found_eq = false;
279        for (i, pred) in predicates.iter().enumerate() {
280            if used.contains(&i) {
281                continue;
282            }
283            if let Some(sp) = pred {
284                if sp.col_idx == col_idx as usize && sp.op == BinOp::Eq {
285                    equality_values.push(sp.value.clone());
286                    used.push(i);
287                    found_eq = true;
288                    break;
289                }
290            }
291        }
292        if !found_eq {
293            for (i, pred) in predicates.iter().enumerate() {
294                if used.contains(&i) {
295                    continue;
296                }
297                if let Some(sp) = pred {
298                    if sp.col_idx == col_idx as usize && is_range_op(sp.op) {
299                        range_conds.push((sp.op, sp.value.clone()));
300                        used.push(i);
301                    }
302                }
303            }
304            break;
305        }
306    }
307
308    if equality_values.is_empty() && range_conds.is_empty() {
309        return None;
310    }
311
312    let score = IndexScore {
313        num_equality: equality_values.len(),
314        has_range: !range_conds.is_empty(),
315        is_unique: idx.unique,
316    };
317
318    let prefix = encode_composite_key(&equality_values);
319    let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
320
321    Some((
322        score,
323        ScanPlan::IndexScan {
324            index_name: idx.name.clone(),
325            idx_table,
326            prefix,
327            num_prefix_cols: equality_values.len(),
328            range_conds,
329            is_unique: idx.unique,
330            index_columns: idx.columns.clone(),
331        },
332    ))
333}
334
335pub fn describe_plan(plan: &ScanPlan, table_schema: &TableSchema) -> String {
336    match plan {
337        ScanPlan::SeqScan => String::new(),
338
339        ScanPlan::PkLookup { pk_values } => {
340            let pk_cols: Vec<&str> = table_schema
341                .primary_key_columns
342                .iter()
343                .map(|&idx| table_schema.columns[idx as usize].name.as_str())
344                .collect();
345            let conditions: Vec<String> = pk_cols
346                .iter()
347                .zip(pk_values.iter())
348                .map(|(col, val)| format!("{col} = {}", format_value(val)))
349                .collect();
350            format!("USING PRIMARY KEY ({})", conditions.join(", "))
351        }
352
353        ScanPlan::PkRangeScan { range_conds, .. } => {
354            let pk_col = &table_schema.columns[table_schema.primary_key_columns[0] as usize].name;
355            let conditions: Vec<String> = range_conds
356                .iter()
357                .map(|(op, val)| format!("{pk_col} {} {}", op_symbol(*op), format_value(val)))
358                .collect();
359            format!("USING PRIMARY KEY RANGE ({})", conditions.join(", "))
360        }
361
362        ScanPlan::IndexScan {
363            index_name,
364            num_prefix_cols,
365            range_conds,
366            index_columns,
367            ..
368        } => {
369            let mut conditions = Vec::new();
370            for &col in index_columns.iter().take(*num_prefix_cols) {
371                let col_idx = col as usize;
372                let col_name = &table_schema.columns[col_idx].name;
373                conditions.push(format!("{col_name} = ?"));
374            }
375            if !range_conds.is_empty() && *num_prefix_cols < index_columns.len() {
376                let col_idx = index_columns[*num_prefix_cols] as usize;
377                let col_name = &table_schema.columns[col_idx].name;
378                for (op, _) in range_conds {
379                    conditions.push(format!("{col_name} {} ?", op_symbol(*op)));
380                }
381            }
382            if conditions.is_empty() {
383                format!("USING INDEX {index_name}")
384            } else {
385                format!("USING INDEX {index_name} ({})", conditions.join(", "))
386            }
387        }
388    }
389}
390
391fn format_value(val: &Value) -> String {
392    match val {
393        Value::Null => "NULL".into(),
394        Value::Integer(i) => i.to_string(),
395        Value::Real(f) => format!("{f}"),
396        Value::Text(s) => format!("'{s}'"),
397        Value::Blob(_) => "BLOB".into(),
398        Value::Boolean(b) => b.to_string(),
399        Value::Date(d) => format!("DATE '{}'", crate::datetime::format_date(*d)),
400        Value::Time(t) => format!("TIME '{}'", crate::datetime::format_time(*t)),
401        Value::Timestamp(t) => format!("TIMESTAMP '{}'", crate::datetime::format_timestamp(*t)),
402        Value::Interval {
403            months,
404            days,
405            micros,
406        } => format!(
407            "INTERVAL '{}'",
408            crate::datetime::format_interval(*months, *days, *micros)
409        ),
410    }
411}
412
413fn op_symbol(op: BinOp) -> &'static str {
414    match op {
415        BinOp::Lt => "<",
416        BinOp::LtEq => "<=",
417        BinOp::Gt => ">",
418        BinOp::GtEq => ">=",
419        BinOp::Eq => "=",
420        BinOp::NotEq => "!=",
421        _ => "?",
422    }
423}
424
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ColumnDef, DataType};

    /// Column definition without default or check constraint.
    fn col(name: &str, dt: DataType, nullable: bool, pos: u16) -> ColumnDef {
        ColumnDef {
            name: name.into(),
            data_type: dt,
            nullable,
            position: pos,
            default_expr: None,
            default_sql: None,
            check_expr: None,
            check_sql: None,
            check_name: None,
            is_with_timezone: false,
        }
    }

    /// Index definition over the given column positions.
    fn index(name: &str, columns: &[u16], unique: bool) -> IndexDef {
        IndexDef {
            name: name.into(),
            columns: columns.to_vec(),
            unique,
        }
    }

    /// `users(id PK, name, age, email)` with indexes on `name`, `email`
    /// (unique) and `(name, age)`.
    fn test_schema() -> TableSchema {
        TableSchema::new(
            "users".into(),
            vec![
                col("id", DataType::Integer, false, 0),
                col("name", DataType::Text, true, 1),
                col("age", DataType::Integer, true, 2),
                col("email", DataType::Text, true, 3),
            ],
            vec![0],
            vec![
                index("idx_name", &[1], false),
                index("idx_email", &[3], true),
                index("idx_name_age", &[1, 2], false),
            ],
            vec![],
            vec![],
        )
    }

    /// Builds `left <op> right`.
    fn binary(left: Expr, op: BinOp, right: Expr) -> Expr {
        Expr::BinaryOp {
            left: Box::new(left),
            op,
            right: Box::new(right),
        }
    }

    /// Builds `<column> <op> <literal>`.
    fn col_op_lit(column: &str, op: BinOp, literal: Value) -> Expr {
        binary(Expr::Column(column.into()), op, Expr::Literal(literal))
    }

    /// Builds `<literal> <op> <column>`.
    fn lit_op_col(literal: Value, op: BinOp, column: &str) -> Expr {
        binary(Expr::Literal(literal), op, Expr::Column(column.into()))
    }

    /// Text literal value.
    fn text(s: &str) -> Value {
        Value::Text(s.into())
    }

    /// Plans a query whose WHERE clause is `predicate`.
    fn plan_for(schema: &TableSchema, predicate: Expr) -> ScanPlan {
        plan_select(schema, &Some(predicate))
    }

    #[test]
    fn no_where_is_seq_scan() {
        let plan = plan_select(&test_schema(), &None);
        assert!(matches!(plan, ScanPlan::SeqScan));
    }

    #[test]
    fn pk_equality_is_pk_lookup() {
        let predicate = col_op_lit("id", BinOp::Eq, Value::Integer(42));
        match plan_for(&test_schema(), predicate) {
            ScanPlan::PkLookup { pk_values } => {
                assert_eq!(pk_values, vec![Value::Integer(42)]);
            }
            other => panic!("expected PkLookup, got {other:?}"),
        }
    }

    #[test]
    fn unique_index_equality() {
        let predicate = col_op_lit("email", BinOp::Eq, text("alice@test.com"));
        match plan_for(&test_schema(), predicate) {
            ScanPlan::IndexScan {
                index_name,
                is_unique,
                num_prefix_cols,
                ..
            } => {
                assert_eq!(index_name, "idx_email");
                assert!(is_unique);
                assert_eq!(num_prefix_cols, 1);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn non_unique_index_equality() {
        let predicate = col_op_lit("name", BinOp::Eq, text("Alice"));
        match plan_for(&test_schema(), predicate) {
            ScanPlan::IndexScan {
                index_name,
                num_prefix_cols,
                ..
            } => {
                assert!(index_name == "idx_name" || index_name == "idx_name_age");
                assert_eq!(num_prefix_cols, 1);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn composite_index_full_prefix() {
        let predicate = binary(
            col_op_lit("name", BinOp::Eq, text("Alice")),
            BinOp::And,
            col_op_lit("age", BinOp::Eq, Value::Integer(30)),
        );
        match plan_for(&test_schema(), predicate) {
            ScanPlan::IndexScan {
                index_name,
                num_prefix_cols,
                ..
            } => {
                assert_eq!(index_name, "idx_name_age");
                assert_eq!(num_prefix_cols, 2);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn range_scan_on_indexed_column() {
        let predicate = col_op_lit("name", BinOp::Gt, text("M"));
        match plan_for(&test_schema(), predicate) {
            ScanPlan::IndexScan {
                range_conds,
                num_prefix_cols,
                ..
            } => {
                assert_eq!(num_prefix_cols, 0);
                assert_eq!(range_conds.len(), 1);
                assert_eq!(range_conds[0].0, BinOp::Gt);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn composite_equality_plus_range() {
        let predicate = binary(
            col_op_lit("name", BinOp::Eq, text("Alice")),
            BinOp::And,
            col_op_lit("age", BinOp::Gt, Value::Integer(25)),
        );
        match plan_for(&test_schema(), predicate) {
            ScanPlan::IndexScan {
                index_name,
                num_prefix_cols,
                range_conds,
                ..
            } => {
                assert_eq!(index_name, "idx_name_age");
                assert_eq!(num_prefix_cols, 1);
                assert_eq!(range_conds.len(), 1);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn or_condition_falls_back_to_seq_scan() {
        let predicate = binary(
            col_op_lit("name", BinOp::Eq, text("Alice")),
            BinOp::Or,
            col_op_lit("name", BinOp::Eq, text("Bob")),
        );
        let plan = plan_for(&test_schema(), predicate);
        assert!(matches!(plan, ScanPlan::SeqScan));
    }

    #[test]
    fn non_indexed_column_is_seq_scan() {
        let predicate = col_op_lit("age", BinOp::Eq, Value::Integer(30));
        let plan = plan_for(&test_schema(), predicate);
        assert!(matches!(plan, ScanPlan::SeqScan));
    }

    #[test]
    fn reversed_literal_column() {
        let predicate = lit_op_col(Value::Integer(42), BinOp::Eq, "id");
        let plan = plan_for(&test_schema(), predicate);
        assert!(matches!(plan, ScanPlan::PkLookup { .. }));
    }

    #[test]
    fn reversed_comparison_flips_op() {
        let predicate = lit_op_col(Value::Integer(5), BinOp::Lt, "name");
        match plan_for(&test_schema(), predicate) {
            ScanPlan::IndexScan { range_conds, .. } => {
                assert_eq!(range_conds[0].0, BinOp::Gt);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn prefers_unique_index() {
        let schema = TableSchema::new(
            "t".into(),
            vec![
                col("id", DataType::Integer, false, 0),
                col("code", DataType::Text, false, 1),
            ],
            vec![0],
            vec![
                index("idx_code", &[1], false),
                index("idx_code_uniq", &[1], true),
            ],
            vec![],
            vec![],
        );
        let predicate = col_op_lit("code", BinOp::Eq, text("X"));
        match plan_for(&schema, predicate) {
            ScanPlan::IndexScan {
                index_name,
                is_unique,
                ..
            } => {
                assert_eq!(index_name, "idx_code_uniq");
                assert!(is_unique);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn prefers_more_equality_columns() {
        let predicate = binary(
            col_op_lit("name", BinOp::Eq, text("Alice")),
            BinOp::And,
            col_op_lit("age", BinOp::Eq, Value::Integer(30)),
        );
        match plan_for(&test_schema(), predicate) {
            ScanPlan::IndexScan {
                index_name,
                num_prefix_cols,
                ..
            } => {
                assert_eq!(index_name, "idx_name_age");
                assert_eq!(num_prefix_cols, 2);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }
}
761            }
762            other => panic!("expected IndexScan, got {other:?}"),
763        }
764    }
765}