use crate::query::ast::*;
use crate::query::error::QueryError;
use crate::types::Atom;
/// Execution plan built by [`QueryPlan::from_ast`] for a select or aggregate query.
#[derive(Debug)]
pub struct QueryPlan {
/// How candidate keys are located (exact lookup, prefix scan, pattern scan, or full scan).
pub scan: ScanType,
/// Filters lowered from the `WHERE` clause; empty when the query has none.
pub filters: Vec<FilterStep>,
/// `Some` for plans built from aggregate queries, `None` for select queries.
pub aggregation: Option<AggregationPlan>,
/// Output projections; left empty by the aggregate planner.
pub projections: Vec<Projection>,
/// Requested `ORDER BY`, if any.
pub ordering: Option<OrderBy>,
/// `LIMIT`/`OFFSET` window, if one was requested.
pub pagination: Option<PaginationStep>,
/// Heuristic cost estimate for this plan.
pub estimated_cost: CostEstimate,
}
/// Access path used to locate candidate keys for a query.
#[derive(Debug, Clone)]
pub enum ScanType {
/// Lookup of exactly this one key (costed as 1 I/O and 1 row).
IndexExact(String),
/// Scan of the keys that start with this literal prefix.
IndexPrefix(String),
/// Evaluate a key pattern that could not be reduced to an exact or prefix lookup.
PatternScan(KeyPattern),
/// Visit every key; used when the query has no `FROM` pattern.
FullScan,
}
/// One node of the filter tree lowered from a `WHERE` condition.
#[derive(Debug, Clone)]
pub enum FilterStep {
/// Keep keys matching this pattern.
KeyPattern(KeyPattern),
/// Compare a field against a value.
ValueComparison {
/// Field on the left-hand side of the comparison.
field: OrderField,
/// Comparison operator.
op: ComparisonOp,
/// Right-hand side operand, taken from the condition's `rhs`.
value: ValueFilter,
},
/// Restrict to this time range.
TimeRange(TimeRange),
/// Boolean combination of child filters (the lowering code gives `Not` exactly one child).
Boolean {
/// Connective joining the children.
op: BoolOp,
/// Child filters, in source order.
children: Vec<FilterStep>,
},
}
/// Boolean connective joining the children of a [`FilterStep::Boolean`] node.
///
/// This is a plain fieldless tag, so it is `Copy` and comparable. Callers can pass
/// it by value and test it with `==` instead of pattern matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BoolOp {
    /// All children must match.
    And,
    /// At least one child must match.
    Or,
    /// Negates its single child.
    Not,
}
/// Aggregation stage of a plan, copied from an aggregate query.
#[derive(Debug, Clone)]
pub struct AggregationPlan {
/// Aggregate functions to compute, in the order the query lists them.
pub functions: Vec<AggregateFunction>,
/// Optional grouping (e.g. a time bucket). NOTE(review): `None` presumably means a single
/// group over all rows — confirm against the executor.
pub group_by: Option<GroupBy>,
}
/// `LIMIT`/`OFFSET` window of a plan.
#[derive(Debug, Clone, Copy)]
pub struct PaginationStep {
/// Maximum number of rows; the planner uses `u64::MAX` when only an offset was given.
pub limit: u64,
/// Number of rows to skip; 0 when no offset was given.
pub offset: u64,
}
/// Heuristic cost of executing a plan.
///
/// The numbers are rough estimates meant for comparing plans, not measurements.
/// `Default` is derived and yields an all-zero estimate. This is exactly what the
/// previous hand-written impl returned.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct CostEstimate {
    /// Estimated I/O work (roughly the number of keys read).
    pub io_cost: f64,
    /// Estimated CPU work.
    pub cpu_cost: f64,
    /// Estimated memory for the result rows, in bytes.
    pub memory_bytes: u64,
    /// Estimated number of rows produced.
    pub estimated_rows: u64,
}
impl QueryPlan {
pub fn from_ast(ast: &QueryAst, total_keys: u64) -> Result<Self, QueryError> {
match ast {
QueryAst::Select(q) => Self::plan_select(q, total_keys),
QueryAst::Aggregate(q) => Self::plan_aggregate(q, total_keys),
QueryAst::Explain(inner) => Self::from_ast(inner, total_keys),
_ => Err(QueryError::Internal(
"unsupported query type for planning".into(),
)),
}
}
fn plan_select(q: &SelectQuery, total_keys: u64) -> Result<Self, QueryError> {
let scan = select_scan_type(q.from.as_ref(), total_keys);
let filters = build_filters(q.where_clause.as_ref());
let pagination = match (q.limit, q.offset) {
(Some(limit), offset) => Some(PaginationStep {
limit,
offset: offset.unwrap_or(0),
}),
(None, Some(offset)) => Some(PaginationStep {
limit: u64::MAX,
offset,
}),
_ => None,
};
let estimated_cost = estimate_cost(&scan, &filters, total_keys);
Ok(QueryPlan {
scan,
filters,
aggregation: None,
projections: q.projections.clone(),
ordering: q.order_by.clone(),
pagination,
estimated_cost,
})
}
fn plan_aggregate(q: &AggregateQuery, total_keys: u64) -> Result<Self, QueryError> {
for func in &q.aggregations {
if matches!(func, AggregateFunction::Sum | AggregateFunction::Avg) {
}
}
let scan = select_scan_type(q.from.as_ref(), total_keys);
let filters = build_filters(q.where_clause.as_ref());
let estimated_cost = estimate_cost(&scan, &filters, total_keys);
Ok(QueryPlan {
scan,
filters,
aggregation: Some(AggregationPlan {
functions: q.aggregations.clone(),
group_by: q.group_by.clone(),
}),
projections: vec![],
ordering: q.order_by.clone(),
pagination: q.limit.map(|limit| PaginationStep { limit, offset: 0 }),
estimated_cost,
})
}
}
/// Picks the access path for a query's `FROM` key pattern.
///
/// - No pattern scans every key.
/// - Exact keys and prefixes map directly to exact/prefix lookups.
/// - A glob becomes a prefix scan when [`extract_prefix_from_glob`] yields a
///   literal prefix. Otherwise it is evaluated as a pattern scan.
/// - Regexes and unions are always evaluated as pattern scans.
///
/// `_total_keys` is currently unused.
fn select_scan_type(from: Option<&KeyPattern>, _total_keys: u64) -> ScanType {
    let pattern = match from {
        Some(pattern) => pattern,
        None => return ScanType::FullScan,
    };
    match pattern {
        KeyPattern::Exact(key) => ScanType::IndexExact(key.clone()),
        KeyPattern::Prefix(prefix) => ScanType::IndexPrefix(prefix.clone()),
        KeyPattern::Glob(glob) => match extract_prefix_from_glob(glob) {
            Some(literal) => ScanType::IndexPrefix(literal),
            None => ScanType::PatternScan(KeyPattern::Glob(glob.clone())),
        },
        KeyPattern::Regex(re) => ScanType::PatternScan(KeyPattern::Regex(re.clone())),
        KeyPattern::Union(parts) => ScanType::PatternScan(KeyPattern::Union(parts.clone())),
    }
}
/// Returns the literal prefix of a glob whose only wildcard is a single trailing `*`.
///
/// A glob such as `"sensor/*"` selects exactly the keys that start with `"sensor/"`,
/// so it can be served by a prefix scan. Every other shape returns `None`, and the
/// caller falls back to a pattern scan:
/// - a wildcard in the middle (`"sensor/*/temp"`);
/// - text after the star (`"sensor/*.json"`);
/// - a `?` anywhere;
/// - an empty prefix (`"*"`).
///
/// Reducing those to a prefix scan would drop the constraint after the star and
/// return keys the glob does not match.
fn extract_prefix_from_glob(glob: &str) -> Option<String> {
    // Characters that may carry glob meaning. `[` and `\` are rejected conservatively:
    // a pattern scan is always correct, only slower.
    const GLOB_META: &[char] = &['*', '?', '[', '\\'];
    let prefix = glob.strip_suffix('*')?;
    if prefix.is_empty() || prefix.contains(GLOB_META) {
        return None;
    }
    Some(prefix.to_string())
}
/// Lowers an optional `WHERE` clause into the plan's filter list.
///
/// The clause's whole condition tree becomes one root [`FilterStep`], so the
/// result holds at most one element. A missing clause yields an empty list.
fn build_filters(where_clause: Option<&WhereClause>) -> Vec<FilterStep> {
    where_clause
        .into_iter()
        .map(|clause| condition_to_filter(&clause.root))
        .collect()
}
fn condition_to_filter(cond: &Condition) -> FilterStep {
match cond {
Condition::Comparison { field, op, rhs } => FilterStep::ValueComparison {
field: field.clone(),
op: *op,
value: rhs.clone(),
},
Condition::Key(kp) => FilterStep::KeyPattern(kp.clone()),
Condition::TimeRange(tr) => FilterStep::TimeRange(*tr),
Condition::Boolean(BooleanOp::And(cs)) => FilterStep::Boolean {
op: BoolOp::And,
children: cs.iter().map(condition_to_filter).collect(),
},
Condition::Boolean(BooleanOp::Or(cs)) => FilterStep::Boolean {
op: BoolOp::Or,
children: cs.iter().map(condition_to_filter).collect(),
},
Condition::Boolean(BooleanOp::Not(c)) => FilterStep::Boolean {
op: BoolOp::Not,
children: vec![condition_to_filter(c)],
},
_ => FilterStep::ValueComparison {
field: OrderField::Value,
op: ComparisonOp::Eq,
value: ValueFilter::Single(Atom::Null),
},
}
}
/// Produces a heuristic [`CostEstimate`] for `scan` followed by `filters`.
///
/// Scan heuristics, relative to `total_keys`:
/// - `IndexExact`: 1 I/O, 1 row;
/// - `IndexPrefix`: assumes 10% of keys match (at least 1) and reads only those;
/// - `PatternScan`: reads every key and assumes half of them match;
/// - `FullScan`: reads and returns every key.
///
/// Each filter step is assumed to halve the row count, and the final row estimate is
/// never below 1. CPU cost is modelled as twice the I/O cost. Memory is modelled as
/// 128 bytes per output row, saturating at `u64::MAX` instead of overflowing.
fn estimate_cost(scan: &ScanType, filters: &[FilterStep], total_keys: u64) -> CostEstimate {
    // Rough per-row memory footprint used for `memory_bytes`.
    const BYTES_PER_ROW: u64 = 128;

    let (io_cost, estimated_rows) = match scan {
        ScanType::IndexExact(_) => (1.0, 1),
        ScanType::IndexPrefix(_) => {
            let est = (total_keys as f64 * 0.1).max(1.0) as u64;
            (est as f64, est)
        }
        ScanType::PatternScan(_) => (total_keys as f64, total_keys / 2),
        ScanType::FullScan => (total_keys as f64, total_keys),
    };
    // Clamp before casting so a huge filter count cannot wrap to a negative exponent.
    let filter_count = filters.len().min(i32::MAX as usize) as i32;
    let selectivity = 0.5_f64.powi(filter_count);
    let final_rows = (estimated_rows as f64 * selectivity).max(1.0) as u64;
    CostEstimate {
        io_cost,
        cpu_cost: io_cost * 2.0,
        memory_bytes: final_rows.saturating_mul(BYTES_PER_ROW),
        estimated_rows: final_rows,
    }
}
// Unit tests for the query planner: scan selection, pagination, aggregation and cost.
#[cfg(test)]
mod tests {
use super::*;
use crate::query::parser::parse_eql;
// A trailing-`*` glob with a non-empty literal prefix is planned as a prefix scan.
#[test]
fn plan_simple_select() {
let ast = parse_eql("SELECT * FROM \"sensor/*\"").unwrap();
let plan = QueryPlan::from_ast(&ast, 1000).unwrap();
match &plan.scan {
ScanType::IndexPrefix(p) => assert_eq!(p, "sensor/"),
other => panic!("expected IndexPrefix, got {:?}", other),
}
}
// A wildcard-free key in FROM is planned as an exact lookup.
#[test]
fn plan_exact_key() {
let ast = parse_eql("SELECT * FROM \"config/name\"").unwrap();
let plan = QueryPlan::from_ast(&ast, 1000).unwrap();
match &plan.scan {
ScanType::IndexExact(k) => assert_eq!(k, "config/name"),
other => panic!("expected IndexExact, got {:?}", other),
}
}
// LIMIT and OFFSET are carried into the pagination step unchanged.
#[test]
fn plan_with_limit() {
let ast = parse_eql("SELECT * FROM \"k\" LIMIT 10 OFFSET 5").unwrap();
let plan = QueryPlan::from_ast(&ast, 100).unwrap();
let pag = plan.pagination.unwrap();
assert_eq!(pag.limit, 10);
assert_eq!(pag.offset, 5);
}
// Aggregate queries record their functions and GROUP BY in the aggregation plan.
#[test]
fn plan_aggregate_has_aggregation() {
let ast = parse_eql("SELECT AVG(value) FROM \"s\" GROUP BY HOUR").unwrap();
let plan = QueryPlan::from_ast(&ast, 500).unwrap();
let agg = plan.aggregation.unwrap();
assert_eq!(agg.functions, vec![AggregateFunction::Avg]);
assert_eq!(agg.group_by, Some(GroupBy::TimeBucket(TimeBucket::Hour)));
}
// A bare `*` glob has no literal prefix, so it is planned as a pattern scan that reads
// every key. A prefix scan is estimated to read only ~10% of the keys, so it must be cheaper.
#[test]
fn plan_cost_estimate_index_cheaper_than_full() {
let ast_prefix = parse_eql("SELECT * FROM \"sensor/*\"").unwrap();
let ast_full = parse_eql("SELECT * FROM \"*\"").unwrap();
let plan_prefix = QueryPlan::from_ast(&ast_prefix, 10000).unwrap();
let plan_full = QueryPlan::from_ast(&ast_full, 10000).unwrap();
assert!(plan_prefix.estimated_cost.io_cost < plan_full.estimated_cost.io_cost);
}
// Prefix extraction needs a non-empty literal prefix containing no `?` wildcard.
#[test]
fn extract_prefix_from_glob_works() {
assert_eq!(extract_prefix_from_glob("sensor/*"), Some("sensor/".into()));
assert_eq!(
extract_prefix_from_glob("data/temp/*"),
Some("data/temp/".into())
);
assert_eq!(extract_prefix_from_glob("*"), None);
assert_eq!(extract_prefix_from_glob("s?nsor/*"), None);
}
}