floe-core 0.3.6

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_glue::config::Region as GlueRegion;
use aws_sdk_glue::types::{
    StorageDescriptor as GlueStorageDescriptor, TableInput as GlueTableInput,
};
use aws_sdk_glue::Client as GlueClient;

use crate::errors::RunError;
use crate::FloeResult;

use super::GlueIcebergCatalogConfig;

#[derive(Debug, Clone)]
pub(super) struct GlueTableState {
    pub(super) metadata_location: Option<String>,
    pub(super) version_id: Option<String>,
}

async fn build_glue_client(region: &str) -> FloeResult<GlueClient> {
    let region_provider =
        RegionProviderChain::first_try(GlueRegion::new(region.to_string())).or_default_provider();
    let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
        .region(region_provider)
        .load()
        .await;
    Ok(GlueClient::new(&config))
}

pub(super) async fn load_glue_table_state(
    glue_cfg: &GlueIcebergCatalogConfig,
) -> FloeResult<GlueTableState> {
    let client = build_glue_client(&glue_cfg.region).await?;
    let response = client
        .get_table()
        .database_name(glue_cfg.database.as_str())
        .name(glue_cfg.table.as_str())
        .send()
        .await;
    match response {
        Ok(output) => {
            let table = output.table().ok_or_else(|| {
                Box::new(RunError(format!(
                    "glue get_table returned no table for {}.{}",
                    glue_cfg.database, glue_cfg.table
                ))) as Box<dyn std::error::Error + Send + Sync>
            })?;
            let parameters = table.parameters();
            let metadata_location =
                parameters.and_then(|params| params.get("metadata_location").cloned());
            let iceberg_param = parameters
                .and_then(|params| params.get("table_type"))
                .map(|value| value.eq_ignore_ascii_case("ICEBERG"))
                .unwrap_or(false);
            if !iceberg_param || metadata_location.is_none() {
                return Err(Box::new(RunError(format!(
                    "glue table {}.{} exists but is not an Iceberg table managed by Floe (missing Iceberg parameters/metadata_location)",
                    glue_cfg.database, glue_cfg.table
                ))));
            }
            Ok(GlueTableState {
                metadata_location,
                version_id: table.version_id().map(ToOwned::to_owned),
            })
        }
        Err(err) => {
            if err
                .as_service_error()
                .is_some_and(|service_err| service_err.is_entity_not_found_exception())
            {
                return Ok(GlueTableState {
                    metadata_location: None,
                    version_id: None,
                });
            }
            Err(Box::new(RunError(format!(
                "glue get_table failed for {}.{}: {err}",
                glue_cfg.database, glue_cfg.table
            ))))
        }
    }
}

pub(super) async fn upsert_glue_table(
    glue_cfg: &GlueIcebergCatalogConfig,
    table_root_uri: &str,
    metadata_location: &str,
    version_id: Option<&str>,
) -> FloeResult<()> {
    let client = build_glue_client(&glue_cfg.region).await?;
    let table_input = build_glue_table_input(glue_cfg, table_root_uri, metadata_location);

    let create_result = client
        .create_table()
        .database_name(glue_cfg.database.as_str())
        .name(glue_cfg.table.as_str())
        .table_input(table_input.clone())
        .send()
        .await;
    match create_result {
        Ok(_) => return Ok(()),
        Err(err) => {
            if !err
                .as_service_error()
                .is_some_and(|service_err| service_err.is_already_exists_exception())
            {
                return Err(Box::new(RunError(format!(
                    "glue create_table failed for {}.{}: {err}",
                    glue_cfg.database, glue_cfg.table
                ))));
            }
        }
    }

    let mut update = client
        .update_table()
        .database_name(glue_cfg.database.as_str())
        .name(glue_cfg.table.as_str())
        .table_input(table_input)
        .skip_archive(true);
    if let Some(version_id) = version_id {
        update = update.version_id(version_id);
    }
    update.send().await.map_err(|err| {
        Box::new(RunError(format!(
            "glue update_table failed for {}.{}: {err}",
            glue_cfg.database, glue_cfg.table
        ))) as Box<dyn std::error::Error + Send + Sync>
    })?;
    Ok(())
}

fn build_glue_table_input(
    glue_cfg: &GlueIcebergCatalogConfig,
    table_root_uri: &str,
    metadata_location: &str,
) -> GlueTableInput {
    let storage_descriptor = GlueStorageDescriptor::builder()
        .location(table_root_uri)
        .build();

    GlueTableInput::builder()
        .name(glue_cfg.table.as_str())
        .table_type("EXTERNAL_TABLE")
        .storage_descriptor(storage_descriptor)
        .set_partition_keys(Some(Vec::new()))
        .parameters("table_type", "ICEBERG")
        .parameters("EXTERNAL", "TRUE")
        .parameters("metadata_location", metadata_location)
        .parameters("floe.iceberg.namespace", glue_cfg.namespace.as_str())
        .build()
        .expect("glue table input builder validated")
}