1use std::sync::{Arc, OnceLock};
2
3use serde::Serialize;
4
5#[allow(dead_code)]
6#[derive(Debug, Clone, Serialize)]
7#[serde(tag = "event", rename_all = "snake_case")]
8pub enum RunEvent {
9 Log {
10 run_id: String,
11 log_level: String,
12 code: Option<String>,
13 message: String,
14 entity: Option<String>,
15 input: Option<String>,
16 ts_ms: u128,
17 },
18 RunStarted {
19 run_id: String,
20 config: String,
21 report_base: Option<String>,
22 ts_ms: u128,
23 },
24 EntityStarted {
25 run_id: String,
26 name: String,
27 ts_ms: u128,
28 },
29 FileStarted {
30 run_id: String,
31 entity: String,
32 input: String,
33 ts_ms: u128,
34 },
35 FileFinished {
36 run_id: String,
37 entity: String,
38 input: String,
39 status: String,
40 rows: u64,
41 accepted: u64,
42 rejected: u64,
43 elapsed_ms: u64,
44 ts_ms: u128,
45 },
46 SchemaEvolutionApplied {
47 run_id: String,
48 entity: String,
49 mode: String,
50 added_columns: Vec<String>,
51 ts_ms: u128,
52 },
53 EntityFinished {
54 run_id: String,
55 name: String,
56 status: String,
57 files: u64,
58 rows: u64,
59 accepted: u64,
60 rejected: u64,
61 warnings: u64,
62 errors: u64,
63 ts_ms: u128,
64 },
65 RunFinished {
66 run_id: String,
67 status: String,
68 exit_code: i32,
69 files: u64,
70 rows: u64,
71 accepted: u64,
72 rejected: u64,
73 warnings: u64,
74 errors: u64,
75 summary_uri: Option<String>,
76 ts_ms: u128,
77 },
78}
79
80pub trait RunObserver: Send + Sync {
81 fn on_event(&self, event: RunEvent);
82}
83
84pub struct NoopObserver;
85
86impl RunObserver for NoopObserver {
87 fn on_event(&self, _event: RunEvent) {}
88}
89
90static NOOP_OBSERVER: NoopObserver = NoopObserver;
91
92static OBSERVER: OnceLock<Arc<dyn RunObserver>> = OnceLock::new();
93
94pub fn set_observer(observer: Arc<dyn RunObserver>) -> bool {
95 OBSERVER.set(observer).is_ok()
96}
97
98pub fn is_observer_set() -> bool {
99 OBSERVER.get().is_some()
100}
101
102pub fn default_observer() -> &'static dyn RunObserver {
103 OBSERVER
104 .get()
105 .map(|observer| observer.as_ref())
106 .unwrap_or(&NOOP_OBSERVER)
107}
108
109pub fn event_time_ms() -> u128 {
110 std::time::SystemTime::now()
111 .duration_since(std::time::UNIX_EPOCH)
112 .map(|duration| duration.as_millis())
113 .unwrap_or(0)
114}