// floe-core 0.4.3
//
// Core library for Floe, a YAML-driven technical ingestion tool.
// Documentation
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

use crate::config::IncrementalMode;
use crate::config::StorageResolver;
use crate::io::format::InputFile;
use crate::io::storage::CloudClient;
use crate::report::{
    FileMismatch, FileOutput, FileReport, FileStatus, FileValidation, MismatchAction,
};
use crate::run::RunContext;
use crate::state::{
    claim_entity_inputs, promote_claimed_entity_state, release_claimed_entity_state,
    renew_claimed_entity_state, ClaimedEntityState, EntityFileState, CLAIM_TTL_SECONDS,
};
use crate::{config, warnings, FloeResult};

/// Result of preparing an entity's inputs for incremental processing.
///
/// Produced by `prepare_incremental_context`. When the entity's
/// `incremental_mode` is not `IncrementalMode::File`, every input is pending,
/// nothing is skipped and no claim is held.
// NOTE: field order is also drop order; `pending_state` has a `Drop` impl that
// releases an unfinalized claim.
pub(super) struct IncrementalContext {
    /// Inputs this run should go on to process.
    pub(super) pending_inputs: Vec<InputFile>,
    /// One `Skipped` report per input that was already ingested or is claimed
    /// by another active run.
    pub(super) skipped_reports: Vec<FileReport>,
    /// Claim held by this run for `pending_inputs`; `None` outside file mode or
    /// when `claim_entity_inputs` returned no claimed state.
    pub(super) pending_state: Option<PendingEntityState>,
}

pub(super) fn prepare_incremental_context(
    context: &RunContext,
    cloud: &mut CloudClient,
    entity: &config::EntityConfig,
    input_files: Vec<InputFile>,
) -> FloeResult<IncrementalContext> {
    if entity.incremental_mode != IncrementalMode::File {
        return Ok(IncrementalContext {
            pending_inputs: input_files,
            skipped_reports: Vec::new(),
            pending_state: None,
        });
    }

    let mut skipped_reports = Vec::new();
    let claim_outcome = claim_entity_inputs(
        &context.storage_resolver,
        cloud,
        entity,
        &context.run_id,
        input_files,
    )?;

    for (input_file, recorded) in &claim_outcome.already_processed {
        if file_state_matches(recorded, input_file) {
            skipped_reports.push(build_skipped_report(
                input_file.source_uri.clone(),
                "already_ingested",
                None,
            ));
        } else {
            let message = format!(
                "entity.name={} incremental_mode=file skipping previously ingested file with changed metadata: {} (recorded size={:?}, mtime={:?}; current size={:?}, mtime={:?})",
                entity.name,
                input_file.source_uri,
                recorded.size,
                recorded.mtime,
                input_file.source_size,
                input_file.source_mtime,
            );
            warnings::emit(
                &context.run_id,
                Some(&entity.name),
                Some(&input_file.source_uri),
                Some("incremental_file_changed"),
                &message,
            );
            skipped_reports.push(build_skipped_report(
                input_file.source_uri.clone(),
                "already_ingested_changed",
                Some(message),
            ));
        }
    }
    for source_uri in claim_outcome.active_claims {
        let message = format!(
            "entity.name={} incremental_mode=file skipping file claimed by another active run: {}",
            entity.name, source_uri
        );
        warnings::emit(
            &context.run_id,
            Some(&entity.name),
            Some(&source_uri),
            Some("incremental_file_claimed"),
            &message,
        );
        skipped_reports.push(build_skipped_report(
            source_uri,
            "active_claim",
            Some(message),
        ));
    }

    Ok(IncrementalContext {
        pending_inputs: claim_outcome.pending_inputs,
        skipped_reports,
        pending_state: claim_outcome.claimed_state.map(|claimed| {
            PendingEntityState::new(
                claimed,
                context.storage_resolver.clone(),
                entity.name.clone(),
                context.run_id.clone(),
            )
        }),
    })
}

/// Returns `true` when the size and mtime stored in `recorded` equal the current
/// `source_size` and `source_mtime` of `input_file`.
fn file_state_matches(recorded: &EntityFileState, input_file: &InputFile) -> bool {
    // Size is compared first; mtime is only checked when the sizes agree.
    let same_size = recorded.size == input_file.source_size;
    same_size && recorded.mtime == input_file.source_mtime
}

