Skip to main content

powdb_query/executor/
mod.rs

1//! PowDB query executor.
2
3// Submodules that don't use macros defined in this file.
4mod compiled;
5mod eval;
6pub mod mem_budget;
7
8use crate::ast::*;
9use crate::canonicalize::canonicalize;
10use crate::plan::*;
11use crate::plan_cache::PlanCache;
12use crate::planner;
13use crate::result::{QueryError, QueryResult};
14use powdb_storage::catalog::Catalog;
15use powdb_storage::row::{decode_column, decode_row, RowLayout};
16use powdb_storage::types::*;
17use powdb_storage::view::ViewRegistry;
18
19use std::io;
20use std::path::Path;
21use std::sync::Mutex;
22use std::time::Instant;
23use tracing::{error, info, Level};
24
25use self::compiled::*;
26use self::eval::*;
27
28/// Legacy sentinel string constant — kept for backward compatibility with
29/// any external code matching on the string representation. New code should
30/// match on `QueryError::ReadonlyNeedsWrite` directly.
31pub const READONLY_NEEDS_WRITE: &str = "__POWDB_READONLY_NEEDS_WRITE__";
32
33/// Plan cache capacity. Bench workloads fill ~15 slots; real apps will sit
34/// comfortably in 256. Lookup is O(1), collisions clear the cache (see
35/// `plan_cache::PlanCache::insert`).
36const PLAN_CACHE_CAPACITY: usize = 256;
37
38/// Maximum number of rows a join may produce before the executor aborts.
39/// Prevents Cartesian-product blowups (e.g. `T cross join T` on 10K rows
40/// would produce 100M rows in memory without this cap).
41pub(super) const MAX_JOIN_ROWS: usize = 1_000_000;
42
43/// Maximum number of rows that may be materialized for sorting.
44/// Queries that exceed this should add a LIMIT clause to narrow the input
45/// before sorting.
46pub(super) const MAX_SORT_ROWS: usize = 10_000_000;
47
48#[inline]
49pub(super) fn check_join_limit(row_count: usize) -> Result<(), QueryError> {
50    if row_count > MAX_JOIN_ROWS {
51        return Err(QueryError::JoinLimitExceeded);
52    }
53    Ok(())
54}
55
// ─── Mission D11 Phase 1: scalar hot-loop helpers ─────────────────────────
//
// `agg_int_loop!` and `agg_float_loop!` expand into the scan body of
// `agg_single_col_fast` and sit inside the `for_each_row_raw` closure. They
// exist to:
//
//   1. Split the loop on presence of a predicate *outside* the hot body,
//      so the no-predicate path (agg_sum/agg_min/agg_max bench workloads)
//      never pays the `Option<CompiledPredicate>` branch per row.
//   2. Replace the two per-access slice bounds checks (null-bitmap byte and
//      8-byte value) with one combined length guard per row, followed by
//      unchecked reads.
//
// Macro arguments:
//
//   - `$self`     — expression whose `.catalog` provides `for_each_row_raw`.
//   - `$table`    — table to scan; passed straight to `for_each_row_raw`.
//   - `$pred`     — an `Option` holding a row predicate invoked as
//                   `pred(data)`; rows for which it returns `false` are
//                   skipped. `None` selects the predicate-free loop.
//   - `$bmp_byte`, `$bmp_bit` — the column's bit in the null bitmap: row byte
//                   `2 + $bmp_byte`, bit `$bmp_bit`. A set bit means NULL and
//                   the row is skipped.
//   - `$off`      — byte offset of the column's little-endian 8-byte value.
//   - `|$v: i64| $body` — block run once per surviving row with `$v` bound
//                   to the decoded value. `$body` is spliced into the per-row
//                   closure, so a `return` inside it just moves on to the
//                   next row.
//
// `$bmp_byte`, `$bmp_bit` and `$off` are each evaluated once, before the scan
// starts. A storage error from the scan is mapped to
// `QueryError::StorageError` and propagated with `?`, so the invoking
// function must return `Result<_, QueryError>`.
//
// SAFETY (shared across every call site below):
//
//   - The unchecked reads are sound because of the explicit per-row guard
//     `2 + bmp_byte < data.len() && off + 8 <= data.len()`. Rows failing it
//     are skipped before any `unsafe` read, so even a corrupt or truncated
//     row cannot cause an out-of-bounds access.
//   - For well-formed rows the guard always passes:
//       * `$bmp_byte` is `col_idx / 8` where `col_idx < n_cols`, and the row
//         encoding stores `bitmap_size = n_cols.div_ceil(8)` bytes of bitmap
//         starting at offset 2. So `2 + $bmp_byte < 2 + bitmap_size ≤ row_len`.
//       * `$off = 2 + bitmap_size + fixed_offsets[col_idx]` for a fixed-size
//         column. Every fixed-size column contributes `fixed_size(type_id)`
//         bytes to the fixed region, so the row always has `[$off .. $off+8]`
//         available for any i64/f64 column. This is enforced by the row
//         encoder (`storage/src/row.rs`) and the schema invariant that a row
//         with a given schema has `row_len ≥ 2 + bitmap_size + fixed_region_size`.
//   - The value read casts to `*const [u8; 8]`, whose alignment is 1, so the
//     read is valid at any byte offset.
//   - Both macros are only invoked from `agg_single_col_fast`, which
//     early-returns if the column isn't Int/Float (8-byte fixed) and
//     early-returns if `fast.fixed_offsets[col_idx]` is `None`.
//   - NOTE(review): rows rejected by the guard are dropped silently, so a
//     truncated row is quietly omitted from the aggregate instead of being
//     reported. Confirm this is the intended policy.
macro_rules! agg_int_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : i64| $body:block
    ) => {{
        // Bind the layout arguments once so they are not re-evaluated per row.
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    // Bounds guard: skip corrupt/truncated rows that are too
                    // short to contain the bitmap byte or the 8-byte value.
                    if 2 + bmp_byte >= data.len() || off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `2 + bmp_byte < data.len()` is checked above.
                    // The bitmap byte lives at offset 2..2+bitmap_size in the
                    // row encoding, and bmp_byte = col_idx / 8 < bitmap_size.
                    // Corrupt rows are rejected by the bounds guard.
                    let bmp = unsafe { *data.get_unchecked(2 + bmp_byte) };
                    // A set null bit means the value is NULL: skip the row.
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `off + 8 <= data.len()` is checked above, so
                    // the 8 bytes at `off` lie inside `data`.
                    // `off = 2 + bitmap_size + fixed_offsets[col_idx]` points
                    // to an 8-byte little-endian i64 in the fixed-size region
                    // of the row. `[u8; 8]` has alignment 1, so the pointer
                    // cast is valid at any offset. Corrupt rows are rejected
                    // by the bounds guard.
                    let $v: i64 =
                        unsafe { i64::from_le_bytes(*(data.as_ptr().add(off) as *const [u8; 8])) };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            // Predicate-free copy of the loop above, so the hot path carries
            // no per-row `Option` check.
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Bounds guard: skip corrupt/truncated rows.
                    if 2 + bmp_byte >= data.len() || off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `2 + bmp_byte < data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let bmp = unsafe { *data.get_unchecked(2 + bmp_byte) };
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `off + 8 <= data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let $v: i64 =
                        unsafe { i64::from_le_bytes(*(data.as_ptr().add(off) as *const [u8; 8])) };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}
146
// f64 counterpart of `agg_int_loop!`.
//
// It takes the same argument list: scan target `$self`/`$table`, optional
// row predicate `$pred`, null-bitmap location `$bmp_byte`/`$bmp_bit`, and
// value offset `$off`. The only difference is that the 8 bytes at `$off` are
// decoded with `f64::from_le_bytes` and bound to `$v: f64` before `$body`
// runs. Rows that fail the predicate, have their null bit set, or are too
// short for the guarded reads are skipped. Scan errors are mapped to
// `QueryError::StorageError` and propagated with `?`. The shared SAFETY
// notes in the Mission D11 header comment apply unchanged.
macro_rules! agg_float_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : f64| $body:block
    ) => {{
        // Bind the layout arguments once so they are not re-evaluated per row.
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    // Bounds guard: skip corrupt/truncated rows that are too
                    // short to contain the bitmap byte or the 8-byte value.
                    if 2 + bmp_byte >= data.len() || off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `2 + bmp_byte < data.len()` is checked above.
                    // The bitmap byte lives at offset 2..2+bitmap_size in the
                    // row encoding, and bmp_byte = col_idx / 8 < bitmap_size.
                    // Corrupt rows are rejected by the bounds guard.
                    let bmp = unsafe { *data.get_unchecked(2 + bmp_byte) };
                    // A set null bit means the value is NULL: skip the row.
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `off + 8 <= data.len()` is checked above, so
                    // the 8 bytes at `off` lie inside `data`.
                    // `off = 2 + bitmap_size + fixed_offsets[col_idx]` points
                    // to an 8-byte little-endian f64 in the fixed-size region
                    // of the row. `[u8; 8]` has alignment 1, so the pointer
                    // cast is valid at any offset. Corrupt rows are rejected
                    // by the bounds guard.
                    let $v: f64 =
                        unsafe { f64::from_le_bytes(*(data.as_ptr().add(off) as *const [u8; 8])) };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            // Predicate-free copy of the loop above, so the hot path carries
            // no per-row `Option` check.
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Bounds guard: skip corrupt/truncated rows.
                    if 2 + bmp_byte >= data.len() || off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `2 + bmp_byte < data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let bmp = unsafe { *data.get_unchecked(2 + bmp_byte) };
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `off + 8 <= data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let $v: f64 =
                        unsafe { f64::from_le_bytes(*(data.as_ptr().add(off) as *const [u8; 8])) };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}
211
212// Submodules that use the macros above — must be declared after macro_rules!.
213mod plan_exec;
214mod prepared;
215
216#[cfg(test)]
217mod tests;
218
219// Re-exports for the public API
220pub use self::prepared::PreparedQuery;
221
222use self::plan_exec::{
223    compute_group_aggregate, execute_window, format_plan_tree, hash_join,
224    lower_unindexed_range_scans, range_matches, synthesize_range_predicate,
225    try_extract_equi_join_keys,
226};
227
228/// Mission infra-1: classify a parsed statement as read-only vs. mutating.
229/// Used by [`Engine::execute_powql_readonly`] and by the server handler
230/// to decide between the RwLock reader and writer sides. `Union` recurses
231/// because each side can independently be read/write (though in practice
232/// both sides are reads — the parser only builds Union from query shapes).
233pub fn is_read_only_statement(stmt: &Statement) -> bool {
234    match stmt {
235        Statement::Query(_) => true,
236        Statement::Union(u) => is_read_only_statement(&u.left) && is_read_only_statement(&u.right),
237        Statement::Insert(_)
238        | Statement::Upsert(_)
239        | Statement::UpdateQuery(_)
240        | Statement::DeleteQuery(_)
241        | Statement::CreateType(_)
242        | Statement::AlterTable(_)
243        | Statement::DropTable(_)
244        | Statement::CreateView(_)
245        | Statement::RefreshView(_)
246        | Statement::DropView(_) => false,
247        Statement::Begin | Statement::Commit | Statement::Rollback => false,
248        Statement::Explain(inner) => is_read_only_statement(inner),
249    }
250}
251
/// The PowDB query engine.
///
/// It owns the storage [`Catalog`] together with the per-engine state used
/// to parse, plan and execute PowQL: a plan cache, the materialized-view
/// registry, an explicit-transaction flag and the per-query memory cap.
/// Statements that may write go through `&mut self`
/// (`Engine::execute_powql`). `Engine::execute_powql_readonly` needs only
/// `&self`, which is what allows concurrent reads when the engine is shared
/// behind `Arc<RwLock<Engine>>`.
pub struct Engine {
    /// Storage catalog: table schemas, row heaps and indexes, plus the WAL
    /// that `execute_powql` syncs at statement boundaries.
    catalog: Catalog,
    /// Mission D9 — cached parsed+planned query trees keyed by canonical
    /// hash. Saves the ~3μs parse+plan cost on repeat queries that differ
    /// only in literal values.
    ///
    /// Mission infra-1: wrapped in `Mutex` so the read path can be driven
    /// by `&self`. The critical section is extremely short — a single
    /// hashmap lookup + plan clone on a hit, or a single insert on a miss.
    /// A full `RwLock` would be over-engineered here; the contention window
    /// is smaller than the read-path scan work it gates.
    plan_cache: Mutex<PlanCache>,
    /// Mission C Phase 13: reusable `Vec<Value>` scratch buffer for the
    /// prepared-insert fast path. `execute_prepared` used to allocate a
    /// fresh `vec![Value::Empty; n_cols]` on every insert; recycling this
    /// buffer shaves one heap alloc per row on `insert_batch_1k`.
    insert_values_scratch: Vec<Value>,
    /// Materialized view registry: tracks view definitions, dependencies,
    /// and dirty state. Views are backed by regular catalog tables; this
    /// registry adds the lifecycle metadata.
    view_registry: ViewRegistry,
    /// Whether an explicit transaction is currently open. While `true`,
    /// `execute_powql` skips its per-statement `sync_wal` call.
    /// NOTE(review): presumably set/cleared by the Begin/Commit/Rollback
    /// handlers, which are outside this view — confirm.
    in_transaction: bool,
    /// WS2 — per-query memory budget ceiling (bytes). The running total lives
    /// in a thread-local (see [`mem_budget`]) and is reset at every top-level
    /// query entry, so sort/join/GROUP BY/IN-list materialization can be capped
    /// without OOM-killing the process. This field holds only the *limit* (a
    /// plain `usize`, so `Engine` stays `Sync` for the concurrent read path).
    /// Default [`mem_budget::DEFAULT_QUERY_MEMORY_LIMIT`] (256 MB); overridable
    /// via `Engine::with_memory_limit` (server reads `POWDB_QUERY_MEMORY_LIMIT`).
    query_memory_limit: usize,
}
283
284impl Engine {
285    /// Open or create a PowDB engine rooted at `data_dir`.
286    ///
287    /// If the directory already contains a catalog, it is reopened.
288    /// Otherwise a fresh empty database is created.
289    ///
290    /// # Examples
291    ///
292    /// ```
293    /// use powdb_query::executor::Engine;
294    ///
295    /// let dir = tempfile::tempdir().unwrap();
296    /// let engine = Engine::new(dir.path()).unwrap();
297    /// // Engine is ready — the directory now contains a catalog.
298    /// ```
299    pub fn new(data_dir: &Path) -> io::Result<Self> {
300        std::fs::create_dir_all(data_dir)?;
301        // Try to reopen an existing database first; only create a fresh
302        // catalog when there isn't one already on disk.
303        let catalog = match Catalog::open(data_dir) {
304            Ok(c) => {
305                info!(data_dir = %data_dir.display(), "engine reopened existing database");
306                c
307            }
308            Err(e) if e.kind() == io::ErrorKind::NotFound => {
309                info!(data_dir = %data_dir.display(), "engine initialized fresh database");
310                Catalog::create(data_dir)?
311            }
312            Err(e) => return Err(e),
313        };
314        let view_registry =
315            ViewRegistry::open(data_dir).unwrap_or_else(|_| ViewRegistry::new(data_dir));
316        Ok(Engine {
317            catalog,
318            plan_cache: Mutex::new(PlanCache::new(PLAN_CACHE_CAPACITY)),
319            insert_values_scratch: Vec::new(),
320            view_registry,
321            in_transaction: false,
322            query_memory_limit: mem_budget::DEFAULT_QUERY_MEMORY_LIMIT,
323        })
324    }
325
326    /// Open or create an engine with an explicit per-query memory limit
327    /// (bytes). Used by the server to apply `POWDB_QUERY_MEMORY_LIMIT`, and by
328    /// tests that need a tiny limit to exercise the budget guard.
329    pub fn with_memory_limit(data_dir: &Path, limit_bytes: usize) -> io::Result<Self> {
330        let mut engine = Engine::new(data_dir)?;
331        engine.set_query_memory_limit(limit_bytes);
332        Ok(engine)
333    }
334
335    /// Current per-query memory limit in bytes.
336    pub fn query_memory_limit(&self) -> usize {
337        self.query_memory_limit
338    }
339
340    /// Override the per-query memory limit in bytes (builder-style).
341    pub fn set_query_memory_limit(&mut self, limit_bytes: usize) {
342        self.query_memory_limit = limit_bytes;
343    }
344
345    /// Enter a budgeted-statement frame for the current query. The returned
346    /// guard must be held for the duration of the statement; on its drop the
347    /// reentrancy depth is decremented. Only the *outermost* statement entry
348    /// zeroes this thread's running total, so a nested `execute_powql` (the
349    /// source query of a `create_view`/`refresh_view`) does NOT discard the
350    /// outer frame's accounting. The accumulator is thread-local, so this never
351    /// touches another concurrent query's total.
352    #[must_use = "the budget guard must outlive the statement body"]
353    pub(super) fn enter_memory_budget(&self) -> mem_budget::EnterGuard {
354        mem_budget::enter()
355    }
356
357    /// Charge the estimated footprint of a freshly materialized batch of rows
358    /// against the current per-query budget. Returns
359    /// [`QueryError::MemoryLimitExceeded`] cleanly if the batch would push the
360    /// query over its limit. Used at every full-materialization point (sort
361    /// buffer, join build side, GROUP BY hash table, IN-list).
362    pub(super) fn charge_rows(&self, rows: &[Vec<Value>]) -> Result<(), QueryError> {
363        let mut total = 0usize;
364        for row in rows {
365            total = total.saturating_add(mem_budget::estimate_row_size(row));
366        }
367        mem_budget::charge(total, self.query_memory_limit)
368    }
369
370    /// Charge a materialized IN-list (the literal expressions pulled out of an
371    /// uncorrelated `IN (subquery)`) against the current per-query budget.
372    /// Each item is conservatively sized at the `Expr` slot plus, for string
373    /// literals, the owned heap bytes.
374    pub(super) fn charge_in_list(&self, list: &[crate::ast::Expr]) -> Result<(), QueryError> {
375        let base = std::mem::size_of::<crate::ast::Expr>();
376        let mut total = std::mem::size_of::<Vec<crate::ast::Expr>>();
377        for item in list {
378            total = total.saturating_add(base);
379            if let crate::ast::Expr::Literal(crate::ast::Literal::String(s)) = item {
380                total = total.saturating_add(s.capacity());
381            }
382        }
383        mem_budget::charge(total, self.query_memory_limit)
384    }
385
386    /// Parse + plan + execute a PowQL query.
387    ///
388    /// # Examples
389    ///
390    /// ```
391    /// use powdb_query::executor::Engine;
392    /// use powdb_query::result::QueryResult;
393    ///
394    /// let dir = tempfile::tempdir().unwrap();
395    /// let mut engine = Engine::new(dir.path()).unwrap();
396    ///
397    /// // Create a table and insert a row.
398    /// engine.execute_powql("type User { required name: str, age: int }").unwrap();
399    /// engine.execute_powql(r#"insert User { name := "Alice", age := 30 }"#).unwrap();
400    ///
401    /// // Query rows back.
402    /// let result = engine.execute_powql("User").unwrap();
403    /// assert_eq!(result.row_count(), 1);
404    /// ```
405    ///
406    /// Mission D6 — tracing collapse: the previous implementation ran 4
407    /// `Instant::now()` + 3 `elapsed().as_micros()` calls + formatted an
408    /// `info!` span on every query, even when tracing was disabled. On a
409    /// sub-microsecond `point_lookup_indexed` call that overhead was
410    /// 100-200ns — 20%+ of the whole query. We now measure time only when
411    /// INFO is actually enabled via `tracing::enabled!`, and we moved the
412    /// noisy `debug!(?plan)` line behind the same gate so the Debug
413    /// formatter can't run unconditionally either.
414    ///
415    /// Mission D9 — plan cache: on the hot path we canonicalise the query
416    /// text (lex + FNV-1a hash with literal values stripped), check the
417    /// cache, and on a hit substitute the new literals into a clone of the
418    /// cached plan. This skips re-lexing, re-parsing, and re-planning —
419    /// around 3μs per call on bench workloads. On a miss we plan as before
420    /// and insert the plan under its canonical hash.
421    pub fn execute_powql(&mut self, input: &str) -> Result<QueryResult, QueryError> {
422        // WS2: each *outermost* statement starts with the full memory
423        // allowance. The guard holds the reentrancy depth so a nested
424        // `execute_powql` (e.g. a view's source query) does not reset the
425        // outer frame's accounting mid-statement.
426        let _budget = self.enter_memory_budget();
427        // Hot path: tracing disabled. Zero syscalls, zero formatting.
428        if !tracing::enabled!(Level::INFO) {
429            // D9: try the plan cache first. Canonicalisation lexes the
430            // query once; on a hit we skip the parser and planner entirely.
431            if let Ok((hash, literals)) = canonicalize(input) {
432                let cached = self
433                    .plan_cache
434                    .lock()
435                    .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
436                    .get_with_substitution(hash, &literals);
437                if let Some(plan) = cached {
438                    let plan = lower_unindexed_range_scans(&self.catalog, &plan);
439                    let result = self.execute_plan(&plan);
440                    // Mission B (post-review): statement-boundary WAL
441                    // group commit. Catalog::wal_log now only appends;
442                    // the fsync happens here exactly once per statement.
443                    // `sync_wal` is a no-op when nothing was buffered
444                    // (pure reads pay zero fsync).
445                    if !self.in_transaction {
446                        self.catalog
447                            .sync_wal()
448                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
449                    }
450                    return result;
451                }
452                // Miss — plan, insert, execute.
453                return match planner::plan(input) {
454                    Ok(plan) => {
455                        self.plan_cache
456                            .lock()
457                            .map_err(|e| {
458                                QueryError::Execution(format!("plan cache lock poisoned: {e}"))
459                            })?
460                            .insert(hash, plan.clone());
461                        let plan = lower_unindexed_range_scans(&self.catalog, &plan);
462                        let result = self.execute_plan(&plan);
463                        if !self.in_transaction {
464                            self.catalog
465                                .sync_wal()
466                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
467                        }
468                        result
469                    }
470                    Err(e) => Err(QueryError::Parse(e.to_string())),
471                };
472            }
473            // Lex error — fall through to the planner so the caller gets a
474            // consistent error shape.
475            return match planner::plan(input) {
476                Ok(plan) => {
477                    let plan = lower_unindexed_range_scans(&self.catalog, &plan);
478                    let result = self.execute_plan(&plan);
479                    if !self.in_transaction {
480                        self.catalog
481                            .sync_wal()
482                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
483                    }
484                    result
485                }
486                Err(e) => Err(QueryError::Parse(e.to_string())),
487            };
488        }
489
490        // Instrumented path — only taken under explicit tracing subscribers.
491        let total_start = Instant::now();
492        let plan_start = Instant::now();
493        let plan = planner::plan(input).map_err(|e| {
494            let msg = e.to_string();
495            error!(query = %input, error = %msg, "query plan failed");
496            QueryError::Parse(msg)
497        })?;
498        let plan_us = plan_start.elapsed().as_micros();
499
500        let exec_start = Instant::now();
501        let plan = lower_unindexed_range_scans(&self.catalog, &plan);
502        let result = self.execute_plan(&plan);
503        if !self.in_transaction {
504            self.catalog
505                .sync_wal()
506                .map_err(|e| QueryError::StorageError(e.to_string()))?;
507        }
508        let exec_us = exec_start.elapsed().as_micros();
509
510        let total_us = total_start.elapsed().as_micros();
511        match &result {
512            Ok(r) => {
513                info!(
514                    query = %input,
515                    plan_us = plan_us,
516                    exec_us = exec_us,
517                    total_us = total_us,
518                    rows = r.row_count(),
519                    "query ok"
520                );
521            }
522            Err(e) => {
523                error!(
524                    query = %input,
525                    plan_us = plan_us,
526                    exec_us = exec_us,
527                    error = %e,
528                    "query failed"
529                );
530            }
531        }
532        result
533    }
534
535    /// Plan cache stats — useful for benches and debugging.
536    pub fn plan_cache_stats(&self) -> (u64, u64, usize) {
537        let cache = self.plan_cache.lock().unwrap_or_else(|e| e.into_inner());
538        (cache.hits, cache.misses, cache.len())
539    }
540
541    /// Mission infra-1: read-only entry point.
542    ///
543    /// Parses + plans + executes a PowQL query using only a shared borrow
544    /// on the engine. Rejects any statement that would mutate state
545    /// (Insert/Update/Delete/CreateTable/AlterTable/DropTable/CreateView/
546    /// RefreshView/DropView) by returning [`READONLY_NEEDS_WRITE`] so the
547    /// caller can escalate to the write lock.
548    ///
549    /// Also returns [`READONLY_NEEDS_WRITE`] if a materialized view in the
550    /// query is dirty — refreshing one requires `&mut self`, so the caller
551    /// must retake the write lock for the first refresh.
552    ///
553    /// This method is the concurrent-read fast path behind
554    /// `Arc<RwLock<Engine>>`: multiple threads can call it simultaneously
555    /// under a shared `.read()` lock and each will scan independently.
556    pub fn execute_powql_readonly(&self, input: &str) -> Result<QueryResult, QueryError> {
557        // WS2: each *outermost* statement starts with the full memory
558        // allowance. The guard holds the reentrancy depth so a nested
559        // `execute_powql*` does not reset the outer frame's accounting.
560        let _budget = self.enter_memory_budget();
561        // Parse the statement first so we can classify read vs. write
562        // without touching the catalog. This is the same lex+parse cost
563        // the hot path would pay anyway.
564        let stmt = crate::parser::parse(input).map_err(|e| QueryError::Parse(e.to_string()))?;
565        if !is_read_only_statement(&stmt) {
566            return Err(QueryError::ReadonlyNeedsWrite);
567        }
568
569        // Try the plan cache first — identical hash scheme to
570        // `execute_powql` so both paths share cache state. The mutex
571        // section is just a hashmap lookup + plan clone.
572        if let Ok((hash, literals)) = canonicalize(input) {
573            let cached = self
574                .plan_cache
575                .lock()
576                .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
577                .get_with_substitution(hash, &literals);
578            if let Some(plan) = cached {
579                let plan = lower_unindexed_range_scans(&self.catalog, &plan);
580                return self.execute_plan_readonly(&plan);
581            }
582            // Miss: plan + insert + execute. The planner is pure, so this
583            // is safe from `&self`.
584            let plan = crate::planner::plan_statement(stmt)
585                .map_err(|e| QueryError::Parse(e.to_string()))?;
586            self.plan_cache
587                .lock()
588                .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
589                .insert(hash, plan.clone());
590            let plan = lower_unindexed_range_scans(&self.catalog, &plan);
591            return self.execute_plan_readonly(&plan);
592        }
593        // Lex error — fall through to the planner for a consistent error
594        // shape (though `parse` above would usually have caught it).
595        let plan =
596            crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
597        let plan = lower_unindexed_range_scans(&self.catalog, &plan);
598        self.execute_plan_readonly(&plan)
599    }
600
601    /// Read-only version of [`Engine::execute_plan`]. Dispatches the
602    /// read-path plan variants by calling `&self` helpers and errors with
603    /// [`READONLY_NEEDS_WRITE`] on any write variant. This is the
604    /// recursion target for composite read plans under the RwLock reader.
605    ///
606    /// The dispatch mirrors `execute_plan` for the read branches but does
607    /// not carry any of the fast-paths that need `&mut self` (e.g. plan-
608    /// cache mutation on inner subqueries is handled via the shared mutex
609    /// in [`Engine::execute_powql_readonly`]; in-flight subquery
610    /// materialisation uses [`Engine::materialize_subqueries_readonly`]).
611    fn execute_plan_readonly(&self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
612        match plan {
613            PlanNode::SeqScan { table } => {
614                // Dirty view means we'd need to refresh it — can't do that
615                // under `&self`. Escalate to the write path.
616                if self.view_registry.is_dirty(table) {
617                    return Err(QueryError::ReadonlyNeedsWrite);
618                }
619                let schema = self
620                    .catalog
621                    .schema(table)
622                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
623                    .clone();
624                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
625                let rows: Vec<Vec<Value>> = self
626                    .catalog
627                    .scan(table)
628                    .map_err(|e| e.to_string())?
629                    .map(|(_, row)| row)
630                    .collect();
631                Ok(QueryResult::Rows { columns, rows })
632            }
633
634            PlanNode::AliasScan { table, alias } => {
635                let schema = self
636                    .catalog
637                    .schema(table)
638                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
639                    .clone();
640                let columns: Vec<String> = schema
641                    .columns
642                    .iter()
643                    .map(|c| format!("{alias}.{}", c.name))
644                    .collect();
645                let rows: Vec<Vec<Value>> = self
646                    .catalog
647                    .scan(table)
648                    .map_err(|e| e.to_string())?
649                    .map(|(_, row)| row)
650                    .collect();
651                Ok(QueryResult::Rows { columns, rows })
652            }
653
654            PlanNode::IndexScan { table, column, key } => {
655                let schema = self
656                    .catalog
657                    .schema(table)
658                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
659                    .clone();
660                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
661                let key_value = literal_to_value(key)?;
662                let tbl = self
663                    .catalog
664                    .get_table(table)
665                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
666
667                if tbl.has_index(column) {
668                    // Use index_lookup_all to handle both unique and
669                    // non-unique indexes — returns all matching RowIds.
670                    let rids = tbl.index_lookup_all(column, &key_value);
671                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
672                    for rid in rids {
673                        if let Some(data) = tbl.heap.get(rid) {
674                            rows.push(decode_row(&tbl.schema, &data));
675                        }
676                    }
677                    return Ok(QueryResult::Rows { columns, rows });
678                }
679
680                // No index: synthetic eq predicate + compiled scan.
681                let fast = FastLayout::new(&schema);
682                let synth_pred = Expr::BinaryOp(
683                    Box::new(Expr::Field(column.clone())),
684                    BinOp::Eq,
685                    Box::new(key.clone()),
686                );
687                if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, &schema) {
688                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
689                    self.catalog
690                        .for_each_row_raw(table, |_rid, data| {
691                            if compiled(data) {
692                                rows.push(decode_row(&schema, data));
693                            }
694                        })
695                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
696                    return Ok(QueryResult::Rows { columns, rows });
697                }
698
699                // Last resort: slow eq-check.
700                let col_idx =
701                    schema
702                        .column_index(column)
703                        .ok_or_else(|| QueryError::ColumnNotFound {
704                            table: String::new(),
705                            column: column.clone(),
706                        })?;
707                let rows: Vec<Vec<Value>> = tbl
708                    .scan()
709                    .filter_map(|(_, row)| {
710                        if row[col_idx] == key_value {
711                            Some(row)
712                        } else {
713                            None
714                        }
715                    })
716                    .collect();
717                Ok(QueryResult::Rows { columns, rows })
718            }
719
720            PlanNode::RangeScan {
721                table,
722                column,
723                start,
724                end,
725            } => {
726                let tbl = self
727                    .catalog
728                    .get_table(table)
729                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
730                let columns: Vec<String> =
731                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
732                let schema = tbl.schema.clone();
733
734                let start_val = match start {
735                    Some((expr, _)) => Some(literal_to_value(expr)?),
736                    None => None,
737                };
738                let end_val = match end {
739                    Some((expr, _)) => Some(literal_to_value(expr)?),
740                    None => None,
741                };
742                let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
743                let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
744
745                // Range scans only use the btree fast path for unique indexes.
746                // Non-unique indexes store composite keys that don't compare
747                // directly against raw column values.
748                if tbl.is_index_unique(column) == Some(true) {
749                    if let Some(btree) = tbl.index(column) {
750                        let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
751                            (Some(s), Some(e)) => btree.range(s, e).collect(),
752                            (Some(s), None) => btree.range_from(s),
753                            (None, Some(e)) => btree.range_to(e),
754                            (None, None) => {
755                                // Unbounded both sides — equivalent to seq scan.
756                                let rows: Vec<Vec<Value>> =
757                                    tbl.scan().map(|(_, row)| row).collect();
758                                return Ok(QueryResult::Rows { columns, rows });
759                            }
760                        };
761                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
762                        for (key, rid) in hits {
763                            // Filter for exclusive bounds.
764                            if !start_inclusive {
765                                if let Some(ref s) = start_val {
766                                    if &key == s {
767                                        continue;
768                                    }
769                                }
770                            }
771                            if !end_inclusive {
772                                if let Some(ref e) = end_val {
773                                    if &key == e {
774                                        continue;
775                                    }
776                                }
777                            }
778                            if let Some(data) = tbl.heap.get(rid) {
779                                rows.push(decode_row(&schema, &data));
780                            }
781                        }
782                        return Ok(QueryResult::Rows { columns, rows });
783                    }
784                }
785
786                // Fallback: no index — synthesize the range predicate and scan.
787                let fast = FastLayout::new(&schema);
788                let synth = synthesize_range_predicate(column, start, end);
789                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, &schema) {
790                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
791                    self.catalog
792                        .for_each_row_raw(table, |_rid, data| {
793                            if compiled(data) {
794                                rows.push(decode_row(&schema, data));
795                            }
796                        })
797                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
798                    return Ok(QueryResult::Rows { columns, rows });
799                }
800
801                // Last resort: decoded row eval.
802                let col_idx =
803                    schema
804                        .column_index(column)
805                        .ok_or_else(|| QueryError::ColumnNotFound {
806                            table: String::new(),
807                            column: column.clone(),
808                        })?;
809                let rows: Vec<Vec<Value>> = tbl
810                    .scan()
811                    .filter(|(_, row)| {
812                        range_matches(
813                            &row[col_idx],
814                            &start_val,
815                            start_inclusive,
816                            &end_val,
817                            end_inclusive,
818                        )
819                    })
820                    .map(|(_, row)| row)
821                    .collect();
822                Ok(QueryResult::Rows { columns, rows })
823            }
824
            PlanNode::Filter { input, predicate } => {
                // Keep only the input rows for which `predicate` holds.
                //
                // Materialise subqueries using the `&self` variant.
                // Uncorrelated subqueries are replaced with InList/Bool;
                // correlated ones are left as InSubquery/ExistsSubquery
                // for per-row materialisation below.
                // `materialized` is declared ahead of the `if` so the rewritten
                // expression outlives it and `predicate` can borrow either the
                // rewritten or the original expression.
                let materialized;
                let predicate = if contains_subquery(predicate) {
                    materialized = self.materialize_subqueries_readonly(predicate)?;
                    &materialized
                } else {
                    predicate
                };

                // Correlated subquery path: per-row materialisation.
                // Anything `contains_subquery` still reports after the rewrite
                // above is re-materialised against each row of the fully
                // executed input, then evaluated on that row.
                if contains_subquery(predicate) {
                    let result = self.execute_plan_readonly(input)?;
                    return match result {
                        QueryResult::Rows { columns, rows } => {
                            let mut filtered = Vec::new();
                            for row in rows {
                                let row_pred = self.materialize_correlated_for_row_readonly(
                                    predicate, &row, &columns,
                                )?;
                                if eval_predicate(&row_pred, &row, &columns) {
                                    filtered.push(row);
                                }
                            }
                            Ok(QueryResult::Rows {
                                columns,
                                rows: filtered,
                            })
                        }
                        _ => Err("filter requires row input".into()),
                    };
                }

                // Fused Filter+SeqScan fast path.
                // Walks the table's raw row bytes and fully decodes only rows
                // that pass. Uses a compiled predicate when `compile_predicate`
                // accepts one; otherwise decodes the columns reported by
                // `predicate_column_indices` and evaluates the expression.
                if let PlanNode::SeqScan { table } = input.as_ref() {
                    // A dirty view cannot be served by the read-only path;
                    // signal that a write-capable execution is required.
                    if self.view_registry.is_dirty(table) {
                        return Err(QueryError::ReadonlyNeedsWrite);
                    }
                    let schema = self
                        .catalog
                        .schema(table)
                        .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
                        .clone();
                    let columns: Vec<String> =
                        schema.columns.iter().map(|c| c.name.clone()).collect();
                    let fast = FastLayout::new(&schema);
                    let row_layout = RowLayout::new(&schema);
                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);

                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
                        self.catalog
                            .for_each_row_raw(table, |_rid, data| {
                                if compiled(data) {
                                    rows.push(decode_row(&schema, data));
                                }
                            })
                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
                    } else {
                        // Interpreted fallback: partial decode for the
                        // predicate, full decode only for matching rows.
                        let pred_cols = predicate_column_indices(predicate, &columns);
                        self.catalog
                            .for_each_row_raw(table, |_rid, data| {
                                let pred_row =
                                    decode_selective(&schema, &row_layout, data, &pred_cols);
                                if eval_predicate(predicate, &pred_row, &columns) {
                                    rows.push(decode_row(&schema, data));
                                }
                            })
                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
                    }

                    return Ok(QueryResult::Rows { columns, rows });
                }

                // General path.
                // Any other input: execute it, then evaluate the predicate on
                // each already-decoded row.
                let result = self.execute_plan_readonly(input)?;
                match result {
                    QueryResult::Rows { columns, rows } => {
                        let filtered: Vec<Vec<Value>> = rows
                            .into_iter()
                            .filter(|row| eval_predicate(predicate, row, &columns))
                            .collect();
                        Ok(QueryResult::Rows {
                            columns,
                            rows: filtered,
                        })
                    }
                    _ => Err("filter requires row input".into()),
                }
            }
