Skip to main content

powdb_query/executor/
plan_exec.rs

1//! The execute_plan method and associated helpers.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::row_body_base;
15use super::{check_join_limit, Engine, MAX_SORT_ROWS};
16use powdb_storage::view::{ViewDef, ViewRegistry};
17
18impl Engine {
19    pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
20        match plan {
21            PlanNode::SeqScan { table } => {
22                // Auto-refresh dirty materialized views on read.
23                if self.view_registry.is_dirty(table) {
24                    self.refresh_view(table)?;
25                }
26                let schema = self
27                    .catalog
28                    .schema(table)
29                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
30                    .clone();
31                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
32                let rows: Vec<Vec<Value>> = self
33                    .catalog
34                    .scan(table)
35                    .map_err(|e| QueryError::StorageError(e.to_string()))?
36                    .map(|(_, row)| row)
37                    .collect();
38                Ok(QueryResult::Rows { columns, rows })
39            }
40
41            PlanNode::Filter { input, predicate } => {
42                // Materialize any IN-subqueries in the predicate before the
43                // scan loop — the closure can't call back into the engine.
44                // Correlated subqueries are left in place for per-row eval.
45                let materialized;
46                let predicate = if contains_subquery(predicate) {
47                    materialized = self.materialize_subqueries(predicate)?;
48                    &materialized
49                } else {
50                    predicate
51                };
52
53                // Correlated subquery path: per-row materialisation.
54                if contains_subquery(predicate) {
55                    let result = self.execute_plan(input)?;
56                    return match result {
57                        QueryResult::Rows { columns, rows } => {
58                            let mut filtered = Vec::new();
59                            for row in rows {
60                                let row_pred =
61                                    self.materialize_correlated_for_row(predicate, &row, &columns)?;
62                                if eval_predicate(&row_pred, &row, &columns) {
63                                    filtered.push(row);
64                                }
65                            }
66                            Ok(QueryResult::Rows {
67                                columns,
68                                rows: filtered,
69                            })
70                        }
71                        _ => Err("filter requires row input".into()),
72                    };
73                }
74
75                // Fast path: fuse Filter + SeqScan into a zero-copy streaming
76                // loop. Uses decode_column() to evaluate the predicate on only
77                // the columns it references, avoiding heap allocations for
78                // String/Bytes columns that aren't part of the filter.
79                if let PlanNode::SeqScan { table } = input.as_ref() {
80                    // Auto-refresh dirty materialized views.
81                    if self.view_registry.is_dirty(table) {
82                        self.refresh_view(table)?;
83                    }
84                    let schema = self
85                        .catalog
86                        .schema(table)
87                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
88                        .clone();
89                    let columns: Vec<String> =
90                        schema.columns.iter().map(|c| c.name.clone()).collect();
91                    let fast = FastLayout::new(&schema);
92                    let row_layout = RowLayout::new(&schema);
93                    // Mission F: pre-size to skip the first 4 Vec doublings
94                    // (4 → 8 → 16 → 32 → 64). On a 100K-row scan with 30%
95                    // selectivity that's ~4 fewer reallocations + memcpys.
96                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
97
98                    // Try compiled predicate for the filter check (handles
99                    // int leaves, string-eq leaves, and And conjunctions).
100                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
101                        self.catalog
102                            .for_each_row_raw(table, |_rid, data| {
103                                if compiled(data) {
104                                    rows.push(decode_row(&schema, data));
105                                }
106                            })
107                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
108                    } else {
109                        let pred_cols = predicate_column_indices(predicate, &columns);
110                        self.catalog
111                            .for_each_row_raw(table, |_rid, data| {
112                                let pred_row =
113                                    decode_selective(&schema, &row_layout, data, &pred_cols);
114                                if eval_predicate(predicate, &pred_row, &columns) {
115                                    rows.push(decode_row(&schema, data));
116                                }
117                            })
118                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
119                    }
120
121                    return Ok(QueryResult::Rows { columns, rows });
122                }
123
124                // General path: materialise then filter.
125                let result = self.execute_plan(input)?;
126                match result {
127                    QueryResult::Rows { columns, rows } => {
128                        let filtered: Vec<Vec<Value>> = rows
129                            .into_iter()
130                            .filter(|row| eval_predicate(predicate, row, &columns))
131                            .collect();
132                        Ok(QueryResult::Rows {
133                            columns,
134                            rows: filtered,
135                        })
136                    }
137                    _ => Err("filter requires row input".into()),
138                }
139            }
140
141            PlanNode::Project { input, fields } => {
142                // Fast path: Project over IndexScan — decode only projected
143                // columns from raw bytes instead of full decode_row.
144                if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
145                    let schema = self
146                        .catalog
147                        .schema(table)
148                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
149                        .clone();
150                    let all_columns: Vec<String> =
151                        schema.columns.iter().map(|c| c.name.clone()).collect();
152                    let key_value = literal_to_value(key)?;
153                    let tbl = self
154                        .catalog
155                        .get_table(table)
156                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
157
158                    let proj_columns: Vec<String> = fields
159                        .iter()
160                        .map(|f| {
161                            f.alias.clone().unwrap_or_else(|| match &f.expr {
162                                Expr::Field(name) => name.clone(),
163                                _ => "?".into(),
164                            })
165                        })
166                        .collect();
167
168                    // Determine which column indices the projection needs
169                    let proj_indices: Vec<usize> = fields
170                        .iter()
171                        .filter_map(|f| {
172                            if let Expr::Field(name) = &f.expr {
173                                all_columns.iter().position(|c| c == name)
174                            } else {
175                                None
176                            }
177                        })
178                        .collect();
179
180                    if tbl.has_index(column) {
181                        let layout = RowLayout::new(&schema);
182                        let rids = tbl.index_lookup_all(column, &key_value);
183                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
184                        for rid in rids {
185                            if let Some(data) = tbl.heap.get(rid) {
186                                let row: Vec<Value> = proj_indices
187                                    .iter()
188                                    .map(|&ci| decode_column(&schema, &layout, &data, ci))
189                                    .collect();
190                                rows.push(row);
191                            }
192                        }
193                        return Ok(QueryResult::Rows {
194                            columns: proj_columns,
195                            rows,
196                        });
197                    }
198                }
199
200                // Fast path: Project(Limit(Sort(Filter(SeqScan)))) — bounded
201                // top-N heap. Decodes only the sort key + projected columns,
202                // keeps at most `limit` rows in a heap. Also handles the
203                // Project(Limit(Sort(SeqScan))) variant (no filter).
204                if let PlanNode::Limit {
205                    input: inner,
206                    count: limit_expr,
207                } = input.as_ref()
208                {
209                    if let PlanNode::Sort {
210                        input: sort_input,
211                        keys,
212                    } = inner.as_ref()
213                    {
214                        // Fast path only for single-key sorts
215                        if keys.len() == 1 {
216                            let sort_field = &keys[0].field;
217                            let descending = keys[0].descending;
218                            let limit = match limit_expr {
219                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
220                                _ => usize::MAX,
221                            };
222                            let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
223                                match sort_input.as_ref() {
224                                    PlanNode::SeqScan { table } => (Some(table.as_str()), None),
225                                    PlanNode::Filter {
226                                        input: fi,
227                                        predicate,
228                                    } => {
229                                        if let PlanNode::SeqScan { table } = fi.as_ref() {
230                                            (Some(table.as_str()), Some(predicate))
231                                        } else {
232                                            (None, None)
233                                        }
234                                    }
235                                    _ => (None, None),
236                                };
237                            if let Some(table) = table_opt {
238                                if let Some(result) = self.project_filter_sort_limit_fast(
239                                    table, fields, sort_field, descending, limit, pred_opt,
240                                )? {
241                                    return Ok(result);
242                                }
243                            }
244                        }
245                    }
246                    // Fast path: Project(Limit(Filter(SeqScan))) — stream,
247                    // decode only projected columns, stop at limit.
248                    if let PlanNode::Filter {
249                        input: fi,
250                        predicate,
251                    } = inner.as_ref()
252                    {
253                        if let PlanNode::SeqScan { table } = fi.as_ref() {
254                            let limit = match limit_expr {
255                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
256                                _ => usize::MAX,
257                            };
258                            if let Some(result) = self.project_filter_limit_fast(
259                                table,
260                                fields,
261                                limit,
262                                Some(predicate),
263                            )? {
264                                return Ok(result);
265                            }
266                        }
267                    }
268                    // Fast path: Project(Limit(SeqScan)) — stream, no filter.
269                    if let PlanNode::SeqScan { table } = inner.as_ref() {
270                        let limit = match limit_expr {
271                            Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
272                            _ => usize::MAX,
273                        };
274                        if let Some(result) =
275                            self.project_filter_limit_fast(table, fields, limit, None)?
276                        {
277                            return Ok(result);
278                        }
279                    }
280                }
281
282                // Mission D4: Project(Filter(SeqScan)) without Limit. Reuses
283                // `project_filter_limit_fast` with limit = usize::MAX so the
284                // hot loop decodes only projected columns and uses the
285                // compiled predicate. Previously this fell through to the
286                // generic Filter branch which materialised every column via
287                // `decode_row` then re-projected — quadratic work.
288                //
289                // multi_col_and_filter (`U filter .age > 30 and .status =
290                // "active" { .name, .age }`) was 6.18ms (0.7x SQLite) and
291                // is the load-bearing workload for this fast path.
292                if let PlanNode::Filter {
293                    input: fi,
294                    predicate,
295                } = input.as_ref()
296                {
297                    if let PlanNode::SeqScan { table } = fi.as_ref() {
298                        if let Some(result) = self.project_filter_limit_fast(
299                            table,
300                            fields,
301                            usize::MAX,
302                            Some(predicate),
303                        )? {
304                            return Ok(result);
305                        }
306                    }
307                }
308
309                // Mission D4: Project(SeqScan) without Filter or Limit.
310                // Decode only projected columns; the previous fall-through
311                // built full Vec<Value> rows then re-projected.
312                if let PlanNode::SeqScan { table } = input.as_ref() {
313                    if let Some(result) =
314                        self.project_filter_limit_fast(table, fields, usize::MAX, None)?
315                    {
316                        return Ok(result);
317                    }
318                }
319
320                let result = self.execute_plan(input)?;
321                match result {
322                    QueryResult::Rows { columns, rows } => {
323                        let proj_columns: Vec<String> = fields
324                            .iter()
325                            .map(|f| {
326                                f.alias.clone().unwrap_or_else(|| match &f.expr {
327                                    Expr::Field(name) => name.clone(),
328                                    // Mission E1.2: `{ u.name }` projects as the
329                                    // qualified column name so callers can still
330                                    // disambiguate across the join output.
331                                    Expr::QualifiedField { qualifier, field } => {
332                                        format!("{qualifier}.{field}")
333                                    }
334                                    _ => "?".into(),
335                                })
336                            })
337                            .collect();
338                        let proj_rows: Vec<Vec<Value>> = rows
339                            .iter()
340                            .map(|row| {
341                                fields
342                                    .iter()
343                                    .map(|f| eval_expr(&f.expr, row, &columns))
344                                    .collect()
345                            })
346                            .collect();
347                        Ok(QueryResult::Rows {
348                            columns: proj_columns,
349                            rows: proj_rows,
350                        })
351                    }
352                    _ => Err("project requires row input".into()),
353                }
354            }
355
356            PlanNode::Sort { input, keys } => {
357                let result = self.execute_plan(input)?;
358                match result {
359                    QueryResult::Rows { columns, mut rows } => {
360                        // WS2: row-count cap is a cheap secondary guard; the
361                        // byte budget is the real OOM defense for the sort
362                        // buffer (a few very large rows pass the row cap).
363                        if rows.len() > MAX_SORT_ROWS {
364                            return Err(QueryError::SortLimitExceeded);
365                        }
366                        self.charge_rows(&rows)?;
367                        let key_indices: Vec<(usize, bool)> = keys
368                            .iter()
369                            .map(|k| {
370                                columns
371                                    .iter()
372                                    .position(|c| c == &k.field)
373                                    .map(|idx| (idx, k.descending))
374                                    .ok_or_else(|| QueryError::ColumnNotFound {
375                                        table: String::new(),
376                                        column: k.field.clone(),
377                                    })
378                            })
379                            .collect::<Result<_, QueryError>>()?;
380                        rows.sort_by(|a, b| {
381                            for &(col_idx, descending) in &key_indices {
382                                let cmp = a[col_idx].cmp(&b[col_idx]);
383                                let cmp = if descending { cmp.reverse() } else { cmp };
384                                if cmp != std::cmp::Ordering::Equal {
385                                    return cmp;
386                                }
387                            }
388                            std::cmp::Ordering::Equal
389                        });
390                        Ok(QueryResult::Rows { columns, rows })
391                    }
392                    _ => Err("sort requires row input".into()),
393                }
394            }
395
396            PlanNode::Limit { input, count } => {
397                let result = self.execute_plan(input)?;
398                let n = match count {
399                    Expr::Literal(Literal::Int(v)) => *v as usize,
400                    _ => return Err("limit must be integer literal".into()),
401                };
402                match result {
403                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
404                        columns,
405                        rows: rows.into_iter().take(n).collect(),
406                    }),
407                    _ => Err("limit requires row input".into()),
408                }
409            }
410
411            PlanNode::Offset { input, count } => {
412                let result = self.execute_plan(input)?;
413                let n = match count {
414                    Expr::Literal(Literal::Int(v)) => *v as usize,
415                    _ => return Err("offset must be integer literal".into()),
416                };
417                match result {
418                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
419                        columns,
420                        rows: rows.into_iter().skip(n).collect(),
421                    }),
422                    _ => Err("offset requires row input".into()),
423                }
424            }
425
426            PlanNode::Aggregate {
427                input,
428                function,
429                field,
430            } => {
431                // Fast path: count() over SeqScan — count rows without any decode
432                if *function == AggFunc::Count {
433                    if let PlanNode::SeqScan { table } = input.as_ref() {
434                        // Auto-refresh a dirty materialized view before
435                        // counting it — otherwise count(View) returns stale
436                        // data after an underlying mutation (F3).
437                        if self.view_registry.is_dirty(table) {
438                            self.refresh_view(table)?;
439                        }
440                        let mut count: i64 = 0;
441                        self.catalog
442                            .for_each_row_raw(table, |_rid, _data| {
443                                count += 1;
444                            })
445                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
446                        return Ok(QueryResult::Scalar(Value::Int(count)));
447                    }
448                    // Fast path: count() over Filter(SeqScan) — try compiled
449                    // predicate first, fall back to decode_column path.
450                    // Skip a predicate carrying a subquery: the raw-bytes
451                    // evaluators here don't materialise subqueries, so
452                    // `count(T filter .x in (...))` would silently count 0
453                    // (F1). Falling through routes it to the generic path
454                    // that resolves the subquery correctly.
455                    if let PlanNode::Filter {
456                        input: inner,
457                        predicate,
458                    } = input.as_ref()
459                    {
460                        if let PlanNode::SeqScan { table } = inner.as_ref() {
461                            if self.view_registry.is_dirty(table) {
462                                self.refresh_view(table)?;
463                            }
464                        }
465                        if let (PlanNode::SeqScan { table }, false) =
466                            (inner.as_ref(), contains_subquery(predicate))
467                        {
468                            let schema = self
469                                .catalog
470                                .schema(table)
471                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
472                                .clone();
473                            let columns: Vec<String> =
474                                schema.columns.iter().map(|c| c.name.clone()).collect();
475                            let fast = FastLayout::new(&schema);
476                            let row_layout = RowLayout::new(&schema);
477
478                            // Try compiled predicate (zero-allocation hot path).
479                            // Handles int leaves, string-eq leaves, AND conjunctions.
480                            if let Some(compiled) =
481                                compile_predicate(predicate, &columns, &fast, &schema)
482                            {
483                                let mut count: i64 = 0;
484                                self.catalog
485                                    .for_each_row_raw(table, |_rid, data| {
486                                        if compiled(data) {
487                                            count += 1;
488                                        }
489                                    })
490                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
491                                return Ok(QueryResult::Scalar(Value::Int(count)));
492                            }
493
494                            // Fallback: decode predicate columns
495                            let pred_cols = predicate_column_indices(predicate, &columns);
496                            let mut count: i64 = 0;
497                            self.catalog
498                                .for_each_row_raw(table, |_rid, data| {
499                                    let pred_row =
500                                        decode_selective(&schema, &row_layout, data, &pred_cols);
501                                    if eval_predicate(predicate, &pred_row, &columns) {
502                                        count += 1;
503                                    }
504                                })
505                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
506
507                            return Ok(QueryResult::Scalar(Value::Int(count)));
508                        }
509                    }
510                }
511
512                // Fast path: sum/avg/min/max over a single fixed-size int
513                // column with an optional compiled filter predicate. Walks
514                // raw row bytes, zero allocation per row.
515                if matches!(
516                    function,
517                    AggFunc::Sum
518                        | AggFunc::Avg
519                        | AggFunc::Min
520                        | AggFunc::Max
521                        | AggFunc::CountDistinct
522                ) {
523                    if let Some(col) = field.as_ref() {
524                        // Shape: Aggregate(SeqScan) or Aggregate(Filter(SeqScan))
525                        let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
526                            match input.as_ref() {
527                                PlanNode::SeqScan { table } => (Some(table.as_str()), None),
528                                PlanNode::Filter {
529                                    input: inner,
530                                    predicate,
531                                } => {
532                                    if let PlanNode::SeqScan { table } = inner.as_ref() {
533                                        (Some(table.as_str()), Some(predicate))
534                                    } else {
535                                        (None, None)
536                                    }
537                                }
538                                _ => (None, None),
539                            };
540                        if let Some(table) = table_opt {
541                            if let Some(result) =
542                                self.agg_single_col_fast(table, col, *function, pred_opt)?
543                            {
544                                return Ok(result);
545                            }
546                        }
547                    }
548                }
549
550                // Fast path: Project(Limit(Filter(SeqScan))) — stream, decode
551                // only projected columns, stop once we hit the limit.
552                // (Handled in the Project branch; this branch only fires when
553                // the aggregate is the outer node.)
554                let result = self.execute_plan(input)?;
555                match result {
556                    QueryResult::Rows { columns, rows } => {
557                        match function {
558                            AggFunc::Count => {
559                                Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
560                            }
561                            AggFunc::CountDistinct => {
562                                let col = field.as_ref().ok_or("count distinct requires field")?;
563                                let idx = columns
564                                    .iter()
565                                    .position(|c| c == col)
566                                    .ok_or("col not found")?;
567                                let mut seen = std::collections::HashSet::new();
568                                for row in &rows {
569                                    let v = &row[idx];
570                                    if !v.is_empty() {
571                                        seen.insert(v.clone());
572                                    }
573                                }
574                                Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
575                            }
576                            AggFunc::Avg => {
577                                let col = field.as_ref().ok_or("avg requires field")?;
578                                let idx = columns
579                                    .iter()
580                                    .position(|c| c == col)
581                                    .ok_or("col not found")?;
582                                let sum: f64 = rows
583                                    .iter()
584                                    .filter_map(|r| match &r[idx] {
585                                        Value::Int(v) => Some(*v as f64),
586                                        Value::Float(v) => Some(*v),
587                                        _ => None,
588                                    })
589                                    .sum();
590                                let count = rows.len() as f64;
591                                Ok(QueryResult::Scalar(Value::Float(sum / count)))
592                            }
593                            AggFunc::Sum => {
594                                let col = field.as_ref().ok_or("sum requires field")?;
595                                let idx = columns
596                                    .iter()
597                                    .position(|c| c == col)
598                                    .ok_or("col not found")?;
599                                // Track int and float contributions separately so
600                                // Float columns (and mixed Int/Float rows) don't get
601                                // silently dropped as they did in the Int-only
602                                // version. If any Float is present, the whole sum
603                                // promotes to Float — matching Avg's semantics.
604                                let mut int_sum: i64 = 0;
605                                let mut float_sum: f64 = 0.0;
606                                let mut saw_float = false;
607                                for r in &rows {
608                                    match &r[idx] {
609                                        Value::Int(v) => int_sum += *v,
610                                        Value::Float(v) => {
611                                            float_sum += *v;
612                                            saw_float = true;
613                                        }
614                                        _ => {}
615                                    }
616                                }
617                                let result = if saw_float {
618                                    Value::Float(float_sum + int_sum as f64)
619                                } else {
620                                    Value::Int(int_sum)
621                                };
622                                Ok(QueryResult::Scalar(result))
623                            }
624                            AggFunc::Min | AggFunc::Max => {
625                                let col = field.as_ref().ok_or("min/max requires field")?;
626                                let idx = columns
627                                    .iter()
628                                    .position(|c| c == col)
629                                    .ok_or("col not found")?;
630                                let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
631                                let result = if *function == AggFunc::Min {
632                                    vals.into_iter().min().cloned()
633                                } else {
634                                    vals.into_iter().max().cloned()
635                                };
636                                Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
637                            }
638                        }
639                    }
640                    _ => Err("aggregate requires row input".into()),
641                }
642            }
643
644            PlanNode::Insert { table, rows } => {
645                // Build + validate EVERY row before inserting any, so a bad
646                // row (unknown/missing/uncoercible field) aborts the whole
647                // statement without a partial write. The WAL fsync happens
648                // once at statement end, so N rows = N appends + 1 fsync.
649                let all_values: Vec<Vec<Value>> = {
650                    let schema = self
651                        .catalog
652                        .schema(table)
653                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
654                    let mut all = Vec::with_capacity(rows.len());
655                    for assignments in rows {
656                        let mut values = vec![Value::Empty; schema.columns.len()];
657                        for a in assignments {
658                            let idx = schema.column_index(&a.field).ok_or_else(|| {
659                                QueryError::ColumnNotFound {
660                                    table: String::new(),
661                                    column: a.field.clone(),
662                                }
663                            })?;
664                            let raw = literal_to_value(&a.value)?;
665                            values[idx] = coerce_value(raw, &schema.columns[idx])?;
666                        }
667                        for col in &schema.columns {
668                            if col.required && matches!(values[col.position as usize], Value::Empty)
669                            {
670                                return Err(QueryError::Execution(format!(
671                                    "column '{}' is required but no value was provided",
672                                    col.name
673                                )));
674                            }
675                        }
676                        all.push(values);
677                    }
678                    all
679                };
680                // Charge the materialized batch against the per-query memory
681                // budget before inserting — keeps multi-row insert consistent
682                // with every other full-materialization point (sort/join/group)
683                // and bounds embedded callers (the server also caps the query
684                // string at 1 MB, but embedded callers have no such limit).
685                self.charge_rows(&all_values)?;
686                let n = all_values.len() as u64;
687                for values in &all_values {
688                    self.catalog
689                        .insert(table, values)
690                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
691                }
692                self.view_registry.mark_dependents_dirty(table);
693                Ok(QueryResult::Modified(n))
694            }
695
696            PlanNode::Upsert {
697                table,
698                key_column,
699                assignments,
700                on_conflict,
701            } => {
702                let (values, key_idx) = {
703                    let schema = self
704                        .catalog
705                        .schema(table)
706                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
707                    let mut values = vec![Value::Empty; schema.columns.len()];
708                    for a in assignments {
709                        let idx = schema.column_index(&a.field).ok_or_else(|| {
710                            QueryError::ColumnNotFound {
711                                table: String::new(),
712                                column: a.field.clone(),
713                            }
714                        })?;
715                        let raw = literal_to_value(&a.value)?;
716                        values[idx] = coerce_value(raw, &schema.columns[idx])?;
717                    }
718                    for col in &schema.columns {
719                        if col.required && matches!(values[col.position as usize], Value::Empty) {
720                            return Err(QueryError::Execution(format!(
721                                "column '{}' is required but no value was provided",
722                                col.name
723                            )));
724                        }
725                    }
726                    let key_idx = schema
727                        .column_index(key_column)
728                        .ok_or_else(|| format!("key column '{key_column}' not found"))?;
729                    (values, key_idx)
730                };
731
732                // Upsert requires the `on` column to be unique — otherwise
733                // there is no well-defined row to overwrite and a plain
734                // insert could silently create duplicate keys.
735                if self.catalog.is_index_unique(table, key_column) != Some(true) {
736                    return Err(QueryError::Execution(format!(
737                        "upsert on .{key_column} requires a unique column (declare it with \
738                         `unique {key_column}: <type>` or `alter {table} add unique .{key_column}`)"
739                    )));
740                }
741
742                let key_value = values[key_idx].clone();
743
744                // Probe the unique index for a conflict.
745                let existing = {
746                    let tbl = self
747                        .catalog
748                        .get_table(table)
749                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
750                    // The key column is guaranteed unique above, so this
751                    // returns at most one matching row.
752                    let rids = tbl.index_lookup_all(key_column, &key_value);
753                    rids.into_iter().next().and_then(|rid| {
754                        tbl.heap
755                            .get(rid)
756                            .map(|data| (rid, decode_row(&tbl.schema, &data)))
757                    })
758                };
759
760                if let Some((rid, mut existing_row)) = existing {
761                    // Conflict: apply on_conflict assignments (or all non-key if empty).
762                    let update_assignments = if on_conflict.is_empty() {
763                        assignments
764                    } else {
765                        on_conflict
766                    };
767                    let changed_cols: Vec<usize> = {
768                        let schema = self
769                            .catalog
770                            .schema(table)
771                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
772                        let mut indices = Vec::new();
773                        for a in update_assignments {
774                            let idx = schema.column_index(&a.field).ok_or_else(|| {
775                                QueryError::ColumnNotFound {
776                                    table: String::new(),
777                                    column: a.field.clone(),
778                                }
779                            })?;
780                            if idx != key_idx {
781                                existing_row[idx] = literal_to_value(&a.value)?;
782                                indices.push(idx);
783                            }
784                        }
785                        indices
786                    };
787                    self.catalog
788                        .update_hinted(table, rid, &existing_row, Some(&changed_cols))
789                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
790                    self.view_registry.mark_dependents_dirty(table);
791                    Ok(QueryResult::Modified(1))
792                } else {
793                    // No conflict: insert.
794                    self.catalog
795                        .insert(table, &values)
796                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
797                    self.view_registry.mark_dependents_dirty(table);
798                    Ok(QueryResult::Modified(1))
799                }
800            }
801
802            PlanNode::Update {
803                input,
804                table,
805                assignments,
806            } => {
807                // Mission C Phase 3: resolve assignments against a borrowed
808                // schema, then drop the borrow before the mutation loop.
809                // Try literal-only path first; fall back to per-row expression
810                // evaluation if any assignment contains a non-literal expression
811                // (e.g., `age := .age + 1`).
812                let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
813                    let schema_ref = self
814                        .catalog
815                        .schema(table)
816                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
817                    let indices: Vec<usize> = assignments
818                        .iter()
819                        .map(|a| {
820                            schema_ref.column_index(&a.field).ok_or_else(|| {
821                                QueryError::ColumnNotFound {
822                                    table: String::new(),
823                                    column: a.field.clone(),
824                                }
825                            })
826                        })
827                        .collect::<Result<_, _>>()?;
828                    let vals: Result<Vec<Value>, _> = assignments
829                        .iter()
830                        .map(|a| literal_to_value(&a.value))
831                        .collect();
832                    (indices, vals.ok())
833                };
834                let resolved_assignments: Option<Vec<(usize, Value)>> =
835                    literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
836
837                // Mission C Phase 2: the hint Table::update_hinted needs to
838                // decide whether to read the old row for index diff.
839                let changed_cols: Vec<usize> = col_indices.clone();
840
841                // ── Fused scan+update for Update(Filter(SeqScan)) ────────
842                // Perf sprint: instead of the two-pass collect-RIDs-then-loop
843                // pattern (which pays one ensure_hot per matched row on the
844                // second pass), fuse the predicate evaluation and in-place
845                // byte-level mutation into a single heap walk. Same idea as
846                // the fused scan_delete_matching path for deletes.
847                if let Some(ref resolved_assignments) = resolved_assignments {
848                    if let PlanNode::Filter {
849                        input: inner,
850                        predicate,
851                    } = input.as_ref()
852                    {
853                        if let PlanNode::SeqScan { table: t } = inner.as_ref() {
854                            if t == table {
855                                let fused_result = self.try_fused_scan_update(
856                                    table,
857                                    predicate,
858                                    resolved_assignments,
859                                    &changed_cols,
860                                );
861                                if let Some(result) = fused_result {
862                                    return result;
863                                }
864                            }
865                        }
866                    }
867                }
868
869                // Collect matching RowIds in a single pass.
870                let matching_rids = self.collect_rids_for_mutation(input, table)?;
871
872                // ── Literal-only fast paths ─────────────────────────────
873                if let Some(ref resolved_assignments) = resolved_assignments {
874                    // Mission C Phase 4: in-place byte-patch fast path. If every
875                    // assignment targets a fixed-size non-null column AND none of
876                    // them is indexed, we can skip decode_row / Vec<Value> /
877                    // encode_row_into entirely and patch the row's raw bytes on
878                    // the hot page.
879                    let fast_patch: Option<Vec<FastPatch>> = {
880                        let tbl = self
881                            .catalog
882                            .get_table(table)
883                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
884                        let schema = &tbl.schema;
885                        let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
886                            is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
887                        });
888                        let no_indexed = !resolved_assignments
889                            .iter()
890                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
891
892                        if all_fixed_nonnull && no_indexed {
893                            let layout = RowLayout::new(schema);
894                            let bitmap_size = layout.bitmap_size();
895                            let patches: Vec<FastPatch> = resolved_assignments
896                                .iter()
897                                .map(|(idx, val)| {
898                                    let fixed_off = layout
899                                        .fixed_offset(*idx)
900                                        .expect("is_fixed_size already checked");
901                                    let field_off = 2 + bitmap_size + fixed_off;
902                                    let bytes: FixedBytes = match val {
903                                        Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
904                                        Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
905                                        Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
906                                        Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
907                                        Value::Uuid(v) => FixedBytes::Uuid(*v),
908                                        _ => unreachable!("all_fixed_nonnull guard lied"),
909                                    };
910                                    FastPatch {
911                                        field_off,
912                                        bitmap_byte_off: 2 + idx / 8,
913                                        bit_mask: 1u8 << (idx % 8),
914                                        bytes,
915                                    }
916                                })
917                                .collect();
918                            Some(patches)
919                        } else {
920                            None
921                        }
922                    };
923
924                    if let Some(patches) = fast_patch {
925                        let mut count = 0u64;
926                        for rid in matching_rids {
927                            // Mission B2: WAL-log every patch so crash
928                            // recovery replays the update. Same mutation
929                            // closure as before — the wrapper just sandwiches
930                            // it between a hot-page read and a WAL append.
931                            let ok = self
932                                .catalog
933                                .update_row_bytes_logged(table, rid, |row| {
934                                    let base = row_body_base(row);
935                                    for p in &patches {
936                                        row[base + p.bitmap_byte_off] &= !p.bit_mask;
937                                        let field_bytes = p.bytes.as_slice();
938                                        row[base + p.field_off
939                                            ..base + p.field_off + field_bytes.len()]
940                                            .copy_from_slice(field_bytes);
941                                    }
942                                })
943                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
944                            if ok {
945                                count += 1;
946                            }
947                        }
948                        self.view_registry.mark_dependents_dirty(table);
949                        return Ok(QueryResult::Modified(count));
950                    }
951
952                    // Mission C Phase 10: var-column in-place shrink fast path.
953                    let var_fast: Option<(usize, Option<Vec<u8>>)> = {
954                        let tbl = self
955                            .catalog
956                            .get_table(table)
957                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
958                        let schema = &tbl.schema;
959                        let is_single = resolved_assignments.len() == 1;
960                        let is_var_col = is_single
961                            && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
962                        let no_indexed = !resolved_assignments
963                            .iter()
964                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
965
966                        if is_single && is_var_col && no_indexed {
967                            let (idx, val) = &resolved_assignments[0];
968                            let bytes_opt: Option<Vec<u8>> = match val {
969                                Value::Str(s) => Some(s.as_bytes().to_vec()),
970                                Value::Bytes(b) => Some(b.clone()),
971                                Value::Empty => None,
972                                _ => {
973                                    return Err(QueryError::TypeError(format!(
974                                        "cannot assign non-var value to var column '{}'",
975                                        schema.columns[*idx].name
976                                    )))
977                                }
978                            };
979                            Some((*idx, bytes_opt))
980                        } else {
981                            None
982                        }
983                    };
984
985                    if let Some((col_idx, new_bytes_opt)) = var_fast {
986                        let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
987                        let mut count = 0u64;
988                        let mut fallback_rids: Vec<RowId> = Vec::new();
989                        for rid in &matching_rids {
990                            // Mission B2: logged variant so crash recovery
991                            // replays the shrink. On a false return (row
992                            // would have to grow), the rid is pushed to
993                            // `fallback_rids` and the slower `update_hinted`
994                            // path — which is already WAL-logged — picks it up.
995                            let ok = self
996                                .catalog
997                                .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
998                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
999                            if ok {
1000                                count += 1;
1001                            } else {
1002                                fallback_rids.push(*rid);
1003                            }
1004                        }
1005                        for rid in fallback_rids {
1006                            let mut row = match self.catalog.get(table, rid) {
1007                                Some(r) => r,
1008                                None => continue,
1009                            };
1010                            for (idx, val) in resolved_assignments.iter() {
1011                                row[*idx] = val.clone();
1012                            }
1013                            self.catalog
1014                                .update_hinted(table, rid, &row, Some(&changed_cols))
1015                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1016                            count += 1;
1017                        }
1018                        self.view_registry.mark_dependents_dirty(table);
1019                        return Ok(QueryResult::Modified(count));
1020                    }
1021
1022                    // Generic literal path: decode row, apply literal values.
1023                    let mut count = 0u64;
1024                    for rid in matching_rids {
1025                        let mut row = match self.catalog.get(table, rid) {
1026                            Some(r) => r,
1027                            None => continue,
1028                        };
1029                        for (idx, val) in resolved_assignments.iter() {
1030                            row[*idx] = val.clone();
1031                        }
1032                        self.catalog
1033                            .update_hinted(table, rid, &row, Some(&changed_cols))
1034                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1035                        count += 1;
1036                    }
1037                    self.view_registry.mark_dependents_dirty(table);
1038                    return Ok(QueryResult::Modified(count));
1039                } // end if let Some(resolved_assignments)
1040
1041                // ── Expression-based update path ────────────────────────
1042                // At least one assignment contains a non-literal expression
1043                // (e.g., `age := .age + 1`). Evaluate per-row.
1044                let col_names: Vec<String> = {
1045                    let schema_ref = self
1046                        .catalog
1047                        .schema(table)
1048                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1049                    schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1050                };
1051                let mut count = 0u64;
1052                for rid in matching_rids {
1053                    let mut row = match self.catalog.get(table, rid) {
1054                        Some(r) => r,
1055                        None => continue,
1056                    };
1057                    for (i, asgn) in assignments.iter().enumerate() {
1058                        let val = eval_expr(&asgn.value, &row, &col_names);
1059                        row[col_indices[i]] = val;
1060                    }
1061                    self.catalog
1062                        .update_hinted(table, rid, &row, Some(&changed_cols))
1063                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1064                    count += 1;
1065                }
1066                self.view_registry.mark_dependents_dirty(table);
1067                Ok(QueryResult::Modified(count))
1068            }
1069
1070            PlanNode::Delete { input, table } => {
1071                // Mission C Phase 3: no schema clone — collect_rids_for_mutation
1072                // looks up schema internally when it needs one, and the mutation
1073                // loop doesn't need the schema at all.
1074                //
1075                // Mission C Phase 12: route bulk deletes through
1076                // `Catalog::delete_many`, which batches the btree leaf
1077                // compaction and shares one `ensure_hot` per row between
1078                // the index-key extraction and the slot delete. On
1079                // `delete_by_filter` (100K fixture, ~20K matches) that
1080                // removes ~4ms of pure `Vec::remove` memmove from the btree
1081                // maintenance phase.
1082                //
1083                // Mission C Phase 16: for the common `delete where ...`
1084                // shape (Filter(SeqScan)) — and the rarer "delete
1085                // everything" shape (SeqScan) — skip the two-pass
1086                // `collect_rids_for_mutation` + `delete_many` flow entirely.
1087                // The fused `scan_delete_matching` primitive walks the
1088                // heap exactly once, paying one `ensure_hot` per page
1089                // instead of per-row. That closes the last major gap on
1090                // the bench's `delete_by_filter` workload.
1091                if let PlanNode::Filter {
1092                    input: inner,
1093                    predicate,
1094                } = input.as_ref()
1095                {
1096                    if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1097                        if t == table {
1098                            let schema = self
1099                                .catalog
1100                                .schema(table)
1101                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1102                            let columns: Vec<String> =
1103                                schema.columns.iter().map(|c| c.name.clone()).collect();
1104                            let fast = FastLayout::new(schema);
1105                            if let Some(compiled) =
1106                                compile_predicate(predicate, &columns, &fast, schema)
1107                            {
1108                                // Mission B2: logged variant so every
1109                                // matched rid hits the WAL during the
1110                                // single-pass scan. Structure of the
1111                                // fused scan is unchanged — only the
1112                                // hook closure now also appends.
1113                                let count = self
1114                                    .catalog
1115                                    .scan_delete_matching_logged(table, |data| compiled(data))
1116                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1117                                self.view_registry.mark_dependents_dirty(table);
1118                                return Ok(QueryResult::Modified(count));
1119                            }
1120                        }
1121                    }
1122                } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1123                    if t == table {
1124                        // `delete from T` with no predicate — every live
1125                        // row matches. One pass is still the right shape.
1126                        // Mission B2: logged variant — see above.
1127                        let count = self
1128                            .catalog
1129                            .scan_delete_matching_logged(table, |_| true)
1130                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1131                        self.view_registry.mark_dependents_dirty(table);
1132                        return Ok(QueryResult::Modified(count));
1133                    }
1134                }
1135
1136                let matching_rids = self.collect_rids_for_mutation(input, table)?;
1137                let count = self
1138                    .catalog
1139                    .delete_many(table, &matching_rids)
1140                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1141                self.view_registry.mark_dependents_dirty(table);
1142                Ok(QueryResult::Modified(count))
1143            }
1144
1145            PlanNode::AliasScan { table, alias } => {
1146                // Mission E1.2: scan `table` and rename every output column
1147                // to `alias.field`. Used as a join leaf so downstream
1148                // NestedLoopJoin + Filter + Project nodes can resolve
1149                // `Expr::QualifiedField` lookups by direct column-name match.
1150                //
1151                // We don't bother with a fused zero-copy loop here yet — the
1152                // whole join path is nested-loop and correctness-first
1153                // (Phase E1.3 will introduce hash join and at that point we
1154                // can revisit whether to specialise AliasScan).
1155                let schema = self
1156                    .catalog
1157                    .schema(table)
1158                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1159                    .clone();
1160                let columns: Vec<String> = schema
1161                    .columns
1162                    .iter()
1163                    .map(|c| format!("{alias}.{}", c.name))
1164                    .collect();
1165                let rows: Vec<Vec<Value>> = self
1166                    .catalog
1167                    .scan(table)
1168                    .map_err(|e| QueryError::StorageError(e.to_string()))?
1169                    .map(|(_, row)| row)
1170                    .collect();
1171                Ok(QueryResult::Rows { columns, rows })
1172            }
1173
1174            PlanNode::NestedLoopJoin {
1175                left,
1176                right,
1177                on,
1178                kind,
1179            } => {
1180                // Materialise both sides. The executor ships two strategies:
1181                //   1. Hash join (E1.3) — when the `on` predicate is a
1182                //      simple equi-predicate `left_col = right_col`, build a
1183                //      FxHashMap<Value, Vec<row_idx>> over the right side
1184                //      and probe with the left side. O(L + R) instead of
1185                //      O(L × R). Handles Inner and LeftOuter.
1186                //   2. Nested loop (E1.2) — fallback for Cross, non-equi
1187                //      predicates, or `on` expressions that reference
1188                //      either side with something more complex than a
1189                //      QualifiedField.
1190                let left_result = self.execute_plan(left)?;
1191                let right_result = self.execute_plan(right)?;
1192                let (left_columns, left_rows) = match left_result {
1193                    QueryResult::Rows { columns, rows } => (columns, rows),
1194                    _ => return Err("join left side must produce rows".into()),
1195                };
1196                let (right_columns, right_rows) = match right_result {
1197                    QueryResult::Rows { columns, rows } => (columns, rows),
1198                    _ => return Err("join right side must produce rows".into()),
1199                };
1200
1201                // WS2: byte-budget guard on the join build side. Charge both
1202                // materialized inputs before we build the hash table / probe;
1203                // the output is row-capped by check_join_limit below.
1204                self.charge_rows(&left_rows)?;
1205                self.charge_rows(&right_rows)?;
1206
1207                // Hash-join fast path.
1208                if !matches!(kind, JoinKind::Cross) {
1209                    if let Some(pred) = on {
1210                        if let Some((l_idx, r_idx)) =
1211                            try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1212                        {
1213                            let result = hash_join(
1214                                left_columns,
1215                                left_rows,
1216                                right_columns,
1217                                right_rows,
1218                                l_idx,
1219                                r_idx,
1220                                *kind,
1221                            );
1222                            if let QueryResult::Rows { ref rows, .. } = result {
1223                                check_join_limit(rows.len())?;
1224                            }
1225                            return Ok(result);
1226                        }
1227                    }
1228                }
1229
1230                // Nested-loop fallback.
1231                let n_left = left_columns.len();
1232                let n_right = right_columns.len();
1233                let mut columns = Vec::with_capacity(n_left + n_right);
1234                columns.extend(left_columns);
1235                columns.extend(right_columns);
1236
1237                let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1238                let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1239
1240                for left_row in &left_rows {
1241                    let mut matched = false;
1242                    for right_row in &right_rows {
1243                        combined.clear();
1244                        combined.extend_from_slice(left_row);
1245                        combined.extend_from_slice(right_row);
1246                        let keep = match kind {
1247                            JoinKind::Cross => true,
1248                            JoinKind::Inner | JoinKind::LeftOuter => match on {
1249                                Some(pred) => eval_predicate(pred, &combined, &columns),
1250                                // Missing `on` for non-cross joins is a
1251                                // parser error, but if it slips through we
1252                                // treat it as "match everything".
1253                                None => true,
1254                            },
1255                            // RightOuter is rewritten to LeftOuter by the
1256                            // planner, so we never see it here.
1257                            JoinKind::RightOuter => {
1258                                unreachable!("planner rewrites RightOuter to LeftOuter")
1259                            }
1260                        };
1261                        if keep {
1262                            rows.push(combined.clone());
1263                            check_join_limit(rows.len())?;
1264                            matched = true;
1265                        }
1266                    }
1267                    if !matched && matches!(kind, JoinKind::LeftOuter) {
1268                        let mut row = Vec::with_capacity(n_left + n_right);
1269                        row.extend_from_slice(left_row);
1270                        row.resize(n_left + n_right, Value::Empty);
1271                        rows.push(row);
1272                        check_join_limit(rows.len())?;
1273                    }
1274                }
1275
1276                Ok(QueryResult::Rows { columns, rows })
1277            }
1278
1279            PlanNode::Distinct { input } => {
1280                let result = self.execute_plan(input)?;
1281                match result {
1282                    QueryResult::Rows { columns, rows } => {
1283                        let mut seen = std::collections::HashSet::new();
1284                        let mut unique_rows = Vec::new();
1285                        for row in rows {
1286                            if seen.insert(row.clone()) {
1287                                unique_rows.push(row);
1288                            }
1289                        }
1290                        Ok(QueryResult::Rows {
1291                            columns,
1292                            rows: unique_rows,
1293                        })
1294                    }
1295                    other => Ok(other),
1296                }
1297            }
1298
1299            PlanNode::GroupBy {
1300                input,
1301                keys,
1302                aggregates,
1303                having,
1304            } => {
1305                let result = self.execute_plan(input)?;
1306                match result {
1307                    QueryResult::Rows { columns, rows } => {
1308                        // WS2: byte-budget guard on the GROUP BY input buffer
1309                        // (the hash table is bounded by the input it groups).
1310                        self.charge_rows(&rows)?;
1311                        // Resolve key column indices.
1312                        let key_indices: Vec<usize> = keys
1313                            .iter()
1314                            .map(|k| {
1315                                columns
1316                                    .iter()
1317                                    .position(|c| c == k)
1318                                    .ok_or_else(|| format!("group-by column '{k}' not found"))
1319                            })
1320                            .collect::<Result<Vec<_>, _>>()?;
1321
1322                        // Resolve aggregate field indices. count(*) uses
1323                        // sentinel usize::MAX — compute_group_aggregate
1324                        // treats it as "count all rows in the group".
1325                        let agg_field_indices: Vec<usize> = aggregates
1326                            .iter()
1327                            .map(|a| {
1328                                if a.field == "*" {
1329                                    Ok(usize::MAX)
1330                                } else {
1331                                    columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1332                                        format!("aggregate column '{}' not found", a.field)
1333                                    })
1334                                }
1335                            })
1336                            .collect::<Result<Vec<_>, _>>()?;
1337
1338                        // Group rows by key values (preserving insertion order).
1339                        let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1340                            rustc_hash::FxHashMap::default();
1341                        let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1342                        for (ri, row) in rows.iter().enumerate() {
1343                            let key: Vec<Value> =
1344                                key_indices.iter().map(|&i| row[i].clone()).collect();
1345                            match group_map.get(&key) {
1346                                Some(&idx) => groups[idx].1.push(ri),
1347                                None => {
1348                                    let idx = groups.len();
1349                                    group_map.insert(key.clone(), idx);
1350                                    groups.push((key, vec![ri]));
1351                                }
1352                            }
1353                        }
1354
1355                        // Build output column names: keys ++ aggregate output names.
1356                        let mut out_columns: Vec<String> = keys.clone();
1357                        for agg in aggregates.iter() {
1358                            out_columns.push(agg.output_name.clone());
1359                        }
1360
1361                        // Compute aggregates per group.
1362                        let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1363                        for (key_vals, row_indices) in &groups {
1364                            let mut row = key_vals.clone();
1365                            for (ai, agg) in aggregates.iter().enumerate() {
1366                                let col_idx = agg_field_indices[ai];
1367                                let val = compute_group_aggregate(
1368                                    agg.function,
1369                                    &rows,
1370                                    row_indices,
1371                                    col_idx,
1372                                );
1373                                row.push(val);
1374                            }
1375                            out_rows.push(row);
1376                        }
1377
1378                        // Apply HAVING filter.
1379                        if let Some(having_expr) = having {
1380                            out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1381                        }
1382
1383                        Ok(QueryResult::Rows {
1384                            columns: out_columns,
1385                            rows: out_rows,
1386                        })
1387                    }
1388                    _ => Err("group by requires row input".into()),
1389                }
1390            }
1391
1392            PlanNode::CreateTable { name, fields } => {
1393                let columns: Vec<ColumnDef> = fields
1394                    .iter()
1395                    .enumerate()
1396                    .map(|(i, f)| -> Result<ColumnDef, QueryError> {
1397                        Ok(ColumnDef {
1398                            name: f.name.clone(),
1399                            type_id: type_name_to_id(&f.type_name)
1400                                .map_err(QueryError::TypeError)?,
1401                            required: f.required,
1402                            position: i as u16,
1403                        })
1404                    })
1405                    .collect::<Result<Vec<_>, _>>()?;
1406                let schema = Schema {
1407                    table_name: name.clone(),
1408                    columns,
1409                };
1410                self.catalog
1411                    .create_table(schema)
1412                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1413                // Declaring a field `unique` auto-creates a unique B+tree
1414                // index, which is where uniqueness is enforced on writes.
1415                for f in fields.iter().filter(|f| f.unique) {
1416                    self.catalog
1417                        .create_index_unique(name, &f.name, true)
1418                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1419                }
1420                Ok(QueryResult::Created(name.clone()))
1421            }
1422
1423            PlanNode::AlterTable { table, action } => match action {
1424                AlterAction::AddColumn {
1425                    name,
1426                    type_name,
1427                    required,
1428                } => {
1429                    let position = self
1430                        .catalog
1431                        .schema(table)
1432                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1433                        .columns
1434                        .len() as u16;
1435                    let col = ColumnDef {
1436                        name: name.clone(),
1437                        type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1438                        required: *required,
1439                        position,
1440                    };
1441                    self.catalog
1442                        .alter_table_add_column(table, col)
1443                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1444                    Ok(QueryResult::Executed {
1445                        message: format!("column '{name}' added to '{table}'"),
1446                    })
1447                }
1448                AlterAction::DropColumn { name } => {
1449                    self.catalog
1450                        .alter_table_drop_column(table, name)
1451                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1452                    Ok(QueryResult::Executed {
1453                        message: format!("column '{name}' dropped from '{table}'"),
1454                    })
1455                }
1456                AlterAction::AddIndex { column } => {
1457                    self.catalog
1458                        .create_index(table, column)
1459                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1460                    Ok(QueryResult::Executed {
1461                        message: format!("index on '{table}.{column}' created"),
1462                    })
1463                }
1464                AlterAction::AddUnique { column } => {
1465                    // No DropIndex exists, so we cannot upgrade an existing
1466                    // non-unique index in place — reject it cleanly.
1467                    if self.catalog.has_index(table, column) {
1468                        return Err(QueryError::Execution(format!(
1469                            "cannot add unique on {table}.{column}: column already indexed"
1470                        )));
1471                    }
1472                    // Scan existing rows for duplicate (non-null) values
1473                    // before creating the unique index.
1474                    {
1475                        let tbl = self
1476                            .catalog
1477                            .get_table(table)
1478                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1479                        let col_idx = tbl.schema.column_index(column).ok_or_else(|| {
1480                            QueryError::ColumnNotFound {
1481                                table: table.to_string(),
1482                                column: column.clone(),
1483                            }
1484                        })?;
1485                        let mut seen = std::collections::HashSet::new();
1486                        for (_, row) in tbl.scan() {
1487                            let v = &row[col_idx];
1488                            if v.is_empty() {
1489                                continue;
1490                            }
1491                            if !seen.insert(v.clone()) {
1492                                return Err(QueryError::Execution(format!(
1493                                    "cannot add unique on {table}.{column}: \
1494                                     duplicate value {v:?} exists"
1495                                )));
1496                            }
1497                        }
1498                    }
1499                    self.catalog
1500                        .create_index_unique(table, column, true)
1501                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1502                    Ok(QueryResult::Executed {
1503                        message: format!("unique index on '{table}.{column}' created"),
1504                    })
1505                }
1506            },
1507
1508            PlanNode::DropTable { name } => {
1509                self.catalog
1510                    .drop_table(name)
1511                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1512                Ok(QueryResult::Executed {
1513                    message: format!("table '{name}' dropped"),
1514                })
1515            }
1516
1517            PlanNode::CreateView { name, query_text } => {
1518                self.create_view(name, query_text)?;
1519                Ok(QueryResult::Executed {
1520                    message: format!("materialized view '{name}' created"),
1521                })
1522            }
1523
1524            PlanNode::RefreshView { name } => {
1525                self.refresh_view(name)?;
1526                Ok(QueryResult::Executed {
1527                    message: format!("materialized view '{name}' refreshed"),
1528                })
1529            }
1530
1531            PlanNode::DropView { name } => {
1532                self.drop_view(name)?;
1533                Ok(QueryResult::Executed {
1534                    message: format!("materialized view '{name}' dropped"),
1535                })
1536            }
1537
1538            PlanNode::Window { input, windows } => {
1539                let result = self.execute_plan(input)?;
1540                execute_window(result, windows)
1541            }
1542
1543            PlanNode::Union { left, right, all } => {
1544                let left_result = self.execute_plan(left)?;
1545                let right_result = self.execute_plan(right)?;
1546                let (left_cols, left_rows) = match left_result {
1547                    QueryResult::Rows { columns, rows } => (columns, rows),
1548                    _ => return Err("UNION requires query results on left side".into()),
1549                };
1550                let (_, right_rows) = match right_result {
1551                    QueryResult::Rows { columns, rows } => (columns, rows),
1552                    _ => return Err("UNION requires query results on right side".into()),
1553                };
1554                let mut combined = left_rows;
1555                if *all {
1556                    // UNION ALL — just concatenate.
1557                    combined.extend(right_rows);
1558                } else {
1559                    // UNION — deduplicate using the same HashSet approach
1560                    // as DISTINCT. Value already implements Hash + Eq.
1561                    let mut seen = std::collections::HashSet::new();
1562                    for row in &combined {
1563                        seen.insert(row.clone());
1564                    }
1565                    for row in right_rows {
1566                        if seen.insert(row.clone()) {
1567                            combined.push(row);
1568                        }
1569                    }
1570                }
1571                Ok(QueryResult::Rows {
1572                    columns: left_cols,
1573                    rows: combined,
1574                })
1575            }
1576
1577            PlanNode::Explain { input } => {
1578                let text = format_plan_tree(input, 0);
1579                Ok(QueryResult::Rows {
1580                    columns: vec!["plan".to_string()],
1581                    rows: text
1582                        .lines()
1583                        .map(|line| vec![Value::Str(line.to_string())])
1584                        .collect(),
1585                })
1586            }
1587
1588            PlanNode::Begin => {
1589                if self.in_transaction {
1590                    return Err(QueryError::Execution(
1591                        "already in a transaction (nested transactions not supported)".into(),
1592                    ));
1593                }
1594                self.catalog
1595                    .begin_transaction()
1596                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1597                self.in_transaction = true;
1598                Ok(QueryResult::Executed {
1599                    message: "transaction started".to_string(),
1600                })
1601            }
1602
1603            PlanNode::Commit => {
1604                if !self.in_transaction {
1605                    return Err(QueryError::Execution(
1606                        "no active transaction to commit".into(),
1607                    ));
1608                }
1609                self.catalog
1610                    .commit_transaction()
1611                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1612                self.in_transaction = false;
1613                Ok(QueryResult::Executed {
1614                    message: "transaction committed".to_string(),
1615                })
1616            }
1617
            // Abort the open transaction: restore storage via the catalog,
            // then discard engine-side state that may no longer match it.
            PlanNode::Rollback => {
                if !self.in_transaction {
                    return Err(QueryError::Execution(
                        "no active transaction to roll back".into(),
                    ));
                }
                // The flag is cleared *before* the storage rollback, so even
                // if the rollback below fails the engine no longer reports
                // an open transaction.
                self.in_transaction = false;
                self.catalog
                    .rollback_to_last_sync()
                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
                // Drop all cached plans. Best-effort: if the lock cannot be
                // acquired (`lock()` returned Err) the cache is left as-is.
                if let Ok(mut cache) = self.plan_cache.lock() {
                    cache.clear();
                }
                // Reload the view registry from the catalog's data directory;
                // if opening fails, fall back to a freshly constructed
                // registry for the same directory (the open error is
                // discarded).
                self.view_registry = ViewRegistry::open(self.catalog.data_dir())
                    .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
                Ok(QueryResult::Executed {
                    message: "transaction rolled back".to_string(),
                })
            }
