use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use super::super::file::FileId;
use super::super::node::NodeId;
use super::super::storage::CsrGraph;
use super::delta::DeltaEdge;
use super::kind::EdgeKind;
use super::store::{EdgeStore, EdgeStoreStats, StoreEdgeRef};
/// Edge store that records every edge in two directions.
///
/// `forward` holds edges as they were added (`source -> target`); `reverse` holds
/// the same edges with the endpoints swapped, so the incoming edges of a node can
/// be answered with the same `edges_from` lookup on the reverse store. Each
/// direction lives behind its own `parking_lot::RwLock`, which is why the
/// mutating methods on this type take `&self`.
#[derive(Debug, Serialize, Deserialize)]
pub struct BidirectionalEdgeStore {
/// Edges keyed by their real source node. (De)serialized through the lock by
/// the `rwlock_edge_store_serde` helper module.
#[serde(with = "rwlock_edge_store_serde")]
forward: RwLock<EdgeStore>,
/// Edges stored with source and target swapped (keyed by the real target).
/// (De)serialized through the lock by the `rwlock_edge_store_serde` helper module.
#[serde(with = "rwlock_edge_store_serde")]
reverse: RwLock<EdgeStore>,
}
impl BidirectionalEdgeStore {
#[must_use]
pub fn new() -> Self {
Self {
forward: RwLock::new(EdgeStore::new()),
reverse: RwLock::new(EdgeStore::new()),
}
}
pub fn add_edge(
&self,
source: NodeId,
target: NodeId,
kind: EdgeKind,
file: FileId,
) -> DeltaEdge {
self.add_edge_with_spans(source, target, kind, file, Vec::new())
}
pub fn add_edge_with_spans(
&self,
source: NodeId,
target: NodeId,
kind: EdgeKind,
file: FileId,
spans: Vec<crate::graph::node::Span>,
) -> DeltaEdge {
let forward_edge = self.forward.write().add_edge_with_spans(
source,
target,
kind.clone(),
file,
spans.clone(),
);
self.reverse
.write()
.add_edge_with_spans(target, source, kind, file, spans);
forward_edge
}
pub fn remove_edge(
&self,
source: NodeId,
target: NodeId,
kind: EdgeKind,
file: FileId,
) -> DeltaEdge {
let forward_edge = self
.forward
.write()
.remove_edge(source, target, kind.clone(), file);
self.reverse.write().remove_edge(target, source, kind, file);
forward_edge
}
pub fn edges_from(&self, source: NodeId) -> Vec<StoreEdgeRef> {
self.forward.read().edges_from(source)
}
pub fn edges_to(&self, target: NodeId) -> Vec<StoreEdgeRef> {
let reverse_edges = self.reverse.read().edges_from(target);
reverse_edges
.into_iter()
.map(|e| StoreEdgeRef {
source: e.target, target, kind: e.kind,
seq: e.seq,
file: e.file, spans: e.spans, })
.collect()
}
pub fn has_edge(&self, source: NodeId, target: NodeId, kind: &EdgeKind) -> bool {
self.forward.read().has_edge(source, target, kind)
}
pub fn stats(&self) -> BidirectionalEdgeStoreStats {
let forward_stats = self.forward.read().stats();
let reverse_stats = self.reverse.read().stats();
BidirectionalEdgeStoreStats {
forward: forward_stats,
reverse: reverse_stats,
}
}
#[must_use]
pub fn csr_version(&self) -> u64 {
self.forward.read().csr_version()
}
pub fn clear_file(&self, file: FileId) -> usize {
let forward_cleared = self.forward.write().clear_file(file);
let _reverse_cleared = self.reverse.write().clear_file(file);
forward_cleared
}
pub fn clear_delta(&self) {
self.forward.write().clear_delta();
self.reverse.write().clear_delta();
}
pub fn swap_csrs_and_clear_deltas(&self, forward_csr: CsrGraph, reverse_csr: CsrGraph) {
let mut forward = self.forward.write();
let mut reverse = self.reverse.write();
forward.swap_csr(forward_csr);
reverse.swap_csr(reverse_csr);
forward.clear_delta();
reverse.clear_delta();
}
pub fn forward(&self) -> parking_lot::RwLockReadGuard<'_, EdgeStore> {
self.forward.read()
}
pub fn reverse(&self) -> parking_lot::RwLockReadGuard<'_, EdgeStore> {
self.reverse.read()
}
pub fn forward_mut(&self) -> parking_lot::RwLockWriteGuard<'_, EdgeStore> {
self.forward.write()
}
pub fn reverse_mut(&self) -> parking_lot::RwLockWriteGuard<'_, EdgeStore> {
self.reverse.write()
}
pub fn add_edges_bulk_ordered(&self, file_edge_vecs: &[Vec<DeltaEdge>], expected_total: u64) {
let actual_total: u64 = file_edge_vecs.iter().map(|v| v.len() as u64).sum();
assert_eq!(
actual_total, expected_total,
"add_edges_bulk_ordered: actual edge count {actual_total} != expected {expected_total}"
);
let mut forward = self.forward.write();
let mut reverse = self.reverse.write();
let current_forward_seq = forward.seq_counter();
let current_reverse_seq = reverse.seq_counter();
let mut prev_seq: Option<u64> = None;
for file_edges in file_edge_vecs {
for edge in file_edges {
if let Some(prev) = prev_seq {
assert!(
edge.seq >= prev,
"add_edges_bulk_ordered: non-monotonic seq: {} follows {prev}",
edge.seq
);
}
assert!(
edge.seq >= current_forward_seq,
"add_edges_bulk_ordered: edge seq {} < forward store counter {current_forward_seq}",
edge.seq
);
assert!(
edge.seq >= current_reverse_seq,
"add_edges_bulk_ordered: edge seq {} < reverse store counter {current_reverse_seq}",
edge.seq
);
prev_seq = Some(edge.seq);
forward.delta_mut().push(edge.clone());
let reverse_edge = DeltaEdge::with_spans(
edge.target,
edge.source,
edge.kind.clone(),
edge.seq,
edge.op,
edge.file,
edge.spans.clone(),
);
reverse.delta_mut().push(reverse_edge);
}
}
if let Some(max) = prev_seq {
forward.delta_mut().advance_seq_to(max + 1);
reverse.delta_mut().advance_seq_to(max + 1);
}
}
}
impl Default for BidirectionalEdgeStore {
fn default() -> Self {
Self::new()
}
}
impl Clone for BidirectionalEdgeStore {
fn clone(&self) -> Self {
let forward_data = self.forward.read().clone();
let reverse_data = self.reverse.read().clone();
Self {
forward: RwLock::new(forward_data),
reverse: RwLock::new(reverse_data),
}
}
}
/// Statistics of a [`BidirectionalEdgeStore`], one [`EdgeStoreStats`] per direction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BidirectionalEdgeStoreStats {
/// Statistics reported by the forward (`source -> target`) store.
pub forward: EdgeStoreStats,
/// Statistics reported by the reverse (`target -> source`) store.
pub reverse: EdgeStoreStats,
}
#[cfg(test)]
mod tests {
    use super::super::delta::DeltaOp;
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Edge kind shared by most tests: a non-async call without arguments.
    fn plain_call() -> EdgeKind {
        EdgeKind::Calls {
            argument_count: 0,
            is_async: false,
        }
    }

