1use crate::encoding::encode_composite_key;
7use crate::parser::{BinOp, Expr};
8use crate::types::{IndexDef, TableSchema, Value};
9
10#[derive(Debug)]
13pub enum ScanPlan {
14 SeqScan,
15 PkLookup {
16 pk_values: Vec<Value>,
17 },
18 IndexScan {
19 index_name: String,
20 idx_table: Vec<u8>,
21 prefix: Vec<u8>,
22 num_prefix_cols: usize,
23 range_conds: Vec<(BinOp, Value)>,
24 is_unique: bool,
25 index_columns: Vec<u16>,
26 },
27}
28
29struct SimplePredicate {
32 col_idx: usize,
33 op: BinOp,
34 value: Value,
35}
36
37fn flatten_and(expr: &Expr) -> Vec<&Expr> {
38 match expr {
39 Expr::BinaryOp {
40 left,
41 op: BinOp::And,
42 right,
43 } => {
44 let mut v = flatten_and(left);
45 v.extend(flatten_and(right));
46 v
47 }
48 _ => vec![expr],
49 }
50}
51
52fn is_comparison(op: BinOp) -> bool {
53 matches!(
54 op,
55 BinOp::Eq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq
56 )
57}
58
59fn is_range_op(op: BinOp) -> bool {
60 matches!(op, BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
61}
62
63fn flip_op(op: BinOp) -> BinOp {
64 match op {
65 BinOp::Lt => BinOp::Gt,
66 BinOp::LtEq => BinOp::GtEq,
67 BinOp::Gt => BinOp::Lt,
68 BinOp::GtEq => BinOp::LtEq,
69 other => other,
70 }
71}
72
73fn resolve_column_name(expr: &Expr) -> Option<&str> {
74 match expr {
75 Expr::Column(name) => Some(name.as_str()),
76 Expr::QualifiedColumn { column, .. } => Some(column.as_str()),
77 _ => None,
78 }
79}
80
81fn extract_simple_predicate(expr: &Expr, schema: &TableSchema) -> Option<SimplePredicate> {
82 match expr {
83 Expr::BinaryOp { left, op, right } if is_comparison(*op) => {
84 if let (Some(name), Expr::Literal(val)) = (resolve_column_name(left), right.as_ref()) {
85 let col_idx = schema.column_index(name)?;
86 return Some(SimplePredicate {
87 col_idx,
88 op: *op,
89 value: val.clone(),
90 });
91 }
92 if let (Expr::Literal(val), Some(name)) = (left.as_ref(), resolve_column_name(right)) {
93 let col_idx = schema.column_index(name)?;
94 return Some(SimplePredicate {
95 col_idx,
96 op: flip_op(*op),
97 value: val.clone(),
98 });
99 }
100 None
101 }
102 _ => None,
103 }
104}
105
106pub fn plan_select(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
109 let where_expr = match where_clause {
110 Some(e) => e,
111 None => return ScanPlan::SeqScan,
112 };
113
114 let predicates = flatten_and(where_expr);
115 let simple: Vec<Option<SimplePredicate>> = predicates
116 .iter()
117 .map(|p| extract_simple_predicate(p, schema))
118 .collect();
119
120 if let Some(plan) = try_pk_lookup(schema, &simple) {
121 return plan;
122 }
123
124 if let Some(plan) = try_best_index(schema, &simple) {
125 return plan;
126 }
127
128 ScanPlan::SeqScan
129}
130
131fn try_pk_lookup(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
132 let pk_cols = &schema.primary_key_columns;
133 let mut pk_values: Vec<Option<Value>> = vec![None; pk_cols.len()];
134
135 for pred in predicates.iter().flatten() {
136 if pred.op == BinOp::Eq {
137 if let Some(pk_pos) = pk_cols.iter().position(|&c| c == pred.col_idx as u16) {
138 pk_values[pk_pos] = Some(pred.value.clone());
139 }
140 }
141 }
142
143 if pk_values.iter().all(|v| v.is_some()) {
144 let values: Vec<Value> = pk_values.into_iter().map(|v| v.unwrap()).collect();
145 Some(ScanPlan::PkLookup { pk_values: values })
146 } else {
147 None
148 }
149}
150
/// Ranking key used to compare candidate index scans.
///
/// The ordering is derived, so candidates are compared field by field in
/// declaration order: more equality columns first, then presence of a range
/// condition, then uniqueness. Do not reorder the fields.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct IndexScore {
    /// Number of leading index columns matched by equality predicates.
    num_equality: usize,
    /// Whether at least one range condition was attached to the scan.
    has_range: bool,
    /// Whether the index is declared unique.
    is_unique: bool,
}
159
160fn try_best_index(
161 schema: &TableSchema,
162 predicates: &[Option<SimplePredicate>],
163) -> Option<ScanPlan> {
164 let mut best_score: Option<IndexScore> = None;
165 let mut best_plan: Option<ScanPlan> = None;
166
167 for idx in &schema.indices {
168 if let Some((score, plan)) = try_index_scan(schema, idx, predicates) {
169 if best_score.is_none() || score > *best_score.as_ref().unwrap() {
170 best_score = Some(score);
171 best_plan = Some(plan);
172 }
173 }
174 }
175
176 best_plan
177}
178
179fn try_index_scan(
180 schema: &TableSchema,
181 idx: &IndexDef,
182 predicates: &[Option<SimplePredicate>],
183) -> Option<(IndexScore, ScanPlan)> {
184 let mut used = Vec::new();
185 let mut equality_values: Vec<Value> = Vec::new();
186 let mut range_conds: Vec<(BinOp, Value)> = Vec::new();
187
188 for &col_idx in &idx.columns {
189 let mut found_eq = false;
190 for (i, pred) in predicates.iter().enumerate() {
191 if used.contains(&i) {
192 continue;
193 }
194 if let Some(sp) = pred {
195 if sp.col_idx == col_idx as usize && sp.op == BinOp::Eq {
196 equality_values.push(sp.value.clone());
197 used.push(i);
198 found_eq = true;
199 break;
200 }
201 }
202 }
203 if !found_eq {
204 for (i, pred) in predicates.iter().enumerate() {
205 if used.contains(&i) {
206 continue;
207 }
208 if let Some(sp) = pred {
209 if sp.col_idx == col_idx as usize && is_range_op(sp.op) {
210 range_conds.push((sp.op, sp.value.clone()));
211 used.push(i);
212 }
213 }
214 }
215 break;
216 }
217 }
218
219 if equality_values.is_empty() && range_conds.is_empty() {
220 return None;
221 }
222
223 let score = IndexScore {
224 num_equality: equality_values.len(),
225 has_range: !range_conds.is_empty(),
226 is_unique: idx.unique,
227 };
228
229 let prefix = encode_composite_key(&equality_values);
230 let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
231
232 Some((
233 score,
234 ScanPlan::IndexScan {
235 index_name: idx.name.clone(),
236 idx_table,
237 prefix,
238 num_prefix_cols: equality_values.len(),
239 range_conds,
240 is_unique: idx.unique,
241 index_columns: idx.columns.clone(),
242 },
243 ))
244}
245
246pub fn describe_plan(plan: &ScanPlan, table_schema: &TableSchema) -> String {
249 match plan {
250 ScanPlan::SeqScan => String::new(),
251
252 ScanPlan::PkLookup { pk_values } => {
253 let pk_cols: Vec<&str> = table_schema
254 .primary_key_columns
255 .iter()
256 .map(|&idx| table_schema.columns[idx as usize].name.as_str())
257 .collect();
258 let conditions: Vec<String> = pk_cols
259 .iter()
260 .zip(pk_values.iter())
261 .map(|(col, val)| format!("{col} = {}", format_value(val)))
262 .collect();
263 format!("USING PRIMARY KEY ({})", conditions.join(", "))
264 }
265
266 ScanPlan::IndexScan {
267 index_name,
268 num_prefix_cols,
269 range_conds,
270 index_columns,
271 ..
272 } => {
273 let mut conditions = Vec::new();
274 for &col in index_columns.iter().take(*num_prefix_cols) {
275 let col_idx = col as usize;
276 let col_name = &table_schema.columns[col_idx].name;
277 conditions.push(format!("{col_name} = ?"));
278 }
279 if !range_conds.is_empty() && *num_prefix_cols < index_columns.len() {
280 let col_idx = index_columns[*num_prefix_cols] as usize;
281 let col_name = &table_schema.columns[col_idx].name;
282 for (op, _) in range_conds {
283 conditions.push(format!("{col_name} {} ?", op_symbol(*op)));
284 }
285 }
286 if conditions.is_empty() {
287 format!("USING INDEX {index_name}")
288 } else {
289 format!("USING INDEX {index_name} ({})", conditions.join(", "))
290 }
291 }
292 }
293}
294
295fn format_value(val: &Value) -> String {
296 match val {
297 Value::Null => "NULL".into(),
298 Value::Integer(i) => i.to_string(),
299 Value::Real(f) => format!("{f}"),
300 Value::Text(s) => format!("'{s}'"),
301 Value::Blob(_) => "BLOB".into(),
302 Value::Boolean(b) => b.to_string(),
303 }
304}
305
306fn op_symbol(op: BinOp) -> &'static str {
307 match op {
308 BinOp::Lt => "<",
309 BinOp::LtEq => "<=",
310 BinOp::Gt => ">",
311 BinOp::GtEq => ">=",
312 BinOp::Eq => "=",
313 BinOp::NotEq => "!=",
314 _ => "?",
315 }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ColumnDef, DataType};

    // ---- expression builders -------------------------------------------

    /// Builds `column <op> literal`.
    fn column_vs_literal(column: &'static str, op: BinOp, literal: Value) -> Expr {
        Expr::BinaryOp {
            left: Box::new(Expr::Column(column.into())),
            op,
            right: Box::new(Expr::Literal(literal)),
        }
    }

    /// Builds `literal <op> column`.
    fn literal_vs_column(literal: Value, op: BinOp, column: &'static str) -> Expr {
        Expr::BinaryOp {
            left: Box::new(Expr::Literal(literal)),
            op,
            right: Box::new(Expr::Column(column.into())),
        }
    }

    /// Joins two expressions with a binary operator such as `AND` / `OR`.
    fn combine(left: Expr, op: BinOp, right: Expr) -> Expr {
        Expr::BinaryOp {
            left: Box::new(left),
            op,
            right: Box::new(right),
        }
    }

    fn text(s: &'static str) -> Value {
        Value::Text(s.into())
    }

    /// `name = 'Alice' AND age <op> <age>`.
    fn alice_and_age(op: BinOp, age: Value) -> Expr {
        combine(
            column_vs_literal("name", BinOp::Eq, text("Alice")),
            BinOp::And,
            column_vs_literal("age", op, age),
        )
    }

    /// Plans a query with the given filter.
    fn plan_where(schema: &TableSchema, filter: Expr) -> ScanPlan {
        plan_select(schema, &Some(filter))
    }

    // ---- fixtures --------------------------------------------------------

    /// `users(id PK, name, age, email)` with indexes on `name`, `email`
    /// (unique) and `(name, age)`.
    fn test_schema() -> TableSchema {
        TableSchema {
            name: "users".into(),
            columns: vec![
                ColumnDef {
                    name: "id".into(),
                    data_type: DataType::Integer,
                    nullable: false,
                    position: 0,
                },
                ColumnDef {
                    name: "name".into(),
                    data_type: DataType::Text,
                    nullable: true,
                    position: 1,
                },
                ColumnDef {
                    name: "age".into(),
                    data_type: DataType::Integer,
                    nullable: true,
                    position: 2,
                },
                ColumnDef {
                    name: "email".into(),
                    data_type: DataType::Text,
                    nullable: true,
                    position: 3,
                },
            ],
            primary_key_columns: vec![0],
            indices: vec![
                IndexDef {
                    name: "idx_name".into(),
                    columns: vec![1],
                    unique: false,
                },
                IndexDef {
                    name: "idx_email".into(),
                    columns: vec![3],
                    unique: true,
                },
                IndexDef {
                    name: "idx_name_age".into(),
                    columns: vec![1, 2],
                    unique: false,
                },
            ],
        }
    }

    // ---- tests -----------------------------------------------------------

    #[test]
    fn no_where_is_seq_scan() {
        let schema = test_schema();
        assert!(matches!(plan_select(&schema, &None), ScanPlan::SeqScan));
    }

    #[test]
    fn pk_equality_is_pk_lookup() {
        let schema = test_schema();
        let filter = column_vs_literal("id", BinOp::Eq, Value::Integer(42));
        match plan_where(&schema, filter) {
            ScanPlan::PkLookup { pk_values } => {
                assert_eq!(pk_values, vec![Value::Integer(42)]);
            }
            other => panic!("expected PkLookup, got {other:?}"),
        }
    }

    #[test]
    fn unique_index_equality() {
        let schema = test_schema();
        let filter = column_vs_literal("email", BinOp::Eq, text("alice@test.com"));
        match plan_where(&schema, filter) {
            ScanPlan::IndexScan {
                index_name,
                is_unique,
                num_prefix_cols,
                ..
            } => {
                assert_eq!(index_name, "idx_email");
                assert!(is_unique);
                assert_eq!(num_prefix_cols, 1);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn non_unique_index_equality() {
        let schema = test_schema();
        let filter = column_vs_literal("name", BinOp::Eq, text("Alice"));
        match plan_where(&schema, filter) {
            ScanPlan::IndexScan {
                index_name,
                num_prefix_cols,
                ..
            } => {
                assert!(index_name == "idx_name" || index_name == "idx_name_age");
                assert_eq!(num_prefix_cols, 1);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn composite_index_full_prefix() {
        let schema = test_schema();
        let filter = alice_and_age(BinOp::Eq, Value::Integer(30));
        match plan_where(&schema, filter) {
            ScanPlan::IndexScan {
                index_name,
                num_prefix_cols,
                ..
            } => {
                assert_eq!(index_name, "idx_name_age");
                assert_eq!(num_prefix_cols, 2);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn range_scan_on_indexed_column() {
        let schema = test_schema();
        let filter = column_vs_literal("name", BinOp::Gt, text("M"));
        match plan_where(&schema, filter) {
            ScanPlan::IndexScan {
                range_conds,
                num_prefix_cols,
                ..
            } => {
                assert_eq!(num_prefix_cols, 0);
                assert_eq!(range_conds.len(), 1);
                assert_eq!(range_conds[0].0, BinOp::Gt);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn composite_equality_plus_range() {
        let schema = test_schema();
        let filter = alice_and_age(BinOp::Gt, Value::Integer(25));
        match plan_where(&schema, filter) {
            ScanPlan::IndexScan {
                index_name,
                num_prefix_cols,
                range_conds,
                ..
            } => {
                assert_eq!(index_name, "idx_name_age");
                assert_eq!(num_prefix_cols, 1);
                assert_eq!(range_conds.len(), 1);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn or_condition_falls_back_to_seq_scan() {
        let schema = test_schema();
        let filter = combine(
            column_vs_literal("name", BinOp::Eq, text("Alice")),
            BinOp::Or,
            column_vs_literal("name", BinOp::Eq, text("Bob")),
        );
        assert!(matches!(plan_where(&schema, filter), ScanPlan::SeqScan));
    }

    #[test]
    fn non_indexed_column_is_seq_scan() {
        let schema = test_schema();
        let filter = column_vs_literal("age", BinOp::Eq, Value::Integer(30));
        assert!(matches!(plan_where(&schema, filter), ScanPlan::SeqScan));
    }

    #[test]
    fn reversed_literal_column() {
        let schema = test_schema();
        let filter = literal_vs_column(Value::Integer(42), BinOp::Eq, "id");
        assert!(matches!(
            plan_where(&schema, filter),
            ScanPlan::PkLookup { .. }
        ));
    }

    #[test]
    fn reversed_comparison_flips_op() {
        let schema = test_schema();
        let filter = literal_vs_column(Value::Integer(5), BinOp::Lt, "name");
        match plan_where(&schema, filter) {
            ScanPlan::IndexScan { range_conds, .. } => {
                assert_eq!(range_conds[0].0, BinOp::Gt);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn prefers_unique_index() {
        let schema = TableSchema {
            name: "t".into(),
            columns: vec![
                ColumnDef {
                    name: "id".into(),
                    data_type: DataType::Integer,
                    nullable: false,
                    position: 0,
                },
                ColumnDef {
                    name: "code".into(),
                    data_type: DataType::Text,
                    nullable: false,
                    position: 1,
                },
            ],
            primary_key_columns: vec![0],
            indices: vec![
                IndexDef {
                    name: "idx_code".into(),
                    columns: vec![1],
                    unique: false,
                },
                IndexDef {
                    name: "idx_code_uniq".into(),
                    columns: vec![1],
                    unique: true,
                },
            ],
        };
        let filter = column_vs_literal("code", BinOp::Eq, text("X"));
        match plan_where(&schema, filter) {
            ScanPlan::IndexScan {
                index_name,
                is_unique,
                ..
            } => {
                assert_eq!(index_name, "idx_code_uniq");
                assert!(is_unique);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn prefers_more_equality_columns() {
        let schema = test_schema();
        let filter = alice_and_age(BinOp::Eq, Value::Integer(30));
        match plan_where(&schema, filter) {
            ScanPlan::IndexScan {
                index_name,
                num_prefix_cols,
                ..
            } => {
                assert_eq!(index_name, "idx_name_age");
                assert_eq!(num_prefix_cols, 2);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }
}