Skip to main content

datafusion_physical_plan/
explain.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the EXPLAIN operator
19
20use std::sync::Arc;
21
22use super::{DisplayAs, PlanProperties, SendableRecordBatchStream};
23use crate::execution_plan::{Boundedness, EmissionType};
24use crate::stream::RecordBatchStreamAdapter;
25use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
26
27use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
28use datafusion_common::display::StringifiedPlan;
29use datafusion_common::{Result, assert_eq_or_internal_err};
30use datafusion_execution::TaskContext;
31use datafusion_physical_expr::EquivalenceProperties;
32
33use log::trace;
34
35/// Explain execution plan operator. This operator contains the string
36/// values of the various plans it has when it is created, and passes
37/// them to its output.
38#[derive(Debug, Clone)]
39pub struct ExplainExec {
40    /// The schema that this exec plan node outputs
41    schema: SchemaRef,
42    /// The strings to be printed
43    stringified_plans: Vec<StringifiedPlan>,
44    /// control which plans to print
45    verbose: bool,
46    cache: Arc<PlanProperties>,
47}
48
49impl ExplainExec {
50    /// Create a new ExplainExec
51    pub fn new(
52        schema: SchemaRef,
53        stringified_plans: Vec<StringifiedPlan>,
54        verbose: bool,
55    ) -> Self {
56        let cache = Self::compute_properties(Arc::clone(&schema));
57        ExplainExec {
58            schema,
59            stringified_plans,
60            verbose,
61            cache: Arc::new(cache),
62        }
63    }
64
65    /// The strings to be printed
66    pub fn stringified_plans(&self) -> &[StringifiedPlan] {
67        &self.stringified_plans
68    }
69
70    /// Access to verbose
71    pub fn verbose(&self) -> bool {
72        self.verbose
73    }
74
75    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
76    fn compute_properties(schema: SchemaRef) -> PlanProperties {
77        PlanProperties::new(
78            EquivalenceProperties::new(schema),
79            Partitioning::UnknownPartitioning(1),
80            EmissionType::Final,
81            Boundedness::Bounded,
82        )
83    }
84}
85
86impl DisplayAs for ExplainExec {
87    fn fmt_as(
88        &self,
89        t: DisplayFormatType,
90        f: &mut std::fmt::Formatter,
91    ) -> std::fmt::Result {
92        match t {
93            DisplayFormatType::Default | DisplayFormatType::Verbose => {
94                write!(f, "ExplainExec")
95            }
96            DisplayFormatType::TreeRender => {
97                // TODO: collect info
98                write!(f, "")
99            }
100        }
101    }
102}
103
104impl ExecutionPlan for ExplainExec {
105    fn name(&self) -> &'static str {
106        "ExplainExec"
107    }
108
109    /// Return a reference to Any that can be used for downcasting
110    fn properties(&self) -> &Arc<PlanProperties> {
111        &self.cache
112    }
113
114    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
115        // This is a leaf node and has no children
116        vec![]
117    }
118
119    fn with_new_children(
120        self: Arc<Self>,
121        _: Vec<Arc<dyn ExecutionPlan>>,
122    ) -> Result<Arc<dyn ExecutionPlan>> {
123        Ok(self)
124    }
125
126    fn execute(
127        &self,
128        partition: usize,
129        context: Arc<TaskContext>,
130    ) -> Result<SendableRecordBatchStream> {
131        trace!(
132            "Start ExplainExec::execute for partition {} of context session_id {} and task_id {:?}",
133            partition,
134            context.session_id(),
135            context.task_id()
136        );
137        assert_eq_or_internal_err!(
138            partition,
139            0,
140            "ExplainExec invalid partition {partition}"
141        );
142        let mut type_builder =
143            StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
144        let mut plan_builder =
145            StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
146
147        let plans_to_print = self
148            .stringified_plans
149            .iter()
150            .filter(|s| s.should_display(self.verbose));
151
152        // Identify plans that are not changed
153        let mut prev: Option<&StringifiedPlan> = None;
154
155        for p in plans_to_print {
156            type_builder.append_value(p.plan_type.to_string());
157            match prev {
158                Some(prev) if !should_show(prev, p) => {
159                    plan_builder.append_value("SAME TEXT AS ABOVE");
160                }
161                Some(_) | None => {
162                    plan_builder.append_value(&*p.plan);
163                }
164            }
165            prev = Some(p);
166        }
167
168        let record_batch = RecordBatch::try_new(
169            Arc::clone(&self.schema),
170            vec![
171                Arc::new(type_builder.finish()),
172                Arc::new(plan_builder.finish()),
173            ],
174        )?;
175
176        trace!(
177            "Before returning RecordBatchStream in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}",
178            partition,
179            context.session_id(),
180            context.task_id()
181        );
182
183        Ok(Box::pin(RecordBatchStreamAdapter::new(
184            Arc::clone(&self.schema),
185            futures::stream::iter(vec![Ok(record_batch)]),
186        )))
187    }
188}
189
190/// If this plan should be shown, given the previous plan that was
191/// displayed.
192///
193/// This is meant to avoid repeating the same plan over and over again
194/// in explain plans to make clear what is changing
195fn should_show(previous_plan: &StringifiedPlan, this_plan: &StringifiedPlan) -> bool {
196    // if the plans are different, or if they would have been
197    // displayed in the normal explain (aka non verbose) plan
198    (previous_plan.plan != this_plan.plan) || this_plan.should_display(false)
199}