Skip to main content

powdb_query/executor/
mod.rs

1//! PowDB query executor.
2
3// Submodules that don't use macros defined in this file.
4mod compiled;
5mod eval;
6pub mod mem_budget;
7
8use crate::ast::*;
9use crate::canonicalize::canonicalize;
10use crate::plan::*;
11use crate::plan_cache::PlanCache;
12use crate::planner;
13use crate::result::{QueryError, QueryResult};
14use powdb_storage::catalog::Catalog;
15use powdb_storage::row::{decode_column, decode_row, RowLayout};
16use powdb_storage::types::*;
17use powdb_storage::view::ViewRegistry;
18
19use std::io;
20use std::path::Path;
21use std::sync::Mutex;
22use std::time::Instant;
23use tracing::{error, info, Level};
24
25use self::compiled::*;
26use self::eval::*;
27
/// Legacy sentinel string constant — kept for backward compatibility with
/// any external code matching on the string representation. New code should
/// match on `QueryError::ReadonlyNeedsWrite` directly.
///
/// NOTE(review): the read-only entry points in this module return the
/// `QueryError::ReadonlyNeedsWrite` variant, not this string. Whether that
/// variant's `Display` output equals this sentinel is decided in
/// `result.rs` — confirm before relying on string matching.
pub const READONLY_NEEDS_WRITE: &str = "__POWDB_READONLY_NEEDS_WRITE__";

/// Plan cache capacity, passed to `PlanCache::new` when an [`Engine`] is
/// built. Bench workloads fill ~15 slots; real apps will sit comfortably in
/// 256. Lookup is O(1), collisions clear the cache (see
/// `plan_cache::PlanCache::insert`).
const PLAN_CACHE_CAPACITY: usize = 256;

/// Maximum number of rows a join may produce before the executor aborts.
/// Prevents Cartesian-product blowups (e.g. `T cross join T` on 10K rows
/// would produce 100M rows in memory without this cap). Enforced through
/// [`check_join_limit`], which only errors once the count is strictly
/// greater than this value.
pub(super) const MAX_JOIN_ROWS: usize = 1_000_000;

