Skip to main content

floe_core/lineage/
mod.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use serde_json::{json, Value};
7
8use crate::config::{EntityConfig, LineageConfig};
9use crate::run::events::{RunEvent, RunObserver};
10
/// Producer URI used in emitted events and facets when the lineage config
/// does not set `producer`.
///
/// Assembled at compile time from the release-tag URL prefix and this crate's
/// `CARGO_PKG_VERSION`.
const DEFAULT_PRODUCER: &str = concat!(
    "https://github.com/malon64/floe/releases/tag/v",
    env!("CARGO_PKG_VERSION")
);
15
/// One column of an entity schema, captured when the observer is constructed.
///
/// Used to build the `schema` and `columnLineage` facets of the accepted
/// output dataset.
#[derive(Clone)]
struct ColumnMapping {
    /// Column name as configured on the entity; reported as the output field name.
    output_name: String,
    /// Column type string as configured on the entity.
    column_type: String,
    /// Source field name, when configured; column lineage falls back to
    /// `output_name` when this is `None`.
    source_field: Option<String>,
}
22
/// Storage locations of one entity, copied from its config at construction.
struct EntityUris {
    /// Path of the entity's source.
    source: String,
    /// Path of the sink that receives accepted rows.
    accepted: String,
    /// Path of the sink that receives rejected rows, when one is configured.
    rejected: Option<String>,
}
28
/// `RunObserver` that translates run/entity lifecycle events into OpenLineage
/// run events and POSTs them to the configured endpoint.
///
/// Delivery is best effort: failed posts are reported as warnings, and after
/// enough consecutive failures a circuit breaker stops further posts.
pub struct OpenLineageObserver {
    /// Blocking HTTP client, built with the configured request timeout.
    client: reqwest::blocking::Client,
    /// Copy of the lineage configuration (URL, namespace, API key, ...).
    config: LineageConfig,
    /// Start timestamp (ms) per entity name, recorded on `EntityStarted`.
    entity_start_ms: Mutex<HashMap<String, u128>>,
    /// Per-entity run id per entity name, recorded on `EntityStarted` and
    /// looked up again on `EntityFinished`.
    entity_run_ids: Mutex<HashMap<String, String>>,
    /// Start timestamp (ms) of the overall run, recorded on `RunStarted`.
    run_start_ms: Mutex<Option<u128>>,
    /// Column mappings per entity name.
    entity_schemas: HashMap<String, Vec<ColumnMapping>>,
    /// Source/sink paths per entity name.
    entity_uris: HashMap<String, EntityUris>,
    /// Job name used for run-level events.
    run_job_name: String,
    /// Number of events dropped in a row; reset to zero by a successful post.
    consecutive_failures: AtomicUsize,
    /// When set, `post_event` returns without sending anything.
    circuit_open: AtomicBool,
}
41
42impl OpenLineageObserver {
43    pub fn new(
44        config: &LineageConfig,
45        entities: &[EntityConfig],
46        config_path: &str,
47    ) -> crate::FloeResult<Self> {
48        let timeout = Duration::from_secs(config.timeout_secs.unwrap_or(5));
49        let client = reqwest::blocking::Client::builder()
50            .timeout(timeout)
51            .build()
52            .map_err(|e| {
53                Box::new(crate::errors::ConfigError(format!(
54                    "lineage: failed to build HTTP client: {e}"
55                ))) as Box<dyn std::error::Error + Send + Sync>
56            })?;
57
58        let run_job_name = config
59            .job_name
60            .clone()
61            .filter(|s| !s.is_empty())
62            .unwrap_or_else(|| {
63                std::path::Path::new(config_path)
64                    .file_stem()
65                    .and_then(|s| s.to_str())
66                    .unwrap_or("floe-run")
67                    .to_string()
68            });
69
70        let entity_schemas = entities
71            .iter()
72            .map(|e| {
73                let fields: Vec<ColumnMapping> = e
74                    .schema
75                    .columns
76                    .iter()
77                    .map(|c| ColumnMapping {
78                        output_name: c.name.clone(),
79                        column_type: c.column_type.clone(),
80                        source_field: c.source.clone(),
81                    })
82                    .collect();
83                (e.name.clone(), fields)
84            })
85            .collect();
86
87        let entity_uris = entities
88            .iter()
89            .map(|e| {
90                (
91                    e.name.clone(),
92                    EntityUris {
93                        source: e.source.path.clone(),
94                        accepted: e.sink.accepted.path.clone(),
95                        rejected: e.sink.rejected.as_ref().map(|r| r.path.clone()),
96                    },
97                )
98            })
99            .collect();
100
101        Ok(Self {
102            client,
103            config: config.clone(),
104            entity_start_ms: Mutex::new(HashMap::new()),
105            entity_run_ids: Mutex::new(HashMap::new()),
106            run_start_ms: Mutex::new(None),
107            entity_schemas,
108            entity_uris,
109            run_job_name,
110            consecutive_failures: AtomicUsize::new(0),
111            circuit_open: AtomicBool::new(false),
112        })
113    }
114
115    fn attempt_post(&self, url: &str, body: &Value) -> Result<(), bool> {
116        let mut req = self.client.post(url).json(body);
117        if let Some(api_key) = self.config.api_key.as_deref() {
118            req = req.bearer_auth(api_key);
119        }
120        match req.send() {
121            Err(_) => Err(true),
122            Ok(resp) => {
123                let status = resp.status();
124                if status.is_success() {
125                    Ok(())
126                } else if status.as_u16() == 429 || status.is_server_error() {
127                    Err(true)
128                } else {
129                    Err(false)
130                }
131            }
132        }
133    }
134
135    fn post_event(&self, body: Value) {
136        if self.circuit_open.load(Ordering::Relaxed) {
137            return;
138        }
139
140        let url = format!("{}/api/v1/lineage", self.config.url.trim_end_matches('/'));
141        let max_failures = self.config.max_failures.unwrap_or(3) as usize;
142        let retry_delays_ms: &[u64] = &[0, 100, 500];
143
144        let mut succeeded = false;
145
146        'retry: for (attempt, &delay_ms) in retry_delays_ms.iter().enumerate() {
147            if delay_ms > 0 {
148                std::thread::sleep(Duration::from_millis(delay_ms));
149            }
150            match self.attempt_post(&url, &body) {
151                Ok(()) => {
152                    self.consecutive_failures.store(0, Ordering::Relaxed);
153                    succeeded = true;
154                    break 'retry;
155                }
156                Err(is_retryable) => {
157                    if !is_retryable || attempt == retry_delays_ms.len() - 1 {
158                        break 'retry;
159                    }
160                }
161            }
162        }
163
164        if succeeded {
165            return;
166        }
167
168        // Always warn when an event is dropped, whether the failure was retryable or not.
169        crate::warnings::emit(
170            "",
171            None,
172            None,
173            Some("lineage_http_error"),
174            &format!("lineage: POST {url} failed — event dropped"),
175        );
176
177        let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
178        if failures >= max_failures && !self.circuit_open.swap(true, Ordering::Relaxed) {
179            crate::warnings::emit(
180                "",
181                None,
182                None,
183                Some("lineage_circuit_open"),
184                &format!(
185                    "lineage: disabled for this run after {failures} consecutive failures — check endpoint {url}"
186                ),
187            );
188        }
189    }
190
191    fn producer(&self) -> &str {
192        self.config.producer.as_deref().unwrap_or(DEFAULT_PRODUCER)
193    }
194
195    fn parent_run_facet(&self) -> Option<Value> {
196        if let Ok(run_id) = std::env::var("AIRFLOW_CTX_DAG_RUN_ID") {
197            let dag = std::env::var("AIRFLOW_CTX_DAG_ID").unwrap_or_default();
198            let task = std::env::var("AIRFLOW_CTX_TASK_ID").unwrap_or_default();
199            let job_name = if task.is_empty() {
200                dag.clone()
201            } else {
202                format!("{dag}.{task}")
203            };
204            return Some(json!({
205                "run": { "runId": run_id },
206                "job": {
207                    "namespace": self.config.namespace,
208                    "name": job_name
209                },
210                "_producer": self.producer(),
211                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
212            }));
213        }
214        if let Ok(run_id) = std::env::var("DAGSTER_RUN_ID") {
215            let job = std::env::var("DAGSTER_JOB_NAME").unwrap_or_default();
216            return Some(json!({
217                "run": { "runId": run_id },
218                "job": {
219                    "namespace": self.config.namespace,
220                    "name": job
221                },
222                "_producer": self.producer(),
223                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
224            }));
225        }
226        None
227    }
228
229    fn emit_entity_run_event(
230        &self,
231        entity_run_id: &str,
232        name: &str,
233        event_type: &str,
234        ts_ms: u128,
235        stats: Option<EntityStats>,
236        uris: Option<&EntityUris>,
237    ) {
238        let event_time = ms_to_iso8601(ts_ms);
239        let job_name = format!("{}.{name}", self.config.namespace);
240
241        let mut run_facets = json!({});
242        if let Some(parent) = self.parent_run_facet() {
243            run_facets["parent"] = parent;
244        }
245
246        // Build inputs/outputs based on whether stats and uris are present (COMPLETE/FAIL)
247        // or absent (START — keep both empty).
248        let (inputs, outputs) = match (stats.as_ref(), uris) {
249            (Some(s), Some(u)) => {
250                let rejection_rate = if s.rows > 0 {
251                    s.rejected as f64 / s.rows as f64
252                } else {
253                    0.0
254                };
255
256                // Input: source dataset — sub-namespace avoids collision with real entity names.
257                let (src_ns, src_path) = split_storage_uri(&u.source);
258                let inputs = json!([{
259                    "namespace": format!("{}.source", self.config.namespace),
260                    "name": name,
261                    "facets": {
262                        "symlinks": symlinks_facet(self.producer(), &src_ns, &src_path, "DIRECTORY")
263                    }
264                }]);
265
266                // Accepted output: entity name as logical identifier, TABLE type.
267                let (acc_ns, acc_path) = split_storage_uri(&u.accepted);
268                let schema_facet = json!({
269                    "fields": s.schema_fields.iter().map(|col| {
270                        json!({ "name": col.output_name, "type": col.column_type })
271                    }).collect::<Vec<_>>(),
272                    "_producer": self.producer(),
273                    "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json"
274                });
275                let accepted_dq_facet = json!({
276                    "rowCount": s.accepted,
277                    "validCount": s.accepted,
278                    "invalidCount": 0u64,
279                    "_producer": self.producer(),
280                    "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/DataQualityMetricsOutputDatasetFacet.json"
281                });
282                let floe_facet = json!({
283                    "entity": name,
284                    "rejectionRate": rejection_rate,
285                    "files": s.files,
286                    "rows": s.rows,
287                    "warnings": s.warnings,
288                    "errors": s.errors,
289                    "_producer": self.producer(),
290                    "_schemaURL": "https://github.com/malon64/floe/schemas/FloeQualityRunFacet.json"
291                });
292
293                let mut accepted_facets = json!({
294                    "symlinks": symlinks_facet(self.producer(), &acc_ns, &acc_path, "TABLE"),
295                    "schema": schema_facet,
296                    "dataQualityMetrics": accepted_dq_facet,
297                    "floeQualityRun": floe_facet
298                });
299
300                if !s.schema_fields.is_empty() {
301                    let fields_map: serde_json::Map<String, Value> = s
302                        .schema_fields
303                        .iter()
304                        .map(|col| {
305                            let src = col.source_field.as_deref().unwrap_or(&col.output_name);
306                            let entry = json!({
307                                "inputFields": [{
308                                    "namespace": format!("{}.source", self.config.namespace),
309                                    "name": name,
310                                    "field": src
311                                }]
312                            });
313                            (col.output_name.clone(), entry)
314                        })
315                        .collect();
316                    accepted_facets["columnLineage"] = json!({
317                        "fields": fields_map,
318                        "_producer": self.producer(),
319                        "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ColumnLineageDatasetFacet.json"
320                    });
321                }
322
323                let mut out = vec![json!({
324                    "namespace": self.config.namespace,
325                    "name": name,
326                    "facets": accepted_facets
327                })];
328
329                // Rejected output (when configured): DIRECTORY type, rejected-row quality metrics.
330                if let Some(ref rej) = u.rejected {
331                    let (rej_ns, rej_path) = split_storage_uri(rej);
332                    let rejected_dq_facet = json!({
333                        "rowCount": s.rejected,
334                        "validCount": 0u64,
335                        "invalidCount": s.rejected,
336                        "_producer": self.producer(),
337                        "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/DataQualityMetricsOutputDatasetFacet.json"
338                    });
339                    out.push(json!({
340                        "namespace": format!("{}.rejected", self.config.namespace),
341                        "name": name,
342                        "facets": {
343                            "symlinks": symlinks_facet(self.producer(), &rej_ns, &rej_path, "DIRECTORY"),
344                            "dataQualityMetrics": rejected_dq_facet
345                        }
346                    }));
347                }
348
349                (inputs, json!(out))
350            }
351            _ => (json!([]), json!([])),
352        };
353
354        let body = json!({
355            "eventType": event_type,
356            "eventTime": event_time,
357            "run": {
358                "runId": entity_run_id,
359                "facets": run_facets
360            },
361            "job": {
362                "namespace": self.config.namespace,
363                "name": job_name,
364                "facets": {}
365            },
366            "inputs": inputs,
367            "outputs": outputs,
368            "producer": self.producer(),
369            "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
370        });
371        self.post_event(body);
372    }
373}
374
/// Counters and schema reported for an entity in its terminal run event.
struct EntityStats {
    /// File count, as reported by `EntityFinished`.
    files: u64,
    /// Total row count; denominator of the reported rejection rate.
    rows: u64,
    /// Accepted row count; reported as the accepted dataset's row count.
    accepted: u64,
    /// Rejected row count; reported as the rejected dataset's row count.
    rejected: u64,
    /// Warning count, as reported by `EntityFinished`.
    warnings: u64,
    /// Error count, as reported by `EntityFinished`.
    errors: u64,
    /// Column mappings used for the schema and column-lineage facets.
    schema_fields: Vec<ColumnMapping>,
}
384
385fn ms_to_iso8601(ms: u128) -> String {
386    let secs = (ms / 1000) as i64;
387    let nanos = ((ms % 1000) * 1_000_000) as i64;
388    match time::OffsetDateTime::from_unix_timestamp(secs) {
389        Ok(dt) => {
390            let ns = time::Duration::nanoseconds(nanos);
391            let dt = dt.saturating_add(ns);
392            dt.format(&time::format_description::well_known::Rfc3339)
393                .unwrap_or_else(|_| ms.to_string())
394        }
395        Err(_) => ms.to_string(),
396    }
397}
398
/// Splits a storage location into a `(namespace, name)` pair.
///
/// For the recognised cloud schemes (`s3`, `gs`, `gcs`, `az`, `abfss`,
/// `abfs`) the namespace is the scheme plus everything up to the first `/`
/// after it, and the name is the remainder starting at that `/`. When there
/// is no such `/`, the whole URI is the namespace and the name is `"/"`.
/// Anything else is treated as a local path: namespace `"file"`, name = `uri`.
fn split_storage_uri(uri: &str) -> (String, String) {
    const CLOUD_SCHEMES: [&str; 6] = ["s3://", "gs://", "gcs://", "az://", "abfss://", "abfs://"];

    let matched = CLOUD_SCHEMES
        .iter()
        .find_map(|scheme| uri.strip_prefix(*scheme).map(|rest| (scheme.len(), rest)));

    match matched {
        None => (String::from("file"), uri.to_owned()),
        Some((scheme_len, rest)) => match rest.find('/') {
            Some(slash) => {
                let (authority, path) = uri.split_at(scheme_len + slash);
                (authority.to_owned(), path.to_owned())
            }
            None => (uri.to_owned(), String::from("/")),
        },
    }
}
414
415fn symlinks_facet(producer: &str, namespace: &str, name: &str, ds_type: &str) -> Value {
416    json!({
417        "identifiers": [{ "namespace": namespace, "name": name, "type": ds_type }],
418        "_producer": producer,
419        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SymlinksDatasetFacet.json"
420    })
421}
422
423impl RunObserver for OpenLineageObserver {
424    fn on_event(&self, event: RunEvent) {
425        match event {
426            RunEvent::RunStarted { run_id, ts_ms, .. } => {
427                // Reset circuit breaker at the start of each run so a recovered endpoint
428                // is retried in subsequent runs within the same long-lived process.
429                self.consecutive_failures.store(0, Ordering::Relaxed);
430                self.circuit_open.store(false, Ordering::Relaxed);
431                if let Ok(mut guard) = self.run_start_ms.lock() {
432                    *guard = Some(ts_ms);
433                }
434                let event_time = ms_to_iso8601(ts_ms);
435                let mut run_facets = json!({});
436                if let Some(parent) = self.parent_run_facet() {
437                    run_facets["parent"] = parent;
438                }
439                let body = json!({
440                    "eventType": "START",
441                    "eventTime": event_time,
442                    "run": {
443                        "runId": run_id,
444                        "facets": run_facets
445                    },
446                    "job": {
447                        "namespace": self.config.namespace,
448                        "name": self.run_job_name,
449                        "facets": {}
450                    },
451                    "inputs": [],
452                    "outputs": [],
453                    "producer": self.producer(),
454                    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
455                });
456                self.post_event(body);
457            }
458            RunEvent::EntityStarted {
459                run_id,
460                name,
461                ts_ms,
462            } => {
463                // Use a per-entity run_id (overall_run_id.entity.name) so that
464                // the START and COMPLETE events for the same entity share the same run_id.
465                let entity_run_id = format!("{run_id}.entity.{name}");
466                if let Ok(mut guard) = self.entity_start_ms.lock() {
467                    guard.insert(name.clone(), ts_ms);
468                }
469                if let Ok(mut guard) = self.entity_run_ids.lock() {
470                    guard.insert(name.clone(), entity_run_id.clone());
471                }
472                self.emit_entity_run_event(&entity_run_id, &name, "START", ts_ms, None, None);
473            }
474            RunEvent::EntityFinished {
475                run_id,
476                name,
477                status,
478                files,
479                files_skipped: _,
480                rows,
481                accepted,
482                rejected,
483                warnings,
484                errors,
485                ts_ms,
486            } => {
487                let entity_run_id = self
488                    .entity_run_ids
489                    .lock()
490                    .ok()
491                    .and_then(|g| g.get(&name).cloned())
492                    .unwrap_or_else(|| format!("{run_id}.entity.{name}"));
493                let event_type = if status == "failed" || status == "aborted" {
494                    "FAIL"
495                } else {
496                    "COMPLETE"
497                };
498                let schema_fields = self.entity_schemas.get(&name).cloned().unwrap_or_default();
499                let stats = EntityStats {
500                    files,
501                    rows,
502                    accepted,
503                    rejected,
504                    warnings,
505                    errors,
506                    schema_fields,
507                };
508                let uris = self.entity_uris.get(&name);
509                self.emit_entity_run_event(
510                    &entity_run_id,
511                    &name,
512                    event_type,
513                    ts_ms,
514                    Some(stats),
515                    uris,
516                );
517            }
518            RunEvent::RunFinished {
519                run_id,
520                status,
521                ts_ms,
522                ..
523            } => {
524                let event_type = if status == "failed" || status == "aborted" {
525                    "FAIL"
526                } else {
527                    "COMPLETE"
528                };
529                let event_time = ms_to_iso8601(ts_ms);
530                let mut run_facets = json!({});
531                if let Some(parent) = self.parent_run_facet() {
532                    run_facets["parent"] = parent;
533                }
534                let body = json!({
535                    "eventType": event_type,
536                    "eventTime": event_time,
537                    "run": {
538                        "runId": run_id,
539                        "facets": run_facets
540                    },
541                    "job": {
542                        "namespace": self.config.namespace,
543                        "name": self.run_job_name,
544                        "facets": {}
545                    },
546                    "inputs": [],
547                    "outputs": [],
548                    "producer": self.producer(),
549                    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
550                });
551                self.post_event(body);
552            }
553            _ => {}
554        }
555    }
556}
557
558pub fn build_observer(
559    config: &LineageConfig,
560    entities: &[EntityConfig],
561    config_path: &str,
562) -> crate::FloeResult<Arc<dyn RunObserver>> {
563    let obs = OpenLineageObserver::new(config, entities, config_path)?;
564    Ok(Arc::new(obs))
565}
566
impl OpenLineageObserver {
    /// Returns `true` while the circuit breaker is open, i.e. while
    /// `post_event` returns without sending anything.
    pub fn is_circuit_open(&self) -> bool {
        self.circuit_open.load(Ordering::Relaxed)
    }

    /// Returns the current number of consecutively dropped events; a
    /// successful post resets it to zero.
    pub fn consecutive_failures(&self) -> usize {
        self.consecutive_failures.load(Ordering::Relaxed)
    }
}