use std::collections::{HashMap, HashSet};
use ahash::AHashSet;
use crate::mvcc::{SnapshotState, VersionedSnapshot};
/// Per-version summary of a snapshot's strongly-connected-component structure,
/// one of which is produced per version by `temporal_persistence_sweep`.
#[derive(Debug, Clone)]
pub struct TemporalPersistencePoint {
/// Version number copied from the source `VersionedSnapshot`.
pub version: u64,
/// Total number of nodes across all SCCs (the sum of all SCC sizes).
pub active: usize,
/// Number of SCCs, including trivial single-node ones.
pub n_components: usize,
/// Size of the largest SCC (0 when there are none).
pub largest_size: usize,
/// `largest_size / active`, or 0.0 when `active` is 0.
pub fraction_largest: f32,
/// Cycle rank `E - V + W` of the snapshot graph, clamped at 0 (see `cycle_rank_snapshot`).
pub cycle_rank: usize,
/// Number of SCCs with at least two nodes.
pub n_nontrivial_sccs: usize,
/// Size of the largest SCC with at least two nodes (0 when there are none).
pub largest_nontrivial_size: usize,
}
/// One bar of the count-based barcode built by `compute_temporal_barcode`.
///
/// Bars are opened and closed purely from changes in the SCC *count*; they do not
/// follow the identity of any particular component.
#[derive(Debug, Clone, PartialEq)]
pub struct TemporalBarcode {
/// Version at which the SCC count rose and this bar was opened.
pub birth_version: u64,
/// Version at which the SCC count fell and this bar was closed; `None` if still open
/// after the last point.
pub death_version: Option<u64>,
/// Largest `largest_size` observed while the bar was open, including its closing version.
pub peak_size: usize,
}
/// One tracked SCC lineage, as returned by `scc_lineage_barcode` / `cycle_scc_barcode`.
///
/// A lineage continues from one version to the next when its SCC is matched (by Jaccard
/// overlap of member sets) to an SCC of the following version.
#[derive(Debug, Clone, PartialEq)]
pub struct LineageBarcode {
/// Version at which the lineage's first SCC appeared.
pub birth_version: u64,
/// First version in which no SCC matched this lineage; `None` if it was still alive at
/// the last version.
pub death_version: Option<u64>,
/// Number of nodes in the SCC at `birth_version`.
pub birth_size: usize,
/// Largest SCC size observed over the lineage's life.
pub peak_size: usize,
/// SCC size at the last version in which the lineage was present.
pub final_size: usize,
/// Number of versions in which the lineage was present.
pub versions_seen: usize,
}
/// Computes the strongly connected components (SCCs) of a snapshot's directed graph.
///
/// Implements Tarjan's algorithm with an explicit DFS call stack instead of native
/// recursion, so very deep graphs (long chains or long cycles) cannot overflow the
/// thread's stack.
///
/// DFS roots are the keys of `state.outgoing`. Successors come from
/// `state.get_outgoing`. A successor that is not itself a key is still visited and ends
/// up in some component (a singleton if it has no outgoing edges).
///
/// Components are returned in the order Tarjan completes them. That is a reverse
/// topological order of the condensation, and it also depends on the iteration order of
/// `state.outgoing`. Returns an empty `Vec` for an empty graph.
pub fn strongly_connected_components_snapshot(state: &SnapshotState) -> Vec<HashSet<i64>> {
    /// One suspended DFS "call": the node, its successors, and the next successor to try.
    struct Frame {
        node: i64,
        successors: Vec<i64>,
        next: usize,
    }

    /// Collects `v`'s out-neighbours (empty when `get_outgoing` yields `None`).
    /// They are copied so the frame does not depend on the concrete container type.
    fn successors_of(state: &SnapshotState, v: i64) -> Vec<i64> {
        let mut out = Vec::new();
        if let Some(neighbors) = state.get_outgoing(v) {
            for &w in neighbors {
                out.push(w);
            }
        }
        out
    }

    let roots: Vec<i64> = state.outgoing.keys().copied().collect();

    let mut next_index: i64 = 0;
    let mut tarjan_stack: Vec<i64> = Vec::new();
    let mut on_stack: AHashSet<i64> = AHashSet::new();
    let mut indices: HashMap<i64, i64> = HashMap::new();
    let mut lowlink: HashMap<i64, i64> = HashMap::new();
    let mut components: Vec<HashSet<i64>> = Vec::new();
    let mut call_stack: Vec<Frame> = Vec::new();

    for root in roots {
        if indices.contains_key(&root) {
            continue;
        }
        // Node that must be "entered" (the equivalent of a recursive call) before the
        // DFS resumes.
        let mut pending: Option<i64> = Some(root);
        loop {
            if let Some(v) = pending.take() {
                indices.insert(v, next_index);
                lowlink.insert(v, next_index);
                next_index += 1;
                tarjan_stack.push(v);
                on_stack.insert(v);
                call_stack.push(Frame {
                    node: v,
                    successors: successors_of(state, v),
                    next: 0,
                });
            }

            let frame = match call_stack.last_mut() {
                Some(frame) => frame,
                None => break,
            };
            let v = frame.node;
            let step = frame.successors.get(frame.next).copied();

            if let Some(w) = step {
                frame.next += 1;
                if !indices.contains_key(&w) {
                    // Tree edge: descend into `w`; `v`'s lowlink is updated when `w` finishes.
                    pending = Some(w);
                } else if on_stack.contains(&w) {
                    // Edge back into the current SCC candidate.
                    let wi = indices[&w];
                    let vl = lowlink.get_mut(&v).expect("visited node has a lowlink");
                    *vl = (*vl).min(wi);
                }
                continue;
            }

            // All successors of `v` explored: this "call" returns.
            call_stack.pop();
            let v_low = lowlink[&v];
            if v_low == indices[&v] {
                // `v` is the root of an SCC: pop its members off the Tarjan stack.
                let mut component = HashSet::new();
                loop {
                    let w = tarjan_stack.pop().expect("stack non-empty for SCC root");
                    on_stack.remove(&w);
                    component.insert(w);
                    if w == v {
                        break;
                    }
                }
                components.push(component);
            }
            // Propagate the finished child's lowlink to its DFS parent, as the recursive
            // form does right after the call returns.
            if let Some(parent) = call_stack.last() {
                let parent_low = lowlink
                    .get_mut(&parent.node)
                    .expect("DFS parent has a lowlink");
                *parent_low = (*parent_low).min(v_low);
            }
        }
    }
    components
}
fn count_weakly_connected_components(state: &SnapshotState) -> usize {
if state.outgoing.is_empty() {
return 0;
}
let mut parent: HashMap<i64, i64> = HashMap::new();
fn find(parent: &mut HashMap<i64, i64>, x: i64) -> i64 {
let mut root = x;
while parent.get(&root).copied().unwrap_or(root) != root {
root = parent[&root];
}
let mut curr = x;
while parent.get(&curr).copied().unwrap_or(curr) != root {
let next = parent[&curr];
parent.insert(curr, root);
curr = next;
}
root
}
fn union(parent: &mut HashMap<i64, i64>, a: i64, b: i64) {
let ra = find(parent, a);
let rb = find(parent, b);
if ra != rb {
parent.insert(ra, rb);
}
}
for &node in state.outgoing.keys() {
parent.entry(node).or_insert(node);
}
for (&src, dsts) in &state.outgoing {
for &dst in dsts {
parent.entry(dst).or_insert(dst);
union(&mut parent, src, dst);
}
}
let roots: AHashSet<i64> = state
.outgoing
.keys()
.map(|&n| find(&mut parent, n))
.collect();
roots.len()
}
/// Cycle rank (first Betti number) of the snapshot graph with edge direction ignored:
/// `E - V + W`, where `W` is the number of weakly connected components.
///
/// The result is clamped at 0.
/// NOTE(review): assumes `node_count`/`edge_count` describe the same node and edge sets
/// that `count_weakly_connected_components` walks — confirm against `SnapshotState`.
pub fn cycle_rank_snapshot(state: &SnapshotState) -> usize {
    let vertices = state.node_count() as i64;
    let edges = state.edge_count() as i64;
    let weak_components = count_weakly_connected_components(state) as i64;
    let rank = edges - vertices + weak_components;
    if rank > 0 {
        rank as usize
    } else {
        0
    }
}
pub fn temporal_persistence_sweep(versions: &[VersionedSnapshot]) -> Vec<TemporalPersistencePoint> {
versions
.iter()
.map(|vs| {
let sccs = strongly_connected_components_snapshot(&vs.state);
let active: usize = sccs.iter().map(|c| c.len()).sum();
let n_components = sccs.len();
let largest_size = sccs.iter().map(|c| c.len()).max().unwrap_or(0);
let fraction_largest = if active == 0 {
0.0
} else {
largest_size as f32 / active as f32
};
let cycle_rank = cycle_rank_snapshot(&vs.state);
let nontrivial: Vec<&HashSet<i64>> = sccs.iter().filter(|c| c.len() >= 2).collect();
let n_nontrivial_sccs = nontrivial.len();
let largest_nontrivial_size = nontrivial.iter().map(|c| c.len()).max().unwrap_or(0);
TemporalPersistencePoint {
version: vs.version,
active,
n_components,
largest_size,
fraction_largest,
cycle_rank,
n_nontrivial_sccs,
largest_nontrivial_size,
}
})
.collect()
}
/// Builds a count-based barcode from sweep points.
///
/// Whenever `n_components` rises, one bar per new component is opened at that version.
/// Whenever it falls, the most recently opened bars are closed (last-in, first-out) at
/// that version. A bar's `peak_size` is the largest `largest_size` seen while it was open,
/// including the version at which it closes. Bars still open after the last point get
/// `death_version: None`. The result is stably sorted by `birth_version`.
///
/// NOTE(review): this tracks only the component *count*, not component identity; the
/// tests refer to it as the deprecated barcode. `scc_lineage_barcode` tracks identity.
pub fn compute_temporal_barcode(points: &[TemporalPersistencePoint]) -> Vec<TemporalBarcode> {
    // Stack of open bars: (birth version, running peak of `largest_size`).
    let mut open_bars: Vec<(u64, usize)> = Vec::new();
    let mut finished: Vec<TemporalBarcode> = Vec::new();
    let mut previous = 0usize;

    for point in points {
        let current = point.n_components;

        let born = current.saturating_sub(previous);
        open_bars.extend(std::iter::repeat((point.version, point.largest_size)).take(born));

        for bar in open_bars.iter_mut() {
            bar.1 = bar.1.max(point.largest_size);
        }

        let died = previous.saturating_sub(current);
        if died > 0 {
            let cut = open_bars.len().saturating_sub(died);
            finished.extend(open_bars.drain(cut..).map(|(birth, peak)| TemporalBarcode {
                birth_version: birth,
                death_version: Some(point.version),
                peak_size: peak,
            }));
        }

        previous = current;
    }

    finished.extend(open_bars.into_iter().map(|(birth, peak)| TemporalBarcode {
        birth_version: birth,
        death_version: None,
        peak_size: peak,
    }));
    finished.sort_by_key(|bar| bar.birth_version);
    finished
}
/// Jaccard similarity `|A ∩ B| / |A ∪ B|` of two node sets.
///
/// Two empty sets are defined to be identical (1.0). Otherwise the union is non-empty,
/// so the division is well defined. The union size comes from inclusion–exclusion,
/// so it is never materialised.
fn jaccard(a: &HashSet<i64>, b: &HashSet<i64>) -> f64 {
    if a.is_empty() && b.is_empty() {
        return 1.0;
    }
    let inter_size = a.intersection(b).count();
    let union_size = a.len() + b.len() - inter_size;
    inter_size as f64 / union_size as f64
}
/// Greedily matches SCCs of consecutive versions by Jaccard similarity.
///
/// Returns one entry per `next` SCC: `Some(i)` if it was paired with `prev[i]`, `None`
/// otherwise. Pairs are taken in descending similarity. Each `prev` and each `next` SCC
/// is used at most once, and pairs with zero overlap are never matched. Equal
/// similarities keep `(prev index, next index)` order because the sort is stable.
fn match_sccs(prev: &[HashSet<i64>], next: &[HashSet<i64>]) -> Vec<Option<usize>> {
    let mut candidates: Vec<(f64, usize, usize)> = prev
        .iter()
        .enumerate()
        .flat_map(|(pi, p)| {
            next.iter()
                .enumerate()
                .map(move |(ni, n)| (jaccard(p, n), pi, ni))
        })
        .filter(|&(similarity, _, _)| similarity > 0.0)
        .collect();

    candidates.sort_by(|x, y| y.0.partial_cmp(&x.0).unwrap_or(std::cmp::Ordering::Equal));

    let mut claimed = vec![false; prev.len()];
    let mut assignment: Vec<Option<usize>> = vec![None; next.len()];
    for (_, pi, ni) in candidates {
        if claimed[pi] || assignment[ni].is_some() {
            continue;
        }
        assignment[ni] = Some(pi);
        claimed[pi] = true;
    }
    assignment
}
/// Tracks SCC lineages across an ordered sequence of snapshots; returns one bar per lineage.
///
/// For each version, the SCCs with at least `min_size` nodes are matched to the lineages
/// alive at the previous version using `match_sccs` (greedy, one-to-one, highest Jaccard
/// overlap first). Then:
/// - a matched lineage continues with the new SCC as its members;
/// - an unmatched lineage dies at the current version;
/// - an unmatched SCC starts a new lineage born at the current version.
///
/// Lineages still alive after the last version get `death_version: None`. Bars are
/// stably sorted by `(birth_version, peak_size)`.
fn lineage_barcode(versions: &[VersionedSnapshot], min_size: usize) -> Vec<LineageBarcode> {
    /// Running state of one lineage while it is alive.
    struct Lineage {
        birth_version: u64,
        birth_size: usize,
        peak_size: usize,
        final_size: usize,
        versions_seen: usize,
        /// Members of the SCC this lineage matched most recently.
        members: HashSet<i64>,
    }

    let mut active: Vec<Lineage> = Vec::new();
    let mut bars: Vec<LineageBarcode> = Vec::new();

    for snapshot in versions {
        let version = snapshot.version;
        let mut sccs: Vec<HashSet<i64>> = strongly_connected_components_snapshot(&snapshot.state)
            .into_iter()
            .filter(|c| c.len() >= min_size)
            .collect();

        // Previous member sets are only needed for matching. A matched lineage adopts its
        // successor's members and an unmatched one is discarded, so move them out
        // rather than cloning.
        let prev_members: Vec<HashSet<i64>> = active
            .iter_mut()
            .map(|lineage| std::mem::take(&mut lineage.members))
            .collect();
        let next_match = match_sccs(&prev_members, &sccs);

        // Invert next -> prev once so each lineage finds its successor in O(1). The
        // matching is one-to-one, so each prev index appears at most once.
        let mut successor: Vec<Option<usize>> = vec![None; active.len()];
        for (ni, matched) in next_match.iter().enumerate() {
            if let Some(pi) = *matched {
                successor[pi] = Some(ni);
            }
        }

        // Every SCC ends up either continuing a lineage or starting a new one.
        let mut still_active: Vec<Lineage> = Vec::with_capacity(sccs.len());
        for (lineage, succ) in active.drain(..).zip(successor) {
            match succ {
                Some(ni) => {
                    // Each SCC index is claimed by at most one lineage, so it can be moved.
                    let members = std::mem::take(&mut sccs[ni]);
                    let size = members.len();
                    still_active.push(Lineage {
                        birth_version: lineage.birth_version,
                        birth_size: lineage.birth_size,
                        peak_size: lineage.peak_size.max(size),
                        final_size: size,
                        versions_seen: lineage.versions_seen + 1,
                        members,
                    });
                }
                None => bars.push(LineageBarcode {
                    birth_version: lineage.birth_version,
                    death_version: Some(version),
                    birth_size: lineage.birth_size,
                    peak_size: lineage.peak_size,
                    final_size: lineage.final_size,
                    versions_seen: lineage.versions_seen,
                }),
            }
        }

        // Unmatched SCCs start new lineages, in SCC order.
        for (ni, matched) in next_match.iter().enumerate() {
            if matched.is_none() {
                let members = std::mem::take(&mut sccs[ni]);
                let size = members.len();
                still_active.push(Lineage {
                    birth_version: version,
                    birth_size: size,
                    peak_size: size,
                    final_size: size,
                    versions_seen: 1,
                    members,
                });
            }
        }

        active = still_active;
    }

    for lineage in active {
        bars.push(LineageBarcode {
            birth_version: lineage.birth_version,
            death_version: None,
            birth_size: lineage.birth_size,
            peak_size: lineage.peak_size,
            final_size: lineage.final_size,
            versions_seen: lineage.versions_seen,
        });
    }
    bars.sort_by_key(|b| (b.birth_version, b.peak_size));
    bars
}
/// Lineage barcode over every SCC, including trivial single-node ones.
///
/// Each bar follows one SCC across versions by member overlap; see `lineage_barcode`
/// for the matching rules.
pub fn scc_lineage_barcode(versions: &[VersionedSnapshot]) -> Vec<LineageBarcode> {
    // A minimum size of 1 keeps single-node SCCs in the tracking.
    const MIN_SCC_SIZE: usize = 1;
    lineage_barcode(versions, MIN_SCC_SIZE)
}
/// Lineage barcode restricted to SCCs with at least two nodes, i.e. SCCs containing a
/// directed cycle through distinct nodes.
///
/// A single node with a self-loop forms a one-node SCC and is therefore excluded.
pub fn cycle_scc_barcode(versions: &[VersionedSnapshot]) -> Vec<LineageBarcode> {
    // Two nodes is the smallest SCC that is not trivially a single vertex.
    const MIN_CYCLE_SCC_SIZE: usize = 2;
    lineage_barcode(versions, MIN_CYCLE_SCC_SIZE)
}
#[cfg(test)]
mod tests {
// Unit tests for SCC extraction, cycle rank, the count-based barcode, and SCC lineage
// tracking across versions.
use super::*;
use std::collections::HashMap;
// Builds a `SnapshotState` from `(source, targets)` adjacency pairs. Every node that
// appears as a source or a target gets an `outgoing` entry (possibly empty); `inc` holds
// the reversed edges. Both maps are handed to `SnapshotState::new`.
fn state_from(outgoing: &[(&i64, &[i64])]) -> SnapshotState {
let mut out: HashMap<i64, Vec<i64>> = HashMap::new();
let mut inc: HashMap<i64, Vec<i64>> = HashMap::new();
let mut all_nodes: HashSet<i64> = HashSet::new();
for &(src, dsts) in outgoing {
all_nodes.insert(*src);
for &d in dsts {
out.entry(*src).or_default().push(d);
inc.entry(d).or_default().push(*src);
all_nodes.insert(d);
}
out.entry(*src).or_default();
}
for &n in &all_nodes {
out.entry(n).or_default();
}
SnapshotState::new(&out, &inc)
}
// Wraps `state_from(outgoing)` in a `VersionedSnapshot` with the given version number.
fn vs(version: u64, outgoing: &[(&i64, &[i64])]) -> VersionedSnapshot {
VersionedSnapshot {
version,
created_at: std::time::SystemTime::now(),
state: std::sync::Arc::new(state_from(outgoing)),
}
}
#[test]
fn scc_empty_state_has_zero_components() {
let state = SnapshotState::new(&HashMap::new(), &HashMap::new());
assert!(strongly_connected_components_snapshot(&state).is_empty());
}
#[test]
fn scc_isolated_nodes_are_all_trivial() {
let state = state_from(&[(&1, &[]), (&2, &[]), (&3, &[])]);
let sccs = strongly_connected_components_snapshot(&state);
assert_eq!(sccs.len(), 3);
for c in &sccs {
assert_eq!(c.len(), 1);
}
}
#[test]
fn scc_directed_cycle_is_one_component() {
let state = state_from(&[(&1, &[2]), (&2, &[3]), (&3, &[1])]);
let sccs = strongly_connected_components_snapshot(&state);
assert_eq!(sccs.len(), 1);
assert_eq!(sccs[0].len(), 3);
}
#[test]
fn scc_dag_all_trivial() {
let state = state_from(&[(&1, &[2]), (&2, &[3])]);
let sccs = strongly_connected_components_snapshot(&state);
assert_eq!(sccs.len(), 3);
}
#[test]
fn cycle_rank_empty_is_zero() {
let state = SnapshotState::new(&HashMap::new(), &HashMap::new());
assert_eq!(cycle_rank_snapshot(&state), 0);
}
#[test]
fn cycle_rank_dag_is_zero() {
let state = state_from(&[(&1, &[2]), (&2, &[3])]);
assert_eq!(cycle_rank_snapshot(&state), 0);
}
#[test]
fn cycle_rank_single_cycle_is_one() {
let state = state_from(&[(&1, &[2]), (&2, &[3]), (&3, &[1])]);
assert_eq!(cycle_rank_snapshot(&state), 1);
}
// Each 2-cycle contributes E - V + W = 2 - 2 + 1 = 1.
#[test]
fn cycle_rank_two_disconnected_cycles_is_two() {
let state = state_from(&[(&1, &[2]), (&2, &[1]), (&3, &[4]), (&4, &[3])]);
assert_eq!(cycle_rank_snapshot(&state), 2);
}
// Version 1 has three isolated nodes; version 2 closes them into a single 3-cycle.
#[test]
fn sweep_tracks_cycle_metrics() {
let versions = [
vs(1, &[(&1, &[]), (&2, &[]), (&3, &[])]), vs(2, &[(&1, &[2]), (&2, &[3]), (&3, &[1])]), ];
let pts = temporal_persistence_sweep(&versions);
assert_eq!(pts.len(), 2);
assert_eq!(pts[0].cycle_rank, 0);
assert_eq!(pts[0].n_nontrivial_sccs, 0);
assert_eq!(pts[1].cycle_rank, 1);
assert_eq!(pts[1].n_nontrivial_sccs, 1);
assert_eq!(pts[1].largest_nontrivial_size, 3);
}
#[test]
fn lineage_barcode_stable_component_survives() {
let versions = [
vs(1, &[(&1, &[2]), (&2, &[1])]),
vs(2, &[(&1, &[2]), (&2, &[1])]),
];
let bars = scc_lineage_barcode(&versions);
assert_eq!(bars.len(), 1);
assert_eq!(bars[0].birth_version, 1);
assert_eq!(bars[0].death_version, None);
assert_eq!(bars[0].versions_seen, 2);
}
// Three singleton SCCs exist at versions 1 and 2 and vanish in the empty version-3
// snapshot, so every bar dies at 3 after being seen twice.
#[test]
fn lineage_barcode_birth_and_death_exact() {
let versions = [
vs(1, &[(&1, &[]), (&2, &[]), (&3, &[])]),
vs(2, &[(&1, &[]), (&2, &[]), (&3, &[])]),
vs(3, &[]),
];
let bars = scc_lineage_barcode(&versions);
assert_eq!(bars.len(), 3);
for b in &bars {
assert_eq!(b.birth_version, 1);
assert_eq!(b.death_version, Some(3));
assert_eq!(b.versions_seen, 2); }
}
// Two 2-cycles at version 1 merge into one 4-cycle at version 2. Both old SCCs overlap
// the merged one with Jaccard 0.5, and the one-to-one matching lets exactly one lineage
// continue (reaching size 4) while the other dies.
#[test]
fn lineage_barcode_merges_tracked_by_identity() {
let versions = [
vs(1, &[(&1, &[2]), (&2, &[1]), (&3, &[4]), (&4, &[3])]),
vs(2, &[(&1, &[2]), (&2, &[3]), (&3, &[4]), (&4, &[1])]),
];
let bars = scc_lineage_barcode(&versions);
let survived = bars.iter().filter(|b| b.death_version.is_none()).count();
let died = bars.iter().filter(|b| b.death_version.is_some()).count();
assert_eq!(survived, 1);
assert_eq!(died, 1);
let survivor = bars.iter().find(|b| b.death_version.is_none()).unwrap();
assert_eq!(survivor.peak_size, 4);
}
#[test]
fn cycle_barcode_no_cycles_no_bars() {
let versions = [
vs(1, &[(&1, &[2]), (&2, &[3])]),
vs(2, &[(&1, &[2]), (&2, &[3])]),
];
let bars = cycle_scc_barcode(&versions);
assert!(bars.is_empty(), "no cycles → no bars");
}
// The 1 <-> 2 cycle exists only at version 2; trivial SCCs are filtered out.
#[test]
fn cycle_barcode_born_then_broken() {
let versions = [
vs(1, &[(&1, &[2])]),
vs(2, &[(&1, &[2]), (&2, &[1])]),
vs(3, &[(&1, &[2])]),
];
let bars = cycle_scc_barcode(&versions);
assert_eq!(bars.len(), 1);
assert_eq!(bars[0].birth_version, 2);
assert_eq!(bars[0].death_version, Some(3));
assert_eq!(bars[0].peak_size, 2);
assert_eq!(bars[0].birth_size, 2);
}
// The cycle grows from {1,2} to {1,2,3} and shrinks back; member overlap keeps it a
// single lineage.
#[test]
fn cycle_barcode_grows_then_shrinks() {
let versions = [
vs(1, &[(&1, &[2]), (&2, &[1])]),
vs(2, &[(&1, &[2]), (&2, &[3]), (&3, &[1])]),
vs(3, &[(&1, &[2]), (&2, &[1])]),
];
let bars = cycle_scc_barcode(&versions);
assert_eq!(bars.len(), 1);
assert_eq!(bars[0].birth_version, 1);
assert_eq!(bars[0].death_version, None);
assert_eq!(bars[0].peak_size, 3);
assert_eq!(bars[0].final_size, 2);
}
#[test]
fn cycle_barcode_two_independent_cycles() {
let versions = [
vs(1, &[(&1, &[2]), (&2, &[1]), (&3, &[4]), (&4, &[3])]),
vs(2, &[(&1, &[2]), (&2, &[1]), (&3, &[4]), (&4, &[3])]),
];
let bars = cycle_scc_barcode(&versions);
assert_eq!(bars.len(), 2);
assert!(bars.iter().all(|b| b.versions_seen == 2));
assert!(bars.iter().all(|b| b.death_version.is_none()));
}
// Smoke test only: the count-based barcode yields at least one bar for a non-empty sweep.
#[test]
fn deprecated_barcode_basic() {
let pts = temporal_persistence_sweep(&[
vs(1, &[(&1, &[]), (&2, &[]), (&3, &[])]),
vs(2, &[(&1, &[2]), (&2, &[3]), (&3, &[1])]),
]);
let bars = compute_temporal_barcode(&pts);
assert!(!bars.is_empty());
}
#[test]
fn jaccard_identical_sets_is_one() {
let a: HashSet<i64> = [1, 2, 3].into_iter().collect();
let b: HashSet<i64> = [1, 2, 3].into_iter().collect();
assert!((jaccard(&a, &b) - 1.0).abs() < 1e-9);
}
#[test]
fn jaccard_disjoint_is_zero() {
let a: HashSet<i64> = [1, 2].into_iter().collect();
let b: HashSet<i64> = [3, 4].into_iter().collect();
assert!(jaccard(&a, &b).abs() < 1e-9);
}
#[test]
fn jaccard_half_overlap() {
let a: HashSet<i64> = [1, 2].into_iter().collect();
let b: HashSet<i64> = [2, 3].into_iter().collect();
assert!((jaccard(&a, &b) - 1.0 / 3.0).abs() < 1e-9);
}
}