vortex_io/object_store/
filesystem.rs1use std::fmt::Debug;
7use std::fmt::Formatter;
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use futures::StreamExt;
12use futures::stream::BoxStream;
13use object_store::ObjectStore;
14use object_store::ObjectStoreExt;
15use object_store::path::Path;
16use vortex_error::VortexResult;
17use vortex_error::vortex_err;
18
19use crate::VortexReadAt;
20use crate::filesystem::FileListing;
21use crate::filesystem::FileSystem;
22use crate::object_store::ObjectStoreReadAt;
23use crate::runtime::Handle;
24
25pub struct ObjectStoreFileSystem {
30 store: Arc<dyn ObjectStore>,
31 handle: Handle,
32}
33
34impl Debug for ObjectStoreFileSystem {
35 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
36 f.debug_struct("ObjectStoreFileSystem")
37 .field("store", &self.store)
38 .finish()
39 }
40}
41
42impl ObjectStoreFileSystem {
43 pub fn new(store: Arc<dyn ObjectStore>, handle: Handle) -> Self {
45 Self { store, handle }
46 }
47
48 pub fn local(handle: Handle) -> Self {
51 Self::new(
52 Arc::new(object_store::local::LocalFileSystem::new()),
53 handle,
54 )
55 }
56}
57
58fn to_object_path(path: &str) -> Path {
70 Path::parse(path).unwrap_or_else(|_| Path::from(path))
71}
72
73fn listing_from_meta(location: &Path, size: u64) -> FileListing {
74 FileListing {
75 path: location.to_string(),
76 size: Some(size),
77 }
78}
79
80#[async_trait]
81impl FileSystem for ObjectStoreFileSystem {
82 fn list(&self, prefix: &str) -> BoxStream<'_, VortexResult<FileListing>> {
83 let path = if prefix.is_empty() {
84 None
85 } else {
86 Some(to_object_path(prefix))
87 };
88 self.store
89 .list(path.as_ref())
90 .map(|result| {
91 result
92 .map(|meta| listing_from_meta(&meta.location, meta.size))
93 .map_err(Into::into)
94 })
95 .boxed()
96 }
97
98 async fn head(&self, path: &str) -> VortexResult<Option<FileListing>> {
99 match self.store.head(&to_object_path(path)).await {
102 Ok(meta) => Ok(Some(listing_from_meta(&meta.location, meta.size))),
103 Err(object_store::Error::NotFound { .. }) => Ok(None),
104 Err(e) => Err(e.into()),
105 }
106 }
107
108 async fn open_read(&self, path: &str) -> VortexResult<Arc<dyn VortexReadAt>> {
109 Ok(Arc::new(ObjectStoreReadAt::new(
110 Arc::clone(&self.store),
111 to_object_path(path),
112 self.handle.clone(),
113 )))
114 }
115
116 async fn delete(&self, path: &str) -> VortexResult<()> {
117 self.store
118 .delete(
119 &Path::from_url_path(path)
120 .map_err(|_| vortex_err!("invalid path for url {path}"))?,
121 )
122 .await?;
123 Ok(())
124 }
125}
126
#[cfg(test)]
#[cfg(feature = "tokio")]
mod tests {
    use futures::TryStreamExt;
    use object_store::ObjectStoreExt;
    use object_store::local::LocalFileSystem;
    use object_store::memory::InMemory;
    use rstest::rstest;

    use super::*;
    use crate::filesystem::FileSystem;
    use crate::runtime::Handle;

    // Builds an `ObjectStoreFileSystem` over a fresh `InMemory` store pre-populated with
    // `files`. Each entry is `(path, size)`; the object body is `size` zero bytes. Keys are
    // written through `to_object_path`, the same mapping the filesystem methods use.
    async fn memory_fs(files: &[(&str, usize)]) -> VortexResult<ObjectStoreFileSystem> {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        for &(path, size) in files {
            store
                .put(&to_object_path(path), vec![0u8; size].into())
                .await?;
        }
        // Must be called from inside a tokio runtime (every caller is a #[tokio::test]).
        let handle = Handle::find().expect("tokio runtime available within #[tokio::test]");
        Ok(ObjectStoreFileSystem::new(store, handle))
    }

