1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use serde_json::{json, Value};
7
8use crate::config::{EntityConfig, LineageConfig};
9use crate::run::events::{RunEvent, RunObserver};
10
/// Run observer that reports run and entity lifecycle events to an
/// OpenLineage-style HTTP endpoint.
///
/// Delivery is best-effort: an event that cannot be posted is dropped with a
/// warning, and after enough consecutive failures posting is switched off
/// until the next `RunStarted` event.
pub struct OpenLineageObserver {
    /// Blocking HTTP client used for every lineage POST.
    client: reqwest::blocking::Client,
    /// Copy of the lineage configuration taken at construction time.
    config: LineageConfig,
    /// Entity name -> timestamp (ms) recorded when `EntityStarted` is observed.
    entity_start_ms: Mutex<HashMap<String, u128>>,
    /// Entity name -> run id assigned to that entity when `EntityStarted` is observed.
    entity_run_ids: Mutex<HashMap<String, String>>,
    /// Timestamp (ms) of the most recent `RunStarted` event, if any.
    run_start_ms: Mutex<Option<u128>>,
    /// Entity name -> `(column name, column type)` pairs captured from the entity configs.
    entity_schemas: HashMap<String, Vec<(String, String)>>,
    /// Number of events dropped since the last successful POST (or the last `RunStarted`).
    consecutive_failures: AtomicUsize,
    /// When `true`, `post_event` returns immediately without sending anything.
    circuit_open: AtomicBool,
}
21
22impl OpenLineageObserver {
23 pub fn new(config: &LineageConfig, entities: &[EntityConfig]) -> crate::FloeResult<Self> {
24 let timeout = Duration::from_secs(config.timeout_secs.unwrap_or(5));
25 let client = reqwest::blocking::Client::builder()
26 .timeout(timeout)
27 .build()
28 .map_err(|e| {
29 Box::new(crate::errors::ConfigError(format!(
30 "lineage: failed to build HTTP client: {e}"
31 ))) as Box<dyn std::error::Error + Send + Sync>
32 })?;
33
34 let entity_schemas = entities
35 .iter()
36 .map(|e| {
37 let fields: Vec<(String, String)> = e
38 .schema
39 .columns
40 .iter()
41 .map(|c| (c.name.clone(), c.column_type.clone()))
42 .collect();
43 (e.name.clone(), fields)
44 })
45 .collect();
46
47 Ok(Self {
48 client,
49 config: config.clone(),
50 entity_start_ms: Mutex::new(HashMap::new()),
51 entity_run_ids: Mutex::new(HashMap::new()),
52 run_start_ms: Mutex::new(None),
53 entity_schemas,
54 consecutive_failures: AtomicUsize::new(0),
55 circuit_open: AtomicBool::new(false),
56 })
57 }
58
59 fn attempt_post(&self, url: &str, body: &Value) -> Result<(), bool> {
60 let mut req = self.client.post(url).json(body);
61 if let Some(api_key) = self.config.api_key.as_deref() {
62 req = req.bearer_auth(api_key);
63 }
64 match req.send() {
65 Err(_) => Err(true),
66 Ok(resp) => {
67 let status = resp.status();
68 if status.is_success() {
69 Ok(())
70 } else if status.as_u16() == 429 || status.is_server_error() {
71 Err(true)
72 } else {
73 Err(false)
74 }
75 }
76 }
77 }
78
79 fn post_event(&self, body: Value) {
80 if self.circuit_open.load(Ordering::Relaxed) {
81 return;
82 }
83
84 let url = format!("{}/api/v1/lineage", self.config.url.trim_end_matches('/'));
85 let max_failures = self.config.max_failures.unwrap_or(3) as usize;
86 let retry_delays_ms: &[u64] = &[0, 100, 500];
87
88 let mut succeeded = false;
89
90 'retry: for (attempt, &delay_ms) in retry_delays_ms.iter().enumerate() {
91 if delay_ms > 0 {
92 std::thread::sleep(Duration::from_millis(delay_ms));
93 }
94 match self.attempt_post(&url, &body) {
95 Ok(()) => {
96 self.consecutive_failures.store(0, Ordering::Relaxed);
97 succeeded = true;
98 break 'retry;
99 }
100 Err(is_retryable) => {
101 if !is_retryable || attempt == retry_delays_ms.len() - 1 {
102 break 'retry;
103 }
104 }
105 }
106 }
107
108 if succeeded {
109 return;
110 }
111
112 crate::warnings::emit(
114 "",
115 None,
116 None,
117 Some("lineage_http_error"),
118 &format!("lineage: POST {url} failed — event dropped"),
119 );
120
121 let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
122 if failures >= max_failures && !self.circuit_open.swap(true, Ordering::Relaxed) {
123 crate::warnings::emit(
124 "",
125 None,
126 None,
127 Some("lineage_circuit_open"),
128 &format!(
129 "lineage: disabled for this run after {failures} consecutive failures — check endpoint {url}"
130 ),
131 );
132 }
133 }
134
135 fn producer(&self) -> &str {
136 self.config
137 .producer
138 .as_deref()
139 .unwrap_or("https://github.com/malon64/floe")
140 }
141
142 fn parent_run_facet(&self) -> Option<Value> {
143 if let Ok(run_id) = std::env::var("AIRFLOW_CTX_DAG_RUN_ID") {
144 let dag = std::env::var("AIRFLOW_CTX_DAG_ID").unwrap_or_default();
145 let task = std::env::var("AIRFLOW_CTX_TASK_ID").unwrap_or_default();
146 let job_name = if task.is_empty() {
147 dag.clone()
148 } else {
149 format!("{dag}.{task}")
150 };
151 return Some(json!({
152 "run": { "runId": run_id },
153 "job": {
154 "namespace": self.config.namespace,
155 "name": job_name
156 },
157 "_producer": self.producer(),
158 "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
159 }));
160 }
161 if let Ok(run_id) = std::env::var("DAGSTER_RUN_ID") {
162 let job = std::env::var("DAGSTER_JOB_NAME").unwrap_or_default();
163 return Some(json!({
164 "run": { "runId": run_id },
165 "job": {
166 "namespace": self.config.namespace,
167 "name": job
168 },
169 "_producer": self.producer(),
170 "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"
171 }));
172 }
173 None
174 }
175
176 fn emit_entity_run_event(
177 &self,
178 entity_run_id: &str,
179 name: &str,
180 event_type: &str,
181 ts_ms: u128,
182 stats: Option<EntityStats>,
183 ) {
184 let event_time = ms_to_iso8601(ts_ms);
185 let job_name = format!("{}.{name}", self.config.namespace);
186
187 let mut run_facets = json!({});
188 if let Some(parent) = self.parent_run_facet() {
189 run_facets["parent"] = parent;
190 }
191
192 let inputs = if let Some(ref s) = stats {
194 let rejection_rate = if s.rows > 0 {
195 s.rejected as f64 / s.rows as f64
196 } else {
197 0.0
198 };
199 let schema_facet = json!({
200 "fields": s.schema_fields.iter().map(|(col_name, col_type)| {
201 json!({ "name": col_name, "type": col_type })
202 }).collect::<Vec<_>>(),
203 "_producer": self.producer(),
204 "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json"
205 });
206 let dq_facet = json!({
207 "rowCount": s.rows,
208 "validCount": s.accepted,
209 "invalidCount": s.rejected,
210 "_producer": self.producer(),
211 "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/DataQualityMetricsInputDatasetFacet.json"
212 });
213 let floe_facet = json!({
214 "entity": name,
215 "rejectionRate": rejection_rate,
216 "files": s.files,
217 "rows": s.rows,
218 "accepted": s.accepted,
219 "rejected": s.rejected,
220 "warnings": s.warnings,
221 "errors": s.errors,
222 "_producer": self.producer(),
223 "_schemaURL": "https://github.com/malon64/floe/schemas/FloeQualityRunFacet.json"
224 });
225 json!([{
226 "namespace": self.config.namespace,
227 "name": name,
228 "facets": {
229 "schema": schema_facet,
230 "dataQualityMetrics": dq_facet,
231 "floeQualityRun": floe_facet
232 }
233 }])
234 } else {
235 json!([])
236 };
237
238 let body = json!({
239 "eventType": event_type,
240 "eventTime": event_time,
241 "run": {
242 "runId": entity_run_id,
243 "facets": run_facets
244 },
245 "job": {
246 "namespace": self.config.namespace,
247 "name": job_name,
248 "facets": {}
249 },
250 "inputs": inputs,
251 "outputs": [],
252 "producer": self.producer(),
253 "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
254 });
255 self.post_event(body);
256 }
257}
258
/// Per-entity counters and schema attached to an entity's finish event.
///
/// The counters are copied from the fields of `RunEvent::EntityFinished`.
struct EntityStats {
    /// File count reported for the entity.
    files: u64,
    /// Total row count; also the denominator of the rejection rate.
    rows: u64,
    /// Rows reported as accepted (emitted as `validCount`).
    accepted: u64,
    /// Rows reported as rejected (emitted as `invalidCount`).
    rejected: u64,
    /// Warning count reported for the entity.
    warnings: u64,
    /// Error count reported for the entity.
    errors: u64,
    /// `(column name, column type)` pairs emitted in the schema facet.
    schema_fields: Vec<(String, String)>,
}
268
269fn ms_to_iso8601(ms: u128) -> String {
270 let secs = (ms / 1000) as i64;
271 let nanos = ((ms % 1000) * 1_000_000) as i64;
272 match time::OffsetDateTime::from_unix_timestamp(secs) {
273 Ok(dt) => {
274 let ns = time::Duration::nanoseconds(nanos);
275 let dt = dt.saturating_add(ns);
276 dt.format(&time::format_description::well_known::Rfc3339)
277 .unwrap_or_else(|_| ms.to_string())
278 }
279 Err(_) => ms.to_string(),
280 }
281}
282
283impl RunObserver for OpenLineageObserver {
284 fn on_event(&self, event: RunEvent) {
285 match event {
286 RunEvent::RunStarted { run_id, ts_ms, .. } => {
287 self.consecutive_failures.store(0, Ordering::Relaxed);
290 self.circuit_open.store(false, Ordering::Relaxed);
291 if let Ok(mut guard) = self.run_start_ms.lock() {
292 *guard = Some(ts_ms);
293 }
294 let event_time = ms_to_iso8601(ts_ms);
295 let mut run_facets = json!({});
296 if let Some(parent) = self.parent_run_facet() {
297 run_facets["parent"] = parent;
298 }
299 let body = json!({
300 "eventType": "START",
301 "eventTime": event_time,
302 "run": {
303 "runId": run_id,
304 "facets": run_facets
305 },
306 "job": {
307 "namespace": self.config.namespace,
308 "name": run_id,
309 "facets": {}
310 },
311 "inputs": [],
312 "outputs": [],
313 "producer": self.producer(),
314 "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
315 });
316 self.post_event(body);
317 }
318 RunEvent::EntityStarted {
319 run_id,
320 name,
321 ts_ms,
322 } => {
323 let entity_run_id = format!("{run_id}.entity.{name}");
326 if let Ok(mut guard) = self.entity_start_ms.lock() {
327 guard.insert(name.clone(), ts_ms);
328 }
329 if let Ok(mut guard) = self.entity_run_ids.lock() {
330 guard.insert(name.clone(), entity_run_id.clone());
331 }
332 self.emit_entity_run_event(&entity_run_id, &name, "START", ts_ms, None);
333 }
334 RunEvent::EntityFinished {
335 run_id,
336 name,
337 status,
338 files,
339 rows,
340 accepted,
341 rejected,
342 warnings,
343 errors,
344 ts_ms,
345 } => {
346 let entity_run_id = self
347 .entity_run_ids
348 .lock()
349 .ok()
350 .and_then(|g| g.get(&name).cloned())
351 .unwrap_or_else(|| format!("{run_id}.entity.{name}"));
352 let event_type = if status == "failed" || status == "aborted" {
353 "FAIL"
354 } else {
355 "COMPLETE"
356 };
357 let schema_fields = self.entity_schemas.get(&name).cloned().unwrap_or_default();
358 let stats = EntityStats {
359 files,
360 rows,
361 accepted,
362 rejected,
363 warnings,
364 errors,
365 schema_fields,
366 };
367 self.emit_entity_run_event(&entity_run_id, &name, event_type, ts_ms, Some(stats));
368 }
369 RunEvent::RunFinished {
370 run_id,
371 status,
372 ts_ms,
373 ..
374 } => {
375 let event_type = if status == "failed" || status == "aborted" {
376 "FAIL"
377 } else {
378 "COMPLETE"
379 };
380 let event_time = ms_to_iso8601(ts_ms);
381 let mut run_facets = json!({});
382 if let Some(parent) = self.parent_run_facet() {
383 run_facets["parent"] = parent;
384 }
385 let body = json!({
386 "eventType": event_type,
387 "eventTime": event_time,
388 "run": {
389 "runId": run_id,
390 "facets": run_facets
391 },
392 "job": {
393 "namespace": self.config.namespace,
394 "name": run_id,
395 "facets": {}
396 },
397 "inputs": [],
398 "outputs": [],
399 "producer": self.producer(),
400 "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
401 });
402 self.post_event(body);
403 }
404 _ => {}
405 }
406 }
407}
408
409pub fn build_observer(
410 config: &LineageConfig,
411 entities: &[EntityConfig],
412) -> crate::FloeResult<Arc<dyn RunObserver>> {
413 let obs = OpenLineageObserver::new(config, entities)?;
414 Ok(Arc::new(obs))
415}
416
/// Read-only accessors for the observer's delivery state.
impl OpenLineageObserver {
    /// Returns `true` when the circuit flag is set, i.e. when `post_event`
    /// currently returns without sending anything.
    pub fn is_circuit_open(&self) -> bool {
        self.circuit_open.load(Ordering::Relaxed)
    }

    /// Returns the current value of the consecutive-failure counter.
    pub fn consecutive_failures(&self) -> usize {
        self.consecutive_failures.load(Ordering::Relaxed)
    }
}