Skip to main content

floe_core/run/
events.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, OnceLock};
4
5use serde::Serialize;
6
7#[allow(dead_code)]
8#[derive(Debug, Clone, Serialize)]
9#[serde(tag = "event", rename_all = "snake_case")]
10pub enum RunEvent {
11    Log {
12        run_id: String,
13        log_level: String,
14        code: Option<String>,
15        message: String,
16        entity: Option<String>,
17        input: Option<String>,
18        ts_ms: u128,
19    },
20    RunStarted {
21        run_id: String,
22        config: String,
23        report_base: Option<String>,
24        ts_ms: u128,
25    },
26    EntityStarted {
27        run_id: String,
28        name: String,
29        ts_ms: u128,
30    },
31    FileStarted {
32        run_id: String,
33        entity: String,
34        input: String,
35        ts_ms: u128,
36    },
37    FileFinished {
38        run_id: String,
39        entity: String,
40        input: String,
41        status: String,
42        #[serde(skip_serializing_if = "Option::is_none")]
43        skip_reason: Option<String>,
44        rows: u64,
45        accepted: u64,
46        rejected: u64,
47        elapsed_ms: u64,
48        ts_ms: u128,
49    },
50    SchemaEvolutionApplied {
51        run_id: String,
52        entity: String,
53        mode: String,
54        added_columns: Vec<String>,
55        ts_ms: u128,
56    },
57    EntityFinished {
58        run_id: String,
59        name: String,
60        status: String,
61        files: u64,
62        files_skipped: u64,
63        rows: u64,
64        accepted: u64,
65        rejected: u64,
66        warnings: u64,
67        errors: u64,
68        ts_ms: u128,
69    },
70    RunFinished {
71        run_id: String,
72        status: String,
73        exit_code: i32,
74        files: u64,
75        files_skipped: u64,
76        rows: u64,
77        accepted: u64,
78        rejected: u64,
79        warnings: u64,
80        errors: u64,
81        summary_uri: Option<String>,
82        #[serde(skip_serializing_if = "Option::is_none")]
83        report_base: Option<String>,
84        #[serde(skip_serializing_if = "HashMap::is_empty")]
85        entity_report_uris: HashMap<String, String>,
86        ts_ms: u128,
87    },
88}
89
90pub trait RunObserver: Send + Sync {
91    fn on_event(&self, event: RunEvent);
92}
93
94pub struct NoopObserver;
95
96impl RunObserver for NoopObserver {
97    fn on_event(&self, _event: RunEvent) {}
98}
99
100pub struct MultiObserver {
101    observers: Vec<Arc<dyn RunObserver>>,
102}
103
104impl MultiObserver {
105    pub fn new(observers: Vec<Arc<dyn RunObserver>>) -> Self {
106        Self { observers }
107    }
108}
109
110impl RunObserver for MultiObserver {
111    fn on_event(&self, event: RunEvent) {
112        for obs in &self.observers {
113            obs.on_event(event.clone());
114        }
115    }
116}
117
118static NOOP_OBSERVER: NoopObserver = NoopObserver;
119
120static OBSERVER: OnceLock<Arc<dyn RunObserver>> = OnceLock::new();
121
122static RUN_STARTED_EMITTED: AtomicBool = AtomicBool::new(false);
123
124pub fn mark_run_started() {
125    RUN_STARTED_EMITTED.store(true, Ordering::Relaxed);
126}
127
128pub fn is_run_started() -> bool {
129    RUN_STARTED_EMITTED.load(Ordering::Relaxed)
130}
131
132pub fn set_observer(observer: Arc<dyn RunObserver>) -> bool {
133    OBSERVER.set(observer).is_ok()
134}
135
136pub fn is_observer_set() -> bool {
137    OBSERVER.get().is_some()
138}
139
140pub fn default_observer() -> &'static dyn RunObserver {
141    OBSERVER
142        .get()
143        .map(|observer| observer.as_ref())
144        .unwrap_or(&NOOP_OBSERVER)
145}
146
147pub fn event_time_ms() -> u128 {
148    std::time::SystemTime::now()
149        .duration_since(std::time::UNIX_EPOCH)
150        .map(|duration| duration.as_millis())
151        .unwrap_or(0)
152}