Skip to main content

floe_core/run/
events.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, OnceLock};
3
4use serde::Serialize;
5
6#[allow(dead_code)]
7#[derive(Debug, Clone, Serialize)]
8#[serde(tag = "event", rename_all = "snake_case")]
9pub enum RunEvent {
10    Log {
11        run_id: String,
12        log_level: String,
13        code: Option<String>,
14        message: String,
15        entity: Option<String>,
16        input: Option<String>,
17        ts_ms: u128,
18    },
19    RunStarted {
20        run_id: String,
21        config: String,
22        report_base: Option<String>,
23        ts_ms: u128,
24    },
25    EntityStarted {
26        run_id: String,
27        name: String,
28        ts_ms: u128,
29    },
30    FileStarted {
31        run_id: String,
32        entity: String,
33        input: String,
34        ts_ms: u128,
35    },
36    FileFinished {
37        run_id: String,
38        entity: String,
39        input: String,
40        status: String,
41        #[serde(skip_serializing_if = "Option::is_none")]
42        skip_reason: Option<String>,
43        rows: u64,
44        accepted: u64,
45        rejected: u64,
46        elapsed_ms: u64,
47        ts_ms: u128,
48    },
49    SchemaEvolutionApplied {
50        run_id: String,
51        entity: String,
52        mode: String,
53        added_columns: Vec<String>,
54        ts_ms: u128,
55    },
56    EntityFinished {
57        run_id: String,
58        name: String,
59        status: String,
60        files: u64,
61        files_skipped: u64,
62        rows: u64,
63        accepted: u64,
64        rejected: u64,
65        warnings: u64,
66        errors: u64,
67        ts_ms: u128,
68    },
69    RunFinished {
70        run_id: String,
71        status: String,
72        exit_code: i32,
73        files: u64,
74        files_skipped: u64,
75        rows: u64,
76        accepted: u64,
77        rejected: u64,
78        warnings: u64,
79        errors: u64,
80        summary_uri: Option<String>,
81        ts_ms: u128,
82    },
83}
84
85pub trait RunObserver: Send + Sync {
86    fn on_event(&self, event: RunEvent);
87}
88
89pub struct NoopObserver;
90
91impl RunObserver for NoopObserver {
92    fn on_event(&self, _event: RunEvent) {}
93}
94
95pub struct MultiObserver {
96    observers: Vec<Arc<dyn RunObserver>>,
97}
98
99impl MultiObserver {
100    pub fn new(observers: Vec<Arc<dyn RunObserver>>) -> Self {
101        Self { observers }
102    }
103}
104
105impl RunObserver for MultiObserver {
106    fn on_event(&self, event: RunEvent) {
107        for obs in &self.observers {
108            obs.on_event(event.clone());
109        }
110    }
111}
112
113static NOOP_OBSERVER: NoopObserver = NoopObserver;
114
115static OBSERVER: OnceLock<Arc<dyn RunObserver>> = OnceLock::new();
116
117static RUN_STARTED_EMITTED: AtomicBool = AtomicBool::new(false);
118
119pub fn mark_run_started() {
120    RUN_STARTED_EMITTED.store(true, Ordering::Relaxed);
121}
122
123pub fn is_run_started() -> bool {
124    RUN_STARTED_EMITTED.load(Ordering::Relaxed)
125}
126
127pub fn set_observer(observer: Arc<dyn RunObserver>) -> bool {
128    OBSERVER.set(observer).is_ok()
129}
130
131pub fn is_observer_set() -> bool {
132    OBSERVER.get().is_some()
133}
134
135pub fn default_observer() -> &'static dyn RunObserver {
136    OBSERVER
137        .get()
138        .map(|observer| observer.as_ref())
139        .unwrap_or(&NOOP_OBSERVER)
140}
141
142pub fn event_time_ms() -> u128 {
143    std::time::SystemTime::now()
144        .duration_since(std::time::UNIX_EPOCH)
145        .map(|duration| duration.as_millis())
146        .unwrap_or(0)
147}