1use crate::encoding::encode_composite_key;
4use crate::parser::{BinOp, Expr};
5use crate::types::{IndexDef, TableSchema, Value};
6
7#[derive(Debug, Clone)]
8pub enum ScanPlan {
9 SeqScan,
10 PkLookup {
11 pk_values: Vec<Value>,
12 },
13 PkRangeScan {
14 start_key: Vec<u8>,
15 range_conds: Vec<(BinOp, Value)>,
16 num_pk_cols: usize,
17 },
18 IndexScan {
19 index_name: String,
20 idx_table: Vec<u8>,
21 prefix: Vec<u8>,
22 num_prefix_cols: usize,
23 range_conds: Vec<(BinOp, Value)>,
24 is_unique: bool,
25 index_columns: Vec<u16>,
26 },
27}
28
29struct SimplePredicate {
30 col_idx: usize,
31 op: BinOp,
32 value: Value,
33}
34
35fn flatten_and(expr: &Expr) -> Vec<&Expr> {
36 match expr {
37 Expr::BinaryOp {
38 left,
39 op: BinOp::And,
40 right,
41 } => {
42 let mut v = flatten_and(left);
43 v.extend(flatten_and(right));
44 v
45 }
46 _ => vec![expr],
47 }
48}
49
50fn is_comparison(op: BinOp) -> bool {
51 matches!(
52 op,
53 BinOp::Eq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq
54 )
55}
56
57fn is_range_op(op: BinOp) -> bool {
58 matches!(op, BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
59}
60
61fn flip_op(op: BinOp) -> BinOp {
62 match op {
63 BinOp::Lt => BinOp::Gt,
64 BinOp::LtEq => BinOp::GtEq,
65 BinOp::Gt => BinOp::Lt,
66 BinOp::GtEq => BinOp::LtEq,
67 other => other,
68 }
69}
70
71fn resolve_column_name(expr: &Expr) -> Option<&str> {
72 match expr {
73 Expr::Column(name) => Some(name.as_str()),
74 Expr::QualifiedColumn { column, .. } => Some(column.as_str()),
75 _ => None,
76 }
77}
78
79fn resolve_literal(expr: &Expr) -> Option<Value> {
80 match expr {
81 Expr::Literal(v) => Some(v.clone()),
82 Expr::Parameter(n) => crate::eval::resolve_scoped_param(*n).ok(),
83 _ => None,
84 }
85}
86
87fn extract_simple_predicate(expr: &Expr, schema: &TableSchema) -> Option<SimplePredicate> {
88 match expr {
89 Expr::BinaryOp { left, op, right } if is_comparison(*op) => {
90 if let (Some(name), Some(val)) = (resolve_column_name(left), resolve_literal(right)) {
91 let col_idx = schema.column_index(name)?;
92 return Some(SimplePredicate {
93 col_idx,
94 op: *op,
95 value: val,
96 });
97 }
98 if let (Some(val), Some(name)) = (resolve_literal(left), resolve_column_name(right)) {
99 let col_idx = schema.column_index(name)?;
100 return Some(SimplePredicate {
101 col_idx,
102 op: flip_op(*op),
103 value: val,
104 });
105 }
106 None
107 }
108 _ => None,
109 }
110}
111
112fn flatten_between(expr: &Expr, schema: &TableSchema, out: &mut Vec<SimplePredicate>) {
114 match expr {
115 Expr::Between {
116 expr: col_expr,
117 low,
118 high,
119 negated: false,
120 } => {
121 if let (Some(name), Some(lo), Some(hi)) = (
122 resolve_column_name(col_expr),
123 resolve_literal(low),
124 resolve_literal(high),
125 ) {
126 if let Some(col_idx) = schema.column_index(name) {
127 out.push(SimplePredicate {
128 col_idx,
129 op: BinOp::GtEq,
130 value: lo,
131 });
132 out.push(SimplePredicate {
133 col_idx,
134 op: BinOp::LtEq,
135 value: hi,
136 });
137 }
138 }
139 }
140 Expr::BinaryOp {
141 left,
142 op: BinOp::And,
143 right,
144 } => {
145 flatten_between(left, schema, out);
146 flatten_between(right, schema, out);
147 }
148 _ => {}
149 }
150}
151
152pub fn plan_select(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
153 let where_expr = match where_clause {
154 Some(e) => e,
155 None => return ScanPlan::SeqScan,
156 };
157
158 let predicates = flatten_and(where_expr);
159 let simple: Vec<Option<SimplePredicate>> = predicates
160 .iter()
161 .map(|p| extract_simple_predicate(p, schema))
162 .collect();
163
164 if let Some(plan) = try_pk_lookup(schema, &simple) {
165 return plan;
166 }
167
168 let mut range_preds: Vec<SimplePredicate> = simple
169 .iter()
170 .filter_map(|p| {
171 let p = p.as_ref()?;
172 if is_range_op(p.op) {
173 Some(SimplePredicate {
174 col_idx: p.col_idx,
175 op: p.op,
176 value: p.value.clone(),
177 })
178 } else {
179 None
180 }
181 })
182 .collect();
183 flatten_between(where_expr, schema, &mut range_preds);
184
185 if let Some(plan) = try_pk_range_scan(schema, &range_preds) {
186 return plan;
187 }
188
189 if let Some(plan) = try_best_index(schema, &simple) {
190 return plan;
191 }
192
193 ScanPlan::SeqScan
194}
195
196fn try_pk_range_scan(schema: &TableSchema, range_preds: &[SimplePredicate]) -> Option<ScanPlan> {
197 if schema.primary_key_columns.len() != 1 {
198 return None; }
200 let pk_col = schema.primary_key_columns[0] as usize;
201 let conds: Vec<(BinOp, Value)> = range_preds
202 .iter()
203 .filter(|p| p.col_idx == pk_col)
204 .map(|p| (p.op, p.value.clone()))
205 .collect();
206 if conds.is_empty() {
207 return None;
208 }
209 let start_key = conds
210 .iter()
211 .filter(|(op, _)| matches!(op, BinOp::GtEq | BinOp::Gt))
212 .map(|(_, v)| encode_composite_key(std::slice::from_ref(v)))
213 .min_by(|a, b| a.cmp(b))
214 .unwrap_or_default();
215 Some(ScanPlan::PkRangeScan {
216 start_key,
217 range_conds: conds,
218 num_pk_cols: 1,
219 })
220}
221
222fn try_pk_lookup(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
223 let pk_cols = &schema.primary_key_columns;
224 let mut pk_values: Vec<Option<Value>> = vec![None; pk_cols.len()];
225
226 for pred in predicates.iter().flatten() {
227 if pred.op == BinOp::Eq {
228 if let Some(pk_pos) = pk_cols.iter().position(|&c| c == pred.col_idx as u16) {
229 pk_values[pk_pos] = Some(pred.value.clone());
230 }
231 }
232 }
233
234 if pk_values.iter().all(|v| v.is_some()) {
235 let values: Vec<Value> = pk_values.into_iter().map(|v| v.unwrap()).collect();
236 Some(ScanPlan::PkLookup { pk_values: values })
237 } else {
238 None
239 }
240}
241
242#[derive(PartialEq, Eq, PartialOrd, Ord)]
243struct IndexScore {
244 num_equality: usize,
245 has_range: bool,
246 is_unique: bool,
247}
248
249fn try_best_index(
250 schema: &TableSchema,
251 predicates: &[Option<SimplePredicate>],
252) -> Option<ScanPlan> {
253 let mut best_score: Option<IndexScore> = None;
254 let mut best_plan: Option<ScanPlan> = None;
255
256 for idx in &schema.indices {
257 if let Some((score, plan)) = try_index_scan(schema, idx, predicates) {
258 if best_score.is_none() || score > *best_score.as_ref().unwrap() {
259 best_score = Some(score);
260 best_plan = Some(plan);
261 }
262 }
263 }
264
265 best_plan
266}
267
268fn try_index_scan(
269 schema: &TableSchema,
270 idx: &IndexDef,
271 predicates: &[Option<SimplePredicate>],
272) -> Option<(IndexScore, ScanPlan)> {
273 let mut used = Vec::new();
274 let mut equality_values: Vec<Value> = Vec::new();
275 let mut range_conds: Vec<(BinOp, Value)> = Vec::new();
276
277 for &col_idx in &idx.columns {
278 let mut found_eq = false;
279 for (i, pred) in predicates.iter().enumerate() {
280 if used.contains(&i) {
281 continue;
282 }
283 if let Some(sp) = pred {
284 if sp.col_idx == col_idx as usize && sp.op == BinOp::Eq {
285 equality_values.push(sp.value.clone());
286 used.push(i);
287 found_eq = true;
288 break;
289 }
290 }
291 }
292 if !found_eq {
293 for (i, pred) in predicates.iter().enumerate() {
294 if used.contains(&i) {
295 continue;
296 }
297 if let Some(sp) = pred {
298 if sp.col_idx == col_idx as usize && is_range_op(sp.op) {
299 range_conds.push((sp.op, sp.value.clone()));
300 used.push(i);
301 }
302 }
303 }
304 break;
305 }
306 }
307
308 if equality_values.is_empty() && range_conds.is_empty() {
309 return None;
310 }
311
312 let score = IndexScore {
313 num_equality: equality_values.len(),
314 has_range: !range_conds.is_empty(),
315 is_unique: idx.unique,
316 };
317
318 let prefix = encode_composite_key(&equality_values);
319 let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
320
321 Some((
322 score,
323 ScanPlan::IndexScan {
324 index_name: idx.name.clone(),
325 idx_table,
326 prefix,
327 num_prefix_cols: equality_values.len(),
328 range_conds,
329 is_unique: idx.unique,
330 index_columns: idx.columns.clone(),
331 },
332 ))
333}
334
335pub fn describe_plan(plan: &ScanPlan, table_schema: &TableSchema) -> String {
336 match plan {
337 ScanPlan::SeqScan => String::new(),
338
339 ScanPlan::PkLookup { pk_values } => {
340 let pk_cols: Vec<&str> = table_schema
341 .primary_key_columns
342 .iter()
343 .map(|&idx| table_schema.columns[idx as usize].name.as_str())
344 .collect();
345 let conditions: Vec<String> = pk_cols
346 .iter()
347 .zip(pk_values.iter())
348 .map(|(col, val)| format!("{col} = {}", format_value(val)))
349 .collect();
350 format!("USING PRIMARY KEY ({})", conditions.join(", "))
351 }
352
353 ScanPlan::PkRangeScan { range_conds, .. } => {
354 let pk_col = &table_schema.columns[table_schema.primary_key_columns[0] as usize].name;
355 let conditions: Vec<String> = range_conds
356 .iter()
357 .map(|(op, val)| format!("{pk_col} {} {}", op_symbol(*op), format_value(val)))
358 .collect();
359 format!("USING PRIMARY KEY RANGE ({})", conditions.join(", "))
360 }
361
362 ScanPlan::IndexScan {
363 index_name,
364 num_prefix_cols,
365 range_conds,
366 index_columns,
367 ..
368 } => {
369 let mut conditions = Vec::new();
370 for &col in index_columns.iter().take(*num_prefix_cols) {
371 let col_idx = col as usize;
372 let col_name = &table_schema.columns[col_idx].name;
373 conditions.push(format!("{col_name} = ?"));
374 }
375 if !range_conds.is_empty() && *num_prefix_cols < index_columns.len() {
376 let col_idx = index_columns[*num_prefix_cols] as usize;
377 let col_name = &table_schema.columns[col_idx].name;
378 for (op, _) in range_conds {
379 conditions.push(format!("{col_name} {} ?", op_symbol(*op)));
380 }
381 }
382 if conditions.is_empty() {
383 format!("USING INDEX {index_name}")
384 } else {
385 format!("USING INDEX {index_name} ({})", conditions.join(", "))
386 }
387 }
388 }
389}
390
391fn format_value(val: &Value) -> String {
392 match val {
393 Value::Null => "NULL".into(),
394 Value::Integer(i) => i.to_string(),
395 Value::Real(f) => format!("{f}"),
396 Value::Text(s) => format!("'{s}'"),
397 Value::Blob(_) => "BLOB".into(),
398 Value::Boolean(b) => b.to_string(),
399 Value::Date(d) => format!("DATE '{}'", crate::datetime::format_date(*d)),
400 Value::Time(t) => format!("TIME '{}'", crate::datetime::format_time(*t)),
401 Value::Timestamp(t) => format!("TIMESTAMP '{}'", crate::datetime::format_timestamp(*t)),
402 Value::Interval {
403 months,
404 days,
405 micros,
406 } => format!(
407 "INTERVAL '{}'",
408 crate::datetime::format_interval(*months, *days, *micros)
409 ),
410 }
411}
412
413fn op_symbol(op: BinOp) -> &'static str {
414 match op {
415 BinOp::Lt => "<",
416 BinOp::LtEq => "<=",
417 BinOp::Gt => ">",
418 BinOp::GtEq => ">=",
419 BinOp::Eq => "=",
420 BinOp::NotEq => "!=",
421 _ => "?",
422 }
423}
424
425#[cfg(test)]
426mod tests {
427 use super::*;
428 use crate::types::{ColumnDef, DataType};
429
430 fn col(name: &str, dt: DataType, nullable: bool, pos: u16) -> ColumnDef {
431 ColumnDef {
432 name: name.into(),
433 data_type: dt,
434 nullable,
435 position: pos,
436 default_expr: None,
437 default_sql: None,
438 check_expr: None,
439 check_sql: None,
440 check_name: None,
441 is_with_timezone: false,
442 }
443 }
444
445 fn test_schema() -> TableSchema {
446 TableSchema::new(
447 "users".into(),
448 vec![
449 col("id", DataType::Integer, false, 0),
450 col("name", DataType::Text, true, 1),
451 col("age", DataType::Integer, true, 2),
452 col("email", DataType::Text, true, 3),
453 ],
454 vec![0],
455 vec![
456 IndexDef {
457 name: "idx_name".into(),
458 columns: vec![1],
459 unique: false,
460 },
461 IndexDef {
462 name: "idx_email".into(),
463 columns: vec![3],
464 unique: true,
465 },
466 IndexDef {
467 name: "idx_name_age".into(),
468 columns: vec![1, 2],
469 unique: false,
470 },
471 ],
472 vec![],
473 vec![],
474 )
475 }
476
477 #[test]
478 fn no_where_is_seq_scan() {
479 let schema = test_schema();
480 let plan = plan_select(&schema, &None);
481 assert!(matches!(plan, ScanPlan::SeqScan));
482 }
483
484 #[test]
485 fn pk_equality_is_pk_lookup() {
486 let schema = test_schema();
487 let where_clause = Some(Expr::BinaryOp {
488 left: Box::new(Expr::Column("id".into())),
489 op: BinOp::Eq,
490 right: Box::new(Expr::Literal(Value::Integer(42))),
491 });
492 let plan = plan_select(&schema, &where_clause);
493 match plan {
494 ScanPlan::PkLookup { pk_values } => {
495 assert_eq!(pk_values, vec![Value::Integer(42)]);
496 }
497 other => panic!("expected PkLookup, got {other:?}"),
498 }
499 }
500
501 #[test]
502 fn unique_index_equality() {
503 let schema = test_schema();
504 let where_clause = Some(Expr::BinaryOp {
505 left: Box::new(Expr::Column("email".into())),
506 op: BinOp::Eq,
507 right: Box::new(Expr::Literal(Value::Text("alice@test.com".into()))),
508 });
509 let plan = plan_select(&schema, &where_clause);
510 match plan {
511 ScanPlan::IndexScan {
512 index_name,
513 is_unique,
514 num_prefix_cols,
515 ..
516 } => {
517 assert_eq!(index_name, "idx_email");
518 assert!(is_unique);
519 assert_eq!(num_prefix_cols, 1);
520 }
521 other => panic!("expected IndexScan, got {other:?}"),
522 }
523 }
524
525 #[test]
526 fn non_unique_index_equality() {
527 let schema = test_schema();
528 let where_clause = Some(Expr::BinaryOp {
529 left: Box::new(Expr::Column("name".into())),
530 op: BinOp::Eq,
531 right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
532 });
533 let plan = plan_select(&schema, &where_clause);
534 match plan {
535 ScanPlan::IndexScan {
536 index_name,
537 num_prefix_cols,
538 ..
539 } => {
540 assert!(index_name == "idx_name" || index_name == "idx_name_age");
541 assert_eq!(num_prefix_cols, 1);
542 }
543 other => panic!("expected IndexScan, got {other:?}"),
544 }
545 }
546
547 #[test]
548 fn composite_index_full_prefix() {
549 let schema = test_schema();
550 let where_clause = Some(Expr::BinaryOp {
551 left: Box::new(Expr::BinaryOp {
552 left: Box::new(Expr::Column("name".into())),
553 op: BinOp::Eq,
554 right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
555 }),
556 op: BinOp::And,
557 right: Box::new(Expr::BinaryOp {
558 left: Box::new(Expr::Column("age".into())),
559 op: BinOp::Eq,
560 right: Box::new(Expr::Literal(Value::Integer(30))),
561 }),
562 });
563 let plan = plan_select(&schema, &where_clause);
564 match plan {
565 ScanPlan::IndexScan {
566 index_name,
567 num_prefix_cols,
568 ..
569 } => {
570 assert_eq!(index_name, "idx_name_age");
571 assert_eq!(num_prefix_cols, 2);
572 }
573 other => panic!("expected IndexScan, got {other:?}"),
574 }
575 }
576
577 #[test]
578 fn range_scan_on_indexed_column() {
579 let schema = test_schema();
580 let where_clause = Some(Expr::BinaryOp {
581 left: Box::new(Expr::Column("name".into())),
582 op: BinOp::Gt,
583 right: Box::new(Expr::Literal(Value::Text("M".into()))),
584 });
585 let plan = plan_select(&schema, &where_clause);
586 match plan {
587 ScanPlan::IndexScan {
588 range_conds,
589 num_prefix_cols,
590 ..
591 } => {
592 assert_eq!(num_prefix_cols, 0);
593 assert_eq!(range_conds.len(), 1);
594 assert_eq!(range_conds[0].0, BinOp::Gt);
595 }
596 other => panic!("expected IndexScan, got {other:?}"),
597 }
598 }
599
600 #[test]
601 fn composite_equality_plus_range() {
602 let schema = test_schema();
603 let where_clause = Some(Expr::BinaryOp {
604 left: Box::new(Expr::BinaryOp {
605 left: Box::new(Expr::Column("name".into())),
606 op: BinOp::Eq,
607 right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
608 }),
609 op: BinOp::And,
610 right: Box::new(Expr::BinaryOp {
611 left: Box::new(Expr::Column("age".into())),
612 op: BinOp::Gt,
613 right: Box::new(Expr::Literal(Value::Integer(25))),
614 }),
615 });
616 let plan = plan_select(&schema, &where_clause);
617 match plan {
618 ScanPlan::IndexScan {
619 index_name,
620 num_prefix_cols,
621 range_conds,
622 ..
623 } => {
624 assert_eq!(index_name, "idx_name_age");
625 assert_eq!(num_prefix_cols, 1);
626 assert_eq!(range_conds.len(), 1);
627 }
628 other => panic!("expected IndexScan, got {other:?}"),
629 }
630 }
631
632 #[test]
633 fn or_condition_falls_back_to_seq_scan() {
634 let schema = test_schema();
635 let where_clause = Some(Expr::BinaryOp {
636 left: Box::new(Expr::BinaryOp {
637 left: Box::new(Expr::Column("name".into())),
638 op: BinOp::Eq,
639 right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
640 }),
641 op: BinOp::Or,
642 right: Box::new(Expr::BinaryOp {
643 left: Box::new(Expr::Column("name".into())),
644 op: BinOp::Eq,
645 right: Box::new(Expr::Literal(Value::Text("Bob".into()))),
646 }),
647 });
648 let plan = plan_select(&schema, &where_clause);
649 assert!(matches!(plan, ScanPlan::SeqScan));
650 }
651
652 #[test]
653 fn non_indexed_column_is_seq_scan() {
654 let schema = test_schema();
655 let where_clause = Some(Expr::BinaryOp {
656 left: Box::new(Expr::Column("age".into())),
657 op: BinOp::Eq,
658 right: Box::new(Expr::Literal(Value::Integer(30))),
659 });
660 let plan = plan_select(&schema, &where_clause);
661 assert!(matches!(plan, ScanPlan::SeqScan));
662 }
663
664 #[test]
665 fn reversed_literal_column() {
666 let schema = test_schema();
667 let where_clause = Some(Expr::BinaryOp {
668 left: Box::new(Expr::Literal(Value::Integer(42))),
669 op: BinOp::Eq,
670 right: Box::new(Expr::Column("id".into())),
671 });
672 let plan = plan_select(&schema, &where_clause);
673 assert!(matches!(plan, ScanPlan::PkLookup { .. }));
674 }
675
676 #[test]
677 fn reversed_comparison_flips_op() {
678 let schema = test_schema();
679 let where_clause = Some(Expr::BinaryOp {
680 left: Box::new(Expr::Literal(Value::Integer(5))),
681 op: BinOp::Lt,
682 right: Box::new(Expr::Column("name".into())),
683 });
684 let plan = plan_select(&schema, &where_clause);
685 match plan {
686 ScanPlan::IndexScan { range_conds, .. } => {
687 assert_eq!(range_conds[0].0, BinOp::Gt);
688 }
689 other => panic!("expected IndexScan, got {other:?}"),
690 }
691 }
692
693 #[test]
694 fn prefers_unique_index() {
695 let schema = TableSchema::new(
696 "t".into(),
697 vec![
698 col("id", DataType::Integer, false, 0),
699 col("code", DataType::Text, false, 1),
700 ],
701 vec![0],
702 vec![
703 IndexDef {
704 name: "idx_code".into(),
705 columns: vec![1],
706 unique: false,
707 },
708 IndexDef {
709 name: "idx_code_uniq".into(),
710 columns: vec![1],
711 unique: true,
712 },
713 ],
714 vec![],
715 vec![],
716 );
717 let where_clause = Some(Expr::BinaryOp {
718 left: Box::new(Expr::Column("code".into())),
719 op: BinOp::Eq,
720 right: Box::new(Expr::Literal(Value::Text("X".into()))),
721 });
722 let plan = plan_select(&schema, &where_clause);
723 match plan {
724 ScanPlan::IndexScan {
725 index_name,
726 is_unique,
727 ..
728 } => {
729 assert_eq!(index_name, "idx_code_uniq");
730 assert!(is_unique);
731 }
732 other => panic!("expected IndexScan, got {other:?}"),
733 }
734 }
735
736 #[test]
737 fn prefers_more_equality_columns() {
738 let schema = test_schema();
739 let where_clause = Some(Expr::BinaryOp {
740 left: Box::new(Expr::BinaryOp {
741 left: Box::new(Expr::Column("name".into())),
742 op: BinOp::Eq,
743 right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
744 }),
745 op: BinOp::And,
746 right: Box::new(Expr::BinaryOp {
747 left: Box::new(Expr::Column("age".into())),
748 op: BinOp::Eq,
749 right: Box::new(Expr::Literal(Value::Integer(30))),
750 }),
751 });
752 let plan = plan_select(&schema, &where_clause);
753 match plan {
754 ScanPlan::IndexScan {
755 index_name,
756 num_prefix_cols,
757 ..
758 } => {
759 assert_eq!(index_name, "idx_name_age");
760 assert_eq!(num_prefix_cols, 2);
761 }
762 other => panic!("expected IndexScan, got {other:?}"),
763 }
764 }
765}