1use crate::encoding::encode_composite_key;
4use crate::parser::{BinOp, Expr};
5use crate::types::{IndexDef, TableSchema, Value};
6
7#[derive(Debug, Clone)]
10pub enum ScanPlan {
11 SeqScan,
12 PkLookup {
13 pk_values: Vec<Value>,
14 },
15 PkRangeScan {
16 start_key: Vec<u8>,
17 range_conds: Vec<(BinOp, Value)>,
18 num_pk_cols: usize,
19 },
20 IndexScan {
21 index_name: String,
22 idx_table: Vec<u8>,
23 prefix: Vec<u8>,
24 num_prefix_cols: usize,
25 range_conds: Vec<(BinOp, Value)>,
26 is_unique: bool,
27 index_columns: Vec<u16>,
28 },
29}
30
31struct SimplePredicate {
34 col_idx: usize,
35 op: BinOp,
36 value: Value,
37}
38
39fn flatten_and(expr: &Expr) -> Vec<&Expr> {
40 match expr {
41 Expr::BinaryOp {
42 left,
43 op: BinOp::And,
44 right,
45 } => {
46 let mut v = flatten_and(left);
47 v.extend(flatten_and(right));
48 v
49 }
50 _ => vec![expr],
51 }
52}
53
54fn is_comparison(op: BinOp) -> bool {
55 matches!(
56 op,
57 BinOp::Eq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq
58 )
59}
60
61fn is_range_op(op: BinOp) -> bool {
62 matches!(op, BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
63}
64
65fn flip_op(op: BinOp) -> BinOp {
66 match op {
67 BinOp::Lt => BinOp::Gt,
68 BinOp::LtEq => BinOp::GtEq,
69 BinOp::Gt => BinOp::Lt,
70 BinOp::GtEq => BinOp::LtEq,
71 other => other,
72 }
73}
74
75fn resolve_column_name(expr: &Expr) -> Option<&str> {
76 match expr {
77 Expr::Column(name) => Some(name.as_str()),
78 Expr::QualifiedColumn { column, .. } => Some(column.as_str()),
79 _ => None,
80 }
81}
82
83fn extract_simple_predicate(expr: &Expr, schema: &TableSchema) -> Option<SimplePredicate> {
84 match expr {
85 Expr::BinaryOp { left, op, right } if is_comparison(*op) => {
86 if let (Some(name), Expr::Literal(val)) = (resolve_column_name(left), right.as_ref()) {
87 let col_idx = schema.column_index(name)?;
88 return Some(SimplePredicate {
89 col_idx,
90 op: *op,
91 value: val.clone(),
92 });
93 }
94 if let (Expr::Literal(val), Some(name)) = (left.as_ref(), resolve_column_name(right)) {
95 let col_idx = schema.column_index(name)?;
96 return Some(SimplePredicate {
97 col_idx,
98 op: flip_op(*op),
99 value: val.clone(),
100 });
101 }
102 None
103 }
104 _ => None,
105 }
106}
107
108fn flatten_between(expr: &Expr, schema: &TableSchema, out: &mut Vec<SimplePredicate>) {
110 match expr {
111 Expr::Between {
112 expr: col_expr,
113 low,
114 high,
115 negated: false,
116 } => {
117 if let (Some(name), Expr::Literal(lo), Expr::Literal(hi)) =
118 (resolve_column_name(col_expr), low.as_ref(), high.as_ref())
119 {
120 if let Some(col_idx) = schema.column_index(name) {
121 out.push(SimplePredicate {
122 col_idx,
123 op: BinOp::GtEq,
124 value: lo.clone(),
125 });
126 out.push(SimplePredicate {
127 col_idx,
128 op: BinOp::LtEq,
129 value: hi.clone(),
130 });
131 }
132 }
133 }
134 Expr::BinaryOp {
135 left,
136 op: BinOp::And,
137 right,
138 } => {
139 flatten_between(left, schema, out);
140 flatten_between(right, schema, out);
141 }
142 _ => {}
143 }
144}
145
146pub fn plan_select(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
149 let where_expr = match where_clause {
150 Some(e) => e,
151 None => return ScanPlan::SeqScan,
152 };
153
154 let predicates = flatten_and(where_expr);
155 let simple: Vec<Option<SimplePredicate>> = predicates
156 .iter()
157 .map(|p| extract_simple_predicate(p, schema))
158 .collect();
159
160 if let Some(plan) = try_pk_lookup(schema, &simple) {
161 return plan;
162 }
163
164 let mut range_preds: Vec<SimplePredicate> = simple
166 .iter()
167 .filter_map(|p| {
168 let p = p.as_ref()?;
169 if is_range_op(p.op) {
170 Some(SimplePredicate {
171 col_idx: p.col_idx,
172 op: p.op,
173 value: p.value.clone(),
174 })
175 } else {
176 None
177 }
178 })
179 .collect();
180 flatten_between(where_expr, schema, &mut range_preds);
181
182 if let Some(plan) = try_pk_range_scan(schema, &range_preds) {
183 return plan;
184 }
185
186 if let Some(plan) = try_best_index(schema, &simple) {
187 return plan;
188 }
189
190 ScanPlan::SeqScan
191}
192
193fn try_pk_range_scan(schema: &TableSchema, range_preds: &[SimplePredicate]) -> Option<ScanPlan> {
194 if schema.primary_key_columns.len() != 1 {
195 return None; }
197 let pk_col = schema.primary_key_columns[0] as usize;
198 let conds: Vec<(BinOp, Value)> = range_preds
199 .iter()
200 .filter(|p| p.col_idx == pk_col)
201 .map(|p| (p.op, p.value.clone()))
202 .collect();
203 if conds.is_empty() {
204 return None;
205 }
206 let start_key = conds
208 .iter()
209 .filter(|(op, _)| matches!(op, BinOp::GtEq | BinOp::Gt))
210 .map(|(_, v)| encode_composite_key(std::slice::from_ref(v)))
211 .min_by(|a, b| a.cmp(b))
212 .unwrap_or_default();
213 Some(ScanPlan::PkRangeScan {
214 start_key,
215 range_conds: conds,
216 num_pk_cols: 1,
217 })
218}
219
220fn try_pk_lookup(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
221 let pk_cols = &schema.primary_key_columns;
222 let mut pk_values: Vec<Option<Value>> = vec![None; pk_cols.len()];
223
224 for pred in predicates.iter().flatten() {
225 if pred.op == BinOp::Eq {
226 if let Some(pk_pos) = pk_cols.iter().position(|&c| c == pred.col_idx as u16) {
227 pk_values[pk_pos] = Some(pred.value.clone());
228 }
229 }
230 }
231
232 if pk_values.iter().all(|v| v.is_some()) {
233 let values: Vec<Value> = pk_values.into_iter().map(|v| v.unwrap()).collect();
234 Some(ScanPlan::PkLookup { pk_values: values })
235 } else {
236 None
237 }
238}
239
/// Ranking key used to compare candidate index scans.
///
/// The derived ordering compares fields in declaration order, so more
/// equality-bound columns always win, then the presence of a range condition,
/// then uniqueness of the index.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct IndexScore {
    /// Number of leading index columns bound by equality.
    num_equality: usize,
    /// Whether a range condition applies to the next index column.
    has_range: bool,
    /// Whether the index is declared unique.
    is_unique: bool,
}
248
249fn try_best_index(
250 schema: &TableSchema,
251 predicates: &[Option<SimplePredicate>],
252) -> Option<ScanPlan> {
253 let mut best_score: Option<IndexScore> = None;
254 let mut best_plan: Option<ScanPlan> = None;
255
256 for idx in &schema.indices {
257 if let Some((score, plan)) = try_index_scan(schema, idx, predicates) {
258 if best_score.is_none() || score > *best_score.as_ref().unwrap() {
259 best_score = Some(score);
260 best_plan = Some(plan);
261 }
262 }
263 }
264
265 best_plan
266}
267
268fn try_index_scan(
269 schema: &TableSchema,
270 idx: &IndexDef,
271 predicates: &[Option<SimplePredicate>],
272) -> Option<(IndexScore, ScanPlan)> {
273 let mut used = Vec::new();
274 let mut equality_values: Vec<Value> = Vec::new();
275 let mut range_conds: Vec<(BinOp, Value)> = Vec::new();
276
277 for &col_idx in &idx.columns {
278 let mut found_eq = false;
279 for (i, pred) in predicates.iter().enumerate() {
280 if used.contains(&i) {
281 continue;
282 }
283 if let Some(sp) = pred {
284 if sp.col_idx == col_idx as usize && sp.op == BinOp::Eq {
285 equality_values.push(sp.value.clone());
286 used.push(i);
287 found_eq = true;
288 break;
289 }
290 }
291 }
292 if !found_eq {
293 for (i, pred) in predicates.iter().enumerate() {
294 if used.contains(&i) {
295 continue;
296 }
297 if let Some(sp) = pred {
298 if sp.col_idx == col_idx as usize && is_range_op(sp.op) {
299 range_conds.push((sp.op, sp.value.clone()));
300 used.push(i);
301 }
302 }
303 }
304 break;
305 }
306 }
307
308 if equality_values.is_empty() && range_conds.is_empty() {
309 return None;
310 }
311
312 let score = IndexScore {
313 num_equality: equality_values.len(),
314 has_range: !range_conds.is_empty(),
315 is_unique: idx.unique,
316 };
317
318 let prefix = encode_composite_key(&equality_values);
319 let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
320
321 Some((
322 score,
323 ScanPlan::IndexScan {
324 index_name: idx.name.clone(),
325 idx_table,
326 prefix,
327 num_prefix_cols: equality_values.len(),
328 range_conds,
329 is_unique: idx.unique,
330 index_columns: idx.columns.clone(),
331 },
332 ))
333}
334
335pub fn describe_plan(plan: &ScanPlan, table_schema: &TableSchema) -> String {
338 match plan {
339 ScanPlan::SeqScan => String::new(),
340
341 ScanPlan::PkLookup { pk_values } => {
342 let pk_cols: Vec<&str> = table_schema
343 .primary_key_columns
344 .iter()
345 .map(|&idx| table_schema.columns[idx as usize].name.as_str())
346 .collect();
347 let conditions: Vec<String> = pk_cols
348 .iter()
349 .zip(pk_values.iter())
350 .map(|(col, val)| format!("{col} = {}", format_value(val)))
351 .collect();
352 format!("USING PRIMARY KEY ({})", conditions.join(", "))
353 }
354
355 ScanPlan::PkRangeScan { range_conds, .. } => {
356 let pk_col = &table_schema.columns[table_schema.primary_key_columns[0] as usize].name;
357 let conditions: Vec<String> = range_conds
358 .iter()
359 .map(|(op, val)| format!("{pk_col} {} {}", op_symbol(*op), format_value(val)))
360 .collect();
361 format!("USING PRIMARY KEY RANGE ({})", conditions.join(", "))
362 }
363
364 ScanPlan::IndexScan {
365 index_name,
366 num_prefix_cols,
367 range_conds,
368 index_columns,
369 ..
370 } => {
371 let mut conditions = Vec::new();
372 for &col in index_columns.iter().take(*num_prefix_cols) {
373 let col_idx = col as usize;
374 let col_name = &table_schema.columns[col_idx].name;
375 conditions.push(format!("{col_name} = ?"));
376 }
377 if !range_conds.is_empty() && *num_prefix_cols < index_columns.len() {
378 let col_idx = index_columns[*num_prefix_cols] as usize;
379 let col_name = &table_schema.columns[col_idx].name;
380 for (op, _) in range_conds {
381 conditions.push(format!("{col_name} {} ?", op_symbol(*op)));
382 }
383 }
384 if conditions.is_empty() {
385 format!("USING INDEX {index_name}")
386 } else {
387 format!("USING INDEX {index_name} ({})", conditions.join(", "))
388 }
389 }
390 }
391}
392
393fn format_value(val: &Value) -> String {
394 match val {
395 Value::Null => "NULL".into(),
396 Value::Integer(i) => i.to_string(),
397 Value::Real(f) => format!("{f}"),
398 Value::Text(s) => format!("'{s}'"),
399 Value::Blob(_) => "BLOB".into(),
400 Value::Boolean(b) => b.to_string(),
401 }
402}
403
404fn op_symbol(op: BinOp) -> &'static str {
405 match op {
406 BinOp::Lt => "<",
407 BinOp::LtEq => "<=",
408 BinOp::Gt => ">",
409 BinOp::GtEq => ">=",
410 BinOp::Eq => "=",
411 BinOp::NotEq => "!=",
412 _ => "?",
413 }
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ColumnDef, DataType};

    /// Column definition with no default value and no check constraint.
    fn col(name: &str, dt: DataType, nullable: bool, pos: u16) -> ColumnDef {
        ColumnDef {
            name: name.into(),
            data_type: dt,
            nullable,
            position: pos,
            default_expr: None,
            default_sql: None,
            check_expr: None,
            check_sql: None,
            check_name: None,
        }
    }

    /// Index definition over the given table column positions.
    fn index(name: &str, columns: &[u16], unique: bool) -> IndexDef {
        IndexDef {
            name: name.into(),
            columns: columns.to_vec(),
            unique,
        }
    }

    /// `users(id PK, name, age, email)` with indexes on `name`, `email`
    /// (unique) and `(name, age)`.
    fn test_schema() -> TableSchema {
        let columns = vec![
            col("id", DataType::Integer, false, 0),
            col("name", DataType::Text, true, 1),
            col("age", DataType::Integer, true, 2),
            col("email", DataType::Text, true, 3),
        ];
        let indices = vec![
            index("idx_name", &[1], false),
            index("idx_email", &[3], true),
            index("idx_name_age", &[1, 2], false),
        ];
        TableSchema::new("users".into(), columns, vec![0], indices, vec![], vec![])
    }

    /// Text literal.
    fn text(s: &str) -> Value {
        Value::Text(s.into())
    }

    /// `column <op> literal`.
    fn column_vs_literal(column: &str, op: BinOp, literal: Value) -> Expr {
        Expr::BinaryOp {
            left: Box::new(Expr::Column(column.into())),
            op,
            right: Box::new(Expr::Literal(literal)),
        }
    }

    /// `literal <op> column`.
    fn literal_vs_column(literal: Value, op: BinOp, column: &str) -> Expr {
        Expr::BinaryOp {
            left: Box::new(Expr::Literal(literal)),
            op,
            right: Box::new(Expr::Column(column.into())),
        }
    }

    /// `lhs <op> rhs` for two arbitrary sub-expressions.
    fn combine(lhs: Expr, op: BinOp, rhs: Expr) -> Expr {
        Expr::BinaryOp {
            left: Box::new(lhs),
            op,
            right: Box::new(rhs),
        }
    }

    /// Plans a query on `schema` filtered by `filter`.
    fn plan_for(schema: &TableSchema, filter: Expr) -> ScanPlan {
        plan_select(schema, &Some(filter))
    }

    #[test]
    fn no_where_is_seq_scan() {
        let schema = test_schema();
        let plan = plan_select(&schema, &None);
        assert!(matches!(plan, ScanPlan::SeqScan));
    }

    #[test]
    fn pk_equality_is_pk_lookup() {
        let schema = test_schema();
        let plan = plan_for(
            &schema,
            column_vs_literal("id", BinOp::Eq, Value::Integer(42)),
        );
        match plan {
            ScanPlan::PkLookup { pk_values } => {
                assert_eq!(pk_values, vec![Value::Integer(42)]);
            }
            other => panic!("expected PkLookup, got {other:?}"),
        }
    }

    #[test]
    fn unique_index_equality() {
        let schema = test_schema();
        let plan = plan_for(
            &schema,
            column_vs_literal("email", BinOp::Eq, text("alice@test.com")),
        );
        match plan {
            ScanPlan::IndexScan {
                index_name,
                is_unique,
                num_prefix_cols,
                ..
            } => {
                assert_eq!(index_name, "idx_email");
                assert!(is_unique);
                assert_eq!(num_prefix_cols, 1);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn non_unique_index_equality() {
        let schema = test_schema();
        let plan = plan_for(&schema, column_vs_literal("name", BinOp::Eq, text("Alice")));
        match plan {
            ScanPlan::IndexScan {
                index_name,
                num_prefix_cols,
                ..
            } => {
                assert!(index_name == "idx_name" || index_name == "idx_name_age");
                assert_eq!(num_prefix_cols, 1);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn composite_index_full_prefix() {
        let schema = test_schema();
        let filter = combine(
            column_vs_literal("name", BinOp::Eq, text("Alice")),
            BinOp::And,
            column_vs_literal("age", BinOp::Eq, Value::Integer(30)),
        );
        let plan = plan_for(&schema, filter);
        match plan {
            ScanPlan::IndexScan {
                index_name,
                num_prefix_cols,
                ..
            } => {
                assert_eq!(index_name, "idx_name_age");
                assert_eq!(num_prefix_cols, 2);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn range_scan_on_indexed_column() {
        let schema = test_schema();
        let plan = plan_for(&schema, column_vs_literal("name", BinOp::Gt, text("M")));
        match plan {
            ScanPlan::IndexScan {
                range_conds,
                num_prefix_cols,
                ..
            } => {
                assert_eq!(num_prefix_cols, 0);
                assert_eq!(range_conds.len(), 1);
                assert_eq!(range_conds[0].0, BinOp::Gt);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn composite_equality_plus_range() {
        let schema = test_schema();
        let filter = combine(
            column_vs_literal("name", BinOp::Eq, text("Alice")),
            BinOp::And,
            column_vs_literal("age", BinOp::Gt, Value::Integer(25)),
        );
        let plan = plan_for(&schema, filter);
        match plan {
            ScanPlan::IndexScan {
                index_name,
                num_prefix_cols,
                range_conds,
                ..
            } => {
                assert_eq!(index_name, "idx_name_age");
                assert_eq!(num_prefix_cols, 1);
                assert_eq!(range_conds.len(), 1);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn or_condition_falls_back_to_seq_scan() {
        let schema = test_schema();
        let filter = combine(
            column_vs_literal("name", BinOp::Eq, text("Alice")),
            BinOp::Or,
            column_vs_literal("name", BinOp::Eq, text("Bob")),
        );
        let plan = plan_for(&schema, filter);
        assert!(matches!(plan, ScanPlan::SeqScan));
    }

    #[test]
    fn non_indexed_column_is_seq_scan() {
        let schema = test_schema();
        let plan = plan_for(
            &schema,
            column_vs_literal("age", BinOp::Eq, Value::Integer(30)),
        );
        assert!(matches!(plan, ScanPlan::SeqScan));
    }

    #[test]
    fn reversed_literal_column() {
        let schema = test_schema();
        let plan = plan_for(
            &schema,
            literal_vs_column(Value::Integer(42), BinOp::Eq, "id"),
        );
        assert!(matches!(plan, ScanPlan::PkLookup { .. }));
    }

    #[test]
    fn reversed_comparison_flips_op() {
        let schema = test_schema();
        let plan = plan_for(
            &schema,
            literal_vs_column(Value::Integer(5), BinOp::Lt, "name"),
        );
        match plan {
            ScanPlan::IndexScan { range_conds, .. } => {
                assert_eq!(range_conds[0].0, BinOp::Gt);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn prefers_unique_index() {
        let schema = TableSchema::new(
            "t".into(),
            vec![
                col("id", DataType::Integer, false, 0),
                col("code", DataType::Text, false, 1),
            ],
            vec![0],
            vec![
                index("idx_code", &[1], false),
                index("idx_code_uniq", &[1], true),
            ],
            vec![],
            vec![],
        );
        let plan = plan_for(&schema, column_vs_literal("code", BinOp::Eq, text("X")));
        match plan {
            ScanPlan::IndexScan {
                index_name,
                is_unique,
                ..
            } => {
                assert_eq!(index_name, "idx_code_uniq");
                assert!(is_unique);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn prefers_more_equality_columns() {
        let schema = test_schema();
        let filter = combine(
            column_vs_literal("name", BinOp::Eq, text("Alice")),
            BinOp::And,
            column_vs_literal("age", BinOp::Eq, Value::Integer(30)),
        );
        let plan = plan_for(&schema, filter);
        match plan {
            ScanPlan::IndexScan {
                index_name,
                num_prefix_cols,
                ..
            } => {
                assert_eq!(index_name, "idx_name_age");
                assert_eq!(num_prefix_cols, 2);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }
}