use serde::Serialize;
use std::collections::HashMap;
/// Origin of the data described by a [`DataLineage`] record.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant and field names only; confirm against the code that builds them.
#[derive(Debug, Clone, Serialize)]
pub enum LineageSource {
/// Presumably a value written directly by a client; `client_id` may be absent.
DirectWrite { client_id: Option<String> },
/// Presumably a value produced by evaluating `query`.
Computed { query: String },
/// Presumably a value brought in from the external `source`.
Imported { source: String },
}
/// A single transformation entry attached to a [`DataLineage`] record.
#[derive(Debug, Clone, Serialize)]
pub struct Transformation {
/// Free-form operation name (the tests in this file use `"AVG"` and `"SUM"`).
pub operation: String,
/// Time associated with this transformation.
/// NOTE(review): unit and epoch are not established in this file — confirm.
pub timestamp: u64,
/// Keys listed as inputs of this transformation; these are what
/// `LineageTracker::find_derived_from` compares against.
pub input_keys: Vec<String>,
}
/// One lineage record for a single key: its source plus the transformations
/// attached to it.
#[derive(Debug, Clone, Serialize)]
pub struct DataLineage {
/// Key this record belongs to; `LineageTracker::record` groups records by it.
pub key: String,
/// Time associated with this record.
/// NOTE(review): unit and epoch are not established in this file — confirm.
pub timestamp: u64,
/// Where the data came from.
pub source: LineageSource,
/// Transformations attached to this record; may be empty.
pub transformations: Vec<Transformation>,
}
/// In-memory store of lineage records, grouped by key.
///
/// The derived `Default` yields a tracker with no records.
#[derive(Debug, Default)]
pub struct LineageTracker {
// Maps a key to every record stored for it, in the order they were pushed.
records: HashMap<String, Vec<DataLineage>>,
}
impl LineageTracker {
    /// Creates a tracker that holds no records.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `lineage` to the list of records stored under `lineage.key`.
    ///
    /// Earlier records for the same key are kept; the new one is pushed after
    /// them, so each key's records stay in recording order.
    pub fn record(&mut self, lineage: DataLineage) {
        self.records
            .entry(lineage.key.clone())
            .or_default()
            .push(lineage);
    }

    /// Returns references to every record stored for `key`, in the order they
    /// were recorded.
    ///
    /// Returns an empty vector when nothing has been recorded for `key`.
    pub fn get_lineage(&self, key: &str) -> Vec<&DataLineage> {
        self.records
            .get(key)
            .map(|stored| stored.iter().collect())
            .unwrap_or_default()
    }

    /// Returns the keys that have at least one record with a transformation
    /// listing `source_key` among its `input_keys`.
    ///
    /// Only the stored `input_keys` are inspected (exact string equality);
    /// chains of derivations are not followed. Each matching key appears once.
    /// The order of the returned keys follows `HashMap` iteration order and is
    /// therefore unspecified.
    pub fn find_derived_from(&self, source_key: &str) -> Vec<String> {
        self.records
            .iter()
            .filter(|(_, lineages)| {
                lineages
                    .iter()
                    .flat_map(|lineage| &lineage.transformations)
                    // Compare as `&str` so no `String` is allocated per
                    // transformation just to perform the lookup.
                    .any(|t| t.input_keys.iter().any(|k| k.as_str() == source_key))
            })
            .map(|(key, _)| key.clone())
            .collect()
    }

    /// Returns the number of distinct keys that have at least one record.
    ///
    /// This counts keys, not individual records: recording twice under the
    /// same key increases the result only once.
    pub fn len(&self) -> usize {
        self.records.len()
    }

    /// Returns `true` when no record has been stored for any key.
    pub fn is_empty(&self) -> bool {
        self.records.is_empty()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Computed` lineage record carrying exactly one transformation
    /// whose timestamp equals the record's own timestamp.
    fn computed(key: &str, at: u64, query: &str, operation: &str, inputs: &[&str]) -> DataLineage {
        let step = Transformation {
            operation: operation.into(),
            timestamp: at,
            input_keys: inputs.iter().map(|k| (*k).to_owned()).collect(),
        };
        DataLineage {
            key: key.into(),
            timestamp: at,
            source: LineageSource::Computed {
                query: query.into(),
            },
            transformations: vec![step],
        }
    }

    /// A recorded lineage can be read back under its own key.
    #[test]
    fn test_record_and_get_lineage() {
        let mut tracker = LineageTracker::new();
        tracker.record(computed(
            "sensor/temp/avg",
            1000,
            "SELECT AVG(value) FROM \"sensor/temp\"",
            "AVG",
            &["sensor/temp"],
        ));

        let found = tracker.get_lineage("sensor/temp/avg");
        assert_eq!(found.len(), 1);
        let first = found[0];
        assert!(matches!(&first.source, LineageSource::Computed { .. }));
    }

    /// Every key whose transformations list the source key is reported.
    #[test]
    fn test_find_derived_from() {
        let mut tracker = LineageTracker::new();
        let fixtures = vec![
            computed("derived/a", 100, "...", "SUM", &["source/x"]),
            computed("derived/b", 200, "...", "AVG", &["source/x", "source/y"]),
        ];
        for fixture in fixtures {
            tracker.record(fixture);
        }

        let derived = tracker.find_derived_from("source/x");
        assert_eq!(derived.len(), 2);
        // Result order is not relied upon; only membership is checked.
        for expected in ["derived/a", "derived/b"] {
            assert!(derived.contains(&expected.to_string()));
        }
    }

    /// Lookups on a fresh tracker return empty results.
    #[test]
    fn test_empty_lineage() {
        let tracker = LineageTracker::new();
        let by_key = tracker.get_lineage("nonexistent");
        let by_source = tracker.find_derived_from("x");
        assert!(by_key.is_empty());
        assert!(by_source.is_empty());
    }
}