    #[tokio::test]
    // A wildcard-free glob naming an existing object yields exactly that object,
    // with its path and size.
    async fn test_glob_exact_existing_path() -> VortexResult<()> {
        let fs = memory_fs(&[("data/file.vortex", 1024)]).await?;
        let fs_dyn: &dyn FileSystem = &fs;
        let results: Vec<FileListing> = fs_dyn.glob("data/file.vortex")?.try_collect().await?;
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].path, "data/file.vortex");
        assert_eq!(results[0].size, Some(1024));
        Ok(())
    }

    #[tokio::test]
    // A wildcard-free glob naming a missing object yields an empty result, not an error.
    async fn test_glob_exact_missing_path_is_empty() -> VortexResult<()> {
        let fs = memory_fs(&[("data/other.vortex", 1)]).await?;
        let fs_dyn: &dyn FileSystem = &fs;
        let results: Vec<FileListing> = fs_dyn.glob("data/missing.vortex")?.try_collect().await?;
        assert!(results.is_empty());
        Ok(())
    }

    #[tokio::test]
    // An exact glob must not also return objects whose names merely start with the same
    // string (`foo.vortex.backup` vs `foo.vortex`).
    async fn test_glob_exact_path_ignores_prefix_siblings() -> VortexResult<()> {
        let fs = memory_fs(&[("foo.vortex", 10), ("foo.vortex.backup", 20)]).await?;
        let fs_dyn: &dyn FileSystem = &fs;
        let results: Vec<FileListing> = fs_dyn.glob("foo.vortex")?.try_collect().await?;
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].path, "foo.vortex");
        assert_eq!(results[0].size, Some(10));
        Ok(())
    }

    #[tokio::test]
    // `head` and `open_read` must address the object stored under the literal path string,
    // for names containing characters that are special in URLs or object keys. `head` must
    // also report the path back unchanged.
    #[rstest]
    #[case::tilde("dir/a~b.vortex")]
    #[case::percent("dir/a%20b.vortex")]
    #[case::brackets("dir/a[1].vortex")]
    #[case::hash("dir/a#b.vortex")]
    #[case::braces("dir/a{x}.vortex")]
    #[case::caret("dir/a^b.vortex")]
    #[case::backslash_tilde("dir/a\\~b.vortex")]
    #[case::space("dir/a b.vortex")]
    async fn test_head_open_read_round_trip_special_chars(#[case] path: &str) -> VortexResult<()> {
        let fs = memory_fs(&[(path, 5)]).await?;
        assert_eq!(
            fs.head(path).await?,
            Some(FileListing {
                path: path.to_string(),
                size: Some(5),
            })
        );
        assert_eq!(fs.open_read(path).await?.size().await?, 5);
        Ok(())
    }

    #[tokio::test]
    // For the same kind of special-character names, checks that:
    // - an exact glob returns exactly the stored listing;
    // - a wildcard glob over the directory includes it;
    // - `open_read` on the same path sees the right size.
    #[rstest]
    #[case::tilde("dir/a~b.vortex")]
    #[case::percent("dir/a%20b.vortex")]
    #[case::hash("dir/a#b.vortex")]
    #[case::backslash_tilde("dir/a\\~b.vortex")]
    #[case::space("dir/a b.vortex")]
    #[case::plain("dir/plain.vortex")]
    async fn test_glob_round_trip_special_chars(#[case] path: &str) -> VortexResult<()> {
        let fs = memory_fs(&[(path, 5)]).await?;
        let fs_dyn: &dyn FileSystem = &fs;
        let expected = FileListing {
            path: path.to_string(),
            size: Some(5),
        };

        // Exact (wildcard-free) glob.
        let exact: Vec<FileListing> = fs_dyn.glob(path)?.try_collect().await?;
        assert_eq!(exact, vec![expected.clone()]);

        // Wildcard glob over the containing directory.
        let wild: Vec<FileListing> = fs_dyn.glob("dir/*.vortex")?.try_collect().await?;
        assert!(
            wild.contains(&expected),
            "wildcard glob should list {path:?}, got {wild:?}"
        );

        // The listed path is directly readable.
        assert_eq!(fs.open_read(path).await?.size().await?, 5);
        Ok(())
    }

    #[tokio::test]
    // Same round trip against a real `LocalFileSystem` rooted at a temporary directory,
    // for an on-disk file name containing `~`.
    async fn test_local_filesystem_special_char_round_trip() -> anyhow::Result<()> {
        let dir = tempfile::tempdir()?;
        std::fs::write(dir.path().join("a~b.vortex"), [0u8; 5])?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?) as Arc<dyn ObjectStore>;
        let fs = ObjectStoreFileSystem::new(store, Handle::find().expect("tokio runtime"));
        let fs_dyn: &dyn FileSystem = &fs;
        let expected = FileListing {
            path: "a~b.vortex".to_string(),
            size: Some(5),
        };

        let wild: Vec<FileListing> = fs_dyn.glob("*.vortex")?.try_collect().await?;
        assert!(wild.contains(&expected), "wildcard glob got {wild:?}");

        let exact: Vec<FileListing> = fs_dyn.glob("a~b.vortex")?.try_collect().await?;
        assert_eq!(exact, vec![expected]);

        assert_eq!(fs.open_read("a~b.vortex").await?.size().await?, 5);
        Ok(())
    }

    #[tokio::test]
    // `head` returns the listing for an existing object and `Ok(None)` (not an error)
    // for a missing one.
    async fn test_head_existing_and_missing() -> VortexResult<()> {
        let fs = memory_fs(&[("a/b.vortex", 7)]).await?;
        assert_eq!(
            fs.head("a/b.vortex").await?,
            Some(FileListing {
                path: "a/b.vortex".to_string(),
                size: Some(7),
            })
        );
        assert_eq!(fs.head("a/missing.vortex").await?, None);
        Ok(())
    }
}