powdb_query/executor/mod.rs
1//! PowDB query executor.
2
3// Submodules that don't use macros defined in this file.
4mod compiled;
5mod eval;
6pub mod mem_budget;
7
8use crate::ast::*;
9use crate::canonicalize::canonicalize;
10use crate::plan::*;
11use crate::plan_cache::PlanCache;
12use crate::planner;
13use crate::result::{QueryError, QueryResult};
14use powdb_storage::catalog::Catalog;
15use powdb_storage::row::{decode_column, decode_row, RowLayout};
16use powdb_storage::types::*;
17use powdb_storage::view::ViewRegistry;
18
19use std::io;
20use std::path::Path;
21use std::sync::Mutex;
22use std::time::Instant;
23use tracing::{error, info, Level};
24
25use self::compiled::*;
26use self::eval::*;
27
/// Legacy sentinel string constant — kept for backward compatibility with
/// any external code matching on the string representation. New code should
/// match on `QueryError::ReadonlyNeedsWrite` directly.
///
/// The read-only entry points in this module signal "needs the write lock"
/// by returning the `QueryError::ReadonlyNeedsWrite` variant, not this
/// string. NOTE(review): whether that variant's `Display` output equals this
/// sentinel is defined in `result.rs` — confirm before relying on it.
pub const READONLY_NEEDS_WRITE: &str = "__POWDB_READONLY_NEEDS_WRITE__";
32
/// Plan cache capacity: the number of entries the engine's `PlanCache`
/// is created with in `Engine::new`. Bench workloads fill ~15 slots; real
/// apps will sit comfortably in 256. Lookup is O(1), collisions clear the
/// cache (see `plan_cache::PlanCache::insert`).
const PLAN_CACHE_CAPACITY: usize = 256;
37
/// Maximum number of rows a join may produce before the executor aborts.
/// Prevents Cartesian-product blowups (e.g. `T cross join T` on 10K rows
/// would produce 100M rows in memory without this cap).
///
/// Enforced through [`check_join_limit`], which fails with
/// `QueryError::JoinLimitExceeded` once a row count goes strictly above
/// this value.
pub(super) const MAX_JOIN_ROWS: usize = 1_000_000;
42
/// Maximum number of rows that may be materialized for sorting.
/// Queries that exceed this should add a LIMIT clause to narrow the input
/// before sorting.
///
/// NOTE(review): not referenced in this part of the module; presumably
/// checked by the sort operator in `plan_exec` — confirm.
pub(super) const MAX_SORT_ROWS: usize = 10_000_000;
47
48#[inline]
49pub(super) fn check_join_limit(row_count: usize) -> Result<(), QueryError> {
50 if row_count > MAX_JOIN_ROWS {
51 return Err(QueryError::JoinLimitExceeded);
52 }
53 Ok(())
54}
55
// ─── Mission D11 Phase 1: scalar hot-loop helpers ─────────────────────────
//
// These macros expand into the scan body of `agg_single_col_fast` and sit
// inside the `for_each_row_raw` closure. They exist to:
//
// 1. Split the loop on presence of a predicate *outside* the hot body,
//    so the no-predicate path (agg_sum/agg_min/agg_max bench workloads)
//    never pays the `Option<CompiledPredicate>` branch per row.
// 2. Read the null-bitmap byte and the 8-byte little-endian value of the
//    aggregated column straight out of the encoded row bytes, without
//    decoding the rest of the row.
//
// Row layout the macros rely on (produced by the row encoder in
// `storage/src/row.rs`):
//
// - The null bitmap starts at offset 2. `$bmp_byte` is `col_idx / 8`, so
//   the column's bitmap byte is at `2 + $bmp_byte`, and a set `$bmp_bit`
//   in that byte marks the value as NULL.
// - `$off = 2 + bitmap_size + fixed_offsets[col_idx]` is where the
//   column's 8-byte value starts in the fixed-size region.
// - `agg_single_col_fast`, the only caller, early-returns unless the column
//   is Int/Float (8-byte fixed) and `fast.fixed_offsets[col_idx]` is `Some`.
//
// Both reads go through `slice::get`, so no `unsafe` is involved: a row too
// short to contain the bitmap byte or the full value (corrupt or truncated
// data) is skipped exactly like a NULL. These are the same length checks an
// unchecked read would need to stay sound, so the hot path does no extra
// work for being safe.

// Shared implementation of `agg_int_loop!` / `agg_float_loop!`, generic over
// the 8-byte little-endian numeric type `$ty` (`i64` or `f64`). Scans
// `$table` and runs `$body` with `$v` bound to every non-NULL value of the
// column, restricted to rows accepted by the compiled predicate `$pred` when
// it is `Some`. Storage errors are mapped to `QueryError::StorageError` and
// propagated with `?` from the enclosing function.
macro_rules! agg_fixed8_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : $ty:ty| $body:block
    ) => {{
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        // Raw bytes of the column value, or `None` when the value is NULL or
        // the row is too short to hold the bitmap byte / the 8-byte value.
        let read_non_null = |data: &[u8]| -> Option<[u8; 8]> {
            let bmp = *data.get(2 + bmp_byte)?;
            if (bmp >> bmp_bit) & 1 == 1 {
                return None;
            }
            let bytes = data.get(off..off + 8)?;
            let mut raw = [0u8; 8];
            raw.copy_from_slice(bytes);
            Some(raw)
        };
        // The predicate check is decided once, outside the scan: each branch
        // gets its own specialised per-row closure.
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    if let Some(raw) = read_non_null(data) {
                        let $v: $ty = <$ty>::from_le_bytes(raw);
                        $body
                    }
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if let Some(raw) = read_non_null(data) {
                        let $v: $ty = <$ty>::from_le_bytes(raw);
                        $body
                    }
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}

// Aggregation hot loop over an Int column: runs `$body` with `$v: i64`
// bound to each non-NULL value. See `agg_fixed8_loop!` for the contract.
macro_rules! agg_int_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : i64| $body:block
    ) => {{
        agg_fixed8_loop!(
            $self, $table, $pred, $bmp_byte, $bmp_bit, $off,
            |$v: i64| $body
        )
    }};
}

