floe-core 0.3.1

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use std::path::Path;

use polars::prelude::DataFrame;

use crate::errors::RunError;
use crate::{check, config, io, ConfigError, FloeResult};

use io::format::{self, InputFile};
use io::storage::Target;

/// Inputs consumed by [`write_accepted_output`].
///
/// The function takes this struct by value and forwards every field
/// unchanged: `adapter` goes to `io::write::accepted::write_with_adapter`
/// directly, and the remaining fields populate the `AcceptedWriteRequest`
/// passed with it.
pub(super) struct AcceptedOutputContext<'a> {
    /// Sink adapter that the write is dispatched through.
    pub(super) adapter: &'a dyn format::AcceptedSinkAdapter,
    /// Storage target forwarded to the write request.
    pub(super) target: &'a Target,
    /// Data frame forwarded to the write request; borrowed mutably, so the
    /// writer is allowed to modify it.
    pub(super) df: &'a mut DataFrame,
    /// NOTE(review): presumably the base name used for the output file —
    /// confirm against the sink adapters.
    pub(super) output_stem: &'a str,
    /// Optional directory path forwarded to the write request.
    /// NOTE(review): presumably a staging location for temporary files — confirm.
    pub(super) temp_dir: Option<&'a Path>,
    /// Cloud client forwarded to the write request (mutable borrow).
    pub(super) cloud: &'a mut io::storage::CloudClient,
    /// Storage resolver forwarded to the write request.
    pub(super) resolver: &'a config::StorageResolver,
    /// Catalog resolver forwarded to the write request.
    pub(super) catalogs: &'a config::CatalogResolver,
    /// Configuration of the entity being written.
    pub(super) entity: &'a config::EntityConfig,
    /// Write mode forwarded to the write request.
    pub(super) mode: config::WriteMode,
}

/// Inputs consumed by [`write_rejected_output`].
///
/// The function takes this struct by value, calls `write_rejected` on
/// `adapter`, and forwards every other field unchanged inside a
/// `format::RejectedWriteRequest`.
pub(super) struct RejectedOutputContext<'a> {
    /// Sink adapter whose `write_rejected` method performs the write.
    pub(super) adapter: &'a dyn format::RejectedSinkAdapter,
    /// Storage target forwarded to the write request.
    pub(super) target: &'a Target,
    /// Data frame forwarded to the write request; borrowed mutably, so the
    /// writer is allowed to modify it.
    pub(super) df: &'a mut DataFrame,
    /// NOTE(review): presumably the stem of the source file the rejected
    /// rows came from — confirm against the sink adapters.
    pub(super) source_stem: &'a str,
    /// Optional directory path forwarded to the write request.
    /// NOTE(review): presumably a staging location for temporary files — confirm.
    pub(super) temp_dir: Option<&'a Path>,
    /// Cloud client forwarded to the write request (mutable borrow).
    pub(super) cloud: &'a mut io::storage::CloudClient,
    /// Storage resolver forwarded to the write request.
    pub(super) resolver: &'a config::StorageResolver,
    /// Configuration of the entity being written.
    pub(super) entity: &'a config::EntityConfig,
    /// Write mode forwarded to the write request.
    pub(super) mode: config::WriteMode,
}

/// Writes accepted output through the sink adapter carried by `context`.
///
/// The context is consumed: its `adapter` is passed as the first argument of
/// `io::write::accepted::write_with_adapter`, and every other field is moved
/// unchanged into the `AcceptedWriteRequest` passed as the second argument.
///
/// # Errors
///
/// Returns whatever error `write_with_adapter` reports.
pub(super) fn write_accepted_output(
    context: AcceptedOutputContext<'_>,
) -> FloeResult<format::AcceptedWriteOutput> {
    let sink = context.adapter;
    let request = io::write::accepted::AcceptedWriteRequest {
        target: context.target,
        df: context.df,
        output_stem: context.output_stem,
        temp_dir: context.temp_dir,
        cloud: context.cloud,
        resolver: context.resolver,
        catalogs: context.catalogs,
        entity: context.entity,
        mode: context.mode,
    };
    io::write::accepted::write_with_adapter(sink, request)
}
/// Writes rejected output through the sink adapter carried by `context`.
///
/// The context is consumed: every field except `adapter` is moved unchanged
/// into a `format::RejectedWriteRequest`, which is then handed to the
/// adapter's `write_rejected` method.
///
/// # Errors
///
/// Returns whatever error the adapter's `write_rejected` reports.
pub(super) fn write_rejected_output(context: RejectedOutputContext<'_>) -> FloeResult<String> {
    let sink = context.adapter;
    let request = format::RejectedWriteRequest {
        target: context.target,
        df: context.df,
        source_stem: context.source_stem,
        temp_dir: context.temp_dir,
        cloud: context.cloud,
        resolver: context.resolver,
        entity: context.entity,
        mode: context.mode,
    };
    sink.write_rejected(request)
}

