powdb_query/executor/plan_exec.rs
1//! The execute_plan method and associated helpers.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::{ViewDef, ViewRegistry};
16
17impl Engine {
18 pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19 match plan {
20 PlanNode::SeqScan { table } => {
21 // Auto-refresh dirty materialized views on read.
22 if self.view_registry.is_dirty(table) {
23 self.refresh_view(table)?;
24 }
25 let schema = self
26 .catalog
27 .schema(table)
28 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29 .clone();
30 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31 let rows: Vec<Vec<Value>> = self
32 .catalog
33 .scan(table)
34 .map_err(|e| QueryError::StorageError(e.to_string()))?
35 .map(|(_, row)| row)
36 .collect();
37 Ok(QueryResult::Rows { columns, rows })
38 }
39
40 PlanNode::Filter { input, predicate } => {
41 // Materialize any IN-subqueries in the predicate before the
42 // scan loop — the closure can't call back into the engine.
43 // Correlated subqueries are left in place for per-row eval.
44 let materialized;
45 let predicate = if contains_subquery(predicate) {
46 materialized = self.materialize_subqueries(predicate)?;
47 &materialized
48 } else {
49 predicate
50 };
51
52 // Correlated subquery path: per-row materialisation.
53 if contains_subquery(predicate) {
54 let result = self.execute_plan(input)?;
55 return match result {
56 QueryResult::Rows { columns, rows } => {
57 let mut filtered = Vec::new();
58 for row in rows {
59 let row_pred =
60 self.materialize_correlated_for_row(predicate, &row, &columns)?;
61 if eval_predicate(&row_pred, &row, &columns) {
62 filtered.push(row);
63 }
64 }
65 Ok(QueryResult::Rows {
66 columns,
67 rows: filtered,
68 })
69 }
70 _ => Err("filter requires row input".into()),
71 };
72 }
73
74 // Fast path: fuse Filter + SeqScan into a zero-copy streaming
75 // loop. Uses decode_column() to evaluate the predicate on only
76 // the columns it references, avoiding heap allocations for
77 // String/Bytes columns that aren't part of the filter.
78 if let PlanNode::SeqScan { table } = input.as_ref() {
79 // Auto-refresh dirty materialized views.
80 if self.view_registry.is_dirty(table) {
81 self.refresh_view(table)?;
82 }
83 let schema = self
84 .catalog
85 .schema(table)
86 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
87 .clone();
88 let columns: Vec<String> =
89 schema.columns.iter().map(|c| c.name.clone()).collect();
90 let fast = FastLayout::new(&schema);
91 let row_layout = RowLayout::new(&schema);
92 // Mission F: pre-size to skip the first 4 Vec doublings
93 // (4 → 8 → 16 → 32 → 64). On a 100K-row scan with 30%
94 // selectivity that's ~4 fewer reallocations + memcpys.
95 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
96
97 // Try compiled predicate for the filter check (handles
98 // int leaves, string-eq leaves, and And conjunctions).
99 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
100 self.catalog
101 .for_each_row_raw(table, |_rid, data| {
102 if compiled(data) {
103 rows.push(decode_row(&schema, data));
104 }
105 })
106 .map_err(|e| QueryError::StorageError(e.to_string()))?;
107 } else {
108 let pred_cols = predicate_column_indices(predicate, &columns);
109 self.catalog
110 .for_each_row_raw(table, |_rid, data| {
111 let pred_row =
112 decode_selective(&schema, &row_layout, data, &pred_cols);
113 if eval_predicate(predicate, &pred_row, &columns) {
114 rows.push(decode_row(&schema, data));
115 }
116 })
117 .map_err(|e| QueryError::StorageError(e.to_string()))?;
118 }
119
120 return Ok(QueryResult::Rows { columns, rows });
121 }
122
123 // General path: materialise then filter.
124 let result = self.execute_plan(input)?;
125 match result {
126 QueryResult::Rows { columns, rows } => {
127 let filtered: Vec<Vec<Value>> = rows
128 .into_iter()
129 .filter(|row| eval_predicate(predicate, row, &columns))
130 .collect();
131 Ok(QueryResult::Rows {
132 columns,
133 rows: filtered,
134 })
135 }
136 _ => Err("filter requires row input".into()),
137 }
138 }
139
140 PlanNode::Project { input, fields } => {
141 // Fast path: Project over IndexScan — decode only projected
142 // columns from raw bytes instead of full decode_row.
143 if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144 let schema = self
145 .catalog
146 .schema(table)
147 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148 .clone();
149 let all_columns: Vec<String> =
150 schema.columns.iter().map(|c| c.name.clone()).collect();
151 let key_value = literal_to_value(key)?;
152 let tbl = self
153 .catalog
154 .get_table(table)
155 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157 let proj_columns: Vec<String> = fields
158 .iter()
159 .map(|f| {
160 f.alias.clone().unwrap_or_else(|| match &f.expr {
161 Expr::Field(name) => name.clone(),
162 _ => "?".into(),
163 })
164 })
165 .collect();
166
167 // Determine which column indices the projection needs
168 let proj_indices: Vec<usize> = fields
169 .iter()
170 .filter_map(|f| {
171 if let Expr::Field(name) = &f.expr {
172 all_columns.iter().position(|c| c == name)
173 } else {
174 None
175 }
176 })
177 .collect();
178
179 if tbl.has_index(column) {
180 let layout = RowLayout::new(&schema);
181 let rids = tbl.index_lookup_all(column, &key_value);
182 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
183 for rid in rids {
184 if let Some(data) = tbl.heap.get(rid) {
185 let row: Vec<Value> = proj_indices
186 .iter()
187 .map(|&ci| decode_column(&schema, &layout, &data, ci))
188 .collect();
189 rows.push(row);
190 }
191 }
192 return Ok(QueryResult::Rows {
193 columns: proj_columns,
194 rows,
195 });
196 }
197 }
198
199 // Fast path: Project(Limit(Sort(Filter(SeqScan)))) — bounded
200 // top-N heap. Decodes only the sort key + projected columns,
201 // keeps at most `limit` rows in a heap. Also handles the
202 // Project(Limit(Sort(SeqScan))) variant (no filter).
203 if let PlanNode::Limit {
204 input: inner,
205 count: limit_expr,
206 } = input.as_ref()
207 {
208 if let PlanNode::Sort {
209 input: sort_input,
210 keys,
211 } = inner.as_ref()
212 {
213 // Fast path only for single-key sorts
214 if keys.len() == 1 {
215 let sort_field = &keys[0].field;
216 let descending = keys[0].descending;
217 let limit = match limit_expr {
218 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
219 _ => usize::MAX,
220 };
221 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
222 match sort_input.as_ref() {
223 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
224 PlanNode::Filter {
225 input: fi,
226 predicate,
227 } => {
228 if let PlanNode::SeqScan { table } = fi.as_ref() {
229 (Some(table.as_str()), Some(predicate))
230 } else {
231 (None, None)
232 }
233 }
234 _ => (None, None),
235 };
236 if let Some(table) = table_opt {
237 if let Some(result) = self.project_filter_sort_limit_fast(
238 table, fields, sort_field, descending, limit, pred_opt,
239 )? {
240 return Ok(result);
241 }
242 }
243 }
244 }
245 // Fast path: Project(Limit(Filter(SeqScan))) — stream,
246 // decode only projected columns, stop at limit.
247 if let PlanNode::Filter {
248 input: fi,
249 predicate,
250 } = inner.as_ref()
251 {
252 if let PlanNode::SeqScan { table } = fi.as_ref() {
253 let limit = match limit_expr {
254 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
255 _ => usize::MAX,
256 };
257 if let Some(result) = self.project_filter_limit_fast(
258 table,
259 fields,
260 limit,
261 Some(predicate),
262 )? {
263 return Ok(result);
264 }
265 }
266 }
267 // Fast path: Project(Limit(SeqScan)) — stream, no filter.
268 if let PlanNode::SeqScan { table } = inner.as_ref() {
269 let limit = match limit_expr {
270 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
271 _ => usize::MAX,
272 };
273 if let Some(result) =
274 self.project_filter_limit_fast(table, fields, limit, None)?
275 {
276 return Ok(result);
277 }
278 }
279 }
280
281 // Mission D4: Project(Filter(SeqScan)) without Limit. Reuses
282 // `project_filter_limit_fast` with limit = usize::MAX so the
283 // hot loop decodes only projected columns and uses the
284 // compiled predicate. Previously this fell through to the
285 // generic Filter branch which materialised every column via
286 // `decode_row` then re-projected — quadratic work.
287 //
288 // multi_col_and_filter (`U filter .age > 30 and .status =
289 // "active" { .name, .age }`) was 6.18ms (0.7x SQLite) and
290 // is the load-bearing workload for this fast path.
291 if let PlanNode::Filter {
292 input: fi,
293 predicate,
294 } = input.as_ref()
295 {
296 if let PlanNode::SeqScan { table } = fi.as_ref() {
297 if let Some(result) = self.project_filter_limit_fast(
298 table,
299 fields,
300 usize::MAX,
301 Some(predicate),
302 )? {
303 return Ok(result);
304 }
305 }
306 }
307
308 // Mission D4: Project(SeqScan) without Filter or Limit.
309 // Decode only projected columns; the previous fall-through
310 // built full Vec<Value> rows then re-projected.
311 if let PlanNode::SeqScan { table } = input.as_ref() {
312 if let Some(result) =
313 self.project_filter_limit_fast(table, fields, usize::MAX, None)?
314 {
315 return Ok(result);
316 }
317 }
318
319 let result = self.execute_plan(input)?;
320 match result {
321 QueryResult::Rows { columns, rows } => {
322 let proj_columns: Vec<String> = fields
323 .iter()
324 .map(|f| {
325 f.alias.clone().unwrap_or_else(|| match &f.expr {
326 Expr::Field(name) => name.clone(),
327 // Mission E1.2: `{ u.name }` projects as the
328 // qualified column name so callers can still
329 // disambiguate across the join output.
330 Expr::QualifiedField { qualifier, field } => {
331 format!("{qualifier}.{field}")
332 }
333 _ => "?".into(),
334 })
335 })
336 .collect();
337 let proj_rows: Vec<Vec<Value>> = rows
338 .iter()
339 .map(|row| {
340 fields
341 .iter()
342 .map(|f| eval_expr(&f.expr, row, &columns))
343 .collect()
344 })
345 .collect();
346 Ok(QueryResult::Rows {
347 columns: proj_columns,
348 rows: proj_rows,
349 })
350 }
351 _ => Err("project requires row input".into()),
352 }
353 }
354
355 PlanNode::Sort { input, keys } => {
356 let result = self.execute_plan(input)?;
357 match result {
358 QueryResult::Rows { columns, mut rows } => {
359 // WS2: row-count cap is a cheap secondary guard; the
360 // byte budget is the real OOM defense for the sort
361 // buffer (a few very large rows pass the row cap).
362 if rows.len() > MAX_SORT_ROWS {
363 return Err(QueryError::SortLimitExceeded);
364 }
365 self.charge_rows(&rows)?;
366 let key_indices: Vec<(usize, bool)> = keys
367 .iter()
368 .map(|k| {
369 columns
370 .iter()
371 .position(|c| c == &k.field)
372 .map(|idx| (idx, k.descending))
373 .ok_or_else(|| QueryError::ColumnNotFound {
374 table: String::new(),
375 column: k.field.clone(),
376 })
377 })
378 .collect::<Result<_, QueryError>>()?;
379 rows.sort_by(|a, b| {
380 for &(col_idx, descending) in &key_indices {
381 let cmp = a[col_idx].cmp(&b[col_idx]);
382 let cmp = if descending { cmp.reverse() } else { cmp };
383 if cmp != std::cmp::Ordering::Equal {
384 return cmp;
385 }
386 }
387 std::cmp::Ordering::Equal
388 });
389 Ok(QueryResult::Rows { columns, rows })
390 }
391 _ => Err("sort requires row input".into()),
392 }
393 }
394
395 PlanNode::Limit { input, count } => {
396 let result = self.execute_plan(input)?;
397 let n = match count {
398 Expr::Literal(Literal::Int(v)) => *v as usize,
399 _ => return Err("limit must be integer literal".into()),
400 };
401 match result {
402 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
403 columns,
404 rows: rows.into_iter().take(n).collect(),
405 }),
406 _ => Err("limit requires row input".into()),
407 }
408 }
409
410 PlanNode::Offset { input, count } => {
411 let result = self.execute_plan(input)?;
412 let n = match count {
413 Expr::Literal(Literal::Int(v)) => *v as usize,
414 _ => return Err("offset must be integer literal".into()),
415 };
416 match result {
417 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
418 columns,
419 rows: rows.into_iter().skip(n).collect(),
420 }),
421 _ => Err("offset requires row input".into()),
422 }
423 }
424
425 PlanNode::Aggregate {
426 input,
427 function,
428 field,
429 } => {
430 // Fast path: count() over SeqScan — count rows without any decode
431 if *function == AggFunc::Count {
432 if let PlanNode::SeqScan { table } = input.as_ref() {
433 // Auto-refresh a dirty materialized view before
434 // counting it — otherwise count(View) returns stale
435 // data after an underlying mutation (F3).
436 if self.view_registry.is_dirty(table) {
437 self.refresh_view(table)?;
438 }
439 let mut count: i64 = 0;
440 self.catalog
441 .for_each_row_raw(table, |_rid, _data| {
442 count += 1;
443 })
444 .map_err(|e| QueryError::StorageError(e.to_string()))?;
445 return Ok(QueryResult::Scalar(Value::Int(count)));
446 }
447 // Fast path: count() over Filter(SeqScan) — try compiled
448 // predicate first, fall back to decode_column path.
449 // Skip a predicate carrying a subquery: the raw-bytes
450 // evaluators here don't materialise subqueries, so
451 // `count(T filter .x in (...))` would silently count 0
452 // (F1). Falling through routes it to the generic path
453 // that resolves the subquery correctly.
454 if let PlanNode::Filter {
455 input: inner,
456 predicate,
457 } = input.as_ref()
458 {
459 if let PlanNode::SeqScan { table } = inner.as_ref() {
460 if self.view_registry.is_dirty(table) {
461 self.refresh_view(table)?;
462 }
463 }
464 if let (PlanNode::SeqScan { table }, false) =
465 (inner.as_ref(), contains_subquery(predicate))
466 {
467 let schema = self
468 .catalog
469 .schema(table)
470 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
471 .clone();
472 let columns: Vec<String> =
473 schema.columns.iter().map(|c| c.name.clone()).collect();
474 let fast = FastLayout::new(&schema);
475 let row_layout = RowLayout::new(&schema);
476
477 // Try compiled predicate (zero-allocation hot path).
478 // Handles int leaves, string-eq leaves, AND conjunctions.
479 if let Some(compiled) =
480 compile_predicate(predicate, &columns, &fast, &schema)
481 {
482 let mut count: i64 = 0;
483 self.catalog
484 .for_each_row_raw(table, |_rid, data| {
485 if compiled(data) {
486 count += 1;
487 }
488 })
489 .map_err(|e| QueryError::StorageError(e.to_string()))?;
490 return Ok(QueryResult::Scalar(Value::Int(count)));
491 }
492
493 // Fallback: decode predicate columns
494 let pred_cols = predicate_column_indices(predicate, &columns);
495 let mut count: i64 = 0;
496 self.catalog
497 .for_each_row_raw(table, |_rid, data| {
498 let pred_row =
499 decode_selective(&schema, &row_layout, data, &pred_cols);
500 if eval_predicate(predicate, &pred_row, &columns) {
501 count += 1;
502 }
503 })
504 .map_err(|e| QueryError::StorageError(e.to_string()))?;
505
506 return Ok(QueryResult::Scalar(Value::Int(count)));
507 }
508 }
509 }
510
511 // Fast path: sum/avg/min/max over a single fixed-size int
512 // column with an optional compiled filter predicate. Walks
513 // raw row bytes, zero allocation per row.
514 if matches!(
515 function,
516 AggFunc::Sum
517 | AggFunc::Avg
518 | AggFunc::Min
519 | AggFunc::Max
520 | AggFunc::CountDistinct
521 ) {
522 if let Some(col) = field.as_ref() {
523 // Shape: Aggregate(SeqScan) or Aggregate(Filter(SeqScan))
524 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
525 match input.as_ref() {
526 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
527 PlanNode::Filter {
528 input: inner,
529 predicate,
530 } => {
531 if let PlanNode::SeqScan { table } = inner.as_ref() {
532 (Some(table.as_str()), Some(predicate))
533 } else {
534 (None, None)
535 }
536 }
537 _ => (None, None),
538 };
539 if let Some(table) = table_opt {
540 if let Some(result) =
541 self.agg_single_col_fast(table, col, *function, pred_opt)?
542 {
543 return Ok(result);
544 }
545 }
546 }
547 }
548
549 // Fast path: Project(Limit(Filter(SeqScan))) — stream, decode
550 // only projected columns, stop once we hit the limit.
551 // (Handled in the Project branch; this branch only fires when
552 // the aggregate is the outer node.)
553 let result = self.execute_plan(input)?;
554 match result {
555 QueryResult::Rows { columns, rows } => {
556 match function {
557 AggFunc::Count => {
558 Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
559 }
560 AggFunc::CountDistinct => {
561 let col = field.as_ref().ok_or("count distinct requires field")?;
562 let idx = columns
563 .iter()
564 .position(|c| c == col)
565 .ok_or("col not found")?;
566 let mut seen = std::collections::HashSet::new();
567 for row in &rows {
568 let v = &row[idx];
569 if !v.is_empty() {
570 seen.insert(v.clone());
571 }
572 }
573 Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
574 }
575 AggFunc::Avg => {
576 let col = field.as_ref().ok_or("avg requires field")?;
577 let idx = columns
578 .iter()
579 .position(|c| c == col)
580 .ok_or("col not found")?;
581 let sum: f64 = rows
582 .iter()
583 .filter_map(|r| match &r[idx] {
584 Value::Int(v) => Some(*v as f64),
585 Value::Float(v) => Some(*v),
586 _ => None,
587 })
588 .sum();
589 let count = rows.len() as f64;
590 Ok(QueryResult::Scalar(Value::Float(sum / count)))
591 }
592 AggFunc::Sum => {
593 let col = field.as_ref().ok_or("sum requires field")?;
594 let idx = columns
595 .iter()
596 .position(|c| c == col)
597 .ok_or("col not found")?;
598 // Track int and float contributions separately so
599 // Float columns (and mixed Int/Float rows) don't get
600 // silently dropped as they did in the Int-only
601 // version. If any Float is present, the whole sum
602 // promotes to Float — matching Avg's semantics.
603 let mut int_sum: i64 = 0;
604 let mut float_sum: f64 = 0.0;
605 let mut saw_float = false;
606 for r in &rows {
607 match &r[idx] {
608 Value::Int(v) => int_sum += *v,
609 Value::Float(v) => {
610 float_sum += *v;
611 saw_float = true;
612 }
613 _ => {}
614 }
615 }
616 let result = if saw_float {
617 Value::Float(float_sum + int_sum as f64)
618 } else {
619 Value::Int(int_sum)
620 };
621 Ok(QueryResult::Scalar(result))
622 }
623 AggFunc::Min | AggFunc::Max => {
624 let col = field.as_ref().ok_or("min/max requires field")?;
625 let idx = columns
626 .iter()
627 .position(|c| c == col)
628 .ok_or("col not found")?;
629 let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
630 let result = if *function == AggFunc::Min {
631 vals.into_iter().min().cloned()
632 } else {
633 vals.into_iter().max().cloned()
634 };
635 Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
636 }
637 }
638 }
639 _ => Err("aggregate requires row input".into()),
640 }
641 }
642
643 PlanNode::Insert { table, rows } => {
644 // Build + validate EVERY row before inserting any, so a bad
645 // row (unknown/missing/uncoercible field) aborts the whole
646 // statement without a partial write. The WAL fsync happens
647 // once at statement end, so N rows = N appends + 1 fsync.
648 let all_values: Vec<Vec<Value>> = {
649 let schema = self
650 .catalog
651 .schema(table)
652 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
653 let mut all = Vec::with_capacity(rows.len());
654 for assignments in rows {
655 let mut values = vec![Value::Empty; schema.columns.len()];
656 for a in assignments {
657 let idx = schema.column_index(&a.field).ok_or_else(|| {
658 QueryError::ColumnNotFound {
659 table: String::new(),
660 column: a.field.clone(),
661 }
662 })?;
663 let raw = literal_to_value(&a.value)?;
664 values[idx] = coerce_value(raw, &schema.columns[idx])?;
665 }
666 for col in &schema.columns {
667 if col.required && matches!(values[col.position as usize], Value::Empty)
668 {
669 return Err(QueryError::Execution(format!(
670 "column '{}' is required but no value was provided",
671 col.name
672 )));
673 }
674 }
675 all.push(values);
676 }
677 all
678 };
679 // Charge the materialized batch against the per-query memory
680 // budget before inserting — keeps multi-row insert consistent
681 // with every other full-materialization point (sort/join/group)
682 // and bounds embedded callers (the server also caps the query
683 // string at 1 MB, but embedded callers have no such limit).
684 self.charge_rows(&all_values)?;
685 let n = all_values.len() as u64;
686 for values in &all_values {
687 self.catalog
688 .insert(table, values)
689 .map_err(|e| QueryError::StorageError(e.to_string()))?;
690 }
691 self.view_registry.mark_dependents_dirty(table);
692 Ok(QueryResult::Modified(n))
693 }
694
695 PlanNode::Upsert {
696 table,
697 key_column,
698 assignments,
699 on_conflict,
700 } => {
701 let (values, key_idx) = {
702 let schema = self
703 .catalog
704 .schema(table)
705 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
706 let mut values = vec![Value::Empty; schema.columns.len()];
707 for a in assignments {
708 let idx = schema.column_index(&a.field).ok_or_else(|| {
709 QueryError::ColumnNotFound {
710 table: String::new(),
711 column: a.field.clone(),
712 }
713 })?;
714 let raw = literal_to_value(&a.value)?;
715 values[idx] = coerce_value(raw, &schema.columns[idx])?;
716 }
717 for col in &schema.columns {
718 if col.required && matches!(values[col.position as usize], Value::Empty) {
719 return Err(QueryError::Execution(format!(
720 "column '{}' is required but no value was provided",
721 col.name
722 )));
723 }
724 }
725 let key_idx = schema
726 .column_index(key_column)
727 .ok_or_else(|| format!("key column '{key_column}' not found"))?;
728 (values, key_idx)
729 };
730
731 // Upsert requires the `on` column to be unique — otherwise
732 // there is no well-defined row to overwrite and a plain
733 // insert could silently create duplicate keys.
734 if self.catalog.is_index_unique(table, key_column) != Some(true) {
735 return Err(QueryError::Execution(format!(
736 "upsert on .{key_column} requires a unique column (declare it with \
737 `unique {key_column}: <type>` or `alter {table} add unique .{key_column}`)"
738 )));
739 }
740
741 let key_value = values[key_idx].clone();
742
743 // Probe the unique index for a conflict.
744 let existing = {
745 let tbl = self
746 .catalog
747 .get_table(table)
748 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
749 // The key column is guaranteed unique above, so this
750 // returns at most one matching row.
751 let rids = tbl.index_lookup_all(key_column, &key_value);
752 rids.into_iter().next().and_then(|rid| {
753 tbl.heap
754 .get(rid)
755 .map(|data| (rid, decode_row(&tbl.schema, &data)))
756 })
757 };
758
759 if let Some((rid, mut existing_row)) = existing {
760 // Conflict: apply on_conflict assignments (or all non-key if empty).
761 let update_assignments = if on_conflict.is_empty() {
762 assignments
763 } else {
764 on_conflict
765 };
766 let changed_cols: Vec<usize> = {
767 let schema = self
768 .catalog
769 .schema(table)
770 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
771 let mut indices = Vec::new();
772 for a in update_assignments {
773 let idx = schema.column_index(&a.field).ok_or_else(|| {
774 QueryError::ColumnNotFound {
775 table: String::new(),
776 column: a.field.clone(),
777 }
778 })?;
779 if idx != key_idx {
780 existing_row[idx] = literal_to_value(&a.value)?;
781 indices.push(idx);
782 }
783 }
784 indices
785 };
786 self.catalog
787 .update_hinted(table, rid, &existing_row, Some(&changed_cols))
788 .map_err(|e| QueryError::StorageError(e.to_string()))?;
789 self.view_registry.mark_dependents_dirty(table);
790 Ok(QueryResult::Modified(1))
791 } else {
792 // No conflict: insert.
793 self.catalog
794 .insert(table, &values)
795 .map_err(|e| QueryError::StorageError(e.to_string()))?;
796 self.view_registry.mark_dependents_dirty(table);
797 Ok(QueryResult::Modified(1))
798 }
799 }
800
801 PlanNode::Update {
802 input,
803 table,
804 assignments,
805 } => {
806 // Mission C Phase 3: resolve assignments against a borrowed
807 // schema, then drop the borrow before the mutation loop.
808 // Try literal-only path first; fall back to per-row expression
809 // evaluation if any assignment contains a non-literal expression
810 // (e.g., `age := .age + 1`).
811 let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
812 let schema_ref = self
813 .catalog
814 .schema(table)
815 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
816 let indices: Vec<usize> = assignments
817 .iter()
818 .map(|a| {
819 schema_ref.column_index(&a.field).ok_or_else(|| {
820 QueryError::ColumnNotFound {
821 table: String::new(),
822 column: a.field.clone(),
823 }
824 })
825 })
826 .collect::<Result<_, _>>()?;
827 let vals: Result<Vec<Value>, _> = assignments
828 .iter()
829 .map(|a| literal_to_value(&a.value))
830 .collect();
831 (indices, vals.ok())
832 };
833 let resolved_assignments: Option<Vec<(usize, Value)>> =
834 literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
835
836 // Mission C Phase 2: the hint Table::update_hinted needs to
837 // decide whether to read the old row for index diff.
838 let changed_cols: Vec<usize> = col_indices.clone();
839
840 // ── Fused scan+update for Update(Filter(SeqScan)) ────────
841 // Perf sprint: instead of the two-pass collect-RIDs-then-loop
842 // pattern (which pays one ensure_hot per matched row on the
843 // second pass), fuse the predicate evaluation and in-place
844 // byte-level mutation into a single heap walk. Same idea as
845 // the fused scan_delete_matching path for deletes.
846 if let Some(ref resolved_assignments) = resolved_assignments {
847 if let PlanNode::Filter {
848 input: inner,
849 predicate,
850 } = input.as_ref()
851 {
852 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
853 if t == table {
854 let fused_result = self.try_fused_scan_update(
855 table,
856 predicate,
857 resolved_assignments,
858 &changed_cols,
859 );
860 if let Some(result) = fused_result {
861 return result;
862 }
863 }
864 }
865 }
866 }
867
868 // Collect matching RowIds in a single pass.
869 let matching_rids = self.collect_rids_for_mutation(input, table)?;
870
871 // ── Literal-only fast paths ─────────────────────────────
872 if let Some(ref resolved_assignments) = resolved_assignments {
873 // Mission C Phase 4: in-place byte-patch fast path. If every
874 // assignment targets a fixed-size non-null column AND none of
875 // them is indexed, we can skip decode_row / Vec<Value> /
876 // encode_row_into entirely and patch the row's raw bytes on
877 // the hot page.
878 let fast_patch: Option<Vec<FastPatch>> = {
879 let tbl = self
880 .catalog
881 .get_table(table)
882 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
883 let schema = &tbl.schema;
884 let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
885 is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
886 });
887 let no_indexed = !resolved_assignments
888 .iter()
889 .any(|(idx, _)| tbl.has_indexed_col(*idx));
890
891 if all_fixed_nonnull && no_indexed {
892 let layout = RowLayout::new(schema);
893 let bitmap_size = layout.bitmap_size();
894 let patches: Vec<FastPatch> = resolved_assignments
895 .iter()
896 .map(|(idx, val)| {
897 let fixed_off = layout
898 .fixed_offset(*idx)
899 .expect("is_fixed_size already checked");
900 let field_off = 2 + bitmap_size + fixed_off;
901 let bytes: FixedBytes = match val {
902 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
903 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
904 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
905 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
906 Value::Uuid(v) => FixedBytes::Uuid(*v),
907 _ => unreachable!("all_fixed_nonnull guard lied"),
908 };
909 FastPatch {
910 field_off,
911 bitmap_byte_off: 2 + idx / 8,
912 bit_mask: 1u8 << (idx % 8),
913 bytes,
914 }
915 })
916 .collect();
917 Some(patches)
918 } else {
919 None
920 }
921 };
922
923 if let Some(patches) = fast_patch {
924 let mut count = 0u64;
925 for rid in matching_rids {
926 // Mission B2: WAL-log every patch so crash
927 // recovery replays the update. Same mutation
928 // closure as before — the wrapper just sandwiches
929 // it between a hot-page read and a WAL append.
930 let ok = self
931 .catalog
932 .update_row_bytes_logged(table, rid, |row| {
933 for p in &patches {
934 row[p.bitmap_byte_off] &= !p.bit_mask;
935 let field_bytes = p.bytes.as_slice();
936 row[p.field_off..p.field_off + field_bytes.len()]
937 .copy_from_slice(field_bytes);
938 }
939 })
940 .map_err(|e| QueryError::StorageError(e.to_string()))?;
941 if ok {
942 count += 1;
943 }
944 }
945 self.view_registry.mark_dependents_dirty(table);
946 return Ok(QueryResult::Modified(count));
947 }
948
949 // Mission C Phase 10: var-column in-place shrink fast path.
950 let var_fast: Option<(usize, Option<Vec<u8>>)> = {
951 let tbl = self
952 .catalog
953 .get_table(table)
954 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
955 let schema = &tbl.schema;
956 let is_single = resolved_assignments.len() == 1;
957 let is_var_col = is_single
958 && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
959 let no_indexed = !resolved_assignments
960 .iter()
961 .any(|(idx, _)| tbl.has_indexed_col(*idx));
962
963 if is_single && is_var_col && no_indexed {
964 let (idx, val) = &resolved_assignments[0];
965 let bytes_opt: Option<Vec<u8>> = match val {
966 Value::Str(s) => Some(s.as_bytes().to_vec()),
967 Value::Bytes(b) => Some(b.clone()),
968 Value::Empty => None,
969 _ => {
970 return Err(QueryError::TypeError(format!(
971 "cannot assign non-var value to var column '{}'",
972 schema.columns[*idx].name
973 )))
974 }
975 };
976 Some((*idx, bytes_opt))
977 } else {
978 None
979 }
980 };
981
982 if let Some((col_idx, new_bytes_opt)) = var_fast {
983 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
984 let mut count = 0u64;
985 let mut fallback_rids: Vec<RowId> = Vec::new();
986 for rid in &matching_rids {
987 // Mission B2: logged variant so crash recovery
988 // replays the shrink. On a false return (row
989 // would have to grow), the rid is pushed to
990 // `fallback_rids` and the slower `update_hinted`
991 // path — which is already WAL-logged — picks it up.
992 let ok = self
993 .catalog
994 .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
995 .map_err(|e| QueryError::StorageError(e.to_string()))?;
996 if ok {
997 count += 1;
998 } else {
999 fallback_rids.push(*rid);
1000 }
1001 }
1002 for rid in fallback_rids {
1003 let mut row = match self.catalog.get(table, rid) {
1004 Some(r) => r,
1005 None => continue,
1006 };
1007 for (idx, val) in resolved_assignments.iter() {
1008 row[*idx] = val.clone();
1009 }
1010 self.catalog
1011 .update_hinted(table, rid, &row, Some(&changed_cols))
1012 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1013 count += 1;
1014 }
1015 self.view_registry.mark_dependents_dirty(table);
1016 return Ok(QueryResult::Modified(count));
1017 }
1018
1019 // Generic literal path: decode row, apply literal values.
1020 let mut count = 0u64;
1021 for rid in matching_rids {
1022 let mut row = match self.catalog.get(table, rid) {
1023 Some(r) => r,
1024 None => continue,
1025 };
1026 for (idx, val) in resolved_assignments.iter() {
1027 row[*idx] = val.clone();
1028 }
1029 self.catalog
1030 .update_hinted(table, rid, &row, Some(&changed_cols))
1031 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1032 count += 1;
1033 }
1034 self.view_registry.mark_dependents_dirty(table);
1035 return Ok(QueryResult::Modified(count));
1036 } // end if let Some(resolved_assignments)
1037
1038 // ── Expression-based update path ────────────────────────
1039 // At least one assignment contains a non-literal expression
1040 // (e.g., `age := .age + 1`). Evaluate per-row.
1041 let col_names: Vec<String> = {
1042 let schema_ref = self
1043 .catalog
1044 .schema(table)
1045 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1046 schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1047 };
1048 let mut count = 0u64;
1049 for rid in matching_rids {
1050 let mut row = match self.catalog.get(table, rid) {
1051 Some(r) => r,
1052 None => continue,
1053 };
1054 for (i, asgn) in assignments.iter().enumerate() {
1055 let val = eval_expr(&asgn.value, &row, &col_names);
1056 row[col_indices[i]] = val;
1057 }
1058 self.catalog
1059 .update_hinted(table, rid, &row, Some(&changed_cols))
1060 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1061 count += 1;
1062 }
1063 self.view_registry.mark_dependents_dirty(table);
1064 Ok(QueryResult::Modified(count))
1065 }
1066
1067 PlanNode::Delete { input, table } => {
1068 // Mission C Phase 3: no schema clone — collect_rids_for_mutation
1069 // looks up schema internally when it needs one, and the mutation
1070 // loop doesn't need the schema at all.
1071 //
1072 // Mission C Phase 12: route bulk deletes through
1073 // `Catalog::delete_many`, which batches the btree leaf
1074 // compaction and shares one `ensure_hot` per row between
1075 // the index-key extraction and the slot delete. On
1076 // `delete_by_filter` (100K fixture, ~20K matches) that
1077 // removes ~4ms of pure `Vec::remove` memmove from the btree
1078 // maintenance phase.
1079 //
1080 // Mission C Phase 16: for the common `delete where ...`
1081 // shape (Filter(SeqScan)) — and the rarer "delete
1082 // everything" shape (SeqScan) — skip the two-pass
1083 // `collect_rids_for_mutation` + `delete_many` flow entirely.
1084 // The fused `scan_delete_matching` primitive walks the
1085 // heap exactly once, paying one `ensure_hot` per page
1086 // instead of per-row. That closes the last major gap on
1087 // the bench's `delete_by_filter` workload.
1088 if let PlanNode::Filter {
1089 input: inner,
1090 predicate,
1091 } = input.as_ref()
1092 {
1093 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1094 if t == table {
1095 let schema = self
1096 .catalog
1097 .schema(table)
1098 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1099 let columns: Vec<String> =
1100 schema.columns.iter().map(|c| c.name.clone()).collect();
1101 let fast = FastLayout::new(schema);
1102 if let Some(compiled) =
1103 compile_predicate(predicate, &columns, &fast, schema)
1104 {
1105 // Mission B2: logged variant so every
1106 // matched rid hits the WAL during the
1107 // single-pass scan. Structure of the
1108 // fused scan is unchanged — only the
1109 // hook closure now also appends.
1110 let count = self
1111 .catalog
1112 .scan_delete_matching_logged(table, |data| compiled(data))
1113 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1114 self.view_registry.mark_dependents_dirty(table);
1115 return Ok(QueryResult::Modified(count));
1116 }
1117 }
1118 }
1119 } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1120 if t == table {
1121 // `delete from T` with no predicate — every live
1122 // row matches. One pass is still the right shape.
1123 // Mission B2: logged variant — see above.
1124 let count = self
1125 .catalog
1126 .scan_delete_matching_logged(table, |_| true)
1127 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1128 self.view_registry.mark_dependents_dirty(table);
1129 return Ok(QueryResult::Modified(count));
1130 }
1131 }
1132
1133 let matching_rids = self.collect_rids_for_mutation(input, table)?;
1134 let count = self
1135 .catalog
1136 .delete_many(table, &matching_rids)
1137 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1138 self.view_registry.mark_dependents_dirty(table);
1139 Ok(QueryResult::Modified(count))
1140 }
1141
1142 PlanNode::AliasScan { table, alias } => {
1143 // Mission E1.2: scan `table` and rename every output column
1144 // to `alias.field`. Used as a join leaf so downstream
1145 // NestedLoopJoin + Filter + Project nodes can resolve
1146 // `Expr::QualifiedField` lookups by direct column-name match.
1147 //
1148 // We don't bother with a fused zero-copy loop here yet — the
1149 // whole join path is nested-loop and correctness-first
1150 // (Phase E1.3 will introduce hash join and at that point we
1151 // can revisit whether to specialise AliasScan).
1152 let schema = self
1153 .catalog
1154 .schema(table)
1155 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1156 .clone();
1157 let columns: Vec<String> = schema
1158 .columns
1159 .iter()
1160 .map(|c| format!("{alias}.{}", c.name))
1161 .collect();
1162 let rows: Vec<Vec<Value>> = self
1163 .catalog
1164 .scan(table)
1165 .map_err(|e| QueryError::StorageError(e.to_string()))?
1166 .map(|(_, row)| row)
1167 .collect();
1168 Ok(QueryResult::Rows { columns, rows })
1169 }
1170
1171 PlanNode::NestedLoopJoin {
1172 left,
1173 right,
1174 on,
1175 kind,
1176 } => {
1177 // Materialise both sides. The executor ships two strategies:
1178 // 1. Hash join (E1.3) — when the `on` predicate is a
1179 // simple equi-predicate `left_col = right_col`, build a
1180 // FxHashMap<Value, Vec<row_idx>> over the right side
1181 // and probe with the left side. O(L + R) instead of
1182 // O(L × R). Handles Inner and LeftOuter.
1183 // 2. Nested loop (E1.2) — fallback for Cross, non-equi
1184 // predicates, or `on` expressions that reference
1185 // either side with something more complex than a
1186 // QualifiedField.
1187 let left_result = self.execute_plan(left)?;
1188 let right_result = self.execute_plan(right)?;
1189 let (left_columns, left_rows) = match left_result {
1190 QueryResult::Rows { columns, rows } => (columns, rows),
1191 _ => return Err("join left side must produce rows".into()),
1192 };
1193 let (right_columns, right_rows) = match right_result {
1194 QueryResult::Rows { columns, rows } => (columns, rows),
1195 _ => return Err("join right side must produce rows".into()),
1196 };
1197
1198 // WS2: byte-budget guard on the join build side. Charge both
1199 // materialized inputs before we build the hash table / probe;
1200 // the output is row-capped by check_join_limit below.
1201 self.charge_rows(&left_rows)?;
1202 self.charge_rows(&right_rows)?;
1203
1204 // Hash-join fast path.
1205 if !matches!(kind, JoinKind::Cross) {
1206 if let Some(pred) = on {
1207 if let Some((l_idx, r_idx)) =
1208 try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1209 {
1210 let result = hash_join(
1211 left_columns,
1212 left_rows,
1213 right_columns,
1214 right_rows,
1215 l_idx,
1216 r_idx,
1217 *kind,
1218 );
1219 if let QueryResult::Rows { ref rows, .. } = result {
1220 check_join_limit(rows.len())?;
1221 }
1222 return Ok(result);
1223 }
1224 }
1225 }
1226
1227 // Nested-loop fallback.
1228 let n_left = left_columns.len();
1229 let n_right = right_columns.len();
1230 let mut columns = Vec::with_capacity(n_left + n_right);
1231 columns.extend(left_columns);
1232 columns.extend(right_columns);
1233
1234 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1235 let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1236
1237 for left_row in &left_rows {
1238 let mut matched = false;
1239 for right_row in &right_rows {
1240 combined.clear();
1241 combined.extend_from_slice(left_row);
1242 combined.extend_from_slice(right_row);
1243 let keep = match kind {
1244 JoinKind::Cross => true,
1245 JoinKind::Inner | JoinKind::LeftOuter => match on {
1246 Some(pred) => eval_predicate(pred, &combined, &columns),
1247 // Missing `on` for non-cross joins is a
1248 // parser error, but if it slips through we
1249 // treat it as "match everything".
1250 None => true,
1251 },
1252 // RightOuter is rewritten to LeftOuter by the
1253 // planner, so we never see it here.
1254 JoinKind::RightOuter => {
1255 unreachable!("planner rewrites RightOuter to LeftOuter")
1256 }
1257 };
1258 if keep {
1259 rows.push(combined.clone());
1260 check_join_limit(rows.len())?;
1261 matched = true;
1262 }
1263 }
1264 if !matched && matches!(kind, JoinKind::LeftOuter) {
1265 let mut row = Vec::with_capacity(n_left + n_right);
1266 row.extend_from_slice(left_row);
1267 row.resize(n_left + n_right, Value::Empty);
1268 rows.push(row);
1269 check_join_limit(rows.len())?;
1270 }
1271 }
1272
1273 Ok(QueryResult::Rows { columns, rows })
1274 }
1275
1276 PlanNode::Distinct { input } => {
1277 let result = self.execute_plan(input)?;
1278 match result {
1279 QueryResult::Rows { columns, rows } => {
1280 let mut seen = std::collections::HashSet::new();
1281 let mut unique_rows = Vec::new();
1282 for row in rows {
1283 if seen.insert(row.clone()) {
1284 unique_rows.push(row);
1285 }
1286 }
1287 Ok(QueryResult::Rows {
1288 columns,
1289 rows: unique_rows,
1290 })
1291 }
1292 other => Ok(other),
1293 }
1294 }
1295
1296 PlanNode::GroupBy {
1297 input,
1298 keys,
1299 aggregates,
1300 having,
1301 } => {
1302 let result = self.execute_plan(input)?;
1303 match result {
1304 QueryResult::Rows { columns, rows } => {
1305 // WS2: byte-budget guard on the GROUP BY input buffer
1306 // (the hash table is bounded by the input it groups).
1307 self.charge_rows(&rows)?;
1308 // Resolve key column indices.
1309 let key_indices: Vec<usize> = keys
1310 .iter()
1311 .map(|k| {
1312 columns
1313 .iter()
1314 .position(|c| c == k)
1315 .ok_or_else(|| format!("group-by column '{k}' not found"))
1316 })
1317 .collect::<Result<Vec<_>, _>>()?;
1318
1319 // Resolve aggregate field indices. count(*) uses
1320 // sentinel usize::MAX — compute_group_aggregate
1321 // treats it as "count all rows in the group".
1322 let agg_field_indices: Vec<usize> = aggregates
1323 .iter()
1324 .map(|a| {
1325 if a.field == "*" {
1326 Ok(usize::MAX)
1327 } else {
1328 columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1329 format!("aggregate column '{}' not found", a.field)
1330 })
1331 }
1332 })
1333 .collect::<Result<Vec<_>, _>>()?;
1334
1335 // Group rows by key values (preserving insertion order).
1336 let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1337 rustc_hash::FxHashMap::default();
1338 let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1339 for (ri, row) in rows.iter().enumerate() {
1340 let key: Vec<Value> =
1341 key_indices.iter().map(|&i| row[i].clone()).collect();
1342 match group_map.get(&key) {
1343 Some(&idx) => groups[idx].1.push(ri),
1344 None => {
1345 let idx = groups.len();
1346 group_map.insert(key.clone(), idx);
1347 groups.push((key, vec![ri]));
1348 }
1349 }
1350 }
1351
1352 // Build output column names: keys ++ aggregate output names.
1353 let mut out_columns: Vec<String> = keys.clone();
1354 for agg in aggregates.iter() {
1355 out_columns.push(agg.output_name.clone());
1356 }
1357
1358 // Compute aggregates per group.
1359 let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1360 for (key_vals, row_indices) in &groups {
1361 let mut row = key_vals.clone();
1362 for (ai, agg) in aggregates.iter().enumerate() {
1363 let col_idx = agg_field_indices[ai];
1364 let val = compute_group_aggregate(
1365 agg.function,
1366 &rows,
1367 row_indices,
1368 col_idx,
1369 );
1370 row.push(val);
1371 }
1372 out_rows.push(row);
1373 }
1374
1375 // Apply HAVING filter.
1376 if let Some(having_expr) = having {
1377 out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1378 }
1379
1380 Ok(QueryResult::Rows {
1381 columns: out_columns,
1382 rows: out_rows,
1383 })
1384 }
1385 _ => Err("group by requires row input".into()),
1386 }
1387 }
1388
1389 PlanNode::CreateTable { name, fields } => {
1390 let columns: Vec<ColumnDef> = fields
1391 .iter()
1392 .enumerate()
1393 .map(|(i, f)| -> Result<ColumnDef, QueryError> {
1394 Ok(ColumnDef {
1395 name: f.name.clone(),
1396 type_id: type_name_to_id(&f.type_name)
1397 .map_err(QueryError::TypeError)?,
1398 required: f.required,
1399 position: i as u16,
1400 })
1401 })
1402 .collect::<Result<Vec<_>, _>>()?;
1403 let schema = Schema {
1404 table_name: name.clone(),
1405 columns,
1406 };
1407 self.catalog
1408 .create_table(schema)
1409 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1410 // Declaring a field `unique` auto-creates a unique B+tree
1411 // index, which is where uniqueness is enforced on writes.
1412 for f in fields.iter().filter(|f| f.unique) {
1413 self.catalog
1414 .create_index_unique(name, &f.name, true)
1415 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1416 }
1417 Ok(QueryResult::Created(name.clone()))
1418 }
1419
1420 PlanNode::AlterTable { table, action } => match action {
1421 AlterAction::AddColumn {
1422 name,
1423 type_name,
1424 required,
1425 } => {
1426 let position = self
1427 .catalog
1428 .schema(table)
1429 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1430 .columns
1431 .len() as u16;
1432 let col = ColumnDef {
1433 name: name.clone(),
1434 type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1435 required: *required,
1436 position,
1437 };
1438 self.catalog
1439 .alter_table_add_column(table, col)
1440 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1441 Ok(QueryResult::Executed {
1442 message: format!("column '{name}' added to '{table}'"),
1443 })
1444 }
1445 AlterAction::DropColumn { name } => {
1446 self.catalog
1447 .alter_table_drop_column(table, name)
1448 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1449 Ok(QueryResult::Executed {
1450 message: format!("column '{name}' dropped from '{table}'"),
1451 })
1452 }
1453 AlterAction::AddIndex { column } => {
1454 self.catalog
1455 .create_index(table, column)
1456 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1457 Ok(QueryResult::Executed {
1458 message: format!("index on '{table}.{column}' created"),
1459 })
1460 }
1461 AlterAction::AddUnique { column } => {
1462 // No DropIndex exists, so we cannot upgrade an existing
1463 // non-unique index in place — reject it cleanly.
1464 if self.catalog.has_index(table, column) {
1465 return Err(QueryError::Execution(format!(
1466 "cannot add unique on {table}.{column}: column already indexed"
1467 )));
1468 }
1469 // Scan existing rows for duplicate (non-null) values
1470 // before creating the unique index.
1471 {
1472 let tbl = self
1473 .catalog
1474 .get_table(table)
1475 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1476 let col_idx = tbl.schema.column_index(column).ok_or_else(|| {
1477 QueryError::ColumnNotFound {
1478 table: table.to_string(),
1479 column: column.clone(),
1480 }
1481 })?;
1482 let mut seen = std::collections::HashSet::new();
1483 for (_, row) in tbl.scan() {
1484 let v = &row[col_idx];
1485 if v.is_empty() {
1486 continue;
1487 }
1488 if !seen.insert(v.clone()) {
1489 return Err(QueryError::Execution(format!(
1490 "cannot add unique on {table}.{column}: \
1491 duplicate value {v:?} exists"
1492 )));
1493 }
1494 }
1495 }
1496 self.catalog
1497 .create_index_unique(table, column, true)
1498 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1499 Ok(QueryResult::Executed {
1500 message: format!("unique index on '{table}.{column}' created"),
1501 })
1502 }
1503 },
1504
1505 PlanNode::DropTable { name } => {
1506 self.catalog
1507 .drop_table(name)
1508 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1509 Ok(QueryResult::Executed {
1510 message: format!("table '{name}' dropped"),
1511 })
1512 }
1513
1514 PlanNode::CreateView { name, query_text } => {
1515 self.create_view(name, query_text)?;
1516 Ok(QueryResult::Executed {
1517 message: format!("materialized view '{name}' created"),
1518 })
1519 }
1520
1521 PlanNode::RefreshView { name } => {
1522 self.refresh_view(name)?;
1523 Ok(QueryResult::Executed {
1524 message: format!("materialized view '{name}' refreshed"),
1525 })
1526 }
1527
1528 PlanNode::DropView { name } => {
1529 self.drop_view(name)?;
1530 Ok(QueryResult::Executed {
1531 message: format!("materialized view '{name}' dropped"),
1532 })
1533 }
1534
1535 PlanNode::Window { input, windows } => {
1536 let result = self.execute_plan(input)?;
1537 execute_window(result, windows)
1538 }
1539
1540 PlanNode::Union { left, right, all } => {
1541 let left_result = self.execute_plan(left)?;
1542 let right_result = self.execute_plan(right)?;
1543 let (left_cols, left_rows) = match left_result {
1544 QueryResult::Rows { columns, rows } => (columns, rows),
1545 _ => return Err("UNION requires query results on left side".into()),
1546 };
1547 let (_, right_rows) = match right_result {
1548 QueryResult::Rows { columns, rows } => (columns, rows),
1549 _ => return Err("UNION requires query results on right side".into()),
1550 };
1551 let mut combined = left_rows;
1552 if *all {
1553 // UNION ALL — just concatenate.
1554 combined.extend(right_rows);
1555 } else {
1556 // UNION — deduplicate using the same HashSet approach
1557 // as DISTINCT. Value already implements Hash + Eq.
1558 let mut seen = std::collections::HashSet::new();
1559 for row in &combined {
1560 seen.insert(row.clone());
1561 }
1562 for row in right_rows {
1563 if seen.insert(row.clone()) {
1564 combined.push(row);
1565 }
1566 }
1567 }
1568 Ok(QueryResult::Rows {
1569 columns: left_cols,
1570 rows: combined,
1571 })
1572 }
1573
1574 PlanNode::Explain { input } => {
1575 let text = format_plan_tree(input, 0);
1576 Ok(QueryResult::Rows {
1577 columns: vec!["plan".to_string()],
1578 rows: text
1579 .lines()
1580 .map(|line| vec![Value::Str(line.to_string())])
1581 .collect(),
1582 })
1583 }
1584
1585 PlanNode::Begin => {
1586 if self.in_transaction {
1587 return Err(QueryError::Execution(
1588 "already in a transaction (nested transactions not supported)".into(),
1589 ));
1590 }
1591 self.in_transaction = true;
1592 Ok(QueryResult::Executed {
1593 message: "transaction started".to_string(),
1594 })
1595 }
1596
1597 PlanNode::Commit => {
1598 if !self.in_transaction {
1599 return Err(QueryError::Execution(
1600 "no active transaction to commit".into(),
1601 ));
1602 }
1603 self.in_transaction = false;
1604 self.catalog
1605 .sync_wal()
1606 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1607 Ok(QueryResult::Executed {
1608 message: "transaction committed".to_string(),
1609 })
1610 }
1611
1612 PlanNode::Rollback => {
1613 if !self.in_transaction {
1614 return Err(QueryError::Execution(
1615 "no active transaction to roll back".into(),
1616 ));
1617 }
1618 self.in_transaction = false;
1619 self.catalog
1620 .rollback_to_last_sync()
1621 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1622 if let Ok(mut cache) = self.plan_cache.lock() {
1623 cache.clear();
1624 }
1625 self.view_registry = ViewRegistry::open(self.catalog.data_dir())
1626 .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
1627 Ok(QueryResult::Executed {
1628 message: "transaction rolled back".to_string(),
1629 })
1630 }
1631
1632 PlanNode::IndexScan { table, column, key } => {
1633 let key_value = literal_to_value(key)?;
1634 let tbl = self
1635 .catalog
1636 .get_table(table)
1637 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1638 let columns: Vec<String> =
1639 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1640
1641 // Fast path: the table has a B-tree on this column.
1642 // Uses index_lookup_all to return ALL matching rows for
1643 // both unique and non-unique indexes.
1644 if tbl.has_index(column) {
1645 let rids = tbl.index_lookup_all(column, &key_value);
1646 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1647 for rid in rids {
1648 if let Some(data) = tbl.heap.get(rid) {
1649 rows.push(decode_row(&tbl.schema, &data));
1650 }
1651 }
1652 return Ok(QueryResult::Rows { columns, rows });
1653 }
1654
1655 // Fallback: no index on this column. The planner emits IndexScan
1656 // eagerly (it has no visibility into which columns are indexed
1657 // at plan time), so here we must behave like SeqScan+Filter on
1658 // `.col = literal`: return *all* matching rows, not just the
1659 // first one. A non-indexed column isn't necessarily unique.
1660 // We compile the eq predicate once and stream without any
1661 // per-row decode for non-matching rows.
1662 let schema = &tbl.schema;
1663 let fast = FastLayout::new(schema);
1664 let synth_pred = Expr::BinaryOp(
1665 Box::new(Expr::Field(column.clone())),
1666 BinOp::Eq,
1667 Box::new(key.clone()),
1668 );
1669 if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1670 // Mission F: skip the first 4 Vec doublings.
1671 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1672 self.catalog
1673 .for_each_row_raw(table, |_rid, data| {
1674 if compiled(data) {
1675 rows.push(decode_row(schema, data));
1676 }
1677 })
1678 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1679 return Ok(QueryResult::Rows { columns, rows });
1680 }
1681
1682 // Last resort: slow eq-check on materialised rows.
1683 let col_idx =
1684 schema
1685 .column_index(column)
1686 .ok_or_else(|| QueryError::ColumnNotFound {
1687 table: String::new(),
1688 column: column.clone(),
1689 })?;
1690 let rows: Vec<Vec<Value>> = tbl
1691 .scan()
1692 .filter_map(|(_, row)| {
1693 if row[col_idx] == key_value {
1694 Some(row)
1695 } else {
1696 None
1697 }
1698 })
1699 .collect();
1700 Ok(QueryResult::Rows { columns, rows })
1701 }
1702
1703 PlanNode::RangeScan {
1704 table,
1705 column,
1706 start,
1707 end,
1708 } => {
1709 let tbl = self
1710 .catalog
1711 .get_table(table)
1712 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1713 let columns: Vec<String> =
1714 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1715 let schema = &tbl.schema;
1716
1717 let start_val = match start {
1718 Some((expr, _)) => Some(literal_to_value(expr)?),
1719 None => None,
1720 };
1721 let end_val = match end {
1722 Some((expr, _)) => Some(literal_to_value(expr)?),
1723 None => None,
1724 };
1725 let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1726 let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1727
1728 // Non-unique index: walk the composite (value, rid) leaf
1729 // chain between prefix bounds, fetch each row from the heap,
1730 // and recheck. The recheck enforces exclusive bounds
1731 // (range_rids is inclusive) and defensively skips any decoded
1732 // null (nulls are never indexed, so they must not match).
1733 if tbl.is_index_unique(column) == Some(false) {
1734 if let Some(btree) = tbl.index(column) {
1735 if start_val.is_some() || end_val.is_some() {
1736 let col_idx = schema.column_index(column).ok_or_else(|| {
1737 QueryError::ColumnNotFound {
1738 table: String::new(),
1739 column: column.clone(),
1740 }
1741 })?;
1742 let rids = btree.range_rids(start_val.as_ref(), end_val.as_ref());
1743 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1744 for rid in rids {
1745 if let Some(data) = tbl.heap.get(rid) {
1746 let row = decode_row(schema, &data);
1747 if !row[col_idx].is_empty()
1748 && range_matches(
1749 &row[col_idx],
1750 &start_val,
1751 start_inclusive,
1752 &end_val,
1753 end_inclusive,
1754 )
1755 {
1756 rows.push(row);
1757 }
1758 }
1759 }
1760 return Ok(QueryResult::Rows { columns, rows });
1761 }
1762 }
1763 }
1764
1765 // Range scans use the btree fast path for unique indexes,
1766 // walking raw column-value keys directly.
1767 if tbl.is_index_unique(column) == Some(true) {
1768 if let Some(btree) = tbl.index(column) {
1769 let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1770 (Some(s), Some(e)) => btree.range(s, e).collect(),
1771 (Some(s), None) => btree.range_from(s),
1772 (None, Some(e)) => btree.range_to(e),
1773 (None, None) => {
1774 let rows: Vec<Vec<Value>> =
1775 tbl.scan().map(|(_, row)| row).collect();
1776 return Ok(QueryResult::Rows { columns, rows });
1777 }
1778 };
1779 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1780 for (key, rid) in hits {
1781 if !start_inclusive {
1782 if let Some(ref s) = start_val {
1783 if &key == s {
1784 continue;
1785 }
1786 }
1787 }
1788 if !end_inclusive {
1789 if let Some(ref e) = end_val {
1790 if &key == e {
1791 continue;
1792 }
1793 }
1794 }
1795 if let Some(data) = tbl.heap.get(rid) {
1796 rows.push(decode_row(schema, &data));
1797 }
1798 }
1799 return Ok(QueryResult::Rows { columns, rows });
1800 }
1801 }
1802
1803 // Fallback: no index — synthesize range predicate and scan.
1804 let fast = FastLayout::new(schema);
1805 let synth = synthesize_range_predicate(column, start, end);
1806 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1807 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1808 self.catalog
1809 .for_each_row_raw(table, |_rid, data| {
1810 if compiled(data) {
1811 rows.push(decode_row(schema, data));
1812 }
1813 })
1814 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1815 return Ok(QueryResult::Rows { columns, rows });
1816 }
1817
1818 let col_idx =
1819 schema
1820 .column_index(column)
1821 .ok_or_else(|| QueryError::ColumnNotFound {
1822 table: String::new(),
1823 column: column.clone(),
1824 })?;
1825 let rows: Vec<Vec<Value>> = tbl
1826 .scan()
1827 .filter(|(_, row)| {
1828 range_matches(
1829 &row[col_idx],
1830 &start_val,
1831 start_inclusive,
1832 &end_val,
1833 end_inclusive,
1834 )
1835 })
1836 .map(|(_, row)| row)
1837 .collect();
1838 Ok(QueryResult::Rows { columns, rows })
1839 }
1840 }
1841 }
1842
1843 // ─── Materialized view operations ──────────────────────────────────────
1844
1845 /// Create a materialized view: execute the source query, store results
1846 /// in a new backing table, and register the view.
1847 fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1848 if self.view_registry.is_view(name) {
1849 return Err(QueryError::ViewError(format!(
1850 "materialized view '{name}' already exists"
1851 )));
1852 }
1853 // Execute the source query to get the result set.
1854 let result = self.execute_powql(query_text)?;
1855 let (columns, rows) = match result {
1856 QueryResult::Rows { columns, rows } => (columns, rows),
1857 _ => return Err("view source query must be a SELECT".into()),
1858 };
1859 // Derive a schema for the backing table from the query result columns.
1860 let schema = self.derive_view_schema(name, &columns, &rows);
1861 // Create the backing table and insert the result rows.
1862 self.catalog
1863 .create_table(schema)
1864 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1865 for row in &rows {
1866 self.catalog
1867 .insert(name, row)
1868 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1869 }
1870 // Determine which base tables this view depends on by parsing the query.
1871 let depends_on = self.extract_view_deps(query_text);
1872 self.view_registry
1873 .register(ViewDef {
1874 name: name.to_string(),
1875 query: query_text.to_string(),
1876 depends_on,
1877 dirty: false,
1878 })
1879 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1880 Ok(())
1881 }
1882
1883 /// Refresh a materialized view: re-execute its source query and replace
1884 /// the backing table's contents.
1885 fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1886 let def = self
1887 .view_registry
1888 .get(name)
1889 .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1890 let query_text = def.query.clone();
1891 // Execute the source query.
1892 let result = self.execute_powql(&query_text)?;
1893 let (_columns, rows) = match result {
1894 QueryResult::Rows { columns, rows } => (columns, rows),
1895 _ => return Err("view source query must be a SELECT".into()),
1896 };
1897 // Clear old data and insert fresh results. Mission B2: logged
1898 // variant — view refreshes are a mutation and crash recovery
1899 // must see them.
1900 self.catalog
1901 .scan_delete_matching_logged(name, |_| true)
1902 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1903 for row in &rows {
1904 self.catalog
1905 .insert(name, row)
1906 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1907 }
1908 self.view_registry.mark_clean(name);
1909 Ok(())
1910 }
1911
1912 /// Drop a materialized view: remove the backing table and unregister.
1913 fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1914 if !self.view_registry.is_view(name) {
1915 return Err(QueryError::ViewError(format!(
1916 "materialized view '{name}' not found"
1917 )));
1918 }
1919 self.view_registry
1920 .unregister(name)
1921 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1922 self.catalog
1923 .drop_table(name)
1924 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1925 Ok(())
1926 }
1927
1928 /// Derive a storage `Schema` for a view's backing table from query
1929 /// result column names and the first row's types.
1930 fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1931 use powdb_storage::types::{ColumnDef, TypeId};
1932 let cols: Vec<ColumnDef> = columns
1933 .iter()
1934 .enumerate()
1935 .map(|(i, col_name)| {
1936 let type_id = rows
1937 .first()
1938 .and_then(|row| row.get(i))
1939 .map(|v| v.type_id())
1940 .unwrap_or(TypeId::Str);
1941 ColumnDef {
1942 name: col_name.clone(),
1943 type_id,
1944 required: false,
1945 position: i as u16,
1946 }
1947 })
1948 .collect();
1949 Schema {
1950 table_name: name.to_string(),
1951 columns: cols,
1952 }
1953 }
1954
1955 /// Extract base table dependencies from a view's source query by
1956 /// parsing it and collecting the source table name.
1957 fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1958 use crate::parser::parse;
1959 match parse(query_text) {
1960 Ok(Statement::Query(q)) => {
1961 let mut deps = vec![q.source.clone()];
1962 for j in &q.joins {
1963 deps.push(j.source.clone());
1964 }
1965 deps
1966 }
1967 _ => Vec::new(),
1968 }
1969 }
1970
1971 // ─── Specialized fast paths ─────────────────────────────────────────────
1972 //
1973 // These methods are helpers for the `execute_plan` match arms above.
1974 // Each returns `Ok(Some(result))` when the fast path fires, `Ok(None)`
1975 // when the shape isn't supported (caller falls back to generic code).
1976
    /// Single-column aggregate fast path over a fixed-size `Int` or `Float`
    /// column, with an optional compiled filter predicate.
    ///
    /// Handles every `AggFunc` variant (`Sum`, `Avg`, `Min`, `Max`, `Count`,
    /// `CountDistinct`). Rows are consumed as raw bytes through the
    /// `agg_int_loop!` / `agg_float_loop!` macros rather than decoded into a
    /// `Vec<Value>` per row. Int `Sum`/`Avg` accumulate in an `i128`.
    ///
    /// Result values (always `QueryResult::Scalar`):
    /// - Int `Sum`: `Value::Int`; the `i128` total is clamped to the `i64`
    ///   range, so an overflowing sum saturates rather than erroring.
    ///   `Int(0)` when the loop yields no values.
    /// - Float `Sum`: `Value::Float` (`0.0` when the loop yields no values).
    /// - `Avg`: `Value::Float`, or `Value::Empty` when the loop yields no values.
    /// - `Min` / `Max`: `Value::Int` / `Value::Float`, or `Value::Empty` when
    ///   the loop yields no values.
    /// - `Count` / `CountDistinct`: `Value::Int`.
    ///
    /// Which rows "yield a value" is decided inside the loop macros
    /// (presumably rows that pass the predicate and whose column is
    /// non-null — confirm against the macro definitions).
    ///
    /// Returns `Ok(None)` so the caller can fall back to the generic path
    /// when `col` is not in the schema, the column is not `Int`/`Float`,
    /// `FastLayout` has no fixed offset for it, or `predicate` cannot be
    /// compiled.
    ///
    /// # Errors
    /// `QueryError::TableNotFound` if `table` has no schema.
    pub(super) fn agg_single_col_fast(
        &self,
        table: &str,
        col: &str,
        function: AggFunc,
        predicate: Option<&Expr>,
    ) -> Result<Option<QueryResult>, QueryError> {
        let schema = self
            .catalog
            .schema(table)
            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
            .clone();
        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
        let col_idx = match schema.column_index(col) {
            Some(i) => i,
            None => return Ok(None),
        };
        // Only fast-path fixed-size numeric columns (Int/Float) for
        // sum/avg/min/max/count. Mission D10: Float parity — prior version
        // bailed on Float columns, forcing them through the generic row-
        // decoding path that allocated a Vec<Value> per row and dispatched
        // on Value::cmp for every compare. f64 decode is structurally the
        // same as i64 (load 8 bytes, cast), so the fast path handles both.
        let col_type = schema.columns[col_idx].type_id;
        if col_type != TypeId::Int && col_type != TypeId::Float {
            return Ok(None);
        }

        let fast = FastLayout::new(&schema);
        // Mission C Phase 20b: inline the numeric-column reader instead of
        // building a `Box<dyn Fn>`. Eliminates 100K vtable dispatches per
        // 100K-row agg scan — every reader call folds directly into the
        // hot loop below.
        let byte_offset = match fast.fixed_offsets[col_idx] {
            Some(o) => o,
            None => return Ok(None),
        };
        // Null-bitmap position of the column, and the absolute byte offset of
        // its value (2-byte row header + bitmap + fixed-column offset).
        let bitmap_byte = col_idx / 8;
        let bitmap_bit = (col_idx % 8) as u32;
        let data_offset = 2 + fast.bitmap_size + byte_offset;

        // Optional compiled filter.
        let compiled_pred: Option<CompiledPredicate> = match predicate {
            Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
                Some(c) => Some(c),
                None => return Ok(None), // let generic path handle it
            },
            None => None,
        };

        // Mission C Phase 20b: specialize the inner loop per aggregate
        // function. The previous version ran a `match function { ... }`
        // *inside* the closure, which kept LLVM from producing optimal
        // scalar code for each variant (agg_max regressed ~23% vs the
        // baseline Box<dyn Fn> version even though per-row vtable cost
        // should have been strictly lower). Pushing the match out of the
        // hot loop lets each specialized body fold cleanly into
        // `for_each_row_raw` and removes a captured `AggFunc` + match
        // dispatch per row.
        //
        // Mission D10: same specialisation applies to the Float branch.
        // For Min/Max we use `f64::total_cmp` so the result matches
        // `Value::Ord` — this is the same ordering ORDER BY and the
        // top-N sort fast path use, keeping semantics consistent across
        // read paths (positive-sign NaN compares as greatest, -0.0 < +0.0
        // for deterministic tie-breaking).
        //
        // Mission D11 Phase 1: each inner loop now splits on presence of
        // a predicate (`if let Some(pred) = &compiled_pred`) so the hot
        // body never re-tests `Option` per row, and reads column bytes
        // via `read_i64_unchecked` / `read_f64_unchecked` helpers that
        // drop two bounds checks per row (null bitmap byte + value
        // slice). Safety is carried by the `FastLayout` invariant that
        // `data_offset + 8 <= row_len` for any fixed-size column; see
        // the helper doc comments. Hot loops are macro-generated so the
        // with-pred / no-pred split can't drift between variants.
        let result = match col_type {
            TypeId::Int => match function {
                AggFunc::Sum | AggFunc::Avg => {
                    let mut sum_i128: i128 = 0;
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            count += 1;
                            sum_i128 += v as i128;
                        }
                    );
                    if matches!(function, AggFunc::Sum) {
                        // Saturate (not wrap) if the i128 total leaves the i64 range.
                        let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
                        QueryResult::Scalar(Value::Int(clamped))
                    } else if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        let avg = (sum_i128 as f64) / (count as f64);
                        QueryResult::Scalar(Value::Float(avg))
                    }
                }
                AggFunc::Min => {
                    let mut min_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            min_v = Some(match min_v {
                                Some(m) => m.min(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    let mut max_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            max_v = Some(match max_v {
                                Some(m) => m.max(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |_v: i64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            seen.insert(v);
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            TypeId::Float => match function {
                AggFunc::Sum => {
                    // Use a single f64 accumulator. Naive summation is
                    // sufficient for MVP parity; if precision becomes an
                    // issue on long scans we can upgrade to Kahan–Neumaier
                    // compensated sum (~2x scalar cost, zero error growth).
                    let mut sum: f64 = 0.0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            sum += v;
                        }
                    );
                    QueryResult::Scalar(Value::Float(sum))
                }
                AggFunc::Avg => {
                    let mut sum: f64 = 0.0;
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            sum += v;
                            count += 1;
                        }
                    );
                    if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        QueryResult::Scalar(Value::Float(sum / count as f64))
                    }
                }
                AggFunc::Min => {
                    // `total_cmp` for deterministic NaN handling (matches
                    // Value::Ord). A positive-sign NaN sorts above every
                    // other value, so it only wins Min when every value is
                    // such a NaN; a negative-sign NaN sorts below everything
                    // and would win.
                    let mut min_v: Option<f64> = None;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            min_v = Some(match min_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_lt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    // Same `total_cmp` ordering as Min above.
                    let mut max_v: Option<f64> = None;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            max_v = Some(match max_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_gt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |_v: f64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    // Hash on `f64::to_bits` — matches `Value::Hash`, so
                    // distinct NaN bit patterns count as distinct and
                    // -0.0/+0.0 count as distinct. Consistent with how
                    // Float values are hashed in every other DISTINCT /
                    // GROUP BY path.
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            seen.insert(v.to_bits());
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            _ => unreachable!("type guard above restricts to Int/Float"),
        };
        Ok(Some(result))
    }
2282
2283 /// `Project(Limit(Filter(SeqScan)))` and `Project(Limit(SeqScan))`.
2284 /// Streams rows, decodes only projected columns, stops at the limit.
2285 pub(super) fn project_filter_limit_fast(
2286 &self,
2287 table: &str,
2288 fields: &[ProjectField],
2289 limit: usize,
2290 predicate: Option<&Expr>,
2291 ) -> Result<Option<QueryResult>, QueryError> {
2292 let schema = self
2293 .catalog
2294 .schema(table)
2295 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2296 .clone();
2297 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2298
2299 // Each projection field must be a simple `.field` reference for this
2300 // fast path. Aliased or computed fields fall through.
2301 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2302 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2303 for f in fields {
2304 let name = match &f.expr {
2305 Expr::Field(n) => n.clone(),
2306 _ => return Ok(None),
2307 };
2308 let idx = match all_columns.iter().position(|c| c == &name) {
2309 Some(i) => i,
2310 None => return Ok(None),
2311 };
2312 proj_indices.push(idx);
2313 proj_columns.push(f.alias.clone().unwrap_or(name));
2314 }
2315
2316 let fast = FastLayout::new(&schema);
2317 let row_layout = RowLayout::new(&schema);
2318
2319 let compiled_pred: Option<CompiledPredicate> = match predicate {
2320 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2321 Some(c) => Some(c),
2322 None => return Ok(None),
2323 },
2324 None => None,
2325 };
2326
2327 let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2328 // Mission D2: use try_for_each_row_raw to actually stop iterating
2329 // once the limit is reached. The previous `done` flag only short-
2330 // circuited the closure body, so a `limit 100` over 100K rows still
2331 // walked all 100K slots — burning ~30x SQLite on scan_filter_project_top100.
2332 self.catalog
2333 .try_for_each_row_raw(table, |_rid, data| {
2334 use std::ops::ControlFlow;
2335 if let Some(ref pred) = compiled_pred {
2336 if !pred(data) {
2337 return ControlFlow::Continue(());
2338 }
2339 }
2340 let row: Vec<Value> = proj_indices
2341 .iter()
2342 .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2343 .collect();
2344 out.push(row);
2345 if out.len() >= limit {
2346 ControlFlow::Break(())
2347 } else {
2348 ControlFlow::Continue(())
2349 }
2350 })
2351 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2352
2353 Ok(Some(QueryResult::Rows {
2354 columns: proj_columns,
2355 rows: out,
2356 }))
2357 }
2358
2359 /// `Project(Limit(Sort(Filter(SeqScan))))` and `Project(Limit(Sort(SeqScan)))`.
2360 /// Bounded top-N heap over the sort key. Only the sort key needs to be
2361 /// read per row; projected columns are decoded only for the final
2362 /// winning rows when the heap drains.
2363 pub(super) fn project_filter_sort_limit_fast(
2364 &self,
2365 table: &str,
2366 fields: &[ProjectField],
2367 sort_field: &str,
2368 descending: bool,
2369 limit: usize,
2370 predicate: Option<&Expr>,
2371 ) -> Result<Option<QueryResult>, QueryError> {
2372 if limit == 0 {
2373 // Degenerate case — empty result. Let the generic path handle it
2374 // for proper column naming.
2375 return Ok(None);
2376 }
2377 let schema = self
2378 .catalog
2379 .schema(table)
2380 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2381 .clone();
2382 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2383
2384 // Sort key must be a fixed-size numeric column (Int or Float).
2385 // Mission D10: extended from Int-only. Float sort keys use a
2386 // sortable-u64 transform (see `f64_to_sortable_u64`) so the heap
2387 // path stays keyed on `u64` and the whole branch shape is
2388 // identical to the Int case — no new heap types, no `total_cmp`
2389 // closures in the hot loop.
2390 let sort_idx = match schema.column_index(sort_field) {
2391 Some(i) => i,
2392 None => return Ok(None),
2393 };
2394 let sort_col_type = schema.columns[sort_idx].type_id;
2395 if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2396 return Ok(None);
2397 }
2398
2399 // Each projection field must be a simple `.field`.
2400 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2401 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2402 for f in fields {
2403 let name = match &f.expr {
2404 Expr::Field(n) => n.clone(),
2405 _ => return Ok(None),
2406 };
2407 let idx = match all_columns.iter().position(|c| c == &name) {
2408 Some(i) => i,
2409 None => return Ok(None),
2410 };
2411 proj_indices.push(idx);
2412 proj_columns.push(f.alias.clone().unwrap_or(name));
2413 }
2414
2415 let fast = FastLayout::new(&schema);
2416 let row_layout = RowLayout::new(&schema);
2417 // Mission C Phase 20b: inline numeric-column reader (no Box<dyn Fn>).
2418 let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2419 Some(o) => o,
2420 None => return Ok(None),
2421 };
2422 let sort_bitmap_byte = sort_idx / 8;
2423 let sort_bitmap_bit = (sort_idx % 8) as u32;
2424 let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2425
2426 let compiled_pred: Option<CompiledPredicate> = match predicate {
2427 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2428 Some(c) => Some(c),
2429 None => return Ok(None),
2430 },
2431 None => None,
2432 };
2433
2434 // Bounded top-N heap. For `order .x desc limit N`, we want the N
2435 // largest values — use a min-heap so the smallest is at the top and
2436 // can be popped when a better candidate arrives. For ascending, use
2437 // a max-heap. We tie-break with a monotonic `seq` counter so the
2438 // result is deterministic and stable.
2439 //
2440 // To keep this simple we maintain two typed heaps and pick by
2441 // direction.
2442 let drained: Vec<Vec<u8>> = match sort_col_type {
2443 TypeId::Int => {
2444 let mut seq: u64 = 0;
2445 let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2446 BinaryHeap::with_capacity(limit);
2447 let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2448 BinaryHeap::with_capacity(limit);
2449
2450 self.catalog
2451 .for_each_row_raw(table, |_rid, data| {
2452 if let Some(ref pred) = compiled_pred {
2453 if !pred(data) {
2454 return;
2455 }
2456 }
2457 // Inlined int-column reader: null check + i64 decode.
2458 if data.len() < sort_data_offset + 8 {
2459 return;
2460 }
2461 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2462 if is_null {
2463 return;
2464 }
2465 let key = i64::from_le_bytes(
2466 data[sort_data_offset..sort_data_offset + 8]
2467 .try_into()
2468 .unwrap_or_else(|_| unreachable!()),
2469 );
2470 let id = seq;
2471 seq += 1;
2472
2473 if descending {
2474 if heap_desc.len() < limit {
2475 heap_desc.push(Reverse((key, id, data.to_vec())));
2476 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2477 if key > *top_key {
2478 heap_desc.pop();
2479 heap_desc.push(Reverse((key, id, data.to_vec())));
2480 }
2481 }
2482 } else if heap_asc.len() < limit {
2483 heap_asc.push((key, id, data.to_vec()));
2484 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2485 if key < *top_key {
2486 heap_asc.pop();
2487 heap_asc.push((key, id, data.to_vec()));
2488 }
2489 }
2490 })
2491 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2492
2493 let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2494 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2495 } else {
2496 heap_asc.into_iter().collect()
2497 };
2498 if descending {
2499 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2500 } else {
2501 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2502 }
2503 drained.into_iter().map(|(_, _, d)| d).collect()
2504 }
2505 TypeId::Float => {
2506 // Novel angle: rather than introducing a `TotalF64` newtype
2507 // with `Ord via total_cmp`, transform the f64 bit pattern
2508 // into a sortable `u64` so `BinaryHeap<u64>` orders exactly
2509 // like `f64::total_cmp` would. Classic trick: flip the sign
2510 // bit on positives, flip all bits on negatives. Result:
2511 // - NaN (sign=0) stays greatest, matching total_cmp
2512 // - -0.0 sorts before +0.0, matching total_cmp
2513 // - Hot loop is branch-cheap (one compare + one xor)
2514 let mut seq: u64 = 0;
2515 let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2516 BinaryHeap::with_capacity(limit);
2517 let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2518 BinaryHeap::with_capacity(limit);
2519
2520 self.catalog
2521 .for_each_row_raw(table, |_rid, data| {
2522 if let Some(ref pred) = compiled_pred {
2523 if !pred(data) {
2524 return;
2525 }
2526 }
2527 if data.len() < sort_data_offset + 8 {
2528 return;
2529 }
2530 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2531 if is_null {
2532 return;
2533 }
2534 let bits = u64::from_le_bytes(
2535 data[sort_data_offset..sort_data_offset + 8]
2536 .try_into()
2537 .unwrap_or_else(|_| unreachable!()),
2538 );
2539 let key = f64_bits_to_sortable_u64(bits);
2540 let id = seq;
2541 seq += 1;
2542
2543 if descending {
2544 if heap_desc.len() < limit {
2545 heap_desc.push(Reverse((key, id, data.to_vec())));
2546 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2547 if key > *top_key {
2548 heap_desc.pop();
2549 heap_desc.push(Reverse((key, id, data.to_vec())));
2550 }
2551 }
2552 } else if heap_asc.len() < limit {
2553 heap_asc.push((key, id, data.to_vec()));
2554 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2555 if key < *top_key {
2556 heap_asc.pop();
2557 heap_asc.push((key, id, data.to_vec()));
2558 }
2559 }
2560 })
2561 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2562
2563 let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2564 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2565 } else {
2566 heap_asc.into_iter().collect()
2567 };
2568 if descending {
2569 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2570 } else {
2571 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2572 }
2573 drained.into_iter().map(|(_, _, d)| d).collect()
2574 }
2575 _ => unreachable!("type guard above restricts to Int/Float"),
2576 };
2577
2578 let rows: Vec<Vec<Value>> = drained
2579 .into_iter()
2580 .map(|data| {
2581 proj_indices
2582 .iter()
2583 .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2584 .collect()
2585 })
2586 .collect();
2587
2588 Ok(Some(QueryResult::Rows {
2589 columns: proj_columns,
2590 rows,
2591 }))
2592 }
2593
    /// Perf sprint: try to fuse an UPDATE's predicate evaluation and in-place
    /// byte-level mutation into a single heap walk.
    ///
    /// `resolved` holds `(column index, new value)` assignments;
    /// `changed_cols` is forwarded to `update_hinted` for rows that must take
    /// the slow path.
    ///
    /// Covers two shapes:
    /// 1. Every assignment is a non-empty value for a fixed-size, non-indexed
    ///    column → clear the column's null bit and byte-patch every matched
    ///    row in place (row length unchanged).
    /// 2. Exactly one assignment, to a variable-size non-indexed column, with
    ///    a `Str`, `Bytes` or `Empty` value → `patch_var_column_in_place` on
    ///    every matched row (may shrink). Rows that can't be patched in place
    ///    are collected and rewritten afterwards via `update_hinted`.
    ///
    /// Returns:
    /// - `Some(Ok(QueryResult::Modified(count)))` when a fused path ran; the
    ///   table's dependent views are marked dirty.
    /// - `Some(Err(..))` if the patch scan fails (`QueryError::Execution`) or
    ///   a fallback `update_hinted` fails (`QueryError::StorageError`).
    /// - `None` to fall through to the generic two-pass code: the table or
    ///   its schema is missing, `predicate` cannot be compiled, or neither
    ///   shape applies.
    fn try_fused_scan_update(
        &mut self,
        table: &str,
        predicate: &Expr,
        resolved: &[(usize, Value)],
        changed_cols: &[usize],
    ) -> Option<Result<QueryResult, QueryError>> {
        // Build compiled predicate. Requires a schema borrow that must be
        // dropped before we call scan_patch_matching_logged.
        let compiled = {
            let schema = self.catalog.schema(table)?;
            let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
            let fast = FastLayout::new(schema);
            compile_predicate(predicate, &columns, &fast, schema)?
        };

        // ── Path 1: fixed-width fast patch ──────────────────────────
        let fixed_patches: Option<Vec<FastPatch>> = {
            let tbl = self.catalog.get_table(table)?;
            let schema = &tbl.schema;
            let all_fixed_nonnull = resolved
                .iter()
                .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
            // Indexed columns are excluded: an in-place byte patch would
            // leave their index entries untouched.
            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
            if all_fixed_nonnull && no_indexed {
                let layout = RowLayout::new(schema);
                let bitmap_size = layout.bitmap_size();
                Some(
                    resolved
                        .iter()
                        .map(|(idx, val)| {
                            let fixed_off = layout
                                .fixed_offset(*idx)
                                .expect("is_fixed_size already checked");
                            // 2-byte row header, then the null bitmap, then fixed fields.
                            let field_off = 2 + bitmap_size + fixed_off;
                            let bytes: FixedBytes = match val {
                                Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
                                Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
                                Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
                                Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
                                Value::Uuid(v) => FixedBytes::Uuid(*v),
                                _ => unreachable!("all_fixed_nonnull guard"),
                            };
                            FastPatch {
                                field_off,
                                bitmap_byte_off: 2 + idx / 8,
                                bit_mask: 1u8 << (idx % 8),
                                bytes,
                            }
                        })
                        .collect(),
                )
            } else {
                None
            }
        };
        if let Some(patches) = fixed_patches {
            let result = self
                .catalog
                .scan_patch_matching_logged(table, compiled, |row| {
                    for p in &patches {
                        // Clear the null bit: the column now holds a value.
                        row[p.bitmap_byte_off] &= !p.bit_mask;
                        // NOTE(review): the patch width comes from the value's
                        // `FixedBytes`, not from the column. This assumes
                        // `resolved` values were already coerced to their
                        // column types — confirm upstream, since e.g. an Int
                        // written into a Bool column would overwrite
                        // neighbouring bytes.
                        let field_bytes = p.bytes.as_slice();
                        row[p.field_off..p.field_off + field_bytes.len()]
                            .copy_from_slice(field_bytes);
                    }
                    // Row length is unchanged by a fixed-width patch.
                    Some(row.len() as u16)
                })
                .map_err(|e| e.to_string());
            match result {
                Ok((count, _)) => {
                    self.view_registry.mark_dependents_dirty(table);
                    return Some(Ok(QueryResult::Modified(count)));
                }
                Err(e) => return Some(Err(QueryError::Execution(e))),
            }
        }

        // ── Path 2: single var-col shrink fast patch ────────────────
        // `Some((col, Some(bytes)))` writes `bytes`; `Some((col, None))`
        // comes from `Value::Empty`.
        let var_patch: Option<(usize, Option<Vec<u8>>)> = {
            let tbl = self.catalog.get_table(table)?;
            let schema = &tbl.schema;
            let is_single = resolved.len() == 1;
            let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
            if is_single && is_var && no_indexed {
                let (idx, val) = &resolved[0];
                let bytes_opt = match val {
                    Value::Str(s) => Some(s.as_bytes().to_vec()),
                    Value::Bytes(b) => Some(b.clone()),
                    Value::Empty => None,
                    _ => return None, // type mismatch, fall through
                };
                Some((*idx, bytes_opt))
            } else {
                None
            }
        };
        if let Some((col_idx, ref new_bytes_opt)) = var_patch {
            // Build a fresh RowLayout before the mutable borrow.
            let layout = {
                let schema = self.catalog.schema(table)?;
                RowLayout::new(schema)
            };
            let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
            let result = self
                .catalog
                .scan_patch_matching_logged(table, compiled, |row| {
                    patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
                })
                .map_err(|e| e.to_string());
            match result {
                Ok((mut count, fallback_rids)) => {
                    // Handle rows where in-place patch failed (new > old).
                    for rid in fallback_rids {
                        let mut row = match self.catalog.get(table, rid) {
                            Some(r) => r,
                            None => continue,
                        };
                        for (idx, val) in resolved.iter() {
                            row[*idx] = val.clone();
                        }
                        // NOTE(review): rows already patched by the scan above
                        // are not rolled back if this returns an error;
                        // confirm the caller/transaction layer handles it.
                        if let Err(e) =
                            self.catalog
                                .update_hinted(table, rid, &row, Some(changed_cols))
                        {
                            return Some(Err(QueryError::StorageError(e.to_string())));
                        }
                        count += 1;
                    }
                    self.view_registry.mark_dependents_dirty(table);
                    return Some(Ok(QueryResult::Modified(count)));
                }
                Err(e) => return Some(Err(QueryError::Execution(e))),
            }
        }

        None // no fused path applicable — fall through
    }
