use std::ops::Bound;
use crate::error::{QueryError, Result};
use crate::key_encoder::{encode_key, successor_key};
use crate::parser::{
CaseWhenArm, ComputedColumn, LimitExpr, OrderByClause, ParsedSelect, Predicate, PredicateValue,
};
use crate::plan::{
CaseColumnDef, CaseWhenClause, Filter, FilterCondition, FilterOp, QueryPlan, ScanOrder,
SortSpec,
};
use crate::schema::{ColumnName, Schema, TableDef};
use crate::value::Value;
/// Snapshots the parts of a table definition that every plan node carries.
///
/// Copies the table id, column list and primary key out of `table_def` and
/// pairs them with the caller-supplied `table_name`.
#[inline]
fn create_metadata(table_def: &TableDef, table_name: String) -> crate::plan::TableMetadata {
    let columns = table_def.columns.clone();
    let primary_key = table_def.primary_key.clone();
    crate::plan::TableMetadata {
        table_id: table_def.table_id,
        table_name,
        columns,
        primary_key,
    }
}
/// Builds a `QueryPlan::PointLookup` for a fully specified key.
///
/// `key_values` are encoded into a single storage key with `encode_key`;
/// `column_indices` / `column_names` describe the projection.
#[inline]
fn build_point_lookup_plan(
    table_def: &TableDef,
    table_name: String,
    key_values: &[Value],
    column_indices: Vec<usize>,
    column_names: Vec<ColumnName>,
) -> QueryPlan {
    let metadata = create_metadata(table_def, table_name);
    QueryPlan::PointLookup {
        metadata,
        key: encode_key(key_values),
        columns: column_indices,
        column_names,
    }
}
/// Builds a `QueryPlan::RangeScan` between `start_key` and `end_key`.
///
/// `remaining_predicates` become the residual filter. A sort spec is attached
/// only when `order_by` names at least one column that is not part of the
/// primary key; otherwise only the scan direction from
/// `determine_scan_order` is used.
///
/// # Errors
/// Propagates failures from `build_filter` and `build_sort_spec`.
#[inline]
#[allow(clippy::too_many_arguments)]
fn build_range_scan_plan(
    table_def: &TableDef,
    table_name: String,
    start_key: Bound<kimberlite_store::Key>,
    end_key: Bound<kimberlite_store::Key>,
    remaining_predicates: &[ResolvedPredicate],
    limit: Option<usize>,
    offset: Option<usize>,
    order_by: &[OrderByClause],
    column_indices: Vec<usize>,
    column_names: Vec<ColumnName>,
) -> Result<QueryPlan> {
    let filter = build_filter(table_def, remaining_predicates, &table_name)?;

    // `any` is already false for an empty ORDER BY, so no separate emptiness test is needed.
    let sorts_on_non_pk = order_by
        .iter()
        .any(|clause| !table_def.is_primary_key(&clause.column));
    let order_by_spec = if sorts_on_non_pk {
        build_sort_spec(order_by, table_def, &table_name)?
    } else {
        None
    };

    let order = determine_scan_order(order_by, table_def);
    let metadata = create_metadata(table_def, table_name);
    Ok(QueryPlan::RangeScan {
        metadata,
        start: start_key,
        end: end_key,
        filter,
        limit,
        offset,
        order,
        order_by: order_by_spec,
        columns: column_indices,
        column_names,
    })
}
/// Builds a `QueryPlan::IndexScan` over the index identified by `index_id`.
///
/// `remaining_predicates` become the residual filter. A sort spec is attached
/// only when `order_by` names at least one column that is not part of the
/// primary key; the scan direction comes from `determine_scan_order`.
///
/// # Errors
/// Propagates failures from `build_filter` and `build_sort_spec`.
#[inline]
#[allow(clippy::too_many_arguments)]
fn build_index_scan_plan(
    table_def: &TableDef,
    table_name: String,
    index_id: u64,
    index_name: String,
    start_key: Bound<kimberlite_store::Key>,
    end_key: Bound<kimberlite_store::Key>,
    remaining_predicates: &[ResolvedPredicate],
    limit: Option<usize>,
    offset: Option<usize>,
    order_by: &[OrderByClause],
    column_indices: Vec<usize>,
    column_names: Vec<ColumnName>,
) -> Result<QueryPlan> {
    let filter = build_filter(table_def, remaining_predicates, &table_name)?;

    // `any` is already false for an empty ORDER BY, so no separate emptiness test is needed.
    let sorts_on_non_pk = order_by
        .iter()
        .any(|clause| !table_def.is_primary_key(&clause.column));
    let order_by_spec = if sorts_on_non_pk {
        build_sort_spec(order_by, table_def, &table_name)?
    } else {
        None
    };

    let order = determine_scan_order(order_by, table_def);
    let metadata = create_metadata(table_def, table_name);
    Ok(QueryPlan::IndexScan {
        metadata,
        index_id,
        index_name,
        start: start_key,
        end: end_key,
        filter,
        limit,
        offset,
        order,
        order_by: order_by_spec,
        columns: column_indices,
        column_names,
    })
}
/// Builds a `QueryPlan::TableScan` that filters on every predicate.
///
/// Unlike the range and index scan builders, the sort spec is always derived
/// from `order_by` via `build_sort_spec`.
///
/// # Errors
/// Propagates failures from `build_filter` and `build_sort_spec`.
#[inline]
#[allow(clippy::too_many_arguments)]
fn build_table_scan_plan(
    table_def: &TableDef,
    table_name: String,
    all_predicates: &[ResolvedPredicate],
    limit: Option<usize>,
    offset: Option<usize>,
    order_by: &[OrderByClause],
    column_indices: Vec<usize>,
    column_names: Vec<ColumnName>,
) -> Result<QueryPlan> {
    let row_filter = build_filter(table_def, all_predicates, &table_name)?;
    let sort = build_sort_spec(order_by, table_def, &table_name)?;
    let metadata = create_metadata(table_def, table_name);
    Ok(QueryPlan::TableScan {
        metadata,
        filter: row_filter,
        limit,
        offset,
        order: sort,
        columns: column_indices,
        column_names,
    })
}
/// Resolves a `LIMIT` / `OFFSET` expression to a concrete row count.
///
/// * `None` yields `Ok(None)`.
/// * A literal is returned as-is.
/// * A parameter placeholder (1-based `idx`) is looked up in `params` and must
///   hold a non-negative integer value.
///
/// # Errors
/// * `QueryError::ParameterNotFound` when the placeholder index is `0` or
///   beyond the end of `params`.
/// * `QueryError::ParseError` when the bound integer is negative or does not
///   fit in `usize` on the current target.
/// * `QueryError::UnsupportedFeature` when the bound value is not an integer.
fn resolve_limit(expr: Option<LimitExpr>, params: &[Value]) -> Result<Option<usize>> {
    let idx = match expr {
        None => return Ok(None),
        Some(LimitExpr::Literal(v)) => return Ok(Some(v)),
        Some(LimitExpr::Param(idx)) => idx,
    };

    // Placeholders are 1-based; index 0 can never be satisfied.
    let zero_idx = idx.checked_sub(1).ok_or(QueryError::ParameterNotFound(0))?;
    let value = params
        .get(zero_idx)
        .ok_or(QueryError::ParameterNotFound(idx))?;

    // Widen every integer flavour to i64 so the range checks are written once.
    let requested = match value {
        Value::BigInt(n) => i64::from(*n),
        Value::Integer(n) => i64::from(*n),
        Value::SmallInt(n) => i64::from(*n),
        Value::TinyInt(n) => i64::from(*n),
        other => {
            return Err(QueryError::UnsupportedFeature(format!(
                "LIMIT/OFFSET parameter must bind to an integer; got {other:?}"
            )));
        }
    };
    if requested < 0 {
        return Err(QueryError::ParseError(
            "LIMIT/OFFSET parameter must be non-negative".to_string(),
        ));
    }

    // A checked conversion instead of `as usize`: on targets where usize is
    // narrower than 64 bits an `as` cast would silently truncate.
    usize::try_from(requested).map(Some).map_err(|_| {
        QueryError::ParseError("LIMIT/OFFSET parameter is too large".to_string())
    })
}
#[inline]
fn wrap_with_aggregate(
base_plan: QueryPlan,
table_def: &TableDef,
table_name: String,
parsed: &ParsedSelect,
params: &[Value],
) -> Result<QueryPlan> {
let group_by_columns = if parsed.distinct && parsed.group_by.is_empty() {
parsed
.columns
.clone()
.unwrap_or_else(|| table_def.columns.iter().map(|c| c.name.clone()).collect())
} else {
parsed.group_by.clone()
};
let aggregates = if parsed.distinct && parsed.aggregates.is_empty() {
vec![]
} else {
parsed.aggregates.clone()
};
let mut group_by_indices = Vec::new();
for col_name in &group_by_columns {
let (idx, _) =
table_def
.find_column(col_name)
.ok_or_else(|| QueryError::ColumnNotFound {
table: table_name.clone(),
column: col_name.to_string(),
})?;
group_by_indices.push(idx);
}
let mut result_columns = group_by_columns.clone();
for agg in &aggregates {
let agg_name = match agg {
crate::parser::AggregateFunction::CountStar => "COUNT(*)".to_string(),
crate::parser::AggregateFunction::Count(col) => format!("COUNT({col})"),
crate::parser::AggregateFunction::Sum(col) => format!("SUM({col})"),
crate::parser::AggregateFunction::Avg(col) => format!("AVG({col})"),
crate::parser::AggregateFunction::Min(col) => format!("MIN({col})"),
crate::parser::AggregateFunction::Max(col) => format!("MAX({col})"),
};
result_columns.push(ColumnName::new(agg_name));
}
let mut aggregate_filters: Vec<Option<crate::plan::Filter>> = Vec::new();
for (i, _agg) in aggregates.iter().enumerate() {
let filter_predicates = parsed.aggregate_filters.get(i).and_then(|f| f.as_ref());
let filter = match filter_predicates {
Some(preds) => {
let resolved = resolve_predicates(preds, params)?;
build_filter(table_def, &resolved, &table_name)?
}
None => None,
};
aggregate_filters.push(filter);
}
Ok(QueryPlan::Aggregate {
metadata: create_metadata(table_def, table_name),
source: Box::new(base_plan),
group_by_cols: group_by_indices,
group_by_names: group_by_columns,
aggregates,
aggregate_filters,
column_names: result_columns,
having: parsed.having.clone(),
})
}
/// Entry point of the planner: turns a parsed `SELECT` into a `QueryPlan`.
///
/// Queries with at least one `JOIN` go through `plan_join_query`; everything
/// else goes through `plan_single_table_query`.
///
/// # Errors
/// Propagates whatever error the selected planning path returns.
pub fn plan_query(schema: &Schema, parsed: &ParsedSelect, params: &[Value]) -> Result<QueryPlan> {
    let has_joins = !parsed.joins.is_empty();
    if has_joins {
        return plan_join_query(schema, parsed, params);
    }
    plan_single_table_query(schema, parsed, params)
}
fn plan_single_table_query(
schema: &Schema,
parsed: &ParsedSelect,
params: &[Value],
) -> Result<QueryPlan> {
let table_name = parsed.table.clone();
let table_def = schema
.get_table(&table_name.clone().into())
.ok_or_else(|| QueryError::TableNotFound(table_name.clone()))?;
let resolved_predicates = resolve_predicates(&parsed.predicates, params)?;
let (column_indices, column_names) = resolve_query_columns(table_def, parsed, &table_name)?;
let needs_aggregate = !parsed.aggregates.is_empty()
|| !parsed.group_by.is_empty()
|| parsed.distinct
|| !parsed.having.is_empty();
let access_path = analyze_access_path(table_def, &resolved_predicates);
let base_plan = build_scan_plan(
access_path,
table_def,
table_name.clone(),
parsed,
params,
column_indices,
column_names,
)?;
let plan_after_agg = if needs_aggregate {
wrap_with_aggregate(base_plan, table_def, table_name.clone(), parsed, params)?
} else {
base_plan
};
if !parsed.case_columns.is_empty() {
let existing_columns = plan_after_agg.column_names().to_vec();
let case_columns = resolve_case_columns_for_join(&parsed.case_columns, &existing_columns)?;
let mut output_columns = existing_columns;
output_columns.extend(case_columns.iter().map(|c| c.alias.clone()));
Ok(QueryPlan::Materialize {
source: Box::new(plan_after_agg),
filter: None,
case_columns,
order: None,
limit: None,
offset: None,
column_names: output_columns,
})
} else {
Ok(plan_after_agg)
}
}
/// Plans a `SELECT` with one or more `JOIN` clauses.
///
/// Each joined table is read with a full table scan and folded left-to-right
/// into nested `QueryPlan::Join` nodes. WHERE predicates, ORDER BY, CASE
/// columns, LIMIT and OFFSET are applied afterwards by a `Materialize` node,
/// which is emitted only when at least one of them is present.
///
/// # Errors
/// Returns `QueryError::ColumnNotFound` when a selected column is not among
/// the joined columns, and propagates failures from the helper steps.
fn plan_join_query(schema: &Schema, parsed: &ParsedSelect, params: &[Value]) -> Result<QueryPlan> {
    let mut current_plan = plan_table_access(schema, &parsed.table, params)?;
    for join in &parsed.joins {
        let right_plan = plan_table_access(schema, &join.table, params)?;
        // NOTE(review): conditions are always resolved against the first table
        // (`parsed.table`) as the left side, even for the second and later
        // joins where the left input is already a join result — confirm the
        // column offsets are what the executor expects for 3+ table joins.
        let on_conditions =
            build_join_conditions(&join.on_condition, schema, &parsed.table, &join.table)?;
        let joined_names: Vec<ColumnName> = current_plan
            .column_names()
            .iter()
            .chain(right_plan.column_names().iter())
            .cloned()
            .collect();
        current_plan = QueryPlan::Join {
            join_type: join.join_type.clone(),
            left: Box::new(current_plan),
            right: Box::new(right_plan),
            on_conditions,
            columns: vec![],
            column_names: joined_names,
        };
    }

    let combined_columns = current_plan.column_names().to_vec();
    let resolved_predicates = resolve_predicates(&parsed.predicates, params)?;
    let filter = build_filter_for_join(&resolved_predicates, &combined_columns)?;
    let order = build_sort_spec_for_join(&parsed.order_by, &combined_columns)?;
    let case_columns = resolve_case_columns_for_join(&parsed.case_columns, &combined_columns)?;

    let mut output_columns: Vec<ColumnName> = match &parsed.columns {
        Some(selected) => {
            let unknown = selected
                .iter()
                .find(|col| !combined_columns.contains(*col));
            if let Some(col) = unknown {
                return Err(QueryError::ColumnNotFound {
                    table: parsed.table.clone(),
                    column: col.to_string(),
                });
            }
            selected.clone()
        }
        None => combined_columns.clone(),
    };
    output_columns.extend(case_columns.iter().map(|c| c.alias.clone()));

    let limit = resolve_limit(parsed.limit, params)?;
    let offset = resolve_limit(parsed.offset, params)?;

    // NOTE(review): an explicit column list alone does not trigger the
    // Materialize node, so in that case the bare join (all columns) is
    // returned — confirm this is intended.
    let nothing_to_apply = filter.is_none()
        && order.is_none()
        && limit.is_none()
        && offset.is_none()
        && case_columns.is_empty();
    if nothing_to_apply {
        return Ok(current_plan);
    }
    Ok(QueryPlan::Materialize {
        source: Box::new(current_plan),
        filter,
        case_columns,
        order,
        limit,
        offset,
        column_names: output_columns,
    })
}
/// Combines `predicates` into a single AND filter over a joined row layout.
///
/// Column positions are looked up in `columns`. Returns `Ok(None)` when there
/// are no predicates.
///
/// # Errors
/// Propagates the first failure from `build_filter_for_join_predicate`.
fn build_filter_for_join(
    predicates: &[ResolvedPredicate],
    columns: &[ColumnName],
) -> Result<Option<Filter>> {
    if predicates.is_empty() {
        return Ok(None);
    }
    let mut conjuncts = Vec::with_capacity(predicates.len());
    for predicate in predicates {
        conjuncts.push(build_filter_for_join_predicate(predicate, columns)?);
    }
    Ok(Some(Filter::and(conjuncts)))
}
/// Converts one resolved predicate into a `Filter` over a joined row layout.
///
/// An `Or` predicate becomes `Filter::or` of its two sides (each side itself
/// an AND of its predicates); anything else becomes a single-condition filter.
///
/// # Errors
/// Returns `QueryError::UnsupportedFeature` when either side of an `Or` is
/// empty, and propagates failures from the nested builders.
fn build_filter_for_join_predicate(
    pred: &ResolvedPredicate,
    columns: &[ColumnName],
) -> Result<Filter> {
    match &pred.op {
        ResolvedOp::Or(left_preds, right_preds) => {
            let lhs = match build_filter_for_join(left_preds, columns)? {
                Some(filter) => filter,
                None => {
                    return Err(QueryError::UnsupportedFeature(
                        "OR left side has no predicates".to_string(),
                    ));
                }
            };
            let rhs = match build_filter_for_join(right_preds, columns)? {
                Some(filter) => filter,
                None => {
                    return Err(QueryError::UnsupportedFeature(
                        "OR right side has no predicates".to_string(),
                    ));
                }
            };
            Ok(Filter::or(vec![lhs, rhs]))
        }
        _ => {
            let condition = build_filter_condition_for_join(pred, columns)?;
            Ok(Filter::single(condition))
        }
    }
}
fn build_filter_condition_for_join(
pred: &ResolvedPredicate,
columns: &[ColumnName],
) -> Result<FilterCondition> {
if matches!(pred.op, ResolvedOp::AlwaysTrue) {
return Ok(FilterCondition {
column_idx: 0,
op: FilterOp::AlwaysTrue,
value: Value::Null,
});
}
if matches!(pred.op, ResolvedOp::AlwaysFalse) {
return Ok(FilterCondition {
column_idx: 0,
op: FilterOp::AlwaysFalse,
value: Value::Null,
});
}
let col_idx = columns
.iter()
.position(|c| c == &pred.column)
.ok_or_else(|| QueryError::ColumnNotFound {
table: "(join)".to_string(),
column: pred.column.to_string(),
})?;
let (op, value) = match &pred.op {
ResolvedOp::Eq(v) => (FilterOp::Eq, v.clone()),
ResolvedOp::Lt(v) => (FilterOp::Lt, v.clone()),
ResolvedOp::Le(v) => (FilterOp::Le, v.clone()),
ResolvedOp::Gt(v) => (FilterOp::Gt, v.clone()),
ResolvedOp::Ge(v) => (FilterOp::Ge, v.clone()),
ResolvedOp::In(vals) => (FilterOp::In(vals.clone()), Value::Null),
ResolvedOp::Like(pattern) => (FilterOp::Like(pattern.clone()), Value::Null),
ResolvedOp::NotLike(pattern) => (FilterOp::NotLike(pattern.clone()), Value::Null),
ResolvedOp::ILike(pattern) => (FilterOp::ILike(pattern.clone()), Value::Null),
ResolvedOp::NotILike(pattern) => (FilterOp::NotILike(pattern.clone()), Value::Null),
ResolvedOp::IsNull => (FilterOp::IsNull, Value::Null),
ResolvedOp::IsNotNull => (FilterOp::IsNotNull, Value::Null),
ResolvedOp::JsonExtractEq {
path,
as_text,
value: v,
} => (
FilterOp::JsonExtractEq {
path: path.clone(),
as_text: *as_text,
value: v.clone(),
},
Value::Null,
),
ResolvedOp::JsonContains(v) => (FilterOp::JsonContains(v.clone()), Value::Null),
ResolvedOp::AlwaysTrue => (FilterOp::AlwaysTrue, Value::Null),
ResolvedOp::AlwaysFalse => (FilterOp::AlwaysFalse, Value::Null),
ResolvedOp::Or(_, _) => {
return Err(QueryError::UnsupportedFeature(
"OR predicates must be handled at filter level".to_string(),
));
}
};
Ok(FilterCondition {
column_idx: col_idx,
op,
value,
})
}
fn build_sort_spec_for_join(
order_by: &[OrderByClause],
columns: &[ColumnName],
) -> Result<Option<SortSpec>> {
if order_by.is_empty() {
return Ok(None);
}
let mut sort_cols = Vec::with_capacity(order_by.len());
for clause in order_by {
let idx = columns
.iter()
.position(|c| c == &clause.column)
.ok_or_else(|| QueryError::ColumnNotFound {
table: "(join)".to_string(),
column: clause.column.to_string(),
})?;
let order = if clause.ascending {
ScanOrder::Ascending
} else {
ScanOrder::Descending
};
sort_cols.push((idx, order));
}
Ok(Some(SortSpec { columns: sort_cols }))
}
/// Resolves every `CASE` computed column against the row layout `columns`.
///
/// # Errors
/// Stops at, and returns, the first failure from `resolve_single_case_column`.
fn resolve_case_columns_for_join(
    case_columns: &[ComputedColumn],
    columns: &[ColumnName],
) -> Result<Vec<CaseColumnDef>> {
    let mut resolved = Vec::with_capacity(case_columns.len());
    for computed in case_columns {
        resolved.push(resolve_single_case_column(computed, columns)?);
    }
    Ok(resolved)
}
/// Resolves one `CASE` computed column into a `CaseColumnDef`.
///
/// Each `WHEN` arm is resolved in order; the alias and `ELSE` value are
/// copied through unchanged.
///
/// # Errors
/// Stops at, and returns, the first failure from `resolve_case_when_arm`.
fn resolve_single_case_column(
    cc: &ComputedColumn,
    columns: &[ColumnName],
) -> Result<CaseColumnDef> {
    let mut arms = Vec::with_capacity(cc.when_clauses.len());
    for arm in &cc.when_clauses {
        arms.push(resolve_case_when_arm(arm, columns)?);
    }
    Ok(CaseColumnDef {
        alias: cc.alias.clone(),
        when_clauses: arms,
        else_value: cc.else_value.clone(),
    })
}
/// Resolves a single `WHEN <condition> THEN <result>` arm.
///
/// The condition is resolved with an empty parameter list, so a parameter
/// placeholder inside a `CASE WHEN` condition fails to resolve.
///
/// # Errors
/// Returns `QueryError::UnsupportedFeature` when the condition has no
/// predicates, and propagates resolution / filter-building failures.
fn resolve_case_when_arm(arm: &CaseWhenArm, columns: &[ColumnName]) -> Result<CaseWhenClause> {
    let resolved = resolve_predicates(&arm.condition, &[])?;
    match build_filter_for_join(&resolved, columns)? {
        Some(condition) => Ok(CaseWhenClause {
            condition,
            result: arm.result.clone(),
        }),
        None => Err(QueryError::UnsupportedFeature(
            "CASE WHEN condition has no predicates".to_string(),
        )),
    }
}
/// Plans an unfiltered full scan of `table_name` that outputs every column.
///
/// Used as the input of join nodes. `_params` is accepted but unused.
///
/// # Errors
/// Returns `QueryError::TableNotFound` when the table is not in `schema`.
fn plan_table_access(schema: &Schema, table_name: &str, _params: &[Value]) -> Result<QueryPlan> {
    let table_def = match schema.get_table(&table_name.into()) {
        Some(def) => def,
        None => return Err(QueryError::TableNotFound(table_name.to_string())),
    };
    let (columns, column_names): (Vec<usize>, Vec<ColumnName>) = table_def
        .columns
        .iter()
        .enumerate()
        .map(|(position, col)| (position, col.name.clone()))
        .unzip();
    Ok(QueryPlan::TableScan {
        metadata: create_metadata(table_def, table_name.to_string()),
        filter: None,
        limit: None,
        offset: None,
        order: None,
        columns,
        column_names,
    })
}
/// Converts the predicates of a `JOIN ... ON` clause into join conditions.
///
/// Right-table column indices are offset by the number of columns in the
/// left table.
///
/// # Errors
/// Returns `QueryError::TableNotFound` when either table is unknown (left is
/// checked first) and propagates the first failing condition.
fn build_join_conditions(
    predicates: &[Predicate],
    schema: &Schema,
    left_table: &str,
    right_table: &str,
) -> Result<Vec<crate::plan::JoinCondition>> {
    let left_table_def = match schema.get_table(&left_table.into()) {
        Some(def) => def,
        None => return Err(QueryError::TableNotFound(left_table.to_string())),
    };
    let right_table_def = match schema.get_table(&right_table.into()) {
        Some(def) => def,
        None => return Err(QueryError::TableNotFound(right_table.to_string())),
    };
    let right_offset = left_table_def.columns.len();

    let mut conditions = Vec::with_capacity(predicates.len());
    for pred in predicates {
        conditions.push(build_single_join_condition(
            pred,
            left_table,
            left_table_def,
            right_table,
            right_table_def,
            right_offset,
        )?);
    }
    Ok(conditions)
}
/// Converts one `ON` predicate into a `JoinCondition`.
///
/// Only `=`, `<`, `<=`, `>`, `>=` between two columns are accepted. The
/// predicate's column is resolved against the left table; the column
/// reference on the other side may belong to either table.
///
/// # Errors
/// Returns `QueryError::UnsupportedFeature` for other operators or for a
/// non-column right-hand side, and propagates column lookup failures.
fn build_single_join_condition(
    pred: &Predicate,
    left_table: &str,
    left_table_def: &TableDef,
    right_table: &str,
    right_table_def: &TableDef,
    left_col_count: usize,
) -> Result<crate::plan::JoinCondition> {
    use crate::plan::JoinOp;

    let (left_col_name, op, right_value) = match pred {
        Predicate::Eq(col, val) => (col, JoinOp::Eq, val),
        Predicate::Lt(col, val) => (col, JoinOp::Lt, val),
        Predicate::Le(col, val) => (col, JoinOp::Le, val),
        Predicate::Gt(col, val) => (col, JoinOp::Gt, val),
        Predicate::Ge(col, val) => (col, JoinOp::Ge, val),
        _ => {
            return Err(QueryError::UnsupportedFeature(
                "only equality and comparison operators supported in JOIN ON clause".to_string(),
            ));
        }
    };

    // The left column is resolved first so that an unknown left column is
    // reported before a malformed right-hand side.
    let left_col_idx = resolve_join_column(left_col_name, left_table, left_table_def, 0)?;

    let ref_str = match right_value {
        PredicateValue::ColumnRef(reference) => reference,
        _ => {
            return Err(QueryError::UnsupportedFeature(
                "JOIN ON clause requires column-to-column comparisons (e.g., users.id = orders.user_id)".to_string(),
            ));
        }
    };
    let right_col_idx = resolve_join_column_ref(
        ref_str,
        left_table,
        left_table_def,
        right_table,
        right_table_def,
        left_col_count,
    )?;

    Ok(crate::plan::JoinCondition {
        left_col_idx,
        right_col_idx,
        op,
    })
}
/// Finds `col_name` in `table_def` and returns its index shifted by `offset`.
///
/// # Errors
/// Returns `QueryError::ColumnNotFound` (tagged with `table_name`) when the
/// table has no such column.
fn resolve_join_column(
    col_name: &ColumnName,
    table_name: &str,
    table_def: &TableDef,
    offset: usize,
) -> Result<usize> {
    match table_def.find_column(col_name) {
        Some((idx, _)) => Ok(offset + idx),
        None => Err(QueryError::ColumnNotFound {
            table: table_name.to_string(),
            column: col_name.to_string(),
        }),
    }
}
/// Resolves a column reference from a `JOIN ... ON` clause to a row index.
///
/// * `table.column` — the table part must equal `left_table` or
///   `right_table`; right-table indices are shifted by `left_col_count`.
/// * bare `column` — the right table is tried first, then the left table.
///
/// # Errors
/// Returns `QueryError::TableNotFound` for a qualifier that matches neither
/// table, and `QueryError::ColumnNotFound` when the column cannot be found.
fn resolve_join_column_ref(
    ref_str: &str,
    left_table: &str,
    left_table_def: &TableDef,
    right_table: &str,
    right_table_def: &TableDef,
    left_col_count: usize,
) -> Result<usize> {
    match ref_str.split_once('.') {
        Some((table, column)) if table == left_table => {
            resolve_join_column(&column.into(), left_table, left_table_def, 0)
        }
        Some((table, column)) if table == right_table => {
            resolve_join_column(&column.into(), right_table, right_table_def, left_col_count)
        }
        Some((table, _)) => Err(QueryError::TableNotFound(table.to_string())),
        None => resolve_join_column(
            &ref_str.into(),
            right_table,
            right_table_def,
            left_col_count,
        )
        // On a miss in the right table the left table's result (or error) wins.
        .or_else(|_| resolve_join_column(&ref_str.into(), left_table, left_table_def, 0)),
    }
}
/// Turns the chosen `AccessPath` into the matching scan plan node.
///
/// `LIMIT` and `OFFSET` are resolved up front (so their errors surface for
/// every access path, including point lookups that do not use them).
///
/// # Errors
/// Propagates failures from `resolve_limit` and from the per-path builders.
#[inline]
fn build_scan_plan(
    access_path: AccessPath,
    table_def: &TableDef,
    table_name: String,
    parsed: &ParsedSelect,
    params: &[Value],
    column_indices: Vec<usize>,
    column_names: Vec<ColumnName>,
) -> Result<QueryPlan> {
    let limit = resolve_limit(parsed.limit, params)?;
    let offset = resolve_limit(parsed.offset, params)?;
    let order_by = &parsed.order_by;

    match access_path {
        AccessPath::PointLookup { key_values } => {
            let plan = build_point_lookup_plan(
                table_def,
                table_name,
                &key_values,
                column_indices,
                column_names,
            );
            Ok(plan)
        }
        AccessPath::RangeScan {
            start_key,
            end_key,
            remaining_predicates,
        } => build_range_scan_plan(
            table_def,
            table_name,
            start_key,
            end_key,
            &remaining_predicates,
            limit,
            offset,
            order_by,
            column_indices,
            column_names,
        ),
        AccessPath::IndexScan {
            index_id,
            index_name,
            start_key,
            end_key,
            remaining_predicates,
        } => build_index_scan_plan(
            table_def,
            table_name,
            index_id,
            index_name,
            start_key,
            end_key,
            &remaining_predicates,
            limit,
            offset,
            order_by,
            column_indices,
            column_names,
        ),
        AccessPath::TableScan { predicates } => build_table_scan_plan(
            table_def,
            table_name,
            &predicates,
            limit,
            offset,
            order_by,
            column_indices,
            column_names,
        ),
    }
}
/// Chooses the projection for the scan underneath a single-table query.
///
/// Aggregate-style queries (aggregates, `GROUP BY`, `DISTINCT` or `HAVING`)
/// scan every column without aliases; plain queries use the select list and
/// its aliases.
///
/// # Errors
/// Propagates `QueryError::ColumnNotFound` from `resolve_columns`.
#[inline]
fn resolve_query_columns(
    table_def: &TableDef,
    parsed: &ParsedSelect,
    table_name: &str,
) -> Result<(Vec<usize>, Vec<ColumnName>)> {
    let is_aggregate_query = parsed.distinct
        || !parsed.aggregates.is_empty()
        || !parsed.group_by.is_empty()
        || !parsed.having.is_empty();

    let (columns, aliases) = if is_aggregate_query {
        (None, None)
    } else {
        (parsed.columns.as_ref(), parsed.column_aliases.as_ref())
    };
    resolve_columns(table_def, columns, aliases, table_name)
}
/// Resolves a select list to `(column indices, output names)`.
///
/// With `columns == None` every table column is returned under its own name.
/// Otherwise each requested column is looked up in `table_def`; its output
/// name is the alias at the same position in `aliases` when one is present,
/// else the column's declared name.
///
/// # Errors
/// Returns `QueryError::ColumnNotFound` for the first unknown column.
fn resolve_columns(
    table_def: &TableDef,
    columns: Option<&Vec<ColumnName>>,
    aliases: Option<&Vec<Option<String>>>,
    table_name: &str,
) -> Result<(Vec<usize>, Vec<ColumnName>)> {
    let requested = match columns {
        Some(list) => list,
        None => {
            let everything = table_def
                .columns
                .iter()
                .enumerate()
                .map(|(position, col)| (position, col.name.clone()))
                .unzip();
            return Ok(everything);
        }
    };

    let resolved = requested
        .iter()
        .enumerate()
        .map(|(position, col)| -> Result<(usize, ColumnName)> {
            let (idx, col_def) =
                table_def
                    .find_column(col)
                    .ok_or_else(|| QueryError::ColumnNotFound {
                        table: table_name.to_string(),
                        column: col.to_string(),
                    })?;
            let alias = aliases
                .and_then(|list| list.get(position))
                .and_then(|entry| entry.as_ref());
            let out_name = alias.map_or_else(
                || col_def.name.clone(),
                |text| ColumnName::new(text.clone()),
            );
            Ok((idx, out_name))
        })
        .collect::<Result<Vec<_>>>()?;
    Ok(resolved.into_iter().unzip())
}
/// A predicate whose literal and parameter operands have been resolved to
/// concrete `Value`s (see `resolve_predicate`).
#[derive(Debug, Clone)]
struct ResolvedPredicate {
// Column the predicate applies to. `resolve_predicate` sets this to an empty
// name for `Or`, `AlwaysTrue` and `AlwaysFalse`, which have no single column.
column: ColumnName,
// Operator together with its resolved operand(s).
op: ResolvedOp,
}
/// Operator of a `ResolvedPredicate`, carrying already-resolved operands.
///
/// Every variant other than `Or` is translated into the matching `FilterOp`
/// by `build_filter_condition` / `build_filter_condition_for_join`.
#[derive(Debug, Clone)]
enum ResolvedOp {
// Comparison operators; the payload is the value compared against.
Eq(Value),
Lt(Value),
Le(Value),
Gt(Value),
Ge(Value),
// Membership in a list of resolved values.
In(Vec<Value>),
// Pattern operators; the payload is the pattern string copied unchanged
// from the parsed predicate.
Like(String),
NotLike(String),
ILike(String),
NotILike(String),
IsNull,
IsNotNull,
// Equality on a value extracted from a JSON column at `path`.
// NOTE(review): `as_text` presumably selects text extraction — confirm
// against the parser/executor.
JsonExtractEq {
path: String,
as_text: bool,
value: Value,
},
JsonContains(Value),
// Disjunction of two predicate groups (left, right); each group is combined
// with AND when a filter is built.
Or(Vec<ResolvedPredicate>, Vec<ResolvedPredicate>),
// Constant predicates produced from `Predicate::Always(true / false)`.
AlwaysTrue,
AlwaysFalse,
}
/// Resolves every predicate in `predicates`, binding parameters from `params`.
///
/// # Errors
/// Stops at, and returns, the first failure from `resolve_predicate`.
fn resolve_predicates(
    predicates: &[Predicate],
    params: &[Value],
) -> Result<Vec<ResolvedPredicate>> {
    let mut resolved = Vec::with_capacity(predicates.len());
    for predicate in predicates {
        resolved.push(resolve_predicate(predicate, params)?);
    }
    Ok(resolved)
}
fn resolve_predicate(predicate: &Predicate, params: &[Value]) -> Result<ResolvedPredicate> {
match predicate {
Predicate::Eq(col, val) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::Eq(resolve_value(val, params)?),
}),
Predicate::Lt(col, val) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::Lt(resolve_value(val, params)?),
}),
Predicate::Le(col, val) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::Le(resolve_value(val, params)?),
}),
Predicate::Gt(col, val) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::Gt(resolve_value(val, params)?),
}),
Predicate::Ge(col, val) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::Ge(resolve_value(val, params)?),
}),
Predicate::In(col, vals) => {
let resolved: Result<Vec<_>> = vals.iter().map(|v| resolve_value(v, params)).collect();
Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::In(resolved?),
})
}
Predicate::Like(col, pattern) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::Like(pattern.clone()),
}),
Predicate::NotLike(col, pattern) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::NotLike(pattern.clone()),
}),
Predicate::ILike(col, pattern) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::ILike(pattern.clone()),
}),
Predicate::NotILike(col, pattern) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::NotILike(pattern.clone()),
}),
Predicate::IsNull(col) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::IsNull,
}),
Predicate::IsNotNull(col) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::IsNotNull,
}),
Predicate::JsonExtractEq {
column,
path,
as_text,
value,
} => Ok(ResolvedPredicate {
column: column.clone(),
op: ResolvedOp::JsonExtractEq {
path: path.clone(),
as_text: *as_text,
value: resolve_value(value, params)?,
},
}),
Predicate::JsonContains { column, value } => Ok(ResolvedPredicate {
column: column.clone(),
op: ResolvedOp::JsonContains(resolve_value(value, params)?),
}),
Predicate::InSubquery { .. } | Predicate::Exists { .. } => {
Err(QueryError::UnsupportedFeature(
"subquery predicate not pre-executed (likely a correlated subquery)".to_string(),
))
}
Predicate::Always(b) => {
Ok(ResolvedPredicate {
column: ColumnName::new(String::new()),
op: if *b {
ResolvedOp::AlwaysTrue
} else {
ResolvedOp::AlwaysFalse
},
})
}
Predicate::Or(left_preds, right_preds) => {
let left_resolved = resolve_predicates(left_preds, params)?;
let right_resolved = resolve_predicates(right_preds, params)?;
Ok(ResolvedPredicate {
column: ColumnName::new(String::new()),
op: ResolvedOp::Or(left_resolved, right_resolved),
})
}
}
}
/// Turns a parsed operand into a concrete `Value`.
///
/// Literals map directly to their `Value` counterpart; a parameter
/// placeholder (1-based) is looked up in `params`.
///
/// # Errors
/// * `QueryError::ParameterNotFound` when the placeholder index is `0` or
///   beyond the end of `params`.
/// * `QueryError::UnsupportedFeature` for a column reference.
fn resolve_value(val: &PredicateValue, params: &[Value]) -> Result<Value> {
    let resolved = match val {
        PredicateValue::Int(v) => Value::BigInt(*v),
        PredicateValue::String(s) => Value::Text(s.clone()),
        PredicateValue::Bool(b) => Value::Boolean(*b),
        PredicateValue::Null => Value::Null,
        PredicateValue::Literal(v) => v.clone(),
        PredicateValue::Param(idx) => {
            let zero_idx = idx.checked_sub(1).ok_or(QueryError::ParameterNotFound(0))?;
            match params.get(zero_idx) {
                Some(bound) => bound.clone(),
                None => return Err(QueryError::ParameterNotFound(*idx)),
            }
        }
        PredicateValue::ColumnRef(_) => {
            return Err(QueryError::UnsupportedFeature(
                "column references in WHERE clause not supported (use JOIN ON for column-to-column comparisons)".to_string(),
            ));
        }
    };
    Ok(resolved)
}
/// Strategy chosen by `analyze_access_path` for reading a single table;
/// `build_scan_plan` turns each variant into the matching `QueryPlan` node.
enum AccessPath {
// Exact lookup: one value per primary-key column, in key order.
PointLookup { key_values: Vec<Value> },
// Scan of a primary-key range; `remaining_predicates` are applied as a
// filter on the scanned rows.
RangeScan {
start_key: Bound<kimberlite_store::Key>,
end_key: Bound<kimberlite_store::Key>,
remaining_predicates: Vec<ResolvedPredicate>,
},
// Scan of a range of the named secondary index; `remaining_predicates` are
// applied as a filter on the scanned rows.
IndexScan {
index_id: u64,
index_name: String,
start_key: Bound<kimberlite_store::Key>,
end_key: Bound<kimberlite_store::Key>,
remaining_predicates: Vec<ResolvedPredicate>,
},
// Full scan; every predicate is applied as a filter.
TableScan { predicates: Vec<ResolvedPredicate> },
}
/// Picks the cheapest access path that still honours every predicate.
///
/// Preference order:
/// 1. **Point lookup** — only when the predicate set consists of exactly one
///    equality per primary-key column and nothing else. A point-lookup plan
///    carries no filter, so any additional predicate (a non-key column, a
///    second condition on a key column, a constant `FALSE`, an `OR`, ...)
///    would otherwise be silently discarded.
/// 2. **Primary-key range scan** — single-column primary keys only, when the
///    key predicates yield at least one bound.
/// 3. **Secondary index scan** — the best candidate from
///    `find_usable_indexes`.
/// 4. **Table scan** with all predicates as the filter.
fn analyze_access_path(table_def: &TableDef, predicates: &[ResolvedPredicate]) -> AccessPath {
    let pk_columns = &table_def.primary_key;
    if pk_columns.is_empty() {
        return AccessPath::TableScan {
            predicates: predicates.to_vec(),
        };
    }

    // One slot per primary-key column, filled by the first equality on it.
    let mut pk_values: Vec<Option<Value>> = vec![None; pk_columns.len()];
    // Set as soon as any predicate cannot be expressed by the lookup key.
    let mut has_residual = false;
    for pred in predicates {
        let pk_equality = match &pred.op {
            ResolvedOp::Eq(val) => table_def
                .primary_key_position(&pred.column)
                .map(|pk_pos| (pk_pos, val)),
            _ => None,
        };
        match pk_equality {
            Some((pk_pos, val)) if pk_values[pk_pos].is_none() => {
                pk_values[pk_pos] = Some(val.clone());
            }
            // Non-key predicate, non-equality on a key column, or a second
            // equality on an already pinned key column.
            _ => has_residual = true,
        }
    }
    if !has_residual && pk_values.iter().all(Option::is_some) {
        let key_values: Vec<Value> = pk_values.into_iter().flatten().collect();
        return AccessPath::PointLookup { key_values };
    }

    if pk_columns.len() == 1 {
        let pk_col = &pk_columns[0];
        let pk_predicates: Vec<_> = predicates.iter().filter(|p| &p.column == pk_col).collect();
        if !pk_predicates.is_empty() {
            let bounds_result = compute_range_bounds(&pk_predicates);
            let has_bounds = !matches!(
                (&bounds_result.start, &bounds_result.end),
                (Bound::Unbounded, Bound::Unbounded)
            );
            if has_bounds {
                // Key predicates folded into the bounds are dropped; the ones
                // that could not be folded are kept as filters.
                let mut remaining: Vec<_> = predicates
                    .iter()
                    .filter(|p| &p.column != pk_col)
                    .cloned()
                    .collect();
                remaining.extend(bounds_result.unconverted);
                return AccessPath::RangeScan {
                    start_key: bounds_result.start,
                    end_key: bounds_result.end,
                    remaining_predicates: remaining,
                };
            }
        }
    }

    let index_candidates = find_usable_indexes(table_def, predicates);
    if let Some((best_index, start, end, remaining)) = select_best_index(&index_candidates) {
        return AccessPath::IndexScan {
            index_id: best_index.index_id,
            index_name: best_index.name.clone(),
            start_key: start,
            end_key: end,
            remaining_predicates: remaining,
        };
    }

    AccessPath::TableScan {
        predicates: predicates.to_vec(),
    }
}
/// Output of `compute_range_bounds`.
struct RangeBoundsResult {
// Lower scan bound (`Unbounded` when no predicate supplied one).
start: Bound<kimberlite_store::Key>,
// Upper scan bound (`Unbounded` when no predicate supplied one).
end: Bound<kimberlite_store::Key>,
// Predicates that were not turned into a bound; callers append them to the
// residual filter.
unconverted: Vec<ResolvedPredicate>,
}
/// Folds predicates on one key column into a pair of scan bounds.
///
/// Each side of the range can absorb exactly one predicate:
/// * the first equality, if any, pins both sides;
/// * otherwise the first `>` / `>=` sets the lower bound and the first
///   `<` / `<=` sets the upper bound.
///
/// Every predicate that is not absorbed — including a second bound on a side
/// that is already taken — is returned in `unconverted` so the caller keeps it
/// as a filter. Callers drop the key-column predicates that are not returned
/// here, so letting a later bound overwrite an earlier one would lose a
/// condition (e.g. `id > 5 AND id > 3` would scan from 3).
///
/// An inclusive upper bound is expressed as an exclusive bound on the
/// successor of the encoded key.
fn compute_range_bounds(predicates: &[&ResolvedPredicate]) -> RangeBoundsResult {
    /// Stores `(val, inclusive)` in `slot` if it is still free; reports
    /// whether the bound was taken.
    fn claim(slot: &mut Option<(Value, bool)>, val: &Value, inclusive: bool) -> bool {
        if slot.is_some() {
            return false;
        }
        *slot = Some((val.clone(), inclusive));
        true
    }

    // Each bound is (value, inclusive).
    let mut lower: Option<(Value, bool)> = None;
    let mut upper: Option<(Value, bool)> = None;
    let mut unconverted = Vec::new();

    // An equality gives the tightest possible range, so the first one takes
    // priority over any inequality regardless of predicate order.
    let pinned_by = predicates
        .iter()
        .position(|p| matches!(p.op, ResolvedOp::Eq(_)));
    let no_equality = pinned_by.is_none();

    for (position, pred) in predicates.iter().enumerate() {
        let absorbed = match &pred.op {
            ResolvedOp::Eq(val) => {
                if pinned_by == Some(position) {
                    lower = Some((val.clone(), true));
                    upper = Some((val.clone(), true));
                    true
                } else {
                    false
                }
            }
            ResolvedOp::Gt(val) => no_equality && claim(&mut lower, val, false),
            ResolvedOp::Ge(val) => no_equality && claim(&mut lower, val, true),
            ResolvedOp::Lt(val) => no_equality && claim(&mut upper, val, false),
            ResolvedOp::Le(val) => no_equality && claim(&mut upper, val, true),
            ResolvedOp::In(_)
            | ResolvedOp::Like(_)
            | ResolvedOp::NotLike(_)
            | ResolvedOp::ILike(_)
            | ResolvedOp::NotILike(_)
            | ResolvedOp::IsNull
            | ResolvedOp::IsNotNull
            | ResolvedOp::JsonExtractEq { .. }
            | ResolvedOp::JsonContains(_)
            | ResolvedOp::AlwaysTrue
            | ResolvedOp::AlwaysFalse
            | ResolvedOp::Or(_, _) => false,
        };
        if !absorbed {
            unconverted.push((*pred).clone());
        }
    }

    let start = match lower {
        Some((val, true)) => Bound::Included(encode_key(&[val])),
        Some((val, false)) => Bound::Excluded(encode_key(&[val])),
        None => Bound::Unbounded,
    };
    let end = match upper {
        Some((val, true)) => Bound::Excluded(successor_key(&encode_key(&[val]))),
        Some((val, false)) => Bound::Excluded(encode_key(&[val])),
        None => Bound::Unbounded,
    };

    RangeBoundsResult {
        start,
        end,
        unconverted,
    }
}
/// A secondary index that `find_usable_indexes` found usable for a query,
/// together with the scan it would perform.
struct IndexCandidate<'a> {
// The index definition, borrowed from the table definition.
index_def: &'a crate::schema::IndexDef,
// Scan bounds derived from the predicates on the index's first column.
start: Bound<kimberlite_store::Key>,
end: Bound<kimberlite_store::Key>,
// Predicates to apply as a filter after the index scan.
remaining: Vec<ResolvedPredicate>,
// Score from `score_index`; `select_best_index` prefers the highest.
score: usize,
}
/// Collects every secondary index that can narrow the scan for `predicates`.
///
/// An index qualifies when at least one predicate on its *first* column can
/// be turned into a scan bound. Only the first 100 indexes of the table are
/// considered.
///
/// The scan bounds are derived from the leading column alone, so only
/// leading-column predicates may be dropped from the residual filter.
/// Predicates on the other index columns are kept in `remaining`; dropping
/// them too would silently lose those conditions.
fn find_usable_indexes<'a>(
    table_def: &'a TableDef,
    predicates: &[ResolvedPredicate],
) -> Vec<IndexCandidate<'a>> {
    // Safety cap on the number of indexes examined per query.
    const MAX_INDEXES_CONSIDERED: usize = 100;

    let mut candidates = Vec::new();
    for index_def in table_def.indexes().iter().take(MAX_INDEXES_CONSIDERED) {
        let first_col = match index_def.columns.first() {
            Some(col) => col,
            None => continue,
        };

        let first_col_predicates: Vec<_> = predicates
            .iter()
            .filter(|p| &p.column == first_col)
            .collect();
        if first_col_predicates.is_empty() {
            continue;
        }

        let bounds_result = compute_range_bounds(&first_col_predicates);
        if matches!(
            (&bounds_result.start, &bounds_result.end),
            (Bound::Unbounded, Bound::Unbounded)
        ) {
            // No predicate produced a bound: the index would not narrow the scan.
            continue;
        }

        // Everything not on the leading column stays a filter, plus the
        // leading-column predicates that could not be folded into the bounds.
        let mut remaining: Vec<_> = predicates
            .iter()
            .filter(|p| &p.column != first_col)
            .cloned()
            .collect();
        remaining.extend(bounds_result.unconverted);

        candidates.push(IndexCandidate {
            index_def,
            start: bounds_result.start,
            end: bounds_result.end,
            remaining,
            score: score_index(index_def, predicates),
        });
    }
    candidates
}
fn score_index(index_def: &crate::schema::IndexDef, predicates: &[ResolvedPredicate]) -> usize {
let mut score = 0;
let max_columns = 10;
for (iter_count, index_col) in index_def.columns.iter().enumerate() {
if iter_count >= max_columns {
break;
}
for pred in predicates {
if &pred.column == index_col {
match &pred.op {
ResolvedOp::Eq(_) => score += 10, ResolvedOp::Lt(_)
| ResolvedOp::Le(_)
| ResolvedOp::Gt(_)
| ResolvedOp::Ge(_) => {
score += 5; }
_ => score += 1, }
}
}
}
score
}
/// Result of `select_best_index`: the chosen index definition, its start and
/// end scan bounds, and the predicates left to apply as a filter.
type BestIndexResult<'a> = (
&'a crate::schema::IndexDef,
Bound<kimberlite_store::Key>,
Bound<kimberlite_store::Key>,
Vec<ResolvedPredicate>,
);
/// Picks the best index candidate, or `None` when there are no candidates.
///
/// Ranking: highest score first; ties are broken by fewest remaining
/// predicates, then by fewest index columns. Among fully tied candidates the
/// earliest one in `candidates` wins.
fn select_best_index<'a>(candidates: &'a [IndexCandidate<'a>]) -> Option<BestIndexResult<'a>> {
    candidates
        .iter()
        // `Reverse` turns "highest score" into the smallest key, so a single
        // lexicographic minimum implements the whole ranking; `min_by_key`
        // keeps the first of equal elements.
        .min_by_key(|c| {
            (
                std::cmp::Reverse(c.score),
                c.remaining.len(),
                c.index_def.columns.len(),
            )
        })
        .map(|winner| {
            (
                winner.index_def,
                winner.start.clone(),
                winner.end.clone(),
                winner.remaining.clone(),
            )
        })
}
/// Combines `predicates` into a single AND filter over `table_def`'s columns.
///
/// Returns `Ok(None)` when there are no predicates. `table_name` is used only
/// for error reporting.
///
/// # Errors
/// Propagates the first failure from `build_filter_from_predicate`.
fn build_filter(
    table_def: &TableDef,
    predicates: &[ResolvedPredicate],
    table_name: &str,
) -> Result<Option<Filter>> {
    if predicates.is_empty() {
        return Ok(None);
    }
    let mut conjuncts = Vec::with_capacity(predicates.len());
    for predicate in predicates {
        conjuncts.push(build_filter_from_predicate(table_def, predicate, table_name)?);
    }
    Ok(Some(Filter::and(conjuncts)))
}
/// Converts one resolved predicate into a `Filter` over `table_def`'s columns.
///
/// An `Or` predicate becomes `Filter::or` of its two sides (each side itself
/// an AND of its predicates); anything else becomes a single-condition filter.
///
/// # Errors
/// Returns `QueryError::UnsupportedFeature` when either side of an `Or` is
/// empty, and propagates failures from the nested builders.
fn build_filter_from_predicate(
    table_def: &TableDef,
    pred: &ResolvedPredicate,
    table_name: &str,
) -> Result<Filter> {
    match &pred.op {
        ResolvedOp::Or(left_preds, right_preds) => {
            let lhs = match build_filter(table_def, left_preds, table_name)? {
                Some(filter) => filter,
                None => {
                    return Err(QueryError::UnsupportedFeature(
                        "OR left side has no predicates".to_string(),
                    ));
                }
            };
            let rhs = match build_filter(table_def, right_preds, table_name)? {
                Some(filter) => filter,
                None => {
                    return Err(QueryError::UnsupportedFeature(
                        "OR right side has no predicates".to_string(),
                    ));
                }
            };
            Ok(Filter::or(vec![lhs, rhs]))
        }
        _ => {
            let condition = build_filter_condition(table_def, pred, table_name)?;
            Ok(Filter::single(condition))
        }
    }
}
fn build_filter_condition(
table_def: &TableDef,
pred: &ResolvedPredicate,
table_name: &str,
) -> Result<FilterCondition> {
if matches!(pred.op, ResolvedOp::AlwaysTrue) {
return Ok(FilterCondition {
column_idx: 0,
op: FilterOp::AlwaysTrue,
value: Value::Null,
});
}
if matches!(pred.op, ResolvedOp::AlwaysFalse) {
return Ok(FilterCondition {
column_idx: 0,
op: FilterOp::AlwaysFalse,
value: Value::Null,
});
}
let (col_idx, _) =
table_def
.find_column(&pred.column)
.ok_or_else(|| QueryError::ColumnNotFound {
table: table_name.to_string(),
column: pred.column.to_string(),
})?;
let (op, value) = match &pred.op {
ResolvedOp::Eq(v) => (FilterOp::Eq, v.clone()),
ResolvedOp::Lt(v) => (FilterOp::Lt, v.clone()),
ResolvedOp::Le(v) => (FilterOp::Le, v.clone()),
ResolvedOp::Gt(v) => (FilterOp::Gt, v.clone()),
ResolvedOp::Ge(v) => (FilterOp::Ge, v.clone()),
ResolvedOp::In(vals) => (FilterOp::In(vals.clone()), Value::Null), ResolvedOp::Like(pattern) => (FilterOp::Like(pattern.clone()), Value::Null),
ResolvedOp::NotLike(pattern) => (FilterOp::NotLike(pattern.clone()), Value::Null),
ResolvedOp::ILike(pattern) => (FilterOp::ILike(pattern.clone()), Value::Null),
ResolvedOp::NotILike(pattern) => (FilterOp::NotILike(pattern.clone()), Value::Null),
ResolvedOp::IsNull => (FilterOp::IsNull, Value::Null),
ResolvedOp::IsNotNull => (FilterOp::IsNotNull, Value::Null),
ResolvedOp::JsonExtractEq {
path,
as_text,
value: v,
} => (
FilterOp::JsonExtractEq {
path: path.clone(),
as_text: *as_text,
value: v.clone(),
},
Value::Null,
),
ResolvedOp::JsonContains(v) => (FilterOp::JsonContains(v.clone()), Value::Null),
ResolvedOp::AlwaysTrue => (FilterOp::AlwaysTrue, Value::Null),
ResolvedOp::AlwaysFalse => (FilterOp::AlwaysFalse, Value::Null),
ResolvedOp::Or(_, _) => {
return Err(QueryError::UnsupportedFeature(
"OR predicates must be handled at filter level, not as individual conditions"
.to_string(),
));
}
};
Ok(FilterCondition {
column_idx: col_idx,
op,
value,
})
}
/// Chooses the scan direction implied by `order_by`.
///
/// Only the first ORDER BY clause is inspected. The scan is
/// `ScanOrder::Descending` when that clause names a primary-key column of
/// `table_def` and is not ascending. In every other case, including an empty
/// `order_by`, the scan is `ScanOrder::Ascending`.
fn determine_scan_order(order_by: &[OrderByClause], table_def: &TableDef) -> ScanOrder {
    match order_by.first() {
        Some(lead) if table_def.is_primary_key(&lead.column) && !lead.ascending => {
            ScanOrder::Descending
        }
        _ => ScanOrder::Ascending,
    }
}
/// Builds the sort specification for `order_by`.
///
/// Returns `Ok(None)` when `order_by` is empty. Otherwise each clause is
/// resolved to a `(column index, direction)` pair, in clause order, and the
/// pairs are wrapped in a `SortSpec`.
///
/// # Errors
///
/// Returns `QueryError::ColumnNotFound` for the first clause whose column is
/// not a column of `table_def`.
fn build_sort_spec(
    order_by: &[OrderByClause],
    table_def: &TableDef,
    table_name: &str,
) -> Result<Option<SortSpec>> {
    if order_by.is_empty() {
        return Ok(None);
    }
    let columns = order_by
        .iter()
        .map(|clause| {
            let direction = if clause.ascending {
                ScanOrder::Ascending
            } else {
                ScanOrder::Descending
            };
            table_def
                .find_column(&clause.column)
                .map(|(idx, _)| (idx, direction))
                .ok_or_else(|| QueryError::ColumnNotFound {
                    table: table_name.to_string(),
                    column: clause.column.to_string(),
                })
        })
        .collect::<Result<Vec<_>>>()?;
    Ok(Some(SortSpec { columns }))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::{ParsedStatement, parse_statement};
    use crate::schema::{ColumnDef, DataType, SchemaBuilder};
    use kimberlite_store::TableId;

    /// Parses `sql` and returns the SELECT it contains.
    ///
    /// Panics if parsing fails or the statement is not a SELECT.
    fn parse_test_select(sql: &str) -> ParsedSelect {
        if let ParsedStatement::Select(select) = parse_statement(sql).unwrap() {
            select
        } else {
            panic!("expected SELECT statement")
        }
    }

    /// Schema with a single `users` table (`id`, `name`, `age`) keyed on `id`.
    fn test_schema() -> Schema {
        let user_columns = vec![
            ColumnDef::new("id", DataType::BigInt).not_null(),
            ColumnDef::new("name", DataType::Text).not_null(),
            ColumnDef::new("age", DataType::BigInt),
        ];
        SchemaBuilder::new()
            .table("users", TableId::new(1), user_columns, vec!["id".into()])
            .build()
    }

    /// Plans `sql` against the test schema with the given bound parameters.
    fn plan_sql(sql: &str, params: &[Value]) -> crate::error::Result<QueryPlan> {
        let schema = test_schema();
        let select = parse_test_select(sql);
        plan_query(&schema, &select, params)
    }

    /// Equality on the primary key plans as a point lookup.
    #[test]
    fn test_plan_point_lookup() {
        let plan = plan_sql("SELECT * FROM users WHERE id = 42", &[]).unwrap();
        assert!(matches!(plan, QueryPlan::PointLookup { .. }));
    }

    /// An inequality on the primary key plans as a range scan.
    #[test]
    fn test_plan_range_scan() {
        let plan = plan_sql("SELECT * FROM users WHERE id > 10", &[]).unwrap();
        assert!(matches!(plan, QueryPlan::RangeScan { .. }));
    }

    /// A predicate on a non-key column plans as a table scan.
    #[test]
    fn test_plan_table_scan() {
        let plan = plan_sql("SELECT * FROM users WHERE name = 'alice'", &[]).unwrap();
        assert!(matches!(plan, QueryPlan::TableScan { .. }));
    }

    /// A bound `$1` parameter on the primary key still plans as a point lookup.
    #[test]
    fn test_plan_with_params() {
        let plan = plan_sql("SELECT * FROM users WHERE id = $1", &[Value::BigInt(42)]).unwrap();
        assert!(matches!(plan, QueryPlan::PointLookup { .. }));
    }

    /// Referencing `$1` without supplying it reports the missing parameter.
    #[test]
    fn test_plan_missing_param() {
        let outcome = plan_sql("SELECT * FROM users WHERE id = $1", &[]);
        assert!(matches!(outcome, Err(QueryError::ParameterNotFound(1))));
    }

    /// Selecting from a table absent from the schema is rejected.
    #[test]
    fn test_plan_unknown_table() {
        let outcome = plan_sql("SELECT * FROM unknown", &[]);
        assert!(matches!(outcome, Err(QueryError::TableNotFound(_))));
    }

    /// Selecting a column absent from the table is rejected.
    #[test]
    fn test_plan_unknown_column() {
        let outcome = plan_sql("SELECT unknown FROM users", &[]);
        assert!(matches!(outcome, Err(QueryError::ColumnNotFound { .. })));
    }
}