pub mod dml_planner;
mod error;
mod executor;
pub mod explain;
pub mod expression;
pub mod information_schema;
pub mod key_encoder;
mod parse_cache;
mod parser;
mod plan;
mod planner;
pub mod rbac_filter;
mod schema;
mod value;
pub mod window;
#[cfg(test)]
mod tests;
pub use error::{QueryError, Result};
pub use executor::{QueryResult, Row, execute};
pub use parser::{
AlterTableOperation, HavingCondition, HavingOp, ParsedAlterTable, ParsedColumn,
ParsedCreateIndex, ParsedCreateMask, ParsedCreateTable, ParsedCreateUser, ParsedCte,
ParsedDelete, ParsedGrant, ParsedInsert, ParsedSelect, ParsedSetClassification,
ParsedStatement, ParsedUnion, ParsedUpdate, Predicate, PredicateValue, TimeTravel,
extract_at_offset, extract_time_travel, parse_statement, try_parse_custom_statement,
};
pub use planner::plan_query;
pub use schema::{
ColumnDef, ColumnName, DataType, IndexDef, Schema, SchemaBuilder, TableDef, TableName,
};
pub use value::Value;
use kimberlite_store::ProjectionStore;
use kimberlite_types::Offset;
/// SQL query engine bound to a fixed [`Schema`].
///
/// The engine optionally carries a parse cache. The cache is held behind an
/// `Arc`, so clones of an engine share the same cache instance.
#[derive(Debug, Clone)]
pub struct QueryEngine {
/// Table definitions that queries are planned and executed against.
schema: Schema,
/// Cache of parsed statements looked up by SQL text; `None` until enabled
/// through `with_parse_cache`.
parse_cache: Option<std::sync::Arc<parse_cache::ParseCache>>,
}
impl QueryEngine {
/// Builds an engine that plans and runs queries against `schema`.
///
/// The parse cache starts out disabled.
pub fn new(schema: Schema) -> Self {
    let parse_cache = None;
    Self {
        schema,
        parse_cache,
    }
}
/// Returns the engine with a freshly created parse cache attached.
///
/// `max_size` is forwarded unchanged to `ParseCache::new`. Any cache that was
/// previously attached to this engine value is replaced.
#[must_use]
pub fn with_parse_cache(mut self, max_size: usize) -> Self {
    let cache = parse_cache::ParseCache::new(max_size);
    self.parse_cache = Some(std::sync::Arc::new(cache));
    self
}
/// Returns the parse cache's statistics, or `None` when no cache is attached.
pub fn parse_cache_stats(&self) -> Option<parse_cache::ParseCacheStats> {
    let cache = self.parse_cache.as_ref()?;
    Some(cache.stats())
}
/// Clears the parse cache; does nothing when no cache is attached.
pub fn clear_parse_cache(&self) {
    let Some(cache) = self.parse_cache.as_deref() else {
        return;
    };
    cache.clear();
}
/// Returns the schema this engine plans queries against.
pub fn schema(&self) -> &Schema {
&self.schema
}
/// Parses `sql` and accepts it only if it is a SELECT or UNION statement.
///
/// # Errors
///
/// Propagates parser errors, and returns `QueryError::UnsupportedFeature`
/// for any other statement kind.
fn parse_query_statement(sql: &str) -> Result<parser::ParsedStatement> {
    let parsed = parser::parse_statement(sql)?;
    let is_query = matches!(
        parsed,
        parser::ParsedStatement::Select(_) | parser::ParsedStatement::Union(_)
    );
    if !is_query {
        return Err(QueryError::UnsupportedFeature(
            "only SELECT and UNION queries are supported".to_string(),
        ));
    }
    Ok(parsed)
}
/// Parses `sql` as a query statement, consulting the parse cache when one is attached.
///
/// On a cache hit the cached statement is returned without parsing. On a miss
/// the statement is parsed and a clone is inserted under the SQL text. Parse
/// failures propagate before the insert, so they are never cached.
fn parse_query_statement_cached(&self, sql: &str) -> Result<parser::ParsedStatement> {
    // Without a cache this is a plain parse.
    let Some(cache) = &self.parse_cache else {
        return Self::parse_query_statement(sql);
    };
    if let Some(hit) = cache.get(sql) {
        return Ok(hit);
    }
    let parsed = Self::parse_query_statement(sql)?;
    cache.insert(sql.to_string(), parsed.clone());
    Ok(parsed)
}
/// Executes a read-only SQL query against `store` and returns its result set.
///
/// The input is processed in this order:
///
/// 1. `explain::extract_break_glass` splits off a break-glass reason; when one
///    is present it is logged at WARN level and processing continues with the
///    remaining text.
/// 2. `information_schema::maybe_answer` may answer the query directly from
///    the schema, in which case that result is returned.
/// 3. `explain::extract_explain` detects EXPLAIN; the plan text is returned
///    as a single `plan` column with one row.
/// 4. `parser::extract_time_travel` detects a time-travel clause. An offset
///    is delegated to [`QueryEngine::query_at`]; a timestamp is rejected
///    because resolving it needs a caller-supplied resolver.
/// 5. Otherwise the cleaned text is parsed (through the parse cache if
///    enabled) and executed: SELECT statements have their subquery predicates
///    pre-executed, run either directly or through the CTE path, and then
///    have window functions applied; UNION statements go to `execute_union`.
///
/// # Errors
///
/// Returns parser, planner and executor errors as they occur,
/// `QueryError::TableNotFound` when the planned table is missing from the
/// schema, and `QueryError::UnsupportedFeature` for timestamp time travel.
pub fn query<S: ProjectionStore>(
    &self,
    store: &mut S,
    sql: &str,
    params: &[Value],
) -> Result<QueryResult> {
    let (after_break_glass, break_glass_reason) = explain::extract_break_glass(sql);
    if let Some(ref reason) = break_glass_reason {
        tracing::warn!(
            break_glass_reason = %reason,
            "BREAK_GLASS query — regulator-visible audit signal",
        );
    }
    let sql = after_break_glass;

    if let Some(result) = information_schema::maybe_answer(sql, &self.schema) {
        return Ok(result);
    }

    let (after_explain, is_explain) = explain::extract_explain(sql);
    if is_explain {
        let plan_text = self.explain(after_explain, params)?;
        return Ok(executor::QueryResult {
            columns: vec!["plan".into()],
            rows: vec![vec![Value::Text(plan_text)]],
        });
    }
    let sql = after_explain;

    let (cleaned_sql, time_travel) = parser::extract_time_travel(sql);
    match time_travel {
        Some(parser::TimeTravel::Offset(o)) => {
            return self.query_at(store, &cleaned_sql, params, Offset::new(o));
        }
        Some(parser::TimeTravel::TimestampNs(_)) => {
            return Err(QueryError::UnsupportedFeature(
                "FOR SYSTEM_TIME AS OF '<iso>' / AS OF '<iso>' \
                 requires a timestamp→offset resolver — use \
                 QueryEngine::query_at_timestamp(..., resolver)"
                    .to_string(),
            ));
        }
        None => {}
    }

    // Continue with the text produced by `extract_time_travel`, not the
    // pre-extraction input: the parser must see the same cleaned statement
    // that the offset branch above hands to `query_at`.
    let sql: &str = &cleaned_sql;
    let stmt = self.parse_query_statement_cached(sql)?;
    match stmt {
        parser::ParsedStatement::Select(mut parsed) => {
            // Replace IN (SELECT ...) / EXISTS predicates with concrete
            // values before planning.
            self.pre_execute_subqueries(store, &mut parsed.predicates, params)?;
            let window_fns = parsed.window_fns.clone();
            let result = if parsed.ctes.is_empty() {
                let plan = planner::plan_query(&self.schema, &parsed, params)?;
                let table_def = self
                    .schema
                    .get_table(&plan.table_name().into())
                    .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
                executor::execute(store, &plan, table_def)?
            } else {
                self.execute_with_ctes(store, &parsed, params)?
            };
            window::apply_window_fns(result, &window_fns)
        }
        parser::ParsedStatement::Union(union_stmt) => {
            self.execute_union(store, &union_stmt, params)
        }
        _ => unreachable!("parse_query_statement only returns Select or Union"),
    }
}
/// Executes a SELECT that declares one or more CTEs.
///
/// Each CTE is planned and executed in declaration order against a private
/// copy of the engine schema. Its rows are written into `store` under a
/// synthetic table id, and the CTE is then registered in the schema copy so
/// later CTEs and the main query can refer to it by name. Finally the main
/// query is planned and executed with its `ctes` list cleared.
///
/// # Errors
///
/// Returns planner/executor/store errors as they occur,
/// `QueryError::TableNotFound` when a planned table is missing from the
/// extended schema, and `QueryError::UnsupportedFeature` when a recursive CTE
/// is still producing new rows on its final permitted iteration.
fn execute_with_ctes<S: ProjectionStore>(
&self,
store: &mut S,
parsed: &parser::ParsedSelect,
params: &[Value],
) -> Result<QueryResult> {
// Work on a copy so CTE tables never leak into the engine's own schema.
let mut extended_schema = self.schema.clone();
for cte in &parsed.ctes {
// Run the CTE's defining query (the anchor arm when the CTE is recursive).
let cte_plan = planner::plan_query(&extended_schema, &cte.query, params)?;
let cte_table_def = extended_schema
.get_table(&cte_plan.table_name().into())
.ok_or_else(|| QueryError::TableNotFound(cte_plan.table_name().to_string()))?;
let mut cte_result = executor::execute(store, &cte_plan, cte_table_def)?;
// The synthetic table id is the hash of the CTE name.
// NOTE(review): nothing here guards against this id colliding with an
// existing table id — confirm this is acceptable.
let cte_table_id = {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
cte.name.hash(&mut hasher);
kimberlite_store::TableId::new(hasher.finish())
};
// Every CTE column is declared as Text regardless of the value types produced.
let cte_columns: Vec<schema::ColumnDef> = cte_result
.columns
.iter()
.map(|col| schema::ColumnDef::new(col.as_str(), schema::DataType::Text))
.collect();
// The first result column is declared as the primary key (none when there are no columns).
let pk_cols = if cte_columns.is_empty() {
vec![]
} else {
vec![cte_result.columns[0].clone()]
};
let cte_table = schema::TableDef::new(cte_table_id, cte_columns, pk_cols);
extended_schema.add_table(cte.name.as_str(), cte_table);
// `total_rows` is the next row index to write; rows are stored under their index.
let mut total_rows = cte_result.rows.len();
for (row_idx, row) in cte_result.rows.iter().enumerate() {
Self::write_cte_row(store, cte_table_id, row_idx, &cte_result.columns, row)?;
}
if let Some(recursive_select) = &cte.recursive_arm {
const MAX_RECURSIVE_DEPTH: usize = 1000;
// Rows are de-duplicated by their `Debug` rendering.
let mut seen: std::collections::HashSet<String> =
cte_result.rows.iter().map(|r| format!("{r:?}")).collect();
// Re-run the recursive arm until an iteration contributes no unseen row.
for depth in 0..MAX_RECURSIVE_DEPTH {
let recursive_plan =
planner::plan_query(&extended_schema, recursive_select, params)?;
let recursive_table_def = extended_schema
.get_table(&recursive_plan.table_name().into())
.ok_or_else(|| {
QueryError::TableNotFound(recursive_plan.table_name().to_string())
})?;
let iteration_result =
executor::execute(store, &recursive_plan, recursive_table_def)?;
let mut new_rows = 0usize;
for row in iteration_result.rows {
let key = format!("{row:?}");
if seen.insert(key) {
// Unseen row: persist it at the next index and keep it in the CTE result.
Self::write_cte_row(
store,
cte_table_id,
total_rows,
&cte_result.columns,
&row,
)?;
cte_result.rows.push(row);
total_rows += 1;
new_rows += 1;
}
}
// No unseen rows this round: the recursion has converged.
if new_rows == 0 {
break;
}
// Still growing on the last permitted iteration: report instead of stopping silently.
if depth + 1 == MAX_RECURSIVE_DEPTH {
return Err(QueryError::UnsupportedFeature(format!(
"recursive CTE `{}` exceeded maximum depth of {MAX_RECURSIVE_DEPTH} iterations",
cte.name
)));
}
}
}
}
// Plan the main query with its CTE list cleared; CTE names now resolve
// through `extended_schema`.
let main_query = parser::ParsedSelect {
ctes: vec![], ..parsed.clone()
};
let plan = planner::plan_query(&extended_schema, &main_query, params)?;
let table_def = extended_schema
.get_table(&plan.table_name().into())
.ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
executor::execute(store, &plan, table_def)
}
/// Persists one materialized CTE row into `store`.
///
/// The row is serialized as a JSON object mapping column name to value and
/// stored under a key encoded from `row_idx`. The write is applied at the
/// store's current applied position plus one.
///
/// # Errors
///
/// Returns `QueryError::UnsupportedFeature` if JSON serialization fails, and
/// propagates errors from applying the write batch.
fn write_cte_row<S: ProjectionStore>(
    store: &mut S,
    cte_table_id: kimberlite_store::TableId,
    row_idx: usize,
    columns: &[crate::schema::ColumnName],
    row: &[Value],
) -> Result<()> {
    // Pair columns with values; surplus entries on either side are ignored.
    let object: serde_json::Map<String, serde_json::Value> = columns
        .iter()
        .zip(row)
        .map(|(name, value)| (name.as_str().to_string(), value_to_json(value)))
        .collect();
    let payload = match serde_json::to_vec(&serde_json::Value::Object(object)) {
        Ok(bytes) => bytes,
        Err(e) => {
            return Err(QueryError::UnsupportedFeature(format!(
                "CTE serialization failed: {e}"
            )));
        }
    };
    let key = crate::key_encoder::encode_key(&[Value::BigInt(row_idx as i64)]);
    let next_offset = kimberlite_types::Offset::new(store.applied_position().as_u64() + 1);
    let batch = kimberlite_store::WriteBatch::new(next_offset).put(
        cte_table_id,
        key,
        bytes::Bytes::from(payload),
    );
    store.apply(batch)?;
    Ok(())
}
/// Resolves subquery predicates in place by running their subqueries now.
///
/// * `IN (SELECT ...)` becomes `Predicate::In` holding the first value of
///   every subquery row; the subquery must project exactly one column.
/// * `EXISTS` / `NOT EXISTS` becomes `Predicate::Always` with the resulting
///   truth value.
/// * Both sides of an `OR` are processed recursively.
///
/// All other predicates are left untouched. Subqueries are planned against
/// the engine's own schema.
fn pre_execute_subqueries<S: ProjectionStore>(
    &self,
    store: &mut S,
    predicates: &mut Vec<parser::Predicate>,
    params: &[Value],
) -> Result<()> {
    for pred in predicates.iter_mut() {
        match pred {
            parser::Predicate::InSubquery { column, subquery } => {
                let plan = planner::plan_query(&self.schema, subquery, params)?;
                let Some(table_def) = self.schema.get_table(&plan.table_name().into()) else {
                    return Err(QueryError::TableNotFound(plan.table_name().to_string()));
                };
                let sub_result = executor::execute(store, &plan, table_def)?;
                let projected = sub_result.columns.len();
                if projected != 1 {
                    return Err(QueryError::UnsupportedFeature(format!(
                        "IN (SELECT ...) subquery must project exactly 1 column, got {}",
                        projected
                    )));
                }
                // Keep the leading value of each row; rows with no values are skipped.
                let mut literals = Vec::new();
                for row in sub_result.rows {
                    if let Some(first) = row.into_iter().next() {
                        literals.push(parser::PredicateValue::Literal(first));
                    }
                }
                *pred = parser::Predicate::In(column.clone(), literals);
            }
            parser::Predicate::Exists { subquery, negated } => {
                let plan = planner::plan_query(&self.schema, subquery, params)?;
                let Some(table_def) = self.schema.get_table(&plan.table_name().into()) else {
                    return Err(QueryError::TableNotFound(plan.table_name().to_string()));
                };
                let has_rows = !executor::execute(store, &plan, table_def)?.rows.is_empty();
                // EXISTS holds when rows were found; NOT EXISTS flips that.
                let holds = has_rows != *negated;
                *pred = parser::Predicate::Always(holds);
            }
            parser::Predicate::Or(left, right) => {
                self.pre_execute_subqueries(store, left, params)?;
                self.pre_execute_subqueries(store, right, params)?;
            }
            _ => {}
        }
    }
    Ok(())
}
/// Executes a set operation (UNION / INTERSECT / EXCEPT, with or without ALL).
///
/// The left operand is planned and executed first, then the right one. The
/// result takes its column names from the left operand. Rows are compared by
/// their `Debug` rendering. In every variant the output keeps left-operand
/// order (followed by right-operand rows for UNION).
fn execute_union<S: ProjectionStore>(
    &self,
    store: &mut S,
    union_stmt: &parser::ParsedUnion,
    params: &[Value],
) -> Result<QueryResult> {
    let lhs_plan = planner::plan_query(&self.schema, &union_stmt.left, params)?;
    let Some(lhs_table) = self.schema.get_table(&lhs_plan.table_name().into()) else {
        return Err(QueryError::TableNotFound(lhs_plan.table_name().to_string()));
    };
    let lhs = executor::execute(store, &lhs_plan, lhs_table)?;

    let rhs_plan = planner::plan_query(&self.schema, &union_stmt.right, params)?;
    let Some(rhs_table) = self.schema.get_table(&rhs_plan.table_name().into()) else {
        return Err(QueryError::TableNotFound(rhs_plan.table_name().to_string()));
    };
    let rhs = executor::execute(store, &rhs_plan, rhs_table)?;

    let columns = lhs.columns;
    let left_rows = lhs.rows;
    let right_rows = rhs.rows;

    // Row identity for set semantics.
    let fingerprint = |row: &Vec<Value>| format!("{row:?}");
    // Multiset view of a row list: fingerprint -> number of occurrences.
    let tally = |rows: &[Vec<Value>]| -> std::collections::HashMap<String, usize> {
        let mut counts = std::collections::HashMap::new();
        for row in rows {
            *counts.entry(fingerprint(row)).or_insert(0) += 1;
        }
        counts
    };

    let rows: Vec<Vec<Value>> = match union_stmt.op {
        parser::SetOp::Union => {
            let merged = left_rows.into_iter().chain(right_rows);
            if union_stmt.all {
                merged.collect()
            } else {
                // Keep only the first occurrence of each row.
                let mut seen = std::collections::HashSet::new();
                merged
                    .filter(|row| seen.insert(fingerprint(row)))
                    .collect()
            }
        }
        parser::SetOp::Intersect => {
            if union_stmt.all {
                // Each left row consumes one matching right occurrence.
                let mut remaining = tally(&right_rows);
                left_rows
                    .into_iter()
                    .filter(|row| {
                        let key = fingerprint(row);
                        match remaining.get_mut(&key) {
                            Some(n) if *n > 0 => {
                                *n -= 1;
                                true
                            }
                            _ => false,
                        }
                    })
                    .collect()
            } else {
                let in_right: std::collections::HashSet<String> =
                    right_rows.iter().map(&fingerprint).collect();
                let mut emitted = std::collections::HashSet::new();
                let mut kept = Vec::new();
                for row in left_rows {
                    let key = fingerprint(&row);
                    if in_right.contains(&key) && emitted.insert(key) {
                        kept.push(row);
                    }
                }
                kept
            }
        }
        parser::SetOp::Except => {
            if union_stmt.all {
                // Each right occurrence cancels one matching left row.
                let mut remaining = tally(&right_rows);
                left_rows
                    .into_iter()
                    .filter(|row| {
                        let key = fingerprint(row);
                        match remaining.get_mut(&key) {
                            Some(n) if *n > 0 => {
                                *n -= 1;
                                false
                            }
                            _ => true,
                        }
                    })
                    .collect()
            } else {
                let in_right: std::collections::HashSet<String> =
                    right_rows.iter().map(&fingerprint).collect();
                let mut emitted = std::collections::HashSet::new();
                let mut kept = Vec::new();
                for row in left_rows {
                    let key = fingerprint(&row);
                    if !in_right.contains(&key) && emitted.insert(key) {
                        kept.push(row);
                    }
                }
                kept
            }
        }
    };

    Ok(QueryResult { columns, rows })
}
/// Executes a SELECT as of the log position `position`.
///
/// # Errors
///
/// Returns `QueryError::UnsupportedFeature` for UNION statements,
/// `QueryError::TableNotFound` when the planned table is missing from the
/// schema, and any parser, planner or executor error.
pub fn query_at<S: ProjectionStore>(
    &self,
    store: &mut S,
    sql: &str,
    params: &[Value],
    position: Offset,
) -> Result<QueryResult> {
    let select = match self.parse_query_statement_cached(sql)? {
        parser::ParsedStatement::Select(select) => select,
        parser::ParsedStatement::Union(_) => {
            return Err(QueryError::UnsupportedFeature(
                "UNION is not supported in point-in-time queries".to_string(),
            ));
        }
        _ => unreachable!("parse_query_statement only returns Select or Union"),
    };
    let plan = planner::plan_query(&self.schema, &select, params)?;
    let Some(table_def) = self.schema.get_table(&plan.table_name().into()) else {
        return Err(QueryError::TableNotFound(plan.table_name().to_string()));
    };
    executor::execute_at(store, &plan, table_def, position)
}
/// Executes a point-in-time query at the offset that `resolver` maps `target_ns` to.
///
/// `resolver` is called exactly once with `target_ns`. When it returns an
/// offset, the query is delegated to [`QueryEngine::query_at`].
///
/// # Errors
///
/// Returns `QueryError::UnsupportedFeature` when `resolver` yields `None`,
/// plus anything `query_at` returns.
pub fn query_at_timestamp<S, R>(
    &self,
    store: &mut S,
    sql: &str,
    params: &[Value],
    target_ns: i64,
    resolver: R,
) -> Result<QueryResult>
where
    S: ProjectionStore,
    R: FnOnce(i64) -> Option<Offset>,
{
    match resolver(target_ns) {
        Some(offset) => self.query_at(store, sql, params, offset),
        None => Err(QueryError::UnsupportedFeature(format!(
            "no log offset at or before timestamp {target_ns} ns \
             (empty log or predates genesis)"
        ))),
    }
}
/// Plans `sql` without executing it and returns the rendered plan text.
///
/// # Errors
///
/// Returns `QueryError::UnsupportedFeature` for UNION statements, and any
/// parser or planner error.
pub fn explain(&self, sql: &str, params: &[Value]) -> Result<String> {
    let select = match self.parse_query_statement_cached(sql)? {
        parser::ParsedStatement::Select(select) => select,
        parser::ParsedStatement::Union(_) => {
            return Err(QueryError::UnsupportedFeature(
                "EXPLAIN does not yet render UNION plans".to_string(),
            ));
        }
        _ => unreachable!("parse_query_statement only returns Select or Union"),
    };
    let plan = planner::plan_query(&self.schema, &select, params)?;
    Ok(explain::explain_plan(&plan))
}
/// Parses and plans a SELECT once so it can be executed repeatedly.
///
/// The returned [`PreparedQuery`] owns the plan and a clone of the engine's schema.
///
/// # Errors
///
/// Returns `QueryError::UnsupportedFeature` for anything other than a SELECT,
/// and any parser or planner error.
pub fn prepare(&self, sql: &str, params: &[Value]) -> Result<PreparedQuery> {
    let parser::ParsedStatement::Select(select) = self.parse_query_statement_cached(sql)? else {
        return Err(QueryError::UnsupportedFeature(
            "only SELECT queries can be prepared".to_string(),
        ));
    };
    let plan = planner::plan_query(&self.schema, &select, params)?;
    let schema = self.schema.clone();
    Ok(PreparedQuery { plan, schema })
}
}
/// A SELECT that has already been parsed and planned.
///
/// It holds the query plan and its own copy of the schema the plan was built against.
#[derive(Debug, Clone)]
pub struct PreparedQuery {
    /// The plan executed on every call.
    plan: plan::QueryPlan,
    /// Schema used to look up the plan's table at execution time.
    schema: Schema,
}

