powdb_query/executor/plan_exec.rs
1//! The execute_plan method and associated helpers.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::row_body_base;
15use super::{check_join_limit, Engine, MAX_SORT_ROWS};
16use powdb_storage::view::{ViewDef, ViewRegistry};
17
18impl Engine {
19 pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
20 match plan {
21 PlanNode::SeqScan { table } => {
22 // Auto-refresh dirty materialized views on read.
23 if self.view_registry.is_dirty(table) {
24 self.refresh_view(table)?;
25 }
26 let schema = self
27 .catalog
28 .schema(table)
29 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
30 .clone();
31 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
32 let rows: Vec<Vec<Value>> = self
33 .catalog
34 .scan(table)
35 .map_err(|e| QueryError::StorageError(e.to_string()))?
36 .map(|(_, row)| row)
37 .collect();
38 Ok(QueryResult::Rows { columns, rows })
39 }
40
41 PlanNode::Filter { input, predicate } => {
42 // Materialize any IN-subqueries in the predicate before the
43 // scan loop — the closure can't call back into the engine.
44 // Correlated subqueries are left in place for per-row eval.
45 let materialized;
46 let predicate = if contains_subquery(predicate) {
47 materialized = self.materialize_subqueries(predicate)?;
48 &materialized
49 } else {
50 predicate
51 };
52
53 // Correlated subquery path: per-row materialisation.
54 if contains_subquery(predicate) {
55 let result = self.execute_plan(input)?;
56 return match result {
57 QueryResult::Rows { columns, rows } => {
58 let mut filtered = Vec::new();
59 for row in rows {
60 let row_pred =
61 self.materialize_correlated_for_row(predicate, &row, &columns)?;
62 if eval_predicate(&row_pred, &row, &columns) {
63 filtered.push(row);
64 }
65 }
66 Ok(QueryResult::Rows {
67 columns,
68 rows: filtered,
69 })
70 }
71 _ => Err("filter requires row input".into()),
72 };
73 }
74
75 // Fast path: fuse Filter + SeqScan into a zero-copy streaming
76 // loop. Uses decode_column() to evaluate the predicate on only
77 // the columns it references, avoiding heap allocations for
78 // String/Bytes columns that aren't part of the filter.
79 if let PlanNode::SeqScan { table } = input.as_ref() {
80 // Auto-refresh dirty materialized views.
81 if self.view_registry.is_dirty(table) {
82 self.refresh_view(table)?;
83 }
84 let schema = self
85 .catalog
86 .schema(table)
87 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
88 .clone();
89 let columns: Vec<String> =
90 schema.columns.iter().map(|c| c.name.clone()).collect();
91 let fast = FastLayout::new(&schema);
92 let row_layout = RowLayout::new(&schema);
93 // Mission F: pre-size to skip the first 4 Vec doublings
94 // (4 → 8 → 16 → 32 → 64). On a 100K-row scan with 30%
95 // selectivity that's ~4 fewer reallocations + memcpys.
96 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
97
98 // Try compiled predicate for the filter check (handles
99 // int leaves, string-eq leaves, and And conjunctions).
100 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
101 self.catalog
102 .for_each_row_raw(table, |_rid, data| {
103 if compiled(data) {
104 rows.push(decode_row(&schema, data));
105 }
106 })
107 .map_err(|e| QueryError::StorageError(e.to_string()))?;
108 } else {
109 let pred_cols = predicate_column_indices(predicate, &columns);
110 self.catalog
111 .for_each_row_raw(table, |_rid, data| {
112 let pred_row =
113 decode_selective(&schema, &row_layout, data, &pred_cols);
114 if eval_predicate(predicate, &pred_row, &columns) {
115 rows.push(decode_row(&schema, data));
116 }
117 })
118 .map_err(|e| QueryError::StorageError(e.to_string()))?;
119 }
120
121 return Ok(QueryResult::Rows { columns, rows });
122 }
123
124 // General path: materialise then filter.
125 let result = self.execute_plan(input)?;
126 match result {
127 QueryResult::Rows { columns, rows } => {
128 let filtered: Vec<Vec<Value>> = rows
129 .into_iter()
130 .filter(|row| eval_predicate(predicate, row, &columns))
131 .collect();
132 Ok(QueryResult::Rows {
133 columns,
134 rows: filtered,
135 })
136 }
137 _ => Err("filter requires row input".into()),
138 }
139 }
140
141 PlanNode::Project { input, fields } => {
142 // Fast path: Project over IndexScan — decode only projected
143 // columns from raw bytes instead of full decode_row.
144 if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
145 let schema = self
146 .catalog
147 .schema(table)
148 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
149 .clone();
150 let all_columns: Vec<String> =
151 schema.columns.iter().map(|c| c.name.clone()).collect();
152 let key_value = literal_to_value(key)?;
153 let tbl = self
154 .catalog
155 .get_table(table)
156 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
157
158 let proj_columns: Vec<String> = fields
159 .iter()
160 .map(|f| {
161 f.alias.clone().unwrap_or_else(|| match &f.expr {
162 Expr::Field(name) => name.clone(),
163 _ => "?".into(),
164 })
165 })
166 .collect();
167
168 // Determine which column indices the projection needs
169 let proj_indices: Vec<usize> = fields
170 .iter()
171 .filter_map(|f| {
172 if let Expr::Field(name) = &f.expr {
173 all_columns.iter().position(|c| c == name)
174 } else {
175 None
176 }
177 })
178 .collect();
179
180 if tbl.has_index(column) {
181 let layout = RowLayout::new(&schema);
182 let rids = tbl.index_lookup_all(column, &key_value);
183 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
184 for rid in rids {
185 if let Some(data) = tbl.heap.get(rid) {
186 let row: Vec<Value> = proj_indices
187 .iter()
188 .map(|&ci| decode_column(&schema, &layout, &data, ci))
189 .collect();
190 rows.push(row);
191 }
192 }
193 return Ok(QueryResult::Rows {
194 columns: proj_columns,
195 rows,
196 });
197 }
198 }
199
200 // Fast path: Project(Limit(Sort(Filter(SeqScan)))) — bounded
201 // top-N heap. Decodes only the sort key + projected columns,
202 // keeps at most `limit` rows in a heap. Also handles the
203 // Project(Limit(Sort(SeqScan))) variant (no filter).
204 if let PlanNode::Limit {
205 input: inner,
206 count: limit_expr,
207 } = input.as_ref()
208 {
209 if let PlanNode::Sort {
210 input: sort_input,
211 keys,
212 } = inner.as_ref()
213 {
214 // Fast path only for single-key sorts
215 if keys.len() == 1 {
216 let sort_field = &keys[0].field;
217 let descending = keys[0].descending;
218 let limit = match limit_expr {
219 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
220 _ => usize::MAX,
221 };
222 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
223 match sort_input.as_ref() {
224 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
225 PlanNode::Filter {
226 input: fi,
227 predicate,
228 } => {
229 if let PlanNode::SeqScan { table } = fi.as_ref() {
230 (Some(table.as_str()), Some(predicate))
231 } else {
232 (None, None)
233 }
234 }
235 _ => (None, None),
236 };
237 if let Some(table) = table_opt {
238 if let Some(result) = self.project_filter_sort_limit_fast(
239 table, fields, sort_field, descending, limit, pred_opt,
240 )? {
241 return Ok(result);
242 }
243 }
244 }
245 }
246 // Fast path: Project(Limit(Filter(SeqScan))) — stream,
247 // decode only projected columns, stop at limit.
248 if let PlanNode::Filter {
249 input: fi,
250 predicate,
251 } = inner.as_ref()
252 {
253 if let PlanNode::SeqScan { table } = fi.as_ref() {
254 let limit = match limit_expr {
255 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
256 _ => usize::MAX,
257 };
258 if let Some(result) = self.project_filter_limit_fast(
259 table,
260 fields,
261 limit,
262 Some(predicate),
263 )? {
264 return Ok(result);
265 }
266 }
267 }
268 // Fast path: Project(Limit(SeqScan)) — stream, no filter.
269 if let PlanNode::SeqScan { table } = inner.as_ref() {
270 let limit = match limit_expr {
271 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
272 _ => usize::MAX,
273 };
274 if let Some(result) =
275 self.project_filter_limit_fast(table, fields, limit, None)?
276 {
277 return Ok(result);
278 }
279 }
280 }
281
282 // Mission D4: Project(Filter(SeqScan)) without Limit. Reuses
283 // `project_filter_limit_fast` with limit = usize::MAX so the
284 // hot loop decodes only projected columns and uses the
285 // compiled predicate. Previously this fell through to the
286 // generic Filter branch which materialised every column via
287 // `decode_row` then re-projected — quadratic work.
288 //
289 // multi_col_and_filter (`U filter .age > 30 and .status =
290 // "active" { .name, .age }`) was 6.18ms (0.7x SQLite) and
291 // is the load-bearing workload for this fast path.
292 if let PlanNode::Filter {
293 input: fi,
294 predicate,
295 } = input.as_ref()
296 {
297 if let PlanNode::SeqScan { table } = fi.as_ref() {
298 if let Some(result) = self.project_filter_limit_fast(
299 table,
300 fields,
301 usize::MAX,
302 Some(predicate),
303 )? {
304 return Ok(result);
305 }
306 }
307 }
308
309 // Mission D4: Project(SeqScan) without Filter or Limit.
310 // Decode only projected columns; the previous fall-through
311 // built full Vec<Value> rows then re-projected.
312 if let PlanNode::SeqScan { table } = input.as_ref() {
313 if let Some(result) =
314 self.project_filter_limit_fast(table, fields, usize::MAX, None)?
315 {
316 return Ok(result);
317 }
318 }
319
320 let result = self.execute_plan(input)?;
321 match result {
322 QueryResult::Rows { columns, rows } => {
323 let proj_columns: Vec<String> = fields
324 .iter()
325 .map(|f| {
326 f.alias.clone().unwrap_or_else(|| match &f.expr {
327 Expr::Field(name) => name.clone(),
328 // Mission E1.2: `{ u.name }` projects as the
329 // qualified column name so callers can still
330 // disambiguate across the join output.
331 Expr::QualifiedField { qualifier, field } => {
332 format!("{qualifier}.{field}")
333 }
334 _ => "?".into(),
335 })
336 })
337 .collect();
338 let proj_rows: Vec<Vec<Value>> = rows
339 .iter()
340 .map(|row| {
341 fields
342 .iter()
343 .map(|f| eval_expr(&f.expr, row, &columns))
344 .collect()
345 })
346 .collect();
347 Ok(QueryResult::Rows {
348 columns: proj_columns,
349 rows: proj_rows,
350 })
351 }
352 _ => Err("project requires row input".into()),
353 }
354 }
355
356 PlanNode::Sort { input, keys } => {
357 let result = self.execute_plan(input)?;
358 match result {
359 QueryResult::Rows { columns, mut rows } => {
360 // WS2: row-count cap is a cheap secondary guard; the
361 // byte budget is the real OOM defense for the sort
362 // buffer (a few very large rows pass the row cap).
363 if rows.len() > MAX_SORT_ROWS {
364 return Err(QueryError::SortLimitExceeded);
365 }
366 self.charge_rows(&rows)?;
367 let key_indices: Vec<(usize, bool)> = keys
368 .iter()
369 .map(|k| {
370 columns
371 .iter()
372 .position(|c| c == &k.field)
373 .map(|idx| (idx, k.descending))
374 .ok_or_else(|| QueryError::ColumnNotFound {
375 table: String::new(),
376 column: k.field.clone(),
377 })
378 })
379 .collect::<Result<_, QueryError>>()?;
380 rows.sort_by(|a, b| {
381 for &(col_idx, descending) in &key_indices {
382 let cmp = a[col_idx].cmp(&b[col_idx]);
383 let cmp = if descending { cmp.reverse() } else { cmp };
384 if cmp != std::cmp::Ordering::Equal {
385 return cmp;
386 }
387 }
388 std::cmp::Ordering::Equal
389 });
390 Ok(QueryResult::Rows { columns, rows })
391 }
392 _ => Err("sort requires row input".into()),
393 }
394 }
395
396 PlanNode::Limit { input, count } => {
397 let result = self.execute_plan(input)?;
398 let n = match count {
399 Expr::Literal(Literal::Int(v)) => *v as usize,
400 _ => return Err("limit must be integer literal".into()),
401 };
402 match result {
403 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
404 columns,
405 rows: rows.into_iter().take(n).collect(),
406 }),
407 _ => Err("limit requires row input".into()),
408 }
409 }
410
411 PlanNode::Offset { input, count } => {
412 let result = self.execute_plan(input)?;
413 let n = match count {
414 Expr::Literal(Literal::Int(v)) => *v as usize,
415 _ => return Err("offset must be integer literal".into()),
416 };
417 match result {
418 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
419 columns,
420 rows: rows.into_iter().skip(n).collect(),
421 }),
422 _ => Err("offset requires row input".into()),
423 }
424 }
425
426 PlanNode::Aggregate {
427 input,
428 function,
429 field,
430 } => {
431 // Fast path: count() over SeqScan — count rows without any decode
432 if *function == AggFunc::Count {
433 if let PlanNode::SeqScan { table } = input.as_ref() {
434 // Auto-refresh a dirty materialized view before
435 // counting it — otherwise count(View) returns stale
436 // data after an underlying mutation (F3).
437 if self.view_registry.is_dirty(table) {
438 self.refresh_view(table)?;
439 }
440 let mut count: i64 = 0;
441 self.catalog
442 .for_each_row_raw(table, |_rid, _data| {
443 count += 1;
444 })
445 .map_err(|e| QueryError::StorageError(e.to_string()))?;
446 return Ok(QueryResult::Scalar(Value::Int(count)));
447 }
448 // Fast path: count() over Filter(SeqScan) — try compiled
449 // predicate first, fall back to decode_column path.
450 // Skip a predicate carrying a subquery: the raw-bytes
451 // evaluators here don't materialise subqueries, so
452 // `count(T filter .x in (...))` would silently count 0
453 // (F1). Falling through routes it to the generic path
454 // that resolves the subquery correctly.
455 if let PlanNode::Filter {
456 input: inner,
457 predicate,
458 } = input.as_ref()
459 {
460 if let PlanNode::SeqScan { table } = inner.as_ref() {
461 if self.view_registry.is_dirty(table) {
462 self.refresh_view(table)?;
463 }
464 }
465 if let (PlanNode::SeqScan { table }, false) =
466 (inner.as_ref(), contains_subquery(predicate))
467 {
468 let schema = self
469 .catalog
470 .schema(table)
471 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
472 .clone();
473 let columns: Vec<String> =
474 schema.columns.iter().map(|c| c.name.clone()).collect();
475 let fast = FastLayout::new(&schema);
476 let row_layout = RowLayout::new(&schema);
477
478 // Try compiled predicate (zero-allocation hot path).
479 // Handles int leaves, string-eq leaves, AND conjunctions.
480 if let Some(compiled) =
481 compile_predicate(predicate, &columns, &fast, &schema)
482 {
483 let mut count: i64 = 0;
484 self.catalog
485 .for_each_row_raw(table, |_rid, data| {
486 if compiled(data) {
487 count += 1;
488 }
489 })
490 .map_err(|e| QueryError::StorageError(e.to_string()))?;
491 return Ok(QueryResult::Scalar(Value::Int(count)));
492 }
493
494 // Fallback: decode predicate columns
495 let pred_cols = predicate_column_indices(predicate, &columns);
496 let mut count: i64 = 0;
497 self.catalog
498 .for_each_row_raw(table, |_rid, data| {
499 let pred_row =
500 decode_selective(&schema, &row_layout, data, &pred_cols);
501 if eval_predicate(predicate, &pred_row, &columns) {
502 count += 1;
503 }
504 })
505 .map_err(|e| QueryError::StorageError(e.to_string()))?;
506
507 return Ok(QueryResult::Scalar(Value::Int(count)));
508 }
509 }
510 }
511
512 // Fast path: sum/avg/min/max over a single fixed-size int
513 // column with an optional compiled filter predicate. Walks
514 // raw row bytes, zero allocation per row.
515 if matches!(
516 function,
517 AggFunc::Sum
518 | AggFunc::Avg
519 | AggFunc::Min
520 | AggFunc::Max
521 | AggFunc::CountDistinct
522 ) {
523 if let Some(col) = field.as_ref() {
524 // Shape: Aggregate(SeqScan) or Aggregate(Filter(SeqScan))
525 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
526 match input.as_ref() {
527 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
528 PlanNode::Filter {
529 input: inner,
530 predicate,
531 } => {
532 if let PlanNode::SeqScan { table } = inner.as_ref() {
533 (Some(table.as_str()), Some(predicate))
534 } else {
535 (None, None)
536 }
537 }
538 _ => (None, None),
539 };
540 if let Some(table) = table_opt {
541 if let Some(result) =
542 self.agg_single_col_fast(table, col, *function, pred_opt)?
543 {
544 return Ok(result);
545 }
546 }
547 }
548 }
549
550 // Fast path: Project(Limit(Filter(SeqScan))) — stream, decode
551 // only projected columns, stop once we hit the limit.
552 // (Handled in the Project branch; this branch only fires when
553 // the aggregate is the outer node.)
554 let result = self.execute_plan(input)?;
555 match result {
556 QueryResult::Rows { columns, rows } => {
557 match function {
558 AggFunc::Count => {
559 Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
560 }
561 AggFunc::CountDistinct => {
562 let col = field.as_ref().ok_or("count distinct requires field")?;
563 let idx = columns
564 .iter()
565 .position(|c| c == col)
566 .ok_or("col not found")?;
567 let mut seen = std::collections::HashSet::new();
568 for row in &rows {
569 let v = &row[idx];
570 if !v.is_empty() {
571 seen.insert(v.clone());
572 }
573 }
574 Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
575 }
576 AggFunc::Avg => {
577 let col = field.as_ref().ok_or("avg requires field")?;
578 let idx = columns
579 .iter()
580 .position(|c| c == col)
581 .ok_or("col not found")?;
582 let sum: f64 = rows
583 .iter()
584 .filter_map(|r| match &r[idx] {
585 Value::Int(v) => Some(*v as f64),
586 Value::Float(v) => Some(*v),
587 _ => None,
588 })
589 .sum();
590 let count = rows.len() as f64;
591 Ok(QueryResult::Scalar(Value::Float(sum / count)))
592 }
593 AggFunc::Sum => {
594 let col = field.as_ref().ok_or("sum requires field")?;
595 let idx = columns
596 .iter()
597 .position(|c| c == col)
598 .ok_or("col not found")?;
599 // Track int and float contributions separately so
600 // Float columns (and mixed Int/Float rows) don't get
601 // silently dropped as they did in the Int-only
602 // version. If any Float is present, the whole sum
603 // promotes to Float — matching Avg's semantics.
604 let mut int_sum: i64 = 0;
605 let mut float_sum: f64 = 0.0;
606 let mut saw_float = false;
607 for r in &rows {
608 match &r[idx] {
609 Value::Int(v) => int_sum += *v,
610 Value::Float(v) => {
611 float_sum += *v;
612 saw_float = true;
613 }
614 _ => {}
615 }
616 }
617 let result = if saw_float {
618 Value::Float(float_sum + int_sum as f64)
619 } else {
620 Value::Int(int_sum)
621 };
622 Ok(QueryResult::Scalar(result))
623 }
624 AggFunc::Min | AggFunc::Max => {
625 let col = field.as_ref().ok_or("min/max requires field")?;
626 let idx = columns
627 .iter()
628 .position(|c| c == col)
629 .ok_or("col not found")?;
630 let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
631 let result = if *function == AggFunc::Min {
632 vals.into_iter().min().cloned()
633 } else {
634 vals.into_iter().max().cloned()
635 };
636 Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
637 }
638 }
639 }
640 _ => Err("aggregate requires row input".into()),
641 }
642 }
643
644 PlanNode::Insert { table, rows } => {
645 // Build + validate EVERY row before inserting any, so a bad
646 // row (unknown/missing/uncoercible field) aborts the whole
647 // statement without a partial write. The WAL fsync happens
648 // once at statement end, so N rows = N appends + 1 fsync.
649 let all_values: Vec<Vec<Value>> = {
650 let schema = self
651 .catalog
652 .schema(table)
653 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
654 let mut all = Vec::with_capacity(rows.len());
655 for assignments in rows {
656 let mut values = vec![Value::Empty; schema.columns.len()];
657 for a in assignments {
658 let idx = schema.column_index(&a.field).ok_or_else(|| {
659 QueryError::ColumnNotFound {
660 table: String::new(),
661 column: a.field.clone(),
662 }
663 })?;
664 let raw = literal_to_value(&a.value)?;
665 values[idx] = coerce_value(raw, &schema.columns[idx])?;
666 }
667 for col in &schema.columns {
668 if col.required && matches!(values[col.position as usize], Value::Empty)
669 {
670 return Err(QueryError::Execution(format!(
671 "column '{}' is required but no value was provided",
672 col.name
673 )));
674 }
675 }
676 all.push(values);
677 }
678 all
679 };
680 // Charge the materialized batch against the per-query memory
681 // budget before inserting — keeps multi-row insert consistent
682 // with every other full-materialization point (sort/join/group)
683 // and bounds embedded callers (the server also caps the query
684 // string at 1 MB, but embedded callers have no such limit).
685 self.charge_rows(&all_values)?;
686 let n = all_values.len() as u64;
687 for values in &all_values {
688 self.catalog
689 .insert(table, values)
690 .map_err(|e| QueryError::StorageError(e.to_string()))?;
691 }
692 self.view_registry.mark_dependents_dirty(table);
693 Ok(QueryResult::Modified(n))
694 }
695
696 PlanNode::Upsert {
697 table,
698 key_column,
699 assignments,
700 on_conflict,
701 } => {
702 let (values, key_idx) = {
703 let schema = self
704 .catalog
705 .schema(table)
706 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
707 let mut values = vec![Value::Empty; schema.columns.len()];
708 for a in assignments {
709 let idx = schema.column_index(&a.field).ok_or_else(|| {
710 QueryError::ColumnNotFound {
711 table: String::new(),
712 column: a.field.clone(),
713 }
714 })?;
715 let raw = literal_to_value(&a.value)?;
716 values[idx] = coerce_value(raw, &schema.columns[idx])?;
717 }
718 for col in &schema.columns {
719 if col.required && matches!(values[col.position as usize], Value::Empty) {
720 return Err(QueryError::Execution(format!(
721 "column '{}' is required but no value was provided",
722 col.name
723 )));
724 }
725 }
726 let key_idx = schema
727 .column_index(key_column)
728 .ok_or_else(|| format!("key column '{key_column}' not found"))?;
729 (values, key_idx)
730 };
731
732 // Upsert requires the `on` column to be unique — otherwise
733 // there is no well-defined row to overwrite and a plain
734 // insert could silently create duplicate keys.
735 if self.catalog.is_index_unique(table, key_column) != Some(true) {
736 return Err(QueryError::Execution(format!(
737 "upsert on .{key_column} requires a unique column (declare it with \
738 `unique {key_column}: <type>` or `alter {table} add unique .{key_column}`)"
739 )));
740 }
741
742 let key_value = values[key_idx].clone();
743
744 // Probe the unique index for a conflict.
745 let existing = {
746 let tbl = self
747 .catalog
748 .get_table(table)
749 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
750 // The key column is guaranteed unique above, so this
751 // returns at most one matching row.
752 let rids = tbl.index_lookup_all(key_column, &key_value);
753 rids.into_iter().next().and_then(|rid| {
754 tbl.heap
755 .get(rid)
756 .map(|data| (rid, decode_row(&tbl.schema, &data)))
757 })
758 };
759
760 if let Some((rid, mut existing_row)) = existing {
761 // Conflict: apply on_conflict assignments (or all non-key if empty).
762 let update_assignments = if on_conflict.is_empty() {
763 assignments
764 } else {
765 on_conflict
766 };
767 let changed_cols: Vec<usize> = {
768 let schema = self
769 .catalog
770 .schema(table)
771 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
772 let mut indices = Vec::new();
773 for a in update_assignments {
774 let idx = schema.column_index(&a.field).ok_or_else(|| {
775 QueryError::ColumnNotFound {
776 table: String::new(),
777 column: a.field.clone(),
778 }
779 })?;
780 if idx != key_idx {
781 existing_row[idx] = literal_to_value(&a.value)?;
782 indices.push(idx);
783 }
784 }
785 indices
786 };
787 self.catalog
788 .update_hinted(table, rid, &existing_row, Some(&changed_cols))
789 .map_err(|e| QueryError::StorageError(e.to_string()))?;
790 self.view_registry.mark_dependents_dirty(table);
791 Ok(QueryResult::Modified(1))
792 } else {
793 // No conflict: insert.
794 self.catalog
795 .insert(table, &values)
796 .map_err(|e| QueryError::StorageError(e.to_string()))?;
797 self.view_registry.mark_dependents_dirty(table);
798 Ok(QueryResult::Modified(1))
799 }
800 }
801
802 PlanNode::Update {
803 input,
804 table,
805 assignments,
806 } => {
807 // Mission C Phase 3: resolve assignments against a borrowed
808 // schema, then drop the borrow before the mutation loop.
809 // Try literal-only path first; fall back to per-row expression
810 // evaluation if any assignment contains a non-literal expression
811 // (e.g., `age := .age + 1`).
812 let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
813 let schema_ref = self
814 .catalog
815 .schema(table)
816 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
817 let indices: Vec<usize> = assignments
818 .iter()
819 .map(|a| {
820 schema_ref.column_index(&a.field).ok_or_else(|| {
821 QueryError::ColumnNotFound {
822 table: String::new(),
823 column: a.field.clone(),
824 }
825 })
826 })
827 .collect::<Result<_, _>>()?;
828 let vals: Result<Vec<Value>, _> = assignments
829 .iter()
830 .map(|a| literal_to_value(&a.value))
831 .collect();
832 (indices, vals.ok())
833 };
834 let resolved_assignments: Option<Vec<(usize, Value)>> =
835 literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
836
837 // Mission C Phase 2: the hint Table::update_hinted needs to
838 // decide whether to read the old row for index diff.
839 let changed_cols: Vec<usize> = col_indices.clone();
840
841 // ── Fused scan+update for Update(Filter(SeqScan)) ────────
842 // Perf sprint: instead of the two-pass collect-RIDs-then-loop
843 // pattern (which pays one ensure_hot per matched row on the
844 // second pass), fuse the predicate evaluation and in-place
845 // byte-level mutation into a single heap walk. Same idea as
846 // the fused scan_delete_matching path for deletes.
847 if let Some(ref resolved_assignments) = resolved_assignments {
848 if let PlanNode::Filter {
849 input: inner,
850 predicate,
851 } = input.as_ref()
852 {
853 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
854 if t == table {
855 let fused_result = self.try_fused_scan_update(
856 table,
857 predicate,
858 resolved_assignments,
859 &changed_cols,
860 );
861 if let Some(result) = fused_result {
862 return result;
863 }
864 }
865 }
866 }
867 }
868
869 // Collect matching RowIds in a single pass.
870 let matching_rids = self.collect_rids_for_mutation(input, table)?;
871
872 // ── Literal-only fast paths ─────────────────────────────
873 if let Some(ref resolved_assignments) = resolved_assignments {
874 // Mission C Phase 4: in-place byte-patch fast path. If every
875 // assignment targets a fixed-size non-null column AND none of
876 // them is indexed, we can skip decode_row / Vec<Value> /
877 // encode_row_into entirely and patch the row's raw bytes on
878 // the hot page.
879 let fast_patch: Option<Vec<FastPatch>> = {
880 let tbl = self
881 .catalog
882 .get_table(table)
883 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
884 let schema = &tbl.schema;
885 let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
886 is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
887 });
888 let no_indexed = !resolved_assignments
889 .iter()
890 .any(|(idx, _)| tbl.has_indexed_col(*idx));
891
892 if all_fixed_nonnull && no_indexed {
893 let layout = RowLayout::new(schema);
894 let bitmap_size = layout.bitmap_size();
895 let patches: Vec<FastPatch> = resolved_assignments
896 .iter()
897 .map(|(idx, val)| {
898 let fixed_off = layout
899 .fixed_offset(*idx)
900 .expect("is_fixed_size already checked");
901 let field_off = 2 + bitmap_size + fixed_off;
902 let bytes: FixedBytes = match val {
903 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
904 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
905 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
906 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
907 Value::Uuid(v) => FixedBytes::Uuid(*v),
908 _ => unreachable!("all_fixed_nonnull guard lied"),
909 };
910 FastPatch {
911 field_off,
912 bitmap_byte_off: 2 + idx / 8,
913 bit_mask: 1u8 << (idx % 8),
914 bytes,
915 }
916 })
917 .collect();
918 Some(patches)
919 } else {
920 None
921 }
922 };
923
924 if let Some(patches) = fast_patch {
925 let mut count = 0u64;
926 for rid in matching_rids {
927 // Mission B2: WAL-log every patch so crash
928 // recovery replays the update. Same mutation
929 // closure as before — the wrapper just sandwiches
930 // it between a hot-page read and a WAL append.
931 let ok = self
932 .catalog
933 .update_row_bytes_logged(table, rid, |row| {
934 let base = row_body_base(row);
935 for p in &patches {
936 row[base + p.bitmap_byte_off] &= !p.bit_mask;
937 let field_bytes = p.bytes.as_slice();
938 row[base + p.field_off
939 ..base + p.field_off + field_bytes.len()]
940 .copy_from_slice(field_bytes);
941 }
942 })
943 .map_err(|e| QueryError::StorageError(e.to_string()))?;
944 if ok {
945 count += 1;
946 }
947 }
948 self.view_registry.mark_dependents_dirty(table);
949 return Ok(QueryResult::Modified(count));
950 }
951
952 // Mission C Phase 10: var-column in-place shrink fast path.
953 let var_fast: Option<(usize, Option<Vec<u8>>)> = {
954 let tbl = self
955 .catalog
956 .get_table(table)
957 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
958 let schema = &tbl.schema;
959 let is_single = resolved_assignments.len() == 1;
960 let is_var_col = is_single
961 && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
962 let no_indexed = !resolved_assignments
963 .iter()
964 .any(|(idx, _)| tbl.has_indexed_col(*idx));
965
966 if is_single && is_var_col && no_indexed {
967 let (idx, val) = &resolved_assignments[0];
968 let bytes_opt: Option<Vec<u8>> = match val {
969 Value::Str(s) => Some(s.as_bytes().to_vec()),
970 Value::Bytes(b) => Some(b.clone()),
971 Value::Empty => None,
972 _ => {
973 return Err(QueryError::TypeError(format!(
974 "cannot assign non-var value to var column '{}'",
975 schema.columns[*idx].name
976 )))
977 }
978 };
979 Some((*idx, bytes_opt))
980 } else {
981 None
982 }
983 };
984
985 if let Some((col_idx, new_bytes_opt)) = var_fast {
986 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
987 let mut count = 0u64;
988 let mut fallback_rids: Vec<RowId> = Vec::new();
989 for rid in &matching_rids {
990 // Mission B2: logged variant so crash recovery
991 // replays the shrink. On a false return (row
992 // would have to grow), the rid is pushed to
993 // `fallback_rids` and the slower `update_hinted`
994 // path — which is already WAL-logged — picks it up.
995 let ok = self
996 .catalog
997 .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
998 .map_err(|e| QueryError::StorageError(e.to_string()))?;
999 if ok {
1000 count += 1;
1001 } else {
1002 fallback_rids.push(*rid);
1003 }
1004 }
1005 for rid in fallback_rids {
1006 let mut row = match self.catalog.get(table, rid) {
1007 Some(r) => r,
1008 None => continue,
1009 };
1010 for (idx, val) in resolved_assignments.iter() {
1011 row[*idx] = val.clone();
1012 }
1013 self.catalog
1014 .update_hinted(table, rid, &row, Some(&changed_cols))
1015 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1016 count += 1;
1017 }
1018 self.view_registry.mark_dependents_dirty(table);
1019 return Ok(QueryResult::Modified(count));
1020 }
1021
1022 // Generic literal path: decode row, apply literal values.
1023 let mut count = 0u64;
1024 for rid in matching_rids {
1025 let mut row = match self.catalog.get(table, rid) {
1026 Some(r) => r,
1027 None => continue,
1028 };
1029 for (idx, val) in resolved_assignments.iter() {
1030 row[*idx] = val.clone();
1031 }
1032 self.catalog
1033 .update_hinted(table, rid, &row, Some(&changed_cols))
1034 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1035 count += 1;
1036 }
1037 self.view_registry.mark_dependents_dirty(table);
1038 return Ok(QueryResult::Modified(count));
1039 } // end if let Some(resolved_assignments)
1040
1041 // ── Expression-based update path ────────────────────────
1042 // At least one assignment contains a non-literal expression
1043 // (e.g., `age := .age + 1`). Evaluate per-row.
1044 let col_names: Vec<String> = {
1045 let schema_ref = self
1046 .catalog
1047 .schema(table)
1048 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1049 schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1050 };
1051 let mut count = 0u64;
1052 for rid in matching_rids {
1053 let mut row = match self.catalog.get(table, rid) {
1054 Some(r) => r,
1055 None => continue,
1056 };
1057 for (i, asgn) in assignments.iter().enumerate() {
1058 let val = eval_expr(&asgn.value, &row, &col_names);
1059 row[col_indices[i]] = val;
1060 }
1061 self.catalog
1062 .update_hinted(table, rid, &row, Some(&changed_cols))
1063 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1064 count += 1;
1065 }
1066 self.view_registry.mark_dependents_dirty(table);
1067 Ok(QueryResult::Modified(count))
1068 }
1069
1070 PlanNode::Delete { input, table } => {
1071 // Mission C Phase 3: no schema clone — collect_rids_for_mutation
1072 // looks up schema internally when it needs one, and the mutation
1073 // loop doesn't need the schema at all.
1074 //
1075 // Mission C Phase 12: route bulk deletes through
1076 // `Catalog::delete_many`, which batches the btree leaf
1077 // compaction and shares one `ensure_hot` per row between
1078 // the index-key extraction and the slot delete. On
1079 // `delete_by_filter` (100K fixture, ~20K matches) that
1080 // removes ~4ms of pure `Vec::remove` memmove from the btree
1081 // maintenance phase.
1082 //
1083 // Mission C Phase 16: for the common `delete where ...`
1084 // shape (Filter(SeqScan)) — and the rarer "delete
1085 // everything" shape (SeqScan) — skip the two-pass
1086 // `collect_rids_for_mutation` + `delete_many` flow entirely.
1087 // The fused `scan_delete_matching` primitive walks the
1088 // heap exactly once, paying one `ensure_hot` per page
1089 // instead of per-row. That closes the last major gap on
1090 // the bench's `delete_by_filter` workload.
1091 if let PlanNode::Filter {
1092 input: inner,
1093 predicate,
1094 } = input.as_ref()
1095 {
1096 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1097 if t == table {
1098 let schema = self
1099 .catalog
1100 .schema(table)
1101 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1102 let columns: Vec<String> =
1103 schema.columns.iter().map(|c| c.name.clone()).collect();
1104 let fast = FastLayout::new(schema);
1105 if let Some(compiled) =
1106 compile_predicate(predicate, &columns, &fast, schema)
1107 {
1108 // Mission B2: logged variant so every
1109 // matched rid hits the WAL during the
1110 // single-pass scan. Structure of the
1111 // fused scan is unchanged — only the
1112 // hook closure now also appends.
1113 let count = self
1114 .catalog
1115 .scan_delete_matching_logged(table, |data| compiled(data))
1116 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1117 self.view_registry.mark_dependents_dirty(table);
1118 return Ok(QueryResult::Modified(count));
1119 }
1120 }
1121 }
1122 } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1123 if t == table {
1124 // `delete from T` with no predicate — every live
1125 // row matches. One pass is still the right shape.
1126 // Mission B2: logged variant — see above.
1127 let count = self
1128 .catalog
1129 .scan_delete_matching_logged(table, |_| true)
1130 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1131 self.view_registry.mark_dependents_dirty(table);
1132 return Ok(QueryResult::Modified(count));
1133 }
1134 }
1135
1136 let matching_rids = self.collect_rids_for_mutation(input, table)?;
1137 let count = self
1138 .catalog
1139 .delete_many(table, &matching_rids)
1140 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1141 self.view_registry.mark_dependents_dirty(table);
1142 Ok(QueryResult::Modified(count))
1143 }
1144
1145 PlanNode::AliasScan { table, alias } => {
1146 // Mission E1.2: scan `table` and rename every output column
1147 // to `alias.field`. Used as a join leaf so downstream
1148 // NestedLoopJoin + Filter + Project nodes can resolve
1149 // `Expr::QualifiedField` lookups by direct column-name match.
1150 //
1151 // We don't bother with a fused zero-copy loop here yet — the
1152 // whole join path is nested-loop and correctness-first
1153 // (Phase E1.3 will introduce hash join and at that point we
1154 // can revisit whether to specialise AliasScan).
1155 let schema = self
1156 .catalog
1157 .schema(table)
1158 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1159 .clone();
1160 let columns: Vec<String> = schema
1161 .columns
1162 .iter()
1163 .map(|c| format!("{alias}.{}", c.name))
1164 .collect();
1165 let rows: Vec<Vec<Value>> = self
1166 .catalog
1167 .scan(table)
1168 .map_err(|e| QueryError::StorageError(e.to_string()))?
1169 .map(|(_, row)| row)
1170 .collect();
1171 Ok(QueryResult::Rows { columns, rows })
1172 }
1173
1174 PlanNode::NestedLoopJoin {
1175 left,
1176 right,
1177 on,
1178 kind,
1179 } => {
1180 // Materialise both sides. The executor ships two strategies:
1181 // 1. Hash join (E1.3) — when the `on` predicate is a
1182 // simple equi-predicate `left_col = right_col`, build a
1183 // FxHashMap<Value, Vec<row_idx>> over the right side
1184 // and probe with the left side. O(L + R) instead of
1185 // O(L × R). Handles Inner and LeftOuter.
1186 // 2. Nested loop (E1.2) — fallback for Cross, non-equi
1187 // predicates, or `on` expressions that reference
1188 // either side with something more complex than a
1189 // QualifiedField.
1190 let left_result = self.execute_plan(left)?;
1191 let right_result = self.execute_plan(right)?;
1192 let (left_columns, left_rows) = match left_result {
1193 QueryResult::Rows { columns, rows } => (columns, rows),
1194 _ => return Err("join left side must produce rows".into()),
1195 };
1196 let (right_columns, right_rows) = match right_result {
1197 QueryResult::Rows { columns, rows } => (columns, rows),
1198 _ => return Err("join right side must produce rows".into()),
1199 };
1200
1201 // WS2: byte-budget guard on the join build side. Charge both
1202 // materialized inputs before we build the hash table / probe;
1203 // the output is row-capped by check_join_limit below.
1204 self.charge_rows(&left_rows)?;
1205 self.charge_rows(&right_rows)?;
1206
1207 // Hash-join fast path.
1208 if !matches!(kind, JoinKind::Cross) {
1209 if let Some(pred) = on {
1210 if let Some((l_idx, r_idx)) =
1211 try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1212 {
1213 let result = hash_join(
1214 left_columns,
1215 left_rows,
1216 right_columns,
1217 right_rows,
1218 l_idx,
1219 r_idx,
1220 *kind,
1221 );
1222 if let QueryResult::Rows { ref rows, .. } = result {
1223 check_join_limit(rows.len())?;
1224 }
1225 return Ok(result);
1226 }
1227 }
1228 }
1229
1230 // Nested-loop fallback.
1231 let n_left = left_columns.len();
1232 let n_right = right_columns.len();
1233 let mut columns = Vec::with_capacity(n_left + n_right);
1234 columns.extend(left_columns);
1235 columns.extend(right_columns);
1236
1237 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1238 let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1239
1240 for left_row in &left_rows {
1241 let mut matched = false;
1242 for right_row in &right_rows {
1243 combined.clear();
1244 combined.extend_from_slice(left_row);
1245 combined.extend_from_slice(right_row);
1246 let keep = match kind {
1247 JoinKind::Cross => true,
1248 JoinKind::Inner | JoinKind::LeftOuter => match on {
1249 Some(pred) => eval_predicate(pred, &combined, &columns),
1250 // Missing `on` for non-cross joins is a
1251 // parser error, but if it slips through we
1252 // treat it as "match everything".
1253 None => true,
1254 },
1255 // RightOuter is rewritten to LeftOuter by the
1256 // planner, so we never see it here.
1257 JoinKind::RightOuter => {
1258 unreachable!("planner rewrites RightOuter to LeftOuter")
1259 }
1260 };
1261 if keep {
1262 rows.push(combined.clone());
1263 check_join_limit(rows.len())?;
1264 matched = true;
1265 }
1266 }
1267 if !matched && matches!(kind, JoinKind::LeftOuter) {
1268 let mut row = Vec::with_capacity(n_left + n_right);
1269 row.extend_from_slice(left_row);
1270 row.resize(n_left + n_right, Value::Empty);
1271 rows.push(row);
1272 check_join_limit(rows.len())?;
1273 }
1274 }
1275
1276 Ok(QueryResult::Rows { columns, rows })
1277 }
1278
1279 PlanNode::Distinct { input } => {
1280 let result = self.execute_plan(input)?;
1281 match result {
1282 QueryResult::Rows { columns, rows } => {
1283 let mut seen = std::collections::HashSet::new();
1284 let mut unique_rows = Vec::new();
1285 for row in rows {
1286 if seen.insert(row.clone()) {
1287 unique_rows.push(row);
1288 }
1289 }
1290 Ok(QueryResult::Rows {
1291 columns,
1292 rows: unique_rows,
1293 })
1294 }
1295 other => Ok(other),
1296 }
1297 }
1298
1299 PlanNode::GroupBy {
1300 input,
1301 keys,
1302 aggregates,
1303 having,
1304 } => {
1305 let result = self.execute_plan(input)?;
1306 match result {
1307 QueryResult::Rows { columns, rows } => {
1308 // WS2: byte-budget guard on the GROUP BY input buffer
1309 // (the hash table is bounded by the input it groups).
1310 self.charge_rows(&rows)?;
1311 // Resolve key column indices.
1312 let key_indices: Vec<usize> = keys
1313 .iter()
1314 .map(|k| {
1315 columns
1316 .iter()
1317 .position(|c| c == k)
1318 .ok_or_else(|| format!("group-by column '{k}' not found"))
1319 })
1320 .collect::<Result<Vec<_>, _>>()?;
1321
1322 // Resolve aggregate field indices. count(*) uses
1323 // sentinel usize::MAX — compute_group_aggregate
1324 // treats it as "count all rows in the group".
1325 let agg_field_indices: Vec<usize> = aggregates
1326 .iter()
1327 .map(|a| {
1328 if a.field == "*" {
1329 Ok(usize::MAX)
1330 } else {
1331 columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1332 format!("aggregate column '{}' not found", a.field)
1333 })
1334 }
1335 })
1336 .collect::<Result<Vec<_>, _>>()?;
1337
1338 // Group rows by key values (preserving insertion order).
1339 let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1340 rustc_hash::FxHashMap::default();
1341 let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1342 for (ri, row) in rows.iter().enumerate() {
1343 let key: Vec<Value> =
1344 key_indices.iter().map(|&i| row[i].clone()).collect();
1345 match group_map.get(&key) {
1346 Some(&idx) => groups[idx].1.push(ri),
1347 None => {
1348 let idx = groups.len();
1349 group_map.insert(key.clone(), idx);
1350 groups.push((key, vec![ri]));
1351 }
1352 }
1353 }
1354
1355 // Build output column names: keys ++ aggregate output names.
1356 let mut out_columns: Vec<String> = keys.clone();
1357 for agg in aggregates.iter() {
1358 out_columns.push(agg.output_name.clone());
1359 }
1360
1361 // Compute aggregates per group.
1362 let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1363 for (key_vals, row_indices) in &groups {
1364 let mut row = key_vals.clone();
1365 for (ai, agg) in aggregates.iter().enumerate() {
1366 let col_idx = agg_field_indices[ai];
1367 let val = compute_group_aggregate(
1368 agg.function,
1369 &rows,
1370 row_indices,
1371 col_idx,
1372 );
1373 row.push(val);
1374 }
1375 out_rows.push(row);
1376 }
1377
1378 // Apply HAVING filter.
1379 if let Some(having_expr) = having {
1380 out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1381 }
1382
1383 Ok(QueryResult::Rows {
1384 columns: out_columns,
1385 rows: out_rows,
1386 })
1387 }
1388 _ => Err("group by requires row input".into()),
1389 }
1390 }
1391
1392 PlanNode::CreateTable { name, fields } => {
1393 let columns: Vec<ColumnDef> = fields
1394 .iter()
1395 .enumerate()
1396 .map(|(i, f)| -> Result<ColumnDef, QueryError> {
1397 Ok(ColumnDef {
1398 name: f.name.clone(),
1399 type_id: type_name_to_id(&f.type_name)
1400 .map_err(QueryError::TypeError)?,
1401 required: f.required,
1402 position: i as u16,
1403 })
1404 })
1405 .collect::<Result<Vec<_>, _>>()?;
1406 let schema = Schema {
1407 table_name: name.clone(),
1408 columns,
1409 };
1410 self.catalog
1411 .create_table(schema)
1412 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1413 // Declaring a field `unique` auto-creates a unique B+tree
1414 // index, which is where uniqueness is enforced on writes.
1415 for f in fields.iter().filter(|f| f.unique) {
1416 self.catalog
1417 .create_index_unique(name, &f.name, true)
1418 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1419 }
1420 Ok(QueryResult::Created(name.clone()))
1421 }
1422
1423 PlanNode::AlterTable { table, action } => match action {
1424 AlterAction::AddColumn {
1425 name,
1426 type_name,
1427 required,
1428 } => {
1429 let position = self
1430 .catalog
1431 .schema(table)
1432 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1433 .columns
1434 .len() as u16;
1435 let col = ColumnDef {
1436 name: name.clone(),
1437 type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1438 required: *required,
1439 position,
1440 };
1441 self.catalog
1442 .alter_table_add_column(table, col)
1443 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1444 Ok(QueryResult::Executed {
1445 message: format!("column '{name}' added to '{table}'"),
1446 })
1447 }
1448 AlterAction::DropColumn { name } => {
1449 self.catalog
1450 .alter_table_drop_column(table, name)
1451 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1452 Ok(QueryResult::Executed {
1453 message: format!("column '{name}' dropped from '{table}'"),
1454 })
1455 }
1456 AlterAction::AddIndex { column } => {
1457 self.catalog
1458 .create_index(table, column)
1459 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1460 Ok(QueryResult::Executed {
1461 message: format!("index on '{table}.{column}' created"),
1462 })
1463 }
1464 AlterAction::AddUnique { column } => {
1465 // No DropIndex exists, so we cannot upgrade an existing
1466 // non-unique index in place — reject it cleanly.
1467 if self.catalog.has_index(table, column) {
1468 return Err(QueryError::Execution(format!(
1469 "cannot add unique on {table}.{column}: column already indexed"
1470 )));
1471 }
1472 // Scan existing rows for duplicate (non-null) values
1473 // before creating the unique index.
1474 {
1475 let tbl = self
1476 .catalog
1477 .get_table(table)
1478 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1479 let col_idx = tbl.schema.column_index(column).ok_or_else(|| {
1480 QueryError::ColumnNotFound {
1481 table: table.to_string(),
1482 column: column.clone(),
1483 }
1484 })?;
1485 let mut seen = std::collections::HashSet::new();
1486 for (_, row) in tbl.scan() {
1487 let v = &row[col_idx];
1488 if v.is_empty() {
1489 continue;
1490 }
1491 if !seen.insert(v.clone()) {
1492 return Err(QueryError::Execution(format!(
1493 "cannot add unique on {table}.{column}: \
1494 duplicate value {v:?} exists"
1495 )));
1496 }
1497 }
1498 }
1499 self.catalog
1500 .create_index_unique(table, column, true)
1501 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1502 Ok(QueryResult::Executed {
1503 message: format!("unique index on '{table}.{column}' created"),
1504 })
1505 }
1506 },
1507
1508 PlanNode::DropTable { name } => {
1509 self.catalog
1510 .drop_table(name)
1511 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1512 Ok(QueryResult::Executed {
1513 message: format!("table '{name}' dropped"),
1514 })
1515 }
1516
1517 PlanNode::CreateView { name, query_text } => {
1518 self.create_view(name, query_text)?;
1519 Ok(QueryResult::Executed {
1520 message: format!("materialized view '{name}' created"),
1521 })
1522 }
1523
1524 PlanNode::RefreshView { name } => {
1525 self.refresh_view(name)?;
1526 Ok(QueryResult::Executed {
1527 message: format!("materialized view '{name}' refreshed"),
1528 })
1529 }
1530
1531 PlanNode::DropView { name } => {
1532 self.drop_view(name)?;
1533 Ok(QueryResult::Executed {
1534 message: format!("materialized view '{name}' dropped"),
1535 })
1536 }
1537
1538 PlanNode::Window { input, windows } => {
1539 let result = self.execute_plan(input)?;
1540 execute_window(result, windows)
1541 }
1542
1543 PlanNode::Union { left, right, all } => {
1544 let left_result = self.execute_plan(left)?;
1545 let right_result = self.execute_plan(right)?;
1546 let (left_cols, left_rows) = match left_result {
1547 QueryResult::Rows { columns, rows } => (columns, rows),
1548 _ => return Err("UNION requires query results on left side".into()),
1549 };
1550 let (_, right_rows) = match right_result {
1551 QueryResult::Rows { columns, rows } => (columns, rows),
1552 _ => return Err("UNION requires query results on right side".into()),
1553 };
1554 let mut combined = left_rows;
1555 if *all {
1556 // UNION ALL — just concatenate.
1557 combined.extend(right_rows);
1558 } else {
1559 // UNION — deduplicate using the same HashSet approach
1560 // as DISTINCT. Value already implements Hash + Eq.
1561 let mut seen = std::collections::HashSet::new();
1562 for row in &combined {
1563 seen.insert(row.clone());
1564 }
1565 for row in right_rows {
1566 if seen.insert(row.clone()) {
1567 combined.push(row);
1568 }
1569 }
1570 }
1571 Ok(QueryResult::Rows {
1572 columns: left_cols,
1573 rows: combined,
1574 })
1575 }
1576
1577 PlanNode::Explain { input } => {
1578 let text = format_plan_tree(input, 0);
1579 Ok(QueryResult::Rows {
1580 columns: vec!["plan".to_string()],
1581 rows: text
1582 .lines()
1583 .map(|line| vec![Value::Str(line.to_string())])
1584 .collect(),
1585 })
1586 }
1587
1588 PlanNode::Begin => {
1589 if self.in_transaction {
1590 return Err(QueryError::Execution(
1591 "already in a transaction (nested transactions not supported)".into(),
1592 ));
1593 }
1594 self.catalog
1595 .begin_transaction()
1596 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1597 self.in_transaction = true;
1598 Ok(QueryResult::Executed {
1599 message: "transaction started".to_string(),
1600 })
1601 }
1602
1603 PlanNode::Commit => {
1604 if !self.in_transaction {
1605 return Err(QueryError::Execution(
1606 "no active transaction to commit".into(),
1607 ));
1608 }
1609 self.catalog
1610 .commit_transaction()
1611 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1612 self.in_transaction = false;
1613 Ok(QueryResult::Executed {
1614 message: "transaction committed".to_string(),
1615 })
1616 }
1617
1618 PlanNode::Rollback => {
1619 if !self.in_transaction {
1620 return Err(QueryError::Execution(
1621 "no active transaction to roll back".into(),
1622 ));
1623 }
1624 self.in_transaction = false;
1625 self.catalog
1626 .rollback_to_last_sync()
1627 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1628 if let Ok(mut cache) = self.plan_cache.lock() {
1629 cache.clear();
1630 }
1631 self.view_registry = ViewRegistry::open(self.catalog.data_dir())
1632 .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
1633 Ok(QueryResult::Executed {
1634 message: "transaction rolled back".to_string(),
1635 })
1636 }
1637
1638 PlanNode::IndexScan { table, column, key } => {
1639 let key_value = literal_to_value(key)?;
1640 let tbl = self
1641 .catalog
1642 .get_table(table)
1643 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1644 let columns: Vec<String> =
1645 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1646
1647 // Fast path: the table has a B-tree on this column.
1648 // Uses index_lookup_all to return ALL matching rows for
1649 // both unique and non-unique indexes.
1650 if tbl.has_index(column) {
1651 let rids = tbl.index_lookup_all(column, &key_value);
1652 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1653 for rid in rids {
1654 if let Some(data) = tbl.heap.get(rid) {
1655 rows.push(decode_row(&tbl.schema, &data));
1656 }
1657 }
1658 return Ok(QueryResult::Rows { columns, rows });
1659 }
1660
1661 // Fallback: no index on this column. The planner emits IndexScan
1662 // eagerly (it has no visibility into which columns are indexed
1663 // at plan time), so here we must behave like SeqScan+Filter on
1664 // `.col = literal`: return *all* matching rows, not just the
1665 // first one. A non-indexed column isn't necessarily unique.
1666 // We compile the eq predicate once and stream without any
1667 // per-row decode for non-matching rows.
1668 let schema = &tbl.schema;
1669 let fast = FastLayout::new(schema);
1670 let synth_pred = Expr::BinaryOp(
1671 Box::new(Expr::Field(column.clone())),
1672 BinOp::Eq,
1673 Box::new(key.clone()),
1674 );
1675 if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1676 // Mission F: skip the first 4 Vec doublings.
1677 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1678 self.catalog
1679 .for_each_row_raw(table, |_rid, data| {
1680 if compiled(data) {
1681 rows.push(decode_row(schema, data));
1682 }
1683 })
1684 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1685 return Ok(QueryResult::Rows { columns, rows });
1686 }
1687
1688 // Last resort: slow eq-check on materialised rows.
1689 let col_idx =
1690 schema
1691 .column_index(column)
1692 .ok_or_else(|| QueryError::ColumnNotFound {
1693 table: String::new(),
1694 column: column.clone(),
1695 })?;
1696 let rows: Vec<Vec<Value>> = tbl
1697 .scan()
1698 .filter_map(|(_, row)| {
1699 if row[col_idx] == key_value {
1700 Some(row)
1701 } else {
1702 None
1703 }
1704 })
1705 .collect();
1706 Ok(QueryResult::Rows { columns, rows })
1707 }
1708
1709 PlanNode::RangeScan {
1710 table,
1711 column,
1712 start,
1713 end,
1714 } => {
1715 let tbl = self
1716 .catalog
1717 .get_table(table)
1718 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1719 let columns: Vec<String> =
1720 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1721 let schema = &tbl.schema;
1722
1723 let start_val = match start {
1724 Some((expr, _)) => Some(literal_to_value(expr)?),
1725 None => None,
1726 };
1727 let end_val = match end {
1728 Some((expr, _)) => Some(literal_to_value(expr)?),
1729 None => None,
1730 };
1731 let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1732 let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1733
1734 // Non-unique index: walk the composite (value, rid) leaf
1735 // chain between prefix bounds, fetch each row from the heap,
1736 // and recheck. The recheck enforces exclusive bounds
1737 // (range_rids is inclusive) and defensively skips any decoded
1738 // null (nulls are never indexed, so they must not match).
1739 if tbl.is_index_unique(column) == Some(false) {
1740 if let Some(btree) = tbl.index(column) {
1741 if start_val.is_some() || end_val.is_some() {
1742 let col_idx = schema.column_index(column).ok_or_else(|| {
1743 QueryError::ColumnNotFound {
1744 table: String::new(),
1745 column: column.clone(),
1746 }
1747 })?;
1748 let rids = btree.range_rids(start_val.as_ref(), end_val.as_ref());
1749 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1750 for rid in rids {
1751 if let Some(data) = tbl.heap.get(rid) {
1752 let row = decode_row(schema, &data);
1753 if !row[col_idx].is_empty()
1754 && range_matches(
1755 &row[col_idx],
1756 &start_val,
1757 start_inclusive,
1758 &end_val,
1759 end_inclusive,
1760 )
1761 {
1762 rows.push(row);
1763 }
1764 }
1765 }
1766 return Ok(QueryResult::Rows { columns, rows });
1767 }
1768 }
1769 }
1770
1771 // Range scans use the btree fast path for unique indexes,
1772 // walking raw column-value keys directly.
1773 if tbl.is_index_unique(column) == Some(true) {
1774 if let Some(btree) = tbl.index(column) {
1775 let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1776 (Some(s), Some(e)) => btree.range(s, e).collect(),
1777 (Some(s), None) => btree.range_from(s),
1778 (None, Some(e)) => btree.range_to(e),
1779 (None, None) => {
1780 let rows: Vec<Vec<Value>> =
1781 tbl.scan().map(|(_, row)| row).collect();
1782 return Ok(QueryResult::Rows { columns, rows });
1783 }
1784 };
1785 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1786 for (key, rid) in hits {
1787 if !start_inclusive {
1788 if let Some(ref s) = start_val {
1789 if &key == s {
1790 continue;
1791 }
1792 }
1793 }
1794 if !end_inclusive {
1795 if let Some(ref e) = end_val {
1796 if &key == e {
1797 continue;
1798 }
1799 }
1800 }
1801 if let Some(data) = tbl.heap.get(rid) {
1802 rows.push(decode_row(schema, &data));
1803 }
1804 }
1805 return Ok(QueryResult::Rows { columns, rows });
1806 }
1807 }
1808
1809 // Fallback: no index — synthesize range predicate and scan.
1810 let fast = FastLayout::new(schema);
1811 let synth = synthesize_range_predicate(column, start, end);
1812 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1813 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1814 self.catalog
1815 .for_each_row_raw(table, |_rid, data| {
1816 if compiled(data) {
1817 rows.push(decode_row(schema, data));
1818 }
1819 })
1820 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1821 return Ok(QueryResult::Rows { columns, rows });
1822 }
1823
1824 let col_idx =
1825 schema
1826 .column_index(column)
1827 .ok_or_else(|| QueryError::ColumnNotFound {
1828 table: String::new(),
1829 column: column.clone(),
1830 })?;
1831 let rows: Vec<Vec<Value>> = tbl
1832 .scan()
1833 .filter(|(_, row)| {
1834 range_matches(
1835 &row[col_idx],
1836 &start_val,
1837 start_inclusive,
1838 &end_val,
1839 end_inclusive,
1840 )
1841 })
1842 .map(|(_, row)| row)
1843 .collect();
1844 Ok(QueryResult::Rows { columns, rows })
1845 }
1846 }
1847 }
1848
1849 // ─── Materialized view operations ──────────────────────────────────────
1850
1851 /// Create a materialized view: execute the source query, store results
1852 /// in a new backing table, and register the view.
1853 fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1854 if self.view_registry.is_view(name) {
1855 return Err(QueryError::ViewError(format!(
1856 "materialized view '{name}' already exists"
1857 )));
1858 }
1859 // Execute the source query to get the result set.
1860 let result = self.execute_powql(query_text)?;
1861 let (columns, rows) = match result {
1862 QueryResult::Rows { columns, rows } => (columns, rows),
1863 _ => return Err("view source query must be a SELECT".into()),
1864 };
1865 // Derive a schema for the backing table from the query result columns.
1866 let schema = self.derive_view_schema(name, &columns, &rows);
1867 // Create the backing table and insert the result rows.
1868 self.catalog
1869 .create_table(schema)
1870 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1871 for row in &rows {
1872 self.catalog
1873 .insert(name, row)
1874 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1875 }
1876 // Determine which base tables this view depends on by parsing the query.
1877 let depends_on = self.extract_view_deps(query_text);
1878 self.view_registry
1879 .register(ViewDef {
1880 name: name.to_string(),
1881 query: query_text.to_string(),
1882 depends_on,
1883 dirty: false,
1884 })
1885 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1886 Ok(())
1887 }
1888
1889 /// Refresh a materialized view: re-execute its source query and replace
1890 /// the backing table's contents.
1891 fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1892 let def = self
1893 .view_registry
1894 .get(name)
1895 .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1896 let query_text = def.query.clone();
1897 // Execute the source query.
1898 let result = self.execute_powql(&query_text)?;
1899 let (_columns, rows) = match result {
1900 QueryResult::Rows { columns, rows } => (columns, rows),
1901 _ => return Err("view source query must be a SELECT".into()),
1902 };
1903 // Clear old data and insert fresh results. Mission B2: logged
1904 // variant — view refreshes are a mutation and crash recovery
1905 // must see them.
1906 self.catalog
1907 .scan_delete_matching_logged(name, |_| true)
1908 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1909 for row in &rows {
1910 self.catalog
1911 .insert(name, row)
1912 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1913 }
1914 self.view_registry.mark_clean(name);
1915 Ok(())
1916 }
1917
1918 /// Drop a materialized view: remove the backing table and unregister.
1919 fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1920 if !self.view_registry.is_view(name) {
1921 return Err(QueryError::ViewError(format!(
1922 "materialized view '{name}' not found"
1923 )));
1924 }
1925 self.view_registry
1926 .unregister(name)
1927 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1928 self.catalog
1929 .drop_table(name)
1930 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1931 Ok(())
1932 }
1933
1934 /// Derive a storage `Schema` for a view's backing table from query
1935 /// result column names and the first row's types.
1936 fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1937 use powdb_storage::types::{ColumnDef, TypeId};
1938 let cols: Vec<ColumnDef> = columns
1939 .iter()
1940 .enumerate()
1941 .map(|(i, col_name)| {
1942 let type_id = rows
1943 .first()
1944 .and_then(|row| row.get(i))
1945 .map(|v| v.type_id())
1946 .unwrap_or(TypeId::Str);
1947 ColumnDef {
1948 name: col_name.clone(),
1949 type_id,
1950 required: false,
1951 position: i as u16,
1952 }
1953 })
1954 .collect();
1955 Schema {
1956 table_name: name.to_string(),
1957 columns: cols,
1958 }
1959 }
1960
1961 /// Extract base table dependencies from a view's source query by
1962 /// parsing it and collecting the source table name.
1963 fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1964 use crate::parser::parse;
1965 match parse(query_text) {
1966 Ok(Statement::Query(q)) => {
1967 let mut deps = vec![q.source.clone()];
1968 for j in &q.joins {
1969 deps.push(j.source.clone());
1970 }
1971 deps
1972 }
1973 _ => Vec::new(),
1974 }
1975 }
1976
1977 // ─── Specialized fast paths ─────────────────────────────────────────────
1978 //
1979 // These methods are helpers for the `execute_plan` match arms above.
1980 // Each returns `Ok(Some(result))` when the fast path fires, `Ok(None)`
1981 // when the shape isn't supported (caller falls back to generic code).
1982
    /// Aggregate sum/avg/min/max/count/count-distinct over a single fixed-size
    /// numeric column (`Int` or `Float`), with an optional compiled filter
    /// predicate. Walks raw row bytes through the `agg_int_loop!` /
    /// `agg_float_loop!` macros instead of decoding each row into a
    /// `Vec<Value>`. Int sum/avg use an i128 accumulator for overflow safety.
    ///
    /// Returns `Ok(None)` ("fast path not applicable, use the generic code")
    /// when `col` is not in the schema, is not `Int`/`Float`, has no fixed
    /// offset in the `FastLayout`, or when `predicate` cannot be compiled.
    /// Errors with `QueryError::TableNotFound` if `table` has no schema.
    ///
    /// Result conventions (as implemented below):
    /// - Int `Sum` is clamped to `i64::MIN..=i64::MAX` and is `0` when no value
    ///   reaches the loop closure; Float `Sum` is `0.0` in that case.
    /// - `Avg`, `Min` and `Max` yield `Value::Empty` when no value reaches the
    ///   loop closure.
    /// - `Count` / `CountDistinct` count the values handed to the loop closure
    ///   (presumably non-null values only — that filtering lives in the loop
    ///   macros; confirm there).
    pub(super) fn agg_single_col_fast(
        &self,
        table: &str,
        col: &str,
        function: AggFunc,
        predicate: Option<&Expr>,
    ) -> Result<Option<QueryResult>, QueryError> {
        let schema = self
            .catalog
            .schema(table)
            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
            .clone();
        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
        let col_idx = match schema.column_index(col) {
            Some(i) => i,
            None => return Ok(None),
        };
        // Only fast-path fixed-size numeric columns (Int/Float) for
        // sum/avg/min/max/count. Mission D10: Float parity — prior version
        // bailed on Float columns, forcing them through the generic row-
        // decoding path that allocated a Vec<Value> per row and dispatched
        // on Value::cmp for every compare. f64 decode is structurally the
        // same as i64 (load 8 bytes, cast), so the fast path handles both.
        let col_type = schema.columns[col_idx].type_id;
        if col_type != TypeId::Int && col_type != TypeId::Float {
            return Ok(None);
        }

        let fast = FastLayout::new(&schema);
        // Mission C Phase 20b: inline the numeric-column reader instead of
        // building a `Box<dyn Fn>`. Eliminates 100K vtable dispatches per
        // 100K-row agg scan — every reader call folds directly into the
        // hot loop below.
        let byte_offset = match fast.fixed_offsets[col_idx] {
            Some(o) => o,
            None => return Ok(None),
        };
        // Null-bitmap position of the column and the offset of its 8-byte
        // value relative to the row body (2 header bytes + bitmap + fixed
        // offset); the loop macros add the per-row body base.
        let bitmap_byte = col_idx / 8;
        let bitmap_bit = (col_idx % 8) as u32;
        let body_data_offset = 2 + fast.bitmap_size + byte_offset;

        // Optional compiled filter.
        let compiled_pred: Option<CompiledPredicate> = match predicate {
            Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
                Some(c) => Some(c),
                None => return Ok(None), // let generic path handle it
            },
            None => None,
        };

        // Mission C Phase 20b: specialize the inner loop per aggregate
        // function. The previous version ran a `match function { ... }`
        // *inside* the closure, which kept LLVM from producing optimal
        // scalar code for each variant (agg_max regressed ~23% vs the
        // baseline Box<dyn Fn> version even though per-row vtable cost
        // should have been strictly lower). Pushing the match out of the
        // hot loop lets each specialized body fold cleanly into
        // `for_each_row_raw` and removes a captured `AggFunc` + match
        // dispatch per row.
        //
        // Mission D10: same specialisation applies to the Float branch.
        // For Min/Max we use `f64::total_cmp` so the result matches
        // `Value::Ord` — this is the same ordering ORDER BY and the
        // top-N sort fast path use, keeping semantics consistent across
        // read paths (NaN compares as greatest, -0.0 < +0.0 for
        // deterministic tie-breaking).
        //
        // Mission D11 Phase 1: each inner loop now splits on presence of
        // a predicate (`if let Some(pred) = &compiled_pred`) so the hot
        // body never re-tests `Option` per row, and reads column bytes
        // via `read_i64_unchecked` / `read_f64_unchecked` helpers that
        // drop two bounds checks per row (null bitmap byte + value
        // slice). Safety is carried by the `FastLayout` invariant that
        // `data_offset + 8 <= row_len` for any fixed-size column; see
        // the helper doc comments. Hot loops are macro-generated so the
        // with-pred / no-pred split can't drift between variants.
        let result = match col_type {
            TypeId::Int => match function {
                AggFunc::Sum | AggFunc::Avg => {
                    // Sum and Avg share one loop; `count` is only read by Avg.
                    let mut sum_i128: i128 = 0;
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        body_data_offset,
                        |v: i64| {
                            count += 1;
                            sum_i128 += v as i128;
                        }
                    );
                    if matches!(function, AggFunc::Sum) {
                        // Saturate rather than wrap when the i128 total does
                        // not fit back into an i64.
                        let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
                        QueryResult::Scalar(Value::Int(clamped))
                    } else if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        let avg = (sum_i128 as f64) / (count as f64);
                        QueryResult::Scalar(Value::Float(avg))
                    }
                }
                AggFunc::Min => {
                    let mut min_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        body_data_offset,
                        |v: i64| {
                            min_v = Some(match min_v {
                                Some(m) => m.min(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    let mut max_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        body_data_offset,
                        |v: i64| {
                            max_v = Some(match max_v {
                                Some(m) => m.max(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        body_data_offset,
                        |_v: i64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        body_data_offset,
                        |v: i64| {
                            seen.insert(v);
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            TypeId::Float => match function {
                AggFunc::Sum => {
                    // Use a single f64 accumulator. Naive summation is
                    // sufficient for MVP parity; if precision becomes an
                    // issue on long scans we can upgrade to Kahan–Neumaier
                    // compensated sum (~2x scalar cost, zero error growth).
                    let mut sum: f64 = 0.0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        body_data_offset,
                        |v: f64| {
                            sum += v;
                        }
                    );
                    QueryResult::Scalar(Value::Float(sum))
                }
                AggFunc::Avg => {
                    let mut sum: f64 = 0.0;
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        body_data_offset,
                        |v: f64| {
                            sum += v;
                            count += 1;
                        }
                    );
                    if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        QueryResult::Scalar(Value::Float(sum / count as f64))
                    }
                }
                AggFunc::Min => {
                    // `total_cmp` for deterministic NaN handling (matches
                    // Value::Ord). Under `total_cmp` a positive-sign NaN sorts
                    // above every other value, so Min prefers any non-NaN
                    // value over it; a negative-sign NaN sorts below -inf and
                    // would therefore be returned as the minimum.
                    let mut min_v: Option<f64> = None;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        body_data_offset,
                        |v: f64| {
                            min_v = Some(match min_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_lt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    // Same `total_cmp` ordering as Min: a positive-sign NaN
                    // wins Max over every other value.
                    let mut max_v: Option<f64> = None;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        body_data_offset,
                        |v: f64| {
                            max_v = Some(match max_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_gt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        body_data_offset,
                        |_v: f64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    // Hash on `f64::to_bits` — matches `Value::Hash`, so
                    // distinct NaN bit patterns count as distinct and
                    // -0.0/+0.0 count as distinct. Consistent with how
                    // Float values are hashed in every other DISTINCT /
                    // GROUP BY path.
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        body_data_offset,
                        |v: f64| {
                            seen.insert(v.to_bits());
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            _ => unreachable!("type guard above restricts to Int/Float"),
        };
        Ok(Some(result))
    }
2288
2289 /// `Project(Limit(Filter(SeqScan)))` and `Project(Limit(SeqScan))`.
2290 /// Streams rows, decodes only projected columns, stops at the limit.
2291 pub(super) fn project_filter_limit_fast(
2292 &self,
2293 table: &str,
2294 fields: &[ProjectField],
2295 limit: usize,
2296 predicate: Option<&Expr>,
2297 ) -> Result<Option<QueryResult>, QueryError> {
2298 let schema = self
2299 .catalog
2300 .schema(table)
2301 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2302 .clone();
2303 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2304
2305 // Each projection field must be a simple `.field` reference for this
2306 // fast path. Aliased or computed fields fall through.
2307 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2308 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2309 for f in fields {
2310 let name = match &f.expr {
2311 Expr::Field(n) => n.clone(),
2312 _ => return Ok(None),
2313 };
2314 let idx = match all_columns.iter().position(|c| c == &name) {
2315 Some(i) => i,
2316 None => return Ok(None),
2317 };
2318 proj_indices.push(idx);
2319 proj_columns.push(f.alias.clone().unwrap_or(name));
2320 }
2321
2322 let fast = FastLayout::new(&schema);
2323 let row_layout = RowLayout::new(&schema);
2324
2325 let compiled_pred: Option<CompiledPredicate> = match predicate {
2326 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2327 Some(c) => Some(c),
2328 None => return Ok(None),
2329 },
2330 None => None,
2331 };
2332
2333 let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2334 // Mission D2: use try_for_each_row_raw to actually stop iterating
2335 // once the limit is reached. The previous `done` flag only short-
2336 // circuited the closure body, so a `limit 100` over 100K rows still
2337 // walked all 100K slots — burning ~30x SQLite on scan_filter_project_top100.
2338 self.catalog
2339 .try_for_each_row_raw(table, |_rid, data| {
2340 use std::ops::ControlFlow;
2341 if let Some(ref pred) = compiled_pred {
2342 if !pred(data) {
2343 return ControlFlow::Continue(());
2344 }
2345 }
2346 let row: Vec<Value> = proj_indices
2347 .iter()
2348 .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2349 .collect();
2350 out.push(row);
2351 if out.len() >= limit {
2352 ControlFlow::Break(())
2353 } else {
2354 ControlFlow::Continue(())
2355 }
2356 })
2357 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2358
2359 Ok(Some(QueryResult::Rows {
2360 columns: proj_columns,
2361 rows: out,
2362 }))
2363 }
2364
2365 /// `Project(Limit(Sort(Filter(SeqScan))))` and `Project(Limit(Sort(SeqScan)))`.
2366 /// Bounded top-N heap over the sort key. Only the sort key needs to be
2367 /// read per row; projected columns are decoded only for the final
2368 /// winning rows when the heap drains.
2369 pub(super) fn project_filter_sort_limit_fast(
2370 &self,
2371 table: &str,
2372 fields: &[ProjectField],
2373 sort_field: &str,
2374 descending: bool,
2375 limit: usize,
2376 predicate: Option<&Expr>,
2377 ) -> Result<Option<QueryResult>, QueryError> {
2378 if limit == 0 {
2379 // Degenerate case — empty result. Let the generic path handle it
2380 // for proper column naming.
2381 return Ok(None);
2382 }
2383 // The top-N heaps never hold more than `limit` rows, but `limit` is an
2384 // attacker-supplied literal (`order .x limit 99999999999`). Reserving
2385 // that capacity up front would allocate gigabytes and abort the
2386 // process before a single row is read. Cap the pre-allocation; the
2387 // heaps still grow on demand up to the true `limit`.
2388 const TOPN_PREALLOC_CAP: usize = 4096;
2389 let prealloc = limit.min(TOPN_PREALLOC_CAP);
2390 let schema = self
2391 .catalog
2392 .schema(table)
2393 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2394 .clone();
2395 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2396
2397 // Sort key must be a fixed-size numeric column (Int or Float).
2398 // Mission D10: extended from Int-only. Float sort keys use a
2399 // sortable-u64 transform (see `f64_to_sortable_u64`) so the heap
2400 // path stays keyed on `u64` and the whole branch shape is
2401 // identical to the Int case — no new heap types, no `total_cmp`
2402 // closures in the hot loop.
2403 let sort_idx = match schema.column_index(sort_field) {
2404 Some(i) => i,
2405 None => return Ok(None),
2406 };
2407 let sort_col_type = schema.columns[sort_idx].type_id;
2408 if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2409 return Ok(None);
2410 }
2411
2412 // Each projection field must be a simple `.field`.
2413 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2414 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2415 for f in fields {
2416 let name = match &f.expr {
2417 Expr::Field(n) => n.clone(),
2418 _ => return Ok(None),
2419 };
2420 let idx = match all_columns.iter().position(|c| c == &name) {
2421 Some(i) => i,
2422 None => return Ok(None),
2423 };
2424 proj_indices.push(idx);
2425 proj_columns.push(f.alias.clone().unwrap_or(name));
2426 }
2427
2428 let fast = FastLayout::new(&schema);
2429 let row_layout = RowLayout::new(&schema);
2430 // Mission C Phase 20b: inline numeric-column reader (no Box<dyn Fn>).
2431 let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2432 Some(o) => o,
2433 None => return Ok(None),
2434 };
2435 let sort_bitmap_byte = sort_idx / 8;
2436 let sort_bitmap_bit = (sort_idx % 8) as u32;
2437 let sort_body_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2438
2439 let compiled_pred: Option<CompiledPredicate> = match predicate {
2440 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2441 Some(c) => Some(c),
2442 None => return Ok(None),
2443 },
2444 None => None,
2445 };
2446
2447 // Bounded top-N heap. For `order .x desc limit N`, we want the N
2448 // largest values — use a min-heap so the smallest is at the top and
2449 // can be popped when a better candidate arrives. For ascending, use
2450 // a max-heap. We tie-break with a monotonic `seq` counter so the
2451 // result is deterministic and stable.
2452 //
2453 // To keep this simple we maintain two typed heaps and pick by
2454 // direction.
2455 let drained: Vec<Vec<u8>> = match sort_col_type {
2456 TypeId::Int => {
2457 let mut seq: u64 = 0;
2458 let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2459 BinaryHeap::with_capacity(prealloc);
2460 let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2461 BinaryHeap::with_capacity(prealloc);
2462
2463 self.catalog
2464 .for_each_row_raw(table, |_rid, data| {
2465 if let Some(ref pred) = compiled_pred {
2466 if !pred(data) {
2467 return;
2468 }
2469 }
2470 // Inlined int-column reader: null check + i64 decode.
2471 let base = row_body_base(data);
2472 let sort_data_offset = base + sort_body_data_offset;
2473 if data.len() < sort_data_offset + 8
2474 || data.len() <= base + 2 + sort_bitmap_byte
2475 {
2476 return;
2477 }
2478 let is_null =
2479 (data[base + 2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2480 if is_null {
2481 return;
2482 }
2483 let key = i64::from_le_bytes(
2484 data[sort_data_offset..sort_data_offset + 8]
2485 .try_into()
2486 .unwrap_or_else(|_| unreachable!()),
2487 );
2488 let id = seq;
2489 seq += 1;
2490
2491 if descending {
2492 if heap_desc.len() < limit {
2493 heap_desc.push(Reverse((key, id, data.to_vec())));
2494 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2495 if key > *top_key {
2496 heap_desc.pop();
2497 heap_desc.push(Reverse((key, id, data.to_vec())));
2498 }
2499 }
2500 } else if heap_asc.len() < limit {
2501 heap_asc.push((key, id, data.to_vec()));
2502 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2503 if key < *top_key {
2504 heap_asc.pop();
2505 heap_asc.push((key, id, data.to_vec()));
2506 }
2507 }
2508 })
2509 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2510
2511 let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2512 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2513 } else {
2514 heap_asc.into_iter().collect()
2515 };
2516 if descending {
2517 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2518 } else {
2519 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2520 }
2521 drained.into_iter().map(|(_, _, d)| d).collect()
2522 }
2523 TypeId::Float => {
2524 // Novel angle: rather than introducing a `TotalF64` newtype
2525 // with `Ord via total_cmp`, transform the f64 bit pattern
2526 // into a sortable `u64` so `BinaryHeap<u64>` orders exactly
2527 // like `f64::total_cmp` would. Classic trick: flip the sign
2528 // bit on positives, flip all bits on negatives. Result:
2529 // - NaN (sign=0) stays greatest, matching total_cmp
2530 // - -0.0 sorts before +0.0, matching total_cmp
2531 // - Hot loop is branch-cheap (one compare + one xor)
2532 let mut seq: u64 = 0;
2533 let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2534 BinaryHeap::with_capacity(prealloc);
2535 let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2536 BinaryHeap::with_capacity(prealloc);
2537
2538 self.catalog
2539 .for_each_row_raw(table, |_rid, data| {
2540 if let Some(ref pred) = compiled_pred {
2541 if !pred(data) {
2542 return;
2543 }
2544 }
2545 let base = row_body_base(data);
2546 let sort_data_offset = base + sort_body_data_offset;
2547 if data.len() < sort_data_offset + 8
2548 || data.len() <= base + 2 + sort_bitmap_byte
2549 {
2550 return;
2551 }
2552 let is_null =
2553 (data[base + 2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2554 if is_null {
2555 return;
2556 }
2557 let bits = u64::from_le_bytes(
2558 data[sort_data_offset..sort_data_offset + 8]
2559 .try_into()
2560 .unwrap_or_else(|_| unreachable!()),
2561 );
2562 let key = f64_bits_to_sortable_u64(bits);
2563 let id = seq;
2564 seq += 1;
2565
2566 if descending {
2567 if heap_desc.len() < limit {
2568 heap_desc.push(Reverse((key, id, data.to_vec())));
2569 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2570 if key > *top_key {
2571 heap_desc.pop();
2572 heap_desc.push(Reverse((key, id, data.to_vec())));
2573 }
2574 }
2575 } else if heap_asc.len() < limit {
2576 heap_asc.push((key, id, data.to_vec()));
2577 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2578 if key < *top_key {
2579 heap_asc.pop();
2580 heap_asc.push((key, id, data.to_vec()));
2581 }
2582 }
2583 })
2584 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2585
2586 let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2587 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2588 } else {
2589 heap_asc.into_iter().collect()
2590 };
2591 if descending {
2592 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2593 } else {
2594 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2595 }
2596 drained.into_iter().map(|(_, _, d)| d).collect()
2597 }
2598 _ => unreachable!("type guard above restricts to Int/Float"),
2599 };
2600
2601 let rows: Vec<Vec<Value>> = drained
2602 .into_iter()
2603 .map(|data| {
2604 proj_indices
2605 .iter()
2606 .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2607 .collect()
2608 })
2609 .collect();
2610
2611 Ok(Some(QueryResult::Rows {
2612 columns: proj_columns,
2613 rows,
2614 }))
2615 }
2616
2617 /// Gather the RowIds that a mutation should operate on, without
2618 /// materialising the full row set. Handles the shapes the planner emits
2619 /// for update/delete: SeqScan, IndexScan, and Filter(SeqScan). Other
2620 /// shapes fall back to `generic_rid_match`.
2621 ///
2622 /// Perf sprint: try to fuse the predicate evaluation and in-place
2623 /// byte-level mutation into a single heap walk. Returns `Some(result)`
2624 /// if the fused path fired, `None` to fall through to the generic
2625 /// two-pass code.
2626 ///
2627 /// Covers two shapes:
2628 /// 1. Fixed-width non-null literal assignments on non-indexed columns
2629 /// → byte-patch every matched row in place (row length unchanged).
2630 /// 2. Single var-col literal assignment on a non-indexed column
2631 /// → `patch_var_column_in_place` on every matched row (may shrink);
2632 /// rows that can't be patched in place are collected for fallback.
2633 fn try_fused_scan_update(
2634 &mut self,
2635 table: &str,
2636 predicate: &Expr,
2637 resolved: &[(usize, Value)],
2638 changed_cols: &[usize],
2639 ) -> Option<Result<QueryResult, QueryError>> {
2640 // Build compiled predicate. Requires a schema borrow that must be
2641 // dropped before we call scan_patch_matching_logged.
2642 let compiled = {
2643 let schema = self.catalog.schema(table)?;
2644 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2645 let fast = FastLayout::new(schema);
2646 compile_predicate(predicate, &columns, &fast, schema)?
2647 };
2648
2649 // ── Path 1: fixed-width fast patch ──────────────────────────
2650 let fixed_patches: Option<Vec<FastPatch>> = {
2651 let tbl = self.catalog.get_table(table)?;
2652 let schema = &tbl.schema;
2653 let all_fixed_nonnull = resolved
2654 .iter()
2655 .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2656 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2657 if all_fixed_nonnull && no_indexed {
2658 let layout = RowLayout::new(schema);
2659 let bitmap_size = layout.bitmap_size();
2660 Some(
2661 resolved
2662 .iter()
2663 .map(|(idx, val)| {
2664 let fixed_off = layout
2665 .fixed_offset(*idx)
2666 .expect("is_fixed_size already checked");
2667 let field_off = 2 + bitmap_size + fixed_off;
2668 let bytes: FixedBytes = match val {
2669 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2670 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2671 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2672 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2673 Value::Uuid(v) => FixedBytes::Uuid(*v),
2674 _ => unreachable!("all_fixed_nonnull guard"),
2675 };
2676 FastPatch {
2677 field_off,
2678 bitmap_byte_off: 2 + idx / 8,
2679 bit_mask: 1u8 << (idx % 8),
2680 bytes,
2681 }
2682 })
2683 .collect(),
2684 )
2685 } else {
2686 None
2687 }
2688 };
2689 if let Some(patches) = fixed_patches {
2690 let result = self
2691 .catalog
2692 .scan_patch_matching_logged(table, compiled, |row| {
2693 let base = row_body_base(row);
2694 for p in &patches {
2695 row[base + p.bitmap_byte_off] &= !p.bit_mask;
2696 let field_bytes = p.bytes.as_slice();
2697 row[base + p.field_off..base + p.field_off + field_bytes.len()]
2698 .copy_from_slice(field_bytes);
2699 }
2700 Some(row.len() as u16)
2701 })
2702 .map_err(|e| e.to_string());
2703 match result {
2704 Ok((count, _)) => {
2705 self.view_registry.mark_dependents_dirty(table);
2706 return Some(Ok(QueryResult::Modified(count)));
2707 }
2708 Err(e) => return Some(Err(QueryError::Execution(e))),
2709 }
2710 }
2711
2712 // ── Path 2: single var-col shrink fast patch ────────────────
2713 let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2714 let tbl = self.catalog.get_table(table)?;
2715 let schema = &tbl.schema;
2716 let is_single = resolved.len() == 1;
2717 let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2718 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2719 if is_single && is_var && no_indexed {
2720 let (idx, val) = &resolved[0];
2721 let bytes_opt = match val {
2722 Value::Str(s) => Some(s.as_bytes().to_vec()),
2723 Value::Bytes(b) => Some(b.clone()),
2724 Value::Empty => None,
2725 _ => return None, // type mismatch, fall through
2726 };
2727 Some((*idx, bytes_opt))
2728 } else {
2729 None
2730 }
2731 };
2732 if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2733 // Build a fresh RowLayout before the mutable borrow.
2734 let layout = {
2735 let schema = self.catalog.schema(table)?;
2736 RowLayout::new(schema)
2737 };
2738 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2739 let result = self
2740 .catalog
2741 .scan_patch_matching_logged(table, compiled, |row| {
2742 patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2743 })
2744 .map_err(|e| e.to_string());
2745 match result {
2746 Ok((mut count, fallback_rids)) => {
2747 // Handle rows where in-place patch failed (new > old).
2748 for rid in fallback_rids {
2749 let mut row = match self.catalog.get(table, rid) {
2750 Some(r) => r,
2751 None => continue,
2752 };
2753 for (idx, val) in resolved.iter() {
2754 row[*idx] = val.clone();
2755 }
2756 if let Err(e) =
2757 self.catalog
2758 .update_hinted(table, rid, &row, Some(changed_cols))
2759 {
2760 return Some(Err(QueryError::StorageError(e.to_string())));
2761 }
2762 count += 1;
2763 }
2764 self.view_registry.mark_dependents_dirty(table);
2765 return Some(Ok(QueryResult::Modified(count)));
2766 }
2767 Err(e) => return Some(Err(QueryError::Execution(e))),
2768 }
2769 }
2770
2771 None // no fused path applicable — fall through
2772 }
2773
2774 /// Mission C Phase 3: schema is looked up via `self.catalog.schema(table)`
2775 /// inside the branches that actually need it. Previously the caller had
2776 /// to clone the full Schema (6+ String allocs) before every mutation just
2777 /// so this function could borrow it — a cost the update/delete hot path
2778 /// did not need.
2779 fn collect_rids_for_mutation(
2780 &mut self,
2781 input: &PlanNode,
2782 table: &str,
2783 ) -> Result<Vec<RowId>, QueryError> {
2784 match input {
2785 PlanNode::SeqScan { table: t } if t == table => {
2786 // "Update/delete everything" — rare but legal.
2787 let rids: Vec<RowId> = self
2788 .catalog
2789 .scan(table)
2790 .map_err(|e| QueryError::StorageError(e.to_string()))?
2791 .map(|(rid, _)| rid)
2792 .collect();
2793 Ok(rids)
2794 }
2795 PlanNode::IndexScan {
2796 table: t,
2797 column,
2798 key,
2799 } if t == table => {
2800 let key_value = literal_to_value(key)?;
2801
2802 // Indexed case: single lookup, 0 or 1 rows.
2803 // Mission D7: int-specialized fast path on int-keyed indexes
2804 // (primary keys, created_at, etc.) — the common case for
2805 // `update_by_pk` / `delete where id = ?`.
2806 //
2807 // Scope the `tbl` borrow so it's released before we fall
2808 // through to the scan-based paths below (which reborrow
2809 // `self.catalog`).
2810 {
2811 let tbl = self
2812 .catalog
2813 .get_table(table)
2814 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2815 if tbl.has_index(column) {
2816 let rids = tbl.index_lookup_all(column, &key_value);
2817 return Ok(rids);
2818 }
2819 }
2820
2821 // No index: the planner folds `.col = literal` to IndexScan
2822 // regardless of whether the column is actually unique. When
2823 // there's no index we must behave like Filter(SeqScan) and
2824 // return *all* matching RIDs — not just the first one.
2825 let schema = self
2826 .catalog
2827 .schema(table)
2828 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2829 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2830 let fast = FastLayout::new(schema);
2831 let synth = Expr::BinaryOp(
2832 Box::new(Expr::Field(column.clone())),
2833 BinOp::Eq,
2834 Box::new(key.clone()),
2835 );
2836 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2837 // Mission F: skip the first 4 Vec doublings.
2838 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2839 self.catalog
2840 .for_each_row_raw(table, |rid, data| {
2841 if compiled(data) {
2842 rids.push(rid);
2843 }
2844 })
2845 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2846 return Ok(rids);
2847 }
2848
2849 // Fallback: decode each row, compare values.
2850 let col_idx =
2851 schema
2852 .column_index(column)
2853 .ok_or_else(|| QueryError::ColumnNotFound {
2854 table: String::new(),
2855 column: column.clone(),
2856 })?;
2857 let rids: Vec<RowId> = self
2858 .catalog
2859 .scan(table)
2860 .map_err(|e| QueryError::StorageError(e.to_string()))?
2861 .filter_map(|(rid, row)| {
2862 if row[col_idx] == key_value {
2863 Some(rid)
2864 } else {
2865 None
2866 }
2867 })
2868 .collect();
2869 Ok(rids)
2870 }
2871 PlanNode::Filter {
2872 input: inner,
2873 predicate,
2874 } => {
2875 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2876 if t != table {
2877 return self.generic_rid_match(input, table);
2878 }
2879 let schema = self
2880 .catalog
2881 .schema(table)
2882 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2883 let columns: Vec<String> =
2884 schema.columns.iter().map(|c| c.name.clone()).collect();
2885 let fast = FastLayout::new(schema);
2886 let row_layout = RowLayout::new(schema);
2887
2888 // Try compiled predicate first.
2889 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2890 // Mission F: skip the first 4 Vec doublings.
2891 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2892 self.catalog
2893 .for_each_row_raw(table, |rid, data| {
2894 if compiled(data) {
2895 rids.push(rid);
2896 }
2897 })
2898 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2899 return Ok(rids);
2900 }
2901
2902 // Fallback: selective decode + eval.
2903 let pred_cols = predicate_column_indices(predicate, &columns);
2904 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2905 self.catalog
2906 .for_each_row_raw(table, |rid, data| {
2907 let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2908 if eval_predicate(predicate, &pred_row, &columns) {
2909 rids.push(rid);
2910 }
2911 })
2912 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2913 return Ok(rids);
2914 }
2915 self.generic_rid_match(input, table)
2916 }
2917 _ => self.generic_rid_match(input, table),
2918 }
2919 }
2920
2921 /// Last-ditch generic match: execute the plan, collect matching rows,
2922 /// then find corresponding RowIds by value equality. This is the old
2923 /// O(N*M) code path; only used when the plan shape is something exotic.
2924 fn generic_rid_match(
2925 &mut self,
2926 input: &PlanNode,
2927 table: &str,
2928 ) -> Result<Vec<RowId>, QueryError> {
2929 let result = self.execute_plan(input)?;
2930 let rows = match result {
2931 QueryResult::Rows { rows, .. } => rows,
2932 _ => return Err("mutation source must be rows".into()),
2933 };
2934 let matching: Vec<RowId> = self
2935 .catalog
2936 .scan(table)
2937 .map_err(|e| QueryError::StorageError(e.to_string()))?
2938 .filter(|(_, row)| rows.iter().any(|r| r == row))
2939 .map(|(rid, _)| rid)
2940 .collect();
2941 Ok(matching)
2942 }
2943}
2944
2945pub(super) fn execute_window(
2946 result: QueryResult,
2947 windows: &[WindowDef],
2948) -> Result<QueryResult, QueryError> {
2949 let (mut columns, mut rows) = match result {
2950 QueryResult::Rows { columns, rows } => (columns, rows),
2951 _ => return Err("window function requires row input".into()),
2952 };
2953
2954 for wdef in windows {
2955 // Resolve partition/order column indices against current columns.
2956 let part_indices: Vec<usize> = wdef
2957 .partition_by
2958 .iter()
2959 .map(|name| {
2960 columns
2961 .iter()
2962 .position(|c| c == name)
2963 .ok_or_else(|| format!("window partition column '{name}' not found"))
2964 })
2965 .collect::<Result<Vec<_>, _>>()?;
2966
2967 let ord_indices: Vec<(usize, bool)> = wdef
2968 .order_by
2969 .iter()
2970 .map(|sk| {
2971 columns
2972 .iter()
2973 .position(|c| c == &sk.field)
2974 .map(|i| (i, sk.descending))
2975 .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2976 })
2977 .collect::<Result<Vec<_>, _>>()?;
2978
2979 // Resolve the argument column index (for aggregate windows).
2980 let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2981 match arg {
2982 Expr::Field(name) => {
2983 if name == "*" {
2984 None // count(*) style — no specific column
2985 } else {
2986 Some(
2987 columns
2988 .iter()
2989 .position(|c| c == name)
2990 .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2991 )
2992 }
2993 }
2994 _ => None,
2995 }
2996 } else {
2997 None
2998 };
2999
3000 // Build a sort-index to sort rows by partition_by then order_by
3001 // without actually reordering the original Vec (we need original
3002 // order to write results back).
3003 let n = rows.len();
3004 let mut indices: Vec<usize> = (0..n).collect();
3005 indices.sort_by(|&a, &b| {
3006 // Compare partition keys first.
3007 for &pi in &part_indices {
3008 let cmp = rows[a][pi].cmp(&rows[b][pi]);
3009 if cmp != std::cmp::Ordering::Equal {
3010 return cmp;
3011 }
3012 }
3013 // Then order keys.
3014 for &(oi, desc) in &ord_indices {
3015 let cmp = rows[a][oi].cmp(&rows[b][oi]);
3016 if cmp != std::cmp::Ordering::Equal {
3017 return if desc { cmp.reverse() } else { cmp };
3018 }
3019 }
3020 std::cmp::Ordering::Equal
3021 });
3022
3023 // SQL window-frame semantics: with no `order` clause the frame for an
3024 // aggregate window is the ENTIRE partition, not the running prefix.
3025 // The loop below computes running values; for the no-order case we
3026 // back-fill every row of a partition with the partition's final
3027 // (i.e. complete) aggregate once its boundary is reached. Ranking
3028 // functions are untouched — row_number/rank/dense_rank are inherently
3029 // positional.
3030 let whole_partition_frame = wdef.order_by.is_empty()
3031 && matches!(
3032 wdef.function,
3033 WindowFunc::Sum
3034 | WindowFunc::Avg
3035 | WindowFunc::Count
3036 | WindowFunc::Min
3037 | WindowFunc::Max
3038 );
3039 // Original row indices of the partition currently being scanned
3040 // (only tracked when back-filling is needed).
3041 let mut partition_row_indices: Vec<usize> = Vec::new();
3042
3043 // Compute window values in sorted order, tracking partition boundaries.
3044 let mut win_values: Vec<Value> = vec![Value::Empty; n];
3045 let mut partition_start = 0usize;
3046 // Running state for aggregate windows:
3047 let mut running_count: i64 = 0;
3048 let mut running_int_sum: i64 = 0;
3049 let mut running_float_sum: f64 = 0.0;
3050 let mut running_saw_float = false;
3051 let mut running_min: Option<Value> = None;
3052 let mut running_max: Option<Value> = None;
3053 let mut rank_counter: i64 = 0;
3054 let mut dense_rank_counter: i64 = 0;
3055 let mut prev_order_key: Option<Vec<Value>> = None;
3056 let mut same_rank_count: i64 = 0;
3057
3058 for sorted_pos in 0..n {
3059 let row_idx = indices[sorted_pos];
3060
3061 // Detect partition boundary.
3062 let new_partition = if sorted_pos == 0 {
3063 true
3064 } else {
3065 let prev_row_idx = indices[sorted_pos - 1];
3066 part_indices
3067 .iter()
3068 .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
3069 };
3070
3071 if new_partition {
3072 // No-order aggregate frame: the partition that just ended is
3073 // complete, so its final running value IS the whole-partition
3074 // aggregate. Back-fill it onto every row of that partition.
3075 if whole_partition_frame && sorted_pos > 0 {
3076 let final_v = win_values[indices[sorted_pos - 1]].clone();
3077 for ri in partition_row_indices.drain(..) {
3078 win_values[ri] = final_v.clone();
3079 }
3080 }
3081 partition_start = sorted_pos;
3082 running_count = 0;
3083 running_int_sum = 0;
3084 running_float_sum = 0.0;
3085 running_saw_float = false;
3086 running_min = None;
3087 running_max = None;
3088 rank_counter = 0;
3089 dense_rank_counter = 0;
3090 prev_order_key = None;
3091 same_rank_count = 0;
3092 }
3093
3094 // Extract current order key for rank tracking.
3095 let current_order_key: Vec<Value> = ord_indices
3096 .iter()
3097 .map(|&(oi, _)| rows[row_idx][oi].clone())
3098 .collect();
3099 let same_as_prev = prev_order_key.as_ref() == Some(¤t_order_key);
3100
3101 let value = match wdef.function {
3102 WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
3103 WindowFunc::Rank => {
3104 if same_as_prev {
3105 same_rank_count += 1;
3106 } else {
3107 rank_counter += same_rank_count + 1;
3108 same_rank_count = 0;
3109 if rank_counter == 0 {
3110 rank_counter = 1;
3111 }
3112 }
3113 Value::Int(rank_counter)
3114 }
3115 WindowFunc::DenseRank => {
3116 if !same_as_prev {
3117 dense_rank_counter += 1;
3118 }
3119 Value::Int(dense_rank_counter)
3120 }
3121 WindowFunc::Sum => {
3122 if let Some(ci) = arg_col_idx {
3123 match &rows[row_idx][ci] {
3124 Value::Int(v) => running_int_sum += v,
3125 Value::Float(v) => {
3126 running_float_sum += v;
3127 running_saw_float = true;
3128 }
3129 _ => {}
3130 }
3131 }
3132 if running_saw_float {
3133 Value::Float(running_float_sum + running_int_sum as f64)
3134 } else {
3135 Value::Int(running_int_sum)
3136 }
3137 }
3138 WindowFunc::Avg => {
3139 if let Some(ci) = arg_col_idx {
3140 match &rows[row_idx][ci] {
3141 Value::Int(v) => {
3142 running_float_sum += *v as f64;
3143 running_count += 1;
3144 }
3145 Value::Float(v) => {
3146 running_float_sum += v;
3147 running_count += 1;
3148 }
3149 _ => {}
3150 }
3151 }
3152 if running_count == 0 {
3153 Value::Empty
3154 } else {
3155 Value::Float(running_float_sum / running_count as f64)
3156 }
3157 }
3158 WindowFunc::Count => {
3159 if let Some(ci) = arg_col_idx {
3160 if !rows[row_idx][ci].is_empty() {
3161 running_count += 1;
3162 }
3163 } else {
3164 // count(*) — count all rows
3165 running_count += 1;
3166 }
3167 Value::Int(running_count)
3168 }
3169 WindowFunc::Min => {
3170 if let Some(ci) = arg_col_idx {
3171 let v = &rows[row_idx][ci];
3172 if !v.is_empty() {
3173 running_min = Some(match &running_min {
3174 None => v.clone(),
3175 Some(cur) => {
3176 if v < cur {
3177 v.clone()
3178 } else {
3179 cur.clone()
3180 }
3181 }
3182 });
3183 }
3184 }
3185 running_min.clone().unwrap_or(Value::Empty)
3186 }
3187 WindowFunc::Max => {
3188 if let Some(ci) = arg_col_idx {
3189 let v = &rows[row_idx][ci];
3190 if !v.is_empty() {
3191 running_max = Some(match &running_max {
3192 None => v.clone(),
3193 Some(cur) => {
3194 if v > cur {
3195 v.clone()
3196 } else {
3197 cur.clone()
3198 }
3199 }
3200 });
3201 }
3202 }
3203 running_max.clone().unwrap_or(Value::Empty)
3204 }
3205 };
3206
3207 prev_order_key = Some(current_order_key);
3208 win_values[row_idx] = value;
3209 if whole_partition_frame {
3210 partition_row_indices.push(row_idx);
3211 }
3212 }
3213
3214 // Back-fill the final partition (the loop only flushes at boundaries).
3215 if whole_partition_frame && n > 0 {
3216 let final_v = win_values[indices[n - 1]].clone();
3217 for ri in partition_row_indices.drain(..) {
3218 win_values[ri] = final_v.clone();
3219 }
3220 }
3221
3222 // Append the computed window column to each row.
3223 for (ri, row) in rows.iter_mut().enumerate() {
3224 row.push(win_values[ri].clone());
3225 }
3226 columns.push(wdef.output_name.clone());
3227 }
3228
3229 Ok(QueryResult::Rows { columns, rows })
3230}
3231
3232/// Mission E2b: compute one aggregate over a set of rows in a group.
3233pub(super) fn compute_group_aggregate(
3234 func: AggFunc,
3235 all_rows: &[Vec<Value>],
3236 row_indices: &[usize],
3237 col_idx: usize,
3238) -> Value {
3239 match func {
3240 AggFunc::Count => {
3241 if col_idx == usize::MAX {
3242 // count(*) — count all rows in the group.
3243 return Value::Int(row_indices.len() as i64);
3244 }
3245 let count = row_indices
3246 .iter()
3247 .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3248 .count();
3249 Value::Int(count as i64)
3250 }
3251 AggFunc::CountDistinct => {
3252 let mut seen = std::collections::HashSet::new();
3253 for &ri in row_indices {
3254 let v = &all_rows[ri][col_idx];
3255 if !v.is_empty() {
3256 seen.insert(v.clone());
3257 }
3258 }
3259 Value::Int(seen.len() as i64)
3260 }
3261 AggFunc::Sum => {
3262 // Mirror the scalar Sum path: accumulate int and float
3263 // contributions separately and promote the final result to
3264 // Float if any Float row was observed. Prevents silent
3265 // drop of Float columns in GROUP BY aggregates.
3266 let mut int_sum: i64 = 0;
3267 let mut float_sum: f64 = 0.0;
3268 let mut saw_float = false;
3269 for &ri in row_indices {
3270 match &all_rows[ri][col_idx] {
3271 Value::Int(v) => int_sum += v,
3272 Value::Float(v) => {
3273 float_sum += *v;
3274 saw_float = true;
3275 }
3276 _ => {}
3277 }
3278 }
3279 if saw_float {
3280 Value::Float(float_sum + int_sum as f64)
3281 } else {
3282 Value::Int(int_sum)
3283 }
3284 }
3285 AggFunc::Avg => {
3286 let mut sum = 0.0f64;
3287 let mut count = 0usize;
3288 for &ri in row_indices {
3289 match &all_rows[ri][col_idx] {
3290 Value::Int(v) => {
3291 sum += *v as f64;
3292 count += 1;
3293 }
3294 Value::Float(v) => {
3295 sum += *v;
3296 count += 1;
3297 }
3298 _ => {}
3299 }
3300 }
3301 if count == 0 {
3302 Value::Empty
3303 } else {
3304 Value::Float(sum / count as f64)
3305 }
3306 }
3307 AggFunc::Min => row_indices
3308 .iter()
3309 .map(|&ri| &all_rows[ri][col_idx])
3310 .filter(|v| !v.is_empty())
3311 .min()
3312 .cloned()
3313 .unwrap_or(Value::Empty),
3314 AggFunc::Max => row_indices
3315 .iter()
3316 .map(|&ri| &all_rows[ri][col_idx])
3317 .filter(|v| !v.is_empty())
3318 .max()
3319 .cloned()
3320 .unwrap_or(Value::Empty),
3321 }
3322}
3323
3324/// Mission E1.3: try to extract equi-join key indices from a join `on`
3325/// predicate. Returns `Some((left_col_idx, right_col_idx))` when the
3326/// predicate is exactly `L = R` (or `R = L`) and both sides resolve
3327/// cleanly — `L` to the left subtree's column list and `R` to the right
3328/// subtree's column list.
3329///
3330/// This is deliberately narrow. We only recognise the two shapes:
3331/// * `QualifiedField = QualifiedField` (`u.id = o.user_id`)
3332/// * `Field = Field` (`.id = .user_id`, unqualified)
3333///
3334/// Anything else — conjunctions, constants, function calls, or predicates
3335/// that touch the same side on both halves — falls through to the
3336/// nested-loop path unchanged.
3337pub(super) fn try_extract_equi_join_keys(
3338 pred: &Expr,
3339 left_columns: &[String],
3340 right_columns: &[String],
3341) -> Option<(usize, usize)> {
3342 let (lhs, op, rhs) = match pred {
3343 Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3344 _ => return None,
3345 };
3346 if op != BinOp::Eq {
3347 return None;
3348 }
3349 // Normal orientation: lhs in left, rhs in right.
3350 if let (Some(li), Some(ri)) = (
3351 resolve_side_column(lhs, left_columns),
3352 resolve_side_column(rhs, right_columns),
3353 ) {
3354 return Some((li, ri));
3355 }
3356 // Swapped: rhs in left, lhs in right. Both sides of `=` are
3357 // commutative so this is safe.
3358 if let (Some(li), Some(ri)) = (
3359 resolve_side_column(rhs, left_columns),
3360 resolve_side_column(lhs, right_columns),
3361 ) {
3362 return Some((li, ri));
3363 }
3364 None
3365}
3366
3367fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3368 match expr {
3369 Expr::QualifiedField { qualifier, field } => {
3370 // Byte-level match so we don't allocate a fresh `format!` on
3371 // every call — this runs once per plan, so allocation would be
3372 // cheap, but the match is trivial enough to keep inline with
3373 // the eval_expr version.
3374 let q = qualifier.as_bytes();
3375 let f = field.as_bytes();
3376 columns.iter().position(|c| {
3377 let b = c.as_bytes();
3378 b.len() == q.len() + 1 + f.len()
3379 && b[..q.len()] == *q
3380 && b[q.len()] == b'.'
3381 && b[q.len() + 1..] == *f
3382 })
3383 }
3384 Expr::Field(name) => columns.iter().position(|c| c == name),
3385 _ => None,
3386 }
3387}
3388
3389/// Mission E1.3: O(L + R) hash join. Builds a `FxHashMap<Value, Vec<usize>>`
3390/// over the right (inner) side's join keys, then streams the left (outer)
3391/// side and for each probe row emits every combined row whose right-side
3392/// key matches. For `JoinKind::LeftOuter`, unmatched left rows are emitted
3393/// padded with `Value::Empty` on the right side.
3394///
3395/// The right side is always the build side. That choice is forced for
3396/// LeftOuter (the left side must stream so we can detect orphans), and
3397/// for Inner it's a reasonable default — left-deep plans tend to grow the
3398/// left side with each join, so the un-joined right leaf is often the
3399/// smaller of the two at each level.
3400pub(super) fn hash_join(
3401 left_columns: Vec<String>,
3402 left_rows: Vec<Vec<Value>>,
3403 right_columns: Vec<String>,
3404 right_rows: Vec<Vec<Value>>,
3405 left_key_idx: usize,
3406 right_key_idx: usize,
3407 kind: JoinKind,
3408) -> QueryResult {
3409 use rustc_hash::FxHashMap;
3410
3411 let n_left = left_columns.len();
3412 let n_right = right_columns.len();
3413 let mut columns = Vec::with_capacity(n_left + n_right);
3414 columns.extend(left_columns);
3415 columns.extend(right_columns);
3416
3417 // Build: right_key -> list of right-row indices. Pre-size to the row
3418 // count so the map doesn't rehash mid-build.
3419 let mut build: FxHashMap<Value, Vec<usize>> =
3420 FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3421 for (i, row) in right_rows.iter().enumerate() {
3422 // Skip Empty keys on the build side — they can never match under
3423 // SQL semantics (NULL ≠ NULL) and would collapse all nullables to
3424 // one bucket.
3425 if matches!(row[right_key_idx], Value::Empty) {
3426 continue;
3427 }
3428 build.entry(row[right_key_idx].clone()).or_default().push(i);
3429 }
3430
3431 // Reasonable starting capacity — inner joins produce ≥ left_rows.len()
3432 // rows in the common 1:1 case, left-outer always emits ≥ left_rows.len().
3433 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3434
3435 for left_row in &left_rows {
3436 let key = &left_row[left_key_idx];
3437 let matched = if matches!(key, Value::Empty) {
3438 None
3439 } else {
3440 build.get(key)
3441 };
3442 match matched {
3443 Some(matches) if !matches.is_empty() => {
3444 for &ri in matches {
3445 let right_row = &right_rows[ri];
3446 let mut combined = Vec::with_capacity(n_left + n_right);
3447 combined.extend_from_slice(left_row);
3448 combined.extend_from_slice(right_row);
3449 rows.push(combined);
3450 }
3451 }
3452 _ => {
3453 if matches!(kind, JoinKind::LeftOuter) {
3454 let mut row = Vec::with_capacity(n_left + n_right);
3455 row.extend_from_slice(left_row);
3456 row.resize(n_left + n_right, Value::Empty);
3457 rows.push(row);
3458 }
3459 }
3460 }
3461 }
3462
3463 QueryResult::Rows { columns, rows }
3464}
3465
3466/// Lower unindexed `RangeScan` and `IndexScan` nodes to `Filter(SeqScan)`
3467/// so that all downstream fast paths (count, project+limit, sort+limit,
3468/// agg, update, delete) continue to fire.
3469///
3470/// The planner emits `RangeScan` (for `.age > 30`) and `IndexScan` (for
3471/// `.email = lit`) speculatively because it has no catalog access. When
3472/// the column has a B-tree index, those plans are correct. When it
3473/// doesn't, the executor's fallbacks materialise every matching row with
3474/// full `decode_row` — bypassing the compiled-predicate fast paths that
3475/// `Filter(SeqScan)` would trigger. Lowering both speculative leaf kinds
3476/// also keeps EXPLAIN honest: it prints the plan that actually runs.
3477///
3478/// This pass runs once per query, before execution.
3479pub(super) fn lower_unindexed_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3480 match plan {
3481 PlanNode::RangeScan {
3482 table,
3483 column,
3484 start,
3485 end,
3486 } => {
3487 if let Some(tbl) = catalog.get_table(table) {
3488 // Keep RangeScan whenever ANY index exists on the column:
3489 // unique indexes store raw column values, non-unique indexes
3490 // store composite (value, rid) keys that the executor walks
3491 // natively via BTree::range_rids. Only lower to Filter(SeqScan)
3492 // when the column is unindexed.
3493 if tbl.has_index(column) {
3494 return plan.clone();
3495 }
3496 }
3497 let pred = synthesize_range_predicate(column, start, end);
3498 PlanNode::Filter {
3499 input: Box::new(PlanNode::SeqScan {
3500 table: table.clone(),
3501 }),
3502 predicate: pred,
3503 }
3504 }
3505 PlanNode::Filter { input, predicate } => PlanNode::Filter {
3506 input: Box::new(lower_unindexed_scans(catalog, input)),
3507 predicate: predicate.clone(),
3508 },
3509 PlanNode::Project { input, fields } => PlanNode::Project {
3510 input: Box::new(lower_unindexed_scans(catalog, input)),
3511 fields: fields.clone(),
3512 },
3513 PlanNode::Sort { input, keys } => PlanNode::Sort {
3514 input: Box::new(lower_unindexed_scans(catalog, input)),
3515 keys: keys.clone(),
3516 },
3517 PlanNode::Limit { input, count } => PlanNode::Limit {
3518 input: Box::new(lower_unindexed_scans(catalog, input)),
3519 count: count.clone(),
3520 },
3521 PlanNode::Offset { input, count } => PlanNode::Offset {
3522 input: Box::new(lower_unindexed_scans(catalog, input)),
3523 count: count.clone(),
3524 },
3525 PlanNode::Aggregate {
3526 input,
3527 function,
3528 field,
3529 } => PlanNode::Aggregate {
3530 input: Box::new(lower_unindexed_scans(catalog, input)),
3531 function: *function,
3532 field: field.clone(),
3533 },
3534 PlanNode::Distinct { input } => PlanNode::Distinct {
3535 input: Box::new(lower_unindexed_scans(catalog, input)),
3536 },
3537 PlanNode::GroupBy {
3538 input,
3539 keys,
3540 aggregates,
3541 having,
3542 } => PlanNode::GroupBy {
3543 input: Box::new(lower_unindexed_scans(catalog, input)),
3544 keys: keys.clone(),
3545 aggregates: aggregates.clone(),
3546 having: having.clone(),
3547 },
3548 PlanNode::Update {
3549 input,
3550 table,
3551 assignments,
3552 } => PlanNode::Update {
3553 input: Box::new(lower_unindexed_scans(catalog, input)),
3554 table: table.clone(),
3555 assignments: assignments.clone(),
3556 },
3557 PlanNode::Delete { input, table } => PlanNode::Delete {
3558 input: Box::new(lower_unindexed_scans(catalog, input)),
3559 table: table.clone(),
3560 },
3561 PlanNode::Window { input, windows } => PlanNode::Window {
3562 input: Box::new(lower_unindexed_scans(catalog, input)),
3563 windows: windows.clone(),
3564 },
3565 PlanNode::Union { left, right, all } => PlanNode::Union {
3566 left: Box::new(lower_unindexed_scans(catalog, left)),
3567 right: Box::new(lower_unindexed_scans(catalog, right)),
3568 all: *all,
3569 },
3570 PlanNode::Explain { input } => PlanNode::Explain {
3571 input: Box::new(lower_unindexed_scans(catalog, input)),
3572 },
3573 PlanNode::NestedLoopJoin {
3574 left,
3575 right,
3576 on,
3577 kind,
3578 } => PlanNode::NestedLoopJoin {
3579 left: Box::new(lower_unindexed_scans(catalog, left)),
3580 right: Box::new(lower_unindexed_scans(catalog, right)),
3581 on: on.clone(),
3582 kind: *kind,
3583 },
3584 PlanNode::IndexScan { table, column, key } => {
3585 if let Some(tbl) = catalog.get_table(table) {
3586 if tbl.has_index(column) {
3587 return plan.clone();
3588 }
3589 }
3590 PlanNode::Filter {
3591 input: Box::new(PlanNode::SeqScan {
3592 table: table.clone(),
3593 }),
3594 predicate: Expr::BinaryOp(
3595 Box::new(Expr::Field(column.clone())),
3596 BinOp::Eq,
3597 Box::new(key.clone()),
3598 ),
3599 }
3600 }
3601 // Leaf nodes: no children to recurse into.
3602 _ => plan.clone(),
3603 }
3604}
3605
3606/// Synthesize a range predicate from RangeScan bounds for the fallback path.
3607pub(super) fn synthesize_range_predicate(
3608 column: &str,
3609 start: &Option<(Expr, bool)>,
3610 end: &Option<(Expr, bool)>,
3611) -> Expr {
3612 let lower = start.as_ref().map(|(expr, inclusive)| {
3613 let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3614 Expr::BinaryOp(
3615 Box::new(Expr::Field(column.to_string())),
3616 op,
3617 Box::new(expr.clone()),
3618 )
3619 });
3620 let upper = end.as_ref().map(|(expr, inclusive)| {
3621 let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3622 Expr::BinaryOp(
3623 Box::new(Expr::Field(column.to_string())),
3624 op,
3625 Box::new(expr.clone()),
3626 )
3627 });
3628 match (lower, upper) {
3629 (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3630 (Some(l), None) => l,
3631 (None, Some(u)) => u,
3632 (None, None) => Expr::Literal(Literal::Bool(true)),
3633 }
3634}
3635
3636/// Check if a value falls within a range (used in last-resort decoded-row eval).
3637pub(super) fn range_matches(
3638 val: &Value,
3639 start: &Option<Value>,
3640 start_inc: bool,
3641 end: &Option<Value>,
3642 end_inc: bool,
3643) -> bool {
3644 if let Some(ref s) = start {
3645 if start_inc {
3646 if val < s {
3647 return false;
3648 }
3649 } else if val <= s {
3650 return false;
3651 }
3652 }
3653 if let Some(ref e) = end {
3654 if end_inc {
3655 if val > e {
3656 return false;
3657 }
3658 } else if val >= e {
3659 return false;
3660 }
3661 }
3662 true
3663}
3664
3665/// Format a `PlanNode` tree as a human-readable, indented text
3666/// representation. Used by the `EXPLAIN` command.
3667pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3668 let indent = " ".repeat(depth);
3669 match plan {
3670 PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3671 PlanNode::AliasScan { table, alias } => {
3672 format!("{indent}AliasScan table={table} alias={alias}")
3673 }
3674 PlanNode::IndexScan { table, column, key } => {
3675 format!("{indent}IndexScan table={table} column={column} key={key:?}")
3676 }
3677 PlanNode::RangeScan {
3678 table,
3679 column,
3680 start,
3681 end,
3682 } => {
3683 let s = match start {
3684 Some((expr, inc)) => {
3685 let op = if *inc { ">=" } else { ">" };
3686 format!("{op}{expr:?}")
3687 }
3688 None => "unbounded".to_string(),
3689 };
3690 let e = match end {
3691 Some((expr, inc)) => {
3692 let op = if *inc { "<=" } else { "<" };
3693 format!("{op}{expr:?}")
3694 }
3695 None => "unbounded".to_string(),
3696 };
3697 format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3698 }
3699 PlanNode::Filter { input, predicate } => {
3700 let child = format_plan_tree(input, depth + 1);
3701 format!("{indent}Filter predicate={predicate:?}\n{child}")
3702 }
3703 PlanNode::Project { input, fields } => {
3704 let names: Vec<String> = fields
3705 .iter()
3706 .map(|f| match &f.alias {
3707 Some(a) => format!("{a}: {:?}", f.expr),
3708 None => format!("{:?}", f.expr),
3709 })
3710 .collect();
3711 let child = format_plan_tree(input, depth + 1);
3712 format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3713 }
3714 PlanNode::Sort { input, keys } => {
3715 let ks: Vec<String> = keys
3716 .iter()
3717 .map(|k| {
3718 if k.descending {
3719 format!("{} desc", k.field)
3720 } else {
3721 k.field.clone()
3722 }
3723 })
3724 .collect();
3725 let child = format_plan_tree(input, depth + 1);
3726 format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3727 }
3728 PlanNode::Limit { input, count } => {
3729 let child = format_plan_tree(input, depth + 1);
3730 format!("{indent}Limit count={count:?}\n{child}")
3731 }
3732 PlanNode::Offset { input, count } => {
3733 let child = format_plan_tree(input, depth + 1);
3734 format!("{indent}Offset count={count:?}\n{child}")
3735 }
3736 PlanNode::Aggregate {
3737 input,
3738 function,
3739 field,
3740 } => {
3741 let f = field.as_deref().unwrap_or("*");
3742 let child = format_plan_tree(input, depth + 1);
3743 format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3744 }
3745 PlanNode::NestedLoopJoin {
3746 left,
3747 right,
3748 on,
3749 kind,
3750 } => {
3751 let left_child = format_plan_tree(left, depth + 1);
3752 let right_child = format_plan_tree(right, depth + 1);
3753 let on_str = match on {
3754 Some(pred) => format!("{pred:?}"),
3755 None => "none".to_string(),
3756 };
3757 format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3758 }
3759 PlanNode::Distinct { input } => {
3760 let child = format_plan_tree(input, depth + 1);
3761 format!("{indent}Distinct\n{child}")
3762 }
3763 PlanNode::GroupBy {
3764 input,
3765 keys,
3766 aggregates,
3767 having,
3768 } => {
3769 let agg_strs: Vec<String> = aggregates
3770 .iter()
3771 .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3772 .collect();
3773 let having_str = match having {
3774 Some(h) => format!(" having={h:?}"),
3775 None => String::new(),
3776 };
3777 let child = format_plan_tree(input, depth + 1);
3778 format!(
3779 "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3780 keys.join(", "),
3781 agg_strs.join(", "),
3782 )
3783 }
3784 PlanNode::Insert { table, rows } => {
3785 let cols: Vec<&str> = rows
3786 .first()
3787 .map(|r| r.iter().map(|a| a.field.as_str()).collect())
3788 .unwrap_or_default();
3789 format!(
3790 "{indent}Insert table={table} rows={} cols=[{}]",
3791 rows.len(),
3792 cols.join(", ")
3793 )
3794 }
3795 PlanNode::Upsert {
3796 table,
3797 key_column,
3798 assignments,
3799 on_conflict,
3800 } => {
3801 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3802 let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3803 if conflict_cols.is_empty() {
3804 format!(
3805 "{indent}Upsert table={table} key={key_column} cols=[{}]",
3806 cols.join(", ")
3807 )
3808 } else {
3809 format!(
3810 "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3811 cols.join(", "),
3812 conflict_cols.join(", ")
3813 )
3814 }
3815 }
3816 PlanNode::Update {
3817 input,
3818 table,
3819 assignments,
3820 } => {
3821 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3822 let child = format_plan_tree(input, depth + 1);
3823 format!(
3824 "{indent}Update table={table} set=[{}]\n{child}",
3825 cols.join(", ")
3826 )
3827 }
3828 PlanNode::Delete { input, table } => {
3829 let child = format_plan_tree(input, depth + 1);
3830 format!("{indent}Delete table={table}\n{child}")
3831 }
3832 PlanNode::CreateTable { name, fields } => {
3833 let fs: Vec<String> = fields
3834 .iter()
3835 .map(|f| {
3836 let mut mods = String::new();
3837 if f.required {
3838 mods.push_str(" required");
3839 }
3840 if f.unique {
3841 mods.push_str(" unique");
3842 }
3843 format!("{}: {}{mods}", f.name, f.type_name)
3844 })
3845 .collect();
3846 format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3847 }
3848 PlanNode::AlterTable { table, action } => {
3849 format!("{indent}AlterTable table={table} action={action:?}")
3850 }
3851 PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3852 PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3853 PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3854 PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3855 PlanNode::Window { input, windows } => {
3856 let ws: Vec<String> = windows
3857 .iter()
3858 .map(|w| format!("{:?} as {}", w.function, w.output_name))
3859 .collect();
3860 let child = format_plan_tree(input, depth + 1);
3861 format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3862 }
3863 PlanNode::Union { left, right, all } => {
3864 let kind = if *all { "UNION ALL" } else { "UNION" };
3865 let left_child = format_plan_tree(left, depth + 1);
3866 let right_child = format_plan_tree(right, depth + 1);
3867 format!("{indent}{kind}\n{left_child}\n{right_child}")
3868 }
3869 PlanNode::Explain { input } => {
3870 let child = format_plan_tree(input, depth + 1);
3871 format!("{indent}Explain\n{child}")
3872 }
3873 PlanNode::Begin => format!("{indent}Begin"),
3874 PlanNode::Commit => format!("{indent}Commit"),
3875 PlanNode::Rollback => format!("{indent}Rollback"),
3876 }
3877}