Skip to main content

kimberlite_query/
lib.rs

1//! # kmb-query: SQL query layer for `Kimberlite` projections
2//!
3//! This crate provides a minimal SQL query engine for compliance lookups
4//! against the projection store.
5//!
6//! ## SQL Subset
7//!
8//! Supported SQL features:
9//! - `SELECT` with column list or `*`
10//! - `FROM` single table or `JOIN` (`INNER`, `LEFT`, `RIGHT`, `FULL OUTER`,
11//!   `CROSS`, with `ON` or `USING(...)` clauses)
12//! - `WHERE` with comparison predicates (`=`, `<`, `>`, `<=`, `>=`, `IN`)
13//! - `IN (SELECT)`, `NOT IN (SELECT)`, `EXISTS`, `NOT EXISTS` — both
14//!   uncorrelated (pre-execute fast path) and correlated (semi-join
15//!   decorrelation or nested-loop fallback; see
16//!   `docs/reference/sql/correlated-subqueries.md`)
17//! - `ORDER BY` (ascending/descending)
18//! - `LIMIT` and `OFFSET` — literal or `$N` parameter
19//! - `GROUP BY` with aggregates (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`)
20//! - Aggregate `FILTER (WHERE ...)` clauses, independent per aggregate
21//! - `HAVING` with aggregate filtering
22//! - `UNION` / `UNION ALL` / `INTERSECT` / `INTERSECT ALL` /
23//!   `EXCEPT` / `EXCEPT ALL`
24//! - `DISTINCT`
25//! - JSON operators `->`, `->>`, `@>` in WHERE clauses
26//! - `CASE` (searched and simple form)
27//! - `WITH` (Common Table Expressions / CTEs), including `WITH RECURSIVE`
28//!   via iterative fixed-point evaluation (depth cap 1000)
29//! - Subqueries in FROM and JOIN (`SELECT * FROM (SELECT ...) AS t`)
30//! - Window functions (`OVER`, `PARTITION BY`, `ROW_NUMBER`, `RANK`,
31//!   `DENSE_RANK`, `LAG`, `LEAD`, `FIRST_VALUE`, `LAST_VALUE`)
32//! - `ALTER TABLE` (ADD COLUMN, DROP COLUMN) — parser only; kernel
33//!   execution pending
34//! - Parameterized queries (`$1`, `$2`, ...) in WHERE, LIMIT, OFFSET, and
35//!   DML values
36//!
37//! - Scalar functions in SELECT projection and WHERE predicates:
38//!   `UPPER`, `LOWER`, `LENGTH`, `TRIM`, `CONCAT`, `||`, `ABS`,
39//!   `ROUND`, `CEIL`/`CEILING`, `FLOOR`, `COALESCE`, `NULLIF`, `CAST`
40//! - `ILIKE`, `NOT LIKE`, `NOT ILIKE` pattern matching
41//! - `NOT IN (list)`, `NOT BETWEEN low AND high`
42//!
43//! Not yet supported:
44//! - Scalar subquery `WHERE col = (SELECT ...)`, `ANY`, `ALL`, `SOME`
45//! - Clock-dependent functions (`NOW()`, `CURRENT_DATE`, `EXTRACT`,
46//!   `DATE_TRUNC`) — deferred pending a clock-threading decision
47//! - `MOD`, `POWER`, `SQRT`, `SUBSTRING` — deferred
48//!
49//! ## Usage
50//!
51//! ```ignore
52//! use kimberlite_query::{QueryEngine, Schema, SchemaBuilder, ColumnDef, DataType, Value};
53//! use kimberlite_store::{BTreeStore, TableId};
54//!
55//! // Define schema
56//! let schema = SchemaBuilder::new()
57//!     .table(
58//!         "users",
59//!         TableId::new(1),
60//!         vec![
61//!             ColumnDef::new("id", DataType::BigInt).not_null(),
62//!             ColumnDef::new("name", DataType::Text).not_null(),
63//!         ],
64//!         vec!["id".into()],
65//!     )
66//!     .build();
67//!
68//! // Create engine
69//! let engine = QueryEngine::new(schema);
70//!
71//! // Execute query
72//! let mut store = BTreeStore::open("data/projections")?;
73//! let result = engine.query(&mut store, "SELECT * FROM users WHERE id = $1", &[Value::BigInt(42)])?;
74//! ```
75//!
76//! ## Point-in-Time Queries
77//!
78//! For compliance, you can query at a specific log position:
79//!
80//! ```ignore
81//! let result = engine.query_at(
82//!     &mut store,
83//!     "SELECT * FROM users WHERE id = 1",
84//!     &[],
85//!     Offset::new(1000),  // Query state as of log position 1000
86//! )?;
87//! ```
88//!
89//! ## Scalar expressions (v0.5.1)
90//!
//! The parser accepts scalar functions in SELECT projection and WHERE
//! predicates. Each of these queries parses cleanly into a
//! `ParsedStatement::Select`; the assertions show where each construct
//! lands in the parsed SELECT (`predicates`, `column_aliases`, or
//! `scalar_projections`):
95//!
96//! ```
97//! use kimberlite_query::{parse_statement, ParsedStatement, Predicate};
98//!
99//! // WHERE col NOT IN (list) — mirror of IN, v0.5.1.
100//! let s = parse_statement("SELECT id FROM t WHERE x NOT IN (1, 2, 3)").unwrap();
101//! let ParsedStatement::Select(sel) = s else { panic!() };
102//! assert!(matches!(sel.predicates[0], Predicate::NotIn(_, _)));
103//!
104//! // WHERE UPPER(name) = 'ALICE' — scalar LHS routes to ScalarCmp.
105//! let s = parse_statement("SELECT id FROM t WHERE UPPER(name) = 'ALICE'").unwrap();
106//! let ParsedStatement::Select(sel) = s else { panic!() };
107//! assert!(matches!(sel.predicates[0], Predicate::ScalarCmp { .. }));
108//!
109//! // SELECT col AS alias — alias preserved end-to-end.
110//! let s = parse_statement("SELECT name AS display FROM t").unwrap();
111//! let ParsedStatement::Select(sel) = s else { panic!() };
112//! let aliases = sel.column_aliases.as_ref().unwrap();
113//! assert_eq!(aliases[0].as_deref(), Some("display"));
114//!
115//! // SELECT CAST(x AS INTEGER) — lands in scalar_projections with a
116//! // synthesised output column name.
117//! let s = parse_statement("SELECT CAST(x AS INTEGER) FROM t").unwrap();
118//! let ParsedStatement::Select(sel) = s else { panic!() };
119//! assert_eq!(sel.scalar_projections.len(), 1);
120//! assert_eq!(sel.scalar_projections[0].output_name.as_str(), "cast");
121//! ```
122
123pub mod correlated;
124pub mod depth_check;
125pub mod dml_planner;
126mod error;
127mod executor;
128pub mod explain;
129pub mod expression;
130pub mod information_schema;
131pub mod key_encoder;
132mod parse_cache;
133mod parser;
134mod plan;
135mod planner;
136pub mod rbac_filter;
137mod schema;
138mod value;
139pub mod window;
140
141#[cfg(test)]
142mod tests;
143
144// Re-export public types
145pub use error::{QueryError, Result};
146pub use executor::{QueryResult, Row, execute};
147pub use expression::{EvalContext, ScalarExpr, evaluate};
148pub use parser::{
149    AlterTableOperation, HavingCondition, HavingOp, OnConflictAction, OnConflictClause,
150    ParsedAlterTable, ParsedAttachMaskingPolicy, ParsedColumn, ParsedCreateIndex, ParsedCreateMask,
151    ParsedCreateMaskingPolicy, ParsedCreateTable, ParsedCreateUser, ParsedCte, ParsedDelete,
152    ParsedDetachMaskingPolicy, ParsedGrant, ParsedInsert, ParsedMaskingStrategy, ParsedSelect,
153    ParsedSetClassification, ParsedStatement, ParsedUnion, ParsedUpdate, Predicate, PredicateValue,
154    ScalarCmpOp, TimeTravel, UpsertExpr, expr_to_scalar_expr, extract_at_offset,
155    extract_time_travel, parse_statement, try_parse_custom_statement,
156};
157pub use planner::plan_query;
158pub use schema::{
159    ColumnDef, ColumnName, DataType, IndexDef, Schema, SchemaBuilder, TableDef, TableName,
160};
161pub use value::Value;
162
163use kimberlite_store::ProjectionStore;
164use kimberlite_types::Offset;
165
166/// Outcome returned by a timestamp→offset resolver.
167///
168/// v0.6.0 Tier 2 #6 — a resolver that owns its own index (the
169/// runtime's in-memory timestamp index, an audit-log-backed index,
170/// etc.) can distinguish three cases that `Option<Offset>` cannot:
171///
172/// - We found an offset at or before the requested timestamp.
173/// - The log has entries, but the earliest is *after* the requested
174///   timestamp — i.e. the request predates the retention horizon.
175///   Surfacing this as a distinct variant lets the query layer emit
176///   [`QueryError::AsOfBeforeRetentionHorizon`] with the horizon
177///   attached, which is far more actionable to the caller.
178/// - The log is empty — no timestamps recorded yet.
179///
180/// See [`QueryEngine::query_at_timestamp_resolved`] for the
181/// consumer of this type.
182#[derive(Debug, Clone, Copy, PartialEq, Eq)]
183pub enum TimestampResolution {
184    /// Resolver found a projection offset whose commit timestamp is
185    /// the greatest value ≤ the target.
186    Offset(Offset),
187    /// Resolver has entries, but the earliest commit timestamp is
188    /// strictly greater than the target. `horizon_ns` is that
189    /// earliest timestamp, so callers can tell users "the oldest
190    /// retained data is `<horizon>`, try a later instant".
191    BeforeRetentionHorizon { horizon_ns: i64 },
192    /// Resolver has no entries at all (fresh DB or the index hasn't
193    /// been seeded yet). Indistinguishable from "predates genesis"
194    /// in observable behaviour — surfaced via `UnsupportedFeature`.
195    LogEmpty,
196}
197
198/// Query engine for executing SQL against a projection store.
199///
200/// Holds the schema plus an optional parse cache (AUDIT-2026-04
201/// S3.4). The engine is `Clone` — the parse cache is shared via
202/// `Arc` so cloned handles hit the same memoised entries.
203#[derive(Debug, Clone)]
204pub struct QueryEngine {
205    schema: Schema,
206    parse_cache: Option<std::sync::Arc<parse_cache::ParseCache>>,
207    /// Cap on `outer_rows × inner_rows_per_iter` for correlated
208    /// subquery loops. See `docs/reference/sql/correlated-subqueries.md`.
209    /// Defaults to [`correlated::DEFAULT_CORRELATED_CAP`] (10 million).
210    correlated_cap: u64,
211}
212
213impl QueryEngine {
214    /// Creates a new query engine with the given schema. No
215    /// parse cache is attached by default — use
216    /// [`Self::with_parse_cache`] to opt in.
217    pub fn new(schema: Schema) -> Self {
218        Self {
219            schema,
220            parse_cache: None,
221            correlated_cap: correlated::DEFAULT_CORRELATED_CAP,
222        }
223    }
224
225    /// Attach an LRU parse cache of the given size. `0` disables
226    /// caching (every call re-parses).
227    #[must_use]
228    pub fn with_parse_cache(mut self, max_size: usize) -> Self {
229        self.parse_cache = Some(std::sync::Arc::new(parse_cache::ParseCache::new(max_size)));
230        self
231    }
232
233    /// Override the correlated-subquery row-evaluation cap. Defaults
234    /// to 10,000,000. Queries whose estimated
235    /// `outer_rows × inner_rows_per_iter` exceeds this cap fail with
236    /// [`QueryError::CorrelatedCardinalityExceeded`] before the
237    /// correlated loop runs. Set to `u64::MAX` to effectively disable.
238    #[must_use]
239    pub fn with_correlated_cap(mut self, cap: u64) -> Self {
240        self.correlated_cap = cap;
241        self
242    }
243
244    /// Returns a snapshot of parse-cache stats, or `None` if no
245    /// cache is attached.
246    pub fn parse_cache_stats(&self) -> Option<parse_cache::ParseCacheStats> {
247        self.parse_cache
248            .as_deref()
249            .map(parse_cache::ParseCache::stats)
250    }
251
252    /// Clear the parse cache, if any.
253    pub fn clear_parse_cache(&self) {
254        if let Some(c) = &self.parse_cache {
255            c.clear();
256        }
257    }
258
259    /// Returns a reference to the schema.
260    pub fn schema(&self) -> &Schema {
261        &self.schema
262    }
263
264    /// Parses a SQL string and extracts the SELECT or UNION statement.
265    ///
266    /// Static variant — bypasses the parse cache. Used by call
267    /// sites that predate the cache and by internal recursive
268    /// parsers.
269    fn parse_query_statement(sql: &str) -> Result<parser::ParsedStatement> {
270        let stmt = parser::parse_statement(sql)?;
271        match &stmt {
272            parser::ParsedStatement::Select(_) | parser::ParsedStatement::Union(_) => Ok(stmt),
273            _ => Err(QueryError::UnsupportedFeature(
274                "only SELECT and UNION queries are supported".to_string(),
275            )),
276        }
277    }
278
279    /// Cache-aware parse wrapper.
280    ///
281    /// Looks the SQL up in the parse cache (if attached). On
282    /// miss, parses via [`Self::parse_query_statement`] and
283    /// inserts into the cache. Non-SELECT/UNION errors are
284    /// returned directly without populating the cache — they're
285    /// errors for every subsequent call anyway and we don't want
286    /// to memoise them.
287    fn parse_query_statement_cached(&self, sql: &str) -> Result<parser::ParsedStatement> {
288        if let Some(cache) = &self.parse_cache {
289            if let Some(stmt) = cache.get(sql) {
290                return Ok(stmt);
291            }
292        }
293        let stmt = Self::parse_query_statement(sql)?;
294        if let Some(cache) = &self.parse_cache {
295            cache.insert(sql.to_string(), stmt.clone());
296        }
297        Ok(stmt)
298    }
299
300    /// Executes a SQL query against the current store state.
301    ///
302    /// Supports SELECT and UNION/UNION ALL queries.
303    ///
304    /// # Arguments
305    ///
306    /// * `store` - The projection store to query
307    /// * `sql` - SQL query string
308    /// * `params` - Query parameters (for `$1`, `$2`, etc.)
309    ///
310    /// # Example
311    ///
312    /// ```ignore
313    /// let result = engine.query(
314    ///     &mut store,
315    ///     "SELECT name FROM users WHERE id = $1",
316    ///     &[Value::BigInt(42)],
317    /// )?;
318    /// ```
319    pub fn query<S: ProjectionStore>(
320        &self,
321        store: &mut S,
322        sql: &str,
323        params: &[Value],
324    ) -> Result<QueryResult> {
325        // AUDIT-2026-04 S3.5 — healthcare BREAK_GLASS prefix.
326        // `WITH BREAK_GLASS REASON='...' SELECT ...` strips the
327        // prefix, emits a warn-level structured log for the
328        // caller's audit pipeline to pick up, and falls through
329        // to the normal query path. The reason is the attribution
330        // value — enforcement (RBAC + masking) is still applied.
331        let (after_break_glass, break_glass_reason) = explain::extract_break_glass(sql);
332        if let Some(ref reason) = break_glass_reason {
333            tracing::warn!(
334                break_glass_reason = %reason,
335                "BREAK_GLASS query — regulator-visible audit signal",
336            );
337        }
338        let sql = after_break_glass;
339
340        // AUDIT-2026-04 S3.4 — `information_schema.*` virtual-table
341        // interception. Synthesises results from the live schema
342        // without going through the planner/executor. Callers
343        // that point FROM at `information_schema.tables` or
344        // `.columns` get back schema introspection rows without
345        // the table needing to be registered in the store.
346        if let Some(result) = information_schema::maybe_answer(sql, &self.schema) {
347            return Ok(result);
348        }
349
350        // AUDIT-2026-04 S3.3 — EXPLAIN prefix dispatch. A caller
351        // issuing `EXPLAIN SELECT ...` gets a single-row result
352        // whose only value is the rendered plan tree, rather
353        // than executing the statement.
354        let (after_explain, is_explain) = explain::extract_explain(sql);
355        if is_explain {
356            let plan_text = self.explain(after_explain, params)?;
357            return Ok(executor::QueryResult {
358                columns: vec!["plan".into()],
359                rows: vec![vec![Value::Text(plan_text)]],
360            });
361        }
362        let sql = after_explain; // equivalent to original `sql` when EXPLAIN absent
363
364        // Extract time-travel clause (AT OFFSET / FOR SYSTEM_TIME AS OF / AS OF)
365        // before passing SQL to sqlparser. Offset syntax dispatches
366        // directly; timestamp syntax without a resolver errors out
367        // with a clear message pointing to
368        // `query_at_timestamp(..., resolver)`.
369        let (cleaned_sql, time_travel) = parser::extract_time_travel(sql);
370        match time_travel {
371            Some(parser::TimeTravel::Offset(o)) => {
372                return self.query_at(store, &cleaned_sql, params, Offset::new(o));
373            }
374            Some(parser::TimeTravel::TimestampNs(_)) => {
375                return Err(QueryError::UnsupportedFeature(
376                    "FOR SYSTEM_TIME AS OF '<iso>' / AS OF '<iso>' \
377                     requires a timestamp→offset resolver — use \
378                     QueryEngine::query_at_timestamp(..., resolver)"
379                        .to_string(),
380                ));
381            }
382            None => {}
383        }
384
385        let stmt = self.parse_query_statement_cached(sql)?;
386
387        match stmt {
388            parser::ParsedStatement::Select(mut parsed) => {
389                // Pre-execute uncorrelated subqueries (IN/EXISTS/NOT EXISTS),
390                // attempt semi-join decorrelation of correlated ones, and
391                // leave remaining correlated predicates in place for the
392                // correlated-loop executor.
393                //
394                // `outer_scope` is the enclosing scope stack visible to the
395                // subquery; at the top level we seed it with the outer
396                // SELECT's FROM table so the walker can detect
397                // correlation.
398                self.pre_execute_subqueries(store, &mut parsed, params)?;
399
400                let window_fns = parsed.window_fns.clone();
401                let result = if parsed.ctes.is_empty() {
402                    if has_correlated_predicate(&parsed.predicates) {
403                        self.execute_correlated_query(store, &parsed, params)?
404                    } else {
405                        // `plan_query` folds time-now sentinels via
406                        // AUDIT-2026-05 S3.7 — see `planner.rs`.
407                        let plan = planner::plan_query(&self.schema, &parsed, params)?;
408                        let table_def = self
409                            .schema
410                            .get_table(&plan.table_name().into())
411                            .ok_or_else(|| {
412                                QueryError::TableNotFound(plan.table_name().to_string())
413                            })?;
414                        executor::execute(store, &plan, table_def)?
415                    }
416                } else {
417                    self.execute_with_ctes(store, &parsed, params)?
418                };
419                // AUDIT-2026-04 S3.2 — window functions are a
420                // post-pass over the base SELECT result; the base
421                // plan already projected the columns the window
422                // fn references.
423                window::apply_window_fns(result, &window_fns)
424            }
425            parser::ParsedStatement::Union(union_stmt) => {
426                self.execute_union(store, &union_stmt, params)
427            }
428            _ => unreachable!("parse_query_statement only returns Select or Union"),
429        }
430    }
431
432    /// Executes a SELECT with CTEs by materializing each CTE and building
433    /// a temporary schema that includes the CTE result sets as tables.
434    fn execute_with_ctes<S: ProjectionStore>(
435        &self,
436        store: &mut S,
437        parsed: &parser::ParsedSelect,
438        params: &[Value],
439    ) -> Result<QueryResult> {
440        // Build an extended schema that includes CTE-derived tables
441        let mut extended_schema = self.schema.clone();
442
443        // Materialize each CTE
444        for cte in &parsed.ctes {
445            // Execute the CTE's anchor (non-recursive) query
446            let cte_plan = planner::plan_query(&extended_schema, &cte.query, params)?;
447            let cte_table_def = extended_schema
448                .get_table(&cte_plan.table_name().into())
449                .ok_or_else(|| QueryError::TableNotFound(cte_plan.table_name().to_string()))?;
450            let mut cte_result = executor::execute(store, &cte_plan, cte_table_def)?;
451
452            // Register CTE result as a virtual table in the extended schema
453            // Use a synthetic table ID based on the CTE name hash
454            let cte_table_id = {
455                use std::collections::hash_map::DefaultHasher;
456                use std::hash::{Hash, Hasher};
457                let mut hasher = DefaultHasher::new();
458                cte.name.hash(&mut hasher);
459                kimberlite_store::TableId::new(hasher.finish())
460            };
461
462            // Build column defs from the CTE result
463            let cte_columns: Vec<schema::ColumnDef> = cte_result
464                .columns
465                .iter()
466                .map(|col| schema::ColumnDef::new(col.as_str(), schema::DataType::Text))
467                .collect();
468
469            let pk_cols = if cte_columns.is_empty() {
470                vec![]
471            } else {
472                vec![cte_result.columns[0].clone()]
473            };
474
475            let cte_table = schema::TableDef::new(cte_table_id, cte_columns, pk_cols);
476            extended_schema.add_table(cte.name.as_str(), cte_table);
477
478            // Write anchor rows into the store as the initial working set.
479            let mut total_rows = cte_result.rows.len();
480            for (row_idx, row) in cte_result.rows.iter().enumerate() {
481                Self::write_cte_row(store, cte_table_id, row_idx, &cte_result.columns, row)?;
482            }
483
484            // Recursive iteration: keep evaluating the recursive arm against
485            // the growing CTE table until no new rows are produced or the
486            // depth cap is hit. Iterative fixed-point — honours the
487            // workspace "no recursion" constraint and prevents runaway loops.
488            if let Some(recursive_select) = &cte.recursive_arm {
489                const MAX_RECURSIVE_DEPTH: usize = 1000;
490                let mut seen: std::collections::HashSet<String> =
491                    cte_result.rows.iter().map(|r| format!("{r:?}")).collect();
492
493                for depth in 0..MAX_RECURSIVE_DEPTH {
494                    let recursive_plan =
495                        planner::plan_query(&extended_schema, recursive_select, params)?;
496                    let recursive_table_def = extended_schema
497                        .get_table(&recursive_plan.table_name().into())
498                        .ok_or_else(|| {
499                            QueryError::TableNotFound(recursive_plan.table_name().to_string())
500                        })?;
501                    let iteration_result =
502                        executor::execute(store, &recursive_plan, recursive_table_def)?;
503
504                    let mut new_rows = 0usize;
505                    for row in iteration_result.rows {
506                        let key = format!("{row:?}");
507                        if seen.insert(key) {
508                            Self::write_cte_row(
509                                store,
510                                cte_table_id,
511                                total_rows,
512                                &cte_result.columns,
513                                &row,
514                            )?;
515                            cte_result.rows.push(row);
516                            total_rows += 1;
517                            new_rows += 1;
518                        }
519                    }
520                    if new_rows == 0 {
521                        break;
522                    }
523                    if depth + 1 == MAX_RECURSIVE_DEPTH {
524                        return Err(QueryError::UnsupportedFeature(format!(
525                            "recursive CTE `{}` exceeded maximum depth of {MAX_RECURSIVE_DEPTH} iterations",
526                            cte.name
527                        )));
528                    }
529                }
530            }
531        }
532
533        // Execute the main query against the extended schema
534        let main_query = parser::ParsedSelect {
535            ctes: vec![], // CTEs already materialized
536            ..parsed.clone()
537        };
538
539        let plan = planner::plan_query(&extended_schema, &main_query, params)?;
540        let table_def = extended_schema
541            .get_table(&plan.table_name().into())
542            .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
543        executor::execute(store, &plan, table_def)
544    }
545
546    /// Writes a single CTE row into the store under the synthetic table id.
547    /// Helper extracted so the recursive-CTE iteration loop and the initial
548    /// anchor materialisation share the same path.
549    fn write_cte_row<S: ProjectionStore>(
550        store: &mut S,
551        cte_table_id: kimberlite_store::TableId,
552        row_idx: usize,
553        columns: &[crate::schema::ColumnName],
554        row: &[Value],
555    ) -> Result<()> {
556        let mut row_map = serde_json::Map::new();
557        for (col, val) in columns.iter().zip(row.iter()) {
558            row_map.insert(col.as_str().to_string(), value_to_json(val));
559        }
560        let json_val = serde_json::to_vec(&serde_json::Value::Object(row_map)).map_err(|e| {
561            QueryError::UnsupportedFeature(format!("CTE serialization failed: {e}"))
562        })?;
563        let pk_key = crate::key_encoder::encode_key(&[Value::BigInt(row_idx as i64)]);
564        let batch = kimberlite_store::WriteBatch::new(kimberlite_types::Offset::new(
565            store.applied_position().as_u64() + 1,
566        ))
567        .put(cte_table_id, pk_key, bytes::Bytes::from(json_val));
568        store.apply(batch)?;
569        Ok(())
570    }
571
572    /// Walks the outer SELECT's predicate tree, classifies each
573    /// subquery as uncorrelated or correlated, and rewrites as follows:
574    ///
575    /// - **Uncorrelated** `IN (SELECT)` / `EXISTS` / `NOT EXISTS` →
576    ///   pre-executed once; result substituted into `Predicate::In` /
577    ///   `Predicate::NotIn` / `Predicate::Always(bool)`. Matches the
578    ///   v0.5.0 fast path.
579    /// - **Correlated** `EXISTS` / `NOT EXISTS` with a single equijoin
580    ///   to the outer scope: rewritten by
581    ///   [`correlated::try_semi_join_rewrite`] into
582    ///   `Predicate::InSubquery { negated }` against the outer column,
583    ///   then the uncorrelated path pre-executes it.
584    /// - **Correlated** everything else: left in place for
585    ///   [`Self::execute_correlated_query`] to handle per-outer-row.
586    ///
587    /// Assertion: on return, every `InSubquery` / `Exists` still in
588    /// `parsed.predicates` is correlated. The caller's
589    /// `has_correlated_predicate` check uses that invariant to
590    /// dispatch.
591    fn pre_execute_subqueries<S: ProjectionStore>(
592        &self,
593        store: &mut S,
594        parsed: &mut parser::ParsedSelect,
595        params: &[Value],
596    ) -> Result<()> {
597        // Build the outer scope — the outer SELECT's FROM table (plus
598        // any JOIN tables) as the enclosing scope for each inner
599        // subquery.
600        let outer_scope = self.build_outer_scope(parsed);
601
602        let preds = std::mem::take(&mut parsed.predicates);
603        let mut out = Vec::with_capacity(preds.len());
604        for pred in preds {
605            out.push(self.classify_and_rewrite_predicate(store, pred, &outer_scope, params)?);
606        }
607        parsed.predicates = out;
608        Ok(())
609    }
610
611    /// Construct the outer `PlannerScope` for `parsed` — the visible
612    /// tables in the outer SELECT. FROM first, then any JOIN tables.
613    fn build_outer_scope<'s>(
614        &'s self,
615        parsed: &parser::ParsedSelect,
616    ) -> correlated::PlannerScope<'s> {
617        let mut bindings: Vec<(String, &schema::TableDef)> = Vec::new();
618        if let Some(t) = self.schema.get_table(&parsed.table.clone().into()) {
619            bindings.push((parsed.table.clone(), t));
620        }
621        for join in &parsed.joins {
622            if let Some(t) = self.schema.get_table(&join.table.clone().into()) {
623                bindings.push((join.table.clone(), t));
624            }
625        }
626        correlated::PlannerScope::empty().push(bindings)
627    }
628
629    /// Classifier for a single predicate — drives pre-execute vs.
630    /// semi-join rewrite vs. leave-as-correlated. Recursive on `Or`.
631    fn classify_and_rewrite_predicate<S: ProjectionStore>(
632        &self,
633        store: &mut S,
634        pred: parser::Predicate,
635        outer_scope: &correlated::PlannerScope<'_>,
636        params: &[Value],
637    ) -> Result<parser::Predicate> {
638        match pred {
639            parser::Predicate::InSubquery {
640                column,
641                subquery,
642                negated,
643            } => {
644                let outer_refs =
645                    correlated::collect_outer_refs(&subquery, outer_scope, &self.schema);
646                if outer_refs.is_empty() {
647                    // Uncorrelated — pre-execute and substitute.
648                    self.pre_execute_uncorrelated_in(store, &column, &subquery, negated, params)
649                } else {
650                    // Correlated IN/NOT IN — keep the predicate
651                    // in place; the correlated-loop executor handles it.
652                    Ok(parser::Predicate::InSubquery {
653                        column,
654                        subquery,
655                        negated,
656                    })
657                }
658            }
659            parser::Predicate::Exists { subquery, negated } => {
660                let outer_refs =
661                    correlated::collect_outer_refs(&subquery, outer_scope, &self.schema);
662                if outer_refs.is_empty() {
663                    // Uncorrelated.
664                    self.pre_execute_uncorrelated_exists(store, &subquery, negated, params)
665                } else if let Some((outer_col, rewritten)) =
666                    correlated::try_semi_join_rewrite(&subquery, negated, &outer_refs)
667                {
668                    // Decorrelated: rewrite as IN (SELECT) / NOT IN (SELECT)
669                    // against the outer column, then pre-execute.
670                    self.pre_execute_uncorrelated_in(store, &outer_col, &rewritten, negated, params)
671                } else {
672                    // Correlated loop fallback.
673                    Ok(parser::Predicate::Exists { subquery, negated })
674                }
675            }
676            parser::Predicate::Or(left, right) => {
677                let mut new_left = Vec::with_capacity(left.len());
678                for p in left {
679                    new_left.push(self.classify_and_rewrite_predicate(
680                        store,
681                        p,
682                        outer_scope,
683                        params,
684                    )?);
685                }
686                let mut new_right = Vec::with_capacity(right.len());
687                for p in right {
688                    new_right.push(self.classify_and_rewrite_predicate(
689                        store,
690                        p,
691                        outer_scope,
692                        params,
693                    )?);
694                }
695                Ok(parser::Predicate::Or(new_left, new_right))
696            }
697            other => Ok(other),
698        }
699    }
700
701    /// Pre-execute an uncorrelated `IN (SELECT)` / `NOT IN (SELECT)`
702    /// and return the substituted `Predicate::In` / `Predicate::NotIn`.
703    fn pre_execute_uncorrelated_in<S: ProjectionStore>(
704        &self,
705        store: &mut S,
706        column: &schema::ColumnName,
707        subquery: &parser::ParsedSelect,
708        negated: bool,
709        params: &[Value],
710    ) -> Result<parser::Predicate> {
711        let inner_plan = planner::plan_query(&self.schema, subquery, params)?;
712        let inner_table_def = self
713            .schema
714            .get_table(&inner_plan.table_name().into())
715            .ok_or_else(|| QueryError::TableNotFound(inner_plan.table_name().to_string()))?;
716        let inner_result = executor::execute(store, &inner_plan, inner_table_def)?;
717        if inner_result.columns.len() != 1 {
718            return Err(QueryError::UnsupportedFeature(format!(
719                "IN (SELECT ...) subquery must project exactly 1 column, got {}",
720                inner_result.columns.len()
721            )));
722        }
723        let values: Vec<parser::PredicateValue> = inner_result
724            .rows
725            .into_iter()
726            .filter_map(|row| row.into_iter().next())
727            .map(parser::PredicateValue::Literal)
728            .collect();
729        Ok(if negated {
730            parser::Predicate::NotIn(column.clone(), values)
731        } else {
732            parser::Predicate::In(column.clone(), values)
733        })
734    }
735
736    /// Pre-execute an uncorrelated `EXISTS` / `NOT EXISTS` and return
737    /// the collapsed `Predicate::Always(bool)`.
738    fn pre_execute_uncorrelated_exists<S: ProjectionStore>(
739        &self,
740        store: &mut S,
741        subquery: &parser::ParsedSelect,
742        negated: bool,
743        params: &[Value],
744    ) -> Result<parser::Predicate> {
745        let inner_plan = planner::plan_query(&self.schema, subquery, params)?;
746        let inner_table_def = self
747            .schema
748            .get_table(&inner_plan.table_name().into())
749            .ok_or_else(|| QueryError::TableNotFound(inner_plan.table_name().to_string()))?;
750        let inner_result = executor::execute(store, &inner_plan, inner_table_def)?;
751        let exists = !inner_result.rows.is_empty();
752        let predicate_holds = if negated { !exists } else { exists };
753        Ok(parser::Predicate::Always(predicate_holds))
754    }
755
756    /// Execute a SELECT whose predicate list still contains at least
757    /// one correlated subquery (InSubquery or Exists that survived
758    /// `pre_execute_subqueries`).
759    ///
760    /// Strategy: split the predicate list into "simple" (non-correlated)
761    /// predicates and "correlated" ones. Plan the outer query using
762    /// only the simple predicates. For each returned row, substitute
763    /// outer column values into each correlated inner subquery and
764    /// execute it; keep the row iff all correlated predicates pass.
765    fn execute_correlated_query<S: ProjectionStore>(
766        &self,
767        store: &mut S,
768        parsed: &parser::ParsedSelect,
769        params: &[Value],
770    ) -> Result<QueryResult> {
771        // Split simple vs. correlated predicates.
772        let mut simple_preds: Vec<parser::Predicate> = Vec::new();
773        let mut correlated_preds: Vec<parser::Predicate> = Vec::new();
774        for pred in &parsed.predicates {
775            match pred {
776                parser::Predicate::InSubquery { .. } | parser::Predicate::Exists { .. } => {
777                    correlated_preds.push(pred.clone());
778                }
779                other => simple_preds.push(other.clone()),
780            }
781        }
782
783        // Build the outer query stripped of correlated predicates — we
784        // need the FULL outer row (all columns) so we can substitute
785        // outer column values into each inner subquery, regardless of
786        // which columns the user SELECTed. We'll project to the
787        // requested columns after the row-by-row filter.
788        let outer_table_def = self
789            .schema
790            .get_table(&parsed.table.clone().into())
791            .ok_or_else(|| QueryError::TableNotFound(parsed.table.clone()))?;
792        let outer_scan = parser::ParsedSelect {
793            predicates: simple_preds,
794            columns: None, // force SELECT * so we have every column available
795            column_aliases: None,
796            order_by: Vec::new(),
797            limit: None,
798            offset: None,
799            aggregates: Vec::new(),
800            aggregate_filters: Vec::new(),
801            group_by: Vec::new(),
802            distinct: false,
803            having: Vec::new(),
804            ctes: Vec::new(),
805            window_fns: Vec::new(),
806            scalar_projections: Vec::new(),
807            case_columns: Vec::new(),
808            joins: Vec::new(),
809            ..parsed.clone()
810        };
811
812        let outer_plan = planner::plan_query(&self.schema, &outer_scan, params)?;
813        let outer_rows = executor::execute(store, &outer_plan, outer_table_def)?;
814
815        // Estimate correlated row-evaluation cost for cardinality guard.
816        //
817        // Inner cost per outer row is bounded by the total inner-table
818        // row count; we use the store's current table size as an upper
819        // bound. If multiple correlated predicates reference the same
820        // or different inner tables, sum the per-predicate cost.
821        let outer_count = outer_rows.rows.len() as u64;
822        let mut inner_cost_per_row: u64 = 0;
823        for pred in &correlated_preds {
824            let inner_table = match pred {
825                parser::Predicate::InSubquery { subquery, .. }
826                | parser::Predicate::Exists { subquery, .. } => &subquery.table,
827                _ => continue,
828            };
829            let inner_def = self
830                .schema
831                .get_table(&inner_table.clone().into())
832                .ok_or_else(|| QueryError::TableNotFound(inner_table.clone()))?;
833            // Upper-bound the inner cost by scanning the table once — we
834            // issue a bounded scan so this is cheap.
835            let pairs = store.scan(
836                inner_def.table_id,
837                kimberlite_store::Key::min()..kimberlite_store::Key::max(),
838                1_000_000,
839            )?;
840            inner_cost_per_row = inner_cost_per_row.saturating_add(pairs.len() as u64);
841        }
842        // When inner tables are empty, bound by 1 to keep estimation
843        // monotonic (so a 0 × N query doesn't look free).
844        let inner_cost_per_row = inner_cost_per_row.max(1);
845        let estimated = outer_count.saturating_mul(inner_cost_per_row);
846        if estimated > self.correlated_cap {
847            return Err(QueryError::CorrelatedCardinalityExceeded {
848                estimated,
849                cap: self.correlated_cap,
850            });
851        }
852
853        // For each outer row, evaluate the correlated predicates.
854        let outer_columns = outer_rows.columns.clone();
855        let outer_alias = parsed.table.clone();
856        let mut kept: Vec<Vec<Value>> = Vec::new();
857        for row in outer_rows.rows {
858            // Build the `"alias.column"` → Value binding map. We bind
859            // the FROM alias as it appears in the ParsedSelect (the
860            // parser already resolved user aliases into the `table`
861            // field when the alias shadows the table name).
862            let mut bindings = std::collections::HashMap::new();
863            for (col, val) in outer_columns.iter().zip(row.iter()) {
864                bindings.insert(format!("{outer_alias}.{col}"), val.clone());
865                // Also bind under every possible alias seen in the
866                // correlated predicates — the user may have written
867                // `p.id` while the parser stored the table name
868                // `patient_current`. We cover the common case by
869                // also binding the bare column name and any alias
870                // we can extract from the inner refs themselves.
871                bindings.insert(col.as_str().to_string(), val.clone());
872            }
873            // Extend bindings with each correlated-ref alias. Walking
874            // the correlated predicate trees once up-front is fine.
875            for pred in &correlated_preds {
876                let refs = correlated_predicate_outer_refs(pred);
877                for r in refs {
878                    let col_idx = outer_columns
879                        .iter()
880                        .position(|c| c.as_str() == r.column.as_str());
881                    if let Some(idx) = col_idx {
882                        if let Some(v) = row.get(idx) {
883                            bindings.insert(r.as_column_ref(), v.clone());
884                        }
885                    }
886                }
887            }
888
889            let mut all_pass = true;
890            for pred in &correlated_preds {
891                if !self.evaluate_correlated_predicate(store, pred, &bindings, params)? {
892                    all_pass = false;
893                    break;
894                }
895            }
896            if all_pass {
897                kept.push(row);
898            }
899        }
900
901        // Apply ORDER BY, LIMIT, OFFSET on the full rows before
902        // projecting to the user's requested column list.
903        // (Simple implementation: leverage the fact that we kept full
904        // rows. We construct a second plan that projects + orders +
905        // limits using a temporary store isn't worth it; do it inline.)
906        Self::post_process_correlated_result(parsed, params, outer_columns, kept)
907    }
908
909    /// Apply the outer query's projection / ORDER BY / LIMIT / OFFSET
910    /// to the rows surviving the correlated-predicate filter.
911    fn post_process_correlated_result(
912        parsed: &parser::ParsedSelect,
913        params: &[Value],
914        outer_columns: Vec<schema::ColumnName>,
915        mut rows: Vec<Vec<Value>>,
916    ) -> Result<QueryResult> {
917        // ORDER BY — bare-column only, resolved against outer_columns.
918        if !parsed.order_by.is_empty() {
919            let indices: Vec<(usize, bool)> = parsed
920                .order_by
921                .iter()
922                .map(|ob| {
923                    let idx = outer_columns
924                        .iter()
925                        .position(|c| c == &ob.column)
926                        .ok_or_else(|| QueryError::ColumnNotFound {
927                            table: parsed.table.clone(),
928                            column: ob.column.to_string(),
929                        })?;
930                    Ok::<_, QueryError>((idx, ob.ascending))
931                })
932                .collect::<Result<Vec<_>>>()?;
933            rows.sort_by(|a, b| {
934                for (idx, asc) in &indices {
935                    let ord = a
936                        .get(*idx)
937                        .and_then(|av| b.get(*idx).and_then(|bv| av.compare(bv)))
938                        .unwrap_or(std::cmp::Ordering::Equal);
939                    let ord = if *asc { ord } else { ord.reverse() };
940                    if ord != std::cmp::Ordering::Equal {
941                        return ord;
942                    }
943                }
944                std::cmp::Ordering::Equal
945            });
946        }
947
948        // OFFSET / LIMIT.
949        let offset = match parsed.offset {
950            Some(parser::LimitExpr::Literal(n)) => n,
951            Some(parser::LimitExpr::Param(idx)) => params
952                .get(idx.saturating_sub(1))
953                .and_then(|v| match v {
954                    Value::BigInt(n) if *n >= 0 => Some(*n as usize),
955                    Value::Integer(n) if *n >= 0 => Some(*n as usize),
956                    _ => None,
957                })
958                .unwrap_or(0),
959            None => 0,
960        };
961        let limit = match parsed.limit {
962            Some(parser::LimitExpr::Literal(n)) => Some(n),
963            Some(parser::LimitExpr::Param(idx)) => {
964                params.get(idx.saturating_sub(1)).and_then(|v| match v {
965                    Value::BigInt(n) if *n >= 0 => Some(*n as usize),
966                    Value::Integer(n) if *n >= 0 => Some(*n as usize),
967                    _ => None,
968                })
969            }
970            None => None,
971        };
972        if offset > 0 {
973            rows.drain(0..offset.min(rows.len()));
974        }
975        if let Some(l) = limit {
976            rows.truncate(l);
977        }
978
979        // Project to the requested column list.
980        let (out_columns, projected_rows) = match (&parsed.columns, &parsed.column_aliases) {
981            (None, _) => (outer_columns.clone(), rows),
982            (Some(cols), aliases) => {
983                let mut indices = Vec::with_capacity(cols.len());
984                let mut out_names: Vec<schema::ColumnName> = Vec::with_capacity(cols.len());
985                for (i, col) in cols.iter().enumerate() {
986                    let idx = outer_columns.iter().position(|c| c == col).ok_or_else(|| {
987                        QueryError::ColumnNotFound {
988                            table: parsed.table.clone(),
989                            column: col.to_string(),
990                        }
991                    })?;
992                    indices.push(idx);
993                    let alias = aliases
994                        .as_ref()
995                        .and_then(|a| a.get(i))
996                        .and_then(|a| a.as_ref());
997                    out_names.push(match alias {
998                        Some(a) => schema::ColumnName::new(a.clone()),
999                        None => col.clone(),
1000                    });
1001                }
1002                let projected: Vec<Vec<Value>> = rows
1003                    .into_iter()
1004                    .map(|r| indices.iter().map(|i| r[*i].clone()).collect())
1005                    .collect();
1006                (out_names, projected)
1007            }
1008        };
1009
1010        Ok(QueryResult {
1011            columns: out_columns,
1012            rows: projected_rows,
1013        })
1014    }
1015
1016    /// Evaluate a correlated `InSubquery` / `Exists` against one
1017    /// outer row (already baked into `bindings`). Returns true iff
1018    /// the predicate holds.
1019    fn evaluate_correlated_predicate<S: ProjectionStore>(
1020        &self,
1021        store: &mut S,
1022        pred: &parser::Predicate,
1023        bindings: &std::collections::HashMap<String, Value>,
1024        params: &[Value],
1025    ) -> Result<bool> {
1026        match pred {
1027            parser::Predicate::Exists { subquery, negated } => {
1028                let substituted = correlated::substitute_outer_refs(subquery, bindings);
1029                // The inner subquery may itself have nested subqueries;
1030                // run it through the full query engine path so nested
1031                // correlations (if any) are handled.
1032                let inner_result = self.execute_inner_subquery(store, &substituted, params)?;
1033                let exists = !inner_result.rows.is_empty();
1034                Ok(if *negated { !exists } else { exists })
1035            }
1036            parser::Predicate::InSubquery {
1037                column,
1038                subquery,
1039                negated,
1040            } => {
1041                let substituted = correlated::substitute_outer_refs(subquery, bindings);
1042                let inner_result = self.execute_inner_subquery(store, &substituted, params)?;
1043                if inner_result.columns.len() != 1 {
1044                    return Err(QueryError::UnsupportedFeature(format!(
1045                        "IN (SELECT ...) subquery must project exactly 1 column, got {}",
1046                        inner_result.columns.len()
1047                    )));
1048                }
1049                let outer_val = bindings
1050                    .get(column.as_str())
1051                    .or_else(|| bindings.values().next()) // defensive
1052                    .cloned();
1053                let Some(outer_val) = outer_val else {
1054                    return Ok(false);
1055                };
1056                let any_match = inner_result
1057                    .rows
1058                    .iter()
1059                    .any(|row| row.first().is_some_and(|v| v == &outer_val));
1060                Ok(if *negated { !any_match } else { any_match })
1061            }
1062            _ => Err(QueryError::UnsupportedFeature(
1063                "evaluate_correlated_predicate called on non-subquery predicate".to_string(),
1064            )),
1065        }
1066    }
1067
1068    /// Execute an inner subquery (with all outer refs already
1069    /// substituted). Delegates to `plan_query` + `executor::execute`.
1070    fn execute_inner_subquery<S: ProjectionStore>(
1071        &self,
1072        store: &mut S,
1073        inner: &parser::ParsedSelect,
1074        params: &[Value],
1075    ) -> Result<QueryResult> {
1076        // An inner subquery post-substitution might itself contain
1077        // nested correlations or uncorrelated subqueries. Run the
1078        // pre-execute pass once more to handle those cases.
1079        let mut inner_clone = inner.clone();
1080        self.pre_execute_subqueries(store, &mut inner_clone, params)?;
1081        if has_correlated_predicate(&inner_clone.predicates) {
1082            // v0.6.0 caps nesting at one correlated level — the
1083            // outer loop is already one nesting.
1084            return Err(QueryError::UnsupportedFeature(
1085                "nested correlated subqueries (depth > 1) are not supported in v0.6.0".to_string(),
1086            ));
1087        }
1088        let plan = planner::plan_query(&self.schema, &inner_clone, params)?;
1089        let table_def = self
1090            .schema
1091            .get_table(&plan.table_name().into())
1092            .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
1093        executor::execute(store, &plan, table_def)
1094    }
1095
1096    /// Executes a `UNION`, `INTERSECT`, or `EXCEPT` query (with or without `ALL`).
1097    ///
1098    /// Implementation: materialise both sides, then combine according to the
1099    /// operator. `ALL` keeps multiset semantics; the bare form (no `ALL`)
1100    /// deduplicates by row content. `Value` doesn't implement `Hash`, so the
1101    /// dedup/intersect/except keys use the debug format of each row — same
1102    /// trick already used by the prior UNION implementation.
1103    fn execute_union<S: ProjectionStore>(
1104        &self,
1105        store: &mut S,
1106        union_stmt: &parser::ParsedUnion,
1107        params: &[Value],
1108    ) -> Result<QueryResult> {
1109        // Plan and execute left side
1110        let left_plan = planner::plan_query(&self.schema, &union_stmt.left, params)?;
1111        let left_table_def = self
1112            .schema
1113            .get_table(&left_plan.table_name().into())
1114            .ok_or_else(|| QueryError::TableNotFound(left_plan.table_name().to_string()))?;
1115        let left_result = executor::execute(store, &left_plan, left_table_def)?;
1116
1117        // Plan and execute right side
1118        let right_plan = planner::plan_query(&self.schema, &union_stmt.right, params)?;
1119        let right_table_def = self
1120            .schema
1121            .get_table(&right_plan.table_name().into())
1122            .ok_or_else(|| QueryError::TableNotFound(right_plan.table_name().to_string()))?;
1123        let right_result = executor::execute(store, &right_plan, right_table_def)?;
1124
1125        // Use left side column names for the result
1126        let column_names = left_result.columns;
1127
1128        let row_key = |row: &Vec<Value>| format!("{row:?}");
1129
1130        let combined_rows: Vec<Vec<Value>> = match (union_stmt.op, union_stmt.all) {
1131            // UNION ALL: concatenate, keep duplicates
1132            (parser::SetOp::Union, true) => {
1133                let mut all_rows = left_result.rows;
1134                all_rows.extend(right_result.rows);
1135                all_rows
1136            }
1137            // UNION: concatenate then dedup
1138            (parser::SetOp::Union, false) => {
1139                let mut all_rows = left_result.rows;
1140                all_rows.extend(right_result.rows);
1141                let mut seen = std::collections::HashSet::new();
1142                all_rows.retain(|row| seen.insert(row_key(row)));
1143                all_rows
1144            }
1145            // INTERSECT: rows present in both sides (set semantics)
1146            (parser::SetOp::Intersect, false) => {
1147                let right_keys: std::collections::HashSet<String> =
1148                    right_result.rows.iter().map(&row_key).collect();
1149                let mut seen = std::collections::HashSet::new();
1150                left_result
1151                    .rows
1152                    .into_iter()
1153                    .filter(|row| {
1154                        let key = row_key(row);
1155                        right_keys.contains(&key) && seen.insert(key)
1156                    })
1157                    .collect()
1158            }
1159            // INTERSECT ALL: keep multiplicities — for each row appearing
1160            // min(left_count, right_count) times, emit that many copies.
1161            (parser::SetOp::Intersect, true) => {
1162                let mut right_counts: std::collections::HashMap<String, usize> =
1163                    std::collections::HashMap::new();
1164                for row in &right_result.rows {
1165                    *right_counts.entry(row_key(row)).or_insert(0) += 1;
1166                }
1167                let mut out = Vec::new();
1168                for row in left_result.rows {
1169                    let key = row_key(&row);
1170                    if let Some(count) = right_counts.get_mut(&key) {
1171                        if *count > 0 {
1172                            *count -= 1;
1173                            out.push(row);
1174                        }
1175                    }
1176                }
1177                out
1178            }
1179            // EXCEPT: rows in left side not in right side (set semantics)
1180            (parser::SetOp::Except, false) => {
1181                let right_keys: std::collections::HashSet<String> =
1182                    right_result.rows.iter().map(&row_key).collect();
1183                let mut seen = std::collections::HashSet::new();
1184                left_result
1185                    .rows
1186                    .into_iter()
1187                    .filter(|row| {
1188                        let key = row_key(row);
1189                        !right_keys.contains(&key) && seen.insert(key)
1190                    })
1191                    .collect()
1192            }
1193            // EXCEPT ALL: subtract multiplicities — left_count - right_count copies
1194            (parser::SetOp::Except, true) => {
1195                let mut right_counts: std::collections::HashMap<String, usize> =
1196                    std::collections::HashMap::new();
1197                for row in &right_result.rows {
1198                    *right_counts.entry(row_key(row)).or_insert(0) += 1;
1199                }
1200                let mut out = Vec::new();
1201                for row in left_result.rows {
1202                    let key = row_key(&row);
1203                    let count = right_counts.entry(key).or_insert(0);
1204                    if *count > 0 {
1205                        *count -= 1;
1206                    } else {
1207                        out.push(row);
1208                    }
1209                }
1210                out
1211            }
1212        };
1213
1214        Ok(QueryResult {
1215            columns: column_names,
1216            rows: combined_rows,
1217        })
1218    }
1219
1220    /// Executes a SQL query at a specific log position (point-in-time query).
1221    ///
1222    /// This enables compliance queries that show the state as it was
1223    /// at a specific point in the log.
1224    ///
1225    /// # Arguments
1226    ///
1227    /// * `store` - The projection store to query
1228    /// * `sql` - SQL query string
1229    /// * `params` - Query parameters
1230    /// * `position` - Log position to query at
1231    ///
1232    /// # Example
1233    ///
1234    /// ```ignore
1235    /// // Get user state as of log position 1000
1236    /// let result = engine.query_at(
1237    ///     &mut store,
1238    ///     "SELECT * FROM users WHERE id = 1",
1239    ///     &[],
1240    ///     Offset::new(1000),
1241    /// )?;
1242    /// ```
1243    pub fn query_at<S: ProjectionStore>(
1244        &self,
1245        store: &mut S,
1246        sql: &str,
1247        params: &[Value],
1248        position: Offset,
1249    ) -> Result<QueryResult> {
1250        let stmt = self.parse_query_statement_cached(sql)?;
1251
1252        match stmt {
1253            parser::ParsedStatement::Select(parsed) => {
1254                let plan = planner::plan_query(&self.schema, &parsed, params)?;
1255                let table_def = self
1256                    .schema
1257                    .get_table(&plan.table_name().into())
1258                    .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
1259                executor::execute_at(store, &plan, table_def, position)
1260            }
1261            parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
1262                "UNION is not supported in point-in-time queries".to_string(),
1263            )),
1264            _ => unreachable!("parse_query_statement only returns Select or Union"),
1265        }
1266    }
1267
1268    /// Executes a query against a historical snapshot selected by
1269    /// wall-clock timestamp (AUDIT-2026-04 L-4).
1270    ///
1271    /// This is the user-facing ergonomic form of
1272    /// [`Self::query_at`] — healthcare auditors ask "what did the
1273    /// chart look like on 2026-01-15?", not "what was log offset
1274    /// 948,274?". The caller supplies a `resolver` callback that
1275    /// translates a Unix-nanosecond timestamp into the log offset
1276    /// whose commit timestamp is the greatest value ≤ the target.
1277    ///
1278    /// The resolver is a callback rather than a hard dependency
1279    /// so the query crate does not take a direct dep on
1280    /// `kimberlite-compliance::audit` or the kernel's audit log.
1281    /// A typical impl performs a binary search on the in-memory
1282    /// audit index.
1283    ///
1284    /// # Errors
1285    ///
1286    /// - [`QueryError::UnsupportedFeature`] if the resolver
1287    ///   returns `None` (no offset exists at or before the target
1288    ///   — typically because the log is empty or the timestamp
1289    ///   predates genesis).
1290    ///
1291    /// # Example
1292    ///
1293    /// ```ignore
1294    /// let resolver = |ts_ns: i64| -> Option<Offset> {
1295    ///     audit_log.offset_at_or_before(ts_ns)
1296    /// };
1297    /// let result = engine.query_at_timestamp(
1298    ///     &mut store,
1299    ///     "SELECT * FROM charts WHERE patient_id = $1",
1300    ///     &[Value::BigInt(42)],
1301    ///     1_760_000_000_000_000_000, // 2025-10-09T07:06:40Z in ns
1302    ///     resolver,
1303    /// )?;
1304    /// ```
1305    pub fn query_at_timestamp<S, R>(
1306        &self,
1307        store: &mut S,
1308        sql: &str,
1309        params: &[Value],
1310        target_ns: i64,
1311        resolver: R,
1312    ) -> Result<QueryResult>
1313    where
1314        S: ProjectionStore,
1315        R: FnOnce(i64) -> Option<Offset>,
1316    {
1317        let offset = resolver(target_ns).ok_or_else(|| {
1318            QueryError::UnsupportedFeature(format!(
1319                "no log offset at or before timestamp {target_ns} ns \
1320                 (empty log or predates genesis)"
1321            ))
1322        })?;
1323        self.query_at(store, sql, params, offset)
1324    }
1325
1326    /// Executes a query against a historical snapshot selected by
1327    /// wall-clock timestamp, with a resolver that can distinguish
1328    /// the "timestamp predates retention horizon" case from a plain
1329    /// "no offset found".
1330    ///
1331    /// v0.6.0 Tier 2 #6: this is the runtime-layer variant of
1332    /// [`Self::query_at_timestamp`] used by `TenantHandle::query`
1333    /// when the resolver has a concrete notion of a retention
1334    /// horizon (e.g. an in-memory timestamp index maintained at
1335    /// append time). The existing `query_at_timestamp` stays as-is
1336    /// so callers with an `Option<Offset>` resolver (e.g. ad-hoc
1337    /// binary search over an external index) keep working.
1338    ///
1339    /// # Resolution semantics
1340    ///
1341    /// - `TimestampResolution::Offset(o)` → execute at `o`.
1342    /// - `TimestampResolution::BeforeRetentionHorizon { horizon_ns }` →
1343    ///   [`QueryError::AsOfBeforeRetentionHorizon`] with both the
1344    ///   requested and horizon timestamps.
1345    /// - `TimestampResolution::LogEmpty` →
1346    ///   [`QueryError::UnsupportedFeature`] (same message the
1347    ///   ergonomic form emits for an empty log).
1348    pub fn query_at_timestamp_resolved<S, R>(
1349        &self,
1350        store: &mut S,
1351        sql: &str,
1352        params: &[Value],
1353        target_ns: i64,
1354        resolver: R,
1355    ) -> Result<QueryResult>
1356    where
1357        S: ProjectionStore,
1358        R: FnOnce(i64) -> TimestampResolution,
1359    {
1360        match resolver(target_ns) {
1361            TimestampResolution::Offset(offset) => self.query_at(store, sql, params, offset),
1362            TimestampResolution::BeforeRetentionHorizon { horizon_ns } => {
1363                Err(QueryError::AsOfBeforeRetentionHorizon {
1364                    requested_ns: target_ns,
1365                    horizon_ns,
1366                })
1367            }
1368            TimestampResolution::LogEmpty => Err(QueryError::UnsupportedFeature(format!(
1369                "no log offset at or before timestamp {target_ns} ns \
1370                 (empty log or predates genesis)"
1371            ))),
1372        }
1373    }
1374
1375    /// AUDIT-2026-04 S3.3 — render a SQL query's access plan
1376    /// without executing it.
1377    ///
1378    /// Returns a deterministic multi-line tree string — same
1379    /// query always produces the same bytes, which lets apps
1380    /// diff plans across schema versions and catch unexpected
1381    /// regressions.
1382    ///
1383    /// The rendered plan **never reveals row data** — only table
1384    /// names, column counts, filter presence/absence, and LIMIT
1385    /// bounds. Masked column names render as their source name
1386    /// (masks are applied post-projection and are not a plan
1387    /// concern).
1388    ///
1389    /// # Errors
1390    ///
1391    /// Any error from parsing or planning (unsupported statement,
1392    /// missing table, etc.) propagates verbatim.
1393    ///
1394    /// # Example
1395    ///
1396    /// ```ignore
1397    /// let tree = engine.explain("SELECT * FROM patients WHERE id = $1", &[Value::BigInt(42)])?;
1398    /// println!("{tree}");
1399    /// // -> PointLookup [patients, cols=3]
1400    /// ```
1401    pub fn explain(&self, sql: &str, params: &[Value]) -> Result<String> {
1402        let stmt = self.parse_query_statement_cached(sql)?;
1403        match stmt {
1404            parser::ParsedStatement::Select(parsed) => {
1405                let plan = planner::plan_query(&self.schema, &parsed, params)?;
1406                Ok(explain::explain_plan(&plan))
1407            }
1408            parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
1409                "EXPLAIN does not yet render UNION plans".to_string(),
1410            )),
1411            _ => unreachable!("parse_query_statement only returns Select or Union"),
1412        }
1413    }
1414
1415    /// Parses a SQL query without executing it.
1416    ///
1417    /// Useful for validation or query plan inspection.
1418    pub fn prepare(&self, sql: &str, params: &[Value]) -> Result<PreparedQuery> {
1419        let stmt = self.parse_query_statement_cached(sql)?;
1420        let parsed = match stmt {
1421            parser::ParsedStatement::Select(s) => s,
1422            _ => {
1423                return Err(QueryError::UnsupportedFeature(
1424                    "only SELECT queries can be prepared".to_string(),
1425                ));
1426            }
1427        };
1428        let plan = planner::plan_query(&self.schema, &parsed, params)?;
1429
1430        Ok(PreparedQuery {
1431            plan,
1432            schema: self.schema.clone(),
1433        })
1434    }
1435}
1436
/// A prepared (planned) query ready for execution.
///
/// Produced by `prepare`. It pairs the query plan with a copy of the
/// schema taken at prepare time. `execute` / `execute_at` use that copy
/// to look up the definition of the plan's target table.
#[derive(Debug, Clone)]
pub struct PreparedQuery {
    /// The planned query, as returned by `planner::plan_query`.
    plan: plan::QueryPlan,
    /// Schema cloned from the engine when the query was prepared.
    /// Used to resolve the plan's table definition at execution time.
    schema: Schema,
}
1443
1444impl PreparedQuery {
1445    /// Executes this prepared query against the current store state.
1446    pub fn execute<S: ProjectionStore>(&self, store: &mut S) -> Result<QueryResult> {
1447        let table_def = self
1448            .schema
1449            .get_table(&self.plan.table_name().into())
1450            .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
1451
1452        executor::execute(store, &self.plan, table_def)
1453    }
1454
1455    /// Executes this prepared query at a specific log position.
1456    pub fn execute_at<S: ProjectionStore>(
1457        &self,
1458        store: &mut S,
1459        position: Offset,
1460    ) -> Result<QueryResult> {
1461        let table_def = self
1462            .schema
1463            .get_table(&self.plan.table_name().into())
1464            .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
1465
1466        executor::execute_at(store, &self.plan, table_def, position)
1467    }
1468
1469    /// Returns the column names this query will return.
1470    pub fn columns(&self) -> &[ColumnName] {
1471        self.plan.column_names()
1472    }
1473
1474    /// Returns the table name being queried.
1475    pub fn table_name(&self) -> &str {
1476        self.plan.table_name()
1477    }
1478}
1479
1480/// True iff any top-level predicate is a surviving correlated
1481/// subquery (`InSubquery` / `Exists`). Uncorrelated subqueries were
1482/// rewritten by `pre_execute_subqueries` before this is called.
1483fn has_correlated_predicate(predicates: &[parser::Predicate]) -> bool {
1484    predicates.iter().any(|p| {
1485        matches!(
1486            p,
1487            parser::Predicate::InSubquery { .. } | parser::Predicate::Exists { .. }
1488        )
1489    })
1490}
1491
1492/// Extract outer references from a surviving correlated predicate.
1493/// Used by the correlated-loop executor to populate the
1494/// `alias.column → Value` binding map for each outer row.
1495fn correlated_predicate_outer_refs(pred: &parser::Predicate) -> Vec<correlated::OuterRef> {
1496    let subquery = match pred {
1497        parser::Predicate::InSubquery { subquery, .. }
1498        | parser::Predicate::Exists { subquery, .. } => subquery,
1499        _ => return Vec::new(),
1500    };
1501    // Walk inner predicates gathering any qualified ColumnRef —
1502    // post-pre-execute, anything that remains as a ColumnRef with a
1503    // qualifier is an outer reference (the inner FROM has its own
1504    // bare-column refs).
1505    let mut out = Vec::new();
1506    for pred in &subquery.predicates {
1507        collect_refs_in_pred(pred, &mut out);
1508    }
1509    out
1510}
1511
1512fn collect_refs_in_pred(pred: &parser::Predicate, out: &mut Vec<correlated::OuterRef>) {
1513    let push_if_colref = |pv: &parser::PredicateValue, out: &mut Vec<correlated::OuterRef>| {
1514        if let parser::PredicateValue::ColumnRef(raw) = pv {
1515            if let Some((q, c)) = raw.split_once('.') {
1516                out.push(correlated::OuterRef {
1517                    qualifier: q.to_string(),
1518                    column: schema::ColumnName::new(c.to_string()),
1519                    scope_depth: 1,
1520                });
1521            }
1522        }
1523    };
1524    match pred {
1525        parser::Predicate::Eq(_, v)
1526        | parser::Predicate::Lt(_, v)
1527        | parser::Predicate::Le(_, v)
1528        | parser::Predicate::Gt(_, v)
1529        | parser::Predicate::Ge(_, v) => push_if_colref(v, out),
1530        parser::Predicate::In(_, vs) | parser::Predicate::NotIn(_, vs) => {
1531            for v in vs {
1532                push_if_colref(v, out);
1533            }
1534        }
1535        parser::Predicate::NotBetween(_, lo, hi) => {
1536            push_if_colref(lo, out);
1537            push_if_colref(hi, out);
1538        }
1539        parser::Predicate::Or(l, r) => {
1540            for p in l {
1541                collect_refs_in_pred(p, out);
1542            }
1543            for p in r {
1544                collect_refs_in_pred(p, out);
1545            }
1546        }
1547        _ => {}
1548    }
1549}
1550
1551/// Converts a Value to a serde_json::Value for CTE materialization.
1552fn value_to_json(val: &Value) -> serde_json::Value {
1553    // NEVER: Placeholder values must be bound before reaching the CTE /
1554    // query-result JSON boundary. An unbound Placeholder here indicates a
1555    // parameter-binding bug (AUDIT fix: placeholders must be resolved upstream).
1556    kimberlite_properties::never!(
1557        matches!(val, Value::Placeholder(_)),
1558        "query.placeholder_reaches_result_boundary",
1559        "Value::Placeholder must never reach query-result / JSON serialization boundary"
1560    );
1561    match val {
1562        Value::Null | Value::Placeholder(_) => serde_json::Value::Null,
1563        Value::BigInt(i) => serde_json::json!(i),
1564        Value::TinyInt(i) => serde_json::json!(i),
1565        Value::SmallInt(i) => serde_json::json!(i),
1566        Value::Integer(i) => serde_json::json!(i),
1567        Value::Real(f) => serde_json::json!(f),
1568        Value::Decimal(v, scale) => {
1569            // Format decimal: store the raw value and scale as a string
1570            if *scale == 0 {
1571                serde_json::json!(v.to_string())
1572            } else {
1573                let divisor = 10i128.pow(u32::from(*scale));
1574                let whole = v / divisor;
1575                let frac = (v % divisor).unsigned_abs();
1576                serde_json::json!(format!("{whole}.{frac:0>width$}", width = *scale as usize))
1577            }
1578        }
1579        Value::Text(s) => serde_json::json!(s),
1580        Value::Boolean(b) => serde_json::json!(b),
1581        Value::Date(d) => serde_json::json!(d),
1582        Value::Time(t) => serde_json::json!(t),
1583        Value::Timestamp(ts) => serde_json::json!(ts.as_nanos()),
1584        Value::Uuid(u) => {
1585            // Format UUID bytes as hex string
1586            let hex: String = u.iter().map(|b| format!("{b:02x}")).collect();
1587            serde_json::json!(hex)
1588        }
1589        Value::Json(j) => j.clone(),
1590        Value::Bytes(b) => {
1591            use base64::Engine;
1592            serde_json::json!(base64::engine::general_purpose::STANDARD.encode(b))
1593        }
1594    }
1595}