use crate::{
metrics::{MetricsRecorder, MetricsRecordingStream},
node::{NodeRecorder, NodeRecordingStream},
options::InstrumentationOptions,
preview::{PreviewFn, PreviewRecorder, PreviewRecordingStream},
utils::is_internal_optimizer_check,
};
use datafusion::{
arrow::datatypes::SchemaRef,
common::Statistics,
config::ConfigOptions,
error::Result,
execution::{SendableRecordBatchStream, TaskContext},
physical_expr::{Distribution, OrderingRequirements, PhysicalSortExpr},
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
PhysicalExpr, PlanProperties,
execution_plan::{CardinalityEffect, InvariantLevel},
filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
},
metrics::MetricsSet,
projection::ProjectionExec,
sort_pushdown::SortOrderPushdownResult,
stream::RecordBatchStreamAdapter,
},
};
use delegate::delegate;
use std::{
any::Any,
collections::HashMap,
fmt::{self, Debug},
sync::{Arc, OnceLock},
};
use tracing::{Span, field};
use tracing_futures::Instrument;
/// Factory producing the span for one instrumented plan node.
///
/// The returned span is not yet populated; `InstrumentedExec` records the plan
/// name and properties on it after calling this.
pub(crate) type SpanCreateFn = dyn Fn() -> Span + Sync + Send;
/// An [`ExecutionPlan`] wrapper that forwards planning calls to `inner` and
/// adds span, node, metrics and preview recording around `execute`.
pub struct InstrumentedExec {
    /// The wrapped plan; planning-related trait methods are forwarded to it.
    inner: Arc<dyn ExecutionPlan>,
    /// Span for this node, created lazily (via `span_create_fn`) on first use.
    span: OnceLock<Span>,
    /// When `false`, `execute` does not wrap the stream in a metrics recorder.
    record_metrics: bool,
    /// Metrics recorder, created on first use and reused for every partition.
    metrics_recorder: OnceLock<Arc<MetricsRecorder>>,
    /// Node recorder, created on first use and reused for every partition.
    node_recorder: OnceLock<Arc<NodeRecorder>>,
    /// Limit handed to the `PreviewRecorder` builder; `0` disables previews.
    preview_limit: usize,
    /// Optional `PreviewFn` forwarded to the `PreviewRecorder` builder.
    preview_fn: Option<Arc<PreviewFn>>,
    /// Preview recorder, created on first use and reused for every partition.
    preview_recorder: OnceLock<Arc<PreviewRecorder>>,
    /// Factory invoked once to create this node's (unpopulated) span.
    span_create_fn: Arc<SpanCreateFn>,
}
impl Debug for InstrumentedExec {
    /// Shows only the wrapped plan.
    ///
    /// The span, recorders and callbacks are left out; some of them hold
    /// closures that have no `Debug` representation.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = formatter.debug_struct("InstrumentedExec");
        builder.field("inner", &self.inner);
        builder.finish()
    }
}
impl InstrumentedExec {
    /// Wraps `inner` so that its execution is traced and, depending on
    /// `options`, its metrics and a preview of its output are recorded.
    ///
    /// No span is created here. `span_create_fn` is called lazily the first
    /// time the span is needed (see `get_span`). `options.custom_fields` is
    /// not read by this constructor.
    pub fn new(
        inner: Arc<dyn ExecutionPlan>,
        span_create_fn: Arc<SpanCreateFn>,
        options: &InstrumentationOptions,
    ) -> InstrumentedExec {
        Self {
            inner,
            span: OnceLock::new(),
            record_metrics: options.record_metrics,
            metrics_recorder: OnceLock::new(),
            node_recorder: OnceLock::new(),
            preview_limit: options.preview_limit,
            preview_fn: options.preview_fn.clone(),
            preview_recorder: OnceLock::new(),
            span_create_fn,
        }
    }

