1use datafusion::{
19 datasource::{physical_plan::ParquetSource, source::DataSourceExec},
20 physical_plan::{
21 aggregates::AggregateExec,
22 filter::FilterExec,
23 joins::{
24 CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec,
25 SymmetricHashJoinExec,
26 },
27 metrics::MetricValue,
28 projection::ProjectionExec,
29 sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
30 visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
31 },
32};
33use itertools::Itertools;
34use std::{sync::Arc, time::Duration};
35
/// Summary of one query execution: the query text, output volume, phase
/// durations and, once `collect_stats` has run, IO and compute metrics
/// gathered from the physical plan.
#[derive(Clone, Debug)]
pub struct ExecutionStats {
    /// Query text, printed verbatim under the "Query" banner by `Display`.
    query: String,
    /// Output row count; numerator of `rows_selectivity`.
    rows: usize,
    /// Batch count, shown in the "Batches Processed" column.
    batches: i32,
    /// Output byte count; numerator of `bytes_selectivity`.
    bytes: usize,
    /// Durations of the individual execution phases.
    durations: ExecutionDurationStats,
    /// IO metrics; `None` until `collect_stats` stores a value.
    io: Option<ExecutionIOStats>,
    /// Compute metrics; `None` until `collect_stats` stores a value.
    compute: Option<ExecutionComputeStats>,
    /// Physical plan that `collect_stats` walks to gather metrics.
    plan: Arc<dyn ExecutionPlan>,
}
47
48impl ExecutionStats {
49 pub fn try_new(
50 query: String,
51 durations: ExecutionDurationStats,
52 rows: usize,
53 batches: i32,
54 bytes: usize,
55 plan: Arc<dyn ExecutionPlan>,
56 ) -> color_eyre::Result<Self> {
57 Ok(Self {
58 query,
59 durations,
60 rows,
61 batches,
62 bytes,
63 plan,
64 io: None,
65 compute: None,
66 })
67 }
68
69 pub fn collect_stats(&mut self) {
70 if let Some(io) = collect_plan_io_stats(Arc::clone(&self.plan)) {
71 self.io = Some(io)
72 }
73 if let Some(compute) = collect_plan_compute_stats(Arc::clone(&self.plan)) {
74 self.compute = Some(compute)
75 }
76 }
77
78 pub fn rows_selectivity(&self) -> f64 {
79 let maybe_io_output_rows = self.io.as_ref().and_then(|io| io.parquet_output_rows);
80 if let Some(io_output_rows) = maybe_io_output_rows {
81 self.rows as f64 / io_output_rows as f64
82 } else {
83 0.0
84 }
85 }
86
87 pub fn bytes_selectivity(&self) -> f64 {
88 let maybe_io_output_bytes = self.io.as_ref().and_then(|io| io.bytes_scanned.clone());
89 if let Some(io_output_bytes) = maybe_io_output_bytes {
90 self.bytes as f64 / io_output_bytes.as_usize() as f64
91 } else {
92 0.0
93 }
94 }
95
96 pub fn selectivity_efficiency(&self) -> f64 {
97 if let Some(io) = &self.io {
98 io.parquet_rg_pruned_stats_ratio() / self.rows_selectivity()
99 } else {
100 0.0
101 }
102 }
103}
104
105impl std::fmt::Display for ExecutionStats {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 writeln!(
108 f,
109 "========================= Query ==========================="
110 )?;
111 writeln!(f, "{}", self.query)?;
112 writeln!(
113 f,
114 "==================== Execution Summary ===================="
115 )?;
116 writeln!(
117 f,
118 "{:<20} {:<20} {:<20}",
119 "Output Rows (%)", "Output Bytes (%)", "Batches Processed",
120 )?;
121 writeln!(
122 f,
123 "{:<20} {:<20} {:<20}",
124 format!("{} ({:.2})", self.rows, self.rows_selectivity()),
125 format!("{} ({:.2})", self.bytes, self.bytes_selectivity()),
126 self.batches,
127 )?;
128 writeln!(f)?;
129 writeln!(f, "{}", self.durations)?;
130 writeln!(f, "{:<20}", "Parquet Efficiency (Pruning / Selectivity)")?;
131 writeln!(f, "{:<20.2}", self.selectivity_efficiency())?;
132 writeln!(f)?;
133 if let Some(io_stats) = &self.io {
134 writeln!(f, "{}", io_stats)?;
135 };
136 if let Some(compute_stats) = &self.compute {
137 writeln!(f, "{}", compute_stats)?;
138 };
139 Ok(())
140 }
141}
142
/// Durations recorded for each phase of running a query.
#[derive(Clone, Debug)]
pub struct ExecutionDurationStats {
    /// Time attributed to parsing.
    parsing: Duration,
    /// Time attributed to logical planning.
    logical_planning: Duration,
    /// Time attributed to physical planning.
    physical_planning: Duration,
    /// Time attributed to execution.
    execution: Duration,
    /// Total time as supplied by the caller (not recomputed here).
    total: Duration,
}