/// Writes the raw rejected input file to `target`.
///
/// Delegates to `io::storage::output::write_output` with
/// `OutputPlacement::Output`, using the input file's `source_name` as the
/// output name. The callback given to `write_output` calls
/// `io::write::write_rejected_raw` with the input's `source_local_path` and
/// the path that `write_output` supplies.
///
/// # Errors
///
/// Returns any error from `write_output` or from `write_rejected_raw`.
pub(super) fn write_rejected_raw_output(
    target: &Target,
    input_file: &InputFile,
    temp_dir: Option<&Path>,
    cloud: &mut io::storage::CloudClient,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
) -> FloeResult<String> {
    let placement = io::storage::OutputPlacement::Output;
    let output_name = &input_file.source_name;
    let raw_source = &input_file.source_local_path;

    io::storage::output::write_output(
        target,
        placement,
        output_name,
        temp_dir,
        cloud,
        resolver,
        entity,
        |destination| {
            io::write::write_rejected_raw(raw_source, destination)?;
            Ok(())
        },
    )
}

/// Writes the JSON error report for a rejected source to `target`.
///
/// The report file name is built by `io::storage::paths::build_output_filename`
/// from `source_stem`, the suffix `_reject_errors`, and the extension `json`.
/// The write is delegated to `io::storage::output::write_output` with
/// `OutputPlacement::Sibling`; its callback calls
/// `io::write::write_error_report` with the supplied path and `errors_json`.
///
/// # Errors
///
/// Returns any error from `write_output` or from `write_error_report`.
pub(super) fn write_error_report_output(
    target: &Target,
    source_stem: &str,
    errors_json: &[Option<String>],
    temp_dir: Option<&Path>,
    cloud: &mut io::storage::CloudClient,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
) -> FloeResult<String> {
    let placement = io::storage::OutputPlacement::Sibling;
    let report_name =
        io::storage::paths::build_output_filename(source_stem, "_reject_errors", "json");

    io::storage::output::write_output(
        target,
        placement,
        &report_name,
        temp_dir,
        cloud,
        resolver,
        entity,
        |destination| {
            io::write::write_error_report(destination, errors_json)?;
            Ok(())
        },
    )
}

/// Returns the entity's `sink.rejected` target, or an error if none is set.
///
/// `severity` is only used to build the error message.
///
/// # Errors
///
/// Returns a boxed `ConfigError` reading
/// `sink.rejected is required for {severity} severity` when
/// `entity.sink.rejected` is `None`.
pub(super) fn validate_rejected_target<'a>(
    entity: &'a config::EntityConfig,
    severity: &str,
) -> FloeResult<&'a config::SinkTarget> {
    match entity.sink.rejected.as_ref() {
        Some(rejected_target) => Ok(rejected_target),
        None => {
            let message = format!("sink.rejected is required for {severity} severity");
            Err(Box::new(ConfigError(message)).into())
        }
    }
}

/// Adds the two rejection columns produced by `check::rejected_error_columns`
/// to `df`.
///
/// `errors_per_row` and `include_all_rows` are passed straight to
/// `check::rejected_error_columns`, which yields a row-index column and an
/// errors column. Both are added with `DataFrame::with_column`, row index
/// first.
///
/// # Errors
///
/// Returns a boxed `RunError` if either column cannot be added. The message
/// names the failing column (`__floe_row_index` or `__floe_errors`). The
/// row-index column is added before the errors column is attempted.
pub(super) fn append_rejection_columns(
    df: &mut DataFrame,
    errors_per_row: &[Option<String>],
    include_all_rows: bool,
) -> FloeResult<()> {
    let (index_column, error_column) =
        check::rejected_error_columns(errors_per_row, include_all_rows);

    if let Err(err) = df.with_column(index_column) {
        let message = format!("failed to add __floe_row_index: {err}");
        return Err(Box::new(RunError(message)).into());
    }
    if let Err(err) = df.with_column(error_column) {
        let message = format!("failed to add __floe_errors: {err}");
        return Err(Box::new(RunError(message)).into());
    }
    Ok(())
}