Skip to main content

datafusion_app/
stats.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use datafusion::{
19    datasource::{physical_plan::ParquetSource, source::DataSourceExec},
20    physical_plan::{
21        aggregates::AggregateExec,
22        filter::FilterExec,
23        joins::{
24            CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec,
25            SymmetricHashJoinExec,
26        },
27        metrics::MetricValue,
28        projection::ProjectionExec,
29        sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
30        visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
31    },
32};
33use itertools::Itertools;
34use std::{sync::Arc, time::Duration};
35
36#[derive(Clone, Debug)]
37pub struct ExecutionStats {
38    query: String,
39    rows: usize,
40    batches: i32,
41    bytes: usize,
42    durations: ExecutionDurationStats,
43    io: Option<ExecutionIOStats>,
44    compute: Option<ExecutionComputeStats>,
45    plan: Arc<dyn ExecutionPlan>,
46}
47
48impl ExecutionStats {
49    pub fn try_new(
50        query: String,
51        durations: ExecutionDurationStats,
52        rows: usize,
53        batches: i32,
54        bytes: usize,
55        plan: Arc<dyn ExecutionPlan>,
56    ) -> color_eyre::Result<Self> {
57        Ok(Self {
58            query,
59            durations,
60            rows,
61            batches,
62            bytes,
63            plan,
64            io: None,
65            compute: None,
66        })
67    }
68
69    pub fn collect_stats(&mut self) {
70        if let Some(io) = collect_plan_io_stats(Arc::clone(&self.plan)) {
71            self.io = Some(io)
72        }
73        if let Some(compute) = collect_plan_compute_stats(Arc::clone(&self.plan)) {
74            self.compute = Some(compute)
75        }
76    }
77
78    pub fn rows_selectivity(&self) -> f64 {
79        let maybe_io_output_rows = self.io.as_ref().and_then(|io| io.parquet_output_rows);
80        if let Some(io_output_rows) = maybe_io_output_rows {
81            self.rows as f64 / io_output_rows as f64
82        } else {
83            0.0
84        }
85    }
86
87    pub fn bytes_selectivity(&self) -> f64 {
88        let maybe_io_output_bytes = self.io.as_ref().and_then(|io| io.bytes_scanned.clone());
89        if let Some(io_output_bytes) = maybe_io_output_bytes {
90            self.bytes as f64 / io_output_bytes.as_usize() as f64
91        } else {
92            0.0
93        }
94    }
95
96    pub fn selectivity_efficiency(&self) -> f64 {
97        if let Some(io) = &self.io {
98            io.parquet_rg_pruned_stats_ratio() / self.rows_selectivity()
99        } else {
100            0.0
101        }
102    }
103}
104
105impl std::fmt::Display for ExecutionStats {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        writeln!(
108            f,
109            "========================= Query ==========================="
110        )?;
111        writeln!(f, "{}", self.query)?;
112        writeln!(
113            f,
114            "==================== Execution Summary ===================="
115        )?;
116        writeln!(
117            f,
118            "{:<20} {:<20} {:<20}",
119            "Output Rows (%)", "Output Bytes (%)", "Batches Processed",
120        )?;
121        writeln!(
122            f,
123            "{:<20} {:<20} {:<20}",
124            format!("{} ({:.2})", self.rows, self.rows_selectivity()),
125            format!("{} ({:.2})", self.bytes, self.bytes_selectivity()),
126            self.batches,
127        )?;
128        writeln!(f)?;
129        writeln!(f, "{}", self.durations)?;
130        writeln!(f, "{:<20}", "Parquet Efficiency (Pruning / Selectivity)")?;
131        writeln!(f, "{:<20.2}", self.selectivity_efficiency())?;
132        writeln!(f)?;
133        if let Some(io_stats) = &self.io {
134            writeln!(f, "{}", io_stats)?;
135        };
136        if let Some(compute_stats) = &self.compute {
137            writeln!(f, "{}", compute_stats)?;
138        };
139        Ok(())
140    }
141}
142
/// Wall-clock time spent in each phase of running a query.
#[derive(Clone, Debug)]
pub struct ExecutionDurationStats {
    /// Time spent parsing.
    parsing: Duration,
    /// Time spent building the logical plan.
    logical_planning: Duration,
    /// Time spent building the physical plan.
    physical_planning: Duration,
    /// Time spent executing.
    execution: Duration,
    /// End-to-end time.
    total: Duration,
}