impl ExecutionDurationStats {
    /// Bundles the per-phase durations; values are stored exactly as given.
    pub fn new(
        parsing: Duration,
        logical_planning: Duration,
        physical_planning: Duration,
        execution: Duration,
        total: Duration,
    ) -> Self {
        Self {
            parsing,
            logical_planning,
            physical_planning,
            execution,
            total,
        }
    }
}

impl std::fmt::Display for ExecutionDurationStats {
    /// Writes two small tables (planning phases, then execution/total) with
    /// 20-character left-aligned columns; durations use their `Debug` form.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            parsing,
            logical_planning,
            physical_planning,
            execution,
            total,
        } = self;

        writeln!(
            f,
            "{:<20} {:<20} {:<20}",
            "Parsing", "Logical Planning", "Physical Planning"
        )?;
        writeln!(
            f,
            "{parsing:<20?} {logical_planning:<20?} {physical_planning:<20?}"
        )?;
        writeln!(f)?;
        writeln!(f, "{:<20} {:<20}", "Execution", "Total",)?;
        writeln!(f, "{execution:<20?} {total:<20?}")
    }
}
188
/// IO-side metrics gathered from scan nodes of a physical plan.
///
/// Every field is optional because the corresponding metric may be absent.
#[derive(Clone, Debug)]
pub struct ExecutionIOStats {
    /// Sum of the `bytes_scanned` metric.
    bytes_scanned: Option<MetricValue>,
    /// Sum of the `time_elapsed_opening` metric.
    time_opening: Option<MetricValue>,
    /// Sum of the `time_elapsed_scanning_total` metric.
    time_scanning: Option<MetricValue>,
    /// Output row count reported by a parquet scan node.
    parquet_output_rows: Option<usize>,
    /// NOTE(review): never assigned by `PlanIOVisitor::collect_io_metrics` in
    /// this file, so presumably always `None` — confirm.
    parquet_pruned_page_index: Option<MetricValue>,
    /// NOTE(review): never assigned by the visitor in this file — confirm.
    parquet_matched_page_index: Option<MetricValue>,
    /// Sum of the `row_groups_pruned_statistics` metric.
    parquet_rg_pruned_stats: Option<MetricValue>,
    /// Sum of the `row_groups_matched_statistics` metric.
    parquet_rg_matched_stats: Option<MetricValue>,
    /// NOTE(review): never assigned by the visitor in this file — confirm.
    parquet_rg_pruned_bloom_filter: Option<MetricValue>,
    /// NOTE(review): never assigned by the visitor in this file — confirm.
    parquet_rg_matched_bloom_filter: Option<MetricValue>,
}
202
203impl ExecutionIOStats {
204 fn parquet_rg_pruned_stats_ratio(&self) -> f64 {
205 if let (Some(pruned), Some(matched)) = (
206 self.parquet_rg_matched_stats.as_ref(),
207 self.parquet_rg_pruned_stats.as_ref(),
208 ) {
209 let pruned = pruned.as_usize() as f64;
210 let matched = matched.as_usize() as f64;
211 matched / (pruned + matched)
212 } else {
213 0.0
214 }
215 }
216
217 fn parquet_rg_pruned_bloom_filter_ratio(&self) -> f64 {
218 if let (Some(pruned), Some(matched)) = (
219 self.parquet_rg_matched_bloom_filter.as_ref(),
220 self.parquet_rg_pruned_bloom_filter.as_ref(),
221 ) {
222 let pruned = pruned.as_usize() as f64;
223 let matched = matched.as_usize() as f64;
224 matched / (pruned + matched)
225 } else {
226 0.0
227 }
228 }
229
230 fn parquet_rg_pruned_page_index_ratio(&self) -> f64 {
231 if let (Some(pruned), Some(matched)) = (
232 self.parquet_matched_page_index.as_ref(),
233 self.parquet_pruned_page_index.as_ref(),
234 ) {
235 let pruned = pruned.as_usize() as f64;
236 let matched = matched.as_usize() as f64;
237 matched / (pruned + matched)
238 } else {
239 0.0
240 }
241 }
242
243 fn row_group_count(&self) -> usize {
244 if let (Some(pruned), Some(matched)) = (
245 self.parquet_rg_matched_stats.as_ref(),
246 self.parquet_rg_pruned_stats.as_ref(),
247 ) {
248 let pruned = pruned.as_usize();
249 let matched = matched.as_usize();
250 pruned + matched
251 } else {
252 0
253 }
254 }
255}
256
257impl std::fmt::Display for ExecutionIOStats {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 writeln!(
260 f,
261 "======================= IO Summary ========================"
262 )?;
263 writeln!(
264 f,
265 "{:<20} {:<20} {:<20}",
266 "Bytes Scanned", "Time Opening", "Time Scanning"
267 )?;
268 writeln!(
269 f,
270 "{:<20} {:<20} {:<20}",
271 self.bytes_scanned
272 .as_ref()
273 .map(|m| m.to_string())
274 .unwrap_or("None".to_string()),
275 self.time_opening
276 .as_ref()
277 .map(|m| m.to_string())
278 .unwrap_or("None".to_string()),
279 self.time_scanning
280 .as_ref()
281 .map(|m| m.to_string())
282 .unwrap_or("None".to_string())
283 )?;
284 writeln!(f)?;
285 writeln!(
286 f,
287 "Parquet Pruning Stats (Output Rows: {}, Row Groups: {} [{}ms per row group])",
288 self.parquet_output_rows
289 .as_ref()
290 .map(|m| m.to_string())
291 .unwrap_or("None".to_string()),
292 self.row_group_count(),
293 self.time_scanning
294 .as_ref()
295 .map(|ts| format!(
296 "{:.2}",
297 (ts.as_usize() / 1_000_000) as f64 / self.row_group_count() as f64
298 ))
299 .unwrap_or("None".to_string())
300 )?;
301 writeln!(
302 f,
303 "{:<20} {:<20} {:<20}",
304 "Matched RG Stats %", "Matched RG Bloom %", "Matched Page Index %"
305 )?;
306 writeln!(
307 f,
308 "{:<20.2} {:<20.2} {:<20.2}",
309 self.parquet_rg_pruned_stats_ratio(),
310 self.parquet_rg_pruned_bloom_filter_ratio(),
311 self.parquet_rg_pruned_page_index_ratio()
312 )?;
313 Ok(())
314 }
315}
316
/// Plan visitor that records IO metrics from scan nodes.
///
/// Its fields mirror `ExecutionIOStats` one-to-one; the visitor is converted
/// into that type via `From` once the traversal is finished.
struct PlanIOVisitor {
    /// Sum of the `bytes_scanned` metric of the visited IO node.
    bytes_scanned: Option<MetricValue>,
    /// Sum of the `time_elapsed_opening` metric.
    time_opening: Option<MetricValue>,
    /// Sum of the `time_elapsed_scanning_total` metric.
    time_scanning: Option<MetricValue>,
    /// Output rows of a parquet-backed scan node.
    parquet_output_rows: Option<usize>,
    /// Not assigned anywhere in the visible code.
    parquet_pruned_page_index: Option<MetricValue>,
    /// Not assigned anywhere in the visible code.
    parquet_matched_page_index: Option<MetricValue>,
    /// Sum of the `row_groups_pruned_statistics` metric.
    parquet_rg_pruned_stats: Option<MetricValue>,
    /// Sum of the `row_groups_matched_statistics` metric.
    parquet_rg_matched_stats: Option<MetricValue>,
    /// Not assigned anywhere in the visible code.
    parquet_rg_pruned_bloom_filter: Option<MetricValue>,
    /// Not assigned anywhere in the visible code.
    parquet_rg_matched_bloom_filter: Option<MetricValue>,
}
333
334impl PlanIOVisitor {
335 fn new() -> Self {
336 Self {
337 bytes_scanned: None,
338 time_opening: None,
339 time_scanning: None,
340 parquet_output_rows: None,
341 parquet_pruned_page_index: None,
342 parquet_matched_page_index: None,
343 parquet_rg_pruned_stats: None,
344 parquet_rg_matched_stats: None,
345 parquet_rg_pruned_bloom_filter: None,
346 parquet_rg_matched_bloom_filter: None,
347 }
348 }
349
350 fn collect_io_metrics(&mut self, plan: &dyn ExecutionPlan) {
351 let io_metrics = plan.metrics();
352 if let Some(metrics) = io_metrics {
353 self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
354 self.time_opening = metrics.sum_by_name("time_elapsed_opening");
355 self.time_scanning = metrics.sum_by_name("time_elapsed_scanning_total");
356
357 if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
358 if data_source_exec
359 .data_source()
360 .as_any()
361 .downcast_ref::<ParquetSource>()
362 .is_some()
363 {
364 self.parquet_output_rows = metrics.output_rows();
365 self.parquet_rg_pruned_stats =
366 metrics.sum_by_name("row_groups_pruned_statistics");
367 self.parquet_rg_matched_stats =
368 metrics.sum_by_name("row_groups_matched_statistics");
369 }
370 }
371 }
372 }
373}
374
375impl From<PlanIOVisitor> for ExecutionIOStats {
376 fn from(value: PlanIOVisitor) -> Self {
377 Self {
378 bytes_scanned: value.bytes_scanned,
379 time_opening: value.time_opening,
380 time_scanning: value.time_scanning,
381 parquet_output_rows: value.parquet_output_rows,
382 parquet_pruned_page_index: value.parquet_pruned_page_index,
383 parquet_matched_page_index: value.parquet_matched_page_index,
384 parquet_rg_pruned_stats: value.parquet_rg_pruned_stats,
385 parquet_rg_matched_stats: value.parquet_rg_matched_stats,
386 parquet_rg_pruned_bloom_filter: value.parquet_rg_pruned_bloom_filter,
387 parquet_rg_matched_bloom_filter: value.parquet_rg_matched_bloom_filter,
388 }
389 }
390}
391
392impl ExecutionPlanVisitor for PlanIOVisitor {
393 type Error = datafusion::common::DataFusionError;
394
395 fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> color_eyre::Result<bool, Self::Error> {
396 if is_io_plan(plan) {
397 self.collect_io_metrics(plan);
398 }
399 Ok(true)
400 }
401}
402
/// Elapsed-compute samples collected for a single plan node.
#[derive(Clone, Debug)]
pub struct PartitionsComputeStats {
    /// Name of the plan node the samples belong to.
    name: String,
    /// Elapsed-compute values; the collector in this file stores them sorted
    /// ascending, which `summary_stats` relies on.
    elapsed_computes: Vec<usize>,
}

