datafusion_datasource/file_stream/metrics.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion_common::instant::Instant;
19use datafusion_physical_plan::metrics::{
20 Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, Time,
21};
22
/// A timer that can be started and stopped.
///
/// Each completed `start()` / `stop()` pair adds the time elapsed since
/// `start()` was called to `metrics`.
pub struct StartableTime {
    /// Accumulated time; `stop()` adds the elapsed time of the interval
    /// that is currently running to this metric.
    pub metrics: Time,
    /// Start instant of the interval currently being measured.
    ///
    /// `Some` between a call to `start()` and the following `stop()`,
    /// `None` while the timer is not running.
    pub start: Option<Instant>,
}
29
30impl StartableTime {
31 pub fn start(&mut self) {
32 assert!(self.start.is_none());
33 self.start = Some(Instant::now());
34 }
35
36 pub fn stop(&mut self) {
37 if let Some(start) = self.start.take() {
38 self.metrics.add_elapsed(start);
39 }
40 }
41}
42
/// Metrics for [`FileStream`]
///
/// Note that all of these metrics are in terms of wall clock time
/// (not cpu time) so they include time spent waiting on I/O as well
/// as other operators.
///
/// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs>
pub struct FileStreamMetrics {
    /// Wall clock time elapsed for file opening.
    ///
    /// Time between when [`FileOpener::open`] is called and when the
    /// [`FileStream`] receives a stream for reading.
    ///
    /// [`FileStream`]: crate::file_stream::FileStream
    /// [`FileOpener::open`]: crate::file_stream::FileOpener::open
    pub time_opening: StartableTime,
    /// Wall clock time elapsed for file scanning plus decompression and
    /// decoding of the first record batch.
    ///
    /// Time between when the [`FileStream`] requests data from the
    /// stream and when the first [`RecordBatch`] is produced.
    ///
    /// [`FileStream`]: crate::file_stream::FileStream
    /// [`RecordBatch`]: arrow::record_batch::RecordBatch
    pub time_scanning_until_data: StartableTime,
    /// Total elapsed wall clock time for scanning plus record batch
    /// decompression / decoding.
    ///
    /// Sum of time between when the [`FileStream`] requests data from
    /// the stream and when a [`RecordBatch`] is produced for all
    /// record batches in the stream. Note that this metric also
    /// includes the time of the parent operator's execution.
    ///
    /// [`FileStream`]: crate::file_stream::FileStream
    /// [`RecordBatch`]: arrow::record_batch::RecordBatch
    pub time_scanning_total: StartableTime,
    /// Wall clock time elapsed for data decompression + decoding.
    ///
    /// Time spent waiting for the FileStream's input.
    pub time_processing: Time,
    /// Count of errors opening file.
    ///
    /// If using `OnError::Skip` this will provide a count of the number of
    /// files which were skipped and will not be included in the scan results.
    pub file_open_errors: Count,
    /// Count of errors scanning file.
    ///
    /// If using `OnError::Skip` this will provide a count of the number of
    /// files which were skipped and will not be included in the scan results.
    pub file_scan_errors: Count,
    /// Count of files successfully opened or evaluated for processing.
    ///
    /// At t=end (completion of a query) this is equal to `files_processed`,
    /// and both values are equal to the total number of files in the query;
    /// unless the query itself fails.
    /// This value will always be greater than or equal to `files_processed`.
    ///
    /// Note that this value does *not* mean the file was actually scanned.
    /// We increment this value for any processing of a file, even if that
    /// processing is discarding it because we hit a `LIMIT` (in this case
    /// `files_opened` and `files_processed` are both incremented at the
    /// same time).
    pub files_opened: Count,
    /// Count of files completely processed / closed (opened, pruned, or
    /// skipped due to limit).
    ///
    /// At t=0 (the beginning of a query) this is 0.
    /// At t=end (completion of a query) this is equal to `files_opened`,
    /// and both values are equal to the total number of files in the query;
    /// unless the query itself fails.
    /// This value will always be less than or equal to `files_opened`.
    ///
    /// We increment this value for any processing of a file, even if that
    /// processing is discarding it because we hit a `LIMIT` (in this case
    /// `files_opened` and `files_processed` are both incremented at the
    /// same time).
    pub files_processed: Count,
}
108
109impl FileStreamMetrics {
110 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
111 let time_opening = StartableTime {
112 metrics: MetricBuilder::new(metrics)
113 .subset_time("time_elapsed_opening", partition),
114 start: None,
115 };
116
117 let time_scanning_until_data = StartableTime {
118 metrics: MetricBuilder::new(metrics)
119 .subset_time("time_elapsed_scanning_until_data", partition),
120 start: None,
121 };
122
123 let time_scanning_total = StartableTime {
124 metrics: MetricBuilder::new(metrics)
125 .subset_time("time_elapsed_scanning_total", partition),
126 start: None,
127 };
128
129 let time_processing =
130 MetricBuilder::new(metrics).subset_time("time_elapsed_processing", partition);
131
132 let file_open_errors = MetricBuilder::new(metrics)
133 .with_category(MetricCategory::Rows)
134 .counter("file_open_errors", partition);
135
136 let file_scan_errors = MetricBuilder::new(metrics)
137 .with_category(MetricCategory::Rows)
138 .counter("file_scan_errors", partition);
139
140 let files_opened = MetricBuilder::new(metrics)
141 .with_category(MetricCategory::Rows)
142 .counter("files_opened", partition);
143
144 let files_processed = MetricBuilder::new(metrics)
145 .with_category(MetricCategory::Rows)
146 .counter("files_processed", partition);
147
148 Self {
149 time_opening,
150 time_scanning_until_data,
151 time_scanning_total,
152 time_processing,
153 file_open_errors,
154 file_scan_errors,
155 files_opened,
156 files_processed,
157 }
158 }
159}