    /// Builds a node id from `index`, with `0` as the second constructor argument.
    fn node(index: u32) -> NodeId {
        NodeId::new(index, 0)
    }

    /// Builds an `Add` delta edge of kind [`plain_call`] with an explicit sequence number.
    fn make_delta_edge(source: u32, target: u32, seq: u64, file: u32) -> DeltaEdge {
        DeltaEdge::new(
            node(source),
            node(target),
            plain_call(),
            seq,
            DeltaOp::Add,
            FileId::new(file),
        )
    }

    /// A new store reports CSR version 0.
    #[test]
    fn test_bidirectional_new() {
        let store = BidirectionalEdgeStore::new();
        assert_eq!(store.csr_version(), 0);
    }

    /// `Default` yields the same initial CSR version as `new`.
    #[test]
    fn test_default() {
        let store: BidirectionalEdgeStore = BidirectionalEdgeStore::default();
        assert_eq!(store.csr_version(), 0);
    }

    /// One `add_edge` is visible through both `edges_from` and `edges_to`.
    #[test]
    fn test_add_updates_both_directions() {
        let store = BidirectionalEdgeStore::new();
        let (source, target) = (node(1), node(2));
        store.add_edge(source, target, plain_call(), FileId::new(10));

        let outgoing = store.edges_from(source);
        assert_eq!(outgoing.len(), 1);
        assert_eq!(outgoing[0].target, target);

        let incoming = store.edges_to(target);
        assert_eq!(incoming.len(), 1);
        assert_eq!(incoming[0].source, source);
    }

