1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use serde_json::{json, Value};
7
8use crate::config::{EntityConfig, LineageConfig};
9use crate::run::events::{RunEvent, RunObserver};
10
11const DEFAULT_PRODUCER: &str = concat!(
12 "https://github.com/malon64/floe/releases/tag/v",
13 env!("CARGO_PKG_VERSION")
14);
15
/// One configured schema column, captured at construction time for the schema and
/// column-lineage facets.
#[derive(Clone)]
struct ColumnMapping {
    /// Column name; emitted as the schema field `name` and as the column-lineage key.
    output_name: String,
    /// Configured column type; emitted as the schema field `type`.
    column_type: String,
    /// Source field name, when configured. Column lineage falls back to `output_name`
    /// if this is `None`.
    source_field: Option<String>,
}
22
/// Storage locations configured for one entity, used to build dataset symlink facets.
struct EntityUris {
    /// Path of the entity's source.
    source: String,
    /// Path of the sink that receives accepted rows.
    accepted: String,
    /// Path of the sink that receives rejected rows, when one is configured.
    rejected: Option<String>,
}
28
29pub struct OpenLineageObserver {
30 client: reqwest::blocking::Client,
31 config: LineageConfig,
32 entity_start_ms: Mutex<HashMap<String, u128>>,
33 entity_run_ids: Mutex<HashMap<String, String>>,
34 run_start_ms: Mutex<Option<u128>>,
35 entity_schemas: HashMap<String, Vec<ColumnMapping>>,
36 entity_uris: HashMap<String, EntityUris>,
37 run_job_name: String,
38 consecutive_failures: AtomicUsize,
39 circuit_open: AtomicBool,
40}
41
42impl OpenLineageObserver {
43 pub fn new(
44 config: &LineageConfig,
45 entities: &[EntityConfig],
46 config_path: &str,
47 ) -> crate::FloeResult<Self> {
48 let timeout = Duration::from_secs(config.timeout_secs.unwrap_or(5));
49 let client = reqwest::blocking::Client::builder()
50 .timeout(timeout)
51 .build()
52 .map_err(|e| {
53 Box::new(crate::errors::ConfigError(format!(
54 "lineage: failed to build HTTP client: {e}"
55 ))) as Box<dyn std::error::Error + Send + Sync>
56 })?;
57
58 let run_job_name = config
59 .job_name
60 .clone()
61 .filter(|s| !s.is_empty())
62 .unwrap_or_else(|| {
63 std::path::Path::new(config_path)
64 .file_stem()
65 .and_then(|s| s.to_str())
66 .unwrap_or("floe-run")
67 .to_string()
68 });
69
70 let entity_schemas = entities
71 .iter()
72 .map(|e| {
73 let fields: Vec<ColumnMapping> = e
74 .schema
75 .columns
76 .iter()
77 .map(|c| ColumnMapping {
78 output_name: c.name.clone(),
79 column_type: c.column_type.clone(),
80 source_field: c.source.clone(),
81 })
82 .collect();
83 (e.name.clone(), fields)
84 })
85 .collect();
86
87 let entity_uris = entities
88 .iter()
89 .map(|e| {
90 (
91 e.name.clone(),
92 EntityUris {
93 source: e.source.path.clone(),
94 accepted: e.sink.accepted.path.clone(),
95 rejected: e.sink.rejected.as_ref().map(|r| r.path.clone()),
96 },
97 )
98 })
99 .collect();
100
101 Ok(Self {
102 client,
103 config: config.clone(),
104 entity_start_ms: Mutex::new(HashMap::new()),
105 entity_run_ids: Mutex::new(HashMap::new()),
106 run_start_ms: Mutex::new(None),
107 entity_schemas,
108 entity_uris,
109 run_job_name,
110 consecutive_failures: AtomicUsize::new(0),
111 circuit_open: AtomicBool::new(false),
112 })
113 }
114
115 fn attempt_post(&self, url: &str, body: &Value) -> Result<(), bool> {
116 let mut req = self.client.post(url).json(body);
117 if let Some(api_key) = self.config.api_key.as_deref() {
118 req = req.bearer_auth(api_key);
119 }
120 match req.send() {
121 Err(_) => Err(true),
122 Ok(resp) => {
123 let status = resp.status();
124 if status.is_success() {
125 Ok(())
126 } else if status.as_u16() == 429 || status.is_server_error() {
127 Err(true)
128 } else {
129 Err(false)
130 }
131 }
132 }
133 }
134
135 fn post_event(&self, body: Value) {
136 if self.circuit_open.load(Ordering::Relaxed) {
137 return;
138 }
139
140 let url = format!("{}/api/v1/lineage", self.config.url.trim_end_matches('/'));
141 let max_failures = self.config.max_failures.unwrap_or(3) as usize;
142 let retry_delays_ms: &[u64] = &[0, 100, 500];
143
144 let mut succeeded = false;
145
146 'retry: for (attempt, &delay_ms) in retry_delays_ms.iter().enumerate() {
147 if delay_ms > 0 {
148 std::thread::sleep(Duration::from_millis(delay_ms));
149 }
150 match self.attempt_post(&url, &body) {
151 Ok(()) => {
152 self.consecutive_failures.store(0, Ordering::Relaxed);
153 succeeded = true;
154 break 'retry;
155 }
156 Err(is_retryable) => {
157 if !is_retryable || attempt == retry_delays_ms.len() - 1 {
158 break 'retry;
159 }
160 }
161 }
162 }
163
164 if succeeded {
165 return;
166 }
167
168 crate::warnings::emit(
170 "",
171 None,
172 None,
173 Some("lineage_http_error"),
174 &format!("lineage: POST {url} failed — event dropped"),
175 );
176
177 let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
178 if failures >= max_failures && !self.circuit_open.swap(true, Ordering::Relaxed) {
179 crate::warnings::emit(
180 "",
181 None,
182 None,
183 Some("lineage_circuit_open"),
184 &format!(
185 "lineage: disabled for this run after {failures} consecutive failures — check endpoint {url}"
186 ),
187 );
188 }
189 }
190
191 fn producer(&self) -> &str {
192 self.config.producer.as_deref().unwrap_or(DEFAULT_PRODUCER)
193 }
194
195 fn parent_run_facet(&self) -> Option<Value> {
196 if let Ok(run_id) = std::env::var("AIRFLOW_CTX_DAG_RUN_ID") {
197 let dag = std::env::var("AIRFLOW_CTX_DAG_ID").unwrap_or_default();
198 let task = std::env::var("AIRFLOW_CTX_TASK_ID").unwrap_or_default();
199 let job_name = if task.is_empty() {
200 dag.clone()
201 } else {
202 format!("{dag}.{task}")
203 };
204 return Some(json!({
205 "run": { "runId": run_id },
206 "job": {
207 "namespace": self.config.namespace,
208 "name": job_name
209 },
210 "_producer": self.producer(),
211 "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
212 }));
213 }
214 if let Ok(run_id) = std::env::var("DAGSTER_RUN_ID") {
215 let job = std::env::var("DAGSTER_JOB_NAME").unwrap_or_default();
216 return Some(json!({
217 "run": { "runId": run_id },
218 "job": {
219 "namespace": self.config.namespace,
220 "name": job
221 },
222 "_producer": self.producer(),
223 "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
224 }));
225 }
226 None
227 }
228
229 fn emit_entity_run_event(
230 &self,
231 entity_run_id: &str,
232 name: &str,
233 event_type: &str,
234 ts_ms: u128,
235 stats: Option<EntityStats>,
236 uris: Option<&EntityUris>,
237 ) {
238 let event_time = ms_to_iso8601(ts_ms);
239 let job_name = format!("{}.{name}", self.config.namespace);
240
241 let mut run_facets = json!({});
242 if let Some(parent) = self.parent_run_facet() {
243 run_facets["parent"] = parent;
244 }
245
246 let (inputs, outputs) = match (stats.as_ref(), uris) {
249 (Some(s), Some(u)) => {
250 let rejection_rate = if s.rows > 0 {
251 s.rejected as f64 / s.rows as f64
252 } else {
253 0.0
254 };
255
256 let (src_ns, src_path) = split_storage_uri(&u.source);
258 let inputs = json!([{
259 "namespace": format!("{}.source", self.config.namespace),
260 "name": name,
261 "facets": {
262 "symlinks": symlinks_facet(self.producer(), &src_ns, &src_path, "DIRECTORY")
263 }
264 }]);
265
266 let (acc_ns, acc_path) = split_storage_uri(&u.accepted);
268 let schema_facet = json!({
269 "fields": s.schema_fields.iter().map(|col| {
270 json!({ "name": col.output_name, "type": col.column_type })
271 }).collect::<Vec<_>>(),
272 "_producer": self.producer(),
273 "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json"
274 });
275 let accepted_dq_facet = json!({
276 "rowCount": s.accepted,
277 "validCount": s.accepted,
278 "invalidCount": 0u64,
279 "_producer": self.producer(),
280 "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/DataQualityMetricsOutputDatasetFacet.json"
281 });
282 let floe_facet = json!({
283 "entity": name,
284 "rejectionRate": rejection_rate,
285 "files": s.files,
286 "rows": s.rows,
287 "warnings": s.warnings,
288 "errors": s.errors,
289 "_producer": self.producer(),
290 "_schemaURL": "https://github.com/malon64/floe/schemas/FloeQualityRunFacet.json"
291 });
292
293 let mut accepted_facets = json!({
294 "symlinks": symlinks_facet(self.producer(), &acc_ns, &acc_path, "TABLE"),
295 "schema": schema_facet,
296 "dataQualityMetrics": accepted_dq_facet,
297 "floeQualityRun": floe_facet
298 });
299
300 if !s.schema_fields.is_empty() {
301 let fields_map: serde_json::Map<String, Value> = s
302 .schema_fields
303 .iter()
304 .map(|col| {
305 let src = col.source_field.as_deref().unwrap_or(&col.output_name);
306 let entry = json!({
307 "inputFields": [{
308 "namespace": format!("{}.source", self.config.namespace),
309 "name": name,
310 "field": src
311 }]
312 });
313 (col.output_name.clone(), entry)
314 })
315 .collect();
316 accepted_facets["columnLineage"] = json!({
317 "fields": fields_map,
318 "_producer": self.producer(),
319 "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ColumnLineageDatasetFacet.json"
320 });
321 }
322
323 let mut out = vec![json!({
324 "namespace": self.config.namespace,
325 "name": name,
326 "facets": accepted_facets
327 })];
328
329 if let Some(ref rej) = u.rejected {
331 let (rej_ns, rej_path) = split_storage_uri(rej);
332 let rejected_dq_facet = json!({
333 "rowCount": s.rejected,
334 "validCount": 0u64,
335 "invalidCount": s.rejected,
336 "_producer": self.producer(),
337 "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/DataQualityMetricsOutputDatasetFacet.json"
338 });
339 out.push(json!({
340 "namespace": format!("{}.rejected", self.config.namespace),
341 "name": name,
342 "facets": {
343 "symlinks": symlinks_facet(self.producer(), &rej_ns, &rej_path, "DIRECTORY"),
344 "dataQualityMetrics": rejected_dq_facet
345 }
346 }));
347 }
348
349 (inputs, json!(out))
350 }
351 _ => (json!([]), json!([])),
352 };
353
354 let body = json!({
355 "eventType": event_type,
356 "eventTime": event_time,
357 "run": {
358 "runId": entity_run_id,
359 "facets": run_facets
360 },
361 "job": {
362 "namespace": self.config.namespace,
363 "name": job_name,
364 "facets": {}
365 },
366 "inputs": inputs,
367 "outputs": outputs,
368 "producer": self.producer(),
369 "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
370 });
371 self.post_event(body);
372 }
373}
374
375struct EntityStats {
376 files: u64,
377 rows: u64,
378 accepted: u64,
379 rejected: u64,
380 warnings: u64,
381 errors: u64,
382 schema_fields: Vec<ColumnMapping>,
383}
384
385fn ms_to_iso8601(ms: u128) -> String {
386 let secs = (ms / 1000) as i64;
387 let nanos = ((ms % 1000) * 1_000_000) as i64;
388 match time::OffsetDateTime::from_unix_timestamp(secs) {
389 Ok(dt) => {
390 let ns = time::Duration::nanoseconds(nanos);
391 let dt = dt.saturating_add(ns);
392 dt.format(&time::format_description::well_known::Rfc3339)
393 .unwrap_or_else(|_| ms.to_string())
394 }
395 Err(_) => ms.to_string(),
396 }
397}
398
/// Splits a storage URI into a `(namespace, name)` pair for dataset identifiers.
///
/// For the recognised cloud schemes (`s3://`, `gs://`, `gcs://`, `az://`, `abfss://`,
/// `abfs://`, matched case-sensitively) the namespace is the scheme plus authority and the
/// name is the remainder starting at the first `/` (or `"/"` when there is none). Anything
/// else is treated as a local path: namespace `"file"`, name = the input unchanged.
fn split_storage_uri(uri: &str) -> (String, String) {
    const CLOUD_SCHEMES: [&str; 6] = ["s3://", "gs://", "gcs://", "az://", "abfss://", "abfs://"];

    let scheme = match CLOUD_SCHEMES
        .iter()
        .copied()
        .find(|candidate| uri.starts_with(*candidate))
    {
        Some(scheme) => scheme,
        None => return ("file".to_string(), uri.to_string()),
    };

    // Look for the first '/' after the scheme: it separates authority from path.
    match uri[scheme.len()..].find('/') {
        Some(offset) => {
            let (authority, path) = uri.split_at(scheme.len() + offset);
            (authority.to_string(), path.to_string())
        }
        None => (uri.to_string(), "/".to_string()),
    }
}
414
415fn symlinks_facet(producer: &str, namespace: &str, name: &str, ds_type: &str) -> Value {
416 json!({
417 "identifiers": [{ "namespace": namespace, "name": name, "type": ds_type }],
418 "_producer": producer,
419 "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SymlinksDatasetFacet.json"
420 })
421}
422
423impl RunObserver for OpenLineageObserver {
424 fn on_event(&self, event: RunEvent) {
425 match event {
426 RunEvent::RunStarted { run_id, ts_ms, .. } => {
427 self.consecutive_failures.store(0, Ordering::Relaxed);
430 self.circuit_open.store(false, Ordering::Relaxed);
431 if let Ok(mut guard) = self.run_start_ms.lock() {
432 *guard = Some(ts_ms);
433 }
434 let event_time = ms_to_iso8601(ts_ms);
435 let mut run_facets = json!({});
436 if let Some(parent) = self.parent_run_facet() {
437 run_facets["parent"] = parent;
438 }
439 let body = json!({
440 "eventType": "START",
441 "eventTime": event_time,
442 "run": {
443 "runId": run_id,
444 "facets": run_facets
445 },
446 "job": {
447 "namespace": self.config.namespace,
448 "name": self.run_job_name,
449 "facets": {}
450 },
451 "inputs": [],
452 "outputs": [],
453 "producer": self.producer(),
454 "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
455 });
456 self.post_event(body);
457 }
458 RunEvent::EntityStarted {
459 run_id,
460 name,
461 ts_ms,
462 } => {
463 let entity_run_id = format!("{run_id}.entity.{name}");
466 if let Ok(mut guard) = self.entity_start_ms.lock() {
467 guard.insert(name.clone(), ts_ms);
468 }
469 if let Ok(mut guard) = self.entity_run_ids.lock() {
470 guard.insert(name.clone(), entity_run_id.clone());
471 }
472 self.emit_entity_run_event(&entity_run_id, &name, "START", ts_ms, None, None);
473 }
474 RunEvent::EntityFinished {
475 run_id,
476 name,
477 status,
478 files,
479 files_skipped: _,
480 rows,
481 accepted,
482 rejected,
483 warnings,
484 errors,
485 ts_ms,
486 } => {
487 let entity_run_id = self
488 .entity_run_ids
489 .lock()
490 .ok()
491 .and_then(|g| g.get(&name).cloned())
492 .unwrap_or_else(|| format!("{run_id}.entity.{name}"));
493 let event_type = if status == "failed" || status == "aborted" {
494 "FAIL"
495 } else {
496 "COMPLETE"
497 };
498 let schema_fields = self.entity_schemas.get(&name).cloned().unwrap_or_default();
499 let stats = EntityStats {
500 files,
501 rows,
502 accepted,
503 rejected,
504 warnings,
505 errors,
506 schema_fields,
507 };
508 let uris = self.entity_uris.get(&name);
509 self.emit_entity_run_event(
510 &entity_run_id,
511 &name,
512 event_type,
513 ts_ms,
514 Some(stats),
515 uris,
516 );
517 }
518 RunEvent::RunFinished {
519 run_id,
520 status,
521 ts_ms,
522 ..
523 } => {
524 let event_type = if status == "failed" || status == "aborted" {
525 "FAIL"
526 } else {
527 "COMPLETE"
528 };
529 let event_time = ms_to_iso8601(ts_ms);
530 let mut run_facets = json!({});
531 if let Some(parent) = self.parent_run_facet() {
532 run_facets["parent"] = parent;
533 }
534 let body = json!({
535 "eventType": event_type,
536 "eventTime": event_time,
537 "run": {
538 "runId": run_id,
539 "facets": run_facets
540 },
541 "job": {
542 "namespace": self.config.namespace,
543 "name": self.run_job_name,
544 "facets": {}
545 },
546 "inputs": [],
547 "outputs": [],
548 "producer": self.producer(),
549 "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
550 });
551 self.post_event(body);
552 }
553 _ => {}
554 }
555 }
556}
557
558pub fn build_observer(
559 config: &LineageConfig,
560 entities: &[EntityConfig],
561 config_path: &str,
562) -> crate::FloeResult<Arc<dyn RunObserver>> {
563 let obs = OpenLineageObserver::new(config, entities, config_path)?;
564 Ok(Arc::new(obs))
565}
566
567impl OpenLineageObserver {
568 pub fn is_circuit_open(&self) -> bool {
569 self.circuit_open.load(Ordering::Relaxed)
570 }
571
572 pub fn consecutive_failures(&self) -> usize {
573 self.consecutive_failures.load(Ordering::Relaxed)
574 }
575}