1use datafusion::{
19 datasource::physical_plan::ParquetExec,
20 physical_plan::{
21 aggregates::AggregateExec,
22 filter::FilterExec,
23 joins::{
24 CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec,
25 SymmetricHashJoinExec,
26 },
27 metrics::MetricValue,
28 projection::ProjectionExec,
29 sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
30 visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
31 },
32};
33use itertools::Itertools;
34use std::{sync::Arc, time::Duration};
35
36#[derive(Clone, Debug)]
37pub struct ExecutionStats {
38 query: String,
39 rows: usize,
40 batches: i32,
41 bytes: usize,
42 durations: ExecutionDurationStats,
43 io: Option<ExecutionIOStats>,
44 compute: Option<ExecutionComputeStats>,
45 plan: Arc<dyn ExecutionPlan>,
46}
47
48impl ExecutionStats {
49 pub fn try_new(
50 query: String,
51 durations: ExecutionDurationStats,
52 rows: usize,
53 batches: i32,
54 bytes: usize,
55 plan: Arc<dyn ExecutionPlan>,
56 ) -> color_eyre::Result<Self> {
57 Ok(Self {
58 query,
59 durations,
60 rows,
61 batches,
62 bytes,
63 plan,
64 io: None,
65 compute: None,
66 })
67 }
68
69 pub fn collect_stats(&mut self) {
70 if let Some(io) = collect_plan_io_stats(Arc::clone(&self.plan)) {
71 self.io = Some(io)
72 }
73 if let Some(compute) = collect_plan_compute_stats(Arc::clone(&self.plan)) {
74 self.compute = Some(compute)
75 }
76 }
77
78 pub fn rows_selectivity(&self) -> f64 {
79 let maybe_io_output_rows = self.io.as_ref().and_then(|io| io.parquet_output_rows);
80 if let Some(io_output_rows) = maybe_io_output_rows {
81 self.rows as f64 / io_output_rows as f64
82 } else {
83 0.0
84 }
85 }
86
87 pub fn bytes_selectivity(&self) -> f64 {
88 let maybe_io_output_bytes = self.io.as_ref().and_then(|io| io.bytes_scanned.clone());
89 if let Some(io_output_bytes) = maybe_io_output_bytes {
90 self.bytes as f64 / io_output_bytes.as_usize() as f64
91 } else {
92 0.0
93 }
94 }
95
96 pub fn selectivity_efficiency(&self) -> f64 {
97 if let Some(io) = &self.io {
98 io.parquet_rg_pruned_stats_ratio() / self.rows_selectivity()
99 } else {
100 0.0
101 }
102 }
103}
104
105impl std::fmt::Display for ExecutionStats {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 writeln!(
108 f,
109 "========================= Query ==========================="
110 )?;
111 writeln!(f, "{}", self.query)?;
112 writeln!(
113 f,
114 "==================== Execution Summary ===================="
115 )?;
116 writeln!(
117 f,
118 "{:<20} {:<20} {:<20}",
119 "Output Rows (%)", "Output Bytes (%)", "Batches Processed",
120 )?;
121 writeln!(
122 f,
123 "{:<20} {:<20} {:<20}",
124 format!("{} ({:.2})", self.rows, self.rows_selectivity()),
125 format!("{} ({:.2})", self.bytes, self.bytes_selectivity()),
126 self.batches,
127 )?;
128 writeln!(f)?;
129 writeln!(f, "{}", self.durations)?;
130 writeln!(f, "{:<20}", "Parquet Efficiency (Pruning / Selectivity)")?;
131 writeln!(f, "{:<20.2}", self.selectivity_efficiency())?;
132 writeln!(f)?;
133 if let Some(io_stats) = &self.io {
134 writeln!(f, "{}", io_stats)?;
135 };
136 if let Some(compute_stats) = &self.compute {
137 writeln!(f, "{}", compute_stats)?;
138 };
139 Ok(())
140 }
141}
142
/// Durations of the individual phases of a query, plus the overall total.
#[derive(Clone, Debug)]
pub struct ExecutionDurationStats {
    /// Rendered in the "Parsing" column.
    parsing: Duration,
    /// Rendered in the "Logical Planning" column.
    logical_planning: Duration,
    /// Rendered in the "Physical Planning" column.
    physical_planning: Duration,
    /// Rendered in the "Execution" column.
    execution: Duration,
    /// Rendered in the "Total" column; stored as given, not derived from the
    /// other fields.
    total: Duration,
}