impl PreparedQuery {
    /// Executes the prepared plan against `store`.
    ///
    /// # Errors
    ///
    /// Returns `QueryError::TableNotFound` if the plan's table is missing
    /// from the stored schema, and any executor error.
    pub fn execute<S: ProjectionStore>(&self, store: &mut S) -> Result<QueryResult> {
        let table_name = self.plan.table_name();
        let Some(table_def) = self.schema.get_table(&table_name.into()) else {
            return Err(QueryError::TableNotFound(table_name.to_string()));
        };
        executor::execute(store, &self.plan, table_def)
    }

    /// Executes the prepared plan against `store` as of the log position `position`.
    ///
    /// # Errors
    ///
    /// Returns `QueryError::TableNotFound` if the plan's table is missing
    /// from the stored schema, and any executor error.
    pub fn execute_at<S: ProjectionStore>(
        &self,
        store: &mut S,
        position: Offset,
    ) -> Result<QueryResult> {
        let table_name = self.plan.table_name();
        let Some(table_def) = self.schema.get_table(&table_name.into()) else {
            return Err(QueryError::TableNotFound(table_name.to_string()));
        };
        executor::execute_at(store, &self.plan, table_def, position)
    }

    /// Returns the column names reported by the plan.
    pub fn columns(&self) -> &[ColumnName] {
        self.plan.column_names()
    }

