fusio_object_store/
fs.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use std::sync::Arc;

use async_stream::stream;
use fusio::{
    fs::{FileMeta, Fs, OpenOptions},
    path::Path,
    Error,
};
use futures_core::Stream;
use futures_util::stream::StreamExt;
use object_store::ObjectStore;

use crate::{BoxedError, S3File};

/// [`Fs`] adapter backed by an [`ObjectStore`] implementation.
///
/// The store is kept behind an [`Arc`]; `open_options` clones that handle
/// into every file it returns, so all files share the same store.
pub struct S3Store<O>
where
    O: ObjectStore,
{
    /// Shared handle to the wrapped object store.
    inner: Arc<O>,
}

impl<O> From<O> for S3Store<O>
where
    O: ObjectStore,
{
    /// Takes ownership of `inner`, places it in a fresh [`Arc`], and builds
    /// the store around that handle.
    fn from(inner: O) -> Self {
        let inner = Arc::new(inner);
        Self { inner }
    }
}

impl<O: ObjectStore> Fs for S3Store<O> {
    type File = S3File<O>;

    async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
        if !options.truncate {
            return Err(Error::Unsupported {
                message: "append mode is not supported in Amazon S3".into(),
            });
        }
        Ok(S3File {
            inner: self.inner.clone(),
            path: path.clone().into(),
            buf: None,
        })
    }

    async fn create_dir_all(_: &Path) -> Result<(), Error> {
        Ok(())
    }

    async fn list(
        &self,
        path: &Path,
    ) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> {
        let path = path.clone().into();
        let mut stream = self.inner.list(Some(&path));

        Ok(stream! {
            while let Some(meta) = stream.next().await.transpose().map_err(BoxedError::from)? {
                yield Ok(FileMeta { path: meta.location.into(), size: meta.size as u64 });
            }
        })
    }

    async fn remove(&self, path: &Path) -> Result<(), Error> {
        let path = path.clone().into();
        self.inner.delete(&path).await.map_err(BoxedError::from)?;

        Ok(())
    }
}