use super::report::{PipelineInspection, PipelineStageInspection};
use crate::exec::pipeline::{FallbackBoundary, PhysicalExecPath, PipelineBody, Sink, Stage};
use crate::ir::physical::{PipelinePlanSource, PlanNode, QueryPlan};
/// Builds an inspection report for the first pipeline node found in `plan`.
///
/// Node ids are visited in the order yielded by `plan.node_ids()`; the first
/// node that matches `PlanNode::Pipeline` is handed to [`inspect_pipeline`].
/// Returns `None` when no visited node is a pipeline.
pub(crate) fn inspect_first_pipeline(plan: &QueryPlan) -> Option<PipelineInspection> {
    plan.node_ids().find_map(|id| match plan.node(id) {
        PlanNode::Pipeline { source, body } => Some(inspect_pipeline(source, body)),
        // Any other node kind is skipped so the search continues.
        _ => None,
    })
}
/// Summarises one pipeline (its `source` plus `body`) as a [`PipelineInspection`].
///
/// The report contains:
/// - the source rendered by [`source_label`],
/// - one entry per stage in `body.stages`, indexed in iteration order,
/// - the sink rendered by [`sink_label`],
/// - the `Debug` rendering of `body.source_demand().chain.pull`,
/// - the fallback boundary computed for the selected execution path, and
/// - the label of that execution path (always `Some`).
pub(crate) fn inspect_pipeline(
    source: &PipelinePlanSource,
    body: &PipelineBody,
) -> PipelineInspection {
    // The execution path is selected first because the fallback boundary is
    // derived from it.
    let path = crate::exec::pipeline::select_exec_path(&body.stages, &body.sink);
    let boundary = crate::exec::pipeline::Pipeline::fallback_boundary_for(&body.stages, path);

    let source = source_label(source);
    let stages = body
        .stages
        .iter()
        .enumerate()
        .map(|(position, stage)| inspect_stage(position, stage))
        .collect();
    let sink = String::from(sink_label(&body.sink));
    let demand = body.source_demand();
    let source_demand = format!("{:?}", demand.chain.pull);

    PipelineInspection {
        source,
        stages,
        sink,
        source_demand,
        fallback_boundary: fallback_boundary_label(boundary),
        execution_path: Some(String::from(exec_path_label(path))),
    }
}
/// Renders a pipeline source as a short textual label.
///
/// - `PipelinePlanSource::Expr(id)` becomes `expr:<id.0>`.
/// - `PipelinePlanSource::FieldChain { keys }` becomes `field-chain:$.<k1>.<k2>…`,
///   with the keys joined by `.`. When the joined path is the empty string the
///   label is just `field-chain:$`.
pub(crate) fn source_label(source: &PipelinePlanSource) -> String {
    let keys = match source {
        PipelinePlanSource::Expr(id) => return format!("expr:{}", id.0),
        PipelinePlanSource::FieldChain { keys } => keys,
    };

    let segments = keys.iter().map(|key| key.as_ref()).collect::<Vec<_>>();
    let path = segments.join(".");

    // The check is on the joined text, not on the number of keys.
    if path.is_empty() {
        String::from("field-chain:$")
    } else {
        format!("field-chain:$.{path}")
    }
}
/// Describes a single pipeline stage for an inspection report.
///
/// `index` is stored as given; the stage's kind and optional detail text come
/// from [`stage_label`].
pub(crate) fn inspect_stage(index: usize, stage: &Stage) -> PipelineStageInspection {
    let label = stage_label(stage);
    PipelineStageInspection {
        index,
        kind: String::from(label.0),
        detail: label.1,
    }
}
/// Maps a pipeline sink to a fixed, kebab-case label.
///
/// Reducer and predicate sinks are labelled by their operation
/// (`count`, `sum`, `any`, `find-index`, …); `ArgExtreme` is `max-by` or
/// `min-by` depending on `want_max`; `SelectMany` is `take-last` or `take`
/// depending on `from_end`.
pub(crate) fn sink_label(sink: &Sink) -> &'static str {
    match sink {
        // Sinks whose label does not depend on their payload.
        Sink::Collect => "collect",
        Sink::Terminal(_) => "terminal",
        Sink::Membership(_) => "membership",
        Sink::Nth(_) => "nth",
        Sink::ApproxCountDistinct => "approx-count-distinct",

        // Sinks distinguished by a boolean flag.
        Sink::SelectMany { from_end, .. } => {
            if *from_end {
                "take-last"
            } else {
                "take"
            }
        }
        Sink::ArgExtreme(spec) if spec.want_max => "max-by",
        Sink::ArgExtreme(_) => "min-by",

        // Sinks labelled by their operation.
        Sink::Reducer(spec) => match &spec.op {
            crate::exec::pipeline::ReducerOp::Count => "count",
            crate::exec::pipeline::ReducerOp::Numeric(numeric) => match numeric {
                crate::exec::pipeline::NumOp::Sum => "sum",
                crate::exec::pipeline::NumOp::Min => "min",
                crate::exec::pipeline::NumOp::Max => "max",
                crate::exec::pipeline::NumOp::Avg => "avg",
            },
        },
        Sink::Predicate(spec) => match &spec.op {
            crate::exec::pipeline::PredicateSinkOp::Any => "any",
            crate::exec::pipeline::PredicateSinkOp::All => "all",
            crate::exec::pipeline::PredicateSinkOp::FindOne => "find-one",
            crate::exec::pipeline::PredicateSinkOp::FindIndex => "find-index",
            crate::exec::pipeline::PredicateSinkOp::IndicesWhere => "indices-where",
        },
    }
}
/// Returns the `(kind, detail)` pair used to describe a stage.
///
/// `kind` is a fixed kebab-case name for the stage variant. `detail` is an
/// optional string built from the variant's fields, mostly via their `Debug`
/// output; it is `None` for variants that report nothing extra.
fn stage_label(stage: &Stage) -> (&'static str, Option<String>) {
    match stage {
        // Variants reported without any detail.
        Stage::Reverse(_) => ("reverse", None),
        Stage::CompiledMap(_) => ("compiled-map", None),

        // Variants whose second field is reported as `view=…`.
        Stage::Filter(_, view) => {
            let detail = format!("view={view:?}");
            ("filter", Some(detail))
        }
        Stage::Map(_, view) => {
            let detail = format!("view={view:?}");
            ("map", Some(detail))
        }
        Stage::FlatMap(_, view) => {
            let detail = format!("view={view:?}");
            ("flat-map", Some(detail))
        }

        // Variants that only report whether a key is present.
        Stage::UniqueBy(key) => match key {
            Some(_) => ("unique-by", Some(String::from("keyed"))),
            None => ("unique", None),
        },
        Stage::SortedDedup(key) => (
            "sorted-dedup",
            key.is_some().then(|| String::from("keyed")),
        ),
        Stage::Sort(spec) => {
            let detail = format!(
                "descending={}, keyed={}",
                spec.descending,
                spec.key.is_some()
            );
            ("sort", Some(detail))
        }

        // Builtin variants: the method, plus any arguments, rendered call-style.
        Stage::Builtin(call) => {
            let detail = format!("{:?}", call.method);
            ("builtin", Some(detail))
        }
        Stage::ExprBuiltin { method, .. } => {
            let detail = format!("{method:?}");
            ("expr-builtin", Some(detail))
        }
        Stage::UsizeBuiltin { method, value } => {
            let detail = format!("{method:?}({value})");
            ("builtin-usize", Some(detail))
        }
        Stage::StringBuiltin { method, value } => {
            let detail = format!("{method:?}({value:?})");
            ("builtin-string", Some(detail))
        }
        Stage::StringPairBuiltin {
            method,
            first,
            second,
        } => {
            let detail = format!("{method:?}({first:?}, {second:?})");
            ("builtin-string-pair", Some(detail))
        }
        Stage::IntRangeBuiltin { method, start, end } => {
            let detail = format!("{method:?}({start}, {end:?})");
            ("builtin-range", Some(detail))
        }
    }
}
/// Renders a fallback boundary as an optional label.
///
/// `FallbackBoundary::None` yields `None`; a legacy stage yields
/// `legacy-stage:<index>`; materialized execution yields
/// `materialized-execution`.
fn fallback_boundary_label(boundary: FallbackBoundary) -> Option<String> {
    let label = match boundary {
        FallbackBoundary::None => return None,
        FallbackBoundary::MaterializedExecution => String::from("materialized-execution"),
        FallbackBoundary::LegacyStage { index } => format!("legacy-stage:{index}"),
    };
    Some(label)
}
/// Maps a physical execution path to its fixed lowercase label.
///
/// The match is deliberately exhaustive (no wildcard arm) so that adding a
/// new `PhysicalExecPath` variant forces this function to be updated.
fn exec_path_label(path: PhysicalExecPath) -> &'static str {
    match path {
        PhysicalExecPath::Legacy => "legacy",
        PhysicalExecPath::Composed => "composed",
        PhysicalExecPath::Columnar => "columnar",
        PhysicalExecPath::Indexed => "indexed",
    }
}
#[cfg(test)]
mod tests {
    /// Plans a small query and checks that the first pipeline's inspection
    /// reports the expected source label and non-empty stage, sink, and
    /// source-demand fields.
    #[test]
    fn pipeline_inspection_exports_stage_sink_and_demand() {
        let query = "$.books.filter(price > 10).take(2).map(title)";
        let context = crate::plan::physical::PlanningContext::bytes();
        let plan = crate::plan::physical::plan_query_with_context(query, context);

        let inspection = super::inspect_first_pipeline(&plan).expect("pipeline");

        assert_eq!(inspection.source, "field-chain:$.books");
        assert!(!inspection.stages.is_empty());
        assert!(!inspection.sink.is_empty());
        assert!(!inspection.source_demand.is_empty());
    }
}