// Aggregation hot loop over a Float column: runs `$body` with `$v: f64`
// bound to each non-NULL value. See `agg_fixed8_loop!` for the contract.
macro_rules! agg_float_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : f64| $body:block
    ) => {{
        agg_fixed8_loop!(
            $self, $table, $pred, $bmp_byte, $bmp_bit, $off,
            |$v: f64| $body
        )
    }};
}
211
212// Submodules that use the macros above — must be declared after macro_rules!.
213mod plan_exec;
214mod prepared;
215
216#[cfg(test)]
217mod tests;
218
219// Re-exports for the public API
220pub use self::prepared::PreparedQuery;
221
222use self::plan_exec::{
223 compute_group_aggregate, execute_window, format_plan_tree, hash_join, lower_unindexed_scans,
224 range_matches, synthesize_range_predicate, try_extract_equi_join_keys,
225};
226
227/// Mission infra-1: classify a parsed statement as read-only vs. mutating.
228/// Used by [`Engine::execute_powql_readonly`] and by the server handler
229/// to decide between the RwLock reader and writer sides. `Union` recurses
230/// because each side can independently be read/write (though in practice
231/// both sides are reads — the parser only builds Union from query shapes).
232pub fn is_read_only_statement(stmt: &Statement) -> bool {
233 match stmt {
234 Statement::Query(_) => true,
235 Statement::Union(u) => is_read_only_statement(&u.left) && is_read_only_statement(&u.right),
236 Statement::Insert(_)
237 | Statement::Upsert(_)
238 | Statement::UpdateQuery(_)
239 | Statement::DeleteQuery(_)
240 | Statement::CreateType(_)
241 | Statement::AlterTable(_)
242 | Statement::DropTable(_)
243 | Statement::CreateView(_)
244 | Statement::RefreshView(_)
245 | Statement::DropView(_) => false,
246 Statement::Begin | Statement::Commit | Statement::Rollback => false,
247 Statement::Explain(inner) => is_read_only_statement(inner),
248 }
249}
250
/// The PowDB query engine.
///
/// Owns the storage [`Catalog`], the shared plan cache, the
/// materialized-view registry and the per-query memory limit. It exposes the
/// PowQL entry points: `&mut self` methods such as [`Engine::execute_powql`]
/// for statements that may write, and `&self` methods such as
/// [`Engine::execute_powql_readonly`] for the concurrent read path.
pub struct Engine {
    /// Storage catalog: table schemas, table handles (heap + indexes via
    /// `get_table`) and the WAL (`sync_wal`) this engine reads and writes.
    catalog: Catalog,
    /// Mission D9 — cached parsed+planned query trees keyed by canonical
    /// hash. Saves the ~3μs parse+plan cost on repeat queries that differ
    /// only in literal values.
    ///
    /// Mission infra-1: wrapped in `Mutex` so the read path can be driven
    /// by `&self`. The critical section is extremely short — a single
    /// hashmap lookup + plan clone on a hit, or a single insert on a miss.
    /// A full `RwLock` would be over-engineered here; the contention window
    /// is smaller than the read-path scan work it gates.
    plan_cache: Mutex<PlanCache>,
    /// Mission C Phase 13: reusable `Vec<Value>` scratch buffer for the
    /// prepared-insert fast path. `execute_prepared` used to allocate a
    /// fresh `vec![Value::Empty; n_cols]` on every insert; recycling this
    /// buffer shaves one heap alloc per row on `insert_batch_1k`.
    insert_values_scratch: Vec<Value>,
    /// Materialized view registry: tracks view definitions, dependencies,
    /// and dirty state. Views are backed by regular catalog tables; this
    /// registry adds the lifecycle metadata.
    view_registry: ViewRegistry,
    /// Whether an explicit transaction is open. Starts `false` in
    /// `Engine::new`. While it is `true`, the statement entry points skip
    /// their statement-boundary `sync_wal` call. NOTE(review): presumably
    /// set by `begin` and cleared by `commit`/`rollback`, whose handling is
    /// outside this part of the file — confirm.
    in_transaction: bool,
    /// WS2 — per-query memory budget ceiling (bytes). The running total lives
    /// in a thread-local (see [`mem_budget`]) and is reset at every top-level
    /// query entry, so sort/join/GROUP BY/IN-list materialization can be capped
    /// without OOM-killing the process. This field holds only the *limit* (a
    /// plain `usize`, so `Engine` stays `Sync` for the concurrent read path).
    /// Default [`mem_budget::DEFAULT_QUERY_MEMORY_LIMIT`] (256 MB); overridable
    /// via `Engine::with_memory_limit` (server reads `POWDB_QUERY_MEMORY_LIMIT`).
    query_memory_limit: usize,
}
282
283impl Engine {
284 /// Open or create a PowDB engine rooted at `data_dir`.
285 ///
286 /// If the directory already contains a catalog, it is reopened.
287 /// Otherwise a fresh empty database is created.
288 ///
289 /// # Examples
290 ///
291 /// ```
292 /// use powdb_query::executor::Engine;
293 ///
294 /// let dir = tempfile::tempdir().unwrap();
295 /// let engine = Engine::new(dir.path()).unwrap();
296 /// // Engine is ready — the directory now contains a catalog.
297 /// ```
298 pub fn new(data_dir: &Path) -> io::Result<Self> {
299 powdb_storage::create_data_dir_secure(data_dir)?;
300 // Try to reopen an existing database first; only create a fresh
301 // catalog when there isn't one already on disk.
302 let catalog = match Catalog::open(data_dir) {
303 Ok(c) => {
304 info!(data_dir = %data_dir.display(), "engine reopened existing database");
305 c
306 }
307 Err(e) if e.kind() == io::ErrorKind::NotFound => {
308 info!(data_dir = %data_dir.display(), "engine initialized fresh database");
309 Catalog::create(data_dir)?
310 }
311 Err(e) => return Err(e),
312 };
313 let view_registry =
314 ViewRegistry::open(data_dir).unwrap_or_else(|_| ViewRegistry::new(data_dir));
315 Ok(Engine {
316 catalog,
317 plan_cache: Mutex::new(PlanCache::new(PLAN_CACHE_CAPACITY)),
318 insert_values_scratch: Vec::new(),
319 view_registry,
320 in_transaction: false,
321 query_memory_limit: mem_budget::DEFAULT_QUERY_MEMORY_LIMIT,
322 })
323 }
324
325 /// Open or create an engine with an explicit per-query memory limit
326 /// (bytes). Used by the server to apply `POWDB_QUERY_MEMORY_LIMIT`, and by
327 /// tests that need a tiny limit to exercise the budget guard.
328 pub fn with_memory_limit(data_dir: &Path, limit_bytes: usize) -> io::Result<Self> {
329 let mut engine = Engine::new(data_dir)?;
330 engine.set_query_memory_limit(limit_bytes);
331 Ok(engine)
332 }
333
334 /// Current per-query memory limit in bytes.
335 pub fn query_memory_limit(&self) -> usize {
336 self.query_memory_limit
337 }
338
339 /// Override the per-query memory limit in bytes (builder-style).
340 pub fn set_query_memory_limit(&mut self, limit_bytes: usize) {
341 self.query_memory_limit = limit_bytes;
342 }
343
344 /// Enter a budgeted-statement frame for the current query. The returned
345 /// guard must be held for the duration of the statement; on its drop the
346 /// reentrancy depth is decremented. Only the *outermost* statement entry
347 /// zeroes this thread's running total, so a nested `execute_powql` (the
348 /// source query of a `create_view`/`refresh_view`) does NOT discard the
349 /// outer frame's accounting. The accumulator is thread-local, so this never
350 /// touches another concurrent query's total.
351 #[must_use = "the budget guard must outlive the statement body"]
352 pub(super) fn enter_memory_budget(&self) -> mem_budget::EnterGuard {
353 mem_budget::enter()
354 }
355
356 /// Charge the estimated footprint of a freshly materialized batch of rows
357 /// against the current per-query budget. Returns
358 /// [`QueryError::MemoryLimitExceeded`] cleanly if the batch would push the
359 /// query over its limit. Used at every full-materialization point (sort
360 /// buffer, join build side, GROUP BY hash table, IN-list).
361 pub(super) fn charge_rows(&self, rows: &[Vec<Value>]) -> Result<(), QueryError> {
362 let mut total = 0usize;
363 for row in rows {
364 total = total.saturating_add(mem_budget::estimate_row_size(row));
365 }
366 mem_budget::charge(total, self.query_memory_limit)
367 }
368
369 /// Charge a materialized IN-list (the literal expressions pulled out of an
370 /// uncorrelated `IN (subquery)`) against the current per-query budget.
371 /// Each item is conservatively sized at the `Expr` slot plus, for string
372 /// literals, the owned heap bytes.
373 pub(super) fn charge_in_list(&self, list: &[crate::ast::Expr]) -> Result<(), QueryError> {
374 let base = std::mem::size_of::<crate::ast::Expr>();
375 let mut total = std::mem::size_of::<Vec<crate::ast::Expr>>();
376 for item in list {
377 total = total.saturating_add(base);
378 if let crate::ast::Expr::Literal(crate::ast::Literal::String(s)) = item {
379 total = total.saturating_add(s.capacity());
380 }
381 }
382 mem_budget::charge(total, self.query_memory_limit)
383 }
384
385 /// Parse + plan + execute a PowQL query.
386 ///
387 /// # Examples
388 ///
389 /// ```
390 /// use powdb_query::executor::Engine;
391 /// use powdb_query::result::QueryResult;
392 ///
393 /// let dir = tempfile::tempdir().unwrap();
394 /// let mut engine = Engine::new(dir.path()).unwrap();
395 ///
396 /// // Create a table and insert a row.
397 /// engine.execute_powql("type User { required name: str, age: int }").unwrap();
398 /// engine.execute_powql(r#"insert User { name := "Alice", age := 30 }"#).unwrap();
399 ///
400 /// // Query rows back.
401 /// let result = engine.execute_powql("User").unwrap();
402 /// assert_eq!(result.row_count(), 1);
403 /// ```
404 ///
405 /// Mission D6 — tracing collapse: the previous implementation ran 4
406 /// `Instant::now()` + 3 `elapsed().as_micros()` calls + formatted an
407 /// `info!` span on every query, even when tracing was disabled. On a
408 /// sub-microsecond `point_lookup_indexed` call that overhead was
409 /// 100-200ns — 20%+ of the whole query. We now measure time only when
410 /// INFO is actually enabled via `tracing::enabled!`, and we moved the
411 /// noisy `debug!(?plan)` line behind the same gate so the Debug
412 /// formatter can't run unconditionally either.
413 ///
414 /// Mission D9 — plan cache: on the hot path we canonicalise the query
415 /// text (lex + FNV-1a hash with literal values stripped), check the
416 /// cache, and on a hit substitute the new literals into a clone of the
417 /// cached plan. This skips re-lexing, re-parsing, and re-planning —
418 /// around 3μs per call on bench workloads. On a miss we plan as before
419 /// and insert the plan under its canonical hash.
420 pub fn execute_powql(&mut self, input: &str) -> Result<QueryResult, QueryError> {
421 // WS2: each *outermost* statement starts with the full memory
422 // allowance. The guard holds the reentrancy depth so a nested
423 // `execute_powql` (e.g. a view's source query) does not reset the
424 // outer frame's accounting mid-statement.
425 let _budget = self.enter_memory_budget();
426 // Hot path: tracing disabled. Zero syscalls, zero formatting.
427 if !tracing::enabled!(Level::INFO) {
428 // D9: try the plan cache first. Canonicalisation lexes the
429 // query once; on a hit we skip the parser and planner entirely.
430 if let Ok((hash, literals)) = canonicalize(input) {
431 let cached = self
432 .plan_cache
433 .lock()
434 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
435 .get_with_substitution(hash, &literals);
436 if let Some(plan) = cached {
437 let plan = lower_unindexed_scans(&self.catalog, &plan);
438 let result = self.execute_plan(&plan);
439 // Mission B (post-review): statement-boundary WAL
440 // group commit. Catalog::wal_log now only appends;
441 // the fsync happens here exactly once per statement.
442 // `sync_wal` is a no-op when nothing was buffered
443 // (pure reads pay zero fsync).
444 if !self.in_transaction {
445 self.catalog
446 .sync_wal()
447 .map_err(|e| QueryError::StorageError(e.to_string()))?;
448 }
449 return result;
450 }
451 // Miss — plan, insert, execute.
452 return match planner::plan(input) {
453 Ok(plan) => {
454 self.plan_cache
455 .lock()
456 .map_err(|e| {
457 QueryError::Execution(format!("plan cache lock poisoned: {e}"))
458 })?
459 .insert(hash, plan.clone());
460 let plan = lower_unindexed_scans(&self.catalog, &plan);
461 let result = self.execute_plan(&plan);
462 if !self.in_transaction {
463 self.catalog
464 .sync_wal()
465 .map_err(|e| QueryError::StorageError(e.to_string()))?;
466 }
467 result
468 }
469 Err(e) => Err(QueryError::Parse(e.to_string())),
470 };
471 }
472 // Lex error — fall through to the planner so the caller gets a
473 // consistent error shape.
474 return match planner::plan(input) {
475 Ok(plan) => {
476 let plan = lower_unindexed_scans(&self.catalog, &plan);
477 let result = self.execute_plan(&plan);
478 if !self.in_transaction {
479 self.catalog
480 .sync_wal()
481 .map_err(|e| QueryError::StorageError(e.to_string()))?;
482 }
483 result
484 }
485 Err(e) => Err(QueryError::Parse(e.to_string())),
486 };
487 }
488
489 // Instrumented path — only taken under explicit tracing subscribers.
490 let total_start = Instant::now();
491 let plan_start = Instant::now();
492 let plan = planner::plan(input).map_err(|e| {
493 let msg = e.to_string();
494 error!(query = %input, error = %msg, "query plan failed");
495 QueryError::Parse(msg)
496 })?;
497 let plan_us = plan_start.elapsed().as_micros();
498
499 let exec_start = Instant::now();
500 let plan = lower_unindexed_scans(&self.catalog, &plan);
501 let result = self.execute_plan(&plan);
502 if !self.in_transaction {
503 self.catalog
504 .sync_wal()
505 .map_err(|e| QueryError::StorageError(e.to_string()))?;
506 }
507 let exec_us = exec_start.elapsed().as_micros();
508
509 let total_us = total_start.elapsed().as_micros();
510 match &result {
511 Ok(r) => {
512 info!(
513 query = %input,
514 plan_us = plan_us,
515 exec_us = exec_us,
516 total_us = total_us,
517 rows = r.row_count(),
518 "query ok"
519 );
520 }
521 Err(e) => {
522 error!(
523 query = %input,
524 plan_us = plan_us,
525 exec_us = exec_us,
526 error = %e,
527 "query failed"
528 );
529 }
530 }
531 result
532 }
533
534 /// Execute PowQL with `$N` placeholders bound to positional `params`.
535 ///
536 /// Task 4: parameters are substituted as literal *tokens* before
537 /// parsing (see [`crate::parser::parse_with_params`]), so untrusted
538 /// input can never change the query's shape. This path deliberately
539 /// **bypasses the plan cache** — template caching is a follow-up — and
540 /// otherwise mirrors the non-cached tail of [`Engine::execute_powql`].
541 pub fn execute_powql_with_params(
542 &mut self,
543 input: &str,
544 params: &[crate::ast::ParamValue],
545 ) -> Result<QueryResult, QueryError> {
546 let _budget = self.enter_memory_budget();
547 let stmt = crate::parser::parse_with_params(input, params)
548 .map_err(|e| QueryError::Parse(e.to_string()))?;
549 let plan =
550 crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
551 let plan = lower_unindexed_scans(&self.catalog, &plan);
552 let result = self.execute_plan(&plan);
553 if !self.in_transaction {
554 self.catalog
555 .sync_wal()
556 .map_err(|e| QueryError::StorageError(e.to_string()))?;
557 }
558 result
559 }
560
561 /// Read-only variant of [`Engine::execute_powql_with_params`].
562 ///
563 /// Mirrors [`Engine::execute_powql_readonly`]: parses with bound
564 /// params, rejects any write statement with
565 /// [`QueryError::ReadonlyNeedsWrite`] so the caller can escalate to the
566 /// write lock, then executes under a shared borrow. No plan-cache
567 /// interaction.
568 pub fn execute_powql_readonly_with_params(
569 &self,
570 input: &str,
571 params: &[crate::ast::ParamValue],
572 ) -> Result<QueryResult, QueryError> {
573 let _budget = self.enter_memory_budget();
574 let stmt = crate::parser::parse_with_params(input, params)
575 .map_err(|e| QueryError::Parse(e.to_string()))?;
576 if !is_read_only_statement(&stmt) {
577 return Err(QueryError::ReadonlyNeedsWrite);
578 }
579 let plan =
580 crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
581 let plan = lower_unindexed_scans(&self.catalog, &plan);
582 self.execute_plan_readonly(&plan)
583 }
584
585 /// Plan cache stats — useful for benches and debugging.
586 pub fn plan_cache_stats(&self) -> (u64, u64, usize) {
587 let cache = self.plan_cache.lock().unwrap_or_else(|e| e.into_inner());
588 (cache.hits, cache.misses, cache.len())
589 }
590
591 /// Mission infra-1: read-only entry point.
592 ///
593 /// Parses + plans + executes a PowQL query using only a shared borrow
594 /// on the engine. Rejects any statement that would mutate state
595 /// (Insert/Update/Delete/CreateTable/AlterTable/DropTable/CreateView/
596 /// RefreshView/DropView) by returning [`READONLY_NEEDS_WRITE`] so the
597 /// caller can escalate to the write lock.
598 ///
599 /// Also returns [`READONLY_NEEDS_WRITE`] if a materialized view in the
600 /// query is dirty — refreshing one requires `&mut self`, so the caller
601 /// must retake the write lock for the first refresh.
602 ///
603 /// This method is the concurrent-read fast path behind
604 /// `Arc<RwLock<Engine>>`: multiple threads can call it simultaneously
605 /// under a shared `.read()` lock and each will scan independently.
606 pub fn execute_powql_readonly(&self, input: &str) -> Result<QueryResult, QueryError> {
607 // WS2: each *outermost* statement starts with the full memory
608 // allowance. The guard holds the reentrancy depth so a nested
609 // `execute_powql*` does not reset the outer frame's accounting.
610 let _budget = self.enter_memory_budget();
611 // Parse the statement first so we can classify read vs. write
612 // without touching the catalog. This is the same lex+parse cost
613 // the hot path would pay anyway.
614 let stmt = crate::parser::parse(input).map_err(|e| QueryError::Parse(e.to_string()))?;
615 if !is_read_only_statement(&stmt) {
616 return Err(QueryError::ReadonlyNeedsWrite);
617 }
618
619 // Try the plan cache first — identical hash scheme to
620 // `execute_powql` so both paths share cache state. The mutex
621 // section is just a hashmap lookup + plan clone.
622 if let Ok((hash, literals)) = canonicalize(input) {
623 let cached = self
624 .plan_cache
625 .lock()
626 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
627 .get_with_substitution(hash, &literals);
628 if let Some(plan) = cached {
629 let plan = lower_unindexed_scans(&self.catalog, &plan);
630 return self.execute_plan_readonly(&plan);
631 }
632 // Miss: plan + insert + execute. The planner is pure, so this
633 // is safe from `&self`.
634 let plan = crate::planner::plan_statement(stmt)
635 .map_err(|e| QueryError::Parse(e.to_string()))?;
636 self.plan_cache
637 .lock()
638 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
639 .insert(hash, plan.clone());
640 let plan = lower_unindexed_scans(&self.catalog, &plan);
641 return self.execute_plan_readonly(&plan);
642 }
643 // Lex error — fall through to the planner for a consistent error
644 // shape (though `parse` above would usually have caught it).
645 let plan =
646 crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
647 let plan = lower_unindexed_scans(&self.catalog, &plan);
648 self.execute_plan_readonly(&plan)
649 }
650
651 /// Read-only version of [`Engine::execute_plan`]. Dispatches the
652 /// read-path plan variants by calling `&self` helpers and errors with
653 /// [`READONLY_NEEDS_WRITE`] on any write variant. This is the
654 /// recursion target for composite read plans under the RwLock reader.
655 ///
656 /// The dispatch mirrors `execute_plan` for the read branches but does
657 /// not carry any of the fast-paths that need `&mut self` (e.g. plan-
658 /// cache mutation on inner subqueries is handled via the shared mutex
659 /// in [`Engine::execute_powql_readonly`]; in-flight subquery
660 /// materialisation uses [`Engine::materialize_subqueries_readonly`]).
661 fn execute_plan_readonly(&self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
662 match plan {
663 PlanNode::SeqScan { table } => {
664 // Dirty view means we'd need to refresh it — can't do that
665 // under `&self`. Escalate to the write path.
666 if self.view_registry.is_dirty(table) {
667 return Err(QueryError::ReadonlyNeedsWrite);
668 }
669 let schema = self
670 .catalog
671 .schema(table)
672 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
673 .clone();
674 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
675 let rows: Vec<Vec<Value>> = self
676 .catalog
677 .scan(table)
678 .map_err(|e| e.to_string())?
679 .map(|(_, row)| row)
680 .collect();
681 Ok(QueryResult::Rows { columns, rows })
682 }
683
684 PlanNode::AliasScan { table, alias } => {
685 let schema = self
686 .catalog
687 .schema(table)
688 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
689 .clone();
690 let columns: Vec<String> = schema
691 .columns
692 .iter()
693 .map(|c| format!("{alias}.{}", c.name))
694 .collect();
695 let rows: Vec<Vec<Value>> = self
696 .catalog
697 .scan(table)
698 .map_err(|e| e.to_string())?
699 .map(|(_, row)| row)
700 .collect();
701 Ok(QueryResult::Rows { columns, rows })
702 }
703
704 PlanNode::IndexScan { table, column, key } => {
705 let schema = self
706 .catalog
707 .schema(table)
708 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
709 .clone();
710 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
711 let key_value = literal_to_value(key)?;
712 let tbl = self
713 .catalog
714 .get_table(table)
715 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
716
717 if tbl.has_index(column) {
718 // Use index_lookup_all to handle both unique and
719 // non-unique indexes — returns all matching RowIds.
720 let rids = tbl.index_lookup_all(column, &key_value);
721 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
722 for rid in rids {
723 if let Some(data) = tbl.heap.get(rid) {
724 rows.push(decode_row(&tbl.schema, &data));
725 }
726 }
727 return Ok(QueryResult::Rows { columns, rows });
728 }
729
730 // No index: synthetic eq predicate + compiled scan.
731 let fast = FastLayout::new(&schema);
732 let synth_pred = Expr::BinaryOp(
733 Box::new(Expr::Field(column.clone())),
734 BinOp::Eq,
735 Box::new(key.clone()),
736 );
737 if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, &schema) {
738 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
739 self.catalog
740 .for_each_row_raw(table, |_rid, data| {
741 if compiled(data) {
742 rows.push(decode_row(&schema, data));
743 }
744 })
745 .map_err(|e| QueryError::StorageError(e.to_string()))?;
746 return Ok(QueryResult::Rows { columns, rows });
747 }
748
749 // Last resort: slow eq-check.
750 let col_idx =
751 schema
752 .column_index(column)
753 .ok_or_else(|| QueryError::ColumnNotFound {
754 table: String::new(),
755 column: column.clone(),
756 })?;
757 let rows: Vec<Vec<Value>> = tbl
758 .scan()
759 .filter_map(|(_, row)| {
760 if row[col_idx] == key_value {
761 Some(row)
762 } else {
763 None
764 }
765 })
766 .collect();
767 Ok(QueryResult::Rows { columns, rows })
768 }
769
770 PlanNode::RangeScan {
771 table,
772 column,
773 start,
774 end,
775 } => {
776 let tbl = self
777 .catalog
778 .get_table(table)
779 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
780 let columns: Vec<String> =
781 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
782 let schema = tbl.schema.clone();
783
784 let start_val = match start {
785 Some((expr, _)) => Some(literal_to_value(expr)?),
786 None => None,
787 };
788 let end_val = match end {
789 Some((expr, _)) => Some(literal_to_value(expr)?),
790 None => None,
791 };
792 let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
793 let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
794
795 // Range scans only use the btree fast path for unique indexes.
796 // Non-unique indexes store composite keys that don't compare
797 // directly against raw column values.
798 if tbl.is_index_unique(column) == Some(true) {
799 if let Some(btree) = tbl.index(column) {
800 let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
801 (Some(s), Some(e)) => btree.range(s, e).collect(),
802 (Some(s), None) => btree.range_from(s),
803 (None, Some(e)) => btree.range_to(e),
804 (None, None) => {
805 // Unbounded both sides — equivalent to seq scan.
806 let rows: Vec<Vec<Value>> =
807 tbl.scan().map(|(_, row)| row).collect();
808 return Ok(QueryResult::Rows { columns, rows });
809 }
810 };
811 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
812 for (key, rid) in hits {
813 // Filter for exclusive bounds.
814 if !start_inclusive {
815 if let Some(ref s) = start_val {
816 if &key == s {
817 continue;
818 }
819 }
820 }
821 if !end_inclusive {
822 if let Some(ref e) = end_val {
823 if &key == e {
824 continue;
825 }
826 }
827 }
828 if let Some(data) = tbl.heap.get(rid) {
829 rows.push(decode_row(&schema, &data));
830 }
831 }
832 return Ok(QueryResult::Rows { columns, rows });
833 }
834 }
835
836 // Fallback: no index — synthesize the range predicate and scan.
837 let fast = FastLayout::new(&schema);
838 let synth = synthesize_range_predicate(column, start, end);
839 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, &schema) {
840 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
841 self.catalog
842 .for_each_row_raw(table, |_rid, data| {
843 if compiled(data) {
844 rows.push(decode_row(&schema, data));
845 }
846 })
847 .map_err(|e| QueryError::StorageError(e.to_string()))?;
848 return Ok(QueryResult::Rows { columns, rows });
849 }
850
851 // Last resort: decoded row eval.
852 let col_idx =
853 schema
854 .column_index(column)
855 .ok_or_else(|| QueryError::ColumnNotFound {
856 table: String::new(),
857 column: column.clone(),
858 })?;
859 let rows: Vec<Vec<Value>> = tbl
860 .scan()
861 .filter(|(_, row)| {
862 range_matches(
863 &row[col_idx],
864 &start_val,
865 start_inclusive,
866 &end_val,
867 end_inclusive,
868 )
869 })
870 .map(|(_, row)| row)
871 .collect();
872 Ok(QueryResult::Rows { columns, rows })
873 }
874
875 PlanNode::Filter { input, predicate } => {
876 // Materialise subqueries using the `&self` variant.
877 // Uncorrelated subqueries are replaced with InList/Bool;
878 // correlated ones are left as InSubquery/ExistsSubquery
879 // for per-row materialisation below.
880 let materialized;
881 let predicate = if contains_subquery(predicate) {
882 materialized = self.materialize_subqueries_readonly(predicate)?;
883 &materialized
884 } else {
885 predicate
886 };
887
888 // Correlated subquery path: per-row materialisation.
889 if contains_subquery(predicate) {
890 let result = self.execute_plan_readonly(input)?;
891 return match result {
892 QueryResult::Rows { columns, rows } => {
893 let mut filtered = Vec::new();
894 for row in rows {
895 let row_pred = self.materialize_correlated_for_row_readonly(
896 predicate, &row, &columns,
897 )?;
898 if eval_predicate(&row_pred, &row, &columns) {
899 filtered.push(row);
900 }
901 }
902 Ok(QueryResult::Rows {
903 columns,
904 rows: filtered,
905 })
906 }
907 _ => Err("filter requires row input".into()),
908 };
909 }
910
911 // Fused Filter+SeqScan fast path.
912 if let PlanNode::SeqScan { table } = input.as_ref() {
913 if self.view_registry.is_dirty(table) {
914 return Err(QueryError::ReadonlyNeedsWrite);
915 }
916 let schema = self
917 .catalog
918 .schema(table)
919 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
920 .clone();
921 let columns: Vec<String> =
922 schema.columns.iter().map(|c| c.name.clone()).collect();
923 let fast = FastLayout::new(&schema);
924 let row_layout = RowLayout::new(&schema);
925 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
926
927 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
928 self.catalog
929 .for_each_row_raw(table, |_rid, data| {
930 if compiled(data) {
931 rows.push(decode_row(&schema, data));
932 }
933 })
934 .map_err(|e| QueryError::StorageError(e.to_string()))?;
935 } else {
936 let pred_cols = predicate_column_indices(predicate, &columns);
937 self.catalog
938 .for_each_row_raw(table, |_rid, data| {
939 let pred_row =
940 decode_selective(&schema, &row_layout, data, &pred_cols);
941 if eval_predicate(predicate, &pred_row, &columns) {
942 rows.push(decode_row(&schema, data));
943 }
944 })
945 .map_err(|e| QueryError::StorageError(e.to_string()))?;
946 }
947
948 return Ok(QueryResult::Rows { columns, rows });
949 }
950
951 // General path.
952 let result = self.execute_plan_readonly(input)?;
953 match result {
954 QueryResult::Rows { columns, rows } => {
955 let filtered: Vec<Vec<Value>> = rows
956 .into_iter()
957 .filter(|row| eval_predicate(predicate, row, &columns))
958 .collect();
959 Ok(QueryResult::Rows {
960 columns,
961 rows: filtered,
962 })
963 }
964 _ => Err("filter requires row input".into()),
965 }
966 }
967
968 PlanNode::Project { input, fields } => {
969 // Fast path: Project over IndexScan. Avoids full-row decode
970 // by calling decode_column only for projected fields.
971 if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
972 let key_value = literal_to_value(key)?;
973 let tbl = self
974 .catalog
975 .get_table(table)
976 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
977 let schema = &tbl.schema;
978 let layout = tbl.row_layout();
979
980 let proj_columns: Vec<String> = fields
981 .iter()
982 .map(|f| {
983 f.alias.clone().unwrap_or_else(|| match &f.expr {
984 Expr::Field(name) => name.clone(),
985 _ => "?".into(),
986 })
987 })
988 .collect();
989
990 let proj_indices: Vec<usize> = fields
991 .iter()
992 .filter_map(|f| {
993 if let Expr::Field(name) = &f.expr {
994 schema.column_index(name)
995 } else {
996 None
997 }
998 })
999 .collect();
1000
1001 if tbl.has_index(column) {
1002 let rids = tbl.index_lookup_all(column, &key_value);
1003 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1004 for rid in rids {
1005 if let Some(data) = tbl.heap.get(rid) {
1006 let row: Vec<Value> = proj_indices
1007 .iter()
1008 .map(|&ci| decode_column(schema, layout, &data, ci))
1009 .collect();
1010 rows.push(row);
1011 }
1012 }
1013 return Ok(QueryResult::Rows {
1014 columns: proj_columns,
1015 rows,
1016 });
1017 }
1018 }
1019
1020 // Fast paths over Limit(Sort(...)) / Limit(Filter(...)) / Limit(SeqScan).
1021 if let PlanNode::Limit {
1022 input: inner,
1023 count: limit_expr,
1024 } = input.as_ref()
1025 {
1026 if let PlanNode::Sort {
1027 input: sort_input,
1028 keys,
1029 } = inner.as_ref()
1030 {
1031 if keys.len() == 1 {
1032 let sort_field = &keys[0].field;
1033 let descending = keys[0].descending;
1034 let limit = match limit_expr {
1035 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1036 _ => usize::MAX,
1037 };
1038 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1039 match sort_input.as_ref() {
1040 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1041 PlanNode::Filter {
1042 input: fi,
1043 predicate,
1044 } => {
1045 if let PlanNode::SeqScan { table } = fi.as_ref() {
1046 (Some(table.as_str()), Some(predicate))
1047 } else {
1048 (None, None)
1049 }
1050 }
1051 _ => (None, None),
1052 };
1053 if let Some(table) = table_opt {
1054 if let Some(result) = self.project_filter_sort_limit_fast(
1055 table, fields, sort_field, descending, limit, pred_opt,
1056 )? {
1057 return Ok(result);
1058 }
1059 }
1060 }
1061 }
1062 if let PlanNode::Filter {
1063 input: fi,
1064 predicate,
1065 } = inner.as_ref()
1066 {
1067 if let PlanNode::SeqScan { table } = fi.as_ref() {
1068 let limit = match limit_expr {
1069 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1070 _ => usize::MAX,
1071 };
1072 if let Some(result) = self.project_filter_limit_fast(
1073 table,
1074 fields,
1075 limit,
1076 Some(predicate),
1077 )? {
1078 return Ok(result);
1079 }
1080 }
1081 }
1082 if let PlanNode::SeqScan { table } = inner.as_ref() {
1083 let limit = match limit_expr {
1084 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
1085 _ => usize::MAX,
1086 };
1087 if let Some(result) =
1088 self.project_filter_limit_fast(table, fields, limit, None)?
1089 {
1090 return Ok(result);
1091 }
1092 }
1093 }
1094
1095 // Project(Filter(SeqScan)) without Limit.
1096 if let PlanNode::Filter {
1097 input: fi,
1098 predicate,
1099 } = input.as_ref()
1100 {
1101 if let PlanNode::SeqScan { table } = fi.as_ref() {
1102 if let Some(result) = self.project_filter_limit_fast(
1103 table,
1104 fields,
1105 usize::MAX,
1106 Some(predicate),
1107 )? {
1108 return Ok(result);
1109 }
1110 }
1111 }
1112
1113 // Project(SeqScan) without Filter or Limit.
1114 if let PlanNode::SeqScan { table } = input.as_ref() {
1115 if let Some(result) =
1116 self.project_filter_limit_fast(table, fields, usize::MAX, None)?
1117 {
1118 return Ok(result);
1119 }
1120 }
1121
1122 // Generic path.
1123 let result = self.execute_plan_readonly(input)?;
1124 match result {
1125 QueryResult::Rows { columns, rows } => {
1126 let proj_columns: Vec<String> = fields
1127 .iter()
1128 .map(|f| {
1129 f.alias.clone().unwrap_or_else(|| match &f.expr {
1130 Expr::Field(name) => name.clone(),
1131 Expr::QualifiedField { qualifier, field } => {
1132 format!("{qualifier}.{field}")
1133 }
1134 _ => "?".into(),
1135 })
1136 })
1137 .collect();
1138 let proj_rows: Vec<Vec<Value>> = rows
1139 .iter()
1140 .map(|row| {
1141 fields
1142 .iter()
1143 .map(|f| eval_expr(&f.expr, row, &columns))
1144 .collect()
1145 })
1146 .collect();
1147 Ok(QueryResult::Rows {
1148 columns: proj_columns,
1149 rows: proj_rows,
1150 })
1151 }
1152 _ => Err("project requires row input".into()),
1153 }
1154 }
1155
1156 PlanNode::Sort { input, keys } => {
1157 let result = self.execute_plan_readonly(input)?;
1158 match result {
1159 QueryResult::Rows { columns, mut rows } => {
1160 if rows.len() > MAX_SORT_ROWS {
1161 return Err(QueryError::SortLimitExceeded);
1162 }
1163 // WS2: byte-budget guard on the sort buffer.
1164 self.charge_rows(&rows)?;
1165 let key_indices: Vec<(usize, bool)> = keys
1166 .iter()
1167 .map(|k| {
1168 columns
1169 .iter()
1170 .position(|c| c == &k.field)
1171 .map(|idx| (idx, k.descending))
1172 .ok_or_else(|| QueryError::ColumnNotFound {
1173 table: String::new(),
1174 column: k.field.clone(),
1175 })
1176 })
1177 .collect::<Result<_, QueryError>>()?;
1178 rows.sort_by(|a, b| {
1179 for &(col_idx, descending) in &key_indices {
1180 let cmp = a[col_idx].cmp(&b[col_idx]);
1181 let cmp = if descending { cmp.reverse() } else { cmp };
1182 if cmp != std::cmp::Ordering::Equal {
1183 return cmp;
1184 }
1185 }
1186 std::cmp::Ordering::Equal
1187 });
1188 Ok(QueryResult::Rows { columns, rows })
1189 }
1190 _ => Err("sort requires row input".into()),
1191 }
1192 }
1193
1194 PlanNode::Limit { input, count } => {
1195 let result = self.execute_plan_readonly(input)?;
1196 let n = match count {
1197 Expr::Literal(Literal::Int(v)) => *v as usize,
1198 _ => return Err("limit must be integer literal".into()),
1199 };
1200 match result {
1201 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1202 columns,
1203 rows: rows.into_iter().take(n).collect(),
1204 }),
1205 _ => Err("limit requires row input".into()),
1206 }
1207 }
1208
1209 PlanNode::Offset { input, count } => {
1210 let result = self.execute_plan_readonly(input)?;
1211 let n = match count {
1212 Expr::Literal(Literal::Int(v)) => *v as usize,
1213 _ => return Err("offset must be integer literal".into()),
1214 };
1215 match result {
1216 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1217 columns,
1218 rows: rows.into_iter().skip(n).collect(),
1219 }),
1220 _ => Err("offset requires row input".into()),
1221 }
1222 }
1223
1224 PlanNode::Aggregate {
1225 input,
1226 function,
1227 field,
1228 } => {
1229 // Fast path: count() over SeqScan.
1230 if *function == AggFunc::Count {
1231 if let PlanNode::SeqScan { table } = input.as_ref() {
1232 // A dirty materialized view must be refreshed before
1233 // it can be counted, which needs `&mut self`. Escalate
1234 // to the write path (F3: count(View) returned stale).
1235 if self.view_registry.is_dirty(table) {
1236 return Err(QueryError::ReadonlyNeedsWrite);
1237 }
1238 let mut count: i64 = 0;
1239 self.catalog
1240 .for_each_row_raw(table, |_rid, _data| {
1241 count += 1;
1242 })
1243 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1244 return Ok(QueryResult::Scalar(Value::Int(count)));
1245 }
1246 if let PlanNode::Filter {
1247 input: inner,
1248 predicate,
1249 } = input.as_ref()
1250 {
1251 // Only take the fast path for a plain Filter(SeqScan)
1252 // with no subquery in the predicate. A subquery
1253 // predicate (`count(T filter .x in (...))`) must be
1254 // resolved first; the fast path evaluates the raw
1255 // predicate with no subquery materialisation, which
1256 // silently yields 0 (F1). Falling through routes it to
1257 // the generic path that runs the subquery correctly.
1258 if let PlanNode::SeqScan { table } = inner.as_ref() {
1259 if self.view_registry.is_dirty(table) {
1260 // F3: count(View filter ...) over a dirty view.
1261 return Err(QueryError::ReadonlyNeedsWrite);
1262 }
1263 }
1264 if let (PlanNode::SeqScan { table }, false) =
1265 (inner.as_ref(), contains_subquery(predicate))
1266 {
1267 let schema = self
1268 .catalog
1269 .schema(table)
1270 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
1271 .clone();
1272 let columns: Vec<String> =
1273 schema.columns.iter().map(|c| c.name.clone()).collect();
1274 let fast = FastLayout::new(&schema);
1275 let row_layout = RowLayout::new(&schema);
1276
1277 if let Some(compiled) =
1278 compile_predicate(predicate, &columns, &fast, &schema)
1279 {
1280 let mut count: i64 = 0;
1281 self.catalog
1282 .for_each_row_raw(table, |_rid, data| {
1283 if compiled(data) {
1284 count += 1;
1285 }
1286 })
1287 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1288 return Ok(QueryResult::Scalar(Value::Int(count)));
1289 }
1290
1291 let pred_cols = predicate_column_indices(predicate, &columns);
1292 let mut count: i64 = 0;
1293 self.catalog
1294 .for_each_row_raw(table, |_rid, data| {
1295 let pred_row =
1296 decode_selective(&schema, &row_layout, data, &pred_cols);
1297 if eval_predicate(predicate, &pred_row, &columns) {
1298 count += 1;
1299 }
1300 })
1301 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1302 return Ok(QueryResult::Scalar(Value::Int(count)));
1303 }
1304 }
1305 }
1306
1307 // Fast path: sum/avg/min/max over single fixed-size numeric.
1308 if matches!(
1309 function,
1310 AggFunc::Sum
1311 | AggFunc::Avg
1312 | AggFunc::Min
1313 | AggFunc::Max
1314 | AggFunc::CountDistinct
1315 ) {
1316 if let Some(col) = field.as_ref() {
1317 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1318 match input.as_ref() {
1319 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1320 PlanNode::Filter {
1321 input: inner,
1322 predicate,
1323 } => {
1324 if let PlanNode::SeqScan { table } = inner.as_ref() {
1325 (Some(table.as_str()), Some(predicate))
1326 } else {
1327 (None, None)
1328 }
1329 }
1330 _ => (None, None),
1331 };
1332 if let Some(table) = table_opt {
1333 if let Some(result) =
1334 self.agg_single_col_fast(table, col, *function, pred_opt)?
1335 {
1336 return Ok(result);
1337 }
1338 }
1339 }
1340 }
1341
1342 // Generic path.
1343 let result = self.execute_plan_readonly(input)?;
1344 match result {
1345 QueryResult::Rows { columns, rows } => match function {
1346 AggFunc::Count => Ok(QueryResult::Scalar(Value::Int(rows.len() as i64))),
1347 AggFunc::CountDistinct => {
1348 let col = field.as_ref().ok_or("count distinct requires field")?;
1349 let idx = columns
1350 .iter()
1351 .position(|c| c == col)
1352 .ok_or("col not found")?;
1353 let mut seen = std::collections::HashSet::new();
1354 for row in &rows {
1355 let v = &row[idx];
1356 if !v.is_empty() {
1357 seen.insert(v.clone());
1358 }
1359 }
1360 Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
1361 }
1362 AggFunc::Avg => {
1363 let col = field.as_ref().ok_or("avg requires field")?;
1364 let idx = columns
1365 .iter()
1366 .position(|c| c == col)
1367 .ok_or("col not found")?;
1368 let sum: f64 = rows
1369 .iter()
1370 .filter_map(|r| match &r[idx] {
1371 Value::Int(v) => Some(*v as f64),
1372 Value::Float(v) => Some(*v),
1373 _ => None,
1374 })
1375 .sum();
1376 let count = rows.len() as f64;
1377 Ok(QueryResult::Scalar(Value::Float(sum / count)))
1378 }
1379 AggFunc::Sum => {
1380 let col = field.as_ref().ok_or("sum requires field")?;
1381 let idx = columns
1382 .iter()
1383 .position(|c| c == col)
1384 .ok_or("col not found")?;
1385 let mut int_sum: i64 = 0;
1386 let mut float_sum: f64 = 0.0;
1387 let mut saw_float = false;
1388 for r in &rows {
1389 match &r[idx] {
1390 Value::Int(v) => int_sum += *v,
1391 Value::Float(v) => {
1392 float_sum += *v;
1393 saw_float = true;
1394 }
1395 _ => {}
1396 }
1397 }
1398 let result = if saw_float {
1399 Value::Float(float_sum + int_sum as f64)
1400 } else {
1401 Value::Int(int_sum)
1402 };
1403 Ok(QueryResult::Scalar(result))
1404 }
1405 AggFunc::Min | AggFunc::Max => {
1406 let col = field.as_ref().ok_or("min/max requires field")?;
1407 let idx = columns
1408 .iter()
1409 .position(|c| c == col)
1410 .ok_or("col not found")?;
1411 let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
1412 let result = if *function == AggFunc::Min {
1413 vals.into_iter().min().cloned()
1414 } else {
1415 vals.into_iter().max().cloned()
1416 };
1417 Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
1418 }
1419 },
1420 _ => Err("aggregate requires row input".into()),
1421 }
1422 }
1423
1424 PlanNode::Distinct { input } => {
1425 let result = self.execute_plan_readonly(input)?;
1426 match result {
1427 QueryResult::Rows { columns, rows } => {
1428 let mut seen = std::collections::HashSet::new();
1429 let mut unique_rows = Vec::new();
1430 for row in rows {
1431 if seen.insert(row.clone()) {
1432 unique_rows.push(row);
1433 }
1434 }
1435 Ok(QueryResult::Rows {
1436 columns,
1437 rows: unique_rows,
1438 })
1439 }
1440 other => Ok(other),
1441 }
1442 }
1443
1444 PlanNode::GroupBy {
1445 input,
1446 keys,
1447 aggregates,
1448 having,
1449 } => {
1450 let result = self.execute_plan_readonly(input)?;
1451 match result {
1452 QueryResult::Rows { columns, rows } => {
1453 // WS2: byte-budget guard on the GROUP BY input buffer
1454 // (the hash table is bounded by the input it groups).
1455 self.charge_rows(&rows)?;
1456 let key_indices: Vec<usize> = keys
1457 .iter()
1458 .map(|k| {
1459 columns.iter().position(|c| c == k).ok_or_else(|| {
1460 QueryError::ColumnNotFound {
1461 table: String::new(),
1462 column: k.clone(),
1463 }
1464 })
1465 })
1466 .collect::<Result<Vec<_>, _>>()?;
1467
1468 let agg_field_indices: Vec<usize> = aggregates
1469 .iter()
1470 .map(|a| {
1471 if a.field == "*" {
1472 Ok(usize::MAX)
1473 } else {
1474 columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1475 QueryError::ColumnNotFound {
1476 table: String::new(),
1477 column: a.field.clone(),
1478 }
1479 })
1480 }
1481 })
1482 .collect::<Result<Vec<_>, _>>()?;
1483
1484 let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1485 rustc_hash::FxHashMap::default();
1486 let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1487 for (ri, row) in rows.iter().enumerate() {
1488 let key: Vec<Value> =
1489 key_indices.iter().map(|&i| row[i].clone()).collect();
1490 match group_map.get(&key) {
1491 Some(&idx) => groups[idx].1.push(ri),
1492 None => {
1493 let idx = groups.len();
1494 group_map.insert(key.clone(), idx);
1495 groups.push((key, vec![ri]));
1496 }
1497 }
1498 }
1499
1500 let mut out_columns: Vec<String> = keys.clone();
1501 for agg in aggregates.iter() {
1502 out_columns.push(agg.output_name.clone());
1503 }
1504
1505 let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1506 for (key_vals, row_indices) in &groups {
1507 let mut row = key_vals.clone();
1508 for (ai, agg) in aggregates.iter().enumerate() {
1509 let col_idx = agg_field_indices[ai];
1510 let val = compute_group_aggregate(
1511 agg.function,
1512 &rows,
1513 row_indices,
1514 col_idx,
1515 );
1516 row.push(val);
1517 }
1518 out_rows.push(row);
1519 }
1520
1521 if let Some(having_expr) = having {
1522 out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1523 }
1524
1525 Ok(QueryResult::Rows {
1526 columns: out_columns,
1527 rows: out_rows,
1528 })
1529 }
1530 _ => Err("group by requires row input".into()),
1531 }
1532 }
1533
1534 PlanNode::NestedLoopJoin {
1535 left,
1536 right,
1537 on,
1538 kind,
1539 } => {
1540 let left_result = self.execute_plan_readonly(left)?;
1541 let right_result = self.execute_plan_readonly(right)?;
1542 let (left_columns, left_rows) = match left_result {
1543 QueryResult::Rows { columns, rows } => (columns, rows),
1544 _ => return Err("join left side must produce rows".into()),
1545 };
1546 let (right_columns, right_rows) = match right_result {
1547 QueryResult::Rows { columns, rows } => (columns, rows),
1548 _ => return Err("join right side must produce rows".into()),
1549 };
1550
1551 // WS2: byte-budget guard on the join build side.
1552 self.charge_rows(&left_rows)?;
1553 self.charge_rows(&right_rows)?;
1554
1555 if !matches!(kind, JoinKind::Cross) {
1556 if let Some(pred) = on {
1557 if let Some((l_idx, r_idx)) =
1558 try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1559 {
1560 let result = hash_join(
1561 left_columns,
1562 left_rows,
1563 right_columns,
1564 right_rows,
1565 l_idx,
1566 r_idx,
1567 *kind,
1568 );
1569 if let QueryResult::Rows { ref rows, .. } = result {
1570 check_join_limit(rows.len())?;
1571 }
1572 return Ok(result);
1573 }
1574 }
1575 }
1576
1577 let n_left = left_columns.len();
1578 let n_right = right_columns.len();
1579 let mut columns = Vec::with_capacity(n_left + n_right);
1580 columns.extend(left_columns);
1581 columns.extend(right_columns);
1582
1583 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1584 let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1585
1586 for left_row in &left_rows {
1587 let mut matched = false;
1588 for right_row in &right_rows {
1589 combined.clear();
1590 combined.extend_from_slice(left_row);
1591 combined.extend_from_slice(right_row);
1592 let keep = match kind {
1593 JoinKind::Cross => true,
1594 JoinKind::Inner | JoinKind::LeftOuter => match on {
1595 Some(pred) => eval_predicate(pred, &combined, &columns),
1596 None => true,
1597 },
1598 JoinKind::RightOuter => {
1599 unreachable!("planner rewrites RightOuter to LeftOuter")
1600 }
1601 };
1602 if keep {
1603 rows.push(combined.clone());
1604 check_join_limit(rows.len())?;
1605 matched = true;
1606 }
1607 }
1608 if !matched && matches!(kind, JoinKind::LeftOuter) {
1609 let mut row = Vec::with_capacity(n_left + n_right);
1610 row.extend_from_slice(left_row);
1611 row.resize(n_left + n_right, Value::Empty);
1612 rows.push(row);
1613 check_join_limit(rows.len())?;
1614 }
1615 }
1616
1617 Ok(QueryResult::Rows { columns, rows })
1618 }
1619
1620 PlanNode::Window { input, windows } => {
1621 let result = self.execute_plan_readonly(input)?;
1622 execute_window(result, windows)
1623 }
1624
1625 PlanNode::Union { left, right, all } => {
1626 let left_result = self.execute_plan_readonly(left)?;
1627 let right_result = self.execute_plan_readonly(right)?;
1628 let (left_cols, left_rows) = match left_result {
1629 QueryResult::Rows { columns, rows } => (columns, rows),
1630 _ => return Err("UNION requires query results on left side".into()),
1631 };
1632 let (_, right_rows) = match right_result {
1633 QueryResult::Rows { columns, rows } => (columns, rows),
1634 _ => return Err("UNION requires query results on right side".into()),
1635 };
1636 let mut combined = left_rows;
1637 if *all {
1638 combined.extend(right_rows);
1639 } else {
1640 let mut seen = std::collections::HashSet::new();
1641 for row in &combined {
1642 seen.insert(row.clone());
1643 }
1644 for row in right_rows {
1645 if seen.insert(row.clone()) {
1646 combined.push(row);
1647 }
1648 }
1649 }
1650 Ok(QueryResult::Rows {
1651 columns: left_cols,
1652 rows: combined,
1653 })
1654 }
1655
1656 PlanNode::Explain { input } => {
1657 let text = format_plan_tree(input, 0);
1658 Ok(QueryResult::Rows {
1659 columns: vec!["plan".to_string()],
1660 rows: text
1661 .lines()
1662 .map(|line| vec![Value::Str(line.to_string())])
1663 .collect(),
1664 })
1665 }
1666
1667 // All write variants — caller must escalate to the write lock.
1668 PlanNode::Insert { .. }
1669 | PlanNode::Update { .. }
1670 | PlanNode::Delete { .. }
1671 | PlanNode::Upsert { .. }
1672 | PlanNode::CreateTable { .. }
1673 | PlanNode::AlterTable { .. }
1674 | PlanNode::DropTable { .. }
1675 | PlanNode::CreateView { .. }
1676 | PlanNode::RefreshView { .. }
1677 | PlanNode::DropView { .. }
1678 | PlanNode::Begin
1679 | PlanNode::Commit
1680 | PlanNode::Rollback => Err(QueryError::ReadonlyNeedsWrite),
1681 }
1682 }
1683
1684 /// `&self` variant of [`Engine::materialize_subqueries`]. Used by the
1685 /// read path so `Filter` predicates with `InSubquery`/`ExistsSubquery`
1686 /// children can evaluate their inner queries without taking the write
1687 /// lock. Inner queries that would themselves need a write (e.g. dirty
1688 /// view) escalate via [`READONLY_NEEDS_WRITE`] just like the top-level
1689 /// read path does.
1690 fn materialize_subqueries_readonly(&self, expr: &Expr) -> Result<Expr, QueryError> {
1691 match expr {
1692 Expr::InSubquery {
1693 expr: inner,
1694 subquery,
1695 negated,
1696 } => {
1697 if is_correlated_subquery(subquery, &self.catalog) {
1698 // Pass through — will be materialized per-row in the
1699 // Filter handler's correlated subquery path.
1700 let inner = self.materialize_subqueries_readonly(inner)?;
1701 return Ok(Expr::InSubquery {
1702 expr: Box::new(inner),
1703 subquery: subquery.clone(),
1704 negated: *negated,
1705 });
1706 }
1707 let inner = self.materialize_subqueries_readonly(inner)?;
1708 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1709 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1710 let result = self.execute_plan_readonly(&sub_plan)?;
1711 let values = match result {
1712 QueryResult::Rows { rows, .. } => rows
1713 .into_iter()
1714 .filter_map(|mut row| {
1715 if row.is_empty() {
1716 None
1717 } else {
1718 Some(value_to_expr(row.swap_remove(0)))
1719 }
1720 })
1721 .collect(),
1722 _ => Vec::new(),
1723 };
1724 // WS2: byte-budget guard on the materialized IN-list.
1725 self.charge_in_list(&values)?;
1726 Ok(Expr::InList {
1727 expr: Box::new(inner),
1728 list: values,
1729 negated: *negated,
1730 })
1731 }
1732 Expr::ExistsSubquery { subquery, negated } => {
1733 if is_correlated_subquery(subquery, &self.catalog) {
1734 return Ok(expr.clone());
1735 }
1736 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1737 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1738 let result = self.execute_plan_readonly(&sub_plan)?;
1739 let has_rows = match result {
1740 QueryResult::Rows { rows, .. } => !rows.is_empty(),
1741 _ => false,
1742 };
1743 let truth = if *negated { !has_rows } else { has_rows };
1744 Ok(Expr::Literal(Literal::Bool(truth)))
1745 }
1746 Expr::BinaryOp(l, op, r) => {
1747 let l = self.materialize_subqueries_readonly(l)?;
1748 let r = self.materialize_subqueries_readonly(r)?;
1749 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1750 }
1751 Expr::UnaryOp(op, inner) => {
1752 let inner = self.materialize_subqueries_readonly(inner)?;
1753 Ok(Expr::UnaryOp(*op, Box::new(inner)))
1754 }
1755 Expr::Case { whens, else_expr } => {
1756 let whens = whens
1757 .iter()
1758 .map(|(c, r)| {
1759 let c = self.materialize_subqueries_readonly(c)?;
1760 let r = self.materialize_subqueries_readonly(r)?;
1761 Ok((Box::new(c), Box::new(r)))
1762 })
1763 .collect::<Result<Vec<_>, QueryError>>()?;
1764 let else_expr = match else_expr {
1765 Some(e) => Some(Box::new(self.materialize_subqueries_readonly(e)?)),
1766 None => None,
1767 };
1768 Ok(Expr::Case { whens, else_expr })
1769 }
1770 other => Ok(other.clone()),
1771 }
1772 }
1773
1774 /// Per-row materialisation of correlated subqueries. For each row in the
1775 /// outer query, substitute outer column references in the subquery's
1776 /// filter with the current row's literal values, execute the modified
1777 /// subquery, and return the result as an InList or Bool literal.
1778 fn materialize_correlated_for_row_readonly(
1779 &self,
1780 expr: &Expr,
1781 outer_row: &[Value],
1782 outer_columns: &[String],
1783 ) -> Result<Expr, QueryError> {
1784 match expr {
1785 Expr::InSubquery {
1786 expr: inner,
1787 subquery,
1788 negated,
1789 } => {
1790 let inner =
1791 self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1792 let mut sub = *subquery.clone();
1793 if let Some(ref filter) = sub.filter {
1794 sub.filter = Some(substitute_outer_refs(
1795 filter,
1796 &sub.source,
1797 &self.catalog,
1798 outer_row,
1799 outer_columns,
1800 ));
1801 }
1802 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1803 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1804 let result = self.execute_plan_readonly(&sub_plan)?;
1805 let values = match result {
1806 QueryResult::Rows { rows, .. } => rows
1807 .into_iter()
1808 .filter_map(|mut row| {
1809 if row.is_empty() {
1810 None
1811 } else {
1812 Some(value_to_expr(row.swap_remove(0)))
1813 }
1814 })
1815 .collect(),
1816 _ => Vec::new(),
1817 };
1818 // WS2: byte-budget guard on the per-row materialized IN-list.
1819 self.charge_in_list(&values)?;
1820 Ok(Expr::InList {
1821 expr: Box::new(inner),
1822 list: values,
1823 negated: *negated,
1824 })
1825 }
1826 Expr::ExistsSubquery { subquery, negated } => {
1827 let mut sub = *subquery.clone();
1828 if let Some(ref filter) = sub.filter {
1829 sub.filter = Some(substitute_outer_refs(
1830 filter,
1831 &sub.source,
1832 &self.catalog,
1833 outer_row,
1834 outer_columns,
1835 ));
1836 }
1837 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1838 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1839 let result = self.execute_plan_readonly(&sub_plan)?;
1840 let has_rows = match result {
1841 QueryResult::Rows { rows, .. } => !rows.is_empty(),
1842 _ => false,
1843 };
1844 let truth = if *negated { !has_rows } else { has_rows };
1845 Ok(Expr::Literal(Literal::Bool(truth)))
1846 }
1847 Expr::BinaryOp(l, op, r) => {
1848 let l =
1849 self.materialize_correlated_for_row_readonly(l, outer_row, outer_columns)?;
1850 let r =
1851 self.materialize_correlated_for_row_readonly(r, outer_row, outer_columns)?;
1852 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1853 }
1854 Expr::UnaryOp(op, inner) => {
1855 let inner =
1856 self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1857 Ok(Expr::UnaryOp(*op, Box::new(inner)))
1858 }
1859 other => Ok(other.clone()),
1860 }
1861 }
1862
1863 pub fn catalog(&self) -> &Catalog {
1864 &self.catalog
1865 }
1866
1867 pub fn catalog_mut(&mut self) -> &mut Catalog {
1868 &mut self.catalog
1869 }
1870}