use std::sync::Arc;
use crate::metrics::{MeteredJsonHandler, MeteredParquetHandler, MeteredStorageHandler};
use crate::{Engine, EvaluationHandler, JsonHandler, ParquetHandler, StorageHandler};
pub struct MeteredDeltaEngine {
inner: Arc<dyn Engine>,
storage: Arc<dyn StorageHandler>,
json: Arc<dyn JsonHandler>,
parquet: Arc<dyn ParquetHandler>,
}
impl MeteredDeltaEngine {
pub fn new(inner: Arc<dyn Engine>) -> Self {
let inner_storage = inner.storage_handler();
debug_assert!(
!inner_storage.any_ref().is::<MeteredStorageHandler>(),
"MeteredDeltaEngine wraps an engine whose storage_handler is already a \
MeteredStorageHandler; remove the outer wrap to avoid double-counting metrics",
);
let inner_json = inner.json_handler();
debug_assert!(
!inner_json.any_ref().is::<MeteredJsonHandler>(),
"MeteredDeltaEngine wraps an engine whose json_handler is already a \
MeteredJsonHandler; remove the outer wrap to avoid double-counting metrics",
);
let inner_parquet = inner.parquet_handler();
debug_assert!(
!inner_parquet.any_ref().is::<MeteredParquetHandler>(),
"MeteredDeltaEngine wraps an engine whose parquet_handler is already a \
MeteredParquetHandler; remove the outer wrap to avoid double-counting metrics",
);
let storage = Arc::new(MeteredStorageHandler::new(inner_storage));
let json = Arc::new(MeteredJsonHandler::new(inner_json));
let parquet = Arc::new(MeteredParquetHandler::new(inner_parquet));
Self {
inner,
storage,
json,
parquet,
}
}
}
impl std::fmt::Debug for MeteredDeltaEngine {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MeteredDeltaEngine").finish_non_exhaustive()
}
}
impl Engine for MeteredDeltaEngine {
fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
self.inner.evaluation_handler()
}
fn storage_handler(&self) -> Arc<dyn StorageHandler> {
Arc::clone(&self.storage)
}
fn json_handler(&self) -> Arc<dyn JsonHandler> {
Arc::clone(&self.json)
}
fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
Arc::clone(&self.parquet)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use bytes::Bytes;
use url::Url;
use super::*;
use crate::engine::sync::SyncEngine;
use crate::metrics::MetricEvent;
use crate::utils::test_utils::{install_thread_local_metrics_reporter, CapturingReporter};
use crate::{DeltaResult, FileMeta, FileSlice};
#[derive(Debug)]
struct StubStorageHandler {
list_results: Vec<FileMeta>,
}
impl StorageHandler for StubStorageHandler {
fn list_from(
&self,
_path: &Url,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>> {
let results: Vec<_> = self.list_results.iter().cloned().map(Ok).collect();
Ok(Box::new(results.into_iter()))
}
fn read_files(
&self,
_files: Vec<FileSlice>,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>> {
Ok(Box::new(std::iter::empty()))
}
fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> {
Ok(())
}
fn put(&self, _path: &Url, _data: Bytes, _overwrite: bool) -> DeltaResult<()> {
Ok(())
}
fn head(&self, _path: &Url) -> DeltaResult<FileMeta> {
unreachable!("not exercised")
}
}
struct StubEngine {
sync: Arc<SyncEngine>,
storage: Arc<dyn StorageHandler>,
}
impl StubEngine {
fn new() -> Self {
Self {
sync: Arc::new(SyncEngine::new()),
storage: Arc::new(StubStorageHandler {
list_results: vec![
FileMeta {
location: Url::parse("memory:///_delta_log/00000000000000000000.json")
.unwrap(),
last_modified: 0,
size: 0,
},
FileMeta {
location: Url::parse("memory:///_delta_log/00000000000000000001.json")
.unwrap(),
last_modified: 0,
size: 0,
},
],
}),
}
}
}
impl Engine for StubEngine {
fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
self.sync.evaluation_handler()
}
fn storage_handler(&self) -> Arc<dyn StorageHandler> {
Arc::clone(&self.storage)
}
fn json_handler(&self) -> Arc<dyn JsonHandler> {
self.sync.json_handler()
}
fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
self.sync.parquet_handler()
}
}
#[test]
fn storage_handler_emits_metered_spans() {
let reporter = Arc::new(CapturingReporter::default());
let _guard = install_thread_local_metrics_reporter(Arc::clone(&reporter) as _);
let engine = MeteredDeltaEngine::new(Arc::new(StubEngine::new()));
let url = Url::parse("memory:///_delta_log/").unwrap();
let iter = engine.storage_handler().list_from(&url).unwrap();
let _: Vec<_> = iter.collect();
let listed = reporter
.events()
.iter()
.find(|e| matches!(e, MetricEvent::StorageListCompleted(_)))
.cloned()
.expect("expected StorageListCompleted via metering wrapper");
let MetricEvent::StorageListCompleted(e) = listed else {
unreachable!();
};
assert_eq!(e.num_files, 2);
}
#[test]
fn evaluation_handler_passes_through() {
let inner: Arc<dyn Engine> = Arc::new(StubEngine::new());
let inner_eval = inner.evaluation_handler();
let engine = MeteredDeltaEngine::new(inner);
assert!(Arc::ptr_eq(&inner_eval, &engine.evaluation_handler()));
}
#[test]
fn json_and_parquet_handlers_are_metered() {
let engine = MeteredDeltaEngine::new(Arc::new(StubEngine::new()));
assert!(engine.json_handler().any_ref().is::<MeteredJsonHandler>());
assert!(engine
.parquet_handler()
.any_ref()
.is::<MeteredParquetHandler>());
}
struct PreMeteredEngine {
inner: StubEngine,
meter_storage: bool,
meter_json: bool,
meter_parquet: bool,
}
impl Engine for PreMeteredEngine {
fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
self.inner.evaluation_handler()
}
fn storage_handler(&self) -> Arc<dyn StorageHandler> {
if self.meter_storage {
Arc::new(MeteredStorageHandler::new(self.inner.storage_handler()))
} else {
self.inner.storage_handler()
}
}
fn json_handler(&self) -> Arc<dyn JsonHandler> {
if self.meter_json {
Arc::new(MeteredJsonHandler::new(self.inner.json_handler()))
} else {
self.inner.json_handler()
}
}
fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
if self.meter_parquet {
Arc::new(MeteredParquetHandler::new(self.inner.parquet_handler()))
} else {
self.inner.parquet_handler()
}
}
}
fn pre_metered(storage: bool, json: bool, parquet: bool) -> PreMeteredEngine {
PreMeteredEngine {
inner: StubEngine::new(),
meter_storage: storage,
meter_json: json,
meter_parquet: parquet,
}
}
#[test]
#[should_panic(expected = "storage_handler is already a MeteredStorageHandler")]
fn new_panics_when_inner_storage_already_metered() {
let _ = MeteredDeltaEngine::new(Arc::new(pre_metered(true, false, false)));
}
#[test]
#[should_panic(expected = "json_handler is already a MeteredJsonHandler")]
fn new_panics_when_inner_json_already_metered() {
let _ = MeteredDeltaEngine::new(Arc::new(pre_metered(false, true, false)));
}
#[test]
#[should_panic(expected = "parquet_handler is already a MeteredParquetHandler")]
fn new_panics_when_inner_parquet_already_metered() {
let _ = MeteredDeltaEngine::new(Arc::new(pre_metered(false, false, true)));
}
}