Skip to main content

vortex_io/object_store/
filesystem.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! [`FileSystem`] implementation backed by an [`ObjectStore`].
5
6use std::fmt::Debug;
7use std::fmt::Formatter;
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use futures::StreamExt;
12use futures::stream::BoxStream;
13use object_store::ObjectStore;
14use object_store::path::Path;
15use vortex_error::VortexResult;
16
17use crate::VortexReadAt;
18use crate::filesystem::FileListing;
19use crate::filesystem::FileSystem;
20use crate::object_store::ObjectStoreReadAt;
21use crate::runtime::Handle;
22
23/// A [`FileSystem`] backed by an [`ObjectStore`].
24///
25// TODO(ngates): we could consider spawning a driver task inside this file system such that we can
26//  apply concurrency limits to the overall object store, rather than on a per-file basis.
27pub struct ObjectStoreFileSystem {
28    store: Arc<dyn ObjectStore>,
29    handle: Handle,
30}
31
32impl Debug for ObjectStoreFileSystem {
33    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
34        f.debug_struct("ObjectStoreFileSystem")
35            .field("store", &self.store)
36            .finish()
37    }
38}
39
40impl ObjectStoreFileSystem {
41    /// Create a new filesystem backed by the given object store and runtime handle.
42    pub fn new(store: Arc<dyn ObjectStore>, handle: Handle) -> Self {
43        Self { store, handle }
44    }
45
46    /// Create a new filesystem backed by a local file system object store and the given runtime
47    /// handle.
48    pub fn local(handle: Handle) -> Self {
49        Self::new(
50            Arc::new(object_store::local::LocalFileSystem::new()),
51            handle,
52        )
53    }
54}
55
56#[async_trait]
57impl FileSystem for ObjectStoreFileSystem {
58    fn list(&self, prefix: &str) -> BoxStream<'_, VortexResult<FileListing>> {
59        let path = if prefix.is_empty() {
60            None
61        } else {
62            Some(Path::from(prefix))
63        };
64        self.store
65            .list(path.as_ref())
66            .map(|result| {
67                result
68                    .map(|meta| FileListing {
69                        path: meta.location.to_string(),
70                        size: Some(meta.size),
71                    })
72                    .map_err(Into::into)
73            })
74            .boxed()
75    }
76
77    async fn open_read(&self, path: &str) -> VortexResult<Arc<dyn VortexReadAt>> {
78        Ok(Arc::new(ObjectStoreReadAt::new(
79            self.store.clone(),
80            path.into(),
81            self.handle.clone(),
82        )))
83    }
84}