floe-core 0.4.0

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use polars::prelude::DataFrame;
use std::path::Path;
use std::time::Instant;

use crate::errors::RunError;
use crate::io::format::{
    AcceptedMergeMetrics, AcceptedWriteMetrics, AcceptedWriteOutput, AcceptedWritePerfBreakdown,
    AcceptedWriteRequest, CatalogRegistration,
};
use crate::io::storage::{object_store, Target};
use crate::io::unique_seed::seed_from_batches;
use crate::io::write::sink_format::{SeedContext, SinkFormat};
use crate::io::write::strategy::merge::{scd1, scd2, shared};
use crate::{check, config, FloeResult};

mod commit_metrics;
mod options;
pub(crate) mod record_batch;
mod unity;

use self::commit_metrics::delta_commit_metrics_for_target;
pub use self::commit_metrics::{
    delta_commit_metrics_from_log_bytes, delta_commit_metrics_from_log_bytes_best_effort,
    parse_delta_commit_add_stats_bytes, DeltaCommitAddStats,
};
pub use self::options::{delta_write_runtime_options, DeltaWriteRuntimeOptions};

/// Delta Lake implementation of [`SinkFormat`] used for accepted-data writes.
pub(crate) struct DeltaSinkFormat;

/// Shared instance of [`DeltaSinkFormat`]; the type is a stateless unit struct.
pub(crate) static DELTA_SINK_FORMAT: DeltaSinkFormat = DeltaSinkFormat;

/// Outcome of one Delta overwrite/append/merge, together with the metrics read back from the
/// commit that it produced.
#[derive(Debug)]
struct DeltaWriteResult {
    /// Delta table version returned by the write or merge step.
    version: i64,
    /// File count returned by the commit-metrics reader (optional).
    files_written: Option<u64>,
    /// Part-file entries returned by the commit-metrics reader.
    part_files: Vec<String>,
    /// Write metrics returned by the commit-metrics reader.
    metrics: AcceptedWriteMetrics,
    /// Merge statistics; `None` for overwrite/append, `Some` for SCD1/SCD2 merges.
    merge: Option<AcceptedMergeMetrics>,
    /// Schema-evolution outcome reported by the write or merge step.
    schema_evolution: crate::io::format::AcceptedSchemaEvolution,
    /// Per-phase timings (conversion, merge, commit, metrics read) in milliseconds.
    perf: AcceptedWritePerfBreakdown,
}

/// Writes `df` to the Delta table described by `target` using `mode` and returns the Delta
/// table version produced by the write.
///
/// This is a thin wrapper over `write_delta_table_with_metrics` that discards everything but
/// the version number.
///
/// # Errors
///
/// Propagates any failure from preparing the local directory, deriving runtime options,
/// building the async runtime, performing the write/merge, or reading the commit metrics.
pub fn write_delta_table(
    df: &mut DataFrame,
    target: &Target,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
    mode: config::WriteMode,
) -> FloeResult<i64> {
    write_delta_table_with_metrics(df, target, resolver, entity, mode)
        .map(|outcome| outcome.version)
}

/// Writes `df` to the Delta table at `target` with `mode` and collects post-commit metrics.
///
/// Overwrite/append use the standard Delta writer; `MergeScd1`/`MergeScd2` use the matching
/// merge strategy. Afterwards the commit for the returned version is read back to obtain the
/// file count, part files and write metrics. A single current-thread Tokio runtime is built
/// here and shared by the write step and the metrics read.
///
/// # Errors
///
/// Returns an error if the local base directory cannot be created, the entity's Delta runtime
/// options cannot be derived, the Tokio runtime cannot be built, the write/merge fails, or the
/// commit metrics cannot be read.
fn write_delta_table_with_metrics(
    df: &mut DataFrame,
    target: &Target,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
    mode: config::WriteMode,
) -> FloeResult<DeltaWriteResult> {
    // Make sure the local table root exists before anything is written into it.
    if let Target::Local { base_path, .. } = target {
        std::fs::create_dir_all(Path::new(base_path))?;
    }

    let options = delta_write_runtime_options(entity)?;
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|err| Box::new(RunError(format!("delta runtime init failed: {err}"))))?;

    // Each mode fills in the phases it measures; unmeasured phases keep their default.
    let mut perf = AcceptedWritePerfBreakdown::default();

    let (version, merge, schema_evolution) = match mode {
        config::WriteMode::Overwrite | config::WriteMode::Append => {
            let outcome = shared::write_standard_delta_version_with_perf(
                &rt,
                df,
                target,
                resolver,
                entity,
                mode,
                options.partition_by.clone(),
                options.target_file_size_bytes,
            )?;
            perf.conversion_ms = Some(outcome.perf.conversion_ms);
            perf.commit_ms = Some(outcome.perf.commit_ms);
            (outcome.version, None, outcome.schema_evolution)
        }
        config::WriteMode::MergeScd1 => {
            let (committed, merge_metrics, evolution, timings) =
                scd1::execute_merge_scd1_with_runtime(
                    &rt,
                    df,
                    target,
                    resolver,
                    entity,
                    options.partition_by.clone(),
                    options.target_file_size_bytes,
                )?;
            perf.conversion_ms = Some(timings.conversion_ms);
            perf.source_df_build_ms = Some(timings.source_df_build_ms);
            perf.merge_exec_ms = Some(timings.merge_exec_ms);
            perf.commit_ms = Some(timings.commit_ms);
            (committed, Some(merge_metrics), evolution)
        }
        config::WriteMode::MergeScd2 => {
            let (committed, merge_metrics, evolution, timings) =
                scd2::execute_merge_scd2_with_runtime(
                    &rt,
                    df,
                    target,
                    resolver,
                    entity,
                    options.partition_by.clone(),
                    options.target_file_size_bytes,
                )?;
            perf.conversion_ms = Some(timings.conversion_ms);
            perf.source_df_build_ms = Some(timings.source_df_build_ms);
            perf.merge_exec_ms = Some(timings.merge_exec_ms);
            perf.commit_ms = Some(timings.commit_ms);
            (committed, Some(merge_metrics), evolution)
        }
    };

    // Read back the commit that was just produced and time that read separately.
    let metrics_started = Instant::now();
    let (files_written, part_files, metrics) = delta_commit_metrics_for_target(
        &rt,
        target,
        resolver,
        entity,
        version,
        options.small_file_threshold_bytes,
    )?;
    perf.metrics_read_ms = Some(metrics_started.elapsed().as_millis() as u64);

    Ok(DeltaWriteResult {
        version,
        files_written,
        part_files,
        metrics,
        merge,
        schema_evolution,
        perf,
    })
}

