powdb_query/executor/prepared.rs
1//! PreparedQuery struct and related Engine methods.
2
3use crate::ast::*;
4use crate::plan::*;
5use crate::planner;
6use crate::result::{QueryError, QueryResult};
7use powdb_storage::catalog::Catalog;
8use powdb_storage::types::*;
9
10use super::compiled::*;
11use super::eval::*;
12use super::Engine;
13
14pub struct PreparedQuery {
15 plan_template: PlanNode,
16 /// Total number of `Expr::Literal` slots reachable from the plan.
17 /// Callers must supply exactly this many literals per execution.
18 pub param_count: usize,
19 /// Fast-path metadata for `PlanNode::Insert`. `Some` when:
20 /// * the template is an Insert, and
21 /// * every assignment RHS is `Expr::Literal(_)` (no computed exprs),
22 /// which means param_count == assignments.len() and the caller's
23 /// literal slice maps 1:1 to schema column indices.
24 ///
25 /// Mission C Phase 15: upgraded from a bare `Vec<usize>` to a
26 /// dedicated [`InsertFast`] struct so the execute path can skip the
27 /// second `catalog.schema(table)` HashMap lookup just to read
28 /// `n_cols`, and can dispatch through `get_table_mut` + `tbl.insert`
29 /// instead of going via the generic `catalog.insert` wrapper.
30 insert_fast: Option<InsertFast>,
31 /// Mission C Phase 14: fast-path metadata for point updates by primary
32 /// key — `T filter .pk = <lit> update { col := <lit> }` where `pk` is
33 /// an indexed column and `col` is fixed-size and not indexed. At
34 /// execute time we skip plan clone, substitute walk, schema re-lookup,
35 /// `resolved_assignments` + `FastPatch` + `matching_rids` Vec allocs,
36 /// and the whole `PlanNode::Update` arm. Just a btree lookup and a
37 /// byte patch.
38 update_pk_fast: Option<UpdatePkFast>,
39}
40
/// Insert fast-path metadata, computed once in [`Engine::prepare`] from an
/// insert template whose assignments are all plain literals. With this in
/// hand the execute path needs neither a table-name lookup nor a schema
/// lookup: it reads the slot, the column count and the column mapping
/// straight from here.
#[derive(Clone)]
struct InsertFast {
    /// Catalog slot of the target table, resolved by name at prepare time.
    /// The execute path hands it to `table_by_slot_mut` directly. The value
    /// is assumed to stay valid for as long as the catalog lives (the
    /// original author notes that PowDB has no DROP TABLE).
    table_slot: usize,
    /// For each positional literal (in caller order), the schema column
    /// index that literal is written to.
    col_indices: Vec<usize>,
    /// Number of columns in the table schema. The scratch value buffer is
    /// resized to this length before the literals are placed into it.
    n_cols: usize,
}
61
62/// Mission C Phase 14: precomputed fast-path for `update_by_pk` shaped
63/// prepared queries. Built once in [`Engine::prepare`] and reused on every
64/// `execute_prepared` call.
65#[derive(Clone)]
66struct UpdatePkFast {
67 /// Mission C Phase 18: cached slot index into `Catalog::tables`.
68 /// Resolved once at `prepare` time and stable for the lifetime of
69 /// the catalog. At a 52ns total budget the swap from FxHashMap
70 /// probe to a Vec index is measurable.
71 table_slot: usize,
72 /// Name of the key column (the `.id = ?` side). We look this up in
73 /// the owning table's `indexed_cols` at execute time rather than
74 /// caching a raw `&BTree` — the engine owns the catalog and can't
75 /// hand out long-lived borrows anyway, and the n≤5 linear scan is
76 /// a handful of ns.
77 key_col: String,
78 /// Byte offset of the target fixed column in the row encoding:
79 /// `2 + bitmap_size + layout.fixed_offsets[target_col]`.
80 field_off: usize,
81 /// Byte offset of the bitmap byte containing the target column's null
82 /// bit (`2 + target_col / 8`).
83 bitmap_byte_off: usize,
84 /// Bit mask for the target column's null bit.
85 bit_mask: u8,
86 /// Type of the target fixed column — drives the literal-to-bytes
87 /// encoding at execute time.
88 target_type: TypeId,
89 /// Index into the caller's `literals` slice that holds the filter key.
90 /// Always 0 today (filter literal is visited before the assignment
91 /// RHS), but stored explicitly so the contract is obvious.
92 key_literal_idx: usize,
93 /// Index into the caller's `literals` slice that holds the new value.
94 value_literal_idx: usize,
95}
96
97impl Engine {
98 pub fn prepare(&mut self, query: &str) -> Result<PreparedQuery, QueryError> {
99 let plan = planner::plan(query).map_err(|e| QueryError::Parse(e.to_string()))?;
100 let param_count = crate::plan_cache::count_literal_slots(&plan);
101
102 // Insert fast path: if the template is Insert and every assignment
103 // RHS is a literal, resolve column indices once here and store
104 // them. execute_prepared will skip the plan-clone + substitute
105 // walk on this path.
106 //
107 // Mission C Phase 15: also cache `n_cols` and the target table
108 // name so execute_prepared doesn't need a second HashMap lookup
109 // on `self.catalog.schema(table)` just to size the scratch Vec.
110 let insert_fast = match &plan {
111 PlanNode::Insert { table, assignments }
112 if assignments
113 .iter()
114 .all(|a| matches!(a.value, Expr::Literal(_)))
115 && param_count == assignments.len() =>
116 {
117 let table_slot = self
118 .catalog
119 .table_slot(table)
120 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
121 let schema = &self.catalog.table_by_slot(table_slot).schema;
122 let n_cols = schema.columns.len();
123 let indices: Result<Vec<usize>, QueryError> = assignments
124 .iter()
125 .map(|a| {
126 schema
127 .column_index(&a.field)
128 .ok_or_else(|| QueryError::ColumnNotFound {
129 table: table.clone(),
130 column: a.field.clone(),
131 })
132 })
133 .collect();
134 Some(InsertFast {
135 table_slot,
136 col_indices: indices?,
137 n_cols,
138 })
139 }
140 _ => None,
141 };
142
143 // Mission C Phase 14: update-by-pk fast path. Match on the shape
144 // planner::plan_update builds for `T filter .pk = ? update
145 // { col := ? }` — `Update { input: IndexScan(pk), assignments:
146 // [{col, Literal}] }` — and only if every precondition holds:
147 // * `pk` is an indexed column (so the executor would take the
148 // btree.lookup path at run time regardless)
149 // * there's exactly one assignment
150 // * the assigned column is fixed-size and *not* indexed (so we
151 // don't have to maintain any secondary index on write)
152 // * both literal slots are already `Expr::Literal` (no computed
153 // expressions)
154 // If any of these fail we fall through to the standard substitute
155 // + execute path.
156 let update_pk_fast = Self::try_build_update_pk_fast(&self.catalog, &plan);
157
158 Ok(PreparedQuery {
159 plan_template: plan,
160 param_count,
161 insert_fast,
162 update_pk_fast,
163 })
164 }
165
166 /// Mission C Phase 14: inspect a planned tree and, if it matches the
167 /// `update_by_pk` fast-path shape, return the precomputed byte-patch
168 /// metadata. Returns `None` on any mismatch — the caller falls through
169 /// to the substitute-and-execute path, which is always correct.
170 fn try_build_update_pk_fast(catalog: &Catalog, plan: &PlanNode) -> Option<UpdatePkFast> {
171 // Top level must be `Update { input: IndexScan(...), ... }`.
172 let (table, input, assignments) = match plan {
173 PlanNode::Update {
174 table,
175 input,
176 assignments,
177 } => (table, input.as_ref(), assignments),
178 _ => return None,
179 };
180 // Exactly one assignment — the bench hot path and the only case
181 // where a single byte-patch covers the whole mutation.
182 if assignments.len() != 1 {
183 return None;
184 }
185 let assn = &assignments[0];
186 // Assignment RHS must be a raw literal, not a computed expr.
187 if !matches!(assn.value, Expr::Literal(_)) {
188 return None;
189 }
190 // Input must be an IndexScan on the same table with a literal key.
191 let (key_col, key_table) = match input {
192 PlanNode::IndexScan {
193 table: t,
194 column,
195 key: Expr::Literal(_),
196 } => (column.clone(), t.clone()),
197 _ => return None,
198 };
199 if &key_table != table {
200 return None;
201 }
202
203 // Look up schema + index state from the live catalog, caching
204 // the slot so the execute path skips the name probe.
205 let table_slot = catalog.table_slot(table)?;
206 let tbl = catalog.table_by_slot(table_slot);
207 let schema = &tbl.schema;
208
209 // Key column must have an index (the btree.lookup path is what
210 // makes the fast path worth building).
211 if !tbl.has_index(&key_col) {
212 return None;
213 }
214
215 // Target column must exist, be fixed-size, and NOT be indexed (so
216 // we don't have to maintain any secondary index here).
217 let target_col_idx = schema.column_index(&assn.field)?;
218 let target_type = schema.columns[target_col_idx].type_id;
219 if !is_fixed_size(target_type) {
220 return None;
221 }
222 if tbl.has_indexed_col(target_col_idx) {
223 return None;
224 }
225
226 // Precompute byte offsets from the cached row layout.
227 let layout = tbl.row_layout();
228 let fixed_off = layout.fixed_offset(target_col_idx)?;
229 let bitmap_size = layout.bitmap_size();
230 let field_off = 2 + bitmap_size + fixed_off;
231 let bitmap_byte_off = 2 + target_col_idx / 8;
232 let bit_mask = 1u8 << (target_col_idx % 8);
233
234 // Literal walk order for `Update { IndexScan(key), [{value}] }`
235 // (see `plan_cache::substitute_plan` — input first, then the
236 // assignments). The filter key is literal 0, the assignment RHS
237 // is literal 1.
238 Some(UpdatePkFast {
239 table_slot,
240 key_col,
241 field_off,
242 bitmap_byte_off,
243 bit_mask,
244 target_type,
245 key_literal_idx: 0,
246 value_literal_idx: 1,
247 })
248 }
249
250 /// Execute a [`PreparedQuery`] with the given literal values.
251 ///
252 /// The literals are substituted into a clone of the template plan in
253 /// the same deterministic walk order that [`crate::canonicalize`]
254 /// produces (filter predicate first, then projection, then assignment
255 /// RHS, and so on). Substitution errors here mean the caller passed
256 /// the wrong number of literals for this query shape.
257 pub fn execute_prepared(
258 &mut self,
259 prep: &PreparedQuery,
260 literals: &[Literal],
261 ) -> Result<QueryResult, QueryError> {
262 if literals.len() != prep.param_count {
263 return Err(QueryError::Execution(format!(
264 "prepared query expects {} literal(s), got {}",
265 prep.param_count,
266 literals.len(),
267 )));
268 }
269
270 // Mission C Phase 14: update-by-pk fast path. Skip plan clone,
271 // substitute walk, resolved_assignments, FastPatch, Vec<RowId>,
272 // RowLayout::new — straight to btree.lookup_int + byte patch.
273 // On rare mismatches (wrong literal type, index dropped after
274 // prepare) the helper returns `Ok(None)` and we fall through to
275 // the generic substitute-and-execute path below.
276 if let Some(fast) = &prep.update_pk_fast {
277 if let Some(result) = self.try_execute_update_pk_fast(fast, literals)? {
278 // Mark dependent views dirty for prepared update fast path.
279 if let PlanNode::Update { table, .. } = &prep.plan_template {
280 self.view_registry.mark_dependents_dirty(table);
281 }
282 // Mission B (post-review): statement-boundary WAL group
283 // commit. The fast path appended an Update record but did
284 // not flush — flush it now so the executor's contract is
285 // "WAL is on disk before this returns".
286 self.catalog
287 .sync_wal()
288 .map_err(|e| QueryError::StorageError(e.to_string()))?;
289 return Ok(result);
290 }
291 }
292
293 // Insert fast path: skip plan-clone + substitute walk + PlanNode::Insert
294 // arm's column-index resolution. Build the Row directly from the
295 // caller's literal slice using indices we resolved at prepare time.
296 // Saves ~300-500ns per insert on the bench.
297 //
298 // Mission C Phase 13: the scratch `Vec<Value>` is reused across
299 // calls — no fresh allocation per insert. We split the borrow
300 // between `self.catalog` and `self.insert_values_scratch` by
301 // moving the scratch into a local, filling it, passing to the
302 // catalog, and putting it back.
303 //
304 // Mission C Phase 15: the cached `InsertFast` carries `n_cols`
305 // and the table name, so the hot path makes exactly one catalog
306 // HashMap lookup (`get_table_mut`) and dispatches straight into
307 // `tbl.insert` — no intermediate schema lookup, no generic
308 // `Catalog::insert` wrapper.
309 if let Some(fast) = &prep.insert_fast {
310 let mut values = std::mem::take(&mut self.insert_values_scratch);
311 values.clear();
312 values.resize(fast.n_cols, Value::Empty);
313 for (pos, lit) in literals.iter().enumerate() {
314 values[fast.col_indices[pos]] = literal_value_from(lit);
315 }
316 // Mission C Phase 18: direct O(1) slot index — no
317 // catalog hash probe. Slot was resolved at prepare time.
318 let tbl = self.catalog.table_by_slot_mut(fast.table_slot);
319 let res = tbl.insert(&values).map_err(|e| e.to_string());
320 // Clear strings before returning the scratch — don't keep
321 // dangling allocations from the previous row alive across
322 // calls. `clear()` drops the Value::Str entries.
323 values.clear();
324 self.insert_values_scratch = values;
325 res?;
326 // Mark dependent views dirty for prepared insert fast path.
327 if let PlanNode::Insert { table, .. } = &prep.plan_template {
328 self.view_registry.mark_dependents_dirty(table);
329 }
330 // Mission B (post-review): statement-boundary WAL group commit.
331 self.catalog
332 .sync_wal()
333 .map_err(|e| QueryError::StorageError(e.to_string()))?;
334 return Ok(QueryResult::Modified(1));
335 }
336
337 let mut plan = prep.plan_template.clone();
338 let mut idx = 0usize;
339 crate::plan_cache::substitute_plan(&mut plan, literals, &mut idx);
340 debug_assert_eq!(idx, literals.len());
341 let result = self.execute_plan(&plan);
342 // Mission B (post-review): statement-boundary WAL group commit.
343 // No-op when nothing was buffered (read-only plans).
344 self.catalog
345 .sync_wal()
346 .map_err(|e| QueryError::StorageError(e.to_string()))?;
347 result
348 }
349
350 /// Mission C Phase 14: point-update fast path for prepared
351 /// `T filter .pk = ? update { col := ? }` queries. The caller has
352 /// already verified this is an int-indexed pk with a fixed-size,
353 /// non-indexed target column; all we do here is pluck the two
354 /// literals out of the caller's slice, run one `btree.lookup_int`,
355 /// and patch 1–8 bytes of the row. No plan clone, no allocations.
356 ///
357 /// Returns:
358 /// * `Ok(Some(result))` — fast path took the mutation.
359 /// * `Ok(None)` — can't take the fast path this call (wrong
360 /// literal type, index dropped since prepare, etc.). Caller
361 /// falls through to the generic substitute-and-execute path.
362 /// * `Err(_)` — real error (table gone, I/O, etc.).
363 #[inline]
364 fn try_execute_update_pk_fast(
365 &mut self,
366 fast: &UpdatePkFast,
367 literals: &[Literal],
368 ) -> Result<Option<QueryResult>, QueryError> {
369 // 1) Extract the key literal. The fast path is only built for
370 // int key columns; any other literal type means the caller
371 // is violating the prepared-query contract or the schema
372 // changed — either way, fall back.
373 let key_int = match &literals[fast.key_literal_idx] {
374 Literal::Int(v) => *v,
375 _ => return Ok(None),
376 };
377
378 // 2) Encode the new value as little-endian bytes matching the
379 // target column's fixed encoding.
380 let bytes: FixedBytes = match (fast.target_type, &literals[fast.value_literal_idx]) {
381 (TypeId::Int, Literal::Int(v)) => FixedBytes::I64(v.to_le_bytes()),
382 (TypeId::DateTime, Literal::Int(v)) => FixedBytes::I64(v.to_le_bytes()),
383 (TypeId::Float, Literal::Float(v)) => FixedBytes::F64(v.to_le_bytes()),
384 (TypeId::Bool, Literal::Bool(v)) => FixedBytes::Bool(if *v { 1 } else { 0 }),
385 // Type mismatch — fall back to the generic path for a
386 // consistent error shape.
387 _ => return Ok(None),
388 };
389
390 // 3) Look up the table + btree, do the int lookup, patch the row
391 // in place. Phase 18: table dispatch is a direct slot index;
392 // the btree lookup is the linear scan over `indexed_cols`.
393 // Single btree.lookup_int + one `with_row_bytes_mut` call.
394 // No Vec allocations at all.
395 //
396 // Mission B2: route the in-place patch through the catalog's
397 // WAL-logged wrapper so crash recovery sees the update. The
398 // extra cost is one WAL append + fsync per query — the hot
399 // loop structure is unchanged.
400 let tbl = self.catalog.table_by_slot_mut(fast.table_slot);
401 let Some(btree) = tbl.index(&fast.key_col) else {
402 // Index dropped since prepare — bail to the generic path.
403 return Ok(None);
404 };
405 let Some(rid) = btree.lookup_int(key_int) else {
406 return Ok(Some(QueryResult::Modified(0)));
407 };
408
409 let fast_table_slot = fast.table_slot;
410 let bitmap_byte_off = fast.bitmap_byte_off;
411 let bit_mask = fast.bit_mask;
412 let field_off = fast.field_off;
413 let ok = self
414 .catalog
415 .update_row_bytes_logged_by_slot(fast_table_slot, rid, |row| {
416 // Idempotent null-bit clear — safe even when the column was
417 // already non-null (the overwhelmingly common case).
418 row[bitmap_byte_off] &= !bit_mask;
419 let field_bytes = bytes.as_slice();
420 row[field_off..field_off + field_bytes.len()].copy_from_slice(field_bytes);
421 })
422 .map_err(|e| QueryError::StorageError(e.to_string()))?;
423
424 Ok(Some(QueryResult::Modified(if ok { 1 } else { 0 })))
425 }
426
427 /// Mission C Phase 13: moving variant of [`Engine::execute_prepared`]
428 /// for the insert fast path. Takes `literals` by mutable reference
429 /// so that each `Literal::String` can be consumed via `mem::take`
430 /// instead of cloned into a `Value::Str`. On `insert_batch_1k` that
431 /// removes three per-row heap allocations (name, status, email),
432 /// bringing the workload over the line vs SQLite's amortized
433 /// prepare+execute loop.
434 ///
435 /// The caller's `Literal::String` entries are replaced with empty
436 /// strings on successful inserts — the `literals` slice is *not*
437 /// left in a valid-for-reuse state except for `Int`/`Float`/`Bool`
438 /// values. Non-insert templates fall through to the standard
439 /// substitute-and-execute path.
440 pub fn execute_prepared_take(
441 &mut self,
442 prep: &PreparedQuery,
443 literals: &mut [Literal],
444 ) -> Result<QueryResult, QueryError> {
445 if literals.len() != prep.param_count {
446 return Err(QueryError::Execution(format!(
447 "prepared query expects {} literal(s), got {}",
448 prep.param_count,
449 literals.len(),
450 )));
451 }
452
453 if let Some(fast) = &prep.insert_fast {
454 let mut values = std::mem::take(&mut self.insert_values_scratch);
455 values.clear();
456 values.resize(fast.n_cols, Value::Empty);
457 for (pos, lit) in literals.iter_mut().enumerate() {
458 values[fast.col_indices[pos]] = literal_value_take(lit);
459 }
460 // Mission C Phase 18: direct O(1) slot index — see
461 // `execute_prepared` for rationale. This is the hot path
462 // for `insert_batch_1k`.
463 let tbl = self.catalog.table_by_slot_mut(fast.table_slot);
464 let res = tbl.insert(&values).map_err(|e| e.to_string());
465 values.clear();
466 self.insert_values_scratch = values;
467 res?;
468 // Mission B (post-review): statement-boundary WAL group commit.
469 self.catalog
470 .sync_wal()
471 .map_err(|e| QueryError::StorageError(e.to_string()))?;
472 return Ok(QueryResult::Modified(1));
473 }
474
475 // Non-insert templates — fall back to the standard path. We
476 // can't usefully move the literals because `substitute_plan`
477 // still expects an immutable slice, and the non-insert hot
478 // paths are dominated by plan walks anyway.
479 self.execute_prepared(prep, literals)
480 }
481
482 /// Walk an expression tree and replace every `InSubquery` node with
483 /// an `InList` by executing the subquery and collecting its first
484 /// column as literal values. This must be called before entering
485 /// the row-by-row scan loop because the scan closure can't call back
486 /// into the engine.
487 pub(super) fn materialize_subqueries(&mut self, expr: &Expr) -> Result<Expr, QueryError> {
488 match expr {
489 Expr::InSubquery {
490 expr: inner,
491 subquery,
492 negated,
493 } => {
494 if is_correlated_subquery(subquery, &self.catalog) {
495 let inner = self.materialize_subqueries(inner)?;
496 return Ok(Expr::InSubquery {
497 expr: Box::new(inner),
498 subquery: subquery.clone(),
499 negated: *negated,
500 });
501 }
502 let inner = self.materialize_subqueries(inner)?;
503 // Plan and execute the subquery.
504 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
505 .map_err(|e| QueryError::StorageError(e.to_string()))?;
506 let result = self.execute_plan(&sub_plan)?;
507 let values = match result {
508 QueryResult::Rows { rows, .. } => rows
509 .into_iter()
510 .filter_map(|mut row| {
511 if row.is_empty() {
512 None
513 } else {
514 Some(value_to_expr(row.swap_remove(0)))
515 }
516 })
517 .collect(),
518 _ => Vec::new(),
519 };
520 Ok(Expr::InList {
521 expr: Box::new(inner),
522 list: values,
523 negated: *negated,
524 })
525 }
526 Expr::ExistsSubquery { subquery, negated } => {
527 if is_correlated_subquery(subquery, &self.catalog) {
528 return Ok(expr.clone());
529 }
530 // Uncorrelated EXISTS: run the subquery once and collapse
531 // into a Bool literal.
532 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
533 .map_err(|e| QueryError::StorageError(e.to_string()))?;
534 let result = self.execute_plan(&sub_plan)?;
535 let has_rows = match result {
536 QueryResult::Rows { rows, .. } => !rows.is_empty(),
537 _ => false,
538 };
539 let truth = if *negated { !has_rows } else { has_rows };
540 Ok(Expr::Literal(Literal::Bool(truth)))
541 }
542 Expr::BinaryOp(l, op, r) => {
543 let l = self.materialize_subqueries(l)?;
544 let r = self.materialize_subqueries(r)?;
545 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
546 }
547 Expr::UnaryOp(op, inner) => {
548 let inner = self.materialize_subqueries(inner)?;
549 Ok(Expr::UnaryOp(*op, Box::new(inner)))
550 }
551 Expr::Case { whens, else_expr } => {
552 let whens = whens
553 .iter()
554 .map(|(c, r)| {
555 let c = self.materialize_subqueries(c)?;
556 let r = self.materialize_subqueries(r)?;
557 Ok((Box::new(c), Box::new(r)))
558 })
559 .collect::<Result<Vec<_>, QueryError>>()?;
560 let else_expr = match else_expr {
561 Some(e) => Some(Box::new(self.materialize_subqueries(e)?)),
562 None => None,
563 };
564 Ok(Expr::Case { whens, else_expr })
565 }
566 // Leaf nodes: no subqueries possible.
567 other => Ok(other.clone()),
568 }
569 }
570
571 /// Write-path per-row materialisation of correlated subqueries.
572 pub(super) fn materialize_correlated_for_row(
573 &mut self,
574 expr: &Expr,
575 outer_row: &[Value],
576 outer_columns: &[String],
577 ) -> Result<Expr, QueryError> {
578 match expr {
579 Expr::InSubquery {
580 expr: inner,
581 subquery,
582 negated,
583 } => {
584 let inner = self.materialize_correlated_for_row(inner, outer_row, outer_columns)?;
585 let mut sub = *subquery.clone();
586 if let Some(ref filter) = sub.filter {
587 sub.filter = Some(substitute_outer_refs(
588 filter,
589 &sub.source,
590 &self.catalog,
591 outer_row,
592 outer_columns,
593 ));
594 }
595 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
596 .map_err(|e| QueryError::StorageError(e.to_string()))?;
597 let result = self.execute_plan(&sub_plan)?;
598 let values = match result {
599 QueryResult::Rows { rows, .. } => rows
600 .into_iter()
601 .filter_map(|mut row| {
602 if row.is_empty() {
603 None
604 } else {
605 Some(value_to_expr(row.swap_remove(0)))
606 }
607 })
608 .collect(),
609 _ => Vec::new(),
610 };
611 Ok(Expr::InList {
612 expr: Box::new(inner),
613 list: values,
614 negated: *negated,
615 })
616 }
617 Expr::ExistsSubquery { subquery, negated } => {
618 let mut sub = *subquery.clone();
619 if let Some(ref filter) = sub.filter {
620 sub.filter = Some(substitute_outer_refs(
621 filter,
622 &sub.source,
623 &self.catalog,
624 outer_row,
625 outer_columns,
626 ));
627 }
628 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
629 .map_err(|e| QueryError::StorageError(e.to_string()))?;
630 let result = self.execute_plan(&sub_plan)?;
631 let has_rows = match result {
632 QueryResult::Rows { rows, .. } => !rows.is_empty(),
633 _ => false,
634 };
635 let truth = if *negated { !has_rows } else { has_rows };
636 Ok(Expr::Literal(Literal::Bool(truth)))
637 }
638 Expr::BinaryOp(l, op, r) => {
639 let l = self.materialize_correlated_for_row(l, outer_row, outer_columns)?;
640 let r = self.materialize_correlated_for_row(r, outer_row, outer_columns)?;
641 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
642 }
643 Expr::UnaryOp(op, inner) => {
644 let inner = self.materialize_correlated_for_row(inner, outer_row, outer_columns)?;
645 Ok(Expr::UnaryOp(*op, Box::new(inner)))
646 }
647 other => Ok(other.clone()),
648 }
649 }
650}