use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use bytes::Bytes;
use delta_kernel_derive::internal_api;
use futures::stream::{self, BoxStream, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use url::Url;
use super::UrlExt;
use crate::engine::default::executor::TaskExecutor;
use crate::metrics::{MetricEvent, MetricsReporter};
use crate::object_store::path::Path;
use crate::object_store::{self, DynObjectStore, ObjectStoreExt as _, PutMode};
use crate::{DeltaResult, Error, FileMeta, FileSlice, StorageHandler};
/// Stream adapter that counts successful items (and, for byte streams, total
/// bytes) flowing out of `inner`, then reports a single [`MetricEvent`] when
/// the stream is exhausted or the adapter is dropped.
struct MetricsIterator<I, T> {
    // The wrapped stream producing `DeltaResult` items.
    inner: I,
    // Taken (set to `None`) when metrics are emitted, so the event fires at most once.
    reporter: Option<Arc<dyn MetricsReporter>>,
    // When the overall operation began; used to compute the reported duration.
    start: Instant,
    // Count of successfully yielded items.
    num_files: u64,
    // Total bytes yielded; only updated by the `Bytes` stream impl.
    bytes_read: u64,
    // Builds the concrete event from (elapsed, num_files, bytes_read).
    event_fn: fn(Duration, u64, u64) -> MetricEvent,
    // `T` only selects which `Stream` impl applies; no `T` value is stored.
    _phantom: std::marker::PhantomData<T>,
}
impl<I, T> MetricsIterator<I, T> {
    /// Wraps `inner`, recording counters that will be reported to `reporter`
    /// (if any) via `event_fn` once the stream finishes or is dropped.
    fn new(
        inner: I,
        reporter: Option<Arc<dyn MetricsReporter>>,
        start: Instant,
        event_fn: fn(Duration, u64, u64) -> MetricEvent,
    ) -> Self {
        Self {
            inner,
            reporter,
            start,
            event_fn,
            num_files: 0,
            bytes_read: 0,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Emits the metric event exactly once: taking the reporter out of the
    /// `Option` guarantees that any later call is a no-op.
    fn emit_metrics_once(&mut self) {
        let Some(reporter) = self.reporter.take() else {
            return;
        };
        let event = (self.event_fn)(self.start.elapsed(), self.num_files, self.bytes_read);
        reporter.report(event);
    }
}
impl<I, T> Drop for MetricsIterator<I, T> {
    fn drop(&mut self) {
        // Ensure metrics are still reported if the consumer abandons the
        // stream before draining it; this is a no-op if the event was already
        // sent at end-of-stream (the reporter has been taken).
        self.emit_metrics_once();
    }
}
impl<I> Stream for MetricsIterator<I, FileMeta>
where
    I: Stream<Item = DeltaResult<FileMeta>> + Unpin,
{
    type Item = I::Item;

    /// Forwards items from the inner stream, counting each successful
    /// `FileMeta`, and emits the metric event once the stream ends.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let next = futures::ready!(Pin::new(&mut self.inner).poll_next(cx));
        match next {
            Some(ref item) => {
                if item.is_ok() {
                    self.num_files += 1;
                }
            }
            None => self.emit_metrics_once(),
        }
        Poll::Ready(next)
    }
}
impl<I> Stream for MetricsIterator<I, Bytes>
where
    I: Stream<Item = DeltaResult<Bytes>> + Unpin,
{
    type Item = I::Item;

    /// Forwards items from the inner stream, tallying both successful reads
    /// and the number of bytes returned, and emits the metric event once the
    /// stream ends.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let next = futures::ready!(Pin::new(&mut self.inner).poll_next(cx));
        match next {
            Some(Ok(ref bytes)) => {
                self.num_files += 1;
                self.bytes_read += bytes.len() as u64;
            }
            Some(Err(_)) => {}
            None => self.emit_metrics_once(),
        }
        Poll::Ready(next)
    }
}
/// [`StorageHandler`] implementation backed by an [`object_store`] store.
/// The blocking trait methods delegate their async work to `task_executor`.
#[derive(Debug)]
pub struct ObjectStoreStorageHandler<E: TaskExecutor> {
    // The underlying object store used for all storage operations.
    inner: Arc<DynObjectStore>,
    // Executor used to run the async implementations from the sync trait methods.
    task_executor: Arc<E>,
    // Optional sink for storage metric events (list/read/copy timings).
    reporter: Option<Arc<dyn MetricsReporter>>,
    // Number of file reads kept in flight concurrently by `read_files`.
    readahead: usize,
}
impl<E: TaskExecutor> ObjectStoreStorageHandler<E> {
    /// Creates a handler over `store`, running async work on `task_executor`
    /// and optionally reporting metrics to `reporter`.
    #[internal_api]
    pub(crate) fn new(
        store: Arc<DynObjectStore>,
        task_executor: Arc<E>,
        reporter: Option<Arc<dyn MetricsReporter>>,
    ) -> Self {
        // Default read concurrency; tune via `with_readahead`.
        let readahead = 10;
        Self {
            inner: store,
            task_executor,
            reporter,
            readahead,
        }
    }

    /// Sets the maximum number of read requests `read_files` keeps in flight.
    pub fn with_readahead(self, readahead: usize) -> Self {
        Self { readahead, ..self }
    }
}
/// Lists files under `path`'s parent directory starting after `path` itself,
/// yielding [`FileMeta`] entries in sorted (lexical) order. Reports a
/// `StorageListCompleted` metric when the listing finishes.
async fn list_from_impl(
    store: Arc<DynObjectStore>,
    path: Url,
    reporter: Option<Arc<dyn MetricsReporter>>,
) -> DeltaResult<BoxStream<'static, DeltaResult<FileMeta>>> {
    let start = Instant::now();
    // `offset` is the exclusive lower bound passed to `list_with_offset`.
    let offset = Path::from_url_path(path.path())?;
    let prefix = if path.path().ends_with('/') {
        // `path` names a directory: list everything under it.
        offset.clone()
    } else {
        // `path` is "<dir>/<start-name>": list under <dir>, starting after
        // <start-name>. Dropping the final segment yields the directory.
        let mut parts = offset.parts().collect_vec();
        if parts.pop().is_none() {
            return Err(Error::Generic(format!(
                "Offset path must not be a root directory. Got: '{path}'",
            )));
        }
        Path::from_iter(parts)
    };
    let has_ordered_listing = supports_ordered_listing(&path);
    let stream = store
        .list_with_offset(Some(&prefix), &offset)
        .map(move |meta| {
            let meta = meta?;
            // Rebuild a full URL for each entry by grafting the object key
            // onto the original URL's scheme/authority.
            let mut location = path.clone();
            location.set_path(&format!("/{}", meta.location.as_ref()));
            Ok(FileMeta {
                location,
                last_modified: meta.last_modified.timestamp_millis(),
                size: meta.size,
            })
        });
    if !has_ordered_listing {
        // Backend does not guarantee lexically-ordered listings (e.g. local
        // filesystem): buffer everything, sort, and report metrics eagerly.
        let mut items: Vec<_> = stream.try_collect().await?;
        items.sort_unstable();
        if let Some(r) = reporter {
            r.report(MetricEvent::StorageListCompleted {
                duration: start.elapsed(),
                num_files: items.len() as u64,
            });
        }
        Ok(Box::pin(stream::iter(items.into_iter().map(Ok))))
    } else {
        // Ordered backend: stream lazily and let the metrics adapter report
        // when the stream is drained or dropped.
        let stream = MetricsIterator::new(
            stream,
            reporter,
            start,
            |duration, num_files, _bytes_read| MetricEvent::StorageListCompleted {
                duration,
                num_files,
            },
        );
        Ok(Box::pin(stream))
    }
}
/// Reads the given file slices, yielding one `Bytes` buffer per input slice
/// in input order, with up to `readahead` requests in flight concurrently.
/// Reports a `StorageReadCompleted` metric when the stream finishes.
async fn read_files_impl(
    store: Arc<DynObjectStore>,
    files: Vec<FileSlice>,
    readahead: usize,
    reporter: Option<Arc<dyn MetricsReporter>>,
) -> DeltaResult<BoxStream<'static, DeltaResult<Bytes>>> {
    let start = Instant::now();
    let files = stream::iter(files).map(move |(url, range)| {
        let store = store.clone();
        async move {
            let path = if url.scheme() == "file" {
                // `file://` URLs need percent-decoding and OS-specific path
                // handling, which `to_file_path` performs.
                let file_path = url
                    .to_file_path()
                    .map_err(|_| Error::InvalidTableLocation(format!("Invalid file URL: {url}")))?;
                Path::from_absolute_path(file_path)
                    .map_err(|e| Error::InvalidTableLocation(format!("Invalid file path: {e}")))?
            } else {
                Path::from(url.path())
            };
            if url.is_presigned() {
                // Presigned URLs are fetched directly over HTTP; note `path`
                // is unused on this branch.
                Ok::<bytes::Bytes, Error>(reqwest::get(url).await?.bytes().await?)
            } else if let Some(rng) = range {
                // A byte range was requested: fetch only that slice.
                Ok(store.get_range(&path, rng).await?)
            } else {
                // No range: fetch the whole object.
                let result = store.get(&path).await?;
                Ok(result.bytes().await?)
            }
        }
    });
    // `buffered` preserves input order while running up to `readahead`
    // requests concurrently.
    Ok(Box::pin(MetricsIterator::new(
        files.buffered(readahead),
        reporter,
        start,
        |duration, num_files, bytes_read| MetricEvent::StorageReadCompleted {
            duration,
            num_files,
            bytes_read,
        },
    )))
}
async fn copy_atomic_impl(
store: Arc<DynObjectStore>,
src_path: Path,
dest_path: Path,
reporter: Option<Arc<dyn MetricsReporter>>,
) -> DeltaResult<()> {
let start = Instant::now();
let data = store.get(&src_path).await?.bytes().await?;
let result = store
.put_opts(&dest_path, data.into(), PutMode::Create.into())
.await;
if let Some(r) = reporter {
r.report(MetricEvent::StorageCopyCompleted {
duration: start.elapsed(),
});
}
result.map_err(|e| match e {
object_store::Error::AlreadyExists { .. } => Error::FileAlreadyExists(dest_path.into()),
e => e.into(),
})?;
Ok(())
}
async fn put_impl(
store: Arc<DynObjectStore>,
path: Path,
data: Bytes,
overwrite: bool,
) -> DeltaResult<()> {
let put_mode = if overwrite {
PutMode::Overwrite
} else {
PutMode::Create
};
let result = store.put_opts(&path, data.into(), put_mode.into()).await;
result.map_err(|e| match e {
object_store::Error::AlreadyExists { .. } => Error::FileAlreadyExists(path.into()),
e => e.into(),
})?;
Ok(())
}
/// Fetches object metadata for `url` and converts it into a [`FileMeta`].
async fn head_impl(store: Arc<DynObjectStore>, url: Url) -> DeltaResult<FileMeta> {
    let object_path = Path::from_url_path(url.path())?;
    let meta = store.head(&object_path).await?;
    Ok(FileMeta {
        location: url,
        last_modified: meta.last_modified.timestamp_millis(),
        size: meta.size,
    })
}
impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> {
    /// Lists files under `path`'s parent, starting after `path`, in sorted order.
    fn list_from(
        &self,
        path: &Url,
    ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>> {
        let fut = list_from_impl(self.inner.clone(), path.clone(), self.reporter.clone());
        let iter = super::stream_future_to_iter(self.task_executor.clone(), fut)?;
        Ok(iter)
    }

    /// Reads the requested file slices, yielding buffers in input order.
    fn read_files(
        &self,
        files: Vec<FileSlice>,
    ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>> {
        let fut = read_files_impl(
            self.inner.clone(),
            files,
            self.readahead,
            self.reporter.clone(),
        );
        let iter = super::stream_future_to_iter(self.task_executor.clone(), fut)?;
        Ok(iter)
    }

    /// Writes `data` to `path`; fails if the file exists and `overwrite` is false.
    fn put(&self, path: &Url, data: Bytes, overwrite: bool) -> DeltaResult<()> {
        let object_path = Path::from_url_path(path.path())?;
        let fut = put_impl(self.inner.clone(), object_path, data, overwrite);
        self.task_executor.block_on(fut)
    }

    /// Atomically copies `src` to `dest`; fails if `dest` already exists.
    fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()> {
        let src_path = Path::from_url_path(src.path())?;
        let dest_path = Path::from_url_path(dest.path())?;
        let fut = copy_atomic_impl(
            self.inner.clone(),
            src_path,
            dest_path,
            self.reporter.clone(),
        );
        self.task_executor.block_on(fut)
    }

    /// Returns metadata for the object at `path`.
    fn head(&self, path: &Url) -> DeltaResult<FileMeta> {
        self.task_executor
            .block_on(head_impl(self.inner.clone(), path.clone()))
    }
}
/// Returns true if the store backing `url` lists keys in lexical order.
/// Local filesystems, and buckets whose host names contain the "--x-s3" /
/// "-xa-s3" markers, do not guarantee ordered listings.
fn supports_ordered_listing(url: &Url) -> bool {
    if url.scheme() == "file" {
        return false;
    }
    match url.domain() {
        Some(domain) => !domain.contains("--x-s3") && !domain.contains("-xa-s3"),
        None => true,
    }
}
#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::time::Duration;
    use itertools::Itertools;
    use test_utils::delta_path_for_version;
    use super::*;
    use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
    use crate::engine::default::DefaultEngineBuilder;
    use crate::object_store::local::LocalFileSystem;
    use crate::object_store::memory::InMemory;
    use crate::utils::current_time_duration;
    use crate::Engine as _;

    /// Creates a temp dir plus a local-filesystem-backed store and handler.
    fn setup_test() -> (
        tempfile::TempDir,
        Arc<LocalFileSystem>,
        ObjectStoreStorageHandler<TokioBackgroundExecutor>,
    ) {
        let tmp = tempfile::tempdir().unwrap();
        let store = Arc::new(LocalFileSystem::new());
        let executor = Arc::new(TokioBackgroundExecutor::new());
        let handler = ObjectStoreStorageHandler::new(store.clone(), executor, None);
        (tmp, store, handler)
    }

    #[test]
    fn test_ordered_listing_for_url() {
        for (u, expected) in &[
            (Url::parse("file:///dev/null").unwrap(), false),
            (Url::parse("s3://robbert").unwrap(), true),
            (Url::parse("s3://robbert/likes/paths").unwrap(), true),
            (Url::parse("s3://robbie-one-zone--x-s3").unwrap(), false),
            (
                Url::parse("https://robbie-one-zone-xa-s3.us-east-2.amazonaws.biz").unwrap(),
                false,
            ),
        ] {
            assert_eq!(
                *expected,
                supports_ordered_listing(u),
                "expected {expected} on {u:?}"
            );
        }
    }

    #[tokio::test]
    async fn test_read_files() {
        let tmp = tempfile::tempdir().unwrap();
        let tmp_store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
        let data = Bytes::from("kernel-data");
        tmp_store
            .put(&Path::from("a"), data.clone().into())
            .await
            .unwrap();
        tmp_store
            .put(&Path::from("b"), data.clone().into())
            .await
            .unwrap();
        tmp_store
            .put(&Path::from("c"), data.clone().into())
            .await
            .unwrap();
        let mut url = Url::from_directory_path(tmp.path()).unwrap();
        let store = Arc::new(LocalFileSystem::new());
        let executor = Arc::new(TokioBackgroundExecutor::new());
        let storage = ObjectStoreStorageHandler::new(store, executor, None);
        // Two slices of "b" plus a middle slice of "c"; results must come
        // back in input order.
        let mut slices: Vec<FileSlice> = Vec::new();
        let mut url1 = url.clone();
        url1.set_path(&format!("{}/b", url.path()));
        slices.push((url1.clone(), Some(Range { start: 0, end: 6 })));
        slices.push((url1, Some(Range { start: 7, end: 11 })));
        url.set_path(&format!("{}/c", url.path()));
        slices.push((url, Some(Range { start: 4, end: 9 })));
        let data: Vec<Bytes> = storage.read_files(slices).unwrap().try_collect().unwrap();
        assert_eq!(data.len(), 3);
        assert_eq!(data[0], Bytes::from("kernel"));
        assert_eq!(data[1], Bytes::from("data"));
        assert_eq!(data[2], Bytes::from("el-da"));
    }

    #[tokio::test]
    async fn test_file_meta_is_correct() {
        let store = Arc::new(InMemory::new());
        let begin_time = current_time_duration().unwrap();
        let data = Bytes::from("kernel-data");
        let name = delta_path_for_version(1, "json");
        store.put(&name, data.clone().into()).await.unwrap();
        let table_root = Url::parse("memory:///").expect("valid url");
        let engine = DefaultEngineBuilder::new(store).build();
        let files: Vec<_> = engine
            .storage_handler()
            .list_from(&table_root.join("_delta_log").unwrap().join("0").unwrap())
            .unwrap()
            .try_collect()
            .unwrap();
        assert!(!files.is_empty());
        // Listed modification times should be close to when we wrote the file.
        for meta in files.into_iter() {
            let meta_time = Duration::from_millis(meta.last_modified.try_into().unwrap());
            assert!(meta_time.abs_diff(begin_time) < Duration::from_secs(10));
        }
    }

    #[tokio::test]
    async fn test_default_engine_listing() {
        let tmp = tempfile::tempdir().unwrap();
        let tmp_store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
        let data = Bytes::from("kernel-data");
        let expected_names: Vec<Path> =
            (0..10).map(|i| delta_path_for_version(i, "json")).collect();
        // Write in reverse order to verify that listing sorts the results.
        for name in expected_names.iter().rev() {
            tmp_store.put(name, data.clone().into()).await.unwrap();
        }
        let url = Url::from_directory_path(tmp.path()).unwrap();
        let store = Arc::new(LocalFileSystem::new());
        let engine = DefaultEngineBuilder::new(store).build();
        let files = engine
            .storage_handler()
            .list_from(&url.join("_delta_log").unwrap().join("0").unwrap())
            .unwrap();
        let mut len = 0;
        for (file, expected) in files.zip(expected_names.iter()) {
            assert!(
                file.as_ref()
                    .unwrap()
                    .location
                    .path()
                    .ends_with(expected.as_ref()),
                "{} does not end with {}",
                file.unwrap().location.path(),
                expected
            );
            len += 1;
        }
        assert_eq!(len, 10, "list_from should have returned 10 files");
    }

    #[tokio::test]
    async fn test_copy() {
        let (tmp, store, handler) = setup_test();
        let data = Bytes::from("test-data");
        let src_path = Path::from_absolute_path(tmp.path().join("src.txt")).unwrap();
        store.put(&src_path, data.clone().into()).await.unwrap();
        let src_url = Url::from_file_path(tmp.path().join("src.txt")).unwrap();
        let dest_url = Url::from_file_path(tmp.path().join("dest.txt")).unwrap();
        assert!(handler.copy_atomic(&src_url, &dest_url).is_ok());
        let dest_path = Path::from_absolute_path(tmp.path().join("dest.txt")).unwrap();
        assert_eq!(
            store.get(&dest_path).await.unwrap().bytes().await.unwrap(),
            data
        );
        // A second copy to the same destination must fail atomically.
        assert!(matches!(
            handler.copy_atomic(&src_url, &dest_url),
            Err(Error::FileAlreadyExists(_))
        ));
        // Copying from a missing source must fail.
        let missing_url = Url::from_file_path(tmp.path().join("missing.txt")).unwrap();
        let new_dest_url = Url::from_file_path(tmp.path().join("new_dest.txt")).unwrap();
        assert!(handler.copy_atomic(&missing_url, &new_dest_url).is_err());
    }

    #[tokio::test]
    async fn test_head() {
        let (tmp, store, handler) = setup_test();
        let data = Bytes::from("test-content");
        let file_path = Path::from_absolute_path(tmp.path().join("test.txt")).unwrap();
        let write_time = current_time_duration().unwrap();
        store.put(&file_path, data.clone().into()).await.unwrap();
        let file_url = Url::from_file_path(tmp.path().join("test.txt")).unwrap();
        let file_meta = handler.head(&file_url).unwrap();
        assert_eq!(file_meta.location, file_url);
        assert_eq!(file_meta.size, data.len() as u64);
        let meta_time = Duration::from_millis(file_meta.last_modified as u64);
        assert!(
            meta_time.abs_diff(write_time) < Duration::from_millis(100),
            "last_modified timestamp should be around {} ms, but was {} ms",
            write_time.as_millis(),
            meta_time.as_millis()
        );
    }

    #[tokio::test]
    async fn test_head_non_existent() {
        let (tmp, _store, handler) = setup_test();
        let missing_url = Url::from_file_path(tmp.path().join("missing.txt")).unwrap();
        let result = handler.head(&missing_url);
        assert!(matches!(result, Err(Error::FileNotFound(_))));
    }

    #[test]
    fn test_put() {
        let (tmp, _store, handler) = setup_test();
        let data = Bytes::from("put-test-data");
        let file_url = Url::from_file_path(tmp.path().join("put.txt")).unwrap();
        handler.put(&file_url, data.clone(), false).unwrap();
        let read_back: Vec<Bytes> = handler
            .read_files(vec![(file_url, None)])
            .unwrap()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0], data);
    }

    #[test]
    fn test_put_already_exists() {
        let (tmp, _store, handler) = setup_test();
        let data = Bytes::from("original");
        let file_url = Url::from_file_path(tmp.path().join("put.txt")).unwrap();
        handler.put(&file_url, data, false).unwrap();
        let new_data = Bytes::from("updated");
        // Non-overwrite put on an existing file must fail...
        assert!(matches!(
            handler.put(&file_url, new_data.clone(), false),
            Err(Error::FileAlreadyExists(_))
        ));
        // ...while overwrite mode replaces the contents.
        handler.put(&file_url, new_data.clone(), true).unwrap();
        let read_back: Vec<Bytes> = handler
            .read_files(vec![(file_url, None)])
            .unwrap()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0], new_data);
    }
}