impl SinkFormat for DeltaSinkFormat {
    fn format_name(&self) -> &'static str {
        "delta"
    }

    fn supported_modes(&self) -> &'static [config::WriteMode] {
        &[
            config::WriteMode::Overwrite,
            config::WriteMode::Append,
            config::WriteMode::MergeScd1,
            config::WriteMode::MergeScd2,
        ]
    }

    fn supported_storages(&self) -> &'static [&'static str] {
        &["local", "s3", "gcs", "adls"]
    }

    fn write(&self, req: AcceptedWriteRequest<'_>) -> FloeResult<AcceptedWriteOutput> {
        let AcceptedWriteRequest {
            target,
            df,
            mode,
            resolver,
            catalogs,
            entity,
            ..
        } = req;
        let result = write_delta_table_with_metrics(df, target, resolver, entity, mode)?;

        // Post-write Unity Catalog registration (no-op if delta.catalog is not configured).
        let table_uri = target_uri(target);
        let unity_output = match catalogs.resolve_delta_target(entity, &entity.sink.accepted)? {
            Some(resolved) => {
                let cfg = unity::UnityCatalogConfig::from_resolved(&resolved)?;
                let runtime = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(|err| {
                        Box::new(RunError(format!(
                            "unity catalog runtime init failed: {err}"
                        )))
                    })?;
                runtime
                    .block_on(unity::register_unity_table(&cfg, &table_uri))
                    .map_err(|err| {
                        Box::new(RunError(format!(
                            "entity.name={} unity catalog registration failed: {err}",
                            entity.name
                        )))
                    })?;
                Some(resolved)
            }
            None => None,
        };

        let catalog = unity_output
            .as_ref()
            .map(|r| CatalogRegistration::UnityDelta {
                catalog_name: r.catalog_name.clone(),
                schema: r.schema.clone(),
                table: r.table.clone(),
            });
        Ok(AcceptedWriteOutput {
            files_written: result.files_written,
            parts_written: 1,
            part_files: result.part_files,
            table_version: Some(result.version),
            snapshot_id: None,
            table_root_uri: unity_output.as_ref().map(|_| table_uri),
            catalog,
            metrics: result.metrics,
            merge: result.merge,
            schema_evolution: result.schema_evolution,
            perf: Some(result.perf),
        })
    }

    fn seed_unique_tracker(
        &self,
        tracker: &mut check::UniqueTracker,
        ctx: &mut SeedContext<'_>,
    ) -> FloeResult<()> {
        let store = object_store::delta_store_config(ctx.target, ctx.resolver, ctx.entity)?;
        let builder = deltalake::table::builder::DeltaTableBuilder::from_url(store.table_url)
            .map_err(|err| Box::new(RunError(format!("delta builder failed: {err}"))))?
            .with_storage_options(store.storage_options);
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .map_err(|err| Box::new(RunError(format!("delta runtime init failed: {err}"))))?;
        let table = runtime.block_on(async move { builder.load().await });
        let table = match table {
            Ok(table) => table,
            Err(deltalake::DeltaTableError::NotATable(_)) => return Ok(()),
            Err(err) => return Err(Box::new(RunError(format!("delta load failed: {err}")))),
        };
        let scan_cols = ctx.scan_cols.to_vec();
        let batches = runtime
            .block_on(async {
                let (_t, stream) = table.scan_table().with_columns(scan_cols).await?;
                deltalake::operations::collect_sendable_stream(stream).await
            })
            .map_err(|err| Box::new(RunError(format!("delta scan failed: {err}"))))?;
        seed_from_batches(tracker, batches, ctx.rename_back)
    }
}

/// Returns the table location URI used for catalog registration.
///
/// Local targets yield their base path, and S3/GCS targets yield their URI unchanged. An ADLS
/// URI whose scheme is `abfs://` is rewritten to `abfss://`, because Unity Catalog external
/// locations require the secure scheme. Any other ADLS URI is returned as-is.
fn target_uri(target: &Target) -> String {
    match target {
        Target::Local { base_path, .. } => base_path.clone(),
        Target::S3 { uri, .. } | Target::Gcs { uri, .. } => uri.clone(),
        // Floe stores ADLS targets with abfs:// internally. Only the leading scheme is
        // rewritten; an "abfs://" substring elsewhere in the URI must not be touched.
        Target::Adls { uri, .. } => match uri.strip_prefix("abfs://") {
            Some(rest) => format!("abfss://{rest}"),
            None => uri.clone(),
        },
    }
}