use alloc::string::{String, ToString};
use alloc::vec::Vec;
use core::fmt;
/// A single versioned object recorded in a lineage graph.
#[derive(Debug, Clone)]
pub struct LineageNode {
    /// Identifier of this node within the graph.
    pub id: u64,
    /// Identifier of the object this node describes.
    pub object_id: u64,
    /// Version number of the object.
    pub version: u64,
    /// Path of the object.
    pub path: String,
    /// Checksum stored as four 64-bit words.
    pub checksum: [u64; 4],
    /// Creation timestamp (unit is not defined in this file).
    pub created: u64,
    /// Name of whoever created the object.
    pub creator: String,
    /// Name of the dataset the object belongs to.
    pub dataset: String,
    /// Object size; `0` until set via [`LineageNode::with_size`].
    pub size: u64,
    /// Optional free-form metadata; `None` until set via
    /// [`LineageNode::with_metadata`].
    pub metadata: Option<String>,
}

impl LineageNode {
    /// Creates a node from its mandatory attributes.
    ///
    /// The string arguments are copied into owned `String`s. `size` starts
    /// at `0` and `metadata` at `None`.
    pub fn new(
        id: u64,
        object_id: u64,
        version: u64,
        path: &str,
        checksum: [u64; 4],
        created: u64,
        creator: &str,
        dataset: &str,
    ) -> Self {
        LineageNode {
            id,
            object_id,
            version,
            checksum,
            created,
            path: String::from(path),
            creator: String::from(creator),
            dataset: String::from(dataset),
            size: 0,
            metadata: None,
        }
    }

    /// Returns the node with `size` replaced.
    pub fn with_size(self, size: u64) -> Self {
        Self { size, ..self }
    }

    /// Returns the node with `metadata` set to an owned copy of the argument.
    pub fn with_metadata(self, metadata: &str) -> Self {
        Self {
            metadata: Some(String::from(metadata)),
            ..self
        }
    }

    /// Renders the checksum as 64 lowercase hex digits.
    ///
    /// Each of the four words is zero-padded to 16 digits and the words are
    /// concatenated in array order.
    pub fn checksum_hex(&self) -> String {
        self.checksum
            .iter()
            .map(|word| alloc::format!("{:016x}", word))
            .collect()
    }
}
/// How the target of a lineage edge relates to its source.
///
/// Each variant has a fixed `u8` discriminant (see [`LineageRelation::from_u8`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum LineageRelation {
    Copy = 1,
    Derived = 2,
    Merged = 3,
    Import = 4,
    Updated = 5,
    Snapshot = 6,
    Clone = 7,
    Renamed = 8,
}

impl LineageRelation {
    /// Maps a raw discriminant back to its variant.
    ///
    /// Returns `None` for any value that is not the discriminant of a variant
    /// (that is, anything outside `1..=8`).
    pub fn from_u8(val: u8) -> Option<Self> {
        // Every variant, so the lookup is a scan for a matching discriminant.
        const ALL: [LineageRelation; 8] = [
            LineageRelation::Copy,
            LineageRelation::Derived,
            LineageRelation::Merged,
            LineageRelation::Import,
            LineageRelation::Updated,
            LineageRelation::Snapshot,
            LineageRelation::Clone,
            LineageRelation::Renamed,
        ];
        ALL.iter().copied().find(|relation| *relation as u8 == val)
    }

    /// Returns the lowercase name of the relation.
    pub fn name(&self) -> &'static str {
        match *self {
            LineageRelation::Copy => "copy",
            LineageRelation::Derived => "derived",
            LineageRelation::Merged => "merged",
            LineageRelation::Import => "import",
            LineageRelation::Updated => "updated",
            LineageRelation::Snapshot => "snapshot",
            LineageRelation::Clone => "clone",
            LineageRelation::Renamed => "renamed",
        }
    }
}

