Skip to main content

powdb_query/executor/
prepared.rs

1//! PreparedQuery struct and related Engine methods.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::planner;
6use crate::result::{QueryError, QueryResult};
7use powdb_storage::catalog::Catalog;
8use powdb_storage::types::*;
9
10use super::compiled::*;
11use super::eval::*;
12use super::Engine;
13
14pub struct PreparedQuery {
15    plan_template: PlanNode,
16    /// Total number of `Expr::Literal` slots reachable from the plan.
17    /// Callers must supply exactly this many literals per execution.
18    pub param_count: usize,
19    /// Fast-path metadata for `PlanNode::Insert`. `Some` when:
20    ///   * the template is an Insert, and
21    ///   * every assignment RHS is `Expr::Literal(_)` (no computed exprs),
22    ///     which means param_count == assignments.len() and the caller's
23    ///     literal slice maps 1:1 to schema column indices.
24    ///
25    /// Mission C Phase 15: upgraded from a bare `Vec<usize>` to a
26    /// dedicated [`InsertFast`] struct so the execute path can skip the
27    /// second `catalog.schema(table)` HashMap lookup just to read
28    /// `n_cols`, and can dispatch through `get_table_mut` + `tbl.insert`
29    /// instead of going via the generic `catalog.insert` wrapper.
30    insert_fast: Option<InsertFast>,
31    /// Mission C Phase 14: fast-path metadata for point updates by primary
32    /// key — `T filter .pk = <lit> update { col := <lit> }` where `pk` is
33    /// an indexed column and `col` is fixed-size and not indexed. At
34    /// execute time we skip plan clone, substitute walk, schema re-lookup,
35    /// `resolved_assignments` + `FastPatch` + `matching_rids` Vec allocs,
36    /// and the whole `PlanNode::Update` arm. Just a btree lookup and a
37    /// byte patch.
38    update_pk_fast: Option<UpdatePkFast>,
39}
40
/// Precomputed metadata for the prepared-insert fast path (Mission C
/// Phase 15).
///
/// Built once in [`Engine::prepare`] from a `PlanNode::Insert` template whose
/// assignment right-hand sides are all raw literals. Everything the execute
/// path needs to build a row is cached here, so it performs no schema lookup.
#[derive(Clone)]
struct InsertFast {
    /// Slot of the target table inside the catalog (Mission C Phase 18),
    /// resolved at prepare time and passed to `catalog.table_by_slot_mut`
    /// on every execution instead of a by-name lookup.
    // NOTE(review): this relies on slots staying stable for the catalog's
    // lifetime (the original comment states PowDB has no DROP TABLE) —
    // revisit if table removal is ever added.
    table_slot: usize,
    /// For each positional literal, in the order the caller passes them,
    /// the schema column index that literal is written to.
    col_indices: Vec<usize>,
    /// Number of columns in the table schema. The scratch value vector is
    /// resized to this length before the literals are scattered into it
    /// through `col_indices`.
    n_cols: usize,
}
61
62/// Mission C Phase 14: precomputed fast-path for `update_by_pk` shaped
63/// prepared queries. Built once in [`Engine::prepare`] and reused on every
64/// `execute_prepared` call.
65#[derive(Clone)]
66struct UpdatePkFast {
67    /// Mission C Phase 18: cached slot index into `Catalog::tables`.
68    /// Resolved once at `prepare` time and stable for the lifetime of
69    /// the catalog. At a 52ns total budget the swap from FxHashMap
70    /// probe to a Vec index is measurable.
71    table_slot: usize,
72    /// Name of the key column (the `.id = ?` side). We look this up in
73    /// the owning table's `indexed_cols` at execute time rather than
74    /// caching a raw `&BTree` — the engine owns the catalog and can't
75    /// hand out long-lived borrows anyway, and the n≤5 linear scan is
76    /// a handful of ns.
77    key_col: String,
78    /// Byte offset of the target fixed column in the row encoding:
79    /// `2 + bitmap_size + layout.fixed_offsets[target_col]`.
80    field_off: usize,
81    /// Byte offset of the bitmap byte containing the target column's null
82    /// bit (`2 + target_col / 8`).
83    bitmap_byte_off: usize,
84    /// Bit mask for the target column's null bit.
85    bit_mask: u8,
86    /// Type of the target fixed column — drives the literal-to-bytes
87    /// encoding at execute time.
88    target_type: TypeId,
89    /// Index into the caller's `literals` slice that holds the filter key.
90    /// Always 0 today (filter literal is visited before the assignment
91    /// RHS), but stored explicitly so the contract is obvious.
92    key_literal_idx: usize,
93    /// Index into the caller's `literals` slice that holds the new value.
94    value_literal_idx: usize,
95}
96
97impl Engine {
98    pub fn prepare(&mut self, query: &str) -> Result<PreparedQuery, QueryError> {
99        let plan = planner::plan(query).map_err(|e| QueryError::Parse(e.to_string()))?;
100        let param_count = crate::plan_cache::count_literal_slots(&plan);
101
102        // Insert fast path: if the template is Insert and every assignment
103        // RHS is a literal, resolve column indices once here and store
104        // them. execute_prepared will skip the plan-clone + substitute
105        // walk on this path.
106        //
107        // Mission C Phase 15: also cache `n_cols` and the target table
108        // name so execute_prepared doesn't need a second HashMap lookup
109        // on `self.catalog.schema(table)` just to size the scratch Vec.
110        let insert_fast = match &plan {
111            PlanNode::Insert { table, assignments }
112                if assignments
113                    .iter()
114                    .all(|a| matches!(a.value, Expr::Literal(_)))
115                    && param_count == assignments.len() =>
116            {
117                let table_slot = self
118                    .catalog
119                    .table_slot(table)
120                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
121                let schema = &self.catalog.table_by_slot(table_slot).schema;
122                let n_cols = schema.columns.len();
123                let indices: Result<Vec<usize>, QueryError> = assignments
124                    .iter()
125                    .map(|a| {
126                        schema
127                            .column_index(&a.field)
128                            .ok_or_else(|| QueryError::ColumnNotFound {
129                                table: table.clone(),
130                                column: a.field.clone(),
131                            })
132                    })
133                    .collect();
134                Some(InsertFast {
135                    table_slot,
136                    col_indices: indices?,
137                    n_cols,
138                })
139            }
140            _ => None,
141        };
142
143        // Mission C Phase 14: update-by-pk fast path. Match on the shape
144        // planner::plan_update builds for `T filter .pk = ? update
145        // { col := ? }` — `Update { input: IndexScan(pk), assignments:
146        // [{col, Literal}] }` — and only if every precondition holds:
147        //   * `pk` is an indexed column (so the executor would take the
148        //     btree.lookup path at run time regardless)
149        //   * there's exactly one assignment
150        //   * the assigned column is fixed-size and *not* indexed (so we
151        //     don't have to maintain any secondary index on write)
152        //   * both literal slots are already `Expr::Literal` (no computed
153        //     expressions)
154        // If any of these fail we fall through to the standard substitute
155        // + execute path.
156        let update_pk_fast = Self::try_build_update_pk_fast(&self.catalog, &plan);
157
158        Ok(PreparedQuery {
159            plan_template: plan,
160            param_count,
161            insert_fast,
162            update_pk_fast,
163        })
164    }
165
166    /// Mission C Phase 14: inspect a planned tree and, if it matches the
167    /// `update_by_pk` fast-path shape, return the precomputed byte-patch
168    /// metadata. Returns `None` on any mismatch — the caller falls through
169    /// to the substitute-and-execute path, which is always correct.
170    fn try_build_update_pk_fast(catalog: &Catalog, plan: &PlanNode) -> Option<UpdatePkFast> {
171        // Top level must be `Update { input: IndexScan(...), ... }`.
172        let (table, input, assignments) = match plan {
173            PlanNode::Update {
174                table,
175                input,
176                assignments,
177            } => (table, input.as_ref(), assignments),
178            _ => return None,
179        };
180        // Exactly one assignment — the bench hot path and the only case
181        // where a single byte-patch covers the whole mutation.
182        if assignments.len() != 1 {
183            return None;
184        }
185        let assn = &assignments[0];
186        // Assignment RHS must be a raw literal, not a computed expr.
187        if !matches!(assn.value, Expr::Literal(_)) {
188            return None;
189        }
190        // Input must be an IndexScan on the same table with a literal key.
191        let (key_col, key_table) = match input {
192            PlanNode::IndexScan {
193                table: t,
194                column,
195                key: Expr::Literal(_),
196            } => (column.clone(), t.clone()),
197            _ => return None,
198        };
199        if &key_table != table {
200            return None;
201        }
202
203        // Look up schema + index state from the live catalog, caching
204        // the slot so the execute path skips the name probe.
205        let table_slot = catalog.table_slot(table)?;
206        let tbl = catalog.table_by_slot(table_slot);
207        let schema = &tbl.schema;
208
209        // Key column must have an index (the btree.lookup path is what
210        // makes the fast path worth building).
211        if !tbl.has_index(&key_col) {
212            return None;
213        }
214
215        // Target column must exist, be fixed-size, and NOT be indexed (so
216        // we don't have to maintain any secondary index here).
217        let target_col_idx = schema.column_index(&assn.field)?;
218        let target_type = schema.columns[target_col_idx].type_id;
219        if !is_fixed_size(target_type) {
220            return None;
221        }
222        if tbl.has_indexed_col(target_col_idx) {
223            return None;
224        }
225
226        // Precompute byte offsets from the cached row layout.
227        let layout = tbl.row_layout();
228        let fixed_off = layout.fixed_offset(target_col_idx)?;
229        let bitmap_size = layout.bitmap_size();
230        let field_off = 2 + bitmap_size + fixed_off;
231        let bitmap_byte_off = 2 + target_col_idx / 8;
232        let bit_mask = 1u8 << (target_col_idx % 8);
233
234        // Literal walk order for `Update { IndexScan(key), [{value}] }`
235        // (see `plan_cache::substitute_plan` — input first, then the
236        // assignments). The filter key is literal 0, the assignment RHS
237        // is literal 1.
238        Some(UpdatePkFast {
239            table_slot,
240            key_col,
241            field_off,
242            bitmap_byte_off,
243            bit_mask,
244            target_type,
245            key_literal_idx: 0,
246            value_literal_idx: 1,
247        })
248    }
249
250    /// Execute a [`PreparedQuery`] with the given literal values.
251    ///
252    /// The literals are substituted into a clone of the template plan in
253    /// the same deterministic walk order that [`crate::canonicalize`]
254    /// produces (filter predicate first, then projection, then assignment
255    /// RHS, and so on). Substitution errors here mean the caller passed
256    /// the wrong number of literals for this query shape.
257    pub fn execute_prepared(
258        &mut self,
259        prep: &PreparedQuery,
260        literals: &[Literal],
261    ) -> Result<QueryResult, QueryError> {
262        if literals.len() != prep.param_count {
263            return Err(QueryError::Execution(format!(
264                "prepared query expects {} literal(s), got {}",
265                prep.param_count,
266                literals.len(),
267            )));
268        }
269
270        // Mission C Phase 14: update-by-pk fast path. Skip plan clone,
271        // substitute walk, resolved_assignments, FastPatch, Vec<RowId>,
272        // RowLayout::new — straight to btree.lookup_int + byte patch.
273        // On rare mismatches (wrong literal type, index dropped after
274        // prepare) the helper returns `Ok(None)` and we fall through to
275        // the generic substitute-and-execute path below.
276        if let Some(fast) = &prep.update_pk_fast {
277            if let Some(result) = self.try_execute_update_pk_fast(fast, literals)? {
278                // Mark dependent views dirty for prepared update fast path.
279                if let PlanNode::Update { table, .. } = &prep.plan_template {
280                    self.view_registry.mark_dependents_dirty(table);
281                }
282                // Mission B (post-review): statement-boundary WAL group
283                // commit. The fast path appended an Update record but did
284                // not flush — flush it now so the executor's contract is
285                // "WAL is on disk before this returns".
286                self.catalog
287                    .sync_wal()
288                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
289                return Ok(result);
290            }
291        }
292
293        // Insert fast path: skip plan-clone + substitute walk + PlanNode::Insert
294        // arm's column-index resolution. Build the Row directly from the
295        // caller's literal slice using indices we resolved at prepare time.
296        // Saves ~300-500ns per insert on the bench.
297        //
298        // Mission C Phase 13: the scratch `Vec<Value>` is reused across
299        // calls — no fresh allocation per insert. We split the borrow
300        // between `self.catalog` and `self.insert_values_scratch` by
301        // moving the scratch into a local, filling it, passing to the
302        // catalog, and putting it back.
303        //
304        // Mission C Phase 15: the cached `InsertFast` carries `n_cols`
305        // and the table name, so the hot path makes exactly one catalog
306        // HashMap lookup (`get_table_mut`) and dispatches straight into
307        // `tbl.insert` — no intermediate schema lookup, no generic
308        // `Catalog::insert` wrapper.
309        if let Some(fast) = &prep.insert_fast {
310            let mut values = std::mem::take(&mut self.insert_values_scratch);
311            values.clear();
312            values.resize(fast.n_cols, Value::Empty);
313            for (pos, lit) in literals.iter().enumerate() {
314                values[fast.col_indices[pos]] = literal_value_from(lit);
315            }
316            // Mission C Phase 18: direct O(1) slot index — no
317            // catalog hash probe. Slot was resolved at prepare time.
318            let tbl = self.catalog.table_by_slot_mut(fast.table_slot);
319            let res = tbl.insert(&values).map_err(|e| e.to_string());
320            // Clear strings before returning the scratch — don't keep
321            // dangling allocations from the previous row alive across
322            // calls. `clear()` drops the Value::Str entries.
323            values.clear();
324            self.insert_values_scratch = values;
325            res?;
326            // Mark dependent views dirty for prepared insert fast path.
327            if let PlanNode::Insert { table, .. } = &prep.plan_template {
328                self.view_registry.mark_dependents_dirty(table);
329            }
330            // Mission B (post-review): statement-boundary WAL group commit.
331            self.catalog
332                .sync_wal()
333                .map_err(|e| QueryError::StorageError(e.to_string()))?;
334            return Ok(QueryResult::Modified(1));
335        }
336
337        let mut plan = prep.plan_template.clone();
338        let mut idx = 0usize;
339        crate::plan_cache::substitute_plan(&mut plan, literals, &mut idx);
340        debug_assert_eq!(idx, literals.len());
341        let result = self.execute_plan(&plan);
342        // Mission B (post-review): statement-boundary WAL group commit.
343        // No-op when nothing was buffered (read-only plans).
344        self.catalog
345            .sync_wal()
346            .map_err(|e| QueryError::StorageError(e.to_string()))?;
347        result
348    }
349
350    /// Mission C Phase 14: point-update fast path for prepared
351    /// `T filter .pk = ? update { col := ? }` queries. The caller has
352    /// already verified this is an int-indexed pk with a fixed-size,
353    /// non-indexed target column; all we do here is pluck the two
354    /// literals out of the caller's slice, run one `btree.lookup_int`,
355    /// and patch 1–8 bytes of the row. No plan clone, no allocations.
356    ///
357    /// Returns:
358    ///   * `Ok(Some(result))` — fast path took the mutation.
359    ///   * `Ok(None)` — can't take the fast path this call (wrong
360    ///     literal type, index dropped since prepare, etc.). Caller
361    ///     falls through to the generic substitute-and-execute path.
362    ///   * `Err(_)` — real error (table gone, I/O, etc.).
363    #[inline]
364    fn try_execute_update_pk_fast(
365        &mut self,
366        fast: &UpdatePkFast,
367        literals: &[Literal],
368    ) -> Result<Option<QueryResult>, QueryError> {
369        // 1) Extract the key literal. The fast path is only built for
370        //    int key columns; any other literal type means the caller
371        //    is violating the prepared-query contract or the schema
372        //    changed — either way, fall back.
373        let key_int = match &literals[fast.key_literal_idx] {
374            Literal::Int(v) => *v,
375            _ => return Ok(None),
376        };
377
378        // 2) Encode the new value as little-endian bytes matching the
379        //    target column's fixed encoding.
380        let bytes: FixedBytes = match (fast.target_type, &literals[fast.value_literal_idx]) {
381            (TypeId::Int, Literal::Int(v)) => FixedBytes::I64(v.to_le_bytes()),
382            (TypeId::DateTime, Literal::Int(v)) => FixedBytes::I64(v.to_le_bytes()),
383            (TypeId::Float, Literal::Float(v)) => FixedBytes::F64(v.to_le_bytes()),
384            (TypeId::Bool, Literal::Bool(v)) => FixedBytes::Bool(if *v { 1 } else { 0 }),
385            // Type mismatch — fall back to the generic path for a
386            // consistent error shape.
387            _ => return Ok(None),
388        };
389
390        // 3) Look up the table + btree, do the int lookup, patch the row
391        //    in place. Phase 18: table dispatch is a direct slot index;
392        //    the btree lookup is the linear scan over `indexed_cols`.
393        //    Single btree.lookup_int + one `with_row_bytes_mut` call.
394        //    No Vec allocations at all.
395        //
396        // Mission B2: route the in-place patch through the catalog's
397        // WAL-logged wrapper so crash recovery sees the update. The
398        // extra cost is one WAL append + fsync per query — the hot
399        // loop structure is unchanged.
400        let tbl = self.catalog.table_by_slot_mut(fast.table_slot);
401        let Some(btree) = tbl.index(&fast.key_col) else {
402            // Index dropped since prepare — bail to the generic path.
403            return Ok(None);
404        };
405        let Some(rid) = btree.lookup_int(key_int) else {
406            return Ok(Some(QueryResult::Modified(0)));
407        };
408
409        let fast_table_slot = fast.table_slot;
410        let bitmap_byte_off = fast.bitmap_byte_off;
411        let bit_mask = fast.bit_mask;
412        let field_off = fast.field_off;
413        let ok = self
414            .catalog
415            .update_row_bytes_logged_by_slot(fast_table_slot, rid, |row| {
416                // Idempotent null-bit clear — safe even when the column was
417                // already non-null (the overwhelmingly common case).
418                row[bitmap_byte_off] &= !bit_mask;
419                let field_bytes = bytes.as_slice();
420                row[field_off..field_off + field_bytes.len()].copy_from_slice(field_bytes);
421            })
422            .map_err(|e| QueryError::StorageError(e.to_string()))?;
423
424        Ok(Some(QueryResult::Modified(if ok { 1 } else { 0 })))
425    }
426
427    /// Mission C Phase 13: moving variant of [`Engine::execute_prepared`]
428    /// for the insert fast path. Takes `literals` by mutable reference
429    /// so that each `Literal::String` can be consumed via `mem::take`
430    /// instead of cloned into a `Value::Str`. On `insert_batch_1k` that
431    /// removes three per-row heap allocations (name, status, email),
432    /// bringing the workload over the line vs SQLite's amortized
433    /// prepare+execute loop.
434    ///
435    /// The caller's `Literal::String` entries are replaced with empty
436    /// strings on successful inserts — the `literals` slice is *not*
437    /// left in a valid-for-reuse state except for `Int`/`Float`/`Bool`
438    /// values. Non-insert templates fall through to the standard
439    /// substitute-and-execute path.
440    pub fn execute_prepared_take(
441        &mut self,
442        prep: &PreparedQuery,
443        literals: &mut [Literal],
444    ) -> Result<QueryResult, QueryError> {
445        if literals.len() != prep.param_count {
446            return Err(QueryError::Execution(format!(
447                "prepared query expects {} literal(s), got {}",
448                prep.param_count,
449                literals.len(),
450            )));
451        }
452
453        if let Some(fast) = &prep.insert_fast {
454            let mut values = std::mem::take(&mut self.insert_values_scratch);
455            values.clear();
456            values.resize(fast.n_cols, Value::Empty);
457            for (pos, lit) in literals.iter_mut().enumerate() {
458                values[fast.col_indices[pos]] = literal_value_take(lit);
459            }
460            // Mission C Phase 18: direct O(1) slot index — see
461            // `execute_prepared` for rationale. This is the hot path
462            // for `insert_batch_1k`.
463            let tbl = self.catalog.table_by_slot_mut(fast.table_slot);
464            let res = tbl.insert(&values).map_err(|e| e.to_string());
465            values.clear();
466            self.insert_values_scratch = values;
467            res?;
468            // Mission B (post-review): statement-boundary WAL group commit.
469            self.catalog
470                .sync_wal()
471                .map_err(|e| QueryError::StorageError(e.to_string()))?;
472            return Ok(QueryResult::Modified(1));
473        }
474
475        // Non-insert templates — fall back to the standard path. We
476        // can't usefully move the literals because `substitute_plan`
477        // still expects an immutable slice, and the non-insert hot
478        // paths are dominated by plan walks anyway.
479        self.execute_prepared(prep, literals)
480    }
481
482    /// Walk an expression tree and replace every `InSubquery` node with
483    /// an `InList` by executing the subquery and collecting its first
484    /// column as literal values. This must be called before entering
485    /// the row-by-row scan loop because the scan closure can't call back
486    /// into the engine.
487    pub(super) fn materialize_subqueries(&mut self, expr: &Expr) -> Result<Expr, QueryError> {
488        match expr {
489            Expr::InSubquery {
490                expr: inner,
491                subquery,
492                negated,
493            } => {
494                if is_correlated_subquery(subquery, &self.catalog) {
495                    let inner = self.materialize_subqueries(inner)?;
496                    return Ok(Expr::InSubquery {
497                        expr: Box::new(inner),
498                        subquery: subquery.clone(),
499                        negated: *negated,
500                    });
501                }
502                let inner = self.materialize_subqueries(inner)?;
503                // Plan and execute the subquery.
504                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
505                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
506                let result = self.execute_plan(&sub_plan)?;
507                let values = match result {
508                    QueryResult::Rows { rows, .. } => rows
509                        .into_iter()
510                        .filter_map(|mut row| {
511                            if row.is_empty() {
512                                None
513                            } else {
514                                Some(value_to_expr(row.swap_remove(0)))
515                            }
516                        })
517                        .collect(),
518                    _ => Vec::new(),
519                };
520                // WS2: byte-budget guard on the materialized IN-list.
521                self.charge_in_list(&values)?;
522                Ok(Expr::InList {
523                    expr: Box::new(inner),
524                    list: values,
525                    negated: *negated,
526                })
527            }
528            Expr::ExistsSubquery { subquery, negated } => {
529                if is_correlated_subquery(subquery, &self.catalog) {
530                    return Ok(expr.clone());
531                }
532                // Uncorrelated EXISTS: run the subquery once and collapse
533                // into a Bool literal.
534                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
535                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
536                let result = self.execute_plan(&sub_plan)?;
537                let has_rows = match result {
538                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
539                    _ => false,
540                };
541                let truth = if *negated { !has_rows } else { has_rows };
542                Ok(Expr::Literal(Literal::Bool(truth)))
543            }
544            Expr::BinaryOp(l, op, r) => {
545                let l = self.materialize_subqueries(l)?;
546                let r = self.materialize_subqueries(r)?;
547                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
548            }
549            Expr::UnaryOp(op, inner) => {
550                let inner = self.materialize_subqueries(inner)?;
551                Ok(Expr::UnaryOp(*op, Box::new(inner)))
552            }
553            Expr::Case { whens, else_expr } => {
554                let whens = whens
555                    .iter()
556                    .map(|(c, r)| {
557                        let c = self.materialize_subqueries(c)?;
558                        let r = self.materialize_subqueries(r)?;
559                        Ok((Box::new(c), Box::new(r)))
560                    })
561                    .collect::<Result<Vec<_>, QueryError>>()?;
562                let else_expr = match else_expr {
563                    Some(e) => Some(Box::new(self.materialize_subqueries(e)?)),
564                    None => None,
565                };
566                Ok(Expr::Case { whens, else_expr })
567            }
568            // Leaf nodes: no subqueries possible.
569            other => Ok(other.clone()),
570        }
571    }
572
573    /// Write-path per-row materialisation of correlated subqueries.
574    pub(super) fn materialize_correlated_for_row(
575        &mut self,
576        expr: &Expr,
577        outer_row: &[Value],
578        outer_columns: &[String],
579    ) -> Result<Expr, QueryError> {
580        match expr {
581            Expr::InSubquery {
582                expr: inner,
583                subquery,
584                negated,
585            } => {
586                let inner = self.materialize_correlated_for_row(inner, outer_row, outer_columns)?;
587                let mut sub = *subquery.clone();
588                if let Some(ref filter) = sub.filter {
589                    sub.filter = Some(substitute_outer_refs(
590                        filter,
591                        &sub.source,
592                        &self.catalog,
593                        outer_row,
594                        outer_columns,
595                    ));
596                }
597                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
598                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
599                let result = self.execute_plan(&sub_plan)?;
600                let values = match result {
601                    QueryResult::Rows { rows, .. } => rows
602                        .into_iter()
603                        .filter_map(|mut row| {
604                            if row.is_empty() {
605                                None
606                            } else {
607                                Some(value_to_expr(row.swap_remove(0)))
608                            }
609                        })
610                        .collect(),
611                    _ => Vec::new(),
612                };
613                Ok(Expr::InList {
614                    expr: Box::new(inner),
615                    list: values,
616                    negated: *negated,
617                })
618            }
619            Expr::ExistsSubquery { subquery, negated } => {
620                let mut sub = *subquery.clone();
621                if let Some(ref filter) = sub.filter {
622                    sub.filter = Some(substitute_outer_refs(
623                        filter,
624                        &sub.source,
625                        &self.catalog,
626                        outer_row,
627                        outer_columns,
628                    ));
629                }
630                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
631                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
632                let result = self.execute_plan(&sub_plan)?;
633                let has_rows = match result {
634                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
635                    _ => false,
636                };
637                let truth = if *negated { !has_rows } else { has_rows };
638                Ok(Expr::Literal(Literal::Bool(truth)))
639            }
640            Expr::BinaryOp(l, op, r) => {
641                let l = self.materialize_correlated_for_row(l, outer_row, outer_columns)?;
642                let r = self.materialize_correlated_for_row(r, outer_row, outer_columns)?;
643                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
644            }
645            Expr::UnaryOp(op, inner) => {
646                let inner = self.materialize_correlated_for_row(inner, outer_row, outer_columns)?;
647                Ok(Expr::UnaryOp(*op, Box::new(inner)))
648            }
649            other => Ok(other.clone()),
650        }
651    }
652}