use crate::collection::types::Collection;
use crate::error::Result;
use crate::point::SearchResult;
use super::{distinct, pushdown, ExtractedComponents, MAX_LIMIT};
impl Collection {
pub(super) fn compute_cbo_strategy(
&self,
stmt: &crate::velesql::SelectStatement,
filter_condition: Option<&crate::velesql::Condition>,
limit: usize,
) -> (crate::velesql::ExecutionStrategy, usize) {
let meaningful_filter = filter_condition.and_then(crate::velesql::strip_vector_predicates);
let effective_filter = meaningful_filter.as_ref();
if Self::has_order_by_similarity(stmt) {
return self.cbo_strategy_for_order_by_similarity(effective_filter, limit);
}
let col_stats = self.get_stats();
let result = self.query_planner.choose_strategy_with_cbo_and_overfetch(
&col_stats,
effective_filter,
limit,
);
tracing::debug!(
strategy = ?result.0, over_fetch = result.1,
"CBO selected execution strategy (calibrated cost path)"
);
result
}
fn has_order_by_similarity(stmt: &crate::velesql::SelectStatement) -> bool {
let Some(order_by) = stmt.order_by.as_ref() else {
return false;
};
order_by
.iter()
.any(|item| Self::order_by_item_reduces_to_similarity(&item.expr))
}
fn order_by_item_reduces_to_similarity(expr: &crate::velesql::OrderByExpr) -> bool {
use crate::velesql::OrderByExpr;
match expr {
OrderByExpr::Similarity(_) | OrderByExpr::SimilarityBare => true,
OrderByExpr::Arithmetic(arith) => {
Self::arith_contains_similarity(arith) && !Self::arith_contains_variable(arith)
}
_ => false,
}
}
fn arith_contains_similarity(expr: &crate::velesql::ArithmeticExpr) -> bool {
use crate::velesql::ArithmeticExpr;
match expr {
ArithmeticExpr::Similarity(_) => true,
ArithmeticExpr::BinaryOp { left, right, .. } => {
Self::arith_contains_similarity(left) || Self::arith_contains_similarity(right)
}
_ => false,
}
}
fn arith_contains_variable(expr: &crate::velesql::ArithmeticExpr) -> bool {
use crate::velesql::ArithmeticExpr;
match expr {
ArithmeticExpr::Variable(_) => true,
ArithmeticExpr::BinaryOp { left, right, .. } => {
Self::arith_contains_variable(left) || Self::arith_contains_variable(right)
}
_ => false,
}
}
fn cbo_strategy_for_order_by_similarity(
&self,
filter_condition: Option<&crate::velesql::Condition>,
limit: usize,
) -> (crate::velesql::ExecutionStrategy, usize) {
let col_stats = self.get_stats();
let estimated_selectivity = filter_condition.map(|cond| {
crate::velesql::CostEstimator::new(&col_stats)
.estimate_condition_selectivity(cond)
.clamp(0.001, 1.0)
});
let limit_u64 = u64::try_from(limit).unwrap_or(u64::MAX);
let plan = self.query_planner.choose_hybrid_strategy(
true, filter_condition.is_some(),
Some(limit_u64),
estimated_selectivity,
);
#[allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::cast_precision_loss
)]
let over_fetch = (plan.over_fetch_factor.ceil() as usize).max(1);
tracing::debug!(
strategy = ?plan.strategy,
over_fetch,
use_early_termination = plan.use_early_termination,
recompute_scores = plan.recompute_scores,
"CBO selected execution strategy (ORDER BY similarity() path)"
);
(plan.strategy, over_fetch)
}
pub(super) fn dispatch_main_select(
&self,
stmt: &crate::velesql::SelectStatement,
params: &std::collections::HashMap<String, serde_json::Value>,
extracted: &ExtractedComponents,
limit: usize,
_ctx: &crate::guardrails::QueryContext,
) -> Result<Vec<SearchResult>> {
let has_graph_predicates = !extracted.graph_match_predicates.is_empty();
let skip_metadata_prefilter_for_graph_or = has_graph_predicates
&& stmt
.where_clause
.as_ref()
.is_some_and(Self::condition_contains_or);
let execution_limit = main_select_execution_limit(extracted, limit);
let search_opts = super::QuerySearchOptions::from_with_clause(stmt.with_clause.as_ref())
.with_fusion(stmt.fusion_clause.clone());
let (cbo_strategy, cbo_over_fetch) =
self.compute_cbo_strategy(stmt, extracted.filter_condition.as_ref(), limit);
let mut graph_cache = super::where_eval::GraphMatchEvalCache::default();
let anchor_fetch_limit = anchor_fetch_limit(stmt, extracted, limit);
let anchored = self.try_anchored_fetch(
stmt,
params,
extracted,
anchor_fetch_limit,
&mut graph_cache,
)?;
let mut results = self.resolve_initial_results(
anchored,
stmt,
params,
extracted,
execution_limit,
skip_metadata_prefilter_for_graph_or,
&search_opts,
cbo_strategy,
cbo_over_fetch,
&mut graph_cache,
)?;
if has_graph_predicates {
results = self.post_filter_graph_where(stmt, params, results, &mut graph_cache)?;
}
Ok(results)
}
#[allow(clippy::too_many_arguments)]
fn resolve_initial_results(
&self,
anchored: Option<Vec<SearchResult>>,
stmt: &crate::velesql::SelectStatement,
params: &std::collections::HashMap<String, serde_json::Value>,
extracted: &ExtractedComponents,
execution_limit: usize,
skip_metadata_prefilter_for_graph_or: bool,
search_opts: &super::QuerySearchOptions,
cbo_strategy: crate::velesql::ExecutionStrategy,
cbo_over_fetch: usize,
graph_cache: &mut super::where_eval::GraphMatchEvalCache,
) -> Result<Vec<SearchResult>> {
if let Some(results) = anchored {
return Ok(results);
}
let hybrid_anchored =
self.try_anchored_hybrid_fetch(stmt, params, extracted, execution_limit, graph_cache)?;
if let Some(r) = hybrid_anchored {
return Ok(r);
}
self.dispatch_vector_query(
extracted.vector_search.as_ref(),
extracted.similarity_conditions.first(),
&extracted.similarity_conditions,
extracted.filter_condition.as_ref(),
execution_limit,
skip_metadata_prefilter_for_graph_or,
search_opts,
cbo_strategy,
cbo_over_fetch,
)
.inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())
}
fn post_filter_graph_where(
&self,
stmt: &crate::velesql::SelectStatement,
params: &std::collections::HashMap<String, serde_json::Value>,
results: Vec<SearchResult>,
graph_cache: &mut super::where_eval::GraphMatchEvalCache,
) -> Result<Vec<SearchResult>> {
let Some(cond) = stmt.where_clause.as_ref() else {
return Ok(results);
};
self.apply_where_condition_to_results_with_cache(
results,
cond,
params,
&stmt.from_alias,
graph_cache,
)
.inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())
}
fn try_anchored_fetch(
&self,
stmt: &crate::velesql::SelectStatement,
params: &std::collections::HashMap<String, serde_json::Value>,
extracted: &ExtractedComponents,
limit: usize,
graph_cache: &mut super::where_eval::GraphMatchEvalCache,
) -> Result<Option<Vec<SearchResult>>> {
let Some(cond) = stmt.where_clause.as_ref() else {
return Ok(None);
};
if !anchored_fetch_applies(extracted, cond) {
return Ok(None);
}
let Some(anchor_ids) =
self.compute_required_anchor_ids(cond, params, &stmt.from_alias, graph_cache)?
else {
return Ok(None);
};
let results = match extracted.vector_search.as_ref() {
Some(vector) => self.search_near_with_anchor_ids(
vector,
&anchor_ids,
extracted.filter_condition.as_ref(),
limit,
)?,
None => self.fetch_anchor_candidates(
&anchor_ids,
cond,
params,
&stmt.from_alias,
graph_cache,
limit,
)?,
};
Ok(Some(results))
}
fn try_anchored_hybrid_fetch(
&self,
stmt: &crate::velesql::SelectStatement,
params: &std::collections::HashMap<String, serde_json::Value>,
extracted: &ExtractedComponents,
limit: usize,
graph_cache: &mut super::where_eval::GraphMatchEvalCache,
) -> Result<Option<Vec<SearchResult>>> {
let (Some(vector), Some(cond)) =
(extracted.vector_search.as_ref(), stmt.where_clause.as_ref())
else {
return Ok(None);
};
if extracted.graph_match_predicates.is_empty() {
return Ok(None);
}
let Some(text_query) = Self::extract_match_query(cond) else {
return Ok(None);
};
if !Self::graph_predicates_are_and_required(cond) {
return Ok(None);
}
let Some(anchor_ids) =
self.compute_required_anchor_ids(cond, params, &stmt.from_alias, graph_cache)?
else {
return Ok(None);
};
let vector_weight = stmt
.fusion_clause
.as_ref()
.and_then(|fc| fc.vector_weight)
.map(|w| {
#[allow(clippy::cast_possible_truncation)]
let w_f32 = w as f32;
w_f32
});
let rrf_k = stmt.fusion_clause.as_ref().and_then(|fc| fc.k);
let results = self.hybrid_search_with_anchors(
vector,
&text_query,
limit,
vector_weight,
rrf_k,
&anchor_ids,
)?;
Ok(Some(results))
}
fn graph_predicates_are_and_required(cond: &crate::velesql::Condition) -> bool {
!Self::condition_contains_or(cond)
}
#[allow(clippy::unused_self)]
pub(super) fn analyze_join_pushdown(
&self,
stmt: &crate::velesql::SelectStatement,
) -> pushdown::PushdownAnalysis {
if stmt.joins.is_empty() {
return pushdown::PushdownAnalysis::default();
}
let Some(ref cond) = stmt.where_clause else {
return pushdown::PushdownAnalysis::default();
};
let graph_vars: std::collections::HashSet<String> =
stmt.from_alias.iter().cloned().collect();
let join_tables = pushdown::extract_join_tables(&stmt.joins);
let analysis = pushdown::analyze_for_pushdown(cond, &graph_vars, &join_tables);
tracing::debug!(
column_store_filters = analysis.column_store_filters.len(),
graph_filters = analysis.graph_filters.len(),
post_join_filters = analysis.post_join_filters.len(),
has_pushdown = analysis.has_pushdown(),
"JOIN pushdown analysis complete"
);
analysis
}
pub(super) fn apply_select_postprocessing(
&self,
stmt: &crate::velesql::SelectStatement,
mut results: Vec<SearchResult>,
params: &std::collections::HashMap<String, serde_json::Value>,
limit: usize,
let_bindings: &[crate::velesql::LetBinding],
) -> Result<Vec<SearchResult>> {
if stmt.distinct == crate::velesql::DistinctMode::All {
results = distinct::apply_distinct(results, &stmt.columns);
}
if let Some(wfs) = Self::extract_window_functions(&stmt.columns) {
crate::velesql::window_evaluator::evaluate(&mut results, wfs)?;
}
self.apply_order_by_step(stmt, &mut results, params, let_bindings)?;
if let Some(offset) = stmt.offset {
let skip = usize::try_from(offset).unwrap_or(usize::MAX);
results = results.into_iter().skip(skip).collect();
}
results.truncate(limit);
if !let_bindings.is_empty() {
let per_result_let = Self::evaluate_let_for_results(let_bindings, &results);
inject_let_into_payloads(&mut results, &per_result_let);
}
Ok(results)
}
fn apply_order_by_step(
&self,
stmt: &crate::velesql::SelectStatement,
results: &mut [SearchResult],
params: &std::collections::HashMap<String, serde_json::Value>,
let_bindings: &[crate::velesql::LetBinding],
) -> Result<()> {
let Some(ref order_by) = stmt.order_by else {
return Ok(());
};
if let_bindings.is_empty() {
self.apply_order_by(results, order_by, params)?;
} else {
let per_result_let = Self::evaluate_let_for_results(let_bindings, results);
self.apply_order_by_with_let(results, order_by, params, &per_result_let)?;
}
Ok(())
}
fn evaluate_let_for_results(
let_bindings: &[crate::velesql::LetBinding],
results: &[SearchResult],
) -> Vec<Vec<(String, f32)>> {
results
.iter()
.map(|r| {
super::ordering::evaluate_let_bindings(
let_bindings,
r.score,
r.point.payload.as_ref(),
r.component_scores.as_deref(),
)
})
.collect()
}
fn extract_window_functions(
columns: &crate::velesql::SelectColumns,
) -> Option<&[crate::velesql::WindowFunction]> {
match columns {
crate::velesql::SelectColumns::Mixed {
window_functions, ..
} if !window_functions.is_empty() => Some(window_functions),
_ => None,
}
}
}
/// Decides whether the plain (non-hybrid) graph-anchored fetch may be used.
///
/// All three conditions must hold:
/// - graph MATCH predicates are present;
/// - there are no similarity conditions;
/// - `extract_match_query` finds no text query in `cond`.
///
/// The text-query lookup runs only once the first two checks have passed.
fn anchored_fetch_applies(
    extracted: &ExtractedComponents,
    cond: &crate::velesql::Condition,
) -> bool {
    let has_graph_predicates = !extracted.graph_match_predicates.is_empty();
    let has_similarity = !extracted.similarity_conditions.is_empty();
    if !has_graph_predicates || has_similarity {
        return false;
    }
    Collection::extract_match_query(cond).is_none()
}
/// Chooses how many candidates the main fetch should request.
///
/// - Without graph predicates, the caller's `limit` is used as-is.
/// - With graph predicates and a ranked fetch (a vector search or similarity
///   conditions), the limit is widened by `graph_overfetch_limit`.
/// - With graph predicates but no ranking, `MAX_LIMIT` candidates are requested
///   (presumably so the graph post-filter sees every eligible row — confirm).
fn main_select_execution_limit(extracted: &ExtractedComponents, limit: usize) -> usize {
    if extracted.graph_match_predicates.is_empty() {
        return limit;
    }
    let ranked =
        extracted.vector_search.is_some() || !extracted.similarity_conditions.is_empty();
    if ranked {
        graph_overfetch_limit(limit)
    } else {
        MAX_LIMIT
    }
}
/// Widens `limit` for ranked fetches whose rows will later be thinned by a graph
/// post-filter.
///
/// The widened value is ten times `limit` (saturating), capped at 10 000. The
/// result is never smaller than `limit` itself.
fn graph_overfetch_limit(limit: usize) -> usize {
    const OVERFETCH_FACTOR: usize = 10;
    const GRAPH_OVERFETCH_CAP: usize = 10_000;
    let widened = limit
        .saturating_mul(OVERFETCH_FACTOR)
        .min(GRAPH_OVERFETCH_CAP);
    std::cmp::max(limit, widened)
}
/// Chooses the candidate limit for the graph-anchored fetch.
///
/// `MAX_LIMIT` is used when the query orders by `similarity()` but carries no
/// vector search. In that case the anchored fetch is a plain candidate fetch, so
/// the caller's limit is presumably too small to cover the ordering — confirm.
/// Otherwise the caller's `limit` is used.
fn anchor_fetch_limit(
    stmt: &crate::velesql::SelectStatement,
    extracted: &ExtractedComponents,
    limit: usize,
) -> usize {
    let needs_full_candidate_set =
        extracted.vector_search.is_none() && Collection::has_order_by_similarity(stmt);
    if needs_full_candidate_set {
        return MAX_LIMIT;
    }
    limit
}
/// Writes each result's evaluated LET values into its JSON payload as numbers.
///
/// Results are paired with `per_result_let` by position; pairing stops at the
/// shorter of the two slices. Handling per result:
/// - Results with no bindings are skipped entirely.
/// - A missing payload is created as an empty JSON object.
/// - An existing payload that is not a JSON object is left untouched.
fn inject_let_into_payloads(results: &mut [SearchResult], per_result_let: &[Vec<(String, f32)>]) {
    let non_empty = results
        .iter_mut()
        .zip(per_result_let)
        .filter(|(_, bindings)| !bindings.is_empty());
    for (result, bindings) in non_empty {
        let serde_json::Value::Object(map) = result
            .point
            .payload
            .get_or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()))
        else {
            continue;
        };
        for (name, value) in bindings {
            let json_number = serde_json::Value::from(f64::from(*value));
            map.insert(name.clone(), json_number);
        }
    }
}