impl PartitionsComputeStats {
    /// Returns `(min, median, mean, max, total)` of the samples.
    ///
    /// Min and max are simply the first and last sample, and the median is
    /// the element at index `len / 2`, so the result is only meaningful for
    /// sorted input. The mean uses integer division. An empty sample set
    /// yields all zeros.
    fn summary_stats(&self) -> (usize, usize, usize, usize, usize) {
        let samples = self.elapsed_computes.as_slice();
        match (samples.first(), samples.last()) {
            (Some(&min), Some(&max)) => {
                let median = samples[samples.len() / 2];
                let total = samples.iter().sum::<usize>();
                let mean = total / samples.len();
                (min, median, mean, max, total)
            }
            // `first`/`last` are `None` only for an empty slice.
            _ => (0, 0, 0, 0, 0),
        }
    }

    /// Number of samples recorded for this node.
    fn partitions(&self) -> usize {
        self.elapsed_computes.len()
    }
}
428
/// Compute-side metrics of a plan, grouped by operator category.
#[derive(Clone, Debug)]
pub struct ExecutionComputeStats {
    /// Elapsed compute summed over all visited non-IO nodes; used as the
    /// denominator for the per-node percentage in the report.
    elapsed_compute: Option<usize>,
    /// Per-node stats for projection operators.
    projection_compute: Option<Vec<PartitionsComputeStats>>,
    /// Per-node stats for filter operators.
    filter_compute: Option<Vec<PartitionsComputeStats>>,
    /// Per-node stats for sort operators.
    sort_compute: Option<Vec<PartitionsComputeStats>>,
    /// Per-node stats for join operators.
    join_compute: Option<Vec<PartitionsComputeStats>>,
    /// Per-node stats for aggregate operators.
    aggregate_compute: Option<Vec<PartitionsComputeStats>>,
    /// Per-node stats for every operator not covered by the categories above.
    other_compute: Option<Vec<PartitionsComputeStats>>,
}
439
440impl ExecutionComputeStats {
441 fn display_compute(
442 &self,
443 f: &mut std::fmt::Formatter<'_>,
444 compute: &Option<Vec<PartitionsComputeStats>>,
445 label: &str,
446 ) -> std::fmt::Result {
447 if let (Some(filter_compute), Some(elapsed_compute)) = (compute, &self.elapsed_compute) {
448 let partitions = filter_compute.iter().fold(0, |acc, c| acc + c.partitions());
449 writeln!(
450 f,
451 "{label} Stats ({} nodes, {} partitions)",
452 filter_compute.len(),
453 partitions
454 )?;
455 writeln!(
456 f,
457 "{:<30} {:<16} {:<16} {:<16} {:<16} {:<16}",
458 "Node(Partitions)", "Min", "Median", "Mean", "Max", "Total (%)"
459 )?;
460 filter_compute.iter().try_for_each(|node| {
461 let (min, median, mean, max, total) = node.summary_stats();
462 let total = format!(
463 "{} ({:.2}%)",
464 total,
465 (total as f32 / *elapsed_compute as f32) * 100.0
466 );
467 writeln!(
468 f,
469 "{:<30} {:<16} {:<16} {:<16} {:<16} {:<16}",
470 format!("{}({})", node.name, node.elapsed_computes.len()),
471 min,
472 median,
473 mean,
474 max,
475 total,
476 )
477 })
478 } else {
479 writeln!(f, "No {label} Stats")
480 }
481 }
482}
483
484impl std::fmt::Display for ExecutionComputeStats {
485 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
486 writeln!(
487 f,
488 "==================================== Compute Summary ====================================="
489 )?;
490 writeln!(f, "{:<20}", "Elapsed Compute",)?;
491 writeln!(
492 f,
493 "{:<20}",
494 self.elapsed_compute
495 .as_ref()
496 .map(|m| m.to_string())
497 .unwrap_or("None".to_string()),
498 )?;
499 writeln!(f)?;
500 self.display_compute(f, &self.projection_compute, "Projection")?;
501 writeln!(f)?;
502 self.display_compute(f, &self.filter_compute, "Filter")?;
503 writeln!(f)?;
504 self.display_compute(f, &self.sort_compute, "Sort")?;
505 writeln!(f)?;
506 self.display_compute(f, &self.join_compute, "Join")?;
507 writeln!(f)?;
508 self.display_compute(f, &self.aggregate_compute, "Aggregate")?;
509 writeln!(f)?;
510 self.display_compute(f, &self.other_compute, "Other")?;
511 writeln!(f)
512 }
513}
514
/// Plan visitor that accumulates elapsed-compute metrics, bucketed by
/// operator category. `Default` yields an empty visitor.
#[derive(Default)]
pub struct PlanComputeVisitor {
    /// Running sum of elapsed compute over the visited nodes; `None` until a
    /// node reports a value.
    elapsed_compute: Option<usize>,
    /// One entry per visited filter node.
    filter_computes: Vec<PartitionsComputeStats>,
    /// One entry per visited sort node.
    sort_computes: Vec<PartitionsComputeStats>,
    /// One entry per visited projection node.
    projection_computes: Vec<PartitionsComputeStats>,
    /// One entry per visited join node.
    join_computes: Vec<PartitionsComputeStats>,
    /// One entry per visited aggregate node.
    aggregate_computes: Vec<PartitionsComputeStats>,
    /// One entry per visited node that fits none of the categories above.
    other_computes: Vec<PartitionsComputeStats>,
}
525
526impl PlanComputeVisitor {
527 fn add_elapsed_compute(&mut self, node_elapsed_compute: Option<usize>) {
528 match (self.elapsed_compute, node_elapsed_compute) {
529 (Some(agg_elapsed_compute), Some(node_elapsed_compute)) => {
530 self.elapsed_compute = Some(agg_elapsed_compute + node_elapsed_compute)
531 }
532 (Some(_), None) | (None, None) => {}
533 (None, Some(node_elapsed_compute)) => self.elapsed_compute = Some(node_elapsed_compute),
534 }
535 }
536
537 fn collect_compute_metrics(&mut self, plan: &dyn ExecutionPlan) {
538 let compute_metrics = plan.metrics();
539 if let Some(metrics) = compute_metrics {
540 self.add_elapsed_compute(metrics.elapsed_compute());
541 }
542 self.collect_filter_metrics(plan);
543 self.collect_sort_metrics(plan);
544 self.collect_projection_metrics(plan);
545 self.collect_join_metrics(plan);
546 self.collect_aggregate_metrics(plan);
547 self.collect_other_metrics(plan);
548 }
549
550 fn collect_filter_metrics(&mut self, plan: &dyn ExecutionPlan) {
552 if is_filter_plan(plan) {
553 if let Some(metrics) = plan.metrics() {
554 let sorted_computes: Vec<usize> = metrics
555 .iter()
556 .filter_map(|m| match m.value() {
557 MetricValue::ElapsedCompute(t) => Some(t.value()),
558 _ => None,
559 })
560 .sorted()
561 .collect();
562 let p = PartitionsComputeStats {
563 name: plan.name().to_string(),
564 elapsed_computes: sorted_computes,
565 };
566 self.filter_computes.push(p)
567 }
568 }
569 }
570
571 fn collect_sort_metrics(&mut self, plan: &dyn ExecutionPlan) {
572 if is_sort_plan(plan) {
573 if let Some(metrics) = plan.metrics() {
574 let sorted_computes: Vec<usize> = metrics
575 .iter()
576 .filter_map(|m| match m.value() {
577 MetricValue::ElapsedCompute(t) => Some(t.value()),
578 _ => None,
579 })
580 .sorted()
581 .collect();
582 let p = PartitionsComputeStats {
583 name: plan.name().to_string(),
584 elapsed_computes: sorted_computes,
585 };
586 self.sort_computes.push(p)
587 }
588 }
589 }
590
591 fn collect_projection_metrics(&mut self, plan: &dyn ExecutionPlan) {
592 if is_projection_plan(plan) {
593 if let Some(metrics) = plan.metrics() {
594 let sorted_computes: Vec<usize> = metrics
595 .iter()
596 .filter_map(|m| match m.value() {
597 MetricValue::ElapsedCompute(t) => Some(t.value()),
598 _ => None,
599 })
600 .sorted()
601 .collect();
602 let p = PartitionsComputeStats {
603 name: plan.name().to_string(),
604 elapsed_computes: sorted_computes,
605 };
606 self.projection_computes.push(p)
607 }
608 }
609 }
610
611 fn collect_join_metrics(&mut self, plan: &dyn ExecutionPlan) {
612 if is_join_plan(plan) {
613 if let Some(metrics) = plan.metrics() {
614 let sorted_computes: Vec<usize> = metrics
615 .iter()
616 .filter_map(|m| match m.value() {
617 MetricValue::ElapsedCompute(t) => Some(t.value()),
618 _ => None,
619 })
620 .sorted()
621 .collect();
622 let p = PartitionsComputeStats {
623 name: plan.name().to_string(),
624 elapsed_computes: sorted_computes,
625 };
626 self.join_computes.push(p)
627 }
628 }
629 }
630
631 fn collect_aggregate_metrics(&mut self, plan: &dyn ExecutionPlan) {
632 if is_aggregate_plan(plan) {
633 if let Some(metrics) = plan.metrics() {
634 let sorted_computes: Vec<usize> = metrics
635 .iter()
636 .filter_map(|m| match m.value() {
637 MetricValue::ElapsedCompute(t) => Some(t.value()),
638 _ => None,
639 })
640 .sorted()
641 .collect();
642 let p = PartitionsComputeStats {
643 name: plan.name().to_string(),
644 elapsed_computes: sorted_computes,
645 };
646 self.aggregate_computes.push(p)
647 }
648 }
649 }
650
651 fn collect_other_metrics(&mut self, plan: &dyn ExecutionPlan) {
652 if !is_filter_plan(plan)
653 && !is_sort_plan(plan)
654 && !is_projection_plan(plan)
655 && !is_aggregate_plan(plan)
656 && !is_join_plan(plan)
657 {
658 if let Some(metrics) = plan.metrics() {
659 let sorted_computes: Vec<usize> = metrics
660 .iter()
661 .filter_map(|m| match m.value() {
662 MetricValue::ElapsedCompute(t) => Some(t.value()),
663 _ => None,
664 })
665 .sorted()
666 .collect();
667 let p = PartitionsComputeStats {
668 name: plan.name().to_string(),
669 elapsed_computes: sorted_computes,
670 };
671 self.other_computes.push(p)
672 }
673 }
674 }
675}
676
677fn is_filter_plan(plan: &dyn ExecutionPlan) -> bool {
678 plan.as_any().downcast_ref::<FilterExec>().is_some()
679}
680
681fn is_sort_plan(plan: &dyn ExecutionPlan) -> bool {
682 plan.as_any().downcast_ref::<SortExec>().is_some()
683 || plan
684 .as_any()
685 .downcast_ref::<SortPreservingMergeExec>()
686 .is_some()
687}
688
689fn is_projection_plan(plan: &dyn ExecutionPlan) -> bool {
690 plan.as_any().downcast_ref::<ProjectionExec>().is_some()
691}
692
693fn is_join_plan(plan: &dyn ExecutionPlan) -> bool {
694 plan.as_any().downcast_ref::<HashJoinExec>().is_some()
695 || plan.as_any().downcast_ref::<CrossJoinExec>().is_some()
696 || plan.as_any().downcast_ref::<SortMergeJoinExec>().is_some()
697 || plan.as_any().downcast_ref::<NestedLoopJoinExec>().is_some()
698 || plan
699 .as_any()
700 .downcast_ref::<SymmetricHashJoinExec>()
701 .is_some()
702}
703
704fn is_aggregate_plan(plan: &dyn ExecutionPlan) -> bool {
705 plan.as_any().downcast_ref::<AggregateExec>().is_some()
706}
707
708impl From<PlanComputeVisitor> for ExecutionComputeStats {
709 fn from(value: PlanComputeVisitor) -> Self {
710 Self {
711 elapsed_compute: value.elapsed_compute,
712 filter_compute: Some(value.filter_computes),
713 sort_compute: Some(value.sort_computes),
714 projection_compute: Some(value.projection_computes),
715 join_compute: Some(value.join_computes),
716 aggregate_compute: Some(value.aggregate_computes),
717 other_compute: Some(value.other_computes),
718 }
719 }
720}
721
722impl ExecutionPlanVisitor for PlanComputeVisitor {
723 type Error = datafusion::common::DataFusionError;
724
725 fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> color_eyre::Result<bool, Self::Error> {
726 if !is_io_plan(plan) {
727 self.collect_compute_metrics(plan);
728 }
729 Ok(true)
730 }
731}
732
733fn is_io_plan(plan: &dyn ExecutionPlan) -> bool {
734 let io_plans = ["CsvExec", "ParquetExec", "ArrowExec"];
735 io_plans.contains(&plan.name())
736}
737
738pub fn collect_plan_io_stats(plan: Arc<dyn ExecutionPlan>) -> Option<ExecutionIOStats> {
739 let mut visitor = PlanIOVisitor::new();
740 if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() {
741 Some(visitor.into())
742 } else {
743 None
744 }
745}
746
747pub fn collect_plan_compute_stats(plan: Arc<dyn ExecutionPlan>) -> Option<ExecutionComputeStats> {
748 let mut visitor = PlanComputeVisitor::default();
749 if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() {
750 Some(visitor.into())
751 } else {
752 None
753 }
754}
755
756pub fn print_io_summary(plan: Arc<dyn ExecutionPlan>) {
757 println!("======================= IO Summary ========================");
758 if let Some(stats) = collect_plan_io_stats(plan) {
759 println!("IO Stats: {:#?}", stats);
760 } else {
761 println!("No IO metrics found");
762 }
763}