powdb_query/executor/mod.rs
1//! PowDB query executor.
2
3// Submodules that don't use macros defined in this file.
4mod compiled;
5mod eval;
6
7use crate::ast::*;
8use crate::canonicalize::canonicalize;
9use crate::plan::*;
10use crate::plan_cache::PlanCache;
11use crate::planner;
12use crate::result::{QueryError, QueryResult};
13use powdb_storage::catalog::Catalog;
14use powdb_storage::row::{decode_column, decode_row, RowLayout};
15use powdb_storage::types::*;
16use powdb_storage::view::ViewRegistry;
17
18use std::io;
19use std::path::Path;
20use std::sync::Mutex;
21use std::time::Instant;
22use tracing::{error, info, Level};
23
24use self::compiled::*;
25use self::eval::*;
26
/// Sentinel text that older callers compared against to detect a
/// read-only execution attempt that needed the write lock.
///
/// Retained purely for backward compatibility with external code that
/// matches on the string form; new code should match on
/// `QueryError::ReadonlyNeedsWrite` instead.
pub const READONLY_NEEDS_WRITE: &str = "__POWDB_READONLY_NEEDS_WRITE__";

/// Number of slots in the engine's plan cache.
///
/// Bench workloads occupy roughly 15 entries, so 256 leaves real
/// applications plenty of headroom. Lookups are O(1); a collision clears
/// the whole cache (see `plan_cache::PlanCache::insert`).
const PLAN_CACHE_CAPACITY: usize = 256;
36
37/// Maximum number of rows a join may produce before the executor aborts.
38/// Prevents Cartesian-product blowups (e.g. `T cross join T` on 10K rows
39/// would produce 100M rows in memory without this cap).
40pub(super) const MAX_JOIN_ROWS: usize = 1_000_000;
41
42/// Maximum number of rows that may be materialized for sorting.
43/// Queries that exceed this should add a LIMIT clause to narrow the input
44/// before sorting.
45pub(super) const MAX_SORT_ROWS: usize = 10_000_000;
46
47#[inline]
48pub(super) fn check_join_limit(row_count: usize) -> Result<(), QueryError> {
49 if row_count > MAX_JOIN_ROWS {
50 return Err(QueryError::JoinLimitExceeded);
51 }
52 Ok(())
53}
54
// ─── Mission D11 Phase 1: scalar hot-loop helpers ─────────────────────────
//
// `agg_int_loop!` and `agg_float_loop!` expand into the scan body of
// `agg_single_col_fast` and sit inside the `for_each_row_raw` closure. Both
// are thin front-ends over `agg_fixed8_loop!`, which is generic over the
// 8-byte little-endian value type. They exist to:
//
// 1. Split the loop on presence of a predicate *outside* the hot body,
//    so the no-predicate path (agg_sum/agg_min/agg_max bench workloads)
//    never pays the `Option<CompiledPredicate>` branch per row.
// 2. Read the null-bitmap byte and the 8-byte value with one length check
//    each (`slice::get`), so a corrupt or truncated row is skipped rather
//    than read out of bounds — without any `unsafe`.
//
// Row-layout arguments (`$bmp_byte` and `$off` are `usize` offsets):
//
// - `$bmp_byte` is `col_idx / 8`; the null bitmap starts at offset 2, so
//   the column's null flag is bit `$bmp_bit` of byte `2 + $bmp_byte`.
//   A set bit means NULL and the row is skipped.
// - `$off = 2 + bitmap_size + fixed_offsets[col_idx]` is the start of the
//   column's 8-byte little-endian value in the fixed-size region.
// - The macros are only invoked from `agg_single_col_fast`, which
//   early-returns if the column isn't Int/Float (8-byte fixed) and
//   early-returns if `fast.fixed_offsets[col_idx]` is `None`.
//
// The expansion applies `?` to the storage call, so the enclosing function
// must return `Result<_, QueryError>`. `$body` runs once per surviving row
// with the decoded value bound to `$v`, and must evaluate to `()`.

/// Generic engine behind `agg_int_loop!` / `agg_float_loop!`: scan `$table`,
/// optionally filter rows with `$pred`, skip NULL and too-short rows, and run
/// `$body` with the column's 8-byte little-endian value decoded as `$ty`.
macro_rules! agg_fixed8_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : $ty:ty| $body:block
    ) => {{
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        // Per-row decoder shared by both scan branches. Yields `None` when
        // the row is too short to hold the bitmap byte or the 8-byte value
        // (corrupt/truncated row), or when the column's null bit is set.
        let decode = move |data: &[u8]| -> Option<$ty> {
            let (bmp, raw) = match (data.get(2 + bmp_byte), data.get(off..off + 8)) {
                (Some(&bmp), Some(raw)) => (bmp, raw),
                _ => return None,
            };
            if (bmp >> bmp_bit) & 1 == 1 {
                return None;
            }
            // `raw` comes from the range `off..off + 8`, so it is exactly
            // 8 bytes long and this copy cannot panic.
            let mut le = [0u8; 8];
            le.copy_from_slice(raw);
            Some(<$ty>::from_le_bytes(le))
        };
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    if let Some($v) = decode(data) {
                        $body
                    }
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if let Some($v) = decode(data) {
                        $body
                    }
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}

/// Scan an Int column: `$body` runs for every non-NULL row (that passes
/// `$pred`, when one is given) with the value bound as `$v: i64`.
macro_rules! agg_int_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : i64| $body:block
    ) => {
        agg_fixed8_loop!(
            $self, $table, $pred, $bmp_byte, $bmp_bit, $off,
            |$v: i64| $body
        )
    };
}

