pub mod dml_planner;
mod error;
mod executor;
pub mod explain;
pub mod information_schema;
pub mod key_encoder;
mod parse_cache;
mod parser;
mod plan;
mod planner;
pub mod rbac_filter;
mod schema;
mod value;
pub mod window;
#[cfg(test)]
mod tests;
pub use error::{QueryError, Result};
pub use executor::{QueryResult, Row, execute};
pub use parser::{
HavingCondition, HavingOp, ParsedAlterTable, ParsedColumn, ParsedCreateIndex,
ParsedCreateMask, ParsedCreateTable, ParsedCreateUser, ParsedCte, ParsedDelete, ParsedGrant,
ParsedInsert, ParsedSelect, ParsedSetClassification, ParsedStatement, ParsedUnion,
ParsedUpdate, Predicate, PredicateValue, TimeTravel, extract_at_offset,
extract_time_travel, parse_statement, try_parse_custom_statement,
};
pub use planner::plan_query;
pub use schema::{
ColumnDef, ColumnName, DataType, IndexDef, Schema, SchemaBuilder, TableDef, TableName,
};
pub use value::Value;
use kimberlite_store::ProjectionStore;
use kimberlite_types::Offset;
/// SQL query engine bound to a fixed [`Schema`].
///
/// The optional parse cache is held behind an `Arc`, so clones of an engine
/// share the same cache instance.
#[derive(Debug, Clone)]
pub struct QueryEngine {
/// Schema that every statement is planned against.
schema: Schema,
/// Optional cache of parsed statements; `None` when no cache is configured.
parse_cache: Option<std::sync::Arc<parse_cache::ParseCache>>,
}
impl QueryEngine {
/// Creates an engine for `schema` with no parse cache configured.
pub fn new(schema: Schema) -> Self {
Self {
schema,
parse_cache: None,
}
}
/// Builder-style setter: returns this engine with a newly created parse
/// cache constructed from `max_size`, replacing any cache set previously.
#[must_use]
pub fn with_parse_cache(mut self, max_size: usize) -> Self {
    let fresh_cache = parse_cache::ParseCache::new(max_size);
    self.parse_cache = Some(std::sync::Arc::new(fresh_cache));
    self
}
/// Returns the parse cache's statistics, or `None` when no cache is
/// configured on this engine.
pub fn parse_cache_stats(&self) -> Option<parse_cache::ParseCacheStats> {
    let cache = self.parse_cache.as_ref()?;
    Some(cache.stats())
}
/// Clears the parse cache; does nothing when no cache is configured.
pub fn clear_parse_cache(&self) {
    let Some(cache) = self.parse_cache.as_deref() else {
        return;
    };
    cache.clear();
}
/// Borrows the schema this engine plans queries against.
pub fn schema(&self) -> &Schema {
&self.schema
}
/// Parses `sql` and accepts it only if it is a `SELECT` or `UNION`.
///
/// # Errors
/// Propagates any error from `parser::parse_statement`; returns
/// [`QueryError::UnsupportedFeature`] for every other statement kind.
fn parse_query_statement(sql: &str) -> Result<parser::ParsedStatement> {
    let parsed = parser::parse_statement(sql)?;
    let is_read_query = matches!(
        parsed,
        parser::ParsedStatement::Select(_) | parser::ParsedStatement::Union(_)
    );
    if is_read_query {
        return Ok(parsed);
    }
    Err(QueryError::UnsupportedFeature(
        "only SELECT and UNION queries are supported".to_string(),
    ))
}
/// Same contract as [`Self::parse_query_statement`], but consults the parse
/// cache first when one is configured.
///
/// A cache hit is returned directly. On a miss the statement is parsed and a
/// clone is stored under the exact `sql` text. Parse failures are returned
/// without touching the cache.
fn parse_query_statement_cached(&self, sql: &str) -> Result<parser::ParsedStatement> {
    // Without a cache this is just a plain parse.
    let Some(cache) = self.parse_cache.as_deref() else {
        return Self::parse_query_statement(sql);
    };
    if let Some(hit) = cache.get(sql) {
        return Ok(hit);
    }
    let parsed = Self::parse_query_statement(sql)?;
    cache.insert(sql.to_string(), parsed.clone());
    Ok(parsed)
}
/// Executes a read-only SQL statement against `store`.
///
/// Processing order, as implemented below:
/// 1. `explain::extract_break_glass` strips an optional break-glass
///    annotation; when a reason is present a `tracing` warning is emitted.
/// 2. `information_schema::maybe_answer` gets a chance to answer the query
///    directly from the schema.
/// 3. `explain::extract_explain` detects an EXPLAIN request, which is answered
///    with a single-row, single-column (`plan`) result.
/// 4. `parser::extract_time_travel` detects a time-travel clause: an offset is
///    delegated to [`Self::query_at`]; a timestamp is rejected here because no
///    resolver is available.
/// 5. Otherwise the statement is parsed (through the parse cache when
///    configured) and executed as a `SELECT` (with optional CTEs and window
///    functions) or a `UNION`.
///
/// # Errors
/// Returns [`QueryError::UnsupportedFeature`] for non-SELECT/UNION statements
/// and for timestamp-based time travel, [`QueryError::TableNotFound`] when the
/// planned table is missing from the schema, and otherwise propagates parser,
/// planner, executor, and window-function errors.
pub fn query<S: ProjectionStore>(
    &self,
    store: &mut S,
    sql: &str,
    params: &[Value],
) -> Result<QueryResult> {
    let (sql, break_glass_reason) = explain::extract_break_glass(sql);
    if let Some(ref reason) = break_glass_reason {
        tracing::warn!(
            break_glass_reason = %reason,
            "BREAK_GLASS query — regulator-visible audit signal",
        );
    }

    if let Some(result) = information_schema::maybe_answer(sql, &self.schema) {
        return Ok(result);
    }

    let (sql, is_explain) = explain::extract_explain(sql);
    if is_explain {
        let plan_text = self.explain(sql, params)?;
        return Ok(executor::QueryResult {
            columns: vec!["plan".into()],
            rows: vec![vec![Value::Text(plan_text)]],
        });
    }

    let (cleaned_sql, time_travel) = parser::extract_time_travel(sql);
    match time_travel {
        Some(parser::TimeTravel::Offset(o)) => {
            return self.query_at(store, &cleaned_sql, params, Offset::new(o));
        }
        Some(parser::TimeTravel::TimestampNs(_)) => {
            return Err(QueryError::UnsupportedFeature(
                "FOR SYSTEM_TIME AS OF '<iso>' / AS OF '<iso>' \
                 requires a timestamp→offset resolver — use \
                 QueryEngine::query_at_timestamp(..., resolver)"
                    .to_string(),
            ));
        }
        // No time-travel clause: run against the current state.
        None => {}
    }

    let stmt = self.parse_query_statement_cached(sql)?;
    match stmt {
        parser::ParsedStatement::Select(parsed) => {
            let result = if parsed.ctes.is_empty() {
                let plan = planner::plan_query(&self.schema, &parsed, params)?;
                let table_def = self
                    .schema
                    .get_table(&plan.table_name().into())
                    .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
                executor::execute(store, &plan, table_def)?
            } else {
                self.execute_with_ctes(store, &parsed, params)?
            };
            // `parsed` is still alive here, so the window specs can be
            // borrowed instead of cloned up front.
            window::apply_window_fns(result, &parsed.window_fns)
        }
        parser::ParsedStatement::Union(union_stmt) => {
            self.execute_union(store, &union_stmt, params)
        }
        _ => unreachable!("parse_query_statement only returns Select or Union"),
    }
}
/// Executes a `SELECT` that has CTEs by materializing each CTE into `store`
/// as a temporary table and then running the main query against a schema
/// extended with those tables.
///
/// For each CTE, in declaration order:
/// - its query is planned and executed against the schema built so far (so a
///   later CTE can reference an earlier one);
/// - a table id is derived by hashing the CTE name with `DefaultHasher`;
/// - a table definition is registered under the CTE name with every column
///   typed as `Text`;
/// - every result row is serialized to a JSON object and written to `store`
///   in its own batch, keyed by the row's index.
///
/// NOTE(review): the rows written here are not removed afterwards in this
/// function — confirm whether stale rows from an earlier, larger
/// materialization under the same CTE name can leak into later queries.
/// NOTE(review): the table definition declares the first result column as the
/// primary key, while the stored key is the row index — confirm the executor
/// tolerates that mismatch.
fn execute_with_ctes<S: ProjectionStore>(
    &self,
    store: &mut S,
    parsed: &parser::ParsedSelect,
    params: &[Value],
) -> Result<QueryResult> {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut working_schema = self.schema.clone();

    for cte in &parsed.ctes {
        // Run the CTE body against everything registered so far.
        let plan = planner::plan_query(&working_schema, &cte.query, params)?;
        let Some(source_def) = working_schema.get_table(&plan.table_name().into()) else {
            return Err(QueryError::TableNotFound(plan.table_name().to_string()));
        };
        let materialized = executor::execute(store, &plan, source_def)?;

        // Table id derived from a hash of the CTE name.
        let table_id = {
            let mut state = DefaultHasher::new();
            cte.name.hash(&mut state);
            kimberlite_store::TableId::new(state.finish())
        };

        let mut column_defs: Vec<schema::ColumnDef> =
            Vec::with_capacity(materialized.columns.len());
        for col in &materialized.columns {
            column_defs.push(schema::ColumnDef::new(col.as_str(), schema::DataType::Text));
        }
        let primary_key = match materialized.columns.first() {
            Some(first) => vec![first.clone()],
            None => vec![],
        };

        let table = schema::TableDef::new(table_id, column_defs, primary_key);
        working_schema.add_table(cte.name.as_str(), table);

        // Persist each row as a JSON object, one batch per row, at the next
        // offset after the store's applied position.
        for (idx, row) in materialized.rows.iter().enumerate() {
            let object: serde_json::Map<String, serde_json::Value> = materialized
                .columns
                .iter()
                .zip(row.iter())
                .map(|(col, val)| (col.as_str().to_string(), value_to_json(val)))
                .collect();
            let payload = serde_json::to_vec(&serde_json::Value::Object(object)).map_err(|e| {
                QueryError::UnsupportedFeature(format!("CTE serialization failed: {e}"))
            })?;
            let key = crate::key_encoder::encode_key(&[Value::BigInt(idx as i64)]);
            let next_offset =
                kimberlite_types::Offset::new(store.applied_position().as_u64() + 1);
            let batch = kimberlite_store::WriteBatch::new(next_offset).put(
                table_id,
                key,
                bytes::Bytes::from(payload),
            );
            store.apply(batch)?;
        }
    }

    // The CTEs are now real tables in `working_schema`; run the outer query
    // without them.
    let mut main_query = parsed.clone();
    main_query.ctes = Vec::new();

    let plan = planner::plan_query(&working_schema, &main_query, params)?;
    let Some(table_def) = working_schema.get_table(&plan.table_name().into()) else {
        return Err(QueryError::TableNotFound(plan.table_name().to_string()));
    };
    executor::execute(store, &plan, table_def)
}
/// Executes both sides of a `UNION` and concatenates their rows.
///
/// The left side is planned and executed before the right side. Column names
/// are taken from the left result. Unless `union_stmt.all` is set, duplicate
/// rows are dropped (first occurrence wins), with rows compared by their
/// `Debug` rendering.
///
/// # Errors
/// Returns [`QueryError::TableNotFound`] when either side's table is missing
/// from the schema and propagates planner/executor errors.
fn execute_union<S: ProjectionStore>(
    &self,
    store: &mut S,
    union_stmt: &parser::ParsedUnion,
    params: &[Value],
) -> Result<QueryResult> {
    // Left-hand side.
    let lhs_plan = planner::plan_query(&self.schema, &union_stmt.left, params)?;
    let Some(lhs_table) = self.schema.get_table(&lhs_plan.table_name().into()) else {
        return Err(QueryError::TableNotFound(lhs_plan.table_name().to_string()));
    };
    let lhs = executor::execute(store, &lhs_plan, lhs_table)?;

    // Right-hand side.
    let rhs_plan = planner::plan_query(&self.schema, &union_stmt.right, params)?;
    let Some(rhs_table) = self.schema.get_table(&rhs_plan.table_name().into()) else {
        return Err(QueryError::TableNotFound(rhs_plan.table_name().to_string()));
    };
    let rhs = executor::execute(store, &rhs_plan, rhs_table)?;

    let QueryResult {
        columns,
        rows: mut merged,
    } = lhs;
    merged.extend(rhs.rows);

    let keep_duplicates = union_stmt.all;
    if !keep_duplicates {
        let mut seen = std::collections::HashSet::new();
        merged.retain(|row| seen.insert(format!("{row:?}")));
    }

    Ok(QueryResult {
        columns,
        rows: merged,
    })
}
/// Executes a `SELECT` as of log `position` via `executor::execute_at`.
///
/// The statement is parsed through the parse cache when one is configured.
///
/// # Errors
/// Returns [`QueryError::UnsupportedFeature`] for `UNION` and for anything
/// that is not a SELECT/UNION, [`QueryError::TableNotFound`] when the planned
/// table is missing from the schema, and otherwise propagates
/// parser/planner/executor errors.
pub fn query_at<S: ProjectionStore>(
    &self,
    store: &mut S,
    sql: &str,
    params: &[Value],
    position: Offset,
) -> Result<QueryResult> {
    let select = match self.parse_query_statement_cached(sql)? {
        parser::ParsedStatement::Select(select) => select,
        parser::ParsedStatement::Union(_) => {
            return Err(QueryError::UnsupportedFeature(
                "UNION is not supported in point-in-time queries".to_string(),
            ));
        }
        _ => unreachable!("parse_query_statement only returns Select or Union"),
    };

    let plan = planner::plan_query(&self.schema, &select, params)?;
    let Some(table_def) = self.schema.get_table(&plan.table_name().into()) else {
        return Err(QueryError::TableNotFound(plan.table_name().to_string()));
    };
    executor::execute_at(store, &plan, table_def, position)
}
/// Executes a point-in-time query for the timestamp `target_ns`.
///
/// `resolver` is called once with `target_ns` and must yield the log offset to
/// query at; the query is then delegated to [`Self::query_at`].
///
/// # Errors
/// Returns [`QueryError::UnsupportedFeature`] when `resolver` yields `None`;
/// otherwise whatever [`Self::query_at`] returns.
pub fn query_at_timestamp<S, R>(
    &self,
    store: &mut S,
    sql: &str,
    params: &[Value],
    target_ns: i64,
    resolver: R,
) -> Result<QueryResult>
where
    S: ProjectionStore,
    R: FnOnce(i64) -> Option<Offset>,
{
    match resolver(target_ns) {
        Some(offset) => self.query_at(store, sql, params, offset),
        None => Err(QueryError::UnsupportedFeature(format!(
            "no log offset at or before timestamp {target_ns} ns \
             (empty log or predates genesis)"
        ))),
    }
}
/// Plans `sql` without executing it and returns the plan rendered by
/// `explain::explain_plan`.
///
/// # Errors
/// Returns [`QueryError::UnsupportedFeature`] for `UNION` and for anything
/// that is not a SELECT/UNION; propagates parser and planner errors.
pub fn explain(&self, sql: &str, params: &[Value]) -> Result<String> {
    match self.parse_query_statement_cached(sql)? {
        parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
            "EXPLAIN does not yet render UNION plans".to_string(),
        )),
        parser::ParsedStatement::Select(select) => {
            planner::plan_query(&self.schema, &select, params)
                .map(|plan| explain::explain_plan(&plan))
        }
        _ => unreachable!("parse_query_statement only returns Select or Union"),
    }
}
/// Parses and plans a `SELECT` once so it can be executed repeatedly.
///
/// The returned [`PreparedQuery`] holds the plan together with a clone of this
/// engine's schema.
///
/// # Errors
/// Returns [`QueryError::UnsupportedFeature`] when `sql` is not a plain
/// `SELECT`; propagates parser and planner errors.
pub fn prepare(&self, sql: &str, params: &[Value]) -> Result<PreparedQuery> {
    let parser::ParsedStatement::Select(select) = self.parse_query_statement_cached(sql)? else {
        return Err(QueryError::UnsupportedFeature(
            "only SELECT queries can be prepared".to_string(),
        ));
    };
    planner::plan_query(&self.schema, &select, params).map(|plan| PreparedQuery {
        plan,
        schema: self.schema.clone(),
    })
}
}
/// A planned query bundled with the schema needed to execute it.
#[derive(Debug, Clone)]
pub struct PreparedQuery {
/// The query plan to execute.
plan: plan::QueryPlan,
/// Schema used to look up the plan's table definition at execution time.
schema: Schema,
}
impl PreparedQuery {
    /// Runs the stored plan against `store` via `executor::execute`.
    ///
    /// # Errors
    /// Returns [`QueryError::TableNotFound`] when the plan's table is missing
    /// from the stored schema; otherwise propagates executor errors.
    pub fn execute<S: ProjectionStore>(&self, store: &mut S) -> Result<QueryResult> {
        let name = self.plan.table_name();
        let Some(table_def) = self.schema.get_table(&name.into()) else {
            return Err(QueryError::TableNotFound(name.to_string()));
        };
        executor::execute(store, &self.plan, table_def)
    }

