Skip to main content

floe_core/run/
events.rs

1use std::sync::{Arc, OnceLock};
2
3use serde::Serialize;
4
/// Events delivered to a [`RunObserver`] through `on_event`.
///
/// Serialization (via serde) is internally tagged: the variant name, converted to
/// `snake_case`, is written under the `event` key next to the variant's own fields
/// (for example `RunStarted` is tagged as `"event": "run_started"`).
///
/// The code that emits these events is not in this file, so the variant and field
/// descriptions below are inferred from names only and should be confirmed against
/// the emitters.
// NOTE(review): `dead_code` is silenced for the whole enum — presumably because not
// every variant is constructed in every build configuration; confirm.
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum RunEvent {
    /// A log record; `code`, `entity` and `input` are optional.
    Log {
        run_id: String,
        // Kept as free-form text here; the set of valid levels is not defined in this file.
        log_level: String,
        code: Option<String>,
        message: String,
        entity: Option<String>,
        input: Option<String>,
        // Presumably milliseconds since the Unix epoch, as produced by `event_time_ms` —
        // confirm. The same applies to every `ts_ms` field below.
        ts_ms: u128,
    },
    /// Presumably emitted once when a run begins.
    RunStarted {
        run_id: String,
        config: String,
        report_base: Option<String>,
        ts_ms: u128,
    },
    /// Presumably emitted when processing of the entity called `name` begins.
    EntityStarted {
        run_id: String,
        name: String,
        ts_ms: u128,
    },
    /// Presumably emitted when processing of one input of an entity begins.
    FileStarted {
        run_id: String,
        entity: String,
        input: String,
        ts_ms: u128,
    },
    /// Presumably emitted when one input has been processed; carries row counters and
    /// an elapsed time in milliseconds (per the `elapsed_ms` field name).
    FileFinished {
        run_id: String,
        entity: String,
        input: String,
        status: String,
        rows: u64,
        accepted: u64,
        rejected: u64,
        elapsed_ms: u64,
        ts_ms: u128,
    },
    /// Presumably emitted when columns were added to an entity's schema.
    SchemaEvolutionApplied {
        run_id: String,
        entity: String,
        mode: String,
        added_columns: Vec<String>,
        ts_ms: u128,
    },
    /// Presumably the per-entity summary emitted when an entity is done.
    EntityFinished {
        run_id: String,
        name: String,
        status: String,
        files: u64,
        rows: u64,
        accepted: u64,
        rejected: u64,
        warnings: u64,
        errors: u64,
        ts_ms: u128,
    },
    /// Presumably the run-level summary emitted when the whole run is done.
    RunFinished {
        run_id: String,
        status: String,
        exit_code: i32,
        files: u64,
        rows: u64,
        accepted: u64,
        rejected: u64,
        warnings: u64,
        errors: u64,
        summary_uri: Option<String>,
        ts_ms: u128,
    },
}
79
/// Receiver of [`RunEvent`]s.
///
/// The `Send + Sync` supertraits are what allow an `Arc<dyn RunObserver>` to be kept
/// in the process-wide `OBSERVER` static.
pub trait RunObserver: Send + Sync {
    /// Handles a single event. The event is passed by value, so the observer owns it.
    fn on_event(&self, event: RunEvent);
}
83
/// Observer that discards every event.
pub struct NoopObserver;

impl RunObserver for NoopObserver {
    /// Drops the event without doing anything.
    fn on_event(&self, _event: RunEvent) {}
}
89
// Fallback instance returned by `default_observer` while no observer has been installed.
static NOOP_OBSERVER: NoopObserver = NoopObserver;

// Process-wide observer slot. `OnceLock` allows it to be initialized at most once;
// `set_observer` is the function in this file that writes to it.
static OBSERVER: OnceLock<Arc<dyn RunObserver>> = OnceLock::new();
93
94pub fn set_observer(observer: Arc<dyn RunObserver>) -> bool {
95    OBSERVER.set(observer).is_ok()
96}
97
98pub fn is_observer_set() -> bool {
99    OBSERVER.get().is_some()
100}
101
102pub fn default_observer() -> &'static dyn RunObserver {
103    OBSERVER
104        .get()
105        .map(|observer| observer.as_ref())
106        .unwrap_or(&NOOP_OBSERVER)
107}
108
/// Returns the current system time as whole milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned instead of
/// an error.
pub fn event_time_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        // Clock is set before 1970-01-01: fall back to zero.
        Err(_) => 0,
    }
}