fusio_object_store/
lib.rs

1pub mod fs;
2
3use std::{ops::Range, sync::Arc};
4
5use fusio::{error::Error, IoBuf, IoBufMut, Read, Write};
6use futures_util::lock::Mutex;
7use object_store::{buffered::BufWriter, path::Path, GetOptions, GetRange, ObjectStore};
8use parquet::arrow::async_writer::{AsyncFileWriter, ParquetObjectWriter};
9
10pub type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
11
12pub struct S3File<O: ObjectStore> {
13    inner: Arc<O>,
14    path: Path,
15    buf: Option<Arc<Mutex<ParquetObjectWriter>>>,
16}
17
18impl<O: ObjectStore> S3File<O> {
19    async fn read_with_range<B: IoBufMut>(
20        &mut self,
21        range: GetRange,
22        mut buf: B,
23    ) -> (Result<(), Error>, B) {
24        let opts = GetOptions {
25            range: Some(range),
26            ..Default::default()
27        };
28        let result = match self
29            .inner
30            .get_opts(&self.path, opts)
31            .await
32            .map_err(BoxedError::from)
33        {
34            Ok(result) => result,
35            Err(e) => return (Err(Error::Remote(e)), buf),
36        };
37
38        let bytes = match result.bytes().await.map_err(BoxedError::from) {
39            Ok(bytes) => bytes,
40            Err(e) => return (Err(Error::Other(e)), buf),
41        };
42
43        buf.as_slice_mut().copy_from_slice(&bytes);
44        (Ok(()), buf)
45    }
46}
47
48impl<O: ObjectStore> Read for S3File<O> {
49    async fn read_exact_at<B: IoBufMut>(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) {
50        let range = GetRange::Bounded(Range {
51            start: pos,
52            end: pos + buf.bytes_init() as u64,
53        });
54
55        self.read_with_range(range, buf).await
56    }
57
58    async fn read_to_end_at(&mut self, buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
59        let range = GetRange::Offset(pos);
60
61        let (result, buf) = self.read_with_range(range, buf).await;
62        match result {
63            Ok(_) => (Ok(()), buf),
64            Err(e) => (Err(e), buf),
65        }
66    }
67
68    async fn size(&self) -> Result<u64, Error> {
69        let options = GetOptions {
70            head: true,
71            ..Default::default()
72        };
73        let response = self
74            .inner
75            .get_opts(&self.path, options)
76            .await
77            .map_err(|err| Error::Remote(err.into()))?;
78        Ok(response.meta.size)
79    }
80}
81
82impl<O: ObjectStore> Write for S3File<O> {
83    async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
84        let buf_writer = match self.buf {
85            Some(ref mut buf) => buf,
86            None => {
87                self.buf = Some(Arc::new(Mutex::new(
88                    BufWriter::new(self.inner.clone(), self.path.clone()).into(),
89                )));
90                self.buf.as_mut().unwrap()
91            }
92        };
93        if let Err(e) = buf_writer.lock().await.write(buf.as_bytes()).await {
94            return (Err(Error::Other(e.into())), buf);
95        }
96
97        (Ok(()), buf)
98    }
99
100    async fn flush(&mut self) -> Result<(), Error> {
101        Ok(())
102    }
103
104    async fn close(&mut self) -> Result<(), Error> {
105        if let Some(buf) = self.buf.take() {
106            buf.lock()
107                .await
108                .complete()
109                .await
110                .map_err(|e| Error::Other(e.into()))?;
111        }
112        Ok(())
113    }
114}
115
116#[cfg(test)]
117mod tests {
118
119    #[tokio::test]
120    async fn test_s3() {
121        use std::{env, env::VarError, sync::Arc};
122
123        use bytes::Bytes;
124        use object_store::{aws::AmazonS3Builder, ObjectStore};
125
126        use crate::{Read, S3File, Write};
127
128        let fn_env = || {
129            let region = env::var("TEST_INTEGRATION")?;
130            let bucket_name = env::var("TEST_INTEGRATION")?;
131            let access_key_id = env::var("TEST_INTEGRATION")?;
132            let secret_access_key = env::var("TEST_INTEGRATION")?;
133
134            Ok::<(String, String, String, String), VarError>((
135                region,
136                bucket_name,
137                access_key_id,
138                secret_access_key,
139            ))
140        };
141        if let Ok((region, bucket_name, access_key_id, secret_access_key)) = fn_env() {
142            let path = object_store::path::Path::parse("/test_file").unwrap();
143            let s3 = AmazonS3Builder::new()
144                .with_region(region)
145                .with_bucket_name(bucket_name)
146                .with_access_key_id(access_key_id)
147                .with_secret_access_key(secret_access_key)
148                .build()
149                .unwrap();
150            let _ = s3.delete(&path).await;
151
152            let mut store = S3File {
153                inner: Arc::new(s3),
154                path,
155                buf: None,
156            };
157            let (result, bytes) = store.write_all(Bytes::from("hello! Fusio!")).await;
158            result.unwrap();
159
160            let mut buf = vec![0_u8; bytes.len()];
161            let (result, buf) = store.read_exact_at(&mut buf[..], 0).await;
162            result.unwrap();
163            assert_eq!(buf, &bytes[..]);
164        }
165    }
166}