Skip to main content

datafusion_datasource/file_stream/
metrics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion_common::instant::Instant;
19use datafusion_physical_plan::metrics::{
20    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, Time,
21};
22
/// A timer that can be started and stopped.
///
/// Each completed start/stop interval is added to the wrapped [`Time`]
/// metric, so the metric accumulates the total of all measured intervals.
pub struct StartableTime {
    /// Metric that receives the elapsed time of every completed interval.
    pub metrics: Time,
    /// Start of the interval currently being measured, or `None` while the
    /// timer is stopped. When the timer is stopped, the time elapsed since
    /// this instant is added to `metrics`.
    pub start: Option<Instant>,
}
29
30impl StartableTime {
31    pub fn start(&mut self) {
32        assert!(self.start.is_none());
33        self.start = Some(Instant::now());
34    }
35
36    pub fn stop(&mut self) {
37        if let Some(start) = self.start.take() {
38            self.metrics.add_elapsed(start);
39        }
40    }
41}
42
/// Metrics for [`FileStream`]
///
/// Note that all of these metrics are in terms of wall clock time
/// (not cpu time) so they include time spent waiting on I/O as well
/// as other operators.
///
/// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs>
pub struct FileStreamMetrics {
    /// Wall clock time elapsed for file opening.
    ///
    /// Time between when [`FileOpener::open`] is called and when the
    /// [`FileStream`] receives a stream for reading.
    ///
    /// Registered as `time_elapsed_opening`.
    ///
    /// [`FileStream`]: crate::file_stream::FileStream
    /// [`FileOpener::open`]: crate::file_stream::FileOpener::open
    pub time_opening: StartableTime,
    /// Wall clock time elapsed for file scanning + first record batch of
    /// decompression + decoding.
    ///
    /// Time between when the [`FileStream`] requests data from the
    /// stream and when the first [`RecordBatch`] is produced.
    ///
    /// Registered as `time_elapsed_scanning_until_data`.
    ///
    /// [`FileStream`]: crate::file_stream::FileStream
    /// [`RecordBatch`]: arrow::record_batch::RecordBatch
    pub time_scanning_until_data: StartableTime,
    /// Total elapsed wall clock time for scanning + record batch
    /// decompression / decoding.
    ///
    /// Sum of time between when the [`FileStream`] requests data from
    /// the stream and when a [`RecordBatch`] is produced for all
    /// record batches in the stream. Note that this metric also
    /// includes the time of the parent operator's execution.
    ///
    /// Registered as `time_elapsed_scanning_total`.
    ///
    /// [`FileStream`]: crate::file_stream::FileStream
    /// [`RecordBatch`]: arrow::record_batch::RecordBatch
    pub time_scanning_total: StartableTime,
    /// Wall clock time elapsed for data decompression + decoding
    ///
    /// Time spent waiting for the FileStream's input.
    ///
    /// Registered as `time_elapsed_processing`. Unlike the other timers this
    /// is a plain [`Time`], not a [`StartableTime`].
    // NOTE(review): the summary line and the description above describe
    // different things (decode work vs. waiting on input) — confirm against
    // the call sites which of the two this metric actually measures.
    pub time_processing: Time,
    /// Count of errors opening file.
    ///
    /// If using `OnError::Skip` this will provide a count of the number of files
    /// which were skipped and will not be included in the scan results.
    ///
    /// Registered as `file_open_errors`.
    pub file_open_errors: Count,
    /// Count of errors scanning file.
    ///
    /// If using `OnError::Skip` this will provide a count of the number of files
    /// which were skipped and will not be included in the scan results.
    ///
    /// Registered as `file_scan_errors`.
    pub file_scan_errors: Count,
    /// Count of files successfully opened or evaluated for processing.
    ///
    /// At t=end (completion of a query) this is equal to `files_processed`,
    /// and both values are equal to the total number of files in the query;
    /// unless the query itself fails.
    /// This value will always be greater than or equal to `files_processed`.
    ///
    /// Note that this value does *not* mean the file was actually scanned.
    /// We increment this value for any processing of a file, even if that
    /// processing is discarding it because we hit a `LIMIT` (in this case
    /// `files_opened` and `files_processed` are both incremented at the same
    /// time).
    ///
    /// Registered as `files_opened`.
    pub files_opened: Count,
    /// Count of files completely processed / closed (opened, pruned, or
    /// skipped due to limit).
    ///
    /// At t=0 (the beginning of a query) this is 0.
    /// At t=end (completion of a query) this is equal to `files_opened`, and
    /// both values are equal to the total number of files in the query;
    /// unless the query itself fails.
    /// This value will always be less than or equal to `files_opened`.
    ///
    /// We increment this value for any processing of a file, even if that
    /// processing is discarding it because we hit a `LIMIT` (in this case
    /// `files_opened` and `files_processed` are both incremented at the same
    /// time).
    ///
    /// Registered as `files_processed`.
    pub files_processed: Count,
}
108
109impl FileStreamMetrics {
110    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
111        let time_opening = StartableTime {
112            metrics: MetricBuilder::new(metrics)
113                .subset_time("time_elapsed_opening", partition),
114            start: None,
115        };
116
117        let time_scanning_until_data = StartableTime {
118            metrics: MetricBuilder::new(metrics)
119                .subset_time("time_elapsed_scanning_until_data", partition),
120            start: None,
121        };
122
123        let time_scanning_total = StartableTime {
124            metrics: MetricBuilder::new(metrics)
125                .subset_time("time_elapsed_scanning_total", partition),
126            start: None,
127        };
128
129        let time_processing =
130            MetricBuilder::new(metrics).subset_time("time_elapsed_processing", partition);
131
132        let file_open_errors = MetricBuilder::new(metrics)
133            .with_category(MetricCategory::Rows)
134            .counter("file_open_errors", partition);
135
136        let file_scan_errors = MetricBuilder::new(metrics)
137            .with_category(MetricCategory::Rows)
138            .counter("file_scan_errors", partition);
139
140        let files_opened = MetricBuilder::new(metrics)
141            .with_category(MetricCategory::Rows)
142            .counter("files_opened", partition);
143
144        let files_processed = MetricBuilder::new(metrics)
145            .with_category(MetricCategory::Rows)
146            .counter("files_processed", partition);
147
148        Self {
149            time_opening,
150            time_scanning_until_data,
151            time_scanning_total,
152            time_processing,
153            file_open_errors,
154            file_scan_errors,
155            files_opened,
156            files_processed,
157        }
158    }
159}