use std::ops::Bound;
use std::sync::Arc;
use crate::error::{QueryError, Result};
use crate::expression::ScalarExpr;
use crate::key_encoder::{encode_key, successor_key};
use crate::parser::{
CaseWhenArm, ComputedColumn, LimitExpr, OrderByClause, ParsedSelect, Predicate, PredicateValue,
ScalarCmpOp,
};
use crate::plan::{
CaseColumnDef, CaseWhenClause, Filter, FilterCondition, FilterOp, QueryPlan, ScanOrder,
SortSpec,
};
use crate::schema::{ColumnName, Schema, TableDef};
use crate::value::Value;
/// Builds the [`crate::plan::TableMetadata`] attached to every plan node for `table_def`.
///
/// The column definitions and primary key are cloned, so the returned metadata
/// does not borrow from the schema.
#[inline]
fn create_metadata(table_def: &TableDef, table_name: String) -> crate::plan::TableMetadata {
    let columns = table_def.columns.clone();
    let primary_key = table_def.primary_key.clone();
    crate::plan::TableMetadata {
        table_id: table_def.table_id,
        table_name,
        columns,
        primary_key,
    }
}
/// Builds a [`QueryPlan::PointLookup`] for the row whose primary key encodes
/// (via `encode_key`) to `key_values`.
#[inline]
fn build_point_lookup_plan(
    table_def: &TableDef,
    table_name: String,
    key_values: &[Value],
    column_indices: Vec<usize>,
    column_names: Vec<ColumnName>,
) -> QueryPlan {
    let metadata = create_metadata(table_def, table_name);
    QueryPlan::PointLookup {
        metadata,
        key: encode_key(key_values),
        columns: column_indices,
        column_names,
    }
}
/// Builds a [`QueryPlan::RangeScan`] over the primary-key range `start_key..end_key`.
///
/// Predicates that the key range did not consume become the residual filter.
/// The scan direction comes from `determine_scan_order`. When any `ORDER BY`
/// column lies outside the primary key, an explicit sort spec is attached as well.
///
/// # Errors
/// Propagates errors from `build_filter` and `build_sort_spec`.
#[inline]
#[allow(clippy::too_many_arguments)]
fn build_range_scan_plan(
    table_def: &TableDef,
    table_name: String,
    start_key: Bound<kimberlite_store::Key>,
    end_key: Bound<kimberlite_store::Key>,
    remaining_predicates: &[ResolvedPredicate],
    limit: Option<usize>,
    offset: Option<usize>,
    order_by: &[OrderByClause],
    column_indices: Vec<usize>,
    column_names: Vec<ColumnName>,
) -> Result<QueryPlan> {
    let filter = build_filter(table_def, remaining_predicates, &table_name)?;
    let order = determine_scan_order(order_by, table_def);

    // `any` over an empty ORDER BY is false, so no sort spec is requested then.
    let sorts_off_key = order_by
        .iter()
        .any(|clause| !table_def.is_primary_key(&clause.column));
    let sort_spec = if sorts_off_key {
        build_sort_spec(order_by, table_def, &table_name)?
    } else {
        None
    };

    let metadata = create_metadata(table_def, table_name);
    Ok(QueryPlan::RangeScan {
        metadata,
        start: start_key,
        end: end_key,
        filter,
        limit,
        offset,
        order,
        order_by: sort_spec,
        columns: column_indices,
        column_names,
    })
}
/// Builds a [`QueryPlan::IndexScan`] over the key range `start_key..end_key` of
/// secondary index `index_id` / `index_name`.
///
/// Predicates that the index range did not consume become the residual filter.
/// The scan direction comes from `determine_scan_order`. When any `ORDER BY`
/// column lies outside the primary key, an explicit sort spec is attached as well.
///
/// # Errors
/// Propagates errors from `build_filter` and `build_sort_spec`.
#[inline]
#[allow(clippy::too_many_arguments)]
fn build_index_scan_plan(
    table_def: &TableDef,
    table_name: String,
    index_id: u64,
    index_name: String,
    start_key: Bound<kimberlite_store::Key>,
    end_key: Bound<kimberlite_store::Key>,
    remaining_predicates: &[ResolvedPredicate],
    limit: Option<usize>,
    offset: Option<usize>,
    order_by: &[OrderByClause],
    column_indices: Vec<usize>,
    column_names: Vec<ColumnName>,
) -> Result<QueryPlan> {
    let filter = build_filter(table_def, remaining_predicates, &table_name)?;
    let order = determine_scan_order(order_by, table_def);

    // `any` over an empty ORDER BY is false, so no sort spec is requested then.
    let sorts_off_key = order_by
        .iter()
        .any(|clause| !table_def.is_primary_key(&clause.column));
    let sort_spec = if sorts_off_key {
        build_sort_spec(order_by, table_def, &table_name)?
    } else {
        None
    };

    let metadata = create_metadata(table_def, table_name);
    Ok(QueryPlan::IndexScan {
        metadata,
        index_id,
        index_name,
        start: start_key,
        end: end_key,
        filter,
        limit,
        offset,
        order,
        order_by: sort_spec,
        columns: column_indices,
        column_names,
    })
}
/// Builds a full [`QueryPlan::TableScan`].
///
/// Every predicate goes into the scan filter, and any `ORDER BY` is turned into
/// a sort spec by `build_sort_spec`.
///
/// # Errors
/// Propagates errors from `build_filter` and `build_sort_spec`.
#[inline]
#[allow(clippy::too_many_arguments)]
fn build_table_scan_plan(
    table_def: &TableDef,
    table_name: String,
    all_predicates: &[ResolvedPredicate],
    limit: Option<usize>,
    offset: Option<usize>,
    order_by: &[OrderByClause],
    column_indices: Vec<usize>,
    column_names: Vec<ColumnName>,
) -> Result<QueryPlan> {
    let filter = build_filter(table_def, all_predicates, &table_name)?;
    let sort_spec = build_sort_spec(order_by, table_def, &table_name)?;
    let metadata = create_metadata(table_def, table_name);
    Ok(QueryPlan::TableScan {
        metadata,
        filter,
        limit,
        offset,
        order: sort_spec,
        columns: column_indices,
        column_names,
    })
}
/// Resolves an optional `LIMIT`/`OFFSET` expression to a concrete row count.
///
/// Literal counts are returned unchanged. A `$n` parameter (1-based) is looked up
/// in `params` and must bind to a non-negative `TinyInt`/`SmallInt`/`Integer`/`BigInt`.
///
/// Bound counts larger than `usize::MAX` can occur with `BigInt` on 32-bit targets.
/// They saturate to `usize::MAX` instead of being truncated by an `as` cast. No
/// result set can hold that many rows, so a saturated LIMIT still means "no
/// effective bound" and a saturated OFFSET still means "skip every row".
///
/// # Errors
/// - [`QueryError::ParameterNotFound`] if the parameter index is `0` or past the end of `params`.
/// - [`QueryError::ParseError`] if the bound integer is negative.
/// - [`QueryError::UnsupportedFeature`] if the bound value is not an integer.
fn resolve_limit(expr: Option<LimitExpr>, params: &[Value]) -> Result<Option<usize>> {
    // Explicit import so this compiles on editions where `TryFrom` is not in the prelude.
    use std::convert::TryFrom;

    /// Converts a (non-negative) count to `usize`, saturating instead of truncating.
    fn saturating_count<T>(n: T) -> usize
    where
        usize: TryFrom<T>,
    {
        usize::try_from(n).unwrap_or(usize::MAX)
    }

    match expr {
        None => Ok(None),
        Some(LimitExpr::Literal(v)) => Ok(Some(v)),
        Some(LimitExpr::Param(idx)) => {
            // Parameters are 1-based; `$0` is reported as parameter 0 not found.
            let zero_idx = idx.checked_sub(1).ok_or(QueryError::ParameterNotFound(0))?;
            let value = params
                .get(zero_idx)
                .cloned()
                .ok_or(QueryError::ParameterNotFound(idx))?;
            match value {
                Value::BigInt(n) if n >= 0 => Ok(Some(saturating_count(n))),
                Value::Integer(n) if n >= 0 => Ok(Some(saturating_count(n))),
                Value::SmallInt(n) if n >= 0 => Ok(Some(saturating_count(n))),
                Value::TinyInt(n) if n >= 0 => Ok(Some(saturating_count(n))),
                Value::BigInt(_) | Value::Integer(_) | Value::SmallInt(_) | Value::TinyInt(_) => {
                    Err(QueryError::ParseError(
                        "LIMIT/OFFSET parameter must be non-negative".to_string(),
                    ))
                }
                other => Err(QueryError::UnsupportedFeature(format!(
                    "LIMIT/OFFSET parameter must bind to an integer; got {other:?}"
                ))),
            }
        }
    }
}
#[inline]
fn wrap_with_aggregate(
base_plan: QueryPlan,
table_def: &TableDef,
table_name: String,
parsed: &ParsedSelect,
params: &[Value],
) -> Result<QueryPlan> {
let group_by_columns = if parsed.distinct && parsed.group_by.is_empty() {
parsed
.columns
.clone()
.unwrap_or_else(|| table_def.columns.iter().map(|c| c.name.clone()).collect())
} else {
parsed.group_by.clone()
};
let aggregates = if parsed.distinct && parsed.aggregates.is_empty() {
vec![]
} else {
parsed.aggregates.clone()
};
let mut group_by_indices = Vec::new();
for col_name in &group_by_columns {
let (idx, _) =
table_def
.find_column(col_name)
.ok_or_else(|| QueryError::ColumnNotFound {
table: table_name.clone(),
column: col_name.to_string(),
})?;
group_by_indices.push(idx);
}
let mut result_columns = group_by_columns.clone();
for agg in &aggregates {
let agg_name = match agg {
crate::parser::AggregateFunction::CountStar => "COUNT(*)".to_string(),
crate::parser::AggregateFunction::Count(col) => format!("COUNT({col})"),
crate::parser::AggregateFunction::Sum(col) => format!("SUM({col})"),
crate::parser::AggregateFunction::Avg(col) => format!("AVG({col})"),
crate::parser::AggregateFunction::Min(col) => format!("MIN({col})"),
crate::parser::AggregateFunction::Max(col) => format!("MAX({col})"),
};
result_columns.push(ColumnName::new(agg_name));
}
let mut aggregate_filters: Vec<Option<crate::plan::Filter>> = Vec::new();
for (i, _agg) in aggregates.iter().enumerate() {
let filter_predicates = parsed.aggregate_filters.get(i).and_then(|f| f.as_ref());
let filter = match filter_predicates {
Some(preds) => {
let resolved = resolve_predicates(preds, params)?;
build_filter(table_def, &resolved, &table_name)?
}
None => None,
};
aggregate_filters.push(filter);
}
Ok(QueryPlan::Aggregate {
metadata: create_metadata(table_def, table_name),
source: Box::new(base_plan),
group_by_cols: group_by_indices,
group_by_names: group_by_columns,
aggregates,
aggregate_filters,
column_names: result_columns,
having: parsed.having.clone(),
})
}
/// Plans `parsed` against `schema`, using the current wall-clock time as the
/// statement timestamp for `NOW()` / `CURRENT_TIMESTAMP` / `CURRENT_DATE`.
///
/// Equivalent to [`plan_query_with_clock`] called with [`current_statement_timestamp_ns`].
///
/// # Errors
/// Propagates any planning error from [`plan_query_with_clock`].
pub fn plan_query(schema: &Schema, parsed: &ParsedSelect, params: &[Value]) -> Result<QueryPlan> {
    let statement_ts_ns = current_statement_timestamp_ns();
    plan_query_with_clock(schema, parsed, params, statement_ts_ns)
}
pub fn plan_query_with_clock(
schema: &Schema,
parsed: &ParsedSelect,
params: &[Value],
statement_ts_ns: i64,
) -> Result<QueryPlan> {
let mut plan = if parsed.joins.is_empty() {
plan_single_table_query(schema, parsed, params)?
} else {
plan_join_query(schema, parsed, params)?
};
fold_time_constants_in_plan(&mut plan, statement_ts_ns);
Ok(plan)
}
/// Plans a `SELECT` over a single table (no joins).
///
/// The steps are:
/// 1. Pick an access path (point lookup, range, index or full scan) from the WHERE predicates.
/// 2. Optionally wrap the scan in an aggregation (aggregates, GROUP BY, DISTINCT, HAVING).
/// 3. When CASE columns or scalar projections are present, wrap the result in a
///    [`QueryPlan::Materialize`] that computes them. The output is the selected
///    (possibly aliased) columns followed by the CASE aliases and then the scalar outputs.
///
/// # Errors
/// - [`QueryError::TableNotFound`] if `parsed.table` is not in `schema`.
/// - Propagates column, predicate, parameter and sub-planner errors.
fn plan_single_table_query(
    schema: &Schema,
    parsed: &ParsedSelect,
    params: &[Value],
) -> Result<QueryPlan> {
    let table_name = parsed.table.clone();
    let table_def = match schema.get_table(&table_name.clone().into()) {
        Some(def) => def,
        None => return Err(QueryError::TableNotFound(table_name)),
    };

    let resolved_predicates = resolve_predicates(&parsed.predicates, params)?;
    let (column_indices, column_names) = resolve_query_columns(table_def, parsed, &table_name)?;
    let access_path = analyze_access_path(table_def, &resolved_predicates);
    let scan = build_scan_plan(
        access_path,
        table_def,
        table_name.clone(),
        parsed,
        params,
        column_indices,
        column_names,
    )?;

    let aggregated = parsed.distinct
        || !parsed.aggregates.is_empty()
        || !parsed.group_by.is_empty()
        || !parsed.having.is_empty();
    let source = if aggregated {
        wrap_with_aggregate(scan, table_def, table_name, parsed, params)?
    } else {
        scan
    };

    if parsed.case_columns.is_empty() && parsed.scalar_projections.is_empty() {
        return Ok(source);
    }

    let source_columns = source.column_names().to_vec();
    let case_columns = resolve_case_columns_for_join(&parsed.case_columns, &source_columns)?;
    // Scalar projections may reference CASE aliases, so they resolve against both.
    let post_case_columns: Vec<ColumnName> = source_columns
        .iter()
        .cloned()
        .chain(case_columns.iter().map(|c| c.alias.clone()))
        .collect();
    let scalar_columns =
        resolve_scalar_columns(&parsed.scalar_projections, &post_case_columns, params)?;

    let mut output_columns: Vec<ColumnName> = match &parsed.columns {
        None => source_columns,
        Some(selected) => selected
            .iter()
            .enumerate()
            .map(|(pos, col)| {
                let alias = parsed
                    .column_aliases
                    .as_ref()
                    .and_then(|list| list.get(pos))
                    .and_then(|a| a.as_ref());
                match alias {
                    Some(a) => ColumnName::new(a.clone()),
                    None => col.clone(),
                }
            })
            .collect(),
    };
    output_columns.extend(case_columns.iter().map(|c| c.alias.clone()));
    output_columns.extend(scalar_columns.iter().map(|c| c.output_name.clone()));

    Ok(QueryPlan::Materialize {
        source: Box::new(source),
        filter: None,
        case_columns,
        scalar_columns,
        order: None,
        limit: None,
        offset: None,
        column_names: output_columns,
    })
}
/// Resolves parsed scalar projections into plan-level column definitions.
///
/// `$n` placeholders in each expression are substituted from `params`. Every
/// resulting definition shares one reference-counted copy of `source_columns`.
///
/// # Errors
/// Propagates [`QueryError::ParameterNotFound`] from parameter substitution.
fn resolve_scalar_columns(
    projections: &[crate::parser::ParsedScalarProjection],
    source_columns: &[ColumnName],
    params: &[Value],
) -> Result<Vec<crate::plan::ScalarColumnDef>> {
    let mut defs = Vec::with_capacity(projections.len());
    if projections.is_empty() {
        return Ok(defs);
    }
    let shared: Arc<[ColumnName]> = Arc::from(source_columns.to_vec());
    for projection in projections {
        let expr = substitute_scalar_params(&projection.expr, params)?;
        defs.push(crate::plan::ScalarColumnDef {
            output_name: projection.output_name.clone(),
            expr,
            columns: Arc::clone(&shared),
        });
    }
    Ok(defs)
}
/// Plans a `SELECT` that contains one or more `JOIN` clauses.
///
/// Each table is read with an unfiltered full scan (`plan_table_access`). The
/// scans are folded left-to-right into nested [`QueryPlan::Join`] nodes whose
/// output is the left columns followed by the right columns. `WHERE`, `ORDER BY`,
/// `LIMIT`/`OFFSET`, CASE columns, scalar projections and an explicit column
/// list are then applied by a single [`QueryPlan::Materialize`] over the combined
/// join output.
///
/// NOTE(review): every ON clause is resolved with `parsed.table` as its left
/// table. With three or more tables, the left side of a later ON clause can only
/// name base-table columns, and right-hand indices are offset by the base table's
/// width rather than the accumulated join width. Confirm against the executor's
/// `JoinCondition` semantics.
/// NOTE(review): aggregates, GROUP BY, HAVING and column aliases are not consulted here.
///
/// # Errors
/// Returns [`QueryError::TableNotFound`] / [`QueryError::ColumnNotFound`] for
/// unknown tables or columns. Propagates predicate, parameter and LIMIT/OFFSET
/// resolution errors.
fn plan_join_query(schema: &Schema, parsed: &ParsedSelect, params: &[Value]) -> Result<QueryPlan> {
    let mut current_plan = plan_table_access(schema, &parsed.table, params)?;
    for join in &parsed.joins {
        let right_plan = plan_table_access(schema, &join.table, params)?;
        let on_conditions =
            build_join_conditions(&join.on_condition, schema, &parsed.table, &join.table)?;
        let left_columns = current_plan.column_names().to_vec();
        let right_columns = right_plan.column_names().to_vec();
        let mut all_columns = left_columns;
        all_columns.extend(right_columns);
        current_plan = QueryPlan::Join {
            join_type: join.join_type.clone(),
            left: Box::new(current_plan),
            right: Box::new(right_plan),
            on_conditions,
            columns: vec![],
            column_names: all_columns,
        };
    }

    let combined_columns = current_plan.column_names().to_vec();
    let resolved_predicates = resolve_predicates(&parsed.predicates, params)?;
    let filter = build_filter_for_join(&resolved_predicates, &combined_columns)?;
    let order = build_sort_spec_for_join(&parsed.order_by, &combined_columns)?;
    let case_columns = resolve_case_columns_for_join(&parsed.case_columns, &combined_columns)?;
    // Scalar projections may reference CASE aliases, so they resolve against both.
    let mut post_case_columns = combined_columns.clone();
    post_case_columns.extend(case_columns.iter().map(|c| c.alias.clone()));
    let scalar_columns =
        resolve_scalar_columns(&parsed.scalar_projections, &post_case_columns, params)?;

    let output_columns: Vec<ColumnName> = match &parsed.columns {
        None => {
            let mut out = combined_columns.clone();
            out.extend(case_columns.iter().map(|c| c.alias.clone()));
            out.extend(scalar_columns.iter().map(|c| c.output_name.clone()));
            out
        }
        Some(selected) => {
            for col in selected {
                if !combined_columns.iter().any(|c| c == col) {
                    return Err(QueryError::ColumnNotFound {
                        table: parsed.table.clone(),
                        column: col.to_string(),
                    });
                }
            }
            let mut out = selected.clone();
            out.extend(case_columns.iter().map(|c| c.alias.clone()));
            out.extend(scalar_columns.iter().map(|c| c.output_name.clone()));
            out
        }
    };

    let limit = resolve_limit(parsed.limit, params)?;
    let offset = resolve_limit(parsed.offset, params)?;

    // An explicit column list must always materialize. Otherwise the bare Join,
    // which carries every column of every table, would be returned and the
    // projection validated above would be silently dropped.
    let needs_materialize = parsed.columns.is_some()
        || filter.is_some()
        || order.is_some()
        || limit.is_some()
        || offset.is_some()
        || !case_columns.is_empty()
        || !scalar_columns.is_empty();
    if needs_materialize {
        Ok(QueryPlan::Materialize {
            source: Box::new(current_plan),
            filter,
            case_columns,
            scalar_columns,
            order,
            limit,
            offset,
            column_names: output_columns,
        })
    } else {
        Ok(current_plan)
    }
}
/// Compiles `predicates` into a single AND-ed [`Filter`] that addresses columns
/// by position in `columns`.
///
/// Returns `Ok(None)` when there are no predicates.
///
/// # Errors
/// Propagates the first error from translating an individual predicate.
fn build_filter_for_join(
    predicates: &[ResolvedPredicate],
    columns: &[ColumnName],
) -> Result<Option<Filter>> {
    if predicates.is_empty() {
        return Ok(None);
    }
    let mut parts = Vec::with_capacity(predicates.len());
    for pred in predicates {
        parts.push(build_filter_for_join_predicate(pred, columns)?);
    }
    Ok(Some(Filter::and(parts)))
}
/// Translates one resolved predicate into a [`Filter`] over `columns`.
///
/// `OR` groups become a two-armed [`Filter::or`], with each side AND-ed
/// recursively. Every other predicate becomes a single condition.
///
/// # Errors
/// [`QueryError::UnsupportedFeature`] if either side of an `OR` is empty.
/// Column lookup errors are propagated.
fn build_filter_for_join_predicate(
    pred: &ResolvedPredicate,
    columns: &[ColumnName],
) -> Result<Filter> {
    match &pred.op {
        ResolvedOp::Or(lhs_preds, rhs_preds) => {
            let lhs = build_filter_for_join(lhs_preds, columns)?.ok_or_else(|| {
                QueryError::UnsupportedFeature("OR left side has no predicates".to_string())
            })?;
            let rhs = build_filter_for_join(rhs_preds, columns)?.ok_or_else(|| {
                QueryError::UnsupportedFeature("OR right side has no predicates".to_string())
            })?;
            Ok(Filter::or(vec![lhs, rhs]))
        }
        _ => build_filter_condition_for_join(pred, columns).map(Filter::single),
    }
}
fn build_filter_condition_for_join(
pred: &ResolvedPredicate,
columns: &[ColumnName],
) -> Result<FilterCondition> {
if matches!(pred.op, ResolvedOp::AlwaysTrue) {
return Ok(FilterCondition {
column_idx: 0,
op: FilterOp::AlwaysTrue,
value: Value::Null,
});
}
if matches!(pred.op, ResolvedOp::AlwaysFalse) {
return Ok(FilterCondition {
column_idx: 0,
op: FilterOp::AlwaysFalse,
value: Value::Null,
});
}
if let ResolvedOp::ScalarCmp { lhs, op, rhs } = &pred.op {
let cols: Arc<[ColumnName]> = columns.to_vec().into();
return Ok(FilterCondition {
column_idx: 0,
op: FilterOp::ScalarCmp {
columns: cols,
lhs: lhs.clone(),
op: *op,
rhs: rhs.clone(),
},
value: Value::Null,
});
}
let col_idx = columns
.iter()
.position(|c| c == &pred.column)
.ok_or_else(|| QueryError::ColumnNotFound {
table: "(join)".to_string(),
column: pred.column.to_string(),
})?;
let (op, value) = match &pred.op {
ResolvedOp::Eq(v) => (FilterOp::Eq, v.clone()),
ResolvedOp::Lt(v) => (FilterOp::Lt, v.clone()),
ResolvedOp::Le(v) => (FilterOp::Le, v.clone()),
ResolvedOp::Gt(v) => (FilterOp::Gt, v.clone()),
ResolvedOp::Ge(v) => (FilterOp::Ge, v.clone()),
ResolvedOp::In(vals) => (FilterOp::In(vals.clone()), Value::Null),
ResolvedOp::NotIn(vals) => (FilterOp::NotIn(vals.clone()), Value::Null),
ResolvedOp::NotBetween(low, high) => {
(FilterOp::NotBetween(low.clone(), high.clone()), Value::Null)
}
ResolvedOp::Like(pattern) => (FilterOp::Like(pattern.clone()), Value::Null),
ResolvedOp::NotLike(pattern) => (FilterOp::NotLike(pattern.clone()), Value::Null),
ResolvedOp::ILike(pattern) => (FilterOp::ILike(pattern.clone()), Value::Null),
ResolvedOp::NotILike(pattern) => (FilterOp::NotILike(pattern.clone()), Value::Null),
ResolvedOp::IsNull => (FilterOp::IsNull, Value::Null),
ResolvedOp::IsNotNull => (FilterOp::IsNotNull, Value::Null),
ResolvedOp::JsonExtractEq {
path,
as_text,
value: v,
} => (
FilterOp::JsonExtractEq {
path: path.clone(),
as_text: *as_text,
value: v.clone(),
},
Value::Null,
),
ResolvedOp::JsonContains(v) => (FilterOp::JsonContains(v.clone()), Value::Null),
ResolvedOp::AlwaysTrue => (FilterOp::AlwaysTrue, Value::Null),
ResolvedOp::AlwaysFalse => (FilterOp::AlwaysFalse, Value::Null),
ResolvedOp::Or(_, _) => {
return Err(QueryError::UnsupportedFeature(
"OR predicates must be handled at filter level".to_string(),
));
}
ResolvedOp::ScalarCmp { .. } => {
unreachable!("ScalarCmp handled by the short-circuit above");
}
};
Ok(FilterCondition {
column_idx: col_idx,
op,
value,
})
}
/// Converts `ORDER BY` clauses into a positional [`SortSpec`] over `columns`.
///
/// Returns `Ok(None)` when there is no `ORDER BY`.
///
/// # Errors
/// [`QueryError::ColumnNotFound`] (table `"(join)"`) for an unknown sort column.
fn build_sort_spec_for_join(
    order_by: &[OrderByClause],
    columns: &[ColumnName],
) -> Result<Option<SortSpec>> {
    if order_by.is_empty() {
        return Ok(None);
    }
    let sort_cols = order_by
        .iter()
        .map(|clause| {
            let idx = columns
                .iter()
                .position(|c| c == &clause.column)
                .ok_or_else(|| QueryError::ColumnNotFound {
                    table: "(join)".to_string(),
                    column: clause.column.to_string(),
                })?;
            let direction = if clause.ascending {
                ScanOrder::Ascending
            } else {
                ScanOrder::Descending
            };
            Ok((idx, direction))
        })
        .collect::<Result<Vec<_>>>()?;
    Ok(Some(SortSpec { columns: sort_cols }))
}
/// Resolves every parsed `CASE` column against `columns`, preserving order.
///
/// # Errors
/// Propagates the first resolution error.
fn resolve_case_columns_for_join(
    case_columns: &[ComputedColumn],
    columns: &[ColumnName],
) -> Result<Vec<CaseColumnDef>> {
    let mut resolved = Vec::with_capacity(case_columns.len());
    for computed in case_columns {
        resolved.push(resolve_single_case_column(computed, columns)?);
    }
    Ok(resolved)
}
/// Resolves one `CASE` column. Each `WHEN` arm is compiled into a filter over
/// `columns`, and the alias and `ELSE` value are copied through.
///
/// # Errors
/// Propagates the first error from resolving a `WHEN` arm.
fn resolve_single_case_column(
    cc: &ComputedColumn,
    columns: &[ColumnName],
) -> Result<CaseColumnDef> {
    let mut when_clauses = Vec::new();
    for arm in cc.when_clauses.iter() {
        when_clauses.push(resolve_case_when_arm(arm, columns)?);
    }
    Ok(CaseColumnDef {
        alias: cc.alias.clone(),
        when_clauses,
        else_value: cc.else_value.clone(),
    })
}
fn resolve_case_when_arm(arm: &CaseWhenArm, columns: &[ColumnName]) -> Result<CaseWhenClause> {
let resolved = resolve_predicates(&arm.condition, &[])?;
let filter = build_filter_for_join(&resolved, columns)?.ok_or_else(|| {
QueryError::UnsupportedFeature("CASE WHEN condition has no predicates".to_string())
})?;
Ok(CaseWhenClause {
condition: filter,
result: arm.result.clone(),
})
}
/// Builds an unfiltered, unordered, unlimited [`QueryPlan::TableScan`] that
/// returns every column of `table_name`, in schema order. It is used as a join input.
///
/// `_params` is currently unused.
///
/// # Errors
/// [`QueryError::TableNotFound`] if the table is not in `schema`.
fn plan_table_access(schema: &Schema, table_name: &str, _params: &[Value]) -> Result<QueryPlan> {
    let table_def = schema
        .get_table(&table_name.into())
        .ok_or_else(|| QueryError::TableNotFound(table_name.to_string()))?;
    let (columns, column_names): (Vec<usize>, Vec<ColumnName>) = table_def
        .columns
        .iter()
        .enumerate()
        .map(|(idx, col)| (idx, col.name.clone()))
        .unzip();
    Ok(QueryPlan::TableScan {
        metadata: create_metadata(table_def, table_name.to_string()),
        filter: None,
        limit: None,
        offset: None,
        order: None,
        columns,
        column_names,
    })
}
fn build_join_conditions(
predicates: &[Predicate],
schema: &Schema,
left_table: &str,
right_table: &str,
) -> Result<Vec<crate::plan::JoinCondition>> {
let left_table_def = schema
.get_table(&left_table.into())
.ok_or_else(|| QueryError::TableNotFound(left_table.to_string()))?;
let right_table_def = schema
.get_table(&right_table.into())
.ok_or_else(|| QueryError::TableNotFound(right_table.to_string()))?;
let left_col_count = left_table_def.columns.len();
predicates
.iter()
.map(|pred| {
build_single_join_condition(
pred,
left_table,
left_table_def,
right_table,
right_table_def,
left_col_count,
)
})
.collect()
}
/// Resolves one `ON` predicate of the form `<left column> <op> <column ref>`.
///
/// Only `=`, `<`, `<=`, `>` and `>=` are accepted. The left operand is resolved
/// against the left table before the right operand is checked.
///
/// # Errors
/// - [`QueryError::UnsupportedFeature`] for other operators, or when the right-hand
///   side is not a column reference.
/// - [`QueryError::ColumnNotFound`] / [`QueryError::TableNotFound`] from column resolution.
fn build_single_join_condition(
    pred: &Predicate,
    left_table: &str,
    left_table_def: &TableDef,
    right_table: &str,
    right_table_def: &TableDef,
    left_col_count: usize,
) -> Result<crate::plan::JoinCondition> {
    use crate::plan::{JoinCondition, JoinOp};

    let (left_col_name, op, right_value) = match pred {
        Predicate::Eq(col, val) => (col, JoinOp::Eq, val),
        Predicate::Lt(col, val) => (col, JoinOp::Lt, val),
        Predicate::Le(col, val) => (col, JoinOp::Le, val),
        Predicate::Gt(col, val) => (col, JoinOp::Gt, val),
        Predicate::Ge(col, val) => (col, JoinOp::Ge, val),
        _ => {
            return Err(QueryError::UnsupportedFeature(
                "only equality and comparison operators supported in JOIN ON clause".to_string(),
            ));
        }
    };

    let left_col_idx = resolve_join_column(left_col_name, left_table, left_table_def, 0)?;

    let ref_str = match right_value {
        PredicateValue::ColumnRef(r) => r,
        _ => {
            return Err(QueryError::UnsupportedFeature(
                "JOIN ON clause requires column-to-column comparisons (e.g., users.id = orders.user_id)".to_string(),
            ));
        }
    };

    let right_col_idx = resolve_join_column_ref(
        ref_str,
        left_table,
        left_table_def,
        right_table,
        right_table_def,
        left_col_count,
    )?;

    Ok(JoinCondition {
        left_col_idx,
        right_col_idx,
        op,
    })
}
/// Returns `offset` plus the position of `col_name` within `table_def`.
///
/// # Errors
/// [`QueryError::ColumnNotFound`] naming `table_name` if the column does not exist.
fn resolve_join_column(
    col_name: &ColumnName,
    table_name: &str,
    table_def: &TableDef,
    offset: usize,
) -> Result<usize> {
    table_def
        .find_column(col_name)
        .map(|(idx, _)| offset + idx)
        .ok_or_else(|| QueryError::ColumnNotFound {
            table: table_name.to_string(),
            column: col_name.to_string(),
        })
}
fn resolve_join_column_ref(
ref_str: &str,
left_table: &str,
left_table_def: &TableDef,
right_table: &str,
right_table_def: &TableDef,
left_col_count: usize,
) -> Result<usize> {
if let Some((table, column)) = ref_str.split_once('.') {
if table == left_table {
resolve_join_column(&column.into(), left_table, left_table_def, 0)
} else if table == right_table {
resolve_join_column(&column.into(), right_table, right_table_def, left_col_count)
} else {
Err(QueryError::TableNotFound(table.to_string()))
}
} else {
if let Ok(idx) = resolve_join_column(
&ref_str.into(),
right_table,
right_table_def,
left_col_count,
) {
Ok(idx)
} else {
resolve_join_column(&ref_str.into(), left_table, left_table_def, 0)
}
}
}
#[inline]
fn build_scan_plan(
access_path: AccessPath,
table_def: &TableDef,
table_name: String,
parsed: &ParsedSelect,
params: &[Value],
column_indices: Vec<usize>,
column_names: Vec<ColumnName>,
) -> Result<QueryPlan> {
let limit = resolve_limit(parsed.limit, params)?;
let offset = resolve_limit(parsed.offset, params)?;
match access_path {
AccessPath::PointLookup { key_values } => Ok(build_point_lookup_plan(
table_def,
table_name,
&key_values,
column_indices,
column_names,
)),
AccessPath::RangeScan {
start_key,
end_key,
remaining_predicates,
} => build_range_scan_plan(
table_def,
table_name,
start_key,
end_key,
&remaining_predicates,
limit,
offset,
&parsed.order_by,
column_indices,
column_names,
),
AccessPath::IndexScan {
index_id,
index_name,
start_key,
end_key,
remaining_predicates,
} => build_index_scan_plan(
table_def,
table_name,
index_id,
index_name,
start_key,
end_key,
&remaining_predicates,
limit,
offset,
&parsed.order_by,
column_indices,
column_names,
),
AccessPath::TableScan {
predicates: all_predicates,
} => build_table_scan_plan(
table_def,
table_name,
&all_predicates,
limit,
offset,
&parsed.order_by,
column_indices,
column_names,
),
}
}
/// Chooses which base-table columns the scan must produce, and their output names.
///
/// When the query aggregates (aggregates, GROUP BY, DISTINCT, HAVING) or has
/// scalar projections, every table column is requested under its own name.
/// Otherwise the explicit projection list and its aliases are used.
///
/// # Errors
/// Propagates [`QueryError::ColumnNotFound`] from `resolve_columns`.
#[inline]
fn resolve_query_columns(
    table_def: &TableDef,
    parsed: &ParsedSelect,
    table_name: &str,
) -> Result<(Vec<usize>, Vec<ColumnName>)> {
    let wants_full_row = parsed.distinct
        || !parsed.aggregates.is_empty()
        || !parsed.group_by.is_empty()
        || !parsed.having.is_empty()
        || !parsed.scalar_projections.is_empty();
    let (projection, aliases) = if wants_full_row {
        (None, None)
    } else {
        (parsed.columns.as_ref(), parsed.column_aliases.as_ref())
    };
    resolve_columns(table_def, projection, aliases, table_name)
}
/// Maps a projection list to `(column indices, output names)` in `table_def`.
///
/// With no projection, every column is returned in schema order under its own
/// name. Otherwise each listed column is looked up. Its output name is the alias
/// at the same position when one exists, and the schema's column name otherwise.
///
/// # Errors
/// [`QueryError::ColumnNotFound`] naming `table_name` for an unknown column.
fn resolve_columns(
    table_def: &TableDef,
    columns: Option<&Vec<ColumnName>>,
    aliases: Option<&Vec<Option<String>>>,
    table_name: &str,
) -> Result<(Vec<usize>, Vec<ColumnName>)> {
    let cols = match columns {
        Some(cols) => cols,
        None => {
            return Ok(table_def
                .columns
                .iter()
                .enumerate()
                .map(|(idx, def)| (idx, def.name.clone()))
                .unzip());
        }
    };

    let alias_at = |pos: usize| aliases.and_then(|list| list.get(pos)).and_then(Option::as_ref);
    let mut indices = Vec::with_capacity(cols.len());
    let mut names = Vec::with_capacity(cols.len());
    for (pos, col) in cols.iter().enumerate() {
        let (idx, col_def) =
            table_def
                .find_column(col)
                .ok_or_else(|| QueryError::ColumnNotFound {
                    table: table_name.to_string(),
                    column: col.to_string(),
                })?;
        indices.push(idx);
        names.push(alias_at(pos).map_or_else(
            || col_def.name.clone(),
            |alias| ColumnName::new(alias.clone()),
        ));
    }
    Ok((indices, names))
}
/// A WHERE-style predicate whose literal and `$n` parameter operands have been
/// bound to concrete [`Value`]s (see `resolve_predicate`).
#[derive(Debug, Clone)]
struct ResolvedPredicate {
    /// Target column. This is an empty name for predicates without a single
    /// target column (`ScalarCmp`, `AlwaysTrue`/`AlwaysFalse`, `Or`).
    column: ColumnName,
    /// The operator together with its resolved operand(s).
    op: ResolvedOp,
}
/// Operator of a [`ResolvedPredicate`], with all operands already bound.
#[derive(Debug, Clone)]
enum ResolvedOp {
    /// `column = value`
    Eq(Value),
    /// `column < value`
    Lt(Value),
    /// `column <= value`
    Le(Value),
    /// `column > value`
    Gt(Value),
    /// `column >= value`
    Ge(Value),
    /// `column IN (values...)`
    In(Vec<Value>),
    /// `column NOT IN (values...)`
    NotIn(Vec<Value>),
    /// `column NOT BETWEEN low AND high`, stored as `(low, high)`.
    NotBetween(Value, Value),
    /// `column LIKE pattern` (pattern copied verbatim from the parser).
    Like(String),
    /// `column NOT LIKE pattern`
    NotLike(String),
    /// `column ILIKE pattern`
    ILike(String),
    /// `column NOT ILIKE pattern`
    NotILike(String),
    /// `column IS NULL`
    IsNull,
    /// `column IS NOT NULL`
    IsNotNull,
    /// JSON path extraction compared for equality against `value`.
    JsonExtractEq {
        path: String,
        as_text: bool,
        value: Value,
    },
    /// JSON containment test against the given value.
    JsonContains(Value),
    /// Disjunction of two AND-ed predicate groups: `(left...) OR (right...)`.
    Or(Vec<ResolvedPredicate>, Vec<ResolvedPredicate>),
    /// Constant-true predicate (from `Predicate::Always(true)`).
    AlwaysTrue,
    /// Constant-false predicate (from `Predicate::Always(false)`).
    AlwaysFalse,
    /// Comparison between two scalar expressions whose `$n` placeholders have
    /// already been substituted.
    ScalarCmp {
        lhs: ScalarExpr,
        op: ScalarCmpOp,
        rhs: ScalarExpr,
    },
}
/// Resolves each predicate in order, binding literals and `$n` parameters.
///
/// # Errors
/// Stops at, and returns, the first resolution error.
fn resolve_predicates(
    predicates: &[Predicate],
    params: &[Value],
) -> Result<Vec<ResolvedPredicate>> {
    let mut resolved = Vec::with_capacity(predicates.len());
    for predicate in predicates {
        resolved.push(resolve_predicate(predicate, params)?);
    }
    Ok(resolved)
}
fn resolve_predicate(predicate: &Predicate, params: &[Value]) -> Result<ResolvedPredicate> {
match predicate {
Predicate::Eq(col, val) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::Eq(resolve_value(val, params)?),
}),
Predicate::Lt(col, val) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::Lt(resolve_value(val, params)?),
}),
Predicate::Le(col, val) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::Le(resolve_value(val, params)?),
}),
Predicate::Gt(col, val) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::Gt(resolve_value(val, params)?),
}),
Predicate::Ge(col, val) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::Ge(resolve_value(val, params)?),
}),
Predicate::In(col, vals) => {
let resolved: Result<Vec<_>> = vals.iter().map(|v| resolve_value(v, params)).collect();
Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::In(resolved?),
})
}
Predicate::NotIn(col, vals) => {
let resolved: Result<Vec<_>> = vals.iter().map(|v| resolve_value(v, params)).collect();
Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::NotIn(resolved?),
})
}
Predicate::NotBetween(col, low, high) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::NotBetween(resolve_value(low, params)?, resolve_value(high, params)?),
}),
Predicate::ScalarCmp { lhs, op, rhs } => Ok(ResolvedPredicate {
column: ColumnName::new(String::new()),
op: ResolvedOp::ScalarCmp {
lhs: substitute_scalar_params(lhs, params)?,
op: *op,
rhs: substitute_scalar_params(rhs, params)?,
},
}),
Predicate::Like(col, pattern) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::Like(pattern.clone()),
}),
Predicate::NotLike(col, pattern) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::NotLike(pattern.clone()),
}),
Predicate::ILike(col, pattern) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::ILike(pattern.clone()),
}),
Predicate::NotILike(col, pattern) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::NotILike(pattern.clone()),
}),
Predicate::IsNull(col) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::IsNull,
}),
Predicate::IsNotNull(col) => Ok(ResolvedPredicate {
column: col.clone(),
op: ResolvedOp::IsNotNull,
}),
Predicate::JsonExtractEq {
column,
path,
as_text,
value,
} => Ok(ResolvedPredicate {
column: column.clone(),
op: ResolvedOp::JsonExtractEq {
path: path.clone(),
as_text: *as_text,
value: resolve_value(value, params)?,
},
}),
Predicate::JsonContains { column, value } => Ok(ResolvedPredicate {
column: column.clone(),
op: ResolvedOp::JsonContains(resolve_value(value, params)?),
}),
Predicate::InSubquery { .. } | Predicate::Exists { .. } => {
Err(QueryError::UnsupportedFeature(
"subquery predicate not pre-executed (likely a correlated subquery)".to_string(),
))
}
Predicate::Always(b) => {
Ok(ResolvedPredicate {
column: ColumnName::new(String::new()),
op: if *b {
ResolvedOp::AlwaysTrue
} else {
ResolvedOp::AlwaysFalse
},
})
}
Predicate::Or(left_preds, right_preds) => {
let left_resolved = resolve_predicates(left_preds, params)?;
let right_resolved = resolve_predicates(right_preds, params)?;
Ok(ResolvedPredicate {
column: ColumnName::new(String::new()),
op: ResolvedOp::Or(left_resolved, right_resolved),
})
}
}
}
/// Converts a parsed predicate operand into a concrete [`Value`].
///
/// Integer literals become `BigInt` and string literals become `Text`. `$n`
/// parameters are 1-based indices into `params`.
///
/// # Errors
/// - [`QueryError::ParameterNotFound`] for `$0` (reported as 0) or an index past the end.
/// - [`QueryError::UnsupportedFeature`] for a column reference.
fn resolve_value(val: &PredicateValue, params: &[Value]) -> Result<Value> {
    let resolved = match val {
        PredicateValue::Int(v) => Value::BigInt(*v),
        PredicateValue::String(s) => Value::Text(s.clone()),
        PredicateValue::Bool(b) => Value::Boolean(*b),
        PredicateValue::Null => Value::Null,
        PredicateValue::Literal(v) => v.clone(),
        PredicateValue::Param(idx) => {
            let slot = idx.checked_sub(1).ok_or(QueryError::ParameterNotFound(0))?;
            match params.get(slot) {
                Some(bound) => bound.clone(),
                None => return Err(QueryError::ParameterNotFound(*idx)),
            }
        }
        PredicateValue::ColumnRef(_) => {
            return Err(QueryError::UnsupportedFeature(
                "column references in WHERE clause not supported (use JOIN ON for column-to-column comparisons)".to_string(),
            ));
        }
    };
    Ok(resolved)
}
/// Returns the current wall-clock time in nanoseconds since the Unix epoch.
///
/// Yields `0` if the system clock reads earlier than the epoch. Saturates at
/// `i64::MAX` instead of wrapping when the nanosecond count does not fit in an `i64`.
pub fn current_statement_timestamp_ns() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => {
            let capped = since_epoch.as_nanos().min(i64::MAX as u128);
            capped as i64
        }
        Err(_) => 0,
    }
}
/// Converts a nanosecond Unix timestamp to whole days since 1970-01-01.
///
/// Uses floor (Euclidean) division, so instants before the epoch map to
/// negative days (e.g. -1 ns maps to day -1). The result is clamped to the
/// `i32` range.
fn ns_to_days_since_epoch(ts_ns: i64) -> i32 {
    const NANOS_PER_DAY: i64 = 86_400_000_000_000;
    let days = ts_ns.div_euclid(NANOS_PER_DAY);
    let bounded = days.max(i64::from(i32::MIN)).min(i64::from(i32::MAX));
    // `bounded` is within i32 range after the clamp above, so this cast is lossless.
    #[allow(clippy::cast_possible_truncation)]
    let narrowed = bounded as i32;
    narrowed
}
/// Returns a copy of `expr` in which `NOW()`, `CURRENT_TIMESTAMP` and
/// `CURRENT_DATE` are replaced by literals derived from `statement_ts_ns`.
///
/// Timestamps before the epoch are clamped to 0 for the `Timestamp` literal.
/// `CURRENT_DATE` becomes the day number from `ns_to_days_since_epoch`. All other
/// nodes are rebuilt unchanged, with their children folded recursively.
pub fn fold_time_constants(expr: &ScalarExpr, statement_ts_ns: i64) -> ScalarExpr {
    /// Rebuilds one node, folding time functions and recursing into children.
    fn fold(node: &ScalarExpr, now_ns: i64) -> ScalarExpr {
        match node {
            ScalarExpr::Now | ScalarExpr::CurrentTimestamp => {
                let nanos = now_ns.max(0) as u64;
                ScalarExpr::Literal(Value::Timestamp(
                    kimberlite_types::Timestamp::from_nanos(nanos),
                ))
            }
            ScalarExpr::CurrentDate => {
                ScalarExpr::Literal(Value::Date(ns_to_days_since_epoch(now_ns)))
            }
            ScalarExpr::Literal(v) => ScalarExpr::Literal(v.clone()),
            ScalarExpr::Column(c) => ScalarExpr::Column(c.clone()),
            ScalarExpr::Upper(x) => ScalarExpr::Upper(boxed(x, now_ns)),
            ScalarExpr::Lower(x) => ScalarExpr::Lower(boxed(x, now_ns)),
            ScalarExpr::Length(x) => ScalarExpr::Length(boxed(x, now_ns)),
            ScalarExpr::Trim(x) => ScalarExpr::Trim(boxed(x, now_ns)),
            ScalarExpr::Abs(x) => ScalarExpr::Abs(boxed(x, now_ns)),
            ScalarExpr::Round(x) => ScalarExpr::Round(boxed(x, now_ns)),
            ScalarExpr::Ceil(x) => ScalarExpr::Ceil(boxed(x, now_ns)),
            ScalarExpr::Floor(x) => ScalarExpr::Floor(boxed(x, now_ns)),
            ScalarExpr::Sqrt(x) => ScalarExpr::Sqrt(boxed(x, now_ns)),
            ScalarExpr::RoundScale(x, n) => ScalarExpr::RoundScale(boxed(x, now_ns), *n),
            ScalarExpr::Cast(x, t) => ScalarExpr::Cast(boxed(x, now_ns), *t),
            ScalarExpr::Substring(x, r) => ScalarExpr::Substring(boxed(x, now_ns), *r),
            ScalarExpr::Extract(field, x) => ScalarExpr::Extract(*field, boxed(x, now_ns)),
            ScalarExpr::DateTrunc(field, x) => ScalarExpr::DateTrunc(*field, boxed(x, now_ns)),
            ScalarExpr::Nullif(a, b) => ScalarExpr::Nullif(boxed(a, now_ns), boxed(b, now_ns)),
            ScalarExpr::Mod(a, b) => ScalarExpr::Mod(boxed(a, now_ns), boxed(b, now_ns)),
            ScalarExpr::Power(a, b) => ScalarExpr::Power(boxed(a, now_ns), boxed(b, now_ns)),
            ScalarExpr::Concat(xs) => {
                ScalarExpr::Concat(xs.iter().map(|x| fold(x, now_ns)).collect())
            }
            ScalarExpr::Coalesce(xs) => {
                ScalarExpr::Coalesce(xs.iter().map(|x| fold(x, now_ns)).collect())
            }
        }
    }

    /// Folds a child node and boxes the result.
    fn boxed(node: &ScalarExpr, now_ns: i64) -> Box<ScalarExpr> {
        Box::new(fold(node, now_ns))
    }

    fold(expr, statement_ts_ns)
}
/// Folds time functions in place inside every `ScalarCmp` condition of `filter`,
/// recursing through `And`/`Or` groups. All other conditions are left untouched.
fn fold_time_constants_in_filter(filter: &mut crate::plan::Filter, statement_ts_ns: i64) {
    use crate::plan::{Filter, FilterOp};
    match filter {
        Filter::And(parts) | Filter::Or(parts) => parts
            .iter_mut()
            .for_each(|part| fold_time_constants_in_filter(part, statement_ts_ns)),
        Filter::Condition(cond) => {
            if let FilterOp::ScalarCmp { lhs, rhs, .. } = &mut cond.op {
                let folded_lhs = fold_time_constants(lhs, statement_ts_ns);
                let folded_rhs = fold_time_constants(rhs, statement_ts_ns);
                *lhs = folded_lhs;
                *rhs = folded_rhs;
            }
        }
    }
}
/// Walks `plan` and folds `NOW()` / `CURRENT_TIMESTAMP` / `CURRENT_DATE` in place,
/// so every expression in the statement observes the same `statement_ts_ns`.
///
/// The following are visited:
/// - scan filters;
/// - aggregate `FILTER` clauses;
/// - both sides of joins;
/// - for `Materialize` nodes: the filter, the CASE `WHEN` conditions and the
///   scalar projection expressions.
///
/// NOTE(review): the `having` field of `Aggregate` is not visited. If it can
/// contain time functions, it should be folded too.
pub fn fold_time_constants_in_plan(plan: &mut QueryPlan, statement_ts_ns: i64) {
    match plan {
        QueryPlan::PointLookup { .. } => {
            // A point lookup carries only an encoded key; there is nothing to fold.
        }
        QueryPlan::RangeScan { filter, .. }
        | QueryPlan::IndexScan { filter, .. }
        | QueryPlan::TableScan { filter, .. } => {
            if let Some(f) = filter {
                fold_time_constants_in_filter(f, statement_ts_ns);
            }
        }
        QueryPlan::Aggregate {
            source,
            aggregate_filters,
            ..
        } => {
            fold_time_constants_in_plan(source, statement_ts_ns);
            for af in aggregate_filters.iter_mut().flatten() {
                fold_time_constants_in_filter(af, statement_ts_ns);
            }
        }
        QueryPlan::Join { left, right, .. } => {
            fold_time_constants_in_plan(left, statement_ts_ns);
            fold_time_constants_in_plan(right, statement_ts_ns);
        }
        QueryPlan::Materialize {
            source,
            filter,
            case_columns,
            scalar_columns,
            ..
        } => {
            fold_time_constants_in_plan(source, statement_ts_ns);
            if let Some(f) = filter {
                fold_time_constants_in_filter(f, statement_ts_ns);
            }
            // CASE WHEN conditions are ordinary filters and may hold ScalarCmp
            // nodes that reference NOW()/CURRENT_DATE. Without this they would
            // escape folding, unlike every other expression in the statement.
            for case_column in case_columns.iter_mut() {
                for clause in case_column.when_clauses.iter_mut() {
                    fold_time_constants_in_filter(&mut clause.condition, statement_ts_ns);
                }
            }
            for sc in scalar_columns {
                sc.expr = fold_time_constants(&sc.expr, statement_ts_ns);
            }
        }
    }
}
/// Returns a copy of `expr` with every `Value::Placeholder(n)` literal replaced by
/// `params[n - 1]`; placeholders are 1-based.
///
/// # Errors
///
/// Returns `QueryError::ParameterNotFound` in two cases:
/// - `0` for a `$0` placeholder;
/// - the placeholder's own number when `params` has no value at that position.
pub(crate) fn substitute_scalar_params(expr: &ScalarExpr, params: &[Value]) -> Result<ScalarExpr> {
    // Substitutes inside a single child expression and re-boxes the result.
    fn bind_boxed(node: &ScalarExpr, bound: &[Value]) -> Result<Box<ScalarExpr>> {
        bind(node, bound).map(Box::new)
    }

    // Substitutes inside every expression of an argument list, stopping at the first error.
    fn bind_all(nodes: &[ScalarExpr], bound: &[Value]) -> Result<Vec<ScalarExpr>> {
        nodes.iter().map(|node| bind(node, bound)).collect()
    }

    // Rebuilds `node` bottom-up, resolving placeholders against `bound`.
    fn bind(node: &ScalarExpr, bound: &[Value]) -> Result<ScalarExpr> {
        let rebuilt = match node {
            // Must precede the generic `Literal` arm below.
            ScalarExpr::Literal(Value::Placeholder(idx)) => {
                let slot = match idx.checked_sub(1) {
                    Some(slot) => slot,
                    None => return Err(QueryError::ParameterNotFound(0)),
                };
                match bound.get(slot) {
                    Some(value) => ScalarExpr::Literal(value.clone()),
                    None => return Err(QueryError::ParameterNotFound(*idx)),
                }
            }
            ScalarExpr::Literal(other) => ScalarExpr::Literal(other.clone()),
            ScalarExpr::Column(name) => ScalarExpr::Column(name.clone()),
            ScalarExpr::Now => ScalarExpr::Now,
            ScalarExpr::CurrentTimestamp => ScalarExpr::CurrentTimestamp,
            ScalarExpr::CurrentDate => ScalarExpr::CurrentDate,
            ScalarExpr::Upper(x) => ScalarExpr::Upper(bind_boxed(x, bound)?),
            ScalarExpr::Lower(x) => ScalarExpr::Lower(bind_boxed(x, bound)?),
            ScalarExpr::Length(x) => ScalarExpr::Length(bind_boxed(x, bound)?),
            ScalarExpr::Trim(x) => ScalarExpr::Trim(bind_boxed(x, bound)?),
            ScalarExpr::Abs(x) => ScalarExpr::Abs(bind_boxed(x, bound)?),
            ScalarExpr::Round(x) => ScalarExpr::Round(bind_boxed(x, bound)?),
            ScalarExpr::Ceil(x) => ScalarExpr::Ceil(bind_boxed(x, bound)?),
            ScalarExpr::Floor(x) => ScalarExpr::Floor(bind_boxed(x, bound)?),
            ScalarExpr::Sqrt(x) => ScalarExpr::Sqrt(bind_boxed(x, bound)?),
            ScalarExpr::RoundScale(x, scale) => {
                ScalarExpr::RoundScale(bind_boxed(x, bound)?, *scale)
            }
            ScalarExpr::Substring(x, range) => {
                ScalarExpr::Substring(bind_boxed(x, bound)?, *range)
            }
            ScalarExpr::Cast(x, target) => ScalarExpr::Cast(bind_boxed(x, bound)?, *target),
            ScalarExpr::Extract(field, x) => ScalarExpr::Extract(*field, bind_boxed(x, bound)?),
            ScalarExpr::DateTrunc(field, x) => {
                ScalarExpr::DateTrunc(*field, bind_boxed(x, bound)?)
            }
            ScalarExpr::Concat(items) => ScalarExpr::Concat(bind_all(items, bound)?),
            ScalarExpr::Coalesce(items) => ScalarExpr::Coalesce(bind_all(items, bound)?),
            ScalarExpr::Nullif(a, b) => {
                ScalarExpr::Nullif(bind_boxed(a, bound)?, bind_boxed(b, bound)?)
            }
            ScalarExpr::Mod(a, b) => ScalarExpr::Mod(bind_boxed(a, bound)?, bind_boxed(b, bound)?),
            ScalarExpr::Power(a, b) => {
                ScalarExpr::Power(bind_boxed(a, bound)?, bind_boxed(b, bound)?)
            }
        };
        Ok(rebuilt)
    }

    bind(expr, params)
}
/// Strategy the planner chose for reading rows of a single table.
enum AccessPath {
    /// Read every row; `predicates` are applied as a filter.
    TableScan { predicates: Vec<ResolvedPredicate> },
    /// Fetch the row (if any) whose full primary-key tuple equals `key_values`.
    PointLookup { key_values: Vec<Value> },
    /// Scan the primary-key range `[start_key, end_key)`.
    ///
    /// `remaining_predicates` are applied as a filter afterwards.
    RangeScan {
        start_key: Bound<kimberlite_store::Key>,
        end_key: Bound<kimberlite_store::Key>,
        remaining_predicates: Vec<ResolvedPredicate>,
    },
    /// Scan a key range of the secondary index `index_name` / `index_id`.
    ///
    /// `remaining_predicates` are applied as a filter afterwards.
    IndexScan {
        index_name: String,
        index_id: u64,
        start_key: Bound<kimberlite_store::Key>,
        end_key: Bound<kimberlite_store::Key>,
        remaining_predicates: Vec<ResolvedPredicate>,
    },
}
/// Chooses how to read rows of `table_def` given the already-resolved WHERE predicates.
///
/// Preference order:
/// 1. Point lookup, but only when the predicates are exactly one equality per primary-key
///    column. `AccessPath::PointLookup` has no slot for residual predicates, so any extra
///    predicate forces a path that can filter.
/// 2. A primary-key range scan, for single-column primary keys.
/// 3. The best-scoring secondary index.
/// 4. A full table scan.
///
/// Provably empty PK ranges (e.g. `id > 5 AND id < 3`) are short-circuited to a table scan
/// with an `AlwaysFalse` predicate.
fn analyze_access_path(table_def: &TableDef, predicates: &[ResolvedPredicate]) -> AccessPath {
    let pk_columns = &table_def.primary_key;
    if pk_columns.is_empty() {
        return AccessPath::TableScan {
            predicates: predicates.to_vec(),
        };
    }

    if let Some(key_values) = exact_primary_key_match(table_def, predicates) {
        return AccessPath::PointLookup { key_values };
    }

    if pk_columns.len() == 1 {
        let pk_col = &pk_columns[0];
        let pk_predicates: Vec<_> = predicates.iter().filter(|p| &p.column == pk_col).collect();
        if !pk_predicates.is_empty() {
            let bounds_result = compute_range_bounds(&pk_predicates);
            if bounds_result.is_empty {
                // The PK bounds cannot contain any key: plan a scan that matches nothing.
                return AccessPath::TableScan {
                    predicates: vec![ResolvedPredicate {
                        column: ColumnName::new(String::new()),
                        op: ResolvedOp::AlwaysFalse,
                    }],
                };
            }
            let has_bounds = !matches!(
                (&bounds_result.start, &bounds_result.end),
                (Bound::Unbounded, Bound::Unbounded)
            );
            if has_bounds {
                // PK predicates that became bounds are implied by the range; everything else,
                // plus PK predicates that could not be converted, is filtered after the scan.
                let mut remaining: Vec<_> = predicates
                    .iter()
                    .filter(|p| &p.column != pk_col)
                    .cloned()
                    .collect();
                remaining.extend(bounds_result.unconverted);
                return AccessPath::RangeScan {
                    start_key: bounds_result.start,
                    end_key: bounds_result.end,
                    remaining_predicates: remaining,
                };
            }
        }
    }

    let index_candidates = find_usable_indexes(table_def, predicates);
    if let Some((best_index, start, end, remaining)) = select_best_index(&index_candidates) {
        return AccessPath::IndexScan {
            index_id: best_index.index_id,
            index_name: best_index.name.clone(),
            start_key: start,
            end_key: end,
            remaining_predicates: remaining,
        };
    }

    AccessPath::TableScan {
        predicates: predicates.to_vec(),
    }
}

