Skip to main content

tl_stream/
lineage.rs

1// ThinkingLanguage — Data lineage tracking
2
3use chrono::Utc;
4use std::collections::HashMap;
5
/// A single step in the data lineage graph.
///
/// Nodes reference their inputs by id through [`LineageNode::parent_ids`], so a
/// collection of nodes forms a directed graph in which each parent points to the
/// nodes derived from it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LineageNode {
    /// Unique identifier; [`LineageTracker::record`] assigns ids of the form `node_<n>`.
    pub id: String,
    /// Pipeline stage this step belongs to (e.g. `"extract"`, `"transform"`).
    pub stage: String,
    /// Operation performed within the stage (e.g. `"read_csv"`, `"filter"`).
    pub operation: String,
    /// When the node was recorded; [`LineageTracker::record`] stores the current
    /// UTC time formatted as RFC 3339.
    pub timestamp: String,
    /// Row count reported for this step, if known (exports render it as `N rows`).
    pub row_count: Option<u64>,
    /// Ids of the upstream (parent) nodes; exports draw an edge from each parent
    /// to this node.
    pub parent_ids: Vec<String>,
}
16
/// Tracks data lineage through pipeline stages.
///
/// Nodes are kept in the order they were recorded, with an id-to-index map
/// alongside them.
#[derive(Debug, Clone, Default)]
pub struct LineageTracker {
    /// Every recorded node, in the order `record` was called.
    nodes: Vec<LineageNode>,
    /// Maps a node id to its index in `nodes`.
    node_map: HashMap<String, usize>,
    /// Counter used to mint the next `node_<n>` id; starts at 0 via `Default`.
    next_id: u64,
}
24
25impl LineageTracker {
26    pub fn new() -> Self {
27        Self::default()
28    }
29
30    /// Record a lineage node.
31    pub fn record(
32        &mut self,
33        stage: &str,
34        operation: &str,
35        row_count: Option<u64>,
36        parent_ids: Vec<String>,
37    ) -> String {
38        let id = format!("node_{}", self.next_id);
39        self.next_id += 1;
40
41        let node = LineageNode {
42            id: id.clone(),
43            stage: stage.to_string(),
44            operation: operation.to_string(),
45            timestamp: Utc::now().to_rfc3339(),
46            row_count,
47            parent_ids,
48        };
49
50        let idx = self.nodes.len();
51        self.node_map.insert(id.clone(), idx);
52        self.nodes.push(node);
53        id
54    }
55
56    /// Get all lineage nodes.
57    pub fn nodes(&self) -> &[LineageNode] {
58        &self.nodes
59    }
60
61    /// Export lineage as DOT graph format.
62    pub fn to_dot(&self) -> String {
63        let mut dot = String::from("digraph lineage {\n");
64        dot.push_str("  rankdir=LR;\n");
65        dot.push_str("  node [shape=box];\n\n");
66
67        for node in &self.nodes {
68            let label = match node.row_count {
69                Some(n) => format!("{}\\n{}\\n{} rows", node.stage, node.operation, n),
70                None => format!("{}\\n{}", node.stage, node.operation),
71            };
72            dot.push_str(&format!("  {} [label=\"{}\"];\n", node.id, label));
73        }
74
75        dot.push('\n');
76
77        for node in &self.nodes {
78            for parent_id in &node.parent_ids {
79                dot.push_str(&format!("  {} -> {};\n", parent_id, node.id));
80            }
81        }
82
83        dot.push_str("}\n");
84        dot
85    }
86
87    /// Export lineage as JSON.
88    pub fn to_json(&self) -> String {
89        let nodes: Vec<serde_json::Value> = self
90            .nodes
91            .iter()
92            .map(|n| {
93                serde_json::json!({
94                    "id": n.id,
95                    "stage": n.stage,
96                    "operation": n.operation,
97                    "timestamp": n.timestamp,
98                    "row_count": n.row_count,
99                    "parent_ids": n.parent_ids,
100                })
101            })
102            .collect();
103        serde_json::to_string_pretty(&serde_json::json!({ "lineage": nodes }))
104            .unwrap_or_else(|_| "{}".to_string())
105    }
106
107    /// Export lineage as plain text.
108    pub fn to_text(&self) -> String {
109        let mut text = String::new();
110        for node in &self.nodes {
111            let rows = node
112                .row_count
113                .map(|n| format!(" ({n} rows)"))
114                .unwrap_or_default();
115            text.push_str(&format!(
116                "[{}] {}: {}{}\n",
117                node.id, node.stage, node.operation, rows
118            ));
119            for parent in &node.parent_ids {
120                text.push_str(&format!("  <- {}\n", parent));
121            }
122        }
123        text
124    }
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    /// Three chained recordings keep insertion order and link each step to its parent.
    #[test]
    fn test_lineage_record() {
        let mut tracker = LineageTracker::new();
        let extract_id = tracker.record("extract", "read_csv", Some(1000), vec![]);
        let transform_id =
            tracker.record("transform", "filter", Some(500), vec![extract_id.clone()]);
        let _load_id =
            tracker.record("load", "write_parquet", Some(500), vec![transform_id.clone()]);

        let nodes = tracker.nodes();
        assert_eq!(nodes.len(), 3);
        assert_eq!(nodes[0].stage, "extract");
        assert_eq!(nodes[1].parent_ids, [extract_id]);
        assert_eq!(nodes[2].parent_ids, [transform_id]);
    }

    /// DOT output declares both nodes, the edge between them, and the row count.
    #[test]
    fn test_lineage_dot_output() {
        let mut tracker = LineageTracker::new();
        let source_id = tracker.record("extract", "read_csv", Some(100), vec![]);
        tracker.record("transform", "filter", Some(50), vec![source_id]);

        let dot = tracker.to_dot();
        let expected = [
            "digraph lineage",
            "node_0",
            "node_1",
            "node_0 -> node_1",
            "100 rows",
        ];
        for fragment in expected {
            assert!(dot.contains(fragment), "DOT output lacks {fragment:?}:\n{dot}");
        }
    }

    /// JSON output wraps nodes under a `lineage` key and includes their text fields.
    #[test]
    fn test_lineage_json_output() {
        let mut tracker = LineageTracker::new();
        tracker.record("extract", "read_csv", Some(100), vec![]);

        let json = tracker.to_json();
        let expected = ["\"lineage\"", "\"extract\"", "\"read_csv\""];
        for fragment in expected {
            assert!(json.contains(fragment), "JSON output lacks {fragment:?}:\n{json}");
        }
    }

    /// Text output shows the row count only when known and lists parents beneath a node.
    #[test]
    fn test_lineage_text_output() {
        let mut tracker = LineageTracker::new();
        let source_id = tracker.record("extract", "read_csv", Some(100), vec![]);
        tracker.record("transform", "filter", None, vec![source_id]);

        let text = tracker.to_text();
        let expected = [
            "[node_0] extract: read_csv (100 rows)",
            "[node_1] transform: filter",
            "<- node_0",
        ];
        for fragment in expected {
            assert!(text.contains(fragment), "text output lacks {fragment:?}:\n{text}");
        }
    }
}