use crate::planner::TracingQueryPlanner;
use crate::rule_options::RuleInstrumentationOptions;
use datafusion::common::config::ConfigOptions;
use datafusion::common::tree_node::{Transformed, TreeNodeRewriter};
use datafusion::common::{DataFusionError, Result};
use datafusion::execution::SessionState;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::logical_expr::LogicalPlan;
use datafusion::optimizer::analyzer::AnalyzerRule;
use datafusion::optimizer::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::{ExecutionPlan, displayable};
use similar::{ChangeTag, TextDiff};
use std::cell::RefCell;
use std::fmt::Debug;
use std::sync::Arc;
use tracing::Level;
use tracing::Span;
/// Rule names reported by the phase sentinel rules from their `name()` methods.
mod sentinel_names {
/// Name reported by the analyzer phase sentinel.
pub const ANALYZER: &str = "__tracing_analyzer_phase";
/// Name reported by the logical optimizer phase sentinel.
pub const OPTIMIZER: &str = "__tracing_optimizer_phase";
/// Name reported by the physical optimizer phase sentinel.
pub const PHYSICAL_OPTIMIZER: &str = "__tracing_physical_optimizer_phase";
}
/// Names handed to the phase span factory when a sentinel opens a phase.
mod phase_names {
/// Passed to the span factory by the analyzer sentinel.
pub const ANALYZE_LOGICAL_PLAN: &str = "analyze_logical_plan";
/// Passed to the span factory by the logical optimizer sentinel.
pub const OPTIMIZE_LOGICAL_PLAN: &str = "optimize_logical_plan";
/// Passed to the span factory by the physical optimizer sentinel.
pub const OPTIMIZE_PHYSICAL_PLAN: &str = "optimize_physical_plan";
}
/// Renders a plan as text so that two versions of it can be compared and diffed.
trait FormatPlan {
/// Returns the textual form of the plan used for before/after comparison.
fn format_for_diff(&self) -> String;
}
impl FormatPlan for LogicalPlan {
    /// Renders the logical plan through its `display_indent_schema` view.
    fn format_for_diff(&self) -> String {
        let rendered = self.display_indent_schema();
        rendered.to_string()
    }
}
impl FormatPlan for Arc<dyn ExecutionPlan> {
    /// Renders the physical plan with `displayable(..).indent(true)`.
    fn format_for_diff(&self) -> String {
        let plan: &dyn ExecutionPlan = self.as_ref();
        // Kept as a single expression so the display wrapper lives for the
        // whole formatting call.
        displayable(plan).indent(true).to_string()
    }
}
/// Callback type that builds the span for a planning phase from the phase name.
pub(crate) type PhaseSpanCreateFn = dyn Fn(&str) -> Span + Send + Sync;
/// Identifies which planning phase a `PlanningContext` belongs to.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PlanningPhase {
/// Set by the analyzer phase sentinel.
Analyzer,
/// Set by the logical optimizer phase sentinel.
Optimizer,
/// Set by the physical optimizer phase sentinel.
PhysicalOptimizer,
}
/// Per-thread state for the planning phase that is currently open.
struct PlanningContext {
/// Phase that opened this context.
phase: PlanningPhase,
/// Guard for the entered phase span; held so the span stays entered while
/// the context exists.
_entered: tracing::span::EnteredSpan,
/// Plan text captured when the phase opened, or `None` when it was not captured.
plan_before: Option<String>,
/// Names of rules reported through `record_modified_rule_in_context`.
effective_rules: Vec<String>,
}
/// Counts optimizer passes observed under one parent span.
struct OptimizerPassTracker {
/// Id of the parent span the current count belongs to, if any.
parent_span_id: Option<tracing::span::Id>,
/// Number of passes handed out so far for `parent_span_id`.
pass_count: usize,
}
impl OptimizerPassTracker {
    /// Creates a tracker with no remembered parent span and a zero pass count.
    const fn new() -> Self {
        Self {
            pass_count: 0,
            parent_span_id: None,
        }
    }

