use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use serde::Serialize;
// NOTE(review): `dead_code` is silenced for the whole enum, presumably because not every
// variant is constructed in every build — confirm, and narrow the allow if possible.
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "event", rename_all = "snake_case")]
/// Structured event handed to [`RunObserver::on_event`].
///
/// Serialization is internally tagged: the variant name, converted to
/// snake_case, is written under the `event` key next to the variant's own
/// fields (for example `RunStarted` is emitted with `"event": "run_started"`).
///
/// Every variant carries `run_id` and `ts_ms`. `ts_ms` is presumably the value
/// of `event_time_ms()` at emission time (milliseconds since the Unix epoch) —
/// confirm against the emitting code.
pub enum RunEvent {
    /// A log record; `code`, `entity` and `input` are optional context.
    Log {
        run_id: String,
        log_level: String,
        code: Option<String>,
        message: String,
        entity: Option<String>,
        input: Option<String>,
        ts_ms: u128,
    },
    /// Start of a run.
    RunStarted {
        run_id: String,
        config: String,
        report_base: Option<String>,
        ts_ms: u128,
    },
    /// Start of work on the entity called `name`.
    EntityStarted {
        run_id: String,
        name: String,
        ts_ms: u128,
    },
    /// Start of work on one `input` belonging to `entity`.
    FileStarted {
        run_id: String,
        entity: String,
        input: String,
        ts_ms: u128,
    },
    /// Outcome of one `input` belonging to `entity`, with its counters.
    FileFinished {
        run_id: String,
        entity: String,
        input: String,
        status: String,
        /// Left out of the serialized output entirely when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        skip_reason: Option<String>,
        rows: u64,
        accepted: u64,
        rejected: u64,
        elapsed_ms: u64,
        ts_ms: u128,
    },
    /// A schema evolution on `entity`; `added_columns` lists the columns involved.
    SchemaEvolutionApplied {
        run_id: String,
        entity: String,
        mode: String,
        added_columns: Vec<String>,
        ts_ms: u128,
    },
    /// Outcome of the entity called `name`, with its counters.
    EntityFinished {
        run_id: String,
        name: String,
        status: String,
        files: u64,
        files_skipped: u64,
        rows: u64,
        accepted: u64,
        rejected: u64,
        warnings: u64,
        errors: u64,
        ts_ms: u128,
    },
    /// Outcome of the whole run, with its counters and exit code.
    RunFinished {
        run_id: String,
        status: String,
        exit_code: i32,
        files: u64,
        files_skipped: u64,
        rows: u64,
        accepted: u64,
        rejected: u64,
        warnings: u64,
        errors: u64,
        summary_uri: Option<String>,
        ts_ms: u128,
    },
}
/// Receiver of [`RunEvent`]s.
///
/// Implementors must be `Send + Sync`, and `on_event` takes `&self`, so an
/// observer that keeps state needs interior mutability.
pub trait RunObserver: Send + Sync {
    /// Handles one event; the observer takes ownership of `event`.
    fn on_event(&self, event: RunEvent);
}
/// Observer that ignores every event it is handed.
///
/// A static instance of this type is what `default_observer` returns while no
/// observer has been installed.
pub struct NoopObserver;

impl RunObserver for NoopObserver {
    /// Takes ownership of the event and discards it; nothing else happens.
    fn on_event(&self, event: RunEvent) {
        drop(event);
    }
}
/// Observer that forwards every event to a fixed list of other observers.
pub struct MultiObserver {
    /// Delivery targets, notified in the order they are stored.
    observers: Vec<Arc<dyn RunObserver>>,
}

impl MultiObserver {
    /// Builds a fan-out observer over `observers`.
    ///
    /// The vector's order is the delivery order. An empty vector is allowed
    /// and yields an observer that drops every event.
    pub fn new(observers: Vec<Arc<dyn RunObserver>>) -> Self {
        Self { observers }
    }
}

impl RunObserver for MultiObserver {
    /// Delivers `event` to each wrapped observer in order.
    ///
    /// Every observer but the last gets a clone; the last one receives the
    /// original by move. This saves one deep copy (events own several strings)
    /// per dispatch, and all copies when only one observer is registered.
    fn on_event(&self, event: RunEvent) {
        if let Some((last, rest)) = self.observers.split_last() {
            for observer in rest {
                observer.on_event(event.clone());
            }
            // Hand over ownership instead of cloning one more time.
            last.on_event(event);
        }
    }
}
/// Shared no-op instance that `default_observer` falls back to while `OBSERVER` is empty.
static NOOP_OBSERVER: NoopObserver = NoopObserver;
/// Process-wide observer slot filled by `set_observer`; as a `OnceLock` it accepts a value only once.
static OBSERVER: OnceLock<Arc<dyn RunObserver>> = OnceLock::new();
/// Flag set to `true` by `mark_run_started` and read by `is_run_started`.
/// Starts as `false`; nothing in the code visible here resets it.
static RUN_STARTED_EMITTED: AtomicBool = AtomicBool::new(false);
pub fn mark_run_started() {
RUN_STARTED_EMITTED.store(true, Ordering::Relaxed);
}
pub fn is_run_started() -> bool {
RUN_STARTED_EMITTED.load(Ordering::Relaxed)
}
/// Installs `observer` as the process-wide observer.
///
/// Returns `true` when this call filled the slot. Returns `false` when an
/// observer was already installed; the existing one is kept and the argument
/// is dropped.
pub fn set_observer(observer: Arc<dyn RunObserver>) -> bool {
    match OBSERVER.set(observer) {
        Ok(()) => true,
        // The slot was already taken; the rejected observer is dropped here.
        Err(_rejected) => false,
    }
}
/// Reports whether a process-wide observer has been installed.
pub fn is_observer_set() -> bool {
    let installed = OBSERVER.get();
    installed.is_some()
}
/// Returns the process-wide observer.
///
/// This is the observer installed in `OBSERVER` if there is one, otherwise
/// the shared no-op observer. The result is therefore always usable.
pub fn default_observer() -> &'static dyn RunObserver {
    match OBSERVER.get() {
        Some(installed) => installed.as_ref(),
        None => &NOOP_OBSERVER,
    }
}
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch, rather
/// than failing.
pub fn event_time_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        // Clock is set before 1970; fall back to zero.
        Err(_) => 0,
    }
}