impl ExecutionDurationStats {
    /// Bundles the measured phase durations.
    pub fn new(
        parsing: Duration,
        logical_planning: Duration,
        physical_planning: Duration,
        execution: Duration,
        total: Duration,
    ) -> Self {
        Self { parsing, logical_planning, physical_planning, execution, total }
    }
}

/// Renders the planning timings on one row and the execution/total timings
/// on another, each below its own header row.
impl std::fmt::Display for ExecutionDurationStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { parsing, logical_planning, physical_planning, execution, total } = self;

        // Planning phases.
        writeln!(
            f,
            "{:<20} {:<20} {:<20}",
            "Parsing", "Logical Planning", "Physical Planning"
        )?;
        writeln!(f, "{parsing:<20?} {logical_planning:<20?} {physical_planning:<20?}")?;
        writeln!(f)?;

        // Execution and end-to-end time.
        writeln!(f, "{:<20} {:<20}", "Execution", "Total")?;
        writeln!(f, "{execution:<20?} {total:<20?}")
    }
}
188
189#[derive(Clone, Debug)]
190pub struct ExecutionIOStats {
191    bytes_scanned: Option<MetricValue>,
192    time_opening: Option<MetricValue>,
193    time_scanning: Option<MetricValue>,
194    parquet_output_rows: Option<usize>,
195    parquet_pruned_page_index: Option<MetricValue>,
196    parquet_matched_page_index: Option<MetricValue>,
197    parquet_rg_pruned_stats: Option<MetricValue>,
198    parquet_rg_matched_stats: Option<MetricValue>,
199    parquet_rg_pruned_bloom_filter: Option<MetricValue>,
200    parquet_rg_matched_bloom_filter: Option<MetricValue>,
201}
202
203impl ExecutionIOStats {
204    fn parquet_rg_pruned_stats_ratio(&self) -> f64 {
205        if let (Some(pruned), Some(matched)) = (
206            self.parquet_rg_matched_stats.as_ref(),
207            self.parquet_rg_pruned_stats.as_ref(),
208        ) {
209            let pruned = pruned.as_usize() as f64;
210            let matched = matched.as_usize() as f64;
211            matched / (pruned + matched)
212        } else {
213            0.0
214        }
215    }
216
217    fn parquet_rg_pruned_bloom_filter_ratio(&self) -> f64 {
218        if let (Some(pruned), Some(matched)) = (
219            self.parquet_rg_matched_bloom_filter.as_ref(),
220            self.parquet_rg_pruned_bloom_filter.as_ref(),
221        ) {
222            let pruned = pruned.as_usize() as f64;
223            let matched = matched.as_usize() as f64;
224            matched / (pruned + matched)
225        } else {
226            0.0
227        }
228    }
229
230    fn parquet_rg_pruned_page_index_ratio(&self) -> f64 {
231        if let (Some(pruned), Some(matched)) = (
232            self.parquet_matched_page_index.as_ref(),
233            self.parquet_pruned_page_index.as_ref(),
234        ) {
235            let pruned = pruned.as_usize() as f64;
236            let matched = matched.as_usize() as f64;
237            matched / (pruned + matched)
238        } else {
239            0.0
240        }
241    }
242
243    fn row_group_count(&self) -> usize {
244        if let (Some(pruned), Some(matched)) = (
245            self.parquet_rg_matched_stats.as_ref(),
246            self.parquet_rg_pruned_stats.as_ref(),
247        ) {
248            let pruned = pruned.as_usize();
249            let matched = matched.as_usize();
250            pruned + matched
251        } else {
252            0
253        }
254    }
255}
256
257impl std::fmt::Display for ExecutionIOStats {
258    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259        writeln!(
260            f,
261            "======================= IO Summary ========================"
262        )?;
263        writeln!(
264            f,
265            "{:<20} {:<20} {:<20}",
266            "Bytes Scanned", "Time Opening", "Time Scanning"
267        )?;
268        writeln!(
269            f,
270            "{:<20} {:<20} {:<20}",
271            self.bytes_scanned
272                .as_ref()
273                .map(|m| m.to_string())
274                .unwrap_or("None".to_string()),
275            self.time_opening
276                .as_ref()
277                .map(|m| m.to_string())
278                .unwrap_or("None".to_string()),
279            self.time_scanning
280                .as_ref()
281                .map(|m| m.to_string())
282                .unwrap_or("None".to_string())
283        )?;
284        writeln!(f)?;
285        writeln!(
286            f,
287            "Parquet Pruning Stats (Output Rows: {}, Row Groups: {} [{}ms per row group])",
288            self.parquet_output_rows
289                .as_ref()
290                .map(|m| m.to_string())
291                .unwrap_or("None".to_string()),
292            self.row_group_count(),
293            self.time_scanning
294                .as_ref()
295                .map(|ts| format!(
296                    "{:.2}",
297                    (ts.as_usize() / 1_000_000) as f64 / self.row_group_count() as f64
298                ))
299                .unwrap_or("None".to_string())
300        )?;
301        writeln!(
302            f,
303            "{:<20} {:<20} {:<20}",
304            "Matched RG Stats %", "Matched RG Bloom %", "Matched Page Index %"
305        )?;
306        writeln!(
307            f,
308            "{:<20.2} {:<20.2} {:<20.2}",
309            self.parquet_rg_pruned_stats_ratio(),
310            self.parquet_rg_pruned_bloom_filter_ratio(),
311            self.parquet_rg_pruned_page_index_ratio()
312        )?;
313        Ok(())
314    }
315}
316
317/// Visitor to collect IO metrics from an execution plan
318///
319/// IO metrics are collected from nodes that perform IO operations, such as
320/// `CsvExec`, `ParquetExec`, and `ArrowExec`.
321struct PlanIOVisitor {
322    bytes_scanned: Option<MetricValue>,
323    time_opening: Option<MetricValue>,
324    time_scanning: Option<MetricValue>,
325    parquet_output_rows: Option<usize>,
326    parquet_pruned_page_index: Option<MetricValue>,
327    parquet_matched_page_index: Option<MetricValue>,
328    parquet_rg_pruned_stats: Option<MetricValue>,
329    parquet_rg_matched_stats: Option<MetricValue>,
330    parquet_rg_pruned_bloom_filter: Option<MetricValue>,
331    parquet_rg_matched_bloom_filter: Option<MetricValue>,
332}
333
334impl PlanIOVisitor {
335    fn new() -> Self {
336        Self {
337            bytes_scanned: None,
338            time_opening: None,
339            time_scanning: None,
340            parquet_output_rows: None,
341            parquet_pruned_page_index: None,
342            parquet_matched_page_index: None,
343            parquet_rg_pruned_stats: None,
344            parquet_rg_matched_stats: None,
345            parquet_rg_pruned_bloom_filter: None,
346            parquet_rg_matched_bloom_filter: None,
347        }
348    }
349
350    fn collect_io_metrics(&mut self, plan: &dyn ExecutionPlan) {
351        let io_metrics = plan.metrics();
352        if let Some(metrics) = io_metrics {
353            self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
354            self.time_opening = metrics.sum_by_name("time_elapsed_opening");
355            self.time_scanning = metrics.sum_by_name("time_elapsed_scanning_total");
356
357            if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
358                if data_source_exec
359                    .data_source()
360                    .as_any()
361                    .downcast_ref::<ParquetSource>()
362                    .is_some()
363                {
364                    self.parquet_output_rows = metrics.output_rows();
365                    self.parquet_rg_pruned_stats =
366                        metrics.sum_by_name("row_groups_pruned_statistics");
367                    self.parquet_rg_matched_stats =
368                        metrics.sum_by_name("row_groups_matched_statistics");
369                }
370            }
371        }
372    }
373}
374
375impl From<PlanIOVisitor> for ExecutionIOStats {
376    fn from(value: PlanIOVisitor) -> Self {
377        Self {
378            bytes_scanned: value.bytes_scanned,
379            time_opening: value.time_opening,
380            time_scanning: value.time_scanning,
381            parquet_output_rows: value.parquet_output_rows,
382            parquet_pruned_page_index: value.parquet_pruned_page_index,
383            parquet_matched_page_index: value.parquet_matched_page_index,
384            parquet_rg_pruned_stats: value.parquet_rg_pruned_stats,
385            parquet_rg_matched_stats: value.parquet_rg_matched_stats,
386            parquet_rg_pruned_bloom_filter: value.parquet_rg_pruned_bloom_filter,
387            parquet_rg_matched_bloom_filter: value.parquet_rg_matched_bloom_filter,
388        }
389    }
390}
391
392impl ExecutionPlanVisitor for PlanIOVisitor {
393    type Error = datafusion::common::DataFusionError;
394
395    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> color_eyre::Result<bool, Self::Error> {
396        if is_io_plan(plan) {
397            self.collect_io_metrics(plan);
398        }
399        Ok(true)
400    }
401}
402
/// Per-partition `elapsed_compute` timings of a single plan node.
#[derive(Clone, Debug)]
pub struct PartitionsComputeStats {
    /// Name of the plan node the timings belong to.
    name: String,
    /// Sorted elapsed compute times
    elapsed_computes: Vec<usize>,
}

