datafusion_physical_plan/
explain.rs1use std::any::Any;
21use std::sync::Arc;
22
23use super::{DisplayAs, PlanProperties, SendableRecordBatchStream};
24use crate::execution_plan::{Boundedness, EmissionType};
25use crate::stream::RecordBatchStreamAdapter;
26use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
27
28use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
29use datafusion_common::display::StringifiedPlan;
30use datafusion_common::{Result, assert_eq_or_internal_err};
31use datafusion_execution::TaskContext;
32use datafusion_physical_expr::EquivalenceProperties;
33
34use log::trace;
35
36#[derive(Debug, Clone)]
40pub struct ExplainExec {
41 schema: SchemaRef,
43 stringified_plans: Vec<StringifiedPlan>,
45 verbose: bool,
47 cache: PlanProperties,
48}
49
50impl ExplainExec {
51 pub fn new(
53 schema: SchemaRef,
54 stringified_plans: Vec<StringifiedPlan>,
55 verbose: bool,
56 ) -> Self {
57 let cache = Self::compute_properties(Arc::clone(&schema));
58 ExplainExec {
59 schema,
60 stringified_plans,
61 verbose,
62 cache,
63 }
64 }
65
66 pub fn stringified_plans(&self) -> &[StringifiedPlan] {
68 &self.stringified_plans
69 }
70
71 pub fn verbose(&self) -> bool {
73 self.verbose
74 }
75
76 fn compute_properties(schema: SchemaRef) -> PlanProperties {
78 PlanProperties::new(
79 EquivalenceProperties::new(schema),
80 Partitioning::UnknownPartitioning(1),
81 EmissionType::Final,
82 Boundedness::Bounded,
83 )
84 }
85}
86
87impl DisplayAs for ExplainExec {
88 fn fmt_as(
89 &self,
90 t: DisplayFormatType,
91 f: &mut std::fmt::Formatter,
92 ) -> std::fmt::Result {
93 match t {
94 DisplayFormatType::Default | DisplayFormatType::Verbose => {
95 write!(f, "ExplainExec")
96 }
97 DisplayFormatType::TreeRender => {
98 write!(f, "")
100 }
101 }
102 }
103}
104
105impl ExecutionPlan for ExplainExec {
106 fn name(&self) -> &'static str {
107 "ExplainExec"
108 }
109
110 fn as_any(&self) -> &dyn Any {
112 self
113 }
114
115 fn properties(&self) -> &PlanProperties {
116 &self.cache
117 }
118
119 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
120 vec![]
122 }
123
124 fn with_new_children(
125 self: Arc<Self>,
126 _: Vec<Arc<dyn ExecutionPlan>>,
127 ) -> Result<Arc<dyn ExecutionPlan>> {
128 Ok(self)
129 }
130
131 fn execute(
132 &self,
133 partition: usize,
134 context: Arc<TaskContext>,
135 ) -> Result<SendableRecordBatchStream> {
136 trace!(
137 "Start ExplainExec::execute for partition {} of context session_id {} and task_id {:?}",
138 partition,
139 context.session_id(),
140 context.task_id()
141 );
142 assert_eq_or_internal_err!(
143 partition,
144 0,
145 "ExplainExec invalid partition {partition}"
146 );
147 let mut type_builder =
148 StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
149 let mut plan_builder =
150 StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
151
152 let plans_to_print = self
153 .stringified_plans
154 .iter()
155 .filter(|s| s.should_display(self.verbose));
156
157 let mut prev: Option<&StringifiedPlan> = None;
159
160 for p in plans_to_print {
161 type_builder.append_value(p.plan_type.to_string());
162 match prev {
163 Some(prev) if !should_show(prev, p) => {
164 plan_builder.append_value("SAME TEXT AS ABOVE");
165 }
166 Some(_) | None => {
167 plan_builder.append_value(&*p.plan);
168 }
169 }
170 prev = Some(p);
171 }
172
173 let record_batch = RecordBatch::try_new(
174 Arc::clone(&self.schema),
175 vec![
176 Arc::new(type_builder.finish()),
177 Arc::new(plan_builder.finish()),
178 ],
179 )?;
180
181 trace!(
182 "Before returning RecordBatchStream in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}",
183 partition,
184 context.session_id(),
185 context.task_id()
186 );
187
188 Ok(Box::pin(RecordBatchStreamAdapter::new(
189 Arc::clone(&self.schema),
190 futures::stream::iter(vec![Ok(record_batch)]),
191 )))
192 }
193}
194
195fn should_show(previous_plan: &StringifiedPlan, this_plan: &StringifiedPlan) -> bool {
201 (previous_plan.plan != this_plan.plan) || this_plan.should_display(false)
204}