datafusion_dft/execution/
stats.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion::{
19    datasource::physical_plan::ParquetExec,
20    physical_plan::{
21        aggregates::AggregateExec,
22        filter::FilterExec,
23        joins::{
24            CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec,
25            SymmetricHashJoinExec,
26        },
27        metrics::MetricValue,
28        projection::ProjectionExec,
29        sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
30        visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
31    },
32};
33use itertools::Itertools;
34use std::{sync::Arc, time::Duration};
35
36#[derive(Clone, Debug)]
37pub struct ExecutionStats {
38    query: String,
39    rows: usize,
40    batches: i32,
41    bytes: usize,
42    durations: ExecutionDurationStats,
43    io: Option<ExecutionIOStats>,
44    compute: Option<ExecutionComputeStats>,
45    plan: Arc<dyn ExecutionPlan>,
46}
47
48impl ExecutionStats {
49    pub fn try_new(
50        query: String,
51        durations: ExecutionDurationStats,
52        rows: usize,
53        batches: i32,
54        bytes: usize,
55        plan: Arc<dyn ExecutionPlan>,
56    ) -> color_eyre::Result<Self> {
57        Ok(Self {
58            query,
59            durations,
60            rows,
61            batches,
62            bytes,
63            plan,
64            io: None,
65            compute: None,
66        })
67    }
68
69    pub fn collect_stats(&mut self) {
70        if let Some(io) = collect_plan_io_stats(Arc::clone(&self.plan)) {
71            self.io = Some(io)
72        }
73        if let Some(compute) = collect_plan_compute_stats(Arc::clone(&self.plan)) {
74            self.compute = Some(compute)
75        }
76    }
77
78    pub fn rows_selectivity(&self) -> f64 {
79        let maybe_io_output_rows = self.io.as_ref().and_then(|io| io.parquet_output_rows);
80        if let Some(io_output_rows) = maybe_io_output_rows {
81            self.rows as f64 / io_output_rows as f64
82        } else {
83            0.0
84        }
85    }
86
87    pub fn bytes_selectivity(&self) -> f64 {
88        let maybe_io_output_bytes = self.io.as_ref().and_then(|io| io.bytes_scanned.clone());
89        if let Some(io_output_bytes) = maybe_io_output_bytes {
90            self.bytes as f64 / io_output_bytes.as_usize() as f64
91        } else {
92            0.0
93        }
94    }
95
96    pub fn selectivity_efficiency(&self) -> f64 {
97        if let Some(io) = &self.io {
98            io.parquet_rg_pruned_stats_ratio() / self.rows_selectivity()
99        } else {
100            0.0
101        }
102    }
103}
104
105impl std::fmt::Display for ExecutionStats {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        writeln!(
108            f,
109            "========================= Query ==========================="
110        )?;
111        writeln!(f, "{}", self.query)?;
112        writeln!(
113            f,
114            "==================== Execution Summary ===================="
115        )?;
116        writeln!(
117            f,
118            "{:<20} {:<20} {:<20}",
119            "Output Rows (%)", "Output Bytes (%)", "Batches Processed",
120        )?;
121        writeln!(
122            f,
123            "{:<20} {:<20} {:<20}",
124            format!("{} ({:.2})", self.rows, self.rows_selectivity()),
125            format!("{} ({:.2})", self.bytes, self.bytes_selectivity()),
126            self.batches,
127        )?;
128        writeln!(f)?;
129        writeln!(f, "{}", self.durations)?;
130        writeln!(f, "{:<20}", "Parquet Efficiency (Pruning / Selectivity)")?;
131        writeln!(f, "{:<20.2}", self.selectivity_efficiency())?;
132        writeln!(f)?;
133        if let Some(io_stats) = &self.io {
134            writeln!(f, "{}", io_stats)?;
135        };
136        if let Some(compute_stats) = &self.compute {
137            writeln!(f, "{}", compute_stats)?;
138        };
139        Ok(())
140    }
141}
142
/// Time spent in each phase of running a query.
#[derive(Clone, Debug)]
pub struct ExecutionDurationStats {
    parsing: Duration,
    logical_planning: Duration,
    physical_planning: Duration,
    execution: Duration,
    total: Duration,
}

impl ExecutionDurationStats {
    /// Bundles the per-phase durations.
    ///
    /// `total` is stored exactly as given; it is not derived from the other phases.
    pub fn new(
        parsing: Duration,
        logical_planning: Duration,
        physical_planning: Duration,
        execution: Duration,
        total: Duration,
    ) -> Self {
        Self {
            total,
            execution,
            physical_planning,
            logical_planning,
            parsing,
        }
    }
}

