Skip to main content

datafusion_physical_plan/
analyze.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the ANALYZE operator
19
20use std::sync::Arc;
21
22use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
23use super::{
24    DisplayAs, Distribution, ExecutionPlanProperties, PlanProperties,
25    SendableRecordBatchStream,
26};
27use crate::display::DisplayableExecutionPlan;
28use crate::metrics::{MetricCategory, MetricType};
29use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
30
31use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
32use datafusion_common::instant::Instant;
33use datafusion_common::{DataFusionError, Result, assert_eq_or_internal_err};
34use datafusion_execution::TaskContext;
35use datafusion_physical_expr::EquivalenceProperties;
36
37use futures::StreamExt;
38
/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
/// discards the results, and then prints out an annotated plan with metrics.
///
/// The operator exposes a single output partition and emits exactly one
/// record batch. That batch always contains a "Plan with Metrics" row; in
/// verbose mode it additionally contains "Plan with Full Metrics",
/// "Output Rows" and "Duration" rows.
#[derive(Debug, Clone)]
pub struct AnalyzeExec {
    /// When true, the output also includes the full-metrics plan, the total
    /// number of input rows and the elapsed duration.
    verbose: bool,
    /// If statistics should be displayed
    show_statistics: bool,
    /// Which metric types (e.g. `MetricType::Summary`, `MetricType::Dev`)
    /// should be displayed
    metric_types: Vec<MetricType>,
    /// Optional filter by semantic category (rows / bytes / timing).
    metric_categories: Option<Vec<MetricCategory>>,
    /// The input plan (the plan being analyzed)
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// The output schema for RecordBatches of this exec node
    schema: SchemaRef,
    /// Plan properties computed once in `AnalyzeExec::new` from the input
    /// and the output schema.
    cache: Arc<PlanProperties>,
}
57
58impl AnalyzeExec {
59    /// Create a new AnalyzeExec
60    pub fn new(
61        verbose: bool,
62        show_statistics: bool,
63        metric_types: Vec<MetricType>,
64        metric_categories: Option<Vec<MetricCategory>>,
65        input: Arc<dyn ExecutionPlan>,
66        schema: SchemaRef,
67    ) -> Self {
68        let cache = Self::compute_properties(&input, Arc::clone(&schema));
69        AnalyzeExec {
70            verbose,
71            show_statistics,
72            metric_types,
73            metric_categories,
74            input,
75            schema,
76            cache: Arc::new(cache),
77        }
78    }
79
80    /// Access to verbose
81    pub fn verbose(&self) -> bool {
82        self.verbose
83    }
84
85    /// Access to show_statistics
86    pub fn show_statistics(&self) -> bool {
87        self.show_statistics
88    }
89
90    /// Access to metric_categories
91    pub fn metric_categories(&self) -> Option<&[MetricCategory]> {
92        self.metric_categories.as_deref()
93    }
94
95    /// The input plan
96    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
97        &self.input
98    }
99
100    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
101    fn compute_properties(
102        input: &Arc<dyn ExecutionPlan>,
103        schema: SchemaRef,
104    ) -> PlanProperties {
105        PlanProperties::new(
106            EquivalenceProperties::new(schema),
107            Partitioning::UnknownPartitioning(1),
108            input.pipeline_behavior(),
109            input.boundedness(),
110        )
111    }
112}
113
114impl DisplayAs for AnalyzeExec {
115    fn fmt_as(
116        &self,
117        t: DisplayFormatType,
118        f: &mut std::fmt::Formatter,
119    ) -> std::fmt::Result {
120        match t {
121            DisplayFormatType::Default | DisplayFormatType::Verbose => {
122                write!(f, "AnalyzeExec verbose={}", self.verbose)
123            }
124            DisplayFormatType::TreeRender => {
125                // TODO: collect info
126                write!(f, "")
127            }
128        }
129    }
130}
131
132impl ExecutionPlan for AnalyzeExec {
133    fn name(&self) -> &'static str {
134        "AnalyzeExec"
135    }
136
137    /// Return a reference to Any that can be used for downcasting
138    fn properties(&self) -> &Arc<PlanProperties> {
139        &self.cache
140    }
141
142    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
143        vec![&self.input]
144    }
145
146    fn required_input_distribution(&self) -> Vec<Distribution> {
147        vec![Distribution::UnspecifiedDistribution]
148    }
149
150    fn with_new_children(
151        self: Arc<Self>,
152        mut children: Vec<Arc<dyn ExecutionPlan>>,
153    ) -> Result<Arc<dyn ExecutionPlan>> {
154        Ok(Arc::new(Self::new(
155            self.verbose,
156            self.show_statistics,
157            self.metric_types.clone(),
158            self.metric_categories.clone(),
159            children.pop().unwrap(),
160            Arc::clone(&self.schema),
161        )))
162    }
163
164    fn execute(
165        &self,
166        partition: usize,
167        context: Arc<TaskContext>,
168    ) -> Result<SendableRecordBatchStream> {
169        assert_eq_or_internal_err!(
170            partition,
171            0,
172            "AnalyzeExec invalid partition. Expected 0, got {partition}"
173        );
174
175        // Gather futures that will run each input partition in
176        // parallel (on a separate tokio task) using a JoinSet to
177        // cancel outstanding futures on drop
178        let num_input_partitions = self.input.output_partitioning().partition_count();
179        let mut builder =
180            RecordBatchReceiverStream::builder(self.schema(), num_input_partitions);
181
182        for input_partition in 0..num_input_partitions {
183            builder.run_input(
184                Arc::clone(&self.input),
185                input_partition,
186                Arc::clone(&context),
187            );
188        }
189
190        // Create future that computes the final output
191        let start = Instant::now();
192        let captured_input = Arc::clone(&self.input);
193        let captured_schema = Arc::clone(&self.schema);
194        let verbose = self.verbose;
195        let show_statistics = self.show_statistics;
196        let metric_types = self.metric_types.clone();
197        let metric_categories = self.metric_categories.clone();
198
199        // future that gathers the results from all the tasks in the
200        // JoinSet that computes the overall row count and final
201        // record batch
202        let mut input_stream = builder.build();
203        let output = async move {
204            let mut total_rows = 0;
205            while let Some(batch) = input_stream.next().await.transpose()? {
206                total_rows += batch.num_rows();
207            }
208            drop(input_stream);
209
210            let duration = Instant::now() - start;
211            create_output_batch(
212                verbose,
213                show_statistics,
214                total_rows,
215                duration,
216                &captured_input,
217                &captured_schema,
218                &metric_types,
219                metric_categories.as_deref(),
220            )
221        };
222
223        Ok(Box::pin(RecordBatchStreamAdapter::new(
224            Arc::clone(&self.schema),
225            futures::stream::once(output),
226        )))
227    }
228}
229
230/// Creates the output of AnalyzeExec as a RecordBatch
231#[expect(clippy::too_many_arguments)]
232fn create_output_batch(
233    verbose: bool,
234    show_statistics: bool,
235    total_rows: usize,
236    duration: std::time::Duration,
237    input: &Arc<dyn ExecutionPlan>,
238    schema: &SchemaRef,
239    metric_types: &[MetricType],
240    metric_categories: Option<&[MetricCategory]>,
241) -> Result<RecordBatch> {
242    let mut type_builder = StringBuilder::with_capacity(1, 1024);
243    let mut plan_builder = StringBuilder::with_capacity(1, 1024);
244
245    // TODO use some sort of enum rather than strings?
246    type_builder.append_value("Plan with Metrics");
247
248    let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
249        .set_metric_types(metric_types.to_vec())
250        .set_metric_categories(metric_categories.map(|c| c.to_vec()))
251        .set_show_statistics(show_statistics)
252        .indent(verbose)
253        .to_string();
254    plan_builder.append_value(annotated_plan);
255
256    // Verbose output
257    // TODO make this more sophisticated
258    if verbose {
259        type_builder.append_value("Plan with Full Metrics");
260
261        let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
262            .set_metric_types(metric_types.to_vec())
263            .set_metric_categories(metric_categories.map(|c| c.to_vec()))
264            .set_show_statistics(show_statistics)
265            .indent(verbose)
266            .to_string();
267        plan_builder.append_value(annotated_plan);
268
269        type_builder.append_value("Output Rows");
270        plan_builder.append_value(total_rows.to_string());
271
272        type_builder.append_value("Duration");
273        plan_builder.append_value(format!("{duration:?}"));
274    }
275
276    RecordBatch::try_new(
277        Arc::clone(schema),
278        vec![
279            Arc::new(type_builder.finish()),
280            Arc::new(plan_builder.finish()),
281        ],
282    )
283    .map_err(DataFusionError::from)
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        collect,
        test::{
            assert_is_pending,
            exec::{BlockingExec, assert_strong_count_converges_to_zero},
        },
    };

    use arrow::datatypes::{DataType, Field, Schema};
    use futures::FutureExt;

    /// Dropping a still-pending collection of an `AnalyzeExec` must cancel
    /// the input work and release every reference to the blocking input.
    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::Float32,
            true,
        )]));

        let blocking = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
        let input_refs = blocking.refs();

        let analyze = Arc::new(AnalyzeExec::new(
            true,
            false,
            vec![MetricType::Summary, MetricType::Dev],
            None,
            blocking,
            schema,
        ));

        let ctx = Arc::new(TaskContext::default());
        let mut pending = collect(analyze, ctx).boxed();

        assert_is_pending(&mut pending);
        drop(pending);

        assert_strong_count_converges_to_zero(input_refs).await;
        Ok(())
    }
}