/// Displays the relation as its lowercase [`LineageRelation::name`].
impl fmt::Display for LineageRelation {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.name())
    }
}
/// A directed link between two lineage nodes, identified by node id.
#[derive(Debug, Clone)]
pub struct LineageEdge {
    /// Id of the node the edge starts from.
    pub source: u64,
    /// Id of the node the edge points to.
    pub target: u64,
    /// Kind of relationship between source and target.
    pub relation: LineageRelation,
    /// Optional description of the transformation applied along the edge.
    pub transform: Option<String>,
    /// Timestamp attached to the edge; `0` unless set.
    pub timestamp: u64,
    /// Edge weight; `1.0` unless set.
    pub weight: f32,
}

impl LineageEdge {
    /// Creates an edge with no transform, timestamp `0` and weight `1.0`.
    pub fn new(source: u64, target: u64, relation: LineageRelation) -> Self {
        LineageEdge {
            source,
            target,
            relation,
            weight: 1.0,
            timestamp: 0,
            transform: None,
        }
    }

    /// Returns the edge with `transform` set to an owned copy of the argument.
    pub fn with_transform(self, transform: &str) -> Self {
        Self {
            transform: Some(String::from(transform)),
            ..self
        }
    }

    /// Returns the edge with `timestamp` replaced.
    pub fn with_timestamp(self, timestamp: u64) -> Self {
        Self { timestamp, ..self }
    }

    /// Returns the edge with `weight` replaced.
    pub fn with_weight(self, weight: f32) -> Self {
        Self { weight, ..self }
    }
}
/// An in-memory lineage graph: a list of nodes plus directed edges that refer
/// to nodes by id.
#[derive(Debug, Clone, Default)]
pub struct LineageGraph {
    /// Nodes in insertion order.
    pub nodes: Vec<LineageNode>,
    /// Edges in insertion order.
    pub edges: Vec<LineageEdge>,
    /// Optional id of the root node; set by [`LineageGraph::with_root`].
    pub root: Option<u64>,
    /// Depth value carried with the graph; defaults to `0` and is not read by
    /// any method in this impl.
    pub max_depth: usize,
}

impl LineageGraph {
    /// Creates an empty graph with no root.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty graph whose `root` is `Some(root)`.
    pub fn with_root(root: u64) -> Self {
        Self {
            root: Some(root),
            ..Default::default()
        }
    }

    /// Appends a node. No uniqueness check is made on the node id.
    pub fn add_node(&mut self, node: LineageNode) {
        self.nodes.push(node);
    }

    /// Appends an edge. Endpoints are not checked against the node list.
    pub fn add_edge(&mut self, edge: LineageEdge) {
        self.edges.push(edge);
    }

    /// Number of nodes in the graph.
    pub fn node_count(&self) -> usize {
        self.nodes.len()
    }

    /// Number of edges in the graph.
    pub fn edge_count(&self) -> usize {
        self.edges.len()
    }

    /// Returns the first node whose `id` equals `id`, if any.
    pub fn find_node(&self, id: u64) -> Option<&LineageNode> {
        self.nodes.iter().find(|n| n.id == id)
    }

    /// Returns all edges whose `source` is `node_id`, in insertion order.
    pub fn edges_from(&self, node_id: u64) -> Vec<&LineageEdge> {
        self.edges.iter().filter(|e| e.source == node_id).collect()
    }

    /// Returns all edges whose `target` is `node_id`, in insertion order.
    pub fn edges_to(&self, node_id: u64) -> Vec<&LineageEdge> {
        self.edges.iter().filter(|e| e.target == node_id).collect()
    }