impl std::fmt::Display for ExecutionDurationStats {
    // Two header/value row pairs: the planning phases, then execution and total.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            parsing,
            logical_planning,
            physical_planning,
            execution,
            total,
        } = self;

        writeln!(
            f,
            "{:<20} {:<20} {:<20}",
            "Parsing", "Logical Planning", "Physical Planning"
        )?;
        writeln!(
            f,
            "{parsing:<20?} {logical_planning:<20?} {physical_planning:<20?}"
        )?;
        writeln!(f)?;
        writeln!(f, "{:<20} {:<20}", "Execution", "Total")?;
        writeln!(f, "{execution:<20?} {total:<20?}")
    }
}
188
189#[derive(Clone, Debug)]
190pub struct ExecutionIOStats {
191    bytes_scanned: Option<MetricValue>,
192    time_opening: Option<MetricValue>,
193    time_scanning: Option<MetricValue>,
194    parquet_output_rows: Option<usize>,
195    parquet_pruned_page_index: Option<MetricValue>,
196    parquet_matched_page_index: Option<MetricValue>,
197    parquet_rg_pruned_stats: Option<MetricValue>,
198    parquet_rg_matched_stats: Option<MetricValue>,
199    parquet_rg_pruned_bloom_filter: Option<MetricValue>,
200    parquet_rg_matched_bloom_filter: Option<MetricValue>,
201}
202
203impl ExecutionIOStats {
204    fn parquet_rg_pruned_stats_ratio(&self) -> f64 {
205        if let (Some(pruned), Some(matched)) = (
206            self.parquet_rg_matched_stats.as_ref(),
207            self.parquet_rg_pruned_stats.as_ref(),
208        ) {
209            let pruned = pruned.as_usize() as f64;
210            let matched = matched.as_usize() as f64;
211            matched / (pruned + matched)
212        } else {
213            0.0
214        }
215    }
216
217    fn parquet_rg_pruned_bloom_filter_ratio(&self) -> f64 {
218        if let (Some(pruned), Some(matched)) = (
219            self.parquet_rg_matched_bloom_filter.as_ref(),
220            self.parquet_rg_pruned_bloom_filter.as_ref(),
221        ) {
222            let pruned = pruned.as_usize() as f64;
223            let matched = matched.as_usize() as f64;
224            matched / (pruned + matched)
225        } else {
226            0.0
227        }
228    }
229
230    fn parquet_rg_pruned_page_index_ratio(&self) -> f64 {
231        if let (Some(pruned), Some(matched)) = (
232            self.parquet_matched_page_index.as_ref(),
233            self.parquet_pruned_page_index.as_ref(),
234        ) {
235            let pruned = pruned.as_usize() as f64;
236            let matched = matched.as_usize() as f64;
237            matched / (pruned + matched)
238        } else {
239            0.0
240        }
241    }
242
243    fn row_group_count(&self) -> usize {
244        if let (Some(pruned), Some(matched)) = (
245            self.parquet_rg_matched_stats.as_ref(),
246            self.parquet_rg_pruned_stats.as_ref(),
247        ) {
248            let pruned = pruned.as_usize();
249            let matched = matched.as_usize();
250            pruned + matched
251        } else {
252            0
253        }
254    }
255}
256
257impl std::fmt::Display for ExecutionIOStats {
258    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259        writeln!(
260            f,
261            "======================= IO Summary ========================"
262        )?;
263        writeln!(
264            f,
265            "{:<20} {:<20} {:<20}",
266            "Bytes Scanned", "Time Opening", "Time Scanning"
267        )?;
268        writeln!(
269            f,
270            "{:<20} {:<20} {:<20}",
271            self.bytes_scanned
272                .as_ref()
273                .map(|m| m.to_string())
274                .unwrap_or("None".to_string()),
275            self.time_opening
276                .as_ref()
277                .map(|m| m.to_string())
278                .unwrap_or("None".to_string()),
279            self.time_scanning
280                .as_ref()
281                .map(|m| m.to_string())
282                .unwrap_or("None".to_string())
283        )?;
284        writeln!(f)?;
285        writeln!(
286            f,
287            "Parquet Pruning Stats (Output Rows: {}, Row Groups: {} [{}ms per row group])",
288            self.parquet_output_rows
289                .as_ref()
290                .map(|m| m.to_string())
291                .unwrap_or("None".to_string()),
292            self.row_group_count(),
293            self.time_scanning
294                .as_ref()
295                .map(|ts| format!(
296                    "{:.2}",
297                    (ts.as_usize() / 1_000_000) as f64 / self.row_group_count() as f64
298                ))
299                .unwrap_or("None".to_string())
300        )?;
301        writeln!(
302            f,
303            "{:<20} {:<20} {:<20}",
304            "Matched RG Stats %", "Matched RG Bloom %", "Matched Page Index %"
305        )?;
306        writeln!(
307            f,
308            "{:<20.2} {:<20.2} {:<20.2}",
309            self.parquet_rg_pruned_stats_ratio(),
310            self.parquet_rg_pruned_bloom_filter_ratio(),
311            self.parquet_rg_pruned_page_index_ratio()
312        )?;
313        Ok(())
314    }
315}
316
317/// Visitor to collect IO metrics from an execution plan
318///
319/// IO metrics are collected from nodes that perform IO operations, such as
320/// `CsvExec`, `ParquetExec`, and `ArrowExec`.
321struct PlanIOVisitor {
322    bytes_scanned: Option<MetricValue>,
323    time_opening: Option<MetricValue>,
324    time_scanning: Option<MetricValue>,
325    parquet_output_rows: Option<usize>,
326    parquet_pruned_page_index: Option<MetricValue>,
327    parquet_matched_page_index: Option<MetricValue>,
328    parquet_rg_pruned_stats: Option<MetricValue>,
329    parquet_rg_matched_stats: Option<MetricValue>,
330    parquet_rg_pruned_bloom_filter: Option<MetricValue>,
331    parquet_rg_matched_bloom_filter: Option<MetricValue>,
332}
333
334impl PlanIOVisitor {
335    fn new() -> Self {
336        Self {
337            bytes_scanned: None,
338            time_opening: None,
339            time_scanning: None,
340            parquet_output_rows: None,
341            parquet_pruned_page_index: None,
342            parquet_matched_page_index: None,
343            parquet_rg_pruned_stats: None,
344            parquet_rg_matched_stats: None,
345            parquet_rg_pruned_bloom_filter: None,
346            parquet_rg_matched_bloom_filter: None,
347        }
348    }
349
350    fn collect_io_metrics(&mut self, plan: &dyn ExecutionPlan) {
351        let io_metrics = plan.metrics();
352        if let Some(metrics) = io_metrics {
353            self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
354            self.time_opening = metrics.sum_by_name("time_elapsed_opening");
355            self.time_scanning = metrics.sum_by_name("time_elapsed_scanning_total");
356
357            if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
358                self.parquet_output_rows = metrics.output_rows();
359                self.parquet_rg_pruned_stats = metrics.sum_by_name("row_groups_pruned_statistics");
360                self.parquet_rg_matched_stats =
361                    metrics.sum_by_name("row_groups_matched_statistics");
362            }
363        }
364    }
365}
366
367impl From<PlanIOVisitor> for ExecutionIOStats {
368    fn from(value: PlanIOVisitor) -> Self {
369        Self {
370            bytes_scanned: value.bytes_scanned,
371            time_opening: value.time_opening,
372            time_scanning: value.time_scanning,
373            parquet_output_rows: value.parquet_output_rows,
374            parquet_pruned_page_index: value.parquet_pruned_page_index,
375            parquet_matched_page_index: value.parquet_matched_page_index,
376            parquet_rg_pruned_stats: value.parquet_rg_pruned_stats,
377            parquet_rg_matched_stats: value.parquet_rg_matched_stats,
378            parquet_rg_pruned_bloom_filter: value.parquet_rg_pruned_bloom_filter,
379            parquet_rg_matched_bloom_filter: value.parquet_rg_matched_bloom_filter,
380        }
381    }
382}
383
384impl ExecutionPlanVisitor for PlanIOVisitor {
385    type Error = datafusion_common::DataFusionError;
386
387    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> color_eyre::Result<bool, Self::Error> {
388        if is_io_plan(plan) {
389            self.collect_io_metrics(plan);
390        }
391        Ok(true)
392    }
393}
394
/// Per-partition elapsed-compute samples for a single plan node.
#[derive(Clone, Debug)]
pub struct PartitionsComputeStats {
    name: String,
    /// Sorted elapsed compute times
    elapsed_computes: Vec<usize>,
}

