Skip to main content

vortex_io/object_store/
filesystem.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! [`FileSystem`] implementation backed by an [`ObjectStore`].
5
6use std::fmt::Debug;
7use std::fmt::Formatter;
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use futures::StreamExt;
12use futures::stream::BoxStream;
13use object_store::ObjectStore;
14use object_store::ObjectStoreExt;
15use object_store::path::Path;
16use vortex_error::VortexResult;
17use vortex_error::vortex_err;
18
19use crate::VortexReadAt;
20use crate::filesystem::FileListing;
21use crate::filesystem::FileSystem;
22use crate::object_store::ObjectStoreReadAt;
23use crate::runtime::Handle;
24
25/// A [`FileSystem`] backed by an [`ObjectStore`].
26///
27// TODO(ngates): we could consider spawning a driver task inside this file system such that we can
28//  apply concurrency limits to the overall object store, rather than on a per-file basis.
29pub struct ObjectStoreFileSystem {
30    store: Arc<dyn ObjectStore>,
31    handle: Handle,
32}
33
34impl Debug for ObjectStoreFileSystem {
35    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
36        f.debug_struct("ObjectStoreFileSystem")
37            .field("store", &self.store)
38            .finish()
39    }
40}
41
42impl ObjectStoreFileSystem {
43    /// Create a new filesystem backed by the given object store and runtime handle.
44    pub fn new(store: Arc<dyn ObjectStore>, handle: Handle) -> Self {
45        Self { store, handle }
46    }
47
48    /// Create a new filesystem backed by a local file system object store and the given runtime
49    /// handle.
50    pub fn local(handle: Handle) -> Self {
51        Self::new(
52            Arc::new(object_store::local::LocalFileSystem::new()),
53            handle,
54        )
55    }
56}
57
58/// Convert a literal filesystem path into an object-store [`Path`] (key).
59///
60/// Object stores key their objects *literally*: a file named `a~b.vortex` has the key
61/// `a~b.vortex`, and `LocalFileSystem` likewise surfaces real filenames verbatim. [`Path::parse`]
62/// preserves those characters, whereas [`Path::from`] percent-encodes `~`, `%`, `[`, `]`, `#`,
63/// etc. — turning `a~b.vortex` into the key `a%7Eb.vortex`, which no real object has. Using
64/// `parse` keeps inputs and the keys returned by [`list`](FileSystem::list) on the same literal
65/// representation, so a path from `list`/`head` round-trips back through `open_read` unchanged.
66///
67/// `parse` rejects empty, `.`, and `..` segments; for those we fall back to [`Path::from`], which
68/// normalizes them (this never applies to a key `list` produced, so it cannot break a round-trip).
69fn to_object_path(path: &str) -> Path {
70    Path::parse(path).unwrap_or_else(|_| Path::from(path))
71}
72
73fn listing_from_meta(location: &Path, size: u64) -> FileListing {
74    FileListing {
75        path: location.to_string(),
76        size: Some(size),
77    }
78}
79
80#[async_trait]
81impl FileSystem for ObjectStoreFileSystem {
82    fn list(&self, prefix: &str) -> BoxStream<'_, VortexResult<FileListing>> {
83        let path = if prefix.is_empty() {
84            None
85        } else {
86            Some(to_object_path(prefix))
87        };
88        self.store
89            .list(path.as_ref())
90            .map(|result| {
91                result
92                    .map(|meta| listing_from_meta(&meta.location, meta.size))
93                    .map_err(Into::into)
94            })
95            .boxed()
96    }
97
98    async fn head(&self, path: &str) -> VortexResult<Option<FileListing>> {
99        // `head` issues a single metadata lookup (e.g. an S3 HEAD) for the exact key, unlike
100        // `list`, which enumerates by path-segment prefix and never returns the key itself.
101        match self.store.head(&to_object_path(path)).await {
102            Ok(meta) => Ok(Some(listing_from_meta(&meta.location, meta.size))),
103            Err(object_store::Error::NotFound { .. }) => Ok(None),
104            Err(e) => Err(e.into()),
105        }
106    }
107
108    async fn open_read(&self, path: &str) -> VortexResult<Arc<dyn VortexReadAt>> {
109        Ok(Arc::new(ObjectStoreReadAt::new(
110            Arc::clone(&self.store),
111            to_object_path(path),
112            self.handle.clone(),
113        )))
114    }
115
116    async fn delete(&self, path: &str) -> VortexResult<()> {
117        self.store
118            .delete(
119                &Path::from_url_path(path)
120                    .map_err(|_| vortex_err!("invalid path for url {path}"))?,
121            )
122            .await?;
123        Ok(())
124    }
125}
126
// Runs the filesystem against real object stores, whose `list` excludes the exact-path match.
// `Handle::find` only resolves a runtime under the `tokio` feature, so gate these tests on it.
#[cfg(test)]
#[cfg(feature = "tokio")]
mod tests {
    use futures::TryStreamExt;
    use object_store::ObjectStoreExt;
    use object_store::local::LocalFileSystem;
    use object_store::memory::InMemory;
    use rstest::rstest;

