floe-core 0.3.6

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use std::path::{Path, PathBuf};

use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::Client;
use tokio::io::AsyncWriteExt;
use tokio::runtime::Runtime;

use crate::errors::StorageError;
use crate::io::storage::uri::{format_bucket_uri, parse_bucket_uri, BucketLocation};
use crate::io::storage::{planner, ObjectRef, StorageClient};
use crate::FloeResult;

/// S3-backed storage client bound to one bucket.
///
/// The AWS SDK is async; this type owns a Tokio runtime so that its
/// synchronous methods can drive SDK futures with `block_on`.
pub struct S3Client {
    /// Name of the bucket used by `list` and `resolve_uri`.
    bucket: String,
    /// AWS SDK S3 client built from the configuration loaded in `new`.
    client: Client,
    /// Runtime used to block on SDK futures from synchronous methods.
    runtime: Runtime,
}

impl S3Client {
    /// Creates a client for `bucket`.
    ///
    /// Builds a current-thread Tokio runtime and uses it to load the default
    /// AWS configuration. When `region` is `Some`, that region is tried first
    /// and the default provider chain is used as a fallback; when `None`, only
    /// the default provider chain is consulted.
    ///
    /// # Errors
    ///
    /// Returns a `StorageError` if the Tokio runtime cannot be built.
    pub fn new(bucket: String, region: Option<&str>) -> FloeResult<Self> {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .map_err(|err| {
                // Cast to the trait object up front so `?` does not wrap a
                // `Box<StorageError>` inside a second box (which would hide
                // the concrete error type from `downcast_ref`).
                Box::new(StorageError(format!("failed to build aws runtime: {err}")))
                    as Box<dyn std::error::Error + Send + Sync>
            })?;
        let config = runtime.block_on(async {
            let region_provider = match region {
                Some(region) => RegionProviderChain::first_try(Region::new(region.to_string()))
                    .or_default_provider(),
                None => RegionProviderChain::default_provider(),
            };
            aws_config::defaults(aws_config::BehaviorVersion::latest())
                .region(region_provider)
                .load()
                .await
        });
        let client = Client::new(&config);
        Ok(Self {
            bucket,
            client,
            runtime,
        })
    }

    /// Returns the name of the bucket this client was created for.
    fn bucket(&self) -> &str {
        self.bucket.as_str()
    }
}

