fusio-object-store 0.4.0

the object_store integration of Fusio.
Documentation
pub mod fs;

use std::{ops::Range, sync::Arc};

use fusio::{error::Error, IoBuf, IoBufMut, Read, Write};
use futures_util::lock::Mutex;
use object_store::{buffered::BufWriter, path::Path, GetOptions, GetRange, ObjectStore};
use parquet::arrow::async_writer::{AsyncFileWriter, ParquetObjectWriter};

pub type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;

pub struct S3File<O: ObjectStore> {
    inner: Arc<O>,
    path: Path,
    buf: Option<Arc<Mutex<ParquetObjectWriter>>>,
}

impl<O: ObjectStore> S3File<O> {
    async fn read_with_range<B: IoBufMut>(
        &mut self,
        range: GetRange,
        mut buf: B,
    ) -> (Result<(), Error>, B) {
        let opts = GetOptions {
            range: Some(range),
            ..Default::default()
        };
        let result = match self
            .inner
            .get_opts(&self.path, opts)
            .await
            .map_err(BoxedError::from)
        {
            Ok(result) => result,
            Err(e) => return (Err(Error::Remote(e.into())), buf),
        };

        let bytes = match result.bytes().await.map_err(BoxedError::from) {
            Ok(bytes) => bytes,
            Err(e) => return (Err(Error::Other(e.into())), buf),
        };

        buf.as_slice_mut().copy_from_slice(&bytes);
        (Ok(()), buf)
    }
}

impl<O: ObjectStore> Read for S3File<O> {
    async fn read_exact_at<B: IoBufMut>(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) {
        let range = GetRange::Bounded(Range {
            start: pos,
            end: pos + buf.bytes_init() as u64,
        });

        self.read_with_range(range, buf).await
    }

    async fn read_to_end_at(&mut self, buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
        let range = GetRange::Offset(pos);

        let (result, buf) = self.read_with_range(range, buf).await;
        match result {
            Ok(_) => (Ok(()), buf),
            Err(e) => (Err(e), buf),
        }
    }

    async fn size(&self) -> Result<u64, Error> {
        let options = GetOptions {
            head: true,
            ..Default::default()
        };
        let response = self
            .inner
            .get_opts(&self.path, options)
            .await
            .map_err(|err| Error::Remote(err.into()))?;
        Ok(response.meta.size as u64)
    }
}

impl<O: ObjectStore> Write for S3File<O> {
    async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
        let buf_writer = match self.buf {
            Some(ref mut buf) => buf,
            None => {
                self.buf = Some(Arc::new(Mutex::new(
                    BufWriter::new(self.inner.clone(), self.path.clone()).into(),
                )));
                self.buf.as_mut().unwrap()
            }
        };
        if let Err(e) = buf_writer.lock().await.write(buf.as_bytes()).await {
            return (Err(Error::Other(e.into())), buf);
        }

        (Ok(()), buf)
    }

    async fn flush(&mut self) -> Result<(), Error> {
        Ok(())
    }

    async fn close(&mut self) -> Result<(), Error> {
        if let Some(buf) = self.buf.take() {
            buf.lock()
                .await
                .complete()
                .await
                .map_err(|e| Error::Other(e.into()))?;
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {

    #[tokio::test]
    async fn test_s3() {
        use std::{env, env::VarError, sync::Arc};

        use bytes::Bytes;
        use object_store::{aws::AmazonS3Builder, ObjectStore};

        use crate::{Read, S3File, Write};

        let fn_env = || {
            let region = env::var("TEST_INTEGRATION")?;
            let bucket_name = env::var("TEST_INTEGRATION")?;
            let access_key_id = env::var("TEST_INTEGRATION")?;
            let secret_access_key = env::var("TEST_INTEGRATION")?;

            Ok::<(String, String, String, String), VarError>((
                region,
                bucket_name,
                access_key_id,
                secret_access_key,
            ))
        };
        if let Ok((region, bucket_name, access_key_id, secret_access_key)) = fn_env() {
            let path = object_store::path::Path::parse("/test_file").unwrap();
            let s3 = AmazonS3Builder::new()
                .with_region(region)
                .with_bucket_name(bucket_name)
                .with_access_key_id(access_key_id)
                .with_secret_access_key(secret_access_key)
                .build()
                .unwrap();
            let _ = s3.delete(&path).await;

            let mut store = S3File {
                inner: Arc::new(s3),
                path,
                buf: None,
            };
            let (result, bytes) = store.write_all(Bytes::from("hello! Fusio!")).await;
            result.unwrap();

            let mut buf = vec![0_u8; bytes.len()];
            let (result, buf) = store.read_exact_at(&mut buf[..], 0).await;
            result.unwrap();
            assert_eq!(buf, &bytes[..]);
        }
    }
}