Skip to main content

powdb_query/executor/
prepared.rs

1//! PreparedQuery struct and related Engine methods.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::planner;
6use crate::result::{QueryError, QueryResult};
7use powdb_storage::catalog::Catalog;
8use powdb_storage::types::*;
9
10use super::compiled::*;
11use super::eval::*;
12use super::Engine;
13
14pub struct PreparedQuery {
15    plan_template: PlanNode,
16    /// Total number of `Expr::Literal` slots reachable from the plan.
17    /// Callers must supply exactly this many literals per execution.
18    pub param_count: usize,
19    /// Fast-path metadata for `PlanNode::Insert`. `Some` when:
20    ///   * the template is an Insert, and
21    ///   * every assignment RHS is `Expr::Literal(_)` (no computed exprs),
22    ///     which means param_count == assignments.len() and the caller's
23    ///     literal slice maps 1:1 to schema column indices.
24    ///
25    /// Mission C Phase 15: upgraded from a bare `Vec<usize>` to a
26    /// dedicated [`InsertFast`] struct so the execute path can skip the
27    /// second `catalog.schema(table)` HashMap lookup just to read
28    /// `n_cols`, and can dispatch through `get_table_mut` + `tbl.insert`
29    /// instead of going via the generic `catalog.insert` wrapper.
30    insert_fast: Option<InsertFast>,
31    /// Mission C Phase 14: fast-path metadata for point updates by primary
32    /// key — `T filter .pk = <lit> update { col := <lit> }` where `pk` is
33    /// an indexed column and `col` is fixed-size and not indexed. At
34    /// execute time we skip plan clone, substitute walk, schema re-lookup,
35    /// `resolved_assignments` + `FastPatch` + `matching_rids` Vec allocs,
36    /// and the whole `PlanNode::Update` arm. Just a btree lookup and a
37    /// byte patch.
38    update_pk_fast: Option<UpdatePkFast>,
39}
40
41/// Mission C Phase 15: precomputed insert fast-path metadata. Built once
42/// in [`Engine::prepare`] from a `PlanNode::Insert` template whose every
43/// assignment RHS is a raw literal. The execute path reads `n_cols` and
44/// `col_indices` directly — no catalog schema lookup needed.
45#[derive(Clone)]
46struct InsertFast {
47    /// Mission C Phase 18: cached slot index into `Catalog::tables`.
48    /// Resolved once at `prepare` time and stable for the lifetime of
49    /// the catalog (PowDB has no DROP TABLE). Lets the hot path dispatch
50    /// through `catalog.table_by_slot_mut(slot)` — a pure Vec index,
51    /// no hash, no bucket walk, no string compare.
52    table_slot: usize,
53    /// Schema column index for each positional literal, in the order the
54    /// caller passes them.
55    col_indices: Vec<usize>,
56    /// Total number of schema columns — the size `insert_values_scratch`
57    /// must be resized to before filling positions via `col_indices`.
58    /// Cached here so the hot loop skips `catalog.schema(table)` entirely.
59    n_cols: usize,
60}
61
62/// Mission C Phase 14: precomputed fast-path for `update_by_pk` shaped
63/// prepared queries. Built once in [`Engine::prepare`] and reused on every
64/// `execute_prepared` call.
65#[derive(Clone)]
66struct UpdatePkFast {
67    /// Mission C Phase 18: cached slot index into `Catalog::tables`.
68    /// Resolved once at `prepare` time and stable for the lifetime of
69    /// the catalog. At a 52ns total budget the swap from FxHashMap
70    /// probe to a Vec index is measurable.
71    table_slot: usize,
72    /// Name of the key column (the `.id = ?` side). We look this up in
73    /// the owning table's `indexed_cols` at execute time rather than
74    /// caching a raw `&BTree` — the engine owns the catalog and can't
75    /// hand out long-lived borrows anyway, and the n≤5 linear scan is
76    /// a handful of ns.
77    key_col: String,
78    /// Byte offset of the target fixed column in the row encoding:
79    /// `2 + bitmap_size + layout.fixed_offsets[target_col]`.
80    field_off: usize,
81    /// Byte offset of the bitmap byte containing the target column's null
82    /// bit (`2 + target_col / 8`).
83    bitmap_byte_off: usize,
84    /// Bit mask for the target column's null bit.
85    bit_mask: u8,
86    /// Type of the target fixed column — drives the literal-to-bytes
87    /// encoding at execute time.
88    target_type: TypeId,
89    /// Index into the caller's `literals` slice that holds the filter key.
90    /// Always 0 today (filter literal is visited before the assignment
91    /// RHS), but stored explicitly so the contract is obvious.
92    key_literal_idx: usize,
93    /// Index into the caller's `literals` slice that holds the new value.
94    value_literal_idx: usize,
95}
96
97impl Engine {
98    pub fn prepare(&mut self, query: &str) -> Result<PreparedQuery, QueryError> {
99        let plan = planner::plan(query).map_err(|e| QueryError::Parse(e.to_string()))?;
100        let param_count = crate::plan_cache::count_literal_slots(&plan);
101
102        // Insert fast path: if the template is Insert and every assignment
103        // RHS is a literal, resolve column indices once here and store
104        // them. execute_prepared will skip the plan-clone + substitute
105        // walk on this path.
106        //
107        // Mission C Phase 15: also cache `n_cols` and the target table
108        // name so execute_prepared doesn't need a second HashMap lookup
109        // on `self.catalog.schema(table)` just to size the scratch Vec.
110        let insert_fast = match &plan {
111            // Single-row inserts only: the byte-level fast path patches one
112            // row's worth of scratch. Multi-row `insert T {..},{..}` falls
113            // through to the generic plan path (always correct).
114            PlanNode::Insert { table, rows }
115                if rows.len() == 1
116                    && rows[0].iter().all(|a| matches!(a.value, Expr::Literal(_)))
117                    && param_count == rows[0].len() =>
118            {
119                let assignments = &rows[0];
120                let table_slot = self
121                    .catalog
122                    .table_slot(table)
123                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
124                let schema = &self.catalog.table_by_slot(table_slot).schema;
125                let n_cols = schema.columns.len();
126                let indices: Result<Vec<usize>, QueryError> = assignments
127                    .iter()
128                    .map(|a| {
129                        schema
130                            .column_index(&a.field)
131                            .ok_or_else(|| QueryError::ColumnNotFound {
132                                table: table.clone(),
133                                column: a.field.clone(),
134                            })
135                    })
136                    .collect();
137                Some(InsertFast {
138                    table_slot,
139                    col_indices: indices?,
140                    n_cols,
141                })
142            }
143            _ => None,
144        };
145
146        // Mission C Phase 14: update-by-pk fast path. Match on the shape
147        // planner::plan_update builds for `T filter .pk = ? update
148        // { col := ? }` — `Update { input: IndexScan(pk), assignments:
149        // [{col, Literal}] }` — and only if every precondition holds:
150        //   * `pk` is an indexed column (so the executor would take the
151        //     btree.lookup path at run time regardless)
152        //   * there's exactly one assignment
153        //   * the assigned column is fixed-size and *not* indexed (so we
154        //     don't have to maintain any secondary index on write)
155        //   * both literal slots are already `Expr::Literal` (no computed
156        //     expressions)
157        // If any of these fail we fall through to the standard substitute
158        // + execute path.
159        let update_pk_fast = Self::try_build_update_pk_fast(&self.catalog, &plan);
160
161        Ok(PreparedQuery {
162            plan_template: plan,
163            param_count,
164            insert_fast,
165            update_pk_fast,
166        })
167    }
168
169    /// Mission C Phase 14: inspect a planned tree and, if it matches the
170    /// `update_by_pk` fast-path shape, return the precomputed byte-patch
171    /// metadata. Returns `None` on any mismatch — the caller falls through
172    /// to the substitute-and-execute path, which is always correct.
173    fn try_build_update_pk_fast(catalog: &Catalog, plan: &PlanNode) -> Option<UpdatePkFast> {
174        // Top level must be `Update { input: IndexScan(...), ... }`.
175        let (table, input, assignments) = match plan {
176            PlanNode::Update {
177                table,
178                input,
179                assignments,
180            } => (table, input.as_ref(), assignments),
181            _ => return None,
182        };
183        // Exactly one assignment — the bench hot path and the only case
184        // where a single byte-patch covers the whole mutation.
185        if assignments.len() != 1 {
186            return None;
187        }
188        let assn = &assignments[0];
189        // Assignment RHS must be a raw literal, not a computed expr.
190        if !matches!(assn.value, Expr::Literal(_)) {
191            return None;
192        }
193        // Input must be an IndexScan on the same table with a literal key.
194        let (key_col, key_table) = match input {
195            PlanNode::IndexScan {
196                table: t,
197                column,
198                key: Expr::Literal(_),
199            } => (column.clone(), t.clone()),
200            _ => return None,
201        };
202        if &key_table != table {
203            return None;
204        }
205
206        // Look up schema + index state from the live catalog, caching
207        // the slot so the execute path skips the name probe.
208        let table_slot = catalog.table_slot(table)?;
209        let tbl = catalog.table_by_slot(table_slot);
210        let schema = &tbl.schema;
211
212        // Key column must have an index (the btree.lookup path is what
213        // makes the fast path worth building).
214        if !tbl.has_index(&key_col) {
215            return None;
216        }
217
218        // Target column must exist, be fixed-size, and NOT be indexed (so
219        // we don't have to maintain any secondary index here).
220        let target_col_idx = schema.column_index(&assn.field)?;
221        let target_type = schema.columns[target_col_idx].type_id;
222        if !is_fixed_size(target_type) {
223            return None;
224        }
225        if tbl.has_indexed_col(target_col_idx) {
226            return None;
227        }
228
229        // Precompute byte offsets from the cached row layout.
230        let layout = tbl.row_layout();
231        let fixed_off = layout.fixed_offset(target_col_idx)?;
232        let bitmap_size = layout.bitmap_size();
233        let field_off = 2 + bitmap_size + fixed_off;
234        let bitmap_byte_off = 2 + target_col_idx / 8;
235        let bit_mask = 1u8 << (target_col_idx % 8);
236
237        // Literal walk order for `Update { IndexScan(key), [{value}] }`
238        // (see `plan_cache::substitute_plan` — input first, then the
239        // assignments). The filter key is literal 0, the assignment RHS
240        // is literal 1.
241        Some(UpdatePkFast {
242            table_slot,
243            key_col,
244            field_off,
245            bitmap_byte_off,
246            bit_mask,
247            target_type,
248            key_literal_idx: 0,
249            value_literal_idx: 1,
250        })
251    }
252
253    /// Execute a [`PreparedQuery`] with the given literal values.
254    ///
255    /// The literals are substituted into a clone of the template plan in
256    /// the same deterministic walk order that [`crate::canonicalize`]
257    /// produces (filter predicate first, then projection, then assignment
258    /// RHS, and so on). Substitution errors here mean the caller passed
259    /// the wrong number of literals for this query shape.
260    pub fn execute_prepared(
261        &mut self,
262        prep: &PreparedQuery,
263        literals: &[Literal],
264    ) -> Result<QueryResult, QueryError> {
265        if literals.len() != prep.param_count {
266            return Err(QueryError::Execution(format!(
267                "prepared query expects {} literal(s), got {}",
268                prep.param_count,
269                literals.len(),
270            )));
271        }
272
273        // Mission C Phase 14: update-by-pk fast path. Skip plan clone,
274        // substitute walk, resolved_assignments, FastPatch, Vec<RowId>,
275        // RowLayout::new — straight to btree.lookup_int + byte patch.
276        // On rare mismatches (wrong literal type, index dropped after
277        // prepare) the helper returns `Ok(None)` and we fall through to
278        // the generic substitute-and-execute path below.
279        if let Some(fast) = &prep.update_pk_fast {
280            if let Some(result) = self.try_execute_update_pk_fast(fast, literals)? {
281                // Mark dependent views dirty for prepared update fast path.
282                if let PlanNode::Update { table, .. } = &prep.plan_template {
283                    self.view_registry.mark_dependents_dirty(table);
284                }
285                // Mission B (post-review): statement-boundary WAL group
286                // commit. The fast path appended an Update record but did
287                // not flush — flush it now so the executor's contract is
288                // "WAL is on disk before this returns".
289                self.catalog
290                    .sync_wal()
291                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
292                return Ok(result);
293            }
294        }
295
296        // Insert fast path: skip plan-clone + substitute walk + PlanNode::Insert
297        // arm's column-index resolution. Build the Row directly from the
298        // caller's literal slice using indices we resolved at prepare time.
299        // Saves ~300-500ns per insert on the bench.
300        //
301        // Mission C Phase 13: the scratch `Vec<Value>` is reused across
302        // calls — no fresh allocation per insert. We split the borrow
303        // between `self.catalog` and `self.insert_values_scratch` by
304        // moving the scratch into a local, filling it, passing to the
305        // catalog, and putting it back.
306        //
307        // Mission C Phase 15: the cached `InsertFast` carries `n_cols`
308        // and the table name, so the hot path makes exactly one catalog
309        // HashMap lookup (`get_table_mut`) and dispatches straight into
310        // `tbl.insert` — no intermediate schema lookup, no generic
311        // `Catalog::insert` wrapper.
312        if let Some(fast) = &prep.insert_fast {
313            let mut values = std::mem::take(&mut self.insert_values_scratch);
314            values.clear();
315            values.resize(fast.n_cols, Value::Empty);
316            for (pos, lit) in literals.iter().enumerate() {
317                values[fast.col_indices[pos]] = literal_value_from(lit);
318            }
319            // Mission C Phase 18: direct O(1) slot index — no
320            // catalog hash probe. Slot was resolved at prepare time.
321            // Durability fix: route through the WAL-logging `insert_by_slot`
322            // (was the raw `Table::insert`, which bypassed the WAL and lost
323            // every prepared insert on a crash).
324            let res = self
325                .catalog
326                .insert_by_slot(fast.table_slot, &values)
327                .map_err(|e| e.to_string());
328            // Clear strings before returning the scratch — don't keep
329            // dangling allocations from the previous row alive across
330            // calls. `clear()` drops the Value::Str entries.
331            values.clear();
332            self.insert_values_scratch = values;
333            res?;
334            // Mark dependent views dirty for prepared insert fast path.
335            if let PlanNode::Insert { table, .. } = &prep.plan_template {
336                self.view_registry.mark_dependents_dirty(table);
337            }
338            // Mission B (post-review): statement-boundary WAL group commit.
339            self.catalog
340                .sync_wal()
341                .map_err(|e| QueryError::StorageError(e.to_string()))?;
342            return Ok(QueryResult::Modified(1));
343        }
344
345        let mut plan = prep.plan_template.clone();
346        let mut idx = 0usize;
347        crate::plan_cache::substitute_plan(&mut plan, literals, &mut idx);
348        debug_assert_eq!(idx, literals.len());
349        let result = self.execute_plan(&plan);
350        // Mission B (post-review): statement-boundary WAL group commit.
351        // No-op when nothing was buffered (read-only plans).
352        self.catalog
353            .sync_wal()
354            .map_err(|e| QueryError::StorageError(e.to_string()))?;
355        result
356    }
357
358    /// Mission C Phase 14: point-update fast path for prepared
359    /// `T filter .pk = ? update { col := ? }` queries. The caller has
360    /// already verified this is an int-indexed pk with a fixed-size,
361    /// non-indexed target column; all we do here is pluck the two
362    /// literals out of the caller's slice, run one `btree.lookup_int`,
363    /// and patch 1–8 bytes of the row. No plan clone, no allocations.
364    ///
365    /// Returns:
366    ///   * `Ok(Some(result))` — fast path took the mutation.
367    ///   * `Ok(None)` — can't take the fast path this call (wrong
368    ///     literal type, index dropped since prepare, etc.). Caller
369    ///     falls through to the generic substitute-and-execute path.
370    ///   * `Err(_)` — real error (table gone, I/O, etc.).
371    #[inline]
372    fn try_execute_update_pk_fast(
373        &mut self,
374        fast: &UpdatePkFast,
375        literals: &[Literal],
376    ) -> Result<Option<QueryResult>, QueryError> {
377        // 1) Extract the key literal. The fast path is only built for
378        //    int key columns; any other literal type means the caller
379        //    is violating the prepared-query contract or the schema
380        //    changed — either way, fall back.
381        let key_int = match &literals[fast.key_literal_idx] {
382            Literal::Int(v) => *v,
383            _ => return Ok(None),
384        };
385
386        // 2) Encode the new value as little-endian bytes matching the
387        //    target column's fixed encoding.
388        let bytes: FixedBytes = match (fast.target_type, &literals[fast.value_literal_idx]) {
389            (TypeId::Int, Literal::Int(v)) => FixedBytes::I64(v.to_le_bytes()),
390            (TypeId::DateTime, Literal::Int(v)) => FixedBytes::I64(v.to_le_bytes()),
391            (TypeId::Float, Literal::Float(v)) => FixedBytes::F64(v.to_le_bytes()),
392            (TypeId::Bool, Literal::Bool(v)) => FixedBytes::Bool(if *v { 1 } else { 0 }),
393            // Type mismatch — fall back to the generic path for a
394            // consistent error shape.
395            _ => return Ok(None),
396        };
397
398        // 3) Look up the table + btree, do the int lookup, patch the row
399        //    in place. Phase 18: table dispatch is a direct slot index;
400        //    the btree lookup is the linear scan over `indexed_cols`.
401        //    Single btree.lookup_int + one `with_row_bytes_mut` call.
402        //    No Vec allocations at all.
403        //
404        // Mission B2: route the in-place patch through the catalog's
405        // WAL-logged wrapper so crash recovery sees the update. The
406        // extra cost is one WAL append + fsync per query — the hot
407        // loop structure is unchanged.
408        let tbl = self.catalog.table_by_slot_mut(fast.table_slot);
409        let Some(btree) = tbl.index(&fast.key_col) else {
410            // Index dropped since prepare — bail to the generic path.
411            return Ok(None);
412        };
413        let Some(rid) = btree.lookup_int(key_int) else {
414            return Ok(Some(QueryResult::Modified(0)));
415        };
416
417        let fast_table_slot = fast.table_slot;
418        let bitmap_byte_off = fast.bitmap_byte_off;
419        let bit_mask = fast.bit_mask;
420        let field_off = fast.field_off;
421        let ok = self
422            .catalog
423            .update_row_bytes_logged_by_slot(fast_table_slot, rid, |row| {
424                // Idempotent null-bit clear — safe even when the column was
425                // already non-null (the overwhelmingly common case).
426                row[bitmap_byte_off] &= !bit_mask;
427                let field_bytes = bytes.as_slice();
428                row[field_off..field_off + field_bytes.len()].copy_from_slice(field_bytes);
429            })
430            .map_err(|e| QueryError::StorageError(e.to_string()))?;
431
432        Ok(Some(QueryResult::Modified(if ok { 1 } else { 0 })))
433    }
434
435    /// Mission C Phase 13: moving variant of [`Engine::execute_prepared`]
436    /// for the insert fast path. Takes `literals` by mutable reference
437    /// so that each `Literal::String` can be consumed via `mem::take`
438    /// instead of cloned into a `Value::Str`. On `insert_batch_1k` that
439    /// removes three per-row heap allocations (name, status, email),
440    /// bringing the workload over the line vs SQLite's amortized
441    /// prepare+execute loop.
442    ///
443    /// The caller's `Literal::String` entries are replaced with empty
444    /// strings on successful inserts — the `literals` slice is *not*
445    /// left in a valid-for-reuse state except for `Int`/`Float`/`Bool`
446    /// values. Non-insert templates fall through to the standard
447    /// substitute-and-execute path.
448    pub fn execute_prepared_take(
449        &mut self,
450        prep: &PreparedQuery,
451        literals: &mut [Literal],
452    ) -> Result<QueryResult, QueryError> {
453        if literals.len() != prep.param_count {
454            return Err(QueryError::Execution(format!(
455                "prepared query expects {} literal(s), got {}",
456                prep.param_count,
457                literals.len(),
458            )));
459        }
460
461        if let Some(fast) = &prep.insert_fast {
462            let mut values = std::mem::take(&mut self.insert_values_scratch);
463            values.clear();
464            values.resize(fast.n_cols, Value::Empty);
465            for (pos, lit) in literals.iter_mut().enumerate() {
466                values[fast.col_indices[pos]] = literal_value_take(lit);
467            }
468            // Mission C Phase 18: direct O(1) slot index — see
469            // `execute_prepared` for rationale. This is the hot path
470            // for `insert_batch_1k`. Durability fix: WAL-logging
471            // `insert_by_slot` (was the raw `Table::insert`).
472            let res = self
473                .catalog
474                .insert_by_slot(fast.table_slot, &values)
475                .map_err(|e| e.to_string());
476            values.clear();
477            self.insert_values_scratch = values;
478            res?;
479            // Mission B (post-review): statement-boundary WAL group commit.
480            self.catalog
481                .sync_wal()
482                .map_err(|e| QueryError::StorageError(e.to_string()))?;
483            return Ok(QueryResult::Modified(1));
484        }
485
486        // Non-insert templates — fall back to the standard path. We
487        // can't usefully move the literals because `substitute_plan`
488        // still expects an immutable slice, and the non-insert hot
489        // paths are dominated by plan walks anyway.
490        self.execute_prepared(prep, literals)
491    }
492
493    /// Walk an expression tree and replace every `InSubquery` node with
494    /// an `InList` by executing the subquery and collecting its first
495    /// column as literal values. This must be called before entering
496    /// the row-by-row scan loop because the scan closure can't call back
497    /// into the engine.
498    pub(super) fn materialize_subqueries(&mut self, expr: &Expr) -> Result<Expr, QueryError> {
499        match expr {
500            Expr::InSubquery {
501                expr: inner,
502                subquery,
503                negated,
504            } => {
505                if is_correlated_subquery(subquery, &self.catalog) {
506                    let inner = self.materialize_subqueries(inner)?;
507                    return Ok(Expr::InSubquery {
508                        expr: Box::new(inner),
509                        subquery: subquery.clone(),
510                        negated: *negated,
511                    });
512                }
513                let inner = self.materialize_subqueries(inner)?;
514                // Plan and execute the subquery.
515                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
516                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
517                let result = self.execute_plan(&sub_plan)?;
518                let values = match result {
519                    QueryResult::Rows { rows, .. } => rows
520                        .into_iter()
521                        .filter_map(|mut row| {
522                            if row.is_empty() {
523                                None
524                            } else {
525                                Some(value_to_expr(row.swap_remove(0)))
526                            }
527                        })
528                        .collect(),
529                    _ => Vec::new(),
530                };
531                // WS2: byte-budget guard on the materialized IN-list.
532                self.charge_in_list(&values)?;
533                Ok(Expr::InList {
534                    expr: Box::new(inner),
535                    list: values,
536                    negated: *negated,
537                })
538            }
539            Expr::ExistsSubquery { subquery, negated } => {
540                if is_correlated_subquery(subquery, &self.catalog) {
541                    return Ok(expr.clone());
542                }
543                // Uncorrelated EXISTS: run the subquery once and collapse
544                // into a Bool literal.
545                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
546                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
547                let result = self.execute_plan(&sub_plan)?;
548                let has_rows = match result {
549                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
550                    _ => false,
551                };
552                let truth = if *negated { !has_rows } else { has_rows };
553                Ok(Expr::Literal(Literal::Bool(truth)))
554            }
555            Expr::BinaryOp(l, op, r) => {
556                let l = self.materialize_subqueries(l)?;
557                let r = self.materialize_subqueries(r)?;
558                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
559            }
560            Expr::UnaryOp(op, inner) => {
561                let inner = self.materialize_subqueries(inner)?;
562                Ok(Expr::UnaryOp(*op, Box::new(inner)))
563            }
564            Expr::Case { whens, else_expr } => {
565                let whens = whens
566                    .iter()
567                    .map(|(c, r)| {
568                        let c = self.materialize_subqueries(c)?;
569                        let r = self.materialize_subqueries(r)?;
570                        Ok((Box::new(c), Box::new(r)))
571                    })
572                    .collect::<Result<Vec<_>, QueryError>>()?;
573                let else_expr = match else_expr {
574                    Some(e) => Some(Box::new(self.materialize_subqueries(e)?)),
575                    None => None,
576                };
577                Ok(Expr::Case { whens, else_expr })
578            }
579            // Leaf nodes: no subqueries possible.
580            other => Ok(other.clone()),
581        }
582    }
583
584    /// Write-path per-row materialisation of correlated subqueries.
585    pub(super) fn materialize_correlated_for_row(
586        &mut self,
587        expr: &Expr,
588        outer_row: &[Value],
589        outer_columns: &[String],
590    ) -> Result<Expr, QueryError> {
591        match expr {
592            Expr::InSubquery {
593                expr: inner,
594                subquery,
595                negated,
596            } => {
597                let inner = self.materialize_correlated_for_row(inner, outer_row, outer_columns)?;
598                let mut sub = *subquery.clone();
599                if let Some(ref filter) = sub.filter {
600                    sub.filter = Some(substitute_outer_refs(
601                        filter,
602                        &sub.source,
603                        &self.catalog,
604                        outer_row,
605                        outer_columns,
606                    ));
607                }
608                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
609                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
610                let result = self.execute_plan(&sub_plan)?;
611                let values = match result {
612                    QueryResult::Rows { rows, .. } => rows
613                        .into_iter()
614                        .filter_map(|mut row| {
615                            if row.is_empty() {
616                                None
617                            } else {
618                                Some(value_to_expr(row.swap_remove(0)))
619                            }
620                        })
621                        .collect(),
622                    _ => Vec::new(),
623                };
624                Ok(Expr::InList {
625                    expr: Box::new(inner),
626                    list: values,
627                    negated: *negated,
628                })
629            }
630            Expr::ExistsSubquery { subquery, negated } => {
631                let mut sub = *subquery.clone();
632                if let Some(ref filter) = sub.filter {
633                    sub.filter = Some(substitute_outer_refs(
634                        filter,
635                        &sub.source,
636                        &self.catalog,
637                        outer_row,
638                        outer_columns,
639                    ));
640                }
641                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
642                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
643                let result = self.execute_plan(&sub_plan)?;
644                let has_rows = match result {
645                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
646                    _ => false,
647                };
648                let truth = if *negated { !has_rows } else { has_rows };
649                Ok(Expr::Literal(Literal::Bool(truth)))
650            }
651            Expr::BinaryOp(l, op, r) => {
652                let l = self.materialize_correlated_for_row(l, outer_row, outer_columns)?;
653                let r = self.materialize_correlated_for_row(r, outer_row, outer_columns)?;
654                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
655            }
656            Expr::UnaryOp(op, inner) => {
657                let inner = self.materialize_correlated_for_row(inner, outer_row, outer_columns)?;
658                Ok(Expr::UnaryOp(*op, Box::new(inner)))
659            }
660            other => Ok(other.clone()),
661        }
662    }
663}