datafusion_physical_plan/
explain.rs1use std::any::Any;
21use std::sync::Arc;
22
23use super::{DisplayAs, PlanProperties, SendableRecordBatchStream};
24use crate::execution_plan::{Boundedness, EmissionType};
25use crate::stream::RecordBatchStreamAdapter;
26use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
27
28use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
29use datafusion_common::display::StringifiedPlan;
30use datafusion_common::{internal_err, Result};
31use datafusion_execution::TaskContext;
32use datafusion_physical_expr::EquivalenceProperties;
33
34use log::trace;
35
36#[derive(Debug, Clone)]
40pub struct ExplainExec {
41 schema: SchemaRef,
43 stringified_plans: Vec<StringifiedPlan>,
45 verbose: bool,
47 cache: PlanProperties,
48}
49
50impl ExplainExec {
51 pub fn new(
53 schema: SchemaRef,
54 stringified_plans: Vec<StringifiedPlan>,
55 verbose: bool,
56 ) -> Self {
57 let cache = Self::compute_properties(Arc::clone(&schema));
58 ExplainExec {
59 schema,
60 stringified_plans,
61 verbose,
62 cache,
63 }
64 }
65
66 pub fn stringified_plans(&self) -> &[StringifiedPlan] {
68 &self.stringified_plans
69 }
70
71 pub fn verbose(&self) -> bool {
73 self.verbose
74 }
75
76 fn compute_properties(schema: SchemaRef) -> PlanProperties {
78 PlanProperties::new(
79 EquivalenceProperties::new(schema),
80 Partitioning::UnknownPartitioning(1),
81 EmissionType::Final,
82 Boundedness::Bounded,
83 )
84 }
85}
86
87impl DisplayAs for ExplainExec {
88 fn fmt_as(
89 &self,
90 t: DisplayFormatType,
91 f: &mut std::fmt::Formatter,
92 ) -> std::fmt::Result {
93 match t {
94 DisplayFormatType::Default | DisplayFormatType::Verbose => {
95 write!(f, "ExplainExec")
96 }
97 DisplayFormatType::TreeRender => {
98 write!(f, "")
100 }
101 }
102 }
103}
104
105impl ExecutionPlan for ExplainExec {
106 fn name(&self) -> &'static str {
107 "ExplainExec"
108 }
109
110 fn as_any(&self) -> &dyn Any {
112 self
113 }
114
115 fn properties(&self) -> &PlanProperties {
116 &self.cache
117 }
118
119 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
120 vec![]
122 }
123
124 fn with_new_children(
125 self: Arc<Self>,
126 _: Vec<Arc<dyn ExecutionPlan>>,
127 ) -> Result<Arc<dyn ExecutionPlan>> {
128 Ok(self)
129 }
130
131 fn execute(
132 &self,
133 partition: usize,
134 context: Arc<TaskContext>,
135 ) -> Result<SendableRecordBatchStream> {
136 trace!("Start ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
137 if 0 != partition {
138 return internal_err!("ExplainExec invalid partition {partition}");
139 }
140 let mut type_builder =
141 StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
142 let mut plan_builder =
143 StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
144
145 let plans_to_print = self
146 .stringified_plans
147 .iter()
148 .filter(|s| s.should_display(self.verbose));
149
150 let mut prev: Option<&StringifiedPlan> = None;
152
153 for p in plans_to_print {
154 type_builder.append_value(p.plan_type.to_string());
155 match prev {
156 Some(prev) if !should_show(prev, p) => {
157 plan_builder.append_value("SAME TEXT AS ABOVE");
158 }
159 Some(_) | None => {
160 plan_builder.append_value(&*p.plan);
161 }
162 }
163 prev = Some(p);
164 }
165
166 let record_batch = RecordBatch::try_new(
167 Arc::clone(&self.schema),
168 vec![
169 Arc::new(type_builder.finish()),
170 Arc::new(plan_builder.finish()),
171 ],
172 )?;
173
174 trace!(
175 "Before returning RecordBatchStream in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
176
177 Ok(Box::pin(RecordBatchStreamAdapter::new(
178 Arc::clone(&self.schema),
179 futures::stream::iter(vec![Ok(record_batch)]),
180 )))
181 }
182}
183
184fn should_show(previous_plan: &StringifiedPlan, this_plan: &StringifiedPlan) -> bool {
190 (previous_plan.plan != this_plan.plan) || this_plan.should_display(false)
193}