Skip to main content

powdb_query/executor/
plan_exec.rs

1//! The execute_plan method and associated helpers.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::{ViewDef, ViewRegistry};
16
17impl Engine {
18    pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19        match plan {
20            PlanNode::SeqScan { table } => {
21                // Auto-refresh dirty materialized views on read.
22                if self.view_registry.is_dirty(table) {
23                    self.refresh_view(table)?;
24                }
25                let schema = self
26                    .catalog
27                    .schema(table)
28                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29                    .clone();
30                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31                let rows: Vec<Vec<Value>> = self
32                    .catalog
33                    .scan(table)
34                    .map_err(|e| QueryError::StorageError(e.to_string()))?
35                    .map(|(_, row)| row)
36                    .collect();
37                Ok(QueryResult::Rows { columns, rows })
38            }
39
40            PlanNode::Filter { input, predicate } => {
41                // Materialize any IN-subqueries in the predicate before the
42                // scan loop — the closure can't call back into the engine.
43                // Correlated subqueries are left in place for per-row eval.
44                let materialized;
45                let predicate = if contains_subquery(predicate) {
46                    materialized = self.materialize_subqueries(predicate)?;
47                    &materialized
48                } else {
49                    predicate
50                };
51
52                // Correlated subquery path: per-row materialisation.
53                if contains_subquery(predicate) {
54                    let result = self.execute_plan(input)?;
55                    return match result {
56                        QueryResult::Rows { columns, rows } => {
57                            let mut filtered = Vec::new();
58                            for row in rows {
59                                let row_pred =
60                                    self.materialize_correlated_for_row(predicate, &row, &columns)?;
61                                if eval_predicate(&row_pred, &row, &columns) {
62                                    filtered.push(row);
63                                }
64                            }
65                            Ok(QueryResult::Rows {
66                                columns,
67                                rows: filtered,
68                            })
69                        }
70                        _ => Err("filter requires row input".into()),
71                    };
72                }
73
74                // Fast path: fuse Filter + SeqScan into a zero-copy streaming
75                // loop. Uses decode_column() to evaluate the predicate on only
76                // the columns it references, avoiding heap allocations for
77                // String/Bytes columns that aren't part of the filter.
78                if let PlanNode::SeqScan { table } = input.as_ref() {
79                    // Auto-refresh dirty materialized views.
80                    if self.view_registry.is_dirty(table) {
81                        self.refresh_view(table)?;
82                    }
83                    let schema = self
84                        .catalog
85                        .schema(table)
86                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
87                        .clone();
88                    let columns: Vec<String> =
89                        schema.columns.iter().map(|c| c.name.clone()).collect();
90                    let fast = FastLayout::new(&schema);
91                    let row_layout = RowLayout::new(&schema);
92                    // Mission F: pre-size to skip the first 4 Vec doublings
93                    // (4 → 8 → 16 → 32 → 64). On a 100K-row scan with 30%
94                    // selectivity that's ~4 fewer reallocations + memcpys.
95                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
96
97                    // Try compiled predicate for the filter check (handles
98                    // int leaves, string-eq leaves, and And conjunctions).
99                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
100                        self.catalog
101                            .for_each_row_raw(table, |_rid, data| {
102                                if compiled(data) {
103                                    rows.push(decode_row(&schema, data));
104                                }
105                            })
106                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
107                    } else {
108                        let pred_cols = predicate_column_indices(predicate, &columns);
109                        self.catalog
110                            .for_each_row_raw(table, |_rid, data| {
111                                let pred_row =
112                                    decode_selective(&schema, &row_layout, data, &pred_cols);
113                                if eval_predicate(predicate, &pred_row, &columns) {
114                                    rows.push(decode_row(&schema, data));
115                                }
116                            })
117                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
118                    }
119
120                    return Ok(QueryResult::Rows { columns, rows });
121                }
122
123                // General path: materialise then filter.
124                let result = self.execute_plan(input)?;
125                match result {
126                    QueryResult::Rows { columns, rows } => {
127                        let filtered: Vec<Vec<Value>> = rows
128                            .into_iter()
129                            .filter(|row| eval_predicate(predicate, row, &columns))
130                            .collect();
131                        Ok(QueryResult::Rows {
132                            columns,
133                            rows: filtered,
134                        })
135                    }
136                    _ => Err("filter requires row input".into()),
137                }
138            }
139
140            PlanNode::Project { input, fields } => {
141                // Fast path: Project over IndexScan — decode only projected
142                // columns from raw bytes instead of full decode_row.
143                if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144                    let schema = self
145                        .catalog
146                        .schema(table)
147                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148                        .clone();
149                    let all_columns: Vec<String> =
150                        schema.columns.iter().map(|c| c.name.clone()).collect();
151                    let key_value = literal_to_value(key)?;
152                    let tbl = self
153                        .catalog
154                        .get_table(table)
155                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157                    let proj_columns: Vec<String> = fields
158                        .iter()
159                        .map(|f| {
160                            f.alias.clone().unwrap_or_else(|| match &f.expr {
161                                Expr::Field(name) => name.clone(),
162                                _ => "?".into(),
163                            })
164                        })
165                        .collect();
166
167                    // Determine which column indices the projection needs
168                    let proj_indices: Vec<usize> = fields
169                        .iter()
170                        .filter_map(|f| {
171                            if let Expr::Field(name) = &f.expr {
172                                all_columns.iter().position(|c| c == name)
173                            } else {
174                                None
175                            }
176                        })
177                        .collect();
178
179                    if tbl.has_index(column) {
180                        let layout = RowLayout::new(&schema);
181                        let rids = tbl.index_lookup_all(column, &key_value);
182                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
183                        for rid in rids {
184                            if let Some(data) = tbl.heap.get(rid) {
185                                let row: Vec<Value> = proj_indices
186                                    .iter()
187                                    .map(|&ci| decode_column(&schema, &layout, &data, ci))
188                                    .collect();
189                                rows.push(row);
190                            }
191                        }
192                        return Ok(QueryResult::Rows {
193                            columns: proj_columns,
194                            rows,
195                        });
196                    }
197                }
198
199                // Fast path: Project(Limit(Sort(Filter(SeqScan)))) — bounded
200                // top-N heap. Decodes only the sort key + projected columns,
201                // keeps at most `limit` rows in a heap. Also handles the
202                // Project(Limit(Sort(SeqScan))) variant (no filter).
203                if let PlanNode::Limit {
204                    input: inner,
205                    count: limit_expr,
206                } = input.as_ref()
207                {
208                    if let PlanNode::Sort {
209                        input: sort_input,
210                        keys,
211                    } = inner.as_ref()
212                    {
213                        // Fast path only for single-key sorts
214                        if keys.len() == 1 {
215                            let sort_field = &keys[0].field;
216                            let descending = keys[0].descending;
217                            let limit = match limit_expr {
218                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
219                                _ => usize::MAX,
220                            };
221                            let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
222                                match sort_input.as_ref() {
223                                    PlanNode::SeqScan { table } => (Some(table.as_str()), None),
224                                    PlanNode::Filter {
225                                        input: fi,
226                                        predicate,
227                                    } => {
228                                        if let PlanNode::SeqScan { table } = fi.as_ref() {
229                                            (Some(table.as_str()), Some(predicate))
230                                        } else {
231                                            (None, None)
232                                        }
233                                    }
234                                    _ => (None, None),
235                                };
236                            if let Some(table) = table_opt {
237                                if let Some(result) = self.project_filter_sort_limit_fast(
238                                    table, fields, sort_field, descending, limit, pred_opt,
239                                )? {
240                                    return Ok(result);
241                                }
242                            }
243                        }
244                    }
245                    // Fast path: Project(Limit(Filter(SeqScan))) — stream,
246                    // decode only projected columns, stop at limit.
247                    if let PlanNode::Filter {
248                        input: fi,
249                        predicate,
250                    } = inner.as_ref()
251                    {
252                        if let PlanNode::SeqScan { table } = fi.as_ref() {
253                            let limit = match limit_expr {
254                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
255                                _ => usize::MAX,
256                            };
257                            if let Some(result) = self.project_filter_limit_fast(
258                                table,
259                                fields,
260                                limit,
261                                Some(predicate),
262                            )? {
263                                return Ok(result);
264                            }
265                        }
266                    }
267                    // Fast path: Project(Limit(SeqScan)) — stream, no filter.
268                    if let PlanNode::SeqScan { table } = inner.as_ref() {
269                        let limit = match limit_expr {
270                            Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
271                            _ => usize::MAX,
272                        };
273                        if let Some(result) =
274                            self.project_filter_limit_fast(table, fields, limit, None)?
275                        {
276                            return Ok(result);
277                        }
278                    }
279                }
280
281                // Mission D4: Project(Filter(SeqScan)) without Limit. Reuses
282                // `project_filter_limit_fast` with limit = usize::MAX so the
283                // hot loop decodes only projected columns and uses the
284                // compiled predicate. Previously this fell through to the
285                // generic Filter branch which materialised every column via
286                // `decode_row` then re-projected — quadratic work.
287                //
288                // multi_col_and_filter (`U filter .age > 30 and .status =
289                // "active" { .name, .age }`) was 6.18ms (0.7x SQLite) and
290                // is the load-bearing workload for this fast path.
291                if let PlanNode::Filter {
292                    input: fi,
293                    predicate,
294                } = input.as_ref()
295                {
296                    if let PlanNode::SeqScan { table } = fi.as_ref() {
297                        if let Some(result) = self.project_filter_limit_fast(
298                            table,
299                            fields,
300                            usize::MAX,
301                            Some(predicate),
302                        )? {
303                            return Ok(result);
304                        }
305                    }
306                }
307
308                // Mission D4: Project(SeqScan) without Filter or Limit.
309                // Decode only projected columns; the previous fall-through
310                // built full Vec<Value> rows then re-projected.
311                if let PlanNode::SeqScan { table } = input.as_ref() {
312                    if let Some(result) =
313                        self.project_filter_limit_fast(table, fields, usize::MAX, None)?
314                    {
315                        return Ok(result);
316                    }
317                }
318
319                let result = self.execute_plan(input)?;
320                match result {
321                    QueryResult::Rows { columns, rows } => {
322                        let proj_columns: Vec<String> = fields
323                            .iter()
324                            .map(|f| {
325                                f.alias.clone().unwrap_or_else(|| match &f.expr {
326                                    Expr::Field(name) => name.clone(),
327                                    // Mission E1.2: `{ u.name }` projects as the
328                                    // qualified column name so callers can still
329                                    // disambiguate across the join output.
330                                    Expr::QualifiedField { qualifier, field } => {
331                                        format!("{qualifier}.{field}")
332                                    }
333                                    _ => "?".into(),
334                                })
335                            })
336                            .collect();
337                        let proj_rows: Vec<Vec<Value>> = rows
338                            .iter()
339                            .map(|row| {
340                                fields
341                                    .iter()
342                                    .map(|f| eval_expr(&f.expr, row, &columns))
343                                    .collect()
344                            })
345                            .collect();
346                        Ok(QueryResult::Rows {
347                            columns: proj_columns,
348                            rows: proj_rows,
349                        })
350                    }
351                    _ => Err("project requires row input".into()),
352                }
353            }
354
355            PlanNode::Sort { input, keys } => {
356                let result = self.execute_plan(input)?;
357                match result {
358                    QueryResult::Rows { columns, mut rows } => {
359                        // WS2: row-count cap is a cheap secondary guard; the
360                        // byte budget is the real OOM defense for the sort
361                        // buffer (a few very large rows pass the row cap).
362                        if rows.len() > MAX_SORT_ROWS {
363                            return Err(QueryError::SortLimitExceeded);
364                        }
365                        self.charge_rows(&rows)?;
366                        let key_indices: Vec<(usize, bool)> = keys
367                            .iter()
368                            .map(|k| {
369                                columns
370                                    .iter()
371                                    .position(|c| c == &k.field)
372                                    .map(|idx| (idx, k.descending))
373                                    .ok_or_else(|| QueryError::ColumnNotFound {
374                                        table: String::new(),
375                                        column: k.field.clone(),
376                                    })
377                            })
378                            .collect::<Result<_, QueryError>>()?;
379                        rows.sort_by(|a, b| {
380                            for &(col_idx, descending) in &key_indices {
381                                let cmp = a[col_idx].cmp(&b[col_idx]);
382                                let cmp = if descending { cmp.reverse() } else { cmp };
383                                if cmp != std::cmp::Ordering::Equal {
384                                    return cmp;
385                                }
386                            }
387                            std::cmp::Ordering::Equal
388                        });
389                        Ok(QueryResult::Rows { columns, rows })
390                    }
391                    _ => Err("sort requires row input".into()),
392                }
393            }
394
395            PlanNode::Limit { input, count } => {
396                let result = self.execute_plan(input)?;
397                let n = match count {
398                    Expr::Literal(Literal::Int(v)) => *v as usize,
399                    _ => return Err("limit must be integer literal".into()),
400                };
401                match result {
402                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
403                        columns,
404                        rows: rows.into_iter().take(n).collect(),
405                    }),
406                    _ => Err("limit requires row input".into()),
407                }
408            }
409
410            PlanNode::Offset { input, count } => {
411                let result = self.execute_plan(input)?;
412                let n = match count {
413                    Expr::Literal(Literal::Int(v)) => *v as usize,
414                    _ => return Err("offset must be integer literal".into()),
415                };
416                match result {
417                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
418                        columns,
419                        rows: rows.into_iter().skip(n).collect(),
420                    }),
421                    _ => Err("offset requires row input".into()),
422                }
423            }
424
425            PlanNode::Aggregate {
426                input,
427                function,
428                field,
429            } => {
430                // Fast path: count() over SeqScan — count rows without any decode
431                if *function == AggFunc::Count {
432                    if let PlanNode::SeqScan { table } = input.as_ref() {
433                        // Auto-refresh a dirty materialized view before
434                        // counting it — otherwise count(View) returns stale
435                        // data after an underlying mutation (F3).
436                        if self.view_registry.is_dirty(table) {
437                            self.refresh_view(table)?;
438                        }
439                        let mut count: i64 = 0;
440                        self.catalog
441                            .for_each_row_raw(table, |_rid, _data| {
442                                count += 1;
443                            })
444                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
445                        return Ok(QueryResult::Scalar(Value::Int(count)));
446                    }
447                    // Fast path: count() over Filter(SeqScan) — try compiled
448                    // predicate first, fall back to decode_column path.
449                    // Skip a predicate carrying a subquery: the raw-bytes
450                    // evaluators here don't materialise subqueries, so
451                    // `count(T filter .x in (...))` would silently count 0
452                    // (F1). Falling through routes it to the generic path
453                    // that resolves the subquery correctly.
454                    if let PlanNode::Filter {
455                        input: inner,
456                        predicate,
457                    } = input.as_ref()
458                    {
459                        if let PlanNode::SeqScan { table } = inner.as_ref() {
460                            if self.view_registry.is_dirty(table) {
461                                self.refresh_view(table)?;
462                            }
463                        }
464                        if let (PlanNode::SeqScan { table }, false) =
465                            (inner.as_ref(), contains_subquery(predicate))
466                        {
467                            let schema = self
468                                .catalog
469                                .schema(table)
470                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
471                                .clone();
472                            let columns: Vec<String> =
473                                schema.columns.iter().map(|c| c.name.clone()).collect();
474                            let fast = FastLayout::new(&schema);
475                            let row_layout = RowLayout::new(&schema);
476
477                            // Try compiled predicate (zero-allocation hot path).
478                            // Handles int leaves, string-eq leaves, AND conjunctions.
479                            if let Some(compiled) =
480                                compile_predicate(predicate, &columns, &fast, &schema)
481                            {
482                                let mut count: i64 = 0;
483                                self.catalog
484                                    .for_each_row_raw(table, |_rid, data| {
485                                        if compiled(data) {
486                                            count += 1;
487                                        }
488                                    })
489                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
490                                return Ok(QueryResult::Scalar(Value::Int(count)));
491                            }
492
493                            // Fallback: decode predicate columns
494                            let pred_cols = predicate_column_indices(predicate, &columns);
495                            let mut count: i64 = 0;
496                            self.catalog
497                                .for_each_row_raw(table, |_rid, data| {
498                                    let pred_row =
499                                        decode_selective(&schema, &row_layout, data, &pred_cols);
500                                    if eval_predicate(predicate, &pred_row, &columns) {
501                                        count += 1;
502                                    }
503                                })
504                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
505
506                            return Ok(QueryResult::Scalar(Value::Int(count)));
507                        }
508                    }
509                }
510
511                // Fast path: sum/avg/min/max over a single fixed-size int
512                // column with an optional compiled filter predicate. Walks
513                // raw row bytes, zero allocation per row.
514                if matches!(
515                    function,
516                    AggFunc::Sum
517                        | AggFunc::Avg
518                        | AggFunc::Min
519                        | AggFunc::Max
520                        | AggFunc::CountDistinct
521                ) {
522                    if let Some(col) = field.as_ref() {
523                        // Shape: Aggregate(SeqScan) or Aggregate(Filter(SeqScan))
524                        let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
525                            match input.as_ref() {
526                                PlanNode::SeqScan { table } => (Some(table.as_str()), None),
527                                PlanNode::Filter {
528                                    input: inner,
529                                    predicate,
530                                } => {
531                                    if let PlanNode::SeqScan { table } = inner.as_ref() {
532                                        (Some(table.as_str()), Some(predicate))
533                                    } else {
534                                        (None, None)
535                                    }
536                                }
537                                _ => (None, None),
538                            };
539                        if let Some(table) = table_opt {
540                            if let Some(result) =
541                                self.agg_single_col_fast(table, col, *function, pred_opt)?
542                            {
543                                return Ok(result);
544                            }
545                        }
546                    }
547                }
548
549                // Fast path: Project(Limit(Filter(SeqScan))) — stream, decode
550                // only projected columns, stop once we hit the limit.
551                // (Handled in the Project branch; this branch only fires when
552                // the aggregate is the outer node.)
553                let result = self.execute_plan(input)?;
554                match result {
555                    QueryResult::Rows { columns, rows } => {
556                        match function {
557                            AggFunc::Count => {
558                                Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
559                            }
560                            AggFunc::CountDistinct => {
561                                let col = field.as_ref().ok_or("count distinct requires field")?;
562                                let idx = columns
563                                    .iter()
564                                    .position(|c| c == col)
565                                    .ok_or("col not found")?;
566                                let mut seen = std::collections::HashSet::new();
567                                for row in &rows {
568                                    let v = &row[idx];
569                                    if !v.is_empty() {
570                                        seen.insert(v.clone());
571                                    }
572                                }
573                                Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
574                            }
575                            AggFunc::Avg => {
576                                let col = field.as_ref().ok_or("avg requires field")?;
577                                let idx = columns
578                                    .iter()
579                                    .position(|c| c == col)
580                                    .ok_or("col not found")?;
581                                let sum: f64 = rows
582                                    .iter()
583                                    .filter_map(|r| match &r[idx] {
584                                        Value::Int(v) => Some(*v as f64),
585                                        Value::Float(v) => Some(*v),
586                                        _ => None,
587                                    })
588                                    .sum();
589                                let count = rows.len() as f64;
590                                Ok(QueryResult::Scalar(Value::Float(sum / count)))
591                            }
592                            AggFunc::Sum => {
593                                let col = field.as_ref().ok_or("sum requires field")?;
594                                let idx = columns
595                                    .iter()
596                                    .position(|c| c == col)
597                                    .ok_or("col not found")?;
598                                // Track int and float contributions separately so
599                                // Float columns (and mixed Int/Float rows) don't get
600                                // silently dropped as they did in the Int-only
601                                // version. If any Float is present, the whole sum
602                                // promotes to Float — matching Avg's semantics.
603                                let mut int_sum: i64 = 0;
604                                let mut float_sum: f64 = 0.0;
605                                let mut saw_float = false;
606                                for r in &rows {
607                                    match &r[idx] {
608                                        Value::Int(v) => int_sum += *v,
609                                        Value::Float(v) => {
610                                            float_sum += *v;
611                                            saw_float = true;
612                                        }
613                                        _ => {}
614                                    }
615                                }
616                                let result = if saw_float {
617                                    Value::Float(float_sum + int_sum as f64)
618                                } else {
619                                    Value::Int(int_sum)
620                                };
621                                Ok(QueryResult::Scalar(result))
622                            }
623                            AggFunc::Min | AggFunc::Max => {
624                                let col = field.as_ref().ok_or("min/max requires field")?;
625                                let idx = columns
626                                    .iter()
627                                    .position(|c| c == col)
628                                    .ok_or("col not found")?;
629                                let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
630                                let result = if *function == AggFunc::Min {
631                                    vals.into_iter().min().cloned()
632                                } else {
633                                    vals.into_iter().max().cloned()
634                                };
635                                Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
636                            }
637                        }
638                    }
639                    _ => Err("aggregate requires row input".into()),
640                }
641            }
642
643            PlanNode::Insert { table, rows } => {
644                // Build + validate EVERY row before inserting any, so a bad
645                // row (unknown/missing/uncoercible field) aborts the whole
646                // statement without a partial write. The WAL fsync happens
647                // once at statement end, so N rows = N appends + 1 fsync.
648                let all_values: Vec<Vec<Value>> = {
649                    let schema = self
650                        .catalog
651                        .schema(table)
652                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
653                    let mut all = Vec::with_capacity(rows.len());
654                    for assignments in rows {
655                        let mut values = vec![Value::Empty; schema.columns.len()];
656                        for a in assignments {
657                            let idx = schema.column_index(&a.field).ok_or_else(|| {
658                                QueryError::ColumnNotFound {
659                                    table: String::new(),
660                                    column: a.field.clone(),
661                                }
662                            })?;
663                            let raw = literal_to_value(&a.value)?;
664                            values[idx] = coerce_value(raw, &schema.columns[idx])?;
665                        }
666                        for col in &schema.columns {
667                            if col.required && matches!(values[col.position as usize], Value::Empty)
668                            {
669                                return Err(QueryError::Execution(format!(
670                                    "column '{}' is required but no value was provided",
671                                    col.name
672                                )));
673                            }
674                        }
675                        all.push(values);
676                    }
677                    all
678                };
679                // Charge the materialized batch against the per-query memory
680                // budget before inserting — keeps multi-row insert consistent
681                // with every other full-materialization point (sort/join/group)
682                // and bounds embedded callers (the server also caps the query
683                // string at 1 MB, but embedded callers have no such limit).
684                self.charge_rows(&all_values)?;
685                let n = all_values.len() as u64;
686                for values in &all_values {
687                    self.catalog
688                        .insert(table, values)
689                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
690                }
691                self.view_registry.mark_dependents_dirty(table);
692                Ok(QueryResult::Modified(n))
693            }
694
695            PlanNode::Upsert {
696                table,
697                key_column,
698                assignments,
699                on_conflict,
700            } => {
701                let (values, key_idx) = {
702                    let schema = self
703                        .catalog
704                        .schema(table)
705                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
706                    let mut values = vec![Value::Empty; schema.columns.len()];
707                    for a in assignments {
708                        let idx = schema.column_index(&a.field).ok_or_else(|| {
709                            QueryError::ColumnNotFound {
710                                table: String::new(),
711                                column: a.field.clone(),
712                            }
713                        })?;
714                        let raw = literal_to_value(&a.value)?;
715                        values[idx] = coerce_value(raw, &schema.columns[idx])?;
716                    }
717                    for col in &schema.columns {
718                        if col.required && matches!(values[col.position as usize], Value::Empty) {
719                            return Err(QueryError::Execution(format!(
720                                "column '{}' is required but no value was provided",
721                                col.name
722                            )));
723                        }
724                    }
725                    let key_idx = schema
726                        .column_index(key_column)
727                        .ok_or_else(|| format!("key column '{key_column}' not found"))?;
728                    (values, key_idx)
729                };
730
731                // Upsert requires the `on` column to be unique — otherwise
732                // there is no well-defined row to overwrite and a plain
733                // insert could silently create duplicate keys.
734                if self.catalog.is_index_unique(table, key_column) != Some(true) {
735                    return Err(QueryError::Execution(format!(
736                        "upsert on .{key_column} requires a unique column (declare it with \
737                         `unique {key_column}: <type>` or `alter {table} add unique .{key_column}`)"
738                    )));
739                }
740
741                let key_value = values[key_idx].clone();
742
743                // Probe the unique index for a conflict.
744                let existing = {
745                    let tbl = self
746                        .catalog
747                        .get_table(table)
748                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
749                    // The key column is guaranteed unique above, so this
750                    // returns at most one matching row.
751                    let rids = tbl.index_lookup_all(key_column, &key_value);
752                    rids.into_iter().next().and_then(|rid| {
753                        tbl.heap
754                            .get(rid)
755                            .map(|data| (rid, decode_row(&tbl.schema, &data)))
756                    })
757                };
758
759                if let Some((rid, mut existing_row)) = existing {
760                    // Conflict: apply on_conflict assignments (or all non-key if empty).
761                    let update_assignments = if on_conflict.is_empty() {
762                        assignments
763                    } else {
764                        on_conflict
765                    };
766                    let changed_cols: Vec<usize> = {
767                        let schema = self
768                            .catalog
769                            .schema(table)
770                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
771                        let mut indices = Vec::new();
772                        for a in update_assignments {
773                            let idx = schema.column_index(&a.field).ok_or_else(|| {
774                                QueryError::ColumnNotFound {
775                                    table: String::new(),
776                                    column: a.field.clone(),
777                                }
778                            })?;
779                            if idx != key_idx {
780                                existing_row[idx] = literal_to_value(&a.value)?;
781                                indices.push(idx);
782                            }
783                        }
784                        indices
785                    };
786                    self.catalog
787                        .update_hinted(table, rid, &existing_row, Some(&changed_cols))
788                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
789                    self.view_registry.mark_dependents_dirty(table);
790                    Ok(QueryResult::Modified(1))
791                } else {
792                    // No conflict: insert.
793                    self.catalog
794                        .insert(table, &values)
795                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
796                    self.view_registry.mark_dependents_dirty(table);
797                    Ok(QueryResult::Modified(1))
798                }
799            }
800
801            PlanNode::Update {
802                input,
803                table,
804                assignments,
805            } => {
806                // Mission C Phase 3: resolve assignments against a borrowed
807                // schema, then drop the borrow before the mutation loop.
808                // Try literal-only path first; fall back to per-row expression
809                // evaluation if any assignment contains a non-literal expression
810                // (e.g., `age := .age + 1`).
811                let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
812                    let schema_ref = self
813                        .catalog
814                        .schema(table)
815                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
816                    let indices: Vec<usize> = assignments
817                        .iter()
818                        .map(|a| {
819                            schema_ref.column_index(&a.field).ok_or_else(|| {
820                                QueryError::ColumnNotFound {
821                                    table: String::new(),
822                                    column: a.field.clone(),
823                                }
824                            })
825                        })
826                        .collect::<Result<_, _>>()?;
827                    let vals: Result<Vec<Value>, _> = assignments
828                        .iter()
829                        .map(|a| literal_to_value(&a.value))
830                        .collect();
831                    (indices, vals.ok())
832                };
833                let resolved_assignments: Option<Vec<(usize, Value)>> =
834                    literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
835
836                // Mission C Phase 2: the hint Table::update_hinted needs to
837                // decide whether to read the old row for index diff.
838                let changed_cols: Vec<usize> = col_indices.clone();
839
840                // ── Fused scan+update for Update(Filter(SeqScan)) ────────
841                // Perf sprint: instead of the two-pass collect-RIDs-then-loop
842                // pattern (which pays one ensure_hot per matched row on the
843                // second pass), fuse the predicate evaluation and in-place
844                // byte-level mutation into a single heap walk. Same idea as
845                // the fused scan_delete_matching path for deletes.
846                if let Some(ref resolved_assignments) = resolved_assignments {
847                    if let PlanNode::Filter {
848                        input: inner,
849                        predicate,
850                    } = input.as_ref()
851                    {
852                        if let PlanNode::SeqScan { table: t } = inner.as_ref() {
853                            if t == table {
854                                let fused_result = self.try_fused_scan_update(
855                                    table,
856                                    predicate,
857                                    resolved_assignments,
858                                    &changed_cols,
859                                );
860                                if let Some(result) = fused_result {
861                                    return result;
862                                }
863                            }
864                        }
865                    }
866                }
867
868                // Collect matching RowIds in a single pass.
869                let matching_rids = self.collect_rids_for_mutation(input, table)?;
870
871                // ── Literal-only fast paths ─────────────────────────────
872                if let Some(ref resolved_assignments) = resolved_assignments {
873                    // Mission C Phase 4: in-place byte-patch fast path. If every
874                    // assignment targets a fixed-size non-null column AND none of
875                    // them is indexed, we can skip decode_row / Vec<Value> /
876                    // encode_row_into entirely and patch the row's raw bytes on
877                    // the hot page.
878                    let fast_patch: Option<Vec<FastPatch>> = {
879                        let tbl = self
880                            .catalog
881                            .get_table(table)
882                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
883                        let schema = &tbl.schema;
884                        let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
885                            is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
886                        });
887                        let no_indexed = !resolved_assignments
888                            .iter()
889                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
890
891                        if all_fixed_nonnull && no_indexed {
892                            let layout = RowLayout::new(schema);
893                            let bitmap_size = layout.bitmap_size();
894                            let patches: Vec<FastPatch> = resolved_assignments
895                                .iter()
896                                .map(|(idx, val)| {
897                                    let fixed_off = layout
898                                        .fixed_offset(*idx)
899                                        .expect("is_fixed_size already checked");
900                                    let field_off = 2 + bitmap_size + fixed_off;
901                                    let bytes: FixedBytes = match val {
902                                        Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
903                                        Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
904                                        Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
905                                        Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
906                                        Value::Uuid(v) => FixedBytes::Uuid(*v),
907                                        _ => unreachable!("all_fixed_nonnull guard lied"),
908                                    };
909                                    FastPatch {
910                                        field_off,
911                                        bitmap_byte_off: 2 + idx / 8,
912                                        bit_mask: 1u8 << (idx % 8),
913                                        bytes,
914                                    }
915                                })
916                                .collect();
917                            Some(patches)
918                        } else {
919                            None
920                        }
921                    };
922
923                    if let Some(patches) = fast_patch {
924                        let mut count = 0u64;
925                        for rid in matching_rids {
926                            // Mission B2: WAL-log every patch so crash
927                            // recovery replays the update. Same mutation
928                            // closure as before — the wrapper just sandwiches
929                            // it between a hot-page read and a WAL append.
930                            let ok = self
931                                .catalog
932                                .update_row_bytes_logged(table, rid, |row| {
933                                    for p in &patches {
934                                        row[p.bitmap_byte_off] &= !p.bit_mask;
935                                        let field_bytes = p.bytes.as_slice();
936                                        row[p.field_off..p.field_off + field_bytes.len()]
937                                            .copy_from_slice(field_bytes);
938                                    }
939                                })
940                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
941                            if ok {
942                                count += 1;
943                            }
944                        }
945                        self.view_registry.mark_dependents_dirty(table);
946                        return Ok(QueryResult::Modified(count));
947                    }
948
949                    // Mission C Phase 10: var-column in-place shrink fast path.
950                    let var_fast: Option<(usize, Option<Vec<u8>>)> = {
951                        let tbl = self
952                            .catalog
953                            .get_table(table)
954                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
955                        let schema = &tbl.schema;
956                        let is_single = resolved_assignments.len() == 1;
957                        let is_var_col = is_single
958                            && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
959                        let no_indexed = !resolved_assignments
960                            .iter()
961                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
962
963                        if is_single && is_var_col && no_indexed {
964                            let (idx, val) = &resolved_assignments[0];
965                            let bytes_opt: Option<Vec<u8>> = match val {
966                                Value::Str(s) => Some(s.as_bytes().to_vec()),
967                                Value::Bytes(b) => Some(b.clone()),
968                                Value::Empty => None,
969                                _ => {
970                                    return Err(QueryError::TypeError(format!(
971                                        "cannot assign non-var value to var column '{}'",
972                                        schema.columns[*idx].name
973                                    )))
974                                }
975                            };
976                            Some((*idx, bytes_opt))
977                        } else {
978                            None
979                        }
980                    };
981
982                    if let Some((col_idx, new_bytes_opt)) = var_fast {
983                        let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
984                        let mut count = 0u64;
985                        let mut fallback_rids: Vec<RowId> = Vec::new();
986                        for rid in &matching_rids {
987                            // Mission B2: logged variant so crash recovery
988                            // replays the shrink. On a false return (row
989                            // would have to grow), the rid is pushed to
990                            // `fallback_rids` and the slower `update_hinted`
991                            // path — which is already WAL-logged — picks it up.
992                            let ok = self
993                                .catalog
994                                .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
995                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
996                            if ok {
997                                count += 1;
998                            } else {
999                                fallback_rids.push(*rid);
1000                            }
1001                        }
1002                        for rid in fallback_rids {
1003                            let mut row = match self.catalog.get(table, rid) {
1004                                Some(r) => r,
1005                                None => continue,
1006                            };
1007                            for (idx, val) in resolved_assignments.iter() {
1008                                row[*idx] = val.clone();
1009                            }
1010                            self.catalog
1011                                .update_hinted(table, rid, &row, Some(&changed_cols))
1012                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1013                            count += 1;
1014                        }
1015                        self.view_registry.mark_dependents_dirty(table);
1016                        return Ok(QueryResult::Modified(count));
1017                    }
1018
1019                    // Generic literal path: decode row, apply literal values.
1020                    let mut count = 0u64;
1021                    for rid in matching_rids {
1022                        let mut row = match self.catalog.get(table, rid) {
1023                            Some(r) => r,
1024                            None => continue,
1025                        };
1026                        for (idx, val) in resolved_assignments.iter() {
1027                            row[*idx] = val.clone();
1028                        }
1029                        self.catalog
1030                            .update_hinted(table, rid, &row, Some(&changed_cols))
1031                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1032                        count += 1;
1033                    }
1034                    self.view_registry.mark_dependents_dirty(table);
1035                    return Ok(QueryResult::Modified(count));
1036                } // end if let Some(resolved_assignments)
1037
1038                // ── Expression-based update path ────────────────────────
1039                // At least one assignment contains a non-literal expression
1040                // (e.g., `age := .age + 1`). Evaluate per-row.
1041                let col_names: Vec<String> = {
1042                    let schema_ref = self
1043                        .catalog
1044                        .schema(table)
1045                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1046                    schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1047                };
1048                let mut count = 0u64;
1049                for rid in matching_rids {
1050                    let mut row = match self.catalog.get(table, rid) {
1051                        Some(r) => r,
1052                        None => continue,
1053                    };
1054                    for (i, asgn) in assignments.iter().enumerate() {
1055                        let val = eval_expr(&asgn.value, &row, &col_names);
1056                        row[col_indices[i]] = val;
1057                    }
1058                    self.catalog
1059                        .update_hinted(table, rid, &row, Some(&changed_cols))
1060                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1061                    count += 1;
1062                }
1063                self.view_registry.mark_dependents_dirty(table);
1064                Ok(QueryResult::Modified(count))
1065            }
1066
1067            PlanNode::Delete { input, table } => {
1068                // Mission C Phase 3: no schema clone — collect_rids_for_mutation
1069                // looks up schema internally when it needs one, and the mutation
1070                // loop doesn't need the schema at all.
1071                //
1072                // Mission C Phase 12: route bulk deletes through
1073                // `Catalog::delete_many`, which batches the btree leaf
1074                // compaction and shares one `ensure_hot` per row between
1075                // the index-key extraction and the slot delete. On
1076                // `delete_by_filter` (100K fixture, ~20K matches) that
1077                // removes ~4ms of pure `Vec::remove` memmove from the btree
1078                // maintenance phase.
1079                //
1080                // Mission C Phase 16: for the common `delete where ...`
1081                // shape (Filter(SeqScan)) — and the rarer "delete
1082                // everything" shape (SeqScan) — skip the two-pass
1083                // `collect_rids_for_mutation` + `delete_many` flow entirely.
1084                // The fused `scan_delete_matching` primitive walks the
1085                // heap exactly once, paying one `ensure_hot` per page
1086                // instead of per-row. That closes the last major gap on
1087                // the bench's `delete_by_filter` workload.
1088                if let PlanNode::Filter {
1089                    input: inner,
1090                    predicate,
1091                } = input.as_ref()
1092                {
1093                    if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1094                        if t == table {
1095                            let schema = self
1096                                .catalog
1097                                .schema(table)
1098                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1099                            let columns: Vec<String> =
1100                                schema.columns.iter().map(|c| c.name.clone()).collect();
1101                            let fast = FastLayout::new(schema);
1102                            if let Some(compiled) =
1103                                compile_predicate(predicate, &columns, &fast, schema)
1104                            {
1105                                // Mission B2: logged variant so every
1106                                // matched rid hits the WAL during the
1107                                // single-pass scan. Structure of the
1108                                // fused scan is unchanged — only the
1109                                // hook closure now also appends.
1110                                let count = self
1111                                    .catalog
1112                                    .scan_delete_matching_logged(table, |data| compiled(data))
1113                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1114                                self.view_registry.mark_dependents_dirty(table);
1115                                return Ok(QueryResult::Modified(count));
1116                            }
1117                        }
1118                    }
1119                } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1120                    if t == table {
1121                        // `delete from T` with no predicate — every live
1122                        // row matches. One pass is still the right shape.
1123                        // Mission B2: logged variant — see above.
1124                        let count = self
1125                            .catalog
1126                            .scan_delete_matching_logged(table, |_| true)
1127                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1128                        self.view_registry.mark_dependents_dirty(table);
1129                        return Ok(QueryResult::Modified(count));
1130                    }
1131                }
1132
1133                let matching_rids = self.collect_rids_for_mutation(input, table)?;
1134                let count = self
1135                    .catalog
1136                    .delete_many(table, &matching_rids)
1137                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1138                self.view_registry.mark_dependents_dirty(table);
1139                Ok(QueryResult::Modified(count))
1140            }
1141
1142            PlanNode::AliasScan { table, alias } => {
1143                // Mission E1.2: scan `table` and rename every output column
1144                // to `alias.field`. Used as a join leaf so downstream
1145                // NestedLoopJoin + Filter + Project nodes can resolve
1146                // `Expr::QualifiedField` lookups by direct column-name match.
1147                //
1148                // We don't bother with a fused zero-copy loop here yet — the
1149                // whole join path is nested-loop and correctness-first
1150                // (Phase E1.3 will introduce hash join and at that point we
1151                // can revisit whether to specialise AliasScan).
1152                let schema = self
1153                    .catalog
1154                    .schema(table)
1155                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1156                    .clone();
1157                let columns: Vec<String> = schema
1158                    .columns
1159                    .iter()
1160                    .map(|c| format!("{alias}.{}", c.name))
1161                    .collect();
1162                let rows: Vec<Vec<Value>> = self
1163                    .catalog
1164                    .scan(table)
1165                    .map_err(|e| QueryError::StorageError(e.to_string()))?
1166                    .map(|(_, row)| row)
1167                    .collect();
1168                Ok(QueryResult::Rows { columns, rows })
1169            }
1170
1171            PlanNode::NestedLoopJoin {
1172                left,
1173                right,
1174                on,
1175                kind,
1176            } => {
1177                // Materialise both sides. The executor ships two strategies:
1178                //   1. Hash join (E1.3) — when the `on` predicate is a
1179                //      simple equi-predicate `left_col = right_col`, build a
1180                //      FxHashMap<Value, Vec<row_idx>> over the right side
1181                //      and probe with the left side. O(L + R) instead of
1182                //      O(L × R). Handles Inner and LeftOuter.
1183                //   2. Nested loop (E1.2) — fallback for Cross, non-equi
1184                //      predicates, or `on` expressions that reference
1185                //      either side with something more complex than a
1186                //      QualifiedField.
1187                let left_result = self.execute_plan(left)?;
1188                let right_result = self.execute_plan(right)?;
1189                let (left_columns, left_rows) = match left_result {
1190                    QueryResult::Rows { columns, rows } => (columns, rows),
1191                    _ => return Err("join left side must produce rows".into()),
1192                };
1193                let (right_columns, right_rows) = match right_result {
1194                    QueryResult::Rows { columns, rows } => (columns, rows),
1195                    _ => return Err("join right side must produce rows".into()),
1196                };
1197
1198                // WS2: byte-budget guard on the join build side. Charge both
1199                // materialized inputs before we build the hash table / probe;
1200                // the output is row-capped by check_join_limit below.
1201                self.charge_rows(&left_rows)?;
1202                self.charge_rows(&right_rows)?;
1203
1204                // Hash-join fast path.
1205                if !matches!(kind, JoinKind::Cross) {
1206                    if let Some(pred) = on {
1207                        if let Some((l_idx, r_idx)) =
1208                            try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1209                        {
1210                            let result = hash_join(
1211                                left_columns,
1212                                left_rows,
1213                                right_columns,
1214                                right_rows,
1215                                l_idx,
1216                                r_idx,
1217                                *kind,
1218                            );
1219                            if let QueryResult::Rows { ref rows, .. } = result {
1220                                check_join_limit(rows.len())?;
1221                            }
1222                            return Ok(result);
1223                        }
1224                    }
1225                }
1226
1227                // Nested-loop fallback.
1228                let n_left = left_columns.len();
1229                let n_right = right_columns.len();
1230                let mut columns = Vec::with_capacity(n_left + n_right);
1231                columns.extend(left_columns);
1232                columns.extend(right_columns);
1233
1234                let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1235                let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1236
1237                for left_row in &left_rows {
1238                    let mut matched = false;
1239                    for right_row in &right_rows {
1240                        combined.clear();
1241                        combined.extend_from_slice(left_row);
1242                        combined.extend_from_slice(right_row);
1243                        let keep = match kind {
1244                            JoinKind::Cross => true,
1245                            JoinKind::Inner | JoinKind::LeftOuter => match on {
1246                                Some(pred) => eval_predicate(pred, &combined, &columns),
1247                                // Missing `on` for non-cross joins is a
1248                                // parser error, but if it slips through we
1249                                // treat it as "match everything".
1250                                None => true,
1251                            },
1252                            // RightOuter is rewritten to LeftOuter by the
1253                            // planner, so we never see it here.
1254                            JoinKind::RightOuter => {
1255                                unreachable!("planner rewrites RightOuter to LeftOuter")
1256                            }
1257                        };
1258                        if keep {
1259                            rows.push(combined.clone());
1260                            check_join_limit(rows.len())?;
1261                            matched = true;
1262                        }
1263                    }
1264                    if !matched && matches!(kind, JoinKind::LeftOuter) {
1265                        let mut row = Vec::with_capacity(n_left + n_right);
1266                        row.extend_from_slice(left_row);
1267                        row.resize(n_left + n_right, Value::Empty);
1268                        rows.push(row);
1269                        check_join_limit(rows.len())?;
1270                    }
1271                }
1272
1273                Ok(QueryResult::Rows { columns, rows })
1274            }
1275
1276            PlanNode::Distinct { input } => {
1277                let result = self.execute_plan(input)?;
1278                match result {
1279                    QueryResult::Rows { columns, rows } => {
1280                        let mut seen = std::collections::HashSet::new();
1281                        let mut unique_rows = Vec::new();
1282                        for row in rows {
1283                            if seen.insert(row.clone()) {
1284                                unique_rows.push(row);
1285                            }
1286                        }
1287                        Ok(QueryResult::Rows {
1288                            columns,
1289                            rows: unique_rows,
1290                        })
1291                    }
1292                    other => Ok(other),
1293                }
1294            }
1295
1296            PlanNode::GroupBy {
1297                input,
1298                keys,
1299                aggregates,
1300                having,
1301            } => {
1302                let result = self.execute_plan(input)?;
1303                match result {
1304                    QueryResult::Rows { columns, rows } => {
1305                        // WS2: byte-budget guard on the GROUP BY input buffer
1306                        // (the hash table is bounded by the input it groups).
1307                        self.charge_rows(&rows)?;
1308                        // Resolve key column indices.
1309                        let key_indices: Vec<usize> = keys
1310                            .iter()
1311                            .map(|k| {
1312                                columns
1313                                    .iter()
1314                                    .position(|c| c == k)
1315                                    .ok_or_else(|| format!("group-by column '{k}' not found"))
1316                            })
1317                            .collect::<Result<Vec<_>, _>>()?;
1318
1319                        // Resolve aggregate field indices. count(*) uses
1320                        // sentinel usize::MAX — compute_group_aggregate
1321                        // treats it as "count all rows in the group".
1322                        let agg_field_indices: Vec<usize> = aggregates
1323                            .iter()
1324                            .map(|a| {
1325                                if a.field == "*" {
1326                                    Ok(usize::MAX)
1327                                } else {
1328                                    columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1329                                        format!("aggregate column '{}' not found", a.field)
1330                                    })
1331                                }
1332                            })
1333                            .collect::<Result<Vec<_>, _>>()?;
1334
1335                        // Group rows by key values (preserving insertion order).
1336                        let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1337                            rustc_hash::FxHashMap::default();
1338                        let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1339                        for (ri, row) in rows.iter().enumerate() {
1340                            let key: Vec<Value> =
1341                                key_indices.iter().map(|&i| row[i].clone()).collect();
1342                            match group_map.get(&key) {
1343                                Some(&idx) => groups[idx].1.push(ri),
1344                                None => {
1345                                    let idx = groups.len();
1346                                    group_map.insert(key.clone(), idx);
1347                                    groups.push((key, vec![ri]));
1348                                }
1349                            }
1350                        }
1351
1352                        // Build output column names: keys ++ aggregate output names.
1353                        let mut out_columns: Vec<String> = keys.clone();
1354                        for agg in aggregates.iter() {
1355                            out_columns.push(agg.output_name.clone());
1356                        }
1357
1358                        // Compute aggregates per group.
1359                        let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1360                        for (key_vals, row_indices) in &groups {
1361                            let mut row = key_vals.clone();
1362                            for (ai, agg) in aggregates.iter().enumerate() {
1363                                let col_idx = agg_field_indices[ai];
1364                                let val = compute_group_aggregate(
1365                                    agg.function,
1366                                    &rows,
1367                                    row_indices,
1368                                    col_idx,
1369                                );
1370                                row.push(val);
1371                            }
1372                            out_rows.push(row);
1373                        }
1374
1375                        // Apply HAVING filter.
1376                        if let Some(having_expr) = having {
1377                            out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1378                        }
1379
1380                        Ok(QueryResult::Rows {
1381                            columns: out_columns,
1382                            rows: out_rows,
1383                        })
1384                    }
1385                    _ => Err("group by requires row input".into()),
1386                }
1387            }
1388
1389            PlanNode::CreateTable { name, fields } => {
1390                let columns: Vec<ColumnDef> = fields
1391                    .iter()
1392                    .enumerate()
1393                    .map(|(i, f)| -> Result<ColumnDef, QueryError> {
1394                        Ok(ColumnDef {
1395                            name: f.name.clone(),
1396                            type_id: type_name_to_id(&f.type_name)
1397                                .map_err(QueryError::TypeError)?,
1398                            required: f.required,
1399                            position: i as u16,
1400                        })
1401                    })
1402                    .collect::<Result<Vec<_>, _>>()?;
1403                let schema = Schema {
1404                    table_name: name.clone(),
1405                    columns,
1406                };
1407                self.catalog
1408                    .create_table(schema)
1409                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1410                // Declaring a field `unique` auto-creates a unique B+tree
1411                // index, which is where uniqueness is enforced on writes.
1412                for f in fields.iter().filter(|f| f.unique) {
1413                    self.catalog
1414                        .create_index_unique(name, &f.name, true)
1415                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1416                }
1417                Ok(QueryResult::Created(name.clone()))
1418            }
1419
1420            PlanNode::AlterTable { table, action } => match action {
1421                AlterAction::AddColumn {
1422                    name,
1423                    type_name,
1424                    required,
1425                } => {
1426                    let position = self
1427                        .catalog
1428                        .schema(table)
1429                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1430                        .columns
1431                        .len() as u16;
1432                    let col = ColumnDef {
1433                        name: name.clone(),
1434                        type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1435                        required: *required,
1436                        position,
1437                    };
1438                    self.catalog
1439                        .alter_table_add_column(table, col)
1440                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1441                    Ok(QueryResult::Executed {
1442                        message: format!("column '{name}' added to '{table}'"),
1443                    })
1444                }
1445                AlterAction::DropColumn { name } => {
1446                    self.catalog
1447                        .alter_table_drop_column(table, name)
1448                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1449                    Ok(QueryResult::Executed {
1450                        message: format!("column '{name}' dropped from '{table}'"),
1451                    })
1452                }
1453                AlterAction::AddIndex { column } => {
1454                    self.catalog
1455                        .create_index(table, column)
1456                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1457                    Ok(QueryResult::Executed {
1458                        message: format!("index on '{table}.{column}' created"),
1459                    })
1460                }
1461                AlterAction::AddUnique { column } => {
1462                    // No DropIndex exists, so we cannot upgrade an existing
1463                    // non-unique index in place — reject it cleanly.
1464                    if self.catalog.has_index(table, column) {
1465                        return Err(QueryError::Execution(format!(
1466                            "cannot add unique on {table}.{column}: column already indexed"
1467                        )));
1468                    }
1469                    // Scan existing rows for duplicate (non-null) values
1470                    // before creating the unique index.
1471                    {
1472                        let tbl = self
1473                            .catalog
1474                            .get_table(table)
1475                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1476                        let col_idx = tbl.schema.column_index(column).ok_or_else(|| {
1477                            QueryError::ColumnNotFound {
1478                                table: table.to_string(),
1479                                column: column.clone(),
1480                            }
1481                        })?;
1482                        let mut seen = std::collections::HashSet::new();
1483                        for (_, row) in tbl.scan() {
1484                            let v = &row[col_idx];
1485                            if v.is_empty() {
1486                                continue;
1487                            }
1488                            if !seen.insert(v.clone()) {
1489                                return Err(QueryError::Execution(format!(
1490                                    "cannot add unique on {table}.{column}: \
1491                                     duplicate value {v:?} exists"
1492                                )));
1493                            }
1494                        }
1495                    }
1496                    self.catalog
1497                        .create_index_unique(table, column, true)
1498                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1499                    Ok(QueryResult::Executed {
1500                        message: format!("unique index on '{table}.{column}' created"),
1501                    })
1502                }
1503            },
1504
1505            PlanNode::DropTable { name } => {
1506                self.catalog
1507                    .drop_table(name)
1508                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1509                Ok(QueryResult::Executed {
1510                    message: format!("table '{name}' dropped"),
1511                })
1512            }
1513
1514            PlanNode::CreateView { name, query_text } => {
1515                self.create_view(name, query_text)?;
1516                Ok(QueryResult::Executed {
1517                    message: format!("materialized view '{name}' created"),
1518                })
1519            }
1520
1521            PlanNode::RefreshView { name } => {
1522                self.refresh_view(name)?;
1523                Ok(QueryResult::Executed {
1524                    message: format!("materialized view '{name}' refreshed"),
1525                })
1526            }
1527
1528            PlanNode::DropView { name } => {
1529                self.drop_view(name)?;
1530                Ok(QueryResult::Executed {
1531                    message: format!("materialized view '{name}' dropped"),
1532                })
1533            }
1534
1535            PlanNode::Window { input, windows } => {
1536                let result = self.execute_plan(input)?;
1537                execute_window(result, windows)
1538            }
1539
1540            PlanNode::Union { left, right, all } => {
1541                let left_result = self.execute_plan(left)?;
1542                let right_result = self.execute_plan(right)?;
1543                let (left_cols, left_rows) = match left_result {
1544                    QueryResult::Rows { columns, rows } => (columns, rows),
1545                    _ => return Err("UNION requires query results on left side".into()),
1546                };
1547                let (_, right_rows) = match right_result {
1548                    QueryResult::Rows { columns, rows } => (columns, rows),
1549                    _ => return Err("UNION requires query results on right side".into()),
1550                };
1551                let mut combined = left_rows;
1552                if *all {
1553                    // UNION ALL — just concatenate.
1554                    combined.extend(right_rows);
1555                } else {
1556                    // UNION — deduplicate using the same HashSet approach
1557                    // as DISTINCT. Value already implements Hash + Eq.
1558                    let mut seen = std::collections::HashSet::new();
1559                    for row in &combined {
1560                        seen.insert(row.clone());
1561                    }
1562                    for row in right_rows {
1563                        if seen.insert(row.clone()) {
1564                            combined.push(row);
1565                        }
1566                    }
1567                }
1568                Ok(QueryResult::Rows {
1569                    columns: left_cols,
1570                    rows: combined,
1571                })
1572            }
1573
1574            PlanNode::Explain { input } => {
1575                let text = format_plan_tree(input, 0);
1576                Ok(QueryResult::Rows {
1577                    columns: vec!["plan".to_string()],
1578                    rows: text
1579                        .lines()
1580                        .map(|line| vec![Value::Str(line.to_string())])
1581                        .collect(),
1582                })
1583            }
1584
1585            PlanNode::Begin => {
1586                if self.in_transaction {
1587                    return Err(QueryError::Execution(
1588                        "already in a transaction (nested transactions not supported)".into(),
1589                    ));
1590                }
1591                self.in_transaction = true;
1592                Ok(QueryResult::Executed {
1593                    message: "transaction started".to_string(),
1594                })
1595            }
1596
1597            PlanNode::Commit => {
1598                if !self.in_transaction {
1599                    return Err(QueryError::Execution(
1600                        "no active transaction to commit".into(),
1601                    ));
1602                }
1603                self.in_transaction = false;
1604                self.catalog
1605                    .sync_wal()
1606                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1607                Ok(QueryResult::Executed {
1608                    message: "transaction committed".to_string(),
1609                })
1610            }
1611
1612            PlanNode::Rollback => {
1613                if !self.in_transaction {
1614                    return Err(QueryError::Execution(
1615                        "no active transaction to roll back".into(),
1616                    ));
1617                }
1618                self.in_transaction = false;
1619                self.catalog
1620                    .rollback_to_last_sync()
1621                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1622                if let Ok(mut cache) = self.plan_cache.lock() {
1623                    cache.clear();
1624                }
1625                self.view_registry = ViewRegistry::open(self.catalog.data_dir())
1626                    .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
1627                Ok(QueryResult::Executed {
1628                    message: "transaction rolled back".to_string(),
1629                })
1630            }
1631
1632            PlanNode::IndexScan { table, column, key } => {
1633                let key_value = literal_to_value(key)?;
1634                let tbl = self
1635                    .catalog
1636                    .get_table(table)
1637                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1638                let columns: Vec<String> =
1639                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1640
1641                // Fast path: the table has a B-tree on this column.
1642                // Uses index_lookup_all to return ALL matching rows for
1643                // both unique and non-unique indexes.
1644                if tbl.has_index(column) {
1645                    let rids = tbl.index_lookup_all(column, &key_value);
1646                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1647                    for rid in rids {
1648                        if let Some(data) = tbl.heap.get(rid) {
1649                            rows.push(decode_row(&tbl.schema, &data));
1650                        }
1651                    }
1652                    return Ok(QueryResult::Rows { columns, rows });
1653                }
1654
1655                // Fallback: no index on this column. The planner emits IndexScan
1656                // eagerly (it has no visibility into which columns are indexed
1657                // at plan time), so here we must behave like SeqScan+Filter on
1658                // `.col = literal`: return *all* matching rows, not just the
1659                // first one. A non-indexed column isn't necessarily unique.
1660                // We compile the eq predicate once and stream without any
1661                // per-row decode for non-matching rows.
1662                let schema = &tbl.schema;
1663                let fast = FastLayout::new(schema);
1664                let synth_pred = Expr::BinaryOp(
1665                    Box::new(Expr::Field(column.clone())),
1666                    BinOp::Eq,
1667                    Box::new(key.clone()),
1668                );
1669                if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1670                    // Mission F: skip the first 4 Vec doublings.
1671                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1672                    self.catalog
1673                        .for_each_row_raw(table, |_rid, data| {
1674                            if compiled(data) {
1675                                rows.push(decode_row(schema, data));
1676                            }
1677                        })
1678                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1679                    return Ok(QueryResult::Rows { columns, rows });
1680                }
1681
1682                // Last resort: slow eq-check on materialised rows.
1683                let col_idx =
1684                    schema
1685                        .column_index(column)
1686                        .ok_or_else(|| QueryError::ColumnNotFound {
1687                            table: String::new(),
1688                            column: column.clone(),
1689                        })?;
1690                let rows: Vec<Vec<Value>> = tbl
1691                    .scan()
1692                    .filter_map(|(_, row)| {
1693                        if row[col_idx] == key_value {
1694                            Some(row)
1695                        } else {
1696                            None
1697                        }
1698                    })
1699                    .collect();
1700                Ok(QueryResult::Rows { columns, rows })
1701            }
1702
1703            PlanNode::RangeScan {
1704                table,
1705                column,
1706                start,
1707                end,
1708            } => {
1709                let tbl = self
1710                    .catalog
1711                    .get_table(table)
1712                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1713                let columns: Vec<String> =
1714                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1715                let schema = &tbl.schema;
1716
1717                let start_val = match start {
1718                    Some((expr, _)) => Some(literal_to_value(expr)?),
1719                    None => None,
1720                };
1721                let end_val = match end {
1722                    Some((expr, _)) => Some(literal_to_value(expr)?),
1723                    None => None,
1724                };
1725                let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1726                let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1727
1728                // Non-unique index: walk the composite (value, rid) leaf
1729                // chain between prefix bounds, fetch each row from the heap,
1730                // and recheck. The recheck enforces exclusive bounds
1731                // (range_rids is inclusive) and defensively skips any decoded
1732                // null (nulls are never indexed, so they must not match).
1733                if tbl.is_index_unique(column) == Some(false) {
1734                    if let Some(btree) = tbl.index(column) {
1735                        if start_val.is_some() || end_val.is_some() {
1736                            let col_idx = schema.column_index(column).ok_or_else(|| {
1737                                QueryError::ColumnNotFound {
1738                                    table: String::new(),
1739                                    column: column.clone(),
1740                                }
1741                            })?;
1742                            let rids = btree.range_rids(start_val.as_ref(), end_val.as_ref());
1743                            let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1744                            for rid in rids {
1745                                if let Some(data) = tbl.heap.get(rid) {
1746                                    let row = decode_row(schema, &data);
1747                                    if !row[col_idx].is_empty()
1748                                        && range_matches(
1749                                            &row[col_idx],
1750                                            &start_val,
1751                                            start_inclusive,
1752                                            &end_val,
1753                                            end_inclusive,
1754                                        )
1755                                    {
1756                                        rows.push(row);
1757                                    }
1758                                }
1759                            }
1760                            return Ok(QueryResult::Rows { columns, rows });
1761                        }
1762                    }
1763                }
1764
1765                // Range scans use the btree fast path for unique indexes,
1766                // walking raw column-value keys directly.
1767                if tbl.is_index_unique(column) == Some(true) {
1768                    if let Some(btree) = tbl.index(column) {
1769                        let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1770                            (Some(s), Some(e)) => btree.range(s, e).collect(),
1771                            (Some(s), None) => btree.range_from(s),
1772                            (None, Some(e)) => btree.range_to(e),
1773                            (None, None) => {
1774                                let rows: Vec<Vec<Value>> =
1775                                    tbl.scan().map(|(_, row)| row).collect();
1776                                return Ok(QueryResult::Rows { columns, rows });
1777                            }
1778                        };
1779                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1780                        for (key, rid) in hits {
1781                            if !start_inclusive {
1782                                if let Some(ref s) = start_val {
1783                                    if &key == s {
1784                                        continue;
1785                                    }
1786                                }
1787                            }
1788                            if !end_inclusive {
1789                                if let Some(ref e) = end_val {
1790                                    if &key == e {
1791                                        continue;
1792                                    }
1793                                }
1794                            }
1795                            if let Some(data) = tbl.heap.get(rid) {
1796                                rows.push(decode_row(schema, &data));
1797                            }
1798                        }
1799                        return Ok(QueryResult::Rows { columns, rows });
1800                    }
1801                }
1802
1803                // Fallback: no index — synthesize range predicate and scan.
1804                let fast = FastLayout::new(schema);
1805                let synth = synthesize_range_predicate(column, start, end);
1806                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1807                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1808                    self.catalog
1809                        .for_each_row_raw(table, |_rid, data| {
1810                            if compiled(data) {
1811                                rows.push(decode_row(schema, data));
1812                            }
1813                        })
1814                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1815                    return Ok(QueryResult::Rows { columns, rows });
1816                }
1817
1818                let col_idx =
1819                    schema
1820                        .column_index(column)
1821                        .ok_or_else(|| QueryError::ColumnNotFound {
1822                            table: String::new(),
1823                            column: column.clone(),
1824                        })?;
1825                let rows: Vec<Vec<Value>> = tbl
1826                    .scan()
1827                    .filter(|(_, row)| {
1828                        range_matches(
1829                            &row[col_idx],
1830                            &start_val,
1831                            start_inclusive,
1832                            &end_val,
1833                            end_inclusive,
1834                        )
1835                    })
1836                    .map(|(_, row)| row)
1837                    .collect();
1838                Ok(QueryResult::Rows { columns, rows })
1839            }
1840        }
1841    }
1842
1843    // ─── Materialized view operations ──────────────────────────────────────
1844
1845    /// Create a materialized view: execute the source query, store results
1846    /// in a new backing table, and register the view.
1847    fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1848        if self.view_registry.is_view(name) {
1849            return Err(QueryError::ViewError(format!(
1850                "materialized view '{name}' already exists"
1851            )));
1852        }
1853        // Execute the source query to get the result set.
1854        let result = self.execute_powql(query_text)?;
1855        let (columns, rows) = match result {
1856            QueryResult::Rows { columns, rows } => (columns, rows),
1857            _ => return Err("view source query must be a SELECT".into()),
1858        };
1859        // Derive a schema for the backing table from the query result columns.
1860        let schema = self.derive_view_schema(name, &columns, &rows);
1861        // Create the backing table and insert the result rows.
1862        self.catalog
1863            .create_table(schema)
1864            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1865        for row in &rows {
1866            self.catalog
1867                .insert(name, row)
1868                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1869        }
1870        // Determine which base tables this view depends on by parsing the query.
1871        let depends_on = self.extract_view_deps(query_text);
1872        self.view_registry
1873            .register(ViewDef {
1874                name: name.to_string(),
1875                query: query_text.to_string(),
1876                depends_on,
1877                dirty: false,
1878            })
1879            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1880        Ok(())
1881    }
1882
1883    /// Refresh a materialized view: re-execute its source query and replace
1884    /// the backing table's contents.
1885    fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1886        let def = self
1887            .view_registry
1888            .get(name)
1889            .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1890        let query_text = def.query.clone();
1891        // Execute the source query.
1892        let result = self.execute_powql(&query_text)?;
1893        let (_columns, rows) = match result {
1894            QueryResult::Rows { columns, rows } => (columns, rows),
1895            _ => return Err("view source query must be a SELECT".into()),
1896        };
1897        // Clear old data and insert fresh results. Mission B2: logged
1898        // variant — view refreshes are a mutation and crash recovery
1899        // must see them.
1900        self.catalog
1901            .scan_delete_matching_logged(name, |_| true)
1902            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1903        for row in &rows {
1904            self.catalog
1905                .insert(name, row)
1906                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1907        }
1908        self.view_registry.mark_clean(name);
1909        Ok(())
1910    }
1911
1912    /// Drop a materialized view: remove the backing table and unregister.
1913    fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1914        if !self.view_registry.is_view(name) {
1915            return Err(QueryError::ViewError(format!(
1916                "materialized view '{name}' not found"
1917            )));
1918        }
1919        self.view_registry
1920            .unregister(name)
1921            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1922        self.catalog
1923            .drop_table(name)
1924            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1925        Ok(())
1926    }
1927
1928    /// Derive a storage `Schema` for a view's backing table from query
1929    /// result column names and the first row's types.
1930    fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1931        use powdb_storage::types::{ColumnDef, TypeId};
1932        let cols: Vec<ColumnDef> = columns
1933            .iter()
1934            .enumerate()
1935            .map(|(i, col_name)| {
1936                let type_id = rows
1937                    .first()
1938                    .and_then(|row| row.get(i))
1939                    .map(|v| v.type_id())
1940                    .unwrap_or(TypeId::Str);
1941                ColumnDef {
1942                    name: col_name.clone(),
1943                    type_id,
1944                    required: false,
1945                    position: i as u16,
1946                }
1947            })
1948            .collect();
1949        Schema {
1950            table_name: name.to_string(),
1951            columns: cols,
1952        }
1953    }
1954
1955    /// Extract base table dependencies from a view's source query by
1956    /// parsing it and collecting the source table name.
1957    fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1958        use crate::parser::parse;
1959        match parse(query_text) {
1960            Ok(Statement::Query(q)) => {
1961                let mut deps = vec![q.source.clone()];
1962                for j in &q.joins {
1963                    deps.push(j.source.clone());
1964                }
1965                deps
1966            }
1967            _ => Vec::new(),
1968        }
1969    }
1970
1971    // ─── Specialized fast paths ─────────────────────────────────────────────
1972    //
1973    // These methods are helpers for the `execute_plan` match arms above.
1974    // Each returns `Ok(Some(result))` when the fast path fires, `Ok(None)`
1975    // when the shape isn't supported (caller falls back to generic code).
1976
1977    /// Aggregate sum/avg/min/max over a single fixed-size i64 column, with
1978    /// an optional compiled filter predicate. Walks raw row bytes — zero
1979    /// per-row allocation. Uses i128 accumulator for sum/avg overflow safety.
1980    pub(super) fn agg_single_col_fast(
1981        &self,
1982        table: &str,
1983        col: &str,
1984        function: AggFunc,
1985        predicate: Option<&Expr>,
1986    ) -> Result<Option<QueryResult>, QueryError> {
1987        let schema = self
1988            .catalog
1989            .schema(table)
1990            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1991            .clone();
1992        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
1993        let col_idx = match schema.column_index(col) {
1994            Some(i) => i,
1995            None => return Ok(None),
1996        };
1997        // Only fast-path fixed-size numeric columns (Int/Float) for
1998        // sum/avg/min/max/count. Mission D10: Float parity — prior version
1999        // bailed on Float columns, forcing them through the generic row-
2000        // decoding path that allocated a Vec<Value> per row and dispatched
2001        // on Value::cmp for every compare. f64 decode is structurally the
2002        // same as i64 (load 8 bytes, cast), so the fast path handles both.
2003        let col_type = schema.columns[col_idx].type_id;
2004        if col_type != TypeId::Int && col_type != TypeId::Float {
2005            return Ok(None);
2006        }
2007
2008        let fast = FastLayout::new(&schema);
2009        // Mission C Phase 20b: inline the numeric-column reader instead of
2010        // building a `Box<dyn Fn>`. Eliminates 100K vtable dispatches per
2011        // 100K-row agg scan — every reader call folds directly into the
2012        // hot loop below.
2013        let byte_offset = match fast.fixed_offsets[col_idx] {
2014            Some(o) => o,
2015            None => return Ok(None),
2016        };
2017        let bitmap_byte = col_idx / 8;
2018        let bitmap_bit = (col_idx % 8) as u32;
2019        let data_offset = 2 + fast.bitmap_size + byte_offset;
2020
2021        // Optional compiled filter.
2022        let compiled_pred: Option<CompiledPredicate> = match predicate {
2023            Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
2024                Some(c) => Some(c),
2025                None => return Ok(None), // let generic path handle it
2026            },
2027            None => None,
2028        };
2029
2030        // Mission C Phase 20b: specialize the inner loop per aggregate
2031        // function. The previous version ran a `match function { ... }`
2032        // *inside* the closure, which kept LLVM from producing optimal
2033        // scalar code for each variant (agg_max regressed ~23% vs the
2034        // baseline Box<dyn Fn> version even though per-row vtable cost
2035        // should have been strictly lower). Pushing the match out of the
2036        // hot loop lets each specialized body fold cleanly into
2037        // `for_each_row_raw` and removes a captured `AggFunc` + match
2038        // dispatch per row.
2039        //
2040        // Mission D10: same specialisation applies to the Float branch.
2041        // For Min/Max we use `f64::total_cmp` so the result matches
2042        // `Value::Ord` — this is the same ordering ORDER BY and the
2043        // top-N sort fast path use, keeping semantics consistent across
2044        // read paths (NaN compares as greatest, -0.0 < +0.0 for
2045        // deterministic tie-breaking).
2046        //
2047        // Mission D11 Phase 1: each inner loop now splits on presence of
2048        // a predicate (`if let Some(pred) = &compiled_pred`) so the hot
2049        // body never re-tests `Option` per row, and reads column bytes
2050        // via `read_i64_unchecked` / `read_f64_unchecked` helpers that
2051        // drop two bounds checks per row (null bitmap byte + value
2052        // slice). Safety is carried by the `FastLayout` invariant that
2053        // `data_offset + 8 <= row_len` for any fixed-size column; see
2054        // the helper doc comments. Hot loops are macro-generated so the
2055        // with-pred / no-pred split can't drift between variants.
2056        let result = match col_type {
2057            TypeId::Int => match function {
2058                AggFunc::Sum | AggFunc::Avg => {
2059                    let mut sum_i128: i128 = 0;
2060                    let mut count: i64 = 0;
2061                    agg_int_loop!(
2062                        self,
2063                        table,
2064                        compiled_pred,
2065                        bitmap_byte,
2066                        bitmap_bit,
2067                        data_offset,
2068                        |v: i64| {
2069                            count += 1;
2070                            sum_i128 += v as i128;
2071                        }
2072                    );
2073                    if matches!(function, AggFunc::Sum) {
2074                        let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
2075                        QueryResult::Scalar(Value::Int(clamped))
2076                    } else if count == 0 {
2077                        QueryResult::Scalar(Value::Empty)
2078                    } else {
2079                        let avg = (sum_i128 as f64) / (count as f64);
2080                        QueryResult::Scalar(Value::Float(avg))
2081                    }
2082                }
2083                AggFunc::Min => {
2084                    let mut min_v: Option<i64> = None;
2085                    agg_int_loop!(
2086                        self,
2087                        table,
2088                        compiled_pred,
2089                        bitmap_byte,
2090                        bitmap_bit,
2091                        data_offset,
2092                        |v: i64| {
2093                            min_v = Some(match min_v {
2094                                Some(m) => m.min(v),
2095                                None => v,
2096                            });
2097                        }
2098                    );
2099                    QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
2100                }
2101                AggFunc::Max => {
2102                    let mut max_v: Option<i64> = None;
2103                    agg_int_loop!(
2104                        self,
2105                        table,
2106                        compiled_pred,
2107                        bitmap_byte,
2108                        bitmap_bit,
2109                        data_offset,
2110                        |v: i64| {
2111                            max_v = Some(match max_v {
2112                                Some(m) => m.max(v),
2113                                None => v,
2114                            });
2115                        }
2116                    );
2117                    QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
2118                }
2119                AggFunc::Count => {
2120                    let mut count: i64 = 0;
2121                    agg_int_loop!(
2122                        self,
2123                        table,
2124                        compiled_pred,
2125                        bitmap_byte,
2126                        bitmap_bit,
2127                        data_offset,
2128                        |_v: i64| {
2129                            count += 1;
2130                        }
2131                    );
2132                    QueryResult::Scalar(Value::Int(count))
2133                }
2134                AggFunc::CountDistinct => {
2135                    let mut seen = rustc_hash::FxHashSet::default();
2136                    agg_int_loop!(
2137                        self,
2138                        table,
2139                        compiled_pred,
2140                        bitmap_byte,
2141                        bitmap_bit,
2142                        data_offset,
2143                        |v: i64| {
2144                            seen.insert(v);
2145                        }
2146                    );
2147                    QueryResult::Scalar(Value::Int(seen.len() as i64))
2148                }
2149            },
2150            TypeId::Float => match function {
2151                AggFunc::Sum => {
2152                    // Use a single f64 accumulator. Naive summation is
2153                    // sufficient for MVP parity; if precision becomes an
2154                    // issue on long scans we can upgrade to Kahan–Neumaier
2155                    // compensated sum (~2x scalar cost, zero error growth).
2156                    let mut sum: f64 = 0.0;
2157                    agg_float_loop!(
2158                        self,
2159                        table,
2160                        compiled_pred,
2161                        bitmap_byte,
2162                        bitmap_bit,
2163                        data_offset,
2164                        |v: f64| {
2165                            sum += v;
2166                        }
2167                    );
2168                    QueryResult::Scalar(Value::Float(sum))
2169                }
2170                AggFunc::Avg => {
2171                    let mut sum: f64 = 0.0;
2172                    let mut count: i64 = 0;
2173                    agg_float_loop!(
2174                        self,
2175                        table,
2176                        compiled_pred,
2177                        bitmap_byte,
2178                        bitmap_bit,
2179                        data_offset,
2180                        |v: f64| {
2181                            sum += v;
2182                            count += 1;
2183                        }
2184                    );
2185                    if count == 0 {
2186                        QueryResult::Scalar(Value::Empty)
2187                    } else {
2188                        QueryResult::Scalar(Value::Float(sum / count as f64))
2189                    }
2190                }
2191                AggFunc::Min => {
2192                    // `total_cmp` for deterministic NaN handling (matches
2193                    // Value::Ord). NaN compares greatest, so Min will
2194                    // correctly ignore it in favour of any finite value.
2195                    let mut min_v: Option<f64> = None;
2196                    agg_float_loop!(
2197                        self,
2198                        table,
2199                        compiled_pred,
2200                        bitmap_byte,
2201                        bitmap_bit,
2202                        data_offset,
2203                        |v: f64| {
2204                            min_v = Some(match min_v {
2205                                Some(m) => {
2206                                    if v.total_cmp(&m).is_lt() {
2207                                        v
2208                                    } else {
2209                                        m
2210                                    }
2211                                }
2212                                None => v,
2213                            });
2214                        }
2215                    );
2216                    QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
2217                }
2218                AggFunc::Max => {
2219                    let mut max_v: Option<f64> = None;
2220                    agg_float_loop!(
2221                        self,
2222                        table,
2223                        compiled_pred,
2224                        bitmap_byte,
2225                        bitmap_bit,
2226                        data_offset,
2227                        |v: f64| {
2228                            max_v = Some(match max_v {
2229                                Some(m) => {
2230                                    if v.total_cmp(&m).is_gt() {
2231                                        v
2232                                    } else {
2233                                        m
2234                                    }
2235                                }
2236                                None => v,
2237                            });
2238                        }
2239                    );
2240                    QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
2241                }
2242                AggFunc::Count => {
2243                    let mut count: i64 = 0;
2244                    agg_float_loop!(
2245                        self,
2246                        table,
2247                        compiled_pred,
2248                        bitmap_byte,
2249                        bitmap_bit,
2250                        data_offset,
2251                        |_v: f64| {
2252                            count += 1;
2253                        }
2254                    );
2255                    QueryResult::Scalar(Value::Int(count))
2256                }
2257                AggFunc::CountDistinct => {
2258                    // Hash on `f64::to_bits` — matches `Value::Hash`, so
2259                    // distinct NaN bit patterns count as distinct and
2260                    // -0.0/+0.0 count as distinct. Consistent with how
2261                    // Float values are hashed in every other DISTINCT /
2262                    // GROUP BY path.
2263                    let mut seen = rustc_hash::FxHashSet::default();
2264                    agg_float_loop!(
2265                        self,
2266                        table,
2267                        compiled_pred,
2268                        bitmap_byte,
2269                        bitmap_bit,
2270                        data_offset,
2271                        |v: f64| {
2272                            seen.insert(v.to_bits());
2273                        }
2274                    );
2275                    QueryResult::Scalar(Value::Int(seen.len() as i64))
2276                }
2277            },
2278            _ => unreachable!("type guard above restricts to Int/Float"),
2279        };
2280        Ok(Some(result))
2281    }
2282
2283    /// `Project(Limit(Filter(SeqScan)))` and `Project(Limit(SeqScan))`.
2284    /// Streams rows, decodes only projected columns, stops at the limit.
2285    pub(super) fn project_filter_limit_fast(
2286        &self,
2287        table: &str,
2288        fields: &[ProjectField],
2289        limit: usize,
2290        predicate: Option<&Expr>,
2291    ) -> Result<Option<QueryResult>, QueryError> {
2292        let schema = self
2293            .catalog
2294            .schema(table)
2295            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2296            .clone();
2297        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2298
2299        // Each projection field must be a simple `.field` reference for this
2300        // fast path. Aliased or computed fields fall through.
2301        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2302        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2303        for f in fields {
2304            let name = match &f.expr {
2305                Expr::Field(n) => n.clone(),
2306                _ => return Ok(None),
2307            };
2308            let idx = match all_columns.iter().position(|c| c == &name) {
2309                Some(i) => i,
2310                None => return Ok(None),
2311            };
2312            proj_indices.push(idx);
2313            proj_columns.push(f.alias.clone().unwrap_or(name));
2314        }
2315
2316        let fast = FastLayout::new(&schema);
2317        let row_layout = RowLayout::new(&schema);
2318
2319        let compiled_pred: Option<CompiledPredicate> = match predicate {
2320            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2321                Some(c) => Some(c),
2322                None => return Ok(None),
2323            },
2324            None => None,
2325        };
2326
2327        let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2328        // Mission D2: use try_for_each_row_raw to actually stop iterating
2329        // once the limit is reached. The previous `done` flag only short-
2330        // circuited the closure body, so a `limit 100` over 100K rows still
2331        // walked all 100K slots — burning ~30x SQLite on scan_filter_project_top100.
2332        self.catalog
2333            .try_for_each_row_raw(table, |_rid, data| {
2334                use std::ops::ControlFlow;
2335                if let Some(ref pred) = compiled_pred {
2336                    if !pred(data) {
2337                        return ControlFlow::Continue(());
2338                    }
2339                }
2340                let row: Vec<Value> = proj_indices
2341                    .iter()
2342                    .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2343                    .collect();
2344                out.push(row);
2345                if out.len() >= limit {
2346                    ControlFlow::Break(())
2347                } else {
2348                    ControlFlow::Continue(())
2349                }
2350            })
2351            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2352
2353        Ok(Some(QueryResult::Rows {
2354            columns: proj_columns,
2355            rows: out,
2356        }))
2357    }
2358
2359    /// `Project(Limit(Sort(Filter(SeqScan))))` and `Project(Limit(Sort(SeqScan)))`.
2360    /// Bounded top-N heap over the sort key. Only the sort key needs to be
2361    /// read per row; projected columns are decoded only for the final
2362    /// winning rows when the heap drains.
2363    pub(super) fn project_filter_sort_limit_fast(
2364        &self,
2365        table: &str,
2366        fields: &[ProjectField],
2367        sort_field: &str,
2368        descending: bool,
2369        limit: usize,
2370        predicate: Option<&Expr>,
2371    ) -> Result<Option<QueryResult>, QueryError> {
2372        if limit == 0 {
2373            // Degenerate case — empty result. Let the generic path handle it
2374            // for proper column naming.
2375            return Ok(None);
2376        }
2377        let schema = self
2378            .catalog
2379            .schema(table)
2380            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2381            .clone();
2382        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2383
2384        // Sort key must be a fixed-size numeric column (Int or Float).
2385        // Mission D10: extended from Int-only. Float sort keys use a
2386        // sortable-u64 transform (see `f64_to_sortable_u64`) so the heap
2387        // path stays keyed on `u64` and the whole branch shape is
2388        // identical to the Int case — no new heap types, no `total_cmp`
2389        // closures in the hot loop.
2390        let sort_idx = match schema.column_index(sort_field) {
2391            Some(i) => i,
2392            None => return Ok(None),
2393        };
2394        let sort_col_type = schema.columns[sort_idx].type_id;
2395        if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2396            return Ok(None);
2397        }
2398
2399        // Each projection field must be a simple `.field`.
2400        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2401        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2402        for f in fields {
2403            let name = match &f.expr {
2404                Expr::Field(n) => n.clone(),
2405                _ => return Ok(None),
2406            };
2407            let idx = match all_columns.iter().position(|c| c == &name) {
2408                Some(i) => i,
2409                None => return Ok(None),
2410            };
2411            proj_indices.push(idx);
2412            proj_columns.push(f.alias.clone().unwrap_or(name));
2413        }
2414
2415        let fast = FastLayout::new(&schema);
2416        let row_layout = RowLayout::new(&schema);
2417        // Mission C Phase 20b: inline numeric-column reader (no Box<dyn Fn>).
2418        let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2419            Some(o) => o,
2420            None => return Ok(None),
2421        };
2422        let sort_bitmap_byte = sort_idx / 8;
2423        let sort_bitmap_bit = (sort_idx % 8) as u32;
2424        let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2425
2426        let compiled_pred: Option<CompiledPredicate> = match predicate {
2427            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2428                Some(c) => Some(c),
2429                None => return Ok(None),
2430            },
2431            None => None,
2432        };
2433
2434        // Bounded top-N heap. For `order .x desc limit N`, we want the N
2435        // largest values — use a min-heap so the smallest is at the top and
2436        // can be popped when a better candidate arrives. For ascending, use
2437        // a max-heap. We tie-break with a monotonic `seq` counter so the
2438        // result is deterministic and stable.
2439        //
2440        // To keep this simple we maintain two typed heaps and pick by
2441        // direction.
2442        let drained: Vec<Vec<u8>> = match sort_col_type {
2443            TypeId::Int => {
2444                let mut seq: u64 = 0;
2445                let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2446                    BinaryHeap::with_capacity(limit);
2447                let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2448                    BinaryHeap::with_capacity(limit);
2449
2450                self.catalog
2451                    .for_each_row_raw(table, |_rid, data| {
2452                        if let Some(ref pred) = compiled_pred {
2453                            if !pred(data) {
2454                                return;
2455                            }
2456                        }
2457                        // Inlined int-column reader: null check + i64 decode.
2458                        if data.len() < sort_data_offset + 8 {
2459                            return;
2460                        }
2461                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2462                        if is_null {
2463                            return;
2464                        }
2465                        let key = i64::from_le_bytes(
2466                            data[sort_data_offset..sort_data_offset + 8]
2467                                .try_into()
2468                                .unwrap_or_else(|_| unreachable!()),
2469                        );
2470                        let id = seq;
2471                        seq += 1;
2472
2473                        if descending {
2474                            if heap_desc.len() < limit {
2475                                heap_desc.push(Reverse((key, id, data.to_vec())));
2476                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2477                                if key > *top_key {
2478                                    heap_desc.pop();
2479                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2480                                }
2481                            }
2482                        } else if heap_asc.len() < limit {
2483                            heap_asc.push((key, id, data.to_vec()));
2484                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2485                            if key < *top_key {
2486                                heap_asc.pop();
2487                                heap_asc.push((key, id, data.to_vec()));
2488                            }
2489                        }
2490                    })
2491                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2492
2493                let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2494                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2495                } else {
2496                    heap_asc.into_iter().collect()
2497                };
2498                if descending {
2499                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2500                } else {
2501                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2502                }
2503                drained.into_iter().map(|(_, _, d)| d).collect()
2504            }
2505            TypeId::Float => {
2506                // Novel angle: rather than introducing a `TotalF64` newtype
2507                // with `Ord via total_cmp`, transform the f64 bit pattern
2508                // into a sortable `u64` so `BinaryHeap<u64>` orders exactly
2509                // like `f64::total_cmp` would. Classic trick: flip the sign
2510                // bit on positives, flip all bits on negatives. Result:
2511                // - NaN (sign=0) stays greatest, matching total_cmp
2512                // - -0.0 sorts before +0.0, matching total_cmp
2513                // - Hot loop is branch-cheap (one compare + one xor)
2514                let mut seq: u64 = 0;
2515                let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2516                    BinaryHeap::with_capacity(limit);
2517                let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2518                    BinaryHeap::with_capacity(limit);
2519
2520                self.catalog
2521                    .for_each_row_raw(table, |_rid, data| {
2522                        if let Some(ref pred) = compiled_pred {
2523                            if !pred(data) {
2524                                return;
2525                            }
2526                        }
2527                        if data.len() < sort_data_offset + 8 {
2528                            return;
2529                        }
2530                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2531                        if is_null {
2532                            return;
2533                        }
2534                        let bits = u64::from_le_bytes(
2535                            data[sort_data_offset..sort_data_offset + 8]
2536                                .try_into()
2537                                .unwrap_or_else(|_| unreachable!()),
2538                        );
2539                        let key = f64_bits_to_sortable_u64(bits);
2540                        let id = seq;
2541                        seq += 1;
2542
2543                        if descending {
2544                            if heap_desc.len() < limit {
2545                                heap_desc.push(Reverse((key, id, data.to_vec())));
2546                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2547                                if key > *top_key {
2548                                    heap_desc.pop();
2549                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2550                                }
2551                            }
2552                        } else if heap_asc.len() < limit {
2553                            heap_asc.push((key, id, data.to_vec()));
2554                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2555                            if key < *top_key {
2556                                heap_asc.pop();
2557                                heap_asc.push((key, id, data.to_vec()));
2558                            }
2559                        }
2560                    })
2561                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2562
2563                let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2564                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2565                } else {
2566                    heap_asc.into_iter().collect()
2567                };
2568                if descending {
2569                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2570                } else {
2571                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2572                }
2573                drained.into_iter().map(|(_, _, d)| d).collect()
2574            }
2575            _ => unreachable!("type guard above restricts to Int/Float"),
2576        };
2577
2578        let rows: Vec<Vec<Value>> = drained
2579            .into_iter()
2580            .map(|data| {
2581                proj_indices
2582                    .iter()
2583                    .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2584                    .collect()
2585            })
2586            .collect();
2587
2588        Ok(Some(QueryResult::Rows {
2589            columns: proj_columns,
2590            rows,
2591        }))
2592    }
2593
2594    /// Gather the RowIds that a mutation should operate on, without
2595    /// materialising the full row set. Handles the shapes the planner emits
2596    /// for update/delete: SeqScan, IndexScan, and Filter(SeqScan). Other
2597    /// shapes fall back to `generic_rid_match`.
2598    ///
2599    /// Perf sprint: try to fuse the predicate evaluation and in-place
2600    /// byte-level mutation into a single heap walk. Returns `Some(result)`
2601    /// if the fused path fired, `None` to fall through to the generic
2602    /// two-pass code.
2603    ///
2604    /// Covers two shapes:
2605    /// 1. Fixed-width non-null literal assignments on non-indexed columns
2606    ///    → byte-patch every matched row in place (row length unchanged).
2607    /// 2. Single var-col literal assignment on a non-indexed column
2608    ///    → `patch_var_column_in_place` on every matched row (may shrink);
2609    ///    rows that can't be patched in place are collected for fallback.
2610    fn try_fused_scan_update(
2611        &mut self,
2612        table: &str,
2613        predicate: &Expr,
2614        resolved: &[(usize, Value)],
2615        changed_cols: &[usize],
2616    ) -> Option<Result<QueryResult, QueryError>> {
2617        // Build compiled predicate. Requires a schema borrow that must be
2618        // dropped before we call scan_patch_matching_logged.
2619        let compiled = {
2620            let schema = self.catalog.schema(table)?;
2621            let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2622            let fast = FastLayout::new(schema);
2623            compile_predicate(predicate, &columns, &fast, schema)?
2624        };
2625
2626        // ── Path 1: fixed-width fast patch ──────────────────────────
2627        let fixed_patches: Option<Vec<FastPatch>> = {
2628            let tbl = self.catalog.get_table(table)?;
2629            let schema = &tbl.schema;
2630            let all_fixed_nonnull = resolved
2631                .iter()
2632                .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2633            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2634            if all_fixed_nonnull && no_indexed {
2635                let layout = RowLayout::new(schema);
2636                let bitmap_size = layout.bitmap_size();
2637                Some(
2638                    resolved
2639                        .iter()
2640                        .map(|(idx, val)| {
2641                            let fixed_off = layout
2642                                .fixed_offset(*idx)
2643                                .expect("is_fixed_size already checked");
2644                            let field_off = 2 + bitmap_size + fixed_off;
2645                            let bytes: FixedBytes = match val {
2646                                Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2647                                Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2648                                Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2649                                Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2650                                Value::Uuid(v) => FixedBytes::Uuid(*v),
2651                                _ => unreachable!("all_fixed_nonnull guard"),
2652                            };
2653                            FastPatch {
2654                                field_off,
2655                                bitmap_byte_off: 2 + idx / 8,
2656                                bit_mask: 1u8 << (idx % 8),
2657                                bytes,
2658                            }
2659                        })
2660                        .collect(),
2661                )
2662            } else {
2663                None
2664            }
2665        };
2666        if let Some(patches) = fixed_patches {
2667            let result = self
2668                .catalog
2669                .scan_patch_matching_logged(table, compiled, |row| {
2670                    for p in &patches {
2671                        row[p.bitmap_byte_off] &= !p.bit_mask;
2672                        let field_bytes = p.bytes.as_slice();
2673                        row[p.field_off..p.field_off + field_bytes.len()]
2674                            .copy_from_slice(field_bytes);
2675                    }
2676                    Some(row.len() as u16)
2677                })
2678                .map_err(|e| e.to_string());
2679            match result {
2680                Ok((count, _)) => {
2681                    self.view_registry.mark_dependents_dirty(table);
2682                    return Some(Ok(QueryResult::Modified(count)));
2683                }
2684                Err(e) => return Some(Err(QueryError::Execution(e))),
2685            }
2686        }
2687
2688        // ── Path 2: single var-col shrink fast patch ────────────────
2689        let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2690            let tbl = self.catalog.get_table(table)?;
2691            let schema = &tbl.schema;
2692            let is_single = resolved.len() == 1;
2693            let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2694            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2695            if is_single && is_var && no_indexed {
2696                let (idx, val) = &resolved[0];
2697                let bytes_opt = match val {
2698                    Value::Str(s) => Some(s.as_bytes().to_vec()),
2699                    Value::Bytes(b) => Some(b.clone()),
2700                    Value::Empty => None,
2701                    _ => return None, // type mismatch, fall through
2702                };
2703                Some((*idx, bytes_opt))
2704            } else {
2705                None
2706            }
2707        };
2708        if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2709            // Build a fresh RowLayout before the mutable borrow.
2710            let layout = {
2711                let schema = self.catalog.schema(table)?;
2712                RowLayout::new(schema)
2713            };
2714            let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2715            let result = self
2716                .catalog
2717                .scan_patch_matching_logged(table, compiled, |row| {
2718                    patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2719                })
2720                .map_err(|e| e.to_string());
2721            match result {
2722                Ok((mut count, fallback_rids)) => {
2723                    // Handle rows where in-place patch failed (new > old).
2724                    for rid in fallback_rids {
2725                        let mut row = match self.catalog.get(table, rid) {
2726                            Some(r) => r,
2727                            None => continue,
2728                        };
2729                        for (idx, val) in resolved.iter() {
2730                            row[*idx] = val.clone();
2731                        }
2732                        if let Err(e) =
2733                            self.catalog
2734                                .update_hinted(table, rid, &row, Some(changed_cols))
2735                        {
2736                            return Some(Err(QueryError::StorageError(e.to_string())));
2737                        }
2738                        count += 1;
2739                    }
2740                    self.view_registry.mark_dependents_dirty(table);
2741                    return Some(Ok(QueryResult::Modified(count)));
2742                }
2743                Err(e) => return Some(Err(QueryError::Execution(e))),
2744            }
2745        }
2746
2747        None // no fused path applicable — fall through
2748    }
2749
2750    /// Mission C Phase 3: schema is looked up via `self.catalog.schema(table)`
2751    /// inside the branches that actually need it. Previously the caller had
2752    /// to clone the full Schema (6+ String allocs) before every mutation just
2753    /// so this function could borrow it — a cost the update/delete hot path
2754    /// did not need.
2755    fn collect_rids_for_mutation(
2756        &mut self,
2757        input: &PlanNode,
2758        table: &str,
2759    ) -> Result<Vec<RowId>, QueryError> {
2760        match input {
2761            PlanNode::SeqScan { table: t } if t == table => {
2762                // "Update/delete everything" — rare but legal.
2763                let rids: Vec<RowId> = self
2764                    .catalog
2765                    .scan(table)
2766                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2767                    .map(|(rid, _)| rid)
2768                    .collect();
2769                Ok(rids)
2770            }
2771            PlanNode::IndexScan {
2772                table: t,
2773                column,
2774                key,
2775            } if t == table => {
2776                let key_value = literal_to_value(key)?;
2777
2778                // Indexed case: single lookup, 0 or 1 rows.
2779                // Mission D7: int-specialized fast path on int-keyed indexes
2780                // (primary keys, created_at, etc.) — the common case for
2781                // `update_by_pk` / `delete where id = ?`.
2782                //
2783                // Scope the `tbl` borrow so it's released before we fall
2784                // through to the scan-based paths below (which reborrow
2785                // `self.catalog`).
2786                {
2787                    let tbl = self
2788                        .catalog
2789                        .get_table(table)
2790                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2791                    if tbl.has_index(column) {
2792                        let rids = tbl.index_lookup_all(column, &key_value);
2793                        return Ok(rids);
2794                    }
2795                }
2796
2797                // No index: the planner folds `.col = literal` to IndexScan
2798                // regardless of whether the column is actually unique. When
2799                // there's no index we must behave like Filter(SeqScan) and
2800                // return *all* matching RIDs — not just the first one.
2801                let schema = self
2802                    .catalog
2803                    .schema(table)
2804                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2805                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2806                let fast = FastLayout::new(schema);
2807                let synth = Expr::BinaryOp(
2808                    Box::new(Expr::Field(column.clone())),
2809                    BinOp::Eq,
2810                    Box::new(key.clone()),
2811                );
2812                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2813                    // Mission F: skip the first 4 Vec doublings.
2814                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2815                    self.catalog
2816                        .for_each_row_raw(table, |rid, data| {
2817                            if compiled(data) {
2818                                rids.push(rid);
2819                            }
2820                        })
2821                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2822                    return Ok(rids);
2823                }
2824
2825                // Fallback: decode each row, compare values.
2826                let col_idx =
2827                    schema
2828                        .column_index(column)
2829                        .ok_or_else(|| QueryError::ColumnNotFound {
2830                            table: String::new(),
2831                            column: column.clone(),
2832                        })?;
2833                let rids: Vec<RowId> = self
2834                    .catalog
2835                    .scan(table)
2836                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2837                    .filter_map(|(rid, row)| {
2838                        if row[col_idx] == key_value {
2839                            Some(rid)
2840                        } else {
2841                            None
2842                        }
2843                    })
2844                    .collect();
2845                Ok(rids)
2846            }
2847            PlanNode::Filter {
2848                input: inner,
2849                predicate,
2850            } => {
2851                if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2852                    if t != table {
2853                        return self.generic_rid_match(input, table);
2854                    }
2855                    let schema = self
2856                        .catalog
2857                        .schema(table)
2858                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2859                    let columns: Vec<String> =
2860                        schema.columns.iter().map(|c| c.name.clone()).collect();
2861                    let fast = FastLayout::new(schema);
2862                    let row_layout = RowLayout::new(schema);
2863
2864                    // Try compiled predicate first.
2865                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2866                        // Mission F: skip the first 4 Vec doublings.
2867                        let mut rids: Vec<RowId> = Vec::with_capacity(64);
2868                        self.catalog
2869                            .for_each_row_raw(table, |rid, data| {
2870                                if compiled(data) {
2871                                    rids.push(rid);
2872                                }
2873                            })
2874                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2875                        return Ok(rids);
2876                    }
2877
2878                    // Fallback: selective decode + eval.
2879                    let pred_cols = predicate_column_indices(predicate, &columns);
2880                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2881                    self.catalog
2882                        .for_each_row_raw(table, |rid, data| {
2883                            let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2884                            if eval_predicate(predicate, &pred_row, &columns) {
2885                                rids.push(rid);
2886                            }
2887                        })
2888                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2889                    return Ok(rids);
2890                }
2891                self.generic_rid_match(input, table)
2892            }
2893            _ => self.generic_rid_match(input, table),
2894        }
2895    }
2896
2897    /// Last-ditch generic match: execute the plan, collect matching rows,
2898    /// then find corresponding RowIds by value equality. This is the old
2899    /// O(N*M) code path; only used when the plan shape is something exotic.
2900    fn generic_rid_match(
2901        &mut self,
2902        input: &PlanNode,
2903        table: &str,
2904    ) -> Result<Vec<RowId>, QueryError> {
2905        let result = self.execute_plan(input)?;
2906        let rows = match result {
2907            QueryResult::Rows { rows, .. } => rows,
2908            _ => return Err("mutation source must be rows".into()),
2909        };
2910        let matching: Vec<RowId> = self
2911            .catalog
2912            .scan(table)
2913            .map_err(|e| QueryError::StorageError(e.to_string()))?
2914            .filter(|(_, row)| rows.iter().any(|r| r == row))
2915            .map(|(rid, _)| rid)
2916            .collect();
2917        Ok(matching)
2918    }
2919}
2920
2921pub(super) fn execute_window(
2922    result: QueryResult,
2923    windows: &[WindowDef],
2924) -> Result<QueryResult, QueryError> {
2925    let (mut columns, mut rows) = match result {
2926        QueryResult::Rows { columns, rows } => (columns, rows),
2927        _ => return Err("window function requires row input".into()),
2928    };
2929
2930    for wdef in windows {
2931        // Resolve partition/order column indices against current columns.
2932        let part_indices: Vec<usize> = wdef
2933            .partition_by
2934            .iter()
2935            .map(|name| {
2936                columns
2937                    .iter()
2938                    .position(|c| c == name)
2939                    .ok_or_else(|| format!("window partition column '{name}' not found"))
2940            })
2941            .collect::<Result<Vec<_>, _>>()?;
2942
2943        let ord_indices: Vec<(usize, bool)> = wdef
2944            .order_by
2945            .iter()
2946            .map(|sk| {
2947                columns
2948                    .iter()
2949                    .position(|c| c == &sk.field)
2950                    .map(|i| (i, sk.descending))
2951                    .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2952            })
2953            .collect::<Result<Vec<_>, _>>()?;
2954
2955        // Resolve the argument column index (for aggregate windows).
2956        let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2957            match arg {
2958                Expr::Field(name) => {
2959                    if name == "*" {
2960                        None // count(*) style — no specific column
2961                    } else {
2962                        Some(
2963                            columns
2964                                .iter()
2965                                .position(|c| c == name)
2966                                .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2967                        )
2968                    }
2969                }
2970                _ => None,
2971            }
2972        } else {
2973            None
2974        };
2975
2976        // Build a sort-index to sort rows by partition_by then order_by
2977        // without actually reordering the original Vec (we need original
2978        // order to write results back).
2979        let n = rows.len();
2980        let mut indices: Vec<usize> = (0..n).collect();
2981        indices.sort_by(|&a, &b| {
2982            // Compare partition keys first.
2983            for &pi in &part_indices {
2984                let cmp = rows[a][pi].cmp(&rows[b][pi]);
2985                if cmp != std::cmp::Ordering::Equal {
2986                    return cmp;
2987                }
2988            }
2989            // Then order keys.
2990            for &(oi, desc) in &ord_indices {
2991                let cmp = rows[a][oi].cmp(&rows[b][oi]);
2992                if cmp != std::cmp::Ordering::Equal {
2993                    return if desc { cmp.reverse() } else { cmp };
2994                }
2995            }
2996            std::cmp::Ordering::Equal
2997        });
2998
2999        // SQL window-frame semantics: with no `order` clause the frame for an
3000        // aggregate window is the ENTIRE partition, not the running prefix.
3001        // The loop below computes running values; for the no-order case we
3002        // back-fill every row of a partition with the partition's final
3003        // (i.e. complete) aggregate once its boundary is reached. Ranking
3004        // functions are untouched — row_number/rank/dense_rank are inherently
3005        // positional.
3006        let whole_partition_frame = wdef.order_by.is_empty()
3007            && matches!(
3008                wdef.function,
3009                WindowFunc::Sum
3010                    | WindowFunc::Avg
3011                    | WindowFunc::Count
3012                    | WindowFunc::Min
3013                    | WindowFunc::Max
3014            );
3015        // Original row indices of the partition currently being scanned
3016        // (only tracked when back-filling is needed).
3017        let mut partition_row_indices: Vec<usize> = Vec::new();
3018
3019        // Compute window values in sorted order, tracking partition boundaries.
3020        let mut win_values: Vec<Value> = vec![Value::Empty; n];
3021        let mut partition_start = 0usize;
3022        // Running state for aggregate windows:
3023        let mut running_count: i64 = 0;
3024        let mut running_int_sum: i64 = 0;
3025        let mut running_float_sum: f64 = 0.0;
3026        let mut running_saw_float = false;
3027        let mut running_min: Option<Value> = None;
3028        let mut running_max: Option<Value> = None;
3029        let mut rank_counter: i64 = 0;
3030        let mut dense_rank_counter: i64 = 0;
3031        let mut prev_order_key: Option<Vec<Value>> = None;
3032        let mut same_rank_count: i64 = 0;
3033
3034        for sorted_pos in 0..n {
3035            let row_idx = indices[sorted_pos];
3036
3037            // Detect partition boundary.
3038            let new_partition = if sorted_pos == 0 {
3039                true
3040            } else {
3041                let prev_row_idx = indices[sorted_pos - 1];
3042                part_indices
3043                    .iter()
3044                    .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
3045            };
3046
3047            if new_partition {
3048                // No-order aggregate frame: the partition that just ended is
3049                // complete, so its final running value IS the whole-partition
3050                // aggregate. Back-fill it onto every row of that partition.
3051                if whole_partition_frame && sorted_pos > 0 {
3052                    let final_v = win_values[indices[sorted_pos - 1]].clone();
3053                    for ri in partition_row_indices.drain(..) {
3054                        win_values[ri] = final_v.clone();
3055                    }
3056                }
3057                partition_start = sorted_pos;
3058                running_count = 0;
3059                running_int_sum = 0;
3060                running_float_sum = 0.0;
3061                running_saw_float = false;
3062                running_min = None;
3063                running_max = None;
3064                rank_counter = 0;
3065                dense_rank_counter = 0;
3066                prev_order_key = None;
3067                same_rank_count = 0;
3068            }
3069
3070            // Extract current order key for rank tracking.
3071            let current_order_key: Vec<Value> = ord_indices
3072                .iter()
3073                .map(|&(oi, _)| rows[row_idx][oi].clone())
3074                .collect();
3075            let same_as_prev = prev_order_key.as_ref() == Some(&current_order_key);
3076
3077            let value = match wdef.function {
3078                WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
3079                WindowFunc::Rank => {
3080                    if same_as_prev {
3081                        same_rank_count += 1;
3082                    } else {
3083                        rank_counter += same_rank_count + 1;
3084                        same_rank_count = 0;
3085                        if rank_counter == 0 {
3086                            rank_counter = 1;
3087                        }
3088                    }
3089                    Value::Int(rank_counter)
3090                }
3091                WindowFunc::DenseRank => {
3092                    if !same_as_prev {
3093                        dense_rank_counter += 1;
3094                    }
3095                    Value::Int(dense_rank_counter)
3096                }
3097                WindowFunc::Sum => {
3098                    if let Some(ci) = arg_col_idx {
3099                        match &rows[row_idx][ci] {
3100                            Value::Int(v) => running_int_sum += v,
3101                            Value::Float(v) => {
3102                                running_float_sum += v;
3103                                running_saw_float = true;
3104                            }
3105                            _ => {}
3106                        }
3107                    }
3108                    if running_saw_float {
3109                        Value::Float(running_float_sum + running_int_sum as f64)
3110                    } else {
3111                        Value::Int(running_int_sum)
3112                    }
3113                }
3114                WindowFunc::Avg => {
3115                    if let Some(ci) = arg_col_idx {
3116                        match &rows[row_idx][ci] {
3117                            Value::Int(v) => {
3118                                running_float_sum += *v as f64;
3119                                running_count += 1;
3120                            }
3121                            Value::Float(v) => {
3122                                running_float_sum += v;
3123                                running_count += 1;
3124                            }
3125                            _ => {}
3126                        }
3127                    }
3128                    if running_count == 0 {
3129                        Value::Empty
3130                    } else {
3131                        Value::Float(running_float_sum / running_count as f64)
3132                    }
3133                }
3134                WindowFunc::Count => {
3135                    if let Some(ci) = arg_col_idx {
3136                        if !rows[row_idx][ci].is_empty() {
3137                            running_count += 1;
3138                        }
3139                    } else {
3140                        // count(*) — count all rows
3141                        running_count += 1;
3142                    }
3143                    Value::Int(running_count)
3144                }
3145                WindowFunc::Min => {
3146                    if let Some(ci) = arg_col_idx {
3147                        let v = &rows[row_idx][ci];
3148                        if !v.is_empty() {
3149                            running_min = Some(match &running_min {
3150                                None => v.clone(),
3151                                Some(cur) => {
3152                                    if v < cur {
3153                                        v.clone()
3154                                    } else {
3155                                        cur.clone()
3156                                    }
3157                                }
3158                            });
3159                        }
3160                    }
3161                    running_min.clone().unwrap_or(Value::Empty)
3162                }
3163                WindowFunc::Max => {
3164                    if let Some(ci) = arg_col_idx {
3165                        let v = &rows[row_idx][ci];
3166                        if !v.is_empty() {
3167                            running_max = Some(match &running_max {
3168                                None => v.clone(),
3169                                Some(cur) => {
3170                                    if v > cur {
3171                                        v.clone()
3172                                    } else {
3173                                        cur.clone()
3174                                    }
3175                                }
3176                            });
3177                        }
3178                    }
3179                    running_max.clone().unwrap_or(Value::Empty)
3180                }
3181            };
3182
3183            prev_order_key = Some(current_order_key);
3184            win_values[row_idx] = value;
3185            if whole_partition_frame {
3186                partition_row_indices.push(row_idx);
3187            }
3188        }
3189
3190        // Back-fill the final partition (the loop only flushes at boundaries).
3191        if whole_partition_frame && n > 0 {
3192            let final_v = win_values[indices[n - 1]].clone();
3193            for ri in partition_row_indices.drain(..) {
3194                win_values[ri] = final_v.clone();
3195            }
3196        }
3197
3198        // Append the computed window column to each row.
3199        for (ri, row) in rows.iter_mut().enumerate() {
3200            row.push(win_values[ri].clone());
3201        }
3202        columns.push(wdef.output_name.clone());
3203    }
3204
3205    Ok(QueryResult::Rows { columns, rows })
3206}
3207
3208/// Mission E2b: compute one aggregate over a set of rows in a group.
3209pub(super) fn compute_group_aggregate(
3210    func: AggFunc,
3211    all_rows: &[Vec<Value>],
3212    row_indices: &[usize],
3213    col_idx: usize,
3214) -> Value {
3215    match func {
3216        AggFunc::Count => {
3217            if col_idx == usize::MAX {
3218                // count(*) — count all rows in the group.
3219                return Value::Int(row_indices.len() as i64);
3220            }
3221            let count = row_indices
3222                .iter()
3223                .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3224                .count();
3225            Value::Int(count as i64)
3226        }
3227        AggFunc::CountDistinct => {
3228            let mut seen = std::collections::HashSet::new();
3229            for &ri in row_indices {
3230                let v = &all_rows[ri][col_idx];
3231                if !v.is_empty() {
3232                    seen.insert(v.clone());
3233                }
3234            }
3235            Value::Int(seen.len() as i64)
3236        }
3237        AggFunc::Sum => {
3238            // Mirror the scalar Sum path: accumulate int and float
3239            // contributions separately and promote the final result to
3240            // Float if any Float row was observed. Prevents silent
3241            // drop of Float columns in GROUP BY aggregates.
3242            let mut int_sum: i64 = 0;
3243            let mut float_sum: f64 = 0.0;
3244            let mut saw_float = false;
3245            for &ri in row_indices {
3246                match &all_rows[ri][col_idx] {
3247                    Value::Int(v) => int_sum += v,
3248                    Value::Float(v) => {
3249                        float_sum += *v;
3250                        saw_float = true;
3251                    }
3252                    _ => {}
3253                }
3254            }
3255            if saw_float {
3256                Value::Float(float_sum + int_sum as f64)
3257            } else {
3258                Value::Int(int_sum)
3259            }
3260        }
3261        AggFunc::Avg => {
3262            let mut sum = 0.0f64;
3263            let mut count = 0usize;
3264            for &ri in row_indices {
3265                match &all_rows[ri][col_idx] {
3266                    Value::Int(v) => {
3267                        sum += *v as f64;
3268                        count += 1;
3269                    }
3270                    Value::Float(v) => {
3271                        sum += *v;
3272                        count += 1;
3273                    }
3274                    _ => {}
3275                }
3276            }
3277            if count == 0 {
3278                Value::Empty
3279            } else {
3280                Value::Float(sum / count as f64)
3281            }
3282        }
3283        AggFunc::Min => row_indices
3284            .iter()
3285            .map(|&ri| &all_rows[ri][col_idx])
3286            .filter(|v| !v.is_empty())
3287            .min()
3288            .cloned()
3289            .unwrap_or(Value::Empty),
3290        AggFunc::Max => row_indices
3291            .iter()
3292            .map(|&ri| &all_rows[ri][col_idx])
3293            .filter(|v| !v.is_empty())
3294            .max()
3295            .cloned()
3296            .unwrap_or(Value::Empty),
3297    }
3298}
3299
3300/// Mission E1.3: try to extract equi-join key indices from a join `on`
3301/// predicate. Returns `Some((left_col_idx, right_col_idx))` when the
3302/// predicate is exactly `L = R` (or `R = L`) and both sides resolve
3303/// cleanly — `L` to the left subtree's column list and `R` to the right
3304/// subtree's column list.
3305///
3306/// This is deliberately narrow. We only recognise the two shapes:
3307///   * `QualifiedField = QualifiedField`  (`u.id = o.user_id`)
3308///   * `Field = Field`                    (`.id = .user_id`, unqualified)
3309///
3310/// Anything else — conjunctions, constants, function calls, or predicates
3311/// that touch the same side on both halves — falls through to the
3312/// nested-loop path unchanged.
3313pub(super) fn try_extract_equi_join_keys(
3314    pred: &Expr,
3315    left_columns: &[String],
3316    right_columns: &[String],
3317) -> Option<(usize, usize)> {
3318    let (lhs, op, rhs) = match pred {
3319        Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3320        _ => return None,
3321    };
3322    if op != BinOp::Eq {
3323        return None;
3324    }
3325    // Normal orientation: lhs in left, rhs in right.
3326    if let (Some(li), Some(ri)) = (
3327        resolve_side_column(lhs, left_columns),
3328        resolve_side_column(rhs, right_columns),
3329    ) {
3330        return Some((li, ri));
3331    }
3332    // Swapped: rhs in left, lhs in right. Both sides of `=` are
3333    // commutative so this is safe.
3334    if let (Some(li), Some(ri)) = (
3335        resolve_side_column(rhs, left_columns),
3336        resolve_side_column(lhs, right_columns),
3337    ) {
3338        return Some((li, ri));
3339    }
3340    None
3341}
3342
3343fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3344    match expr {
3345        Expr::QualifiedField { qualifier, field } => {
3346            // Byte-level match so we don't allocate a fresh `format!` on
3347            // every call — this runs once per plan, so allocation would be
3348            // cheap, but the match is trivial enough to keep inline with
3349            // the eval_expr version.
3350            let q = qualifier.as_bytes();
3351            let f = field.as_bytes();
3352            columns.iter().position(|c| {
3353                let b = c.as_bytes();
3354                b.len() == q.len() + 1 + f.len()
3355                    && b[..q.len()] == *q
3356                    && b[q.len()] == b'.'
3357                    && b[q.len() + 1..] == *f
3358            })
3359        }
3360        Expr::Field(name) => columns.iter().position(|c| c == name),
3361        _ => None,
3362    }
3363}
3364
3365/// Mission E1.3: O(L + R) hash join. Builds a `FxHashMap<Value, Vec<usize>>`
3366/// over the right (inner) side's join keys, then streams the left (outer)
3367/// side and for each probe row emits every combined row whose right-side
3368/// key matches. For `JoinKind::LeftOuter`, unmatched left rows are emitted
3369/// padded with `Value::Empty` on the right side.
3370///
3371/// The right side is always the build side. That choice is forced for
3372/// LeftOuter (the left side must stream so we can detect orphans), and
3373/// for Inner it's a reasonable default — left-deep plans tend to grow the
3374/// left side with each join, so the un-joined right leaf is often the
3375/// smaller of the two at each level.
3376pub(super) fn hash_join(
3377    left_columns: Vec<String>,
3378    left_rows: Vec<Vec<Value>>,
3379    right_columns: Vec<String>,
3380    right_rows: Vec<Vec<Value>>,
3381    left_key_idx: usize,
3382    right_key_idx: usize,
3383    kind: JoinKind,
3384) -> QueryResult {
3385    use rustc_hash::FxHashMap;
3386
3387    let n_left = left_columns.len();
3388    let n_right = right_columns.len();
3389    let mut columns = Vec::with_capacity(n_left + n_right);
3390    columns.extend(left_columns);
3391    columns.extend(right_columns);
3392
3393    // Build: right_key -> list of right-row indices. Pre-size to the row
3394    // count so the map doesn't rehash mid-build.
3395    let mut build: FxHashMap<Value, Vec<usize>> =
3396        FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3397    for (i, row) in right_rows.iter().enumerate() {
3398        // Skip Empty keys on the build side — they can never match under
3399        // SQL semantics (NULL ≠ NULL) and would collapse all nullables to
3400        // one bucket.
3401        if matches!(row[right_key_idx], Value::Empty) {
3402            continue;
3403        }
3404        build.entry(row[right_key_idx].clone()).or_default().push(i);
3405    }
3406
3407    // Reasonable starting capacity — inner joins produce ≥ left_rows.len()
3408    // rows in the common 1:1 case, left-outer always emits ≥ left_rows.len().
3409    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3410
3411    for left_row in &left_rows {
3412        let key = &left_row[left_key_idx];
3413        let matched = if matches!(key, Value::Empty) {
3414            None
3415        } else {
3416            build.get(key)
3417        };
3418        match matched {
3419            Some(matches) if !matches.is_empty() => {
3420                for &ri in matches {
3421                    let right_row = &right_rows[ri];
3422                    let mut combined = Vec::with_capacity(n_left + n_right);
3423                    combined.extend_from_slice(left_row);
3424                    combined.extend_from_slice(right_row);
3425                    rows.push(combined);
3426                }
3427            }
3428            _ => {
3429                if matches!(kind, JoinKind::LeftOuter) {
3430                    let mut row = Vec::with_capacity(n_left + n_right);
3431                    row.extend_from_slice(left_row);
3432                    row.resize(n_left + n_right, Value::Empty);
3433                    rows.push(row);
3434                }
3435            }
3436        }
3437    }
3438
3439    QueryResult::Rows { columns, rows }
3440}
3441
3442/// Lower unindexed `RangeScan` and `IndexScan` nodes to `Filter(SeqScan)`
3443/// so that all downstream fast paths (count, project+limit, sort+limit,
3444/// agg, update, delete) continue to fire.
3445///
3446/// The planner emits `RangeScan` (for `.age > 30`) and `IndexScan` (for
3447/// `.email = lit`) speculatively because it has no catalog access. When
3448/// the column has a B-tree index, those plans are correct. When it
3449/// doesn't, the executor's fallbacks materialise every matching row with
3450/// full `decode_row` — bypassing the compiled-predicate fast paths that
3451/// `Filter(SeqScan)` would trigger. Lowering both speculative leaf kinds
3452/// also keeps EXPLAIN honest: it prints the plan that actually runs.
3453///
3454/// This pass runs once per query, before execution.
3455pub(super) fn lower_unindexed_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3456    match plan {
3457        PlanNode::RangeScan {
3458            table,
3459            column,
3460            start,
3461            end,
3462        } => {
3463            if let Some(tbl) = catalog.get_table(table) {
3464                // Keep RangeScan whenever ANY index exists on the column:
3465                // unique indexes store raw column values, non-unique indexes
3466                // store composite (value, rid) keys that the executor walks
3467                // natively via BTree::range_rids. Only lower to Filter(SeqScan)
3468                // when the column is unindexed.
3469                if tbl.has_index(column) {
3470                    return plan.clone();
3471                }
3472            }
3473            let pred = synthesize_range_predicate(column, start, end);
3474            PlanNode::Filter {
3475                input: Box::new(PlanNode::SeqScan {
3476                    table: table.clone(),
3477                }),
3478                predicate: pred,
3479            }
3480        }
3481        PlanNode::Filter { input, predicate } => PlanNode::Filter {
3482            input: Box::new(lower_unindexed_scans(catalog, input)),
3483            predicate: predicate.clone(),
3484        },
3485        PlanNode::Project { input, fields } => PlanNode::Project {
3486            input: Box::new(lower_unindexed_scans(catalog, input)),
3487            fields: fields.clone(),
3488        },
3489        PlanNode::Sort { input, keys } => PlanNode::Sort {
3490            input: Box::new(lower_unindexed_scans(catalog, input)),
3491            keys: keys.clone(),
3492        },
3493        PlanNode::Limit { input, count } => PlanNode::Limit {
3494            input: Box::new(lower_unindexed_scans(catalog, input)),
3495            count: count.clone(),
3496        },
3497        PlanNode::Offset { input, count } => PlanNode::Offset {
3498            input: Box::new(lower_unindexed_scans(catalog, input)),
3499            count: count.clone(),
3500        },
3501        PlanNode::Aggregate {
3502            input,
3503            function,
3504            field,
3505        } => PlanNode::Aggregate {
3506            input: Box::new(lower_unindexed_scans(catalog, input)),
3507            function: *function,
3508            field: field.clone(),
3509        },
3510        PlanNode::Distinct { input } => PlanNode::Distinct {
3511            input: Box::new(lower_unindexed_scans(catalog, input)),
3512        },
3513        PlanNode::GroupBy {
3514            input,
3515            keys,
3516            aggregates,
3517            having,
3518        } => PlanNode::GroupBy {
3519            input: Box::new(lower_unindexed_scans(catalog, input)),
3520            keys: keys.clone(),
3521            aggregates: aggregates.clone(),
3522            having: having.clone(),
3523        },
3524        PlanNode::Update {
3525            input,
3526            table,
3527            assignments,
3528        } => PlanNode::Update {
3529            input: Box::new(lower_unindexed_scans(catalog, input)),
3530            table: table.clone(),
3531            assignments: assignments.clone(),
3532        },
3533        PlanNode::Delete { input, table } => PlanNode::Delete {
3534            input: Box::new(lower_unindexed_scans(catalog, input)),
3535            table: table.clone(),
3536        },
3537        PlanNode::Window { input, windows } => PlanNode::Window {
3538            input: Box::new(lower_unindexed_scans(catalog, input)),
3539            windows: windows.clone(),
3540        },
3541        PlanNode::Union { left, right, all } => PlanNode::Union {
3542            left: Box::new(lower_unindexed_scans(catalog, left)),
3543            right: Box::new(lower_unindexed_scans(catalog, right)),
3544            all: *all,
3545        },
3546        PlanNode::Explain { input } => PlanNode::Explain {
3547            input: Box::new(lower_unindexed_scans(catalog, input)),
3548        },
3549        PlanNode::NestedLoopJoin {
3550            left,
3551            right,
3552            on,
3553            kind,
3554        } => PlanNode::NestedLoopJoin {
3555            left: Box::new(lower_unindexed_scans(catalog, left)),
3556            right: Box::new(lower_unindexed_scans(catalog, right)),
3557            on: on.clone(),
3558            kind: *kind,
3559        },
3560        PlanNode::IndexScan { table, column, key } => {
3561            if let Some(tbl) = catalog.get_table(table) {
3562                if tbl.has_index(column) {
3563                    return plan.clone();
3564                }
3565            }
3566            PlanNode::Filter {
3567                input: Box::new(PlanNode::SeqScan {
3568                    table: table.clone(),
3569                }),
3570                predicate: Expr::BinaryOp(
3571                    Box::new(Expr::Field(column.clone())),
3572                    BinOp::Eq,
3573                    Box::new(key.clone()),
3574                ),
3575            }
3576        }
3577        // Leaf nodes: no children to recurse into.
3578        _ => plan.clone(),
3579    }
3580}
3581
3582/// Synthesize a range predicate from RangeScan bounds for the fallback path.
3583pub(super) fn synthesize_range_predicate(
3584    column: &str,
3585    start: &Option<(Expr, bool)>,
3586    end: &Option<(Expr, bool)>,
3587) -> Expr {
3588    let lower = start.as_ref().map(|(expr, inclusive)| {
3589        let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3590        Expr::BinaryOp(
3591            Box::new(Expr::Field(column.to_string())),
3592            op,
3593            Box::new(expr.clone()),
3594        )
3595    });
3596    let upper = end.as_ref().map(|(expr, inclusive)| {
3597        let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3598        Expr::BinaryOp(
3599            Box::new(Expr::Field(column.to_string())),
3600            op,
3601            Box::new(expr.clone()),
3602        )
3603    });
3604    match (lower, upper) {
3605        (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3606        (Some(l), None) => l,
3607        (None, Some(u)) => u,
3608        (None, None) => Expr::Literal(Literal::Bool(true)),
3609    }
3610}
3611
3612/// Check if a value falls within a range (used in last-resort decoded-row eval).
3613pub(super) fn range_matches(
3614    val: &Value,
3615    start: &Option<Value>,
3616    start_inc: bool,
3617    end: &Option<Value>,
3618    end_inc: bool,
3619) -> bool {
3620    if let Some(ref s) = start {
3621        if start_inc {
3622            if val < s {
3623                return false;
3624            }
3625        } else if val <= s {
3626            return false;
3627        }
3628    }
3629    if let Some(ref e) = end {
3630        if end_inc {
3631            if val > e {
3632                return false;
3633            }
3634        } else if val >= e {
3635            return false;
3636        }
3637    }
3638    true
3639}
3640
3641/// Format a `PlanNode` tree as a human-readable, indented text
3642/// representation. Used by the `EXPLAIN` command.
3643pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3644    let indent = "  ".repeat(depth);
3645    match plan {
3646        PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3647        PlanNode::AliasScan { table, alias } => {
3648            format!("{indent}AliasScan table={table} alias={alias}")
3649        }
3650        PlanNode::IndexScan { table, column, key } => {
3651            format!("{indent}IndexScan table={table} column={column} key={key:?}")
3652        }
3653        PlanNode::RangeScan {
3654            table,
3655            column,
3656            start,
3657            end,
3658        } => {
3659            let s = match start {
3660                Some((expr, inc)) => {
3661                    let op = if *inc { ">=" } else { ">" };
3662                    format!("{op}{expr:?}")
3663                }
3664                None => "unbounded".to_string(),
3665            };
3666            let e = match end {
3667                Some((expr, inc)) => {
3668                    let op = if *inc { "<=" } else { "<" };
3669                    format!("{op}{expr:?}")
3670                }
3671                None => "unbounded".to_string(),
3672            };
3673            format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3674        }
3675        PlanNode::Filter { input, predicate } => {
3676            let child = format_plan_tree(input, depth + 1);
3677            format!("{indent}Filter predicate={predicate:?}\n{child}")
3678        }
3679        PlanNode::Project { input, fields } => {
3680            let names: Vec<String> = fields
3681                .iter()
3682                .map(|f| match &f.alias {
3683                    Some(a) => format!("{a}: {:?}", f.expr),
3684                    None => format!("{:?}", f.expr),
3685                })
3686                .collect();
3687            let child = format_plan_tree(input, depth + 1);
3688            format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3689        }
3690        PlanNode::Sort { input, keys } => {
3691            let ks: Vec<String> = keys
3692                .iter()
3693                .map(|k| {
3694                    if k.descending {
3695                        format!("{} desc", k.field)
3696                    } else {
3697                        k.field.clone()
3698                    }
3699                })
3700                .collect();
3701            let child = format_plan_tree(input, depth + 1);
3702            format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3703        }
3704        PlanNode::Limit { input, count } => {
3705            let child = format_plan_tree(input, depth + 1);
3706            format!("{indent}Limit count={count:?}\n{child}")
3707        }
3708        PlanNode::Offset { input, count } => {
3709            let child = format_plan_tree(input, depth + 1);
3710            format!("{indent}Offset count={count:?}\n{child}")
3711        }
3712        PlanNode::Aggregate {
3713            input,
3714            function,
3715            field,
3716        } => {
3717            let f = field.as_deref().unwrap_or("*");
3718            let child = format_plan_tree(input, depth + 1);
3719            format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3720        }
3721        PlanNode::NestedLoopJoin {
3722            left,
3723            right,
3724            on,
3725            kind,
3726        } => {
3727            let left_child = format_plan_tree(left, depth + 1);
3728            let right_child = format_plan_tree(right, depth + 1);
3729            let on_str = match on {
3730                Some(pred) => format!("{pred:?}"),
3731                None => "none".to_string(),
3732            };
3733            format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3734        }
3735        PlanNode::Distinct { input } => {
3736            let child = format_plan_tree(input, depth + 1);
3737            format!("{indent}Distinct\n{child}")
3738        }
3739        PlanNode::GroupBy {
3740            input,
3741            keys,
3742            aggregates,
3743            having,
3744        } => {
3745            let agg_strs: Vec<String> = aggregates
3746                .iter()
3747                .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3748                .collect();
3749            let having_str = match having {
3750                Some(h) => format!(" having={h:?}"),
3751                None => String::new(),
3752            };
3753            let child = format_plan_tree(input, depth + 1);
3754            format!(
3755                "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3756                keys.join(", "),
3757                agg_strs.join(", "),
3758            )
3759        }
3760        PlanNode::Insert { table, rows } => {
3761            let cols: Vec<&str> = rows
3762                .first()
3763                .map(|r| r.iter().map(|a| a.field.as_str()).collect())
3764                .unwrap_or_default();
3765            format!(
3766                "{indent}Insert table={table} rows={} cols=[{}]",
3767                rows.len(),
3768                cols.join(", ")
3769            )
3770        }
3771        PlanNode::Upsert {
3772            table,
3773            key_column,
3774            assignments,
3775            on_conflict,
3776        } => {
3777            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3778            let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3779            if conflict_cols.is_empty() {
3780                format!(
3781                    "{indent}Upsert table={table} key={key_column} cols=[{}]",
3782                    cols.join(", ")
3783                )
3784            } else {
3785                format!(
3786                    "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3787                    cols.join(", "),
3788                    conflict_cols.join(", ")
3789                )
3790            }
3791        }
3792        PlanNode::Update {
3793            input,
3794            table,
3795            assignments,
3796        } => {
3797            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3798            let child = format_plan_tree(input, depth + 1);
3799            format!(
3800                "{indent}Update table={table} set=[{}]\n{child}",
3801                cols.join(", ")
3802            )
3803        }
3804        PlanNode::Delete { input, table } => {
3805            let child = format_plan_tree(input, depth + 1);
3806            format!("{indent}Delete table={table}\n{child}")
3807        }
3808        PlanNode::CreateTable { name, fields } => {
3809            let fs: Vec<String> = fields
3810                .iter()
3811                .map(|f| {
3812                    let mut mods = String::new();
3813                    if f.required {
3814                        mods.push_str(" required");
3815                    }
3816                    if f.unique {
3817                        mods.push_str(" unique");
3818                    }
3819                    format!("{}: {}{mods}", f.name, f.type_name)
3820                })
3821                .collect();
3822            format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3823        }
3824        PlanNode::AlterTable { table, action } => {
3825            format!("{indent}AlterTable table={table} action={action:?}")
3826        }
3827        PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3828        PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3829        PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3830        PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3831        PlanNode::Window { input, windows } => {
3832            let ws: Vec<String> = windows
3833                .iter()
3834                .map(|w| format!("{:?} as {}", w.function, w.output_name))
3835                .collect();
3836            let child = format_plan_tree(input, depth + 1);
3837            format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3838        }
3839        PlanNode::Union { left, right, all } => {
3840            let kind = if *all { "UNION ALL" } else { "UNION" };
3841            let left_child = format_plan_tree(left, depth + 1);
3842            let right_child = format_plan_tree(right, depth + 1);
3843            format!("{indent}{kind}\n{left_child}\n{right_child}")
3844        }
3845        PlanNode::Explain { input } => {
3846            let child = format_plan_tree(input, depth + 1);
3847            format!("{indent}Explain\n{child}")
3848        }
3849        PlanNode::Begin => format!("{indent}Begin"),
3850        PlanNode::Commit => format!("{indent}Commit"),
3851        PlanNode::Rollback => format!("{indent}Rollback"),
3852    }
3853}