use std::any::Any;
use std::sync::Arc;
use super::{DisplayAs, PlanProperties, SendableRecordBatchStream};
use crate::execution_plan::{Boundedness, EmissionType};
use crate::stream::RecordBatchStreamAdapter;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::display::StringifiedPlan;
use datafusion_common::{Result, assert_eq_or_internal_err};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
use log::trace;
/// Leaf execution plan that emits a list of pre-rendered [`StringifiedPlan`]s
/// as a single record batch, one row per displayed plan.
#[derive(Debug, Clone)]
pub struct ExplainExec {
    /// Output schema. `execute` builds two string columns (plan type, plan
    /// text) against it, so it presumably declares two Utf8 columns — TODO confirm.
    schema: SchemaRef,
    /// The plans to emit, in order.
    stringified_plans: Vec<StringifiedPlan>,
    /// Forwarded to `StringifiedPlan::should_display` to choose which plans
    /// are emitted.
    verbose: bool,
    /// Plan properties, computed once in `new` and shared behind an `Arc`.
    cache: Arc<PlanProperties>,
}
impl ExplainExec {
    /// Builds an `ExplainExec` whose output rows are produced from
    /// `stringified_plans` and shaped by `schema`.
    ///
    /// During execution, only the plans for which
    /// `StringifiedPlan::should_display(verbose)` returns `true` are emitted.
    pub fn new(
        schema: SchemaRef,
        stringified_plans: Vec<StringifiedPlan>,
        verbose: bool,
    ) -> Self {
        let properties = Arc::new(Self::compute_properties(Arc::clone(&schema)));
        Self {
            cache: properties,
            schema,
            stringified_plans,
            verbose,
        }
    }

    /// Borrow every plan held by this node, including ones hidden by the
    /// `verbose` filter.
    pub fn stringified_plans(&self) -> &[StringifiedPlan] {
        self.stringified_plans.as_slice()
    }

    /// Whether this node was created in verbose mode.
    pub fn verbose(&self) -> bool {
        self.verbose
    }

    /// Describes the output of this node. It has:
    /// - a single (unknown) output partition,
    /// - final emission,
    /// - bounded input.
    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        let equivalences = EquivalenceProperties::new(schema);
        let single_partition = Partitioning::UnknownPartitioning(1);
        PlanProperties::new(
            equivalences,
            single_partition,
            EmissionType::Final,
            Boundedness::Bounded,
        )
    }
}
impl DisplayAs for ExplainExec {
    /// Writes this node's label for plan display.
    ///
    /// The default and verbose formats print `ExplainExec`. The tree
    /// renderer receives an empty string.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let label = match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => "ExplainExec",
            DisplayFormatType::TreeRender => "",
        };
        f.write_str(label)
    }
}
impl ExecutionPlan for ExplainExec {
    fn name(&self) -> &'static str {
        "ExplainExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// This operator is a leaf; it has no input plans.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Returns `self` unchanged. The supplied children are ignored.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Produces a stream holding exactly one record batch, with one row per
    /// displayed plan. Column 0 holds the plan type; column 1 holds the plan
    /// text.
    ///
    /// A plan is rendered as `"SAME TEXT AS ABOVE"` when both of these hold:
    /// - its text equals that of the previously emitted plan;
    /// - it would not be shown in non-verbose output.
    ///
    /// # Errors
    /// - Returns an internal error when `partition != 0`.
    /// - Propagates any error from `RecordBatch::try_new`.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start ExplainExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );

        assert_eq_or_internal_err!(
            partition,
            0,
            "ExplainExec invalid partition {partition}"
        );

        let row_capacity = self.stringified_plans.len();
        let mut plan_types = StringBuilder::with_capacity(row_capacity, 1024);
        let mut plan_texts = StringBuilder::with_capacity(row_capacity, 1024);

        // Remember the last emitted plan so identical consecutive text can be collapsed.
        let mut last_emitted: Option<&StringifiedPlan> = None;
        for current in self
            .stringified_plans
            .iter()
            .filter(|candidate| candidate.should_display(self.verbose))
        {
            plan_types.append_value(current.plan_type.to_string());

            let is_repeat =
                matches!(last_emitted, Some(earlier) if !should_show(earlier, current));
            if is_repeat {
                plan_texts.append_value("SAME TEXT AS ABOVE");
            } else {
                plan_texts.append_value(&*current.plan);
            }

            last_emitted = Some(current);
        }

        let batch = RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(plan_types.finish()),
                Arc::new(plan_texts.finish()),
            ],
        )?;

        trace!(
            "Before returning RecordBatchStream in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::iter(vec![Ok(batch)]),
        )))
    }
}
/// Decides whether `this_plan` should be printed in full, given the plan
/// emitted immediately before it.
///
/// Returns `true` in either of these cases:
/// - the plan text differs from `previous_plan`;
/// - `this_plan` would be displayed in non-verbose output anyway.
///
/// When it returns `false`, `ExplainExec::execute` replaces the text with a
/// "same as above" marker.
fn should_show(previous_plan: &StringifiedPlan, this_plan: &StringifiedPlan) -> bool {
    let text_differs = previous_plan.plan != this_plan.plan;
    text_differs || this_plan.should_display(false)
}