/// Builds the `FileReport` for an input that was skipped rather than processed.
///
/// All row counts are zero and no output paths are set. `skip_reason` is copied
/// into the report. An optional `warning` is stored on the mismatch section and
/// counted as exactly one validation warning.
fn build_skipped_report(
    input_file: String,
    skip_reason: &str,
    warning: Option<String>,
) -> FileReport {
    // Counted before `warning` is moved into the mismatch section below.
    let warning_count: u64 = warning.as_ref().map_or(0, |_| 1);

    let mismatch = FileMismatch {
        declared_columns_count: 0,
        input_columns_count: 0,
        missing_columns: Vec::new(),
        extra_columns: Vec::new(),
        mismatch_action: MismatchAction::None,
        error: None,
        warning,
    };
    let output = FileOutput {
        accepted_path: None,
        rejected_path: None,
        errors_path: None,
        archived_path: None,
    };
    let validation = FileValidation {
        errors: 0,
        warnings: warning_count,
        rules: Vec::new(),
    };

    FileReport {
        input_file,
        status: FileStatus::Skipped,
        row_count: 0,
        accepted_count: 0,
        rejected_count: 0,
        skip_reason: Some(skip_reason.to_owned()),
        mismatch,
        output,
        validation,
    }
}

/// File-mode claim held by the current run for its pending inputs.
///
/// While this value is alive, a background `ClaimHeartbeat` keeps renewing the
/// claim. The claim is finalized by `commit` (via `promote_claimed_entity_state`)
/// or `release` (via `release_claimed_entity_state`). If neither succeeds before
/// the value is dropped, `Drop` makes a best-effort release.
pub(super) struct PendingEntityState {
    claimed: ClaimedEntityState,
    resolver: StorageResolver,
    entity_name: String,
    run_id: String,
    /// Renewal thread; `None` once it has been stopped.
    heartbeat: Option<ClaimHeartbeat>,
    /// Set after a successful commit or release so `Drop` does not release again.
    finalized: bool,
}

impl PendingEntityState {
    /// Takes ownership of a freshly claimed state and starts renewing it in the background.
    fn new(
        claimed: ClaimedEntityState,
        resolver: StorageResolver,
        entity_name: String,
        run_id: String,
    ) -> Self {
        // The renewal thread gets its own copies; the originals stay here for the
        // final promote/release call.
        let renewer = ClaimHeartbeat::start(
            resolver.clone(),
            entity_name.clone(),
            run_id.clone(),
            claimed.clone(),
        );
        Self {
            heartbeat: Some(renewer),
            finalized: false,
            claimed,
            resolver,
            entity_name,
            run_id,
        }
    }

    /// Stops the heartbeat and promotes the claimed state.
    ///
    /// # Errors
    /// Returns the error from `promote_claimed_entity_state`. The claim then stays
    /// unfinalized, so it is released when this value is dropped.
    pub(super) fn commit(
        &mut self,
        _context: &RunContext,
        cloud: &mut CloudClient,
        _entity: &config::EntityConfig,
    ) -> FloeResult<()> {
        self.finalize_with(|state| {
            promote_claimed_entity_state(
                &state.resolver,
                cloud,
                &state.entity_name,
                &state.run_id,
                &state.claimed,
            )?;
            Ok(())
        })
    }

    /// Stops the heartbeat and releases the claimed state.
    ///
    /// # Errors
    /// Returns the error from `release_claimed_entity_state`. The claim then stays
    /// unfinalized, so another release is attempted when this value is dropped.
    pub(super) fn release(
        &mut self,
        _context: &RunContext,
        cloud: &mut CloudClient,
        _entity: &config::EntityConfig,
    ) -> FloeResult<()> {
        self.finalize_with(|state| state.release_claim(cloud))
    }

    /// Stops the heartbeat, runs `action`, and marks the claim finalized only when
    /// `action` succeeds.
    fn finalize_with(
        &mut self,
        action: impl FnOnce(&Self) -> FloeResult<()>,
    ) -> FloeResult<()> {
        self.stop_heartbeat();
        action(&*self)?;
        self.finalized = true;
        Ok(())
    }