/// Scan a Float column: `$body` runs for every non-NULL row (that passes
/// `$pred`, when one is given) with the value bound as `$v: f64`.
macro_rules! agg_float_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : f64| $body:block
    ) => {
        agg_fixed8_loop!(
            $self, $table, $pred, $bmp_byte, $bmp_bit, $off,
            |$v: f64| $body
        )
    };
}
210
211// Submodules that use the macros above — must be declared after macro_rules!.
212mod plan_exec;
213mod prepared;
214
215#[cfg(test)]
216mod tests;
217
218// Re-exports for the public API
219pub use self::prepared::PreparedQuery;
220
221use self::plan_exec::{
222 compute_group_aggregate, execute_window, format_plan_tree, hash_join,
223 lower_unindexed_range_scans, range_matches, synthesize_range_predicate,
224 try_extract_equi_join_keys,
225};
226
227/// Mission infra-1: classify a parsed statement as read-only vs. mutating.
228/// Used by [`Engine::execute_powql_readonly`] and by the server handler
229/// to decide between the RwLock reader and writer sides. `Union` recurses
230/// because each side can independently be read/write (though in practice
231/// both sides are reads — the parser only builds Union from query shapes).
232pub fn is_read_only_statement(stmt: &Statement) -> bool {
233 match stmt {
234 Statement::Query(_) => true,
235 Statement::Union(u) => is_read_only_statement(&u.left) && is_read_only_statement(&u.right),
236 Statement::Insert(_)
237 | Statement::Upsert(_)
238 | Statement::UpdateQuery(_)
239 | Statement::DeleteQuery(_)
240 | Statement::CreateType(_)
241 | Statement::AlterTable(_)
242 | Statement::DropTable(_)
243 | Statement::CreateView(_)
244 | Statement::RefreshView(_)
245 | Statement::DropView(_) => false,
246 Statement::Explain(inner) => is_read_only_statement(inner),
247 }
248}
249
/// The PowDB query engine: owns the storage catalog, the shared plan cache,
/// and the materialized-view registry.
///
/// Statement entry points such as [`Engine::execute_powql`] take
/// `&mut self`. [`Engine::execute_powql_readonly`] takes `&self`, so
/// concurrent readers can share the engine (e.g. behind an `RwLock`).
pub struct Engine {
    /// Backing storage catalog: table schemas, rows, and indexes that plans
    /// are executed against.
    catalog: Catalog,
    /// Mission D9 — cached parsed+planned query trees keyed by canonical
    /// hash. Saves the ~3μs parse+plan cost on repeat queries that differ
    /// only in literal values.
    ///
    /// Mission infra-1: wrapped in `Mutex` so the read path can be driven
    /// by `&self`. The critical section is extremely short — a single
    /// hashmap lookup + plan clone on a hit, or a single insert on a miss.
    /// A full `RwLock` would be over-engineered here; the contention window
    /// is smaller than the read-path scan work it gates.
    plan_cache: Mutex<PlanCache>,
    /// Mission C Phase 13: reusable `Vec<Value>` scratch buffer for the
    /// prepared-insert fast path. `execute_prepared` used to allocate a
    /// fresh `vec![Value::Empty; n_cols]` on every insert; recycling this
    /// buffer shaves one heap alloc per row on `insert_batch_1k`.
    insert_values_scratch: Vec<Value>,
    /// Materialized view registry: tracks view definitions, dependencies,
    /// and dirty state. Views are backed by regular catalog tables; this
    /// registry adds the lifecycle metadata.
    view_registry: ViewRegistry,
}
272
273impl Engine {
274 /// Open or create a PowDB engine rooted at `data_dir`.
275 ///
276 /// If the directory already contains a catalog, it is reopened.
277 /// Otherwise a fresh empty database is created.
278 ///
279 /// # Examples
280 ///
281 /// ```
282 /// use powdb_query::executor::Engine;
283 ///
284 /// let dir = tempfile::tempdir().unwrap();
285 /// let engine = Engine::new(dir.path()).unwrap();
286 /// // Engine is ready — the directory now contains a catalog.
287 /// ```
288 pub fn new(data_dir: &Path) -> io::Result<Self> {
289 std::fs::create_dir_all(data_dir)?;
290 // Try to reopen an existing database first; only create a fresh
291 // catalog when there isn't one already on disk.
292 let catalog = match Catalog::open(data_dir) {
293 Ok(c) => {
294 info!(data_dir = %data_dir.display(), "engine reopened existing database");
295 c
296 }
297 Err(e) if e.kind() == io::ErrorKind::NotFound => {
298 info!(data_dir = %data_dir.display(), "engine initialized fresh database");
299 Catalog::create(data_dir)?
300 }
301 Err(e) => return Err(e),
302 };
303 let view_registry =
304 ViewRegistry::open(data_dir).unwrap_or_else(|_| ViewRegistry::new(data_dir));
305 Ok(Engine {
306 catalog,
307 plan_cache: Mutex::new(PlanCache::new(PLAN_CACHE_CAPACITY)),
308 insert_values_scratch: Vec::new(),
309 view_registry,
310 })
311 }
312
313 /// Parse + plan + execute a PowQL query.
314 ///
315 /// # Examples
316 ///
317 /// ```
318 /// use powdb_query::executor::Engine;
319 /// use powdb_query::result::QueryResult;
320 ///
321 /// let dir = tempfile::tempdir().unwrap();
322 /// let mut engine = Engine::new(dir.path()).unwrap();
323 ///
324 /// // Create a table and insert a row.
325 /// engine.execute_powql("type User { required name: str, age: int }").unwrap();
326 /// engine.execute_powql(r#"insert User { name := "Alice", age := 30 }"#).unwrap();
327 ///
328 /// // Query rows back.
329 /// let result = engine.execute_powql("User").unwrap();
330 /// assert_eq!(result.row_count(), 1);
331 /// ```
332 ///
333 /// Mission D6 — tracing collapse: the previous implementation ran 4
334 /// `Instant::now()` + 3 `elapsed().as_micros()` calls + formatted an
335 /// `info!` span on every query, even when tracing was disabled. On a
336 /// sub-microsecond `point_lookup_indexed` call that overhead was
337 /// 100-200ns — 20%+ of the whole query. We now measure time only when
338 /// INFO is actually enabled via `tracing::enabled!`, and we moved the
339 /// noisy `debug!(?plan)` line behind the same gate so the Debug
340 /// formatter can't run unconditionally either.
341 ///
342 /// Mission D9 — plan cache: on the hot path we canonicalise the query
343 /// text (lex + FNV-1a hash with literal values stripped), check the
344 /// cache, and on a hit substitute the new literals into a clone of the
345 /// cached plan. This skips re-lexing, re-parsing, and re-planning —
346 /// around 3μs per call on bench workloads. On a miss we plan as before
347 /// and insert the plan under its canonical hash.
348 pub fn execute_powql(&mut self, input: &str) -> Result<QueryResult, QueryError> {
349 // Hot path: tracing disabled. Zero syscalls, zero formatting.
350 if !tracing::enabled!(Level::INFO) {
351 // D9: try the plan cache first. Canonicalisation lexes the
352 // query once; on a hit we skip the parser and planner entirely.
353 if let Ok((hash, literals)) = canonicalize(input) {
354 let cached = self
355 .plan_cache
356 .lock()
357 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
358 .get_with_substitution(hash, &literals);
359 if let Some(plan) = cached {
360 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
361 let result = self.execute_plan(&plan);
362 // Mission B (post-review): statement-boundary WAL
363 // group commit. Catalog::wal_log now only appends;
364 // the fsync happens here exactly once per statement.
365 // `sync_wal` is a no-op when nothing was buffered
366 // (pure reads pay zero fsync).
367 self.catalog
368 .sync_wal()
369 .map_err(|e| QueryError::StorageError(e.to_string()))?;
370 return result;
371 }
372 // Miss — plan, insert, execute.
373 return match planner::plan(input) {
374 Ok(plan) => {
375 self.plan_cache
376 .lock()
377 .map_err(|e| {
378 QueryError::Execution(format!("plan cache lock poisoned: {e}"))
379 })?
380 .insert(hash, plan.clone());
381 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
382 let result = self.execute_plan(&plan);
383 self.catalog
384 .sync_wal()
385 .map_err(|e| QueryError::StorageError(e.to_string()))?;
386 result
387 }
388 Err(e) => Err(QueryError::Parse(e.to_string())),
389 };
390 }
391 // Lex error — fall through to the planner so the caller gets a
392 // consistent error shape.
393 return match planner::plan(input) {
394 Ok(plan) => {
395 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
396 let result = self.execute_plan(&plan);
397 self.catalog
398 .sync_wal()
399 .map_err(|e| QueryError::StorageError(e.to_string()))?;
400 result
401 }
402 Err(e) => Err(QueryError::Parse(e.to_string())),
403 };
404 }
405
406 // Instrumented path — only taken under explicit tracing subscribers.
407 let total_start = Instant::now();
408 let plan_start = Instant::now();
409 let plan = planner::plan(input).map_err(|e| {
410 let msg = e.to_string();
411 error!(query = %input, error = %msg, "query plan failed");
412 QueryError::Parse(msg)
413 })?;
414 let plan_us = plan_start.elapsed().as_micros();
415
416 let exec_start = Instant::now();
417 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
418 let result = self.execute_plan(&plan);
419 // Mission B (post-review): statement-boundary WAL flush.
420 let _ = self.catalog.sync_wal();
421 let exec_us = exec_start.elapsed().as_micros();
422
423 let total_us = total_start.elapsed().as_micros();
424 match &result {
425 Ok(r) => {
426 info!(
427 query = %input,
428 plan_us = plan_us,
429 exec_us = exec_us,
430 total_us = total_us,
431 rows = r.row_count(),
432 "query ok"
433 );
434 }
435 Err(e) => {
436 error!(
437 query = %input,
438 plan_us = plan_us,
439 exec_us = exec_us,
440 error = %e,
441 "query failed"
442 );
443 }
444 }
445 result
446 }
447
448 /// Plan cache stats — useful for benches and debugging.
449 pub fn plan_cache_stats(&self) -> (u64, u64, usize) {
450 let cache = self.plan_cache.lock().unwrap_or_else(|e| e.into_inner());
451 (cache.hits, cache.misses, cache.len())
452 }
453
454 /// Mission infra-1: read-only entry point.
455 ///
456 /// Parses + plans + executes a PowQL query using only a shared borrow
457 /// on the engine. Rejects any statement that would mutate state
458 /// (Insert/Update/Delete/CreateTable/AlterTable/DropTable/CreateView/
459 /// RefreshView/DropView) by returning [`READONLY_NEEDS_WRITE`] so the
460 /// caller can escalate to the write lock.
461 ///
462 /// Also returns [`READONLY_NEEDS_WRITE`] if a materialized view in the
463 /// query is dirty — refreshing one requires `&mut self`, so the caller
464 /// must retake the write lock for the first refresh.
465 ///
466 /// This method is the concurrent-read fast path behind
467 /// `Arc<RwLock<Engine>>`: multiple threads can call it simultaneously
468 /// under a shared `.read()` lock and each will scan independently.
469 pub fn execute_powql_readonly(&self, input: &str) -> Result<QueryResult, QueryError> {
470 // Parse the statement first so we can classify read vs. write
471 // without touching the catalog. This is the same lex+parse cost
472 // the hot path would pay anyway.
473 let stmt = crate::parser::parse(input).map_err(|e| QueryError::Parse(e.to_string()))?;
474 if !is_read_only_statement(&stmt) {
475 return Err(QueryError::ReadonlyNeedsWrite);
476 }
477
478 // Try the plan cache first — identical hash scheme to
479 // `execute_powql` so both paths share cache state. The mutex
480 // section is just a hashmap lookup + plan clone.
481 if let Ok((hash, literals)) = canonicalize(input) {
482 let cached = self
483 .plan_cache
484 .lock()
485 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
486 .get_with_substitution(hash, &literals);
487 if let Some(plan) = cached {
488 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
489 return self.execute_plan_readonly(&plan);
490 }
491 // Miss: plan + insert + execute. The planner is pure, so this
492 // is safe from `&self`.
493 let plan = crate::planner::plan_statement(stmt)
494 .map_err(|e| QueryError::Parse(e.to_string()))?;
495 self.plan_cache
496 .lock()
497 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
498 .insert(hash, plan.clone());
499 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
500 return self.execute_plan_readonly(&plan);
501 }
502 // Lex error — fall through to the planner for a consistent error
503 // shape (though `parse` above would usually have caught it).
504 let plan =
505 crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
506 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
507 self.execute_plan_readonly(&plan)
508 }
509
510 /// Read-only version of [`Engine::execute_plan`]. Dispatches the
511 /// read-path plan variants by calling `&self` helpers and errors with
512 /// [`READONLY_NEEDS_WRITE`] on any write variant. This is the
513 /// recursion target for composite read plans under the RwLock reader.
514 ///
515 /// The dispatch mirrors `execute_plan` for the read branches but does
516 /// not carry any of the fast-paths that need `&mut self` (e.g. plan-
517 /// cache mutation on inner subqueries is handled via the shared mutex
518 /// in [`Engine::execute_powql_readonly`]; in-flight subquery
519 /// materialisation uses [`Engine::materialize_subqueries_readonly`]).
520 fn execute_plan_readonly(&self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
521 match plan {
522 PlanNode::SeqScan { table } => {
523 // Dirty view means we'd need to refresh it — can't do that
524 // under `&self`. Escalate to the write path.
525 if self.view_registry.is_dirty(table) {
526 return Err(QueryError::ReadonlyNeedsWrite);
527 }
528 let schema = self
529 .catalog
530 .schema(table)
531 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
532 .clone();
533 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
534 let rows: Vec<Vec<Value>> = self
535 .catalog
536 .scan(table)
537 .map_err(|e| e.to_string())?
538 .map(|(_, row)| row)
539 .collect();
540 Ok(QueryResult::Rows { columns, rows })
541 }
542
543 PlanNode::AliasScan { table, alias } => {
544 let schema = self
545 .catalog
546 .schema(table)
547 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
548 .clone();
549 let columns: Vec<String> = schema
550 .columns
551 .iter()
552 .map(|c| format!("{alias}.{}", c.name))
553 .collect();
554 let rows: Vec<Vec<Value>> = self
555 .catalog
556 .scan(table)
557 .map_err(|e| e.to_string())?
558 .map(|(_, row)| row)
559 .collect();
560 Ok(QueryResult::Rows { columns, rows })
561 }
562
563 PlanNode::IndexScan { table, column, key } => {
564 let schema = self
565 .catalog
566 .schema(table)
567 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
568 .clone();
569 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
570 let key_value = literal_to_value(key)?;
571 let tbl = self
572 .catalog
573 .get_table(table)
574 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
575
576 if let Some(btree) = tbl.index(column) {
577 let hit = match &key_value {
578 Value::Int(k) => btree.lookup_int(*k),
579 other => btree.lookup(other),
580 };
581 let rows = match hit {
582 Some(rid) => match tbl.heap.get(rid) {
583 Some(data) => vec![decode_row(&tbl.schema, &data)],
584 None => Vec::new(),
585 },
586 None => Vec::new(),
587 };
588 return Ok(QueryResult::Rows { columns, rows });
589 }
590
591 // No index: synthetic eq predicate + compiled scan.
592 let fast = FastLayout::new(&schema);
593 let synth_pred = Expr::BinaryOp(
594 Box::new(Expr::Field(column.clone())),
595 BinOp::Eq,
596 Box::new(key.clone()),
597 );
598 if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, &schema) {
599 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
600 self.catalog
601 .for_each_row_raw(table, |_rid, data| {
602 if compiled(data) {
603 rows.push(decode_row(&schema, data));
604 }
605 })
606 .map_err(|e| QueryError::StorageError(e.to_string()))?;
607 return Ok(QueryResult::Rows { columns, rows });
608 }
609
610 // Last resort: slow eq-check.
611 let col_idx =
612 schema
613 .column_index(column)
614 .ok_or_else(|| QueryError::ColumnNotFound {
615 table: String::new(),
616 column: column.clone(),
617 })?;
618 let rows: Vec<Vec<Value>> = tbl
619 .scan()
620 .filter_map(|(_, row)| {
621 if row[col_idx] == key_value {
622 Some(row)
623 } else {
624 None
625 }
626 })
627 .collect();
628 Ok(QueryResult::Rows { columns, rows })
629 }
630
631 PlanNode::RangeScan {
632 table,
633 column,
634 start,
635 end,
636 } => {
637 let tbl = self
638 .catalog
639 .get_table(table)
640 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
641 let columns: Vec<String> =
642 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
643 let schema = tbl.schema.clone();
644
645 let start_val = match start {
646 Some((expr, _)) => Some(literal_to_value(expr)?),
647 None => None,
648 };
649 let end_val = match end {
650 Some((expr, _)) => Some(literal_to_value(expr)?),
651 None => None,
652 };
653 let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
654 let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
655
656 if let Some(btree) = tbl.index(column) {
657 let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
658 (Some(s), Some(e)) => btree.range(s, e).collect(),
659 (Some(s), None) => btree.range_from(s),
660 (None, Some(e)) => btree.range_to(e),
661 (None, None) => {
662 // Unbounded both sides — equivalent to seq scan.
663 let rows: Vec<Vec<Value>> = tbl.scan().map(|(_, row)| row).collect();
664 return Ok(QueryResult::Rows { columns, rows });
665 }
666 };
667 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
668 for (key, rid) in hits {
669 // Filter for exclusive bounds.
670 if !start_inclusive {
671 if let Some(ref s) = start_val {
672 if &key == s {
673 continue;
674 }
675 }
676 }
677 if !end_inclusive {
678 if let Some(ref e) = end_val {
679 if &key == e {
680 continue;
681 }
682 }
683 }
684 if let Some(data) = tbl.heap.get(rid) {
685 rows.push(decode_row(&schema, &data));
686 }
687 }
688 return Ok(QueryResult::Rows { columns, rows });
689 }
690
691 // Fallback: no index — synthesize the range predicate and scan.
692 let fast = FastLayout::new(&schema);
693 let synth = synthesize_range_predicate(column, start, end);
694 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, &schema) {
695 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
696 self.catalog
697 .for_each_row_raw(table, |_rid, data| {
698 if compiled(data) {
699 rows.push(decode_row(&schema, data));
700 }
701 })
702 .map_err(|e| QueryError::StorageError(e.to_string()))?;
703 return Ok(QueryResult::Rows { columns, rows });
704 }
705
706 // Last resort: decoded row eval.
707 let col_idx =
708 schema
709 .column_index(column)
710 .ok_or_else(|| QueryError::ColumnNotFound {
711 table: String::new(),
712 column: column.clone(),
713 })?;
714 let rows: Vec<Vec<Value>> = tbl
715 .scan()
716 .filter(|(_, row)| {
717 range_matches(
718 &row[col_idx],
719 &start_val,
720 start_inclusive,
721 &end_val,
722 end_inclusive,
723 )
724 })
725 .map(|(_, row)| row)
726 .collect();
727 Ok(QueryResult::Rows { columns, rows })
728 }
729
730 PlanNode::Filter { input, predicate } => {
731 // Materialise subqueries using the `&self` variant.
732 // Uncorrelated subqueries are replaced with InList/Bool;
733 // correlated ones are left as InSubquery/ExistsSubquery
734 // for per-row materialisation below.
735 let materialized;
736 let predicate = if contains_subquery(predicate) {
737 materialized = self.materialize_subqueries_readonly(predicate)?;
738 &materialized
739 } else {
740 predicate
741 };
742
743 // Correlated subquery path: per-row materialisation.
744 if contains_subquery(predicate) {
745 let result = self.execute_plan_readonly(input)?;
746 return match result {
747 QueryResult::Rows { columns, rows } => {
748 let mut filtered = Vec::new();
749 for row in rows {
750 let row_pred = self.materialize_correlated_for_row_readonly(
751 predicate, &row, &columns,
752 )?;
753 if eval_predicate(&row_pred, &row, &columns) {
754 filtered.push(row);
755 }
756 }
757 Ok(QueryResult::Rows {
758 columns,
759 rows: filtered,
760 })
761 }
762 _ => Err("filter requires row input".into()),
763 };
764 }
765
766 // Fused Filter+SeqScan fast path.
767 if let PlanNode::SeqScan { table } = input.as_ref() {
768 if self.view_registry.is_dirty(table) {
769 return Err(QueryError::ReadonlyNeedsWrite);
770 }
771 let schema = self
772 .catalog
773 .schema(table)
774 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
775 .clone();
776 let columns: Vec<String> =
777 schema.columns.iter().map(|c| c.name.clone()).collect();
778 let fast = FastLayout::new(&schema);
779 let row_layout = RowLayout::new(&schema);
780 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
781
782 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
783 self.catalog
784 .for_each_row_raw(table, |_rid, data| {
785 if compiled(data) {
786 rows.push(decode_row(&schema, data));
787 }
788 })
789 .map_err(|e| QueryError::StorageError(e.to_string()))?;
790 } else {
791 let pred_cols = predicate_column_indices(predicate, &columns);
792 self.catalog
793 .for_each_row_raw(table, |_rid, data| {
794 let pred_row =
795 decode_selective(&schema, &row_layout, data, &pred_cols);
796 if eval_predicate(predicate, &pred_row, &columns) {
797 rows.push(decode_row(&schema, data));
798 }
799 })
800 .map_err(|e| QueryError::StorageError(e.to_string()))?;
801 }
802
803 return Ok(QueryResult::Rows { columns, rows });
804 }
805
806 // General path.
807 let result = self.execute_plan_readonly(input)?;
808 match result {
809 QueryResult::Rows { columns, rows } => {
810 let filtered: Vec<Vec<Value>> = rows
811 .into_iter()
812 .filter(|row| eval_predicate(predicate, row, &columns))
813 .collect();
814 Ok(QueryResult::Rows {
815 columns,
816 rows: filtered,
817 })
818 }
819 _ => Err("filter requires row input".into()),
820 }
821 }
822
823 PlanNode::Project { input, fields } => {
824 // Fast path: Project over IndexScan. Avoids full-row decode
825 // by calling decode_column only for projected fields.
826 if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
827 let key_value = literal_to_value(key)?;
828 let tbl = self
829 .catalog
830 .get_table(table)
831 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
832 let schema = &tbl.schema;
833 let layout = tbl.row_layout();
834
835 let proj_columns: Vec<String> = fields
836 .iter()
837 .map(|f| {
838 f.alias.clone().unwrap_or_else(|| match &f.expr {
839 Expr::Field(name) => name.clone(),
840 _ => "?".into(),
841 })
842 })
843 .collect();
844
845 let proj_indices: Vec<usize> = fields
846 .iter()
847 .filter_map(|f| {
848 if let Expr::Field(name) = &f.expr {
849 schema.column_index(name)
850 } else {
851 None
852 }
853 })
854 .collect();
855
856 if let Some(btree) = tbl.index(column) {
857 let lookup_result = match &key_value {
858 Value::Int(k) => btree.lookup_int(*k),
859 other => btree.lookup(other),
860 };
861 let rows = match lookup_result {
862 Some(rid) => match tbl.heap.get(rid) {
863 Some(data) => {
864 let row: Vec<Value> = proj_indices
865 .iter()
866 .map(|&ci| decode_column(schema, layout, &data, ci))
867 .collect();
868 vec![row]
869 }
870 None => Vec::new(),
871 },
872 None => Vec::new(),
873 };
874 return Ok(QueryResult::Rows {
875 columns: proj_columns,
876 rows,
877 });
878 }
879 }
880
881 // Fast paths over Limit(Sort(...)) / Limit(Filter(...)) / Limit(SeqScan).
882 if let PlanNode::Limit {
883 input: inner,
884 count: limit_expr,
885 } = input.as_ref()
886 {
887 if let PlanNode::Sort {
888 input: sort_input,
889 keys,
890 } = inner.as_ref()
891 {
892 if keys.len() == 1 {
893 let sort_field = &keys[0].field;
894 let descending = keys[0].descending;
895 let limit = match limit_expr {
896 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
897 _ => usize::MAX,
898 };
899 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
900 match sort_input.as_ref() {
901 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
902 PlanNode::Filter {
903 input: fi,
904 predicate,
905 } => {
906 if let PlanNode::SeqScan { table } = fi.as_ref() {
907 (Some(table.as_str()), Some(predicate))
908 } else {
909 (None, None)
910 }
911 }
912 _ => (None, None),
913 };
914 if let Some(table) = table_opt {
915 if let Some(result) = self.project_filter_sort_limit_fast(
916 table, fields, sort_field, descending, limit, pred_opt,
917 )? {
918 return Ok(result);
919 }
920 }
921 }
922 }
923 if let PlanNode::Filter {
924 input: fi,
925 predicate,
926 } = inner.as_ref()
927 {
928 if let PlanNode::SeqScan { table } = fi.as_ref() {
929 let limit = match limit_expr {
930 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
931 _ => usize::MAX,
932 };
933 if let Some(result) = self.project_filter_limit_fast(
934 table,
935 fields,
936 limit,
937 Some(predicate),
938 )? {
939 return Ok(result);
940 }
941 }
942 }
943 if let PlanNode::SeqScan { table } = inner.as_ref() {
944 let limit = match limit_expr {
945 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
946 _ => usize::MAX,
947 };
948 if let Some(result) =
949 self.project_filter_limit_fast(table, fields, limit, None)?
950 {
951 return Ok(result);
952 }
953 }
954 }
955
956 // Project(Filter(SeqScan)) without Limit.
957 if let PlanNode::Filter {
958 input: fi,
959 predicate,
960 } = input.as_ref()
961 {
962 if let PlanNode::SeqScan { table } = fi.as_ref() {
963 if let Some(result) = self.project_filter_limit_fast(
964 table,
965 fields,
966 usize::MAX,
967 Some(predicate),
968 )? {
969 return Ok(result);
970 }
971 }
972 }
973
974 // Project(SeqScan) without Filter or Limit.
975 if let PlanNode::SeqScan { table } = input.as_ref() {
976 if let Some(result) =
977 self.project_filter_limit_fast(table, fields, usize::MAX, None)?
978 {
979 return Ok(result);
980 }
981 }
982
983 // Generic path.
984 let result = self.execute_plan_readonly(input)?;
985 match result {
986 QueryResult::Rows { columns, rows } => {
987 let proj_columns: Vec<String> = fields
988 .iter()
989 .map(|f| {
990 f.alias.clone().unwrap_or_else(|| match &f.expr {
991 Expr::Field(name) => name.clone(),
992 Expr::QualifiedField { qualifier, field } => {
993 format!("{qualifier}.{field}")
994 }
995 _ => "?".into(),
996 })
997 })
998 .collect();
999 let proj_rows: Vec<Vec<Value>> = rows
1000 .iter()
1001 .map(|row| {
1002 fields
1003 .iter()
1004 .map(|f| eval_expr(&f.expr, row, &columns))
1005 .collect()
1006 })
1007 .collect();
1008 Ok(QueryResult::Rows {
1009 columns: proj_columns,
1010 rows: proj_rows,
1011 })
1012 }
1013 _ => Err("project requires row input".into()),
1014 }
1015 }
1016
1017 PlanNode::Sort { input, keys } => {
1018 let result = self.execute_plan_readonly(input)?;
1019 match result {
1020 QueryResult::Rows { columns, mut rows } => {
1021 if rows.len() > MAX_SORT_ROWS {
1022 return Err(QueryError::SortLimitExceeded);
1023 }
1024 let key_indices: Vec<(usize, bool)> = keys
1025 .iter()
1026 .map(|k| {
1027 columns
1028 .iter()
1029 .position(|c| c == &k.field)
1030 .map(|idx| (idx, k.descending))
1031 .ok_or_else(|| QueryError::ColumnNotFound {
1032 table: String::new(),
1033 column: k.field.clone(),
1034 })
1035 })
1036 .collect::<Result<_, QueryError>>()?;
1037 rows.sort_by(|a, b| {
1038 for &(col_idx, descending) in &key_indices {
1039 let cmp = a[col_idx].cmp(&b[col_idx]);
1040 let cmp = if descending { cmp.reverse() } else { cmp };
1041 if cmp != std::cmp::Ordering::Equal {
1042 return cmp;
1043 }
1044 }
1045 std::cmp::Ordering::Equal
1046 });
1047 Ok(QueryResult::Rows { columns, rows })
1048 }
1049 _ => Err("sort requires row input".into()),
1050 }
1051 }
1052
1053 PlanNode::Limit { input, count } => {
1054 let result = self.execute_plan_readonly(input)?;
1055 let n = match count {
1056 Expr::Literal(Literal::Int(v)) => *v as usize,
1057 _ => return Err("limit must be integer literal".into()),
1058 };
1059 match result {
1060 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1061 columns,
1062 rows: rows.into_iter().take(n).collect(),
1063 }),
1064 _ => Err("limit requires row input".into()),
1065 }
1066 }
1067
1068 PlanNode::Offset { input, count } => {
1069 let result = self.execute_plan_readonly(input)?;
1070 let n = match count {
1071 Expr::Literal(Literal::Int(v)) => *v as usize,
1072 _ => return Err("offset must be integer literal".into()),
1073 };
1074 match result {
1075 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1076 columns,
1077 rows: rows.into_iter().skip(n).collect(),
1078 }),
1079 _ => Err("offset requires row input".into()),
1080 }
1081 }
1082
1083 PlanNode::Aggregate {
1084 input,
1085 function,
1086 field,
1087 } => {
1088 // Fast path: count() over SeqScan.
1089 if *function == AggFunc::Count {
1090 if let PlanNode::SeqScan { table } = input.as_ref() {
1091 let mut count: i64 = 0;
1092 self.catalog
1093 .for_each_row_raw(table, |_rid, _data| {
1094 count += 1;
1095 })
1096 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1097 return Ok(QueryResult::Scalar(Value::Int(count)));
1098 }
1099 if let PlanNode::Filter {
1100 input: inner,
1101 predicate,
1102 } = input.as_ref()
1103 {
1104 if let PlanNode::SeqScan { table } = inner.as_ref() {
1105 let schema = self
1106 .catalog
1107 .schema(table)
1108 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
1109 .clone();
1110 let columns: Vec<String> =
1111 schema.columns.iter().map(|c| c.name.clone()).collect();
1112 let fast = FastLayout::new(&schema);
1113 let row_layout = RowLayout::new(&schema);
1114
1115 if let Some(compiled) =
1116 compile_predicate(predicate, &columns, &fast, &schema)
1117 {
1118 let mut count: i64 = 0;
1119 self.catalog
1120 .for_each_row_raw(table, |_rid, data| {
1121 if compiled(data) {
1122 count += 1;
1123 }
1124 })
1125 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1126 return Ok(QueryResult::Scalar(Value::Int(count)));
1127 }
1128
1129 let pred_cols = predicate_column_indices(predicate, &columns);
1130 let mut count: i64 = 0;
1131 self.catalog
1132 .for_each_row_raw(table, |_rid, data| {
1133 let pred_row =
1134 decode_selective(&schema, &row_layout, data, &pred_cols);
1135 if eval_predicate(predicate, &pred_row, &columns) {
1136 count += 1;
1137 }
1138 })
1139 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1140 return Ok(QueryResult::Scalar(Value::Int(count)));
1141 }
1142 }
1143 }
1144
1145 // Fast path: sum/avg/min/max over single fixed-size numeric.
1146 if matches!(
1147 function,
1148 AggFunc::Sum
1149 | AggFunc::Avg
1150 | AggFunc::Min
1151 | AggFunc::Max
1152 | AggFunc::CountDistinct
1153 ) {
1154 if let Some(col) = field.as_ref() {
1155 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1156 match input.as_ref() {
1157 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1158 PlanNode::Filter {
1159 input: inner,
1160 predicate,
1161 } => {
1162 if let PlanNode::SeqScan { table } = inner.as_ref() {
1163 (Some(table.as_str()), Some(predicate))
1164 } else {
1165 (None, None)
1166 }
1167 }
1168 _ => (None, None),
1169 };
1170 if let Some(table) = table_opt {
1171 if let Some(result) =
1172 self.agg_single_col_fast(table, col, *function, pred_opt)?
1173 {
1174 return Ok(result);
1175 }
1176 }
1177 }
1178 }
1179
1180 // Generic path.
1181 let result = self.execute_plan_readonly(input)?;
1182 match result {
1183 QueryResult::Rows { columns, rows } => match function {
1184 AggFunc::Count => Ok(QueryResult::Scalar(Value::Int(rows.len() as i64))),
1185 AggFunc::CountDistinct => {
1186 let col = field.as_ref().ok_or("count distinct requires field")?;
1187 let idx = columns
1188 .iter()
1189 .position(|c| c == col)
1190 .ok_or("col not found")?;
1191 let mut seen = std::collections::HashSet::new();
1192 for row in &rows {
1193 let v = &row[idx];
1194 if !v.is_empty() {
1195 seen.insert(v.clone());
1196 }
1197 }
1198 Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
1199 }
1200 AggFunc::Avg => {
1201 let col = field.as_ref().ok_or("avg requires field")?;
1202 let idx = columns
1203 .iter()
1204 .position(|c| c == col)
1205 .ok_or("col not found")?;
1206 let sum: f64 = rows
1207 .iter()
1208 .filter_map(|r| match &r[idx] {
1209 Value::Int(v) => Some(*v as f64),
1210 Value::Float(v) => Some(*v),
1211 _ => None,
1212 })
1213 .sum();
1214 let count = rows.len() as f64;
1215 Ok(QueryResult::Scalar(Value::Float(sum / count)))
1216 }
1217 AggFunc::Sum => {
1218 let col = field.as_ref().ok_or("sum requires field")?;
1219 let idx = columns
1220 .iter()
1221 .position(|c| c == col)
1222 .ok_or("col not found")?;
1223 let mut int_sum: i64 = 0;
1224 let mut float_sum: f64 = 0.0;
1225 let mut saw_float = false;
1226 for r in &rows {
1227 match &r[idx] {
1228 Value::Int(v) => int_sum += *v,
1229 Value::Float(v) => {
1230 float_sum += *v;
1231 saw_float = true;
1232 }
1233 _ => {}
1234 }
1235 }
1236 let result = if saw_float {
1237 Value::Float(float_sum + int_sum as f64)
1238 } else {
1239 Value::Int(int_sum)
1240 };
1241 Ok(QueryResult::Scalar(result))
1242 }
1243 AggFunc::Min | AggFunc::Max => {
1244 let col = field.as_ref().ok_or("min/max requires field")?;
1245 let idx = columns
1246 .iter()
1247 .position(|c| c == col)
1248 .ok_or("col not found")?;
1249 let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
1250 let result = if *function == AggFunc::Min {
1251 vals.into_iter().min().cloned()
1252 } else {
1253 vals.into_iter().max().cloned()
1254 };
1255 Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
1256 }
1257 },
1258 _ => Err("aggregate requires row input".into()),
1259 }
1260 }
1261
1262 PlanNode::Distinct { input } => {
1263 let result = self.execute_plan_readonly(input)?;
1264 match result {
1265 QueryResult::Rows { columns, rows } => {
1266 let mut seen = std::collections::HashSet::new();
1267 let mut unique_rows = Vec::new();
1268 for row in rows {
1269 if seen.insert(row.clone()) {
1270 unique_rows.push(row);
1271 }
1272 }
1273 Ok(QueryResult::Rows {
1274 columns,
1275 rows: unique_rows,
1276 })
1277 }
1278 other => Ok(other),
1279 }
1280 }
1281
1282 PlanNode::GroupBy {
1283 input,
1284 keys,
1285 aggregates,
1286 having,
1287 } => {
1288 let result = self.execute_plan_readonly(input)?;
1289 match result {
1290 QueryResult::Rows { columns, rows } => {
1291 let key_indices: Vec<usize> = keys
1292 .iter()
1293 .map(|k| {
1294 columns.iter().position(|c| c == k).ok_or_else(|| {
1295 QueryError::ColumnNotFound {
1296 table: String::new(),
1297 column: k.clone(),
1298 }
1299 })
1300 })
1301 .collect::<Result<Vec<_>, _>>()?;
1302
1303 let agg_field_indices: Vec<usize> = aggregates
1304 .iter()
1305 .map(|a| {
1306 if a.field == "*" {
1307 Ok(usize::MAX)
1308 } else {
1309 columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1310 QueryError::ColumnNotFound {
1311 table: String::new(),
1312 column: a.field.clone(),
1313 }
1314 })
1315 }
1316 })
1317 .collect::<Result<Vec<_>, _>>()?;
1318
1319 let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1320 rustc_hash::FxHashMap::default();
1321 let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1322 for (ri, row) in rows.iter().enumerate() {
1323 let key: Vec<Value> =
1324 key_indices.iter().map(|&i| row[i].clone()).collect();
1325 match group_map.get(&key) {
1326 Some(&idx) => groups[idx].1.push(ri),
1327 None => {
1328 let idx = groups.len();
1329 group_map.insert(key.clone(), idx);
1330 groups.push((key, vec![ri]));
1331 }
1332 }
1333 }
1334
1335 let mut out_columns: Vec<String> = keys.clone();
1336 for agg in aggregates.iter() {
1337 out_columns.push(agg.output_name.clone());
1338 }
1339
1340 let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1341 for (key_vals, row_indices) in &groups {
1342 let mut row = key_vals.clone();
1343 for (ai, agg) in aggregates.iter().enumerate() {
1344 let col_idx = agg_field_indices[ai];
1345 let val = compute_group_aggregate(
1346 agg.function,
1347 &rows,
1348 row_indices,
1349 col_idx,
1350 );
1351 row.push(val);
1352 }
1353 out_rows.push(row);
1354 }
1355
1356 if let Some(having_expr) = having {
1357 out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1358 }
1359
1360 Ok(QueryResult::Rows {
1361 columns: out_columns,
1362 rows: out_rows,
1363 })
1364 }
1365 _ => Err("group by requires row input".into()),
1366 }
1367 }
1368
1369 PlanNode::NestedLoopJoin {
1370 left,
1371 right,
1372 on,
1373 kind,
1374 } => {
1375 let left_result = self.execute_plan_readonly(left)?;
1376 let right_result = self.execute_plan_readonly(right)?;
1377 let (left_columns, left_rows) = match left_result {
1378 QueryResult::Rows { columns, rows } => (columns, rows),
1379 _ => return Err("join left side must produce rows".into()),
1380 };
1381 let (right_columns, right_rows) = match right_result {
1382 QueryResult::Rows { columns, rows } => (columns, rows),
1383 _ => return Err("join right side must produce rows".into()),
1384 };
1385
1386 if !matches!(kind, JoinKind::Cross) {
1387 if let Some(pred) = on {
1388 if let Some((l_idx, r_idx)) =
1389 try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1390 {
1391 let result = hash_join(
1392 left_columns,
1393 left_rows,
1394 right_columns,
1395 right_rows,
1396 l_idx,
1397 r_idx,
1398 *kind,
1399 );
1400 if let QueryResult::Rows { ref rows, .. } = result {
1401 check_join_limit(rows.len())?;
1402 }
1403 return Ok(result);
1404 }
1405 }
1406 }
1407
1408 let n_left = left_columns.len();
1409 let n_right = right_columns.len();
1410 let mut columns = Vec::with_capacity(n_left + n_right);
1411 columns.extend(left_columns);
1412 columns.extend(right_columns);
1413
1414 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1415 let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1416
1417 for left_row in &left_rows {
1418 let mut matched = false;
1419 for right_row in &right_rows {
1420 combined.clear();
1421 combined.extend_from_slice(left_row);
1422 combined.extend_from_slice(right_row);
1423 let keep = match kind {
1424 JoinKind::Cross => true,
1425 JoinKind::Inner | JoinKind::LeftOuter => match on {
1426 Some(pred) => eval_predicate(pred, &combined, &columns),
1427 None => true,
1428 },
1429 JoinKind::RightOuter => {
1430 unreachable!("planner rewrites RightOuter to LeftOuter")
1431 }
1432 };
1433 if keep {
1434 rows.push(combined.clone());
1435 check_join_limit(rows.len())?;
1436 matched = true;
1437 }
1438 }
1439 if !matched && matches!(kind, JoinKind::LeftOuter) {
1440 let mut row = Vec::with_capacity(n_left + n_right);
1441 row.extend_from_slice(left_row);
1442 row.resize(n_left + n_right, Value::Empty);
1443 rows.push(row);
1444 check_join_limit(rows.len())?;
1445 }
1446 }
1447
1448 Ok(QueryResult::Rows { columns, rows })
1449 }
1450
1451 PlanNode::Window { input, windows } => {
1452 let result = self.execute_plan_readonly(input)?;
1453 execute_window(result, windows)
1454 }
1455
1456 PlanNode::Union { left, right, all } => {
1457 let left_result = self.execute_plan_readonly(left)?;
1458 let right_result = self.execute_plan_readonly(right)?;
1459 let (left_cols, left_rows) = match left_result {
1460 QueryResult::Rows { columns, rows } => (columns, rows),
1461 _ => return Err("UNION requires query results on left side".into()),
1462 };
1463 let (_, right_rows) = match right_result {
1464 QueryResult::Rows { columns, rows } => (columns, rows),
1465 _ => return Err("UNION requires query results on right side".into()),
1466 };
1467 let mut combined = left_rows;
1468 if *all {
1469 combined.extend(right_rows);
1470 } else {
1471 let mut seen = std::collections::HashSet::new();
1472 for row in &combined {
1473 seen.insert(row.clone());
1474 }
1475 for row in right_rows {
1476 if seen.insert(row.clone()) {
1477 combined.push(row);
1478 }
1479 }
1480 }
1481 Ok(QueryResult::Rows {
1482 columns: left_cols,
1483 rows: combined,
1484 })
1485 }
1486
1487 PlanNode::Explain { input } => {
1488 let text = format_plan_tree(input, 0);
1489 Ok(QueryResult::Rows {
1490 columns: vec!["plan".to_string()],
1491 rows: text
1492 .lines()
1493 .map(|line| vec![Value::Str(line.to_string())])
1494 .collect(),
1495 })
1496 }
1497
1498 // All write variants — caller must escalate to the write lock.
1499 PlanNode::Insert { .. }
1500 | PlanNode::Update { .. }
1501 | PlanNode::Delete { .. }
1502 | PlanNode::Upsert { .. }
1503 | PlanNode::CreateTable { .. }
1504 | PlanNode::AlterTable { .. }
1505 | PlanNode::DropTable { .. }
1506 | PlanNode::CreateView { .. }
1507 | PlanNode::RefreshView { .. }
1508 | PlanNode::DropView { .. } => Err(QueryError::ReadonlyNeedsWrite),
1509 }
1510 }
1511
1512 /// `&self` variant of [`Engine::materialize_subqueries`]. Used by the
1513 /// read path so `Filter` predicates with `InSubquery`/`ExistsSubquery`
1514 /// children can evaluate their inner queries without taking the write
1515 /// lock. Inner queries that would themselves need a write (e.g. dirty
1516 /// view) escalate via [`READONLY_NEEDS_WRITE`] just like the top-level
1517 /// read path does.
1518 fn materialize_subqueries_readonly(&self, expr: &Expr) -> Result<Expr, QueryError> {
1519 match expr {
1520 Expr::InSubquery {
1521 expr: inner,
1522 subquery,
1523 negated,
1524 } => {
1525 if is_correlated_subquery(subquery, &self.catalog) {
1526 // Pass through — will be materialized per-row in the
1527 // Filter handler's correlated subquery path.
1528 let inner = self.materialize_subqueries_readonly(inner)?;
1529 return Ok(Expr::InSubquery {
1530 expr: Box::new(inner),
1531 subquery: subquery.clone(),
1532 negated: *negated,
1533 });
1534 }
1535 let inner = self.materialize_subqueries_readonly(inner)?;
1536 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1537 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1538 let result = self.execute_plan_readonly(&sub_plan)?;
1539 let values = match result {
1540 QueryResult::Rows { rows, .. } => rows
1541 .into_iter()
1542 .filter_map(|mut row| {
1543 if row.is_empty() {
1544 None
1545 } else {
1546 Some(value_to_expr(row.swap_remove(0)))
1547 }
1548 })
1549 .collect(),
1550 _ => Vec::new(),
1551 };
1552 Ok(Expr::InList {
1553 expr: Box::new(inner),
1554 list: values,
1555 negated: *negated,
1556 })
1557 }
1558 Expr::ExistsSubquery { subquery, negated } => {
1559 if is_correlated_subquery(subquery, &self.catalog) {
1560 return Ok(expr.clone());
1561 }
1562 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1563 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1564 let result = self.execute_plan_readonly(&sub_plan)?;
1565 let has_rows = match result {
1566 QueryResult::Rows { rows, .. } => !rows.is_empty(),
1567 _ => false,
1568 };
1569 let truth = if *negated { !has_rows } else { has_rows };
1570 Ok(Expr::Literal(Literal::Bool(truth)))
1571 }
1572 Expr::BinaryOp(l, op, r) => {
1573 let l = self.materialize_subqueries_readonly(l)?;
1574 let r = self.materialize_subqueries_readonly(r)?;
1575 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1576 }
1577 Expr::UnaryOp(op, inner) => {
1578 let inner = self.materialize_subqueries_readonly(inner)?;
1579 Ok(Expr::UnaryOp(*op, Box::new(inner)))
1580 }
1581 Expr::Case { whens, else_expr } => {
1582 let whens = whens
1583 .iter()
1584 .map(|(c, r)| {
1585 let c = self.materialize_subqueries_readonly(c)?;
1586 let r = self.materialize_subqueries_readonly(r)?;
1587 Ok((Box::new(c), Box::new(r)))
1588 })
1589 .collect::<Result<Vec<_>, QueryError>>()?;
1590 let else_expr = match else_expr {
1591 Some(e) => Some(Box::new(self.materialize_subqueries_readonly(e)?)),
1592 None => None,
1593 };
1594 Ok(Expr::Case { whens, else_expr })
1595 }
1596 other => Ok(other.clone()),
1597 }
1598 }
1599
1600 /// Per-row materialisation of correlated subqueries. For each row in the
1601 /// outer query, substitute outer column references in the subquery's
1602 /// filter with the current row's literal values, execute the modified
1603 /// subquery, and return the result as an InList or Bool literal.
1604 fn materialize_correlated_for_row_readonly(
1605 &self,
1606 expr: &Expr,
1607 outer_row: &[Value],
1608 outer_columns: &[String],
1609 ) -> Result<Expr, QueryError> {
1610 match expr {
1611 Expr::InSubquery {
1612 expr: inner,
1613 subquery,
1614 negated,
1615 } => {
1616 let inner =
1617 self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1618 let mut sub = *subquery.clone();
1619 if let Some(ref filter) = sub.filter {
1620 sub.filter = Some(substitute_outer_refs(
1621 filter,
1622 &sub.source,
1623 &self.catalog,
1624 outer_row,
1625 outer_columns,
1626 ));
1627 }
1628 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1629 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1630 let result = self.execute_plan_readonly(&sub_plan)?;
1631 let values = match result {
1632 QueryResult::Rows { rows, .. } => rows
1633 .into_iter()
1634 .filter_map(|mut row| {
1635 if row.is_empty() {
1636 None
1637 } else {
1638 Some(value_to_expr(row.swap_remove(0)))
1639 }
1640 })
1641 .collect(),
1642 _ => Vec::new(),
1643 };
1644 Ok(Expr::InList {
1645 expr: Box::new(inner),
1646 list: values,
1647 negated: *negated,
1648 })
1649 }
1650 Expr::ExistsSubquery { subquery, negated } => {
1651 let mut sub = *subquery.clone();
1652 if let Some(ref filter) = sub.filter {
1653 sub.filter = Some(substitute_outer_refs(
1654 filter,
1655 &sub.source,
1656 &self.catalog,
1657 outer_row,
1658 outer_columns,
1659 ));
1660 }
1661 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1662 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1663 let result = self.execute_plan_readonly(&sub_plan)?;
1664 let has_rows = match result {
1665 QueryResult::Rows { rows, .. } => !rows.is_empty(),
1666 _ => false,
1667 };
1668 let truth = if *negated { !has_rows } else { has_rows };
1669 Ok(Expr::Literal(Literal::Bool(truth)))
1670 }
1671 Expr::BinaryOp(l, op, r) => {
1672 let l =
1673 self.materialize_correlated_for_row_readonly(l, outer_row, outer_columns)?;
1674 let r =
1675 self.materialize_correlated_for_row_readonly(r, outer_row, outer_columns)?;
1676 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1677 }
1678 Expr::UnaryOp(op, inner) => {
1679 let inner =
1680 self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1681 Ok(Expr::UnaryOp(*op, Box::new(inner)))
1682 }
1683 other => Ok(other.clone()),
1684 }
1685 }
1686
1687 pub fn catalog(&self) -> &Catalog {
1688 &self.catalog
1689 }
1690
1691 pub fn catalog_mut(&mut self) -> &mut Catalog {
1692 &mut self.catalog
1693 }
1694}