Skip to main content

powdb_query/executor/
plan_exec.rs

1//! The execute_plan method and associated helpers.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::{ViewDef, ViewRegistry};
16
17impl Engine {
18    pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19        match plan {
20            PlanNode::SeqScan { table } => {
21                // Auto-refresh dirty materialized views on read.
22                if self.view_registry.is_dirty(table) {
23                    self.refresh_view(table)?;
24                }
25                let schema = self
26                    .catalog
27                    .schema(table)
28                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29                    .clone();
30                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31                let rows: Vec<Vec<Value>> = self
32                    .catalog
33                    .scan(table)
34                    .map_err(|e| QueryError::StorageError(e.to_string()))?
35                    .map(|(_, row)| row)
36                    .collect();
37                Ok(QueryResult::Rows { columns, rows })
38            }
39
40            PlanNode::Filter { input, predicate } => {
41                // Materialize any IN-subqueries in the predicate before the
42                // scan loop — the closure can't call back into the engine.
43                // Correlated subqueries are left in place for per-row eval.
44                let materialized;
45                let predicate = if contains_subquery(predicate) {
46                    materialized = self.materialize_subqueries(predicate)?;
47                    &materialized
48                } else {
49                    predicate
50                };
51
52                // Correlated subquery path: per-row materialisation.
53                if contains_subquery(predicate) {
54                    let result = self.execute_plan(input)?;
55                    return match result {
56                        QueryResult::Rows { columns, rows } => {
57                            let mut filtered = Vec::new();
58                            for row in rows {
59                                let row_pred =
60                                    self.materialize_correlated_for_row(predicate, &row, &columns)?;
61                                if eval_predicate(&row_pred, &row, &columns) {
62                                    filtered.push(row);
63                                }
64                            }
65                            Ok(QueryResult::Rows {
66                                columns,
67                                rows: filtered,
68                            })
69                        }
70                        _ => Err("filter requires row input".into()),
71                    };
72                }
73
74                // Fast path: fuse Filter + SeqScan into a zero-copy streaming
75                // loop. Uses decode_column() to evaluate the predicate on only
76                // the columns it references, avoiding heap allocations for
77                // String/Bytes columns that aren't part of the filter.
78                if let PlanNode::SeqScan { table } = input.as_ref() {
79                    // Auto-refresh dirty materialized views.
80                    if self.view_registry.is_dirty(table) {
81                        self.refresh_view(table)?;
82                    }
83                    let schema = self
84                        .catalog
85                        .schema(table)
86                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
87                        .clone();
88                    let columns: Vec<String> =
89                        schema.columns.iter().map(|c| c.name.clone()).collect();
90                    let fast = FastLayout::new(&schema);
91                    let row_layout = RowLayout::new(&schema);
92                    // Mission F: pre-size to skip the first 4 Vec doublings
93                    // (4 → 8 → 16 → 32 → 64). On a 100K-row scan with 30%
94                    // selectivity that's ~4 fewer reallocations + memcpys.
95                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
96
97                    // Try compiled predicate for the filter check (handles
98                    // int leaves, string-eq leaves, and And conjunctions).
99                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
100                        self.catalog
101                            .for_each_row_raw(table, |_rid, data| {
102                                if compiled(data) {
103                                    rows.push(decode_row(&schema, data));
104                                }
105                            })
106                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
107                    } else {
108                        let pred_cols = predicate_column_indices(predicate, &columns);
109                        self.catalog
110                            .for_each_row_raw(table, |_rid, data| {
111                                let pred_row =
112                                    decode_selective(&schema, &row_layout, data, &pred_cols);
113                                if eval_predicate(predicate, &pred_row, &columns) {
114                                    rows.push(decode_row(&schema, data));
115                                }
116                            })
117                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
118                    }
119
120                    return Ok(QueryResult::Rows { columns, rows });
121                }
122
123                // General path: materialise then filter.
124                let result = self.execute_plan(input)?;
125                match result {
126                    QueryResult::Rows { columns, rows } => {
127                        let filtered: Vec<Vec<Value>> = rows
128                            .into_iter()
129                            .filter(|row| eval_predicate(predicate, row, &columns))
130                            .collect();
131                        Ok(QueryResult::Rows {
132                            columns,
133                            rows: filtered,
134                        })
135                    }
136                    _ => Err("filter requires row input".into()),
137                }
138            }
139
140            PlanNode::Project { input, fields } => {
141                // Fast path: Project over IndexScan — decode only projected
142                // columns from raw bytes instead of full decode_row.
143                if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144                    let schema = self
145                        .catalog
146                        .schema(table)
147                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148                        .clone();
149                    let all_columns: Vec<String> =
150                        schema.columns.iter().map(|c| c.name.clone()).collect();
151                    let key_value = literal_to_value(key)?;
152                    let tbl = self
153                        .catalog
154                        .get_table(table)
155                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157                    let proj_columns: Vec<String> = fields
158                        .iter()
159                        .map(|f| {
160                            f.alias.clone().unwrap_or_else(|| match &f.expr {
161                                Expr::Field(name) => name.clone(),
162                                _ => "?".into(),
163                            })
164                        })
165                        .collect();
166
167                    // Determine which column indices the projection needs
168                    let proj_indices: Vec<usize> = fields
169                        .iter()
170                        .filter_map(|f| {
171                            if let Expr::Field(name) = &f.expr {
172                                all_columns.iter().position(|c| c == name)
173                            } else {
174                                None
175                            }
176                        })
177                        .collect();
178
179                    if tbl.has_index(column) {
180                        let layout = RowLayout::new(&schema);
181                        let rids = tbl.index_lookup_all(column, &key_value);
182                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
183                        for rid in rids {
184                            if let Some(data) = tbl.heap.get(rid) {
185                                let row: Vec<Value> = proj_indices
186                                    .iter()
187                                    .map(|&ci| decode_column(&schema, &layout, &data, ci))
188                                    .collect();
189                                rows.push(row);
190                            }
191                        }
192                        return Ok(QueryResult::Rows {
193                            columns: proj_columns,
194                            rows,
195                        });
196                    }
197                }
198
199                // Fast path: Project(Limit(Sort(Filter(SeqScan)))) — bounded
200                // top-N heap. Decodes only the sort key + projected columns,
201                // keeps at most `limit` rows in a heap. Also handles the
202                // Project(Limit(Sort(SeqScan))) variant (no filter).
203                if let PlanNode::Limit {
204                    input: inner,
205                    count: limit_expr,
206                } = input.as_ref()
207                {
208                    if let PlanNode::Sort {
209                        input: sort_input,
210                        keys,
211                    } = inner.as_ref()
212                    {
213                        // Fast path only for single-key sorts
214                        if keys.len() == 1 {
215                            let sort_field = &keys[0].field;
216                            let descending = keys[0].descending;
217                            let limit = match limit_expr {
218                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
219                                _ => usize::MAX,
220                            };
221                            let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
222                                match sort_input.as_ref() {
223                                    PlanNode::SeqScan { table } => (Some(table.as_str()), None),
224                                    PlanNode::Filter {
225                                        input: fi,
226                                        predicate,
227                                    } => {
228                                        if let PlanNode::SeqScan { table } = fi.as_ref() {
229                                            (Some(table.as_str()), Some(predicate))
230                                        } else {
231                                            (None, None)
232                                        }
233                                    }
234                                    _ => (None, None),
235                                };
236                            if let Some(table) = table_opt {
237                                if let Some(result) = self.project_filter_sort_limit_fast(
238                                    table, fields, sort_field, descending, limit, pred_opt,
239                                )? {
240                                    return Ok(result);
241                                }
242                            }
243                        }
244                    }
245                    // Fast path: Project(Limit(Filter(SeqScan))) — stream,
246                    // decode only projected columns, stop at limit.
247                    if let PlanNode::Filter {
248                        input: fi,
249                        predicate,
250                    } = inner.as_ref()
251                    {
252                        if let PlanNode::SeqScan { table } = fi.as_ref() {
253                            let limit = match limit_expr {
254                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
255                                _ => usize::MAX,
256                            };
257                            if let Some(result) = self.project_filter_limit_fast(
258                                table,
259                                fields,
260                                limit,
261                                Some(predicate),
262                            )? {
263                                return Ok(result);
264                            }
265                        }
266                    }
267                    // Fast path: Project(Limit(SeqScan)) — stream, no filter.
268                    if let PlanNode::SeqScan { table } = inner.as_ref() {
269                        let limit = match limit_expr {
270                            Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
271                            _ => usize::MAX,
272                        };
273                        if let Some(result) =
274                            self.project_filter_limit_fast(table, fields, limit, None)?
275                        {
276                            return Ok(result);
277                        }
278                    }
279                }
280
281                // Mission D4: Project(Filter(SeqScan)) without Limit. Reuses
282                // `project_filter_limit_fast` with limit = usize::MAX so the
283                // hot loop decodes only projected columns and uses the
284                // compiled predicate. Previously this fell through to the
285                // generic Filter branch which materialised every column via
286                // `decode_row` then re-projected — quadratic work.
287                //
288                // multi_col_and_filter (`U filter .age > 30 and .status =
289                // "active" { .name, .age }`) was 6.18ms (0.7x SQLite) and
290                // is the load-bearing workload for this fast path.
291                if let PlanNode::Filter {
292                    input: fi,
293                    predicate,
294                } = input.as_ref()
295                {
296                    if let PlanNode::SeqScan { table } = fi.as_ref() {
297                        if let Some(result) = self.project_filter_limit_fast(
298                            table,
299                            fields,
300                            usize::MAX,
301                            Some(predicate),
302                        )? {
303                            return Ok(result);
304                        }
305                    }
306                }
307
308                // Mission D4: Project(SeqScan) without Filter or Limit.
309                // Decode only projected columns; the previous fall-through
310                // built full Vec<Value> rows then re-projected.
311                if let PlanNode::SeqScan { table } = input.as_ref() {
312                    if let Some(result) =
313                        self.project_filter_limit_fast(table, fields, usize::MAX, None)?
314                    {
315                        return Ok(result);
316                    }
317                }
318
319                let result = self.execute_plan(input)?;
320                match result {
321                    QueryResult::Rows { columns, rows } => {
322                        let proj_columns: Vec<String> = fields
323                            .iter()
324                            .map(|f| {
325                                f.alias.clone().unwrap_or_else(|| match &f.expr {
326                                    Expr::Field(name) => name.clone(),
327                                    // Mission E1.2: `{ u.name }` projects as the
328                                    // qualified column name so callers can still
329                                    // disambiguate across the join output.
330                                    Expr::QualifiedField { qualifier, field } => {
331                                        format!("{qualifier}.{field}")
332                                    }
333                                    _ => "?".into(),
334                                })
335                            })
336                            .collect();
337                        let proj_rows: Vec<Vec<Value>> = rows
338                            .iter()
339                            .map(|row| {
340                                fields
341                                    .iter()
342                                    .map(|f| eval_expr(&f.expr, row, &columns))
343                                    .collect()
344                            })
345                            .collect();
346                        Ok(QueryResult::Rows {
347                            columns: proj_columns,
348                            rows: proj_rows,
349                        })
350                    }
351                    _ => Err("project requires row input".into()),
352                }
353            }
354
355            PlanNode::Sort { input, keys } => {
356                let result = self.execute_plan(input)?;
357                match result {
358                    QueryResult::Rows { columns, mut rows } => {
359                        // WS2: row-count cap is a cheap secondary guard; the
360                        // byte budget is the real OOM defense for the sort
361                        // buffer (a few very large rows pass the row cap).
362                        if rows.len() > MAX_SORT_ROWS {
363                            return Err(QueryError::SortLimitExceeded);
364                        }
365                        self.charge_rows(&rows)?;
366                        let key_indices: Vec<(usize, bool)> = keys
367                            .iter()
368                            .map(|k| {
369                                columns
370                                    .iter()
371                                    .position(|c| c == &k.field)
372                                    .map(|idx| (idx, k.descending))
373                                    .ok_or_else(|| QueryError::ColumnNotFound {
374                                        table: String::new(),
375                                        column: k.field.clone(),
376                                    })
377                            })
378                            .collect::<Result<_, QueryError>>()?;
379                        rows.sort_by(|a, b| {
380                            for &(col_idx, descending) in &key_indices {
381                                let cmp = a[col_idx].cmp(&b[col_idx]);
382                                let cmp = if descending { cmp.reverse() } else { cmp };
383                                if cmp != std::cmp::Ordering::Equal {
384                                    return cmp;
385                                }
386                            }
387                            std::cmp::Ordering::Equal
388                        });
389                        Ok(QueryResult::Rows { columns, rows })
390                    }
391                    _ => Err("sort requires row input".into()),
392                }
393            }
394
395            PlanNode::Limit { input, count } => {
396                let result = self.execute_plan(input)?;
397                let n = match count {
398                    Expr::Literal(Literal::Int(v)) => *v as usize,
399                    _ => return Err("limit must be integer literal".into()),
400                };
401                match result {
402                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
403                        columns,
404                        rows: rows.into_iter().take(n).collect(),
405                    }),
406                    _ => Err("limit requires row input".into()),
407                }
408            }
409
410            PlanNode::Offset { input, count } => {
411                let result = self.execute_plan(input)?;
412                let n = match count {
413                    Expr::Literal(Literal::Int(v)) => *v as usize,
414                    _ => return Err("offset must be integer literal".into()),
415                };
416                match result {
417                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
418                        columns,
419                        rows: rows.into_iter().skip(n).collect(),
420                    }),
421                    _ => Err("offset requires row input".into()),
422                }
423            }
424
425            PlanNode::Aggregate {
426                input,
427                function,
428                field,
429            } => {
430                // Fast path: count() over SeqScan — count rows without any decode
431                if *function == AggFunc::Count {
432                    if let PlanNode::SeqScan { table } = input.as_ref() {
433                        // Auto-refresh a dirty materialized view before
434                        // counting it — otherwise count(View) returns stale
435                        // data after an underlying mutation (F3).
436                        if self.view_registry.is_dirty(table) {
437                            self.refresh_view(table)?;
438                        }
439                        let mut count: i64 = 0;
440                        self.catalog
441                            .for_each_row_raw(table, |_rid, _data| {
442                                count += 1;
443                            })
444                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
445                        return Ok(QueryResult::Scalar(Value::Int(count)));
446                    }
447                    // Fast path: count() over Filter(SeqScan) — try compiled
448                    // predicate first, fall back to decode_column path.
449                    // Skip a predicate carrying a subquery: the raw-bytes
450                    // evaluators here don't materialise subqueries, so
451                    // `count(T filter .x in (...))` would silently count 0
452                    // (F1). Falling through routes it to the generic path
453                    // that resolves the subquery correctly.
454                    if let PlanNode::Filter {
455                        input: inner,
456                        predicate,
457                    } = input.as_ref()
458                    {
459                        if let PlanNode::SeqScan { table } = inner.as_ref() {
460                            if self.view_registry.is_dirty(table) {
461                                self.refresh_view(table)?;
462                            }
463                        }
464                        if let (PlanNode::SeqScan { table }, false) =
465                            (inner.as_ref(), contains_subquery(predicate))
466                        {
467                            let schema = self
468                                .catalog
469                                .schema(table)
470                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
471                                .clone();
472                            let columns: Vec<String> =
473                                schema.columns.iter().map(|c| c.name.clone()).collect();
474                            let fast = FastLayout::new(&schema);
475                            let row_layout = RowLayout::new(&schema);
476
477                            // Try compiled predicate (zero-allocation hot path).
478                            // Handles int leaves, string-eq leaves, AND conjunctions.
479                            if let Some(compiled) =
480                                compile_predicate(predicate, &columns, &fast, &schema)
481                            {
482                                let mut count: i64 = 0;
483                                self.catalog
484                                    .for_each_row_raw(table, |_rid, data| {
485                                        if compiled(data) {
486                                            count += 1;
487                                        }
488                                    })
489                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
490                                return Ok(QueryResult::Scalar(Value::Int(count)));
491                            }
492
493                            // Fallback: decode predicate columns
494                            let pred_cols = predicate_column_indices(predicate, &columns);
495                            let mut count: i64 = 0;
496                            self.catalog
497                                .for_each_row_raw(table, |_rid, data| {
498                                    let pred_row =
499                                        decode_selective(&schema, &row_layout, data, &pred_cols);
500                                    if eval_predicate(predicate, &pred_row, &columns) {
501                                        count += 1;
502                                    }
503                                })
504                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
505
506                            return Ok(QueryResult::Scalar(Value::Int(count)));
507                        }
508                    }
509                }
510
511                // Fast path: sum/avg/min/max over a single fixed-size int
512                // column with an optional compiled filter predicate. Walks
513                // raw row bytes, zero allocation per row.
514                if matches!(
515                    function,
516                    AggFunc::Sum
517                        | AggFunc::Avg
518                        | AggFunc::Min
519                        | AggFunc::Max
520                        | AggFunc::CountDistinct
521                ) {
522                    if let Some(col) = field.as_ref() {
523                        // Shape: Aggregate(SeqScan) or Aggregate(Filter(SeqScan))
524                        let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
525                            match input.as_ref() {
526                                PlanNode::SeqScan { table } => (Some(table.as_str()), None),
527                                PlanNode::Filter {
528                                    input: inner,
529                                    predicate,
530                                } => {
531                                    if let PlanNode::SeqScan { table } = inner.as_ref() {
532                                        (Some(table.as_str()), Some(predicate))
533                                    } else {
534                                        (None, None)
535                                    }
536                                }
537                                _ => (None, None),
538                            };
539                        if let Some(table) = table_opt {
540                            if let Some(result) =
541                                self.agg_single_col_fast(table, col, *function, pred_opt)?
542                            {
543                                return Ok(result);
544                            }
545                        }
546                    }
547                }
548
549                // Fast path: Project(Limit(Filter(SeqScan))) — stream, decode
550                // only projected columns, stop once we hit the limit.
551                // (Handled in the Project branch; this branch only fires when
552                // the aggregate is the outer node.)
553                let result = self.execute_plan(input)?;
554                match result {
555                    QueryResult::Rows { columns, rows } => {
556                        match function {
557                            AggFunc::Count => {
558                                Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
559                            }
560                            AggFunc::CountDistinct => {
561                                let col = field.as_ref().ok_or("count distinct requires field")?;
562                                let idx = columns
563                                    .iter()
564                                    .position(|c| c == col)
565                                    .ok_or("col not found")?;
566                                let mut seen = std::collections::HashSet::new();
567                                for row in &rows {
568                                    let v = &row[idx];
569                                    if !v.is_empty() {
570                                        seen.insert(v.clone());
571                                    }
572                                }
573                                Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
574                            }
575                            AggFunc::Avg => {
576                                let col = field.as_ref().ok_or("avg requires field")?;
577                                let idx = columns
578                                    .iter()
579                                    .position(|c| c == col)
580                                    .ok_or("col not found")?;
581                                let sum: f64 = rows
582                                    .iter()
583                                    .filter_map(|r| match &r[idx] {
584                                        Value::Int(v) => Some(*v as f64),
585                                        Value::Float(v) => Some(*v),
586                                        _ => None,
587                                    })
588                                    .sum();
589                                let count = rows.len() as f64;
590                                Ok(QueryResult::Scalar(Value::Float(sum / count)))
591                            }
592                            AggFunc::Sum => {
593                                let col = field.as_ref().ok_or("sum requires field")?;
594                                let idx = columns
595                                    .iter()
596                                    .position(|c| c == col)
597                                    .ok_or("col not found")?;
598                                // Track int and float contributions separately so
599                                // Float columns (and mixed Int/Float rows) don't get
600                                // silently dropped as they did in the Int-only
601                                // version. If any Float is present, the whole sum
602                                // promotes to Float — matching Avg's semantics.
603                                let mut int_sum: i64 = 0;
604                                let mut float_sum: f64 = 0.0;
605                                let mut saw_float = false;
606                                for r in &rows {
607                                    match &r[idx] {
608                                        Value::Int(v) => int_sum += *v,
609                                        Value::Float(v) => {
610                                            float_sum += *v;
611                                            saw_float = true;
612                                        }
613                                        _ => {}
614                                    }
615                                }
616                                let result = if saw_float {
617                                    Value::Float(float_sum + int_sum as f64)
618                                } else {
619                                    Value::Int(int_sum)
620                                };
621                                Ok(QueryResult::Scalar(result))
622                            }
623                            AggFunc::Min | AggFunc::Max => {
624                                let col = field.as_ref().ok_or("min/max requires field")?;
625                                let idx = columns
626                                    .iter()
627                                    .position(|c| c == col)
628                                    .ok_or("col not found")?;
629                                let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
630                                let result = if *function == AggFunc::Min {
631                                    vals.into_iter().min().cloned()
632                                } else {
633                                    vals.into_iter().max().cloned()
634                                };
635                                Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
636                            }
637                        }
638                    }
639                    _ => Err("aggregate requires row input".into()),
640                }
641            }
642
643            PlanNode::Insert { table, rows } => {
644                // Build + validate EVERY row before inserting any, so a bad
645                // row (unknown/missing/uncoercible field) aborts the whole
646                // statement without a partial write. The WAL fsync happens
647                // once at statement end, so N rows = N appends + 1 fsync.
648                let all_values: Vec<Vec<Value>> = {
649                    let schema = self
650                        .catalog
651                        .schema(table)
652                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
653                    let mut all = Vec::with_capacity(rows.len());
654                    for assignments in rows {
655                        let mut values = vec![Value::Empty; schema.columns.len()];
656                        for a in assignments {
657                            let idx = schema.column_index(&a.field).ok_or_else(|| {
658                                QueryError::ColumnNotFound {
659                                    table: String::new(),
660                                    column: a.field.clone(),
661                                }
662                            })?;
663                            let raw = literal_to_value(&a.value)?;
664                            values[idx] = coerce_value(raw, &schema.columns[idx])?;
665                        }
666                        for col in &schema.columns {
667                            if col.required && matches!(values[col.position as usize], Value::Empty)
668                            {
669                                return Err(QueryError::Execution(format!(
670                                    "column '{}' is required but no value was provided",
671                                    col.name
672                                )));
673                            }
674                        }
675                        all.push(values);
676                    }
677                    all
678                };
679                // Charge the materialized batch against the per-query memory
680                // budget before inserting — keeps multi-row insert consistent
681                // with every other full-materialization point (sort/join/group)
682                // and bounds embedded callers (the server also caps the query
683                // string at 1 MB, but embedded callers have no such limit).
684                self.charge_rows(&all_values)?;
685                let n = all_values.len() as u64;
686                for values in &all_values {
687                    self.catalog
688                        .insert(table, values)
689                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
690                }
691                self.view_registry.mark_dependents_dirty(table);
692                Ok(QueryResult::Modified(n))
693            }
694
695            PlanNode::Upsert {
696                table,
697                key_column,
698                assignments,
699                on_conflict,
700            } => {
701                let (values, key_idx) = {
702                    let schema = self
703                        .catalog
704                        .schema(table)
705                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
706                    let mut values = vec![Value::Empty; schema.columns.len()];
707                    for a in assignments {
708                        let idx = schema.column_index(&a.field).ok_or_else(|| {
709                            QueryError::ColumnNotFound {
710                                table: String::new(),
711                                column: a.field.clone(),
712                            }
713                        })?;
714                        let raw = literal_to_value(&a.value)?;
715                        values[idx] = coerce_value(raw, &schema.columns[idx])?;
716                    }
717                    for col in &schema.columns {
718                        if col.required && matches!(values[col.position as usize], Value::Empty) {
719                            return Err(QueryError::Execution(format!(
720                                "column '{}' is required but no value was provided",
721                                col.name
722                            )));
723                        }
724                    }
725                    let key_idx = schema
726                        .column_index(key_column)
727                        .ok_or_else(|| format!("key column '{key_column}' not found"))?;
728                    (values, key_idx)
729                };
730
731                let key_value = values[key_idx].clone();
732
733                // Probe the index for a conflict.
734                let existing = {
735                    let tbl = self
736                        .catalog
737                        .get_table(table)
738                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
739                    if tbl.has_index(key_column) {
740                        // Upsert key lookup: return the first match.
741                        // For unique indexes this is the only match.
742                        // For non-unique indexes on a key column, also
743                        // just the first (upsert semantics).
744                        let rids = tbl.index_lookup_all(key_column, &key_value);
745                        rids.into_iter().next().and_then(|rid| {
746                            tbl.heap
747                                .get(rid)
748                                .map(|data| (rid, decode_row(&tbl.schema, &data)))
749                        })
750                    } else {
751                        // No index — linear scan for the key.
752                        let mut found = None;
753                        for (rid, row) in tbl.scan() {
754                            if row[key_idx] == key_value {
755                                found = Some((rid, row));
756                                break;
757                            }
758                        }
759                        found
760                    }
761                };
762
763                if let Some((rid, mut existing_row)) = existing {
764                    // Conflict: apply on_conflict assignments (or all non-key if empty).
765                    let update_assignments = if on_conflict.is_empty() {
766                        assignments
767                    } else {
768                        on_conflict
769                    };
770                    let changed_cols: Vec<usize> = {
771                        let schema = self
772                            .catalog
773                            .schema(table)
774                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
775                        let mut indices = Vec::new();
776                        for a in update_assignments {
777                            let idx = schema.column_index(&a.field).ok_or_else(|| {
778                                QueryError::ColumnNotFound {
779                                    table: String::new(),
780                                    column: a.field.clone(),
781                                }
782                            })?;
783                            if idx != key_idx {
784                                existing_row[idx] = literal_to_value(&a.value)?;
785                                indices.push(idx);
786                            }
787                        }
788                        indices
789                    };
790                    self.catalog
791                        .update_hinted(table, rid, &existing_row, Some(&changed_cols))
792                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
793                    self.view_registry.mark_dependents_dirty(table);
794                    Ok(QueryResult::Modified(1))
795                } else {
796                    // No conflict: insert.
797                    self.catalog
798                        .insert(table, &values)
799                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
800                    self.view_registry.mark_dependents_dirty(table);
801                    Ok(QueryResult::Modified(1))
802                }
803            }
804
805            PlanNode::Update {
806                input,
807                table,
808                assignments,
809            } => {
810                // Mission C Phase 3: resolve assignments against a borrowed
811                // schema, then drop the borrow before the mutation loop.
812                // Try literal-only path first; fall back to per-row expression
813                // evaluation if any assignment contains a non-literal expression
814                // (e.g., `age := .age + 1`).
815                let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
816                    let schema_ref = self
817                        .catalog
818                        .schema(table)
819                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
820                    let indices: Vec<usize> = assignments
821                        .iter()
822                        .map(|a| {
823                            schema_ref.column_index(&a.field).ok_or_else(|| {
824                                QueryError::ColumnNotFound {
825                                    table: String::new(),
826                                    column: a.field.clone(),
827                                }
828                            })
829                        })
830                        .collect::<Result<_, _>>()?;
831                    let vals: Result<Vec<Value>, _> = assignments
832                        .iter()
833                        .map(|a| literal_to_value(&a.value))
834                        .collect();
835                    (indices, vals.ok())
836                };
837                let resolved_assignments: Option<Vec<(usize, Value)>> =
838                    literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
839
840                // Mission C Phase 2: the hint Table::update_hinted needs to
841                // decide whether to read the old row for index diff.
842                let changed_cols: Vec<usize> = col_indices.clone();
843
844                // ── Fused scan+update for Update(Filter(SeqScan)) ────────
845                // Perf sprint: instead of the two-pass collect-RIDs-then-loop
846                // pattern (which pays one ensure_hot per matched row on the
847                // second pass), fuse the predicate evaluation and in-place
848                // byte-level mutation into a single heap walk. Same idea as
849                // the fused scan_delete_matching path for deletes.
850                if let Some(ref resolved_assignments) = resolved_assignments {
851                    if let PlanNode::Filter {
852                        input: inner,
853                        predicate,
854                    } = input.as_ref()
855                    {
856                        if let PlanNode::SeqScan { table: t } = inner.as_ref() {
857                            if t == table {
858                                let fused_result = self.try_fused_scan_update(
859                                    table,
860                                    predicate,
861                                    resolved_assignments,
862                                    &changed_cols,
863                                );
864                                if let Some(result) = fused_result {
865                                    return result;
866                                }
867                            }
868                        }
869                    }
870                }
871
872                // Collect matching RowIds in a single pass.
873                let matching_rids = self.collect_rids_for_mutation(input, table)?;
874
875                // ── Literal-only fast paths ─────────────────────────────
876                if let Some(ref resolved_assignments) = resolved_assignments {
877                    // Mission C Phase 4: in-place byte-patch fast path. If every
878                    // assignment targets a fixed-size non-null column AND none of
879                    // them is indexed, we can skip decode_row / Vec<Value> /
880                    // encode_row_into entirely and patch the row's raw bytes on
881                    // the hot page.
882                    let fast_patch: Option<Vec<FastPatch>> = {
883                        let tbl = self
884                            .catalog
885                            .get_table(table)
886                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
887                        let schema = &tbl.schema;
888                        let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
889                            is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
890                        });
891                        let no_indexed = !resolved_assignments
892                            .iter()
893                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
894
895                        if all_fixed_nonnull && no_indexed {
896                            let layout = RowLayout::new(schema);
897                            let bitmap_size = layout.bitmap_size();
898                            let patches: Vec<FastPatch> = resolved_assignments
899                                .iter()
900                                .map(|(idx, val)| {
901                                    let fixed_off = layout
902                                        .fixed_offset(*idx)
903                                        .expect("is_fixed_size already checked");
904                                    let field_off = 2 + bitmap_size + fixed_off;
905                                    let bytes: FixedBytes = match val {
906                                        Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
907                                        Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
908                                        Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
909                                        Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
910                                        Value::Uuid(v) => FixedBytes::Uuid(*v),
911                                        _ => unreachable!("all_fixed_nonnull guard lied"),
912                                    };
913                                    FastPatch {
914                                        field_off,
915                                        bitmap_byte_off: 2 + idx / 8,
916                                        bit_mask: 1u8 << (idx % 8),
917                                        bytes,
918                                    }
919                                })
920                                .collect();
921                            Some(patches)
922                        } else {
923                            None
924                        }
925                    };
926
927                    if let Some(patches) = fast_patch {
928                        let mut count = 0u64;
929                        for rid in matching_rids {
930                            // Mission B2: WAL-log every patch so crash
931                            // recovery replays the update. Same mutation
932                            // closure as before — the wrapper just sandwiches
933                            // it between a hot-page read and a WAL append.
934                            let ok = self
935                                .catalog
936                                .update_row_bytes_logged(table, rid, |row| {
937                                    for p in &patches {
938                                        row[p.bitmap_byte_off] &= !p.bit_mask;
939                                        let field_bytes = p.bytes.as_slice();
940                                        row[p.field_off..p.field_off + field_bytes.len()]
941                                            .copy_from_slice(field_bytes);
942                                    }
943                                })
944                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
945                            if ok {
946                                count += 1;
947                            }
948                        }
949                        self.view_registry.mark_dependents_dirty(table);
950                        return Ok(QueryResult::Modified(count));
951                    }
952
953                    // Mission C Phase 10: var-column in-place shrink fast path.
954                    let var_fast: Option<(usize, Option<Vec<u8>>)> = {
955                        let tbl = self
956                            .catalog
957                            .get_table(table)
958                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
959                        let schema = &tbl.schema;
960                        let is_single = resolved_assignments.len() == 1;
961                        let is_var_col = is_single
962                            && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
963                        let no_indexed = !resolved_assignments
964                            .iter()
965                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
966
967                        if is_single && is_var_col && no_indexed {
968                            let (idx, val) = &resolved_assignments[0];
969                            let bytes_opt: Option<Vec<u8>> = match val {
970                                Value::Str(s) => Some(s.as_bytes().to_vec()),
971                                Value::Bytes(b) => Some(b.clone()),
972                                Value::Empty => None,
973                                _ => {
974                                    return Err(QueryError::TypeError(format!(
975                                        "cannot assign non-var value to var column '{}'",
976                                        schema.columns[*idx].name
977                                    )))
978                                }
979                            };
980                            Some((*idx, bytes_opt))
981                        } else {
982                            None
983                        }
984                    };
985
986                    if let Some((col_idx, new_bytes_opt)) = var_fast {
987                        let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
988                        let mut count = 0u64;
989                        let mut fallback_rids: Vec<RowId> = Vec::new();
990                        for rid in &matching_rids {
991                            // Mission B2: logged variant so crash recovery
992                            // replays the shrink. On a false return (row
993                            // would have to grow), the rid is pushed to
994                            // `fallback_rids` and the slower `update_hinted`
995                            // path — which is already WAL-logged — picks it up.
996                            let ok = self
997                                .catalog
998                                .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
999                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1000                            if ok {
1001                                count += 1;
1002                            } else {
1003                                fallback_rids.push(*rid);
1004                            }
1005                        }
1006                        for rid in fallback_rids {
1007                            let mut row = match self.catalog.get(table, rid) {
1008                                Some(r) => r,
1009                                None => continue,
1010                            };
1011                            for (idx, val) in resolved_assignments.iter() {
1012                                row[*idx] = val.clone();
1013                            }
1014                            self.catalog
1015                                .update_hinted(table, rid, &row, Some(&changed_cols))
1016                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1017                            count += 1;
1018                        }
1019                        self.view_registry.mark_dependents_dirty(table);
1020                        return Ok(QueryResult::Modified(count));
1021                    }
1022
1023                    // Generic literal path: decode row, apply literal values.
1024                    let mut count = 0u64;
1025                    for rid in matching_rids {
1026                        let mut row = match self.catalog.get(table, rid) {
1027                            Some(r) => r,
1028                            None => continue,
1029                        };
1030                        for (idx, val) in resolved_assignments.iter() {
1031                            row[*idx] = val.clone();
1032                        }
1033                        self.catalog
1034                            .update_hinted(table, rid, &row, Some(&changed_cols))
1035                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1036                        count += 1;
1037                    }
1038                    self.view_registry.mark_dependents_dirty(table);
1039                    return Ok(QueryResult::Modified(count));
1040                } // end if let Some(resolved_assignments)
1041
1042                // ── Expression-based update path ────────────────────────
1043                // At least one assignment contains a non-literal expression
1044                // (e.g., `age := .age + 1`). Evaluate per-row.
1045                let col_names: Vec<String> = {
1046                    let schema_ref = self
1047                        .catalog
1048                        .schema(table)
1049                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1050                    schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1051                };
1052                let mut count = 0u64;
1053                for rid in matching_rids {
1054                    let mut row = match self.catalog.get(table, rid) {
1055                        Some(r) => r,
1056                        None => continue,
1057                    };
1058                    for (i, asgn) in assignments.iter().enumerate() {
1059                        let val = eval_expr(&asgn.value, &row, &col_names);
1060                        row[col_indices[i]] = val;
1061                    }
1062                    self.catalog
1063                        .update_hinted(table, rid, &row, Some(&changed_cols))
1064                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1065                    count += 1;
1066                }
1067                self.view_registry.mark_dependents_dirty(table);
1068                Ok(QueryResult::Modified(count))
1069            }
1070
1071            PlanNode::Delete { input, table } => {
1072                // Mission C Phase 3: no schema clone — collect_rids_for_mutation
1073                // looks up schema internally when it needs one, and the mutation
1074                // loop doesn't need the schema at all.
1075                //
1076                // Mission C Phase 12: route bulk deletes through
1077                // `Catalog::delete_many`, which batches the btree leaf
1078                // compaction and shares one `ensure_hot` per row between
1079                // the index-key extraction and the slot delete. On
1080                // `delete_by_filter` (100K fixture, ~20K matches) that
1081                // removes ~4ms of pure `Vec::remove` memmove from the btree
1082                // maintenance phase.
1083                //
1084                // Mission C Phase 16: for the common `delete where ...`
1085                // shape (Filter(SeqScan)) — and the rarer "delete
1086                // everything" shape (SeqScan) — skip the two-pass
1087                // `collect_rids_for_mutation` + `delete_many` flow entirely.
1088                // The fused `scan_delete_matching` primitive walks the
1089                // heap exactly once, paying one `ensure_hot` per page
1090                // instead of per-row. That closes the last major gap on
1091                // the bench's `delete_by_filter` workload.
1092                if let PlanNode::Filter {
1093                    input: inner,
1094                    predicate,
1095                } = input.as_ref()
1096                {
1097                    if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1098                        if t == table {
1099                            let schema = self
1100                                .catalog
1101                                .schema(table)
1102                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1103                            let columns: Vec<String> =
1104                                schema.columns.iter().map(|c| c.name.clone()).collect();
1105                            let fast = FastLayout::new(schema);
1106                            if let Some(compiled) =
1107                                compile_predicate(predicate, &columns, &fast, schema)
1108                            {
1109                                // Mission B2: logged variant so every
1110                                // matched rid hits the WAL during the
1111                                // single-pass scan. Structure of the
1112                                // fused scan is unchanged — only the
1113                                // hook closure now also appends.
1114                                let count = self
1115                                    .catalog
1116                                    .scan_delete_matching_logged(table, |data| compiled(data))
1117                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1118                                self.view_registry.mark_dependents_dirty(table);
1119                                return Ok(QueryResult::Modified(count));
1120                            }
1121                        }
1122                    }
1123                } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1124                    if t == table {
1125                        // `delete from T` with no predicate — every live
1126                        // row matches. One pass is still the right shape.
1127                        // Mission B2: logged variant — see above.
1128                        let count = self
1129                            .catalog
1130                            .scan_delete_matching_logged(table, |_| true)
1131                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1132                        self.view_registry.mark_dependents_dirty(table);
1133                        return Ok(QueryResult::Modified(count));
1134                    }
1135                }
1136
1137                let matching_rids = self.collect_rids_for_mutation(input, table)?;
1138                let count = self
1139                    .catalog
1140                    .delete_many(table, &matching_rids)
1141                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1142                self.view_registry.mark_dependents_dirty(table);
1143                Ok(QueryResult::Modified(count))
1144            }
1145
1146            PlanNode::AliasScan { table, alias } => {
1147                // Mission E1.2: scan `table` and rename every output column
1148                // to `alias.field`. Used as a join leaf so downstream
1149                // NestedLoopJoin + Filter + Project nodes can resolve
1150                // `Expr::QualifiedField` lookups by direct column-name match.
1151                //
1152                // We don't bother with a fused zero-copy loop here yet — the
1153                // whole join path is nested-loop and correctness-first
1154                // (Phase E1.3 will introduce hash join and at that point we
1155                // can revisit whether to specialise AliasScan).
1156                let schema = self
1157                    .catalog
1158                    .schema(table)
1159                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1160                    .clone();
1161                let columns: Vec<String> = schema
1162                    .columns
1163                    .iter()
1164                    .map(|c| format!("{alias}.{}", c.name))
1165                    .collect();
1166                let rows: Vec<Vec<Value>> = self
1167                    .catalog
1168                    .scan(table)
1169                    .map_err(|e| QueryError::StorageError(e.to_string()))?
1170                    .map(|(_, row)| row)
1171                    .collect();
1172                Ok(QueryResult::Rows { columns, rows })
1173            }
1174
1175            PlanNode::NestedLoopJoin {
1176                left,
1177                right,
1178                on,
1179                kind,
1180            } => {
1181                // Materialise both sides. The executor ships two strategies:
1182                //   1. Hash join (E1.3) — when the `on` predicate is a
1183                //      simple equi-predicate `left_col = right_col`, build a
1184                //      FxHashMap<Value, Vec<row_idx>> over the right side
1185                //      and probe with the left side. O(L + R) instead of
1186                //      O(L × R). Handles Inner and LeftOuter.
1187                //   2. Nested loop (E1.2) — fallback for Cross, non-equi
1188                //      predicates, or `on` expressions that reference
1189                //      either side with something more complex than a
1190                //      QualifiedField.
1191                let left_result = self.execute_plan(left)?;
1192                let right_result = self.execute_plan(right)?;
1193                let (left_columns, left_rows) = match left_result {
1194                    QueryResult::Rows { columns, rows } => (columns, rows),
1195                    _ => return Err("join left side must produce rows".into()),
1196                };
1197                let (right_columns, right_rows) = match right_result {
1198                    QueryResult::Rows { columns, rows } => (columns, rows),
1199                    _ => return Err("join right side must produce rows".into()),
1200                };
1201
1202                // WS2: byte-budget guard on the join build side. Charge both
1203                // materialized inputs before we build the hash table / probe;
1204                // the output is row-capped by check_join_limit below.
1205                self.charge_rows(&left_rows)?;
1206                self.charge_rows(&right_rows)?;
1207
1208                // Hash-join fast path.
1209                if !matches!(kind, JoinKind::Cross) {
1210                    if let Some(pred) = on {
1211                        if let Some((l_idx, r_idx)) =
1212                            try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1213                        {
1214                            let result = hash_join(
1215                                left_columns,
1216                                left_rows,
1217                                right_columns,
1218                                right_rows,
1219                                l_idx,
1220                                r_idx,
1221                                *kind,
1222                            );
1223                            if let QueryResult::Rows { ref rows, .. } = result {
1224                                check_join_limit(rows.len())?;
1225                            }
1226                            return Ok(result);
1227                        }
1228                    }
1229                }
1230
1231                // Nested-loop fallback.
1232                let n_left = left_columns.len();
1233                let n_right = right_columns.len();
1234                let mut columns = Vec::with_capacity(n_left + n_right);
1235                columns.extend(left_columns);
1236                columns.extend(right_columns);
1237
1238                let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1239                let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1240
1241                for left_row in &left_rows {
1242                    let mut matched = false;
1243                    for right_row in &right_rows {
1244                        combined.clear();
1245                        combined.extend_from_slice(left_row);
1246                        combined.extend_from_slice(right_row);
1247                        let keep = match kind {
1248                            JoinKind::Cross => true,
1249                            JoinKind::Inner | JoinKind::LeftOuter => match on {
1250                                Some(pred) => eval_predicate(pred, &combined, &columns),
1251                                // Missing `on` for non-cross joins is a
1252                                // parser error, but if it slips through we
1253                                // treat it as "match everything".
1254                                None => true,
1255                            },
1256                            // RightOuter is rewritten to LeftOuter by the
1257                            // planner, so we never see it here.
1258                            JoinKind::RightOuter => {
1259                                unreachable!("planner rewrites RightOuter to LeftOuter")
1260                            }
1261                        };
1262                        if keep {
1263                            rows.push(combined.clone());
1264                            check_join_limit(rows.len())?;
1265                            matched = true;
1266                        }
1267                    }
1268                    if !matched && matches!(kind, JoinKind::LeftOuter) {
1269                        let mut row = Vec::with_capacity(n_left + n_right);
1270                        row.extend_from_slice(left_row);
1271                        row.resize(n_left + n_right, Value::Empty);
1272                        rows.push(row);
1273                        check_join_limit(rows.len())?;
1274                    }
1275                }
1276
1277                Ok(QueryResult::Rows { columns, rows })
1278            }
1279
1280            PlanNode::Distinct { input } => {
1281                let result = self.execute_plan(input)?;
1282                match result {
1283                    QueryResult::Rows { columns, rows } => {
1284                        let mut seen = std::collections::HashSet::new();
1285                        let mut unique_rows = Vec::new();
1286                        for row in rows {
1287                            if seen.insert(row.clone()) {
1288                                unique_rows.push(row);
1289                            }
1290                        }
1291                        Ok(QueryResult::Rows {
1292                            columns,
1293                            rows: unique_rows,
1294                        })
1295                    }
1296                    other => Ok(other),
1297                }
1298            }
1299
1300            PlanNode::GroupBy {
1301                input,
1302                keys,
1303                aggregates,
1304                having,
1305            } => {
1306                let result = self.execute_plan(input)?;
1307                match result {
1308                    QueryResult::Rows { columns, rows } => {
1309                        // WS2: byte-budget guard on the GROUP BY input buffer
1310                        // (the hash table is bounded by the input it groups).
1311                        self.charge_rows(&rows)?;
1312                        // Resolve key column indices.
1313                        let key_indices: Vec<usize> = keys
1314                            .iter()
1315                            .map(|k| {
1316                                columns
1317                                    .iter()
1318                                    .position(|c| c == k)
1319                                    .ok_or_else(|| format!("group-by column '{k}' not found"))
1320                            })
1321                            .collect::<Result<Vec<_>, _>>()?;
1322
1323                        // Resolve aggregate field indices. count(*) uses
1324                        // sentinel usize::MAX — compute_group_aggregate
1325                        // treats it as "count all rows in the group".
1326                        let agg_field_indices: Vec<usize> = aggregates
1327                            .iter()
1328                            .map(|a| {
1329                                if a.field == "*" {
1330                                    Ok(usize::MAX)
1331                                } else {
1332                                    columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1333                                        format!("aggregate column '{}' not found", a.field)
1334                                    })
1335                                }
1336                            })
1337                            .collect::<Result<Vec<_>, _>>()?;
1338
1339                        // Group rows by key values (preserving insertion order).
1340                        let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1341                            rustc_hash::FxHashMap::default();
1342                        let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1343                        for (ri, row) in rows.iter().enumerate() {
1344                            let key: Vec<Value> =
1345                                key_indices.iter().map(|&i| row[i].clone()).collect();
1346                            match group_map.get(&key) {
1347                                Some(&idx) => groups[idx].1.push(ri),
1348                                None => {
1349                                    let idx = groups.len();
1350                                    group_map.insert(key.clone(), idx);
1351                                    groups.push((key, vec![ri]));
1352                                }
1353                            }
1354                        }
1355
1356                        // Build output column names: keys ++ aggregate output names.
1357                        let mut out_columns: Vec<String> = keys.clone();
1358                        for agg in aggregates.iter() {
1359                            out_columns.push(agg.output_name.clone());
1360                        }
1361
1362                        // Compute aggregates per group.
1363                        let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1364                        for (key_vals, row_indices) in &groups {
1365                            let mut row = key_vals.clone();
1366                            for (ai, agg) in aggregates.iter().enumerate() {
1367                                let col_idx = agg_field_indices[ai];
1368                                let val = compute_group_aggregate(
1369                                    agg.function,
1370                                    &rows,
1371                                    row_indices,
1372                                    col_idx,
1373                                );
1374                                row.push(val);
1375                            }
1376                            out_rows.push(row);
1377                        }
1378
1379                        // Apply HAVING filter.
1380                        if let Some(having_expr) = having {
1381                            out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1382                        }
1383
1384                        Ok(QueryResult::Rows {
1385                            columns: out_columns,
1386                            rows: out_rows,
1387                        })
1388                    }
1389                    _ => Err("group by requires row input".into()),
1390                }
1391            }
1392
1393            PlanNode::CreateTable { name, fields } => {
1394                let columns: Vec<ColumnDef> = fields
1395                    .iter()
1396                    .enumerate()
1397                    .map(
1398                        |(i, (fname, tname, req))| -> Result<ColumnDef, QueryError> {
1399                            Ok(ColumnDef {
1400                                name: fname.clone(),
1401                                type_id: type_name_to_id(tname).map_err(QueryError::TypeError)?,
1402                                required: *req,
1403                                position: i as u16,
1404                            })
1405                        },
1406                    )
1407                    .collect::<Result<Vec<_>, _>>()?;
1408                let schema = Schema {
1409                    table_name: name.clone(),
1410                    columns,
1411                };
1412                self.catalog
1413                    .create_table(schema)
1414                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1415                Ok(QueryResult::Created(name.clone()))
1416            }
1417
1418            PlanNode::AlterTable { table, action } => match action {
1419                AlterAction::AddColumn {
1420                    name,
1421                    type_name,
1422                    required,
1423                } => {
1424                    let position = self
1425                        .catalog
1426                        .schema(table)
1427                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1428                        .columns
1429                        .len() as u16;
1430                    let col = ColumnDef {
1431                        name: name.clone(),
1432                        type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1433                        required: *required,
1434                        position,
1435                    };
1436                    self.catalog
1437                        .alter_table_add_column(table, col)
1438                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1439                    Ok(QueryResult::Executed {
1440                        message: format!("column '{name}' added to '{table}'"),
1441                    })
1442                }
1443                AlterAction::DropColumn { name } => {
1444                    self.catalog
1445                        .alter_table_drop_column(table, name)
1446                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1447                    Ok(QueryResult::Executed {
1448                        message: format!("column '{name}' dropped from '{table}'"),
1449                    })
1450                }
1451                AlterAction::AddIndex { column } => {
1452                    self.catalog
1453                        .create_index(table, column)
1454                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1455                    Ok(QueryResult::Executed {
1456                        message: format!("index on '{table}.{column}' created"),
1457                    })
1458                }
1459            },
1460
1461            PlanNode::DropTable { name } => {
1462                self.catalog
1463                    .drop_table(name)
1464                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1465                Ok(QueryResult::Executed {
1466                    message: format!("table '{name}' dropped"),
1467                })
1468            }
1469
1470            PlanNode::CreateView { name, query_text } => {
1471                self.create_view(name, query_text)?;
1472                Ok(QueryResult::Executed {
1473                    message: format!("materialized view '{name}' created"),
1474                })
1475            }
1476
1477            PlanNode::RefreshView { name } => {
1478                self.refresh_view(name)?;
1479                Ok(QueryResult::Executed {
1480                    message: format!("materialized view '{name}' refreshed"),
1481                })
1482            }
1483
1484            PlanNode::DropView { name } => {
1485                self.drop_view(name)?;
1486                Ok(QueryResult::Executed {
1487                    message: format!("materialized view '{name}' dropped"),
1488                })
1489            }
1490
1491            PlanNode::Window { input, windows } => {
1492                let result = self.execute_plan(input)?;
1493                execute_window(result, windows)
1494            }
1495
1496            PlanNode::Union { left, right, all } => {
1497                let left_result = self.execute_plan(left)?;
1498                let right_result = self.execute_plan(right)?;
1499                let (left_cols, left_rows) = match left_result {
1500                    QueryResult::Rows { columns, rows } => (columns, rows),
1501                    _ => return Err("UNION requires query results on left side".into()),
1502                };
1503                let (_, right_rows) = match right_result {
1504                    QueryResult::Rows { columns, rows } => (columns, rows),
1505                    _ => return Err("UNION requires query results on right side".into()),
1506                };
1507                let mut combined = left_rows;
1508                if *all {
1509                    // UNION ALL — just concatenate.
1510                    combined.extend(right_rows);
1511                } else {
1512                    // UNION — deduplicate using the same HashSet approach
1513                    // as DISTINCT. Value already implements Hash + Eq.
1514                    let mut seen = std::collections::HashSet::new();
1515                    for row in &combined {
1516                        seen.insert(row.clone());
1517                    }
1518                    for row in right_rows {
1519                        if seen.insert(row.clone()) {
1520                            combined.push(row);
1521                        }
1522                    }
1523                }
1524                Ok(QueryResult::Rows {
1525                    columns: left_cols,
1526                    rows: combined,
1527                })
1528            }
1529
1530            PlanNode::Explain { input } => {
1531                let text = format_plan_tree(input, 0);
1532                Ok(QueryResult::Rows {
1533                    columns: vec!["plan".to_string()],
1534                    rows: text
1535                        .lines()
1536                        .map(|line| vec![Value::Str(line.to_string())])
1537                        .collect(),
1538                })
1539            }
1540
1541            PlanNode::Begin => {
1542                if self.in_transaction {
1543                    return Err(QueryError::Execution(
1544                        "already in a transaction (nested transactions not supported)".into(),
1545                    ));
1546                }
1547                self.in_transaction = true;
1548                Ok(QueryResult::Executed {
1549                    message: "transaction started".to_string(),
1550                })
1551            }
1552
1553            PlanNode::Commit => {
1554                if !self.in_transaction {
1555                    return Err(QueryError::Execution(
1556                        "no active transaction to commit".into(),
1557                    ));
1558                }
1559                self.in_transaction = false;
1560                self.catalog
1561                    .sync_wal()
1562                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1563                Ok(QueryResult::Executed {
1564                    message: "transaction committed".to_string(),
1565                })
1566            }
1567
1568            PlanNode::Rollback => {
1569                if !self.in_transaction {
1570                    return Err(QueryError::Execution(
1571                        "no active transaction to roll back".into(),
1572                    ));
1573                }
1574                self.in_transaction = false;
1575                self.catalog
1576                    .rollback_to_last_sync()
1577                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1578                if let Ok(mut cache) = self.plan_cache.lock() {
1579                    cache.clear();
1580                }
1581                self.view_registry = ViewRegistry::open(self.catalog.data_dir())
1582                    .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
1583                Ok(QueryResult::Executed {
1584                    message: "transaction rolled back".to_string(),
1585                })
1586            }
1587
1588            PlanNode::IndexScan { table, column, key } => {
1589                let key_value = literal_to_value(key)?;
1590                let tbl = self
1591                    .catalog
1592                    .get_table(table)
1593                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1594                let columns: Vec<String> =
1595                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1596
1597                // Fast path: the table has a B-tree on this column.
1598                // Uses index_lookup_all to return ALL matching rows for
1599                // both unique and non-unique indexes.
1600                if tbl.has_index(column) {
1601                    let rids = tbl.index_lookup_all(column, &key_value);
1602                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1603                    for rid in rids {
1604                        if let Some(data) = tbl.heap.get(rid) {
1605                            rows.push(decode_row(&tbl.schema, &data));
1606                        }
1607                    }
1608                    return Ok(QueryResult::Rows { columns, rows });
1609                }
1610
1611                // Fallback: no index on this column. The planner emits IndexScan
1612                // eagerly (it has no visibility into which columns are indexed
1613                // at plan time), so here we must behave like SeqScan+Filter on
1614                // `.col = literal`: return *all* matching rows, not just the
1615                // first one. A non-indexed column isn't necessarily unique.
1616                // We compile the eq predicate once and stream without any
1617                // per-row decode for non-matching rows.
1618                let schema = &tbl.schema;
1619                let fast = FastLayout::new(schema);
1620                let synth_pred = Expr::BinaryOp(
1621                    Box::new(Expr::Field(column.clone())),
1622                    BinOp::Eq,
1623                    Box::new(key.clone()),
1624                );
1625                if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1626                    // Mission F: skip the first 4 Vec doublings.
1627                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1628                    self.catalog
1629                        .for_each_row_raw(table, |_rid, data| {
1630                            if compiled(data) {
1631                                rows.push(decode_row(schema, data));
1632                            }
1633                        })
1634                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1635                    return Ok(QueryResult::Rows { columns, rows });
1636                }
1637
1638                // Last resort: slow eq-check on materialised rows.
1639                let col_idx =
1640                    schema
1641                        .column_index(column)
1642                        .ok_or_else(|| QueryError::ColumnNotFound {
1643                            table: String::new(),
1644                            column: column.clone(),
1645                        })?;
1646                let rows: Vec<Vec<Value>> = tbl
1647                    .scan()
1648                    .filter_map(|(_, row)| {
1649                        if row[col_idx] == key_value {
1650                            Some(row)
1651                        } else {
1652                            None
1653                        }
1654                    })
1655                    .collect();
1656                Ok(QueryResult::Rows { columns, rows })
1657            }
1658
1659            PlanNode::RangeScan {
1660                table,
1661                column,
1662                start,
1663                end,
1664            } => {
1665                let tbl = self
1666                    .catalog
1667                    .get_table(table)
1668                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1669                let columns: Vec<String> =
1670                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1671                let schema = &tbl.schema;
1672
1673                let start_val = match start {
1674                    Some((expr, _)) => Some(literal_to_value(expr)?),
1675                    None => None,
1676                };
1677                let end_val = match end {
1678                    Some((expr, _)) => Some(literal_to_value(expr)?),
1679                    None => None,
1680                };
1681                let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1682                let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1683
1684                // Range scans only use the btree fast path for unique indexes,
1685                // because non-unique indexes store composite keys (column_value
1686                // + RowId) that don't directly compare against raw column values.
1687                if tbl.is_index_unique(column) == Some(true) {
1688                    if let Some(btree) = tbl.index(column) {
1689                        let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1690                            (Some(s), Some(e)) => btree.range(s, e).collect(),
1691                            (Some(s), None) => btree.range_from(s),
1692                            (None, Some(e)) => btree.range_to(e),
1693                            (None, None) => {
1694                                let rows: Vec<Vec<Value>> =
1695                                    tbl.scan().map(|(_, row)| row).collect();
1696                                return Ok(QueryResult::Rows { columns, rows });
1697                            }
1698                        };
1699                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1700                        for (key, rid) in hits {
1701                            if !start_inclusive {
1702                                if let Some(ref s) = start_val {
1703                                    if &key == s {
1704                                        continue;
1705                                    }
1706                                }
1707                            }
1708                            if !end_inclusive {
1709                                if let Some(ref e) = end_val {
1710                                    if &key == e {
1711                                        continue;
1712                                    }
1713                                }
1714                            }
1715                            if let Some(data) = tbl.heap.get(rid) {
1716                                rows.push(decode_row(schema, &data));
1717                            }
1718                        }
1719                        return Ok(QueryResult::Rows { columns, rows });
1720                    }
1721                }
1722
1723                // Fallback: no index — synthesize range predicate and scan.
1724                let fast = FastLayout::new(schema);
1725                let synth = synthesize_range_predicate(column, start, end);
1726                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1727                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1728                    self.catalog
1729                        .for_each_row_raw(table, |_rid, data| {
1730                            if compiled(data) {
1731                                rows.push(decode_row(schema, data));
1732                            }
1733                        })
1734                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1735                    return Ok(QueryResult::Rows { columns, rows });
1736                }
1737
1738                let col_idx =
1739                    schema
1740                        .column_index(column)
1741                        .ok_or_else(|| QueryError::ColumnNotFound {
1742                            table: String::new(),
1743                            column: column.clone(),
1744                        })?;
1745                let rows: Vec<Vec<Value>> = tbl
1746                    .scan()
1747                    .filter(|(_, row)| {
1748                        range_matches(
1749                            &row[col_idx],
1750                            &start_val,
1751                            start_inclusive,
1752                            &end_val,
1753                            end_inclusive,
1754                        )
1755                    })
1756                    .map(|(_, row)| row)
1757                    .collect();
1758                Ok(QueryResult::Rows { columns, rows })
1759            }
1760        }
1761    }
1762
1763    // ─── Materialized view operations ──────────────────────────────────────
1764
1765    /// Create a materialized view: execute the source query, store results
1766    /// in a new backing table, and register the view.
1767    fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1768        if self.view_registry.is_view(name) {
1769            return Err(QueryError::ViewError(format!(
1770                "materialized view '{name}' already exists"
1771            )));
1772        }
1773        // Execute the source query to get the result set.
1774        let result = self.execute_powql(query_text)?;
1775        let (columns, rows) = match result {
1776            QueryResult::Rows { columns, rows } => (columns, rows),
1777            _ => return Err("view source query must be a SELECT".into()),
1778        };
1779        // Derive a schema for the backing table from the query result columns.
1780        let schema = self.derive_view_schema(name, &columns, &rows);
1781        // Create the backing table and insert the result rows.
1782        self.catalog
1783            .create_table(schema)
1784            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1785        for row in &rows {
1786            self.catalog
1787                .insert(name, row)
1788                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1789        }
1790        // Determine which base tables this view depends on by parsing the query.
1791        let depends_on = self.extract_view_deps(query_text);
1792        self.view_registry
1793            .register(ViewDef {
1794                name: name.to_string(),
1795                query: query_text.to_string(),
1796                depends_on,
1797                dirty: false,
1798            })
1799            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1800        Ok(())
1801    }
1802
1803    /// Refresh a materialized view: re-execute its source query and replace
1804    /// the backing table's contents.
1805    fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1806        let def = self
1807            .view_registry
1808            .get(name)
1809            .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1810        let query_text = def.query.clone();
1811        // Execute the source query.
1812        let result = self.execute_powql(&query_text)?;
1813        let (_columns, rows) = match result {
1814            QueryResult::Rows { columns, rows } => (columns, rows),
1815            _ => return Err("view source query must be a SELECT".into()),
1816        };
1817        // Clear old data and insert fresh results. Mission B2: logged
1818        // variant — view refreshes are a mutation and crash recovery
1819        // must see them.
1820        self.catalog
1821            .scan_delete_matching_logged(name, |_| true)
1822            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1823        for row in &rows {
1824            self.catalog
1825                .insert(name, row)
1826                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1827        }
1828        self.view_registry.mark_clean(name);
1829        Ok(())
1830    }
1831
1832    /// Drop a materialized view: remove the backing table and unregister.
1833    fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1834        if !self.view_registry.is_view(name) {
1835            return Err(QueryError::ViewError(format!(
1836                "materialized view '{name}' not found"
1837            )));
1838        }
1839        self.view_registry
1840            .unregister(name)
1841            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1842        self.catalog
1843            .drop_table(name)
1844            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1845        Ok(())
1846    }
1847
1848    /// Derive a storage `Schema` for a view's backing table from query
1849    /// result column names and the first row's types.
1850    fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1851        use powdb_storage::types::{ColumnDef, TypeId};
1852        let cols: Vec<ColumnDef> = columns
1853            .iter()
1854            .enumerate()
1855            .map(|(i, col_name)| {
1856                let type_id = rows
1857                    .first()
1858                    .and_then(|row| row.get(i))
1859                    .map(|v| v.type_id())
1860                    .unwrap_or(TypeId::Str);
1861                ColumnDef {
1862                    name: col_name.clone(),
1863                    type_id,
1864                    required: false,
1865                    position: i as u16,
1866                }
1867            })
1868            .collect();
1869        Schema {
1870            table_name: name.to_string(),
1871            columns: cols,
1872        }
1873    }
1874
1875    /// Extract base table dependencies from a view's source query by
1876    /// parsing it and collecting the source table name.
1877    fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1878        use crate::parser::parse;
1879        match parse(query_text) {
1880            Ok(Statement::Query(q)) => {
1881                let mut deps = vec![q.source.clone()];
1882                for j in &q.joins {
1883                    deps.push(j.source.clone());
1884                }
1885                deps
1886            }
1887            _ => Vec::new(),
1888        }
1889    }
1890
1891    // ─── Specialized fast paths ─────────────────────────────────────────────
1892    //
1893    // These methods are helpers for the `execute_plan` match arms above.
1894    // Each returns `Ok(Some(result))` when the fast path fires, `Ok(None)`
1895    // when the shape isn't supported (caller falls back to generic code).
1896
1897    /// Aggregate sum/avg/min/max over a single fixed-size i64 column, with
1898    /// an optional compiled filter predicate. Walks raw row bytes — zero
1899    /// per-row allocation. Uses i128 accumulator for sum/avg overflow safety.
1900    pub(super) fn agg_single_col_fast(
1901        &self,
1902        table: &str,
1903        col: &str,
1904        function: AggFunc,
1905        predicate: Option<&Expr>,
1906    ) -> Result<Option<QueryResult>, QueryError> {
1907        let schema = self
1908            .catalog
1909            .schema(table)
1910            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1911            .clone();
1912        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
1913        let col_idx = match schema.column_index(col) {
1914            Some(i) => i,
1915            None => return Ok(None),
1916        };
1917        // Only fast-path fixed-size numeric columns (Int/Float) for
1918        // sum/avg/min/max/count. Mission D10: Float parity — prior version
1919        // bailed on Float columns, forcing them through the generic row-
1920        // decoding path that allocated a Vec<Value> per row and dispatched
1921        // on Value::cmp for every compare. f64 decode is structurally the
1922        // same as i64 (load 8 bytes, cast), so the fast path handles both.
1923        let col_type = schema.columns[col_idx].type_id;
1924        if col_type != TypeId::Int && col_type != TypeId::Float {
1925            return Ok(None);
1926        }
1927
1928        let fast = FastLayout::new(&schema);
1929        // Mission C Phase 20b: inline the numeric-column reader instead of
1930        // building a `Box<dyn Fn>`. Eliminates 100K vtable dispatches per
1931        // 100K-row agg scan — every reader call folds directly into the
1932        // hot loop below.
1933        let byte_offset = match fast.fixed_offsets[col_idx] {
1934            Some(o) => o,
1935            None => return Ok(None),
1936        };
1937        let bitmap_byte = col_idx / 8;
1938        let bitmap_bit = (col_idx % 8) as u32;
1939        let data_offset = 2 + fast.bitmap_size + byte_offset;
1940
1941        // Optional compiled filter.
1942        let compiled_pred: Option<CompiledPredicate> = match predicate {
1943            Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
1944                Some(c) => Some(c),
1945                None => return Ok(None), // let generic path handle it
1946            },
1947            None => None,
1948        };
1949
1950        // Mission C Phase 20b: specialize the inner loop per aggregate
1951        // function. The previous version ran a `match function { ... }`
1952        // *inside* the closure, which kept LLVM from producing optimal
1953        // scalar code for each variant (agg_max regressed ~23% vs the
1954        // baseline Box<dyn Fn> version even though per-row vtable cost
1955        // should have been strictly lower). Pushing the match out of the
1956        // hot loop lets each specialized body fold cleanly into
1957        // `for_each_row_raw` and removes a captured `AggFunc` + match
1958        // dispatch per row.
1959        //
1960        // Mission D10: same specialisation applies to the Float branch.
1961        // For Min/Max we use `f64::total_cmp` so the result matches
1962        // `Value::Ord` — this is the same ordering ORDER BY and the
1963        // top-N sort fast path use, keeping semantics consistent across
1964        // read paths (NaN compares as greatest, -0.0 < +0.0 for
1965        // deterministic tie-breaking).
1966        //
1967        // Mission D11 Phase 1: each inner loop now splits on presence of
1968        // a predicate (`if let Some(pred) = &compiled_pred`) so the hot
1969        // body never re-tests `Option` per row, and reads column bytes
1970        // via `read_i64_unchecked` / `read_f64_unchecked` helpers that
1971        // drop two bounds checks per row (null bitmap byte + value
1972        // slice). Safety is carried by the `FastLayout` invariant that
1973        // `data_offset + 8 <= row_len` for any fixed-size column; see
1974        // the helper doc comments. Hot loops are macro-generated so the
1975        // with-pred / no-pred split can't drift between variants.
1976        let result = match col_type {
1977            TypeId::Int => match function {
1978                AggFunc::Sum | AggFunc::Avg => {
1979                    let mut sum_i128: i128 = 0;
1980                    let mut count: i64 = 0;
1981                    agg_int_loop!(
1982                        self,
1983                        table,
1984                        compiled_pred,
1985                        bitmap_byte,
1986                        bitmap_bit,
1987                        data_offset,
1988                        |v: i64| {
1989                            count += 1;
1990                            sum_i128 += v as i128;
1991                        }
1992                    );
1993                    if matches!(function, AggFunc::Sum) {
1994                        let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
1995                        QueryResult::Scalar(Value::Int(clamped))
1996                    } else if count == 0 {
1997                        QueryResult::Scalar(Value::Empty)
1998                    } else {
1999                        let avg = (sum_i128 as f64) / (count as f64);
2000                        QueryResult::Scalar(Value::Float(avg))
2001                    }
2002                }
2003                AggFunc::Min => {
2004                    let mut min_v: Option<i64> = None;
2005                    agg_int_loop!(
2006                        self,
2007                        table,
2008                        compiled_pred,
2009                        bitmap_byte,
2010                        bitmap_bit,
2011                        data_offset,
2012                        |v: i64| {
2013                            min_v = Some(match min_v {
2014                                Some(m) => m.min(v),
2015                                None => v,
2016                            });
2017                        }
2018                    );
2019                    QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
2020                }
2021                AggFunc::Max => {
2022                    let mut max_v: Option<i64> = None;
2023                    agg_int_loop!(
2024                        self,
2025                        table,
2026                        compiled_pred,
2027                        bitmap_byte,
2028                        bitmap_bit,
2029                        data_offset,
2030                        |v: i64| {
2031                            max_v = Some(match max_v {
2032                                Some(m) => m.max(v),
2033                                None => v,
2034                            });
2035                        }
2036                    );
2037                    QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
2038                }
2039                AggFunc::Count => {
2040                    let mut count: i64 = 0;
2041                    agg_int_loop!(
2042                        self,
2043                        table,
2044                        compiled_pred,
2045                        bitmap_byte,
2046                        bitmap_bit,
2047                        data_offset,
2048                        |_v: i64| {
2049                            count += 1;
2050                        }
2051                    );
2052                    QueryResult::Scalar(Value::Int(count))
2053                }
2054                AggFunc::CountDistinct => {
2055                    let mut seen = rustc_hash::FxHashSet::default();
2056                    agg_int_loop!(
2057                        self,
2058                        table,
2059                        compiled_pred,
2060                        bitmap_byte,
2061                        bitmap_bit,
2062                        data_offset,
2063                        |v: i64| {
2064                            seen.insert(v);
2065                        }
2066                    );
2067                    QueryResult::Scalar(Value::Int(seen.len() as i64))
2068                }
2069            },
2070            TypeId::Float => match function {
2071                AggFunc::Sum => {
2072                    // Use a single f64 accumulator. Naive summation is
2073                    // sufficient for MVP parity; if precision becomes an
2074                    // issue on long scans we can upgrade to Kahan–Neumaier
2075                    // compensated sum (~2x scalar cost, zero error growth).
2076                    let mut sum: f64 = 0.0;
2077                    agg_float_loop!(
2078                        self,
2079                        table,
2080                        compiled_pred,
2081                        bitmap_byte,
2082                        bitmap_bit,
2083                        data_offset,
2084                        |v: f64| {
2085                            sum += v;
2086                        }
2087                    );
2088                    QueryResult::Scalar(Value::Float(sum))
2089                }
2090                AggFunc::Avg => {
2091                    let mut sum: f64 = 0.0;
2092                    let mut count: i64 = 0;
2093                    agg_float_loop!(
2094                        self,
2095                        table,
2096                        compiled_pred,
2097                        bitmap_byte,
2098                        bitmap_bit,
2099                        data_offset,
2100                        |v: f64| {
2101                            sum += v;
2102                            count += 1;
2103                        }
2104                    );
2105                    if count == 0 {
2106                        QueryResult::Scalar(Value::Empty)
2107                    } else {
2108                        QueryResult::Scalar(Value::Float(sum / count as f64))
2109                    }
2110                }
2111                AggFunc::Min => {
2112                    // `total_cmp` for deterministic NaN handling (matches
2113                    // Value::Ord). NaN compares greatest, so Min will
2114                    // correctly ignore it in favour of any finite value.
2115                    let mut min_v: Option<f64> = None;
2116                    agg_float_loop!(
2117                        self,
2118                        table,
2119                        compiled_pred,
2120                        bitmap_byte,
2121                        bitmap_bit,
2122                        data_offset,
2123                        |v: f64| {
2124                            min_v = Some(match min_v {
2125                                Some(m) => {
2126                                    if v.total_cmp(&m).is_lt() {
2127                                        v
2128                                    } else {
2129                                        m
2130                                    }
2131                                }
2132                                None => v,
2133                            });
2134                        }
2135                    );
2136                    QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
2137                }
2138                AggFunc::Max => {
2139                    let mut max_v: Option<f64> = None;
2140                    agg_float_loop!(
2141                        self,
2142                        table,
2143                        compiled_pred,
2144                        bitmap_byte,
2145                        bitmap_bit,
2146                        data_offset,
2147                        |v: f64| {
2148                            max_v = Some(match max_v {
2149                                Some(m) => {
2150                                    if v.total_cmp(&m).is_gt() {
2151                                        v
2152                                    } else {
2153                                        m
2154                                    }
2155                                }
2156                                None => v,
2157                            });
2158                        }
2159                    );
2160                    QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
2161                }
2162                AggFunc::Count => {
2163                    let mut count: i64 = 0;
2164                    agg_float_loop!(
2165                        self,
2166                        table,
2167                        compiled_pred,
2168                        bitmap_byte,
2169                        bitmap_bit,
2170                        data_offset,
2171                        |_v: f64| {
2172                            count += 1;
2173                        }
2174                    );
2175                    QueryResult::Scalar(Value::Int(count))
2176                }
2177                AggFunc::CountDistinct => {
2178                    // Hash on `f64::to_bits` — matches `Value::Hash`, so
2179                    // distinct NaN bit patterns count as distinct and
2180                    // -0.0/+0.0 count as distinct. Consistent with how
2181                    // Float values are hashed in every other DISTINCT /
2182                    // GROUP BY path.
2183                    let mut seen = rustc_hash::FxHashSet::default();
2184                    agg_float_loop!(
2185                        self,
2186                        table,
2187                        compiled_pred,
2188                        bitmap_byte,
2189                        bitmap_bit,
2190                        data_offset,
2191                        |v: f64| {
2192                            seen.insert(v.to_bits());
2193                        }
2194                    );
2195                    QueryResult::Scalar(Value::Int(seen.len() as i64))
2196                }
2197            },
2198            _ => unreachable!("type guard above restricts to Int/Float"),
2199        };
2200        Ok(Some(result))
2201    }
2202
2203    /// `Project(Limit(Filter(SeqScan)))` and `Project(Limit(SeqScan))`.
2204    /// Streams rows, decodes only projected columns, stops at the limit.
2205    pub(super) fn project_filter_limit_fast(
2206        &self,
2207        table: &str,
2208        fields: &[ProjectField],
2209        limit: usize,
2210        predicate: Option<&Expr>,
2211    ) -> Result<Option<QueryResult>, QueryError> {
2212        let schema = self
2213            .catalog
2214            .schema(table)
2215            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2216            .clone();
2217        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2218
2219        // Each projection field must be a simple `.field` reference for this
2220        // fast path. Aliased or computed fields fall through.
2221        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2222        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2223        for f in fields {
2224            let name = match &f.expr {
2225                Expr::Field(n) => n.clone(),
2226                _ => return Ok(None),
2227            };
2228            let idx = match all_columns.iter().position(|c| c == &name) {
2229                Some(i) => i,
2230                None => return Ok(None),
2231            };
2232            proj_indices.push(idx);
2233            proj_columns.push(f.alias.clone().unwrap_or(name));
2234        }
2235
2236        let fast = FastLayout::new(&schema);
2237        let row_layout = RowLayout::new(&schema);
2238
2239        let compiled_pred: Option<CompiledPredicate> = match predicate {
2240            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2241                Some(c) => Some(c),
2242                None => return Ok(None),
2243            },
2244            None => None,
2245        };
2246
2247        let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2248        // Mission D2: use try_for_each_row_raw to actually stop iterating
2249        // once the limit is reached. The previous `done` flag only short-
2250        // circuited the closure body, so a `limit 100` over 100K rows still
2251        // walked all 100K slots — burning ~30x SQLite on scan_filter_project_top100.
2252        self.catalog
2253            .try_for_each_row_raw(table, |_rid, data| {
2254                use std::ops::ControlFlow;
2255                if let Some(ref pred) = compiled_pred {
2256                    if !pred(data) {
2257                        return ControlFlow::Continue(());
2258                    }
2259                }
2260                let row: Vec<Value> = proj_indices
2261                    .iter()
2262                    .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2263                    .collect();
2264                out.push(row);
2265                if out.len() >= limit {
2266                    ControlFlow::Break(())
2267                } else {
2268                    ControlFlow::Continue(())
2269                }
2270            })
2271            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2272
2273        Ok(Some(QueryResult::Rows {
2274            columns: proj_columns,
2275            rows: out,
2276        }))
2277    }
2278
2279    /// `Project(Limit(Sort(Filter(SeqScan))))` and `Project(Limit(Sort(SeqScan)))`.
2280    /// Bounded top-N heap over the sort key. Only the sort key needs to be
2281    /// read per row; projected columns are decoded only for the final
2282    /// winning rows when the heap drains.
2283    pub(super) fn project_filter_sort_limit_fast(
2284        &self,
2285        table: &str,
2286        fields: &[ProjectField],
2287        sort_field: &str,
2288        descending: bool,
2289        limit: usize,
2290        predicate: Option<&Expr>,
2291    ) -> Result<Option<QueryResult>, QueryError> {
2292        if limit == 0 {
2293            // Degenerate case — empty result. Let the generic path handle it
2294            // for proper column naming.
2295            return Ok(None);
2296        }
2297        let schema = self
2298            .catalog
2299            .schema(table)
2300            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2301            .clone();
2302        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2303
2304        // Sort key must be a fixed-size numeric column (Int or Float).
2305        // Mission D10: extended from Int-only. Float sort keys use a
2306        // sortable-u64 transform (see `f64_to_sortable_u64`) so the heap
2307        // path stays keyed on `u64` and the whole branch shape is
2308        // identical to the Int case — no new heap types, no `total_cmp`
2309        // closures in the hot loop.
2310        let sort_idx = match schema.column_index(sort_field) {
2311            Some(i) => i,
2312            None => return Ok(None),
2313        };
2314        let sort_col_type = schema.columns[sort_idx].type_id;
2315        if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2316            return Ok(None);
2317        }
2318
2319        // Each projection field must be a simple `.field`.
2320        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2321        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2322        for f in fields {
2323            let name = match &f.expr {
2324                Expr::Field(n) => n.clone(),
2325                _ => return Ok(None),
2326            };
2327            let idx = match all_columns.iter().position(|c| c == &name) {
2328                Some(i) => i,
2329                None => return Ok(None),
2330            };
2331            proj_indices.push(idx);
2332            proj_columns.push(f.alias.clone().unwrap_or(name));
2333        }
2334
2335        let fast = FastLayout::new(&schema);
2336        let row_layout = RowLayout::new(&schema);
2337        // Mission C Phase 20b: inline numeric-column reader (no Box<dyn Fn>).
2338        let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2339            Some(o) => o,
2340            None => return Ok(None),
2341        };
2342        let sort_bitmap_byte = sort_idx / 8;
2343        let sort_bitmap_bit = (sort_idx % 8) as u32;
2344        let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2345
2346        let compiled_pred: Option<CompiledPredicate> = match predicate {
2347            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2348                Some(c) => Some(c),
2349                None => return Ok(None),
2350            },
2351            None => None,
2352        };
2353
2354        // Bounded top-N heap. For `order .x desc limit N`, we want the N
2355        // largest values — use a min-heap so the smallest is at the top and
2356        // can be popped when a better candidate arrives. For ascending, use
2357        // a max-heap. We tie-break with a monotonic `seq` counter so the
2358        // result is deterministic and stable.
2359        //
2360        // To keep this simple we maintain two typed heaps and pick by
2361        // direction.
2362        let drained: Vec<Vec<u8>> = match sort_col_type {
2363            TypeId::Int => {
2364                let mut seq: u64 = 0;
2365                let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2366                    BinaryHeap::with_capacity(limit);
2367                let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2368                    BinaryHeap::with_capacity(limit);
2369
2370                self.catalog
2371                    .for_each_row_raw(table, |_rid, data| {
2372                        if let Some(ref pred) = compiled_pred {
2373                            if !pred(data) {
2374                                return;
2375                            }
2376                        }
2377                        // Inlined int-column reader: null check + i64 decode.
2378                        if data.len() < sort_data_offset + 8 {
2379                            return;
2380                        }
2381                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2382                        if is_null {
2383                            return;
2384                        }
2385                        let key = i64::from_le_bytes(
2386                            data[sort_data_offset..sort_data_offset + 8]
2387                                .try_into()
2388                                .unwrap_or_else(|_| unreachable!()),
2389                        );
2390                        let id = seq;
2391                        seq += 1;
2392
2393                        if descending {
2394                            if heap_desc.len() < limit {
2395                                heap_desc.push(Reverse((key, id, data.to_vec())));
2396                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2397                                if key > *top_key {
2398                                    heap_desc.pop();
2399                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2400                                }
2401                            }
2402                        } else if heap_asc.len() < limit {
2403                            heap_asc.push((key, id, data.to_vec()));
2404                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2405                            if key < *top_key {
2406                                heap_asc.pop();
2407                                heap_asc.push((key, id, data.to_vec()));
2408                            }
2409                        }
2410                    })
2411                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2412
2413                let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2414                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2415                } else {
2416                    heap_asc.into_iter().collect()
2417                };
2418                if descending {
2419                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2420                } else {
2421                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2422                }
2423                drained.into_iter().map(|(_, _, d)| d).collect()
2424            }
2425            TypeId::Float => {
2426                // Novel angle: rather than introducing a `TotalF64` newtype
2427                // with `Ord via total_cmp`, transform the f64 bit pattern
2428                // into a sortable `u64` so `BinaryHeap<u64>` orders exactly
2429                // like `f64::total_cmp` would. Classic trick: flip the sign
2430                // bit on positives, flip all bits on negatives. Result:
2431                // - NaN (sign=0) stays greatest, matching total_cmp
2432                // - -0.0 sorts before +0.0, matching total_cmp
2433                // - Hot loop is branch-cheap (one compare + one xor)
2434                let mut seq: u64 = 0;
2435                let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2436                    BinaryHeap::with_capacity(limit);
2437                let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2438                    BinaryHeap::with_capacity(limit);
2439
2440                self.catalog
2441                    .for_each_row_raw(table, |_rid, data| {
2442                        if let Some(ref pred) = compiled_pred {
2443                            if !pred(data) {
2444                                return;
2445                            }
2446                        }
2447                        if data.len() < sort_data_offset + 8 {
2448                            return;
2449                        }
2450                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2451                        if is_null {
2452                            return;
2453                        }
2454                        let bits = u64::from_le_bytes(
2455                            data[sort_data_offset..sort_data_offset + 8]
2456                                .try_into()
2457                                .unwrap_or_else(|_| unreachable!()),
2458                        );
2459                        let key = f64_bits_to_sortable_u64(bits);
2460                        let id = seq;
2461                        seq += 1;
2462
2463                        if descending {
2464                            if heap_desc.len() < limit {
2465                                heap_desc.push(Reverse((key, id, data.to_vec())));
2466                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2467                                if key > *top_key {
2468                                    heap_desc.pop();
2469                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2470                                }
2471                            }
2472                        } else if heap_asc.len() < limit {
2473                            heap_asc.push((key, id, data.to_vec()));
2474                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2475                            if key < *top_key {
2476                                heap_asc.pop();
2477                                heap_asc.push((key, id, data.to_vec()));
2478                            }
2479                        }
2480                    })
2481                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2482
2483                let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2484                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2485                } else {
2486                    heap_asc.into_iter().collect()
2487                };
2488                if descending {
2489                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2490                } else {
2491                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2492                }
2493                drained.into_iter().map(|(_, _, d)| d).collect()
2494            }
2495            _ => unreachable!("type guard above restricts to Int/Float"),
2496        };
2497
2498        let rows: Vec<Vec<Value>> = drained
2499            .into_iter()
2500            .map(|data| {
2501                proj_indices
2502                    .iter()
2503                    .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2504                    .collect()
2505            })
2506            .collect();
2507
2508        Ok(Some(QueryResult::Rows {
2509            columns: proj_columns,
2510            rows,
2511        }))
2512    }
2513
2514    /// Gather the RowIds that a mutation should operate on, without
2515    /// materialising the full row set. Handles the shapes the planner emits
2516    /// for update/delete: SeqScan, IndexScan, and Filter(SeqScan). Other
2517    /// shapes fall back to `generic_rid_match`.
2518    ///
2519    /// Perf sprint: try to fuse the predicate evaluation and in-place
2520    /// byte-level mutation into a single heap walk. Returns `Some(result)`
2521    /// if the fused path fired, `None` to fall through to the generic
2522    /// two-pass code.
2523    ///
2524    /// Covers two shapes:
2525    /// 1. Fixed-width non-null literal assignments on non-indexed columns
2526    ///    → byte-patch every matched row in place (row length unchanged).
2527    /// 2. Single var-col literal assignment on a non-indexed column
2528    ///    → `patch_var_column_in_place` on every matched row (may shrink);
2529    ///    rows that can't be patched in place are collected for fallback.
2530    fn try_fused_scan_update(
2531        &mut self,
2532        table: &str,
2533        predicate: &Expr,
2534        resolved: &[(usize, Value)],
2535        changed_cols: &[usize],
2536    ) -> Option<Result<QueryResult, QueryError>> {
2537        // Build compiled predicate. Requires a schema borrow that must be
2538        // dropped before we call scan_patch_matching_logged.
2539        let compiled = {
2540            let schema = self.catalog.schema(table)?;
2541            let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2542            let fast = FastLayout::new(schema);
2543            compile_predicate(predicate, &columns, &fast, schema)?
2544        };
2545
2546        // ── Path 1: fixed-width fast patch ──────────────────────────
2547        let fixed_patches: Option<Vec<FastPatch>> = {
2548            let tbl = self.catalog.get_table(table)?;
2549            let schema = &tbl.schema;
2550            let all_fixed_nonnull = resolved
2551                .iter()
2552                .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2553            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2554            if all_fixed_nonnull && no_indexed {
2555                let layout = RowLayout::new(schema);
2556                let bitmap_size = layout.bitmap_size();
2557                Some(
2558                    resolved
2559                        .iter()
2560                        .map(|(idx, val)| {
2561                            let fixed_off = layout
2562                                .fixed_offset(*idx)
2563                                .expect("is_fixed_size already checked");
2564                            let field_off = 2 + bitmap_size + fixed_off;
2565                            let bytes: FixedBytes = match val {
2566                                Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2567                                Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2568                                Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2569                                Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2570                                Value::Uuid(v) => FixedBytes::Uuid(*v),
2571                                _ => unreachable!("all_fixed_nonnull guard"),
2572                            };
2573                            FastPatch {
2574                                field_off,
2575                                bitmap_byte_off: 2 + idx / 8,
2576                                bit_mask: 1u8 << (idx % 8),
2577                                bytes,
2578                            }
2579                        })
2580                        .collect(),
2581                )
2582            } else {
2583                None
2584            }
2585        };
2586        if let Some(patches) = fixed_patches {
2587            let result = self
2588                .catalog
2589                .scan_patch_matching_logged(table, compiled, |row| {
2590                    for p in &patches {
2591                        row[p.bitmap_byte_off] &= !p.bit_mask;
2592                        let field_bytes = p.bytes.as_slice();
2593                        row[p.field_off..p.field_off + field_bytes.len()]
2594                            .copy_from_slice(field_bytes);
2595                    }
2596                    Some(row.len() as u16)
2597                })
2598                .map_err(|e| e.to_string());
2599            match result {
2600                Ok((count, _)) => {
2601                    self.view_registry.mark_dependents_dirty(table);
2602                    return Some(Ok(QueryResult::Modified(count)));
2603                }
2604                Err(e) => return Some(Err(QueryError::Execution(e))),
2605            }
2606        }
2607
2608        // ── Path 2: single var-col shrink fast patch ────────────────
2609        let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2610            let tbl = self.catalog.get_table(table)?;
2611            let schema = &tbl.schema;
2612            let is_single = resolved.len() == 1;
2613            let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2614            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2615            if is_single && is_var && no_indexed {
2616                let (idx, val) = &resolved[0];
2617                let bytes_opt = match val {
2618                    Value::Str(s) => Some(s.as_bytes().to_vec()),
2619                    Value::Bytes(b) => Some(b.clone()),
2620                    Value::Empty => None,
2621                    _ => return None, // type mismatch, fall through
2622                };
2623                Some((*idx, bytes_opt))
2624            } else {
2625                None
2626            }
2627        };
2628        if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2629            // Build a fresh RowLayout before the mutable borrow.
2630            let layout = {
2631                let schema = self.catalog.schema(table)?;
2632                RowLayout::new(schema)
2633            };
2634            let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2635            let result = self
2636                .catalog
2637                .scan_patch_matching_logged(table, compiled, |row| {
2638                    patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2639                })
2640                .map_err(|e| e.to_string());
2641            match result {
2642                Ok((mut count, fallback_rids)) => {
2643                    // Handle rows where in-place patch failed (new > old).
2644                    for rid in fallback_rids {
2645                        let mut row = match self.catalog.get(table, rid) {
2646                            Some(r) => r,
2647                            None => continue,
2648                        };
2649                        for (idx, val) in resolved.iter() {
2650                            row[*idx] = val.clone();
2651                        }
2652                        self.catalog
2653                            .update_hinted(table, rid, &row, Some(changed_cols))
2654                            .map_err(|e| e.to_string())
2655                            .ok();
2656                        count += 1;
2657                    }
2658                    self.view_registry.mark_dependents_dirty(table);
2659                    return Some(Ok(QueryResult::Modified(count)));
2660                }
2661                Err(e) => return Some(Err(QueryError::Execution(e))),
2662            }
2663        }
2664
2665        None // no fused path applicable — fall through
2666    }
2667
2668    /// Mission C Phase 3: schema is looked up via `self.catalog.schema(table)`
2669    /// inside the branches that actually need it. Previously the caller had
2670    /// to clone the full Schema (6+ String allocs) before every mutation just
2671    /// so this function could borrow it — a cost the update/delete hot path
2672    /// did not need.
2673    fn collect_rids_for_mutation(
2674        &mut self,
2675        input: &PlanNode,
2676        table: &str,
2677    ) -> Result<Vec<RowId>, QueryError> {
2678        match input {
2679            PlanNode::SeqScan { table: t } if t == table => {
2680                // "Update/delete everything" — rare but legal.
2681                let rids: Vec<RowId> = self
2682                    .catalog
2683                    .scan(table)
2684                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2685                    .map(|(rid, _)| rid)
2686                    .collect();
2687                Ok(rids)
2688            }
2689            PlanNode::IndexScan {
2690                table: t,
2691                column,
2692                key,
2693            } if t == table => {
2694                let key_value = literal_to_value(key)?;
2695
2696                // Indexed case: single lookup, 0 or 1 rows.
2697                // Mission D7: int-specialized fast path on int-keyed indexes
2698                // (primary keys, created_at, etc.) — the common case for
2699                // `update_by_pk` / `delete where id = ?`.
2700                //
2701                // Scope the `tbl` borrow so it's released before we fall
2702                // through to the scan-based paths below (which reborrow
2703                // `self.catalog`).
2704                {
2705                    let tbl = self
2706                        .catalog
2707                        .get_table(table)
2708                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2709                    if tbl.has_index(column) {
2710                        let rids = tbl.index_lookup_all(column, &key_value);
2711                        return Ok(rids);
2712                    }
2713                }
2714
2715                // No index: the planner folds `.col = literal` to IndexScan
2716                // regardless of whether the column is actually unique. When
2717                // there's no index we must behave like Filter(SeqScan) and
2718                // return *all* matching RIDs — not just the first one.
2719                let schema = self
2720                    .catalog
2721                    .schema(table)
2722                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2723                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2724                let fast = FastLayout::new(schema);
2725                let synth = Expr::BinaryOp(
2726                    Box::new(Expr::Field(column.clone())),
2727                    BinOp::Eq,
2728                    Box::new(key.clone()),
2729                );
2730                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2731                    // Mission F: skip the first 4 Vec doublings.
2732                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2733                    self.catalog
2734                        .for_each_row_raw(table, |rid, data| {
2735                            if compiled(data) {
2736                                rids.push(rid);
2737                            }
2738                        })
2739                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2740                    return Ok(rids);
2741                }
2742
2743                // Fallback: decode each row, compare values.
2744                let col_idx =
2745                    schema
2746                        .column_index(column)
2747                        .ok_or_else(|| QueryError::ColumnNotFound {
2748                            table: String::new(),
2749                            column: column.clone(),
2750                        })?;
2751                let rids: Vec<RowId> = self
2752                    .catalog
2753                    .scan(table)
2754                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2755                    .filter_map(|(rid, row)| {
2756                        if row[col_idx] == key_value {
2757                            Some(rid)
2758                        } else {
2759                            None
2760                        }
2761                    })
2762                    .collect();
2763                Ok(rids)
2764            }
2765            PlanNode::Filter {
2766                input: inner,
2767                predicate,
2768            } => {
2769                if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2770                    if t != table {
2771                        return self.generic_rid_match(input, table);
2772                    }
2773                    let schema = self
2774                        .catalog
2775                        .schema(table)
2776                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2777                    let columns: Vec<String> =
2778                        schema.columns.iter().map(|c| c.name.clone()).collect();
2779                    let fast = FastLayout::new(schema);
2780                    let row_layout = RowLayout::new(schema);
2781
2782                    // Try compiled predicate first.
2783                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2784                        // Mission F: skip the first 4 Vec doublings.
2785                        let mut rids: Vec<RowId> = Vec::with_capacity(64);
2786                        self.catalog
2787                            .for_each_row_raw(table, |rid, data| {
2788                                if compiled(data) {
2789                                    rids.push(rid);
2790                                }
2791                            })
2792                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2793                        return Ok(rids);
2794                    }
2795
2796                    // Fallback: selective decode + eval.
2797                    let pred_cols = predicate_column_indices(predicate, &columns);
2798                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2799                    self.catalog
2800                        .for_each_row_raw(table, |rid, data| {
2801                            let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2802                            if eval_predicate(predicate, &pred_row, &columns) {
2803                                rids.push(rid);
2804                            }
2805                        })
2806                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2807                    return Ok(rids);
2808                }
2809                self.generic_rid_match(input, table)
2810            }
2811            _ => self.generic_rid_match(input, table),
2812        }
2813    }
2814
2815    /// Last-ditch generic match: execute the plan, collect matching rows,
2816    /// then find corresponding RowIds by value equality. This is the old
2817    /// O(N*M) code path; only used when the plan shape is something exotic.
2818    fn generic_rid_match(
2819        &mut self,
2820        input: &PlanNode,
2821        table: &str,
2822    ) -> Result<Vec<RowId>, QueryError> {
2823        let result = self.execute_plan(input)?;
2824        let rows = match result {
2825            QueryResult::Rows { rows, .. } => rows,
2826            _ => return Err("mutation source must be rows".into()),
2827        };
2828        let matching: Vec<RowId> = self
2829            .catalog
2830            .scan(table)
2831            .map_err(|e| QueryError::StorageError(e.to_string()))?
2832            .filter(|(_, row)| rows.iter().any(|r| r == row))
2833            .map(|(rid, _)| rid)
2834            .collect();
2835        Ok(matching)
2836    }
2837}
2838
2839pub(super) fn execute_window(
2840    result: QueryResult,
2841    windows: &[WindowDef],
2842) -> Result<QueryResult, QueryError> {
2843    let (mut columns, mut rows) = match result {
2844        QueryResult::Rows { columns, rows } => (columns, rows),
2845        _ => return Err("window function requires row input".into()),
2846    };
2847
2848    for wdef in windows {
2849        // Resolve partition/order column indices against current columns.
2850        let part_indices: Vec<usize> = wdef
2851            .partition_by
2852            .iter()
2853            .map(|name| {
2854                columns
2855                    .iter()
2856                    .position(|c| c == name)
2857                    .ok_or_else(|| format!("window partition column '{name}' not found"))
2858            })
2859            .collect::<Result<Vec<_>, _>>()?;
2860
2861        let ord_indices: Vec<(usize, bool)> = wdef
2862            .order_by
2863            .iter()
2864            .map(|sk| {
2865                columns
2866                    .iter()
2867                    .position(|c| c == &sk.field)
2868                    .map(|i| (i, sk.descending))
2869                    .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2870            })
2871            .collect::<Result<Vec<_>, _>>()?;
2872
2873        // Resolve the argument column index (for aggregate windows).
2874        let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2875            match arg {
2876                Expr::Field(name) => {
2877                    if name == "*" {
2878                        None // count(*) style — no specific column
2879                    } else {
2880                        Some(
2881                            columns
2882                                .iter()
2883                                .position(|c| c == name)
2884                                .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2885                        )
2886                    }
2887                }
2888                _ => None,
2889            }
2890        } else {
2891            None
2892        };
2893
2894        // Build a sort-index to sort rows by partition_by then order_by
2895        // without actually reordering the original Vec (we need original
2896        // order to write results back).
2897        let n = rows.len();
2898        let mut indices: Vec<usize> = (0..n).collect();
2899        indices.sort_by(|&a, &b| {
2900            // Compare partition keys first.
2901            for &pi in &part_indices {
2902                let cmp = rows[a][pi].cmp(&rows[b][pi]);
2903                if cmp != std::cmp::Ordering::Equal {
2904                    return cmp;
2905                }
2906            }
2907            // Then order keys.
2908            for &(oi, desc) in &ord_indices {
2909                let cmp = rows[a][oi].cmp(&rows[b][oi]);
2910                if cmp != std::cmp::Ordering::Equal {
2911                    return if desc { cmp.reverse() } else { cmp };
2912                }
2913            }
2914            std::cmp::Ordering::Equal
2915        });
2916
2917        // SQL window-frame semantics: with no `order` clause the frame for an
2918        // aggregate window is the ENTIRE partition, not the running prefix.
2919        // The loop below computes running values; for the no-order case we
2920        // back-fill every row of a partition with the partition's final
2921        // (i.e. complete) aggregate once its boundary is reached. Ranking
2922        // functions are untouched — row_number/rank/dense_rank are inherently
2923        // positional.
2924        let whole_partition_frame = wdef.order_by.is_empty()
2925            && matches!(
2926                wdef.function,
2927                WindowFunc::Sum
2928                    | WindowFunc::Avg
2929                    | WindowFunc::Count
2930                    | WindowFunc::Min
2931                    | WindowFunc::Max
2932            );
2933        // Original row indices of the partition currently being scanned
2934        // (only tracked when back-filling is needed).
2935        let mut partition_row_indices: Vec<usize> = Vec::new();
2936
2937        // Compute window values in sorted order, tracking partition boundaries.
2938        let mut win_values: Vec<Value> = vec![Value::Empty; n];
2939        let mut partition_start = 0usize;
2940        // Running state for aggregate windows:
2941        let mut running_count: i64 = 0;
2942        let mut running_int_sum: i64 = 0;
2943        let mut running_float_sum: f64 = 0.0;
2944        let mut running_saw_float = false;
2945        let mut running_min: Option<Value> = None;
2946        let mut running_max: Option<Value> = None;
2947        let mut rank_counter: i64 = 0;
2948        let mut dense_rank_counter: i64 = 0;
2949        let mut prev_order_key: Option<Vec<Value>> = None;
2950        let mut same_rank_count: i64 = 0;
2951
2952        for sorted_pos in 0..n {
2953            let row_idx = indices[sorted_pos];
2954
2955            // Detect partition boundary.
2956            let new_partition = if sorted_pos == 0 {
2957                true
2958            } else {
2959                let prev_row_idx = indices[sorted_pos - 1];
2960                part_indices
2961                    .iter()
2962                    .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
2963            };
2964
2965            if new_partition {
2966                // No-order aggregate frame: the partition that just ended is
2967                // complete, so its final running value IS the whole-partition
2968                // aggregate. Back-fill it onto every row of that partition.
2969                if whole_partition_frame && sorted_pos > 0 {
2970                    let final_v = win_values[indices[sorted_pos - 1]].clone();
2971                    for ri in partition_row_indices.drain(..) {
2972                        win_values[ri] = final_v.clone();
2973                    }
2974                }
2975                partition_start = sorted_pos;
2976                running_count = 0;
2977                running_int_sum = 0;
2978                running_float_sum = 0.0;
2979                running_saw_float = false;
2980                running_min = None;
2981                running_max = None;
2982                rank_counter = 0;
2983                dense_rank_counter = 0;
2984                prev_order_key = None;
2985                same_rank_count = 0;
2986            }
2987
2988            // Extract current order key for rank tracking.
2989            let current_order_key: Vec<Value> = ord_indices
2990                .iter()
2991                .map(|&(oi, _)| rows[row_idx][oi].clone())
2992                .collect();
2993            let same_as_prev = prev_order_key.as_ref() == Some(&current_order_key);
2994
2995            let value = match wdef.function {
2996                WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
2997                WindowFunc::Rank => {
2998                    if same_as_prev {
2999                        same_rank_count += 1;
3000                    } else {
3001                        rank_counter += same_rank_count + 1;
3002                        same_rank_count = 0;
3003                        if rank_counter == 0 {
3004                            rank_counter = 1;
3005                        }
3006                    }
3007                    Value::Int(rank_counter)
3008                }
3009                WindowFunc::DenseRank => {
3010                    if !same_as_prev {
3011                        dense_rank_counter += 1;
3012                    }
3013                    Value::Int(dense_rank_counter)
3014                }
3015                WindowFunc::Sum => {
3016                    if let Some(ci) = arg_col_idx {
3017                        match &rows[row_idx][ci] {
3018                            Value::Int(v) => running_int_sum += v,
3019                            Value::Float(v) => {
3020                                running_float_sum += v;
3021                                running_saw_float = true;
3022                            }
3023                            _ => {}
3024                        }
3025                    }
3026                    if running_saw_float {
3027                        Value::Float(running_float_sum + running_int_sum as f64)
3028                    } else {
3029                        Value::Int(running_int_sum)
3030                    }
3031                }
3032                WindowFunc::Avg => {
3033                    if let Some(ci) = arg_col_idx {
3034                        match &rows[row_idx][ci] {
3035                            Value::Int(v) => {
3036                                running_float_sum += *v as f64;
3037                                running_count += 1;
3038                            }
3039                            Value::Float(v) => {
3040                                running_float_sum += v;
3041                                running_count += 1;
3042                            }
3043                            _ => {}
3044                        }
3045                    }
3046                    if running_count == 0 {
3047                        Value::Empty
3048                    } else {
3049                        Value::Float(running_float_sum / running_count as f64)
3050                    }
3051                }
3052                WindowFunc::Count => {
3053                    if let Some(ci) = arg_col_idx {
3054                        if !rows[row_idx][ci].is_empty() {
3055                            running_count += 1;
3056                        }
3057                    } else {
3058                        // count(*) — count all rows
3059                        running_count += 1;
3060                    }
3061                    Value::Int(running_count)
3062                }
3063                WindowFunc::Min => {
3064                    if let Some(ci) = arg_col_idx {
3065                        let v = &rows[row_idx][ci];
3066                        if !v.is_empty() {
3067                            running_min = Some(match &running_min {
3068                                None => v.clone(),
3069                                Some(cur) => {
3070                                    if v < cur {
3071                                        v.clone()
3072                                    } else {
3073                                        cur.clone()
3074                                    }
3075                                }
3076                            });
3077                        }
3078                    }
3079                    running_min.clone().unwrap_or(Value::Empty)
3080                }
3081                WindowFunc::Max => {
3082                    if let Some(ci) = arg_col_idx {
3083                        let v = &rows[row_idx][ci];
3084                        if !v.is_empty() {
3085                            running_max = Some(match &running_max {
3086                                None => v.clone(),
3087                                Some(cur) => {
3088                                    if v > cur {
3089                                        v.clone()
3090                                    } else {
3091                                        cur.clone()
3092                                    }
3093                                }
3094                            });
3095                        }
3096                    }
3097                    running_max.clone().unwrap_or(Value::Empty)
3098                }
3099            };
3100
3101            prev_order_key = Some(current_order_key);
3102            win_values[row_idx] = value;
3103            if whole_partition_frame {
3104                partition_row_indices.push(row_idx);
3105            }
3106        }
3107
3108        // Back-fill the final partition (the loop only flushes at boundaries).
3109        if whole_partition_frame && n > 0 {
3110            let final_v = win_values[indices[n - 1]].clone();
3111            for ri in partition_row_indices.drain(..) {
3112                win_values[ri] = final_v.clone();
3113            }
3114        }
3115
3116        // Append the computed window column to each row.
3117        for (ri, row) in rows.iter_mut().enumerate() {
3118            row.push(win_values[ri].clone());
3119        }
3120        columns.push(wdef.output_name.clone());
3121    }
3122
3123    Ok(QueryResult::Rows { columns, rows })
3124}
3125
3126/// Mission E2b: compute one aggregate over a set of rows in a group.
3127pub(super) fn compute_group_aggregate(
3128    func: AggFunc,
3129    all_rows: &[Vec<Value>],
3130    row_indices: &[usize],
3131    col_idx: usize,
3132) -> Value {
3133    match func {
3134        AggFunc::Count => {
3135            if col_idx == usize::MAX {
3136                // count(*) — count all rows in the group.
3137                return Value::Int(row_indices.len() as i64);
3138            }
3139            let count = row_indices
3140                .iter()
3141                .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3142                .count();
3143            Value::Int(count as i64)
3144        }
3145        AggFunc::CountDistinct => {
3146            let mut seen = std::collections::HashSet::new();
3147            for &ri in row_indices {
3148                let v = &all_rows[ri][col_idx];
3149                if !v.is_empty() {
3150                    seen.insert(v.clone());
3151                }
3152            }
3153            Value::Int(seen.len() as i64)
3154        }
3155        AggFunc::Sum => {
3156            // Mirror the scalar Sum path: accumulate int and float
3157            // contributions separately and promote the final result to
3158            // Float if any Float row was observed. Prevents silent
3159            // drop of Float columns in GROUP BY aggregates.
3160            let mut int_sum: i64 = 0;
3161            let mut float_sum: f64 = 0.0;
3162            let mut saw_float = false;
3163            for &ri in row_indices {
3164                match &all_rows[ri][col_idx] {
3165                    Value::Int(v) => int_sum += v,
3166                    Value::Float(v) => {
3167                        float_sum += *v;
3168                        saw_float = true;
3169                    }
3170                    _ => {}
3171                }
3172            }
3173            if saw_float {
3174                Value::Float(float_sum + int_sum as f64)
3175            } else {
3176                Value::Int(int_sum)
3177            }
3178        }
3179        AggFunc::Avg => {
3180            let mut sum = 0.0f64;
3181            let mut count = 0usize;
3182            for &ri in row_indices {
3183                match &all_rows[ri][col_idx] {
3184                    Value::Int(v) => {
3185                        sum += *v as f64;
3186                        count += 1;
3187                    }
3188                    Value::Float(v) => {
3189                        sum += *v;
3190                        count += 1;
3191                    }
3192                    _ => {}
3193                }
3194            }
3195            if count == 0 {
3196                Value::Empty
3197            } else {
3198                Value::Float(sum / count as f64)
3199            }
3200        }
3201        AggFunc::Min => row_indices
3202            .iter()
3203            .map(|&ri| &all_rows[ri][col_idx])
3204            .filter(|v| !v.is_empty())
3205            .min()
3206            .cloned()
3207            .unwrap_or(Value::Empty),
3208        AggFunc::Max => row_indices
3209            .iter()
3210            .map(|&ri| &all_rows[ri][col_idx])
3211            .filter(|v| !v.is_empty())
3212            .max()
3213            .cloned()
3214            .unwrap_or(Value::Empty),
3215    }
3216}
3217
3218/// Mission E1.3: try to extract equi-join key indices from a join `on`
3219/// predicate. Returns `Some((left_col_idx, right_col_idx))` when the
3220/// predicate is exactly `L = R` (or `R = L`) and both sides resolve
3221/// cleanly — `L` to the left subtree's column list and `R` to the right
3222/// subtree's column list.
3223///
3224/// This is deliberately narrow. We only recognise the two shapes:
3225///   * `QualifiedField = QualifiedField`  (`u.id = o.user_id`)
3226///   * `Field = Field`                    (`.id = .user_id`, unqualified)
3227///
3228/// Anything else — conjunctions, constants, function calls, or predicates
3229/// that touch the same side on both halves — falls through to the
3230/// nested-loop path unchanged.
3231pub(super) fn try_extract_equi_join_keys(
3232    pred: &Expr,
3233    left_columns: &[String],
3234    right_columns: &[String],
3235) -> Option<(usize, usize)> {
3236    let (lhs, op, rhs) = match pred {
3237        Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3238        _ => return None,
3239    };
3240    if op != BinOp::Eq {
3241        return None;
3242    }
3243    // Normal orientation: lhs in left, rhs in right.
3244    if let (Some(li), Some(ri)) = (
3245        resolve_side_column(lhs, left_columns),
3246        resolve_side_column(rhs, right_columns),
3247    ) {
3248        return Some((li, ri));
3249    }
3250    // Swapped: rhs in left, lhs in right. Both sides of `=` are
3251    // commutative so this is safe.
3252    if let (Some(li), Some(ri)) = (
3253        resolve_side_column(rhs, left_columns),
3254        resolve_side_column(lhs, right_columns),
3255    ) {
3256        return Some((li, ri));
3257    }
3258    None
3259}
3260
3261fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3262    match expr {
3263        Expr::QualifiedField { qualifier, field } => {
3264            // Byte-level match so we don't allocate a fresh `format!` on
3265            // every call — this runs once per plan, so allocation would be
3266            // cheap, but the match is trivial enough to keep inline with
3267            // the eval_expr version.
3268            let q = qualifier.as_bytes();
3269            let f = field.as_bytes();
3270            columns.iter().position(|c| {
3271                let b = c.as_bytes();
3272                b.len() == q.len() + 1 + f.len()
3273                    && b[..q.len()] == *q
3274                    && b[q.len()] == b'.'
3275                    && b[q.len() + 1..] == *f
3276            })
3277        }
3278        Expr::Field(name) => columns.iter().position(|c| c == name),
3279        _ => None,
3280    }
3281}
3282
3283/// Mission E1.3: O(L + R) hash join. Builds a `FxHashMap<Value, Vec<usize>>`
3284/// over the right (inner) side's join keys, then streams the left (outer)
3285/// side and for each probe row emits every combined row whose right-side
3286/// key matches. For `JoinKind::LeftOuter`, unmatched left rows are emitted
3287/// padded with `Value::Empty` on the right side.
3288///
3289/// The right side is always the build side. That choice is forced for
3290/// LeftOuter (the left side must stream so we can detect orphans), and
3291/// for Inner it's a reasonable default — left-deep plans tend to grow the
3292/// left side with each join, so the un-joined right leaf is often the
3293/// smaller of the two at each level.
3294pub(super) fn hash_join(
3295    left_columns: Vec<String>,
3296    left_rows: Vec<Vec<Value>>,
3297    right_columns: Vec<String>,
3298    right_rows: Vec<Vec<Value>>,
3299    left_key_idx: usize,
3300    right_key_idx: usize,
3301    kind: JoinKind,
3302) -> QueryResult {
3303    use rustc_hash::FxHashMap;
3304
3305    let n_left = left_columns.len();
3306    let n_right = right_columns.len();
3307    let mut columns = Vec::with_capacity(n_left + n_right);
3308    columns.extend(left_columns);
3309    columns.extend(right_columns);
3310
3311    // Build: right_key -> list of right-row indices. Pre-size to the row
3312    // count so the map doesn't rehash mid-build.
3313    let mut build: FxHashMap<Value, Vec<usize>> =
3314        FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3315    for (i, row) in right_rows.iter().enumerate() {
3316        // Skip Empty keys on the build side — they can never match under
3317        // SQL semantics (NULL ≠ NULL) and would collapse all nullables to
3318        // one bucket.
3319        if matches!(row[right_key_idx], Value::Empty) {
3320            continue;
3321        }
3322        build.entry(row[right_key_idx].clone()).or_default().push(i);
3323    }
3324
3325    // Reasonable starting capacity — inner joins produce ≥ left_rows.len()
3326    // rows in the common 1:1 case, left-outer always emits ≥ left_rows.len().
3327    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3328
3329    for left_row in &left_rows {
3330        let key = &left_row[left_key_idx];
3331        let matched = if matches!(key, Value::Empty) {
3332            None
3333        } else {
3334            build.get(key)
3335        };
3336        match matched {
3337            Some(matches) if !matches.is_empty() => {
3338                for &ri in matches {
3339                    let right_row = &right_rows[ri];
3340                    let mut combined = Vec::with_capacity(n_left + n_right);
3341                    combined.extend_from_slice(left_row);
3342                    combined.extend_from_slice(right_row);
3343                    rows.push(combined);
3344                }
3345            }
3346            _ => {
3347                if matches!(kind, JoinKind::LeftOuter) {
3348                    let mut row = Vec::with_capacity(n_left + n_right);
3349                    row.extend_from_slice(left_row);
3350                    row.resize(n_left + n_right, Value::Empty);
3351                    rows.push(row);
3352                }
3353            }
3354        }
3355    }
3356
3357    QueryResult::Rows { columns, rows }
3358}
3359
3360/// Lower unindexed `RangeScan` nodes to `Filter(SeqScan)` so that all
3361/// downstream fast paths (count, project+limit, sort+limit, agg, update,
3362/// delete) continue to fire.
3363///
3364/// The planner emits `RangeScan` speculatively for every range inequality
3365/// (`.age > 30`) because it has no catalog access. When the column has a
3366/// B-tree index, `RangeScan` is the correct plan. When it doesn't, the
3367/// executor's `RangeScan` fallback materialises every matching row with
3368/// full `decode_row` — bypassing the compiled-predicate fast paths that
3369/// `Filter(SeqScan)` would trigger.
3370///
3371/// This pass runs once per query, before execution.
3372pub(super) fn lower_unindexed_range_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3373    match plan {
3374        PlanNode::RangeScan {
3375            table,
3376            column,
3377            start,
3378            end,
3379        } => {
3380            if let Some(tbl) = catalog.get_table(table) {
3381                // Keep RangeScan only for unique indexes — their btree
3382                // stores raw column values. Non-unique indexes store
3383                // composite keys that don't directly compare against
3384                // column values, so lower them to Filter(SeqScan).
3385                if tbl.is_index_unique(column) == Some(true) {
3386                    return plan.clone();
3387                }
3388            }
3389            let pred = synthesize_range_predicate(column, start, end);
3390            PlanNode::Filter {
3391                input: Box::new(PlanNode::SeqScan {
3392                    table: table.clone(),
3393                }),
3394                predicate: pred,
3395            }
3396        }
3397        PlanNode::Filter { input, predicate } => PlanNode::Filter {
3398            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3399            predicate: predicate.clone(),
3400        },
3401        PlanNode::Project { input, fields } => PlanNode::Project {
3402            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3403            fields: fields.clone(),
3404        },
3405        PlanNode::Sort { input, keys } => PlanNode::Sort {
3406            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3407            keys: keys.clone(),
3408        },
3409        PlanNode::Limit { input, count } => PlanNode::Limit {
3410            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3411            count: count.clone(),
3412        },
3413        PlanNode::Offset { input, count } => PlanNode::Offset {
3414            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3415            count: count.clone(),
3416        },
3417        PlanNode::Aggregate {
3418            input,
3419            function,
3420            field,
3421        } => PlanNode::Aggregate {
3422            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3423            function: *function,
3424            field: field.clone(),
3425        },
3426        PlanNode::Distinct { input } => PlanNode::Distinct {
3427            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3428        },
3429        PlanNode::GroupBy {
3430            input,
3431            keys,
3432            aggregates,
3433            having,
3434        } => PlanNode::GroupBy {
3435            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3436            keys: keys.clone(),
3437            aggregates: aggregates.clone(),
3438            having: having.clone(),
3439        },
3440        PlanNode::Update {
3441            input,
3442            table,
3443            assignments,
3444        } => PlanNode::Update {
3445            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3446            table: table.clone(),
3447            assignments: assignments.clone(),
3448        },
3449        PlanNode::Delete { input, table } => PlanNode::Delete {
3450            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3451            table: table.clone(),
3452        },
3453        PlanNode::Window { input, windows } => PlanNode::Window {
3454            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3455            windows: windows.clone(),
3456        },
3457        PlanNode::Union { left, right, all } => PlanNode::Union {
3458            left: Box::new(lower_unindexed_range_scans(catalog, left)),
3459            right: Box::new(lower_unindexed_range_scans(catalog, right)),
3460            all: *all,
3461        },
3462        PlanNode::Explain { input } => PlanNode::Explain {
3463            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3464        },
3465        PlanNode::NestedLoopJoin {
3466            left,
3467            right,
3468            on,
3469            kind,
3470        } => PlanNode::NestedLoopJoin {
3471            left: Box::new(lower_unindexed_range_scans(catalog, left)),
3472            right: Box::new(lower_unindexed_range_scans(catalog, right)),
3473            on: on.clone(),
3474            kind: *kind,
3475        },
3476        // Leaf nodes: no children to recurse into.
3477        _ => plan.clone(),
3478    }
3479}
3480
3481/// Synthesize a range predicate from RangeScan bounds for the fallback path.
3482pub(super) fn synthesize_range_predicate(
3483    column: &str,
3484    start: &Option<(Expr, bool)>,
3485    end: &Option<(Expr, bool)>,
3486) -> Expr {
3487    let lower = start.as_ref().map(|(expr, inclusive)| {
3488        let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3489        Expr::BinaryOp(
3490            Box::new(Expr::Field(column.to_string())),
3491            op,
3492            Box::new(expr.clone()),
3493        )
3494    });
3495    let upper = end.as_ref().map(|(expr, inclusive)| {
3496        let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3497        Expr::BinaryOp(
3498            Box::new(Expr::Field(column.to_string())),
3499            op,
3500            Box::new(expr.clone()),
3501        )
3502    });
3503    match (lower, upper) {
3504        (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3505        (Some(l), None) => l,
3506        (None, Some(u)) => u,
3507        (None, None) => Expr::Literal(Literal::Bool(true)),
3508    }
3509}
3510
3511/// Check if a value falls within a range (used in last-resort decoded-row eval).
3512pub(super) fn range_matches(
3513    val: &Value,
3514    start: &Option<Value>,
3515    start_inc: bool,
3516    end: &Option<Value>,
3517    end_inc: bool,
3518) -> bool {
3519    if let Some(ref s) = start {
3520        if start_inc {
3521            if val < s {
3522                return false;
3523            }
3524        } else if val <= s {
3525            return false;
3526        }
3527    }
3528    if let Some(ref e) = end {
3529        if end_inc {
3530            if val > e {
3531                return false;
3532            }
3533        } else if val >= e {
3534            return false;
3535        }
3536    }
3537    true
3538}
3539
3540/// Format a `PlanNode` tree as a human-readable, indented text
3541/// representation. Used by the `EXPLAIN` command.
3542pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3543    let indent = "  ".repeat(depth);
3544    match plan {
3545        PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3546        PlanNode::AliasScan { table, alias } => {
3547            format!("{indent}AliasScan table={table} alias={alias}")
3548        }
3549        PlanNode::IndexScan { table, column, key } => {
3550            format!("{indent}IndexScan table={table} column={column} key={key:?}")
3551        }
3552        PlanNode::RangeScan {
3553            table,
3554            column,
3555            start,
3556            end,
3557        } => {
3558            let s = match start {
3559                Some((expr, inc)) => {
3560                    let op = if *inc { ">=" } else { ">" };
3561                    format!("{op}{expr:?}")
3562                }
3563                None => "unbounded".to_string(),
3564            };
3565            let e = match end {
3566                Some((expr, inc)) => {
3567                    let op = if *inc { "<=" } else { "<" };
3568                    format!("{op}{expr:?}")
3569                }
3570                None => "unbounded".to_string(),
3571            };
3572            format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3573        }
3574        PlanNode::Filter { input, predicate } => {
3575            let child = format_plan_tree(input, depth + 1);
3576            format!("{indent}Filter predicate={predicate:?}\n{child}")
3577        }
3578        PlanNode::Project { input, fields } => {
3579            let names: Vec<String> = fields
3580                .iter()
3581                .map(|f| match &f.alias {
3582                    Some(a) => format!("{a}: {:?}", f.expr),
3583                    None => format!("{:?}", f.expr),
3584                })
3585                .collect();
3586            let child = format_plan_tree(input, depth + 1);
3587            format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3588        }
3589        PlanNode::Sort { input, keys } => {
3590            let ks: Vec<String> = keys
3591                .iter()
3592                .map(|k| {
3593                    if k.descending {
3594                        format!("{} desc", k.field)
3595                    } else {
3596                        k.field.clone()
3597                    }
3598                })
3599                .collect();
3600            let child = format_plan_tree(input, depth + 1);
3601            format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3602        }
3603        PlanNode::Limit { input, count } => {
3604            let child = format_plan_tree(input, depth + 1);
3605            format!("{indent}Limit count={count:?}\n{child}")
3606        }
3607        PlanNode::Offset { input, count } => {
3608            let child = format_plan_tree(input, depth + 1);
3609            format!("{indent}Offset count={count:?}\n{child}")
3610        }
3611        PlanNode::Aggregate {
3612            input,
3613            function,
3614            field,
3615        } => {
3616            let f = field.as_deref().unwrap_or("*");
3617            let child = format_plan_tree(input, depth + 1);
3618            format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3619        }
3620        PlanNode::NestedLoopJoin {
3621            left,
3622            right,
3623            on,
3624            kind,
3625        } => {
3626            let left_child = format_plan_tree(left, depth + 1);
3627            let right_child = format_plan_tree(right, depth + 1);
3628            let on_str = match on {
3629                Some(pred) => format!("{pred:?}"),
3630                None => "none".to_string(),
3631            };
3632            format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3633        }
3634        PlanNode::Distinct { input } => {
3635            let child = format_plan_tree(input, depth + 1);
3636            format!("{indent}Distinct\n{child}")
3637        }
3638        PlanNode::GroupBy {
3639            input,
3640            keys,
3641            aggregates,
3642            having,
3643        } => {
3644            let agg_strs: Vec<String> = aggregates
3645                .iter()
3646                .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3647                .collect();
3648            let having_str = match having {
3649                Some(h) => format!(" having={h:?}"),
3650                None => String::new(),
3651            };
3652            let child = format_plan_tree(input, depth + 1);
3653            format!(
3654                "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3655                keys.join(", "),
3656                agg_strs.join(", "),
3657            )
3658        }
3659        PlanNode::Insert { table, rows } => {
3660            let cols: Vec<&str> = rows
3661                .first()
3662                .map(|r| r.iter().map(|a| a.field.as_str()).collect())
3663                .unwrap_or_default();
3664            format!(
3665                "{indent}Insert table={table} rows={} cols=[{}]",
3666                rows.len(),
3667                cols.join(", ")
3668            )
3669        }
3670        PlanNode::Upsert {
3671            table,
3672            key_column,
3673            assignments,
3674            on_conflict,
3675        } => {
3676            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3677            let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3678            if conflict_cols.is_empty() {
3679                format!(
3680                    "{indent}Upsert table={table} key={key_column} cols=[{}]",
3681                    cols.join(", ")
3682                )
3683            } else {
3684                format!(
3685                    "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3686                    cols.join(", "),
3687                    conflict_cols.join(", ")
3688                )
3689            }
3690        }
3691        PlanNode::Update {
3692            input,
3693            table,
3694            assignments,
3695        } => {
3696            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3697            let child = format_plan_tree(input, depth + 1);
3698            format!(
3699                "{indent}Update table={table} set=[{}]\n{child}",
3700                cols.join(", ")
3701            )
3702        }
3703        PlanNode::Delete { input, table } => {
3704            let child = format_plan_tree(input, depth + 1);
3705            format!("{indent}Delete table={table}\n{child}")
3706        }
3707        PlanNode::CreateTable { name, fields } => {
3708            let fs: Vec<String> = fields
3709                .iter()
3710                .map(|(n, t, r)| {
3711                    if *r {
3712                        format!("{n}: {t} required")
3713                    } else {
3714                        format!("{n}: {t}")
3715                    }
3716                })
3717                .collect();
3718            format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3719        }
3720        PlanNode::AlterTable { table, action } => {
3721            format!("{indent}AlterTable table={table} action={action:?}")
3722        }
3723        PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3724        PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3725        PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3726        PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3727        PlanNode::Window { input, windows } => {
3728            let ws: Vec<String> = windows
3729                .iter()
3730                .map(|w| format!("{:?} as {}", w.function, w.output_name))
3731                .collect();
3732            let child = format_plan_tree(input, depth + 1);
3733            format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3734        }
3735        PlanNode::Union { left, right, all } => {
3736            let kind = if *all { "UNION ALL" } else { "UNION" };
3737            let left_child = format_plan_tree(left, depth + 1);
3738            let right_child = format_plan_tree(right, depth + 1);
3739            format!("{indent}{kind}\n{left_child}\n{right_child}")
3740        }
3741        PlanNode::Explain { input } => {
3742            let child = format_plan_tree(input, depth + 1);
3743            format!("{indent}Explain\n{child}")
3744        }
3745        PlanNode::Begin => format!("{indent}Begin"),
3746        PlanNode::Commit => format!("{indent}Commit"),
3747        PlanNode::Rollback => format!("{indent}Rollback"),
3748    }
3749}