    /// Calls `release_claimed_entity_state` for this claim, discarding its success value.
    fn release_claim(&self, cloud: &mut CloudClient) -> FloeResult<()> {
        release_claimed_entity_state(
            &self.resolver,
            cloud,
            &self.entity_name,
            &self.run_id,
            &self.claimed,
        )?;
        Ok(())
    }

    /// Stops and joins the heartbeat thread; a no-op once it has been stopped.
    fn stop_heartbeat(&mut self) {
        if let Some(mut renewer) = self.heartbeat.take() {
            renewer.stop();
        }
    }
}

impl Drop for PendingEntityState {
    fn drop(&mut self) {
        // Safety net for a claim that was never successfully committed or released.
        // Errors are ignored because `drop` has no way to report them.
        self.stop_heartbeat();
        if !self.finalized {
            let _ = self.release_claim(&mut CloudClient::new());
        }
    }
}

/// Background thread that periodically renews a claimed entity state.
///
/// The thread wakes every `claim_renewal_interval()` and calls
/// `renew_claimed_entity_state`. It exits as soon as `stop` signals it, or as soon
/// as the sending half of its stop channel disappears. Dropping a `ClaimHeartbeat`
/// stops and joins the thread.
struct ClaimHeartbeat {
    /// Sending half of the stop channel; `None` once `stop` has run.
    stop_tx: Option<Sender<()>>,
    /// Handle of the renewal thread; `None` once it has been joined.
    handle: Option<JoinHandle<()>>,
}

impl ClaimHeartbeat {
    /// Spawns the renewal thread for `claimed`, which then renews it on a fixed interval.
    fn start(
        resolver: StorageResolver,
        entity_name: String,
        run_id: String,
        claimed: ClaimedEntityState,
    ) -> Self {
        let (stop_tx, stop_rx) = mpsc::channel();
        let interval = claim_renewal_interval();
        let handle = thread::spawn(move || {
            // Only a timeout means "keep renewing". A stop message (`Ok`) or a
            // disconnected sender both end the loop. Treating `Disconnected` like a
            // timeout would make `recv_timeout` return immediately on every call and
            // spin renewals in a hot loop with no delay.
            while let Err(mpsc::RecvTimeoutError::Timeout) = stop_rx.recv_timeout(interval) {
                let mut cloud = CloudClient::new();
                // Best effort: a failed renewal is ignored here and simply attempted
                // again on the next tick.
                let _ = renew_claimed_entity_state(
                    &resolver,
                    &mut cloud,
                    &entity_name,
                    &run_id,
                    &claimed,
                );
            }
        });
        Self {
            stop_tx: Some(stop_tx),
            handle: Some(handle),
        }
    }

    /// Signals the renewal thread to exit and waits for it to finish.
    ///
    /// Idempotent: later calls find both options already taken and do nothing.
    /// Blocks while an in-flight renewal completes.
    fn stop(&mut self) {
        if let Some(stop_tx) = self.stop_tx.take() {
            // A send error only means the thread has already exited.
            let _ = stop_tx.send(());
        }
        if let Some(handle) = self.handle.take() {
            // A panic in the renewal thread is not propagated to the owner.
            let _ = handle.join();
        }
    }
}

impl Drop for ClaimHeartbeat {
    fn drop(&mut self) {
        self.stop();
    }
}

/// Interval between claim renewals.
///
/// The interval is a third of `CLAIM_TTL_SECONDS`, with the TTL treated as at least
/// one second, clamped to the range 1..=300 seconds.
fn claim_renewal_interval() -> Duration {
    // Renewing several times per TTL window presumably leaves room for a missed
    // renewal to be retried before the claim expires.
    const RENEWALS_PER_TTL: u64 = 3;
    const MIN_INTERVAL_SECS: u64 = 1;
    const MAX_INTERVAL_SECS: u64 = 300;

    let ttl_secs = CLAIM_TTL_SECONDS.max(1) as u64;
    let interval_secs =
        (ttl_secs / RENEWALS_PER_TTL).clamp(MIN_INTERVAL_SECS, MAX_INTERVAL_SECS);
    Duration::from_secs(interval_secs)
}