use super::cost_model::{CostEstimator, OperationCost};
use crate::collection::stats::CollectionStats;
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum PhysicalPlan {
SeqScan {
collection: String,
estimated_rows: u64,
},
IndexScan {
collection: String,
index_name: String,
selectivity: f64,
},
VectorSearch {
collection: String,
k: u64,
ef_search: u64,
},
GraphTraversal {
collection: String,
max_depth: u32,
limit: u64,
},
Filter {
input: Box<PhysicalPlan>,
selectivity: f64,
},
Limit {
input: Box<PhysicalPlan>,
limit: u64,
offset: u64,
},
}
impl PhysicalPlan {
#[must_use]
pub fn plan_type(&self) -> &'static str {
match self {
Self::SeqScan { .. } => "SeqScan",
Self::IndexScan { .. } => "IndexScan",
Self::VectorSearch { .. } => "VectorSearch",
Self::GraphTraversal { .. } => "GraphTraversal",
Self::Filter { .. } => "Filter",
Self::Limit { .. } => "Limit",
}
}
}
#[derive(Debug, Clone)]
pub struct CandidatePlan {
pub plan: PhysicalPlan,
pub cost: OperationCost,
pub description: String,
}
impl CandidatePlan {
#[must_use]
pub fn new(plan: PhysicalPlan, cost: OperationCost, description: impl Into<String>) -> Self {
Self {
plan,
cost,
description: description.into(),
}
}
}
#[derive(Debug, Clone, Default)]
pub struct QueryCharacteristics {
pub collection: String,
pub has_similarity: bool,
pub has_match: bool,
pub has_filter: bool,
pub filter_selectivity: Option<f64>,
pub top_k: Option<u64>,
pub ef_search: Option<u64>,
pub max_depth: Option<u32>,
pub limit: Option<u64>,
}
#[derive(Debug, Clone)]
pub struct PlanGenerator {
estimator: CostEstimator,
}
impl Default for PlanGenerator {
fn default() -> Self {
Self::new(CostEstimator::default())
}
}
impl PlanGenerator {
#[must_use]
pub fn new(estimator: CostEstimator) -> Self {
Self { estimator }
}
#[must_use]
pub fn generate_plans(
&self,
query: &QueryCharacteristics,
stats: &CollectionStats,
) -> Vec<CandidatePlan> {
let mut plans = Vec::new();
plans.push(self.generate_scan_plan(query, stats));
if query.has_filter {
if let Some(selectivity) = query.filter_selectivity {
plans.extend(self.generate_index_plans(query, stats, selectivity));
}
}
if query.has_similarity {
plans.push(self.generate_vector_plan(query, stats));
}
if query.has_match {
plans.push(self.generate_graph_plan(query, stats));
}
if query.has_similarity && query.has_match {
plans.extend(self.generate_hybrid_plans(query, stats));
}
plans
}
#[must_use]
pub fn select_best(&self, plans: Vec<CandidatePlan>) -> Option<CandidatePlan> {
plans.into_iter().min_by(|a, b| {
a.cost
.total
.partial_cmp(&b.cost.total)
.unwrap_or(std::cmp::Ordering::Equal)
})
}
#[must_use]
pub fn optimize(
&self,
query: &QueryCharacteristics,
stats: &CollectionStats,
) -> Option<CandidatePlan> {
let plans = self.generate_plans(query, stats);
self.select_best(plans)
}
fn generate_scan_plan(
&self,
query: &QueryCharacteristics,
stats: &CollectionStats,
) -> CandidatePlan {
let mut cost = self.estimator.estimate_scan(stats);
if query.has_filter {
let selectivity = query.filter_selectivity.unwrap_or(0.1);
let filter_cost = self.estimator.estimate_filter(cost.rows, selectivity);
cost = cost.then(filter_cost);
}
let plan = PhysicalPlan::SeqScan {
collection: query.collection.clone(),
estimated_rows: cost.rows,
};
CandidatePlan::new(plan, cost, "Full scan with optional filter")
}
fn generate_index_plans(
&self,
query: &QueryCharacteristics,
stats: &CollectionStats,
selectivity: f64,
) -> Vec<CandidatePlan> {
let mut plans = Vec::new();
for (name, index_stats) in &stats.index_stats {
if name.starts_with("prop_") || name == "bm25_text" {
let cost = self
.estimator
.estimate_index_lookup(index_stats, selectivity);
let plan = PhysicalPlan::IndexScan {
collection: query.collection.clone(),
index_name: name.clone(),
selectivity,
};
plans.push(CandidatePlan::new(
plan,
cost,
format!("Index scan on {name}"),
));
}
}
plans
}
fn generate_vector_plan(
&self,
query: &QueryCharacteristics,
stats: &CollectionStats,
) -> CandidatePlan {
let k = query.top_k.unwrap_or(10);
let ef_search = query.ef_search.unwrap_or(100);
let cost = self
.estimator
.estimate_vector_search(k, ef_search, stats.row_count);
let plan = PhysicalPlan::VectorSearch {
collection: query.collection.clone(),
k,
ef_search,
};
CandidatePlan::new(plan, cost, "HNSW vector search")
}
fn generate_graph_plan(
&self,
query: &QueryCharacteristics,
stats: &CollectionStats,
) -> CandidatePlan {
let max_depth = query.max_depth.unwrap_or(3);
let limit = query.limit.unwrap_or(100);
let avg_degree = if stats.row_count > 0 {
(stats
.index_stats
.get("hnsw_primary")
.map_or(0, |i| i.entry_count) as f64
/ stats.row_count as f64)
.max(2.0)
} else {
5.0
};
let cost = self
.estimator
.estimate_graph_traversal(avg_degree, max_depth, limit);
let plan = PhysicalPlan::GraphTraversal {
collection: query.collection.clone(),
max_depth,
limit,
};
CandidatePlan::new(plan, cost, "Graph pattern traversal")
}
fn generate_hybrid_plans(
&self,
query: &QueryCharacteristics,
stats: &CollectionStats,
) -> Vec<CandidatePlan> {
let mut plans = Vec::new();
let vector_plan = self.generate_vector_plan(query, stats);
let graph_filter_cost = self.estimator.estimate_filter(
vector_plan.cost.rows,
0.5, );
let vector_first_cost = vector_plan.cost.then(graph_filter_cost);
plans.push(CandidatePlan::new(
PhysicalPlan::Filter {
input: Box::new(vector_plan.plan.clone()),
selectivity: 0.5,
},
vector_first_cost,
"Vector search → Graph filter",
));
let graph_plan = self.generate_graph_plan(query, stats);
let vector_rerank_cost = self.estimator.estimate_vector_search(
query.top_k.unwrap_or(10),
query.ef_search.unwrap_or(50),
graph_plan.cost.rows,
);
let graph_first_cost = graph_plan.cost.then(vector_rerank_cost);
plans.push(CandidatePlan::new(
PhysicalPlan::Filter {
input: Box::new(graph_plan.plan.clone()),
selectivity: 1.0,
},
graph_first_cost,
"Graph traversal → Vector rerank",
));
plans
}
}