Skip to main content

powdb_query/executor/
plan_exec.rs

1//! The execute_plan method and associated helpers.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::ViewDef;
16
17impl Engine {
18    pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19        match plan {
20            PlanNode::SeqScan { table } => {
21                // Auto-refresh dirty materialized views on read.
22                if self.view_registry.is_dirty(table) {
23                    self.refresh_view(table)?;
24                }
25                let schema = self
26                    .catalog
27                    .schema(table)
28                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29                    .clone();
30                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31                let rows: Vec<Vec<Value>> = self
32                    .catalog
33                    .scan(table)
34                    .map_err(|e| QueryError::StorageError(e.to_string()))?
35                    .map(|(_, row)| row)
36                    .collect();
37                Ok(QueryResult::Rows { columns, rows })
38            }
39
40            PlanNode::Filter { input, predicate } => {
41                // Materialize any IN-subqueries in the predicate before the
42                // scan loop — the closure can't call back into the engine.
43                // Correlated subqueries are left in place for per-row eval.
44                let materialized;
45                let predicate = if contains_subquery(predicate) {
46                    materialized = self.materialize_subqueries(predicate)?;
47                    &materialized
48                } else {
49                    predicate
50                };
51
52                // Correlated subquery path: per-row materialisation.
53                if contains_subquery(predicate) {
54                    let result = self.execute_plan(input)?;
55                    return match result {
56                        QueryResult::Rows { columns, rows } => {
57                            let mut filtered = Vec::new();
58                            for row in rows {
59                                let row_pred =
60                                    self.materialize_correlated_for_row(predicate, &row, &columns)?;
61                                if eval_predicate(&row_pred, &row, &columns) {
62                                    filtered.push(row);
63                                }
64                            }
65                            Ok(QueryResult::Rows {
66                                columns,
67                                rows: filtered,
68                            })
69                        }
70                        _ => Err("filter requires row input".into()),
71                    };
72                }
73
74                // Fast path: fuse Filter + SeqScan into a zero-copy streaming
75                // loop. Uses decode_column() to evaluate the predicate on only
76                // the columns it references, avoiding heap allocations for
77                // String/Bytes columns that aren't part of the filter.
78                if let PlanNode::SeqScan { table } = input.as_ref() {
79                    // Auto-refresh dirty materialized views.
80                    if self.view_registry.is_dirty(table) {
81                        self.refresh_view(table)?;
82                    }
83                    let schema = self
84                        .catalog
85                        .schema(table)
86                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
87                        .clone();
88                    let columns: Vec<String> =
89                        schema.columns.iter().map(|c| c.name.clone()).collect();
90                    let fast = FastLayout::new(&schema);
91                    let row_layout = RowLayout::new(&schema);
92                    // Mission F: pre-size to skip the first 4 Vec doublings
93                    // (4 → 8 → 16 → 32 → 64). On a 100K-row scan with 30%
94                    // selectivity that's ~4 fewer reallocations + memcpys.
95                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
96
97                    // Try compiled predicate for the filter check (handles
98                    // int leaves, string-eq leaves, and And conjunctions).
99                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
100                        self.catalog
101                            .for_each_row_raw(table, |_rid, data| {
102                                if compiled(data) {
103                                    rows.push(decode_row(&schema, data));
104                                }
105                            })
106                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
107                    } else {
108                        let pred_cols = predicate_column_indices(predicate, &columns);
109                        self.catalog
110                            .for_each_row_raw(table, |_rid, data| {
111                                let pred_row =
112                                    decode_selective(&schema, &row_layout, data, &pred_cols);
113                                if eval_predicate(predicate, &pred_row, &columns) {
114                                    rows.push(decode_row(&schema, data));
115                                }
116                            })
117                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
118                    }
119
120                    return Ok(QueryResult::Rows { columns, rows });
121                }
122
123                // General path: materialise then filter.
124                let result = self.execute_plan(input)?;
125                match result {
126                    QueryResult::Rows { columns, rows } => {
127                        let filtered: Vec<Vec<Value>> = rows
128                            .into_iter()
129                            .filter(|row| eval_predicate(predicate, row, &columns))
130                            .collect();
131                        Ok(QueryResult::Rows {
132                            columns,
133                            rows: filtered,
134                        })
135                    }
136                    _ => Err("filter requires row input".into()),
137                }
138            }
139
140            PlanNode::Project { input, fields } => {
141                // Fast path: Project over IndexScan — decode only projected
142                // columns from raw bytes instead of full decode_row.
143                if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144                    let schema = self
145                        .catalog
146                        .schema(table)
147                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148                        .clone();
149                    let all_columns: Vec<String> =
150                        schema.columns.iter().map(|c| c.name.clone()).collect();
151                    let key_value = literal_to_value(key)?;
152                    let tbl = self
153                        .catalog
154                        .get_table(table)
155                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157                    let proj_columns: Vec<String> = fields
158                        .iter()
159                        .map(|f| {
160                            f.alias.clone().unwrap_or_else(|| match &f.expr {
161                                Expr::Field(name) => name.clone(),
162                                _ => "?".into(),
163                            })
164                        })
165                        .collect();
166
167                    // Determine which column indices the projection needs
168                    let proj_indices: Vec<usize> = fields
169                        .iter()
170                        .filter_map(|f| {
171                            if let Expr::Field(name) = &f.expr {
172                                all_columns.iter().position(|c| c == name)
173                            } else {
174                                None
175                            }
176                        })
177                        .collect();
178
179                    if let Some(btree) = tbl.index(column) {
180                        let layout = RowLayout::new(&schema);
181                        // Mission D7: int-specialized lookup skips the
182                        // `<Value as Ord>::cmp` discriminant dispatch on
183                        // int-keyed indexes (the vast majority).
184                        let lookup_result = match &key_value {
185                            Value::Int(k) => btree.lookup_int(*k),
186                            other => btree.lookup(other),
187                        };
188                        let rows = match lookup_result {
189                            Some(rid) => match tbl.heap.get(rid) {
190                                Some(data) => {
191                                    let row: Vec<Value> = proj_indices
192                                        .iter()
193                                        .map(|&ci| decode_column(&schema, &layout, &data, ci))
194                                        .collect();
195                                    vec![row]
196                                }
197                                None => Vec::new(),
198                            },
199                            None => Vec::new(),
200                        };
201                        return Ok(QueryResult::Rows {
202                            columns: proj_columns,
203                            rows,
204                        });
205                    }
206                }
207
208                // Fast path: Project(Limit(Sort(Filter(SeqScan)))) — bounded
209                // top-N heap. Decodes only the sort key + projected columns,
210                // keeps at most `limit` rows in a heap. Also handles the
211                // Project(Limit(Sort(SeqScan))) variant (no filter).
212                if let PlanNode::Limit {
213                    input: inner,
214                    count: limit_expr,
215                } = input.as_ref()
216                {
217                    if let PlanNode::Sort {
218                        input: sort_input,
219                        keys,
220                    } = inner.as_ref()
221                    {
222                        // Fast path only for single-key sorts
223                        if keys.len() == 1 {
224                            let sort_field = &keys[0].field;
225                            let descending = keys[0].descending;
226                            let limit = match limit_expr {
227                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
228                                _ => usize::MAX,
229                            };
230                            let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
231                                match sort_input.as_ref() {
232                                    PlanNode::SeqScan { table } => (Some(table.as_str()), None),
233                                    PlanNode::Filter {
234                                        input: fi,
235                                        predicate,
236                                    } => {
237                                        if let PlanNode::SeqScan { table } = fi.as_ref() {
238                                            (Some(table.as_str()), Some(predicate))
239                                        } else {
240                                            (None, None)
241                                        }
242                                    }
243                                    _ => (None, None),
244                                };
245                            if let Some(table) = table_opt {
246                                if let Some(result) = self.project_filter_sort_limit_fast(
247                                    table, fields, sort_field, descending, limit, pred_opt,
248                                )? {
249                                    return Ok(result);
250                                }
251                            }
252                        }
253                    }
254                    // Fast path: Project(Limit(Filter(SeqScan))) — stream,
255                    // decode only projected columns, stop at limit.
256                    if let PlanNode::Filter {
257                        input: fi,
258                        predicate,
259                    } = inner.as_ref()
260                    {
261                        if let PlanNode::SeqScan { table } = fi.as_ref() {
262                            let limit = match limit_expr {
263                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
264                                _ => usize::MAX,
265                            };
266                            if let Some(result) = self.project_filter_limit_fast(
267                                table,
268                                fields,
269                                limit,
270                                Some(predicate),
271                            )? {
272                                return Ok(result);
273                            }
274                        }
275                    }
276                    // Fast path: Project(Limit(SeqScan)) — stream, no filter.
277                    if let PlanNode::SeqScan { table } = inner.as_ref() {
278                        let limit = match limit_expr {
279                            Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
280                            _ => usize::MAX,
281                        };
282                        if let Some(result) =
283                            self.project_filter_limit_fast(table, fields, limit, None)?
284                        {
285                            return Ok(result);
286                        }
287                    }
288                }
289
290                // Mission D4: Project(Filter(SeqScan)) without Limit. Reuses
291                // `project_filter_limit_fast` with limit = usize::MAX so the
292                // hot loop decodes only projected columns and uses the
293                // compiled predicate. Previously this fell through to the
294                // generic Filter branch which materialised every column via
295                // `decode_row` then re-projected — redundant decode work.
296                //
297                // multi_col_and_filter (`U filter .age > 30 and .status =
298                // "active" { .name, .age }`) was 6.18ms (0.7x SQLite) and
299                // is the load-bearing workload for this fast path.
300                if let PlanNode::Filter {
301                    input: fi,
302                    predicate,
303                } = input.as_ref()
304                {
305                    if let PlanNode::SeqScan { table } = fi.as_ref() {
306                        if let Some(result) = self.project_filter_limit_fast(
307                            table,
308                            fields,
309                            usize::MAX,
310                            Some(predicate),
311                        )? {
312                            return Ok(result);
313                        }
314                    }
315                }
316
317                // Mission D4: Project(SeqScan) without Filter or Limit.
318                // Decode only projected columns; the previous fall-through
319                // built full Vec<Value> rows then re-projected.
320                if let PlanNode::SeqScan { table } = input.as_ref() {
321                    if let Some(result) =
322                        self.project_filter_limit_fast(table, fields, usize::MAX, None)?
323                    {
324                        return Ok(result);
325                    }
326                }
327
328                let result = self.execute_plan(input)?;
329                match result {
330                    QueryResult::Rows { columns, rows } => {
331                        let proj_columns: Vec<String> = fields
332                            .iter()
333                            .map(|f| {
334                                f.alias.clone().unwrap_or_else(|| match &f.expr {
335                                    Expr::Field(name) => name.clone(),
336                                    // Mission E1.2: `{ u.name }` projects as the
337                                    // qualified column name so callers can still
338                                    // disambiguate across the join output.
339                                    Expr::QualifiedField { qualifier, field } => {
340                                        format!("{qualifier}.{field}")
341                                    }
342                                    _ => "?".into(),
343                                })
344                            })
345                            .collect();
346                        let proj_rows: Vec<Vec<Value>> = rows
347                            .iter()
348                            .map(|row| {
349                                fields
350                                    .iter()
351                                    .map(|f| eval_expr(&f.expr, row, &columns))
352                                    .collect()
353                            })
354                            .collect();
355                        Ok(QueryResult::Rows {
356                            columns: proj_columns,
357                            rows: proj_rows,
358                        })
359                    }
360                    _ => Err("project requires row input".into()),
361                }
362            }
363
364            PlanNode::Sort { input, keys } => {
365                let result = self.execute_plan(input)?;
366                match result {
367                    QueryResult::Rows { columns, mut rows } => {
368                        if rows.len() > MAX_SORT_ROWS {
369                            return Err(QueryError::SortLimitExceeded);
370                        }
371                        let key_indices: Vec<(usize, bool)> = keys
372                            .iter()
373                            .map(|k| {
374                                columns
375                                    .iter()
376                                    .position(|c| c == &k.field)
377                                    .map(|idx| (idx, k.descending))
378                                    .ok_or_else(|| QueryError::ColumnNotFound {
379                                        table: String::new(),
380                                        column: k.field.clone(),
381                                    })
382                            })
383                            .collect::<Result<_, QueryError>>()?;
384                        rows.sort_by(|a, b| {
385                            for &(col_idx, descending) in &key_indices {
386                                let cmp = a[col_idx].cmp(&b[col_idx]);
387                                let cmp = if descending { cmp.reverse() } else { cmp };
388                                if cmp != std::cmp::Ordering::Equal {
389                                    return cmp;
390                                }
391                            }
392                            std::cmp::Ordering::Equal
393                        });
394                        Ok(QueryResult::Rows { columns, rows })
395                    }
396                    _ => Err("sort requires row input".into()),
397                }
398            }
399
400            PlanNode::Limit { input, count } => {
401                let result = self.execute_plan(input)?;
402                let n = match count {
403                    Expr::Literal(Literal::Int(v)) => *v as usize,
404                    _ => return Err("limit must be integer literal".into()),
405                };
406                match result {
407                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
408                        columns,
409                        rows: rows.into_iter().take(n).collect(),
410                    }),
411                    _ => Err("limit requires row input".into()),
412                }
413            }
414
415            PlanNode::Offset { input, count } => {
416                let result = self.execute_plan(input)?;
417                let n = match count {
418                    Expr::Literal(Literal::Int(v)) => *v as usize,
419                    _ => return Err("offset must be integer literal".into()),
420                };
421                match result {
422                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
423                        columns,
424                        rows: rows.into_iter().skip(n).collect(),
425                    }),
426                    _ => Err("offset requires row input".into()),
427                }
428            }
429
430            PlanNode::Aggregate {
431                input,
432                function,
433                field,
434            } => {
435                // Fast path: count() over SeqScan — count rows without any decode
436                if *function == AggFunc::Count {
437                    if let PlanNode::SeqScan { table } = input.as_ref() {
438                        let mut count: i64 = 0;
439                        self.catalog
440                            .for_each_row_raw(table, |_rid, _data| {
441                                count += 1;
442                            })
443                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
444                        return Ok(QueryResult::Scalar(Value::Int(count)));
445                    }
446                    // Fast path: count() over Filter(SeqScan) — try compiled
447                    // predicate first, fall back to decode_column path.
448                    if let PlanNode::Filter {
449                        input: inner,
450                        predicate,
451                    } = input.as_ref()
452                    {
453                        if let PlanNode::SeqScan { table } = inner.as_ref() {
454                            let schema = self
455                                .catalog
456                                .schema(table)
457                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
458                                .clone();
459                            let columns: Vec<String> =
460                                schema.columns.iter().map(|c| c.name.clone()).collect();
461                            let fast = FastLayout::new(&schema);
462                            let row_layout = RowLayout::new(&schema);
463
464                            // Try compiled predicate (zero-allocation hot path).
465                            // Handles int leaves, string-eq leaves, AND conjunctions.
466                            if let Some(compiled) =
467                                compile_predicate(predicate, &columns, &fast, &schema)
468                            {
469                                let mut count: i64 = 0;
470                                self.catalog
471                                    .for_each_row_raw(table, |_rid, data| {
472                                        if compiled(data) {
473                                            count += 1;
474                                        }
475                                    })
476                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
477                                return Ok(QueryResult::Scalar(Value::Int(count)));
478                            }
479
480                            // Fallback: decode predicate columns
481                            let pred_cols = predicate_column_indices(predicate, &columns);
482                            let mut count: i64 = 0;
483                            self.catalog
484                                .for_each_row_raw(table, |_rid, data| {
485                                    let pred_row =
486                                        decode_selective(&schema, &row_layout, data, &pred_cols);
487                                    if eval_predicate(predicate, &pred_row, &columns) {
488                                        count += 1;
489                                    }
490                                })
491                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
492
493                            return Ok(QueryResult::Scalar(Value::Int(count)));
494                        }
495                    }
496                }
497
498                // Fast path: sum/avg/min/max over a single fixed-size int
499                // column with an optional compiled filter predicate. Walks
500                // raw row bytes, zero allocation per row.
501                if matches!(
502                    function,
503                    AggFunc::Sum
504                        | AggFunc::Avg
505                        | AggFunc::Min
506                        | AggFunc::Max
507                        | AggFunc::CountDistinct
508                ) {
509                    if let Some(col) = field.as_ref() {
510                        // Shape: Aggregate(SeqScan) or Aggregate(Filter(SeqScan))
511                        let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
512                            match input.as_ref() {
513                                PlanNode::SeqScan { table } => (Some(table.as_str()), None),
514                                PlanNode::Filter {
515                                    input: inner,
516                                    predicate,
517                                } => {
518                                    if let PlanNode::SeqScan { table } = inner.as_ref() {
519                                        (Some(table.as_str()), Some(predicate))
520                                    } else {
521                                        (None, None)
522                                    }
523                                }
524                                _ => (None, None),
525                            };
526                        if let Some(table) = table_opt {
527                            if let Some(result) =
528                                self.agg_single_col_fast(table, col, *function, pred_opt)?
529                            {
530                                return Ok(result);
531                            }
532                        }
533                    }
534                }
535
536                // General path: materialise the input rows, then fold them
537                // with the requested aggregate function. The streaming
538                // Project(Limit(Filter(SeqScan))) fast path is handled in the
539                // Project branch, not here.
540                let result = self.execute_plan(input)?;
541                match result {
542                    QueryResult::Rows { columns, rows } => {
543                        match function {
544                            AggFunc::Count => {
545                                Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
546                            }
547                            AggFunc::CountDistinct => {
548                                let col = field.as_ref().ok_or("count distinct requires field")?;
549                                let idx = columns
550                                    .iter()
551                                    .position(|c| c == col)
552                                    .ok_or("col not found")?;
553                                let mut seen = std::collections::HashSet::new();
554                                for row in &rows {
555                                    let v = &row[idx];
556                                    if !v.is_empty() {
557                                        seen.insert(v.clone());
558                                    }
559                                }
560                                Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
561                            }
562                            AggFunc::Avg => {
563                                let col = field.as_ref().ok_or("avg requires field")?;
564                                let idx = columns
565                                    .iter()
566                                    .position(|c| c == col)
567                                    .ok_or("col not found")?;
568                                let sum: f64 = rows
569                                    .iter()
570                                    .filter_map(|r| match &r[idx] {
571                                        Value::Int(v) => Some(*v as f64),
572                                        Value::Float(v) => Some(*v),
573                                        _ => None,
574                                    })
575                                    .sum();
576                                let count = rows.len() as f64;
577                                Ok(QueryResult::Scalar(Value::Float(sum / count)))
578                            }
579                            AggFunc::Sum => {
580                                let col = field.as_ref().ok_or("sum requires field")?;
581                                let idx = columns
582                                    .iter()
583                                    .position(|c| c == col)
584                                    .ok_or("col not found")?;
585                                // Track int and float contributions separately so
586                                // Float columns (and mixed Int/Float rows) don't get
587                                // silently dropped as they did in the Int-only
588                                // version. If any Float is present, the whole sum
589                                // promotes to Float — matching Avg's semantics.
590                                let mut int_sum: i64 = 0;
591                                let mut float_sum: f64 = 0.0;
592                                let mut saw_float = false;
593                                for r in &rows {
594                                    match &r[idx] {
595                                        Value::Int(v) => int_sum += *v,
596                                        Value::Float(v) => {
597                                            float_sum += *v;
598                                            saw_float = true;
599                                        }
600                                        _ => {}
601                                    }
602                                }
603                                let result = if saw_float {
604                                    Value::Float(float_sum + int_sum as f64)
605                                } else {
606                                    Value::Int(int_sum)
607                                };
608                                Ok(QueryResult::Scalar(result))
609                            }
610                            AggFunc::Min | AggFunc::Max => {
611                                let col = field.as_ref().ok_or("min/max requires field")?;
612                                let idx = columns
613                                    .iter()
614                                    .position(|c| c == col)
615                                    .ok_or("col not found")?;
616                                let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
617                                let result = if *function == AggFunc::Min {
618                                    vals.into_iter().min().cloned()
619                                } else {
620                                    vals.into_iter().max().cloned()
621                                };
622                                Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
623                            }
624                        }
625                    }
626                    _ => Err("aggregate requires row input".into()),
627                }
628            }
629
630            PlanNode::Insert { table, assignments } => {
631                // Mission C Phase 3: resolve column indices + literals under
632                // a short-lived shared borrow on the catalog, then release
633                // it before calling insert(). The previous code cloned the
634                // full Schema (6+ String allocations on User) just to dodge
635                // the borrow checker — a measurable 200-400ns on every
636                // insert_single call in the bench.
637                let values = {
638                    let schema = self
639                        .catalog
640                        .schema(table)
641                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
642                    let mut values = vec![Value::Empty; schema.columns.len()];
643                    for a in assignments {
644                        let idx = schema.column_index(&a.field).ok_or_else(|| {
645                            QueryError::ColumnNotFound {
646                                table: String::new(),
647                                column: a.field.clone(),
648                            }
649                        })?;
650                        values[idx] = literal_to_value(&a.value)?;
651                    }
652                    values
653                };
654                self.catalog
655                    .insert(table, &values)
656                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
657                self.view_registry.mark_dependents_dirty(table);
658                Ok(QueryResult::Modified(1))
659            }
660
661            PlanNode::Upsert {
662                table,
663                key_column,
664                assignments,
665                on_conflict,
666            } => {
667                // Build the insert values from assignments.
668                let (values, key_idx) = {
669                    let schema = self
670                        .catalog
671                        .schema(table)
672                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
673                    let mut values = vec![Value::Empty; schema.columns.len()];
674                    for a in assignments {
675                        let idx = schema.column_index(&a.field).ok_or_else(|| {
676                            QueryError::ColumnNotFound {
677                                table: String::new(),
678                                column: a.field.clone(),
679                            }
680                        })?;
681                        values[idx] = literal_to_value(&a.value)?;
682                    }
683                    let key_idx = schema
684                        .column_index(key_column)
685                        .ok_or_else(|| format!("key column '{key_column}' not found"))?;
686                    (values, key_idx)
687                };
688
689                let key_value = values[key_idx].clone();
690
691                // Probe the index for a conflict.
692                let existing = {
693                    let tbl = self
694                        .catalog
695                        .get_table(table)
696                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
697                    if let Some(btree) = tbl.index(key_column) {
698                        let hit = match &key_value {
699                            Value::Int(k) => btree.lookup_int(*k),
700                            other => btree.lookup(other),
701                        };
702                        hit.and_then(|rid| {
703                            tbl.heap
704                                .get(rid)
705                                .map(|data| (rid, decode_row(&tbl.schema, &data)))
706                        })
707                    } else {
708                        // No index — linear scan for the key.
709                        let mut found = None;
710                        for (rid, row) in tbl.scan() {
711                            if row[key_idx] == key_value {
712                                found = Some((rid, row));
713                                break;
714                            }
715                        }
716                        found
717                    }
718                };
719
720                if let Some((rid, mut existing_row)) = existing {
721                    // Conflict: apply on_conflict assignments (or all non-key if empty).
722                    let update_assignments = if on_conflict.is_empty() {
723                        assignments
724                    } else {
725                        on_conflict
726                    };
727                    let changed_cols: Vec<usize> = {
728                        let schema = self
729                            .catalog
730                            .schema(table)
731                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
732                        let mut indices = Vec::new();
733                        for a in update_assignments {
734                            let idx = schema.column_index(&a.field).ok_or_else(|| {
735                                QueryError::ColumnNotFound {
736                                    table: String::new(),
737                                    column: a.field.clone(),
738                                }
739                            })?;
740                            if idx != key_idx {
741                                existing_row[idx] = literal_to_value(&a.value)?;
742                                indices.push(idx);
743                            }
744                        }
745                        indices
746                    };
747                    self.catalog
748                        .update_hinted(table, rid, &existing_row, Some(&changed_cols))
749                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
750                    self.view_registry.mark_dependents_dirty(table);
751                    Ok(QueryResult::Modified(1))
752                } else {
753                    // No conflict: insert.
754                    self.catalog
755                        .insert(table, &values)
756                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
757                    self.view_registry.mark_dependents_dirty(table);
758                    Ok(QueryResult::Modified(1))
759                }
760            }
761
762            PlanNode::Update {
763                input,
764                table,
765                assignments,
766            } => {
767                // Mission C Phase 3: resolve assignments against a borrowed
768                // schema, then drop the borrow before the mutation loop.
769                // Try literal-only path first; fall back to per-row expression
770                // evaluation if any assignment contains a non-literal expression
771                // (e.g., `age := .age + 1`).
772                let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
773                    let schema_ref = self
774                        .catalog
775                        .schema(table)
776                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
777                    let indices: Vec<usize> = assignments
778                        .iter()
779                        .map(|a| {
780                            schema_ref.column_index(&a.field).ok_or_else(|| {
781                                QueryError::ColumnNotFound {
782                                    table: String::new(),
783                                    column: a.field.clone(),
784                                }
785                            })
786                        })
787                        .collect::<Result<_, _>>()?;
788                    let vals: Result<Vec<Value>, _> = assignments
789                        .iter()
790                        .map(|a| literal_to_value(&a.value))
791                        .collect();
792                    (indices, vals.ok())
793                };
794                let resolved_assignments: Option<Vec<(usize, Value)>> =
795                    literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
796
797                // Mission C Phase 2: the hint Table::update_hinted needs to
798                // decide whether to read the old row for index diff.
799                let changed_cols: Vec<usize> = col_indices.clone();
800
801                // ── Fused scan+update for Update(Filter(SeqScan)) ────────
802                // Perf sprint: instead of the two-pass collect-RIDs-then-loop
803                // pattern (which pays one ensure_hot per matched row on the
804                // second pass), fuse the predicate evaluation and in-place
805                // byte-level mutation into a single heap walk. Same idea as
806                // the fused scan_delete_matching path for deletes.
807                if let Some(ref resolved_assignments) = resolved_assignments {
808                    if let PlanNode::Filter {
809                        input: inner,
810                        predicate,
811                    } = input.as_ref()
812                    {
813                        if let PlanNode::SeqScan { table: t } = inner.as_ref() {
814                            if t == table {
815                                let fused_result = self.try_fused_scan_update(
816                                    table,
817                                    predicate,
818                                    resolved_assignments,
819                                    &changed_cols,
820                                );
821                                if let Some(result) = fused_result {
822                                    return result;
823                                }
824                            }
825                        }
826                    }
827                }
828
829                // Collect matching RowIds in a single pass.
830                let matching_rids = self.collect_rids_for_mutation(input, table)?;
831
832                // ── Literal-only fast paths ─────────────────────────────
833                if let Some(ref resolved_assignments) = resolved_assignments {
834                    // Mission C Phase 4: in-place byte-patch fast path. If every
835                    // assignment targets a fixed-size non-null column AND none of
836                    // them is indexed, we can skip decode_row / Vec<Value> /
837                    // encode_row_into entirely and patch the row's raw bytes on
838                    // the hot page.
839                    let fast_patch: Option<Vec<FastPatch>> = {
840                        let tbl = self
841                            .catalog
842                            .get_table(table)
843                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
844                        let schema = &tbl.schema;
845                        let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
846                            is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
847                        });
848                        let no_indexed = !resolved_assignments
849                            .iter()
850                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
851
852                        if all_fixed_nonnull && no_indexed {
853                            let layout = RowLayout::new(schema);
854                            let bitmap_size = layout.bitmap_size();
855                            let patches: Vec<FastPatch> = resolved_assignments
856                                .iter()
857                                .map(|(idx, val)| {
858                                    let fixed_off = layout
859                                        .fixed_offset(*idx)
860                                        .expect("is_fixed_size already checked");
861                                    let field_off = 2 + bitmap_size + fixed_off;
862                                    let bytes: FixedBytes = match val {
863                                        Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
864                                        Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
865                                        Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
866                                        Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
867                                        Value::Uuid(v) => FixedBytes::Uuid(*v),
868                                        _ => unreachable!("all_fixed_nonnull guard lied"),
869                                    };
870                                    FastPatch {
871                                        field_off,
872                                        bitmap_byte_off: 2 + idx / 8,
873                                        bit_mask: 1u8 << (idx % 8),
874                                        bytes,
875                                    }
876                                })
877                                .collect();
878                            Some(patches)
879                        } else {
880                            None
881                        }
882                    };
883
884                    if let Some(patches) = fast_patch {
885                        let mut count = 0u64;
886                        for rid in matching_rids {
887                            // Mission B2: WAL-log every patch so crash
888                            // recovery replays the update. Same mutation
889                            // closure as before — the wrapper just sandwiches
890                            // it between a hot-page read and a WAL append.
891                            let ok = self
892                                .catalog
893                                .update_row_bytes_logged(table, rid, |row| {
894                                    for p in &patches {
895                                        row[p.bitmap_byte_off] &= !p.bit_mask;
896                                        let field_bytes = p.bytes.as_slice();
897                                        row[p.field_off..p.field_off + field_bytes.len()]
898                                            .copy_from_slice(field_bytes);
899                                    }
900                                })
901                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
902                            if ok {
903                                count += 1;
904                            }
905                        }
906                        self.view_registry.mark_dependents_dirty(table);
907                        return Ok(QueryResult::Modified(count));
908                    }
909
910                    // Mission C Phase 10: var-column in-place shrink fast path.
911                    let var_fast: Option<(usize, Option<Vec<u8>>)> = {
912                        let tbl = self
913                            .catalog
914                            .get_table(table)
915                            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
916                        let schema = &tbl.schema;
917                        let is_single = resolved_assignments.len() == 1;
918                        let is_var_col = is_single
919                            && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
920                        let no_indexed = !resolved_assignments
921                            .iter()
922                            .any(|(idx, _)| tbl.has_indexed_col(*idx));
923
924                        if is_single && is_var_col && no_indexed {
925                            let (idx, val) = &resolved_assignments[0];
926                            let bytes_opt: Option<Vec<u8>> = match val {
927                                Value::Str(s) => Some(s.as_bytes().to_vec()),
928                                Value::Bytes(b) => Some(b.clone()),
929                                Value::Empty => None,
930                                _ => {
931                                    return Err(QueryError::TypeError(format!(
932                                        "cannot assign non-var value to var column '{}'",
933                                        schema.columns[*idx].name
934                                    )))
935                                }
936                            };
937                            Some((*idx, bytes_opt))
938                        } else {
939                            None
940                        }
941                    };
942
943                    if let Some((col_idx, new_bytes_opt)) = var_fast {
944                        let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
945                        let mut count = 0u64;
946                        let mut fallback_rids: Vec<RowId> = Vec::new();
947                        for rid in &matching_rids {
948                            // Mission B2: logged variant so crash recovery
949                            // replays the shrink. On a false return (row
950                            // would have to grow), the rid is pushed to
951                            // `fallback_rids` and the slower `update_hinted`
952                            // path — which is already WAL-logged — picks it up.
953                            let ok = self
954                                .catalog
955                                .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
956                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
957                            if ok {
958                                count += 1;
959                            } else {
960                                fallback_rids.push(*rid);
961                            }
962                        }
963                        for rid in fallback_rids {
964                            let mut row = match self.catalog.get(table, rid) {
965                                Some(r) => r,
966                                None => continue,
967                            };
968                            for (idx, val) in resolved_assignments.iter() {
969                                row[*idx] = val.clone();
970                            }
971                            self.catalog
972                                .update_hinted(table, rid, &row, Some(&changed_cols))
973                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
974                            count += 1;
975                        }
976                        self.view_registry.mark_dependents_dirty(table);
977                        return Ok(QueryResult::Modified(count));
978                    }
979
980                    // Generic literal path: decode row, apply literal values.
981                    let mut count = 0u64;
982                    for rid in matching_rids {
983                        let mut row = match self.catalog.get(table, rid) {
984                            Some(r) => r,
985                            None => continue,
986                        };
987                        for (idx, val) in resolved_assignments.iter() {
988                            row[*idx] = val.clone();
989                        }
990                        self.catalog
991                            .update_hinted(table, rid, &row, Some(&changed_cols))
992                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
993                        count += 1;
994                    }
995                    self.view_registry.mark_dependents_dirty(table);
996                    return Ok(QueryResult::Modified(count));
997                } // end if let Some(resolved_assignments)
998
999                // ── Expression-based update path ────────────────────────
1000                // At least one assignment contains a non-literal expression
1001                // (e.g., `age := .age + 1`). Evaluate per-row.
1002                let col_names: Vec<String> = {
1003                    let schema_ref = self
1004                        .catalog
1005                        .schema(table)
1006                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1007                    schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1008                };
1009                let mut count = 0u64;
1010                for rid in matching_rids {
1011                    let mut row = match self.catalog.get(table, rid) {
1012                        Some(r) => r,
1013                        None => continue,
1014                    };
1015                    for (i, asgn) in assignments.iter().enumerate() {
1016                        let val = eval_expr(&asgn.value, &row, &col_names);
1017                        row[col_indices[i]] = val;
1018                    }
1019                    self.catalog
1020                        .update_hinted(table, rid, &row, Some(&changed_cols))
1021                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1022                    count += 1;
1023                }
1024                self.view_registry.mark_dependents_dirty(table);
1025                Ok(QueryResult::Modified(count))
1026            }
1027
1028            PlanNode::Delete { input, table } => {
1029                // Mission C Phase 3: no schema clone — collect_rids_for_mutation
1030                // looks up schema internally when it needs one, and the mutation
1031                // loop doesn't need the schema at all.
1032                //
1033                // Mission C Phase 12: route bulk deletes through
1034                // `Catalog::delete_many`, which batches the btree leaf
1035                // compaction and shares one `ensure_hot` per row between
1036                // the index-key extraction and the slot delete. On
1037                // `delete_by_filter` (100K fixture, ~20K matches) that
1038                // removes ~4ms of pure `Vec::remove` memmove from the btree
1039                // maintenance phase.
1040                //
1041                // Mission C Phase 16: for the common `delete where ...`
1042                // shape (Filter(SeqScan)) — and the rarer "delete
1043                // everything" shape (SeqScan) — skip the two-pass
1044                // `collect_rids_for_mutation` + `delete_many` flow entirely.
1045                // The fused `scan_delete_matching` primitive walks the
1046                // heap exactly once, paying one `ensure_hot` per page
1047                // instead of per-row. That closes the last major gap on
1048                // the bench's `delete_by_filter` workload.
1049                if let PlanNode::Filter {
1050                    input: inner,
1051                    predicate,
1052                } = input.as_ref()
1053                {
1054                    if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1055                        if t == table {
1056                            let schema = self
1057                                .catalog
1058                                .schema(table)
1059                                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1060                            let columns: Vec<String> =
1061                                schema.columns.iter().map(|c| c.name.clone()).collect();
1062                            let fast = FastLayout::new(schema);
1063                            if let Some(compiled) =
1064                                compile_predicate(predicate, &columns, &fast, schema)
1065                            {
1066                                // Mission B2: logged variant so every
1067                                // matched rid hits the WAL during the
1068                                // single-pass scan. Structure of the
1069                                // fused scan is unchanged — only the
1070                                // hook closure now also appends.
1071                                let count = self
1072                                    .catalog
1073                                    .scan_delete_matching_logged(table, |data| compiled(data))
1074                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1075                                self.view_registry.mark_dependents_dirty(table);
1076                                return Ok(QueryResult::Modified(count));
1077                            }
1078                        }
1079                    }
1080                } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1081                    if t == table {
1082                        // `delete from T` with no predicate — every live
1083                        // row matches. One pass is still the right shape.
1084                        // Mission B2: logged variant — see above.
1085                        let count = self
1086                            .catalog
1087                            .scan_delete_matching_logged(table, |_| true)
1088                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1089                        self.view_registry.mark_dependents_dirty(table);
1090                        return Ok(QueryResult::Modified(count));
1091                    }
1092                }
1093
1094                let matching_rids = self.collect_rids_for_mutation(input, table)?;
1095                let count = self
1096                    .catalog
1097                    .delete_many(table, &matching_rids)
1098                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1099                self.view_registry.mark_dependents_dirty(table);
1100                Ok(QueryResult::Modified(count))
1101            }
1102
1103            PlanNode::AliasScan { table, alias } => {
1104                // Mission E1.2: scan `table` and rename every output column
1105                // to `alias.field`. Used as a join leaf so downstream
1106                // NestedLoopJoin + Filter + Project nodes can resolve
1107                // `Expr::QualifiedField` lookups by direct column-name match.
1108                //
1109                // We don't bother with a fused zero-copy loop here yet — the
1110                // whole join path is nested-loop and correctness-first
1111                // (Phase E1.3 will introduce hash join and at that point we
1112                // can revisit whether to specialise AliasScan).
1113                let schema = self
1114                    .catalog
1115                    .schema(table)
1116                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1117                    .clone();
1118                let columns: Vec<String> = schema
1119                    .columns
1120                    .iter()
1121                    .map(|c| format!("{alias}.{}", c.name))
1122                    .collect();
1123                let rows: Vec<Vec<Value>> = self
1124                    .catalog
1125                    .scan(table)
1126                    .map_err(|e| QueryError::StorageError(e.to_string()))?
1127                    .map(|(_, row)| row)
1128                    .collect();
1129                Ok(QueryResult::Rows { columns, rows })
1130            }
1131
1132            PlanNode::NestedLoopJoin {
1133                left,
1134                right,
1135                on,
1136                kind,
1137            } => {
1138                // Materialise both sides. The executor ships two strategies:
1139                //   1. Hash join (E1.3) — when the `on` predicate is a
1140                //      simple equi-predicate `left_col = right_col`, build a
1141                //      FxHashMap<Value, Vec<row_idx>> over the right side
1142                //      and probe with the left side. O(L + R) instead of
1143                //      O(L × R). Handles Inner and LeftOuter.
1144                //   2. Nested loop (E1.2) — fallback for Cross, non-equi
1145                //      predicates, or `on` expressions that reference
1146                //      either side with something more complex than a
1147                //      QualifiedField.
1148                let left_result = self.execute_plan(left)?;
1149                let right_result = self.execute_plan(right)?;
1150                let (left_columns, left_rows) = match left_result {
1151                    QueryResult::Rows { columns, rows } => (columns, rows),
1152                    _ => return Err("join left side must produce rows".into()),
1153                };
1154                let (right_columns, right_rows) = match right_result {
1155                    QueryResult::Rows { columns, rows } => (columns, rows),
1156                    _ => return Err("join right side must produce rows".into()),
1157                };
1158
1159                // Hash-join fast path.
1160                if !matches!(kind, JoinKind::Cross) {
1161                    if let Some(pred) = on {
1162                        if let Some((l_idx, r_idx)) =
1163                            try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1164                        {
1165                            let result = hash_join(
1166                                left_columns,
1167                                left_rows,
1168                                right_columns,
1169                                right_rows,
1170                                l_idx,
1171                                r_idx,
1172                                *kind,
1173                            );
1174                            if let QueryResult::Rows { ref rows, .. } = result {
1175                                check_join_limit(rows.len())?;
1176                            }
1177                            return Ok(result);
1178                        }
1179                    }
1180                }
1181
1182                // Nested-loop fallback.
1183                let n_left = left_columns.len();
1184                let n_right = right_columns.len();
1185                let mut columns = Vec::with_capacity(n_left + n_right);
1186                columns.extend(left_columns);
1187                columns.extend(right_columns);
1188
1189                let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1190                let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1191
1192                for left_row in &left_rows {
1193                    let mut matched = false;
1194                    for right_row in &right_rows {
1195                        combined.clear();
1196                        combined.extend_from_slice(left_row);
1197                        combined.extend_from_slice(right_row);
1198                        let keep = match kind {
1199                            JoinKind::Cross => true,
1200                            JoinKind::Inner | JoinKind::LeftOuter => match on {
1201                                Some(pred) => eval_predicate(pred, &combined, &columns),
1202                                // Missing `on` for non-cross joins is a
1203                                // parser error, but if it slips through we
1204                                // treat it as "match everything".
1205                                None => true,
1206                            },
1207                            // RightOuter is rewritten to LeftOuter by the
1208                            // planner, so we never see it here.
1209                            JoinKind::RightOuter => {
1210                                unreachable!("planner rewrites RightOuter to LeftOuter")
1211                            }
1212                        };
1213                        if keep {
1214                            rows.push(combined.clone());
1215                            check_join_limit(rows.len())?;
1216                            matched = true;
1217                        }
1218                    }
1219                    if !matched && matches!(kind, JoinKind::LeftOuter) {
1220                        let mut row = Vec::with_capacity(n_left + n_right);
1221                        row.extend_from_slice(left_row);
1222                        row.resize(n_left + n_right, Value::Empty);
1223                        rows.push(row);
1224                        check_join_limit(rows.len())?;
1225                    }
1226                }
1227
1228                Ok(QueryResult::Rows { columns, rows })
1229            }
1230
1231            PlanNode::Distinct { input } => {
1232                let result = self.execute_plan(input)?;
1233                match result {
1234                    QueryResult::Rows { columns, rows } => {
1235                        let mut seen = std::collections::HashSet::new();
1236                        let mut unique_rows = Vec::new();
1237                        for row in rows {
1238                            if seen.insert(row.clone()) {
1239                                unique_rows.push(row);
1240                            }
1241                        }
1242                        Ok(QueryResult::Rows {
1243                            columns,
1244                            rows: unique_rows,
1245                        })
1246                    }
1247                    other => Ok(other),
1248                }
1249            }
1250
1251            PlanNode::GroupBy {
1252                input,
1253                keys,
1254                aggregates,
1255                having,
1256            } => {
1257                let result = self.execute_plan(input)?;
1258                match result {
1259                    QueryResult::Rows { columns, rows } => {
1260                        // Resolve key column indices.
1261                        let key_indices: Vec<usize> = keys
1262                            .iter()
1263                            .map(|k| {
1264                                columns
1265                                    .iter()
1266                                    .position(|c| c == k)
1267                                    .ok_or_else(|| format!("group-by column '{k}' not found"))
1268                            })
1269                            .collect::<Result<Vec<_>, _>>()?;
1270
1271                        // Resolve aggregate field indices. count(*) uses
1272                        // sentinel usize::MAX — compute_group_aggregate
1273                        // treats it as "count all rows in the group".
1274                        let agg_field_indices: Vec<usize> = aggregates
1275                            .iter()
1276                            .map(|a| {
1277                                if a.field == "*" {
1278                                    Ok(usize::MAX)
1279                                } else {
1280                                    columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1281                                        format!("aggregate column '{}' not found", a.field)
1282                                    })
1283                                }
1284                            })
1285                            .collect::<Result<Vec<_>, _>>()?;
1286
1287                        // Group rows by key values (preserving insertion order).
1288                        let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1289                            rustc_hash::FxHashMap::default();
1290                        let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1291                        for (ri, row) in rows.iter().enumerate() {
1292                            let key: Vec<Value> =
1293                                key_indices.iter().map(|&i| row[i].clone()).collect();
1294                            match group_map.get(&key) {
1295                                Some(&idx) => groups[idx].1.push(ri),
1296                                None => {
1297                                    let idx = groups.len();
1298                                    group_map.insert(key.clone(), idx);
1299                                    groups.push((key, vec![ri]));
1300                                }
1301                            }
1302                        }
1303
1304                        // Build output column names: keys ++ aggregate output names.
1305                        let mut out_columns: Vec<String> = keys.clone();
1306                        for agg in aggregates.iter() {
1307                            out_columns.push(agg.output_name.clone());
1308                        }
1309
1310                        // Compute aggregates per group.
1311                        let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1312                        for (key_vals, row_indices) in &groups {
1313                            let mut row = key_vals.clone();
1314                            for (ai, agg) in aggregates.iter().enumerate() {
1315                                let col_idx = agg_field_indices[ai];
1316                                let val = compute_group_aggregate(
1317                                    agg.function,
1318                                    &rows,
1319                                    row_indices,
1320                                    col_idx,
1321                                );
1322                                row.push(val);
1323                            }
1324                            out_rows.push(row);
1325                        }
1326
1327                        // Apply HAVING filter.
1328                        if let Some(having_expr) = having {
1329                            out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1330                        }
1331
1332                        Ok(QueryResult::Rows {
1333                            columns: out_columns,
1334                            rows: out_rows,
1335                        })
1336                    }
1337                    _ => Err("group by requires row input".into()),
1338                }
1339            }
1340
1341            PlanNode::CreateTable { name, fields } => {
1342                let columns: Vec<ColumnDef> = fields
1343                    .iter()
1344                    .enumerate()
1345                    .map(|(i, (fname, tname, req))| ColumnDef {
1346                        name: fname.clone(),
1347                        type_id: type_name_to_id(tname),
1348                        required: *req,
1349                        position: i as u16,
1350                    })
1351                    .collect();
1352                let schema = Schema {
1353                    table_name: name.clone(),
1354                    columns,
1355                };
1356                self.catalog
1357                    .create_table(schema)
1358                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1359                Ok(QueryResult::Created(name.clone()))
1360            }
1361
1362            PlanNode::AlterTable { table, action } => match action {
1363                AlterAction::AddColumn {
1364                    name,
1365                    type_name,
1366                    required,
1367                } => {
1368                    let position = self
1369                        .catalog
1370                        .schema(table)
1371                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1372                        .columns
1373                        .len() as u16;
1374                    let col = ColumnDef {
1375                        name: name.clone(),
1376                        type_id: type_name_to_id(type_name),
1377                        required: *required,
1378                        position,
1379                    };
1380                    self.catalog
1381                        .alter_table_add_column(table, col)
1382                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1383                    Ok(QueryResult::Executed {
1384                        message: format!("column '{name}' added to '{table}'"),
1385                    })
1386                }
1387                AlterAction::DropColumn { name } => {
1388                    self.catalog
1389                        .alter_table_drop_column(table, name)
1390                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1391                    Ok(QueryResult::Executed {
1392                        message: format!("column '{name}' dropped from '{table}'"),
1393                    })
1394                }
1395                AlterAction::AddIndex { column } => {
1396                    self.catalog
1397                        .create_index(table, column)
1398                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1399                    Ok(QueryResult::Executed {
1400                        message: format!("index on '{table}.{column}' created"),
1401                    })
1402                }
1403            },
1404
1405            PlanNode::DropTable { name } => {
1406                self.catalog
1407                    .drop_table(name)
1408                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1409                Ok(QueryResult::Executed {
1410                    message: format!("table '{name}' dropped"),
1411                })
1412            }
1413
1414            PlanNode::CreateView { name, query_text } => {
1415                self.create_view(name, query_text)?;
1416                Ok(QueryResult::Executed {
1417                    message: format!("materialized view '{name}' created"),
1418                })
1419            }
1420
1421            PlanNode::RefreshView { name } => {
1422                self.refresh_view(name)?;
1423                Ok(QueryResult::Executed {
1424                    message: format!("materialized view '{name}' refreshed"),
1425                })
1426            }
1427
1428            PlanNode::DropView { name } => {
1429                self.drop_view(name)?;
1430                Ok(QueryResult::Executed {
1431                    message: format!("materialized view '{name}' dropped"),
1432                })
1433            }
1434
1435            PlanNode::Window { input, windows } => {
1436                let result = self.execute_plan(input)?;
1437                execute_window(result, windows)
1438            }
1439
1440            PlanNode::Union { left, right, all } => {
1441                let left_result = self.execute_plan(left)?;
1442                let right_result = self.execute_plan(right)?;
1443                let (left_cols, left_rows) = match left_result {
1444                    QueryResult::Rows { columns, rows } => (columns, rows),
1445                    _ => return Err("UNION requires query results on left side".into()),
1446                };
1447                let (_, right_rows) = match right_result {
1448                    QueryResult::Rows { columns, rows } => (columns, rows),
1449                    _ => return Err("UNION requires query results on right side".into()),
1450                };
1451                let mut combined = left_rows;
1452                if *all {
1453                    // UNION ALL — just concatenate.
1454                    combined.extend(right_rows);
1455                } else {
1456                    // UNION — deduplicate using the same HashSet approach
1457                    // as DISTINCT. Value already implements Hash + Eq.
1458                    let mut seen = std::collections::HashSet::new();
1459                    for row in &combined {
1460                        seen.insert(row.clone());
1461                    }
1462                    for row in right_rows {
1463                        if seen.insert(row.clone()) {
1464                            combined.push(row);
1465                        }
1466                    }
1467                }
1468                Ok(QueryResult::Rows {
1469                    columns: left_cols,
1470                    rows: combined,
1471                })
1472            }
1473
1474            PlanNode::Explain { input } => {
1475                let text = format_plan_tree(input, 0);
1476                Ok(QueryResult::Rows {
1477                    columns: vec!["plan".to_string()],
1478                    rows: text
1479                        .lines()
1480                        .map(|line| vec![Value::Str(line.to_string())])
1481                        .collect(),
1482                })
1483            }
1484
1485            PlanNode::IndexScan { table, column, key } => {
1486                let key_value = literal_to_value(key)?;
1487                let tbl = self
1488                    .catalog
1489                    .get_table(table)
1490                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1491                let columns: Vec<String> =
1492                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1493
1494                // Fast path: the table has a B-tree on this column. A single
1495                // point lookup returns 0 or 1 rows — this is the whole reason
1496                // the planner bothers emitting IndexScan.
1497                //
1498                // Mission D7: use `lookup_int` on int-keyed indexes to skip
1499                // the Value enum dispatch in the inner binary search. The
1500                // generic `tbl.index_lookup` helper can't do this without
1501                // lying about the key type, so we inline the index+heap
1502                // touch here.
1503                if let Some(btree) = tbl.index(column) {
1504                    let hit = match &key_value {
1505                        Value::Int(k) => btree.lookup_int(*k),
1506                        other => btree.lookup(other),
1507                    };
1508                    let rows = match hit {
1509                        Some(rid) => match tbl.heap.get(rid) {
1510                            Some(data) => vec![decode_row(&tbl.schema, &data)],
1511                            None => Vec::new(),
1512                        },
1513                        None => Vec::new(),
1514                    };
1515                    return Ok(QueryResult::Rows { columns, rows });
1516                }
1517
1518                // Fallback: no index on this column. The planner emits IndexScan
1519                // eagerly (it has no visibility into which columns are indexed
1520                // at plan time), so here we must behave like SeqScan+Filter on
1521                // `.col = literal`: return *all* matching rows, not just the
1522                // first one. A non-indexed column isn't necessarily unique.
1523                // We compile the eq predicate once and stream without any
1524                // per-row decode for non-matching rows.
1525                let schema = &tbl.schema;
1526                let fast = FastLayout::new(schema);
1527                let synth_pred = Expr::BinaryOp(
1528                    Box::new(Expr::Field(column.clone())),
1529                    BinOp::Eq,
1530                    Box::new(key.clone()),
1531                );
1532                if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1533                    // Mission F: skip the first 4 Vec doublings.
1534                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1535                    self.catalog
1536                        .for_each_row_raw(table, |_rid, data| {
1537                            if compiled(data) {
1538                                rows.push(decode_row(schema, data));
1539                            }
1540                        })
1541                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1542                    return Ok(QueryResult::Rows { columns, rows });
1543                }
1544
1545                // Last resort: slow eq-check on materialised rows.
1546                let col_idx =
1547                    schema
1548                        .column_index(column)
1549                        .ok_or_else(|| QueryError::ColumnNotFound {
1550                            table: String::new(),
1551                            column: column.clone(),
1552                        })?;
1553                let rows: Vec<Vec<Value>> = tbl
1554                    .scan()
1555                    .filter_map(|(_, row)| {
1556                        if row[col_idx] == key_value {
1557                            Some(row)
1558                        } else {
1559                            None
1560                        }
1561                    })
1562                    .collect();
1563                Ok(QueryResult::Rows { columns, rows })
1564            }
1565
1566            PlanNode::RangeScan {
1567                table,
1568                column,
1569                start,
1570                end,
1571            } => {
1572                let tbl = self
1573                    .catalog
1574                    .get_table(table)
1575                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1576                let columns: Vec<String> =
1577                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1578                let schema = &tbl.schema;
1579
1580                let start_val = match start {
1581                    Some((expr, _)) => Some(literal_to_value(expr)?),
1582                    None => None,
1583                };
1584                let end_val = match end {
1585                    Some((expr, _)) => Some(literal_to_value(expr)?),
1586                    None => None,
1587                };
1588                let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1589                let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1590
1591                if let Some(btree) = tbl.index(column) {
1592                    let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1593                        (Some(s), Some(e)) => btree.range(s, e).collect(),
1594                        (Some(s), None) => btree.range_from(s),
1595                        (None, Some(e)) => btree.range_to(e),
1596                        (None, None) => {
1597                            let rows: Vec<Vec<Value>> = tbl.scan().map(|(_, row)| row).collect();
1598                            return Ok(QueryResult::Rows { columns, rows });
1599                        }
1600                    };
1601                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1602                    for (key, rid) in hits {
1603                        if !start_inclusive {
1604                            if let Some(ref s) = start_val {
1605                                if &key == s {
1606                                    continue;
1607                                }
1608                            }
1609                        }
1610                        if !end_inclusive {
1611                            if let Some(ref e) = end_val {
1612                                if &key == e {
1613                                    continue;
1614                                }
1615                            }
1616                        }
1617                        if let Some(data) = tbl.heap.get(rid) {
1618                            rows.push(decode_row(schema, &data));
1619                        }
1620                    }
1621                    return Ok(QueryResult::Rows { columns, rows });
1622                }
1623
1624                // Fallback: no index — synthesize range predicate and scan.
1625                let fast = FastLayout::new(schema);
1626                let synth = synthesize_range_predicate(column, start, end);
1627                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1628                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1629                    self.catalog
1630                        .for_each_row_raw(table, |_rid, data| {
1631                            if compiled(data) {
1632                                rows.push(decode_row(schema, data));
1633                            }
1634                        })
1635                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1636                    return Ok(QueryResult::Rows { columns, rows });
1637                }
1638
1639                let col_idx =
1640                    schema
1641                        .column_index(column)
1642                        .ok_or_else(|| QueryError::ColumnNotFound {
1643                            table: String::new(),
1644                            column: column.clone(),
1645                        })?;
1646                let rows: Vec<Vec<Value>> = tbl
1647                    .scan()
1648                    .filter(|(_, row)| {
1649                        range_matches(
1650                            &row[col_idx],
1651                            &start_val,
1652                            start_inclusive,
1653                            &end_val,
1654                            end_inclusive,
1655                        )
1656                    })
1657                    .map(|(_, row)| row)
1658                    .collect();
1659                Ok(QueryResult::Rows { columns, rows })
1660            }
1661        }
1662    }
1663
1664    // ─── Materialized view operations ──────────────────────────────────────
1665
1666    /// Create a materialized view: execute the source query, store results
1667    /// in a new backing table, and register the view.
1668    fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1669        if self.view_registry.is_view(name) {
1670            return Err(QueryError::ViewError(format!(
1671                "materialized view '{name}' already exists"
1672            )));
1673        }
1674        // Execute the source query to get the result set.
1675        let result = self.execute_powql(query_text)?;
1676        let (columns, rows) = match result {
1677            QueryResult::Rows { columns, rows } => (columns, rows),
1678            _ => return Err("view source query must be a SELECT".into()),
1679        };
1680        // Derive a schema for the backing table from the query result columns.
1681        let schema = self.derive_view_schema(name, &columns, &rows);
1682        // Create the backing table and insert the result rows.
1683        self.catalog
1684            .create_table(schema)
1685            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1686        for row in &rows {
1687            self.catalog
1688                .insert(name, row)
1689                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1690        }
1691        // Determine which base tables this view depends on by parsing the query.
1692        let depends_on = self.extract_view_deps(query_text);
1693        self.view_registry
1694            .register(ViewDef {
1695                name: name.to_string(),
1696                query: query_text.to_string(),
1697                depends_on,
1698                dirty: false,
1699            })
1700            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1701        Ok(())
1702    }
1703
1704    /// Refresh a materialized view: re-execute its source query and replace
1705    /// the backing table's contents.
1706    fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1707        let def = self
1708            .view_registry
1709            .get(name)
1710            .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1711        let query_text = def.query.clone();
1712        // Execute the source query.
1713        let result = self.execute_powql(&query_text)?;
1714        let (_columns, rows) = match result {
1715            QueryResult::Rows { columns, rows } => (columns, rows),
1716            _ => return Err("view source query must be a SELECT".into()),
1717        };
1718        // Clear old data and insert fresh results. Mission B2: logged
1719        // variant — view refreshes are a mutation and crash recovery
1720        // must see them.
1721        self.catalog
1722            .scan_delete_matching_logged(name, |_| true)
1723            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1724        for row in &rows {
1725            self.catalog
1726                .insert(name, row)
1727                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1728        }
1729        self.view_registry.mark_clean(name);
1730        Ok(())
1731    }
1732
1733    /// Drop a materialized view: remove the backing table and unregister.
1734    fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1735        if !self.view_registry.is_view(name) {
1736            return Err(QueryError::ViewError(format!(
1737                "materialized view '{name}' not found"
1738            )));
1739        }
1740        self.view_registry
1741            .unregister(name)
1742            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1743        self.catalog
1744            .drop_table(name)
1745            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1746        Ok(())
1747    }
1748
1749    /// Derive a storage `Schema` for a view's backing table from query
1750    /// result column names and the first row's types.
1751    fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1752        use powdb_storage::types::{ColumnDef, TypeId};
1753        let cols: Vec<ColumnDef> = columns
1754            .iter()
1755            .enumerate()
1756            .map(|(i, col_name)| {
1757                let type_id = rows
1758                    .first()
1759                    .and_then(|row| row.get(i))
1760                    .map(|v| v.type_id())
1761                    .unwrap_or(TypeId::Str);
1762                ColumnDef {
1763                    name: col_name.clone(),
1764                    type_id,
1765                    required: false,
1766                    position: i as u16,
1767                }
1768            })
1769            .collect();
1770        Schema {
1771            table_name: name.to_string(),
1772            columns: cols,
1773        }
1774    }
1775
1776    /// Extract base table dependencies from a view's source query by
1777    /// parsing it and collecting the source table name.
1778    fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1779        use crate::parser::parse;
1780        match parse(query_text) {
1781            Ok(Statement::Query(q)) => {
1782                let mut deps = vec![q.source.clone()];
1783                for j in &q.joins {
1784                    deps.push(j.source.clone());
1785                }
1786                deps
1787            }
1788            _ => Vec::new(),
1789        }
1790    }
1791
1792    // ─── Specialized fast paths ─────────────────────────────────────────────
1793    //
1794    // These methods are helpers for the `execute_plan` match arms above.
1795    // Each returns `Ok(Some(result))` when the fast path fires, `Ok(None)`
1796    // when the shape isn't supported (caller falls back to generic code).
1797
1798    /// Aggregate sum/avg/min/max over a single fixed-size i64 column, with
1799    /// an optional compiled filter predicate. Walks raw row bytes — zero
1800    /// per-row allocation. Uses i128 accumulator for sum/avg overflow safety.
1801    pub(super) fn agg_single_col_fast(
1802        &self,
1803        table: &str,
1804        col: &str,
1805        function: AggFunc,
1806        predicate: Option<&Expr>,
1807    ) -> Result<Option<QueryResult>, QueryError> {
1808        let schema = self
1809            .catalog
1810            .schema(table)
1811            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1812            .clone();
1813        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
1814        let col_idx = match schema.column_index(col) {
1815            Some(i) => i,
1816            None => return Ok(None),
1817        };
1818        // Only fast-path fixed-size numeric columns (Int/Float) for
1819        // sum/avg/min/max/count. Mission D10: Float parity — prior version
1820        // bailed on Float columns, forcing them through the generic row-
1821        // decoding path that allocated a Vec<Value> per row and dispatched
1822        // on Value::cmp for every compare. f64 decode is structurally the
1823        // same as i64 (load 8 bytes, cast), so the fast path handles both.
1824        let col_type = schema.columns[col_idx].type_id;
1825        if col_type != TypeId::Int && col_type != TypeId::Float {
1826            return Ok(None);
1827        }
1828
1829        let fast = FastLayout::new(&schema);
1830        // Mission C Phase 20b: inline the numeric-column reader instead of
1831        // building a `Box<dyn Fn>`. Eliminates 100K vtable dispatches per
1832        // 100K-row agg scan — every reader call folds directly into the
1833        // hot loop below.
1834        let byte_offset = match fast.fixed_offsets[col_idx] {
1835            Some(o) => o,
1836            None => return Ok(None),
1837        };
1838        let bitmap_byte = col_idx / 8;
1839        let bitmap_bit = (col_idx % 8) as u32;
1840        let data_offset = 2 + fast.bitmap_size + byte_offset;
1841
1842        // Optional compiled filter.
1843        let compiled_pred: Option<CompiledPredicate> = match predicate {
1844            Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
1845                Some(c) => Some(c),
1846                None => return Ok(None), // let generic path handle it
1847            },
1848            None => None,
1849        };
1850
1851        // Mission C Phase 20b: specialize the inner loop per aggregate
1852        // function. The previous version ran a `match function { ... }`
1853        // *inside* the closure, which kept LLVM from producing optimal
1854        // scalar code for each variant (agg_max regressed ~23% vs the
1855        // baseline Box<dyn Fn> version even though per-row vtable cost
1856        // should have been strictly lower). Pushing the match out of the
1857        // hot loop lets each specialized body fold cleanly into
1858        // `for_each_row_raw` and removes a captured `AggFunc` + match
1859        // dispatch per row.
1860        //
1861        // Mission D10: same specialisation applies to the Float branch.
1862        // For Min/Max we use `f64::total_cmp` so the result matches
1863        // `Value::Ord` — this is the same ordering ORDER BY and the
1864        // top-N sort fast path use, keeping semantics consistent across
1865        // read paths (NaN compares as greatest, -0.0 < +0.0 for
1866        // deterministic tie-breaking).
1867        //
1868        // Mission D11 Phase 1: each inner loop now splits on presence of
1869        // a predicate (`if let Some(pred) = &compiled_pred`) so the hot
1870        // body never re-tests `Option` per row, and reads column bytes
1871        // via `read_i64_unchecked` / `read_f64_unchecked` helpers that
1872        // drop two bounds checks per row (null bitmap byte + value
1873        // slice). Safety is carried by the `FastLayout` invariant that
1874        // `data_offset + 8 <= row_len` for any fixed-size column; see
1875        // the helper doc comments. Hot loops are macro-generated so the
1876        // with-pred / no-pred split can't drift between variants.
1877        let result = match col_type {
1878            TypeId::Int => match function {
1879                AggFunc::Sum | AggFunc::Avg => {
1880                    let mut sum_i128: i128 = 0;
1881                    let mut count: i64 = 0;
1882                    agg_int_loop!(
1883                        self,
1884                        table,
1885                        compiled_pred,
1886                        bitmap_byte,
1887                        bitmap_bit,
1888                        data_offset,
1889                        |v: i64| {
1890                            count += 1;
1891                            sum_i128 += v as i128;
1892                        }
1893                    );
1894                    if matches!(function, AggFunc::Sum) {
1895                        let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
1896                        QueryResult::Scalar(Value::Int(clamped))
1897                    } else if count == 0 {
1898                        QueryResult::Scalar(Value::Empty)
1899                    } else {
1900                        let avg = (sum_i128 as f64) / (count as f64);
1901                        QueryResult::Scalar(Value::Float(avg))
1902                    }
1903                }
1904                AggFunc::Min => {
1905                    let mut min_v: Option<i64> = None;
1906                    agg_int_loop!(
1907                        self,
1908                        table,
1909                        compiled_pred,
1910                        bitmap_byte,
1911                        bitmap_bit,
1912                        data_offset,
1913                        |v: i64| {
1914                            min_v = Some(match min_v {
1915                                Some(m) => m.min(v),
1916                                None => v,
1917                            });
1918                        }
1919                    );
1920                    QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
1921                }
1922                AggFunc::Max => {
1923                    let mut max_v: Option<i64> = None;
1924                    agg_int_loop!(
1925                        self,
1926                        table,
1927                        compiled_pred,
1928                        bitmap_byte,
1929                        bitmap_bit,
1930                        data_offset,
1931                        |v: i64| {
1932                            max_v = Some(match max_v {
1933                                Some(m) => m.max(v),
1934                                None => v,
1935                            });
1936                        }
1937                    );
1938                    QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
1939                }
1940                AggFunc::Count => {
1941                    let mut count: i64 = 0;
1942                    agg_int_loop!(
1943                        self,
1944                        table,
1945                        compiled_pred,
1946                        bitmap_byte,
1947                        bitmap_bit,
1948                        data_offset,
1949                        |_v: i64| {
1950                            count += 1;
1951                        }
1952                    );
1953                    QueryResult::Scalar(Value::Int(count))
1954                }
1955                AggFunc::CountDistinct => {
1956                    let mut seen = rustc_hash::FxHashSet::default();
1957                    agg_int_loop!(
1958                        self,
1959                        table,
1960                        compiled_pred,
1961                        bitmap_byte,
1962                        bitmap_bit,
1963                        data_offset,
1964                        |v: i64| {
1965                            seen.insert(v);
1966                        }
1967                    );
1968                    QueryResult::Scalar(Value::Int(seen.len() as i64))
1969                }
1970            },
1971            TypeId::Float => match function {
1972                AggFunc::Sum => {
1973                    // Use a single f64 accumulator. Naive summation is
1974                    // sufficient for MVP parity; if precision becomes an
1975                    // issue on long scans we can upgrade to Kahan–Neumaier
1976                    // compensated sum (~2x scalar cost, zero error growth).
1977                    let mut sum: f64 = 0.0;
1978                    agg_float_loop!(
1979                        self,
1980                        table,
1981                        compiled_pred,
1982                        bitmap_byte,
1983                        bitmap_bit,
1984                        data_offset,
1985                        |v: f64| {
1986                            sum += v;
1987                        }
1988                    );
1989                    QueryResult::Scalar(Value::Float(sum))
1990                }
1991                AggFunc::Avg => {
1992                    let mut sum: f64 = 0.0;
1993                    let mut count: i64 = 0;
1994                    agg_float_loop!(
1995                        self,
1996                        table,
1997                        compiled_pred,
1998                        bitmap_byte,
1999                        bitmap_bit,
2000                        data_offset,
2001                        |v: f64| {
2002                            sum += v;
2003                            count += 1;
2004                        }
2005                    );
2006                    if count == 0 {
2007                        QueryResult::Scalar(Value::Empty)
2008                    } else {
2009                        QueryResult::Scalar(Value::Float(sum / count as f64))
2010                    }
2011                }
2012                AggFunc::Min => {
2013                    // `total_cmp` for deterministic NaN handling (matches
2014                    // Value::Ord). NaN compares greatest, so Min will
2015                    // correctly ignore it in favour of any finite value.
2016                    let mut min_v: Option<f64> = None;
2017                    agg_float_loop!(
2018                        self,
2019                        table,
2020                        compiled_pred,
2021                        bitmap_byte,
2022                        bitmap_bit,
2023                        data_offset,
2024                        |v: f64| {
2025                            min_v = Some(match min_v {
2026                                Some(m) => {
2027                                    if v.total_cmp(&m).is_lt() {
2028                                        v
2029                                    } else {
2030                                        m
2031                                    }
2032                                }
2033                                None => v,
2034                            });
2035                        }
2036                    );
2037                    QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
2038                }
2039                AggFunc::Max => {
2040                    let mut max_v: Option<f64> = None;
2041                    agg_float_loop!(
2042                        self,
2043                        table,
2044                        compiled_pred,
2045                        bitmap_byte,
2046                        bitmap_bit,
2047                        data_offset,
2048                        |v: f64| {
2049                            max_v = Some(match max_v {
2050                                Some(m) => {
2051                                    if v.total_cmp(&m).is_gt() {
2052                                        v
2053                                    } else {
2054                                        m
2055                                    }
2056                                }
2057                                None => v,
2058                            });
2059                        }
2060                    );
2061                    QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
2062                }
2063                AggFunc::Count => {
2064                    let mut count: i64 = 0;
2065                    agg_float_loop!(
2066                        self,
2067                        table,
2068                        compiled_pred,
2069                        bitmap_byte,
2070                        bitmap_bit,
2071                        data_offset,
2072                        |_v: f64| {
2073                            count += 1;
2074                        }
2075                    );
2076                    QueryResult::Scalar(Value::Int(count))
2077                }
2078                AggFunc::CountDistinct => {
2079                    // Hash on `f64::to_bits` — matches `Value::Hash`, so
2080                    // distinct NaN bit patterns count as distinct and
2081                    // -0.0/+0.0 count as distinct. Consistent with how
2082                    // Float values are hashed in every other DISTINCT /
2083                    // GROUP BY path.
2084                    let mut seen = rustc_hash::FxHashSet::default();
2085                    agg_float_loop!(
2086                        self,
2087                        table,
2088                        compiled_pred,
2089                        bitmap_byte,
2090                        bitmap_bit,
2091                        data_offset,
2092                        |v: f64| {
2093                            seen.insert(v.to_bits());
2094                        }
2095                    );
2096                    QueryResult::Scalar(Value::Int(seen.len() as i64))
2097                }
2098            },
2099            _ => unreachable!("type guard above restricts to Int/Float"),
2100        };
2101        Ok(Some(result))
2102    }
2103
2104    /// `Project(Limit(Filter(SeqScan)))` and `Project(Limit(SeqScan))`.
2105    /// Streams rows, decodes only projected columns, stops at the limit.
2106    pub(super) fn project_filter_limit_fast(
2107        &self,
2108        table: &str,
2109        fields: &[ProjectField],
2110        limit: usize,
2111        predicate: Option<&Expr>,
2112    ) -> Result<Option<QueryResult>, QueryError> {
2113        let schema = self
2114            .catalog
2115            .schema(table)
2116            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2117            .clone();
2118        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2119
2120        // Each projection field must be a simple `.field` reference for this
2121        // fast path. Aliased or computed fields fall through.
2122        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2123        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2124        for f in fields {
2125            let name = match &f.expr {
2126                Expr::Field(n) => n.clone(),
2127                _ => return Ok(None),
2128            };
2129            let idx = match all_columns.iter().position(|c| c == &name) {
2130                Some(i) => i,
2131                None => return Ok(None),
2132            };
2133            proj_indices.push(idx);
2134            proj_columns.push(f.alias.clone().unwrap_or(name));
2135        }
2136
2137        let fast = FastLayout::new(&schema);
2138        let row_layout = RowLayout::new(&schema);
2139
2140        let compiled_pred: Option<CompiledPredicate> = match predicate {
2141            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2142                Some(c) => Some(c),
2143                None => return Ok(None),
2144            },
2145            None => None,
2146        };
2147
2148        let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2149        // Mission D2: use try_for_each_row_raw to actually stop iterating
2150        // once the limit is reached. The previous `done` flag only short-
2151        // circuited the closure body, so a `limit 100` over 100K rows still
2152        // walked all 100K slots — burning ~30x SQLite on scan_filter_project_top100.
2153        self.catalog
2154            .try_for_each_row_raw(table, |_rid, data| {
2155                use std::ops::ControlFlow;
2156                if let Some(ref pred) = compiled_pred {
2157                    if !pred(data) {
2158                        return ControlFlow::Continue(());
2159                    }
2160                }
2161                let row: Vec<Value> = proj_indices
2162                    .iter()
2163                    .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2164                    .collect();
2165                out.push(row);
2166                if out.len() >= limit {
2167                    ControlFlow::Break(())
2168                } else {
2169                    ControlFlow::Continue(())
2170                }
2171            })
2172            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2173
2174        Ok(Some(QueryResult::Rows {
2175            columns: proj_columns,
2176            rows: out,
2177        }))
2178    }
2179
2180    /// `Project(Limit(Sort(Filter(SeqScan))))` and `Project(Limit(Sort(SeqScan)))`.
2181    /// Bounded top-N heap over the sort key. Only the sort key needs to be
2182    /// read per row; projected columns are decoded only for the final
2183    /// winning rows when the heap drains.
2184    pub(super) fn project_filter_sort_limit_fast(
2185        &self,
2186        table: &str,
2187        fields: &[ProjectField],
2188        sort_field: &str,
2189        descending: bool,
2190        limit: usize,
2191        predicate: Option<&Expr>,
2192    ) -> Result<Option<QueryResult>, QueryError> {
2193        if limit == 0 {
2194            // Degenerate case — empty result. Let the generic path handle it
2195            // for proper column naming.
2196            return Ok(None);
2197        }
2198        let schema = self
2199            .catalog
2200            .schema(table)
2201            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2202            .clone();
2203        let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2204
2205        // Sort key must be a fixed-size numeric column (Int or Float).
2206        // Mission D10: extended from Int-only. Float sort keys use a
2207        // sortable-u64 transform (see `f64_to_sortable_u64`) so the heap
2208        // path stays keyed on `u64` and the whole branch shape is
2209        // identical to the Int case — no new heap types, no `total_cmp`
2210        // closures in the hot loop.
2211        let sort_idx = match schema.column_index(sort_field) {
2212            Some(i) => i,
2213            None => return Ok(None),
2214        };
2215        let sort_col_type = schema.columns[sort_idx].type_id;
2216        if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2217            return Ok(None);
2218        }
2219
2220        // Each projection field must be a simple `.field`.
2221        let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2222        let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2223        for f in fields {
2224            let name = match &f.expr {
2225                Expr::Field(n) => n.clone(),
2226                _ => return Ok(None),
2227            };
2228            let idx = match all_columns.iter().position(|c| c == &name) {
2229                Some(i) => i,
2230                None => return Ok(None),
2231            };
2232            proj_indices.push(idx);
2233            proj_columns.push(f.alias.clone().unwrap_or(name));
2234        }
2235
2236        let fast = FastLayout::new(&schema);
2237        let row_layout = RowLayout::new(&schema);
2238        // Mission C Phase 20b: inline numeric-column reader (no Box<dyn Fn>).
2239        let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2240            Some(o) => o,
2241            None => return Ok(None),
2242        };
2243        let sort_bitmap_byte = sort_idx / 8;
2244        let sort_bitmap_bit = (sort_idx % 8) as u32;
2245        let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2246
2247        let compiled_pred: Option<CompiledPredicate> = match predicate {
2248            Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2249                Some(c) => Some(c),
2250                None => return Ok(None),
2251            },
2252            None => None,
2253        };
2254
2255        // Bounded top-N heap. For `order .x desc limit N`, we want the N
2256        // largest values — use a min-heap so the smallest is at the top and
2257        // can be popped when a better candidate arrives. For ascending, use
2258        // a max-heap. We tie-break with a monotonic `seq` counter so the
2259        // result is deterministic and stable.
2260        //
2261        // To keep this simple we maintain two typed heaps and pick by
2262        // direction.
2263        let drained: Vec<Vec<u8>> = match sort_col_type {
2264            TypeId::Int => {
2265                let mut seq: u64 = 0;
2266                let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2267                    BinaryHeap::with_capacity(limit);
2268                let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2269                    BinaryHeap::with_capacity(limit);
2270
2271                self.catalog
2272                    .for_each_row_raw(table, |_rid, data| {
2273                        if let Some(ref pred) = compiled_pred {
2274                            if !pred(data) {
2275                                return;
2276                            }
2277                        }
2278                        // Inlined int-column reader: null check + i64 decode.
2279                        if data.len() < sort_data_offset + 8 {
2280                            return;
2281                        }
2282                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2283                        if is_null {
2284                            return;
2285                        }
2286                        let key = i64::from_le_bytes(
2287                            data[sort_data_offset..sort_data_offset + 8]
2288                                .try_into()
2289                                .unwrap_or_else(|_| unreachable!()),
2290                        );
2291                        let id = seq;
2292                        seq += 1;
2293
2294                        if descending {
2295                            if heap_desc.len() < limit {
2296                                heap_desc.push(Reverse((key, id, data.to_vec())));
2297                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2298                                if key > *top_key {
2299                                    heap_desc.pop();
2300                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2301                                }
2302                            }
2303                        } else if heap_asc.len() < limit {
2304                            heap_asc.push((key, id, data.to_vec()));
2305                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2306                            if key < *top_key {
2307                                heap_asc.pop();
2308                                heap_asc.push((key, id, data.to_vec()));
2309                            }
2310                        }
2311                    })
2312                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2313
2314                let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2315                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2316                } else {
2317                    heap_asc.into_iter().collect()
2318                };
2319                if descending {
2320                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2321                } else {
2322                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2323                }
2324                drained.into_iter().map(|(_, _, d)| d).collect()
2325            }
2326            TypeId::Float => {
2327                // Novel angle: rather than introducing a `TotalF64` newtype
2328                // with `Ord via total_cmp`, transform the f64 bit pattern
2329                // into a sortable `u64` so `BinaryHeap<u64>` orders exactly
2330                // like `f64::total_cmp` would. Classic trick: flip the sign
2331                // bit on positives, flip all bits on negatives. Result:
2332                // - NaN (sign=0) stays greatest, matching total_cmp
2333                // - -0.0 sorts before +0.0, matching total_cmp
2334                // - Hot loop is branch-cheap (one compare + one xor)
2335                let mut seq: u64 = 0;
2336                let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2337                    BinaryHeap::with_capacity(limit);
2338                let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2339                    BinaryHeap::with_capacity(limit);
2340
2341                self.catalog
2342                    .for_each_row_raw(table, |_rid, data| {
2343                        if let Some(ref pred) = compiled_pred {
2344                            if !pred(data) {
2345                                return;
2346                            }
2347                        }
2348                        if data.len() < sort_data_offset + 8 {
2349                            return;
2350                        }
2351                        let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2352                        if is_null {
2353                            return;
2354                        }
2355                        let bits = u64::from_le_bytes(
2356                            data[sort_data_offset..sort_data_offset + 8]
2357                                .try_into()
2358                                .unwrap_or_else(|_| unreachable!()),
2359                        );
2360                        let key = f64_bits_to_sortable_u64(bits);
2361                        let id = seq;
2362                        seq += 1;
2363
2364                        if descending {
2365                            if heap_desc.len() < limit {
2366                                heap_desc.push(Reverse((key, id, data.to_vec())));
2367                            } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2368                                if key > *top_key {
2369                                    heap_desc.pop();
2370                                    heap_desc.push(Reverse((key, id, data.to_vec())));
2371                                }
2372                            }
2373                        } else if heap_asc.len() < limit {
2374                            heap_asc.push((key, id, data.to_vec()));
2375                        } else if let Some((top_key, _, _)) = heap_asc.peek() {
2376                            if key < *top_key {
2377                                heap_asc.pop();
2378                                heap_asc.push((key, id, data.to_vec()));
2379                            }
2380                        }
2381                    })
2382                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
2383
2384                let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2385                    heap_desc.into_iter().map(|Reverse(t)| t).collect()
2386                } else {
2387                    heap_asc.into_iter().collect()
2388                };
2389                if descending {
2390                    drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2391                } else {
2392                    drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2393                }
2394                drained.into_iter().map(|(_, _, d)| d).collect()
2395            }
2396            _ => unreachable!("type guard above restricts to Int/Float"),
2397        };
2398
2399        let rows: Vec<Vec<Value>> = drained
2400            .into_iter()
2401            .map(|data| {
2402                proj_indices
2403                    .iter()
2404                    .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2405                    .collect()
2406            })
2407            .collect();
2408
2409        Ok(Some(QueryResult::Rows {
2410            columns: proj_columns,
2411            rows,
2412        }))
2413    }
2414
2415    /// Gather the RowIds that a mutation should operate on, without
2416    /// materialising the full row set. Handles the shapes the planner emits
2417    /// for update/delete: SeqScan, IndexScan, and Filter(SeqScan). Other
2418    /// shapes fall back to `generic_rid_match`.
2419    ///
2420    /// Perf sprint: try to fuse the predicate evaluation and in-place
2421    /// byte-level mutation into a single heap walk. Returns `Some(result)`
2422    /// if the fused path fired, `None` to fall through to the generic
2423    /// two-pass code.
2424    ///
2425    /// Covers two shapes:
2426    /// 1. Fixed-width non-null literal assignments on non-indexed columns
2427    ///    → byte-patch every matched row in place (row length unchanged).
2428    /// 2. Single var-col literal assignment on a non-indexed column
2429    ///    → `patch_var_column_in_place` on every matched row (may shrink);
2430    ///    rows that can't be patched in place are collected for fallback.
2431    fn try_fused_scan_update(
2432        &mut self,
2433        table: &str,
2434        predicate: &Expr,
2435        resolved: &[(usize, Value)],
2436        changed_cols: &[usize],
2437    ) -> Option<Result<QueryResult, QueryError>> {
2438        // Build compiled predicate. Requires a schema borrow that must be
2439        // dropped before we call scan_patch_matching_logged.
2440        let compiled = {
2441            let schema = self.catalog.schema(table)?;
2442            let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2443            let fast = FastLayout::new(schema);
2444            compile_predicate(predicate, &columns, &fast, schema)?
2445        };
2446
2447        // ── Path 1: fixed-width fast patch ──────────────────────────
2448        let fixed_patches: Option<Vec<FastPatch>> = {
2449            let tbl = self.catalog.get_table(table)?;
2450            let schema = &tbl.schema;
2451            let all_fixed_nonnull = resolved
2452                .iter()
2453                .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2454            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2455            if all_fixed_nonnull && no_indexed {
2456                let layout = RowLayout::new(schema);
2457                let bitmap_size = layout.bitmap_size();
2458                Some(
2459                    resolved
2460                        .iter()
2461                        .map(|(idx, val)| {
2462                            let fixed_off = layout
2463                                .fixed_offset(*idx)
2464                                .expect("is_fixed_size already checked");
2465                            let field_off = 2 + bitmap_size + fixed_off;
2466                            let bytes: FixedBytes = match val {
2467                                Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2468                                Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2469                                Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2470                                Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2471                                Value::Uuid(v) => FixedBytes::Uuid(*v),
2472                                _ => unreachable!("all_fixed_nonnull guard"),
2473                            };
2474                            FastPatch {
2475                                field_off,
2476                                bitmap_byte_off: 2 + idx / 8,
2477                                bit_mask: 1u8 << (idx % 8),
2478                                bytes,
2479                            }
2480                        })
2481                        .collect(),
2482                )
2483            } else {
2484                None
2485            }
2486        };
2487        if let Some(patches) = fixed_patches {
2488            let result = self
2489                .catalog
2490                .scan_patch_matching_logged(table, compiled, |row| {
2491                    for p in &patches {
2492                        row[p.bitmap_byte_off] &= !p.bit_mask;
2493                        let field_bytes = p.bytes.as_slice();
2494                        row[p.field_off..p.field_off + field_bytes.len()]
2495                            .copy_from_slice(field_bytes);
2496                    }
2497                    Some(row.len() as u16)
2498                })
2499                .map_err(|e| e.to_string());
2500            match result {
2501                Ok((count, _)) => {
2502                    self.view_registry.mark_dependents_dirty(table);
2503                    return Some(Ok(QueryResult::Modified(count)));
2504                }
2505                Err(e) => return Some(Err(QueryError::Execution(e))),
2506            }
2507        }
2508
2509        // ── Path 2: single var-col shrink fast patch ────────────────
2510        let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2511            let tbl = self.catalog.get_table(table)?;
2512            let schema = &tbl.schema;
2513            let is_single = resolved.len() == 1;
2514            let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2515            let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2516            if is_single && is_var && no_indexed {
2517                let (idx, val) = &resolved[0];
2518                let bytes_opt = match val {
2519                    Value::Str(s) => Some(s.as_bytes().to_vec()),
2520                    Value::Bytes(b) => Some(b.clone()),
2521                    Value::Empty => None,
2522                    _ => return None, // type mismatch, fall through
2523                };
2524                Some((*idx, bytes_opt))
2525            } else {
2526                None
2527            }
2528        };
2529        if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2530            // Build a fresh RowLayout before the mutable borrow.
2531            let layout = {
2532                let schema = self.catalog.schema(table)?;
2533                RowLayout::new(schema)
2534            };
2535            let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2536            let result = self
2537                .catalog
2538                .scan_patch_matching_logged(table, compiled, |row| {
2539                    patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2540                })
2541                .map_err(|e| e.to_string());
2542            match result {
2543                Ok((mut count, fallback_rids)) => {
2544                    // Handle rows where in-place patch failed (new > old).
2545                    for rid in fallback_rids {
2546                        let mut row = match self.catalog.get(table, rid) {
2547                            Some(r) => r,
2548                            None => continue,
2549                        };
2550                        for (idx, val) in resolved.iter() {
2551                            row[*idx] = val.clone();
2552                        }
2553                        self.catalog
2554                            .update_hinted(table, rid, &row, Some(changed_cols))
2555                            .map_err(|e| e.to_string())
2556                            .ok();
2557                        count += 1;
2558                    }
2559                    self.view_registry.mark_dependents_dirty(table);
2560                    return Some(Ok(QueryResult::Modified(count)));
2561                }
2562                Err(e) => return Some(Err(QueryError::Execution(e))),
2563            }
2564        }
2565
2566        None // no fused path applicable — fall through
2567    }
2568
2569    /// Mission C Phase 3: schema is looked up via `self.catalog.schema(table)`
2570    /// inside the branches that actually need it. Previously the caller had
2571    /// to clone the full Schema (6+ String allocs) before every mutation just
2572    /// so this function could borrow it — a cost the update/delete hot path
2573    /// did not need.
2574    fn collect_rids_for_mutation(
2575        &mut self,
2576        input: &PlanNode,
2577        table: &str,
2578    ) -> Result<Vec<RowId>, QueryError> {
2579        match input {
2580            PlanNode::SeqScan { table: t } if t == table => {
2581                // "Update/delete everything" — rare but legal.
2582                let rids: Vec<RowId> = self
2583                    .catalog
2584                    .scan(table)
2585                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2586                    .map(|(rid, _)| rid)
2587                    .collect();
2588                Ok(rids)
2589            }
2590            PlanNode::IndexScan {
2591                table: t,
2592                column,
2593                key,
2594            } if t == table => {
2595                let key_value = literal_to_value(key)?;
2596
2597                // Indexed case: single lookup, 0 or 1 rows.
2598                // Mission D7: int-specialized fast path on int-keyed indexes
2599                // (primary keys, created_at, etc.) — the common case for
2600                // `update_by_pk` / `delete where id = ?`.
2601                //
2602                // Scope the `tbl` borrow so it's released before we fall
2603                // through to the scan-based paths below (which reborrow
2604                // `self.catalog`).
2605                {
2606                    let tbl = self
2607                        .catalog
2608                        .get_table(table)
2609                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2610                    if let Some(btree) = tbl.index(column) {
2611                        let hit = match &key_value {
2612                            Value::Int(k) => btree.lookup_int(*k),
2613                            other => btree.lookup(other),
2614                        };
2615                        return Ok(match hit {
2616                            Some(rid) => vec![rid],
2617                            None => Vec::new(),
2618                        });
2619                    }
2620                }
2621
2622                // No index: the planner folds `.col = literal` to IndexScan
2623                // regardless of whether the column is actually unique. When
2624                // there's no index we must behave like Filter(SeqScan) and
2625                // return *all* matching RIDs — not just the first one.
2626                let schema = self
2627                    .catalog
2628                    .schema(table)
2629                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2630                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2631                let fast = FastLayout::new(schema);
2632                let synth = Expr::BinaryOp(
2633                    Box::new(Expr::Field(column.clone())),
2634                    BinOp::Eq,
2635                    Box::new(key.clone()),
2636                );
2637                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2638                    // Mission F: skip the first 4 Vec doublings.
2639                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2640                    self.catalog
2641                        .for_each_row_raw(table, |rid, data| {
2642                            if compiled(data) {
2643                                rids.push(rid);
2644                            }
2645                        })
2646                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2647                    return Ok(rids);
2648                }
2649
2650                // Fallback: decode each row, compare values.
2651                let col_idx =
2652                    schema
2653                        .column_index(column)
2654                        .ok_or_else(|| QueryError::ColumnNotFound {
2655                            table: String::new(),
2656                            column: column.clone(),
2657                        })?;
2658                let rids: Vec<RowId> = self
2659                    .catalog
2660                    .scan(table)
2661                    .map_err(|e| QueryError::StorageError(e.to_string()))?
2662                    .filter_map(|(rid, row)| {
2663                        if row[col_idx] == key_value {
2664                            Some(rid)
2665                        } else {
2666                            None
2667                        }
2668                    })
2669                    .collect();
2670                Ok(rids)
2671            }
2672            PlanNode::Filter {
2673                input: inner,
2674                predicate,
2675            } => {
2676                if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2677                    if t != table {
2678                        return self.generic_rid_match(input, table);
2679                    }
2680                    let schema = self
2681                        .catalog
2682                        .schema(table)
2683                        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2684                    let columns: Vec<String> =
2685                        schema.columns.iter().map(|c| c.name.clone()).collect();
2686                    let fast = FastLayout::new(schema);
2687                    let row_layout = RowLayout::new(schema);
2688
2689                    // Try compiled predicate first.
2690                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2691                        // Mission F: skip the first 4 Vec doublings.
2692                        let mut rids: Vec<RowId> = Vec::with_capacity(64);
2693                        self.catalog
2694                            .for_each_row_raw(table, |rid, data| {
2695                                if compiled(data) {
2696                                    rids.push(rid);
2697                                }
2698                            })
2699                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
2700                        return Ok(rids);
2701                    }
2702
2703                    // Fallback: selective decode + eval.
2704                    let pred_cols = predicate_column_indices(predicate, &columns);
2705                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
2706                    self.catalog
2707                        .for_each_row_raw(table, |rid, data| {
2708                            let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2709                            if eval_predicate(predicate, &pred_row, &columns) {
2710                                rids.push(rid);
2711                            }
2712                        })
2713                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
2714                    return Ok(rids);
2715                }
2716                self.generic_rid_match(input, table)
2717            }
2718            _ => self.generic_rid_match(input, table),
2719        }
2720    }
2721
2722    /// Last-ditch generic match: execute the plan, collect matching rows,
2723    /// then find corresponding RowIds by value equality. This is the old
2724    /// O(N*M) code path; only used when the plan shape is something exotic.
2725    fn generic_rid_match(
2726        &mut self,
2727        input: &PlanNode,
2728        table: &str,
2729    ) -> Result<Vec<RowId>, QueryError> {
2730        let result = self.execute_plan(input)?;
2731        let rows = match result {
2732            QueryResult::Rows { rows, .. } => rows,
2733            _ => return Err("mutation source must be rows".into()),
2734        };
2735        let matching: Vec<RowId> = self
2736            .catalog
2737            .scan(table)
2738            .map_err(|e| QueryError::StorageError(e.to_string()))?
2739            .filter(|(_, row)| rows.iter().any(|r| r == row))
2740            .map(|(rid, _)| rid)
2741            .collect();
2742        Ok(matching)
2743    }
2744}
2745
2746pub(super) fn execute_window(
2747    result: QueryResult,
2748    windows: &[WindowDef],
2749) -> Result<QueryResult, QueryError> {
2750    let (mut columns, mut rows) = match result {
2751        QueryResult::Rows { columns, rows } => (columns, rows),
2752        _ => return Err("window function requires row input".into()),
2753    };
2754
2755    for wdef in windows {
2756        // Resolve partition/order column indices against current columns.
2757        let part_indices: Vec<usize> = wdef
2758            .partition_by
2759            .iter()
2760            .map(|name| {
2761                columns
2762                    .iter()
2763                    .position(|c| c == name)
2764                    .ok_or_else(|| format!("window partition column '{name}' not found"))
2765            })
2766            .collect::<Result<Vec<_>, _>>()?;
2767
2768        let ord_indices: Vec<(usize, bool)> = wdef
2769            .order_by
2770            .iter()
2771            .map(|sk| {
2772                columns
2773                    .iter()
2774                    .position(|c| c == &sk.field)
2775                    .map(|i| (i, sk.descending))
2776                    .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2777            })
2778            .collect::<Result<Vec<_>, _>>()?;
2779
2780        // Resolve the argument column index (for aggregate windows).
2781        let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2782            match arg {
2783                Expr::Field(name) => {
2784                    if name == "*" {
2785                        None // count(*) style — no specific column
2786                    } else {
2787                        Some(
2788                            columns
2789                                .iter()
2790                                .position(|c| c == name)
2791                                .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2792                        )
2793                    }
2794                }
2795                _ => None,
2796            }
2797        } else {
2798            None
2799        };
2800
2801        // Build a sort-index to sort rows by partition_by then order_by
2802        // without actually reordering the original Vec (we need original
2803        // order to write results back).
2804        let n = rows.len();
2805        let mut indices: Vec<usize> = (0..n).collect();
2806        indices.sort_by(|&a, &b| {
2807            // Compare partition keys first.
2808            for &pi in &part_indices {
2809                let cmp = rows[a][pi].cmp(&rows[b][pi]);
2810                if cmp != std::cmp::Ordering::Equal {
2811                    return cmp;
2812                }
2813            }
2814            // Then order keys.
2815            for &(oi, desc) in &ord_indices {
2816                let cmp = rows[a][oi].cmp(&rows[b][oi]);
2817                if cmp != std::cmp::Ordering::Equal {
2818                    return if desc { cmp.reverse() } else { cmp };
2819                }
2820            }
2821            std::cmp::Ordering::Equal
2822        });
2823
2824        // Compute window values in sorted order, tracking partition boundaries.
2825        let mut win_values: Vec<Value> = vec![Value::Empty; n];
2826        let mut partition_start = 0usize;
2827        // Running state for aggregate windows:
2828        let mut running_count: i64 = 0;
2829        let mut running_int_sum: i64 = 0;
2830        let mut running_float_sum: f64 = 0.0;
2831        let mut running_saw_float = false;
2832        let mut running_min: Option<Value> = None;
2833        let mut running_max: Option<Value> = None;
2834        let mut rank_counter: i64 = 0;
2835        let mut dense_rank_counter: i64 = 0;
2836        let mut prev_order_key: Option<Vec<Value>> = None;
2837        let mut same_rank_count: i64 = 0;
2838
2839        for sorted_pos in 0..n {
2840            let row_idx = indices[sorted_pos];
2841
2842            // Detect partition boundary.
2843            let new_partition = if sorted_pos == 0 {
2844                true
2845            } else {
2846                let prev_row_idx = indices[sorted_pos - 1];
2847                part_indices
2848                    .iter()
2849                    .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
2850            };
2851
2852            if new_partition {
2853                partition_start = sorted_pos;
2854                running_count = 0;
2855                running_int_sum = 0;
2856                running_float_sum = 0.0;
2857                running_saw_float = false;
2858                running_min = None;
2859                running_max = None;
2860                rank_counter = 0;
2861                dense_rank_counter = 0;
2862                prev_order_key = None;
2863                same_rank_count = 0;
2864            }
2865
2866            // Extract current order key for rank tracking.
2867            let current_order_key: Vec<Value> = ord_indices
2868                .iter()
2869                .map(|&(oi, _)| rows[row_idx][oi].clone())
2870                .collect();
2871            let same_as_prev = prev_order_key.as_ref() == Some(&current_order_key);
2872
2873            let value = match wdef.function {
2874                WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
2875                WindowFunc::Rank => {
2876                    if same_as_prev {
2877                        same_rank_count += 1;
2878                    } else {
2879                        rank_counter += same_rank_count + 1;
2880                        same_rank_count = 0;
2881                        if rank_counter == 0 {
2882                            rank_counter = 1;
2883                        }
2884                    }
2885                    Value::Int(rank_counter)
2886                }
2887                WindowFunc::DenseRank => {
2888                    if !same_as_prev {
2889                        dense_rank_counter += 1;
2890                    }
2891                    Value::Int(dense_rank_counter)
2892                }
2893                WindowFunc::Sum => {
2894                    if let Some(ci) = arg_col_idx {
2895                        match &rows[row_idx][ci] {
2896                            Value::Int(v) => running_int_sum += v,
2897                            Value::Float(v) => {
2898                                running_float_sum += v;
2899                                running_saw_float = true;
2900                            }
2901                            _ => {}
2902                        }
2903                    }
2904                    if running_saw_float {
2905                        Value::Float(running_float_sum + running_int_sum as f64)
2906                    } else {
2907                        Value::Int(running_int_sum)
2908                    }
2909                }
2910                WindowFunc::Avg => {
2911                    if let Some(ci) = arg_col_idx {
2912                        match &rows[row_idx][ci] {
2913                            Value::Int(v) => {
2914                                running_float_sum += *v as f64;
2915                                running_count += 1;
2916                            }
2917                            Value::Float(v) => {
2918                                running_float_sum += v;
2919                                running_count += 1;
2920                            }
2921                            _ => {}
2922                        }
2923                    }
2924                    if running_count == 0 {
2925                        Value::Empty
2926                    } else {
2927                        Value::Float(running_float_sum / running_count as f64)
2928                    }
2929                }
2930                WindowFunc::Count => {
2931                    if let Some(ci) = arg_col_idx {
2932                        if !rows[row_idx][ci].is_empty() {
2933                            running_count += 1;
2934                        }
2935                    } else {
2936                        // count(*) — count all rows
2937                        running_count += 1;
2938                    }
2939                    Value::Int(running_count)
2940                }
2941                WindowFunc::Min => {
2942                    if let Some(ci) = arg_col_idx {
2943                        let v = &rows[row_idx][ci];
2944                        if !v.is_empty() {
2945                            running_min = Some(match &running_min {
2946                                None => v.clone(),
2947                                Some(cur) => {
2948                                    if v < cur {
2949                                        v.clone()
2950                                    } else {
2951                                        cur.clone()
2952                                    }
2953                                }
2954                            });
2955                        }
2956                    }
2957                    running_min.clone().unwrap_or(Value::Empty)
2958                }
2959                WindowFunc::Max => {
2960                    if let Some(ci) = arg_col_idx {
2961                        let v = &rows[row_idx][ci];
2962                        if !v.is_empty() {
2963                            running_max = Some(match &running_max {
2964                                None => v.clone(),
2965                                Some(cur) => {
2966                                    if v > cur {
2967                                        v.clone()
2968                                    } else {
2969                                        cur.clone()
2970                                    }
2971                                }
2972                            });
2973                        }
2974                    }
2975                    running_max.clone().unwrap_or(Value::Empty)
2976                }
2977            };
2978
2979            prev_order_key = Some(current_order_key);
2980            win_values[row_idx] = value;
2981        }
2982
2983        // Append the computed window column to each row.
2984        for (ri, row) in rows.iter_mut().enumerate() {
2985            row.push(win_values[ri].clone());
2986        }
2987        columns.push(wdef.output_name.clone());
2988    }
2989
2990    Ok(QueryResult::Rows { columns, rows })
2991}
2992
2993/// Mission E2b: compute one aggregate over a set of rows in a group.
2994pub(super) fn compute_group_aggregate(
2995    func: AggFunc,
2996    all_rows: &[Vec<Value>],
2997    row_indices: &[usize],
2998    col_idx: usize,
2999) -> Value {
3000    match func {
3001        AggFunc::Count => {
3002            if col_idx == usize::MAX {
3003                // count(*) — count all rows in the group.
3004                return Value::Int(row_indices.len() as i64);
3005            }
3006            let count = row_indices
3007                .iter()
3008                .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3009                .count();
3010            Value::Int(count as i64)
3011        }
3012        AggFunc::CountDistinct => {
3013            let mut seen = std::collections::HashSet::new();
3014            for &ri in row_indices {
3015                let v = &all_rows[ri][col_idx];
3016                if !v.is_empty() {
3017                    seen.insert(v.clone());
3018                }
3019            }
3020            Value::Int(seen.len() as i64)
3021        }
3022        AggFunc::Sum => {
3023            // Mirror the scalar Sum path: accumulate int and float
3024            // contributions separately and promote the final result to
3025            // Float if any Float row was observed. Prevents silent
3026            // drop of Float columns in GROUP BY aggregates.
3027            let mut int_sum: i64 = 0;
3028            let mut float_sum: f64 = 0.0;
3029            let mut saw_float = false;
3030            for &ri in row_indices {
3031                match &all_rows[ri][col_idx] {
3032                    Value::Int(v) => int_sum += v,
3033                    Value::Float(v) => {
3034                        float_sum += *v;
3035                        saw_float = true;
3036                    }
3037                    _ => {}
3038                }
3039            }
3040            if saw_float {
3041                Value::Float(float_sum + int_sum as f64)
3042            } else {
3043                Value::Int(int_sum)
3044            }
3045        }
3046        AggFunc::Avg => {
3047            let mut sum = 0.0f64;
3048            let mut count = 0usize;
3049            for &ri in row_indices {
3050                match &all_rows[ri][col_idx] {
3051                    Value::Int(v) => {
3052                        sum += *v as f64;
3053                        count += 1;
3054                    }
3055                    Value::Float(v) => {
3056                        sum += *v;
3057                        count += 1;
3058                    }
3059                    _ => {}
3060                }
3061            }
3062            if count == 0 {
3063                Value::Empty
3064            } else {
3065                Value::Float(sum / count as f64)
3066            }
3067        }
3068        AggFunc::Min => row_indices
3069            .iter()
3070            .map(|&ri| &all_rows[ri][col_idx])
3071            .filter(|v| !v.is_empty())
3072            .min()
3073            .cloned()
3074            .unwrap_or(Value::Empty),
3075        AggFunc::Max => row_indices
3076            .iter()
3077            .map(|&ri| &all_rows[ri][col_idx])
3078            .filter(|v| !v.is_empty())
3079            .max()
3080            .cloned()
3081            .unwrap_or(Value::Empty),
3082    }
3083}
3084
3085/// Mission E1.3: try to extract equi-join key indices from a join `on`
3086/// predicate. Returns `Some((left_col_idx, right_col_idx))` when the
3087/// predicate is exactly `L = R` (or `R = L`) and both sides resolve
3088/// cleanly — `L` to the left subtree's column list and `R` to the right
3089/// subtree's column list.
3090///
3091/// This is deliberately narrow. We only recognise the two shapes:
3092///   * `QualifiedField = QualifiedField`  (`u.id = o.user_id`)
3093///   * `Field = Field`                    (`.id = .user_id`, unqualified)
3094///
3095/// Anything else — conjunctions, constants, function calls, or predicates
3096/// that touch the same side on both halves — falls through to the
3097/// nested-loop path unchanged.
3098pub(super) fn try_extract_equi_join_keys(
3099    pred: &Expr,
3100    left_columns: &[String],
3101    right_columns: &[String],
3102) -> Option<(usize, usize)> {
3103    let (lhs, op, rhs) = match pred {
3104        Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3105        _ => return None,
3106    };
3107    if op != BinOp::Eq {
3108        return None;
3109    }
3110    // Normal orientation: lhs in left, rhs in right.
3111    if let (Some(li), Some(ri)) = (
3112        resolve_side_column(lhs, left_columns),
3113        resolve_side_column(rhs, right_columns),
3114    ) {
3115        return Some((li, ri));
3116    }
3117    // Swapped: rhs in left, lhs in right. Both sides of `=` are
3118    // commutative so this is safe.
3119    if let (Some(li), Some(ri)) = (
3120        resolve_side_column(rhs, left_columns),
3121        resolve_side_column(lhs, right_columns),
3122    ) {
3123        return Some((li, ri));
3124    }
3125    None
3126}
3127
3128fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3129    match expr {
3130        Expr::QualifiedField { qualifier, field } => {
3131            // Byte-level match so we don't allocate a fresh `format!` on
3132            // every call — this runs once per plan, so allocation would be
3133            // cheap, but the match is trivial enough to keep inline with
3134            // the eval_expr version.
3135            let q = qualifier.as_bytes();
3136            let f = field.as_bytes();
3137            columns.iter().position(|c| {
3138                let b = c.as_bytes();
3139                b.len() == q.len() + 1 + f.len()
3140                    && b[..q.len()] == *q
3141                    && b[q.len()] == b'.'
3142                    && b[q.len() + 1..] == *f
3143            })
3144        }
3145        Expr::Field(name) => columns.iter().position(|c| c == name),
3146        _ => None,
3147    }
3148}
3149
3150/// Mission E1.3: O(L + R) hash join. Builds a `FxHashMap<Value, Vec<usize>>`
3151/// over the right (inner) side's join keys, then streams the left (outer)
3152/// side and for each probe row emits every combined row whose right-side
3153/// key matches. For `JoinKind::LeftOuter`, unmatched left rows are emitted
3154/// padded with `Value::Empty` on the right side.
3155///
3156/// The right side is always the build side. That choice is forced for
3157/// LeftOuter (the left side must stream so we can detect orphans), and
3158/// for Inner it's a reasonable default — left-deep plans tend to grow the
3159/// left side with each join, so the un-joined right leaf is often the
3160/// smaller of the two at each level.
3161pub(super) fn hash_join(
3162    left_columns: Vec<String>,
3163    left_rows: Vec<Vec<Value>>,
3164    right_columns: Vec<String>,
3165    right_rows: Vec<Vec<Value>>,
3166    left_key_idx: usize,
3167    right_key_idx: usize,
3168    kind: JoinKind,
3169) -> QueryResult {
3170    use rustc_hash::FxHashMap;
3171
3172    let n_left = left_columns.len();
3173    let n_right = right_columns.len();
3174    let mut columns = Vec::with_capacity(n_left + n_right);
3175    columns.extend(left_columns);
3176    columns.extend(right_columns);
3177
3178    // Build: right_key -> list of right-row indices. Pre-size to the row
3179    // count so the map doesn't rehash mid-build.
3180    let mut build: FxHashMap<Value, Vec<usize>> =
3181        FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3182    for (i, row) in right_rows.iter().enumerate() {
3183        // Skip Empty keys on the build side — they can never match under
3184        // SQL semantics (NULL ≠ NULL) and would collapse all nullables to
3185        // one bucket.
3186        if matches!(row[right_key_idx], Value::Empty) {
3187            continue;
3188        }
3189        build.entry(row[right_key_idx].clone()).or_default().push(i);
3190    }
3191
3192    // Reasonable starting capacity — inner joins produce ≥ left_rows.len()
3193    // rows in the common 1:1 case, left-outer always emits ≥ left_rows.len().
3194    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3195
3196    for left_row in &left_rows {
3197        let key = &left_row[left_key_idx];
3198        let matched = if matches!(key, Value::Empty) {
3199            None
3200        } else {
3201            build.get(key)
3202        };
3203        match matched {
3204            Some(matches) if !matches.is_empty() => {
3205                for &ri in matches {
3206                    let right_row = &right_rows[ri];
3207                    let mut combined = Vec::with_capacity(n_left + n_right);
3208                    combined.extend_from_slice(left_row);
3209                    combined.extend_from_slice(right_row);
3210                    rows.push(combined);
3211                }
3212            }
3213            _ => {
3214                if matches!(kind, JoinKind::LeftOuter) {
3215                    let mut row = Vec::with_capacity(n_left + n_right);
3216                    row.extend_from_slice(left_row);
3217                    row.resize(n_left + n_right, Value::Empty);
3218                    rows.push(row);
3219                }
3220            }
3221        }
3222    }
3223
3224    QueryResult::Rows { columns, rows }
3225}
3226
3227/// Lower unindexed `RangeScan` nodes to `Filter(SeqScan)` so that all
3228/// downstream fast paths (count, project+limit, sort+limit, agg, update,
3229/// delete) continue to fire.
3230///
3231/// The planner emits `RangeScan` speculatively for every range inequality
3232/// (`.age > 30`) because it has no catalog access. When the column has a
3233/// B-tree index, `RangeScan` is the correct plan. When it doesn't, the
3234/// executor's `RangeScan` fallback materialises every matching row with
3235/// full `decode_row` — bypassing the compiled-predicate fast paths that
3236/// `Filter(SeqScan)` would trigger.
3237///
3238/// This pass runs once per query, before execution.
3239pub(super) fn lower_unindexed_range_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3240    match plan {
3241        PlanNode::RangeScan {
3242            table,
3243            column,
3244            start,
3245            end,
3246        } => {
3247            if let Some(tbl) = catalog.get_table(table) {
3248                if tbl.index(column).is_some() {
3249                    return plan.clone();
3250                }
3251            }
3252            let pred = synthesize_range_predicate(column, start, end);
3253            PlanNode::Filter {
3254                input: Box::new(PlanNode::SeqScan {
3255                    table: table.clone(),
3256                }),
3257                predicate: pred,
3258            }
3259        }
3260        PlanNode::Filter { input, predicate } => PlanNode::Filter {
3261            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3262            predicate: predicate.clone(),
3263        },
3264        PlanNode::Project { input, fields } => PlanNode::Project {
3265            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3266            fields: fields.clone(),
3267        },
3268        PlanNode::Sort { input, keys } => PlanNode::Sort {
3269            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3270            keys: keys.clone(),
3271        },
3272        PlanNode::Limit { input, count } => PlanNode::Limit {
3273            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3274            count: count.clone(),
3275        },
3276        PlanNode::Offset { input, count } => PlanNode::Offset {
3277            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3278            count: count.clone(),
3279        },
3280        PlanNode::Aggregate {
3281            input,
3282            function,
3283            field,
3284        } => PlanNode::Aggregate {
3285            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3286            function: *function,
3287            field: field.clone(),
3288        },
3289        PlanNode::Distinct { input } => PlanNode::Distinct {
3290            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3291        },
3292        PlanNode::GroupBy {
3293            input,
3294            keys,
3295            aggregates,
3296            having,
3297        } => PlanNode::GroupBy {
3298            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3299            keys: keys.clone(),
3300            aggregates: aggregates.clone(),
3301            having: having.clone(),
3302        },
3303        PlanNode::Update {
3304            input,
3305            table,
3306            assignments,
3307        } => PlanNode::Update {
3308            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3309            table: table.clone(),
3310            assignments: assignments.clone(),
3311        },
3312        PlanNode::Delete { input, table } => PlanNode::Delete {
3313            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3314            table: table.clone(),
3315        },
3316        PlanNode::Window { input, windows } => PlanNode::Window {
3317            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3318            windows: windows.clone(),
3319        },
3320        PlanNode::Union { left, right, all } => PlanNode::Union {
3321            left: Box::new(lower_unindexed_range_scans(catalog, left)),
3322            right: Box::new(lower_unindexed_range_scans(catalog, right)),
3323            all: *all,
3324        },
3325        PlanNode::Explain { input } => PlanNode::Explain {
3326            input: Box::new(lower_unindexed_range_scans(catalog, input)),
3327        },
3328        PlanNode::NestedLoopJoin {
3329            left,
3330            right,
3331            on,
3332            kind,
3333        } => PlanNode::NestedLoopJoin {
3334            left: Box::new(lower_unindexed_range_scans(catalog, left)),
3335            right: Box::new(lower_unindexed_range_scans(catalog, right)),
3336            on: on.clone(),
3337            kind: *kind,
3338        },
3339        // Leaf nodes: no children to recurse into.
3340        _ => plan.clone(),
3341    }
3342}
3343
3344/// Synthesize a range predicate from RangeScan bounds for the fallback path.
3345pub(super) fn synthesize_range_predicate(
3346    column: &str,
3347    start: &Option<(Expr, bool)>,
3348    end: &Option<(Expr, bool)>,
3349) -> Expr {
3350    let lower = start.as_ref().map(|(expr, inclusive)| {
3351        let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3352        Expr::BinaryOp(
3353            Box::new(Expr::Field(column.to_string())),
3354            op,
3355            Box::new(expr.clone()),
3356        )
3357    });
3358    let upper = end.as_ref().map(|(expr, inclusive)| {
3359        let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3360        Expr::BinaryOp(
3361            Box::new(Expr::Field(column.to_string())),
3362            op,
3363            Box::new(expr.clone()),
3364        )
3365    });
3366    match (lower, upper) {
3367        (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3368        (Some(l), None) => l,
3369        (None, Some(u)) => u,
3370        (None, None) => Expr::Literal(Literal::Bool(true)),
3371    }
3372}
3373
3374/// Check if a value falls within a range (used in last-resort decoded-row eval).
3375pub(super) fn range_matches(
3376    val: &Value,
3377    start: &Option<Value>,
3378    start_inc: bool,
3379    end: &Option<Value>,
3380    end_inc: bool,
3381) -> bool {
3382    if let Some(ref s) = start {
3383        if start_inc {
3384            if val < s {
3385                return false;
3386            }
3387        } else if val <= s {
3388            return false;
3389        }
3390    }
3391    if let Some(ref e) = end {
3392        if end_inc {
3393            if val > e {
3394                return false;
3395            }
3396        } else if val >= e {
3397            return false;
3398        }
3399    }
3400    true
3401}
3402
3403/// Format a `PlanNode` tree as a human-readable, indented text
3404/// representation. Used by the `EXPLAIN` command.
3405pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3406    let indent = "  ".repeat(depth);
3407    match plan {
3408        PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3409        PlanNode::AliasScan { table, alias } => {
3410            format!("{indent}AliasScan table={table} alias={alias}")
3411        }
3412        PlanNode::IndexScan { table, column, key } => {
3413            format!("{indent}IndexScan table={table} column={column} key={key:?}")
3414        }
3415        PlanNode::RangeScan {
3416            table,
3417            column,
3418            start,
3419            end,
3420        } => {
3421            let s = match start {
3422                Some((expr, inc)) => {
3423                    let op = if *inc { ">=" } else { ">" };
3424                    format!("{op}{expr:?}")
3425                }
3426                None => "unbounded".to_string(),
3427            };
3428            let e = match end {
3429                Some((expr, inc)) => {
3430                    let op = if *inc { "<=" } else { "<" };
3431                    format!("{op}{expr:?}")
3432                }
3433                None => "unbounded".to_string(),
3434            };
3435            format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3436        }
3437        PlanNode::Filter { input, predicate } => {
3438            let child = format_plan_tree(input, depth + 1);
3439            format!("{indent}Filter predicate={predicate:?}\n{child}")
3440        }
3441        PlanNode::Project { input, fields } => {
3442            let names: Vec<String> = fields
3443                .iter()
3444                .map(|f| match &f.alias {
3445                    Some(a) => format!("{a}: {:?}", f.expr),
3446                    None => format!("{:?}", f.expr),
3447                })
3448                .collect();
3449            let child = format_plan_tree(input, depth + 1);
3450            format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3451        }
3452        PlanNode::Sort { input, keys } => {
3453            let ks: Vec<String> = keys
3454                .iter()
3455                .map(|k| {
3456                    if k.descending {
3457                        format!("{} desc", k.field)
3458                    } else {
3459                        k.field.clone()
3460                    }
3461                })
3462                .collect();
3463            let child = format_plan_tree(input, depth + 1);
3464            format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3465        }
3466        PlanNode::Limit { input, count } => {
3467            let child = format_plan_tree(input, depth + 1);
3468            format!("{indent}Limit count={count:?}\n{child}")
3469        }
3470        PlanNode::Offset { input, count } => {
3471            let child = format_plan_tree(input, depth + 1);
3472            format!("{indent}Offset count={count:?}\n{child}")
3473        }
3474        PlanNode::Aggregate {
3475            input,
3476            function,
3477            field,
3478        } => {
3479            let f = field.as_deref().unwrap_or("*");
3480            let child = format_plan_tree(input, depth + 1);
3481            format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3482        }
3483        PlanNode::NestedLoopJoin {
3484            left,
3485            right,
3486            on,
3487            kind,
3488        } => {
3489            let left_child = format_plan_tree(left, depth + 1);
3490            let right_child = format_plan_tree(right, depth + 1);
3491            let on_str = match on {
3492                Some(pred) => format!("{pred:?}"),
3493                None => "none".to_string(),
3494            };
3495            format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3496        }
3497        PlanNode::Distinct { input } => {
3498            let child = format_plan_tree(input, depth + 1);
3499            format!("{indent}Distinct\n{child}")
3500        }
3501        PlanNode::GroupBy {
3502            input,
3503            keys,
3504            aggregates,
3505            having,
3506        } => {
3507            let agg_strs: Vec<String> = aggregates
3508                .iter()
3509                .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3510                .collect();
3511            let having_str = match having {
3512                Some(h) => format!(" having={h:?}"),
3513                None => String::new(),
3514            };
3515            let child = format_plan_tree(input, depth + 1);
3516            format!(
3517                "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3518                keys.join(", "),
3519                agg_strs.join(", "),
3520            )
3521        }
3522        PlanNode::Insert { table, assignments } => {
3523            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3524            format!("{indent}Insert table={table} cols=[{}]", cols.join(", "))
3525        }
3526        PlanNode::Upsert {
3527            table,
3528            key_column,
3529            assignments,
3530            on_conflict,
3531        } => {
3532            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3533            let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3534            if conflict_cols.is_empty() {
3535                format!(
3536                    "{indent}Upsert table={table} key={key_column} cols=[{}]",
3537                    cols.join(", ")
3538                )
3539            } else {
3540                format!(
3541                    "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3542                    cols.join(", "),
3543                    conflict_cols.join(", ")
3544                )
3545            }
3546        }
3547        PlanNode::Update {
3548            input,
3549            table,
3550            assignments,
3551        } => {
3552            let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3553            let child = format_plan_tree(input, depth + 1);
3554            format!(
3555                "{indent}Update table={table} set=[{}]\n{child}",
3556                cols.join(", ")
3557            )
3558        }
3559        PlanNode::Delete { input, table } => {
3560            let child = format_plan_tree(input, depth + 1);
3561            format!("{indent}Delete table={table}\n{child}")
3562        }
3563        PlanNode::CreateTable { name, fields } => {
3564            let fs: Vec<String> = fields
3565                .iter()
3566                .map(|(n, t, r)| {
3567                    if *r {
3568                        format!("{n}: {t} required")
3569                    } else {
3570                        format!("{n}: {t}")
3571                    }
3572                })
3573                .collect();
3574            format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3575        }
3576        PlanNode::AlterTable { table, action } => {
3577            format!("{indent}AlterTable table={table} action={action:?}")
3578        }
3579        PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3580        PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3581        PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3582        PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3583        PlanNode::Window { input, windows } => {
3584            let ws: Vec<String> = windows
3585                .iter()
3586                .map(|w| format!("{:?} as {}", w.function, w.output_name))
3587                .collect();
3588            let child = format_plan_tree(input, depth + 1);
3589            format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3590        }
3591        PlanNode::Union { left, right, all } => {
3592            let kind = if *all { "UNION ALL" } else { "UNION" };
3593            let left_child = format_plan_tree(left, depth + 1);
3594            let right_child = format_plan_tree(right, depth + 1);
3595            format!("{indent}{kind}\n{left_child}\n{right_child}")
3596        }
3597        PlanNode::Explain { input } => {
3598            let child = format_plan_tree(input, depth + 1);
3599            format!("{indent}Explain\n{child}")
3600        }
3601    }
3602}