    /// Wraps `inner` in a new `InstrumentedExec` that reuses this node's span
    /// factory and instrumentation settings.
    ///
    /// The new node starts with empty `OnceLock`s. It therefore gets its own
    /// span and recorders instead of sharing this node's.
    fn with_new_inner(&self, inner: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(InstrumentedExec::new(
            inner,
            Arc::clone(&self.span_create_fn),
            &InstrumentationOptions {
                record_metrics: self.record_metrics,
                preview_limit: self.preview_limit,
                preview_fn: self.preview_fn.clone(),
                // `new` never reads `custom_fields`, so an empty map suffices.
                custom_fields: HashMap::new(),
            },
        ))
    }

    /// Returns this node's span, creating and populating it on first use.
    fn get_span(&self) -> Span {
        self.span
            .get_or_init(|| self.create_populated_span())
            .clone()
    }

    /// Wraps `inner_stream` in a `MetricsRecordingStream` when
    /// `record_metrics` is set; otherwise returns it unchanged.
    ///
    /// A single `MetricsRecorder` is lazily created and shared by all
    /// partitions of this node.
    fn metrics_recording_stream(
        &self,
        inner_stream: SendableRecordBatchStream,
        span: &Span,
    ) -> SendableRecordBatchStream {
        if !self.record_metrics {
            return inner_stream;
        }
        let recorder = self
            .metrics_recorder
            .get_or_init(|| {
                Arc::new(MetricsRecorder::new(self.inner.clone(), span.clone()))
            })
            .clone();
        Box::pin(MetricsRecordingStream::new(inner_stream, recorder))
    }

    /// Wraps `inner_stream` in a `PreviewRecordingStream` for `partition`.
    /// When `preview_limit` is `0`, the stream is returned unchanged.
    ///
    /// A single `PreviewRecorder` is lazily created and shared by all
    /// partitions. It is sized by the inner plan's output partition count.
    fn preview_recording_stream(
        &self,
        inner_stream: SendableRecordBatchStream,
        span: &Span,
        partition: usize,
    ) -> SendableRecordBatchStream {
        if self.preview_limit == 0 {
            return inner_stream;
        }
        let recorder = self
            .preview_recorder
            .get_or_init(|| {
                let partition_count = self.inner.output_partitioning().partition_count();
                Arc::new(
                    PreviewRecorder::builder(span.clone(), partition_count)
                        .limit(self.preview_limit)
                        .preview_fn(self.preview_fn.clone())
                        .build(),
                )
            })
            .clone();
        Box::pin(PreviewRecordingStream::new(
            inner_stream,
            recorder,
            partition,
        ))
    }

    /// Always wraps `inner_stream` in a `NodeRecordingStream`.
    ///
    /// A single `NodeRecorder` is lazily created and shared by all partitions
    /// of this node.
    fn node_recording_stream(
        &self,
        inner_stream: SendableRecordBatchStream,
        span: &Span,
    ) -> SendableRecordBatchStream {
        let recorder = self
            .node_recorder
            .get_or_init(|| Arc::new(NodeRecorder::new(self.inner.clone(), span.clone())))
            .clone();
        Box::pin(NodeRecordingStream::new(inner_stream, recorder))
    }

    /// Creates a span via `span_create_fn` and records the inner plan's name,
    /// partitioning, emission type and boundedness on it.
    fn create_populated_span(&self) -> Span {
        let span = self.span_create_fn.as_ref()();
        // Fetch the properties once instead of calling `properties()` per field.
        let properties = self.inner.properties();
        span.record("otel.name", field::display(self.inner.name()));
        // `&Partitioning` is `Display` itself, so there is no need to clone the
        // partitioning (which may own a list of expressions) just to print it.
        span.record(
            "datafusion.partitioning",
            field::display(&properties.partitioning),
        );
        span.record(
            "datafusion.emission_type",
            field::debug(properties.emission_type),
        );
        span.record(
            "datafusion.boundedness",
            field::debug(properties.boundedness),
        );
        span
    }