impl PartitionsComputeStats {
    /// Returns `(min, median, mean, max, total)` over the samples.
    ///
    /// Assumes the samples are sorted: min and max are simply the first and
    /// last entries. The median is the entry at index `len / 2` (the upper
    /// middle for an even count), and the mean uses integer division. All
    /// five values are `0` when there are no samples.
    fn summary_stats(&self) -> (usize, usize, usize, usize, usize) {
        let samples = self.elapsed_computes.as_slice();
        match (samples.first(), samples.last()) {
            (Some(&lowest), Some(&highest)) => {
                let count = samples.len();
                let sum = samples.iter().copied().sum::<usize>();
                (lowest, samples[count / 2], sum / count, highest, sum)
            }
            _ => (0, 0, 0, 0, 0),
        }
    }

    /// Number of partition samples recorded for this node.
    fn partitions(&self) -> usize {
        self.elapsed_computes.len()
    }
}
420
421#[derive(Clone, Debug)]
422pub struct ExecutionComputeStats {
423    elapsed_compute: Option<usize>,
424    projection_compute: Option<Vec<PartitionsComputeStats>>,
425    filter_compute: Option<Vec<PartitionsComputeStats>>,
426    sort_compute: Option<Vec<PartitionsComputeStats>>,
427    join_compute: Option<Vec<PartitionsComputeStats>>,
428    aggregate_compute: Option<Vec<PartitionsComputeStats>>,
429    other_compute: Option<Vec<PartitionsComputeStats>>,
430}
431
432impl ExecutionComputeStats {
433    fn display_compute(
434        &self,
435        f: &mut std::fmt::Formatter<'_>,
436        compute: &Option<Vec<PartitionsComputeStats>>,
437        label: &str,
438    ) -> std::fmt::Result {
439        if let (Some(filter_compute), Some(elapsed_compute)) = (compute, &self.elapsed_compute) {
440            let partitions = filter_compute.iter().fold(0, |acc, c| acc + c.partitions());
441            writeln!(
442                f,
443                "{label} Stats ({} nodes, {} partitions)",
444                filter_compute.len(),
445                partitions
446            )?;
447            writeln!(
448                f,
449                "{:<30} {:<16} {:<16} {:<16} {:<16} {:<16}",
450                "Node(Partitions)", "Min", "Median", "Mean", "Max", "Total (%)"
451            )?;
452            filter_compute.iter().try_for_each(|node| {
453                let (min, median, mean, max, total) = node.summary_stats();
454                let total = format!(
455                    "{} ({:.2}%)",
456                    total,
457                    (total as f32 / *elapsed_compute as f32) * 100.0
458                );
459                writeln!(
460                    f,
461                    "{:<30} {:<16} {:<16} {:<16} {:<16} {:<16}",
462                    format!("{}({})", node.name, node.elapsed_computes.len()),
463                    min,
464                    median,
465                    mean,
466                    max,
467                    total,
468                )
469            })
470        } else {
471            writeln!(f, "No {label} Stats")
472        }
473    }
474}
475
476impl std::fmt::Display for ExecutionComputeStats {
477    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
478        writeln!(
479            f,
480            "==================================== Compute Summary ====================================="
481        )?;
482        writeln!(f, "{:<20}", "Elapsed Compute",)?;
483        writeln!(
484            f,
485            "{:<20}",
486            self.elapsed_compute
487                .as_ref()
488                .map(|m| m.to_string())
489                .unwrap_or("None".to_string()),
490        )?;
491        writeln!(f)?;
492        self.display_compute(f, &self.projection_compute, "Projection")?;
493        writeln!(f)?;
494        self.display_compute(f, &self.filter_compute, "Filter")?;
495        writeln!(f)?;
496        self.display_compute(f, &self.sort_compute, "Sort")?;
497        writeln!(f)?;
498        self.display_compute(f, &self.join_compute, "Join")?;
499        writeln!(f)?;
500        self.display_compute(f, &self.aggregate_compute, "Aggregate")?;
501        writeln!(f)?;
502        self.display_compute(f, &self.other_compute, "Other")?;
503        writeln!(f)
504    }
505}
506
507#[derive(Default)]
508pub struct PlanComputeVisitor {
509    elapsed_compute: Option<usize>,
510    filter_computes: Vec<PartitionsComputeStats>,
511    sort_computes: Vec<PartitionsComputeStats>,
512    projection_computes: Vec<PartitionsComputeStats>,
513    join_computes: Vec<PartitionsComputeStats>,
514    aggregate_computes: Vec<PartitionsComputeStats>,
515    other_computes: Vec<PartitionsComputeStats>,
516}
517
518impl PlanComputeVisitor {
519    fn add_elapsed_compute(&mut self, node_elapsed_compute: Option<usize>) {
520        match (self.elapsed_compute, node_elapsed_compute) {
521            (Some(agg_elapsed_compute), Some(node_elapsed_compute)) => {
522                self.elapsed_compute = Some(agg_elapsed_compute + node_elapsed_compute)
523            }
524            (Some(_), None) | (None, None) => {}
525            (None, Some(node_elapsed_compute)) => self.elapsed_compute = Some(node_elapsed_compute),
526        }
527    }
528
529    fn collect_compute_metrics(&mut self, plan: &dyn ExecutionPlan) {
530        let compute_metrics = plan.metrics();
531        if let Some(metrics) = compute_metrics {
532            self.add_elapsed_compute(metrics.elapsed_compute());
533        }
534        self.collect_filter_metrics(plan);
535        self.collect_sort_metrics(plan);
536        self.collect_projection_metrics(plan);
537        self.collect_join_metrics(plan);
538        self.collect_aggregate_metrics(plan);
539        self.collect_other_metrics(plan);
540    }
541
542    // TODO: Refactor to have a single function that takes predicate and collector
543    fn collect_filter_metrics(&mut self, plan: &dyn ExecutionPlan) {
544        if is_filter_plan(plan) {
545            if let Some(metrics) = plan.metrics() {
546                let sorted_computes: Vec<usize> = metrics
547                    .iter()
548                    .filter_map(|m| match m.value() {
549                        MetricValue::ElapsedCompute(t) => Some(t.value()),
550                        _ => None,
551                    })
552                    .sorted()
553                    .collect();
554                let p = PartitionsComputeStats {
555                    name: plan.name().to_string(),
556                    elapsed_computes: sorted_computes,
557                };
558                self.filter_computes.push(p)
559            }
560        }
561    }
562
563    fn collect_sort_metrics(&mut self, plan: &dyn ExecutionPlan) {
564        if is_sort_plan(plan) {
565            if let Some(metrics) = plan.metrics() {
566                let sorted_computes: Vec<usize> = metrics
567                    .iter()
568                    .filter_map(|m| match m.value() {
569                        MetricValue::ElapsedCompute(t) => Some(t.value()),
570                        _ => None,
571                    })
572                    .sorted()
573                    .collect();
574                let p = PartitionsComputeStats {
575                    name: plan.name().to_string(),
576                    elapsed_computes: sorted_computes,
577                };
578                self.sort_computes.push(p)
579            }
580        }
581    }
582
583    fn collect_projection_metrics(&mut self, plan: &dyn ExecutionPlan) {
584        if is_projection_plan(plan) {
585            if let Some(metrics) = plan.metrics() {
586                let sorted_computes: Vec<usize> = metrics
587                    .iter()
588                    .filter_map(|m| match m.value() {
589                        MetricValue::ElapsedCompute(t) => Some(t.value()),
590                        _ => None,
591                    })
592                    .sorted()
593                    .collect();
594                let p = PartitionsComputeStats {
595                    name: plan.name().to_string(),
596                    elapsed_computes: sorted_computes,
597                };
598                self.projection_computes.push(p)
599            }
600        }
601    }
602
603    fn collect_join_metrics(&mut self, plan: &dyn ExecutionPlan) {
604        if is_join_plan(plan) {
605            if let Some(metrics) = plan.metrics() {
606                let sorted_computes: Vec<usize> = metrics
607                    .iter()
608                    .filter_map(|m| match m.value() {
609                        MetricValue::ElapsedCompute(t) => Some(t.value()),
610                        _ => None,
611                    })
612                    .sorted()
613                    .collect();
614                let p = PartitionsComputeStats {
615                    name: plan.name().to_string(),
616                    elapsed_computes: sorted_computes,
617                };
618                self.join_computes.push(p)
619            }
620        }
621    }
622
623    fn collect_aggregate_metrics(&mut self, plan: &dyn ExecutionPlan) {
624        if is_aggregate_plan(plan) {
625            if let Some(metrics) = plan.metrics() {
626                let sorted_computes: Vec<usize> = metrics
627                    .iter()
628                    .filter_map(|m| match m.value() {
629                        MetricValue::ElapsedCompute(t) => Some(t.value()),
630                        _ => None,
631                    })
632                    .sorted()
633                    .collect();
634                let p = PartitionsComputeStats {
635                    name: plan.name().to_string(),
636                    elapsed_computes: sorted_computes,
637                };
638                self.aggregate_computes.push(p)
639            }
640        }
641    }
642
643    fn collect_other_metrics(&mut self, plan: &dyn ExecutionPlan) {
644        if !is_filter_plan(plan)
645            && !is_sort_plan(plan)
646            && !is_projection_plan(plan)
647            && !is_aggregate_plan(plan)
648            && !is_join_plan(plan)
649        {
650            if let Some(metrics) = plan.metrics() {
651                let sorted_computes: Vec<usize> = metrics
652                    .iter()
653                    .filter_map(|m| match m.value() {
654                        MetricValue::ElapsedCompute(t) => Some(t.value()),
655                        _ => None,
656                    })
657                    .sorted()
658                    .collect();
659                let p = PartitionsComputeStats {
660                    name: plan.name().to_string(),
661                    elapsed_computes: sorted_computes,
662                };
663                self.other_computes.push(p)
664            }
665        }
666    }
667}
668
669fn is_filter_plan(plan: &dyn ExecutionPlan) -> bool {
670    plan.as_any().downcast_ref::<FilterExec>().is_some()
671}
672
673fn is_sort_plan(plan: &dyn ExecutionPlan) -> bool {
674    plan.as_any().downcast_ref::<SortExec>().is_some()
675        || plan
676            .as_any()
677            .downcast_ref::<SortPreservingMergeExec>()
678            .is_some()
679}
680
681fn is_projection_plan(plan: &dyn ExecutionPlan) -> bool {
682    plan.as_any().downcast_ref::<ProjectionExec>().is_some()
683}
684
685fn is_join_plan(plan: &dyn ExecutionPlan) -> bool {
686    plan.as_any().downcast_ref::<HashJoinExec>().is_some()
687        || plan.as_any().downcast_ref::<CrossJoinExec>().is_some()
688        || plan.as_any().downcast_ref::<SortMergeJoinExec>().is_some()
689        || plan.as_any().downcast_ref::<NestedLoopJoinExec>().is_some()
690        || plan
691            .as_any()
692            .downcast_ref::<SymmetricHashJoinExec>()
693            .is_some()
694}
695
696fn is_aggregate_plan(plan: &dyn ExecutionPlan) -> bool {
697    plan.as_any().downcast_ref::<AggregateExec>().is_some()
698}
699
700impl From<PlanComputeVisitor> for ExecutionComputeStats {
701    fn from(value: PlanComputeVisitor) -> Self {
702        Self {
703            elapsed_compute: value.elapsed_compute,
704            filter_compute: Some(value.filter_computes),
705            sort_compute: Some(value.sort_computes),
706            projection_compute: Some(value.projection_computes),
707            join_compute: Some(value.join_computes),
708            aggregate_compute: Some(value.aggregate_computes),
709            other_compute: Some(value.other_computes),
710        }
711    }
712}
713
714impl ExecutionPlanVisitor for PlanComputeVisitor {
715    type Error = datafusion_common::DataFusionError;
716
717    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> color_eyre::Result<bool, Self::Error> {
718        if !is_io_plan(plan) {
719            self.collect_compute_metrics(plan);
720        }
721        Ok(true)
722    }
723}
724
725fn is_io_plan(plan: &dyn ExecutionPlan) -> bool {
726    let io_plans = ["CsvExec", "ParquetExec", "ArrowExec"];
727    io_plans.contains(&plan.name())
728}
729
730pub fn collect_plan_io_stats(plan: Arc<dyn ExecutionPlan>) -> Option<ExecutionIOStats> {
731    let mut visitor = PlanIOVisitor::new();
732    if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() {
733        Some(visitor.into())
734    } else {
735        None
736    }
737}
738
739pub fn collect_plan_compute_stats(plan: Arc<dyn ExecutionPlan>) -> Option<ExecutionComputeStats> {
740    let mut visitor = PlanComputeVisitor::default();
741    if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() {
742        Some(visitor.into())
743    } else {
744        None
745    }
746}
747
748pub fn print_io_summary(plan: Arc<dyn ExecutionPlan>) {
749    println!("======================= IO Summary ========================");
750    if let Some(stats) = collect_plan_io_stats(plan) {
751        println!("IO Stats: {:#?}", stats);
752    } else {
753        println!("No IO metrics found");
754    }
755}