datafusion_physical_plan/
explain.rs1use std::sync::Arc;
21
22use super::{DisplayAs, PlanProperties, SendableRecordBatchStream};
23use crate::execution_plan::{Boundedness, EmissionType};
24use crate::stream::RecordBatchStreamAdapter;
25use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
26
27use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
28use datafusion_common::display::StringifiedPlan;
29use datafusion_common::{Result, assert_eq_or_internal_err};
30use datafusion_execution::TaskContext;
31use datafusion_physical_expr::EquivalenceProperties;
32
33use log::trace;
34
35#[derive(Debug, Clone)]
39pub struct ExplainExec {
40 schema: SchemaRef,
42 stringified_plans: Vec<StringifiedPlan>,
44 verbose: bool,
46 cache: Arc<PlanProperties>,
47}
48
49impl ExplainExec {
50 pub fn new(
52 schema: SchemaRef,
53 stringified_plans: Vec<StringifiedPlan>,
54 verbose: bool,
55 ) -> Self {
56 let cache = Self::compute_properties(Arc::clone(&schema));
57 ExplainExec {
58 schema,
59 stringified_plans,
60 verbose,
61 cache: Arc::new(cache),
62 }
63 }
64
65 pub fn stringified_plans(&self) -> &[StringifiedPlan] {
67 &self.stringified_plans
68 }
69
70 pub fn verbose(&self) -> bool {
72 self.verbose
73 }
74
75 fn compute_properties(schema: SchemaRef) -> PlanProperties {
77 PlanProperties::new(
78 EquivalenceProperties::new(schema),
79 Partitioning::UnknownPartitioning(1),
80 EmissionType::Final,
81 Boundedness::Bounded,
82 )
83 }
84}
85
86impl DisplayAs for ExplainExec {
87 fn fmt_as(
88 &self,
89 t: DisplayFormatType,
90 f: &mut std::fmt::Formatter,
91 ) -> std::fmt::Result {
92 match t {
93 DisplayFormatType::Default | DisplayFormatType::Verbose => {
94 write!(f, "ExplainExec")
95 }
96 DisplayFormatType::TreeRender => {
97 write!(f, "")
99 }
100 }
101 }
102}
103
104impl ExecutionPlan for ExplainExec {
105 fn name(&self) -> &'static str {
106 "ExplainExec"
107 }
108
109 fn properties(&self) -> &Arc<PlanProperties> {
111 &self.cache
112 }
113
114 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
115 vec![]
117 }
118
119 fn with_new_children(
120 self: Arc<Self>,
121 _: Vec<Arc<dyn ExecutionPlan>>,
122 ) -> Result<Arc<dyn ExecutionPlan>> {
123 Ok(self)
124 }
125
126 fn execute(
127 &self,
128 partition: usize,
129 context: Arc<TaskContext>,
130 ) -> Result<SendableRecordBatchStream> {
131 trace!(
132 "Start ExplainExec::execute for partition {} of context session_id {} and task_id {:?}",
133 partition,
134 context.session_id(),
135 context.task_id()
136 );
137 assert_eq_or_internal_err!(
138 partition,
139 0,
140 "ExplainExec invalid partition {partition}"
141 );
142 let mut type_builder =
143 StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
144 let mut plan_builder =
145 StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
146
147 let plans_to_print = self
148 .stringified_plans
149 .iter()
150 .filter(|s| s.should_display(self.verbose));
151
152 let mut prev: Option<&StringifiedPlan> = None;
154
155 for p in plans_to_print {
156 type_builder.append_value(p.plan_type.to_string());
157 match prev {
158 Some(prev) if !should_show(prev, p) => {
159 plan_builder.append_value("SAME TEXT AS ABOVE");
160 }
161 Some(_) | None => {
162 plan_builder.append_value(&*p.plan);
163 }
164 }
165 prev = Some(p);
166 }
167
168 let record_batch = RecordBatch::try_new(
169 Arc::clone(&self.schema),
170 vec![
171 Arc::new(type_builder.finish()),
172 Arc::new(plan_builder.finish()),
173 ],
174 )?;
175
176 trace!(
177 "Before returning RecordBatchStream in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}",
178 partition,
179 context.session_id(),
180 context.task_id()
181 );
182
183 Ok(Box::pin(RecordBatchStreamAdapter::new(
184 Arc::clone(&self.schema),
185 futures::stream::iter(vec![Ok(record_batch)]),
186 )))
187 }
188}
189
190fn should_show(previous_plan: &StringifiedPlan, this_plan: &StringifiedPlan) -> bool {
196 (previous_plan.plan != this_plan.plan) || this_plan.should_display(false)
199}