    /// Renders the graph in Graphviz DOT syntax.
    ///
    /// Each node becomes `n<id>` with a three-line label: the last non-empty
    /// `/`-separated component of its path (the whole path if there is none,
    /// e.g. for `/`), `v<version>`, and the first 8 hex digits of its checksum.
    ///
    /// Each edge is labelled with its transform when present, otherwise with
    /// the relation name. `Copy` edges are dashed, `Merged` edges bold and
    /// `Import` edges dotted; all other relations use the default style.
    ///
    /// Path components and transforms are escaped so that `"` and `\` in them
    /// cannot terminate the quoted label or be read as a DOT escape sequence.
    pub fn to_dot(&self) -> String {
        let mut dot = String::from("digraph lineage {\n");
        dot.push_str(" rankdir=TB;\n");
        dot.push_str(" node [shape=box];\n\n");

        for node in &self.nodes {
            // Skip empty components so "/dir/" is labelled "dir" rather than ""
            // and "/" falls back to the full path.
            let file_name = node
                .path
                .rsplit('/')
                .find(|part| !part.is_empty())
                .unwrap_or(&node.path);
            let digest = node.checksum_hex();
            let short_digest = digest.get(..8).unwrap_or(&digest);
            // The "\\n" separators are intentional DOT line breaks and must
            // stay unescaped; only the user-supplied text is escaped.
            let label = alloc::format!(
                "{}\\nv{}\\n{}",
                Self::escape_label(file_name),
                node.version,
                short_digest
            );
            dot.push_str(&alloc::format!(" n{} [label=\"{}\"];\n", node.id, label));
        }

        dot.push('\n');

        for edge in &self.edges {
            let style = match edge.relation {
                LineageRelation::Copy => "style=dashed",
                LineageRelation::Merged => "style=bold",
                LineageRelation::Import => "style=dotted",
                _ => "",
            };
            let text = match &edge.transform {
                Some(t) => Self::escape_label(t),
                None => alloc::format!("{}", edge.relation),
            };
            dot.push_str(&alloc::format!(
                " n{} -> n{} [label=\"{}\"",
                edge.source,
                edge.target,
                text
            ));
            if !style.is_empty() {
                dot.push(',');
                dot.push_str(style);
            }
            dot.push_str("];\n");
        }

        dot.push_str("}\n");
        dot
    }

    /// Escapes `"` and `\` with a leading backslash for use inside a
    /// double-quoted DOT label.
    fn escape_label(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for ch in raw.chars() {
            if ch == '"' || ch == '\\' {
                escaped.push('\\');
            }
            escaped.push(ch);
        }
        escaped
    }
}
/// A set of optional filters over [`LineageNode`]s, built fluently.
///
/// Every filter that is `None` is ignored; a node matches when it satisfies
/// all filters that are set.
#[derive(Debug, Clone, Default)]
pub struct LineageQuery {
    /// Exact dataset name to match.
    pub dataset: Option<String>,
    /// Path pattern, interpreted by `path_matches`.
    pub path_pattern: Option<String>,
    /// Exact creator name to match.
    pub creator: Option<String>,
    /// Inclusive lower bound on `created`.
    pub created_after: Option<u64>,
    /// Inclusive upper bound on `created`.
    pub created_before: Option<u64>,
    /// Exact checksum to match.
    pub checksum: Option<[u64; 4]>,
    /// Result limit; stored only, not consulted by [`LineageQuery::matches`].
    pub limit: Option<usize>,
    /// Result offset; stored only, not consulted by [`LineageQuery::matches`].
    pub offset: Option<usize>,
}

impl LineageQuery {
    /// Creates a query with no filters, which matches every node.
    pub fn new() -> Self {
        Self::default()
    }

    /// Restricts the query to nodes whose dataset equals `dataset`.
    pub fn dataset(mut self, dataset: &str) -> Self {
        self.dataset = Some(dataset.to_string());
        self
    }

    /// Restricts the query to nodes whose path matches `pattern`.
    pub fn path(mut self, pattern: &str) -> Self {
        self.path_pattern = Some(pattern.to_string());
        self
    }

    /// Restricts the query to nodes whose creator equals `creator`.
    pub fn creator(mut self, creator: &str) -> Self {
        self.creator = Some(creator.to_string());
        self
    }

    /// Restricts the query to nodes with `after <= created <= before`.
    ///
    /// If `after > before` no node can match.
    pub fn created_between(mut self, after: u64, before: u64) -> Self {
        self.created_after = Some(after);
        self.created_before = Some(before);
        self
    }