    /// Returns the zero-based index of the current pass and advances the counter.
    ///
    /// When `current_parent_id` differs from the remembered parent id, the
    /// remembered id is replaced and counting restarts from zero first.
    fn get_and_increment(
        &mut self,
        current_parent_id: Option<tracing::span::Id>,
    ) -> usize {
        if current_parent_id != self.parent_span_id {
            *self = Self {
                parent_span_id: current_parent_id,
                pass_count: 0,
            };
        }
        let current_pass = self.pass_count;
        self.pass_count = current_pass + 1;
        current_pass
    }

    /// Forgets the remembered parent span and restarts counting from zero.
    fn reset(&mut self) {
        *self = Self::new();
    }
}
thread_local! {
// Context of the planning phase currently open on this thread; `None` when no
// phase is open.
static PLANNING_CONTEXT: RefCell<Option<PlanningContext>> = const { RefCell::new(None) };
// Optimizer pass counter for this thread.
static OPTIMIZER_PASS_TRACKER: RefCell<OptimizerPassTracker> = const { RefCell::new(OptimizerPassTracker::new()) };
}
fn record_modified_rule_in_context(rule_name: &str) {
PLANNING_CONTEXT.with(|cell| {
if let Some(ref mut ctx) = *cell.borrow_mut() {
ctx.effective_rules.push(rule_name.to_string());
}
});
}
fn close_phase_span<P: FormatPlan>(ctx: PlanningContext, plan_after: &P) {
let current = Span::current();
if !current.is_disabled() {
if !ctx.effective_rules.is_empty() {
current.record("datafusion.effective_rules", ctx.effective_rules.join(", "));
}
if let Some(before) = &ctx.plan_before {
let after = plan_after.format_for_diff();
if before != &after {
let diff = generate_plan_diff(before, &after);
current.record("datafusion.plan_diff", diff);
}
}
}
}
/// Analyzer rule that opens and closes the analyzer phase span.
struct AnalyzerPhaseSentinel {
    /// Factory invoked with the phase name to create the phase span.
    phase_span_create_fn: Arc<PhaseSpanCreateFn>,
    /// Whether the plan text is captured when the phase opens.
    plan_diff: bool,
}

impl Debug for AnalyzerPhaseSentinel {
    /// Writes only the type name; no fields are rendered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("AnalyzerPhaseSentinel")
    }
}
impl AnalyzerRule for AnalyzerPhaseSentinel {
    /// Opens or closes the analyzer phase; the plan is always returned unchanged.
    ///
    /// If this thread's planning context already belongs to the analyzer
    /// phase, the context is taken and closed. Otherwise a new phase span is
    /// created and entered, and a fresh analyzer context replaces whatever
    /// context was stored before.
    fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
        PLANNING_CONTEXT.with(|cell| {
            let mut slot = cell.borrow_mut();
            let phase_is_open = matches!(
                slot.as_ref(),
                Some(ctx) if ctx.phase == PlanningPhase::Analyzer
            );
            if phase_is_open {
                if let Some(ctx) = slot.take() {
                    close_phase_span(ctx, &plan);
                }
                return;
            }

            let span = (self.phase_span_create_fn)(phase_names::ANALYZE_LOGICAL_PLAN);
            // The plan text is only rendered when a diff was requested and the
            // span is enabled.
            let capture_plan = self.plan_diff && !span.is_disabled();
            let plan_before = capture_plan.then(|| plan.format_for_diff());
            *slot = Some(PlanningContext {
                phase: PlanningPhase::Analyzer,
                _entered: span.entered(),
                plan_before,
                effective_rules: Vec::new(),
            });
        });
        Ok(plan)
    }

    /// Returns the sentinel's fixed rule name.
    fn name(&self) -> &str {
        sentinel_names::ANALYZER
    }
}
/// Optimizer rule that opens and closes the logical optimizer phase span.
struct OptimizerPhaseSentinel {
    /// Factory invoked with the phase name to create the phase span.
    phase_span_create_fn: Arc<PhaseSpanCreateFn>,
    /// Whether the plan text is captured when the phase opens.
    plan_diff: bool,
}

