Skip to main content

datafusion_physical_plan/
explain.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the EXPLAIN operator
19
20use std::any::Any;
21use std::sync::Arc;
22
23use super::{DisplayAs, PlanProperties, SendableRecordBatchStream};
24use crate::execution_plan::{Boundedness, EmissionType};
25use crate::stream::RecordBatchStreamAdapter;
26use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
27
28use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
29use datafusion_common::display::StringifiedPlan;
30use datafusion_common::{Result, assert_eq_or_internal_err};
31use datafusion_execution::TaskContext;
32use datafusion_physical_expr::EquivalenceProperties;
33
34use log::trace;
35
/// Explain execution plan operator. This operator contains the string
/// values of the various plans it has when it is created, and passes
/// them to its output.
///
/// It is a leaf node (it has no children). When executed it produces a
/// single [`RecordBatch`] with two string columns (plan type, plan text)
/// and one row per plan that passes the verbosity filter.
#[derive(Debug, Clone)]
pub struct ExplainExec {
    /// The schema that this exec plan node outputs
    schema: SchemaRef,
    /// The strings to be printed
    stringified_plans: Vec<StringifiedPlan>,
    /// Control which plans to print: passed to
    /// `StringifiedPlan::should_display` during execution to decide
    /// which of `stringified_plans` are emitted
    verbose: bool,
    /// Plan properties computed once at construction time
    /// (single partition, final emission, bounded)
    cache: PlanProperties,
}
49
50impl ExplainExec {
51    /// Create a new ExplainExec
52    pub fn new(
53        schema: SchemaRef,
54        stringified_plans: Vec<StringifiedPlan>,
55        verbose: bool,
56    ) -> Self {
57        let cache = Self::compute_properties(Arc::clone(&schema));
58        ExplainExec {
59            schema,
60            stringified_plans,
61            verbose,
62            cache,
63        }
64    }
65
66    /// The strings to be printed
67    pub fn stringified_plans(&self) -> &[StringifiedPlan] {
68        &self.stringified_plans
69    }
70
71    /// Access to verbose
72    pub fn verbose(&self) -> bool {
73        self.verbose
74    }
75
76    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
77    fn compute_properties(schema: SchemaRef) -> PlanProperties {
78        PlanProperties::new(
79            EquivalenceProperties::new(schema),
80            Partitioning::UnknownPartitioning(1),
81            EmissionType::Final,
82            Boundedness::Bounded,
83        )
84    }
85}
86
87impl DisplayAs for ExplainExec {
88    fn fmt_as(
89        &self,
90        t: DisplayFormatType,
91        f: &mut std::fmt::Formatter,
92    ) -> std::fmt::Result {
93        match t {
94            DisplayFormatType::Default | DisplayFormatType::Verbose => {
95                write!(f, "ExplainExec")
96            }
97            DisplayFormatType::TreeRender => {
98                // TODO: collect info
99                write!(f, "")
100            }
101        }
102    }
103}
104
105impl ExecutionPlan for ExplainExec {
106    fn name(&self) -> &'static str {
107        "ExplainExec"
108    }
109
110    /// Return a reference to Any that can be used for downcasting
111    fn as_any(&self) -> &dyn Any {
112        self
113    }
114
115    fn properties(&self) -> &PlanProperties {
116        &self.cache
117    }
118
119    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
120        // This is a leaf node and has no children
121        vec![]
122    }
123
124    fn with_new_children(
125        self: Arc<Self>,
126        _: Vec<Arc<dyn ExecutionPlan>>,
127    ) -> Result<Arc<dyn ExecutionPlan>> {
128        Ok(self)
129    }
130
131    fn execute(
132        &self,
133        partition: usize,
134        context: Arc<TaskContext>,
135    ) -> Result<SendableRecordBatchStream> {
136        trace!(
137            "Start ExplainExec::execute for partition {} of context session_id {} and task_id {:?}",
138            partition,
139            context.session_id(),
140            context.task_id()
141        );
142        assert_eq_or_internal_err!(
143            partition,
144            0,
145            "ExplainExec invalid partition {partition}"
146        );
147        let mut type_builder =
148            StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
149        let mut plan_builder =
150            StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
151
152        let plans_to_print = self
153            .stringified_plans
154            .iter()
155            .filter(|s| s.should_display(self.verbose));
156
157        // Identify plans that are not changed
158        let mut prev: Option<&StringifiedPlan> = None;
159
160        for p in plans_to_print {
161            type_builder.append_value(p.plan_type.to_string());
162            match prev {
163                Some(prev) if !should_show(prev, p) => {
164                    plan_builder.append_value("SAME TEXT AS ABOVE");
165                }
166                Some(_) | None => {
167                    plan_builder.append_value(&*p.plan);
168                }
169            }
170            prev = Some(p);
171        }
172
173        let record_batch = RecordBatch::try_new(
174            Arc::clone(&self.schema),
175            vec![
176                Arc::new(type_builder.finish()),
177                Arc::new(plan_builder.finish()),
178            ],
179        )?;
180
181        trace!(
182            "Before returning RecordBatchStream in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}",
183            partition,
184            context.session_id(),
185            context.task_id()
186        );
187
188        Ok(Box::pin(RecordBatchStreamAdapter::new(
189            Arc::clone(&self.schema),
190            futures::stream::iter(vec![Ok(record_batch)]),
191        )))
192    }
193}
194
195/// If this plan should be shown, given the previous plan that was
196/// displayed.
197///
198/// This is meant to avoid repeating the same plan over and over again
199/// in explain plans to make clear what is changing
200fn should_show(previous_plan: &StringifiedPlan, this_plan: &StringifiedPlan) -> bool {
201    // if the plans are different, or if they would have been
202    // displayed in the normal explain (aka non verbose) plan
203    (previous_plan.plan != this_plan.plan) || this_plan.should_display(false)
204}