datafusion_datasource_orc/
metrics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Performance metrics for ORC file operations.
19//!
20//! This module provides metrics for monitoring and analyzing ORC file reading
21//! performance, including I/O statistics, pruning effectiveness, and timing.
22//!
23//! # Metric Categories
24//!
25//! - **I/O Metrics**: Track bytes scanned and I/O requests
26//! - **Metadata Metrics**: Track metadata loading time
27//! - **Stripe Pruning**: Track stripe-level filtering effectiveness
28//! - **Predicate Evaluation**: Track predicate pushdown statistics
29//!
30//! # Example
31//!
32//! ```ignore
33//! use datafusion_datasource_orc::metrics::OrcFileMetrics;
34//! use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
35//!
36//! let metrics_set = ExecutionPlanMetricsSet::new();
37//! let metrics = OrcFileMetrics::new(0, "example.orc", &metrics_set);
38//!
39//! // Record bytes scanned
40//! metrics.bytes_scanned.add(1024);
41//!
42//! // Record metadata load time
43//! let timer = metrics.metadata_load_time.timer();
44//! // ... load metadata ...
45//! timer.done();
46//! ```
47
48use datafusion_physical_plan::metrics::{
49    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics, Time,
50};
51
/// Metrics for ORC file operations.
///
/// Tracks performance statistics for reading ORC files, including I/O,
/// metadata loading, stripe pruning, and predicate evaluation.
///
/// Construct via [`OrcFileMetrics::new`], which registers every field with an
/// [`ExecutionPlanMetricsSet`] under a `filename` label. The struct derives
/// [`Clone`], so it can be handed to the code paths that record into it.
#[derive(Debug, Clone)]
pub struct OrcFileMetrics {
    // =========================================================================
    // I/O Metrics
    // =========================================================================
    /// Total number of bytes scanned from the file.
    ///
    /// This includes all data read from the object store, including metadata,
    /// stripe data, and index data.
    pub bytes_scanned: Count,

    /// Total file size in bytes (for calculating scan efficiency).
    ///
    /// Compared against [`Self::bytes_scanned`] by
    /// [`OrcFileMetrics::scan_efficiency`].
    pub file_size: Count,

    /// Number of I/O requests made to the object store.
    pub io_requests: Count,

    // =========================================================================
    // Metadata Metrics
    // =========================================================================
    /// Time spent reading and parsing ORC file metadata (footer, postscript).
    pub metadata_load_time: Time,

    // =========================================================================
    // Stripe Pruning Metrics
    // =========================================================================
    /// Number of stripes pruned or matched by statistics.
    ///
    /// This uses `PruningMetrics` which tracks both pruned and matched counts.
    /// - `pruned`: Stripes skipped due to statistics not matching the predicate
    /// - `matched`: Stripes that were read because they might contain matching rows
    pub stripes_pruned_statistics: PruningMetrics,

    /// Time spent evaluating stripe-level statistics for pruning.
    pub statistics_eval_time: Time,

    // =========================================================================
    // Predicate Evaluation Metrics
    // =========================================================================
    /// Number of times predicate evaluation encountered errors.
    ///
    /// This can happen when statistics are malformed or when the predicate
    /// cannot be evaluated against the available metadata.
    pub predicate_evaluation_errors: Count,

    /// Number of rows filtered out by predicates pushed into the ORC scan.
    pub pushdown_rows_pruned: Count,

    /// Number of rows that passed predicates pushed into the ORC scan.
    pub pushdown_rows_matched: Count,

    // =========================================================================
    // Decode Metrics
    // =========================================================================
    /// Time spent decoding ORC data into Arrow arrays.
    pub decode_time: Time,

    /// Total number of rows decoded from the file.
    pub rows_decoded: Count,