impl Debug for OptimizerPhaseSentinel {
    /// Writes only the type name; no fields are rendered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("OptimizerPhaseSentinel")
    }
}
impl OptimizerRule for OptimizerPhaseSentinel {
    /// Returns the sentinel's fixed rule name.
    fn name(&self) -> &str {
        sentinel_names::OPTIMIZER
    }

    /// Reports no apply order for this rule.
    fn apply_order(&self) -> Option<ApplyOrder> {
        None
    }

    // The `allow` suggests this trait method is deprecated upstream; it is
    // still overridden here.
    #[allow(deprecated)]
    fn supports_rewrite(&self) -> bool {
        true
    }

    /// Opens or closes the logical optimizer phase; the plan is returned
    /// untransformed.
    ///
    /// If this thread's planning context already belongs to the optimizer
    /// phase, the context is taken and closed. Otherwise a pass index is
    /// obtained from the pass tracker (keyed by the id of the span current at
    /// that moment), a new phase span is created and entered, and a fresh
    /// optimizer context replaces whatever context was stored before.
    fn rewrite(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
        PLANNING_CONTEXT.with(|cell| {
            let mut slot = cell.borrow_mut();
            let phase_is_open = matches!(
                slot.as_ref(),
                Some(ctx) if ctx.phase == PlanningPhase::Optimizer
            );
            if phase_is_open {
                if let Some(ctx) = slot.take() {
                    close_phase_span(ctx, &plan);
                }
                return;
            }

            let current_parent_id = Span::current().id();
            let pass = OPTIMIZER_PASS_TRACKER
                .with(|tracker| tracker.borrow_mut().get_and_increment(current_parent_id));
            let span = (self.phase_span_create_fn)(phase_names::OPTIMIZE_LOGICAL_PLAN);
            let span_enabled = !span.is_disabled();
            if span_enabled {
                let max_passes = config.options().optimizer.max_passes;
                // The tracker hands out zero-based indices; the span gets a
                // one-based pass number.
                span.record("datafusion.optimizer.pass", pass + 1);
                span.record("datafusion.optimizer.max_passes", max_passes as i64);
            }
            let plan_before = (self.plan_diff && span_enabled).then(|| plan.format_for_diff());
            *slot = Some(PlanningContext {
                phase: PlanningPhase::Optimizer,
                _entered: span.entered(),
                plan_before,
                effective_rules: Vec::new(),
            });
        });
        Ok(Transformed::no(plan))
    }
}
/// Physical optimizer rule that opens and closes the physical optimizer phase span.
struct PhysicalOptimizerPhaseSentinel {
    /// Factory invoked with the phase name to create the phase span.
    phase_span_create_fn: Arc<PhaseSpanCreateFn>,
    /// Whether the plan text is captured when the phase opens.
    plan_diff: bool,
}

impl Debug for PhysicalOptimizerPhaseSentinel {
    /// Writes only the type name; no fields are rendered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("PhysicalOptimizerPhaseSentinel")
    }
}
impl PhysicalOptimizerRule for PhysicalOptimizerPhaseSentinel {
    /// Opens or closes the physical optimizer phase; the plan is returned unchanged.
    ///
    /// If this thread's planning context already belongs to the physical
    /// optimizer phase, the context is taken and closed and the optimizer pass
    /// tracker is reset. Otherwise a new phase span is created and entered,
    /// and a fresh context replaces whatever context was stored before.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        PLANNING_CONTEXT.with(|cell| {
            let mut slot = cell.borrow_mut();
            let phase_is_open = matches!(
                slot.as_ref(),
                Some(ctx) if ctx.phase == PlanningPhase::PhysicalOptimizer
            );
            if phase_is_open {
                if let Some(ctx) = slot.take() {
                    close_phase_span(ctx, &plan);
                }
                OPTIMIZER_PASS_TRACKER.with(|tracker| tracker.borrow_mut().reset());
                return;
            }

