1use crate::encoding::encode_composite_key;
7use crate::parser::{BinOp, Expr};
8use crate::types::{IndexDef, TableSchema, Value};
9
10#[derive(Debug)]
13pub enum ScanPlan {
14 SeqScan,
15 PkLookup {
16 pk_values: Vec<Value>,
17 },
18 IndexScan {
19 index_name: String,
20 idx_table: Vec<u8>,
21 prefix: Vec<u8>,
22 num_prefix_cols: usize,
23 range_conds: Vec<(BinOp, Value)>,
24 is_unique: bool,
25 index_columns: Vec<u16>,
26 },
27}
28
29struct SimplePredicate {
32 col_idx: usize,
33 op: BinOp,
34 value: Value,
35}
36
37fn flatten_and(expr: &Expr) -> Vec<&Expr> {
38 match expr {
39 Expr::BinaryOp {
40 left,
41 op: BinOp::And,
42 right,
43 } => {
44 let mut v = flatten_and(left);
45 v.extend(flatten_and(right));
46 v
47 }
48 _ => vec![expr],
49 }
50}
51
52fn is_comparison(op: BinOp) -> bool {
53 matches!(
54 op,
55 BinOp::Eq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq
56 )
57}
58
59fn is_range_op(op: BinOp) -> bool {
60 matches!(op, BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
61}
62
63fn flip_op(op: BinOp) -> BinOp {
64 match op {
65 BinOp::Lt => BinOp::Gt,
66 BinOp::LtEq => BinOp::GtEq,
67 BinOp::Gt => BinOp::Lt,
68 BinOp::GtEq => BinOp::LtEq,
69 other => other,
70 }
71}
72
73fn resolve_column_name(expr: &Expr) -> Option<&str> {
74 match expr {
75 Expr::Column(name) => Some(name.as_str()),
76 Expr::QualifiedColumn { column, .. } => Some(column.as_str()),
77 _ => None,
78 }
79}
80
81fn extract_simple_predicate(expr: &Expr, schema: &TableSchema) -> Option<SimplePredicate> {
82 match expr {
83 Expr::BinaryOp { left, op, right } if is_comparison(*op) => {
84 if let (Some(name), Expr::Literal(val)) = (resolve_column_name(left), right.as_ref()) {
85 let col_idx = schema.column_index(name)?;
86 return Some(SimplePredicate {
87 col_idx,
88 op: *op,
89 value: val.clone(),
90 });
91 }
92 if let (Expr::Literal(val), Some(name)) = (left.as_ref(), resolve_column_name(right)) {
93 let col_idx = schema.column_index(name)?;
94 return Some(SimplePredicate {
95 col_idx,
96 op: flip_op(*op),
97 value: val.clone(),
98 });
99 }
100 None
101 }
102 _ => None,
103 }
104}
105
106pub fn plan_select(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
109 let where_expr = match where_clause {
110 Some(e) => e,
111 None => return ScanPlan::SeqScan,
112 };
113
114 let predicates = flatten_and(where_expr);
115 let simple: Vec<Option<SimplePredicate>> = predicates
116 .iter()
117 .map(|p| extract_simple_predicate(p, schema))
118 .collect();
119
120 if let Some(plan) = try_pk_lookup(schema, &simple) {
121 return plan;
122 }
123
124 if let Some(plan) = try_best_index(schema, &simple) {
125 return plan;
126 }
127
128 ScanPlan::SeqScan
129}
130
131fn try_pk_lookup(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
132 let pk_cols = &schema.primary_key_columns;
133 let mut pk_values: Vec<Option<Value>> = vec![None; pk_cols.len()];
134
135 for pred in predicates.iter().flatten() {
136 if pred.op == BinOp::Eq {
137 if let Some(pk_pos) = pk_cols.iter().position(|&c| c == pred.col_idx as u16) {
138 pk_values[pk_pos] = Some(pred.value.clone());
139 }
140 }
141 }
142
143 if pk_values.iter().all(|v| v.is_some()) {
144 let values: Vec<Value> = pk_values.into_iter().map(|v| v.unwrap()).collect();
145 Some(ScanPlan::PkLookup { pk_values: values })
146 } else {
147 None
148 }
149}
150
151#[derive(PartialEq, Eq, PartialOrd, Ord)]
154struct IndexScore {
155 num_equality: usize,
156 has_range: bool,
157 is_unique: bool,
158}
159
160fn try_best_index(
161 schema: &TableSchema,
162 predicates: &[Option<SimplePredicate>],
163) -> Option<ScanPlan> {
164 let mut best_score: Option<IndexScore> = None;
165 let mut best_plan: Option<ScanPlan> = None;
166
167 for idx in &schema.indices {
168 if let Some((score, plan)) = try_index_scan(schema, idx, predicates) {
169 if best_score.is_none() || score > *best_score.as_ref().unwrap() {
170 best_score = Some(score);
171 best_plan = Some(plan);
172 }
173 }
174 }
175
176 best_plan
177}
178
179fn try_index_scan(
180 schema: &TableSchema,
181 idx: &IndexDef,
182 predicates: &[Option<SimplePredicate>],
183) -> Option<(IndexScore, ScanPlan)> {
184 let mut used = Vec::new();
185 let mut equality_values: Vec<Value> = Vec::new();
186 let mut range_conds: Vec<(BinOp, Value)> = Vec::new();
187
188 for &col_idx in &idx.columns {
189 let mut found_eq = false;
190 for (i, pred) in predicates.iter().enumerate() {
191 if used.contains(&i) {
192 continue;
193 }
194 if let Some(sp) = pred {
195 if sp.col_idx == col_idx as usize && sp.op == BinOp::Eq {
196 equality_values.push(sp.value.clone());
197 used.push(i);
198 found_eq = true;
199 break;
200 }
201 }
202 }
203 if !found_eq {
204 for (i, pred) in predicates.iter().enumerate() {
205 if used.contains(&i) {
206 continue;
207 }
208 if let Some(sp) = pred {
209 if sp.col_idx == col_idx as usize && is_range_op(sp.op) {
210 range_conds.push((sp.op, sp.value.clone()));
211 used.push(i);
212 }
213 }
214 }
215 break;
216 }
217 }
218
219 if equality_values.is_empty() && range_conds.is_empty() {
220 return None;
221 }
222
223 let score = IndexScore {
224 num_equality: equality_values.len(),
225 has_range: !range_conds.is_empty(),
226 is_unique: idx.unique,
227 };
228
229 let prefix = encode_composite_key(&equality_values);
230 let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
231
232 Some((
233 score,
234 ScanPlan::IndexScan {
235 index_name: idx.name.clone(),
236 idx_table,
237 prefix,
238 num_prefix_cols: equality_values.len(),
239 range_conds,
240 is_unique: idx.unique,
241 index_columns: idx.columns.clone(),
242 },
243 ))
244}
245
246pub fn describe_plan(plan: &ScanPlan, table_schema: &TableSchema) -> String {
249 match plan {
250 ScanPlan::SeqScan => String::new(),
251
252 ScanPlan::PkLookup { pk_values } => {
253 let pk_cols: Vec<&str> = table_schema
254 .primary_key_columns
255 .iter()
256 .map(|&idx| table_schema.columns[idx as usize].name.as_str())
257 .collect();
258 let conditions: Vec<String> = pk_cols
259 .iter()
260 .zip(pk_values.iter())
261 .map(|(col, val)| format!("{col} = {}", format_value(val)))
262 .collect();
263 format!("USING PRIMARY KEY ({})", conditions.join(", "))
264 }
265
266 ScanPlan::IndexScan {
267 index_name,
268 num_prefix_cols,
269 range_conds,
270 index_columns,
271 ..
272 } => {
273 let mut conditions = Vec::new();
274 for &col in index_columns.iter().take(*num_prefix_cols) {
275 let col_idx = col as usize;
276 let col_name = &table_schema.columns[col_idx].name;
277 conditions.push(format!("{col_name} = ?"));
278 }
279 if !range_conds.is_empty() && *num_prefix_cols < index_columns.len() {
280 let col_idx = index_columns[*num_prefix_cols] as usize;
281 let col_name = &table_schema.columns[col_idx].name;
282 for (op, _) in range_conds {
283 conditions.push(format!("{col_name} {} ?", op_symbol(*op)));
284 }
285 }
286 if conditions.is_empty() {
287 format!("USING INDEX {index_name}")
288 } else {
289 format!("USING INDEX {index_name} ({})", conditions.join(", "))
290 }
291 }
292 }
293}
294
295fn format_value(val: &Value) -> String {
296 match val {
297 Value::Null => "NULL".into(),
298 Value::Integer(i) => i.to_string(),
299 Value::Real(f) => format!("{f}"),
300 Value::Text(s) => format!("'{s}'"),
301 Value::Blob(_) => "BLOB".into(),
302 Value::Boolean(b) => b.to_string(),
303 }
304}
305
306fn op_symbol(op: BinOp) -> &'static str {
307 match op {
308 BinOp::Lt => "<",
309 BinOp::LtEq => "<=",
310 BinOp::Gt => ">",
311 BinOp::GtEq => ">=",
312 BinOp::Eq => "=",
313 BinOp::NotEq => "!=",
314 _ => "?",
315 }
316}
317
318#[cfg(test)]
319mod tests {
320 use super::*;
321 use crate::types::{ColumnDef, DataType};
322
323 fn col(name: &str, dt: DataType, nullable: bool, pos: u16) -> ColumnDef {
324 ColumnDef {
325 name: name.into(),
326 data_type: dt,
327 nullable,
328 position: pos,
329 default_expr: None,
330 default_sql: None,
331 check_expr: None,
332 check_sql: None,
333 check_name: None,
334 }
335 }
336
337 fn test_schema() -> TableSchema {
338 TableSchema::new(
339 "users".into(),
340 vec![
341 col("id", DataType::Integer, false, 0),
342 col("name", DataType::Text, true, 1),
343 col("age", DataType::Integer, true, 2),
344 col("email", DataType::Text, true, 3),
345 ],
346 vec![0],
347 vec![
348 IndexDef {
349 name: "idx_name".into(),
350 columns: vec![1],
351 unique: false,
352 },
353 IndexDef {
354 name: "idx_email".into(),
355 columns: vec![3],
356 unique: true,
357 },
358 IndexDef {
359 name: "idx_name_age".into(),
360 columns: vec![1, 2],
361 unique: false,
362 },
363 ],
364 vec![],
365 vec![],
366 )
367 }
368
369 #[test]
370 fn no_where_is_seq_scan() {
371 let schema = test_schema();
372 let plan = plan_select(&schema, &None);
373 assert!(matches!(plan, ScanPlan::SeqScan));
374 }
375
376 #[test]
377 fn pk_equality_is_pk_lookup() {
378 let schema = test_schema();
379 let where_clause = Some(Expr::BinaryOp {
380 left: Box::new(Expr::Column("id".into())),
381 op: BinOp::Eq,
382 right: Box::new(Expr::Literal(Value::Integer(42))),
383 });
384 let plan = plan_select(&schema, &where_clause);
385 match plan {
386 ScanPlan::PkLookup { pk_values } => {
387 assert_eq!(pk_values, vec![Value::Integer(42)]);
388 }
389 other => panic!("expected PkLookup, got {other:?}"),
390 }
391 }
392
393 #[test]
394 fn unique_index_equality() {
395 let schema = test_schema();
396 let where_clause = Some(Expr::BinaryOp {
397 left: Box::new(Expr::Column("email".into())),
398 op: BinOp::Eq,
399 right: Box::new(Expr::Literal(Value::Text("alice@test.com".into()))),
400 });
401 let plan = plan_select(&schema, &where_clause);
402 match plan {
403 ScanPlan::IndexScan {
404 index_name,
405 is_unique,
406 num_prefix_cols,
407 ..
408 } => {
409 assert_eq!(index_name, "idx_email");
410 assert!(is_unique);
411 assert_eq!(num_prefix_cols, 1);
412 }
413 other => panic!("expected IndexScan, got {other:?}"),
414 }
415 }
416
417 #[test]
418 fn non_unique_index_equality() {
419 let schema = test_schema();
420 let where_clause = Some(Expr::BinaryOp {
421 left: Box::new(Expr::Column("name".into())),
422 op: BinOp::Eq,
423 right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
424 });
425 let plan = plan_select(&schema, &where_clause);
426 match plan {
427 ScanPlan::IndexScan {
428 index_name,
429 num_prefix_cols,
430 ..
431 } => {
432 assert!(index_name == "idx_name" || index_name == "idx_name_age");
433 assert_eq!(num_prefix_cols, 1);
434 }
435 other => panic!("expected IndexScan, got {other:?}"),
436 }
437 }
438
439 #[test]
440 fn composite_index_full_prefix() {
441 let schema = test_schema();
442 let where_clause = Some(Expr::BinaryOp {
443 left: Box::new(Expr::BinaryOp {
444 left: Box::new(Expr::Column("name".into())),
445 op: BinOp::Eq,
446 right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
447 }),
448 op: BinOp::And,
449 right: Box::new(Expr::BinaryOp {
450 left: Box::new(Expr::Column("age".into())),
451 op: BinOp::Eq,
452 right: Box::new(Expr::Literal(Value::Integer(30))),
453 }),
454 });
455 let plan = plan_select(&schema, &where_clause);
456 match plan {
457 ScanPlan::IndexScan {
458 index_name,
459 num_prefix_cols,
460 ..
461 } => {
462 assert_eq!(index_name, "idx_name_age");
463 assert_eq!(num_prefix_cols, 2);
464 }
465 other => panic!("expected IndexScan, got {other:?}"),
466 }
467 }
468
469 #[test]
470 fn range_scan_on_indexed_column() {
471 let schema = test_schema();
472 let where_clause = Some(Expr::BinaryOp {
473 left: Box::new(Expr::Column("name".into())),
474 op: BinOp::Gt,
475 right: Box::new(Expr::Literal(Value::Text("M".into()))),
476 });
477 let plan = plan_select(&schema, &where_clause);
478 match plan {
479 ScanPlan::IndexScan {
480 range_conds,
481 num_prefix_cols,
482 ..
483 } => {
484 assert_eq!(num_prefix_cols, 0);
485 assert_eq!(range_conds.len(), 1);
486 assert_eq!(range_conds[0].0, BinOp::Gt);
487 }
488 other => panic!("expected IndexScan, got {other:?}"),
489 }
490 }
491
492 #[test]
493 fn composite_equality_plus_range() {
494 let schema = test_schema();
495 let where_clause = Some(Expr::BinaryOp {
496 left: Box::new(Expr::BinaryOp {
497 left: Box::new(Expr::Column("name".into())),
498 op: BinOp::Eq,
499 right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
500 }),
501 op: BinOp::And,
502 right: Box::new(Expr::BinaryOp {
503 left: Box::new(Expr::Column("age".into())),
504 op: BinOp::Gt,
505 right: Box::new(Expr::Literal(Value::Integer(25))),
506 }),
507 });
508 let plan = plan_select(&schema, &where_clause);
509 match plan {
510 ScanPlan::IndexScan {
511 index_name,
512 num_prefix_cols,
513 range_conds,
514 ..
515 } => {
516 assert_eq!(index_name, "idx_name_age");
517 assert_eq!(num_prefix_cols, 1);
518 assert_eq!(range_conds.len(), 1);
519 }
520 other => panic!("expected IndexScan, got {other:?}"),
521 }
522 }
523
524 #[test]
525 fn or_condition_falls_back_to_seq_scan() {
526 let schema = test_schema();
527 let where_clause = Some(Expr::BinaryOp {
528 left: Box::new(Expr::BinaryOp {
529 left: Box::new(Expr::Column("name".into())),
530 op: BinOp::Eq,
531 right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
532 }),
533 op: BinOp::Or,
534 right: Box::new(Expr::BinaryOp {
535 left: Box::new(Expr::Column("name".into())),
536 op: BinOp::Eq,
537 right: Box::new(Expr::Literal(Value::Text("Bob".into()))),
538 }),
539 });
540 let plan = plan_select(&schema, &where_clause);
541 assert!(matches!(plan, ScanPlan::SeqScan));
542 }
543
544 #[test]
545 fn non_indexed_column_is_seq_scan() {
546 let schema = test_schema();
547 let where_clause = Some(Expr::BinaryOp {
548 left: Box::new(Expr::Column("age".into())),
549 op: BinOp::Eq,
550 right: Box::new(Expr::Literal(Value::Integer(30))),
551 });
552 let plan = plan_select(&schema, &where_clause);
553 assert!(matches!(plan, ScanPlan::SeqScan));
554 }
555
556 #[test]
557 fn reversed_literal_column() {
558 let schema = test_schema();
559 let where_clause = Some(Expr::BinaryOp {
560 left: Box::new(Expr::Literal(Value::Integer(42))),
561 op: BinOp::Eq,
562 right: Box::new(Expr::Column("id".into())),
563 });
564 let plan = plan_select(&schema, &where_clause);
565 assert!(matches!(plan, ScanPlan::PkLookup { .. }));
566 }
567
568 #[test]
569 fn reversed_comparison_flips_op() {
570 let schema = test_schema();
571 let where_clause = Some(Expr::BinaryOp {
572 left: Box::new(Expr::Literal(Value::Integer(5))),
573 op: BinOp::Lt,
574 right: Box::new(Expr::Column("name".into())),
575 });
576 let plan = plan_select(&schema, &where_clause);
577 match plan {
578 ScanPlan::IndexScan { range_conds, .. } => {
579 assert_eq!(range_conds[0].0, BinOp::Gt);
580 }
581 other => panic!("expected IndexScan, got {other:?}"),
582 }
583 }
584
585 #[test]
586 fn prefers_unique_index() {
587 let schema = TableSchema::new(
588 "t".into(),
589 vec![
590 col("id", DataType::Integer, false, 0),
591 col("code", DataType::Text, false, 1),
592 ],
593 vec![0],
594 vec![
595 IndexDef {
596 name: "idx_code".into(),
597 columns: vec![1],
598 unique: false,
599 },
600 IndexDef {
601 name: "idx_code_uniq".into(),
602 columns: vec![1],
603 unique: true,
604 },
605 ],
606 vec![],
607 vec![],
608 );
609 let where_clause = Some(Expr::BinaryOp {
610 left: Box::new(Expr::Column("code".into())),
611 op: BinOp::Eq,
612 right: Box::new(Expr::Literal(Value::Text("X".into()))),
613 });
614 let plan = plan_select(&schema, &where_clause);
615 match plan {
616 ScanPlan::IndexScan {
617 index_name,
618 is_unique,
619 ..
620 } => {
621 assert_eq!(index_name, "idx_code_uniq");
622 assert!(is_unique);
623 }
624 other => panic!("expected IndexScan, got {other:?}"),
625 }
626 }
627
628 #[test]
629 fn prefers_more_equality_columns() {
630 let schema = test_schema();
631 let where_clause = Some(Expr::BinaryOp {
632 left: Box::new(Expr::BinaryOp {
633 left: Box::new(Expr::Column("name".into())),
634 op: BinOp::Eq,
635 right: Box::new(Expr::Literal(Value::Text("Alice".into()))),
636 }),
637 op: BinOp::And,
638 right: Box::new(Expr::BinaryOp {
639 left: Box::new(Expr::Column("age".into())),
640 op: BinOp::Eq,
641 right: Box::new(Expr::Literal(Value::Integer(30))),
642 }),
643 });
644 let plan = plan_select(&schema, &where_clause);
645 match plan {
646 ScanPlan::IndexScan {
647 index_name,
648 num_prefix_cols,
649 ..
650 } => {
651 assert_eq!(index_name, "idx_name_age");
652 assert_eq!(num_prefix_cols, 2);
653 }
654 other => panic!("expected IndexScan, got {other:?}"),
655 }
656 }
657}