powdb_query/executor/plan_exec.rs
1//! The execute_plan method and associated helpers.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::{ViewDef, ViewRegistry};
16
17impl Engine {
18 pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19 match plan {
20 PlanNode::SeqScan { table } => {
21 // Auto-refresh dirty materialized views on read.
22 if self.view_registry.is_dirty(table) {
23 self.refresh_view(table)?;
24 }
25 let schema = self
26 .catalog
27 .schema(table)
28 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29 .clone();
30 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31 let rows: Vec<Vec<Value>> = self
32 .catalog
33 .scan(table)
34 .map_err(|e| QueryError::StorageError(e.to_string()))?
35 .map(|(_, row)| row)
36 .collect();
37 Ok(QueryResult::Rows { columns, rows })
38 }
39
40 PlanNode::Filter { input, predicate } => {
41 // Materialize any IN-subqueries in the predicate before the
42 // scan loop — the closure can't call back into the engine.
43 // Correlated subqueries are left in place for per-row eval.
44 let materialized;
45 let predicate = if contains_subquery(predicate) {
46 materialized = self.materialize_subqueries(predicate)?;
47 &materialized
48 } else {
49 predicate
50 };
51
52 // Correlated subquery path: per-row materialisation.
53 if contains_subquery(predicate) {
54 let result = self.execute_plan(input)?;
55 return match result {
56 QueryResult::Rows { columns, rows } => {
57 let mut filtered = Vec::new();
58 for row in rows {
59 let row_pred =
60 self.materialize_correlated_for_row(predicate, &row, &columns)?;
61 if eval_predicate(&row_pred, &row, &columns) {
62 filtered.push(row);
63 }
64 }
65 Ok(QueryResult::Rows {
66 columns,
67 rows: filtered,
68 })
69 }
70 _ => Err("filter requires row input".into()),
71 };
72 }
73
74 // Fast path: fuse Filter + SeqScan into a zero-copy streaming
75 // loop. Uses decode_column() to evaluate the predicate on only
76 // the columns it references, avoiding heap allocations for
77 // String/Bytes columns that aren't part of the filter.
78 if let PlanNode::SeqScan { table } = input.as_ref() {
79 // Auto-refresh dirty materialized views.
80 if self.view_registry.is_dirty(table) {
81 self.refresh_view(table)?;
82 }
83 let schema = self
84 .catalog
85 .schema(table)
86 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
87 .clone();
88 let columns: Vec<String> =
89 schema.columns.iter().map(|c| c.name.clone()).collect();
90 let fast = FastLayout::new(&schema);
91 let row_layout = RowLayout::new(&schema);
92 // Mission F: pre-size to skip the first 4 Vec doublings
93 // (4 → 8 → 16 → 32 → 64). On a 100K-row scan with 30%
94 // selectivity that's ~4 fewer reallocations + memcpys.
95 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
96
97 // Try compiled predicate for the filter check (handles
98 // int leaves, string-eq leaves, and And conjunctions).
99 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
100 self.catalog
101 .for_each_row_raw(table, |_rid, data| {
102 if compiled(data) {
103 rows.push(decode_row(&schema, data));
104 }
105 })
106 .map_err(|e| QueryError::StorageError(e.to_string()))?;
107 } else {
108 let pred_cols = predicate_column_indices(predicate, &columns);
109 self.catalog
110 .for_each_row_raw(table, |_rid, data| {
111 let pred_row =
112 decode_selective(&schema, &row_layout, data, &pred_cols);
113 if eval_predicate(predicate, &pred_row, &columns) {
114 rows.push(decode_row(&schema, data));
115 }
116 })
117 .map_err(|e| QueryError::StorageError(e.to_string()))?;
118 }
119
120 return Ok(QueryResult::Rows { columns, rows });
121 }
122
123 // General path: materialise then filter.
124 let result = self.execute_plan(input)?;
125 match result {
126 QueryResult::Rows { columns, rows } => {
127 let filtered: Vec<Vec<Value>> = rows
128 .into_iter()
129 .filter(|row| eval_predicate(predicate, row, &columns))
130 .collect();
131 Ok(QueryResult::Rows {
132 columns,
133 rows: filtered,
134 })
135 }
136 _ => Err("filter requires row input".into()),
137 }
138 }
139
140 PlanNode::Project { input, fields } => {
141 // Fast path: Project over IndexScan — decode only projected
142 // columns from raw bytes instead of full decode_row.
143 if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144 let schema = self
145 .catalog
146 .schema(table)
147 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148 .clone();
149 let all_columns: Vec<String> =
150 schema.columns.iter().map(|c| c.name.clone()).collect();
151 let key_value = literal_to_value(key)?;
152 let tbl = self
153 .catalog
154 .get_table(table)
155 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157 let proj_columns: Vec<String> = fields
158 .iter()
159 .map(|f| {
160 f.alias.clone().unwrap_or_else(|| match &f.expr {
161 Expr::Field(name) => name.clone(),
162 _ => "?".into(),
163 })
164 })
165 .collect();
166
167 // Determine which column indices the projection needs
168 let proj_indices: Vec<usize> = fields
169 .iter()
170 .filter_map(|f| {
171 if let Expr::Field(name) = &f.expr {
172 all_columns.iter().position(|c| c == name)
173 } else {
174 None
175 }
176 })
177 .collect();
178
179 if tbl.has_index(column) {
180 let layout = RowLayout::new(&schema);
181 let rids = tbl.index_lookup_all(column, &key_value);
182 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
183 for rid in rids {
184 if let Some(data) = tbl.heap.get(rid) {
185 let row: Vec<Value> = proj_indices
186 .iter()
187 .map(|&ci| decode_column(&schema, &layout, &data, ci))
188 .collect();
189 rows.push(row);
190 }
191 }
192 return Ok(QueryResult::Rows {
193 columns: proj_columns,
194 rows,
195 });
196 }
197 }
198
199 // Fast path: Project(Limit(Sort(Filter(SeqScan)))) — bounded
200 // top-N heap. Decodes only the sort key + projected columns,
201 // keeps at most `limit` rows in a heap. Also handles the
202 // Project(Limit(Sort(SeqScan))) variant (no filter).
203 if let PlanNode::Limit {
204 input: inner,
205 count: limit_expr,
206 } = input.as_ref()
207 {
208 if let PlanNode::Sort {
209 input: sort_input,
210 keys,
211 } = inner.as_ref()
212 {
213 // Fast path only for single-key sorts
214 if keys.len() == 1 {
215 let sort_field = &keys[0].field;
216 let descending = keys[0].descending;
217 let limit = match limit_expr {
218 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
219 _ => usize::MAX,
220 };
221 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
222 match sort_input.as_ref() {
223 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
224 PlanNode::Filter {
225 input: fi,
226 predicate,
227 } => {
228 if let PlanNode::SeqScan { table } = fi.as_ref() {
229 (Some(table.as_str()), Some(predicate))
230 } else {
231 (None, None)
232 }
233 }
234 _ => (None, None),
235 };
236 if let Some(table) = table_opt {
237 if let Some(result) = self.project_filter_sort_limit_fast(
238 table, fields, sort_field, descending, limit, pred_opt,
239 )? {
240 return Ok(result);
241 }
242 }
243 }
244 }
245 // Fast path: Project(Limit(Filter(SeqScan))) — stream,
246 // decode only projected columns, stop at limit.
247 if let PlanNode::Filter {
248 input: fi,
249 predicate,
250 } = inner.as_ref()
251 {
252 if let PlanNode::SeqScan { table } = fi.as_ref() {
253 let limit = match limit_expr {
254 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
255 _ => usize::MAX,
256 };
257 if let Some(result) = self.project_filter_limit_fast(
258 table,
259 fields,
260 limit,
261 Some(predicate),
262 )? {
263 return Ok(result);
264 }
265 }
266 }
267 // Fast path: Project(Limit(SeqScan)) — stream, no filter.
268 if let PlanNode::SeqScan { table } = inner.as_ref() {
269 let limit = match limit_expr {
270 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
271 _ => usize::MAX,
272 };
273 if let Some(result) =
274 self.project_filter_limit_fast(table, fields, limit, None)?
275 {
276 return Ok(result);
277 }
278 }
279 }
280
281 // Mission D4: Project(Filter(SeqScan)) without Limit. Reuses
282 // `project_filter_limit_fast` with limit = usize::MAX so the
283 // hot loop decodes only projected columns and uses the
                // compiled predicate. Previously this fell through to the
                // generic Filter branch which materialised every column via
                // `decode_row` then re-projected — a redundant full decode
                // per row.
287 //
288 // multi_col_and_filter (`U filter .age > 30 and .status =
289 // "active" { .name, .age }`) was 6.18ms (0.7x SQLite) and
290 // is the load-bearing workload for this fast path.
291 if let PlanNode::Filter {
292 input: fi,
293 predicate,
294 } = input.as_ref()
295 {
296 if let PlanNode::SeqScan { table } = fi.as_ref() {
297 if let Some(result) = self.project_filter_limit_fast(
298 table,
299 fields,
300 usize::MAX,
301 Some(predicate),
302 )? {
303 return Ok(result);
304 }
305 }
306 }
307
308 // Mission D4: Project(SeqScan) without Filter or Limit.
309 // Decode only projected columns; the previous fall-through
310 // built full Vec<Value> rows then re-projected.
311 if let PlanNode::SeqScan { table } = input.as_ref() {
312 if let Some(result) =
313 self.project_filter_limit_fast(table, fields, usize::MAX, None)?
314 {
315 return Ok(result);
316 }
317 }
318
319 let result = self.execute_plan(input)?;
320 match result {
321 QueryResult::Rows { columns, rows } => {
322 let proj_columns: Vec<String> = fields
323 .iter()
324 .map(|f| {
325 f.alias.clone().unwrap_or_else(|| match &f.expr {
326 Expr::Field(name) => name.clone(),
327 // Mission E1.2: `{ u.name }` projects as the
328 // qualified column name so callers can still
329 // disambiguate across the join output.
330 Expr::QualifiedField { qualifier, field } => {
331 format!("{qualifier}.{field}")
332 }
333 _ => "?".into(),
334 })
335 })
336 .collect();
337 let proj_rows: Vec<Vec<Value>> = rows
338 .iter()
339 .map(|row| {
340 fields
341 .iter()
342 .map(|f| eval_expr(&f.expr, row, &columns))
343 .collect()
344 })
345 .collect();
346 Ok(QueryResult::Rows {
347 columns: proj_columns,
348 rows: proj_rows,
349 })
350 }
351 _ => Err("project requires row input".into()),
352 }
353 }
354
355 PlanNode::Sort { input, keys } => {
356 let result = self.execute_plan(input)?;
357 match result {
358 QueryResult::Rows { columns, mut rows } => {
359 // WS2: row-count cap is a cheap secondary guard; the
360 // byte budget is the real OOM defense for the sort
361 // buffer (a few very large rows pass the row cap).
362 if rows.len() > MAX_SORT_ROWS {
363 return Err(QueryError::SortLimitExceeded);
364 }
365 self.charge_rows(&rows)?;
366 let key_indices: Vec<(usize, bool)> = keys
367 .iter()
368 .map(|k| {
369 columns
370 .iter()
371 .position(|c| c == &k.field)
372 .map(|idx| (idx, k.descending))
373 .ok_or_else(|| QueryError::ColumnNotFound {
374 table: String::new(),
375 column: k.field.clone(),
376 })
377 })
378 .collect::<Result<_, QueryError>>()?;
379 rows.sort_by(|a, b| {
380 for &(col_idx, descending) in &key_indices {
381 let cmp = a[col_idx].cmp(&b[col_idx]);
382 let cmp = if descending { cmp.reverse() } else { cmp };
383 if cmp != std::cmp::Ordering::Equal {
384 return cmp;
385 }
386 }
387 std::cmp::Ordering::Equal
388 });
389 Ok(QueryResult::Rows { columns, rows })
390 }
391 _ => Err("sort requires row input".into()),
392 }
393 }
394
395 PlanNode::Limit { input, count } => {
396 let result = self.execute_plan(input)?;
397 let n = match count {
398 Expr::Literal(Literal::Int(v)) => *v as usize,
399 _ => return Err("limit must be integer literal".into()),
400 };
401 match result {
402 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
403 columns,
404 rows: rows.into_iter().take(n).collect(),
405 }),
406 _ => Err("limit requires row input".into()),
407 }
408 }
409
410 PlanNode::Offset { input, count } => {
411 let result = self.execute_plan(input)?;
412 let n = match count {
413 Expr::Literal(Literal::Int(v)) => *v as usize,
414 _ => return Err("offset must be integer literal".into()),
415 };
416 match result {
417 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
418 columns,
419 rows: rows.into_iter().skip(n).collect(),
420 }),
421 _ => Err("offset requires row input".into()),
422 }
423 }
424
425 PlanNode::Aggregate {
426 input,
427 function,
428 field,
429 } => {
430 // Fast path: count() over SeqScan — count rows without any decode
431 if *function == AggFunc::Count {
432 if let PlanNode::SeqScan { table } = input.as_ref() {
433 // Auto-refresh a dirty materialized view before
434 // counting it — otherwise count(View) returns stale
435 // data after an underlying mutation (F3).
436 if self.view_registry.is_dirty(table) {
437 self.refresh_view(table)?;
438 }
439 let mut count: i64 = 0;
440 self.catalog
441 .for_each_row_raw(table, |_rid, _data| {
442 count += 1;
443 })
444 .map_err(|e| QueryError::StorageError(e.to_string()))?;
445 return Ok(QueryResult::Scalar(Value::Int(count)));
446 }
447 // Fast path: count() over Filter(SeqScan) — try compiled
448 // predicate first, fall back to decode_column path.
449 // Skip a predicate carrying a subquery: the raw-bytes
450 // evaluators here don't materialise subqueries, so
451 // `count(T filter .x in (...))` would silently count 0
452 // (F1). Falling through routes it to the generic path
453 // that resolves the subquery correctly.
454 if let PlanNode::Filter {
455 input: inner,
456 predicate,
457 } = input.as_ref()
458 {
459 if let PlanNode::SeqScan { table } = inner.as_ref() {
460 if self.view_registry.is_dirty(table) {
461 self.refresh_view(table)?;
462 }
463 }
464 if let (PlanNode::SeqScan { table }, false) =
465 (inner.as_ref(), contains_subquery(predicate))
466 {
467 let schema = self
468 .catalog
469 .schema(table)
470 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
471 .clone();
472 let columns: Vec<String> =
473 schema.columns.iter().map(|c| c.name.clone()).collect();
474 let fast = FastLayout::new(&schema);
475 let row_layout = RowLayout::new(&schema);
476
477 // Try compiled predicate (zero-allocation hot path).
478 // Handles int leaves, string-eq leaves, AND conjunctions.
479 if let Some(compiled) =
480 compile_predicate(predicate, &columns, &fast, &schema)
481 {
482 let mut count: i64 = 0;
483 self.catalog
484 .for_each_row_raw(table, |_rid, data| {
485 if compiled(data) {
486 count += 1;
487 }
488 })
489 .map_err(|e| QueryError::StorageError(e.to_string()))?;
490 return Ok(QueryResult::Scalar(Value::Int(count)));
491 }
492
493 // Fallback: decode predicate columns
494 let pred_cols = predicate_column_indices(predicate, &columns);
495 let mut count: i64 = 0;
496 self.catalog
497 .for_each_row_raw(table, |_rid, data| {
498 let pred_row =
499 decode_selective(&schema, &row_layout, data, &pred_cols);
500 if eval_predicate(predicate, &pred_row, &columns) {
501 count += 1;
502 }
503 })
504 .map_err(|e| QueryError::StorageError(e.to_string()))?;
505
506 return Ok(QueryResult::Scalar(Value::Int(count)));
507 }
508 }
509 }
510
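                // Fast path: sum/avg/min/max/count-distinct over a single
                // fixed-size int column with an optional compiled filter
                // predicate. Walks raw row bytes, zero allocation per row.
                // When `agg_single_col_fast` returns `None` we fall through
                // to the generic materialise-then-aggregate path below.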
514 if matches!(
515 function,
516 AggFunc::Sum
517 | AggFunc::Avg
518 | AggFunc::Min
519 | AggFunc::Max
520 | AggFunc::CountDistinct
521 ) {
522 if let Some(col) = field.as_ref() {
523 // Shape: Aggregate(SeqScan) or Aggregate(Filter(SeqScan))
524 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
525 match input.as_ref() {
526 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
527 PlanNode::Filter {
528 input: inner,
529 predicate,
530 } => {
531 if let PlanNode::SeqScan { table } = inner.as_ref() {
532 (Some(table.as_str()), Some(predicate))
533 } else {
534 (None, None)
535 }
536 }
537 _ => (None, None),
538 };
539 if let Some(table) = table_opt {
540 if let Some(result) =
541 self.agg_single_col_fast(table, col, *function, pred_opt)?
542 {
543 return Ok(result);
544 }
545 }
546 }
547 }
548
                // Generic path: none of the aggregate fast paths above
                // applied, so execute the input plan to materialise its rows
                // and compute the aggregate over them.
553 let result = self.execute_plan(input)?;
554 match result {
555 QueryResult::Rows { columns, rows } => {
556 match function {
557 AggFunc::Count => {
558 Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
559 }
560 AggFunc::CountDistinct => {
561 let col = field.as_ref().ok_or("count distinct requires field")?;
562 let idx = columns
563 .iter()
564 .position(|c| c == col)
565 .ok_or("col not found")?;
566 let mut seen = std::collections::HashSet::new();
567 for row in &rows {
568 let v = &row[idx];
569 if !v.is_empty() {
570 seen.insert(v.clone());
571 }
572 }
573 Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
574 }
575 AggFunc::Avg => {
576 let col = field.as_ref().ok_or("avg requires field")?;
577 let idx = columns
578 .iter()
579 .position(|c| c == col)
580 .ok_or("col not found")?;
581 let sum: f64 = rows
582 .iter()
583 .filter_map(|r| match &r[idx] {
584 Value::Int(v) => Some(*v as f64),
585 Value::Float(v) => Some(*v),
586 _ => None,
587 })
588 .sum();
589 let count = rows.len() as f64;
590 Ok(QueryResult::Scalar(Value::Float(sum / count)))
591 }
592 AggFunc::Sum => {
593 let col = field.as_ref().ok_or("sum requires field")?;
594 let idx = columns
595 .iter()
596 .position(|c| c == col)
597 .ok_or("col not found")?;
598 // Track int and float contributions separately so
599 // Float columns (and mixed Int/Float rows) don't get
600 // silently dropped as they did in the Int-only
601 // version. If any Float is present, the whole sum
602 // promotes to Float — matching Avg's semantics.
603 let mut int_sum: i64 = 0;
604 let mut float_sum: f64 = 0.0;
605 let mut saw_float = false;
606 for r in &rows {
607 match &r[idx] {
608 Value::Int(v) => int_sum += *v,
609 Value::Float(v) => {
610 float_sum += *v;
611 saw_float = true;
612 }
613 _ => {}
614 }
615 }
616 let result = if saw_float {
617 Value::Float(float_sum + int_sum as f64)
618 } else {
619 Value::Int(int_sum)
620 };
621 Ok(QueryResult::Scalar(result))
622 }
623 AggFunc::Min | AggFunc::Max => {
624 let col = field.as_ref().ok_or("min/max requires field")?;
625 let idx = columns
626 .iter()
627 .position(|c| c == col)
628 .ok_or("col not found")?;
629 let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
630 let result = if *function == AggFunc::Min {
631 vals.into_iter().min().cloned()
632 } else {
633 vals.into_iter().max().cloned()
634 };
635 Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
636 }
637 }
638 }
639 _ => Err("aggregate requires row input".into()),
640 }
641 }
642
643 PlanNode::Insert { table, rows } => {
644 // Build + validate EVERY row before inserting any, so a bad
645 // row (unknown/missing/uncoercible field) aborts the whole
646 // statement without a partial write. The WAL fsync happens
647 // once at statement end, so N rows = N appends + 1 fsync.
648 let all_values: Vec<Vec<Value>> = {
649 let schema = self
650 .catalog
651 .schema(table)
652 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
653 let mut all = Vec::with_capacity(rows.len());
654 for assignments in rows {
655 let mut values = vec![Value::Empty; schema.columns.len()];
656 for a in assignments {
657 let idx = schema.column_index(&a.field).ok_or_else(|| {
658 QueryError::ColumnNotFound {
659 table: String::new(),
660 column: a.field.clone(),
661 }
662 })?;
663 let raw = literal_to_value(&a.value)?;
664 values[idx] = coerce_value(raw, &schema.columns[idx])?;
665 }
666 for col in &schema.columns {
667 if col.required && matches!(values[col.position as usize], Value::Empty)
668 {
669 return Err(QueryError::Execution(format!(
670 "column '{}' is required but no value was provided",
671 col.name
672 )));
673 }
674 }
675 all.push(values);
676 }
677 all
678 };
679 // Charge the materialized batch against the per-query memory
680 // budget before inserting — keeps multi-row insert consistent
681 // with every other full-materialization point (sort/join/group)
682 // and bounds embedded callers (the server also caps the query
683 // string at 1 MB, but embedded callers have no such limit).
684 self.charge_rows(&all_values)?;
685 let n = all_values.len() as u64;
686 for values in &all_values {
687 self.catalog
688 .insert(table, values)
689 .map_err(|e| QueryError::StorageError(e.to_string()))?;
690 }
691 self.view_registry.mark_dependents_dirty(table);
692 Ok(QueryResult::Modified(n))
693 }
694
695 PlanNode::Upsert {
696 table,
697 key_column,
698 assignments,
699 on_conflict,
700 } => {
701 let (values, key_idx) = {
702 let schema = self
703 .catalog
704 .schema(table)
705 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
706 let mut values = vec![Value::Empty; schema.columns.len()];
707 for a in assignments {
708 let idx = schema.column_index(&a.field).ok_or_else(|| {
709 QueryError::ColumnNotFound {
710 table: String::new(),
711 column: a.field.clone(),
712 }
713 })?;
714 let raw = literal_to_value(&a.value)?;
715 values[idx] = coerce_value(raw, &schema.columns[idx])?;
716 }
717 for col in &schema.columns {
718 if col.required && matches!(values[col.position as usize], Value::Empty) {
719 return Err(QueryError::Execution(format!(
720 "column '{}' is required but no value was provided",
721 col.name
722 )));
723 }
724 }
725 let key_idx = schema
726 .column_index(key_column)
727 .ok_or_else(|| format!("key column '{key_column}' not found"))?;
728 (values, key_idx)
729 };
730
731 let key_value = values[key_idx].clone();
732
733 // Probe the index for a conflict.
734 let existing = {
735 let tbl = self
736 .catalog
737 .get_table(table)
738 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
739 if tbl.has_index(key_column) {
740 // Upsert key lookup: return the first match.
741 // For unique indexes this is the only match.
742 // For non-unique indexes on a key column, also
743 // just the first (upsert semantics).
744 let rids = tbl.index_lookup_all(key_column, &key_value);
745 rids.into_iter().next().and_then(|rid| {
746 tbl.heap
747 .get(rid)
748 .map(|data| (rid, decode_row(&tbl.schema, &data)))
749 })
750 } else {
751 // No index — linear scan for the key.
752 let mut found = None;
753 for (rid, row) in tbl.scan() {
754 if row[key_idx] == key_value {
755 found = Some((rid, row));
756 break;
757 }
758 }
759 found
760 }
761 };
762
763 if let Some((rid, mut existing_row)) = existing {
764 // Conflict: apply on_conflict assignments (or all non-key if empty).
765 let update_assignments = if on_conflict.is_empty() {
766 assignments
767 } else {
768 on_conflict
769 };
770 let changed_cols: Vec<usize> = {
771 let schema = self
772 .catalog
773 .schema(table)
774 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
775 let mut indices = Vec::new();
776 for a in update_assignments {
777 let idx = schema.column_index(&a.field).ok_or_else(|| {
778 QueryError::ColumnNotFound {
779 table: String::new(),
780 column: a.field.clone(),
781 }
782 })?;
783 if idx != key_idx {
784 existing_row[idx] = literal_to_value(&a.value)?;
785 indices.push(idx);
786 }
787 }
788 indices
789 };
790 self.catalog
791 .update_hinted(table, rid, &existing_row, Some(&changed_cols))
792 .map_err(|e| QueryError::StorageError(e.to_string()))?;
793 self.view_registry.mark_dependents_dirty(table);
794 Ok(QueryResult::Modified(1))
795 } else {
796 // No conflict: insert.
797 self.catalog
798 .insert(table, &values)
799 .map_err(|e| QueryError::StorageError(e.to_string()))?;
800 self.view_registry.mark_dependents_dirty(table);
801 Ok(QueryResult::Modified(1))
802 }
803 }
804
805 PlanNode::Update {
806 input,
807 table,
808 assignments,
809 } => {
810 // Mission C Phase 3: resolve assignments against a borrowed
811 // schema, then drop the borrow before the mutation loop.
812 // Try literal-only path first; fall back to per-row expression
813 // evaluation if any assignment contains a non-literal expression
814 // (e.g., `age := .age + 1`).
815 let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
816 let schema_ref = self
817 .catalog
818 .schema(table)
819 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
820 let indices: Vec<usize> = assignments
821 .iter()
822 .map(|a| {
823 schema_ref.column_index(&a.field).ok_or_else(|| {
824 QueryError::ColumnNotFound {
825 table: String::new(),
826 column: a.field.clone(),
827 }
828 })
829 })
830 .collect::<Result<_, _>>()?;
831 let vals: Result<Vec<Value>, _> = assignments
832 .iter()
833 .map(|a| literal_to_value(&a.value))
834 .collect();
835 (indices, vals.ok())
836 };
837 let resolved_assignments: Option<Vec<(usize, Value)>> =
838 literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
839
840 // Mission C Phase 2: the hint Table::update_hinted needs to
841 // decide whether to read the old row for index diff.
842 let changed_cols: Vec<usize> = col_indices.clone();
843
844 // ── Fused scan+update for Update(Filter(SeqScan)) ────────
845 // Perf sprint: instead of the two-pass collect-RIDs-then-loop
846 // pattern (which pays one ensure_hot per matched row on the
847 // second pass), fuse the predicate evaluation and in-place
848 // byte-level mutation into a single heap walk. Same idea as
849 // the fused scan_delete_matching path for deletes.
850 if let Some(ref resolved_assignments) = resolved_assignments {
851 if let PlanNode::Filter {
852 input: inner,
853 predicate,
854 } = input.as_ref()
855 {
856 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
857 if t == table {
858 let fused_result = self.try_fused_scan_update(
859 table,
860 predicate,
861 resolved_assignments,
862 &changed_cols,
863 );
864 if let Some(result) = fused_result {
865 return result;
866 }
867 }
868 }
869 }
870 }
871
872 // Collect matching RowIds in a single pass.
873 let matching_rids = self.collect_rids_for_mutation(input, table)?;
874
875 // ── Literal-only fast paths ─────────────────────────────
876 if let Some(ref resolved_assignments) = resolved_assignments {
877 // Mission C Phase 4: in-place byte-patch fast path. If every
878 // assignment targets a fixed-size non-null column AND none of
879 // them is indexed, we can skip decode_row / Vec<Value> /
880 // encode_row_into entirely and patch the row's raw bytes on
881 // the hot page.
882 let fast_patch: Option<Vec<FastPatch>> = {
883 let tbl = self
884 .catalog
885 .get_table(table)
886 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
887 let schema = &tbl.schema;
888 let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
889 is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
890 });
891 let no_indexed = !resolved_assignments
892 .iter()
893 .any(|(idx, _)| tbl.has_indexed_col(*idx));
894
895 if all_fixed_nonnull && no_indexed {
896 let layout = RowLayout::new(schema);
897 let bitmap_size = layout.bitmap_size();
898 let patches: Vec<FastPatch> = resolved_assignments
899 .iter()
900 .map(|(idx, val)| {
901 let fixed_off = layout
902 .fixed_offset(*idx)
903 .expect("is_fixed_size already checked");
904 let field_off = 2 + bitmap_size + fixed_off;
905 let bytes: FixedBytes = match val {
906 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
907 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
908 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
909 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
910 Value::Uuid(v) => FixedBytes::Uuid(*v),
911 _ => unreachable!("all_fixed_nonnull guard lied"),
912 };
913 FastPatch {
914 field_off,
915 bitmap_byte_off: 2 + idx / 8,
916 bit_mask: 1u8 << (idx % 8),
917 bytes,
918 }
919 })
920 .collect();
921 Some(patches)
922 } else {
923 None
924 }
925 };
926
927 if let Some(patches) = fast_patch {
928 let mut count = 0u64;
929 for rid in matching_rids {
930 // Mission B2: WAL-log every patch so crash
931 // recovery replays the update. Same mutation
932 // closure as before — the wrapper just sandwiches
933 // it between a hot-page read and a WAL append.
934 let ok = self
935 .catalog
936 .update_row_bytes_logged(table, rid, |row| {
937 for p in &patches {
938 row[p.bitmap_byte_off] &= !p.bit_mask;
939 let field_bytes = p.bytes.as_slice();
940 row[p.field_off..p.field_off + field_bytes.len()]
941 .copy_from_slice(field_bytes);
942 }
943 })
944 .map_err(|e| QueryError::StorageError(e.to_string()))?;
945 if ok {
946 count += 1;
947 }
948 }
949 self.view_registry.mark_dependents_dirty(table);
950 return Ok(QueryResult::Modified(count));
951 }
952
953 // Mission C Phase 10: var-column in-place shrink fast path.
954 let var_fast: Option<(usize, Option<Vec<u8>>)> = {
955 let tbl = self
956 .catalog
957 .get_table(table)
958 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
959 let schema = &tbl.schema;
960 let is_single = resolved_assignments.len() == 1;
961 let is_var_col = is_single
962 && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
963 let no_indexed = !resolved_assignments
964 .iter()
965 .any(|(idx, _)| tbl.has_indexed_col(*idx));
966
967 if is_single && is_var_col && no_indexed {
968 let (idx, val) = &resolved_assignments[0];
969 let bytes_opt: Option<Vec<u8>> = match val {
970 Value::Str(s) => Some(s.as_bytes().to_vec()),
971 Value::Bytes(b) => Some(b.clone()),
972 Value::Empty => None,
973 _ => {
974 return Err(QueryError::TypeError(format!(
975 "cannot assign non-var value to var column '{}'",
976 schema.columns[*idx].name
977 )))
978 }
979 };
980 Some((*idx, bytes_opt))
981 } else {
982 None
983 }
984 };
985
986 if let Some((col_idx, new_bytes_opt)) = var_fast {
987 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
988 let mut count = 0u64;
989 let mut fallback_rids: Vec<RowId> = Vec::new();
990 for rid in &matching_rids {
991 // Mission B2: logged variant so crash recovery
992 // replays the shrink. On a false return (row
993 // would have to grow), the rid is pushed to
994 // `fallback_rids` and the slower `update_hinted`
995 // path — which is already WAL-logged — picks it up.
996 let ok = self
997 .catalog
998 .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
999 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1000 if ok {
1001 count += 1;
1002 } else {
1003 fallback_rids.push(*rid);
1004 }
1005 }
1006 for rid in fallback_rids {
1007 let mut row = match self.catalog.get(table, rid) {
1008 Some(r) => r,
1009 None => continue,
1010 };
1011 for (idx, val) in resolved_assignments.iter() {
1012 row[*idx] = val.clone();
1013 }
1014 self.catalog
1015 .update_hinted(table, rid, &row, Some(&changed_cols))
1016 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1017 count += 1;
1018 }
1019 self.view_registry.mark_dependents_dirty(table);
1020 return Ok(QueryResult::Modified(count));
1021 }
1022
1023 // Generic literal path: decode row, apply literal values.
1024 let mut count = 0u64;
1025 for rid in matching_rids {
1026 let mut row = match self.catalog.get(table, rid) {
1027 Some(r) => r,
1028 None => continue,
1029 };
1030 for (idx, val) in resolved_assignments.iter() {
1031 row[*idx] = val.clone();
1032 }
1033 self.catalog
1034 .update_hinted(table, rid, &row, Some(&changed_cols))
1035 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1036 count += 1;
1037 }
1038 self.view_registry.mark_dependents_dirty(table);
1039 return Ok(QueryResult::Modified(count));
1040 } // end if let Some(resolved_assignments)
1041
1042 // ── Expression-based update path ────────────────────────
1043 // At least one assignment contains a non-literal expression
1044 // (e.g., `age := .age + 1`). Evaluate per-row.
1045 let col_names: Vec<String> = {
1046 let schema_ref = self
1047 .catalog
1048 .schema(table)
1049 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1050 schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1051 };
1052 let mut count = 0u64;
1053 for rid in matching_rids {
1054 let mut row = match self.catalog.get(table, rid) {
1055 Some(r) => r,
1056 None => continue,
1057 };
1058 for (i, asgn) in assignments.iter().enumerate() {
1059 let val = eval_expr(&asgn.value, &row, &col_names);
1060 row[col_indices[i]] = val;
1061 }
1062 self.catalog
1063 .update_hinted(table, rid, &row, Some(&changed_cols))
1064 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1065 count += 1;
1066 }
1067 self.view_registry.mark_dependents_dirty(table);
1068 Ok(QueryResult::Modified(count))
1069 }
1070
1071 PlanNode::Delete { input, table } => {
1072 // Mission C Phase 3: no schema clone — collect_rids_for_mutation
1073 // looks up schema internally when it needs one, and the mutation
1074 // loop doesn't need the schema at all.
1075 //
1076 // Mission C Phase 12: route bulk deletes through
1077 // `Catalog::delete_many`, which batches the btree leaf
1078 // compaction and shares one `ensure_hot` per row between
1079 // the index-key extraction and the slot delete. On
1080 // `delete_by_filter` (100K fixture, ~20K matches) that
1081 // removes ~4ms of pure `Vec::remove` memmove from the btree
1082 // maintenance phase.
1083 //
1084 // Mission C Phase 16: for the common `delete where ...`
1085 // shape (Filter(SeqScan)) — and the rarer "delete
1086 // everything" shape (SeqScan) — skip the two-pass
1087 // `collect_rids_for_mutation` + `delete_many` flow entirely.
1088 // The fused `scan_delete_matching` primitive walks the
1089 // heap exactly once, paying one `ensure_hot` per page
1090 // instead of per-row. That closes the last major gap on
1091 // the bench's `delete_by_filter` workload.
1092 if let PlanNode::Filter {
1093 input: inner,
1094 predicate,
1095 } = input.as_ref()
1096 {
1097 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1098 if t == table {
1099 let schema = self
1100 .catalog
1101 .schema(table)
1102 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1103 let columns: Vec<String> =
1104 schema.columns.iter().map(|c| c.name.clone()).collect();
1105 let fast = FastLayout::new(schema);
1106 if let Some(compiled) =
1107 compile_predicate(predicate, &columns, &fast, schema)
1108 {
1109 // Mission B2: logged variant so every
1110 // matched rid hits the WAL during the
1111 // single-pass scan. Structure of the
1112 // fused scan is unchanged — only the
1113 // hook closure now also appends.
1114 let count = self
1115 .catalog
1116 .scan_delete_matching_logged(table, |data| compiled(data))
1117 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1118 self.view_registry.mark_dependents_dirty(table);
1119 return Ok(QueryResult::Modified(count));
1120 }
1121 }
1122 }
1123 } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1124 if t == table {
1125 // `delete from T` with no predicate — every live
1126 // row matches. One pass is still the right shape.
1127 // Mission B2: logged variant — see above.
1128 let count = self
1129 .catalog
1130 .scan_delete_matching_logged(table, |_| true)
1131 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1132 self.view_registry.mark_dependents_dirty(table);
1133 return Ok(QueryResult::Modified(count));
1134 }
1135 }
1136
1137 let matching_rids = self.collect_rids_for_mutation(input, table)?;
1138 let count = self
1139 .catalog
1140 .delete_many(table, &matching_rids)
1141 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1142 self.view_registry.mark_dependents_dirty(table);
1143 Ok(QueryResult::Modified(count))
1144 }
1145
1146 PlanNode::AliasScan { table, alias } => {
1147 // Mission E1.2: scan `table` and rename every output column
1148 // to `alias.field`. Used as a join leaf so downstream
1149 // NestedLoopJoin + Filter + Project nodes can resolve
1150 // `Expr::QualifiedField` lookups by direct column-name match.
1151 //
1152 // We don't bother with a fused zero-copy loop here yet — the
1153 // whole join path is nested-loop and correctness-first
1154 // (Phase E1.3 will introduce hash join and at that point we
1155 // can revisit whether to specialise AliasScan).
1156 let schema = self
1157 .catalog
1158 .schema(table)
1159 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1160 .clone();
1161 let columns: Vec<String> = schema
1162 .columns
1163 .iter()
1164 .map(|c| format!("{alias}.{}", c.name))
1165 .collect();
1166 let rows: Vec<Vec<Value>> = self
1167 .catalog
1168 .scan(table)
1169 .map_err(|e| QueryError::StorageError(e.to_string()))?
1170 .map(|(_, row)| row)
1171 .collect();
1172 Ok(QueryResult::Rows { columns, rows })
1173 }
1174
1175 PlanNode::NestedLoopJoin {
1176 left,
1177 right,
1178 on,
1179 kind,
1180 } => {
1181 // Materialise both sides. The executor ships two strategies:
1182 // 1. Hash join (E1.3) — when the `on` predicate is a
1183 // simple equi-predicate `left_col = right_col`, build a
1184 // FxHashMap<Value, Vec<row_idx>> over the right side
1185 // and probe with the left side. O(L + R) instead of
1186 // O(L × R). Handles Inner and LeftOuter.
1187 // 2. Nested loop (E1.2) — fallback for Cross, non-equi
1188 // predicates, or `on` expressions that reference
1189 // either side with something more complex than a
1190 // QualifiedField.
1191 let left_result = self.execute_plan(left)?;
1192 let right_result = self.execute_plan(right)?;
1193 let (left_columns, left_rows) = match left_result {
1194 QueryResult::Rows { columns, rows } => (columns, rows),
1195 _ => return Err("join left side must produce rows".into()),
1196 };
1197 let (right_columns, right_rows) = match right_result {
1198 QueryResult::Rows { columns, rows } => (columns, rows),
1199 _ => return Err("join right side must produce rows".into()),
1200 };
1201
1202 // WS2: byte-budget guard on the join build side. Charge both
1203 // materialized inputs before we build the hash table / probe;
1204 // the output is row-capped by check_join_limit below.
1205 self.charge_rows(&left_rows)?;
1206 self.charge_rows(&right_rows)?;
1207
1208 // Hash-join fast path.
1209 if !matches!(kind, JoinKind::Cross) {
1210 if let Some(pred) = on {
1211 if let Some((l_idx, r_idx)) =
1212 try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1213 {
1214 let result = hash_join(
1215 left_columns,
1216 left_rows,
1217 right_columns,
1218 right_rows,
1219 l_idx,
1220 r_idx,
1221 *kind,
1222 );
1223 if let QueryResult::Rows { ref rows, .. } = result {
1224 check_join_limit(rows.len())?;
1225 }
1226 return Ok(result);
1227 }
1228 }
1229 }
1230
1231 // Nested-loop fallback.
1232 let n_left = left_columns.len();
1233 let n_right = right_columns.len();
1234 let mut columns = Vec::with_capacity(n_left + n_right);
1235 columns.extend(left_columns);
1236 columns.extend(right_columns);
1237
1238 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1239 let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1240
1241 for left_row in &left_rows {
1242 let mut matched = false;
1243 for right_row in &right_rows {
1244 combined.clear();
1245 combined.extend_from_slice(left_row);
1246 combined.extend_from_slice(right_row);
1247 let keep = match kind {
1248 JoinKind::Cross => true,
1249 JoinKind::Inner | JoinKind::LeftOuter => match on {
1250 Some(pred) => eval_predicate(pred, &combined, &columns),
1251 // Missing `on` for non-cross joins is a
1252 // parser error, but if it slips through we
1253 // treat it as "match everything".
1254 None => true,
1255 },
1256 // RightOuter is rewritten to LeftOuter by the
1257 // planner, so we never see it here.
1258 JoinKind::RightOuter => {
1259 unreachable!("planner rewrites RightOuter to LeftOuter")
1260 }
1261 };
1262 if keep {
1263 rows.push(combined.clone());
1264 check_join_limit(rows.len())?;
1265 matched = true;
1266 }
1267 }
1268 if !matched && matches!(kind, JoinKind::LeftOuter) {
1269 let mut row = Vec::with_capacity(n_left + n_right);
1270 row.extend_from_slice(left_row);
1271 row.resize(n_left + n_right, Value::Empty);
1272 rows.push(row);
1273 check_join_limit(rows.len())?;
1274 }
1275 }
1276
1277 Ok(QueryResult::Rows { columns, rows })
1278 }
1279
1280 PlanNode::Distinct { input } => {
1281 let result = self.execute_plan(input)?;
1282 match result {
1283 QueryResult::Rows { columns, rows } => {
1284 let mut seen = std::collections::HashSet::new();
1285 let mut unique_rows = Vec::new();
1286 for row in rows {
1287 if seen.insert(row.clone()) {
1288 unique_rows.push(row);
1289 }
1290 }
1291 Ok(QueryResult::Rows {
1292 columns,
1293 rows: unique_rows,
1294 })
1295 }
1296 other => Ok(other),
1297 }
1298 }
1299
1300 PlanNode::GroupBy {
1301 input,
1302 keys,
1303 aggregates,
1304 having,
1305 } => {
1306 let result = self.execute_plan(input)?;
1307 match result {
1308 QueryResult::Rows { columns, rows } => {
1309 // WS2: byte-budget guard on the GROUP BY input buffer
1310 // (the hash table is bounded by the input it groups).
1311 self.charge_rows(&rows)?;
1312 // Resolve key column indices.
1313 let key_indices: Vec<usize> = keys
1314 .iter()
1315 .map(|k| {
1316 columns
1317 .iter()
1318 .position(|c| c == k)
1319 .ok_or_else(|| format!("group-by column '{k}' not found"))
1320 })
1321 .collect::<Result<Vec<_>, _>>()?;
1322
1323 // Resolve aggregate field indices. count(*) uses
1324 // sentinel usize::MAX — compute_group_aggregate
1325 // treats it as "count all rows in the group".
1326 let agg_field_indices: Vec<usize> = aggregates
1327 .iter()
1328 .map(|a| {
1329 if a.field == "*" {
1330 Ok(usize::MAX)
1331 } else {
1332 columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1333 format!("aggregate column '{}' not found", a.field)
1334 })
1335 }
1336 })
1337 .collect::<Result<Vec<_>, _>>()?;
1338
1339 // Group rows by key values (preserving insertion order).
1340 let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1341 rustc_hash::FxHashMap::default();
1342 let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1343 for (ri, row) in rows.iter().enumerate() {
1344 let key: Vec<Value> =
1345 key_indices.iter().map(|&i| row[i].clone()).collect();
1346 match group_map.get(&key) {
1347 Some(&idx) => groups[idx].1.push(ri),
1348 None => {
1349 let idx = groups.len();
1350 group_map.insert(key.clone(), idx);
1351 groups.push((key, vec![ri]));
1352 }
1353 }
1354 }
1355
1356 // Build output column names: keys ++ aggregate output names.
1357 let mut out_columns: Vec<String> = keys.clone();
1358 for agg in aggregates.iter() {
1359 out_columns.push(agg.output_name.clone());
1360 }
1361
1362 // Compute aggregates per group.
1363 let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1364 for (key_vals, row_indices) in &groups {
1365 let mut row = key_vals.clone();
1366 for (ai, agg) in aggregates.iter().enumerate() {
1367 let col_idx = agg_field_indices[ai];
1368 let val = compute_group_aggregate(
1369 agg.function,
1370 &rows,
1371 row_indices,
1372 col_idx,
1373 );
1374 row.push(val);
1375 }
1376 out_rows.push(row);
1377 }
1378
1379 // Apply HAVING filter.
1380 if let Some(having_expr) = having {
1381 out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1382 }
1383
1384 Ok(QueryResult::Rows {
1385 columns: out_columns,
1386 rows: out_rows,
1387 })
1388 }
1389 _ => Err("group by requires row input".into()),
1390 }
1391 }
1392
1393 PlanNode::CreateTable { name, fields } => {
1394 let columns: Vec<ColumnDef> = fields
1395 .iter()
1396 .enumerate()
1397 .map(
1398 |(i, (fname, tname, req))| -> Result<ColumnDef, QueryError> {
1399 Ok(ColumnDef {
1400 name: fname.clone(),
1401 type_id: type_name_to_id(tname).map_err(QueryError::TypeError)?,
1402 required: *req,
1403 position: i as u16,
1404 })
1405 },
1406 )
1407 .collect::<Result<Vec<_>, _>>()?;
1408 let schema = Schema {
1409 table_name: name.clone(),
1410 columns,
1411 };
1412 self.catalog
1413 .create_table(schema)
1414 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1415 Ok(QueryResult::Created(name.clone()))
1416 }
1417
1418 PlanNode::AlterTable { table, action } => match action {
1419 AlterAction::AddColumn {
1420 name,
1421 type_name,
1422 required,
1423 } => {
1424 let position = self
1425 .catalog
1426 .schema(table)
1427 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1428 .columns
1429 .len() as u16;
1430 let col = ColumnDef {
1431 name: name.clone(),
1432 type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1433 required: *required,
1434 position,
1435 };
1436 self.catalog
1437 .alter_table_add_column(table, col)
1438 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1439 Ok(QueryResult::Executed {
1440 message: format!("column '{name}' added to '{table}'"),
1441 })
1442 }
1443 AlterAction::DropColumn { name } => {
1444 self.catalog
1445 .alter_table_drop_column(table, name)
1446 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1447 Ok(QueryResult::Executed {
1448 message: format!("column '{name}' dropped from '{table}'"),
1449 })
1450 }
1451 AlterAction::AddIndex { column } => {
1452 self.catalog
1453 .create_index(table, column)
1454 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1455 Ok(QueryResult::Executed {
1456 message: format!("index on '{table}.{column}' created"),
1457 })
1458 }
1459 },
1460
1461 PlanNode::DropTable { name } => {
1462 self.catalog
1463 .drop_table(name)
1464 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1465 Ok(QueryResult::Executed {
1466 message: format!("table '{name}' dropped"),
1467 })
1468 }
1469
1470 PlanNode::CreateView { name, query_text } => {
1471 self.create_view(name, query_text)?;
1472 Ok(QueryResult::Executed {
1473 message: format!("materialized view '{name}' created"),
1474 })
1475 }
1476
1477 PlanNode::RefreshView { name } => {
1478 self.refresh_view(name)?;
1479 Ok(QueryResult::Executed {
1480 message: format!("materialized view '{name}' refreshed"),
1481 })
1482 }
1483
1484 PlanNode::DropView { name } => {
1485 self.drop_view(name)?;
1486 Ok(QueryResult::Executed {
1487 message: format!("materialized view '{name}' dropped"),
1488 })
1489 }
1490
1491 PlanNode::Window { input, windows } => {
1492 let result = self.execute_plan(input)?;
1493 execute_window(result, windows)
1494 }
1495
1496 PlanNode::Union { left, right, all } => {
1497 let left_result = self.execute_plan(left)?;
1498 let right_result = self.execute_plan(right)?;
1499 let (left_cols, left_rows) = match left_result {
1500 QueryResult::Rows { columns, rows } => (columns, rows),
1501 _ => return Err("UNION requires query results on left side".into()),
1502 };
1503 let (_, right_rows) = match right_result {
1504 QueryResult::Rows { columns, rows } => (columns, rows),
1505 _ => return Err("UNION requires query results on right side".into()),
1506 };
1507 let mut combined = left_rows;
1508 if *all {
1509 // UNION ALL — just concatenate.
1510 combined.extend(right_rows);
1511 } else {
1512 // UNION — deduplicate using the same HashSet approach
1513 // as DISTINCT. Value already implements Hash + Eq.
1514 let mut seen = std::collections::HashSet::new();
1515 for row in &combined {
1516 seen.insert(row.clone());
1517 }
1518 for row in right_rows {
1519 if seen.insert(row.clone()) {
1520 combined.push(row);
1521 }
1522 }
1523 }
1524 Ok(QueryResult::Rows {
1525 columns: left_cols,
1526 rows: combined,
1527 })
1528 }
1529
1530 PlanNode::Explain { input } => {
1531 let text = format_plan_tree(input, 0);
1532 Ok(QueryResult::Rows {
1533 columns: vec!["plan".to_string()],
1534 rows: text
1535 .lines()
1536 .map(|line| vec![Value::Str(line.to_string())])
1537 .collect(),
1538 })
1539 }
1540
1541 PlanNode::Begin => {
1542 if self.in_transaction {
1543 return Err(QueryError::Execution(
1544 "already in a transaction (nested transactions not supported)".into(),
1545 ));
1546 }
1547 self.in_transaction = true;
1548 Ok(QueryResult::Executed {
1549 message: "transaction started".to_string(),
1550 })
1551 }
1552
1553 PlanNode::Commit => {
1554 if !self.in_transaction {
1555 return Err(QueryError::Execution(
1556 "no active transaction to commit".into(),
1557 ));
1558 }
1559 self.in_transaction = false;
1560 self.catalog
1561 .sync_wal()
1562 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1563 Ok(QueryResult::Executed {
1564 message: "transaction committed".to_string(),
1565 })
1566 }
1567
1568 PlanNode::Rollback => {
1569 if !self.in_transaction {
1570 return Err(QueryError::Execution(
1571 "no active transaction to roll back".into(),
1572 ));
1573 }
1574 self.in_transaction = false;
1575 self.catalog
1576 .rollback_to_last_sync()
1577 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1578 if let Ok(mut cache) = self.plan_cache.lock() {
1579 cache.clear();
1580 }
1581 self.view_registry = ViewRegistry::open(self.catalog.data_dir())
1582 .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
1583 Ok(QueryResult::Executed {
1584 message: "transaction rolled back".to_string(),
1585 })
1586 }
1587
1588 PlanNode::IndexScan { table, column, key } => {
1589 let key_value = literal_to_value(key)?;
1590 let tbl = self
1591 .catalog
1592 .get_table(table)
1593 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1594 let columns: Vec<String> =
1595 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1596
1597 // Fast path: the table has a B-tree on this column.
1598 // Uses index_lookup_all to return ALL matching rows for
1599 // both unique and non-unique indexes.
1600 if tbl.has_index(column) {
1601 let rids = tbl.index_lookup_all(column, &key_value);
1602 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1603 for rid in rids {
1604 if let Some(data) = tbl.heap.get(rid) {
1605 rows.push(decode_row(&tbl.schema, &data));
1606 }
1607 }
1608 return Ok(QueryResult::Rows { columns, rows });
1609 }
1610
1611 // Fallback: no index on this column. The planner emits IndexScan
1612 // eagerly (it has no visibility into which columns are indexed
1613 // at plan time), so here we must behave like SeqScan+Filter on
1614 // `.col = literal`: return *all* matching rows, not just the
1615 // first one. A non-indexed column isn't necessarily unique.
1616 // We compile the eq predicate once and stream without any
1617 // per-row decode for non-matching rows.
1618 let schema = &tbl.schema;
1619 let fast = FastLayout::new(schema);
1620 let synth_pred = Expr::BinaryOp(
1621 Box::new(Expr::Field(column.clone())),
1622 BinOp::Eq,
1623 Box::new(key.clone()),
1624 );
1625 if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1626 // Mission F: skip the first 4 Vec doublings.
1627 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1628 self.catalog
1629 .for_each_row_raw(table, |_rid, data| {
1630 if compiled(data) {
1631 rows.push(decode_row(schema, data));
1632 }
1633 })
1634 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1635 return Ok(QueryResult::Rows { columns, rows });
1636 }
1637
1638 // Last resort: slow eq-check on materialised rows.
1639 let col_idx =
1640 schema
1641 .column_index(column)
1642 .ok_or_else(|| QueryError::ColumnNotFound {
1643 table: String::new(),
1644 column: column.clone(),
1645 })?;
1646 let rows: Vec<Vec<Value>> = tbl
1647 .scan()
1648 .filter_map(|(_, row)| {
1649 if row[col_idx] == key_value {
1650 Some(row)
1651 } else {
1652 None
1653 }
1654 })
1655 .collect();
1656 Ok(QueryResult::Rows { columns, rows })
1657 }
1658
1659 PlanNode::RangeScan {
1660 table,
1661 column,
1662 start,
1663 end,
1664 } => {
1665 let tbl = self
1666 .catalog
1667 .get_table(table)
1668 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1669 let columns: Vec<String> =
1670 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1671 let schema = &tbl.schema;
1672
1673 let start_val = match start {
1674 Some((expr, _)) => Some(literal_to_value(expr)?),
1675 None => None,
1676 };
1677 let end_val = match end {
1678 Some((expr, _)) => Some(literal_to_value(expr)?),
1679 None => None,
1680 };
1681 let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1682 let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1683
1684 // Range scans only use the btree fast path for unique indexes,
1685 // because non-unique indexes store composite keys (column_value
1686 // + RowId) that don't directly compare against raw column values.
1687 if tbl.is_index_unique(column) == Some(true) {
1688 if let Some(btree) = tbl.index(column) {
1689 let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1690 (Some(s), Some(e)) => btree.range(s, e).collect(),
1691 (Some(s), None) => btree.range_from(s),
1692 (None, Some(e)) => btree.range_to(e),
1693 (None, None) => {
1694 let rows: Vec<Vec<Value>> =
1695 tbl.scan().map(|(_, row)| row).collect();
1696 return Ok(QueryResult::Rows { columns, rows });
1697 }
1698 };
1699 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1700 for (key, rid) in hits {
1701 if !start_inclusive {
1702 if let Some(ref s) = start_val {
1703 if &key == s {
1704 continue;
1705 }
1706 }
1707 }
1708 if !end_inclusive {
1709 if let Some(ref e) = end_val {
1710 if &key == e {
1711 continue;
1712 }
1713 }
1714 }
1715 if let Some(data) = tbl.heap.get(rid) {
1716 rows.push(decode_row(schema, &data));
1717 }
1718 }
1719 return Ok(QueryResult::Rows { columns, rows });
1720 }
1721 }
1722
1723 // Fallback: no index — synthesize range predicate and scan.
1724 let fast = FastLayout::new(schema);
1725 let synth = synthesize_range_predicate(column, start, end);
1726 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1727 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1728 self.catalog
1729 .for_each_row_raw(table, |_rid, data| {
1730 if compiled(data) {
1731 rows.push(decode_row(schema, data));
1732 }
1733 })
1734 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1735 return Ok(QueryResult::Rows { columns, rows });
1736 }
1737
1738 let col_idx =
1739 schema
1740 .column_index(column)
1741 .ok_or_else(|| QueryError::ColumnNotFound {
1742 table: String::new(),
1743 column: column.clone(),
1744 })?;
1745 let rows: Vec<Vec<Value>> = tbl
1746 .scan()
1747 .filter(|(_, row)| {
1748 range_matches(
1749 &row[col_idx],
1750 &start_val,
1751 start_inclusive,
1752 &end_val,
1753 end_inclusive,
1754 )
1755 })
1756 .map(|(_, row)| row)
1757 .collect();
1758 Ok(QueryResult::Rows { columns, rows })
1759 }
1760 }
1761 }
1762
1763 // ─── Materialized view operations ──────────────────────────────────────
1764
1765 /// Create a materialized view: execute the source query, store results
1766 /// in a new backing table, and register the view.
1767 fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1768 if self.view_registry.is_view(name) {
1769 return Err(QueryError::ViewError(format!(
1770 "materialized view '{name}' already exists"
1771 )));
1772 }
1773 // Execute the source query to get the result set.
1774 let result = self.execute_powql(query_text)?;
1775 let (columns, rows) = match result {
1776 QueryResult::Rows { columns, rows } => (columns, rows),
1777 _ => return Err("view source query must be a SELECT".into()),
1778 };
1779 // Derive a schema for the backing table from the query result columns.
1780 let schema = self.derive_view_schema(name, &columns, &rows);
1781 // Create the backing table and insert the result rows.
1782 self.catalog
1783 .create_table(schema)
1784 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1785 for row in &rows {
1786 self.catalog
1787 .insert(name, row)
1788 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1789 }
1790 // Determine which base tables this view depends on by parsing the query.
1791 let depends_on = self.extract_view_deps(query_text);
1792 self.view_registry
1793 .register(ViewDef {
1794 name: name.to_string(),
1795 query: query_text.to_string(),
1796 depends_on,
1797 dirty: false,
1798 })
1799 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1800 Ok(())
1801 }
1802
1803 /// Refresh a materialized view: re-execute its source query and replace
1804 /// the backing table's contents.
1805 fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1806 let def = self
1807 .view_registry
1808 .get(name)
1809 .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1810 let query_text = def.query.clone();
1811 // Execute the source query.
1812 let result = self.execute_powql(&query_text)?;
1813 let (_columns, rows) = match result {
1814 QueryResult::Rows { columns, rows } => (columns, rows),
1815 _ => return Err("view source query must be a SELECT".into()),
1816 };
1817 // Clear old data and insert fresh results. Mission B2: logged
1818 // variant — view refreshes are a mutation and crash recovery
1819 // must see them.
1820 self.catalog
1821 .scan_delete_matching_logged(name, |_| true)
1822 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1823 for row in &rows {
1824 self.catalog
1825 .insert(name, row)
1826 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1827 }
1828 self.view_registry.mark_clean(name);
1829 Ok(())
1830 }
1831
1832 /// Drop a materialized view: remove the backing table and unregister.
1833 fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1834 if !self.view_registry.is_view(name) {
1835 return Err(QueryError::ViewError(format!(
1836 "materialized view '{name}' not found"
1837 )));
1838 }
1839 self.view_registry
1840 .unregister(name)
1841 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1842 self.catalog
1843 .drop_table(name)
1844 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1845 Ok(())
1846 }
1847
1848 /// Derive a storage `Schema` for a view's backing table from query
1849 /// result column names and the first row's types.
1850 fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1851 use powdb_storage::types::{ColumnDef, TypeId};
1852 let cols: Vec<ColumnDef> = columns
1853 .iter()
1854 .enumerate()
1855 .map(|(i, col_name)| {
1856 let type_id = rows
1857 .first()
1858 .and_then(|row| row.get(i))
1859 .map(|v| v.type_id())
1860 .unwrap_or(TypeId::Str);
1861 ColumnDef {
1862 name: col_name.clone(),
1863 type_id,
1864 required: false,
1865 position: i as u16,
1866 }
1867 })
1868 .collect();
1869 Schema {
1870 table_name: name.to_string(),
1871 columns: cols,
1872 }
1873 }
1874
1875 /// Extract base table dependencies from a view's source query by
1876 /// parsing it and collecting the source table name.
1877 fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1878 use crate::parser::parse;
1879 match parse(query_text) {
1880 Ok(Statement::Query(q)) => {
1881 let mut deps = vec![q.source.clone()];
1882 for j in &q.joins {
1883 deps.push(j.source.clone());
1884 }
1885 deps
1886 }
1887 _ => Vec::new(),
1888 }
1889 }
1890
1891 // ─── Specialized fast paths ─────────────────────────────────────────────
1892 //
1893 // These methods are helpers for the `execute_plan` match arms above.
1894 // Each returns `Ok(Some(result))` when the fast path fires, `Ok(None)`
1895 // when the shape isn't supported (caller falls back to generic code).
1896
1897 /// Aggregate sum/avg/min/max over a single fixed-size i64 column, with
1898 /// an optional compiled filter predicate. Walks raw row bytes — zero
1899 /// per-row allocation. Uses i128 accumulator for sum/avg overflow safety.
1900 pub(super) fn agg_single_col_fast(
1901 &self,
1902 table: &str,
1903 col: &str,
1904 function: AggFunc,
1905 predicate: Option<&Expr>,
1906 ) -> Result<Option<QueryResult>, QueryError> {
1907 let schema = self
1908 .catalog
1909 .schema(table)
1910 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1911 .clone();
1912 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
1913 let col_idx = match schema.column_index(col) {
1914 Some(i) => i,
1915 None => return Ok(None),
1916 };
1917 // Only fast-path fixed-size numeric columns (Int/Float) for
1918 // sum/avg/min/max/count. Mission D10: Float parity — prior version
1919 // bailed on Float columns, forcing them through the generic row-
1920 // decoding path that allocated a Vec<Value> per row and dispatched
1921 // on Value::cmp for every compare. f64 decode is structurally the
1922 // same as i64 (load 8 bytes, cast), so the fast path handles both.
1923 let col_type = schema.columns[col_idx].type_id;
1924 if col_type != TypeId::Int && col_type != TypeId::Float {
1925 return Ok(None);
1926 }
1927
1928 let fast = FastLayout::new(&schema);
1929 // Mission C Phase 20b: inline the numeric-column reader instead of
1930 // building a `Box<dyn Fn>`. Eliminates 100K vtable dispatches per
1931 // 100K-row agg scan — every reader call folds directly into the
1932 // hot loop below.
1933 let byte_offset = match fast.fixed_offsets[col_idx] {
1934 Some(o) => o,
1935 None => return Ok(None),
1936 };
1937 let bitmap_byte = col_idx / 8;
1938 let bitmap_bit = (col_idx % 8) as u32;
1939 let data_offset = 2 + fast.bitmap_size + byte_offset;
1940
1941 // Optional compiled filter.
1942 let compiled_pred: Option<CompiledPredicate> = match predicate {
1943 Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
1944 Some(c) => Some(c),
1945 None => return Ok(None), // let generic path handle it
1946 },
1947 None => None,
1948 };
1949
1950 // Mission C Phase 20b: specialize the inner loop per aggregate
1951 // function. The previous version ran a `match function { ... }`
1952 // *inside* the closure, which kept LLVM from producing optimal
1953 // scalar code for each variant (agg_max regressed ~23% vs the
1954 // baseline Box<dyn Fn> version even though per-row vtable cost
1955 // should have been strictly lower). Pushing the match out of the
1956 // hot loop lets each specialized body fold cleanly into
1957 // `for_each_row_raw` and removes a captured `AggFunc` + match
1958 // dispatch per row.
1959 //
1960 // Mission D10: same specialisation applies to the Float branch.
1961 // For Min/Max we use `f64::total_cmp` so the result matches
1962 // `Value::Ord` — this is the same ordering ORDER BY and the
1963 // top-N sort fast path use, keeping semantics consistent across
1964 // read paths (NaN compares as greatest, -0.0 < +0.0 for
1965 // deterministic tie-breaking).
1966 //
1967 // Mission D11 Phase 1: each inner loop now splits on presence of
1968 // a predicate (`if let Some(pred) = &compiled_pred`) so the hot
1969 // body never re-tests `Option` per row, and reads column bytes
1970 // via `read_i64_unchecked` / `read_f64_unchecked` helpers that
1971 // drop two bounds checks per row (null bitmap byte + value
1972 // slice). Safety is carried by the `FastLayout` invariant that
1973 // `data_offset + 8 <= row_len` for any fixed-size column; see
1974 // the helper doc comments. Hot loops are macro-generated so the
1975 // with-pred / no-pred split can't drift between variants.
1976 let result = match col_type {
1977 TypeId::Int => match function {
1978 AggFunc::Sum | AggFunc::Avg => {
1979 let mut sum_i128: i128 = 0;
1980 let mut count: i64 = 0;
1981 agg_int_loop!(
1982 self,
1983 table,
1984 compiled_pred,
1985 bitmap_byte,
1986 bitmap_bit,
1987 data_offset,
1988 |v: i64| {
1989 count += 1;
1990 sum_i128 += v as i128;
1991 }
1992 );
1993 if matches!(function, AggFunc::Sum) {
1994 let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
1995 QueryResult::Scalar(Value::Int(clamped))
1996 } else if count == 0 {
1997 QueryResult::Scalar(Value::Empty)
1998 } else {
1999 let avg = (sum_i128 as f64) / (count as f64);
2000 QueryResult::Scalar(Value::Float(avg))
2001 }
2002 }
2003 AggFunc::Min => {
2004 let mut min_v: Option<i64> = None;
2005 agg_int_loop!(
2006 self,
2007 table,
2008 compiled_pred,
2009 bitmap_byte,
2010 bitmap_bit,
2011 data_offset,
2012 |v: i64| {
2013 min_v = Some(match min_v {
2014 Some(m) => m.min(v),
2015 None => v,
2016 });
2017 }
2018 );
2019 QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
2020 }
2021 AggFunc::Max => {
2022 let mut max_v: Option<i64> = None;
2023 agg_int_loop!(
2024 self,
2025 table,
2026 compiled_pred,
2027 bitmap_byte,
2028 bitmap_bit,
2029 data_offset,
2030 |v: i64| {
2031 max_v = Some(match max_v {
2032 Some(m) => m.max(v),
2033 None => v,
2034 });
2035 }
2036 );
2037 QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
2038 }
2039 AggFunc::Count => {
2040 let mut count: i64 = 0;
2041 agg_int_loop!(
2042 self,
2043 table,
2044 compiled_pred,
2045 bitmap_byte,
2046 bitmap_bit,
2047 data_offset,
2048 |_v: i64| {
2049 count += 1;
2050 }
2051 );
2052 QueryResult::Scalar(Value::Int(count))
2053 }
2054 AggFunc::CountDistinct => {
2055 let mut seen = rustc_hash::FxHashSet::default();
2056 agg_int_loop!(
2057 self,
2058 table,
2059 compiled_pred,
2060 bitmap_byte,
2061 bitmap_bit,
2062 data_offset,
2063 |v: i64| {
2064 seen.insert(v);
2065 }
2066 );
2067 QueryResult::Scalar(Value::Int(seen.len() as i64))
2068 }
2069 },
2070 TypeId::Float => match function {
2071 AggFunc::Sum => {
2072 // Use a single f64 accumulator. Naive summation is
2073 // sufficient for MVP parity; if precision becomes an
2074 // issue on long scans we can upgrade to Kahan–Neumaier
2075 // compensated sum (~2x scalar cost, zero error growth).
2076 let mut sum: f64 = 0.0;
2077 agg_float_loop!(
2078 self,
2079 table,
2080 compiled_pred,
2081 bitmap_byte,
2082 bitmap_bit,
2083 data_offset,
2084 |v: f64| {
2085 sum += v;
2086 }
2087 );
2088 QueryResult::Scalar(Value::Float(sum))
2089 }
2090 AggFunc::Avg => {
2091 let mut sum: f64 = 0.0;
2092 let mut count: i64 = 0;
2093 agg_float_loop!(
2094 self,
2095 table,
2096 compiled_pred,
2097 bitmap_byte,
2098 bitmap_bit,
2099 data_offset,
2100 |v: f64| {
2101 sum += v;
2102 count += 1;
2103 }
2104 );
2105 if count == 0 {
2106 QueryResult::Scalar(Value::Empty)
2107 } else {
2108 QueryResult::Scalar(Value::Float(sum / count as f64))
2109 }
2110 }
2111 AggFunc::Min => {
2112 // `total_cmp` for deterministic NaN handling (matches
2113 // Value::Ord). NaN compares greatest, so Min will
2114 // correctly ignore it in favour of any finite value.
2115 let mut min_v: Option<f64> = None;
2116 agg_float_loop!(
2117 self,
2118 table,
2119 compiled_pred,
2120 bitmap_byte,
2121 bitmap_bit,
2122 data_offset,
2123 |v: f64| {
2124 min_v = Some(match min_v {
2125 Some(m) => {
2126 if v.total_cmp(&m).is_lt() {
2127 v
2128 } else {
2129 m
2130 }
2131 }
2132 None => v,
2133 });
2134 }
2135 );
2136 QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
2137 }
2138 AggFunc::Max => {
2139 let mut max_v: Option<f64> = None;
2140 agg_float_loop!(
2141 self,
2142 table,
2143 compiled_pred,
2144 bitmap_byte,
2145 bitmap_bit,
2146 data_offset,
2147 |v: f64| {
2148 max_v = Some(match max_v {
2149 Some(m) => {
2150 if v.total_cmp(&m).is_gt() {
2151 v
2152 } else {
2153 m
2154 }
2155 }
2156 None => v,
2157 });
2158 }
2159 );
2160 QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
2161 }
2162 AggFunc::Count => {
2163 let mut count: i64 = 0;
2164 agg_float_loop!(
2165 self,
2166 table,
2167 compiled_pred,
2168 bitmap_byte,
2169 bitmap_bit,
2170 data_offset,
2171 |_v: f64| {
2172 count += 1;
2173 }
2174 );
2175 QueryResult::Scalar(Value::Int(count))
2176 }
2177 AggFunc::CountDistinct => {
2178 // Hash on `f64::to_bits` — matches `Value::Hash`, so
2179 // distinct NaN bit patterns count as distinct and
2180 // -0.0/+0.0 count as distinct. Consistent with how
2181 // Float values are hashed in every other DISTINCT /
2182 // GROUP BY path.
2183 let mut seen = rustc_hash::FxHashSet::default();
2184 agg_float_loop!(
2185 self,
2186 table,
2187 compiled_pred,
2188 bitmap_byte,
2189 bitmap_bit,
2190 data_offset,
2191 |v: f64| {
2192 seen.insert(v.to_bits());
2193 }
2194 );
2195 QueryResult::Scalar(Value::Int(seen.len() as i64))
2196 }
2197 },
2198 _ => unreachable!("type guard above restricts to Int/Float"),
2199 };
2200 Ok(Some(result))
2201 }
2202
2203 /// `Project(Limit(Filter(SeqScan)))` and `Project(Limit(SeqScan))`.
2204 /// Streams rows, decodes only projected columns, stops at the limit.
2205 pub(super) fn project_filter_limit_fast(
2206 &self,
2207 table: &str,
2208 fields: &[ProjectField],
2209 limit: usize,
2210 predicate: Option<&Expr>,
2211 ) -> Result<Option<QueryResult>, QueryError> {
2212 let schema = self
2213 .catalog
2214 .schema(table)
2215 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2216 .clone();
2217 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2218
2219 // Each projection field must be a simple `.field` reference for this
2220 // fast path. Aliased or computed fields fall through.
2221 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2222 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2223 for f in fields {
2224 let name = match &f.expr {
2225 Expr::Field(n) => n.clone(),
2226 _ => return Ok(None),
2227 };
2228 let idx = match all_columns.iter().position(|c| c == &name) {
2229 Some(i) => i,
2230 None => return Ok(None),
2231 };
2232 proj_indices.push(idx);
2233 proj_columns.push(f.alias.clone().unwrap_or(name));
2234 }
2235
2236 let fast = FastLayout::new(&schema);
2237 let row_layout = RowLayout::new(&schema);
2238
2239 let compiled_pred: Option<CompiledPredicate> = match predicate {
2240 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2241 Some(c) => Some(c),
2242 None => return Ok(None),
2243 },
2244 None => None,
2245 };
2246
2247 let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2248 // Mission D2: use try_for_each_row_raw to actually stop iterating
2249 // once the limit is reached. The previous `done` flag only short-
2250 // circuited the closure body, so a `limit 100` over 100K rows still
2251 // walked all 100K slots — burning ~30x SQLite on scan_filter_project_top100.
2252 self.catalog
2253 .try_for_each_row_raw(table, |_rid, data| {
2254 use std::ops::ControlFlow;
2255 if let Some(ref pred) = compiled_pred {
2256 if !pred(data) {
2257 return ControlFlow::Continue(());
2258 }
2259 }
2260 let row: Vec<Value> = proj_indices
2261 .iter()
2262 .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2263 .collect();
2264 out.push(row);
2265 if out.len() >= limit {
2266 ControlFlow::Break(())
2267 } else {
2268 ControlFlow::Continue(())
2269 }
2270 })
2271 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2272
2273 Ok(Some(QueryResult::Rows {
2274 columns: proj_columns,
2275 rows: out,
2276 }))
2277 }
2278
2279 /// `Project(Limit(Sort(Filter(SeqScan))))` and `Project(Limit(Sort(SeqScan)))`.
2280 /// Bounded top-N heap over the sort key. Only the sort key needs to be
2281 /// read per row; projected columns are decoded only for the final
2282 /// winning rows when the heap drains.
2283 pub(super) fn project_filter_sort_limit_fast(
2284 &self,
2285 table: &str,
2286 fields: &[ProjectField],
2287 sort_field: &str,
2288 descending: bool,
2289 limit: usize,
2290 predicate: Option<&Expr>,
2291 ) -> Result<Option<QueryResult>, QueryError> {
2292 if limit == 0 {
2293 // Degenerate case — empty result. Let the generic path handle it
2294 // for proper column naming.
2295 return Ok(None);
2296 }
2297 let schema = self
2298 .catalog
2299 .schema(table)
2300 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2301 .clone();
2302 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2303
2304 // Sort key must be a fixed-size numeric column (Int or Float).
2305 // Mission D10: extended from Int-only. Float sort keys use a
2306 // sortable-u64 transform (see `f64_to_sortable_u64`) so the heap
2307 // path stays keyed on `u64` and the whole branch shape is
2308 // identical to the Int case — no new heap types, no `total_cmp`
2309 // closures in the hot loop.
2310 let sort_idx = match schema.column_index(sort_field) {
2311 Some(i) => i,
2312 None => return Ok(None),
2313 };
2314 let sort_col_type = schema.columns[sort_idx].type_id;
2315 if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2316 return Ok(None);
2317 }
2318
2319 // Each projection field must be a simple `.field`.
2320 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2321 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2322 for f in fields {
2323 let name = match &f.expr {
2324 Expr::Field(n) => n.clone(),
2325 _ => return Ok(None),
2326 };
2327 let idx = match all_columns.iter().position(|c| c == &name) {
2328 Some(i) => i,
2329 None => return Ok(None),
2330 };
2331 proj_indices.push(idx);
2332 proj_columns.push(f.alias.clone().unwrap_or(name));
2333 }
2334
2335 let fast = FastLayout::new(&schema);
2336 let row_layout = RowLayout::new(&schema);
2337 // Mission C Phase 20b: inline numeric-column reader (no Box<dyn Fn>).
2338 let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2339 Some(o) => o,
2340 None => return Ok(None),
2341 };
2342 let sort_bitmap_byte = sort_idx / 8;
2343 let sort_bitmap_bit = (sort_idx % 8) as u32;
2344 let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2345
2346 let compiled_pred: Option<CompiledPredicate> = match predicate {
2347 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2348 Some(c) => Some(c),
2349 None => return Ok(None),
2350 },
2351 None => None,
2352 };
2353
2354 // Bounded top-N heap. For `order .x desc limit N`, we want the N
2355 // largest values — use a min-heap so the smallest is at the top and
2356 // can be popped when a better candidate arrives. For ascending, use
2357 // a max-heap. We tie-break with a monotonic `seq` counter so the
2358 // result is deterministic and stable.
2359 //
2360 // To keep this simple we maintain two typed heaps and pick by
2361 // direction.
2362 let drained: Vec<Vec<u8>> = match sort_col_type {
2363 TypeId::Int => {
2364 let mut seq: u64 = 0;
2365 let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2366 BinaryHeap::with_capacity(limit);
2367 let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2368 BinaryHeap::with_capacity(limit);
2369
2370 self.catalog
2371 .for_each_row_raw(table, |_rid, data| {
2372 if let Some(ref pred) = compiled_pred {
2373 if !pred(data) {
2374 return;
2375 }
2376 }
2377 // Inlined int-column reader: null check + i64 decode.
2378 if data.len() < sort_data_offset + 8 {
2379 return;
2380 }
2381 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2382 if is_null {
2383 return;
2384 }
2385 let key = i64::from_le_bytes(
2386 data[sort_data_offset..sort_data_offset + 8]
2387 .try_into()
2388 .unwrap_or_else(|_| unreachable!()),
2389 );
2390 let id = seq;
2391 seq += 1;
2392
2393 if descending {
2394 if heap_desc.len() < limit {
2395 heap_desc.push(Reverse((key, id, data.to_vec())));
2396 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2397 if key > *top_key {
2398 heap_desc.pop();
2399 heap_desc.push(Reverse((key, id, data.to_vec())));
2400 }
2401 }
2402 } else if heap_asc.len() < limit {
2403 heap_asc.push((key, id, data.to_vec()));
2404 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2405 if key < *top_key {
2406 heap_asc.pop();
2407 heap_asc.push((key, id, data.to_vec()));
2408 }
2409 }
2410 })
2411 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2412
2413 let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2414 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2415 } else {
2416 heap_asc.into_iter().collect()
2417 };
2418 if descending {
2419 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2420 } else {
2421 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2422 }
2423 drained.into_iter().map(|(_, _, d)| d).collect()
2424 }
2425 TypeId::Float => {
2426 // Novel angle: rather than introducing a `TotalF64` newtype
2427 // with `Ord via total_cmp`, transform the f64 bit pattern
2428 // into a sortable `u64` so `BinaryHeap<u64>` orders exactly
2429 // like `f64::total_cmp` would. Classic trick: flip the sign
2430 // bit on positives, flip all bits on negatives. Result:
2431 // - NaN (sign=0) stays greatest, matching total_cmp
2432 // - -0.0 sorts before +0.0, matching total_cmp
2433 // - Hot loop is branch-cheap (one compare + one xor)
2434 let mut seq: u64 = 0;
2435 let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2436 BinaryHeap::with_capacity(limit);
2437 let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2438 BinaryHeap::with_capacity(limit);
2439
2440 self.catalog
2441 .for_each_row_raw(table, |_rid, data| {
2442 if let Some(ref pred) = compiled_pred {
2443 if !pred(data) {
2444 return;
2445 }
2446 }
2447 if data.len() < sort_data_offset + 8 {
2448 return;
2449 }
2450 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2451 if is_null {
2452 return;
2453 }
2454 let bits = u64::from_le_bytes(
2455 data[sort_data_offset..sort_data_offset + 8]
2456 .try_into()
2457 .unwrap_or_else(|_| unreachable!()),
2458 );
2459 let key = f64_bits_to_sortable_u64(bits);
2460 let id = seq;
2461 seq += 1;
2462
2463 if descending {
2464 if heap_desc.len() < limit {
2465 heap_desc.push(Reverse((key, id, data.to_vec())));
2466 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2467 if key > *top_key {
2468 heap_desc.pop();
2469 heap_desc.push(Reverse((key, id, data.to_vec())));
2470 }
2471 }
2472 } else if heap_asc.len() < limit {
2473 heap_asc.push((key, id, data.to_vec()));
2474 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2475 if key < *top_key {
2476 heap_asc.pop();
2477 heap_asc.push((key, id, data.to_vec()));
2478 }
2479 }
2480 })
2481 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2482
2483 let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2484 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2485 } else {
2486 heap_asc.into_iter().collect()
2487 };
2488 if descending {
2489 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2490 } else {
2491 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2492 }
2493 drained.into_iter().map(|(_, _, d)| d).collect()
2494 }
2495 _ => unreachable!("type guard above restricts to Int/Float"),
2496 };
2497
2498 let rows: Vec<Vec<Value>> = drained
2499 .into_iter()
2500 .map(|data| {
2501 proj_indices
2502 .iter()
2503 .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2504 .collect()
2505 })
2506 .collect();
2507
2508 Ok(Some(QueryResult::Rows {
2509 columns: proj_columns,
2510 rows,
2511 }))
2512 }
2513
2514 /// Gather the RowIds that a mutation should operate on, without
2515 /// materialising the full row set. Handles the shapes the planner emits
2516 /// for update/delete: SeqScan, IndexScan, and Filter(SeqScan). Other
2517 /// shapes fall back to `generic_rid_match`.
2518 ///
2519 /// Perf sprint: try to fuse the predicate evaluation and in-place
2520 /// byte-level mutation into a single heap walk. Returns `Some(result)`
2521 /// if the fused path fired, `None` to fall through to the generic
2522 /// two-pass code.
2523 ///
2524 /// Covers two shapes:
2525 /// 1. Fixed-width non-null literal assignments on non-indexed columns
2526 /// → byte-patch every matched row in place (row length unchanged).
2527 /// 2. Single var-col literal assignment on a non-indexed column
2528 /// → `patch_var_column_in_place` on every matched row (may shrink);
2529 /// rows that can't be patched in place are collected for fallback.
2530 fn try_fused_scan_update(
2531 &mut self,
2532 table: &str,
2533 predicate: &Expr,
2534 resolved: &[(usize, Value)],
2535 changed_cols: &[usize],
2536 ) -> Option<Result<QueryResult, QueryError>> {
2537 // Build compiled predicate. Requires a schema borrow that must be
2538 // dropped before we call scan_patch_matching_logged.
2539 let compiled = {
2540 let schema = self.catalog.schema(table)?;
2541 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2542 let fast = FastLayout::new(schema);
2543 compile_predicate(predicate, &columns, &fast, schema)?
2544 };
2545
2546 // ── Path 1: fixed-width fast patch ──────────────────────────
2547 let fixed_patches: Option<Vec<FastPatch>> = {
2548 let tbl = self.catalog.get_table(table)?;
2549 let schema = &tbl.schema;
2550 let all_fixed_nonnull = resolved
2551 .iter()
2552 .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2553 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2554 if all_fixed_nonnull && no_indexed {
2555 let layout = RowLayout::new(schema);
2556 let bitmap_size = layout.bitmap_size();
2557 Some(
2558 resolved
2559 .iter()
2560 .map(|(idx, val)| {
2561 let fixed_off = layout
2562 .fixed_offset(*idx)
2563 .expect("is_fixed_size already checked");
2564 let field_off = 2 + bitmap_size + fixed_off;
2565 let bytes: FixedBytes = match val {
2566 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2567 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2568 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2569 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2570 Value::Uuid(v) => FixedBytes::Uuid(*v),
2571 _ => unreachable!("all_fixed_nonnull guard"),
2572 };
2573 FastPatch {
2574 field_off,
2575 bitmap_byte_off: 2 + idx / 8,
2576 bit_mask: 1u8 << (idx % 8),
2577 bytes,
2578 }
2579 })
2580 .collect(),
2581 )
2582 } else {
2583 None
2584 }
2585 };
2586 if let Some(patches) = fixed_patches {
2587 let result = self
2588 .catalog
2589 .scan_patch_matching_logged(table, compiled, |row| {
2590 for p in &patches {
2591 row[p.bitmap_byte_off] &= !p.bit_mask;
2592 let field_bytes = p.bytes.as_slice();
2593 row[p.field_off..p.field_off + field_bytes.len()]
2594 .copy_from_slice(field_bytes);
2595 }
2596 Some(row.len() as u16)
2597 })
2598 .map_err(|e| e.to_string());
2599 match result {
2600 Ok((count, _)) => {
2601 self.view_registry.mark_dependents_dirty(table);
2602 return Some(Ok(QueryResult::Modified(count)));
2603 }
2604 Err(e) => return Some(Err(QueryError::Execution(e))),
2605 }
2606 }
2607
2608 // ── Path 2: single var-col shrink fast patch ────────────────
2609 let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2610 let tbl = self.catalog.get_table(table)?;
2611 let schema = &tbl.schema;
2612 let is_single = resolved.len() == 1;
2613 let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2614 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2615 if is_single && is_var && no_indexed {
2616 let (idx, val) = &resolved[0];
2617 let bytes_opt = match val {
2618 Value::Str(s) => Some(s.as_bytes().to_vec()),
2619 Value::Bytes(b) => Some(b.clone()),
2620 Value::Empty => None,
2621 _ => return None, // type mismatch, fall through
2622 };
2623 Some((*idx, bytes_opt))
2624 } else {
2625 None
2626 }
2627 };
2628 if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2629 // Build a fresh RowLayout before the mutable borrow.
2630 let layout = {
2631 let schema = self.catalog.schema(table)?;
2632 RowLayout::new(schema)
2633 };
2634 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2635 let result = self
2636 .catalog
2637 .scan_patch_matching_logged(table, compiled, |row| {
2638 patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2639 })
2640 .map_err(|e| e.to_string());
2641 match result {
2642 Ok((mut count, fallback_rids)) => {
2643 // Handle rows where in-place patch failed (new > old).
2644 for rid in fallback_rids {
2645 let mut row = match self.catalog.get(table, rid) {
2646 Some(r) => r,
2647 None => continue,
2648 };
2649 for (idx, val) in resolved.iter() {
2650 row[*idx] = val.clone();
2651 }
2652 self.catalog
2653 .update_hinted(table, rid, &row, Some(changed_cols))
2654 .map_err(|e| e.to_string())
2655 .ok();
2656 count += 1;
2657 }
2658 self.view_registry.mark_dependents_dirty(table);
2659 return Some(Ok(QueryResult::Modified(count)));
2660 }
2661 Err(e) => return Some(Err(QueryError::Execution(e))),
2662 }
2663 }
2664
2665 None // no fused path applicable — fall through
2666 }
2667
2668 /// Mission C Phase 3: schema is looked up via `self.catalog.schema(table)`
2669 /// inside the branches that actually need it. Previously the caller had
2670 /// to clone the full Schema (6+ String allocs) before every mutation just
2671 /// so this function could borrow it — a cost the update/delete hot path
2672 /// did not need.
2673 fn collect_rids_for_mutation(
2674 &mut self,
2675 input: &PlanNode,
2676 table: &str,
2677 ) -> Result<Vec<RowId>, QueryError> {
2678 match input {
2679 PlanNode::SeqScan { table: t } if t == table => {
2680 // "Update/delete everything" — rare but legal.
2681 let rids: Vec<RowId> = self
2682 .catalog
2683 .scan(table)
2684 .map_err(|e| QueryError::StorageError(e.to_string()))?
2685 .map(|(rid, _)| rid)
2686 .collect();
2687 Ok(rids)
2688 }
2689 PlanNode::IndexScan {
2690 table: t,
2691 column,
2692 key,
2693 } if t == table => {
2694 let key_value = literal_to_value(key)?;
2695
2696 // Indexed case: single lookup, 0 or 1 rows.
2697 // Mission D7: int-specialized fast path on int-keyed indexes
2698 // (primary keys, created_at, etc.) — the common case for
2699 // `update_by_pk` / `delete where id = ?`.
2700 //
2701 // Scope the `tbl` borrow so it's released before we fall
2702 // through to the scan-based paths below (which reborrow
2703 // `self.catalog`).
2704 {
2705 let tbl = self
2706 .catalog
2707 .get_table(table)
2708 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2709 if tbl.has_index(column) {
2710 let rids = tbl.index_lookup_all(column, &key_value);
2711 return Ok(rids);
2712 }
2713 }
2714
2715 // No index: the planner folds `.col = literal` to IndexScan
2716 // regardless of whether the column is actually unique. When
2717 // there's no index we must behave like Filter(SeqScan) and
2718 // return *all* matching RIDs — not just the first one.
2719 let schema = self
2720 .catalog
2721 .schema(table)
2722 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2723 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2724 let fast = FastLayout::new(schema);
2725 let synth = Expr::BinaryOp(
2726 Box::new(Expr::Field(column.clone())),
2727 BinOp::Eq,
2728 Box::new(key.clone()),
2729 );
2730 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2731 // Mission F: skip the first 4 Vec doublings.
2732 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2733 self.catalog
2734 .for_each_row_raw(table, |rid, data| {
2735 if compiled(data) {
2736 rids.push(rid);
2737 }
2738 })
2739 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2740 return Ok(rids);
2741 }
2742
2743 // Fallback: decode each row, compare values.
2744 let col_idx =
2745 schema
2746 .column_index(column)
2747 .ok_or_else(|| QueryError::ColumnNotFound {
2748 table: String::new(),
2749 column: column.clone(),
2750 })?;
2751 let rids: Vec<RowId> = self
2752 .catalog
2753 .scan(table)
2754 .map_err(|e| QueryError::StorageError(e.to_string()))?
2755 .filter_map(|(rid, row)| {
2756 if row[col_idx] == key_value {
2757 Some(rid)
2758 } else {
2759 None
2760 }
2761 })
2762 .collect();
2763 Ok(rids)
2764 }
2765 PlanNode::Filter {
2766 input: inner,
2767 predicate,
2768 } => {
2769 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2770 if t != table {
2771 return self.generic_rid_match(input, table);
2772 }
2773 let schema = self
2774 .catalog
2775 .schema(table)
2776 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2777 let columns: Vec<String> =
2778 schema.columns.iter().map(|c| c.name.clone()).collect();
2779 let fast = FastLayout::new(schema);
2780 let row_layout = RowLayout::new(schema);
2781
2782 // Try compiled predicate first.
2783 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2784 // Mission F: skip the first 4 Vec doublings.
2785 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2786 self.catalog
2787 .for_each_row_raw(table, |rid, data| {
2788 if compiled(data) {
2789 rids.push(rid);
2790 }
2791 })
2792 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2793 return Ok(rids);
2794 }
2795
2796 // Fallback: selective decode + eval.
2797 let pred_cols = predicate_column_indices(predicate, &columns);
2798 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2799 self.catalog
2800 .for_each_row_raw(table, |rid, data| {
2801 let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2802 if eval_predicate(predicate, &pred_row, &columns) {
2803 rids.push(rid);
2804 }
2805 })
2806 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2807 return Ok(rids);
2808 }
2809 self.generic_rid_match(input, table)
2810 }
2811 _ => self.generic_rid_match(input, table),
2812 }
2813 }
2814
2815 /// Last-ditch generic match: execute the plan, collect matching rows,
2816 /// then find corresponding RowIds by value equality. This is the old
2817 /// O(N*M) code path; only used when the plan shape is something exotic.
2818 fn generic_rid_match(
2819 &mut self,
2820 input: &PlanNode,
2821 table: &str,
2822 ) -> Result<Vec<RowId>, QueryError> {
2823 let result = self.execute_plan(input)?;
2824 let rows = match result {
2825 QueryResult::Rows { rows, .. } => rows,
2826 _ => return Err("mutation source must be rows".into()),
2827 };
2828 let matching: Vec<RowId> = self
2829 .catalog
2830 .scan(table)
2831 .map_err(|e| QueryError::StorageError(e.to_string()))?
2832 .filter(|(_, row)| rows.iter().any(|r| r == row))
2833 .map(|(rid, _)| rid)
2834 .collect();
2835 Ok(matching)
2836 }
2837}
2838
2839pub(super) fn execute_window(
2840 result: QueryResult,
2841 windows: &[WindowDef],
2842) -> Result<QueryResult, QueryError> {
2843 let (mut columns, mut rows) = match result {
2844 QueryResult::Rows { columns, rows } => (columns, rows),
2845 _ => return Err("window function requires row input".into()),
2846 };
2847
2848 for wdef in windows {
2849 // Resolve partition/order column indices against current columns.
2850 let part_indices: Vec<usize> = wdef
2851 .partition_by
2852 .iter()
2853 .map(|name| {
2854 columns
2855 .iter()
2856 .position(|c| c == name)
2857 .ok_or_else(|| format!("window partition column '{name}' not found"))
2858 })
2859 .collect::<Result<Vec<_>, _>>()?;
2860
2861 let ord_indices: Vec<(usize, bool)> = wdef
2862 .order_by
2863 .iter()
2864 .map(|sk| {
2865 columns
2866 .iter()
2867 .position(|c| c == &sk.field)
2868 .map(|i| (i, sk.descending))
2869 .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2870 })
2871 .collect::<Result<Vec<_>, _>>()?;
2872
2873 // Resolve the argument column index (for aggregate windows).
2874 let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2875 match arg {
2876 Expr::Field(name) => {
2877 if name == "*" {
2878 None // count(*) style — no specific column
2879 } else {
2880 Some(
2881 columns
2882 .iter()
2883 .position(|c| c == name)
2884 .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2885 )
2886 }
2887 }
2888 _ => None,
2889 }
2890 } else {
2891 None
2892 };
2893
2894 // Build a sort-index to sort rows by partition_by then order_by
2895 // without actually reordering the original Vec (we need original
2896 // order to write results back).
2897 let n = rows.len();
2898 let mut indices: Vec<usize> = (0..n).collect();
2899 indices.sort_by(|&a, &b| {
2900 // Compare partition keys first.
2901 for &pi in &part_indices {
2902 let cmp = rows[a][pi].cmp(&rows[b][pi]);
2903 if cmp != std::cmp::Ordering::Equal {
2904 return cmp;
2905 }
2906 }
2907 // Then order keys.
2908 for &(oi, desc) in &ord_indices {
2909 let cmp = rows[a][oi].cmp(&rows[b][oi]);
2910 if cmp != std::cmp::Ordering::Equal {
2911 return if desc { cmp.reverse() } else { cmp };
2912 }
2913 }
2914 std::cmp::Ordering::Equal
2915 });
2916
2917 // Compute window values in sorted order, tracking partition boundaries.
2918 let mut win_values: Vec<Value> = vec![Value::Empty; n];
2919 let mut partition_start = 0usize;
2920 // Running state for aggregate windows:
2921 let mut running_count: i64 = 0;
2922 let mut running_int_sum: i64 = 0;
2923 let mut running_float_sum: f64 = 0.0;
2924 let mut running_saw_float = false;
2925 let mut running_min: Option<Value> = None;
2926 let mut running_max: Option<Value> = None;
2927 let mut rank_counter: i64 = 0;
2928 let mut dense_rank_counter: i64 = 0;
2929 let mut prev_order_key: Option<Vec<Value>> = None;
2930 let mut same_rank_count: i64 = 0;
2931
2932 for sorted_pos in 0..n {
2933 let row_idx = indices[sorted_pos];
2934
2935 // Detect partition boundary.
2936 let new_partition = if sorted_pos == 0 {
2937 true
2938 } else {
2939 let prev_row_idx = indices[sorted_pos - 1];
2940 part_indices
2941 .iter()
2942 .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
2943 };
2944
2945 if new_partition {
2946 partition_start = sorted_pos;
2947 running_count = 0;
2948 running_int_sum = 0;
2949 running_float_sum = 0.0;
2950 running_saw_float = false;
2951 running_min = None;
2952 running_max = None;
2953 rank_counter = 0;
2954 dense_rank_counter = 0;
2955 prev_order_key = None;
2956 same_rank_count = 0;
2957 }
2958
2959 // Extract current order key for rank tracking.
2960 let current_order_key: Vec<Value> = ord_indices
2961 .iter()
2962 .map(|&(oi, _)| rows[row_idx][oi].clone())
2963 .collect();
2964 let same_as_prev = prev_order_key.as_ref() == Some(¤t_order_key);
2965
2966 let value = match wdef.function {
2967 WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
2968 WindowFunc::Rank => {
2969 if same_as_prev {
2970 same_rank_count += 1;
2971 } else {
2972 rank_counter += same_rank_count + 1;
2973 same_rank_count = 0;
2974 if rank_counter == 0 {
2975 rank_counter = 1;
2976 }
2977 }
2978 Value::Int(rank_counter)
2979 }
2980 WindowFunc::DenseRank => {
2981 if !same_as_prev {
2982 dense_rank_counter += 1;
2983 }
2984 Value::Int(dense_rank_counter)
2985 }
2986 WindowFunc::Sum => {
2987 if let Some(ci) = arg_col_idx {
2988 match &rows[row_idx][ci] {
2989 Value::Int(v) => running_int_sum += v,
2990 Value::Float(v) => {
2991 running_float_sum += v;
2992 running_saw_float = true;
2993 }
2994 _ => {}
2995 }
2996 }
2997 if running_saw_float {
2998 Value::Float(running_float_sum + running_int_sum as f64)
2999 } else {
3000 Value::Int(running_int_sum)
3001 }
3002 }
3003 WindowFunc::Avg => {
3004 if let Some(ci) = arg_col_idx {
3005 match &rows[row_idx][ci] {
3006 Value::Int(v) => {
3007 running_float_sum += *v as f64;
3008 running_count += 1;
3009 }
3010 Value::Float(v) => {
3011 running_float_sum += v;
3012 running_count += 1;
3013 }
3014 _ => {}
3015 }
3016 }
3017 if running_count == 0 {
3018 Value::Empty
3019 } else {
3020 Value::Float(running_float_sum / running_count as f64)
3021 }
3022 }
3023 WindowFunc::Count => {
3024 if let Some(ci) = arg_col_idx {
3025 if !rows[row_idx][ci].is_empty() {
3026 running_count += 1;
3027 }
3028 } else {
3029 // count(*) — count all rows
3030 running_count += 1;
3031 }
3032 Value::Int(running_count)
3033 }
3034 WindowFunc::Min => {
3035 if let Some(ci) = arg_col_idx {
3036 let v = &rows[row_idx][ci];
3037 if !v.is_empty() {
3038 running_min = Some(match &running_min {
3039 None => v.clone(),
3040 Some(cur) => {
3041 if v < cur {
3042 v.clone()
3043 } else {
3044 cur.clone()
3045 }
3046 }
3047 });
3048 }
3049 }
3050 running_min.clone().unwrap_or(Value::Empty)
3051 }
3052 WindowFunc::Max => {
3053 if let Some(ci) = arg_col_idx {
3054 let v = &rows[row_idx][ci];
3055 if !v.is_empty() {
3056 running_max = Some(match &running_max {
3057 None => v.clone(),
3058 Some(cur) => {
3059 if v > cur {
3060 v.clone()
3061 } else {
3062 cur.clone()
3063 }
3064 }
3065 });
3066 }
3067 }
3068 running_max.clone().unwrap_or(Value::Empty)
3069 }
3070 };
3071
3072 prev_order_key = Some(current_order_key);
3073 win_values[row_idx] = value;
3074 }
3075
3076 // Append the computed window column to each row.
3077 for (ri, row) in rows.iter_mut().enumerate() {
3078 row.push(win_values[ri].clone());
3079 }
3080 columns.push(wdef.output_name.clone());
3081 }
3082
3083 Ok(QueryResult::Rows { columns, rows })
3084}
3085
3086/// Mission E2b: compute one aggregate over a set of rows in a group.
3087pub(super) fn compute_group_aggregate(
3088 func: AggFunc,
3089 all_rows: &[Vec<Value>],
3090 row_indices: &[usize],
3091 col_idx: usize,
3092) -> Value {
3093 match func {
3094 AggFunc::Count => {
3095 if col_idx == usize::MAX {
3096 // count(*) — count all rows in the group.
3097 return Value::Int(row_indices.len() as i64);
3098 }
3099 let count = row_indices
3100 .iter()
3101 .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3102 .count();
3103 Value::Int(count as i64)
3104 }
3105 AggFunc::CountDistinct => {
3106 let mut seen = std::collections::HashSet::new();
3107 for &ri in row_indices {
3108 let v = &all_rows[ri][col_idx];
3109 if !v.is_empty() {
3110 seen.insert(v.clone());
3111 }
3112 }
3113 Value::Int(seen.len() as i64)
3114 }
3115 AggFunc::Sum => {
3116 // Mirror the scalar Sum path: accumulate int and float
3117 // contributions separately and promote the final result to
3118 // Float if any Float row was observed. Prevents silent
3119 // drop of Float columns in GROUP BY aggregates.
3120 let mut int_sum: i64 = 0;
3121 let mut float_sum: f64 = 0.0;
3122 let mut saw_float = false;
3123 for &ri in row_indices {
3124 match &all_rows[ri][col_idx] {
3125 Value::Int(v) => int_sum += v,
3126 Value::Float(v) => {
3127 float_sum += *v;
3128 saw_float = true;
3129 }
3130 _ => {}
3131 }
3132 }
3133 if saw_float {
3134 Value::Float(float_sum + int_sum as f64)
3135 } else {
3136 Value::Int(int_sum)
3137 }
3138 }
3139 AggFunc::Avg => {
3140 let mut sum = 0.0f64;
3141 let mut count = 0usize;
3142 for &ri in row_indices {
3143 match &all_rows[ri][col_idx] {
3144 Value::Int(v) => {
3145 sum += *v as f64;
3146 count += 1;
3147 }
3148 Value::Float(v) => {
3149 sum += *v;
3150 count += 1;
3151 }
3152 _ => {}
3153 }
3154 }
3155 if count == 0 {
3156 Value::Empty
3157 } else {
3158 Value::Float(sum / count as f64)
3159 }
3160 }
3161 AggFunc::Min => row_indices
3162 .iter()
3163 .map(|&ri| &all_rows[ri][col_idx])
3164 .filter(|v| !v.is_empty())
3165 .min()
3166 .cloned()
3167 .unwrap_or(Value::Empty),
3168 AggFunc::Max => row_indices
3169 .iter()
3170 .map(|&ri| &all_rows[ri][col_idx])
3171 .filter(|v| !v.is_empty())
3172 .max()
3173 .cloned()
3174 .unwrap_or(Value::Empty),
3175 }
3176}
3177
3178/// Mission E1.3: try to extract equi-join key indices from a join `on`
3179/// predicate. Returns `Some((left_col_idx, right_col_idx))` when the
3180/// predicate is exactly `L = R` (or `R = L`) and both sides resolve
3181/// cleanly — `L` to the left subtree's column list and `R` to the right
3182/// subtree's column list.
3183///
3184/// This is deliberately narrow. We only recognise the two shapes:
3185/// * `QualifiedField = QualifiedField` (`u.id = o.user_id`)
3186/// * `Field = Field` (`.id = .user_id`, unqualified)
3187///
3188/// Anything else — conjunctions, constants, function calls, or predicates
3189/// that touch the same side on both halves — falls through to the
3190/// nested-loop path unchanged.
3191pub(super) fn try_extract_equi_join_keys(
3192 pred: &Expr,
3193 left_columns: &[String],
3194 right_columns: &[String],
3195) -> Option<(usize, usize)> {
3196 let (lhs, op, rhs) = match pred {
3197 Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3198 _ => return None,
3199 };
3200 if op != BinOp::Eq {
3201 return None;
3202 }
3203 // Normal orientation: lhs in left, rhs in right.
3204 if let (Some(li), Some(ri)) = (
3205 resolve_side_column(lhs, left_columns),
3206 resolve_side_column(rhs, right_columns),
3207 ) {
3208 return Some((li, ri));
3209 }
3210 // Swapped: rhs in left, lhs in right. Both sides of `=` are
3211 // commutative so this is safe.
3212 if let (Some(li), Some(ri)) = (
3213 resolve_side_column(rhs, left_columns),
3214 resolve_side_column(lhs, right_columns),
3215 ) {
3216 return Some((li, ri));
3217 }
3218 None
3219}
3220
3221fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3222 match expr {
3223 Expr::QualifiedField { qualifier, field } => {
3224 // Byte-level match so we don't allocate a fresh `format!` on
3225 // every call — this runs once per plan, so allocation would be
3226 // cheap, but the match is trivial enough to keep inline with
3227 // the eval_expr version.
3228 let q = qualifier.as_bytes();
3229 let f = field.as_bytes();
3230 columns.iter().position(|c| {
3231 let b = c.as_bytes();
3232 b.len() == q.len() + 1 + f.len()
3233 && b[..q.len()] == *q
3234 && b[q.len()] == b'.'
3235 && b[q.len() + 1..] == *f
3236 })
3237 }
3238 Expr::Field(name) => columns.iter().position(|c| c == name),
3239 _ => None,
3240 }
3241}
3242
3243/// Mission E1.3: O(L + R) hash join. Builds a `FxHashMap<Value, Vec<usize>>`
3244/// over the right (inner) side's join keys, then streams the left (outer)
3245/// side and for each probe row emits every combined row whose right-side
3246/// key matches. For `JoinKind::LeftOuter`, unmatched left rows are emitted
3247/// padded with `Value::Empty` on the right side.
3248///
3249/// The right side is always the build side. That choice is forced for
3250/// LeftOuter (the left side must stream so we can detect orphans), and
3251/// for Inner it's a reasonable default — left-deep plans tend to grow the
3252/// left side with each join, so the un-joined right leaf is often the
3253/// smaller of the two at each level.
3254pub(super) fn hash_join(
3255 left_columns: Vec<String>,
3256 left_rows: Vec<Vec<Value>>,
3257 right_columns: Vec<String>,
3258 right_rows: Vec<Vec<Value>>,
3259 left_key_idx: usize,
3260 right_key_idx: usize,
3261 kind: JoinKind,
3262) -> QueryResult {
3263 use rustc_hash::FxHashMap;
3264
3265 let n_left = left_columns.len();
3266 let n_right = right_columns.len();
3267 let mut columns = Vec::with_capacity(n_left + n_right);
3268 columns.extend(left_columns);
3269 columns.extend(right_columns);
3270
3271 // Build: right_key -> list of right-row indices. Pre-size to the row
3272 // count so the map doesn't rehash mid-build.
3273 let mut build: FxHashMap<Value, Vec<usize>> =
3274 FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3275 for (i, row) in right_rows.iter().enumerate() {
3276 // Skip Empty keys on the build side — they can never match under
3277 // SQL semantics (NULL ≠ NULL) and would collapse all nullables to
3278 // one bucket.
3279 if matches!(row[right_key_idx], Value::Empty) {
3280 continue;
3281 }
3282 build.entry(row[right_key_idx].clone()).or_default().push(i);
3283 }
3284
3285 // Reasonable starting capacity — inner joins produce ≥ left_rows.len()
3286 // rows in the common 1:1 case, left-outer always emits ≥ left_rows.len().
3287 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3288
3289 for left_row in &left_rows {
3290 let key = &left_row[left_key_idx];
3291 let matched = if matches!(key, Value::Empty) {
3292 None
3293 } else {
3294 build.get(key)
3295 };
3296 match matched {
3297 Some(matches) if !matches.is_empty() => {
3298 for &ri in matches {
3299 let right_row = &right_rows[ri];
3300 let mut combined = Vec::with_capacity(n_left + n_right);
3301 combined.extend_from_slice(left_row);
3302 combined.extend_from_slice(right_row);
3303 rows.push(combined);
3304 }
3305 }
3306 _ => {
3307 if matches!(kind, JoinKind::LeftOuter) {
3308 let mut row = Vec::with_capacity(n_left + n_right);
3309 row.extend_from_slice(left_row);
3310 row.resize(n_left + n_right, Value::Empty);
3311 rows.push(row);
3312 }
3313 }
3314 }
3315 }
3316
3317 QueryResult::Rows { columns, rows }
3318}
3319
3320/// Lower unindexed `RangeScan` nodes to `Filter(SeqScan)` so that all
3321/// downstream fast paths (count, project+limit, sort+limit, agg, update,
3322/// delete) continue to fire.
3323///
3324/// The planner emits `RangeScan` speculatively for every range inequality
3325/// (`.age > 30`) because it has no catalog access. When the column has a
3326/// B-tree index, `RangeScan` is the correct plan. When it doesn't, the
3327/// executor's `RangeScan` fallback materialises every matching row with
3328/// full `decode_row` — bypassing the compiled-predicate fast paths that
3329/// `Filter(SeqScan)` would trigger.
3330///
3331/// This pass runs once per query, before execution.
3332pub(super) fn lower_unindexed_range_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3333 match plan {
3334 PlanNode::RangeScan {
3335 table,
3336 column,
3337 start,
3338 end,
3339 } => {
3340 if let Some(tbl) = catalog.get_table(table) {
3341 // Keep RangeScan only for unique indexes — their btree
3342 // stores raw column values. Non-unique indexes store
3343 // composite keys that don't directly compare against
3344 // column values, so lower them to Filter(SeqScan).
3345 if tbl.is_index_unique(column) == Some(true) {
3346 return plan.clone();
3347 }
3348 }
3349 let pred = synthesize_range_predicate(column, start, end);
3350 PlanNode::Filter {
3351 input: Box::new(PlanNode::SeqScan {
3352 table: table.clone(),
3353 }),
3354 predicate: pred,
3355 }
3356 }
3357 PlanNode::Filter { input, predicate } => PlanNode::Filter {
3358 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3359 predicate: predicate.clone(),
3360 },
3361 PlanNode::Project { input, fields } => PlanNode::Project {
3362 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3363 fields: fields.clone(),
3364 },
3365 PlanNode::Sort { input, keys } => PlanNode::Sort {
3366 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3367 keys: keys.clone(),
3368 },
3369 PlanNode::Limit { input, count } => PlanNode::Limit {
3370 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3371 count: count.clone(),
3372 },
3373 PlanNode::Offset { input, count } => PlanNode::Offset {
3374 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3375 count: count.clone(),
3376 },
3377 PlanNode::Aggregate {
3378 input,
3379 function,
3380 field,
3381 } => PlanNode::Aggregate {
3382 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3383 function: *function,
3384 field: field.clone(),
3385 },
3386 PlanNode::Distinct { input } => PlanNode::Distinct {
3387 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3388 },
3389 PlanNode::GroupBy {
3390 input,
3391 keys,
3392 aggregates,
3393 having,
3394 } => PlanNode::GroupBy {
3395 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3396 keys: keys.clone(),
3397 aggregates: aggregates.clone(),
3398 having: having.clone(),
3399 },
3400 PlanNode::Update {
3401 input,
3402 table,
3403 assignments,
3404 } => PlanNode::Update {
3405 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3406 table: table.clone(),
3407 assignments: assignments.clone(),
3408 },
3409 PlanNode::Delete { input, table } => PlanNode::Delete {
3410 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3411 table: table.clone(),
3412 },
3413 PlanNode::Window { input, windows } => PlanNode::Window {
3414 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3415 windows: windows.clone(),
3416 },
3417 PlanNode::Union { left, right, all } => PlanNode::Union {
3418 left: Box::new(lower_unindexed_range_scans(catalog, left)),
3419 right: Box::new(lower_unindexed_range_scans(catalog, right)),
3420 all: *all,
3421 },
3422 PlanNode::Explain { input } => PlanNode::Explain {
3423 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3424 },
3425 PlanNode::NestedLoopJoin {
3426 left,
3427 right,
3428 on,
3429 kind,
3430 } => PlanNode::NestedLoopJoin {
3431 left: Box::new(lower_unindexed_range_scans(catalog, left)),
3432 right: Box::new(lower_unindexed_range_scans(catalog, right)),
3433 on: on.clone(),
3434 kind: *kind,
3435 },
3436 // Leaf nodes: no children to recurse into.
3437 _ => plan.clone(),
3438 }
3439}
3440
3441/// Synthesize a range predicate from RangeScan bounds for the fallback path.
3442pub(super) fn synthesize_range_predicate(
3443 column: &str,
3444 start: &Option<(Expr, bool)>,
3445 end: &Option<(Expr, bool)>,
3446) -> Expr {
3447 let lower = start.as_ref().map(|(expr, inclusive)| {
3448 let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3449 Expr::BinaryOp(
3450 Box::new(Expr::Field(column.to_string())),
3451 op,
3452 Box::new(expr.clone()),
3453 )
3454 });
3455 let upper = end.as_ref().map(|(expr, inclusive)| {
3456 let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3457 Expr::BinaryOp(
3458 Box::new(Expr::Field(column.to_string())),
3459 op,
3460 Box::new(expr.clone()),
3461 )
3462 });
3463 match (lower, upper) {
3464 (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3465 (Some(l), None) => l,
3466 (None, Some(u)) => u,
3467 (None, None) => Expr::Literal(Literal::Bool(true)),
3468 }
3469}
3470
3471/// Check if a value falls within a range (used in last-resort decoded-row eval).
3472pub(super) fn range_matches(
3473 val: &Value,
3474 start: &Option<Value>,
3475 start_inc: bool,
3476 end: &Option<Value>,
3477 end_inc: bool,
3478) -> bool {
3479 if let Some(ref s) = start {
3480 if start_inc {
3481 if val < s {
3482 return false;
3483 }
3484 } else if val <= s {
3485 return false;
3486 }
3487 }
3488 if let Some(ref e) = end {
3489 if end_inc {
3490 if val > e {
3491 return false;
3492 }
3493 } else if val >= e {
3494 return false;
3495 }
3496 }
3497 true
3498}
3499
3500/// Format a `PlanNode` tree as a human-readable, indented text
3501/// representation. Used by the `EXPLAIN` command.
3502pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3503 let indent = " ".repeat(depth);
3504 match plan {
3505 PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3506 PlanNode::AliasScan { table, alias } => {
3507 format!("{indent}AliasScan table={table} alias={alias}")
3508 }
3509 PlanNode::IndexScan { table, column, key } => {
3510 format!("{indent}IndexScan table={table} column={column} key={key:?}")
3511 }
3512 PlanNode::RangeScan {
3513 table,
3514 column,
3515 start,
3516 end,
3517 } => {
3518 let s = match start {
3519 Some((expr, inc)) => {
3520 let op = if *inc { ">=" } else { ">" };
3521 format!("{op}{expr:?}")
3522 }
3523 None => "unbounded".to_string(),
3524 };
3525 let e = match end {
3526 Some((expr, inc)) => {
3527 let op = if *inc { "<=" } else { "<" };
3528 format!("{op}{expr:?}")
3529 }
3530 None => "unbounded".to_string(),
3531 };
3532 format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3533 }
3534 PlanNode::Filter { input, predicate } => {
3535 let child = format_plan_tree(input, depth + 1);
3536 format!("{indent}Filter predicate={predicate:?}\n{child}")
3537 }
3538 PlanNode::Project { input, fields } => {
3539 let names: Vec<String> = fields
3540 .iter()
3541 .map(|f| match &f.alias {
3542 Some(a) => format!("{a}: {:?}", f.expr),
3543 None => format!("{:?}", f.expr),
3544 })
3545 .collect();
3546 let child = format_plan_tree(input, depth + 1);
3547 format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3548 }
3549 PlanNode::Sort { input, keys } => {
3550 let ks: Vec<String> = keys
3551 .iter()
3552 .map(|k| {
3553 if k.descending {
3554 format!("{} desc", k.field)
3555 } else {
3556 k.field.clone()
3557 }
3558 })
3559 .collect();
3560 let child = format_plan_tree(input, depth + 1);
3561 format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3562 }
3563 PlanNode::Limit { input, count } => {
3564 let child = format_plan_tree(input, depth + 1);
3565 format!("{indent}Limit count={count:?}\n{child}")
3566 }
3567 PlanNode::Offset { input, count } => {
3568 let child = format_plan_tree(input, depth + 1);
3569 format!("{indent}Offset count={count:?}\n{child}")
3570 }
3571 PlanNode::Aggregate {
3572 input,
3573 function,
3574 field,
3575 } => {
3576 let f = field.as_deref().unwrap_or("*");
3577 let child = format_plan_tree(input, depth + 1);
3578 format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3579 }
3580 PlanNode::NestedLoopJoin {
3581 left,
3582 right,
3583 on,
3584 kind,
3585 } => {
3586 let left_child = format_plan_tree(left, depth + 1);
3587 let right_child = format_plan_tree(right, depth + 1);
3588 let on_str = match on {
3589 Some(pred) => format!("{pred:?}"),
3590 None => "none".to_string(),
3591 };
3592 format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3593 }
3594 PlanNode::Distinct { input } => {
3595 let child = format_plan_tree(input, depth + 1);
3596 format!("{indent}Distinct\n{child}")
3597 }
3598 PlanNode::GroupBy {
3599 input,
3600 keys,
3601 aggregates,
3602 having,
3603 } => {
3604 let agg_strs: Vec<String> = aggregates
3605 .iter()
3606 .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3607 .collect();
3608 let having_str = match having {
3609 Some(h) => format!(" having={h:?}"),
3610 None => String::new(),
3611 };
3612 let child = format_plan_tree(input, depth + 1);
3613 format!(
3614 "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3615 keys.join(", "),
3616 agg_strs.join(", "),
3617 )
3618 }
3619 PlanNode::Insert { table, rows } => {
3620 let cols: Vec<&str> = rows
3621 .first()
3622 .map(|r| r.iter().map(|a| a.field.as_str()).collect())
3623 .unwrap_or_default();
3624 format!(
3625 "{indent}Insert table={table} rows={} cols=[{}]",
3626 rows.len(),
3627 cols.join(", ")
3628 )
3629 }
3630 PlanNode::Upsert {
3631 table,
3632 key_column,
3633 assignments,
3634 on_conflict,
3635 } => {
3636 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3637 let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3638 if conflict_cols.is_empty() {
3639 format!(
3640 "{indent}Upsert table={table} key={key_column} cols=[{}]",
3641 cols.join(", ")
3642 )
3643 } else {
3644 format!(
3645 "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3646 cols.join(", "),
3647 conflict_cols.join(", ")
3648 )
3649 }
3650 }
3651 PlanNode::Update {
3652 input,
3653 table,
3654 assignments,
3655 } => {
3656 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3657 let child = format_plan_tree(input, depth + 1);
3658 format!(
3659 "{indent}Update table={table} set=[{}]\n{child}",
3660 cols.join(", ")
3661 )
3662 }
3663 PlanNode::Delete { input, table } => {
3664 let child = format_plan_tree(input, depth + 1);
3665 format!("{indent}Delete table={table}\n{child}")
3666 }
3667 PlanNode::CreateTable { name, fields } => {
3668 let fs: Vec<String> = fields
3669 .iter()
3670 .map(|(n, t, r)| {
3671 if *r {
3672 format!("{n}: {t} required")
3673 } else {
3674 format!("{n}: {t}")
3675 }
3676 })
3677 .collect();
3678 format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3679 }
3680 PlanNode::AlterTable { table, action } => {
3681 format!("{indent}AlterTable table={table} action={action:?}")
3682 }
3683 PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3684 PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3685 PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3686 PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3687 PlanNode::Window { input, windows } => {
3688 let ws: Vec<String> = windows
3689 .iter()
3690 .map(|w| format!("{:?} as {}", w.function, w.output_name))
3691 .collect();
3692 let child = format_plan_tree(input, depth + 1);
3693 format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3694 }
3695 PlanNode::Union { left, right, all } => {
3696 let kind = if *all { "UNION ALL" } else { "UNION" };
3697 let left_child = format_plan_tree(left, depth + 1);
3698 let right_child = format_plan_tree(right, depth + 1);
3699 format!("{indent}{kind}\n{left_child}\n{right_child}")
3700 }
3701 PlanNode::Explain { input } => {
3702 let child = format_plan_tree(input, depth + 1);
3703 format!("{indent}Explain\n{child}")
3704 }
3705 PlanNode::Begin => format!("{indent}Begin"),
3706 PlanNode::Commit => format!("{indent}Commit"),
3707 PlanNode::Rollback => format!("{indent}Rollback"),
3708 }
3709}