2749
2750 /// Mission C Phase 3: schema is looked up via `self.catalog.schema(table)`
2751 /// inside the branches that actually need it. Previously the caller had
2752 /// to clone the full Schema (6+ String allocs) before every mutation just
2753 /// so this function could borrow it — a cost the update/delete hot path
2754 /// did not need.
2755 fn collect_rids_for_mutation(
2756 &mut self,
2757 input: &PlanNode,
2758 table: &str,
2759 ) -> Result<Vec<RowId>, QueryError> {
2760 match input {
2761 PlanNode::SeqScan { table: t } if t == table => {
2762 // "Update/delete everything" — rare but legal.
2763 let rids: Vec<RowId> = self
2764 .catalog
2765 .scan(table)
2766 .map_err(|e| QueryError::StorageError(e.to_string()))?
2767 .map(|(rid, _)| rid)
2768 .collect();
2769 Ok(rids)
2770 }
2771 PlanNode::IndexScan {
2772 table: t,
2773 column,
2774 key,
2775 } if t == table => {
2776 let key_value = literal_to_value(key)?;
2777
2778 // Indexed case: single lookup, 0 or 1 rows.
2779 // Mission D7: int-specialized fast path on int-keyed indexes
2780 // (primary keys, created_at, etc.) — the common case for
2781 // `update_by_pk` / `delete where id = ?`.
2782 //
2783 // Scope the `tbl` borrow so it's released before we fall
2784 // through to the scan-based paths below (which reborrow
2785 // `self.catalog`).
2786 {
2787 let tbl = self
2788 .catalog
2789 .get_table(table)
2790 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2791 if tbl.has_index(column) {
2792 let rids = tbl.index_lookup_all(column, &key_value);
2793 return Ok(rids);
2794 }
2795 }
2796
2797 // No index: the planner folds `.col = literal` to IndexScan
2798 // regardless of whether the column is actually unique. When
2799 // there's no index we must behave like Filter(SeqScan) and
2800 // return *all* matching RIDs — not just the first one.
2801 let schema = self
2802 .catalog
2803 .schema(table)
2804 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2805 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2806 let fast = FastLayout::new(schema);
2807 let synth = Expr::BinaryOp(
2808 Box::new(Expr::Field(column.clone())),
2809 BinOp::Eq,
2810 Box::new(key.clone()),
2811 );
2812 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2813 // Mission F: skip the first 4 Vec doublings.
2814 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2815 self.catalog
2816 .for_each_row_raw(table, |rid, data| {
2817 if compiled(data) {
2818 rids.push(rid);
2819 }
2820 })
2821 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2822 return Ok(rids);
2823 }
2824
2825 // Fallback: decode each row, compare values.
2826 let col_idx =
2827 schema
2828 .column_index(column)
2829 .ok_or_else(|| QueryError::ColumnNotFound {
2830 table: String::new(),
2831 column: column.clone(),
2832 })?;
2833 let rids: Vec<RowId> = self
2834 .catalog
2835 .scan(table)
2836 .map_err(|e| QueryError::StorageError(e.to_string()))?
2837 .filter_map(|(rid, row)| {
2838 if row[col_idx] == key_value {
2839 Some(rid)
2840 } else {
2841 None
2842 }
2843 })
2844 .collect();
2845 Ok(rids)
2846 }
2847 PlanNode::Filter {
2848 input: inner,
2849 predicate,
2850 } => {
2851 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2852 if t != table {
2853 return self.generic_rid_match(input, table);
2854 }
2855 let schema = self
2856 .catalog
2857 .schema(table)
2858 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2859 let columns: Vec<String> =
2860 schema.columns.iter().map(|c| c.name.clone()).collect();
2861 let fast = FastLayout::new(schema);
2862 let row_layout = RowLayout::new(schema);
2863
2864 // Try compiled predicate first.
2865 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2866 // Mission F: skip the first 4 Vec doublings.
2867 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2868 self.catalog
2869 .for_each_row_raw(table, |rid, data| {
2870 if compiled(data) {
2871 rids.push(rid);
2872 }
2873 })
2874 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2875 return Ok(rids);
2876 }
2877
2878 // Fallback: selective decode + eval.
2879 let pred_cols = predicate_column_indices(predicate, &columns);
2880 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2881 self.catalog
2882 .for_each_row_raw(table, |rid, data| {
2883 let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2884 if eval_predicate(predicate, &pred_row, &columns) {
2885 rids.push(rid);
2886 }
2887 })
2888 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2889 return Ok(rids);
2890 }
2891 self.generic_rid_match(input, table)
2892 }
2893 _ => self.generic_rid_match(input, table),
2894 }
2895 }
2896
2897 /// Last-ditch generic match: execute the plan, collect matching rows,
2898 /// then find corresponding RowIds by value equality. This is the old
2899 /// O(N*M) code path; only used when the plan shape is something exotic.
2900 fn generic_rid_match(
2901 &mut self,
2902 input: &PlanNode,
2903 table: &str,
2904 ) -> Result<Vec<RowId>, QueryError> {
2905 let result = self.execute_plan(input)?;
2906 let rows = match result {
2907 QueryResult::Rows { rows, .. } => rows,
2908 _ => return Err("mutation source must be rows".into()),
2909 };
2910 let matching: Vec<RowId> = self
2911 .catalog
2912 .scan(table)
2913 .map_err(|e| QueryError::StorageError(e.to_string()))?
2914 .filter(|(_, row)| rows.iter().any(|r| r == row))
2915 .map(|(rid, _)| rid)
2916 .collect();
2917 Ok(matching)
2918 }
2919}
2920
2921pub(super) fn execute_window(
2922 result: QueryResult,
2923 windows: &[WindowDef],
2924) -> Result<QueryResult, QueryError> {
2925 let (mut columns, mut rows) = match result {
2926 QueryResult::Rows { columns, rows } => (columns, rows),
2927 _ => return Err("window function requires row input".into()),
2928 };
2929
2930 for wdef in windows {
2931 // Resolve partition/order column indices against current columns.
2932 let part_indices: Vec<usize> = wdef
2933 .partition_by
2934 .iter()
2935 .map(|name| {
2936 columns
2937 .iter()
2938 .position(|c| c == name)
2939 .ok_or_else(|| format!("window partition column '{name}' not found"))
2940 })
2941 .collect::<Result<Vec<_>, _>>()?;
2942
2943 let ord_indices: Vec<(usize, bool)> = wdef
2944 .order_by
2945 .iter()
2946 .map(|sk| {
2947 columns
2948 .iter()
2949 .position(|c| c == &sk.field)
2950 .map(|i| (i, sk.descending))
2951 .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2952 })
2953 .collect::<Result<Vec<_>, _>>()?;
2954
2955 // Resolve the argument column index (for aggregate windows).
2956 let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2957 match arg {
2958 Expr::Field(name) => {
2959 if name == "*" {
2960 None // count(*) style — no specific column
2961 } else {
2962 Some(
2963 columns
2964 .iter()
2965 .position(|c| c == name)
2966 .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2967 )
2968 }
2969 }
2970 _ => None,
2971 }
2972 } else {
2973 None
2974 };
2975
2976 // Build a sort-index to sort rows by partition_by then order_by
2977 // without actually reordering the original Vec (we need original
2978 // order to write results back).
2979 let n = rows.len();
2980 let mut indices: Vec<usize> = (0..n).collect();
2981 indices.sort_by(|&a, &b| {
2982 // Compare partition keys first.
2983 for &pi in &part_indices {
2984 let cmp = rows[a][pi].cmp(&rows[b][pi]);
2985 if cmp != std::cmp::Ordering::Equal {
2986 return cmp;
2987 }
2988 }
2989 // Then order keys.
2990 for &(oi, desc) in &ord_indices {
2991 let cmp = rows[a][oi].cmp(&rows[b][oi]);
2992 if cmp != std::cmp::Ordering::Equal {
2993 return if desc { cmp.reverse() } else { cmp };
2994 }
2995 }
2996 std::cmp::Ordering::Equal
2997 });
2998
2999 // SQL window-frame semantics: with no `order` clause the frame for an
3000 // aggregate window is the ENTIRE partition, not the running prefix.
3001 // The loop below computes running values; for the no-order case we
3002 // back-fill every row of a partition with the partition's final
3003 // (i.e. complete) aggregate once its boundary is reached. Ranking
3004 // functions are untouched — row_number/rank/dense_rank are inherently
3005 // positional.
3006 let whole_partition_frame = wdef.order_by.is_empty()
3007 && matches!(
3008 wdef.function,
3009 WindowFunc::Sum
3010 | WindowFunc::Avg
3011 | WindowFunc::Count
3012 | WindowFunc::Min
3013 | WindowFunc::Max
3014 );
3015 // Original row indices of the partition currently being scanned
3016 // (only tracked when back-filling is needed).
3017 let mut partition_row_indices: Vec<usize> = Vec::new();
3018
3019 // Compute window values in sorted order, tracking partition boundaries.
3020 let mut win_values: Vec<Value> = vec![Value::Empty; n];
3021 let mut partition_start = 0usize;
3022 // Running state for aggregate windows:
3023 let mut running_count: i64 = 0;
3024 let mut running_int_sum: i64 = 0;
3025 let mut running_float_sum: f64 = 0.0;
3026 let mut running_saw_float = false;
3027 let mut running_min: Option<Value> = None;
3028 let mut running_max: Option<Value> = None;
3029 let mut rank_counter: i64 = 0;
3030 let mut dense_rank_counter: i64 = 0;
3031 let mut prev_order_key: Option<Vec<Value>> = None;
3032 let mut same_rank_count: i64 = 0;
3033
3034 for sorted_pos in 0..n {
3035 let row_idx = indices[sorted_pos];
3036
3037 // Detect partition boundary.
3038 let new_partition = if sorted_pos == 0 {
3039 true
3040 } else {
3041 let prev_row_idx = indices[sorted_pos - 1];
3042 part_indices
3043 .iter()
3044 .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
3045 };
3046
3047 if new_partition {
3048 // No-order aggregate frame: the partition that just ended is
3049 // complete, so its final running value IS the whole-partition
3050 // aggregate. Back-fill it onto every row of that partition.
3051 if whole_partition_frame && sorted_pos > 0 {
3052 let final_v = win_values[indices[sorted_pos - 1]].clone();
3053 for ri in partition_row_indices.drain(..) {
3054 win_values[ri] = final_v.clone();
3055 }
3056 }
3057 partition_start = sorted_pos;
3058 running_count = 0;
3059 running_int_sum = 0;
3060 running_float_sum = 0.0;
3061 running_saw_float = false;
3062 running_min = None;
3063 running_max = None;
3064 rank_counter = 0;
3065 dense_rank_counter = 0;
3066 prev_order_key = None;
3067 same_rank_count = 0;
3068 }
3069
3070 // Extract current order key for rank tracking.
3071 let current_order_key: Vec<Value> = ord_indices
3072 .iter()
3073 .map(|&(oi, _)| rows[row_idx][oi].clone())
3074 .collect();
3075 let same_as_prev = prev_order_key.as_ref() == Some(¤t_order_key);
3076
3077 let value = match wdef.function {
3078 WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
3079 WindowFunc::Rank => {
3080 if same_as_prev {
3081 same_rank_count += 1;
3082 } else {
3083 rank_counter += same_rank_count + 1;
3084 same_rank_count = 0;
3085 if rank_counter == 0 {
3086 rank_counter = 1;
3087 }
3088 }
3089 Value::Int(rank_counter)
3090 }
3091 WindowFunc::DenseRank => {
3092 if !same_as_prev {
3093 dense_rank_counter += 1;
3094 }
3095 Value::Int(dense_rank_counter)
3096 }
3097 WindowFunc::Sum => {
3098 if let Some(ci) = arg_col_idx {
3099 match &rows[row_idx][ci] {
3100 Value::Int(v) => running_int_sum += v,
3101 Value::Float(v) => {
3102 running_float_sum += v;
3103 running_saw_float = true;
3104 }
3105 _ => {}
3106 }
3107 }
3108 if running_saw_float {
3109 Value::Float(running_float_sum + running_int_sum as f64)
3110 } else {
3111 Value::Int(running_int_sum)
3112 }
3113 }
3114 WindowFunc::Avg => {
3115 if let Some(ci) = arg_col_idx {
3116 match &rows[row_idx][ci] {
3117 Value::Int(v) => {
3118 running_float_sum += *v as f64;
3119 running_count += 1;
3120 }
3121 Value::Float(v) => {
3122 running_float_sum += v;
3123 running_count += 1;
3124 }
3125 _ => {}
3126 }
3127 }
3128 if running_count == 0 {
3129 Value::Empty
3130 } else {
3131 Value::Float(running_float_sum / running_count as f64)
3132 }
3133 }
3134 WindowFunc::Count => {
3135 if let Some(ci) = arg_col_idx {
3136 if !rows[row_idx][ci].is_empty() {
3137 running_count += 1;
3138 }
3139 } else {
3140 // count(*) — count all rows
3141 running_count += 1;
3142 }
3143 Value::Int(running_count)
3144 }
3145 WindowFunc::Min => {
3146 if let Some(ci) = arg_col_idx {
3147 let v = &rows[row_idx][ci];
3148 if !v.is_empty() {
3149 running_min = Some(match &running_min {
3150 None => v.clone(),
3151 Some(cur) => {
3152 if v < cur {
3153 v.clone()
3154 } else {
3155 cur.clone()
3156 }
3157 }
3158 });
3159 }
3160 }
3161 running_min.clone().unwrap_or(Value::Empty)
3162 }
3163 WindowFunc::Max => {
3164 if let Some(ci) = arg_col_idx {
3165 let v = &rows[row_idx][ci];
3166 if !v.is_empty() {
3167 running_max = Some(match &running_max {
3168 None => v.clone(),
3169 Some(cur) => {
3170 if v > cur {
3171 v.clone()
3172 } else {
3173 cur.clone()
3174 }
3175 }
3176 });
3177 }
3178 }
3179 running_max.clone().unwrap_or(Value::Empty)
3180 }
3181 };
3182
3183 prev_order_key = Some(current_order_key);
3184 win_values[row_idx] = value;
3185 if whole_partition_frame {
3186 partition_row_indices.push(row_idx);
3187 }
3188 }
3189
3190 // Back-fill the final partition (the loop only flushes at boundaries).
3191 if whole_partition_frame && n > 0 {
3192 let final_v = win_values[indices[n - 1]].clone();
3193 for ri in partition_row_indices.drain(..) {
3194 win_values[ri] = final_v.clone();
3195 }
3196 }
3197
3198 // Append the computed window column to each row.
3199 for (ri, row) in rows.iter_mut().enumerate() {
3200 row.push(win_values[ri].clone());
3201 }
3202 columns.push(wdef.output_name.clone());
3203 }
3204
3205 Ok(QueryResult::Rows { columns, rows })
3206}
3207
3208/// Mission E2b: compute one aggregate over a set of rows in a group.
3209pub(super) fn compute_group_aggregate(
3210 func: AggFunc,
3211 all_rows: &[Vec<Value>],
3212 row_indices: &[usize],
3213 col_idx: usize,
3214) -> Value {
3215 match func {
3216 AggFunc::Count => {
3217 if col_idx == usize::MAX {
3218 // count(*) — count all rows in the group.
3219 return Value::Int(row_indices.len() as i64);
3220 }
3221 let count = row_indices
3222 .iter()
3223 .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3224 .count();
3225 Value::Int(count as i64)
3226 }
3227 AggFunc::CountDistinct => {
3228 let mut seen = std::collections::HashSet::new();
3229 for &ri in row_indices {
3230 let v = &all_rows[ri][col_idx];
3231 if !v.is_empty() {
3232 seen.insert(v.clone());
3233 }
3234 }
3235 Value::Int(seen.len() as i64)
3236 }
3237 AggFunc::Sum => {
3238 // Mirror the scalar Sum path: accumulate int and float
3239 // contributions separately and promote the final result to
3240 // Float if any Float row was observed. Prevents silent
3241 // drop of Float columns in GROUP BY aggregates.
3242 let mut int_sum: i64 = 0;
3243 let mut float_sum: f64 = 0.0;
3244 let mut saw_float = false;
3245 for &ri in row_indices {
3246 match &all_rows[ri][col_idx] {
3247 Value::Int(v) => int_sum += v,
3248 Value::Float(v) => {
3249 float_sum += *v;
3250 saw_float = true;
3251 }
3252 _ => {}
3253 }
3254 }
3255 if saw_float {
3256 Value::Float(float_sum + int_sum as f64)
3257 } else {
3258 Value::Int(int_sum)
3259 }
3260 }
3261 AggFunc::Avg => {
3262 let mut sum = 0.0f64;
3263 let mut count = 0usize;
3264 for &ri in row_indices {
3265 match &all_rows[ri][col_idx] {
3266 Value::Int(v) => {
3267 sum += *v as f64;
3268 count += 1;
3269 }
3270 Value::Float(v) => {
3271 sum += *v;
3272 count += 1;
3273 }
3274 _ => {}
3275 }
3276 }
3277 if count == 0 {
3278 Value::Empty
3279 } else {
3280 Value::Float(sum / count as f64)
3281 }
3282 }
3283 AggFunc::Min => row_indices
3284 .iter()
3285 .map(|&ri| &all_rows[ri][col_idx])
3286 .filter(|v| !v.is_empty())
3287 .min()
3288 .cloned()
3289 .unwrap_or(Value::Empty),
3290 AggFunc::Max => row_indices
3291 .iter()
3292 .map(|&ri| &all_rows[ri][col_idx])
3293 .filter(|v| !v.is_empty())
3294 .max()
3295 .cloned()
3296 .unwrap_or(Value::Empty),
3297 }
3298}
3299
3300/// Mission E1.3: try to extract equi-join key indices from a join `on`
3301/// predicate. Returns `Some((left_col_idx, right_col_idx))` when the
3302/// predicate is exactly `L = R` (or `R = L`) and both sides resolve
3303/// cleanly — `L` to the left subtree's column list and `R` to the right
3304/// subtree's column list.
3305///
3306/// This is deliberately narrow. We only recognise the two shapes:
3307/// * `QualifiedField = QualifiedField` (`u.id = o.user_id`)
3308/// * `Field = Field` (`.id = .user_id`, unqualified)
3309///
3310/// Anything else — conjunctions, constants, function calls, or predicates
3311/// that touch the same side on both halves — falls through to the
3312/// nested-loop path unchanged.
3313pub(super) fn try_extract_equi_join_keys(
3314 pred: &Expr,
3315 left_columns: &[String],
3316 right_columns: &[String],
3317) -> Option<(usize, usize)> {
3318 let (lhs, op, rhs) = match pred {
3319 Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3320 _ => return None,
3321 };
3322 if op != BinOp::Eq {
3323 return None;
3324 }
3325 // Normal orientation: lhs in left, rhs in right.
3326 if let (Some(li), Some(ri)) = (
3327 resolve_side_column(lhs, left_columns),
3328 resolve_side_column(rhs, right_columns),
3329 ) {
3330 return Some((li, ri));
3331 }
3332 // Swapped: rhs in left, lhs in right. Both sides of `=` are
3333 // commutative so this is safe.
3334 if let (Some(li), Some(ri)) = (
3335 resolve_side_column(rhs, left_columns),
3336 resolve_side_column(lhs, right_columns),
3337 ) {
3338 return Some((li, ri));
3339 }
3340 None
3341}
3342
3343fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3344 match expr {
3345 Expr::QualifiedField { qualifier, field } => {
3346 // Byte-level match so we don't allocate a fresh `format!` on
3347 // every call — this runs once per plan, so allocation would be
3348 // cheap, but the match is trivial enough to keep inline with
3349 // the eval_expr version.
3350 let q = qualifier.as_bytes();
3351 let f = field.as_bytes();
3352 columns.iter().position(|c| {
3353 let b = c.as_bytes();
3354 b.len() == q.len() + 1 + f.len()
3355 && b[..q.len()] == *q
3356 && b[q.len()] == b'.'
3357 && b[q.len() + 1..] == *f
3358 })
3359 }
3360 Expr::Field(name) => columns.iter().position(|c| c == name),
3361 _ => None,
3362 }
3363}
3364
3365/// Mission E1.3: O(L + R) hash join. Builds a `FxHashMap<Value, Vec<usize>>`
3366/// over the right (inner) side's join keys, then streams the left (outer)
3367/// side and for each probe row emits every combined row whose right-side
3368/// key matches. For `JoinKind::LeftOuter`, unmatched left rows are emitted
3369/// padded with `Value::Empty` on the right side.
3370///
3371/// The right side is always the build side. That choice is forced for
3372/// LeftOuter (the left side must stream so we can detect orphans), and
3373/// for Inner it's a reasonable default — left-deep plans tend to grow the
3374/// left side with each join, so the un-joined right leaf is often the
3375/// smaller of the two at each level.
3376pub(super) fn hash_join(
3377 left_columns: Vec<String>,
3378 left_rows: Vec<Vec<Value>>,
3379 right_columns: Vec<String>,
3380 right_rows: Vec<Vec<Value>>,
3381 left_key_idx: usize,
3382 right_key_idx: usize,
3383 kind: JoinKind,
3384) -> QueryResult {
3385 use rustc_hash::FxHashMap;
3386
3387 let n_left = left_columns.len();
3388 let n_right = right_columns.len();
3389 let mut columns = Vec::with_capacity(n_left + n_right);
3390 columns.extend(left_columns);
3391 columns.extend(right_columns);
3392
3393 // Build: right_key -> list of right-row indices. Pre-size to the row
3394 // count so the map doesn't rehash mid-build.
3395 let mut build: FxHashMap<Value, Vec<usize>> =
3396 FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3397 for (i, row) in right_rows.iter().enumerate() {
3398 // Skip Empty keys on the build side — they can never match under
3399 // SQL semantics (NULL ≠ NULL) and would collapse all nullables to
3400 // one bucket.
3401 if matches!(row[right_key_idx], Value::Empty) {
3402 continue;
3403 }
3404 build.entry(row[right_key_idx].clone()).or_default().push(i);
3405 }
3406
3407 // Reasonable starting capacity — inner joins produce ≥ left_rows.len()
3408 // rows in the common 1:1 case, left-outer always emits ≥ left_rows.len().
3409 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3410
3411 for left_row in &left_rows {
3412 let key = &left_row[left_key_idx];
3413 let matched = if matches!(key, Value::Empty) {
3414 None
3415 } else {
3416 build.get(key)
3417 };
3418 match matched {
3419 Some(matches) if !matches.is_empty() => {
3420 for &ri in matches {
3421 let right_row = &right_rows[ri];
3422 let mut combined = Vec::with_capacity(n_left + n_right);
3423 combined.extend_from_slice(left_row);
3424 combined.extend_from_slice(right_row);
3425 rows.push(combined);
3426 }
3427 }
3428 _ => {
3429 if matches!(kind, JoinKind::LeftOuter) {
3430 let mut row = Vec::with_capacity(n_left + n_right);
3431 row.extend_from_slice(left_row);
3432 row.resize(n_left + n_right, Value::Empty);
3433 rows.push(row);
3434 }
3435 }
3436 }
3437 }
3438
3439 QueryResult::Rows { columns, rows }
3440}
3441
3442/// Lower unindexed `RangeScan` and `IndexScan` nodes to `Filter(SeqScan)`
3443/// so that all downstream fast paths (count, project+limit, sort+limit,
3444/// agg, update, delete) continue to fire.
3445///
3446/// The planner emits `RangeScan` (for `.age > 30`) and `IndexScan` (for
3447/// `.email = lit`) speculatively because it has no catalog access. When
3448/// the column has a B-tree index, those plans are correct. When it
3449/// doesn't, the executor's fallbacks materialise every matching row with
3450/// full `decode_row` — bypassing the compiled-predicate fast paths that
3451/// `Filter(SeqScan)` would trigger. Lowering both speculative leaf kinds
3452/// also keeps EXPLAIN honest: it prints the plan that actually runs.
3453///
3454/// This pass runs once per query, before execution.
3455pub(super) fn lower_unindexed_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3456 match plan {
3457 PlanNode::RangeScan {
3458 table,
3459 column,
3460 start,
3461 end,
3462 } => {
3463 if let Some(tbl) = catalog.get_table(table) {
3464 // Keep RangeScan whenever ANY index exists on the column:
3465 // unique indexes store raw column values, non-unique indexes
3466 // store composite (value, rid) keys that the executor walks
3467 // natively via BTree::range_rids. Only lower to Filter(SeqScan)
3468 // when the column is unindexed.
3469 if tbl.has_index(column) {
3470 return plan.clone();
3471 }
3472 }
3473 let pred = synthesize_range_predicate(column, start, end);
3474 PlanNode::Filter {
3475 input: Box::new(PlanNode::SeqScan {
3476 table: table.clone(),
3477 }),
3478 predicate: pred,
3479 }
3480 }
3481 PlanNode::Filter { input, predicate } => PlanNode::Filter {
3482 input: Box::new(lower_unindexed_scans(catalog, input)),
3483 predicate: predicate.clone(),
3484 },
3485 PlanNode::Project { input, fields } => PlanNode::Project {
3486 input: Box::new(lower_unindexed_scans(catalog, input)),
3487 fields: fields.clone(),
3488 },
3489 PlanNode::Sort { input, keys } => PlanNode::Sort {
3490 input: Box::new(lower_unindexed_scans(catalog, input)),
3491 keys: keys.clone(),
3492 },
3493 PlanNode::Limit { input, count } => PlanNode::Limit {
3494 input: Box::new(lower_unindexed_scans(catalog, input)),
3495 count: count.clone(),
3496 },
3497 PlanNode::Offset { input, count } => PlanNode::Offset {
3498 input: Box::new(lower_unindexed_scans(catalog, input)),
3499 count: count.clone(),
3500 },
3501 PlanNode::Aggregate {
3502 input,
3503 function,
3504 field,
3505 } => PlanNode::Aggregate {
3506 input: Box::new(lower_unindexed_scans(catalog, input)),
3507 function: *function,
3508 field: field.clone(),
3509 },
3510 PlanNode::Distinct { input } => PlanNode::Distinct {
3511 input: Box::new(lower_unindexed_scans(catalog, input)),
3512 },
3513 PlanNode::GroupBy {
3514 input,
3515 keys,
3516 aggregates,
3517 having,
3518 } => PlanNode::GroupBy {
3519 input: Box::new(lower_unindexed_scans(catalog, input)),
3520 keys: keys.clone(),
3521 aggregates: aggregates.clone(),
3522 having: having.clone(),
3523 },
3524 PlanNode::Update {
3525 input,
3526 table,
3527 assignments,
3528 } => PlanNode::Update {
3529 input: Box::new(lower_unindexed_scans(catalog, input)),
3530 table: table.clone(),
3531 assignments: assignments.clone(),
3532 },
3533 PlanNode::Delete { input, table } => PlanNode::Delete {
3534 input: Box::new(lower_unindexed_scans(catalog, input)),
3535 table: table.clone(),
3536 },
3537 PlanNode::Window { input, windows } => PlanNode::Window {
3538 input: Box::new(lower_unindexed_scans(catalog, input)),
3539 windows: windows.clone(),
3540 },
3541 PlanNode::Union { left, right, all } => PlanNode::Union {
3542 left: Box::new(lower_unindexed_scans(catalog, left)),
3543 right: Box::new(lower_unindexed_scans(catalog, right)),
3544 all: *all,
3545 },
3546 PlanNode::Explain { input } => PlanNode::Explain {
3547 input: Box::new(lower_unindexed_scans(catalog, input)),
3548 },
3549 PlanNode::NestedLoopJoin {
3550 left,
3551 right,
3552 on,
3553 kind,
3554 } => PlanNode::NestedLoopJoin {
3555 left: Box::new(lower_unindexed_scans(catalog, left)),
3556 right: Box::new(lower_unindexed_scans(catalog, right)),
3557 on: on.clone(),
3558 kind: *kind,
3559 },
3560 PlanNode::IndexScan { table, column, key } => {
3561 if let Some(tbl) = catalog.get_table(table) {
3562 if tbl.has_index(column) {
3563 return plan.clone();
3564 }
3565 }
3566 PlanNode::Filter {
3567 input: Box::new(PlanNode::SeqScan {
3568 table: table.clone(),
3569 }),
3570 predicate: Expr::BinaryOp(
3571 Box::new(Expr::Field(column.clone())),
3572 BinOp::Eq,
3573 Box::new(key.clone()),
3574 ),
3575 }
3576 }
3577 // Leaf nodes: no children to recurse into.
3578 _ => plan.clone(),
3579 }
3580}
3581
3582/// Synthesize a range predicate from RangeScan bounds for the fallback path.
3583pub(super) fn synthesize_range_predicate(
3584 column: &str,
3585 start: &Option<(Expr, bool)>,
3586 end: &Option<(Expr, bool)>,
3587) -> Expr {
3588 let lower = start.as_ref().map(|(expr, inclusive)| {
3589 let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3590 Expr::BinaryOp(
3591 Box::new(Expr::Field(column.to_string())),
3592 op,
3593 Box::new(expr.clone()),
3594 )
3595 });
3596 let upper = end.as_ref().map(|(expr, inclusive)| {
3597 let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3598 Expr::BinaryOp(
3599 Box::new(Expr::Field(column.to_string())),
3600 op,
3601 Box::new(expr.clone()),
3602 )
3603 });
3604 match (lower, upper) {
3605 (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3606 (Some(l), None) => l,
3607 (None, Some(u)) => u,
3608 (None, None) => Expr::Literal(Literal::Bool(true)),
3609 }
3610}
3611
3612/// Check if a value falls within a range (used in last-resort decoded-row eval).
3613pub(super) fn range_matches(
3614 val: &Value,
3615 start: &Option<Value>,
3616 start_inc: bool,
3617 end: &Option<Value>,
3618 end_inc: bool,
3619) -> bool {
3620 if let Some(ref s) = start {
3621 if start_inc {
3622 if val < s {
3623 return false;
3624 }
3625 } else if val <= s {
3626 return false;
3627 }
3628 }
3629 if let Some(ref e) = end {
3630 if end_inc {
3631 if val > e {
3632 return false;
3633 }
3634 } else if val >= e {
3635 return false;
3636 }
3637 }
3638 true
3639}
3640
3641/// Format a `PlanNode` tree as a human-readable, indented text
3642/// representation. Used by the `EXPLAIN` command.
3643pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3644 let indent = " ".repeat(depth);
3645 match plan {
3646 PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3647 PlanNode::AliasScan { table, alias } => {
3648 format!("{indent}AliasScan table={table} alias={alias}")
3649 }
3650 PlanNode::IndexScan { table, column, key } => {
3651 format!("{indent}IndexScan table={table} column={column} key={key:?}")
3652 }
3653 PlanNode::RangeScan {
3654 table,
3655 column,
3656 start,
3657 end,
3658 } => {
3659 let s = match start {
3660 Some((expr, inc)) => {
3661 let op = if *inc { ">=" } else { ">" };
3662 format!("{op}{expr:?}")
3663 }
3664 None => "unbounded".to_string(),
3665 };
3666 let e = match end {
3667 Some((expr, inc)) => {
3668 let op = if *inc { "<=" } else { "<" };
3669 format!("{op}{expr:?}")
3670 }
3671 None => "unbounded".to_string(),
3672 };
3673 format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3674 }
3675 PlanNode::Filter { input, predicate } => {
3676 let child = format_plan_tree(input, depth + 1);
3677 format!("{indent}Filter predicate={predicate:?}\n{child}")
3678 }
3679 PlanNode::Project { input, fields } => {
3680 let names: Vec<String> = fields
3681 .iter()
3682 .map(|f| match &f.alias {
3683 Some(a) => format!("{a}: {:?}", f.expr),
3684 None => format!("{:?}", f.expr),
3685 })
3686 .collect();
3687 let child = format_plan_tree(input, depth + 1);
3688 format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3689 }
3690 PlanNode::Sort { input, keys } => {
3691 let ks: Vec<String> = keys
3692 .iter()
3693 .map(|k| {
3694 if k.descending {
3695 format!("{} desc", k.field)
3696 } else {
3697 k.field.clone()
3698 }
3699 })
3700 .collect();
3701 let child = format_plan_tree(input, depth + 1);
3702 format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3703 }
3704 PlanNode::Limit { input, count } => {
3705 let child = format_plan_tree(input, depth + 1);
3706 format!("{indent}Limit count={count:?}\n{child}")
3707 }
3708 PlanNode::Offset { input, count } => {
3709 let child = format_plan_tree(input, depth + 1);
3710 format!("{indent}Offset count={count:?}\n{child}")
3711 }
3712 PlanNode::Aggregate {
3713 input,
3714 function,
3715 field,
3716 } => {
3717 let f = field.as_deref().unwrap_or("*");
3718 let child = format_plan_tree(input, depth + 1);
3719 format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3720 }
3721 PlanNode::NestedLoopJoin {
3722 left,
3723 right,
3724 on,
3725 kind,
3726 } => {
3727 let left_child = format_plan_tree(left, depth + 1);
3728 let right_child = format_plan_tree(right, depth + 1);
3729 let on_str = match on {
3730 Some(pred) => format!("{pred:?}"),
3731 None => "none".to_string(),
3732 };
3733 format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3734 }
3735 PlanNode::Distinct { input } => {
3736 let child = format_plan_tree(input, depth + 1);
3737 format!("{indent}Distinct\n{child}")
3738 }
3739 PlanNode::GroupBy {
3740 input,
3741 keys,
3742 aggregates,
3743 having,
3744 } => {
3745 let agg_strs: Vec<String> = aggregates
3746 .iter()
3747 .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3748 .collect();
3749 let having_str = match having {
3750 Some(h) => format!(" having={h:?}"),
3751 None => String::new(),
3752 };
3753 let child = format_plan_tree(input, depth + 1);
3754 format!(
3755 "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3756 keys.join(", "),
3757 agg_strs.join(", "),
3758 )
3759 }
3760 PlanNode::Insert { table, rows } => {
3761 let cols: Vec<&str> = rows
3762 .first()
3763 .map(|r| r.iter().map(|a| a.field.as_str()).collect())
3764 .unwrap_or_default();
3765 format!(
3766 "{indent}Insert table={table} rows={} cols=[{}]",
3767 rows.len(),
3768 cols.join(", ")
3769 )
3770 }
3771 PlanNode::Upsert {
3772 table,
3773 key_column,
3774 assignments,
3775 on_conflict,
3776 } => {
3777 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3778 let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3779 if conflict_cols.is_empty() {
3780 format!(
3781 "{indent}Upsert table={table} key={key_column} cols=[{}]",
3782 cols.join(", ")
3783 )
3784 } else {
3785 format!(
3786 "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3787 cols.join(", "),
3788 conflict_cols.join(", ")
3789 )
3790 }
3791 }
3792 PlanNode::Update {
3793 input,
3794 table,
3795 assignments,
3796 } => {
3797 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3798 let child = format_plan_tree(input, depth + 1);
3799 format!(
3800 "{indent}Update table={table} set=[{}]\n{child}",
3801 cols.join(", ")
3802 )
3803 }
3804 PlanNode::Delete { input, table } => {
3805 let child = format_plan_tree(input, depth + 1);
3806 format!("{indent}Delete table={table}\n{child}")
3807 }
3808 PlanNode::CreateTable { name, fields } => {
3809 let fs: Vec<String> = fields
3810 .iter()
3811 .map(|f| {
3812 let mut mods = String::new();
3813 if f.required {
3814 mods.push_str(" required");
3815 }
3816 if f.unique {
3817 mods.push_str(" unique");
3818 }
3819 format!("{}: {}{mods}", f.name, f.type_name)
3820 })
3821 .collect();
3822 format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3823 }
3824 PlanNode::AlterTable { table, action } => {
3825 format!("{indent}AlterTable table={table} action={action:?}")
3826 }
3827 PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3828 PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3829 PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3830 PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3831 PlanNode::Window { input, windows } => {
3832 let ws: Vec<String> = windows
3833 .iter()
3834 .map(|w| format!("{:?} as {}", w.function, w.output_name))
3835 .collect();
3836 let child = format_plan_tree(input, depth + 1);
3837 format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3838 }
3839 PlanNode::Union { left, right, all } => {
3840 let kind = if *all { "UNION ALL" } else { "UNION" };
3841 let left_child = format_plan_tree(left, depth + 1);
3842 let right_child = format_plan_tree(right, depth + 1);
3843 format!("{indent}{kind}\n{left_child}\n{right_child}")
3844 }
3845 PlanNode::Explain { input } => {
3846 let child = format_plan_tree(input, depth + 1);
3847 format!("{indent}Explain\n{child}")
3848 }
3849 PlanNode::Begin => format!("{indent}Begin"),
3850 PlanNode::Commit => format!("{indent}Commit"),
3851 PlanNode::Rollback => format!("{indent}Rollback"),
3852 }
3853}