1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, OnceLock};
3
4use serde::Serialize;
5
6#[allow(dead_code)]
7#[derive(Debug, Clone, Serialize)]
8#[serde(tag = "event", rename_all = "snake_case")]
9pub enum RunEvent {
10 Log {
11 run_id: String,
12 log_level: String,
13 code: Option<String>,
14 message: String,
15 entity: Option<String>,
16 input: Option<String>,
17 ts_ms: u128,
18 },
19 RunStarted {
20 run_id: String,
21 config: String,
22 report_base: Option<String>,
23 ts_ms: u128,
24 },
25 EntityStarted {
26 run_id: String,
27 name: String,
28 ts_ms: u128,
29 },
30 FileStarted {
31 run_id: String,
32 entity: String,
33 input: String,
34 ts_ms: u128,
35 },
36 FileFinished {
37 run_id: String,
38 entity: String,
39 input: String,
40 status: String,
41 #[serde(skip_serializing_if = "Option::is_none")]
42 skip_reason: Option<String>,
43 rows: u64,
44 accepted: u64,
45 rejected: u64,
46 elapsed_ms: u64,
47 ts_ms: u128,
48 },
49 SchemaEvolutionApplied {
50 run_id: String,
51 entity: String,
52 mode: String,
53 added_columns: Vec<String>,
54 ts_ms: u128,
55 },
56 EntityFinished {
57 run_id: String,
58 name: String,
59 status: String,
60 files: u64,
61 files_skipped: u64,
62 rows: u64,
63 accepted: u64,
64 rejected: u64,
65 warnings: u64,
66 errors: u64,
67 ts_ms: u128,
68 },
69 RunFinished {
70 run_id: String,
71 status: String,
72 exit_code: i32,
73 files: u64,
74 files_skipped: u64,
75 rows: u64,
76 accepted: u64,
77 rejected: u64,
78 warnings: u64,
79 errors: u64,
80 summary_uri: Option<String>,
81 ts_ms: u128,
82 },
83}
84
85pub trait RunObserver: Send + Sync {
86 fn on_event(&self, event: RunEvent);
87}
88
89pub struct NoopObserver;
90
91impl RunObserver for NoopObserver {
92 fn on_event(&self, _event: RunEvent) {}
93}
94
95pub struct MultiObserver {
96 observers: Vec<Arc<dyn RunObserver>>,
97}
98
99impl MultiObserver {
100 pub fn new(observers: Vec<Arc<dyn RunObserver>>) -> Self {
101 Self { observers }
102 }
103}
104
105impl RunObserver for MultiObserver {
106 fn on_event(&self, event: RunEvent) {
107 for obs in &self.observers {
108 obs.on_event(event.clone());
109 }
110 }
111}
112
113static NOOP_OBSERVER: NoopObserver = NoopObserver;
114
115static OBSERVER: OnceLock<Arc<dyn RunObserver>> = OnceLock::new();
116
117static RUN_STARTED_EMITTED: AtomicBool = AtomicBool::new(false);
118
119pub fn mark_run_started() {
120 RUN_STARTED_EMITTED.store(true, Ordering::Relaxed);
121}
122
123pub fn is_run_started() -> bool {
124 RUN_STARTED_EMITTED.load(Ordering::Relaxed)
125}
126
127pub fn set_observer(observer: Arc<dyn RunObserver>) -> bool {
128 OBSERVER.set(observer).is_ok()
129}
130
131pub fn is_observer_set() -> bool {
132 OBSERVER.get().is_some()
133}
134
135pub fn default_observer() -> &'static dyn RunObserver {
136 OBSERVER
137 .get()
138 .map(|observer| observer.as_ref())
139 .unwrap_or(&NOOP_OBSERVER)
140}
141
142pub fn event_time_ms() -> u128 {
143 std::time::SystemTime::now()
144 .duration_since(std::time::UNIX_EPOCH)
145 .map(|duration| duration.as_millis())
146 .unwrap_or(0)
147}