powdb_query/executor/plan_exec.rs
1//! The execute_plan method and associated helpers.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::{ViewDef, ViewRegistry};
16
17impl Engine {
18 pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19 match plan {
20 PlanNode::SeqScan { table } => {
21 // Auto-refresh dirty materialized views on read.
22 if self.view_registry.is_dirty(table) {
23 self.refresh_view(table)?;
24 }
25 let schema = self
26 .catalog
27 .schema(table)
28 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29 .clone();
30 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31 let rows: Vec<Vec<Value>> = self
32 .catalog
33 .scan(table)
34 .map_err(|e| QueryError::StorageError(e.to_string()))?
35 .map(|(_, row)| row)
36 .collect();
37 Ok(QueryResult::Rows { columns, rows })
38 }
39
            // Filter: keep only the input rows for which `predicate` holds.
            //
            // Three strategies, tried in order:
            //   1. correlated-subquery path: a subquery survived up-front
            //      materialisation, so it is resolved again for every row;
            //   2. fused Filter+SeqScan: the predicate is evaluated on raw row
            //      bytes and only matching rows are fully decoded;
            //   3. generic: materialise the input plan, then filter in memory.
            PlanNode::Filter { input, predicate } => {
                // Materialize any IN-subqueries in the predicate before the
                // scan loop — the closure can't call back into the engine.
                // Correlated subqueries are left in place for per-row eval.
                // `materialized` is declared outside the `if` so the rewritten
                // predicate outlives it and can be borrowed as `predicate`.
                let materialized;
                let predicate = if contains_subquery(predicate) {
                    materialized = self.materialize_subqueries(predicate)?;
                    &materialized
                } else {
                    predicate
                };

                // Correlated subquery path: per-row materialisation.
                // Any subquery still present after the pass above is one that
                // materialize_subqueries left in place (per the comment above,
                // the correlated ones), so each row gets its own copy of the
                // predicate with that row's values substituted in.
                if contains_subquery(predicate) {
                    let result = self.execute_plan(input)?;
                    return match result {
                        QueryResult::Rows { columns, rows } => {
                            let mut filtered = Vec::new();
                            for row in rows {
                                let row_pred =
                                    self.materialize_correlated_for_row(predicate, &row, &columns)?;
                                if eval_predicate(&row_pred, &row, &columns) {
                                    filtered.push(row);
                                }
                            }
                            Ok(QueryResult::Rows {
                                columns,
                                rows: filtered,
                            })
                        }
                        _ => Err("filter requires row input".into()),
                    };
                }

                // Fast path: fuse Filter + SeqScan into a zero-copy streaming
                // loop. Uses decode_column() to evaluate the predicate on only
                // the columns it references, avoiding heap allocations for
                // String/Bytes columns that aren't part of the filter.
                if let PlanNode::SeqScan { table } = input.as_ref() {
                    // Auto-refresh dirty materialized views.
                    if self.view_registry.is_dirty(table) {
                        self.refresh_view(table)?;
                    }
                    // Cloned so the closures below can borrow the schema while
                    // `self.catalog` is borrowed by for_each_row_raw.
                    let schema = self
                        .catalog
                        .schema(table)
                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
                        .clone();
                    let columns: Vec<String> =
                        schema.columns.iter().map(|c| c.name.clone()).collect();
                    let fast = FastLayout::new(&schema);
                    let row_layout = RowLayout::new(&schema);
                    // Mission F: pre-size to skip the first 4 Vec doublings
                    // (4 → 8 → 16 → 32 → 64). On a 100K-row scan with 30%
                    // selectivity that's ~4 fewer reallocations + memcpys.
                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);

                    // Try compiled predicate for the filter check (handles
                    // int leaves, string-eq leaves, and And conjunctions).
                    // compile_predicate returns None for shapes it cannot
                    // compile; the else-branch below is the fallback.
                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
                        self.catalog
                            .for_each_row_raw(table, |_rid, data| {
                                if compiled(data) {
                                    rows.push(decode_row(&schema, data));
                                }
                            })
                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
                    } else {
                        // Decode only the columns the predicate references
                        // (NOTE(review): presumably decode_selective leaves the
                        // other positions as placeholders — confirm), evaluate,
                        // and fully decode just the rows that match.
                        let pred_cols = predicate_column_indices(predicate, &columns);
                        self.catalog
                            .for_each_row_raw(table, |_rid, data| {
                                let pred_row =
                                    decode_selective(&schema, &row_layout, data, &pred_cols);
                                if eval_predicate(predicate, &pred_row, &columns) {
                                    rows.push(decode_row(&schema, data));
                                }
                            })
                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
                    }

                    return Ok(QueryResult::Rows { columns, rows });
                }

                // General path: materialise then filter.
                let result = self.execute_plan(input)?;
                match result {
                    QueryResult::Rows { columns, rows } => {
                        let filtered: Vec<Vec<Value>> = rows
                            .into_iter()
                            .filter(|row| eval_predicate(predicate, row, &columns))
                            .collect();
                        Ok(QueryResult::Rows {
                            columns,
                            rows: filtered,
                        })
                    }
                    _ => Err("filter requires row input".into()),
                }
            }