    /// Returns `true` if `plan` downcasts to `InstrumentedExec`.
    ///
    /// NOTE(review): `InstrumentedExec::as_any` exposes the wrapper itself only
    /// while `is_internal_optimizer_check()` holds. Callers presumably need to
    /// be in that context for this to report `true`; confirm at call sites.
    pub(crate) fn is_instrumented(plan: &dyn ExecutionPlan) -> bool {
        plan.as_any().is::<InstrumentedExec>()
    }
}
impl ExecutionPlan for InstrumentedExec {
delegate! {
to self.inner {
fn schema(&self) -> SchemaRef;
fn properties(&self) -> &Arc<PlanProperties>;
fn name(&self) -> &str;
fn check_invariants(&self, check: InvariantLevel) -> Result<()>;
fn required_input_distribution(&self) -> Vec<Distribution>;
fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>>;
fn maintains_input_order(&self) -> Vec<bool>;
fn benefits_from_input_partitioning(&self) -> Vec<bool>;
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
fn metrics(&self) -> Option<MetricsSet>;
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;
fn supports_limit_pushdown(&self) -> bool;
fn fetch(&self) -> Option<usize>;
fn cardinality_effect(&self) -> CardinalityEffect;
fn gather_filters_for_pushdown(
&self,
phase: FilterPushdownPhase,
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
config: &ConfigOptions,
) -> Result<FilterDescription>;
}
}
fn static_name() -> &'static str {
"InstrumentedExec"
}
fn repartitioned(
&self,
target_partitions: usize,
config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if let Some(new_inner) = self
.inner
.clone()
.repartitioned(target_partitions, config)?
{
Ok(Some(self.with_new_inner(new_inner)))
} else {
Ok(None)
}
}
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
if let Some(new_inner) = self.inner.clone().with_fetch(limit) {
Some(self.with_new_inner(new_inner))
} else {
None
}
}
fn try_swapping_with_projection(
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if let Some(new_inner) = self
.inner
.clone()
.try_swapping_with_projection(projection)?
{
Ok(Some(self.with_new_inner(new_inner)))
} else {
Ok(None)
}
}
fn handle_child_pushdown_result(
&self,
phase: FilterPushdownPhase,
child_pushdown_result: ChildPushdownResult,
config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
let FilterPushdownPropagation {
filters,
updated_node,
} = self.inner.handle_child_pushdown_result(
phase,
child_pushdown_result,
config,
)?;
let updated_node = updated_node.map(|n| self.with_new_inner(n));
Ok(FilterPushdownPropagation {
filters,
updated_node,
})
}
fn try_pushdown_sort(
&self,
order: &[PhysicalSortExpr],
) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
let result = self.inner.try_pushdown_sort(order)?;
Ok(match result {
SortOrderPushdownResult::Exact { inner } => SortOrderPushdownResult::Exact {
inner: self.with_new_inner(inner),
},
SortOrderPushdownResult::Inexact { inner } => {
SortOrderPushdownResult::Inexact {
inner: self.with_new_inner(inner),
}
}
SortOrderPushdownResult::Unsupported => SortOrderPushdownResult::Unsupported,
})
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let new_inner = self.inner.clone().with_new_children(children)?;
Ok(self.with_new_inner(new_inner))
}
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
let new_inner = self.inner.clone().reset_state()?;
Ok(self.with_new_inner(new_inner))
}
fn with_new_state(
&self,
state: Arc<dyn Any + Send + Sync>,
) -> Option<Arc<dyn ExecutionPlan>> {
let new_inner = self.inner.with_new_state(state)?;
Some(self.with_new_inner(new_inner))
}
fn as_any(&self) -> &dyn Any {
if is_internal_optimizer_check() {
self
} else {
self.inner.as_any()
}
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let span = self.get_span();
let inner_stream = span.in_scope(|| self.inner.execute(partition, context))?;
let node_stream = self.node_recording_stream(inner_stream, &span);
let metrics_stream = self.metrics_recording_stream(node_stream, &span);
let preview_stream =
self.preview_recording_stream(metrics_stream, &span, partition);
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.inner.schema(),
preview_stream.instrument(span),
)))
}
}
impl DisplayAs for InstrumentedExec {
delegate! {
to self.inner {
fn fmt_as(&self, format: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result;
}
}
}