use alloc::collections::{BTreeMap, BTreeSet};
use alloc::string::{String, ToString};
use alloc::vec;
use alloc::vec::Vec;
use lazy_static::lazy_static;
use spin::Mutex;
use super::types::{
LineageEdge, LineageError, LineageGraph, LineageNode, LineageQuery, LineageRelation,
LineageResult,
};
lazy_static! {
// Primary node storage: node id -> node.
// Lock-order note: `delete_node` holds this lock while it takes OBJECT_INDEX,
// PATH_INDEX and LINEAGE_EDGES, so code nesting these locks should take
// LINEAGE_NODES first. `spin::Mutex` is a spinlock and is not reentrant.
static ref LINEAGE_NODES: Mutex<BTreeMap<u64, LineageNode>> = Mutex::new(BTreeMap::new());
// All edges as a flat list; every traversal in this module scans it linearly.
static ref LINEAGE_EDGES: Mutex<Vec<LineageEdge>> = Mutex::new(Vec::new());
// Secondary index: object_id -> ids of the nodes recorded for that object.
static ref OBJECT_INDEX: Mutex<BTreeMap<u64, Vec<u64>>> = Mutex::new(BTreeMap::new());
// Secondary index: path -> ids of the nodes recorded at that path.
static ref PATH_INDEX: Mutex<BTreeMap<String, Vec<u64>>> = Mutex::new(BTreeMap::new());
// Next id returned by `allocate_node_id`; starts at 1 and is reset to 1 by `clear_all`.
static ref NEXT_NODE_ID: Mutex<u64> = Mutex::new(1);
}
/// Hands out the next node id from the global counter (the first id is 1).
///
/// Each call returns a new value until [`clear_all`] resets the counter.
fn allocate_node_id() -> u64 {
    let mut counter = NEXT_NODE_ID.lock();
    let issued = *counter;
    *counter = issued + 1;
    issued
}
pub fn add_node(node: LineageNode) -> LineageResult<u64> {
let id = node.id;
LINEAGE_NODES.lock().insert(id, node.clone());
OBJECT_INDEX
.lock()
.entry(node.object_id)
.or_default()
.push(id);
PATH_INDEX
.lock()
.entry(node.path.clone())
.or_default()
.push(id);
Ok(id)
}
/// Returns a clone of the node stored under `id`.
///
/// # Errors
/// [`LineageError::NodeNotFound`] if no node has that id.
pub fn get_node(id: u64) -> LineageResult<LineageNode> {
    let nodes = LINEAGE_NODES.lock();
    match nodes.get(&id) {
        Some(found) => Ok(found.clone()),
        None => Err(LineageError::NodeNotFound(id)),
    }
}
pub fn update_node(node: LineageNode) -> LineageResult<()> {
let mut nodes = LINEAGE_NODES.lock();
if !nodes.contains_key(&node.id) {
return Err(LineageError::NodeNotFound(node.id));
}
nodes.insert(node.id, node);
Ok(())
}
/// Removes the node `id`, its index entries, and every edge touching it.
///
/// Index buckets that become empty are removed entirely, so deleted objects
/// and paths do not leave permanent empty entries behind.
///
/// # Errors
/// [`LineageError::NodeNotFound`] if no node has that id.
pub fn delete_node(id: u64) -> LineageResult<()> {
    // LINEAGE_NODES stays locked for the whole removal so concurrent readers
    // never see the node without its indexes or vice versa.
    let mut nodes = LINEAGE_NODES.lock();
    let node = nodes.remove(&id).ok_or(LineageError::NodeNotFound(id))?;

    {
        let mut index = OBJECT_INDEX.lock();
        if let Some(ids) = index.get_mut(&node.object_id) {
            ids.retain(|&nid| nid != id);
            if ids.is_empty() {
                index.remove(&node.object_id);
            }
        }
    }

    {
        let mut index = PATH_INDEX.lock();
        if let Some(ids) = index.get_mut(&node.path) {
            ids.retain(|&nid| nid != id);
            if ids.is_empty() {
                index.remove(&node.path);
            }
        }
    }

    LINEAGE_EDGES
        .lock()
        .retain(|e| e.source != id && e.target != id);
    Ok(())
}
/// Appends `edge` to the graph after validating it.
///
/// # Errors
/// - [`LineageError::NodeNotFound`] with the source id, then the target id,
///   if either endpoint is missing (the source is checked first).
/// - [`LineageError::CycleDetected`] (path `[source, target]`) if
///   `would_create_cycle` reports that the edge would close a cycle.
pub fn add_edge(edge: LineageEdge) -> LineageResult<()> {
    // Scope the node lock so it is released before the cycle check, which
    // takes LINEAGE_EDGES.
    {
        let nodes = LINEAGE_NODES.lock();
        for endpoint in [edge.source, edge.target] {
            if !nodes.contains_key(&endpoint) {
                return Err(LineageError::NodeNotFound(endpoint));
            }
        }
    }

    let (from, to) = (edge.source, edge.target);
    if would_create_cycle(from, to)? {
        return Err(LineageError::CycleDetected {
            path: vec![from, to],
        });
    }

    LINEAGE_EDGES.lock().push(edge);
    Ok(())
}
/// Reports whether inserting the edge `source -> target` would close a cycle.
///
/// A cycle forms if the edge is a self-loop, or if `target` is already an
/// ancestor of `source` (a path `target -> ... -> source` exists).
///
/// The ancestor search has no depth cap. The previous cap of 100 let cycles
/// through longer chains go undetected. `get_ancestors_internal` only pushes
/// a node onto its frontier the first time it is seen, so the walk still
/// terminates.
fn would_create_cycle(source: u64, target: u64) -> LineageResult<bool> {
    if source == target {
        return Ok(true);
    }
    let ancestors = get_ancestors_internal(source, usize::MAX)?;
    Ok(ancestors.contains(&target))
}
/// Returns clones of every edge whose source is `node_id`, in insertion order.
pub fn get_edges_from(node_id: u64) -> Vec<LineageEdge> {
    let edges = LINEAGE_EDGES.lock();
    let mut outgoing = Vec::new();
    for edge in edges.iter() {
        if edge.source == node_id {
            outgoing.push(edge.clone());
        }
    }
    outgoing
}
/// Returns clones of every edge whose target is `node_id`, in insertion order.
pub fn get_edges_to(node_id: u64) -> Vec<LineageEdge> {
    let edges = LINEAGE_EDGES.lock();
    let mut incoming = Vec::new();
    for edge in edges.iter() {
        if edge.target == node_id {
            incoming.push(edge.clone());
        }
    }
    incoming
}
/// Records the creation of a new object version and returns its node id.
///
/// The node is given a freshly allocated id and version `1`.
///
/// # Errors
/// Propagates any error from [`add_node`].
pub fn record_create(
    dataset: &str,
    object_id: u64,
    path: &str,
    checksum: [u64; 4],
    creator: &str,
    timestamp: u64,
) -> LineageResult<u64> {
    let new_id = allocate_node_id();
    add_node(LineageNode::new(
        new_id, object_id, 1, path, checksum, timestamp, creator, dataset,
    ))
    .map(|_| new_id)
}
pub fn record_derivation(
source_ids: &[u64],
target_id: u64,
transform: &str,
timestamp: u64,
) -> LineageResult<()> {
for &source in source_ids {
let edge = LineageEdge::new(source, target_id, LineageRelation::Derived)
.with_transform(transform)
.with_timestamp(timestamp);
add_edge(edge)?;
}
Ok(())
}
/// Records that `target_id` is a copy of `source_id` (a `Copy` edge).
///
/// # Errors
/// Propagates any error from [`add_edge`].
pub fn record_copy(source_id: u64, target_id: u64, timestamp: u64) -> LineageResult<()> {
    add_edge(
        LineageEdge::new(source_id, target_id, LineageRelation::Copy).with_timestamp(timestamp),
    )
}
pub fn record_import(target_id: u64, external_source: &str, timestamp: u64) -> LineageResult<()> {
let source_id = allocate_node_id();
let node = LineageNode::new(
source_id,
0, 0,
external_source,
[0; 4],
timestamp,
"external",
"external",
);
add_node(node)?;
let edge =
LineageEdge::new(source_id, target_id, LineageRelation::Import).with_timestamp(timestamp);
add_edge(edge)
}
/// Records that `new_version_id` is an update of `source_id` (an `Updated` edge).
///
/// # Errors
/// Propagates any error from [`add_edge`].
pub fn record_update(source_id: u64, new_version_id: u64, timestamp: u64) -> LineageResult<()> {
    add_edge(
        LineageEdge::new(source_id, new_version_id, LineageRelation::Updated)
            .with_timestamp(timestamp),
    )
}
pub fn record_merge(source_ids: &[u64], target_id: u64, timestamp: u64) -> LineageResult<()> {
for &source in source_ids {
let edge =
LineageEdge::new(source, target_id, LineageRelation::Merged).with_timestamp(timestamp);
add_edge(edge)?;
}
Ok(())
}
/// Collects the ids of nodes that reach `node_id` by following edges
/// backwards, at most `max_depth` hops away (breadth-first).
///
/// `node_id` itself is only included if an existing cycle leads back to it.
/// The current implementation never returns `Err`.
fn get_ancestors_internal(node_id: u64, max_depth: usize) -> LineageResult<BTreeSet<u64>> {
    let edges = LINEAGE_EDGES.lock();
    let mut seen = BTreeSet::new();
    let mut level = vec![node_id];

    for _ in 0..max_depth {
        if level.is_empty() {
            break;
        }
        let mut parents = Vec::new();
        for &child in &level {
            for edge in edges.iter() {
                // `insert` returns false for already-seen ids, so each
                // ancestor enters the next level at most once.
                if edge.target == child && seen.insert(edge.source) {
                    parents.push(edge.source);
                }
            }
        }
        level = parents;
    }

    Ok(seen)
}
/// Builds the upstream lineage subgraph of `node_id`, up to `max_depth` hops.
///
/// The graph contains `node_id` (if it exists), every ancestor found within
/// the depth limit, and only the edges whose endpoints are both in that set.
/// Previously, edges coming from parents beyond the depth limit were included
/// even though their source node was not, leaving dangling edges. This now
/// matches how [`get_descendants`] filters edges.
///
/// # Errors
/// Propagates errors from `get_ancestors_internal` (currently none).
pub fn get_ancestors(node_id: u64, max_depth: usize) -> LineageResult<LineageGraph> {
    let ancestor_ids = get_ancestors_internal(node_id, max_depth)?;
    let mut graph = LineageGraph::with_root(node_id);
    let nodes = LINEAGE_NODES.lock();
    let edges = LINEAGE_EDGES.lock();
    if let Some(node) = nodes.get(&node_id) {
        graph.add_node(node.clone());
    }
    for id in &ancestor_ids {
        if let Some(node) = nodes.get(id) {
            graph.add_node(node.clone());
        }
    }
    for edge in edges.iter() {
        // Upstream edges flow from an ancestor into the root or into another
        // ancestor; both endpoints must be part of the returned graph.
        let source_in_graph = ancestor_ids.contains(&edge.source);
        let target_in_graph = edge.target == node_id || ancestor_ids.contains(&edge.target);
        if source_in_graph && target_in_graph {
            graph.add_edge(edge.clone());
        }
    }
    graph.max_depth = max_depth;
    Ok(graph)
}
pub fn get_descendants(node_id: u64, max_depth: usize) -> LineageResult<LineageGraph> {
let mut descendants = BTreeSet::new();
let mut frontier = vec![node_id];
let mut depth = 0;
let edges = LINEAGE_EDGES.lock();
while depth < max_depth && !frontier.is_empty() {
let mut next_frontier = Vec::new();
for current in frontier {
for edge in edges.iter() {
if edge.source == current && !descendants.contains(&edge.target) {
descendants.insert(edge.target);
next_frontier.push(edge.target);
}
}
}
frontier = next_frontier;
depth += 1;
}
drop(edges);
let mut graph = LineageGraph::with_root(node_id);
let nodes = LINEAGE_NODES.lock();
let edges = LINEAGE_EDGES.lock();
if let Some(node) = nodes.get(&node_id) {
graph.add_node(node.clone());
}
for id in &descendants {
if let Some(node) = nodes.get(id) {
graph.add_node(node.clone());
}
}
for edge in edges.iter() {
if (edge.source == node_id || descendants.contains(&edge.source))
&& descendants.contains(&edge.target)
{
graph.add_edge(edge.clone());
}
}
graph.max_depth = max_depth;
Ok(graph)
}
/// Returns clones of the nodes matching `query`, in ascending node-id order,
/// after applying the query's optional `offset` and `limit`.
///
/// Pagination is applied lazily before cloning, so only the returned page is
/// cloned. Previously every match was cloned and most were then discarded.
/// An offset past the end yields an empty result.
pub fn search(query: &LineageQuery) -> Vec<LineageNode> {
    let offset = query.offset.unwrap_or(0);
    let limit = query.limit.unwrap_or(usize::MAX);
    let nodes = LINEAGE_NODES.lock();
    nodes
        .values()
        .filter(|n| query.matches(n))
        .skip(offset)
        .take(limit)
        .cloned()
        .collect()
}
/// Returns clones of every stored node indexed under `object_id`, in index
/// insertion order. Ids in the index that no longer resolve to a node are
/// skipped.
pub fn get_by_object(object_id: u64) -> Vec<LineageNode> {
    // Take LINEAGE_NODES before OBJECT_INDEX, the same order `delete_node`
    // uses. The reverse order can deadlock against it on these
    // non-reentrant spin locks.
    let nodes = LINEAGE_NODES.lock();
    let index = OBJECT_INDEX.lock();
    index
        .get(&object_id)
        .map(|ids| ids.iter().filter_map(|id| nodes.get(id).cloned()).collect())
        .unwrap_or_default()
}
/// Returns clones of every stored node indexed under `path`, in index
/// insertion order. Ids in the index that no longer resolve to a node are
/// skipped.
pub fn get_by_path(path: &str) -> Vec<LineageNode> {
    // Take LINEAGE_NODES before PATH_INDEX, the same order `delete_node`
    // uses. The reverse order can deadlock against it on these
    // non-reentrant spin locks.
    let nodes = LINEAGE_NODES.lock();
    let index = PATH_INDEX.lock();
    index
        .get(path)
        .map(|ids| ids.iter().filter_map(|id| nodes.get(id).cloned()).collect())
        .unwrap_or_default()
}
/// Returns the node with the highest `version` among those recorded for
/// `object_id`, or `None` if there are none.
///
/// On ties, the node appearing last in the object index wins.
pub fn get_latest_version(object_id: u64) -> Option<LineageNode> {
    get_by_object(object_id)
        .into_iter()
        .max_by(|a, b| a.version.cmp(&b.version))
}
pub fn get_stats() -> LineageStats {
let nodes = LINEAGE_NODES.lock();
let edges = LINEAGE_EDGES.lock();
let mut datasets = BTreeSet::new();
let mut creators = BTreeSet::new();
for node in nodes.values() {
datasets.insert(node.dataset.clone());
creators.insert(node.creator.clone());
}
LineageStats {
total_nodes: nodes.len(),
total_edges: edges.len(),
datasets: datasets.len(),
creators: creators.len(),
}
}
/// Aggregate counts describing the current contents of the lineage store,
/// as produced by [`get_stats`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LineageStats {
    /// Number of nodes currently stored.
    pub total_nodes: usize,
    /// Number of edges currently stored.
    pub total_edges: usize,
    /// Number of distinct `dataset` values across all nodes.
    pub datasets: usize,
    /// Number of distinct `creator` values across all nodes.
    pub creators: usize,
}
/// Empties the whole lineage store and restarts id allocation at 1.
///
/// Each structure is locked and cleared separately, in a fixed order, so no
/// two of these locks are held at the same time.
pub fn clear_all() {
    // Primary storage.
    LINEAGE_NODES.lock().clear();
    LINEAGE_EDGES.lock().clear();
    // Secondary lookups.
    OBJECT_INDEX.lock().clear();
    PATH_INDEX.lock().clear();
    // Id allocator.
    let mut next_id = NEXT_NODE_ID.lock();
    *next_id = 1;
}
#[cfg(test)]
mod tests {
    //! Tests share the global store and run in parallel, so each test uses
    //! its own object ids/paths and asserts only on data it created (or on
    //! lower bounds for global counts). None of them call `clear_all`.
    use super::*;
    #[test]
    fn test_add_get_node() {
        let id = record_create(
            "test_add_get",
            100,
            "/file.txt",
            [1, 2, 3, 4],
            "user1",
            1000,
        )
        .unwrap();
        let node = get_node(id).unwrap();
        assert_eq!(node.object_id, 100);
        assert_eq!(node.path, "/file.txt");
        assert_eq!(node.creator, "user1");
    }
    #[test]
    fn test_delete_node() {
        let id = record_create("test_delete", 100, "/file.txt", [0; 4], "user", 0).unwrap();
        delete_node(id).unwrap();
        assert!(get_node(id).is_err());
    }
    #[test]
    fn test_add_edge() {
        let id1 = record_create("test_edge", 1001, "/edge_a", [0; 4], "user", 0).unwrap();
        let id2 = record_create("test_edge", 1002, "/edge_b", [0; 4], "user", 0).unwrap();
        record_copy(id1, id2, 100).unwrap();
        let edges = get_edges_from(id1);
        assert!(edges.iter().any(|e| e.target == id2));
    }
    #[test]
    fn test_record_derivation() {
        let s1 = record_create("test_deriv", 2001, "/src1", [0; 4], "user", 0).unwrap();
        let s2 = record_create("test_deriv", 2002, "/src2", [0; 4], "user", 0).unwrap();
        let tgt = record_create("test_deriv", 2003, "/target", [0; 4], "user", 0).unwrap();
        record_derivation(&[s1, s2], tgt, "merge", 100).unwrap();
        let edges = get_edges_to(tgt);
        let deriv_edges: Vec<_> = edges
            .iter()
            .filter(|e| e.source == s1 || e.source == s2)
            .collect();
        assert_eq!(deriv_edges.len(), 2);
    }
    #[test]
    fn test_get_ancestors() {
        let n1 = record_create("test_anc", 3001, "/anc_a", [0; 4], "user", 0).unwrap();
        let n2 = record_create("test_anc", 3002, "/anc_b", [0; 4], "user", 0).unwrap();
        let n3 = record_create("test_anc", 3003, "/anc_c", [0; 4], "user", 0).unwrap();
        record_copy(n1, n2, 0).unwrap();
        record_copy(n2, n3, 0).unwrap();
        let graph = get_ancestors(n3, 10).unwrap();
        assert!(graph.node_count() >= 3);
        assert!(graph.find_node(n1).is_some());
        assert!(graph.find_node(n2).is_some());
    }
    #[test]
    fn test_get_descendants() {
        let n1 = record_create("test_desc", 4001, "/desc_a", [0; 4], "user", 0).unwrap();
        let n2 = record_create("test_desc", 4002, "/desc_b", [0; 4], "user", 0).unwrap();
        let n3 = record_create("test_desc", 4003, "/desc_c", [0; 4], "user", 0).unwrap();
        record_copy(n1, n2, 0).unwrap();
        record_copy(n1, n3, 0).unwrap();
        let graph = get_descendants(n1, 10).unwrap();
        assert!(graph.node_count() >= 3);
        assert!(graph.find_node(n2).is_some());
        assert!(graph.find_node(n3).is_some());
    }
    #[test]
    fn test_search() {
        let unique_ds = "test_search_unique_12345";
        let unique_creator = "unique_creator_67890";
        record_create(unique_ds, 5001, "/file1.txt", [0; 4], unique_creator, 1000).unwrap();
        record_create(unique_ds, 5002, "/file2.txt", [0; 4], "other_user", 2000).unwrap();
        let results = search(&LineageQuery::new().dataset(unique_ds));
        assert!(results.len() >= 2);
        let results = search(&LineageQuery::new().creator(unique_creator));
        assert!(!results.is_empty());
    }
    #[test]
    fn test_get_by_object() {
        let unique_obj = 999001u64;
        record_create("test_obj", unique_obj, "/v1", [0; 4], "user", 0).unwrap();
        record_create("test_obj", unique_obj, "/v2", [0; 4], "user", 0).unwrap();
        let versions = get_by_object(unique_obj);
        assert!(versions.len() >= 2);
        // Every node returned through the object index must carry that object id.
        assert!(versions.iter().all(|n| n.object_id == unique_obj));
    }
    #[test]
    fn test_cycle_detection() {
        let n1 = record_create("test_cycle", 6001, "/cycle_a", [0; 4], "user", 0).unwrap();
        let n2 = record_create("test_cycle", 6002, "/cycle_b", [0; 4], "user", 0).unwrap();
        record_copy(n1, n2, 0).unwrap();
        let result = add_edge(LineageEdge::new(n2, n1, LineageRelation::Copy));
        assert!(matches!(result, Err(LineageError::CycleDetected { .. })));
    }
    #[test]
    fn test_stats() {
        let id = record_create("test_stats", 7001, "/stats_a", [0; 4], "stats_user", 0).unwrap();
        let stats = get_stats();
        // Other tests mutate the same store concurrently, so only lower
        // bounds implied by the node created above are stable.
        assert!(stats.total_nodes >= 1);
        assert!(stats.datasets >= 1);
        assert!(stats.creators >= 1);
        assert!(get_node(id).is_ok());
    }
}