impl ExecutionDurationStats {
    /// Bundles the supplied phase durations; no validation or arithmetic is
    /// performed on them.
    pub fn new(
        parsing: Duration,
        logical_planning: Duration,
        physical_planning: Duration,
        execution: Duration,
        total: Duration,
    ) -> Self {
        Self {
            parsing,
            logical_planning,
            physical_planning,
            execution,
            total,
        }
    }
}

impl std::fmt::Display for ExecutionDurationStats {
    /// Writes two small tables (planning phases, then execution / total),
    /// separated by a blank line. Every cell is left-aligned to 20 columns and
    /// durations use their `Debug` rendering.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            parsing,
            logical_planning,
            physical_planning,
            execution,
            total,
        } = self;

        // Planning table followed by the blank separator line.
        write!(
            f,
            "{:<20} {:<20} {:<20}\n{:<20?} {:<20?} {:<20?}\n\n",
            "Parsing",
            "Logical Planning",
            "Physical Planning",
            parsing,
            logical_planning,
            physical_planning,
        )?;

        // Execution table.
        write!(
            f,
            "{:<20} {:<20}\n{:<20?} {:<20?}\n",
            "Execution", "Total", execution, total,
        )
    }
}
188
189#[derive(Clone, Debug)]
190pub struct ExecutionIOStats {
191 bytes_scanned: Option<MetricValue>,
192 time_opening: Option<MetricValue>,
193 time_scanning: Option<MetricValue>,
194 parquet_output_rows: Option<usize>,
195 parquet_pruned_page_index: Option<MetricValue>,
196 parquet_matched_page_index: Option<MetricValue>,
197 parquet_rg_pruned_stats: Option<MetricValue>,
198 parquet_rg_matched_stats: Option<MetricValue>,
199 parquet_rg_pruned_bloom_filter: Option<MetricValue>,
200 parquet_rg_matched_bloom_filter: Option<MetricValue>,
201}
202
203impl ExecutionIOStats {
204 fn parquet_rg_pruned_stats_ratio(&self) -> f64 {
205 if let (Some(pruned), Some(matched)) = (
206 self.parquet_rg_matched_stats.as_ref(),
207 self.parquet_rg_pruned_stats.as_ref(),
208 ) {
209 let pruned = pruned.as_usize() as f64;
210 let matched = matched.as_usize() as f64;
211 matched / (pruned + matched)
212 } else {
213 0.0
214 }
215 }
216
217 fn parquet_rg_pruned_bloom_filter_ratio(&self) -> f64 {
218 if let (Some(pruned), Some(matched)) = (
219 self.parquet_rg_matched_bloom_filter.as_ref(),
220 self.parquet_rg_pruned_bloom_filter.as_ref(),
221 ) {
222 let pruned = pruned.as_usize() as f64;
223 let matched = matched.as_usize() as f64;
224 matched / (pruned + matched)
225 } else {
226 0.0
227 }
228 }
229
230 fn parquet_rg_pruned_page_index_ratio(&self) -> f64 {
231 if let (Some(pruned), Some(matched)) = (
232 self.parquet_matched_page_index.as_ref(),
233 self.parquet_pruned_page_index.as_ref(),
234 ) {
235 let pruned = pruned.as_usize() as f64;
236 let matched = matched.as_usize() as f64;
237 matched / (pruned + matched)
238 } else {
239 0.0
240 }
241 }
242
243 fn row_group_count(&self) -> usize {
244 if let (Some(pruned), Some(matched)) = (
245 self.parquet_rg_matched_stats.as_ref(),
246 self.parquet_rg_pruned_stats.as_ref(),
247 ) {
248 let pruned = pruned.as_usize();
249 let matched = matched.as_usize();
250 pruned + matched
251 } else {
252 0
253 }
254 }
255}
256
257impl std::fmt::Display for ExecutionIOStats {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 writeln!(
260 f,
261 "======================= IO Summary ========================"
262 )?;
263 writeln!(
264 f,
265 "{:<20} {:<20} {:<20}",
266 "Bytes Scanned", "Time Opening", "Time Scanning"
267 )?;
268 writeln!(
269 f,
270 "{:<20} {:<20} {:<20}",
271 self.bytes_scanned
272 .as_ref()
273 .map(|m| m.to_string())
274 .unwrap_or("None".to_string()),
275 self.time_opening
276 .as_ref()
277 .map(|m| m.to_string())
278 .unwrap_or("None".to_string()),
279 self.time_scanning
280 .as_ref()
281 .map(|m| m.to_string())
282 .unwrap_or("None".to_string())
283 )?;
284 writeln!(f)?;
285 writeln!(
286 f,
287 "Parquet Pruning Stats (Output Rows: {}, Row Groups: {} [{}ms per row group])",
288 self.parquet_output_rows
289 .as_ref()
290 .map(|m| m.to_string())
291 .unwrap_or("None".to_string()),
292 self.row_group_count(),
293 self.time_scanning
294 .as_ref()
295 .map(|ts| format!(
296 "{:.2}",
297 (ts.as_usize() / 1_000_000) as f64 / self.row_group_count() as f64
298 ))
299 .unwrap_or("None".to_string())
300 )?;
301 writeln!(
302 f,
303 "{:<20} {:<20} {:<20}",
304 "Matched RG Stats %", "Matched RG Bloom %", "Matched Page Index %"
305 )?;
306 writeln!(
307 f,
308 "{:<20.2} {:<20.2} {:<20.2}",
309 self.parquet_rg_pruned_stats_ratio(),
310 self.parquet_rg_pruned_bloom_filter_ratio(),
311 self.parquet_rg_pruned_page_index_ratio()
312 )?;
313 Ok(())
314 }
315}
316
317struct PlanIOVisitor {
322 bytes_scanned: Option<MetricValue>,
323 time_opening: Option<MetricValue>,
324 time_scanning: Option<MetricValue>,
325 parquet_output_rows: Option<usize>,
326 parquet_pruned_page_index: Option<MetricValue>,
327 parquet_matched_page_index: Option<MetricValue>,
328 parquet_rg_pruned_stats: Option<MetricValue>,
329 parquet_rg_matched_stats: Option<MetricValue>,
330 parquet_rg_pruned_bloom_filter: Option<MetricValue>,
331 parquet_rg_matched_bloom_filter: Option<MetricValue>,
332}
333
334impl PlanIOVisitor {
335 fn new() -> Self {
336 Self {
337 bytes_scanned: None,
338 time_opening: None,
339 time_scanning: None,
340 parquet_output_rows: None,
341 parquet_pruned_page_index: None,
342 parquet_matched_page_index: None,
343 parquet_rg_pruned_stats: None,
344 parquet_rg_matched_stats: None,
345 parquet_rg_pruned_bloom_filter: None,
346 parquet_rg_matched_bloom_filter: None,
347 }
348 }
349
350 fn collect_io_metrics(&mut self, plan: &dyn ExecutionPlan) {
351 let io_metrics = plan.metrics();
352 if let Some(metrics) = io_metrics {
353 self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
354 self.time_opening = metrics.sum_by_name("time_elapsed_opening");
355 self.time_scanning = metrics.sum_by_name("time_elapsed_scanning_total");
356
357 if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
358 self.parquet_output_rows = metrics.output_rows();
359 self.parquet_rg_pruned_stats = metrics.sum_by_name("row_groups_pruned_statistics");
360 self.parquet_rg_matched_stats =
361 metrics.sum_by_name("row_groups_matched_statistics");
362 }
363 }
364 }
365}
366
367impl From<PlanIOVisitor> for ExecutionIOStats {
368 fn from(value: PlanIOVisitor) -> Self {
369 Self {
370 bytes_scanned: value.bytes_scanned,
371 time_opening: value.time_opening,
372 time_scanning: value.time_scanning,
373 parquet_output_rows: value.parquet_output_rows,
374 parquet_pruned_page_index: value.parquet_pruned_page_index,
375 parquet_matched_page_index: value.parquet_matched_page_index,
376 parquet_rg_pruned_stats: value.parquet_rg_pruned_stats,
377 parquet_rg_matched_stats: value.parquet_rg_matched_stats,
378 parquet_rg_pruned_bloom_filter: value.parquet_rg_pruned_bloom_filter,
379 parquet_rg_matched_bloom_filter: value.parquet_rg_matched_bloom_filter,
380 }
381 }
382}
383
384impl ExecutionPlanVisitor for PlanIOVisitor {
385 type Error = datafusion_common::DataFusionError;
386
387 fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> color_eyre::Result<bool, Self::Error> {
388 if is_io_plan(plan) {
389 self.collect_io_metrics(plan);
390 }
391 Ok(true)
392 }
393}
394
/// Per-partition elapsed-compute samples for a single plan node.
#[derive(Clone, Debug)]
pub struct PartitionsComputeStats {
    /// Name of the plan node the samples belong to.
    name: String,
    /// One sample per partition. `summary_stats` reads min / median / max by
    /// position, so the values are expected to be in ascending order.
    elapsed_computes: Vec<usize>,
}

