fusio 0.3.0

Fusio provides a lean, minimal-cost abstraction and extensible Read / Write traits over multiple storage backends, on multiple poll-based and completion-based async runtimes.
Documentation
use std::sync::Arc;

use bytes::Buf;
use http::{
    header::{CONTENT_LENGTH, RANGE},
    request::Builder,
    Method, Request,
};
use http_body_util::{BodyExt, Empty};
use percent_encoding::utf8_percent_encode;

use super::{fs::AmazonS3, sign::Sign, S3Error, STRICT_PATH_ENCODE_SET};
use crate::{
    buf::IoBufMut,
    path::Path,
    remotes::{
        aws::{multipart_upload::MultipartUpload, writer::S3Writer},
        http::{HttpClient, HttpError},
    },
    Error, IoBuf, Read, Write,
};

/// A handle to a single object addressed by `path` through an [`AmazonS3`] filesystem.
///
/// Reads are served by signed HTTP requests built from the filesystem's endpoint and options;
/// writes are delegated to an [`S3Writer`] that is created on the first `write_all` call.
pub struct S3File {
    /// Filesystem handle providing the endpoint, signing options, and HTTP client.
    fs: AmazonS3,
    /// Object path; percent-encoded and appended to the endpoint when building request URLs.
    path: Path,
    /// Writer backing `write_all` / `flush` / `close`; `None` until the first write and
    /// taken (reset to `None`) by `close`.
    writer: Option<S3Writer>,
}

impl S3File {
    /// Creates a file handle for `path` on the given filesystem.
    ///
    /// No writer is allocated here; the handle starts with `writer` unset.
    pub(crate) fn new(fs: AmazonS3, path: Path) -> Self {
        let writer = None;
        Self { fs, path, writer }
    }

    /// Starts an HTTP request builder for this object.
    ///
    /// The target URI is `<endpoint>/<path>`, where the path is percent-encoded with
    /// `STRICT_PATH_ENCODE_SET`. Headers and the body are left for the caller to supply.
    fn build_request(&self, method: Method) -> Builder {
        let endpoint = &self.fs.as_ref().options.endpoint;
        let encoded_path = utf8_percent_encode(self.path.as_ref(), &STRICT_PATH_ENCODE_SET);
        let url = format!("{}/{}", endpoint, encoded_path);

        Request::builder().method(method).uri(url)
    }
}

impl Read for S3File {
    /// Fills `buf` with the object's bytes starting at byte offset `pos`.
    ///
    /// Sends a signed `GET` carrying `Range: bytes=<pos>-<pos + len - 1>`, where `len` is the
    /// length of `buf`'s slice, and copies exactly `len` bytes of the response body into `buf`.
    ///
    /// An empty `buf` is satisfied immediately with `Ok(())` and no request is sent. Without
    /// this guard the inclusive range end `pos + len - 1` would underflow for `pos == 0` and
    /// would describe an invalid range (end before start) for any other `pos`.
    ///
    /// # Errors
    ///
    /// Returns an error (together with `buf`) when the request cannot be built or signed, the
    /// HTTP client fails, the response status is not a success (the response body is included
    /// in the error as lossy UTF-8), the body cannot be collected, or the body holds fewer
    /// bytes than `buf` requires.
    async fn read_exact_at<B: IoBufMut>(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) {
        let len = buf.as_slice().len() as u64;
        if len == 0 {
            // Nothing to read; also keeps the range arithmetic below from underflowing.
            return (Ok(()), buf);
        }

        let request = self
            .build_request(Method::GET)
            // HTTP byte ranges are inclusive on both ends, hence `len - 1`.
            .header(RANGE, format!("bytes={}-{}", pos, pos + (len - 1)))
            .body(Empty::new())
            .map_err(|e| S3Error::from(HttpError::from(e)));

        let mut request = match request {
            Ok(request) => request,
            Err(e) => return (Err(e.into()), buf),
        };

        if let Err(e) = request
            .sign(&self.fs.as_ref().options)
            .await
            .map_err(S3Error::from)
        {
            return (Err(e.into()), buf);
        }

        let response = match self
            .fs
            .as_ref()
            .client
            .send_request(request)
            .await
            .map_err(S3Error::from)
        {
            Ok(response) => response,
            Err(e) => return (Err(e.into()), buf),
        };

        let status = response.status();
        if !status.is_success() {
            // Best effort: an unreadable error body must not mask the status failure.
            let body = response
                .into_body()
                .collect()
                .await
                .map(|b| b.to_bytes())
                .unwrap_or_default();
            let err = S3Error::from(HttpError::HttpNotSuccess {
                status,
                body: String::from_utf8_lossy(&body).to_string(),
            });
            return (Err(err.into()), buf);
        }

        let body = match response.into_body().collect().await.map_err(S3Error::from) {
            Ok(body) => body,
            Err(e) => return (Err(e.into()), buf),
        };

        let copied =
            std::io::Read::read_exact(&mut body.aggregate().reader(), buf.as_slice_mut());
        match copied {
            Ok(()) => (Ok(()), buf),
            Err(e) => (Err(e.into()), buf),
        }
    }

