Skip to main content

powdb_query/executor/
plan_exec.rs

1//! The execute_plan method and associated helpers.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::{ViewDef, ViewRegistry};
16
17impl Engine {
18    pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19        match plan {
20            PlanNode::SeqScan { table } => {
21                // Auto-refresh dirty materialized views on read.
22                if self.view_registry.is_dirty(table) {
23                    self.refresh_view(table)?;
24                }
25                let schema = self
26                    .catalog
27                    .schema(table)
28                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29                    .clone();
30                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31                let rows: Vec<Vec<Value>> = self
32                    .catalog
33                    .scan(table)
34                    .map_err(|e| QueryError::StorageError(e.to_string()))?
35                    .map(|(_, row)| row)
36                    .collect();
37                Ok(QueryResult::Rows { columns, rows })
38            }
39
40            PlanNode::Filter { input, predicate } => {
41                // Materialize any IN-subqueries in the predicate before the
42                // scan loop — the closure can't call back into the engine.
43                // Correlated subqueries are left in place for per-row eval.
44                let materialized;
45                let predicate = if contains_subquery(predicate) {
46                    materialized = self.materialize_subqueries(predicate)?;
47                    &materialized
48                } else {
49                    predicate
50                };
51
52                // Correlated subquery path: per-row materialisation.
53                if contains_subquery(predicate) {
54                    let result = self.execute_plan(input)?;
55                    return match result {
56                        QueryResult::Rows { columns, rows } => {
57                            let mut filtered = Vec::new();
58                            for row in rows {
59                                let row_pred =
60                                    self.materialize_correlated_for_row(predicate, &row, &columns)?;
61                                if eval_predicate(&row_pred, &row, &columns) {
62                                    filtered.push(row);
63                                }
64                            }
65                            Ok(QueryResult::Rows {
66                                columns,
67                                rows: filtered,
68                            })
69                        }
70                        _ => Err("filter requires row input".into()),
71                    };
72                }
73
74                // Fast path: fuse Filter + SeqScan into a zero-copy streaming
75                // loop. Uses decode_column() to evaluate the predicate on only
76                // the columns it references, avoiding heap allocations for
77                // String/Bytes columns that aren't part of the filter.
78                if let PlanNode::SeqScan { table } = input.as_ref() {
79                    // Auto-refresh dirty materialized views.
80                    if self.view_registry.is_dirty(table) {
81                        self.refresh_view(table)?;
82                    }
83                    let schema = self
84                        .catalog
85                        .schema(table)
86                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
87                        .clone();
88                    let columns: Vec<String> =
89                        schema.columns.iter().map(|c| c.name.clone()).collect();
90                    let fast = FastLayout::new(&schema);
91                    let row_layout = RowLayout::new(&schema);
92                    // Mission F: pre-size to skip the first 4 Vec doublings
93                    // (4 → 8 → 16 → 32 → 64). On a 100K-row scan with 30%
94                    // selectivity that's ~4 fewer reallocations + memcpys.
95                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
96
97                    // Try compiled predicate for the filter check (handles
98                    // int leaves, string-eq leaves, and And conjunctions).
99                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
100                        self.catalog
101                            .for_each_row_raw(table, |_rid, data| {
102                                if compiled(data) {
103                                    rows.push(decode_row(&schema, data));
104                                }
105                            })
106                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
107                    } else {
108                        let pred_cols = predicate_column_indices(predicate, &columns);
109                        self.catalog
110                            .for_each_row_raw(table, |_rid, data| {
111                                let pred_row =
112                                    decode_selective(&schema, &row_layout, data, &pred_cols);
113                                if eval_predicate(predicate, &pred_row, &columns) {
114                                    rows.push(decode_row(&schema, data));
115                                }
116                            })
117                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
118                    }
119
120                    return Ok(QueryResult::Rows { columns, rows });
121                }
122
123                // General path: materialise then filter.
124                let result = self.execute_plan(input)?;
125                match result {
126                    QueryResult::Rows { columns, rows } => {
127                        let filtered: Vec<Vec<Value>> = rows
128                            .into_iter()
129                            .filter(|row| eval_predicate(predicate, row, &columns))
130                            .collect();
131                        Ok(QueryResult::Rows {
132                            columns,
133                            rows: filtered,
134                        })
135                    }
136                    _ => Err("filter requires row input".into()),
137                }
138            }
139
140            PlanNode::Project { input, fields } => {
141                // Fast path: Project over IndexScan — decode only projected
142                // columns from raw bytes instead of full decode_row.
143                if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144                    let schema = self
145                        .catalog
146                        .schema(table)
147                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148                        .clone();
149                    let all_columns: Vec<String> =
150                        schema.columns.iter().map(|c| c.name.clone()).collect();
151                    let key_value = literal_to_value(key)?;
152                    let tbl = self
153                        .catalog
154                        .get_table(table)
155                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157                    let proj_columns: Vec<String> = fields
158                        .iter()
159                        .map(|f| {
160                            f.alias.clone().unwrap_or_else(|| match &f.expr {
161                                Expr::Field(name) => name.clone(),
162                                _ => "?".into(),
163                            })
164                        })
165                        .collect();
166
167                    // Determine which column indices the projection needs
168                    let proj_indices: Vec<usize> = fields
169                        .iter()
170                        .filter_map(|f| {
171                            if let Expr::Field(name) = &f.expr {
172                                all_columns.iter().position(|c| c == name)
173                            } else {
174                                None
175                            }
176                        })
177                        .collect();
178
179                    if tbl.has_index(column) {
180                        let layout = RowLayout::new(&schema);
181                        let rids = tbl.index_lookup_all(column, &key_value);
182                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
183                        for rid in rids {
184                            if let Some(data) = tbl.heap.get(rid) {
185                                let row: Vec<Value> = proj_indices
186                                    .iter()
187                                    .map(|&ci| decode_column(&schema, &layout, &data, ci))
188                                    .collect();
189                                rows.push(row);
190                            }
191                        }
192                        return Ok(QueryResult::Rows {
193                            columns: proj_columns,
194                            rows,
195                        });
196                    }
197                }
198
199                // Fast path: Project(Limit(Sort(Filter(SeqScan)))) — bounded
200                // top-N heap. Decodes only the sort key + projected columns,
201                // keeps at most `limit` rows in a heap. Also handles the
202                // Project(Limit(Sort(SeqScan))) variant (no filter).
203                if let PlanNode::Limit {
204                    input: inner,
205                    count: limit_expr,
206                } = input.as_ref()
207                {
208                    if let PlanNode::Sort {
209                        input: sort_input,
210                        keys,
211                    } = inner.as_ref()
212                    {
213                        // Fast path only for single-key sorts
214                        if keys.len() == 1 {
215                            let sort_field = &keys[0].field;
216                            let descending = keys[0].descending;
217                            let limit = match limit_expr {
218                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
219                                _ => usize::MAX,
220                            };
221                            let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
222                                match sort_input.as_ref() {
223                                    PlanNode::SeqScan { table } => (Some(table.as_str()), None),
224                                    PlanNode::Filter {
225                                        input: fi,
226                                        predicate,
227                                    } => {
228                                        if let PlanNode::SeqScan { table } = fi.as_ref() {
229                                            (Some(table.as_str()), Some(predicate))
230                                        } else {
231                                            (None, None)
232                                        }
233                                    }
234                                    _ => (None, None),
235                                };
236                            if let Some(table) = table_opt {
237                                if let Some(result) = self.project_filter_sort_limit_fast(
238                                    table, fields, sort_field, descending, limit, pred_opt,
239                                )? {
240                                    return Ok(result);
241                                }
242                            }
243                        }
244                    }
245                    // Fast path: Project(Limit(Filter(SeqScan))) — stream,
246                    // decode only projected columns, stop at limit.
247                    if let PlanNode::Filter {
248                        input: fi,
249                        predicate,
250                    } = inner.as_ref()
251                    {
252                        if let PlanNode::SeqScan { table } = fi.as_ref() {
253                            let limit = match limit_expr {
254                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
255                                _ => usize::MAX,
256                            };
257                            if let Some(result) = self.project_filter_limit_fast(
258                                table,
259                                fields,
260                                limit,
261                                Some(predicate),
262                            )? {
263                                return Ok(result);
264                            }
265                        }
266                    }
267                    // Fast path: Project(Limit(SeqScan)) — stream, no filter.
268                    if let PlanNode::SeqScan { table } = inner.as_ref() {
269                        let limit = match limit_expr {
270                            Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
271                            _ => usize::MAX,
272                        };
273                        if let Some(result) =
274                            self.project_filter_limit_fast(table, fields, limit, None)?
275                        {
276                            return Ok(result);
277                        }
278                    }
279                }
280
281                // Mission D4: Project(Filter(SeqScan)) without Limit. Reuses
282                // `project_filter_limit_fast` with limit = usize::MAX so the
283                // hot loop decodes only projected columns and uses the
284                // compiled predicate. Previously this fell through to the
285                // generic Filter branch which materialised every column via
286                // `decode_row` then re-projected — quadratic work.
287                //
288                // multi_col_and_filter (`U filter .age > 30 and .status =
289                // "active" { .name, .age }`) was 6.18ms (0.7x SQLite) and
290                // is the load-bearing workload for this fast path.
291                if let PlanNode::Filter {
292                    input: fi,
293                    predicate,
294                } = input.as_ref()
295                {
296                    if let PlanNode::SeqScan { table } = fi.as_ref() {
297                        if let Some(result) = self.project_filter_limit_fast(
298                            table,
299                            fields,
300                            usize::MAX,
301                            Some(predicate),
302                        )? {
303                            return Ok(result);
304                        }
305                    }
306                }
307
308                // Mission D4: Project(SeqScan) without Filter or Limit.
309                // Decode only projected columns; the previous fall-through
310                // built full Vec<Value> rows then re-projected.
311                if let PlanNode::SeqScan { table } = input.as_ref() {
312                    if let Some(result) =
313                        self.project_filter_limit_fast(table, fields, usize::MAX, None)?
314                    {
315                        return Ok(result);
316                    }
317                }
318
319                let result = self.execute_plan(input)?;
320                match result {
321                    QueryResult::Rows { columns, rows } => {
322                        let proj_columns: Vec<String> = fields
323                            .iter()
324                            .map(|f| {
325                                f.alias.clone().unwrap_or_else(|| match &f.expr {
326                                    Expr::Field(name) => name.clone(),
327                                    // Mission E1.2: `{ u.name }` projects as the
328                                    // qualified column name so callers can still
329                                    // disambiguate across the join output.
330                                    Expr::QualifiedField { qualifier, field } => {
331                                        format!("{qualifier}.{field}")
332                                    }
333                                    _ => "?".into(),
334                                })
335                            })
336                            .collect();
337                        let proj_rows: Vec<Vec<Value>> = rows
338                            .iter()
339                            .map(|row| {
340                                fields
341                                    .iter()
342                                    .map(|f| eval_expr(&f.expr, row, &columns))
343                                    .collect()
344                            })
345                            .collect();
346                        Ok(QueryResult::Rows {
347                            columns: proj_columns,
348                            rows: proj_rows,
349                        })
350                    }
351                    _ => Err("project requires row input".into()),
352                }
353            }
354
355            PlanNode::Sort { input, keys } => {
356                let result = self.execute_plan(input)?;
357                match result {
358                    QueryResult::Rows { columns, mut rows } => {
359                        // WS2: row-count cap is a cheap secondary guard; the
360                        // byte budget is the real OOM defense for the sort
361                        // buffer (a few very large rows pass the row cap).
362                        if rows.len() > MAX_SORT_ROWS {
363                            return Err(QueryError::SortLimitExceeded);
364                        }
365                        self.charge_rows(&rows)?;
366                        let key_indices: Vec<(usize, bool)> = keys
367                            .iter()
368                            .map(|k| {
369                                columns
370                                    .iter()
371                                    .position(|c| c == &k.field)
372                                    .map(|idx| (idx, k.descending))
373                                    .ok_or_else(|| QueryError::ColumnNotFound {
374                                        table: String::new(),
375                                        column: k.field.clone(),
376                                    })
377                            })
378                            .collect::<Result<_, QueryError>>()?;
379                        rows.sort_by(|a, b| {
380                            for &(col_idx, descending) in &key_indices {
381                                let cmp = a[col_idx].cmp(&b[col_idx]);
382                                let cmp = if descending { cmp.reverse() } else { cmp };
383                                if cmp != std::cmp::Ordering::Equal {
384                                    return cmp;
385                                }
386                            }
387                            std::cmp::Ordering::Equal
388                        });
389                        Ok(QueryResult::Rows { columns, rows })
390                    }
391                    _ => Err("sort requires row input".into()),
392                }
393            }
394
395            PlanNode::Limit { input, count } => {
396                let result = self.execute_plan(input)?;
397                let n = match count {
398                    Expr::Literal(Literal::Int(v)) => *v as usize,
399                    _ => return Err("limit must be integer literal".into()),
400                };
401                match result {
402                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
403                        columns,
404                        rows: rows.into_iter().take(n).collect(),
405                    }),
406                    _ => Err("limit requires row input".into()),
407                }
408            }
409
410            PlanNode::Offset { input, count } => {
411                let result = self.execute_plan(input)?;
412                let n = match count {
413                    Expr::Literal(Literal::Int(v)) => *v as usize,
414                    _ => return Err("offset must be integer literal".into()),
415                };
416                match result {
417                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
418                        columns,
419                        rows: rows.into_iter().skip(n).collect(),
420                    }),
421                    _ => Err("offset requires row input".into()),
422                }
423            }
424
425            PlanNode::Aggregate {
426                input,
427                function,
428                field,
429            } => {
430                // Fast path: count() over SeqScan — count rows without any decode
431                if *function == AggFunc::Count {
432                    if let PlanNode::SeqScan { table } = input.as_ref() {
433                        let mut count: i64 = 0;
434                        self.catalog
435                            .for_each_row_raw(table, |_rid, _data| {
436                                count += 1;
437                            })
438                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
439                        return Ok(QueryResult::Scalar(Value::Int(count)));
440                    }
441                    // Fast path: count() over Filter(SeqScan) — try compiled
442                    // predicate first, fall back to decode_column path.
443                    if let PlanNode::Filter {
444                        input: inner,
445                        predicate,
446                    } = input.as_ref()
447                    {
448                        if let PlanNode::SeqScan { table } = inner.as_ref() {
449                            let schema = self
450                                .catalog
451                                .schema(table)
452                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
453                                .clone();
454                            let columns: Vec<String> =
455                                schema.columns.iter().map(|c| c.name.clone()).collect();
456                            let fast = FastLayout::new(&schema);
457                            let row_layout = RowLayout::new(&schema);
458
459                            // Try compiled predicate (zero-allocation hot path).
460                            // Handles int leaves, string-eq leaves, AND conjunctions.
461                            if let Some(compiled) =
462                                compile_predicate(predicate, &columns, &fast, &schema)
463                            {
464                                let mut count: i64 = 0;
465                                self.catalog
466                                    .for_each_row_raw(table, |_rid, data| {
467                                        if compiled(data) {
468                                            count += 1;
469                                        }
470                                    })
471                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
472                                return Ok(QueryResult::Scalar(Value::Int(count)));
473                            }
474
475                            // Fallback: decode predicate columns
476                            let pred_cols = predicate_column_indices(predicate, &columns);
477                            let mut count: i64 = 0;
478                            self.catalog
479                                .for_each_row_raw(table, |_rid, data| {
480                                    let pred_row =
481                                        decode_selective(&schema, &row_layout, data, &pred_cols);
482                                    if eval_predicate(predicate, &pred_row, &columns) {
483                                        count += 1;
484                                    }
485                                })
486                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
487
488                            return Ok(QueryResult::Scalar(Value::Int(count)));
489                        }
490                    }
491                }
492
493                // Fast path: sum/avg/min/max over a single fixed-size int
494                // column with an optional compiled filter predicate. Walks
495                // raw row bytes, zero allocation per row.
496                if matches!(
497                    function,
498                    AggFunc::Sum
499                        | AggFunc::Avg
500                        | AggFunc::Min
501                        | AggFunc::Max
502                        | AggFunc::CountDistinct
503                ) {
504                    if let Some(col) = field.as_ref() {
505                        // Shape: Aggregate(SeqScan) or Aggregate(Filter(SeqScan))
506                        let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
507                            match input.as_ref() {
508                                PlanNode::SeqScan { table } => (Some(table.as_str()), None),
509                                PlanNode::Filter {
510                                    input: inner,
511                                    predicate,
512                                } => {
513                                    if let PlanNode::SeqScan { table } = inner.as_ref() {
514                                        (Some(table.as_str()), Some(predicate))
515                                    } else {
516                                        (None, None)
517                                    }
518                                }
519                                _ => (None, None),
520                            };
521                        if let Some(table) = table_opt {
522                            if let Some(result) =
523                                self.agg_single_col_fast(table, col, *function, pred_opt)?
524                            {
525                                return Ok(result);
526                            }
527                        }
528                    }
529                }
530
531                // Fast path: Project(Limit(Filter(SeqScan))) — stream, decode
532                // only projected columns, stop once we hit the limit.
533                // (Handled in the Project branch; this branch only fires when
534                // the aggregate is the outer node.)
535                let result = self.execute_plan(input)?;
536                match result {
537                    QueryResult::Rows { columns, rows } => {
538                        match function {
539                            AggFunc::Count => {
540                                Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
541                            }
542                            AggFunc::CountDistinct => {
543                                let col = field.as_ref().ok_or("count distinct requires field")?;
544                                let idx = columns
545                                    .iter()
546                                    .position(|c| c == col)
547                                    .ok_or("col not found")?;
548                                let mut seen = std::collections::HashSet::new();
549                                for row in &rows {
550                                    let v = &row[idx];
551                                    if !v.is_empty() {
552                                        seen.insert(v.clone());
553                                    }
554                                }
555                                Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
556                            }
557                            AggFunc::Avg => {
558                                let col = field.as_ref().ok_or("avg requires field")?;
559                                let idx = columns
560                                    .iter()
561                                    .position(|c| c == col)
562                                    .ok_or("col not found")?;
563                                let sum: f64 = rows
564                                    .iter()
565                                    .filter_map(|r| match &r[idx] {
566                                        Value::Int(v) => Some(*v as f64),
567                                        Value::Float(v) => Some(*v),
568                                        _ => None,
569                                    })
570                                    .sum();
571                                let count = rows.len() as f64;
572                                Ok(QueryResult::Scalar(Value::Float(sum / count)))
573                            }
574                            AggFunc::Sum => {
575                                let col = field.as_ref().ok_or("sum requires field")?;
576                                let idx = columns
577                                    .iter()
578                                    .position(|c| c == col)
579                                    .ok_or("col not found")?;
580                                // Track int and float contributions separately so
581                                // Float columns (and mixed Int/Float rows) don't get
582                                // silently dropped as they did in the Int-only
583                                // version. If any Float is present, the whole sum
584                                // promotes to Float — matching Avg's semantics.
585                                let mut int_sum: i64 = 0;
586                                let mut float_sum: f64 = 0.0;
587                                let mut saw_float = false;
588                                for r in &rows {
589                                    match &r[idx] {
590                                        Value::Int(v) => int_sum += *v,
591                                        Value::Float(v) => {
592                                            float_sum += *v;
593                                            saw_float = true;
594                                        }
595                                        _ => {}
596                                    }
597                                }
598                                let result = if saw_float {
599                                    Value::Float(float_sum + int_sum as f64)
600                                } else {
601                                    Value::Int(int_sum)
602                                };
603                                Ok(QueryResult::Scalar(result))
604                            }
605                            AggFunc::Min | AggFunc::Max => {
606                                let col = field.as_ref().ok_or("min/max requires field")?;
607                                let idx = columns
608                                    .iter()
609                                    .position(|c| c == col)
610                                    .ok_or("col not found")?;
611                                let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
612                                let result = if *function == AggFunc::Min {
613                                    vals.into_iter().min().cloned()
614                                } else {
615                                    vals.into_iter().max().cloned()
616                                };
617                                Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
618                            }
619                        }
620                    }
621                    _ => Err("aggregate requires row input".into()),
622                }
623            }
624
625            PlanNode::Insert { table, assignments } => {
626                let values = {
627                    let schema = self
628                        .catalog
629                        .schema(table)
630                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
631                    let mut values = vec![Value::Empty; schema.columns.len()];
632                    for a in assignments {
633                        let idx = schema.column_index(&a.field).ok_or_else(|| {
634                            QueryError::ColumnNotFound {
635                                table: String::new(),
636                                column: a.field.clone(),
637                            }
638                        })?;
639                        let raw = literal_to_value(&a.value)?;
640                        values[idx] = coerce_value(raw, &schema.columns[idx])?;
641                    }
642                    for col in &schema.columns {
643                        if col.required && matches!(values[col.position as usize], Value::Empty) {
644                            return Err(QueryError::Execution(format!(
645                                "column '{}' is required but no value was provided",
646                                col.name
647                            )));
648                        }
649                    }
650                    values
651                };
652                self.catalog
653                    .insert(table, &values)
654                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
655                self.view_registry.mark_dependents_dirty(table);
656                Ok(QueryResult::Modified(1))
657            }
658
659            PlanNode::Upsert {
660                table,
661                key_column,
662                assignments,
663                on_conflict,
664            } => {
665                let (values, key_idx) = {
666                    let schema = self
667                        .catalog
668                        .schema(table)
669                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
670                    let mut values = vec![Value::Empty; schema.columns.len()];
671                    for a in assignments {
672                        let idx = schema.column_index(&a.field).ok_or_else(|| {
673                            QueryError::ColumnNotFound {
674                                table: String::new(),
675                                column: a.field.clone(),
676                            }
677                        })?;
678                        let raw = literal_to_value(&a.value)?;
679                        values[idx] = coerce_value(raw, &schema.columns[idx])?;
680                    }
681                    for col in &schema.columns {
682                        if col.required && matches!(values[col.position as usize], Value::Empty) {
683                            return Err(QueryError::Execution(format!(
684                                "column '{}' is required but no value was provided",
685                                col.name
686                            )));
687                        }
688                    }
689                    let key_idx = schema
690                        .column_index(key_column)
691                        .ok_or_else(|| format!("key column '{key_column}' not found"))?;
692                    (values, key_idx)
693                };
694
695                let key_value = values[key_idx].clone();
696
697                // Probe the index for a conflict.
698                let existing = {
699                    let tbl = self
700                        .catalog
701                        .get_table(table)
702                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
703                    if tbl.has_index(key_column) {
704                        // Upsert key lookup: return the first match.
705                        // For unique indexes this is the only match.
706                        // For non-unique indexes on a key column, also
707                        // just the first (upsert semantics).
708                        let rids = tbl.index_lookup_all(key_column, &key_value);
709                        rids.into_iter().next().and_then(|rid| {
710                            tbl.heap
711                                .get(rid)
712                                .map(|data| (rid, decode_row(&tbl.schema, &data)))
713                        })
714                    } else {
715                        // No index — linear scan for the key.
716                        let mut found = None;
717                        for (rid, row) in tbl.scan() {
718                            if row[key_idx] == key_value {
719                                found = Some((rid, row));
720                                break;
721                            }
722                        }
723                        found
724                    }
725                };
726
727                if let Some((rid, mut existing_row)) = existing {
728                    // Conflict: apply on_conflict assignments (or all non-key if empty).
729                    let update_assignments = if on_conflict.is_empty() {
730                        assignments
731                    } else {
732                        on_conflict
733                    };
734                    let changed_cols: Vec<usize> = {
735                        let schema = self
736                            .catalog
737                            .schema(table)
738                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
739                        let mut indices = Vec::new();
740                        for a in update_assignments {
741                            let idx = schema.column_index(&a.field).ok_or_else(|| {
742                                QueryError::ColumnNotFound {
743                                    table: String::new(),
744                                    column: a.field.clone(),
745                                }
746                            })?;
747                            if idx != key_idx {
748                                existing_row[idx] = literal_to_value(&a.value)?;
749                                indices.push(idx);
750                            }
751                        }
752                        indices
753                    };
754                    self.catalog
755                        .update_hinted(table, rid, &existing_row, Some(&changed_cols))
756                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
757                    self.view_registry.mark_dependents_dirty(table);
758                    Ok(QueryResult::Modified(1))
759                } else {
760                    // No conflict: insert.
761                    self.catalog
762                        .insert(table, &values)
763                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
764                    self.view_registry.mark_dependents_dirty(table);
765                    Ok(QueryResult::Modified(1))
766                }
767            }
768
769            PlanNode::Update {
770                input,
771                table,
772                assignments,
773            } => {
774                // Mission C Phase 3: resolve assignments against a borrowed
775                // schema, then drop the borrow before the mutation loop.
776                // Try literal-only path first; fall back to per-row expression
777                // evaluation if any assignment contains a non-literal expression
778                // (e.g., `age := .age + 1`).
779                let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
780                    let schema_ref = self
781                        .catalog
782                        .schema(table)
783                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
784                    let indices: Vec<usize> = assignments
785                        .iter()
786                        .map(|a| {
787                            schema_ref.column_index(&a.field).ok_or_else(|| {
788                                QueryError::ColumnNotFound {
789                                    table: String::new(),
790                                    column: a.field.clone(),
791                                }
792                            })
793                        })
794                        .collect::<Result<_, _>>()?;
795                    let vals: Result<Vec<Value>, _> = assignments
796                        .iter()
797                        .map(|a| literal_to_value(&a.value))
798                        .collect();
799                    (indices, vals.ok())
800                };
801                let resolved_assignments: Option<Vec<(usize, Value)>> =
802                    literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
803
804                // Mission C Phase 2: the hint Table::update_hinted needs to
805                // decide whether to read the old row for index diff.
806                let changed_cols: Vec<usize> = col_indices.clone();
807
808                // ── Fused scan+update for Update(Filter(SeqScan)) ────────
809                // Perf sprint: instead of the two-pass collect-RIDs-then-loop
810                // pattern (which pays one ensure_hot per matched row on the
811                // second pass), fuse the predicate evaluation and in-place
812                // byte-level mutation into a single heap walk. Same idea as
813                // the fused scan_delete_matching path for deletes.
814                if let Some(ref resolved_assignments) = resolved_assignments {
815                    if let PlanNode::Filter {
816                        input: inner,
817                        predicate,
818                    } = input.as_ref()
819                    {
820                        if let PlanNode::SeqScan { table: t } = inner.as_ref() {
821                            if t == table {
822                                let fused_result = self.try_fused_scan_update(
823                                    table,
824                                    predicate,
825                                    resolved_assignments,
826                                    &changed_cols,
827                                );
828                                if let Some(result) = fused_result {
829                                    return result;
830                                }
831                            }
832                        }
833                    }
834                }
835
836                // Collect matching RowIds in a single pass.
837                let matching_rids = self.collect_rids_for_mutation(input, table)?;
838
839                // ── Literal-only fast paths ─────────────────────────────
840                if let Some(ref resolved_assignments) = resolved_assignments {
841                    // Mission C Phase 4: in-place byte-patch fast path. If every
842                    // assignment targets a fixed-size non-null column AND none of
843                    // them is indexed, we can skip decode_row / Vec<Value> /
844                    // encode_row_into entirely and patch the row's raw bytes on
845                    // the hot page.
846                    let fast_patch: Option<Vec<FastPatch>> = {
847                        let tbl = self
848                            .catalog
849                            .get_table(table)
850                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
851                        let schema = &tbl.schema;
852                        let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
853                            is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
854                        });
855                        let no_indexed = !resolved_assignments
856                            .iter()
857                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
858
859                        if all_fixed_nonnull && no_indexed {
860                            let layout = RowLayout::new(schema);
861                            let bitmap_size = layout.bitmap_size();
862                            let patches: Vec<FastPatch> = resolved_assignments
863                                .iter()
864                                .map(|(idx, val)| {
865                                    let fixed_off = layout
866                                        .fixed_offset(*idx)
867                                        .expect("is_fixed_size already checked");
868                                    let field_off = 2 + bitmap_size + fixed_off;
869                                    let bytes: FixedBytes = match val {
870                                        Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
871                                        Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
872                                        Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
873                                        Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
874                                        Value::Uuid(v) => FixedBytes::Uuid(*v),
875                                        _ => unreachable!("all_fixed_nonnull guard lied"),
876                                    };
877                                    FastPatch {
878                                        field_off,
879                                        bitmap_byte_off: 2 + idx / 8,
880                                        bit_mask: 1u8 << (idx % 8),
881                                        bytes,
882                                    }
883                                })
884                                .collect();
885                            Some(patches)
886                        } else {
887                            None
888                        }
889                    };
890
891                    if let Some(patches) = fast_patch {
892                        let mut count = 0u64;
893                        for rid in matching_rids {
894                            // Mission B2: WAL-log every patch so crash
895                            // recovery replays the update. Same mutation
896                            // closure as before — the wrapper just sandwiches
897                            // it between a hot-page read and a WAL append.
898                            let ok = self
899                                .catalog
900                                .update_row_bytes_logged(table, rid, |row| {
901                                    for p in &patches {
902                                        row[p.bitmap_byte_off] &= !p.bit_mask;
903                                        let field_bytes = p.bytes.as_slice();
904                                        row[p.field_off..p.field_off + field_bytes.len()]
905                                            .copy_from_slice(field_bytes);
906                                    }
907                                })
908                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
909                            if ok {
910                                count += 1;
911                            }
912                        }
913                        self.view_registry.mark_dependents_dirty(table);
914                        return Ok(QueryResult::Modified(count));
915                    }
916
917                    // Mission C Phase 10: var-column in-place shrink fast path.
918                    let var_fast: Option<(usize, Option<Vec<u8>>)> = {
919                        let tbl = self
920                            .catalog
921                            .get_table(table)
922                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
923                        let schema = &tbl.schema;
924                        let is_single = resolved_assignments.len() == 1;
925                        let is_var_col = is_single
926                            && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
927                        let no_indexed = !resolved_assignments
928                            .iter()
929                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
930
931                        if is_single && is_var_col && no_indexed {
932                            let (idx, val) = &resolved_assignments[0];
933                            let bytes_opt: Option<Vec<u8>> = match val {
934                                Value::Str(s) => Some(s.as_bytes().to_vec()),
935                                Value::Bytes(b) => Some(b.clone()),
936                                Value::Empty => None,
937                                _ => {
938                                    return Err(QueryError::TypeError(format!(
939                                        "cannot assign non-var value to var column '{}'",
940                                        schema.columns[*idx].name
941                                    )))
942                                }
943                            };
944                            Some((*idx, bytes_opt))
945                        } else {
946                            None
947                        }
948                    };
949
950                    if let Some((col_idx, new_bytes_opt)) = var_fast {
951                        let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
952                        let mut count = 0u64;
953                        let mut fallback_rids: Vec<RowId> = Vec::new();
954                        for rid in &matching_rids {
955                            // Mission B2: logged variant so crash recovery
956                            // replays the shrink. On a false return (row
957                            // would have to grow), the rid is pushed to
958                            // `fallback_rids` and the slower `update_hinted`
959                            // path — which is already WAL-logged — picks it up.
960                            let ok = self
961                                .catalog
962                                .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
963                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
964                            if ok {
965                                count += 1;
966                            } else {
967                                fallback_rids.push(*rid);
968                            }
969                        }
970                        for rid in fallback_rids {
971                            let mut row = match self.catalog.get(table, rid) {
972                                Some(r) => r,
973                                None => continue,
974                            };
975                            for (idx, val) in resolved_assignments.iter() {
976                                row[*idx] = val.clone();
977                            }
978                            self.catalog
979                                .update_hinted(table, rid, &row, Some(&changed_cols))
980                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
981                            count += 1;
982                        }
983                        self.view_registry.mark_dependents_dirty(table);
984                        return Ok(QueryResult::Modified(count));
985                    }
986
987                    // Generic literal path: decode row, apply literal values.
988                    let mut count = 0u64;
989                    for rid in matching_rids {
990                        let mut row = match self.catalog.get(table, rid) {
991                            Some(r) => r,
992                            None => continue,
993                        };
994                        for (idx, val) in resolved_assignments.iter() {
995                            row[*idx] = val.clone();
996                        }
997                        self.catalog
998                            .update_hinted(table, rid, &row, Some(&changed_cols))
999                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1000                        count += 1;
1001                    }
1002                    self.view_registry.mark_dependents_dirty(table);
1003                    return Ok(QueryResult::Modified(count));
1004                } // end if let Some(resolved_assignments)
1005
1006                // ── Expression-based update path ────────────────────────
1007                // At least one assignment contains a non-literal expression
1008                // (e.g., `age := .age + 1`). Evaluate per-row.
1009                let col_names: Vec<String> = {
1010                    let schema_ref = self
1011                        .catalog
1012                        .schema(table)
1013                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1014                    schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1015                };
1016                let mut count = 0u64;
1017                for rid in matching_rids {
1018                    let mut row = match self.catalog.get(table, rid) {
1019                        Some(r) => r,
1020                        None => continue,
1021                    };
1022                    for (i, asgn) in assignments.iter().enumerate() {
1023                        let val = eval_expr(&asgn.value, &row, &col_names);
1024                        row[col_indices[i]] = val;
1025                    }
1026                    self.catalog
1027                        .update_hinted(table, rid, &row, Some(&changed_cols))
1028                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1029                    count += 1;
1030                }
1031                self.view_registry.mark_dependents_dirty(table);
1032                Ok(QueryResult::Modified(count))
1033            }
1034
1035            PlanNode::Delete { input, table } => {
1036                // Mission C Phase 3: no schema clone — collect_rids_for_mutation
1037                // looks up schema internally when it needs one, and the mutation
1038                // loop doesn't need the schema at all.
1039                //
1040                // Mission C Phase 12: route bulk deletes through
1041                // `Catalog::delete_many`, which batches the btree leaf
1042                // compaction and shares one `ensure_hot` per row between
1043                // the index-key extraction and the slot delete. On
1044                // `delete_by_filter` (100K fixture, ~20K matches) that
1045                // removes ~4ms of pure `Vec::remove` memmove from the btree
1046                // maintenance phase.
1047                //
1048                // Mission C Phase 16: for the common `delete where ...`
1049                // shape (Filter(SeqScan)) — and the rarer "delete
1050                // everything" shape (SeqScan) — skip the two-pass
1051                // `collect_rids_for_mutation` + `delete_many` flow entirely.
1052                // The fused `scan_delete_matching` primitive walks the
1053                // heap exactly once, paying one `ensure_hot` per page
1054                // instead of per-row. That closes the last major gap on
1055                // the bench's `delete_by_filter` workload.
1056                if let PlanNode::Filter {
1057                    input: inner,
1058                    predicate,
1059                } = input.as_ref()
1060                {
1061                    if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1062                        if t == table {
1063                            let schema = self
1064                                .catalog
1065                                .schema(table)
1066                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1067                            let columns: Vec<String> =
1068                                schema.columns.iter().map(|c| c.name.clone()).collect();
1069                            let fast = FastLayout::new(schema);
1070                            if let Some(compiled) =
1071                                compile_predicate(predicate, &columns, &fast, schema)
1072                            {
1073                                // Mission B2: logged variant so every
1074                                // matched rid hits the WAL during the
1075                                // single-pass scan. Structure of the
1076                                // fused scan is unchanged — only the
1077                                // hook closure now also appends.
1078                                let count = self
1079                                    .catalog
1080                                    .scan_delete_matching_logged(table, |data| compiled(data))
1081                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1082                                self.view_registry.mark_dependents_dirty(table);
1083                                return Ok(QueryResult::Modified(count));
1084                            }
1085                        }
1086                    }
1087                } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1088                    if t == table {
1089                        // `delete from T` with no predicate — every live
1090                        // row matches. One pass is still the right shape.
1091                        // Mission B2: logged variant — see above.
1092                        let count = self
1093                            .catalog
1094                            .scan_delete_matching_logged(table, |_| true)
1095                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1096                        self.view_registry.mark_dependents_dirty(table);
1097                        return Ok(QueryResult::Modified(count));
1098                    }
1099                }
1100
1101                let matching_rids = self.collect_rids_for_mutation(input, table)?;
1102                let count = self
1103                    .catalog
1104                    .delete_many(table, &matching_rids)
1105                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1106                self.view_registry.mark_dependents_dirty(table);
1107                Ok(QueryResult::Modified(count))
1108            }
1109
1110            PlanNode::AliasScan { table, alias } => {
1111                // Mission E1.2: scan `table` and rename every output column
1112                // to `alias.field`. Used as a join leaf so downstream
1113                // NestedLoopJoin + Filter + Project nodes can resolve
1114                // `Expr::QualifiedField` lookups by direct column-name match.
1115                //
1116                // We don't bother with a fused zero-copy loop here yet — the
1117                // whole join path is nested-loop and correctness-first
1118                // (Phase E1.3 will introduce hash join and at that point we
1119                // can revisit whether to specialise AliasScan).
1120                let schema = self
1121                    .catalog
1122                    .schema(table)
1123                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1124                    .clone();
1125                let columns: Vec<String> = schema
1126                    .columns
1127                    .iter()
1128                    .map(|c| format!("{alias}.{}", c.name))
1129                    .collect();
1130                let rows: Vec<Vec<Value>> = self
1131                    .catalog
1132                    .scan(table)
1133                    .map_err(|e| QueryError::StorageError(e.to_string()))?
1134                    .map(|(_, row)| row)
1135                    .collect();
1136                Ok(QueryResult::Rows { columns, rows })
1137            }
1138
1139            PlanNode::NestedLoopJoin {
1140                left,
1141                right,
1142                on,
1143                kind,
1144            } => {
1145                // Materialise both sides. The executor ships two strategies:
1146                //   1. Hash join (E1.3) — when the `on` predicate is a
1147                //      simple equi-predicate `left_col = right_col`, build a
1148                //      FxHashMap<Value, Vec<row_idx>> over the right side
1149                //      and probe with the left side. O(L + R) instead of
1150                //      O(L × R). Handles Inner and LeftOuter.
1151                //   2. Nested loop (E1.2) — fallback for Cross, non-equi
1152                //      predicates, or `on` expressions that reference
1153                //      either side with something more complex than a
1154                //      QualifiedField.
1155                let left_result = self.execute_plan(left)?;
1156                let right_result = self.execute_plan(right)?;
1157                let (left_columns, left_rows) = match left_result {
1158                    QueryResult::Rows { columns, rows } => (columns, rows),
1159                    _ => return Err("join left side must produce rows".into()),
1160                };
1161                let (right_columns, right_rows) = match right_result {
1162                    QueryResult::Rows { columns, rows } => (columns, rows),
1163                    _ => return Err("join right side must produce rows".into()),
1164                };
1165
1166                // WS2: byte-budget guard on the join build side. Charge both
1167                // materialized inputs before we build the hash table / probe;
1168                // the output is row-capped by check_join_limit below.
1169                self.charge_rows(&left_rows)?;
1170                self.charge_rows(&right_rows)?;
1171
1172                // Hash-join fast path.
1173                if !matches!(kind, JoinKind::Cross) {
1174                    if let Some(pred) = on {
1175                        if let Some((l_idx, r_idx)) =
1176                            try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1177                        {
1178                            let result = hash_join(
1179                                left_columns,
1180                                left_rows,
1181                                right_columns,
1182                                right_rows,
1183                                l_idx,
1184                                r_idx,
1185                                *kind,
1186                            );
1187                            if let QueryResult::Rows { ref rows, .. } = result {
1188                                check_join_limit(rows.len())?;
1189                            }
1190                            return Ok(result);
1191                        }
1192                    }
1193                }
1194
1195                // Nested-loop fallback.
1196                let n_left = left_columns.len();
1197                let n_right = right_columns.len();
1198                let mut columns = Vec::with_capacity(n_left + n_right);
1199                columns.extend(left_columns);
1200                columns.extend(right_columns);
1201
1202                let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1203                let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1204
1205                for left_row in &left_rows {
1206                    let mut matched = false;
1207                    for right_row in &right_rows {
1208                        combined.clear();
1209                        combined.extend_from_slice(left_row);
1210                        combined.extend_from_slice(right_row);
1211                        let keep = match kind {
1212                            JoinKind::Cross => true,
1213                            JoinKind::Inner | JoinKind::LeftOuter => match on {
1214                                Some(pred) => eval_predicate(pred, &combined, &columns),
1215                                // Missing `on` for non-cross joins is a
1216                                // parser error, but if it slips through we
1217                                // treat it as "match everything".
1218                                None => true,
1219                            },
1220                            // RightOuter is rewritten to LeftOuter by the
1221                            // planner, so we never see it here.
1222                            JoinKind::RightOuter => {
1223                                unreachable!("planner rewrites RightOuter to LeftOuter")
1224                            }
1225                        };
1226                        if keep {
1227                            rows.push(combined.clone());
1228                            check_join_limit(rows.len())?;
1229                            matched = true;
1230                        }
1231                    }
1232                    if !matched && matches!(kind, JoinKind::LeftOuter) {
1233                        let mut row = Vec::with_capacity(n_left + n_right);
1234                        row.extend_from_slice(left_row);
1235                        row.resize(n_left + n_right, Value::Empty);
1236                        rows.push(row);
1237                        check_join_limit(rows.len())?;
1238                    }
1239                }
1240
1241                Ok(QueryResult::Rows { columns, rows })
1242            }
1243
1244            PlanNode::Distinct { input } => {
1245                let result = self.execute_plan(input)?;
1246                match result {
1247                    QueryResult::Rows { columns, rows } => {
1248                        let mut seen = std::collections::HashSet::new();
1249                        let mut unique_rows = Vec::new();
1250                        for row in rows {
1251                            if seen.insert(row.clone()) {
1252                                unique_rows.push(row);
1253                            }
1254                        }
1255                        Ok(QueryResult::Rows {
1256                            columns,
1257                            rows: unique_rows,
1258                        })
1259                    }
1260                    other => Ok(other),
1261                }
1262            }
1263
1264            PlanNode::GroupBy {
1265                input,
1266                keys,
1267                aggregates,
1268                having,
1269            } => {
1270                let result = self.execute_plan(input)?;
1271                match result {
1272                    QueryResult::Rows { columns, rows } => {
1273                        // WS2: byte-budget guard on the GROUP BY input buffer
1274                        // (the hash table is bounded by the input it groups).
1275                        self.charge_rows(&rows)?;
1276                        // Resolve key column indices.
1277                        let key_indices: Vec<usize> = keys
1278                            .iter()
1279                            .map(|k| {
1280                                columns
1281                                    .iter()
1282                                    .position(|c| c == k)
1283                                    .ok_or_else(|| format!("group-by column '{k}' not found"))
1284                            })
1285                            .collect::<Result<Vec<_>, _>>()?;
1286
1287                        // Resolve aggregate field indices. count(*) uses
1288                        // sentinel usize::MAX — compute_group_aggregate
1289                        // treats it as "count all rows in the group".
1290                        let agg_field_indices: Vec<usize> = aggregates
1291                            .iter()
1292                            .map(|a| {
1293                                if a.field == "*" {
1294                                    Ok(usize::MAX)
1295                                } else {
1296                                    columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1297                                        format!("aggregate column '{}' not found", a.field)
1298                                    })
1299                                }
1300                            })
1301                            .collect::<Result<Vec<_>, _>>()?;
1302
1303                        // Group rows by key values (preserving insertion order).
1304                        let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1305                            rustc_hash::FxHashMap::default();
1306                        let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1307                        for (ri, row) in rows.iter().enumerate() {
1308                            let key: Vec<Value> =
1309                                key_indices.iter().map(|&i| row[i].clone()).collect();
1310                            match group_map.get(&key) {
1311                                Some(&idx) => groups[idx].1.push(ri),
1312                                None => {
1313                                    let idx = groups.len();
1314                                    group_map.insert(key.clone(), idx);
1315                                    groups.push((key, vec![ri]));
1316                                }
1317                            }
1318                        }
1319
1320                        // Build output column names: keys ++ aggregate output names.
1321                        let mut out_columns: Vec<String> = keys.clone();
1322                        for agg in aggregates.iter() {
1323                            out_columns.push(agg.output_name.clone());
1324                        }
1325
1326                        // Compute aggregates per group.
1327                        let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1328                        for (key_vals, row_indices) in &groups {
1329                            let mut row = key_vals.clone();
1330                            for (ai, agg) in aggregates.iter().enumerate() {
1331                                let col_idx = agg_field_indices[ai];
1332                                let val = compute_group_aggregate(
1333                                    agg.function,
1334                                    &rows,
1335                                    row_indices,
1336                                    col_idx,
1337                                );
1338                                row.push(val);
1339                            }
1340                            out_rows.push(row);
1341                        }
1342
1343                        // Apply HAVING filter.
1344                        if let Some(having_expr) = having {
1345                            out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1346                        }
1347
1348                        Ok(QueryResult::Rows {
1349                            columns: out_columns,
1350                            rows: out_rows,
1351                        })
1352                    }
1353                    _ => Err("group by requires row input".into()),
1354                }
1355            }
1356
1357            PlanNode::CreateTable { name, fields } => {
1358                let columns: Vec<ColumnDef> = fields
1359                    .iter()
1360                    .enumerate()
1361                    .map(
1362                        |(i, (fname, tname, req))| -> Result<ColumnDef, QueryError> {
1363                            Ok(ColumnDef {
1364                                name: fname.clone(),
1365                                type_id: type_name_to_id(tname).map_err(QueryError::TypeError)?,
1366                                required: *req,
1367                                position: i as u16,
1368                            })
1369                        },
1370                    )
1371                    .collect::<Result<Vec<_>, _>>()?;
1372                let schema = Schema {
1373                    table_name: name.clone(),
1374                    columns,
1375                };
1376                self.catalog
1377                    .create_table(schema)
1378                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1379                Ok(QueryResult::Created(name.clone()))
1380            }
1381
1382            PlanNode::AlterTable { table, action } => match action {
1383                AlterAction::AddColumn {
1384                    name,
1385                    type_name,
1386                    required,
1387                } => {
1388                    let position = self
1389                        .catalog
1390                        .schema(table)
1391                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1392                        .columns
1393                        .len() as u16;
1394                    let col = ColumnDef {
1395                        name: name.clone(),
1396                        type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1397                        required: *required,
1398                        position,
1399                    };
1400                    self.catalog
1401                        .alter_table_add_column(table, col)
1402                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1403                    Ok(QueryResult::Executed {
1404                        message: format!("column '{name}' added to '{table}'"),
1405                    })
1406                }
1407                AlterAction::DropColumn { name } => {
1408                    self.catalog
1409                        .alter_table_drop_column(table, name)
1410                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1411                    Ok(QueryResult::Executed {
1412                        message: format!("column '{name}' dropped from '{table}'"),
1413                    })
1414                }
1415                AlterAction::AddIndex { column } => {
1416                    self.catalog
1417                        .create_index(table, column)
1418                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1419                    Ok(QueryResult::Executed {
1420                        message: format!("index on '{table}.{column}' created"),
1421                    })
1422                }
1423            },
1424
1425            PlanNode::DropTable { name } => {
1426                self.catalog
1427                    .drop_table(name)
1428                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1429                Ok(QueryResult::Executed {
1430                    message: format!("table '{name}' dropped"),
1431                })
1432            }
1433
1434            PlanNode::CreateView { name, query_text } => {
1435                self.create_view(name, query_text)?;
1436                Ok(QueryResult::Executed {
1437                    message: format!("materialized view '{name}' created"),
1438                })
1439            }
1440
1441            PlanNode::RefreshView { name } => {
1442                self.refresh_view(name)?;
1443                Ok(QueryResult::Executed {
1444                    message: format!("materialized view '{name}' refreshed"),
1445                })
1446            }
1447
1448            PlanNode::DropView { name } => {
1449                self.drop_view(name)?;
1450                Ok(QueryResult::Executed {
1451                    message: format!("materialized view '{name}' dropped"),
1452                })
1453            }
1454
1455            PlanNode::Window { input, windows } => {
1456                let result = self.execute_plan(input)?;
1457                execute_window(result, windows)
1458            }
1459
1460            PlanNode::Union { left, right, all } => {
1461                let left_result = self.execute_plan(left)?;
1462                let right_result = self.execute_plan(right)?;
1463                let (left_cols, left_rows) = match left_result {
1464                    QueryResult::Rows { columns, rows } => (columns, rows),
1465                    _ => return Err("UNION requires query results on left side".into()),
1466                };
1467                let (_, right_rows) = match right_result {
1468                    QueryResult::Rows { columns, rows } => (columns, rows),
1469                    _ => return Err("UNION requires query results on right side".into()),
1470                };
1471                let mut combined = left_rows;
1472                if *all {
1473                    // UNION ALL — just concatenate.
1474                    combined.extend(right_rows);
1475                } else {
1476                    // UNION — deduplicate using the same HashSet approach
1477                    // as DISTINCT. Value already implements Hash + Eq.
1478                    let mut seen = std::collections::HashSet::new();
1479                    for row in &combined {
1480                        seen.insert(row.clone());
1481                    }
1482                    for row in right_rows {
1483                        if seen.insert(row.clone()) {
1484                            combined.push(row);
1485                        }
1486                    }
1487                }
1488                Ok(QueryResult::Rows {
1489                    columns: left_cols,
1490                    rows: combined,
1491                })
1492            }
1493
1494            PlanNode::Explain { input } => {
1495                let text = format_plan_tree(input, 0);
1496                Ok(QueryResult::Rows {
1497                    columns: vec!["plan".to_string()],
1498                    rows: text
1499                        .lines()
1500                        .map(|line| vec![Value::Str(line.to_string())])
1501                        .collect(),
1502                })
1503            }
1504
1505            PlanNode::Begin => {
1506                if self.in_transaction {
1507                    return Err(QueryError::Execution(
1508                        "already in a transaction (nested transactions not supported)".into(),
1509                    ));
1510                }
1511                self.in_transaction = true;
1512                Ok(QueryResult::Executed {
1513                    message: "transaction started".to_string(),
1514                })
1515            }
1516
1517            PlanNode::Commit => {
1518                if !self.in_transaction {
1519                    return Err(QueryError::Execution(
1520                        "no active transaction to commit".into(),
1521                    ));
1522                }
1523                self.in_transaction = false;
1524                self.catalog
1525                    .sync_wal()
1526                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1527                Ok(QueryResult::Executed {
1528                    message: "transaction committed".to_string(),
1529                })
1530            }
1531
1532            PlanNode::Rollback => {
1533                if !self.in_transaction {
1534                    return Err(QueryError::Execution(
1535                        "no active transaction to roll back".into(),
1536                    ));
1537                }
1538                self.in_transaction = false;
1539                self.catalog
1540                    .rollback_to_last_sync()
1541                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1542                if let Ok(mut cache) = self.plan_cache.lock() {
1543                    cache.clear();
1544                }
1545                self.view_registry = ViewRegistry::open(self.catalog.data_dir())
1546                    .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
1547                Ok(QueryResult::Executed {
1548                    message: "transaction rolled back".to_string(),
1549                })
1550            }
1551
1552            PlanNode::IndexScan { table, column, key } => {
1553                let key_value = literal_to_value(key)?;
1554                let tbl = self
1555                    .catalog
1556                    .get_table(table)
1557                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1558                let columns: Vec<String> =
1559                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1560
1561                // Fast path: the table has a B-tree on this column.
1562                // Uses index_lookup_all to return ALL matching rows for
1563                // both unique and non-unique indexes.
1564                if tbl.has_index(column) {
1565                    let rids = tbl.index_lookup_all(column, &key_value);
1566                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1567                    for rid in rids {
1568                        if let Some(data) = tbl.heap.get(rid) {
1569                            rows.push(decode_row(&tbl.schema, &data));
1570                        }
1571                    }
1572                    return Ok(QueryResult::Rows { columns, rows });
1573                }
1574
1575                // Fallback: no index on this column. The planner emits IndexScan
1576                // eagerly (it has no visibility into which columns are indexed
1577                // at plan time), so here we must behave like SeqScan+Filter on
1578                // `.col = literal`: return *all* matching rows, not just the
1579                // first one. A non-indexed column isn't necessarily unique.
1580                // We compile the eq predicate once and stream without any
1581                // per-row decode for non-matching rows.
1582                let schema = &tbl.schema;
1583                let fast = FastLayout::new(schema);
1584                let synth_pred = Expr::BinaryOp(
1585                    Box::new(Expr::Field(column.clone())),
1586                    BinOp::Eq,
1587                    Box::new(key.clone()),
1588                );
1589                if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1590                    // Mission F: skip the first 4 Vec doublings.
1591                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1592                    self.catalog
1593                        .for_each_row_raw(table, |_rid, data| {
1594                            if compiled(data) {
1595                                rows.push(decode_row(schema, data));
1596                            }
1597                        })
1598                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1599                    return Ok(QueryResult::Rows { columns, rows });
1600                }
1601
1602                // Last resort: slow eq-check on materialised rows.
1603                let col_idx =
1604                    schema
1605                        .column_index(column)
1606                        .ok_or_else(|| QueryError::ColumnNotFound {
1607                            table: String::new(),
1608                            column: column.clone(),
1609                        })?;
1610                let rows: Vec<Vec<Value>> = tbl
1611                    .scan()
1612                    .filter_map(|(_, row)| {
1613                        if row[col_idx] == key_value {
1614                            Some(row)
1615                        } else {
1616                            None
1617                        }
1618                    })
1619                    .collect();
1620                Ok(QueryResult::Rows { columns, rows })
1621            }
1622
1623            PlanNode::RangeScan {
1624                table,
1625                column,
1626                start,
1627                end,
1628            } => {
1629                let tbl = self
1630                    .catalog
1631                    .get_table(table)
1632                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1633                let columns: Vec<String> =
1634                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1635                let schema = &tbl.schema;
1636
1637                let start_val = match start {
1638                    Some((expr, _)) => Some(literal_to_value(expr)?),
1639                    None => None,
1640                };
1641                let end_val = match end {
1642                    Some((expr, _)) => Some(literal_to_value(expr)?),
1643                    None => None,
1644                };
1645                let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1646                let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1647
1648                // Range scans only use the btree fast path for unique indexes,
1649                // because non-unique indexes store composite keys (column_value
1650                // + RowId) that don't directly compare against raw column values.
1651                if tbl.is_index_unique(column) == Some(true) {
1652                    if let Some(btree) = tbl.index(column) {
1653                        let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1654                            (Some(s), Some(e)) => btree.range(s, e).collect(),
1655                            (Some(s), None) => btree.range_from(s),
1656                            (None, Some(e)) => btree.range_to(e),
1657                            (None, None) => {
1658                                let rows: Vec<Vec<Value>> =
1659                                    tbl.scan().map(|(_, row)| row).collect();
1660                                return Ok(QueryResult::Rows { columns, rows });
1661                            }
1662                        };
1663                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1664                        for (key, rid) in hits {
1665                            if !start_inclusive {
1666                                if let Some(ref s) = start_val {
1667                                    if &key == s {
1668                                        continue;
1669                                    }
1670                                }
1671                            }
1672                            if !end_inclusive {
1673                                if let Some(ref e) = end_val {
1674                                    if &key == e {
1675                                        continue;
1676                                    }
1677                                }
1678                            }
1679                            if let Some(data) = tbl.heap.get(rid) {
1680                                rows.push(decode_row(schema, &data));
1681                            }
1682                        }
1683                        return Ok(QueryResult::Rows { columns, rows });
1684                    }
1685                }
1686
1687                // Fallback: no index — synthesize range predicate and scan.
1688                let fast = FastLayout::new(schema);
1689                let synth = synthesize_range_predicate(column, start, end);
1690                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1691                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1692                    self.catalog
1693                        .for_each_row_raw(table, |_rid, data| {
1694                            if compiled(data) {
1695                                rows.push(decode_row(schema, data));
1696                            }
1697                        })
1698                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1699                    return Ok(QueryResult::Rows { columns, rows });
1700                }
1701
1702                let col_idx =
1703                    schema
1704                        .column_index(column)
1705                        .ok_or_else(|| QueryError::ColumnNotFound {
1706                            table: String::new(),
1707                            column: column.clone(),
1708                        })?;
1709                let rows: Vec<Vec<Value>> = tbl
1710                    .scan()
1711                    .filter(|(_, row)| {
1712                        range_matches(
1713                            &row[col_idx],
1714                            &start_val,
1715                            start_inclusive,
1716                            &end_val,
1717                            end_inclusive,
1718                        )
1719                    })
1720                    .map(|(_, row)| row)
1721                    .collect();
1722                Ok(QueryResult::Rows { columns, rows })
1723            }
1724        }
1725    }
1726
1727    // ─── Materialized view operations ──────────────────────────────────────
1728
1729    /// Create a materialized view: execute the source query, store results
1730    /// in a new backing table, and register the view.
1731    fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1732        if self.view_registry.is_view(name) {
1733            return Err(QueryError::ViewError(format!(
1734                "materialized view '{name}' already exists"
1735            )));
1736        }
1737        // Execute the source query to get the result set.
1738        let result = self.execute_powql(query_text)?;
1739        let (columns, rows) = match result {
1740            QueryResult::Rows { columns, rows } => (columns, rows),
1741            _ => return Err("view source query must be a SELECT".into()),
1742        };
1743        // Derive a schema for the backing table from the query result columns.
1744        let schema = self.derive_view_schema(name, &columns, &rows);
1745        // Create the backing table and insert the result rows.
1746        self.catalog
1747            .create_table(schema)
1748            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1749        for row in &rows {
1750            self.catalog
1751                .insert(name, row)
1752                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1753        }
1754        // Determine which base tables this view depends on by parsing the query.
1755        let depends_on = self.extract_view_deps(query_text);
1756        self.view_registry
1757            .register(ViewDef {
1758                name: name.to_string(),
1759                query: query_text.to_string(),
1760                depends_on,
1761                dirty: false,
1762            })
1763            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1764        Ok(())
1765    }
1766
1767    /// Refresh a materialized view: re-execute its source query and replace
1768    /// the backing table's contents.
1769    fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1770        let def = self
1771            .view_registry
1772            .get(name)
1773            .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1774        let query_text = def.query.clone();
1775        // Execute the source query.
1776        let result = self.execute_powql(&query_text)?;
1777        let (_columns, rows) = match result {
1778            QueryResult::Rows { columns, rows } => (columns, rows),
1779            _ => return Err("view source query must be a SELECT".into()),
1780        };
1781        // Clear old data and insert fresh results. Mission B2: logged
1782        // variant — view refreshes are a mutation and crash recovery
1783        // must see them.
1784        self.catalog
1785            .scan_delete_matching_logged(name, |_| true)
1786            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1787        for row in &rows {
1788            self.catalog
1789                .insert(name, row)
1790                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1791        }
1792        self.view_registry.mark_clean(name);
1793        Ok(())
1794    }
1795
1796    /// Drop a materialized view: remove the backing table and unregister.
1797    fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1798        if !self.view_registry.is_view(name) {
1799            return Err(QueryError::ViewError(format!(
1800                "materialized view '{name}' not found"
1801            )));
1802        }
1803        self.view_registry
1804            .unregister(name)
1805            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1806        self.catalog
1807            .drop_table(name)
1808            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1809        Ok(())
1810    }
1811
1812    /// Derive a storage `Schema` for a view's backing table from query
1813    /// result column names and the first row's types.
1814    fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1815        use powdb_storage::types::{ColumnDef, TypeId};
1816        let cols: Vec<ColumnDef> = columns
1817            .iter()
1818            .enumerate()
1819            .map(|(i, col_name)| {
1820                let type_id = rows
1821                    .first()
1822                    .and_then(|row| row.get(i))
1823                    .map(|v| v.type_id())
1824                    .unwrap_or(TypeId::Str);
1825                ColumnDef {
1826                    name: col_name.clone(),
1827                    type_id,
1828                    required: false,
1829                    position: i as u16,
1830                }
1831            })
1832            .collect();
1833        Schema {
1834            table_name: name.to_string(),
1835            columns: cols,
1836        }
1837    }
1838
1839    /// Extract base table dependencies from a view's source query by
1840    /// parsing it and collecting the source table name.
1841    fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1842        use crate::parser::parse;
1843        match parse(query_text) {
1844            Ok(Statement::Query(q)) => {
1845                let mut deps = vec![q.source.clone()];
1846                for j in &q.joins {
1847                    deps.push(j.source.clone());
1848                }
1849                deps
1850            }
1851            _ => Vec::new(),
1852        }
1853    }
1854
1855    // ─── Specialized fast paths ─────────────────────────────────────────────
1856    //
1857    // These methods are helpers for the `execute_plan` match arms above.
1858    // Each returns `Ok(Some(result))` when the fast path fires, `Ok(None)`
1859    // when the shape isn't supported (caller falls back to generic code).
1860
    /// Aggregate over a single fixed-size numeric (Int or Float) column,
    /// with an optional compiled filter predicate. Handles sum, avg, min,
    /// max, count and count-distinct. Walks raw row bytes — no per-row
    /// `Vec<Value>` decoding (count-distinct still grows its hash set).
    ///
    /// Returns `Ok(None)` — caller falls back to the generic path — when the
    /// column is unknown, is not Int/Float, has no fixed byte offset in the
    /// `FastLayout`, or when the predicate cannot be compiled.
    ///
    /// Result values:
    /// - Int `Sum` accumulates in i128 and saturates (clamps) to the i64
    ///   range rather than wrapping; over zero rows it yields `Int(0)`.
    /// - Float `Sum` uses a plain f64 accumulator; over zero rows it yields
    ///   `Float(0.0)`.
    /// - `Avg` always yields a `Float`; `Avg`/`Min`/`Max` over zero rows
    ///   yield `Value::Empty`.
    /// - `Count`/`CountDistinct` yield `Int`.
    ///
    /// NOTE(review): the loop macros are given the column's null-bitmap
    /// position, presumably so null values are skipped (making `Count` a
    /// count of non-null values) — confirm against `agg_int_loop!` /
    /// `agg_float_loop!`.
    pub(super) fn agg_single_col_fast(
        &self,
        table: &str,
        col: &str,
        function: AggFunc,
        predicate: Option<&Expr>,
    ) -> Result<Option<QueryResult>, QueryError> {
        let schema = self
            .catalog
            .schema(table)
            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
            .clone();
        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
        let col_idx = match schema.column_index(col) {
            Some(i) => i,
            None => return Ok(None),
        };
        // Only fast-path fixed-size numeric columns (Int/Float) for
        // sum/avg/min/max/count/count-distinct. Mission D10: Float parity —
        // prior version bailed on Float columns, forcing them through the
        // generic row-decoding path that allocated a Vec<Value> per row and
        // dispatched on Value::cmp for every compare. f64 decode is
        // structurally the same as i64 (load 8 bytes, cast), so the fast
        // path handles both.
        let col_type = schema.columns[col_idx].type_id;
        if col_type != TypeId::Int && col_type != TypeId::Float {
            return Ok(None);
        }

        let fast = FastLayout::new(&schema);
        // Mission C Phase 20b: inline the numeric-column reader instead of
        // building a `Box<dyn Fn>`. Eliminates 100K vtable dispatches per
        // 100K-row agg scan — every reader call folds directly into the
        // hot loop below.
        let byte_offset = match fast.fixed_offsets[col_idx] {
            Some(o) => o,
            None => return Ok(None),
        };
        // Row layout as addressed here: 2 leading bytes, then the null
        // bitmap (one bit per column), then the fixed-size column data.
        let bitmap_byte = col_idx / 8;
        let bitmap_bit = (col_idx % 8) as u32;
        let data_offset = 2 + fast.bitmap_size + byte_offset;

        // Optional compiled filter.
        let compiled_pred: Option<CompiledPredicate> = match predicate {
            Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
                Some(c) => Some(c),
                None => return Ok(None), // let generic path handle it
            },
            None => None,
        };

        // Mission C Phase 20b: specialize the inner loop per aggregate
        // function. The previous version ran a `match function { ... }`
        // *inside* the closure, which kept LLVM from producing optimal
        // scalar code for each variant (agg_max regressed ~23% vs the
        // baseline Box<dyn Fn> version even though per-row vtable cost
        // should have been strictly lower). Pushing the match out of the
        // hot loop lets each specialized body fold cleanly into
        // `for_each_row_raw` and removes a captured `AggFunc` + match
        // dispatch per row.
        //
        // Mission D10: same specialisation applies to the Float branch.
        // For Min/Max we use `f64::total_cmp` so the result matches
        // `Value::Ord` — this is the same ordering ORDER BY and the
        // top-N sort fast path use, keeping semantics consistent across
        // read paths (a positive NaN compares greatest, a NaN with the
        // sign bit set compares least, and -0.0 < +0.0 for deterministic
        // tie-breaking).
        //
        // Mission D11 Phase 1: each inner loop now splits on presence of
        // a predicate (`if let Some(pred) = &compiled_pred`) so the hot
        // body never re-tests `Option` per row, and reads column bytes
        // via `read_i64_unchecked` / `read_f64_unchecked` helpers that
        // drop two bounds checks per row (null bitmap byte + value
        // slice). Safety is carried by the `FastLayout` invariant that
        // `data_offset + 8 <= row_len` for any fixed-size column; see
        // the helper doc comments. Hot loops are macro-generated so the
        // with-pred / no-pred split can't drift between variants.
        let result = match col_type {
            TypeId::Int => match function {
                AggFunc::Sum | AggFunc::Avg => {
                    let mut sum_i128: i128 = 0;
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            count += 1;
                            sum_i128 += v as i128;
                        }
                    );
                    if matches!(function, AggFunc::Sum) {
                        // Saturate rather than wrap when the exact i128 sum
                        // falls outside the i64 range.
                        let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
                        QueryResult::Scalar(Value::Int(clamped))
                    } else if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        let avg = (sum_i128 as f64) / (count as f64);
                        QueryResult::Scalar(Value::Float(avg))
                    }
                }
                AggFunc::Min => {
                    let mut min_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            min_v = Some(match min_v {
                                Some(m) => m.min(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    let mut max_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            max_v = Some(match max_v {
                                Some(m) => m.max(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |_v: i64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            seen.insert(v);
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            TypeId::Float => match function {
                AggFunc::Sum => {
                    // Use a single f64 accumulator. Naive summation is
                    // sufficient for MVP parity; if precision becomes an
                    // issue on long scans we can upgrade to Kahan–Neumaier
                    // compensated sum (~2x scalar cost, zero error growth).
                    let mut sum: f64 = 0.0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            sum += v;
                        }
                    );
                    QueryResult::Scalar(Value::Float(sum))
                }
                AggFunc::Avg => {
                    let mut sum: f64 = 0.0;
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            sum += v;
                            count += 1;
                        }
                    );
                    if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        QueryResult::Scalar(Value::Float(sum / count as f64))
                    }
                }
                AggFunc::Min => {
                    // `total_cmp` for deterministic NaN handling (matches
                    // Value::Ord). A positive NaN compares greatest, so Min
                    // ignores it in favour of any other value; a NaN with
                    // the sign bit set compares *least* and would be the
                    // Min result.
                    let mut min_v: Option<f64> = None;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            min_v = Some(match min_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_lt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    // Same `total_cmp` ordering as Min: a positive NaN wins
                    // Max; the first of several equal values is kept.
                    let mut max_v: Option<f64> = None;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            max_v = Some(match max_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_gt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |_v: f64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    // Hash on `f64::to_bits` — matches `Value::Hash`, so
                    // distinct NaN bit patterns count as distinct and
                    // -0.0/+0.0 count as distinct. Consistent with how
                    // Float values are hashed in every other DISTINCT /
                    // GROUP BY path.
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            seen.insert(v.to_bits());
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            _ => unreachable!("type guard above restricts to Int/Float"),
        };
        Ok(Some(result))
    }
2166
2167    /// `Project(Limit(Filter(SeqScan)))` and `Project(Limit(SeqScan))`.
2168    /// Streams rows, decodes only projected columns, stops at the limit.
2169    pub(super) fn project_filter_limit_fast(
2170        &self,
2171        table: &str,
2172        fields: &[ProjectField],
2173        limit: usize,
2174        predicate: Option<&Expr>,
2175    ) -> Result<Option<QueryResult>, QueryError> {
2176        let schema = self
2177            .catalog
2178            .schema(table)
2179            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2180            .clone();
2181        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2182
2183        // Each projection field must be a simple `.field` reference for this
2184        // fast path. Aliased or computed fields fall through.
2185        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2186        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2187        for f in fields {
2188            let name = match &f.expr {
2189                Expr::Field(n) => n.clone(),
2190                _ => return Ok(None),
2191            };
2192            let idx = match all_columns.iter().position(|c| c == &name) {
2193                Some(i) => i,
2194                None => return Ok(None),
2195            };
2196            proj_indices.push(idx);
2197            proj_columns.push(f.alias.clone().unwrap_or(name));
2198        }
2199
2200        let fast = FastLayout::new(&schema);
2201        let row_layout = RowLayout::new(&schema);
2202
2203        let compiled_pred: Option<CompiledPredicate> = match predicate {
2204            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2205                Some(c) => Some(c),
2206                None => return Ok(None),
2207            },
2208            None => None,
2209        };
2210
2211        let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2212        // Mission D2: use try_for_each_row_raw to actually stop iterating
2213        // once the limit is reached. The previous `done` flag only short-
2214        // circuited the closure body, so a `limit 100` over 100K rows still
2215        // walked all 100K slots — burning ~30x SQLite on scan_filter_project_top100.
2216        self.catalog
2217            .try_for_each_row_raw(table, |_rid, data| {
2218                use std::ops::ControlFlow;
2219                if let Some(ref pred) = compiled_pred {
2220                    if !pred(data) {
2221                        return ControlFlow::Continue(());
2222                    }
2223                }
2224                let row: Vec<Value> = proj_indices
2225                    .iter()
2226                    .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2227                    .collect();
2228                out.push(row);
2229                if out.len() >= limit {
2230                    ControlFlow::Break(())
2231                } else {
2232                    ControlFlow::Continue(())
2233                }
2234            })
2235            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2236
2237        Ok(Some(QueryResult::Rows {
2238            columns: proj_columns,
2239            rows: out,
2240        }))
2241    }
2242
2243    /// `Project(Limit(Sort(Filter(SeqScan))))` and `Project(Limit(Sort(SeqScan)))`.
2244    /// Bounded top-N heap over the sort key. Only the sort key needs to be
2245    /// read per row; projected columns are decoded only for the final
2246    /// winning rows when the heap drains.
2247    pub(super) fn project_filter_sort_limit_fast(
2248        &self,
2249        table: &str,
2250        fields: &[ProjectField],
2251        sort_field: &str,
2252        descending: bool,
2253        limit: usize,
2254        predicate: Option<&Expr>,
2255    ) -> Result<Option<QueryResult>, QueryError> {
2256        if limit == 0 {
2257            // Degenerate case — empty result. Let the generic path handle it
2258            // for proper column naming.
2259            return Ok(None);
2260        }
2261        let schema = self
2262            .catalog
2263            .schema(table)
2264            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2265            .clone();
2266        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2267
2268        // Sort key must be a fixed-size numeric column (Int or Float).
2269        // Mission D10: extended from Int-only. Float sort keys use a
2270        // sortable-u64 transform (see `f64_to_sortable_u64`) so the heap
2271        // path stays keyed on `u64` and the whole branch shape is
2272        // identical to the Int case — no new heap types, no `total_cmp`
2273        // closures in the hot loop.
2274        let sort_idx = match schema.column_index(sort_field) {
2275            Some(i) => i,
2276            None => return Ok(None),
2277        };
2278        let sort_col_type = schema.columns[sort_idx].type_id;
2279        if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2280            return Ok(None);
2281        }
2282
2283        // Each projection field must be a simple `.field`.
2284        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2285        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2286        for f in fields {
2287            let name = match &f.expr {
2288                Expr::Field(n) => n.clone(),
2289                _ => return Ok(None),
2290            };
2291            let idx = match all_columns.iter().position(|c| c == &name) {
2292                Some(i) => i,
2293                None => return Ok(None),
2294            };
2295            proj_indices.push(idx);
2296            proj_columns.push(f.alias.clone().unwrap_or(name));
2297        }
2298
2299        let fast = FastLayout::new(&schema);
2300        let row_layout = RowLayout::new(&schema);
2301        // Mission C Phase 20b: inline numeric-column reader (no Box<dyn Fn>).
2302        let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2303            Some(o) => o,
2304            None => return Ok(None),
2305        };
2306        let sort_bitmap_byte = sort_idx / 8;
2307        let sort_bitmap_bit = (sort_idx % 8) as u32;
2308        let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2309
2310        let compiled_pred: Option<CompiledPredicate> = match predicate {
2311            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2312                Some(c) => Some(c),
2313                None => return Ok(None),
2314            },
2315            None => None,
2316        };
2317
2318        // Bounded top-N heap. For `order .x desc limit N`, we want the N
2319        // largest values — use a min-heap so the smallest is at the top and
2320        // can be popped when a better candidate arrives. For ascending, use
2321        // a max-heap. We tie-break with a monotonic `seq` counter so the
2322        // result is deterministic and stable.
2323        //
2324        // To keep this simple we maintain two typed heaps and pick by
2325        // direction.
2326        let drained: Vec<Vec<u8>> = match sort_col_type {
2327            TypeId::Int => {
2328                let mut seq: u64 = 0;
2329                let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2330                    BinaryHeap::with_capacity(limit);
2331                let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2332                    BinaryHeap::with_capacity(limit);
2333
2334                self.catalog
2335                    .for_each_row_raw(table, |_rid, data| {
2336                        if let Some(ref pred) = compiled_pred {
2337                            if !pred(data) {
2338                                return;
2339                            }
2340                        }
2341                        // Inlined int-column reader: null check + i64 decode.
2342                        if data.len() < sort_data_offset + 8 {
2343                            return;
2344                        }
2345                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2346                        if is_null {
2347                            return;
2348                        }
2349                        let key = i64::from_le_bytes(
2350                            data[sort_data_offset..sort_data_offset + 8]
2351                                .try_into()
2352                                .unwrap_or_else(|_| unreachable!()),
2353                        );
2354                        let id = seq;
2355                        seq += 1;
2356
2357                        if descending {
2358                            if heap_desc.len() < limit {
2359                                heap_desc.push(Reverse((key, id, data.to_vec())));
2360                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2361                                if key > *top_key {
2362                                    heap_desc.pop();
2363                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2364                                }
2365                            }
2366                        } else if heap_asc.len() < limit {
2367                            heap_asc.push((key, id, data.to_vec()));
2368                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2369                            if key < *top_key {
2370                                heap_asc.pop();
2371                                heap_asc.push((key, id, data.to_vec()));
2372                            }
2373                        }
2374                    })
2375                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2376
2377                let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2378                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2379                } else {
2380                    heap_asc.into_iter().collect()
2381                };
2382                if descending {
2383                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2384                } else {
2385                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2386                }
2387                drained.into_iter().map(|(_, _, d)| d).collect()
2388            }
2389            TypeId::Float => {
2390                // Novel angle: rather than introducing a `TotalF64` newtype
2391                // with `Ord via total_cmp`, transform the f64 bit pattern
2392                // into a sortable `u64` so `BinaryHeap<u64>` orders exactly
2393                // like `f64::total_cmp` would. Classic trick: flip the sign
2394                // bit on positives, flip all bits on negatives. Result:
2395                // - NaN (sign=0) stays greatest, matching total_cmp
2396                // - -0.0 sorts before +0.0, matching total_cmp
2397                // - Hot loop is branch-cheap (one compare + one xor)
2398                let mut seq: u64 = 0;
2399                let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2400                    BinaryHeap::with_capacity(limit);
2401                let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2402                    BinaryHeap::with_capacity(limit);
2403
2404                self.catalog
2405                    .for_each_row_raw(table, |_rid, data| {
2406                        if let Some(ref pred) = compiled_pred {
2407                            if !pred(data) {
2408                                return;
2409                            }
2410                        }
2411                        if data.len() < sort_data_offset + 8 {
2412                            return;
2413                        }
2414                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2415                        if is_null {
2416                            return;
2417                        }
2418                        let bits = u64::from_le_bytes(
2419                            data[sort_data_offset..sort_data_offset + 8]
2420                                .try_into()
2421                                .unwrap_or_else(|_| unreachable!()),
2422                        );
2423                        let key = f64_bits_to_sortable_u64(bits);
2424                        let id = seq;
2425                        seq += 1;
2426
2427                        if descending {
2428                            if heap_desc.len() < limit {
2429                                heap_desc.push(Reverse((key, id, data.to_vec())));
2430                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2431                                if key > *top_key {
2432                                    heap_desc.pop();
2433                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2434                                }
2435                            }
2436                        } else if heap_asc.len() < limit {
2437                            heap_asc.push((key, id, data.to_vec()));
2438                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2439                            if key < *top_key {
2440                                heap_asc.pop();
2441                                heap_asc.push((key, id, data.to_vec()));
2442                            }
2443                        }
2444                    })
2445                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2446
2447                let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2448                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2449                } else {
2450                    heap_asc.into_iter().collect()
2451                };
2452                if descending {
2453                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2454                } else {
2455                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2456                }
2457                drained.into_iter().map(|(_, _, d)| d).collect()
2458            }
2459            _ => unreachable!("type guard above restricts to Int/Float"),
2460        };
2461
2462        let rows: Vec<Vec<Value>> = drained
2463            .into_iter()
2464            .map(|data| {
2465                proj_indices
2466                    .iter()
2467                    .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2468                    .collect()
2469            })
2470            .collect();
2471
2472        Ok(Some(QueryResult::Rows {
2473            columns: proj_columns,
2474            rows,
2475        }))
2476    }
2477
2478    /// Gather the RowIds that a mutation should operate on, without
2479    /// materialising the full row set. Handles the shapes the planner emits
2480    /// for update/delete: SeqScan, IndexScan, and Filter(SeqScan). Other
2481    /// shapes fall back to `generic_rid_match`.
2482    ///
2483    /// Perf sprint: try to fuse the predicate evaluation and in-place
2484    /// byte-level mutation into a single heap walk. Returns `Some(result)`
2485    /// if the fused path fired, `None` to fall through to the generic
2486    /// two-pass code.
2487    ///
2488    /// Covers two shapes:
2489    /// 1. Fixed-width non-null literal assignments on non-indexed columns
2490    ///    → byte-patch every matched row in place (row length unchanged).
2491    /// 2. Single var-col literal assignment on a non-indexed column
2492    ///    → `patch_var_column_in_place` on every matched row (may shrink);
2493    ///    rows that can't be patched in place are collected for fallback.
2494    fn try_fused_scan_update(
2495        &mut self,
2496        table: &str,
2497        predicate: &Expr,
2498        resolved: &[(usize, Value)],
2499        changed_cols: &[usize],
2500    ) -> Option<Result<QueryResult, QueryError>> {
2501        // Build compiled predicate. Requires a schema borrow that must be
2502        // dropped before we call scan_patch_matching_logged.
2503        let compiled = {
2504            let schema = self.catalog.schema(table)?;
2505            let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2506            let fast = FastLayout::new(schema);
2507            compile_predicate(predicate, &columns, &fast, schema)?
2508        };
2509
2510        // ── Path 1: fixed-width fast patch ──────────────────────────
2511        let fixed_patches: Option<Vec<FastPatch>> = {
2512            let tbl = self.catalog.get_table(table)?;
2513            let schema = &tbl.schema;
2514            let all_fixed_nonnull = resolved
2515                .iter()
2516                .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2517            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2518            if all_fixed_nonnull && no_indexed {
2519                let layout = RowLayout::new(schema);
2520                let bitmap_size = layout.bitmap_size();
2521                Some(
2522                    resolved
2523                        .iter()
2524                        .map(|(idx, val)| {
2525                            let fixed_off = layout
2526                                .fixed_offset(*idx)
2527                                .expect("is_fixed_size already checked");
2528                            let field_off = 2 + bitmap_size + fixed_off;
2529                            let bytes: FixedBytes = match val {
2530                                Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2531                                Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2532                                Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2533                                Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2534                                Value::Uuid(v) => FixedBytes::Uuid(*v),
2535                                _ => unreachable!("all_fixed_nonnull guard"),
2536                            };
2537                            FastPatch {
2538                                field_off,
2539                                bitmap_byte_off: 2 + idx / 8,
2540                                bit_mask: 1u8 << (idx % 8),
2541                                bytes,
2542                            }
2543                        })
2544                        .collect(),
2545                )
2546            } else {
2547                None
2548            }
2549        };
2550        if let Some(patches) = fixed_patches {
2551            let result = self
2552                .catalog
2553                .scan_patch_matching_logged(table, compiled, |row| {
2554                    for p in &patches {
2555                        row[p.bitmap_byte_off] &= !p.bit_mask;
2556                        let field_bytes = p.bytes.as_slice();
2557                        row[p.field_off..p.field_off + field_bytes.len()]
2558                            .copy_from_slice(field_bytes);
2559                    }
2560                    Some(row.len() as u16)
2561                })
2562                .map_err(|e| e.to_string());
2563            match result {
2564                Ok((count, _)) => {
2565                    self.view_registry.mark_dependents_dirty(table);
2566                    return Some(Ok(QueryResult::Modified(count)));
2567                }
2568                Err(e) => return Some(Err(QueryError::Execution(e))),
2569            }
2570        }
2571
2572        // ── Path 2: single var-col shrink fast patch ────────────────
2573        let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2574            let tbl = self.catalog.get_table(table)?;
2575            let schema = &tbl.schema;
2576            let is_single = resolved.len() == 1;
2577            let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2578            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2579            if is_single && is_var && no_indexed {
2580                let (idx, val) = &resolved[0];
2581                let bytes_opt = match val {
2582                    Value::Str(s) => Some(s.as_bytes().to_vec()),
2583                    Value::Bytes(b) => Some(b.clone()),
2584                    Value::Empty => None,
2585                    _ => return None, // type mismatch, fall through
2586                };
2587                Some((*idx, bytes_opt))
2588            } else {
2589                None
2590            }
2591        };
2592        if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2593            // Build a fresh RowLayout before the mutable borrow.
2594            let layout = {
2595                let schema = self.catalog.schema(table)?;
2596                RowLayout::new(schema)
2597            };
2598            let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2599            let result = self
2600                .catalog
2601                .scan_patch_matching_logged(table, compiled, |row| {
2602                    patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2603                })
2604                .map_err(|e| e.to_string());
2605            match result {
2606                Ok((mut count, fallback_rids)) => {
2607                    // Handle rows where in-place patch failed (new > old).
2608                    for rid in fallback_rids {
2609                        let mut row = match self.catalog.get(table, rid) {
2610                            Some(r) => r,
2611                            None => continue,
2612                        };
2613                        for (idx, val) in resolved.iter() {
2614                            row[*idx] = val.clone();
2615                        }
2616                        self.catalog
2617                            .update_hinted(table, rid, &row, Some(changed_cols))
2618                            .map_err(|e| e.to_string())
2619                            .ok();
2620                        count += 1;
2621                    }
2622                    self.view_registry.mark_dependents_dirty(table);
2623                    return Some(Ok(QueryResult::Modified(count)));
2624                }
2625                Err(e) => return Some(Err(QueryError::Execution(e))),
2626            }
2627        }
2628
2629        None // no fused path applicable — fall through
2630    }
2631
2632    /// Mission C Phase 3: schema is looked up via `self.catalog.schema(table)`
2633    /// inside the branches that actually need it. Previously the caller had
2634    /// to clone the full Schema (6+ String allocs) before every mutation just
2635    /// so this function could borrow it — a cost the update/delete hot path
2636    /// did not need.
2637    fn collect_rids_for_mutation(
2638        &mut self,
2639        input: &PlanNode,
2640        table: &str,
2641    ) -> Result<Vec<RowId>, QueryError> {
2642        match input {
2643            PlanNode::SeqScan { table: t } if t == table => {
2644                // "Update/delete everything" — rare but legal.
2645                let rids: Vec<RowId> = self
2646                    .catalog
2647                    .scan(table)
2648                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2649                    .map(|(rid, _)| rid)
2650                    .collect();
2651                Ok(rids)
2652            }
2653            PlanNode::IndexScan {
2654                table: t,
2655                column,
2656                key,
2657            } if t == table => {
2658                let key_value = literal_to_value(key)?;
2659
2660                // Indexed case: single lookup, 0 or 1 rows.
2661                // Mission D7: int-specialized fast path on int-keyed indexes
2662                // (primary keys, created_at, etc.) — the common case for
2663                // `update_by_pk` / `delete where id = ?`.
2664                //
2665                // Scope the `tbl` borrow so it's released before we fall
2666                // through to the scan-based paths below (which reborrow
2667                // `self.catalog`).
2668                {
2669                    let tbl = self
2670                        .catalog
2671                        .get_table(table)
2672                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2673                    if tbl.has_index(column) {
2674                        let rids = tbl.index_lookup_all(column, &key_value);
2675                        return Ok(rids);
2676                    }
2677                }
2678
2679                // No index: the planner folds `.col = literal` to IndexScan
2680                // regardless of whether the column is actually unique. When
2681                // there's no index we must behave like Filter(SeqScan) and
2682                // return *all* matching RIDs — not just the first one.
2683                let schema = self
2684                    .catalog
2685                    .schema(table)
2686                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2687                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2688                let fast = FastLayout::new(schema);
2689                let synth = Expr::BinaryOp(
2690                    Box::new(Expr::Field(column.clone())),
2691                    BinOp::Eq,
2692                    Box::new(key.clone()),
2693                );
2694                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2695                    // Mission F: skip the first 4 Vec doublings.
2696                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2697                    self.catalog
2698                        .for_each_row_raw(table, |rid, data| {
2699                            if compiled(data) {
2700                                rids.push(rid);
2701                            }
2702                        })
2703                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2704                    return Ok(rids);
2705                }
2706
2707                // Fallback: decode each row, compare values.
2708                let col_idx =
2709                    schema
2710                        .column_index(column)
2711                        .ok_or_else(|| QueryError::ColumnNotFound {
2712                            table: String::new(),
2713                            column: column.clone(),
2714                        })?;
2715                let rids: Vec<RowId> = self
2716                    .catalog
2717                    .scan(table)
2718                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2719                    .filter_map(|(rid, row)| {
2720                        if row[col_idx] == key_value {
2721                            Some(rid)
2722                        } else {
2723                            None
2724                        }
2725                    })
2726                    .collect();
2727                Ok(rids)
2728            }
2729            PlanNode::Filter {
2730                input: inner,
2731                predicate,
2732            } => {
2733                if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2734                    if t != table {
2735                        return self.generic_rid_match(input, table);
2736                    }
2737                    let schema = self
2738                        .catalog
2739                        .schema(table)
2740                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2741                    let columns: Vec<String> =
2742                        schema.columns.iter().map(|c| c.name.clone()).collect();
2743                    let fast = FastLayout::new(schema);
2744                    let row_layout = RowLayout::new(schema);
2745
2746                    // Try compiled predicate first.
2747                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2748                        // Mission F: skip the first 4 Vec doublings.
2749                        let mut rids: Vec<RowId> = Vec::with_capacity(64);
2750                        self.catalog
2751                            .for_each_row_raw(table, |rid, data| {
2752                                if compiled(data) {
2753                                    rids.push(rid);
2754                                }
2755                            })
2756                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2757                        return Ok(rids);
2758                    }
2759
2760                    // Fallback: selective decode + eval.
2761                    let pred_cols = predicate_column_indices(predicate, &columns);
2762                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2763                    self.catalog
2764                        .for_each_row_raw(table, |rid, data| {
2765                            let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2766                            if eval_predicate(predicate, &pred_row, &columns) {
2767                                rids.push(rid);
2768                            }
2769                        })
2770                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2771                    return Ok(rids);
2772                }
2773                self.generic_rid_match(input, table)
2774            }
2775            _ => self.generic_rid_match(input, table),
2776        }
2777    }
2778
2779    /// Last-ditch generic match: execute the plan, collect matching rows,
2780    /// then find corresponding RowIds by value equality. This is the old
2781    /// O(N*M) code path; only used when the plan shape is something exotic.
2782    fn generic_rid_match(
2783        &mut self,
2784        input: &PlanNode,
2785        table: &str,
2786    ) -> Result<Vec<RowId>, QueryError> {
2787        let result = self.execute_plan(input)?;
2788        let rows = match result {
2789            QueryResult::Rows { rows, .. } => rows,
2790            _ => return Err("mutation source must be rows".into()),
2791        };
2792        let matching: Vec<RowId> = self
2793            .catalog
2794            .scan(table)
2795            .map_err(|e| QueryError::StorageError(e.to_string()))?
2796            .filter(|(_, row)| rows.iter().any(|r| r == row))
2797            .map(|(rid, _)| rid)
2798            .collect();
2799        Ok(matching)
2800    }
2801}
2802
/// Evaluates every window definition in `windows` over the rows of `result`
/// and appends one output column per definition, named `wdef.output_name`.
///
/// For each window, in order:
/// * `partition_by`, `order_by` and the first argument are resolved against
///   the *current* column list. A later window can therefore reference a
///   column appended by an earlier one.
/// * Row positions are sorted by partition keys, then by order keys
///   (honouring `descending`). The `rows` vector itself keeps its original
///   order.
/// * Values are computed while walking that sorted order:
///   - `RowNumber` is the 1-based position within the partition.
///   - `Rank` and `DenseRank` treat rows with equal order keys as ties.
///   - The aggregates (`Sum`, `Avg`, `Count`, `Min`, `Max`) produce a running
///     value that accumulates row by row from the start of the partition.
///     Rows with equal order keys are not merged for these.
///
/// # Errors
/// Returns an error if `result` is not `QueryResult::Rows`, or if a
/// partition, order, or argument column name cannot be found.
pub(super) fn execute_window(
    result: QueryResult,
    windows: &[WindowDef],
) -> Result<QueryResult, QueryError> {
    let (mut columns, mut rows) = match result {
        QueryResult::Rows { columns, rows } => (columns, rows),
        _ => return Err("window function requires row input".into()),
    };

    for wdef in windows {
        // Resolve partition/order column indices against current columns.
        let part_indices: Vec<usize> = wdef
            .partition_by
            .iter()
            .map(|name| {
                columns
                    .iter()
                    .position(|c| c == name)
                    .ok_or_else(|| format!("window partition column '{name}' not found"))
            })
            .collect::<Result<Vec<_>, _>>()?;

        let ord_indices: Vec<(usize, bool)> = wdef
            .order_by
            .iter()
            .map(|sk| {
                columns
                    .iter()
                    .position(|c| c == &sk.field)
                    .map(|i| (i, sk.descending))
                    .ok_or_else(|| format!("window order column '{}' not found", sk.field))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Resolve the argument column index (for aggregate windows).
        // Only a plain field argument names a column; any other argument
        // expression leaves `arg_col_idx` as None.
        let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
            match arg {
                Expr::Field(name) => {
                    if name == "*" {
                        None // count(*) style — no specific column
                    } else {
                        Some(
                            columns
                                .iter()
                                .position(|c| c == name)
                                .ok_or_else(|| format!("window arg column '{name}' not found"))?,
                        )
                    }
                }
                _ => None,
            }
        } else {
            None
        };

        // Build a sort-index to sort rows by partition_by then order_by
        // without actually reordering the original Vec (we need original
        // order to write results back).
        // `sort_by` is stable, so rows tying on every key keep input order.
        let n = rows.len();
        let mut indices: Vec<usize> = (0..n).collect();
        indices.sort_by(|&a, &b| {
            // Compare partition keys first.
            for &pi in &part_indices {
                let cmp = rows[a][pi].cmp(&rows[b][pi]);
                if cmp != std::cmp::Ordering::Equal {
                    return cmp;
                }
            }
            // Then order keys.
            for &(oi, desc) in &ord_indices {
                let cmp = rows[a][oi].cmp(&rows[b][oi]);
                if cmp != std::cmp::Ordering::Equal {
                    return if desc { cmp.reverse() } else { cmp };
                }
            }
            std::cmp::Ordering::Equal
        });

        // Compute window values in sorted order, tracking partition boundaries.
        // `win_values` is indexed by the row's ORIGINAL position.
        let mut win_values: Vec<Value> = vec![Value::Empty; n];
        let mut partition_start = 0usize;
        // Running state for aggregate windows:
        let mut running_count: i64 = 0;
        let mut running_int_sum: i64 = 0;
        let mut running_float_sum: f64 = 0.0;
        let mut running_saw_float = false;
        let mut running_min: Option<Value> = None;
        let mut running_max: Option<Value> = None;
        // Rank state: `rank_counter` is the current RANK, `same_rank_count`
        // counts extra rows tied with it, `prev_order_key` is the previous
        // row's order key within the partition (None at partition start).
        let mut rank_counter: i64 = 0;
        let mut dense_rank_counter: i64 = 0;
        let mut prev_order_key: Option<Vec<Value>> = None;
        let mut same_rank_count: i64 = 0;

        for sorted_pos in 0..n {
            let row_idx = indices[sorted_pos];

            // Detect partition boundary.
            // Rows are sorted by partition keys first, so a partition is a
            // contiguous run; a change in any key starts a new one.
            let new_partition = if sorted_pos == 0 {
                true
            } else {
                let prev_row_idx = indices[sorted_pos - 1];
                part_indices
                    .iter()
                    .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
            };

            // Every running accumulator restarts at a partition boundary.
            if new_partition {
                partition_start = sorted_pos;
                running_count = 0;
                running_int_sum = 0;
                running_float_sum = 0.0;
                running_saw_float = false;
                running_min = None;
                running_max = None;
                rank_counter = 0;
                dense_rank_counter = 0;
                prev_order_key = None;
                same_rank_count = 0;
            }

            // Extract current order key for rank tracking.
            // The first row of a partition is never "same as prev" because
            // `prev_order_key` was reset to None above.
            let current_order_key: Vec<Value> = ord_indices
                .iter()
                .map(|&(oi, _)| rows[row_idx][oi].clone())
                .collect();
            let same_as_prev = prev_order_key.as_ref() == Some(&current_order_key);

            let value = match wdef.function {
                WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
                WindowFunc::Rank => {
                    // Ties keep the current rank. The next distinct key skips
                    // past all tied rows (1, 1, 3, ...).
                    if same_as_prev {
                        same_rank_count += 1;
                    } else {
                        rank_counter += same_rank_count + 1;
                        same_rank_count = 0;
                        // Defensive only: the increment above is always >= 1,
                        // so rank_counter cannot be 0 here.
                        if rank_counter == 0 {
                            rank_counter = 1;
                        }
                    }
                    Value::Int(rank_counter)
                }
                WindowFunc::DenseRank => {
                    // Ties share a rank; each new distinct key advances by 1.
                    if !same_as_prev {
                        dense_rank_counter += 1;
                    }
                    Value::Int(dense_rank_counter)
                }
                WindowFunc::Sum => {
                    // Int and Float contributions are accumulated separately;
                    // other value kinds are ignored. The result becomes Float
                    // once any Float has been seen in the partition.
                    if let Some(ci) = arg_col_idx {
                        match &rows[row_idx][ci] {
                            Value::Int(v) => running_int_sum += v,
                            Value::Float(v) => {
                                running_float_sum += v;
                                running_saw_float = true;
                            }
                            _ => {}
                        }
                    }
                    if running_saw_float {
                        Value::Float(running_float_sum + running_int_sum as f64)
                    } else {
                        Value::Int(running_int_sum)
                    }
                }
                WindowFunc::Avg => {
                    // Only Int/Float values contribute; Empty until one does.
                    if let Some(ci) = arg_col_idx {
                        match &rows[row_idx][ci] {
                            Value::Int(v) => {
                                running_float_sum += *v as f64;
                                running_count += 1;
                            }
                            Value::Float(v) => {
                                running_float_sum += v;
                                running_count += 1;
                            }
                            _ => {}
                        }
                    }
                    if running_count == 0 {
                        Value::Empty
                    } else {
                        Value::Float(running_float_sum / running_count as f64)
                    }
                }
                WindowFunc::Count => {
                    // With a column argument, counts its non-empty values;
                    // otherwise counts rows.
                    if let Some(ci) = arg_col_idx {
                        if !rows[row_idx][ci].is_empty() {
                            running_count += 1;
                        }
                    } else {
                        // count(*) — count all rows
                        running_count += 1;
                    }
                    Value::Int(running_count)
                }
                WindowFunc::Min => {
                    // Empty values are skipped; Empty until a value is seen.
                    if let Some(ci) = arg_col_idx {
                        let v = &rows[row_idx][ci];
                        if !v.is_empty() {
                            running_min = Some(match &running_min {
                                None => v.clone(),
                                Some(cur) => {
                                    if v < cur {
                                        v.clone()
                                    } else {
                                        cur.clone()
                                    }
                                }
                            });
                        }
                    }
                    running_min.clone().unwrap_or(Value::Empty)
                }
                WindowFunc::Max => {
                    // Empty values are skipped; Empty until a value is seen.
                    if let Some(ci) = arg_col_idx {
                        let v = &rows[row_idx][ci];
                        if !v.is_empty() {
                            running_max = Some(match &running_max {
                                None => v.clone(),
                                Some(cur) => {
                                    if v > cur {
                                        v.clone()
                                    } else {
                                        cur.clone()
                                    }
                                }
                            });
                        }
                    }
                    running_max.clone().unwrap_or(Value::Empty)
                }
            };

            prev_order_key = Some(current_order_key);
            // Store at the row's original position, not its sorted position.
            win_values[row_idx] = value;
        }

        // Append the computed window column to each row.
        for (ri, row) in rows.iter_mut().enumerate() {
            row.push(win_values[ri].clone());
        }
        columns.push(wdef.output_name.clone());
    }

    Ok(QueryResult::Rows { columns, rows })
}
3049
3050/// Mission E2b: compute one aggregate over a set of rows in a group.
3051pub(super) fn compute_group_aggregate(
3052    func: AggFunc,
3053    all_rows: &[Vec<Value>],
3054    row_indices: &[usize],
3055    col_idx: usize,
3056) -> Value {
3057    match func {
3058        AggFunc::Count => {
3059            if col_idx == usize::MAX {
3060                // count(*) — count all rows in the group.
3061                return Value::Int(row_indices.len() as i64);
3062            }
3063            let count = row_indices
3064                .iter()
3065                .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3066                .count();
3067            Value::Int(count as i64)
3068        }
3069        AggFunc::CountDistinct => {
3070            let mut seen = std::collections::HashSet::new();
3071            for &ri in row_indices {
3072                let v = &all_rows[ri][col_idx];
3073                if !v.is_empty() {
3074                    seen.insert(v.clone());
3075                }
3076            }
3077            Value::Int(seen.len() as i64)
3078        }
3079        AggFunc::Sum => {
3080            // Mirror the scalar Sum path: accumulate int and float
3081            // contributions separately and promote the final result to
3082            // Float if any Float row was observed. Prevents silent
3083            // drop of Float columns in GROUP BY aggregates.
3084            let mut int_sum: i64 = 0;
3085            let mut float_sum: f64 = 0.0;
3086            let mut saw_float = false;
3087            for &ri in row_indices {
3088                match &all_rows[ri][col_idx] {
3089                    Value::Int(v) => int_sum += v,
3090                    Value::Float(v) => {
3091                        float_sum += *v;
3092                        saw_float = true;
3093                    }
3094                    _ => {}
3095                }
3096            }
3097            if saw_float {
3098                Value::Float(float_sum + int_sum as f64)
3099            } else {
3100                Value::Int(int_sum)
3101            }
3102        }
3103        AggFunc::Avg => {
3104            let mut sum = 0.0f64;
3105            let mut count = 0usize;
3106            for &ri in row_indices {
3107                match &all_rows[ri][col_idx] {
3108                    Value::Int(v) => {
3109                        sum += *v as f64;
3110                        count += 1;
3111                    }
3112                    Value::Float(v) => {
3113                        sum += *v;
3114                        count += 1;
3115                    }
3116                    _ => {}
3117                }
3118            }
3119            if count == 0 {
3120                Value::Empty
3121            } else {
3122                Value::Float(sum / count as f64)
3123            }
3124        }
3125        AggFunc::Min => row_indices
3126            .iter()
3127            .map(|&ri| &all_rows[ri][col_idx])
3128            .filter(|v| !v.is_empty())
3129            .min()
3130            .cloned()
3131            .unwrap_or(Value::Empty),
3132        AggFunc::Max => row_indices
3133            .iter()
3134            .map(|&ri| &all_rows[ri][col_idx])
3135            .filter(|v| !v.is_empty())
3136            .max()
3137            .cloned()
3138            .unwrap_or(Value::Empty),
3139    }
3140}
3141
3142/// Mission E1.3: try to extract equi-join key indices from a join `on`
3143/// predicate. Returns `Some((left_col_idx, right_col_idx))` when the
3144/// predicate is exactly `L = R` (or `R = L`) and both sides resolve
3145/// cleanly — `L` to the left subtree's column list and `R` to the right
3146/// subtree's column list.
3147///
3148/// This is deliberately narrow. We only recognise the two shapes:
3149///   * `QualifiedField = QualifiedField`  (`u.id = o.user_id`)
3150///   * `Field = Field`                    (`.id = .user_id`, unqualified)
3151///
3152/// Anything else — conjunctions, constants, function calls, or predicates
3153/// that touch the same side on both halves — falls through to the
3154/// nested-loop path unchanged.
3155pub(super) fn try_extract_equi_join_keys(
3156    pred: &Expr,
3157    left_columns: &[String],
3158    right_columns: &[String],
3159) -> Option<(usize, usize)> {
3160    let (lhs, op, rhs) = match pred {
3161        Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3162        _ => return None,
3163    };
3164    if op != BinOp::Eq {
3165        return None;
3166    }
3167    // Normal orientation: lhs in left, rhs in right.
3168    if let (Some(li), Some(ri)) = (
3169        resolve_side_column(lhs, left_columns),
3170        resolve_side_column(rhs, right_columns),
3171    ) {
3172        return Some((li, ri));
3173    }
3174    // Swapped: rhs in left, lhs in right. Both sides of `=` are
3175    // commutative so this is safe.
3176    if let (Some(li), Some(ri)) = (
3177        resolve_side_column(rhs, left_columns),
3178        resolve_side_column(lhs, right_columns),
3179    ) {
3180        return Some((li, ri));
3181    }
3182    None
3183}
3184
3185fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3186    match expr {
3187        Expr::QualifiedField { qualifier, field } => {
3188            // Byte-level match so we don't allocate a fresh `format!` on
3189            // every call — this runs once per plan, so allocation would be
3190            // cheap, but the match is trivial enough to keep inline with
3191            // the eval_expr version.
3192            let q = qualifier.as_bytes();
3193            let f = field.as_bytes();
3194            columns.iter().position(|c| {
3195                let b = c.as_bytes();
3196                b.len() == q.len() + 1 + f.len()
3197                    && b[..q.len()] == *q
3198                    && b[q.len()] == b'.'
3199                    && b[q.len() + 1..] == *f
3200            })
3201        }
3202        Expr::Field(name) => columns.iter().position(|c| c == name),
3203        _ => None,
3204    }
3205}
3206
3207/// Mission E1.3: O(L + R) hash join. Builds a `FxHashMap<Value, Vec<usize>>`
3208/// over the right (inner) side's join keys, then streams the left (outer)
3209/// side and for each probe row emits every combined row whose right-side
3210/// key matches. For `JoinKind::LeftOuter`, unmatched left rows are emitted
3211/// padded with `Value::Empty` on the right side.
3212///
3213/// The right side is always the build side. That choice is forced for
3214/// LeftOuter (the left side must stream so we can detect orphans), and
3215/// for Inner it's a reasonable default — left-deep plans tend to grow the
3216/// left side with each join, so the un-joined right leaf is often the
3217/// smaller of the two at each level.
3218pub(super) fn hash_join(
3219    left_columns: Vec<String>,
3220    left_rows: Vec<Vec<Value>>,
3221    right_columns: Vec<String>,
3222    right_rows: Vec<Vec<Value>>,
3223    left_key_idx: usize,
3224    right_key_idx: usize,
3225    kind: JoinKind,
3226) -> QueryResult {
3227    use rustc_hash::FxHashMap;
3228
3229    let n_left = left_columns.len();
3230    let n_right = right_columns.len();
3231    let mut columns = Vec::with_capacity(n_left + n_right);
3232    columns.extend(left_columns);
3233    columns.extend(right_columns);
3234
3235    // Build: right_key -> list of right-row indices. Pre-size to the row
3236    // count so the map doesn't rehash mid-build.
3237    let mut build: FxHashMap<Value, Vec<usize>> =
3238        FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3239    for (i, row) in right_rows.iter().enumerate() {
3240        // Skip Empty keys on the build side — they can never match under
3241        // SQL semantics (NULL ≠ NULL) and would collapse all nullables to
3242        // one bucket.
3243        if matches!(row[right_key_idx], Value::Empty) {
3244            continue;
3245        }
3246        build.entry(row[right_key_idx].clone()).or_default().push(i);
3247    }
3248
3249    // Reasonable starting capacity — inner joins produce ≥ left_rows.len()
3250    // rows in the common 1:1 case, left-outer always emits ≥ left_rows.len().
3251    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3252
3253    for left_row in &left_rows {
3254        let key = &left_row[left_key_idx];
3255        let matched = if matches!(key, Value::Empty) {
3256            None
3257        } else {
3258            build.get(key)
3259        };
3260        match matched {
3261            Some(matches) if !matches.is_empty() => {
3262                for &ri in matches {
3263                    let right_row = &right_rows[ri];
3264                    let mut combined = Vec::with_capacity(n_left + n_right);
3265                    combined.extend_from_slice(left_row);
3266                    combined.extend_from_slice(right_row);
3267                    rows.push(combined);
3268                }
3269            }
3270            _ => {
3271                if matches!(kind, JoinKind::LeftOuter) {
3272                    let mut row = Vec::with_capacity(n_left + n_right);
3273                    row.extend_from_slice(left_row);
3274                    row.resize(n_left + n_right, Value::Empty);
3275                    rows.push(row);
3276                }
3277            }
3278        }
3279    }
3280
3281    QueryResult::Rows { columns, rows }
3282}
3283
3284/// Lower unindexed `RangeScan` nodes to `Filter(SeqScan)` so that all
3285/// downstream fast paths (count, project+limit, sort+limit, agg, update,
3286/// delete) continue to fire.
3287///
3288/// The planner emits `RangeScan` speculatively for every range inequality
3289/// (`.age > 30`) because it has no catalog access. When the column has a
3290/// B-tree index, `RangeScan` is the correct plan. When it doesn't, the
3291/// executor's `RangeScan` fallback materialises every matching row with
3292/// full `decode_row` — bypassing the compiled-predicate fast paths that
3293/// `Filter(SeqScan)` would trigger.
3294///
3295/// This pass runs once per query, before execution.
3296pub(super) fn lower_unindexed_range_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3297    match plan {
3298        PlanNode::RangeScan {
3299            table,
3300            column,
3301            start,
3302            end,
3303        } => {
3304            if let Some(tbl) = catalog.get_table(table) {
3305                // Keep RangeScan only for unique indexes — their btree
3306                // stores raw column values. Non-unique indexes store
3307                // composite keys that don't directly compare against
3308                // column values, so lower them to Filter(SeqScan).
3309                if tbl.is_index_unique(column) == Some(true) {
3310                    return plan.clone();
3311                }
3312            }
3313            let pred = synthesize_range_predicate(column, start, end);
3314            PlanNode::Filter {
3315                input: Box::new(PlanNode::SeqScan {
3316                    table: table.clone(),
3317                }),
3318                predicate: pred,
3319            }
3320        }
3321        PlanNode::Filter { input, predicate } => PlanNode::Filter {
3322            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3323            predicate: predicate.clone(),
3324        },
3325        PlanNode::Project { input, fields } => PlanNode::Project {
3326            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3327            fields: fields.clone(),
3328        },
3329        PlanNode::Sort { input, keys } => PlanNode::Sort {
3330            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3331            keys: keys.clone(),
3332        },
3333        PlanNode::Limit { input, count } => PlanNode::Limit {
3334            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3335            count: count.clone(),
3336        },
3337        PlanNode::Offset { input, count } => PlanNode::Offset {
3338            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3339            count: count.clone(),
3340        },
3341        PlanNode::Aggregate {
3342            input,
3343            function,
3344            field,
3345        } => PlanNode::Aggregate {
3346            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3347            function: *function,
3348            field: field.clone(),
3349        },
3350        PlanNode::Distinct { input } => PlanNode::Distinct {
3351            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3352        },
3353        PlanNode::GroupBy {
3354            input,
3355            keys,
3356            aggregates,
3357            having,
3358        } => PlanNode::GroupBy {
3359            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3360            keys: keys.clone(),
3361            aggregates: aggregates.clone(),
3362            having: having.clone(),
3363        },
3364        PlanNode::Update {
3365            input,
3366            table,
3367            assignments,
3368        } => PlanNode::Update {
3369            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3370            table: table.clone(),
3371            assignments: assignments.clone(),
3372        },
3373        PlanNode::Delete { input, table } => PlanNode::Delete {
3374            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3375            table: table.clone(),
3376        },
3377        PlanNode::Window { input, windows } => PlanNode::Window {
3378            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3379            windows: windows.clone(),
3380        },
3381        PlanNode::Union { left, right, all } => PlanNode::Union {
3382            left: Box::new(lower_unindexed_range_scans(catalog, left)),
3383            right: Box::new(lower_unindexed_range_scans(catalog, right)),
3384            all: *all,
3385        },
3386        PlanNode::Explain { input } => PlanNode::Explain {
3387            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3388        },
3389        PlanNode::NestedLoopJoin {
3390            left,
3391            right,
3392            on,
3393            kind,
3394        } => PlanNode::NestedLoopJoin {
3395            left: Box::new(lower_unindexed_range_scans(catalog, left)),
3396            right: Box::new(lower_unindexed_range_scans(catalog, right)),
3397            on: on.clone(),
3398            kind: *kind,
3399        },
3400        // Leaf nodes: no children to recurse into.
3401        _ => plan.clone(),
3402    }
3403}
3404
3405/// Synthesize a range predicate from RangeScan bounds for the fallback path.
3406pub(super) fn synthesize_range_predicate(
3407    column: &str,
3408    start: &Option<(Expr, bool)>,
3409    end: &Option<(Expr, bool)>,
3410) -> Expr {
3411    let lower = start.as_ref().map(|(expr, inclusive)| {
3412        let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3413        Expr::BinaryOp(
3414            Box::new(Expr::Field(column.to_string())),
3415            op,
3416            Box::new(expr.clone()),
3417        )
3418    });
3419    let upper = end.as_ref().map(|(expr, inclusive)| {
3420        let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3421        Expr::BinaryOp(
3422            Box::new(Expr::Field(column.to_string())),
3423            op,
3424            Box::new(expr.clone()),
3425        )
3426    });
3427    match (lower, upper) {
3428        (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3429        (Some(l), None) => l,
3430        (None, Some(u)) => u,
3431        (None, None) => Expr::Literal(Literal::Bool(true)),
3432    }
3433}
3434
3435/// Check if a value falls within a range (used in last-resort decoded-row eval).
3436pub(super) fn range_matches(
3437    val: &Value,
3438    start: &Option<Value>,
3439    start_inc: bool,
3440    end: &Option<Value>,
3441    end_inc: bool,
3442) -> bool {
3443    if let Some(ref s) = start {
3444        if start_inc {
3445            if val < s {
3446                return false;
3447            }
3448        } else if val <= s {
3449            return false;
3450        }
3451    }
3452    if let Some(ref e) = end {
3453        if end_inc {
3454            if val > e {
3455                return false;
3456            }
3457        } else if val >= e {
3458            return false;
3459        }
3460    }
3461    true
3462}
3463
3464/// Format a `PlanNode` tree as a human-readable, indented text
3465/// representation. Used by the `EXPLAIN` command.
3466pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3467    let indent = "  ".repeat(depth);
3468    match plan {
3469        PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3470        PlanNode::AliasScan { table, alias } => {
3471            format!("{indent}AliasScan table={table} alias={alias}")
3472        }
3473        PlanNode::IndexScan { table, column, key } => {
3474            format!("{indent}IndexScan table={table} column={column} key={key:?}")
3475        }
3476        PlanNode::RangeScan {
3477            table,
3478            column,
3479            start,
3480            end,
3481        } => {
3482            let s = match start {
3483                Some((expr, inc)) => {
3484                    let op = if *inc { ">=" } else { ">" };
3485                    format!("{op}{expr:?}")
3486                }
3487                None => "unbounded".to_string(),
3488            };
3489            let e = match end {
3490                Some((expr, inc)) => {
3491                    let op = if *inc { "<=" } else { "<" };
3492                    format!("{op}{expr:?}")
3493                }
3494                None => "unbounded".to_string(),
3495            };
3496            format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3497        }
3498        PlanNode::Filter { input, predicate } => {
3499            let child = format_plan_tree(input, depth + 1);
3500            format!("{indent}Filter predicate={predicate:?}\n{child}")
3501        }
3502        PlanNode::Project { input, fields } => {
3503            let names: Vec<String> = fields
3504                .iter()
3505                .map(|f| match &f.alias {
3506                    Some(a) => format!("{a}: {:?}", f.expr),
3507                    None => format!("{:?}", f.expr),
3508                })
3509                .collect();
3510            let child = format_plan_tree(input, depth + 1);
3511            format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3512        }
3513        PlanNode::Sort { input, keys } => {
3514            let ks: Vec<String> = keys
3515                .iter()
3516                .map(|k| {
3517                    if k.descending {
3518                        format!("{} desc", k.field)
3519                    } else {
3520                        k.field.clone()
3521                    }
3522                })
3523                .collect();
3524            let child = format_plan_tree(input, depth + 1);
3525            format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3526        }
3527        PlanNode::Limit { input, count } => {
3528            let child = format_plan_tree(input, depth + 1);
3529            format!("{indent}Limit count={count:?}\n{child}")
3530        }
3531        PlanNode::Offset { input, count } => {
3532            let child = format_plan_tree(input, depth + 1);
3533            format!("{indent}Offset count={count:?}\n{child}")
3534        }
3535        PlanNode::Aggregate {
3536            input,
3537            function,
3538            field,
3539        } => {
3540            let f = field.as_deref().unwrap_or("*");
3541            let child = format_plan_tree(input, depth + 1);
3542            format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3543        }
3544        PlanNode::NestedLoopJoin {
3545            left,
3546            right,
3547            on,
3548            kind,
3549        } => {
3550            let left_child = format_plan_tree(left, depth + 1);
3551            let right_child = format_plan_tree(right, depth + 1);
3552            let on_str = match on {
3553                Some(pred) => format!("{pred:?}"),
3554                None => "none".to_string(),
3555            };
3556            format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3557        }
3558        PlanNode::Distinct { input } => {
3559            let child = format_plan_tree(input, depth + 1);
3560            format!("{indent}Distinct\n{child}")
3561        }
3562        PlanNode::GroupBy {
3563            input,
3564            keys,
3565            aggregates,
3566            having,
3567        } => {
3568            let agg_strs: Vec<String> = aggregates
3569                .iter()
3570                .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3571                .collect();
3572            let having_str = match having {
3573                Some(h) => format!(" having={h:?}"),
3574                None => String::new(),
3575            };
3576            let child = format_plan_tree(input, depth + 1);
3577            format!(
3578                "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3579                keys.join(", "),
3580                agg_strs.join(", "),
3581            )
3582        }
3583        PlanNode::Insert { table, assignments } => {
3584            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3585            format!("{indent}Insert table={table} cols=[{}]", cols.join(", "))
3586        }
3587        PlanNode::Upsert {
3588            table,
3589            key_column,
3590            assignments,
3591            on_conflict,
3592        } => {
3593            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3594            let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3595            if conflict_cols.is_empty() {
3596                format!(
3597                    "{indent}Upsert table={table} key={key_column} cols=[{}]",
3598                    cols.join(", ")
3599                )
3600            } else {
3601                format!(
3602                    "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3603                    cols.join(", "),
3604                    conflict_cols.join(", ")
3605                )
3606            }
3607        }
3608        PlanNode::Update {
3609            input,
3610            table,
3611            assignments,
3612        } => {
3613            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3614            let child = format_plan_tree(input, depth + 1);
3615            format!(
3616                "{indent}Update table={table} set=[{}]\n{child}",
3617                cols.join(", ")
3618            )
3619        }
3620        PlanNode::Delete { input, table } => {
3621            let child = format_plan_tree(input, depth + 1);
3622            format!("{indent}Delete table={table}\n{child}")
3623        }
3624        PlanNode::CreateTable { name, fields } => {
3625            let fs: Vec<String> = fields
3626                .iter()
3627                .map(|(n, t, r)| {
3628                    if *r {
3629                        format!("{n}: {t} required")
3630                    } else {
3631                        format!("{n}: {t}")
3632                    }
3633                })
3634                .collect();
3635            format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3636        }
3637        PlanNode::AlterTable { table, action } => {
3638            format!("{indent}AlterTable table={table} action={action:?}")
3639        }
3640        PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3641        PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3642        PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3643        PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3644        PlanNode::Window { input, windows } => {
3645            let ws: Vec<String> = windows
3646                .iter()
3647                .map(|w| format!("{:?} as {}", w.function, w.output_name))
3648                .collect();
3649            let child = format_plan_tree(input, depth + 1);
3650            format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3651        }
3652        PlanNode::Union { left, right, all } => {
3653            let kind = if *all { "UNION ALL" } else { "UNION" };
3654            let left_child = format_plan_tree(left, depth + 1);
3655            let right_child = format_plan_tree(right, depth + 1);
3656            format!("{indent}{kind}\n{left_child}\n{right_child}")
3657        }
3658        PlanNode::Explain { input } => {
3659            let child = format_plan_tree(input, depth + 1);
3660            format!("{indent}Explain\n{child}")
3661        }
3662        PlanNode::Begin => format!("{indent}Begin"),
3663        PlanNode::Commit => format!("{indent}Commit"),
3664        PlanNode::Rollback => format!("{indent}Rollback"),
3665    }
3666}