use std::sync::Arc;
use bytes::Bytes;
use futures::StreamExt as _;
use url::Url;
use super::{put_bytes, resolve_scope};
use crate::object_store::path::Path;
use crate::object_store::DynObjectStore;
#[allow(unused_imports)]
use crate::object_store::ObjectStoreExt as _;
use crate::{DeltaResult, Error, FileMeta, FileSlice, StorageHandler};
/// A [`StorageHandler`] that performs every storage operation synchronously,
/// blocking the calling thread until the underlying object-store future
/// completes.
pub(crate) struct SyncStorageHandler {
    /// Optional object store forwarded to the parent module's `resolve_scope`
    /// and `put_bytes` helpers on every call. The tests in this file pass
    /// `None` when working with local `file://` URLs and `Some(..)` for an
    /// in-memory store; how `None` is resolved is decided by those helpers.
    store: Option<Arc<DynObjectStore>>,
}

impl SyncStorageHandler {
    /// Creates a handler that forwards `store` (possibly `None`) to the
    /// storage helpers for each operation.
    pub(crate) fn new(store: Option<Arc<DynObjectStore>>) -> Self {
        SyncStorageHandler { store }
    }
}
impl StorageHandler for SyncStorageHandler {
fn list_from(
&self,
url_path: &Url,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>> {
let (store, base_url, offset) = resolve_scope(self.store.as_ref(), url_path)?;
let prefix = if url_path.path().ends_with('/') {
offset.clone()
} else {
let mut parts: Vec<_> = offset.parts().collect();
parts.pop();
Path::from_iter(parts)
};
let mut metas: Vec<_> = futures::executor::block_on(
store
.list_with_offset(Some(&prefix), &offset)
.collect::<Vec<_>>(),
)
.into_iter()
.collect::<Result<_, _>>()?;
metas.sort_unstable_by(|a, b| a.location.cmp(&b.location));
let iter = metas.into_iter().map(move |meta| {
let location = base_url
.join(meta.location.as_ref())
.map_err(|e| Error::generic(format!("Failed to construct URL: {e}")))?;
Ok(FileMeta {
location,
last_modified: meta.last_modified.timestamp_millis(),
size: meta.size,
})
});
Ok(Box::new(iter))
}
/// Reads the requested slices, blocking until every read has finished.
///
/// Each element of `files` is a URL plus an optional byte range. When a
/// range is given only those bytes are fetched; when it is `None` the whole
/// object is read.
///
/// All reads are performed eagerly, one after another, before this function
/// returns. The returned iterator yields one result per input slice, in
/// input order; a failure for one slice is reported in that slice's item and
/// does not prevent the remaining slices from being read.
///
/// # Errors
///
/// The outer `Result` is always `Ok`. Individual items are `Err` when
/// `resolve_scope` rejects the URL or the store read fails.
fn read_files(
    &self,
    files: Vec<FileSlice>,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>> {
    let results: Vec<DeltaResult<Bytes>> = files
        .into_iter()
        .map(|(url, range_opt)| {
            let (s, _, path) = resolve_scope(self.store.as_ref(), &url)?;
            match range_opt {
                // Honor the caller's byte range instead of returning the
                // entire object.
                Some(range) => Ok(futures::executor::block_on(s.get_range(&path, range))?),
                None => {
                    let get_result = futures::executor::block_on(s.get(&path))?;
                    Ok(futures::executor::block_on(get_result.bytes())?)
                }
            }
        })
        .collect();
    Ok(Box::new(results.into_iter()))
}
/// Writes `data` to `path` by delegating to the parent module's `put_bytes`
/// helper with this handler's store.
///
/// As exercised by the tests in this file against an in-memory store:
/// with `overwrite == false` a write to an existing location fails with
/// `Error::FileAlreadyExists`, while `overwrite == true` replaces the
/// existing contents.
fn put(&self, path: &Url, data: Bytes, overwrite: bool) -> DeltaResult<()> {
    let store = self.store.as_ref();
    put_bytes(store, path, data, overwrite)
}
/// Atomic copy is not supported by the synchronous handler.
///
/// # Panics
///
/// Always panics: this method is an intentional stub.
fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> {
    unimplemented!("SyncStorageHandler does not implement copy");
}
/// Fetches metadata for the single object at `url`, blocking until the
/// store responds.
///
/// The returned `FileMeta` carries the caller's `url` unchanged as its
/// location, the modification time in milliseconds, and the object size.
///
/// # Errors
///
/// Fails if `resolve_scope` rejects the URL or the store's `head` call
/// fails; the tests in this file show a missing local file surfacing as
/// `Error::FileNotFound`.
fn head(&self, url: &Url) -> DeltaResult<FileMeta> {
    let (store, _, path) = resolve_scope(self.store.as_ref(), url)?;
    let object_meta = futures::executor::block_on(store.head(&path))?;
    let last_modified = object_meta.last_modified.timestamp_millis();
    Ok(FileMeta {
        location: url.clone(),
        last_modified,
        size: object_meta.size,
    })
}
}
#[cfg(test)]
mod tests {
use std::fs::File;
use std::io::Write;
use std::time::Duration;
use bytes::{BufMut, BytesMut};
use itertools::Itertools;
use url::Url;
use super::SyncStorageHandler;
use crate::object_store::memory::InMemory;
use crate::object_store::ObjectStoreExt as _;
use crate::utils::current_time_duration;
use crate::{Error, StorageHandler};
/// Builds the file name `<index>.json`, with `index` zero-padded to a width
/// of 20 digits.
fn get_json_filename(index: usize) -> String {
    format!("{:020}.json", index)
}
/// A freshly written local file must be listed with a modification time
/// within ten seconds of the moment the test started.
#[test]
fn test_file_meta_is_correct() -> Result<(), Box<dyn std::error::Error>> {
    let storage = SyncStorageHandler::new(None);
    let tmp_dir = tempfile::tempdir().unwrap();
    // Capture the reference time before the file is created.
    let begin_time = current_time_duration()?;

    let mut f = File::create(tmp_dir.path().join(get_json_filename(1)))?;
    writeln!(f, "null")?;
    f.flush()?;

    // List from index 0, which sorts before the file written above.
    let start = tmp_dir.path().join(get_json_filename(0));
    let url = Url::from_file_path(start).unwrap();
    let files: Vec<_> = storage.list_from(&url)?.try_collect()?;
    assert!(!files.is_empty());

    let tolerance = Duration::from_secs(10);
    for meta in &files {
        let meta_time = Duration::from_millis(meta.last_modified.try_into()?);
        assert!(meta_time.abs_diff(begin_time) < tolerance);
    }
    Ok(())
}
/// Exercises `list_from` against three local files: listing from an
/// existing file, from the directory itself, and from a bare prefix.
#[test]
fn test_list_from() -> Result<(), Box<dyn std::error::Error>> {
    let storage = SyncStorageHandler::new(None);
    let tmp_dir = tempfile::tempdir().unwrap();

    let mut expected = Vec::with_capacity(3);
    for i in 0..3 {
        let path = tmp_dir.path().join(get_json_filename(i));
        let mut f = File::create(&path)?;
        writeln!(f, "null")?;
        expected.push(path);
    }

    // Listing from file 1 is exclusive of file 1 itself, so only file 2
    // (index `i + 2` for the single yielded item) may come back.
    let url = Url::from_file_path(tmp_dir.path().join(get_json_filename(1))).unwrap();
    let mut file_count = 0;
    for (i, file) in storage.list_from(&url)?.enumerate() {
        let listed = file?.location.to_file_path().unwrap();
        assert_eq!(
            listed.to_str().unwrap(),
            expected[i + 2].to_str().unwrap()
        );
        file_count += 1;
    }
    assert_eq!(file_count, 1);

    // Listing the directory returns every file in it.
    let url = Url::from_directory_path(tmp_dir.path()).unwrap();
    assert_eq!(storage.list_from(&url)?.count(), 3);

    // The bare 20-digit prefix sorts before "…1.json", so files 1 and 2
    // are both returned.
    let url = Url::from_file_path(tmp_dir.path().join(format!("{:020}", 1))).unwrap();
    assert_eq!(storage.list_from(&url)?.count(), 2);
    Ok(())
}
/// Reading a whole local file (no byte range) returns exactly its contents.
#[test]
fn test_read_files() -> Result<(), Box<dyn std::error::Error>> {
    let storage = SyncStorageHandler::new(None);
    let tmp_dir = tempfile::tempdir().unwrap();
    let path = tmp_dir.path().join(get_json_filename(1));
    let mut f = File::create(&path)?;
    writeln!(f, "null")?;
    let url = Url::from_file_path(path).unwrap();

    // Expected payload: the single line written above.
    let mut scratch = BytesMut::with_capacity(16);
    scratch.put_slice(b"null\n");
    let expected = scratch.split();

    let mut seen = 0;
    for chunk in storage.read_files(vec![(url, None)])? {
        let chunk = chunk?;
        assert_eq!(chunk, expected);
        seen += 1;
    }
    assert_eq!(seen, 1);
    Ok(())
}
/// Listing a directory URL on an explicit store also returns entries that
/// live in nested sub-directories, in sorted order.
#[tokio::test]
async fn list_from_store_is_recursive() {
    let store = std::sync::Arc::new(InMemory::new());
    for (key, body) in [("a/file1.json", "x"), ("a/sub/file2.json", "y")] {
        store
            .put(
                &crate::object_store::path::Path::from(key),
                bytes::Bytes::from(body).into(),
            )
            .await
            .unwrap();
    }

    let storage = SyncStorageHandler::new(Some(store));
    let url = Url::parse("memory:///a/").unwrap();
    let names: Vec<_> = storage
        .list_from(&url)
        .unwrap()
        .map(|entry| entry.unwrap().location.path().to_string())
        .collect();
    assert_eq!(names, vec!["/a/file1.json", "/a/sub/file2.json"]);
}
/// `head` on an existing local file echoes the URL and reports its size.
#[test]
fn head_local_returns_metadata() {
    let tmp_dir = tempfile::tempdir().unwrap();
    let file_path = tmp_dir.path().join("file.json");
    std::fs::write(&file_path, b"hello").unwrap();
    let url = Url::from_file_path(&file_path).unwrap();

    let storage = SyncStorageHandler::new(None);
    let meta = storage.head(&url).unwrap();

    assert_eq!(meta.location, url);
    // "hello" is five bytes long.
    assert_eq!(meta.size, 5);
}
/// `head` on a local path that does not exist reports `FileNotFound`.
#[test]
fn head_local_missing_file_errors() {
    let tmp_dir = tempfile::tempdir().unwrap();
    let missing = tmp_dir.path().join("missing.json");
    let url = Url::from_file_path(missing).unwrap();

    let storage = SyncStorageHandler::new(None);
    let err = storage.head(&url).unwrap_err();
    assert!(matches!(err, Error::FileNotFound(_)));
}
/// `head` on an object in an explicit in-memory store echoes the URL and
/// reports the object's size.
#[tokio::test]
async fn head_store_returns_metadata() {
    let store = std::sync::Arc::new(InMemory::new());
    let object_path = crate::object_store::path::Path::from("file.json");
    let payload = bytes::Bytes::from("hello");
    store.put(&object_path, payload.into()).await.unwrap();

    let storage = SyncStorageHandler::new(Some(store));
    let url = Url::parse("memory:///file.json").unwrap();
    let meta = storage.head(&url).unwrap();

    assert_eq!(meta.location, url);
    // "hello" is five bytes long.
    assert_eq!(meta.size, 5);
}
/// `put` creates a new object, refuses to replace it when `overwrite` is
/// false, and replaces it when `overwrite` is true.
#[tokio::test]
async fn put_store_creates_and_blocks_overwrite() {
    let store = std::sync::Arc::new(InMemory::new());
    let storage = SyncStorageHandler::new(Some(store.clone()));
    let url = Url::parse("memory:///file.json").unwrap();
    let path = crate::object_store::path::Path::from("file.json");

    // Reads the object's current contents straight from the backing store.
    let read_back = || {
        futures::executor::block_on(async {
            store.get(&path).await.unwrap().bytes().await.unwrap()
        })
    };

    // Initial write succeeds.
    storage
        .put(&url, bytes::Bytes::from("first"), false)
        .unwrap();
    let stored = read_back();
    assert_eq!(stored.as_ref(), b"first");

    // A second write without overwrite must be rejected.
    let err = storage
        .put(&url, bytes::Bytes::from("second"), false)
        .unwrap_err();
    assert!(matches!(err, Error::FileAlreadyExists(_)));

    // With overwrite enabled the contents are replaced.
    storage
        .put(&url, bytes::Bytes::from("third"), true)
        .unwrap();
    let stored = read_back();
    assert_eq!(stored.as_ref(), b"third");
}
}