powdb_query/executor/mod.rs
1//! PowDB query executor.
2
3// Submodules that don't use macros defined in this file.
4mod compiled;
5mod eval;
6pub mod mem_budget;
7
8use crate::ast::*;
9use crate::canonicalize::canonicalize;
10use crate::plan::*;
11use crate::plan_cache::PlanCache;
12use crate::planner;
13use crate::result::{QueryError, QueryResult};
14use powdb_storage::catalog::Catalog;
15use powdb_storage::row::{decode_column, decode_row, RowLayout, ROW_MAGIC, ROW_PREFIX_SIZE};
16use powdb_storage::types::*;
17use powdb_storage::view::ViewRegistry;
18
19use std::io;
20use std::path::Path;
21use std::sync::Mutex;
22use std::time::Instant;
23use tracing::{error, info, Level};
24
25use self::compiled::*;
26use self::eval::*;
27
/// Historical sentinel for "this read-only call needs the write lock".
///
/// Retained only so external code that still matches on the string form
/// keeps working. New code should match `QueryError::ReadonlyNeedsWrite`
/// instead of comparing against this value.
pub const READONLY_NEEDS_WRITE: &str = "__POWDB_READONLY_NEEDS_WRITE__";
32
33/// Return the byte offset where the row body starts.
34///
35/// v0.5 rows begin with the `PROW` magic/version prefix. Legacy rows start
36/// directly with the row body. Raw executor fast paths must add this base
37/// before reading body-relative bitmap/data offsets.
38#[inline]
39pub(crate) fn row_body_base(row: &[u8]) -> usize {
40 if row.len() >= ROW_PREFIX_SIZE && &row[0..4] == ROW_MAGIC {
41 ROW_PREFIX_SIZE
42 } else {
43 0
44 }
45}
46
/// The query language a statement is written in.
///
/// PowQL is the native and default dialect. SQL is an explicit, opt-in
/// frontend that is lowered to the same AST before planning, so both
/// dialects share the planner and executor.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueryDialect {
    /// PowDB's native query language.
    PowQL,
    /// SQL, lowered to the shared AST by the SQL frontend.
    Sql,
}
54
/// Number of slots in the plan cache.
///
/// Bench workloads occupy roughly 15 slots, and real applications should fit
/// comfortably in 256. Lookups are O(1); a collision clears the cache (see
/// `plan_cache::PlanCache::insert`).
const PLAN_CACHE_CAPACITY: usize = 1 << 8;
59
60/// Maximum number of rows a join may produce before the executor aborts.
61/// Prevents Cartesian-product blowups (e.g. `T cross join T` on 10K rows
62/// would produce 100M rows in memory without this cap).
63pub(super) const MAX_JOIN_ROWS: usize = 1_000_000;
64
65/// Maximum number of rows that may be materialized for sorting.
66/// Queries that exceed this should add a LIMIT clause to narrow the input
67/// before sorting.
68pub(super) const MAX_SORT_ROWS: usize = 10_000_000;
69
70#[inline]
71pub(super) fn check_join_limit(row_count: usize) -> Result<(), QueryError> {
72 if row_count > MAX_JOIN_ROWS {
73 return Err(QueryError::JoinLimitExceeded);
74 }
75 Ok(())
76}
77
// ─── Mission D11 Phase 1: scalar hot-loop helpers ─────────────────────────
//
// `agg_int_loop!` and `agg_float_loop!` expand into the scan body of
// `agg_single_col_fast` and sit inside the `for_each_row_raw` closure. They
// exist to:
//
// 1. Split the loop on presence of a predicate *outside* the hot body,
//    so the no-predicate path (agg_sum/agg_min/agg_max bench workloads)
//    never pays the `Option<CompiledPredicate>` branch per row.
// 2. Read a single 8-byte column (plus its null bit) straight from the raw
//    row bytes instead of decoding the whole row.
//
// Row layout the macros rely on (offsets are body-relative, i.e. added to
// `row_body_base(row)`):
//
// - `$bmp_byte` is `col_idx / 8`; the null bitmap occupies
//   `bitmap_size = n_cols.div_ceil(8)` bytes starting at body offset 2,
//   and bit `$bmp_bit` of that byte is set when the value is NULL.
// - `$off = 2 + bitmap_size + fixed_offsets[col_idx]` is the start of the
//   column's slot in the fixed-size region; i64/f64 columns occupy 8 bytes.
// - Both macros are only invoked from `agg_single_col_fast`, which
//   early-returns if the column isn't Int/Float (8-byte fixed) and
//   early-returns if `fast.fixed_offsets[col_idx]` is `None`.
//
// Every row is length-checked before it is read: rows too short to hold the
// bitmap byte or the 8-byte value (corrupt/truncated) are skipped rather
// than read out of bounds.
macro_rules! agg_int_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : i64| $body:block
    ) => {{
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        // Decode the column from one raw row. Yields `None` when the value
        // is NULL or the row is too short to hold the bitmap byte or the
        // 8-byte value. All reads go through checked `get`, so the former
        // `get_unchecked` / raw-pointer casts (which were already preceded by
        // an equivalent explicit length guard) are no longer needed.
        let read = |data: &[u8]| -> Option<i64> {
            let base = row_body_base(data);
            let bmp = *data.get(base + 2 + bmp_byte)?;
            if (bmp >> bmp_bit) & 1 == 1 {
                return None;
            }
            let start = base + off;
            let raw = data.get(start..start + 8)?;
            let mut bytes = [0u8; 8];
            bytes.copy_from_slice(raw);
            Some(i64::from_le_bytes(bytes))
        };
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    if let Some($v) = read(data) {
                        $body
                    }
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if let Some($v) = read(data) {
                        $body
                    }
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}
178
// f64 counterpart of `agg_int_loop!`: same arguments and row-layout
// contract, but the 8-byte slot is decoded as a little-endian `f64`.
// Rows that are NULL in the column, or too short to hold the bitmap byte
// or the value, are skipped.
macro_rules! agg_float_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : f64| $body:block
    ) => {{
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        // Decode the column from one raw row. Yields `None` when the value
        // is NULL or the row is too short to hold the bitmap byte or the
        // 8-byte value. All reads go through checked `get`, so no `unsafe`
        // is required.
        let read = |data: &[u8]| -> Option<f64> {
            let base = row_body_base(data);
            let bmp = *data.get(base + 2 + bmp_byte)?;
            if (bmp >> bmp_bit) & 1 == 1 {
                return None;
            }
            let start = base + off;
            let raw = data.get(start..start + 8)?;
            let mut bytes = [0u8; 8];
            bytes.copy_from_slice(raw);
            Some(f64::from_le_bytes(bytes))
        };
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    if let Some($v) = read(data) {
                        $body
                    }
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if let Some($v) = read(data) {
                        $body
                    }
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}
251
252// Submodules that use the macros above — must be declared after macro_rules!.
253mod plan_exec;
254mod prepared;
255
256#[cfg(test)]
257mod tests;
258
259// Re-exports for the public API
260pub use self::prepared::PreparedQuery;
261
262use self::plan_exec::{
263 compute_group_aggregate, execute_window, format_plan_tree, hash_join, lower_unindexed_scans,
264 range_matches, synthesize_range_predicate, try_extract_equi_join_keys,
265};
266
267/// Mission infra-1: classify a parsed statement as read-only vs. mutating.
268/// Used by [`Engine::execute_powql_readonly`] and by the server handler
269/// to decide between the RwLock reader and writer sides. `Union` recurses
270/// because each side can independently be read/write (though in practice
271/// both sides are reads — the parser only builds Union from query shapes).
272pub fn is_read_only_statement(stmt: &Statement) -> bool {
273 match stmt {
274 Statement::Query(_) => true,
275 Statement::Union(u) => is_read_only_statement(&u.left) && is_read_only_statement(&u.right),
276 Statement::Insert(_)
277 | Statement::Upsert(_)
278 | Statement::UpdateQuery(_)
279 | Statement::DeleteQuery(_)
280 | Statement::CreateType(_)
281 | Statement::AlterTable(_)
282 | Statement::DropTable(_)
283 | Statement::CreateView(_)
284 | Statement::RefreshView(_)
285 | Statement::DropView(_) => false,
286 Statement::Begin | Statement::Commit | Statement::Rollback => false,
287 Statement::Explain(inner) => is_read_only_statement(inner),
288 }
289}
290
/// The PowDB query engine: owns the storage catalog and drives parsing,
/// planning and execution of PowQL and SQL statements.
///
/// Mutating statements go through the `&mut self` entry points (e.g.
/// [`Engine::execute_powql`]); read-only statements can also run through the
/// `&self` entry points (e.g. [`Engine::execute_powql_readonly`]).
pub struct Engine {
    /// Storage catalog that scans, index lookups and end-of-statement
    /// autocommits go through.
    catalog: Catalog,
    /// Mission D9 — cached parsed+planned query trees keyed by canonical
    /// hash. Saves the ~3μs parse+plan cost on repeat queries that differ
    /// only in literal values.
    ///
    /// Mission infra-1: wrapped in `Mutex` so the read path can be driven
    /// by `&self`. The critical section is extremely short — a single
    /// hashmap lookup + plan clone on a hit, or a single insert on a miss.
    /// A full `RwLock` would be over-engineered here; the contention window
    /// is smaller than the read-path scan work it gates.
    plan_cache: Mutex<PlanCache>,
    /// Mission C Phase 13: reusable `Vec<Value>` scratch buffer for the
    /// prepared-insert fast path. `execute_prepared` used to allocate a
    /// fresh `vec![Value::Empty; n_cols]` on every insert; recycling this
    /// buffer shaves one heap alloc per row on `insert_batch_1k`.
    insert_values_scratch: Vec<Value>,
    /// Materialized view registry: tracks view definitions, dependencies,
    /// and dirty state. Views are backed by regular catalog tables; this
    /// registry adds the lifecycle metadata.
    view_registry: ViewRegistry,
    /// Whether an explicit transaction is open. While `false`, every
    /// `&mut self` entry point calls `Catalog::commit_autocommit` at the end
    /// of the statement; while `true` that commit is skipped.
    /// NOTE(review): presumably toggled by BEGIN/COMMIT/ROLLBACK handling,
    /// which is not visible in this section.
    in_transaction: bool,
    /// WS2 — per-query memory budget ceiling (bytes). The running total lives
    /// in a thread-local (see [`mem_budget`]) and is reset at every top-level
    /// query entry, so sort/join/GROUP BY/IN-list materialization can be capped
    /// without OOM-killing the process. This field holds only the *limit* (a
    /// plain `usize`, so `Engine` stays `Sync` for the concurrent read path).
    /// Default [`mem_budget::DEFAULT_QUERY_MEMORY_LIMIT`] (256 MB); overridable
    /// via `Engine::with_memory_limit` (server reads `POWDB_QUERY_MEMORY_LIMIT`).
    query_memory_limit: usize,
}
322
323impl Engine {
324 /// Open or create a PowDB engine rooted at `data_dir`.
325 ///
326 /// If the directory already contains a catalog, it is reopened.
327 /// Otherwise a fresh empty database is created.
328 ///
329 /// # Examples
330 ///
331 /// ```
332 /// use powdb_query::executor::Engine;
333 ///
334 /// let dir = tempfile::tempdir().unwrap();
335 /// let engine = Engine::new(dir.path()).unwrap();
336 /// // Engine is ready — the directory now contains a catalog.
337 /// ```
338 pub fn new(data_dir: &Path) -> io::Result<Self> {
339 powdb_storage::create_data_dir_secure(data_dir)?;
340 // Try to reopen an existing database first; only create a fresh
341 // catalog when there isn't one already on disk.
342 let catalog = match Catalog::open(data_dir) {
343 Ok(c) => {
344 info!(data_dir = %data_dir.display(), "engine reopened existing database");
345 c
346 }
347 Err(e) if e.kind() == io::ErrorKind::NotFound => {
348 info!(data_dir = %data_dir.display(), "engine initialized fresh database");
349 Catalog::create(data_dir)?
350 }
351 Err(e) => return Err(e),
352 };
353 let view_registry =
354 ViewRegistry::open(data_dir).unwrap_or_else(|_| ViewRegistry::new(data_dir));
355 Ok(Engine {
356 catalog,
357 plan_cache: Mutex::new(PlanCache::new(PLAN_CACHE_CAPACITY)),
358 insert_values_scratch: Vec::new(),
359 view_registry,
360 in_transaction: false,
361 query_memory_limit: mem_budget::DEFAULT_QUERY_MEMORY_LIMIT,
362 })
363 }
364
365 /// Open or create an engine with an explicit per-query memory limit
366 /// (bytes). Used by the server to apply `POWDB_QUERY_MEMORY_LIMIT`, and by
367 /// tests that need a tiny limit to exercise the budget guard.
368 pub fn with_memory_limit(data_dir: &Path, limit_bytes: usize) -> io::Result<Self> {
369 let mut engine = Engine::new(data_dir)?;
370 engine.set_query_memory_limit(limit_bytes);
371 Ok(engine)
372 }
373
374 /// Current per-query memory limit in bytes.
375 pub fn query_memory_limit(&self) -> usize {
376 self.query_memory_limit
377 }
378
379 /// Override the per-query memory limit in bytes (builder-style).
380 pub fn set_query_memory_limit(&mut self, limit_bytes: usize) {
381 self.query_memory_limit = limit_bytes;
382 }
383
384 /// Enter a budgeted-statement frame for the current query. The returned
385 /// guard must be held for the duration of the statement; on its drop the
386 /// reentrancy depth is decremented. Only the *outermost* statement entry
387 /// zeroes this thread's running total, so a nested `execute_powql` (the
388 /// source query of a `create_view`/`refresh_view`) does NOT discard the
389 /// outer frame's accounting. The accumulator is thread-local, so this never
390 /// touches another concurrent query's total.
391 #[must_use = "the budget guard must outlive the statement body"]
392 pub(super) fn enter_memory_budget(&self) -> mem_budget::EnterGuard {
393 mem_budget::enter()
394 }
395
396 /// Charge the estimated footprint of a freshly materialized batch of rows
397 /// against the current per-query budget. Returns
398 /// [`QueryError::MemoryLimitExceeded`] cleanly if the batch would push the
399 /// query over its limit. Used at every full-materialization point (sort
400 /// buffer, join build side, GROUP BY hash table, IN-list).
401 pub(super) fn charge_rows(&self, rows: &[Vec<Value>]) -> Result<(), QueryError> {
402 let mut total = 0usize;
403 for row in rows {
404 total = total.saturating_add(mem_budget::estimate_row_size(row));
405 }
406 mem_budget::charge(total, self.query_memory_limit)
407 }
408
409 /// Charge a materialized IN-list (the literal expressions pulled out of an
410 /// uncorrelated `IN (subquery)`) against the current per-query budget.
411 /// Each item is conservatively sized at the `Expr` slot plus, for string
412 /// literals, the owned heap bytes.
413 pub(super) fn charge_in_list(&self, list: &[crate::ast::Expr]) -> Result<(), QueryError> {
414 let base = std::mem::size_of::<crate::ast::Expr>();
415 let mut total = std::mem::size_of::<Vec<crate::ast::Expr>>();
416 for item in list {
417 total = total.saturating_add(base);
418 if let crate::ast::Expr::Literal(crate::ast::Literal::String(s)) = item {
419 total = total.saturating_add(s.capacity());
420 }
421 }
422 mem_budget::charge(total, self.query_memory_limit)
423 }
424
425 /// Dispatch to the requested query frontend.
426 pub fn execute_with_dialect(
427 &mut self,
428 dialect: QueryDialect,
429 input: &str,
430 ) -> Result<QueryResult, QueryError> {
431 match dialect {
432 QueryDialect::PowQL => self.execute_powql(input),
433 QueryDialect::Sql => self.execute_sql(input),
434 }
435 }
436
437 /// Read-only variant of [`Engine::execute_with_dialect`].
438 pub fn execute_readonly_with_dialect(
439 &self,
440 dialect: QueryDialect,
441 input: &str,
442 ) -> Result<QueryResult, QueryError> {
443 match dialect {
444 QueryDialect::PowQL => self.execute_powql_readonly(input),
445 QueryDialect::Sql => self.execute_sql_readonly(input),
446 }
447 }
448
449 /// Parse + plan + execute a PowQL query.
450 ///
451 /// # Examples
452 ///
453 /// ```
454 /// use powdb_query::executor::Engine;
455 /// use powdb_query::result::QueryResult;
456 ///
457 /// let dir = tempfile::tempdir().unwrap();
458 /// let mut engine = Engine::new(dir.path()).unwrap();
459 ///
460 /// // Create a table and insert a row.
461 /// engine.execute_powql("type User { required name: str, age: int }").unwrap();
462 /// engine.execute_powql(r#"insert User { name := "Alice", age := 30 }"#).unwrap();
463 ///
464 /// // Query rows back.
465 /// let result = engine.execute_powql("User").unwrap();
466 /// assert_eq!(result.row_count(), 1);
467 /// ```
468 ///
469 /// Mission D6 — tracing collapse: the previous implementation ran 4
470 /// `Instant::now()` + 3 `elapsed().as_micros()` calls + formatted an
471 /// `info!` span on every query, even when tracing was disabled. On a
472 /// sub-microsecond `point_lookup_indexed` call that overhead was
473 /// 100-200ns — 20%+ of the whole query. We now measure time only when
474 /// INFO is actually enabled via `tracing::enabled!`, and we moved the
475 /// noisy `debug!(?plan)` line behind the same gate so the Debug
476 /// formatter can't run unconditionally either.
477 ///
478 /// Mission D9 — plan cache: on the hot path we canonicalise the query
479 /// text (lex + FNV-1a hash with literal values stripped), check the
480 /// cache, and on a hit substitute the new literals into a clone of the
481 /// cached plan. This skips re-lexing, re-parsing, and re-planning —
482 /// around 3μs per call on bench workloads. On a miss we plan as before
483 /// and insert the plan under its canonical hash.
484 pub fn execute_powql(&mut self, input: &str) -> Result<QueryResult, QueryError> {
485 // WS2: each *outermost* statement starts with the full memory
486 // allowance. The guard holds the reentrancy depth so a nested
487 // `execute_powql` (e.g. a view's source query) does not reset the
488 // outer frame's accounting mid-statement.
489 let _budget = self.enter_memory_budget();
490 // Hot path: tracing disabled. Zero syscalls, zero formatting.
491 if !tracing::enabled!(Level::INFO) {
492 // D9: try the plan cache first. Canonicalisation lexes the
493 // query once; on a hit we skip the parser and planner entirely.
494 if let Ok((hash, literals)) = canonicalize(input) {
495 let cached = self
496 .plan_cache
497 .lock()
498 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
499 .get_with_substitution(hash, &literals);
500 if let Some(plan) = cached {
501 let plan = lower_unindexed_scans(&self.catalog, &plan);
502 let result = self.execute_plan(&plan);
503 // Mission B (post-review): statement-boundary WAL
504 // group commit. Catalog::wal_log now only appends;
505 // the fsync happens here exactly once per statement.
506 // `sync_wal` is a no-op when nothing was buffered
507 // (pure reads pay zero fsync).
508 if !self.in_transaction {
509 self.catalog
510 .commit_autocommit()
511 .map_err(|e| QueryError::StorageError(e.to_string()))?;
512 }
513 return result;
514 }
515 // Miss — plan, insert, execute.
516 return match planner::plan(input) {
517 Ok(plan) => {
518 self.plan_cache
519 .lock()
520 .map_err(|e| {
521 QueryError::Execution(format!("plan cache lock poisoned: {e}"))
522 })?
523 .insert(hash, plan.clone());
524 let plan = lower_unindexed_scans(&self.catalog, &plan);
525 let result = self.execute_plan(&plan);
526 if !self.in_transaction {
527 self.catalog
528 .commit_autocommit()
529 .map_err(|e| QueryError::StorageError(e.to_string()))?;
530 }
531 result
532 }
533 Err(e) => Err(QueryError::Parse(e.to_string())),
534 };
535 }
536 // Lex error — fall through to the planner so the caller gets a
537 // consistent error shape.
538 return match planner::plan(input) {
539 Ok(plan) => {
540 let plan = lower_unindexed_scans(&self.catalog, &plan);
541 let result = self.execute_plan(&plan);
542 if !self.in_transaction {
543 self.catalog
544 .commit_autocommit()
545 .map_err(|e| QueryError::StorageError(e.to_string()))?;
546 }
547 result
548 }
549 Err(e) => Err(QueryError::Parse(e.to_string())),
550 };
551 }
552
553 // Instrumented path — only taken under explicit tracing subscribers.
554 let total_start = Instant::now();
555 let plan_start = Instant::now();
556 let plan = planner::plan(input).map_err(|e| {
557 let msg = e.to_string();
558 error!(query = %input, error = %msg, "query plan failed");
559 QueryError::Parse(msg)
560 })?;
561 let plan_us = plan_start.elapsed().as_micros();
562
563 let exec_start = Instant::now();
564 let plan = lower_unindexed_scans(&self.catalog, &plan);
565 let result = self.execute_plan(&plan);
566 if !self.in_transaction {
567 self.catalog
568 .commit_autocommit()
569 .map_err(|e| QueryError::StorageError(e.to_string()))?;
570 }
571 let exec_us = exec_start.elapsed().as_micros();
572
573 let total_us = total_start.elapsed().as_micros();
574 match &result {
575 Ok(r) => {
576 info!(
577 query = %input,
578 plan_us = plan_us,
579 exec_us = exec_us,
580 total_us = total_us,
581 rows = r.row_count(),
582 "query ok"
583 );
584 }
585 Err(e) => {
586 error!(
587 query = %input,
588 plan_us = plan_us,
589 exec_us = exec_us,
590 error = %e,
591 "query failed"
592 );
593 }
594 }
595 result
596 }
597
598 /// Parse + plan + execute a SQL query through the SQL frontend.
599 ///
600 /// SQL is lowered to the existing PowDB AST and to canonical PowQL text.
601 /// The canonical PowQL text is used as the plan-cache key, so equivalent
602 /// SQL and PowQL spellings share cached plans.
603 pub fn execute_sql(&mut self, input: &str) -> Result<QueryResult, QueryError> {
604 let _budget = self.enter_memory_budget();
605 let parsed = crate::sql::parse_sql_with_canonical(input)
606 .map_err(|e| QueryError::Parse(e.to_string()))?;
607
608 if !tracing::enabled!(Level::INFO) {
609 if let Ok((hash, literals)) = canonicalize(&parsed.canonical_powql) {
610 let cached = self
611 .plan_cache
612 .lock()
613 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
614 .get_with_substitution(hash, &literals);
615 if let Some(plan) = cached {
616 let plan = lower_unindexed_scans(&self.catalog, &plan);
617 let result = self.execute_plan(&plan);
618 if !self.in_transaction {
619 self.catalog
620 .commit_autocommit()
621 .map_err(|e| QueryError::StorageError(e.to_string()))?;
622 }
623 return result;
624 }
625
626 let plan = crate::planner::plan_statement(parsed.statement)
627 .map_err(|e| QueryError::Parse(e.to_string()))?;
628 self.plan_cache
629 .lock()
630 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
631 .insert(hash, plan.clone());
632 let plan = lower_unindexed_scans(&self.catalog, &plan);
633 let result = self.execute_plan(&plan);
634 if !self.in_transaction {
635 self.catalog
636 .commit_autocommit()
637 .map_err(|e| QueryError::StorageError(e.to_string()))?;
638 }
639 return result;
640 }
641 }
642
643 let plan = crate::planner::plan_statement(parsed.statement)
644 .map_err(|e| QueryError::Parse(e.to_string()))?;
645 let plan = lower_unindexed_scans(&self.catalog, &plan);
646 let result = self.execute_plan(&plan);
647 if !self.in_transaction {
648 self.catalog
649 .commit_autocommit()
650 .map_err(|e| QueryError::StorageError(e.to_string()))?;
651 }
652 result
653 }
654
655 /// Read-only variant of [`Engine::execute_sql`].
656 pub fn execute_sql_readonly(&self, input: &str) -> Result<QueryResult, QueryError> {
657 let _budget = self.enter_memory_budget();
658 let parsed = crate::sql::parse_sql_with_canonical(input)
659 .map_err(|e| QueryError::Parse(e.to_string()))?;
660 if !is_read_only_statement(&parsed.statement) {
661 return Err(QueryError::ReadonlyNeedsWrite);
662 }
663
664 if let Ok((hash, literals)) = canonicalize(&parsed.canonical_powql) {
665 let cached = self
666 .plan_cache
667 .lock()
668 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
669 .get_with_substitution(hash, &literals);
670 if let Some(plan) = cached {
671 let plan = lower_unindexed_scans(&self.catalog, &plan);
672 return self.execute_plan_readonly(&plan);
673 }
674 let plan = crate::planner::plan_statement(parsed.statement)
675 .map_err(|e| QueryError::Parse(e.to_string()))?;
676 self.plan_cache
677 .lock()
678 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
679 .insert(hash, plan.clone());
680 let plan = lower_unindexed_scans(&self.catalog, &plan);
681 return self.execute_plan_readonly(&plan);
682 }
683
684 let plan = crate::planner::plan_statement(parsed.statement)
685 .map_err(|e| QueryError::Parse(e.to_string()))?;
686 let plan = lower_unindexed_scans(&self.catalog, &plan);
687 self.execute_plan_readonly(&plan)
688 }
689
690 /// Execute PowQL with `$N` placeholders bound to positional `params`.
691 ///
692 /// Task 4: parameters are substituted as literal *tokens* before
693 /// parsing (see [`crate::parser::parse_with_params`]), so untrusted
694 /// input can never change the query's shape. This path deliberately
695 /// **bypasses the plan cache** — template caching is a follow-up — and
696 /// otherwise mirrors the non-cached tail of [`Engine::execute_powql`].
697 pub fn execute_powql_with_params(
698 &mut self,
699 input: &str,
700 params: &[crate::ast::ParamValue],
701 ) -> Result<QueryResult, QueryError> {
702 let _budget = self.enter_memory_budget();
703 let stmt = crate::parser::parse_with_params(input, params)
704 .map_err(|e| QueryError::Parse(e.to_string()))?;
705 let plan =
706 crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
707 let plan = lower_unindexed_scans(&self.catalog, &plan);
708 let result = self.execute_plan(&plan);
709 if !self.in_transaction {
710 self.catalog
711 .commit_autocommit()
712 .map_err(|e| QueryError::StorageError(e.to_string()))?;
713 }
714 result
715 }
716
717 /// Read-only variant of [`Engine::execute_powql_with_params`].
718 ///
719 /// Mirrors [`Engine::execute_powql_readonly`]: parses with bound
720 /// params, rejects any write statement with
721 /// [`QueryError::ReadonlyNeedsWrite`] so the caller can escalate to the
722 /// write lock, then executes under a shared borrow. No plan-cache
723 /// interaction.
724 pub fn execute_powql_readonly_with_params(
725 &self,
726 input: &str,
727 params: &[crate::ast::ParamValue],
728 ) -> Result<QueryResult, QueryError> {
729 let _budget = self.enter_memory_budget();
730 let stmt = crate::parser::parse_with_params(input, params)
731 .map_err(|e| QueryError::Parse(e.to_string()))?;
732 if !is_read_only_statement(&stmt) {
733 return Err(QueryError::ReadonlyNeedsWrite);
734 }
735 let plan =
736 crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
737 let plan = lower_unindexed_scans(&self.catalog, &plan);
738 self.execute_plan_readonly(&plan)
739 }
740
741 /// Plan cache stats — useful for benches and debugging.
742 pub fn plan_cache_stats(&self) -> (u64, u64, usize) {
743 let cache = self.plan_cache.lock().unwrap_or_else(|e| e.into_inner());
744 (cache.hits, cache.misses, cache.len())
745 }
746
747 /// Mission infra-1: read-only entry point.
748 ///
749 /// Parses + plans + executes a PowQL query using only a shared borrow
750 /// on the engine. Rejects any statement that would mutate state
751 /// (Insert/Update/Delete/CreateTable/AlterTable/DropTable/CreateView/
752 /// RefreshView/DropView) by returning [`READONLY_NEEDS_WRITE`] so the
753 /// caller can escalate to the write lock.
754 ///
755 /// Also returns [`READONLY_NEEDS_WRITE`] if a materialized view in the
756 /// query is dirty — refreshing one requires `&mut self`, so the caller
757 /// must retake the write lock for the first refresh.
758 ///
759 /// This method is the concurrent-read fast path behind
760 /// `Arc<RwLock<Engine>>`: multiple threads can call it simultaneously
761 /// under a shared `.read()` lock and each will scan independently.
762 pub fn execute_powql_readonly(&self, input: &str) -> Result<QueryResult, QueryError> {
763 // WS2: each *outermost* statement starts with the full memory
764 // allowance. The guard holds the reentrancy depth so a nested
765 // `execute_powql*` does not reset the outer frame's accounting.
766 let _budget = self.enter_memory_budget();
767 // Parse the statement first so we can classify read vs. write
768 // without touching the catalog. This is the same lex+parse cost
769 // the hot path would pay anyway.
770 let stmt = crate::parser::parse(input).map_err(|e| QueryError::Parse(e.to_string()))?;
771 if !is_read_only_statement(&stmt) {
772 return Err(QueryError::ReadonlyNeedsWrite);
773 }
774
775 // Try the plan cache first — identical hash scheme to
776 // `execute_powql` so both paths share cache state. The mutex
777 // section is just a hashmap lookup + plan clone.
778 if let Ok((hash, literals)) = canonicalize(input) {
779 let cached = self
780 .plan_cache
781 .lock()
782 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
783 .get_with_substitution(hash, &literals);
784 if let Some(plan) = cached {
785 let plan = lower_unindexed_scans(&self.catalog, &plan);
786 return self.execute_plan_readonly(&plan);
787 }
788 // Miss: plan + insert + execute. The planner is pure, so this
789 // is safe from `&self`.
790 let plan = crate::planner::plan_statement(stmt)
791 .map_err(|e| QueryError::Parse(e.to_string()))?;
792 self.plan_cache
793 .lock()
794 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
795 .insert(hash, plan.clone());
796 let plan = lower_unindexed_scans(&self.catalog, &plan);
797 return self.execute_plan_readonly(&plan);
798 }
799 // Lex error — fall through to the planner for a consistent error
800 // shape (though `parse` above would usually have caught it).
801 let plan =
802 crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
803 let plan = lower_unindexed_scans(&self.catalog, &plan);
804 self.execute_plan_readonly(&plan)
805 }
806
807 /// Read-only version of [`Engine::execute_plan`]. Dispatches the
808 /// read-path plan variants by calling `&self` helpers and errors with
809 /// [`READONLY_NEEDS_WRITE`] on any write variant. This is the
810 /// recursion target for composite read plans under the RwLock reader.
811 ///
812 /// The dispatch mirrors `execute_plan` for the read branches but does
813 /// not carry any of the fast-paths that need `&mut self` (e.g. plan-
814 /// cache mutation on inner subqueries is handled via the shared mutex
815 /// in [`Engine::execute_powql_readonly`]; in-flight subquery
816 /// materialisation uses [`Engine::materialize_subqueries_readonly`]).
817 fn execute_plan_readonly(&self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
818 match plan {
819 PlanNode::SeqScan { table } => {
820 // Dirty view means we'd need to refresh it — can't do that
821 // under `&self`. Escalate to the write path.
822 if self.view_registry.is_dirty(table) {
823 return Err(QueryError::ReadonlyNeedsWrite);
824 }
825 let schema = self
826 .catalog
827 .schema(table)
828 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
829 .clone();
830 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
831 let rows: Vec<Vec<Value>> = self
832 .catalog
833 .scan(table)
834 .map_err(|e| e.to_string())?
835 .map(|(_, row)| row)
836 .collect();
837 Ok(QueryResult::Rows { columns, rows })
838 }
839
840 PlanNode::AliasScan { table, alias } => {
841 let schema = self
842 .catalog
843 .schema(table)
844 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
845 .clone();
846 let columns: Vec<String> = schema
847 .columns
848 .iter()
849 .map(|c| format!("{alias}.{}", c.name))
850 .collect();
851 let rows: Vec<Vec<Value>> = self
852 .catalog
853 .scan(table)
854 .map_err(|e| e.to_string())?
855 .map(|(_, row)| row)
856 .collect();
857 Ok(QueryResult::Rows { columns, rows })
858 }
859
860 PlanNode::IndexScan { table, column, key } => {
861 let schema = self
862 .catalog
863 .schema(table)
864 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
865 .clone();
866 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
867 let key_value = literal_to_value(key)?;
868 let tbl = self
869 .catalog
870 .get_table(table)
871 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
872
873 if tbl.has_index(column) {
874 // Use index_lookup_all to handle both unique and
875 // non-unique indexes — returns all matching RowIds.
876 let rids = tbl.index_lookup_all(column, &key_value);
877 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
878 for rid in rids {
879 if let Some(data) = tbl.heap.get(rid) {
880 rows.push(decode_row(&tbl.schema, &data));
881 }
882 }
883 return Ok(QueryResult::Rows { columns, rows });
884 }
885
886 // No index: synthetic eq predicate + compiled scan.
887 let fast = FastLayout::new(&schema);
888 let synth_pred = Expr::BinaryOp(
889 Box::new(Expr::Field(column.clone())),
890 BinOp::Eq,
891 Box::new(key.clone()),
892 );
893 if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, &schema) {
894 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
895 self.catalog
896 .for_each_row_raw(table, |_rid, data| {
897 if compiled(data) {
898 rows.push(decode_row(&schema, data));
899 }
900 })
901 .map_err(|e| QueryError::StorageError(e.to_string()))?;
902 return Ok(QueryResult::Rows { columns, rows });
903 }
904
905 // Last resort: slow eq-check.
906 let col_idx =
907 schema
908 .column_index(column)
909 .ok_or_else(|| QueryError::ColumnNotFound {
910 table: String::new(),
911 column: column.clone(),
912 })?;
913 let rows: Vec<Vec<Value>> = tbl
914 .scan()
915 .filter_map(|(_, row)| {
916 if row[col_idx] == key_value {
917 Some(row)
918 } else {
919 None
920 }
921 })
922 .collect();
923 Ok(QueryResult::Rows { columns, rows })
924 }
925
926 PlanNode::RangeScan {
927 table,
928 column,
929 start,
930 end,
931 } => {
932 let tbl = self
933 .catalog
934 .get_table(table)
935 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
936 let columns: Vec<String> =
937 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
938 let schema = tbl.schema.clone();
939
940 let start_val = match start {
941 Some((expr, _)) => Some(literal_to_value(expr)?),
942 None => None,
943 };
944 let end_val = match end {
945 Some((expr, _)) => Some(literal_to_value(expr)?),
946 None => None,
947 };
948 let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
949 let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
950
951 // Range scans only use the btree fast path for unique indexes.
952 // Non-unique indexes store composite keys that don't compare
953 // directly against raw column values.
954 if tbl.is_index_unique(column) == Some(true) {
955 if let Some(btree) = tbl.index(column) {
956 let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
957 (Some(s), Some(e)) => btree.range(s, e).collect(),
958 (Some(s), None) => btree.range_from(s),
959 (None, Some(e)) => btree.range_to(e),
960 (None, None) => {
961 // Unbounded both sides — equivalent to seq scan.
962 let rows: Vec<Vec<Value>> =
963 tbl.scan().map(|(_, row)| row).collect();
964 return Ok(QueryResult::Rows { columns, rows });
965 }
966 };
967 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
968 for (key, rid) in hits {
969 // Filter for exclusive bounds.
970 if !start_inclusive {
971 if let Some(ref s) = start_val {
972 if &key == s {
973 continue;
974 }
975 }
976 }
977 if !end_inclusive {
978 if let Some(ref e) = end_val {
979 if &key == e {
980 continue;
981 }
982 }
983 }
984 if let Some(data) = tbl.heap.get(rid) {
985 rows.push(decode_row(&schema, &data));
986 }
987 }
988 return Ok(QueryResult::Rows { columns, rows });
989 }
990 }
991
992 // Fallback: no index — synthesize the range predicate and scan.
993 let fast = FastLayout::new(&schema);
994 let synth = synthesize_range_predicate(column, start, end);
995 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, &schema) {
996 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
997 self.catalog
998 .for_each_row_raw(table, |_rid, data| {
999 if compiled(data) {
1000 rows.push(decode_row(&schema, data));
1001 }
1002 })
1003 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1004 return Ok(QueryResult::Rows { columns, rows });
1005 }
1006
1007 // Last resort: decoded row eval.
1008 let col_idx =
1009 schema
1010 .column_index(column)
1011 .ok_or_else(|| QueryError::ColumnNotFound {
1012 table: String::new(),
1013 column: column.clone(),
1014 })?;
1015 let rows: Vec<Vec<Value>> = tbl
1016 .scan()
1017 .filter(|(_, row)| {
1018 range_matches(
1019 &row[col_idx],
1020 &start_val,
1021 start_inclusive,
1022 &end_val,
1023 end_inclusive,
1024 )
1025 })
1026 .map(|(_, row)| row)
1027 .collect();
1028 Ok(QueryResult::Rows { columns, rows })
1029 }
1030
1031 PlanNode::Filter { input, predicate } => {
1032 // Materialise subqueries using the `&self` variant.
1033 // Uncorrelated subqueries are replaced with InList/Bool;
1034 // correlated ones are left as InSubquery/ExistsSubquery
1035 // for per-row materialisation below.
1036 let materialized;
1037 let predicate = if contains_subquery(predicate) {
1038 materialized = self.materialize_subqueries_readonly(predicate)?;
1039 &materialized
1040 } else {
1041 predicate
1042 };
1043
1044 // Correlated subquery path: per-row materialisation.
1045 if contains_subquery(predicate) {
1046 let result = self.execute_plan_readonly(input)?;
1047 return match result {
1048 QueryResult::Rows { columns, rows } => {
1049 let mut filtered = Vec::new();
1050 for row in rows {
1051 let row_pred = self.materialize_correlated_for_row_readonly(
1052 predicate, &row, &columns,
1053 )?;
1054 if eval_predicate(&row_pred, &row, &columns) {
1055 filtered.push(row);
1056 }
1057 }
1058 Ok(QueryResult::Rows {
1059 columns,
1060 rows: filtered,
1061 })
1062 }
1063 _ => Err("filter requires row input".into()),
1064 };
1065 }
1066
1067 // Fused Filter+SeqScan fast path.
1068 if let PlanNode::SeqScan { table } = input.as_ref() {
1069 if self.view_registry.is_dirty(table) {
1070 return Err(QueryError::ReadonlyNeedsWrite);
1071 }
1072 let schema = self
1073 .catalog
1074 .schema(table)
1075 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
1076 .clone();
1077 let columns: Vec<String> =
1078 schema.columns.iter().map(|c| c.name.clone()).collect();
1079 let fast = FastLayout::new(&schema);
1080 let row_layout = RowLayout::new(&schema);
1081 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1082
1083 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
1084 self.catalog
1085 .for_each_row_raw(table, |_rid, data| {
1086 if compiled(data) {
1087 rows.push(decode_row(&schema, data));
1088 }
1089 })
1090 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1091 } else {
1092 let pred_cols = predicate_column_indices(predicate, &columns);
1093 self.catalog
1094 .for_each_row_raw(table, |_rid, data| {
1095 let pred_row =
1096 decode_selective(&schema, &row_layout, data, &pred_cols);
1097 if eval_predicate(predicate, &pred_row, &columns) {
1098 rows.push(decode_row(&schema, data));
1099 }
1100 })
1101 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1102 }
1103
1104 return Ok(QueryResult::Rows { columns, rows });
1105 }
1106
1107 // General path.
1108 let result = self.execute_plan_readonly(input)?;
1109 match result {
1110 QueryResult::Rows { columns, rows } => {
1111 let filtered: Vec<Vec<Value>> = rows
1112 .into_iter()
1113 .filter(|row| eval_predicate(predicate, row, &columns))
1114 .collect();
1115 Ok(QueryResult::Rows {
1116 columns,
1117 rows: filtered,
1118 })
1119 }
1120 _ => Err("filter requires row input".into()),
1121 }
1122 }
1123
1124 PlanNode::Project { input, fields } => {
1125 // Fast path: Project over IndexScan. Avoids full-row decode
1126 // by calling decode_column only for projected fields.
1127 if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
1128 let key_value = literal_to_value(key)?;
1129 let tbl = self
1130 .catalog
1131 .get_table(table)
1132 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
1133 let schema = &tbl.schema;
1134 let layout = tbl.row_layout();
1135
1136 let proj_columns: Vec<String> = fields
1137 .iter()
1138 .map(|f| {
1139 f.alias.clone().unwrap_or_else(|| match &f.expr {
1140 Expr::Field(name) => name.clone(),
1141 _ => "?".into(),
1142 })
1143 })
1144 .collect();
1145
1146 let proj_indices: Vec<usize> = fields
1147 .iter()
1148 .filter_map(|f| {
1149 if let Expr::Field(name) = &f.expr {
1150 schema.column_index(name)
1151 } else {
1152 None
1153 }
1154 })
1155 .collect();
1156
1157 if tbl.has_index(column) {
1158 let rids = tbl.index_lookup_all(column, &key_value);
1159 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1160 for rid in rids {
1161 if let Some(data) = tbl.heap.get(rid) {
1162 let row: Vec<Value> = proj_indices
1163 .iter()
1164 .map(|&ci| decode_column(schema, layout, &data, ci))
1165 .collect();
1166 rows.push(row);
1167 }
1168 }
1169 return Ok(QueryResult::Rows {
1170 columns: proj_columns,
1171 rows,
1172 });
1173 }
1174 }
1175
1176 // Fast paths over Limit(Sort(...)) / Limit(Filter(...)) / Limit(SeqScan).
1177 if let PlanNode::Limit {
1178 input: inner,
1179 count: limit_expr,
1180 } = input.as_ref()
1181 {
1182 if let PlanNode::Sort {
1183 input: sort_input,
1184 keys,
1185 } = inner.as_ref()
1186 {
1187 if keys.len() == 1 {
1188 let sort_field = &keys[0].field;
1189 let descending = keys[0].descending;
1190 let limit = match limit_expr {
1191 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1192 _ => usize::MAX,
1193 };
1194 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1195 match sort_input.as_ref() {
1196 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1197 PlanNode::Filter {
1198 input: fi,
1199 predicate,
1200 } => {
1201 if let PlanNode::SeqScan { table } = fi.as_ref() {
1202 (Some(table.as_str()), Some(predicate))
1203 } else {
1204 (None, None)
1205 }
1206 }
1207 _ => (None, None),
1208 };
1209 if let Some(table) = table_opt {
1210 if let Some(result) = self.project_filter_sort_limit_fast(
1211 table, fields, sort_field, descending, limit, pred_opt,
1212 )? {
1213 return Ok(result);
1214 }
1215 }
1216 }
1217 }
1218 if let PlanNode::Filter {
1219 input: fi,
1220 predicate,
1221 } = inner.as_ref()
1222 {
1223 if let PlanNode::SeqScan { table } = fi.as_ref() {
1224 let limit = match limit_expr {
1225 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1226 _ => usize::MAX,
1227 };
1228 if let Some(result) = self.project_filter_limit_fast(
1229 table,
1230 fields,
1231 limit,
1232 Some(predicate),
1233 )? {
1234 return Ok(result);
1235 }
1236 }
1237 }
1238 if let PlanNode::SeqScan { table } = inner.as_ref() {
1239 let limit = match limit_expr {
1240 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1241 _ => usize::MAX,
1242 };
1243 if let Some(result) =
1244 self.project_filter_limit_fast(table, fields, limit, None)?
1245 {
1246 return Ok(result);
1247 }
1248 }
1249 }
1250
1251 // Project(Filter(SeqScan)) without Limit.
1252 if let PlanNode::Filter {
1253 input: fi,
1254 predicate,
1255 } = input.as_ref()
1256 {
1257 if let PlanNode::SeqScan { table } = fi.as_ref() {
1258 if let Some(result) = self.project_filter_limit_fast(
1259 table,
1260 fields,
1261 usize::MAX,
1262 Some(predicate),
1263 )? {
1264 return Ok(result);
1265 }
1266 }
1267 }
1268
1269 // Project(SeqScan) without Filter or Limit.
1270 if let PlanNode::SeqScan { table } = input.as_ref() {
1271 if let Some(result) =
1272 self.project_filter_limit_fast(table, fields, usize::MAX, None)?
1273 {
1274 return Ok(result);
1275 }
1276 }
1277
1278 // Generic path.
1279 let result = self.execute_plan_readonly(input)?;
1280 match result {
1281 QueryResult::Rows { columns, rows } => {
1282 let proj_columns: Vec<String> = fields
1283 .iter()
1284 .map(|f| {
1285 f.alias.clone().unwrap_or_else(|| match &f.expr {
1286 Expr::Field(name) => name.clone(),
1287 Expr::QualifiedField { qualifier, field } => {
1288 format!("{qualifier}.{field}")
1289 }
1290 _ => "?".into(),
1291 })
1292 })
1293 .collect();
1294 let proj_rows: Vec<Vec<Value>> = rows
1295 .iter()
1296 .map(|row| {
1297 fields
1298 .iter()
1299 .map(|f| eval_expr(&f.expr, row, &columns))
1300 .collect()
1301 })
1302 .collect();
1303 Ok(QueryResult::Rows {
1304 columns: proj_columns,
1305 rows: proj_rows,
1306 })
1307 }
1308 _ => Err("project requires row input".into()),
1309 }
1310 }
1311
1312 PlanNode::Sort { input, keys } => {
1313 let result = self.execute_plan_readonly(input)?;
1314 match result {
1315 QueryResult::Rows { columns, mut rows } => {
1316 if rows.len() > MAX_SORT_ROWS {
1317 return Err(QueryError::SortLimitExceeded);
1318 }
1319 // WS2: byte-budget guard on the sort buffer.
1320 self.charge_rows(&rows)?;
1321 let key_indices: Vec<(usize, bool)> = keys
1322 .iter()
1323 .map(|k| {
1324 columns
1325 .iter()
1326 .position(|c| c == &k.field)
1327 .map(|idx| (idx, k.descending))
1328 .ok_or_else(|| QueryError::ColumnNotFound {
1329 table: String::new(),
1330 column: k.field.clone(),
1331 })
1332 })
1333 .collect::<Result<_, QueryError>>()?;
1334 rows.sort_by(|a, b| {
1335 for &(col_idx, descending) in &key_indices {
1336 let cmp = a[col_idx].cmp(&b[col_idx]);
1337 let cmp = if descending { cmp.reverse() } else { cmp };
1338 if cmp != std::cmp::Ordering::Equal {
1339 return cmp;
1340 }
1341 }
1342 std::cmp::Ordering::Equal
1343 });
1344 Ok(QueryResult::Rows { columns, rows })
1345 }
1346 _ => Err("sort requires row input".into()),
1347 }
1348 }
1349
1350 PlanNode::Limit { input, count } => {
1351 let result = self.execute_plan_readonly(input)?;
1352 let n = match count {
1353 Expr::Literal(Literal::Int(v)) => *v as usize,
1354 _ => return Err("limit must be integer literal".into()),
1355 };
1356 match result {
1357 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1358 columns,
1359 rows: rows.into_iter().take(n).collect(),
1360 }),
1361 _ => Err("limit requires row input".into()),
1362 }
1363 }
1364
1365 PlanNode::Offset { input, count } => {
1366 let result = self.execute_plan_readonly(input)?;
1367 let n = match count {
1368 Expr::Literal(Literal::Int(v)) => *v as usize,
1369 _ => return Err("offset must be integer literal".into()),
1370 };
1371 match result {
1372 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1373 columns,
1374 rows: rows.into_iter().skip(n).collect(),
1375 }),
1376 _ => Err("offset requires row input".into()),
1377 }
1378 }
1379
1380 PlanNode::Aggregate {
1381 input,
1382 function,
1383 field,
1384 } => {
1385 // Fast path: count() over SeqScan.
1386 if *function == AggFunc::Count {
1387 if let PlanNode::SeqScan { table } = input.as_ref() {
1388 // A dirty materialized view must be refreshed before
1389 // it can be counted, which needs `&mut self`. Escalate
1390 // to the write path (F3: count(View) returned stale).
1391 if self.view_registry.is_dirty(table) {
1392 return Err(QueryError::ReadonlyNeedsWrite);
1393 }
1394 let mut count: i64 = 0;
1395 self.catalog
1396 .for_each_row_raw(table, |_rid, _data| {
1397 count += 1;
1398 })
1399 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1400 return Ok(QueryResult::Scalar(Value::Int(count)));
1401 }
1402 if let PlanNode::Filter {
1403 input: inner,
1404 predicate,
1405 } = input.as_ref()
1406 {
1407 // Only take the fast path for a plain Filter(SeqScan)
1408 // with no subquery in the predicate. A subquery
1409 // predicate (`count(T filter .x in (...))`) must be
1410 // resolved first; the fast path evaluates the raw
1411 // predicate with no subquery materialisation, which
1412 // silently yields 0 (F1). Falling through routes it to
1413 // the generic path that runs the subquery correctly.
1414 if let PlanNode::SeqScan { table } = inner.as_ref() {
1415 if self.view_registry.is_dirty(table) {
1416 // F3: count(View filter ...) over a dirty view.
1417 return Err(QueryError::ReadonlyNeedsWrite);
1418 }
1419 }
1420 if let (PlanNode::SeqScan { table }, false) =
1421 (inner.as_ref(), contains_subquery(predicate))
1422 {
1423 let schema = self
1424 .catalog
1425 .schema(table)
1426 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
1427 .clone();
1428 let columns: Vec<String> =
1429 schema.columns.iter().map(|c| c.name.clone()).collect();
1430 let fast = FastLayout::new(&schema);
1431 let row_layout = RowLayout::new(&schema);
1432
1433 if let Some(compiled) =
1434 compile_predicate(predicate, &columns, &fast, &schema)
1435 {
1436 let mut count: i64 = 0;
1437 self.catalog
1438 .for_each_row_raw(table, |_rid, data| {
1439 if compiled(data) {
1440 count += 1;
1441 }
1442 })
1443 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1444 return Ok(QueryResult::Scalar(Value::Int(count)));
1445 }
1446
1447 let pred_cols = predicate_column_indices(predicate, &columns);
1448 let mut count: i64 = 0;
1449 self.catalog
1450 .for_each_row_raw(table, |_rid, data| {
1451 let pred_row =
1452 decode_selective(&schema, &row_layout, data, &pred_cols);
1453 if eval_predicate(predicate, &pred_row, &columns) {
1454 count += 1;
1455 }
1456 })
1457 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1458 return Ok(QueryResult::Scalar(Value::Int(count)));
1459 }
1460 }
1461 }
1462
1463 // Fast path: sum/avg/min/max over single fixed-size numeric.
1464 if matches!(
1465 function,
1466 AggFunc::Sum
1467 | AggFunc::Avg
1468 | AggFunc::Min
1469 | AggFunc::Max
1470 | AggFunc::CountDistinct
1471 ) {
1472 if let Some(col) = field.as_ref() {
1473 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1474 match input.as_ref() {
1475 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1476 PlanNode::Filter {
1477 input: inner,
1478 predicate,
1479 } => {
1480 if let PlanNode::SeqScan { table } = inner.as_ref() {
1481 (Some(table.as_str()), Some(predicate))
1482 } else {
1483 (None, None)
1484 }
1485 }
1486 _ => (None, None),
1487 };
1488 if let Some(table) = table_opt {
1489 if let Some(result) =
1490 self.agg_single_col_fast(table, col, *function, pred_opt)?
1491 {
1492 return Ok(result);
1493 }
1494 }
1495 }
1496 }
1497
1498 // Generic path.
1499 let result = self.execute_plan_readonly(input)?;
1500 match result {
1501 QueryResult::Rows { columns, rows } => match function {
1502 AggFunc::Count => Ok(QueryResult::Scalar(Value::Int(rows.len() as i64))),
1503 AggFunc::CountDistinct => {
1504 let col = field.as_ref().ok_or("count distinct requires field")?;
1505 let idx = columns
1506 .iter()
1507 .position(|c| c == col)
1508 .ok_or("col not found")?;
1509 let mut seen = std::collections::HashSet::new();
1510 for row in &rows {
1511 let v = &row[idx];
1512 if !v.is_empty() {
1513 seen.insert(v.clone());
1514 }
1515 }
1516 Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
1517 }
1518 AggFunc::Avg => {
1519 let col = field.as_ref().ok_or("avg requires field")?;
1520 let idx = columns
1521 .iter()
1522 .position(|c| c == col)
1523 .ok_or("col not found")?;
1524 let sum: f64 = rows
1525 .iter()
1526 .filter_map(|r| match &r[idx] {
1527 Value::Int(v) => Some(*v as f64),
1528 Value::Float(v) => Some(*v),
1529 _ => None,
1530 })
1531 .sum();
1532 let count = rows.len() as f64;
1533 Ok(QueryResult::Scalar(Value::Float(sum / count)))
1534 }
1535 AggFunc::Sum => {
1536 let col = field.as_ref().ok_or("sum requires field")?;
1537 let idx = columns
1538 .iter()
1539 .position(|c| c == col)
1540 .ok_or("col not found")?;
1541 let mut int_sum: i64 = 0;
1542 let mut float_sum: f64 = 0.0;
1543 let mut saw_float = false;
1544 for r in &rows {
1545 match &r[idx] {
1546 Value::Int(v) => int_sum += *v,
1547 Value::Float(v) => {
1548 float_sum += *v;
1549 saw_float = true;
1550 }
1551 _ => {}
1552 }
1553 }
1554 let result = if saw_float {
1555 Value::Float(float_sum + int_sum as f64)
1556 } else {
1557 Value::Int(int_sum)
1558 };
1559 Ok(QueryResult::Scalar(result))
1560 }
1561 AggFunc::Min | AggFunc::Max => {
1562 let col = field.as_ref().ok_or("min/max requires field")?;
1563 let idx = columns
1564 .iter()
1565 .position(|c| c == col)
1566 .ok_or("col not found")?;
1567 let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
1568 let result = if *function == AggFunc::Min {
1569 vals.into_iter().min().cloned()
1570 } else {
1571 vals.into_iter().max().cloned()
1572 };
1573 Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
1574 }
1575 },
1576 _ => Err("aggregate requires row input".into()),
1577 }
1578 }
1579
1580 PlanNode::Distinct { input } => {
1581 let result = self.execute_plan_readonly(input)?;
1582 match result {
1583 QueryResult::Rows { columns, rows } => {
1584 let mut seen = std::collections::HashSet::new();
1585 let mut unique_rows = Vec::new();
1586 for row in rows {
1587 if seen.insert(row.clone()) {
1588 unique_rows.push(row);
1589 }
1590 }
1591 Ok(QueryResult::Rows {
1592 columns,
1593 rows: unique_rows,
1594 })
1595 }
1596 other => Ok(other),
1597 }
1598 }
1599
1600 PlanNode::GroupBy {
1601 input,
1602 keys,
1603 aggregates,
1604 having,
1605 } => {
1606 let result = self.execute_plan_readonly(input)?;
1607 match result {
1608 QueryResult::Rows { columns, rows } => {
1609 // WS2: byte-budget guard on the GROUP BY input buffer
1610 // (the hash table is bounded by the input it groups).
1611 self.charge_rows(&rows)?;
1612 let key_indices: Vec<usize> = keys
1613 .iter()
1614 .map(|k| {
1615 columns.iter().position(|c| c == k).ok_or_else(|| {
1616 QueryError::ColumnNotFound {
1617 table: String::new(),
1618 column: k.clone(),
1619 }
1620 })
1621 })
1622 .collect::<Result<Vec<_>, _>>()?;
1623
1624 let agg_field_indices: Vec<usize> = aggregates
1625 .iter()
1626 .map(|a| {
1627 if a.field == "*" {
1628 Ok(usize::MAX)
1629 } else {
1630 columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1631 QueryError::ColumnNotFound {
1632 table: String::new(),
1633 column: a.field.clone(),
1634 }
1635 })
1636 }
1637 })
1638 .collect::<Result<Vec<_>, _>>()?;
1639
1640 let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1641 rustc_hash::FxHashMap::default();
1642 let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1643 for (ri, row) in rows.iter().enumerate() {
1644 let key: Vec<Value> =
1645 key_indices.iter().map(|&i| row[i].clone()).collect();
1646 match group_map.get(&key) {
1647 Some(&idx) => groups[idx].1.push(ri),
1648 None => {
1649 let idx = groups.len();
1650 group_map.insert(key.clone(), idx);
1651 groups.push((key, vec![ri]));
1652 }
1653 }
1654 }
1655
1656 let mut out_columns: Vec<String> = keys.clone();
1657 for agg in aggregates.iter() {
1658 out_columns.push(agg.output_name.clone());
1659 }
1660
1661 let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1662 for (key_vals, row_indices) in &groups {
1663 let mut row = key_vals.clone();
1664 for (ai, agg) in aggregates.iter().enumerate() {
1665 let col_idx = agg_field_indices[ai];
1666 let val = compute_group_aggregate(
1667 agg.function,
1668 &rows,
1669 row_indices,
1670 col_idx,
1671 );
1672 row.push(val);
1673 }
1674 out_rows.push(row);
1675 }
1676
1677 if let Some(having_expr) = having {
1678 out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1679 }
1680
1681 Ok(QueryResult::Rows {
1682 columns: out_columns,
1683 rows: out_rows,
1684 })
1685 }
1686 _ => Err("group by requires row input".into()),
1687 }
1688 }
1689
1690 PlanNode::NestedLoopJoin {
1691 left,
1692 right,
1693 on,
1694 kind,
1695 } => {
1696 let left_result = self.execute_plan_readonly(left)?;
1697 let right_result = self.execute_plan_readonly(right)?;
1698 let (left_columns, left_rows) = match left_result {
1699 QueryResult::Rows { columns, rows } => (columns, rows),
1700 _ => return Err("join left side must produce rows".into()),
1701 };
1702 let (right_columns, right_rows) = match right_result {
1703 QueryResult::Rows { columns, rows } => (columns, rows),
1704 _ => return Err("join right side must produce rows".into()),
1705 };
1706
1707 // WS2: byte-budget guard on the join build side.
1708 self.charge_rows(&left_rows)?;
1709 self.charge_rows(&right_rows)?;
1710
1711 if !matches!(kind, JoinKind::Cross) {
1712 if let Some(pred) = on {
1713 if let Some((l_idx, r_idx)) =
1714 try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1715 {
1716 let result = hash_join(
1717 left_columns,
1718 left_rows,
1719 right_columns,
1720 right_rows,
1721 l_idx,
1722 r_idx,
1723 *kind,
1724 );
1725 if let QueryResult::Rows { ref rows, .. } = result {
1726 check_join_limit(rows.len())?;
1727 }
1728 return Ok(result);
1729 }
1730 }
1731 }
1732
1733 let n_left = left_columns.len();
1734 let n_right = right_columns.len();
1735 let mut columns = Vec::with_capacity(n_left + n_right);
1736 columns.extend(left_columns);
1737 columns.extend(right_columns);
1738
1739 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1740 let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1741
1742 for left_row in &left_rows {
1743 let mut matched = false;
1744 for right_row in &right_rows {
1745 combined.clear();
1746 combined.extend_from_slice(left_row);
1747 combined.extend_from_slice(right_row);
1748 let keep = match kind {
1749 JoinKind::Cross => true,
1750 JoinKind::Inner | JoinKind::LeftOuter => match on {
1751 Some(pred) => eval_predicate(pred, &combined, &columns),
1752 None => true,
1753 },
1754 JoinKind::RightOuter => {
1755 unreachable!("planner rewrites RightOuter to LeftOuter")
1756 }
1757 };
1758 if keep {
1759 rows.push(combined.clone());
1760 check_join_limit(rows.len())?;
1761 matched = true;
1762 }
1763 }
1764 if !matched && matches!(kind, JoinKind::LeftOuter) {
1765 let mut row = Vec::with_capacity(n_left + n_right);
1766 row.extend_from_slice(left_row);
1767 row.resize(n_left + n_right, Value::Empty);
1768 rows.push(row);
1769 check_join_limit(rows.len())?;
1770 }
1771 }
1772
1773 Ok(QueryResult::Rows { columns, rows })
1774 }
1775
1776 PlanNode::Window { input, windows } => {
1777 let result = self.execute_plan_readonly(input)?;
1778 execute_window(result, windows)
1779 }
1780
1781 PlanNode::Union { left, right, all } => {
1782 let left_result = self.execute_plan_readonly(left)?;
1783 let right_result = self.execute_plan_readonly(right)?;
1784 let (left_cols, left_rows) = match left_result {
1785 QueryResult::Rows { columns, rows } => (columns, rows),
1786 _ => return Err("UNION requires query results on left side".into()),
1787 };
1788 let (_, right_rows) = match right_result {
1789 QueryResult::Rows { columns, rows } => (columns, rows),
1790 _ => return Err("UNION requires query results on right side".into()),
1791 };
1792 let mut combined = left_rows;
1793 if *all {
1794 combined.extend(right_rows);
1795 } else {
1796 let mut seen = std::collections::HashSet::new();
1797 for row in &combined {
1798 seen.insert(row.clone());
1799 }
1800 for row in right_rows {
1801 if seen.insert(row.clone()) {
1802 combined.push(row);
1803 }
1804 }
1805 }
1806 Ok(QueryResult::Rows {
1807 columns: left_cols,
1808 rows: combined,
1809 })
1810 }
1811
1812 PlanNode::Explain { input } => {
1813 let text = format_plan_tree(input, 0);
1814 Ok(QueryResult::Rows {
1815 columns: vec!["plan".to_string()],
1816 rows: text
1817 .lines()
1818 .map(|line| vec![Value::Str(line.to_string())])
1819 .collect(),
1820 })
1821 }
1822
1823 // All write variants — caller must escalate to the write lock.
1824 PlanNode::Insert { .. }
1825 | PlanNode::Update { .. }
1826 | PlanNode::Delete { .. }
1827 | PlanNode::Upsert { .. }
1828 | PlanNode::CreateTable { .. }
1829 | PlanNode::AlterTable { .. }
1830 | PlanNode::DropTable { .. }
1831 | PlanNode::CreateView { .. }
1832 | PlanNode::RefreshView { .. }
1833 | PlanNode::DropView { .. }
1834 | PlanNode::Begin
1835 | PlanNode::Commit
1836 | PlanNode::Rollback => Err(QueryError::ReadonlyNeedsWrite),
1837 }
1838 }
1839
1840 /// `&self` variant of [`Engine::materialize_subqueries`]. Used by the
1841 /// read path so `Filter` predicates with `InSubquery`/`ExistsSubquery`
1842 /// children can evaluate their inner queries without taking the write
1843 /// lock. Inner queries that would themselves need a write (e.g. dirty
1844 /// view) escalate via [`READONLY_NEEDS_WRITE`] just like the top-level
1845 /// read path does.
1846 fn materialize_subqueries_readonly(&self, expr: &Expr) -> Result<Expr, QueryError> {
1847 match expr {
1848 Expr::InSubquery {
1849 expr: inner,
1850 subquery,
1851 negated,
1852 } => {
1853 if is_correlated_subquery(subquery, &self.catalog) {
1854 // Pass through — will be materialized per-row in the
1855 // Filter handler's correlated subquery path.
1856 let inner = self.materialize_subqueries_readonly(inner)?;
1857 return Ok(Expr::InSubquery {
1858 expr: Box::new(inner),
1859 subquery: subquery.clone(),
1860 negated: *negated,
1861 });
1862 }
1863 let inner = self.materialize_subqueries_readonly(inner)?;
1864 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1865 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1866 let result = self.execute_plan_readonly(&sub_plan)?;
1867 let values = match result {
1868 QueryResult::Rows { rows, .. } => rows
1869 .into_iter()
1870 .filter_map(|mut row| {
1871 if row.is_empty() {
1872 None
1873 } else {
1874 Some(value_to_expr(row.swap_remove(0)))
1875 }
1876 })
1877 .collect(),
1878 _ => Vec::new(),
1879 };
1880 // WS2: byte-budget guard on the materialized IN-list.
1881 self.charge_in_list(&values)?;
1882 Ok(Expr::InList {
1883 expr: Box::new(inner),
1884 list: values,
1885 negated: *negated,
1886 })
1887 }
1888 Expr::ExistsSubquery { subquery, negated } => {
1889 if is_correlated_subquery(subquery, &self.catalog) {
1890 return Ok(expr.clone());
1891 }
1892 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1893 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1894 let result = self.execute_plan_readonly(&sub_plan)?;
1895 let has_rows = match result {
1896 QueryResult::Rows { rows, .. } => !rows.is_empty(),
1897 _ => false,
1898 };
1899 let truth = if *negated { !has_rows } else { has_rows };
1900 Ok(Expr::Literal(Literal::Bool(truth)))
1901 }
1902 Expr::BinaryOp(l, op, r) => {
1903 let l = self.materialize_subqueries_readonly(l)?;
1904 let r = self.materialize_subqueries_readonly(r)?;
1905 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1906 }
1907 Expr::UnaryOp(op, inner) => {
1908 let inner = self.materialize_subqueries_readonly(inner)?;
1909 Ok(Expr::UnaryOp(*op, Box::new(inner)))
1910 }
1911 Expr::Case { whens, else_expr } => {
1912 let whens = whens
1913 .iter()
1914 .map(|(c, r)| {
1915 let c = self.materialize_subqueries_readonly(c)?;
1916 let r = self.materialize_subqueries_readonly(r)?;
1917 Ok((Box::new(c), Box::new(r)))
1918 })
1919 .collect::<Result<Vec<_>, QueryError>>()?;
1920 let else_expr = match else_expr {
1921 Some(e) => Some(Box::new(self.materialize_subqueries_readonly(e)?)),
1922 None => None,
1923 };
1924 Ok(Expr::Case { whens, else_expr })
1925 }
1926 other => Ok(other.clone()),
1927 }
1928 }
1929
1930 /// Per-row materialisation of correlated subqueries. For each row in the
1931 /// outer query, substitute outer column references in the subquery's
1932 /// filter with the current row's literal values, execute the modified
1933 /// subquery, and return the result as an InList or Bool literal.
1934 fn materialize_correlated_for_row_readonly(
1935 &self,
1936 expr: &Expr,
1937 outer_row: &[Value],
1938 outer_columns: &[String],
1939 ) -> Result<Expr, QueryError> {
1940 match expr {
1941 Expr::InSubquery {
1942 expr: inner,
1943 subquery,
1944 negated,
1945 } => {
1946 let inner =
1947 self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1948 let mut sub = *subquery.clone();
1949 if let Some(ref filter) = sub.filter {
1950 sub.filter = Some(substitute_outer_refs(
1951 filter,
1952 &sub.source,
1953 &self.catalog,
1954 outer_row,
1955 outer_columns,
1956 ));
1957 }
1958 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1959 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1960 let result = self.execute_plan_readonly(&sub_plan)?;
1961 let values = match result {
1962 QueryResult::Rows { rows, .. } => rows
1963 .into_iter()
1964 .filter_map(|mut row| {
1965 if row.is_empty() {
1966 None
1967 } else {
1968 Some(value_to_expr(row.swap_remove(0)))
1969 }
1970 })
1971 .collect(),
1972 _ => Vec::new(),
1973 };
1974 // WS2: byte-budget guard on the per-row materialized IN-list.
1975 self.charge_in_list(&values)?;
1976 Ok(Expr::InList {
1977 expr: Box::new(inner),
1978 list: values,
1979 negated: *negated,
1980 })
1981 }
1982 Expr::ExistsSubquery { subquery, negated } => {
1983 let mut sub = *subquery.clone();
1984 if let Some(ref filter) = sub.filter {
1985 sub.filter = Some(substitute_outer_refs(
1986 filter,
1987 &sub.source,
1988 &self.catalog,
1989 outer_row,
1990 outer_columns,
1991 ));
1992 }
1993 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1994 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1995 let result = self.execute_plan_readonly(&sub_plan)?;
1996 let has_rows = match result {
1997 QueryResult::Rows { rows, .. } => !rows.is_empty(),
1998 _ => false,
1999 };
2000 let truth = if *negated { !has_rows } else { has_rows };
2001 Ok(Expr::Literal(Literal::Bool(truth)))
2002 }
2003 Expr::BinaryOp(l, op, r) => {
2004 let l =
2005 self.materialize_correlated_for_row_readonly(l, outer_row, outer_columns)?;
2006 let r =
2007 self.materialize_correlated_for_row_readonly(r, outer_row, outer_columns)?;
2008 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
2009 }
2010 Expr::UnaryOp(op, inner) => {
2011 let inner =
2012 self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
2013 Ok(Expr::UnaryOp(*op, Box::new(inner)))
2014 }
2015 other => Ok(other.clone()),
2016 }
2017 }
2018
2019 pub fn catalog(&self) -> &Catalog {
2020 &self.catalog
2021 }
2022
2023 pub fn catalog_mut(&mut self) -> &mut Catalog {
2024 &mut self.catalog
2025 }
2026}