Skip to main content

citadel_sql/
planner.rs

//! Query planner: chooses between seq scan, PK lookup, PK range scan, inverted-index scan, or index scan.
2
3use crate::encoding::encode_composite_key;
4use crate::parser::{BinOp, Expr};
5use crate::types::{IndexDef, IndexKey, IndexKind, InvertedKind, TableSchema, Value};
6
7/// Canonical form of an expression for symbolic-equivalence matching against expression indexes.
8/// Strips table qualifiers, lowercases identifiers and function names, sorts commutative operands.
9#[derive(Debug, Clone, PartialEq, Eq)]
10enum CanonicalExpr {
11    Literal(String),
12    Column(String),
13    Function {
14        name: String,
15        args: Vec<CanonicalExpr>,
16    },
17    BinaryOp {
18        op: BinOp,
19        operands: Vec<CanonicalExpr>,
20    },
21    UnaryOp {
22        op: crate::parser::UnaryOp,
23        operand: Box<CanonicalExpr>,
24    },
25    Cast {
26        expr: Box<CanonicalExpr>,
27        data_type: crate::types::DataType,
28    },
29    Other(String),
30}
31
32fn canonicalize(expr: &Expr) -> CanonicalExpr {
33    match expr {
34        Expr::Literal(v) => CanonicalExpr::Literal(format!("{v:?}")),
35        Expr::Column(name) => CanonicalExpr::Column(name.to_ascii_lowercase()),
36        Expr::QualifiedColumn { column, .. } => CanonicalExpr::Column(column.to_ascii_lowercase()),
37        Expr::Function { name, args, .. } => {
38            let canon_args: Vec<CanonicalExpr> = args.iter().map(canonicalize).collect();
39            CanonicalExpr::Function {
40                name: name.to_ascii_lowercase(),
41                args: canon_args,
42            }
43        }
44        Expr::BinaryOp { left, op, right } => {
45            let mut operands = vec![canonicalize(left), canonicalize(right)];
46            if is_commutative(*op) {
47                operands.sort_by_key(|e| format!("{e:?}"));
48            }
49            CanonicalExpr::BinaryOp { op: *op, operands }
50        }
51        Expr::UnaryOp { op, expr: inner } => CanonicalExpr::UnaryOp {
52            op: *op,
53            operand: Box::new(canonicalize(inner)),
54        },
55        Expr::Cast {
56            expr: inner,
57            data_type,
58        } => CanonicalExpr::Cast {
59            expr: Box::new(canonicalize(inner)),
60            data_type: *data_type,
61        },
62        Expr::Collate { expr: inner, .. } => canonicalize(inner),
63        other => CanonicalExpr::Other(format!("{other:?}")),
64    }
65}
66
67fn is_commutative(op: BinOp) -> bool {
68    matches!(op, BinOp::Add | BinOp::Mul | BinOp::And | BinOp::Or)
69}
70
71#[derive(Debug, Clone)]
72pub enum ScanPlan {
73    SeqScan,
74    PkLookup {
75        pk_values: Vec<Value>,
76        /// True when the pk equalities are the ENTIRE predicate. PkLookup is
77        /// otherwise a superset prefilter: consumers must re-apply the WHERE.
78        full_cover: bool,
79    },
80    PkRangeScan {
81        start_key: Vec<u8>,
82        range_conds: Vec<(BinOp, Value)>,
83        num_pk_cols: usize,
84        /// True when the range conds are the ENTIRE predicate. PkRangeScan is
85        /// otherwise a superset prefilter: consumers must re-apply the WHERE.
86        full_cover: bool,
87    },
88    IndexScan {
89        index_name: String,
90        idx_table: Vec<u8>,
91        prefix: Vec<u8>,
92        num_prefix_cols: usize,
93        range_conds: Vec<(BinOp, Value)>,
94        is_unique: bool,
95        index_columns: Vec<u16>,
96    },
97    InvertedScan {
98        kind: InvertedKind,
99        idx_table: Vec<u8>,
100        column_idx: u16,
101        probe_entries: Vec<Vec<u8>>,
102        recheck_expr: Expr,
103        recheck_needed: bool,
104    },
105}
106
107struct SimplePredicate {
108    col_idx: usize,
109    op: BinOp,
110    value: Value,
111}
112
113fn flatten_and(expr: &Expr) -> Vec<&Expr> {
114    match expr {
115        Expr::BinaryOp {
116            left,
117            op: BinOp::And,
118            right,
119        } => {
120            let mut v = flatten_and(left);
121            v.extend(flatten_and(right));
122            v
123        }
124        _ => vec![expr],
125    }
126}
127
128fn is_comparison(op: BinOp) -> bool {
129    matches!(
130        op,
131        BinOp::Eq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq
132    )
133}
134
135fn is_range_op(op: BinOp) -> bool {
136    matches!(op, BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
137}
138
139fn flip_op(op: BinOp) -> BinOp {
140    match op {
141        BinOp::Lt => BinOp::Gt,
142        BinOp::LtEq => BinOp::GtEq,
143        BinOp::Gt => BinOp::Lt,
144        BinOp::GtEq => BinOp::LtEq,
145        other => other,
146    }
147}
148
149fn resolve_column_name(expr: &Expr) -> Option<&str> {
150    match expr {
151        Expr::Column(name) => Some(name.as_str()),
152        Expr::QualifiedColumn { column, .. } => Some(column.as_str()),
153        _ => None,
154    }
155}
156
157fn resolve_literal(expr: &Expr) -> Option<Value> {
158    match expr {
159        Expr::Literal(v) => Some(v.clone()),
160        Expr::Parameter(n) => crate::eval::resolve_scoped_param(*n).ok(),
161        Expr::Function { .. } | Expr::Cast { .. } => {
162            let col_map = crate::eval::ColumnMap::new(&[]);
163            let ctx = crate::eval::EvalCtx::new(&col_map, &[]);
164            crate::eval::eval_expr(expr, &ctx).ok()
165        }
166        _ => None,
167    }
168}
169
170fn extract_simple_predicate(expr: &Expr, schema: &TableSchema) -> Option<SimplePredicate> {
171    match expr {
172        Expr::BinaryOp { left, op, right } if is_comparison(*op) => {
173            if let (Some(name), Some(val)) = (resolve_column_name(left), resolve_literal(right)) {
174                let col_idx = schema.column_index(name)?;
175                return Some(SimplePredicate {
176                    col_idx,
177                    op: *op,
178                    value: val,
179                });
180            }
181            if let (Some(val), Some(name)) = (resolve_literal(left), resolve_column_name(right)) {
182                let col_idx = schema.column_index(name)?;
183                return Some(SimplePredicate {
184                    col_idx,
185                    op: flip_op(*op),
186                    value: val,
187                });
188            }
189            None
190        }
191        _ => None,
192    }
193}
194
195/// Decompose BETWEEN into two range predicates for planner use.
196fn flatten_between(expr: &Expr, schema: &TableSchema, out: &mut Vec<SimplePredicate>) {
197    match expr {
198        Expr::Between {
199            expr: col_expr,
200            low,
201            high,
202            negated: false,
203        } => {
204            if let (Some(name), Some(lo), Some(hi)) = (
205                resolve_column_name(col_expr),
206                resolve_literal(low),
207                resolve_literal(high),
208            ) {
209                if let Some(col_idx) = schema.column_index(name) {
210                    out.push(SimplePredicate {
211                        col_idx,
212                        op: BinOp::GtEq,
213                        value: lo,
214                    });
215                    out.push(SimplePredicate {
216                        col_idx,
217                        op: BinOp::LtEq,
218                        value: hi,
219                    });
220                }
221            }
222        }
223        Expr::BinaryOp {
224            left,
225            op: BinOp::And,
226            right,
227        } => {
228            flatten_between(left, schema, out);
229            flatten_between(right, schema, out);
230        }
231        _ => {}
232    }
233}
234
235impl ScanPlan {
236    /// True when the plan consumed the entire WHERE. Every other plan yields
237    /// a row superset: consumers must re-apply the predicate per row.
238    pub fn covers_where(&self) -> bool {
239        match self {
240            ScanPlan::PkLookup { full_cover, .. } => *full_cover,
241            ScanPlan::PkRangeScan { full_cover, .. } => *full_cover,
242            _ => false,
243        }
244    }
245}
246
247pub fn plan_select(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
248    plan_select_inner(schema, where_clause, false)
249}
250
251pub fn plan_select_inverted(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
252    plan_select_inner(schema, where_clause, true)
253}
254
255fn plan_select_inner(
256    schema: &TableSchema,
257    where_clause: &Option<Expr>,
258    allow_inverted: bool,
259) -> ScanPlan {
260    let where_expr = match where_clause {
261        Some(e) => e,
262        None => return ScanPlan::SeqScan,
263    };
264
265    let predicates = flatten_and(where_expr);
266    let simple: Vec<Option<SimplePredicate>> = predicates
267        .iter()
268        .map(|p| extract_simple_predicate(p, schema))
269        .collect();
270
271    if let Some(plan) = try_pk_lookup(schema, &simple) {
272        return plan;
273    }
274
275    let mut range_preds: Vec<SimplePredicate> = simple
276        .iter()
277        .filter_map(|p| {
278            let p = p.as_ref()?;
279            if is_range_op(p.op) {
280                Some(SimplePredicate {
281                    col_idx: p.col_idx,
282                    op: p.op,
283                    value: p.value.clone(),
284                })
285            } else {
286                None
287            }
288        })
289        .collect();
290    flatten_between(where_expr, schema, &mut range_preds);
291
292    if let Some(plan) = try_pk_range_scan(
293        schema,
294        &range_preds,
295        pk_range_full_cover(schema, &predicates, &simple),
296    ) {
297        return plan;
298    }
299
300    if allow_inverted {
301        if let Some(plan) = try_inverted_scan(schema, where_expr) {
302            return plan;
303        }
304    }
305
306    if let Some(plan) = try_best_index(schema, where_expr, &simple) {
307        return plan;
308    }
309
310    ScanPlan::SeqScan
311}
312
313fn try_inverted_scan(schema: &TableSchema, where_expr: &Expr) -> Option<ScanPlan> {
314    use crate::parser::BinOp as B;
315    let (col_idx, rhs_val, op) = match where_expr {
316        Expr::BinaryOp {
317            left,
318            op: B::JsonContains,
319            right,
320        } => {
321            let name = resolve_column_name(left)?;
322            let col_idx = schema.column_index(name)? as u16;
323            let rhs = resolve_literal(right)?;
324            (col_idx, rhs, B::JsonContains)
325        }
326        Expr::BinaryOp {
327            left,
328            op: B::JsonPathMatch,
329            right,
330        } => {
331            let name = resolve_column_name(left)?;
332            let col_idx = schema.column_index(name)? as u16;
333            let rhs = resolve_literal(right)?;
334            (col_idx, rhs, B::JsonPathMatch)
335        }
336        _ => return None,
337    };
338    let idx = schema.indices.iter().find(|i| {
339        matches!(i.kind, IndexKind::Inverted(_))
340            && i.column_positions_iter()
341                .next()
342                .is_some_and(|c| c == col_idx)
343            && i.predicate_expr.is_none()
344    })?;
345    let kind = match idx.kind {
346        IndexKind::Inverted(k) => k,
347        _ => return None,
348    };
349    match (kind, op) {
350        (InvertedKind::Gin(_), B::JsonContains) => {}
351        (InvertedKind::Fts { .. }, B::JsonPathMatch) => {}
352        _ => return None,
353    }
354    let probe_entries = extract_inverted_probe(&rhs_val, kind)?;
355    if probe_entries.is_empty() {
356        return None;
357    }
358    let recheck_needed = inverted_recheck_needed(kind, &rhs_val);
359    let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
360    Some(ScanPlan::InvertedScan {
361        kind,
362        idx_table,
363        column_idx: col_idx,
364        probe_entries,
365        recheck_expr: where_expr.clone(),
366        recheck_needed,
367    })
368}
369
370fn inverted_recheck_needed(kind: InvertedKind, rhs: &Value) -> bool {
371    match kind {
372        InvertedKind::Gin(_) => true,
373        InvertedKind::Fts { .. } => match rhs {
374            Value::TsQuery(bytes) => match crate::fts::TsQueryAst::decode(bytes) {
375                Ok(ast) => !fts_ast_exact_for_index(&ast),
376                Err(_) => true,
377            },
378            _ => true,
379        },
380        InvertedKind::Ann { .. } => true,
381    }
382}
383
384fn fts_ast_exact_for_index(ast: &crate::fts::TsQueryAst) -> bool {
385    use crate::fts::TsQueryAst;
386    match ast {
387        TsQueryAst::Lexeme {
388            prefix: false,
389            weight_mask: 0,
390            ..
391        } => true,
392        TsQueryAst::Lexeme { .. } => false,
393        TsQueryAst::And(l, r) => fts_ast_exact_for_index(l) && fts_ast_exact_for_index(r),
394        _ => false,
395    }
396}
397
398pub(crate) fn fts_ast_is_pure_phrase(ast: &crate::fts::TsQueryAst) -> bool {
399    use crate::fts::TsQueryAst;
400    match ast {
401        TsQueryAst::Lexeme {
402            prefix: false,
403            weight_mask: 0,
404            ..
405        } => true,
406        TsQueryAst::Phrase { left, right, .. } => {
407            fts_ast_is_pure_phrase(left) && fts_ast_is_pure_phrase(right)
408        }
409        _ => false,
410    }
411}
412
413fn extract_inverted_probe(rhs: &Value, kind: InvertedKind) -> Option<Vec<Vec<u8>>> {
414    use crate::types::GinOpsClass;
415    match kind {
416        InvertedKind::Gin(ops) => {
417            let entries = crate::json::extract_gin_entries(rhs, ops).ok()?;
418            let filtered: Vec<Vec<u8>> = match ops {
419                GinOpsClass::JsonbOps => entries
420                    .into_iter()
421                    .filter(|e| !matches!(e.first(), Some(&0x01)))
422                    .collect(),
423                GinOpsClass::JsonbPathOps => entries,
424            };
425            Some(filtered)
426        }
427        InvertedKind::Fts { .. } => match rhs {
428            Value::TsQuery(bytes) => {
429                let ast = crate::fts::TsQueryAst::decode(bytes).ok()?;
430                let required = fts_required_lexemes(&ast)?;
431                if required.is_empty() {
432                    None
433                } else {
434                    Some(required)
435                }
436            }
437            _ => None,
438        },
439        InvertedKind::Ann { .. } => None,
440    }
441}
442
443fn fts_required_lexemes(ast: &crate::fts::TsQueryAst) -> Option<Vec<Vec<u8>>> {
444    let mut out: std::collections::BTreeSet<Vec<u8>> = std::collections::BTreeSet::new();
445    let ok = collect_required(ast, &mut out);
446    if !ok || out.is_empty() {
447        None
448    } else {
449        Some(out.into_iter().collect())
450    }
451}
452
453fn collect_required(
454    ast: &crate::fts::TsQueryAst,
455    out: &mut std::collections::BTreeSet<Vec<u8>>,
456) -> bool {
457    use crate::fts::TsQueryAst;
458    match ast {
459        TsQueryAst::Lexeme { prefix, .. } if *prefix => false,
460        TsQueryAst::Lexeme { lexeme, .. } => {
461            out.insert(lexeme.clone());
462            true
463        }
464        TsQueryAst::And(l, r) => {
465            let lo = collect_required(l, out);
466            let ro = collect_required(r, out);
467            lo || ro
468        }
469        TsQueryAst::Or(..) => false,
470        TsQueryAst::Not(_) => false,
471        TsQueryAst::Phrase { left, right, .. } => {
472            let lo = collect_required(left, out);
473            let ro = collect_required(right, out);
474            lo && ro
475        }
476    }
477}
478
479fn try_pk_range_scan(
480    schema: &TableSchema,
481    range_preds: &[SimplePredicate],
482    full_cover: bool,
483) -> Option<ScanPlan> {
484    if schema.primary_key_columns.len() != 1 {
485        return None; // Only single-column PK for now
486    }
487    let pk_col = schema.primary_key_columns[0] as usize;
488    let conds: Vec<(BinOp, Value)> = range_preds
489        .iter()
490        .filter(|p| p.col_idx == pk_col)
491        .map(|p| (p.op, p.value.clone()))
492        .collect();
493    if conds.is_empty() {
494        return None;
495    }
496    let start_key = conds
497        .iter()
498        .filter(|(op, _)| matches!(op, BinOp::GtEq | BinOp::Gt))
499        .map(|(_, v)| encode_composite_key(std::slice::from_ref(v)))
500        .min_by(|a, b| a.cmp(b))
501        .unwrap_or_default();
502    Some(ScanPlan::PkRangeScan {
503        start_key,
504        range_conds: conds,
505        num_pk_cols: 1,
506        full_cover,
507    })
508}
509
510/// True when an IndexScan's prefix+range conds consume the ENTIRE predicate,
511/// each prefix column exactly once and no NULL bounds (NULL never matches).
512pub fn index_scan_full_cover(schema: &TableSchema, where_expr: &Expr, plan: &ScanPlan) -> bool {
513    let ScanPlan::IndexScan {
514        num_prefix_cols,
515        range_conds,
516        index_columns,
517        ..
518    } = plan
519    else {
520        return false;
521    };
522    let prefix_cols = &index_columns[..*num_prefix_cols];
523    let range_col = index_columns.get(*num_prefix_cols);
524    let mut eq_cols: Vec<u16> = Vec::with_capacity(*num_prefix_cols);
525    let mut range_seen = 0usize;
526    for c in flatten_and(where_expr) {
527        let Some(p) = extract_simple_predicate(c, schema) else {
528            return false;
529        };
530        if p.value.is_null() {
531            return false;
532        }
533        let col = p.col_idx as u16;
534        if p.op == BinOp::Eq && prefix_cols.contains(&col) {
535            if eq_cols.contains(&col) {
536                return false;
537            }
538            eq_cols.push(col);
539            continue;
540        }
541        if is_range_op(p.op) && range_col == Some(&col) {
542            range_seen += 1;
543            continue;
544        }
545        return false;
546    }
547    eq_cols.len() == *num_prefix_cols && range_seen == range_conds.len()
548}
549
550/// True when every WHERE conjunct is a pk range or literal pk BETWEEN.
551fn pk_range_full_cover(
552    schema: &TableSchema,
553    predicates: &[&Expr],
554    simple: &[Option<SimplePredicate>],
555) -> bool {
556    let [pk_col] = schema.primary_key_columns[..] else {
557        return false;
558    };
559    let pk_col = pk_col as usize;
560    predicates.iter().zip(simple).all(|(raw, s)| match s {
561        Some(p) => p.col_idx == pk_col && is_range_op(p.op),
562        None => matches!(raw, Expr::Between { expr: col_expr, low, high, negated: false }
563            if resolve_column_name(col_expr).and_then(|n| schema.column_index(n)) == Some(pk_col)
564                && resolve_literal(low).is_some()
565                && resolve_literal(high).is_some()),
566    })
567}
568
569fn try_pk_lookup(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
570    let pk_cols = &schema.primary_key_columns;
571    // No PK → fall through to SeqScan. An empty-key PkLookup would silently match 0 rows.
572    if pk_cols.is_empty() {
573        return None;
574    }
575    let mut pk_values: Vec<Option<Value>> = vec![None; pk_cols.len()];
576
577    for pred in predicates.iter().flatten() {
578        if pred.op == BinOp::Eq {
579            if let Some(pk_pos) = pk_cols.iter().position(|&c| c == pred.col_idx as u16) {
580                pk_values[pk_pos] = Some(pred.value.clone());
581            }
582        }
583    }
584
585    if pk_values.iter().all(|v| v.is_some()) {
586        let values: Vec<Value> = pk_values.into_iter().map(|v| v.unwrap()).collect();
587        // A duplicated pk col would leave another unfilled and return None.
588        let full_cover = predicates.len() == pk_cols.len()
589            && predicates.iter().all(|s| {
590                s.as_ref()
591                    .is_some_and(|p| p.op == BinOp::Eq && pk_cols.contains(&(p.col_idx as u16)))
592            });
593        Some(ScanPlan::PkLookup {
594            pk_values: values,
595            full_cover,
596        })
597    } else {
598        None
599    }
600}
601
/// Ranking key for candidate index plans; a greater score wins.
///
/// The derived ordering compares fields in declaration order, so the number of
/// equality-matched keys dominates, then the presence of a range condition,
/// then uniqueness. Do not reorder the fields.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct IndexScore {
    /// How many leading index keys are matched by equality.
    num_equality: usize,
    /// Whether a range condition is applied after the equality prefix.
    has_range: bool,
    /// Whether the index is unique.
    is_unique: bool,
}
608
609fn try_best_index(
610    schema: &TableSchema,
611    where_expr: &Expr,
612    predicates: &[Option<SimplePredicate>],
613) -> Option<ScanPlan> {
614    let mut best_score: Option<IndexScore> = None;
615    let mut best_plan: Option<ScanPlan> = None;
616
617    let conjuncts = flatten_and(where_expr);
618    for idx in &schema.indices {
619        if !partial_predicate_implied(idx, where_expr, &conjuncts) {
620            continue;
621        }
622        if let Some((score, plan)) = try_index_scan(schema, idx, predicates) {
623            if best_score.is_none() || score > *best_score.as_ref().unwrap() {
624                best_score = Some(score);
625                best_plan = Some(plan);
626            }
627        }
628        if !idx.is_pure_column_index() {
629            if let Some((score, plan)) = try_expr_index_scan(schema, idx, &conjuncts) {
630                if best_score.is_none() || score > *best_score.as_ref().unwrap() {
631                    best_score = Some(score);
632                    best_plan = Some(plan);
633                }
634            }
635        }
636    }
637
638    best_plan
639}
640
641fn try_expr_index_scan(
642    schema: &TableSchema,
643    idx: &IndexDef,
644    conjuncts: &[&Expr],
645) -> Option<(IndexScore, ScanPlan)> {
646    // Only equality on the first expression key supported (`WHERE LOWER(email) = ?`).
647    let first_key = idx.keys.first()?;
648    let key_expr = match first_key {
649        IndexKey::Expr { expr, .. } => expr,
650        IndexKey::Column { .. } => return None,
651    };
652    let canonical_key = canonicalize(key_expr);
653
654    let mut matched: Option<Value> = None;
655    for conj in conjuncts {
656        if let Expr::BinaryOp {
657            left,
658            op: BinOp::Eq,
659            right,
660        } = conj
661        {
662            let (expr_side, value_side) = match (left.as_ref(), right.as_ref()) {
663                (Expr::Literal(v), other) | (other, Expr::Literal(v)) => (other, v.clone()),
664                _ => continue,
665            };
666            if canonicalize(expr_side) == canonical_key {
667                matched = Some(value_side);
668                break;
669            }
670        }
671    }
672
673    let value = matched?;
674    let score = IndexScore {
675        num_equality: 1,
676        has_range: false,
677        is_unique: idx.unique,
678    };
679    let prefix = encode_composite_key(&[value]);
680    let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
681    Some((
682        score,
683        ScanPlan::IndexScan {
684            index_name: idx.name.clone(),
685            idx_table,
686            prefix,
687            num_prefix_cols: 1,
688            range_conds: vec![],
689            is_unique: idx.unique,
690            index_columns: vec![],
691        },
692    ))
693}
694
695fn partial_predicate_implied(idx: &IndexDef, where_expr: &Expr, conjuncts: &[&Expr]) -> bool {
696    let Some(pred) = idx.predicate_expr.as_ref() else {
697        return true;
698    };
699    if expr_structurally_eq(pred, where_expr) {
700        return true;
701    }
702    if conjuncts.iter().any(|c| expr_structurally_eq(pred, c)) {
703        return true;
704    }
705    if let Expr::IsNotNull(target) = pred {
706        if let Expr::Column(col) = target.as_ref() {
707            return conjuncts.iter().any(|c| conjunct_proves_not_null(c, col));
708        }
709    }
710    false
711}
712
713fn expr_structurally_eq(a: &Expr, b: &Expr) -> bool {
714    format!("{a:?}") == format!("{b:?}")
715}
716
717fn conjunct_proves_not_null(expr: &Expr, col: &str) -> bool {
718    let mentions = |e: &Expr| matches!(e, Expr::Column(n) if n.eq_ignore_ascii_case(col));
719    match expr {
720        Expr::BinaryOp {
721            left,
722            op: BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq,
723            right,
724        } => mentions(left) || mentions(right),
725        Expr::IsNotNull(inner) => mentions(inner),
726        _ => false,
727    }
728}
729
730/// Fold a probe value the way index keys are folded at write time
731/// (`encode_key_value_collated_into`), so probe bytes match stored key bytes.
732fn fold_probe_value(value: Value, coll: crate::types::Collation) -> Value {
733    match (&value, coll) {
734        (Value::Text(s), crate::types::Collation::NoCase) => Value::Text(s.to_ascii_lowercase()),
735        (Value::Text(s), crate::types::Collation::Rtrim) => {
736            Value::Text(s.trim_end_matches(' ').into())
737        }
738        _ => value,
739    }
740}
741
742fn try_index_scan(
743    schema: &TableSchema,
744    idx: &IndexDef,
745    predicates: &[Option<SimplePredicate>],
746) -> Option<(IndexScore, ScanPlan)> {
747    let mut used = Vec::new();
748    let mut equality_values: Vec<Value> = Vec::new();
749    let mut range_conds: Vec<(BinOp, Value)> = Vec::new();
750
751    // Expression-key indexes go through `try_expr_index_scan`, not the column path.
752    if !idx.is_pure_column_index() {
753        return None;
754    }
755    let idx_columns = idx.columns_vec();
756    for (pos, &col_idx) in idx_columns.iter().enumerate() {
757        // Keys are stored folded by the component collation; usable only when
758        // it matches the column's comparison collation, probes fold to match.
759        let coll = idx.collation_at(pos);
760        if coll != schema.columns[col_idx as usize].collation {
761            break;
762        }
763        let mut found_eq = false;
764        for (i, pred) in predicates.iter().enumerate() {
765            if used.contains(&i) {
766                continue;
767            }
768            if let Some(sp) = pred {
769                if sp.col_idx == col_idx as usize && sp.op == BinOp::Eq {
770                    equality_values.push(fold_probe_value(sp.value.clone(), coll));
771                    used.push(i);
772                    found_eq = true;
773                    break;
774                }
775            }
776        }
777        if !found_eq {
778            for (i, pred) in predicates.iter().enumerate() {
779                if used.contains(&i) {
780                    continue;
781                }
782                if let Some(sp) = pred {
783                    if sp.col_idx == col_idx as usize && is_range_op(sp.op) {
784                        range_conds.push((sp.op, fold_probe_value(sp.value.clone(), coll)));
785                        used.push(i);
786                    }
787                }
788            }
789            break;
790        }
791    }
792
793    if equality_values.is_empty() && range_conds.is_empty() {
794        return None;
795    }
796
797    let score = IndexScore {
798        num_equality: equality_values.len(),
799        has_range: !range_conds.is_empty(),
800        is_unique: idx.unique,
801    };
802
803    let prefix = encode_composite_key(&equality_values);
804    let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
805
806    Some((
807        score,
808        ScanPlan::IndexScan {
809            index_name: idx.name.clone(),
810            idx_table,
811            prefix,
812            num_prefix_cols: equality_values.len(),
813            range_conds,
814            is_unique: idx.unique,
815            index_columns: idx_columns.clone(),
816        },
817    ))
818}
819
820pub fn describe_plan(plan: &ScanPlan, table_schema: &TableSchema) -> String {
821    match plan {
822        ScanPlan::SeqScan => String::new(),
823
824        ScanPlan::PkLookup { pk_values, .. } => {
825            let pk_cols: Vec<&str> = table_schema
826                .primary_key_columns
827                .iter()
828                .map(|&idx| table_schema.columns[idx as usize].name.as_str())
829                .collect();
830            let conditions: Vec<String> = pk_cols
831                .iter()
832                .zip(pk_values.iter())
833                .map(|(col, val)| format!("{col} = {}", format_value(val)))
834                .collect();
835            format!("USING PRIMARY KEY ({})", conditions.join(", "))
836        }
837
838        ScanPlan::PkRangeScan { range_conds, .. } => {
839            let pk_col = &table_schema.columns[table_schema.primary_key_columns[0] as usize].name;
840            let conditions: Vec<String> = range_conds
841                .iter()
842                .map(|(op, val)| format!("{pk_col} {} {}", op_symbol(*op), format_value(val)))
843                .collect();
844            format!("USING PRIMARY KEY RANGE ({})", conditions.join(", "))
845        }
846
847        ScanPlan::IndexScan {
848            index_name,
849            num_prefix_cols,
850            range_conds,
851            index_columns,
852            ..
853        } => {
854            let mut conditions = Vec::new();
855            for &col in index_columns.iter().take(*num_prefix_cols) {
856                let col_idx = col as usize;
857                let col_name = &table_schema.columns[col_idx].name;
858                conditions.push(format!("{col_name} = ?"));
859            }
860            if !range_conds.is_empty() && *num_prefix_cols < index_columns.len() {
861                let col_idx = index_columns[*num_prefix_cols] as usize;
862                let col_name = &table_schema.columns[col_idx].name;
863                for (op, _) in range_conds {
864                    conditions.push(format!("{col_name} {} ?", op_symbol(*op)));
865                }
866            }
867            if conditions.is_empty() {
868                format!("USING INDEX {index_name}")
869            } else {
870                format!("USING INDEX {index_name} ({})", conditions.join(", "))
871            }
872        }
873
874        ScanPlan::InvertedScan { .. } => "USING INVERTED INDEX".to_string(),
875    }
876}
877
878fn format_value(val: &Value) -> String {
879    match val {
880        Value::Null => "NULL".into(),
881        Value::Integer(i) => i.to_string(),
882        Value::Real(f) => format!("{f}"),
883        Value::Text(s) => format!("'{s}'"),
884        Value::Blob(_) => "BLOB".into(),
885        Value::Boolean(b) => b.to_string(),
886        Value::Date(d) => format!("DATE '{}'", crate::datetime::format_date(*d)),
887        Value::Time(t) => format!("TIME '{}'", crate::datetime::format_time(*t)),
888        Value::Timestamp(t) => format!("TIMESTAMP '{}'", crate::datetime::format_timestamp(*t)),
889        Value::Interval {
890            months,
891            days,
892            micros,
893        } => format!(
894            "INTERVAL '{}'",
895            crate::datetime::format_interval(*months, *days, *micros)
896        ),
897        Value::Json(s) => format!("JSON '{s}'"),
898        Value::Jsonb(_) => "JSONB '<binary>'".into(),
899        Value::TsVector(_) => "TSVECTOR '<binary>'".into(),
900        Value::TsQuery(_) => "TSQUERY '<binary>'".into(),
901        Value::Array(_) => val.to_string(),
902        Value::Vector(v) => format!("VECTOR({})", v.len()),
903    }
904}
905
906fn op_symbol(op: BinOp) -> &'static str {
907    match op {
908        BinOp::Lt => "<",
909        BinOp::LtEq => "<=",
910        BinOp::Gt => ">",
911        BinOp::GtEq => ">=",
912        BinOp::Eq => "=",
913        BinOp::NotEq => "!=",
914        _ => "?",
915    }
916}
917
918#[cfg(test)]
919#[path = "planner_tests.rs"]
920mod tests;