entrenar/integrity/lineage/
causal.rs1use super::event::{LineageEvent, LineageEventType};
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Clone, Default, Serialize, Deserialize)]
8pub struct CausalLineage {
9 pub events: Vec<LineageEvent>,
11}
12
13impl CausalLineage {
14 pub fn new() -> Self {
16 Self::default()
17 }
18
19 pub fn add_event(&mut self, event: LineageEvent) {
21 self.events.push(event);
22 self.events.sort();
23 }
24
25 pub fn events_in_order(&self) -> &[LineageEvent] {
27 &self.events
28 }
29
30 pub fn events_for_run(&self, run_id: &str) -> Vec<&LineageEvent> {
32 self.events.iter().filter(|e| e.run_id == run_id).collect()
33 }
34
35 pub fn events_of_type(&self, event_type: LineageEventType) -> Vec<&LineageEvent> {
37 self.events.iter().filter(|e| e.event_type == event_type).collect()
38 }
39
40 pub fn latest_event_for_run(&self, run_id: &str) -> Option<&LineageEvent> {
42 self.events.iter().rev().find(|e| e.run_id == run_id)
43 }
44
45 pub fn run_precedes(&self, run_a: &str, run_b: &str) -> bool {
47 let a_events = self.events_for_run(run_a);
48 let b_events = self.events_for_run(run_b);
49
50 if a_events.is_empty() || b_events.is_empty() {
51 return false;
52 }
53
54 let b_first = b_events.first().expect("b_events is non-empty (checked above)");
56 a_events.iter().all(|a| a.timestamp.happens_before(&b_first.timestamp))
57 }
58
59 pub fn len(&self) -> usize {
61 self.events.len()
62 }
63
64 pub fn is_empty(&self) -> bool {
66 self.events.is_empty()
67 }
68}