pub mod columnar;
pub mod document_schemaless;
pub mod document_strict;
pub mod kv;
pub mod spatial;
pub mod timeseries;
use crate::error::Result;
use crate::types::*;
/// Arguments consumed by [`EngineRules::plan_insert`].
pub struct InsertParams {
/// Collection the rows are inserted into.
pub collection: String,
/// Column names supplied with the insert.
pub columns: Vec<String>,
/// One inner `Vec` per row; each row is a list of `(String, SqlValue)` pairs.
// NOTE(review): the `String` is presumably the column name — confirm against the callers.
pub rows: Vec<Vec<(String, SqlValue)>>,
/// Pairs of strings describing column defaults.
// NOTE(review): presumably `(column name, default expression text)` — confirm.
pub column_defaults: Vec<(String, String)>,
}
/// Arguments consumed by [`EngineRules::plan_scan`]; also read by
/// `try_document_index_lookup` when it looks for an index-backed plan.
pub struct ScanParams {
/// Collection being scanned.
pub collection: String,
/// Optional alias; `try_document_index_lookup` copies it unchanged into the plan.
pub alias: Option<String>,
/// Filters applied to the scan. `try_document_index_lookup` searches these
/// for an equality it can serve from an index.
pub filters: Vec<Filter>,
/// Projection list, passed through to the plan.
pub projection: Vec<Projection>,
/// Sort keys. `try_document_index_lookup` declines to build an index lookup
/// when this is non-empty.
pub sort_keys: Vec<SortKey>,
/// Optional row limit, passed through to the plan.
pub limit: Option<usize>,
/// Row offset, passed through to the plan.
pub offset: usize,
/// Distinct flag, passed through to the plan.
pub distinct: bool,
/// Window functions. `try_document_index_lookup` declines to build an index
/// lookup when this is non-empty.
pub window_functions: Vec<WindowSpec>,
/// Index descriptors available to the planner. `try_document_index_lookup`
/// only considers entries whose `state` is `IndexState::Ready`.
pub indexes: Vec<IndexSpec>,
}
/// Arguments consumed by [`EngineRules::plan_point_get`].
pub struct PointGetParams {
/// Collection being read.
pub collection: String,
/// Optional alias for the collection.
pub alias: Option<String>,
/// Name of the key column used for the lookup.
pub key_column: String,
/// Value compared against `key_column`.
// NOTE(review): presumably an exact-match key — confirm with the engine implementations.
pub key_value: SqlValue,
}
/// Arguments consumed by [`EngineRules::plan_update`].
pub struct UpdateParams {
/// Collection being updated.
pub collection: String,
/// `(String, SqlExpr)` pairs describing the assignments.
// NOTE(review): the `String` is presumably the target column name — confirm.
pub assignments: Vec<(String, SqlExpr)>,
/// Filters restricting which rows are updated.
pub filters: Vec<Filter>,
/// Key values for the update.
// NOTE(review): presumably explicit keys of the rows to touch; how this
// interacts with `filters` is not visible in this file — confirm.
pub target_keys: Vec<SqlValue>,
/// NOTE(review): presumably set when the statement has a RETURNING clause — confirm.
pub returning: bool,
}
/// Arguments consumed by [`EngineRules::plan_delete`].
pub struct DeleteParams {
/// Collection rows are deleted from.
pub collection: String,
/// Filters restricting which rows are deleted.
pub filters: Vec<Filter>,
/// Key values for the delete.
// NOTE(review): presumably explicit keys of the rows to remove; how this
// interacts with `filters` is not visible in this file — confirm.
pub target_keys: Vec<SqlValue>,
}
/// Arguments consumed by [`EngineRules::plan_upsert`].
///
/// Carries the same four fields as `InsertParams` plus `on_conflict_updates`.
pub struct UpsertParams {
/// Collection the rows are upserted into.
pub collection: String,
/// Column names supplied with the statement.
pub columns: Vec<String>,
/// One inner `Vec` per row; each row is a list of `(String, SqlValue)` pairs.
// NOTE(review): the `String` is presumably the column name — confirm against the callers.
pub rows: Vec<Vec<(String, SqlValue)>>,
/// Pairs of strings describing column defaults.
// NOTE(review): presumably `(column name, default expression text)` — confirm.
pub column_defaults: Vec<(String, String)>,
/// `(String, SqlExpr)` pairs for the conflict branch.
// NOTE(review): presumably `(column, new value expression)` applied when the
// row already exists — confirm.
pub on_conflict_updates: Vec<(String, SqlExpr)>,
}
/// Arguments consumed by [`EngineRules::plan_aggregate`].
pub struct AggregateParams {
/// Collection being aggregated.
pub collection: String,
/// Optional alias for the collection.
pub alias: Option<String>,
/// Filters applied to the input rows.
pub filters: Vec<Filter>,
/// Grouping expressions.
pub group_by: Vec<SqlExpr>,
/// Aggregate expressions to compute.
pub aggregates: Vec<AggregateExpr>,
/// Filters for the HAVING step.
// NOTE(review): presumably evaluated after grouping — confirm.
pub having: Vec<Filter>,
/// Row limit. Unlike `ScanParams::limit` this is a plain `usize`, not an `Option`.
// NOTE(review): how "no limit" is represented is not visible in this file — confirm.
pub limit: usize,
/// Optional bucket interval.
// NOTE(review): the name suggests a time-bucket width in milliseconds — confirm.
pub bucket_interval_ms: Option<i64>,
/// Names of the grouping columns.
// NOTE(review): relationship to `group_by` is not visible in this file — confirm.
pub group_columns: Vec<String>,
/// NOTE(review): meaning of "auto tier" is not visible in this file — confirm with the engine implementations.
pub has_auto_tier: bool,
}
/// Planning rules implemented once per storage engine.
///
/// Each method takes ownership of the parameter struct for one statement kind
/// and returns either the resulting plan(s) or an error. Implementations are
/// obtained through `resolve_engine_rules`.
pub trait EngineRules {
/// Plans an insert; returns a list of `SqlPlan`s.
fn plan_insert(&self, params: InsertParams) -> Result<Vec<SqlPlan>>;
/// Plans an upsert; returns a list of `SqlPlan`s.
fn plan_upsert(&self, params: UpsertParams) -> Result<Vec<SqlPlan>>;
/// Plans a scan; returns a single `SqlPlan`.
fn plan_scan(&self, params: ScanParams) -> Result<SqlPlan>;
/// Plans a lookup by key; returns a single `SqlPlan`.
fn plan_point_get(&self, params: PointGetParams) -> Result<SqlPlan>;
/// Plans an update; returns a list of `SqlPlan`s.
fn plan_update(&self, params: UpdateParams) -> Result<Vec<SqlPlan>>;
/// Plans a delete; returns a list of `SqlPlan`s.
fn plan_delete(&self, params: DeleteParams) -> Result<Vec<SqlPlan>>;
/// Plans an aggregation; returns a single `SqlPlan`.
fn plan_aggregate(&self, params: AggregateParams) -> Result<SqlPlan>;
}
/// Tries to turn a scan into a single equality lookup on a ready index.
///
/// Returns `None` when the scan has sort keys or window functions, or when no
/// filter yields a `field = literal` equality that matches a usable index.
///
/// Filters are examined in order. For each one, the equality produced by
/// `extract_equality_with_residual` is matched against `params.indexes`: the
/// index must be in `IndexState::Ready`, its `field` must equal the canonical
/// form of the filter's field, and any partial-index predicate must be
/// entailed by the query's conjuncts. The first match wins.
///
/// In the resulting plan the matched equality is taken out of the filter list:
/// the filter is replaced by its residual expression, or removed entirely when
/// nothing is left. For a case-insensitive index, string lookup values are
/// lower-cased.
pub(crate) fn try_document_index_lookup(
    params: &ScanParams,
    engine: EngineType,
) -> Option<SqlPlan> {
    // Only scans with neither sort keys nor window functions are eligible.
    if !(params.sort_keys.is_empty() && params.window_functions.is_empty()) {
        return None;
    }

    // Every AND-ed conjunct from the generic-expression filters, gathered once
    // so that partial-index predicates can be checked against them.
    let mut query_conjuncts: Vec<SqlExpr> = Vec::new();
    for filter in &params.filters {
        if let FilterExpr::Expr(expr) = &filter.expr {
            flatten_and(expr, &mut query_conjuncts);
        }
    }

    for (position, filter) in params.filters.iter().enumerate() {
        let Some((field, value, residual)) = extract_equality_with_residual(&filter.expr) else {
            continue;
        };

        let canonical = canonical_index_field(&field);
        let usable = params.indexes.iter().find(|spec| {
            spec.state == IndexState::Ready
                && spec.field == canonical
                && partial_index_entailed(spec.predicate.as_deref(), &query_conjuncts)
        });
        let Some(index) = usable else {
            continue;
        };

        // The index answers the equality itself, so keep only what is left of
        // this filter (if anything) alongside the other filters.
        let mut remaining = params.filters.clone();
        if let Some(expr) = residual {
            remaining[position] = Filter {
                expr: FilterExpr::Expr(expr),
            };
        } else {
            remaining.remove(position);
        }

        let case_insensitive = index.case_insensitive;
        let lookup_value = if case_insensitive {
            lowercase_string_value(&value)
        } else {
            value
        };

        return Some(SqlPlan::DocumentIndexLookup {
            collection: params.collection.clone(),
            alias: params.alias.clone(),
            engine,
            field: index.field.clone(),
            value: lookup_value,
            filters: remaining,
            projection: params.projection.clone(),
            sort_keys: params.sort_keys.clone(),
            limit: params.limit,
            offset: params.offset,
            distinct: params.distinct,
            window_functions: params.window_functions.clone(),
            case_insensitive,
        });
    }

    None
}
/// Pulls a `field = literal` equality out of a filter expression.
///
/// Returns `(field, literal, residual)`, where `residual` is what remains of
/// the expression once the equality is taken out (`None` when nothing remains).
///
/// * A `FilterExpr::Comparison` with `CompareOp::Eq` yields its field and
///   value with no residual.
/// * A `FilterExpr::Expr` is handed to `split_equality_from_expr`.
/// * Every other shape yields `None`.
fn extract_equality_with_residual(
    expr: &FilterExpr,
) -> Option<(String, SqlValue, Option<SqlExpr>)> {
    if let FilterExpr::Expr(sql_expr) = expr {
        return split_equality_from_expr(sql_expr);
    }

    if let FilterExpr::Comparison {
        field,
        op: CompareOp::Eq,
        value,
    } = expr
    {
        return Some((field.clone(), value.clone(), None));
    }

    None
}
/// Splits the first `column = literal` conjunct off an AND-chain.
///
/// The expression is flattened into its AND-ed conjuncts and the first one
/// accepted by `is_column_eq_literal` is removed. The result is the column
/// name, the literal value, and the remaining conjuncts re-joined with AND
/// (`None` when the equality was the only conjunct). The literal may be on
/// either side of the `=`.
///
/// Returns `None` when no conjunct is a column/literal equality.
fn split_equality_from_expr(expr: &SqlExpr) -> Option<(String, SqlValue, Option<SqlExpr>)> {
    let mut parts: Vec<SqlExpr> = Vec::new();
    flatten_and(expr, &mut parts);

    let position = parts.iter().position(is_column_eq_literal)?;
    let SqlExpr::BinaryOp {
        left,
        op: BinaryOp::Eq,
        right,
    } = parts.remove(position)
    else {
        return None;
    };

    let (column, literal) = match (*left, *right) {
        (SqlExpr::Column { name, .. }, SqlExpr::Literal(v))
        | (SqlExpr::Literal(v), SqlExpr::Column { name, .. }) => (name, v),
        _ => return None,
    };

    Some((column, literal, rebuild_and(parts)))
}
/// Appends the AND-ed conjuncts of `expr` to `out`, left to right.
///
/// Nested `BinaryOp::And` nodes are taken apart; every other node is cloned
/// and pushed as a single conjunct. An expression with no top-level AND
/// contributes exactly one entry.
fn flatten_and(expr: &SqlExpr, out: &mut Vec<SqlExpr>) {
    // Explicit work stack; the right operand is pushed first so that the left
    // operand is popped (and therefore emitted) first.
    let mut pending: Vec<&SqlExpr> = vec![expr];
    while let Some(node) = pending.pop() {
        if let SqlExpr::BinaryOp {
            left,
            op: BinaryOp::And,
            right,
        } = node
        {
            pending.push(&**right);
            pending.push(&**left);
        } else {
            out.push(node.clone());
        }
    }
}
/// Joins conjuncts back into one right-nested AND expression.
///
/// `[a, b, c]` becomes `a AND (b AND c)`; a single conjunct is returned as is;
/// an empty list yields `None`.
fn rebuild_and(mut conjuncts: Vec<SqlExpr>) -> Option<SqlExpr> {
    let mut tree = conjuncts.pop()?;
    // Walk backwards so each earlier conjunct wraps the tree built so far.
    while let Some(earlier) = conjuncts.pop() {
        tree = SqlExpr::BinaryOp {
            left: Box::new(earlier),
            op: BinaryOp::And,
            right: Box::new(tree),
        };
    }
    Some(tree)
}
/// Decides whether a partial index may be used for the current query.
///
/// * No predicate (`None`): the index is unconditional, so the answer is `true`.
/// * A predicate that fails to parse: the answer is `false`.
/// * Otherwise every AND-ed conjunct of the predicate must appear among
///   `query_conjuncts`.
///
/// Conjuncts are compared by their `Debug` output, so the check is purely
/// structural: a predicate the query implies but writes differently is not
/// recognised.
fn partial_index_entailed(predicate: Option<&str>, query_conjuncts: &[SqlExpr]) -> bool {
    let parsed = match predicate {
        None => return true,
        Some(text) => match crate::parse_expr_string(text) {
            Ok(expr) => expr,
            Err(_) => return false,
        },
    };

    let mut required: Vec<SqlExpr> = Vec::new();
    flatten_and(&parsed, &mut required);

    for needed in &required {
        let needed_repr = format!("{needed:?}");
        let present = query_conjuncts
            .iter()
            .any(|have| format!("{have:?}") == needed_repr);
        if !present {
            return false;
        }
    }
    true
}
/// Returns `true` when `expr` is `column = literal` or `literal = column`.
///
/// Anything else — a different operator, or operands that are not exactly one
/// column and one literal — returns `false`.
fn is_column_eq_literal(expr: &SqlExpr) -> bool {
    let SqlExpr::BinaryOp {
        left,
        op: BinaryOp::Eq,
        right,
    } = expr
    else {
        return false;
    };

    let is_column = |operand: &SqlExpr| matches!(operand, SqlExpr::Column { .. });
    let is_literal = |operand: &SqlExpr| matches!(operand, SqlExpr::Literal(_));

    let (lhs, rhs) = (&**left, &**right);
    (is_column(lhs) && is_literal(rhs)) || (is_literal(lhs) && is_column(rhs))
}
/// Normalises a field name to the `$`-rooted form used for index matching.
///
/// A name that already begins with `$` is returned unchanged; any other name
/// gets a `$.` prefix (`name` becomes `$.name`).
fn canonical_index_field(field: &str) -> String {
    let already_rooted = field.starts_with('$');
    if already_rooted {
        return field.to_owned();
    }
    format!("$.{field}")
}
/// Returns a lower-cased copy of a string value.
///
/// Only `SqlValue::String` is transformed; every other value is cloned
/// unchanged.
fn lowercase_string_value(v: &SqlValue) -> SqlValue {
    match v {
        SqlValue::String(text) => SqlValue::String(text.to_lowercase()),
        other => other.clone(),
    }
}
pub fn resolve_engine_rules(engine: EngineType) -> &'static dyn EngineRules {
match engine {
EngineType::DocumentSchemaless => &document_schemaless::SchemalessRules,
EngineType::DocumentStrict => &document_strict::StrictRules,
EngineType::KeyValue => &kv::KvRules,
EngineType::Columnar => &columnar::ColumnarRules,
EngineType::Timeseries => ×eries::TimeseriesRules,
EngineType::Spatial => &spatial::SpatialRules,
}
}