    use super::*;
    use crate::filesystem::FileSystem;
    use crate::runtime::Handle;

    /// Create an [`ObjectStoreFileSystem`] on top of an in-memory store pre-populated with
    /// zero-filled objects described by `(path, size)` pairs.
    ///
    /// Objects are stored under [`to_object_path`] keys, so the store contains the same literal
    /// keys a real backend would (e.g. `a~b.vortex`, not the percent-encoded `a%7Eb.vortex`).
    async fn memory_fs(files: &[(&str, usize)]) -> VortexResult<ObjectStoreFileSystem> {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        for (path, size) in files.iter().copied() {
            let payload = vec![0u8; size];
            store.put(&to_object_path(path), payload.into()).await?;
        }
        let handle = Handle::find().expect("tokio runtime available within #[tokio::test]");
        Ok(ObjectStoreFileSystem::new(store, handle))
    }

    /// Shorthand for the listing a test expects: `path` with a known `size`.
    fn listing(path: &str, size: u64) -> FileListing {
        FileListing {
            path: path.to_string(),
            size: Some(size),
        }
    }

    /// Run `glob` through the `dyn FileSystem` interface and gather every result.
    async fn glob_all(fs: &dyn FileSystem, pattern: &str) -> VortexResult<Vec<FileListing>> {
        Ok(fs.glob(pattern)?.try_collect().await?)
    }

    /// Regression test for #6599: globbing an exact path that exists must return that one file.
    /// `ObjectStore::list` never yields the prefix itself, so nothing would come back if the
    /// exact-path branch relied on `list`.
    #[tokio::test]
    async fn test_glob_exact_existing_path() -> VortexResult<()> {
        let fs = memory_fs(&[("data/file.vortex", 1024)]).await?;
        let matches = glob_all(&fs, "data/file.vortex").await?;
        assert_eq!(matches, vec![listing("data/file.vortex", 1024)]);
        Ok(())
    }

    /// An exact path that names no object globs to an empty result rather than an error.
    #[tokio::test]
    async fn test_glob_exact_missing_path_is_empty() -> VortexResult<()> {
        let fs = memory_fs(&[("data/other.vortex", 1)]).await?;
        let matches = glob_all(&fs, "data/missing.vortex").await?;
        assert!(matches.is_empty());
        Ok(())
    }

    /// `list("foo.vortex")` would surface the prefix-sibling `foo.vortex.backup`; `head` does not.
    #[tokio::test]
    async fn test_glob_exact_path_ignores_prefix_siblings() -> VortexResult<()> {
        let fs = memory_fs(&[("foo.vortex", 10), ("foo.vortex.backup", 20)]).await?;
        let matches = glob_all(&fs, "foo.vortex").await?;
        assert_eq!(matches, vec![listing("foo.vortex", 10)]);
        Ok(())
    }

