rust-integration-services 0.5.12

A modern, fast, and lightweight integration library written in Rust, designed for memory safety and stability.
Documentation
use std::{marker::PhantomData, path::{Path, PathBuf}, sync::Arc};

use aws_config::{BehaviorVersion, Region, SdkConfig};
use aws_sdk_s3::{Client, config::{Credentials, SharedCredentialsProvider}, primitives::ByteStream};
use bytes::Bytes;
use regex::Regex;
use tokio::{fs::File, io::AsyncWriteExt};

use crate::s3::s3_client_config::S3ClientConfig;

pub struct NoBucket;
pub struct HasBucket;

pub struct S3Client<State> {
    client: Arc<Client>,
    bucket: Option<String>,
    _state: PhantomData<State>,
}

impl S3Client<NoBucket> {
    pub fn new(config: S3ClientConfig) -> Self {
        Self {
            client: Arc::new(Self::build_client(config)),
            bucket: None,
            _state: PhantomData
        }
    }

    fn build_client(config: S3ClientConfig) -> Client {
        let creds = Credentials::new(config.access_key, config.secret_key, None, None, "static");
        let provider = SharedCredentialsProvider::new(creds);
        let region = Region::new(config.region);
        
        let sdk_config = SdkConfig::builder()
        .region(region)
        .credentials_provider(provider)
        .behavior_version(BehaviorVersion::latest())
        .endpoint_url(config.endpoint.as_str());

        Client::new(&sdk_config.build())
    }

    pub fn bucket(&self, bucket: impl Into<String>) -> S3Client<HasBucket> {
        S3Client {
            client: self.client.clone(),
            bucket: Some(bucket.into()),
            _state: PhantomData
        }
    }
}

impl S3Client<HasBucket> {
    pub async fn put_object_file(&mut self, key: impl AsRef<str>, file_path: impl AsRef<Path>) -> anyhow::Result<()> {
        let stream = ByteStream::from_path(file_path).await?;
        let _result = self.client
            .put_object()
            .bucket(self.bucket.as_ref().unwrap())
            .key(key.as_ref())
            .body(stream)
            .send()
            .await?;

        Ok(())
    }

    pub async fn put_object_bytes(&mut self, key: impl AsRef<str>, bytes: impl Into<Bytes>) -> anyhow::Result<()> {
        let bytes = bytes.into();
        let _result = self.client
            .put_object()
            .bucket(self.bucket.as_ref().unwrap())
            .key(key.as_ref())
            .body(bytes.into())
            .send()
            .await?;

        Ok(())
    }

    pub async fn get_object(&mut self, key: impl AsRef<str>, target_path: impl AsRef<Path>) -> anyhow::Result<PathBuf> {
        let result = self.client
            .get_object()
            .bucket(self.bucket.as_ref().unwrap())
            .key(key.as_ref())
            .send()
            .await?;

        let file_path = target_path.as_ref().join(PathBuf::from(key.as_ref()));
        let mut file = File::create(&file_path).await?;
        let mut stream = result.body.into_async_read();

        tokio::io::copy(&mut stream, &mut file).await?;
        file.flush().await?;

        Ok(file_path)
    }

    pub async fn get_object_bytes(&mut self, key: impl AsRef<str>) -> anyhow::Result<Bytes> {
        let result = self.client
            .get_object()
            .bucket(self.bucket.as_ref().unwrap())
            .key(key.as_ref())
            .send()
            .await?;

        let body = result.body.collect().await?;
        let bytes = body.into_bytes();

        Ok(bytes)
    }

    pub async fn get_objects(&mut self, target_path: impl AsRef<Path>) -> anyhow::Result<Vec<PathBuf>> {
        let mut objects = Vec::new();

        let mut paginator = self.client
            .list_objects_v2()
            .bucket(self.bucket.as_ref().unwrap())
            .into_paginator()
            .send();

        while let Some(page) = paginator.next().await {
            let page = page?;
            if let Some(contents) = page.contents {
                for obj in contents {
                    if let Some(key) = obj.key {
                        let result = self.client.get_object()
                            .bucket(self.bucket.as_ref().unwrap())
                            .key(&key)
                            .send()
                            .await?;

                        let file_path = target_path.as_ref().join(PathBuf::from(&key));
                        let mut file = File::create(&file_path).await?;
                        let mut stream = result.body.into_async_read();

                        tokio::io::copy(&mut stream, &mut file).await?;
                        file.flush().await?;

                        objects.push(file_path);
                    }
                }
            }
        }

        Ok(objects)
    }

    pub async fn get_objects_with_regex(&mut self, target_path: impl AsRef<Path>, regex: impl AsRef<str>) -> anyhow::Result<Vec<PathBuf>> {
        let mut objects = Vec::new();
        let regex = Regex::new(regex.as_ref())?;

        let mut paginator = self.client
            .list_objects_v2()
            .bucket(self.bucket.as_ref().unwrap())
            .into_paginator()
            .send();

        while let Some(page) = paginator.next().await {
            let page = page?;
            if let Some(contents) = page.contents {
                for obj in contents {
                    if let Some(key) = obj.key {
                        if !regex.is_match(&key) {
                            continue;
                        }

                        let result = self.client.get_object()
                            .bucket(self.bucket.as_ref().unwrap())
                            .key(&key)
                            .send()
                            .await?;

                        let file_path = target_path.as_ref().join(PathBuf::from(&key));
                        let mut file = File::create(&file_path).await?;
                        let mut stream = result.body.into_async_read();

                        tokio::io::copy(&mut stream, &mut file).await?;
                        file.flush().await?;

                        objects.push(file_path);
                    }
                }
            }
        }

        Ok(objects)
    }

    pub async fn delete_object(&mut self, key: impl AsRef<str>) -> anyhow::Result<()> {
        let _result = self.client
        .delete_object()
        .bucket(self.bucket.as_ref().unwrap())
        .key(key.as_ref())
        .send()
        .await?;

        Ok(())
    }
}