powdb_query/executor/prepared.rs
1//! PreparedQuery struct and related Engine methods.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::planner;
6use crate::result::{QueryError, QueryResult};
7use powdb_storage::catalog::Catalog;
8use powdb_storage::row::{ROW_MAGIC, ROW_PREFIX_SIZE};
9use powdb_storage::types::*;
10
11use super::compiled::*;
12use super::eval::*;
13use super::Engine;
14
/// A query that has been planned once by [`Engine::prepare`] and can be
/// executed repeatedly with fresh literal values via
/// [`Engine::execute_prepared`] or [`Engine::execute_prepared_take`].
pub struct PreparedQuery {
    /// Plan produced by the planner at prepare time. The generic execute
    /// path clones it and substitutes the caller's literals into the clone;
    /// the fast paths only consult it to find the target table name when
    /// marking dependent views dirty.
    plan_template: PlanNode,
    /// Total number of `Expr::Literal` slots reachable from the plan.
    /// Callers must supply exactly this many literals per execution.
    pub param_count: usize,
    /// Fast-path metadata for `PlanNode::Insert`. `Some` when:
    ///   * the template is a single-row Insert, and
    ///   * every assignment RHS is `Expr::Literal(_)` (no computed exprs),
    ///     which means param_count == assignments.len() and the caller's
    ///     literal slice maps 1:1 to schema column indices.
    ///
    /// Mission C Phase 15: upgraded from a bare `Vec<usize>` to a
    /// dedicated [`InsertFast`] struct so the execute path can read the
    /// cached column count instead of doing a second schema lookup.
    /// Since Phase 18 the struct also carries the table's catalog slot,
    /// and the execute path dispatches through `catalog.insert_by_slot`.
    insert_fast: Option<InsertFast>,
    /// Mission C Phase 14: fast-path metadata for point updates by primary
    /// key — `T filter .pk = <lit> update { col := <lit> }` where `pk` is
    /// an indexed column and `col` is fixed-size and not indexed. At
    /// execute time we skip plan clone, substitute walk, schema re-lookup,
    /// `resolved_assignments` + `FastPatch` + `matching_rids` Vec allocs,
    /// and the whole `PlanNode::Update` arm. Just a btree lookup and a
    /// byte patch.
    update_pk_fast: Option<UpdatePkFast>,
}
41
/// Mission C Phase 15: precomputed insert fast-path metadata. Built once
/// in [`Engine::prepare`] from a single-row `PlanNode::Insert` template
/// whose every assignment RHS is a raw literal. The execute path reads
/// `table_slot`, `n_cols` and `col_indices` directly — no catalog name
/// or schema lookup needed.
#[derive(Clone)]
struct InsertFast {
    /// Mission C Phase 18: cached slot index into `Catalog::tables`.
    /// Resolved once at `prepare` time and stable for the lifetime of
    /// the catalog (PowDB has no DROP TABLE). Lets the hot path address
    /// the table by slot — a pure Vec index, no hash, no bucket walk,
    /// no string compare.
    table_slot: usize,
    /// Schema column index for each positional literal, in the order the
    /// caller passes them. Has exactly one entry per assignment, i.e.
    /// `param_count` entries.
    col_indices: Vec<usize>,
    /// Total number of schema columns — the size `insert_values_scratch`
    /// must be resized to before filling positions via `col_indices`.
    /// Cached here so the hot loop skips the schema lookup entirely.
    /// NOTE(review): this is a prepare-time snapshot; confirm the schema
    /// cannot gain columns while a prepared query is still alive.
    n_cols: usize,
}
62
/// Mission C Phase 14: precomputed fast-path for `update_by_pk` shaped
/// prepared queries. Built once in [`Engine::prepare`] and reused on every
/// `execute_prepared` call.
#[derive(Clone)]
struct UpdatePkFast {
    /// Mission C Phase 18: cached slot index into `Catalog::tables`.
    /// Resolved once at `prepare` time and stable for the lifetime of
    /// the catalog. At a 52ns total budget the swap from FxHashMap
    /// probe to a Vec index is measurable.
    table_slot: usize,
    /// Name of the key column (the `.id = ?` side). The index is looked
    /// up by this name on the owning table at execute time rather than
    /// caching a raw `&BTree` — the engine owns the catalog and can't
    /// hand out long-lived borrows anyway. If the index is no longer
    /// there, the execute path falls back to the generic plan.
    key_col: String,
    /// Byte offset of the target fixed column in the row encoding:
    /// `2 + bitmap_size + fixed_offset(target_col)`. Relative to the
    /// start of the row body: the execute path adds `ROW_PREFIX_SIZE`
    /// when the stored row begins with `ROW_MAGIC`.
    field_off: usize,
    /// Byte offset of the bitmap byte containing the target column's null
    /// bit (`2 + target_col / 8`). Subject to the same row-prefix
    /// adjustment as `field_off`.
    bitmap_byte_off: usize,
    /// Bit mask for the target column's null bit (`1 << (target_col % 8)`).
    bit_mask: u8,
    /// Type of the target fixed column — drives the literal-to-bytes
    /// encoding at execute time.
    target_type: TypeId,
    /// Index into the caller's `literals` slice that holds the filter key.
    /// Always 0 today (filter literal is visited before the assignment
    /// RHS), but stored explicitly so the contract is obvious.
    key_literal_idx: usize,
    /// Index into the caller's `literals` slice that holds the new value.
    /// Always 1 today, for the same walk-order reason.
    value_literal_idx: usize,
}
97
98impl Engine {
99 pub fn prepare(&mut self, query: &str) -> Result<PreparedQuery, QueryError> {
100 let plan = planner::plan(query).map_err(|e| QueryError::Parse(e.to_string()))?;
101 let param_count = crate::plan_cache::count_literal_slots(&plan);
102
103 // Insert fast path: if the template is Insert and every assignment
104 // RHS is a literal, resolve column indices once here and store
105 // them. execute_prepared will skip the plan-clone + substitute
106 // walk on this path.
107 //
108 // Mission C Phase 15: also cache `n_cols` and the target table
109 // name so execute_prepared doesn't need a second HashMap lookup
110 // on `self.catalog.schema(table)` just to size the scratch Vec.
111 let insert_fast = match &plan {
112 // Single-row inserts only: the byte-level fast path patches one
113 // row's worth of scratch. Multi-row `insert T {..},{..}` falls
114 // through to the generic plan path (always correct).
115 PlanNode::Insert { table, rows }
116 if rows.len() == 1
117 && rows[0].iter().all(|a| matches!(a.value, Expr::Literal(_)))
118 && param_count == rows[0].len() =>
119 {
120 let assignments = &rows[0];
121 let table_slot = self
122 .catalog
123 .table_slot(table)
124 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
125 let schema = &self.catalog.table_by_slot(table_slot).schema;
126 let n_cols = schema.columns.len();
127 let indices: Result<Vec<usize>, QueryError> = assignments
128 .iter()
129 .map(|a| {
130 schema
131 .column_index(&a.field)
132 .ok_or_else(|| QueryError::ColumnNotFound {
133 table: table.clone(),
134 column: a.field.clone(),
135 })
136 })
137 .collect();
138 Some(InsertFast {
139 table_slot,
140 col_indices: indices?,
141 n_cols,
142 })
143 }
144 _ => None,
145 };
146
147 // Mission C Phase 14: update-by-pk fast path. Match on the shape
148 // planner::plan_update builds for `T filter .pk = ? update
149 // { col := ? }` — `Update { input: IndexScan(pk), assignments:
150 // [{col, Literal}] }` — and only if every precondition holds:
151 // * `pk` is an indexed column (so the executor would take the
152 // btree.lookup path at run time regardless)
153 // * there's exactly one assignment
154 // * the assigned column is fixed-size and *not* indexed (so we
155 // don't have to maintain any secondary index on write)
156 // * both literal slots are already `Expr::Literal` (no computed
157 // expressions)
158 // If any of these fail we fall through to the standard substitute
159 // + execute path.
160 let update_pk_fast = Self::try_build_update_pk_fast(&self.catalog, &plan);
161
162 Ok(PreparedQuery {
163 plan_template: plan,
164 param_count,
165 insert_fast,
166 update_pk_fast,
167 })
168 }
169
170 /// Mission C Phase 14: inspect a planned tree and, if it matches the
171 /// `update_by_pk` fast-path shape, return the precomputed byte-patch
172 /// metadata. Returns `None` on any mismatch — the caller falls through
173 /// to the substitute-and-execute path, which is always correct.
174 fn try_build_update_pk_fast(catalog: &Catalog, plan: &PlanNode) -> Option<UpdatePkFast> {
175 // Top level must be `Update { input: IndexScan(...), ... }`.
176 let (table, input, assignments) = match plan {
177 PlanNode::Update {
178 table,
179 input,
180 assignments,
181 } => (table, input.as_ref(), assignments),
182 _ => return None,
183 };
184 // Exactly one assignment — the bench hot path and the only case
185 // where a single byte-patch covers the whole mutation.
186 if assignments.len() != 1 {
187 return None;
188 }
189 let assn = &assignments[0];
190 // Assignment RHS must be a raw literal, not a computed expr.
191 if !matches!(assn.value, Expr::Literal(_)) {
192 return None;
193 }
194 // Input must be an IndexScan on the same table with a literal key.
195 let (key_col, key_table) = match input {
196 PlanNode::IndexScan {
197 table: t,
198 column,
199 key: Expr::Literal(_),
200 } => (column.clone(), t.clone()),
201 _ => return None,
202 };
203 if &key_table != table {
204 return None;
205 }
206
207 // Look up schema + index state from the live catalog, caching
208 // the slot so the execute path skips the name probe.
209 let table_slot = catalog.table_slot(table)?;
210 let tbl = catalog.table_by_slot(table_slot);
211 let schema = &tbl.schema;
212
213 // Key column must have an index (the btree.lookup path is what
214 // makes the fast path worth building).
215 if !tbl.has_index(&key_col) {
216 return None;
217 }
218
219 // Target column must exist, be fixed-size, and NOT be indexed (so
220 // we don't have to maintain any secondary index here).
221 let target_col_idx = schema.column_index(&assn.field)?;
222 let target_type = schema.columns[target_col_idx].type_id;
223 if !is_fixed_size(target_type) {
224 return None;
225 }
226 if tbl.has_indexed_col(target_col_idx) {
227 return None;
228 }
229
230 // Precompute byte offsets from the cached row layout.
231 let layout = tbl.row_layout();
232 let fixed_off = layout.fixed_offset(target_col_idx)?;
233 let bitmap_size = layout.bitmap_size();
234 let field_off = 2 + bitmap_size + fixed_off;
235 let bitmap_byte_off = 2 + target_col_idx / 8;
236 let bit_mask = 1u8 << (target_col_idx % 8);
237
238 // Literal walk order for `Update { IndexScan(key), [{value}] }`
239 // (see `plan_cache::substitute_plan` — input first, then the
240 // assignments). The filter key is literal 0, the assignment RHS
241 // is literal 1.
242 Some(UpdatePkFast {
243 table_slot,
244 key_col,
245 field_off,
246 bitmap_byte_off,
247 bit_mask,
248 target_type,
249 key_literal_idx: 0,
250 value_literal_idx: 1,
251 })
252 }
253
254 /// Execute a [`PreparedQuery`] with the given literal values.
255 ///
256 /// The literals are substituted into a clone of the template plan in
257 /// the same deterministic walk order that [`crate::canonicalize`]
258 /// produces (filter predicate first, then projection, then assignment
259 /// RHS, and so on). Substitution errors here mean the caller passed
260 /// the wrong number of literals for this query shape.
261 pub fn execute_prepared(
262 &mut self,
263 prep: &PreparedQuery,
264 literals: &[Literal],
265 ) -> Result<QueryResult, QueryError> {
266 if literals.len() != prep.param_count {
267 return Err(QueryError::Execution(format!(
268 "prepared query expects {} literal(s), got {}",
269 prep.param_count,
270 literals.len(),
271 )));
272 }
273
274 // Mission C Phase 14: update-by-pk fast path. Skip plan clone,
275 // substitute walk, resolved_assignments, FastPatch, Vec<RowId>,
276 // RowLayout::new — straight to btree.lookup_int + byte patch.
277 // On rare mismatches (wrong literal type, index dropped after
278 // prepare) the helper returns `Ok(None)` and we fall through to
279 // the generic substitute-and-execute path below.
280 if let Some(fast) = &prep.update_pk_fast {
281 if let Some(result) = self.try_execute_update_pk_fast(fast, literals)? {
282 // Mark dependent views dirty for prepared update fast path.
283 if let PlanNode::Update { table, .. } = &prep.plan_template {
284 self.view_registry.mark_dependents_dirty(table);
285 }
286 // Mission B (post-review): statement-boundary WAL group
287 // commit. The fast path appended an Update record but did
288 // not flush — flush it now so the executor's contract is
289 // "WAL is on disk before this returns".
290 self.catalog
291 .commit_autocommit()
292 .map_err(|e| QueryError::StorageError(e.to_string()))?;
293 return Ok(result);
294 }
295 }
296
297 // Insert fast path: skip plan-clone + substitute walk + PlanNode::Insert
298 // arm's column-index resolution. Build the Row directly from the
299 // caller's literal slice using indices we resolved at prepare time.
300 // Saves ~300-500ns per insert on the bench.
301 //
302 // Mission C Phase 13: the scratch `Vec<Value>` is reused across
303 // calls — no fresh allocation per insert. We split the borrow
304 // between `self.catalog` and `self.insert_values_scratch` by
305 // moving the scratch into a local, filling it, passing to the
306 // catalog, and putting it back.
307 //
308 // Mission C Phase 15: the cached `InsertFast` carries `n_cols`
309 // and the table name, so the hot path makes exactly one catalog
310 // HashMap lookup (`get_table_mut`) and dispatches straight into
311 // `tbl.insert` — no intermediate schema lookup, no generic
312 // `Catalog::insert` wrapper.
313 if let Some(fast) = &prep.insert_fast {
314 let mut values = std::mem::take(&mut self.insert_values_scratch);
315 values.clear();
316 values.resize(fast.n_cols, Value::Empty);
317 for (pos, lit) in literals.iter().enumerate() {
318 values[fast.col_indices[pos]] = literal_value_from(lit);
319 }
320 // Mission C Phase 18: direct O(1) slot index — no
321 // catalog hash probe. Slot was resolved at prepare time.
322 // Durability fix: route through the WAL-logging `insert_by_slot`
323 // (was the raw `Table::insert`, which bypassed the WAL and lost
324 // every prepared insert on a crash).
325 let res = self
326 .catalog
327 .insert_by_slot(fast.table_slot, &values)
328 .map_err(|e| e.to_string());
329 // Clear strings before returning the scratch — don't keep
330 // dangling allocations from the previous row alive across
331 // calls. `clear()` drops the Value::Str entries.
332 values.clear();
333 self.insert_values_scratch = values;
334 res?;
335 // Mark dependent views dirty for prepared insert fast path.
336 if let PlanNode::Insert { table, .. } = &prep.plan_template {
337 self.view_registry.mark_dependents_dirty(table);
338 }
339 // Mission B (post-review): statement-boundary WAL group commit.
340 self.catalog
341 .commit_autocommit()
342 .map_err(|e| QueryError::StorageError(e.to_string()))?;
343 return Ok(QueryResult::Modified(1));
344 }
345
346 let mut plan = prep.plan_template.clone();
347 let mut idx = 0usize;
348 crate::plan_cache::substitute_plan(&mut plan, literals, &mut idx);
349 debug_assert_eq!(idx, literals.len());
350 let result = self.execute_plan(&plan);
351 // Mission B (post-review): statement-boundary WAL group commit.
352 // No-op when nothing was buffered (read-only plans).
353 self.catalog
354 .commit_autocommit()
355 .map_err(|e| QueryError::StorageError(e.to_string()))?;
356 result
357 }
358
359 /// Mission C Phase 14: point-update fast path for prepared
360 /// `T filter .pk = ? update { col := ? }` queries. The caller has
361 /// already verified this is an int-indexed pk with a fixed-size,
362 /// non-indexed target column; all we do here is pluck the two
363 /// literals out of the caller's slice, run one `btree.lookup_int`,
364 /// and patch 1–8 bytes of the row. No plan clone, no allocations.
365 ///
366 /// Returns:
367 /// * `Ok(Some(result))` — fast path took the mutation.
368 /// * `Ok(None)` — can't take the fast path this call (wrong
369 /// literal type, index dropped since prepare, etc.). Caller
370 /// falls through to the generic substitute-and-execute path.
371 /// * `Err(_)` — real error (table gone, I/O, etc.).
372 #[inline]
373 fn try_execute_update_pk_fast(
374 &mut self,
375 fast: &UpdatePkFast,
376 literals: &[Literal],
377 ) -> Result<Option<QueryResult>, QueryError> {
378 // 1) Extract the key literal. The fast path is only built for
379 // int key columns; any other literal type means the caller
380 // is violating the prepared-query contract or the schema
381 // changed — either way, fall back.
382 let key_int = match &literals[fast.key_literal_idx] {
383 Literal::Int(v) => *v,
384 _ => return Ok(None),
385 };
386
387 // 2) Encode the new value as little-endian bytes matching the
388 // target column's fixed encoding.
389 let bytes: FixedBytes = match (fast.target_type, &literals[fast.value_literal_idx]) {
390 (TypeId::Int, Literal::Int(v)) => FixedBytes::I64(v.to_le_bytes()),
391 (TypeId::DateTime, Literal::Int(v)) => FixedBytes::I64(v.to_le_bytes()),
392 (TypeId::Float, Literal::Float(v)) => FixedBytes::F64(v.to_le_bytes()),
393 (TypeId::Bool, Literal::Bool(v)) => FixedBytes::Bool(if *v { 1 } else { 0 }),
394 // Type mismatch — fall back to the generic path for a
395 // consistent error shape.
396 _ => return Ok(None),
397 };
398
399 // 3) Look up the table + btree, do the int lookup, patch the row
400 // in place. Phase 18: table dispatch is a direct slot index;
401 // the btree lookup is the linear scan over `indexed_cols`.
402 // Single btree.lookup_int + one `with_row_bytes_mut` call.
403 // No Vec allocations at all.
404 //
405 // Mission B2: route the in-place patch through the catalog's
406 // WAL-logged wrapper so crash recovery sees the update. The
407 // extra cost is one WAL append + fsync per query — the hot
408 // loop structure is unchanged.
409 let tbl = self.catalog.table_by_slot_mut(fast.table_slot);
410 let Some(btree) = tbl.index(&fast.key_col) else {
411 // Index dropped since prepare — bail to the generic path.
412 return Ok(None);
413 };
414 let Some(rid) = btree.lookup_int(key_int) else {
415 return Ok(Some(QueryResult::Modified(0)));
416 };
417
418 let fast_table_slot = fast.table_slot;
419 let bitmap_byte_off = fast.bitmap_byte_off;
420 let bit_mask = fast.bit_mask;
421 let field_off = fast.field_off;
422 let ok = self
423 .catalog
424 .update_row_bytes_logged_by_slot(fast_table_slot, rid, |row| {
425 let base = if row.len() >= ROW_PREFIX_SIZE && &row[0..4] == ROW_MAGIC {
426 ROW_PREFIX_SIZE
427 } else {
428 0
429 };
430 // Idempotent null-bit clear — safe even when the column was
431 // already non-null (the overwhelmingly common case).
432 row[base + bitmap_byte_off] &= !bit_mask;
433 let field_bytes = bytes.as_slice();
434 row[base + field_off..base + field_off + field_bytes.len()]
435 .copy_from_slice(field_bytes);
436 })
437 .map_err(|e| QueryError::StorageError(e.to_string()))?;
438
439 Ok(Some(QueryResult::Modified(if ok { 1 } else { 0 })))
440 }
441
442 /// Mission C Phase 13: moving variant of [`Engine::execute_prepared`]
443 /// for the insert fast path. Takes `literals` by mutable reference
444 /// so that each `Literal::String` can be consumed via `mem::take`
445 /// instead of cloned into a `Value::Str`. On `insert_batch_1k` that
446 /// removes three per-row heap allocations (name, status, email),
447 /// bringing the workload over the line vs SQLite's amortized
448 /// prepare+execute loop.
449 ///
450 /// The caller's `Literal::String` entries are replaced with empty
451 /// strings on successful inserts — the `literals` slice is *not*
452 /// left in a valid-for-reuse state except for `Int`/`Float`/`Bool`
453 /// values. Non-insert templates fall through to the standard
454 /// substitute-and-execute path.
455 pub fn execute_prepared_take(
456 &mut self,
457 prep: &PreparedQuery,
458 literals: &mut [Literal],
459 ) -> Result<QueryResult, QueryError> {
460 if literals.len() != prep.param_count {
461 return Err(QueryError::Execution(format!(
462 "prepared query expects {} literal(s), got {}",
463 prep.param_count,
464 literals.len(),
465 )));
466 }
467
468 if let Some(fast) = &prep.insert_fast {
469 let mut values = std::mem::take(&mut self.insert_values_scratch);
470 values.clear();
471 values.resize(fast.n_cols, Value::Empty);
472 for (pos, lit) in literals.iter_mut().enumerate() {
473 values[fast.col_indices[pos]] = literal_value_take(lit);
474 }
475 // Mission C Phase 18: direct O(1) slot index — see
476 // `execute_prepared` for rationale. This is the hot path
477 // for `insert_batch_1k`. Durability fix: WAL-logging
478 // `insert_by_slot` (was the raw `Table::insert`).
479 let res = self
480 .catalog
481 .insert_by_slot(fast.table_slot, &values)
482 .map_err(|e| e.to_string());
483 values.clear();
484 self.insert_values_scratch = values;
485 res?;
486 // Mission B (post-review): statement-boundary WAL group commit.
487 self.catalog
488 .commit_autocommit()
489 .map_err(|e| QueryError::StorageError(e.to_string()))?;
490 return Ok(QueryResult::Modified(1));
491 }
492
493 // Non-insert templates — fall back to the standard path. We
494 // can't usefully move the literals because `substitute_plan`
495 // still expects an immutable slice, and the non-insert hot
496 // paths are dominated by plan walks anyway.
497 self.execute_prepared(prep, literals)
498 }
499
500 /// Walk an expression tree and replace every `InSubquery` node with
501 /// an `InList` by executing the subquery and collecting its first
502 /// column as literal values. This must be called before entering
503 /// the row-by-row scan loop because the scan closure can't call back
504 /// into the engine.
505 pub(super) fn materialize_subqueries(&mut self, expr: &Expr) -> Result<Expr, QueryError> {
506 match expr {
507 Expr::InSubquery {
508 expr: inner,
509 subquery,
510 negated,
511 } => {
512 if is_correlated_subquery(subquery, &self.catalog) {
513 let inner = self.materialize_subqueries(inner)?;
514 return Ok(Expr::InSubquery {
515 expr: Box::new(inner),
516 subquery: subquery.clone(),
517 negated: *negated,
518 });
519 }
520 let inner = self.materialize_subqueries(inner)?;
521 // Plan and execute the subquery.
522 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
523 .map_err(|e| QueryError::StorageError(e.to_string()))?;
524 let result = self.execute_plan(&sub_plan)?;
525 let values = match result {
526 QueryResult::Rows { rows, .. } => rows
527 .into_iter()
528 .filter_map(|mut row| {
529 if row.is_empty() {
530 None
531 } else {
532 Some(value_to_expr(row.swap_remove(0)))
533 }
534 })
535 .collect(),
536 _ => Vec::new(),
537 };
538 // WS2: byte-budget guard on the materialized IN-list.
539 self.charge_in_list(&values)?;
540 Ok(Expr::InList {
541 expr: Box::new(inner),
542 list: values,
543 negated: *negated,
544 })
545 }
546 Expr::ExistsSubquery { subquery, negated } => {
547 if is_correlated_subquery(subquery, &self.catalog) {
548 return Ok(expr.clone());
549 }
550 // Uncorrelated EXISTS: run the subquery once and collapse
551 // into a Bool literal.
552 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
553 .map_err(|e| QueryError::StorageError(e.to_string()))?;
554 let result = self.execute_plan(&sub_plan)?;
555 let has_rows = match result {
556 QueryResult::Rows { rows, .. } => !rows.is_empty(),
557 _ => false,
558 };
559 let truth = if *negated { !has_rows } else { has_rows };
560 Ok(Expr::Literal(Literal::Bool(truth)))
561 }
562 Expr::BinaryOp(l, op, r) => {
563 let l = self.materialize_subqueries(l)?;
564 let r = self.materialize_subqueries(r)?;
565 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
566 }
567 Expr::UnaryOp(op, inner) => {
568 let inner = self.materialize_subqueries(inner)?;
569 Ok(Expr::UnaryOp(*op, Box::new(inner)))
570 }
571 Expr::Case { whens, else_expr } => {
572 let whens = whens
573 .iter()
574 .map(|(c, r)| {
575 let c = self.materialize_subqueries(c)?;
576 let r = self.materialize_subqueries(r)?;
577 Ok((Box::new(c), Box::new(r)))
578 })
579 .collect::<Result<Vec<_>, QueryError>>()?;
580 let else_expr = match else_expr {
581 Some(e) => Some(Box::new(self.materialize_subqueries(e)?)),
582 None => None,
583 };
584 Ok(Expr::Case { whens, else_expr })
585 }
586 // Leaf nodes: no subqueries possible.
587 other => Ok(other.clone()),
588 }
589 }
590
591 /// Write-path per-row materialisation of correlated subqueries.
592 pub(super) fn materialize_correlated_for_row(
593 &mut self,
594 expr: &Expr,
595 outer_row: &[Value],
596 outer_columns: &[String],
597 ) -> Result<Expr, QueryError> {
598 match expr {
599 Expr::InSubquery {
600 expr: inner,
601 subquery,
602 negated,
603 } => {
604 let inner = self.materialize_correlated_for_row(inner, outer_row, outer_columns)?;
605 let mut sub = *subquery.clone();
606 if let Some(ref filter) = sub.filter {
607 sub.filter = Some(substitute_outer_refs(
608 filter,
609 &sub.source,
610 &self.catalog,
611 outer_row,
612 outer_columns,
613 ));
614 }
615 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
616 .map_err(|e| QueryError::StorageError(e.to_string()))?;
617 let result = self.execute_plan(&sub_plan)?;
618 let values = match result {
619 QueryResult::Rows { rows, .. } => rows
620 .into_iter()
621 .filter_map(|mut row| {
622 if row.is_empty() {
623 None
624 } else {
625 Some(value_to_expr(row.swap_remove(0)))
626 }
627 })
628 .collect(),
629 _ => Vec::new(),
630 };
631 Ok(Expr::InList {
632 expr: Box::new(inner),
633 list: values,
634 negated: *negated,
635 })
636 }
637 Expr::ExistsSubquery { subquery, negated } => {
638 let mut sub = *subquery.clone();
639 if let Some(ref filter) = sub.filter {
640 sub.filter = Some(substitute_outer_refs(
641 filter,
642 &sub.source,
643 &self.catalog,
644 outer_row,
645 outer_columns,
646 ));
647 }
648 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
649 .map_err(|e| QueryError::StorageError(e.to_string()))?;
650 let result = self.execute_plan(&sub_plan)?;
651 let has_rows = match result {
652 QueryResult::Rows { rows, .. } => !rows.is_empty(),
653 _ => false,
654 };
655 let truth = if *negated { !has_rows } else { has_rows };
656 Ok(Expr::Literal(Literal::Bool(truth)))
657 }
658 Expr::BinaryOp(l, op, r) => {
659 let l = self.materialize_correlated_for_row(l, outer_row, outer_columns)?;
660 let r = self.materialize_correlated_for_row(r, outer_row, outer_columns)?;
661 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
662 }
663 Expr::UnaryOp(op, inner) => {
664 let inner = self.materialize_correlated_for_row(inner, outer_row, outer_columns)?;
665 Ok(Expr::UnaryOp(*op, Box::new(inner)))
666 }
667 other => Ok(other.clone()),
668 }
669 }
670}