Skip to main content

powdb_query/executor/
mod.rs

1//! PowDB query executor.
2
3// Submodules that don't use macros defined in this file.
4mod compiled;
5mod eval;
6pub mod mem_budget;
7
8use crate::ast::*;
9use crate::canonicalize::canonicalize;
10use crate::plan::*;
11use crate::plan_cache::PlanCache;
12use crate::planner;
13use crate::result::{QueryError, QueryResult};
14use powdb_storage::catalog::Catalog;
15use powdb_storage::row::{decode_column, decode_row, RowLayout, ROW_MAGIC, ROW_PREFIX_SIZE};
16use powdb_storage::types::*;
17use powdb_storage::view::ViewRegistry;
18
19use std::io;
20use std::path::Path;
21use std::sync::Mutex;
22use std::time::Instant;
23use tracing::{error, info, Level};
24
25use self::compiled::*;
26use self::eval::*;
27
/// Legacy sentinel string constant — kept for backward compatibility with
/// any external code matching on the string representation. New code should
/// match on [`QueryError::ReadonlyNeedsWrite`] directly.
///
/// The read-only entry points visible in this file report a rejected write
/// through the typed variant, not through this string.
pub const READONLY_NEEDS_WRITE: &str = "__POWDB_READONLY_NEEDS_WRITE__";
32
33/// Return the byte offset where the row body starts.
34///
35/// v0.5 rows begin with the `PROW` magic/version prefix. Legacy rows start
36/// directly with the row body. Raw executor fast paths must add this base
37/// before reading body-relative bitmap/data offsets.
38#[inline]
39pub(crate) fn row_body_base(row: &[u8]) -> usize {
40    if row.len() >= ROW_PREFIX_SIZE && &row[0..4] == ROW_MAGIC {
41        ROW_PREFIX_SIZE
42    } else {
43        0
44    }
45}
46
/// Query frontend dialect. PowQL remains the default/native dialect; SQL is
/// an explicit frontend that lowers to the same AST before planning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryDialect {
    /// Native PowQL text; the dialect dispatchers route it to
    /// `execute_powql` / `execute_powql_readonly`.
    PowQL,
    /// SQL text; the dialect dispatchers route it to
    /// `execute_sql` / `execute_sql_readonly`.
    Sql,
}
54
/// Plan cache capacity. Bench workloads fill ~15 slots; real apps will sit
/// comfortably in 256. Lookup is O(1), collisions clear the cache (see
/// `plan_cache::PlanCache::insert`).
///
/// Passed to `PlanCache::new` when an [`Engine`] is constructed.
const PLAN_CACHE_CAPACITY: usize = 256;

/// Maximum number of rows a join may produce before the executor aborts.
/// Prevents Cartesian-product blowups (e.g. `T cross join T` on 10K rows
/// would produce 100M rows in memory without this cap).
///
/// Enforced by [`check_join_limit`], which rejects only counts strictly
/// greater than this value.
pub(super) const MAX_JOIN_ROWS: usize = 1_000_000;

/// Maximum number of rows that may be materialized for sorting.
/// Queries that exceed this should add a LIMIT clause to narrow the input
/// before sorting.
///
/// NOTE(review): the check that consumes this cap is not in this part of the
/// file — presumably it sits in the sort operator; confirm whether it treats
/// the cap as inclusive.
pub(super) const MAX_SORT_ROWS: usize = 10_000_000;
69
70#[inline]
71pub(super) fn check_join_limit(row_count: usize) -> Result<(), QueryError> {
72    if row_count > MAX_JOIN_ROWS {
73        return Err(QueryError::JoinLimitExceeded);
74    }
75    Ok(())
76}
77
78// ─── Mission D11 Phase 1: scalar hot-loop helpers ─────────────────────────
79//
80// These macros expand into the scan body of `agg_single_col_fast` and sit
81// inside the `for_each_row_raw` closure. They exist to:
82//
83//   1. Split the loop on presence of a predicate *outside* the hot body,
84//      so the no-predicate path (agg_sum/agg_min/agg_max bench workloads)
85//      never pays the `Option<CompiledPredicate>` branch per row.
86//   2. Drop two bounds checks per row by reading the null bitmap byte
87//      and the 8-byte value via raw pointer casts.
88//
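// SAFETY (shared across every call site below). The layout invariants listed
// here hold for well-formed rows; each loop additionally re-checks both
// offsets against `data.len()` at runtime and skips rows that are too short,
// so the unchecked reads never rely on these invariants alone: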
90//
91//   - `$bmp_byte` is `col_idx / 8` where `col_idx < n_cols`, and the row body
92//     encoding stores `bitmap_size = n_cols.div_ceil(8)` bytes of bitmap
93//     starting at body offset 2. So `bmp_off = row_body_base(row) + 2 +
94//     $bmp_byte < row_len`, and `get_unchecked(bmp_off)` is inside the
95//     row slice.
96//   - `$off = 2 + bitmap_size + fixed_offsets[col_idx]` is body-relative for a fixed-size
97//     column. Every fixed-size column contributes `fixed_size(type_id)`
98//     bytes to the fixed region, so the row always has
99//     `[data_off .. data_off + 8]` available for any i64/f64 column, where
100//     `data_off = row_body_base(row) + $off` — enforced by the row encoder
101//     (`storage/src/row.rs`) and the schema invariant that a row with a
102//     given schema has enough body bytes for `2 + bitmap_size + fixed_region_size`.
103//   - Both macros are only invoked from `agg_single_col_fast`, which
104//     early-returns if the column isn't Int/Float (8-byte fixed) and
105//     early-returns if `fast.fixed_offsets[col_idx]` is `None`.
// Scan every row of `$table` and run `$body` once per row whose target column
// is non-null, with `$v` bound to the column's little-endian `i64` value.
//
// Arguments:
//   - `$self`: expression exposing a `catalog` field with `for_each_row_raw`.
//   - `$table`: passed straight through to `for_each_row_raw`.
//   - `$pred`: an `Option` of a callable over the raw row bytes; when `Some`,
//     rows for which it returns `false` are skipped before any decoding.
//   - `$bmp_byte` / `$bmp_bit`: byte index (relative to body offset 2) and bit
//     position of the column's null flag; a set bit means the row is skipped.
//   - `$off`: body-relative offset of the 8-byte value.
//
// `$bmp_byte`, `$bmp_bit` and `$off` are each evaluated once, before the scan.
// The expansion ends in `?`, so it must sit inside a function whose error type
// accepts `QueryError`. `$body` is expanded inside the per-row closure, so a
// `return` inside it ends processing of the current row only.
macro_rules! agg_int_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : i64| $body:block
    ) => {{
        // Evaluate the macro arguments once, outside the per-row closure.
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        // The predicate test is hoisted out of the loop: two separate scans,
        // so the no-predicate scan carries no per-row `Option` branch.
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    // Shift body-relative offsets past the optional row prefix.
                    let base = row_body_base(data);
                    let bmp_off = base + 2 + bmp_byte;
                    let data_off = base + off;
                    // Bounds guard: skip corrupt/truncated rows that are too
                    // short to contain the bitmap byte or the 8-byte value.
                    if bmp_off >= data.len() || data_off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `bmp_off < data.len()` is checked above.
                    // The bitmap byte lives at body offset 2..2+bitmap_size in
                    // the row encoding, and bmp_byte = col_idx / 8 < bitmap_size.
                    // Corrupt rows are rejected by the bounds guard.
                    let bmp = unsafe { *data.get_unchecked(bmp_off) };
                    // A set bit marks the column as null for this row: skip it.
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `data_off + 8 <= data.len()` is checked above.
                    // `data_off = base + 2 + bitmap_size + fixed_offsets[col_idx]`
                    // points to an 8-byte i64 in the fixed-size region of the row.
                    // The pointer cast is valid because we read exactly 8
                    // bytes via from_le_bytes. Corrupt rows are rejected by
                    // the bounds guard.
                    let $v: i64 = unsafe {
                        i64::from_le_bytes(*(data.as_ptr().add(data_off) as *const [u8; 8]))
                    };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Shift body-relative offsets past the optional row prefix.
                    let base = row_body_base(data);
                    let bmp_off = base + 2 + bmp_byte;
                    let data_off = base + off;
                    // Bounds guard: skip corrupt/truncated rows.
                    if bmp_off >= data.len() || data_off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `bmp_off < data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let bmp = unsafe { *data.get_unchecked(bmp_off) };
                    // A set bit marks the column as null for this row: skip it.
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `data_off + 8 <= data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let $v: i64 = unsafe {
                        i64::from_le_bytes(*(data.as_ptr().add(data_off) as *const [u8; 8]))
                    };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}
178
// Scan every row of `$table` and run `$body` once per row whose target column
// is non-null, with `$v` bound to the column's little-endian `f64` value.
//
// Arguments:
//   - `$self`: expression exposing a `catalog` field with `for_each_row_raw`.
//   - `$table`: passed straight through to `for_each_row_raw`.
//   - `$pred`: an `Option` of a callable over the raw row bytes; when `Some`,
//     rows for which it returns `false` are skipped before any decoding.
//   - `$bmp_byte` / `$bmp_bit`: byte index (relative to body offset 2) and bit
//     position of the column's null flag; a set bit means the row is skipped.
//   - `$off`: body-relative offset of the 8-byte value.
//
// `$bmp_byte`, `$bmp_bit` and `$off` are each evaluated once, before the scan.
// The expansion ends in `?`, so it must sit inside a function whose error type
// accepts `QueryError`. `$body` is expanded inside the per-row closure, so a
// `return` inside it ends processing of the current row only.
macro_rules! agg_float_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : f64| $body:block
    ) => {{
        // Evaluate the macro arguments once, outside the per-row closure.
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        // The predicate test is hoisted out of the loop: two separate scans,
        // so the no-predicate scan carries no per-row `Option` branch.
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    // Shift body-relative offsets past the optional row prefix.
                    let base = row_body_base(data);
                    let bmp_off = base + 2 + bmp_byte;
                    let data_off = base + off;
                    // Bounds guard: skip corrupt/truncated rows that are too
                    // short to contain the bitmap byte or the 8-byte value.
                    if bmp_off >= data.len() || data_off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `bmp_off < data.len()` is checked above.
                    // The bitmap byte lives at body offset 2..2+bitmap_size in
                    // the row encoding, and bmp_byte = col_idx / 8 < bitmap_size.
                    // Corrupt rows are rejected by the bounds guard.
                    let bmp = unsafe { *data.get_unchecked(bmp_off) };
                    // A set bit marks the column as null for this row: skip it.
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `data_off + 8 <= data.len()` is checked above.
                    // `data_off = base + 2 + bitmap_size + fixed_offsets[col_idx]`
                    // points to an 8-byte f64 in the fixed-size region of the row.
                    // The pointer cast is valid because we read exactly 8
                    // bytes via from_le_bytes. Corrupt rows are rejected by
                    // the bounds guard.
                    let $v: f64 = unsafe {
                        f64::from_le_bytes(*(data.as_ptr().add(data_off) as *const [u8; 8]))
                    };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Shift body-relative offsets past the optional row prefix.
                    let base = row_body_base(data);
                    let bmp_off = base + 2 + bmp_byte;
                    let data_off = base + off;
                    // Bounds guard: skip corrupt/truncated rows.
                    if bmp_off >= data.len() || data_off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `bmp_off < data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let bmp = unsafe { *data.get_unchecked(bmp_off) };
                    // A set bit marks the column as null for this row: skip it.
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `data_off + 8 <= data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let $v: f64 = unsafe {
                        f64::from_le_bytes(*(data.as_ptr().add(data_off) as *const [u8; 8]))
                    };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}
251
252// Submodules that use the macros above — must be declared after macro_rules!.
253mod plan_exec;
254mod prepared;
255
256#[cfg(test)]
257mod tests;
258
259// Re-exports for the public API
260pub use self::prepared::PreparedQuery;
261
262use self::plan_exec::{
263    compute_group_aggregate, execute_window, format_plan_tree, hash_join, lower_unindexed_scans,
264    range_matches, synthesize_range_predicate, try_extract_equi_join_keys,
265};
266
267/// Mission infra-1: classify a parsed statement as read-only vs. mutating.
268/// Used by [`Engine::execute_powql_readonly`] and by the server handler
269/// to decide between the RwLock reader and writer sides. `Union` recurses
270/// because each side can independently be read/write (though in practice
271/// both sides are reads — the parser only builds Union from query shapes).
272pub fn is_read_only_statement(stmt: &Statement) -> bool {
273    match stmt {
274        Statement::Query(_) => true,
275        Statement::Union(u) => is_read_only_statement(&u.left) && is_read_only_statement(&u.right),
276        Statement::Insert(_)
277        | Statement::Upsert(_)
278        | Statement::UpdateQuery(_)
279        | Statement::DeleteQuery(_)
280        | Statement::CreateType(_)
281        | Statement::AlterTable(_)
282        | Statement::DropTable(_)
283        | Statement::CreateView(_)
284        | Statement::RefreshView(_)
285        | Statement::DropView(_) => false,
286        Statement::Begin | Statement::Commit | Statement::Rollback => false,
287        Statement::Explain(inner) => is_read_only_statement(inner),
288    }
289}
290
/// Query engine: owns the storage catalog plus the per-engine state needed to
/// parse, plan, cache and execute statements (plan cache, view registry,
/// transaction flag, per-query memory limit).
pub struct Engine {
    /// Storage catalog, opened or created by [`Engine::new`]. Consulted when
    /// lowering plans and asked to `commit_autocommit` after each statement
    /// that runs outside a transaction.
    catalog: Catalog,
    /// Mission D9 — cached parsed+planned query trees keyed by canonical
    /// hash. Saves the ~3μs parse+plan cost on repeat queries that differ
    /// only in literal values.
    ///
    /// Mission infra-1: wrapped in `Mutex` so the read path can be driven
    /// by `&self`. The critical section is extremely short — a single
    /// hashmap lookup + plan clone on a hit, or a single insert on a miss.
    /// A full `RwLock` would be over-engineered here; the contention window
    /// is smaller than the read-path scan work it gates.
    plan_cache: Mutex<PlanCache>,
    /// Mission C Phase 13: reusable `Vec<Value>` scratch buffer for the
    /// prepared-insert fast path. `execute_prepared` used to allocate a
    /// fresh `vec![Value::Empty; n_cols]` on every insert; recycling this
    /// buffer shaves one heap alloc per row on `insert_batch_1k`.
    insert_values_scratch: Vec<Value>,
    /// Materialized view registry: tracks view definitions, dependencies,
    /// and dirty state. Views are backed by regular catalog tables; this
    /// registry adds the lifecycle metadata.
    view_registry: ViewRegistry,
    /// `false` on a freshly constructed engine. While `false`, the mutable
    /// statement entry points call `catalog.commit_autocommit()` after
    /// executing each statement; while `true` they skip that step.
    /// NOTE(review): presumably toggled by the `Begin`/`Commit`/`Rollback`
    /// handling, which is not in this part of the file — confirm.
    in_transaction: bool,
    /// WS2 — per-query memory budget ceiling (bytes). The running total lives
    /// in a thread-local (see [`mem_budget`]) and is reset at every top-level
    /// query entry, so sort/join/GROUP BY/IN-list materialization can be capped
    /// without OOM-killing the process. This field holds only the *limit* (a
    /// plain `usize`, so `Engine` stays `Sync` for the concurrent read path).
    /// Default [`mem_budget::DEFAULT_QUERY_MEMORY_LIMIT`] (256 MB); overridable
    /// via `Engine::with_memory_limit` (server reads `POWDB_QUERY_MEMORY_LIMIT`).
    query_memory_limit: usize,
}
322
323impl Engine {
324    /// Open or create a PowDB engine rooted at `data_dir`.
325    ///
326    /// If the directory already contains a catalog, it is reopened.
327    /// Otherwise a fresh empty database is created.
328    ///
329    /// # Examples
330    ///
331    /// ```
332    /// use powdb_query::executor::Engine;
333    ///
334    /// let dir = tempfile::tempdir().unwrap();
335    /// let engine = Engine::new(dir.path()).unwrap();
336    /// // Engine is ready — the directory now contains a catalog.
337    /// ```
338    pub fn new(data_dir: &Path) -> io::Result<Self> {
339        powdb_storage::create_data_dir_secure(data_dir)?;
340        // Try to reopen an existing database first; only create a fresh
341        // catalog when there isn't one already on disk.
342        let catalog = match Catalog::open(data_dir) {
343            Ok(c) => {
344                info!(data_dir = %data_dir.display(), "engine reopened existing database");
345                c
346            }
347            Err(e) if e.kind() == io::ErrorKind::NotFound => {
348                info!(data_dir = %data_dir.display(), "engine initialized fresh database");
349                Catalog::create(data_dir)?
350            }
351            Err(e) => return Err(e),
352        };
353        let view_registry =
354            ViewRegistry::open(data_dir).unwrap_or_else(|_| ViewRegistry::new(data_dir));
355        Ok(Engine {
356            catalog,
357            plan_cache: Mutex::new(PlanCache::new(PLAN_CACHE_CAPACITY)),
358            insert_values_scratch: Vec::new(),
359            view_registry,
360            in_transaction: false,
361            query_memory_limit: mem_budget::DEFAULT_QUERY_MEMORY_LIMIT,
362        })
363    }
364
365    /// Open or create an engine with an explicit per-query memory limit
366    /// (bytes). Used by the server to apply `POWDB_QUERY_MEMORY_LIMIT`, and by
367    /// tests that need a tiny limit to exercise the budget guard.
368    pub fn with_memory_limit(data_dir: &Path, limit_bytes: usize) -> io::Result<Self> {
369        let mut engine = Engine::new(data_dir)?;
370        engine.set_query_memory_limit(limit_bytes);
371        Ok(engine)
372    }
373
    /// Current per-query memory limit in bytes.
    ///
    /// This is the ceiling handed to `mem_budget::charge` by the `charge_*`
    /// helpers; the running total itself is not stored on the engine.
    pub fn query_memory_limit(&self) -> usize {
        self.query_memory_limit
    }
378
    /// Override the per-query memory limit in bytes.
    ///
    /// This is a plain in-place setter: it takes `&mut self` and returns
    /// nothing, so it cannot be chained. Use [`Engine::with_memory_limit`] to
    /// construct an engine with the limit already applied. Only the stored
    /// ceiling changes; budget charges made after the call are checked
    /// against the new value.
    pub fn set_query_memory_limit(&mut self, limit_bytes: usize) {
        self.query_memory_limit = limit_bytes;
    }
383
    /// Enter a budgeted-statement frame for the current query. The returned
    /// guard must be held for the duration of the statement; on its drop the
    /// reentrancy depth is decremented. Only the *outermost* statement entry
    /// zeroes this thread's running total, so a nested `execute_powql` (the
    /// source query of a `create_view`/`refresh_view`) does NOT discard the
    /// outer frame's accounting. The accumulator is thread-local, so this never
    /// touches another concurrent query's total.
    ///
    /// The method reads nothing from `self`; it simply forwards to
    /// `mem_budget::enter`. Callers bind the result to a named `_budget`
    /// local so the guard lives until the end of the statement body.
    #[must_use = "the budget guard must outlive the statement body"]
    pub(super) fn enter_memory_budget(&self) -> mem_budget::EnterGuard {
        mem_budget::enter()
    }
395
396    /// Charge the estimated footprint of a freshly materialized batch of rows
397    /// against the current per-query budget. Returns
398    /// [`QueryError::MemoryLimitExceeded`] cleanly if the batch would push the
399    /// query over its limit. Used at every full-materialization point (sort
400    /// buffer, join build side, GROUP BY hash table, IN-list).
401    pub(super) fn charge_rows(&self, rows: &[Vec<Value>]) -> Result<(), QueryError> {
402        let mut total = 0usize;
403        for row in rows {
404            total = total.saturating_add(mem_budget::estimate_row_size(row));
405        }
406        mem_budget::charge(total, self.query_memory_limit)
407    }
408
409    /// Charge a materialized IN-list (the literal expressions pulled out of an
410    /// uncorrelated `IN (subquery)`) against the current per-query budget.
411    /// Each item is conservatively sized at the `Expr` slot plus, for string
412    /// literals, the owned heap bytes.
413    pub(super) fn charge_in_list(&self, list: &[crate::ast::Expr]) -> Result<(), QueryError> {
414        let base = std::mem::size_of::<crate::ast::Expr>();
415        let mut total = std::mem::size_of::<Vec<crate::ast::Expr>>();
416        for item in list {
417            total = total.saturating_add(base);
418            if let crate::ast::Expr::Literal(crate::ast::Literal::String(s)) = item {
419                total = total.saturating_add(s.capacity());
420            }
421        }
422        mem_budget::charge(total, self.query_memory_limit)
423    }
424
    /// Dispatch to the requested query frontend.
    ///
    /// `QueryDialect::PowQL` forwards `input` to [`Engine::execute_powql`] and
    /// `QueryDialect::Sql` to [`Engine::execute_sql`]; the result of the
    /// chosen entry point is returned unchanged. Takes `&mut self` because
    /// both targets may execute writes.
    pub fn execute_with_dialect(
        &mut self,
        dialect: QueryDialect,
        input: &str,
    ) -> Result<QueryResult, QueryError> {
        // Exhaustive on purpose: a new dialect must pick an entry point here.
        match dialect {
            QueryDialect::PowQL => self.execute_powql(input),
            QueryDialect::Sql => self.execute_sql(input),
        }
    }
436
    /// Read-only variant of [`Engine::execute_with_dialect`].
    ///
    /// Runs under a shared borrow (`&self`) and forwards `input` to
    /// `execute_powql_readonly` or [`Engine::execute_sql_readonly`] according
    /// to `dialect`, returning that entry point's result unchanged.
    pub fn execute_readonly_with_dialect(
        &self,
        dialect: QueryDialect,
        input: &str,
    ) -> Result<QueryResult, QueryError> {
        // Exhaustive on purpose: a new dialect must pick an entry point here.
        match dialect {
            QueryDialect::PowQL => self.execute_powql_readonly(input),
            QueryDialect::Sql => self.execute_sql_readonly(input),
        }
    }
448
449    /// Parse + plan + execute a PowQL query.
450    ///
451    /// # Examples
452    ///
453    /// ```
454    /// use powdb_query::executor::Engine;
455    /// use powdb_query::result::QueryResult;
456    ///
457    /// let dir = tempfile::tempdir().unwrap();
458    /// let mut engine = Engine::new(dir.path()).unwrap();
459    ///
460    /// // Create a table and insert a row.
461    /// engine.execute_powql("type User { required name: str, age: int }").unwrap();
462    /// engine.execute_powql(r#"insert User { name := "Alice", age := 30 }"#).unwrap();
463    ///
464    /// // Query rows back.
465    /// let result = engine.execute_powql("User").unwrap();
466    /// assert_eq!(result.row_count(), 1);
467    /// ```
468    ///
469    /// Mission D6 — tracing collapse: the previous implementation ran 4
470    /// `Instant::now()` + 3 `elapsed().as_micros()` calls + formatted an
471    /// `info!` span on every query, even when tracing was disabled. On a
472    /// sub-microsecond `point_lookup_indexed` call that overhead was
473    /// 100-200ns — 20%+ of the whole query. We now measure time only when
474    /// INFO is actually enabled via `tracing::enabled!`, and we moved the
475    /// noisy `debug!(?plan)` line behind the same gate so the Debug
476    /// formatter can't run unconditionally either.
477    ///
478    /// Mission D9 — plan cache: on the hot path we canonicalise the query
479    /// text (lex + FNV-1a hash with literal values stripped), check the
480    /// cache, and on a hit substitute the new literals into a clone of the
481    /// cached plan. This skips re-lexing, re-parsing, and re-planning —
482    /// around 3μs per call on bench workloads. On a miss we plan as before
483    /// and insert the plan under its canonical hash.
484    pub fn execute_powql(&mut self, input: &str) -> Result<QueryResult, QueryError> {
485        // WS2: each *outermost* statement starts with the full memory
486        // allowance. The guard holds the reentrancy depth so a nested
487        // `execute_powql` (e.g. a view's source query) does not reset the
488        // outer frame's accounting mid-statement.
489        let _budget = self.enter_memory_budget();
490        // Hot path: tracing disabled. Zero syscalls, zero formatting.
491        if !tracing::enabled!(Level::INFO) {
492            // D9: try the plan cache first. Canonicalisation lexes the
493            // query once; on a hit we skip the parser and planner entirely.
494            if let Ok((hash, literals)) = canonicalize(input) {
495                let cached = self
496                    .plan_cache
497                    .lock()
498                    .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
499                    .get_with_substitution(hash, &literals);
500                if let Some(plan) = cached {
501                    let plan = lower_unindexed_scans(&self.catalog, &plan);
502                    let result = self.execute_plan(&plan);
503                    // Mission B (post-review): statement-boundary WAL
504                    // group commit. Catalog::wal_log now only appends;
505                    // the fsync happens here exactly once per statement.
506                    // `sync_wal` is a no-op when nothing was buffered
507                    // (pure reads pay zero fsync).
508                    if !self.in_transaction {
509                        self.catalog
510                            .commit_autocommit()
511                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
512                    }
513                    return result;
514                }
515                // Miss — plan, insert, execute.
516                return match planner::plan(input) {
517                    Ok(plan) => {
518                        self.plan_cache
519                            .lock()
520                            .map_err(|e| {
521                                QueryError::Execution(format!("plan cache lock poisoned: {e}"))
522                            })?
523                            .insert(hash, plan.clone());
524                        let plan = lower_unindexed_scans(&self.catalog, &plan);
525                        let result = self.execute_plan(&plan);
526                        if !self.in_transaction {
527                            self.catalog
528                                .commit_autocommit()
529                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
530                        }
531                        result
532                    }
533                    Err(e) => Err(QueryError::Parse(e.to_string())),
534                };
535            }
536            // Lex error — fall through to the planner so the caller gets a
537            // consistent error shape.
538            return match planner::plan(input) {
539                Ok(plan) => {
540                    let plan = lower_unindexed_scans(&self.catalog, &plan);
541                    let result = self.execute_plan(&plan);
542                    if !self.in_transaction {
543                        self.catalog
544                            .commit_autocommit()
545                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
546                    }
547                    result
548                }
549                Err(e) => Err(QueryError::Parse(e.to_string())),
550            };
551        }
552
553        // Instrumented path — only taken under explicit tracing subscribers.
554        let total_start = Instant::now();
555        let plan_start = Instant::now();
556        let plan = planner::plan(input).map_err(|e| {
557            let msg = e.to_string();
558            error!(query = %input, error = %msg, "query plan failed");
559            QueryError::Parse(msg)
560        })?;
561        let plan_us = plan_start.elapsed().as_micros();
562
563        let exec_start = Instant::now();
564        let plan = lower_unindexed_scans(&self.catalog, &plan);
565        let result = self.execute_plan(&plan);
566        if !self.in_transaction {
567            self.catalog
568                .commit_autocommit()
569                .map_err(|e| QueryError::StorageError(e.to_string()))?;
570        }
571        let exec_us = exec_start.elapsed().as_micros();
572
573        let total_us = total_start.elapsed().as_micros();
574        match &result {
575            Ok(r) => {
576                info!(
577                    query = %input,
578                    plan_us = plan_us,
579                    exec_us = exec_us,
580                    total_us = total_us,
581                    rows = r.row_count(),
582                    "query ok"
583                );
584            }
585            Err(e) => {
586                error!(
587                    query = %input,
588                    plan_us = plan_us,
589                    exec_us = exec_us,
590                    error = %e,
591                    "query failed"
592                );
593            }
594        }
595        result
596    }
597
598    /// Parse + plan + execute a SQL query through the SQL frontend.
599    ///
600    /// SQL is lowered to the existing PowDB AST and to canonical PowQL text.
601    /// The canonical PowQL text is used as the plan-cache key, so equivalent
602    /// SQL and PowQL spellings share cached plans.
603    pub fn execute_sql(&mut self, input: &str) -> Result<QueryResult, QueryError> {
604        let _budget = self.enter_memory_budget();
605        let parsed = crate::sql::parse_sql_with_canonical(input)
606            .map_err(|e| QueryError::Parse(e.to_string()))?;
607
608        if !tracing::enabled!(Level::INFO) {
609            if let Ok((hash, literals)) = canonicalize(&parsed.canonical_powql) {
610                let cached = self
611                    .plan_cache
612                    .lock()
613                    .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
614                    .get_with_substitution(hash, &literals);
615                if let Some(plan) = cached {
616                    let plan = lower_unindexed_scans(&self.catalog, &plan);
617                    let result = self.execute_plan(&plan);
618                    if !self.in_transaction {
619                        self.catalog
620                            .commit_autocommit()
621                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
622                    }
623                    return result;
624                }
625
626                let plan = crate::planner::plan_statement(parsed.statement)
627                    .map_err(|e| QueryError::Parse(e.to_string()))?;
628                self.plan_cache
629                    .lock()
630                    .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
631                    .insert(hash, plan.clone());
632                let plan = lower_unindexed_scans(&self.catalog, &plan);
633                let result = self.execute_plan(&plan);
634                if !self.in_transaction {
635                    self.catalog
636                        .commit_autocommit()
637                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
638                }
639                return result;
640            }
641        }
642
643        let plan = crate::planner::plan_statement(parsed.statement)
644            .map_err(|e| QueryError::Parse(e.to_string()))?;
645        let plan = lower_unindexed_scans(&self.catalog, &plan);
646        let result = self.execute_plan(&plan);
647        if !self.in_transaction {
648            self.catalog
649                .commit_autocommit()
650                .map_err(|e| QueryError::StorageError(e.to_string()))?;
651        }
652        result
653    }
654
655    /// Read-only variant of [`Engine::execute_sql`].
656    pub fn execute_sql_readonly(&self, input: &str) -> Result<QueryResult, QueryError> {
657        let _budget = self.enter_memory_budget();
658        let parsed = crate::sql::parse_sql_with_canonical(input)
659            .map_err(|e| QueryError::Parse(e.to_string()))?;
660        if !is_read_only_statement(&parsed.statement) {
661            return Err(QueryError::ReadonlyNeedsWrite);
662        }
663
664        if let Ok((hash, literals)) = canonicalize(&parsed.canonical_powql) {
665            let cached = self
666                .plan_cache
667                .lock()
668                .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
669                .get_with_substitution(hash, &literals);
670            if let Some(plan) = cached {
671                let plan = lower_unindexed_scans(&self.catalog, &plan);
672                return self.execute_plan_readonly(&plan);
673            }
674            let plan = crate::planner::plan_statement(parsed.statement)
675                .map_err(|e| QueryError::Parse(e.to_string()))?;
676            self.plan_cache
677                .lock()
678                .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
679                .insert(hash, plan.clone());
680            let plan = lower_unindexed_scans(&self.catalog, &plan);
681            return self.execute_plan_readonly(&plan);
682        }
683
684        let plan = crate::planner::plan_statement(parsed.statement)
685            .map_err(|e| QueryError::Parse(e.to_string()))?;
686        let plan = lower_unindexed_scans(&self.catalog, &plan);
687        self.execute_plan_readonly(&plan)
688    }
689
690    /// Execute PowQL with `$N` placeholders bound to positional `params`.
691    ///
692    /// Task 4: parameters are substituted as literal *tokens* before
693    /// parsing (see [`crate::parser::parse_with_params`]), so untrusted
694    /// input can never change the query's shape. This path deliberately
695    /// **bypasses the plan cache** — template caching is a follow-up — and
696    /// otherwise mirrors the non-cached tail of [`Engine::execute_powql`].
697    pub fn execute_powql_with_params(
698        &mut self,
699        input: &str,
700        params: &[crate::ast::ParamValue],
701    ) -> Result<QueryResult, QueryError> {
702        let _budget = self.enter_memory_budget();
703        let stmt = crate::parser::parse_with_params(input, params)
704            .map_err(|e| QueryError::Parse(e.to_string()))?;
705        let plan =
706            crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
707        let plan = lower_unindexed_scans(&self.catalog, &plan);
708        let result = self.execute_plan(&plan);
709        if !self.in_transaction {
710            self.catalog
711                .commit_autocommit()
712                .map_err(|e| QueryError::StorageError(e.to_string()))?;
713        }
714        result
715    }
716
717    /// Read-only variant of [`Engine::execute_powql_with_params`].
718    ///
719    /// Mirrors [`Engine::execute_powql_readonly`]: parses with bound
720    /// params, rejects any write statement with
721    /// [`QueryError::ReadonlyNeedsWrite`] so the caller can escalate to the
722    /// write lock, then executes under a shared borrow. No plan-cache
723    /// interaction.
724    pub fn execute_powql_readonly_with_params(
725        &self,
726        input: &str,
727        params: &[crate::ast::ParamValue],
728    ) -> Result<QueryResult, QueryError> {
729        let _budget = self.enter_memory_budget();
730        let stmt = crate::parser::parse_with_params(input, params)
731            .map_err(|e| QueryError::Parse(e.to_string()))?;
732        if !is_read_only_statement(&stmt) {
733            return Err(QueryError::ReadonlyNeedsWrite);
734        }
735        let plan =
736            crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
737        let plan = lower_unindexed_scans(&self.catalog, &plan);
738        self.execute_plan_readonly(&plan)
739    }
740
741    /// Plan cache stats — useful for benches and debugging.
742    pub fn plan_cache_stats(&self) -> (u64, u64, usize) {
743        let cache = self.plan_cache.lock().unwrap_or_else(|e| e.into_inner());
744        (cache.hits, cache.misses, cache.len())
745    }
746
747    /// Mission infra-1: read-only entry point.
748    ///
749    /// Parses + plans + executes a PowQL query using only a shared borrow
750    /// on the engine. Rejects any statement that would mutate state
751    /// (Insert/Update/Delete/CreateTable/AlterTable/DropTable/CreateView/
752    /// RefreshView/DropView) by returning [`READONLY_NEEDS_WRITE`] so the
753    /// caller can escalate to the write lock.
754    ///
755    /// Also returns [`READONLY_NEEDS_WRITE`] if a materialized view in the
756    /// query is dirty — refreshing one requires `&mut self`, so the caller
757    /// must retake the write lock for the first refresh.
758    ///
759    /// This method is the concurrent-read fast path behind
760    /// `Arc<RwLock<Engine>>`: multiple threads can call it simultaneously
761    /// under a shared `.read()` lock and each will scan independently.
762    pub fn execute_powql_readonly(&self, input: &str) -> Result<QueryResult, QueryError> {
763        // WS2: each *outermost* statement starts with the full memory
764        // allowance. The guard holds the reentrancy depth so a nested
765        // `execute_powql*` does not reset the outer frame's accounting.
766        let _budget = self.enter_memory_budget();
767        // Parse the statement first so we can classify read vs. write
768        // without touching the catalog. This is the same lex+parse cost
769        // the hot path would pay anyway.
770        let stmt = crate::parser::parse(input).map_err(|e| QueryError::Parse(e.to_string()))?;
771        if !is_read_only_statement(&stmt) {
772            return Err(QueryError::ReadonlyNeedsWrite);
773        }
774
775        // Try the plan cache first — identical hash scheme to
776        // `execute_powql` so both paths share cache state. The mutex
777        // section is just a hashmap lookup + plan clone.
778        if let Ok((hash, literals)) = canonicalize(input) {
779            let cached = self
780                .plan_cache
781                .lock()
782                .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
783                .get_with_substitution(hash, &literals);
784            if let Some(plan) = cached {
785                let plan = lower_unindexed_scans(&self.catalog, &plan);
786                return self.execute_plan_readonly(&plan);
787            }
788            // Miss: plan + insert + execute. The planner is pure, so this
789            // is safe from `&self`.
790            let plan = crate::planner::plan_statement(stmt)
791                .map_err(|e| QueryError::Parse(e.to_string()))?;
792            self.plan_cache
793                .lock()
794                .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
795                .insert(hash, plan.clone());
796            let plan = lower_unindexed_scans(&self.catalog, &plan);
797            return self.execute_plan_readonly(&plan);
798        }
799        // Lex error — fall through to the planner for a consistent error
800        // shape (though `parse` above would usually have caught it).
801        let plan =
802            crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
803        let plan = lower_unindexed_scans(&self.catalog, &plan);
804        self.execute_plan_readonly(&plan)
805    }
806
807    /// Read-only version of [`Engine::execute_plan`]. Dispatches the
808    /// read-path plan variants by calling `&self` helpers and errors with
809    /// [`READONLY_NEEDS_WRITE`] on any write variant. This is the
810    /// recursion target for composite read plans under the RwLock reader.
811    ///
812    /// The dispatch mirrors `execute_plan` for the read branches but does
813    /// not carry any of the fast-paths that need `&mut self` (e.g. plan-
814    /// cache mutation on inner subqueries is handled via the shared mutex
815    /// in [`Engine::execute_powql_readonly`]; in-flight subquery
816    /// materialisation uses [`Engine::materialize_subqueries_readonly`]).
817    fn execute_plan_readonly(&self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
818        match plan {
819            PlanNode::SeqScan { table } => {
820                // Dirty view means we'd need to refresh it — can't do that
821                // under `&self`. Escalate to the write path.
822                if self.view_registry.is_dirty(table) {
823                    return Err(QueryError::ReadonlyNeedsWrite);
824                }
825                let schema = self
826                    .catalog
827                    .schema(table)
828                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
829                    .clone();
830                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
831                let rows: Vec<Vec<Value>> = self
832                    .catalog
833                    .scan(table)
834                    .map_err(|e| e.to_string())?
835                    .map(|(_, row)| row)
836                    .collect();
837                Ok(QueryResult::Rows { columns, rows })
838            }
839
840            PlanNode::AliasScan { table, alias } => {
841                let schema = self
842                    .catalog
843                    .schema(table)
844                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
845                    .clone();
846                let columns: Vec<String> = schema
847                    .columns
848                    .iter()
849                    .map(|c| format!("{alias}.{}", c.name))
850                    .collect();
851                let rows: Vec<Vec<Value>> = self
852                    .catalog
853                    .scan(table)
854                    .map_err(|e| e.to_string())?
855                    .map(|(_, row)| row)
856                    .collect();
857                Ok(QueryResult::Rows { columns, rows })
858            }
859
860            PlanNode::IndexScan { table, column, key } => {
861                let schema = self
862                    .catalog
863                    .schema(table)
864                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
865                    .clone();
866                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
867                let key_value = literal_to_value(key)?;
868                let tbl = self
869                    .catalog
870                    .get_table(table)
871                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
872
873                if tbl.has_index(column) {
874                    // Use index_lookup_all to handle both unique and
875                    // non-unique indexes — returns all matching RowIds.
876                    let rids = tbl.index_lookup_all(column, &key_value);
877                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
878                    for rid in rids {
879                        if let Some(data) = tbl.heap.get(rid) {
880                            rows.push(decode_row(&tbl.schema, &data));
881                        }
882                    }
883                    return Ok(QueryResult::Rows { columns, rows });
884                }
885
886                // No index: synthetic eq predicate + compiled scan.
887                let fast = FastLayout::new(&schema);
888                let synth_pred = Expr::BinaryOp(
889                    Box::new(Expr::Field(column.clone())),
890                    BinOp::Eq,
891                    Box::new(key.clone()),
892                );
893                if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, &schema) {
894                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
895                    self.catalog
896                        .for_each_row_raw(table, |_rid, data| {
897                            if compiled(data) {
898                                rows.push(decode_row(&schema, data));
899                            }
900                        })
901                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
902                    return Ok(QueryResult::Rows { columns, rows });
903                }
904
905                // Last resort: slow eq-check.
906                let col_idx =
907                    schema
908                        .column_index(column)
909                        .ok_or_else(|| QueryError::ColumnNotFound {
910                            table: String::new(),
911                            column: column.clone(),
912                        })?;
913                let rows: Vec<Vec<Value>> = tbl
914                    .scan()
915                    .filter_map(|(_, row)| {
916                        if row[col_idx] == key_value {
917                            Some(row)
918                        } else {
919                            None
920                        }
921                    })
922                    .collect();
923                Ok(QueryResult::Rows { columns, rows })
924            }
925
926            PlanNode::RangeScan {
927                table,
928                column,
929                start,
930                end,
931            } => {
932                let tbl = self
933                    .catalog
934                    .get_table(table)
935                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
936                let columns: Vec<String> =
937                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
938                let schema = tbl.schema.clone();
939
940                let start_val = match start {
941                    Some((expr, _)) => Some(literal_to_value(expr)?),
942                    None => None,
943                };
944                let end_val = match end {
945                    Some((expr, _)) => Some(literal_to_value(expr)?),
946                    None => None,
947                };
948                let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
949                let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
950
951                // Range scans only use the btree fast path for unique indexes.
952                // Non-unique indexes store composite keys that don't compare
953                // directly against raw column values.
954                if tbl.is_index_unique(column) == Some(true) {
955                    if let Some(btree) = tbl.index(column) {
956                        let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
957                            (Some(s), Some(e)) => btree.range(s, e).collect(),
958                            (Some(s), None) => btree.range_from(s),
959                            (None, Some(e)) => btree.range_to(e),
960                            (None, None) => {
961                                // Unbounded both sides — equivalent to seq scan.
962                                let rows: Vec<Vec<Value>> =
963                                    tbl.scan().map(|(_, row)| row).collect();
964                                return Ok(QueryResult::Rows { columns, rows });
965                            }
966                        };
967                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
968                        for (key, rid) in hits {
969                            // Filter for exclusive bounds.
970                            if !start_inclusive {
971                                if let Some(ref s) = start_val {
972                                    if &key == s {
973                                        continue;
974                                    }
975                                }
976                            }
977                            if !end_inclusive {
978                                if let Some(ref e) = end_val {
979                                    if &key == e {
980                                        continue;
981                                    }
982                                }
983                            }
984                            if let Some(data) = tbl.heap.get(rid) {
985                                rows.push(decode_row(&schema, &data));
986                            }
987                        }
988                        return Ok(QueryResult::Rows { columns, rows });
989                    }
990                }
991
992                // Fallback: no index — synthesize the range predicate and scan.
993                let fast = FastLayout::new(&schema);
994                let synth = synthesize_range_predicate(column, start, end);
995                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, &schema) {
996                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
997                    self.catalog
998                        .for_each_row_raw(table, |_rid, data| {
999                            if compiled(data) {
1000                                rows.push(decode_row(&schema, data));
1001                            }
1002                        })
1003                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
1004                    return Ok(QueryResult::Rows { columns, rows });
1005                }
1006
1007                // Last resort: decoded row eval.
1008                let col_idx =
1009                    schema
1010                        .column_index(column)
1011                        .ok_or_else(|| QueryError::ColumnNotFound {
1012                            table: String::new(),
1013                            column: column.clone(),
1014                        })?;
1015                let rows: Vec<Vec<Value>> = tbl
1016                    .scan()
1017                    .filter(|(_, row)| {
1018                        range_matches(
1019                            &row[col_idx],
1020                            &start_val,
1021                            start_inclusive,
1022                            &end_val,
1023                            end_inclusive,
1024                        )
1025                    })
1026                    .map(|(_, row)| row)
1027                    .collect();
1028                Ok(QueryResult::Rows { columns, rows })
1029            }
1030
1031            PlanNode::Filter { input, predicate } => {
1032                // Materialise subqueries using the `&self` variant.
1033                // Uncorrelated subqueries are replaced with InList/Bool;
1034                // correlated ones are left as InSubquery/ExistsSubquery
1035                // for per-row materialisation below.
1036                let materialized;
1037                let predicate = if contains_subquery(predicate) {
1038                    materialized = self.materialize_subqueries_readonly(predicate)?;
1039                    &materialized
1040                } else {
1041                    predicate
1042                };
1043
1044                // Correlated subquery path: per-row materialisation.
1045                if contains_subquery(predicate) {
1046                    let result = self.execute_plan_readonly(input)?;
1047                    return match result {
1048                        QueryResult::Rows { columns, rows } => {
1049                            let mut filtered = Vec::new();
1050                            for row in rows {
1051                                let row_pred = self.materialize_correlated_for_row_readonly(
1052                                    predicate, &row, &columns,
1053                                )?;
1054                                if eval_predicate(&row_pred, &row, &columns) {
1055                                    filtered.push(row);
1056                                }
1057                            }
1058                            Ok(QueryResult::Rows {
1059                                columns,
1060                                rows: filtered,
1061                            })
1062                        }
1063                        _ => Err("filter requires row input".into()),
1064                    };
1065                }
1066
1067                // Fused Filter+SeqScan fast path.
1068                if let PlanNode::SeqScan { table } = input.as_ref() {
1069                    if self.view_registry.is_dirty(table) {
1070                        return Err(QueryError::ReadonlyNeedsWrite);
1071                    }
1072                    let schema = self
1073                        .catalog
1074                        .schema(table)
1075                        .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
1076                        .clone();
1077                    let columns: Vec<String> =
1078                        schema.columns.iter().map(|c| c.name.clone()).collect();
1079                    let fast = FastLayout::new(&schema);
1080                    let row_layout = RowLayout::new(&schema);
1081                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1082
1083                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
1084                        self.catalog
1085                            .for_each_row_raw(table, |_rid, data| {
1086                                if compiled(data) {
1087                                    rows.push(decode_row(&schema, data));
1088                                }
1089                            })
1090                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1091                    } else {
1092                        let pred_cols = predicate_column_indices(predicate, &columns);
1093                        self.catalog
1094                            .for_each_row_raw(table, |_rid, data| {
1095                                let pred_row =
1096                                    decode_selective(&schema, &row_layout, data, &pred_cols);
1097                                if eval_predicate(predicate, &pred_row, &columns) {
1098                                    rows.push(decode_row(&schema, data));
1099                                }
1100                            })
1101                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1102                    }
1103
1104                    return Ok(QueryResult::Rows { columns, rows });
1105                }
1106
1107                // General path.
1108                let result = self.execute_plan_readonly(input)?;
1109                match result {
1110                    QueryResult::Rows { columns, rows } => {
1111                        let filtered: Vec<Vec<Value>> = rows
1112                            .into_iter()
1113                            .filter(|row| eval_predicate(predicate, row, &columns))
1114                            .collect();
1115                        Ok(QueryResult::Rows {
1116                            columns,
1117                            rows: filtered,
1118                        })
1119                    }
1120                    _ => Err("filter requires row input".into()),
1121                }
1122            }
1123
1124            PlanNode::Project { input, fields } => {
1125                // Fast path: Project over IndexScan. Avoids full-row decode
1126                // by calling decode_column only for projected fields.
1127                if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
1128                    let key_value = literal_to_value(key)?;
1129                    let tbl = self
1130                        .catalog
1131                        .get_table(table)
1132                        .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
1133                    let schema = &tbl.schema;
1134                    let layout = tbl.row_layout();
1135
1136                    let proj_columns: Vec<String> = fields
1137                        .iter()
1138                        .map(|f| {
1139                            f.alias.clone().unwrap_or_else(|| match &f.expr {
1140                                Expr::Field(name) => name.clone(),
1141                                _ => "?".into(),
1142                            })
1143                        })
1144                        .collect();
1145
1146                    let proj_indices: Vec<usize> = fields
1147                        .iter()
1148                        .filter_map(|f| {
1149                            if let Expr::Field(name) = &f.expr {
1150                                schema.column_index(name)
1151                            } else {
1152                                None
1153                            }
1154                        })
1155                        .collect();
1156
1157                    if tbl.has_index(column) {
1158                        let rids = tbl.index_lookup_all(column, &key_value);
1159                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1160                        for rid in rids {
1161                            if let Some(data) = tbl.heap.get(rid) {
1162                                let row: Vec<Value> = proj_indices
1163                                    .iter()
1164                                    .map(|&ci| decode_column(schema, layout, &data, ci))
1165                                    .collect();
1166                                rows.push(row);
1167                            }
1168                        }
1169                        return Ok(QueryResult::Rows {
1170                            columns: proj_columns,
1171                            rows,
1172                        });
1173                    }
1174                }
1175
1176                // Fast paths over Limit(Sort(...)) / Limit(Filter(...)) / Limit(SeqScan).
1177                if let PlanNode::Limit {
1178                    input: inner,
1179                    count: limit_expr,
1180                } = input.as_ref()
1181                {
1182                    if let PlanNode::Sort {
1183                        input: sort_input,
1184                        keys,
1185                    } = inner.as_ref()
1186                    {
1187                        if keys.len() == 1 {
1188                            let sort_field = &keys[0].field;
1189                            let descending = keys[0].descending;
1190                            let limit = match limit_expr {
1191                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1192                                _ => usize::MAX,
1193                            };
1194                            let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1195                                match sort_input.as_ref() {
1196                                    PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1197                                    PlanNode::Filter {
1198                                        input: fi,
1199                                        predicate,
1200                                    } => {
1201                                        if let PlanNode::SeqScan { table } = fi.as_ref() {
1202                                            (Some(table.as_str()), Some(predicate))
1203                                        } else {
1204                                            (None, None)
1205                                        }
1206                                    }
1207                                    _ => (None, None),
1208                                };
1209                            if let Some(table) = table_opt {
1210                                if let Some(result) = self.project_filter_sort_limit_fast(
1211                                    table, fields, sort_field, descending, limit, pred_opt,
1212                                )? {
1213                                    return Ok(result);
1214                                }
1215                            }
1216                        }
1217                    }
1218                    if let PlanNode::Filter {
1219                        input: fi,
1220                        predicate,
1221                    } = inner.as_ref()
1222                    {
1223                        if let PlanNode::SeqScan { table } = fi.as_ref() {
1224                            let limit = match limit_expr {
1225                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1226                                _ => usize::MAX,
1227                            };
1228                            if let Some(result) = self.project_filter_limit_fast(
1229                                table,
1230                                fields,
1231                                limit,
1232                                Some(predicate),
1233                            )? {
1234                                return Ok(result);
1235                            }
1236                        }
1237                    }
1238                    if let PlanNode::SeqScan { table } = inner.as_ref() {
1239                        let limit = match limit_expr {
1240                            Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1241                            _ => usize::MAX,
1242                        };
1243                        if let Some(result) =
1244                            self.project_filter_limit_fast(table, fields, limit, None)?
1245                        {
1246                            return Ok(result);
1247                        }
1248                    }
1249                }
1250
1251                // Project(Filter(SeqScan)) without Limit.
1252                if let PlanNode::Filter {
1253                    input: fi,
1254                    predicate,
1255                } = input.as_ref()
1256                {
1257                    if let PlanNode::SeqScan { table } = fi.as_ref() {
1258                        if let Some(result) = self.project_filter_limit_fast(
1259                            table,
1260                            fields,
1261                            usize::MAX,
1262                            Some(predicate),
1263                        )? {
1264                            return Ok(result);
1265                        }
1266                    }
1267                }
1268
1269                // Project(SeqScan) without Filter or Limit.
1270                if let PlanNode::SeqScan { table } = input.as_ref() {
1271                    if let Some(result) =
1272                        self.project_filter_limit_fast(table, fields, usize::MAX, None)?
1273                    {
1274                        return Ok(result);
1275                    }
1276                }
1277
1278                // Generic path.
1279                let result = self.execute_plan_readonly(input)?;
1280                match result {
1281                    QueryResult::Rows { columns, rows } => {
1282                        let proj_columns: Vec<String> = fields
1283                            .iter()
1284                            .map(|f| {
1285                                f.alias.clone().unwrap_or_else(|| match &f.expr {
1286                                    Expr::Field(name) => name.clone(),
1287                                    Expr::QualifiedField { qualifier, field } => {
1288                                        format!("{qualifier}.{field}")
1289                                    }
1290                                    _ => "?".into(),
1291                                })
1292                            })
1293                            .collect();
1294                        let proj_rows: Vec<Vec<Value>> = rows
1295                            .iter()
1296                            .map(|row| {
1297                                fields
1298                                    .iter()
1299                                    .map(|f| eval_expr(&f.expr, row, &columns))
1300                                    .collect()
1301                            })
1302                            .collect();
1303                        Ok(QueryResult::Rows {
1304                            columns: proj_columns,
1305                            rows: proj_rows,
1306                        })
1307                    }
1308                    _ => Err("project requires row input".into()),
1309                }
1310            }
1311
1312            PlanNode::Sort { input, keys } => {
1313                let result = self.execute_plan_readonly(input)?;
1314                match result {
1315                    QueryResult::Rows { columns, mut rows } => {
1316                        if rows.len() > MAX_SORT_ROWS {
1317                            return Err(QueryError::SortLimitExceeded);
1318                        }
1319                        // WS2: byte-budget guard on the sort buffer.
1320                        self.charge_rows(&rows)?;
1321                        let key_indices: Vec<(usize, bool)> = keys
1322                            .iter()
1323                            .map(|k| {
1324                                columns
1325                                    .iter()
1326                                    .position(|c| c == &k.field)
1327                                    .map(|idx| (idx, k.descending))
1328                                    .ok_or_else(|| QueryError::ColumnNotFound {
1329                                        table: String::new(),
1330                                        column: k.field.clone(),
1331                                    })
1332                            })
1333                            .collect::<Result<_, QueryError>>()?;
1334                        rows.sort_by(|a, b| {
1335                            for &(col_idx, descending) in &key_indices {
1336                                let cmp = a[col_idx].cmp(&b[col_idx]);
1337                                let cmp = if descending { cmp.reverse() } else { cmp };
1338                                if cmp != std::cmp::Ordering::Equal {
1339                                    return cmp;
1340                                }
1341                            }
1342                            std::cmp::Ordering::Equal
1343                        });
1344                        Ok(QueryResult::Rows { columns, rows })
1345                    }
1346                    _ => Err("sort requires row input".into()),
1347                }
1348            }
1349
1350            PlanNode::Limit { input, count } => {
1351                let result = self.execute_plan_readonly(input)?;
1352                let n = match count {
1353                    Expr::Literal(Literal::Int(v)) => *v as usize,
1354                    _ => return Err("limit must be integer literal".into()),
1355                };
1356                match result {
1357                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1358                        columns,
1359                        rows: rows.into_iter().take(n).collect(),
1360                    }),
1361                    _ => Err("limit requires row input".into()),
1362                }
1363            }
1364
1365            PlanNode::Offset { input, count } => {
1366                let result = self.execute_plan_readonly(input)?;
1367                let n = match count {
1368                    Expr::Literal(Literal::Int(v)) => *v as usize,
1369                    _ => return Err("offset must be integer literal".into()),
1370                };
1371                match result {
1372                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1373                        columns,
1374                        rows: rows.into_iter().skip(n).collect(),
1375                    }),
1376                    _ => Err("offset requires row input".into()),
1377                }
1378            }
1379
1380            PlanNode::Aggregate {
1381                input,
1382                function,
1383                field,
1384            } => {
1385                // Fast path: count() over SeqScan.
1386                if *function == AggFunc::Count {
1387                    if let PlanNode::SeqScan { table } = input.as_ref() {
1388                        // A dirty materialized view must be refreshed before
1389                        // it can be counted, which needs `&mut self`. Escalate
1390                        // to the write path (F3: count(View) returned stale).
1391                        if self.view_registry.is_dirty(table) {
1392                            return Err(QueryError::ReadonlyNeedsWrite);
1393                        }
1394                        let mut count: i64 = 0;
1395                        self.catalog
1396                            .for_each_row_raw(table, |_rid, _data| {
1397                                count += 1;
1398                            })
1399                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1400                        return Ok(QueryResult::Scalar(Value::Int(count)));
1401                    }
1402                    if let PlanNode::Filter {
1403                        input: inner,
1404                        predicate,
1405                    } = input.as_ref()
1406                    {
1407                        // Only take the fast path for a plain Filter(SeqScan)
1408                        // with no subquery in the predicate. A subquery
1409                        // predicate (`count(T filter .x in (...))`) must be
1410                        // resolved first; the fast path evaluates the raw
1411                        // predicate with no subquery materialisation, which
1412                        // silently yields 0 (F1). Falling through routes it to
1413                        // the generic path that runs the subquery correctly.
1414                        if let PlanNode::SeqScan { table } = inner.as_ref() {
1415                            if self.view_registry.is_dirty(table) {
1416                                // F3: count(View filter ...) over a dirty view.
1417                                return Err(QueryError::ReadonlyNeedsWrite);
1418                            }
1419                        }
1420                        if let (PlanNode::SeqScan { table }, false) =
1421                            (inner.as_ref(), contains_subquery(predicate))
1422                        {
1423                            let schema = self
1424                                .catalog
1425                                .schema(table)
1426                                .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
1427                                .clone();
1428                            let columns: Vec<String> =
1429                                schema.columns.iter().map(|c| c.name.clone()).collect();
1430                            let fast = FastLayout::new(&schema);
1431                            let row_layout = RowLayout::new(&schema);
1432
1433                            if let Some(compiled) =
1434                                compile_predicate(predicate, &columns, &fast, &schema)
1435                            {
1436                                let mut count: i64 = 0;
1437                                self.catalog
1438                                    .for_each_row_raw(table, |_rid, data| {
1439                                        if compiled(data) {
1440                                            count += 1;
1441                                        }
1442                                    })
1443                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1444                                return Ok(QueryResult::Scalar(Value::Int(count)));
1445                            }
1446
1447                            let pred_cols = predicate_column_indices(predicate, &columns);
1448                            let mut count: i64 = 0;
1449                            self.catalog
1450                                .for_each_row_raw(table, |_rid, data| {
1451                                    let pred_row =
1452                                        decode_selective(&schema, &row_layout, data, &pred_cols);
1453                                    if eval_predicate(predicate, &pred_row, &columns) {
1454                                        count += 1;
1455                                    }
1456                                })
1457                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1458                            return Ok(QueryResult::Scalar(Value::Int(count)));
1459                        }
1460                    }
1461                }
1462
1463                // Fast path: sum/avg/min/max over single fixed-size numeric.
1464                if matches!(
1465                    function,
1466                    AggFunc::Sum
1467                        | AggFunc::Avg
1468                        | AggFunc::Min
1469                        | AggFunc::Max
1470                        | AggFunc::CountDistinct
1471                ) {
1472                    if let Some(col) = field.as_ref() {
1473                        let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1474                            match input.as_ref() {
1475                                PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1476                                PlanNode::Filter {
1477                                    input: inner,
1478                                    predicate,
1479                                } => {
1480                                    if let PlanNode::SeqScan { table } = inner.as_ref() {
1481                                        (Some(table.as_str()), Some(predicate))
1482                                    } else {
1483                                        (None, None)
1484                                    }
1485                                }
1486                                _ => (None, None),
1487                            };
1488                        if let Some(table) = table_opt {
1489                            if let Some(result) =
1490                                self.agg_single_col_fast(table, col, *function, pred_opt)?
1491                            {
1492                                return Ok(result);
1493                            }
1494                        }
1495                    }
1496                }
1497
1498                // Generic path.
1499                let result = self.execute_plan_readonly(input)?;
1500                match result {
1501                    QueryResult::Rows { columns, rows } => match function {
1502                        AggFunc::Count => Ok(QueryResult::Scalar(Value::Int(rows.len() as i64))),
1503                        AggFunc::CountDistinct => {
1504                            let col = field.as_ref().ok_or("count distinct requires field")?;
1505                            let idx = columns
1506                                .iter()
1507                                .position(|c| c == col)
1508                                .ok_or("col not found")?;
1509                            let mut seen = std::collections::HashSet::new();
1510                            for row in &rows {
1511                                let v = &row[idx];
1512                                if !v.is_empty() {
1513                                    seen.insert(v.clone());
1514                                }
1515                            }
1516                            Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
1517                        }
1518                        AggFunc::Avg => {
1519                            let col = field.as_ref().ok_or("avg requires field")?;
1520                            let idx = columns
1521                                .iter()
1522                                .position(|c| c == col)
1523                                .ok_or("col not found")?;
1524                            let sum: f64 = rows
1525                                .iter()
1526                                .filter_map(|r| match &r[idx] {
1527                                    Value::Int(v) => Some(*v as f64),
1528                                    Value::Float(v) => Some(*v),
1529                                    _ => None,
1530                                })
1531                                .sum();
1532                            let count = rows.len() as f64;
1533                            Ok(QueryResult::Scalar(Value::Float(sum / count)))
1534                        }
1535                        AggFunc::Sum => {
1536                            let col = field.as_ref().ok_or("sum requires field")?;
1537                            let idx = columns
1538                                .iter()
1539                                .position(|c| c == col)
1540                                .ok_or("col not found")?;
1541                            let mut int_sum: i64 = 0;
1542                            let mut float_sum: f64 = 0.0;
1543                            let mut saw_float = false;
1544                            for r in &rows {
1545                                match &r[idx] {
1546                                    Value::Int(v) => int_sum += *v,
1547                                    Value::Float(v) => {
1548                                        float_sum += *v;
1549                                        saw_float = true;
1550                                    }
1551                                    _ => {}
1552                                }
1553                            }
1554                            let result = if saw_float {
1555                                Value::Float(float_sum + int_sum as f64)
1556                            } else {
1557                                Value::Int(int_sum)
1558                            };
1559                            Ok(QueryResult::Scalar(result))
1560                        }
1561                        AggFunc::Min | AggFunc::Max => {
1562                            let col = field.as_ref().ok_or("min/max requires field")?;
1563                            let idx = columns
1564                                .iter()
1565                                .position(|c| c == col)
1566                                .ok_or("col not found")?;
1567                            let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
1568                            let result = if *function == AggFunc::Min {
1569                                vals.into_iter().min().cloned()
1570                            } else {
1571                                vals.into_iter().max().cloned()
1572                            };
1573                            Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
1574                        }
1575                    },
1576                    _ => Err("aggregate requires row input".into()),
1577                }
1578            }
1579
1580            PlanNode::Distinct { input } => {
1581                let result = self.execute_plan_readonly(input)?;
1582                match result {
1583                    QueryResult::Rows { columns, rows } => {
1584                        let mut seen = std::collections::HashSet::new();
1585                        let mut unique_rows = Vec::new();
1586                        for row in rows {
1587                            if seen.insert(row.clone()) {
1588                                unique_rows.push(row);
1589                            }
1590                        }
1591                        Ok(QueryResult::Rows {
1592                            columns,
1593                            rows: unique_rows,
1594                        })
1595                    }
1596                    other => Ok(other),
1597                }
1598            }
1599
1600            PlanNode::GroupBy {
1601                input,
1602                keys,
1603                aggregates,
1604                having,
1605            } => {
1606                let result = self.execute_plan_readonly(input)?;
1607                match result {
1608                    QueryResult::Rows { columns, rows } => {
1609                        // WS2: byte-budget guard on the GROUP BY input buffer
1610                        // (the hash table is bounded by the input it groups).
1611                        self.charge_rows(&rows)?;
1612                        let key_indices: Vec<usize> = keys
1613                            .iter()
1614                            .map(|k| {
1615                                columns.iter().position(|c| c == k).ok_or_else(|| {
1616                                    QueryError::ColumnNotFound {
1617                                        table: String::new(),
1618                                        column: k.clone(),
1619                                    }
1620                                })
1621                            })
1622                            .collect::<Result<Vec<_>, _>>()?;
1623
1624                        let agg_field_indices: Vec<usize> = aggregates
1625                            .iter()
1626                            .map(|a| {
1627                                if a.field == "*" {
1628                                    Ok(usize::MAX)
1629                                } else {
1630                                    columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1631                                        QueryError::ColumnNotFound {
1632                                            table: String::new(),
1633                                            column: a.field.clone(),
1634                                        }
1635                                    })
1636                                }
1637                            })
1638                            .collect::<Result<Vec<_>, _>>()?;
1639
1640                        let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1641                            rustc_hash::FxHashMap::default();
1642                        let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1643                        for (ri, row) in rows.iter().enumerate() {
1644                            let key: Vec<Value> =
1645                                key_indices.iter().map(|&i| row[i].clone()).collect();
1646                            match group_map.get(&key) {
1647                                Some(&idx) => groups[idx].1.push(ri),
1648                                None => {
1649                                    let idx = groups.len();
1650                                    group_map.insert(key.clone(), idx);
1651                                    groups.push((key, vec![ri]));
1652                                }
1653                            }
1654                        }
1655
1656                        let mut out_columns: Vec<String> = keys.clone();
1657                        for agg in aggregates.iter() {
1658                            out_columns.push(agg.output_name.clone());
1659                        }
1660
1661                        let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1662                        for (key_vals, row_indices) in &groups {
1663                            let mut row = key_vals.clone();
1664                            for (ai, agg) in aggregates.iter().enumerate() {
1665                                let col_idx = agg_field_indices[ai];
1666                                let val = compute_group_aggregate(
1667                                    agg.function,
1668                                    &rows,
1669                                    row_indices,
1670                                    col_idx,
1671                                );
1672                                row.push(val);
1673                            }
1674                            out_rows.push(row);
1675                        }
1676
1677                        if let Some(having_expr) = having {
1678                            out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1679                        }
1680
1681                        Ok(QueryResult::Rows {
1682                            columns: out_columns,
1683                            rows: out_rows,
1684                        })
1685                    }
1686                    _ => Err("group by requires row input".into()),
1687                }
1688            }
1689
1690            PlanNode::NestedLoopJoin {
1691                left,
1692                right,
1693                on,
1694                kind,
1695            } => {
1696                let left_result = self.execute_plan_readonly(left)?;
1697                let right_result = self.execute_plan_readonly(right)?;
1698                let (left_columns, left_rows) = match left_result {
1699                    QueryResult::Rows { columns, rows } => (columns, rows),
1700                    _ => return Err("join left side must produce rows".into()),
1701                };
1702                let (right_columns, right_rows) = match right_result {
1703                    QueryResult::Rows { columns, rows } => (columns, rows),
1704                    _ => return Err("join right side must produce rows".into()),
1705                };
1706
1707                // WS2: byte-budget guard on the join build side.
1708                self.charge_rows(&left_rows)?;
1709                self.charge_rows(&right_rows)?;
1710
1711                if !matches!(kind, JoinKind::Cross) {
1712                    if let Some(pred) = on {
1713                        if let Some((l_idx, r_idx)) =
1714                            try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1715                        {
1716                            let result = hash_join(
1717                                left_columns,
1718                                left_rows,
1719                                right_columns,
1720                                right_rows,
1721                                l_idx,
1722                                r_idx,
1723                                *kind,
1724                            );
1725                            if let QueryResult::Rows { ref rows, .. } = result {
1726                                check_join_limit(rows.len())?;
1727                            }
1728                            return Ok(result);
1729                        }
1730                    }
1731                }
1732
1733                let n_left = left_columns.len();
1734                let n_right = right_columns.len();
1735                let mut columns = Vec::with_capacity(n_left + n_right);
1736                columns.extend(left_columns);
1737                columns.extend(right_columns);
1738
1739                let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1740                let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1741
1742                for left_row in &left_rows {
1743                    let mut matched = false;
1744                    for right_row in &right_rows {
1745                        combined.clear();
1746                        combined.extend_from_slice(left_row);
1747                        combined.extend_from_slice(right_row);
1748                        let keep = match kind {
1749                            JoinKind::Cross => true,
1750                            JoinKind::Inner | JoinKind::LeftOuter => match on {
1751                                Some(pred) => eval_predicate(pred, &combined, &columns),
1752                                None => true,
1753                            },
1754                            JoinKind::RightOuter => {
1755                                unreachable!("planner rewrites RightOuter to LeftOuter")
1756                            }
1757                        };
1758                        if keep {
1759                            rows.push(combined.clone());
1760                            check_join_limit(rows.len())?;
1761                            matched = true;
1762                        }
1763                    }
1764                    if !matched && matches!(kind, JoinKind::LeftOuter) {
1765                        let mut row = Vec::with_capacity(n_left + n_right);
1766                        row.extend_from_slice(left_row);
1767                        row.resize(n_left + n_right, Value::Empty);
1768                        rows.push(row);
1769                        check_join_limit(rows.len())?;
1770                    }
1771                }
1772
1773                Ok(QueryResult::Rows { columns, rows })
1774            }
1775
1776            PlanNode::Window { input, windows } => {
1777                let result = self.execute_plan_readonly(input)?;
1778                execute_window(result, windows)
1779            }
1780
1781            PlanNode::Union { left, right, all } => {
1782                let left_result = self.execute_plan_readonly(left)?;
1783                let right_result = self.execute_plan_readonly(right)?;
1784                let (left_cols, left_rows) = match left_result {
1785                    QueryResult::Rows { columns, rows } => (columns, rows),
1786                    _ => return Err("UNION requires query results on left side".into()),
1787                };
1788                let (_, right_rows) = match right_result {
1789                    QueryResult::Rows { columns, rows } => (columns, rows),
1790                    _ => return Err("UNION requires query results on right side".into()),
1791                };
1792                let mut combined = left_rows;
1793                if *all {
1794                    combined.extend(right_rows);
1795                } else {
1796                    let mut seen = std::collections::HashSet::new();
1797                    for row in &combined {
1798                        seen.insert(row.clone());
1799                    }
1800                    for row in right_rows {
1801                        if seen.insert(row.clone()) {
1802                            combined.push(row);
1803                        }
1804                    }
1805                }
1806                Ok(QueryResult::Rows {
1807                    columns: left_cols,
1808                    rows: combined,
1809                })
1810            }
1811
1812            PlanNode::Explain { input } => {
1813                let text = format_plan_tree(input, 0);
1814                Ok(QueryResult::Rows {
1815                    columns: vec!["plan".to_string()],
1816                    rows: text
1817                        .lines()
1818                        .map(|line| vec![Value::Str(line.to_string())])
1819                        .collect(),
1820                })
1821            }
1822
1823            // All write variants — caller must escalate to the write lock.
1824            PlanNode::Insert { .. }
1825            | PlanNode::Update { .. }
1826            | PlanNode::Delete { .. }
1827            | PlanNode::Upsert { .. }
1828            | PlanNode::CreateTable { .. }
1829            | PlanNode::AlterTable { .. }
1830            | PlanNode::DropTable { .. }
1831            | PlanNode::CreateView { .. }
1832            | PlanNode::RefreshView { .. }
1833            | PlanNode::DropView { .. }
1834            | PlanNode::Begin
1835            | PlanNode::Commit
1836            | PlanNode::Rollback => Err(QueryError::ReadonlyNeedsWrite),
1837        }
1838    }
1839
1840    /// `&self` variant of [`Engine::materialize_subqueries`]. Used by the
1841    /// read path so `Filter` predicates with `InSubquery`/`ExistsSubquery`
1842    /// children can evaluate their inner queries without taking the write
1843    /// lock. Inner queries that would themselves need a write (e.g. dirty
1844    /// view) escalate via [`READONLY_NEEDS_WRITE`] just like the top-level
1845    /// read path does.
1846    fn materialize_subqueries_readonly(&self, expr: &Expr) -> Result<Expr, QueryError> {
1847        match expr {
1848            Expr::InSubquery {
1849                expr: inner,
1850                subquery,
1851                negated,
1852            } => {
1853                if is_correlated_subquery(subquery, &self.catalog) {
1854                    // Pass through — will be materialized per-row in the
1855                    // Filter handler's correlated subquery path.
1856                    let inner = self.materialize_subqueries_readonly(inner)?;
1857                    return Ok(Expr::InSubquery {
1858                        expr: Box::new(inner),
1859                        subquery: subquery.clone(),
1860                        negated: *negated,
1861                    });
1862                }
1863                let inner = self.materialize_subqueries_readonly(inner)?;
1864                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1865                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1866                let result = self.execute_plan_readonly(&sub_plan)?;
1867                let values = match result {
1868                    QueryResult::Rows { rows, .. } => rows
1869                        .into_iter()
1870                        .filter_map(|mut row| {
1871                            if row.is_empty() {
1872                                None
1873                            } else {
1874                                Some(value_to_expr(row.swap_remove(0)))
1875                            }
1876                        })
1877                        .collect(),
1878                    _ => Vec::new(),
1879                };
1880                // WS2: byte-budget guard on the materialized IN-list.
1881                self.charge_in_list(&values)?;
1882                Ok(Expr::InList {
1883                    expr: Box::new(inner),
1884                    list: values,
1885                    negated: *negated,
1886                })
1887            }
1888            Expr::ExistsSubquery { subquery, negated } => {
1889                if is_correlated_subquery(subquery, &self.catalog) {
1890                    return Ok(expr.clone());
1891                }
1892                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1893                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1894                let result = self.execute_plan_readonly(&sub_plan)?;
1895                let has_rows = match result {
1896                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
1897                    _ => false,
1898                };
1899                let truth = if *negated { !has_rows } else { has_rows };
1900                Ok(Expr::Literal(Literal::Bool(truth)))
1901            }
1902            Expr::BinaryOp(l, op, r) => {
1903                let l = self.materialize_subqueries_readonly(l)?;
1904                let r = self.materialize_subqueries_readonly(r)?;
1905                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1906            }
1907            Expr::UnaryOp(op, inner) => {
1908                let inner = self.materialize_subqueries_readonly(inner)?;
1909                Ok(Expr::UnaryOp(*op, Box::new(inner)))
1910            }
1911            Expr::Case { whens, else_expr } => {
1912                let whens = whens
1913                    .iter()
1914                    .map(|(c, r)| {
1915                        let c = self.materialize_subqueries_readonly(c)?;
1916                        let r = self.materialize_subqueries_readonly(r)?;
1917                        Ok((Box::new(c), Box::new(r)))
1918                    })
1919                    .collect::<Result<Vec<_>, QueryError>>()?;
1920                let else_expr = match else_expr {
1921                    Some(e) => Some(Box::new(self.materialize_subqueries_readonly(e)?)),
1922                    None => None,
1923                };
1924                Ok(Expr::Case { whens, else_expr })
1925            }
1926            other => Ok(other.clone()),
1927        }
1928    }
1929
1930    /// Per-row materialisation of correlated subqueries. For each row in the
1931    /// outer query, substitute outer column references in the subquery's
1932    /// filter with the current row's literal values, execute the modified
1933    /// subquery, and return the result as an InList or Bool literal.
1934    fn materialize_correlated_for_row_readonly(
1935        &self,
1936        expr: &Expr,
1937        outer_row: &[Value],
1938        outer_columns: &[String],
1939    ) -> Result<Expr, QueryError> {
1940        match expr {
1941            Expr::InSubquery {
1942                expr: inner,
1943                subquery,
1944                negated,
1945            } => {
1946                let inner =
1947                    self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1948                let mut sub = *subquery.clone();
1949                if let Some(ref filter) = sub.filter {
1950                    sub.filter = Some(substitute_outer_refs(
1951                        filter,
1952                        &sub.source,
1953                        &self.catalog,
1954                        outer_row,
1955                        outer_columns,
1956                    ));
1957                }
1958                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1959                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1960                let result = self.execute_plan_readonly(&sub_plan)?;
1961                let values = match result {
1962                    QueryResult::Rows { rows, .. } => rows
1963                        .into_iter()
1964                        .filter_map(|mut row| {
1965                            if row.is_empty() {
1966                                None
1967                            } else {
1968                                Some(value_to_expr(row.swap_remove(0)))
1969                            }
1970                        })
1971                        .collect(),
1972                    _ => Vec::new(),
1973                };
1974                // WS2: byte-budget guard on the per-row materialized IN-list.
1975                self.charge_in_list(&values)?;
1976                Ok(Expr::InList {
1977                    expr: Box::new(inner),
1978                    list: values,
1979                    negated: *negated,
1980                })
1981            }
1982            Expr::ExistsSubquery { subquery, negated } => {
1983                let mut sub = *subquery.clone();
1984                if let Some(ref filter) = sub.filter {
1985                    sub.filter = Some(substitute_outer_refs(
1986                        filter,
1987                        &sub.source,
1988                        &self.catalog,
1989                        outer_row,
1990                        outer_columns,
1991                    ));
1992                }
1993                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1994                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1995                let result = self.execute_plan_readonly(&sub_plan)?;
1996                let has_rows = match result {
1997                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
1998                    _ => false,
1999                };
2000                let truth = if *negated { !has_rows } else { has_rows };
2001                Ok(Expr::Literal(Literal::Bool(truth)))
2002            }
2003            Expr::BinaryOp(l, op, r) => {
2004                let l =
2005                    self.materialize_correlated_for_row_readonly(l, outer_row, outer_columns)?;
2006                let r =
2007                    self.materialize_correlated_for_row_readonly(r, outer_row, outer_columns)?;
2008                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
2009            }
2010            Expr::UnaryOp(op, inner) => {
2011                let inner =
2012                    self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
2013                Ok(Expr::UnaryOp(*op, Box::new(inner)))
2014            }
2015            other => Ok(other.clone()),
2016        }
2017    }
2018
2019    pub fn catalog(&self) -> &Catalog {
2020        &self.catalog
2021    }
2022
2023    pub fn catalog_mut(&mut self) -> &mut Catalog {
2024        &mut self.catalog
2025    }
2026}