Skip to main content

powdb_query/executor/
mod.rs

1//! PowDB query executor.
2
3// Submodules that don't use macros defined in this file.
4mod compiled;
5mod eval;
6
7use crate::ast::*;
8use crate::canonicalize::canonicalize;
9use crate::plan::*;
10use crate::plan_cache::PlanCache;
11use crate::planner;
12use crate::result::{QueryError, QueryResult};
13use powdb_storage::catalog::Catalog;
14use powdb_storage::row::{decode_column, decode_row, RowLayout};
15use powdb_storage::types::*;
16use powdb_storage::view::ViewRegistry;
17
18use std::io;
19use std::path::Path;
20use std::sync::Mutex;
21use std::time::Instant;
22use tracing::{error, info, Level};
23
24use self::compiled::*;
25use self::eval::*;
26
27/// Legacy sentinel string constant — kept for backward compatibility with
28/// any external code matching on the string representation. New code should
29/// match on `QueryError::ReadonlyNeedsWrite` directly.
30pub const READONLY_NEEDS_WRITE: &str = "__POWDB_READONLY_NEEDS_WRITE__";
31
32/// Plan cache capacity. Bench workloads fill ~15 slots; real apps will sit
33/// comfortably in 256. Lookup is O(1), collisions clear the cache (see
34/// `plan_cache::PlanCache::insert`).
35const PLAN_CACHE_CAPACITY: usize = 256;
36
37/// Maximum number of rows a join may produce before the executor aborts.
38/// Prevents Cartesian-product blowups (e.g. `T cross join T` on 10K rows
39/// would produce 100M rows in memory without this cap).
40pub(super) const MAX_JOIN_ROWS: usize = 1_000_000;
41
42/// Maximum number of rows that may be materialized for sorting.
43/// Queries that exceed this should add a LIMIT clause to narrow the input
44/// before sorting.
45pub(super) const MAX_SORT_ROWS: usize = 10_000_000;
46
47#[inline]
48pub(super) fn check_join_limit(row_count: usize) -> Result<(), QueryError> {
49    if row_count > MAX_JOIN_ROWS {
50        return Err(QueryError::JoinLimitExceeded);
51    }
52    Ok(())
53}
54
// ─── Mission D11 Phase 1: scalar hot-loop helpers ─────────────────────────
//
// These macros expand into the scan body of `agg_single_col_fast` and sit
// inside the `for_each_row_raw` closure. They exist to split the loop on
// presence of a predicate *outside* the hot body, so the no-predicate path
// (agg_sum/agg_min/agg_max bench workloads) never pays the
// `Option<CompiledPredicate>` branch per row.
//
// Arguments (shared by both macros):
//
//   - `$self`     — anything with a `.catalog` exposing `for_each_row_raw`.
//   - `$table`    — table name handed straight to `for_each_row_raw`.
//   - `$pred`     — an `Option` of a callable `(&row_bytes) -> bool`; rows for
//                   which it returns `false` are skipped.
//   - `$bmp_byte` — `col_idx / 8`: index of the column's null-bitmap byte. The
//                   bitmap starts at row offset 2, so the byte read is
//                   `data[2 + $bmp_byte]`.
//   - `$bmp_bit`  — bit position inside that byte; a set bit means the column
//                   is null and the row is skipped.
//   - `$off`      — byte offset of the column's 8-byte little-endian value
//                   (`2 + bitmap_size + fixed_offsets[col_idx]`).
//   - `|$v: T| $body` — per-row callback; `$v` is bound to the decoded value.
//
// Each macro expands to a statement block that uses `?`, so it must be used
// inside a function returning `Result<_, QueryError>`; a storage failure is
// surfaced as `QueryError::StorageError`.
//
// Row reads are fully bounds-checked through `slice::get`. Rows that are too
// short to contain the bitmap byte or the 8-byte value (corrupt/truncated
// rows) are skipped. An explicit length guard was already executed per row,
// so the previous `get_unchecked` + raw-pointer reads bought nothing beyond
// it; using checked reads removes the `unsafe` without changing which rows
// are visited or which values `$body` sees.
macro_rules! agg_int_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : i64| $body:block
    ) => {{
        // Evaluate the layout arguments exactly once, outside the row loop.
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    // Checked reads double as the corrupt-row guard: a row
                    // missing either the bitmap byte or the value is skipped.
                    let (bmp, raw) = match (data.get(2 + bmp_byte), data.get(off..off + 8)) {
                        (Some(bmp), Some(raw)) => (*bmp, raw),
                        _ => return,
                    };
                    // Null bit set → column has no value in this row.
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // `raw` is exactly 8 bytes long, so this copy cannot fail.
                    let mut bytes = [0u8; 8];
                    bytes.copy_from_slice(raw);
                    let $v: i64 = i64::from_le_bytes(bytes);
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Same row handling as the predicate branch, minus the
                    // predicate call.
                    let (bmp, raw) = match (data.get(2 + bmp_byte), data.get(off..off + 8)) {
                        (Some(bmp), Some(raw)) => (*bmp, raw),
                        _ => return,
                    };
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    let mut bytes = [0u8; 8];
                    bytes.copy_from_slice(raw);
                    let $v: i64 = i64::from_le_bytes(bytes);
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}

// Float twin of `agg_int_loop!`: identical row handling, but the 8 value
// bytes are decoded as a little-endian `f64`.
macro_rules! agg_float_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : f64| $body:block
    ) => {{
        // Evaluate the layout arguments exactly once, outside the row loop.
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    // Checked reads double as the corrupt-row guard: a row
                    // missing either the bitmap byte or the value is skipped.
                    let (bmp, raw) = match (data.get(2 + bmp_byte), data.get(off..off + 8)) {
                        (Some(bmp), Some(raw)) => (*bmp, raw),
                        _ => return,
                    };
                    // Null bit set → column has no value in this row.
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // `raw` is exactly 8 bytes long, so this copy cannot fail.
                    let mut bytes = [0u8; 8];
                    bytes.copy_from_slice(raw);
                    let $v: f64 = f64::from_le_bytes(bytes);
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Same row handling as the predicate branch, minus the
                    // predicate call.
                    let (bmp, raw) = match (data.get(2 + bmp_byte), data.get(off..off + 8)) {
                        (Some(bmp), Some(raw)) => (*bmp, raw),
                        _ => return,
                    };
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    let mut bytes = [0u8; 8];
                    bytes.copy_from_slice(raw);
                    let $v: f64 = f64::from_le_bytes(bytes);
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}
210
211// Submodules that use the macros above — must be declared after macro_rules!.
212mod plan_exec;
213mod prepared;
214
215#[cfg(test)]
216mod tests;
217
218// Re-exports for the public API
219pub use self::prepared::PreparedQuery;
220
221use self::plan_exec::{
222    compute_group_aggregate, execute_window, format_plan_tree, hash_join,
223    lower_unindexed_range_scans, range_matches, synthesize_range_predicate,
224    try_extract_equi_join_keys,
225};
226
227/// Mission infra-1: classify a parsed statement as read-only vs. mutating.
228/// Used by [`Engine::execute_powql_readonly`] and by the server handler
229/// to decide between the RwLock reader and writer sides. `Union` recurses
230/// because each side can independently be read/write (though in practice
231/// both sides are reads — the parser only builds Union from query shapes).
232pub fn is_read_only_statement(stmt: &Statement) -> bool {
233    match stmt {
234        Statement::Query(_) => true,
235        Statement::Union(u) => is_read_only_statement(&u.left) && is_read_only_statement(&u.right),
236        Statement::Insert(_)
237        | Statement::Upsert(_)
238        | Statement::UpdateQuery(_)
239        | Statement::DeleteQuery(_)
240        | Statement::CreateType(_)
241        | Statement::AlterTable(_)
242        | Statement::DropTable(_)
243        | Statement::CreateView(_)
244        | Statement::RefreshView(_)
245        | Statement::DropView(_) => false,
246        Statement::Explain(inner) => is_read_only_statement(inner),
247    }
248}
249
250pub struct Engine {
251    catalog: Catalog,
252    /// Mission D9 — cached parsed+planned query trees keyed by canonical
253    /// hash. Saves the ~3μs parse+plan cost on repeat queries that differ
254    /// only in literal values.
255    ///
256    /// Mission infra-1: wrapped in `Mutex` so the read path can be driven
257    /// by `&self`. The critical section is extremely short — a single
258    /// hashmap lookup + plan clone on a hit, or a single insert on a miss.
259    /// A full `RwLock` would be over-engineered here; the contention window
260    /// is smaller than the read-path scan work it gates.
261    plan_cache: Mutex<PlanCache>,
262    /// Mission C Phase 13: reusable `Vec<Value>` scratch buffer for the
263    /// prepared-insert fast path. `execute_prepared` used to allocate a
264    /// fresh `vec![Value::Empty; n_cols]` on every insert; recycling this
265    /// buffer shaves one heap alloc per row on `insert_batch_1k`.
266    insert_values_scratch: Vec<Value>,
267    /// Materialized view registry: tracks view definitions, dependencies,
268    /// and dirty state. Views are backed by regular catalog tables; this
269    /// registry adds the lifecycle metadata.
270    view_registry: ViewRegistry,
271}
272
273impl Engine {
274    /// Open or create a PowDB engine rooted at `data_dir`.
275    ///
276    /// If the directory already contains a catalog, it is reopened.
277    /// Otherwise a fresh empty database is created.
278    ///
279    /// # Examples
280    ///
281    /// ```
282    /// use powdb_query::executor::Engine;
283    ///
284    /// let dir = tempfile::tempdir().unwrap();
285    /// let engine = Engine::new(dir.path()).unwrap();
286    /// // Engine is ready — the directory now contains a catalog.
287    /// ```
288    pub fn new(data_dir: &Path) -> io::Result<Self> {
289        std::fs::create_dir_all(data_dir)?;
290        // Try to reopen an existing database first; only create a fresh
291        // catalog when there isn't one already on disk.
292        let catalog = match Catalog::open(data_dir) {
293            Ok(c) => {
294                info!(data_dir = %data_dir.display(), "engine reopened existing database");
295                c
296            }
297            Err(e) if e.kind() == io::ErrorKind::NotFound => {
298                info!(data_dir = %data_dir.display(), "engine initialized fresh database");
299                Catalog::create(data_dir)?
300            }
301            Err(e) => return Err(e),
302        };
303        let view_registry =
304            ViewRegistry::open(data_dir).unwrap_or_else(|_| ViewRegistry::new(data_dir));
305        Ok(Engine {
306            catalog,
307            plan_cache: Mutex::new(PlanCache::new(PLAN_CACHE_CAPACITY)),
308            insert_values_scratch: Vec::new(),
309            view_registry,
310        })
311    }
312
313    /// Parse + plan + execute a PowQL query.
314    ///
315    /// # Examples
316    ///
317    /// ```
318    /// use powdb_query::executor::Engine;
319    /// use powdb_query::result::QueryResult;
320    ///
321    /// let dir = tempfile::tempdir().unwrap();
322    /// let mut engine = Engine::new(dir.path()).unwrap();
323    ///
324    /// // Create a table and insert a row.
325    /// engine.execute_powql("type User { required name: str, age: int }").unwrap();
326    /// engine.execute_powql(r#"insert User { name := "Alice", age := 30 }"#).unwrap();
327    ///
328    /// // Query rows back.
329    /// let result = engine.execute_powql("User").unwrap();
330    /// assert_eq!(result.row_count(), 1);
331    /// ```
332    ///
333    /// Mission D6 — tracing collapse: the previous implementation ran 4
334    /// `Instant::now()` + 3 `elapsed().as_micros()` calls + formatted an
335    /// `info!` span on every query, even when tracing was disabled. On a
336    /// sub-microsecond `point_lookup_indexed` call that overhead was
337    /// 100-200ns — 20%+ of the whole query. We now measure time only when
338    /// INFO is actually enabled via `tracing::enabled!`, and we moved the
339    /// noisy `debug!(?plan)` line behind the same gate so the Debug
340    /// formatter can't run unconditionally either.
341    ///
342    /// Mission D9 — plan cache: on the hot path we canonicalise the query
343    /// text (lex + FNV-1a hash with literal values stripped), check the
344    /// cache, and on a hit substitute the new literals into a clone of the
345    /// cached plan. This skips re-lexing, re-parsing, and re-planning —
346    /// around 3μs per call on bench workloads. On a miss we plan as before
347    /// and insert the plan under its canonical hash.
348    pub fn execute_powql(&mut self, input: &str) -> Result<QueryResult, QueryError> {
349        // Hot path: tracing disabled. Zero syscalls, zero formatting.
350        if !tracing::enabled!(Level::INFO) {
351            // D9: try the plan cache first. Canonicalisation lexes the
352            // query once; on a hit we skip the parser and planner entirely.
353            if let Ok((hash, literals)) = canonicalize(input) {
354                let cached = self
355                    .plan_cache
356                    .lock()
357                    .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
358                    .get_with_substitution(hash, &literals);
359                if let Some(plan) = cached {
360                    let plan = lower_unindexed_range_scans(&self.catalog, &plan);
361                    let result = self.execute_plan(&plan);
362                    // Mission B (post-review): statement-boundary WAL
363                    // group commit. Catalog::wal_log now only appends;
364                    // the fsync happens here exactly once per statement.
365                    // `sync_wal` is a no-op when nothing was buffered
366                    // (pure reads pay zero fsync).
367                    self.catalog
368                        .sync_wal()
369                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
370                    return result;
371                }
372                // Miss — plan, insert, execute.
373                return match planner::plan(input) {
374                    Ok(plan) => {
375                        self.plan_cache
376                            .lock()
377                            .map_err(|e| {
378                                QueryError::Execution(format!("plan cache lock poisoned: {e}"))
379                            })?
380                            .insert(hash, plan.clone());
381                        let plan = lower_unindexed_range_scans(&self.catalog, &plan);
382                        let result = self.execute_plan(&plan);
383                        self.catalog
384                            .sync_wal()
385                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
386                        result
387                    }
388                    Err(e) => Err(QueryError::Parse(e.to_string())),
389                };
390            }
391            // Lex error — fall through to the planner so the caller gets a
392            // consistent error shape.
393            return match planner::plan(input) {
394                Ok(plan) => {
395                    let plan = lower_unindexed_range_scans(&self.catalog, &plan);
396                    let result = self.execute_plan(&plan);
397                    self.catalog
398                        .sync_wal()
399                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
400                    result
401                }
402                Err(e) => Err(QueryError::Parse(e.to_string())),
403            };
404        }
405
406        // Instrumented path — only taken under explicit tracing subscribers.
407        let total_start = Instant::now();
408        let plan_start = Instant::now();
409        let plan = planner::plan(input).map_err(|e| {
410            let msg = e.to_string();
411            error!(query = %input, error = %msg, "query plan failed");
412            QueryError::Parse(msg)
413        })?;
414        let plan_us = plan_start.elapsed().as_micros();
415
416        let exec_start = Instant::now();
417        let plan = lower_unindexed_range_scans(&self.catalog, &plan);
418        let result = self.execute_plan(&plan);
419        // Mission B (post-review): statement-boundary WAL flush.
420        let _ = self.catalog.sync_wal();
421        let exec_us = exec_start.elapsed().as_micros();
422
423        let total_us = total_start.elapsed().as_micros();
424        match &result {
425            Ok(r) => {
426                info!(
427                    query = %input,
428                    plan_us = plan_us,
429                    exec_us = exec_us,
430                    total_us = total_us,
431                    rows = r.row_count(),
432                    "query ok"
433                );
434            }
435            Err(e) => {
436                error!(
437                    query = %input,
438                    plan_us = plan_us,
439                    exec_us = exec_us,
440                    error = %e,
441                    "query failed"
442                );
443            }
444        }
445        result
446    }
447
448    /// Plan cache stats — useful for benches and debugging.
449    pub fn plan_cache_stats(&self) -> (u64, u64, usize) {
450        let cache = self.plan_cache.lock().unwrap_or_else(|e| e.into_inner());
451        (cache.hits, cache.misses, cache.len())
452    }
453
454    /// Mission infra-1: read-only entry point.
455    ///
456    /// Parses + plans + executes a PowQL query using only a shared borrow
457    /// on the engine. Rejects any statement that would mutate state
458    /// (Insert/Update/Delete/CreateTable/AlterTable/DropTable/CreateView/
459    /// RefreshView/DropView) by returning [`READONLY_NEEDS_WRITE`] so the
460    /// caller can escalate to the write lock.
461    ///
462    /// Also returns [`READONLY_NEEDS_WRITE`] if a materialized view in the
463    /// query is dirty — refreshing one requires `&mut self`, so the caller
464    /// must retake the write lock for the first refresh.
465    ///
466    /// This method is the concurrent-read fast path behind
467    /// `Arc<RwLock<Engine>>`: multiple threads can call it simultaneously
468    /// under a shared `.read()` lock and each will scan independently.
469    pub fn execute_powql_readonly(&self, input: &str) -> Result<QueryResult, QueryError> {
470        // Parse the statement first so we can classify read vs. write
471        // without touching the catalog. This is the same lex+parse cost
472        // the hot path would pay anyway.
473        let stmt = crate::parser::parse(input).map_err(|e| QueryError::Parse(e.to_string()))?;
474        if !is_read_only_statement(&stmt) {
475            return Err(QueryError::ReadonlyNeedsWrite);
476        }
477
478        // Try the plan cache first — identical hash scheme to
479        // `execute_powql` so both paths share cache state. The mutex
480        // section is just a hashmap lookup + plan clone.
481        if let Ok((hash, literals)) = canonicalize(input) {
482            let cached = self
483                .plan_cache
484                .lock()
485                .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
486                .get_with_substitution(hash, &literals);
487            if let Some(plan) = cached {
488                let plan = lower_unindexed_range_scans(&self.catalog, &plan);
489                return self.execute_plan_readonly(&plan);
490            }
491            // Miss: plan + insert + execute. The planner is pure, so this
492            // is safe from `&self`.
493            let plan = crate::planner::plan_statement(stmt)
494                .map_err(|e| QueryError::Parse(e.to_string()))?;
495            self.plan_cache
496                .lock()
497                .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
498                .insert(hash, plan.clone());
499            let plan = lower_unindexed_range_scans(&self.catalog, &plan);
500            return self.execute_plan_readonly(&plan);
501        }
502        // Lex error — fall through to the planner for a consistent error
503        // shape (though `parse` above would usually have caught it).
504        let plan =
505            crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
506        let plan = lower_unindexed_range_scans(&self.catalog, &plan);
507        self.execute_plan_readonly(&plan)
508    }
509
510    /// Read-only version of [`Engine::execute_plan`]. Dispatches the
511    /// read-path plan variants by calling `&self` helpers and errors with
512    /// [`READONLY_NEEDS_WRITE`] on any write variant. This is the
513    /// recursion target for composite read plans under the RwLock reader.
514    ///
515    /// The dispatch mirrors `execute_plan` for the read branches but does
516    /// not carry any of the fast-paths that need `&mut self` (e.g. plan-
517    /// cache mutation on inner subqueries is handled via the shared mutex
518    /// in [`Engine::execute_powql_readonly`]; in-flight subquery
519    /// materialisation uses [`Engine::materialize_subqueries_readonly`]).
520    fn execute_plan_readonly(&self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
521        match plan {
522            PlanNode::SeqScan { table } => {
523                // Dirty view means we'd need to refresh it — can't do that
524                // under `&self`. Escalate to the write path.
525                if self.view_registry.is_dirty(table) {
526                    return Err(QueryError::ReadonlyNeedsWrite);
527                }
528                let schema = self
529                    .catalog
530                    .schema(table)
531                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
532                    .clone();
533                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
534                let rows: Vec<Vec<Value>> = self
535                    .catalog
536                    .scan(table)
537                    .map_err(|e| e.to_string())?
538                    .map(|(_, row)| row)
539                    .collect();
540                Ok(QueryResult::Rows { columns, rows })
541            }
542
543            PlanNode::AliasScan { table, alias } => {
544                let schema = self
545                    .catalog
546                    .schema(table)
547                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
548                    .clone();
549                let columns: Vec<String> = schema
550                    .columns
551                    .iter()
552                    .map(|c| format!("{alias}.{}", c.name))
553                    .collect();
554                let rows: Vec<Vec<Value>> = self
555                    .catalog
556                    .scan(table)
557                    .map_err(|e| e.to_string())?
558                    .map(|(_, row)| row)
559                    .collect();
560                Ok(QueryResult::Rows { columns, rows })
561            }
562
563            PlanNode::IndexScan { table, column, key } => {
564                let schema = self
565                    .catalog
566                    .schema(table)
567                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
568                    .clone();
569                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
570                let key_value = literal_to_value(key)?;
571                let tbl = self
572                    .catalog
573                    .get_table(table)
574                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
575
576                if let Some(btree) = tbl.index(column) {
577                    let hit = match &key_value {
578                        Value::Int(k) => btree.lookup_int(*k),
579                        other => btree.lookup(other),
580                    };
581                    let rows = match hit {
582                        Some(rid) => match tbl.heap.get(rid) {
583                            Some(data) => vec![decode_row(&tbl.schema, &data)],
584                            None => Vec::new(),
585                        },
586                        None => Vec::new(),
587                    };
588                    return Ok(QueryResult::Rows { columns, rows });
589                }
590
591                // No index: synthetic eq predicate + compiled scan.
592                let fast = FastLayout::new(&schema);
593                let synth_pred = Expr::BinaryOp(
594                    Box::new(Expr::Field(column.clone())),
595                    BinOp::Eq,
596                    Box::new(key.clone()),
597                );
598                if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, &schema) {
599                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
600                    self.catalog
601                        .for_each_row_raw(table, |_rid, data| {
602                            if compiled(data) {
603                                rows.push(decode_row(&schema, data));
604                            }
605                        })
606                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
607                    return Ok(QueryResult::Rows { columns, rows });
608                }
609
610                // Last resort: slow eq-check.
611                let col_idx =
612                    schema
613                        .column_index(column)
614                        .ok_or_else(|| QueryError::ColumnNotFound {
615                            table: String::new(),
616                            column: column.clone(),
617                        })?;
618                let rows: Vec<Vec<Value>> = tbl
619                    .scan()
620                    .filter_map(|(_, row)| {
621                        if row[col_idx] == key_value {
622                            Some(row)
623                        } else {
624                            None
625                        }
626                    })
627                    .collect();
628                Ok(QueryResult::Rows { columns, rows })
629            }
630
631            PlanNode::RangeScan {
632                table,
633                column,
634                start,
635                end,
636            } => {
637                let tbl = self
638                    .catalog
639                    .get_table(table)
640                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
641                let columns: Vec<String> =
642                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
643                let schema = tbl.schema.clone();
644
645                let start_val = match start {
646                    Some((expr, _)) => Some(literal_to_value(expr)?),
647                    None => None,
648                };
649                let end_val = match end {
650                    Some((expr, _)) => Some(literal_to_value(expr)?),
651                    None => None,
652                };
653                let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
654                let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
655
656                if let Some(btree) = tbl.index(column) {
657                    let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
658                        (Some(s), Some(e)) => btree.range(s, e).collect(),
659                        (Some(s), None) => btree.range_from(s),
660                        (None, Some(e)) => btree.range_to(e),
661                        (None, None) => {
662                            // Unbounded both sides — equivalent to seq scan.
663                            let rows: Vec<Vec<Value>> = tbl.scan().map(|(_, row)| row).collect();
664                            return Ok(QueryResult::Rows { columns, rows });
665                        }
666                    };
667                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
668                    for (key, rid) in hits {
669                        // Filter for exclusive bounds.
670                        if !start_inclusive {
671                            if let Some(ref s) = start_val {
672                                if &key == s {
673                                    continue;
674                                }
675                            }
676                        }
677                        if !end_inclusive {
678                            if let Some(ref e) = end_val {
679                                if &key == e {
680                                    continue;
681                                }
682                            }
683                        }
684                        if let Some(data) = tbl.heap.get(rid) {
685                            rows.push(decode_row(&schema, &data));
686                        }
687                    }
688                    return Ok(QueryResult::Rows { columns, rows });
689                }
690
691                // Fallback: no index — synthesize the range predicate and scan.
692                let fast = FastLayout::new(&schema);
693                let synth = synthesize_range_predicate(column, start, end);
694                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, &schema) {
695                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
696                    self.catalog
697                        .for_each_row_raw(table, |_rid, data| {
698                            if compiled(data) {
699                                rows.push(decode_row(&schema, data));
700                            }
701                        })
702                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
703                    return Ok(QueryResult::Rows { columns, rows });
704                }
705
706                // Last resort: decoded row eval.
707                let col_idx =
708                    schema
709                        .column_index(column)
710                        .ok_or_else(|| QueryError::ColumnNotFound {
711                            table: String::new(),
712                            column: column.clone(),
713                        })?;
714                let rows: Vec<Vec<Value>> = tbl
715                    .scan()
716                    .filter(|(_, row)| {
717                        range_matches(
718                            &row[col_idx],
719                            &start_val,
720                            start_inclusive,
721                            &end_val,
722                            end_inclusive,
723                        )
724                    })
725                    .map(|(_, row)| row)
726                    .collect();
727                Ok(QueryResult::Rows { columns, rows })
728            }
729
730            PlanNode::Filter { input, predicate } => {
731                // Materialise subqueries using the `&self` variant.
732                // Uncorrelated subqueries are replaced with InList/Bool;
733                // correlated ones are left as InSubquery/ExistsSubquery
734                // for per-row materialisation below.
735                let materialized;
736                let predicate = if contains_subquery(predicate) {
737                    materialized = self.materialize_subqueries_readonly(predicate)?;
738                    &materialized
739                } else {
740                    predicate
741                };
742
743                // Correlated subquery path: per-row materialisation.
744                if contains_subquery(predicate) {
745                    let result = self.execute_plan_readonly(input)?;
746                    return match result {
747                        QueryResult::Rows { columns, rows } => {
748                            let mut filtered = Vec::new();
749                            for row in rows {
750                                let row_pred = self.materialize_correlated_for_row_readonly(
751                                    predicate, &row, &columns,
752                                )?;
753                                if eval_predicate(&row_pred, &row, &columns) {
754                                    filtered.push(row);
755                                }
756                            }
757                            Ok(QueryResult::Rows {
758                                columns,
759                                rows: filtered,
760                            })
761                        }
762                        _ => Err("filter requires row input".into()),
763                    };
764                }
765
766                // Fused Filter+SeqScan fast path.
767                if let PlanNode::SeqScan { table } = input.as_ref() {
768                    if self.view_registry.is_dirty(table) {
769                        return Err(QueryError::ReadonlyNeedsWrite);
770                    }
771                    let schema = self
772                        .catalog
773                        .schema(table)
774                        .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
775                        .clone();
776                    let columns: Vec<String> =
777                        schema.columns.iter().map(|c| c.name.clone()).collect();
778                    let fast = FastLayout::new(&schema);
779                    let row_layout = RowLayout::new(&schema);
780                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
781
782                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
783                        self.catalog
784                            .for_each_row_raw(table, |_rid, data| {
785                                if compiled(data) {
786                                    rows.push(decode_row(&schema, data));
787                                }
788                            })
789                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
790                    } else {
791                        let pred_cols = predicate_column_indices(predicate, &columns);
792                        self.catalog
793                            .for_each_row_raw(table, |_rid, data| {
794                                let pred_row =
795                                    decode_selective(&schema, &row_layout, data, &pred_cols);
796                                if eval_predicate(predicate, &pred_row, &columns) {
797                                    rows.push(decode_row(&schema, data));
798                                }
799                            })
800                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
801                    }
802
803                    return Ok(QueryResult::Rows { columns, rows });
804                }
805
806                // General path.
807                let result = self.execute_plan_readonly(input)?;
808                match result {
809                    QueryResult::Rows { columns, rows } => {
810                        let filtered: Vec<Vec<Value>> = rows
811                            .into_iter()
812                            .filter(|row| eval_predicate(predicate, row, &columns))
813                            .collect();
814                        Ok(QueryResult::Rows {
815                            columns,
816                            rows: filtered,
817                        })
818                    }
819                    _ => Err("filter requires row input".into()),
820                }
821            }
822
823            PlanNode::Project { input, fields } => {
824                // Fast path: Project over IndexScan. Avoids full-row decode
825                // by calling decode_column only for projected fields.
826                if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
827                    let key_value = literal_to_value(key)?;
828                    let tbl = self
829                        .catalog
830                        .get_table(table)
831                        .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
832                    let schema = &tbl.schema;
833                    let layout = tbl.row_layout();
834
835                    let proj_columns: Vec<String> = fields
836                        .iter()
837                        .map(|f| {
838                            f.alias.clone().unwrap_or_else(|| match &f.expr {
839                                Expr::Field(name) => name.clone(),
840                                _ => "?".into(),
841                            })
842                        })
843                        .collect();
844
845                    let proj_indices: Vec<usize> = fields
846                        .iter()
847                        .filter_map(|f| {
848                            if let Expr::Field(name) = &f.expr {
849                                schema.column_index(name)
850                            } else {
851                                None
852                            }
853                        })
854                        .collect();
855
856                    if let Some(btree) = tbl.index(column) {
857                        let lookup_result = match &key_value {
858                            Value::Int(k) => btree.lookup_int(*k),
859                            other => btree.lookup(other),
860                        };
861                        let rows = match lookup_result {
862                            Some(rid) => match tbl.heap.get(rid) {
863                                Some(data) => {
864                                    let row: Vec<Value> = proj_indices
865                                        .iter()
866                                        .map(|&ci| decode_column(schema, layout, &data, ci))
867                                        .collect();
868                                    vec![row]
869                                }
870                                None => Vec::new(),
871                            },
872                            None => Vec::new(),
873                        };
874                        return Ok(QueryResult::Rows {
875                            columns: proj_columns,
876                            rows,
877                        });
878                    }
879                }
880
881                // Fast paths over Limit(Sort(...)) / Limit(Filter(...)) / Limit(SeqScan).
882                if let PlanNode::Limit {
883                    input: inner,
884                    count: limit_expr,
885                } = input.as_ref()
886                {
887                    if let PlanNode::Sort {
888                        input: sort_input,
889                        keys,
890                    } = inner.as_ref()
891                    {
892                        if keys.len() == 1 {
893                            let sort_field = &keys[0].field;
894                            let descending = keys[0].descending;
895                            let limit = match limit_expr {
896                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
897                                _ => usize::MAX,
898                            };
899                            let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
900                                match sort_input.as_ref() {
901                                    PlanNode::SeqScan { table } => (Some(table.as_str()), None),
902                                    PlanNode::Filter {
903                                        input: fi,
904                                        predicate,
905                                    } => {
906                                        if let PlanNode::SeqScan { table } = fi.as_ref() {
907                                            (Some(table.as_str()), Some(predicate))
908                                        } else {
909                                            (None, None)
910                                        }
911                                    }
912                                    _ => (None, None),
913                                };
914                            if let Some(table) = table_opt {
915                                if let Some(result) = self.project_filter_sort_limit_fast(
916                                    table, fields, sort_field, descending, limit, pred_opt,
917                                )? {
918                                    return Ok(result);
919                                }
920                            }
921                        }
922                    }
923                    if let PlanNode::Filter {
924                        input: fi,
925                        predicate,
926                    } = inner.as_ref()
927                    {
928                        if let PlanNode::SeqScan { table } = fi.as_ref() {
929                            let limit = match limit_expr {
930                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
931                                _ => usize::MAX,
932                            };
933                            if let Some(result) = self.project_filter_limit_fast(
934                                table,
935                                fields,
936                                limit,
937                                Some(predicate),
938                            )? {
939                                return Ok(result);
940                            }
941                        }
942                    }
943                    if let PlanNode::SeqScan { table } = inner.as_ref() {
944                        let limit = match limit_expr {
945                            Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
946                            _ => usize::MAX,
947                        };
948                        if let Some(result) =
949                            self.project_filter_limit_fast(table, fields, limit, None)?
950                        {
951                            return Ok(result);
952                        }
953                    }
954                }
955
956                // Project(Filter(SeqScan)) without Limit.
957                if let PlanNode::Filter {
958                    input: fi,
959                    predicate,
960                } = input.as_ref()
961                {
962                    if let PlanNode::SeqScan { table } = fi.as_ref() {
963                        if let Some(result) = self.project_filter_limit_fast(
964                            table,
965                            fields,
966                            usize::MAX,
967                            Some(predicate),
968                        )? {
969                            return Ok(result);
970                        }
971                    }
972                }
973
974                // Project(SeqScan) without Filter or Limit.
975                if let PlanNode::SeqScan { table } = input.as_ref() {
976                    if let Some(result) =
977                        self.project_filter_limit_fast(table, fields, usize::MAX, None)?
978                    {
979                        return Ok(result);
980                    }
981                }
982
983                // Generic path.
984                let result = self.execute_plan_readonly(input)?;
985                match result {
986                    QueryResult::Rows { columns, rows } => {
987                        let proj_columns: Vec<String> = fields
988                            .iter()
989                            .map(|f| {
990                                f.alias.clone().unwrap_or_else(|| match &f.expr {
991                                    Expr::Field(name) => name.clone(),
992                                    Expr::QualifiedField { qualifier, field } => {
993                                        format!("{qualifier}.{field}")
994                                    }
995                                    _ => "?".into(),
996                                })
997                            })
998                            .collect();
999                        let proj_rows: Vec<Vec<Value>> = rows
1000                            .iter()
1001                            .map(|row| {
1002                                fields
1003                                    .iter()
1004                                    .map(|f| eval_expr(&f.expr, row, &columns))
1005                                    .collect()
1006                            })
1007                            .collect();
1008                        Ok(QueryResult::Rows {
1009                            columns: proj_columns,
1010                            rows: proj_rows,
1011                        })
1012                    }
1013                    _ => Err("project requires row input".into()),
1014                }
1015            }
1016
1017            PlanNode::Sort { input, keys } => {
1018                let result = self.execute_plan_readonly(input)?;
1019                match result {
1020                    QueryResult::Rows { columns, mut rows } => {
1021                        if rows.len() > MAX_SORT_ROWS {
1022                            return Err(QueryError::SortLimitExceeded);
1023                        }
1024                        let key_indices: Vec<(usize, bool)> = keys
1025                            .iter()
1026                            .map(|k| {
1027                                columns
1028                                    .iter()
1029                                    .position(|c| c == &k.field)
1030                                    .map(|idx| (idx, k.descending))
1031                                    .ok_or_else(|| QueryError::ColumnNotFound {
1032                                        table: String::new(),
1033                                        column: k.field.clone(),
1034                                    })
1035                            })
1036                            .collect::<Result<_, QueryError>>()?;
1037                        rows.sort_by(|a, b| {
1038                            for &(col_idx, descending) in &key_indices {
1039                                let cmp = a[col_idx].cmp(&b[col_idx]);
1040                                let cmp = if descending { cmp.reverse() } else { cmp };
1041                                if cmp != std::cmp::Ordering::Equal {
1042                                    return cmp;
1043                                }
1044                            }
1045                            std::cmp::Ordering::Equal
1046                        });
1047                        Ok(QueryResult::Rows { columns, rows })
1048                    }
1049                    _ => Err("sort requires row input".into()),
1050                }
1051            }
1052
1053            PlanNode::Limit { input, count } => {
1054                let result = self.execute_plan_readonly(input)?;
1055                let n = match count {
1056                    Expr::Literal(Literal::Int(v)) => *v as usize,
1057                    _ => return Err("limit must be integer literal".into()),
1058                };
1059                match result {
1060                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1061                        columns,
1062                        rows: rows.into_iter().take(n).collect(),
1063                    }),
1064                    _ => Err("limit requires row input".into()),
1065                }
1066            }
1067
1068            PlanNode::Offset { input, count } => {
1069                let result = self.execute_plan_readonly(input)?;
1070                let n = match count {
1071                    Expr::Literal(Literal::Int(v)) => *v as usize,
1072                    _ => return Err("offset must be integer literal".into()),
1073                };
1074                match result {
1075                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1076                        columns,
1077                        rows: rows.into_iter().skip(n).collect(),
1078                    }),
1079                    _ => Err("offset requires row input".into()),
1080                }
1081            }
1082
1083            PlanNode::Aggregate {
1084                input,
1085                function,
1086                field,
1087            } => {
1088                // Fast path: count() over SeqScan.
1089                if *function == AggFunc::Count {
1090                    if let PlanNode::SeqScan { table } = input.as_ref() {
1091                        let mut count: i64 = 0;
1092                        self.catalog
1093                            .for_each_row_raw(table, |_rid, _data| {
1094                                count += 1;
1095                            })
1096                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1097                        return Ok(QueryResult::Scalar(Value::Int(count)));
1098                    }
1099                    if let PlanNode::Filter {
1100                        input: inner,
1101                        predicate,
1102                    } = input.as_ref()
1103                    {
1104                        if let PlanNode::SeqScan { table } = inner.as_ref() {
1105                            let schema = self
1106                                .catalog
1107                                .schema(table)
1108                                .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
1109                                .clone();
1110                            let columns: Vec<String> =
1111                                schema.columns.iter().map(|c| c.name.clone()).collect();
1112                            let fast = FastLayout::new(&schema);
1113                            let row_layout = RowLayout::new(&schema);
1114
1115                            if let Some(compiled) =
1116                                compile_predicate(predicate, &columns, &fast, &schema)
1117                            {
1118                                let mut count: i64 = 0;
1119                                self.catalog
1120                                    .for_each_row_raw(table, |_rid, data| {
1121                                        if compiled(data) {
1122                                            count += 1;
1123                                        }
1124                                    })
1125                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1126                                return Ok(QueryResult::Scalar(Value::Int(count)));
1127                            }
1128
1129                            let pred_cols = predicate_column_indices(predicate, &columns);
1130                            let mut count: i64 = 0;
1131                            self.catalog
1132                                .for_each_row_raw(table, |_rid, data| {
1133                                    let pred_row =
1134                                        decode_selective(&schema, &row_layout, data, &pred_cols);
1135                                    if eval_predicate(predicate, &pred_row, &columns) {
1136                                        count += 1;
1137                                    }
1138                                })
1139                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1140                            return Ok(QueryResult::Scalar(Value::Int(count)));
1141                        }
1142                    }
1143                }
1144
1145                // Fast path: sum/avg/min/max over single fixed-size numeric.
1146                if matches!(
1147                    function,
1148                    AggFunc::Sum
1149                        | AggFunc::Avg
1150                        | AggFunc::Min
1151                        | AggFunc::Max
1152                        | AggFunc::CountDistinct
1153                ) {
1154                    if let Some(col) = field.as_ref() {
1155                        let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1156                            match input.as_ref() {
1157                                PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1158                                PlanNode::Filter {
1159                                    input: inner,
1160                                    predicate,
1161                                } => {
1162                                    if let PlanNode::SeqScan { table } = inner.as_ref() {
1163                                        (Some(table.as_str()), Some(predicate))
1164                                    } else {
1165                                        (None, None)
1166                                    }
1167                                }
1168                                _ => (None, None),
1169                            };
1170                        if let Some(table) = table_opt {
1171                            if let Some(result) =
1172                                self.agg_single_col_fast(table, col, *function, pred_opt)?
1173                            {
1174                                return Ok(result);
1175                            }
1176                        }
1177                    }
1178                }
1179
1180                // Generic path.
1181                let result = self.execute_plan_readonly(input)?;
1182                match result {
1183                    QueryResult::Rows { columns, rows } => match function {
1184                        AggFunc::Count => Ok(QueryResult::Scalar(Value::Int(rows.len() as i64))),
1185                        AggFunc::CountDistinct => {
1186                            let col = field.as_ref().ok_or("count distinct requires field")?;
1187                            let idx = columns
1188                                .iter()
1189                                .position(|c| c == col)
1190                                .ok_or("col not found")?;
1191                            let mut seen = std::collections::HashSet::new();
1192                            for row in &rows {
1193                                let v = &row[idx];
1194                                if !v.is_empty() {
1195                                    seen.insert(v.clone());
1196                                }
1197                            }
1198                            Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
1199                        }
1200                        AggFunc::Avg => {
1201                            let col = field.as_ref().ok_or("avg requires field")?;
1202                            let idx = columns
1203                                .iter()
1204                                .position(|c| c == col)
1205                                .ok_or("col not found")?;
1206                            let sum: f64 = rows
1207                                .iter()
1208                                .filter_map(|r| match &r[idx] {
1209                                    Value::Int(v) => Some(*v as f64),
1210                                    Value::Float(v) => Some(*v),
1211                                    _ => None,
1212                                })
1213                                .sum();
1214                            let count = rows.len() as f64;
1215                            Ok(QueryResult::Scalar(Value::Float(sum / count)))
1216                        }
1217                        AggFunc::Sum => {
1218                            let col = field.as_ref().ok_or("sum requires field")?;
1219                            let idx = columns
1220                                .iter()
1221                                .position(|c| c == col)
1222                                .ok_or("col not found")?;
1223                            let mut int_sum: i64 = 0;
1224                            let mut float_sum: f64 = 0.0;
1225                            let mut saw_float = false;
1226                            for r in &rows {
1227                                match &r[idx] {
1228                                    Value::Int(v) => int_sum += *v,
1229                                    Value::Float(v) => {
1230                                        float_sum += *v;
1231                                        saw_float = true;
1232                                    }
1233                                    _ => {}
1234                                }
1235                            }
1236                            let result = if saw_float {
1237                                Value::Float(float_sum + int_sum as f64)
1238                            } else {
1239                                Value::Int(int_sum)
1240                            };
1241                            Ok(QueryResult::Scalar(result))
1242                        }
1243                        AggFunc::Min | AggFunc::Max => {
1244                            let col = field.as_ref().ok_or("min/max requires field")?;
1245                            let idx = columns
1246                                .iter()
1247                                .position(|c| c == col)
1248                                .ok_or("col not found")?;
1249                            let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
1250                            let result = if *function == AggFunc::Min {
1251                                vals.into_iter().min().cloned()
1252                            } else {
1253                                vals.into_iter().max().cloned()
1254                            };
1255                            Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
1256                        }
1257                    },
1258                    _ => Err("aggregate requires row input".into()),
1259                }
1260            }
1261
1262            PlanNode::Distinct { input } => {
1263                let result = self.execute_plan_readonly(input)?;
1264                match result {
1265                    QueryResult::Rows { columns, rows } => {
1266                        let mut seen = std::collections::HashSet::new();
1267                        let mut unique_rows = Vec::new();
1268                        for row in rows {
1269                            if seen.insert(row.clone()) {
1270                                unique_rows.push(row);
1271                            }
1272                        }
1273                        Ok(QueryResult::Rows {
1274                            columns,
1275                            rows: unique_rows,
1276                        })
1277                    }
1278                    other => Ok(other),
1279                }
1280            }
1281
1282            PlanNode::GroupBy {
1283                input,
1284                keys,
1285                aggregates,
1286                having,
1287            } => {
1288                let result = self.execute_plan_readonly(input)?;
1289                match result {
1290                    QueryResult::Rows { columns, rows } => {
1291                        let key_indices: Vec<usize> = keys
1292                            .iter()
1293                            .map(|k| {
1294                                columns.iter().position(|c| c == k).ok_or_else(|| {
1295                                    QueryError::ColumnNotFound {
1296                                        table: String::new(),
1297                                        column: k.clone(),
1298                                    }
1299                                })
1300                            })
1301                            .collect::<Result<Vec<_>, _>>()?;
1302
1303                        let agg_field_indices: Vec<usize> = aggregates
1304                            .iter()
1305                            .map(|a| {
1306                                if a.field == "*" {
1307                                    Ok(usize::MAX)
1308                                } else {
1309                                    columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1310                                        QueryError::ColumnNotFound {
1311                                            table: String::new(),
1312                                            column: a.field.clone(),
1313                                        }
1314                                    })
1315                                }
1316                            })
1317                            .collect::<Result<Vec<_>, _>>()?;
1318
1319                        let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1320                            rustc_hash::FxHashMap::default();
1321                        let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1322                        for (ri, row) in rows.iter().enumerate() {
1323                            let key: Vec<Value> =
1324                                key_indices.iter().map(|&i| row[i].clone()).collect();
1325                            match group_map.get(&key) {
1326                                Some(&idx) => groups[idx].1.push(ri),
1327                                None => {
1328                                    let idx = groups.len();
1329                                    group_map.insert(key.clone(), idx);
1330                                    groups.push((key, vec![ri]));
1331                                }
1332                            }
1333                        }
1334
1335                        let mut out_columns: Vec<String> = keys.clone();
1336                        for agg in aggregates.iter() {
1337                            out_columns.push(agg.output_name.clone());
1338                        }
1339
1340                        let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1341                        for (key_vals, row_indices) in &groups {
1342                            let mut row = key_vals.clone();
1343                            for (ai, agg) in aggregates.iter().enumerate() {
1344                                let col_idx = agg_field_indices[ai];
1345                                let val = compute_group_aggregate(
1346                                    agg.function,
1347                                    &rows,
1348                                    row_indices,
1349                                    col_idx,
1350                                );
1351                                row.push(val);
1352                            }
1353                            out_rows.push(row);
1354                        }
1355
1356                        if let Some(having_expr) = having {
1357                            out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1358                        }
1359
1360                        Ok(QueryResult::Rows {
1361                            columns: out_columns,
1362                            rows: out_rows,
1363                        })
1364                    }
1365                    _ => Err("group by requires row input".into()),
1366                }
1367            }
1368
1369            PlanNode::NestedLoopJoin {
1370                left,
1371                right,
1372                on,
1373                kind,
1374            } => {
1375                let left_result = self.execute_plan_readonly(left)?;
1376                let right_result = self.execute_plan_readonly(right)?;
1377                let (left_columns, left_rows) = match left_result {
1378                    QueryResult::Rows { columns, rows } => (columns, rows),
1379                    _ => return Err("join left side must produce rows".into()),
1380                };
1381                let (right_columns, right_rows) = match right_result {
1382                    QueryResult::Rows { columns, rows } => (columns, rows),
1383                    _ => return Err("join right side must produce rows".into()),
1384                };
1385
1386                if !matches!(kind, JoinKind::Cross) {
1387                    if let Some(pred) = on {
1388                        if let Some((l_idx, r_idx)) =
1389                            try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1390                        {
1391                            let result = hash_join(
1392                                left_columns,
1393                                left_rows,
1394                                right_columns,
1395                                right_rows,
1396                                l_idx,
1397                                r_idx,
1398                                *kind,
1399                            );
1400                            if let QueryResult::Rows { ref rows, .. } = result {
1401                                check_join_limit(rows.len())?;
1402                            }
1403                            return Ok(result);
1404                        }
1405                    }
1406                }
1407
1408                let n_left = left_columns.len();
1409                let n_right = right_columns.len();
1410                let mut columns = Vec::with_capacity(n_left + n_right);
1411                columns.extend(left_columns);
1412                columns.extend(right_columns);
1413
1414                let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1415                let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1416
1417                for left_row in &left_rows {
1418                    let mut matched = false;
1419                    for right_row in &right_rows {
1420                        combined.clear();
1421                        combined.extend_from_slice(left_row);
1422                        combined.extend_from_slice(right_row);
1423                        let keep = match kind {
1424                            JoinKind::Cross => true,
1425                            JoinKind::Inner | JoinKind::LeftOuter => match on {
1426                                Some(pred) => eval_predicate(pred, &combined, &columns),
1427                                None => true,
1428                            },
1429                            JoinKind::RightOuter => {
1430                                unreachable!("planner rewrites RightOuter to LeftOuter")
1431                            }
1432                        };
1433                        if keep {
1434                            rows.push(combined.clone());
1435                            check_join_limit(rows.len())?;
1436                            matched = true;
1437                        }
1438                    }
1439                    if !matched && matches!(kind, JoinKind::LeftOuter) {
1440                        let mut row = Vec::with_capacity(n_left + n_right);
1441                        row.extend_from_slice(left_row);
1442                        row.resize(n_left + n_right, Value::Empty);
1443                        rows.push(row);
1444                        check_join_limit(rows.len())?;
1445                    }
1446                }
1447
1448                Ok(QueryResult::Rows { columns, rows })
1449            }
1450
1451            PlanNode::Window { input, windows } => {
1452                let result = self.execute_plan_readonly(input)?;
1453                execute_window(result, windows)
1454            }
1455
1456            PlanNode::Union { left, right, all } => {
1457                let left_result = self.execute_plan_readonly(left)?;
1458                let right_result = self.execute_plan_readonly(right)?;
1459                let (left_cols, left_rows) = match left_result {
1460                    QueryResult::Rows { columns, rows } => (columns, rows),
1461                    _ => return Err("UNION requires query results on left side".into()),
1462                };
1463                let (_, right_rows) = match right_result {
1464                    QueryResult::Rows { columns, rows } => (columns, rows),
1465                    _ => return Err("UNION requires query results on right side".into()),
1466                };
1467                let mut combined = left_rows;
1468                if *all {
1469                    combined.extend(right_rows);
1470                } else {
1471                    let mut seen = std::collections::HashSet::new();
1472                    for row in &combined {
1473                        seen.insert(row.clone());
1474                    }
1475                    for row in right_rows {
1476                        if seen.insert(row.clone()) {
1477                            combined.push(row);
1478                        }
1479                    }
1480                }
1481                Ok(QueryResult::Rows {
1482                    columns: left_cols,
1483                    rows: combined,
1484                })
1485            }
1486
1487            PlanNode::Explain { input } => {
1488                let text = format_plan_tree(input, 0);
1489                Ok(QueryResult::Rows {
1490                    columns: vec!["plan".to_string()],
1491                    rows: text
1492                        .lines()
1493                        .map(|line| vec![Value::Str(line.to_string())])
1494                        .collect(),
1495                })
1496            }
1497
1498            // All write variants — caller must escalate to the write lock.
1499            PlanNode::Insert { .. }
1500            | PlanNode::Update { .. }
1501            | PlanNode::Delete { .. }
1502            | PlanNode::Upsert { .. }
1503            | PlanNode::CreateTable { .. }
1504            | PlanNode::AlterTable { .. }
1505            | PlanNode::DropTable { .. }
1506            | PlanNode::CreateView { .. }
1507            | PlanNode::RefreshView { .. }
1508            | PlanNode::DropView { .. } => Err(QueryError::ReadonlyNeedsWrite),
1509        }
1510    }
1511
1512    /// `&self` variant of [`Engine::materialize_subqueries`]. Used by the
1513    /// read path so `Filter` predicates with `InSubquery`/`ExistsSubquery`
1514    /// children can evaluate their inner queries without taking the write
1515    /// lock. Inner queries that would themselves need a write (e.g. dirty
1516    /// view) escalate via [`READONLY_NEEDS_WRITE`] just like the top-level
1517    /// read path does.
1518    fn materialize_subqueries_readonly(&self, expr: &Expr) -> Result<Expr, QueryError> {
1519        match expr {
1520            Expr::InSubquery {
1521                expr: inner,
1522                subquery,
1523                negated,
1524            } => {
1525                if is_correlated_subquery(subquery, &self.catalog) {
1526                    // Pass through — will be materialized per-row in the
1527                    // Filter handler's correlated subquery path.
1528                    let inner = self.materialize_subqueries_readonly(inner)?;
1529                    return Ok(Expr::InSubquery {
1530                        expr: Box::new(inner),
1531                        subquery: subquery.clone(),
1532                        negated: *negated,
1533                    });
1534                }
1535                let inner = self.materialize_subqueries_readonly(inner)?;
1536                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1537                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1538                let result = self.execute_plan_readonly(&sub_plan)?;
1539                let values = match result {
1540                    QueryResult::Rows { rows, .. } => rows
1541                        .into_iter()
1542                        .filter_map(|mut row| {
1543                            if row.is_empty() {
1544                                None
1545                            } else {
1546                                Some(value_to_expr(row.swap_remove(0)))
1547                            }
1548                        })
1549                        .collect(),
1550                    _ => Vec::new(),
1551                };
1552                Ok(Expr::InList {
1553                    expr: Box::new(inner),
1554                    list: values,
1555                    negated: *negated,
1556                })
1557            }
1558            Expr::ExistsSubquery { subquery, negated } => {
1559                if is_correlated_subquery(subquery, &self.catalog) {
1560                    return Ok(expr.clone());
1561                }
1562                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1563                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1564                let result = self.execute_plan_readonly(&sub_plan)?;
1565                let has_rows = match result {
1566                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
1567                    _ => false,
1568                };
1569                let truth = if *negated { !has_rows } else { has_rows };
1570                Ok(Expr::Literal(Literal::Bool(truth)))
1571            }
1572            Expr::BinaryOp(l, op, r) => {
1573                let l = self.materialize_subqueries_readonly(l)?;
1574                let r = self.materialize_subqueries_readonly(r)?;
1575                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1576            }
1577            Expr::UnaryOp(op, inner) => {
1578                let inner = self.materialize_subqueries_readonly(inner)?;
1579                Ok(Expr::UnaryOp(*op, Box::new(inner)))
1580            }
1581            Expr::Case { whens, else_expr } => {
1582                let whens = whens
1583                    .iter()
1584                    .map(|(c, r)| {
1585                        let c = self.materialize_subqueries_readonly(c)?;
1586                        let r = self.materialize_subqueries_readonly(r)?;
1587                        Ok((Box::new(c), Box::new(r)))
1588                    })
1589                    .collect::<Result<Vec<_>, QueryError>>()?;
1590                let else_expr = match else_expr {
1591                    Some(e) => Some(Box::new(self.materialize_subqueries_readonly(e)?)),
1592                    None => None,
1593                };
1594                Ok(Expr::Case { whens, else_expr })
1595            }
1596            other => Ok(other.clone()),
1597        }
1598    }
1599
1600    /// Per-row materialisation of correlated subqueries. For each row in the
1601    /// outer query, substitute outer column references in the subquery's
1602    /// filter with the current row's literal values, execute the modified
1603    /// subquery, and return the result as an InList or Bool literal.
1604    fn materialize_correlated_for_row_readonly(
1605        &self,
1606        expr: &Expr,
1607        outer_row: &[Value],
1608        outer_columns: &[String],
1609    ) -> Result<Expr, QueryError> {
1610        match expr {
1611            Expr::InSubquery {
1612                expr: inner,
1613                subquery,
1614                negated,
1615            } => {
1616                let inner =
1617                    self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1618                let mut sub = *subquery.clone();
1619                if let Some(ref filter) = sub.filter {
1620                    sub.filter = Some(substitute_outer_refs(
1621                        filter,
1622                        &sub.source,
1623                        &self.catalog,
1624                        outer_row,
1625                        outer_columns,
1626                    ));
1627                }
1628                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1629                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1630                let result = self.execute_plan_readonly(&sub_plan)?;
1631                let values = match result {
1632                    QueryResult::Rows { rows, .. } => rows
1633                        .into_iter()
1634                        .filter_map(|mut row| {
1635                            if row.is_empty() {
1636                                None
1637                            } else {
1638                                Some(value_to_expr(row.swap_remove(0)))
1639                            }
1640                        })
1641                        .collect(),
1642                    _ => Vec::new(),
1643                };
1644                Ok(Expr::InList {
1645                    expr: Box::new(inner),
1646                    list: values,
1647                    negated: *negated,
1648                })
1649            }
1650            Expr::ExistsSubquery { subquery, negated } => {
1651                let mut sub = *subquery.clone();
1652                if let Some(ref filter) = sub.filter {
1653                    sub.filter = Some(substitute_outer_refs(
1654                        filter,
1655                        &sub.source,
1656                        &self.catalog,
1657                        outer_row,
1658                        outer_columns,
1659                    ));
1660                }
1661                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1662                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1663                let result = self.execute_plan_readonly(&sub_plan)?;
1664                let has_rows = match result {
1665                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
1666                    _ => false,
1667                };
1668                let truth = if *negated { !has_rows } else { has_rows };
1669                Ok(Expr::Literal(Literal::Bool(truth)))
1670            }
1671            Expr::BinaryOp(l, op, r) => {
1672                let l =
1673                    self.materialize_correlated_for_row_readonly(l, outer_row, outer_columns)?;
1674                let r =
1675                    self.materialize_correlated_for_row_readonly(r, outer_row, outer_columns)?;
1676                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1677            }
1678            Expr::UnaryOp(op, inner) => {
1679                let inner =
1680                    self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1681                Ok(Expr::UnaryOp(*op, Box::new(inner)))
1682            }
1683            other => Ok(other.clone()),
1684        }
1685    }
1686
1687    pub fn catalog(&self) -> &Catalog {
1688        &self.catalog
1689    }
1690
1691    pub fn catalog_mut(&mut self) -> &mut Catalog {
1692        &mut self.catalog
1693    }
1694}