    /// Add followed by remove leaves two delta entries in each direction.
    #[test]
    fn test_remove_updates_both_directions() {
        let store = BidirectionalEdgeStore::new();
        let (source, target) = (node(1), node(2));
        let file = FileId::new(10);
        store.add_edge(source, target, plain_call(), file);
        store.remove_edge(source, target, plain_call(), file);

        assert_eq!(store.stats().forward.delta_edge_count, 2);
        assert_eq!(store.stats().reverse.delta_edge_count, 2);
    }

    /// Outgoing and incoming edge totals agree over a small triangle-like graph.
    #[test]
    fn test_forward_reverse_consistency() {
        let store = BidirectionalEdgeStore::new();
        let file = FileId::new(10);
        let [n1, n2, n3] = [node(1), node(2), node(3)];
        for (from, to) in [(n1, n2), (n2, n3), (n1, n3)] {
            store.add_edge(from, to, plain_call(), file);
        }

        assert_eq!(store.edges_from(n1).len(), 2);
        assert_eq!(store.edges_to(n3).len(), 2);

        let total_outgoing: usize = [n1, n2, n3]
            .iter()
            .map(|&n| store.edges_from(n).len())
            .sum();
        let total_incoming: usize = [n1, n2, n3]
            .iter()
            .map(|&n| store.edges_to(n).len())
            .sum();
        assert_eq!(total_outgoing, total_incoming);
    }

    /// Four writer threads and four reader threads share one store without panicking.
    #[test]
    fn test_concurrent_access() {
        let store = Arc::new(BidirectionalEdgeStore::new());
        let file = FileId::new(10);

        let writers = (0..4u32).map(|i| {
            let shared = Arc::clone(&store);
            thread::spawn(move || {
                for j in 0..100 {
                    let id = i * 100 + j;
                    shared.add_edge(node(id), node((id + 1) % 400), plain_call(), file);
                }
            })
        });
        let readers = (0..4).map(|_| {
            let shared = Arc::clone(&store);
            thread::spawn(move || {
                for i in 0..100 {
                    let probe = node(i);
                    let _ = shared.edges_from(probe);
                    let _ = shared.edges_to(probe);
                }
            })
        });

        // `collect` drives the lazy iterators: all writers are spawned, then all readers.
        let handles: Vec<_> = writers.chain(readers).collect();
        for handle in handles {
            handle.join().expect("Thread panicked");
        }

        let stats = store.stats();
        assert!(stats.forward.delta_edge_count > 0);
    }

    /// `has_edge` distinguishes absent edges, present edges and other edge kinds.
    #[test]
    fn test_has_edge() {
        let store = BidirectionalEdgeStore::new();
        let (source, target) = (node(1), node(2));

        assert!(!store.has_edge(source, target, &plain_call()));
        store.add_edge(source, target, plain_call(), FileId::new(10));
        assert!(store.has_edge(source, target, &plain_call()));
        assert!(!store.has_edge(source, target, &EdgeKind::References));
    }

    /// Clearing one file removes only that file's edges.
    #[test]
    fn test_clear_file() {
        let store = BidirectionalEdgeStore::new();
        let file1 = FileId::new(10);
        let file2 = FileId::new(20);
        store.add_edge(node(1), node(2), plain_call(), file1);
        store.add_edge(node(2), node(3), plain_call(), file1);
        store.add_edge(node(3), node(4), plain_call(), file2);

        let cleared = store.clear_file(file1);
        assert_eq!(cleared, 2);

        let stats = store.stats();
        assert_eq!(stats.forward.delta_edge_count, 1);
    }

    /// `clear_delta` empties the delta buffers of both directions.
    #[test]
    fn test_clear_delta() {
        let store = BidirectionalEdgeStore::new();
        let file = FileId::new(10);
        store.add_edge(node(1), node(2), plain_call(), file);
        store.add_edge(node(2), node(3), plain_call(), file);
        assert!(store.stats().forward.delta_edge_count > 0);

        store.clear_delta();
        assert_eq!(store.stats().forward.delta_edge_count, 0);
        assert_eq!(store.stats().reverse.delta_edge_count, 0);
    }

