1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, OnceLock};
3
4use serde::Serialize;
5
6#[allow(dead_code)]
7#[derive(Debug, Clone, Serialize)]
8#[serde(tag = "event", rename_all = "snake_case")]
9pub enum RunEvent {
10 Log {
11 run_id: String,
12 log_level: String,
13 code: Option<String>,
14 message: String,
15 entity: Option<String>,
16 input: Option<String>,
17 ts_ms: u128,
18 },
19 RunStarted {
20 run_id: String,
21 config: String,
22 report_base: Option<String>,
23 ts_ms: u128,
24 },
25 EntityStarted {
26 run_id: String,
27 name: String,
28 ts_ms: u128,
29 },
30 FileStarted {
31 run_id: String,
32 entity: String,
33 input: String,
34 ts_ms: u128,
35 },
36 FileFinished {
37 run_id: String,
38 entity: String,
39 input: String,
40 status: String,
41 rows: u64,
42 accepted: u64,
43 rejected: u64,
44 elapsed_ms: u64,
45 ts_ms: u128,
46 },
47 SchemaEvolutionApplied {
48 run_id: String,
49 entity: String,
50 mode: String,
51 added_columns: Vec<String>,
52 ts_ms: u128,
53 },
54 EntityFinished {
55 run_id: String,
56 name: String,
57 status: String,
58 files: u64,
59 rows: u64,
60 accepted: u64,
61 rejected: u64,
62 warnings: u64,
63 errors: u64,
64 ts_ms: u128,
65 },
66 RunFinished {
67 run_id: String,
68 status: String,
69 exit_code: i32,
70 files: u64,
71 rows: u64,
72 accepted: u64,
73 rejected: u64,
74 warnings: u64,
75 errors: u64,
76 summary_uri: Option<String>,
77 ts_ms: u128,
78 },
79}
80
81pub trait RunObserver: Send + Sync {
82 fn on_event(&self, event: RunEvent);
83}
84
85pub struct NoopObserver;
86
87impl RunObserver for NoopObserver {
88 fn on_event(&self, _event: RunEvent) {}
89}
90
91pub struct MultiObserver {
92 observers: Vec<Arc<dyn RunObserver>>,
93}
94
95impl MultiObserver {
96 pub fn new(observers: Vec<Arc<dyn RunObserver>>) -> Self {
97 Self { observers }
98 }
99}
100
101impl RunObserver for MultiObserver {
102 fn on_event(&self, event: RunEvent) {
103 for obs in &self.observers {
104 obs.on_event(event.clone());
105 }
106 }
107}
108
109static NOOP_OBSERVER: NoopObserver = NoopObserver;
110
111static OBSERVER: OnceLock<Arc<dyn RunObserver>> = OnceLock::new();
112
/// Process-wide flag, initially `false`, set by [`mark_run_started`].
static RUN_STARTED_EMITTED: AtomicBool = AtomicBool::new(false);

/// Sets the run-started flag to `true`.
///
/// Uses `Relaxed` ordering: only the flag's own value is published; no other
/// memory is synchronized through it.
pub fn mark_run_started() {
    AtomicBool::store(&RUN_STARTED_EMITTED, true, Ordering::Relaxed);
}

/// Returns `true` once [`mark_run_started`] has been called.
pub fn is_run_started() -> bool {
    AtomicBool::load(&RUN_STARTED_EMITTED, Ordering::Relaxed)
}
122
123pub fn set_observer(observer: Arc<dyn RunObserver>) -> bool {
124 OBSERVER.set(observer).is_ok()
125}
126
127pub fn is_observer_set() -> bool {
128 OBSERVER.get().is_some()
129}
130
131pub fn default_observer() -> &'static dyn RunObserver {
132 OBSERVER
133 .get()
134 .map(|observer| observer.as_ref())
135 .unwrap_or(&NOOP_OBSERVER)
136}
137
/// Returns the current system time as whole milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, returns `0` instead of
/// failing.
pub fn event_time_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}