pub mod store;
pub mod types;
pub use store::{
LineageStats, add_edge, add_node, clear_all, delete_node, get_ancestors, get_by_object,
get_by_path, get_descendants, get_edges_from, get_edges_to, get_latest_version, get_node,
get_stats, record_copy, record_create, record_derivation, record_import, record_merge,
record_update, search, update_node,
};
pub use types::{
LineageEdge, LineageError, LineageGraph, LineageNode, LineageQuery, LineageRelation,
LineageResult,
};
#[cfg(test)]
mod tests {
    //! Integration-style tests that drive the lineage API through the names
    //! re-exported by the parent module.
    //!
    //! Every test works in its own object-id range (10000, 20000, ...) and with
    //! its own dataset/creator names. NOTE(review): presumably this is because
    //! the store is shared between tests — confirm against `store`.

    use super::*;

    /// Compiles only if the re-exported type names are reachable via `super::*`.
    #[test]
    fn test_exports_accessible() {
        let _ = LineageRelation::Copy;
        let _ = LineageQuery::new();
    }

    /// Builds a chain raw -> step1 -> step2 plus a copy of step2, then checks
    /// the ancestor/descendant graph sizes and the DOT rendering.
    #[test]
    fn test_complete_lineage_workflow() {
        let first_object = 10000u64;
        let dataset = "workflow_test";
        let etl_creator = "etl_job";

        // Raw input file.
        let raw_id = record_create(
            dataset,
            first_object,
            "/raw/data.csv",
            [0x1234, 0x5678, 0, 0],
            "data_import",
            1000,
        )
        .unwrap();

        // First processing step, derived from the raw file.
        let filtered_id = record_create(
            dataset,
            first_object + 1,
            "/processed/step1.csv",
            [0xabcd, 0xef01, 0, 0],
            etl_creator,
            2000,
        )
        .unwrap();
        record_derivation(&[raw_id], filtered_id, "filter_nulls", 2000).unwrap();

        // Second processing step, derived from the first.
        let aggregated_id = record_create(
            dataset,
            first_object + 2,
            "/processed/step2.csv",
            [0xfeed, 0xface, 0, 0],
            etl_creator,
            3000,
        )
        .unwrap();
        record_derivation(&[filtered_id], aggregated_id, "aggregate", 3000).unwrap();

        // Backup registered under a different dataset and linked as a copy.
        let backup_id = record_create(
            "workflow_backup",
            first_object + 100,
            "/backup/step2_backup.csv",
            [0xfeed, 0xface, 0, 0],
            "backup_job",
            4000,
        )
        .unwrap();
        record_copy(aggregated_id, backup_id, 4000).unwrap();

        // The second argument (10) is presumably a traversal depth limit.
        let upstream = get_ancestors(aggregated_id, 10).unwrap();
        assert_eq!(upstream.node_count(), 3);
        assert_eq!(upstream.edge_count(), 2);

        let downstream = get_descendants(raw_id, 10).unwrap();
        assert_eq!(downstream.node_count(), 4);

        // The DOT text must be a digraph and mention at least one transform label.
        let rendered = upstream.to_dot();
        assert!(rendered.contains("digraph"));
        let has_filter_label = rendered.contains("filter_nulls");
        let has_aggregate_label = rendered.contains("aggregate");
        assert!(has_filter_label || has_aggregate_label);
    }

    /// Merging two sources into one node must yield two incoming `Merged` edges.
    #[test]
    fn test_merge_lineage() {
        let first_object = 20000u64;
        let dataset = "merge_ds";

        let left_id = record_create(
            dataset,
            first_object,
            "/source1.csv",
            [1, 0, 0, 0],
            "user",
            0,
        )
        .unwrap();
        let right_id = record_create(
            dataset,
            first_object + 1,
            "/source2.csv",
            [2, 0, 0, 0],
            "user",
            0,
        )
        .unwrap();
        let merged_id = record_create(
            dataset,
            first_object + 2,
            "/merged.csv",
            [3, 0, 0, 0],
            "user",
            100,
        )
        .unwrap();

        record_merge(&[left_id, right_id], merged_id, 100).unwrap();

        let incoming = get_edges_to(merged_id);
        assert_eq!(incoming.len(), 2);
        for edge in incoming.iter() {
            assert!(edge.relation == LineageRelation::Merged);
        }
    }

    /// Recording an import must leave exactly one incoming `Import` edge.
    #[test]
    fn test_import_lineage() {
        let object_id = 30000u64;
        let imported_id = record_create(
            "import_ds",
            object_id,
            "/imported.csv",
            [0; 4],
            "import_job",
            1000,
        )
        .unwrap();

        record_import(imported_id, "s3://bucket/external/file.csv", 1000).unwrap();

        let incoming = get_edges_to(imported_id);
        assert_eq!(incoming.len(), 1);
        assert_eq!(incoming[0].relation, LineageRelation::Import);
    }

