wasmcloud-s3 0.10.1

AWS S3 capability provider for the wasmCloud wasm host runtime
Documentation
use crate::FileUpload;
use futures::TryStreamExt;
use hyper::{Client, Uri};
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use hyper_tls::HttpsConnector;
use log::info;
use rusoto_core::credential::{DefaultCredentialsProvider, StaticProvider};
use rusoto_core::Region;
use rusoto_s3::HeadObjectOutput;
use rusoto_s3::ListObjectsV2Output;
use rusoto_s3::Object;
use rusoto_s3::{
    CreateBucketRequest, DeleteBucketRequest, DeleteObjectRequest, GetObjectRequest,
    HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
};
use wasmcloud_actor_core::CapabilityConfiguration;

use std::error::Error;

/// Connector type used when building the hyper client by hand: hyper's plain
/// HTTP connector wrapped in a TLS connector, which is in turn wrapped in a
/// proxy connector.
type HttpConnector =
    hyper_proxy::ProxyConnector<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>;

/// Builds an [`S3Client`] from the values of a capability configuration.
///
/// Configuration keys read from `config.values`:
///
/// * `REGION` - when present, a custom region with this name is used;
///   otherwise the client targets `Region::UsEast1`.
/// * `ENDPOINT` - endpoint for the custom region. Only consulted when `REGION`
///   is set; defaults to `s3.us-east-1.amazonaws.com`.
/// * `AWS_ACCESS_KEY` - when present, static credentials are used and
///   `AWS_SECRET_ACCESS_KEY` becomes mandatory. When absent, the default
///   credentials provider is used instead.
/// * `AWS_TOKEN` - optional token handed to the static provider.
/// * `TOKEN_VALID_FOR` - optional integer handed to the static provider as
///   the credential validity.
/// * `HTTP_PROXY` - optional proxy URI. Only consulted on the static
///   credentials path.
///
/// # Errors
///
/// Returns an error (instead of panicking) when `AWS_SECRET_ACCESS_KEY` is
/// missing while `AWS_ACCESS_KEY` is set, when `TOKEN_VALID_FOR` is not a
/// valid `i64`, when `HTTP_PROXY` is not a valid URI, or when the
/// connector, HTTP client or default credentials provider cannot be created.
pub(crate) fn client_for_config(
    config: &CapabilityConfiguration,
) -> std::result::Result<S3Client, Box<dyn std::error::Error + Sync + Send>> {
    let region = match config.values.get("REGION") {
        Some(name) => Region::Custom {
            name: name.clone(),
            endpoint: config
                .values
                .get("ENDPOINT")
                .cloned()
                .unwrap_or_else(|| "s3.us-east-1.amazonaws.com".to_string()),
        },
        None => Region::UsEast1,
    };

    let client = if let Some(access_key) = config.values.get("AWS_ACCESS_KEY") {
        info!("Creating provider from provided keys");
        // A missing secret key is a configuration mistake, not a bug in this
        // provider: report it to the caller rather than panicking.
        let secret_key = config
            .values
            .get("AWS_SECRET_ACCESS_KEY")
            .ok_or("AWS_ACCESS_KEY was provided without AWS_SECRET_ACCESS_KEY")?;
        // Same for a malformed validity value: surface the parse failure.
        let valid_for = config
            .values
            .get("TOKEN_VALID_FOR")
            .map(|t| t.parse::<i64>())
            .transpose()
            .map_err(|e| format!("TOKEN_VALID_FOR is not a valid integer: {}", e))?;
        let provider = StaticProvider::new(
            access_key.to_string(),
            secret_key.to_string(),
            config.values.get("AWS_TOKEN").cloned(),
            valid_for,
        );
        let connector: HttpConnector = if let Some(proxy) = config.values.get("HTTP_PROXY") {
            info!("Proxy enabled for S3 client");
            let proxy = Proxy::new(Intercept::All, proxy.parse::<Uri>()?);
            ProxyConnector::from_proxy(HttpsConnector::new(), proxy)?
        } else {
            ProxyConnector::new(HttpsConnector::new())?
        };
        let mut hyper_builder: hyper::client::Builder = Client::builder();
        // Keep no idle connections in the pool: reusing pooled connections
        // caused "connection closed" failures.
        hyper_builder.pool_max_idle_per_host(0);
        let client = rusoto_core::HttpClient::from_builder(hyper_builder, connector);
        S3Client::new_with(client, provider, region)
    } else {
        info!("Creating provider with default credentials");
        let provider = DefaultCredentialsProvider::new()?;
        let http_client = rusoto_core::request::HttpClient::new()
            .map_err(|e| format!("Failed to create HTTP client: {}", e))?;
        S3Client::new_with(http_client, provider, region)
    };

    Ok(client)
}

/// Creates the bucket called `name`.
///
/// Every request field other than the bucket name keeps its default value.
///
/// # Errors
///
/// Returns the boxed error of the `create_bucket` call when it fails.
pub(crate) async fn create_bucket(
    client: &S3Client,
    name: &str,
) -> Result<(), Box<dyn Error + Sync + Send>> {
    let request = CreateBucketRequest {
        bucket: name.to_owned(),
        ..Default::default()
    };

    // The response payload is not needed; only success or failure matters.
    client
        .create_bucket(request)
        .await
        .map(|_| ())
        .map_err(|e| e.into())
}

