floe-core 0.3.7

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use std::collections::HashMap;

use iceberg::io::{CLIENT_REGION, S3_REGION};
use url::Url;

use crate::{config, ConfigError, FloeResult};

use super::Target;

/// Location and object-store options for a Delta Lake table.
///
/// Built by `delta_store_config` from an entity's target.
///
/// NOTE(review): `Debug` is derived for parity with `IcebergStoreConfig`. Today
/// `storage_options` only holds region/bucket/account/container names. If
/// credentials are ever added here, replace the derive with a redacting impl.
#[derive(Debug)]
pub struct DeltaStoreConfig {
    /// Table root as a URL (a directory URL for local targets).
    pub table_url: Url,
    /// Backend-specific storage options: `region`/`bucket` for S3, and
    /// account/container names for ADLS. Empty for local and GCS targets.
    pub storage_options: HashMap<String, String>,
}

/// Where an Iceberg table's warehouse lives and which file-IO properties to
/// open it with.
///
/// Produced by `iceberg_store_config`.
#[derive(Debug)]
pub struct IcebergStoreConfig {
    /// Warehouse root: the base path for local targets, or the target URI for
    /// S3/GCS targets.
    pub warehouse_location: String,
    /// `iceberg::io` property map. For S3 targets with a configured region it
    /// holds the `S3_REGION` and `CLIENT_REGION` keys; otherwise it is empty.
    pub file_io_props: HashMap<String, String>,
}

pub fn delta_store_config(
    target: &Target,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
) -> FloeResult<DeltaStoreConfig> {
    match target {
        Target::Local { base_path, .. } => {
            let url = Url::from_directory_path(base_path).map_err(|_| {
                Box::new(ConfigError(format!(
                    "entity.name={} delta table path is not a valid url: {}",
                    entity.name, base_path
                )))
            })?;
            Ok(DeltaStoreConfig {
                table_url: url,
                storage_options: HashMap::new(),
            })
        }
        Target::S3 {
            storage,
            uri,
            bucket,
            ..
        } => {
            let url = Url::parse(uri).map_err(|err| {
                Box::new(ConfigError(format!(
                    "entity.name={} delta s3 path is invalid: {} ({err})",
                    entity.name, uri
                )))
            })?;
            let mut storage_options = HashMap::new();
            if let Some(definition) = resolver.definition(storage) {
                if let Some(region) = definition.region {
                    storage_options.insert("region".to_string(), region);
                }
            }
            storage_options.insert("bucket".to_string(), bucket.to_string());
            Ok(DeltaStoreConfig {
                table_url: url,
                storage_options,
            })
        }
        Target::Adls {
            uri,
            account,
            container,
            ..
        } => {
            let url = Url::parse(uri).map_err(|err| {
                Box::new(ConfigError(format!(
                    "entity.name={} delta adls path is invalid: {} ({err})",
                    entity.name, uri
                )))
            })?;
            let mut storage_options = HashMap::new();
            storage_options.insert(
                "azure_storage_account_name".to_string(),
                account.to_string(),
            );
            storage_options.insert("azure_container_name".to_string(), container.to_string());
            Ok(DeltaStoreConfig {
                table_url: url,
                storage_options,
            })
        }
        Target::Gcs { uri, .. } => {
            let url = Url::parse(uri).map_err(|err| {
                Box::new(ConfigError(format!(
                    "entity.name={} delta gcs path is invalid: {} ({err})",
                    entity.name, uri
                )))
            })?;
            Ok(DeltaStoreConfig {
                table_url: url,
                storage_options: HashMap::new(),
            })
        }
    }
}

/// Builds the Iceberg warehouse location and file-IO properties for an
/// entity's write target.
///
/// - `Local`: the base path is the warehouse; no properties.
/// - `S3`: the URI is the warehouse. If the storage definition resolved
///   through `resolver` has a region, it is written under both `S3_REGION`
///   and `CLIENT_REGION`.
/// - `Gcs`: the URI is the warehouse; no properties.
///
/// # Errors
///
/// ADLS targets are rejected with a [`ConfigError`] naming the entity, since
/// the Iceberg sink only handles local, S3 and GCS storage here.
pub fn iceberg_store_config(
    target: &Target,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
) -> FloeResult<IcebergStoreConfig> {
    let (warehouse_location, file_io_props) = match target {
        Target::Local { base_path, .. } => (base_path.to_string(), HashMap::new()),
        Target::Gcs { uri, .. } => (uri.to_string(), HashMap::new()),
        Target::S3 { storage, uri, .. } => {
            let mut props = HashMap::new();
            let configured_region = resolver
                .definition(storage)
                .and_then(|definition| definition.region);
            if let Some(region) = configured_region {
                // Both keys carry the same value.
                props.insert(S3_REGION.to_string(), region.clone());
                props.insert(CLIENT_REGION.to_string(), region);
            }
            (uri.to_string(), props)
        }
        Target::Adls { .. } => {
            return Err(Box::new(ConfigError(format!(
                "entity.name={} iceberg sink is only supported on local, s3, or gcs storage for now",
                entity.name
            ))));
        }
    };
    Ok(IcebergStoreConfig {
        warehouse_location,
        file_io_props,
    })
}