lakestream 0.0.3

Portable file-utility for object-stores
Documentation
use async_trait::async_trait;

use super::get::get_object;
use super::list::list_files;
use crate::base::config::Config;
use crate::s3::config::validate_config;
use crate::{
    FileObjectFilter, FileObjectVec, LakestreamError, ObjectStoreTrait,
};

#[derive(Clone)]
pub struct S3Credentials {
    access_key: String,
    secret_key: String,
}

impl S3Credentials {
    pub fn new(access_key: String, secret_key: String) -> S3Credentials {
        S3Credentials {
            access_key,
            secret_key,
        }
    }

    pub fn access_key(&self) -> &str {
        &self.access_key
    }

    pub fn secret_key(&self) -> &str {
        &self.secret_key
    }
}

#[derive(Clone)]
pub struct S3Bucket {
    name: String,
    config: Config,
}

impl S3Bucket {
    pub fn new(
        name: &str,
        mut config: Config,
    ) -> Result<S3Bucket, LakestreamError> {
        validate_config(&mut config)?;

        Ok(S3Bucket {
            name: name.to_string(),
            config,
        })
    }

    pub fn config(&self) -> &Config {
        &self.config
    }

    pub fn bucket_path(&self) -> String {
        let region = self.config.settings.get("AWS_REGION").unwrap();
        let endpoint_url = self
            .config
            .settings
            .get("S3_ENDPOINT_URL")
            .map(String::as_str);
        let name = Some(self.name().to_string());

        configure_bucket_url(region, endpoint_url, name.as_deref())
    }
}

#[async_trait(?Send)]
impl ObjectStoreTrait for S3Bucket {
    fn name(&self) -> &str {
        &self.name
    }

    fn config(&self) -> &Config {
        &self.config
    }
    async fn list_files(
        &self,
        prefix: Option<&str>,
        recursive: bool,
        max_keys: Option<u32>,
        filter: &Option<FileObjectFilter>,
        file_objects: &mut FileObjectVec,
    ) -> Result<(), LakestreamError> {
        list_files(self, prefix, recursive, max_keys, filter, file_objects)
            .await
    }

    async fn get_object(
        &self,
        key: &str,
        data: &mut Vec<u8>,
    ) -> Result<(), LakestreamError> {
        get_object(self, key, data).await
    }
}

pub fn configure_bucket_url(
    region: &str,
    endpoint_url: Option<&str>,
    bucket_name: Option<&str>,
) -> String {
    match endpoint_url {
        Some(url) => match bucket_name {
            Some(name) => format!("{}/{}", url.trim_end_matches('/'), name),
            None => url.to_owned(),
        },
        None => match bucket_name {
            Some(name) => {
                format!("https://{}.s3.{}.amazonaws.com", name, region)
            }
            None => format!("https://s3.{}.amazonaws.com", region),
        },
    }
}