datafusion_datasource_parquet/metrics.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_physical_plan::metrics::{
    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics,
    RatioMergeStrategy, RatioMetrics, Time,
};

/// Stores metrics about the parquet execution for a particular parquet file.
///
/// This component is subject to **change** in the near future and is exposed for low level
/// integrations through [`ParquetFileReaderFactory`].
///
/// [`ParquetFileReaderFactory`]: super::ParquetFileReaderFactory
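///
/// # Example
///
/// A minimal sketch of registering per-file metrics against a shared
/// [`ExecutionPlanMetricsSet`]; the partition index and file name below are
/// only illustrative:
///
/// ```ignore
/// use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
///
/// let metrics_set = ExecutionPlanMetricsSet::new();
/// let file_metrics = ParquetFileMetrics::new(0, "part-0.parquet", &metrics_set);
///
/// // The handles are shared with `metrics_set`, so updates made while scanning
/// // the file are visible when the set is aggregated (e.g. for EXPLAIN ANALYZE).
/// file_metrics.bytes_scanned.add(1024);
/// ```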
#[derive(Debug, Clone)]
pub struct ParquetFileMetrics {
    /// Number of file **ranges** pruned or matched by partition or file level statistics.
    /// Pruning of files often happens at planning time but may happen at execution time
    /// if dynamic filters (e.g. from a join) result in additional pruning.
    ///
    /// This does **not** necessarily equal the number of files pruned:
    /// files may be scanned in sub-ranges to increase parallelism,
    /// in which case this will represent the number of sub-ranges pruned, not the number of files.
    /// The number of files pruned will always be less than or equal to this number.
    ///
    /// A single file may have some ranges that are not pruned and some that are pruned.
    /// For example, with a query like `ORDER BY col LIMIT 10`, the TopK dynamic filter
    /// pushdown optimization may fill up the TopK heap when reading the first part of a file,
    /// then skip the second part if file statistics indicate it cannot contain rows
    /// that would be in the TopK.
    pub files_ranges_pruned_statistics: PruningMetrics,
    /// Number of times the predicate could not be evaluated
    pub predicate_evaluation_errors: Count,
    /// Number of row groups whose bloom filters were checked, tracked with matched/pruned counts
    pub row_groups_pruned_bloom_filter: PruningMetrics,
    /// Number of row groups whose statistics were checked, tracked with matched/pruned counts
    pub row_groups_pruned_statistics: PruningMetrics,
    /// Total number of bytes scanned
    pub bytes_scanned: Count,
    /// Total rows filtered out by predicates pushed down into the parquet scan
    pub pushdown_rows_pruned: Count,
    /// Total rows that passed predicates pushed down into the parquet scan
    pub pushdown_rows_matched: Count,
    /// Total time spent evaluating row-level pushdown filters
    pub row_pushdown_eval_time: Time,
    /// Total time spent evaluating row group-level statistics filters
    pub statistics_eval_time: Time,
    /// Total time spent evaluating row group Bloom Filters
    pub bloom_filter_eval_time: Time,
    /// Total rows filtered or matched by the parquet page index
    pub page_index_rows_pruned: PruningMetrics,
    /// Total time spent evaluating parquet page index filters
    pub page_index_eval_time: Time,
    /// Total time spent reading and parsing metadata from the footer
    pub metadata_load_time: Time,
    /// Scan efficiency ratio, calculated as bytes_scanned / total_file_size
    pub scan_efficiency_ratio: RatioMetrics,
    /// Predicate Cache: number of records read directly from the inner reader.
    /// This is the number of rows decoded while evaluating predicates.
    pub predicate_cache_inner_records: Count,
    /// Predicate Cache: number of records read from the cache. This is the
    /// number of rows that were stored in the cache while evaluating predicates
    /// and then reused for the output.
    pub predicate_cache_records: Count,
}

impl ParquetFileMetrics {
    /// Create new metrics
    pub fn new(
        partition: usize,
        filename: &str,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Self {
        // -----------------------
        // 'summary' level metrics
        // -----------------------
        let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .with_type(MetricType::SUMMARY)
            .pruning_metrics("row_groups_pruned_bloom_filter", partition);

        let row_groups_pruned_statistics = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .with_type(MetricType::SUMMARY)
            .pruning_metrics("row_groups_pruned_statistics", partition);

        let page_index_rows_pruned = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .with_type(MetricType::SUMMARY)
            .pruning_metrics("page_index_rows_pruned", partition);

        let bytes_scanned = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .with_type(MetricType::SUMMARY)
            .counter("bytes_scanned", partition);

        let metadata_load_time = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .with_type(MetricType::SUMMARY)
            .subset_time("metadata_load_time", partition);

        let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
            .with_type(MetricType::SUMMARY)
            .pruning_metrics("files_ranges_pruned_statistics", partition);

        let scan_efficiency_ratio = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .with_type(MetricType::SUMMARY)
            .ratio_metrics_with_strategy(
                "scan_efficiency_ratio",
                partition,
                RatioMergeStrategy::AddPartSetTotal,
            );

        // -----------------------
        // 'dev' level metrics
        // -----------------------
        let predicate_evaluation_errors = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .counter("predicate_evaluation_errors", partition);

        let pushdown_rows_pruned = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .counter("pushdown_rows_pruned", partition);
        let pushdown_rows_matched = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .counter("pushdown_rows_matched", partition);

        let row_pushdown_eval_time = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .subset_time("row_pushdown_eval_time", partition);
        let statistics_eval_time = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .subset_time("statistics_eval_time", partition);
        let bloom_filter_eval_time = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .subset_time("bloom_filter_eval_time", partition);

        let page_index_eval_time = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .subset_time("page_index_eval_time", partition);

        let predicate_cache_inner_records = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .counter("predicate_cache_inner_records", partition);

        let predicate_cache_records = MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .counter("predicate_cache_records", partition);

        Self {
            files_ranges_pruned_statistics,
            predicate_evaluation_errors,
            row_groups_pruned_bloom_filter,
            row_groups_pruned_statistics,
            bytes_scanned,
            pushdown_rows_pruned,
            pushdown_rows_matched,
            row_pushdown_eval_time,
            page_index_rows_pruned,
            statistics_eval_time,
            bloom_filter_eval_time,
            page_index_eval_time,
            metadata_load_time,
            scan_efficiency_ratio,
            predicate_cache_inner_records,
            predicate_cache_records,
        }
    }
}
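
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch of how these metrics are typically created and updated:
    // register the per-file metrics against a shared `ExecutionPlanMetricsSet`
    // and bump one of the shared `Count` handles. The partition index and file
    // name used here are arbitrary.
    #[test]
    fn create_and_update_parquet_file_metrics() {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let file_metrics =
            ParquetFileMetrics::new(0, "part-0.parquet", &metrics_set);

        // Counters start at zero and are incremented as the scan progresses.
        assert_eq!(file_metrics.bytes_scanned.value(), 0);
        file_metrics.bytes_scanned.add(1024);
        assert_eq!(file_metrics.bytes_scanned.value(), 1024);
    }
}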