1use std::sync::{Arc, OnceLock};
2
3use serde::Serialize;
4
/// Events passed to [`RunObserver::on_event`].
///
/// Serialization: the enum is internally tagged, so each serialized value carries an
/// `event` field holding the variant name in snake_case (for example `run_started`),
/// alongside the variant's own fields.
///
/// NOTE(review): the per-field meanings below are inferred from field names only;
/// confirm against the code that constructs these events.
// NOTE(review): `dead_code` is silenced for the whole enum, presumably because not every
// variant is constructed in every build — confirm this is still required.
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum RunEvent {
    /// A log record. `code`, `entity` and `input` are optional.
    Log {
        run_id: String,
        // Kept as free-form text, not an enum; the set of accepted values is not visible here.
        log_level: String,
        code: Option<String>,
        message: String,
        entity: Option<String>,
        input: Option<String>,
        // Presumably milliseconds since the Unix epoch (cf. `event_time_ms`) — TODO confirm.
        ts_ms: u128,
    },
    /// Presumably emitted once when a run begins.
    RunStarted {
        run_id: String,
        config: String,
        report_base: Option<String>,
        ts_ms: u128,
    },
    /// Presumably emitted when processing of the entity `name` begins.
    EntityStarted {
        run_id: String,
        name: String,
        ts_ms: u128,
    },
    /// Presumably emitted when processing of one input of an entity begins.
    FileStarted {
        run_id: String,
        entity: String,
        input: String,
        ts_ms: u128,
    },
    /// Presumably emitted when one input has been processed; carries row counters
    /// and an elapsed time in milliseconds.
    FileFinished {
        run_id: String,
        entity: String,
        input: String,
        status: String,
        rows: u64,
        accepted: u64,
        rejected: u64,
        elapsed_ms: u64,
        ts_ms: u128,
    },
    /// Presumably emitted when an entity is done; carries per-entity totals.
    EntityFinished {
        run_id: String,
        name: String,
        status: String,
        files: u64,
        rows: u64,
        accepted: u64,
        rejected: u64,
        warnings: u64,
        errors: u64,
        ts_ms: u128,
    },
    /// Presumably emitted once when the run ends; carries run-wide totals, an exit code
    /// and an optional summary location.
    RunFinished {
        run_id: String,
        status: String,
        exit_code: i32,
        files: u64,
        rows: u64,
        accepted: u64,
        rejected: u64,
        warnings: u64,
        errors: u64,
        summary_uri: Option<String>,
        ts_ms: u128,
    },
}
72
/// Receiver for [`RunEvent`]s.
///
/// Implementors must be `Send + Sync`; in this file the installed observer lives in a
/// global `OnceLock<Arc<dyn RunObserver>>`.
pub trait RunObserver: Send + Sync {
    /// Handles a single event. The event is passed by value, so the observer owns it.
    fn on_event(&self, event: RunEvent);
}
76
/// Observer that does nothing with the events it receives.
///
/// The standard traits are derived so this public unit struct can be printed in
/// diagnostics, copied freely, and created through `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopObserver;
78
79impl RunObserver for NoopObserver {
80 fn on_event(&self, _event: RunEvent) {}
81}
82
/// Shared no-op instance returned by `default_observer` while no observer is installed.
static NOOP_OBSERVER: NoopObserver = NoopObserver;

/// Process-wide observer slot. Being a `OnceLock`, it can be filled at most once
/// (via `set_observer`) and is never replaced afterwards.
static OBSERVER: OnceLock<Arc<dyn RunObserver>> = OnceLock::new();
86
87pub fn set_observer(observer: Arc<dyn RunObserver>) -> bool {
88 OBSERVER.set(observer).is_ok()
89}
90
91pub fn is_observer_set() -> bool {
92 OBSERVER.get().is_some()
93}
94
95pub fn default_observer() -> &'static dyn RunObserver {
96 OBSERVER
97 .get()
98 .map(|observer| observer.as_ref())
99 .unwrap_or(&NOOP_OBSERVER)
100}
101
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned instead of an error.
pub fn event_time_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        // Clock is set before 1970-01-01: fall back to zero.
        Err(_) => 0,
    }
}