Skip to main content

powdb_query/executor/
plan_exec.rs

1//! The execute_plan method and associated helpers.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::{ViewDef, ViewRegistry};
16
17impl Engine {
18    pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19        match plan {
20            PlanNode::SeqScan { table } => {
21                // Auto-refresh dirty materialized views on read.
22                if self.view_registry.is_dirty(table) {
23                    self.refresh_view(table)?;
24                }
25                let schema = self
26                    .catalog
27                    .schema(table)
28                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29                    .clone();
30                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31                let rows: Vec<Vec<Value>> = self
32                    .catalog
33                    .scan(table)
34                    .map_err(|e| QueryError::StorageError(e.to_string()))?
35                    .map(|(_, row)| row)
36                    .collect();
37                Ok(QueryResult::Rows { columns, rows })
38            }
39
40            PlanNode::Filter { input, predicate } => {
41                // Materialize any IN-subqueries in the predicate before the
42                // scan loop — the closure can't call back into the engine.
43                // Correlated subqueries are left in place for per-row eval.
44                let materialized;
45                let predicate = if contains_subquery(predicate) {
46                    materialized = self.materialize_subqueries(predicate)?;
47                    &materialized
48                } else {
49                    predicate
50                };
51
52                // Correlated subquery path: per-row materialisation.
53                if contains_subquery(predicate) {
54                    let result = self.execute_plan(input)?;
55                    return match result {
56                        QueryResult::Rows { columns, rows } => {
57                            let mut filtered = Vec::new();
58                            for row in rows {
59                                let row_pred =
60                                    self.materialize_correlated_for_row(predicate, &row, &columns)?;
61                                if eval_predicate(&row_pred, &row, &columns) {
62                                    filtered.push(row);
63                                }
64                            }
65                            Ok(QueryResult::Rows {
66                                columns,
67                                rows: filtered,
68                            })
69                        }
70                        _ => Err("filter requires row input".into()),
71                    };
72                }
73
74                // Fast path: fuse Filter + SeqScan into a zero-copy streaming
75                // loop. Uses decode_column() to evaluate the predicate on only
76                // the columns it references, avoiding heap allocations for
77                // String/Bytes columns that aren't part of the filter.
78                if let PlanNode::SeqScan { table } = input.as_ref() {
79                    // Auto-refresh dirty materialized views.
80                    if self.view_registry.is_dirty(table) {
81                        self.refresh_view(table)?;
82                    }
83                    let schema = self
84                        .catalog
85                        .schema(table)
86                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
87                        .clone();
88                    let columns: Vec<String> =
89                        schema.columns.iter().map(|c| c.name.clone()).collect();
90                    let fast = FastLayout::new(&schema);
91                    let row_layout = RowLayout::new(&schema);
92                    // Mission F: pre-size to skip the first 4 Vec doublings
93                    // (4 → 8 → 16 → 32 → 64). On a 100K-row scan with 30%
94                    // selectivity that's ~4 fewer reallocations + memcpys.
95                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
96
97                    // Try compiled predicate for the filter check (handles
98                    // int leaves, string-eq leaves, and And conjunctions).
99                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
100                        self.catalog
101                            .for_each_row_raw(table, |_rid, data| {
102                                if compiled(data) {
103                                    rows.push(decode_row(&schema, data));
104                                }
105                            })
106                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
107                    } else {
108                        let pred_cols = predicate_column_indices(predicate, &columns);
109                        self.catalog
110                            .for_each_row_raw(table, |_rid, data| {
111                                let pred_row =
112                                    decode_selective(&schema, &row_layout, data, &pred_cols);
113                                if eval_predicate(predicate, &pred_row, &columns) {
114                                    rows.push(decode_row(&schema, data));
115                                }
116                            })
117                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
118                    }
119
120                    return Ok(QueryResult::Rows { columns, rows });
121                }
122
123                // General path: materialise then filter.
124                let result = self.execute_plan(input)?;
125                match result {
126                    QueryResult::Rows { columns, rows } => {
127                        let filtered: Vec<Vec<Value>> = rows
128                            .into_iter()
129                            .filter(|row| eval_predicate(predicate, row, &columns))
130                            .collect();
131                        Ok(QueryResult::Rows {
132                            columns,
133                            rows: filtered,
134                        })
135                    }
136                    _ => Err("filter requires row input".into()),
137                }
138            }
139
140            PlanNode::Project { input, fields } => {
141                // Fast path: Project over IndexScan — decode only projected
142                // columns from raw bytes instead of full decode_row.
143                if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144                    let schema = self
145                        .catalog
146                        .schema(table)
147                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148                        .clone();
149                    let all_columns: Vec<String> =
150                        schema.columns.iter().map(|c| c.name.clone()).collect();
151                    let key_value = literal_to_value(key)?;
152                    let tbl = self
153                        .catalog
154                        .get_table(table)
155                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157                    let proj_columns: Vec<String> = fields
158                        .iter()
159                        .map(|f| {
160                            f.alias.clone().unwrap_or_else(|| match &f.expr {
161                                Expr::Field(name) => name.clone(),
162                                _ => "?".into(),
163                            })
164                        })
165                        .collect();
166
167                    // Determine which column indices the projection needs
168                    let proj_indices: Vec<usize> = fields
169                        .iter()
170                        .filter_map(|f| {
171                            if let Expr::Field(name) = &f.expr {
172                                all_columns.iter().position(|c| c == name)
173                            } else {
174                                None
175                            }
176                        })
177                        .collect();
178
179                    if tbl.has_index(column) {
180                        let layout = RowLayout::new(&schema);
181                        let rids = tbl.index_lookup_all(column, &key_value);
182                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
183                        for rid in rids {
184                            if let Some(data) = tbl.heap.get(rid) {
185                                let row: Vec<Value> = proj_indices
186                                    .iter()
187                                    .map(|&ci| decode_column(&schema, &layout, &data, ci))
188                                    .collect();
189                                rows.push(row);
190                            }
191                        }
192                        return Ok(QueryResult::Rows {
193                            columns: proj_columns,
194                            rows,
195                        });
196                    }
197                }
198
199                // Fast path: Project(Limit(Sort(Filter(SeqScan)))) — bounded
200                // top-N heap. Decodes only the sort key + projected columns,
201                // keeps at most `limit` rows in a heap. Also handles the
202                // Project(Limit(Sort(SeqScan))) variant (no filter).
203                if let PlanNode::Limit {
204                    input: inner,
205                    count: limit_expr,
206                } = input.as_ref()
207                {
208                    if let PlanNode::Sort {
209                        input: sort_input,
210                        keys,
211                    } = inner.as_ref()
212                    {
213                        // Fast path only for single-key sorts
214                        if keys.len() == 1 {
215                            let sort_field = &keys[0].field;
216                            let descending = keys[0].descending;
217                            let limit = match limit_expr {
218                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
219                                _ => usize::MAX,
220                            };
221                            let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
222                                match sort_input.as_ref() {
223                                    PlanNode::SeqScan { table } => (Some(table.as_str()), None),
224                                    PlanNode::Filter {
225                                        input: fi,
226                                        predicate,
227                                    } => {
228                                        if let PlanNode::SeqScan { table } = fi.as_ref() {
229                                            (Some(table.as_str()), Some(predicate))
230                                        } else {
231                                            (None, None)
232                                        }
233                                    }
234                                    _ => (None, None),
235                                };
236                            if let Some(table) = table_opt {
237                                if let Some(result) = self.project_filter_sort_limit_fast(
238                                    table, fields, sort_field, descending, limit, pred_opt,
239                                )? {
240                                    return Ok(result);
241                                }
242                            }
243                        }
244                    }
245                    // Fast path: Project(Limit(Filter(SeqScan))) — stream,
246                    // decode only projected columns, stop at limit.
247                    if let PlanNode::Filter {
248                        input: fi,
249                        predicate,
250                    } = inner.as_ref()
251                    {
252                        if let PlanNode::SeqScan { table } = fi.as_ref() {
253                            let limit = match limit_expr {
254                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
255                                _ => usize::MAX,
256                            };
257                            if let Some(result) = self.project_filter_limit_fast(
258                                table,
259                                fields,
260                                limit,
261                                Some(predicate),
262                            )? {
263                                return Ok(result);
264                            }
265                        }
266                    }
267                    // Fast path: Project(Limit(SeqScan)) — stream, no filter.
268                    if let PlanNode::SeqScan { table } = inner.as_ref() {
269                        let limit = match limit_expr {
270                            Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
271                            _ => usize::MAX,
272                        };
273                        if let Some(result) =
274                            self.project_filter_limit_fast(table, fields, limit, None)?
275                        {
276                            return Ok(result);
277                        }
278                    }
279                }
280
281                // Mission D4: Project(Filter(SeqScan)) without Limit. Reuses
282                // `project_filter_limit_fast` with limit = usize::MAX so the
283                // hot loop decodes only projected columns and uses the
284                // compiled predicate. Previously this fell through to the
285                // generic Filter branch which materialised every column via
286                // `decode_row` then re-projected — quadratic work.
287                //
288                // multi_col_and_filter (`U filter .age > 30 and .status =
289                // "active" { .name, .age }`) was 6.18ms (0.7x SQLite) and
290                // is the load-bearing workload for this fast path.
291                if let PlanNode::Filter {
292                    input: fi,
293                    predicate,
294                } = input.as_ref()
295                {
296                    if let PlanNode::SeqScan { table } = fi.as_ref() {
297                        if let Some(result) = self.project_filter_limit_fast(
298                            table,
299                            fields,
300                            usize::MAX,
301                            Some(predicate),
302                        )? {
303                            return Ok(result);
304                        }
305                    }
306                }
307
308                // Mission D4: Project(SeqScan) without Filter or Limit.
309                // Decode only projected columns; the previous fall-through
310                // built full Vec<Value> rows then re-projected.
311                if let PlanNode::SeqScan { table } = input.as_ref() {
312                    if let Some(result) =
313                        self.project_filter_limit_fast(table, fields, usize::MAX, None)?
314                    {
315                        return Ok(result);
316                    }
317                }
318
319                let result = self.execute_plan(input)?;
320                match result {
321                    QueryResult::Rows { columns, rows } => {
322                        let proj_columns: Vec<String> = fields
323                            .iter()
324                            .map(|f| {
325                                f.alias.clone().unwrap_or_else(|| match &f.expr {
326                                    Expr::Field(name) => name.clone(),
327                                    // Mission E1.2: `{ u.name }` projects as the
328                                    // qualified column name so callers can still
329                                    // disambiguate across the join output.
330                                    Expr::QualifiedField { qualifier, field } => {
331                                        format!("{qualifier}.{field}")
332                                    }
333                                    _ => "?".into(),
334                                })
335                            })
336                            .collect();
337                        let proj_rows: Vec<Vec<Value>> = rows
338                            .iter()
339                            .map(|row| {
340                                fields
341                                    .iter()
342                                    .map(|f| eval_expr(&f.expr, row, &columns))
343                                    .collect()
344                            })
345                            .collect();
346                        Ok(QueryResult::Rows {
347                            columns: proj_columns,
348                            rows: proj_rows,
349                        })
350                    }
351                    _ => Err("project requires row input".into()),
352                }
353            }
354
355            PlanNode::Sort { input, keys } => {
356                let result = self.execute_plan(input)?;
357                match result {
358                    QueryResult::Rows { columns, mut rows } => {
359                        // WS2: row-count cap is a cheap secondary guard; the
360                        // byte budget is the real OOM defense for the sort
361                        // buffer (a few very large rows pass the row cap).
362                        if rows.len() > MAX_SORT_ROWS {
363                            return Err(QueryError::SortLimitExceeded);
364                        }
365                        self.charge_rows(&rows)?;
366                        let key_indices: Vec<(usize, bool)> = keys
367                            .iter()
368                            .map(|k| {
369                                columns
370                                    .iter()
371                                    .position(|c| c == &k.field)
372                                    .map(|idx| (idx, k.descending))
373                                    .ok_or_else(|| QueryError::ColumnNotFound {
374                                        table: String::new(),
375                                        column: k.field.clone(),
376                                    })
377                            })
378                            .collect::<Result<_, QueryError>>()?;
379                        rows.sort_by(|a, b| {
380                            for &(col_idx, descending) in &key_indices {
381                                let cmp = a[col_idx].cmp(&b[col_idx]);
382                                let cmp = if descending { cmp.reverse() } else { cmp };
383                                if cmp != std::cmp::Ordering::Equal {
384                                    return cmp;
385                                }
386                            }
387                            std::cmp::Ordering::Equal
388                        });
389                        Ok(QueryResult::Rows { columns, rows })
390                    }
391                    _ => Err("sort requires row input".into()),
392                }
393            }
394
395            PlanNode::Limit { input, count } => {
396                let result = self.execute_plan(input)?;
397                let n = match count {
398                    Expr::Literal(Literal::Int(v)) => *v as usize,
399                    _ => return Err("limit must be integer literal".into()),
400                };
401                match result {
402                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
403                        columns,
404                        rows: rows.into_iter().take(n).collect(),
405                    }),
406                    _ => Err("limit requires row input".into()),
407                }
408            }
409
410            PlanNode::Offset { input, count } => {
411                let result = self.execute_plan(input)?;
412                let n = match count {
413                    Expr::Literal(Literal::Int(v)) => *v as usize,
414                    _ => return Err("offset must be integer literal".into()),
415                };
416                match result {
417                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
418                        columns,
419                        rows: rows.into_iter().skip(n).collect(),
420                    }),
421                    _ => Err("offset requires row input".into()),
422                }
423            }
424
425            PlanNode::Aggregate {
426                input,
427                function,
428                field,
429            } => {
430                // Fast path: count() over SeqScan — count rows without any decode
431                if *function == AggFunc::Count {
432                    if let PlanNode::SeqScan { table } = input.as_ref() {
433                        // Auto-refresh a dirty materialized view before
434                        // counting it — otherwise count(View) returns stale
435                        // data after an underlying mutation (F3).
436                        if self.view_registry.is_dirty(table) {
437                            self.refresh_view(table)?;
438                        }
439                        let mut count: i64 = 0;
440                        self.catalog
441                            .for_each_row_raw(table, |_rid, _data| {
442                                count += 1;
443                            })
444                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
445                        return Ok(QueryResult::Scalar(Value::Int(count)));
446                    }
447                    // Fast path: count() over Filter(SeqScan) — try compiled
448                    // predicate first, fall back to decode_column path.
449                    // Skip a predicate carrying a subquery: the raw-bytes
450                    // evaluators here don't materialise subqueries, so
451                    // `count(T filter .x in (...))` would silently count 0
452                    // (F1). Falling through routes it to the generic path
453                    // that resolves the subquery correctly.
454                    if let PlanNode::Filter {
455                        input: inner,
456                        predicate,
457                    } = input.as_ref()
458                    {
459                        if let PlanNode::SeqScan { table } = inner.as_ref() {
460                            if self.view_registry.is_dirty(table) {
461                                self.refresh_view(table)?;
462                            }
463                        }
464                        if let (PlanNode::SeqScan { table }, false) =
465                            (inner.as_ref(), contains_subquery(predicate))
466                        {
467                            let schema = self
468                                .catalog
469                                .schema(table)
470                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
471                                .clone();
472                            let columns: Vec<String> =
473                                schema.columns.iter().map(|c| c.name.clone()).collect();
474                            let fast = FastLayout::new(&schema);
475                            let row_layout = RowLayout::new(&schema);
476
477                            // Try compiled predicate (zero-allocation hot path).
478                            // Handles int leaves, string-eq leaves, AND conjunctions.
479                            if let Some(compiled) =
480                                compile_predicate(predicate, &columns, &fast, &schema)
481                            {
482                                let mut count: i64 = 0;
483                                self.catalog
484                                    .for_each_row_raw(table, |_rid, data| {
485                                        if compiled(data) {
486                                            count += 1;
487                                        }
488                                    })
489                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
490                                return Ok(QueryResult::Scalar(Value::Int(count)));
491                            }
492
493                            // Fallback: decode predicate columns
494                            let pred_cols = predicate_column_indices(predicate, &columns);
495                            let mut count: i64 = 0;
496                            self.catalog
497                                .for_each_row_raw(table, |_rid, data| {
498                                    let pred_row =
499                                        decode_selective(&schema, &row_layout, data, &pred_cols);
500                                    if eval_predicate(predicate, &pred_row, &columns) {
501                                        count += 1;
502                                    }
503                                })
504                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
505
506                            return Ok(QueryResult::Scalar(Value::Int(count)));
507                        }
508                    }
509                }
510
511                // Fast path: sum/avg/min/max over a single fixed-size int
512                // column with an optional compiled filter predicate. Walks
513                // raw row bytes, zero allocation per row.
514                if matches!(
515                    function,
516                    AggFunc::Sum
517                        | AggFunc::Avg
518                        | AggFunc::Min
519                        | AggFunc::Max
520                        | AggFunc::CountDistinct
521                ) {
522                    if let Some(col) = field.as_ref() {
523                        // Shape: Aggregate(SeqScan) or Aggregate(Filter(SeqScan))
524                        let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
525                            match input.as_ref() {
526                                PlanNode::SeqScan { table } => (Some(table.as_str()), None),
527                                PlanNode::Filter {
528                                    input: inner,
529                                    predicate,
530                                } => {
531                                    if let PlanNode::SeqScan { table } = inner.as_ref() {
532                                        (Some(table.as_str()), Some(predicate))
533                                    } else {
534                                        (None, None)
535                                    }
536                                }
537                                _ => (None, None),
538                            };
539                        if let Some(table) = table_opt {
540                            if let Some(result) =
541                                self.agg_single_col_fast(table, col, *function, pred_opt)?
542                            {
543                                return Ok(result);
544                            }
545                        }
546                    }
547                }
548
549                // Fast path: Project(Limit(Filter(SeqScan))) — stream, decode
550                // only projected columns, stop once we hit the limit.
551                // (Handled in the Project branch; this branch only fires when
552                // the aggregate is the outer node.)
553                let result = self.execute_plan(input)?;
554                match result {
555                    QueryResult::Rows { columns, rows } => {
556                        match function {
557                            AggFunc::Count => {
558                                Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
559                            }
560                            AggFunc::CountDistinct => {
561                                let col = field.as_ref().ok_or("count distinct requires field")?;
562                                let idx = columns
563                                    .iter()
564                                    .position(|c| c == col)
565                                    .ok_or("col not found")?;
566                                let mut seen = std::collections::HashSet::new();
567                                for row in &rows {
568                                    let v = &row[idx];
569                                    if !v.is_empty() {
570                                        seen.insert(v.clone());
571                                    }
572                                }
573                                Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
574                            }
575                            AggFunc::Avg => {
576                                let col = field.as_ref().ok_or("avg requires field")?;
577                                let idx = columns
578                                    .iter()
579                                    .position(|c| c == col)
580                                    .ok_or("col not found")?;
581                                let sum: f64 = rows
582                                    .iter()
583                                    .filter_map(|r| match &r[idx] {
584                                        Value::Int(v) => Some(*v as f64),
585                                        Value::Float(v) => Some(*v),
586                                        _ => None,
587                                    })
588                                    .sum();
589                                let count = rows.len() as f64;
590                                Ok(QueryResult::Scalar(Value::Float(sum / count)))
591                            }
592                            AggFunc::Sum => {
593                                let col = field.as_ref().ok_or("sum requires field")?;
594                                let idx = columns
595                                    .iter()
596                                    .position(|c| c == col)
597                                    .ok_or("col not found")?;
598                                // Track int and float contributions separately so
599                                // Float columns (and mixed Int/Float rows) don't get
600                                // silently dropped as they did in the Int-only
601                                // version. If any Float is present, the whole sum
602                                // promotes to Float — matching Avg's semantics.
603                                let mut int_sum: i64 = 0;
604                                let mut float_sum: f64 = 0.0;
605                                let mut saw_float = false;
606                                for r in &rows {
607                                    match &r[idx] {
608                                        Value::Int(v) => int_sum += *v,
609                                        Value::Float(v) => {
610                                            float_sum += *v;
611                                            saw_float = true;
612                                        }
613                                        _ => {}
614                                    }
615                                }
616                                let result = if saw_float {
617                                    Value::Float(float_sum + int_sum as f64)
618                                } else {
619                                    Value::Int(int_sum)
620                                };
621                                Ok(QueryResult::Scalar(result))
622                            }
623                            AggFunc::Min | AggFunc::Max => {
624                                let col = field.as_ref().ok_or("min/max requires field")?;
625                                let idx = columns
626                                    .iter()
627                                    .position(|c| c == col)
628                                    .ok_or("col not found")?;
629                                let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
630                                let result = if *function == AggFunc::Min {
631                                    vals.into_iter().min().cloned()
632                                } else {
633                                    vals.into_iter().max().cloned()
634                                };
635                                Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
636                            }
637                        }
638                    }
639                    _ => Err("aggregate requires row input".into()),
640                }
641            }
642
643            PlanNode::Insert { table, assignments } => {
644                let values = {
645                    let schema = self
646                        .catalog
647                        .schema(table)
648                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
649                    let mut values = vec![Value::Empty; schema.columns.len()];
650                    for a in assignments {
651                        let idx = schema.column_index(&a.field).ok_or_else(|| {
652                            QueryError::ColumnNotFound {
653                                table: String::new(),
654                                column: a.field.clone(),
655                            }
656                        })?;
657                        let raw = literal_to_value(&a.value)?;
658                        values[idx] = coerce_value(raw, &schema.columns[idx])?;
659                    }
660                    for col in &schema.columns {
661                        if col.required && matches!(values[col.position as usize], Value::Empty) {
662                            return Err(QueryError::Execution(format!(
663                                "column '{}' is required but no value was provided",
664                                col.name
665                            )));
666                        }
667                    }
668                    values
669                };
670                self.catalog
671                    .insert(table, &values)
672                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
673                self.view_registry.mark_dependents_dirty(table);
674                Ok(QueryResult::Modified(1))
675            }
676
677            PlanNode::Upsert {
678                table,
679                key_column,
680                assignments,
681                on_conflict,
682            } => {
683                let (values, key_idx) = {
684                    let schema = self
685                        .catalog
686                        .schema(table)
687                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
688                    let mut values = vec![Value::Empty; schema.columns.len()];
689                    for a in assignments {
690                        let idx = schema.column_index(&a.field).ok_or_else(|| {
691                            QueryError::ColumnNotFound {
692                                table: String::new(),
693                                column: a.field.clone(),
694                            }
695                        })?;
696                        let raw = literal_to_value(&a.value)?;
697                        values[idx] = coerce_value(raw, &schema.columns[idx])?;
698                    }
699                    for col in &schema.columns {
700                        if col.required && matches!(values[col.position as usize], Value::Empty) {
701                            return Err(QueryError::Execution(format!(
702                                "column '{}' is required but no value was provided",
703                                col.name
704                            )));
705                        }
706                    }
707                    let key_idx = schema
708                        .column_index(key_column)
709                        .ok_or_else(|| format!("key column '{key_column}' not found"))?;
710                    (values, key_idx)
711                };
712
713                let key_value = values[key_idx].clone();
714
715                // Probe the index for a conflict.
716                let existing = {
717                    let tbl = self
718                        .catalog
719                        .get_table(table)
720                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
721                    if tbl.has_index(key_column) {
722                        // Upsert key lookup: return the first match.
723                        // For unique indexes this is the only match.
724                        // For non-unique indexes on a key column, also
725                        // just the first (upsert semantics).
726                        let rids = tbl.index_lookup_all(key_column, &key_value);
727                        rids.into_iter().next().and_then(|rid| {
728                            tbl.heap
729                                .get(rid)
730                                .map(|data| (rid, decode_row(&tbl.schema, &data)))
731                        })
732                    } else {
733                        // No index — linear scan for the key.
734                        let mut found = None;
735                        for (rid, row) in tbl.scan() {
736                            if row[key_idx] == key_value {
737                                found = Some((rid, row));
738                                break;
739                            }
740                        }
741                        found
742                    }
743                };
744
745                if let Some((rid, mut existing_row)) = existing {
746                    // Conflict: apply on_conflict assignments (or all non-key if empty).
747                    let update_assignments = if on_conflict.is_empty() {
748                        assignments
749                    } else {
750                        on_conflict
751                    };
752                    let changed_cols: Vec<usize> = {
753                        let schema = self
754                            .catalog
755                            .schema(table)
756                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
757                        let mut indices = Vec::new();
758                        for a in update_assignments {
759                            let idx = schema.column_index(&a.field).ok_or_else(|| {
760                                QueryError::ColumnNotFound {
761                                    table: String::new(),
762                                    column: a.field.clone(),
763                                }
764                            })?;
765                            if idx != key_idx {
766                                existing_row[idx] = literal_to_value(&a.value)?;
767                                indices.push(idx);
768                            }
769                        }
770                        indices
771                    };
772                    self.catalog
773                        .update_hinted(table, rid, &existing_row, Some(&changed_cols))
774                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
775                    self.view_registry.mark_dependents_dirty(table);
776                    Ok(QueryResult::Modified(1))
777                } else {
778                    // No conflict: insert.
779                    self.catalog
780                        .insert(table, &values)
781                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
782                    self.view_registry.mark_dependents_dirty(table);
783                    Ok(QueryResult::Modified(1))
784                }
785            }
786
787            PlanNode::Update {
788                input,
789                table,
790                assignments,
791            } => {
792                // Mission C Phase 3: resolve assignments against a borrowed
793                // schema, then drop the borrow before the mutation loop.
794                // Try literal-only path first; fall back to per-row expression
795                // evaluation if any assignment contains a non-literal expression
796                // (e.g., `age := .age + 1`).
797                let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
798                    let schema_ref = self
799                        .catalog
800                        .schema(table)
801                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
802                    let indices: Vec<usize> = assignments
803                        .iter()
804                        .map(|a| {
805                            schema_ref.column_index(&a.field).ok_or_else(|| {
806                                QueryError::ColumnNotFound {
807                                    table: String::new(),
808                                    column: a.field.clone(),
809                                }
810                            })
811                        })
812                        .collect::<Result<_, _>>()?;
813                    let vals: Result<Vec<Value>, _> = assignments
814                        .iter()
815                        .map(|a| literal_to_value(&a.value))
816                        .collect();
817                    (indices, vals.ok())
818                };
819                let resolved_assignments: Option<Vec<(usize, Value)>> =
820                    literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
821
822                // Mission C Phase 2: the hint Table::update_hinted needs to
823                // decide whether to read the old row for index diff.
824                let changed_cols: Vec<usize> = col_indices.clone();
825
826                // ── Fused scan+update for Update(Filter(SeqScan)) ────────
827                // Perf sprint: instead of the two-pass collect-RIDs-then-loop
828                // pattern (which pays one ensure_hot per matched row on the
829                // second pass), fuse the predicate evaluation and in-place
830                // byte-level mutation into a single heap walk. Same idea as
831                // the fused scan_delete_matching path for deletes.
832                if let Some(ref resolved_assignments) = resolved_assignments {
833                    if let PlanNode::Filter {
834                        input: inner,
835                        predicate,
836                    } = input.as_ref()
837                    {
838                        if let PlanNode::SeqScan { table: t } = inner.as_ref() {
839                            if t == table {
840                                let fused_result = self.try_fused_scan_update(
841                                    table,
842                                    predicate,
843                                    resolved_assignments,
844                                    &changed_cols,
845                                );
846                                if let Some(result) = fused_result {
847                                    return result;
848                                }
849                            }
850                        }
851                    }
852                }
853
854                // Collect matching RowIds in a single pass.
855                let matching_rids = self.collect_rids_for_mutation(input, table)?;
856
857                // ── Literal-only fast paths ─────────────────────────────
858                if let Some(ref resolved_assignments) = resolved_assignments {
859                    // Mission C Phase 4: in-place byte-patch fast path. If every
860                    // assignment targets a fixed-size non-null column AND none of
861                    // them is indexed, we can skip decode_row / Vec<Value> /
862                    // encode_row_into entirely and patch the row's raw bytes on
863                    // the hot page.
864                    let fast_patch: Option<Vec<FastPatch>> = {
865                        let tbl = self
866                            .catalog
867                            .get_table(table)
868                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
869                        let schema = &tbl.schema;
870                        let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
871                            is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
872                        });
873                        let no_indexed = !resolved_assignments
874                            .iter()
875                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
876
877                        if all_fixed_nonnull && no_indexed {
878                            let layout = RowLayout::new(schema);
879                            let bitmap_size = layout.bitmap_size();
880                            let patches: Vec<FastPatch> = resolved_assignments
881                                .iter()
882                                .map(|(idx, val)| {
883                                    let fixed_off = layout
884                                        .fixed_offset(*idx)
885                                        .expect("is_fixed_size already checked");
886                                    let field_off = 2 + bitmap_size + fixed_off;
887                                    let bytes: FixedBytes = match val {
888                                        Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
889                                        Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
890                                        Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
891                                        Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
892                                        Value::Uuid(v) => FixedBytes::Uuid(*v),
893                                        _ => unreachable!("all_fixed_nonnull guard lied"),
894                                    };
895                                    FastPatch {
896                                        field_off,
897                                        bitmap_byte_off: 2 + idx / 8,
898                                        bit_mask: 1u8 << (idx % 8),
899                                        bytes,
900                                    }
901                                })
902                                .collect();
903                            Some(patches)
904                        } else {
905                            None
906                        }
907                    };
908
909                    if let Some(patches) = fast_patch {
910                        let mut count = 0u64;
911                        for rid in matching_rids {
912                            // Mission B2: WAL-log every patch so crash
913                            // recovery replays the update. Same mutation
914                            // closure as before — the wrapper just sandwiches
915                            // it between a hot-page read and a WAL append.
916                            let ok = self
917                                .catalog
918                                .update_row_bytes_logged(table, rid, |row| {
919                                    for p in &patches {
920                                        row[p.bitmap_byte_off] &= !p.bit_mask;
921                                        let field_bytes = p.bytes.as_slice();
922                                        row[p.field_off..p.field_off + field_bytes.len()]
923                                            .copy_from_slice(field_bytes);
924                                    }
925                                })
926                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
927                            if ok {
928                                count += 1;
929                            }
930                        }
931                        self.view_registry.mark_dependents_dirty(table);
932                        return Ok(QueryResult::Modified(count));
933                    }
934
935                    // Mission C Phase 10: var-column in-place shrink fast path.
936                    let var_fast: Option<(usize, Option<Vec<u8>>)> = {
937                        let tbl = self
938                            .catalog
939                            .get_table(table)
940                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
941                        let schema = &tbl.schema;
942                        let is_single = resolved_assignments.len() == 1;
943                        let is_var_col = is_single
944                            && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
945                        let no_indexed = !resolved_assignments
946                            .iter()
947                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
948
949                        if is_single && is_var_col && no_indexed {
950                            let (idx, val) = &resolved_assignments[0];
951                            let bytes_opt: Option<Vec<u8>> = match val {
952                                Value::Str(s) => Some(s.as_bytes().to_vec()),
953                                Value::Bytes(b) => Some(b.clone()),
954                                Value::Empty => None,
955                                _ => {
956                                    return Err(QueryError::TypeError(format!(
957                                        "cannot assign non-var value to var column '{}'",
958                                        schema.columns[*idx].name
959                                    )))
960                                }
961                            };
962                            Some((*idx, bytes_opt))
963                        } else {
964                            None
965                        }
966                    };
967
968                    if let Some((col_idx, new_bytes_opt)) = var_fast {
969                        let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
970                        let mut count = 0u64;
971                        let mut fallback_rids: Vec<RowId> = Vec::new();
972                        for rid in &matching_rids {
973                            // Mission B2: logged variant so crash recovery
974                            // replays the shrink. On a false return (row
975                            // would have to grow), the rid is pushed to
976                            // `fallback_rids` and the slower `update_hinted`
977                            // path — which is already WAL-logged — picks it up.
978                            let ok = self
979                                .catalog
980                                .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
981                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
982                            if ok {
983                                count += 1;
984                            } else {
985                                fallback_rids.push(*rid);
986                            }
987                        }
988                        for rid in fallback_rids {
989                            let mut row = match self.catalog.get(table, rid) {
990                                Some(r) => r,
991                                None => continue,
992                            };
993                            for (idx, val) in resolved_assignments.iter() {
994                                row[*idx] = val.clone();
995                            }
996                            self.catalog
997                                .update_hinted(table, rid, &row, Some(&changed_cols))
998                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
999                            count += 1;
1000                        }
1001                        self.view_registry.mark_dependents_dirty(table);
1002                        return Ok(QueryResult::Modified(count));
1003                    }
1004
1005                    // Generic literal path: decode row, apply literal values.
1006                    let mut count = 0u64;
1007                    for rid in matching_rids {
1008                        let mut row = match self.catalog.get(table, rid) {
1009                            Some(r) => r,
1010                            None => continue,
1011                        };
1012                        for (idx, val) in resolved_assignments.iter() {
1013                            row[*idx] = val.clone();
1014                        }
1015                        self.catalog
1016                            .update_hinted(table, rid, &row, Some(&changed_cols))
1017                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1018                        count += 1;
1019                    }
1020                    self.view_registry.mark_dependents_dirty(table);
1021                    return Ok(QueryResult::Modified(count));
1022                } // end if let Some(resolved_assignments)
1023
1024                // ── Expression-based update path ────────────────────────
1025                // At least one assignment contains a non-literal expression
1026                // (e.g., `age := .age + 1`). Evaluate per-row.
1027                let col_names: Vec<String> = {
1028                    let schema_ref = self
1029                        .catalog
1030                        .schema(table)
1031                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1032                    schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1033                };
1034                let mut count = 0u64;
1035                for rid in matching_rids {
1036                    let mut row = match self.catalog.get(table, rid) {
1037                        Some(r) => r,
1038                        None => continue,
1039                    };
1040                    for (i, asgn) in assignments.iter().enumerate() {
1041                        let val = eval_expr(&asgn.value, &row, &col_names);
1042                        row[col_indices[i]] = val;
1043                    }
1044                    self.catalog
1045                        .update_hinted(table, rid, &row, Some(&changed_cols))
1046                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1047                    count += 1;
1048                }
1049                self.view_registry.mark_dependents_dirty(table);
1050                Ok(QueryResult::Modified(count))
1051            }
1052
1053            PlanNode::Delete { input, table } => {
1054                // Mission C Phase 3: no schema clone — collect_rids_for_mutation
1055                // looks up schema internally when it needs one, and the mutation
1056                // loop doesn't need the schema at all.
1057                //
1058                // Mission C Phase 12: route bulk deletes through
1059                // `Catalog::delete_many`, which batches the btree leaf
1060                // compaction and shares one `ensure_hot` per row between
1061                // the index-key extraction and the slot delete. On
1062                // `delete_by_filter` (100K fixture, ~20K matches) that
1063                // removes ~4ms of pure `Vec::remove` memmove from the btree
1064                // maintenance phase.
1065                //
1066                // Mission C Phase 16: for the common `delete where ...`
1067                // shape (Filter(SeqScan)) — and the rarer "delete
1068                // everything" shape (SeqScan) — skip the two-pass
1069                // `collect_rids_for_mutation` + `delete_many` flow entirely.
1070                // The fused `scan_delete_matching` primitive walks the
1071                // heap exactly once, paying one `ensure_hot` per page
1072                // instead of per-row. That closes the last major gap on
1073                // the bench's `delete_by_filter` workload.
1074                if let PlanNode::Filter {
1075                    input: inner,
1076                    predicate,
1077                } = input.as_ref()
1078                {
1079                    if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1080                        if t == table {
1081                            let schema = self
1082                                .catalog
1083                                .schema(table)
1084                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1085                            let columns: Vec<String> =
1086                                schema.columns.iter().map(|c| c.name.clone()).collect();
1087                            let fast = FastLayout::new(schema);
1088                            if let Some(compiled) =
1089                                compile_predicate(predicate, &columns, &fast, schema)
1090                            {
1091                                // Mission B2: logged variant so every
1092                                // matched rid hits the WAL during the
1093                                // single-pass scan. Structure of the
1094                                // fused scan is unchanged — only the
1095                                // hook closure now also appends.
1096                                let count = self
1097                                    .catalog
1098                                    .scan_delete_matching_logged(table, |data| compiled(data))
1099                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1100                                self.view_registry.mark_dependents_dirty(table);
1101                                return Ok(QueryResult::Modified(count));
1102                            }
1103                        }
1104                    }
1105                } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1106                    if t == table {
1107                        // `delete from T` with no predicate — every live
1108                        // row matches. One pass is still the right shape.
1109                        // Mission B2: logged variant — see above.
1110                        let count = self
1111                            .catalog
1112                            .scan_delete_matching_logged(table, |_| true)
1113                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1114                        self.view_registry.mark_dependents_dirty(table);
1115                        return Ok(QueryResult::Modified(count));
1116                    }
1117                }
1118
1119                let matching_rids = self.collect_rids_for_mutation(input, table)?;
1120                let count = self
1121                    .catalog
1122                    .delete_many(table, &matching_rids)
1123                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1124                self.view_registry.mark_dependents_dirty(table);
1125                Ok(QueryResult::Modified(count))
1126            }
1127
1128            PlanNode::AliasScan { table, alias } => {
1129                // Mission E1.2: scan `table` and rename every output column
1130                // to `alias.field`. Used as a join leaf so downstream
1131                // NestedLoopJoin + Filter + Project nodes can resolve
1132                // `Expr::QualifiedField` lookups by direct column-name match.
1133                //
1134                // We don't bother with a fused zero-copy loop here yet — the
1135                // whole join path is nested-loop and correctness-first
1136                // (Phase E1.3 will introduce hash join and at that point we
1137                // can revisit whether to specialise AliasScan).
1138                let schema = self
1139                    .catalog
1140                    .schema(table)
1141                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1142                    .clone();
1143                let columns: Vec<String> = schema
1144                    .columns
1145                    .iter()
1146                    .map(|c| format!("{alias}.{}", c.name))
1147                    .collect();
1148                let rows: Vec<Vec<Value>> = self
1149                    .catalog
1150                    .scan(table)
1151                    .map_err(|e| QueryError::StorageError(e.to_string()))?
1152                    .map(|(_, row)| row)
1153                    .collect();
1154                Ok(QueryResult::Rows { columns, rows })
1155            }
1156
1157            PlanNode::NestedLoopJoin {
1158                left,
1159                right,
1160                on,
1161                kind,
1162            } => {
1163                // Materialise both sides. The executor ships two strategies:
1164                //   1. Hash join (E1.3) — when the `on` predicate is a
1165                //      simple equi-predicate `left_col = right_col`, build a
1166                //      FxHashMap<Value, Vec<row_idx>> over the right side
1167                //      and probe with the left side. O(L + R) instead of
1168                //      O(L × R). Handles Inner and LeftOuter.
1169                //   2. Nested loop (E1.2) — fallback for Cross, non-equi
1170                //      predicates, or `on` expressions that reference
1171                //      either side with something more complex than a
1172                //      QualifiedField.
1173                let left_result = self.execute_plan(left)?;
1174                let right_result = self.execute_plan(right)?;
1175                let (left_columns, left_rows) = match left_result {
1176                    QueryResult::Rows { columns, rows } => (columns, rows),
1177                    _ => return Err("join left side must produce rows".into()),
1178                };
1179                let (right_columns, right_rows) = match right_result {
1180                    QueryResult::Rows { columns, rows } => (columns, rows),
1181                    _ => return Err("join right side must produce rows".into()),
1182                };
1183
1184                // WS2: byte-budget guard on the join build side. Charge both
1185                // materialized inputs before we build the hash table / probe;
1186                // the output is row-capped by check_join_limit below.
1187                self.charge_rows(&left_rows)?;
1188                self.charge_rows(&right_rows)?;
1189
1190                // Hash-join fast path.
1191                if !matches!(kind, JoinKind::Cross) {
1192                    if let Some(pred) = on {
1193                        if let Some((l_idx, r_idx)) =
1194                            try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1195                        {
1196                            let result = hash_join(
1197                                left_columns,
1198                                left_rows,
1199                                right_columns,
1200                                right_rows,
1201                                l_idx,
1202                                r_idx,
1203                                *kind,
1204                            );
1205                            if let QueryResult::Rows { ref rows, .. } = result {
1206                                check_join_limit(rows.len())?;
1207                            }
1208                            return Ok(result);
1209                        }
1210                    }
1211                }
1212
1213                // Nested-loop fallback.
1214                let n_left = left_columns.len();
1215                let n_right = right_columns.len();
1216                let mut columns = Vec::with_capacity(n_left + n_right);
1217                columns.extend(left_columns);
1218                columns.extend(right_columns);
1219
1220                let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1221                let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1222
1223                for left_row in &left_rows {
1224                    let mut matched = false;
1225                    for right_row in &right_rows {
1226                        combined.clear();
1227                        combined.extend_from_slice(left_row);
1228                        combined.extend_from_slice(right_row);
1229                        let keep = match kind {
1230                            JoinKind::Cross => true,
1231                            JoinKind::Inner | JoinKind::LeftOuter => match on {
1232                                Some(pred) => eval_predicate(pred, &combined, &columns),
1233                                // Missing `on` for non-cross joins is a
1234                                // parser error, but if it slips through we
1235                                // treat it as "match everything".
1236                                None => true,
1237                            },
1238                            // RightOuter is rewritten to LeftOuter by the
1239                            // planner, so we never see it here.
1240                            JoinKind::RightOuter => {
1241                                unreachable!("planner rewrites RightOuter to LeftOuter")
1242                            }
1243                        };
1244                        if keep {
1245                            rows.push(combined.clone());
1246                            check_join_limit(rows.len())?;
1247                            matched = true;
1248                        }
1249                    }
1250                    if !matched && matches!(kind, JoinKind::LeftOuter) {
1251                        let mut row = Vec::with_capacity(n_left + n_right);
1252                        row.extend_from_slice(left_row);
1253                        row.resize(n_left + n_right, Value::Empty);
1254                        rows.push(row);
1255                        check_join_limit(rows.len())?;
1256                    }
1257                }
1258
1259                Ok(QueryResult::Rows { columns, rows })
1260            }
1261
1262            PlanNode::Distinct { input } => {
1263                let result = self.execute_plan(input)?;
1264                match result {
1265                    QueryResult::Rows { columns, rows } => {
1266                        let mut seen = std::collections::HashSet::new();
1267                        let mut unique_rows = Vec::new();
1268                        for row in rows {
1269                            if seen.insert(row.clone()) {
1270                                unique_rows.push(row);
1271                            }
1272                        }
1273                        Ok(QueryResult::Rows {
1274                            columns,
1275                            rows: unique_rows,
1276                        })
1277                    }
1278                    other => Ok(other),
1279                }
1280            }
1281
1282            PlanNode::GroupBy {
1283                input,
1284                keys,
1285                aggregates,
1286                having,
1287            } => {
1288                let result = self.execute_plan(input)?;
1289                match result {
1290                    QueryResult::Rows { columns, rows } => {
1291                        // WS2: byte-budget guard on the GROUP BY input buffer
1292                        // (the hash table is bounded by the input it groups).
1293                        self.charge_rows(&rows)?;
1294                        // Resolve key column indices.
1295                        let key_indices: Vec<usize> = keys
1296                            .iter()
1297                            .map(|k| {
1298                                columns
1299                                    .iter()
1300                                    .position(|c| c == k)
1301                                    .ok_or_else(|| format!("group-by column '{k}' not found"))
1302                            })
1303                            .collect::<Result<Vec<_>, _>>()?;
1304
1305                        // Resolve aggregate field indices. count(*) uses
1306                        // sentinel usize::MAX — compute_group_aggregate
1307                        // treats it as "count all rows in the group".
1308                        let agg_field_indices: Vec<usize> = aggregates
1309                            .iter()
1310                            .map(|a| {
1311                                if a.field == "*" {
1312                                    Ok(usize::MAX)
1313                                } else {
1314                                    columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1315                                        format!("aggregate column '{}' not found", a.field)
1316                                    })
1317                                }
1318                            })
1319                            .collect::<Result<Vec<_>, _>>()?;
1320
1321                        // Group rows by key values (preserving insertion order).
1322                        let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1323                            rustc_hash::FxHashMap::default();
1324                        let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1325                        for (ri, row) in rows.iter().enumerate() {
1326                            let key: Vec<Value> =
1327                                key_indices.iter().map(|&i| row[i].clone()).collect();
1328                            match group_map.get(&key) {
1329                                Some(&idx) => groups[idx].1.push(ri),
1330                                None => {
1331                                    let idx = groups.len();
1332                                    group_map.insert(key.clone(), idx);
1333                                    groups.push((key, vec![ri]));
1334                                }
1335                            }
1336                        }
1337
1338                        // Build output column names: keys ++ aggregate output names.
1339                        let mut out_columns: Vec<String> = keys.clone();
1340                        for agg in aggregates.iter() {
1341                            out_columns.push(agg.output_name.clone());
1342                        }
1343
1344                        // Compute aggregates per group.
1345                        let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1346                        for (key_vals, row_indices) in &groups {
1347                            let mut row = key_vals.clone();
1348                            for (ai, agg) in aggregates.iter().enumerate() {
1349                                let col_idx = agg_field_indices[ai];
1350                                let val = compute_group_aggregate(
1351                                    agg.function,
1352                                    &rows,
1353                                    row_indices,
1354                                    col_idx,
1355                                );
1356                                row.push(val);
1357                            }
1358                            out_rows.push(row);
1359                        }
1360
1361                        // Apply HAVING filter.
1362                        if let Some(having_expr) = having {
1363                            out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1364                        }
1365
1366                        Ok(QueryResult::Rows {
1367                            columns: out_columns,
1368                            rows: out_rows,
1369                        })
1370                    }
1371                    _ => Err("group by requires row input".into()),
1372                }
1373            }
1374
1375            PlanNode::CreateTable { name, fields } => {
1376                let columns: Vec<ColumnDef> = fields
1377                    .iter()
1378                    .enumerate()
1379                    .map(
1380                        |(i, (fname, tname, req))| -> Result<ColumnDef, QueryError> {
1381                            Ok(ColumnDef {
1382                                name: fname.clone(),
1383                                type_id: type_name_to_id(tname).map_err(QueryError::TypeError)?,
1384                                required: *req,
1385                                position: i as u16,
1386                            })
1387                        },
1388                    )
1389                    .collect::<Result<Vec<_>, _>>()?;
1390                let schema = Schema {
1391                    table_name: name.clone(),
1392                    columns,
1393                };
1394                self.catalog
1395                    .create_table(schema)
1396                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1397                Ok(QueryResult::Created(name.clone()))
1398            }
1399
1400            PlanNode::AlterTable { table, action } => match action {
1401                AlterAction::AddColumn {
1402                    name,
1403                    type_name,
1404                    required,
1405                } => {
1406                    let position = self
1407                        .catalog
1408                        .schema(table)
1409                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1410                        .columns
1411                        .len() as u16;
1412                    let col = ColumnDef {
1413                        name: name.clone(),
1414                        type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1415                        required: *required,
1416                        position,
1417                    };
1418                    self.catalog
1419                        .alter_table_add_column(table, col)
1420                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1421                    Ok(QueryResult::Executed {
1422                        message: format!("column '{name}' added to '{table}'"),
1423                    })
1424                }
1425                AlterAction::DropColumn { name } => {
1426                    self.catalog
1427                        .alter_table_drop_column(table, name)
1428                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1429                    Ok(QueryResult::Executed {
1430                        message: format!("column '{name}' dropped from '{table}'"),
1431                    })
1432                }
1433                AlterAction::AddIndex { column } => {
1434                    self.catalog
1435                        .create_index(table, column)
1436                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1437                    Ok(QueryResult::Executed {
1438                        message: format!("index on '{table}.{column}' created"),
1439                    })
1440                }
1441            },
1442
1443            PlanNode::DropTable { name } => {
1444                self.catalog
1445                    .drop_table(name)
1446                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1447                Ok(QueryResult::Executed {
1448                    message: format!("table '{name}' dropped"),
1449                })
1450            }
1451
1452            PlanNode::CreateView { name, query_text } => {
1453                self.create_view(name, query_text)?;
1454                Ok(QueryResult::Executed {
1455                    message: format!("materialized view '{name}' created"),
1456                })
1457            }
1458
1459            PlanNode::RefreshView { name } => {
1460                self.refresh_view(name)?;
1461                Ok(QueryResult::Executed {
1462                    message: format!("materialized view '{name}' refreshed"),
1463                })
1464            }
1465
1466            PlanNode::DropView { name } => {
1467                self.drop_view(name)?;
1468                Ok(QueryResult::Executed {
1469                    message: format!("materialized view '{name}' dropped"),
1470                })
1471            }
1472
1473            PlanNode::Window { input, windows } => {
1474                let result = self.execute_plan(input)?;
1475                execute_window(result, windows)
1476            }
1477
1478            PlanNode::Union { left, right, all } => {
1479                let left_result = self.execute_plan(left)?;
1480                let right_result = self.execute_plan(right)?;
1481                let (left_cols, left_rows) = match left_result {
1482                    QueryResult::Rows { columns, rows } => (columns, rows),
1483                    _ => return Err("UNION requires query results on left side".into()),
1484                };
1485                let (_, right_rows) = match right_result {
1486                    QueryResult::Rows { columns, rows } => (columns, rows),
1487                    _ => return Err("UNION requires query results on right side".into()),
1488                };
1489                let mut combined = left_rows;
1490                if *all {
1491                    // UNION ALL — just concatenate.
1492                    combined.extend(right_rows);
1493                } else {
1494                    // UNION — deduplicate using the same HashSet approach
1495                    // as DISTINCT. Value already implements Hash + Eq.
1496                    let mut seen = std::collections::HashSet::new();
1497                    for row in &combined {
1498                        seen.insert(row.clone());
1499                    }
1500                    for row in right_rows {
1501                        if seen.insert(row.clone()) {
1502                            combined.push(row);
1503                        }
1504                    }
1505                }
1506                Ok(QueryResult::Rows {
1507                    columns: left_cols,
1508                    rows: combined,
1509                })
1510            }
1511
1512            PlanNode::Explain { input } => {
1513                let text = format_plan_tree(input, 0);
1514                Ok(QueryResult::Rows {
1515                    columns: vec!["plan".to_string()],
1516                    rows: text
1517                        .lines()
1518                        .map(|line| vec![Value::Str(line.to_string())])
1519                        .collect(),
1520                })
1521            }
1522
1523            PlanNode::Begin => {
1524                if self.in_transaction {
1525                    return Err(QueryError::Execution(
1526                        "already in a transaction (nested transactions not supported)".into(),
1527                    ));
1528                }
1529                self.in_transaction = true;
1530                Ok(QueryResult::Executed {
1531                    message: "transaction started".to_string(),
1532                })
1533            }
1534
1535            PlanNode::Commit => {
1536                if !self.in_transaction {
1537                    return Err(QueryError::Execution(
1538                        "no active transaction to commit".into(),
1539                    ));
1540                }
1541                self.in_transaction = false;
1542                self.catalog
1543                    .sync_wal()
1544                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1545                Ok(QueryResult::Executed {
1546                    message: "transaction committed".to_string(),
1547                })
1548            }
1549
1550            PlanNode::Rollback => {
1551                if !self.in_transaction {
1552                    return Err(QueryError::Execution(
1553                        "no active transaction to roll back".into(),
1554                    ));
1555                }
1556                self.in_transaction = false;
1557                self.catalog
1558                    .rollback_to_last_sync()
1559                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1560                if let Ok(mut cache) = self.plan_cache.lock() {
1561                    cache.clear();
1562                }
1563                self.view_registry = ViewRegistry::open(self.catalog.data_dir())
1564                    .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
1565                Ok(QueryResult::Executed {
1566                    message: "transaction rolled back".to_string(),
1567                })
1568            }
1569
1570            PlanNode::IndexScan { table, column, key } => {
1571                let key_value = literal_to_value(key)?;
1572                let tbl = self
1573                    .catalog
1574                    .get_table(table)
1575                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1576                let columns: Vec<String> =
1577                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1578
1579                // Fast path: the table has a B-tree on this column.
1580                // Uses index_lookup_all to return ALL matching rows for
1581                // both unique and non-unique indexes.
1582                if tbl.has_index(column) {
1583                    let rids = tbl.index_lookup_all(column, &key_value);
1584                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1585                    for rid in rids {
1586                        if let Some(data) = tbl.heap.get(rid) {
1587                            rows.push(decode_row(&tbl.schema, &data));
1588                        }
1589                    }
1590                    return Ok(QueryResult::Rows { columns, rows });
1591                }
1592
1593                // Fallback: no index on this column. The planner emits IndexScan
1594                // eagerly (it has no visibility into which columns are indexed
1595                // at plan time), so here we must behave like SeqScan+Filter on
1596                // `.col = literal`: return *all* matching rows, not just the
1597                // first one. A non-indexed column isn't necessarily unique.
1598                // We compile the eq predicate once and stream without any
1599                // per-row decode for non-matching rows.
1600                let schema = &tbl.schema;
1601                let fast = FastLayout::new(schema);
1602                let synth_pred = Expr::BinaryOp(
1603                    Box::new(Expr::Field(column.clone())),
1604                    BinOp::Eq,
1605                    Box::new(key.clone()),
1606                );
1607                if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1608                    // Mission F: skip the first 4 Vec doublings.
1609                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1610                    self.catalog
1611                        .for_each_row_raw(table, |_rid, data| {
1612                            if compiled(data) {
1613                                rows.push(decode_row(schema, data));
1614                            }
1615                        })
1616                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1617                    return Ok(QueryResult::Rows { columns, rows });
1618                }
1619
1620                // Last resort: slow eq-check on materialised rows.
1621                let col_idx =
1622                    schema
1623                        .column_index(column)
1624                        .ok_or_else(|| QueryError::ColumnNotFound {
1625                            table: String::new(),
1626                            column: column.clone(),
1627                        })?;
1628                let rows: Vec<Vec<Value>> = tbl
1629                    .scan()
1630                    .filter_map(|(_, row)| {
1631                        if row[col_idx] == key_value {
1632                            Some(row)
1633                        } else {
1634                            None
1635                        }
1636                    })
1637                    .collect();
1638                Ok(QueryResult::Rows { columns, rows })
1639            }
1640
1641            PlanNode::RangeScan {
1642                table,
1643                column,
1644                start,
1645                end,
1646            } => {
1647                let tbl = self
1648                    .catalog
1649                    .get_table(table)
1650                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1651                let columns: Vec<String> =
1652                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1653                let schema = &tbl.schema;
1654
1655                let start_val = match start {
1656                    Some((expr, _)) => Some(literal_to_value(expr)?),
1657                    None => None,
1658                };
1659                let end_val = match end {
1660                    Some((expr, _)) => Some(literal_to_value(expr)?),
1661                    None => None,
1662                };
1663                let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1664                let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1665
1666                // Range scans only use the btree fast path for unique indexes,
1667                // because non-unique indexes store composite keys (column_value
1668                // + RowId) that don't directly compare against raw column values.
1669                if tbl.is_index_unique(column) == Some(true) {
1670                    if let Some(btree) = tbl.index(column) {
1671                        let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1672                            (Some(s), Some(e)) => btree.range(s, e).collect(),
1673                            (Some(s), None) => btree.range_from(s),
1674                            (None, Some(e)) => btree.range_to(e),
1675                            (None, None) => {
1676                                let rows: Vec<Vec<Value>> =
1677                                    tbl.scan().map(|(_, row)| row).collect();
1678                                return Ok(QueryResult::Rows { columns, rows });
1679                            }
1680                        };
1681                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1682                        for (key, rid) in hits {
1683                            if !start_inclusive {
1684                                if let Some(ref s) = start_val {
1685                                    if &key == s {
1686                                        continue;
1687                                    }
1688                                }
1689                            }
1690                            if !end_inclusive {
1691                                if let Some(ref e) = end_val {
1692                                    if &key == e {
1693                                        continue;
1694                                    }
1695                                }
1696                            }
1697                            if let Some(data) = tbl.heap.get(rid) {
1698                                rows.push(decode_row(schema, &data));
1699                            }
1700                        }
1701                        return Ok(QueryResult::Rows { columns, rows });
1702                    }
1703                }
1704
1705                // Fallback: no index — synthesize range predicate and scan.
1706                let fast = FastLayout::new(schema);
1707                let synth = synthesize_range_predicate(column, start, end);
1708                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1709                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1710                    self.catalog
1711                        .for_each_row_raw(table, |_rid, data| {
1712                            if compiled(data) {
1713                                rows.push(decode_row(schema, data));
1714                            }
1715                        })
1716                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1717                    return Ok(QueryResult::Rows { columns, rows });
1718                }
1719
1720                let col_idx =
1721                    schema
1722                        .column_index(column)
1723                        .ok_or_else(|| QueryError::ColumnNotFound {
1724                            table: String::new(),
1725                            column: column.clone(),
1726                        })?;
1727                let rows: Vec<Vec<Value>> = tbl
1728                    .scan()
1729                    .filter(|(_, row)| {
1730                        range_matches(
1731                            &row[col_idx],
1732                            &start_val,
1733                            start_inclusive,
1734                            &end_val,
1735                            end_inclusive,
1736                        )
1737                    })
1738                    .map(|(_, row)| row)
1739                    .collect();
1740                Ok(QueryResult::Rows { columns, rows })
1741            }
1742        }
1743    }
1744
1745    // ─── Materialized view operations ──────────────────────────────────────
1746
1747    /// Create a materialized view: execute the source query, store results
1748    /// in a new backing table, and register the view.
1749    fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1750        if self.view_registry.is_view(name) {
1751            return Err(QueryError::ViewError(format!(
1752                "materialized view '{name}' already exists"
1753            )));
1754        }
1755        // Execute the source query to get the result set.
1756        let result = self.execute_powql(query_text)?;
1757        let (columns, rows) = match result {
1758            QueryResult::Rows { columns, rows } => (columns, rows),
1759            _ => return Err("view source query must be a SELECT".into()),
1760        };
1761        // Derive a schema for the backing table from the query result columns.
1762        let schema = self.derive_view_schema(name, &columns, &rows);
1763        // Create the backing table and insert the result rows.
1764        self.catalog
1765            .create_table(schema)
1766            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1767        for row in &rows {
1768            self.catalog
1769                .insert(name, row)
1770                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1771        }
1772        // Determine which base tables this view depends on by parsing the query.
1773        let depends_on = self.extract_view_deps(query_text);
1774        self.view_registry
1775            .register(ViewDef {
1776                name: name.to_string(),
1777                query: query_text.to_string(),
1778                depends_on,
1779                dirty: false,
1780            })
1781            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1782        Ok(())
1783    }
1784
1785    /// Refresh a materialized view: re-execute its source query and replace
1786    /// the backing table's contents.
1787    fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1788        let def = self
1789            .view_registry
1790            .get(name)
1791            .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1792        let query_text = def.query.clone();
1793        // Execute the source query.
1794        let result = self.execute_powql(&query_text)?;
1795        let (_columns, rows) = match result {
1796            QueryResult::Rows { columns, rows } => (columns, rows),
1797            _ => return Err("view source query must be a SELECT".into()),
1798        };
1799        // Clear old data and insert fresh results. Mission B2: logged
1800        // variant — view refreshes are a mutation and crash recovery
1801        // must see them.
1802        self.catalog
1803            .scan_delete_matching_logged(name, |_| true)
1804            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1805        for row in &rows {
1806            self.catalog
1807                .insert(name, row)
1808                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1809        }
1810        self.view_registry.mark_clean(name);
1811        Ok(())
1812    }
1813
1814    /// Drop a materialized view: remove the backing table and unregister.
1815    fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1816        if !self.view_registry.is_view(name) {
1817            return Err(QueryError::ViewError(format!(
1818                "materialized view '{name}' not found"
1819            )));
1820        }
1821        self.view_registry
1822            .unregister(name)
1823            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1824        self.catalog
1825            .drop_table(name)
1826            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1827        Ok(())
1828    }
1829
1830    /// Derive a storage `Schema` for a view's backing table from query
1831    /// result column names and the first row's types.
1832    fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1833        use powdb_storage::types::{ColumnDef, TypeId};
1834        let cols: Vec<ColumnDef> = columns
1835            .iter()
1836            .enumerate()
1837            .map(|(i, col_name)| {
1838                let type_id = rows
1839                    .first()
1840                    .and_then(|row| row.get(i))
1841                    .map(|v| v.type_id())
1842                    .unwrap_or(TypeId::Str);
1843                ColumnDef {
1844                    name: col_name.clone(),
1845                    type_id,
1846                    required: false,
1847                    position: i as u16,
1848                }
1849            })
1850            .collect();
1851        Schema {
1852            table_name: name.to_string(),
1853            columns: cols,
1854        }
1855    }
1856
1857    /// Extract base table dependencies from a view's source query by
1858    /// parsing it and collecting the source table name.
1859    fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1860        use crate::parser::parse;
1861        match parse(query_text) {
1862            Ok(Statement::Query(q)) => {
1863                let mut deps = vec![q.source.clone()];
1864                for j in &q.joins {
1865                    deps.push(j.source.clone());
1866                }
1867                deps
1868            }
1869            _ => Vec::new(),
1870        }
1871    }
1872
1873    // ─── Specialized fast paths ─────────────────────────────────────────────
1874    //
1875    // These methods are helpers for the `execute_plan` match arms above.
1876    // Each returns `Ok(Some(result))` when the fast path fires, `Ok(None)`
1877    // when the shape isn't supported (caller falls back to generic code).
1878
    /// Aggregate sum/avg/min/max/count/count-distinct over a single
    /// fixed-size numeric column (`Int` or `Float`), with an optional
    /// compiled filter predicate. Walks raw row bytes through the
    /// `agg_int_loop!` / `agg_float_loop!` macros. Int sum/avg accumulate
    /// in an i128 for overflow safety; an Int sum that falls outside the
    /// i64 range is clamped to `i64::MIN` / `i64::MAX`.
    ///
    /// Returns `Ok(None)` (caller falls back to the generic path) when the
    /// column does not exist, is not `Int`/`Float`, has no fixed offset in
    /// the `FastLayout`, or the predicate cannot be compiled.
    ///
    /// When no value is folded, avg/min/max yield `Value::Empty`, sum yields
    /// zero (`Int(0)` / `Float(0.0)`) and the counts yield `Int(0)`.
    ///
    /// # Errors
    ///
    /// `QueryError::TableNotFound` if `table` has no schema in the catalog.
    /// NOTE(review): any error the loop macros propagate is not visible
    /// from this block — confirm against the macro definitions.
    pub(super) fn agg_single_col_fast(
        &self,
        table: &str,
        col: &str,
        function: AggFunc,
        predicate: Option<&Expr>,
    ) -> Result<Option<QueryResult>, QueryError> {
        let schema = self
            .catalog
            .schema(table)
            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
            .clone();
        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
        let col_idx = match schema.column_index(col) {
            Some(i) => i,
            None => return Ok(None),
        };
        // Only fast-path fixed-size numeric columns (Int/Float) for
        // sum/avg/min/max/count. Mission D10: Float parity — prior version
        // bailed on Float columns, forcing them through the generic row-
        // decoding path that allocated a Vec<Value> per row and dispatched
        // on Value::cmp for every compare. f64 decode is structurally the
        // same as i64 (load 8 bytes, cast), so the fast path handles both.
        let col_type = schema.columns[col_idx].type_id;
        if col_type != TypeId::Int && col_type != TypeId::Float {
            return Ok(None);
        }

        let fast = FastLayout::new(&schema);
        // Mission C Phase 20b: inline the numeric-column reader instead of
        // building a `Box<dyn Fn>`. Eliminates 100K vtable dispatches per
        // 100K-row agg scan — every reader call folds directly into the
        // hot loop below.
        let byte_offset = match fast.fixed_offsets[col_idx] {
            Some(o) => o,
            None => return Ok(None),
        };
        // Location of the column's null bit and of its 8 value bytes; these
        // three values are handed to the loop macros below.
        let bitmap_byte = col_idx / 8;
        let bitmap_bit = (col_idx % 8) as u32;
        let data_offset = 2 + fast.bitmap_size + byte_offset;

        // Optional compiled filter.
        let compiled_pred: Option<CompiledPredicate> = match predicate {
            Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
                Some(c) => Some(c),
                None => return Ok(None), // let generic path handle it
            },
            None => None,
        };

        // Mission C Phase 20b: specialize the inner loop per aggregate
        // function. The previous version ran a `match function { ... }`
        // *inside* the closure, which kept LLVM from producing optimal
        // scalar code for each variant (agg_max regressed ~23% vs the
        // baseline Box<dyn Fn> version even though per-row vtable cost
        // should have been strictly lower). Pushing the match out of the
        // hot loop lets each specialized body fold cleanly into
        // `for_each_row_raw` and removes a captured `AggFunc` + match
        // dispatch per row.
        //
        // Mission D10: same specialisation applies to the Float branch.
        // For Min/Max we use `f64::total_cmp` so the result matches
        // `Value::Ord` — this is the same ordering ORDER BY and the
        // top-N sort fast path use, keeping semantics consistent across
        // read paths (NaN compares as greatest, -0.0 < +0.0 for
        // deterministic tie-breaking).
        //
        // Mission D11 Phase 1: each inner loop now splits on presence of
        // a predicate (`if let Some(pred) = &compiled_pred`) so the hot
        // body never re-tests `Option` per row, and reads column bytes
        // via `read_i64_unchecked` / `read_f64_unchecked` helpers that
        // drop two bounds checks per row (null bitmap byte + value
        // slice). Safety is carried by the `FastLayout` invariant that
        // `data_offset + 8 <= row_len` for any fixed-size column; see
        // the helper doc comments. Hot loops are macro-generated so the
        // with-pred / no-pred split can't drift between variants.
        let result = match col_type {
            TypeId::Int => match function {
                AggFunc::Sum | AggFunc::Avg => {
                    // Sum and Avg share one loop; `count` is only consulted
                    // on the Avg path.
                    let mut sum_i128: i128 = 0;
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            count += 1;
                            sum_i128 += v as i128;
                        }
                    );
                    if matches!(function, AggFunc::Sum) {
                        // Saturate rather than wrap when the total does not
                        // fit in an i64.
                        let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
                        QueryResult::Scalar(Value::Int(clamped))
                    } else if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        let avg = (sum_i128 as f64) / (count as f64);
                        QueryResult::Scalar(Value::Float(avg))
                    }
                }
                AggFunc::Min => {
                    let mut min_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            min_v = Some(match min_v {
                                Some(m) => m.min(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    let mut max_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            max_v = Some(match max_v {
                                Some(m) => m.max(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |_v: i64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            seen.insert(v);
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            TypeId::Float => match function {
                AggFunc::Sum => {
                    // Use a single f64 accumulator. Naive summation is
                    // sufficient for MVP parity; if precision becomes an
                    // issue on long scans we can upgrade to Kahan–Neumaier
                    // compensated sum (~2x scalar cost, zero error growth).
                    let mut sum: f64 = 0.0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            sum += v;
                        }
                    );
                    QueryResult::Scalar(Value::Float(sum))
                }
                AggFunc::Avg => {
                    let mut sum: f64 = 0.0;
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            sum += v;
                            count += 1;
                        }
                    );
                    if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        QueryResult::Scalar(Value::Float(sum / count as f64))
                    }
                }
                AggFunc::Min => {
                    // `total_cmp` for deterministic NaN handling (matches
                    // Value::Ord). A positive-sign NaN compares greatest, so
                    // Min ignores it in favour of any finite value. Caveat:
                    // under `total_cmp` a NaN with the sign bit set orders
                    // below every other value, so such a NaN would win Min.
                    let mut min_v: Option<f64> = None;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            min_v = Some(match min_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_lt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    let mut max_v: Option<f64> = None;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            max_v = Some(match max_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_gt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |_v: f64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    // Hash on `f64::to_bits` — matches `Value::Hash`, so
                    // distinct NaN bit patterns count as distinct and
                    // -0.0/+0.0 count as distinct. Consistent with how
                    // Float values are hashed in every other DISTINCT /
                    // GROUP BY path.
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            seen.insert(v.to_bits());
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            _ => unreachable!("type guard above restricts to Int/Float"),
        };
        Ok(Some(result))
    }
2184
2185    /// `Project(Limit(Filter(SeqScan)))` and `Project(Limit(SeqScan))`.
2186    /// Streams rows, decodes only projected columns, stops at the limit.
2187    pub(super) fn project_filter_limit_fast(
2188        &self,
2189        table: &str,
2190        fields: &[ProjectField],
2191        limit: usize,
2192        predicate: Option<&Expr>,
2193    ) -> Result<Option<QueryResult>, QueryError> {
2194        let schema = self
2195            .catalog
2196            .schema(table)
2197            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2198            .clone();
2199        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2200
2201        // Each projection field must be a simple `.field` reference for this
2202        // fast path. Aliased or computed fields fall through.
2203        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2204        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2205        for f in fields {
2206            let name = match &f.expr {
2207                Expr::Field(n) => n.clone(),
2208                _ => return Ok(None),
2209            };
2210            let idx = match all_columns.iter().position(|c| c == &name) {
2211                Some(i) => i,
2212                None => return Ok(None),
2213            };
2214            proj_indices.push(idx);
2215            proj_columns.push(f.alias.clone().unwrap_or(name));
2216        }
2217
2218        let fast = FastLayout::new(&schema);
2219        let row_layout = RowLayout::new(&schema);
2220
2221        let compiled_pred: Option<CompiledPredicate> = match predicate {
2222            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2223                Some(c) => Some(c),
2224                None => return Ok(None),
2225            },
2226            None => None,
2227        };
2228
2229        let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2230        // Mission D2: use try_for_each_row_raw to actually stop iterating
2231        // once the limit is reached. The previous `done` flag only short-
2232        // circuited the closure body, so a `limit 100` over 100K rows still
2233        // walked all 100K slots — burning ~30x SQLite on scan_filter_project_top100.
2234        self.catalog
2235            .try_for_each_row_raw(table, |_rid, data| {
2236                use std::ops::ControlFlow;
2237                if let Some(ref pred) = compiled_pred {
2238                    if !pred(data) {
2239                        return ControlFlow::Continue(());
2240                    }
2241                }
2242                let row: Vec<Value> = proj_indices
2243                    .iter()
2244                    .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2245                    .collect();
2246                out.push(row);
2247                if out.len() >= limit {
2248                    ControlFlow::Break(())
2249                } else {
2250                    ControlFlow::Continue(())
2251                }
2252            })
2253            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2254
2255        Ok(Some(QueryResult::Rows {
2256            columns: proj_columns,
2257            rows: out,
2258        }))
2259    }
2260
2261    /// `Project(Limit(Sort(Filter(SeqScan))))` and `Project(Limit(Sort(SeqScan)))`.
2262    /// Bounded top-N heap over the sort key. Only the sort key needs to be
2263    /// read per row; projected columns are decoded only for the final
2264    /// winning rows when the heap drains.
2265    pub(super) fn project_filter_sort_limit_fast(
2266        &self,
2267        table: &str,
2268        fields: &[ProjectField],
2269        sort_field: &str,
2270        descending: bool,
2271        limit: usize,
2272        predicate: Option<&Expr>,
2273    ) -> Result<Option<QueryResult>, QueryError> {
2274        if limit == 0 {
2275            // Degenerate case — empty result. Let the generic path handle it
2276            // for proper column naming.
2277            return Ok(None);
2278        }
2279        let schema = self
2280            .catalog
2281            .schema(table)
2282            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2283            .clone();
2284        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2285
2286        // Sort key must be a fixed-size numeric column (Int or Float).
2287        // Mission D10: extended from Int-only. Float sort keys use a
2288        // sortable-u64 transform (see `f64_to_sortable_u64`) so the heap
2289        // path stays keyed on `u64` and the whole branch shape is
2290        // identical to the Int case — no new heap types, no `total_cmp`
2291        // closures in the hot loop.
2292        let sort_idx = match schema.column_index(sort_field) {
2293            Some(i) => i,
2294            None => return Ok(None),
2295        };
2296        let sort_col_type = schema.columns[sort_idx].type_id;
2297        if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2298            return Ok(None);
2299        }
2300
2301        // Each projection field must be a simple `.field`.
2302        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2303        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2304        for f in fields {
2305            let name = match &f.expr {
2306                Expr::Field(n) => n.clone(),
2307                _ => return Ok(None),
2308            };
2309            let idx = match all_columns.iter().position(|c| c == &name) {
2310                Some(i) => i,
2311                None => return Ok(None),
2312            };
2313            proj_indices.push(idx);
2314            proj_columns.push(f.alias.clone().unwrap_or(name));
2315        }
2316
2317        let fast = FastLayout::new(&schema);
2318        let row_layout = RowLayout::new(&schema);
2319        // Mission C Phase 20b: inline numeric-column reader (no Box<dyn Fn>).
2320        let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2321            Some(o) => o,
2322            None => return Ok(None),
2323        };
2324        let sort_bitmap_byte = sort_idx / 8;
2325        let sort_bitmap_bit = (sort_idx % 8) as u32;
2326        let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2327
2328        let compiled_pred: Option<CompiledPredicate> = match predicate {
2329            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2330                Some(c) => Some(c),
2331                None => return Ok(None),
2332            },
2333            None => None,
2334        };
2335
2336        // Bounded top-N heap. For `order .x desc limit N`, we want the N
2337        // largest values — use a min-heap so the smallest is at the top and
2338        // can be popped when a better candidate arrives. For ascending, use
2339        // a max-heap. We tie-break with a monotonic `seq` counter so the
2340        // result is deterministic and stable.
2341        //
2342        // To keep this simple we maintain two typed heaps and pick by
2343        // direction.
2344        let drained: Vec<Vec<u8>> = match sort_col_type {
2345            TypeId::Int => {
2346                let mut seq: u64 = 0;
2347                let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2348                    BinaryHeap::with_capacity(limit);
2349                let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2350                    BinaryHeap::with_capacity(limit);
2351
2352                self.catalog
2353                    .for_each_row_raw(table, |_rid, data| {
2354                        if let Some(ref pred) = compiled_pred {
2355                            if !pred(data) {
2356                                return;
2357                            }
2358                        }
2359                        // Inlined int-column reader: null check + i64 decode.
2360                        if data.len() < sort_data_offset + 8 {
2361                            return;
2362                        }
2363                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2364                        if is_null {
2365                            return;
2366                        }
2367                        let key = i64::from_le_bytes(
2368                            data[sort_data_offset..sort_data_offset + 8]
2369                                .try_into()
2370                                .unwrap_or_else(|_| unreachable!()),
2371                        );
2372                        let id = seq;
2373                        seq += 1;
2374
2375                        if descending {
2376                            if heap_desc.len() < limit {
2377                                heap_desc.push(Reverse((key, id, data.to_vec())));
2378                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2379                                if key > *top_key {
2380                                    heap_desc.pop();
2381                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2382                                }
2383                            }
2384                        } else if heap_asc.len() < limit {
2385                            heap_asc.push((key, id, data.to_vec()));
2386                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2387                            if key < *top_key {
2388                                heap_asc.pop();
2389                                heap_asc.push((key, id, data.to_vec()));
2390                            }
2391                        }
2392                    })
2393                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2394
2395                let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2396                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2397                } else {
2398                    heap_asc.into_iter().collect()
2399                };
2400                if descending {
2401                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2402                } else {
2403                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2404                }
2405                drained.into_iter().map(|(_, _, d)| d).collect()
2406            }
2407            TypeId::Float => {
2408                // Novel angle: rather than introducing a `TotalF64` newtype
2409                // with `Ord via total_cmp`, transform the f64 bit pattern
2410                // into a sortable `u64` so `BinaryHeap<u64>` orders exactly
2411                // like `f64::total_cmp` would. Classic trick: flip the sign
2412                // bit on positives, flip all bits on negatives. Result:
2413                // - NaN (sign=0) stays greatest, matching total_cmp
2414                // - -0.0 sorts before +0.0, matching total_cmp
2415                // - Hot loop is branch-cheap (one compare + one xor)
2416                let mut seq: u64 = 0;
2417                let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2418                    BinaryHeap::with_capacity(limit);
2419                let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2420                    BinaryHeap::with_capacity(limit);
2421
2422                self.catalog
2423                    .for_each_row_raw(table, |_rid, data| {
2424                        if let Some(ref pred) = compiled_pred {
2425                            if !pred(data) {
2426                                return;
2427                            }
2428                        }
2429                        if data.len() < sort_data_offset + 8 {
2430                            return;
2431                        }
2432                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2433                        if is_null {
2434                            return;
2435                        }
2436                        let bits = u64::from_le_bytes(
2437                            data[sort_data_offset..sort_data_offset + 8]
2438                                .try_into()
2439                                .unwrap_or_else(|_| unreachable!()),
2440                        );
2441                        let key = f64_bits_to_sortable_u64(bits);
2442                        let id = seq;
2443                        seq += 1;
2444
2445                        if descending {
2446                            if heap_desc.len() < limit {
2447                                heap_desc.push(Reverse((key, id, data.to_vec())));
2448                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2449                                if key > *top_key {
2450                                    heap_desc.pop();
2451                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2452                                }
2453                            }
2454                        } else if heap_asc.len() < limit {
2455                            heap_asc.push((key, id, data.to_vec()));
2456                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2457                            if key < *top_key {
2458                                heap_asc.pop();
2459                                heap_asc.push((key, id, data.to_vec()));
2460                            }
2461                        }
2462                    })
2463                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2464
2465                let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2466                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2467                } else {
2468                    heap_asc.into_iter().collect()
2469                };
2470                if descending {
2471                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2472                } else {
2473                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2474                }
2475                drained.into_iter().map(|(_, _, d)| d).collect()
2476            }
2477            _ => unreachable!("type guard above restricts to Int/Float"),
2478        };
2479
2480        let rows: Vec<Vec<Value>> = drained
2481            .into_iter()
2482            .map(|data| {
2483                proj_indices
2484                    .iter()
2485                    .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2486                    .collect()
2487            })
2488            .collect();
2489
2490        Ok(Some(QueryResult::Rows {
2491            columns: proj_columns,
2492            rows,
2493        }))
2494    }
2495
2496    /// Gather the RowIds that a mutation should operate on, without
2497    /// materialising the full row set. Handles the shapes the planner emits
2498    /// for update/delete: SeqScan, IndexScan, and Filter(SeqScan). Other
2499    /// shapes fall back to `generic_rid_match`.
2500    ///
2501    /// Perf sprint: try to fuse the predicate evaluation and in-place
2502    /// byte-level mutation into a single heap walk. Returns `Some(result)`
2503    /// if the fused path fired, `None` to fall through to the generic
2504    /// two-pass code.
2505    ///
2506    /// Covers two shapes:
2507    /// 1. Fixed-width non-null literal assignments on non-indexed columns
2508    ///    → byte-patch every matched row in place (row length unchanged).
2509    /// 2. Single var-col literal assignment on a non-indexed column
2510    ///    → `patch_var_column_in_place` on every matched row (may shrink);
2511    ///    rows that can't be patched in place are collected for fallback.
2512    fn try_fused_scan_update(
2513        &mut self,
2514        table: &str,
2515        predicate: &Expr,
2516        resolved: &[(usize, Value)],
2517        changed_cols: &[usize],
2518    ) -> Option<Result<QueryResult, QueryError>> {
2519        // Build compiled predicate. Requires a schema borrow that must be
2520        // dropped before we call scan_patch_matching_logged.
2521        let compiled = {
2522            let schema = self.catalog.schema(table)?;
2523            let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2524            let fast = FastLayout::new(schema);
2525            compile_predicate(predicate, &columns, &fast, schema)?
2526        };
2527
2528        // ── Path 1: fixed-width fast patch ──────────────────────────
2529        let fixed_patches: Option<Vec<FastPatch>> = {
2530            let tbl = self.catalog.get_table(table)?;
2531            let schema = &tbl.schema;
2532            let all_fixed_nonnull = resolved
2533                .iter()
2534                .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2535            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2536            if all_fixed_nonnull && no_indexed {
2537                let layout = RowLayout::new(schema);
2538                let bitmap_size = layout.bitmap_size();
2539                Some(
2540                    resolved
2541                        .iter()
2542                        .map(|(idx, val)| {
2543                            let fixed_off = layout
2544                                .fixed_offset(*idx)
2545                                .expect("is_fixed_size already checked");
2546                            let field_off = 2 + bitmap_size + fixed_off;
2547                            let bytes: FixedBytes = match val {
2548                                Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2549                                Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2550                                Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2551                                Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2552                                Value::Uuid(v) => FixedBytes::Uuid(*v),
2553                                _ => unreachable!("all_fixed_nonnull guard"),
2554                            };
2555                            FastPatch {
2556                                field_off,
2557                                bitmap_byte_off: 2 + idx / 8,
2558                                bit_mask: 1u8 << (idx % 8),
2559                                bytes,
2560                            }
2561                        })
2562                        .collect(),
2563                )
2564            } else {
2565                None
2566            }
2567        };
2568        if let Some(patches) = fixed_patches {
2569            let result = self
2570                .catalog
2571                .scan_patch_matching_logged(table, compiled, |row| {
2572                    for p in &patches {
2573                        row[p.bitmap_byte_off] &= !p.bit_mask;
2574                        let field_bytes = p.bytes.as_slice();
2575                        row[p.field_off..p.field_off + field_bytes.len()]
2576                            .copy_from_slice(field_bytes);
2577                    }
2578                    Some(row.len() as u16)
2579                })
2580                .map_err(|e| e.to_string());
2581            match result {
2582                Ok((count, _)) => {
2583                    self.view_registry.mark_dependents_dirty(table);
2584                    return Some(Ok(QueryResult::Modified(count)));
2585                }
2586                Err(e) => return Some(Err(QueryError::Execution(e))),
2587            }
2588        }
2589
2590        // ── Path 2: single var-col shrink fast patch ────────────────
2591        let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2592            let tbl = self.catalog.get_table(table)?;
2593            let schema = &tbl.schema;
2594            let is_single = resolved.len() == 1;
2595            let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2596            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2597            if is_single && is_var && no_indexed {
2598                let (idx, val) = &resolved[0];
2599                let bytes_opt = match val {
2600                    Value::Str(s) => Some(s.as_bytes().to_vec()),
2601                    Value::Bytes(b) => Some(b.clone()),
2602                    Value::Empty => None,
2603                    _ => return None, // type mismatch, fall through
2604                };
2605                Some((*idx, bytes_opt))
2606            } else {
2607                None
2608            }
2609        };
2610        if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2611            // Build a fresh RowLayout before the mutable borrow.
2612            let layout = {
2613                let schema = self.catalog.schema(table)?;
2614                RowLayout::new(schema)
2615            };
2616            let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2617            let result = self
2618                .catalog
2619                .scan_patch_matching_logged(table, compiled, |row| {
2620                    patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2621                })
2622                .map_err(|e| e.to_string());
2623            match result {
2624                Ok((mut count, fallback_rids)) => {
2625                    // Handle rows where in-place patch failed (new > old).
2626                    for rid in fallback_rids {
2627                        let mut row = match self.catalog.get(table, rid) {
2628                            Some(r) => r,
2629                            None => continue,
2630                        };
2631                        for (idx, val) in resolved.iter() {
2632                            row[*idx] = val.clone();
2633                        }
2634                        self.catalog
2635                            .update_hinted(table, rid, &row, Some(changed_cols))
2636                            .map_err(|e| e.to_string())
2637                            .ok();
2638                        count += 1;
2639                    }
2640                    self.view_registry.mark_dependents_dirty(table);
2641                    return Some(Ok(QueryResult::Modified(count)));
2642                }
2643                Err(e) => return Some(Err(QueryError::Execution(e))),
2644            }
2645        }
2646
2647        None // no fused path applicable — fall through
2648    }
2649
2650    /// Mission C Phase 3: schema is looked up via `self.catalog.schema(table)`
2651    /// inside the branches that actually need it. Previously the caller had
2652    /// to clone the full Schema (6+ String allocs) before every mutation just
2653    /// so this function could borrow it — a cost the update/delete hot path
2654    /// did not need.
2655    fn collect_rids_for_mutation(
2656        &mut self,
2657        input: &PlanNode,
2658        table: &str,
2659    ) -> Result<Vec<RowId>, QueryError> {
2660        match input {
2661            PlanNode::SeqScan { table: t } if t == table => {
2662                // "Update/delete everything" — rare but legal.
2663                let rids: Vec<RowId> = self
2664                    .catalog
2665                    .scan(table)
2666                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2667                    .map(|(rid, _)| rid)
2668                    .collect();
2669                Ok(rids)
2670            }
2671            PlanNode::IndexScan {
2672                table: t,
2673                column,
2674                key,
2675            } if t == table => {
2676                let key_value = literal_to_value(key)?;
2677
2678                // Indexed case: single lookup, 0 or 1 rows.
2679                // Mission D7: int-specialized fast path on int-keyed indexes
2680                // (primary keys, created_at, etc.) — the common case for
2681                // `update_by_pk` / `delete where id = ?`.
2682                //
2683                // Scope the `tbl` borrow so it's released before we fall
2684                // through to the scan-based paths below (which reborrow
2685                // `self.catalog`).
2686                {
2687                    let tbl = self
2688                        .catalog
2689                        .get_table(table)
2690                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2691                    if tbl.has_index(column) {
2692                        let rids = tbl.index_lookup_all(column, &key_value);
2693                        return Ok(rids);
2694                    }
2695                }
2696
2697                // No index: the planner folds `.col = literal` to IndexScan
2698                // regardless of whether the column is actually unique. When
2699                // there's no index we must behave like Filter(SeqScan) and
2700                // return *all* matching RIDs — not just the first one.
2701                let schema = self
2702                    .catalog
2703                    .schema(table)
2704                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2705                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2706                let fast = FastLayout::new(schema);
2707                let synth = Expr::BinaryOp(
2708                    Box::new(Expr::Field(column.clone())),
2709                    BinOp::Eq,
2710                    Box::new(key.clone()),
2711                );
2712                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2713                    // Mission F: skip the first 4 Vec doublings.
2714                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2715                    self.catalog
2716                        .for_each_row_raw(table, |rid, data| {
2717                            if compiled(data) {
2718                                rids.push(rid);
2719                            }
2720                        })
2721                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2722                    return Ok(rids);
2723                }
2724
2725                // Fallback: decode each row, compare values.
2726                let col_idx =
2727                    schema
2728                        .column_index(column)
2729                        .ok_or_else(|| QueryError::ColumnNotFound {
2730                            table: String::new(),
2731                            column: column.clone(),
2732                        })?;
2733                let rids: Vec<RowId> = self
2734                    .catalog
2735                    .scan(table)
2736                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2737                    .filter_map(|(rid, row)| {
2738                        if row[col_idx] == key_value {
2739                            Some(rid)
2740                        } else {
2741                            None
2742                        }
2743                    })
2744                    .collect();
2745                Ok(rids)
2746            }
2747            PlanNode::Filter {
2748                input: inner,
2749                predicate,
2750            } => {
2751                if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2752                    if t != table {
2753                        return self.generic_rid_match(input, table);
2754                    }
2755                    let schema = self
2756                        .catalog
2757                        .schema(table)
2758                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2759                    let columns: Vec<String> =
2760                        schema.columns.iter().map(|c| c.name.clone()).collect();
2761                    let fast = FastLayout::new(schema);
2762                    let row_layout = RowLayout::new(schema);
2763
2764                    // Try compiled predicate first.
2765                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2766                        // Mission F: skip the first 4 Vec doublings.
2767                        let mut rids: Vec<RowId> = Vec::with_capacity(64);
2768                        self.catalog
2769                            .for_each_row_raw(table, |rid, data| {
2770                                if compiled(data) {
2771                                    rids.push(rid);
2772                                }
2773                            })
2774                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2775                        return Ok(rids);
2776                    }
2777
2778                    // Fallback: selective decode + eval.
2779                    let pred_cols = predicate_column_indices(predicate, &columns);
2780                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2781                    self.catalog
2782                        .for_each_row_raw(table, |rid, data| {
2783                            let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2784                            if eval_predicate(predicate, &pred_row, &columns) {
2785                                rids.push(rid);
2786                            }
2787                        })
2788                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2789                    return Ok(rids);
2790                }
2791                self.generic_rid_match(input, table)
2792            }
2793            _ => self.generic_rid_match(input, table),
2794        }
2795    }
2796
2797    /// Last-ditch generic match: execute the plan, collect matching rows,
2798    /// then find corresponding RowIds by value equality. This is the old
2799    /// O(N*M) code path; only used when the plan shape is something exotic.
2800    fn generic_rid_match(
2801        &mut self,
2802        input: &PlanNode,
2803        table: &str,
2804    ) -> Result<Vec<RowId>, QueryError> {
2805        let result = self.execute_plan(input)?;
2806        let rows = match result {
2807            QueryResult::Rows { rows, .. } => rows,
2808            _ => return Err("mutation source must be rows".into()),
2809        };
2810        let matching: Vec<RowId> = self
2811            .catalog
2812            .scan(table)
2813            .map_err(|e| QueryError::StorageError(e.to_string()))?
2814            .filter(|(_, row)| rows.iter().any(|r| r == row))
2815            .map(|(rid, _)| rid)
2816            .collect();
2817        Ok(matching)
2818    }
2819}
2820
2821pub(super) fn execute_window(
2822    result: QueryResult,
2823    windows: &[WindowDef],
2824) -> Result<QueryResult, QueryError> {
2825    let (mut columns, mut rows) = match result {
2826        QueryResult::Rows { columns, rows } => (columns, rows),
2827        _ => return Err("window function requires row input".into()),
2828    };
2829
2830    for wdef in windows {
2831        // Resolve partition/order column indices against current columns.
2832        let part_indices: Vec<usize> = wdef
2833            .partition_by
2834            .iter()
2835            .map(|name| {
2836                columns
2837                    .iter()
2838                    .position(|c| c == name)
2839                    .ok_or_else(|| format!("window partition column '{name}' not found"))
2840            })
2841            .collect::<Result<Vec<_>, _>>()?;
2842
2843        let ord_indices: Vec<(usize, bool)> = wdef
2844            .order_by
2845            .iter()
2846            .map(|sk| {
2847                columns
2848                    .iter()
2849                    .position(|c| c == &sk.field)
2850                    .map(|i| (i, sk.descending))
2851                    .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2852            })
2853            .collect::<Result<Vec<_>, _>>()?;
2854
2855        // Resolve the argument column index (for aggregate windows).
2856        let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2857            match arg {
2858                Expr::Field(name) => {
2859                    if name == "*" {
2860                        None // count(*) style — no specific column
2861                    } else {
2862                        Some(
2863                            columns
2864                                .iter()
2865                                .position(|c| c == name)
2866                                .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2867                        )
2868                    }
2869                }
2870                _ => None,
2871            }
2872        } else {
2873            None
2874        };
2875
2876        // Build a sort-index to sort rows by partition_by then order_by
2877        // without actually reordering the original Vec (we need original
2878        // order to write results back).
2879        let n = rows.len();
2880        let mut indices: Vec<usize> = (0..n).collect();
2881        indices.sort_by(|&a, &b| {
2882            // Compare partition keys first.
2883            for &pi in &part_indices {
2884                let cmp = rows[a][pi].cmp(&rows[b][pi]);
2885                if cmp != std::cmp::Ordering::Equal {
2886                    return cmp;
2887                }
2888            }
2889            // Then order keys.
2890            for &(oi, desc) in &ord_indices {
2891                let cmp = rows[a][oi].cmp(&rows[b][oi]);
2892                if cmp != std::cmp::Ordering::Equal {
2893                    return if desc { cmp.reverse() } else { cmp };
2894                }
2895            }
2896            std::cmp::Ordering::Equal
2897        });
2898
2899        // Compute window values in sorted order, tracking partition boundaries.
2900        let mut win_values: Vec<Value> = vec![Value::Empty; n];
2901        let mut partition_start = 0usize;
2902        // Running state for aggregate windows:
2903        let mut running_count: i64 = 0;
2904        let mut running_int_sum: i64 = 0;
2905        let mut running_float_sum: f64 = 0.0;
2906        let mut running_saw_float = false;
2907        let mut running_min: Option<Value> = None;
2908        let mut running_max: Option<Value> = None;
2909        let mut rank_counter: i64 = 0;
2910        let mut dense_rank_counter: i64 = 0;
2911        let mut prev_order_key: Option<Vec<Value>> = None;
2912        let mut same_rank_count: i64 = 0;
2913
2914        for sorted_pos in 0..n {
2915            let row_idx = indices[sorted_pos];
2916
2917            // Detect partition boundary.
2918            let new_partition = if sorted_pos == 0 {
2919                true
2920            } else {
2921                let prev_row_idx = indices[sorted_pos - 1];
2922                part_indices
2923                    .iter()
2924                    .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
2925            };
2926
2927            if new_partition {
2928                partition_start = sorted_pos;
2929                running_count = 0;
2930                running_int_sum = 0;
2931                running_float_sum = 0.0;
2932                running_saw_float = false;
2933                running_min = None;
2934                running_max = None;
2935                rank_counter = 0;
2936                dense_rank_counter = 0;
2937                prev_order_key = None;
2938                same_rank_count = 0;
2939            }
2940
2941            // Extract current order key for rank tracking.
2942            let current_order_key: Vec<Value> = ord_indices
2943                .iter()
2944                .map(|&(oi, _)| rows[row_idx][oi].clone())
2945                .collect();
2946            let same_as_prev = prev_order_key.as_ref() == Some(&current_order_key);
2947
2948            let value = match wdef.function {
2949                WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
2950                WindowFunc::Rank => {
2951                    if same_as_prev {
2952                        same_rank_count += 1;
2953                    } else {
2954                        rank_counter += same_rank_count + 1;
2955                        same_rank_count = 0;
2956                        if rank_counter == 0 {
2957                            rank_counter = 1;
2958                        }
2959                    }
2960                    Value::Int(rank_counter)
2961                }
2962                WindowFunc::DenseRank => {
2963                    if !same_as_prev {
2964                        dense_rank_counter += 1;
2965                    }
2966                    Value::Int(dense_rank_counter)
2967                }
2968                WindowFunc::Sum => {
2969                    if let Some(ci) = arg_col_idx {
2970                        match &rows[row_idx][ci] {
2971                            Value::Int(v) => running_int_sum += v,
2972                            Value::Float(v) => {
2973                                running_float_sum += v;
2974                                running_saw_float = true;
2975                            }
2976                            _ => {}
2977                        }
2978                    }
2979                    if running_saw_float {
2980                        Value::Float(running_float_sum + running_int_sum as f64)
2981                    } else {
2982                        Value::Int(running_int_sum)
2983                    }
2984                }
2985                WindowFunc::Avg => {
2986                    if let Some(ci) = arg_col_idx {
2987                        match &rows[row_idx][ci] {
2988                            Value::Int(v) => {
2989                                running_float_sum += *v as f64;
2990                                running_count += 1;
2991                            }
2992                            Value::Float(v) => {
2993                                running_float_sum += v;
2994                                running_count += 1;
2995                            }
2996                            _ => {}
2997                        }
2998                    }
2999                    if running_count == 0 {
3000                        Value::Empty
3001                    } else {
3002                        Value::Float(running_float_sum / running_count as f64)
3003                    }
3004                }
3005                WindowFunc::Count => {
3006                    if let Some(ci) = arg_col_idx {
3007                        if !rows[row_idx][ci].is_empty() {
3008                            running_count += 1;
3009                        }
3010                    } else {
3011                        // count(*) — count all rows
3012                        running_count += 1;
3013                    }
3014                    Value::Int(running_count)
3015                }
3016                WindowFunc::Min => {
3017                    if let Some(ci) = arg_col_idx {
3018                        let v = &rows[row_idx][ci];
3019                        if !v.is_empty() {
3020                            running_min = Some(match &running_min {
3021                                None => v.clone(),
3022                                Some(cur) => {
3023                                    if v < cur {
3024                                        v.clone()
3025                                    } else {
3026                                        cur.clone()
3027                                    }
3028                                }
3029                            });
3030                        }
3031                    }
3032                    running_min.clone().unwrap_or(Value::Empty)
3033                }
3034                WindowFunc::Max => {
3035                    if let Some(ci) = arg_col_idx {
3036                        let v = &rows[row_idx][ci];
3037                        if !v.is_empty() {
3038                            running_max = Some(match &running_max {
3039                                None => v.clone(),
3040                                Some(cur) => {
3041                                    if v > cur {
3042                                        v.clone()
3043                                    } else {
3044                                        cur.clone()
3045                                    }
3046                                }
3047                            });
3048                        }
3049                    }
3050                    running_max.clone().unwrap_or(Value::Empty)
3051                }
3052            };
3053
3054            prev_order_key = Some(current_order_key);
3055            win_values[row_idx] = value;
3056        }
3057
3058        // Append the computed window column to each row.
3059        for (ri, row) in rows.iter_mut().enumerate() {
3060            row.push(win_values[ri].clone());
3061        }
3062        columns.push(wdef.output_name.clone());
3063    }
3064
3065    Ok(QueryResult::Rows { columns, rows })
3066}
3067
3068/// Mission E2b: compute one aggregate over a set of rows in a group.
3069pub(super) fn compute_group_aggregate(
3070    func: AggFunc,
3071    all_rows: &[Vec<Value>],
3072    row_indices: &[usize],
3073    col_idx: usize,
3074) -> Value {
3075    match func {
3076        AggFunc::Count => {
3077            if col_idx == usize::MAX {
3078                // count(*) — count all rows in the group.
3079                return Value::Int(row_indices.len() as i64);
3080            }
3081            let count = row_indices
3082                .iter()
3083                .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3084                .count();
3085            Value::Int(count as i64)
3086        }
3087        AggFunc::CountDistinct => {
3088            let mut seen = std::collections::HashSet::new();
3089            for &ri in row_indices {
3090                let v = &all_rows[ri][col_idx];
3091                if !v.is_empty() {
3092                    seen.insert(v.clone());
3093                }
3094            }
3095            Value::Int(seen.len() as i64)
3096        }
3097        AggFunc::Sum => {
3098            // Mirror the scalar Sum path: accumulate int and float
3099            // contributions separately and promote the final result to
3100            // Float if any Float row was observed. Prevents silent
3101            // drop of Float columns in GROUP BY aggregates.
3102            let mut int_sum: i64 = 0;
3103            let mut float_sum: f64 = 0.0;
3104            let mut saw_float = false;
3105            for &ri in row_indices {
3106                match &all_rows[ri][col_idx] {
3107                    Value::Int(v) => int_sum += v,
3108                    Value::Float(v) => {
3109                        float_sum += *v;
3110                        saw_float = true;
3111                    }
3112                    _ => {}
3113                }
3114            }
3115            if saw_float {
3116                Value::Float(float_sum + int_sum as f64)
3117            } else {
3118                Value::Int(int_sum)
3119            }
3120        }
3121        AggFunc::Avg => {
3122            let mut sum = 0.0f64;
3123            let mut count = 0usize;
3124            for &ri in row_indices {
3125                match &all_rows[ri][col_idx] {
3126                    Value::Int(v) => {
3127                        sum += *v as f64;
3128                        count += 1;
3129                    }
3130                    Value::Float(v) => {
3131                        sum += *v;
3132                        count += 1;
3133                    }
3134                    _ => {}
3135                }
3136            }
3137            if count == 0 {
3138                Value::Empty
3139            } else {
3140                Value::Float(sum / count as f64)
3141            }
3142        }
3143        AggFunc::Min => row_indices
3144            .iter()
3145            .map(|&ri| &all_rows[ri][col_idx])
3146            .filter(|v| !v.is_empty())
3147            .min()
3148            .cloned()
3149            .unwrap_or(Value::Empty),
3150        AggFunc::Max => row_indices
3151            .iter()
3152            .map(|&ri| &all_rows[ri][col_idx])
3153            .filter(|v| !v.is_empty())
3154            .max()
3155            .cloned()
3156            .unwrap_or(Value::Empty),
3157    }
3158}
3159
3160/// Mission E1.3: try to extract equi-join key indices from a join `on`
3161/// predicate. Returns `Some((left_col_idx, right_col_idx))` when the
3162/// predicate is exactly `L = R` (or `R = L`) and both sides resolve
3163/// cleanly — `L` to the left subtree's column list and `R` to the right
3164/// subtree's column list.
3165///
3166/// This is deliberately narrow. We only recognise the two shapes:
3167///   * `QualifiedField = QualifiedField`  (`u.id = o.user_id`)
3168///   * `Field = Field`                    (`.id = .user_id`, unqualified)
3169///
3170/// Anything else — conjunctions, constants, function calls, or predicates
3171/// that touch the same side on both halves — falls through to the
3172/// nested-loop path unchanged.
3173pub(super) fn try_extract_equi_join_keys(
3174    pred: &Expr,
3175    left_columns: &[String],
3176    right_columns: &[String],
3177) -> Option<(usize, usize)> {
3178    let (lhs, op, rhs) = match pred {
3179        Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3180        _ => return None,
3181    };
3182    if op != BinOp::Eq {
3183        return None;
3184    }
3185    // Normal orientation: lhs in left, rhs in right.
3186    if let (Some(li), Some(ri)) = (
3187        resolve_side_column(lhs, left_columns),
3188        resolve_side_column(rhs, right_columns),
3189    ) {
3190        return Some((li, ri));
3191    }
3192    // Swapped: rhs in left, lhs in right. Both sides of `=` are
3193    // commutative so this is safe.
3194    if let (Some(li), Some(ri)) = (
3195        resolve_side_column(rhs, left_columns),
3196        resolve_side_column(lhs, right_columns),
3197    ) {
3198        return Some((li, ri));
3199    }
3200    None
3201}
3202
3203fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3204    match expr {
3205        Expr::QualifiedField { qualifier, field } => {
3206            // Byte-level match so we don't allocate a fresh `format!` on
3207            // every call — this runs once per plan, so allocation would be
3208            // cheap, but the match is trivial enough to keep inline with
3209            // the eval_expr version.
3210            let q = qualifier.as_bytes();
3211            let f = field.as_bytes();
3212            columns.iter().position(|c| {
3213                let b = c.as_bytes();
3214                b.len() == q.len() + 1 + f.len()
3215                    && b[..q.len()] == *q
3216                    && b[q.len()] == b'.'
3217                    && b[q.len() + 1..] == *f
3218            })
3219        }
3220        Expr::Field(name) => columns.iter().position(|c| c == name),
3221        _ => None,
3222    }
3223}
3224
3225/// Mission E1.3: O(L + R) hash join. Builds a `FxHashMap<Value, Vec<usize>>`
3226/// over the right (inner) side's join keys, then streams the left (outer)
3227/// side and for each probe row emits every combined row whose right-side
3228/// key matches. For `JoinKind::LeftOuter`, unmatched left rows are emitted
3229/// padded with `Value::Empty` on the right side.
3230///
3231/// The right side is always the build side. That choice is forced for
3232/// LeftOuter (the left side must stream so we can detect orphans), and
3233/// for Inner it's a reasonable default — left-deep plans tend to grow the
3234/// left side with each join, so the un-joined right leaf is often the
3235/// smaller of the two at each level.
3236pub(super) fn hash_join(
3237    left_columns: Vec<String>,
3238    left_rows: Vec<Vec<Value>>,
3239    right_columns: Vec<String>,
3240    right_rows: Vec<Vec<Value>>,
3241    left_key_idx: usize,
3242    right_key_idx: usize,
3243    kind: JoinKind,
3244) -> QueryResult {
3245    use rustc_hash::FxHashMap;
3246
3247    let n_left = left_columns.len();
3248    let n_right = right_columns.len();
3249    let mut columns = Vec::with_capacity(n_left + n_right);
3250    columns.extend(left_columns);
3251    columns.extend(right_columns);
3252
3253    // Build: right_key -> list of right-row indices. Pre-size to the row
3254    // count so the map doesn't rehash mid-build.
3255    let mut build: FxHashMap<Value, Vec<usize>> =
3256        FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3257    for (i, row) in right_rows.iter().enumerate() {
3258        // Skip Empty keys on the build side — they can never match under
3259        // SQL semantics (NULL ≠ NULL) and would collapse all nullables to
3260        // one bucket.
3261        if matches!(row[right_key_idx], Value::Empty) {
3262            continue;
3263        }
3264        build.entry(row[right_key_idx].clone()).or_default().push(i);
3265    }
3266
3267    // Reasonable starting capacity — inner joins produce ≥ left_rows.len()
3268    // rows in the common 1:1 case, left-outer always emits ≥ left_rows.len().
3269    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3270
3271    for left_row in &left_rows {
3272        let key = &left_row[left_key_idx];
3273        let matched = if matches!(key, Value::Empty) {
3274            None
3275        } else {
3276            build.get(key)
3277        };
3278        match matched {
3279            Some(matches) if !matches.is_empty() => {
3280                for &ri in matches {
3281                    let right_row = &right_rows[ri];
3282                    let mut combined = Vec::with_capacity(n_left + n_right);
3283                    combined.extend_from_slice(left_row);
3284                    combined.extend_from_slice(right_row);
3285                    rows.push(combined);
3286                }
3287            }
3288            _ => {
3289                if matches!(kind, JoinKind::LeftOuter) {
3290                    let mut row = Vec::with_capacity(n_left + n_right);
3291                    row.extend_from_slice(left_row);
3292                    row.resize(n_left + n_right, Value::Empty);
3293                    rows.push(row);
3294                }
3295            }
3296        }
3297    }
3298
3299    QueryResult::Rows { columns, rows }
3300}
3301
3302/// Lower unindexed `RangeScan` nodes to `Filter(SeqScan)` so that all
3303/// downstream fast paths (count, project+limit, sort+limit, agg, update,
3304/// delete) continue to fire.
3305///
3306/// The planner emits `RangeScan` speculatively for every range inequality
3307/// (`.age > 30`) because it has no catalog access. When the column has a
3308/// B-tree index, `RangeScan` is the correct plan. When it doesn't, the
3309/// executor's `RangeScan` fallback materialises every matching row with
3310/// full `decode_row` — bypassing the compiled-predicate fast paths that
3311/// `Filter(SeqScan)` would trigger.
3312///
3313/// This pass runs once per query, before execution.
3314pub(super) fn lower_unindexed_range_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3315    match plan {
3316        PlanNode::RangeScan {
3317            table,
3318            column,
3319            start,
3320            end,
3321        } => {
3322            if let Some(tbl) = catalog.get_table(table) {
3323                // Keep RangeScan only for unique indexes — their btree
3324                // stores raw column values. Non-unique indexes store
3325                // composite keys that don't directly compare against
3326                // column values, so lower them to Filter(SeqScan).
3327                if tbl.is_index_unique(column) == Some(true) {
3328                    return plan.clone();
3329                }
3330            }
3331            let pred = synthesize_range_predicate(column, start, end);
3332            PlanNode::Filter {
3333                input: Box::new(PlanNode::SeqScan {
3334                    table: table.clone(),
3335                }),
3336                predicate: pred,
3337            }
3338        }
3339        PlanNode::Filter { input, predicate } => PlanNode::Filter {
3340            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3341            predicate: predicate.clone(),
3342        },
3343        PlanNode::Project { input, fields } => PlanNode::Project {
3344            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3345            fields: fields.clone(),
3346        },
3347        PlanNode::Sort { input, keys } => PlanNode::Sort {
3348            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3349            keys: keys.clone(),
3350        },
3351        PlanNode::Limit { input, count } => PlanNode::Limit {
3352            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3353            count: count.clone(),
3354        },
3355        PlanNode::Offset { input, count } => PlanNode::Offset {
3356            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3357            count: count.clone(),
3358        },
3359        PlanNode::Aggregate {
3360            input,
3361            function,
3362            field,
3363        } => PlanNode::Aggregate {
3364            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3365            function: *function,
3366            field: field.clone(),
3367        },
3368        PlanNode::Distinct { input } => PlanNode::Distinct {
3369            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3370        },
3371        PlanNode::GroupBy {
3372            input,
3373            keys,
3374            aggregates,
3375            having,
3376        } => PlanNode::GroupBy {
3377            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3378            keys: keys.clone(),
3379            aggregates: aggregates.clone(),
3380            having: having.clone(),
3381        },
3382        PlanNode::Update {
3383            input,
3384            table,
3385            assignments,
3386        } => PlanNode::Update {
3387            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3388            table: table.clone(),
3389            assignments: assignments.clone(),
3390        },
3391        PlanNode::Delete { input, table } => PlanNode::Delete {
3392            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3393            table: table.clone(),
3394        },
3395        PlanNode::Window { input, windows } => PlanNode::Window {
3396            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3397            windows: windows.clone(),
3398        },
3399        PlanNode::Union { left, right, all } => PlanNode::Union {
3400            left: Box::new(lower_unindexed_range_scans(catalog, left)),
3401            right: Box::new(lower_unindexed_range_scans(catalog, right)),
3402            all: *all,
3403        },
3404        PlanNode::Explain { input } => PlanNode::Explain {
3405            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3406        },
3407        PlanNode::NestedLoopJoin {
3408            left,
3409            right,
3410            on,
3411            kind,
3412        } => PlanNode::NestedLoopJoin {
3413            left: Box::new(lower_unindexed_range_scans(catalog, left)),
3414            right: Box::new(lower_unindexed_range_scans(catalog, right)),
3415            on: on.clone(),
3416            kind: *kind,
3417        },
3418        // Leaf nodes: no children to recurse into.
3419        _ => plan.clone(),
3420    }
3421}
3422
3423/// Synthesize a range predicate from RangeScan bounds for the fallback path.
3424pub(super) fn synthesize_range_predicate(
3425    column: &str,
3426    start: &Option<(Expr, bool)>,
3427    end: &Option<(Expr, bool)>,
3428) -> Expr {
3429    let lower = start.as_ref().map(|(expr, inclusive)| {
3430        let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3431        Expr::BinaryOp(
3432            Box::new(Expr::Field(column.to_string())),
3433            op,
3434            Box::new(expr.clone()),
3435        )
3436    });
3437    let upper = end.as_ref().map(|(expr, inclusive)| {
3438        let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3439        Expr::BinaryOp(
3440            Box::new(Expr::Field(column.to_string())),
3441            op,
3442            Box::new(expr.clone()),
3443        )
3444    });
3445    match (lower, upper) {
3446        (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3447        (Some(l), None) => l,
3448        (None, Some(u)) => u,
3449        (None, None) => Expr::Literal(Literal::Bool(true)),
3450    }
3451}
3452
3453/// Check if a value falls within a range (used in last-resort decoded-row eval).
3454pub(super) fn range_matches(
3455    val: &Value,
3456    start: &Option<Value>,
3457    start_inc: bool,
3458    end: &Option<Value>,
3459    end_inc: bool,
3460) -> bool {
3461    if let Some(ref s) = start {
3462        if start_inc {
3463            if val < s {
3464                return false;
3465            }
3466        } else if val <= s {
3467            return false;
3468        }
3469    }
3470    if let Some(ref e) = end {
3471        if end_inc {
3472            if val > e {
3473                return false;
3474            }
3475        } else if val >= e {
3476            return false;
3477        }
3478    }
3479    true
3480}
3481
3482/// Format a `PlanNode` tree as a human-readable, indented text
3483/// representation. Used by the `EXPLAIN` command.
3484pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3485    let indent = "  ".repeat(depth);
3486    match plan {
3487        PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3488        PlanNode::AliasScan { table, alias } => {
3489            format!("{indent}AliasScan table={table} alias={alias}")
3490        }
3491        PlanNode::IndexScan { table, column, key } => {
3492            format!("{indent}IndexScan table={table} column={column} key={key:?}")
3493        }
3494        PlanNode::RangeScan {
3495            table,
3496            column,
3497            start,
3498            end,
3499        } => {
3500            let s = match start {
3501                Some((expr, inc)) => {
3502                    let op = if *inc { ">=" } else { ">" };
3503                    format!("{op}{expr:?}")
3504                }
3505                None => "unbounded".to_string(),
3506            };
3507            let e = match end {
3508                Some((expr, inc)) => {
3509                    let op = if *inc { "<=" } else { "<" };
3510                    format!("{op}{expr:?}")
3511                }
3512                None => "unbounded".to_string(),
3513            };
3514            format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3515        }
3516        PlanNode::Filter { input, predicate } => {
3517            let child = format_plan_tree(input, depth + 1);
3518            format!("{indent}Filter predicate={predicate:?}\n{child}")
3519        }
3520        PlanNode::Project { input, fields } => {
3521            let names: Vec<String> = fields
3522                .iter()
3523                .map(|f| match &f.alias {
3524                    Some(a) => format!("{a}: {:?}", f.expr),
3525                    None => format!("{:?}", f.expr),
3526                })
3527                .collect();
3528            let child = format_plan_tree(input, depth + 1);
3529            format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3530        }
3531        PlanNode::Sort { input, keys } => {
3532            let ks: Vec<String> = keys
3533                .iter()
3534                .map(|k| {
3535                    if k.descending {
3536                        format!("{} desc", k.field)
3537                    } else {
3538                        k.field.clone()
3539                    }
3540                })
3541                .collect();
3542            let child = format_plan_tree(input, depth + 1);
3543            format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3544        }
3545        PlanNode::Limit { input, count } => {
3546            let child = format_plan_tree(input, depth + 1);
3547            format!("{indent}Limit count={count:?}\n{child}")
3548        }
3549        PlanNode::Offset { input, count } => {
3550            let child = format_plan_tree(input, depth + 1);
3551            format!("{indent}Offset count={count:?}\n{child}")
3552        }
3553        PlanNode::Aggregate {
3554            input,
3555            function,
3556            field,
3557        } => {
3558            let f = field.as_deref().unwrap_or("*");
3559            let child = format_plan_tree(input, depth + 1);
3560            format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3561        }
3562        PlanNode::NestedLoopJoin {
3563            left,
3564            right,
3565            on,
3566            kind,
3567        } => {
3568            let left_child = format_plan_tree(left, depth + 1);
3569            let right_child = format_plan_tree(right, depth + 1);
3570            let on_str = match on {
3571                Some(pred) => format!("{pred:?}"),
3572                None => "none".to_string(),
3573            };
3574            format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3575        }
3576        PlanNode::Distinct { input } => {
3577            let child = format_plan_tree(input, depth + 1);
3578            format!("{indent}Distinct\n{child}")
3579        }
3580        PlanNode::GroupBy {
3581            input,
3582            keys,
3583            aggregates,
3584            having,
3585        } => {
3586            let agg_strs: Vec<String> = aggregates
3587                .iter()
3588                .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3589                .collect();
3590            let having_str = match having {
3591                Some(h) => format!(" having={h:?}"),
3592                None => String::new(),
3593            };
3594            let child = format_plan_tree(input, depth + 1);
3595            format!(
3596                "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3597                keys.join(", "),
3598                agg_strs.join(", "),
3599            )
3600        }
3601        PlanNode::Insert { table, assignments } => {
3602            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3603            format!("{indent}Insert table={table} cols=[{}]", cols.join(", "))
3604        }
3605        PlanNode::Upsert {
3606            table,
3607            key_column,
3608            assignments,
3609            on_conflict,
3610        } => {
3611            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3612            let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3613            if conflict_cols.is_empty() {
3614                format!(
3615                    "{indent}Upsert table={table} key={key_column} cols=[{}]",
3616                    cols.join(", ")
3617                )
3618            } else {
3619                format!(
3620                    "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3621                    cols.join(", "),
3622                    conflict_cols.join(", ")
3623                )
3624            }
3625        }
3626        PlanNode::Update {
3627            input,
3628            table,
3629            assignments,
3630        } => {
3631            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3632            let child = format_plan_tree(input, depth + 1);
3633            format!(
3634                "{indent}Update table={table} set=[{}]\n{child}",
3635                cols.join(", ")
3636            )
3637        }
3638        PlanNode::Delete { input, table } => {
3639            let child = format_plan_tree(input, depth + 1);
3640            format!("{indent}Delete table={table}\n{child}")
3641        }
3642        PlanNode::CreateTable { name, fields } => {
3643            let fs: Vec<String> = fields
3644                .iter()
3645                .map(|(n, t, r)| {
3646                    if *r {
3647                        format!("{n}: {t} required")
3648                    } else {
3649                        format!("{n}: {t}")
3650                    }
3651                })
3652                .collect();
3653            format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3654        }
3655        PlanNode::AlterTable { table, action } => {
3656            format!("{indent}AlterTable table={table} action={action:?}")
3657        }
3658        PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3659        PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3660        PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3661        PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3662        PlanNode::Window { input, windows } => {
3663            let ws: Vec<String> = windows
3664                .iter()
3665                .map(|w| format!("{:?} as {}", w.function, w.output_name))
3666                .collect();
3667            let child = format_plan_tree(input, depth + 1);
3668            format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3669        }
3670        PlanNode::Union { left, right, all } => {
3671            let kind = if *all { "UNION ALL" } else { "UNION" };
3672            let left_child = format_plan_tree(left, depth + 1);
3673            let right_child = format_plan_tree(right, depth + 1);
3674            format!("{indent}{kind}\n{left_child}\n{right_child}")
3675        }
3676        PlanNode::Explain { input } => {
3677            let child = format_plan_tree(input, depth + 1);
3678            format!("{indent}Explain\n{child}")
3679        }
3680        PlanNode::Begin => format!("{indent}Begin"),
3681        PlanNode::Commit => format!("{indent}Commit"),
3682        PlanNode::Rollback => format!("{indent}Rollback"),
3683    }
3684}