1use chrono::Utc;
4use std::collections::HashMap;
5
/// A single step in a pipeline's lineage graph.
///
/// Each node records which stage and operation produced it and links to the
/// upstream nodes it depends on through `parent_ids`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LineageNode {
    /// Identifier of this node (`node_<n>` when generated by `LineageTracker::record`).
    pub id: String,
    /// Pipeline stage name, e.g. `extract` or `transform`.
    pub stage: String,
    /// Operation performed within the stage, e.g. `read_csv`.
    pub operation: String,
    /// Creation time; `LineageTracker::record` stores an RFC 3339 UTC timestamp here.
    pub timestamp: String,
    /// Row count associated with this step, if known.
    pub row_count: Option<u64>,
    /// Ids of the upstream nodes this step depends on.
    pub parent_ids: Vec<String>,
}
16
17#[derive(Debug, Clone, Default)]
19pub struct LineageTracker {
20 nodes: Vec<LineageNode>,
21 node_map: HashMap<String, usize>,
22 next_id: u64,
23}
24
25impl LineageTracker {
26 pub fn new() -> Self {
27 Self::default()
28 }
29
30 pub fn record(
32 &mut self,
33 stage: &str,
34 operation: &str,
35 row_count: Option<u64>,
36 parent_ids: Vec<String>,
37 ) -> String {
38 let id = format!("node_{}", self.next_id);
39 self.next_id += 1;
40
41 let node = LineageNode {
42 id: id.clone(),
43 stage: stage.to_string(),
44 operation: operation.to_string(),
45 timestamp: Utc::now().to_rfc3339(),
46 row_count,
47 parent_ids,
48 };
49
50 let idx = self.nodes.len();
51 self.node_map.insert(id.clone(), idx);
52 self.nodes.push(node);
53 id
54 }
55
56 pub fn nodes(&self) -> &[LineageNode] {
58 &self.nodes
59 }
60
61 pub fn to_dot(&self) -> String {
63 let mut dot = String::from("digraph lineage {\n");
64 dot.push_str(" rankdir=LR;\n");
65 dot.push_str(" node [shape=box];\n\n");
66
67 for node in &self.nodes {
68 let label = match node.row_count {
69 Some(n) => format!("{}\\n{}\\n{} rows", node.stage, node.operation, n),
70 None => format!("{}\\n{}", node.stage, node.operation),
71 };
72 dot.push_str(&format!(" {} [label=\"{}\"];\n", node.id, label));
73 }
74
75 dot.push('\n');
76
77 for node in &self.nodes {
78 for parent_id in &node.parent_ids {
79 dot.push_str(&format!(" {} -> {};\n", parent_id, node.id));
80 }
81 }
82
83 dot.push_str("}\n");
84 dot
85 }
86
87 pub fn to_json(&self) -> String {
89 let nodes: Vec<serde_json::Value> = self
90 .nodes
91 .iter()
92 .map(|n| {
93 serde_json::json!({
94 "id": n.id,
95 "stage": n.stage,
96 "operation": n.operation,
97 "timestamp": n.timestamp,
98 "row_count": n.row_count,
99 "parent_ids": n.parent_ids,
100 })
101 })
102 .collect();
103 serde_json::to_string_pretty(&serde_json::json!({ "lineage": nodes }))
104 .unwrap_or_else(|_| "{}".to_string())
105 }
106
107 pub fn to_text(&self) -> String {
109 let mut text = String::new();
110 for node in &self.nodes {
111 let rows = node
112 .row_count
113 .map(|n| format!(" ({n} rows)"))
114 .unwrap_or_default();
115 text.push_str(&format!(
116 "[{}] {}: {}{}\n",
117 node.id, node.stage, node.operation, rows
118 ));
119 for parent in &node.parent_ids {
120 text.push_str(&format!(" <- {}\n", parent));
121 }
122 }
123 text
124 }
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `haystack` contains every entry of `needles`.
    fn assert_contains_all(haystack: &str, needles: &[&str]) {
        for needle in needles.iter().copied() {
            assert!(
                haystack.contains(needle),
                "expected {needle:?} in output:\n{haystack}"
            );
        }
    }

    /// A three-stage chain keeps insertion order and parent links.
    #[test]
    fn test_lineage_record() {
        let mut tracker = LineageTracker::new();
        let extract_id = tracker.record("extract", "read_csv", Some(1000), vec![]);
        let transform_id =
            tracker.record("transform", "filter", Some(500), vec![extract_id.clone()]);
        tracker.record("load", "write_parquet", Some(500), vec![transform_id.clone()]);

        let nodes = tracker.nodes();
        assert_eq!(nodes.len(), 3);
        assert_eq!(nodes[0].stage, "extract");
        assert_eq!(nodes[1].parent_ids, vec![extract_id]);
        assert_eq!(nodes[2].parent_ids, vec![transform_id]);
    }

    /// DOT output declares both nodes, the edge between them, and row counts.
    #[test]
    fn test_lineage_dot_output() {
        let mut tracker = LineageTracker::new();
        let root = tracker.record("extract", "read_csv", Some(100), vec![]);
        tracker.record("transform", "filter", Some(50), vec![root]);

        assert_contains_all(
            &tracker.to_dot(),
            &[
                "digraph lineage",
                "node_0",
                "node_1",
                "node_0 -> node_1",
                "100 rows",
            ],
        );
    }

    /// JSON output wraps nodes under `lineage` and includes their fields.
    #[test]
    fn test_lineage_json_output() {
        let mut tracker = LineageTracker::new();
        tracker.record("extract", "read_csv", Some(100), vec![]);

        assert_contains_all(
            &tracker.to_json(),
            &["\"lineage\"", "\"extract\"", "\"read_csv\""],
        );
    }

    /// Text output lists each node and its parents.
    #[test]
    fn test_lineage_text_output() {
        let mut tracker = LineageTracker::new();
        let root = tracker.record("extract", "read_csv", Some(100), vec![]);
        tracker.record("transform", "filter", None, vec![root]);

        assert_contains_all(
            &tracker.to_text(),
            &[
                "[node_0] extract: read_csv (100 rows)",
                "[node_1] transform: filter",
                "<- node_0",
            ],
        );
    }
}
180}