    /// Reads everything from byte offset `pos` to the end of the object into `buf`.
    ///
    /// Sends a signed `GET` carrying the open-ended header `Range: bytes=<pos>-`. On success
    /// `buf` holds exactly the response body: any previous contents of `buf` are replaced,
    /// not appended to.
    ///
    /// # Errors
    ///
    /// Returns an error (together with the untouched `buf`) when the request cannot be built
    /// or signed, the HTTP client fails, the response status is not a success (the response
    /// body is included in the error as lossy UTF-8), or the body cannot be collected.
    async fn read_to_end_at(&mut self, mut buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
        let mut request = match self
            .build_request(Method::GET)
            .header(RANGE, format!("bytes={}-", pos))
            .body(Empty::new())
            .map_err(|e| S3Error::from(HttpError::from(e)))
        {
            Err(e) => return (Err(e.into()), buf),
            Ok(request) => request,
        };

        if let Err(e) = request
            .sign(&self.fs.as_ref().options)
            .await
            .map_err(S3Error::from)
        {
            return (Err(e.into()), buf);
        }

        let response = match self
            .fs
            .as_ref()
            .client
            .send_request(request)
            .await
            .map_err(S3Error::from)
        {
            Ok(response) => response,
            Err(e) => return (Err(e.into()), buf),
        };

        let status = response.status();
        if !status.is_success() {
            // Best effort: an unreadable error body must not mask the status failure.
            let body = response
                .into_body()
                .collect()
                .await
                .map(|b| b.to_bytes())
                .unwrap_or_default();
            let err = S3Error::from(HttpError::HttpNotSuccess {
                status,
                body: String::from_utf8_lossy(&body).to_string(),
            });
            return (Err(err.into()), buf);
        }

        match response.into_body().collect().await.map_err(S3Error::from) {
            Ok(body) => {
                let body = body.to_bytes();
                // Replace (not extend) the caller's contents with the downloaded bytes.
                buf.clear();
                buf.extend_from_slice(&body);
                (Ok(()), buf)
            }
            Err(e) => (Err(e.into()), buf),
        }
    }

    /// Returns the object's size in bytes as reported by the `Content-Length` header of a
    /// signed `HEAD` request.
    ///
    /// # Errors
    ///
    /// Returns an error when the request cannot be built or signed, the HTTP client fails, the
    /// response status is not a success, or the `Content-Length` header is missing, is not
    /// valid header text, or does not parse as a `u64`.
    async fn size(&self) -> Result<u64, Error> {
        let mut request = self
            .build_request(Method::HEAD)
            .body(Empty::new())
            .map_err(|e| S3Error::from(HttpError::from(e)))?;
        request
            .sign(&self.fs.as_ref().options)
            .await
            .map_err(S3Error::from)?;

        let response = self
            .fs
            .as_ref()
            .client
            .send_request(request)
            .await
            .map_err(S3Error::from)?;

        let status = response.status();
        if !status.is_success() {
            // Matches the read paths: a body that fails to collect falls back to an empty
            // string so the caller still sees the HTTP status error.
            let body = response
                .into_body()
                .collect()
                .await
                .map(|b| b.to_bytes())
                .unwrap_or_default();
            return Err(S3Error::from(HttpError::HttpNotSuccess {
                status,
                body: String::from_utf8_lossy(&body).to_string(),
            })
            .into());
        }

        let size = response
            .headers()
            .get(CONTENT_LENGTH)
            .ok_or_else(|| Error::Other("missing content-length header".into()))?
            .to_str()
            .map_err(|e| Error::Other(e.into()))?
            .parse::<u64>()
            .map_err(|e| Error::Other(e.into()))?;
        Ok(size)
    }
}

