datafusion_physical_plan/
analyze.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the ANALYZE operator
19
20use std::any::Any;
21use std::sync::Arc;
22
23use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
24use super::{
25    DisplayAs, Distribution, ExecutionPlanProperties, PlanProperties,
26    SendableRecordBatchStream,
27};
28use crate::display::DisplayableExecutionPlan;
29use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
30
31use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
32use datafusion_common::instant::Instant;
33use datafusion_common::{internal_err, DataFusionError, Result};
34use datafusion_execution::TaskContext;
35use datafusion_physical_expr::EquivalenceProperties;
36
37use futures::StreamExt;
38
/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
/// discards the results, and then prints out an annotated plan with metrics
///
/// The operator exposes a single output partition and emits exactly one
/// batch: a `(plan_type, plan)` report produced after every input partition
/// has been drained.
#[derive(Debug, Clone)]
pub struct AnalyzeExec {
    /// Control how much extra to print: when set, the report also contains
    /// the full-metrics plan, the total output row count and the duration
    verbose: bool,
    /// If statistics should be displayed alongside the metrics
    show_statistics: bool,
    /// The input plan (the plan being analyzed)
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// The output schema for RecordBatches of this exec node (the report)
    schema: SchemaRef,
    /// Cached plan properties (equivalences over `schema`, single unknown
    /// partition, and the input's pipeline behavior / boundedness)
    cache: PlanProperties,
}
50    schema: SchemaRef,
51    cache: PlanProperties,
52}
53
54impl AnalyzeExec {
55    /// Create a new AnalyzeExec
56    pub fn new(
57        verbose: bool,
58        show_statistics: bool,
59        input: Arc<dyn ExecutionPlan>,
60        schema: SchemaRef,
61    ) -> Self {
62        let cache = Self::compute_properties(&input, Arc::clone(&schema));
63        AnalyzeExec {
64            verbose,
65            show_statistics,
66            input,
67            schema,
68            cache,
69        }
70    }
71
72    /// Access to verbose
73    pub fn verbose(&self) -> bool {
74        self.verbose
75    }
76
77    /// Access to show_statistics
78    pub fn show_statistics(&self) -> bool {
79        self.show_statistics
80    }
81
82    /// The input plan
83    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
84        &self.input
85    }
86
87    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
88    fn compute_properties(
89        input: &Arc<dyn ExecutionPlan>,
90        schema: SchemaRef,
91    ) -> PlanProperties {
92        PlanProperties::new(
93            EquivalenceProperties::new(schema),
94            Partitioning::UnknownPartitioning(1),
95            input.pipeline_behavior(),
96            input.boundedness(),
97        )
98    }
99}
100
101impl DisplayAs for AnalyzeExec {
102    fn fmt_as(
103        &self,
104        t: DisplayFormatType,
105        f: &mut std::fmt::Formatter,
106    ) -> std::fmt::Result {
107        match t {
108            DisplayFormatType::Default | DisplayFormatType::Verbose => {
109                write!(f, "AnalyzeExec verbose={}", self.verbose)
110            }
111            DisplayFormatType::TreeRender => {
112                // TODO: collect info
113                write!(f, "")
114            }
115        }
116    }
117}
118
119impl ExecutionPlan for AnalyzeExec {
120    fn name(&self) -> &'static str {
121        "AnalyzeExec"
122    }
123
124    /// Return a reference to Any that can be used for downcasting
125    fn as_any(&self) -> &dyn Any {
126        self
127    }
128
129    fn properties(&self) -> &PlanProperties {
130        &self.cache
131    }
132
133    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
134        vec![&self.input]
135    }
136
137    fn required_input_distribution(&self) -> Vec<Distribution> {
138        vec![Distribution::UnspecifiedDistribution]
139    }
140
141    fn with_new_children(
142        self: Arc<Self>,
143        mut children: Vec<Arc<dyn ExecutionPlan>>,
144    ) -> Result<Arc<dyn ExecutionPlan>> {
145        Ok(Arc::new(Self::new(
146            self.verbose,
147            self.show_statistics,
148            children.pop().unwrap(),
149            Arc::clone(&self.schema),
150        )))
151    }
152
153    fn execute(
154        &self,
155        partition: usize,
156        context: Arc<TaskContext>,
157    ) -> Result<SendableRecordBatchStream> {
158        if 0 != partition {
159            return internal_err!(
160                "AnalyzeExec invalid partition. Expected 0, got {partition}"
161            );
162        }
163
164        // Gather futures that will run each input partition in
165        // parallel (on a separate tokio task) using a JoinSet to
166        // cancel outstanding futures on drop
167        let num_input_partitions = self.input.output_partitioning().partition_count();
168        let mut builder =
169            RecordBatchReceiverStream::builder(self.schema(), num_input_partitions);
170
171        for input_partition in 0..num_input_partitions {
172            builder.run_input(
173                Arc::clone(&self.input),
174                input_partition,
175                Arc::clone(&context),
176            );
177        }
178
179        // Create future that computes the final output
180        let start = Instant::now();
181        let captured_input = Arc::clone(&self.input);
182        let captured_schema = Arc::clone(&self.schema);
183        let verbose = self.verbose;
184        let show_statistics = self.show_statistics;
185
186        // future that gathers the results from all the tasks in the
187        // JoinSet that computes the overall row count and final
188        // record batch
189        let mut input_stream = builder.build();
190        let output = async move {
191            let mut total_rows = 0;
192            while let Some(batch) = input_stream.next().await.transpose()? {
193                total_rows += batch.num_rows();
194            }
195
196            let duration = Instant::now() - start;
197            create_output_batch(
198                verbose,
199                show_statistics,
200                total_rows,
201                duration,
202                captured_input,
203                captured_schema,
204            )
205        };
206
207        Ok(Box::pin(RecordBatchStreamAdapter::new(
208            Arc::clone(&self.schema),
209            futures::stream::once(output),
210        )))
211    }
212}
213
214/// Creates the output of AnalyzeExec as a RecordBatch
215fn create_output_batch(
216    verbose: bool,
217    show_statistics: bool,
218    total_rows: usize,
219    duration: std::time::Duration,
220    input: Arc<dyn ExecutionPlan>,
221    schema: SchemaRef,
222) -> Result<RecordBatch> {
223    let mut type_builder = StringBuilder::with_capacity(1, 1024);
224    let mut plan_builder = StringBuilder::with_capacity(1, 1024);
225
226    // TODO use some sort of enum rather than strings?
227    type_builder.append_value("Plan with Metrics");
228
229    let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
230        .set_show_statistics(show_statistics)
231        .indent(verbose)
232        .to_string();
233    plan_builder.append_value(annotated_plan);
234
235    // Verbose output
236    // TODO make this more sophisticated
237    if verbose {
238        type_builder.append_value("Plan with Full Metrics");
239
240        let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
241            .set_show_statistics(show_statistics)
242            .indent(verbose)
243            .to_string();
244        plan_builder.append_value(annotated_plan);
245
246        type_builder.append_value("Output Rows");
247        plan_builder.append_value(total_rows.to_string());
248
249        type_builder.append_value("Duration");
250        plan_builder.append_value(format!("{duration:?}"));
251    }
252
253    RecordBatch::try_new(
254        schema,
255        vec![
256            Arc::new(type_builder.finish()),
257            Arc::new(plan_builder.finish()),
258        ],
259    )
260    .map_err(DataFusionError::from)
261}
262
263#[cfg(test)]
264mod tests {
265    use super::*;
266    use crate::{
267        collect,
268        test::{
269            assert_is_pending,
270            exec::{assert_strong_count_converges_to_zero, BlockingExec},
271        },
272    };
273
274    use arrow::datatypes::{DataType, Field, Schema};
275    use futures::FutureExt;
276
277    #[tokio::test]
278    async fn test_drop_cancel() -> Result<()> {
279        let task_ctx = Arc::new(TaskContext::default());
280        let schema =
281            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
282
283        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
284        let refs = blocking_exec.refs();
285        let analyze_exec = Arc::new(AnalyzeExec::new(true, false, blocking_exec, schema));
286
287        let fut = collect(analyze_exec, task_ctx);
288        let mut fut = fut.boxed();
289
290        assert_is_pending(&mut fut);
291        drop(fut);
292        assert_strong_count_converges_to_zero(refs).await;
293
294        Ok(())
295    }
296}