1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, OnceLock};
4
5use serde::Serialize;
6
7#[allow(dead_code)]
8#[derive(Debug, Clone, Serialize)]
9#[serde(tag = "event", rename_all = "snake_case")]
10pub enum RunEvent {
11 Log {
12 run_id: String,
13 log_level: String,
14 code: Option<String>,
15 message: String,
16 entity: Option<String>,
17 input: Option<String>,
18 ts_ms: u128,
19 },
20 RunStarted {
21 run_id: String,
22 config: String,
23 report_base: Option<String>,
24 ts_ms: u128,
25 },
26 EntityStarted {
27 run_id: String,
28 name: String,
29 ts_ms: u128,
30 },
31 FileStarted {
32 run_id: String,
33 entity: String,
34 input: String,
35 ts_ms: u128,
36 },
37 FileFinished {
38 run_id: String,
39 entity: String,
40 input: String,
41 status: String,
42 #[serde(skip_serializing_if = "Option::is_none")]
43 skip_reason: Option<String>,
44 rows: u64,
45 accepted: u64,
46 rejected: u64,
47 elapsed_ms: u64,
48 ts_ms: u128,
49 },
50 SchemaEvolutionApplied {
51 run_id: String,
52 entity: String,
53 mode: String,
54 added_columns: Vec<String>,
55 ts_ms: u128,
56 },
57 EntityFinished {
58 run_id: String,
59 name: String,
60 status: String,
61 files: u64,
62 files_skipped: u64,
63 rows: u64,
64 accepted: u64,
65 rejected: u64,
66 warnings: u64,
67 errors: u64,
68 ts_ms: u128,
69 },
70 RunFinished {
71 run_id: String,
72 status: String,
73 exit_code: i32,
74 files: u64,
75 files_skipped: u64,
76 rows: u64,
77 accepted: u64,
78 rejected: u64,
79 warnings: u64,
80 errors: u64,
81 summary_uri: Option<String>,
82 #[serde(skip_serializing_if = "Option::is_none")]
83 report_base: Option<String>,
84 #[serde(skip_serializing_if = "HashMap::is_empty")]
85 entity_report_uris: HashMap<String, String>,
86 ts_ms: u128,
87 },
88}
89
90pub trait RunObserver: Send + Sync {
91 fn on_event(&self, event: RunEvent);
92}
93
94pub struct NoopObserver;
95
96impl RunObserver for NoopObserver {
97 fn on_event(&self, _event: RunEvent) {}
98}
99
100pub struct MultiObserver {
101 observers: Vec<Arc<dyn RunObserver>>,
102}
103
104impl MultiObserver {
105 pub fn new(observers: Vec<Arc<dyn RunObserver>>) -> Self {
106 Self { observers }
107 }
108}
109
110impl RunObserver for MultiObserver {
111 fn on_event(&self, event: RunEvent) {
112 for obs in &self.observers {
113 obs.on_event(event.clone());
114 }
115 }
116}
117
118static NOOP_OBSERVER: NoopObserver = NoopObserver;
119
120static OBSERVER: OnceLock<Arc<dyn RunObserver>> = OnceLock::new();
121
122static RUN_STARTED_EMITTED: AtomicBool = AtomicBool::new(false);
123
124pub fn mark_run_started() {
125 RUN_STARTED_EMITTED.store(true, Ordering::Relaxed);
126}
127
128pub fn is_run_started() -> bool {
129 RUN_STARTED_EMITTED.load(Ordering::Relaxed)
130}
131
132pub fn set_observer(observer: Arc<dyn RunObserver>) -> bool {
133 OBSERVER.set(observer).is_ok()
134}
135
136pub fn is_observer_set() -> bool {
137 OBSERVER.get().is_some()
138}
139
140pub fn default_observer() -> &'static dyn RunObserver {
141 OBSERVER
142 .get()
143 .map(|observer| observer.as_ref())
144 .unwrap_or(&NOOP_OBSERVER)
145}
146
147pub fn event_time_ms() -> u128 {
148 std::time::SystemTime::now()
149 .duration_since(std::time::UNIX_EPOCH)
150 .map(|duration| duration.as_millis())
151 .unwrap_or(0)
152}