Skip to main content

powdb_query/executor/
plan_exec.rs

1//! The execute_plan method and associated helpers.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::{ViewDef, ViewRegistry};
16
17impl Engine {
18    pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19        match plan {
20            PlanNode::SeqScan { table } => {
21                // Auto-refresh dirty materialized views on read.
22                if self.view_registry.is_dirty(table) {
23                    self.refresh_view(table)?;
24                }
25                let schema = self
26                    .catalog
27                    .schema(table)
28                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29                    .clone();
30                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31                let rows: Vec<Vec<Value>> = self
32                    .catalog
33                    .scan(table)
34                    .map_err(|e| QueryError::StorageError(e.to_string()))?
35                    .map(|(_, row)| row)
36                    .collect();
37                Ok(QueryResult::Rows { columns, rows })
38            }
39
40            PlanNode::Filter { input, predicate } => {
41                // Materialize any IN-subqueries in the predicate before the
42                // scan loop — the closure can't call back into the engine.
43                // Correlated subqueries are left in place for per-row eval.
44                let materialized;
45                let predicate = if contains_subquery(predicate) {
46                    materialized = self.materialize_subqueries(predicate)?;
47                    &materialized
48                } else {
49                    predicate
50                };
51
52                // Correlated subquery path: per-row materialisation.
53                if contains_subquery(predicate) {
54                    let result = self.execute_plan(input)?;
55                    return match result {
56                        QueryResult::Rows { columns, rows } => {
57                            let mut filtered = Vec::new();
58                            for row in rows {
59                                let row_pred =
60                                    self.materialize_correlated_for_row(predicate, &row, &columns)?;
61                                if eval_predicate(&row_pred, &row, &columns) {
62                                    filtered.push(row);
63                                }
64                            }
65                            Ok(QueryResult::Rows {
66                                columns,
67                                rows: filtered,
68                            })
69                        }
70                        _ => Err("filter requires row input".into()),
71                    };
72                }
73
74                // Fast path: fuse Filter + SeqScan into a zero-copy streaming
75                // loop. Uses decode_column() to evaluate the predicate on only
76                // the columns it references, avoiding heap allocations for
77                // String/Bytes columns that aren't part of the filter.
78                if let PlanNode::SeqScan { table } = input.as_ref() {
79                    // Auto-refresh dirty materialized views.
80                    if self.view_registry.is_dirty(table) {
81                        self.refresh_view(table)?;
82                    }
83                    let schema = self
84                        .catalog
85                        .schema(table)
86                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
87                        .clone();
88                    let columns: Vec<String> =
89                        schema.columns.iter().map(|c| c.name.clone()).collect();
90                    let fast = FastLayout::new(&schema);
91                    let row_layout = RowLayout::new(&schema);
92                    // Mission F: pre-size to skip the first 4 Vec doublings
93                    // (4 → 8 → 16 → 32 → 64). On a 100K-row scan with 30%
94                    // selectivity that's ~4 fewer reallocations + memcpys.
95                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
96
97                    // Try compiled predicate for the filter check (handles
98                    // int leaves, string-eq leaves, and And conjunctions).
99                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
100                        self.catalog
101                            .for_each_row_raw(table, |_rid, data| {
102                                if compiled(data) {
103                                    rows.push(decode_row(&schema, data));
104                                }
105                            })
106                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
107                    } else {
108                        let pred_cols = predicate_column_indices(predicate, &columns);
109                        self.catalog
110                            .for_each_row_raw(table, |_rid, data| {
111                                let pred_row =
112                                    decode_selective(&schema, &row_layout, data, &pred_cols);
113                                if eval_predicate(predicate, &pred_row, &columns) {
114                                    rows.push(decode_row(&schema, data));
115                                }
116                            })
117                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
118                    }
119
120                    return Ok(QueryResult::Rows { columns, rows });
121                }
122
123                // General path: materialise then filter.
124                let result = self.execute_plan(input)?;
125                match result {
126                    QueryResult::Rows { columns, rows } => {
127                        let filtered: Vec<Vec<Value>> = rows
128                            .into_iter()
129                            .filter(|row| eval_predicate(predicate, row, &columns))
130                            .collect();
131                        Ok(QueryResult::Rows {
132                            columns,
133                            rows: filtered,
134                        })
135                    }
136                    _ => Err("filter requires row input".into()),
137                }
138            }
139
140            PlanNode::Project { input, fields } => {
141                // Fast path: Project over IndexScan — decode only projected
142                // columns from raw bytes instead of full decode_row.
143                if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144                    let schema = self
145                        .catalog
146                        .schema(table)
147                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148                        .clone();
149                    let all_columns: Vec<String> =
150                        schema.columns.iter().map(|c| c.name.clone()).collect();
151                    let key_value = literal_to_value(key)?;
152                    let tbl = self
153                        .catalog
154                        .get_table(table)
155                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157                    let proj_columns: Vec<String> = fields
158                        .iter()
159                        .map(|f| {
160                            f.alias.clone().unwrap_or_else(|| match &f.expr {
161                                Expr::Field(name) => name.clone(),
162                                _ => "?".into(),
163                            })
164                        })
165                        .collect();
166
167                    // Determine which column indices the projection needs
168                    let proj_indices: Vec<usize> = fields
169                        .iter()
170                        .filter_map(|f| {
171                            if let Expr::Field(name) = &f.expr {
172                                all_columns.iter().position(|c| c == name)
173                            } else {
174                                None
175                            }
176                        })
177                        .collect();
178
179                    if tbl.has_index(column) {
180                        let layout = RowLayout::new(&schema);
181                        let rids = tbl.index_lookup_all(column, &key_value);
182                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
183                        for rid in rids {
184                            if let Some(data) = tbl.heap.get(rid) {
185                                let row: Vec<Value> = proj_indices
186                                    .iter()
187                                    .map(|&ci| decode_column(&schema, &layout, &data, ci))
188                                    .collect();
189                                rows.push(row);
190                            }
191                        }
192                        return Ok(QueryResult::Rows {
193                            columns: proj_columns,
194                            rows,
195                        });
196                    }
197                }
198
199                // Fast path: Project(Limit(Sort(Filter(SeqScan)))) — bounded
200                // top-N heap. Decodes only the sort key + projected columns,
201                // keeps at most `limit` rows in a heap. Also handles the
202                // Project(Limit(Sort(SeqScan))) variant (no filter).
203                if let PlanNode::Limit {
204                    input: inner,
205                    count: limit_expr,
206                } = input.as_ref()
207                {
208                    if let PlanNode::Sort {
209                        input: sort_input,
210                        keys,
211                    } = inner.as_ref()
212                    {
213                        // Fast path only for single-key sorts
214                        if keys.len() == 1 {
215                            let sort_field = &keys[0].field;
216                            let descending = keys[0].descending;
217                            let limit = match limit_expr {
218                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
219                                _ => usize::MAX,
220                            };
221                            let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
222                                match sort_input.as_ref() {
223                                    PlanNode::SeqScan { table } => (Some(table.as_str()), None),
224                                    PlanNode::Filter {
225                                        input: fi,
226                                        predicate,
227                                    } => {
228                                        if let PlanNode::SeqScan { table } = fi.as_ref() {
229                                            (Some(table.as_str()), Some(predicate))
230                                        } else {
231                                            (None, None)
232                                        }
233                                    }
234                                    _ => (None, None),
235                                };
236                            if let Some(table) = table_opt {
237                                if let Some(result) = self.project_filter_sort_limit_fast(
238                                    table, fields, sort_field, descending, limit, pred_opt,
239                                )? {
240                                    return Ok(result);
241                                }
242                            }
243                        }
244                    }
245                    // Fast path: Project(Limit(Filter(SeqScan))) — stream,
246                    // decode only projected columns, stop at limit.
247                    if let PlanNode::Filter {
248                        input: fi,
249                        predicate,
250                    } = inner.as_ref()
251                    {
252                        if let PlanNode::SeqScan { table } = fi.as_ref() {
253                            let limit = match limit_expr {
254                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
255                                _ => usize::MAX,
256                            };
257                            if let Some(result) = self.project_filter_limit_fast(
258                                table,
259                                fields,
260                                limit,
261                                Some(predicate),
262                            )? {
263                                return Ok(result);
264                            }
265                        }
266                    }
267                    // Fast path: Project(Limit(SeqScan)) — stream, no filter.
268                    if let PlanNode::SeqScan { table } = inner.as_ref() {
269                        let limit = match limit_expr {
270                            Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
271                            _ => usize::MAX,
272                        };
273                        if let Some(result) =
274                            self.project_filter_limit_fast(table, fields, limit, None)?
275                        {
276                            return Ok(result);
277                        }
278                    }
279                }
280
281                // Mission D4: Project(Filter(SeqScan)) without Limit. Reuses
282                // `project_filter_limit_fast` with limit = usize::MAX so the
283                // hot loop decodes only projected columns and uses the
284                // compiled predicate. Previously this fell through to the
285                // generic Filter branch which materialised every column via
286                // `decode_row` then re-projected — quadratic work.
287                //
288                // multi_col_and_filter (`U filter .age > 30 and .status =
289                // "active" { .name, .age }`) was 6.18ms (0.7x SQLite) and
290                // is the load-bearing workload for this fast path.
291                if let PlanNode::Filter {
292                    input: fi,
293                    predicate,
294                } = input.as_ref()
295                {
296                    if let PlanNode::SeqScan { table } = fi.as_ref() {
297                        if let Some(result) = self.project_filter_limit_fast(
298                            table,
299                            fields,
300                            usize::MAX,
301                            Some(predicate),
302                        )? {
303                            return Ok(result);
304                        }
305                    }
306                }
307
308                // Mission D4: Project(SeqScan) without Filter or Limit.
309                // Decode only projected columns; the previous fall-through
310                // built full Vec<Value> rows then re-projected.
311                if let PlanNode::SeqScan { table } = input.as_ref() {
312                    if let Some(result) =
313                        self.project_filter_limit_fast(table, fields, usize::MAX, None)?
314                    {
315                        return Ok(result);
316                    }
317                }
318
319                let result = self.execute_plan(input)?;
320                match result {
321                    QueryResult::Rows { columns, rows } => {
322                        let proj_columns: Vec<String> = fields
323                            .iter()
324                            .map(|f| {
325                                f.alias.clone().unwrap_or_else(|| match &f.expr {
326                                    Expr::Field(name) => name.clone(),
327                                    // Mission E1.2: `{ u.name }` projects as the
328                                    // qualified column name so callers can still
329                                    // disambiguate across the join output.
330                                    Expr::QualifiedField { qualifier, field } => {
331                                        format!("{qualifier}.{field}")
332                                    }
333                                    _ => "?".into(),
334                                })
335                            })
336                            .collect();
337                        let proj_rows: Vec<Vec<Value>> = rows
338                            .iter()
339                            .map(|row| {
340                                fields
341                                    .iter()
342                                    .map(|f| eval_expr(&f.expr, row, &columns))
343                                    .collect()
344                            })
345                            .collect();
346                        Ok(QueryResult::Rows {
347                            columns: proj_columns,
348                            rows: proj_rows,
349                        })
350                    }
351                    _ => Err("project requires row input".into()),
352                }
353            }
354
355            PlanNode::Sort { input, keys } => {
356                let result = self.execute_plan(input)?;
357                match result {
358                    QueryResult::Rows { columns, mut rows } => {
359                        // WS2: row-count cap is a cheap secondary guard; the
360                        // byte budget is the real OOM defense for the sort
361                        // buffer (a few very large rows pass the row cap).
362                        if rows.len() > MAX_SORT_ROWS {
363                            return Err(QueryError::SortLimitExceeded);
364                        }
365                        self.charge_rows(&rows)?;
366                        let key_indices: Vec<(usize, bool)> = keys
367                            .iter()
368                            .map(|k| {
369                                columns
370                                    .iter()
371                                    .position(|c| c == &k.field)
372                                    .map(|idx| (idx, k.descending))
373                                    .ok_or_else(|| QueryError::ColumnNotFound {
374                                        table: String::new(),
375                                        column: k.field.clone(),
376                                    })
377                            })
378                            .collect::<Result<_, QueryError>>()?;
379                        rows.sort_by(|a, b| {
380                            for &(col_idx, descending) in &key_indices {
381                                let cmp = a[col_idx].cmp(&b[col_idx]);
382                                let cmp = if descending { cmp.reverse() } else { cmp };
383                                if cmp != std::cmp::Ordering::Equal {
384                                    return cmp;
385                                }
386                            }
387                            std::cmp::Ordering::Equal
388                        });
389                        Ok(QueryResult::Rows { columns, rows })
390                    }
391                    _ => Err("sort requires row input".into()),
392                }
393            }
394
395            PlanNode::Limit { input, count } => {
396                let result = self.execute_plan(input)?;
397                let n = match count {
398                    Expr::Literal(Literal::Int(v)) => *v as usize,
399                    _ => return Err("limit must be integer literal".into()),
400                };
401                match result {
402                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
403                        columns,
404                        rows: rows.into_iter().take(n).collect(),
405                    }),
406                    _ => Err("limit requires row input".into()),
407                }
408            }
409
410            PlanNode::Offset { input, count } => {
411                let result = self.execute_plan(input)?;
412                let n = match count {
413                    Expr::Literal(Literal::Int(v)) => *v as usize,
414                    _ => return Err("offset must be integer literal".into()),
415                };
416                match result {
417                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
418                        columns,
419                        rows: rows.into_iter().skip(n).collect(),
420                    }),
421                    _ => Err("offset requires row input".into()),
422                }
423            }
424
425            PlanNode::Aggregate {
426                input,
427                function,
428                field,
429            } => {
430                // Fast path: count() over SeqScan — count rows without any decode
431                if *function == AggFunc::Count {
432                    if let PlanNode::SeqScan { table } = input.as_ref() {
433                        // Auto-refresh a dirty materialized view before
434                        // counting it — otherwise count(View) returns stale
435                        // data after an underlying mutation (F3).
436                        if self.view_registry.is_dirty(table) {
437                            self.refresh_view(table)?;
438                        }
439                        let mut count: i64 = 0;
440                        self.catalog
441                            .for_each_row_raw(table, |_rid, _data| {
442                                count += 1;
443                            })
444                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
445                        return Ok(QueryResult::Scalar(Value::Int(count)));
446                    }
447                    // Fast path: count() over Filter(SeqScan) — try compiled
448                    // predicate first, fall back to decode_column path.
449                    // Skip a predicate carrying a subquery: the raw-bytes
450                    // evaluators here don't materialise subqueries, so
451                    // `count(T filter .x in (...))` would silently count 0
452                    // (F1). Falling through routes it to the generic path
453                    // that resolves the subquery correctly.
454                    if let PlanNode::Filter {
455                        input: inner,
456                        predicate,
457                    } = input.as_ref()
458                    {
459                        if let PlanNode::SeqScan { table } = inner.as_ref() {
460                            if self.view_registry.is_dirty(table) {
461                                self.refresh_view(table)?;
462                            }
463                        }
464                        if let (PlanNode::SeqScan { table }, false) =
465                            (inner.as_ref(), contains_subquery(predicate))
466                        {
467                            let schema = self
468                                .catalog
469                                .schema(table)
470                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
471                                .clone();
472                            let columns: Vec<String> =
473                                schema.columns.iter().map(|c| c.name.clone()).collect();
474                            let fast = FastLayout::new(&schema);
475                            let row_layout = RowLayout::new(&schema);
476
477                            // Try compiled predicate (zero-allocation hot path).
478                            // Handles int leaves, string-eq leaves, AND conjunctions.
479                            if let Some(compiled) =
480                                compile_predicate(predicate, &columns, &fast, &schema)
481                            {
482                                let mut count: i64 = 0;
483                                self.catalog
484                                    .for_each_row_raw(table, |_rid, data| {
485                                        if compiled(data) {
486                                            count += 1;
487                                        }
488                                    })
489                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
490                                return Ok(QueryResult::Scalar(Value::Int(count)));
491                            }
492
493                            // Fallback: decode predicate columns
494                            let pred_cols = predicate_column_indices(predicate, &columns);
495                            let mut count: i64 = 0;
496                            self.catalog
497                                .for_each_row_raw(table, |_rid, data| {
498                                    let pred_row =
499                                        decode_selective(&schema, &row_layout, data, &pred_cols);
500                                    if eval_predicate(predicate, &pred_row, &columns) {
501                                        count += 1;
502                                    }
503                                })
504                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
505
506                            return Ok(QueryResult::Scalar(Value::Int(count)));
507                        }
508                    }
509                }
510
511                // Fast path: sum/avg/min/max/count-distinct over a single
512                // fixed-size int column with an optional compiled filter
513                // predicate. Walks raw row bytes, zero allocation per row.
514                if matches!(
515                    function,
516                    AggFunc::Sum
517                        | AggFunc::Avg
518                        | AggFunc::Min
519                        | AggFunc::Max
520                        | AggFunc::CountDistinct
521                ) {
522                    if let Some(col) = field.as_ref() {
523                        // Shape: Aggregate(SeqScan) or Aggregate(Filter(SeqScan))
524                        let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
525                            match input.as_ref() {
526                                PlanNode::SeqScan { table } => (Some(table.as_str()), None),
527                                PlanNode::Filter {
528                                    input: inner,
529                                    predicate,
530                                } => {
531                                    if let PlanNode::SeqScan { table } = inner.as_ref() {
532                                        (Some(table.as_str()), Some(predicate))
533                                    } else {
534                                        (None, None)
535                                    }
536                                }
537                                _ => (None, None),
538                            };
539                        if let Some(table) = table_opt {
540                            if let Some(result) =
541                                self.agg_single_col_fast(table, col, *function, pred_opt)?
542                            {
543                                return Ok(result);
544                            }
545                        }
546                    }
547                }
548
549                // General path: materialise the input, then fold its rows.
550                // (The streaming Project(Limit(Filter(SeqScan))) fast path is
551                // handled in the Project branch; this branch only fires when
552                // the aggregate is the outer node.)
553                let result = self.execute_plan(input)?;
554                match result {
555                    QueryResult::Rows { columns, rows } => {
556                        match function {
557                            AggFunc::Count => {
558                                Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
559                            }
560                            AggFunc::CountDistinct => {
561                                let col = field.as_ref().ok_or("count distinct requires field")?;
562                                let idx = columns
563                                    .iter()
564                                    .position(|c| c == col)
565                                    .ok_or("col not found")?;
566                                let mut seen = std::collections::HashSet::new();
567                                for row in &rows {
568                                    let v = &row[idx];
569                                    if !v.is_empty() {
570                                        seen.insert(v.clone());
571                                    }
572                                }
573                                Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
574                            }
575                            AggFunc::Avg => {
576                                let col = field.as_ref().ok_or("avg requires field")?;
577                                let idx = columns
578                                    .iter()
579                                    .position(|c| c == col)
580                                    .ok_or("col not found")?;
581                                let sum: f64 = rows
582                                    .iter()
583                                    .filter_map(|r| match &r[idx] {
584                                        Value::Int(v) => Some(*v as f64),
585                                        Value::Float(v) => Some(*v),
586                                        _ => None,
587                                    })
588                                    .sum();
589                                let count = rows.len() as f64;
590                                Ok(QueryResult::Scalar(Value::Float(sum / count)))
591                            }
592                            AggFunc::Sum => {
593                                let col = field.as_ref().ok_or("sum requires field")?;
594                                let idx = columns
595                                    .iter()
596                                    .position(|c| c == col)
597                                    .ok_or("col not found")?;
598                                // Track int and float contributions separately so
599                                // Float columns (and mixed Int/Float rows) don't get
600                                // silently dropped as they did in the Int-only
601                                // version. If any Float is present, the whole sum
602                                // promotes to Float — matching Avg's semantics.
603                                let mut int_sum: i64 = 0;
604                                let mut float_sum: f64 = 0.0;
605                                let mut saw_float = false;
606                                for r in &rows {
607                                    match &r[idx] {
608                                        Value::Int(v) => int_sum += *v,
609                                        Value::Float(v) => {
610                                            float_sum += *v;
611                                            saw_float = true;
612                                        }
613                                        _ => {}
614                                    }
615                                }
616                                let result = if saw_float {
617                                    Value::Float(float_sum + int_sum as f64)
618                                } else {
619                                    Value::Int(int_sum)
620                                };
621                                Ok(QueryResult::Scalar(result))
622                            }
623                            AggFunc::Min | AggFunc::Max => {
624                                let col = field.as_ref().ok_or("min/max requires field")?;
625                                let idx = columns
626                                    .iter()
627                                    .position(|c| c == col)
628                                    .ok_or("col not found")?;
629                                let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
630                                let result = if *function == AggFunc::Min {
631                                    vals.into_iter().min().cloned()
632                                } else {
633                                    vals.into_iter().max().cloned()
634                                };
635                                Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
636                            }
637                        }
638                    }
639                    _ => Err("aggregate requires row input".into()),
640                }
641            }
642
643            PlanNode::Insert { table, rows } => {
644                // Build + validate EVERY row before inserting any, so a bad
645                // row (unknown/missing/uncoercible field) aborts the whole
646                // statement without a partial write. The WAL fsync happens
647                // once at statement end, so N rows = N appends + 1 fsync.
648                let all_values: Vec<Vec<Value>> = {
649                    let schema = self
650                        .catalog
651                        .schema(table)
652                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
653                    let mut all = Vec::with_capacity(rows.len());
654                    for assignments in rows {
655                        let mut values = vec![Value::Empty; schema.columns.len()];
656                        for a in assignments {
657                            let idx = schema.column_index(&a.field).ok_or_else(|| {
658                                QueryError::ColumnNotFound {
659                                    table: String::new(),
660                                    column: a.field.clone(),
661                                }
662                            })?;
663                            let raw = literal_to_value(&a.value)?;
664                            values[idx] = coerce_value(raw, &schema.columns[idx])?;
665                        }
666                        for col in &schema.columns {
667                            if col.required && matches!(values[col.position as usize], Value::Empty)
668                            {
669                                return Err(QueryError::Execution(format!(
670                                    "column '{}' is required but no value was provided",
671                                    col.name
672                                )));
673                            }
674                        }
675                        all.push(values);
676                    }
677                    all
678                };
679                // Charge the materialized batch against the per-query memory
680                // budget before inserting — keeps multi-row insert consistent
681                // with every other full-materialization point (sort/join/group)
682                // and bounds embedded callers (the server also caps the query
683                // string at 1 MB, but embedded callers have no such limit).
684                self.charge_rows(&all_values)?;
685                let n = all_values.len() as u64;
686                for values in &all_values {
687                    self.catalog
688                        .insert(table, values)
689                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
690                }
691                self.view_registry.mark_dependents_dirty(table);
692                Ok(QueryResult::Modified(n))
693            }
694
695            PlanNode::Upsert {
696                table,
697                key_column,
698                assignments,
699                on_conflict,
700            } => {
701                let (values, key_idx) = {
702                    let schema = self
703                        .catalog
704                        .schema(table)
705                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
706                    let mut values = vec![Value::Empty; schema.columns.len()];
707                    for a in assignments {
708                        let idx = schema.column_index(&a.field).ok_or_else(|| {
709                            QueryError::ColumnNotFound {
710                                table: String::new(),
711                                column: a.field.clone(),
712                            }
713                        })?;
714                        let raw = literal_to_value(&a.value)?;
715                        values[idx] = coerce_value(raw, &schema.columns[idx])?;
716                    }
717                    for col in &schema.columns {
718                        if col.required && matches!(values[col.position as usize], Value::Empty) {
719                            return Err(QueryError::Execution(format!(
720                                "column '{}' is required but no value was provided",
721                                col.name
722                            )));
723                        }
724                    }
725                    let key_idx = schema
726                        .column_index(key_column)
727                        .ok_or_else(|| format!("key column '{key_column}' not found"))?;
728                    (values, key_idx)
729                };
730
731                // Upsert requires the `on` column to be unique — otherwise
732                // there is no well-defined row to overwrite and a plain
733                // insert could silently create duplicate keys.
734                if self.catalog.is_index_unique(table, key_column) != Some(true) {
735                    return Err(QueryError::Execution(format!(
736                        "upsert on .{key_column} requires a unique column (declare it with \
737                         `unique {key_column}: <type>` or `alter {table} add unique .{key_column}`)"
738                    )));
739                }
740
741                let key_value = values[key_idx].clone();
742
743                // Probe the unique index for a conflict.
744                let existing = {
745                    let tbl = self
746                        .catalog
747                        .get_table(table)
748                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
749                    // The key column is guaranteed unique above, so this
750                    // returns at most one matching row.
751                    let rids = tbl.index_lookup_all(key_column, &key_value);
752                    rids.into_iter().next().and_then(|rid| {
753                        tbl.heap
754                            .get(rid)
755                            .map(|data| (rid, decode_row(&tbl.schema, &data)))
756                    })
757                };
758
759                if let Some((rid, mut existing_row)) = existing {
760                    // Conflict: apply on_conflict assignments (or all non-key if empty).
761                    let update_assignments = if on_conflict.is_empty() {
762                        assignments
763                    } else {
764                        on_conflict
765                    };
766                    let changed_cols: Vec<usize> = {
767                        let schema = self
768                            .catalog
769                            .schema(table)
770                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
771                        let mut indices = Vec::new();
772                        for a in update_assignments {
773                            let idx = schema.column_index(&a.field).ok_or_else(|| {
774                                QueryError::ColumnNotFound {
775                                    table: String::new(),
776                                    column: a.field.clone(),
777                                }
778                            })?;
779                            if idx != key_idx {
780                                existing_row[idx] = literal_to_value(&a.value)?;
781                                indices.push(idx);
782                            }
783                        }
784                        indices
785                    };
786                    self.catalog
787                        .update_hinted(table, rid, &existing_row, Some(&changed_cols))
788                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
789                    self.view_registry.mark_dependents_dirty(table);
790                    Ok(QueryResult::Modified(1))
791                } else {
792                    // No conflict: insert.
793                    self.catalog
794                        .insert(table, &values)
795                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
796                    self.view_registry.mark_dependents_dirty(table);
797                    Ok(QueryResult::Modified(1))
798                }
799            }
800
801            PlanNode::Update {
802                input,
803                table,
804                assignments,
805            } => {
806                // Mission C Phase 3: resolve assignments against a borrowed
807                // schema, then drop the borrow before the mutation loop.
808                // Try literal-only path first; fall back to per-row expression
809                // evaluation if any assignment contains a non-literal expression
810                // (e.g., `age := .age + 1`).
811                let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
812                    let schema_ref = self
813                        .catalog
814                        .schema(table)
815                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
816                    let indices: Vec<usize> = assignments
817                        .iter()
818                        .map(|a| {
819                            schema_ref.column_index(&a.field).ok_or_else(|| {
820                                QueryError::ColumnNotFound {
821                                    table: String::new(),
822                                    column: a.field.clone(),
823                                }
824                            })
825                        })
826                        .collect::<Result<_, _>>()?;
827                    let vals: Result<Vec<Value>, _> = assignments
828                        .iter()
829                        .map(|a| literal_to_value(&a.value))
830                        .collect();
831                    (indices, vals.ok())
832                };
833                let resolved_assignments: Option<Vec<(usize, Value)>> =
834                    literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
835
836                // Mission C Phase 2: the hint Table::update_hinted needs to
837                // decide whether to read the old row for index diff.
838                let changed_cols: Vec<usize> = col_indices.clone();
839
840                // ── Fused scan+update for Update(Filter(SeqScan)) ────────
841                // Perf sprint: instead of the two-pass collect-RIDs-then-loop
842                // pattern (which pays one ensure_hot per matched row on the
843                // second pass), fuse the predicate evaluation and in-place
844                // byte-level mutation into a single heap walk. Same idea as
845                // the fused scan_delete_matching path for deletes.
846                if let Some(ref resolved_assignments) = resolved_assignments {
847                    if let PlanNode::Filter {
848                        input: inner,
849                        predicate,
850                    } = input.as_ref()
851                    {
852                        if let PlanNode::SeqScan { table: t } = inner.as_ref() {
853                            if t == table {
854                                let fused_result = self.try_fused_scan_update(
855                                    table,
856                                    predicate,
857                                    resolved_assignments,
858                                    &changed_cols,
859                                );
860                                if let Some(result) = fused_result {
861                                    return result;
862                                }
863                            }
864                        }
865                    }
866                }
867
868                // Collect matching RowIds in a single pass.
869                let matching_rids = self.collect_rids_for_mutation(input, table)?;
870
871                // ── Literal-only fast paths ─────────────────────────────
872                if let Some(ref resolved_assignments) = resolved_assignments {
873                    // Mission C Phase 4: in-place byte-patch fast path. If every
874                    // assignment targets a fixed-size non-null column AND none of
875                    // them is indexed, we can skip decode_row / Vec<Value> /
876                    // encode_row_into entirely and patch the row's raw bytes on
877                    // the hot page.
878                    let fast_patch: Option<Vec<FastPatch>> = {
879                        let tbl = self
880                            .catalog
881                            .get_table(table)
882                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
883                        let schema = &tbl.schema;
884                        let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
885                            is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
886                        });
887                        let no_indexed = !resolved_assignments
888                            .iter()
889                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
890
891                        if all_fixed_nonnull && no_indexed {
892                            let layout = RowLayout::new(schema);
893                            let bitmap_size = layout.bitmap_size();
894                            let patches: Vec<FastPatch> = resolved_assignments
895                                .iter()
896                                .map(|(idx, val)| {
897                                    let fixed_off = layout
898                                        .fixed_offset(*idx)
899                                        .expect("is_fixed_size already checked");
900                                    let field_off = 2 + bitmap_size + fixed_off;
901                                    let bytes: FixedBytes = match val {
902                                        Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
903                                        Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
904                                        Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
905                                        Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
906                                        Value::Uuid(v) => FixedBytes::Uuid(*v),
907                                        _ => unreachable!("all_fixed_nonnull guard lied"),
908                                    };
909                                    FastPatch {
910                                        field_off,
911                                        bitmap_byte_off: 2 + idx / 8,
912                                        bit_mask: 1u8 << (idx % 8),
913                                        bytes,
914                                    }
915                                })
916                                .collect();
917                            Some(patches)
918                        } else {
919                            None
920                        }
921                    };
922
923                    if let Some(patches) = fast_patch {
924                        let mut count = 0u64;
925                        for rid in matching_rids {
926                            // Mission B2: WAL-log every patch so crash
927                            // recovery replays the update. Same mutation
928                            // closure as before — the wrapper just sandwiches
929                            // it between a hot-page read and a WAL append.
930                            let ok = self
931                                .catalog
932                                .update_row_bytes_logged(table, rid, |row| {
933                                    for p in &patches {
934                                        row[p.bitmap_byte_off] &= !p.bit_mask;
935                                        let field_bytes = p.bytes.as_slice();
936                                        row[p.field_off..p.field_off + field_bytes.len()]
937                                            .copy_from_slice(field_bytes);
938                                    }
939                                })
940                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
941                            if ok {
942                                count += 1;
943                            }
944                        }
945                        self.view_registry.mark_dependents_dirty(table);
946                        return Ok(QueryResult::Modified(count));
947                    }
948
949                    // Mission C Phase 10: var-column in-place shrink fast path.
950                    let var_fast: Option<(usize, Option<Vec<u8>>)> = {
951                        let tbl = self
952                            .catalog
953                            .get_table(table)
954                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
955                        let schema = &tbl.schema;
956                        let is_single = resolved_assignments.len() == 1;
957                        let is_var_col = is_single
958                            && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
959                        let no_indexed = !resolved_assignments
960                            .iter()
961                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
962
963                        if is_single && is_var_col && no_indexed {
964                            let (idx, val) = &resolved_assignments[0];
965                            let bytes_opt: Option<Vec<u8>> = match val {
966                                Value::Str(s) => Some(s.as_bytes().to_vec()),
967                                Value::Bytes(b) => Some(b.clone()),
968                                Value::Empty => None,
969                                _ => {
970                                    return Err(QueryError::TypeError(format!(
971                                        "cannot assign non-var value to var column '{}'",
972                                        schema.columns[*idx].name
973                                    )))
974                                }
975                            };
976                            Some((*idx, bytes_opt))
977                        } else {
978                            None
979                        }
980                    };
981
982                    if let Some((col_idx, new_bytes_opt)) = var_fast {
983                        let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
984                        let mut count = 0u64;
985                        let mut fallback_rids: Vec<RowId> = Vec::new();
986                        for rid in &matching_rids {
987                            // Mission B2: logged variant so crash recovery
988                            // replays the shrink. On a false return (row
989                            // would have to grow), the rid is pushed to
990                            // `fallback_rids` and the slower `update_hinted`
991                            // path — which is already WAL-logged — picks it up.
992                            let ok = self
993                                .catalog
994                                .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
995                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
996                            if ok {
997                                count += 1;
998                            } else {
999                                fallback_rids.push(*rid);
1000                            }
1001                        }
1002                        for rid in fallback_rids {
1003                            let mut row = match self.catalog.get(table, rid) {
1004                                Some(r) => r,
1005                                None => continue,
1006                            };
1007                            for (idx, val) in resolved_assignments.iter() {
1008                                row[*idx] = val.clone();
1009                            }
1010                            self.catalog
1011                                .update_hinted(table, rid, &row, Some(&changed_cols))
1012                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1013                            count += 1;
1014                        }
1015                        self.view_registry.mark_dependents_dirty(table);
1016                        return Ok(QueryResult::Modified(count));
1017                    }
1018
1019                    // Generic literal path: decode row, apply literal values.
1020                    let mut count = 0u64;
1021                    for rid in matching_rids {
1022                        let mut row = match self.catalog.get(table, rid) {
1023                            Some(r) => r,
1024                            None => continue,
1025                        };
1026                        for (idx, val) in resolved_assignments.iter() {
1027                            row[*idx] = val.clone();
1028                        }
1029                        self.catalog
1030                            .update_hinted(table, rid, &row, Some(&changed_cols))
1031                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1032                        count += 1;
1033                    }
1034                    self.view_registry.mark_dependents_dirty(table);
1035                    return Ok(QueryResult::Modified(count));
1036                } // end if let Some(resolved_assignments)
1037
1038                // ── Expression-based update path ────────────────────────
1039                // At least one assignment contains a non-literal expression
1040                // (e.g., `age := .age + 1`). Evaluate per-row.
1041                let col_names: Vec<String> = {
1042                    let schema_ref = self
1043                        .catalog
1044                        .schema(table)
1045                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1046                    schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1047                };
1048                let mut count = 0u64;
1049                for rid in matching_rids {
1050                    let mut row = match self.catalog.get(table, rid) {
1051                        Some(r) => r,
1052                        None => continue,
1053                    };
1054                    for (i, asgn) in assignments.iter().enumerate() {
1055                        let val = eval_expr(&asgn.value, &row, &col_names);
1056                        row[col_indices[i]] = val;
1057                    }
1058                    self.catalog
1059                        .update_hinted(table, rid, &row, Some(&changed_cols))
1060                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1061                    count += 1;
1062                }
1063                self.view_registry.mark_dependents_dirty(table);
1064                Ok(QueryResult::Modified(count))
1065            }
1066
1067            PlanNode::Delete { input, table } => {
1068                // Mission C Phase 3: no schema clone — collect_rids_for_mutation
1069                // looks up schema internally when it needs one, and the mutation
1070                // loop doesn't need the schema at all.
1071                //
1072                // Mission C Phase 12: route bulk deletes through
1073                // `Catalog::delete_many`, which batches the btree leaf
1074                // compaction and shares one `ensure_hot` per row between
1075                // the index-key extraction and the slot delete. On
1076                // `delete_by_filter` (100K fixture, ~20K matches) that
1077                // removes ~4ms of pure `Vec::remove` memmove from the btree
1078                // maintenance phase.
1079                //
1080                // Mission C Phase 16: for the common `delete where ...`
1081                // shape (Filter(SeqScan)) — and the rarer "delete
1082                // everything" shape (SeqScan) — skip the two-pass
1083                // `collect_rids_for_mutation` + `delete_many` flow entirely.
1084                // The fused `scan_delete_matching` primitive walks the
1085                // heap exactly once, paying one `ensure_hot` per page
1086                // instead of per-row. That closes the last major gap on
1087                // the bench's `delete_by_filter` workload.
1088                if let PlanNode::Filter {
1089                    input: inner,
1090                    predicate,
1091                } = input.as_ref()
1092                {
1093                    if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1094                        if t == table {
1095                            let schema = self
1096                                .catalog
1097                                .schema(table)
1098                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1099                            let columns: Vec<String> =
1100                                schema.columns.iter().map(|c| c.name.clone()).collect();
1101                            let fast = FastLayout::new(schema);
1102                            if let Some(compiled) =
1103                                compile_predicate(predicate, &columns, &fast, schema)
1104                            {
1105                                // Mission B2: logged variant so every
1106                                // matched rid hits the WAL during the
1107                                // single-pass scan. Structure of the
1108                                // fused scan is unchanged — only the
1109                                // hook closure now also appends.
1110                                let count = self
1111                                    .catalog
1112                                    .scan_delete_matching_logged(table, |data| compiled(data))
1113                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1114                                self.view_registry.mark_dependents_dirty(table);
1115                                return Ok(QueryResult::Modified(count));
1116                            }
1117                        }
1118                    }
1119                } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1120                    if t == table {
1121                        // `delete from T` with no predicate — every live
1122                        // row matches. One pass is still the right shape.
1123                        // Mission B2: logged variant — see above.
1124                        let count = self
1125                            .catalog
1126                            .scan_delete_matching_logged(table, |_| true)
1127                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1128                        self.view_registry.mark_dependents_dirty(table);
1129                        return Ok(QueryResult::Modified(count));
1130                    }
1131                }
1132
1133                let matching_rids = self.collect_rids_for_mutation(input, table)?;
1134                let count = self
1135                    .catalog
1136                    .delete_many(table, &matching_rids)
1137                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1138                self.view_registry.mark_dependents_dirty(table);
1139                Ok(QueryResult::Modified(count))
1140            }
1141
1142            PlanNode::AliasScan { table, alias } => {
1143                // Mission E1.2: scan `table` and rename every output column
1144                // to `alias.field`. Used as a join leaf so downstream
1145                // NestedLoopJoin + Filter + Project nodes can resolve
1146                // `Expr::QualifiedField` lookups by direct column-name match.
1147                //
1148                // We don't bother with a fused zero-copy loop here yet — the
1149                // whole join path is nested-loop and correctness-first
1150                // (Phase E1.3 will introduce hash join and at that point we
1151                // can revisit whether to specialise AliasScan).
1152                let schema = self
1153                    .catalog
1154                    .schema(table)
1155                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1156                    .clone();
1157                let columns: Vec<String> = schema
1158                    .columns
1159                    .iter()
1160                    .map(|c| format!("{alias}.{}", c.name))
1161                    .collect();
1162                let rows: Vec<Vec<Value>> = self
1163                    .catalog
1164                    .scan(table)
1165                    .map_err(|e| QueryError::StorageError(e.to_string()))?
1166                    .map(|(_, row)| row)
1167                    .collect();
1168                Ok(QueryResult::Rows { columns, rows })
1169            }
1170
1171            PlanNode::NestedLoopJoin {
1172                left,
1173                right,
1174                on,
1175                kind,
1176            } => {
1177                // Materialise both sides. The executor ships two strategies:
1178                //   1. Hash join (E1.3) — when the `on` predicate is a
1179                //      simple equi-predicate `left_col = right_col`, build a
1180                //      FxHashMap<Value, Vec<row_idx>> over the right side
1181                //      and probe with the left side. O(L + R) instead of
1182                //      O(L × R). Handles Inner and LeftOuter.
1183                //   2. Nested loop (E1.2) — fallback for Cross, non-equi
1184                //      predicates, or `on` expressions that reference
1185                //      either side with something more complex than a
1186                //      QualifiedField.
1187                let left_result = self.execute_plan(left)?;
1188                let right_result = self.execute_plan(right)?;
1189                let (left_columns, left_rows) = match left_result {
1190                    QueryResult::Rows { columns, rows } => (columns, rows),
1191                    _ => return Err("join left side must produce rows".into()),
1192                };
1193                let (right_columns, right_rows) = match right_result {
1194                    QueryResult::Rows { columns, rows } => (columns, rows),
1195                    _ => return Err("join right side must produce rows".into()),
1196                };
1197
1198                // WS2: byte-budget guard on the join build side. Charge both
1199                // materialized inputs before we build the hash table / probe;
1200                // the output is row-capped by check_join_limit below.
1201                self.charge_rows(&left_rows)?;
1202                self.charge_rows(&right_rows)?;
1203
1204                // Hash-join fast path.
1205                if !matches!(kind, JoinKind::Cross) {
1206                    if let Some(pred) = on {
1207                        if let Some((l_idx, r_idx)) =
1208                            try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1209                        {
1210                            let result = hash_join(
1211                                left_columns,
1212                                left_rows,
1213                                right_columns,
1214                                right_rows,
1215                                l_idx,
1216                                r_idx,
1217                                *kind,
1218                            );
1219                            if let QueryResult::Rows { ref rows, .. } = result {
1220                                check_join_limit(rows.len())?;
1221                            }
1222                            return Ok(result);
1223                        }
1224                    }
1225                }
1226
1227                // Nested-loop fallback.
1228                let n_left = left_columns.len();
1229                let n_right = right_columns.len();
1230                let mut columns = Vec::with_capacity(n_left + n_right);
1231                columns.extend(left_columns);
1232                columns.extend(right_columns);
1233
1234                let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1235                let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1236
1237                for left_row in &left_rows {
1238                    let mut matched = false;
1239                    for right_row in &right_rows {
1240                        combined.clear();
1241                        combined.extend_from_slice(left_row);
1242                        combined.extend_from_slice(right_row);
1243                        let keep = match kind {
1244                            JoinKind::Cross => true,
1245                            JoinKind::Inner | JoinKind::LeftOuter => match on {
1246                                Some(pred) => eval_predicate(pred, &combined, &columns),
1247                                // Missing `on` for non-cross joins is a
1248                                // parser error, but if it slips through we
1249                                // treat it as "match everything".
1250                                None => true,
1251                            },
1252                            // RightOuter is rewritten to LeftOuter by the
1253                            // planner, so we never see it here.
1254                            JoinKind::RightOuter => {
1255                                unreachable!("planner rewrites RightOuter to LeftOuter")
1256                            }
1257                        };
1258                        if keep {
1259                            rows.push(combined.clone());
1260                            check_join_limit(rows.len())?;
1261                            matched = true;
1262                        }
1263                    }
1264                    if !matched && matches!(kind, JoinKind::LeftOuter) {
1265                        let mut row = Vec::with_capacity(n_left + n_right);
1266                        row.extend_from_slice(left_row);
1267                        row.resize(n_left + n_right, Value::Empty);
1268                        rows.push(row);
1269                        check_join_limit(rows.len())?;
1270                    }
1271                }
1272
1273                Ok(QueryResult::Rows { columns, rows })
1274            }
1275
1276            PlanNode::Distinct { input } => {
1277                let result = self.execute_plan(input)?;
1278                match result {
1279                    QueryResult::Rows { columns, rows } => {
1280                        let mut seen = std::collections::HashSet::new();
1281                        let mut unique_rows = Vec::new();
1282                        for row in rows {
1283                            if seen.insert(row.clone()) {
1284                                unique_rows.push(row);
1285                            }
1286                        }
1287                        Ok(QueryResult::Rows {
1288                            columns,
1289                            rows: unique_rows,
1290                        })
1291                    }
1292                    other => Ok(other),
1293                }
1294            }
1295
1296            PlanNode::GroupBy {
1297                input,
1298                keys,
1299                aggregates,
1300                having,
1301            } => {
1302                let result = self.execute_plan(input)?;
1303                match result {
1304                    QueryResult::Rows { columns, rows } => {
1305                        // WS2: byte-budget guard on the GROUP BY input buffer
1306                        // (the hash table is bounded by the input it groups).
1307                        self.charge_rows(&rows)?;
1308                        // Resolve key column indices.
1309                        let key_indices: Vec<usize> = keys
1310                            .iter()
1311                            .map(|k| {
1312                                columns
1313                                    .iter()
1314                                    .position(|c| c == k)
1315                                    .ok_or_else(|| format!("group-by column '{k}' not found"))
1316                            })
1317                            .collect::<Result<Vec<_>, _>>()?;
1318
1319                        // Resolve aggregate field indices. count(*) uses
1320                        // sentinel usize::MAX — compute_group_aggregate
1321                        // treats it as "count all rows in the group".
1322                        let agg_field_indices: Vec<usize> = aggregates
1323                            .iter()
1324                            .map(|a| {
1325                                if a.field == "*" {
1326                                    Ok(usize::MAX)
1327                                } else {
1328                                    columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1329                                        format!("aggregate column '{}' not found", a.field)
1330                                    })
1331                                }
1332                            })
1333                            .collect::<Result<Vec<_>, _>>()?;
1334
1335                        // Group rows by key values (preserving insertion order).
1336                        let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1337                            rustc_hash::FxHashMap::default();
1338                        let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1339                        for (ri, row) in rows.iter().enumerate() {
1340                            let key: Vec<Value> =
1341                                key_indices.iter().map(|&i| row[i].clone()).collect();
1342                            match group_map.get(&key) {
1343                                Some(&idx) => groups[idx].1.push(ri),
1344                                None => {
1345                                    let idx = groups.len();
1346                                    group_map.insert(key.clone(), idx);
1347                                    groups.push((key, vec![ri]));
1348                                }
1349                            }
1350                        }
1351
1352                        // Build output column names: keys ++ aggregate output names.
1353                        let mut out_columns: Vec<String> = keys.clone();
1354                        for agg in aggregates.iter() {
1355                            out_columns.push(agg.output_name.clone());
1356                        }
1357
1358                        // Compute aggregates per group.
1359                        let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1360                        for (key_vals, row_indices) in &groups {
1361                            let mut row = key_vals.clone();
1362                            for (ai, agg) in aggregates.iter().enumerate() {
1363                                let col_idx = agg_field_indices[ai];
1364                                let val = compute_group_aggregate(
1365                                    agg.function,
1366                                    &rows,
1367                                    row_indices,
1368                                    col_idx,
1369                                );
1370                                row.push(val);
1371                            }
1372                            out_rows.push(row);
1373                        }
1374
1375                        // Apply HAVING filter.
1376                        if let Some(having_expr) = having {
1377                            out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1378                        }
1379
1380                        Ok(QueryResult::Rows {
1381                            columns: out_columns,
1382                            rows: out_rows,
1383                        })
1384                    }
1385                    _ => Err("group by requires row input".into()),
1386                }
1387            }
1388
1389            PlanNode::CreateTable { name, fields } => {
1390                let columns: Vec<ColumnDef> = fields
1391                    .iter()
1392                    .enumerate()
1393                    .map(|(i, f)| -> Result<ColumnDef, QueryError> {
1394                        Ok(ColumnDef {
1395                            name: f.name.clone(),
1396                            type_id: type_name_to_id(&f.type_name)
1397                                .map_err(QueryError::TypeError)?,
1398                            required: f.required,
1399                            position: i as u16,
1400                        })
1401                    })
1402                    .collect::<Result<Vec<_>, _>>()?;
1403                let schema = Schema {
1404                    table_name: name.clone(),
1405                    columns,
1406                };
1407                self.catalog
1408                    .create_table(schema)
1409                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1410                // Declaring a field `unique` auto-creates a unique B+tree
1411                // index, which is where uniqueness is enforced on writes.
1412                for f in fields.iter().filter(|f| f.unique) {
1413                    self.catalog
1414                        .create_index_unique(name, &f.name, true)
1415                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1416                }
1417                Ok(QueryResult::Created(name.clone()))
1418            }
1419
1420            PlanNode::AlterTable { table, action } => match action {
1421                AlterAction::AddColumn {
1422                    name,
1423                    type_name,
1424                    required,
1425                } => {
1426                    let position = self
1427                        .catalog
1428                        .schema(table)
1429                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1430                        .columns
1431                        .len() as u16;
1432                    let col = ColumnDef {
1433                        name: name.clone(),
1434                        type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1435                        required: *required,
1436                        position,
1437                    };
1438                    self.catalog
1439                        .alter_table_add_column(table, col)
1440                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1441                    Ok(QueryResult::Executed {
1442                        message: format!("column '{name}' added to '{table}'"),
1443                    })
1444                }
1445                AlterAction::DropColumn { name } => {
1446                    self.catalog
1447                        .alter_table_drop_column(table, name)
1448                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1449                    Ok(QueryResult::Executed {
1450                        message: format!("column '{name}' dropped from '{table}'"),
1451                    })
1452                }
1453                AlterAction::AddIndex { column } => {
1454                    self.catalog
1455                        .create_index(table, column)
1456                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1457                    Ok(QueryResult::Executed {
1458                        message: format!("index on '{table}.{column}' created"),
1459                    })
1460                }
1461                AlterAction::AddUnique { column } => {
1462                    // No DropIndex exists, so we cannot upgrade an existing
1463                    // non-unique index in place — reject it cleanly.
1464                    if self.catalog.has_index(table, column) {
1465                        return Err(QueryError::Execution(format!(
1466                            "cannot add unique on {table}.{column}: column already indexed"
1467                        )));
1468                    }
1469                    // Scan existing rows for duplicate (non-null) values
1470                    // before creating the unique index.
1471                    {
1472                        let tbl = self
1473                            .catalog
1474                            .get_table(table)
1475                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1476                        let col_idx = tbl.schema.column_index(column).ok_or_else(|| {
1477                            QueryError::ColumnNotFound {
1478                                table: table.to_string(),
1479                                column: column.clone(),
1480                            }
1481                        })?;
1482                        let mut seen = std::collections::HashSet::new();
1483                        for (_, row) in tbl.scan() {
1484                            let v = &row[col_idx];
1485                            if v.is_empty() {
1486                                continue;
1487                            }
1488                            if !seen.insert(v.clone()) {
1489                                return Err(QueryError::Execution(format!(
1490                                    "cannot add unique on {table}.{column}: \
1491                                     duplicate value {v:?} exists"
1492                                )));
1493                            }
1494                        }
1495                    }
1496                    self.catalog
1497                        .create_index_unique(table, column, true)
1498                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1499                    Ok(QueryResult::Executed {
1500                        message: format!("unique index on '{table}.{column}' created"),
1501                    })
1502                }
1503            },
1504
1505            PlanNode::DropTable { name } => {
1506                self.catalog
1507                    .drop_table(name)
1508                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1509                Ok(QueryResult::Executed {
1510                    message: format!("table '{name}' dropped"),
1511                })
1512            }
1513
1514            PlanNode::CreateView { name, query_text } => {
1515                self.create_view(name, query_text)?;
1516                Ok(QueryResult::Executed {
1517                    message: format!("materialized view '{name}' created"),
1518                })
1519            }
1520
1521            PlanNode::RefreshView { name } => {
1522                self.refresh_view(name)?;
1523                Ok(QueryResult::Executed {
1524                    message: format!("materialized view '{name}' refreshed"),
1525                })
1526            }
1527
1528            PlanNode::DropView { name } => {
1529                self.drop_view(name)?;
1530                Ok(QueryResult::Executed {
1531                    message: format!("materialized view '{name}' dropped"),
1532                })
1533            }
1534
1535            PlanNode::Window { input, windows } => {
1536                let result = self.execute_plan(input)?;
1537                execute_window(result, windows)
1538            }
1539
1540            PlanNode::Union { left, right, all } => {
1541                let left_result = self.execute_plan(left)?;
1542                let right_result = self.execute_plan(right)?;
1543                let (left_cols, left_rows) = match left_result {
1544                    QueryResult::Rows { columns, rows } => (columns, rows),
1545                    _ => return Err("UNION requires query results on left side".into()),
1546                };
1547                let (_, right_rows) = match right_result {
1548                    QueryResult::Rows { columns, rows } => (columns, rows),
1549                    _ => return Err("UNION requires query results on right side".into()),
1550                };
1551                let mut combined = left_rows;
1552                if *all {
1553                    // UNION ALL — just concatenate.
1554                    combined.extend(right_rows);
1555                } else {
1556                    // UNION — deduplicate using the same HashSet approach
1557                    // as DISTINCT. Value already implements Hash + Eq.
1558                    let mut seen = std::collections::HashSet::new();
1559                    for row in &combined {
1560                        seen.insert(row.clone());
1561                    }
1562                    for row in right_rows {
1563                        if seen.insert(row.clone()) {
1564                            combined.push(row);
1565                        }
1566                    }
1567                }
1568                Ok(QueryResult::Rows {
1569                    columns: left_cols,
1570                    rows: combined,
1571                })
1572            }
1573
1574            PlanNode::Explain { input } => {
1575                let text = format_plan_tree(input, 0);
1576                Ok(QueryResult::Rows {
1577                    columns: vec!["plan".to_string()],
1578                    rows: text
1579                        .lines()
1580                        .map(|line| vec![Value::Str(line.to_string())])
1581                        .collect(),
1582                })
1583            }
1584
1585            PlanNode::Begin => {
1586                if self.in_transaction {
1587                    return Err(QueryError::Execution(
1588                        "already in a transaction (nested transactions not supported)".into(),
1589                    ));
1590                }
1591                self.in_transaction = true;
1592                Ok(QueryResult::Executed {
1593                    message: "transaction started".to_string(),
1594                })
1595            }
1596
1597            PlanNode::Commit => {
1598                if !self.in_transaction {
1599                    return Err(QueryError::Execution(
1600                        "no active transaction to commit".into(),
1601                    ));
1602                }
1603                self.in_transaction = false;
1604                self.catalog
1605                    .sync_wal()
1606                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1607                Ok(QueryResult::Executed {
1608                    message: "transaction committed".to_string(),
1609                })
1610            }
1611
1612            PlanNode::Rollback => {
1613                if !self.in_transaction {
1614                    return Err(QueryError::Execution(
1615                        "no active transaction to roll back".into(),
1616                    ));
1617                }
1618                self.in_transaction = false;
1619                self.catalog
1620                    .rollback_to_last_sync()
1621                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1622                if let Ok(mut cache) = self.plan_cache.lock() {
1623                    cache.clear();
1624                }
1625                self.view_registry = ViewRegistry::open(self.catalog.data_dir())
1626                    .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
1627                Ok(QueryResult::Executed {
1628                    message: "transaction rolled back".to_string(),
1629                })
1630            }
1631
1632            PlanNode::IndexScan { table, column, key } => {
1633                let key_value = literal_to_value(key)?;
1634                let tbl = self
1635                    .catalog
1636                    .get_table(table)
1637                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1638                let columns: Vec<String> =
1639                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1640
1641                // Fast path: the table has a B-tree on this column.
1642                // Uses index_lookup_all to return ALL matching rows for
1643                // both unique and non-unique indexes.
1644                if tbl.has_index(column) {
1645                    let rids = tbl.index_lookup_all(column, &key_value);
1646                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1647                    for rid in rids {
1648                        if let Some(data) = tbl.heap.get(rid) {
1649                            rows.push(decode_row(&tbl.schema, &data));
1650                        }
1651                    }
1652                    return Ok(QueryResult::Rows { columns, rows });
1653                }
1654
1655                // Fallback: no index on this column. The planner emits IndexScan
1656                // eagerly (it has no visibility into which columns are indexed
1657                // at plan time), so here we must behave like SeqScan+Filter on
1658                // `.col = literal`: return *all* matching rows, not just the
1659                // first one. A non-indexed column isn't necessarily unique.
1660                // We compile the eq predicate once and stream without any
1661                // per-row decode for non-matching rows.
1662                let schema = &tbl.schema;
1663                let fast = FastLayout::new(schema);
1664                let synth_pred = Expr::BinaryOp(
1665                    Box::new(Expr::Field(column.clone())),
1666                    BinOp::Eq,
1667                    Box::new(key.clone()),
1668                );
1669                if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1670                    // Mission F: skip the first 4 Vec doublings.
1671                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1672                    self.catalog
1673                        .for_each_row_raw(table, |_rid, data| {
1674                            if compiled(data) {
1675                                rows.push(decode_row(schema, data));
1676                            }
1677                        })
1678                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1679                    return Ok(QueryResult::Rows { columns, rows });
1680                }
1681
1682                // Last resort: slow eq-check on materialised rows.
1683                let col_idx =
1684                    schema
1685                        .column_index(column)
1686                        .ok_or_else(|| QueryError::ColumnNotFound {
1687                            table: String::new(),
1688                            column: column.clone(),
1689                        })?;
1690                let rows: Vec<Vec<Value>> = tbl
1691                    .scan()
1692                    .filter_map(|(_, row)| {
1693                        if row[col_idx] == key_value {
1694                            Some(row)
1695                        } else {
1696                            None
1697                        }
1698                    })
1699                    .collect();
1700                Ok(QueryResult::Rows { columns, rows })
1701            }
1702
1703            PlanNode::RangeScan {
1704                table,
1705                column,
1706                start,
1707                end,
1708            } => {
1709                let tbl = self
1710                    .catalog
1711                    .get_table(table)
1712                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1713                let columns: Vec<String> =
1714                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1715                let schema = &tbl.schema;
1716
1717                let start_val = match start {
1718                    Some((expr, _)) => Some(literal_to_value(expr)?),
1719                    None => None,
1720                };
1721                let end_val = match end {
1722                    Some((expr, _)) => Some(literal_to_value(expr)?),
1723                    None => None,
1724                };
1725                let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1726                let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1727
1728                // Non-unique index: walk the composite (value, rid) leaf
1729                // chain between prefix bounds, fetch each row from the heap,
1730                // and recheck. The recheck enforces exclusive bounds
1731                // (range_rids is inclusive) and defensively skips any decoded
1732                // null (nulls are never indexed, so they must not match).
1733                if tbl.is_index_unique(column) == Some(false) {
1734                    if let Some(btree) = tbl.index(column) {
1735                        if start_val.is_some() || end_val.is_some() {
1736                            let col_idx = schema.column_index(column).ok_or_else(|| {
1737                                QueryError::ColumnNotFound {
1738                                    table: String::new(),
1739                                    column: column.clone(),
1740                                }
1741                            })?;
1742                            let rids = btree.range_rids(start_val.as_ref(), end_val.as_ref());
1743                            let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1744                            for rid in rids {
1745                                if let Some(data) = tbl.heap.get(rid) {
1746                                    let row = decode_row(schema, &data);
1747                                    if !row[col_idx].is_empty()
1748                                        && range_matches(
1749                                            &row[col_idx],
1750                                            &start_val,
1751                                            start_inclusive,
1752                                            &end_val,
1753                                            end_inclusive,
1754                                        )
1755                                    {
1756                                        rows.push(row);
1757                                    }
1758                                }
1759                            }
1760                            return Ok(QueryResult::Rows { columns, rows });
1761                        }
1762                    }
1763                }
1764
1765                // Range scans use the btree fast path for unique indexes,
1766                // walking raw column-value keys directly.
1767                if tbl.is_index_unique(column) == Some(true) {
1768                    if let Some(btree) = tbl.index(column) {
1769                        let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1770                            (Some(s), Some(e)) => btree.range(s, e).collect(),
1771                            (Some(s), None) => btree.range_from(s),
1772                            (None, Some(e)) => btree.range_to(e),
1773                            (None, None) => {
1774                                let rows: Vec<Vec<Value>> =
1775                                    tbl.scan().map(|(_, row)| row).collect();
1776                                return Ok(QueryResult::Rows { columns, rows });
1777                            }
1778                        };
1779                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1780                        for (key, rid) in hits {
1781                            if !start_inclusive {
1782                                if let Some(ref s) = start_val {
1783                                    if &key == s {
1784                                        continue;
1785                                    }
1786                                }
1787                            }
1788                            if !end_inclusive {
1789                                if let Some(ref e) = end_val {
1790                                    if &key == e {
1791                                        continue;
1792                                    }
1793                                }
1794                            }
1795                            if let Some(data) = tbl.heap.get(rid) {
1796                                rows.push(decode_row(schema, &data));
1797                            }
1798                        }
1799                        return Ok(QueryResult::Rows { columns, rows });
1800                    }
1801                }
1802
1803                // Fallback: no index — synthesize range predicate and scan.
1804                let fast = FastLayout::new(schema);
1805                let synth = synthesize_range_predicate(column, start, end);
1806                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1807                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1808                    self.catalog
1809                        .for_each_row_raw(table, |_rid, data| {
1810                            if compiled(data) {
1811                                rows.push(decode_row(schema, data));
1812                            }
1813                        })
1814                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1815                    return Ok(QueryResult::Rows { columns, rows });
1816                }
1817
1818                let col_idx =
1819                    schema
1820                        .column_index(column)
1821                        .ok_or_else(|| QueryError::ColumnNotFound {
1822                            table: String::new(),
1823                            column: column.clone(),
1824                        })?;
1825                let rows: Vec<Vec<Value>> = tbl
1826                    .scan()
1827                    .filter(|(_, row)| {
1828                        range_matches(
1829                            &row[col_idx],
1830                            &start_val,
1831                            start_inclusive,
1832                            &end_val,
1833                            end_inclusive,
1834                        )
1835                    })
1836                    .map(|(_, row)| row)
1837                    .collect();
1838                Ok(QueryResult::Rows { columns, rows })
1839            }
1840        }
1841    }
1842
1843    // ─── Materialized view operations ──────────────────────────────────────
1844
1845    /// Create a materialized view: execute the source query, store results
1846    /// in a new backing table, and register the view.
1847    fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1848        if self.view_registry.is_view(name) {
1849            return Err(QueryError::ViewError(format!(
1850                "materialized view '{name}' already exists"
1851            )));
1852        }
1853        // Execute the source query to get the result set.
1854        let result = self.execute_powql(query_text)?;
1855        let (columns, rows) = match result {
1856            QueryResult::Rows { columns, rows } => (columns, rows),
1857            _ => return Err("view source query must be a SELECT".into()),
1858        };
1859        // Derive a schema for the backing table from the query result columns.
1860        let schema = self.derive_view_schema(name, &columns, &rows);
1861        // Create the backing table and insert the result rows.
1862        self.catalog
1863            .create_table(schema)
1864            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1865        for row in &rows {
1866            self.catalog
1867                .insert(name, row)
1868                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1869        }
1870        // Determine which base tables this view depends on by parsing the query.
1871        let depends_on = self.extract_view_deps(query_text);
1872        self.view_registry
1873            .register(ViewDef {
1874                name: name.to_string(),
1875                query: query_text.to_string(),
1876                depends_on,
1877                dirty: false,
1878            })
1879            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1880        Ok(())
1881    }
1882
1883    /// Refresh a materialized view: re-execute its source query and replace
1884    /// the backing table's contents.
1885    fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1886        let def = self
1887            .view_registry
1888            .get(name)
1889            .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1890        let query_text = def.query.clone();
1891        // Execute the source query.
1892        let result = self.execute_powql(&query_text)?;
1893        let (_columns, rows) = match result {
1894            QueryResult::Rows { columns, rows } => (columns, rows),
1895            _ => return Err("view source query must be a SELECT".into()),
1896        };
1897        // Clear old data and insert fresh results. Mission B2: logged
1898        // variant — view refreshes are a mutation and crash recovery
1899        // must see them.
1900        self.catalog
1901            .scan_delete_matching_logged(name, |_| true)
1902            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1903        for row in &rows {
1904            self.catalog
1905                .insert(name, row)
1906                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1907        }
1908        self.view_registry.mark_clean(name);
1909        Ok(())
1910    }
1911
1912    /// Drop a materialized view: remove the backing table and unregister.
1913    fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1914        if !self.view_registry.is_view(name) {
1915            return Err(QueryError::ViewError(format!(
1916                "materialized view '{name}' not found"
1917            )));
1918        }
1919        self.view_registry
1920            .unregister(name)
1921            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1922        self.catalog
1923            .drop_table(name)
1924            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1925        Ok(())
1926    }
1927
1928    /// Derive a storage `Schema` for a view's backing table from query
1929    /// result column names and the first row's types.
1930    fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1931        use powdb_storage::types::{ColumnDef, TypeId};
1932        let cols: Vec<ColumnDef> = columns
1933            .iter()
1934            .enumerate()
1935            .map(|(i, col_name)| {
1936                let type_id = rows
1937                    .first()
1938                    .and_then(|row| row.get(i))
1939                    .map(|v| v.type_id())
1940                    .unwrap_or(TypeId::Str);
1941                ColumnDef {
1942                    name: col_name.clone(),
1943                    type_id,
1944                    required: false,
1945                    position: i as u16,
1946                }
1947            })
1948            .collect();
1949        Schema {
1950            table_name: name.to_string(),
1951            columns: cols,
1952        }
1953    }
1954
1955    /// Extract base table dependencies from a view's source query by
1956    /// parsing it and collecting the source table name.
1957    fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1958        use crate::parser::parse;
1959        match parse(query_text) {
1960            Ok(Statement::Query(q)) => {
1961                let mut deps = vec![q.source.clone()];
1962                for j in &q.joins {
1963                    deps.push(j.source.clone());
1964                }
1965                deps
1966            }
1967            _ => Vec::new(),
1968        }
1969    }
1970
1971    // ─── Specialized fast paths ─────────────────────────────────────────────
1972    //
1973    // These methods are helpers for the `execute_plan` match arms above.
1974    // Each returns `Ok(Some(result))` when the fast path fires, `Ok(None)`
1975    // when the shape isn't supported (caller falls back to generic code).
1976
1977    /// Aggregate sum/avg/min/max/count/count-distinct over one fixed-size Int or Float column,
1978    /// walking raw row bytes with an optional compiled filter predicate. Returns `Ok(None)` when
1979    /// the shape is unsupported. Int sum/avg accumulate in i128; Int Sum is clamped to i64.
1980    pub(super) fn agg_single_col_fast(
1981        &self,
1982        table: &str,
1983        col: &str,
1984        function: AggFunc,
1985        predicate: Option<&Expr>,
1986    ) -> Result<Option<QueryResult>, QueryError> {
1987        let schema = self
1988            .catalog
1989            .schema(table)
1990            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1991            .clone();
1992        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
1993        let col_idx = match schema.column_index(col) {
1994            Some(i) => i,
1995            None => return Ok(None),
1996        };
1997        // Only fast-path fixed-size numeric columns (Int/Float) for
1998        // sum/avg/min/max/count. Mission D10: Float parity — prior version
1999        // bailed on Float columns, forcing them through the generic row-
2000        // decoding path that allocated a Vec<Value> per row and dispatched
2001        // on Value::cmp for every compare. f64 decode is structurally the
2002        // same as i64 (load 8 bytes, cast), so the fast path handles both.
2003        let col_type = schema.columns[col_idx].type_id;
2004        if col_type != TypeId::Int && col_type != TypeId::Float {
2005            return Ok(None);
2006        }
2007
2008        let fast = FastLayout::new(&schema);
2009        // Mission C Phase 20b: inline the numeric-column reader instead of
2010        // building a `Box<dyn Fn>`. Eliminates 100K vtable dispatches per
2011        // 100K-row agg scan — every reader call folds directly into the
2012        // hot loop below.
2013        let byte_offset = match fast.fixed_offsets[col_idx] {
2014            Some(o) => o,
2015            None => return Ok(None),
2016        };
2017        let bitmap_byte = col_idx / 8;
2018        let bitmap_bit = (col_idx % 8) as u32;
2019        let data_offset = 2 + fast.bitmap_size + byte_offset;
2020
2021        // Optional compiled filter.
2022        let compiled_pred: Option<CompiledPredicate> = match predicate {
2023            Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
2024                Some(c) => Some(c),
2025                None => return Ok(None), // let generic path handle it
2026            },
2027            None => None,
2028        };
2029
2030        // Mission C Phase 20b: specialize the inner loop per aggregate
2031        // function. The previous version ran a `match function { ... }`
2032        // *inside* the closure, which kept LLVM from producing optimal
2033        // scalar code for each variant (agg_max regressed ~23% vs the
2034        // baseline Box<dyn Fn> version even though per-row vtable cost
2035        // should have been strictly lower). Pushing the match out of the
2036        // hot loop lets each specialized body fold cleanly into
2037        // `for_each_row_raw` and removes a captured `AggFunc` + match
2038        // dispatch per row.
2039        //
2040        // Mission D10: same specialisation applies to the Float branch.
2041        // For Min/Max we use `f64::total_cmp` so the result matches
2042        // `Value::Ord` — this is the same ordering ORDER BY and the
2043        // top-N sort fast path use, keeping semantics consistent across
2044        // read paths (positive NaN compares as greatest, -0.0 < +0.0 for
2045        // deterministic tie-breaking).
2046        //
2047        // Mission D11 Phase 1: each inner loop now splits on presence of
2048        // a predicate (`if let Some(pred) = &compiled_pred`) so the hot
2049        // body never re-tests `Option` per row, and reads column bytes
2050        // via `read_i64_unchecked` / `read_f64_unchecked` helpers that
2051        // drop two bounds checks per row (null bitmap byte + value
2052        // slice). Safety is carried by the `FastLayout` invariant that
2053        // `data_offset + 8 <= row_len` for any fixed-size column; see
2054        // the helper doc comments. Hot loops are macro-generated so the
2055        // with-pred / no-pred split can't drift between variants.
2056        let result = match col_type {
2057            TypeId::Int => match function {
2058                AggFunc::Sum | AggFunc::Avg => {
2059                    let mut sum_i128: i128 = 0;
2060                    let mut count: i64 = 0;
2061                    agg_int_loop!(
2062                        self,
2063                        table,
2064                        compiled_pred,
2065                        bitmap_byte,
2066                        bitmap_bit,
2067                        data_offset,
2068                        |v: i64| {
2069                            count += 1;
2070                            sum_i128 += v as i128;
2071                        }
2072                    );
2073                    if matches!(function, AggFunc::Sum) {
2074                        let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
2075                        QueryResult::Scalar(Value::Int(clamped))
2076                    } else if count == 0 {
2077                        QueryResult::Scalar(Value::Empty)
2078                    } else {
2079                        let avg = (sum_i128 as f64) / (count as f64);
2080                        QueryResult::Scalar(Value::Float(avg))
2081                    }
2082                }
2083                AggFunc::Min => {
2084                    let mut min_v: Option<i64> = None;
2085                    agg_int_loop!(
2086                        self,
2087                        table,
2088                        compiled_pred,
2089                        bitmap_byte,
2090                        bitmap_bit,
2091                        data_offset,
2092                        |v: i64| {
2093                            min_v = Some(match min_v {
2094                                Some(m) => m.min(v),
2095                                None => v,
2096                            });
2097                        }
2098                    );
2099                    QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
2100                }
2101                AggFunc::Max => {
2102                    let mut max_v: Option<i64> = None;
2103                    agg_int_loop!(
2104                        self,
2105                        table,
2106                        compiled_pred,
2107                        bitmap_byte,
2108                        bitmap_bit,
2109                        data_offset,
2110                        |v: i64| {
2111                            max_v = Some(match max_v {
2112                                Some(m) => m.max(v),
2113                                None => v,
2114                            });
2115                        }
2116                    );
2117                    QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
2118                }
2119                AggFunc::Count => {
2120                    let mut count: i64 = 0;
2121                    agg_int_loop!(
2122                        self,
2123                        table,
2124                        compiled_pred,
2125                        bitmap_byte,
2126                        bitmap_bit,
2127                        data_offset,
2128                        |_v: i64| {
2129                            count += 1;
2130                        }
2131                    );
2132                    QueryResult::Scalar(Value::Int(count))
2133                }
2134                AggFunc::CountDistinct => {
2135                    let mut seen = rustc_hash::FxHashSet::default();
2136                    agg_int_loop!(
2137                        self,
2138                        table,
2139                        compiled_pred,
2140                        bitmap_byte,
2141                        bitmap_bit,
2142                        data_offset,
2143                        |v: i64| {
2144                            seen.insert(v);
2145                        }
2146                    );
2147                    QueryResult::Scalar(Value::Int(seen.len() as i64))
2148                }
2149            },
2150            TypeId::Float => match function {
2151                AggFunc::Sum => {
2152                    // Use a single f64 accumulator. Naive summation is
2153                    // sufficient for MVP parity; if precision becomes an
2154                    // issue on long scans we can upgrade to Kahan–Neumaier
2155                    // compensated sum (~2x scalar cost, zero error growth).
2156                    let mut sum: f64 = 0.0;
2157                    agg_float_loop!(
2158                        self,
2159                        table,
2160                        compiled_pred,
2161                        bitmap_byte,
2162                        bitmap_bit,
2163                        data_offset,
2164                        |v: f64| {
2165                            sum += v;
2166                        }
2167                    );
2168                    QueryResult::Scalar(Value::Float(sum))
2169                }
2170                AggFunc::Avg => {
2171                    let mut sum: f64 = 0.0;
2172                    let mut count: i64 = 0;
2173                    agg_float_loop!(
2174                        self,
2175                        table,
2176                        compiled_pred,
2177                        bitmap_byte,
2178                        bitmap_bit,
2179                        data_offset,
2180                        |v: f64| {
2181                            sum += v;
2182                            count += 1;
2183                        }
2184                    );
2185                    if count == 0 {
2186                        QueryResult::Scalar(Value::Empty)
2187                    } else {
2188                        QueryResult::Scalar(Value::Float(sum / count as f64))
2189                    }
2190                }
2191                AggFunc::Min => {
2192                    // `total_cmp` for deterministic NaN handling (matches
2193                    // Value::Ord). Positive NaN compares greatest, so Min skips it for
2194                    // any other value; a sign-bit-set NaN orders below -inf and wins.
2195                    let mut min_v: Option<f64> = None;
2196                    agg_float_loop!(
2197                        self,
2198                        table,
2199                        compiled_pred,
2200                        bitmap_byte,
2201                        bitmap_bit,
2202                        data_offset,
2203                        |v: f64| {
2204                            min_v = Some(match min_v {
2205                                Some(m) => {
2206                                    if v.total_cmp(&m).is_lt() {
2207                                        v
2208                                    } else {
2209                                        m
2210                                    }
2211                                }
2212                                None => v,
2213                            });
2214                        }
2215                    );
2216                    QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
2217                }
2218                AggFunc::Max => {
2219                    let mut max_v: Option<f64> = None;
2220                    agg_float_loop!(
2221                        self,
2222                        table,
2223                        compiled_pred,
2224                        bitmap_byte,
2225                        bitmap_bit,
2226                        data_offset,
2227                        |v: f64| {
2228                            max_v = Some(match max_v {
2229                                Some(m) => {
2230                                    if v.total_cmp(&m).is_gt() {
2231                                        v
2232                                    } else {
2233                                        m
2234                                    }
2235                                }
2236                                None => v,
2237                            });
2238                        }
2239                    );
2240                    QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
2241                }
2242                AggFunc::Count => {
2243                    let mut count: i64 = 0;
2244                    agg_float_loop!(
2245                        self,
2246                        table,
2247                        compiled_pred,
2248                        bitmap_byte,
2249                        bitmap_bit,
2250                        data_offset,
2251                        |_v: f64| {
2252                            count += 1;
2253                        }
2254                    );
2255                    QueryResult::Scalar(Value::Int(count))
2256                }
2257                AggFunc::CountDistinct => {
2258                    // Hash on `f64::to_bits` — matches `Value::Hash`, so
2259                    // distinct NaN bit patterns count as distinct and
2260                    // -0.0/+0.0 count as distinct. Consistent with how
2261                    // Float values are hashed in every other DISTINCT /
2262                    // GROUP BY path.
2263                    let mut seen = rustc_hash::FxHashSet::default();
2264                    agg_float_loop!(
2265                        self,
2266                        table,
2267                        compiled_pred,
2268                        bitmap_byte,
2269                        bitmap_bit,
2270                        data_offset,
2271                        |v: f64| {
2272                            seen.insert(v.to_bits());
2273                        }
2274                    );
2275                    QueryResult::Scalar(Value::Int(seen.len() as i64))
2276                }
2277            },
2278            _ => unreachable!("type guard above restricts to Int/Float"),
2279        };
2280        Ok(Some(result))
2281    }
2282
2283    /// `Project(Limit(Filter(SeqScan)))` and `Project(Limit(SeqScan))`.
2284    /// Streams rows, decodes only projected columns, stops at the limit.
2285    pub(super) fn project_filter_limit_fast(
2286        &self,
2287        table: &str,
2288        fields: &[ProjectField],
2289        limit: usize,
2290        predicate: Option<&Expr>,
2291    ) -> Result<Option<QueryResult>, QueryError> {
2292        let schema = self
2293            .catalog
2294            .schema(table)
2295            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2296            .clone();
2297        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2298
2299        // Each projection field must be a simple `.field` reference for this
2300        // fast path. Aliased or computed fields fall through.
2301        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2302        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2303        for f in fields {
2304            let name = match &f.expr {
2305                Expr::Field(n) => n.clone(),
2306                _ => return Ok(None),
2307            };
2308            let idx = match all_columns.iter().position(|c| c == &name) {
2309                Some(i) => i,
2310                None => return Ok(None),
2311            };
2312            proj_indices.push(idx);
2313            proj_columns.push(f.alias.clone().unwrap_or(name));
2314        }
2315
2316        let fast = FastLayout::new(&schema);
2317        let row_layout = RowLayout::new(&schema);
2318
2319        let compiled_pred: Option<CompiledPredicate> = match predicate {
2320            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2321                Some(c) => Some(c),
2322                None => return Ok(None),
2323            },
2324            None => None,
2325        };
2326
2327        let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2328        // Mission D2: use try_for_each_row_raw to actually stop iterating
2329        // once the limit is reached. The previous `done` flag only short-
2330        // circuited the closure body, so a `limit 100` over 100K rows still
2331        // walked all 100K slots — burning ~30x SQLite on scan_filter_project_top100.
2332        self.catalog
2333            .try_for_each_row_raw(table, |_rid, data| {
2334                use std::ops::ControlFlow;
2335                if let Some(ref pred) = compiled_pred {
2336                    if !pred(data) {
2337                        return ControlFlow::Continue(());
2338                    }
2339                }
2340                let row: Vec<Value> = proj_indices
2341                    .iter()
2342                    .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2343                    .collect();
2344                out.push(row);
2345                if out.len() >= limit {
2346                    ControlFlow::Break(())
2347                } else {
2348                    ControlFlow::Continue(())
2349                }
2350            })
2351            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2352
2353        Ok(Some(QueryResult::Rows {
2354            columns: proj_columns,
2355            rows: out,
2356        }))
2357    }
2358
2359    /// `Project(Limit(Sort(Filter(SeqScan))))` and `Project(Limit(Sort(SeqScan)))`.
2360    /// Bounded top-N heap over the sort key. Only the sort key needs to be
2361    /// read per row; projected columns are decoded only for the final
2362    /// winning rows when the heap drains.
2363    pub(super) fn project_filter_sort_limit_fast(
2364        &self,
2365        table: &str,
2366        fields: &[ProjectField],
2367        sort_field: &str,
2368        descending: bool,
2369        limit: usize,
2370        predicate: Option<&Expr>,
2371    ) -> Result<Option<QueryResult>, QueryError> {
2372        if limit == 0 {
2373            // Degenerate case — empty result. Let the generic path handle it
2374            // for proper column naming.
2375            return Ok(None);
2376        }
2377        // The top-N heaps never hold more than `limit` rows, but `limit` is an
2378        // attacker-supplied literal (`order .x limit 99999999999`). Reserving
2379        // that capacity up front would allocate gigabytes and abort the
2380        // process before a single row is read. Cap the pre-allocation; the
2381        // heaps still grow on demand up to the true `limit`.
2382        const TOPN_PREALLOC_CAP: usize = 4096;
2383        let prealloc = limit.min(TOPN_PREALLOC_CAP);
2384        let schema = self
2385            .catalog
2386            .schema(table)
2387            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2388            .clone();
2389        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2390
2391        // Sort key must be a fixed-size numeric column (Int or Float).
2392        // Mission D10: extended from Int-only. Float sort keys use a
2393        // sortable-u64 transform (see `f64_to_sortable_u64`) so the heap
2394        // path stays keyed on `u64` and the whole branch shape is
2395        // identical to the Int case — no new heap types, no `total_cmp`
2396        // closures in the hot loop.
2397        let sort_idx = match schema.column_index(sort_field) {
2398            Some(i) => i,
2399            None => return Ok(None),
2400        };
2401        let sort_col_type = schema.columns[sort_idx].type_id;
2402        if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2403            return Ok(None);
2404        }
2405
2406        // Each projection field must be a simple `.field`.
2407        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2408        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2409        for f in fields {
2410            let name = match &f.expr {
2411                Expr::Field(n) => n.clone(),
2412                _ => return Ok(None),
2413            };
2414            let idx = match all_columns.iter().position(|c| c == &name) {
2415                Some(i) => i,
2416                None => return Ok(None),
2417            };
2418            proj_indices.push(idx);
2419            proj_columns.push(f.alias.clone().unwrap_or(name));
2420        }
2421
2422        let fast = FastLayout::new(&schema);
2423        let row_layout = RowLayout::new(&schema);
2424        // Mission C Phase 20b: inline numeric-column reader (no Box<dyn Fn>).
2425        let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2426            Some(o) => o,
2427            None => return Ok(None),
2428        };
2429        let sort_bitmap_byte = sort_idx / 8;
2430        let sort_bitmap_bit = (sort_idx % 8) as u32;
2431        let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2432
2433        let compiled_pred: Option<CompiledPredicate> = match predicate {
2434            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2435                Some(c) => Some(c),
2436                None => return Ok(None),
2437            },
2438            None => None,
2439        };
2440
2441        // Bounded top-N heap. For `order .x desc limit N`, we want the N
2442        // largest values — use a min-heap so the smallest is at the top and
2443        // can be popped when a better candidate arrives. For ascending, use
2444        // a max-heap. We tie-break with a monotonic `seq` counter so the
2445        // result is deterministic and stable.
2446        //
2447        // To keep this simple we maintain two typed heaps and pick by
2448        // direction.
2449        let drained: Vec<Vec<u8>> = match sort_col_type {
2450            TypeId::Int => {
2451                let mut seq: u64 = 0;
2452                let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2453                    BinaryHeap::with_capacity(prealloc);
2454                let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2455                    BinaryHeap::with_capacity(prealloc);
2456
2457                self.catalog
2458                    .for_each_row_raw(table, |_rid, data| {
2459                        if let Some(ref pred) = compiled_pred {
2460                            if !pred(data) {
2461                                return;
2462                            }
2463                        }
2464                        // Inlined int-column reader: null check + i64 decode.
2465                        if data.len() < sort_data_offset + 8 {
2466                            return;
2467                        }
2468                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2469                        if is_null {
2470                            return;
2471                        }
2472                        let key = i64::from_le_bytes(
2473                            data[sort_data_offset..sort_data_offset + 8]
2474                                .try_into()
2475                                .unwrap_or_else(|_| unreachable!()),
2476                        );
2477                        let id = seq;
2478                        seq += 1;
2479
2480                        if descending {
2481                            if heap_desc.len() < limit {
2482                                heap_desc.push(Reverse((key, id, data.to_vec())));
2483                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2484                                if key > *top_key {
2485                                    heap_desc.pop();
2486                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2487                                }
2488                            }
2489                        } else if heap_asc.len() < limit {
2490                            heap_asc.push((key, id, data.to_vec()));
2491                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2492                            if key < *top_key {
2493                                heap_asc.pop();
2494                                heap_asc.push((key, id, data.to_vec()));
2495                            }
2496                        }
2497                    })
2498                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2499
2500                let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2501                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2502                } else {
2503                    heap_asc.into_iter().collect()
2504                };
2505                if descending {
2506                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2507                } else {
2508                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2509                }
2510                drained.into_iter().map(|(_, _, d)| d).collect()
2511            }
2512            TypeId::Float => {
2513                // Novel angle: rather than introducing a `TotalF64` newtype
2514                // with `Ord via total_cmp`, transform the f64 bit pattern
2515                // into a sortable `u64` so `BinaryHeap<u64>` orders exactly
2516                // like `f64::total_cmp` would. Classic trick: flip the sign
2517                // bit on positives, flip all bits on negatives. Result:
2518                // - NaN (sign=0) stays greatest, matching total_cmp
2519                // - -0.0 sorts before +0.0, matching total_cmp
2520                // - Hot loop is branch-cheap (one compare + one xor)
2521                let mut seq: u64 = 0;
2522                let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2523                    BinaryHeap::with_capacity(prealloc);
2524                let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2525                    BinaryHeap::with_capacity(prealloc);
2526
2527                self.catalog
2528                    .for_each_row_raw(table, |_rid, data| {
2529                        if let Some(ref pred) = compiled_pred {
2530                            if !pred(data) {
2531                                return;
2532                            }
2533                        }
2534                        if data.len() < sort_data_offset + 8 {
2535                            return;
2536                        }
2537                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2538                        if is_null {
2539                            return;
2540                        }
2541                        let bits = u64::from_le_bytes(
2542                            data[sort_data_offset..sort_data_offset + 8]
2543                                .try_into()
2544                                .unwrap_or_else(|_| unreachable!()),
2545                        );
2546                        let key = f64_bits_to_sortable_u64(bits);
2547                        let id = seq;
2548                        seq += 1;
2549
2550                        if descending {
2551                            if heap_desc.len() < limit {
2552                                heap_desc.push(Reverse((key, id, data.to_vec())));
2553                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2554                                if key > *top_key {
2555                                    heap_desc.pop();
2556                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2557                                }
2558                            }
2559                        } else if heap_asc.len() < limit {
2560                            heap_asc.push((key, id, data.to_vec()));
2561                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2562                            if key < *top_key {
2563                                heap_asc.pop();
2564                                heap_asc.push((key, id, data.to_vec()));
2565                            }
2566                        }
2567                    })
2568                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2569
2570                let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2571                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2572                } else {
2573                    heap_asc.into_iter().collect()
2574                };
2575                if descending {
2576                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2577                } else {
2578                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2579                }
2580                drained.into_iter().map(|(_, _, d)| d).collect()
2581            }
2582            _ => unreachable!("type guard above restricts to Int/Float"),
2583        };
2584
2585        let rows: Vec<Vec<Value>> = drained
2586            .into_iter()
2587            .map(|data| {
2588                proj_indices
2589                    .iter()
2590                    .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2591                    .collect()
2592            })
2593            .collect();
2594
2595        Ok(Some(QueryResult::Rows {
2596            columns: proj_columns,
2597            rows,
2598        }))
2599    }
2600
2601    /// Gather the RowIds that a mutation should operate on, without
2602    /// materialising the full row set. Handles the shapes the planner emits
2603    /// for update/delete: SeqScan, IndexScan, and Filter(SeqScan). Other
2604    /// shapes fall back to `generic_rid_match`.
2605    ///
2606    /// Perf sprint: try to fuse the predicate evaluation and in-place
2607    /// byte-level mutation into a single heap walk. Returns `Some(result)`
2608    /// if the fused path fired, `None` to fall through to the generic
2609    /// two-pass code.
2610    ///
2611    /// Covers two shapes:
2612    /// 1. Fixed-width non-null literal assignments on non-indexed columns
2613    ///    → byte-patch every matched row in place (row length unchanged).
2614    /// 2. Single var-col literal assignment on a non-indexed column
2615    ///    → `patch_var_column_in_place` on every matched row (may shrink);
2616    ///    rows that can't be patched in place are collected for fallback.
2617    fn try_fused_scan_update(
2618        &mut self,
2619        table: &str,
2620        predicate: &Expr,
2621        resolved: &[(usize, Value)],
2622        changed_cols: &[usize],
2623    ) -> Option<Result<QueryResult, QueryError>> {
2624        // Build compiled predicate. Requires a schema borrow that must be
2625        // dropped before we call scan_patch_matching_logged.
2626        let compiled = {
2627            let schema = self.catalog.schema(table)?;
2628            let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2629            let fast = FastLayout::new(schema);
2630            compile_predicate(predicate, &columns, &fast, schema)?
2631        };
2632
2633        // ── Path 1: fixed-width fast patch ──────────────────────────
2634        let fixed_patches: Option<Vec<FastPatch>> = {
2635            let tbl = self.catalog.get_table(table)?;
2636            let schema = &tbl.schema;
2637            let all_fixed_nonnull = resolved
2638                .iter()
2639                .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2640            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2641            if all_fixed_nonnull && no_indexed {
2642                let layout = RowLayout::new(schema);
2643                let bitmap_size = layout.bitmap_size();
2644                Some(
2645                    resolved
2646                        .iter()
2647                        .map(|(idx, val)| {
2648                            let fixed_off = layout
2649                                .fixed_offset(*idx)
2650                                .expect("is_fixed_size already checked");
2651                            let field_off = 2 + bitmap_size + fixed_off;
2652                            let bytes: FixedBytes = match val {
2653                                Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2654                                Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2655                                Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2656                                Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2657                                Value::Uuid(v) => FixedBytes::Uuid(*v),
2658                                _ => unreachable!("all_fixed_nonnull guard"),
2659                            };
2660                            FastPatch {
2661                                field_off,
2662                                bitmap_byte_off: 2 + idx / 8,
2663                                bit_mask: 1u8 << (idx % 8),
2664                                bytes,
2665                            }
2666                        })
2667                        .collect(),
2668                )
2669            } else {
2670                None
2671            }
2672        };
2673        if let Some(patches) = fixed_patches {
2674            let result = self
2675                .catalog
2676                .scan_patch_matching_logged(table, compiled, |row| {
2677                    for p in &patches {
2678                        row[p.bitmap_byte_off] &= !p.bit_mask;
2679                        let field_bytes = p.bytes.as_slice();
2680                        row[p.field_off..p.field_off + field_bytes.len()]
2681                            .copy_from_slice(field_bytes);
2682                    }
2683                    Some(row.len() as u16)
2684                })
2685                .map_err(|e| e.to_string());
2686            match result {
2687                Ok((count, _)) => {
2688                    self.view_registry.mark_dependents_dirty(table);
2689                    return Some(Ok(QueryResult::Modified(count)));
2690                }
2691                Err(e) => return Some(Err(QueryError::Execution(e))),
2692            }
2693        }
2694
2695        // ── Path 2: single var-col shrink fast patch ────────────────
2696        let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2697            let tbl = self.catalog.get_table(table)?;
2698            let schema = &tbl.schema;
2699            let is_single = resolved.len() == 1;
2700            let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2701            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2702            if is_single && is_var && no_indexed {
2703                let (idx, val) = &resolved[0];
2704                let bytes_opt = match val {
2705                    Value::Str(s) => Some(s.as_bytes().to_vec()),
2706                    Value::Bytes(b) => Some(b.clone()),
2707                    Value::Empty => None,
2708                    _ => return None, // type mismatch, fall through
2709                };
2710                Some((*idx, bytes_opt))
2711            } else {
2712                None
2713            }
2714        };
2715        if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2716            // Build a fresh RowLayout before the mutable borrow.
2717            let layout = {
2718                let schema = self.catalog.schema(table)?;
2719                RowLayout::new(schema)
2720            };
2721            let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2722            let result = self
2723                .catalog
2724                .scan_patch_matching_logged(table, compiled, |row| {
2725                    patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2726                })
2727                .map_err(|e| e.to_string());
2728            match result {
2729                Ok((mut count, fallback_rids)) => {
2730                    // Handle rows where in-place patch failed (new > old).
2731                    for rid in fallback_rids {
2732                        let mut row = match self.catalog.get(table, rid) {
2733                            Some(r) => r,
2734                            None => continue,
2735                        };
2736                        for (idx, val) in resolved.iter() {
2737                            row[*idx] = val.clone();
2738                        }
2739                        if let Err(e) =
2740                            self.catalog
2741                                .update_hinted(table, rid, &row, Some(changed_cols))
2742                        {
2743                            return Some(Err(QueryError::StorageError(e.to_string())));
2744                        }
2745                        count += 1;
2746                    }
2747                    self.view_registry.mark_dependents_dirty(table);
2748                    return Some(Ok(QueryResult::Modified(count)));
2749                }
2750                Err(e) => return Some(Err(QueryError::Execution(e))),
2751            }
2752        }
2753
2754        None // no fused path applicable — fall through
2755    }
2756
2757    /// Mission C Phase 3: schema is looked up via `self.catalog.schema(table)`
2758    /// inside the branches that actually need it. Previously the caller had
2759    /// to clone the full Schema (6+ String allocs) before every mutation just
2760    /// so this function could borrow it — a cost the update/delete hot path
2761    /// did not need.
2762    fn collect_rids_for_mutation(
2763        &mut self,
2764        input: &PlanNode,
2765        table: &str,
2766    ) -> Result<Vec<RowId>, QueryError> {
2767        match input {
2768            PlanNode::SeqScan { table: t } if t == table => {
2769                // "Update/delete everything" — rare but legal.
2770                let rids: Vec<RowId> = self
2771                    .catalog
2772                    .scan(table)
2773                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2774                    .map(|(rid, _)| rid)
2775                    .collect();
2776                Ok(rids)
2777            }
2778            PlanNode::IndexScan {
2779                table: t,
2780                column,
2781                key,
2782            } if t == table => {
2783                let key_value = literal_to_value(key)?;
2784
2785                // Indexed case: single lookup, 0 or 1 rows.
2786                // Mission D7: int-specialized fast path on int-keyed indexes
2787                // (primary keys, created_at, etc.) — the common case for
2788                // `update_by_pk` / `delete where id = ?`.
2789                //
2790                // Scope the `tbl` borrow so it's released before we fall
2791                // through to the scan-based paths below (which reborrow
2792                // `self.catalog`).
2793                {
2794                    let tbl = self
2795                        .catalog
2796                        .get_table(table)
2797                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2798                    if tbl.has_index(column) {
2799                        let rids = tbl.index_lookup_all(column, &key_value);
2800                        return Ok(rids);
2801                    }
2802                }
2803
2804                // No index: the planner folds `.col = literal` to IndexScan
2805                // regardless of whether the column is actually unique. When
2806                // there's no index we must behave like Filter(SeqScan) and
2807                // return *all* matching RIDs — not just the first one.
2808                let schema = self
2809                    .catalog
2810                    .schema(table)
2811                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2812                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2813                let fast = FastLayout::new(schema);
2814                let synth = Expr::BinaryOp(
2815                    Box::new(Expr::Field(column.clone())),
2816                    BinOp::Eq,
2817                    Box::new(key.clone()),
2818                );
2819                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2820                    // Mission F: skip the first 4 Vec doublings.
2821                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2822                    self.catalog
2823                        .for_each_row_raw(table, |rid, data| {
2824                            if compiled(data) {
2825                                rids.push(rid);
2826                            }
2827                        })
2828                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2829                    return Ok(rids);
2830                }
2831
2832                // Fallback: decode each row, compare values.
2833                let col_idx =
2834                    schema
2835                        .column_index(column)
2836                        .ok_or_else(|| QueryError::ColumnNotFound {
2837                            table: String::new(),
2838                            column: column.clone(),
2839                        })?;
2840                let rids: Vec<RowId> = self
2841                    .catalog
2842                    .scan(table)
2843                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2844                    .filter_map(|(rid, row)| {
2845                        if row[col_idx] == key_value {
2846                            Some(rid)
2847                        } else {
2848                            None
2849                        }
2850                    })
2851                    .collect();
2852                Ok(rids)
2853            }
2854            PlanNode::Filter {
2855                input: inner,
2856                predicate,
2857            } => {
2858                if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2859                    if t != table {
2860                        return self.generic_rid_match(input, table);
2861                    }
2862                    let schema = self
2863                        .catalog
2864                        .schema(table)
2865                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2866                    let columns: Vec<String> =
2867                        schema.columns.iter().map(|c| c.name.clone()).collect();
2868                    let fast = FastLayout::new(schema);
2869                    let row_layout = RowLayout::new(schema);
2870
2871                    // Try compiled predicate first.
2872                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2873                        // Mission F: skip the first 4 Vec doublings.
2874                        let mut rids: Vec<RowId> = Vec::with_capacity(64);
2875                        self.catalog
2876                            .for_each_row_raw(table, |rid, data| {
2877                                if compiled(data) {
2878                                    rids.push(rid);
2879                                }
2880                            })
2881                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2882                        return Ok(rids);
2883                    }
2884
2885                    // Fallback: selective decode + eval.
2886                    let pred_cols = predicate_column_indices(predicate, &columns);
2887                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2888                    self.catalog
2889                        .for_each_row_raw(table, |rid, data| {
2890                            let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2891                            if eval_predicate(predicate, &pred_row, &columns) {
2892                                rids.push(rid);
2893                            }
2894                        })
2895                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2896                    return Ok(rids);
2897                }
2898                self.generic_rid_match(input, table)
2899            }
2900            _ => self.generic_rid_match(input, table),
2901        }
2902    }
2903
2904    /// Last-ditch generic match: execute the plan, collect matching rows,
2905    /// then find corresponding RowIds by value equality. This is the old
2906    /// O(N*M) code path; only used when the plan shape is something exotic.
2907    fn generic_rid_match(
2908        &mut self,
2909        input: &PlanNode,
2910        table: &str,
2911    ) -> Result<Vec<RowId>, QueryError> {
2912        let result = self.execute_plan(input)?;
2913        let rows = match result {
2914            QueryResult::Rows { rows, .. } => rows,
2915            _ => return Err("mutation source must be rows".into()),
2916        };
2917        let matching: Vec<RowId> = self
2918            .catalog
2919            .scan(table)
2920            .map_err(|e| QueryError::StorageError(e.to_string()))?
2921            .filter(|(_, row)| rows.iter().any(|r| r == row))
2922            .map(|(rid, _)| rid)
2923            .collect();
2924        Ok(matching)
2925    }
2926}
2927
2928pub(super) fn execute_window(
2929    result: QueryResult,
2930    windows: &[WindowDef],
2931) -> Result<QueryResult, QueryError> {
2932    let (mut columns, mut rows) = match result {
2933        QueryResult::Rows { columns, rows } => (columns, rows),
2934        _ => return Err("window function requires row input".into()),
2935    };
2936
2937    for wdef in windows {
2938        // Resolve partition/order column indices against current columns.
2939        let part_indices: Vec<usize> = wdef
2940            .partition_by
2941            .iter()
2942            .map(|name| {
2943                columns
2944                    .iter()
2945                    .position(|c| c == name)
2946                    .ok_or_else(|| format!("window partition column '{name}' not found"))
2947            })
2948            .collect::<Result<Vec<_>, _>>()?;
2949
2950        let ord_indices: Vec<(usize, bool)> = wdef
2951            .order_by
2952            .iter()
2953            .map(|sk| {
2954                columns
2955                    .iter()
2956                    .position(|c| c == &sk.field)
2957                    .map(|i| (i, sk.descending))
2958                    .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2959            })
2960            .collect::<Result<Vec<_>, _>>()?;
2961
2962        // Resolve the argument column index (for aggregate windows).
2963        let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2964            match arg {
2965                Expr::Field(name) => {
2966                    if name == "*" {
2967                        None // count(*) style — no specific column
2968                    } else {
2969                        Some(
2970                            columns
2971                                .iter()
2972                                .position(|c| c == name)
2973                                .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2974                        )
2975                    }
2976                }
2977                _ => None,
2978            }
2979        } else {
2980            None
2981        };
2982
2983        // Build a sort-index to sort rows by partition_by then order_by
2984        // without actually reordering the original Vec (we need original
2985        // order to write results back).
2986        let n = rows.len();
2987        let mut indices: Vec<usize> = (0..n).collect();
2988        indices.sort_by(|&a, &b| {
2989            // Compare partition keys first.
2990            for &pi in &part_indices {
2991                let cmp = rows[a][pi].cmp(&rows[b][pi]);
2992                if cmp != std::cmp::Ordering::Equal {
2993                    return cmp;
2994                }
2995            }
2996            // Then order keys.
2997            for &(oi, desc) in &ord_indices {
2998                let cmp = rows[a][oi].cmp(&rows[b][oi]);
2999                if cmp != std::cmp::Ordering::Equal {
3000                    return if desc { cmp.reverse() } else { cmp };
3001                }
3002            }
3003            std::cmp::Ordering::Equal
3004        });
3005
3006        // SQL window-frame semantics: with no `order` clause the frame for an
3007        // aggregate window is the ENTIRE partition, not the running prefix.
3008        // The loop below computes running values; for the no-order case we
3009        // back-fill every row of a partition with the partition's final
3010        // (i.e. complete) aggregate once its boundary is reached. Ranking
3011        // functions are untouched — row_number/rank/dense_rank are inherently
3012        // positional.
3013        let whole_partition_frame = wdef.order_by.is_empty()
3014            && matches!(
3015                wdef.function,
3016                WindowFunc::Sum
3017                    | WindowFunc::Avg
3018                    | WindowFunc::Count
3019                    | WindowFunc::Min
3020                    | WindowFunc::Max
3021            );
3022        // Original row indices of the partition currently being scanned
3023        // (only tracked when back-filling is needed).
3024        let mut partition_row_indices: Vec<usize> = Vec::new();
3025
3026        // Compute window values in sorted order, tracking partition boundaries.
3027        let mut win_values: Vec<Value> = vec![Value::Empty; n];
3028        let mut partition_start = 0usize;
3029        // Running state for aggregate windows:
3030        let mut running_count: i64 = 0;
3031        let mut running_int_sum: i64 = 0;
3032        let mut running_float_sum: f64 = 0.0;
3033        let mut running_saw_float = false;
3034        let mut running_min: Option<Value> = None;
3035        let mut running_max: Option<Value> = None;
3036        let mut rank_counter: i64 = 0;
3037        let mut dense_rank_counter: i64 = 0;
3038        let mut prev_order_key: Option<Vec<Value>> = None;
3039        let mut same_rank_count: i64 = 0;
3040
3041        for sorted_pos in 0..n {
3042            let row_idx = indices[sorted_pos];
3043
3044            // Detect partition boundary.
3045            let new_partition = if sorted_pos == 0 {
3046                true
3047            } else {
3048                let prev_row_idx = indices[sorted_pos - 1];
3049                part_indices
3050                    .iter()
3051                    .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
3052            };
3053
3054            if new_partition {
3055                // No-order aggregate frame: the partition that just ended is
3056                // complete, so its final running value IS the whole-partition
3057                // aggregate. Back-fill it onto every row of that partition.
3058                if whole_partition_frame && sorted_pos > 0 {
3059                    let final_v = win_values[indices[sorted_pos - 1]].clone();
3060                    for ri in partition_row_indices.drain(..) {
3061                        win_values[ri] = final_v.clone();
3062                    }
3063                }
3064                partition_start = sorted_pos;
3065                running_count = 0;
3066                running_int_sum = 0;
3067                running_float_sum = 0.0;
3068                running_saw_float = false;
3069                running_min = None;
3070                running_max = None;
3071                rank_counter = 0;
3072                dense_rank_counter = 0;
3073                prev_order_key = None;
3074                same_rank_count = 0;
3075            }
3076
3077            // Extract current order key for rank tracking.
3078            let current_order_key: Vec<Value> = ord_indices
3079                .iter()
3080                .map(|&(oi, _)| rows[row_idx][oi].clone())
3081                .collect();
3082            let same_as_prev = prev_order_key.as_ref() == Some(&current_order_key);
3083
3084            let value = match wdef.function {
3085                WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
3086                WindowFunc::Rank => {
3087                    if same_as_prev {
3088                        same_rank_count += 1;
3089                    } else {
3090                        rank_counter += same_rank_count + 1;
3091                        same_rank_count = 0;
3092                        if rank_counter == 0 {
3093                            rank_counter = 1;
3094                        }
3095                    }
3096                    Value::Int(rank_counter)
3097                }
3098                WindowFunc::DenseRank => {
3099                    if !same_as_prev {
3100                        dense_rank_counter += 1;
3101                    }
3102                    Value::Int(dense_rank_counter)
3103                }
3104                WindowFunc::Sum => {
3105                    if let Some(ci) = arg_col_idx {
3106                        match &rows[row_idx][ci] {
3107                            Value::Int(v) => running_int_sum += v,
3108                            Value::Float(v) => {
3109                                running_float_sum += v;
3110                                running_saw_float = true;
3111                            }
3112                            _ => {}
3113                        }
3114                    }
3115                    if running_saw_float {
3116                        Value::Float(running_float_sum + running_int_sum as f64)
3117                    } else {
3118                        Value::Int(running_int_sum)
3119                    }
3120                }
3121                WindowFunc::Avg => {
3122                    if let Some(ci) = arg_col_idx {
3123                        match &rows[row_idx][ci] {
3124                            Value::Int(v) => {
3125                                running_float_sum += *v as f64;
3126                                running_count += 1;
3127                            }
3128                            Value::Float(v) => {
3129                                running_float_sum += v;
3130                                running_count += 1;
3131                            }
3132                            _ => {}
3133                        }
3134                    }
3135                    if running_count == 0 {
3136                        Value::Empty
3137                    } else {
3138                        Value::Float(running_float_sum / running_count as f64)
3139                    }
3140                }
3141                WindowFunc::Count => {
3142                    if let Some(ci) = arg_col_idx {
3143                        if !rows[row_idx][ci].is_empty() {
3144                            running_count += 1;
3145                        }
3146                    } else {
3147                        // count(*) — count all rows
3148                        running_count += 1;
3149                    }
3150                    Value::Int(running_count)
3151                }
3152                WindowFunc::Min => {
3153                    if let Some(ci) = arg_col_idx {
3154                        let v = &rows[row_idx][ci];
3155                        if !v.is_empty() {
3156                            running_min = Some(match &running_min {
3157                                None => v.clone(),
3158                                Some(cur) => {
3159                                    if v < cur {
3160                                        v.clone()
3161                                    } else {
3162                                        cur.clone()
3163                                    }
3164                                }
3165                            });
3166                        }
3167                    }
3168                    running_min.clone().unwrap_or(Value::Empty)
3169                }
3170                WindowFunc::Max => {
3171                    if let Some(ci) = arg_col_idx {
3172                        let v = &rows[row_idx][ci];
3173                        if !v.is_empty() {
3174                            running_max = Some(match &running_max {
3175                                None => v.clone(),
3176                                Some(cur) => {
3177                                    if v > cur {
3178                                        v.clone()
3179                                    } else {
3180                                        cur.clone()
3181                                    }
3182                                }
3183                            });
3184                        }
3185                    }
3186                    running_max.clone().unwrap_or(Value::Empty)
3187                }
3188            };
3189
3190            prev_order_key = Some(current_order_key);
3191            win_values[row_idx] = value;
3192            if whole_partition_frame {
3193                partition_row_indices.push(row_idx);
3194            }
3195        }
3196
3197        // Back-fill the final partition (the loop only flushes at boundaries).
3198        if whole_partition_frame && n > 0 {
3199            let final_v = win_values[indices[n - 1]].clone();
3200            for ri in partition_row_indices.drain(..) {
3201                win_values[ri] = final_v.clone();
3202            }
3203        }
3204
3205        // Append the computed window column to each row.
3206        for (ri, row) in rows.iter_mut().enumerate() {
3207            row.push(win_values[ri].clone());
3208        }
3209        columns.push(wdef.output_name.clone());
3210    }
3211
3212    Ok(QueryResult::Rows { columns, rows })
3213}
3214
3215/// Mission E2b: compute one aggregate over a set of rows in a group.
3216pub(super) fn compute_group_aggregate(
3217    func: AggFunc,
3218    all_rows: &[Vec<Value>],
3219    row_indices: &[usize],
3220    col_idx: usize,
3221) -> Value {
3222    match func {
3223        AggFunc::Count => {
3224            if col_idx == usize::MAX {
3225                // count(*) — count all rows in the group.
3226                return Value::Int(row_indices.len() as i64);
3227            }
3228            let count = row_indices
3229                .iter()
3230                .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3231                .count();
3232            Value::Int(count as i64)
3233        }
3234        AggFunc::CountDistinct => {
3235            let mut seen = std::collections::HashSet::new();
3236            for &ri in row_indices {
3237                let v = &all_rows[ri][col_idx];
3238                if !v.is_empty() {
3239                    seen.insert(v.clone());
3240                }
3241            }
3242            Value::Int(seen.len() as i64)
3243        }
3244        AggFunc::Sum => {
3245            // Mirror the scalar Sum path: accumulate int and float
3246            // contributions separately and promote the final result to
3247            // Float if any Float row was observed. Prevents silent
3248            // drop of Float columns in GROUP BY aggregates.
3249            let mut int_sum: i64 = 0;
3250            let mut float_sum: f64 = 0.0;
3251            let mut saw_float = false;
3252            for &ri in row_indices {
3253                match &all_rows[ri][col_idx] {
3254                    Value::Int(v) => int_sum += v,
3255                    Value::Float(v) => {
3256                        float_sum += *v;
3257                        saw_float = true;
3258                    }
3259                    _ => {}
3260                }
3261            }
3262            if saw_float {
3263                Value::Float(float_sum + int_sum as f64)
3264            } else {
3265                Value::Int(int_sum)
3266            }
3267        }
3268        AggFunc::Avg => {
3269            let mut sum = 0.0f64;
3270            let mut count = 0usize;
3271            for &ri in row_indices {
3272                match &all_rows[ri][col_idx] {
3273                    Value::Int(v) => {
3274                        sum += *v as f64;
3275                        count += 1;
3276                    }
3277                    Value::Float(v) => {
3278                        sum += *v;
3279                        count += 1;
3280                    }
3281                    _ => {}
3282                }
3283            }
3284            if count == 0 {
3285                Value::Empty
3286            } else {
3287                Value::Float(sum / count as f64)
3288            }
3289        }
3290        AggFunc::Min => row_indices
3291            .iter()
3292            .map(|&ri| &all_rows[ri][col_idx])
3293            .filter(|v| !v.is_empty())
3294            .min()
3295            .cloned()
3296            .unwrap_or(Value::Empty),
3297        AggFunc::Max => row_indices
3298            .iter()
3299            .map(|&ri| &all_rows[ri][col_idx])
3300            .filter(|v| !v.is_empty())
3301            .max()
3302            .cloned()
3303            .unwrap_or(Value::Empty),
3304    }
3305}
3306
3307/// Mission E1.3: try to extract equi-join key indices from a join `on`
3308/// predicate. Returns `Some((left_col_idx, right_col_idx))` when the
3309/// predicate is exactly `L = R` (or `R = L`) and both sides resolve
3310/// cleanly — `L` to the left subtree's column list and `R` to the right
3311/// subtree's column list.
3312///
3313/// This is deliberately narrow. We only recognise the two shapes:
3314///   * `QualifiedField = QualifiedField`  (`u.id = o.user_id`)
3315///   * `Field = Field`                    (`.id = .user_id`, unqualified)
3316///
3317/// Anything else — conjunctions, constants, function calls, or predicates
3318/// that touch the same side on both halves — falls through to the
3319/// nested-loop path unchanged.
3320pub(super) fn try_extract_equi_join_keys(
3321    pred: &Expr,
3322    left_columns: &[String],
3323    right_columns: &[String],
3324) -> Option<(usize, usize)> {
3325    let (lhs, op, rhs) = match pred {
3326        Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3327        _ => return None,
3328    };
3329    if op != BinOp::Eq {
3330        return None;
3331    }
3332    // Normal orientation: lhs in left, rhs in right.
3333    if let (Some(li), Some(ri)) = (
3334        resolve_side_column(lhs, left_columns),
3335        resolve_side_column(rhs, right_columns),
3336    ) {
3337        return Some((li, ri));
3338    }
3339    // Swapped: rhs in left, lhs in right. Both sides of `=` are
3340    // commutative so this is safe.
3341    if let (Some(li), Some(ri)) = (
3342        resolve_side_column(rhs, left_columns),
3343        resolve_side_column(lhs, right_columns),
3344    ) {
3345        return Some((li, ri));
3346    }
3347    None
3348}
3349
3350fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3351    match expr {
3352        Expr::QualifiedField { qualifier, field } => {
3353            // Byte-level match so we don't allocate a fresh `format!` on
3354            // every call — this runs once per plan, so allocation would be
3355            // cheap, but the match is trivial enough to keep inline with
3356            // the eval_expr version.
3357            let q = qualifier.as_bytes();
3358            let f = field.as_bytes();
3359            columns.iter().position(|c| {
3360                let b = c.as_bytes();
3361                b.len() == q.len() + 1 + f.len()
3362                    && b[..q.len()] == *q
3363                    && b[q.len()] == b'.'
3364                    && b[q.len() + 1..] == *f
3365            })
3366        }
3367        Expr::Field(name) => columns.iter().position(|c| c == name),
3368        _ => None,
3369    }
3370}
3371
3372/// Mission E1.3: O(L + R) hash join. Builds a `FxHashMap<Value, Vec<usize>>`
3373/// over the right (inner) side's join keys, then streams the left (outer)
3374/// side and for each probe row emits every combined row whose right-side
3375/// key matches. For `JoinKind::LeftOuter`, unmatched left rows are emitted
3376/// padded with `Value::Empty` on the right side.
3377///
3378/// The right side is always the build side. That choice is forced for
3379/// LeftOuter (the left side must stream so we can detect orphans), and
3380/// for Inner it's a reasonable default — left-deep plans tend to grow the
3381/// left side with each join, so the un-joined right leaf is often the
3382/// smaller of the two at each level.
3383pub(super) fn hash_join(
3384    left_columns: Vec<String>,
3385    left_rows: Vec<Vec<Value>>,
3386    right_columns: Vec<String>,
3387    right_rows: Vec<Vec<Value>>,
3388    left_key_idx: usize,
3389    right_key_idx: usize,
3390    kind: JoinKind,
3391) -> QueryResult {
3392    use rustc_hash::FxHashMap;
3393
3394    let n_left = left_columns.len();
3395    let n_right = right_columns.len();
3396    let mut columns = Vec::with_capacity(n_left + n_right);
3397    columns.extend(left_columns);
3398    columns.extend(right_columns);
3399
3400    // Build: right_key -> list of right-row indices. Pre-size to the row
3401    // count so the map doesn't rehash mid-build.
3402    let mut build: FxHashMap<Value, Vec<usize>> =
3403        FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3404    for (i, row) in right_rows.iter().enumerate() {
3405        // Skip Empty keys on the build side — they can never match under
3406        // SQL semantics (NULL ≠ NULL) and would collapse all nullables to
3407        // one bucket.
3408        if matches!(row[right_key_idx], Value::Empty) {
3409            continue;
3410        }
3411        build.entry(row[right_key_idx].clone()).or_default().push(i);
3412    }
3413
3414    // Reasonable starting capacity — inner joins produce ≥ left_rows.len()
3415    // rows in the common 1:1 case, left-outer always emits ≥ left_rows.len().
3416    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3417
3418    for left_row in &left_rows {
3419        let key = &left_row[left_key_idx];
3420        let matched = if matches!(key, Value::Empty) {
3421            None
3422        } else {
3423            build.get(key)
3424        };
3425        match matched {
3426            Some(matches) if !matches.is_empty() => {
3427                for &ri in matches {
3428                    let right_row = &right_rows[ri];
3429                    let mut combined = Vec::with_capacity(n_left + n_right);
3430                    combined.extend_from_slice(left_row);
3431                    combined.extend_from_slice(right_row);
3432                    rows.push(combined);
3433                }
3434            }
3435            _ => {
3436                if matches!(kind, JoinKind::LeftOuter) {
3437                    let mut row = Vec::with_capacity(n_left + n_right);
3438                    row.extend_from_slice(left_row);
3439                    row.resize(n_left + n_right, Value::Empty);
3440                    rows.push(row);
3441                }
3442            }
3443        }
3444    }
3445
3446    QueryResult::Rows { columns, rows }
3447}
3448
3449/// Lower unindexed `RangeScan` and `IndexScan` nodes to `Filter(SeqScan)`
3450/// so that all downstream fast paths (count, project+limit, sort+limit,
3451/// agg, update, delete) continue to fire.
3452///
3453/// The planner emits `RangeScan` (for `.age > 30`) and `IndexScan` (for
3454/// `.email = lit`) speculatively because it has no catalog access. When
3455/// the column has a B-tree index, those plans are correct. When it
3456/// doesn't, the executor's fallbacks materialise every matching row with
3457/// full `decode_row` — bypassing the compiled-predicate fast paths that
3458/// `Filter(SeqScan)` would trigger. Lowering both speculative leaf kinds
3459/// also keeps EXPLAIN honest: it prints the plan that actually runs.
3460///
3461/// This pass runs once per query, before execution.
3462pub(super) fn lower_unindexed_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3463    match plan {
3464        PlanNode::RangeScan {
3465            table,
3466            column,
3467            start,
3468            end,
3469        } => {
3470            if let Some(tbl) = catalog.get_table(table) {
3471                // Keep RangeScan whenever ANY index exists on the column:
3472                // unique indexes store raw column values, non-unique indexes
3473                // store composite (value, rid) keys that the executor walks
3474                // natively via BTree::range_rids. Only lower to Filter(SeqScan)
3475                // when the column is unindexed.
3476                if tbl.has_index(column) {
3477                    return plan.clone();
3478                }
3479            }
3480            let pred = synthesize_range_predicate(column, start, end);
3481            PlanNode::Filter {
3482                input: Box::new(PlanNode::SeqScan {
3483                    table: table.clone(),
3484                }),
3485                predicate: pred,
3486            }
3487        }
3488        PlanNode::Filter { input, predicate } => PlanNode::Filter {
3489            input: Box::new(lower_unindexed_scans(catalog, input)),
3490            predicate: predicate.clone(),
3491        },
3492        PlanNode::Project { input, fields } => PlanNode::Project {
3493            input: Box::new(lower_unindexed_scans(catalog, input)),
3494            fields: fields.clone(),
3495        },
3496        PlanNode::Sort { input, keys } => PlanNode::Sort {
3497            input: Box::new(lower_unindexed_scans(catalog, input)),
3498            keys: keys.clone(),
3499        },
3500        PlanNode::Limit { input, count } => PlanNode::Limit {
3501            input: Box::new(lower_unindexed_scans(catalog, input)),
3502            count: count.clone(),
3503        },
3504        PlanNode::Offset { input, count } => PlanNode::Offset {
3505            input: Box::new(lower_unindexed_scans(catalog, input)),
3506            count: count.clone(),
3507        },
3508        PlanNode::Aggregate {
3509            input,
3510            function,
3511            field,
3512        } => PlanNode::Aggregate {
3513            input: Box::new(lower_unindexed_scans(catalog, input)),
3514            function: *function,
3515            field: field.clone(),
3516        },
3517        PlanNode::Distinct { input } => PlanNode::Distinct {
3518            input: Box::new(lower_unindexed_scans(catalog, input)),
3519        },
3520        PlanNode::GroupBy {
3521            input,
3522            keys,
3523            aggregates,
3524            having,
3525        } => PlanNode::GroupBy {
3526            input: Box::new(lower_unindexed_scans(catalog, input)),
3527            keys: keys.clone(),
3528            aggregates: aggregates.clone(),
3529            having: having.clone(),
3530        },
3531        PlanNode::Update {
3532            input,
3533            table,
3534            assignments,
3535        } => PlanNode::Update {
3536            input: Box::new(lower_unindexed_scans(catalog, input)),
3537            table: table.clone(),
3538            assignments: assignments.clone(),
3539        },
3540        PlanNode::Delete { input, table } => PlanNode::Delete {
3541            input: Box::new(lower_unindexed_scans(catalog, input)),
3542            table: table.clone(),
3543        },
3544        PlanNode::Window { input, windows } => PlanNode::Window {
3545            input: Box::new(lower_unindexed_scans(catalog, input)),
3546            windows: windows.clone(),
3547        },
3548        PlanNode::Union { left, right, all } => PlanNode::Union {
3549            left: Box::new(lower_unindexed_scans(catalog, left)),
3550            right: Box::new(lower_unindexed_scans(catalog, right)),
3551            all: *all,
3552        },
3553        PlanNode::Explain { input } => PlanNode::Explain {
3554            input: Box::new(lower_unindexed_scans(catalog, input)),
3555        },
3556        PlanNode::NestedLoopJoin {
3557            left,
3558            right,
3559            on,
3560            kind,
3561        } => PlanNode::NestedLoopJoin {
3562            left: Box::new(lower_unindexed_scans(catalog, left)),
3563            right: Box::new(lower_unindexed_scans(catalog, right)),
3564            on: on.clone(),
3565            kind: *kind,
3566        },
3567        PlanNode::IndexScan { table, column, key } => {
3568            if let Some(tbl) = catalog.get_table(table) {
3569                if tbl.has_index(column) {
3570                    return plan.clone();
3571                }
3572            }
3573            PlanNode::Filter {
3574                input: Box::new(PlanNode::SeqScan {
3575                    table: table.clone(),
3576                }),
3577                predicate: Expr::BinaryOp(
3578                    Box::new(Expr::Field(column.clone())),
3579                    BinOp::Eq,
3580                    Box::new(key.clone()),
3581                ),
3582            }
3583        }
3584        // Leaf nodes: no children to recurse into.
3585        _ => plan.clone(),
3586    }
3587}
3588
3589/// Synthesize a range predicate from RangeScan bounds for the fallback path.
3590pub(super) fn synthesize_range_predicate(
3591    column: &str,
3592    start: &Option<(Expr, bool)>,
3593    end: &Option<(Expr, bool)>,
3594) -> Expr {
3595    let lower = start.as_ref().map(|(expr, inclusive)| {
3596        let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3597        Expr::BinaryOp(
3598            Box::new(Expr::Field(column.to_string())),
3599            op,
3600            Box::new(expr.clone()),
3601        )
3602    });
3603    let upper = end.as_ref().map(|(expr, inclusive)| {
3604        let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3605        Expr::BinaryOp(
3606            Box::new(Expr::Field(column.to_string())),
3607            op,
3608            Box::new(expr.clone()),
3609        )
3610    });
3611    match (lower, upper) {
3612        (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3613        (Some(l), None) => l,
3614        (None, Some(u)) => u,
3615        (None, None) => Expr::Literal(Literal::Bool(true)),
3616    }
3617}
3618
3619/// Check if a value falls within a range (used in last-resort decoded-row eval).
3620pub(super) fn range_matches(
3621    val: &Value,
3622    start: &Option<Value>,
3623    start_inc: bool,
3624    end: &Option<Value>,
3625    end_inc: bool,
3626) -> bool {
3627    if let Some(ref s) = start {
3628        if start_inc {
3629            if val < s {
3630                return false;
3631            }
3632        } else if val <= s {
3633            return false;
3634        }
3635    }
3636    if let Some(ref e) = end {
3637        if end_inc {
3638            if val > e {
3639                return false;
3640            }
3641        } else if val >= e {
3642            return false;
3643        }
3644    }
3645    true
3646}
3647
3648/// Format a `PlanNode` tree as a human-readable, indented text
3649/// representation. Used by the `EXPLAIN` command.
3650pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3651    let indent = "  ".repeat(depth);
3652    match plan {
3653        PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3654        PlanNode::AliasScan { table, alias } => {
3655            format!("{indent}AliasScan table={table} alias={alias}")
3656        }
3657        PlanNode::IndexScan { table, column, key } => {
3658            format!("{indent}IndexScan table={table} column={column} key={key:?}")
3659        }
3660        PlanNode::RangeScan {
3661            table,
3662            column,
3663            start,
3664            end,
3665        } => {
3666            let s = match start {
3667                Some((expr, inc)) => {
3668                    let op = if *inc { ">=" } else { ">" };
3669                    format!("{op}{expr:?}")
3670                }
3671                None => "unbounded".to_string(),
3672            };
3673            let e = match end {
3674                Some((expr, inc)) => {
3675                    let op = if *inc { "<=" } else { "<" };
3676                    format!("{op}{expr:?}")
3677                }
3678                None => "unbounded".to_string(),
3679            };
3680            format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3681        }
3682        PlanNode::Filter { input, predicate } => {
3683            let child = format_plan_tree(input, depth + 1);
3684            format!("{indent}Filter predicate={predicate:?}\n{child}")
3685        }
3686        PlanNode::Project { input, fields } => {
3687            let names: Vec<String> = fields
3688                .iter()
3689                .map(|f| match &f.alias {
3690                    Some(a) => format!("{a}: {:?}", f.expr),
3691                    None => format!("{:?}", f.expr),
3692                })
3693                .collect();
3694            let child = format_plan_tree(input, depth + 1);
3695            format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3696        }
3697        PlanNode::Sort { input, keys } => {
3698            let ks: Vec<String> = keys
3699                .iter()
3700                .map(|k| {
3701                    if k.descending {
3702                        format!("{} desc", k.field)
3703                    } else {
3704                        k.field.clone()
3705                    }
3706                })
3707                .collect();
3708            let child = format_plan_tree(input, depth + 1);
3709            format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3710        }
3711        PlanNode::Limit { input, count } => {
3712            let child = format_plan_tree(input, depth + 1);
3713            format!("{indent}Limit count={count:?}\n{child}")
3714        }
3715        PlanNode::Offset { input, count } => {
3716            let child = format_plan_tree(input, depth + 1);
3717            format!("{indent}Offset count={count:?}\n{child}")
3718        }
3719        PlanNode::Aggregate {
3720            input,
3721            function,
3722            field,
3723        } => {
3724            let f = field.as_deref().unwrap_or("*");
3725            let child = format_plan_tree(input, depth + 1);
3726            format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3727        }
3728        PlanNode::NestedLoopJoin {
3729            left,
3730            right,
3731            on,
3732            kind,
3733        } => {
3734            let left_child = format_plan_tree(left, depth + 1);
3735            let right_child = format_plan_tree(right, depth + 1);
3736            let on_str = match on {
3737                Some(pred) => format!("{pred:?}"),
3738                None => "none".to_string(),
3739            };
3740            format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3741        }
3742        PlanNode::Distinct { input } => {
3743            let child = format_plan_tree(input, depth + 1);
3744            format!("{indent}Distinct\n{child}")
3745        }
3746        PlanNode::GroupBy {
3747            input,
3748            keys,
3749            aggregates,
3750            having,
3751        } => {
3752            let agg_strs: Vec<String> = aggregates
3753                .iter()
3754                .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3755                .collect();
3756            let having_str = match having {
3757                Some(h) => format!(" having={h:?}"),
3758                None => String::new(),
3759            };
3760            let child = format_plan_tree(input, depth + 1);
3761            format!(
3762                "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3763                keys.join(", "),
3764                agg_strs.join(", "),
3765            )
3766        }
3767        PlanNode::Insert { table, rows } => {
3768            let cols: Vec<&str> = rows
3769                .first()
3770                .map(|r| r.iter().map(|a| a.field.as_str()).collect())
3771                .unwrap_or_default();
3772            format!(
3773                "{indent}Insert table={table} rows={} cols=[{}]",
3774                rows.len(),
3775                cols.join(", ")
3776            )
3777        }
3778        PlanNode::Upsert {
3779            table,
3780            key_column,
3781            assignments,
3782            on_conflict,
3783        } => {
3784            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3785            let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3786            if conflict_cols.is_empty() {
3787                format!(
3788                    "{indent}Upsert table={table} key={key_column} cols=[{}]",
3789                    cols.join(", ")
3790                )
3791            } else {
3792                format!(
3793                    "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3794                    cols.join(", "),
3795                    conflict_cols.join(", ")
3796                )
3797            }
3798        }
3799        PlanNode::Update {
3800            input,
3801            table,
3802            assignments,
3803        } => {
3804            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3805            let child = format_plan_tree(input, depth + 1);
3806            format!(
3807                "{indent}Update table={table} set=[{}]\n{child}",
3808                cols.join(", ")
3809            )
3810        }
3811        PlanNode::Delete { input, table } => {
3812            let child = format_plan_tree(input, depth + 1);
3813            format!("{indent}Delete table={table}\n{child}")
3814        }
3815        PlanNode::CreateTable { name, fields } => {
3816            let fs: Vec<String> = fields
3817                .iter()
3818                .map(|f| {
3819                    let mut mods = String::new();
3820                    if f.required {
3821                        mods.push_str(" required");
3822                    }
3823                    if f.unique {
3824                        mods.push_str(" unique");
3825                    }
3826                    format!("{}: {}{mods}", f.name, f.type_name)
3827                })
3828                .collect();
3829            format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3830        }
3831        PlanNode::AlterTable { table, action } => {
3832            format!("{indent}AlterTable table={table} action={action:?}")
3833        }
3834        PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3835        PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3836        PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3837        PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3838        PlanNode::Window { input, windows } => {
3839            let ws: Vec<String> = windows
3840                .iter()
3841                .map(|w| format!("{:?} as {}", w.function, w.output_name))
3842                .collect();
3843            let child = format_plan_tree(input, depth + 1);
3844            format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3845        }
3846        PlanNode::Union { left, right, all } => {
3847            let kind = if *all { "UNION ALL" } else { "UNION" };
3848            let left_child = format_plan_tree(left, depth + 1);
3849            let right_child = format_plan_tree(right, depth + 1);
3850            format!("{indent}{kind}\n{left_child}\n{right_child}")
3851        }
3852        PlanNode::Explain { input } => {
3853            let child = format_plan_tree(input, depth + 1);
3854            format!("{indent}Explain\n{child}")
3855        }
3856        PlanNode::Begin => format!("{indent}Begin"),
3857        PlanNode::Commit => format!("{indent}Commit"),
3858        PlanNode::Rollback => format!("{indent}Rollback"),
3859    }
3860}