    /// Restricts the query to nodes with `created >= after`, leaving any
    /// upper bound untouched.
    pub fn created_after(mut self, after: u64) -> Self {
        self.created_after = Some(after);
        self
    }

    /// Restricts the query to nodes with `created <= before`, leaving any
    /// lower bound untouched.
    pub fn created_before(mut self, before: u64) -> Self {
        self.created_before = Some(before);
        self
    }

    /// Restricts the query to nodes whose checksum equals `checksum`.
    pub fn checksum(mut self, checksum: [u64; 4]) -> Self {
        self.checksum = Some(checksum);
        self
    }

    /// Records a result limit. It does not affect [`LineageQuery::matches`].
    pub fn limit(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }

    /// Records a result offset. It does not affect [`LineageQuery::matches`].
    pub fn offset(mut self, offset: usize) -> Self {
        self.offset = Some(offset);
        self
    }

    /// Returns `true` when `node` satisfies every filter that is set.
    ///
    /// Dataset, creator and checksum are compared for equality, the path is
    /// checked with `path_matches`, and the creation bounds are inclusive.
    pub fn matches(&self, node: &LineageNode) -> bool {
        if let Some(dataset) = self.dataset.as_deref() {
            if node.dataset != dataset {
                return false;
            }
        }
        if let Some(pattern) = self.path_pattern.as_deref() {
            if !path_matches(pattern, &node.path) {
                return false;
            }
        }
        if let Some(creator) = self.creator.as_deref() {
            if node.creator != creator {
                return false;
            }
        }
        if let Some(after) = self.created_after {
            if node.created < after {
                return false;
            }
        }
        if let Some(before) = self.created_before {
            if node.created > before {
                return false;
            }
        }
        if let Some(checksum) = self.checksum {
            if node.checksum != checksum {
                return false;
            }
        }
        true
    }
}
/// Tests `path` against a simple wildcard `pattern`.
///
/// Only a leading and/or trailing `*` is treated as a wildcard:
///
/// * `*text*` — `path` contains `text`
/// * `*text`  — `path` ends with `text`
/// * `text*`  — `path` starts with `text`
/// * `text`   — `path` equals `text` exactly
///
/// A lone `*` matches every path, and an empty pattern matches only the empty
/// path. A `*` anywhere else in the pattern is compared literally.
fn path_matches(pattern: &str, path: &str) -> bool {
    let without_leading = pattern.strip_prefix('*');
    let without_trailing = pattern.strip_suffix('*');

    match (without_leading, without_trailing) {
        // Star at both ends. For the single-character pattern "*" there is
        // nothing left to strip, so the needle is "" and every path matches.
        (Some(rest), Some(_)) => {
            let needle = rest.strip_suffix('*').unwrap_or("");
            path.contains(needle)
        }
        (Some(suffix), None) => path.ends_with(suffix),
        (None, Some(prefix)) => path.starts_with(prefix),
        // No wildcard: exact comparison, which also makes the empty pattern
        // match only the empty path.
        (None, None) => path == pattern,
    }
}
/// Errors reported by lineage operations.
#[derive(Debug, Clone)]
pub enum LineageError {
    /// No node with the given id exists.
    NodeNotFound(u64),
    /// An edge between `source` and `target` is already present.
    EdgeExists {
        source: u64,
        target: u64,
    },
    /// A cycle was found; `path` holds the node ids involved.
    CycleDetected {
        path: Vec<u64>,
    },
    /// A relation was rejected; the payload is a description.
    InvalidRelation(String),
    /// A storage failure; the payload is a description.
    StorageError(String),
}

/// Formats each variant as a short lowercase message followed by its payload.
impl fmt::Display for LineageError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            LineageError::NodeNotFound(missing) => {
                f.write_fmt(format_args!("node not found: {}", missing))
            }
            LineageError::EdgeExists {
                source: from,
                target: to,
            } => f.write_fmt(format_args!("edge already exists: {} -> {}", from, to)),
            LineageError::CycleDetected { path: cycle } => {
                f.write_fmt(format_args!("cycle detected: {:?}", cycle))
            }
            LineageError::InvalidRelation(detail) => {
                f.write_fmt(format_args!("invalid relation: {}", detail))
            }
            LineageError::StorageError(detail) => {
                f.write_fmt(format_args!("storage error: {}", detail))
            }
        }
    }
}

