fusio_object_store/
lib.rs1pub mod fs;
2
3use std::{ops::Range, sync::Arc};
4
5use fusio::{error::Error, IoBuf, IoBufMut, Read, Write};
6use futures_util::lock::Mutex;
7use object_store::{buffered::BufWriter, path::Path, GetOptions, GetRange, ObjectStore};
8use parquet::arrow::async_writer::{AsyncFileWriter, ParquetObjectWriter};
9
10pub type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
11
12pub struct S3File<O: ObjectStore> {
13 inner: Arc<O>,
14 path: Path,
15 buf: Option<Arc<Mutex<ParquetObjectWriter>>>,
16}
17
18impl<O: ObjectStore> S3File<O> {
19 async fn read_with_range<B: IoBufMut>(
20 &mut self,
21 range: GetRange,
22 mut buf: B,
23 ) -> (Result<(), Error>, B) {
24 let opts = GetOptions {
25 range: Some(range),
26 ..Default::default()
27 };
28 let result = match self
29 .inner
30 .get_opts(&self.path, opts)
31 .await
32 .map_err(BoxedError::from)
33 {
34 Ok(result) => result,
35 Err(e) => return (Err(Error::Remote(e)), buf),
36 };
37
38 let bytes = match result.bytes().await.map_err(BoxedError::from) {
39 Ok(bytes) => bytes,
40 Err(e) => return (Err(Error::Other(e)), buf),
41 };
42
43 buf.as_slice_mut().copy_from_slice(&bytes);
44 (Ok(()), buf)
45 }
46}
47
48impl<O: ObjectStore> Read for S3File<O> {
49 async fn read_exact_at<B: IoBufMut>(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) {
50 let range = GetRange::Bounded(Range {
51 start: pos,
52 end: pos + buf.bytes_init() as u64,
53 });
54
55 self.read_with_range(range, buf).await
56 }
57
58 async fn read_to_end_at(&mut self, buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
59 let range = GetRange::Offset(pos);
60
61 let (result, buf) = self.read_with_range(range, buf).await;
62 match result {
63 Ok(_) => (Ok(()), buf),
64 Err(e) => (Err(e), buf),
65 }
66 }
67
68 async fn size(&self) -> Result<u64, Error> {
69 let options = GetOptions {
70 head: true,
71 ..Default::default()
72 };
73 let response = self
74 .inner
75 .get_opts(&self.path, options)
76 .await
77 .map_err(|err| Error::Remote(err.into()))?;
78 Ok(response.meta.size)
79 }
80}
81
82impl<O: ObjectStore> Write for S3File<O> {
83 async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
84 let buf_writer = match self.buf {
85 Some(ref mut buf) => buf,
86 None => {
87 self.buf = Some(Arc::new(Mutex::new(
88 BufWriter::new(self.inner.clone(), self.path.clone()).into(),
89 )));
90 self.buf.as_mut().unwrap()
91 }
92 };
93 if let Err(e) = buf_writer.lock().await.write(buf.as_bytes()).await {
94 return (Err(Error::Other(e.into())), buf);
95 }
96
97 (Ok(()), buf)
98 }
99
100 async fn flush(&mut self) -> Result<(), Error> {
101 Ok(())
102 }
103
104 async fn close(&mut self) -> Result<(), Error> {
105 if let Some(buf) = self.buf.take() {
106 buf.lock()
107 .await
108 .complete()
109 .await
110 .map_err(|e| Error::Other(e.into()))?;
111 }
112 Ok(())
113 }
114}
115
116#[cfg(test)]
117mod tests {
118
119 #[tokio::test]
120 async fn test_s3() {
121 use std::{env, env::VarError, sync::Arc};
122
123 use bytes::Bytes;
124 use object_store::{aws::AmazonS3Builder, ObjectStore};
125
126 use crate::{Read, S3File, Write};
127
128 let fn_env = || {
129 let region = env::var("TEST_INTEGRATION")?;
130 let bucket_name = env::var("TEST_INTEGRATION")?;
131 let access_key_id = env::var("TEST_INTEGRATION")?;
132 let secret_access_key = env::var("TEST_INTEGRATION")?;
133
134 Ok::<(String, String, String, String), VarError>((
135 region,
136 bucket_name,
137 access_key_id,
138 secret_access_key,
139 ))
140 };
141 if let Ok((region, bucket_name, access_key_id, secret_access_key)) = fn_env() {
142 let path = object_store::path::Path::parse("/test_file").unwrap();
143 let s3 = AmazonS3Builder::new()
144 .with_region(region)
145 .with_bucket_name(bucket_name)
146 .with_access_key_id(access_key_id)
147 .with_secret_access_key(secret_access_key)
148 .build()
149 .unwrap();
150 let _ = s3.delete(&path).await;
151
152 let mut store = S3File {
153 inner: Arc::new(s3),
154 path,
155 buf: None,
156 };
157 let (result, bytes) = store.write_all(Bytes::from("hello! Fusio!")).await;
158 result.unwrap();
159
160 let mut buf = vec![0_u8; bytes.len()];
161 let (result, buf) = store.read_exact_at(&mut buf[..], 0).await;
162 result.unwrap();
163 assert_eq!(buf, &bytes[..]);
164 }
165 }
166}