// floe-core 0.3.6
//
// Core library for Floe, a YAML-driven technical ingestion tool.
// (See the crate documentation for details.)
use std::time::Instant;

use crate::checks::normalize::resolve_normalize_strategy;
use crate::{check, config, io, report, warnings, ConfigError, FloeResult};

use super::super::output::{validate_rejected_target, write_rejected_raw_output};
use super::ResolvedEntityTargets;
use crate::run::RunContext;
use io::format::{InputAdapter, InputFile, LocalInputFile};
use io::storage::inputs::localize_input;

/// An input file that passed precheck and is ready for row processing.
pub(super) struct PrecheckedInput {
    /// Timer started when this file's processing began; presumably callers keep using it
    /// for the file's total elapsed time — confirm at the call site.
    pub(super) file_timer: Instant,
    /// Local copy of the input (downloaded just in time when the source is remote).
    pub(super) input_file: LocalInputFile,
    /// Column names read from the file by the input adapter.
    pub(super) input_columns: Vec<String>,
    /// Planned schema-mismatch outcome for this file (neither rejected nor aborted).
    pub(super) mismatch: check::MismatchOutcome,
}

/// Outcome of prechecking an entity's input files.
pub(super) struct PrecheckResult {
    /// `true` when a file was aborted during precheck; remaining files were not examined.
    pub(super) abort_run: bool,
    /// Files that passed precheck, in listing order.
    pub(super) prechecked: Vec<PrecheckedInput>,
}

/// Everything `run_precheck` borrows from the surrounding entity run.
///
/// The read-only inputs come first; the mutable accumulators that precheck appends to
/// (storage client cache, reports, timings, totals) come last.
pub(super) struct PrecheckContext<'ctx> {
    /// The current run (id, storage resolver).
    pub(super) context: &'ctx RunContext,
    /// Configuration of the entity being ingested.
    pub(super) entity: &'ctx config::EntityConfig,
    /// Format-specific reader used to obtain each file's input columns.
    pub(super) input_adapter: &'ctx dyn InputAdapter,
    /// Declared columns after normalization.
    pub(super) normalized_columns: &'ctx [config::ColumnConfig],
    /// Resolved source/sink targets for the entity.
    pub(super) resolved_targets: &'ctx ResolvedEntityTargets,
    /// Where inputs are archived, if archiving is configured.
    pub(super) archive_target: Option<&'ctx io::storage::Target>,
    /// Scratch directory for localized downloads, if one was created.
    pub(super) temp_dir: Option<&'ctx tempfile::TempDir>,
    /// Receives per-file start/finish events.
    pub(super) observer: &'ctx dyn crate::run::events::RunObserver,
    /// Storage clients used for downloads, rejected writes and archiving.
    pub(super) cloud: &'ctx mut io::storage::CloudClient,
    /// Reports for files that finish during precheck are appended here.
    pub(super) file_reports: &'ctx mut Vec<report::FileReport>,
    /// Elapsed milliseconds for files that finish during precheck are appended here.
    pub(super) file_timings_ms: &'ctx mut Vec<Option<u64>>,
    /// Run-level counters updated for files that finish during precheck.
    pub(super) totals: &'ctx mut report::ResultsTotals,
}

