Skip to main content

powdb_query/executor/
prepared.rs

1//! PreparedQuery struct and related Engine methods.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::planner;
6use crate::result::{QueryError, QueryResult};
7use powdb_storage::catalog::Catalog;
8use powdb_storage::row::{ROW_MAGIC, ROW_PREFIX_SIZE};
9use powdb_storage::types::*;
10
11use super::compiled::*;
12use super::eval::*;
13use super::Engine;
14
15pub struct PreparedQuery {
16    plan_template: PlanNode,
17    /// Total number of `Expr::Literal` slots reachable from the plan.
18    /// Callers must supply exactly this many literals per execution.
19    pub param_count: usize,
20    /// Fast-path metadata for `PlanNode::Insert`. `Some` when:
21    ///   * the template is an Insert, and
22    ///   * every assignment RHS is `Expr::Literal(_)` (no computed exprs),
23    ///     which means param_count == assignments.len() and the caller's
24    ///     literal slice maps 1:1 to schema column indices.
25    ///
26    /// Mission C Phase 15: upgraded from a bare `Vec<usize>` to a
27    /// dedicated [`InsertFast`] struct so the execute path can skip the
28    /// second `catalog.schema(table)` HashMap lookup just to read
29    /// `n_cols`, and can dispatch through `get_table_mut` + `tbl.insert`
30    /// instead of going via the generic `catalog.insert` wrapper.
31    insert_fast: Option<InsertFast>,
32    /// Mission C Phase 14: fast-path metadata for point updates by primary
33    /// key — `T filter .pk = <lit> update { col := <lit> }` where `pk` is
34    /// an indexed column and `col` is fixed-size and not indexed. At
35    /// execute time we skip plan clone, substitute walk, schema re-lookup,
36    /// `resolved_assignments` + `FastPatch` + `matching_rids` Vec allocs,
37    /// and the whole `PlanNode::Update` arm. Just a btree lookup and a
38    /// byte patch.
39    update_pk_fast: Option<UpdatePkFast>,
40}
41
42/// Mission C Phase 15: precomputed insert fast-path metadata. Built once
43/// in [`Engine::prepare`] from a `PlanNode::Insert` template whose every
44/// assignment RHS is a raw literal. The execute path reads `n_cols` and
45/// `col_indices` directly — no catalog schema lookup needed.
46#[derive(Clone)]
47struct InsertFast {
48    /// Mission C Phase 18: cached slot index into `Catalog::tables`.
49    /// Resolved once at `prepare` time and stable for the lifetime of
50    /// the catalog (PowDB has no DROP TABLE). Lets the hot path dispatch
51    /// through `catalog.table_by_slot_mut(slot)` — a pure Vec index,
52    /// no hash, no bucket walk, no string compare.
53    table_slot: usize,
54    /// Schema column index for each positional literal, in the order the
55    /// caller passes them.
56    col_indices: Vec<usize>,
57    /// Total number of schema columns — the size `insert_values_scratch`
58    /// must be resized to before filling positions via `col_indices`.
59    /// Cached here so the hot loop skips `catalog.schema(table)` entirely.
60    n_cols: usize,
61}
62
63/// Mission C Phase 14: precomputed fast-path for `update_by_pk` shaped
64/// prepared queries. Built once in [`Engine::prepare`] and reused on every
65/// `execute_prepared` call.
66#[derive(Clone)]
67struct UpdatePkFast {
68    /// Mission C Phase 18: cached slot index into `Catalog::tables`.
69    /// Resolved once at `prepare` time and stable for the lifetime of
70    /// the catalog. At a 52ns total budget the swap from FxHashMap
71    /// probe to a Vec index is measurable.
72    table_slot: usize,
73    /// Name of the key column (the `.id = ?` side). We look this up in
74    /// the owning table's `indexed_cols` at execute time rather than
75    /// caching a raw `&BTree` — the engine owns the catalog and can't
76    /// hand out long-lived borrows anyway, and the n≤5 linear scan is
77    /// a handful of ns.
78    key_col: String,
79    /// Byte offset of the target fixed column in the row encoding:
80    /// `2 + bitmap_size + layout.fixed_offsets[target_col]`.
81    field_off: usize,
82    /// Byte offset of the bitmap byte containing the target column's null
83    /// bit (`2 + target_col / 8`).
84    bitmap_byte_off: usize,
85    /// Bit mask for the target column's null bit.
86    bit_mask: u8,
87    /// Type of the target fixed column — drives the literal-to-bytes
88    /// encoding at execute time.
89    target_type: TypeId,
90    /// Index into the caller's `literals` slice that holds the filter key.
91    /// Always 0 today (filter literal is visited before the assignment
92    /// RHS), but stored explicitly so the contract is obvious.
93    key_literal_idx: usize,
94    /// Index into the caller's `literals` slice that holds the new value.
95    value_literal_idx: usize,
96}
97
98impl Engine {
99    pub fn prepare(&mut self, query: &str) -> Result<PreparedQuery, QueryError> {
100        let plan = planner::plan(query).map_err(|e| QueryError::Parse(e.to_string()))?;
101        let param_count = crate::plan_cache::count_literal_slots(&plan);
102
103        // Insert fast path: if the template is Insert and every assignment
104        // RHS is a literal, resolve column indices once here and store
105        // them. execute_prepared will skip the plan-clone + substitute
106        // walk on this path.
107        //
108        // Mission C Phase 15: also cache `n_cols` and the target table
109        // name so execute_prepared doesn't need a second HashMap lookup
110        // on `self.catalog.schema(table)` just to size the scratch Vec.
111        let insert_fast = match &plan {
112            // Single-row inserts only: the byte-level fast path patches one
113            // row's worth of scratch. Multi-row `insert T {..},{..}` falls
114            // through to the generic plan path (always correct).
115            PlanNode::Insert { table, rows }
116                if rows.len() == 1
117                    && rows[0].iter().all(|a| matches!(a.value, Expr::Literal(_)))
118                    && param_count == rows[0].len() =>
119            {
120                let assignments = &rows[0];
121                let table_slot = self
122                    .catalog
123                    .table_slot(table)
124                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
125                let schema = &self.catalog.table_by_slot(table_slot).schema;
126                let n_cols = schema.columns.len();
127                let indices: Result<Vec<usize>, QueryError> = assignments
128                    .iter()
129                    .map(|a| {
130                        schema
131                            .column_index(&a.field)
132                            .ok_or_else(|| QueryError::ColumnNotFound {
133                                table: table.clone(),
134                                column: a.field.clone(),
135                            })
136                    })
137                    .collect();
138                Some(InsertFast {
139                    table_slot,
140                    col_indices: indices?,
141                    n_cols,
142                })
143            }
144            _ => None,
145        };
146
147        // Mission C Phase 14: update-by-pk fast path. Match on the shape
148        // planner::plan_update builds for `T filter .pk = ? update
149        // { col := ? }` — `Update { input: IndexScan(pk), assignments:
150        // [{col, Literal}] }` — and only if every precondition holds:
151        //   * `pk` is an indexed column (so the executor would take the
152        //     btree.lookup path at run time regardless)
153        //   * there's exactly one assignment
154        //   * the assigned column is fixed-size and *not* indexed (so we
155        //     don't have to maintain any secondary index on write)
156        //   * both literal slots are already `Expr::Literal` (no computed
157        //     expressions)
158        // If any of these fail we fall through to the standard substitute
159        // + execute path.
160        let update_pk_fast = Self::try_build_update_pk_fast(&self.catalog, &plan);
161
162        Ok(PreparedQuery {
163            plan_template: plan,
164            param_count,
165            insert_fast,
166            update_pk_fast,
167        })
168    }
169
170    /// Mission C Phase 14: inspect a planned tree and, if it matches the
171    /// `update_by_pk` fast-path shape, return the precomputed byte-patch
172    /// metadata. Returns `None` on any mismatch — the caller falls through
173    /// to the substitute-and-execute path, which is always correct.
174    fn try_build_update_pk_fast(catalog: &Catalog, plan: &PlanNode) -> Option<UpdatePkFast> {
175        // Top level must be `Update { input: IndexScan(...), ... }`.
176        let (table, input, assignments) = match plan {
177            PlanNode::Update {
178                table,
179                input,
180                assignments,
181            } => (table, input.as_ref(), assignments),
182            _ => return None,
183        };
184        // Exactly one assignment — the bench hot path and the only case
185        // where a single byte-patch covers the whole mutation.
186        if assignments.len() != 1 {
187            return None;
188        }
189        let assn = &assignments[0];
190        // Assignment RHS must be a raw literal, not a computed expr.
191        if !matches!(assn.value, Expr::Literal(_)) {
192            return None;
193        }
194        // Input must be an IndexScan on the same table with a literal key.
195        let (key_col, key_table) = match input {
196            PlanNode::IndexScan {
197                table: t,
198                column,
199                key: Expr::Literal(_),
200            } => (column.clone(), t.clone()),
201            _ => return None,
202        };
203        if &key_table != table {
204            return None;
205        }
206
207        // Look up schema + index state from the live catalog, caching
208        // the slot so the execute path skips the name probe.
209        let table_slot = catalog.table_slot(table)?;
210        let tbl = catalog.table_by_slot(table_slot);
211        let schema = &tbl.schema;
212
213        // Key column must have an index (the btree.lookup path is what
214        // makes the fast path worth building).
215        if !tbl.has_index(&key_col) {
216            return None;
217        }
218
219        // Target column must exist, be fixed-size, and NOT be indexed (so
220        // we don't have to maintain any secondary index here).
221        let target_col_idx = schema.column_index(&assn.field)?;
222        let target_type = schema.columns[target_col_idx].type_id;
223        if !is_fixed_size(target_type) {
224            return None;
225        }
226        if tbl.has_indexed_col(target_col_idx) {
227            return None;
228        }
229
230        // Precompute byte offsets from the cached row layout.
231        let layout = tbl.row_layout();
232        let fixed_off = layout.fixed_offset(target_col_idx)?;
233        let bitmap_size = layout.bitmap_size();
234        let field_off = 2 + bitmap_size + fixed_off;
235        let bitmap_byte_off = 2 + target_col_idx / 8;
236        let bit_mask = 1u8 << (target_col_idx % 8);
237
238        // Literal walk order for `Update { IndexScan(key), [{value}] }`
239        // (see `plan_cache::substitute_plan` — input first, then the
240        // assignments). The filter key is literal 0, the assignment RHS
241        // is literal 1.
242        Some(UpdatePkFast {
243            table_slot,
244            key_col,
245            field_off,
246            bitmap_byte_off,
247            bit_mask,
248            target_type,
249            key_literal_idx: 0,
250            value_literal_idx: 1,
251        })
252    }
253
254    /// Execute a [`PreparedQuery`] with the given literal values.
255    ///
256    /// The literals are substituted into a clone of the template plan in
257    /// the same deterministic walk order that [`crate::canonicalize`]
258    /// produces (filter predicate first, then projection, then assignment
259    /// RHS, and so on). Substitution errors here mean the caller passed
260    /// the wrong number of literals for this query shape.
261    pub fn execute_prepared(
262        &mut self,
263        prep: &PreparedQuery,
264        literals: &[Literal],
265    ) -> Result<QueryResult, QueryError> {
266        if literals.len() != prep.param_count {
267            return Err(QueryError::Execution(format!(
268                "prepared query expects {} literal(s), got {}",
269                prep.param_count,
270                literals.len(),
271            )));
272        }
273
274        // Mission C Phase 14: update-by-pk fast path. Skip plan clone,
275        // substitute walk, resolved_assignments, FastPatch, Vec<RowId>,
276        // RowLayout::new — straight to btree.lookup_int + byte patch.
277        // On rare mismatches (wrong literal type, index dropped after
278        // prepare) the helper returns `Ok(None)` and we fall through to
279        // the generic substitute-and-execute path below.
280        if let Some(fast) = &prep.update_pk_fast {
281            if let Some(result) = self.try_execute_update_pk_fast(fast, literals)? {
282                // Mark dependent views dirty for prepared update fast path.
283                if let PlanNode::Update { table, .. } = &prep.plan_template {
284                    self.view_registry.mark_dependents_dirty(table);
285                }
286                // Mission B (post-review): statement-boundary WAL group
287                // commit. The fast path appended an Update record but did
288                // not flush — flush it now so the executor's contract is
289                // "WAL is on disk before this returns".
290                self.catalog
291                    .commit_autocommit()
292                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
293                return Ok(result);
294            }
295        }
296
297        // Insert fast path: skip plan-clone + substitute walk + PlanNode::Insert
298        // arm's column-index resolution. Build the Row directly from the
299        // caller's literal slice using indices we resolved at prepare time.
300        // Saves ~300-500ns per insert on the bench.
301        //
302        // Mission C Phase 13: the scratch `Vec<Value>` is reused across
303        // calls — no fresh allocation per insert. We split the borrow
304        // between `self.catalog` and `self.insert_values_scratch` by
305        // moving the scratch into a local, filling it, passing to the
306        // catalog, and putting it back.
307        //
308        // Mission C Phase 15: the cached `InsertFast` carries `n_cols`
309        // and the table name, so the hot path makes exactly one catalog
310        // HashMap lookup (`get_table_mut`) and dispatches straight into
311        // `tbl.insert` — no intermediate schema lookup, no generic
312        // `Catalog::insert` wrapper.
313        if let Some(fast) = &prep.insert_fast {
314            let mut values = std::mem::take(&mut self.insert_values_scratch);
315            values.clear();
316            values.resize(fast.n_cols, Value::Empty);
317            for (pos, lit) in literals.iter().enumerate() {
318                values[fast.col_indices[pos]] = literal_value_from(lit);
319            }
320            // Mission C Phase 18: direct O(1) slot index — no
321            // catalog hash probe. Slot was resolved at prepare time.
322            // Durability fix: route through the WAL-logging `insert_by_slot`
323            // (was the raw `Table::insert`, which bypassed the WAL and lost
324            // every prepared insert on a crash).
325            let res = self
326                .catalog
327                .insert_by_slot(fast.table_slot, &values)
328                .map_err(|e| e.to_string());
329            // Clear strings before returning the scratch — don't keep
330            // dangling allocations from the previous row alive across
331            // calls. `clear()` drops the Value::Str entries.
332            values.clear();
333            self.insert_values_scratch = values;
334            res?;
335            // Mark dependent views dirty for prepared insert fast path.
336            if let PlanNode::Insert { table, .. } = &prep.plan_template {
337                self.view_registry.mark_dependents_dirty(table);
338            }
339            // Mission B (post-review): statement-boundary WAL group commit.
340            self.catalog
341                .commit_autocommit()
342                .map_err(|e| QueryError::StorageError(e.to_string()))?;
343            return Ok(QueryResult::Modified(1));
344        }
345
346        let mut plan = prep.plan_template.clone();
347        let mut idx = 0usize;
348        crate::plan_cache::substitute_plan(&mut plan, literals, &mut idx);
349        debug_assert_eq!(idx, literals.len());
350        let result = self.execute_plan(&plan);
351        // Mission B (post-review): statement-boundary WAL group commit.
352        // No-op when nothing was buffered (read-only plans).
353        self.catalog
354            .commit_autocommit()
355            .map_err(|e| QueryError::StorageError(e.to_string()))?;
356        result
357    }
358
359    /// Mission C Phase 14: point-update fast path for prepared
360    /// `T filter .pk = ? update { col := ? }` queries. The caller has
361    /// already verified this is an int-indexed pk with a fixed-size,
362    /// non-indexed target column; all we do here is pluck the two
363    /// literals out of the caller's slice, run one `btree.lookup_int`,
364    /// and patch 1–8 bytes of the row. No plan clone, no allocations.
365    ///
366    /// Returns:
367    ///   * `Ok(Some(result))` — fast path took the mutation.
368    ///   * `Ok(None)` — can't take the fast path this call (wrong
369    ///     literal type, index dropped since prepare, etc.). Caller
370    ///     falls through to the generic substitute-and-execute path.
371    ///   * `Err(_)` — real error (table gone, I/O, etc.).
372    #[inline]
373    fn try_execute_update_pk_fast(
374        &mut self,
375        fast: &UpdatePkFast,
376        literals: &[Literal],
377    ) -> Result<Option<QueryResult>, QueryError> {
378        // 1) Extract the key literal. The fast path is only built for
379        //    int key columns; any other literal type means the caller
380        //    is violating the prepared-query contract or the schema
381        //    changed — either way, fall back.
382        let key_int = match &literals[fast.key_literal_idx] {
383            Literal::Int(v) => *v,
384            _ => return Ok(None),
385        };
386
387        // 2) Encode the new value as little-endian bytes matching the
388        //    target column's fixed encoding.
389        let bytes: FixedBytes = match (fast.target_type, &literals[fast.value_literal_idx]) {
390            (TypeId::Int, Literal::Int(v)) => FixedBytes::I64(v.to_le_bytes()),
391            (TypeId::DateTime, Literal::Int(v)) => FixedBytes::I64(v.to_le_bytes()),
392            (TypeId::Float, Literal::Float(v)) => FixedBytes::F64(v.to_le_bytes()),
393            (TypeId::Bool, Literal::Bool(v)) => FixedBytes::Bool(if *v { 1 } else { 0 }),
394            // Type mismatch — fall back to the generic path for a
395            // consistent error shape.
396            _ => return Ok(None),
397        };
398
399        // 3) Look up the table + btree, do the int lookup, patch the row
400        //    in place. Phase 18: table dispatch is a direct slot index;
401        //    the btree lookup is the linear scan over `indexed_cols`.
402        //    Single btree.lookup_int + one `with_row_bytes_mut` call.
403        //    No Vec allocations at all.
404        //
405        // Mission B2: route the in-place patch through the catalog's
406        // WAL-logged wrapper so crash recovery sees the update. The
407        // extra cost is one WAL append + fsync per query — the hot
408        // loop structure is unchanged.
409        let tbl = self.catalog.table_by_slot_mut(fast.table_slot);
410        let Some(btree) = tbl.index(&fast.key_col) else {
411            // Index dropped since prepare — bail to the generic path.
412            return Ok(None);
413        };
414        let Some(rid) = btree.lookup_int(key_int) else {
415            return Ok(Some(QueryResult::Modified(0)));
416        };
417
418        let fast_table_slot = fast.table_slot;
419        let bitmap_byte_off = fast.bitmap_byte_off;
420        let bit_mask = fast.bit_mask;
421        let field_off = fast.field_off;
422        let ok = self
423            .catalog
424            .update_row_bytes_logged_by_slot(fast_table_slot, rid, |row| {
425                let base = if row.len() >= ROW_PREFIX_SIZE && &row[0..4] == ROW_MAGIC {
426                    ROW_PREFIX_SIZE
427                } else {
428                    0
429                };
430                // Idempotent null-bit clear — safe even when the column was
431                // already non-null (the overwhelmingly common case).
432                row[base + bitmap_byte_off] &= !bit_mask;
433                let field_bytes = bytes.as_slice();
434                row[base + field_off..base + field_off + field_bytes.len()]
435                    .copy_from_slice(field_bytes);
436            })
437            .map_err(|e| QueryError::StorageError(e.to_string()))?;
438
439        Ok(Some(QueryResult::Modified(if ok { 1 } else { 0 })))
440    }
441
442    /// Mission C Phase 13: moving variant of [`Engine::execute_prepared`]
443    /// for the insert fast path. Takes `literals` by mutable reference
444    /// so that each `Literal::String` can be consumed via `mem::take`
445    /// instead of cloned into a `Value::Str`. On `insert_batch_1k` that
446    /// removes three per-row heap allocations (name, status, email),
447    /// bringing the workload over the line vs SQLite's amortized
448    /// prepare+execute loop.
449    ///
450    /// The caller's `Literal::String` entries are replaced with empty
451    /// strings on successful inserts — the `literals` slice is *not*
452    /// left in a valid-for-reuse state except for `Int`/`Float`/`Bool`
453    /// values. Non-insert templates fall through to the standard
454    /// substitute-and-execute path.
455    pub fn execute_prepared_take(
456        &mut self,
457        prep: &PreparedQuery,
458        literals: &mut [Literal],
459    ) -> Result<QueryResult, QueryError> {
460        if literals.len() != prep.param_count {
461            return Err(QueryError::Execution(format!(
462                "prepared query expects {} literal(s), got {}",
463                prep.param_count,
464                literals.len(),
465            )));
466        }
467
468        if let Some(fast) = &prep.insert_fast {
469            let mut values = std::mem::take(&mut self.insert_values_scratch);
470            values.clear();
471            values.resize(fast.n_cols, Value::Empty);
472            for (pos, lit) in literals.iter_mut().enumerate() {
473                values[fast.col_indices[pos]] = literal_value_take(lit);
474            }
475            // Mission C Phase 18: direct O(1) slot index — see
476            // `execute_prepared` for rationale. This is the hot path
477            // for `insert_batch_1k`. Durability fix: WAL-logging
478            // `insert_by_slot` (was the raw `Table::insert`).
479            let res = self
480                .catalog
481                .insert_by_slot(fast.table_slot, &values)
482                .map_err(|e| e.to_string());
483            values.clear();
484            self.insert_values_scratch = values;
485            res?;
486            // Mission B (post-review): statement-boundary WAL group commit.
487            self.catalog
488                .commit_autocommit()
489                .map_err(|e| QueryError::StorageError(e.to_string()))?;
490            return Ok(QueryResult::Modified(1));
491        }
492
493        // Non-insert templates — fall back to the standard path. We
494        // can't usefully move the literals because `substitute_plan`
495        // still expects an immutable slice, and the non-insert hot
496        // paths are dominated by plan walks anyway.
497        self.execute_prepared(prep, literals)
498    }
499
500    /// Walk an expression tree and replace every `InSubquery` node with
501    /// an `InList` by executing the subquery and collecting its first
502    /// column as literal values. This must be called before entering
503    /// the row-by-row scan loop because the scan closure can't call back
504    /// into the engine.
505    pub(super) fn materialize_subqueries(&mut self, expr: &Expr) -> Result<Expr, QueryError> {
506        match expr {
507            Expr::InSubquery {
508                expr: inner,
509                subquery,
510                negated,
511            } => {
512                if is_correlated_subquery(subquery, &self.catalog) {
513                    let inner = self.materialize_subqueries(inner)?;
514                    return Ok(Expr::InSubquery {
515                        expr: Box::new(inner),
516                        subquery: subquery.clone(),
517                        negated: *negated,
518                    });
519                }
520                let inner = self.materialize_subqueries(inner)?;
521                // Plan and execute the subquery.
522                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
523                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
524                let result = self.execute_plan(&sub_plan)?;
525                let values = match result {
526                    QueryResult::Rows { rows, .. } => rows
527                        .into_iter()
528                        .filter_map(|mut row| {
529                            if row.is_empty() {
530                                None
531                            } else {
532                                Some(value_to_expr(row.swap_remove(0)))
533                            }
534                        })
535                        .collect(),
536                    _ => Vec::new(),
537                };
538                // WS2: byte-budget guard on the materialized IN-list.
539                self.charge_in_list(&values)?;
540                Ok(Expr::InList {
541                    expr: Box::new(inner),
542                    list: values,
543                    negated: *negated,
544                })
545            }
546            Expr::ExistsSubquery { subquery, negated } => {
547                if is_correlated_subquery(subquery, &self.catalog) {
548                    return Ok(expr.clone());
549                }
550                // Uncorrelated EXISTS: run the subquery once and collapse
551                // into a Bool literal.
552                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
553                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
554                let result = self.execute_plan(&sub_plan)?;
555                let has_rows = match result {
556                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
557                    _ => false,
558                };
559                let truth = if *negated { !has_rows } else { has_rows };
560                Ok(Expr::Literal(Literal::Bool(truth)))
561            }
562            Expr::BinaryOp(l, op, r) => {
563                let l = self.materialize_subqueries(l)?;
564                let r = self.materialize_subqueries(r)?;
565                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
566            }
567            Expr::UnaryOp(op, inner) => {
568                let inner = self.materialize_subqueries(inner)?;
569                Ok(Expr::UnaryOp(*op, Box::new(inner)))
570            }
571            Expr::Case { whens, else_expr } => {
572                let whens = whens
573                    .iter()
574                    .map(|(c, r)| {
575                        let c = self.materialize_subqueries(c)?;
576                        let r = self.materialize_subqueries(r)?;
577                        Ok((Box::new(c), Box::new(r)))
578                    })
579                    .collect::<Result<Vec<_>, QueryError>>()?;
580                let else_expr = match else_expr {
581                    Some(e) => Some(Box::new(self.materialize_subqueries(e)?)),
582                    None => None,
583                };
584                Ok(Expr::Case { whens, else_expr })
585            }
586            // Leaf nodes: no subqueries possible.
587            other => Ok(other.clone()),
588        }
589    }
590
591    /// Write-path per-row materialisation of correlated subqueries.
592    pub(super) fn materialize_correlated_for_row(
593        &mut self,
594        expr: &Expr,
595        outer_row: &[Value],
596        outer_columns: &[String],
597    ) -> Result<Expr, QueryError> {
598        match expr {
599            Expr::InSubquery {
600                expr: inner,
601                subquery,
602                negated,
603            } => {
604                let inner = self.materialize_correlated_for_row(inner, outer_row, outer_columns)?;
605                let mut sub = *subquery.clone();
606                if let Some(ref filter) = sub.filter {
607                    sub.filter = Some(substitute_outer_refs(
608                        filter,
609                        &sub.source,
610                        &self.catalog,
611                        outer_row,
612                        outer_columns,
613                    ));
614                }
615                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
616                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
617                let result = self.execute_plan(&sub_plan)?;
618                let values = match result {
619                    QueryResult::Rows { rows, .. } => rows
620                        .into_iter()
621                        .filter_map(|mut row| {
622                            if row.is_empty() {
623                                None
624                            } else {
625                                Some(value_to_expr(row.swap_remove(0)))
626                            }
627                        })
628                        .collect(),
629                    _ => Vec::new(),
630                };
631                Ok(Expr::InList {
632                    expr: Box::new(inner),
633                    list: values,
634                    negated: *negated,
635                })
636            }
637            Expr::ExistsSubquery { subquery, negated } => {
638                let mut sub = *subquery.clone();
639                if let Some(ref filter) = sub.filter {
640                    sub.filter = Some(substitute_outer_refs(
641                        filter,
642                        &sub.source,
643                        &self.catalog,
644                        outer_row,
645                        outer_columns,
646                    ));
647                }
648                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
649                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
650                let result = self.execute_plan(&sub_plan)?;
651                let has_rows = match result {
652                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
653                    _ => false,
654                };
655                let truth = if *negated { !has_rows } else { has_rows };
656                Ok(Expr::Literal(Literal::Bool(truth)))
657            }
658            Expr::BinaryOp(l, op, r) => {
659                let l = self.materialize_correlated_for_row(l, outer_row, outer_columns)?;
660                let r = self.materialize_correlated_for_row(r, outer_row, outer_columns)?;
661                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
662            }
663            Expr::UnaryOp(op, inner) => {
664                let inner = self.materialize_correlated_for_row(inner, outer_row, outer_columns)?;
665                Ok(Expr::UnaryOp(*op, Box::new(inner)))
666            }
667            other => Ok(other.clone()),
668        }
669    }
670}