// floe-core 0.4.2
//
// Core library for Floe, a YAML-driven technical ingestion tool.
// Documentation
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};

use serde::Serialize;

/// Events describing the progress of a run, delivered to [`RunObserver::on_event`].
///
/// Serialized as an internally tagged object: the variant name is written in
/// snake_case under the `"event"` key (for example `"run_started"`), next to
/// the variant's own fields.
///
/// Every variant carries a `run_id` and a `ts_ms` field. `ts_ms` is presumably
/// milliseconds since the Unix epoch as returned by `event_time_ms` — confirm
/// at the emit sites, which are not visible in this file.
// NOTE(review): the reason for `allow(dead_code)` is not visible here — confirm it is still needed.
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum RunEvent {
    /// A log record attached to a run. `code`, `entity` and `input` are optional.
    Log {
        run_id: String,
        // Level is carried as free-form text; the set of accepted values is not defined here.
        log_level: String,
        code: Option<String>,
        message: String,
        entity: Option<String>,
        input: Option<String>,
        ts_ms: u128,
    },
    /// Presumably emitted when a run begins; `report_base` is optional.
    RunStarted {
        run_id: String,
        config: String,
        report_base: Option<String>,
        ts_ms: u128,
    },
    /// Presumably emitted when processing of the entity called `name` begins.
    EntityStarted {
        run_id: String,
        name: String,
        ts_ms: u128,
    },
    /// Presumably emitted when processing of one `input` of `entity` begins.
    FileStarted {
        run_id: String,
        entity: String,
        input: String,
        ts_ms: u128,
    },
    /// Outcome of one `input` of `entity`, with its counters.
    FileFinished {
        run_id: String,
        entity: String,
        input: String,
        status: String,
        /// Left out of the serialized output entirely when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        skip_reason: Option<String>,
        // Presumably row counts for this file (total / accepted / rejected) — confirm with the emitter.
        rows: u64,
        accepted: u64,
        rejected: u64,
        // Presumably the processing duration in milliseconds — confirm with the emitter.
        elapsed_ms: u64,
        ts_ms: u128,
    },
    /// Reports a schema evolution on `entity`; `added_columns` presumably lists
    /// the names of the columns that were added.
    SchemaEvolutionApplied {
        run_id: String,
        entity: String,
        mode: String,
        added_columns: Vec<String>,
        ts_ms: u128,
    },
    /// Outcome of the entity called `name`, with per-entity counters.
    EntityFinished {
        run_id: String,
        name: String,
        status: String,
        files: u64,
        files_skipped: u64,
        rows: u64,
        accepted: u64,
        rejected: u64,
        warnings: u64,
        errors: u64,
        ts_ms: u128,
    },
    /// Outcome of the whole run, with run-level counters, an `exit_code` and
    /// an optional `summary_uri`.
    RunFinished {
        run_id: String,
        status: String,
        exit_code: i32,
        files: u64,
        files_skipped: u64,
        rows: u64,
        accepted: u64,
        rejected: u64,
        warnings: u64,
        errors: u64,
        summary_uri: Option<String>,
        ts_ms: u128,
    },
}

/// Sink for [`RunEvent`]s.
///
/// Implementations must be `Send + Sync`; observers are shared as
/// `Arc<dyn RunObserver>` elsewhere in this file.
pub trait RunObserver: Send + Sync {
    /// Handles a single event. The event is passed by value, so the observer
    /// owns it and may keep or drop it.
    fn on_event(&self, event: RunEvent);
}

/// Observer that discards every event it is given.
pub struct NoopObserver;

impl RunObserver for NoopObserver {
    /// Accepts the event and does nothing with it.
    fn on_event(&self, _: RunEvent) {
        // Intentionally empty: the event is dropped on return.
    }
}

/// Observer that forwards every event to a list of child observers.
pub struct MultiObserver {
    /// Child observers, notified in the order they were supplied.
    observers: Vec<Arc<dyn RunObserver>>,
}

impl MultiObserver {
    /// Builds a fan-out observer over `observers`.
    ///
    /// An empty list is allowed; events are then simply dropped.
    pub fn new(observers: Vec<Arc<dyn RunObserver>>) -> Self {
        Self { observers }
    }
}

impl RunObserver for MultiObserver {
    /// Delivers `event` to each child observer, in order.
    ///
    /// Every observer except the last receives a clone; the last one takes
    /// ownership of the original, so `n` observers cost `n - 1` clones and a
    /// single-observer setup performs no clone at all.
    fn on_event(&self, event: RunEvent) {
        let Some((last, rest)) = self.observers.split_last() else {
            // No observers registered: nothing to notify.
            return;
        };
        for observer in rest {
            observer.on_event(event.clone());
        }
        // Hand the original to the final observer instead of cloning once more.
        last.on_event(event);
    }
}

/// Fallback observer handed out by `default_observer` when no observer has been installed.
static NOOP_OBSERVER: NoopObserver = NoopObserver;

/// Process-wide observer slot, filled by `set_observer`. Being a `OnceLock`,
/// it can be set at most once.
static OBSERVER: OnceLock<Arc<dyn RunObserver>> = OnceLock::new();

/// Flag written by `mark_run_started` and read by `is_run_started`; starts as `false`.
static RUN_STARTED_EMITTED: AtomicBool = AtomicBool::new(false);

pub fn mark_run_started() {
    RUN_STARTED_EMITTED.store(true, Ordering::Relaxed);
}

pub fn is_run_started() -> bool {
    RUN_STARTED_EMITTED.load(Ordering::Relaxed)
}

/// Installs `observer` as the process-wide observer.
///
/// Returns `true` when the observer was stored, and `false` when one had
/// already been installed; in that case the existing observer is kept and
/// `observer` is dropped.
pub fn set_observer(observer: Arc<dyn RunObserver>) -> bool {
    matches!(OBSERVER.set(observer), Ok(()))
}

/// Reports whether a process-wide observer has been installed in `OBSERVER`.
pub fn is_observer_set() -> bool {
    matches!(OBSERVER.get(), Some(_))
}

/// Returns the observer to use for emitting events.
///
/// This is the observer stored in `OBSERVER` when one has been installed,
/// and the shared `NOOP_OBSERVER` otherwise.
pub fn default_observer() -> &'static dyn RunObserver {
    match OBSERVER.get() {
        Some(installed) => installed.as_ref(),
        None => &NOOP_OBSERVER,
    }
}

/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
pub fn event_time_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        // Clock is set before 1970: report zero rather than failing.
        Err(_) => 0,
    }
}