vortex_io/object_store/
filesystem.rs1use std::fmt::Debug;
7use std::fmt::Formatter;
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use futures::StreamExt;
12use futures::stream::BoxStream;
13use object_store::ObjectStore;
14use object_store::ObjectStoreExt;
15use object_store::path::Path;
16use vortex_error::VortexResult;
17use vortex_error::vortex_err;
18
19use crate::VortexReadAt;
20use crate::filesystem::FileListing;
21use crate::filesystem::FileSystem;
22use crate::object_store::ObjectStoreReadAt;
23use crate::runtime::Handle;
24
25pub struct ObjectStoreFileSystem {
30 store: Arc<dyn ObjectStore>,
31 handle: Handle,
32}
33
34impl Debug for ObjectStoreFileSystem {
35 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
36 f.debug_struct("ObjectStoreFileSystem")
37 .field("store", &self.store)
38 .finish()
39 }
40}
41
42impl ObjectStoreFileSystem {
43 pub fn new(store: Arc<dyn ObjectStore>, handle: Handle) -> Self {
45 Self { store, handle }
46 }
47
48 pub fn local(handle: Handle) -> Self {
51 Self::new(
52 Arc::new(object_store::local::LocalFileSystem::new()),
53 handle,
54 )
55 }
56}
57
58#[async_trait]
59impl FileSystem for ObjectStoreFileSystem {
60 fn list(&self, prefix: &str) -> BoxStream<'_, VortexResult<FileListing>> {
61 let path = if prefix.is_empty() {
62 None
63 } else {
64 Some(Path::from(prefix))
65 };
66 self.store
67 .list(path.as_ref())
68 .map(|result| {
69 result
70 .map(|meta| FileListing {
71 path: meta.location.to_string(),
72 size: Some(meta.size),
73 })
74 .map_err(Into::into)
75 })
76 .boxed()
77 }
78
79 async fn open_read(&self, path: &str) -> VortexResult<Arc<dyn VortexReadAt>> {
80 Ok(Arc::new(ObjectStoreReadAt::new(
81 Arc::clone(&self.store),
82 path.into(),
83 self.handle.clone(),
84 )))
85 }
86
87 async fn delete(&self, path: &str) -> VortexResult<()> {
88 self.store
89 .delete(
90 &Path::from_url_path(path)
91 .map_err(|_| vortex_err!("invalid path for url {path}"))?,
92 )
93 .await?;
94 Ok(())
95 }
96}