Skip to main content

floe_core/lineage/
mod.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use serde_json::{json, Value};
6
7use crate::config::{EntityConfig, LineageConfig};
8use crate::run::events::{RunEvent, RunObserver};
9
10pub struct OpenLineageObserver {
11    client: reqwest::blocking::Client,
12    config: LineageConfig,
13    entity_start_ms: Mutex<HashMap<String, u128>>,
14    entity_run_ids: Mutex<HashMap<String, String>>,
15    run_start_ms: Mutex<Option<u128>>,
16    entity_schemas: HashMap<String, Vec<(String, String)>>,
17}
18
19impl OpenLineageObserver {
20    pub fn new(config: &LineageConfig, entities: &[EntityConfig]) -> crate::FloeResult<Self> {
21        let timeout = Duration::from_secs(config.timeout_secs.unwrap_or(5));
22        let client = reqwest::blocking::Client::builder()
23            .timeout(timeout)
24            .build()
25            .map_err(|e| {
26                Box::new(crate::errors::ConfigError(format!(
27                    "lineage: failed to build HTTP client: {e}"
28                ))) as Box<dyn std::error::Error + Send + Sync>
29            })?;
30
31        let entity_schemas = entities
32            .iter()
33            .map(|e| {
34                let fields: Vec<(String, String)> = e
35                    .schema
36                    .columns
37                    .iter()
38                    .map(|c| (c.name.clone(), c.column_type.clone()))
39                    .collect();
40                (e.name.clone(), fields)
41            })
42            .collect();
43
44        Ok(Self {
45            client,
46            config: config.clone(),
47            entity_start_ms: Mutex::new(HashMap::new()),
48            entity_run_ids: Mutex::new(HashMap::new()),
49            run_start_ms: Mutex::new(None),
50            entity_schemas,
51        })
52    }
53
54    fn post_event(&self, body: Value) {
55        let url = format!("{}/api/v1/lineage", self.config.url.trim_end_matches('/'));
56        let mut req = self.client.post(&url).json(&body);
57        if let Some(api_key) = self.config.api_key.as_deref() {
58            req = req.bearer_auth(api_key);
59        }
60        match req.send() {
61            Err(e) => {
62                crate::warnings::emit(
63                    "",
64                    None,
65                    None,
66                    Some("lineage_http_error"),
67                    &format!("lineage: POST {url} failed: {e}"),
68                );
69            }
70            Ok(resp) => {
71                if let Err(e) = resp.error_for_status() {
72                    crate::warnings::emit(
73                        "",
74                        None,
75                        None,
76                        Some("lineage_http_error"),
77                        &format!("lineage: POST {url} returned error status: {e}"),
78                    );
79                }
80            }
81        }
82    }
83
84    fn producer(&self) -> &str {
85        self.config
86            .producer
87            .as_deref()
88            .unwrap_or("https://github.com/malon64/floe")
89    }
90
91    fn parent_run_facet(&self) -> Option<Value> {
92        if let Ok(run_id) = std::env::var("AIRFLOW_CTX_DAG_RUN_ID") {
93            let dag = std::env::var("AIRFLOW_CTX_DAG_ID").unwrap_or_default();
94            let task = std::env::var("AIRFLOW_CTX_TASK_ID").unwrap_or_default();
95            let job_name = if task.is_empty() {
96                dag.clone()
97            } else {
98                format!("{dag}.{task}")
99            };
100            return Some(json!({
101                "run": { "runId": run_id },
102                "job": {
103                    "namespace": self.config.namespace,
104                    "name": job_name
105                },
106                "_producer": self.producer(),
107                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
108            }));
109        }
110        if let Ok(run_id) = std::env::var("DAGSTER_RUN_ID") {
111            let job = std::env::var("DAGSTER_JOB_NAME").unwrap_or_default();
112            return Some(json!({
113                "run": { "runId": run_id },
114                "job": {
115                    "namespace": self.config.namespace,
116                    "name": job
117                },
118                "_producer": self.producer(),
119                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
120            }));
121        }
122        None
123    }
124
125    fn emit_entity_run_event(
126        &self,
127        entity_run_id: &str,
128        name: &str,
129        event_type: &str,
130        ts_ms: u128,
131        stats: Option<EntityStats>,
132    ) {
133        let event_time = ms_to_iso8601(ts_ms);
134        let job_name = format!("{}.{name}", self.config.namespace);
135
136        let mut run_facets = json!({});
137        if let Some(parent) = self.parent_run_facet() {
138            run_facets["parent"] = parent;
139        }
140
141        // Build inputs with all facets for COMPLETE events; empty for START.
142        let inputs = if let Some(ref s) = stats {
143            let rejection_rate = if s.rows > 0 {
144                s.rejected as f64 / s.rows as f64
145            } else {
146                0.0
147            };
148            let schema_facet = json!({
149                "fields": s.schema_fields.iter().map(|(col_name, col_type)| {
150                    json!({ "name": col_name, "type": col_type })
151                }).collect::<Vec<_>>(),
152                "_producer": self.producer(),
153                "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json"
154            });
155            let dq_facet = json!({
156                "rowCount": s.rows,
157                "validCount": s.accepted,
158                "invalidCount": s.rejected,
159                "_producer": self.producer(),
160                "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/DataQualityMetricsInputDatasetFacet.json"
161            });
162            let floe_facet = json!({
163                "entity": name,
164                "rejectionRate": rejection_rate,
165                "files": s.files,
166                "rows": s.rows,
167                "accepted": s.accepted,
168                "rejected": s.rejected,
169                "warnings": s.warnings,
170                "errors": s.errors,
171                "_producer": self.producer(),
172                "_schemaURL": "https://github.com/malon64/floe/schemas/FloeQualityRunFacet.json"
173            });
174            json!([{
175                "namespace": self.config.namespace,
176                "name": name,
177                "facets": {
178                    "schema": schema_facet,
179                    "dataQualityMetrics": dq_facet,
180                    "floeQualityRun": floe_facet
181                }
182            }])
183        } else {
184            json!([])
185        };
186
187        let body = json!({
188            "eventType": event_type,
189            "eventTime": event_time,
190            "run": {
191                "runId": entity_run_id,
192                "facets": run_facets
193            },
194            "job": {
195                "namespace": self.config.namespace,
196                "name": job_name,
197                "facets": {}
198            },
199            "inputs": inputs,
200            "outputs": [],
201            "producer": self.producer(),
202            "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
203        });
204        self.post_event(body);
205    }
206}
207
/// Per-entity counters and schema attached to the event emitted when an entity finishes.
///
/// The counters are copied from the `EntityFinished` event; `schema_fields` comes from
/// the schema table the observer captured at construction.
struct EntityStats {
    /// Reported as `files` in the Floe quality facet.
    files: u64,
    /// Reported as `rowCount`; also the denominator of the rejection rate.
    rows: u64,
    /// Reported as `validCount`.
    accepted: u64,
    /// Reported as `invalidCount`; also the numerator of the rejection rate.
    rejected: u64,
    /// Reported as `warnings` in the Floe quality facet.
    warnings: u64,
    /// Reported as `errors` in the Floe quality facet.
    errors: u64,
    /// `(column name, column type)` pairs emitted in the schema facet.
    schema_fields: Vec<(String, String)>,
}
217
218fn ms_to_iso8601(ms: u128) -> String {
219    let secs = (ms / 1000) as i64;
220    let nanos = ((ms % 1000) * 1_000_000) as i64;
221    match time::OffsetDateTime::from_unix_timestamp(secs) {
222        Ok(dt) => {
223            let ns = time::Duration::nanoseconds(nanos);
224            let dt = dt.saturating_add(ns);
225            dt.format(&time::format_description::well_known::Rfc3339)
226                .unwrap_or_else(|_| ms.to_string())
227        }
228        Err(_) => ms.to_string(),
229    }
230}
231
232impl RunObserver for OpenLineageObserver {
233    fn on_event(&self, event: RunEvent) {
234        match event {
235            RunEvent::RunStarted { run_id, ts_ms, .. } => {
236                if let Ok(mut guard) = self.run_start_ms.lock() {
237                    *guard = Some(ts_ms);
238                }
239                let event_time = ms_to_iso8601(ts_ms);
240                let mut run_facets = json!({});
241                if let Some(parent) = self.parent_run_facet() {
242                    run_facets["parent"] = parent;
243                }
244                let body = json!({
245                    "eventType": "START",
246                    "eventTime": event_time,
247                    "run": {
248                        "runId": run_id,
249                        "facets": run_facets
250                    },
251                    "job": {
252                        "namespace": self.config.namespace,
253                        "name": run_id,
254                        "facets": {}
255                    },
256                    "inputs": [],
257                    "outputs": [],
258                    "producer": self.producer(),
259                    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
260                });
261                self.post_event(body);
262            }
263            RunEvent::EntityStarted {
264                run_id,
265                name,
266                ts_ms,
267            } => {
268                // Use a per-entity run_id (overall_run_id.entity.name) so that
269                // the START and COMPLETE events for the same entity share the same run_id.
270                let entity_run_id = format!("{run_id}.entity.{name}");
271                if let Ok(mut guard) = self.entity_start_ms.lock() {
272                    guard.insert(name.clone(), ts_ms);
273                }
274                if let Ok(mut guard) = self.entity_run_ids.lock() {
275                    guard.insert(name.clone(), entity_run_id.clone());
276                }
277                self.emit_entity_run_event(&entity_run_id, &name, "START", ts_ms, None);
278            }
279            RunEvent::EntityFinished {
280                run_id,
281                name,
282                status,
283                files,
284                rows,
285                accepted,
286                rejected,
287                warnings,
288                errors,
289                ts_ms,
290            } => {
291                let entity_run_id = self
292                    .entity_run_ids
293                    .lock()
294                    .ok()
295                    .and_then(|g| g.get(&name).cloned())
296                    .unwrap_or_else(|| format!("{run_id}.entity.{name}"));
297                let event_type = if status == "failed" || status == "aborted" {
298                    "FAIL"
299                } else {
300                    "COMPLETE"
301                };
302                let schema_fields = self.entity_schemas.get(&name).cloned().unwrap_or_default();
303                let stats = EntityStats {
304                    files,
305                    rows,
306                    accepted,
307                    rejected,
308                    warnings,
309                    errors,
310                    schema_fields,
311                };
312                self.emit_entity_run_event(&entity_run_id, &name, event_type, ts_ms, Some(stats));
313            }
314            RunEvent::RunFinished {
315                run_id,
316                status,
317                ts_ms,
318                ..
319            } => {
320                let event_type = if status == "failed" || status == "aborted" {
321                    "FAIL"
322                } else {
323                    "COMPLETE"
324                };
325                let event_time = ms_to_iso8601(ts_ms);
326                let mut run_facets = json!({});
327                if let Some(parent) = self.parent_run_facet() {
328                    run_facets["parent"] = parent;
329                }
330                let body = json!({
331                    "eventType": event_type,
332                    "eventTime": event_time,
333                    "run": {
334                        "runId": run_id,
335                        "facets": run_facets
336                    },
337                    "job": {
338                        "namespace": self.config.namespace,
339                        "name": run_id,
340                        "facets": {}
341                    },
342                    "inputs": [],
343                    "outputs": [],
344                    "producer": self.producer(),
345                    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
346                });
347                self.post_event(body);
348            }
349            _ => {}
350        }
351    }
352}
353
354pub fn build_observer(
355    config: &LineageConfig,
356    entities: &[EntityConfig],
357) -> crate::FloeResult<Arc<dyn RunObserver>> {
358    let obs = OpenLineageObserver::new(config, entities)?;
359    Ok(Arc::new(obs))
360}