use super::super::edge::{DeltaEdge, EdgeKind, EdgeStore};
use super::super::node::NodeId;
use super::super::storage::{CsrError, CsrGraph};
use super::errors::{BuildFailureReason, CompactionError, Direction};
use super::merge::{MergeStats, MergedEdge, merge_delta_edges};
#[derive(Debug, Clone)]
pub struct CompactionSnapshot {
pub csr_edges: Vec<MergedEdge>,
pub delta_edges: Vec<DeltaEdge>,
pub node_count: usize,
pub csr_version: u64,
}
impl CompactionSnapshot {
#[must_use]
pub fn empty(node_count: usize) -> Self {
Self {
csr_edges: Vec::new(),
delta_edges: Vec::new(),
node_count,
csr_version: 0,
}
}
#[must_use]
pub fn total_edges(&self) -> usize {
self.csr_edges.len() + self.delta_edges.len()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct BuildStats {
pub csr_input_edges: usize,
pub delta_input_edges: usize,
pub merge_stats: MergeStats,
pub output_edges: usize,
pub output_nodes: usize,
}
#[must_use]
pub fn snapshot_edges(store: &EdgeStore, node_count: usize) -> CompactionSnapshot {
let mut csr_edges = Vec::new();
if let Some(csr) = store.csr() {
for node_idx in 0..csr.node_count() {
let node_idx_u32 = u32::try_from(node_idx).expect("CSR node index exceeds u32::MAX");
for edge_ref in csr.edges_of(node_idx_u32) {
if store.is_edge_tombstoned(edge_ref.index) {
continue;
}
csr_edges.push(MergedEdge {
source: NodeId::new(node_idx_u32, 0), target: edge_ref.target,
kind: edge_ref.kind,
seq: edge_ref.seq,
file: super::super::file::FileId::new(0), spans: edge_ref.spans, });
}
}
}
let delta_edges: Vec<DeltaEdge> = store.delta().iter().cloned().collect();
CompactionSnapshot {
csr_edges,
delta_edges,
node_count,
csr_version: store.csr_version(),
}
}
pub fn build_compacted_csr(
snapshot: &CompactionSnapshot,
direction: Direction,
) -> Result<(CsrGraph, BuildStats), CompactionError> {
let csr_input_edges = snapshot.csr_edges.len();
let delta_input_edges = snapshot.delta_edges.len();
let node_count = snapshot.node_count;
if node_count == 0 {
return Ok((
CsrGraph::empty(0),
BuildStats {
csr_input_edges,
delta_input_edges,
merge_stats: MergeStats::default(),
output_edges: 0,
output_nodes: 0,
},
));
}
let mut all_delta_edges: Vec<DeltaEdge> = snapshot
.csr_edges
.iter()
.map(|e| {
DeltaEdge::with_spans(
e.source,
e.target,
e.kind.clone(),
e.seq,
super::super::edge::DeltaOp::Add,
e.file,
e.spans.clone(),
)
})
.collect();
all_delta_edges.extend(snapshot.delta_edges.iter().cloned());
let (merged_edges, merge_stats) = merge_delta_edges(all_delta_edges);
let csr = build_csr_from_edges(&merged_edges, node_count).map_err(|e| {
CompactionError::BuildFailed {
direction,
reason: BuildFailureReason::BuilderError {
message: e.to_string(),
},
}
})?;
let output_edges = csr.edge_count();
let output_nodes = csr.node_count();
Ok((
csr,
BuildStats {
csr_input_edges,
delta_input_edges,
merge_stats,
output_edges,
output_nodes,
},
))
}
pub fn build_csr_from_edges(edges: &[MergedEdge], node_count: usize) -> Result<CsrGraph, CsrError> {
if node_count == 0 {
return Ok(CsrGraph::empty(0));
}
let mut sorted_edges: Vec<&MergedEdge> = edges.iter().collect();
sorted_edges.sort_by_key(|e| e.source.index());
let mut edge_counts: Vec<usize> = vec![0; node_count];
for edge in &sorted_edges {
let src_idx = edge.source.index() as usize;
if src_idx < node_count {
edge_counts[src_idx] += 1;
}
}
let mut row_ptr: Vec<u32> = Vec::with_capacity(node_count + 1);
row_ptr.push(0);
let mut cumulative = 0u32;
for count in &edge_counts {
let count_u32 = u32::try_from(*count).expect("CSR edge count exceeds u32::MAX");
cumulative = cumulative
.checked_add(count_u32)
.expect("CSR row_ptr overflow");
row_ptr.push(cumulative);
}
let edge_count = sorted_edges.len();
let mut col_idx: Vec<NodeId> = Vec::with_capacity(edge_count);
let mut edge_kind: Vec<EdgeKind> = Vec::with_capacity(edge_count);
let mut edge_seq: Vec<u64> = Vec::with_capacity(edge_count);
let mut edge_spans: Vec<Vec<crate::graph::node::Span>> = Vec::with_capacity(edge_count);
for edge in sorted_edges {
col_idx.push(edge.target);
edge_kind.push(edge.kind.clone());
edge_seq.push(edge.seq);
edge_spans.push(edge.spans.clone());
}
let csr = CsrGraph::from_raw(
node_count, row_ptr, col_idx, edge_kind, edge_seq, edge_spans,
);
csr.validate()?;
Ok(csr)
}
#[cfg(test)]
mod tests {
use super::super::super::edge::{DeltaOp, EdgeKind};
use super::super::super::file::FileId;
use super::super::super::node::NodeId;
use super::*;
fn make_merged_edge(source: u32, target: u32, seq: u64) -> MergedEdge {
MergedEdge::new(
NodeId::new(source, 0),
NodeId::new(target, 0),
EdgeKind::Calls {
argument_count: 0,
is_async: false,
},
seq,
FileId::new(1),
)
}
fn make_delta_edge(source: u32, target: u32, seq: u64, op: DeltaOp) -> DeltaEdge {
DeltaEdge::new(
NodeId::new(source, 0),
NodeId::new(target, 0),
EdgeKind::Calls {
argument_count: 0,
is_async: false,
},
seq,
op,
FileId::new(1),
)
}
#[test]
fn test_compaction_snapshot_empty() {
let snapshot = CompactionSnapshot::empty(10);
assert_eq!(snapshot.node_count, 10);
assert_eq!(snapshot.total_edges(), 0);
assert_eq!(snapshot.csr_version, 0);
}
#[test]
fn test_build_csr_from_edges_empty() {
let edges: Vec<MergedEdge> = vec![];
let csr = build_csr_from_edges(&edges, 5).unwrap();
assert_eq!(csr.node_count(), 5);
assert_eq!(csr.edge_count(), 0);
assert!(csr.validate().is_ok());
}
#[test]
fn test_build_csr_from_edges_simple() {
let edges = vec![
make_merged_edge(0, 1, 1),
make_merged_edge(0, 2, 2),
make_merged_edge(1, 2, 3),
];
let csr = build_csr_from_edges(&edges, 3).unwrap();
assert_eq!(csr.node_count(), 3);
assert_eq!(csr.edge_count(), 3);
assert_eq!(csr.out_degree(0), 2);
assert_eq!(csr.out_degree(1), 1);
assert_eq!(csr.out_degree(2), 0);
assert!(csr.validate().is_ok());
}
#[test]
fn test_build_csr_from_edges_unsorted() {
let edges = vec![
make_merged_edge(2, 0, 3),
make_merged_edge(0, 1, 1),
make_merged_edge(1, 2, 2),
];
let csr = build_csr_from_edges(&edges, 3).unwrap();
assert_eq!(csr.edge_count(), 3);
assert_eq!(csr.out_degree(0), 1);
assert_eq!(csr.out_degree(1), 1);
assert_eq!(csr.out_degree(2), 1);
assert!(csr.validate().is_ok());
}
#[test]
fn test_build_csr_from_edges_zero_nodes() {
let edges: Vec<MergedEdge> = vec![];
let csr = build_csr_from_edges(&edges, 0).unwrap();
assert_eq!(csr.node_count(), 0);
assert_eq!(csr.edge_count(), 0);
assert!(csr.validate().is_ok());
}
#[test]
fn test_build_compacted_csr_empty_snapshot() {
let snapshot = CompactionSnapshot::empty(5);
let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();
assert_eq!(csr.node_count(), 5);
assert_eq!(csr.edge_count(), 0);
assert_eq!(stats.csr_input_edges, 0);
assert_eq!(stats.delta_input_edges, 0);
assert_eq!(stats.output_edges, 0);
}
#[test]
fn test_build_compacted_csr_with_delta_edges() {
let snapshot = CompactionSnapshot {
csr_edges: vec![],
delta_edges: vec![
make_delta_edge(0, 1, 1, DeltaOp::Add),
make_delta_edge(1, 2, 2, DeltaOp::Add),
],
node_count: 3,
csr_version: 0,
};
let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();
assert_eq!(csr.edge_count(), 2);
assert_eq!(stats.delta_input_edges, 2);
assert_eq!(stats.output_edges, 2);
}
#[test]
fn test_build_compacted_csr_merge_removes_duplicates() {
let snapshot = CompactionSnapshot {
csr_edges: vec![make_merged_edge(0, 1, 1)], delta_edges: vec![
make_delta_edge(0, 1, 5, DeltaOp::Add), ],
node_count: 3,
csr_version: 1,
};
let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();
assert_eq!(csr.edge_count(), 1);
assert_eq!(stats.csr_input_edges, 1);
assert_eq!(stats.delta_input_edges, 1);
assert_eq!(stats.output_edges, 1);
let edge = csr.edge_at(0).unwrap();
assert_eq!(edge.seq, 5);
}
#[test]
fn test_build_compacted_csr_remove_wins() {
let snapshot = CompactionSnapshot {
csr_edges: vec![make_merged_edge(0, 1, 1)], delta_edges: vec![
make_delta_edge(0, 1, 5, DeltaOp::Remove), ],
node_count: 3,
csr_version: 1,
};
let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();
assert_eq!(csr.edge_count(), 0);
assert_eq!(stats.csr_input_edges, 1);
assert_eq!(stats.delta_input_edges, 1);
assert_eq!(stats.output_edges, 0);
}
#[test]
fn test_build_compacted_csr_add_after_remove() {
let snapshot = CompactionSnapshot {
csr_edges: vec![],
delta_edges: vec![
make_delta_edge(0, 1, 1, DeltaOp::Add),
make_delta_edge(0, 1, 2, DeltaOp::Remove),
make_delta_edge(0, 1, 3, DeltaOp::Add), ],
node_count: 3,
csr_version: 0,
};
let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();
assert_eq!(csr.edge_count(), 1);
assert_eq!(stats.delta_input_edges, 3);
assert_eq!(stats.output_edges, 1);
assert_eq!(stats.merge_stats.deduplicated_count, 2);
}
#[test]
fn test_build_stats() {
let snapshot = CompactionSnapshot {
csr_edges: vec![make_merged_edge(0, 1, 1), make_merged_edge(1, 2, 2)],
delta_edges: vec![
make_delta_edge(2, 0, 3, DeltaOp::Add),
make_delta_edge(0, 1, 4, DeltaOp::Add), ],
node_count: 3,
csr_version: 1,
};
let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();
assert_eq!(stats.csr_input_edges, 2);
assert_eq!(stats.delta_input_edges, 2);
assert_eq!(stats.output_edges, 3); assert_eq!(stats.output_nodes, 3);
assert!(csr.validate().is_ok());
}
#[test]
fn test_build_csr_preserves_edge_data() {
use super::super::super::edge::EdgeKind;
let edges = vec![MergedEdge::new(
NodeId::new(0, 0),
NodeId::new(1, 0),
EdgeKind::References,
42,
FileId::new(99),
)];
let csr = build_csr_from_edges(&edges, 2).unwrap();
let edge_ref = csr.edge_at(0).unwrap();
assert_eq!(edge_ref.target, NodeId::new(1, 0));
assert_eq!(edge_ref.kind, EdgeKind::References);
assert_eq!(edge_ref.seq, 42);
}
}