Skip to main content

powdb_query/executor/
plan_exec.rs

1//! The execute_plan method and associated helpers.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::{ViewDef, ViewRegistry};
16
17impl Engine {
18    pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19        match plan {
20            PlanNode::SeqScan { table } => {
21                // Auto-refresh dirty materialized views on read.
22                if self.view_registry.is_dirty(table) {
23                    self.refresh_view(table)?;
24                }
25                let schema = self
26                    .catalog
27                    .schema(table)
28                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29                    .clone();
30                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31                let rows: Vec<Vec<Value>> = self
32                    .catalog
33                    .scan(table)
34                    .map_err(|e| QueryError::StorageError(e.to_string()))?
35                    .map(|(_, row)| row)
36                    .collect();
37                Ok(QueryResult::Rows { columns, rows })
38            }
39
40            PlanNode::Filter { input, predicate } => {
41                // Materialize any IN-subqueries in the predicate before the
42                // scan loop — the closure can't call back into the engine.
43                // Correlated subqueries are left in place for per-row eval.
44                let materialized;
45                let predicate = if contains_subquery(predicate) {
46                    materialized = self.materialize_subqueries(predicate)?;
47                    &materialized
48                } else {
49                    predicate
50                };
51
52                // Correlated subquery path: per-row materialisation.
53                if contains_subquery(predicate) {
54                    let result = self.execute_plan(input)?;
55                    return match result {
56                        QueryResult::Rows { columns, rows } => {
57                            let mut filtered = Vec::new();
58                            for row in rows {
59                                let row_pred =
60                                    self.materialize_correlated_for_row(predicate, &row, &columns)?;
61                                if eval_predicate(&row_pred, &row, &columns) {
62                                    filtered.push(row);
63                                }
64                            }
65                            Ok(QueryResult::Rows {
66                                columns,
67                                rows: filtered,
68                            })
69                        }
70                        _ => Err("filter requires row input".into()),
71                    };
72                }
73
74                // Fast path: fuse Filter + SeqScan into a zero-copy streaming
75                // loop. Uses decode_column() to evaluate the predicate on only
76                // the columns it references, avoiding heap allocations for
77                // String/Bytes columns that aren't part of the filter.
78                if let PlanNode::SeqScan { table } = input.as_ref() {
79                    // Auto-refresh dirty materialized views.
80                    if self.view_registry.is_dirty(table) {
81                        self.refresh_view(table)?;
82                    }
83                    let schema = self
84                        .catalog
85                        .schema(table)
86                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
87                        .clone();
88                    let columns: Vec<String> =
89                        schema.columns.iter().map(|c| c.name.clone()).collect();
90                    let fast = FastLayout::new(&schema);
91                    let row_layout = RowLayout::new(&schema);
92                    // Mission F: pre-size to skip the first 4 Vec doublings
93                    // (4 → 8 → 16 → 32 → 64). On a 100K-row scan with 30%
94                    // selectivity that's ~4 fewer reallocations + memcpys.
95                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
96
97                    // Try compiled predicate for the filter check (handles
98                    // int leaves, string-eq leaves, and And conjunctions).
99                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
100                        self.catalog
101                            .for_each_row_raw(table, |_rid, data| {
102                                if compiled(data) {
103                                    rows.push(decode_row(&schema, data));
104                                }
105                            })
106                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
107                    } else {
108                        let pred_cols = predicate_column_indices(predicate, &columns);
109                        self.catalog
110                            .for_each_row_raw(table, |_rid, data| {
111                                let pred_row =
112                                    decode_selective(&schema, &row_layout, data, &pred_cols);
113                                if eval_predicate(predicate, &pred_row, &columns) {
114                                    rows.push(decode_row(&schema, data));
115                                }
116                            })
117                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
118                    }
119
120                    return Ok(QueryResult::Rows { columns, rows });
121                }
122
123                // General path: materialise then filter.
124                let result = self.execute_plan(input)?;
125                match result {
126                    QueryResult::Rows { columns, rows } => {
127                        let filtered: Vec<Vec<Value>> = rows
128                            .into_iter()
129                            .filter(|row| eval_predicate(predicate, row, &columns))
130                            .collect();
131                        Ok(QueryResult::Rows {
132                            columns,
133                            rows: filtered,
134                        })
135                    }
136                    _ => Err("filter requires row input".into()),
137                }
138            }
139
140            PlanNode::Project { input, fields } => {
141                // Fast path: Project over IndexScan — decode only projected
142                // columns from raw bytes instead of full decode_row.
143                if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144                    let schema = self
145                        .catalog
146                        .schema(table)
147                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148                        .clone();
149                    let all_columns: Vec<String> =
150                        schema.columns.iter().map(|c| c.name.clone()).collect();
151                    let key_value = literal_to_value(key)?;
152                    let tbl = self
153                        .catalog
154                        .get_table(table)
155                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157                    let proj_columns: Vec<String> = fields
158                        .iter()
159                        .map(|f| {
160                            f.alias.clone().unwrap_or_else(|| match &f.expr {
161                                Expr::Field(name) => name.clone(),
162                                _ => "?".into(),
163                            })
164                        })
165                        .collect();
166
167                    // Determine which column indices the projection needs
168                    let proj_indices: Vec<usize> = fields
169                        .iter()
170                        .filter_map(|f| {
171                            if let Expr::Field(name) = &f.expr {
172                                all_columns.iter().position(|c| c == name)
173                            } else {
174                                None
175                            }
176                        })
177                        .collect();
178
179                    if tbl.has_index(column) {
180                        let layout = RowLayout::new(&schema);
181                        let rids = tbl.index_lookup_all(column, &key_value);
182                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
183                        for rid in rids {
184                            if let Some(data) = tbl.heap.get(rid) {
185                                let row: Vec<Value> = proj_indices
186                                    .iter()
187                                    .map(|&ci| decode_column(&schema, &layout, &data, ci))
188                                    .collect();
189                                rows.push(row);
190                            }
191                        }
192                        return Ok(QueryResult::Rows {
193                            columns: proj_columns,
194                            rows,
195                        });
196                    }
197                }
198
199                // Fast path: Project(Limit(Sort(Filter(SeqScan)))) — bounded
200                // top-N heap. Decodes only the sort key + projected columns,
201                // keeps at most `limit` rows in a heap. Also handles the
202                // Project(Limit(Sort(SeqScan))) variant (no filter).
203                if let PlanNode::Limit {
204                    input: inner,
205                    count: limit_expr,
206                } = input.as_ref()
207                {
208                    if let PlanNode::Sort {
209                        input: sort_input,
210                        keys,
211                    } = inner.as_ref()
212                    {
213                        // Fast path only for single-key sorts
214                        if keys.len() == 1 {
215                            let sort_field = &keys[0].field;
216                            let descending = keys[0].descending;
217                            let limit = match limit_expr {
218                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
219                                _ => usize::MAX,
220                            };
221                            let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
222                                match sort_input.as_ref() {
223                                    PlanNode::SeqScan { table } => (Some(table.as_str()), None),
224                                    PlanNode::Filter {
225                                        input: fi,
226                                        predicate,
227                                    } => {
228                                        if let PlanNode::SeqScan { table } = fi.as_ref() {
229                                            (Some(table.as_str()), Some(predicate))
230                                        } else {
231                                            (None, None)
232                                        }
233                                    }
234                                    _ => (None, None),
235                                };
236                            if let Some(table) = table_opt {
237                                if let Some(result) = self.project_filter_sort_limit_fast(
238                                    table, fields, sort_field, descending, limit, pred_opt,
239                                )? {
240                                    return Ok(result);
241                                }
242                            }
243                        }
244                    }
245                    // Fast path: Project(Limit(Filter(SeqScan))) — stream,
246                    // decode only projected columns, stop at limit.
247                    if let PlanNode::Filter {
248                        input: fi,
249                        predicate,
250                    } = inner.as_ref()
251                    {
252                        if let PlanNode::SeqScan { table } = fi.as_ref() {
253                            let limit = match limit_expr {
254                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
255                                _ => usize::MAX,
256                            };
257                            if let Some(result) = self.project_filter_limit_fast(
258                                table,
259                                fields,
260                                limit,
261                                Some(predicate),
262                            )? {
263                                return Ok(result);
264                            }
265                        }
266                    }
267                    // Fast path: Project(Limit(SeqScan)) — stream, no filter.
268                    if let PlanNode::SeqScan { table } = inner.as_ref() {
269                        let limit = match limit_expr {
270                            Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
271                            _ => usize::MAX,
272                        };
273                        if let Some(result) =
274                            self.project_filter_limit_fast(table, fields, limit, None)?
275                        {
276                            return Ok(result);
277                        }
278                    }
279                }
280
281                // Mission D4: Project(Filter(SeqScan)) without Limit. Reuses
282                // `project_filter_limit_fast` with limit = usize::MAX so the
283                // hot loop decodes only projected columns and uses the
284                // compiled predicate. Previously this fell through to the
285                // generic Filter branch which materialised every column via
286                // `decode_row` then re-projected — quadratic work.
287                //
288                // multi_col_and_filter (`U filter .age > 30 and .status =
289                // "active" { .name, .age }`) was 6.18ms (0.7x SQLite) and
290                // is the load-bearing workload for this fast path.
291                if let PlanNode::Filter {
292                    input: fi,
293                    predicate,
294                } = input.as_ref()
295                {
296                    if let PlanNode::SeqScan { table } = fi.as_ref() {
297                        if let Some(result) = self.project_filter_limit_fast(
298                            table,
299                            fields,
300                            usize::MAX,
301                            Some(predicate),
302                        )? {
303                            return Ok(result);
304                        }
305                    }
306                }
307
308                // Mission D4: Project(SeqScan) without Filter or Limit.
309                // Decode only projected columns; the previous fall-through
310                // built full Vec<Value> rows then re-projected.
311                if let PlanNode::SeqScan { table } = input.as_ref() {
312                    if let Some(result) =
313                        self.project_filter_limit_fast(table, fields, usize::MAX, None)?
314                    {
315                        return Ok(result);
316                    }
317                }
318
319                let result = self.execute_plan(input)?;
320                match result {
321                    QueryResult::Rows { columns, rows } => {
322                        let proj_columns: Vec<String> = fields
323                            .iter()
324                            .map(|f| {
325                                f.alias.clone().unwrap_or_else(|| match &f.expr {
326                                    Expr::Field(name) => name.clone(),
327                                    // Mission E1.2: `{ u.name }` projects as the
328                                    // qualified column name so callers can still
329                                    // disambiguate across the join output.
330                                    Expr::QualifiedField { qualifier, field } => {
331                                        format!("{qualifier}.{field}")
332                                    }
333                                    _ => "?".into(),
334                                })
335                            })
336                            .collect();
337                        let proj_rows: Vec<Vec<Value>> = rows
338                            .iter()
339                            .map(|row| {
340                                fields
341                                    .iter()
342                                    .map(|f| eval_expr(&f.expr, row, &columns))
343                                    .collect()
344                            })
345                            .collect();
346                        Ok(QueryResult::Rows {
347                            columns: proj_columns,
348                            rows: proj_rows,
349                        })
350                    }
351                    _ => Err("project requires row input".into()),
352                }
353            }
354
355            PlanNode::Sort { input, keys } => {
356                let result = self.execute_plan(input)?;
357                match result {
358                    QueryResult::Rows { columns, mut rows } => {
359                        if rows.len() > MAX_SORT_ROWS {
360                            return Err(QueryError::SortLimitExceeded);
361                        }
362                        let key_indices: Vec<(usize, bool)> = keys
363                            .iter()
364                            .map(|k| {
365                                columns
366                                    .iter()
367                                    .position(|c| c == &k.field)
368                                    .map(|idx| (idx, k.descending))
369                                    .ok_or_else(|| QueryError::ColumnNotFound {
370                                        table: String::new(),
371                                        column: k.field.clone(),
372                                    })
373                            })
374                            .collect::<Result<_, QueryError>>()?;
375                        rows.sort_by(|a, b| {
376                            for &(col_idx, descending) in &key_indices {
377                                let cmp = a[col_idx].cmp(&b[col_idx]);
378                                let cmp = if descending { cmp.reverse() } else { cmp };
379                                if cmp != std::cmp::Ordering::Equal {
380                                    return cmp;
381                                }
382                            }
383                            std::cmp::Ordering::Equal
384                        });
385                        Ok(QueryResult::Rows { columns, rows })
386                    }
387                    _ => Err("sort requires row input".into()),
388                }
389            }
390
391            PlanNode::Limit { input, count } => {
392                let result = self.execute_plan(input)?;
393                let n = match count {
394                    Expr::Literal(Literal::Int(v)) => *v as usize,
395                    _ => return Err("limit must be integer literal".into()),
396                };
397                match result {
398                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
399                        columns,
400                        rows: rows.into_iter().take(n).collect(),
401                    }),
402                    _ => Err("limit requires row input".into()),
403                }
404            }
405
406            PlanNode::Offset { input, count } => {
407                let result = self.execute_plan(input)?;
408                let n = match count {
409                    Expr::Literal(Literal::Int(v)) => *v as usize,
410                    _ => return Err("offset must be integer literal".into()),
411                };
412                match result {
413                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
414                        columns,
415                        rows: rows.into_iter().skip(n).collect(),
416                    }),
417                    _ => Err("offset requires row input".into()),
418                }
419            }
420
421            PlanNode::Aggregate {
422                input,
423                function,
424                field,
425            } => {
426                // Fast path: count() over SeqScan — count rows without any decode
427                if *function == AggFunc::Count {
428                    if let PlanNode::SeqScan { table } = input.as_ref() {
429                        let mut count: i64 = 0;
430                        self.catalog
431                            .for_each_row_raw(table, |_rid, _data| {
432                                count += 1;
433                            })
434                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
435                        return Ok(QueryResult::Scalar(Value::Int(count)));
436                    }
437                    // Fast path: count() over Filter(SeqScan) — try compiled
438                    // predicate first, fall back to decode_column path.
439                    if let PlanNode::Filter {
440                        input: inner,
441                        predicate,
442                    } = input.as_ref()
443                    {
444                        if let PlanNode::SeqScan { table } = inner.as_ref() {
445                            let schema = self
446                                .catalog
447                                .schema(table)
448                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
449                                .clone();
450                            let columns: Vec<String> =
451                                schema.columns.iter().map(|c| c.name.clone()).collect();
452                            let fast = FastLayout::new(&schema);
453                            let row_layout = RowLayout::new(&schema);
454
455                            // Try compiled predicate (zero-allocation hot path).
456                            // Handles int leaves, string-eq leaves, AND conjunctions.
457                            if let Some(compiled) =
458                                compile_predicate(predicate, &columns, &fast, &schema)
459                            {
460                                let mut count: i64 = 0;
461                                self.catalog
462                                    .for_each_row_raw(table, |_rid, data| {
463                                        if compiled(data) {
464                                            count += 1;
465                                        }
466                                    })
467                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
468                                return Ok(QueryResult::Scalar(Value::Int(count)));
469                            }
470
471                            // Fallback: decode predicate columns
472                            let pred_cols = predicate_column_indices(predicate, &columns);
473                            let mut count: i64 = 0;
474                            self.catalog
475                                .for_each_row_raw(table, |_rid, data| {
476                                    let pred_row =
477                                        decode_selective(&schema, &row_layout, data, &pred_cols);
478                                    if eval_predicate(predicate, &pred_row, &columns) {
479                                        count += 1;
480                                    }
481                                })
482                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
483
484                            return Ok(QueryResult::Scalar(Value::Int(count)));
485                        }
486                    }
487                }
488
489                // Fast path: sum/avg/min/max over a single fixed-size int
490                // column with an optional compiled filter predicate. Walks
491                // raw row bytes, zero allocation per row.
492                if matches!(
493                    function,
494                    AggFunc::Sum
495                        | AggFunc::Avg
496                        | AggFunc::Min
497                        | AggFunc::Max
498                        | AggFunc::CountDistinct
499                ) {
500                    if let Some(col) = field.as_ref() {
501                        // Shape: Aggregate(SeqScan) or Aggregate(Filter(SeqScan))
502                        let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
503                            match input.as_ref() {
504                                PlanNode::SeqScan { table } => (Some(table.as_str()), None),
505                                PlanNode::Filter {
506                                    input: inner,
507                                    predicate,
508                                } => {
509                                    if let PlanNode::SeqScan { table } = inner.as_ref() {
510                                        (Some(table.as_str()), Some(predicate))
511                                    } else {
512                                        (None, None)
513                                    }
514                                }
515                                _ => (None, None),
516                            };
517                        if let Some(table) = table_opt {
518                            if let Some(result) =
519                                self.agg_single_col_fast(table, col, *function, pred_opt)?
520                            {
521                                return Ok(result);
522                            }
523                        }
524                    }
525                }
526
527                // Fast path: Project(Limit(Filter(SeqScan))) — stream, decode
528                // only projected columns, stop once we hit the limit.
529                // (Handled in the Project branch; this branch only fires when
530                // the aggregate is the outer node.)
531                let result = self.execute_plan(input)?;
532                match result {
533                    QueryResult::Rows { columns, rows } => {
534                        match function {
535                            AggFunc::Count => {
536                                Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
537                            }
538                            AggFunc::CountDistinct => {
539                                let col = field.as_ref().ok_or("count distinct requires field")?;
540                                let idx = columns
541                                    .iter()
542                                    .position(|c| c == col)
543                                    .ok_or("col not found")?;
544                                let mut seen = std::collections::HashSet::new();
545                                for row in &rows {
546                                    let v = &row[idx];
547                                    if !v.is_empty() {
548                                        seen.insert(v.clone());
549                                    }
550                                }
551                                Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
552                            }
553                            AggFunc::Avg => {
554                                let col = field.as_ref().ok_or("avg requires field")?;
555                                let idx = columns
556                                    .iter()
557                                    .position(|c| c == col)
558                                    .ok_or("col not found")?;
559                                let sum: f64 = rows
560                                    .iter()
561                                    .filter_map(|r| match &r[idx] {
562                                        Value::Int(v) => Some(*v as f64),
563                                        Value::Float(v) => Some(*v),
564                                        _ => None,
565                                    })
566                                    .sum();
567                                let count = rows.len() as f64;
568                                Ok(QueryResult::Scalar(Value::Float(sum / count)))
569                            }
570                            AggFunc::Sum => {
571                                let col = field.as_ref().ok_or("sum requires field")?;
572                                let idx = columns
573                                    .iter()
574                                    .position(|c| c == col)
575                                    .ok_or("col not found")?;
576                                // Track int and float contributions separately so
577                                // Float columns (and mixed Int/Float rows) don't get
578                                // silently dropped as they did in the Int-only
579                                // version. If any Float is present, the whole sum
580                                // promotes to Float — matching Avg's semantics.
581                                let mut int_sum: i64 = 0;
582                                let mut float_sum: f64 = 0.0;
583                                let mut saw_float = false;
584                                for r in &rows {
585                                    match &r[idx] {
586                                        Value::Int(v) => int_sum += *v,
587                                        Value::Float(v) => {
588                                            float_sum += *v;
589                                            saw_float = true;
590                                        }
591                                        _ => {}
592                                    }
593                                }
594                                let result = if saw_float {
595                                    Value::Float(float_sum + int_sum as f64)
596                                } else {
597                                    Value::Int(int_sum)
598                                };
599                                Ok(QueryResult::Scalar(result))
600                            }
601                            AggFunc::Min | AggFunc::Max => {
602                                let col = field.as_ref().ok_or("min/max requires field")?;
603                                let idx = columns
604                                    .iter()
605                                    .position(|c| c == col)
606                                    .ok_or("col not found")?;
607                                let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
608                                let result = if *function == AggFunc::Min {
609                                    vals.into_iter().min().cloned()
610                                } else {
611                                    vals.into_iter().max().cloned()
612                                };
613                                Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
614                            }
615                        }
616                    }
617                    _ => Err("aggregate requires row input".into()),
618                }
619            }
620
621            PlanNode::Insert { table, assignments } => {
622                let values = {
623                    let schema = self
624                        .catalog
625                        .schema(table)
626                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
627                    let mut values = vec![Value::Empty; schema.columns.len()];
628                    for a in assignments {
629                        let idx = schema.column_index(&a.field).ok_or_else(|| {
630                            QueryError::ColumnNotFound {
631                                table: String::new(),
632                                column: a.field.clone(),
633                            }
634                        })?;
635                        let raw = literal_to_value(&a.value)?;
636                        values[idx] = coerce_value(raw, &schema.columns[idx])?;
637                    }
638                    for col in &schema.columns {
639                        if col.required && matches!(values[col.position as usize], Value::Empty) {
640                            return Err(QueryError::Execution(format!(
641                                "column '{}' is required but no value was provided",
642                                col.name
643                            )));
644                        }
645                    }
646                    values
647                };
648                self.catalog
649                    .insert(table, &values)
650                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
651                self.view_registry.mark_dependents_dirty(table);
652                Ok(QueryResult::Modified(1))
653            }
654
655            PlanNode::Upsert {
656                table,
657                key_column,
658                assignments,
659                on_conflict,
660            } => {
661                let (values, key_idx) = {
662                    let schema = self
663                        .catalog
664                        .schema(table)
665                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
666                    let mut values = vec![Value::Empty; schema.columns.len()];
667                    for a in assignments {
668                        let idx = schema.column_index(&a.field).ok_or_else(|| {
669                            QueryError::ColumnNotFound {
670                                table: String::new(),
671                                column: a.field.clone(),
672                            }
673                        })?;
674                        let raw = literal_to_value(&a.value)?;
675                        values[idx] = coerce_value(raw, &schema.columns[idx])?;
676                    }
677                    for col in &schema.columns {
678                        if col.required && matches!(values[col.position as usize], Value::Empty) {
679                            return Err(QueryError::Execution(format!(
680                                "column '{}' is required but no value was provided",
681                                col.name
682                            )));
683                        }
684                    }
685                    let key_idx = schema
686                        .column_index(key_column)
687                        .ok_or_else(|| format!("key column '{key_column}' not found"))?;
688                    (values, key_idx)
689                };
690
691                let key_value = values[key_idx].clone();
692
693                // Probe the index for a conflict.
694                let existing = {
695                    let tbl = self
696                        .catalog
697                        .get_table(table)
698                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
699                    if tbl.has_index(key_column) {
700                        // Upsert key lookup: return the first match.
701                        // For unique indexes this is the only match.
702                        // For non-unique indexes on a key column, also
703                        // just the first (upsert semantics).
704                        let rids = tbl.index_lookup_all(key_column, &key_value);
705                        rids.into_iter().next().and_then(|rid| {
706                            tbl.heap
707                                .get(rid)
708                                .map(|data| (rid, decode_row(&tbl.schema, &data)))
709                        })
710                    } else {
711                        // No index — linear scan for the key.
712                        let mut found = None;
713                        for (rid, row) in tbl.scan() {
714                            if row[key_idx] == key_value {
715                                found = Some((rid, row));
716                                break;
717                            }
718                        }
719                        found
720                    }
721                };
722
723                if let Some((rid, mut existing_row)) = existing {
724                    // Conflict: apply on_conflict assignments (or all non-key if empty).
725                    let update_assignments = if on_conflict.is_empty() {
726                        assignments
727                    } else {
728                        on_conflict
729                    };
730                    let changed_cols: Vec<usize> = {
731                        let schema = self
732                            .catalog
733                            .schema(table)
734                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
735                        let mut indices = Vec::new();
736                        for a in update_assignments {
737                            let idx = schema.column_index(&a.field).ok_or_else(|| {
738                                QueryError::ColumnNotFound {
739                                    table: String::new(),
740                                    column: a.field.clone(),
741                                }
742                            })?;
743                            if idx != key_idx {
744                                existing_row[idx] = literal_to_value(&a.value)?;
745                                indices.push(idx);
746                            }
747                        }
748                        indices
749                    };
750                    self.catalog
751                        .update_hinted(table, rid, &existing_row, Some(&changed_cols))
752                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
753                    self.view_registry.mark_dependents_dirty(table);
754                    Ok(QueryResult::Modified(1))
755                } else {
756                    // No conflict: insert.
757                    self.catalog
758                        .insert(table, &values)
759                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
760                    self.view_registry.mark_dependents_dirty(table);
761                    Ok(QueryResult::Modified(1))
762                }
763            }
764
765            PlanNode::Update {
766                input,
767                table,
768                assignments,
769            } => {
770                // Mission C Phase 3: resolve assignments against a borrowed
771                // schema, then drop the borrow before the mutation loop.
772                // Try literal-only path first; fall back to per-row expression
773                // evaluation if any assignment contains a non-literal expression
774                // (e.g., `age := .age + 1`).
775                let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
776                    let schema_ref = self
777                        .catalog
778                        .schema(table)
779                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
780                    let indices: Vec<usize> = assignments
781                        .iter()
782                        .map(|a| {
783                            schema_ref.column_index(&a.field).ok_or_else(|| {
784                                QueryError::ColumnNotFound {
785                                    table: String::new(),
786                                    column: a.field.clone(),
787                                }
788                            })
789                        })
790                        .collect::<Result<_, _>>()?;
791                    let vals: Result<Vec<Value>, _> = assignments
792                        .iter()
793                        .map(|a| literal_to_value(&a.value))
794                        .collect();
795                    (indices, vals.ok())
796                };
797                let resolved_assignments: Option<Vec<(usize, Value)>> =
798                    literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
799
800                // Mission C Phase 2: the hint Table::update_hinted needs to
801                // decide whether to read the old row for index diff.
802                let changed_cols: Vec<usize> = col_indices.clone();
803
804                // ── Fused scan+update for Update(Filter(SeqScan)) ────────
805                // Perf sprint: instead of the two-pass collect-RIDs-then-loop
806                // pattern (which pays one ensure_hot per matched row on the
807                // second pass), fuse the predicate evaluation and in-place
808                // byte-level mutation into a single heap walk. Same idea as
809                // the fused scan_delete_matching path for deletes.
810                if let Some(ref resolved_assignments) = resolved_assignments {
811                    if let PlanNode::Filter {
812                        input: inner,
813                        predicate,
814                    } = input.as_ref()
815                    {
816                        if let PlanNode::SeqScan { table: t } = inner.as_ref() {
817                            if t == table {
818                                let fused_result = self.try_fused_scan_update(
819                                    table,
820                                    predicate,
821                                    resolved_assignments,
822                                    &changed_cols,
823                                );
824                                if let Some(result) = fused_result {
825                                    return result;
826                                }
827                            }
828                        }
829                    }
830                }
831
832                // Collect matching RowIds in a single pass.
833                let matching_rids = self.collect_rids_for_mutation(input, table)?;
834
835                // ── Literal-only fast paths ─────────────────────────────
836                if let Some(ref resolved_assignments) = resolved_assignments {
837                    // Mission C Phase 4: in-place byte-patch fast path. If every
838                    // assignment targets a fixed-size non-null column AND none of
839                    // them is indexed, we can skip decode_row / Vec<Value> /
840                    // encode_row_into entirely and patch the row's raw bytes on
841                    // the hot page.
842                    let fast_patch: Option<Vec<FastPatch>> = {
843                        let tbl = self
844                            .catalog
845                            .get_table(table)
846                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
847                        let schema = &tbl.schema;
848                        let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
849                            is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
850                        });
851                        let no_indexed = !resolved_assignments
852                            .iter()
853                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
854
855                        if all_fixed_nonnull && no_indexed {
856                            let layout = RowLayout::new(schema);
857                            let bitmap_size = layout.bitmap_size();
858                            let patches: Vec<FastPatch> = resolved_assignments
859                                .iter()
860                                .map(|(idx, val)| {
861                                    let fixed_off = layout
862                                        .fixed_offset(*idx)
863                                        .expect("is_fixed_size already checked");
864                                    let field_off = 2 + bitmap_size + fixed_off;
865                                    let bytes: FixedBytes = match val {
866                                        Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
867                                        Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
868                                        Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
869                                        Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
870                                        Value::Uuid(v) => FixedBytes::Uuid(*v),
871                                        _ => unreachable!("all_fixed_nonnull guard lied"),
872                                    };
873                                    FastPatch {
874                                        field_off,
875                                        bitmap_byte_off: 2 + idx / 8,
876                                        bit_mask: 1u8 << (idx % 8),
877                                        bytes,
878                                    }
879                                })
880                                .collect();
881                            Some(patches)
882                        } else {
883                            None
884                        }
885                    };
886
887                    if let Some(patches) = fast_patch {
888                        let mut count = 0u64;
889                        for rid in matching_rids {
890                            // Mission B2: WAL-log every patch so crash
891                            // recovery replays the update. Same mutation
892                            // closure as before — the wrapper just sandwiches
893                            // it between a hot-page read and a WAL append.
894                            let ok = self
895                                .catalog
896                                .update_row_bytes_logged(table, rid, |row| {
897                                    for p in &patches {
898                                        row[p.bitmap_byte_off] &= !p.bit_mask;
899                                        let field_bytes = p.bytes.as_slice();
900                                        row[p.field_off..p.field_off + field_bytes.len()]
901                                            .copy_from_slice(field_bytes);
902                                    }
903                                })
904                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
905                            if ok {
906                                count += 1;
907                            }
908                        }
909                        self.view_registry.mark_dependents_dirty(table);
910                        return Ok(QueryResult::Modified(count));
911                    }
912
913                    // Mission C Phase 10: var-column in-place shrink fast path.
914                    let var_fast: Option<(usize, Option<Vec<u8>>)> = {
915                        let tbl = self
916                            .catalog
917                            .get_table(table)
918                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
919                        let schema = &tbl.schema;
920                        let is_single = resolved_assignments.len() == 1;
921                        let is_var_col = is_single
922                            && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
923                        let no_indexed = !resolved_assignments
924                            .iter()
925                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
926
927                        if is_single && is_var_col && no_indexed {
928                            let (idx, val) = &resolved_assignments[0];
929                            let bytes_opt: Option<Vec<u8>> = match val {
930                                Value::Str(s) => Some(s.as_bytes().to_vec()),
931                                Value::Bytes(b) => Some(b.clone()),
932                                Value::Empty => None,
933                                _ => {
934                                    return Err(QueryError::TypeError(format!(
935                                        "cannot assign non-var value to var column '{}'",
936                                        schema.columns[*idx].name
937                                    )))
938                                }
939                            };
940                            Some((*idx, bytes_opt))
941                        } else {
942                            None
943                        }
944                    };
945
946                    if let Some((col_idx, new_bytes_opt)) = var_fast {
947                        let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
948                        let mut count = 0u64;
949                        let mut fallback_rids: Vec<RowId> = Vec::new();
950                        for rid in &matching_rids {
951                            // Mission B2: logged variant so crash recovery
952                            // replays the shrink. On a false return (row
953                            // would have to grow), the rid is pushed to
954                            // `fallback_rids` and the slower `update_hinted`
955                            // path — which is already WAL-logged — picks it up.
956                            let ok = self
957                                .catalog
958                                .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
959                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
960                            if ok {
961                                count += 1;
962                            } else {
963                                fallback_rids.push(*rid);
964                            }
965                        }
966                        for rid in fallback_rids {
967                            let mut row = match self.catalog.get(table, rid) {
968                                Some(r) => r,
969                                None => continue,
970                            };
971                            for (idx, val) in resolved_assignments.iter() {
972                                row[*idx] = val.clone();
973                            }
974                            self.catalog
975                                .update_hinted(table, rid, &row, Some(&changed_cols))
976                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
977                            count += 1;
978                        }
979                        self.view_registry.mark_dependents_dirty(table);
980                        return Ok(QueryResult::Modified(count));
981                    }
982
983                    // Generic literal path: decode row, apply literal values.
984                    let mut count = 0u64;
985                    for rid in matching_rids {
986                        let mut row = match self.catalog.get(table, rid) {
987                            Some(r) => r,
988                            None => continue,
989                        };
990                        for (idx, val) in resolved_assignments.iter() {
991                            row[*idx] = val.clone();
992                        }
993                        self.catalog
994                            .update_hinted(table, rid, &row, Some(&changed_cols))
995                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
996                        count += 1;
997                    }
998                    self.view_registry.mark_dependents_dirty(table);
999                    return Ok(QueryResult::Modified(count));
1000                } // end if let Some(resolved_assignments)
1001
1002                // ── Expression-based update path ────────────────────────
1003                // At least one assignment contains a non-literal expression
1004                // (e.g., `age := .age + 1`). Evaluate per-row.
1005                let col_names: Vec<String> = {
1006                    let schema_ref = self
1007                        .catalog
1008                        .schema(table)
1009                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1010                    schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1011                };
1012                let mut count = 0u64;
1013                for rid in matching_rids {
1014                    let mut row = match self.catalog.get(table, rid) {
1015                        Some(r) => r,
1016                        None => continue,
1017                    };
1018                    for (i, asgn) in assignments.iter().enumerate() {
1019                        let val = eval_expr(&asgn.value, &row, &col_names);
1020                        row[col_indices[i]] = val;
1021                    }
1022                    self.catalog
1023                        .update_hinted(table, rid, &row, Some(&changed_cols))
1024                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1025                    count += 1;
1026                }
1027                self.view_registry.mark_dependents_dirty(table);
1028                Ok(QueryResult::Modified(count))
1029            }
1030
1031            PlanNode::Delete { input, table } => {
1032                // Mission C Phase 3: no schema clone — collect_rids_for_mutation
1033                // looks up schema internally when it needs one, and the mutation
1034                // loop doesn't need the schema at all.
1035                //
1036                // Mission C Phase 12: route bulk deletes through
1037                // `Catalog::delete_many`, which batches the btree leaf
1038                // compaction and shares one `ensure_hot` per row between
1039                // the index-key extraction and the slot delete. On
1040                // `delete_by_filter` (100K fixture, ~20K matches) that
1041                // removes ~4ms of pure `Vec::remove` memmove from the btree
1042                // maintenance phase.
1043                //
1044                // Mission C Phase 16: for the common `delete where ...`
1045                // shape (Filter(SeqScan)) — and the rarer "delete
1046                // everything" shape (SeqScan) — skip the two-pass
1047                // `collect_rids_for_mutation` + `delete_many` flow entirely.
1048                // The fused `scan_delete_matching` primitive walks the
1049                // heap exactly once, paying one `ensure_hot` per page
1050                // instead of per-row. That closes the last major gap on
1051                // the bench's `delete_by_filter` workload.
1052                if let PlanNode::Filter {
1053                    input: inner,
1054                    predicate,
1055                } = input.as_ref()
1056                {
1057                    if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1058                        if t == table {
1059                            let schema = self
1060                                .catalog
1061                                .schema(table)
1062                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1063                            let columns: Vec<String> =
1064                                schema.columns.iter().map(|c| c.name.clone()).collect();
1065                            let fast = FastLayout::new(schema);
1066                            if let Some(compiled) =
1067                                compile_predicate(predicate, &columns, &fast, schema)
1068                            {
1069                                // Mission B2: logged variant so every
1070                                // matched rid hits the WAL during the
1071                                // single-pass scan. Structure of the
1072                                // fused scan is unchanged — only the
1073                                // hook closure now also appends.
1074                                let count = self
1075                                    .catalog
1076                                    .scan_delete_matching_logged(table, |data| compiled(data))
1077                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1078                                self.view_registry.mark_dependents_dirty(table);
1079                                return Ok(QueryResult::Modified(count));
1080                            }
1081                        }
1082                    }
1083                } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1084                    if t == table {
1085                        // `delete from T` with no predicate — every live
1086                        // row matches. One pass is still the right shape.
1087                        // Mission B2: logged variant — see above.
1088                        let count = self
1089                            .catalog
1090                            .scan_delete_matching_logged(table, |_| true)
1091                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1092                        self.view_registry.mark_dependents_dirty(table);
1093                        return Ok(QueryResult::Modified(count));
1094                    }
1095                }
1096
1097                let matching_rids = self.collect_rids_for_mutation(input, table)?;
1098                let count = self
1099                    .catalog
1100                    .delete_many(table, &matching_rids)
1101                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1102                self.view_registry.mark_dependents_dirty(table);
1103                Ok(QueryResult::Modified(count))
1104            }
1105
1106            PlanNode::AliasScan { table, alias } => {
1107                // Mission E1.2: scan `table` and rename every output column
1108                // to `alias.field`. Used as a join leaf so downstream
1109                // NestedLoopJoin + Filter + Project nodes can resolve
1110                // `Expr::QualifiedField` lookups by direct column-name match.
1111                //
1112                // We don't bother with a fused zero-copy loop here yet — the
1113                // whole join path is nested-loop and correctness-first
1114                // (Phase E1.3 will introduce hash join and at that point we
1115                // can revisit whether to specialise AliasScan).
1116                let schema = self
1117                    .catalog
1118                    .schema(table)
1119                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1120                    .clone();
1121                let columns: Vec<String> = schema
1122                    .columns
1123                    .iter()
1124                    .map(|c| format!("{alias}.{}", c.name))
1125                    .collect();
1126                let rows: Vec<Vec<Value>> = self
1127                    .catalog
1128                    .scan(table)
1129                    .map_err(|e| QueryError::StorageError(e.to_string()))?
1130                    .map(|(_, row)| row)
1131                    .collect();
1132                Ok(QueryResult::Rows { columns, rows })
1133            }
1134
1135            PlanNode::NestedLoopJoin {
1136                left,
1137                right,
1138                on,
1139                kind,
1140            } => {
1141                // Materialise both sides. The executor ships two strategies:
1142                //   1. Hash join (E1.3) — when the `on` predicate is a
1143                //      simple equi-predicate `left_col = right_col`, build a
1144                //      FxHashMap<Value, Vec<row_idx>> over the right side
1145                //      and probe with the left side. O(L + R) instead of
1146                //      O(L × R). Handles Inner and LeftOuter.
1147                //   2. Nested loop (E1.2) — fallback for Cross, non-equi
1148                //      predicates, or `on` expressions that reference
1149                //      either side with something more complex than a
1150                //      QualifiedField.
1151                let left_result = self.execute_plan(left)?;
1152                let right_result = self.execute_plan(right)?;
1153                let (left_columns, left_rows) = match left_result {
1154                    QueryResult::Rows { columns, rows } => (columns, rows),
1155                    _ => return Err("join left side must produce rows".into()),
1156                };
1157                let (right_columns, right_rows) = match right_result {
1158                    QueryResult::Rows { columns, rows } => (columns, rows),
1159                    _ => return Err("join right side must produce rows".into()),
1160                };
1161
1162                // Hash-join fast path.
1163                if !matches!(kind, JoinKind::Cross) {
1164                    if let Some(pred) = on {
1165                        if let Some((l_idx, r_idx)) =
1166                            try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1167                        {
1168                            let result = hash_join(
1169                                left_columns,
1170                                left_rows,
1171                                right_columns,
1172                                right_rows,
1173                                l_idx,
1174                                r_idx,
1175                                *kind,
1176                            );
1177                            if let QueryResult::Rows { ref rows, .. } = result {
1178                                check_join_limit(rows.len())?;
1179                            }
1180                            return Ok(result);
1181                        }
1182                    }
1183                }
1184
1185                // Nested-loop fallback.
1186                let n_left = left_columns.len();
1187                let n_right = right_columns.len();
1188                let mut columns = Vec::with_capacity(n_left + n_right);
1189                columns.extend(left_columns);
1190                columns.extend(right_columns);
1191
1192                let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1193                let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1194
1195                for left_row in &left_rows {
1196                    let mut matched = false;
1197                    for right_row in &right_rows {
1198                        combined.clear();
1199                        combined.extend_from_slice(left_row);
1200                        combined.extend_from_slice(right_row);
1201                        let keep = match kind {
1202                            JoinKind::Cross => true,
1203                            JoinKind::Inner | JoinKind::LeftOuter => match on {
1204                                Some(pred) => eval_predicate(pred, &combined, &columns),
1205                                // Missing `on` for non-cross joins is a
1206                                // parser error, but if it slips through we
1207                                // treat it as "match everything".
1208                                None => true,
1209                            },
1210                            // RightOuter is rewritten to LeftOuter by the
1211                            // planner, so we never see it here.
1212                            JoinKind::RightOuter => {
1213                                unreachable!("planner rewrites RightOuter to LeftOuter")
1214                            }
1215                        };
1216                        if keep {
1217                            rows.push(combined.clone());
1218                            check_join_limit(rows.len())?;
1219                            matched = true;
1220                        }
1221                    }
1222                    if !matched && matches!(kind, JoinKind::LeftOuter) {
1223                        let mut row = Vec::with_capacity(n_left + n_right);
1224                        row.extend_from_slice(left_row);
1225                        row.resize(n_left + n_right, Value::Empty);
1226                        rows.push(row);
1227                        check_join_limit(rows.len())?;
1228                    }
1229                }
1230
1231                Ok(QueryResult::Rows { columns, rows })
1232            }
1233
1234            PlanNode::Distinct { input } => {
1235                let result = self.execute_plan(input)?;
1236                match result {
1237                    QueryResult::Rows { columns, rows } => {
1238                        let mut seen = std::collections::HashSet::new();
1239                        let mut unique_rows = Vec::new();
1240                        for row in rows {
1241                            if seen.insert(row.clone()) {
1242                                unique_rows.push(row);
1243                            }
1244                        }
1245                        Ok(QueryResult::Rows {
1246                            columns,
1247                            rows: unique_rows,
1248                        })
1249                    }
1250                    other => Ok(other),
1251                }
1252            }
1253
1254            PlanNode::GroupBy {
1255                input,
1256                keys,
1257                aggregates,
1258                having,
1259            } => {
1260                let result = self.execute_plan(input)?;
1261                match result {
1262                    QueryResult::Rows { columns, rows } => {
1263                        // Resolve key column indices.
1264                        let key_indices: Vec<usize> = keys
1265                            .iter()
1266                            .map(|k| {
1267                                columns
1268                                    .iter()
1269                                    .position(|c| c == k)
1270                                    .ok_or_else(|| format!("group-by column '{k}' not found"))
1271                            })
1272                            .collect::<Result<Vec<_>, _>>()?;
1273
1274                        // Resolve aggregate field indices. count(*) uses
1275                        // sentinel usize::MAX — compute_group_aggregate
1276                        // treats it as "count all rows in the group".
1277                        let agg_field_indices: Vec<usize> = aggregates
1278                            .iter()
1279                            .map(|a| {
1280                                if a.field == "*" {
1281                                    Ok(usize::MAX)
1282                                } else {
1283                                    columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1284                                        format!("aggregate column '{}' not found", a.field)
1285                                    })
1286                                }
1287                            })
1288                            .collect::<Result<Vec<_>, _>>()?;
1289
1290                        // Group rows by key values (preserving insertion order).
1291                        let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1292                            rustc_hash::FxHashMap::default();
1293                        let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1294                        for (ri, row) in rows.iter().enumerate() {
1295                            let key: Vec<Value> =
1296                                key_indices.iter().map(|&i| row[i].clone()).collect();
1297                            match group_map.get(&key) {
1298                                Some(&idx) => groups[idx].1.push(ri),
1299                                None => {
1300                                    let idx = groups.len();
1301                                    group_map.insert(key.clone(), idx);
1302                                    groups.push((key, vec![ri]));
1303                                }
1304                            }
1305                        }
1306
1307                        // Build output column names: keys ++ aggregate output names.
1308                        let mut out_columns: Vec<String> = keys.clone();
1309                        for agg in aggregates.iter() {
1310                            out_columns.push(agg.output_name.clone());
1311                        }
1312
1313                        // Compute aggregates per group.
1314                        let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1315                        for (key_vals, row_indices) in &groups {
1316                            let mut row = key_vals.clone();
1317                            for (ai, agg) in aggregates.iter().enumerate() {
1318                                let col_idx = agg_field_indices[ai];
1319                                let val = compute_group_aggregate(
1320                                    agg.function,
1321                                    &rows,
1322                                    row_indices,
1323                                    col_idx,
1324                                );
1325                                row.push(val);
1326                            }
1327                            out_rows.push(row);
1328                        }
1329
1330                        // Apply HAVING filter.
1331                        if let Some(having_expr) = having {
1332                            out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1333                        }
1334
1335                        Ok(QueryResult::Rows {
1336                            columns: out_columns,
1337                            rows: out_rows,
1338                        })
1339                    }
1340                    _ => Err("group by requires row input".into()),
1341                }
1342            }
1343
1344            PlanNode::CreateTable { name, fields } => {
1345                let columns: Vec<ColumnDef> = fields
1346                    .iter()
1347                    .enumerate()
1348                    .map(
1349                        |(i, (fname, tname, req))| -> Result<ColumnDef, QueryError> {
1350                            Ok(ColumnDef {
1351                                name: fname.clone(),
1352                                type_id: type_name_to_id(tname).map_err(QueryError::TypeError)?,
1353                                required: *req,
1354                                position: i as u16,
1355                            })
1356                        },
1357                    )
1358                    .collect::<Result<Vec<_>, _>>()?;
1359                let schema = Schema {
1360                    table_name: name.clone(),
1361                    columns,
1362                };
1363                self.catalog
1364                    .create_table(schema)
1365                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1366                Ok(QueryResult::Created(name.clone()))
1367            }
1368
1369            PlanNode::AlterTable { table, action } => match action {
1370                AlterAction::AddColumn {
1371                    name,
1372                    type_name,
1373                    required,
1374                } => {
1375                    let position = self
1376                        .catalog
1377                        .schema(table)
1378                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1379                        .columns
1380                        .len() as u16;
1381                    let col = ColumnDef {
1382                        name: name.clone(),
1383                        type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1384                        required: *required,
1385                        position,
1386                    };
1387                    self.catalog
1388                        .alter_table_add_column(table, col)
1389                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1390                    Ok(QueryResult::Executed {
1391                        message: format!("column '{name}' added to '{table}'"),
1392                    })
1393                }
1394                AlterAction::DropColumn { name } => {
1395                    self.catalog
1396                        .alter_table_drop_column(table, name)
1397                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1398                    Ok(QueryResult::Executed {
1399                        message: format!("column '{name}' dropped from '{table}'"),
1400                    })
1401                }
1402                AlterAction::AddIndex { column } => {
1403                    self.catalog
1404                        .create_index(table, column)
1405                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1406                    Ok(QueryResult::Executed {
1407                        message: format!("index on '{table}.{column}' created"),
1408                    })
1409                }
1410            },
1411
1412            PlanNode::DropTable { name } => {
1413                self.catalog
1414                    .drop_table(name)
1415                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1416                Ok(QueryResult::Executed {
1417                    message: format!("table '{name}' dropped"),
1418                })
1419            }
1420
1421            PlanNode::CreateView { name, query_text } => {
1422                self.create_view(name, query_text)?;
1423                Ok(QueryResult::Executed {
1424                    message: format!("materialized view '{name}' created"),
1425                })
1426            }
1427
1428            PlanNode::RefreshView { name } => {
1429                self.refresh_view(name)?;
1430                Ok(QueryResult::Executed {
1431                    message: format!("materialized view '{name}' refreshed"),
1432                })
1433            }
1434
1435            PlanNode::DropView { name } => {
1436                self.drop_view(name)?;
1437                Ok(QueryResult::Executed {
1438                    message: format!("materialized view '{name}' dropped"),
1439                })
1440            }
1441
1442            PlanNode::Window { input, windows } => {
1443                let result = self.execute_plan(input)?;
1444                execute_window(result, windows)
1445            }
1446
1447            PlanNode::Union { left, right, all } => {
1448                let left_result = self.execute_plan(left)?;
1449                let right_result = self.execute_plan(right)?;
1450                let (left_cols, left_rows) = match left_result {
1451                    QueryResult::Rows { columns, rows } => (columns, rows),
1452                    _ => return Err("UNION requires query results on left side".into()),
1453                };
1454                let (_, right_rows) = match right_result {
1455                    QueryResult::Rows { columns, rows } => (columns, rows),
1456                    _ => return Err("UNION requires query results on right side".into()),
1457                };
1458                let mut combined = left_rows;
1459                if *all {
1460                    // UNION ALL — just concatenate.
1461                    combined.extend(right_rows);
1462                } else {
1463                    // UNION — deduplicate using the same HashSet approach
1464                    // as DISTINCT. Value already implements Hash + Eq.
1465                    let mut seen = std::collections::HashSet::new();
1466                    for row in &combined {
1467                        seen.insert(row.clone());
1468                    }
1469                    for row in right_rows {
1470                        if seen.insert(row.clone()) {
1471                            combined.push(row);
1472                        }
1473                    }
1474                }
1475                Ok(QueryResult::Rows {
1476                    columns: left_cols,
1477                    rows: combined,
1478                })
1479            }
1480
1481            PlanNode::Explain { input } => {
1482                let text = format_plan_tree(input, 0);
1483                Ok(QueryResult::Rows {
1484                    columns: vec!["plan".to_string()],
1485                    rows: text
1486                        .lines()
1487                        .map(|line| vec![Value::Str(line.to_string())])
1488                        .collect(),
1489                })
1490            }
1491
1492            PlanNode::Begin => {
1493                if self.in_transaction {
1494                    return Err(QueryError::Execution(
1495                        "already in a transaction (nested transactions not supported)".into(),
1496                    ));
1497                }
1498                self.in_transaction = true;
1499                Ok(QueryResult::Executed {
1500                    message: "transaction started".to_string(),
1501                })
1502            }
1503
1504            PlanNode::Commit => {
1505                if !self.in_transaction {
1506                    return Err(QueryError::Execution(
1507                        "no active transaction to commit".into(),
1508                    ));
1509                }
1510                self.in_transaction = false;
1511                self.catalog
1512                    .sync_wal()
1513                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1514                Ok(QueryResult::Executed {
1515                    message: "transaction committed".to_string(),
1516                })
1517            }
1518
1519            PlanNode::Rollback => {
1520                if !self.in_transaction {
1521                    return Err(QueryError::Execution(
1522                        "no active transaction to roll back".into(),
1523                    ));
1524                }
1525                self.in_transaction = false;
1526                self.catalog
1527                    .rollback_to_last_sync()
1528                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1529                if let Ok(mut cache) = self.plan_cache.lock() {
1530                    cache.clear();
1531                }
1532                self.view_registry = ViewRegistry::open(self.catalog.data_dir())
1533                    .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
1534                Ok(QueryResult::Executed {
1535                    message: "transaction rolled back".to_string(),
1536                })
1537            }
1538
1539            PlanNode::IndexScan { table, column, key } => {
1540                let key_value = literal_to_value(key)?;
1541                let tbl = self
1542                    .catalog
1543                    .get_table(table)
1544                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1545                let columns: Vec<String> =
1546                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1547
1548                // Fast path: the table has a B-tree on this column.
1549                // Uses index_lookup_all to return ALL matching rows for
1550                // both unique and non-unique indexes.
1551                if tbl.has_index(column) {
1552                    let rids = tbl.index_lookup_all(column, &key_value);
1553                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1554                    for rid in rids {
1555                        if let Some(data) = tbl.heap.get(rid) {
1556                            rows.push(decode_row(&tbl.schema, &data));
1557                        }
1558                    }
1559                    return Ok(QueryResult::Rows { columns, rows });
1560                }
1561
1562                // Fallback: no index on this column. The planner emits IndexScan
1563                // eagerly (it has no visibility into which columns are indexed
1564                // at plan time), so here we must behave like SeqScan+Filter on
1565                // `.col = literal`: return *all* matching rows, not just the
1566                // first one. A non-indexed column isn't necessarily unique.
1567                // We compile the eq predicate once and stream without any
1568                // per-row decode for non-matching rows.
1569                let schema = &tbl.schema;
1570                let fast = FastLayout::new(schema);
1571                let synth_pred = Expr::BinaryOp(
1572                    Box::new(Expr::Field(column.clone())),
1573                    BinOp::Eq,
1574                    Box::new(key.clone()),
1575                );
1576                if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1577                    // Mission F: skip the first 4 Vec doublings.
1578                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1579                    self.catalog
1580                        .for_each_row_raw(table, |_rid, data| {
1581                            if compiled(data) {
1582                                rows.push(decode_row(schema, data));
1583                            }
1584                        })
1585                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1586                    return Ok(QueryResult::Rows { columns, rows });
1587                }
1588
1589                // Last resort: slow eq-check on materialised rows.
1590                let col_idx =
1591                    schema
1592                        .column_index(column)
1593                        .ok_or_else(|| QueryError::ColumnNotFound {
1594                            table: String::new(),
1595                            column: column.clone(),
1596                        })?;
1597                let rows: Vec<Vec<Value>> = tbl
1598                    .scan()
1599                    .filter_map(|(_, row)| {
1600                        if row[col_idx] == key_value {
1601                            Some(row)
1602                        } else {
1603                            None
1604                        }
1605                    })
1606                    .collect();
1607                Ok(QueryResult::Rows { columns, rows })
1608            }
1609
1610            PlanNode::RangeScan {
1611                table,
1612                column,
1613                start,
1614                end,
1615            } => {
1616                let tbl = self
1617                    .catalog
1618                    .get_table(table)
1619                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1620                let columns: Vec<String> =
1621                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1622                let schema = &tbl.schema;
1623
1624                let start_val = match start {
1625                    Some((expr, _)) => Some(literal_to_value(expr)?),
1626                    None => None,
1627                };
1628                let end_val = match end {
1629                    Some((expr, _)) => Some(literal_to_value(expr)?),
1630                    None => None,
1631                };
1632                let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1633                let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1634
1635                // Range scans only use the btree fast path for unique indexes,
1636                // because non-unique indexes store composite keys (column_value
1637                // + RowId) that don't directly compare against raw column values.
1638                if tbl.is_index_unique(column) == Some(true) {
1639                    if let Some(btree) = tbl.index(column) {
1640                        let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1641                            (Some(s), Some(e)) => btree.range(s, e).collect(),
1642                            (Some(s), None) => btree.range_from(s),
1643                            (None, Some(e)) => btree.range_to(e),
1644                            (None, None) => {
1645                                let rows: Vec<Vec<Value>> =
1646                                    tbl.scan().map(|(_, row)| row).collect();
1647                                return Ok(QueryResult::Rows { columns, rows });
1648                            }
1649                        };
1650                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1651                        for (key, rid) in hits {
1652                            if !start_inclusive {
1653                                if let Some(ref s) = start_val {
1654                                    if &key == s {
1655                                        continue;
1656                                    }
1657                                }
1658                            }
1659                            if !end_inclusive {
1660                                if let Some(ref e) = end_val {
1661                                    if &key == e {
1662                                        continue;
1663                                    }
1664                                }
1665                            }
1666                            if let Some(data) = tbl.heap.get(rid) {
1667                                rows.push(decode_row(schema, &data));
1668                            }
1669                        }
1670                        return Ok(QueryResult::Rows { columns, rows });
1671                    }
1672                }
1673
1674                // Fallback: no index — synthesize range predicate and scan.
1675                let fast = FastLayout::new(schema);
1676                let synth = synthesize_range_predicate(column, start, end);
1677                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1678                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1679                    self.catalog
1680                        .for_each_row_raw(table, |_rid, data| {
1681                            if compiled(data) {
1682                                rows.push(decode_row(schema, data));
1683                            }
1684                        })
1685                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1686                    return Ok(QueryResult::Rows { columns, rows });
1687                }
1688
1689                let col_idx =
1690                    schema
1691                        .column_index(column)
1692                        .ok_or_else(|| QueryError::ColumnNotFound {
1693                            table: String::new(),
1694                            column: column.clone(),
1695                        })?;
1696                let rows: Vec<Vec<Value>> = tbl
1697                    .scan()
1698                    .filter(|(_, row)| {
1699                        range_matches(
1700                            &row[col_idx],
1701                            &start_val,
1702                            start_inclusive,
1703                            &end_val,
1704                            end_inclusive,
1705                        )
1706                    })
1707                    .map(|(_, row)| row)
1708                    .collect();
1709                Ok(QueryResult::Rows { columns, rows })
1710            }
1711        }
1712    }
1713
1714    // ─── Materialized view operations ──────────────────────────────────────
1715
1716    /// Create a materialized view: execute the source query, store results
1717    /// in a new backing table, and register the view.
1718    fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1719        if self.view_registry.is_view(name) {
1720            return Err(QueryError::ViewError(format!(
1721                "materialized view '{name}' already exists"
1722            )));
1723        }
1724        // Execute the source query to get the result set.
1725        let result = self.execute_powql(query_text)?;
1726        let (columns, rows) = match result {
1727            QueryResult::Rows { columns, rows } => (columns, rows),
1728            _ => return Err("view source query must be a SELECT".into()),
1729        };
1730        // Derive a schema for the backing table from the query result columns.
1731        let schema = self.derive_view_schema(name, &columns, &rows);
1732        // Create the backing table and insert the result rows.
1733        self.catalog
1734            .create_table(schema)
1735            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1736        for row in &rows {
1737            self.catalog
1738                .insert(name, row)
1739                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1740        }
1741        // Determine which base tables this view depends on by parsing the query.
1742        let depends_on = self.extract_view_deps(query_text);
1743        self.view_registry
1744            .register(ViewDef {
1745                name: name.to_string(),
1746                query: query_text.to_string(),
1747                depends_on,
1748                dirty: false,
1749            })
1750            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1751        Ok(())
1752    }
1753
1754    /// Refresh a materialized view: re-execute its source query and replace
1755    /// the backing table's contents.
1756    fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1757        let def = self
1758            .view_registry
1759            .get(name)
1760            .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1761        let query_text = def.query.clone();
1762        // Execute the source query.
1763        let result = self.execute_powql(&query_text)?;
1764        let (_columns, rows) = match result {
1765            QueryResult::Rows { columns, rows } => (columns, rows),
1766            _ => return Err("view source query must be a SELECT".into()),
1767        };
1768        // Clear old data and insert fresh results. Mission B2: logged
1769        // variant — view refreshes are a mutation and crash recovery
1770        // must see them.
1771        self.catalog
1772            .scan_delete_matching_logged(name, |_| true)
1773            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1774        for row in &rows {
1775            self.catalog
1776                .insert(name, row)
1777                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1778        }
1779        self.view_registry.mark_clean(name);
1780        Ok(())
1781    }
1782
1783    /// Drop a materialized view: remove the backing table and unregister.
1784    fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1785        if !self.view_registry.is_view(name) {
1786            return Err(QueryError::ViewError(format!(
1787                "materialized view '{name}' not found"
1788            )));
1789        }
1790        self.view_registry
1791            .unregister(name)
1792            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1793        self.catalog
1794            .drop_table(name)
1795            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1796        Ok(())
1797    }
1798
1799    /// Derive a storage `Schema` for a view's backing table from query
1800    /// result column names and the first row's types.
1801    fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1802        use powdb_storage::types::{ColumnDef, TypeId};
1803        let cols: Vec<ColumnDef> = columns
1804            .iter()
1805            .enumerate()
1806            .map(|(i, col_name)| {
1807                let type_id = rows
1808                    .first()
1809                    .and_then(|row| row.get(i))
1810                    .map(|v| v.type_id())
1811                    .unwrap_or(TypeId::Str);
1812                ColumnDef {
1813                    name: col_name.clone(),
1814                    type_id,
1815                    required: false,
1816                    position: i as u16,
1817                }
1818            })
1819            .collect();
1820        Schema {
1821            table_name: name.to_string(),
1822            columns: cols,
1823        }
1824    }
1825
1826    /// Extract base table dependencies from a view's source query by
1827    /// parsing it and collecting the source table name.
1828    fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1829        use crate::parser::parse;
1830        match parse(query_text) {
1831            Ok(Statement::Query(q)) => {
1832                let mut deps = vec![q.source.clone()];
1833                for j in &q.joins {
1834                    deps.push(j.source.clone());
1835                }
1836                deps
1837            }
1838            _ => Vec::new(),
1839        }
1840    }
1841
1842    // ─── Specialized fast paths ─────────────────────────────────────────────
1843    //
1844    // These methods are helpers for the `execute_plan` match arms above.
1845    // Each returns `Ok(Some(result))` when the fast path fires, `Ok(None)`
1846    // when the shape isn't supported (caller falls back to generic code).
1847
    /// Aggregate sum/avg/min/max/count/count-distinct over a single
    /// fixed-size numeric (Int or Float) column, with an optional compiled
    /// filter predicate. Walks raw row bytes instead of decoding whole rows.
    /// The Int sum/avg path accumulates in an i128 so intermediate sums cannot
    /// overflow.
    ///
    /// Returns `Ok(None)` when:
    /// - the column does not exist;
    /// - the column is not Int/Float;
    /// - the column has no fixed offset in `FastLayout`;
    /// - the predicate cannot be compiled.
    ///
    /// In each of those cases the caller falls back to the generic path.
    /// Returns `QueryError::TableNotFound` if `table` has no schema.
    ///
    /// Result conventions (see the per-function arms below):
    /// - Int `sum` saturates to the i64 range (clamped, not an error) and is
    ///   `Int(0)` over zero rows.
    /// - Float `sum` is `Float(0.0)` over zero rows.
    /// - `avg`, `min` and `max` are `Value::Empty` over zero rows.
    /// - `count` and `count-distinct` are always `Int`.
    pub(super) fn agg_single_col_fast(
        &self,
        table: &str,
        col: &str,
        function: AggFunc,
        predicate: Option<&Expr>,
    ) -> Result<Option<QueryResult>, QueryError> {
        let schema = self
            .catalog
            .schema(table)
            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
            .clone();
        // Column names are only needed to compile the predicate below.
        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
        let col_idx = match schema.column_index(col) {
            Some(i) => i,
            None => return Ok(None),
        };
        // Only fast-path fixed-size numeric columns (Int/Float) for
        // sum/avg/min/max/count/count-distinct. Mission D10: Float parity —
        // prior version bailed on Float columns, forcing them through the
        // generic row-decoding path that allocated a Vec<Value> per row and
        // dispatched on Value::cmp for every compare. f64 decode is
        // structurally the same as i64 (load 8 bytes, cast), so the fast path
        // handles both.
        let col_type = schema.columns[col_idx].type_id;
        if col_type != TypeId::Int && col_type != TypeId::Float {
            return Ok(None);
        }

        let fast = FastLayout::new(&schema);
        // Mission C Phase 20b: inline the numeric-column reader instead of
        // building a `Box<dyn Fn>`. Eliminates 100K vtable dispatches per
        // 100K-row agg scan — every reader call folds directly into the
        // hot loop below.
        let byte_offset = match fast.fixed_offsets[col_idx] {
            Some(o) => o,
            None => return Ok(None),
        };
        // Raw row layout as addressed here: the null bitmap starts at byte 2,
        // one bit per column; fixed-size values follow the bitmap.
        let bitmap_byte = col_idx / 8;
        let bitmap_bit = (col_idx % 8) as u32;
        let data_offset = 2 + fast.bitmap_size + byte_offset;

        // Optional compiled filter.
        let compiled_pred: Option<CompiledPredicate> = match predicate {
            Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
                Some(c) => Some(c),
                None => return Ok(None), // let generic path handle it
            },
            None => None,
        };

        // Mission C Phase 20b: specialize the inner loop per aggregate
        // function. The previous version ran a `match function { ... }`
        // *inside* the closure, which kept LLVM from producing optimal
        // scalar code for each variant (agg_max regressed ~23% vs the
        // baseline Box<dyn Fn> version even though per-row vtable cost
        // should have been strictly lower). Pushing the match out of the
        // hot loop lets each specialized body fold cleanly into
        // `for_each_row_raw` and removes a captured `AggFunc` + match
        // dispatch per row.
        //
        // Mission D10: same specialisation applies to the Float branch.
        // For Min/Max we use `f64::total_cmp` so the result matches
        // `Value::Ord` — this is the same ordering ORDER BY and the
        // top-N sort fast path use, keeping semantics consistent across
        // read paths (-0.0 < +0.0 for deterministic tie-breaking; NaN
        // placement depends on its sign bit, see the Min arm).
        //
        // Mission D11 Phase 1: each inner loop now splits on presence of
        // a predicate (`if let Some(pred) = &compiled_pred`) so the hot
        // body never re-tests `Option` per row, and reads column bytes
        // via `read_i64_unchecked` / `read_f64_unchecked` helpers that
        // drop two bounds checks per row (null bitmap byte + value
        // slice). Safety is carried by the `FastLayout` invariant that
        // `data_offset + 8 <= row_len` for any fixed-size column; see
        // the helper doc comments. Hot loops are macro-generated so the
        // with-pred / no-pred split can't drift between variants.
        //
        // NOTE(review): null handling (whether null values reach the
        // closures below) lives inside `agg_int_loop!` / `agg_float_loop!`,
        // which are defined outside this function — confirm there.
        let result = match col_type {
            TypeId::Int => match function {
                AggFunc::Sum | AggFunc::Avg => {
                    let mut sum_i128: i128 = 0;
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            count += 1;
                            sum_i128 += v as i128;
                        }
                    );
                    if matches!(function, AggFunc::Sum) {
                        // Saturate to the i64 range rather than erroring when
                        // the true sum does not fit.
                        let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
                        QueryResult::Scalar(Value::Int(clamped))
                    } else if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        let avg = (sum_i128 as f64) / (count as f64);
                        QueryResult::Scalar(Value::Float(avg))
                    }
                }
                AggFunc::Min => {
                    let mut min_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            min_v = Some(match min_v {
                                Some(m) => m.min(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    let mut max_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            max_v = Some(match max_v {
                                Some(m) => m.max(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |_v: i64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            seen.insert(v);
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            TypeId::Float => match function {
                AggFunc::Sum => {
                    // Use a single f64 accumulator. Naive summation is
                    // sufficient for MVP parity; if precision becomes an
                    // issue on long scans we can upgrade to Kahan–Neumaier
                    // compensated sum (~2x scalar cost, zero error growth).
                    // Over zero rows this yields Float(0.0), not Empty.
                    let mut sum: f64 = 0.0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            sum += v;
                        }
                    );
                    QueryResult::Scalar(Value::Float(sum))
                }
                AggFunc::Avg => {
                    let mut sum: f64 = 0.0;
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            sum += v;
                            count += 1;
                        }
                    );
                    if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        QueryResult::Scalar(Value::Float(sum / count as f64))
                    }
                }
                AggFunc::Min => {
                    // `total_cmp` for deterministic NaN handling (matches
                    // Value::Ord). Under `total_cmp`, a positive-sign NaN
                    // sorts above +inf and a negative-sign NaN sorts below
                    // -inf, so Min skips a positive NaN in favour of any other
                    // value but returns a negative NaN if one is present.
                    let mut min_v: Option<f64> = None;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            min_v = Some(match min_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_lt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    // Mirror of Min under `total_cmp`: a positive-sign NaN
                    // beats every other value, a negative-sign NaN loses to
                    // everything.
                    let mut max_v: Option<f64> = None;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            max_v = Some(match max_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_gt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |_v: f64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    // Hash on `f64::to_bits` — matches `Value::Hash`, so
                    // distinct NaN bit patterns count as distinct and
                    // -0.0/+0.0 count as distinct. Consistent with how
                    // Float values are hashed in every other DISTINCT /
                    // GROUP BY path.
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            seen.insert(v.to_bits());
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            _ => unreachable!("type guard above restricts to Int/Float"),
        };
        Ok(Some(result))
    }
2153
2154    /// `Project(Limit(Filter(SeqScan)))` and `Project(Limit(SeqScan))`.
2155    /// Streams rows, decodes only projected columns, stops at the limit.
2156    pub(super) fn project_filter_limit_fast(
2157        &self,
2158        table: &str,
2159        fields: &[ProjectField],
2160        limit: usize,
2161        predicate: Option<&Expr>,
2162    ) -> Result<Option<QueryResult>, QueryError> {
2163        let schema = self
2164            .catalog
2165            .schema(table)
2166            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2167            .clone();
2168        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2169
2170        // Each projection field must be a simple `.field` reference for this
2171        // fast path. Aliased or computed fields fall through.
2172        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2173        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2174        for f in fields {
2175            let name = match &f.expr {
2176                Expr::Field(n) => n.clone(),
2177                _ => return Ok(None),
2178            };
2179            let idx = match all_columns.iter().position(|c| c == &name) {
2180                Some(i) => i,
2181                None => return Ok(None),
2182            };
2183            proj_indices.push(idx);
2184            proj_columns.push(f.alias.clone().unwrap_or(name));
2185        }
2186
2187        let fast = FastLayout::new(&schema);
2188        let row_layout = RowLayout::new(&schema);
2189
2190        let compiled_pred: Option<CompiledPredicate> = match predicate {
2191            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2192                Some(c) => Some(c),
2193                None => return Ok(None),
2194            },
2195            None => None,
2196        };
2197
2198        let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2199        // Mission D2: use try_for_each_row_raw to actually stop iterating
2200        // once the limit is reached. The previous `done` flag only short-
2201        // circuited the closure body, so a `limit 100` over 100K rows still
2202        // walked all 100K slots — burning ~30x SQLite on scan_filter_project_top100.
2203        self.catalog
2204            .try_for_each_row_raw(table, |_rid, data| {
2205                use std::ops::ControlFlow;
2206                if let Some(ref pred) = compiled_pred {
2207                    if !pred(data) {
2208                        return ControlFlow::Continue(());
2209                    }
2210                }
2211                let row: Vec<Value> = proj_indices
2212                    .iter()
2213                    .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2214                    .collect();
2215                out.push(row);
2216                if out.len() >= limit {
2217                    ControlFlow::Break(())
2218                } else {
2219                    ControlFlow::Continue(())
2220                }
2221            })
2222            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2223
2224        Ok(Some(QueryResult::Rows {
2225            columns: proj_columns,
2226            rows: out,
2227        }))
2228    }
2229
2230    /// `Project(Limit(Sort(Filter(SeqScan))))` and `Project(Limit(Sort(SeqScan)))`.
2231    /// Bounded top-N heap over the sort key. Only the sort key needs to be
2232    /// read per row; projected columns are decoded only for the final
2233    /// winning rows when the heap drains.
2234    pub(super) fn project_filter_sort_limit_fast(
2235        &self,
2236        table: &str,
2237        fields: &[ProjectField],
2238        sort_field: &str,
2239        descending: bool,
2240        limit: usize,
2241        predicate: Option<&Expr>,
2242    ) -> Result<Option<QueryResult>, QueryError> {
2243        if limit == 0 {
2244            // Degenerate case — empty result. Let the generic path handle it
2245            // for proper column naming.
2246            return Ok(None);
2247        }
2248        let schema = self
2249            .catalog
2250            .schema(table)
2251            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2252            .clone();
2253        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2254
2255        // Sort key must be a fixed-size numeric column (Int or Float).
2256        // Mission D10: extended from Int-only. Float sort keys use a
2257        // sortable-u64 transform (see `f64_to_sortable_u64`) so the heap
2258        // path stays keyed on `u64` and the whole branch shape is
2259        // identical to the Int case — no new heap types, no `total_cmp`
2260        // closures in the hot loop.
2261        let sort_idx = match schema.column_index(sort_field) {
2262            Some(i) => i,
2263            None => return Ok(None),
2264        };
2265        let sort_col_type = schema.columns[sort_idx].type_id;
2266        if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2267            return Ok(None);
2268        }
2269
2270        // Each projection field must be a simple `.field`.
2271        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2272        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2273        for f in fields {
2274            let name = match &f.expr {
2275                Expr::Field(n) => n.clone(),
2276                _ => return Ok(None),
2277            };
2278            let idx = match all_columns.iter().position(|c| c == &name) {
2279                Some(i) => i,
2280                None => return Ok(None),
2281            };
2282            proj_indices.push(idx);
2283            proj_columns.push(f.alias.clone().unwrap_or(name));
2284        }
2285
2286        let fast = FastLayout::new(&schema);
2287        let row_layout = RowLayout::new(&schema);
2288        // Mission C Phase 20b: inline numeric-column reader (no Box<dyn Fn>).
2289        let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2290            Some(o) => o,
2291            None => return Ok(None),
2292        };
2293        let sort_bitmap_byte = sort_idx / 8;
2294        let sort_bitmap_bit = (sort_idx % 8) as u32;
2295        let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2296
2297        let compiled_pred: Option<CompiledPredicate> = match predicate {
2298            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2299                Some(c) => Some(c),
2300                None => return Ok(None),
2301            },
2302            None => None,
2303        };
2304
2305        // Bounded top-N heap. For `order .x desc limit N`, we want the N
2306        // largest values — use a min-heap so the smallest is at the top and
2307        // can be popped when a better candidate arrives. For ascending, use
2308        // a max-heap. We tie-break with a monotonic `seq` counter so the
2309        // result is deterministic and stable.
2310        //
2311        // To keep this simple we maintain two typed heaps and pick by
2312        // direction.
2313        let drained: Vec<Vec<u8>> = match sort_col_type {
2314            TypeId::Int => {
2315                let mut seq: u64 = 0;
2316                let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2317                    BinaryHeap::with_capacity(limit);
2318                let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2319                    BinaryHeap::with_capacity(limit);
2320
2321                self.catalog
2322                    .for_each_row_raw(table, |_rid, data| {
2323                        if let Some(ref pred) = compiled_pred {
2324                            if !pred(data) {
2325                                return;
2326                            }
2327                        }
2328                        // Inlined int-column reader: null check + i64 decode.
2329                        if data.len() < sort_data_offset + 8 {
2330                            return;
2331                        }
2332                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2333                        if is_null {
2334                            return;
2335                        }
2336                        let key = i64::from_le_bytes(
2337                            data[sort_data_offset..sort_data_offset + 8]
2338                                .try_into()
2339                                .unwrap_or_else(|_| unreachable!()),
2340                        );
2341                        let id = seq;
2342                        seq += 1;
2343
2344                        if descending {
2345                            if heap_desc.len() < limit {
2346                                heap_desc.push(Reverse((key, id, data.to_vec())));
2347                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2348                                if key > *top_key {
2349                                    heap_desc.pop();
2350                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2351                                }
2352                            }
2353                        } else if heap_asc.len() < limit {
2354                            heap_asc.push((key, id, data.to_vec()));
2355                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2356                            if key < *top_key {
2357                                heap_asc.pop();
2358                                heap_asc.push((key, id, data.to_vec()));
2359                            }
2360                        }
2361                    })
2362                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2363
2364                let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2365                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2366                } else {
2367                    heap_asc.into_iter().collect()
2368                };
2369                if descending {
2370                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2371                } else {
2372                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2373                }
2374                drained.into_iter().map(|(_, _, d)| d).collect()
2375            }
2376            TypeId::Float => {
2377                // Novel angle: rather than introducing a `TotalF64` newtype
2378                // with `Ord via total_cmp`, transform the f64 bit pattern
2379                // into a sortable `u64` so `BinaryHeap<u64>` orders exactly
2380                // like `f64::total_cmp` would. Classic trick: flip the sign
2381                // bit on positives, flip all bits on negatives. Result:
2382                // - NaN (sign=0) stays greatest, matching total_cmp
2383                // - -0.0 sorts before +0.0, matching total_cmp
2384                // - Hot loop is branch-cheap (one compare + one xor)
2385                let mut seq: u64 = 0;
2386                let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2387                    BinaryHeap::with_capacity(limit);
2388                let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2389                    BinaryHeap::with_capacity(limit);
2390
2391                self.catalog
2392                    .for_each_row_raw(table, |_rid, data| {
2393                        if let Some(ref pred) = compiled_pred {
2394                            if !pred(data) {
2395                                return;
2396                            }
2397                        }
2398                        if data.len() < sort_data_offset + 8 {
2399                            return;
2400                        }
2401                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2402                        if is_null {
2403                            return;
2404                        }
2405                        let bits = u64::from_le_bytes(
2406                            data[sort_data_offset..sort_data_offset + 8]
2407                                .try_into()
2408                                .unwrap_or_else(|_| unreachable!()),
2409                        );
2410                        let key = f64_bits_to_sortable_u64(bits);
2411                        let id = seq;
2412                        seq += 1;
2413
2414                        if descending {
2415                            if heap_desc.len() < limit {
2416                                heap_desc.push(Reverse((key, id, data.to_vec())));
2417                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2418                                if key > *top_key {
2419                                    heap_desc.pop();
2420                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2421                                }
2422                            }
2423                        } else if heap_asc.len() < limit {
2424                            heap_asc.push((key, id, data.to_vec()));
2425                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2426                            if key < *top_key {
2427                                heap_asc.pop();
2428                                heap_asc.push((key, id, data.to_vec()));
2429                            }
2430                        }
2431                    })
2432                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2433
2434                let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2435                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2436                } else {
2437                    heap_asc.into_iter().collect()
2438                };
2439                if descending {
2440                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2441                } else {
2442                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2443                }
2444                drained.into_iter().map(|(_, _, d)| d).collect()
2445            }
2446            _ => unreachable!("type guard above restricts to Int/Float"),
2447        };
2448
2449        let rows: Vec<Vec<Value>> = drained
2450            .into_iter()
2451            .map(|data| {
2452                proj_indices
2453                    .iter()
2454                    .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2455                    .collect()
2456            })
2457            .collect();
2458
2459        Ok(Some(QueryResult::Rows {
2460            columns: proj_columns,
2461            rows,
2462        }))
2463    }
2464
2465    /// Gather the RowIds that a mutation should operate on, without
2466    /// materialising the full row set. Handles the shapes the planner emits
2467    /// for update/delete: SeqScan, IndexScan, and Filter(SeqScan). Other
2468    /// shapes fall back to `generic_rid_match`.
2469    ///
2470    /// Perf sprint: try to fuse the predicate evaluation and in-place
2471    /// byte-level mutation into a single heap walk. Returns `Some(result)`
2472    /// if the fused path fired, `None` to fall through to the generic
2473    /// two-pass code.
2474    ///
2475    /// Covers two shapes:
2476    /// 1. Fixed-width non-null literal assignments on non-indexed columns
2477    ///    → byte-patch every matched row in place (row length unchanged).
2478    /// 2. Single var-col literal assignment on a non-indexed column
2479    ///    → `patch_var_column_in_place` on every matched row (may shrink);
2480    ///    rows that can't be patched in place are collected for fallback.
2481    fn try_fused_scan_update(
2482        &mut self,
2483        table: &str,
2484        predicate: &Expr,
2485        resolved: &[(usize, Value)],
2486        changed_cols: &[usize],
2487    ) -> Option<Result<QueryResult, QueryError>> {
2488        // Build compiled predicate. Requires a schema borrow that must be
2489        // dropped before we call scan_patch_matching_logged.
2490        let compiled = {
2491            let schema = self.catalog.schema(table)?;
2492            let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2493            let fast = FastLayout::new(schema);
2494            compile_predicate(predicate, &columns, &fast, schema)?
2495        };
2496
2497        // ── Path 1: fixed-width fast patch ──────────────────────────
2498        let fixed_patches: Option<Vec<FastPatch>> = {
2499            let tbl = self.catalog.get_table(table)?;
2500            let schema = &tbl.schema;
2501            let all_fixed_nonnull = resolved
2502                .iter()
2503                .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2504            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2505            if all_fixed_nonnull && no_indexed {
2506                let layout = RowLayout::new(schema);
2507                let bitmap_size = layout.bitmap_size();
2508                Some(
2509                    resolved
2510                        .iter()
2511                        .map(|(idx, val)| {
2512                            let fixed_off = layout
2513                                .fixed_offset(*idx)
2514                                .expect("is_fixed_size already checked");
2515                            let field_off = 2 + bitmap_size + fixed_off;
2516                            let bytes: FixedBytes = match val {
2517                                Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2518                                Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2519                                Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2520                                Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2521                                Value::Uuid(v) => FixedBytes::Uuid(*v),
2522                                _ => unreachable!("all_fixed_nonnull guard"),
2523                            };
2524                            FastPatch {
2525                                field_off,
2526                                bitmap_byte_off: 2 + idx / 8,
2527                                bit_mask: 1u8 << (idx % 8),
2528                                bytes,
2529                            }
2530                        })
2531                        .collect(),
2532                )
2533            } else {
2534                None
2535            }
2536        };
2537        if let Some(patches) = fixed_patches {
2538            let result = self
2539                .catalog
2540                .scan_patch_matching_logged(table, compiled, |row| {
2541                    for p in &patches {
2542                        row[p.bitmap_byte_off] &= !p.bit_mask;
2543                        let field_bytes = p.bytes.as_slice();
2544                        row[p.field_off..p.field_off + field_bytes.len()]
2545                            .copy_from_slice(field_bytes);
2546                    }
2547                    Some(row.len() as u16)
2548                })
2549                .map_err(|e| e.to_string());
2550            match result {
2551                Ok((count, _)) => {
2552                    self.view_registry.mark_dependents_dirty(table);
2553                    return Some(Ok(QueryResult::Modified(count)));
2554                }
2555                Err(e) => return Some(Err(QueryError::Execution(e))),
2556            }
2557        }
2558
2559        // ── Path 2: single var-col shrink fast patch ────────────────
2560        let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2561            let tbl = self.catalog.get_table(table)?;
2562            let schema = &tbl.schema;
2563            let is_single = resolved.len() == 1;
2564            let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2565            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2566            if is_single && is_var && no_indexed {
2567                let (idx, val) = &resolved[0];
2568                let bytes_opt = match val {
2569                    Value::Str(s) => Some(s.as_bytes().to_vec()),
2570                    Value::Bytes(b) => Some(b.clone()),
2571                    Value::Empty => None,
2572                    _ => return None, // type mismatch, fall through
2573                };
2574                Some((*idx, bytes_opt))
2575            } else {
2576                None
2577            }
2578        };
2579        if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2580            // Build a fresh RowLayout before the mutable borrow.
2581            let layout = {
2582                let schema = self.catalog.schema(table)?;
2583                RowLayout::new(schema)
2584            };
2585            let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2586            let result = self
2587                .catalog
2588                .scan_patch_matching_logged(table, compiled, |row| {
2589                    patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2590                })
2591                .map_err(|e| e.to_string());
2592            match result {
2593                Ok((mut count, fallback_rids)) => {
2594                    // Handle rows where in-place patch failed (new > old).
2595                    for rid in fallback_rids {
2596                        let mut row = match self.catalog.get(table, rid) {
2597                            Some(r) => r,
2598                            None => continue,
2599                        };
2600                        for (idx, val) in resolved.iter() {
2601                            row[*idx] = val.clone();
2602                        }
2603                        self.catalog
2604                            .update_hinted(table, rid, &row, Some(changed_cols))
2605                            .map_err(|e| e.to_string())
2606                            .ok();
2607                        count += 1;
2608                    }
2609                    self.view_registry.mark_dependents_dirty(table);
2610                    return Some(Ok(QueryResult::Modified(count)));
2611                }
2612                Err(e) => return Some(Err(QueryError::Execution(e))),
2613            }
2614        }
2615
2616        None // no fused path applicable — fall through
2617    }
2618
2619    /// Mission C Phase 3: schema is looked up via `self.catalog.schema(table)`
2620    /// inside the branches that actually need it. Previously the caller had
2621    /// to clone the full Schema (6+ String allocs) before every mutation just
2622    /// so this function could borrow it — a cost the update/delete hot path
2623    /// did not need.
2624    fn collect_rids_for_mutation(
2625        &mut self,
2626        input: &PlanNode,
2627        table: &str,
2628    ) -> Result<Vec<RowId>, QueryError> {
2629        match input {
2630            PlanNode::SeqScan { table: t } if t == table => {
2631                // "Update/delete everything" — rare but legal.
2632                let rids: Vec<RowId> = self
2633                    .catalog
2634                    .scan(table)
2635                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2636                    .map(|(rid, _)| rid)
2637                    .collect();
2638                Ok(rids)
2639            }
2640            PlanNode::IndexScan {
2641                table: t,
2642                column,
2643                key,
2644            } if t == table => {
2645                let key_value = literal_to_value(key)?;
2646
2647                // Indexed case: single lookup, 0 or 1 rows.
2648                // Mission D7: int-specialized fast path on int-keyed indexes
2649                // (primary keys, created_at, etc.) — the common case for
2650                // `update_by_pk` / `delete where id = ?`.
2651                //
2652                // Scope the `tbl` borrow so it's released before we fall
2653                // through to the scan-based paths below (which reborrow
2654                // `self.catalog`).
2655                {
2656                    let tbl = self
2657                        .catalog
2658                        .get_table(table)
2659                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2660                    if tbl.has_index(column) {
2661                        let rids = tbl.index_lookup_all(column, &key_value);
2662                        return Ok(rids);
2663                    }
2664                }
2665
2666                // No index: the planner folds `.col = literal` to IndexScan
2667                // regardless of whether the column is actually unique. When
2668                // there's no index we must behave like Filter(SeqScan) and
2669                // return *all* matching RIDs — not just the first one.
2670                let schema = self
2671                    .catalog
2672                    .schema(table)
2673                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2674                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2675                let fast = FastLayout::new(schema);
2676                let synth = Expr::BinaryOp(
2677                    Box::new(Expr::Field(column.clone())),
2678                    BinOp::Eq,
2679                    Box::new(key.clone()),
2680                );
2681                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2682                    // Mission F: skip the first 4 Vec doublings.
2683                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2684                    self.catalog
2685                        .for_each_row_raw(table, |rid, data| {
2686                            if compiled(data) {
2687                                rids.push(rid);
2688                            }
2689                        })
2690                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2691                    return Ok(rids);
2692                }
2693
2694                // Fallback: decode each row, compare values.
2695                let col_idx =
2696                    schema
2697                        .column_index(column)
2698                        .ok_or_else(|| QueryError::ColumnNotFound {
2699                            table: String::new(),
2700                            column: column.clone(),
2701                        })?;
2702                let rids: Vec<RowId> = self
2703                    .catalog
2704                    .scan(table)
2705                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2706                    .filter_map(|(rid, row)| {
2707                        if row[col_idx] == key_value {
2708                            Some(rid)
2709                        } else {
2710                            None
2711                        }
2712                    })
2713                    .collect();
2714                Ok(rids)
2715            }
2716            PlanNode::Filter {
2717                input: inner,
2718                predicate,
2719            } => {
2720                if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2721                    if t != table {
2722                        return self.generic_rid_match(input, table);
2723                    }
2724                    let schema = self
2725                        .catalog
2726                        .schema(table)
2727                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2728                    let columns: Vec<String> =
2729                        schema.columns.iter().map(|c| c.name.clone()).collect();
2730                    let fast = FastLayout::new(schema);
2731                    let row_layout = RowLayout::new(schema);
2732
2733                    // Try compiled predicate first.
2734                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2735                        // Mission F: skip the first 4 Vec doublings.
2736                        let mut rids: Vec<RowId> = Vec::with_capacity(64);
2737                        self.catalog
2738                            .for_each_row_raw(table, |rid, data| {
2739                                if compiled(data) {
2740                                    rids.push(rid);
2741                                }
2742                            })
2743                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2744                        return Ok(rids);
2745                    }
2746
2747                    // Fallback: selective decode + eval.
2748                    let pred_cols = predicate_column_indices(predicate, &columns);
2749                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2750                    self.catalog
2751                        .for_each_row_raw(table, |rid, data| {
2752                            let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2753                            if eval_predicate(predicate, &pred_row, &columns) {
2754                                rids.push(rid);
2755                            }
2756                        })
2757                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2758                    return Ok(rids);
2759                }
2760                self.generic_rid_match(input, table)
2761            }
2762            _ => self.generic_rid_match(input, table),
2763        }
2764    }
2765
2766    /// Last-ditch generic match: execute the plan, collect matching rows,
2767    /// then find corresponding RowIds by value equality. This is the old
2768    /// O(N*M) code path; only used when the plan shape is something exotic.
2769    fn generic_rid_match(
2770        &mut self,
2771        input: &PlanNode,
2772        table: &str,
2773    ) -> Result<Vec<RowId>, QueryError> {
2774        let result = self.execute_plan(input)?;
2775        let rows = match result {
2776            QueryResult::Rows { rows, .. } => rows,
2777            _ => return Err("mutation source must be rows".into()),
2778        };
2779        let matching: Vec<RowId> = self
2780            .catalog
2781            .scan(table)
2782            .map_err(|e| QueryError::StorageError(e.to_string()))?
2783            .filter(|(_, row)| rows.iter().any(|r| r == row))
2784            .map(|(rid, _)| rid)
2785            .collect();
2786        Ok(matching)
2787    }
2788}
2789
2790pub(super) fn execute_window(
2791    result: QueryResult,
2792    windows: &[WindowDef],
2793) -> Result<QueryResult, QueryError> {
2794    let (mut columns, mut rows) = match result {
2795        QueryResult::Rows { columns, rows } => (columns, rows),
2796        _ => return Err("window function requires row input".into()),
2797    };
2798
2799    for wdef in windows {
2800        // Resolve partition/order column indices against current columns.
2801        let part_indices: Vec<usize> = wdef
2802            .partition_by
2803            .iter()
2804            .map(|name| {
2805                columns
2806                    .iter()
2807                    .position(|c| c == name)
2808                    .ok_or_else(|| format!("window partition column '{name}' not found"))
2809            })
2810            .collect::<Result<Vec<_>, _>>()?;
2811
2812        let ord_indices: Vec<(usize, bool)> = wdef
2813            .order_by
2814            .iter()
2815            .map(|sk| {
2816                columns
2817                    .iter()
2818                    .position(|c| c == &sk.field)
2819                    .map(|i| (i, sk.descending))
2820                    .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2821            })
2822            .collect::<Result<Vec<_>, _>>()?;
2823
2824        // Resolve the argument column index (for aggregate windows).
2825        let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2826            match arg {
2827                Expr::Field(name) => {
2828                    if name == "*" {
2829                        None // count(*) style — no specific column
2830                    } else {
2831                        Some(
2832                            columns
2833                                .iter()
2834                                .position(|c| c == name)
2835                                .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2836                        )
2837                    }
2838                }
2839                _ => None,
2840            }
2841        } else {
2842            None
2843        };
2844
2845        // Build a sort-index to sort rows by partition_by then order_by
2846        // without actually reordering the original Vec (we need original
2847        // order to write results back).
2848        let n = rows.len();
2849        let mut indices: Vec<usize> = (0..n).collect();
2850        indices.sort_by(|&a, &b| {
2851            // Compare partition keys first.
2852            for &pi in &part_indices {
2853                let cmp = rows[a][pi].cmp(&rows[b][pi]);
2854                if cmp != std::cmp::Ordering::Equal {
2855                    return cmp;
2856                }
2857            }
2858            // Then order keys.
2859            for &(oi, desc) in &ord_indices {
2860                let cmp = rows[a][oi].cmp(&rows[b][oi]);
2861                if cmp != std::cmp::Ordering::Equal {
2862                    return if desc { cmp.reverse() } else { cmp };
2863                }
2864            }
2865            std::cmp::Ordering::Equal
2866        });
2867
2868        // Compute window values in sorted order, tracking partition boundaries.
2869        let mut win_values: Vec<Value> = vec![Value::Empty; n];
2870        let mut partition_start = 0usize;
2871        // Running state for aggregate windows:
2872        let mut running_count: i64 = 0;
2873        let mut running_int_sum: i64 = 0;
2874        let mut running_float_sum: f64 = 0.0;
2875        let mut running_saw_float = false;
2876        let mut running_min: Option<Value> = None;
2877        let mut running_max: Option<Value> = None;
2878        let mut rank_counter: i64 = 0;
2879        let mut dense_rank_counter: i64 = 0;
2880        let mut prev_order_key: Option<Vec<Value>> = None;
2881        let mut same_rank_count: i64 = 0;
2882
2883        for sorted_pos in 0..n {
2884            let row_idx = indices[sorted_pos];
2885
2886            // Detect partition boundary.
2887            let new_partition = if sorted_pos == 0 {
2888                true
2889            } else {
2890                let prev_row_idx = indices[sorted_pos - 1];
2891                part_indices
2892                    .iter()
2893                    .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
2894            };
2895
2896            if new_partition {
2897                partition_start = sorted_pos;
2898                running_count = 0;
2899                running_int_sum = 0;
2900                running_float_sum = 0.0;
2901                running_saw_float = false;
2902                running_min = None;
2903                running_max = None;
2904                rank_counter = 0;
2905                dense_rank_counter = 0;
2906                prev_order_key = None;
2907                same_rank_count = 0;
2908            }
2909
2910            // Extract current order key for rank tracking.
2911            let current_order_key: Vec<Value> = ord_indices
2912                .iter()
2913                .map(|&(oi, _)| rows[row_idx][oi].clone())
2914                .collect();
2915            let same_as_prev = prev_order_key.as_ref() == Some(&current_order_key);
2916
2917            let value = match wdef.function {
2918                WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
2919                WindowFunc::Rank => {
2920                    if same_as_prev {
2921                        same_rank_count += 1;
2922                    } else {
2923                        rank_counter += same_rank_count + 1;
2924                        same_rank_count = 0;
2925                        if rank_counter == 0 {
2926                            rank_counter = 1;
2927                        }
2928                    }
2929                    Value::Int(rank_counter)
2930                }
2931                WindowFunc::DenseRank => {
2932                    if !same_as_prev {
2933                        dense_rank_counter += 1;
2934                    }
2935                    Value::Int(dense_rank_counter)
2936                }
2937                WindowFunc::Sum => {
2938                    if let Some(ci) = arg_col_idx {
2939                        match &rows[row_idx][ci] {
2940                            Value::Int(v) => running_int_sum += v,
2941                            Value::Float(v) => {
2942                                running_float_sum += v;
2943                                running_saw_float = true;
2944                            }
2945                            _ => {}
2946                        }
2947                    }
2948                    if running_saw_float {
2949                        Value::Float(running_float_sum + running_int_sum as f64)
2950                    } else {
2951                        Value::Int(running_int_sum)
2952                    }
2953                }
2954                WindowFunc::Avg => {
2955                    if let Some(ci) = arg_col_idx {
2956                        match &rows[row_idx][ci] {
2957                            Value::Int(v) => {
2958                                running_float_sum += *v as f64;
2959                                running_count += 1;
2960                            }
2961                            Value::Float(v) => {
2962                                running_float_sum += v;
2963                                running_count += 1;
2964                            }
2965                            _ => {}
2966                        }
2967                    }
2968                    if running_count == 0 {
2969                        Value::Empty
2970                    } else {
2971                        Value::Float(running_float_sum / running_count as f64)
2972                    }
2973                }
2974                WindowFunc::Count => {
2975                    if let Some(ci) = arg_col_idx {
2976                        if !rows[row_idx][ci].is_empty() {
2977                            running_count += 1;
2978                        }
2979                    } else {
2980                        // count(*) — count all rows
2981                        running_count += 1;
2982                    }
2983                    Value::Int(running_count)
2984                }
2985                WindowFunc::Min => {
2986                    if let Some(ci) = arg_col_idx {
2987                        let v = &rows[row_idx][ci];
2988                        if !v.is_empty() {
2989                            running_min = Some(match &running_min {
2990                                None => v.clone(),
2991                                Some(cur) => {
2992                                    if v < cur {
2993                                        v.clone()
2994                                    } else {
2995                                        cur.clone()
2996                                    }
2997                                }
2998                            });
2999                        }
3000                    }
3001                    running_min.clone().unwrap_or(Value::Empty)
3002                }
3003                WindowFunc::Max => {
3004                    if let Some(ci) = arg_col_idx {
3005                        let v = &rows[row_idx][ci];
3006                        if !v.is_empty() {
3007                            running_max = Some(match &running_max {
3008                                None => v.clone(),
3009                                Some(cur) => {
3010                                    if v > cur {
3011                                        v.clone()
3012                                    } else {
3013                                        cur.clone()
3014                                    }
3015                                }
3016                            });
3017                        }
3018                    }
3019                    running_max.clone().unwrap_or(Value::Empty)
3020                }
3021            };
3022
3023            prev_order_key = Some(current_order_key);
3024            win_values[row_idx] = value;
3025        }
3026
3027        // Append the computed window column to each row.
3028        for (ri, row) in rows.iter_mut().enumerate() {
3029            row.push(win_values[ri].clone());
3030        }
3031        columns.push(wdef.output_name.clone());
3032    }
3033
3034    Ok(QueryResult::Rows { columns, rows })
3035}
3036
3037/// Mission E2b: compute one aggregate over a set of rows in a group.
3038pub(super) fn compute_group_aggregate(
3039    func: AggFunc,
3040    all_rows: &[Vec<Value>],
3041    row_indices: &[usize],
3042    col_idx: usize,
3043) -> Value {
3044    match func {
3045        AggFunc::Count => {
3046            if col_idx == usize::MAX {
3047                // count(*) — count all rows in the group.
3048                return Value::Int(row_indices.len() as i64);
3049            }
3050            let count = row_indices
3051                .iter()
3052                .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3053                .count();
3054            Value::Int(count as i64)
3055        }
3056        AggFunc::CountDistinct => {
3057            let mut seen = std::collections::HashSet::new();
3058            for &ri in row_indices {
3059                let v = &all_rows[ri][col_idx];
3060                if !v.is_empty() {
3061                    seen.insert(v.clone());
3062                }
3063            }
3064            Value::Int(seen.len() as i64)
3065        }
3066        AggFunc::Sum => {
3067            // Mirror the scalar Sum path: accumulate int and float
3068            // contributions separately and promote the final result to
3069            // Float if any Float row was observed. Prevents silent
3070            // drop of Float columns in GROUP BY aggregates.
3071            let mut int_sum: i64 = 0;
3072            let mut float_sum: f64 = 0.0;
3073            let mut saw_float = false;
3074            for &ri in row_indices {
3075                match &all_rows[ri][col_idx] {
3076                    Value::Int(v) => int_sum += v,
3077                    Value::Float(v) => {
3078                        float_sum += *v;
3079                        saw_float = true;
3080                    }
3081                    _ => {}
3082                }
3083            }
3084            if saw_float {
3085                Value::Float(float_sum + int_sum as f64)
3086            } else {
3087                Value::Int(int_sum)
3088            }
3089        }
3090        AggFunc::Avg => {
3091            let mut sum = 0.0f64;
3092            let mut count = 0usize;
3093            for &ri in row_indices {
3094                match &all_rows[ri][col_idx] {
3095                    Value::Int(v) => {
3096                        sum += *v as f64;
3097                        count += 1;
3098                    }
3099                    Value::Float(v) => {
3100                        sum += *v;
3101                        count += 1;
3102                    }
3103                    _ => {}
3104                }
3105            }
3106            if count == 0 {
3107                Value::Empty
3108            } else {
3109                Value::Float(sum / count as f64)
3110            }
3111        }
3112        AggFunc::Min => row_indices
3113            .iter()
3114            .map(|&ri| &all_rows[ri][col_idx])
3115            .filter(|v| !v.is_empty())
3116            .min()
3117            .cloned()
3118            .unwrap_or(Value::Empty),
3119        AggFunc::Max => row_indices
3120            .iter()
3121            .map(|&ri| &all_rows[ri][col_idx])
3122            .filter(|v| !v.is_empty())
3123            .max()
3124            .cloned()
3125            .unwrap_or(Value::Empty),
3126    }
3127}
3128
3129/// Mission E1.3: try to extract equi-join key indices from a join `on`
3130/// predicate. Returns `Some((left_col_idx, right_col_idx))` when the
3131/// predicate is exactly `L = R` (or `R = L`) and both sides resolve
3132/// cleanly — `L` to the left subtree's column list and `R` to the right
3133/// subtree's column list.
3134///
3135/// This is deliberately narrow. We only recognise the two shapes:
3136///   * `QualifiedField = QualifiedField`  (`u.id = o.user_id`)
3137///   * `Field = Field`                    (`.id = .user_id`, unqualified)
3138///
3139/// Anything else — conjunctions, constants, function calls, or predicates
3140/// that touch the same side on both halves — falls through to the
3141/// nested-loop path unchanged.
3142pub(super) fn try_extract_equi_join_keys(
3143    pred: &Expr,
3144    left_columns: &[String],
3145    right_columns: &[String],
3146) -> Option<(usize, usize)> {
3147    let (lhs, op, rhs) = match pred {
3148        Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3149        _ => return None,
3150    };
3151    if op != BinOp::Eq {
3152        return None;
3153    }
3154    // Normal orientation: lhs in left, rhs in right.
3155    if let (Some(li), Some(ri)) = (
3156        resolve_side_column(lhs, left_columns),
3157        resolve_side_column(rhs, right_columns),
3158    ) {
3159        return Some((li, ri));
3160    }
3161    // Swapped: rhs in left, lhs in right. Both sides of `=` are
3162    // commutative so this is safe.
3163    if let (Some(li), Some(ri)) = (
3164        resolve_side_column(rhs, left_columns),
3165        resolve_side_column(lhs, right_columns),
3166    ) {
3167        return Some((li, ri));
3168    }
3169    None
3170}
3171
3172fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3173    match expr {
3174        Expr::QualifiedField { qualifier, field } => {
3175            // Byte-level match so we don't allocate a fresh `format!` on
3176            // every call — this runs once per plan, so allocation would be
3177            // cheap, but the match is trivial enough to keep inline with
3178            // the eval_expr version.
3179            let q = qualifier.as_bytes();
3180            let f = field.as_bytes();
3181            columns.iter().position(|c| {
3182                let b = c.as_bytes();
3183                b.len() == q.len() + 1 + f.len()
3184                    && b[..q.len()] == *q
3185                    && b[q.len()] == b'.'
3186                    && b[q.len() + 1..] == *f
3187            })
3188        }
3189        Expr::Field(name) => columns.iter().position(|c| c == name),
3190        _ => None,
3191    }
3192}
3193
3194/// Mission E1.3: O(L + R) hash join. Builds a `FxHashMap<Value, Vec<usize>>`
3195/// over the right (inner) side's join keys, then streams the left (outer)
3196/// side and for each probe row emits every combined row whose right-side
3197/// key matches. For `JoinKind::LeftOuter`, unmatched left rows are emitted
3198/// padded with `Value::Empty` on the right side.
3199///
3200/// The right side is always the build side. That choice is forced for
3201/// LeftOuter (the left side must stream so we can detect orphans), and
3202/// for Inner it's a reasonable default — left-deep plans tend to grow the
3203/// left side with each join, so the un-joined right leaf is often the
3204/// smaller of the two at each level.
3205pub(super) fn hash_join(
3206    left_columns: Vec<String>,
3207    left_rows: Vec<Vec<Value>>,
3208    right_columns: Vec<String>,
3209    right_rows: Vec<Vec<Value>>,
3210    left_key_idx: usize,
3211    right_key_idx: usize,
3212    kind: JoinKind,
3213) -> QueryResult {
3214    use rustc_hash::FxHashMap;
3215
3216    let n_left = left_columns.len();
3217    let n_right = right_columns.len();
3218    let mut columns = Vec::with_capacity(n_left + n_right);
3219    columns.extend(left_columns);
3220    columns.extend(right_columns);
3221
3222    // Build: right_key -> list of right-row indices. Pre-size to the row
3223    // count so the map doesn't rehash mid-build.
3224    let mut build: FxHashMap<Value, Vec<usize>> =
3225        FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3226    for (i, row) in right_rows.iter().enumerate() {
3227        // Skip Empty keys on the build side — they can never match under
3228        // SQL semantics (NULL ≠ NULL) and would collapse all nullables to
3229        // one bucket.
3230        if matches!(row[right_key_idx], Value::Empty) {
3231            continue;
3232        }
3233        build.entry(row[right_key_idx].clone()).or_default().push(i);
3234    }
3235
3236    // Reasonable starting capacity — inner joins produce ≥ left_rows.len()
3237    // rows in the common 1:1 case, left-outer always emits ≥ left_rows.len().
3238    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3239
3240    for left_row in &left_rows {
3241        let key = &left_row[left_key_idx];
3242        let matched = if matches!(key, Value::Empty) {
3243            None
3244        } else {
3245            build.get(key)
3246        };
3247        match matched {
3248            Some(matches) if !matches.is_empty() => {
3249                for &ri in matches {
3250                    let right_row = &right_rows[ri];
3251                    let mut combined = Vec::with_capacity(n_left + n_right);
3252                    combined.extend_from_slice(left_row);
3253                    combined.extend_from_slice(right_row);
3254                    rows.push(combined);
3255                }
3256            }
3257            _ => {
3258                if matches!(kind, JoinKind::LeftOuter) {
3259                    let mut row = Vec::with_capacity(n_left + n_right);
3260                    row.extend_from_slice(left_row);
3261                    row.resize(n_left + n_right, Value::Empty);
3262                    rows.push(row);
3263                }
3264            }
3265        }
3266    }
3267
3268    QueryResult::Rows { columns, rows }
3269}
3270
3271/// Lower unindexed `RangeScan` nodes to `Filter(SeqScan)` so that all
3272/// downstream fast paths (count, project+limit, sort+limit, agg, update,
3273/// delete) continue to fire.
3274///
3275/// The planner emits `RangeScan` speculatively for every range inequality
3276/// (`.age > 30`) because it has no catalog access. When the column has a
3277/// B-tree index, `RangeScan` is the correct plan. When it doesn't, the
3278/// executor's `RangeScan` fallback materialises every matching row with
3279/// full `decode_row` — bypassing the compiled-predicate fast paths that
3280/// `Filter(SeqScan)` would trigger.
3281///
3282/// This pass runs once per query, before execution.
3283pub(super) fn lower_unindexed_range_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3284    match plan {
3285        PlanNode::RangeScan {
3286            table,
3287            column,
3288            start,
3289            end,
3290        } => {
3291            if let Some(tbl) = catalog.get_table(table) {
3292                // Keep RangeScan only for unique indexes — their btree
3293                // stores raw column values. Non-unique indexes store
3294                // composite keys that don't directly compare against
3295                // column values, so lower them to Filter(SeqScan).
3296                if tbl.is_index_unique(column) == Some(true) {
3297                    return plan.clone();
3298                }
3299            }
3300            let pred = synthesize_range_predicate(column, start, end);
3301            PlanNode::Filter {
3302                input: Box::new(PlanNode::SeqScan {
3303                    table: table.clone(),
3304                }),
3305                predicate: pred,
3306            }
3307        }
3308        PlanNode::Filter { input, predicate } => PlanNode::Filter {
3309            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3310            predicate: predicate.clone(),
3311        },
3312        PlanNode::Project { input, fields } => PlanNode::Project {
3313            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3314            fields: fields.clone(),
3315        },
3316        PlanNode::Sort { input, keys } => PlanNode::Sort {
3317            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3318            keys: keys.clone(),
3319        },
3320        PlanNode::Limit { input, count } => PlanNode::Limit {
3321            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3322            count: count.clone(),
3323        },
3324        PlanNode::Offset { input, count } => PlanNode::Offset {
3325            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3326            count: count.clone(),
3327        },
3328        PlanNode::Aggregate {
3329            input,
3330            function,
3331            field,
3332        } => PlanNode::Aggregate {
3333            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3334            function: *function,
3335            field: field.clone(),
3336        },
3337        PlanNode::Distinct { input } => PlanNode::Distinct {
3338            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3339        },
3340        PlanNode::GroupBy {
3341            input,
3342            keys,
3343            aggregates,
3344            having,
3345        } => PlanNode::GroupBy {
3346            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3347            keys: keys.clone(),
3348            aggregates: aggregates.clone(),
3349            having: having.clone(),
3350        },
3351        PlanNode::Update {
3352            input,
3353            table,
3354            assignments,
3355        } => PlanNode::Update {
3356            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3357            table: table.clone(),
3358            assignments: assignments.clone(),
3359        },
3360        PlanNode::Delete { input, table } => PlanNode::Delete {
3361            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3362            table: table.clone(),
3363        },
3364        PlanNode::Window { input, windows } => PlanNode::Window {
3365            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3366            windows: windows.clone(),
3367        },
3368        PlanNode::Union { left, right, all } => PlanNode::Union {
3369            left: Box::new(lower_unindexed_range_scans(catalog, left)),
3370            right: Box::new(lower_unindexed_range_scans(catalog, right)),
3371            all: *all,
3372        },
3373        PlanNode::Explain { input } => PlanNode::Explain {
3374            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3375        },
3376        PlanNode::NestedLoopJoin {
3377            left,
3378            right,
3379            on,
3380            kind,
3381        } => PlanNode::NestedLoopJoin {
3382            left: Box::new(lower_unindexed_range_scans(catalog, left)),
3383            right: Box::new(lower_unindexed_range_scans(catalog, right)),
3384            on: on.clone(),
3385            kind: *kind,
3386        },
3387        // Leaf nodes: no children to recurse into.
3388        _ => plan.clone(),
3389    }
3390}
3391
3392/// Synthesize a range predicate from RangeScan bounds for the fallback path.
3393pub(super) fn synthesize_range_predicate(
3394    column: &str,
3395    start: &Option<(Expr, bool)>,
3396    end: &Option<(Expr, bool)>,
3397) -> Expr {
3398    let lower = start.as_ref().map(|(expr, inclusive)| {
3399        let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3400        Expr::BinaryOp(
3401            Box::new(Expr::Field(column.to_string())),
3402            op,
3403            Box::new(expr.clone()),
3404        )
3405    });
3406    let upper = end.as_ref().map(|(expr, inclusive)| {
3407        let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3408        Expr::BinaryOp(
3409            Box::new(Expr::Field(column.to_string())),
3410            op,
3411            Box::new(expr.clone()),
3412        )
3413    });
3414    match (lower, upper) {
3415        (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3416        (Some(l), None) => l,
3417        (None, Some(u)) => u,
3418        (None, None) => Expr::Literal(Literal::Bool(true)),
3419    }
3420}
3421
3422/// Check if a value falls within a range (used in last-resort decoded-row eval).
3423pub(super) fn range_matches(
3424    val: &Value,
3425    start: &Option<Value>,
3426    start_inc: bool,
3427    end: &Option<Value>,
3428    end_inc: bool,
3429) -> bool {
3430    if let Some(ref s) = start {
3431        if start_inc {
3432            if val < s {
3433                return false;
3434            }
3435        } else if val <= s {
3436            return false;
3437        }
3438    }
3439    if let Some(ref e) = end {
3440        if end_inc {
3441            if val > e {
3442                return false;
3443            }
3444        } else if val >= e {
3445            return false;
3446        }
3447    }
3448    true
3449}
3450
3451/// Format a `PlanNode` tree as a human-readable, indented text
3452/// representation. Used by the `EXPLAIN` command.
3453pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3454    let indent = "  ".repeat(depth);
3455    match plan {
3456        PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3457        PlanNode::AliasScan { table, alias } => {
3458            format!("{indent}AliasScan table={table} alias={alias}")
3459        }
3460        PlanNode::IndexScan { table, column, key } => {
3461            format!("{indent}IndexScan table={table} column={column} key={key:?}")
3462        }
3463        PlanNode::RangeScan {
3464            table,
3465            column,
3466            start,
3467            end,
3468        } => {
3469            let s = match start {
3470                Some((expr, inc)) => {
3471                    let op = if *inc { ">=" } else { ">" };
3472                    format!("{op}{expr:?}")
3473                }
3474                None => "unbounded".to_string(),
3475            };
3476            let e = match end {
3477                Some((expr, inc)) => {
3478                    let op = if *inc { "<=" } else { "<" };
3479                    format!("{op}{expr:?}")
3480                }
3481                None => "unbounded".to_string(),
3482            };
3483            format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3484        }
3485        PlanNode::Filter { input, predicate } => {
3486            let child = format_plan_tree(input, depth + 1);
3487            format!("{indent}Filter predicate={predicate:?}\n{child}")
3488        }
3489        PlanNode::Project { input, fields } => {
3490            let names: Vec<String> = fields
3491                .iter()
3492                .map(|f| match &f.alias {
3493                    Some(a) => format!("{a}: {:?}", f.expr),
3494                    None => format!("{:?}", f.expr),
3495                })
3496                .collect();
3497            let child = format_plan_tree(input, depth + 1);
3498            format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3499        }
3500        PlanNode::Sort { input, keys } => {
3501            let ks: Vec<String> = keys
3502                .iter()
3503                .map(|k| {
3504                    if k.descending {
3505                        format!("{} desc", k.field)
3506                    } else {
3507                        k.field.clone()
3508                    }
3509                })
3510                .collect();
3511            let child = format_plan_tree(input, depth + 1);
3512            format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3513        }
3514        PlanNode::Limit { input, count } => {
3515            let child = format_plan_tree(input, depth + 1);
3516            format!("{indent}Limit count={count:?}\n{child}")
3517        }
3518        PlanNode::Offset { input, count } => {
3519            let child = format_plan_tree(input, depth + 1);
3520            format!("{indent}Offset count={count:?}\n{child}")
3521        }
3522        PlanNode::Aggregate {
3523            input,
3524            function,
3525            field,
3526        } => {
3527            let f = field.as_deref().unwrap_or("*");
3528            let child = format_plan_tree(input, depth + 1);
3529            format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3530        }
3531        PlanNode::NestedLoopJoin {
3532            left,
3533            right,
3534            on,
3535            kind,
3536        } => {
3537            let left_child = format_plan_tree(left, depth + 1);
3538            let right_child = format_plan_tree(right, depth + 1);
3539            let on_str = match on {
3540                Some(pred) => format!("{pred:?}"),
3541                None => "none".to_string(),
3542            };
3543            format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3544        }
3545        PlanNode::Distinct { input } => {
3546            let child = format_plan_tree(input, depth + 1);
3547            format!("{indent}Distinct\n{child}")
3548        }
3549        PlanNode::GroupBy {
3550            input,
3551            keys,
3552            aggregates,
3553            having,
3554        } => {
3555            let agg_strs: Vec<String> = aggregates
3556                .iter()
3557                .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3558                .collect();
3559            let having_str = match having {
3560                Some(h) => format!(" having={h:?}"),
3561                None => String::new(),
3562            };
3563            let child = format_plan_tree(input, depth + 1);
3564            format!(
3565                "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3566                keys.join(", "),
3567                agg_strs.join(", "),
3568            )
3569        }
3570        PlanNode::Insert { table, assignments } => {
3571            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3572            format!("{indent}Insert table={table} cols=[{}]", cols.join(", "))
3573        }
3574        PlanNode::Upsert {
3575            table,
3576            key_column,
3577            assignments,
3578            on_conflict,
3579        } => {
3580            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3581            let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3582            if conflict_cols.is_empty() {
3583                format!(
3584                    "{indent}Upsert table={table} key={key_column} cols=[{}]",
3585                    cols.join(", ")
3586                )
3587            } else {
3588                format!(
3589                    "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3590                    cols.join(", "),
3591                    conflict_cols.join(", ")
3592                )
3593            }
3594        }
3595        PlanNode::Update {
3596            input,
3597            table,
3598            assignments,
3599        } => {
3600            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3601            let child = format_plan_tree(input, depth + 1);
3602            format!(
3603                "{indent}Update table={table} set=[{}]\n{child}",
3604                cols.join(", ")
3605            )
3606        }
3607        PlanNode::Delete { input, table } => {
3608            let child = format_plan_tree(input, depth + 1);
3609            format!("{indent}Delete table={table}\n{child}")
3610        }
3611        PlanNode::CreateTable { name, fields } => {
3612            let fs: Vec<String> = fields
3613                .iter()
3614                .map(|(n, t, r)| {
3615                    if *r {
3616                        format!("{n}: {t} required")
3617                    } else {
3618                        format!("{n}: {t}")
3619                    }
3620                })
3621                .collect();
3622            format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3623        }
3624        PlanNode::AlterTable { table, action } => {
3625            format!("{indent}AlterTable table={table} action={action:?}")
3626        }
3627        PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3628        PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3629        PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3630        PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3631        PlanNode::Window { input, windows } => {
3632            let ws: Vec<String> = windows
3633                .iter()
3634                .map(|w| format!("{:?} as {}", w.function, w.output_name))
3635                .collect();
3636            let child = format_plan_tree(input, depth + 1);
3637            format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3638        }
3639        PlanNode::Union { left, right, all } => {
3640            let kind = if *all { "UNION ALL" } else { "UNION" };
3641            let left_child = format_plan_tree(left, depth + 1);
3642            let right_child = format_plan_tree(right, depth + 1);
3643            format!("{indent}{kind}\n{left_child}\n{right_child}")
3644        }
3645        PlanNode::Explain { input } => {
3646            let child = format_plan_tree(input, depth + 1);
3647            format!("{indent}Explain\n{child}")
3648        }
3649        PlanNode::Begin => format!("{indent}Begin"),
3650        PlanNode::Commit => format!("{indent}Commit"),
3651        PlanNode::Rollback => format!("{indent}Rollback"),
3652    }
3653}