917
918            PlanNode::Project { input, fields } => {
919                // Fast path: Project over IndexScan. Avoids full-row decode
920                // by calling decode_column only for projected fields.
921                if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
922                    let key_value = literal_to_value(key)?;
923                    let tbl = self
924                        .catalog
925                        .get_table(table)
926                        .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
927                    let schema = &tbl.schema;
928                    let layout = tbl.row_layout();
929
930                    let proj_columns: Vec<String> = fields
931                        .iter()
932                        .map(|f| {
933                            f.alias.clone().unwrap_or_else(|| match &f.expr {
934                                Expr::Field(name) => name.clone(),
935                                _ => "?".into(),
936                            })
937                        })
938                        .collect();
939
940                    let proj_indices: Vec<usize> = fields
941                        .iter()
942                        .filter_map(|f| {
943                            if let Expr::Field(name) = &f.expr {
944                                schema.column_index(name)
945                            } else {
946                                None
947                            }
948                        })
949                        .collect();
950
951                    if tbl.has_index(column) {
952                        let rids = tbl.index_lookup_all(column, &key_value);
953                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
954                        for rid in rids {
955                            if let Some(data) = tbl.heap.get(rid) {
956                                let row: Vec<Value> = proj_indices
957                                    .iter()
958                                    .map(|&ci| decode_column(schema, layout, &data, ci))
959                                    .collect();
960                                rows.push(row);
961                            }
962                        }
963                        return Ok(QueryResult::Rows {
964                            columns: proj_columns,
965                            rows,
966                        });
967                    }
968                }
969
970                // Fast paths over Limit(Sort(...)) / Limit(Filter(...)) / Limit(SeqScan).
971                if let PlanNode::Limit {
972                    input: inner,
973                    count: limit_expr,
974                } = input.as_ref()
975                {
976                    if let PlanNode::Sort {
977                        input: sort_input,
978                        keys,
979                    } = inner.as_ref()
980                    {
981                        if keys.len() == 1 {
982                            let sort_field = &keys[0].field;
983                            let descending = keys[0].descending;
984                            let limit = match limit_expr {
985                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
986                                _ => usize::MAX,
987                            };
988                            let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
989                                match sort_input.as_ref() {
990                                    PlanNode::SeqScan { table } => (Some(table.as_str()), None),
991                                    PlanNode::Filter {
992                                        input: fi,
993                                        predicate,
994                                    } => {
995                                        if let PlanNode::SeqScan { table } = fi.as_ref() {
996                                            (Some(table.as_str()), Some(predicate))
997                                        } else {
998                                            (None, None)
999                                        }
1000                                    }
1001                                    _ => (None, None),
1002                                };
1003                            if let Some(table) = table_opt {
1004                                if let Some(result) = self.project_filter_sort_limit_fast(
1005                                    table, fields, sort_field, descending, limit, pred_opt,
1006                                )? {
1007                                    return Ok(result);
1008                                }
1009                            }
1010                        }
1011                    }
1012                    if let PlanNode::Filter {
1013                        input: fi,
1014                        predicate,
1015                    } = inner.as_ref()
1016                    {
1017                        if let PlanNode::SeqScan { table } = fi.as_ref() {
1018                            let limit = match limit_expr {
1019                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1020                                _ => usize::MAX,
1021                            };
1022                            if let Some(result) = self.project_filter_limit_fast(
1023                                table,
1024                                fields,
1025                                limit,
1026                                Some(predicate),
1027                            )? {
1028                                return Ok(result);
1029                            }
1030                        }
1031                    }
1032                    if let PlanNode::SeqScan { table } = inner.as_ref() {
1033                        let limit = match limit_expr {
1034                            Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1035                            _ => usize::MAX,
1036                        };
1037                        if let Some(result) =
1038                            self.project_filter_limit_fast(table, fields, limit, None)?
1039                        {
1040                            return Ok(result);
1041                        }
1042                    }
1043                }
1044
1045                // Project(Filter(SeqScan)) without Limit.
1046                if let PlanNode::Filter {
1047                    input: fi,
1048                    predicate,
1049                } = input.as_ref()
1050                {
1051                    if let PlanNode::SeqScan { table } = fi.as_ref() {
1052                        if let Some(result) = self.project_filter_limit_fast(
1053                            table,
1054                            fields,
1055                            usize::MAX,
1056                            Some(predicate),
1057                        )? {
1058                            return Ok(result);
1059                        }
1060                    }
1061                }
1062
1063                // Project(SeqScan) without Filter or Limit.
1064                if let PlanNode::SeqScan { table } = input.as_ref() {
1065                    if let Some(result) =
1066                        self.project_filter_limit_fast(table, fields, usize::MAX, None)?
1067                    {
1068                        return Ok(result);
1069                    }
1070                }
1071
1072                // Generic path.
1073                let result = self.execute_plan_readonly(input)?;
1074                match result {
1075                    QueryResult::Rows { columns, rows } => {
1076                        let proj_columns: Vec<String> = fields
1077                            .iter()
1078                            .map(|f| {
1079                                f.alias.clone().unwrap_or_else(|| match &f.expr {
1080                                    Expr::Field(name) => name.clone(),
1081                                    Expr::QualifiedField { qualifier, field } => {
1082                                        format!("{qualifier}.{field}")
1083                                    }
1084                                    _ => "?".into(),
1085                                })
1086                            })
1087                            .collect();
1088                        let proj_rows: Vec<Vec<Value>> = rows
1089                            .iter()
1090                            .map(|row| {
1091                                fields
1092                                    .iter()
1093                                    .map(|f| eval_expr(&f.expr, row, &columns))
1094                                    .collect()
1095                            })
1096                            .collect();
1097                        Ok(QueryResult::Rows {
1098                            columns: proj_columns,
1099                            rows: proj_rows,
1100                        })
1101                    }
1102                    _ => Err("project requires row input".into()),
1103                }
1104            }
1105
1106            PlanNode::Sort { input, keys } => {
1107                let result = self.execute_plan_readonly(input)?;
1108                match result {
1109                    QueryResult::Rows { columns, mut rows } => {
1110                        if rows.len() > MAX_SORT_ROWS {
1111                            return Err(QueryError::SortLimitExceeded);
1112                        }
1113                        // WS2: byte-budget guard on the sort buffer.
1114                        self.charge_rows(&rows)?;
1115                        let key_indices: Vec<(usize, bool)> = keys
1116                            .iter()
1117                            .map(|k| {
1118                                columns
1119                                    .iter()
1120                                    .position(|c| c == &k.field)
1121                                    .map(|idx| (idx, k.descending))
1122                                    .ok_or_else(|| QueryError::ColumnNotFound {
1123                                        table: String::new(),
1124                                        column: k.field.clone(),
1125                                    })
1126                            })
1127                            .collect::<Result<_, QueryError>>()?;
1128                        rows.sort_by(|a, b| {
1129                            for &(col_idx, descending) in &key_indices {
1130                                let cmp = a[col_idx].cmp(&b[col_idx]);
1131                                let cmp = if descending { cmp.reverse() } else { cmp };
1132                                if cmp != std::cmp::Ordering::Equal {
1133                                    return cmp;
1134                                }
1135                            }
1136                            std::cmp::Ordering::Equal
1137                        });
1138                        Ok(QueryResult::Rows { columns, rows })
1139                    }
1140                    _ => Err("sort requires row input".into()),
1141                }
1142            }
1143
1144            PlanNode::Limit { input, count } => {
1145                let result = self.execute_plan_readonly(input)?;
1146                let n = match count {
1147                    Expr::Literal(Literal::Int(v)) => *v as usize,
1148                    _ => return Err("limit must be integer literal".into()),
1149                };
1150                match result {
1151                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1152                        columns,
1153                        rows: rows.into_iter().take(n).collect(),
1154                    }),
1155                    _ => Err("limit requires row input".into()),
1156                }
1157            }
1158
1159            PlanNode::Offset { input, count } => {
1160                let result = self.execute_plan_readonly(input)?;
1161                let n = match count {
1162                    Expr::Literal(Literal::Int(v)) => *v as usize,
1163                    _ => return Err("offset must be integer literal".into()),
1164                };
1165                match result {
1166                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1167                        columns,
1168                        rows: rows.into_iter().skip(n).collect(),
1169                    }),
1170                    _ => Err("offset requires row input".into()),
1171                }
1172            }
1173
1174            PlanNode::Aggregate {
1175                input,
1176                function,
1177                field,
1178            } => {
1179                // Fast path: count() over SeqScan.
1180                if *function == AggFunc::Count {
1181                    if let PlanNode::SeqScan { table } = input.as_ref() {
1182                        let mut count: i64 = 0;
1183                        self.catalog
1184                            .for_each_row_raw(table, |_rid, _data| {
1185                                count += 1;
1186                            })
1187                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1188                        return Ok(QueryResult::Scalar(Value::Int(count)));
1189                    }
1190                    if let PlanNode::Filter {
1191                        input: inner,
1192                        predicate,
1193                    } = input.as_ref()
1194                    {
1195                        if let PlanNode::SeqScan { table } = inner.as_ref() {
1196                            let schema = self
1197                                .catalog
1198                                .schema(table)
1199                                .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
1200                                .clone();
1201                            let columns: Vec<String> =
1202                                schema.columns.iter().map(|c| c.name.clone()).collect();
1203                            let fast = FastLayout::new(&schema);
1204                            let row_layout = RowLayout::new(&schema);
1205
1206                            if let Some(compiled) =
1207                                compile_predicate(predicate, &columns, &fast, &schema)
1208                            {
1209                                let mut count: i64 = 0;
1210                                self.catalog
1211                                    .for_each_row_raw(table, |_rid, data| {
1212                                        if compiled(data) {
1213                                            count += 1;
1214                                        }
1215                                    })
1216                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1217                                return Ok(QueryResult::Scalar(Value::Int(count)));
1218                            }
1219
1220                            let pred_cols = predicate_column_indices(predicate, &columns);
1221                            let mut count: i64 = 0;
1222                            self.catalog
1223                                .for_each_row_raw(table, |_rid, data| {
1224                                    let pred_row =
1225                                        decode_selective(&schema, &row_layout, data, &pred_cols);
1226                                    if eval_predicate(predicate, &pred_row, &columns) {
1227                                        count += 1;
1228                                    }
1229                                })
1230                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1231                            return Ok(QueryResult::Scalar(Value::Int(count)));
1232                        }
1233                    }
1234                }
1235
1236                // Fast path: sum/avg/min/max over single fixed-size numeric.
1237                if matches!(
1238                    function,
1239                    AggFunc::Sum
1240                        | AggFunc::Avg
1241                        | AggFunc::Min
1242                        | AggFunc::Max
1243                        | AggFunc::CountDistinct
1244                ) {
1245                    if let Some(col) = field.as_ref() {
1246                        let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1247                            match input.as_ref() {
1248                                PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1249                                PlanNode::Filter {
1250                                    input: inner,
1251                                    predicate,
1252                                } => {
1253                                    if let PlanNode::SeqScan { table } = inner.as_ref() {
1254                                        (Some(table.as_str()), Some(predicate))
1255                                    } else {
1256                                        (None, None)
1257                                    }
1258                                }
1259                                _ => (None, None),
1260                            };
1261                        if let Some(table) = table_opt {
1262                            if let Some(result) =
1263                                self.agg_single_col_fast(table, col, *function, pred_opt)?
1264                            {
1265                                return Ok(result);
1266                            }
1267                        }
1268                    }
1269                }
1270
1271                // Generic path.
1272                let result = self.execute_plan_readonly(input)?;
1273                match result {
1274                    QueryResult::Rows { columns, rows } => match function {
1275                        AggFunc::Count => Ok(QueryResult::Scalar(Value::Int(rows.len() as i64))),
1276                        AggFunc::CountDistinct => {
1277                            let col = field.as_ref().ok_or("count distinct requires field")?;
1278                            let idx = columns
1279                                .iter()
1280                                .position(|c| c == col)
1281                                .ok_or("col not found")?;
1282                            let mut seen = std::collections::HashSet::new();
1283                            for row in &rows {
1284                                let v = &row[idx];
1285                                if !v.is_empty() {
1286                                    seen.insert(v.clone());
1287                                }
1288                            }
1289                            Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
1290                        }
1291                        AggFunc::Avg => {
1292                            let col = field.as_ref().ok_or("avg requires field")?;
1293                            let idx = columns
1294                                .iter()
1295                                .position(|c| c == col)
1296                                .ok_or("col not found")?;
1297                            let sum: f64 = rows
1298                                .iter()
1299                                .filter_map(|r| match &r[idx] {
1300                                    Value::Int(v) => Some(*v as f64),
1301                                    Value::Float(v) => Some(*v),
1302                                    _ => None,
1303                                })
1304                                .sum();
1305                            let count = rows.len() as f64;
1306                            Ok(QueryResult::Scalar(Value::Float(sum / count)))
1307                        }
1308                        AggFunc::Sum => {
1309                            let col = field.as_ref().ok_or("sum requires field")?;
1310                            let idx = columns
1311                                .iter()
1312                                .position(|c| c == col)
1313                                .ok_or("col not found")?;
1314                            let mut int_sum: i64 = 0;
1315                            let mut float_sum: f64 = 0.0;
1316                            let mut saw_float = false;
1317                            for r in &rows {
1318                                match &r[idx] {
1319                                    Value::Int(v) => int_sum += *v,
1320                                    Value::Float(v) => {
1321                                        float_sum += *v;
1322                                        saw_float = true;
1323                                    }
1324                                    _ => {}
1325                                }
1326                            }
1327                            let result = if saw_float {
1328                                Value::Float(float_sum + int_sum as f64)
1329                            } else {
1330                                Value::Int(int_sum)
1331                            };
1332                            Ok(QueryResult::Scalar(result))
1333                        }
1334                        AggFunc::Min | AggFunc::Max => {
1335                            let col = field.as_ref().ok_or("min/max requires field")?;
1336                            let idx = columns
1337                                .iter()
1338                                .position(|c| c == col)
1339                                .ok_or("col not found")?;
1340                            let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
1341                            let result = if *function == AggFunc::Min {
1342                                vals.into_iter().min().cloned()
1343                            } else {
1344                                vals.into_iter().max().cloned()
1345                            };
1346                            Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
1347                        }
1348                    },
1349                    _ => Err("aggregate requires row input".into()),
1350                }
1351            }
1352
1353            PlanNode::Distinct { input } => {
1354                let result = self.execute_plan_readonly(input)?;
1355                match result {
1356                    QueryResult::Rows { columns, rows } => {
1357                        let mut seen = std::collections::HashSet::new();
1358                        let mut unique_rows = Vec::new();
1359                        for row in rows {
1360                            if seen.insert(row.clone()) {
1361                                unique_rows.push(row);
1362                            }
1363                        }
1364                        Ok(QueryResult::Rows {
1365                            columns,
1366                            rows: unique_rows,
1367                        })
1368                    }
1369                    other => Ok(other),
1370                }
1371            }
1372
1373            PlanNode::GroupBy {
1374                input,
1375                keys,
1376                aggregates,
1377                having,
1378            } => {
1379                let result = self.execute_plan_readonly(input)?;
1380                match result {
1381                    QueryResult::Rows { columns, rows } => {
1382                        // WS2: byte-budget guard on the GROUP BY input buffer
1383                        // (the hash table is bounded by the input it groups).
1384                        self.charge_rows(&rows)?;
1385                        let key_indices: Vec<usize> = keys
1386                            .iter()
1387                            .map(|k| {
1388                                columns.iter().position(|c| c == k).ok_or_else(|| {
1389                                    QueryError::ColumnNotFound {
1390                                        table: String::new(),
1391                                        column: k.clone(),
1392                                    }
1393                                })
1394                            })
1395                            .collect::<Result<Vec<_>, _>>()?;
1396
1397                        let agg_field_indices: Vec<usize> = aggregates
1398                            .iter()
1399                            .map(|a| {
1400                                if a.field == "*" {
1401                                    Ok(usize::MAX)
1402                                } else {
1403                                    columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1404                                        QueryError::ColumnNotFound {
1405                                            table: String::new(),
1406                                            column: a.field.clone(),
1407                                        }
1408                                    })
1409                                }
1410                            })
1411                            .collect::<Result<Vec<_>, _>>()?;
1412
1413                        let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1414                            rustc_hash::FxHashMap::default();
1415                        let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1416                        for (ri, row) in rows.iter().enumerate() {
1417                            let key: Vec<Value> =
1418                                key_indices.iter().map(|&i| row[i].clone()).collect();
1419                            match group_map.get(&key) {
1420                                Some(&idx) => groups[idx].1.push(ri),
1421                                None => {
1422                                    let idx = groups.len();
1423                                    group_map.insert(key.clone(), idx);
1424                                    groups.push((key, vec![ri]));
1425                                }
1426                            }
1427                        }
1428
1429                        let mut out_columns: Vec<String> = keys.clone();
1430                        for agg in aggregates.iter() {
1431                            out_columns.push(agg.output_name.clone());
1432                        }
1433
1434                        let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1435                        for (key_vals, row_indices) in &groups {
1436                            let mut row = key_vals.clone();
1437                            for (ai, agg) in aggregates.iter().enumerate() {
1438                                let col_idx = agg_field_indices[ai];
1439                                let val = compute_group_aggregate(
1440                                    agg.function,
1441                                    &rows,
1442                                    row_indices,
1443                                    col_idx,
1444                                );
1445                                row.push(val);
1446                            }
1447                            out_rows.push(row);
1448                        }
1449
1450                        if let Some(having_expr) = having {
1451                            out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1452                        }
1453
1454                        Ok(QueryResult::Rows {
1455                            columns: out_columns,
1456                            rows: out_rows,
1457                        })
1458                    }
1459                    _ => Err("group by requires row input".into()),
1460                }
1461            }
1462
1463            PlanNode::NestedLoopJoin {
1464                left,
1465                right,
1466                on,
1467                kind,
1468            } => {
1469                let left_result = self.execute_plan_readonly(left)?;
1470                let right_result = self.execute_plan_readonly(right)?;
1471                let (left_columns, left_rows) = match left_result {
1472                    QueryResult::Rows { columns, rows } => (columns, rows),
1473                    _ => return Err("join left side must produce rows".into()),
1474                };
1475                let (right_columns, right_rows) = match right_result {
1476                    QueryResult::Rows { columns, rows } => (columns, rows),
1477                    _ => return Err("join right side must produce rows".into()),
1478                };
1479
1480                // WS2: byte-budget guard on the join build side.
1481                self.charge_rows(&left_rows)?;
1482                self.charge_rows(&right_rows)?;
1483
1484                if !matches!(kind, JoinKind::Cross) {
1485                    if let Some(pred) = on {
1486                        if let Some((l_idx, r_idx)) =
1487                            try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1488                        {
1489                            let result = hash_join(
1490                                left_columns,
1491                                left_rows,
1492                                right_columns,
1493                                right_rows,
1494                                l_idx,
1495                                r_idx,
1496                                *kind,
1497                            );
1498                            if let QueryResult::Rows { ref rows, .. } = result {
1499                                check_join_limit(rows.len())?;
1500                            }
1501                            return Ok(result);
1502                        }
1503                    }
1504                }
1505
1506                let n_left = left_columns.len();
1507                let n_right = right_columns.len();
1508                let mut columns = Vec::with_capacity(n_left + n_right);
1509                columns.extend(left_columns);
1510                columns.extend(right_columns);
1511
1512                let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1513                let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1514
1515                for left_row in &left_rows {
1516                    let mut matched = false;
1517                    for right_row in &right_rows {
1518                        combined.clear();
1519                        combined.extend_from_slice(left_row);
1520                        combined.extend_from_slice(right_row);
1521                        let keep = match kind {
1522                            JoinKind::Cross => true,
1523                            JoinKind::Inner | JoinKind::LeftOuter => match on {
1524                                Some(pred) => eval_predicate(pred, &combined, &columns),
1525                                None => true,
1526                            },
1527                            JoinKind::RightOuter => {
1528                                unreachable!("planner rewrites RightOuter to LeftOuter")
1529                            }
1530                        };
1531                        if keep {
1532                            rows.push(combined.clone());
1533                            check_join_limit(rows.len())?;
1534                            matched = true;
1535                        }
1536                    }
1537                    if !matched && matches!(kind, JoinKind::LeftOuter) {
1538                        let mut row = Vec::with_capacity(n_left + n_right);
1539                        row.extend_from_slice(left_row);
1540                        row.resize(n_left + n_right, Value::Empty);
1541                        rows.push(row);
1542                        check_join_limit(rows.len())?;
1543                    }
1544                }
1545
1546                Ok(QueryResult::Rows { columns, rows })
1547            }
1548
1549            PlanNode::Window { input, windows } => {
1550                let result = self.execute_plan_readonly(input)?;
1551                execute_window(result, windows)
1552            }
1553
1554            PlanNode::Union { left, right, all } => {
1555                let left_result = self.execute_plan_readonly(left)?;
1556                let right_result = self.execute_plan_readonly(right)?;
1557                let (left_cols, left_rows) = match left_result {
1558                    QueryResult::Rows { columns, rows } => (columns, rows),
1559                    _ => return Err("UNION requires query results on left side".into()),
1560                };
1561                let (_, right_rows) = match right_result {
1562                    QueryResult::Rows { columns, rows } => (columns, rows),
1563                    _ => return Err("UNION requires query results on right side".into()),
1564                };
1565                let mut combined = left_rows;
1566                if *all {
1567                    combined.extend(right_rows);
1568                } else {
1569                    let mut seen = std::collections::HashSet::new();
1570                    for row in &combined {
1571                        seen.insert(row.clone());
1572                    }
1573                    for row in right_rows {
1574                        if seen.insert(row.clone()) {
1575                            combined.push(row);
1576                        }
1577                    }
1578                }
1579                Ok(QueryResult::Rows {
1580                    columns: left_cols,
1581                    rows: combined,
1582                })
1583            }
1584
1585            PlanNode::Explain { input } => {
1586                let text = format_plan_tree(input, 0);
1587                Ok(QueryResult::Rows {
1588                    columns: vec!["plan".to_string()],
1589                    rows: text
1590                        .lines()
1591                        .map(|line| vec![Value::Str(line.to_string())])
1592                        .collect(),
1593                })
1594            }
1595
1596            // All write variants — caller must escalate to the write lock.
1597            PlanNode::Insert { .. }
1598            | PlanNode::Update { .. }
1599            | PlanNode::Delete { .. }
1600            | PlanNode::Upsert { .. }
1601            | PlanNode::CreateTable { .. }
1602            | PlanNode::AlterTable { .. }
1603            | PlanNode::DropTable { .. }
1604            | PlanNode::CreateView { .. }
1605            | PlanNode::RefreshView { .. }
1606            | PlanNode::DropView { .. }
1607            | PlanNode::Begin
1608            | PlanNode::Commit
1609            | PlanNode::Rollback => Err(QueryError::ReadonlyNeedsWrite),
1610        }
1611    }
1612
1613    /// `&self` variant of [`Engine::materialize_subqueries`]. Used by the
1614    /// read path so `Filter` predicates with `InSubquery`/`ExistsSubquery`
1615    /// children can evaluate their inner queries without taking the write
1616    /// lock. Inner queries that would themselves need a write (e.g. dirty
1617    /// view) escalate via [`READONLY_NEEDS_WRITE`] just like the top-level
1618    /// read path does.
1619    fn materialize_subqueries_readonly(&self, expr: &Expr) -> Result<Expr, QueryError> {
1620        match expr {
1621            Expr::InSubquery {
1622                expr: inner,
1623                subquery,
1624                negated,
1625            } => {
1626                if is_correlated_subquery(subquery, &self.catalog) {
1627                    // Pass through — will be materialized per-row in the
1628                    // Filter handler's correlated subquery path.
1629                    let inner = self.materialize_subqueries_readonly(inner)?;
1630                    return Ok(Expr::InSubquery {
1631                        expr: Box::new(inner),
1632                        subquery: subquery.clone(),
1633                        negated: *negated,
1634                    });
1635                }
1636                let inner = self.materialize_subqueries_readonly(inner)?;
1637                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1638                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1639                let result = self.execute_plan_readonly(&sub_plan)?;
1640                let values = match result {
1641                    QueryResult::Rows { rows, .. } => rows
1642                        .into_iter()
1643                        .filter_map(|mut row| {
1644                            if row.is_empty() {
1645                                None
1646                            } else {
1647                                Some(value_to_expr(row.swap_remove(0)))
1648                            }
1649                        })
1650                        .collect(),
1651                    _ => Vec::new(),
1652                };
1653                // WS2: byte-budget guard on the materialized IN-list.
1654                self.charge_in_list(&values)?;
1655                Ok(Expr::InList {
1656                    expr: Box::new(inner),
1657                    list: values,
1658                    negated: *negated,
1659                })
1660            }
1661            Expr::ExistsSubquery { subquery, negated } => {
1662                if is_correlated_subquery(subquery, &self.catalog) {
1663                    return Ok(expr.clone());
1664                }
1665                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1666                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1667                let result = self.execute_plan_readonly(&sub_plan)?;
1668                let has_rows = match result {
1669                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
1670                    _ => false,
1671                };
1672                let truth = if *negated { !has_rows } else { has_rows };
1673                Ok(Expr::Literal(Literal::Bool(truth)))
1674            }
1675            Expr::BinaryOp(l, op, r) => {
1676                let l = self.materialize_subqueries_readonly(l)?;
1677                let r = self.materialize_subqueries_readonly(r)?;
1678                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1679            }
1680            Expr::UnaryOp(op, inner) => {
1681                let inner = self.materialize_subqueries_readonly(inner)?;
1682                Ok(Expr::UnaryOp(*op, Box::new(inner)))
1683            }
1684            Expr::Case { whens, else_expr } => {
1685                let whens = whens
1686                    .iter()
1687                    .map(|(c, r)| {
1688                        let c = self.materialize_subqueries_readonly(c)?;
1689                        let r = self.materialize_subqueries_readonly(r)?;
1690                        Ok((Box::new(c), Box::new(r)))
1691                    })
1692                    .collect::<Result<Vec<_>, QueryError>>()?;
1693                let else_expr = match else_expr {
1694                    Some(e) => Some(Box::new(self.materialize_subqueries_readonly(e)?)),
1695                    None => None,
1696                };
1697                Ok(Expr::Case { whens, else_expr })
1698            }
1699            other => Ok(other.clone()),
1700        }
1701    }
1702
1703    /// Per-row materialisation of correlated subqueries. For each row in the
1704    /// outer query, substitute outer column references in the subquery's
1705    /// filter with the current row's literal values, execute the modified
1706    /// subquery, and return the result as an InList or Bool literal.
1707    fn materialize_correlated_for_row_readonly(
1708        &self,
1709        expr: &Expr,
1710        outer_row: &[Value],
1711        outer_columns: &[String],
1712    ) -> Result<Expr, QueryError> {
1713        match expr {
1714            Expr::InSubquery {
1715                expr: inner,
1716                subquery,
1717                negated,
1718            } => {
1719                let inner =
1720                    self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1721                let mut sub = *subquery.clone();
1722                if let Some(ref filter) = sub.filter {
1723                    sub.filter = Some(substitute_outer_refs(
1724                        filter,
1725                        &sub.source,
1726                        &self.catalog,
1727                        outer_row,
1728                        outer_columns,
1729                    ));
1730                }
1731                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1732                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1733                let result = self.execute_plan_readonly(&sub_plan)?;
1734                let values = match result {
1735                    QueryResult::Rows { rows, .. } => rows
1736                        .into_iter()
1737                        .filter_map(|mut row| {
1738                            if row.is_empty() {
1739                                None
1740                            } else {
1741                                Some(value_to_expr(row.swap_remove(0)))
1742                            }
1743                        })
1744                        .collect(),
1745                    _ => Vec::new(),
1746                };
1747                // WS2: byte-budget guard on the per-row materialized IN-list.
1748                self.charge_in_list(&values)?;
1749                Ok(Expr::InList {
1750                    expr: Box::new(inner),
1751                    list: values,
1752                    negated: *negated,
1753                })
1754            }
1755            Expr::ExistsSubquery { subquery, negated } => {
1756                let mut sub = *subquery.clone();
1757                if let Some(ref filter) = sub.filter {
1758                    sub.filter = Some(substitute_outer_refs(
1759                        filter,
1760                        &sub.source,
1761                        &self.catalog,
1762                        outer_row,
1763                        outer_columns,
1764                    ));
1765                }
1766                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1767                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1768                let result = self.execute_plan_readonly(&sub_plan)?;
1769                let has_rows = match result {
1770                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
1771                    _ => false,
1772                };
1773                let truth = if *negated { !has_rows } else { has_rows };
1774                Ok(Expr::Literal(Literal::Bool(truth)))
1775            }
1776            Expr::BinaryOp(l, op, r) => {
1777                let l =
1778                    self.materialize_correlated_for_row_readonly(l, outer_row, outer_columns)?;
1779                let r =
1780                    self.materialize_correlated_for_row_readonly(r, outer_row, outer_columns)?;
1781                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1782            }
1783            Expr::UnaryOp(op, inner) => {
1784                let inner =
1785                    self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1786                Ok(Expr::UnaryOp(*op, Box::new(inner)))
1787            }
1788            other => Ok(other.clone()),
1789        }
1790    }
1791
1792    pub fn catalog(&self) -> &Catalog {
1793        &self.catalog
1794    }
1795
1796    pub fn catalog_mut(&mut self) -> &mut Catalog {
1797        &mut self.catalog
1798    }
1799}