    /// Adds a hand-built version 2 node for an existing object and checks that
    /// `get_latest_version` reports version 2.
    #[test]
    fn test_version_lineage() {
        let object_id = 40000u64;
        let first_version_id = record_create(
            "version_ds",
            object_id,
            "/file.txt",
            [1, 0, 0, 0],
            "user",
            1000,
        )
        .unwrap();

        let mut second_version = LineageNode::new(
            0,
            object_id,
            2,
            "/file.txt",
            [2, 0, 0, 0],
            2000,
            "user",
            "version_ds",
        );
        // NOTE(review): 1000 looks like a placeholder id; the id returned by
        // `add_node` is written back before `update_node` — confirm against store.
        second_version.id = 1000;
        let second_version_id = add_node(second_version.clone()).unwrap();
        second_version.id = second_version_id;
        update_node(second_version).unwrap();

        record_update(first_version_id, second_version_id, 2000).unwrap();

        let newest = get_latest_version(object_id).unwrap();
        assert_eq!(newest.version, 2);
    }

    /// Of three nodes created 1000 ticks apart, only the middle one falls in
    /// the queried creation window.
    #[test]
    fn test_search_by_time_range() {
        let first_object = 50000u64;
        let time_origin = 50000000u64;

        // (object offset, path, timestamp offset)
        let fixtures = [
            (0u64, "/time_a", 1000u64),
            (1, "/time_b", 2000),
            (2, "/time_c", 3000),
        ];
        for &(object_offset, path, time_offset) in fixtures.iter() {
            record_create(
                "time_ds",
                first_object + object_offset,
                path,
                [0; 4],
                "time_user",
                time_origin + time_offset,
            )
            .unwrap();
        }

        let window = LineageQuery::new().created_between(time_origin + 1500, time_origin + 2500);
        let hits = search(&window);
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].path, "/time_b");
    }

    /// With ten nodes in the dataset, limit 3 / offset 2 must return 3 results.
    #[test]
    fn test_search_with_pagination() {
        let first_object = 60000u64;
        for index in 0..10 {
            let path = alloc::format!("/paged_file{}", index);
            record_create(
                "pagination_ds",
                first_object + index,
                &path,
                [0; 4],
                "paged_user",
                index,
            )
            .unwrap();
        }

        let dataset_filter = LineageQuery::new().dataset("pagination_ds");
        let page = dataset_filter.limit(3).offset(2);
        let hits = search(&page);
        assert_eq!(hits.len(), 3);
    }

    /// After creating three nodes across two datasets and two creators, the
    /// stats must report at least those counts (lower bounds only).
    #[test]
    fn test_statistics() {
        let first_object = 70000u64;
        let first_id = record_create(
            "stats_ds1",
            first_object,
            "/stats_a",
            [0; 4],
            "stats_user1",
            0,
        )
        .unwrap();
        let second_id = record_create(
            "stats_ds1",
            first_object + 1,
            "/stats_b",
            [0; 4],
            "stats_user2",
            0,
        )
        .unwrap();
        record_create(
            "stats_ds2",
            first_object + 2,
            "/stats_c",
            [0; 4],
            "stats_user1",
            0,
        )
        .unwrap();

        // The result of the copy is intentionally discarded here.
        let _ = record_copy(first_id, second_id, 0);

        let snapshot = get_stats();
        assert!(snapshot.total_nodes >= 3);
        assert!(snapshot.datasets >= 2);
        assert!(snapshot.creators >= 2);
    }

    /// The DOT export of a one-step derivation must name the graph, both file
    /// paths and the transform label.
    #[test]
    fn test_graph_dot_export() {
        let first_object = 80000u64;
        let input_id = record_create(
            "dot_ds",
            first_object,
            "/dot_input.csv",
            [0; 4],
            "dot_etl",
            0,
        )
        .unwrap();
        let output_id = record_create(
            "dot_ds",
            first_object + 1,
            "/dot_output.csv",
            [0; 4],
            "dot_etl",
            100,
        )
        .unwrap();
        record_derivation(&[input_id], output_id, "dot_transform", 100).unwrap();

        let downstream = get_descendants(input_id, 10).unwrap();
        let rendered = downstream.to_dot();

        let expected_fragments = [
            "digraph lineage",
            "dot_input.csv",
            "dot_output.csv",
            "dot_transform",
        ];
        for fragment in expected_fragments.iter() {
            assert!(rendered.contains(*fragment));
        }
    }
}