use crate::planner::TracingQueryPlanner;
use crate::rule_options::RuleInstrumentationOptions;
use datafusion::common::config::ConfigOptions;
use datafusion::common::tree_node::{Transformed, TreeNodeRewriter};
use datafusion::common::{DataFusionError, Result};
use datafusion::execution::SessionState;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::logical_expr::LogicalPlan;
use datafusion::optimizer::analyzer::AnalyzerRule;
use datafusion::optimizer::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::{ExecutionPlan, displayable};
use similar::{ChangeTag, TextDiff};
use std::cell::RefCell;
use std::fmt::Debug;
use std::sync::Arc;
use tracing::Level;
use tracing::Span;
/// Rule names reported by the phase sentinel rules through their `name()` methods.
mod sentinel_names {
/// Name returned by `AnalyzerPhaseSentinel::name`.
pub const ANALYZER: &str = "__tracing_analyzer_phase";
/// Name returned by `OptimizerPhaseSentinel::name`.
pub const OPTIMIZER: &str = "__tracing_optimizer_phase";
/// Name returned by `PhysicalOptimizerPhaseSentinel::name`.
pub const PHYSICAL_OPTIMIZER: &str = "__tracing_physical_optimizer_phase";
}
/// Phase names handed to the phase span factory when a sentinel opens a phase span.
mod phase_names {
/// Passed by the analyzer sentinel.
pub const ANALYZE_LOGICAL_PLAN: &str = "analyze_logical_plan";
/// Passed by the logical optimizer sentinel.
pub const OPTIMIZE_LOGICAL_PLAN: &str = "optimize_logical_plan";
/// Passed by the physical optimizer sentinel.
pub const OPTIMIZE_PHYSICAL_PLAN: &str = "optimize_physical_plan";
}
/// Renders a plan as text so that two versions of it can be compared and diffed.
trait FormatPlan {
/// Returns the textual form of the plan used as input to the plan diff.
fn format_for_diff(&self) -> String;
}
impl FormatPlan for LogicalPlan {
    /// Renders the logical plan through its `display_indent_schema` view.
    fn format_for_diff(&self) -> String {
        let rendered = self.display_indent_schema();
        rendered.to_string()
    }
}
impl FormatPlan for Arc<dyn ExecutionPlan> {
    /// Renders the physical plan through `displayable(..).indent(true)`.
    fn format_for_diff(&self) -> String {
        let plan: &dyn ExecutionPlan = self.as_ref();
        let rendered = displayable(plan).indent(true);
        rendered.to_string()
    }
}
/// Callback that builds the span for a planning phase; the sentinels call it with one of the
/// `phase_names` constants.
pub(crate) type PhaseSpanCreateFn = dyn Fn(&str) -> Span + Send + Sync;
/// Identifies which planning phase the currently open `PlanningContext` belongs to.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PlanningPhase {
/// Opened by `AnalyzerPhaseSentinel`.
Analyzer,
/// Opened by `OptimizerPhaseSentinel`.
Optimizer,
/// Opened by `PhysicalOptimizerPhaseSentinel`.
PhysicalOptimizer,
}
/// Per-thread state of the phase span that is currently open.
struct PlanningContext {
// Phase whose sentinel opened this context.
phase: PlanningPhase,
// Guard for the entered phase span; held only so the span stays entered while the
// context is alive.
_entered: tracing::span::EnteredSpan,
// Formatted plan captured when the phase opened; `None` unless plan diffing is on and the
// span is enabled.
plan_before: Option<String>,
// Names of rules that reported a modification while this phase was open.
effective_rules: Vec<String>,
}
/// Counts logical optimizer passes, keyed by the id of the span that was current when the
/// pass started.
struct OptimizerPassTracker {
// Id of the span under which the counted passes ran; `None` when there was no such span
// or after a reset.
parent_span_id: Option<tracing::span::Id>,
// Number of passes already handed out for `parent_span_id`.
pass_count: usize,
}
impl OptimizerPassTracker {
    /// Creates a tracker with no parent span and a zero pass count.
    const fn new() -> Self {
        Self {
            parent_span_id: None,
            pass_count: 0,
        }
    }

    /// Returns the zero-based index of the pass that is starting and advances the counter.
    ///
    /// The counter restarts from zero whenever `current_parent_id` differs from the parent
    /// id seen on the previous call.
    fn get_and_increment(
        &mut self,
        current_parent_id: Option<tracing::span::Id>,
    ) -> usize {
        let same_parent = self.parent_span_id == current_parent_id;
        if !same_parent {
            self.pass_count = 0;
            self.parent_span_id = current_parent_id;
        }
        let issued = self.pass_count;
        self.pass_count = issued + 1;
        issued
    }

    /// Returns the tracker to its freshly constructed state.
    fn reset(&mut self) {
        *self = Self::new();
    }
}
thread_local! {
// Context of the phase span currently open on this thread, if any.
static PLANNING_CONTEXT: RefCell<Option<PlanningContext>> = const { RefCell::new(None) };
// Pass counter used by the logical optimizer sentinel on this thread.
static OPTIMIZER_PASS_TRACKER: RefCell<OptimizerPassTracker> = const { RefCell::new(OptimizerPassTracker::new()) };
}
fn record_modified_rule_in_context(rule_name: &str) {
PLANNING_CONTEXT.with(|cell| {
if let Some(ref mut ctx) = *cell.borrow_mut() {
ctx.effective_rules.push(rule_name.to_string());
}
});
}
/// Discards the planning context open on this thread, if any, without recording anything
/// on its span.
///
/// When the discarded context belonged to the logical or physical optimizer phase, the
/// optimizer pass tracker is reset as well.
fn drop_planning_context() {
    let Some(ctx) = PLANNING_CONTEXT.with(|cell| cell.borrow_mut().take()) else {
        return;
    };
    match ctx.phase {
        PlanningPhase::Optimizer | PlanningPhase::PhysicalOptimizer => {
            OPTIMIZER_PASS_TRACKER.with(|cell| cell.borrow_mut().reset());
        }
        PlanningPhase::Analyzer => {}
    }
    // Dropping the context releases the entered-span guard it holds.
    drop(ctx);
}
/// Records the phase summary on the current span and then lets `ctx` drop.
///
/// On an enabled span this records the comma-separated list of effective rules (when
/// non-empty) and, when a "before" plan was captured and differs from `plan_after`, the
/// plan diff. `ctx` is owned by this function, so its entered-span guard is released on
/// return.
fn close_phase_span<P: FormatPlan>(ctx: PlanningContext, plan_after: &P) {
    let phase_span = Span::current();
    if phase_span.is_disabled() {
        return;
    }

    if !ctx.effective_rules.is_empty() {
        let joined = ctx.effective_rules.join(", ");
        phase_span.record("datafusion.effective_rules", joined);
    }

    let Some(before) = ctx.plan_before.as_deref() else {
        return;
    };
    let after = plan_after.format_for_diff();
    if before != after.as_str() {
        phase_span.record("datafusion.plan_diff", generate_plan_diff(before, &after));
    }
}
/// Analyzer rule placed at both ends of the analyzer rule list to open and close the
/// analyzer phase span.
struct AnalyzerPhaseSentinel {
    /// Factory for the phase span.
    phase_span_create_fn: Arc<PhaseSpanCreateFn>,
    /// Whether to capture the plan when the phase opens so a diff can be recorded on close.
    plan_diff: bool,
}

