1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use serde_json::{json, Value};
7
8use crate::config::{EntityConfig, LineageConfig};
9use crate::run::events::{RunEvent, RunObserver};
10
/// Dataset locations associated with a single entity.
///
/// The strings are used verbatim as OpenLineage dataset names when an entity
/// event is emitted.
struct EntityUris {
    /// Source path; reported as the name of the input dataset.
    source: String,
    /// Accepted-sink path; reported as an output dataset.
    accepted: String,
    /// Rejected-sink path; reported as a second output dataset when present.
    rejected: Option<String>,
}
16
17pub struct OpenLineageObserver {
18 client: reqwest::blocking::Client,
19 config: LineageConfig,
20 entity_start_ms: Mutex<HashMap<String, u128>>,
21 entity_run_ids: Mutex<HashMap<String, String>>,
22 run_start_ms: Mutex<Option<u128>>,
23 entity_schemas: HashMap<String, Vec<(String, String)>>,
24 entity_uris: HashMap<String, EntityUris>,
25 consecutive_failures: AtomicUsize,
26 circuit_open: AtomicBool,
27}
28
29impl OpenLineageObserver {
30 pub fn new(config: &LineageConfig, entities: &[EntityConfig]) -> crate::FloeResult<Self> {
31 let timeout = Duration::from_secs(config.timeout_secs.unwrap_or(5));
32 let client = reqwest::blocking::Client::builder()
33 .timeout(timeout)
34 .build()
35 .map_err(|e| {
36 Box::new(crate::errors::ConfigError(format!(
37 "lineage: failed to build HTTP client: {e}"
38 ))) as Box<dyn std::error::Error + Send + Sync>
39 })?;
40
41 let entity_schemas = entities
42 .iter()
43 .map(|e| {
44 let fields: Vec<(String, String)> = e
45 .schema
46 .columns
47 .iter()
48 .map(|c| (c.name.clone(), c.column_type.clone()))
49 .collect();
50 (e.name.clone(), fields)
51 })
52 .collect();
53
54 let entity_uris = entities
55 .iter()
56 .map(|e| {
57 (
58 e.name.clone(),
59 EntityUris {
60 source: e.source.path.clone(),
61 accepted: e.sink.accepted.path.clone(),
62 rejected: e.sink.rejected.as_ref().map(|r| r.path.clone()),
63 },
64 )
65 })
66 .collect();
67
68 Ok(Self {
69 client,
70 config: config.clone(),
71 entity_start_ms: Mutex::new(HashMap::new()),
72 entity_run_ids: Mutex::new(HashMap::new()),
73 run_start_ms: Mutex::new(None),
74 entity_schemas,
75 entity_uris,
76 consecutive_failures: AtomicUsize::new(0),
77 circuit_open: AtomicBool::new(false),
78 })
79 }
80
81 fn attempt_post(&self, url: &str, body: &Value) -> Result<(), bool> {
82 let mut req = self.client.post(url).json(body);
83 if let Some(api_key) = self.config.api_key.as_deref() {
84 req = req.bearer_auth(api_key);
85 }
86 match req.send() {
87 Err(_) => Err(true),
88 Ok(resp) => {
89 let status = resp.status();
90 if status.is_success() {
91 Ok(())
92 } else if status.as_u16() == 429 || status.is_server_error() {
93 Err(true)
94 } else {
95 Err(false)
96 }
97 }
98 }
99 }
100
101 fn post_event(&self, body: Value) {
102 if self.circuit_open.load(Ordering::Relaxed) {
103 return;
104 }
105
106 let url = format!("{}/api/v1/lineage", self.config.url.trim_end_matches('/'));
107 let max_failures = self.config.max_failures.unwrap_or(3) as usize;
108 let retry_delays_ms: &[u64] = &[0, 100, 500];
109
110 let mut succeeded = false;
111
112 'retry: for (attempt, &delay_ms) in retry_delays_ms.iter().enumerate() {
113 if delay_ms > 0 {
114 std::thread::sleep(Duration::from_millis(delay_ms));
115 }
116 match self.attempt_post(&url, &body) {
117 Ok(()) => {
118 self.consecutive_failures.store(0, Ordering::Relaxed);
119 succeeded = true;
120 break 'retry;
121 }
122 Err(is_retryable) => {
123 if !is_retryable || attempt == retry_delays_ms.len() - 1 {
124 break 'retry;
125 }
126 }
127 }
128 }
129
130 if succeeded {
131 return;
132 }
133
134 crate::warnings::emit(
136 "",
137 None,
138 None,
139 Some("lineage_http_error"),
140 &format!("lineage: POST {url} failed — event dropped"),
141 );
142
143 let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
144 if failures >= max_failures && !self.circuit_open.swap(true, Ordering::Relaxed) {
145 crate::warnings::emit(
146 "",
147 None,
148 None,
149 Some("lineage_circuit_open"),
150 &format!(
151 "lineage: disabled for this run after {failures} consecutive failures — check endpoint {url}"
152 ),
153 );
154 }
155 }
156
157 fn producer(&self) -> &str {
158 self.config
159 .producer
160 .as_deref()
161 .unwrap_or("https://github.com/malon64/floe")
162 }
163
164 fn parent_run_facet(&self) -> Option<Value> {
165 if let Ok(run_id) = std::env::var("AIRFLOW_CTX_DAG_RUN_ID") {
166 let dag = std::env::var("AIRFLOW_CTX_DAG_ID").unwrap_or_default();
167 let task = std::env::var("AIRFLOW_CTX_TASK_ID").unwrap_or_default();
168 let job_name = if task.is_empty() {
169 dag.clone()
170 } else {
171 format!("{dag}.{task}")
172 };
173 return Some(json!({
174 "run": { "runId": run_id },
175 "job": {
176 "namespace": self.config.namespace,
177 "name": job_name
178 },
179 "_producer": self.producer(),
180 "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
181 }));
182 }
183 if let Ok(run_id) = std::env::var("DAGSTER_RUN_ID") {
184 let job = std::env::var("DAGSTER_JOB_NAME").unwrap_or_default();
185 return Some(json!({
186 "run": { "runId": run_id },
187 "job": {
188 "namespace": self.config.namespace,
189 "name": job
190 },
191 "_producer": self.producer(),
192 "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
193 }));
194 }
195 None
196 }
197
198 fn emit_entity_run_event(
199 &self,
200 entity_run_id: &str,
201 name: &str,
202 event_type: &str,
203 ts_ms: u128,
204 stats: Option<EntityStats>,
205 uris: Option<&EntityUris>,
206 ) {
207 let event_time = ms_to_iso8601(ts_ms);
208 let job_name = format!("{}.{name}", self.config.namespace);
209
210 let mut run_facets = json!({});
211 if let Some(parent) = self.parent_run_facet() {
212 run_facets["parent"] = parent;
213 }
214
215 let inputs = match (stats.as_ref(), uris) {
217 (Some(s), Some(u)) => {
218 let rejection_rate = if s.rows > 0 {
219 s.rejected as f64 / s.rows as f64
220 } else {
221 0.0
222 };
223 let schema_facet = json!({
224 "fields": s.schema_fields.iter().map(|(col_name, col_type)| {
225 json!({ "name": col_name, "type": col_type })
226 }).collect::<Vec<_>>(),
227 "_producer": self.producer(),
228 "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json"
229 });
230 let dq_facet = json!({
231 "rowCount": s.rows,
232 "validCount": s.accepted,
233 "invalidCount": s.rejected,
234 "_producer": self.producer(),
235 "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/DataQualityMetricsInputDatasetFacet.json"
236 });
237 let floe_facet = json!({
238 "entity": name,
239 "rejectionRate": rejection_rate,
240 "files": s.files,
241 "rows": s.rows,
242 "accepted": s.accepted,
243 "rejected": s.rejected,
244 "warnings": s.warnings,
245 "errors": s.errors,
246 "_producer": self.producer(),
247 "_schemaURL": "https://github.com/malon64/floe/schemas/FloeQualityRunFacet.json"
248 });
249 json!([{
250 "namespace": self.config.namespace,
251 "name": u.source,
252 "facets": {
253 "schema": schema_facet,
254 "dataQualityMetrics": dq_facet,
255 "floeQualityRun": floe_facet
256 }
257 }])
258 }
259 _ => json!([]),
260 };
261
262 let outputs = match uris {
264 Some(u) => {
265 let mut out = vec![json!({
266 "namespace": self.config.namespace,
267 "name": u.accepted,
268 "facets": {}
269 })];
270 if let Some(ref rej) = u.rejected {
271 out.push(json!({
272 "namespace": self.config.namespace,
273 "name": rej,
274 "facets": {}
275 }));
276 }
277 json!(out)
278 }
279 None => json!([]),
280 };
281
282 let body = json!({
283 "eventType": event_type,
284 "eventTime": event_time,
285 "run": {
286 "runId": entity_run_id,
287 "facets": run_facets
288 },
289 "job": {
290 "namespace": self.config.namespace,
291 "name": job_name,
292 "facets": {}
293 },
294 "inputs": inputs,
295 "outputs": outputs,
296 "producer": self.producer(),
297 "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
298 });
299 self.post_event(body);
300 }
301}
302
/// Per-entity counters and schema attached to a finished-entity event.
struct EntityStats {
    /// Value reported as `files` in the floe quality facet.
    files: u64,
    /// Total rows; reported as `rowCount` and used as the rejection-rate denominator.
    rows: u64,
    /// Accepted rows; reported as `validCount`.
    accepted: u64,
    /// Rejected rows; reported as `invalidCount`.
    rejected: u64,
    /// Value reported as `warnings` in the floe quality facet.
    warnings: u64,
    /// Value reported as `errors` in the floe quality facet.
    errors: u64,
    /// `(column name, column type)` pairs reported in the schema facet.
    schema_fields: Vec<(String, String)>,
}
312
313fn ms_to_iso8601(ms: u128) -> String {
314 let secs = (ms / 1000) as i64;
315 let nanos = ((ms % 1000) * 1_000_000) as i64;
316 match time::OffsetDateTime::from_unix_timestamp(secs) {
317 Ok(dt) => {
318 let ns = time::Duration::nanoseconds(nanos);
319 let dt = dt.saturating_add(ns);
320 dt.format(&time::format_description::well_known::Rfc3339)
321 .unwrap_or_else(|_| ms.to_string())
322 }
323 Err(_) => ms.to_string(),
324 }
325}
326
327impl RunObserver for OpenLineageObserver {
328 fn on_event(&self, event: RunEvent) {
329 match event {
330 RunEvent::RunStarted { run_id, ts_ms, .. } => {
331 self.consecutive_failures.store(0, Ordering::Relaxed);
334 self.circuit_open.store(false, Ordering::Relaxed);
335 if let Ok(mut guard) = self.run_start_ms.lock() {
336 *guard = Some(ts_ms);
337 }
338 let event_time = ms_to_iso8601(ts_ms);
339 let mut run_facets = json!({});
340 if let Some(parent) = self.parent_run_facet() {
341 run_facets["parent"] = parent;
342 }
343 let body = json!({
344 "eventType": "START",
345 "eventTime": event_time,
346 "run": {
347 "runId": run_id,
348 "facets": run_facets
349 },
350 "job": {
351 "namespace": self.config.namespace,
352 "name": run_id,
353 "facets": {}
354 },
355 "inputs": [],
356 "outputs": [],
357 "producer": self.producer(),
358 "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
359 });
360 self.post_event(body);
361 }
362 RunEvent::EntityStarted {
363 run_id,
364 name,
365 ts_ms,
366 } => {
367 let entity_run_id = format!("{run_id}.entity.{name}");
370 if let Ok(mut guard) = self.entity_start_ms.lock() {
371 guard.insert(name.clone(), ts_ms);
372 }
373 if let Ok(mut guard) = self.entity_run_ids.lock() {
374 guard.insert(name.clone(), entity_run_id.clone());
375 }
376 self.emit_entity_run_event(&entity_run_id, &name, "START", ts_ms, None, None);
377 }
378 RunEvent::EntityFinished {
379 run_id,
380 name,
381 status,
382 files,
383 files_skipped: _,
384 rows,
385 accepted,
386 rejected,
387 warnings,
388 errors,
389 ts_ms,
390 } => {
391 let entity_run_id = self
392 .entity_run_ids
393 .lock()
394 .ok()
395 .and_then(|g| g.get(&name).cloned())
396 .unwrap_or_else(|| format!("{run_id}.entity.{name}"));
397 let event_type = if status == "failed" || status == "aborted" {
398 "FAIL"
399 } else {
400 "COMPLETE"
401 };
402 let schema_fields = self.entity_schemas.get(&name).cloned().unwrap_or_default();
403 let stats = EntityStats {
404 files,
405 rows,
406 accepted,
407 rejected,
408 warnings,
409 errors,
410 schema_fields,
411 };
412 let uris = self.entity_uris.get(&name);
413 self.emit_entity_run_event(
414 &entity_run_id,
415 &name,
416 event_type,
417 ts_ms,
418 Some(stats),
419 uris,
420 );
421 }
422 RunEvent::RunFinished {
423 run_id,
424 status,
425 ts_ms,
426 ..
427 } => {
428 let event_type = if status == "failed" || status == "aborted" {
429 "FAIL"
430 } else {
431 "COMPLETE"
432 };
433 let event_time = ms_to_iso8601(ts_ms);
434 let mut run_facets = json!({});
435 if let Some(parent) = self.parent_run_facet() {
436 run_facets["parent"] = parent;
437 }
438 let body = json!({
439 "eventType": event_type,
440 "eventTime": event_time,
441 "run": {
442 "runId": run_id,
443 "facets": run_facets
444 },
445 "job": {
446 "namespace": self.config.namespace,
447 "name": run_id,
448 "facets": {}
449 },
450 "inputs": [],
451 "outputs": [],
452 "producer": self.producer(),
453 "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
454 });
455 self.post_event(body);
456 }
457 _ => {}
458 }
459 }
460}
461
462pub fn build_observer(
463 config: &LineageConfig,
464 entities: &[EntityConfig],
465) -> crate::FloeResult<Arc<dyn RunObserver>> {
466 let obs = OpenLineageObserver::new(config, entities)?;
467 Ok(Arc::new(obs))
468}
469
470impl OpenLineageObserver {
471 pub fn is_circuit_open(&self) -> bool {
472 self.circuit_open.load(Ordering::Relaxed)
473 }
474
475 pub fn consecutive_failures(&self) -> usize {
476 self.consecutive_failures.load(Ordering::Relaxed)
477 }
478}