    /// Returns the name of the table the plan targets.
    pub fn table_name(&self) -> &str {
        self.plan.table_name()
    }
}
fn value_to_json(val: &Value) -> serde_json::Value {
kimberlite_properties::never!(
matches!(val, Value::Placeholder(_)),
"query.placeholder_reaches_result_boundary",
"Value::Placeholder must never reach query-result / JSON serialization boundary"
);
match val {
Value::Null | Value::Placeholder(_) => serde_json::Value::Null,
Value::BigInt(i) => serde_json::json!(i),
Value::TinyInt(i) => serde_json::json!(i),
Value::SmallInt(i) => serde_json::json!(i),
Value::Integer(i) => serde_json::json!(i),
Value::Real(f) => serde_json::json!(f),
Value::Decimal(v, scale) => {
if *scale == 0 {
serde_json::json!(v.to_string())
} else {
let divisor = 10i128.pow(u32::from(*scale));
let whole = v / divisor;
let frac = (v % divisor).unsigned_abs();
serde_json::json!(format!("{whole}.{frac:0>width$}", width = *scale as usize))
}
}
Value::Text(s) => serde_json::json!(s),
Value::Boolean(b) => serde_json::json!(b),
Value::Date(d) => serde_json::json!(d),
Value::Time(t) => serde_json::json!(t),
Value::Timestamp(ts) => serde_json::json!(ts.as_nanos()),
Value::Uuid(u) => {
let hex: String = u.iter().map(|b| format!("{b:02x}")).collect();
serde_json::json!(hex)
}
Value::Json(j) => j.clone(),
Value::Bytes(b) => {
use base64::Engine;
serde_json::json!(base64::engine::general_purpose::STANDARD.encode(b))
}
}
}