// floe_core/lineage/mod.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use serde_json::{json, Value};
7
8use crate::config::{EntityConfig, LineageConfig};
9use crate::run::events::{RunEvent, RunObserver};
10
/// Dataset locations captured for one entity.
///
/// The paths are used verbatim as OpenLineage dataset names when an entity
/// event is emitted.
#[derive(Debug, Clone, PartialEq, Eq)]
struct EntityUris {
    /// Source path; reported as the name of the input dataset.
    source: String,
    /// Accepted-sink path; reported as the name of the first output dataset.
    accepted: String,
    /// Rejected-sink path; reported as a second output dataset when present.
    rejected: Option<String>,
}
16
17pub struct OpenLineageObserver {
18    client: reqwest::blocking::Client,
19    config: LineageConfig,
20    entity_start_ms: Mutex<HashMap<String, u128>>,
21    entity_run_ids: Mutex<HashMap<String, String>>,
22    run_start_ms: Mutex<Option<u128>>,
23    entity_schemas: HashMap<String, Vec<(String, String)>>,
24    entity_uris: HashMap<String, EntityUris>,
25    consecutive_failures: AtomicUsize,
26    circuit_open: AtomicBool,
27}
28
29impl OpenLineageObserver {
30    pub fn new(config: &LineageConfig, entities: &[EntityConfig]) -> crate::FloeResult<Self> {
31        let timeout = Duration::from_secs(config.timeout_secs.unwrap_or(5));
32        let client = reqwest::blocking::Client::builder()
33            .timeout(timeout)
34            .build()
35            .map_err(|e| {
36                Box::new(crate::errors::ConfigError(format!(
37                    "lineage: failed to build HTTP client: {e}"
38                ))) as Box<dyn std::error::Error + Send + Sync>
39            })?;
40
41        let entity_schemas = entities
42            .iter()
43            .map(|e| {
44                let fields: Vec<(String, String)> = e
45                    .schema
46                    .columns
47                    .iter()
48                    .map(|c| (c.name.clone(), c.column_type.clone()))
49                    .collect();
50                (e.name.clone(), fields)
51            })
52            .collect();
53
54        let entity_uris = entities
55            .iter()
56            .map(|e| {
57                (
58                    e.name.clone(),
59                    EntityUris {
60                        source: e.source.path.clone(),
61                        accepted: e.sink.accepted.path.clone(),
62                        rejected: e.sink.rejected.as_ref().map(|r| r.path.clone()),
63                    },
64                )
65            })
66            .collect();
67
68        Ok(Self {
69            client,
70            config: config.clone(),
71            entity_start_ms: Mutex::new(HashMap::new()),
72            entity_run_ids: Mutex::new(HashMap::new()),
73            run_start_ms: Mutex::new(None),
74            entity_schemas,
75            entity_uris,
76            consecutive_failures: AtomicUsize::new(0),
77            circuit_open: AtomicBool::new(false),
78        })
79    }
80
81    fn attempt_post(&self, url: &str, body: &Value) -> Result<(), bool> {
82        let mut req = self.client.post(url).json(body);
83        if let Some(api_key) = self.config.api_key.as_deref() {
84            req = req.bearer_auth(api_key);
85        }
86        match req.send() {
87            Err(_) => Err(true),
88            Ok(resp) => {
89                let status = resp.status();
90                if status.is_success() {
91                    Ok(())
92                } else if status.as_u16() == 429 || status.is_server_error() {
93                    Err(true)
94                } else {
95                    Err(false)
96                }
97            }
98        }
99    }
100
101    fn post_event(&self, body: Value) {
102        if self.circuit_open.load(Ordering::Relaxed) {
103            return;
104        }
105
106        let url = format!("{}/api/v1/lineage", self.config.url.trim_end_matches('/'));
107        let max_failures = self.config.max_failures.unwrap_or(3) as usize;
108        let retry_delays_ms: &[u64] = &[0, 100, 500];
109
110        let mut succeeded = false;
111
112        'retry: for (attempt, &delay_ms) in retry_delays_ms.iter().enumerate() {
113            if delay_ms > 0 {
114                std::thread::sleep(Duration::from_millis(delay_ms));
115            }
116            match self.attempt_post(&url, &body) {
117                Ok(()) => {
118                    self.consecutive_failures.store(0, Ordering::Relaxed);
119                    succeeded = true;
120                    break 'retry;
121                }
122                Err(is_retryable) => {
123                    if !is_retryable || attempt == retry_delays_ms.len() - 1 {
124                        break 'retry;
125                    }
126                }
127            }
128        }
129
130        if succeeded {
131            return;
132        }
133
134        // Always warn when an event is dropped, whether the failure was retryable or not.
135        crate::warnings::emit(
136            "",
137            None,
138            None,
139            Some("lineage_http_error"),
140            &format!("lineage: POST {url} failed — event dropped"),
141        );
142
143        let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
144        if failures >= max_failures && !self.circuit_open.swap(true, Ordering::Relaxed) {
145            crate::warnings::emit(
146                "",
147                None,
148                None,
149                Some("lineage_circuit_open"),
150                &format!(
151                    "lineage: disabled for this run after {failures} consecutive failures — check endpoint {url}"
152                ),
153            );
154        }
155    }
156
157    fn producer(&self) -> &str {
158        self.config
159            .producer
160            .as_deref()
161            .unwrap_or("https://github.com/malon64/floe")
162    }
163
164    fn parent_run_facet(&self) -> Option<Value> {
165        if let Ok(run_id) = std::env::var("AIRFLOW_CTX_DAG_RUN_ID") {
166            let dag = std::env::var("AIRFLOW_CTX_DAG_ID").unwrap_or_default();
167            let task = std::env::var("AIRFLOW_CTX_TASK_ID").unwrap_or_default();
168            let job_name = if task.is_empty() {
169                dag.clone()
170            } else {
171                format!("{dag}.{task}")
172            };
173            return Some(json!({
174                "run": { "runId": run_id },
175                "job": {
176                    "namespace": self.config.namespace,
177                    "name": job_name
178                },
179                "_producer": self.producer(),
180                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
181            }));
182        }
183        if let Ok(run_id) = std::env::var("DAGSTER_RUN_ID") {
184            let job = std::env::var("DAGSTER_JOB_NAME").unwrap_or_default();
185            return Some(json!({
186                "run": { "runId": run_id },
187                "job": {
188                    "namespace": self.config.namespace,
189                    "name": job
190                },
191                "_producer": self.producer(),
192                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
193            }));
194        }
195        None
196    }
197
198    fn emit_entity_run_event(
199        &self,
200        entity_run_id: &str,
201        name: &str,
202        event_type: &str,
203        ts_ms: u128,
204        stats: Option<EntityStats>,
205        uris: Option<&EntityUris>,
206    ) {
207        let event_time = ms_to_iso8601(ts_ms);
208        let job_name = format!("{}.{name}", self.config.namespace);
209
210        let mut run_facets = json!({});
211        if let Some(parent) = self.parent_run_facet() {
212            run_facets["parent"] = parent;
213        }
214
215        // Build inputs: source dataset with schema and quality facets on COMPLETE/FAIL.
216        let inputs = match (stats.as_ref(), uris) {
217            (Some(s), Some(u)) => {
218                let rejection_rate = if s.rows > 0 {
219                    s.rejected as f64 / s.rows as f64
220                } else {
221                    0.0
222                };
223                let schema_facet = json!({
224                    "fields": s.schema_fields.iter().map(|(col_name, col_type)| {
225                        json!({ "name": col_name, "type": col_type })
226                    }).collect::<Vec<_>>(),
227                    "_producer": self.producer(),
228                    "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json"
229                });
230                let dq_facet = json!({
231                    "rowCount": s.rows,
232                    "validCount": s.accepted,
233                    "invalidCount": s.rejected,
234                    "_producer": self.producer(),
235                    "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/DataQualityMetricsInputDatasetFacet.json"
236                });
237                let floe_facet = json!({
238                    "entity": name,
239                    "rejectionRate": rejection_rate,
240                    "files": s.files,
241                    "rows": s.rows,
242                    "accepted": s.accepted,
243                    "rejected": s.rejected,
244                    "warnings": s.warnings,
245                    "errors": s.errors,
246                    "_producer": self.producer(),
247                    "_schemaURL": "https://github.com/malon64/floe/schemas/FloeQualityRunFacet.json"
248                });
249                json!([{
250                    "namespace": self.config.namespace,
251                    "name": u.source,
252                    "facets": {
253                        "schema": schema_facet,
254                        "dataQualityMetrics": dq_facet,
255                        "floeQualityRun": floe_facet
256                    }
257                }])
258            }
259            _ => json!([]),
260        };
261
262        // Build outputs: accepted sink always present; rejected sink when configured.
263        let outputs = match uris {
264            Some(u) => {
265                let mut out = vec![json!({
266                    "namespace": self.config.namespace,
267                    "name": u.accepted,
268                    "facets": {}
269                })];
270                if let Some(ref rej) = u.rejected {
271                    out.push(json!({
272                        "namespace": self.config.namespace,
273                        "name": rej,
274                        "facets": {}
275                    }));
276                }
277                json!(out)
278            }
279            None => json!([]),
280        };
281
282        let body = json!({
283            "eventType": event_type,
284            "eventTime": event_time,
285            "run": {
286                "runId": entity_run_id,
287                "facets": run_facets
288            },
289            "job": {
290                "namespace": self.config.namespace,
291                "name": job_name,
292                "facets": {}
293            },
294            "inputs": inputs,
295            "outputs": outputs,
296            "producer": self.producer(),
297            "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
298        });
299        self.post_event(body);
300    }
301}
302
/// Per-entity counters and schema attached to an entity's terminal lineage event.
///
/// The counters are copied from the `EntityFinished` event; `schema_fields`
/// comes from the entity configuration captured when the observer was built.
#[derive(Debug, Clone, PartialEq, Eq)]
struct EntityStats {
    /// `files` value reported by the event.
    files: u64,
    /// `rows` value reported by the event; denominator of the rejection rate.
    rows: u64,
    /// `accepted` value reported by the event.
    accepted: u64,
    /// `rejected` value reported by the event; numerator of the rejection rate.
    rejected: u64,
    /// `warnings` value reported by the event.
    warnings: u64,
    /// `errors` value reported by the event.
    errors: u64,
    /// Column `(name, type)` pairs emitted in the schema facet.
    schema_fields: Vec<(String, String)>,
}
312
313fn ms_to_iso8601(ms: u128) -> String {
314    let secs = (ms / 1000) as i64;
315    let nanos = ((ms % 1000) * 1_000_000) as i64;
316    match time::OffsetDateTime::from_unix_timestamp(secs) {
317        Ok(dt) => {
318            let ns = time::Duration::nanoseconds(nanos);
319            let dt = dt.saturating_add(ns);
320            dt.format(&time::format_description::well_known::Rfc3339)
321                .unwrap_or_else(|_| ms.to_string())
322        }
323        Err(_) => ms.to_string(),
324    }
325}
326
327impl RunObserver for OpenLineageObserver {
328    fn on_event(&self, event: RunEvent) {
329        match event {
330            RunEvent::RunStarted { run_id, ts_ms, .. } => {
331                // Reset circuit breaker at the start of each run so a recovered endpoint
332                // is retried in subsequent runs within the same long-lived process.
333                self.consecutive_failures.store(0, Ordering::Relaxed);
334                self.circuit_open.store(false, Ordering::Relaxed);
335                if let Ok(mut guard) = self.run_start_ms.lock() {
336                    *guard = Some(ts_ms);
337                }
338                let event_time = ms_to_iso8601(ts_ms);
339                let mut run_facets = json!({});
340                if let Some(parent) = self.parent_run_facet() {
341                    run_facets["parent"] = parent;
342                }
343                let body = json!({
344                    "eventType": "START",
345                    "eventTime": event_time,
346                    "run": {
347                        "runId": run_id,
348                        "facets": run_facets
349                    },
350                    "job": {
351                        "namespace": self.config.namespace,
352                        "name": run_id,
353                        "facets": {}
354                    },
355                    "inputs": [],
356                    "outputs": [],
357                    "producer": self.producer(),
358                    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
359                });
360                self.post_event(body);
361            }
362            RunEvent::EntityStarted {
363                run_id,
364                name,
365                ts_ms,
366            } => {
367                // Use a per-entity run_id (overall_run_id.entity.name) so that
368                // the START and COMPLETE events for the same entity share the same run_id.
369                let entity_run_id = format!("{run_id}.entity.{name}");
370                if let Ok(mut guard) = self.entity_start_ms.lock() {
371                    guard.insert(name.clone(), ts_ms);
372                }
373                if let Ok(mut guard) = self.entity_run_ids.lock() {
374                    guard.insert(name.clone(), entity_run_id.clone());
375                }
376                self.emit_entity_run_event(&entity_run_id, &name, "START", ts_ms, None, None);
377            }
378            RunEvent::EntityFinished {
379                run_id,
380                name,
381                status,
382                files,
383                files_skipped: _,
384                rows,
385                accepted,
386                rejected,
387                warnings,
388                errors,
389                ts_ms,
390            } => {
391                let entity_run_id = self
392                    .entity_run_ids
393                    .lock()
394                    .ok()
395                    .and_then(|g| g.get(&name).cloned())
396                    .unwrap_or_else(|| format!("{run_id}.entity.{name}"));
397                let event_type = if status == "failed" || status == "aborted" {
398                    "FAIL"
399                } else {
400                    "COMPLETE"
401                };
402                let schema_fields = self.entity_schemas.get(&name).cloned().unwrap_or_default();
403                let stats = EntityStats {
404                    files,
405                    rows,
406                    accepted,
407                    rejected,
408                    warnings,
409                    errors,
410                    schema_fields,
411                };
412                let uris = self.entity_uris.get(&name);
413                self.emit_entity_run_event(
414                    &entity_run_id,
415                    &name,
416                    event_type,
417                    ts_ms,
418                    Some(stats),
419                    uris,
420                );
421            }
422            RunEvent::RunFinished {
423                run_id,
424                status,
425                ts_ms,
426                ..
427            } => {
428                let event_type = if status == "failed" || status == "aborted" {
429                    "FAIL"
430                } else {
431                    "COMPLETE"
432                };
433                let event_time = ms_to_iso8601(ts_ms);
434                let mut run_facets = json!({});
435                if let Some(parent) = self.parent_run_facet() {
436                    run_facets["parent"] = parent;
437                }
438                let body = json!({
439                    "eventType": event_type,
440                    "eventTime": event_time,
441                    "run": {
442                        "runId": run_id,
443                        "facets": run_facets
444                    },
445                    "job": {
446                        "namespace": self.config.namespace,
447                        "name": run_id,
448                        "facets": {}
449                    },
450                    "inputs": [],
451                    "outputs": [],
452                    "producer": self.producer(),
453                    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
454                });
455                self.post_event(body);
456            }
457            _ => {}
458        }
459    }
460}
461
462pub fn build_observer(
463    config: &LineageConfig,
464    entities: &[EntityConfig],
465) -> crate::FloeResult<Arc<dyn RunObserver>> {
466    let obs = OpenLineageObserver::new(config, entities)?;
467    Ok(Arc::new(obs))
468}
469
470impl OpenLineageObserver {
471    pub fn is_circuit_open(&self) -> bool {
472        self.circuit_open.load(Ordering::Relaxed)
473    }
474
475    pub fn consecutive_failures(&self) -> usize {
476        self.consecutive_failures.load(Ordering::Relaxed)
477    }
478}