Skip to main content

kimberlite_query/
lib.rs

1//! # kmb-query: SQL query layer for `Kimberlite` projections
2//!
3//! This crate provides a minimal SQL query engine for compliance lookups
4//! against the projection store.
5//!
6//! ## SQL Subset
7//!
8//! Supported SQL features:
9//! - `SELECT` with column list or `*`
10//! - `FROM` single table or `JOIN` (`INNER`, `LEFT`, `RIGHT`, `FULL OUTER`,
11//!   `CROSS`, with `ON` or `USING(...)` clauses)
12//! - `WHERE` with comparison predicates (`=`, `<`, `>`, `<=`, `>=`, `IN`)
13//! - `IN (SELECT)`, `NOT IN (SELECT)`, `EXISTS`, `NOT EXISTS` — both
14//!   uncorrelated (pre-execute fast path) and correlated (semi-join
15//!   decorrelation or nested-loop fallback; see
16//!   `docs/reference/sql/correlated-subqueries.md`)
17//! - `ORDER BY` (ascending/descending)
18//! - `LIMIT` and `OFFSET` — literal or `$N` parameter
19//! - `GROUP BY` with aggregates (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`)
20//! - Aggregate `FILTER (WHERE ...)` clauses, independent per aggregate
21//! - `HAVING` with aggregate filtering
22//! - `UNION` / `UNION ALL` / `INTERSECT` / `INTERSECT ALL` /
23//!   `EXCEPT` / `EXCEPT ALL`
24//! - `DISTINCT`
25//! - JSON operators `->`, `->>`, `@>` in WHERE clauses
26//! - `CASE` (searched and simple form)
27//! - `WITH` (Common Table Expressions / CTEs), including `WITH RECURSIVE`
28//!   via iterative fixed-point evaluation (depth cap 1000)
29//! - Subqueries in FROM and JOIN (`SELECT * FROM (SELECT ...) AS t`)
30//! - Window functions (`OVER`, `PARTITION BY`, `ROW_NUMBER`, `RANK`,
31//!   `DENSE_RANK`, `LAG`, `LEAD`, `FIRST_VALUE`, `LAST_VALUE`)
32//! - `ALTER TABLE` (ADD COLUMN, DROP COLUMN) — parser only; kernel
33//!   execution pending
34//! - Parameterized queries (`$1`, `$2`, ...) in WHERE, LIMIT, OFFSET, and
35//!   DML values
36//!
37//! - Scalar functions in SELECT projection and WHERE predicates:
38//!   `UPPER`, `LOWER`, `LENGTH`, `TRIM`, `CONCAT`, `||`, `ABS`,
39//!   `ROUND`, `CEIL`/`CEILING`, `FLOOR`, `COALESCE`, `NULLIF`, `CAST`
40//! - `ILIKE`, `NOT LIKE`, `NOT ILIKE` pattern matching
41//! - `NOT IN (list)`, `NOT BETWEEN low AND high`
42//!
43//! Not yet supported:
44//! - Scalar subquery `WHERE col = (SELECT ...)`, `ANY`, `ALL`, `SOME`
45//! - Clock-dependent functions (`NOW()`, `CURRENT_DATE`, `EXTRACT`,
46//!   `DATE_TRUNC`) — deferred pending a clock-threading decision
47//! - `MOD`, `POWER`, `SQRT`, `SUBSTRING` — deferred
48//!
49//! ## Usage
50//!
51//! ```ignore
52//! use kimberlite_query::{QueryEngine, Schema, SchemaBuilder, ColumnDef, DataType, Value};
53//! use kimberlite_store::{BTreeStore, TableId};
54//!
55//! // Define schema
56//! let schema = SchemaBuilder::new()
57//!     .table(
58//!         "users",
59//!         TableId::new(1),
60//!         vec![
61//!             ColumnDef::new("id", DataType::BigInt).not_null(),
62//!             ColumnDef::new("name", DataType::Text).not_null(),
63//!         ],
64//!         vec!["id".into()],
65//!     )
66//!     .build();
67//!
68//! // Create engine
69//! let engine = QueryEngine::new(schema);
70//!
71//! // Execute query
72//! let mut store = BTreeStore::open("data/projections")?;
73//! let result = engine.query(&mut store, "SELECT * FROM users WHERE id = $1", &[Value::BigInt(42)])?;
74//! ```
75//!
76//! ## Point-in-Time Queries
77//!
78//! For compliance, you can query at a specific log position:
79//!
80//! ```ignore
81//! let result = engine.query_at(
82//!     &mut store,
83//!     "SELECT * FROM users WHERE id = 1",
84//!     &[],
85//!     Offset::new(1000),  // Query state as of log position 1000
86//! )?;
87//! ```
88//!
89//! ## Scalar expressions (v0.5.1)
90//!
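//! The parser accepts scalar functions in SELECT projection and WHERE
//! predicates, along with the related v0.5.1 additions shown below. Each
//! query parses into a `ParsedStatement::Select`; the comment above it
//! names the part of the parsed result it exercises (a `NotIn` or
//! `ScalarCmp` predicate, a preserved column alias, or an entry in
//! `scalar_projections`):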
95//!
96//! ```
97//! use kimberlite_query::{parse_statement, ParsedStatement, Predicate};
98//!
99//! // WHERE col NOT IN (list) — mirror of IN, v0.5.1.
100//! let s = parse_statement("SELECT id FROM t WHERE x NOT IN (1, 2, 3)").unwrap();
101//! let ParsedStatement::Select(sel) = s else { panic!() };
102//! assert!(matches!(sel.predicates[0], Predicate::NotIn(_, _)));
103//!
104//! // WHERE UPPER(name) = 'ALICE' — scalar LHS routes to ScalarCmp.
105//! let s = parse_statement("SELECT id FROM t WHERE UPPER(name) = 'ALICE'").unwrap();
106//! let ParsedStatement::Select(sel) = s else { panic!() };
107//! assert!(matches!(sel.predicates[0], Predicate::ScalarCmp { .. }));
108//!
109//! // SELECT col AS alias — alias preserved end-to-end.
110//! let s = parse_statement("SELECT name AS display FROM t").unwrap();
111//! let ParsedStatement::Select(sel) = s else { panic!() };
112//! let aliases = sel.column_aliases.as_ref().unwrap();
113//! assert_eq!(aliases[0].as_deref(), Some("display"));
114//!
115//! // SELECT CAST(x AS INTEGER) — lands in scalar_projections with a
116//! // synthesised output column name.
117//! let s = parse_statement("SELECT CAST(x AS INTEGER) FROM t").unwrap();
118//! let ParsedStatement::Select(sel) = s else { panic!() };
119//! assert_eq!(sel.scalar_projections.len(), 1);
120//! assert_eq!(sel.scalar_projections[0].output_name.as_str(), "cast");
121//! ```
122
123pub mod correlated;
124pub mod depth_check;
125pub mod dml_planner;
126mod error;
127mod executor;
128pub mod explain;
129pub mod expression;
130pub mod information_schema;
131pub mod key_encoder;
132mod parse_cache;
133mod parser;
134mod plan;
135mod planner;
136pub mod rbac_filter;
137mod schema;
138mod value;
139pub mod window;
140
141#[cfg(test)]
142mod tests;
143
144// Re-export public types
145pub use error::{QueryError, Result};
146pub use executor::{QueryResult, Row, execute};
147pub use expression::{EvalContext, ScalarExpr, evaluate};
148pub use parser::{
149    AlterTableOperation, HavingCondition, HavingOp, OnConflictAction, OnConflictClause,
150    ParsedAlterTable, ParsedAttachMaskingPolicy, ParsedColumn, ParsedCreateIndex, ParsedCreateMask,
151    ParsedCreateMaskingPolicy, ParsedCreateTable, ParsedCreateUser, ParsedCte, ParsedDelete,
152    ParsedDetachMaskingPolicy, ParsedGrant, ParsedInsert, ParsedMaskingStrategy, ParsedSelect,
153    ParsedSetClassification, ParsedStatement, ParsedUnion, ParsedUpdate, Predicate, PredicateValue,
154    ScalarCmpOp, TimeTravel, UpsertExpr, expr_to_scalar_expr, extract_at_offset,
155    extract_time_travel, parse_statement, try_parse_custom_statement,
156};
157pub use planner::plan_query;
158pub use schema::{
159    ColumnDef, ColumnName, DataType, IndexDef, Schema, SchemaBuilder, TableDef, TableName,
160};
161pub use value::Value;
162
163use kimberlite_store::ProjectionStore;
164use kimberlite_types::Offset;
165
/// Outcome returned by a timestamp→offset resolver.
///
/// v0.6.0 Tier 2 #6 — a resolver that owns its own index (the
/// runtime's in-memory timestamp index, an audit-log-backed index,
/// etc.) needs to report three outcomes that a bare `Option<Offset>`
/// cannot tell apart:
///
/// - An offset at or before the requested timestamp was found
///   ([`TimestampResolution::Offset`]).
/// - The log has entries, but the earliest one is *after* the requested
///   timestamp — i.e. the request predates the retention horizon
///   ([`TimestampResolution::BeforeRetentionHorizon`]). Keeping this
///   case distinct lets the query layer emit
///   [`QueryError::AsOfBeforeRetentionHorizon`] with the horizon
///   attached, which tells the caller what instant to retry with.
/// - The log is empty — no timestamps recorded yet
///   ([`TimestampResolution::LogEmpty`]).
///
/// Consumed by [`QueryEngine::query_at_timestamp_resolved`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampResolution {
    /// A projection offset whose commit timestamp is the greatest
    /// value ≤ the target timestamp.
    Offset(Offset),
    /// The resolver has entries, but its earliest commit timestamp is
    /// strictly greater than the target. `horizon_ns` is that earliest
    /// timestamp, so callers can tell users "the oldest retained data
    /// is `<horizon>`, try a later instant".
    BeforeRetentionHorizon { horizon_ns: i64 },
    /// The resolver has no entries at all (fresh DB, or the index has
    /// not been seeded yet). Observably the same as "predates genesis".
    /// NOTE(review): per the original design notes this is surfaced to
    /// callers as `UnsupportedFeature`; the consuming method is not in
    /// view here — confirm there.
    LogEmpty,
}
197
/// Query engine for executing SQL against a projection store.
///
/// Holds the schema used for planning, an optional parse cache
/// (AUDIT-2026-04 S3.4), and the correlated-subquery cardinality cap.
/// The engine is `Clone`; the parse cache sits behind an `Arc`, so
/// cloned handles share — and hit — the same memoised entries.
#[derive(Debug, Clone)]
pub struct QueryEngine {
    /// Schema every query is planned against.
    schema: Schema,
    /// Optional parse cache. `None` means every query is re-parsed.
    /// Wrapped in `Arc` so clones of the engine share one cache.
    parse_cache: Option<std::sync::Arc<parse_cache::ParseCache>>,
    /// Cap on `outer_rows × inner_rows_per_iter` for correlated
    /// subquery loops. See `docs/reference/sql/correlated-subqueries.md`.
    /// Defaults to [`correlated::DEFAULT_CORRELATED_CAP`] (10 million).
    correlated_cap: u64,
}
212
213impl QueryEngine {
214    /// Creates a new query engine with the given schema. No
215    /// parse cache is attached by default — use
216    /// [`Self::with_parse_cache`] to opt in.
217    pub fn new(schema: Schema) -> Self {
218        Self {
219            schema,
220            parse_cache: None,
221            correlated_cap: correlated::DEFAULT_CORRELATED_CAP,
222        }
223    }
224
225    /// Attach an LRU parse cache of the given size. `0` disables
226    /// caching (every call re-parses).
227    #[must_use]
228    pub fn with_parse_cache(mut self, max_size: usize) -> Self {
229        self.parse_cache = Some(std::sync::Arc::new(parse_cache::ParseCache::new(max_size)));
230        self
231    }
232
233    /// Override the correlated-subquery row-evaluation cap. Defaults
234    /// to 10,000,000. Queries whose estimated
235    /// `outer_rows × inner_rows_per_iter` exceeds this cap fail with
236    /// [`QueryError::CorrelatedCardinalityExceeded`] before the
237    /// correlated loop runs. Set to `u64::MAX` to effectively disable.
238    #[must_use]
239    pub fn with_correlated_cap(mut self, cap: u64) -> Self {
240        self.correlated_cap = cap;
241        self
242    }
243
244    /// Returns a snapshot of parse-cache stats, or `None` if no
245    /// cache is attached.
246    pub fn parse_cache_stats(&self) -> Option<parse_cache::ParseCacheStats> {
247        self.parse_cache
248            .as_deref()
249            .map(parse_cache::ParseCache::stats)
250    }
251
252    /// Clear the parse cache, if any.
253    pub fn clear_parse_cache(&self) {
254        if let Some(c) = &self.parse_cache {
255            c.clear();
256        }
257    }
258
259    /// Returns a reference to the schema.
260    pub fn schema(&self) -> &Schema {
261        &self.schema
262    }
263
264    /// Parses a SQL string and extracts the SELECT or UNION statement.
265    ///
266    /// Static variant — bypasses the parse cache. Used by call
267    /// sites that predate the cache and by internal recursive
268    /// parsers.
269    fn parse_query_statement(sql: &str) -> Result<parser::ParsedStatement> {
270        let stmt = parser::parse_statement(sql)?;
271        match &stmt {
272            parser::ParsedStatement::Select(_) | parser::ParsedStatement::Union(_) => Ok(stmt),
273            _ => Err(QueryError::UnsupportedFeature(
274                "only SELECT and UNION queries are supported".to_string(),
275            )),
276        }
277    }
278
279    /// Cache-aware parse wrapper.
280    ///
281    /// Looks the SQL up in the parse cache (if attached). On
282    /// miss, parses via [`Self::parse_query_statement`] and
283    /// inserts into the cache. Non-SELECT/UNION errors are
284    /// returned directly without populating the cache — they're
285    /// errors for every subsequent call anyway and we don't want
286    /// to memoise them.
287    fn parse_query_statement_cached(&self, sql: &str) -> Result<parser::ParsedStatement> {
288        if let Some(cache) = &self.parse_cache {
289            if let Some(stmt) = cache.get(sql) {
290                return Ok(stmt);
291            }
292        }
293        let stmt = Self::parse_query_statement(sql)?;
294        if let Some(cache) = &self.parse_cache {
295            cache.insert(sql.to_string(), stmt.clone());
296        }
297        Ok(stmt)
298    }
299
    /// Executes a SQL query against the current store state.
    ///
    /// Supports SELECT and UNION/UNION ALL queries. Before the statement
    /// is parsed, the SQL is handled in this order:
    ///
    /// 1. A `WITH BREAK_GLASS REASON='...'` prefix is stripped and the
    ///    reason is logged at warn level; execution then continues
    ///    normally.
    /// 2. If `information_schema::maybe_answer` recognises the query, its
    ///    synthesised result is returned as-is.
    /// 3. An `EXPLAIN` prefix returns a one-row result with a single
    ///    `plan` column holding the rendered plan, without executing.
    /// 4. An offset time-travel clause is routed to [`Self::query_at`].
    ///    A timestamp time-travel clause is rejected, because this entry
    ///    point has no timestamp→offset resolver.
    ///
    /// A SELECT then has its subqueries pre-processed. It is dispatched
    /// to one of three paths: the correlated-loop executor (when a
    /// correlated predicate survives), the CTE path (when `WITH` clauses
    /// are present), or the plain planner/executor. Window functions are
    /// applied as a final pass.
    ///
    /// # Arguments
    ///
    /// * `store` - The projection store to query
    /// * `sql` - SQL query string
    /// * `params` - Query parameters (for `$1`, `$2`, etc.)
    ///
    /// # Errors
    ///
    /// Returns [`QueryError::UnsupportedFeature`] for statements other
    /// than SELECT/UNION and for timestamp time-travel, and
    /// [`QueryError::TableNotFound`] when a plan names an unknown table.
    /// It also propagates parse, planning, execution, and store errors.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let result = engine.query(
    ///     &mut store,
    ///     "SELECT name FROM users WHERE id = $1",
    ///     &[Value::BigInt(42)],
    /// )?;
    /// ```
    pub fn query<S: ProjectionStore>(
        &self,
        store: &mut S,
        sql: &str,
        params: &[Value],
    ) -> Result<QueryResult> {
        // AUDIT-2026-04 S3.5 — healthcare BREAK_GLASS prefix.
        // `WITH BREAK_GLASS REASON='...' SELECT ...` strips the
        // prefix, emits a warn-level structured log for the
        // caller's audit pipeline to pick up, and falls through
        // to the normal query path. The reason is attribution
        // only — enforcement (RBAC + masking) is still applied.
        let (after_break_glass, break_glass_reason) = explain::extract_break_glass(sql);
        if let Some(ref reason) = break_glass_reason {
            tracing::warn!(
                break_glass_reason = %reason,
                "BREAK_GLASS query — regulator-visible audit signal",
            );
        }
        let sql = after_break_glass;

        // AUDIT-2026-04 S3.4 — `information_schema.*` virtual-table
        // interception. Results are synthesised from the live schema
        // without going through the planner/executor. Queries whose
        // FROM targets `information_schema.tables` or `.columns` get
        // schema-introspection rows back, and the table does not need
        // to be registered in the store.
        if let Some(result) = information_schema::maybe_answer(sql, &self.schema) {
            return Ok(result);
        }

        // AUDIT-2026-04 S3.3 — EXPLAIN prefix dispatch. A caller
        // issuing `EXPLAIN SELECT ...` gets a single-row result
        // whose only value is the rendered plan tree, instead of
        // executing the statement.
        let (after_explain, is_explain) = explain::extract_explain(sql);
        if is_explain {
            let plan_text = self.explain(after_explain, params)?;
            return Ok(executor::QueryResult {
                columns: vec!["plan".into()],
                rows: vec![vec![Value::Text(plan_text)]],
            });
        }
        let sql = after_explain; // equivalent to the incoming `sql` when EXPLAIN is absent

        // Extract the time-travel clause (AT OFFSET / FOR SYSTEM_TIME AS OF /
        // AS OF) before the SQL reaches sqlparser. Offset syntax is
        // dispatched directly to `query_at`. Timestamp syntax errors out,
        // with a message pointing to `query_at_timestamp(..., resolver)`.
        let (cleaned_sql, time_travel) = parser::extract_time_travel(sql);
        match time_travel {
            Some(parser::TimeTravel::Offset(o)) => {
                return self.query_at(store, &cleaned_sql, params, Offset::new(o));
            }
            Some(parser::TimeTravel::TimestampNs(_)) => {
                return Err(QueryError::UnsupportedFeature(
                    "FOR SYSTEM_TIME AS OF '<iso>' / AS OF '<iso>' \
                     requires a timestamp→offset resolver — use \
                     QueryEngine::query_at_timestamp(..., resolver)"
                        .to_string(),
                ));
            }
            None => {}
        }

        let stmt = self.parse_query_statement_cached(sql)?;

        match stmt {
            parser::ParsedStatement::Select(mut parsed) => {
                // Pre-execute uncorrelated subqueries (IN/EXISTS/NOT EXISTS),
                // attempt semi-join decorrelation of correlated ones, and
                // leave the remaining correlated predicates in place for the
                // correlated-loop executor. `pre_execute_subqueries` builds
                // the enclosing scope from this SELECT's FROM and JOIN
                // tables, so it can detect outer references.
                self.pre_execute_subqueries(store, &mut parsed, params)?;

                // Cloned up front: the window pass runs after `parsed` has
                // been lent to the execution paths below.
                let window_fns = parsed.window_fns.clone();
                let result = if parsed.ctes.is_empty() {
                    if has_correlated_predicate(&parsed.predicates) {
                        self.execute_correlated_query(store, &parsed, params)?
                    } else {
                        let plan = planner::plan_query(&self.schema, &parsed, params)?;
                        let table_def = self
                            .schema
                            .get_table(&plan.table_name().into())
                            .ok_or_else(|| {
                                QueryError::TableNotFound(plan.table_name().to_string())
                            })?;
                        executor::execute(store, &plan, table_def)?
                    }
                } else {
                    self.execute_with_ctes(store, &parsed, params)?
                };
                // AUDIT-2026-04 S3.2 — window functions are a
                // post-pass over the base SELECT result; the base
                // plan already projected the columns the window
                // fn references.
                window::apply_window_fns(result, &window_fns)
            }
            parser::ParsedStatement::Union(union_stmt) => {
                self.execute_union(store, &union_stmt, params)
            }
            // The cached parser only ever stores/returns Select or Union.
            _ => unreachable!("parse_query_statement only returns Select or Union"),
        }
    }
429
430    /// Executes a SELECT with CTEs by materializing each CTE and building
431    /// a temporary schema that includes the CTE result sets as tables.
432    fn execute_with_ctes<S: ProjectionStore>(
433        &self,
434        store: &mut S,
435        parsed: &parser::ParsedSelect,
436        params: &[Value],
437    ) -> Result<QueryResult> {
438        // Build an extended schema that includes CTE-derived tables
439        let mut extended_schema = self.schema.clone();
440
441        // Materialize each CTE
442        for cte in &parsed.ctes {
443            // Execute the CTE's anchor (non-recursive) query
444            let cte_plan = planner::plan_query(&extended_schema, &cte.query, params)?;
445            let cte_table_def = extended_schema
446                .get_table(&cte_plan.table_name().into())
447                .ok_or_else(|| QueryError::TableNotFound(cte_plan.table_name().to_string()))?;
448            let mut cte_result = executor::execute(store, &cte_plan, cte_table_def)?;
449
450            // Register CTE result as a virtual table in the extended schema
451            // Use a synthetic table ID based on the CTE name hash
452            let cte_table_id = {
453                use std::collections::hash_map::DefaultHasher;
454                use std::hash::{Hash, Hasher};
455                let mut hasher = DefaultHasher::new();
456                cte.name.hash(&mut hasher);
457                kimberlite_store::TableId::new(hasher.finish())
458            };
459
460            // Build column defs from the CTE result
461            let cte_columns: Vec<schema::ColumnDef> = cte_result
462                .columns
463                .iter()
464                .map(|col| schema::ColumnDef::new(col.as_str(), schema::DataType::Text))
465                .collect();
466
467            let pk_cols = if cte_columns.is_empty() {
468                vec![]
469            } else {
470                vec![cte_result.columns[0].clone()]
471            };
472
473            let cte_table = schema::TableDef::new(cte_table_id, cte_columns, pk_cols);
474            extended_schema.add_table(cte.name.as_str(), cte_table);
475
476            // Write anchor rows into the store as the initial working set.
477            let mut total_rows = cte_result.rows.len();
478            for (row_idx, row) in cte_result.rows.iter().enumerate() {
479                Self::write_cte_row(store, cte_table_id, row_idx, &cte_result.columns, row)?;
480            }
481
482            // Recursive iteration: keep evaluating the recursive arm against
483            // the growing CTE table until no new rows are produced or the
484            // depth cap is hit. Iterative fixed-point — honours the
485            // workspace "no recursion" constraint and prevents runaway loops.
486            if let Some(recursive_select) = &cte.recursive_arm {
487                const MAX_RECURSIVE_DEPTH: usize = 1000;
488                let mut seen: std::collections::HashSet<String> =
489                    cte_result.rows.iter().map(|r| format!("{r:?}")).collect();
490
491                for depth in 0..MAX_RECURSIVE_DEPTH {
492                    let recursive_plan =
493                        planner::plan_query(&extended_schema, recursive_select, params)?;
494                    let recursive_table_def = extended_schema
495                        .get_table(&recursive_plan.table_name().into())
496                        .ok_or_else(|| {
497                            QueryError::TableNotFound(recursive_plan.table_name().to_string())
498                        })?;
499                    let iteration_result =
500                        executor::execute(store, &recursive_plan, recursive_table_def)?;
501
502                    let mut new_rows = 0usize;
503                    for row in iteration_result.rows {
504                        let key = format!("{row:?}");
505                        if seen.insert(key) {
506                            Self::write_cte_row(
507                                store,
508                                cte_table_id,
509                                total_rows,
510                                &cte_result.columns,
511                                &row,
512                            )?;
513                            cte_result.rows.push(row);
514                            total_rows += 1;
515                            new_rows += 1;
516                        }
517                    }
518                    if new_rows == 0 {
519                        break;
520                    }
521                    if depth + 1 == MAX_RECURSIVE_DEPTH {
522                        return Err(QueryError::UnsupportedFeature(format!(
523                            "recursive CTE `{}` exceeded maximum depth of {MAX_RECURSIVE_DEPTH} iterations",
524                            cte.name
525                        )));
526                    }
527                }
528            }
529        }
530
531        // Execute the main query against the extended schema
532        let main_query = parser::ParsedSelect {
533            ctes: vec![], // CTEs already materialized
534            ..parsed.clone()
535        };
536
537        let plan = planner::plan_query(&extended_schema, &main_query, params)?;
538        let table_def = extended_schema
539            .get_table(&plan.table_name().into())
540            .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
541        executor::execute(store, &plan, table_def)
542    }
543
544    /// Writes a single CTE row into the store under the synthetic table id.
545    /// Helper extracted so the recursive-CTE iteration loop and the initial
546    /// anchor materialisation share the same path.
547    fn write_cte_row<S: ProjectionStore>(
548        store: &mut S,
549        cte_table_id: kimberlite_store::TableId,
550        row_idx: usize,
551        columns: &[crate::schema::ColumnName],
552        row: &[Value],
553    ) -> Result<()> {
554        let mut row_map = serde_json::Map::new();
555        for (col, val) in columns.iter().zip(row.iter()) {
556            row_map.insert(col.as_str().to_string(), value_to_json(val));
557        }
558        let json_val = serde_json::to_vec(&serde_json::Value::Object(row_map)).map_err(|e| {
559            QueryError::UnsupportedFeature(format!("CTE serialization failed: {e}"))
560        })?;
561        let pk_key = crate::key_encoder::encode_key(&[Value::BigInt(row_idx as i64)]);
562        let batch = kimberlite_store::WriteBatch::new(kimberlite_types::Offset::new(
563            store.applied_position().as_u64() + 1,
564        ))
565        .put(cte_table_id, pk_key, bytes::Bytes::from(json_val));
566        store.apply(batch)?;
567        Ok(())
568    }
569
570    /// Walks the outer SELECT's predicate tree, classifies each
571    /// subquery as uncorrelated or correlated, and rewrites as follows:
572    ///
573    /// - **Uncorrelated** `IN (SELECT)` / `EXISTS` / `NOT EXISTS` →
574    ///   pre-executed once; result substituted into `Predicate::In` /
575    ///   `Predicate::NotIn` / `Predicate::Always(bool)`. Matches the
576    ///   v0.5.0 fast path.
577    /// - **Correlated** `EXISTS` / `NOT EXISTS` with a single equijoin
578    ///   to the outer scope: rewritten by
579    ///   [`correlated::try_semi_join_rewrite`] into
580    ///   `Predicate::InSubquery { negated }` against the outer column,
581    ///   then the uncorrelated path pre-executes it.
582    /// - **Correlated** everything else: left in place for
583    ///   [`Self::execute_correlated_query`] to handle per-outer-row.
584    ///
585    /// Assertion: on return, every `InSubquery` / `Exists` still in
586    /// `parsed.predicates` is correlated. The caller's
587    /// `has_correlated_predicate` check uses that invariant to
588    /// dispatch.
589    fn pre_execute_subqueries<S: ProjectionStore>(
590        &self,
591        store: &mut S,
592        parsed: &mut parser::ParsedSelect,
593        params: &[Value],
594    ) -> Result<()> {
595        // Build the outer scope — the outer SELECT's FROM table (plus
596        // any JOIN tables) as the enclosing scope for each inner
597        // subquery.
598        let outer_scope = self.build_outer_scope(parsed);
599
600        let preds = std::mem::take(&mut parsed.predicates);
601        let mut out = Vec::with_capacity(preds.len());
602        for pred in preds {
603            out.push(self.classify_and_rewrite_predicate(store, pred, &outer_scope, params)?);
604        }
605        parsed.predicates = out;
606        Ok(())
607    }
608
609    /// Construct the outer `PlannerScope` for `parsed` — the visible
610    /// tables in the outer SELECT. FROM first, then any JOIN tables.
611    fn build_outer_scope<'s>(
612        &'s self,
613        parsed: &parser::ParsedSelect,
614    ) -> correlated::PlannerScope<'s> {
615        let mut bindings: Vec<(String, &schema::TableDef)> = Vec::new();
616        if let Some(t) = self.schema.get_table(&parsed.table.clone().into()) {
617            bindings.push((parsed.table.clone(), t));
618        }
619        for join in &parsed.joins {
620            if let Some(t) = self.schema.get_table(&join.table.clone().into()) {
621                bindings.push((join.table.clone(), t));
622            }
623        }
624        correlated::PlannerScope::empty().push(bindings)
625    }
626
627    /// Classifier for a single predicate — drives pre-execute vs.
628    /// semi-join rewrite vs. leave-as-correlated. Recursive on `Or`.
629    fn classify_and_rewrite_predicate<S: ProjectionStore>(
630        &self,
631        store: &mut S,
632        pred: parser::Predicate,
633        outer_scope: &correlated::PlannerScope<'_>,
634        params: &[Value],
635    ) -> Result<parser::Predicate> {
636        match pred {
637            parser::Predicate::InSubquery {
638                column,
639                subquery,
640                negated,
641            } => {
642                let outer_refs =
643                    correlated::collect_outer_refs(&subquery, outer_scope, &self.schema);
644                if outer_refs.is_empty() {
645                    // Uncorrelated — pre-execute and substitute.
646                    self.pre_execute_uncorrelated_in(store, &column, &subquery, negated, params)
647                } else {
648                    // Correlated IN/NOT IN — keep the predicate
649                    // in place; the correlated-loop executor handles it.
650                    Ok(parser::Predicate::InSubquery {
651                        column,
652                        subquery,
653                        negated,
654                    })
655                }
656            }
657            parser::Predicate::Exists { subquery, negated } => {
658                let outer_refs =
659                    correlated::collect_outer_refs(&subquery, outer_scope, &self.schema);
660                if outer_refs.is_empty() {
661                    // Uncorrelated.
662                    self.pre_execute_uncorrelated_exists(store, &subquery, negated, params)
663                } else if let Some((outer_col, rewritten)) =
664                    correlated::try_semi_join_rewrite(&subquery, negated, &outer_refs)
665                {
666                    // Decorrelated: rewrite as IN (SELECT) / NOT IN (SELECT)
667                    // against the outer column, then pre-execute.
668                    self.pre_execute_uncorrelated_in(store, &outer_col, &rewritten, negated, params)
669                } else {
670                    // Correlated loop fallback.
671                    Ok(parser::Predicate::Exists { subquery, negated })
672                }
673            }
674            parser::Predicate::Or(left, right) => {
675                let mut new_left = Vec::with_capacity(left.len());
676                for p in left {
677                    new_left.push(self.classify_and_rewrite_predicate(
678                        store,
679                        p,
680                        outer_scope,
681                        params,
682                    )?);
683                }
684                let mut new_right = Vec::with_capacity(right.len());
685                for p in right {
686                    new_right.push(self.classify_and_rewrite_predicate(
687                        store,
688                        p,
689                        outer_scope,
690                        params,
691                    )?);
692                }
693                Ok(parser::Predicate::Or(new_left, new_right))
694            }
695            other => Ok(other),
696        }
697    }
698
699    /// Pre-execute an uncorrelated `IN (SELECT)` / `NOT IN (SELECT)`
700    /// and return the substituted `Predicate::In` / `Predicate::NotIn`.
701    fn pre_execute_uncorrelated_in<S: ProjectionStore>(
702        &self,
703        store: &mut S,
704        column: &schema::ColumnName,
705        subquery: &parser::ParsedSelect,
706        negated: bool,
707        params: &[Value],
708    ) -> Result<parser::Predicate> {
709        let inner_plan = planner::plan_query(&self.schema, subquery, params)?;
710        let inner_table_def = self
711            .schema
712            .get_table(&inner_plan.table_name().into())
713            .ok_or_else(|| QueryError::TableNotFound(inner_plan.table_name().to_string()))?;
714        let inner_result = executor::execute(store, &inner_plan, inner_table_def)?;
715        if inner_result.columns.len() != 1 {
716            return Err(QueryError::UnsupportedFeature(format!(
717                "IN (SELECT ...) subquery must project exactly 1 column, got {}",
718                inner_result.columns.len()
719            )));
720        }
721        let values: Vec<parser::PredicateValue> = inner_result
722            .rows
723            .into_iter()
724            .filter_map(|row| row.into_iter().next())
725            .map(parser::PredicateValue::Literal)
726            .collect();
727        Ok(if negated {
728            parser::Predicate::NotIn(column.clone(), values)
729        } else {
730            parser::Predicate::In(column.clone(), values)
731        })
732    }
733
734    /// Pre-execute an uncorrelated `EXISTS` / `NOT EXISTS` and return
735    /// the collapsed `Predicate::Always(bool)`.
736    fn pre_execute_uncorrelated_exists<S: ProjectionStore>(
737        &self,
738        store: &mut S,
739        subquery: &parser::ParsedSelect,
740        negated: bool,
741        params: &[Value],
742    ) -> Result<parser::Predicate> {
743        let inner_plan = planner::plan_query(&self.schema, subquery, params)?;
744        let inner_table_def = self
745            .schema
746            .get_table(&inner_plan.table_name().into())
747            .ok_or_else(|| QueryError::TableNotFound(inner_plan.table_name().to_string()))?;
748        let inner_result = executor::execute(store, &inner_plan, inner_table_def)?;
749        let exists = !inner_result.rows.is_empty();
750        let predicate_holds = if negated { !exists } else { exists };
751        Ok(parser::Predicate::Always(predicate_holds))
752    }
753
754    /// Execute a SELECT whose predicate list still contains at least
755    /// one correlated subquery (InSubquery or Exists that survived
756    /// `pre_execute_subqueries`).
757    ///
758    /// Strategy: split the predicate list into "simple" (non-correlated)
759    /// predicates and "correlated" ones. Plan the outer query using
760    /// only the simple predicates. For each returned row, substitute
761    /// outer column values into each correlated inner subquery and
762    /// execute it; keep the row iff all correlated predicates pass.
763    fn execute_correlated_query<S: ProjectionStore>(
764        &self,
765        store: &mut S,
766        parsed: &parser::ParsedSelect,
767        params: &[Value],
768    ) -> Result<QueryResult> {
769        // Split simple vs. correlated predicates.
770        let mut simple_preds: Vec<parser::Predicate> = Vec::new();
771        let mut correlated_preds: Vec<parser::Predicate> = Vec::new();
772        for pred in &parsed.predicates {
773            match pred {
774                parser::Predicate::InSubquery { .. } | parser::Predicate::Exists { .. } => {
775                    correlated_preds.push(pred.clone());
776                }
777                other => simple_preds.push(other.clone()),
778            }
779        }
780
781        // Build the outer query stripped of correlated predicates — we
782        // need the FULL outer row (all columns) so we can substitute
783        // outer column values into each inner subquery, regardless of
784        // which columns the user SELECTed. We'll project to the
785        // requested columns after the row-by-row filter.
786        let outer_table_def = self
787            .schema
788            .get_table(&parsed.table.clone().into())
789            .ok_or_else(|| QueryError::TableNotFound(parsed.table.clone()))?;
790        let outer_scan = parser::ParsedSelect {
791            predicates: simple_preds,
792            columns: None, // force SELECT * so we have every column available
793            column_aliases: None,
794            order_by: Vec::new(),
795            limit: None,
796            offset: None,
797            aggregates: Vec::new(),
798            aggregate_filters: Vec::new(),
799            group_by: Vec::new(),
800            distinct: false,
801            having: Vec::new(),
802            ctes: Vec::new(),
803            window_fns: Vec::new(),
804            scalar_projections: Vec::new(),
805            case_columns: Vec::new(),
806            joins: Vec::new(),
807            ..parsed.clone()
808        };
809
810        let outer_plan = planner::plan_query(&self.schema, &outer_scan, params)?;
811        let outer_rows = executor::execute(store, &outer_plan, outer_table_def)?;
812
813        // Estimate correlated row-evaluation cost for cardinality guard.
814        //
815        // Inner cost per outer row is bounded by the total inner-table
816        // row count; we use the store's current table size as an upper
817        // bound. If multiple correlated predicates reference the same
818        // or different inner tables, sum the per-predicate cost.
819        let outer_count = outer_rows.rows.len() as u64;
820        let mut inner_cost_per_row: u64 = 0;
821        for pred in &correlated_preds {
822            let inner_table = match pred {
823                parser::Predicate::InSubquery { subquery, .. }
824                | parser::Predicate::Exists { subquery, .. } => &subquery.table,
825                _ => continue,
826            };
827            let inner_def = self
828                .schema
829                .get_table(&inner_table.clone().into())
830                .ok_or_else(|| QueryError::TableNotFound(inner_table.clone()))?;
831            // Upper-bound the inner cost by scanning the table once — we
832            // issue a bounded scan so this is cheap.
833            let pairs = store.scan(
834                inner_def.table_id,
835                kimberlite_store::Key::min()..kimberlite_store::Key::max(),
836                1_000_000,
837            )?;
838            inner_cost_per_row = inner_cost_per_row.saturating_add(pairs.len() as u64);
839        }
840        // When inner tables are empty, bound by 1 to keep estimation
841        // monotonic (so a 0 × N query doesn't look free).
842        let inner_cost_per_row = inner_cost_per_row.max(1);
843        let estimated = outer_count.saturating_mul(inner_cost_per_row);
844        if estimated > self.correlated_cap {
845            return Err(QueryError::CorrelatedCardinalityExceeded {
846                estimated,
847                cap: self.correlated_cap,
848            });
849        }
850
851        // For each outer row, evaluate the correlated predicates.
852        let outer_columns = outer_rows.columns.clone();
853        let outer_alias = parsed.table.clone();
854        let mut kept: Vec<Vec<Value>> = Vec::new();
855        for row in outer_rows.rows {
856            // Build the `"alias.column"` → Value binding map. We bind
857            // the FROM alias as it appears in the ParsedSelect (the
858            // parser already resolved user aliases into the `table`
859            // field when the alias shadows the table name).
860            let mut bindings = std::collections::HashMap::new();
861            for (col, val) in outer_columns.iter().zip(row.iter()) {
862                bindings.insert(format!("{outer_alias}.{col}"), val.clone());
863                // Also bind under every possible alias seen in the
864                // correlated predicates — the user may have written
865                // `p.id` while the parser stored the table name
866                // `patient_current`. We cover the common case by
867                // also binding the bare column name and any alias
868                // we can extract from the inner refs themselves.
869                bindings.insert(col.as_str().to_string(), val.clone());
870            }
871            // Extend bindings with each correlated-ref alias. Walking
872            // the correlated predicate trees once up-front is fine.
873            for pred in &correlated_preds {
874                let refs = correlated_predicate_outer_refs(pred);
875                for r in refs {
876                    let col_idx = outer_columns
877                        .iter()
878                        .position(|c| c.as_str() == r.column.as_str());
879                    if let Some(idx) = col_idx {
880                        if let Some(v) = row.get(idx) {
881                            bindings.insert(r.as_column_ref(), v.clone());
882                        }
883                    }
884                }
885            }
886
887            let mut all_pass = true;
888            for pred in &correlated_preds {
889                if !self.evaluate_correlated_predicate(store, pred, &bindings, params)? {
890                    all_pass = false;
891                    break;
892                }
893            }
894            if all_pass {
895                kept.push(row);
896            }
897        }
898
899        // Apply ORDER BY, LIMIT, OFFSET on the full rows before
900        // projecting to the user's requested column list.
901        // (Simple implementation: leverage the fact that we kept full
902        // rows. We construct a second plan that projects + orders +
903        // limits using a temporary store isn't worth it; do it inline.)
904        Self::post_process_correlated_result(parsed, params, outer_columns, kept)
905    }
906
907    /// Apply the outer query's projection / ORDER BY / LIMIT / OFFSET
908    /// to the rows surviving the correlated-predicate filter.
909    fn post_process_correlated_result(
910        parsed: &parser::ParsedSelect,
911        params: &[Value],
912        outer_columns: Vec<schema::ColumnName>,
913        mut rows: Vec<Vec<Value>>,
914    ) -> Result<QueryResult> {
915        // ORDER BY — bare-column only, resolved against outer_columns.
916        if !parsed.order_by.is_empty() {
917            let indices: Vec<(usize, bool)> = parsed
918                .order_by
919                .iter()
920                .map(|ob| {
921                    let idx = outer_columns
922                        .iter()
923                        .position(|c| c == &ob.column)
924                        .ok_or_else(|| QueryError::ColumnNotFound {
925                            table: parsed.table.clone(),
926                            column: ob.column.to_string(),
927                        })?;
928                    Ok::<_, QueryError>((idx, ob.ascending))
929                })
930                .collect::<Result<Vec<_>>>()?;
931            rows.sort_by(|a, b| {
932                for (idx, asc) in &indices {
933                    let ord = a
934                        .get(*idx)
935                        .and_then(|av| b.get(*idx).and_then(|bv| av.compare(bv)))
936                        .unwrap_or(std::cmp::Ordering::Equal);
937                    let ord = if *asc { ord } else { ord.reverse() };
938                    if ord != std::cmp::Ordering::Equal {
939                        return ord;
940                    }
941                }
942                std::cmp::Ordering::Equal
943            });
944        }
945
946        // OFFSET / LIMIT.
947        let offset = match parsed.offset {
948            Some(parser::LimitExpr::Literal(n)) => n,
949            Some(parser::LimitExpr::Param(idx)) => params
950                .get(idx.saturating_sub(1))
951                .and_then(|v| match v {
952                    Value::BigInt(n) if *n >= 0 => Some(*n as usize),
953                    Value::Integer(n) if *n >= 0 => Some(*n as usize),
954                    _ => None,
955                })
956                .unwrap_or(0),
957            None => 0,
958        };
959        let limit = match parsed.limit {
960            Some(parser::LimitExpr::Literal(n)) => Some(n),
961            Some(parser::LimitExpr::Param(idx)) => {
962                params.get(idx.saturating_sub(1)).and_then(|v| match v {
963                    Value::BigInt(n) if *n >= 0 => Some(*n as usize),
964                    Value::Integer(n) if *n >= 0 => Some(*n as usize),
965                    _ => None,
966                })
967            }
968            None => None,
969        };
970        if offset > 0 {
971            rows.drain(0..offset.min(rows.len()));
972        }
973        if let Some(l) = limit {
974            rows.truncate(l);
975        }
976
977        // Project to the requested column list.
978        let (out_columns, projected_rows) = match (&parsed.columns, &parsed.column_aliases) {
979            (None, _) => (outer_columns.clone(), rows),
980            (Some(cols), aliases) => {
981                let mut indices = Vec::with_capacity(cols.len());
982                let mut out_names: Vec<schema::ColumnName> = Vec::with_capacity(cols.len());
983                for (i, col) in cols.iter().enumerate() {
984                    let idx = outer_columns.iter().position(|c| c == col).ok_or_else(|| {
985                        QueryError::ColumnNotFound {
986                            table: parsed.table.clone(),
987                            column: col.to_string(),
988                        }
989                    })?;
990                    indices.push(idx);
991                    let alias = aliases
992                        .as_ref()
993                        .and_then(|a| a.get(i))
994                        .and_then(|a| a.as_ref());
995                    out_names.push(match alias {
996                        Some(a) => schema::ColumnName::new(a.clone()),
997                        None => col.clone(),
998                    });
999                }
1000                let projected: Vec<Vec<Value>> = rows
1001                    .into_iter()
1002                    .map(|r| indices.iter().map(|i| r[*i].clone()).collect())
1003                    .collect();
1004                (out_names, projected)
1005            }
1006        };
1007
1008        Ok(QueryResult {
1009            columns: out_columns,
1010            rows: projected_rows,
1011        })
1012    }
1013
1014    /// Evaluate a correlated `InSubquery` / `Exists` against one
1015    /// outer row (already baked into `bindings`). Returns true iff
1016    /// the predicate holds.
1017    fn evaluate_correlated_predicate<S: ProjectionStore>(
1018        &self,
1019        store: &mut S,
1020        pred: &parser::Predicate,
1021        bindings: &std::collections::HashMap<String, Value>,
1022        params: &[Value],
1023    ) -> Result<bool> {
1024        match pred {
1025            parser::Predicate::Exists { subquery, negated } => {
1026                let substituted = correlated::substitute_outer_refs(subquery, bindings);
1027                // The inner subquery may itself have nested subqueries;
1028                // run it through the full query engine path so nested
1029                // correlations (if any) are handled.
1030                let inner_result = self.execute_inner_subquery(store, &substituted, params)?;
1031                let exists = !inner_result.rows.is_empty();
1032                Ok(if *negated { !exists } else { exists })
1033            }
1034            parser::Predicate::InSubquery {
1035                column,
1036                subquery,
1037                negated,
1038            } => {
1039                let substituted = correlated::substitute_outer_refs(subquery, bindings);
1040                let inner_result = self.execute_inner_subquery(store, &substituted, params)?;
1041                if inner_result.columns.len() != 1 {
1042                    return Err(QueryError::UnsupportedFeature(format!(
1043                        "IN (SELECT ...) subquery must project exactly 1 column, got {}",
1044                        inner_result.columns.len()
1045                    )));
1046                }
1047                let outer_val = bindings
1048                    .get(column.as_str())
1049                    .or_else(|| bindings.values().next()) // defensive
1050                    .cloned();
1051                let Some(outer_val) = outer_val else {
1052                    return Ok(false);
1053                };
1054                let any_match = inner_result
1055                    .rows
1056                    .iter()
1057                    .any(|row| row.first().is_some_and(|v| v == &outer_val));
1058                Ok(if *negated { !any_match } else { any_match })
1059            }
1060            _ => Err(QueryError::UnsupportedFeature(
1061                "evaluate_correlated_predicate called on non-subquery predicate".to_string(),
1062            )),
1063        }
1064    }
1065
1066    /// Execute an inner subquery (with all outer refs already
1067    /// substituted). Delegates to `plan_query` + `executor::execute`.
1068    fn execute_inner_subquery<S: ProjectionStore>(
1069        &self,
1070        store: &mut S,
1071        inner: &parser::ParsedSelect,
1072        params: &[Value],
1073    ) -> Result<QueryResult> {
1074        // An inner subquery post-substitution might itself contain
1075        // nested correlations or uncorrelated subqueries. Run the
1076        // pre-execute pass once more to handle those cases.
1077        let mut inner_clone = inner.clone();
1078        self.pre_execute_subqueries(store, &mut inner_clone, params)?;
1079        if has_correlated_predicate(&inner_clone.predicates) {
1080            // v0.6.0 caps nesting at one correlated level — the
1081            // outer loop is already one nesting.
1082            return Err(QueryError::UnsupportedFeature(
1083                "nested correlated subqueries (depth > 1) are not supported in v0.6.0".to_string(),
1084            ));
1085        }
1086        let plan = planner::plan_query(&self.schema, &inner_clone, params)?;
1087        let table_def = self
1088            .schema
1089            .get_table(&plan.table_name().into())
1090            .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
1091        executor::execute(store, &plan, table_def)
1092    }
1093
1094    /// Executes a `UNION`, `INTERSECT`, or `EXCEPT` query (with or without `ALL`).
1095    ///
1096    /// Implementation: materialise both sides, then combine according to the
1097    /// operator. `ALL` keeps multiset semantics; the bare form (no `ALL`)
1098    /// deduplicates by row content. `Value` doesn't implement `Hash`, so the
1099    /// dedup/intersect/except keys use the debug format of each row — same
1100    /// trick already used by the prior UNION implementation.
1101    fn execute_union<S: ProjectionStore>(
1102        &self,
1103        store: &mut S,
1104        union_stmt: &parser::ParsedUnion,
1105        params: &[Value],
1106    ) -> Result<QueryResult> {
1107        // Plan and execute left side
1108        let left_plan = planner::plan_query(&self.schema, &union_stmt.left, params)?;
1109        let left_table_def = self
1110            .schema
1111            .get_table(&left_plan.table_name().into())
1112            .ok_or_else(|| QueryError::TableNotFound(left_plan.table_name().to_string()))?;
1113        let left_result = executor::execute(store, &left_plan, left_table_def)?;
1114
1115        // Plan and execute right side
1116        let right_plan = planner::plan_query(&self.schema, &union_stmt.right, params)?;
1117        let right_table_def = self
1118            .schema
1119            .get_table(&right_plan.table_name().into())
1120            .ok_or_else(|| QueryError::TableNotFound(right_plan.table_name().to_string()))?;
1121        let right_result = executor::execute(store, &right_plan, right_table_def)?;
1122
1123        // Use left side column names for the result
1124        let column_names = left_result.columns;
1125
1126        let row_key = |row: &Vec<Value>| format!("{row:?}");
1127
1128        let combined_rows: Vec<Vec<Value>> = match (union_stmt.op, union_stmt.all) {
1129            // UNION ALL: concatenate, keep duplicates
1130            (parser::SetOp::Union, true) => {
1131                let mut all_rows = left_result.rows;
1132                all_rows.extend(right_result.rows);
1133                all_rows
1134            }
1135            // UNION: concatenate then dedup
1136            (parser::SetOp::Union, false) => {
1137                let mut all_rows = left_result.rows;
1138                all_rows.extend(right_result.rows);
1139                let mut seen = std::collections::HashSet::new();
1140                all_rows.retain(|row| seen.insert(row_key(row)));
1141                all_rows
1142            }
1143            // INTERSECT: rows present in both sides (set semantics)
1144            (parser::SetOp::Intersect, false) => {
1145                let right_keys: std::collections::HashSet<String> =
1146                    right_result.rows.iter().map(&row_key).collect();
1147                let mut seen = std::collections::HashSet::new();
1148                left_result
1149                    .rows
1150                    .into_iter()
1151                    .filter(|row| {
1152                        let key = row_key(row);
1153                        right_keys.contains(&key) && seen.insert(key)
1154                    })
1155                    .collect()
1156            }
1157            // INTERSECT ALL: keep multiplicities — for each row appearing
1158            // min(left_count, right_count) times, emit that many copies.
1159            (parser::SetOp::Intersect, true) => {
1160                let mut right_counts: std::collections::HashMap<String, usize> =
1161                    std::collections::HashMap::new();
1162                for row in &right_result.rows {
1163                    *right_counts.entry(row_key(row)).or_insert(0) += 1;
1164                }
1165                let mut out = Vec::new();
1166                for row in left_result.rows {
1167                    let key = row_key(&row);
1168                    if let Some(count) = right_counts.get_mut(&key) {
1169                        if *count > 0 {
1170                            *count -= 1;
1171                            out.push(row);
1172                        }
1173                    }
1174                }
1175                out
1176            }
1177            // EXCEPT: rows in left side not in right side (set semantics)
1178            (parser::SetOp::Except, false) => {
1179                let right_keys: std::collections::HashSet<String> =
1180                    right_result.rows.iter().map(&row_key).collect();
1181                let mut seen = std::collections::HashSet::new();
1182                left_result
1183                    .rows
1184                    .into_iter()
1185                    .filter(|row| {
1186                        let key = row_key(row);
1187                        !right_keys.contains(&key) && seen.insert(key)
1188                    })
1189                    .collect()
1190            }
1191            // EXCEPT ALL: subtract multiplicities — left_count - right_count copies
1192            (parser::SetOp::Except, true) => {
1193                let mut right_counts: std::collections::HashMap<String, usize> =
1194                    std::collections::HashMap::new();
1195                for row in &right_result.rows {
1196                    *right_counts.entry(row_key(row)).or_insert(0) += 1;
1197                }
1198                let mut out = Vec::new();
1199                for row in left_result.rows {
1200                    let key = row_key(&row);
1201                    let count = right_counts.entry(key).or_insert(0);
1202                    if *count > 0 {
1203                        *count -= 1;
1204                    } else {
1205                        out.push(row);
1206                    }
1207                }
1208                out
1209            }
1210        };
1211
1212        Ok(QueryResult {
1213            columns: column_names,
1214            rows: combined_rows,
1215        })
1216    }
1217
1218    /// Executes a SQL query at a specific log position (point-in-time query).
1219    ///
1220    /// This enables compliance queries that show the state as it was
1221    /// at a specific point in the log.
1222    ///
1223    /// # Arguments
1224    ///
1225    /// * `store` - The projection store to query
1226    /// * `sql` - SQL query string
1227    /// * `params` - Query parameters
1228    /// * `position` - Log position to query at
1229    ///
1230    /// # Example
1231    ///
1232    /// ```ignore
1233    /// // Get user state as of log position 1000
1234    /// let result = engine.query_at(
1235    ///     &mut store,
1236    ///     "SELECT * FROM users WHERE id = 1",
1237    ///     &[],
1238    ///     Offset::new(1000),
1239    /// )?;
1240    /// ```
1241    pub fn query_at<S: ProjectionStore>(
1242        &self,
1243        store: &mut S,
1244        sql: &str,
1245        params: &[Value],
1246        position: Offset,
1247    ) -> Result<QueryResult> {
1248        let stmt = self.parse_query_statement_cached(sql)?;
1249
1250        match stmt {
1251            parser::ParsedStatement::Select(parsed) => {
1252                let plan = planner::plan_query(&self.schema, &parsed, params)?;
1253                let table_def = self
1254                    .schema
1255                    .get_table(&plan.table_name().into())
1256                    .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
1257                executor::execute_at(store, &plan, table_def, position)
1258            }
1259            parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
1260                "UNION is not supported in point-in-time queries".to_string(),
1261            )),
1262            _ => unreachable!("parse_query_statement only returns Select or Union"),
1263        }
1264    }
1265
1266    /// Executes a query against a historical snapshot selected by
1267    /// wall-clock timestamp (AUDIT-2026-04 L-4).
1268    ///
1269    /// This is the user-facing ergonomic form of
1270    /// [`Self::query_at`] — healthcare auditors ask "what did the
1271    /// chart look like on 2026-01-15?", not "what was log offset
1272    /// 948,274?". The caller supplies a `resolver` callback that
1273    /// translates a Unix-nanosecond timestamp into the log offset
1274    /// whose commit timestamp is the greatest value ≤ the target.
1275    ///
1276    /// The resolver is a callback rather than a hard dependency
1277    /// so the query crate does not take a direct dep on
1278    /// `kimberlite-compliance::audit` or the kernel's audit log.
1279    /// A typical impl performs a binary search on the in-memory
1280    /// audit index.
1281    ///
1282    /// # Errors
1283    ///
1284    /// - [`QueryError::UnsupportedFeature`] if the resolver
1285    ///   returns `None` (no offset exists at or before the target
1286    ///   — typically because the log is empty or the timestamp
1287    ///   predates genesis).
1288    ///
1289    /// # Example
1290    ///
1291    /// ```ignore
1292    /// let resolver = |ts_ns: i64| -> Option<Offset> {
1293    ///     audit_log.offset_at_or_before(ts_ns)
1294    /// };
1295    /// let result = engine.query_at_timestamp(
1296    ///     &mut store,
1297    ///     "SELECT * FROM charts WHERE patient_id = $1",
1298    ///     &[Value::BigInt(42)],
1299    ///     1_760_000_000_000_000_000, // 2025-10-09T07:06:40Z in ns
1300    ///     resolver,
1301    /// )?;
1302    /// ```
1303    pub fn query_at_timestamp<S, R>(
1304        &self,
1305        store: &mut S,
1306        sql: &str,
1307        params: &[Value],
1308        target_ns: i64,
1309        resolver: R,
1310    ) -> Result<QueryResult>
1311    where
1312        S: ProjectionStore,
1313        R: FnOnce(i64) -> Option<Offset>,
1314    {
1315        let offset = resolver(target_ns).ok_or_else(|| {
1316            QueryError::UnsupportedFeature(format!(
1317                "no log offset at or before timestamp {target_ns} ns \
1318                 (empty log or predates genesis)"
1319            ))
1320        })?;
1321        self.query_at(store, sql, params, offset)
1322    }
1323
1324    /// Executes a query against a historical snapshot selected by
1325    /// wall-clock timestamp, with a resolver that can distinguish
1326    /// the "timestamp predates retention horizon" case from a plain
1327    /// "no offset found".
1328    ///
1329    /// v0.6.0 Tier 2 #6: this is the runtime-layer variant of
1330    /// [`Self::query_at_timestamp`] used by `TenantHandle::query`
1331    /// when the resolver has a concrete notion of a retention
1332    /// horizon (e.g. an in-memory timestamp index maintained at
1333    /// append time). The existing `query_at_timestamp` stays as-is
1334    /// so callers with an `Option<Offset>` resolver (e.g. ad-hoc
1335    /// binary search over an external index) keep working.
1336    ///
1337    /// # Resolution semantics
1338    ///
1339    /// - `TimestampResolution::Offset(o)` → execute at `o`.
1340    /// - `TimestampResolution::BeforeRetentionHorizon { horizon_ns }` →
1341    ///   [`QueryError::AsOfBeforeRetentionHorizon`] with both the
1342    ///   requested and horizon timestamps.
1343    /// - `TimestampResolution::LogEmpty` →
1344    ///   [`QueryError::UnsupportedFeature`] (same message the
1345    ///   ergonomic form emits for an empty log).
1346    pub fn query_at_timestamp_resolved<S, R>(
1347        &self,
1348        store: &mut S,
1349        sql: &str,
1350        params: &[Value],
1351        target_ns: i64,
1352        resolver: R,
1353    ) -> Result<QueryResult>
1354    where
1355        S: ProjectionStore,
1356        R: FnOnce(i64) -> TimestampResolution,
1357    {
1358        match resolver(target_ns) {
1359            TimestampResolution::Offset(offset) => self.query_at(store, sql, params, offset),
1360            TimestampResolution::BeforeRetentionHorizon { horizon_ns } => {
1361                Err(QueryError::AsOfBeforeRetentionHorizon {
1362                    requested_ns: target_ns,
1363                    horizon_ns,
1364                })
1365            }
1366            TimestampResolution::LogEmpty => Err(QueryError::UnsupportedFeature(format!(
1367                "no log offset at or before timestamp {target_ns} ns \
1368                 (empty log or predates genesis)"
1369            ))),
1370        }
1371    }
1372
1373    /// AUDIT-2026-04 S3.3 — render a SQL query's access plan
1374    /// without executing it.
1375    ///
1376    /// Returns a deterministic multi-line tree string — same
1377    /// query always produces the same bytes, which lets apps
1378    /// diff plans across schema versions and catch unexpected
1379    /// regressions.
1380    ///
1381    /// The rendered plan **never reveals row data** — only table
1382    /// names, column counts, filter presence/absence, and LIMIT
1383    /// bounds. Masked column names render as their source name
1384    /// (masks are applied post-projection and are not a plan
1385    /// concern).
1386    ///
1387    /// # Errors
1388    ///
1389    /// Any error from parsing or planning (unsupported statement,
1390    /// missing table, etc.) propagates verbatim.
1391    ///
1392    /// # Example
1393    ///
1394    /// ```ignore
1395    /// let tree = engine.explain("SELECT * FROM patients WHERE id = $1", &[Value::BigInt(42)])?;
1396    /// println!("{tree}");
1397    /// // -> PointLookup [patients, cols=3]
1398    /// ```
1399    pub fn explain(&self, sql: &str, params: &[Value]) -> Result<String> {
1400        let stmt = self.parse_query_statement_cached(sql)?;
1401        match stmt {
1402            parser::ParsedStatement::Select(parsed) => {
1403                let plan = planner::plan_query(&self.schema, &parsed, params)?;
1404                Ok(explain::explain_plan(&plan))
1405            }
1406            parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
1407                "EXPLAIN does not yet render UNION plans".to_string(),
1408            )),
1409            _ => unreachable!("parse_query_statement only returns Select or Union"),
1410        }
1411    }
1412
1413    /// Parses a SQL query without executing it.
1414    ///
1415    /// Useful for validation or query plan inspection.
1416    pub fn prepare(&self, sql: &str, params: &[Value]) -> Result<PreparedQuery> {
1417        let stmt = self.parse_query_statement_cached(sql)?;
1418        let parsed = match stmt {
1419            parser::ParsedStatement::Select(s) => s,
1420            _ => {
1421                return Err(QueryError::UnsupportedFeature(
1422                    "only SELECT queries can be prepared".to_string(),
1423                ));
1424            }
1425        };
1426        let plan = planner::plan_query(&self.schema, &parsed, params)?;
1427
1428        Ok(PreparedQuery {
1429            plan,
1430            schema: self.schema.clone(),
1431        })
1432    }
1433}
1434
/// A prepared (planned) query ready for execution.
///
/// Built by the engine's `prepare` method, which stores the plan it computed
/// together with a clone of the engine's schema. Execution methods take
/// `&self`, so one prepared query can be run repeatedly via `execute` /
/// `execute_at`.
#[derive(Debug, Clone)]
pub struct PreparedQuery {
    // Plan produced by `planner::plan_query` at prepare time.
    plan: plan::QueryPlan,
    // Schema cloned at prepare time; consulted at execution time to resolve
    // the planned table's definition.
    schema: Schema,
}
1441
1442impl PreparedQuery {
1443    /// Executes this prepared query against the current store state.
1444    pub fn execute<S: ProjectionStore>(&self, store: &mut S) -> Result<QueryResult> {
1445        let table_def = self
1446            .schema
1447            .get_table(&self.plan.table_name().into())
1448            .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
1449
1450        executor::execute(store, &self.plan, table_def)
1451    }
1452
1453    /// Executes this prepared query at a specific log position.
1454    pub fn execute_at<S: ProjectionStore>(
1455        &self,
1456        store: &mut S,
1457        position: Offset,
1458    ) -> Result<QueryResult> {
1459        let table_def = self
1460            .schema
1461            .get_table(&self.plan.table_name().into())
1462            .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
1463
1464        executor::execute_at(store, &self.plan, table_def, position)
1465    }
1466
1467    /// Returns the column names this query will return.
1468    pub fn columns(&self) -> &[ColumnName] {
1469        self.plan.column_names()
1470    }
1471
1472    /// Returns the table name being queried.
1473    pub fn table_name(&self) -> &str {
1474        self.plan.table_name()
1475    }
1476}
1477
1478/// True iff any top-level predicate is a surviving correlated
1479/// subquery (`InSubquery` / `Exists`). Uncorrelated subqueries were
1480/// rewritten by `pre_execute_subqueries` before this is called.
1481fn has_correlated_predicate(predicates: &[parser::Predicate]) -> bool {
1482    predicates.iter().any(|p| {
1483        matches!(
1484            p,
1485            parser::Predicate::InSubquery { .. } | parser::Predicate::Exists { .. }
1486        )
1487    })
1488}
1489
1490/// Extract outer references from a surviving correlated predicate.
1491/// Used by the correlated-loop executor to populate the
1492/// `alias.column → Value` binding map for each outer row.
1493fn correlated_predicate_outer_refs(pred: &parser::Predicate) -> Vec<correlated::OuterRef> {
1494    let subquery = match pred {
1495        parser::Predicate::InSubquery { subquery, .. }
1496        | parser::Predicate::Exists { subquery, .. } => subquery,
1497        _ => return Vec::new(),
1498    };
1499    // Walk inner predicates gathering any qualified ColumnRef —
1500    // post-pre-execute, anything that remains as a ColumnRef with a
1501    // qualifier is an outer reference (the inner FROM has its own
1502    // bare-column refs).
1503    let mut out = Vec::new();
1504    for pred in &subquery.predicates {
1505        collect_refs_in_pred(pred, &mut out);
1506    }
1507    out
1508}
1509
1510fn collect_refs_in_pred(pred: &parser::Predicate, out: &mut Vec<correlated::OuterRef>) {
1511    let push_if_colref = |pv: &parser::PredicateValue, out: &mut Vec<correlated::OuterRef>| {
1512        if let parser::PredicateValue::ColumnRef(raw) = pv {
1513            if let Some((q, c)) = raw.split_once('.') {
1514                out.push(correlated::OuterRef {
1515                    qualifier: q.to_string(),
1516                    column: schema::ColumnName::new(c.to_string()),
1517                    scope_depth: 1,
1518                });
1519            }
1520        }
1521    };
1522    match pred {
1523        parser::Predicate::Eq(_, v)
1524        | parser::Predicate::Lt(_, v)
1525        | parser::Predicate::Le(_, v)
1526        | parser::Predicate::Gt(_, v)
1527        | parser::Predicate::Ge(_, v) => push_if_colref(v, out),
1528        parser::Predicate::In(_, vs) | parser::Predicate::NotIn(_, vs) => {
1529            for v in vs {
1530                push_if_colref(v, out);
1531            }
1532        }
1533        parser::Predicate::NotBetween(_, lo, hi) => {
1534            push_if_colref(lo, out);
1535            push_if_colref(hi, out);
1536        }
1537        parser::Predicate::Or(l, r) => {
1538            for p in l {
1539                collect_refs_in_pred(p, out);
1540            }
1541            for p in r {
1542                collect_refs_in_pred(p, out);
1543            }
1544        }
1545        _ => {}
1546    }
1547}
1548
1549/// Converts a Value to a serde_json::Value for CTE materialization.
1550fn value_to_json(val: &Value) -> serde_json::Value {
1551    // NEVER: Placeholder values must be bound before reaching the CTE /
1552    // query-result JSON boundary. An unbound Placeholder here indicates a
1553    // parameter-binding bug (AUDIT fix: placeholders must be resolved upstream).
1554    kimberlite_properties::never!(
1555        matches!(val, Value::Placeholder(_)),
1556        "query.placeholder_reaches_result_boundary",
1557        "Value::Placeholder must never reach query-result / JSON serialization boundary"
1558    );
1559    match val {
1560        Value::Null | Value::Placeholder(_) => serde_json::Value::Null,
1561        Value::BigInt(i) => serde_json::json!(i),
1562        Value::TinyInt(i) => serde_json::json!(i),
1563        Value::SmallInt(i) => serde_json::json!(i),
1564        Value::Integer(i) => serde_json::json!(i),
1565        Value::Real(f) => serde_json::json!(f),
1566        Value::Decimal(v, scale) => {
1567            // Format decimal: store the raw value and scale as a string
1568            if *scale == 0 {
1569                serde_json::json!(v.to_string())
1570            } else {
1571                let divisor = 10i128.pow(u32::from(*scale));
1572                let whole = v / divisor;
1573                let frac = (v % divisor).unsigned_abs();
1574                serde_json::json!(format!("{whole}.{frac:0>width$}", width = *scale as usize))
1575            }
1576        }
1577        Value::Text(s) => serde_json::json!(s),
1578        Value::Boolean(b) => serde_json::json!(b),
1579        Value::Date(d) => serde_json::json!(d),
1580        Value::Time(t) => serde_json::json!(t),
1581        Value::Timestamp(ts) => serde_json::json!(ts.as_nanos()),
1582        Value::Uuid(u) => {
1583            // Format UUID bytes as hex string
1584            let hex: String = u.iter().map(|b| format!("{b:02x}")).collect();
1585            serde_json::json!(hex)
1586        }
1587        Value::Json(j) => j.clone(),
1588        Value::Bytes(b) => {
1589            use base64::Engine;
1590            serde_json::json!(base64::engine::general_purpose::STANDARD.encode(b))
1591        }
1592    }
1593}