Skip to main content

floe_core/lineage/
mod.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use serde_json::{json, Value};
7
8use crate::config::{EntityConfig, LineageConfig};
9use crate::run::events::{RunEvent, RunObserver};
10
/// Run observer that forwards run and entity lifecycle events to an
/// OpenLineage-compatible HTTP endpoint.
///
/// Delivery is best-effort: failed posts are reported through
/// `crate::warnings::emit`, and after a configurable number of consecutive
/// failures the observer stops posting (see `circuit_open`).
pub struct OpenLineageObserver {
    /// Blocking HTTP client, built in `new` with the configured timeout.
    client: reqwest::blocking::Client,
    /// Copy of the lineage configuration passed to `new`.
    config: LineageConfig,
    /// Entity name -> `ts_ms` carried by that entity's `EntityStarted` event.
    entity_start_ms: Mutex<HashMap<String, u128>>,
    /// Entity name -> run id used for that entity's lineage events, recorded
    /// on `EntityStarted` and looked up on `EntityFinished`.
    entity_run_ids: Mutex<HashMap<String, String>>,
    /// `ts_ms` of the most recent `RunStarted` event, if any.
    run_start_ms: Mutex<Option<u128>>,
    /// Entity name -> `(column name, column type)` pairs captured from the
    /// entity configs in `new`; used to populate the schema facet.
    entity_schemas: HashMap<String, Vec<(String, String)>>,
    /// Number of events dropped in a row; reset to zero by a successful post
    /// and at every `RunStarted`.
    consecutive_failures: AtomicUsize,
    /// When `true`, `post_event` returns immediately without sending anything.
    circuit_open: AtomicBool,
}
21
22impl OpenLineageObserver {
23    pub fn new(config: &LineageConfig, entities: &[EntityConfig]) -> crate::FloeResult<Self> {
24        let timeout = Duration::from_secs(config.timeout_secs.unwrap_or(5));
25        let client = reqwest::blocking::Client::builder()
26            .timeout(timeout)
27            .build()
28            .map_err(|e| {
29                Box::new(crate::errors::ConfigError(format!(
30                    "lineage: failed to build HTTP client: {e}"
31                ))) as Box<dyn std::error::Error + Send + Sync>
32            })?;
33
34        let entity_schemas = entities
35            .iter()
36            .map(|e| {
37                let fields: Vec<(String, String)> = e
38                    .schema
39                    .columns
40                    .iter()
41                    .map(|c| (c.name.clone(), c.column_type.clone()))
42                    .collect();
43                (e.name.clone(), fields)
44            })
45            .collect();
46
47        Ok(Self {
48            client,
49            config: config.clone(),
50            entity_start_ms: Mutex::new(HashMap::new()),
51            entity_run_ids: Mutex::new(HashMap::new()),
52            run_start_ms: Mutex::new(None),
53            entity_schemas,
54            consecutive_failures: AtomicUsize::new(0),
55            circuit_open: AtomicBool::new(false),
56        })
57    }
58
59    fn attempt_post(&self, url: &str, body: &Value) -> Result<(), bool> {
60        let mut req = self.client.post(url).json(body);
61        if let Some(api_key) = self.config.api_key.as_deref() {
62            req = req.bearer_auth(api_key);
63        }
64        match req.send() {
65            Err(_) => Err(true),
66            Ok(resp) => {
67                let status = resp.status();
68                if status.is_success() {
69                    Ok(())
70                } else if status.as_u16() == 429 || status.is_server_error() {
71                    Err(true)
72                } else {
73                    Err(false)
74                }
75            }
76        }
77    }
78
79    fn post_event(&self, body: Value) {
80        if self.circuit_open.load(Ordering::Relaxed) {
81            return;
82        }
83
84        let url = format!("{}/api/v1/lineage", self.config.url.trim_end_matches('/'));
85        let max_failures = self.config.max_failures.unwrap_or(3) as usize;
86        let retry_delays_ms: &[u64] = &[0, 100, 500];
87
88        let mut succeeded = false;
89
90        'retry: for (attempt, &delay_ms) in retry_delays_ms.iter().enumerate() {
91            if delay_ms > 0 {
92                std::thread::sleep(Duration::from_millis(delay_ms));
93            }
94            match self.attempt_post(&url, &body) {
95                Ok(()) => {
96                    self.consecutive_failures.store(0, Ordering::Relaxed);
97                    succeeded = true;
98                    break 'retry;
99                }
100                Err(is_retryable) => {
101                    if !is_retryable || attempt == retry_delays_ms.len() - 1 {
102                        break 'retry;
103                    }
104                }
105            }
106        }
107
108        if succeeded {
109            return;
110        }
111
112        // Always warn when an event is dropped, whether the failure was retryable or not.
113        crate::warnings::emit(
114            "",
115            None,
116            None,
117            Some("lineage_http_error"),
118            &format!("lineage: POST {url} failed — event dropped"),
119        );
120
121        let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
122        if failures >= max_failures && !self.circuit_open.swap(true, Ordering::Relaxed) {
123            crate::warnings::emit(
124                "",
125                None,
126                None,
127                Some("lineage_circuit_open"),
128                &format!(
129                    "lineage: disabled for this run after {failures} consecutive failures — check endpoint {url}"
130                ),
131            );
132        }
133    }
134
135    fn producer(&self) -> &str {
136        self.config
137            .producer
138            .as_deref()
139            .unwrap_or("https://github.com/malon64/floe")
140    }
141
142    fn parent_run_facet(&self) -> Option<Value> {
143        if let Ok(run_id) = std::env::var("AIRFLOW_CTX_DAG_RUN_ID") {
144            let dag = std::env::var("AIRFLOW_CTX_DAG_ID").unwrap_or_default();
145            let task = std::env::var("AIRFLOW_CTX_TASK_ID").unwrap_or_default();
146            let job_name = if task.is_empty() {
147                dag.clone()
148            } else {
149                format!("{dag}.{task}")
150            };
151            return Some(json!({
152                "run": { "runId": run_id },
153                "job": {
154                    "namespace": self.config.namespace,
155                    "name": job_name
156                },
157                "_producer": self.producer(),
158                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
159            }));
160        }
161        if let Ok(run_id) = std::env::var("DAGSTER_RUN_ID") {
162            let job = std::env::var("DAGSTER_JOB_NAME").unwrap_or_default();
163            return Some(json!({
164                "run": { "runId": run_id },
165                "job": {
166                    "namespace": self.config.namespace,
167                    "name": job
168                },
169                "_producer": self.producer(),
170                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
171            }));
172        }
173        None
174    }
175
176    fn emit_entity_run_event(
177        &self,
178        entity_run_id: &str,
179        name: &str,
180        event_type: &str,
181        ts_ms: u128,
182        stats: Option<EntityStats>,
183    ) {
184        let event_time = ms_to_iso8601(ts_ms);
185        let job_name = format!("{}.{name}", self.config.namespace);
186
187        let mut run_facets = json!({});
188        if let Some(parent) = self.parent_run_facet() {
189            run_facets["parent"] = parent;
190        }
191
192        // Build inputs with all facets for COMPLETE events; empty for START.
193        let inputs = if let Some(ref s) = stats {
194            let rejection_rate = if s.rows > 0 {
195                s.rejected as f64 / s.rows as f64
196            } else {
197                0.0
198            };
199            let schema_facet = json!({
200                "fields": s.schema_fields.iter().map(|(col_name, col_type)| {
201                    json!({ "name": col_name, "type": col_type })
202                }).collect::<Vec<_>>(),
203                "_producer": self.producer(),
204                "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json"
205            });
206            let dq_facet = json!({
207                "rowCount": s.rows,
208                "validCount": s.accepted,
209                "invalidCount": s.rejected,
210                "_producer": self.producer(),
211                "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/DataQualityMetricsInputDatasetFacet.json"
212            });
213            let floe_facet = json!({
214                "entity": name,
215                "rejectionRate": rejection_rate,
216                "files": s.files,
217                "rows": s.rows,
218                "accepted": s.accepted,
219                "rejected": s.rejected,
220                "warnings": s.warnings,
221                "errors": s.errors,
222                "_producer": self.producer(),
223                "_schemaURL": "https://github.com/malon64/floe/schemas/FloeQualityRunFacet.json"
224            });
225            json!([{
226                "namespace": self.config.namespace,
227                "name": name,
228                "facets": {
229                    "schema": schema_facet,
230                    "dataQualityMetrics": dq_facet,
231                    "floeQualityRun": floe_facet
232                }
233            }])
234        } else {
235            json!([])
236        };
237
238        let body = json!({
239            "eventType": event_type,
240            "eventTime": event_time,
241            "run": {
242                "runId": entity_run_id,
243                "facets": run_facets
244            },
245            "job": {
246                "namespace": self.config.namespace,
247                "name": job_name,
248                "facets": {}
249            },
250            "inputs": inputs,
251            "outputs": [],
252            "producer": self.producer(),
253            "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
254        });
255        self.post_event(body);
256    }
257}
258
/// Counters and schema for one finished entity, copied from the
/// `EntityFinished` event and used to build the input-dataset facets.
struct EntityStats {
    /// `files` value reported by the event.
    files: u64,
    /// `rows` value reported by the event; denominator of the rejection rate.
    rows: u64,
    /// `accepted` value reported by the event (emitted as `validCount`).
    accepted: u64,
    /// `rejected` value reported by the event (emitted as `invalidCount`).
    rejected: u64,
    /// `warnings` value reported by the event.
    warnings: u64,
    /// `errors` value reported by the event.
    errors: u64,
    /// `(column name, column type)` pairs for the entity's schema facet.
    schema_fields: Vec<(String, String)>,
}
268
269fn ms_to_iso8601(ms: u128) -> String {
270    let secs = (ms / 1000) as i64;
271    let nanos = ((ms % 1000) * 1_000_000) as i64;
272    match time::OffsetDateTime::from_unix_timestamp(secs) {
273        Ok(dt) => {
274            let ns = time::Duration::nanoseconds(nanos);
275            let dt = dt.saturating_add(ns);
276            dt.format(&time::format_description::well_known::Rfc3339)
277                .unwrap_or_else(|_| ms.to_string())
278        }
279        Err(_) => ms.to_string(),
280    }
281}
282
283impl RunObserver for OpenLineageObserver {
284    fn on_event(&self, event: RunEvent) {
285        match event {
286            RunEvent::RunStarted { run_id, ts_ms, .. } => {
287                // Reset circuit breaker at the start of each run so a recovered endpoint
288                // is retried in subsequent runs within the same long-lived process.
289                self.consecutive_failures.store(0, Ordering::Relaxed);
290                self.circuit_open.store(false, Ordering::Relaxed);
291                if let Ok(mut guard) = self.run_start_ms.lock() {
292                    *guard = Some(ts_ms);
293                }
294                let event_time = ms_to_iso8601(ts_ms);
295                let mut run_facets = json!({});
296                if let Some(parent) = self.parent_run_facet() {
297                    run_facets["parent"] = parent;
298                }
299                let body = json!({
300                    "eventType": "START",
301                    "eventTime": event_time,
302                    "run": {
303                        "runId": run_id,
304                        "facets": run_facets
305                    },
306                    "job": {
307                        "namespace": self.config.namespace,
308                        "name": run_id,
309                        "facets": {}
310                    },
311                    "inputs": [],
312                    "outputs": [],
313                    "producer": self.producer(),
314                    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
315                });
316                self.post_event(body);
317            }
318            RunEvent::EntityStarted {
319                run_id,
320                name,
321                ts_ms,
322            } => {
323                // Use a per-entity run_id (overall_run_id.entity.name) so that
324                // the START and COMPLETE events for the same entity share the same run_id.
325                let entity_run_id = format!("{run_id}.entity.{name}");
326                if let Ok(mut guard) = self.entity_start_ms.lock() {
327                    guard.insert(name.clone(), ts_ms);
328                }
329                if let Ok(mut guard) = self.entity_run_ids.lock() {
330                    guard.insert(name.clone(), entity_run_id.clone());
331                }
332                self.emit_entity_run_event(&entity_run_id, &name, "START", ts_ms, None);
333            }
334            RunEvent::EntityFinished {
335                run_id,
336                name,
337                status,
338                files,
339                rows,
340                accepted,
341                rejected,
342                warnings,
343                errors,
344                ts_ms,
345            } => {
346                let entity_run_id = self
347                    .entity_run_ids
348                    .lock()
349                    .ok()
350                    .and_then(|g| g.get(&name).cloned())
351                    .unwrap_or_else(|| format!("{run_id}.entity.{name}"));
352                let event_type = if status == "failed" || status == "aborted" {
353                    "FAIL"
354                } else {
355                    "COMPLETE"
356                };
357                let schema_fields = self.entity_schemas.get(&name).cloned().unwrap_or_default();
358                let stats = EntityStats {
359                    files,
360                    rows,
361                    accepted,
362                    rejected,
363                    warnings,
364                    errors,
365                    schema_fields,
366                };
367                self.emit_entity_run_event(&entity_run_id, &name, event_type, ts_ms, Some(stats));
368            }
369            RunEvent::RunFinished {
370                run_id,
371                status,
372                ts_ms,
373                ..
374            } => {
375                let event_type = if status == "failed" || status == "aborted" {
376                    "FAIL"
377                } else {
378                    "COMPLETE"
379                };
380                let event_time = ms_to_iso8601(ts_ms);
381                let mut run_facets = json!({});
382                if let Some(parent) = self.parent_run_facet() {
383                    run_facets["parent"] = parent;
384                }
385                let body = json!({
386                    "eventType": event_type,
387                    "eventTime": event_time,
388                    "run": {
389                        "runId": run_id,
390                        "facets": run_facets
391                    },
392                    "job": {
393                        "namespace": self.config.namespace,
394                        "name": run_id,
395                        "facets": {}
396                    },
397                    "inputs": [],
398                    "outputs": [],
399                    "producer": self.producer(),
400                    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
401                });
402                self.post_event(body);
403            }
404            _ => {}
405        }
406    }
407}
408
409pub fn build_observer(
410    config: &LineageConfig,
411    entities: &[EntityConfig],
412) -> crate::FloeResult<Arc<dyn RunObserver>> {
413    let obs = OpenLineageObserver::new(config, entities)?;
414    Ok(Arc::new(obs))
415}
416
impl OpenLineageObserver {
    /// Returns `true` while the circuit breaker is open, i.e. while
    /// `post_event` is skipping all deliveries.
    pub fn is_circuit_open(&self) -> bool {
        self.circuit_open.load(Ordering::Relaxed)
    }

    /// Returns the current count of consecutively dropped events; the count is
    /// reset to zero by a successful post and at every `RunStarted`.
    pub fn consecutive_failures(&self) -> usize {
        self.consecutive_failures.load(Ordering::Relaxed)
    }
}