    /// A single added edge is counted once per direction.
    #[test]
    fn test_stats() {
        let store = BidirectionalEdgeStore::new();
        store.add_edge(node(1), node(2), plain_call(), FileId::new(10));

        let stats = store.stats();
        assert_eq!(stats.forward.delta_edge_count, 1);
        assert_eq!(stats.reverse.delta_edge_count, 1);
    }

    /// Two files' worth of edges land in both directions and advance both counters.
    #[test]
    fn test_add_edges_bulk_ordered_basic() {
        let store = BidirectionalEdgeStore::new();
        let file1_edges = vec![make_delta_edge(1, 2, 0, 10), make_delta_edge(3, 4, 1, 10)];
        let file2_edges = vec![make_delta_edge(5, 6, 2, 20), make_delta_edge(7, 8, 3, 20)];
        store.add_edges_bulk_ordered(&[file1_edges, file2_edges], 4);

        let stats = store.stats();
        assert_eq!(stats.forward.delta_edge_count, 4);
        assert_eq!(stats.reverse.delta_edge_count, 4);

        let edges_from_1 = store.edges_from(node(1));
        assert_eq!(edges_from_1.len(), 1);
        assert_eq!(edges_from_1[0].target, node(2));

        let edges_from_5 = store.edges_from(node(5));
        assert_eq!(edges_from_5.len(), 1);
        assert_eq!(edges_from_5[0].target, node(6));

        let edges_to_2 = store.edges_to(node(2));
        assert_eq!(edges_to_2.len(), 1);
        assert_eq!(edges_to_2[0].source, node(1));

        let edges_to_8 = store.edges_to(node(8));
        assert_eq!(edges_to_8.len(), 1);
        assert_eq!(edges_to_8[0].source, node(7));

        assert_eq!(store.forward().seq_counter(), 4);
        assert_eq!(store.reverse().seq_counter(), 4);
    }

    /// An empty batch with an expected total of zero is accepted and changes nothing.
    #[test]
    fn test_add_edges_bulk_ordered_empty() {
        let store = BidirectionalEdgeStore::new();
        store.add_edges_bulk_ordered(&[], 0);

        let stats = store.stats();
        assert_eq!(stats.forward.delta_edge_count, 0);
        assert_eq!(stats.reverse.delta_edge_count, 0);
    }

    /// A single file forming a 10 -> 20 -> 30 -> 10 cycle is queryable in both directions.
    #[test]
    fn test_add_edges_bulk_ordered_single_file() {
        let store = BidirectionalEdgeStore::new();
        let edges = vec![
            make_delta_edge(10, 20, 0, 5),
            make_delta_edge(20, 30, 1, 5),
            make_delta_edge(30, 10, 2, 5),
        ];
        store.add_edges_bulk_ordered(&[edges], 3);

        let stats = store.stats();
        assert_eq!(stats.forward.delta_edge_count, 3);
        assert_eq!(stats.reverse.delta_edge_count, 3);

        for (from, to) in [(10, 20), (20, 30), (30, 10)] {
            let outgoing = store.edges_from(node(from));
            assert_eq!(outgoing.len(), 1);
            assert_eq!(outgoing[0].target, node(to));
        }

        let to_10 = store.edges_to(node(10));
        assert_eq!(to_10.len(), 1);
        assert_eq!(to_10[0].source, node(30));

        assert_eq!(store.forward().seq_counter(), 3);
    }

    /// A mismatch between the real and the expected edge count panics.
    #[test]
    #[should_panic(expected = "actual edge count")]
    fn test_add_edges_bulk_ordered_wrong_expected_total() {
        let store = BidirectionalEdgeStore::new();
        let edges = vec![make_delta_edge(1, 2, 0, 1)];
        store.add_edges_bulk_ordered(&[edges], 5);
    }

    /// A sequence number that goes backwards (5 then 3) panics.
    #[test]
    #[should_panic(expected = "non-monotonic seq")]
    fn test_add_edges_bulk_ordered_non_monotonic_seq() {
        let store = BidirectionalEdgeStore::new();
        let edges = vec![
            make_delta_edge(1, 2, 0, 1),
            make_delta_edge(3, 4, 5, 1),
            make_delta_edge(5, 6, 3, 1),
        ];
        store.add_edges_bulk_ordered(&[edges], 3);
    }