impl Debug for AnalyzerPhaseSentinel {
    /// Prints only the type name; no fields are shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("AnalyzerPhaseSentinel")
    }
}
impl AnalyzerRule for AnalyzerPhaseSentinel {
fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
PLANNING_CONTEXT.with(|cell| {
let mut guard = cell.borrow_mut();
let should_open = guard
.as_ref()
.map(|ctx| ctx.phase != PlanningPhase::Analyzer)
.unwrap_or(true);
if should_open {
let span = (self.phase_span_create_fn)(phase_names::ANALYZE_LOGICAL_PLAN);
let plan_before = if self.plan_diff && !span.is_disabled() {
Some(plan.format_for_diff())
} else {
None
};
*guard = Some(PlanningContext {
phase: PlanningPhase::Analyzer,
_entered: span.entered(),
plan_before,
effective_rules: Vec::new(),
});
} else {
if let Some(ctx) = guard.take() {
close_phase_span(ctx, &plan);
}
}
});
Ok(plan)
}
fn name(&self) -> &str {
sentinel_names::ANALYZER
}
}
/// Optimizer rule placed at both ends of the logical optimizer rule list to open and
/// close the phase span of each optimizer pass.
struct OptimizerPhaseSentinel {
    /// Factory for the phase span.
    phase_span_create_fn: Arc<PhaseSpanCreateFn>,
    /// Whether to capture the plan when the phase opens so a diff can be recorded on close.
    plan_diff: bool,
}

impl Debug for OptimizerPhaseSentinel {
    /// Prints only the type name; no fields are shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("OptimizerPhaseSentinel")
    }
}
impl OptimizerRule for OptimizerPhaseSentinel {
    /// Returns the reserved sentinel rule name.
    fn name(&self) -> &str {
        sentinel_names::OPTIMIZER
    }

    /// Declares no apply order, so `rewrite` receives the plan root directly.
    fn apply_order(&self) -> Option<ApplyOrder> {
        None
    }

    // The trait method is marked deprecated; it is still overridden here.
    #[allow(deprecated)]
    fn supports_rewrite(&self) -> bool {
        true
    }

    /// Opens or closes the logical optimizer phase span; the plan is returned untransformed.
    ///
    /// * If the thread's planning context already belongs to the optimizer phase, this
    ///   call is the closing sentinel: the context is taken and handed to
    ///   `close_phase_span`.
    /// * Otherwise this call is the opening sentinel: the pass number is obtained from the
    ///   pass tracker, a new phase span is created, annotated and entered, and a fresh
    ///   context is stored.
    ///
    /// A context left behind by a *different* phase is discarded before the current span
    /// id is read and before the new span is created. Otherwise the stale span would be
    /// used as the pass-tracking key and as the parent of the new span.
    fn rewrite(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
        PLANNING_CONTEXT.with(|cell| {
            let mut guard = cell.borrow_mut();

            let closing = guard
                .as_ref()
                .is_some_and(|ctx| ctx.phase == PlanningPhase::Optimizer);
            if closing {
                if let Some(ctx) = guard.take() {
                    close_phase_span(ctx, &plan);
                }
                return;
            }

            // Same cleanup as `drop_planning_context`, inlined because the cell is already
            // mutably borrowed here.
            if let Some(stale) = guard.take() {
                if matches!(
                    stale.phase,
                    PlanningPhase::Optimizer | PlanningPhase::PhysicalOptimizer
                ) {
                    OPTIMIZER_PASS_TRACKER.with(|tracker| tracker.borrow_mut().reset());
                }
                drop(stale);
            }

            // Passes are counted per enclosing span, so read the id before entering the
            // new phase span.
            let current_parent_id = Span::current().id();
            let pass = OPTIMIZER_PASS_TRACKER
                .with(|tracker| tracker.borrow_mut().get_and_increment(current_parent_id));

            let span = (self.phase_span_create_fn)(phase_names::OPTIMIZE_LOGICAL_PLAN);
            let span_enabled = !span.is_disabled();
            if span_enabled {
                let max_passes = config.options().optimizer.max_passes;
                // Recorded one-based.
                span.record("datafusion.optimizer.pass", pass + 1);
                // Kept as a signed field value, as before; `max_passes` is a small
                // configuration value, so the cast does not truncate in practice.
                span.record("datafusion.optimizer.max_passes", max_passes as i64);
            }

            // Only pay for formatting the plan when a diff can actually be recorded.
            let plan_before = (self.plan_diff && span_enabled).then(|| plan.format_for_diff());
            *guard = Some(PlanningContext {
                phase: PlanningPhase::Optimizer,
                _entered: span.entered(),
                plan_before,
                effective_rules: Vec::new(),
            });
        });
        Ok(Transformed::no(plan))
    }
}
/// Physical optimizer rule placed at both ends of the physical optimizer rule list to
/// open and close the physical optimizer phase span.
struct PhysicalOptimizerPhaseSentinel {
    /// Factory for the phase span.
    phase_span_create_fn: Arc<PhaseSpanCreateFn>,
    /// Whether to capture the plan when the phase opens so a diff can be recorded on close.
    plan_diff: bool,
}