impl StorageClient for S3Client {
    fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
        let bucket = self.bucket().to_string();
        let prefix = prefix.to_string();
        self.runtime.block_on(async {
            let mut refs = Vec::new();
            let mut continuation = None;
            loop {
                let mut request = self.client.list_objects_v2().bucket(&bucket);
                if !prefix.is_empty() {
                    request = request.prefix(&prefix);
                }
                if let Some(token) = continuation {
                    request = request.continuation_token(token);
                }
                let response = request.send().await.map_err(|err| {
                    Box::new(StorageError(format!(
                        "s3 list objects failed for bucket {}: {err}",
                        bucket
                    ))) as Box<dyn std::error::Error + Send + Sync>
                })?;
                if let Some(contents) = response.contents {
                    for object in contents {
                        if let Some(key) = object.key {
                            let uri = format_s3_uri(&bucket, &key);
                            refs.push(planner::object_ref(
                                uri,
                                key,
                                object.last_modified.as_ref().map(|value| value.to_string()),
                                object.size.map(|value| value as u64),
                            ));
                        }
                    }
                }
                if response.is_truncated.unwrap_or(false) {
                    continuation = response.next_continuation_token;
                    if continuation.is_none() {
                        break;
                    }
                } else {
                    break;
                }
            }
            Ok(refs)
        })
    }

    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
        let location = parse_s3_uri(uri)?;
        let bucket = location.bucket;
        let key = location.key;
        let dest = planner::temp_path_for_key(temp_dir, &key);
        let dest_clone = dest.clone();
        self.runtime.block_on(async move {
            let response = self
                .client
                .get_object()
                .bucket(bucket)
                .key(key.clone())
                .send()
                .await
                .map_err(|err| {
                    Box::new(StorageError(format!("s3 get object failed: {err}")))
                        as Box<dyn std::error::Error + Send + Sync>
                })?;
            if let Some(parent) = dest_clone.parent() {
                tokio::fs::create_dir_all(parent).await?;
            }
            let mut file = tokio::fs::File::create(&dest_clone).await?;
            let mut reader = response.body.into_async_read();
            tokio::io::copy(&mut reader, &mut file).await?;
            file.flush().await?;
            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
        })?;
        Ok(dest)
    }

    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
        let location = parse_s3_uri(uri)?;
        let bucket = location.bucket;
        let key = location.key;
        let path = local_path.to_path_buf();
        self.runtime.block_on(async move {
            let body = ByteStream::from_path(path).await.map_err(|err| {
                Box::new(StorageError(format!("s3 upload body failed: {err}")))
                    as Box<dyn std::error::Error + Send + Sync>
            })?;
            self.client
                .put_object()
                .bucket(bucket)
                .key(key)
                .body(body)
                .send()
                .await
                .map_err(|err| {
                    Box::new(StorageError(format!("s3 put object failed: {err}")))
                        as Box<dyn std::error::Error + Send + Sync>
                })?;
            Ok(())
        })
    }

    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
        Ok(format_s3_uri(self.bucket(), path.trim_start_matches('/')))
    }

    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
        let src = parse_s3_uri(src_uri)?;
        let dst = parse_s3_uri(dst_uri)?;
        let copy_source = format!("{}/{}", src.bucket, src.key);
        self.runtime.block_on(async move {
            self.client
                .copy_object()
                .bucket(dst.bucket)
                .key(dst.key)
                .copy_source(copy_source)
                .send()
                .await
                .map_err(|err| {
                    Box::new(StorageError(format!("s3 copy object failed: {err}")))
                        as Box<dyn std::error::Error + Send + Sync>
                })?;
            Ok(())
        })
    }

    fn delete_object(&self, uri: &str) -> FloeResult<()> {
        let location = parse_s3_uri(uri)?;
        let bucket = location.bucket;
        let key = location.key;
        self.runtime.block_on(async move {
            self.client
                .delete_object()
                .bucket(bucket)
                .key(key)
                .send()
                .await
                .map_err(|err| {
                    Box::new(StorageError(format!("s3 delete object failed: {err}")))
                        as Box<dyn std::error::Error + Send + Sync>
                })?;
            Ok(())
        })
    }

    fn exists(&self, uri: &str) -> FloeResult<bool> {
        let location = parse_s3_uri(uri)?;
        planner::exists_by_key(self, &location.key)
    }
}

/// Parses `uri` into an [`S3Location`] by delegating to `parse_bucket_uri`
/// with the `"s3"` scheme.
///
/// # Errors
///
/// Propagates whatever error `parse_bucket_uri` returns for `uri`.
pub fn parse_s3_uri(uri: &str) -> FloeResult<S3Location> {
    parse_bucket_uri("s3", uri)
}

/// Formats `bucket` and `key` as a URI string by delegating to
/// `format_bucket_uri` with the `"s3"` scheme.
pub fn format_s3_uri(bucket: &str, key: &str) -> String {
    format_bucket_uri("s3", bucket, key)
}

/// Bucket/key pair produced by [`parse_s3_uri`]; an alias for the shared
/// `BucketLocation` type.
pub type S3Location = BucketLocation;

pub fn filter_keys_by_suffixes(mut keys: Vec<String>, suffixes: &[String]) -> Vec<String> {
    let mut refs = Vec::with_capacity(keys.len());
    for key in keys.drain(..) {
        refs.push(ObjectRef {
            uri: key.clone(),
            key,
            last_modified: None,
            size: None,
        });
    }
    let filtered = planner::filter_by_suffixes(refs, suffixes);
    let sorted = planner::stable_sort_refs(filtered);
    sorted.into_iter().map(|obj| obj.key).collect()
}