kimberlite_query/lib.rs
1//! # kmb-query: SQL query layer for `Kimberlite` projections
2//!
3//! This crate provides a minimal SQL query engine for compliance lookups
4//! against the projection store.
5//!
6//! ## SQL Subset
7//!
8//! Supported SQL features:
9//! - `SELECT` with column list or `*`
10//! - `FROM` single table or `JOIN` (`INNER`, `LEFT`, `RIGHT`, `FULL OUTER`,
11//! `CROSS`, with `ON` or `USING(...)` clauses)
12//! - `WHERE` with comparison predicates (`=`, `<`, `>`, `<=`, `>=`, `IN`)
13//! - `IN (SELECT)`, `NOT IN (SELECT)`, `EXISTS`, `NOT EXISTS` — both
14//! uncorrelated (pre-execute fast path) and correlated (semi-join
15//! decorrelation or nested-loop fallback; see
16//! `docs/reference/sql/correlated-subqueries.md`)
17//! - `ORDER BY` (ascending/descending)
18//! - `LIMIT` and `OFFSET` — literal or `$N` parameter
19//! - `GROUP BY` with aggregates (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`)
20//! - Aggregate `FILTER (WHERE ...)` clauses, independent per aggregate
21//! - `HAVING` with aggregate filtering
22//! - `UNION` / `UNION ALL` / `INTERSECT` / `INTERSECT ALL` /
23//! `EXCEPT` / `EXCEPT ALL`
24//! - `DISTINCT`
25//! - JSON operators `->`, `->>`, `@>` in WHERE clauses
26//! - `CASE` (searched and simple form)
27//! - `WITH` (Common Table Expressions / CTEs), including `WITH RECURSIVE`
28//! via iterative fixed-point evaluation (depth cap 1000)
29//! - Subqueries in FROM and JOIN (`SELECT * FROM (SELECT ...) AS t`)
30//! - Window functions (`OVER`, `PARTITION BY`, `ROW_NUMBER`, `RANK`,
31//! `DENSE_RANK`, `LAG`, `LEAD`, `FIRST_VALUE`, `LAST_VALUE`)
32//! - `ALTER TABLE` (ADD COLUMN, DROP COLUMN) — parser only; kernel
33//! execution pending
34//! - Parameterized queries (`$1`, `$2`, ...) in WHERE, LIMIT, OFFSET, and
35//! DML values
36//!
37//! - Scalar functions in SELECT projection and WHERE predicates:
38//! `UPPER`, `LOWER`, `LENGTH`, `TRIM`, `CONCAT`, `||`, `ABS`,
39//! `ROUND`, `CEIL`/`CEILING`, `FLOOR`, `COALESCE`, `NULLIF`, `CAST`
40//! - `ILIKE`, `NOT LIKE`, `NOT ILIKE` pattern matching
41//! - `NOT IN (list)`, `NOT BETWEEN low AND high`
42//!
43//! Not yet supported:
44//! - Scalar subquery `WHERE col = (SELECT ...)`, `ANY`, `ALL`, `SOME`
45//! - Clock-dependent functions (`NOW()`, `CURRENT_DATE`, `EXTRACT`,
46//! `DATE_TRUNC`) — deferred pending a clock-threading decision
47//! - `MOD`, `POWER`, `SQRT`, `SUBSTRING` — deferred
48//!
49//! ## Usage
50//!
51//! ```ignore
52//! use kimberlite_query::{QueryEngine, Schema, SchemaBuilder, ColumnDef, DataType, Value};
53//! use kimberlite_store::{BTreeStore, TableId};
54//!
55//! // Define schema
56//! let schema = SchemaBuilder::new()
57//! .table(
58//! "users",
59//! TableId::new(1),
60//! vec![
61//! ColumnDef::new("id", DataType::BigInt).not_null(),
62//! ColumnDef::new("name", DataType::Text).not_null(),
63//! ],
64//! vec!["id".into()],
65//! )
66//! .build();
67//!
68//! // Create engine
69//! let engine = QueryEngine::new(schema);
70//!
71//! // Execute query
72//! let mut store = BTreeStore::open("data/projections")?;
73//! let result = engine.query(&mut store, "SELECT * FROM users WHERE id = $1", &[Value::BigInt(42)])?;
74//! ```
75//!
76//! ## Point-in-Time Queries
77//!
78//! For compliance, you can query at a specific log position:
79//!
80//! ```ignore
81//! let result = engine.query_at(
82//! &mut store,
83//! "SELECT * FROM users WHERE id = 1",
84//! &[],
85//! Offset::new(1000), // Query state as of log position 1000
86//! )?;
87//! ```
88//!
89//! ## Scalar expressions (v0.5.1)
90//!
91//! The parser accepts scalar functions in SELECT projection and WHERE
92//! predicates. Each of these queries parses cleanly and produces a
93//! `ParsedStatement::Select` with either a `ScalarCmp` predicate or
94//! entries in `scalar_projections`:
95//!
96//! ```
97//! use kimberlite_query::{parse_statement, ParsedStatement, Predicate};
98//!
99//! // WHERE col NOT IN (list) — mirror of IN, v0.5.1.
100//! let s = parse_statement("SELECT id FROM t WHERE x NOT IN (1, 2, 3)").unwrap();
101//! let ParsedStatement::Select(sel) = s else { panic!() };
102//! assert!(matches!(sel.predicates[0], Predicate::NotIn(_, _)));
103//!
104//! // WHERE UPPER(name) = 'ALICE' — scalar LHS routes to ScalarCmp.
105//! let s = parse_statement("SELECT id FROM t WHERE UPPER(name) = 'ALICE'").unwrap();
106//! let ParsedStatement::Select(sel) = s else { panic!() };
107//! assert!(matches!(sel.predicates[0], Predicate::ScalarCmp { .. }));
108//!
109//! // SELECT col AS alias — alias preserved end-to-end.
110//! let s = parse_statement("SELECT name AS display FROM t").unwrap();
111//! let ParsedStatement::Select(sel) = s else { panic!() };
112//! let aliases = sel.column_aliases.as_ref().unwrap();
113//! assert_eq!(aliases[0].as_deref(), Some("display"));
114//!
115//! // SELECT CAST(x AS INTEGER) — lands in scalar_projections with a
116//! // synthesised output column name.
117//! let s = parse_statement("SELECT CAST(x AS INTEGER) FROM t").unwrap();
118//! let ParsedStatement::Select(sel) = s else { panic!() };
119//! assert_eq!(sel.scalar_projections.len(), 1);
120//! assert_eq!(sel.scalar_projections[0].output_name.as_str(), "cast");
121//! ```
122
123pub mod correlated;
124pub mod depth_check;
125pub mod dml_planner;
126mod error;
127mod executor;
128pub mod explain;
129pub mod expression;
130pub mod information_schema;
131pub mod key_encoder;
132mod parse_cache;
133mod parser;
134mod plan;
135mod planner;
136pub mod rbac_filter;
137mod schema;
138mod value;
139pub mod window;
140
141#[cfg(test)]
142mod tests;
143
144// Re-export public types
145pub use error::{QueryError, Result};
146pub use executor::{QueryResult, Row, execute};
147pub use expression::{EvalContext, ScalarExpr, evaluate};
148pub use parser::{
149 AlterTableOperation, HavingCondition, HavingOp, OnConflictAction, OnConflictClause,
150 ParsedAlterTable, ParsedAttachMaskingPolicy, ParsedColumn, ParsedCreateIndex, ParsedCreateMask,
151 ParsedCreateMaskingPolicy, ParsedCreateTable, ParsedCreateUser, ParsedCte, ParsedDelete,
152 ParsedDetachMaskingPolicy, ParsedGrant, ParsedInsert, ParsedMaskingStrategy, ParsedSelect,
153 ParsedSetClassification, ParsedStatement, ParsedUnion, ParsedUpdate, Predicate, PredicateValue,
154 ScalarCmpOp, TimeTravel, UpsertExpr, expr_to_scalar_expr, extract_at_offset,
155 extract_time_travel, parse_statement, try_parse_custom_statement,
156};
157pub use planner::plan_query;
158pub use schema::{
159 ColumnDef, ColumnName, DataType, IndexDef, Schema, SchemaBuilder, TableDef, TableName,
160};
161pub use value::Value;
162
163use kimberlite_store::ProjectionStore;
164use kimberlite_types::Offset;
165
166/// Outcome returned by a timestamp→offset resolver.
167///
168/// v0.6.0 Tier 2 #6 — a resolver that owns its own index (the
169/// runtime's in-memory timestamp index, an audit-log-backed index,
170/// etc.) can distinguish three cases that `Option<Offset>` cannot:
171///
172/// - We found an offset at or before the requested timestamp.
173/// - The log has entries, but the earliest is *after* the requested
174/// timestamp — i.e. the request predates the retention horizon.
175/// Surfacing this as a distinct variant lets the query layer emit
176/// [`QueryError::AsOfBeforeRetentionHorizon`] with the horizon
177/// attached, which is far more actionable to the caller.
178/// - The log is empty — no timestamps recorded yet.
179///
180/// See [`QueryEngine::query_at_timestamp_resolved`] for the
181/// consumer of this type.
182#[derive(Debug, Clone, Copy, PartialEq, Eq)]
183pub enum TimestampResolution {
184 /// Resolver found a projection offset whose commit timestamp is
185 /// the greatest value ≤ the target.
186 Offset(Offset),
187 /// Resolver has entries, but the earliest commit timestamp is
188 /// strictly greater than the target. `horizon_ns` is that
189 /// earliest timestamp, so callers can tell users "the oldest
190 /// retained data is `<horizon>`, try a later instant".
191 BeforeRetentionHorizon { horizon_ns: i64 },
192 /// Resolver has no entries at all (fresh DB or the index hasn't
193 /// been seeded yet). Indistinguishable from "predates genesis"
194 /// in observable behaviour — surfaced via `UnsupportedFeature`.
195 LogEmpty,
196}
197
198/// Query engine for executing SQL against a projection store.
199///
200/// Holds the schema plus an optional parse cache (AUDIT-2026-04
201/// S3.4). The engine is `Clone` — the parse cache is shared via
202/// `Arc` so cloned handles hit the same memoised entries.
203#[derive(Debug, Clone)]
204pub struct QueryEngine {
205 schema: Schema,
206 parse_cache: Option<std::sync::Arc<parse_cache::ParseCache>>,
207 /// Cap on `outer_rows × inner_rows_per_iter` for correlated
208 /// subquery loops. See `docs/reference/sql/correlated-subqueries.md`.
209 /// Defaults to [`correlated::DEFAULT_CORRELATED_CAP`] (10 million).
210 correlated_cap: u64,
211}
212
213impl QueryEngine {
214 /// Creates a new query engine with the given schema. No
215 /// parse cache is attached by default — use
216 /// [`Self::with_parse_cache`] to opt in.
217 pub fn new(schema: Schema) -> Self {
218 Self {
219 schema,
220 parse_cache: None,
221 correlated_cap: correlated::DEFAULT_CORRELATED_CAP,
222 }
223 }
224
225 /// Attach an LRU parse cache of the given size. `0` disables
226 /// caching (every call re-parses).
227 #[must_use]
228 pub fn with_parse_cache(mut self, max_size: usize) -> Self {
229 self.parse_cache = Some(std::sync::Arc::new(parse_cache::ParseCache::new(max_size)));
230 self
231 }
232
233 /// Override the correlated-subquery row-evaluation cap. Defaults
234 /// to 10,000,000. Queries whose estimated
235 /// `outer_rows × inner_rows_per_iter` exceeds this cap fail with
236 /// [`QueryError::CorrelatedCardinalityExceeded`] before the
237 /// correlated loop runs. Set to `u64::MAX` to effectively disable.
238 #[must_use]
239 pub fn with_correlated_cap(mut self, cap: u64) -> Self {
240 self.correlated_cap = cap;
241 self
242 }
243
244 /// Returns a snapshot of parse-cache stats, or `None` if no
245 /// cache is attached.
246 pub fn parse_cache_stats(&self) -> Option<parse_cache::ParseCacheStats> {
247 self.parse_cache
248 .as_deref()
249 .map(parse_cache::ParseCache::stats)
250 }
251
252 /// Clear the parse cache, if any.
253 pub fn clear_parse_cache(&self) {
254 if let Some(c) = &self.parse_cache {
255 c.clear();
256 }
257 }
258
259 /// Returns a reference to the schema.
260 pub fn schema(&self) -> &Schema {
261 &self.schema
262 }
263
264 /// Parses a SQL string and extracts the SELECT or UNION statement.
265 ///
266 /// Static variant — bypasses the parse cache. Used by call
267 /// sites that predate the cache and by internal recursive
268 /// parsers.
269 fn parse_query_statement(sql: &str) -> Result<parser::ParsedStatement> {
270 let stmt = parser::parse_statement(sql)?;
271 match &stmt {
272 parser::ParsedStatement::Select(_) | parser::ParsedStatement::Union(_) => Ok(stmt),
273 _ => Err(QueryError::UnsupportedFeature(
274 "only SELECT and UNION queries are supported".to_string(),
275 )),
276 }
277 }
278
279 /// Cache-aware parse wrapper.
280 ///
281 /// Looks the SQL up in the parse cache (if attached). On
282 /// miss, parses via [`Self::parse_query_statement`] and
283 /// inserts into the cache. Non-SELECT/UNION errors are
284 /// returned directly without populating the cache — they're
285 /// errors for every subsequent call anyway and we don't want
286 /// to memoise them.
287 fn parse_query_statement_cached(&self, sql: &str) -> Result<parser::ParsedStatement> {
288 if let Some(cache) = &self.parse_cache {
289 if let Some(stmt) = cache.get(sql) {
290 return Ok(stmt);
291 }
292 }
293 let stmt = Self::parse_query_statement(sql)?;
294 if let Some(cache) = &self.parse_cache {
295 cache.insert(sql.to_string(), stmt.clone());
296 }
297 Ok(stmt)
298 }
299
300 /// Executes a SQL query against the current store state.
301 ///
302 /// Supports SELECT and UNION/UNION ALL queries.
303 ///
304 /// # Arguments
305 ///
306 /// * `store` - The projection store to query
307 /// * `sql` - SQL query string
308 /// * `params` - Query parameters (for `$1`, `$2`, etc.)
309 ///
310 /// # Example
311 ///
312 /// ```ignore
313 /// let result = engine.query(
314 /// &mut store,
315 /// "SELECT name FROM users WHERE id = $1",
316 /// &[Value::BigInt(42)],
317 /// )?;
318 /// ```
319 pub fn query<S: ProjectionStore>(
320 &self,
321 store: &mut S,
322 sql: &str,
323 params: &[Value],
324 ) -> Result<QueryResult> {
325 // AUDIT-2026-04 S3.5 — healthcare BREAK_GLASS prefix.
326 // `WITH BREAK_GLASS REASON='...' SELECT ...` strips the
327 // prefix, emits a warn-level structured log for the
328 // caller's audit pipeline to pick up, and falls through
329 // to the normal query path. The reason is the attribution
330 // value — enforcement (RBAC + masking) is still applied.
331 let (after_break_glass, break_glass_reason) = explain::extract_break_glass(sql);
332 if let Some(ref reason) = break_glass_reason {
333 tracing::warn!(
334 break_glass_reason = %reason,
335 "BREAK_GLASS query — regulator-visible audit signal",
336 );
337 }
338 let sql = after_break_glass;
339
340 // AUDIT-2026-04 S3.4 — `information_schema.*` virtual-table
341 // interception. Synthesises results from the live schema
342 // without going through the planner/executor. Callers
343 // that point FROM at `information_schema.tables` or
344 // `.columns` get back schema introspection rows without
345 // the table needing to be registered in the store.
346 if let Some(result) = information_schema::maybe_answer(sql, &self.schema) {
347 return Ok(result);
348 }
349
350 // AUDIT-2026-04 S3.3 — EXPLAIN prefix dispatch. A caller
351 // issuing `EXPLAIN SELECT ...` gets a single-row result
352 // whose only value is the rendered plan tree, rather
353 // than executing the statement.
354 let (after_explain, is_explain) = explain::extract_explain(sql);
355 if is_explain {
356 let plan_text = self.explain(after_explain, params)?;
357 return Ok(executor::QueryResult {
358 columns: vec!["plan".into()],
359 rows: vec![vec![Value::Text(plan_text)]],
360 });
361 }
362 let sql = after_explain; // equivalent to original `sql` when EXPLAIN absent
363
364 // Extract time-travel clause (AT OFFSET / FOR SYSTEM_TIME AS OF / AS OF)
365 // before passing SQL to sqlparser. Offset syntax dispatches
366 // directly; timestamp syntax without a resolver errors out
367 // with a clear message pointing to
368 // `query_at_timestamp(..., resolver)`.
369 let (cleaned_sql, time_travel) = parser::extract_time_travel(sql);
370 match time_travel {
371 Some(parser::TimeTravel::Offset(o)) => {
372 return self.query_at(store, &cleaned_sql, params, Offset::new(o));
373 }
374 Some(parser::TimeTravel::TimestampNs(_)) => {
375 return Err(QueryError::UnsupportedFeature(
376 "FOR SYSTEM_TIME AS OF '<iso>' / AS OF '<iso>' \
377 requires a timestamp→offset resolver — use \
378 QueryEngine::query_at_timestamp(..., resolver)"
379 .to_string(),
380 ));
381 }
382 None => {}
383 }
384
385 let stmt = self.parse_query_statement_cached(sql)?;
386
387 match stmt {
388 parser::ParsedStatement::Select(mut parsed) => {
389 // Pre-execute uncorrelated subqueries (IN/EXISTS/NOT EXISTS),
390 // attempt semi-join decorrelation of correlated ones, and
391 // leave remaining correlated predicates in place for the
392 // correlated-loop executor.
393 //
394 // `outer_scope` is the enclosing scope stack visible to the
395 // subquery; at the top level we seed it with the outer
396 // SELECT's FROM table so the walker can detect
397 // correlation.
398 self.pre_execute_subqueries(store, &mut parsed, params)?;
399
400 let window_fns = parsed.window_fns.clone();
401 let result = if parsed.ctes.is_empty() {
402 if has_correlated_predicate(&parsed.predicates) {
403 self.execute_correlated_query(store, &parsed, params)?
404 } else {
405 let plan = planner::plan_query(&self.schema, &parsed, params)?;
406 let table_def = self
407 .schema
408 .get_table(&plan.table_name().into())
409 .ok_or_else(|| {
410 QueryError::TableNotFound(plan.table_name().to_string())
411 })?;
412 executor::execute(store, &plan, table_def)?
413 }
414 } else {
415 self.execute_with_ctes(store, &parsed, params)?
416 };
417 // AUDIT-2026-04 S3.2 — window functions are a
418 // post-pass over the base SELECT result; the base
419 // plan already projected the columns the window
420 // fn references.
421 window::apply_window_fns(result, &window_fns)
422 }
423 parser::ParsedStatement::Union(union_stmt) => {
424 self.execute_union(store, &union_stmt, params)
425 }
426 _ => unreachable!("parse_query_statement only returns Select or Union"),
427 }
428 }
429
430 /// Executes a SELECT with CTEs by materializing each CTE and building
431 /// a temporary schema that includes the CTE result sets as tables.
432 fn execute_with_ctes<S: ProjectionStore>(
433 &self,
434 store: &mut S,
435 parsed: &parser::ParsedSelect,
436 params: &[Value],
437 ) -> Result<QueryResult> {
438 // Build an extended schema that includes CTE-derived tables
439 let mut extended_schema = self.schema.clone();
440
441 // Materialize each CTE
442 for cte in &parsed.ctes {
443 // Execute the CTE's anchor (non-recursive) query
444 let cte_plan = planner::plan_query(&extended_schema, &cte.query, params)?;
445 let cte_table_def = extended_schema
446 .get_table(&cte_plan.table_name().into())
447 .ok_or_else(|| QueryError::TableNotFound(cte_plan.table_name().to_string()))?;
448 let mut cte_result = executor::execute(store, &cte_plan, cte_table_def)?;
449
450 // Register CTE result as a virtual table in the extended schema
451 // Use a synthetic table ID based on the CTE name hash
452 let cte_table_id = {
453 use std::collections::hash_map::DefaultHasher;
454 use std::hash::{Hash, Hasher};
455 let mut hasher = DefaultHasher::new();
456 cte.name.hash(&mut hasher);
457 kimberlite_store::TableId::new(hasher.finish())
458 };
459
460 // Build column defs from the CTE result
461 let cte_columns: Vec<schema::ColumnDef> = cte_result
462 .columns
463 .iter()
464 .map(|col| schema::ColumnDef::new(col.as_str(), schema::DataType::Text))
465 .collect();
466
467 let pk_cols = if cte_columns.is_empty() {
468 vec![]
469 } else {
470 vec![cte_result.columns[0].clone()]
471 };
472
473 let cte_table = schema::TableDef::new(cte_table_id, cte_columns, pk_cols);
474 extended_schema.add_table(cte.name.as_str(), cte_table);
475
476 // Write anchor rows into the store as the initial working set.
477 let mut total_rows = cte_result.rows.len();
478 for (row_idx, row) in cte_result.rows.iter().enumerate() {
479 Self::write_cte_row(store, cte_table_id, row_idx, &cte_result.columns, row)?;
480 }
481
482 // Recursive iteration: keep evaluating the recursive arm against
483 // the growing CTE table until no new rows are produced or the
484 // depth cap is hit. Iterative fixed-point — honours the
485 // workspace "no recursion" constraint and prevents runaway loops.
486 if let Some(recursive_select) = &cte.recursive_arm {
487 const MAX_RECURSIVE_DEPTH: usize = 1000;
488 let mut seen: std::collections::HashSet<String> =
489 cte_result.rows.iter().map(|r| format!("{r:?}")).collect();
490
491 for depth in 0..MAX_RECURSIVE_DEPTH {
492 let recursive_plan =
493 planner::plan_query(&extended_schema, recursive_select, params)?;
494 let recursive_table_def = extended_schema
495 .get_table(&recursive_plan.table_name().into())
496 .ok_or_else(|| {
497 QueryError::TableNotFound(recursive_plan.table_name().to_string())
498 })?;
499 let iteration_result =
500 executor::execute(store, &recursive_plan, recursive_table_def)?;
501
502 let mut new_rows = 0usize;
503 for row in iteration_result.rows {
504 let key = format!("{row:?}");
505 if seen.insert(key) {
506 Self::write_cte_row(
507 store,
508 cte_table_id,
509 total_rows,
510 &cte_result.columns,
511 &row,
512 )?;
513 cte_result.rows.push(row);
514 total_rows += 1;
515 new_rows += 1;
516 }
517 }
518 if new_rows == 0 {
519 break;
520 }
521 if depth + 1 == MAX_RECURSIVE_DEPTH {
522 return Err(QueryError::UnsupportedFeature(format!(
523 "recursive CTE `{}` exceeded maximum depth of {MAX_RECURSIVE_DEPTH} iterations",
524 cte.name
525 )));
526 }
527 }
528 }
529 }
530
531 // Execute the main query against the extended schema
532 let main_query = parser::ParsedSelect {
533 ctes: vec![], // CTEs already materialized
534 ..parsed.clone()
535 };
536
537 let plan = planner::plan_query(&extended_schema, &main_query, params)?;
538 let table_def = extended_schema
539 .get_table(&plan.table_name().into())
540 .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
541 executor::execute(store, &plan, table_def)
542 }
543
544 /// Writes a single CTE row into the store under the synthetic table id.
545 /// Helper extracted so the recursive-CTE iteration loop and the initial
546 /// anchor materialisation share the same path.
547 fn write_cte_row<S: ProjectionStore>(
548 store: &mut S,
549 cte_table_id: kimberlite_store::TableId,
550 row_idx: usize,
551 columns: &[crate::schema::ColumnName],
552 row: &[Value],
553 ) -> Result<()> {
554 let mut row_map = serde_json::Map::new();
555 for (col, val) in columns.iter().zip(row.iter()) {
556 row_map.insert(col.as_str().to_string(), value_to_json(val));
557 }
558 let json_val = serde_json::to_vec(&serde_json::Value::Object(row_map)).map_err(|e| {
559 QueryError::UnsupportedFeature(format!("CTE serialization failed: {e}"))
560 })?;
561 let pk_key = crate::key_encoder::encode_key(&[Value::BigInt(row_idx as i64)]);
562 let batch = kimberlite_store::WriteBatch::new(kimberlite_types::Offset::new(
563 store.applied_position().as_u64() + 1,
564 ))
565 .put(cte_table_id, pk_key, bytes::Bytes::from(json_val));
566 store.apply(batch)?;
567 Ok(())
568 }
569
570 /// Walks the outer SELECT's predicate tree, classifies each
571 /// subquery as uncorrelated or correlated, and rewrites as follows:
572 ///
573 /// - **Uncorrelated** `IN (SELECT)` / `EXISTS` / `NOT EXISTS` →
574 /// pre-executed once; result substituted into `Predicate::In` /
575 /// `Predicate::NotIn` / `Predicate::Always(bool)`. Matches the
576 /// v0.5.0 fast path.
577 /// - **Correlated** `EXISTS` / `NOT EXISTS` with a single equijoin
578 /// to the outer scope: rewritten by
579 /// [`correlated::try_semi_join_rewrite`] into
580 /// `Predicate::InSubquery { negated }` against the outer column,
581 /// then the uncorrelated path pre-executes it.
582 /// - **Correlated** everything else: left in place for
583 /// [`Self::execute_correlated_query`] to handle per-outer-row.
584 ///
585 /// Assertion: on return, every `InSubquery` / `Exists` still in
586 /// `parsed.predicates` is correlated. The caller's
587 /// `has_correlated_predicate` check uses that invariant to
588 /// dispatch.
589 fn pre_execute_subqueries<S: ProjectionStore>(
590 &self,
591 store: &mut S,
592 parsed: &mut parser::ParsedSelect,
593 params: &[Value],
594 ) -> Result<()> {
595 // Build the outer scope — the outer SELECT's FROM table (plus
596 // any JOIN tables) as the enclosing scope for each inner
597 // subquery.
598 let outer_scope = self.build_outer_scope(parsed);
599
600 let preds = std::mem::take(&mut parsed.predicates);
601 let mut out = Vec::with_capacity(preds.len());
602 for pred in preds {
603 out.push(self.classify_and_rewrite_predicate(store, pred, &outer_scope, params)?);
604 }
605 parsed.predicates = out;
606 Ok(())
607 }
608
609 /// Construct the outer `PlannerScope` for `parsed` — the visible
610 /// tables in the outer SELECT. FROM first, then any JOIN tables.
611 fn build_outer_scope<'s>(
612 &'s self,
613 parsed: &parser::ParsedSelect,
614 ) -> correlated::PlannerScope<'s> {
615 let mut bindings: Vec<(String, &schema::TableDef)> = Vec::new();
616 if let Some(t) = self.schema.get_table(&parsed.table.clone().into()) {
617 bindings.push((parsed.table.clone(), t));
618 }
619 for join in &parsed.joins {
620 if let Some(t) = self.schema.get_table(&join.table.clone().into()) {
621 bindings.push((join.table.clone(), t));
622 }
623 }
624 correlated::PlannerScope::empty().push(bindings)
625 }
626
627 /// Classifier for a single predicate — drives pre-execute vs.
628 /// semi-join rewrite vs. leave-as-correlated. Recursive on `Or`.
629 fn classify_and_rewrite_predicate<S: ProjectionStore>(
630 &self,
631 store: &mut S,
632 pred: parser::Predicate,
633 outer_scope: &correlated::PlannerScope<'_>,
634 params: &[Value],
635 ) -> Result<parser::Predicate> {
636 match pred {
637 parser::Predicate::InSubquery {
638 column,
639 subquery,
640 negated,
641 } => {
642 let outer_refs =
643 correlated::collect_outer_refs(&subquery, outer_scope, &self.schema);
644 if outer_refs.is_empty() {
645 // Uncorrelated — pre-execute and substitute.
646 self.pre_execute_uncorrelated_in(store, &column, &subquery, negated, params)
647 } else {
648 // Correlated IN/NOT IN — keep the predicate
649 // in place; the correlated-loop executor handles it.
650 Ok(parser::Predicate::InSubquery {
651 column,
652 subquery,
653 negated,
654 })
655 }
656 }
657 parser::Predicate::Exists { subquery, negated } => {
658 let outer_refs =
659 correlated::collect_outer_refs(&subquery, outer_scope, &self.schema);
660 if outer_refs.is_empty() {
661 // Uncorrelated.
662 self.pre_execute_uncorrelated_exists(store, &subquery, negated, params)
663 } else if let Some((outer_col, rewritten)) =
664 correlated::try_semi_join_rewrite(&subquery, negated, &outer_refs)
665 {
666 // Decorrelated: rewrite as IN (SELECT) / NOT IN (SELECT)
667 // against the outer column, then pre-execute.
668 self.pre_execute_uncorrelated_in(store, &outer_col, &rewritten, negated, params)
669 } else {
670 // Correlated loop fallback.
671 Ok(parser::Predicate::Exists { subquery, negated })
672 }
673 }
674 parser::Predicate::Or(left, right) => {
675 let mut new_left = Vec::with_capacity(left.len());
676 for p in left {
677 new_left.push(self.classify_and_rewrite_predicate(
678 store,
679 p,
680 outer_scope,
681 params,
682 )?);
683 }
684 let mut new_right = Vec::with_capacity(right.len());
685 for p in right {
686 new_right.push(self.classify_and_rewrite_predicate(
687 store,
688 p,
689 outer_scope,
690 params,
691 )?);
692 }
693 Ok(parser::Predicate::Or(new_left, new_right))
694 }
695 other => Ok(other),
696 }
697 }
698
699 /// Pre-execute an uncorrelated `IN (SELECT)` / `NOT IN (SELECT)`
700 /// and return the substituted `Predicate::In` / `Predicate::NotIn`.
701 fn pre_execute_uncorrelated_in<S: ProjectionStore>(
702 &self,
703 store: &mut S,
704 column: &schema::ColumnName,
705 subquery: &parser::ParsedSelect,
706 negated: bool,
707 params: &[Value],
708 ) -> Result<parser::Predicate> {
709 let inner_plan = planner::plan_query(&self.schema, subquery, params)?;
710 let inner_table_def = self
711 .schema
712 .get_table(&inner_plan.table_name().into())
713 .ok_or_else(|| QueryError::TableNotFound(inner_plan.table_name().to_string()))?;
714 let inner_result = executor::execute(store, &inner_plan, inner_table_def)?;
715 if inner_result.columns.len() != 1 {
716 return Err(QueryError::UnsupportedFeature(format!(
717 "IN (SELECT ...) subquery must project exactly 1 column, got {}",
718 inner_result.columns.len()
719 )));
720 }
721 let values: Vec<parser::PredicateValue> = inner_result
722 .rows
723 .into_iter()
724 .filter_map(|row| row.into_iter().next())
725 .map(parser::PredicateValue::Literal)
726 .collect();
727 Ok(if negated {
728 parser::Predicate::NotIn(column.clone(), values)
729 } else {
730 parser::Predicate::In(column.clone(), values)
731 })
732 }
733
734 /// Pre-execute an uncorrelated `EXISTS` / `NOT EXISTS` and return
735 /// the collapsed `Predicate::Always(bool)`.
736 fn pre_execute_uncorrelated_exists<S: ProjectionStore>(
737 &self,
738 store: &mut S,
739 subquery: &parser::ParsedSelect,
740 negated: bool,
741 params: &[Value],
742 ) -> Result<parser::Predicate> {
743 let inner_plan = planner::plan_query(&self.schema, subquery, params)?;
744 let inner_table_def = self
745 .schema
746 .get_table(&inner_plan.table_name().into())
747 .ok_or_else(|| QueryError::TableNotFound(inner_plan.table_name().to_string()))?;
748 let inner_result = executor::execute(store, &inner_plan, inner_table_def)?;
749 let exists = !inner_result.rows.is_empty();
750 let predicate_holds = if negated { !exists } else { exists };
751 Ok(parser::Predicate::Always(predicate_holds))
752 }
753
754 /// Execute a SELECT whose predicate list still contains at least
755 /// one correlated subquery (InSubquery or Exists that survived
756 /// `pre_execute_subqueries`).
757 ///
758 /// Strategy: split the predicate list into "simple" (non-correlated)
759 /// predicates and "correlated" ones. Plan the outer query using
760 /// only the simple predicates. For each returned row, substitute
761 /// outer column values into each correlated inner subquery and
762 /// execute it; keep the row iff all correlated predicates pass.
763 fn execute_correlated_query<S: ProjectionStore>(
764 &self,
765 store: &mut S,
766 parsed: &parser::ParsedSelect,
767 params: &[Value],
768 ) -> Result<QueryResult> {
769 // Split simple vs. correlated predicates.
770 let mut simple_preds: Vec<parser::Predicate> = Vec::new();
771 let mut correlated_preds: Vec<parser::Predicate> = Vec::new();
772 for pred in &parsed.predicates {
773 match pred {
774 parser::Predicate::InSubquery { .. } | parser::Predicate::Exists { .. } => {
775 correlated_preds.push(pred.clone());
776 }
777 other => simple_preds.push(other.clone()),
778 }
779 }
780
781 // Build the outer query stripped of correlated predicates — we
782 // need the FULL outer row (all columns) so we can substitute
783 // outer column values into each inner subquery, regardless of
784 // which columns the user SELECTed. We'll project to the
785 // requested columns after the row-by-row filter.
786 let outer_table_def = self
787 .schema
788 .get_table(&parsed.table.clone().into())
789 .ok_or_else(|| QueryError::TableNotFound(parsed.table.clone()))?;
790 let outer_scan = parser::ParsedSelect {
791 predicates: simple_preds,
792 columns: None, // force SELECT * so we have every column available
793 column_aliases: None,
794 order_by: Vec::new(),
795 limit: None,
796 offset: None,
797 aggregates: Vec::new(),
798 aggregate_filters: Vec::new(),
799 group_by: Vec::new(),
800 distinct: false,
801 having: Vec::new(),
802 ctes: Vec::new(),
803 window_fns: Vec::new(),
804 scalar_projections: Vec::new(),
805 case_columns: Vec::new(),
806 joins: Vec::new(),
807 ..parsed.clone()
808 };
809
810 let outer_plan = planner::plan_query(&self.schema, &outer_scan, params)?;
811 let outer_rows = executor::execute(store, &outer_plan, outer_table_def)?;
812
813 // Estimate correlated row-evaluation cost for cardinality guard.
814 //
815 // Inner cost per outer row is bounded by the total inner-table
816 // row count; we use the store's current table size as an upper
817 // bound. If multiple correlated predicates reference the same
818 // or different inner tables, sum the per-predicate cost.
819 let outer_count = outer_rows.rows.len() as u64;
820 let mut inner_cost_per_row: u64 = 0;
821 for pred in &correlated_preds {
822 let inner_table = match pred {
823 parser::Predicate::InSubquery { subquery, .. }
824 | parser::Predicate::Exists { subquery, .. } => &subquery.table,
825 _ => continue,
826 };
827 let inner_def = self
828 .schema
829 .get_table(&inner_table.clone().into())
830 .ok_or_else(|| QueryError::TableNotFound(inner_table.clone()))?;
831 // Upper-bound the inner cost by scanning the table once — we
832 // issue a bounded scan so this is cheap.
833 let pairs = store.scan(
834 inner_def.table_id,
835 kimberlite_store::Key::min()..kimberlite_store::Key::max(),
836 1_000_000,
837 )?;
838 inner_cost_per_row = inner_cost_per_row.saturating_add(pairs.len() as u64);
839 }
840 // When inner tables are empty, bound by 1 to keep estimation
841 // monotonic (so a 0 × N query doesn't look free).
842 let inner_cost_per_row = inner_cost_per_row.max(1);
843 let estimated = outer_count.saturating_mul(inner_cost_per_row);
844 if estimated > self.correlated_cap {
845 return Err(QueryError::CorrelatedCardinalityExceeded {
846 estimated,
847 cap: self.correlated_cap,
848 });
849 }
850
851 // For each outer row, evaluate the correlated predicates.
852 let outer_columns = outer_rows.columns.clone();
853 let outer_alias = parsed.table.clone();
854 let mut kept: Vec<Vec<Value>> = Vec::new();
855 for row in outer_rows.rows {
856 // Build the `"alias.column"` → Value binding map. We bind
857 // the FROM alias as it appears in the ParsedSelect (the
858 // parser already resolved user aliases into the `table`
859 // field when the alias shadows the table name).
860 let mut bindings = std::collections::HashMap::new();
861 for (col, val) in outer_columns.iter().zip(row.iter()) {
862 bindings.insert(format!("{outer_alias}.{col}"), val.clone());
863 // Also bind under every possible alias seen in the
864 // correlated predicates — the user may have written
865 // `p.id` while the parser stored the table name
866 // `patient_current`. We cover the common case by
867 // also binding the bare column name and any alias
868 // we can extract from the inner refs themselves.
869 bindings.insert(col.as_str().to_string(), val.clone());
870 }
871 // Extend bindings with each correlated-ref alias. Walking
872 // the correlated predicate trees once up-front is fine.
873 for pred in &correlated_preds {
874 let refs = correlated_predicate_outer_refs(pred);
875 for r in refs {
876 let col_idx = outer_columns
877 .iter()
878 .position(|c| c.as_str() == r.column.as_str());
879 if let Some(idx) = col_idx {
880 if let Some(v) = row.get(idx) {
881 bindings.insert(r.as_column_ref(), v.clone());
882 }
883 }
884 }
885 }
886
887 let mut all_pass = true;
888 for pred in &correlated_preds {
889 if !self.evaluate_correlated_predicate(store, pred, &bindings, params)? {
890 all_pass = false;
891 break;
892 }
893 }
894 if all_pass {
895 kept.push(row);
896 }
897 }
898
899 // Apply ORDER BY, LIMIT, OFFSET on the full rows before
900 // projecting to the user's requested column list.
901 // (Simple implementation: leverage the fact that we kept full
902 // rows. We construct a second plan that projects + orders +
903 // limits using a temporary store isn't worth it; do it inline.)
904 Self::post_process_correlated_result(parsed, params, outer_columns, kept)
905 }
906
907 /// Apply the outer query's projection / ORDER BY / LIMIT / OFFSET
908 /// to the rows surviving the correlated-predicate filter.
909 fn post_process_correlated_result(
910 parsed: &parser::ParsedSelect,
911 params: &[Value],
912 outer_columns: Vec<schema::ColumnName>,
913 mut rows: Vec<Vec<Value>>,
914 ) -> Result<QueryResult> {
915 // ORDER BY — bare-column only, resolved against outer_columns.
916 if !parsed.order_by.is_empty() {
917 let indices: Vec<(usize, bool)> = parsed
918 .order_by
919 .iter()
920 .map(|ob| {
921 let idx = outer_columns
922 .iter()
923 .position(|c| c == &ob.column)
924 .ok_or_else(|| QueryError::ColumnNotFound {
925 table: parsed.table.clone(),
926 column: ob.column.to_string(),
927 })?;
928 Ok::<_, QueryError>((idx, ob.ascending))
929 })
930 .collect::<Result<Vec<_>>>()?;
931 rows.sort_by(|a, b| {
932 for (idx, asc) in &indices {
933 let ord = a
934 .get(*idx)
935 .and_then(|av| b.get(*idx).and_then(|bv| av.compare(bv)))
936 .unwrap_or(std::cmp::Ordering::Equal);
937 let ord = if *asc { ord } else { ord.reverse() };
938 if ord != std::cmp::Ordering::Equal {
939 return ord;
940 }
941 }
942 std::cmp::Ordering::Equal
943 });
944 }
945
946 // OFFSET / LIMIT.
947 let offset = match parsed.offset {
948 Some(parser::LimitExpr::Literal(n)) => n,
949 Some(parser::LimitExpr::Param(idx)) => params
950 .get(idx.saturating_sub(1))
951 .and_then(|v| match v {
952 Value::BigInt(n) if *n >= 0 => Some(*n as usize),
953 Value::Integer(n) if *n >= 0 => Some(*n as usize),
954 _ => None,
955 })
956 .unwrap_or(0),
957 None => 0,
958 };
959 let limit = match parsed.limit {
960 Some(parser::LimitExpr::Literal(n)) => Some(n),
961 Some(parser::LimitExpr::Param(idx)) => {
962 params.get(idx.saturating_sub(1)).and_then(|v| match v {
963 Value::BigInt(n) if *n >= 0 => Some(*n as usize),
964 Value::Integer(n) if *n >= 0 => Some(*n as usize),
965 _ => None,
966 })
967 }
968 None => None,
969 };
970 if offset > 0 {
971 rows.drain(0..offset.min(rows.len()));
972 }
973 if let Some(l) = limit {
974 rows.truncate(l);
975 }
976
977 // Project to the requested column list.
978 let (out_columns, projected_rows) = match (&parsed.columns, &parsed.column_aliases) {
979 (None, _) => (outer_columns.clone(), rows),
980 (Some(cols), aliases) => {
981 let mut indices = Vec::with_capacity(cols.len());
982 let mut out_names: Vec<schema::ColumnName> = Vec::with_capacity(cols.len());
983 for (i, col) in cols.iter().enumerate() {
984 let idx = outer_columns.iter().position(|c| c == col).ok_or_else(|| {
985 QueryError::ColumnNotFound {
986 table: parsed.table.clone(),
987 column: col.to_string(),
988 }
989 })?;
990 indices.push(idx);
991 let alias = aliases
992 .as_ref()
993 .and_then(|a| a.get(i))
994 .and_then(|a| a.as_ref());
995 out_names.push(match alias {
996 Some(a) => schema::ColumnName::new(a.clone()),
997 None => col.clone(),
998 });
999 }
1000 let projected: Vec<Vec<Value>> = rows
1001 .into_iter()
1002 .map(|r| indices.iter().map(|i| r[*i].clone()).collect())
1003 .collect();
1004 (out_names, projected)
1005 }
1006 };
1007
1008 Ok(QueryResult {
1009 columns: out_columns,
1010 rows: projected_rows,
1011 })
1012 }
1013
1014 /// Evaluate a correlated `InSubquery` / `Exists` against one
1015 /// outer row (already baked into `bindings`). Returns true iff
1016 /// the predicate holds.
1017 fn evaluate_correlated_predicate<S: ProjectionStore>(
1018 &self,
1019 store: &mut S,
1020 pred: &parser::Predicate,
1021 bindings: &std::collections::HashMap<String, Value>,
1022 params: &[Value],
1023 ) -> Result<bool> {
1024 match pred {
1025 parser::Predicate::Exists { subquery, negated } => {
1026 let substituted = correlated::substitute_outer_refs(subquery, bindings);
1027 // The inner subquery may itself have nested subqueries;
1028 // run it through the full query engine path so nested
1029 // correlations (if any) are handled.
1030 let inner_result = self.execute_inner_subquery(store, &substituted, params)?;
1031 let exists = !inner_result.rows.is_empty();
1032 Ok(if *negated { !exists } else { exists })
1033 }
1034 parser::Predicate::InSubquery {
1035 column,
1036 subquery,
1037 negated,
1038 } => {
1039 let substituted = correlated::substitute_outer_refs(subquery, bindings);
1040 let inner_result = self.execute_inner_subquery(store, &substituted, params)?;
1041 if inner_result.columns.len() != 1 {
1042 return Err(QueryError::UnsupportedFeature(format!(
1043 "IN (SELECT ...) subquery must project exactly 1 column, got {}",
1044 inner_result.columns.len()
1045 )));
1046 }
1047 let outer_val = bindings
1048 .get(column.as_str())
1049 .or_else(|| bindings.values().next()) // defensive
1050 .cloned();
1051 let Some(outer_val) = outer_val else {
1052 return Ok(false);
1053 };
1054 let any_match = inner_result
1055 .rows
1056 .iter()
1057 .any(|row| row.first().is_some_and(|v| v == &outer_val));
1058 Ok(if *negated { !any_match } else { any_match })
1059 }
1060 _ => Err(QueryError::UnsupportedFeature(
1061 "evaluate_correlated_predicate called on non-subquery predicate".to_string(),
1062 )),
1063 }
1064 }
1065
1066 /// Execute an inner subquery (with all outer refs already
1067 /// substituted). Delegates to `plan_query` + `executor::execute`.
1068 fn execute_inner_subquery<S: ProjectionStore>(
1069 &self,
1070 store: &mut S,
1071 inner: &parser::ParsedSelect,
1072 params: &[Value],
1073 ) -> Result<QueryResult> {
1074 // An inner subquery post-substitution might itself contain
1075 // nested correlations or uncorrelated subqueries. Run the
1076 // pre-execute pass once more to handle those cases.
1077 let mut inner_clone = inner.clone();
1078 self.pre_execute_subqueries(store, &mut inner_clone, params)?;
1079 if has_correlated_predicate(&inner_clone.predicates) {
1080 // v0.6.0 caps nesting at one correlated level — the
1081 // outer loop is already one nesting.
1082 return Err(QueryError::UnsupportedFeature(
1083 "nested correlated subqueries (depth > 1) are not supported in v0.6.0".to_string(),
1084 ));
1085 }
1086 let plan = planner::plan_query(&self.schema, &inner_clone, params)?;
1087 let table_def = self
1088 .schema
1089 .get_table(&plan.table_name().into())
1090 .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
1091 executor::execute(store, &plan, table_def)
1092 }
1093
1094 /// Executes a `UNION`, `INTERSECT`, or `EXCEPT` query (with or without `ALL`).
1095 ///
1096 /// Implementation: materialise both sides, then combine according to the
1097 /// operator. `ALL` keeps multiset semantics; the bare form (no `ALL`)
1098 /// deduplicates by row content. `Value` doesn't implement `Hash`, so the
1099 /// dedup/intersect/except keys use the debug format of each row — same
1100 /// trick already used by the prior UNION implementation.
1101 fn execute_union<S: ProjectionStore>(
1102 &self,
1103 store: &mut S,
1104 union_stmt: &parser::ParsedUnion,
1105 params: &[Value],
1106 ) -> Result<QueryResult> {
1107 // Plan and execute left side
1108 let left_plan = planner::plan_query(&self.schema, &union_stmt.left, params)?;
1109 let left_table_def = self
1110 .schema
1111 .get_table(&left_plan.table_name().into())
1112 .ok_or_else(|| QueryError::TableNotFound(left_plan.table_name().to_string()))?;
1113 let left_result = executor::execute(store, &left_plan, left_table_def)?;
1114
1115 // Plan and execute right side
1116 let right_plan = planner::plan_query(&self.schema, &union_stmt.right, params)?;
1117 let right_table_def = self
1118 .schema
1119 .get_table(&right_plan.table_name().into())
1120 .ok_or_else(|| QueryError::TableNotFound(right_plan.table_name().to_string()))?;
1121 let right_result = executor::execute(store, &right_plan, right_table_def)?;
1122
1123 // Use left side column names for the result
1124 let column_names = left_result.columns;
1125
1126 let row_key = |row: &Vec<Value>| format!("{row:?}");
1127
1128 let combined_rows: Vec<Vec<Value>> = match (union_stmt.op, union_stmt.all) {
1129 // UNION ALL: concatenate, keep duplicates
1130 (parser::SetOp::Union, true) => {
1131 let mut all_rows = left_result.rows;
1132 all_rows.extend(right_result.rows);
1133 all_rows
1134 }
1135 // UNION: concatenate then dedup
1136 (parser::SetOp::Union, false) => {
1137 let mut all_rows = left_result.rows;
1138 all_rows.extend(right_result.rows);
1139 let mut seen = std::collections::HashSet::new();
1140 all_rows.retain(|row| seen.insert(row_key(row)));
1141 all_rows
1142 }
1143 // INTERSECT: rows present in both sides (set semantics)
1144 (parser::SetOp::Intersect, false) => {
1145 let right_keys: std::collections::HashSet<String> =
1146 right_result.rows.iter().map(&row_key).collect();
1147 let mut seen = std::collections::HashSet::new();
1148 left_result
1149 .rows
1150 .into_iter()
1151 .filter(|row| {
1152 let key = row_key(row);
1153 right_keys.contains(&key) && seen.insert(key)
1154 })
1155 .collect()
1156 }
1157 // INTERSECT ALL: keep multiplicities — for each row appearing
1158 // min(left_count, right_count) times, emit that many copies.
1159 (parser::SetOp::Intersect, true) => {
1160 let mut right_counts: std::collections::HashMap<String, usize> =
1161 std::collections::HashMap::new();
1162 for row in &right_result.rows {
1163 *right_counts.entry(row_key(row)).or_insert(0) += 1;
1164 }
1165 let mut out = Vec::new();
1166 for row in left_result.rows {
1167 let key = row_key(&row);
1168 if let Some(count) = right_counts.get_mut(&key) {
1169 if *count > 0 {
1170 *count -= 1;
1171 out.push(row);
1172 }
1173 }
1174 }
1175 out
1176 }
1177 // EXCEPT: rows in left side not in right side (set semantics)
1178 (parser::SetOp::Except, false) => {
1179 let right_keys: std::collections::HashSet<String> =
1180 right_result.rows.iter().map(&row_key).collect();
1181 let mut seen = std::collections::HashSet::new();
1182 left_result
1183 .rows
1184 .into_iter()
1185 .filter(|row| {
1186 let key = row_key(row);
1187 !right_keys.contains(&key) && seen.insert(key)
1188 })
1189 .collect()
1190 }
1191 // EXCEPT ALL: subtract multiplicities — left_count - right_count copies
1192 (parser::SetOp::Except, true) => {
1193 let mut right_counts: std::collections::HashMap<String, usize> =
1194 std::collections::HashMap::new();
1195 for row in &right_result.rows {
1196 *right_counts.entry(row_key(row)).or_insert(0) += 1;
1197 }
1198 let mut out = Vec::new();
1199 for row in left_result.rows {
1200 let key = row_key(&row);
1201 let count = right_counts.entry(key).or_insert(0);
1202 if *count > 0 {
1203 *count -= 1;
1204 } else {
1205 out.push(row);
1206 }
1207 }
1208 out
1209 }
1210 };
1211
1212 Ok(QueryResult {
1213 columns: column_names,
1214 rows: combined_rows,
1215 })
1216 }
1217
1218 /// Executes a SQL query at a specific log position (point-in-time query).
1219 ///
1220 /// This enables compliance queries that show the state as it was
1221 /// at a specific point in the log.
1222 ///
1223 /// # Arguments
1224 ///
1225 /// * `store` - The projection store to query
1226 /// * `sql` - SQL query string
1227 /// * `params` - Query parameters
1228 /// * `position` - Log position to query at
1229 ///
1230 /// # Example
1231 ///
1232 /// ```ignore
1233 /// // Get user state as of log position 1000
1234 /// let result = engine.query_at(
1235 /// &mut store,
1236 /// "SELECT * FROM users WHERE id = 1",
1237 /// &[],
1238 /// Offset::new(1000),
1239 /// )?;
1240 /// ```
1241 pub fn query_at<S: ProjectionStore>(
1242 &self,
1243 store: &mut S,
1244 sql: &str,
1245 params: &[Value],
1246 position: Offset,
1247 ) -> Result<QueryResult> {
1248 let stmt = self.parse_query_statement_cached(sql)?;
1249
1250 match stmt {
1251 parser::ParsedStatement::Select(parsed) => {
1252 let plan = planner::plan_query(&self.schema, &parsed, params)?;
1253 let table_def = self
1254 .schema
1255 .get_table(&plan.table_name().into())
1256 .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
1257 executor::execute_at(store, &plan, table_def, position)
1258 }
1259 parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
1260 "UNION is not supported in point-in-time queries".to_string(),
1261 )),
1262 _ => unreachable!("parse_query_statement only returns Select or Union"),
1263 }
1264 }
1265
1266 /// Executes a query against a historical snapshot selected by
1267 /// wall-clock timestamp (AUDIT-2026-04 L-4).
1268 ///
1269 /// This is the user-facing ergonomic form of
1270 /// [`Self::query_at`] — healthcare auditors ask "what did the
1271 /// chart look like on 2026-01-15?", not "what was log offset
1272 /// 948,274?". The caller supplies a `resolver` callback that
1273 /// translates a Unix-nanosecond timestamp into the log offset
1274 /// whose commit timestamp is the greatest value ≤ the target.
1275 ///
1276 /// The resolver is a callback rather than a hard dependency
1277 /// so the query crate does not take a direct dep on
1278 /// `kimberlite-compliance::audit` or the kernel's audit log.
1279 /// A typical impl performs a binary search on the in-memory
1280 /// audit index.
1281 ///
1282 /// # Errors
1283 ///
1284 /// - [`QueryError::UnsupportedFeature`] if the resolver
1285 /// returns `None` (no offset exists at or before the target
1286 /// — typically because the log is empty or the timestamp
1287 /// predates genesis).
1288 ///
1289 /// # Example
1290 ///
1291 /// ```ignore
1292 /// let resolver = |ts_ns: i64| -> Option<Offset> {
1293 /// audit_log.offset_at_or_before(ts_ns)
1294 /// };
1295 /// let result = engine.query_at_timestamp(
1296 /// &mut store,
1297 /// "SELECT * FROM charts WHERE patient_id = $1",
1298 /// &[Value::BigInt(42)],
1299 /// 1_760_000_000_000_000_000, // 2025-10-09T07:06:40Z in ns
1300 /// resolver,
1301 /// )?;
1302 /// ```
1303 pub fn query_at_timestamp<S, R>(
1304 &self,
1305 store: &mut S,
1306 sql: &str,
1307 params: &[Value],
1308 target_ns: i64,
1309 resolver: R,
1310 ) -> Result<QueryResult>
1311 where
1312 S: ProjectionStore,
1313 R: FnOnce(i64) -> Option<Offset>,
1314 {
1315 let offset = resolver(target_ns).ok_or_else(|| {
1316 QueryError::UnsupportedFeature(format!(
1317 "no log offset at or before timestamp {target_ns} ns \
1318 (empty log or predates genesis)"
1319 ))
1320 })?;
1321 self.query_at(store, sql, params, offset)
1322 }
1323
1324 /// Executes a query against a historical snapshot selected by
1325 /// wall-clock timestamp, with a resolver that can distinguish
1326 /// the "timestamp predates retention horizon" case from a plain
1327 /// "no offset found".
1328 ///
1329 /// v0.6.0 Tier 2 #6: this is the runtime-layer variant of
1330 /// [`Self::query_at_timestamp`] used by `TenantHandle::query`
1331 /// when the resolver has a concrete notion of a retention
1332 /// horizon (e.g. an in-memory timestamp index maintained at
1333 /// append time). The existing `query_at_timestamp` stays as-is
1334 /// so callers with an `Option<Offset>` resolver (e.g. ad-hoc
1335 /// binary search over an external index) keep working.
1336 ///
1337 /// # Resolution semantics
1338 ///
1339 /// - `TimestampResolution::Offset(o)` → execute at `o`.
1340 /// - `TimestampResolution::BeforeRetentionHorizon { horizon_ns }` →
1341 /// [`QueryError::AsOfBeforeRetentionHorizon`] with both the
1342 /// requested and horizon timestamps.
1343 /// - `TimestampResolution::LogEmpty` →
1344 /// [`QueryError::UnsupportedFeature`] (same message the
1345 /// ergonomic form emits for an empty log).
1346 pub fn query_at_timestamp_resolved<S, R>(
1347 &self,
1348 store: &mut S,
1349 sql: &str,
1350 params: &[Value],
1351 target_ns: i64,
1352 resolver: R,
1353 ) -> Result<QueryResult>
1354 where
1355 S: ProjectionStore,
1356 R: FnOnce(i64) -> TimestampResolution,
1357 {
1358 match resolver(target_ns) {
1359 TimestampResolution::Offset(offset) => self.query_at(store, sql, params, offset),
1360 TimestampResolution::BeforeRetentionHorizon { horizon_ns } => {
1361 Err(QueryError::AsOfBeforeRetentionHorizon {
1362 requested_ns: target_ns,
1363 horizon_ns,
1364 })
1365 }
1366 TimestampResolution::LogEmpty => Err(QueryError::UnsupportedFeature(format!(
1367 "no log offset at or before timestamp {target_ns} ns \
1368 (empty log or predates genesis)"
1369 ))),
1370 }
1371 }
1372
1373 /// AUDIT-2026-04 S3.3 — render a SQL query's access plan
1374 /// without executing it.
1375 ///
1376 /// Returns a deterministic multi-line tree string — same
1377 /// query always produces the same bytes, which lets apps
1378 /// diff plans across schema versions and catch unexpected
1379 /// regressions.
1380 ///
1381 /// The rendered plan **never reveals row data** — only table
1382 /// names, column counts, filter presence/absence, and LIMIT
1383 /// bounds. Masked column names render as their source name
1384 /// (masks are applied post-projection and are not a plan
1385 /// concern).
1386 ///
1387 /// # Errors
1388 ///
1389 /// Any error from parsing or planning (unsupported statement,
1390 /// missing table, etc.) propagates verbatim.
1391 ///
1392 /// # Example
1393 ///
1394 /// ```ignore
1395 /// let tree = engine.explain("SELECT * FROM patients WHERE id = $1", &[Value::BigInt(42)])?;
1396 /// println!("{tree}");
1397 /// // -> PointLookup [patients, cols=3]
1398 /// ```
1399 pub fn explain(&self, sql: &str, params: &[Value]) -> Result<String> {
1400 let stmt = self.parse_query_statement_cached(sql)?;
1401 match stmt {
1402 parser::ParsedStatement::Select(parsed) => {
1403 let plan = planner::plan_query(&self.schema, &parsed, params)?;
1404 Ok(explain::explain_plan(&plan))
1405 }
1406 parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
1407 "EXPLAIN does not yet render UNION plans".to_string(),
1408 )),
1409 _ => unreachable!("parse_query_statement only returns Select or Union"),
1410 }
1411 }
1412
1413 /// Parses a SQL query without executing it.
1414 ///
1415 /// Useful for validation or query plan inspection.
1416 pub fn prepare(&self, sql: &str, params: &[Value]) -> Result<PreparedQuery> {
1417 let stmt = self.parse_query_statement_cached(sql)?;
1418 let parsed = match stmt {
1419 parser::ParsedStatement::Select(s) => s,
1420 _ => {
1421 return Err(QueryError::UnsupportedFeature(
1422 "only SELECT queries can be prepared".to_string(),
1423 ));
1424 }
1425 };
1426 let plan = planner::plan_query(&self.schema, &parsed, params)?;
1427
1428 Ok(PreparedQuery {
1429 plan,
1430 schema: self.schema.clone(),
1431 })
1432 }
1433}
1434
1435/// A prepared (planned) query ready for execution.
1436#[derive(Debug, Clone)]
1437pub struct PreparedQuery {
1438 plan: plan::QueryPlan,
1439 schema: Schema,
1440}
1441
1442impl PreparedQuery {
1443 /// Executes this prepared query against the current store state.
1444 pub fn execute<S: ProjectionStore>(&self, store: &mut S) -> Result<QueryResult> {
1445 let table_def = self
1446 .schema
1447 .get_table(&self.plan.table_name().into())
1448 .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
1449
1450 executor::execute(store, &self.plan, table_def)
1451 }
1452
1453 /// Executes this prepared query at a specific log position.
1454 pub fn execute_at<S: ProjectionStore>(
1455 &self,
1456 store: &mut S,
1457 position: Offset,
1458 ) -> Result<QueryResult> {
1459 let table_def = self
1460 .schema
1461 .get_table(&self.plan.table_name().into())
1462 .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
1463
1464 executor::execute_at(store, &self.plan, table_def, position)
1465 }
1466
1467 /// Returns the column names this query will return.
1468 pub fn columns(&self) -> &[ColumnName] {
1469 self.plan.column_names()
1470 }
1471
1472 /// Returns the table name being queried.
1473 pub fn table_name(&self) -> &str {
1474 self.plan.table_name()
1475 }
1476}
1477
1478/// True iff any top-level predicate is a surviving correlated
1479/// subquery (`InSubquery` / `Exists`). Uncorrelated subqueries were
1480/// rewritten by `pre_execute_subqueries` before this is called.
1481fn has_correlated_predicate(predicates: &[parser::Predicate]) -> bool {
1482 predicates.iter().any(|p| {
1483 matches!(
1484 p,
1485 parser::Predicate::InSubquery { .. } | parser::Predicate::Exists { .. }
1486 )
1487 })
1488}
1489
1490/// Extract outer references from a surviving correlated predicate.
1491/// Used by the correlated-loop executor to populate the
1492/// `alias.column → Value` binding map for each outer row.
1493fn correlated_predicate_outer_refs(pred: &parser::Predicate) -> Vec<correlated::OuterRef> {
1494 let subquery = match pred {
1495 parser::Predicate::InSubquery { subquery, .. }
1496 | parser::Predicate::Exists { subquery, .. } => subquery,
1497 _ => return Vec::new(),
1498 };
1499 // Walk inner predicates gathering any qualified ColumnRef —
1500 // post-pre-execute, anything that remains as a ColumnRef with a
1501 // qualifier is an outer reference (the inner FROM has its own
1502 // bare-column refs).
1503 let mut out = Vec::new();
1504 for pred in &subquery.predicates {
1505 collect_refs_in_pred(pred, &mut out);
1506 }
1507 out
1508}
1509
1510fn collect_refs_in_pred(pred: &parser::Predicate, out: &mut Vec<correlated::OuterRef>) {
1511 let push_if_colref = |pv: &parser::PredicateValue, out: &mut Vec<correlated::OuterRef>| {
1512 if let parser::PredicateValue::ColumnRef(raw) = pv {
1513 if let Some((q, c)) = raw.split_once('.') {
1514 out.push(correlated::OuterRef {
1515 qualifier: q.to_string(),
1516 column: schema::ColumnName::new(c.to_string()),
1517 scope_depth: 1,
1518 });
1519 }
1520 }
1521 };
1522 match pred {
1523 parser::Predicate::Eq(_, v)
1524 | parser::Predicate::Lt(_, v)
1525 | parser::Predicate::Le(_, v)
1526 | parser::Predicate::Gt(_, v)
1527 | parser::Predicate::Ge(_, v) => push_if_colref(v, out),
1528 parser::Predicate::In(_, vs) | parser::Predicate::NotIn(_, vs) => {
1529 for v in vs {
1530 push_if_colref(v, out);
1531 }
1532 }
1533 parser::Predicate::NotBetween(_, lo, hi) => {
1534 push_if_colref(lo, out);
1535 push_if_colref(hi, out);
1536 }
1537 parser::Predicate::Or(l, r) => {
1538 for p in l {
1539 collect_refs_in_pred(p, out);
1540 }
1541 for p in r {
1542 collect_refs_in_pred(p, out);
1543 }
1544 }
1545 _ => {}
1546 }
1547}
1548
1549/// Converts a Value to a serde_json::Value for CTE materialization.
1550fn value_to_json(val: &Value) -> serde_json::Value {
1551 // NEVER: Placeholder values must be bound before reaching the CTE /
1552 // query-result JSON boundary. An unbound Placeholder here indicates a
1553 // parameter-binding bug (AUDIT fix: placeholders must be resolved upstream).
1554 kimberlite_properties::never!(
1555 matches!(val, Value::Placeholder(_)),
1556 "query.placeholder_reaches_result_boundary",
1557 "Value::Placeholder must never reach query-result / JSON serialization boundary"
1558 );
1559 match val {
1560 Value::Null | Value::Placeholder(_) => serde_json::Value::Null,
1561 Value::BigInt(i) => serde_json::json!(i),
1562 Value::TinyInt(i) => serde_json::json!(i),
1563 Value::SmallInt(i) => serde_json::json!(i),
1564 Value::Integer(i) => serde_json::json!(i),
1565 Value::Real(f) => serde_json::json!(f),
1566 Value::Decimal(v, scale) => {
1567 // Format decimal: store the raw value and scale as a string
1568 if *scale == 0 {
1569 serde_json::json!(v.to_string())
1570 } else {
1571 let divisor = 10i128.pow(u32::from(*scale));
1572 let whole = v / divisor;
1573 let frac = (v % divisor).unsigned_abs();
1574 serde_json::json!(format!("{whole}.{frac:0>width$}", width = *scale as usize))
1575 }
1576 }
1577 Value::Text(s) => serde_json::json!(s),
1578 Value::Boolean(b) => serde_json::json!(b),
1579 Value::Date(d) => serde_json::json!(d),
1580 Value::Time(t) => serde_json::json!(t),
1581 Value::Timestamp(ts) => serde_json::json!(ts.as_nanos()),
1582 Value::Uuid(u) => {
1583 // Format UUID bytes as hex string
1584 let hex: String = u.iter().map(|b| format!("{b:02x}")).collect();
1585 serde_json::json!(hex)
1586 }
1587 Value::Json(j) => j.clone(),
1588 Value::Bytes(b) => {
1589 use base64::Engine;
1590 serde_json::json!(base64::engine::general_purpose::STANDARD.encode(b))
1591 }
1592 }
1593}