impl Debug for PhysicalOptimizerPhaseSentinel {
    /// Prints only the type name; no fields are shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("PhysicalOptimizerPhaseSentinel")
    }
}
impl PhysicalOptimizerRule for PhysicalOptimizerPhaseSentinel {
    /// Opens or closes the physical optimizer phase span; the plan is returned unchanged.
    ///
    /// * If the thread's planning context already belongs to the physical optimizer phase,
    ///   this call is the closing sentinel: the context is taken and handed to
    ///   `close_phase_span`, and the optimizer pass tracker is reset.
    /// * Otherwise this call is the opening sentinel: a new phase span is created and
    ///   entered, and a fresh context is stored.
    ///
    /// A context left behind by a *different* phase is discarded before the new span is
    /// created. Creating the span first would parent it under the stale span and exit the
    /// stale span out of order.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        PLANNING_CONTEXT.with(|cell| {
            let mut guard = cell.borrow_mut();

            let closing = guard
                .as_ref()
                .is_some_and(|ctx| ctx.phase == PlanningPhase::PhysicalOptimizer);
            if closing {
                if let Some(ctx) = guard.take() {
                    close_phase_span(ctx, &plan);
                }
                OPTIMIZER_PASS_TRACKER.with(|tracker| {
                    tracker.borrow_mut().reset();
                });
                return;
            }

            // Same cleanup as `drop_planning_context`, inlined because the cell is already
            // mutably borrowed here.
            if let Some(stale) = guard.take() {
                if matches!(
                    stale.phase,
                    PlanningPhase::Optimizer | PlanningPhase::PhysicalOptimizer
                ) {
                    OPTIMIZER_PASS_TRACKER.with(|tracker| tracker.borrow_mut().reset());
                }
                drop(stale);
            }

            let span = (self.phase_span_create_fn)(phase_names::OPTIMIZE_PHYSICAL_PLAN);
            // Only pay for formatting the plan when a diff can actually be recorded.
            let plan_before =
                (self.plan_diff && !span.is_disabled()).then(|| plan.format_for_diff());
            *guard = Some(PlanningContext {
                phase: PlanningPhase::PhysicalOptimizer,
                _entered: span.entered(),
                plan_before,
                effective_rules: Vec::new(),
            });
        });
        Ok(plan)
    }

    /// Returns the reserved sentinel rule name.
    fn name(&self) -> &str {
        sentinel_names::PHYSICAL_OPTIMIZER
    }

    /// The sentinel never changes the plan, so it opts in to the schema check.
    fn schema_check(&self) -> bool {
        true
    }
}
/// Wrapper used when per-rule spans are off: it adds no span of its own and only discards
/// the planning context when the wrapped analyzer rule fails.
struct ErrorCleanupAnalyzerRule {
    /// The wrapped analyzer rule.
    inner: Arc<dyn AnalyzerRule + Send + Sync>,
}

impl Debug for ErrorCleanupAnalyzerRule {
    /// Prints only the type name; the wrapped rule is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("ErrorCleanupAnalyzerRule")
    }
}
impl AnalyzerRule for ErrorCleanupAnalyzerRule {
    /// Delegates to the wrapped rule and discards the planning context if it fails.
    fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
        self.inner
            .analyze(plan, config)
            .inspect_err(|_| drop_planning_context())
    }

    /// Reports the wrapped rule's name.
    fn name(&self) -> &str {
        self.inner.name()
    }
}
/// Wrapper used when per-rule spans are off: it adds no span of its own and only discards
/// the planning context when the wrapped optimizer rule fails fatally.
struct ErrorCleanupOptimizerRule {
    /// The wrapped optimizer rule.
    inner: Arc<dyn OptimizerRule + Send + Sync>,
}

impl Debug for ErrorCleanupOptimizerRule {
    /// Prints only the type name; the wrapped rule is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("ErrorCleanupOptimizerRule")
    }
}
impl OptimizerRule for ErrorCleanupOptimizerRule {
    /// Reports the wrapped rule's name.
    fn name(&self) -> &str {
        self.inner.name()
    }

    /// Reports the wrapped rule's apply order.
    fn apply_order(&self) -> Option<ApplyOrder> {
        self.inner.apply_order()
    }

    // The trait method is marked deprecated; it is still forwarded to the wrapped rule.
    #[allow(deprecated)]
    fn supports_rewrite(&self) -> bool {
        self.inner.supports_rewrite()
    }

    /// Delegates to the wrapped rule. On failure the planning context is discarded unless
    /// `skip_failed_rules` is set, in which case it is left open.
    fn rewrite(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
        self.inner.rewrite(plan, config).inspect_err(|_| {
            let failure_is_fatal = !config.options().optimizer.skip_failed_rules;
            if failure_is_fatal {
                drop_planning_context();
            }
        })
    }
}
/// Wrapper used when per-rule spans are off: it adds no span of its own and only discards
/// the planning context when the wrapped physical optimizer rule fails.
struct ErrorCleanupPhysicalOptimizerRule {
    /// The wrapped physical optimizer rule.
    inner: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
}

