vortex_io/object_store/
filesystem.rs1use std::fmt::Debug;
7use std::fmt::Formatter;
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use futures::StreamExt;
12use futures::stream::BoxStream;
13use object_store::ObjectStore;
14use object_store::path::Path;
15use vortex_error::VortexResult;
16
17use crate::VortexReadAt;
18use crate::filesystem::FileListing;
19use crate::filesystem::FileSystem;
20use crate::object_store::ObjectStoreReadAt;
21use crate::runtime::Handle;
22
23pub struct ObjectStoreFileSystem {
28 store: Arc<dyn ObjectStore>,
29 handle: Handle,
30}
31
32impl Debug for ObjectStoreFileSystem {
33 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
34 f.debug_struct("ObjectStoreFileSystem")
35 .field("store", &self.store)
36 .finish()
37 }
38}
39
40impl ObjectStoreFileSystem {
41 pub fn new(store: Arc<dyn ObjectStore>, handle: Handle) -> Self {
43 Self { store, handle }
44 }
45
46 pub fn local(handle: Handle) -> Self {
49 Self::new(
50 Arc::new(object_store::local::LocalFileSystem::new()),
51 handle,
52 )
53 }
54}
55
56#[async_trait]
57impl FileSystem for ObjectStoreFileSystem {
58 fn list(&self, prefix: &str) -> BoxStream<'_, VortexResult<FileListing>> {
59 let path = if prefix.is_empty() {
60 None
61 } else {
62 Some(Path::from(prefix))
63 };
64 self.store
65 .list(path.as_ref())
66 .map(|result| {
67 result
68 .map(|meta| FileListing {
69 path: meta.location.to_string(),
70 size: Some(meta.size),
71 })
72 .map_err(Into::into)
73 })
74 .boxed()
75 }
76
77 async fn open_read(&self, path: &str) -> VortexResult<Arc<dyn VortexReadAt>> {
78 Ok(Arc::new(ObjectStoreReadAt::new(
79 self.store.clone(),
80 path.into(),
81 self.handle.clone(),
82 )))
83 }
84}