/// Result alias whose error type is [`LineageError`].
pub type LineageResult<T> = Result<T, LineageError>;
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a version-1 node with an all-zero checksum whose object id
    /// equals its node id.
    fn plain_node(id: u64, path: &str, created: u64, creator: &str, dataset: &str) -> LineageNode {
        LineageNode::new(id, id, 1, path, [0; 4], created, creator, dataset)
    }

    /// Builds a two-node graph (ids 1 and 2) joined by a single `Copy` edge.
    fn two_node_graph(mut graph: LineageGraph, first: &str, second: &str) -> LineageGraph {
        graph.add_node(plain_node(1, first, 0, "user", "ds"));
        graph.add_node(plain_node(2, second, 0, "user", "ds"));
        graph.add_edge(LineageEdge::new(1, 2, LineageRelation::Copy));
        graph
    }

    #[test]
    fn test_lineage_node() {
        let checksum = [0x1234, 0x5678, 0x9abc, 0xdef0];
        let node = LineageNode::new(
            1,
            100,
            1,
            "/data/file.txt",
            checksum,
            1000,
            "user1",
            "pool/data",
        );

        assert_eq!(node.id, 1);
        assert_eq!(node.object_id, 100);
        assert_eq!(node.path, "/data/file.txt");
        assert!(node.checksum_hex().contains("1234"));
    }

    #[test]
    fn test_lineage_relation() {
        assert_eq!(LineageRelation::Copy.name(), "copy");
        assert_eq!(LineageRelation::from_u8(1), Some(LineageRelation::Copy));
        assert_eq!(LineageRelation::from_u8(99), None);
    }

    #[test]
    fn test_lineage_edge() {
        let edge = LineageEdge::new(1, 2, LineageRelation::Derived)
            .with_transform("gzip")
            .with_timestamp(1000);

        assert_eq!(edge.source, 1);
        assert_eq!(edge.target, 2);
        assert_eq!(edge.transform.as_deref(), Some("gzip"));
    }

    #[test]
    fn test_lineage_graph() {
        let graph = two_node_graph(LineageGraph::with_root(1), "/a", "/b");

        assert_eq!(graph.node_count(), 2);
        assert_eq!(graph.edge_count(), 1);
        assert!(graph.find_node(1).is_some());
        assert_eq!(graph.edges_from(1).len(), 1);
        assert_eq!(graph.edges_to(2).len(), 1);
    }

    #[test]
    fn test_to_dot() {
        let graph = two_node_graph(LineageGraph::new(), "/a.txt", "/b.txt");
        let dot = graph.to_dot();

        for fragment in ["digraph", "n1", "n2", "->"].iter() {
            assert!(dot.contains(fragment));
        }
    }

    #[test]
    fn test_lineage_query() {
        let query = LineageQuery::new()
            .dataset("pool/data")
            .creator("user1")
            .limit(10);

        let same_creator = plain_node(1, "/file.txt", 1000, "user1", "pool/data");
        assert!(query.matches(&same_creator));

        let other_creator = plain_node(2, "/file.txt", 1000, "user2", "pool/data");
        assert!(!query.matches(&other_creator));
    }

    #[test]
    fn test_path_matches() {
        let accepted = [
            ("*.txt", "/file.txt"),
            ("/data/*", "/data/file"),
            ("*file*", "/path/to/file.txt"),
            ("*", "/anything"),
        ];
        for (pattern, path) in accepted.iter() {
            assert!(path_matches(pattern, path));
        }
        assert!(!path_matches("*.txt", "/file.csv"));
    }

    #[test]
    fn test_error_display() {
        let message = LineageError::NodeNotFound(123).to_string();
        assert!(message.contains("123"));
    }
}