impl Debug for ErrorCleanupPhysicalOptimizerRule {
    /// Prints only the type name; the wrapped rule is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("ErrorCleanupPhysicalOptimizerRule")
    }
}
impl PhysicalOptimizerRule for ErrorCleanupPhysicalOptimizerRule {
    /// Delegates to the wrapped rule and discards the planning context if it fails.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        self.inner
            .optimize(plan, config)
            .inspect_err(|_| drop_planning_context())
    }

    /// Reports the wrapped rule's name.
    fn name(&self) -> &str {
        self.inner.name()
    }

    /// Reports the wrapped rule's schema-check preference.
    fn schema_check(&self) -> bool {
        self.inner.schema_check()
    }
}
/// Tree rewriter that applies one optimizer rule to every node of a plan, in the direction
/// given by `apply_order`.
struct SingleSpanTreeTraverser<'a> {
// Direction in which `rule` is applied (on the way down or on the way up).
apply_order: ApplyOrder,
// Rule invoked on each visited node.
rule: &'a dyn OptimizerRule,
// Optimizer configuration forwarded to the rule.
config: &'a dyn OptimizerConfig,
}
impl<'a> SingleSpanTreeTraverser<'a> {
/// Bundles the traversal direction, the rule and its configuration.
fn new(
apply_order: ApplyOrder,
rule: &'a dyn OptimizerRule,
config: &'a dyn OptimizerConfig,
) -> Self {
Self {
apply_order,
rule,
config,
}
}
}
impl TreeNodeRewriter for SingleSpanTreeTraverser<'_> {
    type Node = LogicalPlan;

    /// Applies the rule on the way down only for a top-down traversal; otherwise the node
    /// passes through untransformed.
    fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
        if self.apply_order != ApplyOrder::TopDown {
            return Ok(Transformed::no(node));
        }
        self.rule.rewrite(node, self.config)
    }

    /// Applies the rule on the way up only for a bottom-up traversal; otherwise the node
    /// passes through untransformed.
    fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
        if self.apply_order != ApplyOrder::BottomUp {
            return Ok(Transformed::no(node));
        }
        self.rule.rewrite(node, self.config)
    }
}
/// Builds a line-based diff of two plan renderings.
///
/// Every line of the diff is emitted, prefixed with `-` for a removed line, `+` for an
/// added line and a space for an unchanged line.
fn generate_plan_diff(before: &str, after: &str) -> String {
    let line_diff = TextDiff::from_lines(before, after);
    line_diff
        .iter_all_changes()
        .map(|change| {
            let marker = match change.tag() {
                ChangeTag::Delete => "-",
                ChangeTag::Insert => "+",
                ChangeTag::Equal => " ",
            };
            format!("{}{}", marker, change)
        })
        .collect()
}
/// Marks a rule span as having modified the plan when the two renderings differ.
///
/// When `before_str` and `after_str` are equal nothing happens. Otherwise the span gets
/// the plan diff (only if `record_diff` is set) and an `otel.name` of
/// `"<rule_name> (modified)"`, and the rule name is added to the open planning context.
fn detect_and_record_modification(
    before_str: &str,
    after_str: &str,
    span: &Span,
    record_diff: bool,
    rule_name: &str,
) {
    let plan_changed = before_str != after_str;
    if plan_changed {
        if record_diff {
            span.record(
                "datafusion.plan_diff",
                generate_plan_diff(before_str, after_str),
            );
        }
        let display_name = format!("{} (modified)", rule_name);
        span.record("otel.name", display_name);
        record_modified_rule_in_context(rule_name);
    }
}
/// Callback that builds the span for a single rule; the instrumented wrappers call it with
/// the wrapped rule's name.
pub(crate) type RuleSpanCreateFn = dyn Fn(&str) -> Span + Send + Sync;
/// Analyzer rule wrapper that runs the wrapped rule inside its own span.
struct InstrumentedAnalyzerRule {
// The wrapped analyzer rule.
inner: Arc<dyn AnalyzerRule + Send + Sync>,
// Instrumentation options; `plan_diff` decides whether a diff is recorded on the span.
options: RuleInstrumentationOptions,
// Factory for the per-rule span.
span_create_fn: Arc<RuleSpanCreateFn>,
}
impl Debug for InstrumentedAnalyzerRule {
// Shows the wrapped rule and the options; the span factory is omitted.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InstrumentedAnalyzerRule")
.field("inner", &self.inner)
.field("options", &self.options)
.finish()
}
}
impl InstrumentedAnalyzerRule {
/// Wraps `inner` with the given options and span factory.
fn new(
inner: Arc<dyn AnalyzerRule + Send + Sync>,
options: RuleInstrumentationOptions,
span_create_fn: Arc<RuleSpanCreateFn>,
) -> Self {
Self {
inner,
options,
span_create_fn,
}
}
}
impl AnalyzerRule for InstrumentedAnalyzerRule {
    /// Runs the wrapped rule inside a span named after it.
    ///
    /// With an enabled span, the input plan is cloned up front; a successful result that
    /// differs from it is reported through `detect_and_record_modification`, and a failure
    /// is logged as an error event. On failure (enabled span or not) the rule span is
    /// exited first and the planning context is then discarded.
    fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
        let span = (self.span_create_fn)(self.name());
        let entered = span.enter();
        let span_enabled = !span.is_disabled();

        // The clone is only needed to detect a modification, so skip it for disabled spans.
        let snapshot = span_enabled.then(|| plan.clone());
        let outcome = self.inner.analyze(plan, config);

        if span_enabled {
            match (&outcome, snapshot.as_ref()) {
                (Ok(plan_after), Some(plan_before)) if plan_before != plan_after => {
                    let before_str = plan_before.format_for_diff();
                    let after_str = plan_after.format_for_diff();
                    detect_and_record_modification(
                        &before_str,
                        &after_str,
                        &span,
                        self.options.plan_diff,
                        self.name(),
                    );
                }
                (Ok(_), _) => {}
                (Err(e), _) => {
                    tracing::error!(error = %e, "AnalyzerRule failed");
                }
            }
        }

        if outcome.is_err() {
            // Leave the rule span before the phase span held by the context is released.
            drop(entered);
            drop_planning_context();
        }
        outcome
    }

    /// Reports the wrapped rule's name.
    fn name(&self) -> &str {
        self.inner.name()
    }
}
/// Optimizer rule wrapper that runs the wrapped rule inside its own span.
struct InstrumentedOptimizerRule {
// The wrapped optimizer rule.
inner: Arc<dyn OptimizerRule + Send + Sync>,
// Instrumentation options; `plan_diff` decides whether a diff is recorded on the span.
options: RuleInstrumentationOptions,
// Factory for the per-rule span.
span_create_fn: Arc<RuleSpanCreateFn>,
}
impl Debug for InstrumentedOptimizerRule {
// Shows the wrapped rule and the options; the span factory is omitted.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InstrumentedOptimizerRule")
.field("inner", &self.inner)
.field("options", &self.options)
.finish()
}
}
impl InstrumentedOptimizerRule {
/// Wraps `inner` with the given options and span factory.
fn new(
inner: Arc<dyn OptimizerRule + Send + Sync>,
options: RuleInstrumentationOptions,
span_create_fn: Arc<RuleSpanCreateFn>,
) -> Self {
Self {
inner,
options,
span_create_fn,
}
}
/// Applies the wrapped rule to the whole plan.
///
/// When the wrapped rule declares an apply order, the plan (including subqueries) is
/// traversed here with `SingleSpanTreeTraverser`, which calls the rule on each node.
/// Otherwise the rule's `rewrite` is called once on the plan.
fn apply_inner(
&self,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
match self.inner.apply_order() {
Some(apply_order) => {
plan.rewrite_with_subqueries(&mut SingleSpanTreeTraverser::new(
apply_order,
self.inner.as_ref(),
config,
))
}
None => self.inner.rewrite(plan, config),
}
}
}
impl OptimizerRule for InstrumentedOptimizerRule {
    /// Reports the wrapped rule's name.
    fn name(&self) -> &str {
        self.inner.name()
    }

