powdb_query/executor/mod.rs
1//! PowDB query executor.
2
3// Submodules that don't use macros defined in this file.
4mod compiled;
5mod eval;
6
7use crate::ast::*;
8use crate::canonicalize::canonicalize;
9use crate::plan::*;
10use crate::plan_cache::PlanCache;
11use crate::planner;
12use crate::result::{QueryError, QueryResult};
13use powdb_storage::catalog::Catalog;
14use powdb_storage::row::{decode_column, decode_row, RowLayout};
15use powdb_storage::types::*;
16use powdb_storage::view::ViewRegistry;
17
18use std::io;
19use std::path::Path;
20use std::sync::Mutex;
21use std::time::Instant;
22use tracing::{error, info, Level};
23
24use self::compiled::*;
25use self::eval::*;
26
/// Legacy sentinel string constant — kept for backward compatibility with
/// any external code matching on the string representation. New code should
/// match on `QueryError::ReadonlyNeedsWrite` directly.
pub const READONLY_NEEDS_WRITE: &str = "__POWDB_READONLY_NEEDS_WRITE__";

/// Plan cache capacity, handed to `PlanCache::new` when an `Engine` is
/// constructed. Bench workloads fill ~15 slots; real apps will sit
/// comfortably in 256. Lookup is O(1), collisions clear the cache (see
/// `plan_cache::PlanCache::insert`).
const PLAN_CACHE_CAPACITY: usize = 256;

/// Maximum number of rows a join may produce before the executor aborts.
/// Prevents Cartesian-product blowups (e.g. `T cross join T` on 10K rows
/// would produce 100M rows in memory without this cap).
///
/// The bound is inclusive: `check_join_limit` only fails once the row
/// count is strictly greater than this value.
pub(super) const MAX_JOIN_ROWS: usize = 1_000_000;

/// Maximum number of rows that may be materialized for sorting.
/// Queries that exceed this should add a LIMIT clause to narrow the input
/// before sorting.
pub(super) const MAX_SORT_ROWS: usize = 10_000_000;
46
47#[inline]
48pub(super) fn check_join_limit(row_count: usize) -> Result<(), QueryError> {
49 if row_count > MAX_JOIN_ROWS {
50 return Err(QueryError::JoinLimitExceeded);
51 }
52 Ok(())
53}
54
55// ─── Mission D11 Phase 1: scalar hot-loop helpers ─────────────────────────
56//
57// These macros expand into the scan body of `agg_single_col_fast` and sit
58// inside the `for_each_row_raw` closure. They exist to:
59//
60// 1. Split the loop on presence of a predicate *outside* the hot body,
61// so the no-predicate path (agg_sum/agg_min/agg_max bench workloads)
62// never pays the `Option<CompiledPredicate>` branch per row.
63// 2. Drop two bounds checks per row by reading the null bitmap byte
64// and the 8-byte value via raw pointer casts.
65//
66// SAFETY (shared across every call site below):
67//
68// - `$bmp_byte` is `col_idx / 8` where `col_idx < n_cols`, and the row
69// encoding stores `bitmap_size = n_cols.div_ceil(8)` bytes of bitmap
70// starting at offset 2. So `2 + $bmp_byte < 2 + bitmap_size ≤ row_len`
71// and `get_unchecked(2 + $bmp_byte)` is inside the row slice.
72// - `$off = 2 + bitmap_size + fixed_offsets[col_idx]` for a fixed-size
73// column. Every fixed-size column contributes `fixed_size(type_id)`
74// bytes to the fixed region, so the row always has `[$off .. $off+8]`
75// available for any i64/f64 column — enforced by the row encoder
76// (`storage/src/row.rs`) and the schema invariant that a row with a
77// given schema has `row_len ≥ 2 + bitmap_size + fixed_region_size`.
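// - Both macros are only invoked from `agg_single_col_fast`, which
//   early-returns if the column isn't Int/Float (8-byte fixed) and
//   early-returns if `fast.fixed_offsets[col_idx]` is `None`.
// - Independently of the encoder invariants above, both macros also
//   check `2 + bmp_byte < data.len()` and `off + 8 <= data.len()` on
//   every row before the unchecked reads and skip any row that fails,
//   so a truncated or corrupt row cannot cause an out-of-bounds read.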
// `agg_int_loop!(self, table, pred, bmp_byte, bmp_bit, off, |v: i64| { .. })`
//
// Scans `table` via `self.catalog.for_each_row_raw` and runs the body once
// per qualifying row with `v` bound to the column value read as a
// little-endian `i64` at byte offset `off` of the raw row.
//
// Arguments:
//   - `$self`     — expression with a `.catalog` field exposing
//                   `for_each_row_raw`.
//   - `$table`    — table argument forwarded to `for_each_row_raw`.
//   - `$pred`     — an `Option` of a callable over the raw row bytes that
//                   returns `bool`; it is borrowed (`&$pred`), not moved.
//   - `$bmp_byte` — index of the bitmap byte, relative to row offset 2.
//   - `$bmp_bit`  — bit position inside that byte.
//   - `$off`      — absolute byte offset of the 8-byte value in the row.
//   The three numeric arguments are evaluated once, before the scan.
//
// A row is skipped (the body does not run) when:
//   - `$pred` is `Some` and the predicate returns `false`,
//   - the row is too short to hold the bitmap byte or the 8-byte value,
//   - the selected bitmap bit is set (the header comment above describes
//     this bitmap as the null bitmap, i.e. the column is NULL).
//
// The predicate test is hoisted out of the per-row closure: there are two
// separate scan loops, one with the predicate and one without.
//
// A storage error from the scan is converted to `QueryError::StorageError`
// and propagated with `?`, so the macro can only be expanded inside a
// function whose error type accepts a `QueryError`.
macro_rules! agg_int_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : i64| $body:block
    ) => {{
        // Evaluate the caller's expressions exactly once, outside the scan.
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    // Bounds guard: skip corrupt/truncated rows that are too
                    // short to contain the bitmap byte or the 8-byte value.
                    if 2 + bmp_byte >= data.len() || off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `2 + bmp_byte < data.len()` is checked above.
                    // The bitmap byte lives at offset 2..2+bitmap_size in the
                    // row encoding, and bmp_byte = col_idx / 8 < bitmap_size.
                    // Corrupt rows are rejected by the bounds guard.
                    let bmp = unsafe { *data.get_unchecked(2 + bmp_byte) };
                    // Bit set => skip this row without running the body.
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `off + 8 <= data.len()` is checked above.
                    // `off = 2 + bitmap_size + fixed_offsets[col_idx]` points
                    // to an 8-byte i64 in the fixed-size region of the row.
                    // The pointer cast is valid because we read exactly 8
                    // bytes via from_le_bytes. Corrupt rows are rejected by
                    // the bounds guard.
                    let $v: i64 =
                        unsafe { i64::from_le_bytes(*(data.as_ptr().add(off) as *const [u8; 8])) };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            // No predicate: same loop body minus the per-row predicate call.
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Bounds guard: skip corrupt/truncated rows.
                    if 2 + bmp_byte >= data.len() || off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `2 + bmp_byte < data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let bmp = unsafe { *data.get_unchecked(2 + bmp_byte) };
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `off + 8 <= data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let $v: i64 =
                        unsafe { i64::from_le_bytes(*(data.as_ptr().add(off) as *const [u8; 8])) };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}
145
// `agg_float_loop!(self, table, pred, bmp_byte, bmp_bit, off, |v: f64| { .. })`
//
// Floating-point twin of `agg_int_loop!`: scans `table` via
// `self.catalog.for_each_row_raw` and runs the body once per qualifying
// row with `v` bound to the column value read as a little-endian `f64`
// at byte offset `off` of the raw row.
//
// Arguments:
//   - `$self`     — expression with a `.catalog` field exposing
//                   `for_each_row_raw`.
//   - `$table`    — table argument forwarded to `for_each_row_raw`.
//   - `$pred`     — an `Option` of a callable over the raw row bytes that
//                   returns `bool`; it is borrowed (`&$pred`), not moved.
//   - `$bmp_byte` — index of the bitmap byte, relative to row offset 2.
//   - `$bmp_bit`  — bit position inside that byte.
//   - `$off`      — absolute byte offset of the 8-byte value in the row.
//   The three numeric arguments are evaluated once, before the scan.
//
// A row is skipped (the body does not run) when:
//   - `$pred` is `Some` and the predicate returns `false`,
//   - the row is too short to hold the bitmap byte or the 8-byte value,
//   - the selected bitmap bit is set (the header comment above describes
//     this bitmap as the null bitmap, i.e. the column is NULL).
//
// A storage error from the scan is converted to `QueryError::StorageError`
// and propagated with `?`, so the macro can only be expanded inside a
// function whose error type accepts a `QueryError`.
macro_rules! agg_float_loop {
    (
        $self:expr, $table:expr, $pred:expr,
        $bmp_byte:expr, $bmp_bit:expr, $off:expr,
        |$v:ident : f64| $body:block
    ) => {{
        // Evaluate the caller's expressions exactly once, outside the scan.
        let bmp_byte = $bmp_byte;
        let bmp_bit = $bmp_bit;
        let off = $off;
        if let Some(pred) = &$pred {
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    if !pred(data) {
                        return;
                    }
                    // Bounds guard: skip corrupt/truncated rows that are too
                    // short to contain the bitmap byte or the 8-byte value.
                    if 2 + bmp_byte >= data.len() || off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `2 + bmp_byte < data.len()` is checked above.
                    // The bitmap byte lives at offset 2..2+bitmap_size in the
                    // row encoding, and bmp_byte = col_idx / 8 < bitmap_size.
                    // Corrupt rows are rejected by the bounds guard.
                    let bmp = unsafe { *data.get_unchecked(2 + bmp_byte) };
                    // Bit set => skip this row without running the body.
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `off + 8 <= data.len()` is checked above.
                    // `off = 2 + bitmap_size + fixed_offsets[col_idx]` points
                    // to an 8-byte f64 in the fixed-size region of the row.
                    // The pointer cast is valid because we read exactly 8
                    // bytes via from_le_bytes. Corrupt rows are rejected by
                    // the bounds guard.
                    let $v: f64 =
                        unsafe { f64::from_le_bytes(*(data.as_ptr().add(off) as *const [u8; 8])) };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        } else {
            // No predicate: same loop body minus the per-row predicate call.
            $self
                .catalog
                .for_each_row_raw($table, |_rid, data| {
                    // Bounds guard: skip corrupt/truncated rows.
                    if 2 + bmp_byte >= data.len() || off + 8 > data.len() {
                        return;
                    }
                    // SAFETY: `2 + bmp_byte < data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let bmp = unsafe { *data.get_unchecked(2 + bmp_byte) };
                    if (bmp >> bmp_bit) & 1 == 1 {
                        return;
                    }
                    // SAFETY: `off + 8 <= data.len()` is checked above.
                    // See the predicate branch for the full invariant.
                    let $v: f64 =
                        unsafe { f64::from_le_bytes(*(data.as_ptr().add(off) as *const [u8; 8])) };
                    $body
                })
                .map_err(|e| QueryError::StorageError(e.to_string()))?;
        }
    }};
}
210
211// Submodules that use the macros above — must be declared after macro_rules!.
212mod plan_exec;
213mod prepared;
214
215#[cfg(test)]
216mod tests;
217
218// Re-exports for the public API
219pub use self::prepared::PreparedQuery;
220
221use self::plan_exec::{
222 compute_group_aggregate, execute_window, format_plan_tree, hash_join,
223 lower_unindexed_range_scans, range_matches, synthesize_range_predicate,
224 try_extract_equi_join_keys,
225};
226
227/// Mission infra-1: classify a parsed statement as read-only vs. mutating.
228/// Used by [`Engine::execute_powql_readonly`] and by the server handler
229/// to decide between the RwLock reader and writer sides. `Union` recurses
230/// because each side can independently be read/write (though in practice
231/// both sides are reads — the parser only builds Union from query shapes).
232pub fn is_read_only_statement(stmt: &Statement) -> bool {
233 match stmt {
234 Statement::Query(_) => true,
235 Statement::Union(u) => is_read_only_statement(&u.left) && is_read_only_statement(&u.right),
236 Statement::Insert(_)
237 | Statement::Upsert(_)
238 | Statement::UpdateQuery(_)
239 | Statement::DeleteQuery(_)
240 | Statement::CreateType(_)
241 | Statement::AlterTable(_)
242 | Statement::DropTable(_)
243 | Statement::CreateView(_)
244 | Statement::RefreshView(_)
245 | Statement::DropView(_) => false,
246 Statement::Begin | Statement::Commit | Statement::Rollback => false,
247 Statement::Explain(inner) => is_read_only_statement(inner),
248 }
249}
250
/// The PowDB query engine: owns the storage catalog and the per-engine
/// query state (plan cache, insert scratch buffer, view registry,
/// transaction flag).
///
/// Constructed with [`Engine::new`]; queries are run through
/// [`Engine::execute_powql`] (`&mut self`) or
/// [`Engine::execute_powql_readonly`] (`&self`).
pub struct Engine {
    /// Storage catalog. Schema lookups, row scans, table/index access and
    /// the statement-boundary `sync_wal` call all go through it.
    catalog: Catalog,
    /// Mission D9 — cached parsed+planned query trees keyed by canonical
    /// hash. Saves the ~3μs parse+plan cost on repeat queries that differ
    /// only in literal values.
    ///
    /// Mission infra-1: wrapped in `Mutex` so the read path can be driven
    /// by `&self`. The critical section is extremely short — a single
    /// hashmap lookup + plan clone on a hit, or a single insert on a miss.
    /// A full `RwLock` would be over-engineered here; the contention window
    /// is smaller than the read-path scan work it gates.
    plan_cache: Mutex<PlanCache>,
    /// Mission C Phase 13: reusable `Vec<Value>` scratch buffer for the
    /// prepared-insert fast path. `execute_prepared` used to allocate a
    /// fresh `vec![Value::Empty; n_cols]` on every insert; recycling this
    /// buffer shaves one heap alloc per row on `insert_batch_1k`.
    insert_values_scratch: Vec<Value>,
    /// Materialized view registry: tracks view definitions, dependencies,
    /// and dirty state. Views are backed by regular catalog tables; this
    /// registry adds the lifecycle metadata.
    view_registry: ViewRegistry,
    /// Transaction flag, initialised to `false` by `Engine::new`. While it
    /// is `true`, `execute_powql` skips its per-statement `sync_wal` call.
    in_transaction: bool,
}
274
275impl Engine {
276 /// Open or create a PowDB engine rooted at `data_dir`.
277 ///
278 /// If the directory already contains a catalog, it is reopened.
279 /// Otherwise a fresh empty database is created.
280 ///
281 /// # Examples
282 ///
283 /// ```
284 /// use powdb_query::executor::Engine;
285 ///
286 /// let dir = tempfile::tempdir().unwrap();
287 /// let engine = Engine::new(dir.path()).unwrap();
288 /// // Engine is ready — the directory now contains a catalog.
289 /// ```
290 pub fn new(data_dir: &Path) -> io::Result<Self> {
291 std::fs::create_dir_all(data_dir)?;
292 // Try to reopen an existing database first; only create a fresh
293 // catalog when there isn't one already on disk.
294 let catalog = match Catalog::open(data_dir) {
295 Ok(c) => {
296 info!(data_dir = %data_dir.display(), "engine reopened existing database");
297 c
298 }
299 Err(e) if e.kind() == io::ErrorKind::NotFound => {
300 info!(data_dir = %data_dir.display(), "engine initialized fresh database");
301 Catalog::create(data_dir)?
302 }
303 Err(e) => return Err(e),
304 };
305 let view_registry =
306 ViewRegistry::open(data_dir).unwrap_or_else(|_| ViewRegistry::new(data_dir));
307 Ok(Engine {
308 catalog,
309 plan_cache: Mutex::new(PlanCache::new(PLAN_CACHE_CAPACITY)),
310 insert_values_scratch: Vec::new(),
311 view_registry,
312 in_transaction: false,
313 })
314 }
315
316 /// Parse + plan + execute a PowQL query.
317 ///
318 /// # Examples
319 ///
320 /// ```
321 /// use powdb_query::executor::Engine;
322 /// use powdb_query::result::QueryResult;
323 ///
324 /// let dir = tempfile::tempdir().unwrap();
325 /// let mut engine = Engine::new(dir.path()).unwrap();
326 ///
327 /// // Create a table and insert a row.
328 /// engine.execute_powql("type User { required name: str, age: int }").unwrap();
329 /// engine.execute_powql(r#"insert User { name := "Alice", age := 30 }"#).unwrap();
330 ///
331 /// // Query rows back.
332 /// let result = engine.execute_powql("User").unwrap();
333 /// assert_eq!(result.row_count(), 1);
334 /// ```
335 ///
336 /// Mission D6 — tracing collapse: the previous implementation ran 4
337 /// `Instant::now()` + 3 `elapsed().as_micros()` calls + formatted an
338 /// `info!` span on every query, even when tracing was disabled. On a
339 /// sub-microsecond `point_lookup_indexed` call that overhead was
340 /// 100-200ns — 20%+ of the whole query. We now measure time only when
341 /// INFO is actually enabled via `tracing::enabled!`, and we moved the
342 /// noisy `debug!(?plan)` line behind the same gate so the Debug
343 /// formatter can't run unconditionally either.
344 ///
345 /// Mission D9 — plan cache: on the hot path we canonicalise the query
346 /// text (lex + FNV-1a hash with literal values stripped), check the
347 /// cache, and on a hit substitute the new literals into a clone of the
348 /// cached plan. This skips re-lexing, re-parsing, and re-planning —
349 /// around 3μs per call on bench workloads. On a miss we plan as before
350 /// and insert the plan under its canonical hash.
351 pub fn execute_powql(&mut self, input: &str) -> Result<QueryResult, QueryError> {
352 // Hot path: tracing disabled. Zero syscalls, zero formatting.
353 if !tracing::enabled!(Level::INFO) {
354 // D9: try the plan cache first. Canonicalisation lexes the
355 // query once; on a hit we skip the parser and planner entirely.
356 if let Ok((hash, literals)) = canonicalize(input) {
357 let cached = self
358 .plan_cache
359 .lock()
360 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
361 .get_with_substitution(hash, &literals);
362 if let Some(plan) = cached {
363 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
364 let result = self.execute_plan(&plan);
365 // Mission B (post-review): statement-boundary WAL
366 // group commit. Catalog::wal_log now only appends;
367 // the fsync happens here exactly once per statement.
368 // `sync_wal` is a no-op when nothing was buffered
369 // (pure reads pay zero fsync).
370 if !self.in_transaction {
371 self.catalog
372 .sync_wal()
373 .map_err(|e| QueryError::StorageError(e.to_string()))?;
374 }
375 return result;
376 }
377 // Miss — plan, insert, execute.
378 return match planner::plan(input) {
379 Ok(plan) => {
380 self.plan_cache
381 .lock()
382 .map_err(|e| {
383 QueryError::Execution(format!("plan cache lock poisoned: {e}"))
384 })?
385 .insert(hash, plan.clone());
386 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
387 let result = self.execute_plan(&plan);
388 if !self.in_transaction {
389 self.catalog
390 .sync_wal()
391 .map_err(|e| QueryError::StorageError(e.to_string()))?;
392 }
393 result
394 }
395 Err(e) => Err(QueryError::Parse(e.to_string())),
396 };
397 }
398 // Lex error — fall through to the planner so the caller gets a
399 // consistent error shape.
400 return match planner::plan(input) {
401 Ok(plan) => {
402 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
403 let result = self.execute_plan(&plan);
404 if !self.in_transaction {
405 self.catalog
406 .sync_wal()
407 .map_err(|e| QueryError::StorageError(e.to_string()))?;
408 }
409 result
410 }
411 Err(e) => Err(QueryError::Parse(e.to_string())),
412 };
413 }
414
415 // Instrumented path — only taken under explicit tracing subscribers.
416 let total_start = Instant::now();
417 let plan_start = Instant::now();
418 let plan = planner::plan(input).map_err(|e| {
419 let msg = e.to_string();
420 error!(query = %input, error = %msg, "query plan failed");
421 QueryError::Parse(msg)
422 })?;
423 let plan_us = plan_start.elapsed().as_micros();
424
425 let exec_start = Instant::now();
426 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
427 let result = self.execute_plan(&plan);
428 if !self.in_transaction {
429 self.catalog
430 .sync_wal()
431 .map_err(|e| QueryError::StorageError(e.to_string()))?;
432 }
433 let exec_us = exec_start.elapsed().as_micros();
434
435 let total_us = total_start.elapsed().as_micros();
436 match &result {
437 Ok(r) => {
438 info!(
439 query = %input,
440 plan_us = plan_us,
441 exec_us = exec_us,
442 total_us = total_us,
443 rows = r.row_count(),
444 "query ok"
445 );
446 }
447 Err(e) => {
448 error!(
449 query = %input,
450 plan_us = plan_us,
451 exec_us = exec_us,
452 error = %e,
453 "query failed"
454 );
455 }
456 }
457 result
458 }
459
460 /// Plan cache stats — useful for benches and debugging.
461 pub fn plan_cache_stats(&self) -> (u64, u64, usize) {
462 let cache = self.plan_cache.lock().unwrap_or_else(|e| e.into_inner());
463 (cache.hits, cache.misses, cache.len())
464 }
465
466 /// Mission infra-1: read-only entry point.
467 ///
468 /// Parses + plans + executes a PowQL query using only a shared borrow
469 /// on the engine. Rejects any statement that would mutate state
470 /// (Insert/Update/Delete/CreateTable/AlterTable/DropTable/CreateView/
471 /// RefreshView/DropView) by returning [`READONLY_NEEDS_WRITE`] so the
472 /// caller can escalate to the write lock.
473 ///
474 /// Also returns [`READONLY_NEEDS_WRITE`] if a materialized view in the
475 /// query is dirty — refreshing one requires `&mut self`, so the caller
476 /// must retake the write lock for the first refresh.
477 ///
478 /// This method is the concurrent-read fast path behind
479 /// `Arc<RwLock<Engine>>`: multiple threads can call it simultaneously
480 /// under a shared `.read()` lock and each will scan independently.
481 pub fn execute_powql_readonly(&self, input: &str) -> Result<QueryResult, QueryError> {
482 // Parse the statement first so we can classify read vs. write
483 // without touching the catalog. This is the same lex+parse cost
484 // the hot path would pay anyway.
485 let stmt = crate::parser::parse(input).map_err(|e| QueryError::Parse(e.to_string()))?;
486 if !is_read_only_statement(&stmt) {
487 return Err(QueryError::ReadonlyNeedsWrite);
488 }
489
490 // Try the plan cache first — identical hash scheme to
491 // `execute_powql` so both paths share cache state. The mutex
492 // section is just a hashmap lookup + plan clone.
493 if let Ok((hash, literals)) = canonicalize(input) {
494 let cached = self
495 .plan_cache
496 .lock()
497 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
498 .get_with_substitution(hash, &literals);
499 if let Some(plan) = cached {
500 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
501 return self.execute_plan_readonly(&plan);
502 }
503 // Miss: plan + insert + execute. The planner is pure, so this
504 // is safe from `&self`.
505 let plan = crate::planner::plan_statement(stmt)
506 .map_err(|e| QueryError::Parse(e.to_string()))?;
507 self.plan_cache
508 .lock()
509 .map_err(|e| QueryError::Execution(format!("plan cache lock poisoned: {e}")))?
510 .insert(hash, plan.clone());
511 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
512 return self.execute_plan_readonly(&plan);
513 }
514 // Lex error — fall through to the planner for a consistent error
515 // shape (though `parse` above would usually have caught it).
516 let plan =
517 crate::planner::plan_statement(stmt).map_err(|e| QueryError::Parse(e.to_string()))?;
518 let plan = lower_unindexed_range_scans(&self.catalog, &plan);
519 self.execute_plan_readonly(&plan)
520 }
521
522 /// Read-only version of [`Engine::execute_plan`]. Dispatches the
523 /// read-path plan variants by calling `&self` helpers and errors with
524 /// [`READONLY_NEEDS_WRITE`] on any write variant. This is the
525 /// recursion target for composite read plans under the RwLock reader.
526 ///
527 /// The dispatch mirrors `execute_plan` for the read branches but does
528 /// not carry any of the fast-paths that need `&mut self` (e.g. plan-
529 /// cache mutation on inner subqueries is handled via the shared mutex
530 /// in [`Engine::execute_powql_readonly`]; in-flight subquery
531 /// materialisation uses [`Engine::materialize_subqueries_readonly`]).
532 fn execute_plan_readonly(&self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
533 match plan {
534 PlanNode::SeqScan { table } => {
535 // Dirty view means we'd need to refresh it — can't do that
536 // under `&self`. Escalate to the write path.
537 if self.view_registry.is_dirty(table) {
538 return Err(QueryError::ReadonlyNeedsWrite);
539 }
540 let schema = self
541 .catalog
542 .schema(table)
543 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
544 .clone();
545 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
546 let rows: Vec<Vec<Value>> = self
547 .catalog
548 .scan(table)
549 .map_err(|e| e.to_string())?
550 .map(|(_, row)| row)
551 .collect();
552 Ok(QueryResult::Rows { columns, rows })
553 }
554
555 PlanNode::AliasScan { table, alias } => {
556 let schema = self
557 .catalog
558 .schema(table)
559 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
560 .clone();
561 let columns: Vec<String> = schema
562 .columns
563 .iter()
564 .map(|c| format!("{alias}.{}", c.name))
565 .collect();
566 let rows: Vec<Vec<Value>> = self
567 .catalog
568 .scan(table)
569 .map_err(|e| e.to_string())?
570 .map(|(_, row)| row)
571 .collect();
572 Ok(QueryResult::Rows { columns, rows })
573 }
574
575 PlanNode::IndexScan { table, column, key } => {
576 let schema = self
577 .catalog
578 .schema(table)
579 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
580 .clone();
581 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
582 let key_value = literal_to_value(key)?;
583 let tbl = self
584 .catalog
585 .get_table(table)
586 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
587
588 if tbl.has_index(column) {
589 // Use index_lookup_all to handle both unique and
590 // non-unique indexes — returns all matching RowIds.
591 let rids = tbl.index_lookup_all(column, &key_value);
592 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
593 for rid in rids {
594 if let Some(data) = tbl.heap.get(rid) {
595 rows.push(decode_row(&tbl.schema, &data));
596 }
597 }
598 return Ok(QueryResult::Rows { columns, rows });
599 }
600
601 // No index: synthetic eq predicate + compiled scan.
602 let fast = FastLayout::new(&schema);
603 let synth_pred = Expr::BinaryOp(
604 Box::new(Expr::Field(column.clone())),
605 BinOp::Eq,
606 Box::new(key.clone()),
607 );
608 if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, &schema) {
609 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
610 self.catalog
611 .for_each_row_raw(table, |_rid, data| {
612 if compiled(data) {
613 rows.push(decode_row(&schema, data));
614 }
615 })
616 .map_err(|e| QueryError::StorageError(e.to_string()))?;
617 return Ok(QueryResult::Rows { columns, rows });
618 }
619
620 // Last resort: slow eq-check.
621 let col_idx =
622 schema
623 .column_index(column)
624 .ok_or_else(|| QueryError::ColumnNotFound {
625 table: String::new(),
626 column: column.clone(),
627 })?;
628 let rows: Vec<Vec<Value>> = tbl
629 .scan()
630 .filter_map(|(_, row)| {
631 if row[col_idx] == key_value {
632 Some(row)
633 } else {
634 None
635 }
636 })
637 .collect();
638 Ok(QueryResult::Rows { columns, rows })
639 }
640
641 PlanNode::RangeScan {
642 table,
643 column,
644 start,
645 end,
646 } => {
647 let tbl = self
648 .catalog
649 .get_table(table)
650 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
651 let columns: Vec<String> =
652 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
653 let schema = tbl.schema.clone();
654
655 let start_val = match start {
656 Some((expr, _)) => Some(literal_to_value(expr)?),
657 None => None,
658 };
659 let end_val = match end {
660 Some((expr, _)) => Some(literal_to_value(expr)?),
661 None => None,
662 };
663 let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
664 let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
665
666 // Range scans only use the btree fast path for unique indexes.
667 // Non-unique indexes store composite keys that don't compare
668 // directly against raw column values.
669 if tbl.is_index_unique(column) == Some(true) {
670 if let Some(btree) = tbl.index(column) {
671 let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
672 (Some(s), Some(e)) => btree.range(s, e).collect(),
673 (Some(s), None) => btree.range_from(s),
674 (None, Some(e)) => btree.range_to(e),
675 (None, None) => {
676 // Unbounded both sides — equivalent to seq scan.
677 let rows: Vec<Vec<Value>> =
678 tbl.scan().map(|(_, row)| row).collect();
679 return Ok(QueryResult::Rows { columns, rows });
680 }
681 };
682 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
683 for (key, rid) in hits {
684 // Filter for exclusive bounds.
685 if !start_inclusive {
686 if let Some(ref s) = start_val {
687 if &key == s {
688 continue;
689 }
690 }
691 }
692 if !end_inclusive {
693 if let Some(ref e) = end_val {
694 if &key == e {
695 continue;
696 }
697 }
698 }
699 if let Some(data) = tbl.heap.get(rid) {
700 rows.push(decode_row(&schema, &data));
701 }
702 }
703 return Ok(QueryResult::Rows { columns, rows });
704 }
705 }
706
707 // Fallback: no index — synthesize the range predicate and scan.
708 let fast = FastLayout::new(&schema);
709 let synth = synthesize_range_predicate(column, start, end);
710 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, &schema) {
711 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
712 self.catalog
713 .for_each_row_raw(table, |_rid, data| {
714 if compiled(data) {
715 rows.push(decode_row(&schema, data));
716 }
717 })
718 .map_err(|e| QueryError::StorageError(e.to_string()))?;
719 return Ok(QueryResult::Rows { columns, rows });
720 }
721
722 // Last resort: decoded row eval.
723 let col_idx =
724 schema
725 .column_index(column)
726 .ok_or_else(|| QueryError::ColumnNotFound {
727 table: String::new(),
728 column: column.clone(),
729 })?;
730 let rows: Vec<Vec<Value>> = tbl
731 .scan()
732 .filter(|(_, row)| {
733 range_matches(
734 &row[col_idx],
735 &start_val,
736 start_inclusive,
737 &end_val,
738 end_inclusive,
739 )
740 })
741 .map(|(_, row)| row)
742 .collect();
743 Ok(QueryResult::Rows { columns, rows })
744 }
745
746 PlanNode::Filter { input, predicate } => {
747 // Materialise subqueries using the `&self` variant.
748 // Uncorrelated subqueries are replaced with InList/Bool;
749 // correlated ones are left as InSubquery/ExistsSubquery
750 // for per-row materialisation below.
751 let materialized;
752 let predicate = if contains_subquery(predicate) {
753 materialized = self.materialize_subqueries_readonly(predicate)?;
754 &materialized
755 } else {
756 predicate
757 };
758
759 // Correlated subquery path: per-row materialisation.
760 if contains_subquery(predicate) {
761 let result = self.execute_plan_readonly(input)?;
762 return match result {
763 QueryResult::Rows { columns, rows } => {
764 let mut filtered = Vec::new();
765 for row in rows {
766 let row_pred = self.materialize_correlated_for_row_readonly(
767 predicate, &row, &columns,
768 )?;
769 if eval_predicate(&row_pred, &row, &columns) {
770 filtered.push(row);
771 }
772 }
773 Ok(QueryResult::Rows {
774 columns,
775 rows: filtered,
776 })
777 }
778 _ => Err("filter requires row input".into()),
779 };
780 }
781
782 // Fused Filter+SeqScan fast path.
783 if let PlanNode::SeqScan { table } = input.as_ref() {
784 if self.view_registry.is_dirty(table) {
785 return Err(QueryError::ReadonlyNeedsWrite);
786 }
787 let schema = self
788 .catalog
789 .schema(table)
790 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
791 .clone();
792 let columns: Vec<String> =
793 schema.columns.iter().map(|c| c.name.clone()).collect();
794 let fast = FastLayout::new(&schema);
795 let row_layout = RowLayout::new(&schema);
796 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
797
798 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
799 self.catalog
800 .for_each_row_raw(table, |_rid, data| {
801 if compiled(data) {
802 rows.push(decode_row(&schema, data));
803 }
804 })
805 .map_err(|e| QueryError::StorageError(e.to_string()))?;
806 } else {
807 let pred_cols = predicate_column_indices(predicate, &columns);
808 self.catalog
809 .for_each_row_raw(table, |_rid, data| {
810 let pred_row =
811 decode_selective(&schema, &row_layout, data, &pred_cols);
812 if eval_predicate(predicate, &pred_row, &columns) {
813 rows.push(decode_row(&schema, data));
814 }
815 })
816 .map_err(|e| QueryError::StorageError(e.to_string()))?;
817 }
818
819 return Ok(QueryResult::Rows { columns, rows });
820 }
821
822 // General path.
823 let result = self.execute_plan_readonly(input)?;
824 match result {
825 QueryResult::Rows { columns, rows } => {
826 let filtered: Vec<Vec<Value>> = rows
827 .into_iter()
828 .filter(|row| eval_predicate(predicate, row, &columns))
829 .collect();
830 Ok(QueryResult::Rows {
831 columns,
832 rows: filtered,
833 })
834 }
835 _ => Err("filter requires row input".into()),
836 }
837 }
838
839 PlanNode::Project { input, fields } => {
840 // Fast path: Project over IndexScan. Avoids full-row decode
841 // by calling decode_column only for projected fields.
842 if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
843 let key_value = literal_to_value(key)?;
844 let tbl = self
845 .catalog
846 .get_table(table)
847 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?;
848 let schema = &tbl.schema;
849 let layout = tbl.row_layout();
850
851 let proj_columns: Vec<String> = fields
852 .iter()
853 .map(|f| {
854 f.alias.clone().unwrap_or_else(|| match &f.expr {
855 Expr::Field(name) => name.clone(),
856 _ => "?".into(),
857 })
858 })
859 .collect();
860
861 let proj_indices: Vec<usize> = fields
862 .iter()
863 .filter_map(|f| {
864 if let Expr::Field(name) = &f.expr {
865 schema.column_index(name)
866 } else {
867 None
868 }
869 })
870 .collect();
871
872 if tbl.has_index(column) {
873 let rids = tbl.index_lookup_all(column, &key_value);
874 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
875 for rid in rids {
876 if let Some(data) = tbl.heap.get(rid) {
877 let row: Vec<Value> = proj_indices
878 .iter()
879 .map(|&ci| decode_column(schema, layout, &data, ci))
880 .collect();
881 rows.push(row);
882 }
883 }
884 return Ok(QueryResult::Rows {
885 columns: proj_columns,
886 rows,
887 });
888 }
889 }
890
891 // Fast paths over Limit(Sort(...)) / Limit(Filter(...)) / Limit(SeqScan).
892 if let PlanNode::Limit {
893 input: inner,
894 count: limit_expr,
895 } = input.as_ref()
896 {
897 if let PlanNode::Sort {
898 input: sort_input,
899 keys,
900 } = inner.as_ref()
901 {
902 if keys.len() == 1 {
903 let sort_field = &keys[0].field;
904 let descending = keys[0].descending;
905 let limit = match limit_expr {
906 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
907 _ => usize::MAX,
908 };
909 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
910 match sort_input.as_ref() {
911 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
912 PlanNode::Filter {
913 input: fi,
914 predicate,
915 } => {
916 if let PlanNode::SeqScan { table } = fi.as_ref() {
917 (Some(table.as_str()), Some(predicate))
918 } else {
919 (None, None)
920 }
921 }
922 _ => (None, None),
923 };
924 if let Some(table) = table_opt {
925 if let Some(result) = self.project_filter_sort_limit_fast(
926 table, fields, sort_field, descending, limit, pred_opt,
927 )? {
928 return Ok(result);
929 }
930 }
931 }
932 }
933 if let PlanNode::Filter {
934 input: fi,
935 predicate,
936 } = inner.as_ref()
937 {
938 if let PlanNode::SeqScan { table } = fi.as_ref() {
939 let limit = match limit_expr {
940 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
941 _ => usize::MAX,
942 };
943 if let Some(result) = self.project_filter_limit_fast(
944 table,
945 fields,
946 limit,
947 Some(predicate),
948 )? {
949 return Ok(result);
950 }
951 }
952 }
953 if let PlanNode::SeqScan { table } = inner.as_ref() {
954 let limit = match limit_expr {
955 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
956 _ => usize::MAX,
957 };
958 if let Some(result) =
959 self.project_filter_limit_fast(table, fields, limit, None)?
960 {
961 return Ok(result);
962 }
963 }
964 }
965
966 // Project(Filter(SeqScan)) without Limit.
967 if let PlanNode::Filter {
968 input: fi,
969 predicate,
970 } = input.as_ref()
971 {
972 if let PlanNode::SeqScan { table } = fi.as_ref() {
973 if let Some(result) = self.project_filter_limit_fast(
974 table,
975 fields,
976 usize::MAX,
977 Some(predicate),
978 )? {
979 return Ok(result);
980 }
981 }
982 }
983
984 // Project(SeqScan) without Filter or Limit.
985 if let PlanNode::SeqScan { table } = input.as_ref() {
986 if let Some(result) =
987 self.project_filter_limit_fast(table, fields, usize::MAX, None)?
988 {
989 return Ok(result);
990 }
991 }
992
993 // Generic path.
994 let result = self.execute_plan_readonly(input)?;
995 match result {
996 QueryResult::Rows { columns, rows } => {
997 let proj_columns: Vec<String> = fields
998 .iter()
999 .map(|f| {
1000 f.alias.clone().unwrap_or_else(|| match &f.expr {
1001 Expr::Field(name) => name.clone(),
1002 Expr::QualifiedField { qualifier, field } => {
1003 format!("{qualifier}.{field}")
1004 }
1005 _ => "?".into(),
1006 })
1007 })
1008 .collect();
1009 let proj_rows: Vec<Vec<Value>> = rows
1010 .iter()
1011 .map(|row| {
1012 fields
1013 .iter()
1014 .map(|f| eval_expr(&f.expr, row, &columns))
1015 .collect()
1016 })
1017 .collect();
1018 Ok(QueryResult::Rows {
1019 columns: proj_columns,
1020 rows: proj_rows,
1021 })
1022 }
1023 _ => Err("project requires row input".into()),
1024 }
1025 }
1026
1027 PlanNode::Sort { input, keys } => {
1028 let result = self.execute_plan_readonly(input)?;
1029 match result {
1030 QueryResult::Rows { columns, mut rows } => {
1031 if rows.len() > MAX_SORT_ROWS {
1032 return Err(QueryError::SortLimitExceeded);
1033 }
1034 let key_indices: Vec<(usize, bool)> = keys
1035 .iter()
1036 .map(|k| {
1037 columns
1038 .iter()
1039 .position(|c| c == &k.field)
1040 .map(|idx| (idx, k.descending))
1041 .ok_or_else(|| QueryError::ColumnNotFound {
1042 table: String::new(),
1043 column: k.field.clone(),
1044 })
1045 })
1046 .collect::<Result<_, QueryError>>()?;
1047 rows.sort_by(|a, b| {
1048 for &(col_idx, descending) in &key_indices {
1049 let cmp = a[col_idx].cmp(&b[col_idx]);
1050 let cmp = if descending { cmp.reverse() } else { cmp };
1051 if cmp != std::cmp::Ordering::Equal {
1052 return cmp;
1053 }
1054 }
1055 std::cmp::Ordering::Equal
1056 });
1057 Ok(QueryResult::Rows { columns, rows })
1058 }
1059 _ => Err("sort requires row input".into()),
1060 }
1061 }
1062
1063 PlanNode::Limit { input, count } => {
1064 let result = self.execute_plan_readonly(input)?;
1065 let n = match count {
1066 Expr::Literal(Literal::Int(v)) => *v as usize,
1067 _ => return Err("limit must be integer literal".into()),
1068 };
1069 match result {
1070 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1071 columns,
1072 rows: rows.into_iter().take(n).collect(),
1073 }),
1074 _ => Err("limit requires row input".into()),
1075 }
1076 }
1077
1078 PlanNode::Offset { input, count } => {
1079 let result = self.execute_plan_readonly(input)?;
1080 let n = match count {
1081 Expr::Literal(Literal::Int(v)) => *v as usize,
1082 _ => return Err("offset must be integer literal".into()),
1083 };
1084 match result {
1085 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
1086 columns,
1087 rows: rows.into_iter().skip(n).collect(),
1088 }),
1089 _ => Err("offset requires row input".into()),
1090 }
1091 }
1092
1093 PlanNode::Aggregate {
1094 input,
1095 function,
1096 field,
1097 } => {
1098 // Fast path: count() over SeqScan.
1099 if *function == AggFunc::Count {
1100 if let PlanNode::SeqScan { table } = input.as_ref() {
1101 let mut count: i64 = 0;
1102 self.catalog
1103 .for_each_row_raw(table, |_rid, _data| {
1104 count += 1;
1105 })
1106 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1107 return Ok(QueryResult::Scalar(Value::Int(count)));
1108 }
1109 if let PlanNode::Filter {
1110 input: inner,
1111 predicate,
1112 } = input.as_ref()
1113 {
1114 if let PlanNode::SeqScan { table } = inner.as_ref() {
1115 let schema = self
1116 .catalog
1117 .schema(table)
1118 .ok_or_else(|| QueryError::TableNotFound(table.clone()))?
1119 .clone();
1120 let columns: Vec<String> =
1121 schema.columns.iter().map(|c| c.name.clone()).collect();
1122 let fast = FastLayout::new(&schema);
1123 let row_layout = RowLayout::new(&schema);
1124
1125 if let Some(compiled) =
1126 compile_predicate(predicate, &columns, &fast, &schema)
1127 {
1128 let mut count: i64 = 0;
1129 self.catalog
1130 .for_each_row_raw(table, |_rid, data| {
1131 if compiled(data) {
1132 count += 1;
1133 }
1134 })
1135 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1136 return Ok(QueryResult::Scalar(Value::Int(count)));
1137 }
1138
1139 let pred_cols = predicate_column_indices(predicate, &columns);
1140 let mut count: i64 = 0;
1141 self.catalog
1142 .for_each_row_raw(table, |_rid, data| {
1143 let pred_row =
1144 decode_selective(&schema, &row_layout, data, &pred_cols);
1145 if eval_predicate(predicate, &pred_row, &columns) {
1146 count += 1;
1147 }
1148 })
1149 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1150 return Ok(QueryResult::Scalar(Value::Int(count)));
1151 }
1152 }
1153 }
1154
1155 // Fast path: sum/avg/min/max over single fixed-size numeric.
1156 if matches!(
1157 function,
1158 AggFunc::Sum
1159 | AggFunc::Avg
1160 | AggFunc::Min
1161 | AggFunc::Max
1162 | AggFunc::CountDistinct
1163 ) {
1164 if let Some(col) = field.as_ref() {
1165 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
1166 match input.as_ref() {
1167 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
1168 PlanNode::Filter {
1169 input: inner,
1170 predicate,
1171 } => {
1172 if let PlanNode::SeqScan { table } = inner.as_ref() {
1173 (Some(table.as_str()), Some(predicate))
1174 } else {
1175 (None, None)
1176 }
1177 }
1178 _ => (None, None),
1179 };
1180 if let Some(table) = table_opt {
1181 if let Some(result) =
1182 self.agg_single_col_fast(table, col, *function, pred_opt)?
1183 {
1184 return Ok(result);
1185 }
1186 }
1187 }
1188 }
1189
1190 // Generic path.
1191 let result = self.execute_plan_readonly(input)?;
1192 match result {
1193 QueryResult::Rows { columns, rows } => match function {
1194 AggFunc::Count => Ok(QueryResult::Scalar(Value::Int(rows.len() as i64))),
1195 AggFunc::CountDistinct => {
1196 let col = field.as_ref().ok_or("count distinct requires field")?;
1197 let idx = columns
1198 .iter()
1199 .position(|c| c == col)
1200 .ok_or("col not found")?;
1201 let mut seen = std::collections::HashSet::new();
1202 for row in &rows {
1203 let v = &row[idx];
1204 if !v.is_empty() {
1205 seen.insert(v.clone());
1206 }
1207 }
1208 Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
1209 }
1210 AggFunc::Avg => {
1211 let col = field.as_ref().ok_or("avg requires field")?;
1212 let idx = columns
1213 .iter()
1214 .position(|c| c == col)
1215 .ok_or("col not found")?;
1216 let sum: f64 = rows
1217 .iter()
1218 .filter_map(|r| match &r[idx] {
1219 Value::Int(v) => Some(*v as f64),
1220 Value::Float(v) => Some(*v),
1221 _ => None,
1222 })
1223 .sum();
1224 let count = rows.len() as f64;
1225 Ok(QueryResult::Scalar(Value::Float(sum / count)))
1226 }
1227 AggFunc::Sum => {
1228 let col = field.as_ref().ok_or("sum requires field")?;
1229 let idx = columns
1230 .iter()
1231 .position(|c| c == col)
1232 .ok_or("col not found")?;
1233 let mut int_sum: i64 = 0;
1234 let mut float_sum: f64 = 0.0;
1235 let mut saw_float = false;
1236 for r in &rows {
1237 match &r[idx] {
1238 Value::Int(v) => int_sum += *v,
1239 Value::Float(v) => {
1240 float_sum += *v;
1241 saw_float = true;
1242 }
1243 _ => {}
1244 }
1245 }
1246 let result = if saw_float {
1247 Value::Float(float_sum + int_sum as f64)
1248 } else {
1249 Value::Int(int_sum)
1250 };
1251 Ok(QueryResult::Scalar(result))
1252 }
1253 AggFunc::Min | AggFunc::Max => {
1254 let col = field.as_ref().ok_or("min/max requires field")?;
1255 let idx = columns
1256 .iter()
1257 .position(|c| c == col)
1258 .ok_or("col not found")?;
1259 let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
1260 let result = if *function == AggFunc::Min {
1261 vals.into_iter().min().cloned()
1262 } else {
1263 vals.into_iter().max().cloned()
1264 };
1265 Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
1266 }
1267 },
1268 _ => Err("aggregate requires row input".into()),
1269 }
1270 }
1271
1272 PlanNode::Distinct { input } => {
1273 let result = self.execute_plan_readonly(input)?;
1274 match result {
1275 QueryResult::Rows { columns, rows } => {
1276 let mut seen = std::collections::HashSet::new();
1277 let mut unique_rows = Vec::new();
1278 for row in rows {
1279 if seen.insert(row.clone()) {
1280 unique_rows.push(row);
1281 }
1282 }
1283 Ok(QueryResult::Rows {
1284 columns,
1285 rows: unique_rows,
1286 })
1287 }
1288 other => Ok(other),
1289 }
1290 }
1291
1292 PlanNode::GroupBy {
1293 input,
1294 keys,
1295 aggregates,
1296 having,
1297 } => {
1298 let result = self.execute_plan_readonly(input)?;
1299 match result {
1300 QueryResult::Rows { columns, rows } => {
1301 let key_indices: Vec<usize> = keys
1302 .iter()
1303 .map(|k| {
1304 columns.iter().position(|c| c == k).ok_or_else(|| {
1305 QueryError::ColumnNotFound {
1306 table: String::new(),
1307 column: k.clone(),
1308 }
1309 })
1310 })
1311 .collect::<Result<Vec<_>, _>>()?;
1312
1313 let agg_field_indices: Vec<usize> = aggregates
1314 .iter()
1315 .map(|a| {
1316 if a.field == "*" {
1317 Ok(usize::MAX)
1318 } else {
1319 columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1320 QueryError::ColumnNotFound {
1321 table: String::new(),
1322 column: a.field.clone(),
1323 }
1324 })
1325 }
1326 })
1327 .collect::<Result<Vec<_>, _>>()?;
1328
1329 let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1330 rustc_hash::FxHashMap::default();
1331 let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1332 for (ri, row) in rows.iter().enumerate() {
1333 let key: Vec<Value> =
1334 key_indices.iter().map(|&i| row[i].clone()).collect();
1335 match group_map.get(&key) {
1336 Some(&idx) => groups[idx].1.push(ri),
1337 None => {
1338 let idx = groups.len();
1339 group_map.insert(key.clone(), idx);
1340 groups.push((key, vec![ri]));
1341 }
1342 }
1343 }
1344
1345 let mut out_columns: Vec<String> = keys.clone();
1346 for agg in aggregates.iter() {
1347 out_columns.push(agg.output_name.clone());
1348 }
1349
1350 let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1351 for (key_vals, row_indices) in &groups {
1352 let mut row = key_vals.clone();
1353 for (ai, agg) in aggregates.iter().enumerate() {
1354 let col_idx = agg_field_indices[ai];
1355 let val = compute_group_aggregate(
1356 agg.function,
1357 &rows,
1358 row_indices,
1359 col_idx,
1360 );
1361 row.push(val);
1362 }
1363 out_rows.push(row);
1364 }
1365
1366 if let Some(having_expr) = having {
1367 out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1368 }
1369
1370 Ok(QueryResult::Rows {
1371 columns: out_columns,
1372 rows: out_rows,
1373 })
1374 }
1375 _ => Err("group by requires row input".into()),
1376 }
1377 }
1378
1379 PlanNode::NestedLoopJoin {
1380 left,
1381 right,
1382 on,
1383 kind,
1384 } => {
1385 let left_result = self.execute_plan_readonly(left)?;
1386 let right_result = self.execute_plan_readonly(right)?;
1387 let (left_columns, left_rows) = match left_result {
1388 QueryResult::Rows { columns, rows } => (columns, rows),
1389 _ => return Err("join left side must produce rows".into()),
1390 };
1391 let (right_columns, right_rows) = match right_result {
1392 QueryResult::Rows { columns, rows } => (columns, rows),
1393 _ => return Err("join right side must produce rows".into()),
1394 };
1395
1396 if !matches!(kind, JoinKind::Cross) {
1397 if let Some(pred) = on {
1398 if let Some((l_idx, r_idx)) =
1399 try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1400 {
1401 let result = hash_join(
1402 left_columns,
1403 left_rows,
1404 right_columns,
1405 right_rows,
1406 l_idx,
1407 r_idx,
1408 *kind,
1409 );
1410 if let QueryResult::Rows { ref rows, .. } = result {
1411 check_join_limit(rows.len())?;
1412 }
1413 return Ok(result);
1414 }
1415 }
1416 }
1417
1418 let n_left = left_columns.len();
1419 let n_right = right_columns.len();
1420 let mut columns = Vec::with_capacity(n_left + n_right);
1421 columns.extend(left_columns);
1422 columns.extend(right_columns);
1423
1424 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1425 let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1426
1427 for left_row in &left_rows {
1428 let mut matched = false;
1429 for right_row in &right_rows {
1430 combined.clear();
1431 combined.extend_from_slice(left_row);
1432 combined.extend_from_slice(right_row);
1433 let keep = match kind {
1434 JoinKind::Cross => true,
1435 JoinKind::Inner | JoinKind::LeftOuter => match on {
1436 Some(pred) => eval_predicate(pred, &combined, &columns),
1437 None => true,
1438 },
1439 JoinKind::RightOuter => {
1440 unreachable!("planner rewrites RightOuter to LeftOuter")
1441 }
1442 };
1443 if keep {
1444 rows.push(combined.clone());
1445 check_join_limit(rows.len())?;
1446 matched = true;
1447 }
1448 }
1449 if !matched && matches!(kind, JoinKind::LeftOuter) {
1450 let mut row = Vec::with_capacity(n_left + n_right);
1451 row.extend_from_slice(left_row);
1452 row.resize(n_left + n_right, Value::Empty);
1453 rows.push(row);
1454 check_join_limit(rows.len())?;
1455 }
1456 }
1457
1458 Ok(QueryResult::Rows { columns, rows })
1459 }
1460
1461 PlanNode::Window { input, windows } => {
1462 let result = self.execute_plan_readonly(input)?;
1463 execute_window(result, windows)
1464 }
1465
1466 PlanNode::Union { left, right, all } => {
1467 let left_result = self.execute_plan_readonly(left)?;
1468 let right_result = self.execute_plan_readonly(right)?;
1469 let (left_cols, left_rows) = match left_result {
1470 QueryResult::Rows { columns, rows } => (columns, rows),
1471 _ => return Err("UNION requires query results on left side".into()),
1472 };
1473 let (_, right_rows) = match right_result {
1474 QueryResult::Rows { columns, rows } => (columns, rows),
1475 _ => return Err("UNION requires query results on right side".into()),
1476 };
1477 let mut combined = left_rows;
1478 if *all {
1479 combined.extend(right_rows);
1480 } else {
1481 let mut seen = std::collections::HashSet::new();
1482 for row in &combined {
1483 seen.insert(row.clone());
1484 }
1485 for row in right_rows {
1486 if seen.insert(row.clone()) {
1487 combined.push(row);
1488 }
1489 }
1490 }
1491 Ok(QueryResult::Rows {
1492 columns: left_cols,
1493 rows: combined,
1494 })
1495 }
1496
1497 PlanNode::Explain { input } => {
1498 let text = format_plan_tree(input, 0);
1499 Ok(QueryResult::Rows {
1500 columns: vec!["plan".to_string()],
1501 rows: text
1502 .lines()
1503 .map(|line| vec![Value::Str(line.to_string())])
1504 .collect(),
1505 })
1506 }
1507
1508 // All write variants — caller must escalate to the write lock.
1509 PlanNode::Insert { .. }
1510 | PlanNode::Update { .. }
1511 | PlanNode::Delete { .. }
1512 | PlanNode::Upsert { .. }
1513 | PlanNode::CreateTable { .. }
1514 | PlanNode::AlterTable { .. }
1515 | PlanNode::DropTable { .. }
1516 | PlanNode::CreateView { .. }
1517 | PlanNode::RefreshView { .. }
1518 | PlanNode::DropView { .. }
1519 | PlanNode::Begin
1520 | PlanNode::Commit
1521 | PlanNode::Rollback => Err(QueryError::ReadonlyNeedsWrite),
1522 }
1523 }
1524
1525 /// `&self` variant of [`Engine::materialize_subqueries`]. Used by the
1526 /// read path so `Filter` predicates with `InSubquery`/`ExistsSubquery`
1527 /// children can evaluate their inner queries without taking the write
1528 /// lock. Inner queries that would themselves need a write (e.g. dirty
1529 /// view) escalate via [`READONLY_NEEDS_WRITE`] just like the top-level
1530 /// read path does.
1531 fn materialize_subqueries_readonly(&self, expr: &Expr) -> Result<Expr, QueryError> {
1532 match expr {
1533 Expr::InSubquery {
1534 expr: inner,
1535 subquery,
1536 negated,
1537 } => {
1538 if is_correlated_subquery(subquery, &self.catalog) {
1539 // Pass through — will be materialized per-row in the
1540 // Filter handler's correlated subquery path.
1541 let inner = self.materialize_subqueries_readonly(inner)?;
1542 return Ok(Expr::InSubquery {
1543 expr: Box::new(inner),
1544 subquery: subquery.clone(),
1545 negated: *negated,
1546 });
1547 }
1548 let inner = self.materialize_subqueries_readonly(inner)?;
1549 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1550 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1551 let result = self.execute_plan_readonly(&sub_plan)?;
1552 let values = match result {
1553 QueryResult::Rows { rows, .. } => rows
1554 .into_iter()
1555 .filter_map(|mut row| {
1556 if row.is_empty() {
1557 None
1558 } else {
1559 Some(value_to_expr(row.swap_remove(0)))
1560 }
1561 })
1562 .collect(),
1563 _ => Vec::new(),
1564 };
1565 Ok(Expr::InList {
1566 expr: Box::new(inner),
1567 list: values,
1568 negated: *negated,
1569 })
1570 }
1571 Expr::ExistsSubquery { subquery, negated } => {
1572 if is_correlated_subquery(subquery, &self.catalog) {
1573 return Ok(expr.clone());
1574 }
1575 let sub_plan = crate::planner::plan_statement(Statement::Query(*subquery.clone()))
1576 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1577 let result = self.execute_plan_readonly(&sub_plan)?;
1578 let has_rows = match result {
1579 QueryResult::Rows { rows, .. } => !rows.is_empty(),
1580 _ => false,
1581 };
1582 let truth = if *negated { !has_rows } else { has_rows };
1583 Ok(Expr::Literal(Literal::Bool(truth)))
1584 }
1585 Expr::BinaryOp(l, op, r) => {
1586 let l = self.materialize_subqueries_readonly(l)?;
1587 let r = self.materialize_subqueries_readonly(r)?;
1588 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1589 }
1590 Expr::UnaryOp(op, inner) => {
1591 let inner = self.materialize_subqueries_readonly(inner)?;
1592 Ok(Expr::UnaryOp(*op, Box::new(inner)))
1593 }
1594 Expr::Case { whens, else_expr } => {
1595 let whens = whens
1596 .iter()
1597 .map(|(c, r)| {
1598 let c = self.materialize_subqueries_readonly(c)?;
1599 let r = self.materialize_subqueries_readonly(r)?;
1600 Ok((Box::new(c), Box::new(r)))
1601 })
1602 .collect::<Result<Vec<_>, QueryError>>()?;
1603 let else_expr = match else_expr {
1604 Some(e) => Some(Box::new(self.materialize_subqueries_readonly(e)?)),
1605 None => None,
1606 };
1607 Ok(Expr::Case { whens, else_expr })
1608 }
1609 other => Ok(other.clone()),
1610 }
1611 }
1612
1613 /// Per-row materialisation of correlated subqueries. For each row in the
1614 /// outer query, substitute outer column references in the subquery's
1615 /// filter with the current row's literal values, execute the modified
1616 /// subquery, and return the result as an InList or Bool literal.
1617 fn materialize_correlated_for_row_readonly(
1618 &self,
1619 expr: &Expr,
1620 outer_row: &[Value],
1621 outer_columns: &[String],
1622 ) -> Result<Expr, QueryError> {
1623 match expr {
1624 Expr::InSubquery {
1625 expr: inner,
1626 subquery,
1627 negated,
1628 } => {
1629 let inner =
1630 self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1631 let mut sub = *subquery.clone();
1632 if let Some(ref filter) = sub.filter {
1633 sub.filter = Some(substitute_outer_refs(
1634 filter,
1635 &sub.source,
1636 &self.catalog,
1637 outer_row,
1638 outer_columns,
1639 ));
1640 }
1641 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1642 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1643 let result = self.execute_plan_readonly(&sub_plan)?;
1644 let values = match result {
1645 QueryResult::Rows { rows, .. } => rows
1646 .into_iter()
1647 .filter_map(|mut row| {
1648 if row.is_empty() {
1649 None
1650 } else {
1651 Some(value_to_expr(row.swap_remove(0)))
1652 }
1653 })
1654 .collect(),
1655 _ => Vec::new(),
1656 };
1657 Ok(Expr::InList {
1658 expr: Box::new(inner),
1659 list: values,
1660 negated: *negated,
1661 })
1662 }
1663 Expr::ExistsSubquery { subquery, negated } => {
1664 let mut sub = *subquery.clone();
1665 if let Some(ref filter) = sub.filter {
1666 sub.filter = Some(substitute_outer_refs(
1667 filter,
1668 &sub.source,
1669 &self.catalog,
1670 outer_row,
1671 outer_columns,
1672 ));
1673 }
1674 let sub_plan = crate::planner::plan_statement(Statement::Query(sub))
1675 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1676 let result = self.execute_plan_readonly(&sub_plan)?;
1677 let has_rows = match result {
1678 QueryResult::Rows { rows, .. } => !rows.is_empty(),
1679 _ => false,
1680 };
1681 let truth = if *negated { !has_rows } else { has_rows };
1682 Ok(Expr::Literal(Literal::Bool(truth)))
1683 }
1684 Expr::BinaryOp(l, op, r) => {
1685 let l =
1686 self.materialize_correlated_for_row_readonly(l, outer_row, outer_columns)?;
1687 let r =
1688 self.materialize_correlated_for_row_readonly(r, outer_row, outer_columns)?;
1689 Ok(Expr::BinaryOp(Box::new(l), *op, Box::new(r)))
1690 }
1691 Expr::UnaryOp(op, inner) => {
1692 let inner =
1693 self.materialize_correlated_for_row_readonly(inner, outer_row, outer_columns)?;
1694 Ok(Expr::UnaryOp(*op, Box::new(inner)))
1695 }
1696 other => Ok(other.clone()),
1697 }
1698 }
1699
/// Returns a shared borrow of the catalog stored in `self.catalog`.
pub fn catalog(&self) -> &Catalog {
    &self.catalog
}
1703
/// Returns an exclusive borrow of the catalog stored in `self.catalog`,
/// letting the caller modify it in place.
pub fn catalog_mut(&mut self) -> &mut Catalog {
    &mut self.catalog
}
1707}