use async_trait::async_trait;
use datafusion::common::Result;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::{QueryPlanner, SessionState};
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::{ExecutionPlan, displayable};
use std::sync::Arc;
use tracing::{Instrument, Level};
/// Builds a `tracing` span whose level is only known at runtime.
///
/// The `tracing` span macros encode the level in the macro name, so this
/// helper matches on the supplied [`Level`] and expands to the corresponding
/// `*_span!` invocation. The span name and the field tokens are forwarded
/// to that macro unchanged.
macro_rules! span_at_level {
    ($lvl:expr, $span_name:expr, $($fields:tt)*) => {
        // One arm per `Level` constant; each arm forwards the same name/fields.
        match $lvl {
            Level::TRACE => tracing::trace_span!($span_name, $($fields)*),
            Level::DEBUG => tracing::debug_span!($span_name, $($fields)*),
            Level::INFO => tracing::info_span!($span_name, $($fields)*),
            Level::WARN => tracing::warn_span!($span_name, $($fields)*),
            Level::ERROR => tracing::error_span!($span_name, $($fields)*),
        }
    };
}
/// A [`QueryPlanner`] that delegates physical planning to another planner
/// and wraps that call in a `tracing` span.
#[derive(Debug)]
pub(crate) struct TracingQueryPlanner {
/// The planner that actually produces the physical plan.
inner: Arc<dyn QueryPlanner + Send + Sync>,
/// Level at which the `create_physical_plan` span is created.
level: Level,
}
impl TracingQueryPlanner {
fn new_with_level(inner: Arc<dyn QueryPlanner + Send + Sync>, level: Level) -> Self {
Self { inner, level }
}
pub(crate) fn instrument_state_with_level(
state: SessionState,
level: Level,
) -> SessionState {
let current_planner = state.query_planner().clone();
let wrapped_planner = Arc::new(Self::new_with_level(current_planner, level));
SessionStateBuilder::from(state)
.with_query_planner(wrapped_planner)
.build()
}
}
#[async_trait]
impl QueryPlanner for TracingQueryPlanner {
    /// Delegates physical planning to the wrapped planner inside a
    /// `create_physical_plan` span.
    ///
    /// The span is created at `self.level` with three initially empty fields:
    /// `logical_plan`, `physical_plan` and `error`. When the span is not
    /// disabled, the logical plan is recorded before delegation, and afterwards
    /// either the rendered physical plan or the error message is recorded.
    /// The delegate's result is returned unchanged.
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let planning_span = span_at_level!(
            self.level,
            "create_physical_plan",
            logical_plan = tracing::field::Empty,
            physical_plan = tracing::field::Empty,
            error = tracing::field::Empty
        );

        // Rendering the plan is skipped entirely when the span is disabled.
        if !planning_span.is_disabled() {
            planning_span.record(
                "logical_plan",
                logical_plan.display_indent_schema().to_string().as_str(),
            );
        }

        // Run the wrapped planner with the span attached to its future.
        let outcome = self
            .inner
            .create_physical_plan(logical_plan, session_state)
            .instrument(planning_span.clone())
            .await;

        // Nothing to record on a disabled span; hand the result straight back.
        if planning_span.is_disabled() {
            return outcome;
        }

        match &outcome {
            Ok(plan) => {
                let rendered = displayable(plan.as_ref()).indent(true).to_string();
                planning_span.record("physical_plan", rendered.as_str());
            }
            Err(e) => {
                let message = e.to_string();
                planning_span.record("error", message.as_str());
            }
        }

        outcome
    }
}