floe-core 0.3.6

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use polars::prelude::DataFrame;
use std::collections::HashSet;
use std::time::Instant;

use crate::errors::RunError;
use crate::io::format::AcceptedMergeMetrics;
use crate::io::storage::Target;
use crate::{config, FloeResult};

use super::{shared, MergeBackend, MergeExecutionContext};

struct DeltaMergeBackend;

pub(crate) fn execute_merge_scd1_with_runtime(
    runtime: &tokio::runtime::Runtime,
    source_df: &mut DataFrame,
    target: &Target,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
    partition_by: Option<Vec<String>>,
    target_file_size_bytes: Option<usize>,
) -> FloeResult<(
    i64,
    AcceptedMergeMetrics,
    crate::io::format::AcceptedSchemaEvolution,
    shared::DeltaMergePerfBreakdown,
)> {
    let ctx = MergeExecutionContext {
        runtime,
        target,
        resolver,
        entity,
        partition_by,
        target_file_size_bytes,
    };
    DeltaMergeBackend.execute_scd1(source_df, &ctx)
}

impl MergeBackend for DeltaMergeBackend {
    fn execute_scd1(
        &self,
        source_df: &mut DataFrame,
        ctx: &MergeExecutionContext<'_>,
    ) -> FloeResult<(
        i64,
        AcceptedMergeMetrics,
        crate::io::format::AcceptedSchemaEvolution,
        shared::DeltaMergePerfBreakdown,
    )> {
        let merge_start = Instant::now();
        let mut perf = shared::DeltaMergePerfBreakdown::default();
        let merge_key = shared::resolve_merge_key(ctx.entity)?;
        // Merge-key uniqueness is enforced upstream by check::UniqueTracker for merge_scd1,
        // so the writer can stay focused on sink-level merge execution.
        let loaded_table =
            shared::load_delta_table(ctx.runtime, ctx.target, ctx.resolver, ctx.entity)?;
        if loaded_table.is_none() {
            let outcome = shared::write_standard_delta_version_with_perf(
                ctx.runtime,
                source_df,
                ctx.target,
                ctx.resolver,
                ctx.entity,
                config::WriteMode::Append,
                ctx.partition_by.clone(),
                ctx.target_file_size_bytes,
            )?;
            perf.conversion_ms = outcome.perf.conversion_ms;
            perf.commit_ms = outcome.perf.commit_ms;
            return Ok((
                outcome.version,
                AcceptedMergeMetrics {
                    merge_key,
                    inserted_count: source_df.height() as u64,
                    updated_count: 0,
                    closed_count: None,
                    unchanged_count: None,
                    target_rows_before: 0,
                    target_rows_after: source_df.height() as u64,
                    merge_elapsed_ms: merge_start.elapsed().as_millis() as u64,
                },
                outcome.schema_evolution,
                perf,
            ));
        }

        let table = loaded_table.expect("checked is_some");
        let conversion_start = Instant::now();
        let source_batch = shared::source_record_batch(source_df, ctx.entity)?;
        perf.conversion_ms = conversion_start.elapsed().as_millis() as u64;
        let schema_evolution = shared::plan_merge_delta_schema_evolution(
            ctx.runtime,
            &source_batch,
            ctx.target,
            ctx.resolver,
            ctx.entity,
            config::WriteMode::MergeScd1,
            &[],
        )?;
        let target_schema_columns = shared::delta_schema_columns(&table)?;
        shared::validate_merge_schema_compatibility(
            &target_schema_columns,
            source_df,
            &ctx.entity.name,
            schema_evolution.merge_schema,
        )?;
        let source_df_build_start = Instant::now();
        let source = shared::source_as_datafusion_df_from_batch(source_batch, &ctx.entity.name)?;
        perf.source_df_build_ms = source_df_build_start.elapsed().as_millis() as u64;
        let source_columns = source_df
            .get_column_names()
            .iter()
            .map(|name| name.to_string())
            .collect::<Vec<_>>();
        let merge_key_set = merge_key.iter().map(String::as_str).collect::<HashSet<_>>();
        let ignore_columns = shared::resolve_merge_ignore_columns(ctx.entity)?;
        let update_columns = source_columns
            .iter()
            .filter(|name| {
                !merge_key_set.contains(name.as_str()) && !ignore_columns.contains(name.as_str())
            })
            .cloned()
            .collect::<Vec<_>>();
        let predicate = shared::merge_predicate_sql(&merge_key);
        let merge_exec_start = Instant::now();
        let merge_result = ctx.runtime.block_on(async move {
            let mut merge = table
                .merge(source, predicate)
                .with_source_alias("source")
                .with_target_alias("target")
                .with_merge_schema(schema_evolution.merge_schema);
            if !update_columns.is_empty() {
                let update_cols = update_columns.clone();
                merge = merge.when_matched_update(|update| {
                    update_cols.iter().fold(update, |builder, column| {
                        builder.update(
                            shared::qualified_column("target", column),
                            shared::qualified_column("source", column),
                        )
                    })
                })?;
            }
            let insert_cols = source_columns.clone();
            merge = merge.when_not_matched_insert(|insert| {
                insert_cols.iter().fold(insert, |builder, column| {
                    builder.set(
                        shared::qualified_column("target", column),
                        shared::qualified_column("source", column),
                    )
                })
            })?;
            merge.await
        });
        perf.merge_exec_ms = merge_exec_start.elapsed().as_millis() as u64;
        let (table, merge_metrics) =
            merge_result.map_err(|err| Box::new(RunError(format!("delta merge failed: {err}"))))?;
        let version = table.version().ok_or_else(|| {
            Box::new(RunError(
                "delta table version missing after merge".to_string(),
            ))
        })?;
        let accepted_merge_metrics = shared::accepted_merge_metrics_from_delta(
            merge_key,
            &merge_metrics,
            merge_start.elapsed().as_millis() as u64,
        );
        Ok((
            version,
            accepted_merge_metrics,
            schema_evolution.summary,
            perf,
        ))
    }
}