powdb_query/executor/mod.rs
1//! PowDB query executor.
2
3// Submodules that don't use macros defined in this file.
4mod compiled;
5mod eval;
6pub mod mem_budget;
7
8use crate::ast::*;
9use crate::canonicalize::canonicalize;
10use crate::plan::*;
11use crate::plan_cache::PlanCache;
12use crate::planner;
13use crate::result::{QueryError, QueryResult};
14use powdb_storage::catalog::Catalog;
15use powdb_storage::row::{decode_column, decode_row, RowLayout};
16use powdb_storage::types::*;
17use powdb_storage::view::ViewRegistry;
18
19use std::io;
20use std::path::Path;
21use std::sync::Mutex;
22use std::time::Instant;
23use tracing::{error, info, Level};
24
25use self::compiled::*;
26use self::eval::*;
27
/// Legacy sentinel string, retained only so external code that matches on
/// its text keeps working. New code should match on
/// `QueryError::ReadonlyNeedsWrite` instead.
pub const READONLY_NEEDS_WRITE: &str = "__POWDB_READONLY_NEEDS_WRITE__";

/// Number of entries the plan cache holds (2^8 = 256). Bench workloads use
/// roughly 15 slots, so real applications should fit comfortably. Lookups
/// are O(1); per `plan_cache::PlanCache::insert`, a collision clears the
/// cache.
const PLAN_CACHE_CAPACITY: usize = 1 << 8;
37
38/// Maximum number of rows a join may produce before the executor aborts.
39/// Prevents Cartesian-product blowups (e.g. `T cross join T` on 10K rows
40/// would produce 100M rows in memory without this cap).
41pub(super) const MAX_JOIN_ROWS: usize = 1_000_000;
42
43/// Maximum number of rows that may be materialized for sorting.
44/// Queries that exceed this should add a LIMIT clause to narrow the input
45/// before sorting.
46pub(super) const MAX_SORT_ROWS: usize = 10_000_000;
47
48#[inline]
49pub(super) fn check_join_limit(row_count: usize) -> Result<(), QueryError> {
50 if row_count > MAX_JOIN_ROWS {
51 return Err(QueryError::JoinLimitExceeded);
52 }
53 Ok(())
54}
55
// ─── Mission D11 Phase 1: scalar hot-loop helpers ─────────────────────────
//
// These macros expand into the scan body of `agg_single_col_fast` and sit
// inside the `for_each_row_raw` closure. They split the loop on the presence
// of a predicate *outside* the hot body. As a result, the no-predicate path
// (agg_sum/agg_min/agg_max bench workloads) never pays the
// `Option<CompiledPredicate>` branch per row.
//
// Row-layout contract expected from every call site:
//
// - The null bitmap starts at offset 2 of the row. `$bmp_byte` (`col_idx / 8`)
//   and `$bmp_bit` select the column's bit within that bitmap.
// - `$off = 2 + bitmap_size + fixed_offsets[col_idx]` addresses an 8-byte
//   little-endian i64/f64 in the row's fixed-size region.
// - Both macros are only invoked from `agg_single_col_fast`. That function
//   early-returns if the column isn't Int/Float (8-byte fixed), and also if
//   `fast.fixed_offsets[col_idx]` is `None`.
//
// Decoding goes through `agg_fixed8_value!`, which uses checked slice access.
// Rows too short to hold the bitmap byte or the 8-byte value (corrupt or
// truncated rows) are skipped. The checked form costs the same two length
// comparisons per row that a corrupt-row guard around raw-pointer reads
// would, so no `unsafe` is required here.

/// Decode one nullable 8-byte little-endian value from a raw row.
///
/// Evaluates to `None` in either of these cases:
/// - the row is too short to contain the null-bitmap byte at
///   `2 + $bmp_byte`, or the eight bytes at `$off..$off + 8`;
/// - bit `$bmp_bit` of that bitmap byte is set (NULL).
///
/// Otherwise evaluates to `Some(value)` decoded as `$ty` (`i64` or `f64`).
macro_rules! agg_fixed8_value {
    ($data:expr, $bmp_byte:expr, $bmp_bit:expr, $off:expr, $ty:ty) => {{
        let data: &[u8] = $data;
        let off: usize = $off;
        match (data.get(2 + $bmp_byte), data.get(off..off + 8)) {
            (Some(&bmp), Some(raw)) if (bmp >> $bmp_bit) & 1 == 0 => {
                let mut bytes = [0u8; 8];
                bytes.copy_from_slice(raw);
                Some(<$ty>::from_le_bytes(bytes))
            }
            _ => None,
        }
    }};
}