            let span = (self.phase_span_create_fn)(phase_names::OPTIMIZE_PHYSICAL_PLAN);
            let capture_plan = self.plan_diff && !span.is_disabled();
            let plan_before = capture_plan.then(|| plan.format_for_diff());
            *slot = Some(PlanningContext {
                phase: PlanningPhase::PhysicalOptimizer,
                _entered: span.entered(),
                plan_before,
                effective_rules: Vec::new(),
            });
        });
        Ok(plan)
    }

    /// Returns the sentinel's fixed rule name.
    fn name(&self) -> &str {
        sentinel_names::PHYSICAL_OPTIMIZER
    }

    /// Always reports `true`.
    fn schema_check(&self) -> bool {
        true
    }
}
/// Tree rewriter that applies one optimizer rule to the nodes it visits, in a
/// fixed direction.
struct SingleSpanTreeTraverser<'a> {
    /// Direction in which `rule` is applied.
    apply_order: ApplyOrder,
    /// Rule whose `rewrite` is invoked on visited nodes.
    rule: &'a dyn OptimizerRule,
    /// Optimizer configuration forwarded to `rule`.
    config: &'a dyn OptimizerConfig,
}

impl<'a> SingleSpanTreeTraverser<'a> {
    /// Bundles the traversal direction, the rule and the configuration.
    fn new(
        apply_order: ApplyOrder,
        rule: &'a dyn OptimizerRule,
        config: &'a dyn OptimizerConfig,
    ) -> Self {
        Self {
            rule,
            config,
            apply_order,
        }
    }
}
impl TreeNodeRewriter for SingleSpanTreeTraverser<'_> {
    type Node = LogicalPlan;

    /// Applies the rule to `node` when the order is `TopDown`; otherwise
    /// returns the node untransformed.
    fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
        match self.apply_order {
            ApplyOrder::TopDown => self.rule.rewrite(node, self.config),
            _ => Ok(Transformed::no(node)),
        }
    }

    /// Applies the rule to `node` when the order is `BottomUp`; otherwise
    /// returns the node untransformed.
    fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
        match self.apply_order {
            ApplyOrder::BottomUp => self.rule.rewrite(node, self.config),
            _ => Ok(Transformed::no(node)),
        }
    }
}
/// Builds a line-by-line diff of `before` and `after`.
///
/// Every line of the diff is prefixed with `-` (deleted), `+` (inserted) or a
/// single space (unchanged).
fn generate_plan_diff(before: &str, after: &str) -> String {
    let diff = TextDiff::from_lines(before, after);
    diff.iter_all_changes()
        .map(|change| {
            let marker = match change.tag() {
                ChangeTag::Equal => " ",
                ChangeTag::Insert => "+",
                ChangeTag::Delete => "-",
            };
            format!("{marker}{change}")
        })
        .collect()
}
/// Marks a rule span as "modified" when the plan text changed.
///
/// * `before_str` / `after_str` - plan text before and after the rule ran.
/// * `span` - the rule span to annotate.
/// * `record_diff` - whether a `datafusion.plan_diff` attribute is recorded.
/// * `rule_name` - name of the rule that ran.
///
/// Nothing happens when both texts are equal. Otherwise the optional diff is
/// recorded, `otel.name` is set to `"<rule_name> (modified)"`, and the rule
/// name is reported to the planning context.
fn detect_and_record_modification(
    before_str: &str,
    after_str: &str,
    span: &Span,
    record_diff: bool,
    rule_name: &str,
) {
    if before_str != after_str {
        if record_diff {
            span.record(
                "datafusion.plan_diff",
                generate_plan_diff(before_str, after_str),
            );
        }
        let modified_name = format!("{rule_name} (modified)");
        span.record("otel.name", modified_name);
        record_modified_rule_in_context(rule_name);
    }
}
/// Callback type that builds the span for a single rule from the rule name.
pub(crate) type RuleSpanCreateFn = dyn Fn(&str) -> Span + Send + Sync;
/// Wraps an analyzer rule so that each invocation runs inside its own span.
struct InstrumentedAnalyzerRule {
    /// The wrapped rule.
    inner: Arc<dyn AnalyzerRule + Send + Sync>,
    /// Instrumentation options; `plan_diff` controls diff recording.
    options: RuleInstrumentationOptions,
    /// Factory invoked with the rule name to create the rule span.
    span_create_fn: Arc<RuleSpanCreateFn>,
}

