kimberlite_query/lib.rs
1//! # kmb-query: SQL query layer for `Kimberlite` projections
2//!
3//! This crate provides a minimal SQL query engine for compliance lookups
4//! against the projection store.
5//!
6//! ## SQL Subset
7//!
8//! Supported SQL features:
9//! - `SELECT` with column list or `*`
10//! - `FROM` single table or `JOIN` (`INNER`, `LEFT`, `RIGHT`, `FULL OUTER`,
11//! `CROSS`, with `ON` or `USING(...)` clauses)
12//! - `WHERE` with comparison predicates (`=`, `<`, `>`, `<=`, `>=`, `IN`)
13//! - `IN (SELECT)`, `NOT IN (SELECT)`, `EXISTS`, `NOT EXISTS` — both
14//! uncorrelated (pre-execute fast path) and correlated (semi-join
15//! decorrelation or nested-loop fallback; see
16//! `docs/reference/sql/correlated-subqueries.md`)
17//! - `ORDER BY` (ascending/descending)
18//! - `LIMIT` and `OFFSET` — literal or `$N` parameter
19//! - `GROUP BY` with aggregates (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`)
20//! - Aggregate `FILTER (WHERE ...)` clauses, independent per aggregate
21//! - `HAVING` with aggregate filtering
22//! - `UNION` / `UNION ALL` / `INTERSECT` / `INTERSECT ALL` /
23//! `EXCEPT` / `EXCEPT ALL`
24//! - `DISTINCT`
25//! - JSON operators `->`, `->>`, `@>` in WHERE clauses
26//! - `CASE` (searched and simple form)
27//! - `WITH` (Common Table Expressions / CTEs), including `WITH RECURSIVE`
28//! via iterative fixed-point evaluation (depth cap 1000)
29//! - Subqueries in FROM and JOIN (`SELECT * FROM (SELECT ...) AS t`)
30//! - Window functions (`OVER`, `PARTITION BY`, `ROW_NUMBER`, `RANK`,
31//! `DENSE_RANK`, `LAG`, `LEAD`, `FIRST_VALUE`, `LAST_VALUE`)
32//! - `ALTER TABLE` (ADD COLUMN, DROP COLUMN) — parser only; kernel
33//! execution pending
34//! - Parameterized queries (`$1`, `$2`, ...) in WHERE, LIMIT, OFFSET, and
35//! DML values
36//!
37//! - Scalar functions in SELECT projection and WHERE predicates:
38//! `UPPER`, `LOWER`, `LENGTH`, `TRIM`, `CONCAT`, `||`, `ABS`,
39//! `ROUND`, `CEIL`/`CEILING`, `FLOOR`, `COALESCE`, `NULLIF`, `CAST`
40//! - `ILIKE`, `NOT LIKE`, `NOT ILIKE` pattern matching
41//! - `NOT IN (list)`, `NOT BETWEEN low AND high`
42//!
43//! Not yet supported:
44//! - Scalar subquery `WHERE col = (SELECT ...)`, `ANY`, `ALL`, `SOME`
45//! - Clock-dependent functions (`NOW()`, `CURRENT_DATE`, `EXTRACT`,
46//! `DATE_TRUNC`) — deferred pending a clock-threading decision
47//! - `MOD`, `POWER`, `SQRT`, `SUBSTRING` — deferred
48//!
49//! ## Usage
50//!
51//! ```ignore
52//! use kimberlite_query::{QueryEngine, Schema, SchemaBuilder, ColumnDef, DataType, Value};
53//! use kimberlite_store::{BTreeStore, TableId};
54//!
55//! // Define schema
56//! let schema = SchemaBuilder::new()
57//! .table(
58//! "users",
59//! TableId::new(1),
60//! vec![
61//! ColumnDef::new("id", DataType::BigInt).not_null(),
62//! ColumnDef::new("name", DataType::Text).not_null(),
63//! ],
64//! vec!["id".into()],
65//! )
66//! .build();
67//!
68//! // Create engine
69//! let engine = QueryEngine::new(schema);
70//!
71//! // Execute query
72//! let mut store = BTreeStore::open("data/projections")?;
73//! let result = engine.query(&mut store, "SELECT * FROM users WHERE id = $1", &[Value::BigInt(42)])?;
74//! ```
75//!
76//! ## Point-in-Time Queries
77//!
78//! For compliance, you can query at a specific log position:
79//!
80//! ```ignore
81//! let result = engine.query_at(
82//! &mut store,
83//! "SELECT * FROM users WHERE id = 1",
84//! &[],
85//! Offset::new(1000), // Query state as of log position 1000
86//! )?;
87//! ```
88//!
89//! ## Scalar expressions (v0.5.1)
90//!
91//! The parser accepts scalar functions in SELECT projection and WHERE
92//! predicates. Each of these queries parses cleanly and produces a
93//! `ParsedStatement::Select` with either a `ScalarCmp` predicate or
94//! entries in `scalar_projections`:
95//!
96//! ```
97//! use kimberlite_query::{parse_statement, ParsedStatement, Predicate};
98//!
99//! // WHERE col NOT IN (list) — mirror of IN, v0.5.1.
100//! let s = parse_statement("SELECT id FROM t WHERE x NOT IN (1, 2, 3)").unwrap();
101//! let ParsedStatement::Select(sel) = s else { panic!() };
102//! assert!(matches!(sel.predicates[0], Predicate::NotIn(_, _)));
103//!
104//! // WHERE UPPER(name) = 'ALICE' — scalar LHS routes to ScalarCmp.
105//! let s = parse_statement("SELECT id FROM t WHERE UPPER(name) = 'ALICE'").unwrap();
106//! let ParsedStatement::Select(sel) = s else { panic!() };
107//! assert!(matches!(sel.predicates[0], Predicate::ScalarCmp { .. }));
108//!
109//! // SELECT col AS alias — alias preserved end-to-end.
110//! let s = parse_statement("SELECT name AS display FROM t").unwrap();
111//! let ParsedStatement::Select(sel) = s else { panic!() };
112//! let aliases = sel.column_aliases.as_ref().unwrap();
113//! assert_eq!(aliases[0].as_deref(), Some("display"));
114//!
115//! // SELECT CAST(x AS INTEGER) — lands in scalar_projections with a
116//! // synthesised output column name.
117//! let s = parse_statement("SELECT CAST(x AS INTEGER) FROM t").unwrap();
118//! let ParsedStatement::Select(sel) = s else { panic!() };
119//! assert_eq!(sel.scalar_projections.len(), 1);
120//! assert_eq!(sel.scalar_projections[0].output_name.as_str(), "cast");
121//! ```
122
123pub mod correlated;
124pub mod depth_check;
125pub mod dml_planner;
126mod error;
127mod executor;
128pub mod explain;
129pub mod expression;
130pub mod information_schema;
131pub mod key_encoder;
132mod parse_cache;
133mod parser;
134mod plan;
135mod planner;
136pub mod rbac_filter;
137mod schema;
138mod value;
139pub mod window;
140
141#[cfg(test)]
142mod tests;
143
144// Re-export public types
145pub use error::{QueryError, Result};
146pub use executor::{QueryResult, Row, execute};
147pub use expression::{EvalContext, ScalarExpr, evaluate};
148pub use parser::{
149 AlterTableOperation, HavingCondition, HavingOp, OnConflictAction, OnConflictClause,
150 ParsedAlterTable, ParsedAttachMaskingPolicy, ParsedColumn, ParsedCreateIndex, ParsedCreateMask,
151 ParsedCreateMaskingPolicy, ParsedCreateTable, ParsedCreateUser, ParsedCte, ParsedDelete,
152 ParsedDetachMaskingPolicy, ParsedGrant, ParsedInsert, ParsedMaskingStrategy, ParsedSelect,
153 ParsedSetClassification, ParsedStatement, ParsedUnion, ParsedUpdate, Predicate, PredicateValue,
154 ScalarCmpOp, TimeTravel, UpsertExpr, expr_to_scalar_expr, extract_at_offset,
155 extract_time_travel, parse_statement, try_parse_custom_statement,
156};
157pub use planner::plan_query;
158pub use schema::{
159 ColumnDef, ColumnName, DataType, IndexDef, Schema, SchemaBuilder, TableDef, TableName,
160};
161pub use value::Value;
162
163use kimberlite_store::ProjectionStore;
164use kimberlite_types::Offset;
165
166/// Outcome returned by a timestamp→offset resolver.
167///
168/// v0.6.0 Tier 2 #6 — a resolver that owns its own index (the
169/// runtime's in-memory timestamp index, an audit-log-backed index,
170/// etc.) can distinguish three cases that `Option<Offset>` cannot:
171///
172/// - We found an offset at or before the requested timestamp.
173/// - The log has entries, but the earliest is *after* the requested
174/// timestamp — i.e. the request predates the retention horizon.
175/// Surfacing this as a distinct variant lets the query layer emit
176/// [`QueryError::AsOfBeforeRetentionHorizon`] with the horizon
177/// attached, which is far more actionable to the caller.
178/// - The log is empty — no timestamps recorded yet.
179///
180/// See [`QueryEngine::query_at_timestamp_resolved`] for the
181/// consumer of this type.
182#[derive(Debug, Clone, Copy, PartialEq, Eq)]
183pub enum TimestampResolution {
184 /// Resolver found a projection offset whose commit timestamp is
185 /// the greatest value ≤ the target.
186 Offset(Offset),
187 /// Resolver has entries, but the earliest commit timestamp is
188 /// strictly greater than the target. `horizon_ns` is that
189 /// earliest timestamp, so callers can tell users "the oldest
190 /// retained data is `<horizon>`, try a later instant".
191 BeforeRetentionHorizon { horizon_ns: i64 },
192 /// Resolver has no entries at all (fresh DB or the index hasn't
193 /// been seeded yet). Indistinguishable from "predates genesis"
194 /// in observable behaviour — surfaced via `UnsupportedFeature`.
195 LogEmpty,
196}
197
198/// Query engine for executing SQL against a projection store.
199///
200/// Holds the schema plus an optional parse cache (AUDIT-2026-04
201/// S3.4). The engine is `Clone` — the parse cache is shared via
202/// `Arc` so cloned handles hit the same memoised entries.
203#[derive(Debug, Clone)]
204pub struct QueryEngine {
205 schema: Schema,
206 parse_cache: Option<std::sync::Arc<parse_cache::ParseCache>>,
207 /// Cap on `outer_rows × inner_rows_per_iter` for correlated
208 /// subquery loops. See `docs/reference/sql/correlated-subqueries.md`.
209 /// Defaults to [`correlated::DEFAULT_CORRELATED_CAP`] (10 million).
210 correlated_cap: u64,
211}
212
213impl QueryEngine {
214 /// Creates a new query engine with the given schema. No
215 /// parse cache is attached by default — use
216 /// [`Self::with_parse_cache`] to opt in.
217 pub fn new(schema: Schema) -> Self {
218 Self {
219 schema,
220 parse_cache: None,
221 correlated_cap: correlated::DEFAULT_CORRELATED_CAP,
222 }
223 }
224
225 /// Attach an LRU parse cache of the given size. `0` disables
226 /// caching (every call re-parses).
227 #[must_use]
228 pub fn with_parse_cache(mut self, max_size: usize) -> Self {
229 self.parse_cache = Some(std::sync::Arc::new(parse_cache::ParseCache::new(max_size)));
230 self
231 }
232
233 /// Override the correlated-subquery row-evaluation cap. Defaults
234 /// to 10,000,000. Queries whose estimated
235 /// `outer_rows × inner_rows_per_iter` exceeds this cap fail with
236 /// [`QueryError::CorrelatedCardinalityExceeded`] before the
237 /// correlated loop runs. Set to `u64::MAX` to effectively disable.
238 #[must_use]
239 pub fn with_correlated_cap(mut self, cap: u64) -> Self {
240 self.correlated_cap = cap;
241 self
242 }
243
244 /// Returns a snapshot of parse-cache stats, or `None` if no
245 /// cache is attached.
246 pub fn parse_cache_stats(&self) -> Option<parse_cache::ParseCacheStats> {
247 self.parse_cache
248 .as_deref()
249 .map(parse_cache::ParseCache::stats)
250 }
251
252 /// Clear the parse cache, if any.
253 pub fn clear_parse_cache(&self) {
254 if let Some(c) = &self.parse_cache {
255 c.clear();
256 }
257 }
258
259 /// Returns a reference to the schema.
260 pub fn schema(&self) -> &Schema {
261 &self.schema
262 }
263
264 /// Parses a SQL string and extracts the SELECT or UNION statement.
265 ///
266 /// Static variant — bypasses the parse cache. Used by call
267 /// sites that predate the cache and by internal recursive
268 /// parsers.
269 fn parse_query_statement(sql: &str) -> Result<parser::ParsedStatement> {
270 let stmt = parser::parse_statement(sql)?;
271 match &stmt {
272 parser::ParsedStatement::Select(_) | parser::ParsedStatement::Union(_) => Ok(stmt),
273 _ => Err(QueryError::UnsupportedFeature(
274 "only SELECT and UNION queries are supported".to_string(),
275 )),
276 }
277 }
278
279 /// Cache-aware parse wrapper.
280 ///
281 /// Looks the SQL up in the parse cache (if attached). On
282 /// miss, parses via [`Self::parse_query_statement`] and
283 /// inserts into the cache. Non-SELECT/UNION errors are
284 /// returned directly without populating the cache — they're
285 /// errors for every subsequent call anyway and we don't want
286 /// to memoise them.
287 fn parse_query_statement_cached(&self, sql: &str) -> Result<parser::ParsedStatement> {
288 if let Some(cache) = &self.parse_cache {
289 if let Some(stmt) = cache.get(sql) {
290 return Ok(stmt);
291 }
292 }
293 let stmt = Self::parse_query_statement(sql)?;
294 if let Some(cache) = &self.parse_cache {
295 cache.insert(sql.to_string(), stmt.clone());
296 }
297 Ok(stmt)
298 }
299
300 /// Executes a SQL query against the current store state.
301 ///
302 /// Supports SELECT and UNION/UNION ALL queries.
303 ///
304 /// # Arguments
305 ///
306 /// * `store` - The projection store to query
307 /// * `sql` - SQL query string
308 /// * `params` - Query parameters (for `$1`, `$2`, etc.)
309 ///
310 /// # Example
311 ///
312 /// ```ignore
313 /// let result = engine.query(
314 /// &mut store,
315 /// "SELECT name FROM users WHERE id = $1",
316 /// &[Value::BigInt(42)],
317 /// )?;
318 /// ```
319 pub fn query<S: ProjectionStore>(
320 &self,
321 store: &mut S,
322 sql: &str,
323 params: &[Value],
324 ) -> Result<QueryResult> {
325 // AUDIT-2026-04 S3.5 — healthcare BREAK_GLASS prefix.
326 // `WITH BREAK_GLASS REASON='...' SELECT ...` strips the
327 // prefix, emits a warn-level structured log for the
328 // caller's audit pipeline to pick up, and falls through
329 // to the normal query path. The reason is the attribution
330 // value — enforcement (RBAC + masking) is still applied.
331 let (after_break_glass, break_glass_reason) = explain::extract_break_glass(sql);
332 if let Some(ref reason) = break_glass_reason {
333 tracing::warn!(
334 break_glass_reason = %reason,
335 "BREAK_GLASS query — regulator-visible audit signal",
336 );
337 }
338 let sql = after_break_glass;
339
340 // AUDIT-2026-04 S3.4 — `information_schema.*` virtual-table
341 // interception. Synthesises results from the live schema
342 // without going through the planner/executor. Callers
343 // that point FROM at `information_schema.tables` or
344 // `.columns` get back schema introspection rows without
345 // the table needing to be registered in the store.
346 if let Some(result) = information_schema::maybe_answer(sql, &self.schema) {
347 return Ok(result);
348 }
349
350 // AUDIT-2026-04 S3.3 — EXPLAIN prefix dispatch. A caller
351 // issuing `EXPLAIN SELECT ...` gets a single-row result
352 // whose only value is the rendered plan tree, rather
353 // than executing the statement.
354 let (after_explain, is_explain) = explain::extract_explain(sql);
355 if is_explain {
356 let plan_text = self.explain(after_explain, params)?;
357 return Ok(executor::QueryResult {
358 columns: vec!["plan".into()],
359 rows: vec![vec![Value::Text(plan_text)]],
360 });
361 }
362 let sql = after_explain; // equivalent to original `sql` when EXPLAIN absent
363
364 // Extract time-travel clause (AT OFFSET / FOR SYSTEM_TIME AS OF / AS OF)
365 // before passing SQL to sqlparser. Offset syntax dispatches
366 // directly; timestamp syntax without a resolver errors out
367 // with a clear message pointing to
368 // `query_at_timestamp(..., resolver)`.
369 let (cleaned_sql, time_travel) = parser::extract_time_travel(sql);
370 match time_travel {
371 Some(parser::TimeTravel::Offset(o)) => {
372 return self.query_at(store, &cleaned_sql, params, Offset::new(o));
373 }
374 Some(parser::TimeTravel::TimestampNs(_)) => {
375 return Err(QueryError::UnsupportedFeature(
376 "FOR SYSTEM_TIME AS OF '<iso>' / AS OF '<iso>' \
377 requires a timestamp→offset resolver — use \
378 QueryEngine::query_at_timestamp(..., resolver)"
379 .to_string(),
380 ));
381 }
382 None => {}
383 }
384
385 let stmt = self.parse_query_statement_cached(sql)?;
386
387 match stmt {
388 parser::ParsedStatement::Select(mut parsed) => {
389 // Pre-execute uncorrelated subqueries (IN/EXISTS/NOT EXISTS),
390 // attempt semi-join decorrelation of correlated ones, and
391 // leave remaining correlated predicates in place for the
392 // correlated-loop executor.
393 //
394 // `outer_scope` is the enclosing scope stack visible to the
395 // subquery; at the top level we seed it with the outer
396 // SELECT's FROM table so the walker can detect
397 // correlation.
398 self.pre_execute_subqueries(store, &mut parsed, params)?;
399
400 let window_fns = parsed.window_fns.clone();
401 let result = if parsed.ctes.is_empty() {
402 if has_correlated_predicate(&parsed.predicates) {
403 self.execute_correlated_query(store, &parsed, params)?
404 } else {
405 // `plan_query` folds time-now sentinels via
406 // AUDIT-2026-05 S3.7 — see `planner.rs`.
407 let plan = planner::plan_query(&self.schema, &parsed, params)?;
408 let table_def = self
409 .schema
410 .get_table(&plan.table_name().into())
411 .ok_or_else(|| {
412 QueryError::TableNotFound(plan.table_name().to_string())
413 })?;
414 executor::execute(store, &plan, table_def)?
415 }
416 } else {
417 self.execute_with_ctes(store, &parsed, params)?
418 };
419 // AUDIT-2026-04 S3.2 — window functions are a
420 // post-pass over the base SELECT result; the base
421 // plan already projected the columns the window
422 // fn references.
423 window::apply_window_fns(result, &window_fns)
424 }
425 parser::ParsedStatement::Union(union_stmt) => {
426 self.execute_union(store, &union_stmt, params)
427 }
428 _ => unreachable!("parse_query_statement only returns Select or Union"),
429 }
430 }
431
432 /// Executes a SELECT with CTEs by materializing each CTE and building
433 /// a temporary schema that includes the CTE result sets as tables.
434 fn execute_with_ctes<S: ProjectionStore>(
435 &self,
436 store: &mut S,
437 parsed: &parser::ParsedSelect,
438 params: &[Value],
439 ) -> Result<QueryResult> {
440 // Build an extended schema that includes CTE-derived tables
441 let mut extended_schema = self.schema.clone();
442
443 // Materialize each CTE
444 for cte in &parsed.ctes {
445 // Execute the CTE's anchor (non-recursive) query
446 let cte_plan = planner::plan_query(&extended_schema, &cte.query, params)?;
447 let cte_table_def = extended_schema
448 .get_table(&cte_plan.table_name().into())
449 .ok_or_else(|| QueryError::TableNotFound(cte_plan.table_name().to_string()))?;
450 let mut cte_result = executor::execute(store, &cte_plan, cte_table_def)?;
451
452 // Register CTE result as a virtual table in the extended schema
453 // Use a synthetic table ID based on the CTE name hash
454 let cte_table_id = {
455 use std::collections::hash_map::DefaultHasher;
456 use std::hash::{Hash, Hasher};
457 let mut hasher = DefaultHasher::new();
458 cte.name.hash(&mut hasher);
459 kimberlite_store::TableId::new(hasher.finish())
460 };
461
462 // Build column defs from the CTE result
463 let cte_columns: Vec<schema::ColumnDef> = cte_result
464 .columns
465 .iter()
466 .map(|col| schema::ColumnDef::new(col.as_str(), schema::DataType::Text))
467 .collect();
468
469 let pk_cols = if cte_columns.is_empty() {
470 vec![]
471 } else {
472 vec![cte_result.columns[0].clone()]
473 };
474
475 let cte_table = schema::TableDef::new(cte_table_id, cte_columns, pk_cols);
476 extended_schema.add_table(cte.name.as_str(), cte_table);
477
478 // Write anchor rows into the store as the initial working set.
479 let mut total_rows = cte_result.rows.len();
480 for (row_idx, row) in cte_result.rows.iter().enumerate() {
481 Self::write_cte_row(store, cte_table_id, row_idx, &cte_result.columns, row)?;
482 }
483
484 // Recursive iteration: keep evaluating the recursive arm against
485 // the growing CTE table until no new rows are produced or the
486 // depth cap is hit. Iterative fixed-point — honours the
487 // workspace "no recursion" constraint and prevents runaway loops.
488 if let Some(recursive_select) = &cte.recursive_arm {
489 const MAX_RECURSIVE_DEPTH: usize = 1000;
490 let mut seen: std::collections::HashSet<String> =
491 cte_result.rows.iter().map(|r| format!("{r:?}")).collect();
492
493 for depth in 0..MAX_RECURSIVE_DEPTH {
494 let recursive_plan =
495 planner::plan_query(&extended_schema, recursive_select, params)?;
496 let recursive_table_def = extended_schema
497 .get_table(&recursive_plan.table_name().into())
498 .ok_or_else(|| {
499 QueryError::TableNotFound(recursive_plan.table_name().to_string())
500 })?;
501 let iteration_result =
502 executor::execute(store, &recursive_plan, recursive_table_def)?;
503
504 let mut new_rows = 0usize;
505 for row in iteration_result.rows {
506 let key = format!("{row:?}");
507 if seen.insert(key) {
508 Self::write_cte_row(
509 store,
510 cte_table_id,
511 total_rows,
512 &cte_result.columns,
513 &row,
514 )?;
515 cte_result.rows.push(row);
516 total_rows += 1;
517 new_rows += 1;
518 }
519 }
520 if new_rows == 0 {
521 break;
522 }
523 if depth + 1 == MAX_RECURSIVE_DEPTH {
524 return Err(QueryError::UnsupportedFeature(format!(
525 "recursive CTE `{}` exceeded maximum depth of {MAX_RECURSIVE_DEPTH} iterations",
526 cte.name
527 )));
528 }
529 }
530 }
531 }
532
533 // Execute the main query against the extended schema
534 let main_query = parser::ParsedSelect {
535 ctes: vec![], // CTEs already materialized
536 ..parsed.clone()
537 };
538
539 let plan = planner::plan_query(&extended_schema, &main_query, params)?;
540 let table_def = extended_schema
541 .get_table(&plan.table_name().into())
542 .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
543 executor::execute(store, &plan, table_def)
544 }
545
546 /// Writes a single CTE row into the store under the synthetic table id.
547 /// Helper extracted so the recursive-CTE iteration loop and the initial
548 /// anchor materialisation share the same path.
549 fn write_cte_row<S: ProjectionStore>(
550 store: &mut S,
551 cte_table_id: kimberlite_store::TableId,
552 row_idx: usize,
553 columns: &[crate::schema::ColumnName],
554 row: &[Value],
555 ) -> Result<()> {
556 let mut row_map = serde_json::Map::new();
557 for (col, val) in columns.iter().zip(row.iter()) {
558 row_map.insert(col.as_str().to_string(), value_to_json(val));
559 }
560 let json_val = serde_json::to_vec(&serde_json::Value::Object(row_map)).map_err(|e| {
561 QueryError::UnsupportedFeature(format!("CTE serialization failed: {e}"))
562 })?;
563 let pk_key = crate::key_encoder::encode_key(&[Value::BigInt(row_idx as i64)]);
564 let batch = kimberlite_store::WriteBatch::new(kimberlite_types::Offset::new(
565 store.applied_position().as_u64() + 1,
566 ))
567 .put(cte_table_id, pk_key, bytes::Bytes::from(json_val));
568 store.apply(batch)?;
569 Ok(())
570 }
571
572 /// Walks the outer SELECT's predicate tree, classifies each
573 /// subquery as uncorrelated or correlated, and rewrites as follows:
574 ///
575 /// - **Uncorrelated** `IN (SELECT)` / `EXISTS` / `NOT EXISTS` →
576 /// pre-executed once; result substituted into `Predicate::In` /
577 /// `Predicate::NotIn` / `Predicate::Always(bool)`. Matches the
578 /// v0.5.0 fast path.
579 /// - **Correlated** `EXISTS` / `NOT EXISTS` with a single equijoin
580 /// to the outer scope: rewritten by
581 /// [`correlated::try_semi_join_rewrite`] into
582 /// `Predicate::InSubquery { negated }` against the outer column,
583 /// then the uncorrelated path pre-executes it.
584 /// - **Correlated** everything else: left in place for
585 /// [`Self::execute_correlated_query`] to handle per-outer-row.
586 ///
587 /// Assertion: on return, every `InSubquery` / `Exists` still in
588 /// `parsed.predicates` is correlated. The caller's
589 /// `has_correlated_predicate` check uses that invariant to
590 /// dispatch.
591 fn pre_execute_subqueries<S: ProjectionStore>(
592 &self,
593 store: &mut S,
594 parsed: &mut parser::ParsedSelect,
595 params: &[Value],
596 ) -> Result<()> {
597 // Build the outer scope — the outer SELECT's FROM table (plus
598 // any JOIN tables) as the enclosing scope for each inner
599 // subquery.
600 let outer_scope = self.build_outer_scope(parsed);
601
602 let preds = std::mem::take(&mut parsed.predicates);
603 let mut out = Vec::with_capacity(preds.len());
604 for pred in preds {
605 out.push(self.classify_and_rewrite_predicate(store, pred, &outer_scope, params)?);
606 }
607 parsed.predicates = out;
608 Ok(())
609 }
610
611 /// Construct the outer `PlannerScope` for `parsed` — the visible
612 /// tables in the outer SELECT. FROM first, then any JOIN tables.
613 fn build_outer_scope<'s>(
614 &'s self,
615 parsed: &parser::ParsedSelect,
616 ) -> correlated::PlannerScope<'s> {
617 let mut bindings: Vec<(String, &schema::TableDef)> = Vec::new();
618 if let Some(t) = self.schema.get_table(&parsed.table.clone().into()) {
619 bindings.push((parsed.table.clone(), t));
620 }
621 for join in &parsed.joins {
622 if let Some(t) = self.schema.get_table(&join.table.clone().into()) {
623 bindings.push((join.table.clone(), t));
624 }
625 }
626 correlated::PlannerScope::empty().push(bindings)
627 }
628
629 /// Classifier for a single predicate — drives pre-execute vs.
630 /// semi-join rewrite vs. leave-as-correlated. Recursive on `Or`.
631 fn classify_and_rewrite_predicate<S: ProjectionStore>(
632 &self,
633 store: &mut S,
634 pred: parser::Predicate,
635 outer_scope: &correlated::PlannerScope<'_>,
636 params: &[Value],
637 ) -> Result<parser::Predicate> {
638 match pred {
639 parser::Predicate::InSubquery {
640 column,
641 subquery,
642 negated,
643 } => {
644 let outer_refs =
645 correlated::collect_outer_refs(&subquery, outer_scope, &self.schema);
646 if outer_refs.is_empty() {
647 // Uncorrelated — pre-execute and substitute.
648 self.pre_execute_uncorrelated_in(store, &column, &subquery, negated, params)
649 } else {
650 // Correlated IN/NOT IN — keep the predicate
651 // in place; the correlated-loop executor handles it.
652 Ok(parser::Predicate::InSubquery {
653 column,
654 subquery,
655 negated,
656 })
657 }
658 }
659 parser::Predicate::Exists { subquery, negated } => {
660 let outer_refs =
661 correlated::collect_outer_refs(&subquery, outer_scope, &self.schema);
662 if outer_refs.is_empty() {
663 // Uncorrelated.
664 self.pre_execute_uncorrelated_exists(store, &subquery, negated, params)
665 } else if let Some((outer_col, rewritten)) =
666 correlated::try_semi_join_rewrite(&subquery, negated, &outer_refs)
667 {
668 // Decorrelated: rewrite as IN (SELECT) / NOT IN (SELECT)
669 // against the outer column, then pre-execute.
670 self.pre_execute_uncorrelated_in(store, &outer_col, &rewritten, negated, params)
671 } else {
672 // Correlated loop fallback.
673 Ok(parser::Predicate::Exists { subquery, negated })
674 }
675 }
676 parser::Predicate::Or(left, right) => {
677 let mut new_left = Vec::with_capacity(left.len());
678 for p in left {
679 new_left.push(self.classify_and_rewrite_predicate(
680 store,
681 p,
682 outer_scope,
683 params,
684 )?);
685 }
686 let mut new_right = Vec::with_capacity(right.len());
687 for p in right {
688 new_right.push(self.classify_and_rewrite_predicate(
689 store,
690 p,
691 outer_scope,
692 params,
693 )?);
694 }
695 Ok(parser::Predicate::Or(new_left, new_right))
696 }
697 other => Ok(other),
698 }
699 }
700
701 /// Pre-execute an uncorrelated `IN (SELECT)` / `NOT IN (SELECT)`
702 /// and return the substituted `Predicate::In` / `Predicate::NotIn`.
703 fn pre_execute_uncorrelated_in<S: ProjectionStore>(
704 &self,
705 store: &mut S,
706 column: &schema::ColumnName,
707 subquery: &parser::ParsedSelect,
708 negated: bool,
709 params: &[Value],
710 ) -> Result<parser::Predicate> {
711 let inner_plan = planner::plan_query(&self.schema, subquery, params)?;
712 let inner_table_def = self
713 .schema
714 .get_table(&inner_plan.table_name().into())
715 .ok_or_else(|| QueryError::TableNotFound(inner_plan.table_name().to_string()))?;
716 let inner_result = executor::execute(store, &inner_plan, inner_table_def)?;
717 if inner_result.columns.len() != 1 {
718 return Err(QueryError::UnsupportedFeature(format!(
719 "IN (SELECT ...) subquery must project exactly 1 column, got {}",
720 inner_result.columns.len()
721 )));
722 }
723 let values: Vec<parser::PredicateValue> = inner_result
724 .rows
725 .into_iter()
726 .filter_map(|row| row.into_iter().next())
727 .map(parser::PredicateValue::Literal)
728 .collect();
729 Ok(if negated {
730 parser::Predicate::NotIn(column.clone(), values)
731 } else {
732 parser::Predicate::In(column.clone(), values)
733 })
734 }
735
736 /// Pre-execute an uncorrelated `EXISTS` / `NOT EXISTS` and return
737 /// the collapsed `Predicate::Always(bool)`.
738 fn pre_execute_uncorrelated_exists<S: ProjectionStore>(
739 &self,
740 store: &mut S,
741 subquery: &parser::ParsedSelect,
742 negated: bool,
743 params: &[Value],
744 ) -> Result<parser::Predicate> {
745 let inner_plan = planner::plan_query(&self.schema, subquery, params)?;
746 let inner_table_def = self
747 .schema
748 .get_table(&inner_plan.table_name().into())
749 .ok_or_else(|| QueryError::TableNotFound(inner_plan.table_name().to_string()))?;
750 let inner_result = executor::execute(store, &inner_plan, inner_table_def)?;
751 let exists = !inner_result.rows.is_empty();
752 let predicate_holds = if negated { !exists } else { exists };
753 Ok(parser::Predicate::Always(predicate_holds))
754 }
755
756 /// Execute a SELECT whose predicate list still contains at least
757 /// one correlated subquery (InSubquery or Exists that survived
758 /// `pre_execute_subqueries`).
759 ///
760 /// Strategy: split the predicate list into "simple" (non-correlated)
761 /// predicates and "correlated" ones. Plan the outer query using
762 /// only the simple predicates. For each returned row, substitute
763 /// outer column values into each correlated inner subquery and
764 /// execute it; keep the row iff all correlated predicates pass.
765 fn execute_correlated_query<S: ProjectionStore>(
766 &self,
767 store: &mut S,
768 parsed: &parser::ParsedSelect,
769 params: &[Value],
770 ) -> Result<QueryResult> {
771 // Split simple vs. correlated predicates.
772 let mut simple_preds: Vec<parser::Predicate> = Vec::new();
773 let mut correlated_preds: Vec<parser::Predicate> = Vec::new();
774 for pred in &parsed.predicates {
775 match pred {
776 parser::Predicate::InSubquery { .. } | parser::Predicate::Exists { .. } => {
777 correlated_preds.push(pred.clone());
778 }
779 other => simple_preds.push(other.clone()),
780 }
781 }
782
783 // Build the outer query stripped of correlated predicates — we
784 // need the FULL outer row (all columns) so we can substitute
785 // outer column values into each inner subquery, regardless of
786 // which columns the user SELECTed. We'll project to the
787 // requested columns after the row-by-row filter.
788 let outer_table_def = self
789 .schema
790 .get_table(&parsed.table.clone().into())
791 .ok_or_else(|| QueryError::TableNotFound(parsed.table.clone()))?;
792 let outer_scan = parser::ParsedSelect {
793 predicates: simple_preds,
794 columns: None, // force SELECT * so we have every column available
795 column_aliases: None,
796 order_by: Vec::new(),
797 limit: None,
798 offset: None,
799 aggregates: Vec::new(),
800 aggregate_filters: Vec::new(),
801 group_by: Vec::new(),
802 distinct: false,
803 having: Vec::new(),
804 ctes: Vec::new(),
805 window_fns: Vec::new(),
806 scalar_projections: Vec::new(),
807 case_columns: Vec::new(),
808 joins: Vec::new(),
809 ..parsed.clone()
810 };
811
812 let outer_plan = planner::plan_query(&self.schema, &outer_scan, params)?;
813 let outer_rows = executor::execute(store, &outer_plan, outer_table_def)?;
814
815 // Estimate correlated row-evaluation cost for cardinality guard.
816 //
817 // Inner cost per outer row is bounded by the total inner-table
818 // row count; we use the store's current table size as an upper
819 // bound. If multiple correlated predicates reference the same
820 // or different inner tables, sum the per-predicate cost.
821 let outer_count = outer_rows.rows.len() as u64;
822 let mut inner_cost_per_row: u64 = 0;
823 for pred in &correlated_preds {
824 let inner_table = match pred {
825 parser::Predicate::InSubquery { subquery, .. }
826 | parser::Predicate::Exists { subquery, .. } => &subquery.table,
827 _ => continue,
828 };
829 let inner_def = self
830 .schema
831 .get_table(&inner_table.clone().into())
832 .ok_or_else(|| QueryError::TableNotFound(inner_table.clone()))?;
833 // Upper-bound the inner cost by scanning the table once — we
834 // issue a bounded scan so this is cheap.
835 let pairs = store.scan(
836 inner_def.table_id,
837 kimberlite_store::Key::min()..kimberlite_store::Key::max(),
838 1_000_000,
839 )?;
840 inner_cost_per_row = inner_cost_per_row.saturating_add(pairs.len() as u64);
841 }
842 // When inner tables are empty, bound by 1 to keep estimation
843 // monotonic (so a 0 × N query doesn't look free).
844 let inner_cost_per_row = inner_cost_per_row.max(1);
845 let estimated = outer_count.saturating_mul(inner_cost_per_row);
846 if estimated > self.correlated_cap {
847 return Err(QueryError::CorrelatedCardinalityExceeded {
848 estimated,
849 cap: self.correlated_cap,
850 });
851 }
852
853 // For each outer row, evaluate the correlated predicates.
854 let outer_columns = outer_rows.columns.clone();
855 let outer_alias = parsed.table.clone();
856 let mut kept: Vec<Vec<Value>> = Vec::new();
857 for row in outer_rows.rows {
858 // Build the `"alias.column"` → Value binding map. We bind
859 // the FROM alias as it appears in the ParsedSelect (the
860 // parser already resolved user aliases into the `table`
861 // field when the alias shadows the table name).
862 let mut bindings = std::collections::HashMap::new();
863 for (col, val) in outer_columns.iter().zip(row.iter()) {
864 bindings.insert(format!("{outer_alias}.{col}"), val.clone());
865 // Also bind under every possible alias seen in the
866 // correlated predicates — the user may have written
867 // `p.id` while the parser stored the table name
868 // `patient_current`. We cover the common case by
869 // also binding the bare column name and any alias
870 // we can extract from the inner refs themselves.
871 bindings.insert(col.as_str().to_string(), val.clone());
872 }
873 // Extend bindings with each correlated-ref alias. Walking
874 // the correlated predicate trees once up-front is fine.
875 for pred in &correlated_preds {
876 let refs = correlated_predicate_outer_refs(pred);
877 for r in refs {
878 let col_idx = outer_columns
879 .iter()
880 .position(|c| c.as_str() == r.column.as_str());
881 if let Some(idx) = col_idx {
882 if let Some(v) = row.get(idx) {
883 bindings.insert(r.as_column_ref(), v.clone());
884 }
885 }
886 }
887 }
888
889 let mut all_pass = true;
890 for pred in &correlated_preds {
891 if !self.evaluate_correlated_predicate(store, pred, &bindings, params)? {
892 all_pass = false;
893 break;
894 }
895 }
896 if all_pass {
897 kept.push(row);
898 }
899 }
900
901 // Apply ORDER BY, LIMIT, OFFSET on the full rows before
902 // projecting to the user's requested column list.
903 // (Simple implementation: leverage the fact that we kept full
904 // rows. We construct a second plan that projects + orders +
905 // limits using a temporary store isn't worth it; do it inline.)
906 Self::post_process_correlated_result(parsed, params, outer_columns, kept)
907 }
908
909 /// Apply the outer query's projection / ORDER BY / LIMIT / OFFSET
910 /// to the rows surviving the correlated-predicate filter.
911 fn post_process_correlated_result(
912 parsed: &parser::ParsedSelect,
913 params: &[Value],
914 outer_columns: Vec<schema::ColumnName>,
915 mut rows: Vec<Vec<Value>>,
916 ) -> Result<QueryResult> {
917 // ORDER BY — bare-column only, resolved against outer_columns.
918 if !parsed.order_by.is_empty() {
919 let indices: Vec<(usize, bool)> = parsed
920 .order_by
921 .iter()
922 .map(|ob| {
923 let idx = outer_columns
924 .iter()
925 .position(|c| c == &ob.column)
926 .ok_or_else(|| QueryError::ColumnNotFound {
927 table: parsed.table.clone(),
928 column: ob.column.to_string(),
929 })?;
930 Ok::<_, QueryError>((idx, ob.ascending))
931 })
932 .collect::<Result<Vec<_>>>()?;
933 rows.sort_by(|a, b| {
934 for (idx, asc) in &indices {
935 let ord = a
936 .get(*idx)
937 .and_then(|av| b.get(*idx).and_then(|bv| av.compare(bv)))
938 .unwrap_or(std::cmp::Ordering::Equal);
939 let ord = if *asc { ord } else { ord.reverse() };
940 if ord != std::cmp::Ordering::Equal {
941 return ord;
942 }
943 }
944 std::cmp::Ordering::Equal
945 });
946 }
947
948 // OFFSET / LIMIT.
949 let offset = match parsed.offset {
950 Some(parser::LimitExpr::Literal(n)) => n,
951 Some(parser::LimitExpr::Param(idx)) => params
952 .get(idx.saturating_sub(1))
953 .and_then(|v| match v {
954 Value::BigInt(n) if *n >= 0 => Some(*n as usize),
955 Value::Integer(n) if *n >= 0 => Some(*n as usize),
956 _ => None,
957 })
958 .unwrap_or(0),
959 None => 0,
960 };
961 let limit = match parsed.limit {
962 Some(parser::LimitExpr::Literal(n)) => Some(n),
963 Some(parser::LimitExpr::Param(idx)) => {
964 params.get(idx.saturating_sub(1)).and_then(|v| match v {
965 Value::BigInt(n) if *n >= 0 => Some(*n as usize),
966 Value::Integer(n) if *n >= 0 => Some(*n as usize),
967 _ => None,
968 })
969 }
970 None => None,
971 };
972 if offset > 0 {
973 rows.drain(0..offset.min(rows.len()));
974 }
975 if let Some(l) = limit {
976 rows.truncate(l);
977 }
978
979 // Project to the requested column list.
980 let (out_columns, projected_rows) = match (&parsed.columns, &parsed.column_aliases) {
981 (None, _) => (outer_columns.clone(), rows),
982 (Some(cols), aliases) => {
983 let mut indices = Vec::with_capacity(cols.len());
984 let mut out_names: Vec<schema::ColumnName> = Vec::with_capacity(cols.len());
985 for (i, col) in cols.iter().enumerate() {
986 let idx = outer_columns.iter().position(|c| c == col).ok_or_else(|| {
987 QueryError::ColumnNotFound {
988 table: parsed.table.clone(),
989 column: col.to_string(),
990 }
991 })?;
992 indices.push(idx);
993 let alias = aliases
994 .as_ref()
995 .and_then(|a| a.get(i))
996 .and_then(|a| a.as_ref());
997 out_names.push(match alias {
998 Some(a) => schema::ColumnName::new(a.clone()),
999 None => col.clone(),
1000 });
1001 }
1002 let projected: Vec<Vec<Value>> = rows
1003 .into_iter()
1004 .map(|r| indices.iter().map(|i| r[*i].clone()).collect())
1005 .collect();
1006 (out_names, projected)
1007 }
1008 };
1009
1010 Ok(QueryResult {
1011 columns: out_columns,
1012 rows: projected_rows,
1013 })
1014 }
1015
1016 /// Evaluate a correlated `InSubquery` / `Exists` against one
1017 /// outer row (already baked into `bindings`). Returns true iff
1018 /// the predicate holds.
1019 fn evaluate_correlated_predicate<S: ProjectionStore>(
1020 &self,
1021 store: &mut S,
1022 pred: &parser::Predicate,
1023 bindings: &std::collections::HashMap<String, Value>,
1024 params: &[Value],
1025 ) -> Result<bool> {
1026 match pred {
1027 parser::Predicate::Exists { subquery, negated } => {
1028 let substituted = correlated::substitute_outer_refs(subquery, bindings);
1029 // The inner subquery may itself have nested subqueries;
1030 // run it through the full query engine path so nested
1031 // correlations (if any) are handled.
1032 let inner_result = self.execute_inner_subquery(store, &substituted, params)?;
1033 let exists = !inner_result.rows.is_empty();
1034 Ok(if *negated { !exists } else { exists })
1035 }
1036 parser::Predicate::InSubquery {
1037 column,
1038 subquery,
1039 negated,
1040 } => {
1041 let substituted = correlated::substitute_outer_refs(subquery, bindings);
1042 let inner_result = self.execute_inner_subquery(store, &substituted, params)?;
1043 if inner_result.columns.len() != 1 {
1044 return Err(QueryError::UnsupportedFeature(format!(
1045 "IN (SELECT ...) subquery must project exactly 1 column, got {}",
1046 inner_result.columns.len()
1047 )));
1048 }
1049 let outer_val = bindings
1050 .get(column.as_str())
1051 .or_else(|| bindings.values().next()) // defensive
1052 .cloned();
1053 let Some(outer_val) = outer_val else {
1054 return Ok(false);
1055 };
1056 let any_match = inner_result
1057 .rows
1058 .iter()
1059 .any(|row| row.first().is_some_and(|v| v == &outer_val));
1060 Ok(if *negated { !any_match } else { any_match })
1061 }
1062 _ => Err(QueryError::UnsupportedFeature(
1063 "evaluate_correlated_predicate called on non-subquery predicate".to_string(),
1064 )),
1065 }
1066 }
1067
1068 /// Execute an inner subquery (with all outer refs already
1069 /// substituted). Delegates to `plan_query` + `executor::execute`.
1070 fn execute_inner_subquery<S: ProjectionStore>(
1071 &self,
1072 store: &mut S,
1073 inner: &parser::ParsedSelect,
1074 params: &[Value],
1075 ) -> Result<QueryResult> {
1076 // An inner subquery post-substitution might itself contain
1077 // nested correlations or uncorrelated subqueries. Run the
1078 // pre-execute pass once more to handle those cases.
1079 let mut inner_clone = inner.clone();
1080 self.pre_execute_subqueries(store, &mut inner_clone, params)?;
1081 if has_correlated_predicate(&inner_clone.predicates) {
1082 // v0.6.0 caps nesting at one correlated level — the
1083 // outer loop is already one nesting.
1084 return Err(QueryError::UnsupportedFeature(
1085 "nested correlated subqueries (depth > 1) are not supported in v0.6.0".to_string(),
1086 ));
1087 }
1088 let plan = planner::plan_query(&self.schema, &inner_clone, params)?;
1089 let table_def = self
1090 .schema
1091 .get_table(&plan.table_name().into())
1092 .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
1093 executor::execute(store, &plan, table_def)
1094 }
1095
1096 /// Executes a `UNION`, `INTERSECT`, or `EXCEPT` query (with or without `ALL`).
1097 ///
1098 /// Implementation: materialise both sides, then combine according to the
1099 /// operator. `ALL` keeps multiset semantics; the bare form (no `ALL`)
1100 /// deduplicates by row content. `Value` doesn't implement `Hash`, so the
1101 /// dedup/intersect/except keys use the debug format of each row — same
1102 /// trick already used by the prior UNION implementation.
1103 fn execute_union<S: ProjectionStore>(
1104 &self,
1105 store: &mut S,
1106 union_stmt: &parser::ParsedUnion,
1107 params: &[Value],
1108 ) -> Result<QueryResult> {
1109 // Plan and execute left side
1110 let left_plan = planner::plan_query(&self.schema, &union_stmt.left, params)?;
1111 let left_table_def = self
1112 .schema
1113 .get_table(&left_plan.table_name().into())
1114 .ok_or_else(|| QueryError::TableNotFound(left_plan.table_name().to_string()))?;
1115 let left_result = executor::execute(store, &left_plan, left_table_def)?;
1116
1117 // Plan and execute right side
1118 let right_plan = planner::plan_query(&self.schema, &union_stmt.right, params)?;
1119 let right_table_def = self
1120 .schema
1121 .get_table(&right_plan.table_name().into())
1122 .ok_or_else(|| QueryError::TableNotFound(right_plan.table_name().to_string()))?;
1123 let right_result = executor::execute(store, &right_plan, right_table_def)?;
1124
1125 // Use left side column names for the result
1126 let column_names = left_result.columns;
1127
1128 let row_key = |row: &Vec<Value>| format!("{row:?}");
1129
1130 let combined_rows: Vec<Vec<Value>> = match (union_stmt.op, union_stmt.all) {
1131 // UNION ALL: concatenate, keep duplicates
1132 (parser::SetOp::Union, true) => {
1133 let mut all_rows = left_result.rows;
1134 all_rows.extend(right_result.rows);
1135 all_rows
1136 }
1137 // UNION: concatenate then dedup
1138 (parser::SetOp::Union, false) => {
1139 let mut all_rows = left_result.rows;
1140 all_rows.extend(right_result.rows);
1141 let mut seen = std::collections::HashSet::new();
1142 all_rows.retain(|row| seen.insert(row_key(row)));
1143 all_rows
1144 }
1145 // INTERSECT: rows present in both sides (set semantics)
1146 (parser::SetOp::Intersect, false) => {
1147 let right_keys: std::collections::HashSet<String> =
1148 right_result.rows.iter().map(&row_key).collect();
1149 let mut seen = std::collections::HashSet::new();
1150 left_result
1151 .rows
1152 .into_iter()
1153 .filter(|row| {
1154 let key = row_key(row);
1155 right_keys.contains(&key) && seen.insert(key)
1156 })
1157 .collect()
1158 }
1159 // INTERSECT ALL: keep multiplicities — for each row appearing
1160 // min(left_count, right_count) times, emit that many copies.
1161 (parser::SetOp::Intersect, true) => {
1162 let mut right_counts: std::collections::HashMap<String, usize> =
1163 std::collections::HashMap::new();
1164 for row in &right_result.rows {
1165 *right_counts.entry(row_key(row)).or_insert(0) += 1;
1166 }
1167 let mut out = Vec::new();
1168 for row in left_result.rows {
1169 let key = row_key(&row);
1170 if let Some(count) = right_counts.get_mut(&key) {
1171 if *count > 0 {
1172 *count -= 1;
1173 out.push(row);
1174 }
1175 }
1176 }
1177 out
1178 }
1179 // EXCEPT: rows in left side not in right side (set semantics)
1180 (parser::SetOp::Except, false) => {
1181 let right_keys: std::collections::HashSet<String> =
1182 right_result.rows.iter().map(&row_key).collect();
1183 let mut seen = std::collections::HashSet::new();
1184 left_result
1185 .rows
1186 .into_iter()
1187 .filter(|row| {
1188 let key = row_key(row);
1189 !right_keys.contains(&key) && seen.insert(key)
1190 })
1191 .collect()
1192 }
1193 // EXCEPT ALL: subtract multiplicities — left_count - right_count copies
1194 (parser::SetOp::Except, true) => {
1195 let mut right_counts: std::collections::HashMap<String, usize> =
1196 std::collections::HashMap::new();
1197 for row in &right_result.rows {
1198 *right_counts.entry(row_key(row)).or_insert(0) += 1;
1199 }
1200 let mut out = Vec::new();
1201 for row in left_result.rows {
1202 let key = row_key(&row);
1203 let count = right_counts.entry(key).or_insert(0);
1204 if *count > 0 {
1205 *count -= 1;
1206 } else {
1207 out.push(row);
1208 }
1209 }
1210 out
1211 }
1212 };
1213
1214 Ok(QueryResult {
1215 columns: column_names,
1216 rows: combined_rows,
1217 })
1218 }
1219
1220 /// Executes a SQL query at a specific log position (point-in-time query).
1221 ///
1222 /// This enables compliance queries that show the state as it was
1223 /// at a specific point in the log.
1224 ///
1225 /// # Arguments
1226 ///
1227 /// * `store` - The projection store to query
1228 /// * `sql` - SQL query string
1229 /// * `params` - Query parameters
1230 /// * `position` - Log position to query at
1231 ///
1232 /// # Example
1233 ///
1234 /// ```ignore
1235 /// // Get user state as of log position 1000
1236 /// let result = engine.query_at(
1237 /// &mut store,
1238 /// "SELECT * FROM users WHERE id = 1",
1239 /// &[],
1240 /// Offset::new(1000),
1241 /// )?;
1242 /// ```
1243 pub fn query_at<S: ProjectionStore>(
1244 &self,
1245 store: &mut S,
1246 sql: &str,
1247 params: &[Value],
1248 position: Offset,
1249 ) -> Result<QueryResult> {
1250 let stmt = self.parse_query_statement_cached(sql)?;
1251
1252 match stmt {
1253 parser::ParsedStatement::Select(parsed) => {
1254 let plan = planner::plan_query(&self.schema, &parsed, params)?;
1255 let table_def = self
1256 .schema
1257 .get_table(&plan.table_name().into())
1258 .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
1259 executor::execute_at(store, &plan, table_def, position)
1260 }
1261 parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
1262 "UNION is not supported in point-in-time queries".to_string(),
1263 )),
1264 _ => unreachable!("parse_query_statement only returns Select or Union"),
1265 }
1266 }
1267
1268 /// Executes a query against a historical snapshot selected by
1269 /// wall-clock timestamp (AUDIT-2026-04 L-4).
1270 ///
1271 /// This is the user-facing ergonomic form of
1272 /// [`Self::query_at`] — healthcare auditors ask "what did the
1273 /// chart look like on 2026-01-15?", not "what was log offset
1274 /// 948,274?". The caller supplies a `resolver` callback that
1275 /// translates a Unix-nanosecond timestamp into the log offset
1276 /// whose commit timestamp is the greatest value ≤ the target.
1277 ///
1278 /// The resolver is a callback rather than a hard dependency
1279 /// so the query crate does not take a direct dep on
1280 /// `kimberlite-compliance::audit` or the kernel's audit log.
1281 /// A typical impl performs a binary search on the in-memory
1282 /// audit index.
1283 ///
1284 /// # Errors
1285 ///
1286 /// - [`QueryError::UnsupportedFeature`] if the resolver
1287 /// returns `None` (no offset exists at or before the target
1288 /// — typically because the log is empty or the timestamp
1289 /// predates genesis).
1290 ///
1291 /// # Example
1292 ///
1293 /// ```ignore
1294 /// let resolver = |ts_ns: i64| -> Option<Offset> {
1295 /// audit_log.offset_at_or_before(ts_ns)
1296 /// };
1297 /// let result = engine.query_at_timestamp(
1298 /// &mut store,
1299 /// "SELECT * FROM charts WHERE patient_id = $1",
1300 /// &[Value::BigInt(42)],
1301 /// 1_760_000_000_000_000_000, // 2025-10-09T07:06:40Z in ns
1302 /// resolver,
1303 /// )?;
1304 /// ```
1305 pub fn query_at_timestamp<S, R>(
1306 &self,
1307 store: &mut S,
1308 sql: &str,
1309 params: &[Value],
1310 target_ns: i64,
1311 resolver: R,
1312 ) -> Result<QueryResult>
1313 where
1314 S: ProjectionStore,
1315 R: FnOnce(i64) -> Option<Offset>,
1316 {
1317 let offset = resolver(target_ns).ok_or_else(|| {
1318 QueryError::UnsupportedFeature(format!(
1319 "no log offset at or before timestamp {target_ns} ns \
1320 (empty log or predates genesis)"
1321 ))
1322 })?;
1323 self.query_at(store, sql, params, offset)
1324 }
1325
1326 /// Executes a query against a historical snapshot selected by
1327 /// wall-clock timestamp, with a resolver that can distinguish
1328 /// the "timestamp predates retention horizon" case from a plain
1329 /// "no offset found".
1330 ///
1331 /// v0.6.0 Tier 2 #6: this is the runtime-layer variant of
1332 /// [`Self::query_at_timestamp`] used by `TenantHandle::query`
1333 /// when the resolver has a concrete notion of a retention
1334 /// horizon (e.g. an in-memory timestamp index maintained at
1335 /// append time). The existing `query_at_timestamp` stays as-is
1336 /// so callers with an `Option<Offset>` resolver (e.g. ad-hoc
1337 /// binary search over an external index) keep working.
1338 ///
1339 /// # Resolution semantics
1340 ///
1341 /// - `TimestampResolution::Offset(o)` → execute at `o`.
1342 /// - `TimestampResolution::BeforeRetentionHorizon { horizon_ns }` →
1343 /// [`QueryError::AsOfBeforeRetentionHorizon`] with both the
1344 /// requested and horizon timestamps.
1345 /// - `TimestampResolution::LogEmpty` →
1346 /// [`QueryError::UnsupportedFeature`] (same message the
1347 /// ergonomic form emits for an empty log).
1348 pub fn query_at_timestamp_resolved<S, R>(
1349 &self,
1350 store: &mut S,
1351 sql: &str,
1352 params: &[Value],
1353 target_ns: i64,
1354 resolver: R,
1355 ) -> Result<QueryResult>
1356 where
1357 S: ProjectionStore,
1358 R: FnOnce(i64) -> TimestampResolution,
1359 {
1360 match resolver(target_ns) {
1361 TimestampResolution::Offset(offset) => self.query_at(store, sql, params, offset),
1362 TimestampResolution::BeforeRetentionHorizon { horizon_ns } => {
1363 Err(QueryError::AsOfBeforeRetentionHorizon {
1364 requested_ns: target_ns,
1365 horizon_ns,
1366 })
1367 }
1368 TimestampResolution::LogEmpty => Err(QueryError::UnsupportedFeature(format!(
1369 "no log offset at or before timestamp {target_ns} ns \
1370 (empty log or predates genesis)"
1371 ))),
1372 }
1373 }
1374
1375 /// AUDIT-2026-04 S3.3 — render a SQL query's access plan
1376 /// without executing it.
1377 ///
1378 /// Returns a deterministic multi-line tree string — same
1379 /// query always produces the same bytes, which lets apps
1380 /// diff plans across schema versions and catch unexpected
1381 /// regressions.
1382 ///
1383 /// The rendered plan **never reveals row data** — only table
1384 /// names, column counts, filter presence/absence, and LIMIT
1385 /// bounds. Masked column names render as their source name
1386 /// (masks are applied post-projection and are not a plan
1387 /// concern).
1388 ///
1389 /// # Errors
1390 ///
1391 /// Any error from parsing or planning (unsupported statement,
1392 /// missing table, etc.) propagates verbatim.
1393 ///
1394 /// # Example
1395 ///
1396 /// ```ignore
1397 /// let tree = engine.explain("SELECT * FROM patients WHERE id = $1", &[Value::BigInt(42)])?;
1398 /// println!("{tree}");
1399 /// // -> PointLookup [patients, cols=3]
1400 /// ```
1401 pub fn explain(&self, sql: &str, params: &[Value]) -> Result<String> {
1402 let stmt = self.parse_query_statement_cached(sql)?;
1403 match stmt {
1404 parser::ParsedStatement::Select(parsed) => {
1405 let plan = planner::plan_query(&self.schema, &parsed, params)?;
1406 Ok(explain::explain_plan(&plan))
1407 }
1408 parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
1409 "EXPLAIN does not yet render UNION plans".to_string(),
1410 )),
1411 _ => unreachable!("parse_query_statement only returns Select or Union"),
1412 }
1413 }
1414
1415 /// Parses a SQL query without executing it.
1416 ///
1417 /// Useful for validation or query plan inspection.
1418 pub fn prepare(&self, sql: &str, params: &[Value]) -> Result<PreparedQuery> {
1419 let stmt = self.parse_query_statement_cached(sql)?;
1420 let parsed = match stmt {
1421 parser::ParsedStatement::Select(s) => s,
1422 _ => {
1423 return Err(QueryError::UnsupportedFeature(
1424 "only SELECT queries can be prepared".to_string(),
1425 ));
1426 }
1427 };
1428 let plan = planner::plan_query(&self.schema, &parsed, params)?;
1429
1430 Ok(PreparedQuery {
1431 plan,
1432 schema: self.schema.clone(),
1433 })
1434 }
1435}
1436
1437/// A prepared (planned) query ready for execution.
1438#[derive(Debug, Clone)]
1439pub struct PreparedQuery {
1440 plan: plan::QueryPlan,
1441 schema: Schema,
1442}
1443
1444impl PreparedQuery {
1445 /// Executes this prepared query against the current store state.
1446 pub fn execute<S: ProjectionStore>(&self, store: &mut S) -> Result<QueryResult> {
1447 let table_def = self
1448 .schema
1449 .get_table(&self.plan.table_name().into())
1450 .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
1451
1452 executor::execute(store, &self.plan, table_def)
1453 }
1454
1455 /// Executes this prepared query at a specific log position.
1456 pub fn execute_at<S: ProjectionStore>(
1457 &self,
1458 store: &mut S,
1459 position: Offset,
1460 ) -> Result<QueryResult> {
1461 let table_def = self
1462 .schema
1463 .get_table(&self.plan.table_name().into())
1464 .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
1465
1466 executor::execute_at(store, &self.plan, table_def, position)
1467 }
1468
1469 /// Returns the column names this query will return.
1470 pub fn columns(&self) -> &[ColumnName] {
1471 self.plan.column_names()
1472 }
1473
1474 /// Returns the table name being queried.
1475 pub fn table_name(&self) -> &str {
1476 self.plan.table_name()
1477 }
1478}
1479
1480/// True iff any top-level predicate is a surviving correlated
1481/// subquery (`InSubquery` / `Exists`). Uncorrelated subqueries were
1482/// rewritten by `pre_execute_subqueries` before this is called.
1483fn has_correlated_predicate(predicates: &[parser::Predicate]) -> bool {
1484 predicates.iter().any(|p| {
1485 matches!(
1486 p,
1487 parser::Predicate::InSubquery { .. } | parser::Predicate::Exists { .. }
1488 )
1489 })
1490}
1491
1492/// Extract outer references from a surviving correlated predicate.
1493/// Used by the correlated-loop executor to populate the
1494/// `alias.column → Value` binding map for each outer row.
1495fn correlated_predicate_outer_refs(pred: &parser::Predicate) -> Vec<correlated::OuterRef> {
1496 let subquery = match pred {
1497 parser::Predicate::InSubquery { subquery, .. }
1498 | parser::Predicate::Exists { subquery, .. } => subquery,
1499 _ => return Vec::new(),
1500 };
1501 // Walk inner predicates gathering any qualified ColumnRef —
1502 // post-pre-execute, anything that remains as a ColumnRef with a
1503 // qualifier is an outer reference (the inner FROM has its own
1504 // bare-column refs).
1505 let mut out = Vec::new();
1506 for pred in &subquery.predicates {
1507 collect_refs_in_pred(pred, &mut out);
1508 }
1509 out
1510}
1511
1512fn collect_refs_in_pred(pred: &parser::Predicate, out: &mut Vec<correlated::OuterRef>) {
1513 let push_if_colref = |pv: &parser::PredicateValue, out: &mut Vec<correlated::OuterRef>| {
1514 if let parser::PredicateValue::ColumnRef(raw) = pv {
1515 if let Some((q, c)) = raw.split_once('.') {
1516 out.push(correlated::OuterRef {
1517 qualifier: q.to_string(),
1518 column: schema::ColumnName::new(c.to_string()),
1519 scope_depth: 1,
1520 });
1521 }
1522 }
1523 };
1524 match pred {
1525 parser::Predicate::Eq(_, v)
1526 | parser::Predicate::Lt(_, v)
1527 | parser::Predicate::Le(_, v)
1528 | parser::Predicate::Gt(_, v)
1529 | parser::Predicate::Ge(_, v) => push_if_colref(v, out),
1530 parser::Predicate::In(_, vs) | parser::Predicate::NotIn(_, vs) => {
1531 for v in vs {
1532 push_if_colref(v, out);
1533 }
1534 }
1535 parser::Predicate::NotBetween(_, lo, hi) => {
1536 push_if_colref(lo, out);
1537 push_if_colref(hi, out);
1538 }
1539 parser::Predicate::Or(l, r) => {
1540 for p in l {
1541 collect_refs_in_pred(p, out);
1542 }
1543 for p in r {
1544 collect_refs_in_pred(p, out);
1545 }
1546 }
1547 _ => {}
1548 }
1549}
1550
1551/// Converts a Value to a serde_json::Value for CTE materialization.
1552fn value_to_json(val: &Value) -> serde_json::Value {
1553 // NEVER: Placeholder values must be bound before reaching the CTE /
1554 // query-result JSON boundary. An unbound Placeholder here indicates a
1555 // parameter-binding bug (AUDIT fix: placeholders must be resolved upstream).
1556 kimberlite_properties::never!(
1557 matches!(val, Value::Placeholder(_)),
1558 "query.placeholder_reaches_result_boundary",
1559 "Value::Placeholder must never reach query-result / JSON serialization boundary"
1560 );
1561 match val {
1562 Value::Null | Value::Placeholder(_) => serde_json::Value::Null,
1563 Value::BigInt(i) => serde_json::json!(i),
1564 Value::TinyInt(i) => serde_json::json!(i),
1565 Value::SmallInt(i) => serde_json::json!(i),
1566 Value::Integer(i) => serde_json::json!(i),
1567 Value::Real(f) => serde_json::json!(f),
1568 Value::Decimal(v, scale) => {
1569 // Format decimal: store the raw value and scale as a string
1570 if *scale == 0 {
1571 serde_json::json!(v.to_string())
1572 } else {
1573 let divisor = 10i128.pow(u32::from(*scale));
1574 let whole = v / divisor;
1575 let frac = (v % divisor).unsigned_abs();
1576 serde_json::json!(format!("{whole}.{frac:0>width$}", width = *scale as usize))
1577 }
1578 }
1579 Value::Text(s) => serde_json::json!(s),
1580 Value::Boolean(b) => serde_json::json!(b),
1581 Value::Date(d) => serde_json::json!(d),
1582 Value::Time(t) => serde_json::json!(t),
1583 Value::Timestamp(ts) => serde_json::json!(ts.as_nanos()),
1584 Value::Uuid(u) => {
1585 // Format UUID bytes as hex string
1586 let hex: String = u.iter().map(|b| format!("{b:02x}")).collect();
1587 serde_json::json!(hex)
1588 }
1589 Value::Json(j) => j.clone(),
1590 Value::Bytes(b) => {
1591 use base64::Engine;
1592 serde_json::json!(base64::engine::general_purpose::STANDARD.encode(b))
1593 }
1594 }
1595}