// floe-core 0.3.6
//
// Core library for Floe, a YAML-driven technical ingestion tool.
// Documentation
use std::collections::BTreeMap;
use std::path::PathBuf;

use crate::config::IncrementalMode;
use crate::io::format::InputFile;
use crate::report::{
    FileMismatch, FileOutput, FileReport, FileStatus, FileValidation, MismatchAction,
};
use crate::run::RunContext;
use crate::state::{
    read_entity_state, resolve_entity_state_path, validate_entity_state, write_entity_state_atomic,
    EntityFileState, EntityState,
};
use crate::{config, report, warnings, FloeResult};

/// Outcome of filtering a run's input files against the entity's incremental state.
///
/// Produced by `prepare_incremental_context`.
pub(super) struct IncrementalContext {
    /// Input files that still need to be ingested in this run.
    pub(super) pending_inputs: Vec<InputFile>,
    /// Reports for input files that were skipped because the state already records them.
    pub(super) skipped_reports: Vec<FileReport>,
    /// State update describing the pending files. `None` when `incremental_mode` is not
    /// `File`. Presumably committed by the caller once ingestion succeeds; confirm at call site.
    pub(super) pending_state: Option<PendingEntityState>,
}

pub(super) fn prepare_incremental_context(
    context: &RunContext,
    entity: &config::EntityConfig,
    input_files: Vec<InputFile>,
) -> FloeResult<IncrementalContext> {
    if entity.incremental_mode != IncrementalMode::File {
        return Ok(IncrementalContext {
            pending_inputs: input_files,
            skipped_reports: Vec::new(),
            pending_state: None,
        });
    }

    let resolved_state = resolve_entity_state_path(&context.storage_resolver, entity)?;
    let state_path = resolved_state.local_path.ok_or_else(|| {
        Box::new(crate::ConfigError(format!(
            "entity.name={} incremental_mode=file requires a local state path",
            entity.name
        ))) as Box<dyn std::error::Error + Send + Sync>
    })?;
    let existing = read_entity_state(&state_path)?
        .map(|state| validate_entity_state(entity, state))
        .transpose()?
        .unwrap_or_else(|| EntityState::new(&entity.name));

    let mut pending_inputs = Vec::new();
    let mut skipped_reports = Vec::new();
    let mut pending_entries = BTreeMap::new();

    for input_file in input_files {
        match existing.files.get(&input_file.source_uri) {
            Some(recorded) if file_state_matches(recorded, &input_file) => {
                skipped_reports.push(build_skipped_report(input_file.source_uri.clone(), None));
            }
            Some(recorded) => {
                let message = format!(
                    "entity.name={} incremental_mode=file skipping previously ingested file with changed metadata: {} (recorded size={:?}, mtime={:?}; current size={:?}, mtime={:?})",
                    entity.name,
                    input_file.source_uri,
                    recorded.size,
                    recorded.mtime,
                    input_file.source_size,
                    input_file.source_mtime,
                );
                warnings::emit(
                    &context.run_id,
                    Some(&entity.name),
                    Some(&input_file.source_uri),
                    Some("incremental_file_changed"),
                    &message,
                );
                skipped_reports.push(build_skipped_report(
                    input_file.source_uri.clone(),
                    Some(message),
                ));
            }
            None => {
                pending_entries.insert(
                    input_file.source_uri.clone(),
                    PendingFileState {
                        size: input_file.source_size,
                        mtime: input_file.source_mtime.clone(),
                    },
                );
                pending_inputs.push(input_file);
            }
        }
    }

    Ok(IncrementalContext {
        pending_inputs,
        skipped_reports,
        pending_state: Some(PendingEntityState {
            path: state_path,
            entity_name: entity.name.clone(),
            base_state: existing,
            pending_entries,
        }),
    })
}

/// Returns `true` when the recorded size and mtime both equal the input file's
/// current metadata.
///
/// Two absent (`None`) values compare as equal.
fn file_state_matches(recorded: &EntityFileState, input_file: &InputFile) -> bool {
    let stored = (recorded.size, &recorded.mtime);
    let current = (input_file.source_size, &input_file.source_mtime);
    stored == current
}

/// Builds the report for an input file that incremental mode decided not to ingest.
///
/// The report has `FileStatus::Success` and all counts set to zero. It has no output
/// paths and no column mismatch. When `warning` is `Some`, it is stored on the
/// mismatch section and counted as one validation warning.
fn build_skipped_report(input_file: String, warning: Option<String>) -> FileReport {
    // Must be computed before `warning` is moved into the mismatch section below.
    let warning_total = u64::from(warning.is_some());

    let mismatch = FileMismatch {
        declared_columns_count: 0,
        input_columns_count: 0,
        missing_columns: Vec::new(),
        extra_columns: Vec::new(),
        mismatch_action: MismatchAction::None,
        error: None,
        warning,
    };
    let output = FileOutput {
        accepted_path: None,
        rejected_path: None,
        errors_path: None,
        archived_path: None,
    };
    let validation = FileValidation {
        errors: 0,
        warnings: warning_total,
        rules: Vec::new(),
    };

    FileReport {
        input_file,
        status: FileStatus::Success,
        row_count: 0,
        accepted_count: 0,
        rejected_count: 0,
        mismatch,
        output,
        validation,
    }
}

/// Metadata captured for a newly seen input file, written into the state on commit.
struct PendingFileState {
    /// Size reported for the input file, if known.
    size: Option<u64>,
    /// Modification time reported for the input file, if known.
    mtime: Option<String>,
}

/// A not-yet-persisted update to an entity's incremental state file.
pub(super) struct PendingEntityState {
    /// Local path of the entity state file.
    path: PathBuf,
    /// Entity name, used when the base state has no entity recorded.
    entity_name: String,
    /// State loaded (or freshly created) before this run.
    base_state: EntityState,
    /// New files to record, keyed by source URI.
    pending_entries: BTreeMap<String, PendingFileState>,
}

impl PendingEntityState {
    /// Writes the pending file entries into the entity state file.
    ///
    /// Does nothing when there are no pending entries. Otherwise it:
    /// - clones the base state;
    /// - fills a blank `schema` with `ENTITY_STATE_SCHEMA_V1` and a blank `entity` with
    ///   the entity name;
    /// - stamps `updated_at` and every new entry with a single `report::now_rfc3339()`
    ///   value;
    /// - writes the result through `write_entity_state_atomic`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `write_entity_state_atomic`.
    pub(super) fn commit(&self) -> FloeResult<()> {
        if self.pending_entries.is_empty() {
            return Ok(());
        }

        let stamp = report::now_rfc3339();
        let mut next_state = self.base_state.clone();

        // Fill in identifying fields that a freshly created or legacy state may lack.
        if next_state.schema.is_empty() {
            next_state.schema = crate::state::ENTITY_STATE_SCHEMA_V1.to_string();
        }
        if next_state.entity.is_empty() {
            next_state.entity = self.entity_name.clone();
        }

        for (uri, PendingFileState { size, mtime }) in &self.pending_entries {
            let recorded = EntityFileState {
                processed_at: stamp.clone(),
                size: *size,
                mtime: mtime.clone(),
            };
            next_state.files.insert(uri.clone(), recorded);
        }
        next_state.updated_at = Some(stamp);

        write_entity_state_atomic(&self.path, &next_state)
    }
}