impl Debug for InstrumentedAnalyzerRule {
    /// Renders `inner` and `options`; the span factory is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("InstrumentedAnalyzerRule");
        out.field("inner", &self.inner);
        out.field("options", &self.options);
        out.finish()
    }
}

impl InstrumentedAnalyzerRule {
    /// Wraps `inner` with the given options and span factory.
    fn new(
        inner: Arc<dyn AnalyzerRule + Send + Sync>,
        options: RuleInstrumentationOptions,
        span_create_fn: Arc<RuleSpanCreateFn>,
    ) -> Self {
        Self {
            span_create_fn,
            options,
            inner,
        }
    }
}
impl AnalyzerRule for InstrumentedAnalyzerRule {
    /// Runs the wrapped rule inside a rule span and returns its result unchanged.
    ///
    /// When the span is enabled, the input plan is cloned beforehand. If the
    /// resulting plan differs from that clone, the modification is recorded on
    /// the span. If the wrapped rule fails, an error event is emitted.
    fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
        let span = (self.span_create_fn)(self.name());
        let _enter = span.enter();
        // A snapshot exists exactly when the span is enabled, so it also gates
        // all of the reporting below.
        let snapshot = (!span.is_disabled()).then(|| plan.clone());
        let outcome = self.inner.analyze(plan, config);
        if let Some(original) = snapshot {
            match &outcome {
                Ok(analyzed) if &original != analyzed => {
                    let before_str = original.format_for_diff();
                    let after_str = analyzed.format_for_diff();
                    detect_and_record_modification(
                        &before_str,
                        &after_str,
                        &span,
                        self.options.plan_diff,
                        self.name(),
                    );
                }
                Ok(_) => {}
                Err(e) => {
                    tracing::error!(error = %e, "AnalyzerRule failed");
                }
            }
        }
        outcome
    }

    /// Returns the wrapped rule's name.
    fn name(&self) -> &str {
        self.inner.name()
    }
}
/// Wraps an optimizer rule so that each invocation runs inside its own span.
struct InstrumentedOptimizerRule {
    /// The wrapped rule.
    inner: Arc<dyn OptimizerRule + Send + Sync>,
    /// Instrumentation options; `plan_diff` controls diff recording.
    options: RuleInstrumentationOptions,
    /// Factory invoked with the rule name to create the rule span.
    span_create_fn: Arc<RuleSpanCreateFn>,
}

impl Debug for InstrumentedOptimizerRule {
    /// Renders `inner` and `options`; the span factory is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("InstrumentedOptimizerRule");
        out.field("inner", &self.inner);
        out.field("options", &self.options);
        out.finish()
    }
}

impl InstrumentedOptimizerRule {
    /// Wraps `inner` with the given options and span factory.
    fn new(
        inner: Arc<dyn OptimizerRule + Send + Sync>,
        options: RuleInstrumentationOptions,
        span_create_fn: Arc<RuleSpanCreateFn>,
    ) -> Self {
        Self {
            span_create_fn,
            options,
            inner,
        }
    }