impl PartitionsComputeStats {
    /// Returns `(min, median, mean, max, total)` of the samples.
    ///
    /// All five values are `0` when there are no samples. The median is the
    /// element at index `len / 2` and the mean uses integer division.
    fn summary_stats(&self) -> (usize, usize, usize, usize, usize) {
        let samples = self.elapsed_computes.as_slice();
        match (samples.first(), samples.last()) {
            (Some(&min), Some(&max)) => {
                let count = samples.len();
                let total = samples.iter().sum::<usize>();
                let median = samples[count / 2];
                (min, median, total / count, max, total)
            }
            _ => (0, 0, 0, 0, 0),
        }
    }

    /// Number of partitions, i.e. the number of samples held.
    fn partitions(&self) -> usize {
        self.elapsed_computes.len()
    }
}
420
421#[derive(Clone, Debug)]
422pub struct ExecutionComputeStats {
423 elapsed_compute: Option<usize>,
424 projection_compute: Option<Vec<PartitionsComputeStats>>,
425 filter_compute: Option<Vec<PartitionsComputeStats>>,
426 sort_compute: Option<Vec<PartitionsComputeStats>>,
427 join_compute: Option<Vec<PartitionsComputeStats>>,
428 aggregate_compute: Option<Vec<PartitionsComputeStats>>,
429 other_compute: Option<Vec<PartitionsComputeStats>>,
430}
431
432impl ExecutionComputeStats {
433 fn display_compute(
434 &self,
435 f: &mut std::fmt::Formatter<'_>,
436 compute: &Option<Vec<PartitionsComputeStats>>,
437 label: &str,
438 ) -> std::fmt::Result {
439 if let (Some(filter_compute), Some(elapsed_compute)) = (compute, &self.elapsed_compute) {
440 let partitions = filter_compute.iter().fold(0, |acc, c| acc + c.partitions());
441 writeln!(
442 f,
443 "{label} Stats ({} nodes, {} partitions)",
444 filter_compute.len(),
445 partitions
446 )?;
447 writeln!(
448 f,
449 "{:<30} {:<16} {:<16} {:<16} {:<16} {:<16}",
450 "Node(Partitions)", "Min", "Median", "Mean", "Max", "Total (%)"
451 )?;
452 filter_compute.iter().try_for_each(|node| {
453 let (min, median, mean, max, total) = node.summary_stats();
454 let total = format!(
455 "{} ({:.2}%)",
456 total,
457 (total as f32 / *elapsed_compute as f32) * 100.0
458 );
459 writeln!(
460 f,
461 "{:<30} {:<16} {:<16} {:<16} {:<16} {:<16}",
462 format!("{}({})", node.name, node.elapsed_computes.len()),
463 min,
464 median,
465 mean,
466 max,
467 total,
468 )
469 })
470 } else {
471 writeln!(f, "No {label} Stats")
472 }
473 }
474}
475
476impl std::fmt::Display for ExecutionComputeStats {
477 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
478 writeln!(
479 f,
480 "==================================== Compute Summary ====================================="
481 )?;
482 writeln!(f, "{:<20}", "Elapsed Compute",)?;
483 writeln!(
484 f,
485 "{:<20}",
486 self.elapsed_compute
487 .as_ref()
488 .map(|m| m.to_string())
489 .unwrap_or("None".to_string()),
490 )?;
491 writeln!(f)?;
492 self.display_compute(f, &self.projection_compute, "Projection")?;
493 writeln!(f)?;
494 self.display_compute(f, &self.filter_compute, "Filter")?;
495 writeln!(f)?;
496 self.display_compute(f, &self.sort_compute, "Sort")?;
497 writeln!(f)?;
498 self.display_compute(f, &self.join_compute, "Join")?;
499 writeln!(f)?;
500 self.display_compute(f, &self.aggregate_compute, "Aggregate")?;
501 writeln!(f)?;
502 self.display_compute(f, &self.other_compute, "Other")?;
503 writeln!(f)
504 }
505}
506
507#[derive(Default)]
508pub struct PlanComputeVisitor {
509 elapsed_compute: Option<usize>,
510 filter_computes: Vec<PartitionsComputeStats>,
511 sort_computes: Vec<PartitionsComputeStats>,
512 projection_computes: Vec<PartitionsComputeStats>,
513 join_computes: Vec<PartitionsComputeStats>,
514 aggregate_computes: Vec<PartitionsComputeStats>,
515 other_computes: Vec<PartitionsComputeStats>,
516}
517
518impl PlanComputeVisitor {
519 fn add_elapsed_compute(&mut self, node_elapsed_compute: Option<usize>) {
520 match (self.elapsed_compute, node_elapsed_compute) {
521 (Some(agg_elapsed_compute), Some(node_elapsed_compute)) => {
522 self.elapsed_compute = Some(agg_elapsed_compute + node_elapsed_compute)
523 }
524 (Some(_), None) | (None, None) => {}
525 (None, Some(node_elapsed_compute)) => self.elapsed_compute = Some(node_elapsed_compute),
526 }
527 }
528
529 fn collect_compute_metrics(&mut self, plan: &dyn ExecutionPlan) {
530 let compute_metrics = plan.metrics();
531 if let Some(metrics) = compute_metrics {
532 self.add_elapsed_compute(metrics.elapsed_compute());
533 }
534 self.collect_filter_metrics(plan);
535 self.collect_sort_metrics(plan);
536 self.collect_projection_metrics(plan);
537 self.collect_join_metrics(plan);
538 self.collect_aggregate_metrics(plan);
539 self.collect_other_metrics(plan);
540 }
541
542 fn collect_filter_metrics(&mut self, plan: &dyn ExecutionPlan) {
544 if is_filter_plan(plan) {
545 if let Some(metrics) = plan.metrics() {
546 let sorted_computes: Vec<usize> = metrics
547 .iter()
548 .filter_map(|m| match m.value() {
549 MetricValue::ElapsedCompute(t) => Some(t.value()),
550 _ => None,
551 })
552 .sorted()
553 .collect();
554 let p = PartitionsComputeStats {
555 name: plan.name().to_string(),
556 elapsed_computes: sorted_computes,
557 };
558 self.filter_computes.push(p)
559 }
560 }
561 }
562
563 fn collect_sort_metrics(&mut self, plan: &dyn ExecutionPlan) {
564 if is_sort_plan(plan) {
565 if let Some(metrics) = plan.metrics() {
566 let sorted_computes: Vec<usize> = metrics
567 .iter()
568 .filter_map(|m| match m.value() {
569 MetricValue::ElapsedCompute(t) => Some(t.value()),
570 _ => None,
571 })
572 .sorted()
573 .collect();
574 let p = PartitionsComputeStats {
575 name: plan.name().to_string(),
576 elapsed_computes: sorted_computes,
577 };
578 self.sort_computes.push(p)
579 }
580 }
581 }
582
583 fn collect_projection_metrics(&mut self, plan: &dyn ExecutionPlan) {
584 if is_projection_plan(plan) {
585 if let Some(metrics) = plan.metrics() {
586 let sorted_computes: Vec<usize> = metrics
587 .iter()
588 .filter_map(|m| match m.value() {
589 MetricValue::ElapsedCompute(t) => Some(t.value()),
590 _ => None,
591 })
592 .sorted()
593 .collect();
594 let p = PartitionsComputeStats {
595 name: plan.name().to_string(),
596 elapsed_computes: sorted_computes,
597 };
598 self.projection_computes.push(p)
599 }
600 }
601 }
602
603 fn collect_join_metrics(&mut self, plan: &dyn ExecutionPlan) {
604 if is_join_plan(plan) {
605 if let Some(metrics) = plan.metrics() {
606 let sorted_computes: Vec<usize> = metrics
607 .iter()
608 .filter_map(|m| match m.value() {
609 MetricValue::ElapsedCompute(t) => Some(t.value()),
610 _ => None,
611 })
612 .sorted()
613 .collect();
614 let p = PartitionsComputeStats {
615 name: plan.name().to_string(),
616 elapsed_computes: sorted_computes,
617 };
618 self.join_computes.push(p)
619 }
620 }
621 }
622
623 fn collect_aggregate_metrics(&mut self, plan: &dyn ExecutionPlan) {
624 if is_aggregate_plan(plan) {
625 if let Some(metrics) = plan.metrics() {
626 let sorted_computes: Vec<usize> = metrics
627 .iter()
628 .filter_map(|m| match m.value() {
629 MetricValue::ElapsedCompute(t) => Some(t.value()),
630 _ => None,
631 })
632 .sorted()
633 .collect();
634 let p = PartitionsComputeStats {
635 name: plan.name().to_string(),
636 elapsed_computes: sorted_computes,
637 };
638 self.aggregate_computes.push(p)
639 }
640 }
641 }
642
643 fn collect_other_metrics(&mut self, plan: &dyn ExecutionPlan) {
644 if !is_filter_plan(plan)
645 && !is_sort_plan(plan)
646 && !is_projection_plan(plan)
647 && !is_aggregate_plan(plan)
648 && !is_join_plan(plan)
649 {
650 if let Some(metrics) = plan.metrics() {
651 let sorted_computes: Vec<usize> = metrics
652 .iter()
653 .filter_map(|m| match m.value() {
654 MetricValue::ElapsedCompute(t) => Some(t.value()),
655 _ => None,
656 })
657 .sorted()
658 .collect();
659 let p = PartitionsComputeStats {
660 name: plan.name().to_string(),
661 elapsed_computes: sorted_computes,
662 };
663 self.other_computes.push(p)
664 }
665 }
666 }
667}
668
669fn is_filter_plan(plan: &dyn ExecutionPlan) -> bool {
670 plan.as_any().downcast_ref::<FilterExec>().is_some()
671}
672
673fn is_sort_plan(plan: &dyn ExecutionPlan) -> bool {
674 plan.as_any().downcast_ref::<SortExec>().is_some()
675 || plan
676 .as_any()
677 .downcast_ref::<SortPreservingMergeExec>()
678 .is_some()
679}
680
681fn is_projection_plan(plan: &dyn ExecutionPlan) -> bool {
682 plan.as_any().downcast_ref::<ProjectionExec>().is_some()
683}
684
685fn is_join_plan(plan: &dyn ExecutionPlan) -> bool {
686 plan.as_any().downcast_ref::<HashJoinExec>().is_some()
687 || plan.as_any().downcast_ref::<CrossJoinExec>().is_some()
688 || plan.as_any().downcast_ref::<SortMergeJoinExec>().is_some()
689 || plan.as_any().downcast_ref::<NestedLoopJoinExec>().is_some()
690 || plan
691 .as_any()
692 .downcast_ref::<SymmetricHashJoinExec>()
693 .is_some()
694}
695
696fn is_aggregate_plan(plan: &dyn ExecutionPlan) -> bool {
697 plan.as_any().downcast_ref::<AggregateExec>().is_some()
698}
699
700impl From<PlanComputeVisitor> for ExecutionComputeStats {
701 fn from(value: PlanComputeVisitor) -> Self {
702 Self {
703 elapsed_compute: value.elapsed_compute,
704 filter_compute: Some(value.filter_computes),
705 sort_compute: Some(value.sort_computes),
706 projection_compute: Some(value.projection_computes),
707 join_compute: Some(value.join_computes),
708 aggregate_compute: Some(value.aggregate_computes),
709 other_compute: Some(value.other_computes),
710 }
711 }
712}
713
714impl ExecutionPlanVisitor for PlanComputeVisitor {
715 type Error = datafusion_common::DataFusionError;
716
717 fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> color_eyre::Result<bool, Self::Error> {
718 if !is_io_plan(plan) {
719 self.collect_compute_metrics(plan);
720 }
721 Ok(true)
722 }
723}
724
725fn is_io_plan(plan: &dyn ExecutionPlan) -> bool {
726 let io_plans = ["CsvExec", "ParquetExec", "ArrowExec"];
727 io_plans.contains(&plan.name())
728}
729
730pub fn collect_plan_io_stats(plan: Arc<dyn ExecutionPlan>) -> Option<ExecutionIOStats> {
731 let mut visitor = PlanIOVisitor::new();
732 if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() {
733 Some(visitor.into())
734 } else {
735 None
736 }
737}
738
739pub fn collect_plan_compute_stats(plan: Arc<dyn ExecutionPlan>) -> Option<ExecutionComputeStats> {
740 let mut visitor = PlanComputeVisitor::default();
741 if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() {
742 Some(visitor.into())
743 } else {
744 None
745 }
746}
747
748pub fn print_io_summary(plan: Arc<dyn ExecutionPlan>) {
749 println!("======================= IO Summary ========================");
750 if let Some(stats) = collect_plan_io_stats(plan) {
751 println!("IO Stats: {:#?}", stats);
752 } else {
753 println!("No IO metrics found");
754 }
755}