use crate::instrumented_exec::{InstrumentedExec, SpanCreateFn};
use crate::options::InstrumentationOptions;
use crate::utils::InternalOptimizerGuard;
use datafusion::common::runtime::{JoinSetTracer, set_join_set_tracer};
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::{
config::ConfigOptions, physical_optimizer::PhysicalOptimizerRule,
physical_plan::ExecutionPlan,
};
use futures::FutureExt;
use futures::future::BoxFuture;
use std::any::Any;
use std::fmt::{self, Debug};
use std::sync::Arc;
use std::sync::Once;
use tracing::Span;
use tracing_futures::Instrument;
/// Builds the physical optimizer rule that wraps every node of an execution
/// plan in an `InstrumentedExec`.
///
/// The first call in the process (guarded by `INIT`) also tries to register
/// `SpanTracer` through `set_join_set_tracer`. If that registration returns an
/// error, a warning is logged and the rule is still returned.
///
/// * `span_create_fn` – span factory handed to every `InstrumentedExec` the rule creates.
/// * `options` – instrumentation settings passed to each `InstrumentedExec`.
pub fn new_instrument_rule(
    span_create_fn: Arc<SpanCreateFn>,
    options: InstrumentationOptions,
) -> Arc<dyn PhysicalOptimizerRule + Send + Sync> {
    INIT.call_once(|| {
        // Registration failure is not fatal: instrumentation of the plan itself still works.
        if let Err(e) = set_join_set_tracer(&SpanTracer) {
            tracing::warn!("set_join_set_tracer failed to set join_set_tracer: {}", e);
        }
    });

    let rule = InstrumentRule {
        span_create_fn,
        options,
    };
    Arc::new(rule)
}
/// Physical optimizer rule that wraps each not-yet-instrumented plan node in an
/// `InstrumentedExec`.
struct InstrumentRule {
    /// Span factory; a clone of this `Arc` goes into every `InstrumentedExec` created.
    span_create_fn: Arc<SpanCreateFn>,
    /// Settings passed by reference to each `InstrumentedExec::new` call.
    options: InstrumentationOptions,
}

impl Debug for InstrumentRule {
    /// Formats as the rule name only. The span factory is opaque, so fields are omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.name())
    }
}
impl PhysicalOptimizerRule for InstrumentRule {
    /// Walks the plan top-down and wraps every node in an `InstrumentedExec`.
    ///
    /// Nodes for which `InstrumentedExec::is_instrumented` is true are returned
    /// unchanged (`Transformed::no`), so running the rule again over its own
    /// output does not double-wrap them. The traversal still continues into
    /// their children.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        // Kept alive for the whole traversal; what it guards is defined by
        // `InternalOptimizerGuard` (not visible here).
        let _guard = InternalOptimizerGuard::new();

        plan.transform_down(|node| {
            let already_wrapped = InstrumentedExec::is_instrumented(node.as_ref());
            let outcome = if already_wrapped {
                Transformed::no(node)
            } else {
                let instrumented: Arc<dyn ExecutionPlan> = Arc::new(InstrumentedExec::new(
                    node,
                    Arc::clone(&self.span_create_fn),
                    &self.options,
                ));
                Transformed::yes(instrumented)
            };
            Ok(outcome)
        })
        .data()
    }

    /// Rule name; also what the `Debug` impl prints.
    fn name(&self) -> &str {
        "Instrument"
    }

    /// Returns `false`, so the optimizer skips its schema-preservation check
    /// for this rule.
    fn schema_check(&self) -> bool {
        false
    }
}
/// `JoinSetTracer` that carries the caller's `tracing` span into work spawned
/// through DataFusion's join-set helpers.
struct SpanTracer;

impl JoinSetTracer for SpanTracer {
    /// Instruments the future with the span that is current right now, i.e.
    /// when the future is handed to the tracer (`in_current_span` captures
    /// eagerly).
    fn trace_future(&self, fut: BoxedFuture) -> BoxedFuture {
        fut.in_current_span().boxed()
    }

    /// Wraps a blocking closure so that it runs inside the span that is current
    /// when the closure is handed to the tracer.
    ///
    /// The span must be captured here, before boxing. The returned closure may
    /// run later on a different (blocking) thread, where calling
    /// `Span::current()` would no longer see the caller's span. This mirrors
    /// the eager capture done by `trace_future`.
    fn trace_block(&self, f: BoxedClosure) -> BoxedClosure {
        let span = Span::current();
        Box::new(move || span.in_scope(f))
    }
}
/// Type-erased result value of a traced task.
type BoxedAny = Box<dyn Any + Send + 'static>;
/// Boxed `'static` future yielding a `BoxedAny`, as accepted and returned by
/// `JoinSetTracer::trace_future`.
type BoxedFuture = BoxFuture<'static, BoxedAny>;
/// Boxed one-shot closure yielding a `BoxedAny`, as accepted and returned by
/// `JoinSetTracer::trace_block`.
type BoxedClosure = Box<dyn FnOnce() -> BoxedAny + Send + 'static>;
/// Ensures `new_instrument_rule` attempts to register `SpanTracer` at most once.
static INIT: Once = Once::new();
#[cfg(test)]
mod tests {
    use super::*;
    use crate::options::InstrumentationOptions;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
    use std::sync::Arc;
    use tracing::Span;

    /// Running the rule over its own output must not wrap nodes a second time.
    #[test]
    fn test_skip_already_instrumented() -> datafusion::error::Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
        let plan = Arc::new(PlaceholderRowExec::new(schema));
        let span_create_fn: Arc<SpanCreateFn> = Arc::new(Span::none);
        let options = InstrumentationOptions::default();
        let rule = new_instrument_rule(span_create_fn, options);
        let config = ConfigOptions::default();

        let optimized_once = rule.optimize(plan, &config)?;
        // Without this check the ptr_eq assertion below would pass vacuously
        // if the rule never wrapped anything at all.
        assert!(
            InstrumentedExec::is_instrumented(optimized_once.as_ref()),
            "First pass should wrap the plan in InstrumentedExec"
        );

        let optimized_twice = rule.optimize(Arc::clone(&optimized_once), &config)?;
        assert!(
            Arc::ptr_eq(&optimized_once, &optimized_twice),
            "Plan should not be wrapped twice; it should be the same Arc as the first pass"
        );
        Ok(())
    }
}