datafusion_physical_plan/analyze.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the ANALYZE operator

use std::any::Any;
use std::sync::Arc;

use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use super::{
    DisplayAs, Distribution, ExecutionPlanProperties, PlanProperties,
    SendableRecordBatchStream,
};
use crate::display::DisplayableExecutionPlan;
use crate::metrics::MetricType;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};

use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::instant::Instant;
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;

use futures::StreamExt;

/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
/// discards the results, and then prints out an annotated plan with metrics
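///
/// The result is a single `RecordBatch` with two string columns: a label for
/// each output row (such as "Plan with Metrics") and the corresponding
/// annotated plan text (see [`create_output_batch`])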
#[derive(Debug, Clone)]
pub struct AnalyzeExec {
    /// Control how much extra to print
    verbose: bool,
    /// If statistics should be displayed
    show_statistics: bool,
    /// Which metric categories should be displayed
    metric_types: Vec<MetricType>,
    /// The input plan (the plan being analyzed)
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// The output schema for RecordBatches of this exec node
    schema: SchemaRef,
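    /// Cached plan properties (equivalence properties, partitioning,
    /// boundedness, etc.), returned by [`ExecutionPlan::properties`]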
    cache: PlanProperties,
}

impl AnalyzeExec {
    /// Create a new AnalyzeExec
    pub fn new(
        verbose: bool,
        show_statistics: bool,
        metric_types: Vec<MetricType>,
        input: Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
    ) -> Self {
        let cache = Self::compute_properties(&input, Arc::clone(&schema));
        AnalyzeExec {
            verbose,
            show_statistics,
            metric_types,
            input,
            schema,
            cache,
        }
    }

    /// Access to verbose
    pub fn verbose(&self) -> bool {
        self.verbose
    }

    /// Access to show_statistics
    pub fn show_statistics(&self) -> bool {
        self.show_statistics
    }

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// This function creates the cache object that stores the plan properties
    /// such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
    ) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
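            // The annotated output is always produced as a single partition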
            Partitioning::UnknownPartitioning(1),
            input.pipeline_behavior(),
            input.boundedness(),
        )
    }
}

impl DisplayAs for AnalyzeExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "AnalyzeExec verbose={}", self.verbose)
            }
            DisplayFormatType::TreeRender => {
                // TODO: collect info
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for AnalyzeExec {
    fn name(&self) -> &'static str {
        "AnalyzeExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::UnspecifiedDistribution]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
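        // AnalyzeExec has exactly one child, so rebuild this node around the
        // replacement child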
        Ok(Arc::new(Self::new(
            self.verbose,
            self.show_statistics,
            self.metric_types.clone(),
            children.pop().unwrap(),
            Arc::clone(&self.schema),
        )))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
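        // AnalyzeExec always produces a single output partition (see
        // `compute_properties`), so only partition 0 is valid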
        if 0 != partition {
            return internal_err!(
                "AnalyzeExec invalid partition. Expected 0, got {partition}"
            );
        }

        // Gather futures that will run each input partition in
        // parallel (on a separate tokio task) using a JoinSet to
        // cancel outstanding futures on drop
        let num_input_partitions = self.input.output_partitioning().partition_count();
        let mut builder =
            RecordBatchReceiverStream::builder(self.schema(), num_input_partitions);

        for input_partition in 0..num_input_partitions {
            builder.run_input(
                Arc::clone(&self.input),
                input_partition,
                Arc::clone(&context),
            );
        }

        // Create future that computes the final output
        let start = Instant::now();
        let captured_input = Arc::clone(&self.input);
        let captured_schema = Arc::clone(&self.schema);
        let verbose = self.verbose;
        let show_statistics = self.show_statistics;
        let metric_types = self.metric_types.clone();

        // Future that gathers the results from all the tasks in the
        // JoinSet and computes the overall row count and final
        // record batch
        let mut input_stream = builder.build();
        let output = async move {
            let mut total_rows = 0;
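            // Drain the input, discarding the batches themselves and keeping
            // only the total row count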
            while let Some(batch) = input_stream.next().await.transpose()? {
                total_rows += batch.num_rows();
            }

            let duration = Instant::now() - start;
            create_output_batch(
                verbose,
                show_statistics,
                total_rows,
                duration,
                captured_input,
                captured_schema,
                &metric_types,
            )
        };

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(output),
        )))
    }
}

/// Creates the output of AnalyzeExec as a RecordBatch
fn create_output_batch(
    verbose: bool,
    show_statistics: bool,
    total_rows: usize,
    duration: std::time::Duration,
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
    metric_types: &[MetricType],
) -> Result<RecordBatch> {
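    // The output batch has two string columns: a label for each row and the
    // corresponding annotated plan (or metric value) text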
    let mut type_builder = StringBuilder::with_capacity(1, 1024);
    let mut plan_builder = StringBuilder::with_capacity(1, 1024);

    // TODO use some sort of enum rather than strings?
    type_builder.append_value("Plan with Metrics");

    let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
        .set_metric_types(metric_types.to_vec())
        .set_show_statistics(show_statistics)
        .indent(verbose)
        .to_string();
    plan_builder.append_value(annotated_plan);

    // Verbose output
    // TODO make this more sophisticated
    if verbose {
        type_builder.append_value("Plan with Full Metrics");

        let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
            .set_metric_types(metric_types.to_vec())
            .set_show_statistics(show_statistics)
            .indent(verbose)
            .to_string();
        plan_builder.append_value(annotated_plan);

        type_builder.append_value("Output Rows");
        plan_builder.append_value(total_rows.to_string());

        type_builder.append_value("Duration");
        plan_builder.append_value(format!("{duration:?}"));
    }

    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(type_builder.finish()),
            Arc::new(plan_builder.finish()),
        ],
    )
    .map_err(DataFusionError::from)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        collect,
        test::{
            assert_is_pending,
            exec::{assert_strong_count_converges_to_zero, BlockingExec},
        },
    };

    use arrow::datatypes::{DataType, Field, Schema};
    use futures::FutureExt;

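    // Dropping the output future before it completes should cancel the
    // spawned input tasks and release all references to the input plan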
    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
        let refs = blocking_exec.refs();
        let analyze_exec = Arc::new(AnalyzeExec::new(
            true,
            false,
            vec![MetricType::SUMMARY, MetricType::DEV],
            blocking_exec,
            schema,
        ));

        let fut = collect(analyze_exec, task_ctx);
        let mut fut = fut.boxed();

        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }
}