Skip to main content

floe_core/run/
events.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, OnceLock};
3
4use serde::Serialize;
5
6#[allow(dead_code)]
7#[derive(Debug, Clone, Serialize)]
8#[serde(tag = "event", rename_all = "snake_case")]
9pub enum RunEvent {
10    Log {
11        run_id: String,
12        log_level: String,
13        code: Option<String>,
14        message: String,
15        entity: Option<String>,
16        input: Option<String>,
17        ts_ms: u128,
18    },
19    RunStarted {
20        run_id: String,
21        config: String,
22        report_base: Option<String>,
23        ts_ms: u128,
24    },
25    EntityStarted {
26        run_id: String,
27        name: String,
28        ts_ms: u128,
29    },
30    FileStarted {
31        run_id: String,
32        entity: String,
33        input: String,
34        ts_ms: u128,
35    },
36    FileFinished {
37        run_id: String,
38        entity: String,
39        input: String,
40        status: String,
41        rows: u64,
42        accepted: u64,
43        rejected: u64,
44        elapsed_ms: u64,
45        ts_ms: u128,
46    },
47    SchemaEvolutionApplied {
48        run_id: String,
49        entity: String,
50        mode: String,
51        added_columns: Vec<String>,
52        ts_ms: u128,
53    },
54    EntityFinished {
55        run_id: String,
56        name: String,
57        status: String,
58        files: u64,
59        rows: u64,
60        accepted: u64,
61        rejected: u64,
62        warnings: u64,
63        errors: u64,
64        ts_ms: u128,
65    },
66    RunFinished {
67        run_id: String,
68        status: String,
69        exit_code: i32,
70        files: u64,
71        rows: u64,
72        accepted: u64,
73        rejected: u64,
74        warnings: u64,
75        errors: u64,
76        summary_uri: Option<String>,
77        ts_ms: u128,
78    },
79}
80
81pub trait RunObserver: Send + Sync {
82    fn on_event(&self, event: RunEvent);
83}
84
85pub struct NoopObserver;
86
87impl RunObserver for NoopObserver {
88    fn on_event(&self, _event: RunEvent) {}
89}
90
91pub struct MultiObserver {
92    observers: Vec<Arc<dyn RunObserver>>,
93}
94
95impl MultiObserver {
96    pub fn new(observers: Vec<Arc<dyn RunObserver>>) -> Self {
97        Self { observers }
98    }
99}
100
101impl RunObserver for MultiObserver {
102    fn on_event(&self, event: RunEvent) {
103        for obs in &self.observers {
104            obs.on_event(event.clone());
105        }
106    }
107}
108
109static NOOP_OBSERVER: NoopObserver = NoopObserver;
110
111static OBSERVER: OnceLock<Arc<dyn RunObserver>> = OnceLock::new();
112
113static RUN_STARTED_EMITTED: AtomicBool = AtomicBool::new(false);
114
115pub fn mark_run_started() {
116    RUN_STARTED_EMITTED.store(true, Ordering::Relaxed);
117}
118
119pub fn is_run_started() -> bool {
120    RUN_STARTED_EMITTED.load(Ordering::Relaxed)
121}
122
123pub fn set_observer(observer: Arc<dyn RunObserver>) -> bool {
124    OBSERVER.set(observer).is_ok()
125}
126
127pub fn is_observer_set() -> bool {
128    OBSERVER.get().is_some()
129}
130
131pub fn default_observer() -> &'static dyn RunObserver {
132    OBSERVER
133        .get()
134        .map(|observer| observer.as_ref())
135        .unwrap_or(&NOOP_OBSERVER)
136}
137
138pub fn event_time_ms() -> u128 {
139    std::time::SystemTime::now()
140        .duration_since(std::time::UNIX_EPOCH)
141        .map(|duration| duration.as_millis())
142        .unwrap_or(0)
143}