mod bounds_detail;
mod catalog_summary;
mod op_summary;
mod pattern;
use std::{
collections::{BTreeMap, BTreeSet},
fmt,
};
use crate::{
analyze::BindingId,
plan::{Aggregate, BindingTableColumn, ExecutionPlan, PipelineOp, PlannedYieldItem, YieldKind},
};
use super::{DEFAULT_RULES, OptimizeContext, RULE_NAMES, Rule};
use catalog_summary::catalog_summary;
use op_summary::{mutation_summary, session_summary, tx_summary};
use pattern::{binding_map, binding_refs, join_tree_shape, pattern_snapshot};
/// Optimizes `plan` with the default rule set and returns a [`PlanSnapshot`] of the result.
///
/// The snapshot records which rules reported a change while the plan was rewritten.
#[must_use]
pub fn optimize_summary(plan: ExecutionPlan, ctx: &OptimizeContext<'_>) -> PlanSnapshot {
    let (optimized, fired) = optimize_recording(plan, DEFAULT_RULES, ctx);
    PlanSnapshot::from_plan(&optimized, fired)
}
/// Human-readable digest of an [`ExecutionPlan`] after optimization.
#[non_exhaustive]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PlanSnapshot {
    /// One summary per top-level pipeline operator, in pipeline order.
    pub pipeline_ops: Vec<PipelineOpSummary>,
    /// Summary of the plan's pattern plan, when the plan has one.
    pub pattern: Option<PatternSnapshot>,
    /// Output column names; unnamed columns are rendered as `expr{index}`.
    pub output_columns: Vec<String>,
    /// Names of the rules that reported a change (empty for nested sub-plan snapshots).
    pub fired_rules: Vec<&'static str>,
    /// Value of the plan's `next_pipeline_op_id` counter when the snapshot was taken.
    pub next_pipeline_op_id: u32,
}
/// Summary of a single pipeline operator.
#[non_exhaustive]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PipelineOpSummary {
    /// Operator tag, e.g. `"Filter"` or `"Project"`.
    pub kind: &'static str,
    /// Operator-specific details; empty when the operator has none to report.
    pub payload: String,
}
/// Summary of the plan's graph-pattern portion.
#[non_exhaustive]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PatternSnapshot {
    /// Number of bindings in the pattern.
    pub binding_count: usize,
    /// Display names of the bindings.
    pub binding_names: Vec<String>,
    /// Textual rendering of the join tree.
    pub join_tree_shape: String,
    /// Number of pattern-level filters.
    pub pattern_filter_count: usize,
    /// Per-binding scan summaries.
    pub scans: Vec<ScanSnapshot>,
    /// Order-access descriptions; `None` entries are displayed as `none`.
    pub order_access: Vec<Option<String>>,
}
/// Summary of how a single pattern binding is scanned.
#[non_exhaustive]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ScanSnapshot {
    /// Display name of the scanned binding.
    pub binding: String,
    /// Scan kind tag.
    pub kind: &'static str,
    /// Access-method tag.
    pub access: &'static str,
    /// Count of predicates left to evaluate after the scan.
    pub residual_predicates: usize,
    /// Count of predicates absorbed by the access method.
    pub consumed_predicates: usize,
    /// Optional bounds description, displayed as ` [bounds=...]`.
    pub bounds_detail: Option<String>,
}
impl PlanSnapshot {
fn from_plan(plan: &ExecutionPlan, fired_rules: Vec<&'static str>) -> Self {
let bindings = plan
.pattern_plan
.as_ref()
.map(|pattern| binding_map(&pattern.bindings))
.unwrap_or_default();
Self {
pipeline_ops: plan
.pipeline
.iter()
.map(|op| pipeline_summary(op, &bindings))
.collect(),
pattern: plan
.pattern_plan
.as_ref()
.map(|pattern| pattern_snapshot(pattern, &plan.pipeline)),
output_columns: output_columns(&plan.output_schema.columns),
fired_rules,
next_pipeline_op_id: plan.next_pipeline_op_id.get(),
}
}
fn compact(&self) -> String {
let pipeline = self
.pipeline_ops
.iter()
.map(|op| op.kind)
.collect::<Vec<_>>()
.join(",");
let pattern = self
.pattern
.as_ref()
.map(|pattern| pattern.join_tree_shape.as_str())
.unwrap_or("none");
format!(
"pipeline=[{}], output=[{}], pattern={}",
pipeline,
self.output_columns.join(","),
pattern
)
}
}
impl fmt::Display for PlanSnapshot {
    /// Multi-line rendering of the snapshot, in this order:
    /// fired rules, output columns, the pattern section (or `pattern: none`),
    /// then one line per pipeline operator.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "fired_rules: {}", display_list(&self.fired_rules))?;
        writeln!(f, "output: [{}]", self.output_columns.join(", "))?;

