use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use async_trait::async_trait;
use futures::StreamExt;
use futures::stream::BoxStream;
use object_store::ObjectStore;
use object_store::ObjectStoreExt;
use object_store::path::Path;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use crate::VortexReadAt;
use crate::filesystem::FileListing;
use crate::filesystem::FileSystem;
use crate::object_store::ObjectStoreReadAt;
use crate::runtime::Handle;
/// A [`FileSystem`] implementation that forwards every operation to an
/// [`ObjectStore`].
pub struct ObjectStoreFileSystem {
/// The object store that all list/head/read/delete calls are issued against.
store: Arc<dyn ObjectStore>,
/// Runtime handle; a clone is passed to each `ObjectStoreReadAt` built by
/// `open_read`.
handle: Handle,
}
impl Debug for ObjectStoreFileSystem {
    /// Writes the struct name followed by the `store` field only; `handle` is
    /// not included in the output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("ObjectStoreFileSystem");
        repr.field("store", &self.store);
        repr.finish()
    }
}
impl ObjectStoreFileSystem {
    /// Builds a file system from an already-constructed object store and the
    /// runtime handle to hand to readers.
    pub fn new(store: Arc<dyn ObjectStore>, handle: Handle) -> Self {
        Self { store, handle }
    }

    /// Builds a file system on top of a freshly created
    /// `object_store::local::LocalFileSystem` (constructed with
    /// `LocalFileSystem::new()`, i.e. without a prefix argument).
    pub fn local(handle: Handle) -> Self {
        let local_store: Arc<dyn ObjectStore> =
            Arc::new(object_store::local::LocalFileSystem::new());
        Self::new(local_store, handle)
    }
}
/// Turns a string path into an object store [`Path`].
///
/// `Path::parse` is attempted first; if it rejects the input, the path is
/// built with `Path::from` instead, so this function never fails.
fn to_object_path(path: &str) -> Path {
    match Path::parse(path) {
        Ok(parsed) => parsed,
        Err(_) => Path::from(path),
    }
}
/// Builds a [`FileListing`] from an object's location and byte size.
///
/// The listing path is the `to_string()` rendering of `location`, and the
/// size is always reported as `Some(size)`.
fn listing_from_meta(location: &Path, size: u64) -> FileListing {
    let path = location.to_string();
    FileListing {
        path,
        size: Some(size),
    }
}
#[async_trait]
impl FileSystem for ObjectStoreFileSystem {
    /// Streams one [`FileListing`] per object the store reports under `prefix`.
    ///
    /// An empty `prefix` is passed to the store as `None`; any other prefix is
    /// converted with [`to_object_path`]. Store errors are converted into the
    /// stream's error type item by item.
    fn list(&self, prefix: &str) -> BoxStream<'_, VortexResult<FileListing>> {
        let path = if prefix.is_empty() {
            None
        } else {
            Some(to_object_path(prefix))
        };
        self.store
            .list(path.as_ref())
            .map(|result| {
                result
                    .map(|meta| listing_from_meta(&meta.location, meta.size))
                    .map_err(Into::into)
            })
            .boxed()
    }

    /// Looks up the object at `path`.
    ///
    /// Returns `Ok(Some(listing))` when the store has the object and
    /// `Ok(None)` when the store reports `NotFound`.
    ///
    /// # Errors
    ///
    /// Any store error other than `NotFound` is propagated.
    async fn head(&self, path: &str) -> VortexResult<Option<FileListing>> {
        match self.store.head(&to_object_path(path)).await {
            Ok(meta) => Ok(Some(listing_from_meta(&meta.location, meta.size))),
            Err(object_store::Error::NotFound { .. }) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    /// Creates a reader for the object at `path`.
    ///
    /// This only constructs an `ObjectStoreReadAt` from a clone of the store,
    /// the converted path and a clone of the runtime handle; nothing is
    /// awaited here.
    async fn open_read(&self, path: &str) -> VortexResult<Arc<dyn VortexReadAt>> {
        Ok(Arc::new(ObjectStoreReadAt::new(
            Arc::clone(&self.store),
            to_object_path(path),
            self.handle.clone(),
        )))
    }

    /// Deletes the object at `path`.
    ///
    /// The path is parsed verbatim with `Path::parse` — the same parser
    /// [`to_object_path`] applies for `head`, `open_read` and `list` — and is
    /// NOT percent-decoded. Decoding here would make a key such as
    /// `dir/a%20b.vortex` resolve to `dir/a b.vortex`, i.e. a different object
    /// from the one the other operations address for the same string.
    ///
    /// # Errors
    ///
    /// Returns an error if `Path::parse` rejects `path` (a destructive call is
    /// refused rather than retried with a re-encoded path), or if the store's
    /// delete call fails.
    async fn delete(&self, path: &str) -> VortexResult<()> {
        let location =
            Path::parse(path).map_err(|_| vortex_err!("invalid path for url {path}"))?;
        self.store.delete(&location).await?;
        Ok(())
    }
}
#[cfg(test)]
#[cfg(feature = "tokio")]
mod tests {
use futures::TryStreamExt;
use object_store::ObjectStoreExt;
use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
use rstest::rstest;
use super::*;
use crate::filesystem::FileSystem;
use crate::runtime::Handle;
async fn memory_fs(files: &[(&str, usize)]) -> VortexResult<ObjectStoreFileSystem> {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
for &(path, size) in files {
store
.put(&to_object_path(path), vec![0u8; size].into())
.await?;
}
let handle = Handle::find().expect("tokio runtime available within #[tokio::test]");
Ok(ObjectStoreFileSystem::new(store, handle))
}
#[tokio::test]
async fn test_glob_exact_existing_path() -> VortexResult<()> {
let fs = memory_fs(&[("data/file.vortex", 1024)]).await?;
let fs_dyn: &dyn FileSystem = &fs;
let results: Vec<FileListing> = fs_dyn.glob("data/file.vortex")?.try_collect().await?;
assert_eq!(results.len(), 1);
assert_eq!(results[0].path, "data/file.vortex");
assert_eq!(results[0].size, Some(1024));
Ok(())
}
#[tokio::test]
async fn test_glob_exact_missing_path_is_empty() -> VortexResult<()> {
let fs = memory_fs(&[("data/other.vortex", 1)]).await?;
let fs_dyn: &dyn FileSystem = &fs;
let results: Vec<FileListing> = fs_dyn.glob("data/missing.vortex")?.try_collect().await?;
assert!(results.is_empty());
Ok(())
}
#[tokio::test]
async fn test_glob_exact_path_ignores_prefix_siblings() -> VortexResult<()> {
let fs = memory_fs(&[("foo.vortex", 10), ("foo.vortex.backup", 20)]).await?;
let fs_dyn: &dyn FileSystem = &fs;
let results: Vec<FileListing> = fs_dyn.glob("foo.vortex")?.try_collect().await?;
assert_eq!(results.len(), 1);
assert_eq!(results[0].path, "foo.vortex");
assert_eq!(results[0].size, Some(10));
Ok(())
}
#[tokio::test]
#[rstest]
#[case::tilde("dir/a~b.vortex")]
#[case::percent("dir/a%20b.vortex")]
#[case::brackets("dir/a[1].vortex")]
#[case::hash("dir/a#b.vortex")]
#[case::braces("dir/a{x}.vortex")]
#[case::caret("dir/a^b.vortex")]
#[case::backslash_tilde("dir/a\\~b.vortex")]
#[case::space("dir/a b.vortex")]
async fn test_head_open_read_round_trip_special_chars(#[case] path: &str) -> VortexResult<()> {
let fs = memory_fs(&[(path, 5)]).await?;
assert_eq!(
fs.head(path).await?,
Some(FileListing {
path: path.to_string(),
size: Some(5),
})
);
assert_eq!(fs.open_read(path).await?.size().await?, 5);
Ok(())
}
#[tokio::test]
#[rstest]
#[case::tilde("dir/a~b.vortex")]
#[case::percent("dir/a%20b.vortex")]
#[case::hash("dir/a#b.vortex")]
#[case::backslash_tilde("dir/a\\~b.vortex")]
#[case::space("dir/a b.vortex")]
#[case::plain("dir/plain.vortex")]
async fn test_glob_round_trip_special_chars(#[case] path: &str) -> VortexResult<()> {
let fs = memory_fs(&[(path, 5)]).await?;
let fs_dyn: &dyn FileSystem = &fs;
let expected = FileListing {
path: path.to_string(),
size: Some(5),
};
let exact: Vec<FileListing> = fs_dyn.glob(path)?.try_collect().await?;
assert_eq!(exact, vec![expected.clone()]);
let wild: Vec<FileListing> = fs_dyn.glob("dir/*.vortex")?.try_collect().await?;
assert!(
wild.contains(&expected),
"wildcard glob should list {path:?}, got {wild:?}"
);
assert_eq!(fs.open_read(path).await?.size().await?, 5);
Ok(())
}
#[tokio::test]
async fn test_local_filesystem_special_char_round_trip() -> anyhow::Result<()> {
let dir = tempfile::tempdir()?;
std::fs::write(dir.path().join("a~b.vortex"), [0u8; 5])?;
let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?) as Arc<dyn ObjectStore>;
let fs = ObjectStoreFileSystem::new(store, Handle::find().expect("tokio runtime"));
let fs_dyn: &dyn FileSystem = &fs;
let expected = FileListing {
path: "a~b.vortex".to_string(),
size: Some(5),
};
let wild: Vec<FileListing> = fs_dyn.glob("*.vortex")?.try_collect().await?;
assert!(wild.contains(&expected), "wildcard glob got {wild:?}");
let exact: Vec<FileListing> = fs_dyn.glob("a~b.vortex")?.try_collect().await?;
assert_eq!(exact, vec![expected]);
assert_eq!(fs.open_read("a~b.vortex").await?.size().await?, 5);
Ok(())
}
#[tokio::test]
async fn test_head_existing_and_missing() -> VortexResult<()> {
let fs = memory_fs(&[("a/b.vortex", 7)]).await?;
assert_eq!(
fs.head("a/b.vortex").await?,
Some(FileListing {
path: "a/b.vortex".to_string(),
size: Some(7),
})
);
assert_eq!(fs.head("a/missing.vortex").await?, None);
Ok(())
}
}