/// Scan `$table`, and for every non-NULL i64 in the addressed column bind it
/// to `$v` and run `$body`.
///
/// When `$pred` is `Some`, rows it rejects are skipped before decoding.
/// Storage errors are mapped to `QueryError::StorageError` and propagated
/// with `?`, so the calling function must return `Result<_, QueryError>`.
macro_rules! agg_int_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : i64| $body:block
    ) => {{
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    // Skips NULLs and rows too short for the bitmap/value.
                    let decoded = agg_fixed8_value!(data, bmp_byte, bmp_bit, off, i64);
                    if let Some($v) = decoded {
                        $body
                    }
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Skips NULLs and rows too short for the bitmap/value.
                    let decoded = agg_fixed8_value!(data, bmp_byte, bmp_bit, off, i64);
                    if let Some($v) = decoded {
                        $body
                    }
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}

/// Float twin of `agg_int_loop!`: binds each non-NULL f64 in the addressed
/// column to `$v` and runs `$body`, with the same predicate and error
/// handling.
macro_rules! agg_float_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : f64| $body:block
    ) => {{
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    // Skips NULLs and rows too short for the bitmap/value.
                    let decoded = agg_fixed8_value!(data, bmp_byte, bmp_bit, off, f64);
                    if let Some($v) = decoded {
                        $body
                    }
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Skips NULLs and rows too short for the bitmap/value.
                    let decoded = agg_fixed8_value!(data, bmp_byte, bmp_bit, off, f64);
                    if let Some($v) = decoded {
                        $body
                    }
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}
211
212// Submodules that use the macros above — must be declared after macro_rules!.
213mod plan_exec;
214mod prepared;
215
216#[cfg(test)]
217mod tests;
218
219// Re-exports for the public API
220pub use self::prepared::PreparedQuery;
221
222use self::plan_exec::{
223 compute_group_aggregate, execute_window, format_plan_tree, hash_join,
224 lower_unindexed_range_scans, range_matches, synthesize_range_predicate,
225 try_extract_equi_join_keys,
226};
227
228/// Mission infra-1: classify a parsed statement as read-only vs. mutating.
229/// Used by [`Engine::execute_powql_readonly`] and by the server handler
230/// to decide between the RwLock reader and writer sides. `Union` recurses
231/// because each side can independently be read/write (though in practice
232/// both sides are reads — the parser only builds Union from query shapes).
233pub fn is_read_only_statement(stmt: &Statement) -> bool {
234 match stmt {
235 Statement::Query(_) => true,
236 Statement::Union(u) => is_read_only_statement(&u.left) && is_read_only_statement(&u.right),
237 Statement::Insert(_)
238 | Statement::Upsert(_)
239 | Statement::UpdateQuery(_)
240 | Statement::DeleteQuery(_)
241 | Statement::CreateType(_)
242 | Statement::AlterTable(_)
243 | Statement::DropTable(_)
244 | Statement::CreateView(_)
245 | Statement::RefreshView(_)
246 | Statement::DropView(_) => false,
247 Statement::Begin | Statement::Commit | Statement::Rollback => false,
248 Statement::Explain(inner) => is_read_only_statement(inner),
249 }
250}
251
/// PowDB query engine.
///
/// Bundles the storage catalog, the shared plan cache, the materialized-view
/// registry, and per-engine execution settings. Write entry points such as
/// [`Engine::execute_powql`] take `&mut self`. [`Engine::execute_powql_readonly`]
/// takes `&self`, so it can run under a shared lock.
pub struct Engine {
    /// Storage catalog. Schema lookup, table scans, table/index access via
    /// `get_table`, and the statement-boundary WAL sync all go through it.
    catalog: Catalog,
    /// Mission D9 — cached parsed+planned query trees keyed by canonical
    /// hash. Saves the ~3μs parse+plan cost on repeat queries that differ
    /// only in literal values.
    ///
    /// Mission infra-1: wrapped in `Mutex` so the read path can be driven
    /// by `&self`. The critical section is extremely short: a single
    /// hashmap lookup + plan clone on a hit, or a single insert on a miss.
    /// A full `RwLock` would be over-engineered here, because the contention
    /// window is smaller than the read-path scan work it gates.
    plan_cache: Mutex<PlanCache>,
    /// Mission C Phase 13: reusable `Vec<Value>` scratch buffer for the
    /// prepared-insert fast path. `execute_prepared` used to allocate a
    /// fresh `vec![Value::Empty; n_cols]` on every insert. Recycling this
    /// buffer saves one heap alloc per row on `insert_batch_1k`.
    insert_values_scratch: Vec<Value>,
    /// Materialized view registry: tracks view definitions, dependencies,
    /// and dirty state. Views are backed by regular catalog tables; this
    /// registry adds the lifecycle metadata.
    view_registry: ViewRegistry,
    /// Transaction flag. While `true`, `execute_powql` skips its
    /// per-statement `catalog.sync_wal()` call.
    /// NOTE(review): presumably set by `Begin` and cleared by
    /// `Commit`/`Rollback`. Those handlers are not visible here, so confirm.
    in_transaction: bool,
    /// WS2 — per-query memory budget ceiling (bytes).
    ///
    /// The running total lives in a thread-local (see [`mem_budget`]) and is
    /// reset at every top-level query entry. This lets sort/join/GROUP BY/
    /// IN-list materialization be capped without OOM-killing the process.
    /// This field holds only the *limit*, a plain `usize`, so `Engine` stays
    /// `Sync` for the concurrent read path.
    ///
    /// Defaults to [`mem_budget::DEFAULT_QUERY_MEMORY_LIMIT`] (256 MB). It can
    /// be overridden via `Engine::with_memory_limit`; the server reads
    /// `POWDB_QUERY_MEMORY_LIMIT`.
    query_memory_limit: usize,
}
283
284impl Engine {
285 /// Open or create a PowDB engine rooted at `data_dir`.
286 ///
287 /// If the directory already contains a catalog, it is reopened.
288 /// Otherwise a fresh empty database is created.
289 ///
290 /// # Examples
291 ///
292 /// ```
293 /// use powdb_query::executor::Engine;
294 ///
295 /// let dir = tempfile::tempdir().unwrap();
296 /// let engine = Engine::new(dir.path()).unwrap();
297 /// // Engine is ready — the directory now contains a catalog.
298 /// ```
299 pub fn new(data_dir: &Path) -> io::Result<Self> {
300 std::fs::create_dir_all(data_dir)?;
301 // Try to reopen an existing database first; only create a fresh
302 // catalog when there isn't one already on disk.
303 let catalog = match Catalog::open(data_dir) {
304 Ok(c) => {
305 info!(data_dir = %data_dir.display(), "engine reopened existing database");
306 c
307 }
308 Err(e) if e.kind() == io::ErrorKind::NotFound => {
309 info!(data_dir = %data_dir.display(), "engine initialized fresh database");
310 Catalog::create(data_dir)?
311 }
312 Err(e) => return Err(e),
313 };
314 let view_registry =
315 ViewRegistry::open(data_dir).unwrap_or_else(|_| ViewRegistry::new(data_dir));
316 Ok(Engine {
317 catalog,
318 plan_cache: Mutex::new(PlanCache::new(PLAN_CACHE_CAPACITY)),
319 insert_values_scratch: Vec::new(),
320 view_registry,
321 in_transaction: false,
322 query_memory_limit: mem_budget::DEFAULT_QUERY_MEMORY_LIMIT,
323 })
324 }
325
326 /// Open or create an engine with an explicit per-query memory limit
327 /// (bytes). Used by the server to apply `POWDB_QUERY_MEMORY_LIMIT`, and by
328 /// tests that need a tiny limit to exercise the budget guard.
329 pub fn with_memory_limit(data_dir: &Path, limit_bytes: usize) -> io::Result<Self> {
330 let mut engine = Engine::new(data_dir)?;
331 engine.set_query_memory_limit(limit_bytes);
332 Ok(engine)
333 }
334
335 /// Current per-query memory limit in bytes.
336 pub fn query_memory_limit(&self) -> usize {
337 self.query_memory_limit
338 }
339
340 /// Override the per-query memory limit in bytes (builder-style).
341 pub fn set_query_memory_limit(&mut self, limit_bytes: usize) {
342 self.query_memory_limit = limit_bytes;
343 }
344
345 /// Enter a budgeted-statement frame for the current query. The returned
346 /// guard must be held for the duration of the statement; on its drop the
347 /// reentrancy depth is decremented. Only the *outermost* statement entry
348 /// zeroes this thread's running total, so a nested `execute_powql` (the
349 /// source query of a `create_view`/`refresh_view`) does NOT discard the
350 /// outer frame's accounting. The accumulator is thread-local, so this never
351 /// touches another concurrent query's total.
352 #[must_use = "the budget guard must outlive the statement body"]
353 pub(super) fn enter_memory_budget(&self) -> mem_budget::EnterGuard {
354 mem_budget::enter()
355 }
356
357 /// Charge the estimated footprint of a freshly materialized batch of rows
358 /// against the current per-query budget. Returns
359 /// [`QueryError::MemoryLimitExceeded`] cleanly if the batch would push the
360 /// query over its limit. Used at every full-materialization point (sort
361 /// buffer, join build side, GROUP BY hash table, IN-list).
362 pub(super) fn charge_rows(&self, rows: &[Vec<Value>]) -> Result<(), QueryError> {
363 let mut total = 0usize;
364 for row in rows {
365 total = total.saturating_add(mem_budget::estimate_row_size(row));
366 }
367 mem_budget::charge(total, self.query_memory_limit)
368 }
369
370 /// Charge a materialized IN-list (the literal expressions pulled out of an
371 /// uncorrelated `IN (subquery)`) against the current per-query budget.
372 /// Each item is conservatively sized at the `Expr` slot plus, for string
373 /// literals, the owned heap bytes.
374 pub(super) fn charge_in_list(&self, list: &[crate::ast::Expr]) -> Result<(), QueryError> {
375 let base = std::mem::size_of::<crate::ast::Expr>();
376 let mut total = std::mem::size_of::<Vec<crate::ast::Expr>>();
377 for item in list {
378 total = total.saturating_add(base);
379 if let crate::ast::Expr::Literal(crate::ast::Literal::String(s)) = item {
380 total = total.saturating_add(s.capacity());
381 }
382 }
383 mem_budget::charge(total, self.query_memory_limit)
384 }
385
386 /// Parse + plan + execute a PowQL query.
387 ///
388 /// # Examples
389 ///
390 /// ```
391 /// use powdb_query::executor::Engine;
392 /// use powdb_query::result::QueryResult;
393 ///
394 /// let dir = tempfile::tempdir().unwrap();
395 /// let mut engine = Engine::new(dir.path()).unwrap();
396 ///
397 /// // Create a table and insert a row.
398 /// engine.execute_powql("type User { required name: str, age: int }").unwrap();
399 /// engine.execute_powql(r#"insert User { name := "Alice", age := 30 }"#).unwrap();
400 ///
401 /// // Query rows back.
402 /// let result = engine.execute_powql("User").unwrap();
403 /// assert_eq!(result.row_count(), 1);
404 /// ```
405 ///
406 /// Mission D6 — tracing collapse: the previous implementation ran 4
407 /// `Instant::now()` + 3 `elapsed().as_micros()` calls + formatted an
408 /// `info!` span on every query, even when tracing was disabled. On a
409 /// sub-microsecond `point_lookup_indexed` call that overhead was
410 /// 100-200ns — 20%+ of the whole query. We now measure time only when
411 /// INFO is actually enabled via `tracing::enabled!`, and we moved the
412 /// noisy `debug!(?plan)` line behind the same gate so the Debug
413 /// formatter can't run unconditionally either.
414 ///
415 /// Mission D9 — plan cache: on the hot path we canonicalise the query
416 /// text (lex + FNV-1a hash with literal values stripped), check the
417 /// cache, and on a hit substitute the new literals into a clone of the
418 /// cached plan. This skips re-lexing, re-parsing, and re-planning —
419 /// around 3μs per call on bench workloads. On a miss we plan as before
420 /// and insert the plan under its canonical hash.
421 pub fn execute_powql(&mut self, input: &str) -> Result<QueryResult, QueryError> {
422 // WS2: each *outermost* statement starts with the full memory
423 // allowance. The guard holds the reentrancy depth so a nested
424 // `execute_powql` (e.g. a view's source query) does not reset the
425 // outer frame's accounting mid-statement.
426 let _budget = self.enter_memory_budget();
427 // Hot path: tracing disabled. Zero syscalls, zero formatting.
428 if !tracing::enabled!(Level::INFO) {
429 // D9: try the plan cache first. Canonicalisation lexes the
430 // query once; on a hit we skip the parser and planner entirely.
431 if let Ok((hash, literals)) = canonicalize(input) {
432 let cached = self
433 .plan_cache
434 .lock()
435 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
436 .get_with_substitution(hash, &literals);
437 if let Some(plan) = cached {
438 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
439 let result = self.execute_plan(&plan);
440 // Mission B (post-review): statement-boundary WAL
441 // group commit. Catalog::wal_log now only appends;
442 // the fsync happens here exactly once per statement.
443 // `sync_wal` is a no-op when nothing was buffered
444 // (pure reads pay zero fsync).
445 if !self.in_transaction {
446 self.catalog
447 .sync_wal()
448 .map_err(|e| QueryError::StorageError(e.to_string()))?;
449 }
450 return result;
451 }
452 // Miss — plan, insert, execute.
453 return match planner::plan(input) {
454 Ok(plan) => {
455 self.plan_cache
456 .lock()
457 .map_err(|e| {
458 QueryError::Execution(format!("plan cache lock poisoned: {e}"))
459 })?
460 .insert(hash, plan.clone());
461 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
462 let result = self.execute_plan(&plan);
463 if !self.in_transaction {
464 self.catalog
465 .sync_wal()
466 .map_err(|e| QueryError::StorageError(e.to_string()))?;
467 }
468 result
469 }
470 Err(e) => Err(QueryError::Parse(e.to_string())),
471 };
472 }
473 // Lex error — fall through to the planner so the caller gets a
474 // consistent error shape.
475 return match planner::plan(input) {
476 Ok(plan) => {
477 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
478 let result = self.execute_plan(&plan);
479 if !self.in_transaction {
480 self.catalog
481 .sync_wal()
482 .map_err(|e| QueryError::StorageError(e.to_string()))?;
483 }
484 result
485 }
486 Err(e) => Err(QueryError::Parse(e.to_string())),
487 };
488 }
489
490 // Instrumented path — only taken under explicit tracing subscribers.
491 let total_start = Instant::now();
492 let plan_start = Instant::now();
493 let plan = planner::plan(input).map_err(|e| {
494 let msg = e.to_string();
495 error!(query = %input, error = %msg, "query plan failed");
496 QueryError::Parse(msg)
497 })?;
498 let plan_us = plan_start.elapsed().as_micros();
499
500 let exec_start = Instant::now();
501 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
502 let result = self.execute_plan(&plan);
503 if !self.in_transaction {
504 self.catalog
505 .sync_wal()
506 .map_err(|e| QueryError::StorageError(e.to_string()))?;
507 }
508 let exec_us = exec_start.elapsed().as_micros();
509
510 let total_us = total_start.elapsed().as_micros();
511 match &result {
512 Ok(r) => {
513 info!(
514 query = %input,
515 plan_us = plan_us,
516 exec_us = exec_us,
517 total_us = total_us,
518 rows = r.row_count(),
519 "query ok"
520 );
521 }
522 Err(e) => {
523 error!(
524 query = %input,
525 plan_us = plan_us,
526 exec_us = exec_us,
527 error = %e,
528 "query failed"
529 );
530 }
531 }
532 result
533 }
534
535 /// Plan cache stats — useful for benches and debugging.
536 pub fn plan_cache_stats(&self) -> (u64, u64, usize) {
537 let cache = self.plan_cache.lock().unwrap_or_else(|e| e.into_inner());
538 (cache.hits, cache.misses, cache.len())
539 }
540
541 /// Mission infra-1: read-only entry point.
542 ///
543 /// Parses + plans + executes a PowQL query using only a shared borrow
544 /// on the engine. Rejects any statement that would mutate state
545 /// (Insert/Update/Delete/CreateTable/AlterTable/DropTable/CreateView/
546 /// RefreshView/DropView) by returning [`READONLY_NEEDS_WRITE`] so the
547 /// caller can escalate to the write lock.
548 ///
549 /// Also returns [`READONLY_NEEDS_WRITE`] if a materialized view in the
550 /// query is dirty — refreshing one requires `&mut self`, so the caller
551 /// must retake the write lock for the first refresh.
552 ///
553 /// This method is the concurrent-read fast path behind
554 /// `Arc<RwLock<Engine>>`: multiple threads can call it simultaneously
555 /// under a shared `.read()` lock and each will scan independently.
556 pub fn execute_powql_readonly(&self, input: &str) -> Result<QueryResult, QueryError> {
557 // WS2: each *outermost* statement starts with the full memory
558 // allowance. The guard holds the reentrancy depth so a nested
559 // `execute_powql*` does not reset the outer frame's accounting.
560 let _budget = self.enter_memory_budget();
561 // Parse the statement first so we can classify read vs. write
562 // without touching the catalog. This is the same lex+parse cost
563 // the hot path would pay anyway.
564 let stmt = crate::parser::parse(input).map_err(|e| QueryError::Parse(e.to_string()))?;
565 if !is_read_only_statement(&stmt) {
566 return Err(QueryError::ReadonlyNeedsWrite);
567 }
568
569 // Try the plan cache first — identical hash scheme to
570 // `execute_powql` so both paths share cache state. The mutex
571 // section is just a hashmap lookup + plan clone.
572 if let Ok((hash, literals)) = canonicalize(input) {
573 let cached = self
574 .plan_cache
575 .lock()
576 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
577 .get_with_substitution(hash, &literals);
578 if let Some(plan) = cached {
579 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
580 return self.execute_plan_readonly(&plan);
581 }
582 // Miss: plan + insert + execute. The planner is pure, so this
583 // is safe from `&self`.
584 let plan = crate::planner::plan_statement(stmt)
585 .map_err(|e| QueryError::Parse(e.to_string()))?;
586 self.plan_cache
587 .lock()
588 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
589 .insert(hash, plan.clone());
590 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
591 return self.execute_plan_readonly(&plan);
592 }
593 // Lex error — fall through to the planner for a consistent error
594 // shape (though `parse` above would usually have caught it).
595 let plan =
596 crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
597 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
598 self.execute_plan_readonly(&plan)
599 }
600
601 /// Read-only version of [`Engine::execute_plan`]. Dispatches the
602 /// read-path plan variants by calling `&self` helpers and errors with
603 /// [`READONLY_NEEDS_WRITE`] on any write variant. This is the
604 /// recursion target for composite read plans under the RwLock reader.
605 ///
606 /// The dispatch mirrors `execute_plan` for the read branches but does
607 /// not carry any of the fast-paths that need `&mut self` (e.g. plan-
608 /// cache mutation on inner subqueries is handled via the shared mutex
609 /// in [`Engine::execute_powql_readonly`]; in-flight subquery
610 /// materialisation uses [`Engine::materialize_subqueries_readonly`]).
611 fn execute_plan_readonly(&self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
612 match plan {
613 PlanNode::SeqScan { table } => {
614 // Dirty view means we'd need to refresh it — can't do that
615 // under `&self`. Escalate to the write path.
616 if self.view_registry.is_dirty(table) {
617 return Err(QueryError::ReadonlyNeedsWrite);
618 }
619 let schema = self
620 .catalog
621 .schema(table)
622 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
623 .clone();
624 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
625 let rows: Vec<Vec<Value>> = self
626 .catalog
627 .scan(table)
628 .map_err(|e| e.to_string())?
629 .map(|(_, row)| row)
630 .collect();
631 Ok(QueryResult::Rows { columns, rows })
632 }
633
634 PlanNode::AliasScan { table, alias } => {
635 let schema = self
636 .catalog
637 .schema(table)
638 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
639 .clone();
640 let columns: Vec<String> = schema
641 .columns
642 .iter()
643 .map(|c| format!("{alias}.{}", c.name))
644 .collect();
645 let rows: Vec<Vec<Value>> = self
646 .catalog
647 .scan(table)
648 .map_err(|e| e.to_string())?
649 .map(|(_, row)| row)
650 .collect();
651 Ok(QueryResult::Rows { columns, rows })
652 }
653
654 PlanNode::IndexScan { table, column, key } => {
655 let schema = self
656 .catalog
657 .schema(table)
658 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
659 .clone();
660 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
661 let key_value = literal_to_value(key)?;
662 let tbl = self
663 .catalog
664 .get_table(table)
665 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
666
667 if tbl.has_index(column) {
668 // Use index_lookup_all to handle both unique and
669 // non-unique indexes — returns all matching RowIds.
670 let rids = tbl.index_lookup_all(column, &key_value);
671 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
672 for rid in rids {
673 if let Some(data) = tbl.heap.get(rid) {
674 rows.push(decode_row(&tbl.schema, &data));
675 }
676 }
677 return Ok(QueryResult::Rows { columns, rows });
678 }
679
680 // No index: synthetic eq predicate + compiled scan.
681 let fast = FastLayout::new(&schema);
682 let synth_pred = Expr::BinaryOp(
683 Box::new(Expr::Field(column.clone())),
684 BinOp::Eq,
685 Box::new(key.clone()),
686 );
687 if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, &schema) {
688 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
689 self.catalog
690 .for_each_row_raw(table, |_rid, data| {
691 if compiled(data) {
692 rows.push(decode_row(&schema, data));
693 }
694 })
695 .map_err(|e| QueryError::StorageError(e.to_string()))?;
696 return Ok(QueryResult::Rows { columns, rows });
697 }
698
699 // Last resort: slow eq-check.
700 let col_idx =
701 schema
702 .column_index(column)
703 .ok_or_else(|| QueryError::ColumnNotFound {
704 table: String::new(),
705 column: column.clone(),
706 })?;
707 let rows: Vec<Vec<Value>> = tbl
708 .scan()
709 .filter_map(|(_, row)| {
710 if row[col_idx] == key_value {
711 Some(row)
712 } else {
713 None
714 }
715 })
716 .collect();
717 Ok(QueryResult::Rows { columns, rows })
718 }
719
720 PlanNode::RangeScan {
721 table,
722 column,
723 start,
724 end,
725 } => {
726 let tbl = self
727 .catalog
728 .get_table(table)
729 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
730 let columns: Vec<String> =
731 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
732 let schema = tbl.schema.clone();
733
734 let start_val = match start {
735 Some((expr, _)) => Some(literal_to_value(expr)?),
736 None => None,
737 };
738 let end_val = match end {
739 Some((expr, _)) => Some(literal_to_value(expr)?),
740 None => None,
741 };
742 let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
743 let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
744
745 // Range scans only use the btree fast path for unique indexes.
746 // Non-unique indexes store composite keys that don't compare
747 // directly against raw column values.
748 if tbl.is_index_unique(column) == Some(true) {
749 if let Some(btree) = tbl.index(column) {
750 let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
751 (Some(s), Some(e)) => btree.range(s, e).collect(),
752 (Some(s), None) => btree.range_from(s),
753 (None, Some(e)) => btree.range_to(e),
754 (None, None) => {
755 // Unbounded both sides — equivalent to seq scan.
756 let rows: Vec<Vec<Value>> =
757 tbl.scan().map(|(_, row)| row).collect();
758 return Ok(QueryResult::Rows { columns, rows });
759 }
760 };
761 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
762 for (key, rid) in hits {
763 // Filter for exclusive bounds.
764 if !start_inclusive {
765 if let Some(ref s) = start_val {
766 if &key == s {
767 continue;
768 }
769 }
770 }
771 if !end_inclusive {
772 if let Some(ref e) = end_val {
773 if &key == e {
774 continue;
775 }
776 }
777 }
778 if let Some(data) = tbl.heap.get(rid) {
779 rows.push(decode_row(&schema, &data));
780 }
781 }
782 return Ok(QueryResult::Rows { columns, rows });
783 }
784 }
785
786 // Fallback: no index — synthesize the range predicate and scan.
787 let fast = FastLayout::new(&schema);
788 let synth = synthesize_range_predicate(column, start, end);
789 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, &schema) {
790 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
791 self.catalog
792 .for_each_row_raw(table, |_rid, data| {
793 if compiled(data) {
794 rows.push(decode_row(&schema, data));
795 }
796 })
797 .map_err(|e| QueryError::StorageError(e.to_string()))?;
798 return Ok(QueryResult::Rows { columns, rows });
799 }
800
801 // Last resort: decoded row eval.
802 let col_idx =
803 schema
804 .column_index(column)
805 .ok_or_else(|| QueryError::ColumnNotFound {
806 table: String::new(),
807 column: column.clone(),
808 })?;
809 let rows: Vec<Vec<Value>> = tbl
810 .scan()
811 .filter(|(_, row)| {
812 range_matches(
813 &row[col_idx],
814 &start_val,
815 start_inclusive,
816 &end_val,
817 end_inclusive,
818 )
819 })
820 .map(|(_, row)| row)
821 .collect();
822 Ok(QueryResult::Rows { columns, rows })
823 }
824
825 PlanNode::Filter { input, predicate } => {
826 // Materialise subqueries using the `&self` variant.
827 // Uncorrelated subqueries are replaced with InList/Bool;
828 // correlated ones are left as InSubquery/ExistsSubquery
829 // for per-row materialisation below.
830 let materialized;
831 let predicate = if contains_subquery(predicate) {
832 materialized = self.materialize_subqueries_readonly(predicate)?;
833 &materialized
834 } else {
835 predicate
836 };
837
838 // Correlated subquery path: per-row materialisation.
839 if contains_subquery(predicate) {
840 let result = self.execute_plan_readonly(input)?;
841 return match result {
842 QueryResult::Rows { columns, rows } => {
843 let mut filtered = Vec::new();
844 for row in rows {
845 let row_pred = self.materialize_correlated_for_row_readonly(
846 predicate, &row, &columns,
847 )?;
848 if eval_predicate(&row_pred, &row, &columns) {
849 filtered.push(row);
850 }
851 }
852 Ok(QueryResult::Rows {
853 columns,
854 rows: filtered,
855 })
856 }
857 _ => Err("filter requires row input".into()),
858 };
859 }
860
861 // Fused Filter+SeqScan fast path.
862 if let PlanNode::SeqScan { table } = input.as_ref() {
863 if self.view_registry.is_dirty(table) {
864 return Err(QueryError::ReadonlyNeedsWrite);
865 }
866 let schema = self
867 .catalog
868 .schema(table)
869 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
870 .clone();
871 let columns: Vec<String> =
872 schema.columns.iter().map(|c| c.name.clone()).collect();
873 let fast = FastLayout::new(&schema);
874 let row_layout = RowLayout::new(&schema);
875 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
876
877 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
878 self.catalog
879 .for_each_row_raw(table, |_rid, data| {
880 if compiled(data) {
881 rows.push(decode_row(&schema, data));
882 }
883 })
884 .map_err(|e| QueryError::StorageError(e.to_string()))?;
885 } else {
886 let pred_cols = predicate_column_indices(predicate, &columns);
887 self.catalog
888 .for_each_row_raw(table, |_rid, data| {
889 let pred_row =
890 decode_selective(&schema, &row_layout, data, &pred_cols);
891 if eval_predicate(predicate, &pred_row, &columns) {
892 rows.push(decode_row(&schema, data));
893 }
894 })
895 .map_err(|e| QueryError::StorageError(e.to_string()))?;
896 }
897
898 return Ok(QueryResult::Rows { columns, rows });
899 }
900
901 // General path.
902 let result = self.execute_plan_readonly(input)?;
903 match result {
904 QueryResult::Rows { columns, rows } => {
905 let filtered: Vec<Vec<Value>> = rows
906 .into_iter()
907 .filter(|row| eval_predicate(predicate, row, &columns))
908 .collect();
909 Ok(QueryResult::Rows {
910 columns,
911 rows: filtered,
912 })
913 }
914 _ => Err("filter requires row input".into()),
915 }
916 }
917
918 PlanNode::Project { input, fields } => {
919 // Fast path: Project over IndexScan. Avoids full-row decode
920 // by calling decode_column only for projected fields.
921 if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
922 let key_value = literal_to_value(key)?;
923 let tbl = self
924 .catalog
925 .get_table(table)
926 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
927 let schema = &tbl.schema;
928 let layout = tbl.row_layout();
929
930 let proj_columns: Vec<String> = fields
931 .iter()
932 .map(|f| {
933 f.alias.clone().unwrap_or_else(|| match &f.expr {
934 Expr::Field(name) => name.clone(),
935 _ => "?".into(),
936 })
937 })
938 .collect();
939
940 let proj_indices: Vec<usize> = fields
941 .iter()
942 .filter_map(|f| {
943 if let Expr::Field(name) = &f.expr {
944 schema.column_index(name)
945 } else {
946 None
947 }
948 })
949 .collect();
950
951 if tbl.has_index(column) {
952 let rids = tbl.index_lookup_all(column, &key_value);
953 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
954 for rid in rids {
955 if let Some(data) = tbl.heap.get(rid) {
956 let row: Vec<Value> = proj_indices
957 .iter()
958 .map(|&ci| decode_column(schema, layout, &data, ci))
959 .collect();
960 rows.push(row);
961 }
962 }
963 return Ok(QueryResult::Rows {
964 columns: proj_columns,
965 rows,
966 });
967 }
968 }
969
970 // Fast paths over Limit(Sort(...)) / Limit(Filter(...)) / Limit(SeqScan).
971 if let PlanNode::Limit {
972 input: inner,
973 count: limit_expr,
974 } = input.as_ref()
975 {
976 if let PlanNode::Sort {
977 input: sort_input,
978 keys,
979 } = inner.as_ref()
980 {
981 if keys.len() == 1 {
982 let sort_field = &keys[0].field;
983 let descending = keys[0].descending;
984 let limit = match limit_expr {
985 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
986 _ => usize::MAX,
987 };
988 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
989 match sort_input.as_ref() {
990 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
991 PlanNode::Filter {
992 input: fi,
993 predicate,
994 } => {
995 if let PlanNode::SeqScan { table } = fi.as_ref() {
996 (Some(table.as_str()), Some(predicate))
997 } else {
998 (None, None)
999 }
1000 }
1001 _ => (None, None),
1002 };
1003 if let Some(table) = table_opt {
1004 if let Some(result) = self.project_filter_sort_limit_fast(
1005 table, fields, sort_field, descending, limit, pred_opt,
1006 )? {
1007 return Ok(result);
1008 }
1009 }
1010 }
1011 }
1012 if let PlanNode::Filter {
1013 input: fi,
1014 predicate,
1015 } = inner.as_ref()
1016 {
1017 if let PlanNode::SeqScan { table } = fi.as_ref() {
1018 let limit = match limit_expr {
1019 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1020 _ => usize::MAX,
1021 };
1022 if let Some(result) = self.project_filter_limit_fast(
1023 table,
1024 fields,
1025 limit,
1026 Some(predicate),
1027 )? {
1028 return Ok(result);
1029 }
1030 }
1031 }
1032 if let PlanNode::SeqScan { table } = inner.as_ref() {
1033 let limit = match limit_expr {
1034 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1035 _ => usize::MAX,
1036 };
1037 if let Some(result) =
1038 self.project_filter_limit_fast(table, fields, limit, None)?
1039 {
1040 return Ok(result);
1041 }
1042 }
1043 }
1044
1045 // Project(Filter(SeqScan)) without Limit.
1046 if let PlanNode::Filter {
1047 input: fi,
1048 predicate,
1049 } = input.as_ref()
1050 {
1051 if let PlanNode::SeqScan { table } = fi.as_ref() {
1052 if let Some(result) = self.project_filter_limit_fast(
1053 table,
1054 fields,
1055 usize::MAX,
1056 Some(predicate),
1057 )? {
1058 return Ok(result);
1059 }
1060 }
1061 }
1062
1063 // Project(SeqScan) without Filter or Limit.
1064 if let PlanNode::SeqScan { table } = input.as_ref() {
1065 if let Some(result) =
1066 self.project_filter_limit_fast(table, fields, usize::MAX, None)?
1067 {
1068 return Ok(result);
1069 }
1070 }
1071
1072 // Generic path.
1073 let result = self.execute_plan_readonly(input)?;
1074 match result {
1075 QueryResult::Rows { columns, rows } => {
1076 let proj_columns: Vec<String> = fields
1077 .iter()
1078 .map(|f| {
1079 f.alias.clone().unwrap_or_else(|| match &f.expr {
1080 Expr::Field(name) => name.clone(),
1081 Expr::QualifiedField { qualifier, field } => {
1082 format!("{qualifier}.{field}")
1083 }
1084 _ => "?".into(),
1085 })
1086 })
1087 .collect();
1088 let proj_rows: Vec<Vec<Value>> = rows
1089 .iter()
1090 .map(|row| {
1091 fields
1092 .iter()
1093 .map(|f| eval_expr(&f.expr, row, &columns))
1094 .collect()
1095 })
1096 .collect();
1097 Ok(QueryResult::Rows {
1098 columns: proj_columns,
1099 rows: proj_rows,
1100 })
1101 }
1102 _ => Err("project requires row input".into()),
1103 }
1104 }
1105
1106 PlanNode::Sort { input, keys } => {
1107 let result = self.execute_plan_readonly(input)?;
1108 match result {
1109 QueryResult::Rows { columns, mut rows } => {
1110 if rows.len() > MAX_SORT_ROWS {
1111 return Err(QueryError::SortLimitExceeded);
1112 }
1113 // WS2: byte-budget guard on the sort buffer.
1114 self.charge_rows(&rows)?;
1115 let key_indices: Vec<(usize, bool)> = keys
1116 .iter()
1117 .map(|k| {
1118 columns
1119 .iter()
1120 .position(|c| c == &k.field)
1121 .map(|idx| (idx, k.descending))
1122 .ok_or_else(|| QueryError::ColumnNotFound {
1123 table: String::new(),
1124 column: k.field.clone(),
1125 })
1126 })
1127 .collect::<Result<_, QueryError>>()?;
1128 rows.sort_by(|a, b| {
1129 for &(col_idx, descending) in &key_indices {
1130 let cmp = a[col_idx].cmp(&b[col_idx]);
1131 let cmp = if descending { cmp.reverse() } else { cmp };
1132 if cmp != std::cmp::Ordering::Equal {
1133 return cmp;
1134 }
1135 }
1136 std::cmp::Ordering::Equal
1137 });
1138 Ok(QueryResult::Rows { columns, rows })
1139 }
1140 _ => Err("sort requires row input".into()),
1141 }
1142 }
1143
1144 PlanNode::Limit { input, count } => {
1145 let result = self.execute_plan_readonly(input)?;
1146 let n = match count {
1147 Expr::Literal(Literal::Int(v)) => *v as usize,
1148 _ => return Err("limit must be integer literal".into()),
1149 };
1150 match result {
1151 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1152 columns,
1153 rows: rows.into_iter().take(n).collect(),
1154 }),
1155 _ => Err("limit requires row input".into()),
1156 }
1157 }
1158
1159 PlanNode::Offset { input, count } => {
1160 let result = self.execute_plan_readonly(input)?;
1161 let n = match count {
1162 Expr::Literal(Literal::Int(v)) => *v as usize,
1163 _ => return Err("offset must be integer literal".into()),
1164 };
1165 match result {
1166 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1167 columns,
1168 rows: rows.into_iter().skip(n).collect(),
1169 }),
1170 _ => Err("offset requires row input".into()),
1171 }
1172 }
1173
1174 PlanNode::Aggregate {
1175 input,
1176 function,
1177 field,
1178 } => {
1179 // Fast path: count() over SeqScan.
1180 if *function == AggFunc::Count {
1181 if let PlanNode::SeqScan { table } = input.as_ref() {
1182 // A dirty materialized view must be refreshed before
1183 // it can be counted, which needs `&mut self`. Escalate
1184 // to the write path (F3: count(View) returned stale).
1185 if self.view_registry.is_dirty(table) {
1186 return Err(QueryError::ReadonlyNeedsWrite);
1187 }
1188 let mut count: i64 = 0;
1189 self.catalog
1190 .for_each_row_raw(table, |_rid, _data| {
1191 count += 1;
1192 })
1193 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1194 return Ok(QueryResult::Scalar(Value::Int(count)));
1195 }
1196 if let PlanNode::Filter {
1197 input: inner,
1198 predicate,
1199 } = input.as_ref()
1200 {
1201 // Only take the fast path for a plain Filter(SeqScan)
1202 // with no subquery in the predicate. A subquery
1203 // predicate (`count(T filter .x in (...))`) must be
1204 // resolved first; the fast path evaluates the raw
1205 // predicate with no subquery materialisation, which
1206 // silently yields 0 (F1). Falling through routes it to
1207 // the generic path that runs the subquery correctly.
1208 if let PlanNode::SeqScan { table } = inner.as_ref() {
1209 if self.view_registry.is_dirty(table) {
1210 // F3: count(View filter ...) over a dirty view.
1211 return Err(QueryError::ReadonlyNeedsWrite);
1212 }
1213 }
1214 if let (PlanNode::SeqScan { table }, false) =
1215 (inner.as_ref(), contains_subquery(predicate))
1216 {
1217 let schema = self
1218 .catalog
1219 .schema(table)
1220 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
1221 .clone();
1222 let columns: Vec<String> =
1223 schema.columns.iter().map(|c| c.name.clone()).collect();
1224 let fast = FastLayout::new(&schema);
1225 let row_layout = RowLayout::new(&schema);
1226
1227 if let Some(compiled) =
1228 compile_predicate(predicate, &columns, &fast, &schema)
1229 {
1230 let mut count: i64 = 0;
1231 self.catalog
1232 .for_each_row_raw(table, |_rid, data| {
1233 if compiled(data) {
1234 count += 1;
1235 }
1236 })
1237 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1238 return Ok(QueryResult::Scalar(Value::Int(count)));
1239 }
1240
1241 let pred_cols = predicate_column_indices(predicate, &columns);
1242 let mut count: i64 = 0;
1243 self.catalog
1244 .for_each_row_raw(table, |_rid, data| {
1245 let pred_row =
1246 decode_selective(&schema, &row_layout, data, &pred_cols);
1247 if eval_predicate(predicate, &pred_row, &columns) {
1248 count += 1;
1249 }
1250 })
1251 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1252 return Ok(QueryResult::Scalar(Value::Int(count)));
1253 }
1254 }
1255 }
1256
1257 // Fast path: sum/avg/min/max over single fixed-size numeric.
1258 if matches!(
1259 function,
1260 AggFunc::Sum
1261 | AggFunc::Avg
1262 | AggFunc::Min
1263 | AggFunc::Max
1264 | AggFunc::CountDistinct
1265 ) {
1266 if let Some(col) = field.as_ref() {
1267 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1268 match input.as_ref() {
1269 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1270 PlanNode::Filter {
1271 input: inner,
1272 predicate,
1273 } => {
1274 if let PlanNode::SeqScan { table } = inner.as_ref() {
1275 (Some(table.as_str()), Some(predicate))
1276 } else {
1277 (None, None)
1278 }
1279 }
1280 _ => (None, None),
1281 };
1282 if let Some(table) = table_opt {
1283 if let Some(result) =
1284 self.agg_single_col_fast(table, col, *function, pred_opt)?
1285 {
1286 return Ok(result);
1287 }
1288 }
1289 }
1290 }
1291
1292 // Generic path.
1293 let result = self.execute_plan_readonly(input)?;
1294 match result {
1295 QueryResult::Rows { columns, rows } => match function {
1296 AggFunc::Count => Ok(QueryResult::Scalar(Value::Int(rows.len() as i64))),
1297 AggFunc::CountDistinct => {
1298 let col = field.as_ref().ok_or("count distinct requires field")?;
1299 let idx = columns
1300 .iter()
1301 .position(|c| c == col)
1302 .ok_or("col not found")?;
1303 let mut seen = std::collections::HashSet::new();
1304 for row in &rows {
1305 let v = &row[idx];
1306 if !v.is_empty() {
1307 seen.insert(v.clone());
1308 }
1309 }
1310 Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
1311 }
1312 AggFunc::Avg => {
1313 let col = field.as_ref().ok_or("avg requires field")?;
1314 let idx = columns
1315 .iter()
1316 .position(|c| c == col)
1317 .ok_or("col not found")?;
1318 let sum: f64 = rows
1319 .iter()
1320 .filter_map(|r| match &r[idx] {
1321 Value::Int(v) => Some(*v as f64),
1322 Value::Float(v) => Some(*v),
1323 _ => None,
1324 })
1325 .sum();
1326 let count = rows.len() as f64;
1327 Ok(QueryResult::Scalar(Value::Float(sum / count)))
1328 }
1329 AggFunc::Sum => {
1330 let col = field.as_ref().ok_or("sum requires field")?;
1331 let idx = columns
1332 .iter()
1333 .position(|c| c == col)
1334 .ok_or("col not found")?;
1335 let mut int_sum: i64 = 0;
1336 let mut float_sum: f64 = 0.0;
1337 let mut saw_float = false;
1338 for r in &rows {
1339 match &r[idx] {
1340 Value::Int(v) => int_sum += *v,
1341 Value::Float(v) => {
1342 float_sum += *v;
1343 saw_float = true;
1344 }
1345 _ => {}
1346 }
1347 }
1348 let result = if saw_float {
1349 Value::Float(float_sum + int_sum as f64)
1350 } else {
1351 Value::Int(int_sum)
1352 };
1353 Ok(QueryResult::Scalar(result))
1354 }
1355 AggFunc::Min | AggFunc::Max => {
1356 let col = field.as_ref().ok_or("min/max requires field")?;
1357 let idx = columns
1358 .iter()
1359 .position(|c| c == col)
1360 .ok_or("col not found")?;
1361 let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
1362 let result = if *function == AggFunc::Min {
1363 vals.into_iter().min().cloned()
1364 } else {
1365 vals.into_iter().max().cloned()
1366 };
1367 Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
1368 }
1369 },
1370 _ => Err("aggregate requires row input".into()),
1371 }
1372 }
1373
1374 PlanNode::Distinct { input } => {
1375 let result = self.execute_plan_readonly(input)?;
1376 match result {
1377 QueryResult::Rows { columns, rows } => {
1378 let mut seen = std::collections::HashSet::new();
1379 let mut unique_rows = Vec::new();
1380 for row in rows {
1381 if seen.insert(row.clone()) {
1382 unique_rows.push(row);
1383 }
1384 }
1385 Ok(QueryResult::Rows {
1386 columns,
1387 rows: unique_rows,
1388 })
1389 }
1390 other => Ok(other),
1391 }
1392 }
1393
1394 PlanNode::GroupBy {
1395 input,
1396 keys,
1397 aggregates,
1398 having,
1399 } => {
1400 let result = self.execute_plan_readonly(input)?;
1401 match result {
1402 QueryResult::Rows { columns, rows } => {
1403 // WS2: byte-budget guard on the GROUP BY input buffer
1404 // (the hash table is bounded by the input it groups).
1405 self.charge_rows(&rows)?;
1406 let key_indices: Vec<usize> = keys
1407 .iter()
1408 .map(|k| {
1409 columns.iter().position(|c| c == k).ok_or_else(|| {
1410 QueryError::ColumnNotFound {
1411 table: String::new(),
1412 column: k.clone(),
1413 }
1414 })
1415 })
1416 .collect::<Result<Vec<_>, _>>()?;
1417
1418 let agg_field_indices: Vec<usize> = aggregates
1419 .iter()
1420 .map(|a| {
1421 if a.field == "*" {
1422 Ok(usize::MAX)
1423 } else {
1424 columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1425 QueryError::ColumnNotFound {
1426 table: String::new(),
1427 column: a.field.clone(),
1428 }
1429 })
1430 }
1431 })
1432 .collect::<Result<Vec<_>, _>>()?;
1433
1434 let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1435 rustc_hash::FxHashMap::default();
1436 let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1437 for (ri, row) in rows.iter().enumerate() {
1438 let key: Vec<Value> =
1439 key_indices.iter().map(|&i| row[i].clone()).collect();
1440 match group_map.get(&key) {
1441 Some(&idx) => groups[idx].1.push(ri),
1442 None => {
1443 let idx = groups.len();
1444 group_map.insert(key.clone(), idx);
1445 groups.push((key, vec![ri]));
1446 }
1447 }
1448 }
1449
1450 let mut out_columns: Vec<String> = keys.clone();
1451 for agg in aggregates.iter() {
1452 out_columns.push(agg.output_name.clone());
1453 }
1454
1455 let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1456 for (key_vals, row_indices) in &groups {
1457 let mut row = key_vals.clone();
1458 for (ai, agg) in aggregates.iter().enumerate() {
1459 let col_idx = agg_field_indices[ai];
1460 let val = compute_group_aggregate(
1461 agg.function,
1462 &rows,
1463 row_indices,
1464 col_idx,
1465 );
1466 row.push(val);
1467 }
1468 out_rows.push(row);
1469 }
1470
1471 if let Some(having_expr) = having {
1472 out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1473 }
1474
1475 Ok(QueryResult::Rows {
1476 columns: out_columns,
1477 rows: out_rows,
1478 })
1479 }
1480 _ => Err("group by requires row input".into()),
1481 }
1482 }
1483
1484 PlanNode::NestedLoopJoin {
1485 left,
1486 right,
1487 on,
1488 kind,
1489 } => {
1490 let left_result = self.execute_plan_readonly(left)?;
1491 let right_result = self.execute_plan_readonly(right)?;
1492 let (left_columns, left_rows) = match left_result {
1493 QueryResult::Rows { columns, rows } => (columns, rows),
1494 _ => return Err("join left side must produce rows".into()),
1495 };
1496 let (right_columns, right_rows) = match right_result {
1497 QueryResult::Rows { columns, rows } => (columns, rows),
1498 _ => return Err("join right side must produce rows".into()),
1499 };
1500
1501 // WS2: byte-budget guard on the join build side.
1502 self.charge_rows(&left_rows)?;
1503 self.charge_rows(&right_rows)?;
1504
1505 if !matches!(kind, JoinKind::Cross) {
1506 if let Some(pred) = on {
1507 if let Some((l_idx, r_idx)) =
1508 try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1509 {
1510 let result = hash_join(
1511 left_columns,
1512 left_rows,
1513 right_columns,
1514 right_rows,
1515 l_idx,
1516 r_idx,
1517 *kind,
1518 );
1519 if let QueryResult::Rows { ref rows, .. } = result {
1520 check_join_limit(rows.len())?;
1521 }
1522 return Ok(result);
1523 }
1524 }
1525 }
1526
1527 let n_left = left_columns.len();
1528 let n_right = right_columns.len();
1529 let mut columns = Vec::with_capacity(n_left + n_right);
1530 columns.extend(left_columns);
1531 columns.extend(right_columns);
1532
1533 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1534 let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1535
1536 for left_row in &left_rows {
1537 let mut matched = false;
1538 for right_row in &right_rows {
1539 combined.clear();
1540 combined.extend_from_slice(left_row);
1541 combined.extend_from_slice(right_row);
1542 let keep = match kind {
1543 JoinKind::Cross => true,
1544 JoinKind::Inner | JoinKind::LeftOuter => match on {
1545 Some(pred) => eval_predicate(pred, &combined, &columns),
1546 None => true,
1547 },
1548 JoinKind::RightOuter => {
1549 unreachable!("planner rewrites RightOuter to LeftOuter")
1550 }
1551 };
1552 if keep {
1553 rows.push(combined.clone());
1554 check_join_limit(rows.len())?;
1555 matched = true;
1556 }
1557 }
1558 if !matched && matches!(kind, JoinKind::LeftOuter) {
1559 let mut row = Vec::with_capacity(n_left + n_right);
1560 row.extend_from_slice(left_row);
1561 row.resize(n_left + n_right, Value::Empty);
1562 rows.push(row);
1563 check_join_limit(rows.len())?;
1564 }
1565 }
1566
1567 Ok(QueryResult::Rows { columns, rows })
1568 }
1569
1570 PlanNode::Window { input, windows } => {
1571 let result = self.execute_plan_readonly(input)?;
1572 execute_window(result, windows)
1573 }
1574
1575 PlanNode::Union { left, right, all } => {
1576 let left_result = self.execute_plan_readonly(left)?;
1577 let right_result = self.execute_plan_readonly(right)?;
1578 let (left_cols, left_rows) = match left_result {
1579 QueryResult::Rows { columns, rows } => (columns, rows),
1580 _ => return Err("UNION requires query results on left side".into()),
1581 };
1582 let (_, right_rows) = match right_result {
1583 QueryResult::Rows { columns, rows } => (columns, rows),
1584 _ => return Err("UNION requires query results on right side".into()),
1585 };
1586 let mut combined = left_rows;
1587 if *all {
1588 combined.extend(right_rows);
1589 } else {
1590 let mut seen = std::collections::HashSet::new();
1591 for row in &combined {
1592 seen.insert(row.clone());
1593 }
1594 for row in right_rows {
1595 if seen.insert(row.clone()) {
1596 combined.push(row);
1597 }
1598 }
1599 }
1600 Ok(QueryResult::Rows {
1601 columns: left_cols,
1602 rows: combined,
1603 })
1604 }
1605
1606 PlanNode::Explain { input } => {
1607 let text = format_plan_tree(input, 0);
1608 Ok(QueryResult::Rows {
1609 columns: vec!["plan".to_string()],
1610 rows: text
1611 .lines()
1612 .map(|line| vec![Value::Str(line.to_string())])
1613 .collect(),
1614 })
1615 }
1616
1617 // All write variants — caller must escalate to the write lock.
1618 PlanNode::Insert { .. }
1619 | PlanNode::Update { .. }
1620 | PlanNode::Delete { .. }
1621 | PlanNode::Upsert { .. }
1622 | PlanNode::CreateTable { .. }
1623 | PlanNode::AlterTable { .. }
1624 | PlanNode::DropTable { .. }
1625 | PlanNode::CreateView { .. }
1626 | PlanNode::RefreshView { .. }
1627 | PlanNode::DropView { .. }
1628 | PlanNode::Begin
1629 | PlanNode::Commit
1630 | PlanNode::Rollback => Err(QueryError::ReadonlyNeedsWrite),
1631 }
1632 }
1633
1634 /// `&self` variant of [`Engine::materialize_subqueries`]. Used by the
1635 /// read path so `Filter` predicates with `InSubquery`/`ExistsSubquery`
1636 /// children can evaluate their inner queries without taking the write
1637 /// lock. Inner queries that would themselves need a write (e.g. dirty
1638 /// view) escalate via [`READONLY_NEEDS_WRITE`] just like the top-level
1639 /// read path does.
1640 fn materialize_subqueries_readonly(&self, expr: &Expr) -> Result<Expr, QueryError> {
1641 match expr {
1642 Expr::InSubquery {
1643 expr: inner,
1644 subquery,
1645 negated,
1646 } => {
1647 if is_correlated_subquery(subquery, &self.catalog) {
1648 // Pass through — will be materialized per-row in the
1649 // Filter handler's correlated subquery path.
1650 let inner = self.materialize_subqueries_readonly(inner)?;
1651 return Ok(Expr::InSubquery {
1652 expr: Box::new(inner),
1653 subquery: subquery.clone(),
1654 negated: *negated,
1655 });
1656 }
1657 let inner = self.materialize_subqueries_readonly(inner)?;
1658 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1659 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1660 let result = self.execute_plan_readonly(&sub_plan)?;
1661 let values = match result {
1662 QueryResult::Rows { rows, .. } => rows
1663 .into_iter()
1664 .filter_map(|mut row| {
1665 if row.is_empty() {
1666 None
1667 } else {
1668 Some(value_to_expr(row.swap_remove(0)))
1669 }
1670 })
1671 .collect(),
1672 _ => Vec::new(),
1673 };
1674 // WS2: byte-budget guard on the materialized IN-list.
1675 self.charge_in_list(&values)?;
1676 Ok(Expr::InList {
1677 expr: Box::new(inner),
1678 list: values,
1679 negated: *negated,
1680 })
1681 }
1682 Expr::ExistsSubquery { subquery, negated } => {
1683 if is_correlated_subquery(subquery, &self.catalog) {
1684 return Ok(expr.clone());
1685 }
1686 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1687 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1688 let result = self.execute_plan_readonly(&sub_plan)?;
1689 let has_rows = match result {
1690 QueryResult::Rows { rows, .. } => !rows.is_empty(),
1691 _ => false,
1692 };
1693 let truth = if *negated { !has_rows } else { has_rows };
1694 Ok(Expr::Literal(Literal::Bool(truth)))
1695 }
1696 Expr::BinaryOp(l, op, r) => {
1697 let l = self.materialize_subqueries_readonly(l)?;
1698 let r = self.materialize_subqueries_readonly(r)?;
1699 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1700 }
1701 Expr::UnaryOp(op, inner) => {
1702 let inner = self.materialize_subqueries_readonly(inner)?;
1703 Ok(Expr::UnaryOp(*op, Box::new(inner)))
1704 }
1705 Expr::Case { whens, else_expr } => {
1706 let whens = whens
1707 .iter()
1708 .map(|(c, r)| {
1709 let c = self.materialize_subqueries_readonly(c)?;
1710 let r = self.materialize_subqueries_readonly(r)?;
1711 Ok((Box::new(c), Box::new(r)))
1712 })
1713 .collect::<Result<Vec<_>, QueryError>>()?;
1714 let else_expr = match else_expr {
1715 Some(e) => Some(Box::new(self.materialize_subqueries_readonly(e)?)),
1716 None => None,
1717 };
1718 Ok(Expr::Case { whens, else_expr })
1719 }
1720 other => Ok(other.clone()),
1721 }
1722 }
1723
1724 /// Per-row materialisation of correlated subqueries. For each row in the
1725 /// outer query, substitute outer column references in the subquery's
1726 /// filter with the current row's literal values, execute the modified
1727 /// subquery, and return the result as an InList or Bool literal.
1728 fn materialize_correlated_for_row_readonly(
1729 &self,
1730 expr: &Expr,
1731 outer_row: &[Value],
1732 outer_columns: &[String],
1733 ) -> Result<Expr, QueryError> {
1734 match expr {
1735 Expr::InSubquery {
1736 expr: inner,
1737 subquery,
1738 negated,
1739 } => {
1740 let inner =
1741 self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1742 let mut sub = *subquery.clone();
1743 if let Some(ref filter) = sub.filter {
1744 sub.filter = Some(substitute_outer_refs(
1745 filter,
1746 &sub.source,
1747 &self.catalog,
1748 outer_row,
1749 outer_columns,
1750 ));
1751 }
1752 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1753 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1754 let result = self.execute_plan_readonly(&sub_plan)?;
1755 let values = match result {
1756 QueryResult::Rows { rows, .. } => rows
1757 .into_iter()
1758 .filter_map(|mut row| {
1759 if row.is_empty() {
1760 None
1761 } else {
1762 Some(value_to_expr(row.swap_remove(0)))
1763 }
1764 })
1765 .collect(),
1766 _ => Vec::new(),
1767 };
1768 // WS2: byte-budget guard on the per-row materialized IN-list.
1769 self.charge_in_list(&values)?;
1770 Ok(Expr::InList {
1771 expr: Box::new(inner),
1772 list: values,
1773 negated: *negated,
1774 })
1775 }
1776 Expr::ExistsSubquery { subquery, negated } => {
1777 let mut sub = *subquery.clone();
1778 if let Some(ref filter) = sub.filter {
1779 sub.filter = Some(substitute_outer_refs(
1780 filter,
1781 &sub.source,
1782 &self.catalog,
1783 outer_row,
1784 outer_columns,
1785 ));
1786 }
1787 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1788 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1789 let result = self.execute_plan_readonly(&sub_plan)?;
1790 let has_rows = match result {
1791 QueryResult::Rows { rows, .. } => !rows.is_empty(),
1792 _ => false,
1793 };
1794 let truth = if *negated { !has_rows } else { has_rows };
1795 Ok(Expr::Literal(Literal::Bool(truth)))
1796 }
1797 Expr::BinaryOp(l, op, r) => {
1798 let l =
1799 self.materialize_correlated_for_row_readonly(l, outer_row, outer_columns)?;
1800 let r =
1801 self.materialize_correlated_for_row_readonly(r, outer_row, outer_columns)?;
1802 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1803 }
1804 Expr::UnaryOp(op, inner) => {
1805 let inner =
1806 self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1807 Ok(Expr::UnaryOp(*op, Box::new(inner)))
1808 }
1809 other => Ok(other.clone()),
1810 }
1811 }
1812
1813 pub fn catalog(&self) -> &Catalog {
1814 &self.catalog
1815 }
1816
1817 pub fn catalog_mut(&mut self) -> &mut Catalog {
1818 &mut self.catalog
1819 }
1820}