impl PartitionsComputeStats {
    /// Returns `(min, median, mean, max, total)` over the partition timings.
    ///
    /// Relies on `elapsed_computes` being sorted ascending. For an even count
    /// the median is the upper of the two middle elements, and the mean uses
    /// integer division. All values are `0` when there are no partitions.
    fn summary_stats(&self) -> (usize, usize, usize, usize, usize) {
        let values = &self.elapsed_computes;
        let (Some(&lowest), Some(&highest)) = (values.first(), values.last()) else {
            return (0, 0, 0, 0, 0);
        };
        let count = values.len();
        let sum = values.iter().sum::<usize>();
        (lowest, values[count / 2], sum / count, highest, sum)
    }

    /// Number of partitions with a recorded timing.
    fn partitions(&self) -> usize {
        self.elapsed_computes.len()
    }
}
428
429#[derive(Clone, Debug)]
430pub struct ExecutionComputeStats {
431    elapsed_compute: Option<usize>,
432    projection_compute: Option<Vec<PartitionsComputeStats>>,
433    filter_compute: Option<Vec<PartitionsComputeStats>>,
434    sort_compute: Option<Vec<PartitionsComputeStats>>,
435    join_compute: Option<Vec<PartitionsComputeStats>>,
436    aggregate_compute: Option<Vec<PartitionsComputeStats>>,
437    other_compute: Option<Vec<PartitionsComputeStats>>,
438}
439
440impl ExecutionComputeStats {
441    fn display_compute(
442        &self,
443        f: &mut std::fmt::Formatter<'_>,
444        compute: &Option<Vec<PartitionsComputeStats>>,
445        label: &str,
446    ) -> std::fmt::Result {
447        if let (Some(filter_compute), Some(elapsed_compute)) = (compute, &self.elapsed_compute) {
448            let partitions = filter_compute.iter().fold(0, |acc, c| acc + c.partitions());
449            writeln!(
450                f,
451                "{label} Stats ({} nodes, {} partitions)",
452                filter_compute.len(),
453                partitions
454            )?;
455            writeln!(
456                f,
457                "{:<30} {:<16} {:<16} {:<16} {:<16} {:<16}",
458                "Node(Partitions)", "Min", "Median", "Mean", "Max", "Total (%)"
459            )?;
460            filter_compute.iter().try_for_each(|node| {
461                let (min, median, mean, max, total) = node.summary_stats();
462                let total = format!(
463                    "{} ({:.2}%)",
464                    total,
465                    (total as f32 / *elapsed_compute as f32) * 100.0
466                );
467                writeln!(
468                    f,
469                    "{:<30} {:<16} {:<16} {:<16} {:<16} {:<16}",
470                    format!("{}({})", node.name, node.elapsed_computes.len()),
471                    min,
472                    median,
473                    mean,
474                    max,
475                    total,
476                )
477            })
478        } else {
479            writeln!(f, "No {label} Stats")
480        }
481    }
482}
483
484impl std::fmt::Display for ExecutionComputeStats {
485    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
486        writeln!(
487            f,
488            "==================================== Compute Summary ====================================="
489        )?;
490        writeln!(f, "{:<20}", "Elapsed Compute",)?;
491        writeln!(
492            f,
493            "{:<20}",
494            self.elapsed_compute
495                .as_ref()
496                .map(|m| m.to_string())
497                .unwrap_or("None".to_string()),
498        )?;
499        writeln!(f)?;
500        self.display_compute(f, &self.projection_compute, "Projection")?;
501        writeln!(f)?;
502        self.display_compute(f, &self.filter_compute, "Filter")?;
503        writeln!(f)?;
504        self.display_compute(f, &self.sort_compute, "Sort")?;
505        writeln!(f)?;
506        self.display_compute(f, &self.join_compute, "Join")?;
507        writeln!(f)?;
508        self.display_compute(f, &self.aggregate_compute, "Aggregate")?;
509        writeln!(f)?;
510        self.display_compute(f, &self.other_compute, "Other")?;
511        writeln!(f)
512    }
513}
514
515#[derive(Default)]
516pub struct PlanComputeVisitor {
517    elapsed_compute: Option<usize>,
518    filter_computes: Vec<PartitionsComputeStats>,
519    sort_computes: Vec<PartitionsComputeStats>,
520    projection_computes: Vec<PartitionsComputeStats>,
521    join_computes: Vec<PartitionsComputeStats>,
522    aggregate_computes: Vec<PartitionsComputeStats>,
523    other_computes: Vec<PartitionsComputeStats>,
524}
525
526impl PlanComputeVisitor {
527    fn add_elapsed_compute(&mut self, node_elapsed_compute: Option<usize>) {
528        match (self.elapsed_compute, node_elapsed_compute) {
529            (Some(agg_elapsed_compute), Some(node_elapsed_compute)) => {
530                self.elapsed_compute = Some(agg_elapsed_compute + node_elapsed_compute)
531            }
532            (Some(_), None) | (None, None) => {}
533            (None, Some(node_elapsed_compute)) => self.elapsed_compute = Some(node_elapsed_compute),
534        }
535    }
536
537    fn collect_compute_metrics(&mut self, plan: &dyn ExecutionPlan) {
538        let compute_metrics = plan.metrics();
539        if let Some(metrics) = compute_metrics {
540            self.add_elapsed_compute(metrics.elapsed_compute());
541        }
542        self.collect_filter_metrics(plan);
543        self.collect_sort_metrics(plan);
544        self.collect_projection_metrics(plan);
545        self.collect_join_metrics(plan);
546        self.collect_aggregate_metrics(plan);
547        self.collect_other_metrics(plan);
548    }
549
550    // TODO: Refactor to have a single function that takes predicate and collector
551    fn collect_filter_metrics(&mut self, plan: &dyn ExecutionPlan) {
552        if is_filter_plan(plan) {
553            if let Some(metrics) = plan.metrics() {
554                let sorted_computes: Vec<usize> = metrics
555                    .iter()
556                    .filter_map(|m| match m.value() {
557                        MetricValue::ElapsedCompute(t) => Some(t.value()),
558                        _ => None,
559                    })
560                    .sorted()
561                    .collect();
562                let p = PartitionsComputeStats {
563                    name: plan.name().to_string(),
564                    elapsed_computes: sorted_computes,
565                };
566                self.filter_computes.push(p)
567            }
568        }
569    }
570
571    fn collect_sort_metrics(&mut self, plan: &dyn ExecutionPlan) {
572        if is_sort_plan(plan) {
573            if let Some(metrics) = plan.metrics() {
574                let sorted_computes: Vec<usize> = metrics
575                    .iter()
576                    .filter_map(|m| match m.value() {
577                        MetricValue::ElapsedCompute(t) => Some(t.value()),
578                        _ => None,
579                    })
580                    .sorted()
581                    .collect();
582                let p = PartitionsComputeStats {
583                    name: plan.name().to_string(),
584                    elapsed_computes: sorted_computes,
585                };
586                self.sort_computes.push(p)
587            }
588        }
589    }
590
591    fn collect_projection_metrics(&mut self, plan: &dyn ExecutionPlan) {
592        if is_projection_plan(plan) {
593            if let Some(metrics) = plan.metrics() {
594                let sorted_computes: Vec<usize> = metrics
595                    .iter()
596                    .filter_map(|m| match m.value() {
597                        MetricValue::ElapsedCompute(t) => Some(t.value()),
598                        _ => None,
599                    })
600                    .sorted()
601                    .collect();
602                let p = PartitionsComputeStats {
603                    name: plan.name().to_string(),
604                    elapsed_computes: sorted_computes,
605                };
606                self.projection_computes.push(p)
607            }
608        }
609    }
610
611    fn collect_join_metrics(&mut self, plan: &dyn ExecutionPlan) {
612        if is_join_plan(plan) {
613            if let Some(metrics) = plan.metrics() {
614                let sorted_computes: Vec<usize> = metrics
615                    .iter()
616                    .filter_map(|m| match m.value() {
617                        MetricValue::ElapsedCompute(t) => Some(t.value()),
618                        _ => None,
619                    })
620                    .sorted()
621                    .collect();
622                let p = PartitionsComputeStats {
623                    name: plan.name().to_string(),
624                    elapsed_computes: sorted_computes,
625                };
626                self.join_computes.push(p)
627            }
628        }
629    }
630
631    fn collect_aggregate_metrics(&mut self, plan: &dyn ExecutionPlan) {
632        if is_aggregate_plan(plan) {
633            if let Some(metrics) = plan.metrics() {
634                let sorted_computes: Vec<usize> = metrics
635                    .iter()
636                    .filter_map(|m| match m.value() {
637                        MetricValue::ElapsedCompute(t) => Some(t.value()),
638                        _ => None,
639                    })
640                    .sorted()
641                    .collect();
642                let p = PartitionsComputeStats {
643                    name: plan.name().to_string(),
644                    elapsed_computes: sorted_computes,
645                };
646                self.aggregate_computes.push(p)
647            }
648        }
649    }
650
651    fn collect_other_metrics(&mut self, plan: &dyn ExecutionPlan) {
652        if !is_filter_plan(plan)
653            && !is_sort_plan(plan)
654            && !is_projection_plan(plan)
655            && !is_aggregate_plan(plan)
656            && !is_join_plan(plan)
657        {
658            if let Some(metrics) = plan.metrics() {
659                let sorted_computes: Vec<usize> = metrics
660                    .iter()
661                    .filter_map(|m| match m.value() {
662                        MetricValue::ElapsedCompute(t) => Some(t.value()),
663                        _ => None,
664                    })
665                    .sorted()
666                    .collect();
667                let p = PartitionsComputeStats {
668                    name: plan.name().to_string(),
669                    elapsed_computes: sorted_computes,
670                };
671                self.other_computes.push(p)
672            }
673        }
674    }
675}
676
677fn is_filter_plan(plan: &dyn ExecutionPlan) -> bool {
678    plan.as_any().downcast_ref::<FilterExec>().is_some()
679}
680
681fn is_sort_plan(plan: &dyn ExecutionPlan) -> bool {
682    plan.as_any().downcast_ref::<SortExec>().is_some()
683        || plan
684            .as_any()
685            .downcast_ref::<SortPreservingMergeExec>()
686            .is_some()
687}
688
689fn is_projection_plan(plan: &dyn ExecutionPlan) -> bool {
690    plan.as_any().downcast_ref::<ProjectionExec>().is_some()
691}
692
693fn is_join_plan(plan: &dyn ExecutionPlan) -> bool {
694    plan.as_any().downcast_ref::<HashJoinExec>().is_some()
695        || plan.as_any().downcast_ref::<CrossJoinExec>().is_some()
696        || plan.as_any().downcast_ref::<SortMergeJoinExec>().is_some()
697        || plan.as_any().downcast_ref::<NestedLoopJoinExec>().is_some()
698        || plan
699            .as_any()
700            .downcast_ref::<SymmetricHashJoinExec>()
701            .is_some()
702}
703
704fn is_aggregate_plan(plan: &dyn ExecutionPlan) -> bool {
705    plan.as_any().downcast_ref::<AggregateExec>().is_some()
706}
707
708impl From<PlanComputeVisitor> for ExecutionComputeStats {
709    fn from(value: PlanComputeVisitor) -> Self {
710        Self {
711            elapsed_compute: value.elapsed_compute,
712            filter_compute: Some(value.filter_computes),
713            sort_compute: Some(value.sort_computes),
714            projection_compute: Some(value.projection_computes),
715            join_compute: Some(value.join_computes),
716            aggregate_compute: Some(value.aggregate_computes),
717            other_compute: Some(value.other_computes),
718        }
719    }
720}
721
722impl ExecutionPlanVisitor for PlanComputeVisitor {
723    type Error = datafusion::common::DataFusionError;
724
725    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> color_eyre::Result<bool, Self::Error> {
726        if !is_io_plan(plan) {
727            self.collect_compute_metrics(plan);
728        }
729        Ok(true)
730    }
731}
732
733fn is_io_plan(plan: &dyn ExecutionPlan) -> bool {
734    let io_plans = ["CsvExec", "ParquetExec", "ArrowExec"];
735    io_plans.contains(&plan.name())
736}
737
738pub fn collect_plan_io_stats(plan: Arc<dyn ExecutionPlan>) -> Option<ExecutionIOStats> {
739    let mut visitor = PlanIOVisitor::new();
740    if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() {
741        Some(visitor.into())
742    } else {
743        None
744    }
745}
746
747pub fn collect_plan_compute_stats(plan: Arc<dyn ExecutionPlan>) -> Option<ExecutionComputeStats> {
748    let mut visitor = PlanComputeVisitor::default();
749    if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() {
750        Some(visitor.into())
751    } else {
752        None
753    }
754}
755
756pub fn print_io_summary(plan: Arc<dyn ExecutionPlan>) {
757    println!("======================= IO Summary ========================");
758    if let Some(stats) = collect_plan_io_stats(plan) {
759        println!("IO Stats: {:#?}", stats);
760    } else {
761        println!("No IO metrics found");
762    }
763}