    /// Always `None`: the traversal requested by the wrapped rule is performed inside
    /// `apply_inner`, not by the caller.
    fn apply_order(&self) -> Option<ApplyOrder> {
        None
    }

    // The trait method is marked deprecated; it is still forwarded to the wrapped rule.
    #[allow(deprecated)]
    fn supports_rewrite(&self) -> bool {
        self.inner.supports_rewrite()
    }

    /// Runs the wrapped rule inside a span named after it.
    ///
    /// With an enabled span, the input plan is cloned up front; a result flagged as
    /// transformed is reported through `detect_and_record_modification`, and a failure is
    /// logged as an error event. On failure the rule span is exited and the planning
    /// context discarded, unless `skip_failed_rules` is set.
    fn rewrite(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
        let span = (self.span_create_fn)(self.name());
        let entered = span.enter();
        let span_enabled = !span.is_disabled();

        // The clone is only needed to render the "before" plan, so skip it for disabled spans.
        let snapshot = span_enabled.then(|| plan.clone());
        let outcome = self.apply_inner(plan, config);

        if span_enabled {
            match &outcome {
                Ok(transformed) if transformed.transformed => {
                    if let Some(before) = snapshot {
                        let before_str = before.format_for_diff();
                        let after_str = transformed.data.format_for_diff();
                        detect_and_record_modification(
                            &before_str,
                            &after_str,
                            &span,
                            self.options.plan_diff,
                            self.name(),
                        );
                    }
                }
                Ok(_) => {}
                Err(e) => {
                    tracing::error!(error = %e, "OptimizerRule failed");
                }
            }
        }

        let fatal_failure =
            outcome.is_err() && !config.options().optimizer.skip_failed_rules;
        if fatal_failure {
            // Leave the rule span before the phase span held by the context is released.
            drop(entered);
            drop_planning_context();
        }
        outcome
    }
}
/// Physical optimizer rule wrapper that runs the wrapped rule inside its own span.
struct InstrumentedPhysicalOptimizerRule {
// The wrapped physical optimizer rule.
inner: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
// Instrumentation options; `plan_diff` decides whether a diff is recorded on the span.
options: RuleInstrumentationOptions,
// Factory for the per-rule span.
span_create_fn: Arc<RuleSpanCreateFn>,
}
impl Debug for InstrumentedPhysicalOptimizerRule {
// Shows the wrapped rule and the options; the span factory is omitted.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InstrumentedPhysicalOptimizerRule")
.field("inner", &self.inner)
.field("options", &self.options)
.finish()
}
}
impl InstrumentedPhysicalOptimizerRule {
/// Wraps `inner` with the given options and span factory.
fn new(
inner: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
options: RuleInstrumentationOptions,
span_create_fn: Arc<RuleSpanCreateFn>,
) -> Self {
Self {
inner,
options,
span_create_fn,
}
}
}
impl PhysicalOptimizerRule for InstrumentedPhysicalOptimizerRule {
    /// Runs the wrapped rule inside a span named after it.
    ///
    /// With an enabled span, a handle to the input plan is kept; a successful result that
    /// is a different allocation is reported through `detect_and_record_modification`, and
    /// a failure is logged as an error event. On failure (enabled span or not) the rule
    /// span is exited first and the planning context is then discarded.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let span = (self.span_create_fn)(self.name());
        let entered = span.enter();

        // Cheap reference-count bump; only taken when the span is enabled.
        let previous = (!span.is_disabled()).then(|| Arc::clone(&plan));
        let outcome = self.inner.optimize(plan, config);

        if let Some(old_plan) = previous {
            match &outcome {
                // A rule that hands back the very same allocation did not change the plan.
                Ok(new_plan) if !Arc::ptr_eq(&old_plan, new_plan) => {
                    let before_str = old_plan.format_for_diff();
                    let after_str = new_plan.format_for_diff();
                    detect_and_record_modification(
                        &before_str,
                        &after_str,
                        &span,
                        self.options.plan_diff,
                        self.name(),
                    );
                }
                Ok(_) => {}
                Err(e) => {
                    tracing::error!(error = %e, "PhysicalOptimizerRule failed");
                }
            }
        }