139
140 PlanNode::Project { input, fields } => {
141 // Fast path: Project over IndexScan — decode only projected
142 // columns from raw bytes instead of full decode_row.
143 if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144 let schema = self
145 .catalog
146 .schema(table)
147 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148 .clone();
149 let all_columns: Vec<String> =
150 schema.columns.iter().map(|c| c.name.clone()).collect();
151 let key_value = literal_to_value(key)?;
152 let tbl = self
153 .catalog
154 .get_table(table)
155 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157 let proj_columns: Vec<String> = fields
158 .iter()
159 .map(|f| {
160 f.alias.clone().unwrap_or_else(|| match &f.expr {
161 Expr::Field(name) => name.clone(),
162 _ => "?".into(),
163 })
164 })
165 .collect();
166
167 // Determine which column indices the projection needs
168 let proj_indices: Vec<usize> = fields
169 .iter()
170 .filter_map(|f| {
171 if let Expr::Field(name) = &f.expr {
172 all_columns.iter().position(|c| c == name)
173 } else {
174 None
175 }
176 })
177 .collect();
178
179 if tbl.has_index(column) {
180 let layout = RowLayout::new(&schema);
181 let rids = tbl.index_lookup_all(column, &key_value);
182 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
183 for rid in rids {
184 if let Some(data) = tbl.heap.get(rid) {
185 let row: Vec<Value> = proj_indices
186 .iter()
187 .map(|&ci| decode_column(&schema, &layout, &data, ci))
188 .collect();
189 rows.push(row);
190 }
191 }
192 return Ok(QueryResult::Rows {
193 columns: proj_columns,
194 rows,
195 });
196 }
197 }
198
199 // Fast path: Project(Limit(Sort(Filter(SeqScan)))) — bounded
200 // top-N heap. Decodes only the sort key + projected columns,
201 // keeps at most `limit` rows in a heap. Also handles the
202 // Project(Limit(Sort(SeqScan))) variant (no filter).
203 if let PlanNode::Limit {
204 input: inner,
205 count: limit_expr,
206 } = input.as_ref()
207 {
208 if let PlanNode::Sort {
209 input: sort_input,
210 keys,
211 } = inner.as_ref()
212 {
213 // Fast path only for single-key sorts
214 if keys.len() == 1 {
215 let sort_field = &keys[0].field;
216 let descending = keys[0].descending;
217 let limit = match limit_expr {
218 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
219 _ => usize::MAX,
220 };
221 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
222 match sort_input.as_ref() {
223 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
224 PlanNode::Filter {
225 input: fi,
226 predicate,
227 } => {
228 if let PlanNode::SeqScan { table } = fi.as_ref() {
229 (Some(table.as_str()), Some(predicate))
230 } else {
231 (None, None)
232 }
233 }
234 _ => (None, None),
235 };
236 if let Some(table) = table_opt {
237 if let Some(result) = self.project_filter_sort_limit_fast(
238 table, fields, sort_field, descending, limit, pred_opt,
239 )? {
240 return Ok(result);
241 }
242 }
243 }
244 }
245 // Fast path: Project(Limit(Filter(SeqScan))) — stream,
246 // decode only projected columns, stop at limit.
247 if let PlanNode::Filter {
248 input: fi,
249 predicate,
250 } = inner.as_ref()
251 {
252 if let PlanNode::SeqScan { table } = fi.as_ref() {
253 let limit = match limit_expr {
254 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
255 _ => usize::MAX,
256 };
257 if let Some(result) = self.project_filter_limit_fast(
258 table,
259 fields,
260 limit,
261 Some(predicate),
262 )? {
263 return Ok(result);
264 }
265 }
266 }
267 // Fast path: Project(Limit(SeqScan)) — stream, no filter.
268 if let PlanNode::SeqScan { table } = inner.as_ref() {
269 let limit = match limit_expr {
270 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
271 _ => usize::MAX,
272 };
273 if let Some(result) =
274 self.project_filter_limit_fast(table, fields, limit, None)?
275 {
276 return Ok(result);
277 }
278 }
279 }
280
281 // Mission D4: Project(Filter(SeqScan)) without Limit. Reuses
282 // `project_filter_limit_fast` with limit = usize::MAX so the
283 // hot loop decodes only projected columns and uses the
284 // compiled predicate. Previously this fell through to the
285 // generic Filter branch which materialised every column via
286 // `decode_row` then re-projected — quadratic work.
287 //
288 // multi_col_and_filter (`U filter .age > 30 and .status =
289 // "active" { .name, .age }`) was 6.18ms (0.7x SQLite) and
290 // is the load-bearing workload for this fast path.
291 if let PlanNode::Filter {
292 input: fi,
293 predicate,
294 } = input.as_ref()
295 {
296 if let PlanNode::SeqScan { table } = fi.as_ref() {
297 if let Some(result) = self.project_filter_limit_fast(
298 table,
299 fields,
300 usize::MAX,
301 Some(predicate),
302 )? {
303 return Ok(result);
304 }
305 }
306 }
307
308 // Mission D4: Project(SeqScan) without Filter or Limit.
309 // Decode only projected columns; the previous fall-through
310 // built full Vec<Value> rows then re-projected.
311 if let PlanNode::SeqScan { table } = input.as_ref() {
312 if let Some(result) =
313 self.project_filter_limit_fast(table, fields, usize::MAX, None)?
314 {
315 return Ok(result);
316 }
317 }
318
319 let result = self.execute_plan(input)?;
320 match result {
321 QueryResult::Rows { columns, rows } => {
322 let proj_columns: Vec<String> = fields
323 .iter()
324 .map(|f| {
325 f.alias.clone().unwrap_or_else(|| match &f.expr {
326 Expr::Field(name) => name.clone(),
327 // Mission E1.2: `{ u.name }` projects as the
328 // qualified column name so callers can still
329 // disambiguate across the join output.
330 Expr::QualifiedField { qualifier, field } => {
331 format!("{qualifier}.{field}")
332 }
333 _ => "?".into(),
334 })
335 })
336 .collect();
337 let proj_rows: Vec<Vec<Value>> = rows
338 .iter()
339 .map(|row| {
340 fields
341 .iter()
342 .map(|f| eval_expr(&f.expr, row, &columns))
343 .collect()
344 })
345 .collect();
346 Ok(QueryResult::Rows {
347 columns: proj_columns,
348 rows: proj_rows,
349 })
350 }
351 _ => Err("project requires row input".into()),
352 }
353 }
354
355 PlanNode::Sort { input, keys } => {
356 let result = self.execute_plan(input)?;
357 match result {
358 QueryResult::Rows { columns, mut rows } => {
359 // WS2: row-count cap is a cheap secondary guard; the
360 // byte budget is the real OOM defense for the sort
361 // buffer (a few very large rows pass the row cap).
362 if rows.len() > MAX_SORT_ROWS {
363 return Err(QueryError::SortLimitExceeded);
364 }
365 self.charge_rows(&rows)?;
366 let key_indices: Vec<(usize, bool)> = keys
367 .iter()
368 .map(|k| {
369 columns
370 .iter()
371 .position(|c| c == &k.field)
372 .map(|idx| (idx, k.descending))
373 .ok_or_else(|| QueryError::ColumnNotFound {
374 table: String::new(),
375 column: k.field.clone(),
376 })
377 })
378 .collect::<Result<_, QueryError>>()?;
379 rows.sort_by(|a, b| {
380 for &(col_idx, descending) in &key_indices {
381 let cmp = a[col_idx].cmp(&b[col_idx]);
382 let cmp = if descending { cmp.reverse() } else { cmp };
383 if cmp != std::cmp::Ordering::Equal {
384 return cmp;
385 }
386 }
387 std::cmp::Ordering::Equal
388 });
389 Ok(QueryResult::Rows { columns, rows })
390 }
391 _ => Err("sort requires row input".into()),
392 }
393 }
394
395 PlanNode::Limit { input, count } => {
396 let result = self.execute_plan(input)?;
397 let n = match count {
398 Expr::Literal(Literal::Int(v)) => *v as usize,
399 _ => return Err("limit must be integer literal".into()),
400 };
401 match result {
402 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
403 columns,
404 rows: rows.into_iter().take(n).collect(),
405 }),
406 _ => Err("limit requires row input".into()),
407 }
408 }
409
410 PlanNode::Offset { input, count } => {
411 let result = self.execute_plan(input)?;
412 let n = match count {
413 Expr::Literal(Literal::Int(v)) => *v as usize,
414 _ => return Err("offset must be integer literal".into()),
415 };
416 match result {
417 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
418 columns,
419 rows: rows.into_iter().skip(n).collect(),
420 }),
421 _ => Err("offset requires row input".into()),
422 }
423 }
424
425 PlanNode::Aggregate {
426 input,
427 function,
428 field,
429 } => {
430 // Fast path: count() over SeqScan — count rows without any decode
431 if *function == AggFunc::Count {
432 if let PlanNode::SeqScan { table } = input.as_ref() {
433 // Auto-refresh a dirty materialized view before
434 // counting it — otherwise count(View) returns stale
435 // data after an underlying mutation (F3).
436 if self.view_registry.is_dirty(table) {
437 self.refresh_view(table)?;
438 }
439 let mut count: i64 = 0;
440 self.catalog
441 .for_each_row_raw(table, |_rid, _data| {
442 count += 1;
443 })
444 .map_err(|e| QueryError::StorageError(e.to_string()))?;
445 return Ok(QueryResult::Scalar(Value::Int(count)));
446 }
447 // Fast path: count() over Filter(SeqScan) — try compiled
448 // predicate first, fall back to decode_column path.
449 // Skip a predicate carrying a subquery: the raw-bytes
450 // evaluators here don't materialise subqueries, so
451 // `count(T filter .x in (...))` would silently count 0
452 // (F1). Falling through routes it to the generic path
453 // that resolves the subquery correctly.
454 if let PlanNode::Filter {
455 input: inner,
456 predicate,
457 } = input.as_ref()
458 {
459 if let PlanNode::SeqScan { table } = inner.as_ref() {
460 if self.view_registry.is_dirty(table) {
461 self.refresh_view(table)?;
462 }
463 }
464 if let (PlanNode::SeqScan { table }, false) =
465 (inner.as_ref(), contains_subquery(predicate))
466 {
467 let schema = self
468 .catalog
469 .schema(table)
470 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
471 .clone();
472 let columns: Vec<String> =
473 schema.columns.iter().map(|c| c.name.clone()).collect();
474 let fast = FastLayout::new(&schema);
475 let row_layout = RowLayout::new(&schema);
476
477 // Try compiled predicate (zero-allocation hot path).
478 // Handles int leaves, string-eq leaves, AND conjunctions.
479 if let Some(compiled) =
480 compile_predicate(predicate, &columns, &fast, &schema)
481 {
482 let mut count: i64 = 0;
483 self.catalog
484 .for_each_row_raw(table, |_rid, data| {
485 if compiled(data) {
486 count += 1;
487 }
488 })
489 .map_err(|e| QueryError::StorageError(e.to_string()))?;
490 return Ok(QueryResult::Scalar(Value::Int(count)));
491 }
492
493 // Fallback: decode predicate columns
494 let pred_cols = predicate_column_indices(predicate, &columns);
495 let mut count: i64 = 0;
496 self.catalog
497 .for_each_row_raw(table, |_rid, data| {
498 let pred_row =
499 decode_selective(&schema, &row_layout, data, &pred_cols);
500 if eval_predicate(predicate, &pred_row, &columns) {
501 count += 1;
502 }
503 })
504 .map_err(|e| QueryError::StorageError(e.to_string()))?;
505
506 return Ok(QueryResult::Scalar(Value::Int(count)));
507 }
508 }
509 }
510
511 // Fast path: sum/avg/min/max over a single fixed-size int
512 // column with an optional compiled filter predicate. Walks
513 // raw row bytes, zero allocation per row.
514 if matches!(
515 function,
516 AggFunc::Sum
517 | AggFunc::Avg
518 | AggFunc::Min
519 | AggFunc::Max
520 | AggFunc::CountDistinct
521 ) {
522 if let Some(col) = field.as_ref() {
523 // Shape: Aggregate(SeqScan) or Aggregate(Filter(SeqScan))
524 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
525 match input.as_ref() {
526 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
527 PlanNode::Filter {
528 input: inner,
529 predicate,
530 } => {
531 if let PlanNode::SeqScan { table } = inner.as_ref() {
532 (Some(table.as_str()), Some(predicate))
533 } else {
534 (None, None)
535 }
536 }
537 _ => (None, None),
538 };
539 if let Some(table) = table_opt {
540 if let Some(result) =
541 self.agg_single_col_fast(table, col, *function, pred_opt)?
542 {
543 return Ok(result);
544 }
545 }
546 }
547 }
548
549 // Fast path: Project(Limit(Filter(SeqScan))) — stream, decode
550 // only projected columns, stop once we hit the limit.
551 // (Handled in the Project branch; this branch only fires when
552 // the aggregate is the outer node.)
553 let result = self.execute_plan(input)?;
554 match result {
555 QueryResult::Rows { columns, rows } => {
556 match function {
557 AggFunc::Count => {
558 Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
559 }
560 AggFunc::CountDistinct => {
561 let col = field.as_ref().ok_or("count distinct requires field")?;
562 let idx = columns
563 .iter()
564 .position(|c| c == col)
565 .ok_or("col not found")?;
566 let mut seen = std::collections::HashSet::new();
567 for row in &rows {
568 let v = &row[idx];
569 if !v.is_empty() {
570 seen.insert(v.clone());
571 }
572 }
573 Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
574 }
575 AggFunc::Avg => {
576 let col = field.as_ref().ok_or("avg requires field")?;
577 let idx = columns
578 .iter()
579 .position(|c| c == col)
580 .ok_or("col not found")?;
581 let sum: f64 = rows
582 .iter()
583 .filter_map(|r| match &r[idx] {
584 Value::Int(v) => Some(*v as f64),
585 Value::Float(v) => Some(*v),
586 _ => None,
587 })
588 .sum();
589 let count = rows.len() as f64;
590 Ok(QueryResult::Scalar(Value::Float(sum / count)))
591 }
592 AggFunc::Sum => {
593 let col = field.as_ref().ok_or("sum requires field")?;
594 let idx = columns
595 .iter()
596 .position(|c| c == col)
597 .ok_or("col not found")?;
598 // Track int and float contributions separately so
599 // Float columns (and mixed Int/Float rows) don't get
600 // silently dropped as they did in the Int-only
601 // version. If any Float is present, the whole sum
602 // promotes to Float — matching Avg's semantics.
603 let mut int_sum: i64 = 0;
604 let mut float_sum: f64 = 0.0;
605 let mut saw_float = false;
606 for r in &rows {
607 match &r[idx] {
608 Value::Int(v) => int_sum += *v,
609 Value::Float(v) => {
610 float_sum += *v;
611 saw_float = true;
612 }
613 _ => {}
614 }
615 }
616 let result = if saw_float {
617 Value::Float(float_sum + int_sum as f64)
618 } else {
619 Value::Int(int_sum)
620 };
621 Ok(QueryResult::Scalar(result))
622 }
623 AggFunc::Min | AggFunc::Max => {
624 let col = field.as_ref().ok_or("min/max requires field")?;
625 let idx = columns
626 .iter()
627 .position(|c| c == col)
628 .ok_or("col not found")?;
629 let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
630 let result = if *function == AggFunc::Min {
631 vals.into_iter().min().cloned()
632 } else {
633 vals.into_iter().max().cloned()
634 };
635 Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
636 }
637 }
638 }
639 _ => Err("aggregate requires row input".into()),
640 }
641 }
642
643 PlanNode::Insert { table, rows } => {
644 // Build + validate EVERY row before inserting any, so a bad
645 // row (unknown/missing/uncoercible field) aborts the whole
646 // statement without a partial write. The WAL fsync happens
647 // once at statement end, so N rows = N appends + 1 fsync.
648 let all_values: Vec<Vec<Value>> = {
649 let schema = self
650 .catalog
651 .schema(table)
652 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
653 let mut all = Vec::with_capacity(rows.len());
654 for assignments in rows {
655 let mut values = vec![Value::Empty; schema.columns.len()];
656 for a in assignments {
657 let idx = schema.column_index(&a.field).ok_or_else(|| {
658 QueryError::ColumnNotFound {
659 table: String::new(),
660 column: a.field.clone(),
661 }
662 })?;
663 let raw = literal_to_value(&a.value)?;
664 values[idx] = coerce_value(raw, &schema.columns[idx])?;
665 }
666 for col in &schema.columns {
667 if col.required && matches!(values[col.position as usize], Value::Empty)
668 {
669 return Err(QueryError::Execution(format!(
670 "column '{}' is required but no value was provided",
671 col.name
672 )));
673 }
674 }
675 all.push(values);
676 }
677 all
678 };
679 // Charge the materialized batch against the per-query memory
680 // budget before inserting — keeps multi-row insert consistent
681 // with every other full-materialization point (sort/join/group)
682 // and bounds embedded callers (the server also caps the query
683 // string at 1 MB, but embedded callers have no such limit).
684 self.charge_rows(&all_values)?;
685 let n = all_values.len() as u64;
686 for values in &all_values {
687 self.catalog
688 .insert(table, values)
689 .map_err(|e| QueryError::StorageError(e.to_string()))?;
690 }
691 self.view_registry.mark_dependents_dirty(table);
692 Ok(QueryResult::Modified(n))
693 }
694
695 PlanNode::Upsert {
696 table,
697 key_column,
698 assignments,
699 on_conflict,
700 } => {
701 let (values, key_idx) = {
702 let schema = self
703 .catalog
704 .schema(table)
705 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
706 let mut values = vec![Value::Empty; schema.columns.len()];
707 for a in assignments {
708 let idx = schema.column_index(&a.field).ok_or_else(|| {
709 QueryError::ColumnNotFound {
710 table: String::new(),
711 column: a.field.clone(),
712 }
713 })?;
714 let raw = literal_to_value(&a.value)?;
715 values[idx] = coerce_value(raw, &schema.columns[idx])?;
716 }
717 for col in &schema.columns {
718 if col.required && matches!(values[col.position as usize], Value::Empty) {
719 return Err(QueryError::Execution(format!(
720 "column '{}' is required but no value was provided",
721 col.name
722 )));
723 }
724 }
725 let key_idx = schema
726 .column_index(key_column)
727 .ok_or_else(|| format!("key column '{key_column}' not found"))?;
728 (values, key_idx)
729 };
730
731 // Upsert requires the `on` column to be unique — otherwise
732 // there is no well-defined row to overwrite and a plain
733 // insert could silently create duplicate keys.
734 if self.catalog.is_index_unique(table, key_column) != Some(true) {
735 return Err(QueryError::Execution(format!(
736 "upsert on .{key_column} requires a unique column (declare it with \
737 `unique {key_column}: <type>` or `alter {table} add unique .{key_column}`)"
738 )));
739 }
740
741 let key_value = values[key_idx].clone();
742
743 // Probe the unique index for a conflict.
744 let existing = {
745 let tbl = self
746 .catalog
747 .get_table(table)
748 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
749 // The key column is guaranteed unique above, so this
750 // returns at most one matching row.
751 let rids = tbl.index_lookup_all(key_column, &key_value);
752 rids.into_iter().next().and_then(|rid| {
753 tbl.heap
754 .get(rid)
755 .map(|data| (rid, decode_row(&tbl.schema, &data)))
756 })
757 };
758
759 if let Some((rid, mut existing_row)) = existing {
760 // Conflict: apply on_conflict assignments (or all non-key if empty).
761 let update_assignments = if on_conflict.is_empty() {
762 assignments
763 } else {
764 on_conflict
765 };
766 let changed_cols: Vec<usize> = {
767 let schema = self
768 .catalog
769 .schema(table)
770 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
771 let mut indices = Vec::new();
772 for a in update_assignments {
773 let idx = schema.column_index(&a.field).ok_or_else(|| {
774 QueryError::ColumnNotFound {
775 table: String::new(),
776 column: a.field.clone(),
777 }
778 })?;
779 if idx != key_idx {
780 existing_row[idx] = literal_to_value(&a.value)?;
781 indices.push(idx);
782 }
783 }
784 indices
785 };
786 self.catalog
787 .update_hinted(table, rid, &existing_row, Some(&changed_cols))
788 .map_err(|e| QueryError::StorageError(e.to_string()))?;
789 self.view_registry.mark_dependents_dirty(table);
790 Ok(QueryResult::Modified(1))
791 } else {
792 // No conflict: insert.
793 self.catalog
794 .insert(table, &values)
795 .map_err(|e| QueryError::StorageError(e.to_string()))?;
796 self.view_registry.mark_dependents_dirty(table);
797 Ok(QueryResult::Modified(1))
798 }
799 }
800
801 PlanNode::Update {
802 input,
803 table,
804 assignments,
805 } => {
806 // Mission C Phase 3: resolve assignments against a borrowed
807 // schema, then drop the borrow before the mutation loop.
808 // Try literal-only path first; fall back to per-row expression
809 // evaluation if any assignment contains a non-literal expression
810 // (e.g., `age := .age + 1`).
811 let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
812 let schema_ref = self
813 .catalog
814 .schema(table)
815 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
816 let indices: Vec<usize> = assignments
817 .iter()
818 .map(|a| {
819 schema_ref.column_index(&a.field).ok_or_else(|| {
820 QueryError::ColumnNotFound {
821 table: String::new(),
822 column: a.field.clone(),
823 }
824 })
825 })
826 .collect::<Result<_, _>>()?;
827 let vals: Result<Vec<Value>, _> = assignments
828 .iter()
829 .map(|a| literal_to_value(&a.value))
830 .collect();
831 (indices, vals.ok())
832 };
833 let resolved_assignments: Option<Vec<(usize, Value)>> =
834 literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
835
836 // Mission C Phase 2: the hint Table::update_hinted needs to
837 // decide whether to read the old row for index diff.
838 let changed_cols: Vec<usize> = col_indices.clone();
839
840 // ── Fused scan+update for Update(Filter(SeqScan)) ────────
841 // Perf sprint: instead of the two-pass collect-RIDs-then-loop
842 // pattern (which pays one ensure_hot per matched row on the
843 // second pass), fuse the predicate evaluation and in-place
844 // byte-level mutation into a single heap walk. Same idea as
845 // the fused scan_delete_matching path for deletes.
846 if let Some(ref resolved_assignments) = resolved_assignments {
847 if let PlanNode::Filter {
848 input: inner,
849 predicate,
850 } = input.as_ref()
851 {
852 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
853 if t == table {
854 let fused_result = self.try_fused_scan_update(
855 table,
856 predicate,
857 resolved_assignments,
858 &changed_cols,
859 );
860 if let Some(result) = fused_result {
861 return result;
862 }
863 }
864 }
865 }
866 }
867
868 // Collect matching RowIds in a single pass.
869 let matching_rids = self.collect_rids_for_mutation(input, table)?;
870
871 // ── Literal-only fast paths ─────────────────────────────
872 if let Some(ref resolved_assignments) = resolved_assignments {
873 // Mission C Phase 4: in-place byte-patch fast path. If every
874 // assignment targets a fixed-size non-null column AND none of
875 // them is indexed, we can skip decode_row / Vec<Value> /
876 // encode_row_into entirely and patch the row's raw bytes on
877 // the hot page.
878 let fast_patch: Option<Vec<FastPatch>> = {
879 let tbl = self
880 .catalog
881 .get_table(table)
882 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
883 let schema = &tbl.schema;
884 let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
885 is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
886 });
887 let no_indexed = !resolved_assignments
888 .iter()
889 .any(|(idx, _)| tbl.has_indexed_col(*idx));
890
891 if all_fixed_nonnull && no_indexed {
892 let layout = RowLayout::new(schema);
893 let bitmap_size = layout.bitmap_size();
894 let patches: Vec<FastPatch> = resolved_assignments
895 .iter()
896 .map(|(idx, val)| {
897 let fixed_off = layout
898 .fixed_offset(*idx)
899 .expect("is_fixed_size already checked");
900 let field_off = 2 + bitmap_size + fixed_off;
901 let bytes: FixedBytes = match val {
902 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
903 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
904 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
905 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
906 Value::Uuid(v) => FixedBytes::Uuid(*v),
907 _ => unreachable!("all_fixed_nonnull guard lied"),
908 };
909 FastPatch {
910 field_off,
911 bitmap_byte_off: 2 + idx / 8,
912 bit_mask: 1u8 << (idx % 8),
913 bytes,
914 }
915 })
916 .collect();
917 Some(patches)
918 } else {
919 None
920 }
921 };
922
923 if let Some(patches) = fast_patch {
924 let mut count = 0u64;
925 for rid in matching_rids {
926 // Mission B2: WAL-log every patch so crash
927 // recovery replays the update. Same mutation
928 // closure as before — the wrapper just sandwiches
929 // it between a hot-page read and a WAL append.
930 let ok = self
931 .catalog
932 .update_row_bytes_logged(table, rid, |row| {
933 for p in &patches {
934 row[p.bitmap_byte_off] &= !p.bit_mask;
935 let field_bytes = p.bytes.as_slice();
936 row[p.field_off..p.field_off + field_bytes.len()]
937 .copy_from_slice(field_bytes);
938 }
939 })
940 .map_err(|e| QueryError::StorageError(e.to_string()))?;
941 if ok {
942 count += 1;
943 }
944 }
945 self.view_registry.mark_dependents_dirty(table);
946 return Ok(QueryResult::Modified(count));
947 }
948
949 // Mission C Phase 10: var-column in-place shrink fast path.
950 let var_fast: Option<(usize, Option<Vec<u8>>)> = {
951 let tbl = self
952 .catalog
953 .get_table(table)
954 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
955 let schema = &tbl.schema;
956 let is_single = resolved_assignments.len() == 1;
957 let is_var_col = is_single
958 && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
959 let no_indexed = !resolved_assignments
960 .iter()
961 .any(|(idx, _)| tbl.has_indexed_col(*idx));
962
963 if is_single && is_var_col && no_indexed {
964 let (idx, val) = &resolved_assignments[0];
965 let bytes_opt: Option<Vec<u8>> = match val {
966 Value::Str(s) => Some(s.as_bytes().to_vec()),
967 Value::Bytes(b) => Some(b.clone()),
968 Value::Empty => None,
969 _ => {
970 return Err(QueryError::TypeError(format!(
971 "cannot assign non-var value to var column '{}'",
972 schema.columns[*idx].name
973 )))
974 }
975 };
976 Some((*idx, bytes_opt))
977 } else {
978 None
979 }
980 };
981
982 if let Some((col_idx, new_bytes_opt)) = var_fast {
983 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
984 let mut count = 0u64;
985 let mut fallback_rids: Vec<RowId> = Vec::new();
986 for rid in &matching_rids {
987 // Mission B2: logged variant so crash recovery
988 // replays the shrink. On a false return (row
989 // would have to grow), the rid is pushed to
990 // `fallback_rids` and the slower `update_hinted`
991 // path — which is already WAL-logged — picks it up.
992 let ok = self
993 .catalog
994 .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
995 .map_err(|e| QueryError::StorageError(e.to_string()))?;
996 if ok {
997 count += 1;
998 } else {
999 fallback_rids.push(*rid);
1000 }
1001 }
1002 for rid in fallback_rids {
1003 let mut row = match self.catalog.get(table, rid) {
1004 Some(r) => r,
1005 None => continue,
1006 };
1007 for (idx, val) in resolved_assignments.iter() {
1008 row[*idx] = val.clone();
1009 }
1010 self.catalog
1011 .update_hinted(table, rid, &row, Some(&changed_cols))
1012 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1013 count += 1;
1014 }
1015 self.view_registry.mark_dependents_dirty(table);
1016 return Ok(QueryResult::Modified(count));
1017 }
1018
1019 // Generic literal path: decode row, apply literal values.
1020 let mut count = 0u64;
1021 for rid in matching_rids {
1022 let mut row = match self.catalog.get(table, rid) {
1023 Some(r) => r,
1024 None => continue,
1025 };
1026 for (idx, val) in resolved_assignments.iter() {
1027 row[*idx] = val.clone();
1028 }
1029 self.catalog
1030 .update_hinted(table, rid, &row, Some(&changed_cols))
1031 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1032 count += 1;
1033 }
1034 self.view_registry.mark_dependents_dirty(table);
1035 return Ok(QueryResult::Modified(count));
1036 } // end if let Some(resolved_assignments)
1037
1038 // ── Expression-based update path ────────────────────────
1039 // At least one assignment contains a non-literal expression
1040 // (e.g., `age := .age + 1`). Evaluate per-row.
1041 let col_names: Vec<String> = {
1042 let schema_ref = self
1043 .catalog
1044 .schema(table)
1045 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1046 schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1047 };
1048 let mut count = 0u64;
1049 for rid in matching_rids {
1050 let mut row = match self.catalog.get(table, rid) {
1051 Some(r) => r,
1052 None => continue,
1053 };
1054 for (i, asgn) in assignments.iter().enumerate() {
1055 let val = eval_expr(&asgn.value, &row, &col_names);
1056 row[col_indices[i]] = val;
1057 }
1058 self.catalog
1059 .update_hinted(table, rid, &row, Some(&changed_cols))
1060 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1061 count += 1;
1062 }
1063 self.view_registry.mark_dependents_dirty(table);
1064 Ok(QueryResult::Modified(count))
1065 }
1066
1067 PlanNode::Delete { input, table } => {
1068 // Mission C Phase 3: no schema clone — collect_rids_for_mutation
1069 // looks up schema internally when it needs one, and the mutation
1070 // loop doesn't need the schema at all.
1071 //
1072 // Mission C Phase 12: route bulk deletes through
1073 // `Catalog::delete_many`, which batches the btree leaf
1074 // compaction and shares one `ensure_hot` per row between
1075 // the index-key extraction and the slot delete. On
1076 // `delete_by_filter` (100K fixture, ~20K matches) that
1077 // removes ~4ms of pure `Vec::remove` memmove from the btree
1078 // maintenance phase.
1079 //
1080 // Mission C Phase 16: for the common `delete where ...`
1081 // shape (Filter(SeqScan)) — and the rarer "delete
1082 // everything" shape (SeqScan) — skip the two-pass
1083 // `collect_rids_for_mutation` + `delete_many` flow entirely.
1084 // The fused `scan_delete_matching` primitive walks the
1085 // heap exactly once, paying one `ensure_hot` per page
1086 // instead of per-row. That closes the last major gap on
1087 // the bench's `delete_by_filter` workload.
1088 if let PlanNode::Filter {
1089 input: inner,
1090 predicate,
1091 } = input.as_ref()
1092 {
1093 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1094 if t == table {
1095 let schema = self
1096 .catalog
1097 .schema(table)
1098 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1099 let columns: Vec<String> =
1100 schema.columns.iter().map(|c| c.name.clone()).collect();
1101 let fast = FastLayout::new(schema);
1102 if let Some(compiled) =
1103 compile_predicate(predicate, &columns, &fast, schema)
1104 {
1105 // Mission B2: logged variant so every
1106 // matched rid hits the WAL during the
1107 // single-pass scan. Structure of the
1108 // fused scan is unchanged — only the
1109 // hook closure now also appends.
1110 let count = self
1111 .catalog
1112 .scan_delete_matching_logged(table, |data| compiled(data))
1113 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1114 self.view_registry.mark_dependents_dirty(table);
1115 return Ok(QueryResult::Modified(count));
1116 }
1117 }
1118 }
1119 } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1120 if t == table {
1121 // `delete from T` with no predicate — every live
1122 // row matches. One pass is still the right shape.
1123 // Mission B2: logged variant — see above.
1124 let count = self
1125 .catalog
1126 .scan_delete_matching_logged(table, |_| true)
1127 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1128 self.view_registry.mark_dependents_dirty(table);
1129 return Ok(QueryResult::Modified(count));
1130 }
1131 }
1132
1133 let matching_rids = self.collect_rids_for_mutation(input, table)?;
1134 let count = self
1135 .catalog
1136 .delete_many(table, &matching_rids)
1137 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1138 self.view_registry.mark_dependents_dirty(table);
1139 Ok(QueryResult::Modified(count))
1140 }
1141
1142 PlanNode::AliasScan { table, alias } => {
1143 // Mission E1.2: scan `table` and rename every output column
1144 // to `alias.field`. Used as a join leaf so downstream
1145 // NestedLoopJoin + Filter + Project nodes can resolve
1146 // `Expr::QualifiedField` lookups by direct column-name match.
1147 //
1148 // We don't bother with a fused zero-copy loop here yet — the
1149 // whole join path is nested-loop and correctness-first
1150 // (Phase E1.3 will introduce hash join and at that point we
1151 // can revisit whether to specialise AliasScan).
1152 let schema = self
1153 .catalog
1154 .schema(table)
1155 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1156 .clone();
1157 let columns: Vec<String> = schema
1158 .columns
1159 .iter()
1160 .map(|c| format!("{alias}.{}", c.name))
1161 .collect();
1162 let rows: Vec<Vec<Value>> = self
1163 .catalog
1164 .scan(table)
1165 .map_err(|e| QueryError::StorageError(e.to_string()))?
1166 .map(|(_, row)| row)
1167 .collect();
1168 Ok(QueryResult::Rows { columns, rows })
1169 }
1170
1171 PlanNode::NestedLoopJoin {
1172 left,
1173 right,
1174 on,
1175 kind,
1176 } => {
1177 // Materialise both sides. The executor ships two strategies:
1178 // 1. Hash join (E1.3) — when the `on` predicate is a
1179 // simple equi-predicate `left_col = right_col`, build a
1180 // FxHashMap<Value, Vec<row_idx>> over the right side
1181 // and probe with the left side. O(L + R) instead of
1182 // O(L × R). Handles Inner and LeftOuter.
1183 // 2. Nested loop (E1.2) — fallback for Cross, non-equi
1184 // predicates, or `on` expressions that reference
1185 // either side with something more complex than a
1186 // QualifiedField.
1187 let left_result = self.execute_plan(left)?;
1188 let right_result = self.execute_plan(right)?;
1189 let (left_columns, left_rows) = match left_result {
1190 QueryResult::Rows { columns, rows } => (columns, rows),
1191 _ => return Err("join left side must produce rows".into()),
1192 };
1193 let (right_columns, right_rows) = match right_result {
1194 QueryResult::Rows { columns, rows } => (columns, rows),
1195 _ => return Err("join right side must produce rows".into()),
1196 };
1197
1198 // WS2: byte-budget guard on the join build side. Charge both
1199 // materialized inputs before we build the hash table / probe;
1200 // the output is row-capped by check_join_limit below.
1201 self.charge_rows(&left_rows)?;
1202 self.charge_rows(&right_rows)?;
1203
1204 // Hash-join fast path.
1205 if !matches!(kind, JoinKind::Cross) {
1206 if let Some(pred) = on {
1207 if let Some((l_idx, r_idx)) =
1208 try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1209 {
1210 let result = hash_join(
1211 left_columns,
1212 left_rows,
1213 right_columns,
1214 right_rows,
1215 l_idx,
1216 r_idx,
1217 *kind,
1218 );
1219 if let QueryResult::Rows { ref rows, .. } = result {
1220 check_join_limit(rows.len())?;
1221 }
1222 return Ok(result);
1223 }
1224 }
1225 }
1226
1227 // Nested-loop fallback.
1228 let n_left = left_columns.len();
1229 let n_right = right_columns.len();
1230 let mut columns = Vec::with_capacity(n_left + n_right);
1231 columns.extend(left_columns);
1232 columns.extend(right_columns);
1233
1234 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1235 let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1236
1237 for left_row in &left_rows {
1238 let mut matched = false;
1239 for right_row in &right_rows {
1240 combined.clear();
1241 combined.extend_from_slice(left_row);
1242 combined.extend_from_slice(right_row);
1243 let keep = match kind {
1244 JoinKind::Cross => true,
1245 JoinKind::Inner | JoinKind::LeftOuter => match on {
1246 Some(pred) => eval_predicate(pred, &combined, &columns),
1247 // Missing `on` for non-cross joins is a
1248 // parser error, but if it slips through we
1249 // treat it as "match everything".
1250 None => true,
1251 },
1252 // RightOuter is rewritten to LeftOuter by the
1253 // planner, so we never see it here.
1254 JoinKind::RightOuter => {
1255 unreachable!("planner rewrites RightOuter to LeftOuter")
1256 }
1257 };
1258 if keep {
1259 rows.push(combined.clone());
1260 check_join_limit(rows.len())?;
1261 matched = true;
1262 }
1263 }
1264 if !matched && matches!(kind, JoinKind::LeftOuter) {
1265 let mut row = Vec::with_capacity(n_left + n_right);
1266 row.extend_from_slice(left_row);
1267 row.resize(n_left + n_right, Value::Empty);
1268 rows.push(row);
1269 check_join_limit(rows.len())?;
1270 }
1271 }
1272
1273 Ok(QueryResult::Rows { columns, rows })
1274 }
1275
1276 PlanNode::Distinct { input } => {
1277 let result = self.execute_plan(input)?;
1278 match result {
1279 QueryResult::Rows { columns, rows } => {
1280 let mut seen = std::collections::HashSet::new();
1281 let mut unique_rows = Vec::new();
1282 for row in rows {
1283 if seen.insert(row.clone()) {
1284 unique_rows.push(row);
1285 }
1286 }
1287 Ok(QueryResult::Rows {
1288 columns,
1289 rows: unique_rows,
1290 })
1291 }
1292 other => Ok(other),
1293 }
1294 }
1295
1296 PlanNode::GroupBy {
1297 input,
1298 keys,
1299 aggregates,
1300 having,
1301 } => {
1302 let result = self.execute_plan(input)?;
1303 match result {
1304 QueryResult::Rows { columns, rows } => {
1305 // WS2: byte-budget guard on the GROUP BY input buffer
1306 // (the hash table is bounded by the input it groups).
1307 self.charge_rows(&rows)?;
1308 // Resolve key column indices.
1309 let key_indices: Vec<usize> = keys
1310 .iter()
1311 .map(|k| {
1312 columns
1313 .iter()
1314 .position(|c| c == k)
1315 .ok_or_else(|| format!("group-by column '{k}' not found"))
1316 })
1317 .collect::<Result<Vec<_>, _>>()?;
1318
1319 // Resolve aggregate field indices. count(*) uses
1320 // sentinel usize::MAX — compute_group_aggregate
1321 // treats it as "count all rows in the group".
1322 let agg_field_indices: Vec<usize> = aggregates
1323 .iter()
1324 .map(|a| {
1325 if a.field == "*" {
1326 Ok(usize::MAX)
1327 } else {
1328 columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1329 format!("aggregate column '{}' not found", a.field)
1330 })
1331 }
1332 })
1333 .collect::<Result<Vec<_>, _>>()?;
1334
1335 // Group rows by key values (preserving insertion order).
1336 let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1337 rustc_hash::FxHashMap::default();
1338 let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1339 for (ri, row) in rows.iter().enumerate() {
1340 let key: Vec<Value> =
1341 key_indices.iter().map(|&i| row[i].clone()).collect();
1342 match group_map.get(&key) {
1343 Some(&idx) => groups[idx].1.push(ri),
1344 None => {
1345 let idx = groups.len();
1346 group_map.insert(key.clone(), idx);
1347 groups.push((key, vec![ri]));
1348 }
1349 }
1350 }
1351
1352 // Build output column names: keys ++ aggregate output names.
1353 let mut out_columns: Vec<String> = keys.clone();
1354 for agg in aggregates.iter() {
1355 out_columns.push(agg.output_name.clone());
1356 }
1357
1358 // Compute aggregates per group.
1359 let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1360 for (key_vals, row_indices) in &groups {
1361 let mut row = key_vals.clone();
1362 for (ai, agg) in aggregates.iter().enumerate() {
1363 let col_idx = agg_field_indices[ai];
1364 let val = compute_group_aggregate(
1365 agg.function,
1366 &rows,
1367 row_indices,
1368 col_idx,
1369 );
1370 row.push(val);
1371 }
1372 out_rows.push(row);
1373 }
1374
1375 // Apply HAVING filter.
1376 if let Some(having_expr) = having {
1377 out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1378 }
1379
1380 Ok(QueryResult::Rows {
1381 columns: out_columns,
1382 rows: out_rows,
1383 })
1384 }
1385 _ => Err("group by requires row input".into()),
1386 }
1387 }
1388
1389 PlanNode::CreateTable { name, fields } => {
1390 let columns: Vec<ColumnDef> = fields
1391 .iter()
1392 .enumerate()
1393 .map(|(i, f)| -> Result<ColumnDef, QueryError> {
1394 Ok(ColumnDef {
1395 name: f.name.clone(),
1396 type_id: type_name_to_id(&f.type_name)
1397 .map_err(QueryError::TypeError)?,
1398 required: f.required,
1399 position: i as u16,
1400 })
1401 })
1402 .collect::<Result<Vec<_>, _>>()?;
1403 let schema = Schema {
1404 table_name: name.clone(),
1405 columns,
1406 };
1407 self.catalog
1408 .create_table(schema)
1409 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1410 // Declaring a field `unique` auto-creates a unique B+tree
1411 // index, which is where uniqueness is enforced on writes.
1412 for f in fields.iter().filter(|f| f.unique) {
1413 self.catalog
1414 .create_index_unique(name, &f.name, true)
1415 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1416 }
1417 Ok(QueryResult::Created(name.clone()))
1418 }
1419
1420 PlanNode::AlterTable { table, action } => match action {
1421 AlterAction::AddColumn {
1422 name,
1423 type_name,
1424 required,
1425 } => {
1426 let position = self
1427 .catalog
1428 .schema(table)
1429 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1430 .columns
1431 .len() as u16;
1432 let col = ColumnDef {
1433 name: name.clone(),
1434 type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1435 required: *required,
1436 position,
1437 };
1438 self.catalog
1439 .alter_table_add_column(table, col)
1440 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1441 Ok(QueryResult::Executed {
1442 message: format!("column '{name}' added to '{table}'"),
1443 })
1444 }
1445 AlterAction::DropColumn { name } => {
1446 self.catalog
1447 .alter_table_drop_column(table, name)
1448 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1449 Ok(QueryResult::Executed {
1450 message: format!("column '{name}' dropped from '{table}'"),
1451 })
1452 }
1453 AlterAction::AddIndex { column } => {
1454 self.catalog
1455 .create_index(table, column)
1456 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1457 Ok(QueryResult::Executed {
1458 message: format!("index on '{table}.{column}' created"),
1459 })
1460 }
1461 AlterAction::AddUnique { column } => {
1462 // No DropIndex exists, so we cannot upgrade an existing
1463 // non-unique index in place — reject it cleanly.
1464 if self.catalog.has_index(table, column) {
1465 return Err(QueryError::Execution(format!(
1466 "cannot add unique on {table}.{column}: column already indexed"
1467 )));
1468 }
1469 // Scan existing rows for duplicate (non-null) values
1470 // before creating the unique index.
1471 {
1472 let tbl = self
1473 .catalog
1474 .get_table(table)
1475 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1476 let col_idx = tbl.schema.column_index(column).ok_or_else(|| {
1477 QueryError::ColumnNotFound {
1478 table: table.to_string(),
1479 column: column.clone(),
1480 }
1481 })?;
1482 let mut seen = std::collections::HashSet::new();
1483 for (_, row) in tbl.scan() {
1484 let v = &row[col_idx];
1485 if v.is_empty() {
1486 continue;
1487 }
1488 if !seen.insert(v.clone()) {
1489 return Err(QueryError::Execution(format!(
1490 "cannot add unique on {table}.{column}: \
1491 duplicate value {v:?} exists"
1492 )));
1493 }
1494 }
1495 }
1496 self.catalog
1497 .create_index_unique(table, column, true)
1498 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1499 Ok(QueryResult::Executed {
1500 message: format!("unique index on '{table}.{column}' created"),
1501 })
1502 }
1503 },
1504
1505 PlanNode::DropTable { name } => {
1506 self.catalog
1507 .drop_table(name)
1508 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1509 Ok(QueryResult::Executed {
1510 message: format!("table '{name}' dropped"),
1511 })
1512 }
1513
1514 PlanNode::CreateView { name, query_text } => {
1515 self.create_view(name, query_text)?;
1516 Ok(QueryResult::Executed {
1517 message: format!("materialized view '{name}' created"),
1518 })
1519 }
1520
1521 PlanNode::RefreshView { name } => {
1522 self.refresh_view(name)?;
1523 Ok(QueryResult::Executed {
1524 message: format!("materialized view '{name}' refreshed"),
1525 })
1526 }
1527
1528 PlanNode::DropView { name } => {
1529 self.drop_view(name)?;
1530 Ok(QueryResult::Executed {
1531 message: format!("materialized view '{name}' dropped"),
1532 })
1533 }
1534
1535 PlanNode::Window { input, windows } => {
1536 let result = self.execute_plan(input)?;
1537 execute_window(result, windows)
1538 }
1539
1540 PlanNode::Union { left, right, all } => {
1541 let left_result = self.execute_plan(left)?;
1542 let right_result = self.execute_plan(right)?;
1543 let (left_cols, left_rows) = match left_result {
1544 QueryResult::Rows { columns, rows } => (columns, rows),
1545 _ => return Err("UNION requires query results on left side".into()),
1546 };
1547 let (_, right_rows) = match right_result {
1548 QueryResult::Rows { columns, rows } => (columns, rows),
1549 _ => return Err("UNION requires query results on right side".into()),
1550 };
1551 let mut combined = left_rows;
1552 if *all {
1553 // UNION ALL — just concatenate.
1554 combined.extend(right_rows);
1555 } else {
1556 // UNION — deduplicate using the same HashSet approach
1557 // as DISTINCT. Value already implements Hash + Eq.
1558 let mut seen = std::collections::HashSet::new();
1559 for row in &combined {
1560 seen.insert(row.clone());
1561 }
1562 for row in right_rows {
1563 if seen.insert(row.clone()) {
1564 combined.push(row);
1565 }
1566 }
1567 }
1568 Ok(QueryResult::Rows {
1569 columns: left_cols,
1570 rows: combined,
1571 })
1572 }
1573
1574 PlanNode::Explain { input } => {
1575 let text = format_plan_tree(input, 0);
1576 Ok(QueryResult::Rows {
1577 columns: vec!["plan".to_string()],
1578 rows: text
1579 .lines()
1580 .map(|line| vec![Value::Str(line.to_string())])
1581 .collect(),
1582 })
1583 }
1584
1585 PlanNode::Begin => {
1586 if self.in_transaction {
1587 return Err(QueryError::Execution(
1588 "already in a transaction (nested transactions not supported)".into(),
1589 ));
1590 }
1591 self.in_transaction = true;
1592 Ok(QueryResult::Executed {
1593 message: "transaction started".to_string(),
1594 })
1595 }
1596
1597 PlanNode::Commit => {
1598 if !self.in_transaction {
1599 return Err(QueryError::Execution(
1600 "no active transaction to commit".into(),
1601 ));
1602 }
1603 self.in_transaction = false;
1604 self.catalog
1605 .sync_wal()
1606 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1607 Ok(QueryResult::Executed {
1608 message: "transaction committed".to_string(),
1609 })
1610 }
1611
1612 PlanNode::Rollback => {
1613 if !self.in_transaction {
1614 return Err(QueryError::Execution(
1615 "no active transaction to roll back".into(),
1616 ));
1617 }
1618 self.in_transaction = false;
1619 self.catalog
1620 .rollback_to_last_sync()
1621 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1622 if let Ok(mut cache) = self.plan_cache.lock() {
1623 cache.clear();
1624 }
1625 self.view_registry = ViewRegistry::open(self.catalog.data_dir())
1626 .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
1627 Ok(QueryResult::Executed {
1628 message: "transaction rolled back".to_string(),
1629 })
1630 }
1631
1632 PlanNode::IndexScan { table, column, key } => {
1633 let key_value = literal_to_value(key)?;
1634 let tbl = self
1635 .catalog
1636 .get_table(table)
1637 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1638 let columns: Vec<String> =
1639 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1640
1641 // Fast path: the table has a B-tree on this column.
1642 // Uses index_lookup_all to return ALL matching rows for
1643 // both unique and non-unique indexes.
1644 if tbl.has_index(column) {
1645 let rids = tbl.index_lookup_all(column, &key_value);
1646 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1647 for rid in rids {
1648 if let Some(data) = tbl.heap.get(rid) {
1649 rows.push(decode_row(&tbl.schema, &data));
1650 }
1651 }
1652 return Ok(QueryResult::Rows { columns, rows });
1653 }
1654
1655 // Fallback: no index on this column. The planner emits IndexScan
1656 // eagerly (it has no visibility into which columns are indexed
1657 // at plan time), so here we must behave like SeqScan+Filter on
1658 // `.col = literal`: return *all* matching rows, not just the
1659 // first one. A non-indexed column isn't necessarily unique.
1660 // We compile the eq predicate once and stream without any
1661 // per-row decode for non-matching rows.
1662 let schema = &tbl.schema;
1663 let fast = FastLayout::new(schema);
1664 let synth_pred = Expr::BinaryOp(
1665 Box::new(Expr::Field(column.clone())),
1666 BinOp::Eq,
1667 Box::new(key.clone()),
1668 );
1669 if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1670 // Mission F: skip the first 4 Vec doublings.
1671 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1672 self.catalog
1673 .for_each_row_raw(table, |_rid, data| {
1674 if compiled(data) {
1675 rows.push(decode_row(schema, data));
1676 }
1677 })
1678 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1679 return Ok(QueryResult::Rows { columns, rows });
1680 }
1681
1682 // Last resort: slow eq-check on materialised rows.
1683 let col_idx =
1684 schema
1685 .column_index(column)
1686 .ok_or_else(|| QueryError::ColumnNotFound {
1687 table: String::new(),
1688 column: column.clone(),
1689 })?;
1690 let rows: Vec<Vec<Value>> = tbl
1691 .scan()
1692 .filter_map(|(_, row)| {
1693 if row[col_idx] == key_value {
1694 Some(row)
1695 } else {
1696 None
1697 }
1698 })
1699 .collect();
1700 Ok(QueryResult::Rows { columns, rows })
1701 }
1702
1703 PlanNode::RangeScan {
1704 table,
1705 column,
1706 start,
1707 end,
1708 } => {
1709 let tbl = self
1710 .catalog
1711 .get_table(table)
1712 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1713 let columns: Vec<String> =
1714 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1715 let schema = &tbl.schema;
1716
1717 let start_val = match start {
1718 Some((expr, _)) => Some(literal_to_value(expr)?),
1719 None => None,
1720 };
1721 let end_val = match end {
1722 Some((expr, _)) => Some(literal_to_value(expr)?),
1723 None => None,
1724 };
1725 let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1726 let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1727
1728 // Non-unique index: walk the composite (value, rid) leaf
1729 // chain between prefix bounds, fetch each row from the heap,
1730 // and recheck. The recheck enforces exclusive bounds
1731 // (range_rids is inclusive) and defensively skips any decoded
1732 // null (nulls are never indexed, so they must not match).
1733 if tbl.is_index_unique(column) == Some(false) {
1734 if let Some(btree) = tbl.index(column) {
1735 if start_val.is_some() || end_val.is_some() {
1736 let col_idx = schema.column_index(column).ok_or_else(|| {
1737 QueryError::ColumnNotFound {
1738 table: String::new(),
1739 column: column.clone(),
1740 }
1741 })?;
1742 let rids = btree.range_rids(start_val.as_ref(), end_val.as_ref());
1743 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1744 for rid in rids {
1745 if let Some(data) = tbl.heap.get(rid) {
1746 let row = decode_row(schema, &data);
1747 if !row[col_idx].is_empty()
1748 && range_matches(
1749 &row[col_idx],
1750 &start_val,
1751 start_inclusive,
1752 &end_val,
1753 end_inclusive,
1754 )
1755 {
1756 rows.push(row);
1757 }
1758 }
1759 }
1760 return Ok(QueryResult::Rows { columns, rows });
1761 }
1762 }
1763 }
1764
1765 // Range scans use the btree fast path for unique indexes,
1766 // walking raw column-value keys directly.
1767 if tbl.is_index_unique(column) == Some(true) {
1768 if let Some(btree) = tbl.index(column) {
1769 let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1770 (Some(s), Some(e)) => btree.range(s, e).collect(),
1771 (Some(s), None) => btree.range_from(s),
1772 (None, Some(e)) => btree.range_to(e),
1773 (None, None) => {
1774 let rows: Vec<Vec<Value>> =
1775 tbl.scan().map(|(_, row)| row).collect();
1776 return Ok(QueryResult::Rows { columns, rows });
1777 }
1778 };
1779 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1780 for (key, rid) in hits {
1781 if !start_inclusive {
1782 if let Some(ref s) = start_val {
1783 if &key == s {
1784 continue;
1785 }
1786 }
1787 }
1788 if !end_inclusive {
1789 if let Some(ref e) = end_val {
1790 if &key == e {
1791 continue;
1792 }
1793 }
1794 }
1795 if let Some(data) = tbl.heap.get(rid) {
1796 rows.push(decode_row(schema, &data));
1797 }
1798 }
1799 return Ok(QueryResult::Rows { columns, rows });
1800 }
1801 }
1802
1803 // Fallback: no index — synthesize range predicate and scan.
1804 let fast = FastLayout::new(schema);
1805 let synth = synthesize_range_predicate(column, start, end);
1806 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1807 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1808 self.catalog
1809 .for_each_row_raw(table, |_rid, data| {
1810 if compiled(data) {
1811 rows.push(decode_row(schema, data));
1812 }
1813 })
1814 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1815 return Ok(QueryResult::Rows { columns, rows });
1816 }
1817
1818 let col_idx =
1819 schema
1820 .column_index(column)
1821 .ok_or_else(|| QueryError::ColumnNotFound {
1822 table: String::new(),
1823 column: column.clone(),
1824 })?;
1825 let rows: Vec<Vec<Value>> = tbl
1826 .scan()
1827 .filter(|(_, row)| {
1828 range_matches(
1829 &row[col_idx],
1830 &start_val,
1831 start_inclusive,
1832 &end_val,
1833 end_inclusive,
1834 )
1835 })
1836 .map(|(_, row)| row)
1837 .collect();
1838 Ok(QueryResult::Rows { columns, rows })
1839 }
1840 }
1841 }
1842
1843 // ─── Materialized view operations ──────────────────────────────────────
1844
1845 /// Create a materialized view: execute the source query, store results
1846 /// in a new backing table, and register the view.
1847 fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1848 if self.view_registry.is_view(name) {
1849 return Err(QueryError::ViewError(format!(
1850 "materialized view '{name}' already exists"
1851 )));
1852 }
1853 // Execute the source query to get the result set.
1854 let result = self.execute_powql(query_text)?;
1855 let (columns, rows) = match result {
1856 QueryResult::Rows { columns, rows } => (columns, rows),
1857 _ => return Err("view source query must be a SELECT".into()),
1858 };
1859 // Derive a schema for the backing table from the query result columns.
1860 let schema = self.derive_view_schema(name, &columns, &rows);
1861 // Create the backing table and insert the result rows.
1862 self.catalog
1863 .create_table(schema)
1864 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1865 for row in &rows {
1866 self.catalog
1867 .insert(name, row)
1868 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1869 }
1870 // Determine which base tables this view depends on by parsing the query.
1871 let depends_on = self.extract_view_deps(query_text);
1872 self.view_registry
1873 .register(ViewDef {
1874 name: name.to_string(),
1875 query: query_text.to_string(),
1876 depends_on,
1877 dirty: false,
1878 })
1879 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1880 Ok(())
1881 }
1882
1883 /// Refresh a materialized view: re-execute its source query and replace
1884 /// the backing table's contents.
1885 fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1886 let def = self
1887 .view_registry
1888 .get(name)
1889 .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1890 let query_text = def.query.clone();
1891 // Execute the source query.
1892 let result = self.execute_powql(&query_text)?;
1893 let (_columns, rows) = match result {
1894 QueryResult::Rows { columns, rows } => (columns, rows),
1895 _ => return Err("view source query must be a SELECT".into()),
1896 };
1897 // Clear old data and insert fresh results. Mission B2: logged
1898 // variant — view refreshes are a mutation and crash recovery
1899 // must see them.
1900 self.catalog
1901 .scan_delete_matching_logged(name, |_| true)
1902 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1903 for row in &rows {
1904 self.catalog
1905 .insert(name, row)
1906 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1907 }
1908 self.view_registry.mark_clean(name);
1909 Ok(())
1910 }
1911
1912 /// Drop a materialized view: remove the backing table and unregister.
1913 fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1914 if !self.view_registry.is_view(name) {
1915 return Err(QueryError::ViewError(format!(
1916 "materialized view '{name}' not found"
1917 )));
1918 }
1919 self.view_registry
1920 .unregister(name)
1921 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1922 self.catalog
1923 .drop_table(name)
1924 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1925 Ok(())
1926 }
1927
1928 /// Derive a storage `Schema` for a view's backing table from query
1929 /// result column names and the first row's types.
1930 fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1931 use powdb_storage::types::{ColumnDef, TypeId};
1932 let cols: Vec<ColumnDef> = columns
1933 .iter()
1934 .enumerate()
1935 .map(|(i, col_name)| {
1936 let type_id = rows
1937 .first()
1938 .and_then(|row| row.get(i))
1939 .map(|v| v.type_id())
1940 .unwrap_or(TypeId::Str);
1941 ColumnDef {
1942 name: col_name.clone(),
1943 type_id,
1944 required: false,
1945 position: i as u16,
1946 }
1947 })
1948 .collect();
1949 Schema {
1950 table_name: name.to_string(),
1951 columns: cols,
1952 }
1953 }
1954
1955 /// Extract base table dependencies from a view's source query by
1956 /// parsing it and collecting the source table name.
1957 fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1958 use crate::parser::parse;
1959 match parse(query_text) {
1960 Ok(Statement::Query(q)) => {
1961 let mut deps = vec![q.source.clone()];
1962 for j in &q.joins {
1963 deps.push(j.source.clone());
1964 }
1965 deps
1966 }
1967 _ => Vec::new(),
1968 }
1969 }
1970
1971 // ─── Specialized fast paths ─────────────────────────────────────────────
1972 //
1973 // These methods are helpers for the `execute_plan` match arms above.
1974 // Each returns `Ok(Some(result))` when the fast path fires, `Ok(None)`
1975 // when the shape isn't supported (caller falls back to generic code).
1976
1977 /// Aggregate sum/avg/min/max over a single fixed-size i64 column, with
1978 /// an optional compiled filter predicate. Walks raw row bytes — zero
1979 /// per-row allocation. Uses i128 accumulator for sum/avg overflow safety.
1980 pub(super) fn agg_single_col_fast(
1981 &self,
1982 table: &str,
1983 col: &str,
1984 function: AggFunc,
1985 predicate: Option<&Expr>,
1986 ) -> Result<Option<QueryResult>, QueryError> {
1987 let schema = self
1988 .catalog
1989 .schema(table)
1990 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1991 .clone();
1992 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
1993 let col_idx = match schema.column_index(col) {
1994 Some(i) => i,
1995 None => return Ok(None),
1996 };
1997 // Only fast-path fixed-size numeric columns (Int/Float) for
1998 // sum/avg/min/max/count. Mission D10: Float parity — prior version
1999 // bailed on Float columns, forcing them through the generic row-
2000 // decoding path that allocated a Vec<Value> per row and dispatched
2001 // on Value::cmp for every compare. f64 decode is structurally the
2002 // same as i64 (load 8 bytes, cast), so the fast path handles both.
2003 let col_type = schema.columns[col_idx].type_id;
2004 if col_type != TypeId::Int && col_type != TypeId::Float {
2005 return Ok(None);
2006 }
2007
2008 let fast = FastLayout::new(&schema);
2009 // Mission C Phase 20b: inline the numeric-column reader instead of
2010 // building a `Box<dyn Fn>`. Eliminates 100K vtable dispatches per
2011 // 100K-row agg scan — every reader call folds directly into the
2012 // hot loop below.
2013 let byte_offset = match fast.fixed_offsets[col_idx] {
2014 Some(o) => o,
2015 None => return Ok(None),
2016 };
2017 let bitmap_byte = col_idx / 8;
2018 let bitmap_bit = (col_idx % 8) as u32;
2019 let data_offset = 2 + fast.bitmap_size + byte_offset;
2020
2021 // Optional compiled filter.
2022 let compiled_pred: Option<CompiledPredicate> = match predicate {
2023 Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
2024 Some(c) => Some(c),
2025 None => return Ok(None), // let generic path handle it
2026 },
2027 None => None,
2028 };
2029
2030 // Mission C Phase 20b: specialize the inner loop per aggregate
2031 // function. The previous version ran a `match function { ... }`
2032 // *inside* the closure, which kept LLVM from producing optimal
2033 // scalar code for each variant (agg_max regressed ~23% vs the
2034 // baseline Box<dyn Fn> version even though per-row vtable cost
2035 // should have been strictly lower). Pushing the match out of the
2036 // hot loop lets each specialized body fold cleanly into
2037 // `for_each_row_raw` and removes a captured `AggFunc` + match
2038 // dispatch per row.
2039 //
2040 // Mission D10: same specialisation applies to the Float branch.
2041 // For Min/Max we use `f64::total_cmp` so the result matches
2042 // `Value::Ord` — this is the same ordering ORDER BY and the
2043 // top-N sort fast path use, keeping semantics consistent across
2044 // read paths (NaN compares as greatest, -0.0 < +0.0 for
2045 // deterministic tie-breaking).
2046 //
2047 // Mission D11 Phase 1: each inner loop now splits on presence of
2048 // a predicate (`if let Some(pred) = &compiled_pred`) so the hot
2049 // body never re-tests `Option` per row, and reads column bytes
2050 // via `read_i64_unchecked` / `read_f64_unchecked` helpers that
2051 // drop two bounds checks per row (null bitmap byte + value
2052 // slice). Safety is carried by the `FastLayout` invariant that
2053 // `data_offset + 8 <= row_len` for any fixed-size column; see
2054 // the helper doc comments. Hot loops are macro-generated so the
2055 // with-pred / no-pred split can't drift between variants.
2056 let result = match col_type {
2057 TypeId::Int => match function {
2058 AggFunc::Sum | AggFunc::Avg => {
2059 let mut sum_i128: i128 = 0;
2060 let mut count: i64 = 0;
2061 agg_int_loop!(
2062 self,
2063 table,
2064 compiled_pred,
2065 bitmap_byte,
2066 bitmap_bit,
2067 data_offset,
2068 |v: i64| {
2069 count += 1;
2070 sum_i128 += v as i128;
2071 }
2072 );
2073 if matches!(function, AggFunc::Sum) {
2074 let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
2075 QueryResult::Scalar(Value::Int(clamped))
2076 } else if count == 0 {
2077 QueryResult::Scalar(Value::Empty)
2078 } else {
2079 let avg = (sum_i128 as f64) / (count as f64);
2080 QueryResult::Scalar(Value::Float(avg))
2081 }
2082 }
2083 AggFunc::Min => {
2084 let mut min_v: Option<i64> = None;
2085 agg_int_loop!(
2086 self,
2087 table,
2088 compiled_pred,
2089 bitmap_byte,
2090 bitmap_bit,
2091 data_offset,
2092 |v: i64| {
2093 min_v = Some(match min_v {
2094 Some(m) => m.min(v),
2095 None => v,
2096 });
2097 }
2098 );
2099 QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
2100 }
2101 AggFunc::Max => {
2102 let mut max_v: Option<i64> = None;
2103 agg_int_loop!(
2104 self,
2105 table,
2106 compiled_pred,
2107 bitmap_byte,
2108 bitmap_bit,
2109 data_offset,
2110 |v: i64| {
2111 max_v = Some(match max_v {
2112 Some(m) => m.max(v),
2113 None => v,
2114 });
2115 }
2116 );
2117 QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
2118 }
2119 AggFunc::Count => {
2120 let mut count: i64 = 0;
2121 agg_int_loop!(
2122 self,
2123 table,
2124 compiled_pred,
2125 bitmap_byte,
2126 bitmap_bit,
2127 data_offset,
2128 |_v: i64| {
2129 count += 1;
2130 }
2131 );
2132 QueryResult::Scalar(Value::Int(count))
2133 }
2134 AggFunc::CountDistinct => {
2135 let mut seen = rustc_hash::FxHashSet::default();
2136 agg_int_loop!(
2137 self,
2138 table,
2139 compiled_pred,
2140 bitmap_byte,
2141 bitmap_bit,
2142 data_offset,
2143 |v: i64| {
2144 seen.insert(v);
2145 }
2146 );
2147 QueryResult::Scalar(Value::Int(seen.len() as i64))
2148 }
2149 },
2150 TypeId::Float => match function {
2151 AggFunc::Sum => {
2152 // Use a single f64 accumulator. Naive summation is
2153 // sufficient for MVP parity; if precision becomes an
2154 // issue on long scans we can upgrade to Kahan–Neumaier
2155 // compensated sum (~2x scalar cost, zero error growth).
2156 let mut sum: f64 = 0.0;
2157 agg_float_loop!(
2158 self,
2159 table,
2160 compiled_pred,
2161 bitmap_byte,
2162 bitmap_bit,
2163 data_offset,
2164 |v: f64| {
2165 sum += v;
2166 }
2167 );
2168 QueryResult::Scalar(Value::Float(sum))
2169 }
2170 AggFunc::Avg => {
2171 let mut sum: f64 = 0.0;
2172 let mut count: i64 = 0;
2173 agg_float_loop!(
2174 self,
2175 table,
2176 compiled_pred,
2177 bitmap_byte,
2178 bitmap_bit,
2179 data_offset,
2180 |v: f64| {
2181 sum += v;
2182 count += 1;
2183 }
2184 );
2185 if count == 0 {
2186 QueryResult::Scalar(Value::Empty)
2187 } else {
2188 QueryResult::Scalar(Value::Float(sum / count as f64))
2189 }
2190 }
2191 AggFunc::Min => {
2192 // `total_cmp` for deterministic NaN handling (matches
2193 // Value::Ord). NaN compares greatest, so Min will
2194 // correctly ignore it in favour of any finite value.
2195 let mut min_v: Option<f64> = None;
2196 agg_float_loop!(
2197 self,
2198 table,
2199 compiled_pred,
2200 bitmap_byte,
2201 bitmap_bit,
2202 data_offset,
2203 |v: f64| {
2204 min_v = Some(match min_v {
2205 Some(m) => {
2206 if v.total_cmp(&m).is_lt() {
2207 v
2208 } else {
2209 m
2210 }
2211 }
2212 None => v,
2213 });
2214 }
2215 );
2216 QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
2217 }
2218 AggFunc::Max => {
2219 let mut max_v: Option<f64> = None;
2220 agg_float_loop!(
2221 self,
2222 table,
2223 compiled_pred,
2224 bitmap_byte,
2225 bitmap_bit,
2226 data_offset,
2227 |v: f64| {
2228 max_v = Some(match max_v {
2229 Some(m) => {
2230 if v.total_cmp(&m).is_gt() {
2231 v
2232 } else {
2233 m
2234 }
2235 }
2236 None => v,
2237 });
2238 }
2239 );
2240 QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
2241 }
2242 AggFunc::Count => {
2243 let mut count: i64 = 0;
2244 agg_float_loop!(
2245 self,
2246 table,
2247 compiled_pred,
2248 bitmap_byte,
2249 bitmap_bit,
2250 data_offset,
2251 |_v: f64| {
2252 count += 1;
2253 }
2254 );
2255 QueryResult::Scalar(Value::Int(count))
2256 }
2257 AggFunc::CountDistinct => {
2258 // Hash on `f64::to_bits` — matches `Value::Hash`, so
2259 // distinct NaN bit patterns count as distinct and
2260 // -0.0/+0.0 count as distinct. Consistent with how
2261 // Float values are hashed in every other DISTINCT /
2262 // GROUP BY path.
2263 let mut seen = rustc_hash::FxHashSet::default();
2264 agg_float_loop!(
2265 self,
2266 table,
2267 compiled_pred,
2268 bitmap_byte,
2269 bitmap_bit,
2270 data_offset,
2271 |v: f64| {
2272 seen.insert(v.to_bits());
2273 }
2274 );
2275 QueryResult::Scalar(Value::Int(seen.len() as i64))
2276 }
2277 },
2278 _ => unreachable!("type guard above restricts to Int/Float"),
2279 };
2280 Ok(Some(result))
2281 }
2282
2283 /// `Project(Limit(Filter(SeqScan)))` and `Project(Limit(SeqScan))`.
2284 /// Streams rows, decodes only projected columns, stops at the limit.
2285 pub(super) fn project_filter_limit_fast(
2286 &self,
2287 table: &str,
2288 fields: &[ProjectField],
2289 limit: usize,
2290 predicate: Option<&Expr>,
2291 ) -> Result<Option<QueryResult>, QueryError> {
2292 let schema = self
2293 .catalog
2294 .schema(table)
2295 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2296 .clone();
2297 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2298
2299 // Each projection field must be a simple `.field` reference for this
2300 // fast path. Aliased or computed fields fall through.
2301 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2302 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2303 for f in fields {
2304 let name = match &f.expr {
2305 Expr::Field(n) => n.clone(),
2306 _ => return Ok(None),
2307 };
2308 let idx = match all_columns.iter().position(|c| c == &name) {
2309 Some(i) => i,
2310 None => return Ok(None),
2311 };
2312 proj_indices.push(idx);
2313 proj_columns.push(f.alias.clone().unwrap_or(name));
2314 }
2315
2316 let fast = FastLayout::new(&schema);
2317 let row_layout = RowLayout::new(&schema);
2318
2319 let compiled_pred: Option<CompiledPredicate> = match predicate {
2320 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2321 Some(c) => Some(c),
2322 None => return Ok(None),
2323 },
2324 None => None,
2325 };
2326
2327 let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2328 // Mission D2: use try_for_each_row_raw to actually stop iterating
2329 // once the limit is reached. The previous `done` flag only short-
2330 // circuited the closure body, so a `limit 100` over 100K rows still
2331 // walked all 100K slots — burning ~30x SQLite on scan_filter_project_top100.
2332 self.catalog
2333 .try_for_each_row_raw(table, |_rid, data| {
2334 use std::ops::ControlFlow;
2335 if let Some(ref pred) = compiled_pred {
2336 if !pred(data) {
2337 return ControlFlow::Continue(());
2338 }
2339 }
2340 let row: Vec<Value> = proj_indices
2341 .iter()
2342 .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2343 .collect();
2344 out.push(row);
2345 if out.len() >= limit {
2346 ControlFlow::Break(())
2347 } else {
2348 ControlFlow::Continue(())
2349 }
2350 })
2351 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2352
2353 Ok(Some(QueryResult::Rows {
2354 columns: proj_columns,
2355 rows: out,
2356 }))
2357 }
2358
2359 /// `Project(Limit(Sort(Filter(SeqScan))))` and `Project(Limit(Sort(SeqScan)))`.
2360 /// Bounded top-N heap over the sort key. Only the sort key needs to be
2361 /// read per row; projected columns are decoded only for the final
2362 /// winning rows when the heap drains.
2363 pub(super) fn project_filter_sort_limit_fast(
2364 &self,
2365 table: &str,
2366 fields: &[ProjectField],
2367 sort_field: &str,
2368 descending: bool,
2369 limit: usize,
2370 predicate: Option<&Expr>,
2371 ) -> Result<Option<QueryResult>, QueryError> {
2372 if limit == 0 {
2373 // Degenerate case — empty result. Let the generic path handle it
2374 // for proper column naming.
2375 return Ok(None);
2376 }
2377 // The top-N heaps never hold more than `limit` rows, but `limit` is an
2378 // attacker-supplied literal (`order .x limit 99999999999`). Reserving
2379 // that capacity up front would allocate gigabytes and abort the
2380 // process before a single row is read. Cap the pre-allocation; the
2381 // heaps still grow on demand up to the true `limit`.
2382 const TOPN_PREALLOC_CAP: usize = 4096;
2383 let prealloc = limit.min(TOPN_PREALLOC_CAP);
2384 let schema = self
2385 .catalog
2386 .schema(table)
2387 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2388 .clone();
2389 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2390
2391 // Sort key must be a fixed-size numeric column (Int or Float).
2392 // Mission D10: extended from Int-only. Float sort keys use a
2393 // sortable-u64 transform (see `f64_to_sortable_u64`) so the heap
2394 // path stays keyed on `u64` and the whole branch shape is
2395 // identical to the Int case — no new heap types, no `total_cmp`
2396 // closures in the hot loop.
2397 let sort_idx = match schema.column_index(sort_field) {
2398 Some(i) => i,
2399 None => return Ok(None),
2400 };
2401 let sort_col_type = schema.columns[sort_idx].type_id;
2402 if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2403 return Ok(None);
2404 }
2405
2406 // Each projection field must be a simple `.field`.
2407 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2408 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2409 for f in fields {
2410 let name = match &f.expr {
2411 Expr::Field(n) => n.clone(),
2412 _ => return Ok(None),
2413 };
2414 let idx = match all_columns.iter().position(|c| c == &name) {
2415 Some(i) => i,
2416 None => return Ok(None),
2417 };
2418 proj_indices.push(idx);
2419 proj_columns.push(f.alias.clone().unwrap_or(name));
2420 }
2421
2422 let fast = FastLayout::new(&schema);
2423 let row_layout = RowLayout::new(&schema);
2424 // Mission C Phase 20b: inline numeric-column reader (no Box<dyn Fn>).
2425 let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2426 Some(o) => o,
2427 None => return Ok(None),
2428 };
2429 let sort_bitmap_byte = sort_idx / 8;
2430 let sort_bitmap_bit = (sort_idx % 8) as u32;
2431 let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2432
2433 let compiled_pred: Option<CompiledPredicate> = match predicate {
2434 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2435 Some(c) => Some(c),
2436 None => return Ok(None),
2437 },
2438 None => None,
2439 };
2440
2441 // Bounded top-N heap. For `order .x desc limit N`, we want the N
2442 // largest values — use a min-heap so the smallest is at the top and
2443 // can be popped when a better candidate arrives. For ascending, use
2444 // a max-heap. We tie-break with a monotonic `seq` counter so the
2445 // result is deterministic and stable.
2446 //
2447 // To keep this simple we maintain two typed heaps and pick by
2448 // direction.
2449 let drained: Vec<Vec<u8>> = match sort_col_type {
2450 TypeId::Int => {
2451 let mut seq: u64 = 0;
2452 let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2453 BinaryHeap::with_capacity(prealloc);
2454 let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2455 BinaryHeap::with_capacity(prealloc);
2456
2457 self.catalog
2458 .for_each_row_raw(table, |_rid, data| {
2459 if let Some(ref pred) = compiled_pred {
2460 if !pred(data) {
2461 return;
2462 }
2463 }
2464 // Inlined int-column reader: null check + i64 decode.
2465 if data.len() < sort_data_offset + 8 {
2466 return;
2467 }
2468 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2469 if is_null {
2470 return;
2471 }
2472 let key = i64::from_le_bytes(
2473 data[sort_data_offset..sort_data_offset + 8]
2474 .try_into()
2475 .unwrap_or_else(|_| unreachable!()),
2476 );
2477 let id = seq;
2478 seq += 1;
2479
2480 if descending {
2481 if heap_desc.len() < limit {
2482 heap_desc.push(Reverse((key, id, data.to_vec())));
2483 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2484 if key > *top_key {
2485 heap_desc.pop();
2486 heap_desc.push(Reverse((key, id, data.to_vec())));
2487 }
2488 }
2489 } else if heap_asc.len() < limit {
2490 heap_asc.push((key, id, data.to_vec()));
2491 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2492 if key < *top_key {
2493 heap_asc.pop();
2494 heap_asc.push((key, id, data.to_vec()));
2495 }
2496 }
2497 })
2498 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2499
2500 let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2501 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2502 } else {
2503 heap_asc.into_iter().collect()
2504 };
2505 if descending {
2506 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2507 } else {
2508 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2509 }
2510 drained.into_iter().map(|(_, _, d)| d).collect()
2511 }
2512 TypeId::Float => {
2513 // Novel angle: rather than introducing a `TotalF64` newtype
2514 // with `Ord via total_cmp`, transform the f64 bit pattern
2515 // into a sortable `u64` so `BinaryHeap<u64>` orders exactly
2516 // like `f64::total_cmp` would. Classic trick: flip the sign
2517 // bit on positives, flip all bits on negatives. Result:
2518 // - NaN (sign=0) stays greatest, matching total_cmp
2519 // - -0.0 sorts before +0.0, matching total_cmp
2520 // - Hot loop is branch-cheap (one compare + one xor)
2521 let mut seq: u64 = 0;
2522 let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2523 BinaryHeap::with_capacity(prealloc);
2524 let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2525 BinaryHeap::with_capacity(prealloc);
2526
2527 self.catalog
2528 .for_each_row_raw(table, |_rid, data| {
2529 if let Some(ref pred) = compiled_pred {
2530 if !pred(data) {
2531 return;
2532 }
2533 }
2534 if data.len() < sort_data_offset + 8 {
2535 return;
2536 }
2537 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2538 if is_null {
2539 return;
2540 }
2541 let bits = u64::from_le_bytes(
2542 data[sort_data_offset..sort_data_offset + 8]
2543 .try_into()
2544 .unwrap_or_else(|_| unreachable!()),
2545 );
2546 let key = f64_bits_to_sortable_u64(bits);
2547 let id = seq;
2548 seq += 1;
2549
2550 if descending {
2551 if heap_desc.len() < limit {
2552 heap_desc.push(Reverse((key, id, data.to_vec())));
2553 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2554 if key > *top_key {
2555 heap_desc.pop();
2556 heap_desc.push(Reverse((key, id, data.to_vec())));
2557 }
2558 }
2559 } else if heap_asc.len() < limit {
2560 heap_asc.push((key, id, data.to_vec()));
2561 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2562 if key < *top_key {
2563 heap_asc.pop();
2564 heap_asc.push((key, id, data.to_vec()));
2565 }
2566 }
2567 })
2568 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2569
2570 let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2571 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2572 } else {
2573 heap_asc.into_iter().collect()
2574 };
2575 if descending {
2576 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2577 } else {
2578 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2579 }
2580 drained.into_iter().map(|(_, _, d)| d).collect()
2581 }
2582 _ => unreachable!("type guard above restricts to Int/Float"),
2583 };
2584
2585 let rows: Vec<Vec<Value>> = drained
2586 .into_iter()
2587 .map(|data| {
2588 proj_indices
2589 .iter()
2590 .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2591 .collect()
2592 })
2593 .collect();
2594
2595 Ok(Some(QueryResult::Rows {
2596 columns: proj_columns,
2597 rows,
2598 }))
2599 }
2600
2601 /// Gather the RowIds that a mutation should operate on, without
2602 /// materialising the full row set. Handles the shapes the planner emits
2603 /// for update/delete: SeqScan, IndexScan, and Filter(SeqScan). Other
2604 /// shapes fall back to `generic_rid_match`.
2605 ///
2606 /// Perf sprint: try to fuse the predicate evaluation and in-place
2607 /// byte-level mutation into a single heap walk. Returns `Some(result)`
2608 /// if the fused path fired, `None` to fall through to the generic
2609 /// two-pass code.
2610 ///
2611 /// Covers two shapes:
2612 /// 1. Fixed-width non-null literal assignments on non-indexed columns
2613 /// → byte-patch every matched row in place (row length unchanged).
2614 /// 2. Single var-col literal assignment on a non-indexed column
2615 /// → `patch_var_column_in_place` on every matched row (may shrink);
2616 /// rows that can't be patched in place are collected for fallback.
2617 fn try_fused_scan_update(
2618 &mut self,
2619 table: &str,
2620 predicate: &Expr,
2621 resolved: &[(usize, Value)],
2622 changed_cols: &[usize],
2623 ) -> Option<Result<QueryResult, QueryError>> {
2624 // Build compiled predicate. Requires a schema borrow that must be
2625 // dropped before we call scan_patch_matching_logged.
2626 let compiled = {
2627 let schema = self.catalog.schema(table)?;
2628 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2629 let fast = FastLayout::new(schema);
2630 compile_predicate(predicate, &columns, &fast, schema)?
2631 };
2632
2633 // ── Path 1: fixed-width fast patch ──────────────────────────
2634 let fixed_patches: Option<Vec<FastPatch>> = {
2635 let tbl = self.catalog.get_table(table)?;
2636 let schema = &tbl.schema;
2637 let all_fixed_nonnull = resolved
2638 .iter()
2639 .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2640 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2641 if all_fixed_nonnull && no_indexed {
2642 let layout = RowLayout::new(schema);
2643 let bitmap_size = layout.bitmap_size();
2644 Some(
2645 resolved
2646 .iter()
2647 .map(|(idx, val)| {
2648 let fixed_off = layout
2649 .fixed_offset(*idx)
2650 .expect("is_fixed_size already checked");
2651 let field_off = 2 + bitmap_size + fixed_off;
2652 let bytes: FixedBytes = match val {
2653 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2654 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2655 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2656 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2657 Value::Uuid(v) => FixedBytes::Uuid(*v),
2658 _ => unreachable!("all_fixed_nonnull guard"),
2659 };
2660 FastPatch {
2661 field_off,
2662 bitmap_byte_off: 2 + idx / 8,
2663 bit_mask: 1u8 << (idx % 8),
2664 bytes,
2665 }
2666 })
2667 .collect(),
2668 )
2669 } else {
2670 None
2671 }
2672 };
2673 if let Some(patches) = fixed_patches {
2674 let result = self
2675 .catalog
2676 .scan_patch_matching_logged(table, compiled, |row| {
2677 for p in &patches {
2678 row[p.bitmap_byte_off] &= !p.bit_mask;
2679 let field_bytes = p.bytes.as_slice();
2680 row[p.field_off..p.field_off + field_bytes.len()]
2681 .copy_from_slice(field_bytes);
2682 }
2683 Some(row.len() as u16)
2684 })
2685 .map_err(|e| e.to_string());
2686 match result {
2687 Ok((count, _)) => {
2688 self.view_registry.mark_dependents_dirty(table);
2689 return Some(Ok(QueryResult::Modified(count)));
2690 }
2691 Err(e) => return Some(Err(QueryError::Execution(e))),
2692 }
2693 }
2694
2695 // ── Path 2: single var-col shrink fast patch ────────────────
2696 let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2697 let tbl = self.catalog.get_table(table)?;
2698 let schema = &tbl.schema;
2699 let is_single = resolved.len() == 1;
2700 let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2701 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2702 if is_single && is_var && no_indexed {
2703 let (idx, val) = &resolved[0];
2704 let bytes_opt = match val {
2705 Value::Str(s) => Some(s.as_bytes().to_vec()),
2706 Value::Bytes(b) => Some(b.clone()),
2707 Value::Empty => None,
2708 _ => return None, // type mismatch, fall through
2709 };
2710 Some((*idx, bytes_opt))
2711 } else {
2712 None
2713 }
2714 };
2715 if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2716 // Build a fresh RowLayout before the mutable borrow.
2717 let layout = {
2718 let schema = self.catalog.schema(table)?;
2719 RowLayout::new(schema)
2720 };
2721 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2722 let result = self
2723 .catalog
2724 .scan_patch_matching_logged(table, compiled, |row| {
2725 patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2726 })
2727 .map_err(|e| e.to_string());
2728 match result {
2729 Ok((mut count, fallback_rids)) => {
2730 // Handle rows where in-place patch failed (new > old).
2731 for rid in fallback_rids {
2732 let mut row = match self.catalog.get(table, rid) {
2733 Some(r) => r,
2734 None => continue,
2735 };
2736 for (idx, val) in resolved.iter() {
2737 row[*idx] = val.clone();
2738 }
2739 if let Err(e) =
2740 self.catalog
2741 .update_hinted(table, rid, &row, Some(changed_cols))
2742 {
2743 return Some(Err(QueryError::StorageError(e.to_string())));
2744 }
2745 count += 1;
2746 }
2747 self.view_registry.mark_dependents_dirty(table);
2748 return Some(Ok(QueryResult::Modified(count)));
2749 }
2750 Err(e) => return Some(Err(QueryError::Execution(e))),
2751 }
2752 }
2753
2754 None // no fused path applicable — fall through
2755 }
2756
2757 /// Mission C Phase 3: schema is looked up via `self.catalog.schema(table)`
2758 /// inside the branches that actually need it. Previously the caller had
2759 /// to clone the full Schema (6+ String allocs) before every mutation just
2760 /// so this function could borrow it — a cost the update/delete hot path
2761 /// did not need.
2762 fn collect_rids_for_mutation(
2763 &mut self,
2764 input: &PlanNode,
2765 table: &str,
2766 ) -> Result<Vec<RowId>, QueryError> {
2767 match input {
2768 PlanNode::SeqScan { table: t } if t == table => {
2769 // "Update/delete everything" — rare but legal.
2770 let rids: Vec<RowId> = self
2771 .catalog
2772 .scan(table)
2773 .map_err(|e| QueryError::StorageError(e.to_string()))?
2774 .map(|(rid, _)| rid)
2775 .collect();
2776 Ok(rids)
2777 }
2778 PlanNode::IndexScan {
2779 table: t,
2780 column,
2781 key,
2782 } if t == table => {
2783 let key_value = literal_to_value(key)?;
2784
2785 // Indexed case: single lookup, 0 or 1 rows.
2786 // Mission D7: int-specialized fast path on int-keyed indexes
2787 // (primary keys, created_at, etc.) — the common case for
2788 // `update_by_pk` / `delete where id = ?`.
2789 //
2790 // Scope the `tbl` borrow so it's released before we fall
2791 // through to the scan-based paths below (which reborrow
2792 // `self.catalog`).
2793 {
2794 let tbl = self
2795 .catalog
2796 .get_table(table)
2797 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2798 if tbl.has_index(column) {
2799 let rids = tbl.index_lookup_all(column, &key_value);
2800 return Ok(rids);
2801 }
2802 }
2803
2804 // No index: the planner folds `.col = literal` to IndexScan
2805 // regardless of whether the column is actually unique. When
2806 // there's no index we must behave like Filter(SeqScan) and
2807 // return *all* matching RIDs — not just the first one.
2808 let schema = self
2809 .catalog
2810 .schema(table)
2811 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2812 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2813 let fast = FastLayout::new(schema);
2814 let synth = Expr::BinaryOp(
2815 Box::new(Expr::Field(column.clone())),
2816 BinOp::Eq,
2817 Box::new(key.clone()),
2818 );
2819 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2820 // Mission F: skip the first 4 Vec doublings.
2821 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2822 self.catalog
2823 .for_each_row_raw(table, |rid, data| {
2824 if compiled(data) {
2825 rids.push(rid);
2826 }
2827 })
2828 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2829 return Ok(rids);
2830 }
2831
2832 // Fallback: decode each row, compare values.
2833 let col_idx =
2834 schema
2835 .column_index(column)
2836 .ok_or_else(|| QueryError::ColumnNotFound {
2837 table: String::new(),
2838 column: column.clone(),
2839 })?;
2840 let rids: Vec<RowId> = self
2841 .catalog
2842 .scan(table)
2843 .map_err(|e| QueryError::StorageError(e.to_string()))?
2844 .filter_map(|(rid, row)| {
2845 if row[col_idx] == key_value {
2846 Some(rid)
2847 } else {
2848 None
2849 }
2850 })
2851 .collect();
2852 Ok(rids)
2853 }
2854 PlanNode::Filter {
2855 input: inner,
2856 predicate,
2857 } => {
2858 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2859 if t != table {
2860 return self.generic_rid_match(input, table);
2861 }
2862 let schema = self
2863 .catalog
2864 .schema(table)
2865 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2866 let columns: Vec<String> =
2867 schema.columns.iter().map(|c| c.name.clone()).collect();
2868 let fast = FastLayout::new(schema);
2869 let row_layout = RowLayout::new(schema);
2870
2871 // Try compiled predicate first.
2872 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2873 // Mission F: skip the first 4 Vec doublings.
2874 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2875 self.catalog
2876 .for_each_row_raw(table, |rid, data| {
2877 if compiled(data) {
2878 rids.push(rid);
2879 }
2880 })
2881 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2882 return Ok(rids);
2883 }
2884
2885 // Fallback: selective decode + eval.
2886 let pred_cols = predicate_column_indices(predicate, &columns);
2887 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2888 self.catalog
2889 .for_each_row_raw(table, |rid, data| {
2890 let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2891 if eval_predicate(predicate, &pred_row, &columns) {
2892 rids.push(rid);
2893 }
2894 })
2895 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2896 return Ok(rids);
2897 }
2898 self.generic_rid_match(input, table)
2899 }
2900 _ => self.generic_rid_match(input, table),
2901 }
2902 }
2903
2904 /// Last-ditch generic match: execute the plan, collect matching rows,
2905 /// then find corresponding RowIds by value equality. This is the old
2906 /// O(N*M) code path; only used when the plan shape is something exotic.
2907 fn generic_rid_match(
2908 &mut self,
2909 input: &PlanNode,
2910 table: &str,
2911 ) -> Result<Vec<RowId>, QueryError> {
2912 let result = self.execute_plan(input)?;
2913 let rows = match result {
2914 QueryResult::Rows { rows, .. } => rows,
2915 _ => return Err("mutation source must be rows".into()),
2916 };
2917 let matching: Vec<RowId> = self
2918 .catalog
2919 .scan(table)
2920 .map_err(|e| QueryError::StorageError(e.to_string()))?
2921 .filter(|(_, row)| rows.iter().any(|r| r == row))
2922 .map(|(rid, _)| rid)
2923 .collect();
2924 Ok(matching)
2925 }
2926}
2927
2928pub(super) fn execute_window(
2929 result: QueryResult,
2930 windows: &[WindowDef],
2931) -> Result<QueryResult, QueryError> {
2932 let (mut columns, mut rows) = match result {
2933 QueryResult::Rows { columns, rows } => (columns, rows),
2934 _ => return Err("window function requires row input".into()),
2935 };
2936
2937 for wdef in windows {
2938 // Resolve partition/order column indices against current columns.
2939 let part_indices: Vec<usize> = wdef
2940 .partition_by
2941 .iter()
2942 .map(|name| {
2943 columns
2944 .iter()
2945 .position(|c| c == name)
2946 .ok_or_else(|| format!("window partition column '{name}' not found"))
2947 })
2948 .collect::<Result<Vec<_>, _>>()?;
2949
2950 let ord_indices: Vec<(usize, bool)> = wdef
2951 .order_by
2952 .iter()
2953 .map(|sk| {
2954 columns
2955 .iter()
2956 .position(|c| c == &sk.field)
2957 .map(|i| (i, sk.descending))
2958 .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2959 })
2960 .collect::<Result<Vec<_>, _>>()?;
2961
2962 // Resolve the argument column index (for aggregate windows).
2963 let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2964 match arg {
2965 Expr::Field(name) => {
2966 if name == "*" {
2967 None // count(*) style — no specific column
2968 } else {
2969 Some(
2970 columns
2971 .iter()
2972 .position(|c| c == name)
2973 .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2974 )
2975 }
2976 }
2977 _ => None,
2978 }
2979 } else {
2980 None
2981 };
2982
2983 // Build a sort-index to sort rows by partition_by then order_by
2984 // without actually reordering the original Vec (we need original
2985 // order to write results back).
2986 let n = rows.len();
2987 let mut indices: Vec<usize> = (0..n).collect();
2988 indices.sort_by(|&a, &b| {
2989 // Compare partition keys first.
2990 for &pi in &part_indices {
2991 let cmp = rows[a][pi].cmp(&rows[b][pi]);
2992 if cmp != std::cmp::Ordering::Equal {
2993 return cmp;
2994 }
2995 }
2996 // Then order keys.
2997 for &(oi, desc) in &ord_indices {
2998 let cmp = rows[a][oi].cmp(&rows[b][oi]);
2999 if cmp != std::cmp::Ordering::Equal {
3000 return if desc { cmp.reverse() } else { cmp };
3001 }
3002 }
3003 std::cmp::Ordering::Equal
3004 });
3005
3006 // SQL window-frame semantics: with no `order` clause the frame for an
3007 // aggregate window is the ENTIRE partition, not the running prefix.
3008 // The loop below computes running values; for the no-order case we
3009 // back-fill every row of a partition with the partition's final
3010 // (i.e. complete) aggregate once its boundary is reached. Ranking
3011 // functions are untouched — row_number/rank/dense_rank are inherently
3012 // positional.
3013 let whole_partition_frame = wdef.order_by.is_empty()
3014 && matches!(
3015 wdef.function,
3016 WindowFunc::Sum
3017 | WindowFunc::Avg
3018 | WindowFunc::Count
3019 | WindowFunc::Min
3020 | WindowFunc::Max
3021 );
3022 // Original row indices of the partition currently being scanned
3023 // (only tracked when back-filling is needed).
3024 let mut partition_row_indices: Vec<usize> = Vec::new();
3025
3026 // Compute window values in sorted order, tracking partition boundaries.
3027 let mut win_values: Vec<Value> = vec![Value::Empty; n];
3028 let mut partition_start = 0usize;
3029 // Running state for aggregate windows:
3030 let mut running_count: i64 = 0;
3031 let mut running_int_sum: i64 = 0;
3032 let mut running_float_sum: f64 = 0.0;
3033 let mut running_saw_float = false;
3034 let mut running_min: Option<Value> = None;
3035 let mut running_max: Option<Value> = None;
3036 let mut rank_counter: i64 = 0;
3037 let mut dense_rank_counter: i64 = 0;
3038 let mut prev_order_key: Option<Vec<Value>> = None;
3039 let mut same_rank_count: i64 = 0;
3040
3041 for sorted_pos in 0..n {
3042 let row_idx = indices[sorted_pos];
3043
3044 // Detect partition boundary.
3045 let new_partition = if sorted_pos == 0 {
3046 true
3047 } else {
3048 let prev_row_idx = indices[sorted_pos - 1];
3049 part_indices
3050 .iter()
3051 .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
3052 };
3053
3054 if new_partition {
3055 // No-order aggregate frame: the partition that just ended is
3056 // complete, so its final running value IS the whole-partition
3057 // aggregate. Back-fill it onto every row of that partition.
3058 if whole_partition_frame && sorted_pos > 0 {
3059 let final_v = win_values[indices[sorted_pos - 1]].clone();
3060 for ri in partition_row_indices.drain(..) {
3061 win_values[ri] = final_v.clone();
3062 }
3063 }
3064 partition_start = sorted_pos;
3065 running_count = 0;
3066 running_int_sum = 0;
3067 running_float_sum = 0.0;
3068 running_saw_float = false;
3069 running_min = None;
3070 running_max = None;
3071 rank_counter = 0;
3072 dense_rank_counter = 0;
3073 prev_order_key = None;
3074 same_rank_count = 0;
3075 }
3076
3077 // Extract current order key for rank tracking.
3078 let current_order_key: Vec<Value> = ord_indices
3079 .iter()
3080 .map(|&(oi, _)| rows[row_idx][oi].clone())
3081 .collect();
3082 let same_as_prev = prev_order_key.as_ref() == Some(¤t_order_key);
3083
3084 let value = match wdef.function {
3085 WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
3086 WindowFunc::Rank => {
3087 if same_as_prev {
3088 same_rank_count += 1;
3089 } else {
3090 rank_counter += same_rank_count + 1;
3091 same_rank_count = 0;
3092 if rank_counter == 0 {
3093 rank_counter = 1;
3094 }
3095 }
3096 Value::Int(rank_counter)
3097 }
3098 WindowFunc::DenseRank => {
3099 if !same_as_prev {
3100 dense_rank_counter += 1;
3101 }
3102 Value::Int(dense_rank_counter)
3103 }
3104 WindowFunc::Sum => {
3105 if let Some(ci) = arg_col_idx {
3106 match &rows[row_idx][ci] {
3107 Value::Int(v) => running_int_sum += v,
3108 Value::Float(v) => {
3109 running_float_sum += v;
3110 running_saw_float = true;
3111 }
3112 _ => {}
3113 }
3114 }
3115 if running_saw_float {
3116 Value::Float(running_float_sum + running_int_sum as f64)
3117 } else {
3118 Value::Int(running_int_sum)
3119 }
3120 }
3121 WindowFunc::Avg => {
3122 if let Some(ci) = arg_col_idx {
3123 match &rows[row_idx][ci] {
3124 Value::Int(v) => {
3125 running_float_sum += *v as f64;
3126 running_count += 1;
3127 }
3128 Value::Float(v) => {
3129 running_float_sum += v;
3130 running_count += 1;
3131 }
3132 _ => {}
3133 }
3134 }
3135 if running_count == 0 {
3136 Value::Empty
3137 } else {
3138 Value::Float(running_float_sum / running_count as f64)
3139 }
3140 }
3141 WindowFunc::Count => {
3142 if let Some(ci) = arg_col_idx {
3143 if !rows[row_idx][ci].is_empty() {
3144 running_count += 1;
3145 }
3146 } else {
3147 // count(*) — count all rows
3148 running_count += 1;
3149 }
3150 Value::Int(running_count)
3151 }
3152 WindowFunc::Min => {
3153 if let Some(ci) = arg_col_idx {
3154 let v = &rows[row_idx][ci];
3155 if !v.is_empty() {
3156 running_min = Some(match &running_min {
3157 None => v.clone(),
3158 Some(cur) => {
3159 if v < cur {
3160 v.clone()
3161 } else {
3162 cur.clone()
3163 }
3164 }
3165 });
3166 }
3167 }
3168 running_min.clone().unwrap_or(Value::Empty)
3169 }
3170 WindowFunc::Max => {
3171 if let Some(ci) = arg_col_idx {
3172 let v = &rows[row_idx][ci];
3173 if !v.is_empty() {
3174 running_max = Some(match &running_max {
3175 None => v.clone(),
3176 Some(cur) => {
3177 if v > cur {
3178 v.clone()
3179 } else {
3180 cur.clone()
3181 }
3182 }
3183 });
3184 }
3185 }
3186 running_max.clone().unwrap_or(Value::Empty)
3187 }
3188 };
3189
3190 prev_order_key = Some(current_order_key);
3191 win_values[row_idx] = value;
3192 if whole_partition_frame {
3193 partition_row_indices.push(row_idx);
3194 }
3195 }
3196
3197 // Back-fill the final partition (the loop only flushes at boundaries).
3198 if whole_partition_frame && n > 0 {
3199 let final_v = win_values[indices[n - 1]].clone();
3200 for ri in partition_row_indices.drain(..) {
3201 win_values[ri] = final_v.clone();
3202 }
3203 }
3204
3205 // Append the computed window column to each row.
3206 for (ri, row) in rows.iter_mut().enumerate() {
3207 row.push(win_values[ri].clone());
3208 }
3209 columns.push(wdef.output_name.clone());
3210 }
3211
3212 Ok(QueryResult::Rows { columns, rows })
3213}
3214
3215/// Mission E2b: compute one aggregate over a set of rows in a group.
3216pub(super) fn compute_group_aggregate(
3217 func: AggFunc,
3218 all_rows: &[Vec<Value>],
3219 row_indices: &[usize],
3220 col_idx: usize,
3221) -> Value {
3222 match func {
3223 AggFunc::Count => {
3224 if col_idx == usize::MAX {
3225 // count(*) — count all rows in the group.
3226 return Value::Int(row_indices.len() as i64);
3227 }
3228 let count = row_indices
3229 .iter()
3230 .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3231 .count();
3232 Value::Int(count as i64)
3233 }
3234 AggFunc::CountDistinct => {
3235 let mut seen = std::collections::HashSet::new();
3236 for &ri in row_indices {
3237 let v = &all_rows[ri][col_idx];
3238 if !v.is_empty() {
3239 seen.insert(v.clone());
3240 }
3241 }
3242 Value::Int(seen.len() as i64)
3243 }
3244 AggFunc::Sum => {
3245 // Mirror the scalar Sum path: accumulate int and float
3246 // contributions separately and promote the final result to
3247 // Float if any Float row was observed. Prevents silent
3248 // drop of Float columns in GROUP BY aggregates.
3249 let mut int_sum: i64 = 0;
3250 let mut float_sum: f64 = 0.0;
3251 let mut saw_float = false;
3252 for &ri in row_indices {
3253 match &all_rows[ri][col_idx] {
3254 Value::Int(v) => int_sum += v,
3255 Value::Float(v) => {
3256 float_sum += *v;
3257 saw_float = true;
3258 }
3259 _ => {}
3260 }
3261 }
3262 if saw_float {
3263 Value::Float(float_sum + int_sum as f64)
3264 } else {
3265 Value::Int(int_sum)
3266 }
3267 }
3268 AggFunc::Avg => {
3269 let mut sum = 0.0f64;
3270 let mut count = 0usize;
3271 for &ri in row_indices {
3272 match &all_rows[ri][col_idx] {
3273 Value::Int(v) => {
3274 sum += *v as f64;
3275 count += 1;
3276 }
3277 Value::Float(v) => {
3278 sum += *v;
3279 count += 1;
3280 }
3281 _ => {}
3282 }
3283 }
3284 if count == 0 {
3285 Value::Empty
3286 } else {
3287 Value::Float(sum / count as f64)
3288 }
3289 }
3290 AggFunc::Min => row_indices
3291 .iter()
3292 .map(|&ri| &all_rows[ri][col_idx])
3293 .filter(|v| !v.is_empty())
3294 .min()
3295 .cloned()
3296 .unwrap_or(Value::Empty),
3297 AggFunc::Max => row_indices
3298 .iter()
3299 .map(|&ri| &all_rows[ri][col_idx])
3300 .filter(|v| !v.is_empty())
3301 .max()
3302 .cloned()
3303 .unwrap_or(Value::Empty),
3304 }
3305}
3306
3307/// Mission E1.3: try to extract equi-join key indices from a join `on`
3308/// predicate. Returns `Some((left_col_idx, right_col_idx))` when the
3309/// predicate is exactly `L = R` (or `R = L`) and both sides resolve
3310/// cleanly — `L` to the left subtree's column list and `R` to the right
3311/// subtree's column list.
3312///
3313/// This is deliberately narrow. We only recognise the two shapes:
3314/// * `QualifiedField = QualifiedField` (`u.id = o.user_id`)
3315/// * `Field = Field` (`.id = .user_id`, unqualified)
3316///
3317/// Anything else — conjunctions, constants, function calls, or predicates
3318/// that touch the same side on both halves — falls through to the
3319/// nested-loop path unchanged.
3320pub(super) fn try_extract_equi_join_keys(
3321 pred: &Expr,
3322 left_columns: &[String],
3323 right_columns: &[String],
3324) -> Option<(usize, usize)> {
3325 let (lhs, op, rhs) = match pred {
3326 Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3327 _ => return None,
3328 };
3329 if op != BinOp::Eq {
3330 return None;
3331 }
3332 // Normal orientation: lhs in left, rhs in right.
3333 if let (Some(li), Some(ri)) = (
3334 resolve_side_column(lhs, left_columns),
3335 resolve_side_column(rhs, right_columns),
3336 ) {
3337 return Some((li, ri));
3338 }
3339 // Swapped: rhs in left, lhs in right. Both sides of `=` are
3340 // commutative so this is safe.
3341 if let (Some(li), Some(ri)) = (
3342 resolve_side_column(rhs, left_columns),
3343 resolve_side_column(lhs, right_columns),
3344 ) {
3345 return Some((li, ri));
3346 }
3347 None
3348}
3349
3350fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3351 match expr {
3352 Expr::QualifiedField { qualifier, field } => {
3353 // Byte-level match so we don't allocate a fresh `format!` on
3354 // every call — this runs once per plan, so allocation would be
3355 // cheap, but the match is trivial enough to keep inline with
3356 // the eval_expr version.
3357 let q = qualifier.as_bytes();
3358 let f = field.as_bytes();
3359 columns.iter().position(|c| {
3360 let b = c.as_bytes();
3361 b.len() == q.len() + 1 + f.len()
3362 && b[..q.len()] == *q
3363 && b[q.len()] == b'.'
3364 && b[q.len() + 1..] == *f
3365 })
3366 }
3367 Expr::Field(name) => columns.iter().position(|c| c == name),
3368 _ => None,
3369 }
3370}
3371
3372/// Mission E1.3: O(L + R) hash join. Builds a `FxHashMap<Value, Vec<usize>>`
3373/// over the right (inner) side's join keys, then streams the left (outer)
3374/// side and for each probe row emits every combined row whose right-side
3375/// key matches. For `JoinKind::LeftOuter`, unmatched left rows are emitted
3376/// padded with `Value::Empty` on the right side.
3377///
3378/// The right side is always the build side. That choice is forced for
3379/// LeftOuter (the left side must stream so we can detect orphans), and
3380/// for Inner it's a reasonable default — left-deep plans tend to grow the
3381/// left side with each join, so the un-joined right leaf is often the
3382/// smaller of the two at each level.
3383pub(super) fn hash_join(
3384 left_columns: Vec<String>,
3385 left_rows: Vec<Vec<Value>>,
3386 right_columns: Vec<String>,
3387 right_rows: Vec<Vec<Value>>,
3388 left_key_idx: usize,
3389 right_key_idx: usize,
3390 kind: JoinKind,
3391) -> QueryResult {
3392 use rustc_hash::FxHashMap;
3393
3394 let n_left = left_columns.len();
3395 let n_right = right_columns.len();
3396 let mut columns = Vec::with_capacity(n_left + n_right);
3397 columns.extend(left_columns);
3398 columns.extend(right_columns);
3399
3400 // Build: right_key -> list of right-row indices. Pre-size to the row
3401 // count so the map doesn't rehash mid-build.
3402 let mut build: FxHashMap<Value, Vec<usize>> =
3403 FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3404 for (i, row) in right_rows.iter().enumerate() {
3405 // Skip Empty keys on the build side — they can never match under
3406 // SQL semantics (NULL ≠ NULL) and would collapse all nullables to
3407 // one bucket.
3408 if matches!(row[right_key_idx], Value::Empty) {
3409 continue;
3410 }
3411 build.entry(row[right_key_idx].clone()).or_default().push(i);
3412 }
3413
3414 // Reasonable starting capacity — inner joins produce ≥ left_rows.len()
3415 // rows in the common 1:1 case, left-outer always emits ≥ left_rows.len().
3416 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3417
3418 for left_row in &left_rows {
3419 let key = &left_row[left_key_idx];
3420 let matched = if matches!(key, Value::Empty) {
3421 None
3422 } else {
3423 build.get(key)
3424 };
3425 match matched {
3426 Some(matches) if !matches.is_empty() => {
3427 for &ri in matches {
3428 let right_row = &right_rows[ri];
3429 let mut combined = Vec::with_capacity(n_left + n_right);
3430 combined.extend_from_slice(left_row);
3431 combined.extend_from_slice(right_row);
3432 rows.push(combined);
3433 }
3434 }
3435 _ => {
3436 if matches!(kind, JoinKind::LeftOuter) {
3437 let mut row = Vec::with_capacity(n_left + n_right);
3438 row.extend_from_slice(left_row);
3439 row.resize(n_left + n_right, Value::Empty);
3440 rows.push(row);
3441 }
3442 }
3443 }
3444 }
3445
3446 QueryResult::Rows { columns, rows }
3447}
3448
3449/// Lower unindexed `RangeScan` and `IndexScan` nodes to `Filter(SeqScan)`
3450/// so that all downstream fast paths (count, project+limit, sort+limit,
3451/// agg, update, delete) continue to fire.
3452///
3453/// The planner emits `RangeScan` (for `.age > 30`) and `IndexScan` (for
3454/// `.email = lit`) speculatively because it has no catalog access. When
3455/// the column has a B-tree index, those plans are correct. When it
3456/// doesn't, the executor's fallbacks materialise every matching row with
3457/// full `decode_row` — bypassing the compiled-predicate fast paths that
3458/// `Filter(SeqScan)` would trigger. Lowering both speculative leaf kinds
3459/// also keeps EXPLAIN honest: it prints the plan that actually runs.
3460///
3461/// This pass runs once per query, before execution.
3462pub(super) fn lower_unindexed_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3463 match plan {
3464 PlanNode::RangeScan {
3465 table,
3466 column,
3467 start,
3468 end,
3469 } => {
3470 if let Some(tbl) = catalog.get_table(table) {
3471 // Keep RangeScan whenever ANY index exists on the column:
3472 // unique indexes store raw column values, non-unique indexes
3473 // store composite (value, rid) keys that the executor walks
3474 // natively via BTree::range_rids. Only lower to Filter(SeqScan)
3475 // when the column is unindexed.
3476 if tbl.has_index(column) {
3477 return plan.clone();
3478 }
3479 }
3480 let pred = synthesize_range_predicate(column, start, end);
3481 PlanNode::Filter {
3482 input: Box::new(PlanNode::SeqScan {
3483 table: table.clone(),
3484 }),
3485 predicate: pred,
3486 }
3487 }
3488 PlanNode::Filter { input, predicate } => PlanNode::Filter {
3489 input: Box::new(lower_unindexed_scans(catalog, input)),
3490 predicate: predicate.clone(),
3491 },
3492 PlanNode::Project { input, fields } => PlanNode::Project {
3493 input: Box::new(lower_unindexed_scans(catalog, input)),
3494 fields: fields.clone(),
3495 },
3496 PlanNode::Sort { input, keys } => PlanNode::Sort {
3497 input: Box::new(lower_unindexed_scans(catalog, input)),
3498 keys: keys.clone(),
3499 },
3500 PlanNode::Limit { input, count } => PlanNode::Limit {
3501 input: Box::new(lower_unindexed_scans(catalog, input)),
3502 count: count.clone(),
3503 },
3504 PlanNode::Offset { input, count } => PlanNode::Offset {
3505 input: Box::new(lower_unindexed_scans(catalog, input)),
3506 count: count.clone(),
3507 },
3508 PlanNode::Aggregate {
3509 input,
3510 function,
3511 field,
3512 } => PlanNode::Aggregate {
3513 input: Box::new(lower_unindexed_scans(catalog, input)),
3514 function: *function,
3515 field: field.clone(),
3516 },
3517 PlanNode::Distinct { input } => PlanNode::Distinct {
3518 input: Box::new(lower_unindexed_scans(catalog, input)),
3519 },
3520 PlanNode::GroupBy {
3521 input,
3522 keys,
3523 aggregates,
3524 having,
3525 } => PlanNode::GroupBy {
3526 input: Box::new(lower_unindexed_scans(catalog, input)),
3527 keys: keys.clone(),
3528 aggregates: aggregates.clone(),
3529 having: having.clone(),
3530 },
3531 PlanNode::Update {
3532 input,
3533 table,
3534 assignments,
3535 } => PlanNode::Update {
3536 input: Box::new(lower_unindexed_scans(catalog, input)),
3537 table: table.clone(),
3538 assignments: assignments.clone(),
3539 },
3540 PlanNode::Delete { input, table } => PlanNode::Delete {
3541 input: Box::new(lower_unindexed_scans(catalog, input)),
3542 table: table.clone(),
3543 },
3544 PlanNode::Window { input, windows } => PlanNode::Window {
3545 input: Box::new(lower_unindexed_scans(catalog, input)),
3546 windows: windows.clone(),
3547 },
3548 PlanNode::Union { left, right, all } => PlanNode::Union {
3549 left: Box::new(lower_unindexed_scans(catalog, left)),
3550 right: Box::new(lower_unindexed_scans(catalog, right)),
3551 all: *all,
3552 },
3553 PlanNode::Explain { input } => PlanNode::Explain {
3554 input: Box::new(lower_unindexed_scans(catalog, input)),
3555 },
3556 PlanNode::NestedLoopJoin {
3557 left,
3558 right,
3559 on,
3560 kind,
3561 } => PlanNode::NestedLoopJoin {
3562 left: Box::new(lower_unindexed_scans(catalog, left)),
3563 right: Box::new(lower_unindexed_scans(catalog, right)),
3564 on: on.clone(),
3565 kind: *kind,
3566 },
3567 PlanNode::IndexScan { table, column, key } => {
3568 if let Some(tbl) = catalog.get_table(table) {
3569 if tbl.has_index(column) {
3570 return plan.clone();
3571 }
3572 }
3573 PlanNode::Filter {
3574 input: Box::new(PlanNode::SeqScan {
3575 table: table.clone(),
3576 }),
3577 predicate: Expr::BinaryOp(
3578 Box::new(Expr::Field(column.clone())),
3579 BinOp::Eq,
3580 Box::new(key.clone()),
3581 ),
3582 }
3583 }
3584 // Leaf nodes: no children to recurse into.
3585 _ => plan.clone(),
3586 }
3587}
3588
3589/// Synthesize a range predicate from RangeScan bounds for the fallback path.
3590pub(super) fn synthesize_range_predicate(
3591 column: &str,
3592 start: &Option<(Expr, bool)>,
3593 end: &Option<(Expr, bool)>,
3594) -> Expr {
3595 let lower = start.as_ref().map(|(expr, inclusive)| {
3596 let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3597 Expr::BinaryOp(
3598 Box::new(Expr::Field(column.to_string())),
3599 op,
3600 Box::new(expr.clone()),
3601 )
3602 });
3603 let upper = end.as_ref().map(|(expr, inclusive)| {
3604 let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3605 Expr::BinaryOp(
3606 Box::new(Expr::Field(column.to_string())),
3607 op,
3608 Box::new(expr.clone()),
3609 )
3610 });
3611 match (lower, upper) {
3612 (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3613 (Some(l), None) => l,
3614 (None, Some(u)) => u,
3615 (None, None) => Expr::Literal(Literal::Bool(true)),
3616 }
3617}
3618
3619/// Check if a value falls within a range (used in last-resort decoded-row eval).
3620pub(super) fn range_matches(
3621 val: &Value,
3622 start: &Option<Value>,
3623 start_inc: bool,
3624 end: &Option<Value>,
3625 end_inc: bool,
3626) -> bool {
3627 if let Some(ref s) = start {
3628 if start_inc {
3629 if val < s {
3630 return false;
3631 }
3632 } else if val <= s {
3633 return false;
3634 }
3635 }
3636 if let Some(ref e) = end {
3637 if end_inc {
3638 if val > e {
3639 return false;
3640 }
3641 } else if val >= e {
3642 return false;
3643 }
3644 }
3645 true
3646}
3647
3648/// Format a `PlanNode` tree as a human-readable, indented text
3649/// representation. Used by the `EXPLAIN` command.
3650pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3651 let indent = " ".repeat(depth);
3652 match plan {
3653 PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3654 PlanNode::AliasScan { table, alias } => {
3655 format!("{indent}AliasScan table={table} alias={alias}")
3656 }
3657 PlanNode::IndexScan { table, column, key } => {
3658 format!("{indent}IndexScan table={table} column={column} key={key:?}")
3659 }
3660 PlanNode::RangeScan {
3661 table,
3662 column,
3663 start,
3664 end,
3665 } => {
3666 let s = match start {
3667 Some((expr, inc)) => {
3668 let op = if *inc { ">=" } else { ">" };
3669 format!("{op}{expr:?}")
3670 }
3671 None => "unbounded".to_string(),
3672 };
3673 let e = match end {
3674 Some((expr, inc)) => {
3675 let op = if *inc { "<=" } else { "<" };
3676 format!("{op}{expr:?}")
3677 }
3678 None => "unbounded".to_string(),
3679 };
3680 format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3681 }
3682 PlanNode::Filter { input, predicate } => {
3683 let child = format_plan_tree(input, depth + 1);
3684 format!("{indent}Filter predicate={predicate:?}\n{child}")
3685 }
3686 PlanNode::Project { input, fields } => {
3687 let names: Vec<String> = fields
3688 .iter()
3689 .map(|f| match &f.alias {
3690 Some(a) => format!("{a}: {:?}", f.expr),
3691 None => format!("{:?}", f.expr),
3692 })
3693 .collect();
3694 let child = format_plan_tree(input, depth + 1);
3695 format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3696 }
3697 PlanNode::Sort { input, keys } => {
3698 let ks: Vec<String> = keys
3699 .iter()
3700 .map(|k| {
3701 if k.descending {
3702 format!("{} desc", k.field)
3703 } else {
3704 k.field.clone()
3705 }
3706 })
3707 .collect();
3708 let child = format_plan_tree(input, depth + 1);
3709 format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3710 }
3711 PlanNode::Limit { input, count } => {
3712 let child = format_plan_tree(input, depth + 1);
3713 format!("{indent}Limit count={count:?}\n{child}")
3714 }
3715 PlanNode::Offset { input, count } => {
3716 let child = format_plan_tree(input, depth + 1);
3717 format!("{indent}Offset count={count:?}\n{child}")
3718 }
3719 PlanNode::Aggregate {
3720 input,
3721 function,
3722 field,
3723 } => {
3724 let f = field.as_deref().unwrap_or("*");
3725 let child = format_plan_tree(input, depth + 1);
3726 format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3727 }
3728 PlanNode::NestedLoopJoin {
3729 left,
3730 right,
3731 on,
3732 kind,
3733 } => {
3734 let left_child = format_plan_tree(left, depth + 1);
3735 let right_child = format_plan_tree(right, depth + 1);
3736 let on_str = match on {
3737 Some(pred) => format!("{pred:?}"),
3738 None => "none".to_string(),
3739 };
3740 format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3741 }
3742 PlanNode::Distinct { input } => {
3743 let child = format_plan_tree(input, depth + 1);
3744 format!("{indent}Distinct\n{child}")
3745 }
3746 PlanNode::GroupBy {
3747 input,
3748 keys,
3749 aggregates,
3750 having,
3751 } => {
3752 let agg_strs: Vec<String> = aggregates
3753 .iter()
3754 .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3755 .collect();
3756 let having_str = match having {
3757 Some(h) => format!(" having={h:?}"),
3758 None => String::new(),
3759 };
3760 let child = format_plan_tree(input, depth + 1);
3761 format!(
3762 "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3763 keys.join(", "),
3764 agg_strs.join(", "),
3765 )
3766 }
3767 PlanNode::Insert { table, rows } => {
3768 let cols: Vec<&str> = rows
3769 .first()
3770 .map(|r| r.iter().map(|a| a.field.as_str()).collect())
3771 .unwrap_or_default();
3772 format!(
3773 "{indent}Insert table={table} rows={} cols=[{}]",
3774 rows.len(),
3775 cols.join(", ")
3776 )
3777 }
3778 PlanNode::Upsert {
3779 table,
3780 key_column,
3781 assignments,
3782 on_conflict,
3783 } => {
3784 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3785 let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3786 if conflict_cols.is_empty() {
3787 format!(
3788 "{indent}Upsert table={table} key={key_column} cols=[{}]",
3789 cols.join(", ")
3790 )
3791 } else {
3792 format!(
3793 "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3794 cols.join(", "),
3795 conflict_cols.join(", ")
3796 )
3797 }
3798 }
3799 PlanNode::Update {
3800 input,
3801 table,
3802 assignments,
3803 } => {
3804 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3805 let child = format_plan_tree(input, depth + 1);
3806 format!(
3807 "{indent}Update table={table} set=[{}]\n{child}",
3808 cols.join(", ")
3809 )
3810 }
3811 PlanNode::Delete { input, table } => {
3812 let child = format_plan_tree(input, depth + 1);
3813 format!("{indent}Delete table={table}\n{child}")
3814 }
3815 PlanNode::CreateTable { name, fields } => {
3816 let fs: Vec<String> = fields
3817 .iter()
3818 .map(|f| {
3819 let mut mods = String::new();
3820 if f.required {
3821 mods.push_str(" required");
3822 }
3823 if f.unique {
3824 mods.push_str(" unique");
3825 }
3826 format!("{}: {}{mods}", f.name, f.type_name)
3827 })
3828 .collect();
3829 format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3830 }
3831 PlanNode::AlterTable { table, action } => {
3832 format!("{indent}AlterTable table={table} action={action:?}")
3833 }
3834 PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3835 PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3836 PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3837 PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3838 PlanNode::Window { input, windows } => {
3839 let ws: Vec<String> = windows
3840 .iter()
3841 .map(|w| format!("{:?} as {}", w.function, w.output_name))
3842 .collect();
3843 let child = format_plan_tree(input, depth + 1);
3844 format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3845 }
3846 PlanNode::Union { left, right, all } => {
3847 let kind = if *all { "UNION ALL" } else { "UNION" };
3848 let left_child = format_plan_tree(left, depth + 1);
3849 let right_child = format_plan_tree(right, depth + 1);
3850 format!("{indent}{kind}\n{left_child}\n{right_child}")
3851 }
3852 PlanNode::Explain { input } => {
3853 let child = format_plan_tree(input, depth + 1);
3854 format!("{indent}Explain\n{child}")
3855 }
3856 PlanNode::Begin => format!("{indent}Begin"),
3857 PlanNode::Commit => format!("{indent}Commit"),
3858 PlanNode::Rollback => format!("{indent}Rollback"),
3859 }
3860}