floe-core 0.3.7

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use std::collections::HashMap;
use std::path::{Path, PathBuf};

use crate::{config, ConfigError, FloeResult};

pub mod core;
pub mod object_store;
pub mod ops;
pub mod providers;
pub mod target;

pub use core::{extensions, paths, placement, planner, uri, validation};
pub use ops::{archive, inputs, output};
pub use placement::OutputPlacement;
pub use planner::{
    filter_by_suffixes, join_prefix, normalize_separators, stable_sort_refs, temp_path_for_key,
};
pub use providers::{adls, gcs, local, s3};
pub use target::Target;

pub use planner::ObjectRef;

pub trait StorageClient: Send + Sync {
    fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>>;
    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf>;
    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()>;
    fn resolve_uri(&self, path: &str) -> FloeResult<String>;
    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()>;
    fn delete_object(&self, uri: &str) -> FloeResult<()>;
    fn exists(&self, uri: &str) -> FloeResult<bool>;
}

pub struct CloudClient {
    clients: HashMap<String, Box<dyn StorageClient>>,
}

impl CloudClient {
    pub fn new() -> Self {
        Self {
            clients: HashMap::new(),
        }
    }

    pub fn client_for<'a>(
        &'a mut self,
        resolver: &config::StorageResolver,
        storage: &str,
        entity: &config::EntityConfig,
    ) -> FloeResult<&'a mut dyn StorageClient> {
        let context = format!("entity.name={}", entity.name);
        self.client_for_context(resolver, storage, &context)
    }

    pub fn client_for_context<'a>(
        &'a mut self,
        resolver: &config::StorageResolver,
        storage: &str,
        context: &str,
    ) -> FloeResult<&'a mut dyn StorageClient> {
        if !self.clients.contains_key(storage) {
            let definition = resolver.definition(storage).ok_or_else(|| {
                Box::new(ConfigError(format!(
                    "{} storage {} is not defined",
                    context, storage
                ))) as Box<dyn std::error::Error + Send + Sync>
            })?;
            let client = build_client(&definition)?;
            self.clients.insert(storage.to_string(), client);
        }
        Ok(self
            .clients
            .get_mut(storage)
            .expect("storage client inserted")
            .as_mut())
    }
}

impl Default for CloudClient {
    fn default() -> Self {
        Self::new()
    }
}

fn build_client(definition: &config::StorageDefinition) -> FloeResult<Box<dyn StorageClient>> {
    let client: Box<dyn StorageClient> = match definition.fs_type.as_str() {
        "local" => Box::new(local::LocalClient::new()),
        "s3" => {
            let bucket =
                validation::require_field(definition, definition.bucket.as_ref(), "bucket", "s3")?;
            Box::new(s3::S3Client::new(bucket, definition.region.as_deref())?)
        }
        "adls" => Box::new(adls::AdlsClient::new(definition)?),
        "gcs" => {
            let bucket =
                validation::require_field(definition, definition.bucket.as_ref(), "bucket", "gcs")?;
            Box::new(gcs::GcsClient::new(bucket)?)
        }
        other => {
            return Err(Box::new(ConfigError(format!(
                "storage type {} is unsupported",
                other
            ))))
        }
    };
    Ok(client)
}