    /// Applies the wrapped rule to the whole plan.
    ///
    /// If the wrapped rule declares an apply order, the plan (including
    /// subqueries) is traversed here with a `SingleSpanTreeTraverser` in that
    /// order. Otherwise the rule's `rewrite` is called once on the plan.
    fn apply_inner(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
        if let Some(apply_order) = self.inner.apply_order() {
            let mut traverser =
                SingleSpanTreeTraverser::new(apply_order, self.inner.as_ref(), config);
            plan.rewrite_with_subqueries(&mut traverser)
        } else {
            self.inner.rewrite(plan, config)
        }
    }
}
impl OptimizerRule for InstrumentedOptimizerRule {
    /// Returns the wrapped rule's name.
    fn name(&self) -> &str {
        self.inner.name()
    }

    /// Reports no apply order; traversal for the wrapped rule is done in
    /// `apply_inner`.
    fn apply_order(&self) -> Option<ApplyOrder> {
        None
    }

    // The `allow` suggests this trait method is deprecated upstream; the
    // wrapped rule's answer is forwarded.
    #[allow(deprecated)]
    fn supports_rewrite(&self) -> bool {
        self.inner.supports_rewrite()
    }

    /// Runs the wrapped rule inside a rule span and returns its result unchanged.
    ///
    /// When the span is enabled, the input plan is cloned beforehand. If the
    /// rule reports a transformation, the modification is recorded on the
    /// span. If the rule fails, an error event is emitted.
    fn rewrite(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
        let span = (self.span_create_fn)(self.name());
        let _enter = span.enter();
        // A snapshot exists exactly when the span is enabled.
        let snapshot = (!span.is_disabled()).then(|| plan.clone());
        let outcome = self.apply_inner(plan, config);
        let Some(before) = snapshot else {
            return outcome;
        };
        match &outcome {
            Ok(transformed) if transformed.transformed => {
                let before_str = before.format_for_diff();
                let after_str = transformed.data.format_for_diff();
                detect_and_record_modification(
                    &before_str,
                    &after_str,
                    &span,
                    self.options.plan_diff,
                    self.name(),
                );
            }
            Ok(_) => {}
            Err(e) => {
                tracing::error!(error = %e, "OptimizerRule failed");
            }
        }
        outcome
    }
}
/// Wraps a physical optimizer rule so that each invocation runs inside its own span.
struct InstrumentedPhysicalOptimizerRule {
    /// The wrapped rule.
    inner: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
    /// Instrumentation options; `plan_diff` controls diff recording.
    options: RuleInstrumentationOptions,
    /// Factory invoked with the rule name to create the rule span.
    span_create_fn: Arc<RuleSpanCreateFn>,
}

impl Debug for InstrumentedPhysicalOptimizerRule {
    /// Renders `inner` and `options`; the span factory is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("InstrumentedPhysicalOptimizerRule");
        out.field("inner", &self.inner);
        out.field("options", &self.options);
        out.finish()
    }
}