/// Deletes the bucket called `bucket`.
///
/// # Errors
///
/// Returns the boxed error of the `delete_bucket` call when it fails.
pub(crate) async fn remove_bucket(
    client: &S3Client,
    bucket: &str,
) -> Result<(), Box<dyn Error + Sync + Send>> {
    let request = DeleteBucketRequest {
        bucket: bucket.to_string(),
        ..Default::default()
    };

    match client.delete_bucket(request).await {
        Ok(_) => Ok(()),
        Err(err) => Err(err.into()),
    }
}

/// Deletes the object stored under key `id` in `bucket`.
///
/// # Errors
///
/// Returns the boxed error of the `delete_object` call when it fails.
pub(crate) async fn remove_object(
    client: &S3Client,
    bucket: &str,
    id: &str,
) -> Result<(), Box<dyn Error + Sync + Send>> {
    // The delete response is discarded; callers only learn whether the call
    // succeeded.
    client
        .delete_object(DeleteObjectRequest {
            bucket: bucket.to_owned(),
            key: id.to_owned(),
            ..Default::default()
        })
        .await?;

    Ok(())
}

/// Downloads a byte range of the object `id` in `bucket` and returns it fully
/// buffered in memory.
///
/// The request carries the range header value `bytes={start}-{end}`; in HTTP
/// range syntax both offsets are inclusive.
///
/// # Errors
///
/// Returns an error when the `get_object` call fails, when the response
/// carries no body, or when reading the body stream fails part-way. The
/// latter two cases previously caused a panic.
pub(crate) async fn get_blob_range(
    client: &S3Client,
    bucket: &str,
    id: &str,
    start: u64,
    end: u64,
) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
    let get_req = GetObjectRequest {
        bucket: bucket.to_owned(),
        key: id.to_owned(),
        range: Some(format!("bytes={}-{}", start, end)),
        ..Default::default()
    };

    let result = client.get_object(get_req).await?;
    let stream = result.body.ok_or_else(|| {
        format!(
            "S3 returned no body for object '{}' in bucket '{}'",
            id, bucket
        )
    })?;

    // Concatenate the streamed chunks straight into one Vec<u8>; a network
    // error in the middle of the stream is propagated to the caller.
    let body = stream.map_ok(|b| b.to_vec()).try_concat().await?;
    Ok(body)
}

/// Uploads all chunks of `upload` as a single object.
///
/// The chunk payloads are concatenated in the order they appear in
/// `upload.chunks` and sent with one `put_object` call, using
/// `upload.container` as the bucket and `upload.id` as the key.
///
/// # Errors
///
/// Returns the boxed error of the `put_object` call when it fails.
pub(crate) async fn complete_upload(
    client: &S3Client,
    upload: &FileUpload,
) -> Result<(), Box<dyn Error + Sync + Send>> {
    // Size the buffer once and append each chunk; re-concatenating the
    // accumulated buffer for every chunk would copy it again and again
    // (quadratic in the upload size).
    let total_len: usize = upload.chunks.iter().map(|c| c.chunk_bytes[..].len()).sum();
    let mut bytes: Vec<u8> = Vec::with_capacity(total_len);
    for chunk in upload.chunks.iter() {
        bytes.extend_from_slice(&chunk.chunk_bytes[..]);
    }

    let put_request = PutObjectRequest {
        bucket: upload.container.to_string(),
        key: upload.id.to_string(),
        body: Some(bytes.into()),
        ..Default::default()
    };

    client.put_object(put_request).await?;
    Ok(())
}

/// Lists the objects in `bucket`.
///
/// A single ListObjectsV2 response may be truncated, so this follows the
/// continuation token until the listing is complete and returns the objects
/// of all pages in the order they were received.
///
/// Returns `None` when no response page carried a `contents` list, which
/// matches what a single un-paginated call returns in that case.
///
/// # Errors
///
/// Returns the boxed error of the first `list_objects_v2` call that fails;
/// objects gathered from earlier pages are discarded in that case.
pub(crate) async fn list_objects(
    client: &S3Client,
    bucket: &str,
) -> Result<Option<Vec<Object>>, Box<dyn Error + Sync + Send>> {
    let mut objects: Option<Vec<Object>> = None;
    let mut continuation_token: Option<String> = None;

    loop {
        let list_obj_req = ListObjectsV2Request {
            bucket: bucket.to_owned(),
            continuation_token: continuation_token.take(),
            ..Default::default()
        };
        let res: ListObjectsV2Output = client.list_objects_v2(list_obj_req).await?;

        if let Some(page) = res.contents {
            objects.get_or_insert_with(Vec::new).extend(page);
        }

        // Continue only when the service reports truncation AND supplies a
        // token; stopping otherwise guards against looping forever.
        match (res.is_truncated, res.next_continuation_token) {
            (Some(true), Some(token)) => continuation_token = Some(token),
            _ => break,
        }
    }

    Ok(objects)
}

/// Issues a `head_object` request for `key` in `bucket` and returns the
/// response unchanged.
///
/// # Errors
///
/// Returns the boxed error of the `head_object` call when it fails.
pub(crate) async fn head_object(
    client: &S3Client,
    bucket: &str,
    key: &str,
) -> Result<HeadObjectOutput, Box<dyn Error + Sync + Send>> {
    let request = HeadObjectRequest {
        bucket: bucket.to_string(),
        key: key.to_string(),
        ..Default::default()
    };

    let output = client.head_object(request).await?;
    Ok(output)
}