1use crate::encoding::encode_composite_key;
7use crate::parser::{BinOp, Expr};
8use crate::types::{IndexDef, TableSchema, Value};
9
10#[derive(Debug)]
13pub enum ScanPlan {
14 SeqScan,
15 PkLookup {
16 pk_values: Vec<Value>,
17 },
18 IndexScan {
19 index_name: String,
20 idx_table: Vec<u8>,
21 prefix: Vec<u8>,
22 num_prefix_cols: usize,
23 range_conds: Vec<(BinOp, Value)>,
24 is_unique: bool,
25 index_columns: Vec<u16>,
26 },
27}
28
29struct SimplePredicate {
32 col_idx: usize,
33 op: BinOp,
34 value: Value,
35}
36
37fn flatten_and<'a>(expr: &'a Expr) -> Vec<&'a Expr> {
38 match expr {
39 Expr::BinaryOp { left, op: BinOp::And, right } => {
40 let mut v = flatten_and(left);
41 v.extend(flatten_and(right));
42 v
43 }
44 _ => vec![expr],
45 }
46}
47
48fn is_comparison(op: BinOp) -> bool {
49 matches!(op, BinOp::Eq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
50}
51
52fn is_range_op(op: BinOp) -> bool {
53 matches!(op, BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
54}
55
56fn flip_op(op: BinOp) -> BinOp {
57 match op {
58 BinOp::Lt => BinOp::Gt,
59 BinOp::LtEq => BinOp::GtEq,
60 BinOp::Gt => BinOp::Lt,
61 BinOp::GtEq => BinOp::LtEq,
62 other => other,
63 }
64}
65
66fn resolve_column_name<'a>(expr: &'a Expr) -> Option<&'a str> {
67 match expr {
68 Expr::Column(name) => Some(name.as_str()),
69 Expr::QualifiedColumn { column, .. } => Some(column.as_str()),
70 _ => None,
71 }
72}
73
74fn extract_simple_predicate(expr: &Expr, schema: &TableSchema) -> Option<SimplePredicate> {
75 match expr {
76 Expr::BinaryOp { left, op, right } if is_comparison(*op) => {
77 if let (Some(name), Expr::Literal(val)) = (resolve_column_name(left), right.as_ref()) {
78 let col_idx = schema.column_index(name)?;
79 return Some(SimplePredicate { col_idx, op: *op, value: val.clone() });
80 }
81 if let (Expr::Literal(val), Some(name)) = (left.as_ref(), resolve_column_name(right)) {
82 let col_idx = schema.column_index(name)?;
83 return Some(SimplePredicate { col_idx, op: flip_op(*op), value: val.clone() });
84 }
85 None
86 }
87 _ => None,
88 }
89}
90
91pub fn plan_select(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
94 let where_expr = match where_clause {
95 Some(e) => e,
96 None => return ScanPlan::SeqScan,
97 };
98
99 let predicates = flatten_and(where_expr);
100 let simple: Vec<Option<SimplePredicate>> = predicates.iter()
101 .map(|p| extract_simple_predicate(p, schema))
102 .collect();
103
104 if let Some(plan) = try_pk_lookup(schema, &simple) {
105 return plan;
106 }
107
108 if let Some(plan) = try_best_index(schema, &simple) {
109 return plan;
110 }
111
112 ScanPlan::SeqScan
113}
114
115fn try_pk_lookup(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
116 let pk_cols = &schema.primary_key_columns;
117 let mut pk_values: Vec<Option<Value>> = vec![None; pk_cols.len()];
118
119 for pred in predicates.iter().flatten() {
120 if pred.op == BinOp::Eq {
121 if let Some(pk_pos) = pk_cols.iter().position(|&c| c == pred.col_idx as u16) {
122 pk_values[pk_pos] = Some(pred.value.clone());
123 }
124 }
125 }
126
127 if pk_values.iter().all(|v| v.is_some()) {
128 let values: Vec<Value> = pk_values.into_iter().map(|v| v.unwrap()).collect();
129 Some(ScanPlan::PkLookup { pk_values: values })
130 } else {
131 None
132 }
133}
134
/// Ranking key used to pick between candidate index scans.
///
/// The derived `Ord` compares fields in declaration order, so the field order
/// below *is* the priority order. Do not reorder the fields.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct IndexScore {
    /// Number of leading index columns bound by equality (most significant).
    num_equality: usize,
    /// Whether a range condition follows the equality prefix.
    has_range: bool,
    /// Whether the index is unique (least significant tie-breaker).
    is_unique: bool,
}
143
144fn try_best_index(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
145 let mut best_score: Option<IndexScore> = None;
146 let mut best_plan: Option<ScanPlan> = None;
147
148 for idx in &schema.indices {
149 if let Some((score, plan)) = try_index_scan(schema, idx, predicates) {
150 if best_score.is_none() || score > *best_score.as_ref().unwrap() {
151 best_score = Some(score);
152 best_plan = Some(plan);
153 }
154 }
155 }
156
157 best_plan
158}
159
160fn try_index_scan(
161 schema: &TableSchema,
162 idx: &IndexDef,
163 predicates: &[Option<SimplePredicate>],
164) -> Option<(IndexScore, ScanPlan)> {
165 let mut used = Vec::new();
166 let mut equality_values: Vec<Value> = Vec::new();
167 let mut range_conds: Vec<(BinOp, Value)> = Vec::new();
168
169 for &col_idx in &idx.columns {
170 let mut found_eq = false;
171 for (i, pred) in predicates.iter().enumerate() {
172 if used.contains(&i) { continue; }
173 if let Some(sp) = pred {
174 if sp.col_idx == col_idx as usize && sp.op == BinOp::Eq {
175 equality_values.push(sp.value.clone());
176 used.push(i);
177 found_eq = true;
178 break;
179 }
180 }
181 }
182 if !found_eq {
183 for (i, pred) in predicates.iter().enumerate() {
184 if used.contains(&i) { continue; }
185 if let Some(sp) = pred {
186 if sp.col_idx == col_idx as usize && is_range_op(sp.op) {
187 range_conds.push((sp.op, sp.value.clone()));
188 used.push(i);
189 }
190 }
191 }
192 break;
193 }
194 }
195
196 if equality_values.is_empty() && range_conds.is_empty() {
197 return None;
198 }
199
200 let score = IndexScore {
201 num_equality: equality_values.len(),
202 has_range: !range_conds.is_empty(),
203 is_unique: idx.unique,
204 };
205
206 let prefix = encode_composite_key(&equality_values);
207 let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
208
209 Some((score, ScanPlan::IndexScan {
210 index_name: idx.name.clone(),
211 idx_table,
212 prefix,
213 num_prefix_cols: equality_values.len(),
214 range_conds,
215 is_unique: idx.unique,
216 index_columns: idx.columns.clone(),
217 }))
218}
219
220pub fn describe_plan(plan: &ScanPlan, table_schema: &TableSchema) -> String {
223 match plan {
224 ScanPlan::SeqScan => String::new(),
225
226 ScanPlan::PkLookup { pk_values } => {
227 let pk_cols: Vec<&str> = table_schema.primary_key_columns.iter()
228 .map(|&idx| table_schema.columns[idx as usize].name.as_str())
229 .collect();
230 let conditions: Vec<String> = pk_cols.iter().zip(pk_values.iter())
231 .map(|(col, val)| format!("{col} = {}", format_value(val)))
232 .collect();
233 format!("USING PRIMARY KEY ({})", conditions.join(", "))
234 }
235
236 ScanPlan::IndexScan { index_name, num_prefix_cols, range_conds, index_columns, .. } => {
237 let mut conditions = Vec::new();
238 for i in 0..*num_prefix_cols {
239 let col_idx = index_columns[i] as usize;
240 let col_name = &table_schema.columns[col_idx].name;
241 conditions.push(format!("{col_name} = ?"));
242 }
243 if !range_conds.is_empty() && *num_prefix_cols < index_columns.len() {
244 let col_idx = index_columns[*num_prefix_cols] as usize;
245 let col_name = &table_schema.columns[col_idx].name;
246 for (op, _) in range_conds {
247 conditions.push(format!("{col_name} {} ?", op_symbol(*op)));
248 }
249 }
250 if conditions.is_empty() {
251 format!("USING INDEX {index_name}")
252 } else {
253 format!("USING INDEX {index_name} ({})", conditions.join(", "))
254 }
255 }
256 }
257}
258
259fn format_value(val: &Value) -> String {
260 match val {
261 Value::Null => "NULL".into(),
262 Value::Integer(i) => i.to_string(),
263 Value::Real(f) => format!("{f}"),
264 Value::Text(s) => format!("'{s}'"),
265 Value::Blob(_) => "BLOB".into(),
266 Value::Boolean(b) => b.to_string(),
267 }
268}
269
270fn op_symbol(op: BinOp) -> &'static str {
271 match op {
272 BinOp::Lt => "<",
273 BinOp::LtEq => "<=",
274 BinOp::Gt => ">",
275 BinOp::GtEq => ">=",
276 BinOp::Eq => "=",
277 BinOp::NotEq => "!=",
278 _ => "?",
279 }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ColumnDef, DataType};

    // ---- fixtures and expression builders -------------------------------

    /// Index definition over the given table column numbers.
    fn index(name: &str, columns: &[u16], unique: bool) -> IndexDef {
        IndexDef { name: name.into(), columns: columns.to_vec(), unique }
    }

    /// `users(id PK, name, age, email)` with indexes on `name`, unique `email`
    /// and the composite `(name, age)`.
    fn test_schema() -> TableSchema {
        TableSchema {
            name: "users".into(),
            columns: vec![
                ColumnDef { name: "id".into(), data_type: DataType::Integer, nullable: false, position: 0 },
                ColumnDef { name: "name".into(), data_type: DataType::Text, nullable: true, position: 1 },
                ColumnDef { name: "age".into(), data_type: DataType::Integer, nullable: true, position: 2 },
                ColumnDef { name: "email".into(), data_type: DataType::Text, nullable: true, position: 3 },
            ],
            primary_key_columns: vec![0],
            indices: vec![
                index("idx_name", &[1], false),
                index("idx_email", &[3], true),
                index("idx_name_age", &[1, 2], false),
            ],
        }
    }

    fn text(s: &str) -> Value {
        Value::Text(s.into())
    }

    /// `column <op> literal`
    fn cmp(column: &str, op: BinOp, literal: Value) -> Expr {
        Expr::BinaryOp {
            left: Box::new(Expr::Column(column.into())),
            op,
            right: Box::new(Expr::Literal(literal)),
        }
    }

    /// `literal <op> column`
    fn literal_first(literal: Value, op: BinOp, column: &str) -> Expr {
        Expr::BinaryOp {
            left: Box::new(Expr::Literal(literal)),
            op,
            right: Box::new(Expr::Column(column.into())),
        }
    }

    /// `lhs <op> rhs` for the logical connectives.
    fn combine(lhs: Expr, op: BinOp, rhs: Expr) -> Expr {
        Expr::BinaryOp { left: Box::new(lhs), op, right: Box::new(rhs) }
    }

    /// Plans a SELECT over `schema` with `filter` as its WHERE clause.
    fn plan_for(schema: &TableSchema, filter: Expr) -> ScanPlan {
        plan_select(schema, &Some(filter))
    }

    // ---- tests ------------------------------------------------------------

    #[test]
    fn no_where_is_seq_scan() {
        let plan = plan_select(&test_schema(), &None);
        assert!(matches!(plan, ScanPlan::SeqScan));
    }

    #[test]
    fn pk_equality_is_pk_lookup() {
        let plan = plan_for(&test_schema(), cmp("id", BinOp::Eq, Value::Integer(42)));
        match plan {
            ScanPlan::PkLookup { pk_values } => {
                assert_eq!(pk_values, vec![Value::Integer(42)]);
            }
            other => panic!("expected PkLookup, got {other:?}"),
        }
    }

    #[test]
    fn unique_index_equality() {
        let plan = plan_for(&test_schema(), cmp("email", BinOp::Eq, text("alice@test.com")));
        match plan {
            ScanPlan::IndexScan { index_name, is_unique, num_prefix_cols, .. } => {
                assert_eq!(index_name, "idx_email");
                assert!(is_unique);
                assert_eq!(num_prefix_cols, 1);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn non_unique_index_equality() {
        let plan = plan_for(&test_schema(), cmp("name", BinOp::Eq, text("Alice")));
        match plan {
            ScanPlan::IndexScan { index_name, num_prefix_cols, .. } => {
                assert!(index_name == "idx_name" || index_name == "idx_name_age");
                assert_eq!(num_prefix_cols, 1);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn composite_index_full_prefix() {
        let filter = combine(
            cmp("name", BinOp::Eq, text("Alice")),
            BinOp::And,
            cmp("age", BinOp::Eq, Value::Integer(30)),
        );
        match plan_for(&test_schema(), filter) {
            ScanPlan::IndexScan { index_name, num_prefix_cols, .. } => {
                assert_eq!(index_name, "idx_name_age");
                assert_eq!(num_prefix_cols, 2);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn range_scan_on_indexed_column() {
        let plan = plan_for(&test_schema(), cmp("name", BinOp::Gt, text("M")));
        match plan {
            ScanPlan::IndexScan { range_conds, num_prefix_cols, .. } => {
                assert_eq!(num_prefix_cols, 0);
                assert_eq!(range_conds.len(), 1);
                assert_eq!(range_conds[0].0, BinOp::Gt);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn composite_equality_plus_range() {
        let filter = combine(
            cmp("name", BinOp::Eq, text("Alice")),
            BinOp::And,
            cmp("age", BinOp::Gt, Value::Integer(25)),
        );
        match plan_for(&test_schema(), filter) {
            ScanPlan::IndexScan { index_name, num_prefix_cols, range_conds, .. } => {
                assert_eq!(index_name, "idx_name_age");
                assert_eq!(num_prefix_cols, 1);
                assert_eq!(range_conds.len(), 1);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn or_condition_falls_back_to_seq_scan() {
        let filter = combine(
            cmp("name", BinOp::Eq, text("Alice")),
            BinOp::Or,
            cmp("name", BinOp::Eq, text("Bob")),
        );
        let plan = plan_for(&test_schema(), filter);
        assert!(matches!(plan, ScanPlan::SeqScan));
    }

    #[test]
    fn non_indexed_column_is_seq_scan() {
        let plan = plan_for(&test_schema(), cmp("age", BinOp::Eq, Value::Integer(30)));
        assert!(matches!(plan, ScanPlan::SeqScan));
    }

    #[test]
    fn reversed_literal_column() {
        let plan = plan_for(&test_schema(), literal_first(Value::Integer(42), BinOp::Eq, "id"));
        assert!(matches!(plan, ScanPlan::PkLookup { .. }));
    }

    #[test]
    fn reversed_comparison_flips_op() {
        let plan = plan_for(&test_schema(), literal_first(Value::Integer(5), BinOp::Lt, "name"));
        match plan {
            ScanPlan::IndexScan { range_conds, .. } => {
                assert_eq!(range_conds[0].0, BinOp::Gt);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn prefers_unique_index() {
        let schema = TableSchema {
            name: "t".into(),
            columns: vec![
                ColumnDef { name: "id".into(), data_type: DataType::Integer, nullable: false, position: 0 },
                ColumnDef { name: "code".into(), data_type: DataType::Text, nullable: false, position: 1 },
            ],
            primary_key_columns: vec![0],
            indices: vec![
                index("idx_code", &[1], false),
                index("idx_code_uniq", &[1], true),
            ],
        };
        match plan_for(&schema, cmp("code", BinOp::Eq, text("X"))) {
            ScanPlan::IndexScan { index_name, is_unique, .. } => {
                assert_eq!(index_name, "idx_code_uniq");
                assert!(is_unique);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }

    #[test]
    fn prefers_more_equality_columns() {
        let filter = combine(
            cmp("name", BinOp::Eq, text("Alice")),
            BinOp::And,
            cmp("age", BinOp::Eq, Value::Integer(30)),
        );
        match plan_for(&test_schema(), filter) {
            ScanPlan::IndexScan { index_name, num_prefix_cols, .. } => {
                assert_eq!(index_name, "idx_name_age");
                assert_eq!(num_prefix_cols, 2);
            }
            other => panic!("expected IndexScan, got {other:?}"),
        }
    }
}