impl InstrumentedPhysicalOptimizerRule {
    /// Wraps `inner` with the given options and span factory.
    fn new(
        inner: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
        options: RuleInstrumentationOptions,
        span_create_fn: Arc<RuleSpanCreateFn>,
    ) -> Self {
        Self {
            span_create_fn,
            options,
            inner,
        }
    }
}
impl PhysicalOptimizerRule for InstrumentedPhysicalOptimizerRule {
    /// Runs the wrapped rule inside a rule span and returns its result unchanged.
    ///
    /// When the span is enabled, a handle to the input plan is kept. If the
    /// rule returns a different `Arc` (by pointer), the modification is
    /// recorded on the span. If the rule fails, an error event is emitted.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let span = (self.span_create_fn)(self.name());
        let _enter = span.enter();
        // Cloning the `Arc` only copies the handle, not the plan.
        let snapshot = (!span.is_disabled()).then(|| Arc::clone(&plan));
        let outcome = self.inner.optimize(plan, config);
        let Some(old_plan) = snapshot else {
            return outcome;
        };
        match &outcome {
            Ok(new_plan) if !Arc::ptr_eq(&old_plan, new_plan) => {
                let before_str = old_plan.format_for_diff();
                let after_str = new_plan.format_for_diff();
                detect_and_record_modification(
                    &before_str,
                    &after_str,
                    &span,
                    self.options.plan_diff,
                    self.name(),
                );
            }
            Ok(_) => {}
            Err(e) => {
                tracing::error!(error = %e, "PhysicalOptimizerRule failed");
            }
        }
        outcome
    }

    /// Returns the wrapped rule's name.
    fn name(&self) -> &str {
        self.inner.name()
    }

    /// Forwards the wrapped rule's schema-check setting.
    fn schema_check(&self) -> bool {
        self.inner.schema_check()
    }
}
/// Rebuilds `state` with its analyzer, optimizer and physical optimizer rule
/// lists replaced by instrumented versions.
///
/// * `state` - the session state to instrument.
/// * `options` - per-phase instrumentation levels and the plan-diff switch.
/// * `span_create_fn` - factory for per-rule spans.
/// * `phase_span_create_fn` - factory for per-phase spans.
/// * `span_level` - level forwarded to
///   `TracingQueryPlanner::instrument_state_with_level`.
///
/// The rebuilt state is additionally passed through
/// `TracingQueryPlanner::instrument_state_with_level` when phase spans are
/// enabled for the physical optimizer.
#[doc(hidden)]
pub fn instrument_session_state(
    state: SessionState,
    options: RuleInstrumentationOptions,
    span_create_fn: Arc<RuleSpanCreateFn>,
    phase_span_create_fn: Arc<PhaseSpanCreateFn>,
    span_level: Level,
) -> SessionState {
    // Snapshot and instrument each rule list before the state is consumed by
    // the builder.
    let analyzer_rules = state.analyzer().rules.clone();
    let analyzers = instrument_analyzer_rules(
        analyzer_rules,
        &options,
        &span_create_fn,
        &phase_span_create_fn,
    );
    let optimizer_rules = state.optimizers().to_vec();
    let optimizers = instrument_optimizer_rules(
        optimizer_rules,
        &options,
        &span_create_fn,
        &phase_span_create_fn,
    );
    let physical_rules = state.physical_optimizers().to_vec();
    let physical_optimizers = instrument_physical_optimizer_rules(
        physical_rules,
        &options,
        &span_create_fn,
        &phase_span_create_fn,
    );

    let instrumented = SessionStateBuilder::from(state)
        .with_analyzer_rules(analyzers)
        .with_optimizer_rules(optimizers)
        .with_physical_optimizer_rules(physical_optimizers)
        .build();

    if !options.physical_optimizer.phase_span_enabled() {
        return instrumented;
    }
    TracingQueryPlanner::instrument_state_with_level(instrumented, span_level)
}
/// Brackets the analyzer rules with phase sentinels and optionally wraps each rule.
///
/// The input is returned untouched when phase spans are disabled for the
/// analyzer or when there are no rules. Otherwise the result is: an opening
/// sentinel, every rule (wrapped in `InstrumentedAnalyzerRule` when rule spans
/// are enabled), and a closing sentinel.
fn instrument_analyzer_rules(
    rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
    options: &RuleInstrumentationOptions,
    span_create_fn: &Arc<RuleSpanCreateFn>,
    phase_span_create_fn: &Arc<PhaseSpanCreateFn>,
) -> Vec<Arc<dyn AnalyzerRule + Send + Sync>> {
    let level = options.analyzer;
    let instrument = level.phase_span_enabled() && !rules.is_empty();
    if !instrument {
        return rules;
    }

    // The same kind of sentinel is used on both ends of the rule list.
    let sentinel = || -> Arc<dyn AnalyzerRule + Send + Sync> {
        Arc::new(AnalyzerPhaseSentinel {
            phase_span_create_fn: Arc::clone(phase_span_create_fn),
            plan_diff: options.plan_diff,
        })
    };

    let mut instrumented = Vec::with_capacity(rules.len() + 2);
    instrumented.push(sentinel());
    instrumented.extend(rules.into_iter().map(
        |rule| -> Arc<dyn AnalyzerRule + Send + Sync> {
            if level.rule_spans_enabled() {
                Arc::new(InstrumentedAnalyzerRule::new(
                    rule,
                    options.clone(),
                    Arc::clone(span_create_fn),
                ))
            } else {
                rule
            }
        },
    ));
    instrumented.push(sentinel());
    instrumented
}
/// Brackets the optimizer rules with phase sentinels and optionally wraps each rule.
///
/// The input is returned untouched when phase spans are disabled for the
/// optimizer or when there are no rules. Otherwise the result is: an opening
/// sentinel, every rule (wrapped in `InstrumentedOptimizerRule` when rule
/// spans are enabled), and a closing sentinel.
fn instrument_optimizer_rules(
    rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
    options: &RuleInstrumentationOptions,
    span_create_fn: &Arc<RuleSpanCreateFn>,
    phase_span_create_fn: &Arc<PhaseSpanCreateFn>,
) -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
    let level = options.optimizer;
    let instrument = level.phase_span_enabled() && !rules.is_empty();
    if !instrument {
        return rules;
    }

    // The same kind of sentinel is used on both ends of the rule list.
    let sentinel = || -> Arc<dyn OptimizerRule + Send + Sync> {
        Arc::new(OptimizerPhaseSentinel {
            phase_span_create_fn: Arc::clone(phase_span_create_fn),
            plan_diff: options.plan_diff,
        })
    };

    let mut instrumented = Vec::with_capacity(rules.len() + 2);
    instrumented.push(sentinel());
    instrumented.extend(rules.into_iter().map(
        |rule| -> Arc<dyn OptimizerRule + Send + Sync> {
            if level.rule_spans_enabled() {
                Arc::new(InstrumentedOptimizerRule::new(
                    rule,
                    options.clone(),
                    Arc::clone(span_create_fn),
                ))
            } else {
                rule
            }
        },
    ));
    instrumented.push(sentinel());
    instrumented
}
/// Brackets the physical optimizer rules with phase sentinels and optionally
/// wraps each rule.
///
/// The input is returned untouched when phase spans are disabled for the
/// physical optimizer or when there are no rules. Otherwise the result is: an
/// opening sentinel, every rule (wrapped in
/// `InstrumentedPhysicalOptimizerRule` when rule spans are enabled), and a
/// closing sentinel.
fn instrument_physical_optimizer_rules(
    rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
    options: &RuleInstrumentationOptions,
    span_create_fn: &Arc<RuleSpanCreateFn>,
    phase_span_create_fn: &Arc<PhaseSpanCreateFn>,
) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
    let level = options.physical_optimizer;
    let instrument = level.phase_span_enabled() && !rules.is_empty();
    if !instrument {
        return rules;
    }

    // The same kind of sentinel is used on both ends of the rule list.
    let sentinel = || -> Arc<dyn PhysicalOptimizerRule + Send + Sync> {
        Arc::new(PhysicalOptimizerPhaseSentinel {
            phase_span_create_fn: Arc::clone(phase_span_create_fn),
            plan_diff: options.plan_diff,
        })
    };

    let mut instrumented = Vec::with_capacity(rules.len() + 2);
    instrumented.push(sentinel());
    instrumented.extend(rules.into_iter().map(
        |rule| -> Arc<dyn PhysicalOptimizerRule + Send + Sync> {
            if level.rule_spans_enabled() {
                Arc::new(InstrumentedPhysicalOptimizerRule::new(
                    rule,
                    options.clone(),
                    Arc::clone(span_create_fn),
                ))
            } else {
                rule
            }
        },
    ));
    instrumented.push(sentinel());
    instrumented
}