Skip to main content

delta_kernel/metrics/
metered_engine.rs

//! [`MeteredDeltaEngine`] wraps any [`Engine`] so its `storage_handler`,
//! `json_handler`, and `parquet_handler` emit the kernel's standard handler-completion
//! tracing spans. `evaluation_handler` passes through unchanged.
//!
//! ```ignore
//! let inner: Arc<dyn Engine> = Arc::new(MyEngine::build()?);
//! let engine: Arc<dyn Engine> = Arc::new(MeteredDeltaEngine::new(inner));
//! ```
9
10use std::sync::Arc;
11
12use crate::metrics::{MeteredJsonHandler, MeteredParquetHandler, MeteredStorageHandler};
13use crate::{Engine, EvaluationHandler, JsonHandler, ParquetHandler, StorageHandler};
14
15/// Decorator over any [`Engine`] that meters its storage, JSON, and Parquet handlers.
16/// See module docs.
17pub struct MeteredDeltaEngine {
18    inner: Arc<dyn Engine>,
19    storage: Arc<dyn StorageHandler>,
20    json: Arc<dyn JsonHandler>,
21    parquet: Arc<dyn ParquetHandler>,
22}
23
24impl MeteredDeltaEngine {
25    /// Wrap `inner`. Debug-asserts that none of `inner`'s handlers are already metered
26    /// wrappers, so the resulting engine emits each span exactly once.
27    ///
28    /// The check is shallow: it inspects the immediate concrete type returned by each
29    /// handler accessor and does not walk intermediate wrapper types. Wrapping a metered
30    /// handler behind a non-metered wrapper before re-wrapping (e.g.
31    /// `Metered(Foo(Metered(...)))`) silently double-counts.
32    pub fn new(inner: Arc<dyn Engine>) -> Self {
33        let inner_storage = inner.storage_handler();
34        debug_assert!(
35            !inner_storage.any_ref().is::<MeteredStorageHandler>(),
36            "MeteredDeltaEngine wraps an engine whose storage_handler is already a \
37             MeteredStorageHandler; remove the outer wrap to avoid double-counting metrics",
38        );
39        let inner_json = inner.json_handler();
40        debug_assert!(
41            !inner_json.any_ref().is::<MeteredJsonHandler>(),
42            "MeteredDeltaEngine wraps an engine whose json_handler is already a \
43             MeteredJsonHandler; remove the outer wrap to avoid double-counting metrics",
44        );
45        let inner_parquet = inner.parquet_handler();
46        debug_assert!(
47            !inner_parquet.any_ref().is::<MeteredParquetHandler>(),
48            "MeteredDeltaEngine wraps an engine whose parquet_handler is already a \
49             MeteredParquetHandler; remove the outer wrap to avoid double-counting metrics",
50        );
51        let storage = Arc::new(MeteredStorageHandler::new(inner_storage));
52        let json = Arc::new(MeteredJsonHandler::new(inner_json));
53        let parquet = Arc::new(MeteredParquetHandler::new(inner_parquet));
54        Self {
55            inner,
56            storage,
57            json,
58            parquet,
59        }
60    }
61}
62
63impl std::fmt::Debug for MeteredDeltaEngine {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        f.debug_struct("MeteredDeltaEngine").finish_non_exhaustive()
66    }
67}
68
69impl Engine for MeteredDeltaEngine {
70    fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
71        self.inner.evaluation_handler()
72    }
73
74    fn storage_handler(&self) -> Arc<dyn StorageHandler> {
75        Arc::clone(&self.storage)
76    }
77
78    fn json_handler(&self) -> Arc<dyn JsonHandler> {
79        Arc::clone(&self.json)
80    }
81
82    fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
83        Arc::clone(&self.parquet)
84    }
85}
86
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;
    use url::Url;

    use super::*;
    use crate::engine::sync::SyncEngine;
    use crate::metrics::MetricEvent;
    use crate::utils::test_utils::{install_thread_local_metrics_reporter, CapturingReporter};
    use crate::{DeltaResult, FileMeta, FileSlice};

    /// Minimal storage handler used in tests: `list_from` yields the preconfigured
    /// `list_results`; the other operations are no-ops or unexercised.
    #[derive(Debug)]
    struct StubStorageHandler {
        list_results: Vec<FileMeta>,
    }

    impl StorageHandler for StubStorageHandler {
        fn list_from(
            &self,
            _path: &Url,
        ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>> {
            let results: Vec<_> = self.list_results.iter().cloned().map(Ok).collect();
            Ok(Box::new(results.into_iter()))
        }
        fn read_files(
            &self,
            _files: Vec<FileSlice>,
        ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>> {
            Ok(Box::new(std::iter::empty()))
        }
        fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> {
            Ok(())
        }
        fn put(&self, _path: &Url, _data: Bytes, _overwrite: bool) -> DeltaResult<()> {
            Ok(())
        }
        fn head(&self, _path: &Url) -> DeltaResult<FileMeta> {
            unreachable!("not exercised")
        }
    }

    /// Engine that delegates everything to a [`SyncEngine`] except `storage_handler`, which
    /// returns a [`StubStorageHandler`] configured to yield two list entries.
    struct StubEngine {
        sync: Arc<SyncEngine>,
        storage: Arc<dyn StorageHandler>,
    }

    impl StubEngine {
        fn new() -> Self {
            Self {
                sync: Arc::new(SyncEngine::new()),
                storage: Arc::new(StubStorageHandler {
                    list_results: vec![
                        FileMeta {
                            location: Url::parse("memory:///_delta_log/00000000000000000000.json")
                                .unwrap(),
                            last_modified: 0,
                            size: 0,
                        },
                        FileMeta {
                            location: Url::parse("memory:///_delta_log/00000000000000000001.json")
                                .unwrap(),
                            last_modified: 0,
                            size: 0,
                        },
                    ],
                }),
            }
        }
    }

    impl Engine for StubEngine {
        fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
            self.sync.evaluation_handler()
        }
        fn storage_handler(&self) -> Arc<dyn StorageHandler> {
            Arc::clone(&self.storage)
        }
        fn json_handler(&self) -> Arc<dyn JsonHandler> {
            self.sync.json_handler()
        }
        fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
            self.sync.parquet_handler()
        }
    }

    /// Listing through the wrapped storage handler must report a
    /// `StorageListCompleted` event counting both stubbed files.
    #[test]
    fn storage_handler_emits_metered_spans() {
        let reporter = Arc::new(CapturingReporter::default());
        let _guard = install_thread_local_metrics_reporter(Arc::clone(&reporter) as _);

        let engine = MeteredDeltaEngine::new(Arc::new(StubEngine::new()));
        let url = Url::parse("memory:///_delta_log/").unwrap();
        let iter = engine.storage_handler().list_from(&url).unwrap();
        // Drain the listing before inspecting the captured events.
        let _: Vec<_> = iter.collect();

        let listed = reporter
            .events()
            .iter()
            .find(|e| matches!(e, MetricEvent::StorageListCompleted(_)))
            .cloned()
            .expect("expected StorageListCompleted via metering wrapper");
        let MetricEvent::StorageListCompleted(e) = listed else {
            unreachable!();
        };
        assert_eq!(e.num_files, 2);
    }

    /// The evaluation handler must be the very same `Arc` the inner engine returns.
    #[test]
    fn evaluation_handler_passes_through() {
        let inner: Arc<dyn Engine> = Arc::new(StubEngine::new());
        let inner_eval = inner.evaluation_handler();

        let engine = MeteredDeltaEngine::new(inner);
        assert!(Arc::ptr_eq(&inner_eval, &engine.evaluation_handler()));
    }

    /// The JSON and Parquet accessors must return the metered wrapper types.
    #[test]
    fn json_and_parquet_handlers_are_metered() {
        let engine = MeteredDeltaEngine::new(Arc::new(StubEngine::new()));
        assert!(engine.json_handler().any_ref().is::<MeteredJsonHandler>());
        assert!(engine
            .parquet_handler()
            .any_ref()
            .is::<MeteredParquetHandler>());
    }

    // The double-wrap guards in `MeteredDeltaEngine::new` are `debug_assert!`s, which are
    // compiled out when `debug_assertions` is off (e.g. `cargo test --release`). The
    // `should_panic` tests below, and the fixtures only they use, are therefore gated on
    // `debug_assertions` so they neither fail spuriously nor leave dead code behind.

    /// Engine that pre-meters a single handler; lets each double-wrap test target one
    /// guard without tripping the others.
    #[cfg(debug_assertions)]
    struct PreMeteredEngine {
        inner: StubEngine,
        meter_storage: bool,
        meter_json: bool,
        meter_parquet: bool,
    }

    #[cfg(debug_assertions)]
    impl Engine for PreMeteredEngine {
        fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
            self.inner.evaluation_handler()
        }
        fn storage_handler(&self) -> Arc<dyn StorageHandler> {
            if self.meter_storage {
                Arc::new(MeteredStorageHandler::new(self.inner.storage_handler()))
            } else {
                self.inner.storage_handler()
            }
        }
        fn json_handler(&self) -> Arc<dyn JsonHandler> {
            if self.meter_json {
                Arc::new(MeteredJsonHandler::new(self.inner.json_handler()))
            } else {
                self.inner.json_handler()
            }
        }
        fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
            if self.meter_parquet {
                Arc::new(MeteredParquetHandler::new(self.inner.parquet_handler()))
            } else {
                self.inner.parquet_handler()
            }
        }
    }

    /// Builds a [`PreMeteredEngine`] with the selected handlers already metered.
    #[cfg(debug_assertions)]
    fn pre_metered(storage: bool, json: bool, parquet: bool) -> PreMeteredEngine {
        PreMeteredEngine {
            inner: StubEngine::new(),
            meter_storage: storage,
            meter_json: json,
            meter_parquet: parquet,
        }
    }

    #[cfg(debug_assertions)]
    #[test]
    #[should_panic(expected = "storage_handler is already a MeteredStorageHandler")]
    fn new_panics_when_inner_storage_already_metered() {
        let _ = MeteredDeltaEngine::new(Arc::new(pre_metered(true, false, false)));
    }

    #[cfg(debug_assertions)]
    #[test]
    #[should_panic(expected = "json_handler is already a MeteredJsonHandler")]
    fn new_panics_when_inner_json_already_metered() {
        let _ = MeteredDeltaEngine::new(Arc::new(pre_metered(false, true, false)));
    }

    #[cfg(debug_assertions)]
    #[test]
    #[should_panic(expected = "parquet_handler is already a MeteredParquetHandler")]
    fn new_panics_when_inner_parquet_already_metered() {
        let _ = MeteredDeltaEngine::new(Arc::new(pre_metered(false, false, true)));
    }
}