1use std::sync::Arc;
2
3use async_stream::stream;
4use fusio::{
5 fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
6 path::Path,
7 Error,
8};
9use futures_core::Stream;
10use futures_util::stream::StreamExt;
11use object_store::ObjectStore;
12
13use crate::{BoxedError, S3File};
14
15pub struct S3Store<O: ObjectStore> {
16 inner: Arc<O>,
17}
18
19impl<O: ObjectStore> From<O> for S3Store<O> {
20 fn from(inner: O) -> Self {
21 Self {
22 inner: Arc::new(inner),
23 }
24 }
25}
26
27impl<O: ObjectStore> Fs for S3Store<O> {
28 type File = S3File<O>;
29
30 fn file_system(&self) -> FileSystemTag {
31 FileSystemTag::S3
32 }
33
34 async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
35 if !options.truncate {
36 return Err(Error::Unsupported {
37 message: "append mode is not supported in Amazon S3".into(),
38 });
39 }
40 Ok(S3File {
41 inner: self.inner.clone(),
42 path: path.clone().into(),
43 buf: None,
44 })
45 }
46
47 async fn create_dir_all(_: &Path) -> Result<(), Error> {
48 Ok(())
49 }
50
51 async fn list(
52 &self,
53 path: &Path,
54 ) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> {
55 let path = path.clone().into();
56 let mut stream = self.inner.list(Some(&path));
57
58 Ok(stream! {
59 while let Some(meta) = stream.next().await.transpose().map_err(BoxedError::from)? {
60 yield Ok(FileMeta { path: meta.location.into(), size: meta.size as u64 });
61 }
62 })
63 }
64
65 async fn remove(&self, path: &Path) -> Result<(), Error> {
66 let path = path.clone().into();
67 self.inner.delete(&path).await.map_err(BoxedError::from)?;
68
69 Ok(())
70 }
71
72 async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
73 let from = from.clone().into();
74 let to = to.clone().into();
75
76 self.inner
77 .copy(&from, &to)
78 .await
79 .map_err(BoxedError::from)?;
80
81 Ok(())
82 }
83
84 async fn link(&self, _: &Path, _: &Path) -> Result<(), Error> {
85 Err(Error::Unsupported {
86 message: "s3 does not support link file".to_string(),
87 })
88 }
89}