        match &self.pattern {
            None => writeln!(f, "pattern: none")?,
            Some(pattern) => {
                writeln!(f, "pattern:")?;
                let names = pattern.binding_names.join(", ");
                writeln!(f, " bindings: {} [{}]", pattern.binding_count, names)?;
                writeln!(f, " join_tree: {}", pattern.join_tree_shape)?;
                writeln!(f, " pattern_filters: {}", pattern.pattern_filter_count)?;
                writeln!(f, " scans:")?;
                if pattern.scans.is_empty() {
                    writeln!(f, " - none")?;
                }
                for scan in &pattern.scans {
                    // The bounds suffix is only shown when the scan carries bounds detail.
                    let bounds_suffix = match scan.bounds_detail.as_deref() {
                        Some(detail) => format!(" [bounds={detail}]"),
                        None => String::new(),
                    };
                    writeln!(
                        f,
                        " - {} ({}): {}{} residual={} consumed={}",
                        scan.binding,
                        scan.kind,
                        scan.access,
                        bounds_suffix,
                        scan.residual_predicates,
                        scan.consumed_predicates
                    )?;
                }
                let order = pattern
                    .order_access
                    .iter()
                    .map(|entry| entry.as_deref().unwrap_or("none"))
                    .collect::<Vec<_>>()
                    .join(", ");
                writeln!(f, " order_access: [{}]", order)?;
            }
        }