impl Write for S3File {
    /// Hands `buf` to the underlying [`S3Writer`], creating it on first use.
    ///
    /// The writer is built around a [`MultipartUpload`] for this file's filesystem and path
    /// and is kept for subsequent calls until `close` takes it.
    async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
        let writer = self.writer.get_or_insert_with(|| {
            let upload = MultipartUpload::new(self.fs.clone(), self.path.clone());
            S3Writer::new(Arc::new(upload))
        });

        writer.write_all(buf).await
    }

    /// Flushes the writer if one exists; a file that was never written to is a no-op.
    async fn flush(&mut self) -> Result<(), Error> {
        let Some(writer) = self.writer.as_mut() else {
            return Ok(());
        };

        writer.flush().await?;
        Ok(())
    }

    /// Takes the writer out of the file (leaving `None`) and closes it.
    ///
    /// A file that was never written to is a no-op.
    async fn close(&mut self) -> Result<(), Error> {
        let Some(mut writer) = self.writer.take() else {
            return Ok(());
        };

        writer.close().await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    /// Round-trips a small object through a real S3 bucket: write, close, then check the
    /// reported size and the bytes read back.
    ///
    /// The test needs live credentials and is skipped unless both `AWS_ACCESS_KEY_ID` and
    /// `AWS_SECRET_ACCESS_KEY` are set.
    #[cfg(all(feature = "tokio-http", not(feature = "completion-based")))]
    #[tokio::test]
    async fn write_and_read_s3_file() {
        use std::{env, sync::Arc};

        use crate::{
            remotes::{
                aws::{
                    credential::AwsCredential,
                    fs::{AmazonS3, AmazonS3Inner},
                    options::S3Options,
                    s3::S3File,
                },
                http::{tokio::TokioClient, DynHttpClient, HttpClient},
            },
            Read, Write,
        };

        // Skip (rather than panic) when either half of the credential pair is missing.
        let (Ok(key_id), Ok(secret_key)) = (
            env::var("AWS_ACCESS_KEY_ID"),
            env::var("AWS_SECRET_ACCESS_KEY"),
        ) else {
            eprintln!("skipping AWS s3 test");
            return;
        };

        let client = TokioClient::new();
        let region = "ap-southeast-1";
        let options = S3Options {
            endpoint: "https://fusio-test.s3.ap-southeast-1.amazonaws.com".into(),
            credential: Some(AwsCredential {
                key_id,
                secret_key,
                token: None,
            }),
            region: region.into(),
            sign_payload: true,
            checksum: false,
        };

        let s3 = AmazonS3 {
            inner: Arc::new(AmazonS3Inner {
                options,
                client: Box::new(client) as Box<dyn DynHttpClient>,
            }),
        };

        let mut s3 = S3File::new(s3, "read-write.txt".into());

        // The payload is intentionally exactly 42 bytes long; the size assertion below
        // depends on this literal byte-for-byte.
        let (result, _) = s3
            .write_all(&b"The answer of life, universe and everthing"[..])
            .await;
        result.unwrap();
        // Close the writer before reading back so the write is finalized.
        // NOTE(review): assumes the multipart writer may hold data until closed — confirm
        // against `S3Writer`.
        s3.close().await.unwrap();

        let size = s3.size().await.unwrap();
        assert_eq!(size, 42);
        let buf = Vec::new();
        let (result, buf) = s3.read_to_end_at(buf, 0).await;
        result.unwrap();
        assert_eq!(buf, b"The answer of life, universe and everthing");
    }
}