        if outcome.is_err() {
            // Leave the rule span before the phase span held by the context is released.
            drop(entered);
            drop_planning_context();
        }
        outcome
    }

    /// Reports the wrapped rule's name.
    fn name(&self) -> &str {
        self.inner.name()
    }

    /// Reports the wrapped rule's schema-check preference.
    fn schema_check(&self) -> bool {
        self.inner.schema_check()
    }
}
/// Rebuilds `state` with its analyzer, optimizer and physical optimizer rule lists
/// wrapped for tracing.
///
/// Each rule list is passed through the matching `instrument_*_rules` helper. When the
/// physical optimizer phase span is enabled, the resulting state is additionally passed
/// to `TracingQueryPlanner::instrument_state_with_level` with `span_level`.
#[doc(hidden)]
pub fn instrument_session_state(
    state: SessionState,
    options: RuleInstrumentationOptions,
    span_create_fn: Arc<RuleSpanCreateFn>,
    phase_span_create_fn: Arc<PhaseSpanCreateFn>,
    span_level: Level,
) -> SessionState {
    let analyzer_rules = instrument_analyzer_rules(
        state.analyzer().rules.clone(),
        &options,
        &span_create_fn,
        &phase_span_create_fn,
    );
    let optimizer_rules = instrument_optimizer_rules(
        state.optimizers().to_vec(),
        &options,
        &span_create_fn,
        &phase_span_create_fn,
    );
    let physical_rules = instrument_physical_optimizer_rules(
        state.physical_optimizers().to_vec(),
        &options,
        &span_create_fn,
        &phase_span_create_fn,
    );

    let instrumented = SessionStateBuilder::from(state)
        .with_analyzer_rules(analyzer_rules)
        .with_optimizer_rules(optimizer_rules)
        .with_physical_optimizer_rules(physical_rules)
        .build();

    if !options.physical_optimizer.phase_span_enabled() {
        return instrumented;
    }
    TracingQueryPlanner::instrument_state_with_level(instrumented, span_level)
}
/// Wraps the analyzer rules for tracing.
///
/// Returns `rules` untouched when the analyzer phase span is disabled or the list is
/// empty. Otherwise the result is an opening sentinel, every rule wrapped (with a per-rule
/// span when rule spans are enabled, else with the error-cleanup wrapper), and a closing
/// sentinel.
fn instrument_analyzer_rules(
    rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
    options: &RuleInstrumentationOptions,
    span_create_fn: &Arc<RuleSpanCreateFn>,
    phase_span_create_fn: &Arc<PhaseSpanCreateFn>,
) -> Vec<Arc<dyn AnalyzerRule + Send + Sync>> {
    let level = options.analyzer;
    if !level.phase_span_enabled() || rules.is_empty() {
        return rules;
    }

    let sentinel = || -> Arc<dyn AnalyzerRule + Send + Sync> {
        Arc::new(AnalyzerPhaseSentinel {
            phase_span_create_fn: Arc::clone(phase_span_create_fn),
            plan_diff: options.plan_diff,
        })
    };
    let wrap = |rule: Arc<dyn AnalyzerRule + Send + Sync>| -> Arc<dyn AnalyzerRule + Send + Sync> {
        if level.rule_spans_enabled() {
            Arc::new(InstrumentedAnalyzerRule::new(
                rule,
                options.clone(),
                Arc::clone(span_create_fn),
            ))
        } else {
            Arc::new(ErrorCleanupAnalyzerRule { inner: rule })
        }
    };

    // Two extra slots for the opening and closing sentinels.
    let mut wrapped = Vec::with_capacity(rules.len() + 2);
    wrapped.push(sentinel());
    wrapped.extend(rules.into_iter().map(wrap));
    wrapped.push(sentinel());
    wrapped
}
/// Wraps the logical optimizer rules for tracing.
///
/// Returns `rules` untouched when the optimizer phase span is disabled or the list is
/// empty. Otherwise the result is an opening sentinel, every rule wrapped (with a per-rule
/// span when rule spans are enabled, else with the error-cleanup wrapper), and a closing
/// sentinel.
fn instrument_optimizer_rules(
    rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
    options: &RuleInstrumentationOptions,
    span_create_fn: &Arc<RuleSpanCreateFn>,
    phase_span_create_fn: &Arc<PhaseSpanCreateFn>,
) -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
    let level = options.optimizer;
    if !level.phase_span_enabled() || rules.is_empty() {
        return rules;
    }

    let sentinel = || -> Arc<dyn OptimizerRule + Send + Sync> {
        Arc::new(OptimizerPhaseSentinel {
            phase_span_create_fn: Arc::clone(phase_span_create_fn),
            plan_diff: options.plan_diff,
        })
    };
    let wrap = |rule: Arc<dyn OptimizerRule + Send + Sync>| -> Arc<dyn OptimizerRule + Send + Sync> {
        if level.rule_spans_enabled() {
            Arc::new(InstrumentedOptimizerRule::new(
                rule,
                options.clone(),
                Arc::clone(span_create_fn),
            ))
        } else {
            Arc::new(ErrorCleanupOptimizerRule { inner: rule })
        }
    };

    // Two extra slots for the opening and closing sentinels.
    let mut wrapped = Vec::with_capacity(rules.len() + 2);
    wrapped.push(sentinel());
    wrapped.extend(rules.into_iter().map(wrap));
    wrapped.push(sentinel());
    wrapped
}
/// Wraps the physical optimizer rules for tracing.
///
/// Returns `rules` untouched when the physical optimizer phase span is disabled or the
/// list is empty. Otherwise the result is an opening sentinel, every rule wrapped (with a
/// per-rule span when rule spans are enabled, else with the error-cleanup wrapper), and a
/// closing sentinel.
fn instrument_physical_optimizer_rules(
    rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
    options: &RuleInstrumentationOptions,
    span_create_fn: &Arc<RuleSpanCreateFn>,
    phase_span_create_fn: &Arc<PhaseSpanCreateFn>,
) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
    let level = options.physical_optimizer;
    if !level.phase_span_enabled() || rules.is_empty() {
        return rules;
    }

    let sentinel = || -> Arc<dyn PhysicalOptimizerRule + Send + Sync> {
        Arc::new(PhysicalOptimizerPhaseSentinel {
            phase_span_create_fn: Arc::clone(phase_span_create_fn),
            plan_diff: options.plan_diff,
        })
    };
    let wrap = |rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>| -> Arc<dyn PhysicalOptimizerRule + Send + Sync> {
        if level.rule_spans_enabled() {
            Arc::new(InstrumentedPhysicalOptimizerRule::new(
                rule,
                options.clone(),
                Arc::clone(span_create_fn),
            ))
        } else {
            Arc::new(ErrorCleanupPhysicalOptimizerRule { inner: rule })
        }
    };

    // Two extra slots for the opening and closing sentinels.
    let mut wrapped = Vec::with_capacity(rules.len() + 2);
    wrapped.push(sentinel());
    wrapped.extend(rules.into_iter().map(wrap));
    wrapped.push(sentinel());
    wrapped
}
// Tests that phase spans and the thread-local planning state are cleaned up when a rule
// fails in any of the three planning phases.
#[cfg(test)]
mod tests {
use super::*;
use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tracing::Instrument as _;
use tracing::field::{Field, Visit};
use tracing::{Id, Subscriber};
use tracing_subscriber::Layer;
use tracing_subscriber::layer::Context;
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::LookupSpan;
// Span extension storing the name under which a span was captured.
struct CapturedName(String);
// One captured span lifecycle event: `kind` is "open" or "close"; `parent` is only filled
// in for "open" events.
#[derive(Clone, Debug)]
struct SpanEvent {
kind: &'static str, name: String,
parent: Option<String>,
}
// Layer that records span open/close events into a shared, clonable buffer.
#[derive(Clone, Default)]
struct SpanCapture(Arc<Mutex<Vec<SpanEvent>>>);
impl SpanCapture {
// Returns a copy of all events captured so far.
fn snapshot(&self) -> Vec<SpanEvent> {
self.0.lock().unwrap().clone()
}
}
// Field visitor that extracts a string-valued `otel.name` field, if present.
struct OtelNameVisitor(Option<String>);
impl Visit for OtelNameVisitor {
fn record_str(&mut self, field: &Field, value: &str) {
if field.name() == "otel.name" {
self.0 = Some(value.to_owned());
}
}
// Non-string fields are ignored.
fn record_debug(&mut self, _: &Field, _: &dyn Debug) {}
}
impl<S: Subscriber + for<'s> LookupSpan<'s>> Layer<S> for SpanCapture {
// Records an "open" event named after `otel.name` (falling back to the metadata name)
// together with the captured name of the parent span, if any.
fn on_new_span(
&self,
attrs: &tracing::span::Attributes<'_>,
id: &Id,
ctx: Context<'_, S>,
) {
let mut v = OtelNameVisitor(None);
attrs.record(&mut v);
let name = v.0.unwrap_or_else(|| attrs.metadata().name().to_owned());
// Remember the name on the span so children and `on_close` can look it up.
if let Some(span) = ctx.span(id) {
span.extensions_mut().insert(CapturedName(name.clone()));
}
let parent = ctx.span(id).and_then(|span| {
span.parent().and_then(|p| {
p.extensions().get::<CapturedName>().map(|n| n.0.clone())
})
});
self.0.lock().unwrap().push(SpanEvent {
kind: "open",
name,
parent,
});
}
// Records a "close" event using the name stored when the span was opened.
fn on_close(&self, id: Id, ctx: Context<'_, S>) {
let name = ctx
.span(&id)
.and_then(|s| s.extensions().get::<CapturedName>().map(|n| n.0.clone()))
.unwrap_or_default();
self.0.lock().unwrap().push(SpanEvent {
kind: "close",
name,
parent: None,
});
}
}
// Analyzer rule that fails on its first invocation (while the flag is `true`) and passes
// the plan through afterwards.
struct FailOnceAnalyzer(Arc<AtomicBool>);
impl Debug for FailOnceAnalyzer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("FailOnceAnalyzer")
}
}
impl AnalyzerRule for FailOnceAnalyzer {
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
// `swap` clears the flag, so only the first call fails.
if self.0.swap(false, Ordering::SeqCst) {
return Err(DataFusionError::Internal(
"intentional analyzer failure".into(),
));
}
Ok(plan)
}
fn name(&self) -> &str {
"fail_once"
}
}
// Builds a session whose analyzer list includes `FailOnceAnalyzer`, instrumented through
// the crate's `instrument_rules_with_trace_spans!` macro with the given options.
fn make_ctx_with_fail_once(options: RuleInstrumentationOptions) -> SessionContext {
let state = SessionStateBuilder::new()
.with_config(SessionConfig::new())
.with_default_features()
.with_analyzer_rule(Arc::new(FailOnceAnalyzer(Arc::new(AtomicBool::new(
true,
)))))
.build();
let state = crate::instrument_rules_with_trace_spans!(
options: options,
state: state
);
SessionContext::new_with_state(state)
}
// True while a planning context is stored in this thread's `PLANNING_CONTEXT`.
fn is_planning_context_open() -> bool {
PLANNING_CONTEXT.with(|cell| cell.borrow().is_some())
}
// Current pass count of this thread's `OPTIMIZER_PASS_TRACKER`.
fn optimizer_pass_count() -> usize {
OPTIMIZER_PASS_TRACKER.with(|cell| cell.borrow().pass_count)
}
// Optimizer rule that fails on its first invocation and passes the plan through afterwards.
struct FailOnceOptimizer(Arc<AtomicBool>);
impl Debug for FailOnceOptimizer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("FailOnceOptimizer")
}
}
impl OptimizerRule for FailOnceOptimizer {
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
// `swap` clears the flag, so only the first call fails.
if self.0.swap(false, Ordering::SeqCst) {
return Err(DataFusionError::Internal(
"intentional optimizer failure".into(),
));
}
Ok(Transformed::no(plan))
}
fn name(&self) -> &str {
"fail_once_optimizer"
}
}
// Builds an instrumented session whose optimizer list includes `FailOnceOptimizer`.
fn make_ctx_with_fail_once_optimizer(
options: RuleInstrumentationOptions,
) -> SessionContext {
let state = SessionStateBuilder::new()
.with_config(SessionConfig::new())
.with_default_features()
.with_optimizer_rule(Arc::new(FailOnceOptimizer(Arc::new(AtomicBool::new(
true,
)))))
.build();
let state = crate::instrument_rules_with_trace_spans!(
options: options,
state: state
);
SessionContext::new_with_state(state)
}
// Same as `make_ctx_with_fail_once_optimizer`, but with `skip_failed_rules` enabled in the
// session configuration.
fn make_ctx_with_fail_once_optimizer_skip_failed(
options: RuleInstrumentationOptions,
) -> SessionContext {
let mut config = SessionConfig::new();
config.options_mut().optimizer.skip_failed_rules = true;
let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.with_optimizer_rule(Arc::new(FailOnceOptimizer(Arc::new(AtomicBool::new(
true,
)))))
.build();
let state = crate::instrument_rules_with_trace_spans!(
options: options,
state: state
);
SessionContext::new_with_state(state)
}
// Physical optimizer rule that fails on its first invocation and passes the plan through
// afterwards.
struct FailOncePhysicalOptimizer(Arc<AtomicBool>);
impl Debug for FailOncePhysicalOptimizer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("FailOncePhysicalOptimizer")
}
}
impl PhysicalOptimizerRule for FailOncePhysicalOptimizer {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
// `swap` clears the flag, so only the first call fails.
if self.0.swap(false, Ordering::SeqCst) {
return Err(DataFusionError::Internal(
"intentional physical optimizer failure".into(),
));
}
Ok(plan)
}
fn name(&self) -> &str {
"fail_once_physical_optimizer"
}
fn schema_check(&self) -> bool {
false
}
}
// Builds an instrumented session whose physical optimizer list includes
// `FailOncePhysicalOptimizer`.
fn make_ctx_with_fail_once_physical_optimizer(
options: RuleInstrumentationOptions,
) -> SessionContext {
let state = SessionStateBuilder::new()
.with_config(SessionConfig::new())
.with_default_features()
.with_physical_optimizer_rule(Arc::new(FailOncePhysicalOptimizer(Arc::new(
AtomicBool::new(true),
))))
.build();
let state = crate::instrument_rules_with_trace_spans!(
options: options,
state: state
);
SessionContext::new_with_state(state)
}
// Counts captured events with the given kind and span name.
fn count(events: &[SpanEvent], kind: &str, name: &str) -> usize {
events
.iter()
.filter(|e| e.kind == kind && e.name == name)
.count()
}
// Plans and executes `SELECT 1`, propagating any planning or execution error.
async fn run_query(ctx: &SessionContext) -> Result<()> {
ctx.sql("SELECT 1").await?.collect().await?;
Ok(())
}
// Analyzer failure with phase-only instrumentation must not leave a planning context open.
#[tokio::test]
async fn phase_only_closes_phase_span_on_analyzer_error() {
let ctx = make_ctx_with_fail_once(RuleInstrumentationOptions::phase_only());
let result = run_query(&ctx).await;
assert!(result.is_err(), "expected analyzer failure");
assert!(
!is_planning_context_open(),
"phase span must not be left open after analyzer error (phase_only mode)"
);
}
// Analyzer failure with full instrumentation must not leave a planning context open.
#[tokio::test]
async fn full_closes_phase_span_on_analyzer_error() {
let ctx = make_ctx_with_fail_once(RuleInstrumentationOptions::full());
let result = run_query(&ctx).await;
assert!(result.is_err(), "expected analyzer failure");
assert!(
!is_planning_context_open(),
"phase span must not be left open after analyzer error (full mode)"
);
}
// After a failed query the phase span must already be closed, and a following successful
// query must not end up parented under it.
#[tokio::test]
async fn stale_phase_span_not_parent_of_subsequent_query() {
let capture = SpanCapture::default();
let subscriber = tracing_subscriber::registry()
.with(tracing_subscriber::filter::LevelFilter::TRACE)
.with(capture.clone());
let _guard = tracing::subscriber::set_default(subscriber);
let ctx = make_ctx_with_fail_once(RuleInstrumentationOptions::phase_only());
// First query: the fail-once analyzer rule is still armed, so the result is ignored.
let _ = run_query(&ctx).await;
let after_failure = capture.snapshot();
let opened_after_failure = count(&after_failure, "open", "analyze_logical_plan");
let closed_after_failure = count(&after_failure, "close", "analyze_logical_plan");
assert_eq!(
opened_after_failure, 1,
"one phase span opened during failed query"
);
assert_eq!(
closed_after_failure, 1,
"phase span must be closed immediately after analyzer error"
);
// Second query: the rule no longer fails; run it under its own span.
async {
ctx.sql("SELECT 1").await.unwrap().collect().await.unwrap();
}
.instrument(tracing::info_span!("successful_query"))
.await;
let after_success = capture.snapshot();
let total_opened = count(&after_success, "open", "analyze_logical_plan");
let total_closed = count(&after_success, "close", "analyze_logical_plan");
assert_eq!(
total_opened, total_closed,
"every analyze_logical_plan span must be closed"
);
let successful_query_parent = after_success
.iter()
.find(|e| e.kind == "open" && e.name == "successful_query")
.and_then(|e| e.parent.as_deref());
assert_ne!(
successful_query_parent,
Some("analyze_logical_plan"),
"successful_query must not be parented under the stale analyze_logical_plan span"
);
}
// Fatal optimizer failure with phase-only instrumentation: context closed, tracker reset.
#[tokio::test]
async fn phase_only_closes_phase_span_on_optimizer_error() {
let ctx =
make_ctx_with_fail_once_optimizer(RuleInstrumentationOptions::phase_only());
let result = run_query(&ctx).await;
assert!(result.is_err(), "expected optimizer failure");
assert!(
!is_planning_context_open(),
"phase span must not be left open after optimizer error (phase_only mode)"
);
assert_eq!(
optimizer_pass_count(),
0,
"optimizer pass tracker must be reset after fatal optimizer error (phase_only mode)"
);
}
// Fatal optimizer failure with full instrumentation: context closed, tracker reset.
#[tokio::test]
async fn full_closes_phase_span_on_optimizer_error() {
let ctx = make_ctx_with_fail_once_optimizer(RuleInstrumentationOptions::full());
let result = run_query(&ctx).await;
assert!(result.is_err(), "expected optimizer failure");
assert!(
!is_planning_context_open(),
"phase span must not be left open after optimizer error (full mode)"
);
assert_eq!(
optimizer_pass_count(),
0,
"optimizer pass tracker must be reset after fatal optimizer error (full mode)"
);
}
// Physical optimizer failure with phase-only instrumentation: context closed, tracker reset.
#[tokio::test]
async fn phase_only_closes_phase_span_on_physical_optimizer_error() {
let ctx = make_ctx_with_fail_once_physical_optimizer(
RuleInstrumentationOptions::phase_only(),
);
let result = run_query(&ctx).await;
assert!(result.is_err(), "expected physical optimizer failure");
assert!(
!is_planning_context_open(),
"phase span must not be left open after physical optimizer error (phase_only mode)"
);
assert_eq!(
optimizer_pass_count(),
0,
"optimizer pass tracker must be reset after fatal physical optimizer error (phase_only mode)"
);
}
// Physical optimizer failure with full instrumentation: context closed, tracker reset.
#[tokio::test]
async fn full_closes_phase_span_on_physical_optimizer_error() {
let ctx = make_ctx_with_fail_once_physical_optimizer(
RuleInstrumentationOptions::full(),
);
let result = run_query(&ctx).await;
assert!(result.is_err(), "expected physical optimizer failure");
assert!(
!is_planning_context_open(),
"phase span must not be left open after physical optimizer error (full mode)"
);
assert_eq!(
optimizer_pass_count(),
0,
"optimizer pass tracker must be reset after fatal physical optimizer error (full mode)"
);
}
// With `skip_failed_rules` the query succeeds despite the failing rule, and no planning
// context may remain open afterwards.
#[tokio::test]
async fn skip_failed_rules_sentinel_closes_phase_span() {
let ctx = make_ctx_with_fail_once_optimizer_skip_failed(
RuleInstrumentationOptions::phase_only(),
);
let result = run_query(&ctx).await;
assert!(
result.is_ok(),
"query must succeed when skip_failed_rules is true, got: {result:?}"
);
assert!(
!is_planning_context_open(),
"phase span must be closed by the sentinel after skip_failed_rules optimizer error"
);
}
}