        writeln!(f, "pipeline:")?;
        if self.pipeline_ops.is_empty() {
            return writeln!(f, " - none");
        }
        for op in &self.pipeline_ops {
            // Operators without a payload are printed bare, without parentheses.
            match op.payload.as_str() {
                "" => writeln!(f, " - {}", op.kind)?,
                payload => writeln!(f, " - {}({})", op.kind, payload)?,
            }
        }
        Ok(())
    }
}
/// Applies `rules` to `plan` repeatedly until no rule reports a change, or until
/// `ctx.impl_defined_caps.max_optimizer_iterations` passes have run.
///
/// Returns the rewritten plan and the names of every rule that reported a change in
/// any pass. The names are deduplicated and ordered as described on [`order_fired_rules`].
fn optimize_recording(
    mut plan: ExecutionPlan,
    rules: &[&'static dyn Rule],
    ctx: &OptimizeContext<'_>,
) -> (ExecutionPlan, Vec<&'static str>) {
    let mut fired = BTreeSet::new();
    for _ in 0..ctx.impl_defined_caps.max_optimizer_iterations {
        let mut changed_any = false;
        for rule in rules {
            let transformed = rule.rewrite(plan, ctx);
            plan = transformed.plan;
            if transformed.changed {
                fired.insert(rule.name());
                changed_any = true;
            }
        }
        // Fixed point reached: another pass would not change anything.
        if !changed_any {
            break;
        }
    }
    plan.refresh_pipeline_op_high_water();
    (plan, order_fired_rules(fired))
}

/// Orders fired rule names by their position in `RULE_NAMES`.
///
/// Names that do not appear in `RULE_NAMES` share the same rank and go last.
/// The name itself is the tie-break, so these unknown rules come out in a fixed
/// alphabetical order. Without it, an unstable sort could reorder them.
fn order_fired_rules(fired: BTreeSet<&'static str>) -> Vec<&'static str> {
    let mut ordered: Vec<&'static str> = fired.into_iter().collect();
    ordered.sort_unstable_by_key(|name| {
        let rank = RULE_NAMES
            .iter()
            .position(|candidate| candidate == name)
            .unwrap_or(usize::MAX);
        (rank, *name)
    });
    ordered
}
fn pipeline_summary(op: &PipelineOp, bindings: &BTreeMap<BindingId, String>) -> PipelineOpSummary {
match op {
PipelineOp::Filter(pred) => PipelineOpSummary {
kind: "Filter",
payload: format!(
"binding_refs=[{}]",
binding_refs(&pred.binding_refs, bindings)
),
},
PipelineOp::Project(items) => PipelineOpSummary {
kind: "Project",
payload: format!(
"columns=[{}]",
items
.iter()
.enumerate()
.map(|(index, item)| item
.alias
.clone()
.map(|alias| alias.as_str().to_owned())
.unwrap_or_else(|| format!("expr{index}")))
.collect::<Vec<_>>()
.join(",")
),
},
PipelineOp::Let(items) => PipelineOpSummary {
kind: "Let",
payload: format!(
"bindings=[{}]",
items
.iter()
.filter_map(|item| item.alias.clone().map(|alias| alias.as_str().to_owned()))
.collect::<Vec<_>>()
.join(",")
),
},
PipelineOp::Unwind {
source,
alias,
position,
..
} => {
let payload = if let Some(position) = position {
let kind = match position.kind {
crate::RowExpansionPositionKind::Ordinality => "ordinality",
crate::RowExpansionPositionKind::Offset => "offset",
};
format!(
"alias={}, position={}:{}, source=binding_refs=[{}]",
alias.as_str(),
kind,
position.alias.as_str(),
binding_refs(&source.binding_refs, bindings)
)
} else {
format!(
"alias={}, source=binding_refs=[{}]",
alias.as_str(),
binding_refs(&source.binding_refs, bindings)
)
};
PipelineOpSummary {
kind: "Unwind",
payload,
}
}
PipelineOp::OrderBy(keys) => PipelineOpSummary {
kind: "OrderBy",
payload: format!("keys={}", keys.len()),
},
PipelineOp::Limit { offset, count } => PipelineOpSummary {
kind: "Limit",
payload: format!("offset={offset:?}, count={count:?}"),
},
PipelineOp::TopK {
keys,
offset,
count,
} => PipelineOpSummary {
kind: "TopK",
payload: format!("keys={}, offset={offset:?}, count={count:?}", keys.len()),
},
PipelineOp::GroupBy { keys, aggregates } => PipelineOpSummary {
kind: "GroupBy",
payload: format!(
"keys={}, aggs=[{}]",
keys.len(),
aggregates
.iter()
.map(aggregate_summary)
.collect::<Vec<_>>()
.join(",")
),
},
PipelineOp::Distinct => PipelineOpSummary {
kind: "Distinct",
payload: String::new(),
},
PipelineOp::Union { op, rhs } => PipelineOpSummary {
kind: "Union",
payload: format!(
"op={op:?}, rhs={}",
PlanSnapshot::from_plan(rhs, Vec::new()).compact()
),
},
PipelineOp::Chain(rhs) => PipelineOpSummary {
kind: "Chain",
payload: format!("rhs={}", PlanSnapshot::from_plan(rhs, Vec::new()).compact()),
},
PipelineOp::CorrelatedChain(rhs) => PipelineOpSummary {
kind: "CorrelatedChain",
payload: format!("rhs={}", PlanSnapshot::from_plan(rhs, Vec::new()).compact()),
},
PipelineOp::Match(pattern) => PipelineOpSummary {
kind: "Match",
payload: join_tree_shape(&pattern.join_tree, bindings),
},
PipelineOp::OptionalMatch(pattern) => PipelineOpSummary {
kind: "OptionalMatch",
payload: join_tree_shape(&pattern.join_tree, bindings),
},
PipelineOp::Call(call) => PipelineOpSummary {
kind: "Call",
payload: format!(
"name={}, args={}, yield=[{}]",
call.procedure
.iter()
.map(|part| part.as_str())
.collect::<Vec<_>>()
.join("."),
call.args.len(),
call.yield_cols
.iter()
.map(yield_summary)
.collect::<Vec<_>>()
.join(",")
),
},
PipelineOp::CallSubquery(call) => PipelineOpSummary {
kind: "CallSubquery",
payload: format!(
"yield=[{}], body={}",
call.yield_items
.iter()
.map(|item| format!("{}=>{}", item.source.as_str(), item.output.as_str()))
.collect::<Vec<_>>()
.join(","),
PlanSnapshot::from_plan(&call.body, Vec::new()).compact()
),
},
PipelineOp::Mutation(mutation) => PipelineOpSummary {
kind: "Mutation",
payload: mutation_summary(mutation),
},
PipelineOp::Catalog(catalog) => PipelineOpSummary {
kind: "Catalog",
payload: catalog_summary(catalog),
},
PipelineOp::ExplainPlan { inner, .. } => PipelineOpSummary {
kind: "ExplainPlan",
payload: format!(
"inner={}",
PlanSnapshot::from_plan(inner, Vec::new()).compact()
),
},
PipelineOp::Tx(tx) => PipelineOpSummary {
kind: "Tx",
payload: tx_summary(tx),
},
PipelineOp::Session(session) => PipelineOpSummary {
kind: "Session",
payload: session_summary(session),
},
}
}
/// Returns the display name of each output column, in order.
///
/// Columns without a name are rendered as `expr{index}`, where `index` is the
/// column's position. The names are borrowed rather than cloning each `Option`
/// just to read it.
fn output_columns(columns: &[BindingTableColumn]) -> Vec<String> {
    columns
        .iter()
        .enumerate()
        .map(|(index, column)| {
            column.name.as_ref().map_or_else(
                || format!("expr{index}"),
                |name| name.as_str().to_owned(),
            )
        })
        .collect()
}
/// Formats an aggregate as `fn(*)` for star aggregates.
/// Otherwise formats it as `fn(N args)`, with ` distinct` appended after `args` when set.
fn aggregate_summary(aggregate: &Aggregate) -> String {
    let function = aggregate.function.as_str();
    if aggregate.star {
        return format!("{}(*)", function);
    }
    let distinct = match aggregate.distinct {
        true => " distinct",
        false => "",
    };
    format!("{}({} args{distinct})", function, aggregate.args.len())
}
/// Formats one `YIELD` item as `column` or `column as alias`.
///
/// A star yield is rendered as `*`. The alias is borrowed instead of cloning the
/// whole `Option` just to read it.
fn yield_summary(item: &PlannedYieldItem) -> String {
    let column = match &item.column {
        YieldKind::Star => "*".to_owned(),
        YieldKind::Named(name) => name.as_str().to_owned(),
    };
    match &item.alias {
        Some(alias) => format!("{column} as {}", alias.as_str()),
        None => column,
    }
}
/// Joins `values` with `", "`, or returns `"none"` when there are no values.
fn display_list(values: &[&str]) -> String {
    match values {
        [] => String::from("none"),
        _ => values.join(", "),
    }
}