/// Returns the full primary-key tuple when `predicates` consist solely of one equality per
/// primary-key column; returns `None` otherwise.
///
/// `None` is returned when any predicate is on a non-PK column, uses a non-equality
/// operator, or repeats a PK column. A repeated column such as `id = 1 AND id = 2` must
/// still be checked, so the caller falls back to a filtering path.
fn exact_primary_key_match(
    table_def: &TableDef,
    predicates: &[ResolvedPredicate],
) -> Option<Vec<Value>> {
    let mut pk_values: Vec<Option<Value>> = vec![None; table_def.primary_key.len()];
    for pred in predicates {
        let pk_pos = table_def.primary_key_position(&pred.column)?;
        let val = match &pred.op {
            ResolvedOp::Eq(val) => val,
            _ => return None,
        };
        let slot = pk_values.get_mut(pk_pos)?;
        if slot.is_some() {
            return None;
        }
        *slot = Some(val.clone());
    }
    // `None` here means some PK column had no equality predicate.
    pk_values.into_iter().collect()
}
/// Key range implied by the comparison predicates on a single column.
struct RangeBoundsResult {
    /// `true` when `start`/`end` cannot contain any key (e.g. `x > 5 AND x < 3`).
    is_empty: bool,
    /// Lower key bound, or `Unbounded` when no lower-bound predicate was present.
    start: Bound<kimberlite_store::Key>,
    /// Upper key bound, or `Unbounded` when no upper-bound predicate was present.
    end: Bound<kimberlite_store::Key>,
    /// Predicates on the column that could not be turned into key bounds.
    unconverted: Vec<ResolvedPredicate>,
}
fn compute_range_bounds(predicates: &[&ResolvedPredicate]) -> RangeBoundsResult {
let mut lower: Option<(Value, bool)> = None; let mut upper: Option<(Value, bool)> = None;
let mut unconverted = Vec::new();
for pred in predicates {
match &pred.op {
ResolvedOp::Eq(val) => {
lower = Some((val.clone(), true));
upper = Some((val.clone(), true));
}
ResolvedOp::Gt(val) => {
lower = Some((val.clone(), false));
}
ResolvedOp::Ge(val) => {
lower = Some((val.clone(), true));
}
ResolvedOp::Lt(val) => {
upper = Some((val.clone(), false));
}
ResolvedOp::Le(val) => {
upper = Some((val.clone(), true));
}
ResolvedOp::In(_)
| ResolvedOp::NotIn(_)
| ResolvedOp::NotBetween(_, _)
| ResolvedOp::Like(_)
| ResolvedOp::NotLike(_)
| ResolvedOp::ILike(_)
| ResolvedOp::NotILike(_)
| ResolvedOp::IsNull
| ResolvedOp::IsNotNull
| ResolvedOp::JsonExtractEq { .. }
| ResolvedOp::JsonContains(_)
| ResolvedOp::AlwaysTrue
| ResolvedOp::AlwaysFalse
| ResolvedOp::Or(_, _)
| ResolvedOp::ScalarCmp { .. } => {
unconverted.push((*pred).clone());
}
}
}
let start = match &lower {
Some((val, true)) => Bound::Included(encode_key(&[val.clone()])),
Some((val, false)) => Bound::Excluded(encode_key(&[val.clone()])),
None => Bound::Unbounded,
};
let end = match &upper {
Some((val, true)) => {
Bound::Excluded(successor_key(&encode_key(&[val.clone()])))
}
Some((val, false)) => Bound::Excluded(encode_key(&[val.clone()])),
None => Bound::Unbounded,
};
let is_empty = bounds_are_unsatisfiable(&start, &end);
RangeBoundsResult {
start,
end,
unconverted,
is_empty,
}
}
/// Reports whether the key range `start..end` provably contains no key.
///
/// Any `Unbounded` side makes the range satisfiable. Otherwise the range is empty when the
/// start key sorts after the end key byte-wise. It is also empty when both keys are equal
/// and at least one side is exclusive.
fn bounds_are_unsatisfiable(
    start: &Bound<kimberlite_store::Key>,
    end: &Bound<kimberlite_store::Key>,
) -> bool {
    let (lo, lo_open) = match start {
        Bound::Unbounded => return false,
        Bound::Included(key) => (key.as_ref(), false),
        Bound::Excluded(key) => (key.as_ref(), true),
    };
    let (hi, hi_open) = match end {
        Bound::Unbounded => return false,
        Bound::Included(key) => (key.as_ref(), false),
        Bound::Excluded(key) => (key.as_ref(), true),
    };
    let ordering = lo.cmp(hi);
    ordering.is_gt() || (ordering.is_eq() && (lo_open || hi_open))
}
/// A secondary index that can serve the query, together with the key range derived from its
/// leading column.
struct IndexCandidate<'a> {
    /// Heuristic preference from `score_index`; higher is better.
    score: usize,
    /// The index being considered.
    index_def: &'a crate::schema::IndexDef,
    /// Lower key bound on the index's leading column.
    start: Bound<kimberlite_store::Key>,
    /// Upper key bound on the index's leading column.
    end: Bound<kimberlite_store::Key>,
    /// Predicates that must still be applied as a filter after the index scan.
    remaining: Vec<ResolvedPredicate>,
}
/// Lists the secondary indexes of `table_def` whose leading column has at least one
/// predicate that yields a non-trivial, satisfiable key range.
///
/// Key bounds are derived from the leading index column only. Therefore only predicates
/// on that column are dropped from `remaining`. Predicates on later index columns are not
/// encoded in the scan range and must still be evaluated as filters. (The previous version
/// discarded them, returning rows that violated them.)
///
/// At most `MAX_INDEXES_CONSIDERED` indexes are examined, to bound planning cost.
fn find_usable_indexes<'a>(
    table_def: &'a TableDef,
    predicates: &[ResolvedPredicate],
) -> Vec<IndexCandidate<'a>> {
    const MAX_INDEXES_CONSIDERED: usize = 100;

    let mut candidates = Vec::new();
    for index_def in table_def.indexes().iter().take(MAX_INDEXES_CONSIDERED) {
        let first_col = match index_def.columns.first() {
            Some(col) => col,
            None => continue,
        };
        let first_col_predicates: Vec<_> = predicates
            .iter()
            .filter(|p| &p.column == first_col)
            .collect();
        if first_col_predicates.is_empty() {
            continue;
        }
        let bounds_result = compute_range_bounds(&first_col_predicates);
        // An empty range is left for the table-scan fallback. An unbounded range gives no
        // benefit over a table scan.
        if bounds_result.is_empty {
            continue;
        }
        if matches!(
            (&bounds_result.start, &bounds_result.end),
            (Bound::Unbounded, Bound::Unbounded)
        ) {
            continue;
        }
        let mut remaining: Vec<_> = predicates
            .iter()
            .filter(|p| &p.column != first_col)
            .cloned()
            .collect();
        // Leading-column predicates that could not become bounds still need filtering.
        remaining.extend(bounds_result.unconverted);
        let score = score_index(index_def, predicates);
        candidates.push(IndexCandidate {
            index_def,
            start: bounds_result.start,
            end: bounds_result.end,
            remaining,
            score,
        });
    }
    candidates
}
fn score_index(index_def: &crate::schema::IndexDef, predicates: &[ResolvedPredicate]) -> usize {
let mut score = 0;
let max_columns = 10;
for (iter_count, index_col) in index_def.columns.iter().enumerate() {
if iter_count >= max_columns {
break;
}
for pred in predicates {
if &pred.column == index_col {
match &pred.op {
ResolvedOp::Eq(_) => score += 10, ResolvedOp::Lt(_)
| ResolvedOp::Le(_)
| ResolvedOp::Gt(_)
| ResolvedOp::Ge(_) => {
score += 5; }
_ => score += 1, }
}
}
}
score
}
/// The winning index candidate as returned by `select_best_index`. The tuple holds:
/// - the index definition;
/// - the start key bound, derived from the index's leading column;
/// - the end key bound, derived from the same column;
/// - the predicates that must still be applied as a filter after the scan.
type BestIndexResult<'a> = (
    &'a crate::schema::IndexDef,
    Bound<kimberlite_store::Key>,
    Bound<kimberlite_store::Key>,
    Vec<ResolvedPredicate>,
);
/// Picks the preferred index among `candidates`, or `None` when there are none.
///
/// The highest `score` wins. Ties go to the candidate leaving the fewest residual
/// predicates, then to the index with fewer columns. If everything is equal, the earliest
/// candidate in `candidates` is chosen.
fn select_best_index<'a>(candidates: &'a [IndexCandidate<'a>]) -> Option<BestIndexResult<'a>> {
    // `Reverse` turns "maximum score" into part of a single minimum search. `min_by_key`
    // returns the first of several equal minima, preserving first-candidate tie-breaking.
    let winner = candidates.iter().min_by_key(|candidate| {
        (
            std::cmp::Reverse(candidate.score),
            candidate.remaining.len(),
            candidate.index_def.columns.len(),
        )
    })?;
    Some((
        winner.index_def,
        winner.start.clone(),
        winner.end.clone(),
        winner.remaining.clone(),
    ))
}
/// Builds the conjunction of `predicates` as a plan `Filter`.
///
/// Returns `Ok(None)` for an empty predicate list. Otherwise returns the `Filter::and` of
/// each predicate's filter.
///
/// # Errors
///
/// Propagates the first error from `build_filter_from_predicate`, such as an unknown
/// column.
fn build_filter(
    table_def: &TableDef,
    predicates: &[ResolvedPredicate],
    table_name: &str,
) -> Result<Option<Filter>> {
    if predicates.is_empty() {
        return Ok(None);
    }
    let mut conjuncts = Vec::with_capacity(predicates.len());
    for predicate in predicates {
        conjuncts.push(build_filter_from_predicate(table_def, predicate, table_name)?);
    }
    Ok(Some(Filter::and(conjuncts)))
}
/// Converts one resolved predicate into a plan `Filter`.
///
/// An `Or` becomes a two-way `Filter::or`, where each side is the conjunction of its
/// predicate list. Every other operator becomes a single condition.
///
/// # Errors
///
/// - `QueryError::UnsupportedFeature` when either side of an `Or` has no predicates.
/// - Any error raised while building the nested filters or conditions.
fn build_filter_from_predicate(
    table_def: &TableDef,
    pred: &ResolvedPredicate,
    table_name: &str,
) -> Result<Filter> {
    // Builds one OR operand, rejecting an empty side with `empty_message`.
    fn or_branch(
        table_def: &TableDef,
        side: &[ResolvedPredicate],
        table_name: &str,
        empty_message: &str,
    ) -> Result<Filter> {
        build_filter(table_def, side, table_name)?
            .ok_or_else(|| QueryError::UnsupportedFeature(empty_message.to_string()))
    }

    match &pred.op {
        ResolvedOp::Or(left_preds, right_preds) => {
            let left = or_branch(
                table_def,
                left_preds,
                table_name,
                "OR left side has no predicates",
            )?;
            let right = or_branch(
                table_def,
                right_preds,
                table_name,
                "OR right side has no predicates",
            )?;
            Ok(Filter::or(vec![left, right]))
        }
        _ => build_filter_condition(table_def, pred, table_name).map(Filter::single),
    }
}
fn build_filter_condition(
table_def: &TableDef,
pred: &ResolvedPredicate,
table_name: &str,
) -> Result<FilterCondition> {
if matches!(pred.op, ResolvedOp::AlwaysTrue) {
return Ok(FilterCondition {
column_idx: 0,
op: FilterOp::AlwaysTrue,
value: Value::Null,
});
}
if matches!(pred.op, ResolvedOp::AlwaysFalse) {
return Ok(FilterCondition {
column_idx: 0,
op: FilterOp::AlwaysFalse,
value: Value::Null,
});
}
if let ResolvedOp::ScalarCmp { lhs, op, rhs } = &pred.op {
let columns: Arc<[ColumnName]> = table_def
.columns
.iter()
.map(|c| c.name.clone())
.collect::<Vec<_>>()
.into();
return Ok(FilterCondition {
column_idx: 0,
op: FilterOp::ScalarCmp {
columns,
lhs: lhs.clone(),
op: *op,
rhs: rhs.clone(),
},
value: Value::Null,
});
}
let (col_idx, _) =
table_def
.find_column(&pred.column)
.ok_or_else(|| QueryError::ColumnNotFound {
table: table_name.to_string(),
column: pred.column.to_string(),
})?;
let (op, value) = match &pred.op {
ResolvedOp::Eq(v) => (FilterOp::Eq, v.clone()),
ResolvedOp::Lt(v) => (FilterOp::Lt, v.clone()),
ResolvedOp::Le(v) => (FilterOp::Le, v.clone()),
ResolvedOp::Gt(v) => (FilterOp::Gt, v.clone()),
ResolvedOp::Ge(v) => (FilterOp::Ge, v.clone()),
ResolvedOp::In(vals) => (FilterOp::In(vals.clone()), Value::Null), ResolvedOp::NotIn(vals) => (FilterOp::NotIn(vals.clone()), Value::Null),
ResolvedOp::NotBetween(low, high) => {
(FilterOp::NotBetween(low.clone(), high.clone()), Value::Null)
}
ResolvedOp::Like(pattern) => (FilterOp::Like(pattern.clone()), Value::Null),
ResolvedOp::NotLike(pattern) => (FilterOp::NotLike(pattern.clone()), Value::Null),
ResolvedOp::ILike(pattern) => (FilterOp::ILike(pattern.clone()), Value::Null),
ResolvedOp::NotILike(pattern) => (FilterOp::NotILike(pattern.clone()), Value::Null),
ResolvedOp::IsNull => (FilterOp::IsNull, Value::Null),
ResolvedOp::IsNotNull => (FilterOp::IsNotNull, Value::Null),
ResolvedOp::JsonExtractEq {
path,
as_text,
value: v,
} => (
FilterOp::JsonExtractEq {
path: path.clone(),
as_text: *as_text,
value: v.clone(),
},
Value::Null,
),
ResolvedOp::JsonContains(v) => (FilterOp::JsonContains(v.clone()), Value::Null),
ResolvedOp::AlwaysTrue => (FilterOp::AlwaysTrue, Value::Null),
ResolvedOp::AlwaysFalse => (FilterOp::AlwaysFalse, Value::Null),
ResolvedOp::Or(_, _) => {
return Err(QueryError::UnsupportedFeature(
"OR predicates must be handled at filter level, not as individual conditions"
.to_string(),
));
}
ResolvedOp::ScalarCmp { .. } => {
unreachable!("ScalarCmp must be handled by the short-circuit branch");
}
};
Ok(FilterCondition {
column_idx: col_idx,
op,
value,
})
}
/// Direction in which the storage scan should walk the primary key.
///
/// Descending only when the first ORDER BY column is a primary-key column sorted
/// descending. Every other case, including no ORDER BY at all, scans ascending.
fn determine_scan_order(order_by: &[OrderByClause], table_def: &TableDef) -> ScanOrder {
    match order_by.first() {
        Some(lead) if table_def.is_primary_key(&lead.column) && !lead.ascending => {
            ScanOrder::Descending
        }
        _ => ScanOrder::Ascending,
    }
}
/// Translates ORDER BY clauses into a `SortSpec` of `(column index, direction)` pairs.
///
/// Returns `Ok(None)` when `order_by` is empty.
///
/// # Errors
///
/// `QueryError::ColumnNotFound` for the first ORDER BY column that is not in `table_def`.
fn build_sort_spec(
    order_by: &[OrderByClause],
    table_def: &TableDef,
    table_name: &str,
) -> Result<Option<SortSpec>> {
    if order_by.is_empty() {
        return Ok(None);
    }
    let columns = order_by
        .iter()
        .map(|clause| {
            let col_idx = match table_def.find_column(&clause.column) {
                Some((idx, _)) => idx,
                None => {
                    return Err(QueryError::ColumnNotFound {
                        table: table_name.to_string(),
                        column: clause.column.to_string(),
                    });
                }
            };
            let direction = if clause.ascending {
                ScanOrder::Ascending
            } else {
                ScanOrder::Descending
            };
            Ok((col_idx, direction))
        })
        .collect::<Result<Vec<_>>>()?;
    Ok(Some(SortSpec { columns }))
}
// Unit tests for access-path selection, range-bound emptiness detection and time-constant
// folding.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::{ParsedStatement, parse_statement};
    use crate::schema::{ColumnDef, DataType, SchemaBuilder};
    use kimberlite_store::TableId;

    /// Parses `sql` and returns its SELECT.
    ///
    /// Panics if parsing fails or the statement is not a SELECT.
    fn parse_test_select(sql: &str) -> ParsedSelect {
        match parse_statement(sql).unwrap() {
            ParsedStatement::Select(s) => s,
            _ => panic!("expected SELECT statement"),
        }
    }

    /// Schema with one table, `users(id BIGINT NOT NULL, name TEXT NOT NULL, age BIGINT)`,
    /// whose primary key is `id`.
    ///
    /// This builder call declares no secondary indexes.
    fn test_schema() -> Schema {
        SchemaBuilder::new()
            .table(
                "users",
                TableId::new(1),
                vec![
                    ColumnDef::new("id", DataType::BigInt).not_null(),
                    ColumnDef::new("name", DataType::Text).not_null(),
                    ColumnDef::new("age", DataType::BigInt),
                ],
                vec!["id".into()],
            )
            .build()
    }

    // Equality on the whole primary key plans as a point lookup.
    #[test]
    fn test_plan_point_lookup() {
        let schema = test_schema();
        let parsed = parse_test_select("SELECT * FROM users WHERE id = 42");
        let plan = plan_query(&schema, &parsed, &[]).unwrap();
        assert!(matches!(plan, QueryPlan::PointLookup { .. }));
    }

    // A one-sided comparison on the primary key plans as a range scan.
    #[test]
    fn test_plan_range_scan() {
        let schema = test_schema();
        let parsed = parse_test_select("SELECT * FROM users WHERE id > 10");
        let plan = plan_query(&schema, &parsed, &[]).unwrap();
        assert!(matches!(plan, QueryPlan::RangeScan { .. }));
    }

    // A predicate on a non-key column with no index falls back to a full table scan.
    #[test]
    fn test_plan_table_scan() {
        let schema = test_schema();
        let parsed = parse_test_select("SELECT * FROM users WHERE name = 'alice'");
        let plan = plan_query(&schema, &parsed, &[]).unwrap();
        assert!(matches!(plan, QueryPlan::TableScan { .. }));
    }

    // With `$1` bound to 42, the PK equality still plans as a point lookup.
    #[test]
    fn test_plan_with_params() {
        let schema = test_schema();
        let parsed = parse_test_select("SELECT * FROM users WHERE id = $1");
        let plan = plan_query(&schema, &parsed, &[Value::BigInt(42)]).unwrap();
        assert!(matches!(plan, QueryPlan::PointLookup { .. }));
    }

    // Referencing `$1` with an empty parameter list reports parameter number 1.
    #[test]
    fn test_plan_missing_param() {
        let schema = test_schema();
        let parsed = parse_test_select("SELECT * FROM users WHERE id = $1");
        let result = plan_query(&schema, &parsed, &[]);
        assert!(matches!(result, Err(QueryError::ParameterNotFound(1))));
    }

    // Selecting from a table absent from the schema is rejected.
    #[test]
    fn test_plan_unknown_table() {
        let schema = test_schema();
        let parsed = parse_test_select("SELECT * FROM unknown");
        let result = plan_query(&schema, &parsed, &[]);
        assert!(matches!(result, Err(QueryError::TableNotFound(_))));
    }

    // Projecting a column absent from the table is rejected.
    #[test]
    fn test_plan_unknown_column() {
        let schema = test_schema();
        let parsed = parse_test_select("SELECT unknown FROM users");
        let result = plan_query(&schema, &parsed, &[]);
        assert!(matches!(result, Err(QueryError::ColumnNotFound { .. })));
    }

    // `id > 5 AND id < 3` cannot match any row. The planner should emit a table scan whose
    // filter is AlwaysFalse rather than an inverted range scan.
    #[test]
    fn inverted_range_pk_short_circuits_to_alwaysfalse_table_scan() {
        let schema = test_schema();
        let parsed = parse_test_select("SELECT * FROM users WHERE id > 5 AND id < 3");
        let plan = plan_query(&schema, &parsed, &[]).unwrap();
        match plan {
            QueryPlan::TableScan { filter, .. } => {
                let filter_str = format!("{filter:?}");
                assert!(
                    filter_str.contains("AlwaysFalse"),
                    "expected AlwaysFalse filter for x > 5 AND x < 3, got: {filter_str}"
                );
            }
            other => panic!("expected TableScan with AlwaysFalse filter, got: {other:?}"),
        }
    }

    // Both bounds exclude the same key (5), so the range is empty and short-circuits.
    #[test]
    fn excluded_equal_bounds_pk_short_circuits() {
        let schema = test_schema();
        let parsed = parse_test_select("SELECT * FROM users WHERE id > 5 AND id < 5");
        let plan = plan_query(&schema, &parsed, &[]).unwrap();
        assert!(matches!(plan, QueryPlan::TableScan { .. }));
    }

    // An unbounded upper side must not be mistaken for an empty range.
    #[test]
    fn one_sided_lower_bound_still_uses_range_scan() {
        let schema = test_schema();
        let parsed = parse_test_select("SELECT * FROM users WHERE id > 5");
        let plan = plan_query(&schema, &parsed, &[]).unwrap();
        assert!(matches!(plan, QueryPlan::RangeScan { .. }));
    }

    // Direct checks of the emptiness test:
    // - unbounded on both sides is never empty;
    // - equal inclusive keys admit exactly that key;
    // - equal exclusive keys admit nothing;
    // - a start key above the end key admits nothing.
    #[test]
    fn bounds_are_unsatisfiable_unit() {
        use std::ops::Bound;
        assert!(!bounds_are_unsatisfiable(
            &Bound::<kimberlite_store::Key>::Unbounded,
            &Bound::Unbounded,
        ));
        let k = kimberlite_store::Key::from(vec![1u8, 2, 3]);
        assert!(!bounds_are_unsatisfiable(
            &Bound::Included(k.clone()),
            &Bound::Included(k.clone()),
        ));
        assert!(bounds_are_unsatisfiable(
            &Bound::Excluded(k.clone()),
            &Bound::Excluded(k.clone()),
        ));
        let k_high = kimberlite_store::Key::from(vec![9u8]);
        let k_low = kimberlite_store::Key::from(vec![1u8]);
        assert!(bounds_are_unsatisfiable(
            &Bound::Included(k_high),
            &Bound::Included(k_low),
        ));
    }

    // NOW() folds to a Timestamp literal carrying exactly the supplied nanoseconds.
    #[test]
    fn fold_time_constants_replaces_now_with_timestamp_literal() {
        // 1_746_316_800 seconds since the Unix epoch, expressed in nanoseconds.
        let ts_ns = 1_746_316_800_i64 * 1_000_000_000;
        let folded = fold_time_constants(&ScalarExpr::Now, ts_ns);
        match folded {
            ScalarExpr::Literal(Value::Timestamp(t)) => {
                assert_eq!(t.as_nanos() as i64, ts_ns);
            }
            other => panic!("expected Literal(Timestamp), got {other:?}"),
        }
    }

    // CURRENT_TIMESTAMP folds the same way as NOW().
    #[test]
    fn fold_time_constants_replaces_current_timestamp() {
        let ts_ns = 1_746_316_800_i64 * 1_000_000_000;
        let folded = fold_time_constants(&ScalarExpr::CurrentTimestamp, ts_ns);
        match folded {
            ScalarExpr::Literal(Value::Timestamp(t)) => {
                assert_eq!(t.as_nanos() as i64, ts_ns);
            }
            other => panic!("expected Literal(Timestamp), got {other:?}"),
        }
    }

    // CURRENT_DATE folds to whole days since the epoch: 1_746_316_800 s / 86_400 s = 20_212.
    #[test]
    fn fold_time_constants_replaces_current_date_with_days_since_epoch() {
        let ts_ns = 1_746_316_800_i64 * 1_000_000_000;
        let folded = fold_time_constants(&ScalarExpr::CurrentDate, ts_ns);
        match folded {
            ScalarExpr::Literal(Value::Date(days)) => {
                assert_eq!(days, 20_212);
            }
            other => panic!("expected Literal(Date), got {other:?}"),
        }
    }

    // Sentinels nested inside other expressions (here EXTRACT(YEAR FROM NOW())) are folded
    // too, while the outer node is kept.
    #[test]
    fn fold_time_constants_recurses_into_operands() {
        let ts_ns = 1_746_316_800_i64 * 1_000_000_000;
        let expr =
            ScalarExpr::Extract(kimberlite_types::DateField::Year, Box::new(ScalarExpr::Now));
        let folded = fold_time_constants(&expr, ts_ns);
        match folded {
            ScalarExpr::Extract(field, inner) => {
                assert_eq!(field, kimberlite_types::DateField::Year);
                match *inner {
                    ScalarExpr::Literal(Value::Timestamp(_)) => {}
                    other => panic!("expected Literal(Timestamp) inner, got {other:?}"),
                }
            }
            other => panic!("expected Extract, got {other:?}"),
        }
    }

    // Expressions without time sentinels come back structurally unchanged.
    #[test]
    fn fold_time_constants_is_idempotent_on_non_sentinel_exprs() {
        let ts_ns = 1_746_316_800_i64 * 1_000_000_000;
        let expr = ScalarExpr::Upper(Box::new(ScalarExpr::Literal(Value::Text("hi".into()))));
        let folded = fold_time_constants(&expr, ts_ns);
        match folded {
            ScalarExpr::Upper(inner) => match *inner {
                ScalarExpr::Literal(Value::Text(s)) => assert_eq!(s, "hi"),
                other => panic!("unexpected inner: {other:?}"),
            },
            other => panic!("expected Upper, got {other:?}"),
        }
    }

    // Folding twice with the same clock value yields identical timestamps.
    #[test]
    fn fold_time_constants_is_deterministic_with_same_clock() {
        let ts_ns = 1_746_316_800_i64 * 1_000_000_000;
        let a = fold_time_constants(&ScalarExpr::Now, ts_ns);
        let b = fold_time_constants(&ScalarExpr::Now, ts_ns);
        match (a, b) {
            (
                ScalarExpr::Literal(Value::Timestamp(ta)),
                ScalarExpr::Literal(Value::Timestamp(tb)),
            ) => assert_eq!(ta.as_nanos(), tb.as_nanos()),
            other => panic!("expected matching Timestamp literals: {other:?}"),
        }
    }

    /// Returns the scalar projection of a `Materialize` node, looking through an
    /// `Aggregate`'s source.
    ///
    /// Returns `None` for any other plan shape.
    fn find_scalar_columns(p: &QueryPlan) -> Option<&[crate::plan::ScalarColumnDef]> {
        match p {
            QueryPlan::Materialize { scalar_columns, .. } => Some(scalar_columns),
            QueryPlan::Aggregate { source, .. } => find_scalar_columns(source),
            _ => None,
        }
    }

    /// Panics if a `Now` / `CurrentTimestamp` / `CurrentDate` node remains anywhere in `e`.
    ///
    /// The match has no wildcard arm, so a new `ScalarExpr` variant will force this
    /// helper to be updated.
    fn assert_no_sentinel(e: &ScalarExpr) {
        match e {
            ScalarExpr::Now | ScalarExpr::CurrentTimestamp | ScalarExpr::CurrentDate => {
                panic!("planner left an unfolded time-now sentinel in the plan");
            }
            ScalarExpr::Upper(x)
            | ScalarExpr::Lower(x)
            | ScalarExpr::Length(x)
            | ScalarExpr::Trim(x)
            | ScalarExpr::Abs(x)
            | ScalarExpr::Round(x)
            | ScalarExpr::Ceil(x)
            | ScalarExpr::Floor(x)
            | ScalarExpr::RoundScale(x, _)
            | ScalarExpr::Sqrt(x)
            | ScalarExpr::Substring(x, _)
            | ScalarExpr::Extract(_, x)
            | ScalarExpr::DateTrunc(_, x)
            | ScalarExpr::Cast(x, _) => assert_no_sentinel(x),
            ScalarExpr::Concat(xs) | ScalarExpr::Coalesce(xs) => {
                for x in xs {
                    assert_no_sentinel(x);
                }
            }
            ScalarExpr::Nullif(a, b) | ScalarExpr::Mod(a, b) | ScalarExpr::Power(a, b) => {
                assert_no_sentinel(a);
                assert_no_sentinel(b);
            }
            ScalarExpr::Literal(_) | ScalarExpr::Column(_) => {}
        }
    }

    // Planning with an explicit clock must leave no NOW() sentinel in the SELECT list's
    // scalar projection.
    #[test]
    fn plan_query_with_clock_folds_select_now() {
        let schema = test_schema();
        let parsed = parse_test_select("SELECT NOW() FROM users");
        let ts_ns = 1_746_316_800_i64 * 1_000_000_000;
        let plan = plan_query_with_clock(&schema, &parsed, &[], ts_ns).unwrap();
        let scalars = find_scalar_columns(&plan).expect("plan must have scalar projection");
        assert!(!scalars.is_empty(), "expected at least one scalar column");
        for sc in scalars {
            assert_no_sentinel(&sc.expr);
        }
    }
}