    /// Number of RecordBatches produced.
    pub batches_produced: Count,
}
119
120impl OrcFileMetrics {
121    /// Create new ORC file metrics.
122    ///
123    /// # Arguments
124    ///
125    /// * `partition` - The partition index for this scan
126    /// * `filename` - The name of the ORC file being scanned
127    /// * `metrics` - The metrics set to register metrics with
128    pub fn new(partition: usize, filename: &str, metrics: &ExecutionPlanMetricsSet) -> Self {
129        // -----------------------
130        // Summary level metrics (user-visible)
131        // -----------------------
132        let bytes_scanned = MetricBuilder::new(metrics)
133            .with_new_label("filename", filename.to_string())
134            .with_type(MetricType::SUMMARY)
135            .counter("bytes_scanned", partition);
136
137        let file_size = MetricBuilder::new(metrics)
138            .with_new_label("filename", filename.to_string())
139            .with_type(MetricType::SUMMARY)
140            .counter("file_size", partition);
141
142        let metadata_load_time = MetricBuilder::new(metrics)
143            .with_new_label("filename", filename.to_string())
144            .with_type(MetricType::SUMMARY)
145            .subset_time("metadata_load_time", partition);
146
147        let stripes_pruned_statistics = MetricBuilder::new(metrics)
148            .with_new_label("filename", filename.to_string())
149            .with_type(MetricType::SUMMARY)
150            .pruning_metrics("stripes_pruned_statistics", partition);
151
152        // -----------------------
153        // Dev level metrics (for debugging/optimization)
154        // -----------------------
155        let io_requests = MetricBuilder::new(metrics)
156            .with_new_label("filename", filename.to_string())
157            .counter("io_requests", partition);
158
159        let statistics_eval_time = MetricBuilder::new(metrics)
160            .with_new_label("filename", filename.to_string())
161            .subset_time("statistics_eval_time", partition);
162
163        let predicate_evaluation_errors = MetricBuilder::new(metrics)
164            .with_new_label("filename", filename.to_string())
165            .counter("predicate_evaluation_errors", partition);
166
167        let pushdown_rows_pruned = MetricBuilder::new(metrics)
168            .with_new_label("filename", filename.to_string())
169            .counter("pushdown_rows_pruned", partition);
170
171        let pushdown_rows_matched = MetricBuilder::new(metrics)
172            .with_new_label("filename", filename.to_string())
173            .counter("pushdown_rows_matched", partition);
174
175        let decode_time = MetricBuilder::new(metrics)
176            .with_new_label("filename", filename.to_string())
177            .subset_time("decode_time", partition);
178
179        let rows_decoded = MetricBuilder::new(metrics)
180            .with_new_label("filename", filename.to_string())
181            .counter("rows_decoded", partition);
182
183        let batches_produced = MetricBuilder::new(metrics)
184            .with_new_label("filename", filename.to_string())
185            .counter("batches_produced", partition);
186
187        Self {
188            bytes_scanned,
189            file_size,
190            io_requests,
191            metadata_load_time,
192            stripes_pruned_statistics,
193            statistics_eval_time,
194            predicate_evaluation_errors,
195            pushdown_rows_pruned,
196            pushdown_rows_matched,
197            decode_time,
198            rows_decoded,
199            batches_produced,
200        }
201    }
202
203    /// Calculate the scan efficiency ratio.
204    ///
205    /// Returns the ratio of bytes scanned to total file size.
206    /// - 1.0 = entire file was read
207    /// - 0.5 = half the file was read (due to projection/filtering)
208    /// - 0.0 = no data read (all stripes pruned)
209    ///
210    /// Returns `None` if file size is zero.
211    pub fn scan_efficiency(&self) -> Option<f64> {
212        let file_size = self.file_size.value();
213        if file_size == 0 {
214            return None;
215        }
216        Some(self.bytes_scanned.value() as f64 / file_size as f64)
217    }
218}
219
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a fresh metrics set together with `OrcFileMetrics` registered on
    /// it. The set is returned alongside the metrics so it stays alive for
    /// the duration of each test.
    fn setup() -> (ExecutionPlanMetricsSet, OrcFileMetrics) {
        let set = ExecutionPlanMetricsSet::new();
        let metrics = OrcFileMetrics::new(0, "test.orc", &set);
        (set, metrics)
    }

    #[test]
    fn test_orc_file_metrics_creation() {
        let (_set, m) = setup();

        // Freshly created counters must all start at zero.
        for counter in [
            &m.bytes_scanned,
            &m.io_requests,
            &m.rows_decoded,
            &m.batches_produced,
        ] {
            assert_eq!(counter.value(), 0);
        }
    }

    #[test]
    fn test_orc_file_metrics_recording() {
        let (_set, m) = setup();

        // Table-driven: record each amount, then verify it round-trips.
        let updates = [
            (&m.bytes_scanned, 1024usize),
            (&m.io_requests, 5),
            (&m.rows_decoded, 100),
            (&m.batches_produced, 2),
        ];
        for (counter, amount) in updates {
            counter.add(amount);
        }
        for (counter, expected) in updates {
            assert_eq!(counter.value(), expected);
        }
    }

    #[test]
    fn test_orc_file_metrics_scan_efficiency() {
        let (_set, m) = setup();

        // Half the file scanned => efficiency ratio of 512/1024 = 0.5.
        m.file_size.add(1024);
        m.bytes_scanned.add(512);

        let ratio = m.scan_efficiency().expect("file size is non-zero");
        assert!((ratio - 0.5).abs() < 0.001);
    }

    #[test]
    fn test_orc_file_metrics_scan_efficiency_zero_file() {
        let (_set, m) = setup();

        // With no recorded file size, efficiency is undefined.
        assert!(m.scan_efficiency().is_none());
    }

    #[test]
    fn test_orc_file_metrics_stripe_pruning() {
        let (_set, m) = setup();

        // Pruned and matched counts are tracked independently.
        m.stripes_pruned_statistics.add_pruned(3);
        m.stripes_pruned_statistics.add_matched(2);

        assert_eq!(m.stripes_pruned_statistics.pruned(), 3);
        assert_eq!(m.stripes_pruned_statistics.matched(), 2);
    }

    #[test]
    fn test_orc_file_metrics_predicate_evaluation() {
        let (_set, m) = setup();

        m.pushdown_rows_pruned.add(50);
        m.pushdown_rows_matched.add(100);
        m.predicate_evaluation_errors.add(1);

        assert_eq!(m.pushdown_rows_pruned.value(), 50);
        assert_eq!(m.pushdown_rows_matched.value(), 100);
        assert_eq!(m.predicate_evaluation_errors.value(), 1);
    }

    #[test]
    fn test_orc_file_metrics_timing() {
        let (_set, m) = setup();

        // Time a short sleep; the recorded duration must be non-zero.
        {
            let timer = m.metadata_load_time.timer();
            std::thread::sleep(std::time::Duration::from_millis(1));
            timer.done();
        }
        assert!(m.metadata_load_time.value() > 0);
    }
}