Skip to main content

vortex_io/object_store/
filesystem.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! [`FileSystem`] implementation backed by an [`ObjectStore`].
5
6use std::fmt::Debug;
7use std::fmt::Formatter;
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use futures::StreamExt;
12use futures::stream::BoxStream;
13use object_store::ObjectStore;
14use object_store::ObjectStoreExt;
15use object_store::path::Path;
16use vortex_error::VortexResult;
17use vortex_error::vortex_err;
18
19use crate::VortexReadAt;
20use crate::filesystem::FileListing;
21use crate::filesystem::FileSystem;
22use crate::object_store::ObjectStoreReadAt;
23use crate::runtime::Handle;
24
25/// A [`FileSystem`] backed by an [`ObjectStore`].
26///
27// TODO(ngates): we could consider spawning a driver task inside this file system such that we can
28//  apply concurrency limits to the overall object store, rather than on a per-file basis.
29pub struct ObjectStoreFileSystem {
30    store: Arc<dyn ObjectStore>,
31    handle: Handle,
32}
33
34impl Debug for ObjectStoreFileSystem {
35    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
36        f.debug_struct("ObjectStoreFileSystem")
37            .field("store", &self.store)
38            .finish()
39    }
40}
41
42impl ObjectStoreFileSystem {
43    /// Create a new filesystem backed by the given object store and runtime handle.
44    pub fn new(store: Arc<dyn ObjectStore>, handle: Handle) -> Self {
45        Self { store, handle }
46    }
47
48    /// Create a new filesystem backed by a local file system object store and the given runtime
49    /// handle.
50    pub fn local(handle: Handle) -> Self {
51        Self::new(
52            Arc::new(object_store::local::LocalFileSystem::new()),
53            handle,
54        )
55    }
56}
57
58#[async_trait]
59impl FileSystem for ObjectStoreFileSystem {
60    fn list(&self, prefix: &str) -> BoxStream<'_, VortexResult<FileListing>> {
61        let path = if prefix.is_empty() {
62            None
63        } else {
64            Some(Path::from(prefix))
65        };
66        self.store
67            .list(path.as_ref())
68            .map(|result| {
69                result
70                    .map(|meta| FileListing {
71                        path: meta.location.to_string(),
72                        size: Some(meta.size),
73                    })
74                    .map_err(Into::into)
75            })
76            .boxed()
77    }
78
79    async fn open_read(&self, path: &str) -> VortexResult<Arc<dyn VortexReadAt>> {
80        Ok(Arc::new(ObjectStoreReadAt::new(
81            Arc::clone(&self.store),
82            path.into(),
83            self.handle.clone(),
84        )))
85    }
86
87    async fn delete(&self, path: &str) -> VortexResult<()> {
88        self.store
89            .delete(
90                &Path::from_url_path(path)
91                    .map_err(|_| vortex_err!("invalid path for url {path}"))?,
92            )
93            .await?;
94        Ok(())
95    }
96}