fusio_object_store/
fs.rs

1use std::sync::Arc;
2
3use async_stream::stream;
4use fusio::{
5    error::Error,
6    fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
7    path::Path,
8};
9use futures_core::Stream;
10use futures_util::stream::StreamExt;
11use object_store::ObjectStore;
12
13use crate::{BoxedError, S3File};
14
15pub struct S3Store<O: ObjectStore> {
16    inner: Arc<O>,
17}
18
19impl<O: ObjectStore> From<O> for S3Store<O> {
20    fn from(inner: O) -> Self {
21        Self {
22            inner: Arc::new(inner),
23        }
24    }
25}
26
27impl<O: ObjectStore> Fs for S3Store<O> {
28    type File = S3File<O>;
29
30    fn file_system(&self) -> FileSystemTag {
31        FileSystemTag::S3
32    }
33
34    async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
35        if !options.truncate {
36            return Err(Error::Unsupported {
37                message: "append mode is not supported in Amazon S3".into(),
38            });
39        }
40        Ok(S3File {
41            inner: self.inner.clone(),
42            path: path.clone().into(),
43            buf: None,
44        })
45    }
46
47    async fn create_dir_all(_: &Path) -> Result<(), Error> {
48        Ok(())
49    }
50
51    async fn list(
52        &self,
53        path: &Path,
54    ) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> {
55        let path = path.clone().into();
56        let mut stream = self.inner.list(Some(&path));
57
58        Ok(stream! {
59            while let Some(meta) = stream.next().await.transpose().map_err(|err| Error::Remote(BoxedError::from(err)))? {
60                yield Ok(FileMeta { path: meta.location.into(), size: meta.size });
61            }
62        })
63    }
64
65    async fn remove(&self, path: &Path) -> Result<(), Error> {
66        let path = path.clone().into();
67        self.inner
68            .delete(&path)
69            .await
70            .map_err(|err| Error::Remote(BoxedError::from(err)))?;
71
72        Ok(())
73    }
74
75    async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
76        let from = from.clone().into();
77        let to = to.clone().into();
78
79        self.inner
80            .copy(&from, &to)
81            .await
82            .map_err(|err| Error::Remote(BoxedError::from(err)))?;
83
84        Ok(())
85    }
86
87    async fn link(&self, _: &Path, _: &Path) -> Result<(), Error> {
88        Err(Error::Unsupported {
89            message: "s3 does not support link file".to_string(),
90        })
91    }
92}