    /// Runs the stored plan as of log `position` via `executor::execute_at`.
    ///
    /// # Errors
    /// Returns [`QueryError::TableNotFound`] when the plan's table is missing
    /// from the stored schema; otherwise propagates executor errors.
    pub fn execute_at<S: ProjectionStore>(
        &self,
        store: &mut S,
        position: Offset,
    ) -> Result<QueryResult> {
        let name = self.plan.table_name();
        let Some(table_def) = self.schema.get_table(&name.into()) else {
            return Err(QueryError::TableNotFound(name.to_string()));
        };
        executor::execute_at(store, &self.plan, table_def, position)
    }

    /// Column names reported by the stored plan.
    pub fn columns(&self) -> &[ColumnName] {
        let plan = &self.plan;
        plan.column_names()
    }

    /// Name of the table the stored plan targets.
    pub fn table_name(&self) -> &str {
        let plan = &self.plan;
        plan.table_name()
    }
}
/// Converts a query [`Value`] into a `serde_json::Value`.
///
/// Mapping, as implemented below:
/// - `Null` → JSON `null` (`Placeholder` also maps to `null`, but is asserted
///   never to reach this point).
/// - Integer variants, `Real`, `Boolean`, `Date`, `Time`, `Text` → via
///   `serde_json::json!`.
/// - `Decimal(v, scale)` → a decimal string with exactly `scale` fractional
///   digits (plain integer string when `scale == 0`).
/// - `Timestamp` → its `as_nanos()` value.
/// - `Uuid` → lowercase hex string without separators.
/// - `Json` → cloned as-is.
/// - `Bytes` → standard-alphabet base64 string.
fn value_to_json(val: &Value) -> serde_json::Value {
    kimberlite_properties::never!(
        matches!(val, Value::Placeholder(_)),
        "query.placeholder_reaches_result_boundary",
        "Value::Placeholder must never reach query-result / JSON serialization boundary"
    );
    match val {
        Value::Null | Value::Placeholder(_) => serde_json::Value::Null,
        Value::BigInt(i) => serde_json::json!(i),
        Value::TinyInt(i) => serde_json::json!(i),
        Value::SmallInt(i) => serde_json::json!(i),
        Value::Integer(i) => serde_json::json!(i),
        Value::Real(f) => serde_json::json!(f),
        Value::Decimal(v, scale) => {
            if *scale == 0 {
                serde_json::json!(v.to_string())
            } else {
                // Build the text from the magnitude's digits instead of
                // dividing by 10^scale. Division truncates toward zero, so a
                // value in (-1, 0) such as -5 at scale 2 produced a whole part
                // of `0` and lost its sign ("0.05" instead of "-0.05"). The
                // digit-based form also avoids `10i128.pow(scale)` overflowing
                // for very large scales.
                let width = *scale as usize;
                // Left-pad with zeros so there is always at least one digit
                // before the decimal point.
                let digits = format!("{:0>pad$}", v.unsigned_abs(), pad = width + 1);
                let (whole, frac) = digits.split_at(digits.len() - width);
                let sign = if *v < 0 { "-" } else { "" };
                serde_json::json!(format!("{sign}{whole}.{frac}"))
            }
        }
        Value::Text(s) => serde_json::json!(s),
        Value::Boolean(b) => serde_json::json!(b),
        Value::Date(d) => serde_json::json!(d),
        Value::Time(t) => serde_json::json!(t),
        Value::Timestamp(ts) => serde_json::json!(ts.as_nanos()),
        Value::Uuid(u) => {
            let hex: String = u.iter().map(|b| format!("{b:02x}")).collect();
            serde_json::json!(hex)
        }
        Value::Json(j) => j.clone(),
        Value::Bytes(b) => {
            use base64::Engine;
            serde_json::json!(base64::engine::general_purpose::STANDARD.encode(b))
        }
    }
}