powdb_query/executor/prepared.rs
1//! PreparedQuery struct and related Engine methods.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::planner;
6use crate::result::{QueryError, QueryResult};
7use powdb_storage::catalog::Catalog;
8use powdb_storage::types::*;
9
10use super::compiled::*;
11use super::eval::*;
12use super::Engine;
13
14pub struct PreparedQuery {
15 plan_template: PlanNode,
16 /// Total number of `Expr::Literal` slots reachable from the plan.
17 /// Callers must supply exactly this many literals per execution.
18 pub param_count: usize,
19 /// Fast-path metadata for `PlanNode::Insert`. `Some` when:
20 /// * the template is an Insert, and
21 /// * every assignment RHS is `Expr::Literal(_)` (no computed exprs),
22 /// which means param_count == assignments.len() and the caller's
23 /// literal slice maps 1:1 to schema column indices.
24 ///
25 /// Mission C Phase 15: upgraded from a bare `Vec<usize>` to a
26 /// dedicated [`InsertFast`] struct so the execute path can skip the
27 /// second `catalog.schema(table)` HashMap lookup just to read
28 /// `n_cols`, and can dispatch through `get_table_mut` + `tbl.insert`
29 /// instead of going via the generic `catalog.insert` wrapper.
30 insert_fast: Option<InsertFast>,
31 /// Mission C Phase 14: fast-path metadata for point updates by primary
32 /// key — `T filter .pk = <lit> update { col := <lit> }` where `pk` is
33 /// an indexed column and `col` is fixed-size and not indexed. At
34 /// execute time we skip plan clone, substitute walk, schema re-lookup,
35 /// `resolved_assignments` + `FastPatch` + `matching_rids` Vec allocs,
36 /// and the whole `PlanNode::Update` arm. Just a btree lookup and a
37 /// byte patch.
38 update_pk_fast: Option<UpdatePkFast>,
39}
40
41/// Mission C Phase 15: precomputed insert fast-path metadata. Built once
42/// in [`Engine::prepare`] from a `PlanNode::Insert` template whose every
43/// assignment RHS is a raw literal. The execute path reads `n_cols` and
44/// `col_indices` directly — no catalog schema lookup needed.
45#[derive(Clone)]
46struct InsertFast {
47 /// Mission C Phase 18: cached slot index into `Catalog::tables`.
48 /// Resolved once at `prepare` time and stable for the lifetime of
49 /// the catalog (PowDB has no DROP TABLE). Lets the hot path dispatch
50 /// through `catalog.table_by_slot_mut(slot)` — a pure Vec index,
51 /// no hash, no bucket walk, no string compare.
52 table_slot: usize,
53 /// Schema column index for each positional literal, in the order the
54 /// caller passes them.
55 col_indices: Vec<usize>,
56 /// Total number of schema columns — the size `insert_values_scratch`
57 /// must be resized to before filling positions via `col_indices`.
58 /// Cached here so the hot loop skips `catalog.schema(table)` entirely.
59 n_cols: usize,
60}
61
62/// Mission C Phase 14: precomputed fast-path for `update_by_pk` shaped
63/// prepared queries. Built once in [`Engine::prepare`] and reused on every
64/// `execute_prepared` call.
65#[derive(Clone)]
66struct UpdatePkFast {
67 /// Mission C Phase 18: cached slot index into `Catalog::tables`.
68 /// Resolved once at `prepare` time and stable for the lifetime of
69 /// the catalog. At a 52ns total budget the swap from FxHashMap
70 /// probe to a Vec index is measurable.
71 table_slot: usize,
72 /// Name of the key column (the `.id = ?` side). We look this up in
73 /// the owning table's `indexed_cols` at execute time rather than
74 /// caching a raw `&BTree` — the engine owns the catalog and can't
75 /// hand out long-lived borrows anyway, and the n≤5 linear scan is
76 /// a handful of ns.
77 key_col: String,
78 /// Byte offset of the target fixed column in the row encoding:
79 /// `2 + bitmap_size + layout.fixed_offsets[target_col]`.
80 field_off: usize,
81 /// Byte offset of the bitmap byte containing the target column's null
82 /// bit (`2 + target_col / 8`).
83 bitmap_byte_off: usize,
84 /// Bit mask for the target column's null bit.
85 bit_mask: u8,
86 /// Type of the target fixed column — drives the literal-to-bytes
87 /// encoding at execute time.
88 target_type: TypeId,
89 /// Index into the caller's `literals` slice that holds the filter key.
90 /// Always 0 today (filter literal is visited before the assignment
91 /// RHS), but stored explicitly so the contract is obvious.
92 key_literal_idx: usize,
93 /// Index into the caller's `literals` slice that holds the new value.
94 value_literal_idx: usize,
95}
96
97impl Engine {
98 pub fn prepare(&mut self, query: &str) -> Result<PreparedQuery, QueryError> {
99 let plan = planner::plan(query).map_err(|e| QueryError::Parse(e.to_string()))?;
100 let param_count = crate::plan_cache::count_literal_slots(&plan);
101
102 // Insert fast path: if the template is Insert and every assignment
103 // RHS is a literal, resolve column indices once here and store
104 // them. execute_prepared will skip the plan-clone + substitute
105 // walk on this path.
106 //
107 // Mission C Phase 15: also cache `n_cols` and the target table
108 // name so execute_prepared doesn't need a second HashMap lookup
109 // on `self.catalog.schema(table)` just to size the scratch Vec.
110 let insert_fast = match &plan {
111 // Single-row inserts only: the byte-level fast path patches one
112 // row's worth of scratch. Multi-row `insert T {..},{..}` falls
113 // through to the generic plan path (always correct).
114 PlanNode::Insert { table, rows }
115 if rows.len() == 1
116 && rows[0].iter().all(|a| matches!(a.value, Expr::Literal(_)))
117 && param_count == rows[0].len() =>
118 {
119 let assignments = &rows[0];
120 let table_slot = self
121 .catalog
122 .table_slot(table)
123 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
124 let schema = &self.catalog.table_by_slot(table_slot).schema;
125 let n_cols = schema.columns.len();
126 let indices: Result<Vec<usize>, QueryError> = assignments
127 .iter()
128 .map(|a| {
129 schema
130 .column_index(&a.field)
131 .ok_or_else(|| QueryError::ColumnNotFound {
132 table: table.clone(),
133 column: a.field.clone(),
134 })
135 })
136 .collect();
137 Some(InsertFast {
138 table_slot,
139 col_indices: indices?,
140 n_cols,
141 })
142 }
143 _ => None,
144 };
145
146 // Mission C Phase 14: update-by-pk fast path. Match on the shape
147 // planner::plan_update builds for `T filter .pk = ? update
148 // { col := ? }` — `Update { input: IndexScan(pk), assignments:
149 // [{col, Literal}] }` — and only if every precondition holds:
150 // * `pk` is an indexed column (so the executor would take the
151 // btree.lookup path at run time regardless)
152 // * there's exactly one assignment
153 // * the assigned column is fixed-size and *not* indexed (so we
154 // don't have to maintain any secondary index on write)
155 // * both literal slots are already `Expr::Literal` (no computed
156 // expressions)
157 // If any of these fail we fall through to the standard substitute
158 // + execute path.
159 let update_pk_fast = Self::try_build_update_pk_fast(&self.catalog, &plan);
160
161 Ok(PreparedQuery {
162 plan_template: plan,
163 param_count,
164 insert_fast,
165 update_pk_fast,
166 })
167 }
168
169 /// Mission C Phase 14: inspect a planned tree and, if it matches the
170 /// `update_by_pk` fast-path shape, return the precomputed byte-patch
171 /// metadata. Returns `None` on any mismatch — the caller falls through
172 /// to the substitute-and-execute path, which is always correct.
173 fn try_build_update_pk_fast(catalog: &Catalog, plan: &PlanNode) -> Option<UpdatePkFast> {
174 // Top level must be `Update { input: IndexScan(...), ... }`.
175 let (table, input, assignments) = match plan {
176 PlanNode::Update {
177 table,
178 input,
179 assignments,
180 } => (table, input.as_ref(), assignments),
181 _ => return None,
182 };
183 // Exactly one assignment — the bench hot path and the only case
184 // where a single byte-patch covers the whole mutation.
185 if assignments.len() != 1 {
186 return None;
187 }
188 let assn = &assignments[0];
189 // Assignment RHS must be a raw literal, not a computed expr.
190 if !matches!(assn.value, Expr::Literal(_)) {
191 return None;
192 }
193 // Input must be an IndexScan on the same table with a literal key.
194 let (key_col, key_table) = match input {
195 PlanNode::IndexScan {
196 table: t,
197 column,
198 key: Expr::Literal(_),
199 } => (column.clone(), t.clone()),
200 _ => return None,
201 };
202 if &key_table != table {
203 return None;
204 }
205
206 // Look up schema + index state from the live catalog, caching
207 // the slot so the execute path skips the name probe.
208 let table_slot = catalog.table_slot(table)?;
209 let tbl = catalog.table_by_slot(table_slot);
210 let schema = &tbl.schema;
211
212 // Key column must have an index (the btree.lookup path is what
213 // makes the fast path worth building).
214 if !tbl.has_index(&key_col) {
215 return None;
216 }
217
218 // Target column must exist, be fixed-size, and NOT be indexed (so
219 // we don't have to maintain any secondary index here).
220 let target_col_idx = schema.column_index(&assn.field)?;
221 let target_type = schema.columns[target_col_idx].type_id;
222 if !is_fixed_size(target_type) {
223 return None;
224 }
225 if tbl.has_indexed_col(target_col_idx) {
226 return None;
227 }
228
229 // Precompute byte offsets from the cached row layout.
230 let layout = tbl.row_layout();
231 let fixed_off = layout.fixed_offset(target_col_idx)?;
232 let bitmap_size = layout.bitmap_size();
233 let field_off = 2 + bitmap_size + fixed_off;
234 let bitmap_byte_off = 2 + target_col_idx / 8;
235 let bit_mask = 1u8 << (target_col_idx % 8);
236
237 // Literal walk order for `Update { IndexScan(key), [{value}] }`
238 // (see `plan_cache::substitute_plan` — input first, then the
239 // assignments). The filter key is literal 0, the assignment RHS
240 // is literal 1.
241 Some(UpdatePkFast {
242 table_slot,
243 key_col,
244 field_off,
245 bitmap_byte_off,
246 bit_mask,
247 target_type,
248 key_literal_idx: 0,
249 value_literal_idx: 1,
250 })
251 }
252
253 /// Execute a [`PreparedQuery`] with the given literal values.
254 ///
255 /// The literals are substituted into a clone of the template plan in
256 /// the same deterministic walk order that [`crate::canonicalize`]
257 /// produces (filter predicate first, then projection, then assignment
258 /// RHS, and so on). Substitution errors here mean the caller passed
259 /// the wrong number of literals for this query shape.
260 pub fn execute_prepared(
261 &mut self,
262 prep: &PreparedQuery,
263 literals: &[Literal],
264 ) -> Result<QueryResult, QueryError> {
265 if literals.len() != prep.param_count {
266 return Err(QueryError::Execution(format!(
267 "prepared query expects {} literal(s), got {}",
268 prep.param_count,
269 literals.len(),
270 )));
271 }
272
273 // Mission C Phase 14: update-by-pk fast path. Skip plan clone,
274 // substitute walk, resolved_assignments, FastPatch, Vec<RowId>,
275 // RowLayout::new — straight to btree.lookup_int + byte patch.
276 // On rare mismatches (wrong literal type, index dropped after
277 // prepare) the helper returns `Ok(None)` and we fall through to
278 // the generic substitute-and-execute path below.
279 if let Some(fast) = &prep.update_pk_fast {
280 if let Some(result) = self.try_execute_update_pk_fast(fast, literals)? {
281 // Mark dependent views dirty for prepared update fast path.
282 if let PlanNode::Update { table, .. } = &prep.plan_template {
283 self.view_registry.mark_dependents_dirty(table);
284 }
285 // Mission B (post-review): statement-boundary WAL group
286 // commit. The fast path appended an Update record but did
287 // not flush — flush it now so the executor's contract is
288 // "WAL is on disk before this returns".
289 self.catalog
290 .sync_wal()
291 .map_err(|e| QueryError::StorageError(e.to_string()))?;
292 return Ok(result);
293 }
294 }
295
296 // Insert fast path: skip plan-clone + substitute walk + PlanNode::Insert
297 // arm's column-index resolution. Build the Row directly from the
298 // caller's literal slice using indices we resolved at prepare time.
299 // Saves ~300-500ns per insert on the bench.
300 //
301 // Mission C Phase 13: the scratch `Vec<Value>` is reused across
302 // calls — no fresh allocation per insert. We split the borrow
303 // between `self.catalog` and `self.insert_values_scratch` by
304 // moving the scratch into a local, filling it, passing to the
305 // catalog, and putting it back.
306 //
307 // Mission C Phase 15: the cached `InsertFast` carries `n_cols`
308 // and the table name, so the hot path makes exactly one catalog
309 // HashMap lookup (`get_table_mut`) and dispatches straight into
310 // `tbl.insert` — no intermediate schema lookup, no generic
311 // `Catalog::insert` wrapper.
312 if let Some(fast) = &prep.insert_fast {
313 let mut values = std::mem::take(&mut self.insert_values_scratch);
314 values.clear();
315 values.resize(fast.n_cols, Value::Empty);
316 for (pos, lit) in literals.iter().enumerate() {
317 values[fast.col_indices[pos]] = literal_value_from(lit);
318 }
319 // Mission C Phase 18: direct O(1) slot index — no
320 // catalog hash probe. Slot was resolved at prepare time.
321 // Durability fix: route through the WAL-logging `insert_by_slot`
322 // (was the raw `Table::insert`, which bypassed the WAL and lost
323 // every prepared insert on a crash).
324 let res = self
325 .catalog
326 .insert_by_slot(fast.table_slot, &values)
327 .map_err(|e| e.to_string());
328 // Clear strings before returning the scratch — don't keep
329 // dangling allocations from the previous row alive across
330 // calls. `clear()` drops the Value::Str entries.
331 values.clear();
332 self.insert_values_scratch = values;
333 res?;
334 // Mark dependent views dirty for prepared insert fast path.
335 if let PlanNode::Insert { table, .. } = &prep.plan_template {
336 self.view_registry.mark_dependents_dirty(table);
337 }
338 // Mission B (post-review): statement-boundary WAL group commit.
339 self.catalog
340 .sync_wal()
341 .map_err(|e| QueryError::StorageError(e.to_string()))?;
342 return Ok(QueryResult::Modified(1));
343 }
344
345 let mut plan = prep.plan_template.clone();
346 let mut idx = 0usize;
347 crate::plan_cache::substitute_plan(&mut plan, literals, &mut idx);
348 debug_assert_eq!(idx, literals.len());
349 let result = self.execute_plan(&plan);
350 // Mission B (post-review): statement-boundary WAL group commit.
351 // No-op when nothing was buffered (read-only plans).
352 self.catalog
353 .sync_wal()
354 .map_err(|e| QueryError::StorageError(e.to_string()))?;
355 result
356 }
357
358 /// Mission C Phase 14: point-update fast path for prepared
359 /// `T filter .pk = ? update { col := ? }` queries. The caller has
360 /// already verified this is an int-indexed pk with a fixed-size,
361 /// non-indexed target column; all we do here is pluck the two
362 /// literals out of the caller's slice, run one `btree.lookup_int`,
363 /// and patch 1–8 bytes of the row. No plan clone, no allocations.
364 ///
365 /// Returns:
366 /// * `Ok(Some(result))` — fast path took the mutation.
367 /// * `Ok(None)` — can't take the fast path this call (wrong
368 /// literal type, index dropped since prepare, etc.). Caller
369 /// falls through to the generic substitute-and-execute path.
370 /// * `Err(_)` — real error (table gone, I/O, etc.).
371 #[inline]
372 fn try_execute_update_pk_fast(
373 &mut self,
374 fast: &UpdatePkFast,
375 literals: &[Literal],
376 ) -> Result<Option<QueryResult>, QueryError> {
377 // 1) Extract the key literal. The fast path is only built for
378 // int key columns; any other literal type means the caller
379 // is violating the prepared-query contract or the schema
380 // changed — either way, fall back.
381 let key_int = match &literals[fast.key_literal_idx] {
382 Literal::Int(v) => *v,
383 _ => return Ok(None),
384 };
385
386 // 2) Encode the new value as little-endian bytes matching the
387 // target column's fixed encoding.
388 let bytes: FixedBytes = match (fast.target_type, &literals[fast.value_literal_idx]) {
389 (TypeId::Int, Literal::Int(v)) => FixedBytes::I64(v.to_le_bytes()),
390 (TypeId::DateTime, Literal::Int(v)) => FixedBytes::I64(v.to_le_bytes()),
391 (TypeId::Float, Literal::Float(v)) => FixedBytes::F64(v.to_le_bytes()),
392 (TypeId::Bool, Literal::Bool(v)) => FixedBytes::Bool(if *v { 1 } else { 0 }),
393 // Type mismatch — fall back to the generic path for a
394 // consistent error shape.
395 _ => return Ok(None),
396 };
397
398 // 3) Look up the table + btree, do the int lookup, patch the row
399 // in place. Phase 18: table dispatch is a direct slot index;
400 // the btree lookup is the linear scan over `indexed_cols`.
401 // Single btree.lookup_int + one `with_row_bytes_mut` call.
402 // No Vec allocations at all.
403 //
404 // Mission B2: route the in-place patch through the catalog's
405 // WAL-logged wrapper so crash recovery sees the update. The
406 // extra cost is one WAL append + fsync per query — the hot
407 // loop structure is unchanged.
408 let tbl = self.catalog.table_by_slot_mut(fast.table_slot);
409 let Some(btree) = tbl.index(&fast.key_col) else {
410 // Index dropped since prepare — bail to the generic path.
411 return Ok(None);
412 };
413 let Some(rid) = btree.lookup_int(key_int) else {
414 return Ok(Some(QueryResult::Modified(0)));
415 };
416
417 let fast_table_slot = fast.table_slot;
418 let bitmap_byte_off = fast.bitmap_byte_off;
419 let bit_mask = fast.bit_mask;
420 let field_off = fast.field_off;
421 let ok = self
422 .catalog
423 .update_row_bytes_logged_by_slot(fast_table_slot, rid, |row| {
424 // Idempotent null-bit clear — safe even when the column was
425 // already non-null (the overwhelmingly common case).
426 row[bitmap_byte_off] &= !bit_mask;
427 let field_bytes = bytes.as_slice();
428 row[field_off..field_off + field_bytes.len()].copy_from_slice(field_bytes);
429 })
430 .map_err(|e| QueryError::StorageError(e.to_string()))?;
431
432 Ok(Some(QueryResult::Modified(if ok { 1 } else { 0 })))
433 }
434
435 /// Mission C Phase 13: moving variant of [`Engine::execute_prepared`]
436 /// for the insert fast path. Takes `literals` by mutable reference
437 /// so that each `Literal::String` can be consumed via `mem::take`
438 /// instead of cloned into a `Value::Str`. On `insert_batch_1k` that
439 /// removes three per-row heap allocations (name, status, email),
440 /// bringing the workload over the line vs SQLite's amortized
441 /// prepare+execute loop.
442 ///
443 /// The caller's `Literal::String` entries are replaced with empty
444 /// strings on successful inserts — the `literals` slice is *not*
445 /// left in a valid-for-reuse state except for `Int`/`Float`/`Bool`
446 /// values. Non-insert templates fall through to the standard
447 /// substitute-and-execute path.
448 pub fn execute_prepared_take(
449 &mut self,
450 prep: &PreparedQuery,
451 literals: &mut [Literal],
452 ) -> Result<QueryResult, QueryError> {
453 if literals.len() != prep.param_count {
454 return Err(QueryError::Execution(format!(
455 "prepared query expects {} literal(s), got {}",
456 prep.param_count,
457 literals.len(),
458 )));
459 }
460
461 if let Some(fast) = &prep.insert_fast {
462 let mut values = std::mem::take(&mut self.insert_values_scratch);
463 values.clear();
464 values.resize(fast.n_cols, Value::Empty);
465 for (pos, lit) in literals.iter_mut().enumerate() {
466 values[fast.col_indices[pos]] = literal_value_take(lit);
467 }
468 // Mission C Phase 18: direct O(1) slot index — see
469 // `execute_prepared` for rationale. This is the hot path
470 // for `insert_batch_1k`. Durability fix: WAL-logging
471 // `insert_by_slot` (was the raw `Table::insert`).
472 let res = self
473 .catalog
474 .insert_by_slot(fast.table_slot, &values)
475 .map_err(|e| e.to_string());
476 values.clear();
477 self.insert_values_scratch = values;
478 res?;
479 // Mission B (post-review): statement-boundary WAL group commit.
480 self.catalog
481 .sync_wal()
482 .map_err(|e| QueryError::StorageError(e.to_string()))?;
483 return Ok(QueryResult::Modified(1));
484 }
485
486 // Non-insert templates — fall back to the standard path. We
487 // can't usefully move the literals because `substitute_plan`
488 // still expects an immutable slice, and the non-insert hot
489 // paths are dominated by plan walks anyway.
490 self.execute_prepared(prep, literals)
491 }
492
493 /// Walk an expression tree and replace every `InSubquery` node with
494 /// an `InList` by executing the subquery and collecting its first
495 /// column as literal values. This must be called before entering
496 /// the row-by-row scan loop because the scan closure can't call back
497 /// into the engine.
498 pub(super) fn materialize_subqueries(&mut self, expr: &Expr) -> Result<Expr, QueryError> {
499 match expr {
500 Expr::InSubquery {
501 expr: inner,
502 subquery,
503 negated,
504 } => {
505 if is_correlated_subquery(subquery, &self.catalog) {
506 let inner = self.materialize_subqueries(inner)?;
507 return Ok(Expr::InSubquery {
508 expr: Box::new(inner),
509 subquery: subquery.clone(),
510 negated: *negated,
511 });
512 }
513 let inner = self.materialize_subqueries(inner)?;
514 // Plan and execute the subquery.
515 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
516 .map_err(|e| QueryError::StorageError(e.to_string()))?;
517 let result = self.execute_plan(&sub_plan)?;
518 let values = match result {
519 QueryResult::Rows { rows, .. } => rows
520 .into_iter()
521 .filter_map(|mut row| {
522 if row.is_empty() {
523 None
524 } else {
525 Some(value_to_expr(row.swap_remove(0)))
526 }
527 })
528 .collect(),
529 _ => Vec::new(),
530 };
531 // WS2: byte-budget guard on the materialized IN-list.
532 self.charge_in_list(&values)?;
533 Ok(Expr::InList {
534 expr: Box::new(inner),
535 list: values,
536 negated: *negated,
537 })
538 }
539 Expr::ExistsSubquery { subquery, negated } => {
540 if is_correlated_subquery(subquery, &self.catalog) {
541 return Ok(expr.clone());
542 }
543 // Uncorrelated EXISTS: run the subquery once and collapse
544 // into a Bool literal.
545 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
546 .map_err(|e| QueryError::StorageError(e.to_string()))?;
547 let result = self.execute_plan(&sub_plan)?;
548 let has_rows = match result {
549 QueryResult::Rows { rows, .. } => !rows.is_empty(),
550 _ => false,
551 };
552 let truth = if *negated { !has_rows } else { has_rows };
553 Ok(Expr::Literal(Literal::Bool(truth)))
554 }
555 Expr::BinaryOp(l, op, r) => {
556 let l = self.materialize_subqueries(l)?;
557 let r = self.materialize_subqueries(r)?;
558 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
559 }
560 Expr::UnaryOp(op, inner) => {
561 let inner = self.materialize_subqueries(inner)?;
562 Ok(Expr::UnaryOp(*op, Box::new(inner)))
563 }
564 Expr::Case { whens, else_expr } => {
565 let whens = whens
566 .iter()
567 .map(|(c, r)| {
568 let c = self.materialize_subqueries(c)?;
569 let r = self.materialize_subqueries(r)?;
570 Ok((Box::new(c), Box::new(r)))
571 })
572 .collect::<Result<Vec<_>, QueryError>>()?;
573 let else_expr = match else_expr {
574 Some(e) => Some(Box::new(self.materialize_subqueries(e)?)),
575 None => None,
576 };
577 Ok(Expr::Case { whens, else_expr })
578 }
579 // Leaf nodes: no subqueries possible.
580 other => Ok(other.clone()),
581 }
582 }
583
584 /// Write-path per-row materialisation of correlated subqueries.
585 pub(super) fn materialize_correlated_for_row(
586 &mut self,
587 expr: &Expr,
588 outer_row: &[Value],
589 outer_columns: &[String],
590 ) -> Result<Expr, QueryError> {
591 match expr {
592 Expr::InSubquery {
593 expr: inner,
594 subquery,
595 negated,
596 } => {
597 let inner = self.materialize_correlated_for_row(inner, outer_row, outer_columns)?;
598 let mut sub = *subquery.clone();
599 if let Some(ref filter) = sub.filter {
600 sub.filter = Some(substitute_outer_refs(
601 filter,
602 &sub.source,
603 &self.catalog,
604 outer_row,
605 outer_columns,
606 ));
607 }
608 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
609 .map_err(|e| QueryError::StorageError(e.to_string()))?;
610 let result = self.execute_plan(&sub_plan)?;
611 let values = match result {
612 QueryResult::Rows { rows, .. } => rows
613 .into_iter()
614 .filter_map(|mut row| {
615 if row.is_empty() {
616 None
617 } else {
618 Some(value_to_expr(row.swap_remove(0)))
619 }
620 })
621 .collect(),
622 _ => Vec::new(),
623 };
624 Ok(Expr::InList {
625 expr: Box::new(inner),
626 list: values,
627 negated: *negated,
628 })
629 }
630 Expr::ExistsSubquery { subquery, negated } => {
631 let mut sub = *subquery.clone();
632 if let Some(ref filter) = sub.filter {
633 sub.filter = Some(substitute_outer_refs(
634 filter,
635 &sub.source,
636 &self.catalog,
637 outer_row,
638 outer_columns,
639 ));
640 }
641 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
642 .map_err(|e| QueryError::StorageError(e.to_string()))?;
643 let result = self.execute_plan(&sub_plan)?;
644 let has_rows = match result {
645 QueryResult::Rows { rows, .. } => !rows.is_empty(),
646 _ => false,
647 };
648 let truth = if *negated { !has_rows } else { has_rows };
649 Ok(Expr::Literal(Literal::Bool(truth)))
650 }
651 Expr::BinaryOp(l, op, r) => {
652 let l = self.materialize_correlated_for_row(l, outer_row, outer_columns)?;
653 let r = self.materialize_correlated_for_row(r, outer_row, outer_columns)?;
654 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
655 }
656 Expr::UnaryOp(op, inner) => {
657 let inner = self.materialize_correlated_for_row(inner, outer_row, outer_columns)?;
658 Ok(Expr::UnaryOp(*op, Box::new(inner)))
659 }
660 other => Ok(other.clone()),
661 }
662 }
663}