Skip to main content

powdb_query/executor/
mod.rs

1//! PowDB query executor.
2
3// Submodules that don't use macros defined in this file.
4mod compiled;
5mod eval;
6
7use crate::ast::*;
8use crate::canonicalize::canonicalize;
9use crate::plan::*;
10use crate::plan_cache::PlanCache;
11use crate::planner;
12use crate::result::{QueryError, QueryResult};
13use powdb_storage::catalog::Catalog;
14use powdb_storage::row::{decode_column, decode_row, RowLayout};
15use powdb_storage::types::*;
16use powdb_storage::view::ViewRegistry;
17
18use std::io;
19use std::path::Path;
20use std::sync::Mutex;
21use std::time::Instant;
22use tracing::{error, info, Level};
23
24use self::compiled::*;
25use self::eval::*;
26
27/// Legacy sentinel string constant — kept for backward compatibility with
28/// any external code matching on the string representation. New code should
29/// match on `QueryError::ReadonlyNeedsWrite` directly.
30pub const READONLY_NEEDS_WRITE: &str = "__POWDB_READONLY_NEEDS_WRITE__";
31
32/// Plan cache capacity. Bench workloads fill ~15 slots; real apps will sit
33/// comfortably in 256. Lookup is O(1), collisions clear the cache (see
34/// `plan_cache::PlanCache::insert`).
35const PLAN_CACHE_CAPACITY: usize = 256;
36
37/// Maximum number of rows a join may produce before the executor aborts.
38/// Prevents Cartesian-product blowups (e.g. `T cross join T` on 10K rows
39/// would produce 100M rows in memory without this cap).
40pub(super) const MAX_JOIN_ROWS: usize = 1_000_000;
41
42/// Maximum number of rows that may be materialized for sorting.
43/// Queries that exceed this should add a LIMIT clause to narrow the input
44/// before sorting.
45pub(super) const MAX_SORT_ROWS: usize = 10_000_000;
46
47#[inline]
48pub(super) fn check_join_limit(row_count: usize) -> Result<(), QueryError> {
49    if row_count > MAX_JOIN_ROWS {
50        return Err(QueryError::JoinLimitExceeded);
51    }
52    Ok(())
53}
54
55// ─── Mission D11 Phase 1: scalar hot-loop helpers ─────────────────────────
56//
57// These macros expand into the scan body of `agg_single_col_fast` and sit
58// inside the `for_each_row_raw` closure. They exist to:
59//
60//   1. Split the loop on presence of a predicate *outside* the hot body,
61//      so the no-predicate path (agg_sum/agg_min/agg_max bench workloads)
62//      never pays the `Option<CompiledPredicate>` branch per row.
63//   2. Drop two bounds checks per row by reading the null bitmap byte
64//      and the 8-byte value via raw pointer casts.
65//
66// SAFETY (shared across every call site below):
67//
68//   - `$bmp_byte` is `col_idx / 8` where `col_idx < n_cols`, and the row
69//     encoding stores `bitmap_size = n_cols.div_ceil(8)` bytes of bitmap
70//     starting at offset 2. So `2 + $bmp_byte < 2 + bitmap_size ≤ row_len`
71//     and `get_unchecked(2 + $bmp_byte)` is inside the row slice.
72//   - `$off = 2 + bitmap_size + fixed_offsets[col_idx]` for a fixed-size
73//     column. Every fixed-size column contributes `fixed_size(type_id)`
74//     bytes to the fixed region, so the row always has `[$off .. $off+8]`
75//     available for any i64/f64 column — enforced by the row encoder
76//     (`storage/src/row.rs`) and the schema invariant that a row with a
77//     given schema has `row_len ≥ 2 + bitmap_size + fixed_region_size`.
78//   - Both macros are only invoked from `agg_single_col_fast`, which
79//     early-returns if the column isn't Int/Float (8-byte fixed) and
80//     early-returns if `fast.fixed_offsets[col_idx]` is `None`.
// Scan `$table` through `$self.catalog.for_each_row_raw` and run `$body` with
// `$v` bound to the little-endian i64 stored at byte offset `$off` of every
// row that (a) passes `$pred` when one is supplied, (b) is long enough to
// hold both the null-bitmap byte and the 8-byte value, and (c) does not have
// the column's null bit set. Storage errors are converted to
// `QueryError::StorageError` and propagated with `?`, so the macro must be
// expanded inside a function returning `Result<_, QueryError>`.
macro_rules! agg_int_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : i64| $body:block
    ) => {{
        // Evaluate the layout arguments once, before the scan starts.
        let null_byte_idx = $bmp_byte;
        let null_bit = $bmp_bit;
        let value_off = $off;
        // The predicate test is resolved here, outside the per-row closure,
        // so the unfiltered scan never branches on an `Option` per row.
        match &$pred {
            Some(row_filter) => {
                $self
                    .catalog
                    .for_each_row_raw($table, |_rid, row| {
                        if !row_filter(row) {
                            return;
                        }
                        // Length guard: a corrupt or truncated row that
                        // cannot hold the bitmap byte or the full 8-byte
                        // value is skipped.
                        let row_len = row.len();
                        if row_len <= 2 + null_byte_idx || row_len < value_off + 8 {
                            return;
                        }
                        // SAFETY: the guard above established
                        // `2 + null_byte_idx < row.len()`, so the index is
                        // inside the slice.
                        let null_bits = unsafe { *row.get_unchecked(2 + null_byte_idx) };
                        if (null_bits >> null_bit) & 1 != 0 {
                            return;
                        }
                        // SAFETY: the guard above established
                        // `value_off + 8 <= row.len()`, so the 8 bytes read
                        // here are in bounds; `[u8; 8]` has alignment 1, so
                        // any byte address is suitably aligned.
                        let raw = unsafe { row.as_ptr().add(value_off).cast::<[u8; 8]>().read() };
                        let $v: i64 = i64::from_le_bytes(raw);
                        $body
                    })
                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
            }
            None => {
                $self
                    .catalog
                    .for_each_row_raw($table, |_rid, row| {
                        // Length guard: skip corrupt/truncated rows.
                        let row_len = row.len();
                        if row_len <= 2 + null_byte_idx || row_len < value_off + 8 {
                            return;
                        }
                        // SAFETY: `2 + null_byte_idx < row.len()` was checked
                        // by the guard above.
                        let null_bits = unsafe { *row.get_unchecked(2 + null_byte_idx) };
                        if (null_bits >> null_bit) & 1 != 0 {
                            return;
                        }
                        // SAFETY: `value_off + 8 <= row.len()` was checked by
                        // the guard above; `[u8; 8]` has alignment 1.
                        let raw = unsafe { row.as_ptr().add(value_off).cast::<[u8; 8]>().read() };
                        let $v: i64 = i64::from_le_bytes(raw);
                        $body
                    })
                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
            }
        }
    }};
}
145
// Float counterpart of the integer scan loop: scan `$table` through
// `$self.catalog.for_each_row_raw` and run `$body` with `$v` bound to the
// little-endian f64 stored at byte offset `$off` of every row that passes
// `$pred` (when supplied), is long enough to hold the null-bitmap byte and
// the 8-byte value, and does not have the column's null bit set. Storage
// errors become `QueryError::StorageError` and are propagated with `?`.
macro_rules! agg_float_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : f64| $body:block
    ) => {{
        // Evaluate the layout arguments once, before the scan starts.
        let null_byte_idx = $bmp_byte;
        let null_bit = $bmp_bit;
        let value_off = $off;
        // Choose the filtered or unfiltered scan up front so the per-row
        // closure of the unfiltered scan carries no `Option` branch.
        match &$pred {
            Some(row_filter) => {
                $self
                    .catalog
                    .for_each_row_raw($table, |_rid, row| {
                        if !row_filter(row) {
                            return;
                        }
                        // Length guard: a corrupt or truncated row that
                        // cannot hold the bitmap byte or the full 8-byte
                        // value is skipped.
                        let row_len = row.len();
                        if row_len <= 2 + null_byte_idx || row_len < value_off + 8 {
                            return;
                        }
                        // SAFETY: the guard above established
                        // `2 + null_byte_idx < row.len()`, so the index is
                        // inside the slice.
                        let null_bits = unsafe { *row.get_unchecked(2 + null_byte_idx) };
                        if (null_bits >> null_bit) & 1 != 0 {
                            return;
                        }
                        // SAFETY: the guard above established
                        // `value_off + 8 <= row.len()`, so the 8 bytes read
                        // here are in bounds; `[u8; 8]` has alignment 1, so
                        // any byte address is suitably aligned.
                        let raw = unsafe { row.as_ptr().add(value_off).cast::<[u8; 8]>().read() };
                        let $v: f64 = f64::from_le_bytes(raw);
                        $body
                    })
                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
            }
            None => {
                $self
                    .catalog
                    .for_each_row_raw($table, |_rid, row| {
                        // Length guard: skip corrupt/truncated rows.
                        let row_len = row.len();
                        if row_len <= 2 + null_byte_idx || row_len < value_off + 8 {
                            return;
                        }
                        // SAFETY: `2 + null_byte_idx < row.len()` was checked
                        // by the guard above.
                        let null_bits = unsafe { *row.get_unchecked(2 + null_byte_idx) };
                        if (null_bits >> null_bit) & 1 != 0 {
                            return;
                        }
                        // SAFETY: `value_off + 8 <= row.len()` was checked by
                        // the guard above; `[u8; 8]` has alignment 1.
                        let raw = unsafe { row.as_ptr().add(value_off).cast::<[u8; 8]>().read() };
                        let $v: f64 = f64::from_le_bytes(raw);
                        $body
                    })
                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
            }
        }
    }};
}
210
211// Submodules that use the macros above — must be declared after macro_rules!.
212mod plan_exec;
213mod prepared;
214
215#[cfg(test)]
216mod tests;
217
218// Re-exports for the public API
219pub use self::prepared::PreparedQuery;
220
221use self::plan_exec::{
222    compute_group_aggregate, execute_window, format_plan_tree, hash_join,
223    lower_unindexed_range_scans, range_matches, synthesize_range_predicate,
224    try_extract_equi_join_keys,
225};
226
227/// Mission infra-1: classify a parsed statement as read-only vs. mutating.
228/// Used by [`Engine::execute_powql_readonly`] and by the server handler
229/// to decide between the RwLock reader and writer sides. `Union` recurses
230/// because each side can independently be read/write (though in practice
231/// both sides are reads — the parser only builds Union from query shapes).
232pub fn is_read_only_statement(stmt: &Statement) -> bool {
233    match stmt {
234        Statement::Query(_) => true,
235        Statement::Union(u) => is_read_only_statement(&u.left) && is_read_only_statement(&u.right),
236        Statement::Insert(_)
237        | Statement::Upsert(_)
238        | Statement::UpdateQuery(_)
239        | Statement::DeleteQuery(_)
240        | Statement::CreateType(_)
241        | Statement::AlterTable(_)
242        | Statement::DropTable(_)
243        | Statement::CreateView(_)
244        | Statement::RefreshView(_)
245        | Statement::DropView(_) => false,
246        Statement::Begin | Statement::Commit | Statement::Rollback => false,
247        Statement::Explain(inner) => is_read_only_statement(inner),
248    }
249}
250
/// PowDB query engine: owns the storage catalog, the shared plan cache, the
/// materialized-view registry, and the per-engine transaction flag.
pub struct Engine {
    /// Storage catalog that every scan, lookup, and WAL sync in this file
    /// goes through.
    catalog: Catalog,
    /// Mission D9 — cached parsed+planned query trees keyed by canonical
    /// hash. Saves the ~3μs parse+plan cost on repeat queries that differ
    /// only in literal values.
    ///
    /// Mission infra-1: wrapped in `Mutex` so the read path can be driven
    /// by `&self`. The critical section is extremely short — a single
    /// hashmap lookup + plan clone on a hit, or a single insert on a miss.
    /// A full `RwLock` would be over-engineered here; the contention window
    /// is smaller than the read-path scan work it gates.
    plan_cache: Mutex<PlanCache>,
    /// Mission C Phase 13: reusable `Vec<Value>` scratch buffer for the
    /// prepared-insert fast path. `execute_prepared` used to allocate a
    /// fresh `vec![Value::Empty; n_cols]` on every insert; recycling this
    /// buffer shaves one heap alloc per row on `insert_batch_1k`.
    insert_values_scratch: Vec<Value>,
    /// Materialized view registry: tracks view definitions, dependencies,
    /// and dirty state. Views are backed by regular catalog tables; this
    /// registry adds the lifecycle metadata.
    view_registry: ViewRegistry,
    /// While `true`, `execute_powql` skips its per-statement WAL sync.
    /// Starts out `false` in `Engine::new`.
    /// NOTE(review): presumably toggled by the Begin / Commit / Rollback
    /// handling in `execute_plan` — confirm against that code.
    in_transaction: bool,
}
274
275impl Engine {
276    /// Open or create a PowDB engine rooted at `data_dir`.
277    ///
278    /// If the directory already contains a catalog, it is reopened.
279    /// Otherwise a fresh empty database is created.
280    ///
281    /// # Examples
282    ///
283    /// ```
284    /// use powdb_query::executor::Engine;
285    ///
286    /// let dir = tempfile::tempdir().unwrap();
287    /// let engine = Engine::new(dir.path()).unwrap();
288    /// // Engine is ready — the directory now contains a catalog.
289    /// ```
290    pub fn new(data_dir: &Path) -> io::Result<Self> {
291        std::fs::create_dir_all(data_dir)?;
292        // Try to reopen an existing database first; only create a fresh
293        // catalog when there isn't one already on disk.
294        let catalog = match Catalog::open(data_dir) {
295            Ok(c) => {
296                info!(data_dir = %data_dir.display(), "engine reopened existing database");
297                c
298            }
299            Err(e) if e.kind() == io::ErrorKind::NotFound => {
300                info!(data_dir = %data_dir.display(), "engine initialized fresh database");
301                Catalog::create(data_dir)?
302            }
303            Err(e) => return Err(e),
304        };
305        let view_registry =
306            ViewRegistry::open(data_dir).unwrap_or_else(|_| ViewRegistry::new(data_dir));
307        Ok(Engine {
308            catalog,
309            plan_cache: Mutex::new(PlanCache::new(PLAN_CACHE_CAPACITY)),
310            insert_values_scratch: Vec::new(),
311            view_registry,
312            in_transaction: false,
313        })
314    }
315
316    /// Parse + plan + execute a PowQL query.
317    ///
318    /// # Examples
319    ///
320    /// ```
321    /// use powdb_query::executor::Engine;
322    /// use powdb_query::result::QueryResult;
323    ///
324    /// let dir = tempfile::tempdir().unwrap();
325    /// let mut engine = Engine::new(dir.path()).unwrap();
326    ///
327    /// // Create a table and insert a row.
328    /// engine.execute_powql("type User { required name: str, age: int }").unwrap();
329    /// engine.execute_powql(r#"insert User { name := "Alice", age := 30 }"#).unwrap();
330    ///
331    /// // Query rows back.
332    /// let result = engine.execute_powql("User").unwrap();
333    /// assert_eq!(result.row_count(), 1);
334    /// ```
335    ///
336    /// Mission D6 — tracing collapse: the previous implementation ran 4
337    /// `Instant::now()` + 3 `elapsed().as_micros()` calls + formatted an
338    /// `info!` span on every query, even when tracing was disabled. On a
339    /// sub-microsecond `point_lookup_indexed` call that overhead was
340    /// 100-200ns — 20%+ of the whole query. We now measure time only when
341    /// INFO is actually enabled via `tracing::enabled!`, and we moved the
342    /// noisy `debug!(?plan)` line behind the same gate so the Debug
343    /// formatter can't run unconditionally either.
344    ///
345    /// Mission D9 — plan cache: on the hot path we canonicalise the query
346    /// text (lex + FNV-1a hash with literal values stripped), check the
347    /// cache, and on a hit substitute the new literals into a clone of the
348    /// cached plan. This skips re-lexing, re-parsing, and re-planning —
349    /// around 3μs per call on bench workloads. On a miss we plan as before
350    /// and insert the plan under its canonical hash.
351    pub fn execute_powql(&mut self, input: &str) -> Result<QueryResult, QueryError> {
352        // Hot path: tracing disabled. Zero syscalls, zero formatting.
353        if !tracing::enabled!(Level::INFO) {
354            // D9: try the plan cache first. Canonicalisation lexes the
355            // query once; on a hit we skip the parser and planner entirely.
356            if let Ok((hash, literals)) = canonicalize(input) {
357                let cached = self
358                    .plan_cache
359                    .lock()
360                    .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
361                    .get_with_substitution(hash, &literals);
362                if let Some(plan) = cached {
363                    let plan = lower_unindexed_range_scans(&self.catalog, &plan);
364                    let result = self.execute_plan(&plan);
365                    // Mission B (post-review): statement-boundary WAL
366                    // group commit. Catalog::wal_log now only appends;
367                    // the fsync happens here exactly once per statement.
368                    // `sync_wal` is a no-op when nothing was buffered
369                    // (pure reads pay zero fsync).
370                    if !self.in_transaction {
371                        self.catalog
372                            .sync_wal()
373                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
374                    }
375                    return result;
376                }
377                // Miss — plan, insert, execute.
378                return match planner::plan(input) {
379                    Ok(plan) => {
380                        self.plan_cache
381                            .lock()
382                            .map_err(|e| {
383                                QueryError::Execution(format!("plan cache lock poisoned: {e}"))
384                            })?
385                            .insert(hash, plan.clone());
386                        let plan = lower_unindexed_range_scans(&self.catalog, &plan);
387                        let result = self.execute_plan(&plan);
388                        if !self.in_transaction {
389                            self.catalog
390                                .sync_wal()
391                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
392                        }
393                        result
394                    }
395                    Err(e) => Err(QueryError::Parse(e.to_string())),
396                };
397            }
398            // Lex error — fall through to the planner so the caller gets a
399            // consistent error shape.
400            return match planner::plan(input) {
401                Ok(plan) => {
402                    let plan = lower_unindexed_range_scans(&self.catalog, &plan);
403                    let result = self.execute_plan(&plan);
404                    if !self.in_transaction {
405                        self.catalog
406                            .sync_wal()
407                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
408                    }
409                    result
410                }
411                Err(e) => Err(QueryError::Parse(e.to_string())),
412            };
413        }
414
415        // Instrumented path — only taken under explicit tracing subscribers.
416        let total_start = Instant::now();
417        let plan_start = Instant::now();
418        let plan = planner::plan(input).map_err(|e| {
419            let msg = e.to_string();
420            error!(query = %input, error = %msg, "query plan failed");
421            QueryError::Parse(msg)
422        })?;
423        let plan_us = plan_start.elapsed().as_micros();
424
425        let exec_start = Instant::now();
426        let plan = lower_unindexed_range_scans(&self.catalog, &plan);
427        let result = self.execute_plan(&plan);
428        if !self.in_transaction {
429            self.catalog
430                .sync_wal()
431                .map_err(|e| QueryError::StorageError(e.to_string()))?;
432        }
433        let exec_us = exec_start.elapsed().as_micros();
434
435        let total_us = total_start.elapsed().as_micros();
436        match &result {
437            Ok(r) => {
438                info!(
439                    query = %input,
440                    plan_us = plan_us,
441                    exec_us = exec_us,
442                    total_us = total_us,
443                    rows = r.row_count(),
444                    "query ok"
445                );
446            }
447            Err(e) => {
448                error!(
449                    query = %input,
450                    plan_us = plan_us,
451                    exec_us = exec_us,
452                    error = %e,
453                    "query failed"
454                );
455            }
456        }
457        result
458    }
459
460    /// Plan cache stats — useful for benches and debugging.
461    pub fn plan_cache_stats(&self) -> (u64, u64, usize) {
462        let cache = self.plan_cache.lock().unwrap_or_else(|e| e.into_inner());
463        (cache.hits, cache.misses, cache.len())
464    }
465
466    /// Mission infra-1: read-only entry point.
467    ///
468    /// Parses + plans + executes a PowQL query using only a shared borrow
469    /// on the engine. Rejects any statement that would mutate state
470    /// (Insert/Update/Delete/CreateTable/AlterTable/DropTable/CreateView/
471    /// RefreshView/DropView) by returning [`READONLY_NEEDS_WRITE`] so the
472    /// caller can escalate to the write lock.
473    ///
474    /// Also returns [`READONLY_NEEDS_WRITE`] if a materialized view in the
475    /// query is dirty — refreshing one requires `&mut self`, so the caller
476    /// must retake the write lock for the first refresh.
477    ///
478    /// This method is the concurrent-read fast path behind
479    /// `Arc<RwLock<Engine>>`: multiple threads can call it simultaneously
480    /// under a shared `.read()` lock and each will scan independently.
481    pub fn execute_powql_readonly(&self, input: &str) -> Result<QueryResult, QueryError> {
482        // Parse the statement first so we can classify read vs. write
483        // without touching the catalog. This is the same lex+parse cost
484        // the hot path would pay anyway.
485        let stmt = crate::parser::parse(input).map_err(|e| QueryError::Parse(e.to_string()))?;
486        if !is_read_only_statement(&stmt) {
487            return Err(QueryError::ReadonlyNeedsWrite);
488        }
489
490        // Try the plan cache first — identical hash scheme to
491        // `execute_powql` so both paths share cache state. The mutex
492        // section is just a hashmap lookup + plan clone.
493        if let Ok((hash, literals)) = canonicalize(input) {
494            let cached = self
495                .plan_cache
496                .lock()
497                .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
498                .get_with_substitution(hash, &literals);
499            if let Some(plan) = cached {
500                let plan = lower_unindexed_range_scans(&self.catalog, &plan);
501                return self.execute_plan_readonly(&plan);
502            }
503            // Miss: plan + insert + execute. The planner is pure, so this
504            // is safe from `&self`.
505            let plan = crate::planner::plan_statement(stmt)
506                .map_err(|e| QueryError::Parse(e.to_string()))?;
507            self.plan_cache
508                .lock()
509                .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
510                .insert(hash, plan.clone());
511            let plan = lower_unindexed_range_scans(&self.catalog, &plan);
512            return self.execute_plan_readonly(&plan);
513        }
514        // Lex error — fall through to the planner for a consistent error
515        // shape (though `parse` above would usually have caught it).
516        let plan =
517            crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
518        let plan = lower_unindexed_range_scans(&self.catalog, &plan);
519        self.execute_plan_readonly(&plan)
520    }
521
522    /// Read-only version of [`Engine::execute_plan`]. Dispatches the
523    /// read-path plan variants by calling `&self` helpers and errors with
524    /// [`READONLY_NEEDS_WRITE`] on any write variant. This is the
525    /// recursion target for composite read plans under the RwLock reader.
526    ///
527    /// The dispatch mirrors `execute_plan` for the read branches but does
528    /// not carry any of the fast-paths that need `&mut self` (e.g. plan-
529    /// cache mutation on inner subqueries is handled via the shared mutex
530    /// in [`Engine::execute_powql_readonly`]; in-flight subquery
531    /// materialisation uses [`Engine::materialize_subqueries_readonly`]).
532    fn execute_plan_readonly(&self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
533        match plan {
534            PlanNode::SeqScan { table } => {
535                // Dirty view means we'd need to refresh it — can't do that
536                // under `&self`. Escalate to the write path.
537                if self.view_registry.is_dirty(table) {
538                    return Err(QueryError::ReadonlyNeedsWrite);
539                }
540                let schema = self
541                    .catalog
542                    .schema(table)
543                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
544                    .clone();
545                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
546                let rows: Vec<Vec<Value>> = self
547                    .catalog
548                    .scan(table)
549                    .map_err(|e| e.to_string())?
550                    .map(|(_, row)| row)
551                    .collect();
552                Ok(QueryResult::Rows { columns, rows })
553            }
554
555            PlanNode::AliasScan { table, alias } => {
556                let schema = self
557                    .catalog
558                    .schema(table)
559                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
560                    .clone();
561                let columns: Vec<String> = schema
562                    .columns
563                    .iter()
564                    .map(|c| format!("{alias}.{}", c.name))
565                    .collect();
566                let rows: Vec<Vec<Value>> = self
567                    .catalog
568                    .scan(table)
569                    .map_err(|e| e.to_string())?
570                    .map(|(_, row)| row)
571                    .collect();
572                Ok(QueryResult::Rows { columns, rows })
573            }
574
575            PlanNode::IndexScan { table, column, key } => {
576                let schema = self
577                    .catalog
578                    .schema(table)
579                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
580                    .clone();
581                let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
582                let key_value = literal_to_value(key)?;
583                let tbl = self
584                    .catalog
585                    .get_table(table)
586                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
587
588                if tbl.has_index(column) {
589                    // Use index_lookup_all to handle both unique and
590                    // non-unique indexes — returns all matching RowIds.
591                    let rids = tbl.index_lookup_all(column, &key_value);
592                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
593                    for rid in rids {
594                        if let Some(data) = tbl.heap.get(rid) {
595                            rows.push(decode_row(&tbl.schema, &data));
596                        }
597                    }
598                    return Ok(QueryResult::Rows { columns, rows });
599                }
600
601                // No index: synthetic eq predicate + compiled scan.
602                let fast = FastLayout::new(&schema);
603                let synth_pred = Expr::BinaryOp(
604                    Box::new(Expr::Field(column.clone())),
605                    BinOp::Eq,
606                    Box::new(key.clone()),
607                );
608                if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, &schema) {
609                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
610                    self.catalog
611                        .for_each_row_raw(table, |_rid, data| {
612                            if compiled(data) {
613                                rows.push(decode_row(&schema, data));
614                            }
615                        })
616                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
617                    return Ok(QueryResult::Rows { columns, rows });
618                }
619
620                // Last resort: slow eq-check.
621                let col_idx =
622                    schema
623                        .column_index(column)
624                        .ok_or_else(|| QueryError::ColumnNotFound {
625                            table: String::new(),
626                            column: column.clone(),
627                        })?;
628                let rows: Vec<Vec<Value>> = tbl
629                    .scan()
630                    .filter_map(|(_, row)| {
631                        if row[col_idx] == key_value {
632                            Some(row)
633                        } else {
634                            None
635                        }
636                    })
637                    .collect();
638                Ok(QueryResult::Rows { columns, rows })
639            }
640
641            PlanNode::RangeScan {
642                table,
643                column,
644                start,
645                end,
646            } => {
647                let tbl = self
648                    .catalog
649                    .get_table(table)
650                    .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
651                let columns: Vec<String> =
652                    tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
653                let schema = tbl.schema.clone();
654
655                let start_val = match start {
656                    Some((expr, _)) => Some(literal_to_value(expr)?),
657                    None => None,
658                };
659                let end_val = match end {
660                    Some((expr, _)) => Some(literal_to_value(expr)?),
661                    None => None,
662                };
663                let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
664                let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
665
666                // Range scans only use the btree fast path for unique indexes.
667                // Non-unique indexes store composite keys that don't compare
668                // directly against raw column values.
669                if tbl.is_index_unique(column) == Some(true) {
670                    if let Some(btree) = tbl.index(column) {
671                        let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
672                            (Some(s), Some(e)) => btree.range(s, e).collect(),
673                            (Some(s), None) => btree.range_from(s),
674                            (None, Some(e)) => btree.range_to(e),
675                            (None, None) => {
676                                // Unbounded both sides — equivalent to seq scan.
677                                let rows: Vec<Vec<Value>> =
678                                    tbl.scan().map(|(_, row)| row).collect();
679                                return Ok(QueryResult::Rows { columns, rows });
680                            }
681                        };
682                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
683                        for (key, rid) in hits {
684                            // Filter for exclusive bounds.
685                            if !start_inclusive {
686                                if let Some(ref s) = start_val {
687                                    if &key == s {
688                                        continue;
689                                    }
690                                }
691                            }
692                            if !end_inclusive {
693                                if let Some(ref e) = end_val {
694                                    if &key == e {
695                                        continue;
696                                    }
697                                }
698                            }
699                            if let Some(data) = tbl.heap.get(rid) {
700                                rows.push(decode_row(&schema, &data));
701                            }
702                        }
703                        return Ok(QueryResult::Rows { columns, rows });
704                    }
705                }
706
707                // Fallback: no index — synthesize the range predicate and scan.
708                let fast = FastLayout::new(&schema);
709                let synth = synthesize_range_predicate(column, start, end);
710                if let Some(compiled) = compile_predicate(&synth, &columns, &fast, &schema) {
711                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
712                    self.catalog
713                        .for_each_row_raw(table, |_rid, data| {
714                            if compiled(data) {
715                                rows.push(decode_row(&schema, data));
716                            }
717                        })
718                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
719                    return Ok(QueryResult::Rows { columns, rows });
720                }
721
722                // Last resort: decoded row eval.
723                let col_idx =
724                    schema
725                        .column_index(column)
726                        .ok_or_else(|| QueryError::ColumnNotFound {
727                            table: String::new(),
728                            column: column.clone(),
729                        })?;
730                let rows: Vec<Vec<Value>> = tbl
731                    .scan()
732                    .filter(|(_, row)| {
733                        range_matches(
734                            &row[col_idx],
735                            &start_val,
736                            start_inclusive,
737                            &end_val,
738                            end_inclusive,
739                        )
740                    })
741                    .map(|(_, row)| row)
742                    .collect();
743                Ok(QueryResult::Rows { columns, rows })
744            }
745
746            PlanNode::Filter { input, predicate } => {
747                // Materialise subqueries using the `&self` variant.
748                // Uncorrelated subqueries are replaced with InList/Bool;
749                // correlated ones are left as InSubquery/ExistsSubquery
750                // for per-row materialisation below.
751                let materialized;
752                let predicate = if contains_subquery(predicate) {
753                    materialized = self.materialize_subqueries_readonly(predicate)?;
754                    &materialized
755                } else {
756                    predicate
757                };
758
759                // Correlated subquery path: per-row materialisation.
760                if contains_subquery(predicate) {
761                    let result = self.execute_plan_readonly(input)?;
762                    return match result {
763                        QueryResult::Rows { columns, rows } => {
764                            let mut filtered = Vec::new();
765                            for row in rows {
766                                let row_pred = self.materialize_correlated_for_row_readonly(
767                                    predicate, &row, &columns,
768                                )?;
769                                if eval_predicate(&row_pred, &row, &columns) {
770                                    filtered.push(row);
771                                }
772                            }
773                            Ok(QueryResult::Rows {
774                                columns,
775                                rows: filtered,
776                            })
777                        }
778                        _ => Err("filter requires row input".into()),
779                    };
780                }
781
782                // Fused Filter+SeqScan fast path.
783                if let PlanNode::SeqScan { table } = input.as_ref() {
784                    if self.view_registry.is_dirty(table) {
785                        return Err(QueryError::ReadonlyNeedsWrite);
786                    }
787                    let schema = self
788                        .catalog
789                        .schema(table)
790                        .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
791                        .clone();
792                    let columns: Vec<String> =
793                        schema.columns.iter().map(|c| c.name.clone()).collect();
794                    let fast = FastLayout::new(&schema);
795                    let row_layout = RowLayout::new(&schema);
796                    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
797
798                    if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
799                        self.catalog
800                            .for_each_row_raw(table, |_rid, data| {
801                                if compiled(data) {
802                                    rows.push(decode_row(&schema, data));
803                                }
804                            })
805                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
806                    } else {
807                        let pred_cols = predicate_column_indices(predicate, &columns);
808                        self.catalog
809                            .for_each_row_raw(table, |_rid, data| {
810                                let pred_row =
811                                    decode_selective(&schema, &row_layout, data, &pred_cols);
812                                if eval_predicate(predicate, &pred_row, &columns) {
813                                    rows.push(decode_row(&schema, data));
814                                }
815                            })
816                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
817                    }
818
819                    return Ok(QueryResult::Rows { columns, rows });
820                }
821
822                // General path.
823                let result = self.execute_plan_readonly(input)?;
824                match result {
825                    QueryResult::Rows { columns, rows } => {
826                        let filtered: Vec<Vec<Value>> = rows
827                            .into_iter()
828                            .filter(|row| eval_predicate(predicate, row, &columns))
829                            .collect();
830                        Ok(QueryResult::Rows {
831                            columns,
832                            rows: filtered,
833                        })
834                    }
835                    _ => Err("filter requires row input".into()),
836                }
837            }
838
839            PlanNode::Project { input, fields } => {
840                // Fast path: Project over IndexScan. Avoids full-row decode
841                // by calling decode_column only for projected fields.
842                if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
843                    let key_value = literal_to_value(key)?;
844                    let tbl = self
845                        .catalog
846                        .get_table(table)
847                        .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
848                    let schema = &tbl.schema;
849                    let layout = tbl.row_layout();
850
851                    let proj_columns: Vec<String> = fields
852                        .iter()
853                        .map(|f| {
854                            f.alias.clone().unwrap_or_else(|| match &f.expr {
855                                Expr::Field(name) => name.clone(),
856                                _ => "?".into(),
857                            })
858                        })
859                        .collect();
860
861                    let proj_indices: Vec<usize> = fields
862                        .iter()
863                        .filter_map(|f| {
864                            if let Expr::Field(name) = &f.expr {
865                                schema.column_index(name)
866                            } else {
867                                None
868                            }
869                        })
870                        .collect();
871
872                    if tbl.has_index(column) {
873                        let rids = tbl.index_lookup_all(column, &key_value);
874                        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
875                        for rid in rids {
876                            if let Some(data) = tbl.heap.get(rid) {
877                                let row: Vec<Value> = proj_indices
878                                    .iter()
879                                    .map(|&ci| decode_column(schema, layout, &data, ci))
880                                    .collect();
881                                rows.push(row);
882                            }
883                        }
884                        return Ok(QueryResult::Rows {
885                            columns: proj_columns,
886                            rows,
887                        });
888                    }
889                }
890
891                // Fast paths over Limit(Sort(...)) / Limit(Filter(...)) / Limit(SeqScan).
892                if let PlanNode::Limit {
893                    input: inner,
894                    count: limit_expr,
895                } = input.as_ref()
896                {
897                    if let PlanNode::Sort {
898                        input: sort_input,
899                        keys,
900                    } = inner.as_ref()
901                    {
902                        if keys.len() == 1 {
903                            let sort_field = &keys[0].field;
904                            let descending = keys[0].descending;
905                            let limit = match limit_expr {
906                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
907                                _ => usize::MAX,
908                            };
909                            let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
910                                match sort_input.as_ref() {
911                                    PlanNode::SeqScan { table } => (Some(table.as_str()), None),
912                                    PlanNode::Filter {
913                                        input: fi,
914                                        predicate,
915                                    } => {
916                                        if let PlanNode::SeqScan { table } = fi.as_ref() {
917                                            (Some(table.as_str()), Some(predicate))
918                                        } else {
919                                            (None, None)
920                                        }
921                                    }
922                                    _ => (None, None),
923                                };
924                            if let Some(table) = table_opt {
925                                if let Some(result) = self.project_filter_sort_limit_fast(
926                                    table, fields, sort_field, descending, limit, pred_opt,
927                                )? {
928                                    return Ok(result);
929                                }
930                            }
931                        }
932                    }
933                    if let PlanNode::Filter {
934                        input: fi,
935                        predicate,
936                    } = inner.as_ref()
937                    {
938                        if let PlanNode::SeqScan { table } = fi.as_ref() {
939                            let limit = match limit_expr {
940                                Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
941                                _ => usize::MAX,
942                            };
943                            if let Some(result) = self.project_filter_limit_fast(
944                                table,
945                                fields,
946                                limit,
947                                Some(predicate),
948                            )? {
949                                return Ok(result);
950                            }
951                        }
952                    }
953                    if let PlanNode::SeqScan { table } = inner.as_ref() {
954                        let limit = match limit_expr {
955                            Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
956                            _ => usize::MAX,
957                        };
958                        if let Some(result) =
959                            self.project_filter_limit_fast(table, fields, limit, None)?
960                        {
961                            return Ok(result);
962                        }
963                    }
964                }
965
966                // Project(Filter(SeqScan)) without Limit.
967                if let PlanNode::Filter {
968                    input: fi,
969                    predicate,
970                } = input.as_ref()
971                {
972                    if let PlanNode::SeqScan { table } = fi.as_ref() {
973                        if let Some(result) = self.project_filter_limit_fast(
974                            table,
975                            fields,
976                            usize::MAX,
977                            Some(predicate),
978                        )? {
979                            return Ok(result);
980                        }
981                    }
982                }
983
984                // Project(SeqScan) without Filter or Limit.
985                if let PlanNode::SeqScan { table } = input.as_ref() {
986                    if let Some(result) =
987                        self.project_filter_limit_fast(table, fields, usize::MAX, None)?
988                    {
989                        return Ok(result);
990                    }
991                }
992
993                // Generic path.
994                let result = self.execute_plan_readonly(input)?;
995                match result {
996                    QueryResult::Rows { columns, rows } => {
997                        let proj_columns: Vec<String> = fields
998                            .iter()
999                            .map(|f| {
1000                                f.alias.clone().unwrap_or_else(|| match &f.expr {
1001                                    Expr::Field(name) => name.clone(),
1002                                    Expr::QualifiedField { qualifier, field } => {
1003                                        format!("{qualifier}.{field}")
1004                                    }
1005                                    _ => "?".into(),
1006                                })
1007                            })
1008                            .collect();
1009                        let proj_rows: Vec<Vec<Value>> = rows
1010                            .iter()
1011                            .map(|row| {
1012                                fields
1013                                    .iter()
1014                                    .map(|f| eval_expr(&f.expr, row, &columns))
1015                                    .collect()
1016                            })
1017                            .collect();
1018                        Ok(QueryResult::Rows {
1019                            columns: proj_columns,
1020                            rows: proj_rows,
1021                        })
1022                    }
1023                    _ => Err("project requires row input".into()),
1024                }
1025            }
1026
1027            PlanNode::Sort { input, keys } => {
1028                let result = self.execute_plan_readonly(input)?;
1029                match result {
1030                    QueryResult::Rows { columns, mut rows } => {
1031                        if rows.len() > MAX_SORT_ROWS {
1032                            return Err(QueryError::SortLimitExceeded);
1033                        }
1034                        let key_indices: Vec<(usize, bool)> = keys
1035                            .iter()
1036                            .map(|k| {
1037                                columns
1038                                    .iter()
1039                                    .position(|c| c == &k.field)
1040                                    .map(|idx| (idx, k.descending))
1041                                    .ok_or_else(|| QueryError::ColumnNotFound {
1042                                        table: String::new(),
1043                                        column: k.field.clone(),
1044                                    })
1045                            })
1046                            .collect::<Result<_, QueryError>>()?;
1047                        rows.sort_by(|a, b| {
1048                            for &(col_idx, descending) in &key_indices {
1049                                let cmp = a[col_idx].cmp(&b[col_idx]);
1050                                let cmp = if descending { cmp.reverse() } else { cmp };
1051                                if cmp != std::cmp::Ordering::Equal {
1052                                    return cmp;
1053                                }
1054                            }
1055                            std::cmp::Ordering::Equal
1056                        });
1057                        Ok(QueryResult::Rows { columns, rows })
1058                    }
1059                    _ => Err("sort requires row input".into()),
1060                }
1061            }
1062
1063            PlanNode::Limit { input, count } => {
1064                let result = self.execute_plan_readonly(input)?;
1065                let n = match count {
1066                    Expr::Literal(Literal::Int(v)) => *v as usize,
1067                    _ => return Err("limit must be integer literal".into()),
1068                };
1069                match result {
1070                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1071                        columns,
1072                        rows: rows.into_iter().take(n).collect(),
1073                    }),
1074                    _ => Err("limit requires row input".into()),
1075                }
1076            }
1077
1078            PlanNode::Offset { input, count } => {
1079                let result = self.execute_plan_readonly(input)?;
1080                let n = match count {
1081                    Expr::Literal(Literal::Int(v)) => *v as usize,
1082                    _ => return Err("offset must be integer literal".into()),
1083                };
1084                match result {
1085                    QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1086                        columns,
1087                        rows: rows.into_iter().skip(n).collect(),
1088                    }),
1089                    _ => Err("offset requires row input".into()),
1090                }
1091            }
1092
1093            PlanNode::Aggregate {
1094                input,
1095                function,
1096                field,
1097            } => {
1098                // Fast path: count() over SeqScan.
1099                if *function == AggFunc::Count {
1100                    if let PlanNode::SeqScan { table } = input.as_ref() {
1101                        let mut count: i64 = 0;
1102                        self.catalog
1103                            .for_each_row_raw(table, |_rid, _data| {
1104                                count += 1;
1105                            })
1106                            .map_err(|e| QueryError::StorageError(e.to_string()))?;
1107                        return Ok(QueryResult::Scalar(Value::Int(count)));
1108                    }
1109                    if let PlanNode::Filter {
1110                        input: inner,
1111                        predicate,
1112                    } = input.as_ref()
1113                    {
1114                        if let PlanNode::SeqScan { table } = inner.as_ref() {
1115                            let schema = self
1116                                .catalog
1117                                .schema(table)
1118                                .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
1119                                .clone();
1120                            let columns: Vec<String> =
1121                                schema.columns.iter().map(|c| c.name.clone()).collect();
1122                            let fast = FastLayout::new(&schema);
1123                            let row_layout = RowLayout::new(&schema);
1124
1125                            if let Some(compiled) =
1126                                compile_predicate(predicate, &columns, &fast, &schema)
1127                            {
1128                                let mut count: i64 = 0;
1129                                self.catalog
1130                                    .for_each_row_raw(table, |_rid, data| {
1131                                        if compiled(data) {
1132                                            count += 1;
1133                                        }
1134                                    })
1135                                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1136                                return Ok(QueryResult::Scalar(Value::Int(count)));
1137                            }
1138
1139                            let pred_cols = predicate_column_indices(predicate, &columns);
1140                            let mut count: i64 = 0;
1141                            self.catalog
1142                                .for_each_row_raw(table, |_rid, data| {
1143                                    let pred_row =
1144                                        decode_selective(&schema, &row_layout, data, &pred_cols);
1145                                    if eval_predicate(predicate, &pred_row, &columns) {
1146                                        count += 1;
1147                                    }
1148                                })
1149                                .map_err(|e| QueryError::StorageError(e.to_string()))?;
1150                            return Ok(QueryResult::Scalar(Value::Int(count)));
1151                        }
1152                    }
1153                }
1154
1155                // Fast path: sum/avg/min/max over single fixed-size numeric.
1156                if matches!(
1157                    function,
1158                    AggFunc::Sum
1159                        | AggFunc::Avg
1160                        | AggFunc::Min
1161                        | AggFunc::Max
1162                        | AggFunc::CountDistinct
1163                ) {
1164                    if let Some(col) = field.as_ref() {
1165                        let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1166                            match input.as_ref() {
1167                                PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1168                                PlanNode::Filter {
1169                                    input: inner,
1170                                    predicate,
1171                                } => {
1172                                    if let PlanNode::SeqScan { table } = inner.as_ref() {
1173                                        (Some(table.as_str()), Some(predicate))
1174                                    } else {
1175                                        (None, None)
1176                                    }
1177                                }
1178                                _ => (None, None),
1179                            };
1180                        if let Some(table) = table_opt {
1181                            if let Some(result) =
1182                                self.agg_single_col_fast(table, col, *function, pred_opt)?
1183                            {
1184                                return Ok(result);
1185                            }
1186                        }
1187                    }
1188                }
1189
1190                // Generic path.
1191                let result = self.execute_plan_readonly(input)?;
1192                match result {
1193                    QueryResult::Rows { columns, rows } => match function {
1194                        AggFunc::Count => Ok(QueryResult::Scalar(Value::Int(rows.len() as i64))),
1195                        AggFunc::CountDistinct => {
1196                            let col = field.as_ref().ok_or("count distinct requires field")?;
1197                            let idx = columns
1198                                .iter()
1199                                .position(|c| c == col)
1200                                .ok_or("col not found")?;
1201                            let mut seen = std::collections::HashSet::new();
1202                            for row in &rows {
1203                                let v = &row[idx];
1204                                if !v.is_empty() {
1205                                    seen.insert(v.clone());
1206                                }
1207                            }
1208                            Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
1209                        }
1210                        AggFunc::Avg => {
1211                            let col = field.as_ref().ok_or("avg requires field")?;
1212                            let idx = columns
1213                                .iter()
1214                                .position(|c| c == col)
1215                                .ok_or("col not found")?;
1216                            let sum: f64 = rows
1217                                .iter()
1218                                .filter_map(|r| match &r[idx] {
1219                                    Value::Int(v) => Some(*v as f64),
1220                                    Value::Float(v) => Some(*v),
1221                                    _ => None,
1222                                })
1223                                .sum();
1224                            let count = rows.len() as f64;
1225                            Ok(QueryResult::Scalar(Value::Float(sum / count)))
1226                        }
1227                        AggFunc::Sum => {
1228                            let col = field.as_ref().ok_or("sum requires field")?;
1229                            let idx = columns
1230                                .iter()
1231                                .position(|c| c == col)
1232                                .ok_or("col not found")?;
1233                            let mut int_sum: i64 = 0;
1234                            let mut float_sum: f64 = 0.0;
1235                            let mut saw_float = false;
1236                            for r in &rows {
1237                                match &r[idx] {
1238                                    Value::Int(v) => int_sum += *v,
1239                                    Value::Float(v) => {
1240                                        float_sum += *v;
1241                                        saw_float = true;
1242                                    }
1243                                    _ => {}
1244                                }
1245                            }
1246                            let result = if saw_float {
1247                                Value::Float(float_sum + int_sum as f64)
1248                            } else {
1249                                Value::Int(int_sum)
1250                            };
1251                            Ok(QueryResult::Scalar(result))
1252                        }
1253                        AggFunc::Min | AggFunc::Max => {
1254                            let col = field.as_ref().ok_or("min/max requires field")?;
1255                            let idx = columns
1256                                .iter()
1257                                .position(|c| c == col)
1258                                .ok_or("col not found")?;
1259                            let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
1260                            let result = if *function == AggFunc::Min {
1261                                vals.into_iter().min().cloned()
1262                            } else {
1263                                vals.into_iter().max().cloned()
1264                            };
1265                            Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
1266                        }
1267                    },
1268                    _ => Err("aggregate requires row input".into()),
1269                }
1270            }
1271
1272            PlanNode::Distinct { input } => {
1273                let result = self.execute_plan_readonly(input)?;
1274                match result {
1275                    QueryResult::Rows { columns, rows } => {
1276                        let mut seen = std::collections::HashSet::new();
1277                        let mut unique_rows = Vec::new();
1278                        for row in rows {
1279                            if seen.insert(row.clone()) {
1280                                unique_rows.push(row);
1281                            }
1282                        }
1283                        Ok(QueryResult::Rows {
1284                            columns,
1285                            rows: unique_rows,
1286                        })
1287                    }
1288                    other => Ok(other),
1289                }
1290            }
1291
1292            PlanNode::GroupBy {
1293                input,
1294                keys,
1295                aggregates,
1296                having,
1297            } => {
1298                let result = self.execute_plan_readonly(input)?;
1299                match result {
1300                    QueryResult::Rows { columns, rows } => {
1301                        let key_indices: Vec<usize> = keys
1302                            .iter()
1303                            .map(|k| {
1304                                columns.iter().position(|c| c == k).ok_or_else(|| {
1305                                    QueryError::ColumnNotFound {
1306                                        table: String::new(),
1307                                        column: k.clone(),
1308                                    }
1309                                })
1310                            })
1311                            .collect::<Result<Vec<_>, _>>()?;
1312
1313                        let agg_field_indices: Vec<usize> = aggregates
1314                            .iter()
1315                            .map(|a| {
1316                                if a.field == "*" {
1317                                    Ok(usize::MAX)
1318                                } else {
1319                                    columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1320                                        QueryError::ColumnNotFound {
1321                                            table: String::new(),
1322                                            column: a.field.clone(),
1323                                        }
1324                                    })
1325                                }
1326                            })
1327                            .collect::<Result<Vec<_>, _>>()?;
1328
1329                        let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1330                            rustc_hash::FxHashMap::default();
1331                        let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1332                        for (ri, row) in rows.iter().enumerate() {
1333                            let key: Vec<Value> =
1334                                key_indices.iter().map(|&i| row[i].clone()).collect();
1335                            match group_map.get(&key) {
1336                                Some(&idx) => groups[idx].1.push(ri),
1337                                None => {
1338                                    let idx = groups.len();
1339                                    group_map.insert(key.clone(), idx);
1340                                    groups.push((key, vec![ri]));
1341                                }
1342                            }
1343                        }
1344
1345                        let mut out_columns: Vec<String> = keys.clone();
1346                        for agg in aggregates.iter() {
1347                            out_columns.push(agg.output_name.clone());
1348                        }
1349
1350                        let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1351                        for (key_vals, row_indices) in &groups {
1352                            let mut row = key_vals.clone();
1353                            for (ai, agg) in aggregates.iter().enumerate() {
1354                                let col_idx = agg_field_indices[ai];
1355                                let val = compute_group_aggregate(
1356                                    agg.function,
1357                                    &rows,
1358                                    row_indices,
1359                                    col_idx,
1360                                );
1361                                row.push(val);
1362                            }
1363                            out_rows.push(row);
1364                        }
1365
1366                        if let Some(having_expr) = having {
1367                            out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1368                        }
1369
1370                        Ok(QueryResult::Rows {
1371                            columns: out_columns,
1372                            rows: out_rows,
1373                        })
1374                    }
1375                    _ => Err("group by requires row input".into()),
1376                }
1377            }
1378
1379            PlanNode::NestedLoopJoin {
1380                left,
1381                right,
1382                on,
1383                kind,
1384            } => {
1385                let left_result = self.execute_plan_readonly(left)?;
1386                let right_result = self.execute_plan_readonly(right)?;
1387                let (left_columns, left_rows) = match left_result {
1388                    QueryResult::Rows { columns, rows } => (columns, rows),
1389                    _ => return Err("join left side must produce rows".into()),
1390                };
1391                let (right_columns, right_rows) = match right_result {
1392                    QueryResult::Rows { columns, rows } => (columns, rows),
1393                    _ => return Err("join right side must produce rows".into()),
1394                };
1395
1396                if !matches!(kind, JoinKind::Cross) {
1397                    if let Some(pred) = on {
1398                        if let Some((l_idx, r_idx)) =
1399                            try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1400                        {
1401                            let result = hash_join(
1402                                left_columns,
1403                                left_rows,
1404                                right_columns,
1405                                right_rows,
1406                                l_idx,
1407                                r_idx,
1408                                *kind,
1409                            );
1410                            if let QueryResult::Rows { ref rows, .. } = result {
1411                                check_join_limit(rows.len())?;
1412                            }
1413                            return Ok(result);
1414                        }
1415                    }
1416                }
1417
1418                let n_left = left_columns.len();
1419                let n_right = right_columns.len();
1420                let mut columns = Vec::with_capacity(n_left + n_right);
1421                columns.extend(left_columns);
1422                columns.extend(right_columns);
1423
1424                let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1425                let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1426
1427                for left_row in &left_rows {
1428                    let mut matched = false;
1429                    for right_row in &right_rows {
1430                        combined.clear();
1431                        combined.extend_from_slice(left_row);
1432                        combined.extend_from_slice(right_row);
1433                        let keep = match kind {
1434                            JoinKind::Cross => true,
1435                            JoinKind::Inner | JoinKind::LeftOuter => match on {
1436                                Some(pred) => eval_predicate(pred, &combined, &columns),
1437                                None => true,
1438                            },
1439                            JoinKind::RightOuter => {
1440                                unreachable!("planner rewrites RightOuter to LeftOuter")
1441                            }
1442                        };
1443                        if keep {
1444                            rows.push(combined.clone());
1445                            check_join_limit(rows.len())?;
1446                            matched = true;
1447                        }
1448                    }
1449                    if !matched && matches!(kind, JoinKind::LeftOuter) {
1450                        let mut row = Vec::with_capacity(n_left + n_right);
1451                        row.extend_from_slice(left_row);
1452                        row.resize(n_left + n_right, Value::Empty);
1453                        rows.push(row);
1454                        check_join_limit(rows.len())?;
1455                    }
1456                }
1457
1458                Ok(QueryResult::Rows { columns, rows })
1459            }
1460
1461            PlanNode::Window { input, windows } => {
1462                let result = self.execute_plan_readonly(input)?;
1463                execute_window(result, windows)
1464            }
1465
1466            PlanNode::Union { left, right, all } => {
1467                let left_result = self.execute_plan_readonly(left)?;
1468                let right_result = self.execute_plan_readonly(right)?;
1469                let (left_cols, left_rows) = match left_result {
1470                    QueryResult::Rows { columns, rows } => (columns, rows),
1471                    _ => return Err("UNION requires query results on left side".into()),
1472                };
1473                let (_, right_rows) = match right_result {
1474                    QueryResult::Rows { columns, rows } => (columns, rows),
1475                    _ => return Err("UNION requires query results on right side".into()),
1476                };
1477                let mut combined = left_rows;
1478                if *all {
1479                    combined.extend(right_rows);
1480                } else {
1481                    let mut seen = std::collections::HashSet::new();
1482                    for row in &combined {
1483                        seen.insert(row.clone());
1484                    }
1485                    for row in right_rows {
1486                        if seen.insert(row.clone()) {
1487                            combined.push(row);
1488                        }
1489                    }
1490                }
1491                Ok(QueryResult::Rows {
1492                    columns: left_cols,
1493                    rows: combined,
1494                })
1495            }
1496
1497            PlanNode::Explain { input } => {
1498                let text = format_plan_tree(input, 0);
1499                Ok(QueryResult::Rows {
1500                    columns: vec!["plan".to_string()],
1501                    rows: text
1502                        .lines()
1503                        .map(|line| vec![Value::Str(line.to_string())])
1504                        .collect(),
1505                })
1506            }
1507
1508            // All write variants — caller must escalate to the write lock.
1509            PlanNode::Insert { .. }
1510            | PlanNode::Update { .. }
1511            | PlanNode::Delete { .. }
1512            | PlanNode::Upsert { .. }
1513            | PlanNode::CreateTable { .. }
1514            | PlanNode::AlterTable { .. }
1515            | PlanNode::DropTable { .. }
1516            | PlanNode::CreateView { .. }
1517            | PlanNode::RefreshView { .. }
1518            | PlanNode::DropView { .. }
1519            | PlanNode::Begin
1520            | PlanNode::Commit
1521            | PlanNode::Rollback => Err(QueryError::ReadonlyNeedsWrite),
1522        }
1523    }
1524
1525    /// `&self` variant of [`Engine::materialize_subqueries`]. Used by the
1526    /// read path so `Filter` predicates with `InSubquery`/`ExistsSubquery`
1527    /// children can evaluate their inner queries without taking the write
1528    /// lock. Inner queries that would themselves need a write (e.g. dirty
1529    /// view) escalate via [`READONLY_NEEDS_WRITE`] just like the top-level
1530    /// read path does.
1531    fn materialize_subqueries_readonly(&self, expr: &Expr) -> Result<Expr, QueryError> {
1532        match expr {
1533            Expr::InSubquery {
1534                expr: inner,
1535                subquery,
1536                negated,
1537            } => {
1538                if is_correlated_subquery(subquery, &self.catalog) {
1539                    // Pass through — will be materialized per-row in the
1540                    // Filter handler's correlated subquery path.
1541                    let inner = self.materialize_subqueries_readonly(inner)?;
1542                    return Ok(Expr::InSubquery {
1543                        expr: Box::new(inner),
1544                        subquery: subquery.clone(),
1545                        negated: *negated,
1546                    });
1547                }
1548                let inner = self.materialize_subqueries_readonly(inner)?;
1549                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1550                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1551                let result = self.execute_plan_readonly(&sub_plan)?;
1552                let values = match result {
1553                    QueryResult::Rows { rows, .. } => rows
1554                        .into_iter()
1555                        .filter_map(|mut row| {
1556                            if row.is_empty() {
1557                                None
1558                            } else {
1559                                Some(value_to_expr(row.swap_remove(0)))
1560                            }
1561                        })
1562                        .collect(),
1563                    _ => Vec::new(),
1564                };
1565                Ok(Expr::InList {
1566                    expr: Box::new(inner),
1567                    list: values,
1568                    negated: *negated,
1569                })
1570            }
1571            Expr::ExistsSubquery { subquery, negated } => {
1572                if is_correlated_subquery(subquery, &self.catalog) {
1573                    return Ok(expr.clone());
1574                }
1575                let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1576                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1577                let result = self.execute_plan_readonly(&sub_plan)?;
1578                let has_rows = match result {
1579                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
1580                    _ => false,
1581                };
1582                let truth = if *negated { !has_rows } else { has_rows };
1583                Ok(Expr::Literal(Literal::Bool(truth)))
1584            }
1585            Expr::BinaryOp(l, op, r) => {
1586                let l = self.materialize_subqueries_readonly(l)?;
1587                let r = self.materialize_subqueries_readonly(r)?;
1588                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1589            }
1590            Expr::UnaryOp(op, inner) => {
1591                let inner = self.materialize_subqueries_readonly(inner)?;
1592                Ok(Expr::UnaryOp(*op, Box::new(inner)))
1593            }
1594            Expr::Case { whens, else_expr } => {
1595                let whens = whens
1596                    .iter()
1597                    .map(|(c, r)| {
1598                        let c = self.materialize_subqueries_readonly(c)?;
1599                        let r = self.materialize_subqueries_readonly(r)?;
1600                        Ok((Box::new(c), Box::new(r)))
1601                    })
1602                    .collect::<Result<Vec<_>, QueryError>>()?;
1603                let else_expr = match else_expr {
1604                    Some(e) => Some(Box::new(self.materialize_subqueries_readonly(e)?)),
1605                    None => None,
1606                };
1607                Ok(Expr::Case { whens, else_expr })
1608            }
1609            other => Ok(other.clone()),
1610        }
1611    }
1612
1613    /// Per-row materialisation of correlated subqueries. For each row in the
1614    /// outer query, substitute outer column references in the subquery's
1615    /// filter with the current row's literal values, execute the modified
1616    /// subquery, and return the result as an InList or Bool literal.
1617    fn materialize_correlated_for_row_readonly(
1618        &self,
1619        expr: &Expr,
1620        outer_row: &[Value],
1621        outer_columns: &[String],
1622    ) -> Result<Expr, QueryError> {
1623        match expr {
1624            Expr::InSubquery {
1625                expr: inner,
1626                subquery,
1627                negated,
1628            } => {
1629                let inner =
1630                    self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1631                let mut sub = *subquery.clone();
1632                if let Some(ref filter) = sub.filter {
1633                    sub.filter = Some(substitute_outer_refs(
1634                        filter,
1635                        &sub.source,
1636                        &self.catalog,
1637                        outer_row,
1638                        outer_columns,
1639                    ));
1640                }
1641                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1642                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1643                let result = self.execute_plan_readonly(&sub_plan)?;
1644                let values = match result {
1645                    QueryResult::Rows { rows, .. } => rows
1646                        .into_iter()
1647                        .filter_map(|mut row| {
1648                            if row.is_empty() {
1649                                None
1650                            } else {
1651                                Some(value_to_expr(row.swap_remove(0)))
1652                            }
1653                        })
1654                        .collect(),
1655                    _ => Vec::new(),
1656                };
1657                Ok(Expr::InList {
1658                    expr: Box::new(inner),
1659                    list: values,
1660                    negated: *negated,
1661                })
1662            }
1663            Expr::ExistsSubquery { subquery, negated } => {
1664                let mut sub = *subquery.clone();
1665                if let Some(ref filter) = sub.filter {
1666                    sub.filter = Some(substitute_outer_refs(
1667                        filter,
1668                        &sub.source,
1669                        &self.catalog,
1670                        outer_row,
1671                        outer_columns,
1672                    ));
1673                }
1674                let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1675                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
1676                let result = self.execute_plan_readonly(&sub_plan)?;
1677                let has_rows = match result {
1678                    QueryResult::Rows { rows, .. } => !rows.is_empty(),
1679                    _ => false,
1680                };
1681                let truth = if *negated { !has_rows } else { has_rows };
1682                Ok(Expr::Literal(Literal::Bool(truth)))
1683            }
1684            Expr::BinaryOp(l, op, r) => {
1685                let l =
1686                    self.materialize_correlated_for_row_readonly(l, outer_row, outer_columns)?;
1687                let r =
1688                    self.materialize_correlated_for_row_readonly(r, outer_row, outer_columns)?;
1689                Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1690            }
1691            Expr::UnaryOp(op, inner) => {
1692                let inner =
1693                    self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1694                Ok(Expr::UnaryOp(*op, Box::new(inner)))
1695            }
1696            other => Ok(other.clone()),
1697        }
1698    }
1699
1700    pub fn catalog(&self) -> &Catalog {
1701        &self.catalog
1702    }
1703
1704    pub fn catalog_mut(&mut self) -> &mut Catalog {
1705        &mut self.catalog
1706    }
1707}