/// Maximum number of rows that may be materialized for sorting.
/// Queries that exceed this should add a LIMIT clause to narrow the input
/// before sorting. (The check itself lives outside this part of the file —
/// presumably in the sort operator in `plan_exec`; confirm there.)
pub(super) const MAX_SORT_ROWS: usize = 10_000_000;
47
48#[inline]
49pub(super) fn check_join_limit(row_count: usize) -> Result<(), QueryError> {
50    if row_count > MAX_JOIN_ROWS {
51        return Err(QueryError::JoinLimitExceeded);
52    }
53    Ok(())
54}
55
// ─── Mission D11 Phase 1: scalar hot-loop helpers ─────────────────────────
//
// These macros expand into the scan body of `agg_single_col_fast` and sit
// inside the `for_each_row_raw` closure. They exist to:
//
//   1. Split the loop on presence of a predicate *outside* the hot body,
//      so the no-predicate path (agg_sum/agg_min/agg_max bench workloads)
//      never pays the `Option<CompiledPredicate>` branch per row.
//   2. Read the null bitmap byte and the 8-byte value via raw pointers
//      behind one explicit per-row length guard, instead of going through
//      safe slice indexing with its implicit per-access bounds checks.
//
// Invocation shape (both macros; `agg_float_loop` binds `f64` instead):
//
//   agg_int_loop!(self, table, pred, bmp_byte, bmp_bit, off, |v: i64| { ... })
//
//   - `$pred` is borrowed as `&$pred` and must be an `Option` whose payload
//     can be called as `pred(data)` on a raw row slice, returning `bool`.
//     `None` selects the predicate-free loop.
//   - `$bmp_byte`, `$bmp_bit` and `$off` are each evaluated exactly once,
//     before the scan starts.
//   - `$body` runs once per row that passes the predicate (if any), passes
//     the length guard, and whose null bit for the column is clear. It is
//     expanded inside the per-row closure, so a `return` inside `$body`
//     only skips to the next row; it does not leave the calling function.
//   - Scan errors are mapped to `QueryError::StorageError` and propagated
//     with `?`, so the invoking function must return a `Result` whose error
//     type accepts `QueryError`.
//
// SAFETY (shared across every call site below):
//
//   - The unsafe reads are made sound by the runtime guard in each closure
//     (`2 + bmp_byte < data.len()` and `off + 8 <= data.len()`). Rows that
//     fail it (corrupt/truncated) are skipped without being read. The
//     invariants below explain why well-formed rows always pass the guard.
//     NOTE(review): the guard assumes `off + 8` cannot wrap around, i.e.
//     `off` is a small layout-derived offset — confirm at the call site.
//   - `$bmp_byte` is `col_idx / 8` where `col_idx < n_cols`, and the row
//     encoding stores `bitmap_size = n_cols.div_ceil(8)` bytes of bitmap
//     starting at offset 2. So `2 + $bmp_byte < 2 + bitmap_size ≤ row_len`
//     and `get_unchecked(2 + $bmp_byte)` is inside the row slice.
//   - `$off = 2 + bitmap_size + fixed_offsets[col_idx]` for a fixed-size
//     column. Every fixed-size column contributes `fixed_size(type_id)`
//     bytes to the fixed region, so the row always has `[$off .. $off+8]`
//     available for any i64/f64 column — enforced by the row encoder
//     (`storage/src/row.rs`) and the schema invariant that a row with a
//     given schema has `row_len ≥ 2 + bitmap_size + fixed_region_size`.
//   - Both macros are only invoked from `agg_single_col_fast`, which
//     early-returns if the column isn't Int/Float (8-byte fixed) and
//     early-returns if `fast.fixed_offsets[col_idx]` is `None`.
macro_rules! agg_int_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : i64| $body:block
    ) => {{
        // Evaluate the layout arguments once, outside the per-row closure.
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        // The predicate/no-predicate split is decided once here, so each
        // closure below has no per-row `Option` check.
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Filter first: rows rejected by the predicate never
                    // reach the decoding below.
                    if !pred(data) {
                        return;
                    }
                    // Bounds guard: skip corrupt/truncated rows that are too
                    // short to contain the bitmap byte or the 8-byte value.
                    if 2 + bmp_byte >= data.len() || off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `2 + bmp_byte < data.len()` is checked above.
                    // The bitmap byte lives at offset 2..2+bitmap_size in the
                    // row encoding, and bmp_byte = col_idx / 8 < bitmap_size.
                    // Corrupt rows are rejected by the bounds guard.
                    let bmp = unsafe { *data.get_unchecked(2 + bmp_byte) };
                    // A set bit in the null bitmap means the column is NULL;
                    // NULLs are skipped and never reach `$body`.
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `off + 8 <= data.len()` is checked above.
                    // `off = 2 + bitmap_size + fixed_offsets[col_idx]` points
                    // to an 8-byte i64 in the fixed-size region of the row.
                    // The cast to `*const [u8; 8]` has alignment 1, so any
                    // in-bounds offset is valid; the bytes are decoded as
                    // little-endian. Corrupt rows are rejected by the guard.
                    let $v: i64 =
                        unsafe { i64::from_le_bytes(*(data.as_ptr().add(off) as *const [u8; 8])) };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Bounds guard: skip corrupt/truncated rows.
                    if 2 + bmp_byte >= data.len() || off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `2 + bmp_byte < data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let bmp = unsafe { *data.get_unchecked(2 + bmp_byte) };
                    // Skip NULLs (null-bitmap bit set).
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `off + 8 <= data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let $v: i64 =
                        unsafe { i64::from_le_bytes(*(data.as_ptr().add(off) as *const [u8; 8])) };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}
146
// f64 twin of `agg_int_loop`: identical control flow and bounds guard, but
// the 8 bytes at `off` are decoded as a little-endian `f64`. No NaN
// filtering happens here — `$body` sees whatever value is stored. The
// invocation contract and SAFETY reasoning are the shared Mission D11 notes
// on `agg_int_loop`.
macro_rules! agg_float_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : f64| $body:block
    ) => {{
        // Evaluate the layout arguments once, outside the per-row closure.
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        // Decide predicate vs. no-predicate once, not per row.
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Filter first so rejected rows are never decoded.
                    if !pred(data) {
                        return;
                    }
                    // Bounds guard: skip corrupt/truncated rows that are too
                    // short to contain the bitmap byte or the 8-byte value.
                    if 2 + bmp_byte >= data.len() || off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `2 + bmp_byte < data.len()` is checked above.
                    // The bitmap byte lives at offset 2..2+bitmap_size in the
                    // row encoding, and bmp_byte = col_idx / 8 < bitmap_size.
                    // Corrupt rows are rejected by the bounds guard.
                    let bmp = unsafe { *data.get_unchecked(2 + bmp_byte) };
                    // A set null-bitmap bit means NULL; skip those rows.
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `off + 8 <= data.len()` is checked above.
                    // `off = 2 + bitmap_size + fixed_offsets[col_idx]` points
                    // to an 8-byte f64 in the fixed-size region of the row.
                    // The cast to `*const [u8; 8]` has alignment 1, so any
                    // in-bounds offset is valid; the bytes are decoded as
                    // little-endian. Corrupt rows are rejected by the guard.
                    let $v: f64 =
                        unsafe { f64::from_le_bytes(*(data.as_ptr().add(off) as *const [u8; 8])) };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Bounds guard: skip corrupt/truncated rows.
                    if 2 + bmp_byte >= data.len() || off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `2 + bmp_byte < data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let bmp = unsafe { *data.get_unchecked(2 + bmp_byte) };
                    // Skip NULLs (null-bitmap bit set).
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `off + 8 <= data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let $v: f64 =
                        unsafe { f64::from_le_bytes(*(data.as_ptr().add(off) as *const [u8; 8])) };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}
211
212// Submodules that use the macros above — must be declared after macro_rules!.
213mod plan_exec;
214mod prepared;
215
216#[cfg(test)]
217mod tests;
218
219// Re-exports for the public API
220pub use self::prepared::PreparedQuery;
221
222use self::plan_exec::{
223    compute_group_aggregate, execute_window, format_plan_tree, hash_join, lower_unindexed_scans,
224    range_matches, synthesize_range_predicate, try_extract_equi_join_keys,
225};
226
227/// Mission infra-1: classify a parsed statement as read-only vs. mutating.
228/// Used by [`Engine::execute_powql_readonly`] and by the server handler
229/// to decide between the RwLock reader and writer sides. `Union` recurses
230/// because each side can independently be read/write (though in practice
231/// both sides are reads — the parser only builds Union from query shapes).
232pub fn is_read_only_statement(stmt: &Statement) -> bool {
233    match stmt {
234        Statement::Query(_) => true,
235        Statement::Union(u) => is_read_only_statement(&u.left) && is_read_only_statement(&u.right),
236        Statement::Insert(_)
237        | Statement::Upsert(_)
238        | Statement::UpdateQuery(_)
239        | Statement::DeleteQuery(_)
240        | Statement::CreateType(_)
241        | Statement::AlterTable(_)
242        | Statement::DropTable(_)
243        | Statement::CreateView(_)
244        | Statement::RefreshView(_)
245        | Statement::DropView(_) => false,
246        Statement::Begin | Statement::Commit | Statement::Rollback => false,
247        Statement::Explain(inner) => is_read_only_statement(inner),
248    }
249}
250
/// The PowDB query engine.
///
/// Owns the storage [`Catalog`] plus the per-engine state used by the
/// `execute_*` entry points: the shared plan cache, the materialized-view
/// registry, the transaction flag, and the per-query memory limit.
pub struct Engine {
    /// Storage catalog backing every table. Schema lookups, scans, and the
    /// statement-boundary `sync_wal` all go through it.
    catalog: Catalog,
    /// Mission D9 — cached parsed+planned query trees keyed by canonical
    /// hash. Saves the ~3μs parse+plan cost on repeat queries that differ
    /// only in literal values.
    ///
    /// Mission infra-1: wrapped in `Mutex` so the read path can be driven
    /// by `&self`. The critical section is extremely short — a single
    /// hashmap lookup + plan clone on a hit, or a single insert on a miss.
    /// A full `RwLock` would be over-engineered here; the contention window
    /// is smaller than the read-path scan work it gates.
    plan_cache: Mutex<PlanCache>,
    /// Mission C Phase 13: reusable `Vec<Value>` scratch buffer for the
    /// prepared-insert fast path. `execute_prepared` used to allocate a
    /// fresh `vec![Value::Empty; n_cols]` on every insert; recycling this
    /// buffer shaves one heap alloc per row on `insert_batch_1k`.
    insert_values_scratch: Vec<Value>,
    /// Materialized view registry: tracks view definitions, dependencies,
    /// and dirty state. Views are backed by regular catalog tables; this
    /// registry adds the lifecycle metadata.
    view_registry: ViewRegistry,
    /// Transaction flag. While `true`, the `execute_powql*` write entry
    /// points skip the per-statement `sync_wal`. Presumably set/cleared by
    /// the `Begin`/`Commit`/`Rollback` handling outside this view — confirm.
    in_transaction: bool,
    /// WS2 — per-query memory budget ceiling (bytes). The running total lives
    /// in a thread-local (see [`mem_budget`]) and is reset at every top-level
    /// query entry, so sort/join/GROUP BY/IN-list materialization can be capped
    /// without OOM-killing the process. This field holds only the *limit* (a
    /// plain `usize`, so `Engine` stays `Sync` for the concurrent read path).
    /// Default [`mem_budget::DEFAULT_QUERY_MEMORY_LIMIT`] (256 MB); overridable
    /// via `Engine::with_memory_limit` (server reads `POWDB_QUERY_MEMORY_LIMIT`).
    query_memory_limit: usize,
}
282
283impl Engine {
284    /// Open or create a PowDB engine rooted at `data_dir`.
285    ///
286    /// If the directory already contains a catalog, it is reopened.
287    /// Otherwise a fresh empty database is created.
288    ///
289    /// # Examples
290    ///
291    /// ```
292    /// use powdb_query::executor::Engine;
293    ///
294    /// let dir = tempfile::tempdir().unwrap();
295    /// let engine = Engine::new(dir.path()).unwrap();
296    /// // Engine is ready — the directory now contains a catalog.
297    /// ```
298    pub fn new(data_dir: &Path) -> io::Result<Self> {
299        std::fs::create_dir_all(data_dir)?;
300        // Try to reopen an existing database first; only create a fresh
301        // catalog when there isn't one already on disk.
302        let catalog = match Catalog::open(data_dir) {
303            Ok(c) => {
304                info!(data_dir = %data_dir.display(), "engine reopened existing database");
305                c
306            }
307            Err(e) if e.kind() == io::ErrorKind::NotFound => {
308                info!(data_dir = %data_dir.display(), "engine initialized fresh database");
309                Catalog::create(data_dir)?
310            }
311            Err(e) => return Err(e),
312        };
313        let view_registry =
314            ViewRegistry::open(data_dir).unwrap_or_else(|_| ViewRegistry::new(data_dir));
315        Ok(Engine {
316            catalog,
317            plan_cache: Mutex::new(PlanCache::new(PLAN_CACHE_CAPACITY)),
318            insert_values_scratch: Vec::new(),
319            view_registry,
320            in_transaction: false,
321            query_memory_limit: mem_budget::DEFAULT_QUERY_MEMORY_LIMIT,
322        })
323    }
324
325    /// Open or create an engine with an explicit per-query memory limit
326    /// (bytes). Used by the server to apply `POWDB_QUERY_MEMORY_LIMIT`, and by
327    /// tests that need a tiny limit to exercise the budget guard.
328    pub fn with_memory_limit(data_dir: &Path, limit_bytes: usize) -> io::Result<Self> {
329        let mut engine = Engine::new(data_dir)?;
330        engine.set_query_memory_limit(limit_bytes);
331        Ok(engine)
332    }
333
334    /// Current per-query memory limit in bytes.
335    pub fn query_memory_limit(&self) -> usize {
336        self.query_memory_limit
337    }
338
339    /// Override the per-query memory limit in bytes (builder-style).
340    pub fn set_query_memory_limit(&mut self, limit_bytes: usize) {
341        self.query_memory_limit = limit_bytes;
342    }
343
344    /// Enter a budgeted-statement frame for the current query. The returned
345    /// guard must be held for the duration of the statement; on its drop the
346    /// reentrancy depth is decremented. Only the *outermost* statement entry
347    /// zeroes this thread's running total, so a nested `execute_powql` (the
348    /// source query of a `create_view`/`refresh_view`) does NOT discard the
349    /// outer frame's accounting. The accumulator is thread-local, so this never
350    /// touches another concurrent query's total.
351    #[must_use = "the budget guard must outlive the statement body"]
352    pub(super) fn enter_memory_budget(&self) -> mem_budget::EnterGuard {
353        mem_budget::enter()
354    }
355
356    /// Charge the estimated footprint of a freshly materialized batch of rows
357    /// against the current per-query budget. Returns
358    /// [`QueryError::MemoryLimitExceeded`] cleanly if the batch would push the
359    /// query over its limit. Used at every full-materialization point (sort
360    /// buffer, join build side, GROUP BY hash table, IN-list).
361    pub(super) fn charge_rows(&self, rows: &[Vec<Value>]) -> Result<(), QueryError> {
362        let mut total = 0usize;
363        for row in rows {
364            total = total.saturating_add(mem_budget::estimate_row_size(row));
365        }
366        mem_budget::charge(total, self.query_memory_limit)
367    }
368
369    /// Charge a materialized IN-list (the literal expressions pulled out of an
370    /// uncorrelated `IN (subquery)`) against the current per-query budget.
371    /// Each item is conservatively sized at the `Expr` slot plus, for string
372    /// literals, the owned heap bytes.
373    pub(super) fn charge_in_list(&self, list: &[crate::ast::Expr]) -> Result<(), QueryError> {
374        let base = std::mem::size_of::<crate::ast::Expr>();
375        let mut total = std::mem::size_of::<Vec<crate::ast::Expr>>();
376        for item in list {
377            total = total.saturating_add(base);
378            if let crate::ast::Expr::Literal(crate::ast::Literal::String(s)) = item {
379                total = total.saturating_add(s.capacity());
380            }
381        }
382        mem_budget::charge(total, self.query_memory_limit)
383    }
384
385    /// Parse + plan + execute a PowQL query.
386    ///
387    /// # Examples
388    ///
389    /// ```
390    /// use powdb_query::executor::Engine;
391    /// use powdb_query::result::QueryResult;
392    ///
393    /// let dir = tempfile::tempdir().unwrap();
394    /// let mut engine = Engine::new(dir.path()).unwrap();
395    ///
396    /// // Create a table and insert a row.
397    /// engine.execute_powql("type User { required name: str, age: int }").unwrap();
398    /// engine.execute_powql(r#"insert User { name := "Alice", age := 30 }"#).unwrap();
399    ///
400    /// // Query rows back.
401    /// let result = engine.execute_powql("User").unwrap();
402    /// assert_eq!(result.row_count(), 1);
403    /// ```
404    ///
405    /// Mission D6 — tracing collapse: the previous implementation ran 4
406    /// `Instant::now()` + 3 `elapsed().as_micros()` calls + formatted an
407    /// `info!` span on every query, even when tracing was disabled. On a
408    /// sub-microsecond `point_lookup_indexed` call that overhead was
409    /// 100-200ns — 20%+ of the whole query. We now measure time only when
410    /// INFO is actually enabled via `tracing::enabled!`, and we moved the
411    /// noisy `debug!(?plan)` line behind the same gate so the Debug
412    /// formatter can't run unconditionally either.
413    ///
414    /// Mission D9 — plan cache: on the hot path we canonicalise the query
415    /// text (lex + FNV-1a hash with literal values stripped), check the
416    /// cache, and on a hit substitute the new literals into a clone of the
417    /// cached plan. This skips re-lexing, re-parsing, and re-planning —
418    /// around 3μs per call on bench workloads. On a miss we plan as before
419    /// and insert the plan under its canonical hash.
420    pub fn execute_powql(&mut self, input: &str) -> Result<QueryResult, QueryError> {
421        // WS2: each *outermost* statement starts with the full memory
422        // allowance. The guard holds the reentrancy depth so a nested
423        // `execute_powql` (e.g. a view's source query) does not reset the
424        // outer frame's accounting mid-statement.
425        let _budget = self.enter_memory_budget();
426        // Hot path: tracing disabled. Zero syscalls, zero formatting.
427        if !tracing::enabled!(Level::INFO) {
428            // D9: try the plan cache first. Canonicalisation lexes the
429            // query once; on a hit we skip the parser and planner entirely.
430            if let Ok((hash, literals)) = canonicalize(input) {
431                let cached = self
432                    .plan_cache
433                    .lock()
434                    .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
435                    .get_with_substitution(hash, &literals);
436                if let Some(plan) = cached {
437                    let plan = lower_unindexed_scans(&self.catalog, &plan);
438                    let result = self.execute_plan(&plan);
439                    // Mission B (post-review): statement-boundary WAL
440                    // group commit. Catalog::wal_log now only appends;
441                    // the fsync happens here exactly once per statement.
442                    // `sync_wal` is a no-op when nothing was buffered
443                    // (pure reads pay zero fsync).
444                    if !self.in_transaction {
445                        self.catalog
446                            .sync_wal()
447                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
448                    }
449                    return result;
450                }
451                // Miss — plan, insert, execute.
452                return match planner::plan(input) {
453                    Ok(plan) => {
454                        self.plan_cache
455                            .lock()
456                            .map_err(|e| {
457                                QueryError::Execution(format!("plan cache lock poisoned: {e}"))
458                            })?
459                            .insert(hash, plan.clone());
460                        let plan = lower_unindexed_scans(&self.catalog, &plan);
461                        let result = self.execute_plan(&plan);
462                        if !self.in_transaction {
463                            self.catalog
464                                .sync_wal()
465                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
466                        }
467                        result
468                    }
469                    Err(e) => Err(QueryError::Parse(e.to_string())),
470                };
471            }
472            // Lex error — fall through to the planner so the caller gets a
473            // consistent error shape.
474            return match planner::plan(input) {
475                Ok(plan) => {
476                    let plan = lower_unindexed_scans(&self.catalog, &plan);
477                    let result = self.execute_plan(&plan);
478                    if !self.in_transaction {
479                        self.catalog
480                            .sync_wal()
481                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
482                    }
483                    result
484                }
485                Err(e) => Err(QueryError::Parse(e.to_string())),
486            };
487        }
488
489        // Instrumented path — only taken under explicit tracing subscribers.
490        let total_start = Instant::now();
491        let plan_start = Instant::now();
492        let plan = planner::plan(input).map_err(|e| {
493            let msg = e.to_string();
494            error!(query = %input, error = %msg, "query plan failed");
495            QueryError::Parse(msg)
496        })?;
497        let plan_us = plan_start.elapsed().as_micros();
498
499        let exec_start = Instant::now();
500        let plan = lower_unindexed_scans(&self.catalog, &plan);
501        let result = self.execute_plan(&plan);
502        if !self.in_transaction {
503            self.catalog
504                .sync_wal()
505                .map_err(|e| QueryError::StorageError(e.to_string()))?;
506        }
507        let exec_us = exec_start.elapsed().as_micros();
508
509        let total_us = total_start.elapsed().as_micros();
510        match &result {
511            Ok(r) => {
512                info!(
513                    query = %input,
514                    plan_us = plan_us,
515                    exec_us = exec_us,
516                    total_us = total_us,
517                    rows = r.row_count(),
518                    "query ok"
519                );
520            }
521            Err(e) => {
522                error!(
523                    query = %input,
524                    plan_us = plan_us,
525                    exec_us = exec_us,
526                    error = %e,
527                    "query failed"
528                );
529            }
530        }
531        result
532    }
533
534    /// Execute PowQL with `$N` placeholders bound to positional `params`.
535    ///
536    /// Task 4: parameters are substituted as literal *tokens* before
537    /// parsing (see [`crate::parser::parse_with_params`]), so untrusted
538    /// input can never change the query's shape. This path deliberately
539    /// **bypasses the plan cache** — template caching is a follow-up — and
540    /// otherwise mirrors the non-cached tail of [`Engine::execute_powql`].
541    pub fn execute_powql_with_params(
542        &mut self,
543        input: &str,
544        params: &[crate::ast::ParamValue],
545    ) -> Result<QueryResult, QueryError> {
546        let _budget = self.enter_memory_budget();
547        let stmt = crate::parser::parse_with_params(input, params)
548            .map_err(|e| QueryError::Parse(e.to_string()))?;
549        let plan =
550            crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
551        let plan = lower_unindexed_scans(&self.catalog, &plan);
552        let result = self.execute_plan(&plan);
553        if !self.in_transaction {
554            self.catalog
555                .sync_wal()
556                .map_err(|e| QueryError::StorageError(e.to_string()))?;
557        }
558        result
559    }
560
561    /// Read-only variant of [`Engine::execute_powql_with_params`].
562    ///
563    /// Mirrors [`Engine::execute_powql_readonly`]: parses with bound
564    /// params, rejects any write statement with
565    /// [`QueryError::ReadonlyNeedsWrite`] so the caller can escalate to the
566    /// write lock, then executes under a shared borrow. No plan-cache
567    /// interaction.
568    pub fn execute_powql_readonly_with_params(
569        &self,
570        input: &str,
571        params: &[crate::ast::ParamValue],
572    ) -> Result<QueryResult, QueryError> {
573        let _budget = self.enter_memory_budget();
574        let stmt = crate::parser::parse_with_params(input, params)
575            .map_err(|e| QueryError::Parse(e.to_string()))?;
576        if !is_read_only_statement(&stmt) {
577            return Err(QueryError::ReadonlyNeedsWrite);
578        }
579        let plan =
580            crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
581        let plan = lower_unindexed_scans(&self.catalog, &plan);
582        self.execute_plan_readonly(&plan)
583    }
584
585    /// Plan cache stats — useful for benches and debugging.
586    pub fn plan_cache_stats(&self) -> (u64, u64, usize) {
587        let cache = self.plan_cache.lock().unwrap_or_else(|e| e.into_inner());
588        (cache.hits, cache.misses, cache.len())
589    }
590
591    /// Mission infra-1: read-only entry point.
592    ///
593    /// Parses + plans + executes a PowQL query using only a shared borrow
594    /// on the engine. Rejects any statement that would mutate state
595    /// (Insert/Update/Delete/CreateTable/AlterTable/DropTable/CreateView/
596    /// RefreshView/DropView) by returning [`READONLY_NEEDS_WRITE`] so the
597    /// caller can escalate to the write lock.
598    ///
599    /// Also returns [`READONLY_NEEDS_WRITE`] if a materialized view in the
600    /// query is dirty — refreshing one requires `&mut self`, so the caller
601    /// must retake the write lock for the first refresh.
602    ///
603    /// This method is the concurrent-read fast path behind
604    /// `Arc<RwLock<Engine>>`: multiple threads can call it simultaneously
605    /// under a shared `.read()` lock and each will scan independently.
606    pub fn execute_powql_readonly(&self, input: &str) -> Result<QueryResult, QueryError> {
607        // WS2: each *outermost* statement starts with the full memory
608        // allowance. The guard holds the reentrancy depth so a nested
609        // `execute_powql*` does not reset the outer frame's accounting.
610        let _budget = self.enter_memory_budget();
611        // Parse the statement first so we can classify read vs. write
612        // without touching the catalog. This is the same lex+parse cost
613        // the hot path would pay anyway.
614        let stmt = crate::parser::parse(input).map_err(|e| QueryError::Parse(e.to_string()))?;
615        if !is_read_only_statement(&stmt) {
616            return Err(QueryError::ReadonlyNeedsWrite);
617        }
618
619        // Try the plan cache first — identical hash scheme to
620        // `execute_powql` so both paths share cache state. The mutex
621        // section is just a hashmap lookup + plan clone.
622        if let Ok((hash, literals)) = canonicalize(input) {
623            let cached = self
624                .plan_cache
625                .lock()
626                .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
627                .get_with_substitution(hash, &literals);
628            if let Some(plan) = cached {
629                let plan = lower_unindexed_scans(&self.catalog, &plan);
630                return self.execute_plan_readonly(&plan);
631            }
632            // Miss: plan + insert + execute. The planner is pure, so this
633            // is safe from `&self`.
634            let plan = crate::planner::plan_statement(stmt)
635                .map_err(|e| QueryError::Parse(e.to_string()))?;
636            self.plan_cache
637                .lock()
638                .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
639                .insert(hash, plan.clone());
640            let plan = lower_unindexed_scans(&self.catalog, &plan);
641            return self.execute_plan_readonly(&plan);
642        }
643        // Lex error — fall through to the planner for a consistent error
644        // shape (though `parse` above would usually have caught it).
645        let plan =
646            crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
647        let plan = lower_unindexed_scans(&self.catalog, &plan);
648        self.execute_plan_readonly(&plan)
649    }
650
    /// Read-only version of [`Engine::execute_plan`]. Dispatches the
    /// read-path plan variants by calling `&self` helpers and returns
    /// [`QueryError::ReadonlyNeedsWrite`] on any write variant, or when a
    /// read would first need to refresh a dirty materialized view; in that
    /// case the request must be escalated to the write path. This is the
    /// recursion target for composite read plans under the RwLock reader.
    ///
    /// The dispatch mirrors `execute_plan` for the read branches but does
    /// not carry any of the fast paths that need `&mut self` (e.g.
    /// plan-cache mutation on inner subqueries is handled via the shared
    /// mutex in [`Engine::execute_powql_readonly`]; in-flight subquery
    /// materialisation uses [`Engine::materialize_subqueries_readonly`]).
661    fn execute_plan_readonly(&self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
662        match plan {
663            PlanNode::SeqScan { table } => {
664                // Dirty view means we'd need to refresh it — can't do that
665                // under `&self`. Escalate to the write path.
666                if self.view_registry.is_dirty(table) {
667                    return Err(QueryError::ReadonlyNeedsWrite);
668                }
669                let schema = self
670                    .catalog
671                    .schema(table)
672                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
673                    .clone();
674                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
675                let rows: Vec<Vec<Value>> = self
676                    .catalog
677                    .scan(table)
678                    .map_err(|e| e.to_string())?
679                    .map(|(_, row)| row)
680                    .collect();
681                Ok(QueryResult::Rows { columns, rows })
682            }
683
684            PlanNode::AliasScan { table, alias } => {
685                let schema = self
686                    .catalog
687                    .schema(table)
688                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
689                    .clone();
690                let columns: Vec<String> = schema
691                    .columns
692                    .iter()
693                    .map(|c| format!("{alias}.{}", c.name))
694                    .collect();
695                let rows: Vec<Vec<Value>> = self
696                    .catalog
697                    .scan(table)
698                    .map_err(|e| e.to_string())?
699                    .map(|(_, row)| row)
700                    .collect();
701                Ok(QueryResult::Rows { columns, rows })
702            }
703
704            PlanNode::IndexScan { table, column, key } => {
705                let schema = self
706                    .catalog
707                    .schema(table)
708                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
709                    .clone();
710                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
711                let key_value = literal_to_value(key)?;
712                let tbl = self
713                    .catalog
714                    .get_table(table)
715                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
716
717                if tbl.has_index(column) {
718                    // Use index_lookup_all to handle both unique and
719                    // non-unique indexes — returns all matching RowIds.
720                    let rids = tbl.index_lookup_all(column, &key_value);
721                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
722                    for rid in rids {
723                        if let Some(data) = tbl.heap.get(rid) {
724                            rows.push(decode_row(&tbl.schema, &data));
725                        }
726                    }
727                    return Ok(QueryResult::Rows { columns, rows });
728                }
729
730                // No index: synthetic eq predicate + compiled scan.
731                let fast = FastLayout::new(&schema);
732                let synth_pred = Expr::BinaryOp(
733                    Box::new(Expr::Field(column.clone())),
734                    BinOp::Eq,
735                    Box::new(key.clone()),
736                );
737                if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, &schema) {
738                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
739                    self.catalog
740                        .for_each_row_raw(table, |_rid, data| {
741                            if compiled(data) {
742                                rows.push(decode_row(&schema, data));
743                            }
744                        })
745                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
746                    return Ok(QueryResult::Rows { columns, rows });
747                }
748
749                // Last resort: slow eq-check.
750                let col_idx =
751                    schema
752                        .column_index(column)
753                        .ok_or_else(|| QueryError::ColumnNotFound {
754                            table: String::new(),
755                            column: column.clone(),
756                        })?;
757                let rows: Vec<Vec<Value>> = tbl
758                    .scan()
759                    .filter_map(|(_, row)| {
760                        if row[col_idx] == key_value {
761                            Some(row)
762                        } else {
763                            None
764                        }
765                    })
766                    .collect();
767                Ok(QueryResult::Rows { columns, rows })
768            }
769
770            PlanNode::RangeScan {
771                table,
772                column,
773                start,
774                end,
775            } => {
776                let tbl = self
777                    .catalog
778                    .get_table(table)
779                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
780                let columns: Vec<String> =
781                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
782                let schema = tbl.schema.clone();
783
784                let start_val = match start {
785                    Some((expr, _)) => Some(literal_to_value(expr)?),
786                    None => None,
787                };
788                let end_val = match end {
789                    Some((expr, _)) => Some(literal_to_value(expr)?),
790                    None => None,
791                };
792                let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
793                let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
794
795                // Range scans only use the btree fast path for unique indexes.
796                // Non-unique indexes store composite keys that don't compare
797                // directly against raw column values.
798                if tbl.is_index_unique(column) == Some(true) {
799                    if let Some(btree) = tbl.index(column) {
800                        let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
801                            (Some(s), Some(e)) => btree.range(s, e).collect(),
802                            (Some(s), None) => btree.range_from(s),
803                            (None, Some(e)) => btree.range_to(e),
804                            (None, None) => {
805                                // Unbounded both sides — equivalent to seq scan.
806                                let rows: Vec<Vec<Value>> =
807                                    tbl.scan().map(|(_, row)| row).collect();
808                                return Ok(QueryResult::Rows { columns, rows });
809                            }
810                        };
811                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
812                        for (key, rid) in hits {
813                            // Filter for exclusive bounds.
814                            if !start_inclusive {
815                                if let Some(ref s) = start_val {
816                                    if &key == s {
817                                        continue;
818                                    }
819                                }
820                            }
821                            if !end_inclusive {
822                                if let Some(ref e) = end_val {
823                                    if &key == e {
824                                        continue;
825                                    }
826                                }
827                            }
828                            if let Some(data) = tbl.heap.get(rid) {
829                                rows.push(decode_row(&schema, &data));
830                            }
831                        }
832                        return Ok(QueryResult::Rows { columns, rows });
833                    }
834                }
835
836                // Fallback: no index — synthesize the range predicate and scan.
837                let fast = FastLayout::new(&schema);
838                let synth = synthesize_range_predicate(column, start, end);
839                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, &schema) {
840                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
841                    self.catalog
842                        .for_each_row_raw(table, |_rid, data| {
843                            if compiled(data) {
844                                rows.push(decode_row(&schema, data));
845                            }
846                        })
847                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
848                    return Ok(QueryResult::Rows { columns, rows });
849                }
850
851                // Last resort: decoded row eval.
852                let col_idx =
853                    schema
854                        .column_index(column)
855                        .ok_or_else(|| QueryError::ColumnNotFound {
856                            table: String::new(),
857                            column: column.clone(),
858                        })?;
859                let rows: Vec<Vec<Value>> = tbl
860                    .scan()
861                    .filter(|(_, row)| {
862                        range_matches(
863                            &row[col_idx],
864                            &start_val,
865                            start_inclusive,
866                            &end_val,
867                            end_inclusive,
868                        )
869                    })
870                    .map(|(_, row)| row)
871                    .collect();
872                Ok(QueryResult::Rows { columns, rows })
873            }
874
875            PlanNode::Filter { input, predicate } => {
876                // Materialise subqueries using the `&self` variant.
877                // Uncorrelated subqueries are replaced with InList/Bool;
878                // correlated ones are left as InSubquery/ExistsSubquery
879                // for per-row materialisation below.
880                let materialized;
881                let predicate = if contains_subquery(predicate) {
882                    materialized = self.materialize_subqueries_readonly(predicate)?;
883                    &materialized
884                } else {
885                    predicate
886                };
887
888                // Correlated subquery path: per-row materialisation.
889                if contains_subquery(predicate) {
890                    let result = self.execute_plan_readonly(input)?;
891                    return match result {
892                        QueryResult::Rows { columns, rows } => {
893                            let mut filtered = Vec::new();
894                            for row in rows {
895                                let row_pred = self.materialize_correlated_for_row_readonly(
896                                    predicate, &row, &columns,
897                                )?;
898                                if eval_predicate(&row_pred, &row, &columns) {
899                                    filtered.push(row);
900                                }
901                            }
902                            Ok(QueryResult::Rows {
903                                columns,
904                                rows: filtered,
905                            })
906                        }
907                        _ => Err("filter requires row input".into()),
908                    };
909                }
910
911                // Fused Filter+SeqScan fast path.
912                if let PlanNode::SeqScan { table } = input.as_ref() {
913                    if self.view_registry.is_dirty(table) {
914                        return Err(QueryError::ReadonlyNeedsWrite);
915                    }
916                    let schema = self
917                        .catalog
918                        .schema(table)
919                        .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
920                        .clone();
921                    let columns: Vec<String> =
922                        schema.columns.iter().map(|c| c.name.clone()).collect();
923                    let fast = FastLayout::new(&schema);
924                    let row_layout = RowLayout::new(&schema);
925                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
926
927                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
928                        self.catalog
929                            .for_each_row_raw(table, |_rid, data| {
930                                if compiled(data) {
931                                    rows.push(decode_row(&schema, data));
932                                }
933                            })
934                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
935                    } else {
936                        let pred_cols = predicate_column_indices(predicate, &columns);
937                        self.catalog
938                            .for_each_row_raw(table, |_rid, data| {
939                                let pred_row =
940                                    decode_selective(&schema, &row_layout, data, &pred_cols);
941                                if eval_predicate(predicate, &pred_row, &columns) {
942                                    rows.push(decode_row(&schema, data));
943                                }
944                            })
945                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
946                    }
947
948                    return Ok(QueryResult::Rows { columns, rows });
949                }
950
951                // General path.
952                let result = self.execute_plan_readonly(input)?;
953                match result {
954                    QueryResult::Rows { columns, rows } => {
955                        let filtered: Vec<Vec<Value>> = rows
956                            .into_iter()
957                            .filter(|row| eval_predicate(predicate, row, &columns))
958                            .collect();
959                        Ok(QueryResult::Rows {
960                            columns,
961                            rows: filtered,
962                        })
963                    }
964                    _ => Err("filter requires row input".into()),
965                }
966            }
967
968            PlanNode::Project { input, fields } => {
969                // Fast path: Project over IndexScan. Avoids full-row decode
970                // by calling decode_column only for projected fields.
971                if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
972                    let key_value = literal_to_value(key)?;
973                    let tbl = self
974                        .catalog
975                        .get_table(table)
976                        .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
977                    let schema = &tbl.schema;
978                    let layout = tbl.row_layout();
979
980                    let proj_columns: Vec<String> = fields
981                        .iter()
982                        .map(|f| {
983                            f.alias.clone().unwrap_or_else(|| match &f.expr {
984                                Expr::Field(name) => name.clone(),
985                                _ => "?".into(),
986                            })
987                        })
988                        .collect();
989
990                    let proj_indices: Vec<usize> = fields
991                        .iter()
992                        .filter_map(|f| {
993                            if let Expr::Field(name) = &f.expr {
994                                schema.column_index(name)
995                            } else {
996                                None
997                            }
998                        })
999                        .collect();
1000
1001                    if tbl.has_index(column) {
1002                        let rids = tbl.index_lookup_all(column, &key_value);
1003                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1004                        for rid in rids {
1005                            if let Some(data) = tbl.heap.get(rid) {
1006                                let row: Vec<Value> = proj_indices
1007                                    .iter()
1008                                    .map(|&ci| decode_column(schema, layout, &data, ci))
1009                                    .collect();
1010                                rows.push(row);
1011                            }
1012                        }
1013                        return Ok(QueryResult::Rows {
1014                            columns: proj_columns,
1015                            rows,
1016                        });
1017                    }
1018                }
1019
1020                // Fast paths over Limit(Sort(...)) / Limit(Filter(...)) / Limit(SeqScan).
1021                if let PlanNode::Limit {
1022                    input: inner,
1023                    count: limit_expr,
1024                } = input.as_ref()
1025                {
1026                    if let PlanNode::Sort {
1027                        input: sort_input,
1028                        keys,
1029                    } = inner.as_ref()
1030                    {
1031                        if keys.len() == 1 {
1032                            let sort_field = &keys[0].field;
1033                            let descending = keys[0].descending;
1034                            let limit = match limit_expr {
1035                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1036                                _ => usize::MAX,
1037                            };
1038                            let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1039                                match sort_input.as_ref() {
1040                                    PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1041                                    PlanNode::Filter {
1042                                        input: fi,
1043                                        predicate,
1044                                    } => {
1045                                        if let PlanNode::SeqScan { table } = fi.as_ref() {
1046                                            (Some(table.as_str()), Some(predicate))
1047                                        } else {
1048                                            (None, None)
1049                                        }
1050                                    }
1051                                    _ => (None, None),
1052                                };
1053                            if let Some(table) = table_opt {
1054                                if let Some(result) = self.project_filter_sort_limit_fast(
1055                                    table, fields, sort_field, descending, limit, pred_opt,
1056                                )? {
1057                                    return Ok(result);
1058                                }
1059                            }
1060                        }
1061                    }
1062                    if let PlanNode::Filter {
1063                        input: fi,
1064                        predicate,
1065                    } = inner.as_ref()
1066                    {
1067                        if let PlanNode::SeqScan { table } = fi.as_ref() {
1068                            let limit = match limit_expr {
1069                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1070                                _ => usize::MAX,
1071                            };
1072                            if let Some(result) = self.project_filter_limit_fast(
1073                                table,
1074                                fields,
1075                                limit,
1076                                Some(predicate),
1077                            )? {
1078                                return Ok(result);
1079                            }
1080                        }
1081                    }
1082                    if let PlanNode::SeqScan { table } = inner.as_ref() {
1083                        let limit = match limit_expr {
1084                            Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1085                            _ => usize::MAX,
1086                        };
1087                        if let Some(result) =
1088                            self.project_filter_limit_fast(table, fields, limit, None)?
1089                        {
1090                            return Ok(result);
1091                        }
1092                    }
1093                }
1094
1095                // Project(Filter(SeqScan)) without Limit.
1096                if let PlanNode::Filter {
1097                    input: fi,
1098                    predicate,
1099                } = input.as_ref()
1100                {
1101                    if let PlanNode::SeqScan { table } = fi.as_ref() {
1102                        if let Some(result) = self.project_filter_limit_fast(
1103                            table,
1104                            fields,
1105                            usize::MAX,
1106                            Some(predicate),
1107                        )? {
1108                            return Ok(result);
1109                        }
1110                    }
1111                }
1112
1113                // Project(SeqScan) without Filter or Limit.
1114                if let PlanNode::SeqScan { table } = input.as_ref() {
1115                    if let Some(result) =
1116                        self.project_filter_limit_fast(table, fields, usize::MAX, None)?
1117                    {
1118                        return Ok(result);
1119                    }
1120                }
1121
1122                // Generic path.
1123                let result = self.execute_plan_readonly(input)?;
1124                match result {
1125                    QueryResult::Rows { columns, rows } => {
1126                        let proj_columns: Vec<String> = fields
1127                            .iter()
1128                            .map(|f| {
1129                                f.alias.clone().unwrap_or_else(|| match &f.expr {
1130                                    Expr::Field(name) => name.clone(),
1131                                    Expr::QualifiedField { qualifier, field } => {
1132                                        format!("{qualifier}.{field}")
1133                                    }
1134                                    _ => "?".into(),
1135                                })
1136                            })
1137                            .collect();
1138                        let proj_rows: Vec<Vec<Value>> = rows
1139                            .iter()
1140                            .map(|row| {
1141                                fields
1142                                    .iter()
1143                                    .map(|f| eval_expr(&f.expr, row, &columns))
1144                                    .collect()
1145                            })
1146                            .collect();
1147                        Ok(QueryResult::Rows {
1148                            columns: proj_columns,
1149                            rows: proj_rows,
1150                        })
1151                    }
1152                    _ => Err("project requires row input".into()),
1153                }
1154            }
1155
1156            PlanNode::Sort { input, keys } => {
1157                let result = self.execute_plan_readonly(input)?;
1158                match result {
1159                    QueryResult::Rows { columns, mut rows } => {
1160                        if rows.len() > MAX_SORT_ROWS {
1161                            return Err(QueryError::SortLimitExceeded);
1162                        }
1163                        // WS2: byte-budget guard on the sort buffer.
1164                        self.charge_rows(&rows)?;
1165                        let key_indices: Vec<(usize, bool)> = keys
1166                            .iter()
1167                            .map(|k| {
1168                                columns
1169                                    .iter()
1170                                    .position(|c| c == &k.field)
1171                                    .map(|idx| (idx, k.descending))
1172                                    .ok_or_else(|| QueryError::ColumnNotFound {
1173                                        table: String::new(),
1174                                        column: k.field.clone(),
1175                                    })
1176                            })
1177                            .collect::<Result<_, QueryError>>()?;
1178                        rows.sort_by(|a, b| {
1179                            for &(col_idx, descending) in &key_indices {
1180                                let cmp = a[col_idx].cmp(&b[col_idx]);
1181                                let cmp = if descending { cmp.reverse() } else { cmp };
1182                                if cmp != std::cmp::Ordering::Equal {
1183                                    return cmp;
1184                                }
1185                            }
1186                            std::cmp::Ordering::Equal
1187                        });
1188                        Ok(QueryResult::Rows { columns, rows })
1189                    }
1190                    _ => Err("sort requires row input".into()),
1191                }
1192            }
1193
1194            PlanNode::Limit { input, count } => {
1195                let result = self.execute_plan_readonly(input)?;
1196                let n = match count {
1197                    Expr::Literal(Literal::Int(v)) => *v as usize,
1198                    _ => return Err("limit must be integer literal".into()),
1199                };
1200                match result {
1201                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1202                        columns,
1203                        rows: rows.into_iter().take(n).collect(),
1204                    }),
1205                    _ => Err("limit requires row input".into()),
1206                }
1207            }
1208
1209            PlanNode::Offset { input, count } => {
1210                let result = self.execute_plan_readonly(input)?;
1211                let n = match count {
1212                    Expr::Literal(Literal::Int(v)) => *v as usize,
1213                    _ => return Err("offset must be integer literal".into()),
1214                };
1215                match result {
1216                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1217                        columns,
1218                        rows: rows.into_iter().skip(n).collect(),
1219                    }),
1220                    _ => Err("offset requires row input".into()),
1221                }
1222            }
1223
1224            PlanNode::Aggregate {
1225                input,
1226                function,
1227                field,
1228            } => {
1229                // Fast path: count() over SeqScan.
1230                if *function == AggFunc::Count {
1231                    if let PlanNode::SeqScan { table } = input.as_ref() {
1232                        // A dirty materialized view must be refreshed before
1233                        // it can be counted, which needs `&mut self`. Escalate
1234                        // to the write path (F3: count(View) returned stale).
1235                        if self.view_registry.is_dirty(table) {
1236                            return Err(QueryError::ReadonlyNeedsWrite);
1237                        }
1238                        let mut count: i64 = 0;
1239                        self.catalog
1240                            .for_each_row_raw(table, |_rid, _data| {
1241                                count += 1;
1242                            })
1243                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1244                        return Ok(QueryResult::Scalar(Value::Int(count)));
1245                    }
1246                    if let PlanNode::Filter {
1247                        input: inner,
1248                        predicate,
1249                    } = input.as_ref()
1250                    {
1251                        // Only take the fast path for a plain Filter(SeqScan)
1252                        // with no subquery in the predicate. A subquery
1253                        // predicate (`count(T filter .x in (...))`) must be
1254                        // resolved first; the fast path evaluates the raw
1255                        // predicate with no subquery materialisation, which
1256                        // silently yields 0 (F1). Falling through routes it to
1257                        // the generic path that runs the subquery correctly.
1258                        if let PlanNode::SeqScan { table } = inner.as_ref() {
1259                            if self.view_registry.is_dirty(table) {
1260                                // F3: count(View filter ...) over a dirty view.
1261                                return Err(QueryError::ReadonlyNeedsWrite);
1262                            }
1263                        }
1264                        if let (PlanNode::SeqScan { table }, false) =
1265                            (inner.as_ref(), contains_subquery(predicate))
1266                        {
1267                            let schema = self
1268                                .catalog
1269                                .schema(table)
1270                                .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
1271                                .clone();
1272                            let columns: Vec<String> =
1273                                schema.columns.iter().map(|c| c.name.clone()).collect();
1274                            let fast = FastLayout::new(&schema);
1275                            let row_layout = RowLayout::new(&schema);
1276
1277                            if let Some(compiled) =
1278                                compile_predicate(predicate, &columns, &fast, &schema)
1279                            {
1280                                let mut count: i64 = 0;
1281                                self.catalog
1282                                    .for_each_row_raw(table, |_rid, data| {
1283                                        if compiled(data) {
1284                                            count += 1;
1285                                        }
1286                                    })
1287                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1288                                return Ok(QueryResult::Scalar(Value::Int(count)));
1289                            }
1290
1291                            let pred_cols = predicate_column_indices(predicate, &columns);
1292                            let mut count: i64 = 0;
1293                            self.catalog
1294                                .for_each_row_raw(table, |_rid, data| {
1295                                    let pred_row =
1296                                        decode_selective(&schema, &row_layout, data, &pred_cols);
1297                                    if eval_predicate(predicate, &pred_row, &columns) {
1298                                        count += 1;
1299                                    }
1300                                })
1301                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1302                            return Ok(QueryResult::Scalar(Value::Int(count)));
1303                        }
1304                    }
1305                }
1306
1307                // Fast path: sum/avg/min/max over single fixed-size numeric.
1308                if matches!(
1309                    function,
1310                    AggFunc::Sum
1311                        | AggFunc::Avg
1312                        | AggFunc::Min
1313                        | AggFunc::Max
1314                        | AggFunc::CountDistinct
1315                ) {
1316                    if let Some(col) = field.as_ref() {
1317                        let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1318                            match input.as_ref() {
1319                                PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1320                                PlanNode::Filter {
1321                                    input: inner,
1322                                    predicate,
1323                                } => {
1324                                    if let PlanNode::SeqScan { table } = inner.as_ref() {
1325                                        (Some(table.as_str()), Some(predicate))
1326                                    } else {
1327                                        (None, None)
1328                                    }
1329                                }
1330                                _ => (None, None),
1331                            };
1332                        if let Some(table) = table_opt {
1333                            if let Some(result) =
1334                                self.agg_single_col_fast(table, col, *function, pred_opt)?
1335                            {
1336                                return Ok(result);
1337                            }
1338                        }
1339                    }
1340                }
1341
1342                // Generic path.
1343                let result = self.execute_plan_readonly(input)?;
1344                match result {
1345                    QueryResult::Rows { columns, rows } => match function {
1346                        AggFunc::Count => Ok(QueryResult::Scalar(Value::Int(rows.len() as i64))),
1347                        AggFunc::CountDistinct => {
1348                            let col = field.as_ref().ok_or("count distinct requires field")?;
1349                            let idx = columns
1350                                .iter()
1351                                .position(|c| c == col)
1352                                .ok_or("col not found")?;
1353                            let mut seen = std::collections::HashSet::new();
1354                            for row in &rows {
1355                                let v = &row[idx];
1356                                if !v.is_empty() {
1357                                    seen.insert(v.clone());
1358                                }
1359                            }
1360                            Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
1361                        }
1362                        AggFunc::Avg => {
1363                            let col = field.as_ref().ok_or("avg requires field")?;
1364                            let idx = columns
1365                                .iter()
1366                                .position(|c| c == col)
1367                                .ok_or("col not found")?;
1368                            let sum: f64 = rows
1369                                .iter()
1370                                .filter_map(|r| match &r[idx] {
1371                                    Value::Int(v) => Some(*v as f64),
1372                                    Value::Float(v) => Some(*v),
1373                                    _ => None,
1374                                })
1375                                .sum();
1376                            let count = rows.len() as f64;
1377                            Ok(QueryResult::Scalar(Value::Float(sum / count)))
1378                        }
1379                        AggFunc::Sum => {
1380                            let col = field.as_ref().ok_or("sum requires field")?;
1381                            let idx = columns
1382                                .iter()
1383                                .position(|c| c == col)
1384                                .ok_or("col not found")?;
1385                            let mut int_sum: i64 = 0;
1386                            let mut float_sum: f64 = 0.0;
1387                            let mut saw_float = false;
1388                            for r in &rows {
1389                                match &r[idx] {
1390                                    Value::Int(v) => int_sum += *v,
1391                                    Value::Float(v) => {
1392                                        float_sum += *v;
1393                                        saw_float = true;
1394                                    }
1395                                    _ => {}
1396                                }
1397                            }
1398                            let result = if saw_float {
1399                                Value::Float(float_sum + int_sum as f64)
1400                            } else {
1401                                Value::Int(int_sum)
1402                            };
1403                            Ok(QueryResult::Scalar(result))
1404                        }
1405                        AggFunc::Min | AggFunc::Max => {
1406                            let col = field.as_ref().ok_or("min/max requires field")?;
1407                            let idx = columns
1408                                .iter()
1409                                .position(|c| c == col)
1410                                .ok_or("col not found")?;
1411                            let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
1412                            let result = if *function == AggFunc::Min {
1413                                vals.into_iter().min().cloned()
1414                            } else {
1415                                vals.into_iter().max().cloned()
1416                            };
1417                            Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
1418                        }
1419                    },
1420                    _ => Err("aggregate requires row input".into()),
1421                }
1422            }
1423
1424            PlanNode::Distinct { input } => {
1425                let result = self.execute_plan_readonly(input)?;
1426                match result {
1427                    QueryResult::Rows { columns, rows } => {
1428                        let mut seen = std::collections::HashSet::new();
1429                        let mut unique_rows = Vec::new();
1430                        for row in rows {
1431                            if seen.insert(row.clone()) {
1432                                unique_rows.push(row);
1433                            }
1434                        }
1435                        Ok(QueryResult::Rows {
1436                            columns,
1437                            rows: unique_rows,
1438                        })
1439                    }
1440                    other => Ok(other),
1441                }
1442            }
1443
1444            PlanNode::GroupBy {
1445                input,
1446                keys,
1447                aggregates,
1448                having,
1449            } => {
1450                let result = self.execute_plan_readonly(input)?;
1451                match result {
1452                    QueryResult::Rows { columns, rows } => {
1453                        // WS2: byte-budget guard on the GROUP BY input buffer
1454                        // (the hash table is bounded by the input it groups).
1455                        self.charge_rows(&rows)?;
1456                        let key_indices: Vec<usize> = keys
1457                            .iter()
1458                            .map(|k| {
1459                                columns.iter().position(|c| c == k).ok_or_else(|| {
1460                                    QueryError::ColumnNotFound {
1461                                        table: String::new(),
1462                                        column: k.clone(),
1463                                    }
1464                                })
1465                            })
1466                            .collect::<Result<Vec<_>, _>>()?;
1467
1468                        let agg_field_indices: Vec<usize> = aggregates
1469                            .iter()
1470                            .map(|a| {
1471                                if a.field == "*" {
1472                                    Ok(usize::MAX)
1473                                } else {
1474                                    columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1475                                        QueryError::ColumnNotFound {
1476                                            table: String::new(),
1477                                            column: a.field.clone(),
1478                                        }
1479                                    })
1480                                }
1481                            })
1482                            .collect::<Result<Vec<_>, _>>()?;
1483
1484                        let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1485                            rustc_hash::FxHashMap::default();
1486                        let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1487                        for (ri, row) in rows.iter().enumerate() {
1488                            let key: Vec<Value> =
1489                                key_indices.iter().map(|&i| row[i].clone()).collect();
1490                            match group_map.get(&key) {
1491                                Some(&idx) => groups[idx].1.push(ri),
1492                                None => {
1493                                    let idx = groups.len();
1494                                    group_map.insert(key.clone(), idx);
1495                                    groups.push((key, vec![ri]));
1496                                }
1497                            }
1498                        }
1499
1500                        let mut out_columns: Vec<String> = keys.clone();
1501                        for agg in aggregates.iter() {
1502                            out_columns.push(agg.output_name.clone());
1503                        }
1504
1505                        let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1506                        for (key_vals, row_indices) in &groups {
1507                            let mut row = key_vals.clone();
1508                            for (ai, agg) in aggregates.iter().enumerate() {
1509                                let col_idx = agg_field_indices[ai];
1510                                let val = compute_group_aggregate(
1511                                    agg.function,
1512                                    &rows,
1513                                    row_indices,
1514                                    col_idx,
1515                                );
1516                                row.push(val);
1517                            }
1518                            out_rows.push(row);
1519                        }
1520
1521                        if let Some(having_expr) = having {
1522                            out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1523                        }
1524
1525                        Ok(QueryResult::Rows {
1526                            columns: out_columns,
1527                            rows: out_rows,
1528                        })
1529                    }
1530                    _ => Err("group by requires row input".into()),
1531                }
1532            }
1533
1534            PlanNode::NestedLoopJoin {
1535                left,
1536                right,
1537                on,
1538                kind,
1539            } => {
1540                let left_result = self.execute_plan_readonly(left)?;
1541                let right_result = self.execute_plan_readonly(right)?;
1542                let (left_columns, left_rows) = match left_result {
1543                    QueryResult::Rows { columns, rows } => (columns, rows),
1544                    _ => return Err("join left side must produce rows".into()),
1545                };
1546                let (right_columns, right_rows) = match right_result {
1547                    QueryResult::Rows { columns, rows } => (columns, rows),
1548                    _ => return Err("join right side must produce rows".into()),
1549                };
1550
1551                // WS2: byte-budget guard on the join build side.
1552                self.charge_rows(&left_rows)?;
1553                self.charge_rows(&right_rows)?;
1554
1555                if !matches!(kind, JoinKind::Cross) {
1556                    if let Some(pred) = on {
1557                        if let Some((l_idx, r_idx)) =
1558                            try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1559                        {
1560                            let result = hash_join(
1561                                left_columns,
1562                                left_rows,
1563                                right_columns,
1564                                right_rows,
1565                                l_idx,
1566                                r_idx,
1567                                *kind,
1568                            );
1569                            if let QueryResult::Rows { ref rows, .. } = result {
1570                                check_join_limit(rows.len())?;
1571                            }
1572                            return Ok(result);
1573                        }
1574                    }
1575                }
1576
1577                let n_left = left_columns.len();
1578                let n_right = right_columns.len();
1579                let mut columns = Vec::with_capacity(n_left + n_right);
1580                columns.extend(left_columns);
1581                columns.extend(right_columns);
1582
1583                let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1584                let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1585
1586                for left_row in &left_rows {
1587                    let mut matched = false;
1588                    for right_row in &right_rows {
1589                        combined.clear();
1590                        combined.extend_from_slice(left_row);
1591                        combined.extend_from_slice(right_row);
1592                        let keep = match kind {
1593                            JoinKind::Cross => true,
1594                            JoinKind::Inner | JoinKind::LeftOuter => match on {
1595                                Some(pred) => eval_predicate(pred, &combined, &columns),
1596                                None => true,
1597                            },
1598                            JoinKind::RightOuter => {
1599                                unreachable!("planner rewrites RightOuter to LeftOuter")
1600                            }
1601                        };
1602                        if keep {
1603                            rows.push(combined.clone());
1604                            check_join_limit(rows.len())?;
1605                            matched = true;
1606                        }
1607                    }
1608                    if !matched && matches!(kind, JoinKind::LeftOuter) {
1609                        let mut row = Vec::with_capacity(n_left + n_right);
1610                        row.extend_from_slice(left_row);
1611                        row.resize(n_left + n_right, Value::Empty);
1612                        rows.push(row);
1613                        check_join_limit(rows.len())?;
1614                    }
1615                }
1616
1617                Ok(QueryResult::Rows { columns, rows })
1618            }
1619
1620            PlanNode::Window { input, windows } => {
1621                let result = self.execute_plan_readonly(input)?;
1622                execute_window(result, windows)
1623            }
1624
1625            PlanNode::Union { left, right, all } => {
1626                let left_result = self.execute_plan_readonly(left)?;
1627                let right_result = self.execute_plan_readonly(right)?;
1628                let (left_cols, left_rows) = match left_result {
1629                    QueryResult::Rows { columns, rows } => (columns, rows),
1630                    _ => return Err("UNION requires query results on left side".into()),
1631                };
1632                let (_, right_rows) = match right_result {
1633                    QueryResult::Rows { columns, rows } => (columns, rows),
1634                    _ => return Err("UNION requires query results on right side".into()),
1635                };
1636                let mut combined = left_rows;
1637                if *all {
1638                    combined.extend(right_rows);
1639                } else {
1640                    let mut seen = std::collections::HashSet::new();
1641                    for row in &combined {
1642                        seen.insert(row.clone());
1643                    }
1644                    for row in right_rows {
1645                        if seen.insert(row.clone()) {
1646                            combined.push(row);
1647                        }
1648                    }
1649                }
1650                Ok(QueryResult::Rows {
1651                    columns: left_cols,
1652                    rows: combined,
1653                })
1654            }
1655
1656            PlanNode::Explain { input } => {
1657                let text = format_plan_tree(input, 0);
1658                Ok(QueryResult::Rows {
1659                    columns: vec!["plan".to_string()],
1660                    rows: text
1661                        .lines()
1662                        .map(|line| vec![Value::Str(line.to_string())])
1663                        .collect(),
1664                })
1665            }
1666
1667            // All write variants — caller must escalate to the write lock.
1668            PlanNode::Insert { .. }
1669            | PlanNode::Update { .. }
1670            | PlanNode::Delete { .. }
1671            | PlanNode::Upsert { .. }
1672            | PlanNode::CreateTable { .. }
1673            | PlanNode::AlterTable { .. }
1674            | PlanNode::DropTable { .. }
1675            | PlanNode::CreateView { .. }
1676            | PlanNode::RefreshView { .. }
1677            | PlanNode::DropView { .. }
1678            | PlanNode::Begin
1679            | PlanNode::Commit
1680            | PlanNode::Rollback => Err(QueryError::ReadonlyNeedsWrite),
1681        }
1682    }
1683
1684    /// `&self` variant of [`Engine::materialize_subqueries`]. Used by the
1685    /// read path so `Filter` predicates with `InSubquery`/`ExistsSubquery`
1686    /// children can evaluate their inner queries without taking the write
1687    /// lock. Inner queries that would themselves need a write (e.g. dirty
1688    /// view) escalate via [`READONLY_NEEDS_WRITE`] just like the top-level
1689    /// read path does.
1690    fn materialize_subqueries_readonly(&self, expr: &Expr) -> Result<Expr, QueryError> {
1691        match expr {
1692            Expr::InSubquery {
1693                expr: inner,
1694                subquery,
1695                negated,
1696            } => {
1697                if is_correlated_subquery(subquery, &self.catalog) {
1698                    // Pass through — will be materialized per-row in the
1699                    // Filter handler's correlated subquery path.
1700                    let inner = self.materialize_subqueries_readonly(inner)?;
1701                    return Ok(Expr::InSubquery {
1702                        expr: Box::new(inner),
1703                        subquery: subquery.clone(),
1704                        negated: *negated,
1705                    });
1706                }
1707                let inner = self.materialize_subqueries_readonly(inner)?;
1708                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1709                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1710                let result = self.execute_plan_readonly(&sub_plan)?;
1711                let values = match result {
1712                    QueryResult::Rows { rows, .. } => rows
1713                        .into_iter()
1714                        .filter_map(|mut row| {
1715                            if row.is_empty() {
1716                                None
1717                            } else {
1718                                Some(value_to_expr(row.swap_remove(0)))
1719                            }
1720                        })
1721                        .collect(),
1722                    _ => Vec::new(),
1723                };
1724                // WS2: byte-budget guard on the materialized IN-list.
1725                self.charge_in_list(&values)?;
1726                Ok(Expr::InList {
1727                    expr: Box::new(inner),
1728                    list: values,
1729                    negated: *negated,
1730                })
1731            }
1732            Expr::ExistsSubquery { subquery, negated } => {
1733                if is_correlated_subquery(subquery, &self.catalog) {
1734                    return Ok(expr.clone());
1735                }
1736                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1737                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1738                let result = self.execute_plan_readonly(&sub_plan)?;
1739                let has_rows = match result {
1740                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
1741                    _ => false,
1742                };
1743                let truth = if *negated { !has_rows } else { has_rows };
1744                Ok(Expr::Literal(Literal::Bool(truth)))
1745            }
1746            Expr::BinaryOp(l, op, r) => {
1747                let l = self.materialize_subqueries_readonly(l)?;
1748                let r = self.materialize_subqueries_readonly(r)?;
1749                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1750            }
1751            Expr::UnaryOp(op, inner) => {
1752                let inner = self.materialize_subqueries_readonly(inner)?;
1753                Ok(Expr::UnaryOp(*op, Box::new(inner)))
1754            }
1755            Expr::Case { whens, else_expr } => {
1756                let whens = whens
1757                    .iter()
1758                    .map(|(c, r)| {
1759                        let c = self.materialize_subqueries_readonly(c)?;
1760                        let r = self.materialize_subqueries_readonly(r)?;
1761                        Ok((Box::new(c), Box::new(r)))
1762                    })
1763                    .collect::<Result<Vec<_>, QueryError>>()?;
1764                let else_expr = match else_expr {
1765                    Some(e) => Some(Box::new(self.materialize_subqueries_readonly(e)?)),
1766                    None => None,
1767                };
1768                Ok(Expr::Case { whens, else_expr })
1769            }
1770            other => Ok(other.clone()),
1771        }
1772    }
1773
1774    /// Per-row materialisation of correlated subqueries. For each row in the
1775    /// outer query, substitute outer column references in the subquery's
1776    /// filter with the current row's literal values, execute the modified
1777    /// subquery, and return the result as an InList or Bool literal.
1778    fn materialize_correlated_for_row_readonly(
1779        &self,
1780        expr: &Expr,
1781        outer_row: &[Value],
1782        outer_columns: &[String],
1783    ) -> Result<Expr, QueryError> {
1784        match expr {
1785            Expr::InSubquery {
1786                expr: inner,
1787                subquery,
1788                negated,
1789            } => {
1790                let inner =
1791                    self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1792                let mut sub = *subquery.clone();
1793                if let Some(ref filter) = sub.filter {
1794                    sub.filter = Some(substitute_outer_refs(
1795                        filter,
1796                        &sub.source,
1797                        &self.catalog,
1798                        outer_row,
1799                        outer_columns,
1800                    ));
1801                }
1802                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1803                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1804                let result = self.execute_plan_readonly(&sub_plan)?;
1805                let values = match result {
1806                    QueryResult::Rows { rows, .. } => rows
1807                        .into_iter()
1808                        .filter_map(|mut row| {
1809                            if row.is_empty() {
1810                                None
1811                            } else {
1812                                Some(value_to_expr(row.swap_remove(0)))
1813                            }
1814                        })
1815                        .collect(),
1816                    _ => Vec::new(),
1817                };
1818                // WS2: byte-budget guard on the per-row materialized IN-list.
1819                self.charge_in_list(&values)?;
1820                Ok(Expr::InList {
1821                    expr: Box::new(inner),
1822                    list: values,
1823                    negated: *negated,
1824                })
1825            }
1826            Expr::ExistsSubquery { subquery, negated } => {
1827                let mut sub = *subquery.clone();
1828                if let Some(ref filter) = sub.filter {
1829                    sub.filter = Some(substitute_outer_refs(
1830                        filter,
1831                        &sub.source,
1832                        &self.catalog,
1833                        outer_row,
1834                        outer_columns,
1835                    ));
1836                }
1837                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1838                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1839                let result = self.execute_plan_readonly(&sub_plan)?;
1840                let has_rows = match result {
1841                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
1842                    _ => false,
1843                };
1844                let truth = if *negated { !has_rows } else { has_rows };
1845                Ok(Expr::Literal(Literal::Bool(truth)))
1846            }
1847            Expr::BinaryOp(l, op, r) => {
1848                let l =
1849                    self.materialize_correlated_for_row_readonly(l, outer_row, outer_columns)?;
1850                let r =
1851                    self.materialize_correlated_for_row_readonly(r, outer_row, outer_columns)?;
1852                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1853            }
1854            Expr::UnaryOp(op, inner) => {
1855                let inner =
1856                    self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1857                Ok(Expr::UnaryOp(*op, Box::new(inner)))
1858            }
1859            other => Ok(other.clone()),
1860        }
1861    }
1862
1863    pub fn catalog(&self) -> &Catalog {
1864        &self.catalog
1865    }
1866
1867    pub fn catalog_mut(&mut self) -> &mut Catalog {
1868        &mut self.catalog
1869    }
1870}