pub mod correlated;
pub mod depth_check;
pub mod dml_planner;
mod error;
mod executor;
pub mod explain;
pub mod expression;
pub mod information_schema;
pub mod key_encoder;
mod parse_cache;
mod parser;
mod plan;
mod planner;
pub mod rbac_filter;
mod schema;
mod value;
pub mod window;
#[cfg(test)]
mod tests;
pub use error::{QueryError, Result};
pub use executor::{QueryResult, Row, execute};
pub use expression::{EvalContext, ScalarExpr, evaluate};
pub use parser::{
AlterTableOperation, HavingCondition, HavingOp, OnConflictAction, OnConflictClause,
ParsedAlterTable, ParsedAttachMaskingPolicy, ParsedColumn, ParsedCreateIndex, ParsedCreateMask,
ParsedCreateMaskingPolicy, ParsedCreateTable, ParsedCreateUser, ParsedCte, ParsedDelete,
ParsedDetachMaskingPolicy, ParsedGrant, ParsedInsert, ParsedMaskingStrategy, ParsedSelect,
ParsedSetClassification, ParsedStatement, ParsedUnion, ParsedUpdate, Predicate, PredicateValue,
ScalarCmpOp, TimeTravel, UpsertExpr, expr_to_scalar_expr, extract_at_offset,
extract_time_travel, parse_statement, try_parse_custom_statement,
};
pub use planner::plan_query;
pub use schema::{
ColumnDef, ColumnName, DataType, IndexDef, Schema, SchemaBuilder, TableDef, TableName,
};
pub use value::Value;
use kimberlite_store::ProjectionStore;
use kimberlite_types::Offset;
/// Outcome of resolving a wall-clock timestamp (nanoseconds) to a log
/// position, as consumed by `QueryEngine::query_at_timestamp_resolved`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampResolution {
/// The timestamp resolved to this log offset; the query runs at it.
Offset(Offset),
/// The timestamp could not be served because of the retention horizon
/// `horizon_ns`; surfaced as `QueryError::AsOfBeforeRetentionHorizon`.
BeforeRetentionHorizon { horizon_ns: i64 },
/// No log offset exists at or before the timestamp; surfaced as a
/// `QueryError::UnsupportedFeature`.
LogEmpty,
}
/// SQL query engine that plans and executes read queries over a `Schema`.
///
/// Optionally memoizes parsed statements (`with_parse_cache`) and bounds the
/// work done by correlated subqueries (`with_correlated_cap`).
#[derive(Debug, Clone)]
pub struct QueryEngine {
// Table definitions used for planning and execution.
schema: Schema,
// Optional parse cache; held in an `Arc`, so cloned engines share it.
parse_cache: Option<std::sync::Arc<parse_cache::ParseCache>>,
// Upper bound on the estimated (outer rows x inner rows) work of a
// correlated-subquery query; exceeding it is an error.
correlated_cap: u64,
}
impl QueryEngine {
/// Creates an engine for `schema` with no parse cache and the default
/// correlated-subquery cap (`correlated::DEFAULT_CORRELATED_CAP`).
pub fn new(schema: Schema) -> Self {
    let correlated_cap = correlated::DEFAULT_CORRELATED_CAP;
    Self {
        correlated_cap,
        parse_cache: None,
        schema,
    }
}
#[must_use]
pub fn with_parse_cache(mut self, max_size: usize) -> Self {
self.parse_cache = Some(std::sync::Arc::new(parse_cache::ParseCache::new(max_size)));
self
}
#[must_use]
pub fn with_correlated_cap(mut self, cap: u64) -> Self {
self.correlated_cap = cap;
self
}
/// Returns the parse cache's statistics, or `None` when no cache is
/// configured.
pub fn parse_cache_stats(&self) -> Option<parse_cache::ParseCacheStats> {
    let cache = self.parse_cache.as_ref()?;
    Some(cache.stats())
}
/// Empties the parse cache when one is configured; otherwise a no-op.
pub fn clear_parse_cache(&self) {
    if let Some(cache) = self.parse_cache.as_deref() {
        cache.clear();
    }
}
/// Borrows the schema this engine plans and executes against.
pub fn schema(&self) -> &Schema {
    &self.schema
}
/// Parses `sql`, accepting only read queries (`SELECT` or a set operation).
///
/// # Errors
/// Parse errors from `parser::parse_statement` propagate unchanged; any
/// other statement kind yields `QueryError::UnsupportedFeature`.
fn parse_query_statement(sql: &str) -> Result<parser::ParsedStatement> {
    let statement = parser::parse_statement(sql)?;
    let is_read_query = matches!(
        statement,
        parser::ParsedStatement::Select(_) | parser::ParsedStatement::Union(_)
    );
    if !is_read_query {
        return Err(QueryError::UnsupportedFeature(
            "only SELECT and UNION queries are supported".to_string(),
        ));
    }
    Ok(statement)
}
/// Like `parse_query_statement`, but consults and fills the parse cache
/// (keyed by the exact SQL text) when one is configured.
///
/// Only successful parses are cached; errors are returned without being
/// recorded.
fn parse_query_statement_cached(&self, sql: &str) -> Result<parser::ParsedStatement> {
    let cache = self.parse_cache.as_deref();
    if let Some(hit) = cache.and_then(|c| c.get(sql)) {
        return Ok(hit);
    }
    let parsed = Self::parse_query_statement(sql)?;
    if let Some(c) = cache {
        c.insert(sql.to_owned(), parsed.clone());
    }
    Ok(parsed)
}
/// Runs a read-only query against `store`.
///
/// Pre-processing, in order:
/// 1. A break-glass marker is stripped; when present its reason is logged
///    at `warn` level.
/// 2. `information_schema` queries are answered directly from the schema.
/// 3. `EXPLAIN` returns the rendered plan as a single `plan` column.
/// 4. An offset time-travel clause is routed to `query_at`; a timestamp
///    clause is rejected because no timestamp resolver is available here.
///
/// Remaining SELECTs go through `run_parsed_select`; set operations go
/// through `execute_union`.
///
/// # Errors
/// Parse, planning, and execution errors propagate;
/// `QueryError::UnsupportedFeature` for timestamp time-travel and for
/// non-query statements.
pub fn query<S: ProjectionStore>(
    &self,
    store: &mut S,
    sql: &str,
    params: &[Value],
) -> Result<QueryResult> {
    let (sql, break_glass_reason) = explain::extract_break_glass(sql);
    if let Some(reason) = break_glass_reason.as_ref() {
        tracing::warn!(
            break_glass_reason = %reason,
            "BREAK_GLASS query — regulator-visible audit signal",
        );
    }

    if let Some(answer) = information_schema::maybe_answer(sql, &self.schema) {
        return Ok(answer);
    }

    let (sql, is_explain) = explain::extract_explain(sql);
    if is_explain {
        let plan_text = self.explain(sql, params)?;
        return Ok(executor::QueryResult {
            columns: vec!["plan".into()],
            rows: vec![vec![Value::Text(plan_text)]],
        });
    }

    let (cleaned_sql, time_travel) = parser::extract_time_travel(sql);
    if let Some(travel) = time_travel {
        return match travel {
            parser::TimeTravel::Offset(o) => {
                self.query_at(store, &cleaned_sql, params, Offset::new(o))
            }
            parser::TimeTravel::TimestampNs(_) => Err(QueryError::UnsupportedFeature(
                "FOR SYSTEM_TIME AS OF '<iso>' / AS OF '<iso>' \
                 requires a timestamp→offset resolver — use \
                 QueryEngine::query_at_timestamp(..., resolver)"
                    .to_string(),
            )),
        };
    }

    match self.parse_query_statement_cached(sql)? {
        parser::ParsedStatement::Select(parsed) => self.run_parsed_select(store, parsed, params),
        parser::ParsedStatement::Union(union_stmt) => {
            self.execute_union(store, &union_stmt, params)
        }
        _ => unreachable!("parse_query_statement only returns Select or Union"),
    }
}

/// Executes one parsed SELECT for `query`.
///
/// Pre-executes uncorrelated subqueries, then picks the CTE, correlated, or
/// plain planner path, and finally applies window functions to the result.
fn run_parsed_select<S: ProjectionStore>(
    &self,
    store: &mut S,
    mut parsed: parser::ParsedSelect,
    params: &[Value],
) -> Result<QueryResult> {
    self.pre_execute_subqueries(store, &mut parsed, params)?;
    let window_fns = parsed.window_fns.clone();
    let base = if !parsed.ctes.is_empty() {
        self.execute_with_ctes(store, &parsed, params)?
    } else if has_correlated_predicate(&parsed.predicates) {
        self.execute_correlated_query(store, &parsed, params)?
    } else {
        let plan = planner::plan_query(&self.schema, &parsed, params)?;
        let table_name = plan.table_name();
        let table_def = self
            .schema
            .get_table(&table_name.into())
            .ok_or_else(|| QueryError::TableNotFound(table_name.to_string()))?;
        executor::execute(store, &plan, table_def)?
    };
    window::apply_window_fns(base, &window_fns)
}
/// Executes a SELECT that has a `WITH` clause by materializing each CTE into
/// `store` and then running the main query against an extended schema.
///
/// For each CTE, in declaration order:
/// 1. Its base query is planned against the schema extended with all earlier
///    CTEs, then executed.
/// 2. A synthetic table named after the CTE is added to a cloned schema. Its
///    id is a `DefaultHasher` hash of the CTE name, every column is declared
///    `DataType::Text`, and the first result column is declared the primary
///    key (no primary key when the result has no columns).
/// 3. Every result row is written to `store` via `write_cte_row`, keyed by
///    its row index.
/// 4. If the CTE has a recursive arm, the arm is re-planned and re-executed
///    each pass; rows not seen before (compared by their `Debug` rendering)
///    are appended and written. Iteration stops when a pass adds nothing; if
///    the `MAX_RECURSIVE_DEPTH`-th pass still adds rows, an error is returned.
///
/// Finally the main query (with `ctes` cleared) is planned against the
/// extended schema and executed.
///
/// NOTE(review): CTE rows go through `store.apply` and nothing here removes
/// them afterwards. If a CTE with the same name is later re-materialized with
/// fewer rows, keys from the earlier run look like they would still be
/// present under the same table id — confirm whether the store is scoped per
/// query.
///
/// # Errors
/// Propagates planning, execution and store errors; `TableNotFound` when a
/// plan names a table missing from the extended schema; `UnsupportedFeature`
/// when a recursive CTE exceeds `MAX_RECURSIVE_DEPTH` passes or a row fails
/// to serialize.
fn execute_with_ctes<S: ProjectionStore>(
&self,
store: &mut S,
parsed: &parser::ParsedSelect,
params: &[Value],
) -> Result<QueryResult> {
let mut extended_schema = self.schema.clone();
for cte in &parsed.ctes {
let cte_plan = planner::plan_query(&extended_schema, &cte.query, params)?;
let cte_table_def = extended_schema
.get_table(&cte_plan.table_name().into())
.ok_or_else(|| QueryError::TableNotFound(cte_plan.table_name().to_string()))?;
let mut cte_result = executor::execute(store, &cte_plan, cte_table_def)?;
// Table id derived from the CTE name. NOTE(review): DefaultHasher's
// algorithm is not guaranteed stable across Rust releases, and the id is
// not checked for collisions with real table ids.
let cte_table_id = {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
cte.name.hash(&mut hasher);
kimberlite_store::TableId::new(hasher.finish())
};
// Column types are not inferred; every CTE column is declared Text.
let cte_columns: Vec<schema::ColumnDef> = cte_result
.columns
.iter()
.map(|col| schema::ColumnDef::new(col.as_str(), schema::DataType::Text))
.collect();
// The first column is declared the primary key, although rows are stored
// under their row index (see write_cte_row).
let pk_cols = if cte_columns.is_empty() {
vec![]
} else {
vec![cte_result.columns[0].clone()]
};
let cte_table = schema::TableDef::new(cte_table_id, cte_columns, pk_cols);
extended_schema.add_table(cte.name.as_str(), cte_table);
let mut total_rows = cte_result.rows.len();
for (row_idx, row) in cte_result.rows.iter().enumerate() {
Self::write_cte_row(store, cte_table_id, row_idx, &cte_result.columns, row)?;
}
if let Some(recursive_select) = &cte.recursive_arm {
const MAX_RECURSIVE_DEPTH: usize = 1000;
// Rows are de-duplicated by their Debug rendering; a pass that yields
// only already-seen rows ends the recursion.
let mut seen: std::collections::HashSet<String> =
cte_result.rows.iter().map(|r| format!("{r:?}")).collect();
for depth in 0..MAX_RECURSIVE_DEPTH {
let recursive_plan =
planner::plan_query(&extended_schema, recursive_select, params)?;
let recursive_table_def = extended_schema
.get_table(&recursive_plan.table_name().into())
.ok_or_else(|| {
QueryError::TableNotFound(recursive_plan.table_name().to_string())
})?;
let iteration_result =
executor::execute(store, &recursive_plan, recursive_table_def)?;
let mut new_rows = 0usize;
for row in iteration_result.rows {
let key = format!("{row:?}");
if seen.insert(key) {
// New row: append at the next index so keys stay unique.
Self::write_cte_row(
store,
cte_table_id,
total_rows,
&cte_result.columns,
&row,
)?;
cte_result.rows.push(row);
total_rows += 1;
new_rows += 1;
}
}
// Fixpoint reached: this pass produced no unseen rows.
if new_rows == 0 {
break;
}
// The last allowed pass still produced rows.
if depth + 1 == MAX_RECURSIVE_DEPTH {
return Err(QueryError::UnsupportedFeature(format!(
"recursive CTE `{}` exceeded maximum depth of {MAX_RECURSIVE_DEPTH} iterations",
cte.name
)));
}
}
}
}
// Plan the main query with its CTE list stripped; the CTEs now resolve as
// tables in `extended_schema`.
let main_query = parser::ParsedSelect {
ctes: vec![], ..parsed.clone()
};
let plan = planner::plan_query(&extended_schema, &main_query, params)?;
let table_def = extended_schema
.get_table(&plan.table_name().into())
.ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
executor::execute(store, &plan, table_def)
}
/// Persists one CTE result row into `store` under `cte_table_id`.
///
/// The row is encoded as a JSON object from column name to `value_to_json`
/// value. Its store key is `row_idx` encoded as a `BigInt`. It is applied as
/// a single-entry `WriteBatch` at `applied_position + 1`.
///
/// # Errors
/// JSON encoding failures become `QueryError::UnsupportedFeature`; errors
/// from `store.apply` propagate.
fn write_cte_row<S: ProjectionStore>(
    store: &mut S,
    cte_table_id: kimberlite_store::TableId,
    row_idx: usize,
    columns: &[crate::schema::ColumnName],
    row: &[Value],
) -> Result<()> {
    let object: serde_json::Map<String, serde_json::Value> = columns
        .iter()
        .zip(row)
        .map(|(name, value)| (name.as_str().to_string(), value_to_json(value)))
        .collect();
    let encoded = serde_json::to_vec(&serde_json::Value::Object(object)).map_err(|e| {
        QueryError::UnsupportedFeature(format!("CTE serialization failed: {e}"))
    })?;
    let key = crate::key_encoder::encode_key(&[Value::BigInt(row_idx as i64)]);
    let next_position = kimberlite_types::Offset::new(store.applied_position().as_u64() + 1);
    let batch = kimberlite_store::WriteBatch::new(next_position).put(
        cte_table_id,
        key,
        bytes::Bytes::from(encoded),
    );
    store.apply(batch)?;
    Ok(())
}
/// Rewrites `parsed`'s top-level predicates in place, pre-executing every
/// subquery that does not reference the outer query.
///
/// Correlated subquery predicates are left untouched for the per-row path.
/// On error, `parsed.predicates` is left empty.
fn pre_execute_subqueries<S: ProjectionStore>(
    &self,
    store: &mut S,
    parsed: &mut parser::ParsedSelect,
    params: &[Value],
) -> Result<()> {
    let scope = self.build_outer_scope(parsed);
    let original = std::mem::take(&mut parsed.predicates);
    parsed.predicates = original
        .into_iter()
        .map(|pred| self.classify_and_rewrite_predicate(&mut *store, pred, &scope, params))
        .collect::<Result<Vec<_>>>()?;
    Ok(())
}
fn build_outer_scope<'s>(
&'s self,
parsed: &parser::ParsedSelect,
) -> correlated::PlannerScope<'s> {
let mut bindings: Vec<(String, &schema::TableDef)> = Vec::new();
if let Some(t) = self.schema.get_table(&parsed.table.clone().into()) {
bindings.push((parsed.table.clone(), t));
}
for join in &parsed.joins {
if let Some(t) = self.schema.get_table(&join.table.clone().into()) {
bindings.push((join.table.clone(), t));
}
}
correlated::PlannerScope::empty().push(bindings)
}
/// Classifies one predicate as correlated or uncorrelated and rewrites the
/// uncorrelated subquery forms into plain predicates.
///
/// - Uncorrelated `IN (SELECT ...)` becomes `In` / `NotIn` over the inner
///   result.
/// - Uncorrelated `EXISTS` becomes `Always(bool)`.
/// - A correlated `EXISTS` that `correlated::try_semi_join_rewrite` can turn
///   into an uncorrelated IN is pre-executed that way.
/// - `OR` branches are processed recursively, left branch first.
/// - Everything else, including remaining correlated subqueries, is
///   returned unchanged.
fn classify_and_rewrite_predicate<S: ProjectionStore>(
    &self,
    store: &mut S,
    pred: parser::Predicate,
    outer_scope: &correlated::PlannerScope<'_>,
    params: &[Value],
) -> Result<parser::Predicate> {
    match pred {
        parser::Predicate::InSubquery {
            column,
            subquery,
            negated,
        } => {
            let is_correlated =
                !correlated::collect_outer_refs(&subquery, outer_scope, &self.schema).is_empty();
            if is_correlated {
                // Needs per-row evaluation; keep for the correlated path.
                return Ok(parser::Predicate::InSubquery {
                    column,
                    subquery,
                    negated,
                });
            }
            self.pre_execute_uncorrelated_in(store, &column, &subquery, negated, params)
        }
        parser::Predicate::Exists { subquery, negated } => {
            let outer_refs =
                correlated::collect_outer_refs(&subquery, outer_scope, &self.schema);
            if outer_refs.is_empty() {
                return self.pre_execute_uncorrelated_exists(store, &subquery, negated, params);
            }
            match correlated::try_semi_join_rewrite(&subquery, negated, &outer_refs) {
                Some((outer_col, rewritten)) => self.pre_execute_uncorrelated_in(
                    store,
                    &outer_col,
                    &rewritten,
                    negated,
                    params,
                ),
                None => Ok(parser::Predicate::Exists { subquery, negated }),
            }
        }
        parser::Predicate::Or(left, right) => {
            let left = left
                .into_iter()
                .map(|p| self.classify_and_rewrite_predicate(&mut *store, p, outer_scope, params))
                .collect::<Result<Vec<_>>>()?;
            let right = right
                .into_iter()
                .map(|p| self.classify_and_rewrite_predicate(&mut *store, p, outer_scope, params))
                .collect::<Result<Vec<_>>>()?;
            Ok(parser::Predicate::Or(left, right))
        }
        other => Ok(other),
    }
}
fn pre_execute_uncorrelated_in<S: ProjectionStore>(
&self,
store: &mut S,
column: &schema::ColumnName,
subquery: &parser::ParsedSelect,
negated: bool,
params: &[Value],
) -> Result<parser::Predicate> {
let inner_plan = planner::plan_query(&self.schema, subquery, params)?;
let inner_table_def = self
.schema
.get_table(&inner_plan.table_name().into())
.ok_or_else(|| QueryError::TableNotFound(inner_plan.table_name().to_string()))?;
let inner_result = executor::execute(store, &inner_plan, inner_table_def)?;
if inner_result.columns.len() != 1 {
return Err(QueryError::UnsupportedFeature(format!(
"IN (SELECT ...) subquery must project exactly 1 column, got {}",
inner_result.columns.len()
)));
}
let values: Vec<parser::PredicateValue> = inner_result
.rows
.into_iter()
.filter_map(|row| row.into_iter().next())
.map(parser::PredicateValue::Literal)
.collect();
Ok(if negated {
parser::Predicate::NotIn(column.clone(), values)
} else {
parser::Predicate::In(column.clone(), values)
})
}
/// Executes an uncorrelated `EXISTS` subquery once and folds it into a
/// constant `Always(bool)` predicate (inverted for `NOT EXISTS`).
fn pre_execute_uncorrelated_exists<S: ProjectionStore>(
    &self,
    store: &mut S,
    subquery: &parser::ParsedSelect,
    negated: bool,
    params: &[Value],
) -> Result<parser::Predicate> {
    let plan = planner::plan_query(&self.schema, subquery, params)?;
    let Some(table_def) = self.schema.get_table(&plan.table_name().into()) else {
        return Err(QueryError::TableNotFound(plan.table_name().to_string()));
    };
    let has_rows = !executor::execute(store, &plan, table_def)?.rows.is_empty();
    // `negated` flips the outcome: NOT EXISTS holds exactly when there are no rows.
    Ok(parser::Predicate::Always(has_rows != negated))
}
fn execute_correlated_query<S: ProjectionStore>(
&self,
store: &mut S,
parsed: &parser::ParsedSelect,
params: &[Value],
) -> Result<QueryResult> {
let mut simple_preds: Vec<parser::Predicate> = Vec::new();
let mut correlated_preds: Vec<parser::Predicate> = Vec::new();
for pred in &parsed.predicates {
match pred {
parser::Predicate::InSubquery { .. } | parser::Predicate::Exists { .. } => {
correlated_preds.push(pred.clone());
}
other => simple_preds.push(other.clone()),
}
}
let outer_table_def = self
.schema
.get_table(&parsed.table.clone().into())
.ok_or_else(|| QueryError::TableNotFound(parsed.table.clone()))?;
let outer_scan = parser::ParsedSelect {
predicates: simple_preds,
columns: None, column_aliases: None,
order_by: Vec::new(),
limit: None,
offset: None,
aggregates: Vec::new(),
aggregate_filters: Vec::new(),
group_by: Vec::new(),
distinct: false,
having: Vec::new(),
ctes: Vec::new(),
window_fns: Vec::new(),
scalar_projections: Vec::new(),
case_columns: Vec::new(),
joins: Vec::new(),
..parsed.clone()
};
let outer_plan = planner::plan_query(&self.schema, &outer_scan, params)?;
let outer_rows = executor::execute(store, &outer_plan, outer_table_def)?;
let outer_count = outer_rows.rows.len() as u64;
let mut inner_cost_per_row: u64 = 0;
for pred in &correlated_preds {
let inner_table = match pred {
parser::Predicate::InSubquery { subquery, .. }
| parser::Predicate::Exists { subquery, .. } => &subquery.table,
_ => continue,
};
let inner_def = self
.schema
.get_table(&inner_table.clone().into())
.ok_or_else(|| QueryError::TableNotFound(inner_table.clone()))?;
let pairs = store.scan(
inner_def.table_id,
kimberlite_store::Key::min()..kimberlite_store::Key::max(),
1_000_000,
)?;
inner_cost_per_row = inner_cost_per_row.saturating_add(pairs.len() as u64);
}
let inner_cost_per_row = inner_cost_per_row.max(1);
let estimated = outer_count.saturating_mul(inner_cost_per_row);
if estimated > self.correlated_cap {
return Err(QueryError::CorrelatedCardinalityExceeded {
estimated,
cap: self.correlated_cap,
});
}
let outer_columns = outer_rows.columns.clone();
let outer_alias = parsed.table.clone();
let mut kept: Vec<Vec<Value>> = Vec::new();
for row in outer_rows.rows {
let mut bindings = std::collections::HashMap::new();
for (col, val) in outer_columns.iter().zip(row.iter()) {
bindings.insert(format!("{outer_alias}.{col}"), val.clone());
bindings.insert(col.as_str().to_string(), val.clone());
}
for pred in &correlated_preds {
let refs = correlated_predicate_outer_refs(pred);
for r in refs {
let col_idx = outer_columns
.iter()
.position(|c| c.as_str() == r.column.as_str());
if let Some(idx) = col_idx {
if let Some(v) = row.get(idx) {
bindings.insert(r.as_column_ref(), v.clone());
}
}
}
}
let mut all_pass = true;
for pred in &correlated_preds {
if !self.evaluate_correlated_predicate(store, pred, &bindings, params)? {
all_pass = false;
break;
}
}
if all_pass {
kept.push(row);
}
}
Self::post_process_correlated_result(parsed, params, outer_columns, kept)
}
fn post_process_correlated_result(
parsed: &parser::ParsedSelect,
params: &[Value],
outer_columns: Vec<schema::ColumnName>,
mut rows: Vec<Vec<Value>>,
) -> Result<QueryResult> {
if !parsed.order_by.is_empty() {
let indices: Vec<(usize, bool)> = parsed
.order_by
.iter()
.map(|ob| {
let idx = outer_columns
.iter()
.position(|c| c == &ob.column)
.ok_or_else(|| QueryError::ColumnNotFound {
table: parsed.table.clone(),
column: ob.column.to_string(),
})?;
Ok::<_, QueryError>((idx, ob.ascending))
})
.collect::<Result<Vec<_>>>()?;
rows.sort_by(|a, b| {
for (idx, asc) in &indices {
let ord = a
.get(*idx)
.and_then(|av| b.get(*idx).and_then(|bv| av.compare(bv)))
.unwrap_or(std::cmp::Ordering::Equal);
let ord = if *asc { ord } else { ord.reverse() };
if ord != std::cmp::Ordering::Equal {
return ord;
}
}
std::cmp::Ordering::Equal
});
}
let offset = match parsed.offset {
Some(parser::LimitExpr::Literal(n)) => n,
Some(parser::LimitExpr::Param(idx)) => params
.get(idx.saturating_sub(1))
.and_then(|v| match v {
Value::BigInt(n) if *n >= 0 => Some(*n as usize),
Value::Integer(n) if *n >= 0 => Some(*n as usize),
_ => None,
})
.unwrap_or(0),
None => 0,
};
let limit = match parsed.limit {
Some(parser::LimitExpr::Literal(n)) => Some(n),
Some(parser::LimitExpr::Param(idx)) => {
params.get(idx.saturating_sub(1)).and_then(|v| match v {
Value::BigInt(n) if *n >= 0 => Some(*n as usize),
Value::Integer(n) if *n >= 0 => Some(*n as usize),
_ => None,
})
}
None => None,
};
if offset > 0 {
rows.drain(0..offset.min(rows.len()));
}
if let Some(l) = limit {
rows.truncate(l);
}
let (out_columns, projected_rows) = match (&parsed.columns, &parsed.column_aliases) {
(None, _) => (outer_columns.clone(), rows),
(Some(cols), aliases) => {
let mut indices = Vec::with_capacity(cols.len());
let mut out_names: Vec<schema::ColumnName> = Vec::with_capacity(cols.len());
for (i, col) in cols.iter().enumerate() {
let idx = outer_columns.iter().position(|c| c == col).ok_or_else(|| {
QueryError::ColumnNotFound {
table: parsed.table.clone(),
column: col.to_string(),
}
})?;
indices.push(idx);
let alias = aliases
.as_ref()
.and_then(|a| a.get(i))
.and_then(|a| a.as_ref());
out_names.push(match alias {
Some(a) => schema::ColumnName::new(a.clone()),
None => col.clone(),
});
}
let projected: Vec<Vec<Value>> = rows
.into_iter()
.map(|r| indices.iter().map(|i| r[*i].clone()).collect())
.collect();
(out_names, projected)
}
};
Ok(QueryResult {
columns: out_columns,
rows: projected_rows,
})
}
/// Evaluates one correlated subquery predicate for a single outer row.
///
/// `bindings` maps outer column names (bare and `table.column`) to the
/// current row's values; they are substituted into the subquery before it
/// runs.
///
/// - `EXISTS` / `NOT EXISTS`: true iff the inner result is non-empty,
///   inverted when negated.
/// - `col IN (...)` / `NOT IN`: looks `col` up in `bindings` (exact name
///   first, then the part after its last `.`), and compares it with the
///   first column of every inner row.
///
/// # Errors
/// `UnsupportedFeature` when the IN subquery does not project exactly one
/// column, or when called on a non-subquery predicate. `ColumnNotFound` when
/// the IN column is not bound in the outer row. Any inner execution error
/// also propagates.
fn evaluate_correlated_predicate<S: ProjectionStore>(
    &self,
    store: &mut S,
    pred: &parser::Predicate,
    bindings: &std::collections::HashMap<String, Value>,
    params: &[Value],
) -> Result<bool> {
    match pred {
        parser::Predicate::Exists { subquery, negated } => {
            let substituted = correlated::substitute_outer_refs(subquery, bindings);
            let inner_result = self.execute_inner_subquery(store, &substituted, params)?;
            let exists = !inner_result.rows.is_empty();
            Ok(exists != *negated)
        }
        parser::Predicate::InSubquery {
            column,
            subquery,
            negated,
        } => {
            let substituted = correlated::substitute_outer_refs(subquery, bindings);
            let inner_result = self.execute_inner_subquery(store, &substituted, params)?;
            if inner_result.columns.len() != 1 {
                return Err(QueryError::UnsupportedFeature(format!(
                    "IN (SELECT ...) subquery must project exactly 1 column, got {}",
                    inner_result.columns.len()
                )));
            }
            // Deterministic lookup: exact key first, then the unqualified
            // suffix (covers aliases like `p.id` when bindings use `id`).
            // Never fall back to an arbitrary binding: HashMap iteration order
            // is unspecified.
            let name = column.as_str();
            let outer_val = bindings.get(name).or_else(|| {
                name.rsplit_once('.')
                    .and_then(|(_, bare)| bindings.get(bare))
            });
            let Some(outer_val) = outer_val else {
                return Err(QueryError::ColumnNotFound {
                    table: "<outer query>".to_string(),
                    column: name.to_string(),
                });
            };
            let any_match = inner_result
                .rows
                .iter()
                .any(|row| row.first() == Some(outer_val));
            Ok(any_match != *negated)
        }
        _ => Err(QueryError::UnsupportedFeature(
            "evaluate_correlated_predicate called on non-subquery predicate".to_string(),
        )),
    }
}
/// Executes a subquery whose outer references have already been substituted.
///
/// Its own uncorrelated subqueries are pre-executed first. Any subquery
/// predicate that survives that step is itself correlated (a second level of
/// nesting) and is rejected.
fn execute_inner_subquery<S: ProjectionStore>(
    &self,
    store: &mut S,
    inner: &parser::ParsedSelect,
    params: &[Value],
) -> Result<QueryResult> {
    let mut rewritten = inner.clone();
    self.pre_execute_subqueries(store, &mut rewritten, params)?;
    if has_correlated_predicate(&rewritten.predicates) {
        return Err(QueryError::UnsupportedFeature(
            "nested correlated subqueries (depth > 1) are not supported in v0.6.0".to_string(),
        ));
    }
    let plan = planner::plan_query(&self.schema, &rewritten, params)?;
    let table_name = plan.table_name();
    match self.schema.get_table(&table_name.into()) {
        Some(table_def) => executor::execute(store, &plan, table_def),
        None => Err(QueryError::TableNotFound(table_name.to_string())),
    }
}
fn execute_union<S: ProjectionStore>(
&self,
store: &mut S,
union_stmt: &parser::ParsedUnion,
params: &[Value],
) -> Result<QueryResult> {
let left_plan = planner::plan_query(&self.schema, &union_stmt.left, params)?;
let left_table_def = self
.schema
.get_table(&left_plan.table_name().into())
.ok_or_else(|| QueryError::TableNotFound(left_plan.table_name().to_string()))?;
let left_result = executor::execute(store, &left_plan, left_table_def)?;
let right_plan = planner::plan_query(&self.schema, &union_stmt.right, params)?;
let right_table_def = self
.schema
.get_table(&right_plan.table_name().into())
.ok_or_else(|| QueryError::TableNotFound(right_plan.table_name().to_string()))?;
let right_result = executor::execute(store, &right_plan, right_table_def)?;
let column_names = left_result.columns;
let row_key = |row: &Vec<Value>| format!("{row:?}");
let combined_rows: Vec<Vec<Value>> = match (union_stmt.op, union_stmt.all) {
(parser::SetOp::Union, true) => {
let mut all_rows = left_result.rows;
all_rows.extend(right_result.rows);
all_rows
}
(parser::SetOp::Union, false) => {
let mut all_rows = left_result.rows;
all_rows.extend(right_result.rows);
let mut seen = std::collections::HashSet::new();
all_rows.retain(|row| seen.insert(row_key(row)));
all_rows
}
(parser::SetOp::Intersect, false) => {
let right_keys: std::collections::HashSet<String> =
right_result.rows.iter().map(&row_key).collect();
let mut seen = std::collections::HashSet::new();
left_result
.rows
.into_iter()
.filter(|row| {
let key = row_key(row);
right_keys.contains(&key) && seen.insert(key)
})
.collect()
}
(parser::SetOp::Intersect, true) => {
let mut right_counts: std::collections::HashMap<String, usize> =
std::collections::HashMap::new();
for row in &right_result.rows {
*right_counts.entry(row_key(row)).or_insert(0) += 1;
}
let mut out = Vec::new();
for row in left_result.rows {
let key = row_key(&row);
if let Some(count) = right_counts.get_mut(&key) {
if *count > 0 {
*count -= 1;
out.push(row);
}
}
}
out
}
(parser::SetOp::Except, false) => {
let right_keys: std::collections::HashSet<String> =
right_result.rows.iter().map(&row_key).collect();
let mut seen = std::collections::HashSet::new();
left_result
.rows
.into_iter()
.filter(|row| {
let key = row_key(row);
!right_keys.contains(&key) && seen.insert(key)
})
.collect()
}
(parser::SetOp::Except, true) => {
let mut right_counts: std::collections::HashMap<String, usize> =
std::collections::HashMap::new();
for row in &right_result.rows {
*right_counts.entry(row_key(row)).or_insert(0) += 1;
}
let mut out = Vec::new();
for row in left_result.rows {
let key = row_key(&row);
let count = right_counts.entry(key).or_insert(0);
if *count > 0 {
*count -= 1;
} else {
out.push(row);
}
}
out
}
};
Ok(QueryResult {
columns: column_names,
rows: combined_rows,
})
}
pub fn query_at<S: ProjectionStore>(
&self,
store: &mut S,
sql: &str,
params: &[Value],
position: Offset,
) -> Result<QueryResult> {
let stmt = self.parse_query_statement_cached(sql)?;
match stmt {
parser::ParsedStatement::Select(parsed) => {
let plan = planner::plan_query(&self.schema, &parsed, params)?;
let table_def = self
.schema
.get_table(&plan.table_name().into())
.ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
executor::execute_at(store, &plan, table_def, position)
}
parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
"UNION is not supported in point-in-time queries".to_string(),
)),
_ => unreachable!("parse_query_statement only returns Select or Union"),
}
}
/// Runs a SELECT as of wall-clock time `target_ns`, using `resolver` to map
/// the timestamp to a log offset.
///
/// # Errors
/// `UnsupportedFeature` when `resolver` returns `None`; otherwise whatever
/// `query_at` returns.
pub fn query_at_timestamp<S, R>(
    &self,
    store: &mut S,
    sql: &str,
    params: &[Value],
    target_ns: i64,
    resolver: R,
) -> Result<QueryResult>
where
    S: ProjectionStore,
    R: FnOnce(i64) -> Option<Offset>,
{
    match resolver(target_ns) {
        Some(offset) => self.query_at(store, sql, params, offset),
        None => Err(QueryError::UnsupportedFeature(format!(
            "no log offset at or before timestamp {target_ns} ns \
             (empty log or predates genesis)"
        ))),
    }
}
/// Runs a SELECT as of wall-clock time `target_ns`, using a resolver that
/// can distinguish "before the retention horizon" from "empty log".
///
/// # Errors
/// `AsOfBeforeRetentionHorizon` for `BeforeRetentionHorizon`,
/// `UnsupportedFeature` for `LogEmpty`, otherwise whatever `query_at`
/// returns.
pub fn query_at_timestamp_resolved<S, R>(
    &self,
    store: &mut S,
    sql: &str,
    params: &[Value],
    target_ns: i64,
    resolver: R,
) -> Result<QueryResult>
where
    S: ProjectionStore,
    R: FnOnce(i64) -> TimestampResolution,
{
    let offset = match resolver(target_ns) {
        TimestampResolution::Offset(offset) => offset,
        TimestampResolution::BeforeRetentionHorizon { horizon_ns } => {
            return Err(QueryError::AsOfBeforeRetentionHorizon {
                requested_ns: target_ns,
                horizon_ns,
            });
        }
        TimestampResolution::LogEmpty => {
            return Err(QueryError::UnsupportedFeature(format!(
                "no log offset at or before timestamp {target_ns} ns \
                 (empty log or predates genesis)"
            )));
        }
    };
    self.query_at(store, sql, params, offset)
}
/// Plans a SELECT and renders the plan as text via `explain::explain_plan`.
///
/// # Errors
/// `UnsupportedFeature` for set operations; otherwise any parse or planning
/// error.
pub fn explain(&self, sql: &str, params: &[Value]) -> Result<String> {
    let parsed = match self.parse_query_statement_cached(sql)? {
        parser::ParsedStatement::Select(parsed) => parsed,
        parser::ParsedStatement::Union(_) => {
            return Err(QueryError::UnsupportedFeature(
                "EXPLAIN does not yet render UNION plans".to_string(),
            ));
        }
        _ => unreachable!("parse_query_statement only returns Select or Union"),
    };
    let plan = planner::plan_query(&self.schema, &parsed, params)?;
    Ok(explain::explain_plan(&plan))
}
/// Plans a SELECT once so it can be executed repeatedly.
///
/// The returned `PreparedQuery` holds only the `QueryPlan` and a clone of the
/// schema. The subquery pre-execution, CTE materialization, and
/// window-function steps performed by `query` are not part of it.
///
/// # Errors
/// `UnsupportedFeature` for non-SELECT statements; otherwise any parse or
/// planning error.
pub fn prepare(&self, sql: &str, params: &[Value]) -> Result<PreparedQuery> {
    let parser::ParsedStatement::Select(parsed) = self.parse_query_statement_cached(sql)? else {
        return Err(QueryError::UnsupportedFeature(
            "only SELECT queries can be prepared".to_string(),
        ));
    };
    Ok(PreparedQuery {
        plan: planner::plan_query(&self.schema, &parsed, params)?,
        schema: self.schema.clone(),
    })
}
}
/// A SELECT planned once by `QueryEngine::prepare`, executable repeatedly.
#[derive(Debug, Clone)]
pub struct PreparedQuery {
// The planned query.
plan: plan::QueryPlan,
// Schema snapshot taken at prepare time, used to look up the plan's table.
schema: Schema,
}
impl PreparedQuery {
pub fn execute<S: ProjectionStore>(&self, store: &mut S) -> Result<QueryResult> {
let table_def = self
.schema
.get_table(&self.plan.table_name().into())
.ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
executor::execute(store, &self.plan, table_def)
}
pub fn execute_at<S: ProjectionStore>(
&self,
store: &mut S,
position: Offset,
) -> Result<QueryResult> {
let table_def = self
.schema
.get_table(&self.plan.table_name().into())
.ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
executor::execute_at(store, &self.plan, table_def, position)
}
pub fn columns(&self) -> &[ColumnName] {
self.plan.column_names()
}
pub fn table_name(&self) -> &str {
self.plan.table_name()
}
}
/// Returns true if any top-level predicate is still an `IN (SELECT ...)` or
/// `EXISTS` subquery. `OR` branches are not inspected.
///
/// After `pre_execute_subqueries` has run, such a predicate is necessarily
/// correlated (uncorrelated ones have already been rewritten).
fn has_correlated_predicate(predicates: &[parser::Predicate]) -> bool {
    for predicate in predicates {
        if let parser::Predicate::InSubquery { .. } | parser::Predicate::Exists { .. } = predicate
        {
            return true;
        }
    }
    false
}
/// Collects the qualified column references (`q.col`) appearing in the
/// predicates of a subquery predicate's inner SELECT.
///
/// Returns an empty list for any other predicate kind.
fn correlated_predicate_outer_refs(pred: &parser::Predicate) -> Vec<correlated::OuterRef> {
    let (parser::Predicate::InSubquery { subquery, .. }
    | parser::Predicate::Exists { subquery, .. }) = pred
    else {
        return Vec::new();
    };
    let mut refs = Vec::new();
    for inner in &subquery.predicates {
        collect_refs_in_pred(inner, &mut refs);
    }
    refs
}
/// Appends to `out` every qualified `ColumnRef` (`qualifier.column`) found on
/// the value side of `pred`, recursing into both `OR` branches.
///
/// Only `Eq`/`Lt`/`Le`/`Gt`/`Ge`, `In`/`NotIn`, `NotBetween` and `Or` are
/// inspected; every other predicate kind contributes nothing. Unqualified
/// references are ignored. Each recorded reference gets `scope_depth: 1`.
fn collect_refs_in_pred(pred: &parser::Predicate, out: &mut Vec<correlated::OuterRef>) {
    /// Records `value` if it is a `qualifier.column` reference.
    fn record(value: &parser::PredicateValue, out: &mut Vec<correlated::OuterRef>) {
        let parser::PredicateValue::ColumnRef(raw) = value else {
            return;
        };
        let Some((qualifier, column)) = raw.split_once('.') else {
            return;
        };
        out.push(correlated::OuterRef {
            qualifier: qualifier.to_string(),
            column: schema::ColumnName::new(column.to_string()),
            scope_depth: 1,
        });
    }

    match pred {
        parser::Predicate::Or(lhs, rhs) => {
            for nested in lhs.iter().chain(rhs) {
                collect_refs_in_pred(nested, out);
            }
        }
        parser::Predicate::Eq(_, v)
        | parser::Predicate::Lt(_, v)
        | parser::Predicate::Le(_, v)
        | parser::Predicate::Gt(_, v)
        | parser::Predicate::Ge(_, v) => record(v, out),
        parser::Predicate::In(_, list) | parser::Predicate::NotIn(_, list) => {
            for v in list {
                record(v, out);
            }
        }
        parser::Predicate::NotBetween(_, low, high) => {
            record(low, out);
            record(high, out);
        }
        _ => {}
    }
}
/// Converts a query `Value` to JSON for storage (e.g. materialized CTE rows).
///
/// - Decimals are rendered as exact decimal strings, so no precision is lost.
/// - Timestamps become their nanosecond count.
/// - UUIDs become lowercase hex without dashes.
/// - Bytes become standard base64.
/// - `Null` becomes JSON `null`.
///
/// A `Placeholder` reaching this point violates an invariant (checked with
/// `kimberlite_properties::never!`) and is rendered as `null`.
fn value_to_json(val: &Value) -> serde_json::Value {
    kimberlite_properties::never!(
        matches!(val, Value::Placeholder(_)),
        "query.placeholder_reaches_result_boundary",
        "Value::Placeholder must never reach query-result / JSON serialization boundary"
    );
    match val {
        Value::Null | Value::Placeholder(_) => serde_json::Value::Null,
        Value::BigInt(i) => serde_json::json!(i),
        Value::TinyInt(i) => serde_json::json!(i),
        Value::SmallInt(i) => serde_json::json!(i),
        Value::Integer(i) => serde_json::json!(i),
        Value::Real(f) => serde_json::json!(f),
        Value::Decimal(v, scale) => serde_json::json!(decimal_to_string(*v, *scale as usize)),
        Value::Text(s) => serde_json::json!(s),
        Value::Boolean(b) => serde_json::json!(b),
        Value::Date(d) => serde_json::json!(d),
        Value::Time(t) => serde_json::json!(t),
        Value::Timestamp(ts) => serde_json::json!(ts.as_nanos()),
        Value::Uuid(u) => {
            let hex: String = u.iter().map(|b| format!("{b:02x}")).collect();
            serde_json::json!(hex)
        }
        Value::Json(j) => j.clone(),
        Value::Bytes(b) => {
            use base64::Engine;
            serde_json::json!(base64::engine::general_purpose::STANDARD.encode(b))
        }
    }
}

/// Renders the fixed-point decimal `value x 10^-scale` as a plain string.
///
/// Formatting works on the absolute value's digits. This keeps the sign when
/// the integer part is zero (`(-5, 2)` gives `"-0.05"`, where `whole / divisor`
/// arithmetic would give `"0.05"`). It also never computes `10^scale`, so
/// scales above 38 cannot overflow `i128`.
fn decimal_to_string(value: i128, scale: usize) -> String {
    if scale == 0 {
        return value.to_string();
    }
    let digits = value.unsigned_abs().to_string();
    // Left-pad so there is at least one digit before the decimal point.
    let padded = format!("{digits:0>width$}", width = scale + 1);
    let (whole, frac) = padded.split_at(padded.len() - scale);
    let sign = if value < 0 { "-" } else { "" };
    format!("{sign}{whole}.{frac}")
}