delta_kernel/metrics/
metered_engine.rs1use std::sync::Arc;
11
12use crate::metrics::{MeteredJsonHandler, MeteredParquetHandler, MeteredStorageHandler};
13use crate::{Engine, EvaluationHandler, JsonHandler, ParquetHandler, StorageHandler};
14
/// [`Engine`] decorator that wraps another engine's storage, JSON, and Parquet handlers in
/// metering wrappers.
///
/// Build one with [`MeteredDeltaEngine::new`]. The three metered handlers are created once, at
/// construction time, and shared out via `Arc`. The evaluation handler is forwarded from the
/// wrapped engine without any wrapping.
pub struct MeteredDeltaEngine {
    // The wrapped engine. After construction only its evaluation handler is consulted.
    inner: Arc<dyn Engine>,
    // `inner`'s storage handler, wrapped in a `MeteredStorageHandler` at construction.
    storage: Arc<dyn StorageHandler>,
    // `inner`'s JSON handler, wrapped in a `MeteredJsonHandler` at construction.
    json: Arc<dyn JsonHandler>,
    // `inner`'s Parquet handler, wrapped in a `MeteredParquetHandler` at construction.
    parquet: Arc<dyn ParquetHandler>,
}
23
24impl MeteredDeltaEngine {
25 pub fn new(inner: Arc<dyn Engine>) -> Self {
33 let inner_storage = inner.storage_handler();
34 debug_assert!(
35 !inner_storage.any_ref().is::<MeteredStorageHandler>(),
36 "MeteredDeltaEngine wraps an engine whose storage_handler is already a \
37 MeteredStorageHandler; remove the outer wrap to avoid double-counting metrics",
38 );
39 let inner_json = inner.json_handler();
40 debug_assert!(
41 !inner_json.any_ref().is::<MeteredJsonHandler>(),
42 "MeteredDeltaEngine wraps an engine whose json_handler is already a \
43 MeteredJsonHandler; remove the outer wrap to avoid double-counting metrics",
44 );
45 let inner_parquet = inner.parquet_handler();
46 debug_assert!(
47 !inner_parquet.any_ref().is::<MeteredParquetHandler>(),
48 "MeteredDeltaEngine wraps an engine whose parquet_handler is already a \
49 MeteredParquetHandler; remove the outer wrap to avoid double-counting metrics",
50 );
51 let storage = Arc::new(MeteredStorageHandler::new(inner_storage));
52 let json = Arc::new(MeteredJsonHandler::new(inner_json));
53 let parquet = Arc::new(MeteredParquetHandler::new(inner_parquet));
54 Self {
55 inner,
56 storage,
57 json,
58 parquet,
59 }
60 }
61}
62
63impl std::fmt::Debug for MeteredDeltaEngine {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 f.debug_struct("MeteredDeltaEngine").finish_non_exhaustive()
66 }
67}
68
69impl Engine for MeteredDeltaEngine {
70 fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
71 self.inner.evaluation_handler()
72 }
73
74 fn storage_handler(&self) -> Arc<dyn StorageHandler> {
75 Arc::clone(&self.storage)
76 }
77
78 fn json_handler(&self) -> Arc<dyn JsonHandler> {
79 Arc::clone(&self.json)
80 }
81
82 fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
83 Arc::clone(&self.parquet)
84 }
85}
86
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;
    use url::Url;

    use super::*;
    use crate::engine::sync::SyncEngine;
    use crate::metrics::MetricEvent;
    use crate::utils::test_utils::{install_thread_local_metrics_reporter, CapturingReporter};
    use crate::{DeltaResult, FileMeta, FileSlice};

    /// Storage handler that lists a fixed set of files, ignoring the requested path.
    ///
    /// Reads, copies, and puts succeed as no-ops. `head` is never exercised by these tests.
    #[derive(Debug)]
    struct StubStorageHandler {
        list_results: Vec<FileMeta>,
    }

    impl StorageHandler for StubStorageHandler {
        fn list_from(
            &self,
            _path: &Url,
        ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>> {
            // Copy the results out so the returned iterator does not borrow `self`.
            let results: Vec<_> = self.list_results.iter().cloned().map(Ok).collect();
            Ok(Box::new(results.into_iter()))
        }
        fn read_files(
            &self,
            _files: Vec<FileSlice>,
        ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>> {
            Ok(Box::new(std::iter::empty()))
        }
        fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> {
            Ok(())
        }
        fn put(&self, _path: &Url, _data: Bytes, _overwrite: bool) -> DeltaResult<()> {
            Ok(())
        }
        fn head(&self, _path: &Url) -> DeltaResult<FileMeta> {
            unreachable!("not exercised")
        }
    }

    /// Builds the `FileMeta` for commit `version` under `memory:///_delta_log/`.
    ///
    /// The file name is zero-padded to 20 digits, e.g. `00000000000000000001.json`.
    fn commit_file(version: u64) -> FileMeta {
        FileMeta {
            location: Url::parse(&format!("memory:///_delta_log/{version:020}.json")).unwrap(),
            last_modified: 0,
            size: 0,
        }
    }

    /// Engine whose storage handler lists commits 0 and 1.
    ///
    /// Evaluation, JSON, and Parquet handling are delegated to a [`SyncEngine`].
    struct StubEngine {
        sync: Arc<SyncEngine>,
        storage: Arc<dyn StorageHandler>,
    }

    impl StubEngine {
        fn new() -> Self {
            Self {
                sync: Arc::new(SyncEngine::new()),
                storage: Arc::new(StubStorageHandler {
                    list_results: vec![commit_file(0), commit_file(1)],
                }),
            }
        }
    }

    impl Engine for StubEngine {
        fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
            self.sync.evaluation_handler()
        }
        fn storage_handler(&self) -> Arc<dyn StorageHandler> {
            Arc::clone(&self.storage)
        }
        fn json_handler(&self) -> Arc<dyn JsonHandler> {
            self.sync.json_handler()
        }
        fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
            self.sync.parquet_handler()
        }
    }

    #[test]
    fn storage_handler_emits_metered_spans() {
        let reporter = Arc::new(CapturingReporter::default());
        let _guard = install_thread_local_metrics_reporter(Arc::clone(&reporter) as _);

        let engine = MeteredDeltaEngine::new(Arc::new(StubEngine::new()));
        let url = Url::parse("memory:///_delta_log/").unwrap();
        // Fully drain the listing so the wrapper sees the whole iteration.
        let _: Vec<_> = engine.storage_handler().list_from(&url).unwrap().collect();

        let num_files = reporter
            .events()
            .iter()
            .find_map(|event| match event {
                MetricEvent::StorageListCompleted(e) => Some(e.num_files),
                _ => None,
            })
            .expect("expected StorageListCompleted via metering wrapper");
        assert_eq!(num_files, 2);
    }

    #[test]
    fn evaluation_handler_passes_through() {
        let inner: Arc<dyn Engine> = Arc::new(StubEngine::new());
        let inner_eval = inner.evaluation_handler();

        let engine = MeteredDeltaEngine::new(inner);
        // Same allocation: the evaluation handler is forwarded, not wrapped.
        assert!(Arc::ptr_eq(&inner_eval, &engine.evaluation_handler()));
    }

    #[test]
    fn json_and_parquet_handlers_are_metered() {
        let engine = MeteredDeltaEngine::new(Arc::new(StubEngine::new()));
        assert!(engine.json_handler().any_ref().is::<MeteredJsonHandler>());
        assert!(engine
            .parquet_handler()
            .any_ref()
            .is::<MeteredParquetHandler>());
    }

    // The double-wrap guard in `MeteredDeltaEngine::new` is a `debug_assert!`. It is compiled
    // out when debug assertions are disabled (e.g. `cargo test --release`), so the
    // `should_panic` tests below and the helpers they use only exist when it is active.

    /// Engine that optionally pre-wraps selected handlers of a [`StubEngine`] in their
    /// metered counterparts, to trigger the double-wrap guard.
    #[cfg(debug_assertions)]
    struct PreMeteredEngine {
        inner: StubEngine,
        meter_storage: bool,
        meter_json: bool,
        meter_parquet: bool,
    }

    #[cfg(debug_assertions)]
    impl Engine for PreMeteredEngine {
        fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
            self.inner.evaluation_handler()
        }
        fn storage_handler(&self) -> Arc<dyn StorageHandler> {
            if self.meter_storage {
                Arc::new(MeteredStorageHandler::new(self.inner.storage_handler()))
            } else {
                self.inner.storage_handler()
            }
        }
        fn json_handler(&self) -> Arc<dyn JsonHandler> {
            if self.meter_json {
                Arc::new(MeteredJsonHandler::new(self.inner.json_handler()))
            } else {
                self.inner.json_handler()
            }
        }
        fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
            if self.meter_parquet {
                Arc::new(MeteredParquetHandler::new(self.inner.parquet_handler()))
            } else {
                self.inner.parquet_handler()
            }
        }
    }

    /// Builds a [`PreMeteredEngine`] whose selected handlers are already metered.
    #[cfg(debug_assertions)]
    fn pre_metered(storage: bool, json: bool, parquet: bool) -> PreMeteredEngine {
        PreMeteredEngine {
            inner: StubEngine::new(),
            meter_storage: storage,
            meter_json: json,
            meter_parquet: parquet,
        }
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "storage_handler is already a MeteredStorageHandler")]
    fn new_panics_when_inner_storage_already_metered() {
        let _ = MeteredDeltaEngine::new(Arc::new(pre_metered(true, false, false)));
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "json_handler is already a MeteredJsonHandler")]
    fn new_panics_when_inner_json_already_metered() {
        let _ = MeteredDeltaEngine::new(Arc::new(pre_metered(false, true, false)));
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "parquet_handler is already a MeteredParquetHandler")]
    fn new_panics_when_inner_parquet_already_metered() {
        let _ = MeteredDeltaEngine::new(Arc::new(pre_metered(false, false, true)));
    }
}