    /// Paths containing characters that `object_store` percent-encodes (`~ % [ ] # { } ^`, …)
    /// have to round-trip under one literal-path convention: `head` reports the literal key, and
    /// that very string must reopen the file via `open_read` (which is what multi-file scan does).
    /// These used to go through `Path::from`, which encoded them into a key no real object has,
    /// so the file was silently lost.
    #[tokio::test]
    #[rstest]
    #[case::tilde("dir/a~b.vortex")]
    #[case::percent("dir/a%20b.vortex")]
    #[case::brackets("dir/a[1].vortex")]
    #[case::hash("dir/a#b.vortex")]
    #[case::braces("dir/a{x}.vortex")]
    #[case::caret("dir/a^b.vortex")]
    #[case::backslash_tilde("dir/a\\~b.vortex")]
    #[case::space("dir/a b.vortex")]
    async fn test_head_open_read_round_trip_special_chars(#[case] path: &str) -> VortexResult<()> {
        let fs = memory_fs(&[(path, 5)]).await?;

        // `head` reports the literal key, i.e. exactly what the caller passed in.
        let found = fs.head(path).await?;
        assert_eq!(found, Some(listing(path, 5)));

        // The same literal path reopens the file (`size()` issues a `head` under the hood).
        let reader = fs.open_read(path).await?;
        assert_eq!(reader.size().await?, 5);
        Ok(())
    }

    /// Both glob branches, run over paths with encoded—but not glob-metacharacter—characters,
    /// return the literal path, and that path must reopen the file. (`*`, `?`, `[` are left out:
    /// they would make the input a pattern instead of an exact path.)
    #[tokio::test]
    #[rstest]
    #[case::tilde("dir/a~b.vortex")]
    #[case::percent("dir/a%20b.vortex")]
    #[case::hash("dir/a#b.vortex")]
    #[case::backslash_tilde("dir/a\\~b.vortex")]
    #[case::space("dir/a b.vortex")]
    #[case::plain("dir/plain.vortex")]
    async fn test_glob_round_trip_special_chars(#[case] path: &str) -> VortexResult<()> {
        let fs = memory_fs(&[(path, 5)]).await?;
        let expected = listing(path, 5);

        // Exact-path branch: the one result is the literal path.
        let exact = glob_all(&fs, path).await?;
        assert_eq!(exact, [expected.clone()]);

        // Wildcard branch: the directory listing carries the same literal (not encoded) path.
        let wild = glob_all(&fs, "dir/*.vortex").await?;
        assert!(
            wild.contains(&expected),
            "wildcard glob should list {path:?}, got {wild:?}"
        );

        // And the path that came back opens the file again.
        let reader = fs.open_read(path).await?;
        assert_eq!(reader.size().await?, 5);
        Ok(())
    }

    /// Same round-trip, this time on the real local-filesystem backend: a `~` in an on-disk
    /// filename has to be addressed literally (`Path::from` would percent-encode it and miss the
    /// file). Shows that the literal-path convention holds across backends, not only for the
    /// in-memory store.
    #[tokio::test]
    async fn test_local_filesystem_special_char_round_trip() -> anyhow::Result<()> {
        let file_name = "a~b.vortex";
        let dir = tempfile::tempdir()?;
        std::fs::write(dir.path().join(file_name), [0u8; 5])?;

        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let fs = ObjectStoreFileSystem::new(store, Handle::find().expect("tokio runtime"));
        let expected = listing(file_name, 5);

        let wild = glob_all(&fs, "*.vortex").await?;
        assert!(wild.contains(&expected), "wildcard glob got {wild:?}");

        let exact = glob_all(&fs, file_name).await?;
        assert_eq!(exact, vec![expected]);

        let reader = fs.open_read(file_name).await?;
        assert_eq!(reader.size().await?, 5);
        Ok(())
    }

    /// `head` yields the listing for a stored object and `None` for a key that is absent.
    #[tokio::test]
    async fn test_head_existing_and_missing() -> VortexResult<()> {
        let fs = memory_fs(&[("a/b.vortex", 7)]).await?;

        let present = fs.head("a/b.vortex").await?;
        assert_eq!(present, Some(listing("a/b.vortex", 7)));

        let absent = fs.head("a/missing.vortex").await?;
        assert_eq!(absent, None);
        Ok(())
    }
}
292        );
293        assert_eq!(fs.head("a/missing.vortex").await?, None);
294        Ok(())
295    }
296}