1use crate::encoding::encode_composite_key;
4use crate::parser::{BinOp, Expr};
5use crate::types::{IndexDef, TableSchema, Value};
6
7#[derive(Debug, Clone)]
8pub enum ScanPlan {
9 SeqScan,
10 PkLookup {
11 pk_values: Vec<Value>,
12 },
13 PkRangeScan {
14 start_key: Vec<u8>,
15 range_conds: Vec<(BinOp, Value)>,
16 num_pk_cols: usize,
17 },
18 IndexScan {
19 index_name: String,
20 idx_table: Vec<u8>,
21 prefix: Vec<u8>,
22 num_prefix_cols: usize,
23 range_conds: Vec<(BinOp, Value)>,
24 is_unique: bool,
25 index_columns: Vec<u16>,
26 },
27}
28
29struct SimplePredicate {
30 col_idx: usize,
31 op: BinOp,
32 value: Value,
33}
34
35fn flatten_and(expr: &Expr) -> Vec<&Expr> {
36 match expr {
37 Expr::BinaryOp {
38 left,
39 op: BinOp::And,
40 right,
41 } => {
42 let mut v = flatten_and(left);
43 v.extend(flatten_and(right));
44 v
45 }
46 _ => vec![expr],
47 }
48}
49
50fn is_comparison(op: BinOp) -> bool {
51 matches!(
52 op,
53 BinOp::Eq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq
54 )
55}
56
57fn is_range_op(op: BinOp) -> bool {
58 matches!(op, BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq)
59}
60
61fn flip_op(op: BinOp) -> BinOp {
62 match op {
63 BinOp::Lt => BinOp::Gt,
64 BinOp::LtEq => BinOp::GtEq,
65 BinOp::Gt => BinOp::Lt,
66 BinOp::GtEq => BinOp::LtEq,
67 other => other,
68 }
69}
70
71fn resolve_column_name(expr: &Expr) -> Option<&str> {
72 match expr {
73 Expr::Column(name) => Some(name.as_str()),
74 Expr::QualifiedColumn { column, .. } => Some(column.as_str()),
75 _ => None,
76 }
77}
78
79fn resolve_literal(expr: &Expr) -> Option<Value> {
80 match expr {
81 Expr::Literal(v) => Some(v.clone()),
82 Expr::Parameter(n) => crate::eval::resolve_scoped_param(*n).ok(),
83 _ => None,
84 }
85}
86
87fn extract_simple_predicate(expr: &Expr, schema: &TableSchema) -> Option<SimplePredicate> {
88 match expr {
89 Expr::BinaryOp { left, op, right } if is_comparison(*op) => {
90 if let (Some(name), Some(val)) = (resolve_column_name(left), resolve_literal(right)) {
91 let col_idx = schema.column_index(name)?;
92 return Some(SimplePredicate {
93 col_idx,
94 op: *op,
95 value: val,
96 });
97 }
98 if let (Some(val), Some(name)) = (resolve_literal(left), resolve_column_name(right)) {
99 let col_idx = schema.column_index(name)?;
100 return Some(SimplePredicate {
101 col_idx,
102 op: flip_op(*op),
103 value: val,
104 });
105 }
106 None
107 }
108 _ => None,
109 }
110}
111
112fn flatten_between(expr: &Expr, schema: &TableSchema, out: &mut Vec<SimplePredicate>) {
114 match expr {
115 Expr::Between {
116 expr: col_expr,
117 low,
118 high,
119 negated: false,
120 } => {
121 if let (Some(name), Some(lo), Some(hi)) = (
122 resolve_column_name(col_expr),
123 resolve_literal(low),
124 resolve_literal(high),
125 ) {
126 if let Some(col_idx) = schema.column_index(name) {
127 out.push(SimplePredicate {
128 col_idx,
129 op: BinOp::GtEq,
130 value: lo,
131 });
132 out.push(SimplePredicate {
133 col_idx,
134 op: BinOp::LtEq,
135 value: hi,
136 });
137 }
138 }
139 }
140 Expr::BinaryOp {
141 left,
142 op: BinOp::And,
143 right,
144 } => {
145 flatten_between(left, schema, out);
146 flatten_between(right, schema, out);
147 }
148 _ => {}
149 }
150}
151
152pub fn plan_select(schema: &TableSchema, where_clause: &Option<Expr>) -> ScanPlan {
153 let where_expr = match where_clause {
154 Some(e) => e,
155 None => return ScanPlan::SeqScan,
156 };
157
158 let predicates = flatten_and(where_expr);
159 let simple: Vec<Option<SimplePredicate>> = predicates
160 .iter()
161 .map(|p| extract_simple_predicate(p, schema))
162 .collect();
163
164 if let Some(plan) = try_pk_lookup(schema, &simple) {
165 return plan;
166 }
167
168 let mut range_preds: Vec<SimplePredicate> = simple
169 .iter()
170 .filter_map(|p| {
171 let p = p.as_ref()?;
172 if is_range_op(p.op) {
173 Some(SimplePredicate {
174 col_idx: p.col_idx,
175 op: p.op,
176 value: p.value.clone(),
177 })
178 } else {
179 None
180 }
181 })
182 .collect();
183 flatten_between(where_expr, schema, &mut range_preds);
184
185 if let Some(plan) = try_pk_range_scan(schema, &range_preds) {
186 return plan;
187 }
188
189 if let Some(plan) = try_best_index(schema, &simple) {
190 return plan;
191 }
192
193 ScanPlan::SeqScan
194}
195
196fn try_pk_range_scan(schema: &TableSchema, range_preds: &[SimplePredicate]) -> Option<ScanPlan> {
197 if schema.primary_key_columns.len() != 1 {
198 return None; }
200 let pk_col = schema.primary_key_columns[0] as usize;
201 let conds: Vec<(BinOp, Value)> = range_preds
202 .iter()
203 .filter(|p| p.col_idx == pk_col)
204 .map(|p| (p.op, p.value.clone()))
205 .collect();
206 if conds.is_empty() {
207 return None;
208 }
209 let start_key = conds
210 .iter()
211 .filter(|(op, _)| matches!(op, BinOp::GtEq | BinOp::Gt))
212 .map(|(_, v)| encode_composite_key(std::slice::from_ref(v)))
213 .min_by(|a, b| a.cmp(b))
214 .unwrap_or_default();
215 Some(ScanPlan::PkRangeScan {
216 start_key,
217 range_conds: conds,
218 num_pk_cols: 1,
219 })
220}
221
222fn try_pk_lookup(schema: &TableSchema, predicates: &[Option<SimplePredicate>]) -> Option<ScanPlan> {
223 let pk_cols = &schema.primary_key_columns;
224 let mut pk_values: Vec<Option<Value>> = vec![None; pk_cols.len()];
225
226 for pred in predicates.iter().flatten() {
227 if pred.op == BinOp::Eq {
228 if let Some(pk_pos) = pk_cols.iter().position(|&c| c == pred.col_idx as u16) {
229 pk_values[pk_pos] = Some(pred.value.clone());
230 }
231 }
232 }
233
234 if pk_values.iter().all(|v| v.is_some()) {
235 let values: Vec<Value> = pk_values.into_iter().map(|v| v.unwrap()).collect();
236 Some(ScanPlan::PkLookup { pk_values: values })
237 } else {
238 None
239 }
240}
241
242#[derive(PartialEq, Eq, PartialOrd, Ord)]
243struct IndexScore {
244 num_equality: usize,
245 has_range: bool,
246 is_unique: bool,
247}
248
249fn try_best_index(
250 schema: &TableSchema,
251 predicates: &[Option<SimplePredicate>],
252) -> Option<ScanPlan> {
253 let mut best_score: Option<IndexScore> = None;
254 let mut best_plan: Option<ScanPlan> = None;
255
256 for idx in &schema.indices {
257 if let Some((score, plan)) = try_index_scan(schema, idx, predicates) {
258 if best_score.is_none() || score > *best_score.as_ref().unwrap() {
259 best_score = Some(score);
260 best_plan = Some(plan);
261 }
262 }
263 }
264
265 best_plan
266}
267
268fn try_index_scan(
269 schema: &TableSchema,
270 idx: &IndexDef,
271 predicates: &[Option<SimplePredicate>],
272) -> Option<(IndexScore, ScanPlan)> {
273 let mut used = Vec::new();
274 let mut equality_values: Vec<Value> = Vec::new();
275 let mut range_conds: Vec<(BinOp, Value)> = Vec::new();
276
277 for &col_idx in &idx.columns {
278 let mut found_eq = false;
279 for (i, pred) in predicates.iter().enumerate() {
280 if used.contains(&i) {
281 continue;
282 }
283 if let Some(sp) = pred {
284 if sp.col_idx == col_idx as usize && sp.op == BinOp::Eq {
285 equality_values.push(sp.value.clone());
286 used.push(i);
287 found_eq = true;
288 break;
289 }
290 }
291 }
292 if !found_eq {
293 for (i, pred) in predicates.iter().enumerate() {
294 if used.contains(&i) {
295 continue;
296 }
297 if let Some(sp) = pred {
298 if sp.col_idx == col_idx as usize && is_range_op(sp.op) {
299 range_conds.push((sp.op, sp.value.clone()));
300 used.push(i);
301 }
302 }
303 }
304 break;
305 }
306 }
307
308 if equality_values.is_empty() && range_conds.is_empty() {
309 return None;
310 }
311
312 let score = IndexScore {
313 num_equality: equality_values.len(),
314 has_range: !range_conds.is_empty(),
315 is_unique: idx.unique,
316 };
317
318 let prefix = encode_composite_key(&equality_values);
319 let idx_table = TableSchema::index_table_name(&schema.name, &idx.name);
320
321 Some((
322 score,
323 ScanPlan::IndexScan {
324 index_name: idx.name.clone(),
325 idx_table,
326 prefix,
327 num_prefix_cols: equality_values.len(),
328 range_conds,
329 is_unique: idx.unique,
330 index_columns: idx.columns.clone(),
331 },
332 ))
333}
334
335pub fn describe_plan(plan: &ScanPlan, table_schema: &TableSchema) -> String {
336 match plan {
337 ScanPlan::SeqScan => String::new(),
338
339 ScanPlan::PkLookup { pk_values } => {
340 let pk_cols: Vec<&str> = table_schema
341 .primary_key_columns
342 .iter()
343 .map(|&idx| table_schema.columns[idx as usize].name.as_str())
344 .collect();
345 let conditions: Vec<String> = pk_cols
346 .iter()
347 .zip(pk_values.iter())
348 .map(|(col, val)| format!("{col} = {}", format_value(val)))
349 .collect();
350 format!("USING PRIMARY KEY ({})", conditions.join(", "))
351 }
352
353 ScanPlan::PkRangeScan { range_conds, .. } => {
354 let pk_col = &table_schema.columns[table_schema.primary_key_columns[0] as usize].name;
355 let conditions: Vec<String> = range_conds
356 .iter()
357 .map(|(op, val)| format!("{pk_col} {} {}", op_symbol(*op), format_value(val)))
358 .collect();
359 format!("USING PRIMARY KEY RANGE ({})", conditions.join(", "))
360 }
361
362 ScanPlan::IndexScan {
363 index_name,
364 num_prefix_cols,
365 range_conds,
366 index_columns,
367 ..
368 } => {
369 let mut conditions = Vec::new();
370 for &col in index_columns.iter().take(*num_prefix_cols) {
371 let col_idx = col as usize;
372 let col_name = &table_schema.columns[col_idx].name;
373 conditions.push(format!("{col_name} = ?"));
374 }
375 if !range_conds.is_empty() && *num_prefix_cols < index_columns.len() {
376 let col_idx = index_columns[*num_prefix_cols] as usize;
377 let col_name = &table_schema.columns[col_idx].name;
378 for (op, _) in range_conds {
379 conditions.push(format!("{col_name} {} ?", op_symbol(*op)));
380 }
381 }
382 if conditions.is_empty() {
383 format!("USING INDEX {index_name}")
384 } else {
385 format!("USING INDEX {index_name} ({})", conditions.join(", "))
386 }
387 }
388 }
389}
390
391fn format_value(val: &Value) -> String {
392 match val {
393 Value::Null => "NULL".into(),
394 Value::Integer(i) => i.to_string(),
395 Value::Real(f) => format!("{f}"),
396 Value::Text(s) => format!("'{s}'"),
397 Value::Blob(_) => "BLOB".into(),
398 Value::Boolean(b) => b.to_string(),
399 Value::Date(d) => format!("DATE '{}'", crate::datetime::format_date(*d)),
400 Value::Time(t) => format!("TIME '{}'", crate::datetime::format_time(*t)),
401 Value::Timestamp(t) => format!("TIMESTAMP '{}'", crate::datetime::format_timestamp(*t)),
402 Value::Interval {
403 months,
404 days,
405 micros,
406 } => format!(
407 "INTERVAL '{}'",
408 crate::datetime::format_interval(*months, *days, *micros)
409 ),
410 }
411}
412
413fn op_symbol(op: BinOp) -> &'static str {
414 match op {
415 BinOp::Lt => "<",
416 BinOp::LtEq => "<=",
417 BinOp::Gt => ">",
418 BinOp::GtEq => ">=",
419 BinOp::Eq => "=",
420 BinOp::NotEq => "!=",
421 _ => "?",
422 }
423}
424
425#[cfg(test)]
426#[path = "planner_tests.rs"]
427mod tests;