1637
1638            PlanNode::IndexScan { table, column, key } => {
1639                let key_value = literal_to_value(key)?;
1640                let tbl = self
1641                    .catalog
1642                    .get_table(table)
1643                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1644                let columns: Vec<String> =
1645                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1646
1647                // Fast path: the table has a B-tree on this column.
1648                // Uses index_lookup_all to return ALL matching rows for
1649                // both unique and non-unique indexes.
1650                if tbl.has_index(column) {
1651                    let rids = tbl.index_lookup_all(column, &key_value);
1652                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1653                    for rid in rids {
1654                        if let Some(data) = tbl.heap.get(rid) {
1655                            rows.push(decode_row(&tbl.schema, &data));
1656                        }
1657                    }
1658                    return Ok(QueryResult::Rows { columns, rows });
1659                }
1660
1661                // Fallback: no index on this column. The planner emits IndexScan
1662                // eagerly (it has no visibility into which columns are indexed
1663                // at plan time), so here we must behave like SeqScan+Filter on
1664                // `.col = literal`: return *all* matching rows, not just the
1665                // first one. A non-indexed column isn't necessarily unique.
1666                // We compile the eq predicate once and stream without any
1667                // per-row decode for non-matching rows.
1668                let schema = &tbl.schema;
1669                let fast = FastLayout::new(schema);
1670                let synth_pred = Expr::BinaryOp(
1671                    Box::new(Expr::Field(column.clone())),
1672                    BinOp::Eq,
1673                    Box::new(key.clone()),
1674                );
1675                if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1676                    // Mission F: skip the first 4 Vec doublings.
1677                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1678                    self.catalog
1679                        .for_each_row_raw(table, |_rid, data| {
1680                            if compiled(data) {
1681                                rows.push(decode_row(schema, data));
1682                            }
1683                        })
1684                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1685                    return Ok(QueryResult::Rows { columns, rows });
1686                }
1687
1688                // Last resort: slow eq-check on materialised rows.
1689                let col_idx =
1690                    schema
1691                        .column_index(column)
1692                        .ok_or_else(|| QueryError::ColumnNotFound {
1693                            table: String::new(),
1694                            column: column.clone(),
1695                        })?;
1696                let rows: Vec<Vec<Value>> = tbl
1697                    .scan()
1698                    .filter_map(|(_, row)| {
1699                        if row[col_idx] == key_value {
1700                            Some(row)
1701                        } else {
1702                            None
1703                        }
1704                    })
1705                    .collect();
1706                Ok(QueryResult::Rows { columns, rows })
1707            }
1708
1709            PlanNode::RangeScan {
1710                table,
1711                column,
1712                start,
1713                end,
1714            } => {
1715                let tbl = self
1716                    .catalog
1717                    .get_table(table)
1718                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1719                let columns: Vec<String> =
1720                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1721                let schema = &tbl.schema;
1722
1723                let start_val = match start {
1724                    Some((expr, _)) => Some(literal_to_value(expr)?),
1725                    None => None,
1726                };
1727                let end_val = match end {
1728                    Some((expr, _)) => Some(literal_to_value(expr)?),
1729                    None => None,
1730                };
1731                let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1732                let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1733
1734                // Non-unique index: walk the composite (value, rid) leaf
1735                // chain between prefix bounds, fetch each row from the heap,
1736                // and recheck. The recheck enforces exclusive bounds
1737                // (range_rids is inclusive) and defensively skips any decoded
1738                // null (nulls are never indexed, so they must not match).
1739                if tbl.is_index_unique(column) == Some(false) {
1740                    if let Some(btree) = tbl.index(column) {
1741                        if start_val.is_some() || end_val.is_some() {
1742                            let col_idx = schema.column_index(column).ok_or_else(|| {
1743                                QueryError::ColumnNotFound {
1744                                    table: String::new(),
1745                                    column: column.clone(),
1746                                }
1747                            })?;
1748                            let rids = btree.range_rids(start_val.as_ref(), end_val.as_ref());
1749                            let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1750                            for rid in rids {
1751                                if let Some(data) = tbl.heap.get(rid) {
1752                                    let row = decode_row(schema, &data);
1753                                    if !row[col_idx].is_empty()
1754                                        && range_matches(
1755                                            &row[col_idx],
1756                                            &start_val,
1757                                            start_inclusive,
1758                                            &end_val,
1759                                            end_inclusive,
1760                                        )
1761                                    {
1762                                        rows.push(row);
1763                                    }
1764                                }
1765                            }
1766                            return Ok(QueryResult::Rows { columns, rows });
1767                        }
1768                    }
1769                }
1770
1771                // Range scans use the btree fast path for unique indexes,
1772                // walking raw column-value keys directly.
1773                if tbl.is_index_unique(column) == Some(true) {
1774                    if let Some(btree) = tbl.index(column) {
1775                        let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1776                            (Some(s), Some(e)) => btree.range(s, e).collect(),
1777                            (Some(s), None) => btree.range_from(s),
1778                            (None, Some(e)) => btree.range_to(e),
1779                            (None, None) => {
1780                                let rows: Vec<Vec<Value>> =
1781                                    tbl.scan().map(|(_, row)| row).collect();
1782                                return Ok(QueryResult::Rows { columns, rows });
1783                            }
1784                        };
1785                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1786                        for (key, rid) in hits {
1787                            if !start_inclusive {
1788                                if let Some(ref s) = start_val {
1789                                    if &key == s {
1790                                        continue;
1791                                    }
1792                                }
1793                            }
1794                            if !end_inclusive {
1795                                if let Some(ref e) = end_val {
1796                                    if &key == e {
1797                                        continue;
1798                                    }
1799                                }
1800                            }
1801                            if let Some(data) = tbl.heap.get(rid) {
1802                                rows.push(decode_row(schema, &data));
1803                            }
1804                        }
1805                        return Ok(QueryResult::Rows { columns, rows });
1806                    }
1807                }
1808
1809                // Fallback: no index — synthesize range predicate and scan.
1810                let fast = FastLayout::new(schema);
1811                let synth = synthesize_range_predicate(column, start, end);
1812                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1813                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1814                    self.catalog
1815                        .for_each_row_raw(table, |_rid, data| {
1816                            if compiled(data) {
1817                                rows.push(decode_row(schema, data));
1818                            }
1819                        })
1820                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1821                    return Ok(QueryResult::Rows { columns, rows });
1822                }
1823
1824                let col_idx =
1825                    schema
1826                        .column_index(column)
1827                        .ok_or_else(|| QueryError::ColumnNotFound {
1828                            table: String::new(),
1829                            column: column.clone(),
1830                        })?;
1831                let rows: Vec<Vec<Value>> = tbl
1832                    .scan()
1833                    .filter(|(_, row)| {
1834                        range_matches(
1835                            &row[col_idx],
1836                            &start_val,
1837                            start_inclusive,
1838                            &end_val,
1839                            end_inclusive,
1840                        )
1841                    })
1842                    .map(|(_, row)| row)
1843                    .collect();
1844                Ok(QueryResult::Rows { columns, rows })
1845            }
1846        }
1847    }
1848
1849    // ─── Materialized view operations ──────────────────────────────────────
1850
1851    /// Create a materialized view: execute the source query, store results
1852    /// in a new backing table, and register the view.
1853    fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1854        if self.view_registry.is_view(name) {
1855            return Err(QueryError::ViewError(format!(
1856                "materialized view '{name}' already exists"
1857            )));
1858        }
1859        // Execute the source query to get the result set.
1860        let result = self.execute_powql(query_text)?;
1861        let (columns, rows) = match result {
1862            QueryResult::Rows { columns, rows } => (columns, rows),
1863            _ => return Err("view source query must be a SELECT".into()),
1864        };
1865        // Derive a schema for the backing table from the query result columns.
1866        let schema = self.derive_view_schema(name, &columns, &rows);
1867        // Create the backing table and insert the result rows.
1868        self.catalog
1869            .create_table(schema)
1870            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1871        for row in &rows {
1872            self.catalog
1873                .insert(name, row)
1874                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1875        }
1876        // Determine which base tables this view depends on by parsing the query.
1877        let depends_on = self.extract_view_deps(query_text);
1878        self.view_registry
1879            .register(ViewDef {
1880                name: name.to_string(),
1881                query: query_text.to_string(),
1882                depends_on,
1883                dirty: false,
1884            })
1885            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1886        Ok(())
1887    }
1888
1889    /// Refresh a materialized view: re-execute its source query and replace
1890    /// the backing table's contents.
1891    fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1892        let def = self
1893            .view_registry
1894            .get(name)
1895            .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1896        let query_text = def.query.clone();
1897        // Execute the source query.
1898        let result = self.execute_powql(&query_text)?;
1899        let (_columns, rows) = match result {
1900            QueryResult::Rows { columns, rows } => (columns, rows),
1901            _ => return Err("view source query must be a SELECT".into()),
1902        };
1903        // Clear old data and insert fresh results. Mission B2: logged
1904        // variant — view refreshes are a mutation and crash recovery
1905        // must see them.
1906        self.catalog
1907            .scan_delete_matching_logged(name, |_| true)
1908            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1909        for row in &rows {
1910            self.catalog
1911                .insert(name, row)
1912                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1913        }
1914        self.view_registry.mark_clean(name);
1915        Ok(())
1916    }
1917
1918    /// Drop a materialized view: remove the backing table and unregister.
1919    fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1920        if !self.view_registry.is_view(name) {
1921            return Err(QueryError::ViewError(format!(
1922                "materialized view '{name}' not found"
1923            )));
1924        }
1925        self.view_registry
1926            .unregister(name)
1927            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1928        self.catalog
1929            .drop_table(name)
1930            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1931        Ok(())
1932    }
1933
1934    /// Derive a storage `Schema` for a view's backing table from query
1935    /// result column names and the first row's types.
1936    fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1937        use powdb_storage::types::{ColumnDef, TypeId};
1938        let cols: Vec<ColumnDef> = columns
1939            .iter()
1940            .enumerate()
1941            .map(|(i, col_name)| {
1942                let type_id = rows
1943                    .first()
1944                    .and_then(|row| row.get(i))
1945                    .map(|v| v.type_id())
1946                    .unwrap_or(TypeId::Str);
1947                ColumnDef {
1948                    name: col_name.clone(),
1949                    type_id,
1950                    required: false,
1951                    position: i as u16,
1952                }
1953            })
1954            .collect();
1955        Schema {
1956            table_name: name.to_string(),
1957            columns: cols,
1958        }
1959    }
1960
1961    /// Extract base table dependencies from a view's source query by
1962    /// parsing it and collecting the source table name.
1963    fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1964        use crate::parser::parse;
1965        match parse(query_text) {
1966            Ok(Statement::Query(q)) => {
1967                let mut deps = vec![q.source.clone()];
1968                for j in &q.joins {
1969                    deps.push(j.source.clone());
1970                }
1971                deps
1972            }
1973            _ => Vec::new(),
1974        }
1975    }
1976
1977    // ─── Specialized fast paths ─────────────────────────────────────────────
1978    //
1979    // These methods are helpers for the `execute_plan` match arms above.
1980    // Each returns `Ok(Some(result))` when the fast path fires, `Ok(None)`
1981    // when the shape isn't supported (caller falls back to generic code).
1982
1983    /// Aggregate sum/avg/min/max over a single fixed-size i64 column, with
1984    /// an optional compiled filter predicate. Walks raw row bytes — zero
1985    /// per-row allocation. Uses i128 accumulator for sum/avg overflow safety.
1986    pub(super) fn agg_single_col_fast(
1987        &self,
1988        table: &str,
1989        col: &str,
1990        function: AggFunc,
1991        predicate: Option<&Expr>,
1992    ) -> Result<Option<QueryResult>, QueryError> {
1993        let schema = self
1994            .catalog
1995            .schema(table)
1996            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1997            .clone();
1998        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
1999        let col_idx = match schema.column_index(col) {
2000            Some(i) => i,
2001            None => return Ok(None),
2002        };
2003        // Only fast-path fixed-size numeric columns (Int/Float) for
2004        // sum/avg/min/max/count. Mission D10: Float parity — prior version
2005        // bailed on Float columns, forcing them through the generic row-
2006        // decoding path that allocated a Vec<Value> per row and dispatched
2007        // on Value::cmp for every compare. f64 decode is structurally the
2008        // same as i64 (load 8 bytes, cast), so the fast path handles both.
2009        let col_type = schema.columns[col_idx].type_id;
2010        if col_type != TypeId::Int && col_type != TypeId::Float {
2011            return Ok(None);
2012        }
2013
2014        let fast = FastLayout::new(&schema);
2015        // Mission C Phase 20b: inline the numeric-column reader instead of
2016        // building a `Box<dyn Fn>`. Eliminates 100K vtable dispatches per
2017        // 100K-row agg scan — every reader call folds directly into the
2018        // hot loop below.
2019        let byte_offset = match fast.fixed_offsets[col_idx] {
2020            Some(o) => o,
2021            None => return Ok(None),
2022        };
2023        let bitmap_byte = col_idx / 8;
2024        let bitmap_bit = (col_idx % 8) as u32;
2025        let body_data_offset = 2 + fast.bitmap_size + byte_offset;
2026
2027        // Optional compiled filter.
2028        let compiled_pred: Option<CompiledPredicate> = match predicate {
2029            Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
2030                Some(c) => Some(c),
2031                None => return Ok(None), // let generic path handle it
2032            },
2033            None => None,
2034        };
2035
2036        // Mission C Phase 20b: specialize the inner loop per aggregate
2037        // function. The previous version ran a `match function { ... }`
2038        // *inside* the closure, which kept LLVM from producing optimal
2039        // scalar code for each variant (agg_max regressed ~23% vs the
2040        // baseline Box<dyn Fn> version even though per-row vtable cost
2041        // should have been strictly lower). Pushing the match out of the
2042        // hot loop lets each specialized body fold cleanly into
2043        // `for_each_row_raw` and removes a captured `AggFunc` + match
2044        // dispatch per row.
2045        //
2046        // Mission D10: same specialisation applies to the Float branch.
2047        // For Min/Max we use `f64::total_cmp` so the result matches
2048        // `Value::Ord` — this is the same ordering ORDER BY and the
2049        // top-N sort fast path use, keeping semantics consistent across
2050        // read paths (NaN compares as greatest, -0.0 < +0.0 for
2051        // deterministic tie-breaking).
2052        //
2053        // Mission D11 Phase 1: each inner loop now splits on presence of
2054        // a predicate (`if let Some(pred) = &compiled_pred`) so the hot
2055        // body never re-tests `Option` per row, and reads column bytes
2056        // via `read_i64_unchecked` / `read_f64_unchecked` helpers that
2057        // drop two bounds checks per row (null bitmap byte + value
2058        // slice). Safety is carried by the `FastLayout` invariant that
2059        // `data_offset + 8 <= row_len` for any fixed-size column; see
2060        // the helper doc comments. Hot loops are macro-generated so the
2061        // with-pred / no-pred split can't drift between variants.
2062        let result = match col_type {
2063            TypeId::Int => match function {
2064                AggFunc::Sum | AggFunc::Avg => {
2065                    let mut sum_i128: i128 = 0;
2066                    let mut count: i64 = 0;
2067                    agg_int_loop!(
2068                        self,
2069                        table,
2070                        compiled_pred,
2071                        bitmap_byte,
2072                        bitmap_bit,
2073                        body_data_offset,
2074                        |v: i64| {
2075                            count += 1;
2076                            sum_i128 += v as i128;
2077                        }
2078                    );
2079                    if matches!(function, AggFunc::Sum) {
2080                        let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
2081                        QueryResult::Scalar(Value::Int(clamped))
2082                    } else if count == 0 {
2083                        QueryResult::Scalar(Value::Empty)
2084                    } else {
2085                        let avg = (sum_i128 as f64) / (count as f64);
2086                        QueryResult::Scalar(Value::Float(avg))
2087                    }
2088                }
2089                AggFunc::Min => {
2090                    let mut min_v: Option<i64> = None;
2091                    agg_int_loop!(
2092                        self,
2093                        table,
2094                        compiled_pred,
2095                        bitmap_byte,
2096                        bitmap_bit,
2097                        body_data_offset,
2098                        |v: i64| {
2099                            min_v = Some(match min_v {
2100                                Some(m) => m.min(v),
2101                                None => v,
2102                            });
2103                        }
2104                    );
2105                    QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
2106                }
2107                AggFunc::Max => {
2108                    let mut max_v: Option<i64> = None;
2109                    agg_int_loop!(
2110                        self,
2111                        table,
2112                        compiled_pred,
2113                        bitmap_byte,
2114                        bitmap_bit,
2115                        body_data_offset,
2116                        |v: i64| {
2117                            max_v = Some(match max_v {
2118                                Some(m) => m.max(v),
2119                                None => v,
2120                            });
2121                        }
2122                    );
2123                    QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
2124                }
2125                AggFunc::Count => {
2126                    let mut count: i64 = 0;
2127                    agg_int_loop!(
2128                        self,
2129                        table,
2130                        compiled_pred,
2131                        bitmap_byte,
2132                        bitmap_bit,
2133                        body_data_offset,
2134                        |_v: i64| {
2135                            count += 1;
2136                        }
2137                    );
2138                    QueryResult::Scalar(Value::Int(count))
2139                }
2140                AggFunc::CountDistinct => {
2141                    let mut seen = rustc_hash::FxHashSet::default();
2142                    agg_int_loop!(
2143                        self,
2144                        table,
2145                        compiled_pred,
2146                        bitmap_byte,
2147                        bitmap_bit,
2148                        body_data_offset,
2149                        |v: i64| {
2150                            seen.insert(v);
2151                        }
2152                    );
2153                    QueryResult::Scalar(Value::Int(seen.len() as i64))
2154                }
2155            },
2156            TypeId::Float => match function {
2157                AggFunc::Sum => {
2158                    // Use a single f64 accumulator. Naive summation is
2159                    // sufficient for MVP parity; if precision becomes an
2160                    // issue on long scans we can upgrade to Kahan–Neumaier
2161                    // compensated sum (~2x scalar cost, zero error growth).
2162                    let mut sum: f64 = 0.0;
2163                    agg_float_loop!(
2164                        self,
2165                        table,
2166                        compiled_pred,
2167                        bitmap_byte,
2168                        bitmap_bit,
2169                        body_data_offset,
2170                        |v: f64| {
2171                            sum += v;
2172                        }
2173                    );
2174                    QueryResult::Scalar(Value::Float(sum))
2175                }
2176                AggFunc::Avg => {
2177                    let mut sum: f64 = 0.0;
2178                    let mut count: i64 = 0;
2179                    agg_float_loop!(
2180                        self,
2181                        table,
2182                        compiled_pred,
2183                        bitmap_byte,
2184                        bitmap_bit,
2185                        body_data_offset,
2186                        |v: f64| {
2187                            sum += v;
2188                            count += 1;
2189                        }
2190                    );
2191                    if count == 0 {
2192                        QueryResult::Scalar(Value::Empty)
2193                    } else {
2194                        QueryResult::Scalar(Value::Float(sum / count as f64))
2195                    }
2196                }
2197                AggFunc::Min => {
2198                    // `total_cmp` for deterministic NaN handling (matches
2199                    // Value::Ord). NaN compares greatest, so Min will
2200                    // correctly ignore it in favour of any finite value.
2201                    let mut min_v: Option<f64> = None;
2202                    agg_float_loop!(
2203                        self,
2204                        table,
2205                        compiled_pred,
2206                        bitmap_byte,
2207                        bitmap_bit,
2208                        body_data_offset,
2209                        |v: f64| {
2210                            min_v = Some(match min_v {
2211                                Some(m) => {
2212                                    if v.total_cmp(&m).is_lt() {
2213                                        v
2214                                    } else {
2215                                        m
2216                                    }
2217                                }
2218                                None => v,
2219                            });
2220                        }
2221                    );
2222                    QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
2223                }
2224                AggFunc::Max => {
2225                    let mut max_v: Option<f64> = None;
2226                    agg_float_loop!(
2227                        self,
2228                        table,
2229                        compiled_pred,
2230                        bitmap_byte,
2231                        bitmap_bit,
2232                        body_data_offset,
2233                        |v: f64| {
2234                            max_v = Some(match max_v {
2235                                Some(m) => {
2236                                    if v.total_cmp(&m).is_gt() {
2237                                        v
2238                                    } else {
2239                                        m
2240                                    }
2241                                }
2242                                None => v,
2243                            });
2244                        }
2245                    );
2246                    QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
2247                }
2248                AggFunc::Count => {
2249                    let mut count: i64 = 0;
2250                    agg_float_loop!(
2251                        self,
2252                        table,
2253                        compiled_pred,
2254                        bitmap_byte,
2255                        bitmap_bit,
2256                        body_data_offset,
2257                        |_v: f64| {
2258                            count += 1;
2259                        }
2260                    );
2261                    QueryResult::Scalar(Value::Int(count))
2262                }
2263                AggFunc::CountDistinct => {
2264                    // Hash on `f64::to_bits` — matches `Value::Hash`, so
2265                    // distinct NaN bit patterns count as distinct and
2266                    // -0.0/+0.0 count as distinct. Consistent with how
2267                    // Float values are hashed in every other DISTINCT /
2268                    // GROUP BY path.
2269                    let mut seen = rustc_hash::FxHashSet::default();
2270                    agg_float_loop!(
2271                        self,
2272                        table,
2273                        compiled_pred,
2274                        bitmap_byte,
2275                        bitmap_bit,
2276                        body_data_offset,
2277                        |v: f64| {
2278                            seen.insert(v.to_bits());
2279                        }
2280                    );
2281                    QueryResult::Scalar(Value::Int(seen.len() as i64))
2282                }
2283            },
2284            _ => unreachable!("type guard above restricts to Int/Float"),
2285        };
2286        Ok(Some(result))
2287    }
2288
2289    /// `Project(Limit(Filter(SeqScan)))` and `Project(Limit(SeqScan))`.
2290    /// Streams rows, decodes only projected columns, stops at the limit.
2291    pub(super) fn project_filter_limit_fast(
2292        &self,
2293        table: &str,
2294        fields: &[ProjectField],
2295        limit: usize,
2296        predicate: Option<&Expr>,
2297    ) -> Result<Option<QueryResult>, QueryError> {
2298        let schema = self
2299            .catalog
2300            .schema(table)
2301            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2302            .clone();
2303        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2304
2305        // Each projection field must be a simple `.field` reference for this
2306        // fast path. Aliased or computed fields fall through.
2307        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2308        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2309        for f in fields {
2310            let name = match &f.expr {
2311                Expr::Field(n) => n.clone(),
2312                _ => return Ok(None),
2313            };
2314            let idx = match all_columns.iter().position(|c| c == &name) {
2315                Some(i) => i,
2316                None => return Ok(None),
2317            };
2318            proj_indices.push(idx);
2319            proj_columns.push(f.alias.clone().unwrap_or(name));
2320        }
2321
2322        let fast = FastLayout::new(&schema);
2323        let row_layout = RowLayout::new(&schema);
2324
2325        let compiled_pred: Option<CompiledPredicate> = match predicate {
2326            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2327                Some(c) => Some(c),
2328                None => return Ok(None),
2329            },
2330            None => None,
2331        };
2332
2333        let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2334        // Mission D2: use try_for_each_row_raw to actually stop iterating
2335        // once the limit is reached. The previous `done` flag only short-
2336        // circuited the closure body, so a `limit 100` over 100K rows still
2337        // walked all 100K slots — burning ~30x SQLite on scan_filter_project_top100.
2338        self.catalog
2339            .try_for_each_row_raw(table, |_rid, data| {
2340                use std::ops::ControlFlow;
2341                if let Some(ref pred) = compiled_pred {
2342                    if !pred(data) {
2343                        return ControlFlow::Continue(());
2344                    }
2345                }
2346                let row: Vec<Value> = proj_indices
2347                    .iter()
2348                    .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2349                    .collect();
2350                out.push(row);
2351                if out.len() >= limit {
2352                    ControlFlow::Break(())
2353                } else {
2354                    ControlFlow::Continue(())
2355                }
2356            })
2357            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2358
2359        Ok(Some(QueryResult::Rows {
2360            columns: proj_columns,
2361            rows: out,
2362        }))
2363    }
2364
2365    /// `Project(Limit(Sort(Filter(SeqScan))))` and `Project(Limit(Sort(SeqScan)))`.
2366    /// Bounded top-N heap over the sort key. Only the sort key needs to be
2367    /// read per row; projected columns are decoded only for the final
2368    /// winning rows when the heap drains.
2369    pub(super) fn project_filter_sort_limit_fast(
2370        &self,
2371        table: &str,
2372        fields: &[ProjectField],
2373        sort_field: &str,
2374        descending: bool,
2375        limit: usize,
2376        predicate: Option<&Expr>,
2377    ) -> Result<Option<QueryResult>, QueryError> {
2378        if limit == 0 {
2379            // Degenerate case — empty result. Let the generic path handle it
2380            // for proper column naming.
2381            return Ok(None);
2382        }
2383        // The top-N heaps never hold more than `limit` rows, but `limit` is an
2384        // attacker-supplied literal (`order .x limit 99999999999`). Reserving
2385        // that capacity up front would allocate gigabytes and abort the
2386        // process before a single row is read. Cap the pre-allocation; the
2387        // heaps still grow on demand up to the true `limit`.
2388        const TOPN_PREALLOC_CAP: usize = 4096;
2389        let prealloc = limit.min(TOPN_PREALLOC_CAP);
2390        let schema = self
2391            .catalog
2392            .schema(table)
2393            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2394            .clone();
2395        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2396
2397        // Sort key must be a fixed-size numeric column (Int or Float).
2398        // Mission D10: extended from Int-only. Float sort keys use a
2399        // sortable-u64 transform (see `f64_to_sortable_u64`) so the heap
2400        // path stays keyed on `u64` and the whole branch shape is
2401        // identical to the Int case — no new heap types, no `total_cmp`
2402        // closures in the hot loop.
2403        let sort_idx = match schema.column_index(sort_field) {
2404            Some(i) => i,
2405            None => return Ok(None),
2406        };
2407        let sort_col_type = schema.columns[sort_idx].type_id;
2408        if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2409            return Ok(None);
2410        }
2411
2412        // Each projection field must be a simple `.field`.
2413        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2414        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2415        for f in fields {
2416            let name = match &f.expr {
2417                Expr::Field(n) => n.clone(),
2418                _ => return Ok(None),
2419            };
2420            let idx = match all_columns.iter().position(|c| c == &name) {
2421                Some(i) => i,
2422                None => return Ok(None),
2423            };
2424            proj_indices.push(idx);
2425            proj_columns.push(f.alias.clone().unwrap_or(name));
2426        }
2427
2428        let fast = FastLayout::new(&schema);
2429        let row_layout = RowLayout::new(&schema);
2430        // Mission C Phase 20b: inline numeric-column reader (no Box<dyn Fn>).
2431        let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2432            Some(o) => o,
2433            None => return Ok(None),
2434        };
2435        let sort_bitmap_byte = sort_idx / 8;
2436        let sort_bitmap_bit = (sort_idx % 8) as u32;
2437        let sort_body_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2438
2439        let compiled_pred: Option<CompiledPredicate> = match predicate {
2440            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2441                Some(c) => Some(c),
2442                None => return Ok(None),
2443            },
2444            None => None,
2445        };
2446
2447        // Bounded top-N heap. For `order .x desc limit N`, we want the N
2448        // largest values — use a min-heap so the smallest is at the top and
2449        // can be popped when a better candidate arrives. For ascending, use
2450        // a max-heap. We tie-break with a monotonic `seq` counter so the
2451        // result is deterministic and stable.
2452        //
2453        // To keep this simple we maintain two typed heaps and pick by
2454        // direction.
2455        let drained: Vec<Vec<u8>> = match sort_col_type {
2456            TypeId::Int => {
2457                let mut seq: u64 = 0;
2458                let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2459                    BinaryHeap::with_capacity(prealloc);
2460                let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2461                    BinaryHeap::with_capacity(prealloc);
2462
2463                self.catalog
2464                    .for_each_row_raw(table, |_rid, data| {
2465                        if let Some(ref pred) = compiled_pred {
2466                            if !pred(data) {
2467                                return;
2468                            }
2469                        }
2470                        // Inlined int-column reader: null check + i64 decode.
2471                        let base = row_body_base(data);
2472                        let sort_data_offset = base + sort_body_data_offset;
2473                        if data.len() < sort_data_offset + 8
2474                            || data.len() <= base + 2 + sort_bitmap_byte
2475                        {
2476                            return;
2477                        }
2478                        let is_null =
2479                            (data[base + 2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2480                        if is_null {
2481                            return;
2482                        }
2483                        let key = i64::from_le_bytes(
2484                            data[sort_data_offset..sort_data_offset + 8]
2485                                .try_into()
2486                                .unwrap_or_else(|_| unreachable!()),
2487                        );
2488                        let id = seq;
2489                        seq += 1;
2490
2491                        if descending {
2492                            if heap_desc.len() < limit {
2493                                heap_desc.push(Reverse((key, id, data.to_vec())));
2494                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2495                                if key > *top_key {
2496                                    heap_desc.pop();
2497                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2498                                }
2499                            }
2500                        } else if heap_asc.len() < limit {
2501                            heap_asc.push((key, id, data.to_vec()));
2502                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2503                            if key < *top_key {
2504                                heap_asc.pop();
2505                                heap_asc.push((key, id, data.to_vec()));
2506                            }
2507                        }
2508                    })
2509                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2510
2511                let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2512                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2513                } else {
2514                    heap_asc.into_iter().collect()
2515                };
2516                if descending {
2517                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2518                } else {
2519                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2520                }
2521                drained.into_iter().map(|(_, _, d)| d).collect()
2522            }
2523            TypeId::Float => {
2524                // Novel angle: rather than introducing a `TotalF64` newtype
2525                // with `Ord via total_cmp`, transform the f64 bit pattern
2526                // into a sortable `u64` so `BinaryHeap<u64>` orders exactly
2527                // like `f64::total_cmp` would. Classic trick: flip the sign
2528                // bit on positives, flip all bits on negatives. Result:
2529                // - NaN (sign=0) stays greatest, matching total_cmp
2530                // - -0.0 sorts before +0.0, matching total_cmp
2531                // - Hot loop is branch-cheap (one compare + one xor)
2532                let mut seq: u64 = 0;
2533                let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2534                    BinaryHeap::with_capacity(prealloc);
2535                let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2536                    BinaryHeap::with_capacity(prealloc);
2537
2538                self.catalog
2539                    .for_each_row_raw(table, |_rid, data| {
2540                        if let Some(ref pred) = compiled_pred {
2541                            if !pred(data) {
2542                                return;
2543                            }
2544                        }
2545                        let base = row_body_base(data);
2546                        let sort_data_offset = base + sort_body_data_offset;
2547                        if data.len() < sort_data_offset + 8
2548                            || data.len() <= base + 2 + sort_bitmap_byte
2549                        {
2550                            return;
2551                        }
2552                        let is_null =
2553                            (data[base + 2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2554                        if is_null {
2555                            return;
2556                        }
2557                        let bits = u64::from_le_bytes(
2558                            data[sort_data_offset..sort_data_offset + 8]
2559                                .try_into()
2560                                .unwrap_or_else(|_| unreachable!()),
2561                        );
2562                        let key = f64_bits_to_sortable_u64(bits);
2563                        let id = seq;
2564                        seq += 1;
2565
2566                        if descending {
2567                            if heap_desc.len() < limit {
2568                                heap_desc.push(Reverse((key, id, data.to_vec())));
2569                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2570                                if key > *top_key {
2571                                    heap_desc.pop();
2572                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2573                                }
2574                            }
2575                        } else if heap_asc.len() < limit {
2576                            heap_asc.push((key, id, data.to_vec()));
2577                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2578                            if key < *top_key {
2579                                heap_asc.pop();
2580                                heap_asc.push((key, id, data.to_vec()));
2581                            }
2582                        }
2583                    })
2584                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2585
2586                let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2587                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2588                } else {
2589                    heap_asc.into_iter().collect()
2590                };
2591                if descending {
2592                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2593                } else {
2594                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2595                }
2596                drained.into_iter().map(|(_, _, d)| d).collect()
2597            }
2598            _ => unreachable!("type guard above restricts to Int/Float"),
2599        };
2600
2601        let rows: Vec<Vec<Value>> = drained
2602            .into_iter()
2603            .map(|data| {
2604                proj_indices
2605                    .iter()
2606                    .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2607                    .collect()
2608            })
2609            .collect();
2610
2611        Ok(Some(QueryResult::Rows {
2612            columns: proj_columns,
2613            rows,
2614        }))
2615    }
2616
2617    /// Gather the RowIds that a mutation should operate on, without
2618    /// materialising the full row set. Handles the shapes the planner emits
2619    /// for update/delete: SeqScan, IndexScan, and Filter(SeqScan). Other
2620    /// shapes fall back to `generic_rid_match`.
2621    ///
2622    /// Perf sprint: try to fuse the predicate evaluation and in-place
2623    /// byte-level mutation into a single heap walk. Returns `Some(result)`
2624    /// if the fused path fired, `None` to fall through to the generic
2625    /// two-pass code.
2626    ///
2627    /// Covers two shapes:
2628    /// 1. Fixed-width non-null literal assignments on non-indexed columns
2629    ///    → byte-patch every matched row in place (row length unchanged).
2630    /// 2. Single var-col literal assignment on a non-indexed column
2631    ///    → `patch_var_column_in_place` on every matched row (may shrink);
2632    ///    rows that can't be patched in place are collected for fallback.
2633    fn try_fused_scan_update(
2634        &mut self,
2635        table: &str,
2636        predicate: &Expr,
2637        resolved: &[(usize, Value)],
2638        changed_cols: &[usize],
2639    ) -> Option<Result<QueryResult, QueryError>> {
2640        // Build compiled predicate. Requires a schema borrow that must be
2641        // dropped before we call scan_patch_matching_logged.
2642        let compiled = {
2643            let schema = self.catalog.schema(table)?;
2644            let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2645            let fast = FastLayout::new(schema);
2646            compile_predicate(predicate, &columns, &fast, schema)?
2647        };
2648
2649        // ── Path 1: fixed-width fast patch ──────────────────────────
2650        let fixed_patches: Option<Vec<FastPatch>> = {
2651            let tbl = self.catalog.get_table(table)?;
2652            let schema = &tbl.schema;
2653            let all_fixed_nonnull = resolved
2654                .iter()
2655                .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2656            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2657            if all_fixed_nonnull && no_indexed {
2658                let layout = RowLayout::new(schema);
2659                let bitmap_size = layout.bitmap_size();
2660                Some(
2661                    resolved
2662                        .iter()
2663                        .map(|(idx, val)| {
2664                            let fixed_off = layout
2665                                .fixed_offset(*idx)
2666                                .expect("is_fixed_size already checked");
2667                            let field_off = 2 + bitmap_size + fixed_off;
2668                            let bytes: FixedBytes = match val {
2669                                Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2670                                Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2671                                Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2672                                Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2673                                Value::Uuid(v) => FixedBytes::Uuid(*v),
2674                                _ => unreachable!("all_fixed_nonnull guard"),
2675                            };
2676                            FastPatch {
2677                                field_off,
2678                                bitmap_byte_off: 2 + idx / 8,
2679                                bit_mask: 1u8 << (idx % 8),
2680                                bytes,
2681                            }
2682                        })
2683                        .collect(),
2684                )
2685            } else {
2686                None
2687            }
2688        };
2689        if let Some(patches) = fixed_patches {
2690            let result = self
2691                .catalog
2692                .scan_patch_matching_logged(table, compiled, |row| {
2693                    let base = row_body_base(row);
2694                    for p in &patches {
2695                        row[base + p.bitmap_byte_off] &= !p.bit_mask;
2696                        let field_bytes = p.bytes.as_slice();
2697                        row[base + p.field_off..base + p.field_off + field_bytes.len()]
2698                            .copy_from_slice(field_bytes);
2699                    }
2700                    Some(row.len() as u16)
2701                })
2702                .map_err(|e| e.to_string());
2703            match result {
2704                Ok((count, _)) => {
2705                    self.view_registry.mark_dependents_dirty(table);
2706                    return Some(Ok(QueryResult::Modified(count)));
2707                }
2708                Err(e) => return Some(Err(QueryError::Execution(e))),
2709            }
2710        }
2711
2712        // ── Path 2: single var-col shrink fast patch ────────────────
2713        let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2714            let tbl = self.catalog.get_table(table)?;
2715            let schema = &tbl.schema;
2716            let is_single = resolved.len() == 1;
2717            let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2718            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2719            if is_single && is_var && no_indexed {
2720                let (idx, val) = &resolved[0];
2721                let bytes_opt = match val {
2722                    Value::Str(s) => Some(s.as_bytes().to_vec()),
2723                    Value::Bytes(b) => Some(b.clone()),
2724                    Value::Empty => None,
2725                    _ => return None, // type mismatch, fall through
2726                };
2727                Some((*idx, bytes_opt))
2728            } else {
2729                None
2730            }
2731        };
2732        if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2733            // Build a fresh RowLayout before the mutable borrow.
2734            let layout = {
2735                let schema = self.catalog.schema(table)?;
2736                RowLayout::new(schema)
2737            };
2738            let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2739            let result = self
2740                .catalog
2741                .scan_patch_matching_logged(table, compiled, |row| {
2742                    patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2743                })
2744                .map_err(|e| e.to_string());
2745            match result {
2746                Ok((mut count, fallback_rids)) => {
2747                    // Handle rows where in-place patch failed (new > old).
2748                    for rid in fallback_rids {
2749                        let mut row = match self.catalog.get(table, rid) {
2750                            Some(r) => r,
2751                            None => continue,
2752                        };
2753                        for (idx, val) in resolved.iter() {
2754                            row[*idx] = val.clone();
2755                        }
2756                        if let Err(e) =
2757                            self.catalog
2758                                .update_hinted(table, rid, &row, Some(changed_cols))
2759                        {
2760                            return Some(Err(QueryError::StorageError(e.to_string())));
2761                        }
2762                        count += 1;
2763                    }
2764                    self.view_registry.mark_dependents_dirty(table);
2765                    return Some(Ok(QueryResult::Modified(count)));
2766                }
2767                Err(e) => return Some(Err(QueryError::Execution(e))),
2768            }
2769        }
2770
2771        None // no fused path applicable — fall through
2772    }
2773
2774    /// Mission C Phase 3: schema is looked up via `self.catalog.schema(table)`
2775    /// inside the branches that actually need it. Previously the caller had
2776    /// to clone the full Schema (6+ String allocs) before every mutation just
2777    /// so this function could borrow it — a cost the update/delete hot path
2778    /// did not need.
2779    fn collect_rids_for_mutation(
2780        &mut self,
2781        input: &PlanNode,
2782        table: &str,
2783    ) -> Result<Vec<RowId>, QueryError> {
2784        match input {
2785            PlanNode::SeqScan { table: t } if t == table => {
2786                // "Update/delete everything" — rare but legal.
2787                let rids: Vec<RowId> = self
2788                    .catalog
2789                    .scan(table)
2790                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2791                    .map(|(rid, _)| rid)
2792                    .collect();
2793                Ok(rids)
2794            }
2795            PlanNode::IndexScan {
2796                table: t,
2797                column,
2798                key,
2799            } if t == table => {
2800                let key_value = literal_to_value(key)?;
2801
2802                // Indexed case: single lookup, 0 or 1 rows.
2803                // Mission D7: int-specialized fast path on int-keyed indexes
2804                // (primary keys, created_at, etc.) — the common case for
2805                // `update_by_pk` / `delete where id = ?`.
2806                //
2807                // Scope the `tbl` borrow so it's released before we fall
2808                // through to the scan-based paths below (which reborrow
2809                // `self.catalog`).
2810                {
2811                    let tbl = self
2812                        .catalog
2813                        .get_table(table)
2814                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2815                    if tbl.has_index(column) {
2816                        let rids = tbl.index_lookup_all(column, &key_value);
2817                        return Ok(rids);
2818                    }
2819                }
2820
2821                // No index: the planner folds `.col = literal` to IndexScan
2822                // regardless of whether the column is actually unique. When
2823                // there's no index we must behave like Filter(SeqScan) and
2824                // return *all* matching RIDs — not just the first one.
2825                let schema = self
2826                    .catalog
2827                    .schema(table)
2828                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2829                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2830                let fast = FastLayout::new(schema);
2831                let synth = Expr::BinaryOp(
2832                    Box::new(Expr::Field(column.clone())),
2833                    BinOp::Eq,
2834                    Box::new(key.clone()),
2835                );
2836                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2837                    // Mission F: skip the first 4 Vec doublings.
2838                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2839                    self.catalog
2840                        .for_each_row_raw(table, |rid, data| {
2841                            if compiled(data) {
2842                                rids.push(rid);
2843                            }
2844                        })
2845                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2846                    return Ok(rids);
2847                }
2848
2849                // Fallback: decode each row, compare values.
2850                let col_idx =
2851                    schema
2852                        .column_index(column)
2853                        .ok_or_else(|| QueryError::ColumnNotFound {
2854                            table: String::new(),
2855                            column: column.clone(),
2856                        })?;
2857                let rids: Vec<RowId> = self
2858                    .catalog
2859                    .scan(table)
2860                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2861                    .filter_map(|(rid, row)| {
2862                        if row[col_idx] == key_value {
2863                            Some(rid)
2864                        } else {
2865                            None
2866                        }
2867                    })
2868                    .collect();
2869                Ok(rids)
2870            }
2871            PlanNode::Filter {
2872                input: inner,
2873                predicate,
2874            } => {
2875                if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2876                    if t != table {
2877                        return self.generic_rid_match(input, table);
2878                    }
2879                    let schema = self
2880                        .catalog
2881                        .schema(table)
2882                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2883                    let columns: Vec<String> =
2884                        schema.columns.iter().map(|c| c.name.clone()).collect();
2885                    let fast = FastLayout::new(schema);
2886                    let row_layout = RowLayout::new(schema);
2887
2888                    // Try compiled predicate first.
2889                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2890                        // Mission F: skip the first 4 Vec doublings.
2891                        let mut rids: Vec<RowId> = Vec::with_capacity(64);
2892                        self.catalog
2893                            .for_each_row_raw(table, |rid, data| {
2894                                if compiled(data) {
2895                                    rids.push(rid);
2896                                }
2897                            })
2898                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2899                        return Ok(rids);
2900                    }
2901
2902                    // Fallback: selective decode + eval.
2903                    let pred_cols = predicate_column_indices(predicate, &columns);
2904                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2905                    self.catalog
2906                        .for_each_row_raw(table, |rid, data| {
2907                            let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2908                            if eval_predicate(predicate, &pred_row, &columns) {
2909                                rids.push(rid);
2910                            }
2911                        })
2912                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2913                    return Ok(rids);
2914                }
2915                self.generic_rid_match(input, table)
2916            }
2917            _ => self.generic_rid_match(input, table),
2918        }
2919    }
2920
2921    /// Last-ditch generic match: execute the plan, collect matching rows,
2922    /// then find corresponding RowIds by value equality. This is the old
2923    /// O(N*M) code path; only used when the plan shape is something exotic.
2924    fn generic_rid_match(
2925        &mut self,
2926        input: &PlanNode,
2927        table: &str,
2928    ) -> Result<Vec<RowId>, QueryError> {
2929        let result = self.execute_plan(input)?;
2930        let rows = match result {
2931            QueryResult::Rows { rows, .. } => rows,
2932            _ => return Err("mutation source must be rows".into()),
2933        };
2934        let matching: Vec<RowId> = self
2935            .catalog
2936            .scan(table)
2937            .map_err(|e| QueryError::StorageError(e.to_string()))?
2938            .filter(|(_, row)| rows.iter().any(|r| r == row))
2939            .map(|(rid, _)| rid)
2940            .collect();
2941        Ok(matching)
2942    }
2943}
2944
2945pub(super) fn execute_window(
2946    result: QueryResult,
2947    windows: &[WindowDef],
2948) -> Result<QueryResult, QueryError> {
2949    let (mut columns, mut rows) = match result {
2950        QueryResult::Rows { columns, rows } => (columns, rows),
2951        _ => return Err("window function requires row input".into()),
2952    };
2953
2954    for wdef in windows {
2955        // Resolve partition/order column indices against current columns.
2956        let part_indices: Vec<usize> = wdef
2957            .partition_by
2958            .iter()
2959            .map(|name| {
2960                columns
2961                    .iter()
2962                    .position(|c| c == name)
2963                    .ok_or_else(|| format!("window partition column '{name}' not found"))
2964            })
2965            .collect::<Result<Vec<_>, _>>()?;
2966
2967        let ord_indices: Vec<(usize, bool)> = wdef
2968            .order_by
2969            .iter()
2970            .map(|sk| {
2971                columns
2972                    .iter()
2973                    .position(|c| c == &sk.field)
2974                    .map(|i| (i, sk.descending))
2975                    .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2976            })
2977            .collect::<Result<Vec<_>, _>>()?;
2978
2979        // Resolve the argument column index (for aggregate windows).
2980        let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2981            match arg {
2982                Expr::Field(name) => {
2983                    if name == "*" {
2984                        None // count(*) style — no specific column
2985                    } else {
2986                        Some(
2987                            columns
2988                                .iter()
2989                                .position(|c| c == name)
2990                                .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2991                        )
2992                    }
2993                }
2994                _ => None,
2995            }
2996        } else {
2997            None
2998        };
2999
3000        // Build a sort-index to sort rows by partition_by then order_by
3001        // without actually reordering the original Vec (we need original
3002        // order to write results back).
3003        let n = rows.len();
3004        let mut indices: Vec<usize> = (0..n).collect();
3005        indices.sort_by(|&a, &b| {
3006            // Compare partition keys first.
3007            for &pi in &part_indices {
3008                let cmp = rows[a][pi].cmp(&rows[b][pi]);
3009                if cmp != std::cmp::Ordering::Equal {
3010                    return cmp;
3011                }
3012            }
3013            // Then order keys.
3014            for &(oi, desc) in &ord_indices {
3015                let cmp = rows[a][oi].cmp(&rows[b][oi]);
3016                if cmp != std::cmp::Ordering::Equal {
3017                    return if desc { cmp.reverse() } else { cmp };
3018                }
3019            }
3020            std::cmp::Ordering::Equal
3021        });
3022
3023        // SQL window-frame semantics: with no `order` clause the frame for an
3024        // aggregate window is the ENTIRE partition, not the running prefix.
3025        // The loop below computes running values; for the no-order case we
3026        // back-fill every row of a partition with the partition's final
3027        // (i.e. complete) aggregate once its boundary is reached. Ranking
3028        // functions are untouched — row_number/rank/dense_rank are inherently
3029        // positional.
3030        let whole_partition_frame = wdef.order_by.is_empty()
3031            && matches!(
3032                wdef.function,
3033                WindowFunc::Sum
3034                    | WindowFunc::Avg
3035                    | WindowFunc::Count
3036                    | WindowFunc::Min
3037                    | WindowFunc::Max
3038            );
3039        // Original row indices of the partition currently being scanned
3040        // (only tracked when back-filling is needed).
3041        let mut partition_row_indices: Vec<usize> = Vec::new();
3042
3043        // Compute window values in sorted order, tracking partition boundaries.
3044        let mut win_values: Vec<Value> = vec![Value::Empty; n];
3045        let mut partition_start = 0usize;
3046        // Running state for aggregate windows:
3047        let mut running_count: i64 = 0;
3048        let mut running_int_sum: i64 = 0;
3049        let mut running_float_sum: f64 = 0.0;
3050        let mut running_saw_float = false;
3051        let mut running_min: Option<Value> = None;
3052        let mut running_max: Option<Value> = None;
3053        let mut rank_counter: i64 = 0;
3054        let mut dense_rank_counter: i64 = 0;
3055        let mut prev_order_key: Option<Vec<Value>> = None;
3056        let mut same_rank_count: i64 = 0;
3057
3058        for sorted_pos in 0..n {
3059            let row_idx = indices[sorted_pos];
3060
3061            // Detect partition boundary.
3062            let new_partition = if sorted_pos == 0 {
3063                true
3064            } else {
3065                let prev_row_idx = indices[sorted_pos - 1];
3066                part_indices
3067                    .iter()
3068                    .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
3069            };
3070
3071            if new_partition {
3072                // No-order aggregate frame: the partition that just ended is
3073                // complete, so its final running value IS the whole-partition
3074                // aggregate. Back-fill it onto every row of that partition.
3075                if whole_partition_frame && sorted_pos > 0 {
3076                    let final_v = win_values[indices[sorted_pos - 1]].clone();
3077                    for ri in partition_row_indices.drain(..) {
3078                        win_values[ri] = final_v.clone();
3079                    }
3080                }
3081                partition_start = sorted_pos;
3082                running_count = 0;
3083                running_int_sum = 0;
3084                running_float_sum = 0.0;
3085                running_saw_float = false;
3086                running_min = None;
3087                running_max = None;
3088                rank_counter = 0;
3089                dense_rank_counter = 0;
3090                prev_order_key = None;
3091                same_rank_count = 0;
3092            }
3093
3094            // Extract current order key for rank tracking.
3095            let current_order_key: Vec<Value> = ord_indices
3096                .iter()
3097                .map(|&(oi, _)| rows[row_idx][oi].clone())
3098                .collect();
3099            let same_as_prev = prev_order_key.as_ref() == Some(&current_order_key);
3100
3101            let value = match wdef.function {
3102                WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
3103                WindowFunc::Rank => {
3104                    if same_as_prev {
3105                        same_rank_count += 1;
3106                    } else {
3107                        rank_counter += same_rank_count + 1;
3108                        same_rank_count = 0;
3109                        if rank_counter == 0 {
3110                            rank_counter = 1;
3111                        }
3112                    }
3113                    Value::Int(rank_counter)
3114                }
3115                WindowFunc::DenseRank => {
3116                    if !same_as_prev {
3117                        dense_rank_counter += 1;
3118                    }
3119                    Value::Int(dense_rank_counter)
3120                }
3121                WindowFunc::Sum => {
3122                    if let Some(ci) = arg_col_idx {
3123                        match &rows[row_idx][ci] {
3124                            Value::Int(v) => running_int_sum += v,
3125                            Value::Float(v) => {
3126                                running_float_sum += v;
3127                                running_saw_float = true;
3128                            }
3129                            _ => {}
3130                        }
3131                    }
3132                    if running_saw_float {
3133                        Value::Float(running_float_sum + running_int_sum as f64)
3134                    } else {
3135                        Value::Int(running_int_sum)
3136                    }
3137                }
3138                WindowFunc::Avg => {
3139                    if let Some(ci) = arg_col_idx {
3140                        match &rows[row_idx][ci] {
3141                            Value::Int(v) => {
3142                                running_float_sum += *v as f64;
3143                                running_count += 1;
3144                            }
3145                            Value::Float(v) => {
3146                                running_float_sum += v;
3147                                running_count += 1;
3148                            }
3149                            _ => {}
3150                        }
3151                    }
3152                    if running_count == 0 {
3153                        Value::Empty
3154                    } else {
3155                        Value::Float(running_float_sum / running_count as f64)
3156                    }
3157                }
3158                WindowFunc::Count => {
3159                    if let Some(ci) = arg_col_idx {
3160                        if !rows[row_idx][ci].is_empty() {
3161                            running_count += 1;
3162                        }
3163                    } else {
3164                        // count(*) — count all rows
3165                        running_count += 1;
3166                    }
3167                    Value::Int(running_count)
3168                }
3169                WindowFunc::Min => {
3170                    if let Some(ci) = arg_col_idx {
3171                        let v = &rows[row_idx][ci];
3172                        if !v.is_empty() {
3173                            running_min = Some(match &running_min {
3174                                None => v.clone(),
3175                                Some(cur) => {
3176                                    if v < cur {
3177                                        v.clone()
3178                                    } else {
3179                                        cur.clone()
3180                                    }
3181                                }
3182                            });
3183                        }
3184                    }
3185                    running_min.clone().unwrap_or(Value::Empty)
3186                }
3187                WindowFunc::Max => {
3188                    if let Some(ci) = arg_col_idx {
3189                        let v = &rows[row_idx][ci];
3190                        if !v.is_empty() {
3191                            running_max = Some(match &running_max {
3192                                None => v.clone(),
3193                                Some(cur) => {
3194                                    if v > cur {
3195                                        v.clone()
3196                                    } else {
3197                                        cur.clone()
3198                                    }
3199                                }
3200                            });
3201                        }
3202                    }
3203                    running_max.clone().unwrap_or(Value::Empty)
3204                }
3205            };
3206
3207            prev_order_key = Some(current_order_key);
3208            win_values[row_idx] = value;
3209            if whole_partition_frame {
3210                partition_row_indices.push(row_idx);
3211            }
3212        }
3213
3214        // Back-fill the final partition (the loop only flushes at boundaries).
3215        if whole_partition_frame && n > 0 {
3216            let final_v = win_values[indices[n - 1]].clone();
3217            for ri in partition_row_indices.drain(..) {
3218                win_values[ri] = final_v.clone();
3219            }
3220        }
3221
3222        // Append the computed window column to each row.
3223        for (ri, row) in rows.iter_mut().enumerate() {
3224            row.push(win_values[ri].clone());
3225        }
3226        columns.push(wdef.output_name.clone());
3227    }
3228
3229    Ok(QueryResult::Rows { columns, rows })
3230}
3231
3232/// Mission E2b: compute one aggregate over a set of rows in a group.
3233pub(super) fn compute_group_aggregate(
3234    func: AggFunc,
3235    all_rows: &[Vec<Value>],
3236    row_indices: &[usize],
3237    col_idx: usize,
3238) -> Value {
3239    match func {
3240        AggFunc::Count => {
3241            if col_idx == usize::MAX {
3242                // count(*) — count all rows in the group.
3243                return Value::Int(row_indices.len() as i64);
3244            }
3245            let count = row_indices
3246                .iter()
3247                .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3248                .count();
3249            Value::Int(count as i64)
3250        }
3251        AggFunc::CountDistinct => {
3252            let mut seen = std::collections::HashSet::new();
3253            for &ri in row_indices {
3254                let v = &all_rows[ri][col_idx];
3255                if !v.is_empty() {
3256                    seen.insert(v.clone());
3257                }
3258            }
3259            Value::Int(seen.len() as i64)
3260        }
3261        AggFunc::Sum => {
3262            // Mirror the scalar Sum path: accumulate int and float
3263            // contributions separately and promote the final result to
3264            // Float if any Float row was observed. Prevents silent
3265            // drop of Float columns in GROUP BY aggregates.
3266            let mut int_sum: i64 = 0;
3267            let mut float_sum: f64 = 0.0;
3268            let mut saw_float = false;
3269            for &ri in row_indices {
3270                match &all_rows[ri][col_idx] {
3271                    Value::Int(v) => int_sum += v,
3272                    Value::Float(v) => {
3273                        float_sum += *v;
3274                        saw_float = true;
3275                    }
3276                    _ => {}
3277                }
3278            }
3279            if saw_float {
3280                Value::Float(float_sum + int_sum as f64)
3281            } else {
3282                Value::Int(int_sum)
3283            }
3284        }
3285        AggFunc::Avg => {
3286            let mut sum = 0.0f64;
3287            let mut count = 0usize;
3288            for &ri in row_indices {
3289                match &all_rows[ri][col_idx] {
3290                    Value::Int(v) => {
3291                        sum += *v as f64;
3292                        count += 1;
3293                    }
3294                    Value::Float(v) => {
3295                        sum += *v;
3296                        count += 1;
3297                    }
3298                    _ => {}
3299                }
3300            }
3301            if count == 0 {
3302                Value::Empty
3303            } else {
3304                Value::Float(sum / count as f64)
3305            }
3306        }
3307        AggFunc::Min => row_indices
3308            .iter()
3309            .map(|&ri| &all_rows[ri][col_idx])
3310            .filter(|v| !v.is_empty())
3311            .min()
3312            .cloned()
3313            .unwrap_or(Value::Empty),
3314        AggFunc::Max => row_indices
3315            .iter()
3316            .map(|&ri| &all_rows[ri][col_idx])
3317            .filter(|v| !v.is_empty())
3318            .max()
3319            .cloned()
3320            .unwrap_or(Value::Empty),
3321    }
3322}
3323
3324/// Mission E1.3: try to extract equi-join key indices from a join `on`
3325/// predicate. Returns `Some((left_col_idx, right_col_idx))` when the
3326/// predicate is exactly `L = R` (or `R = L`) and both sides resolve
3327/// cleanly — `L` to the left subtree's column list and `R` to the right
3328/// subtree's column list.
3329///
3330/// This is deliberately narrow. We only recognise the two shapes:
3331///   * `QualifiedField = QualifiedField`  (`u.id = o.user_id`)
3332///   * `Field = Field`                    (`.id = .user_id`, unqualified)
3333///
3334/// Anything else — conjunctions, constants, function calls, or predicates
3335/// that touch the same side on both halves — falls through to the
3336/// nested-loop path unchanged.
3337pub(super) fn try_extract_equi_join_keys(
3338    pred: &Expr,
3339    left_columns: &[String],
3340    right_columns: &[String],
3341) -> Option<(usize, usize)> {
3342    let (lhs, op, rhs) = match pred {
3343        Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3344        _ => return None,
3345    };
3346    if op != BinOp::Eq {
3347        return None;
3348    }
3349    // Normal orientation: lhs in left, rhs in right.
3350    if let (Some(li), Some(ri)) = (
3351        resolve_side_column(lhs, left_columns),
3352        resolve_side_column(rhs, right_columns),
3353    ) {
3354        return Some((li, ri));
3355    }
3356    // Swapped: rhs in left, lhs in right. Both sides of `=` are
3357    // commutative so this is safe.
3358    if let (Some(li), Some(ri)) = (
3359        resolve_side_column(rhs, left_columns),
3360        resolve_side_column(lhs, right_columns),
3361    ) {
3362        return Some((li, ri));
3363    }
3364    None
3365}
3366
3367fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3368    match expr {
3369        Expr::QualifiedField { qualifier, field } => {
3370            // Byte-level match so we don't allocate a fresh `format!` on
3371            // every call — this runs once per plan, so allocation would be
3372            // cheap, but the match is trivial enough to keep inline with
3373            // the eval_expr version.
3374            let q = qualifier.as_bytes();
3375            let f = field.as_bytes();
3376            columns.iter().position(|c| {
3377                let b = c.as_bytes();
3378                b.len() == q.len() + 1 + f.len()
3379                    && b[..q.len()] == *q
3380                    && b[q.len()] == b'.'
3381                    && b[q.len() + 1..] == *f
3382            })
3383        }
3384        Expr::Field(name) => columns.iter().position(|c| c == name),
3385        _ => None,
3386    }
3387}
3388
3389/// Mission E1.3: O(L + R) hash join. Builds a `FxHashMap<Value, Vec<usize>>`
3390/// over the right (inner) side's join keys, then streams the left (outer)
3391/// side and for each probe row emits every combined row whose right-side
3392/// key matches. For `JoinKind::LeftOuter`, unmatched left rows are emitted
3393/// padded with `Value::Empty` on the right side.
3394///
3395/// The right side is always the build side. That choice is forced for
3396/// LeftOuter (the left side must stream so we can detect orphans), and
3397/// for Inner it's a reasonable default — left-deep plans tend to grow the
3398/// left side with each join, so the un-joined right leaf is often the
3399/// smaller of the two at each level.
3400pub(super) fn hash_join(
3401    left_columns: Vec<String>,
3402    left_rows: Vec<Vec<Value>>,
3403    right_columns: Vec<String>,
3404    right_rows: Vec<Vec<Value>>,
3405    left_key_idx: usize,
3406    right_key_idx: usize,
3407    kind: JoinKind,
3408) -> QueryResult {
3409    use rustc_hash::FxHashMap;
3410
3411    let n_left = left_columns.len();
3412    let n_right = right_columns.len();
3413    let mut columns = Vec::with_capacity(n_left + n_right);
3414    columns.extend(left_columns);
3415    columns.extend(right_columns);
3416
3417    // Build: right_key -> list of right-row indices. Pre-size to the row
3418    // count so the map doesn't rehash mid-build.
3419    let mut build: FxHashMap<Value, Vec<usize>> =
3420        FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3421    for (i, row) in right_rows.iter().enumerate() {
3422        // Skip Empty keys on the build side — they can never match under
3423        // SQL semantics (NULL ≠ NULL) and would collapse all nullables to
3424        // one bucket.
3425        if matches!(row[right_key_idx], Value::Empty) {
3426            continue;
3427        }
3428        build.entry(row[right_key_idx].clone()).or_default().push(i);
3429    }
3430
3431    // Reasonable starting capacity — inner joins produce ≥ left_rows.len()
3432    // rows in the common 1:1 case, left-outer always emits ≥ left_rows.len().
3433    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3434
3435    for left_row in &left_rows {
3436        let key = &left_row[left_key_idx];
3437        let matched = if matches!(key, Value::Empty) {
3438            None
3439        } else {
3440            build.get(key)
3441        };
3442        match matched {
3443            Some(matches) if !matches.is_empty() => {
3444                for &ri in matches {
3445                    let right_row = &right_rows[ri];
3446                    let mut combined = Vec::with_capacity(n_left + n_right);
3447                    combined.extend_from_slice(left_row);
3448                    combined.extend_from_slice(right_row);
3449                    rows.push(combined);
3450                }
3451            }
3452            _ => {
3453                if matches!(kind, JoinKind::LeftOuter) {
3454                    let mut row = Vec::with_capacity(n_left + n_right);
3455                    row.extend_from_slice(left_row);
3456                    row.resize(n_left + n_right, Value::Empty);
3457                    rows.push(row);
3458                }
3459            }
3460        }
3461    }
3462
3463    QueryResult::Rows { columns, rows }
3464}
3465
3466/// Lower unindexed `RangeScan` and `IndexScan` nodes to `Filter(SeqScan)`
3467/// so that all downstream fast paths (count, project+limit, sort+limit,
3468/// agg, update, delete) continue to fire.
3469///
3470/// The planner emits `RangeScan` (for `.age > 30`) and `IndexScan` (for
3471/// `.email = lit`) speculatively because it has no catalog access. When
3472/// the column has a B-tree index, those plans are correct. When it
3473/// doesn't, the executor's fallbacks materialise every matching row with
3474/// full `decode_row` — bypassing the compiled-predicate fast paths that
3475/// `Filter(SeqScan)` would trigger. Lowering both speculative leaf kinds
3476/// also keeps EXPLAIN honest: it prints the plan that actually runs.
3477///
3478/// This pass runs once per query, before execution.
3479pub(super) fn lower_unindexed_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3480    match plan {
3481        PlanNode::RangeScan {
3482            table,
3483            column,
3484            start,
3485            end,
3486        } => {
3487            if let Some(tbl) = catalog.get_table(table) {
3488                // Keep RangeScan whenever ANY index exists on the column:
3489                // unique indexes store raw column values, non-unique indexes
3490                // store composite (value, rid) keys that the executor walks
3491                // natively via BTree::range_rids. Only lower to Filter(SeqScan)
3492                // when the column is unindexed.
3493                if tbl.has_index(column) {
3494                    return plan.clone();
3495                }
3496            }
3497            let pred = synthesize_range_predicate(column, start, end);
3498            PlanNode::Filter {
3499                input: Box::new(PlanNode::SeqScan {
3500                    table: table.clone(),
3501                }),
3502                predicate: pred,
3503            }
3504        }
3505        PlanNode::Filter { input, predicate } => PlanNode::Filter {
3506            input: Box::new(lower_unindexed_scans(catalog, input)),
3507            predicate: predicate.clone(),
3508        },
3509        PlanNode::Project { input, fields } => PlanNode::Project {
3510            input: Box::new(lower_unindexed_scans(catalog, input)),
3511            fields: fields.clone(),
3512        },
3513        PlanNode::Sort { input, keys } => PlanNode::Sort {
3514            input: Box::new(lower_unindexed_scans(catalog, input)),
3515            keys: keys.clone(),
3516        },
3517        PlanNode::Limit { input, count } => PlanNode::Limit {
3518            input: Box::new(lower_unindexed_scans(catalog, input)),
3519            count: count.clone(),
3520        },
3521        PlanNode::Offset { input, count } => PlanNode::Offset {
3522            input: Box::new(lower_unindexed_scans(catalog, input)),
3523            count: count.clone(),
3524        },
3525        PlanNode::Aggregate {
3526            input,
3527            function,
3528            field,
3529        } => PlanNode::Aggregate {
3530            input: Box::new(lower_unindexed_scans(catalog, input)),
3531            function: *function,
3532            field: field.clone(),
3533        },
3534        PlanNode::Distinct { input } => PlanNode::Distinct {
3535            input: Box::new(lower_unindexed_scans(catalog, input)),
3536        },
3537        PlanNode::GroupBy {
3538            input,
3539            keys,
3540            aggregates,
3541            having,
3542        } => PlanNode::GroupBy {
3543            input: Box::new(lower_unindexed_scans(catalog, input)),
3544            keys: keys.clone(),
3545            aggregates: aggregates.clone(),
3546            having: having.clone(),
3547        },
3548        PlanNode::Update {
3549            input,
3550            table,
3551            assignments,
3552        } => PlanNode::Update {
3553            input: Box::new(lower_unindexed_scans(catalog, input)),
3554            table: table.clone(),
3555            assignments: assignments.clone(),
3556        },
3557        PlanNode::Delete { input, table } => PlanNode::Delete {
3558            input: Box::new(lower_unindexed_scans(catalog, input)),
3559            table: table.clone(),
3560        },
3561        PlanNode::Window { input, windows } => PlanNode::Window {
3562            input: Box::new(lower_unindexed_scans(catalog, input)),
3563            windows: windows.clone(),
3564        },
3565        PlanNode::Union { left, right, all } => PlanNode::Union {
3566            left: Box::new(lower_unindexed_scans(catalog, left)),
3567            right: Box::new(lower_unindexed_scans(catalog, right)),
3568            all: *all,
3569        },
3570        PlanNode::Explain { input } => PlanNode::Explain {
3571            input: Box::new(lower_unindexed_scans(catalog, input)),
3572        },
3573        PlanNode::NestedLoopJoin {
3574            left,
3575            right,
3576            on,
3577            kind,
3578        } => PlanNode::NestedLoopJoin {
3579            left: Box::new(lower_unindexed_scans(catalog, left)),
3580            right: Box::new(lower_unindexed_scans(catalog, right)),
3581            on: on.clone(),
3582            kind: *kind,
3583        },
3584        PlanNode::IndexScan { table, column, key } => {
3585            if let Some(tbl) = catalog.get_table(table) {
3586                if tbl.has_index(column) {
3587                    return plan.clone();
3588                }
3589            }
3590            PlanNode::Filter {
3591                input: Box::new(PlanNode::SeqScan {
3592                    table: table.clone(),
3593                }),
3594                predicate: Expr::BinaryOp(
3595                    Box::new(Expr::Field(column.clone())),
3596                    BinOp::Eq,
3597                    Box::new(key.clone()),
3598                ),
3599            }
3600        }
3601        // Leaf nodes: no children to recurse into.
3602        _ => plan.clone(),
3603    }
3604}
3605
3606/// Synthesize a range predicate from RangeScan bounds for the fallback path.
3607pub(super) fn synthesize_range_predicate(
3608    column: &str,
3609    start: &Option<(Expr, bool)>,
3610    end: &Option<(Expr, bool)>,
3611) -> Expr {
3612    let lower = start.as_ref().map(|(expr, inclusive)| {
3613        let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3614        Expr::BinaryOp(
3615            Box::new(Expr::Field(column.to_string())),
3616            op,
3617            Box::new(expr.clone()),
3618        )
3619    });
3620    let upper = end.as_ref().map(|(expr, inclusive)| {
3621        let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3622        Expr::BinaryOp(
3623            Box::new(Expr::Field(column.to_string())),
3624            op,
3625            Box::new(expr.clone()),
3626        )
3627    });
3628    match (lower, upper) {
3629        (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3630        (Some(l), None) => l,
3631        (None, Some(u)) => u,
3632        (None, None) => Expr::Literal(Literal::Bool(true)),
3633    }
3634}
3635
3636/// Check if a value falls within a range (used in last-resort decoded-row eval).
3637pub(super) fn range_matches(
3638    val: &Value,
3639    start: &Option<Value>,
3640    start_inc: bool,
3641    end: &Option<Value>,
3642    end_inc: bool,
3643) -> bool {
3644    if let Some(ref s) = start {
3645        if start_inc {
3646            if val < s {
3647                return false;
3648            }
3649        } else if val <= s {
3650            return false;
3651        }
3652    }
3653    if let Some(ref e) = end {
3654        if end_inc {
3655            if val > e {
3656                return false;
3657            }
3658        } else if val >= e {
3659            return false;
3660        }
3661    }
3662    true
3663}
3664
3665/// Format a `PlanNode` tree as a human-readable, indented text
3666/// representation. Used by the `EXPLAIN` command.
3667pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3668    let indent = "  ".repeat(depth);
3669    match plan {
3670        PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3671        PlanNode::AliasScan { table, alias } => {
3672            format!("{indent}AliasScan table={table} alias={alias}")
3673        }
3674        PlanNode::IndexScan { table, column, key } => {
3675            format!("{indent}IndexScan table={table} column={column} key={key:?}")
3676        }
3677        PlanNode::RangeScan {
3678            table,
3679            column,
3680            start,
3681            end,
3682        } => {
3683            let s = match start {
3684                Some((expr, inc)) => {
3685                    let op = if *inc { ">=" } else { ">" };
3686                    format!("{op}{expr:?}")
3687                }
3688                None => "unbounded".to_string(),
3689            };
3690            let e = match end {
3691                Some((expr, inc)) => {
3692                    let op = if *inc { "<=" } else { "<" };
3693                    format!("{op}{expr:?}")
3694                }
3695                None => "unbounded".to_string(),
3696            };
3697            format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3698        }
3699        PlanNode::Filter { input, predicate } => {
3700            let child = format_plan_tree(input, depth + 1);
3701            format!("{indent}Filter predicate={predicate:?}\n{child}")
3702        }
3703        PlanNode::Project { input, fields } => {
3704            let names: Vec<String> = fields
3705                .iter()
3706                .map(|f| match &f.alias {
3707                    Some(a) => format!("{a}: {:?}", f.expr),
3708                    None => format!("{:?}", f.expr),
3709                })
3710                .collect();
3711            let child = format_plan_tree(input, depth + 1);
3712            format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3713        }
3714        PlanNode::Sort { input, keys } => {
3715            let ks: Vec<String> = keys
3716                .iter()
3717                .map(|k| {
3718                    if k.descending {
3719                        format!("{} desc", k.field)
3720                    } else {
3721                        k.field.clone()
3722                    }
3723                })
3724                .collect();
3725            let child = format_plan_tree(input, depth + 1);
3726            format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3727        }
3728        PlanNode::Limit { input, count } => {
3729            let child = format_plan_tree(input, depth + 1);
3730            format!("{indent}Limit count={count:?}\n{child}")
3731        }
3732        PlanNode::Offset { input, count } => {
3733            let child = format_plan_tree(input, depth + 1);
3734            format!("{indent}Offset count={count:?}\n{child}")
3735        }
3736        PlanNode::Aggregate {
3737            input,
3738            function,
3739            field,
3740        } => {
3741            let f = field.as_deref().unwrap_or("*");
3742            let child = format_plan_tree(input, depth + 1);
3743            format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3744        }
3745        PlanNode::NestedLoopJoin {
3746            left,
3747            right,
3748            on,
3749            kind,
3750        } => {
3751            let left_child = format_plan_tree(left, depth + 1);
3752            let right_child = format_plan_tree(right, depth + 1);
3753            let on_str = match on {
3754                Some(pred) => format!("{pred:?}"),
3755                None => "none".to_string(),
3756            };
3757            format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3758        }
3759        PlanNode::Distinct { input } => {
3760            let child = format_plan_tree(input, depth + 1);
3761            format!("{indent}Distinct\n{child}")
3762        }
3763        PlanNode::GroupBy {
3764            input,
3765            keys,
3766            aggregates,
3767            having,
3768        } => {
3769            let agg_strs: Vec<String> = aggregates
3770                .iter()
3771                .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3772                .collect();
3773            let having_str = match having {
3774                Some(h) => format!(" having={h:?}"),
3775                None => String::new(),
3776            };
3777            let child = format_plan_tree(input, depth + 1);
3778            format!(
3779                "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3780                keys.join(", "),
3781                agg_strs.join(", "),
3782            )
3783        }
3784        PlanNode::Insert { table, rows } => {
3785            let cols: Vec<&str> = rows
3786                .first()
3787                .map(|r| r.iter().map(|a| a.field.as_str()).collect())
3788                .unwrap_or_default();
3789            format!(
3790                "{indent}Insert table={table} rows={} cols=[{}]",
3791                rows.len(),
3792                cols.join(", ")
3793            )
3794        }
3795        PlanNode::Upsert {
3796            table,
3797            key_column,
3798            assignments,
3799            on_conflict,
3800        } => {
3801            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3802            let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3803            if conflict_cols.is_empty() {
3804                format!(
3805                    "{indent}Upsert table={table} key={key_column} cols=[{}]",
3806                    cols.join(", ")
3807                )
3808            } else {
3809                format!(
3810                    "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3811                    cols.join(", "),
3812                    conflict_cols.join(", ")
3813                )
3814            }
3815        }
3816        PlanNode::Update {
3817            input,
3818            table,
3819            assignments,
3820        } => {
3821            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3822            let child = format_plan_tree(input, depth + 1);
3823            format!(
3824                "{indent}Update table={table} set=[{}]\n{child}",
3825                cols.join(", ")
3826            )
3827        }
3828        PlanNode::Delete { input, table } => {
3829            let child = format_plan_tree(input, depth + 1);
3830            format!("{indent}Delete table={table}\n{child}")
3831        }
3832        PlanNode::CreateTable { name, fields } => {
3833            let fs: Vec<String> = fields
3834                .iter()
3835                .map(|f| {
3836                    let mut mods = String::new();
3837                    if f.required {
3838                        mods.push_str(" required");
3839                    }
3840                    if f.unique {
3841                        mods.push_str(" unique");
3842                    }
3843                    format!("{}: {}{mods}", f.name, f.type_name)
3844                })
3845                .collect();
3846            format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3847        }
3848        PlanNode::AlterTable { table, action } => {
3849            format!("{indent}AlterTable table={table} action={action:?}")
3850        }
3851        PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3852        PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3853        PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3854        PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3855        PlanNode::Window { input, windows } => {
3856            let ws: Vec<String> = windows
3857                .iter()
3858                .map(|w| format!("{:?} as {}", w.function, w.output_name))
3859                .collect();
3860            let child = format_plan_tree(input, depth + 1);
3861            format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3862        }
3863        PlanNode::Union { left, right, all } => {
3864            let kind = if *all { "UNION ALL" } else { "UNION" };
3865            let left_child = format_plan_tree(left, depth + 1);
3866            let right_child = format_plan_tree(right, depth + 1);
3867            format!("{indent}{kind}\n{left_child}\n{right_child}")
3868        }
3869        PlanNode::Explain { input } => {
3870            let child = format_plan_tree(input, depth + 1);
3871            format!("{indent}Explain\n{child}")
3872        }
3873        PlanNode::Begin => format!("{indent}Begin"),
3874        PlanNode::Commit => format!("{indent}Commit"),
3875        PlanNode::Rollback => format!("{indent}Rollback"),
3876    }
3877}