use crate::sync::Arc;
use rustc_hash::FxHashMap as HashMap;
use smallvec::SmallVec;
use turso_ext::{ConstraintInfo, ConstraintUsage, ResultCode};
use turso_parser::ast::{self, SortOrder, TableInternalId};
use crate::schema::Schema;
use crate::stats::AnalyzeStats;
use crate::translate::expr::{as_binary_components, walk_expr, WalkControl};
use crate::translate::optimizer::constraints::{
convert_to_vtab_constraint, ordered_materialized_key_columns, BinaryExprSide, Constraint,
ConstraintOperator, RangeConstraintRef,
};
use crate::translate::optimizer::cost::{rows_per_leaf_page_for_index, RowCountEstimate};
use crate::translate::optimizer::cost_params::CostModelParams;
use crate::translate::plan::{
plan_has_outer_scope_dependency, HashJoinKey, HashJoinType, NonFromClauseSubquery,
SetOperation, SubqueryState, TableReferences, WhereTerm,
};
use crate::vdbe::affinity::Affinity;
use crate::vdbe::hash_table::DEFAULT_MEM_BUDGET;
use crate::{
schema::{FromClauseSubquery, Index, IndexColumn, Table},
translate::plan::{IndexMethodQuery, IterationDirection, JoinOrderMember, JoinedTable},
vtab::VirtualTable,
LimboError, Result,
};
use super::{
constraints::{
usable_constraints_for_join_order, usable_constraints_for_lhs_mask, TableConstraints,
},
cost::{
estimate_cost_for_scan_or_seek, estimate_index_cost, estimate_rows_per_seek, AnalyzeCtx,
Cost, IndexInfo,
},
join::JoinPlanningContext,
multi_index::{
consider_multi_index_intersection, consider_multi_index_union, MultiIndexBranchParams,
},
order::{
btree_access_order_consumed, subquery_intrinsic_order_consumed, ColumnTarget,
EqualityPrefixScope, OrderTarget,
},
AvailableIndexes,
};
use crate::translate::planner::TableMask;
#[derive(Debug, Clone)]
pub struct AccessMethod {
pub cost: Cost,
pub estimated_rows_per_outer_row: f64,
pub residual_constraints: ResidualConstraintMode,
pub consumed_where_terms: SmallVec<[usize; 4]>,
pub params: AccessMethodParams,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResidualConstraintMode {
ApplyUnconsumed,
None,
}
#[derive(Debug, Clone)]
pub enum AccessMethodParams {
BTreeTable {
iter_dir: IterationDirection,
index: Option<Arc<Index>>,
constraint_refs: Vec<RangeConstraintRef>,
},
VirtualTable {
idx_num: i32,
idx_str: Option<String>,
constraints: Vec<ConstraintInfo>,
constraint_usages: Vec<ConstraintUsage>,
},
Subquery { iter_dir: IterationDirection },
MaterializedSubquery {
index: Arc<Index>,
constraint_refs: Vec<RangeConstraintRef>,
iter_dir: IterationDirection,
},
HashJoin {
build_table_idx: usize,
probe_table_idx: usize,
join_keys: Vec<HashJoinKey>,
mem_budget: usize,
materialize_build_input: bool,
use_bloom_filter: bool,
join_type: HashJoinType,
},
IndexMethod {
query: IndexMethodQuery,
where_covered: Option<usize>,
},
MultiIndexScan {
branches: Vec<MultiIndexBranchParams>,
where_term_idx: usize,
set_op: SetOperation,
},
InSeek {
index: Option<Arc<Index>>,
affinity: Affinity,
where_term_idx: usize,
},
}
pub(super) struct ChosenBtreeCandidate {
pub(super) iter_dir: IterationDirection,
pub(super) index: Option<Arc<Index>>,
pub(super) constraint_refs: Vec<RangeConstraintRef>,
pub(super) cost: Cost,
}
#[derive(Debug, Clone)]
pub(super) struct ChosenInSeekCandidate {
pub(super) index: Option<Arc<Index>>,
pub(super) affinity: Affinity,
pub(super) constraint_idx: usize,
pub(super) cost: Cost,
pub(super) estimated_rows_per_outer_row: f64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum BranchReadMode {
RowIdOnly,
FullRow,
}
#[allow(clippy::too_many_arguments)]
pub(super) fn choose_best_btree_candidate(
rhs_table: &JoinedTable,
rhs_constraints: &TableConstraints,
lhs_mask: &TableMask,
rhs_table_idx: usize,
maybe_order_target: Option<&OrderTarget>,
schema: &Schema,
available_indexes: &AvailableIndexes,
analyze_stats: &AnalyzeStats,
input_cardinality: f64,
base_row_count: RowCountEstimate,
params: &CostModelParams,
) -> Option<ChosenBtreeCandidate> {
let has_rowid_candidate = rhs_constraints.candidates.iter().any(|c| c.index.is_none());
let mut best_cost = if has_rowid_candidate {
estimate_cost_for_scan_or_seek(
None,
&[],
&[],
input_cardinality,
base_row_count,
false,
params,
None,
)
} else {
Cost(f64::MAX)
};
let mut best_choice = ChosenBtreeCandidate {
iter_dir: IterationDirection::Forwards,
index: None,
constraint_refs: vec![],
cost: best_cost,
};
let mut best_adjusted_output = f64::MAX;
let mut best_is_ordered = false;
let mut rhs_table_mask = TableMask::default();
rhs_table_mask.set(rhs_table_idx);
for candidate in rhs_constraints.candidates.iter() {
let usable_constraint_refs = usable_constraints_for_lhs_mask(
&rhs_constraints.constraints,
&candidate.refs,
lhs_mask,
rhs_table_idx,
);
let index_info = match candidate.index.as_ref() {
Some(index) => IndexInfo {
unique: index.unique,
covering: rhs_table.index_is_covering(index),
column_count: index.columns.len(),
rows_per_leaf_page: rows_per_leaf_page_for_index(
index.columns.len(),
rhs_table,
params.rows_per_table_page,
),
},
None => IndexInfo {
unique: true,
covering: !usable_constraint_refs.is_empty(),
column_count: 1,
rows_per_leaf_page: params.rows_per_table_page,
},
};
let (iter_dir, is_index_ordered, order_satisfiability_bonus) =
if let Some(order_target) = maybe_order_target {
let all_same_direction = btree_access_order_consumed(
rhs_table,
IterationDirection::Forwards,
candidate.index.as_deref(),
&usable_constraint_refs,
&order_target.columns,
schema,
EqualityPrefixScope::AnyEquality,
) == order_target.columns.len();
let all_opposite_direction = btree_access_order_consumed(
rhs_table,
IterationDirection::Backwards,
candidate.index.as_deref(),
&usable_constraint_refs,
&order_target.columns,
schema,
EqualityPrefixScope::AnyEquality,
) == order_target.columns.len();
let satisfies_order = all_same_direction || all_opposite_direction;
if satisfies_order {
let n = *base_row_count;
let sort_cost_saved = Cost(n * (n.max(1.0).log2()) * params.sort_cpu_per_row);
(
if all_same_direction {
IterationDirection::Forwards
} else {
IterationDirection::Backwards
},
true,
sort_cost_saved,
)
} else {
(IterationDirection::Forwards, false, Cost(0.0))
}
} else {
(IterationDirection::Forwards, false, Cost(0.0))
};
let analyze_ctx = AnalyzeCtx {
rhs_table,
index: candidate.index.as_ref(),
stats: analyze_stats,
};
let candidate_base_row_count = match candidate
.index
.as_ref()
.and_then(|idx| idx.where_clause.as_ref())
{
Some(where_expr) => {
let selectivity = super::constraints::estimate_partial_index_where_selectivity(
where_expr.as_ref(),
rhs_table,
schema,
available_indexes,
params,
)
.clamp(1e-6, 1.0);
RowCountEstimate::AnalyzeStats((*base_row_count * selectivity).max(1.0))
}
None => base_row_count,
};
let cost = estimate_cost_for_scan_or_seek(
Some(index_info),
&rhs_constraints.constraints,
&usable_constraint_refs,
input_cardinality,
candidate_base_row_count,
is_index_ordered,
params,
Some(&analyze_ctx),
);
let loop_prereq_mask = {
let mut mask = TableMask::default();
for ucref in usable_constraint_refs.iter() {
for idx in [
ucref.eq.as_ref().map(|e| e.constraint_pos),
ucref.lower_bound,
ucref.upper_bound,
]
.into_iter()
.flatten()
{
let c = &rhs_constraints.constraints[idx];
mask = mask.iter().chain(c.lhs_mask.iter()).collect();
}
}
mask
};
let allowed_mask: TableMask = loop_prereq_mask
.iter()
.chain(rhs_table_mask.iter())
.collect();
let consumed: SmallVec<[usize; 8]> = usable_constraint_refs
.iter()
.flat_map(|ucref| {
[
ucref.eq.as_ref().map(|e| e.constraint_pos),
ucref.lower_bound,
ucref.upper_bound,
]
.into_iter()
.flatten()
})
.collect();
let residual_selectivity: f64 = rhs_constraints
.constraints
.iter()
.enumerate()
.filter(|(i, c)| {
!consumed.contains(i)
&& c.usable
&& allowed_mask.contains_all_set_bits_of(&c.lhs_mask)
&& matches!(
c.operator,
ConstraintOperator::AstNativeOperator(ast::Operator::Equals)
| ConstraintOperator::AstNativeOperator(ast::Operator::Greater)
| ConstraintOperator::AstNativeOperator(ast::Operator::GreaterEquals)
| ConstraintOperator::AstNativeOperator(ast::Operator::Less)
| ConstraintOperator::AstNativeOperator(ast::Operator::LessEquals)
)
})
.map(|(_, c)| c.selectivity)
.product();
let adjusted_output = residual_selectivity;
let effective_bonus = if is_index_ordered && !best_is_ordered {
order_satisfiability_bonus
} else {
Cost(0.0)
};
let adjusted_best = best_cost + effective_bonus;
let costs_equal = (cost.0 - adjusted_best.0).abs() < 1e-9;
if cost < adjusted_best || (costs_equal && adjusted_output < best_adjusted_output - 1e-12) {
best_cost = cost;
best_adjusted_output = adjusted_output;
best_is_ordered = is_index_ordered;
best_choice = ChosenBtreeCandidate {
iter_dir,
index: candidate.index.clone(),
constraint_refs: usable_constraint_refs.clone(),
cost,
};
}
}
Some(best_choice)
}
fn consumed_where_terms_from_constraint_refs(
constraints: &[Constraint],
constraint_refs: &[RangeConstraintRef],
) -> SmallVec<[usize; 4]> {
let mut consumed = SmallVec::new();
for cref in constraint_refs {
for constraint_idx in [
cref.eq.as_ref().map(|eq| eq.constraint_pos),
cref.lower_bound,
cref.upper_bound,
]
.into_iter()
.flatten()
{
let where_term_idx = constraints[constraint_idx].where_clause_pos.0;
if !consumed.contains(&where_term_idx) {
consumed.push(where_term_idx);
}
}
}
consumed
}
#[allow(clippy::too_many_arguments)]
pub(super) fn choose_best_in_seek_candidate(
rhs_table: &JoinedTable,
rhs_constraints: &TableConstraints,
lhs_mask: &TableMask,
input_cardinality: f64,
base_row_count: RowCountEstimate,
params: &CostModelParams,
best_cost: Cost,
read_mode: BranchReadMode,
) -> Result<Option<ChosenInSeekCandidate>> {
let Table::BTree(btree) = &rhs_table.table else {
return Err(LimboError::InternalError(
"consider_in_seek_access_method called on non-BTree table".into(),
));
};
let base = *base_row_count;
let tree_depth = if base <= 1.0 {
1.0
} else {
(base.ln() / params.rows_per_table_page.ln())
.ceil()
.max(1.0)
};
let mut best_in_seek = None;
let mut best_in_seek_cost = best_cost;
for candidate in rhs_constraints.candidates.iter() {
let first_col_pos = candidate
.index
.as_ref()
.and_then(|idx| idx.columns.first().map(|c| c.pos_in_table));
let rowid_only = matches!(read_mode, BranchReadMode::RowIdOnly);
let index_info = match candidate.index.as_ref() {
Some(index) => IndexInfo {
unique: index.unique,
covering: rowid_only || rhs_table.index_is_covering(index),
column_count: index.columns.len(),
rows_per_leaf_page: rows_per_leaf_page_for_index(
index.columns.len(),
rhs_table,
params.rows_per_table_page,
),
},
None => IndexInfo {
unique: true,
covering: false,
column_count: 1,
rows_per_leaf_page: params.rows_per_table_page,
},
};
for constraint in &rhs_constraints.constraints {
let ConstraintOperator::In {
not,
estimated_values,
} = constraint.operator
else {
continue;
};
if not || !lhs_mask.contains_all_set_bits_of(&constraint.lhs_mask) {
continue;
}
let matches = if candidate.index.is_none() {
constraint.is_rowid
} else {
!constraint.is_rowid
&& constraint.table_col_pos.is_some()
&& constraint.table_col_pos == first_col_pos
};
if !matches {
continue;
}
if let (Some(index), Some(col_pos)) = (&candidate.index, constraint.table_col_pos) {
let constrained_column = &rhs_table.table.columns()[col_pos];
let table_collation = constrained_column.collation();
let index_collation = index.columns[0].collation.unwrap_or_default();
if table_collation != index_collation {
continue;
}
}
let rows_per_seek = if (index_info.unique && index_info.column_count == 1)
|| candidate.index.is_none()
{
1.0
} else {
(base * params.sel_eq_indexed).sqrt().max(1.0)
};
let in_cost = estimate_index_cost(
base,
tree_depth,
index_info,
estimated_values * input_cardinality,
rows_per_seek,
params,
);
if in_cost >= best_in_seek_cost {
continue;
}
let affinity = if let Some(col_pos) = constraint.table_col_pos {
btree
.columns()
.get(col_pos)
.map(|col| col.affinity())
.unwrap_or(Affinity::Blob)
} else {
Affinity::Integer
};
best_in_seek_cost = in_cost;
best_in_seek = Some(ChosenInSeekCandidate {
index: candidate.index.clone(),
affinity,
constraint_idx: constraint.where_clause_pos.0,
cost: in_cost,
estimated_rows_per_outer_row: (constraint.selectivity * base).max(1.0),
});
}
}
Ok(best_in_seek)
}
fn consider_in_seek_access_method(
rhs_table: &JoinedTable,
rhs_constraints: &TableConstraints,
lhs_mask: &TableMask,
input_cardinality: f64,
base_row_count: RowCountEstimate,
params: &CostModelParams,
best_cost: Cost,
) -> Result<Option<AccessMethod>> {
Ok(choose_best_in_seek_candidate(
rhs_table,
rhs_constraints,
lhs_mask,
input_cardinality,
base_row_count,
params,
best_cost,
BranchReadMode::FullRow,
)?
.map(|chosen| AccessMethod {
cost: chosen.cost,
estimated_rows_per_outer_row: chosen.estimated_rows_per_outer_row,
residual_constraints: ResidualConstraintMode::ApplyUnconsumed,
consumed_where_terms: smallvec::smallvec![chosen.constraint_idx],
params: AccessMethodParams::InSeek {
index: chosen.index,
affinity: chosen.affinity,
where_term_idx: chosen.constraint_idx,
},
}))
}
#[allow(clippy::too_many_arguments)]
pub fn find_best_access_method_for_join_order(
rhs_table: &JoinedTable,
rhs_constraints: &TableConstraints,
join_order: &[JoinOrderMember],
planning_context: JoinPlanningContext<'_>,
where_clause: &[WhereTerm],
available_indexes: &AvailableIndexes,
table_references: &TableReferences,
subqueries: &[NonFromClauseSubquery],
schema: &Schema,
analyze_stats: &AnalyzeStats,
input_cardinality: f64,
base_row_count: RowCountEstimate,
params: &CostModelParams,
) -> Result<Option<AccessMethod>> {
match &rhs_table.table {
Table::BTree(_) => find_best_access_method_for_btree(
rhs_table,
rhs_constraints,
join_order,
planning_context.maybe_order_target,
where_clause,
available_indexes,
table_references,
subqueries,
schema,
analyze_stats,
input_cardinality,
base_row_count,
params,
),
Table::Virtual(vtab) => find_best_access_method_for_vtab(
vtab,
&rhs_constraints.constraints,
join_order,
input_cardinality,
base_row_count,
params,
),
Table::FromClauseSubquery(subquery) => find_best_access_method_for_subquery(
rhs_table,
subquery,
rhs_constraints,
join_order,
planning_context,
schema,
input_cardinality,
base_row_count,
params,
),
}
}
#[allow(clippy::too_many_arguments)]
fn find_best_access_method_for_btree(
rhs_table: &JoinedTable,
rhs_constraints: &TableConstraints,
join_order: &[JoinOrderMember],
maybe_order_target: Option<&OrderTarget>,
where_clause: &[WhereTerm],
available_indexes: &AvailableIndexes,
table_references: &TableReferences,
subqueries: &[NonFromClauseSubquery],
schema: &Schema,
analyze_stats: &AnalyzeStats,
input_cardinality: f64,
base_row_count: RowCountEstimate,
params: &CostModelParams,
) -> Result<Option<AccessMethod>> {
let rhs_table_idx = join_order.last().unwrap().original_idx;
let lhs_mask: TableMask = join_order
.iter()
.take(join_order.len() - 1)
.map(|member| member.original_idx)
.collect();
let best = choose_best_btree_candidate(
rhs_table,
rhs_constraints,
&lhs_mask,
rhs_table_idx,
maybe_order_target,
schema,
available_indexes,
analyze_stats,
input_cardinality,
base_row_count,
params,
)
.expect("btree candidate selection must always consider the rowid candidate");
let estimated_rows_per_outer_row = if best.constraint_refs.is_empty() {
*base_row_count
} else {
let index_info = match best.index.as_ref() {
Some(index) => IndexInfo {
unique: index.unique,
covering: rhs_table.index_is_covering(index),
column_count: index.columns.len(),
rows_per_leaf_page: rows_per_leaf_page_for_index(
index.columns.len(),
rhs_table,
params.rows_per_table_page,
),
},
None => IndexInfo {
unique: true,
covering: true,
column_count: 1,
rows_per_leaf_page: params.rows_per_table_page,
},
};
let analyze_ctx = AnalyzeCtx {
rhs_table,
index: best.index.as_ref(),
stats: analyze_stats,
};
estimate_rows_per_seek(
index_info,
&rhs_constraints.constraints,
&best.constraint_refs,
base_row_count,
Some(&analyze_ctx),
)
};
let mut best_access_method = AccessMethod {
cost: best.cost,
estimated_rows_per_outer_row,
residual_constraints: ResidualConstraintMode::ApplyUnconsumed,
consumed_where_terms: consumed_where_terms_from_constraint_refs(
&rhs_constraints.constraints,
&best.constraint_refs,
),
params: AccessMethodParams::BTreeTable {
iter_dir: best.iter_dir,
index: best.index,
constraint_refs: best.constraint_refs,
},
};
if rhs_table.indexed.is_none() && rhs_table.btree().is_some_and(|b| b.has_rowid) {
if let Some(in_seek_method) = consider_in_seek_access_method(
rhs_table,
rhs_constraints,
&lhs_mask,
input_cardinality,
base_row_count,
params,
best_access_method.cost,
)? {
best_access_method = in_seek_method;
}
if let Some(multi_idx_method) = consider_multi_index_union(
rhs_table,
where_clause,
available_indexes,
table_references,
subqueries,
schema,
input_cardinality,
base_row_count,
params,
best_access_method.cost,
&lhs_mask,
analyze_stats,
) {
best_access_method = multi_idx_method;
}
if let Some(multi_idx_and_method) = consider_multi_index_intersection(
rhs_table,
where_clause,
available_indexes,
table_references,
subqueries,
schema,
input_cardinality,
base_row_count,
params,
best_access_method.cost,
&lhs_mask,
analyze_stats,
) {
best_access_method = multi_idx_and_method;
}
}
Ok(Some(best_access_method))
}
fn find_best_access_method_for_vtab(
vtab: &VirtualTable,
constraints: &[Constraint],
join_order: &[JoinOrderMember],
input_cardinality: f64,
base_row_count: RowCountEstimate,
params: &CostModelParams,
) -> Result<Option<AccessMethod>> {
let vtab_constraints = convert_to_vtab_constraint(constraints, join_order);
let best_index_result = vtab.best_index(&vtab_constraints, &[]);
match best_index_result {
Ok(index_info) => {
Ok(Some(AccessMethod {
cost: estimate_cost_for_scan_or_seek(
None,
&[],
&[],
input_cardinality,
base_row_count,
false,
params,
None,
),
estimated_rows_per_outer_row: *base_row_count,
residual_constraints: ResidualConstraintMode::ApplyUnconsumed,
consumed_where_terms: SmallVec::new(),
params: AccessMethodParams::VirtualTable {
idx_num: index_info.idx_num,
idx_str: index_info.idx_str,
constraints: vtab_constraints,
constraint_usages: index_info.constraint_usages,
},
}))
}
Err(ResultCode::ConstraintViolation) => Ok(None),
Err(e) => Err(LimboError::from(e)),
}
}
fn collect_table_refs(expr: &ast::Expr) -> Option<Vec<TableInternalId>> {
let mut tables = Vec::new();
let result = walk_expr(expr, &mut |e| {
match e {
ast::Expr::Column { table, .. } | ast::Expr::RowId { table, .. } => {
if !tables.contains(table) {
tables.push(*table);
}
}
_ => {}
}
Ok(WalkControl::Continue)
});
result.ok().map(|_| tables)
}
pub fn find_equijoin_conditions(
build_table_id: TableInternalId,
probe_table_id: TableInternalId,
where_clause: &[WhereTerm],
) -> Vec<HashJoinKey> {
let mut join_keys = Vec::new();
for (where_idx, where_term) in where_clause.iter().enumerate() {
if where_term.consumed {
continue;
}
let Ok(Some((lhs, op, rhs))) = as_binary_components(&where_term.expr) else {
continue;
};
if !matches!(op.as_ast_operator(), Some(ast::Operator::Equals)) {
continue;
}
let Some(lhs_tables) = collect_table_refs(lhs) else {
continue;
};
let Some(rhs_tables) = collect_table_refs(rhs) else {
continue;
};
if lhs_tables.len() != 1 || rhs_tables.len() != 1 {
continue;
}
let lhs_tid = lhs_tables[0];
let rhs_tid = rhs_tables[0];
let build_side = if lhs_tid == build_table_id && rhs_tid == probe_table_id {
Some(BinaryExprSide::Lhs)
} else if rhs_tid == build_table_id && lhs_tid == probe_table_id {
Some(BinaryExprSide::Rhs)
} else {
None
};
if let Some(build_side) = build_side {
join_keys.push(HashJoinKey {
where_clause_idx: where_idx,
build_side,
});
}
}
join_keys
}
pub fn estimate_hash_join_cost(
build_cardinality: f64,
probe_cardinality: f64,
mem_budget: usize,
probe_multiplier: f64,
params: &CostModelParams,
) -> Cost {
let estimated_hash_table_size =
(build_cardinality as usize).saturating_mul(params.hash_bytes_per_row as usize);
let will_spill = estimated_hash_table_size > mem_budget;
let build_cost = build_cardinality * (params.hash_cpu_cost + params.hash_insert_cost);
let probe_cost =
probe_cardinality * (params.hash_cpu_cost + params.hash_lookup_cost) * probe_multiplier;
let spill_cost = if will_spill {
let build_pages = (build_cardinality / params.rows_per_table_page).ceil();
let probe_pages = (probe_cardinality / params.rows_per_table_page).ceil();
(build_pages + probe_pages) * 2.0 * probe_multiplier
} else {
0.0
};
Cost(build_cost + probe_cost + spill_cost)
}
#[allow(clippy::too_many_arguments)]
pub fn try_hash_join_access_method(
build_table: &JoinedTable,
probe_table: &JoinedTable,
build_table_idx: usize,
probe_table_idx: usize,
build_constraints: &TableConstraints,
probe_constraints: &TableConstraints,
where_clause: &mut [WhereTerm],
build_cardinality: f64,
probe_cardinality: f64,
probe_multiplier: f64,
subqueries: &[NonFromClauseSubquery],
params: &CostModelParams,
) -> Option<AccessMethod> {
if !matches!(build_table.table, Table::BTree(_))
|| !matches!(probe_table.table, Table::BTree(_))
{
return None;
}
let probe_root_page = probe_table.table.btree().expect("table is BTree").root_page;
let build_root_page = build_table.table.btree().expect("table is BTree").root_page;
if build_root_page == probe_root_page {
return None;
}
if build_table.indexed.is_some() || probe_table.indexed.is_some() {
return None;
}
if probe_table
.join_info
.as_ref()
.is_some_and(|ji| ji.is_semi_or_anti())
|| build_table
.join_info
.as_ref()
.is_some_and(|ji| ji.is_semi_or_anti())
{
return None;
}
let hash_join_type = if probe_table
.join_info
.as_ref()
.is_some_and(|ji| ji.is_full_outer())
{
HashJoinType::FullOuter
} else if probe_table
.join_info
.as_ref()
.is_some_and(|ji| ji.is_outer())
{
HashJoinType::LeftOuter
} else {
HashJoinType::Inner
};
if build_table
.join_info
.as_ref()
.is_some_and(|ji| ji.is_outer())
{
return None;
}
if build_table
.join_info
.as_ref()
.is_some_and(|ji| !ji.using.is_empty())
|| probe_table
.join_info
.as_ref()
.is_some_and(|ji| !ji.using.is_empty())
{
return None;
}
for subquery in subqueries {
if !subquery.correlated {
continue;
}
if let SubqueryState::Unevaluated { plan } = &subquery.state {
if let Some(plan) = plan.as_ref() {
let outer_ref_ids = plan.used_outer_query_ref_ids();
for outer_ref_id in &outer_ref_ids {
if *outer_ref_id == build_table.internal_id
|| *outer_ref_id == probe_table.internal_id
{
return None;
}
}
}
}
}
let join_keys = find_equijoin_conditions(
build_table.internal_id,
probe_table.internal_id,
where_clause,
)
.into_iter()
.filter(|join_key| {
let probe_expr = join_key.get_probe_expr(where_clause);
let Some(probe_tables) = collect_table_refs(probe_expr) else {
return false;
};
probe_tables.len() == 1 && probe_tables[0] == probe_table.internal_id
})
.collect::<Vec<_>>();
tracing::debug!(
build_table = build_table.table.get_name(),
probe_table = probe_table.table.get_name(),
join_key_count = join_keys.len(),
"hash-join equi-join keys"
);
if join_keys.is_empty() {
return None;
}
if hash_join_type != HashJoinType::FullOuter {
for join_key in &join_keys {
let probe_expr = join_key.get_probe_expr(where_clause);
let probe_tables = collect_table_refs(probe_expr).unwrap_or_default();
let probe_is_single_table =
probe_tables.len() == 1 && probe_tables[0] == probe_table.internal_id;
let probe_is_simple_column =
expr_is_simple_column_from_table(probe_expr, probe_table.internal_id);
let build_expr = join_key.get_build_expr(where_clause);
let build_is_simple_column =
expr_is_simple_column_from_table(build_expr, build_table.internal_id);
if probe_is_single_table && probe_is_simple_column {
if let Some(constraint) = probe_constraints
.constraints
.iter()
.find(|c| c.where_clause_pos.0 == join_key.where_clause_idx)
{
if let Some(col_pos) = constraint.table_col_pos {
if let Some(column) = probe_table.columns().get(col_pos) {
if column.is_rowid_alias() {
return None;
}
}
for candidate in &probe_constraints.candidates {
if let Some(index) = &candidate.index {
if index.column_table_pos_to_index_pos(col_pos).is_some() {
return None;
}
}
}
}
}
}
if build_is_simple_column {
if let Some(constraint) = build_constraints
.constraints
.iter()
.find(|c| c.where_clause_pos.0 == join_key.where_clause_idx)
{
if let Some(col_pos) = constraint.table_col_pos {
if let Some(column) = build_table.columns().get(col_pos) {
if column.is_rowid_alias() {
return None;
}
}
for candidate in &build_constraints.candidates {
if let Some(index) = &candidate.index {
if index.column_table_pos_to_index_pos(col_pos).is_some() {
return None;
}
}
}
}
}
}
}
}
let cost = estimate_hash_join_cost(
build_cardinality,
probe_cardinality,
DEFAULT_MEM_BUDGET,
probe_multiplier,
params,
);
Some(AccessMethod {
cost,
estimated_rows_per_outer_row: probe_cardinality,
residual_constraints: ResidualConstraintMode::ApplyUnconsumed,
consumed_where_terms: join_keys.iter().map(|key| key.where_clause_idx).collect(),
params: AccessMethodParams::HashJoin {
build_table_idx,
probe_table_idx,
join_keys,
mem_budget: DEFAULT_MEM_BUDGET,
materialize_build_input: false,
use_bloom_filter: false,
join_type: hash_join_type,
},
})
}
fn expr_is_simple_column_from_table(expr: &ast::Expr, table_id: TableInternalId) -> bool {
matches!(
expr,
ast::Expr::Column { table, .. } | ast::Expr::RowId { table, .. } if *table == table_id
)
}
fn intrinsic_subquery_scan_direction(
rhs_table: &JoinedTable,
subquery: &FromClauseSubquery,
maybe_order_target: Option<&OrderTarget>,
table_materialization_required: bool,
schema: &Schema,
) -> Option<IterationDirection> {
let order_target = maybe_order_target?;
let cols = &order_target.columns;
let matches_forwards = subquery_intrinsic_order_consumed(
rhs_table.internal_id,
subquery,
IterationDirection::Forwards,
cols,
schema,
) == cols.len();
if matches_forwards {
return Some(IterationDirection::Forwards);
}
let matches_backwards = table_materialization_required
&& subquery_intrinsic_order_consumed(
rhs_table.internal_id,
subquery,
IterationDirection::Backwards,
cols,
schema,
) == cols.len();
matches_backwards.then_some(IterationDirection::Backwards)
}
#[expect(clippy::too_many_arguments)]
fn find_best_access_method_for_subquery(
rhs_table: &JoinedTable,
subquery: &FromClauseSubquery,
rhs_constraints: &TableConstraints,
join_order: &[JoinOrderMember],
planning_context: JoinPlanningContext<'_>,
schema: &Schema,
input_cardinality: f64,
base_row_count: RowCountEstimate,
params: &CostModelParams,
) -> Result<Option<AccessMethod>> {
use super::constraints::ConstraintRef;
let maybe_order_target = planning_context.maybe_order_target;
let table_materialization_required = subquery.requires_table_materialization();
let can_direct_materialize_index = subquery.supports_direct_index_materialization();
let coroutine_scan_cost = estimate_cost_for_scan_or_seek(
None,
&[],
&[],
input_cardinality,
base_row_count,
false,
params,
None,
);
let coroutine_reexecution_overhead =
Cost((input_cardinality - 1.0).max(0.0) * *base_row_count * params.cpu_cost_per_seek);
let coroutine_cost = coroutine_scan_cost + coroutine_reexecution_overhead;
let scan_cost = if table_materialization_required {
coroutine_scan_cost
} else {
coroutine_cost
};
if plan_has_outer_scope_dependency(&subquery.plan) {
return Ok(Some(AccessMethod {
cost: coroutine_cost,
estimated_rows_per_outer_row: *base_row_count,
residual_constraints: ResidualConstraintMode::ApplyUnconsumed,
consumed_where_terms: SmallVec::new(),
params: AccessMethodParams::Subquery {
iter_dir: IterationDirection::Forwards,
},
}));
}
let usable: Vec<(usize, &Constraint)> = rhs_constraints
.constraints
.iter()
.enumerate()
.filter(|(_, c)| {
c.usable
&& c.table_col_pos.is_some()
&& matches!(
c.operator.as_ast_operator(),
Some(
ast::Operator::Equals
| ast::Operator::Greater
| ast::Operator::GreaterEquals
| ast::Operator::Less
| ast::Operator::LessEquals
)
)
})
.collect();
let extremum_constraints_compatible = maybe_order_target.is_some_and(|ot| ot.is_extremum())
&& match maybe_order_target
.and_then(|ot| ot.columns.first())
.map(|c| &c.target)
{
Some(ColumnTarget::Column(pos)) => {
usable.iter().all(|(_, c)| c.table_col_pos == Some(*pos))
}
_ => false,
};
if extremum_constraints_compatible || usable.is_empty() {
if let Some(iter_dir) = intrinsic_subquery_scan_direction(
rhs_table,
subquery,
maybe_order_target,
table_materialization_required,
schema,
) {
return Ok(Some(AccessMethod {
cost: scan_cost,
estimated_rows_per_outer_row: *base_row_count,
residual_constraints: ResidualConstraintMode::ApplyUnconsumed,
consumed_where_terms: SmallVec::new(),
params: AccessMethodParams::Subquery { iter_dir },
}));
}
}
if usable.is_empty() {
return Ok(Some(AccessMethod {
cost: scan_cost,
estimated_rows_per_outer_row: *base_row_count,
residual_constraints: ResidualConstraintMode::ApplyUnconsumed,
consumed_where_terms: SmallVec::new(),
params: AccessMethodParams::Subquery {
iter_dir: IterationDirection::Forwards,
},
}));
}
let usable_constraints: Vec<&Constraint> = usable.iter().map(|(_, c)| *c).collect();
let key_col_positions = ordered_materialized_key_columns(&usable_constraints);
if key_col_positions.is_empty() {
return Ok(Some(AccessMethod {
cost: scan_cost,
estimated_rows_per_outer_row: *base_row_count,
residual_constraints: ResidualConstraintMode::ApplyUnconsumed,
consumed_where_terms: SmallVec::new(),
params: AccessMethodParams::Subquery {
iter_dir: IterationDirection::Forwards,
},
}));
}
let key_col_pos_to_index_pos: HashMap<usize, usize> = key_col_positions
.iter()
.enumerate()
.map(|(index_col_pos, table_col_pos)| (*table_col_pos, index_col_pos))
.collect();
let mut temp_constraint_refs: Vec<ConstraintRef> = usable
.iter()
.map(|(orig_idx, c)| {
let table_col_pos = c.table_col_pos.expect("table_col_pos was Some above");
let index_col_pos = *key_col_pos_to_index_pos
.get(&table_col_pos)
.expect("table_col_pos must exist in key_col_positions");
ConstraintRef {
constraint_vec_pos: *orig_idx,
index_col_pos,
sort_order: SortOrder::Asc,
}
})
.collect();
temp_constraint_refs.sort_by_key(|x| x.index_col_pos);
let usable_constraint_refs = usable_constraints_for_join_order(
&rhs_constraints.constraints,
&temp_constraint_refs,
join_order,
);
let has_search_constraints = !usable_constraint_refs.is_empty();
if !has_search_constraints {
tracing::trace!(
table = rhs_table.table.get_name(),
cost = ?scan_cost,
"using coroutine subquery access because no usable seek constraints remain"
);
return Ok(Some(AccessMethod {
cost: scan_cost,
estimated_rows_per_outer_row: *base_row_count,
residual_constraints: ResidualConstraintMode::ApplyUnconsumed,
consumed_where_terms: SmallVec::new(),
params: AccessMethodParams::Subquery {
iter_dir: IterationDirection::Forwards,
},
}));
}
let ephemeral_index =
materialized_subquery_ephemeral_index(rhs_table, subquery, &key_col_positions);
let (iter_dir, _is_index_ordered, order_satisfiability_bonus) =
materialized_subquery_order_properties(
rhs_table,
&ephemeral_index,
&usable_constraint_refs,
maybe_order_target,
schema,
base_row_count,
params,
);
let estimated_rows_per_outer_row = estimate_rows_per_seek(
IndexInfo {
unique: false,
column_count: key_col_positions.len(),
covering: true,
rows_per_leaf_page: params.rows_per_table_page,
},
&rhs_constraints.constraints,
&usable_constraint_refs,
base_row_count,
None,
);
let one_pass_scan_cost =
estimate_cost_for_scan_or_seek(None, &[], &[], 1.0, base_row_count, false, params, None);
let append_build_cost = Cost(*base_row_count * params.cpu_cost_per_seek);
let seek_setup_cost = if table_materialization_required || can_direct_materialize_index {
one_pass_scan_cost + append_build_cost
} else {
one_pass_scan_cost + one_pass_scan_cost + append_build_cost
};
let seek_cost = Cost(
input_cardinality * params.cpu_cost_per_seek
+ input_cardinality * estimated_rows_per_outer_row * params.cpu_cost_per_row,
);
let total_cost = seek_setup_cost + seek_cost;
if total_cost >= scan_cost + order_satisfiability_bonus {
return Ok(Some(AccessMethod {
cost: scan_cost,
estimated_rows_per_outer_row: *base_row_count,
residual_constraints: ResidualConstraintMode::ApplyUnconsumed,
consumed_where_terms: SmallVec::new(),
params: AccessMethodParams::Subquery {
iter_dir: IterationDirection::Forwards,
},
}));
}
Ok(Some(AccessMethod {
cost: total_cost,
estimated_rows_per_outer_row,
residual_constraints: ResidualConstraintMode::ApplyUnconsumed,
consumed_where_terms: consumed_where_terms_from_constraint_refs(
&rhs_constraints.constraints,
&usable_constraint_refs,
),
params: AccessMethodParams::MaterializedSubquery {
index: ephemeral_index,
constraint_refs: usable_constraint_refs,
iter_dir,
},
}))
}
fn materialized_subquery_ephemeral_index(
rhs_table: &JoinedTable,
subquery: &FromClauseSubquery,
key_col_positions: &[usize],
) -> Arc<Index> {
let mut index_columns: Vec<IndexColumn> = Vec::new();
let mut seen_col_positions = std::collections::HashSet::new();
for &col_pos in key_col_positions {
let column = subquery
.columns
.get(col_pos)
.expect("key column position out of bounds for materialized subquery");
if !seen_col_positions.insert(col_pos) {
continue;
}
index_columns.push(IndexColumn {
name: column.name.clone().unwrap_or_default(),
order: SortOrder::Asc,
pos_in_table: col_pos,
collation: column.collation_opt(),
default: column.default.clone(),
expr: None,
});
}
for (col_pos, column) in subquery.columns.iter().enumerate() {
if seen_col_positions.contains(&col_pos) {
continue;
}
index_columns.push(IndexColumn {
name: column.name.clone().unwrap_or_default(),
order: SortOrder::Asc,
pos_in_table: col_pos,
collation: column.collation_opt(),
default: column.default.clone(),
expr: None,
});
}
Arc::new(Index {
name: format!("ephemeral_subquery_{}", rhs_table.internal_id),
columns: index_columns,
unique: false,
ephemeral: true,
table_name: subquery.name.clone(),
root_page: 0,
where_clause: None,
has_rowid: true,
index_method: None,
on_conflict: None,
})
}
fn materialized_subquery_order_properties(
rhs_table: &JoinedTable,
index: &Arc<Index>,
constraint_refs: &[RangeConstraintRef],
maybe_order_target: Option<&OrderTarget>,
schema: &Schema,
base_row_count: RowCountEstimate,
params: &CostModelParams,
) -> (IterationDirection, bool, Cost) {
let Some(order_target) = maybe_order_target else {
return (IterationDirection::Forwards, false, Cost(0.0));
};
let all_same_direction = btree_access_order_consumed(
rhs_table,
IterationDirection::Forwards,
Some(index.as_ref()),
constraint_refs,
&order_target.columns,
schema,
EqualityPrefixScope::AnyEquality,
) == order_target.columns.len();
let all_opposite_direction = btree_access_order_consumed(
rhs_table,
IterationDirection::Backwards,
Some(index.as_ref()),
constraint_refs,
&order_target.columns,
schema,
EqualityPrefixScope::AnyEquality,
) == order_target.columns.len();
if !(all_same_direction || all_opposite_direction) {
return (IterationDirection::Forwards, false, Cost(0.0));
}
let n = *base_row_count;
let order_bonus = Cost(n * n.max(1.0).log2() * params.sort_cpu_per_row);
(
if all_same_direction {
IterationDirection::Forwards
} else {
IterationDirection::Backwards
},
true,
order_bonus,
)
}