pub(super) fn run_precheck(
    context: PrecheckContext<'_>,
    input_files: Vec<InputFile>,
) -> FloeResult<PrecheckResult> {
    let PrecheckContext {
        context,
        entity,
        input_adapter,
        normalized_columns,
        resolved_targets,
        archive_target,
        temp_dir,
        cloud,
        observer,
        file_reports,
        file_timings_ms,
        totals,
    } = context;
    let mut abort_run = false;
    let mut prechecked_inputs = Vec::with_capacity(input_files.len());
    let normalize_strategy = resolve_normalize_strategy(entity)?;
    let mismatch_columns =
        check::resolve_mismatch_columns(entity, normalized_columns, normalize_strategy.as_deref())?;

    // Use the resolved source target's storage name so that default-remote configs
    // (where source.storage is unset but the resolver default is S3/GCS/ADLS) get
    // the correct client instead of falling back to "local".
    let storage_name = resolved_targets.source.storage();
    let temp_dir_path = temp_dir.as_ref().map(|d| d.path());

    for listed_file in input_files {
        let file_timer = Instant::now();
        observer.on_event(crate::run::events::RunEvent::FileStarted {
            run_id: context.run_id.clone(),
            entity: entity.name.clone(),
            input: listed_file.source_uri.clone(),
            ts_ms: crate::run::events::event_time_ms(),
        });

        // JIT download: resolve listed InputFile to a LocalInputFile before any reads.
        let listed_source_uri = listed_file.source_uri.clone();
        let storage_client = cloud
            .client_for_context(&context.storage_resolver, storage_name, &entity.name)
            .ok()
            .map(|c| c as &dyn io::storage::StorageClient);
        let local_file = match localize_input(listed_file, storage_client, temp_dir_path) {
            Ok(f) => f,
            Err(err) => {
                let status = if entity.policy.severity == "abort" {
                    report::FileStatus::Aborted
                } else {
                    report::FileStatus::Rejected
                };
                let mismatch_action = if status == report::FileStatus::Aborted {
                    report::MismatchAction::Aborted
                } else {
                    report::MismatchAction::RejectedFile
                };
                let file_report = report::FileReport {
                    input_file: listed_source_uri.clone(),
                    status,
                    row_count: 0,
                    accepted_count: 0,
                    rejected_count: 0,
                    mismatch: report::FileMismatch {
                        declared_columns_count: mismatch_columns.len() as u64,
                        input_columns_count: 0,
                        missing_columns: Vec::new(),
                        extra_columns: Vec::new(),
                        mismatch_action,
                        error: Some(report::MismatchIssue {
                            rule: "download_error".to_string(),
                            message: format!("entity.name={} {err}", entity.name),
                        }),
                        warning: None,
                    },
                    output: report::FileOutput {
                        accepted_path: None,
                        rejected_path: None,
                        errors_path: None,
                        archived_path: None,
                    },
                    validation: report::FileValidation {
                        errors: 1,
                        warnings: 0,
                        rules: Vec::new(),
                    },
                };
                totals.errors_total += 1;
                file_reports.push(file_report);
                file_timings_ms.push(Some(file_timer.elapsed().as_millis() as u64));
                observer.on_event(crate::run::events::RunEvent::FileFinished {
                    run_id: context.run_id.clone(),
                    entity: entity.name.clone(),
                    input: listed_source_uri.clone(),
                    status: if status == report::FileStatus::Aborted {
                        "aborted"
                    } else {
                        "rejected"
                    }
                    .to_string(),
                    rows: 0,
                    accepted: 0,
                    rejected: 0,
                    elapsed_ms: file_timer.elapsed().as_millis() as u64,
                    ts_ms: crate::run::events::event_time_ms(),
                });
                if status == report::FileStatus::Aborted {
                    abort_run = true;
                    break;
                }
                continue;
            }
        };

        let input_columns =
            match input_adapter.read_input_columns(entity, &local_file, normalized_columns) {
                Ok(columns) => columns,
                Err(error) => {
                    let status = if entity.policy.severity == "abort" {
                        report::FileStatus::Aborted
                    } else {
                        report::FileStatus::Rejected
                    };
                    let mismatch_action = if status == report::FileStatus::Aborted {
                        report::MismatchAction::Aborted
                    } else {
                        report::MismatchAction::RejectedFile
                    };

                    let rejected_path = resolved_targets
                        .rejected
                        .as_ref()
                        .map(|target| {
                            write_rejected_raw_output(
                                target,
                                &local_file,
                                temp_dir_path,
                                cloud,
                                &context.storage_resolver,
                                entity,
                            )
                        })
                        .transpose()?;

                    let file_report = report::FileReport {
                        input_file: local_file.file.source_uri.clone(),
                        status,
                        row_count: 0,
                        accepted_count: 0,
                        rejected_count: 0,
                        mismatch: report::FileMismatch {
                            declared_columns_count: mismatch_columns.len() as u64,
                            input_columns_count: 0,
                            missing_columns: Vec::new(),
                            extra_columns: Vec::new(),
                            mismatch_action,
                            error: Some(report::MismatchIssue {
                                rule: error.rule,
                                message: format!("entity.name={} {}", entity.name, error.message),
                            }),
                            warning: None,
                        },
                        output: report::FileOutput {
                            accepted_path: None,
                            rejected_path,
                            errors_path: None,
                            archived_path: None,
                        },
                        validation: report::FileValidation {
                            errors: 1,
                            warnings: 0,
                            rules: Vec::new(),
                        },
                    };

                    totals.errors_total += 1;
                    file_reports.push(file_report);
                    file_timings_ms.push(Some(file_timer.elapsed().as_millis() as u64));
                    observer.on_event(crate::run::events::RunEvent::FileFinished {
                        run_id: context.run_id.clone(),
                        entity: entity.name.clone(),
                        input: local_file.file.source_uri.clone(),
                        status: if status == report::FileStatus::Aborted {
                            "aborted"
                        } else {
                            "rejected"
                        }
                        .to_string(),
                        rows: 0,
                        accepted: 0,
                        rejected: 0,
                        elapsed_ms: file_timer.elapsed().as_millis() as u64,
                        ts_ms: crate::run::events::event_time_ms(),
                    });

                    if local_file.is_ephemeral {
                        let _ = std::fs::remove_file(&local_file.local_path);
                    }
                    if status == report::FileStatus::Aborted {
                        abort_run = true;
                        break;
                    }
                    continue;
                }
            };

        let mismatch = check::plan_schema_mismatch(entity, &mismatch_columns, &input_columns)?;
        if let Some(message) = mismatch.report.warning.as_deref() {
            warnings::emit(
                &context.run_id,
                Some(&entity.name),
                Some(&local_file.file.source_uri),
                Some("schema_mismatch_warn_override"),
                message,
            );
        }

        if mismatch.rejected || mismatch.aborted {
            let accepted_path = None;
            let errors_path = None;
            let row_count = 0;

            validate_rejected_target(entity, if mismatch.aborted { "abort" } else { "reject" })?;
            let rejected_target = resolved_targets.rejected.as_ref().ok_or_else(|| {
                Box::new(ConfigError(format!(
                    "entity.name={} sink.rejected.storage is required for rejection",
                    entity.name
                )))
            })?;
            let rejected_path = Some(write_rejected_raw_output(
                rejected_target,
                &local_file,
                temp_dir_path,
                cloud,
                &context.storage_resolver,
                entity,
            )?);

            let archived_path = io::storage::ops::archive_input(
                cloud,
                &context.storage_resolver,
                &context.run_id,
                entity,
                archive_target,
                &local_file.file,
            )?;

            let status = if mismatch.aborted {
                report::FileStatus::Aborted
            } else {
                report::FileStatus::Rejected
            };
            let accepted_count = 0;
            let rejected_count = row_count;
            let mismatch_report = mismatch.report;
            let errors = mismatch.errors;
            let warnings = mismatch.warnings;

            let file_report = report::FileReport {
                input_file: local_file.file.source_uri.clone(),
                status,
                row_count,
                accepted_count,
                rejected_count,
                mismatch: mismatch_report,
                output: report::FileOutput {
                    accepted_path,
                    rejected_path,
                    errors_path,
                    archived_path,
                },
                validation: report::FileValidation {
                    errors,
                    warnings,
                    rules: Vec::new(),
                },
            };

            totals.rows_total += row_count;
            totals.accepted_total += accepted_count;
            totals.rejected_total += rejected_count;
            totals.errors_total += errors;
            totals.warnings_total += warnings;
            file_reports.push(file_report);
            file_timings_ms.push(Some(file_timer.elapsed().as_millis() as u64));
            observer.on_event(crate::run::events::RunEvent::FileFinished {
                run_id: context.run_id.clone(),
                entity: entity.name.clone(),
                input: local_file.file.source_uri.clone(),
                status: if status == report::FileStatus::Aborted {
                    "aborted"
                } else {
                    "rejected"
                }
                .to_string(),
                rows: row_count,
                accepted: accepted_count,
                rejected: rejected_count,
                elapsed_ms: file_timer.elapsed().as_millis() as u64,
                ts_ms: crate::run::events::event_time_ms(),
            });

            if local_file.is_ephemeral {
                let _ = std::fs::remove_file(&local_file.local_path);
            }
            if mismatch.aborted {
                abort_run = true;
                break;
            }
            continue;
        }

        prechecked_inputs.push(PrecheckedInput {
            input_file: local_file,
            input_columns,
            mismatch,
            file_timer,
        });
    }

    Ok(PrecheckResult {
        prechecked: prechecked_inputs,
        abort_run,
    })
}