    /// A bulk edge whose seq is behind the store's counter panics.
    #[test]
    #[should_panic(expected = "forward store counter")]
    fn test_add_edges_bulk_ordered_stale_seq() {
        let store = BidirectionalEdgeStore::new();
        // Move the store's counter past 0 so that seq 0 below is stale.
        store.add_edge(node(1), node(2), EdgeKind::References, FileId::new(1));
        let edges = vec![make_delta_edge(10, 20, 0, 5)];
        store.add_edges_bulk_ordered(&[edges], 1);
    }

    /// After a bulk insert the counter sits past the highest seq, so later adds continue from there.
    #[test]
    fn test_add_edges_bulk_ordered_seq_counter_allows_subsequent_ops() {
        let store = BidirectionalEdgeStore::new();
        let edges = vec![make_delta_edge(1, 2, 10, 1), make_delta_edge(3, 4, 20, 1)];
        store.add_edges_bulk_ordered(&[edges], 2);
        assert_eq!(store.forward().seq_counter(), 21);

        let added = store.add_edge(node(100), node(200), EdgeKind::References, FileId::new(1));
        assert!(
            added.seq >= 21,
            "subsequent edge should have seq >= 21, got {}",
            added.seq
        );
        assert_eq!(store.stats().forward.delta_edge_count, 3);
    }

    /// Swapping in compacted CSRs empties the deltas while reverse lookups keep working.
    #[test]
    fn test_swap_csrs_and_clear_deltas() {
        use super::super::super::compaction::{Direction, build_compacted_csr, snapshot_edges};

        let store = BidirectionalEdgeStore::new();
        let (source, target) = (node(0), node(1));
        store.add_edge(source, target, plain_call(), FileId::new(0));

        // Before compaction: no CSR yet, edges live in the deltas.
        assert!(store.forward().csr().is_none());
        assert!(store.reverse().csr().is_none());
        assert!(store.stats().forward.delta_edge_count > 0);
        assert!(store.stats().reverse.delta_edge_count > 0);

        let node_count = 2;
        let fwd_snap = snapshot_edges(&store.forward(), node_count);
        let (forward_csr, _) = build_compacted_csr(&fwd_snap, Direction::Forward).unwrap();
        let rev_snap = snapshot_edges(&store.reverse(), node_count);
        let (reverse_csr, _) = build_compacted_csr(&rev_snap, Direction::Reverse).unwrap();
        store.swap_csrs_and_clear_deltas(forward_csr, reverse_csr);

        // After the swap: CSRs installed, deltas empty.
        assert!(store.forward().csr().is_some());
        assert!(store.reverse().csr().is_some());
        assert_eq!(store.stats().forward.delta_edge_count, 0);
        assert_eq!(store.stats().reverse.delta_edge_count, 0);

        let reverse_edges = store.edges_to(target);
        assert!(!reverse_edges.is_empty(), "edges_to must return callers");
        let has_caller = reverse_edges
            .iter()
            .any(|e| e.source == source && matches!(e.kind, EdgeKind::Calls { .. }));
        assert!(has_caller, "Reverse traversal must find source as caller");
    }
}
/// Serde adapter for `RwLock<EdgeStore>` fields, used via `#[serde(with = "...")]`.
///
/// The lock itself is not part of the serialized form: only the wrapped
/// `EdgeStore` is written, and a fresh lock is created around it on read.
mod rwlock_edge_store_serde {
    use parking_lot::RwLock;
    use serde::{Deserialize, Deserializer, Serialize, Serializer};

    use super::EdgeStore;

    /// Serializes the `EdgeStore` behind `value` while holding its read lock.
    pub fn serialize<S>(value: &RwLock<EdgeStore>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let guard = value.read();
        Serialize::serialize(&*guard, serializer)
    }

    /// Deserializes an `EdgeStore` and wraps it in a new, unlocked `RwLock`.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<RwLock<EdgeStore>, D::Error>
    where
        D: Deserializer<'de>,
    {
        EdgeStore::deserialize(deserializer).map(RwLock::new)
    }
}