1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use serde_json::{json, Value};
6
7use crate::config::{EntityConfig, LineageConfig};
8use crate::run::events::{RunEvent, RunObserver};
9
10pub struct OpenLineageObserver {
11 client: reqwest::blocking::Client,
12 config: LineageConfig,
13 entity_start_ms: Mutex<HashMap<String, u128>>,
14 entity_run_ids: Mutex<HashMap<String, String>>,
15 run_start_ms: Mutex<Option<u128>>,
16 entity_schemas: HashMap<String, Vec<(String, String)>>,
17}
18
19impl OpenLineageObserver {
20 pub fn new(config: &LineageConfig, entities: &[EntityConfig]) -> crate::FloeResult<Self> {
21 let timeout = Duration::from_secs(config.timeout_secs.unwrap_or(5));
22 let client = reqwest::blocking::Client::builder()
23 .timeout(timeout)
24 .build()
25 .map_err(|e| {
26 Box::new(crate::errors::ConfigError(format!(
27 "lineage: failed to build HTTP client: {e}"
28 ))) as Box<dyn std::error::Error + Send + Sync>
29 })?;
30
31 let entity_schemas = entities
32 .iter()
33 .map(|e| {
34 let fields: Vec<(String, String)> = e
35 .schema
36 .columns
37 .iter()
38 .map(|c| (c.name.clone(), c.column_type.clone()))
39 .collect();
40 (e.name.clone(), fields)
41 })
42 .collect();
43
44 Ok(Self {
45 client,
46 config: config.clone(),
47 entity_start_ms: Mutex::new(HashMap::new()),
48 entity_run_ids: Mutex::new(HashMap::new()),
49 run_start_ms: Mutex::new(None),
50 entity_schemas,
51 })
52 }
53
54 fn post_event(&self, body: Value) {
55 let url = format!("{}/api/v1/lineage", self.config.url.trim_end_matches('/'));
56 let mut req = self.client.post(&url).json(&body);
57 if let Some(api_key) = self.config.api_key.as_deref() {
58 req = req.bearer_auth(api_key);
59 }
60 match req.send() {
61 Err(e) => {
62 crate::warnings::emit(
63 "",
64 None,
65 None,
66 Some("lineage_http_error"),
67 &format!("lineage: POST {url} failed: {e}"),
68 );
69 }
70 Ok(resp) => {
71 if let Err(e) = resp.error_for_status() {
72 crate::warnings::emit(
73 "",
74 None,
75 None,
76 Some("lineage_http_error"),
77 &format!("lineage: POST {url} returned error status: {e}"),
78 );
79 }
80 }
81 }
82 }
83
84 fn producer(&self) -> &str {
85 self.config
86 .producer
87 .as_deref()
88 .unwrap_or("https://github.com/malon64/floe")
89 }
90
91 fn parent_run_facet(&self) -> Option<Value> {
92 if let Ok(run_id) = std::env::var("AIRFLOW_CTX_DAG_RUN_ID") {
93 let dag = std::env::var("AIRFLOW_CTX_DAG_ID").unwrap_or_default();
94 let task = std::env::var("AIRFLOW_CTX_TASK_ID").unwrap_or_default();
95 let job_name = if task.is_empty() {
96 dag.clone()
97 } else {
98 format!("{dag}.{task}")
99 };
100 return Some(json!({
101 "run": { "runId": run_id },
102 "job": {
103 "namespace": self.config.namespace,
104 "name": job_name
105 },
106 "_producer": self.producer(),
107 "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
108 }));
109 }
110 if let Ok(run_id) = std::env::var("DAGSTER_RUN_ID") {
111 let job = std::env::var("DAGSTER_JOB_NAME").unwrap_or_default();
112 return Some(json!({
113 "run": { "runId": run_id },
114 "job": {
115 "namespace": self.config.namespace,
116 "name": job
117 },
118 "_producer": self.producer(),
119 "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
120 }));
121 }
122 None
123 }
124
125 fn emit_entity_run_event(
126 &self,
127 entity_run_id: &str,
128 name: &str,
129 event_type: &str,
130 ts_ms: u128,
131 stats: Option<EntityStats>,
132 ) {
133 let event_time = ms_to_iso8601(ts_ms);
134 let job_name = format!("{}.{name}", self.config.namespace);
135
136 let mut run_facets = json!({});
137 if let Some(parent) = self.parent_run_facet() {
138 run_facets["parent"] = parent;
139 }
140
141 let inputs = if let Some(ref s) = stats {
143 let rejection_rate = if s.rows > 0 {
144 s.rejected as f64 / s.rows as f64
145 } else {
146 0.0
147 };
148 let schema_facet = json!({
149 "fields": s.schema_fields.iter().map(|(col_name, col_type)| {
150 json!({ "name": col_name, "type": col_type })
151 }).collect::<Vec<_>>(),
152 "_producer": self.producer(),
153 "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json"
154 });
155 let dq_facet = json!({
156 "rowCount": s.rows,
157 "validCount": s.accepted,
158 "invalidCount": s.rejected,
159 "_producer": self.producer(),
160 "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/DataQualityMetricsInputDatasetFacet.json"
161 });
162 let floe_facet = json!({
163 "entity": name,
164 "rejectionRate": rejection_rate,
165 "files": s.files,
166 "rows": s.rows,
167 "accepted": s.accepted,
168 "rejected": s.rejected,
169 "warnings": s.warnings,
170 "errors": s.errors,
171 "_producer": self.producer(),
172 "_schemaURL": "https://github.com/malon64/floe/schemas/FloeQualityRunFacet.json"
173 });
174 json!([{
175 "namespace": self.config.namespace,
176 "name": name,
177 "facets": {
178 "schema": schema_facet,
179 "dataQualityMetrics": dq_facet,
180 "floeQualityRun": floe_facet
181 }
182 }])
183 } else {
184 json!([])
185 };
186
187 let body = json!({
188 "eventType": event_type,
189 "eventTime": event_time,
190 "run": {
191 "runId": entity_run_id,
192 "facets": run_facets
193 },
194 "job": {
195 "namespace": self.config.namespace,
196 "name": job_name,
197 "facets": {}
198 },
199 "inputs": inputs,
200 "outputs": [],
201 "producer": self.producer(),
202 "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
203 });
204 self.post_event(body);
205 }
206}
207
/// Counters and schema attached to the event emitted when an entity finishes.
struct EntityStats {
    /// File count reported for the entity.
    files: u64,
    /// Total row count; used as the denominator of the rejection rate.
    rows: u64,
    /// Rows reported as accepted.
    accepted: u64,
    /// Rows reported as rejected.
    rejected: u64,
    /// Warning count reported for the entity.
    warnings: u64,
    /// Error count reported for the entity.
    errors: u64,
    /// `(column name, column type)` pairs emitted in the schema facet.
    schema_fields: Vec<(String, String)>,
}
217
218fn ms_to_iso8601(ms: u128) -> String {
219 let secs = (ms / 1000) as i64;
220 let nanos = ((ms % 1000) * 1_000_000) as i64;
221 match time::OffsetDateTime::from_unix_timestamp(secs) {
222 Ok(dt) => {
223 let ns = time::Duration::nanoseconds(nanos);
224 let dt = dt.saturating_add(ns);
225 dt.format(&time::format_description::well_known::Rfc3339)
226 .unwrap_or_else(|_| ms.to_string())
227 }
228 Err(_) => ms.to_string(),
229 }
230}
231
232impl RunObserver for OpenLineageObserver {
233 fn on_event(&self, event: RunEvent) {
234 match event {
235 RunEvent::RunStarted { run_id, ts_ms, .. } => {
236 if let Ok(mut guard) = self.run_start_ms.lock() {
237 *guard = Some(ts_ms);
238 }
239 let event_time = ms_to_iso8601(ts_ms);
240 let mut run_facets = json!({});
241 if let Some(parent) = self.parent_run_facet() {
242 run_facets["parent"] = parent;
243 }
244 let body = json!({
245 "eventType": "START",
246 "eventTime": event_time,
247 "run": {
248 "runId": run_id,
249 "facets": run_facets
250 },
251 "job": {
252 "namespace": self.config.namespace,
253 "name": run_id,
254 "facets": {}
255 },
256 "inputs": [],
257 "outputs": [],
258 "producer": self.producer(),
259 "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
260 });
261 self.post_event(body);
262 }
263 RunEvent::EntityStarted {
264 run_id,
265 name,
266 ts_ms,
267 } => {
268 let entity_run_id = format!("{run_id}.entity.{name}");
271 if let Ok(mut guard) = self.entity_start_ms.lock() {
272 guard.insert(name.clone(), ts_ms);
273 }
274 if let Ok(mut guard) = self.entity_run_ids.lock() {
275 guard.insert(name.clone(), entity_run_id.clone());
276 }
277 self.emit_entity_run_event(&entity_run_id, &name, "START", ts_ms, None);
278 }
279 RunEvent::EntityFinished {
280 run_id,
281 name,
282 status,
283 files,
284 rows,
285 accepted,
286 rejected,
287 warnings,
288 errors,
289 ts_ms,
290 } => {
291 let entity_run_id = self
292 .entity_run_ids
293 .lock()
294 .ok()
295 .and_then(|g| g.get(&name).cloned())
296 .unwrap_or_else(|| format!("{run_id}.entity.{name}"));
297 let event_type = if status == "failed" || status == "aborted" {
298 "FAIL"
299 } else {
300 "COMPLETE"
301 };
302 let schema_fields = self.entity_schemas.get(&name).cloned().unwrap_or_default();
303 let stats = EntityStats {
304 files,
305 rows,
306 accepted,
307 rejected,
308 warnings,
309 errors,
310 schema_fields,
311 };
312 self.emit_entity_run_event(&entity_run_id, &name, event_type, ts_ms, Some(stats));
313 }
314 RunEvent::RunFinished {
315 run_id,
316 status,
317 ts_ms,
318 ..
319 } => {
320 let event_type = if status == "failed" || status == "aborted" {
321 "FAIL"
322 } else {
323 "COMPLETE"
324 };
325 let event_time = ms_to_iso8601(ts_ms);
326 let mut run_facets = json!({});
327 if let Some(parent) = self.parent_run_facet() {
328 run_facets["parent"] = parent;
329 }
330 let body = json!({
331 "eventType": event_type,
332 "eventTime": event_time,
333 "run": {
334 "runId": run_id,
335 "facets": run_facets
336 },
337 "job": {
338 "namespace": self.config.namespace,
339 "name": run_id,
340 "facets": {}
341 },
342 "inputs": [],
343 "outputs": [],
344 "producer": self.producer(),
345 "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
346 });
347 self.post_event(body);
348 }
349 _ => {}
350 }
351 }
352}
353
354pub fn build_observer(
355 config: &LineageConfig,
356 entities: &[EntityConfig],
357) -> crate::FloeResult<Arc<dyn RunObserver>> {
358 let obs = OpenLineageObserver::new(config, entities)?;
359 Ok(Arc::new(obs))
360}