#![allow(
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::float_cmp
)]
use super::edge::GraphEdge;
use super::edge_concurrent::ConcurrentEdgeStore;
use std::sync::Arc;
use std::thread;
#[test]
fn test_concurrent_store_add_and_get() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 100, 200, "KNOWS").expect("valid"))
.expect("add");
let outgoing = store.get_outgoing(100);
assert_eq!(outgoing.len(), 1);
assert_eq!(outgoing[0].target(), 200);
}
#[test]
fn test_concurrent_store_get_neighbors() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 100, 200, "A").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(2, 100, 300, "B").expect("valid"))
.expect("add");
let neighbors = store.get_neighbors(100);
assert_eq!(neighbors.len(), 2);
assert!(neighbors.contains(&200));
assert!(neighbors.contains(&300));
}
#[test]
fn test_concurrent_store_cascade_delete() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 100, 200, "A").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(2, 100, 300, "B").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(3, 400, 100, "C").expect("valid"))
.expect("add");
store.remove_node_edges(100);
assert!(store.get_outgoing(100).is_empty());
}
#[test]
fn test_traverse_bfs_single_hop() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 1, 2, "LINK").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(2, 1, 3, "LINK").expect("valid"))
.expect("add");
let reachable = store.traverse_bfs(1, 1);
assert!(reachable.contains(&1));
assert!(reachable.contains(&2));
assert!(reachable.contains(&3));
}
#[test]
fn test_traverse_bfs_multi_hop() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 1, 2, "NEXT").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(2, 2, 3, "NEXT").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(3, 3, 4, "NEXT").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(4, 4, 5, "NEXT").expect("valid"))
.expect("add");
let depth2 = store.traverse_bfs(1, 2);
assert!(depth2.contains(&1));
assert!(depth2.contains(&2));
assert!(depth2.contains(&3));
assert!(!depth2.contains(&4));
let depth4 = store.traverse_bfs(1, 4);
assert_eq!(depth4.len(), 5);
}
#[test]
fn test_traverse_bfs_with_cycle() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 1, 2, "NEXT").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(2, 2, 3, "NEXT").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(3, 3, 1, "NEXT").expect("valid"))
.expect("add");
let reachable = store.traverse_bfs(1, 10);
assert_eq!(reachable.len(), 3);
}
#[test]
fn test_traverse_bfs_disconnected() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 1, 2, "LINK").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(2, 100, 200, "OTHER").expect("valid")) .expect("add");
let reachable = store.traverse_bfs(1, 10);
assert!(reachable.contains(&1));
assert!(reachable.contains(&2));
assert!(!reachable.contains(&100));
assert!(!reachable.contains(&200));
}
#[test]
fn test_concurrent_reads_no_block() {
let store = Arc::new(ConcurrentEdgeStore::new());
for i in 0..100 {
store
.add_edge(GraphEdge::new(i, i, i + 1, "LINK").expect("valid"))
.expect("add");
}
let mut handles = vec![];
for _ in 0..10 {
let store_clone = Arc::clone(&store);
handles.push(thread::spawn(move || {
for i in 0..100 {
let _ = store_clone.get_outgoing(i);
}
}));
}
for h in handles {
h.join().expect("Thread panicked");
}
}
#[test]
fn test_concurrent_write_different_shards() {
let store = Arc::new(ConcurrentEdgeStore::with_shards(64).expect("test: valid shard count"));
let mut handles = vec![];
for t in 0..8 {
let store_clone = Arc::clone(&store);
handles.push(thread::spawn(move || {
for i in 0..100 {
let id = (t * 1000 + i) as u64;
let source = t as u64 * 1000 + i as u64;
let target = source + 1;
store_clone
.add_edge(GraphEdge::new(id, source, target, "LINK").expect("valid"))
.expect("add");
}
}));
}
for h in handles {
h.join().expect("Thread panicked");
}
assert_eq!(store.edge_count(), 800);
}
#[test]
fn test_concurrent_read_write_same_shard() {
let store = Arc::new(ConcurrentEdgeStore::with_shards(1).expect("test: valid shard count"));
let store_writer = Arc::clone(&store);
let store_reader = Arc::clone(&store);
let writer = thread::spawn(move || {
for i in 0..100 {
store_writer
.add_edge(GraphEdge::new(i, 1, i + 100, "LINK").expect("valid"))
.expect("add");
}
});
let reader = thread::spawn(move || {
for _ in 0..100 {
let _ = store_reader.get_outgoing(1);
}
});
writer.join().expect("Writer panicked");
reader.join().expect("Reader panicked");
}
#[test]
fn test_sharded_lock_ordering_no_deadlock() {
let store = Arc::new(ConcurrentEdgeStore::with_shards(4).expect("test: valid shard count"));
let mut handles = vec![];
for t in 0..4 {
let store_clone = Arc::clone(&store);
handles.push(thread::spawn(move || {
for i in 0..50 {
let source = (t * 100 + i) as u64;
let target = ((t + 1) % 4 * 100 + i) as u64;
store_clone
.add_edge(
GraphEdge::new((t * 1000 + i) as u64, source, target, "CROSS")
.expect("valid"),
)
.expect("add");
}
}));
}
for h in handles {
h.join().expect("Thread panicked - possible deadlock");
}
}
#[test]
fn test_get_incoming_cross_shard() {
let store = ConcurrentEdgeStore::with_shards(64).expect("test: valid shard count");
store
.add_edge(GraphEdge::new(1, 100, 200, "WROTE").expect("valid"))
.expect("add");
let outgoing = store.get_outgoing(100);
assert_eq!(outgoing.len(), 1, "get_outgoing should find the edge");
assert_eq!(outgoing[0].target(), 200);
let incoming = store.get_incoming(200);
assert_eq!(
incoming.len(),
1,
"get_incoming must find cross-shard edges"
);
assert_eq!(incoming[0].source(), 100);
}
#[test]
fn test_bidirectional_traversal_cross_shard() {
let store = ConcurrentEdgeStore::with_shards(64).expect("test: valid shard count");
store
.add_edge(GraphEdge::new(1, 0, 64, "A").expect("valid")) .expect("add");
store
.add_edge(GraphEdge::new(2, 1, 65, "B").expect("valid")) .expect("add");
store
.add_edge(GraphEdge::new(3, 2, 100, "C").expect("valid")) .expect("add");
assert_eq!(store.get_incoming(64).len(), 1);
assert_eq!(store.get_incoming(65).len(), 1);
assert_eq!(store.get_incoming(100).len(), 1);
}
#[test]
fn test_with_shards_zero_returns_error() {
let result = ConcurrentEdgeStore::with_shards(0);
assert!(result.is_err(), "with_shards(0) should return Err");
let err = result.err().expect("test: should be Err");
let err_msg = err.to_string();
assert!(
err_msg.contains("num_shards must be at least 1"),
"unexpected error: {err_msg}"
);
}
#[test]
fn test_remove_node_edges_cross_shard_cleanup() {
let store = ConcurrentEdgeStore::with_shards(64).expect("test: valid shard count");
store
.add_edge(GraphEdge::new(1, 100, 200, "WROTE").expect("valid"))
.expect("add");
assert_eq!(store.get_outgoing(100).len(), 1);
assert_eq!(store.get_incoming(200).len(), 1);
assert_eq!(store.edge_count(), 1);
store.remove_node_edges(100);
assert_eq!(
store.get_outgoing(100).len(),
0,
"Outgoing edges should be removed"
);
assert_eq!(
store.get_incoming(200).len(),
0,
"Incoming edges in other shard should also be cleaned up"
);
assert_eq!(
store.edge_count(),
0,
"Edge count should be 0 after cleanup"
);
}
#[test]
fn test_remove_node_edges_incoming_cross_shard() {
let store = ConcurrentEdgeStore::with_shards(64).expect("test: valid shard count");
store
.add_edge(GraphEdge::new(1, 200, 100, "POINTS_TO").expect("valid"))
.expect("add");
store.remove_node_edges(100);
assert_eq!(store.get_outgoing(200).len(), 0);
assert_eq!(store.get_incoming(100).len(), 0);
assert_eq!(store.edge_count(), 0);
}
#[test]
fn test_edge_count_across_shards() {
let store = ConcurrentEdgeStore::with_shards(4).expect("test: valid shard count");
for i in 0..100 {
store
.add_edge(GraphEdge::new(i, i, i + 1, "LINK").expect("valid"))
.expect("add");
}
assert_eq!(store.edge_count(), 100);
}
#[test]
fn test_concurrent_store_duplicate_id_rejected() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 100, 200, "FIRST").expect("valid"))
.expect("add first");
let result = store.add_edge(GraphEdge::new(1, 300, 400, "SECOND").expect("valid"));
assert!(result.is_err());
assert_eq!(store.edge_count(), 1);
let edges = store.get_outgoing(100);
assert_eq!(edges.len(), 1);
assert_eq!(edges[0].label(), "FIRST");
}
#[test]
fn test_concurrent_store_duplicate_id_cross_shard() {
let store = ConcurrentEdgeStore::with_shards(64).expect("test: valid shard count");
store
.add_edge(GraphEdge::new(1, 100, 200, "FIRST").expect("valid"))
.expect("add first");
let result = store.add_edge(GraphEdge::new(1, 1, 2, "SECOND").expect("valid"));
assert!(
result.is_err(),
"duplicate ID should be rejected even in different shards"
);
assert_eq!(store.edge_count(), 1);
}
#[test]
fn test_remove_node_edges_allows_id_reuse() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 100, 200, "FIRST").expect("valid"))
.expect("add first");
assert_eq!(store.edge_count(), 1);
store.remove_node_edges(100);
assert_eq!(store.edge_count(), 0);
let result = store.add_edge(GraphEdge::new(1, 300, 400, "REUSED").expect("valid"));
assert!(
result.is_ok(),
"should be able to reuse ID after remove_node_edges"
);
assert_eq!(store.edge_count(), 1);
}
#[test]
fn test_remove_edge_allows_id_reuse() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(42, 1, 2, "TEST").expect("valid"))
.expect("add");
assert_eq!(store.edge_count(), 1);
store.remove_edge(42);
assert_eq!(store.edge_count(), 0);
let result = store.add_edge(GraphEdge::new(42, 3, 4, "REUSED").expect("valid"));
assert!(
result.is_ok(),
"should be able to reuse ID after remove_edge"
);
}
#[test]
fn test_concurrent_remove_and_add_same_id() {
use std::sync::Arc;
use std::thread;
let store = Arc::new(ConcurrentEdgeStore::new());
store
.add_edge(GraphEdge::new(100, 1, 2, "INITIAL").expect("valid"))
.expect("add initial");
let mut handles = vec![];
for i in 0..10 {
let store_clone = Arc::clone(&store);
handles.push(thread::spawn(move || {
store_clone.remove_edge(100);
std::thread::yield_now();
let _ = store_clone.add_edge(
GraphEdge::new(100, (i * 10) as u64, (i * 10 + 1) as u64, "RETRY").expect("valid"),
);
}));
}
for handle in handles {
handle.join().unwrap();
}
assert!(
store.edge_count() <= 1,
"should have at most 1 edge, got {}",
store.edge_count()
);
}
#[test]
fn test_concurrent_remove_node_edges_and_add() {
use std::sync::Arc;
use std::thread;
let store = Arc::new(ConcurrentEdgeStore::new());
for i in 1..=5 {
store
.add_edge(GraphEdge::new(i, 1, i + 100, "LINK").expect("valid"))
.expect("add");
}
assert_eq!(store.edge_count(), 5);
let store1 = Arc::clone(&store);
let h1 = thread::spawn(move || {
store1.remove_node_edges(1);
});
let store2 = Arc::clone(&store);
let h2 = thread::spawn(move || {
std::thread::yield_now();
let _ = store2.add_edge(GraphEdge::new(3, 50, 51, "NEW").expect("valid"));
});
h1.join().unwrap();
h2.join().unwrap();
let count = store.edge_count();
assert!(
count <= 1,
"edge count should be <= 1 after remove_node_edges, got {}",
count
);
}
#[test]
fn test_cross_shard_add_edge_consistency() {
let store = ConcurrentEdgeStore::with_shards(64).expect("test: valid shard count");
store
.add_edge(GraphEdge::new(1, 100, 200, "CROSS").expect("valid"))
.expect("add cross-shard");
let outgoing = store.get_outgoing(100);
assert_eq!(outgoing.len(), 1, "outgoing index should have 1 edge");
assert_eq!(outgoing[0].id(), 1);
let incoming = store.get_incoming(200);
assert_eq!(incoming.len(), 1, "incoming index should have 1 edge");
assert_eq!(incoming[0].id(), 1);
assert_eq!(outgoing[0].source(), incoming[0].source());
assert_eq!(outgoing[0].target(), incoming[0].target());
}
#[test]
fn test_get_outgoing_by_label() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 100, 200, "KNOWS").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(2, 100, 300, "LIKES").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(3, 100, 400, "KNOWS").expect("valid"))
.expect("add");
let knows = store.get_outgoing_by_label(100, "KNOWS");
assert_eq!(knows.len(), 2);
let likes = store.get_outgoing_by_label(100, "LIKES");
assert_eq!(likes.len(), 1);
assert_eq!(likes[0].target(), 300);
let none = store.get_outgoing_by_label(100, "HATES");
assert!(none.is_empty());
}
#[test]
fn test_get_incoming_by_label() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 100, 500, "FOLLOWS").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(2, 200, 500, "FOLLOWS").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(3, 300, 500, "BLOCKS").expect("valid"))
.expect("add");
let follows = store.get_incoming_by_label(500, "FOLLOWS");
assert_eq!(follows.len(), 2);
let blocks = store.get_incoming_by_label(500, "BLOCKS");
assert_eq!(blocks.len(), 1);
}
#[test]
fn test_contains_edge() {
let store = ConcurrentEdgeStore::new();
assert!(!store.contains_edge(1));
store
.add_edge(GraphEdge::new(1, 100, 200, "TEST").expect("valid"))
.expect("add");
assert!(store.contains_edge(1));
assert!(!store.contains_edge(2));
store.remove_edge(1);
assert!(!store.contains_edge(1));
}
#[test]
fn test_get_edge() {
let store = ConcurrentEdgeStore::new();
assert!(store.get_edge(1).is_none());
store
.add_edge(GraphEdge::new(1, 100, 200, "TEST").expect("valid"))
.expect("add");
let edge = store.get_edge(1);
assert!(edge.is_some());
let edge = edge.unwrap();
assert_eq!(edge.id(), 1);
assert_eq!(edge.source(), 100);
assert_eq!(edge.target(), 200);
assert_eq!(edge.label(), "TEST");
assert!(store.get_edge(999).is_none());
}
#[test]
fn test_self_loop_remove_node_edges() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 100, 100, "SELF").expect("valid"))
.expect("add self-loop");
store
.add_edge(GraphEdge::new(2, 100, 200, "OTHER").expect("valid"))
.expect("add");
assert_eq!(store.edge_count(), 2);
store.remove_node_edges(100);
assert_eq!(store.edge_count(), 0);
assert!(store
.add_edge(GraphEdge::new(1, 1, 2, "REUSED").expect("valid"))
.is_ok());
assert!(store
.add_edge(GraphEdge::new(2, 3, 4, "REUSED").expect("valid"))
.is_ok());
}
#[test]
fn test_add_remove_race_registry_consistency() {
use std::sync::Arc;
use std::thread;
let store = Arc::new(ConcurrentEdgeStore::with_shards(4).expect("test: valid shard count"));
store
.add_edge(GraphEdge::new(1, 100, 200, "INITIAL").expect("valid"))
.expect("initial add");
let mut handles = vec![];
for i in 0..10 {
let store_clone = Arc::clone(&store);
handles.push(thread::spawn(move || {
store_clone.remove_edge(1);
let _ =
store_clone.add_edge(GraphEdge::new(1, 100 + i, 200 + i, "RETRY").expect("valid"));
}));
}
for handle in handles {
handle.join().expect("thread join");
}
let in_registry = store.contains_edge(1);
let in_shard = store.get_edge(1).is_some();
assert_eq!(
in_registry, in_shard,
"Registry and shard must be consistent: registry={}, shard={}",
in_registry, in_shard
);
}
#[test]
fn test_lock_ordering_no_deadlock_add_remove() {
use std::sync::Arc;
use std::thread;
let store = Arc::new(ConcurrentEdgeStore::with_shards(4).expect("test: valid shard count"));
let mut handles = vec![];
for i in 0..20 {
let store_clone = Arc::clone(&store);
handles.push(thread::spawn(move || {
let edge_id = (i % 5) as u64; for _ in 0..50 {
let _ = store_clone.add_edge(
GraphEdge::new(edge_id, i as u64 * 100, i as u64 * 100 + 1, "CONTEND")
.expect("valid"),
);
store_clone.remove_edge(edge_id);
}
}));
}
for handle in handles {
handle.join().expect("thread should not deadlock");
}
}
#[test]
fn test_default_shard_count_is_256() {
let store = ConcurrentEdgeStore::new();
for i in 0..256u64 {
store
.add_edge(GraphEdge::new(i, i, i + 1000, "TEST").expect("valid"))
.expect("add");
}
assert_eq!(store.edge_count(), 256);
store
.add_edge(GraphEdge::new(1000, 256, 1256, "TEST").expect("valid"))
.expect("add");
assert_eq!(store.edge_count(), 257);
}
#[test]
fn test_edge_distribution_across_256_shards() {
let store = ConcurrentEdgeStore::new();
for i in 0..2560u64 {
store
.add_edge(GraphEdge::new(i, i, i + 10000, "DIST").expect("valid"))
.expect("add");
}
assert_eq!(store.edge_count(), 2560);
}
#[test]
fn test_shard_selection_deterministic() {
let store1 = ConcurrentEdgeStore::new();
let store2 = ConcurrentEdgeStore::new();
store1
.add_edge(GraphEdge::new(1, 12345, 67890, "DET").expect("valid"))
.expect("add");
store2
.add_edge(GraphEdge::new(1, 12345, 67890, "DET").expect("valid"))
.expect("add");
let out1 = store1.get_outgoing(12345);
let out2 = store2.get_outgoing(12345);
assert_eq!(out1.len(), out2.len());
assert_eq!(out1[0].target(), out2[0].target());
}
#[test]
fn test_concurrent_insert_16_threads_256_shards() {
let store = Arc::new(ConcurrentEdgeStore::new());
let mut handles = vec![];
for t in 0..16 {
let store_clone = Arc::clone(&store);
handles.push(thread::spawn(move || {
for i in 0..1000 {
let id = (t * 10000 + i) as u64;
let source = (t * 10000 + i) as u64;
let target = source + 1;
store_clone
.add_edge(GraphEdge::new(id, source, target, "CONCURRENT").expect("valid"))
.expect("add");
}
}));
}
for h in handles {
h.join()
.expect("Thread panicked - possible deadlock with 256 shards");
}
assert_eq!(store.edge_count(), 16000);
}
#[test]
fn test_concurrent_insertions_all_edges_retrievable() {
let store = Arc::new(ConcurrentEdgeStore::with_shards(16).expect("test: valid shard count"));
let threads = 8;
let edges_per_thread = 200;
let mut handles = vec![];
for t in 0..threads {
let s = Arc::clone(&store);
handles.push(thread::spawn(move || {
for i in 0..edges_per_thread {
let id = (t * edges_per_thread + i) as u64;
let source = id * 7; let target = source + 1;
s.add_edge(GraphEdge::new(id, source, target, "LINK").expect("valid"))
.expect("add");
}
}));
}
for h in handles {
h.join().expect("writer panicked");
}
let expected_total = threads * edges_per_thread;
assert_eq!(store.edge_count(), expected_total);
for t in 0..threads {
for i in 0..edges_per_thread {
let id = (t * edges_per_thread + i) as u64;
let edge = store
.get_edge(id)
.unwrap_or_else(|| panic!("edge {id} missing after concurrent insert"));
assert_eq!(edge.source(), id * 7);
assert_eq!(edge.target(), id * 7 + 1);
}
}
}
#[test]
fn test_concurrent_read_write_monotonic_count() {
use std::sync::atomic::{AtomicBool, Ordering};
let store = Arc::new(ConcurrentEdgeStore::with_shards(16).expect("test: valid shard count"));
let done = Arc::new(AtomicBool::new(false));
let store_w = Arc::clone(&store);
let done_w = Arc::clone(&done);
let writer = thread::spawn(move || {
for i in 0..500u64 {
store_w
.add_edge(GraphEdge::new(i, i * 3, i * 3 + 1, "W").expect("valid"))
.expect("add");
}
done_w.store(true, Ordering::Release);
});
let store_r = Arc::clone(&store);
let done_r = Arc::clone(&done);
let reader = thread::spawn(move || {
let mut max_seen = 0usize;
loop {
let count = store_r.edge_count();
assert!(
count >= max_seen,
"edge_count went backwards: was {max_seen}, now {count}"
);
max_seen = count;
for id in 0..max_seen as u64 {
let _ = store_r.get_edge(id);
}
if done_r.load(Ordering::Acquire) {
break;
}
thread::yield_now();
}
});
writer.join().expect("writer panicked");
reader.join().expect("reader panicked");
assert_eq!(store.edge_count(), 500);
}
#[test]
fn test_shard_index_deterministic_and_consistent() {
for num_shards in [1, 4, 16, 64, 256] {
let store = ConcurrentEdgeStore::with_shards(num_shards).expect("test: valid shard count");
for node_id in [0u64, 1, 255, 256, 1000, u64::MAX] {
let expected = (node_id as usize) % num_shards;
let actual = store.shard_index(node_id);
assert_eq!(
actual, expected,
"shard_index({node_id}) with {num_shards} shards: expected {expected}, got {actual}"
);
assert_eq!(
store.shard_index(node_id),
actual,
"shard_index must be deterministic"
);
}
}
}
#[test]
fn test_same_source_always_same_shard() {
let store = ConcurrentEdgeStore::with_shards(8).expect("test: valid shard count");
let source_id = 42u64;
let expected_shard = (source_id as usize) % 8;
for i in 0..20u64 {
store
.add_edge(GraphEdge::new(i, source_id, i + 1000, "REL").expect("valid"))
.expect("add");
}
let outgoing = store.get_outgoing(source_id);
assert_eq!(outgoing.len(), 20);
assert_eq!(store.shard_index(source_id), expected_shard);
}
#[test]
fn test_snapshot_absent_by_default() {
let store = ConcurrentEdgeStore::new();
assert!(!store.has_read_snapshot());
}
#[test]
fn test_build_snapshot_populates_index() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 10, 20, "KNOWS").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(2, 10, 30, "LIKES").expect("valid"))
.expect("add");
store.build_read_snapshot();
assert!(store.has_read_snapshot());
}
#[test]
fn test_snapshot_get_neighbors_matches_shard_path() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 10, 20, "A").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(2, 10, 30, "B").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(3, 10, 40, "C").expect("valid"))
.expect("add");
let mut before: Vec<u64> = store.get_neighbors(10);
before.sort_unstable();
store.build_read_snapshot();
let mut after: Vec<u64> = store.get_neighbors(10);
after.sort_unstable();
assert_eq!(before, after);
assert_eq!(after, vec![20, 30, 40]);
}
#[test]
fn test_with_neighbors_closure_receives_correct_data() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 100, 200, "R").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(2, 100, 300, "R").expect("valid"))
.expect("add");
store.build_read_snapshot();
let mut collected = Vec::new();
store.with_neighbors(100, |neighbors| {
collected.extend_from_slice(neighbors);
});
collected.sort_unstable();
assert_eq!(collected, vec![200, 300]);
}
#[test]
fn test_with_neighbors_fallback_without_snapshot() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 5, 10, "R").expect("valid"))
.expect("add");
let mut result = Vec::new();
store.with_neighbors(5, |neighbors| {
result.extend_from_slice(neighbors);
});
assert_eq!(result, vec![10]);
}
#[test]
fn test_add_edge_invalidates_snapshot() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 10, 20, "R").expect("valid"))
.expect("add");
store.build_read_snapshot();
assert!(store.has_read_snapshot());
store
.add_edge(GraphEdge::new(2, 10, 30, "R").expect("valid"))
.expect("add");
assert!(!store.has_read_snapshot());
}
#[test]
fn test_remove_edge_invalidates_snapshot() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 10, 20, "R").expect("valid"))
.expect("add");
store.build_read_snapshot();
assert!(store.has_read_snapshot());
store.remove_edge(1);
assert!(!store.has_read_snapshot());
}
#[test]
fn test_remove_node_edges_invalidates_snapshot() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 10, 20, "R").expect("valid"))
.expect("add");
store.build_read_snapshot();
assert!(store.has_read_snapshot());
store.remove_node_edges(10);
assert!(!store.has_read_snapshot());
}
#[test]
fn test_outgoing_degree_uses_snapshot() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 10, 20, "R").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(2, 10, 30, "R").expect("valid"))
.expect("add");
assert_eq!(store.outgoing_degree(10), 2);
store.build_read_snapshot();
assert_eq!(store.outgoing_degree(10), 2);
}
#[test]
fn test_traverse_bfs_snapshot_matches_shard_path() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 1, 2, "R").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(2, 2, 3, "R").expect("valid"))
.expect("add");
store
.add_edge(GraphEdge::new(3, 3, 4, "R").expect("valid"))
.expect("add");
let mut without_snapshot: Vec<u64> = store.traverse_bfs(1, 10);
without_snapshot.sort_unstable();
store.build_read_snapshot();
let mut with_snapshot: Vec<u64> = store.traverse_bfs(1, 10);
with_snapshot.sort_unstable();
assert_eq!(without_snapshot, with_snapshot);
assert!(with_snapshot.contains(&1));
assert!(with_snapshot.contains(&2));
assert!(with_snapshot.contains(&3));
assert!(with_snapshot.contains(&4));
}
#[test]
fn test_rebuild_snapshot_after_mutation() {
let store = ConcurrentEdgeStore::new();
store
.add_edge(GraphEdge::new(1, 10, 20, "R").expect("valid"))
.expect("add");
store.build_read_snapshot();
store
.add_edge(GraphEdge::new(2, 10, 30, "R").expect("valid"))
.expect("add");
assert!(!store.has_read_snapshot());
store.build_read_snapshot();
assert!(store.has_read_snapshot());
let mut neighbors: Vec<u64> = store.get_neighbors(10);
neighbors.sort_unstable();
assert_eq!(neighbors, vec![20, 30]);
}
#[test]
fn test_from_edge_store_builds_snapshot() {
use super::edge::EdgeStore;
let mut es = EdgeStore::new();
es.add_edge(GraphEdge::new(1, 10, 20, "R").expect("valid"))
.expect("add");
es.add_edge(GraphEdge::new(2, 10, 30, "R").expect("valid"))
.expect("add");
let store = ConcurrentEdgeStore::from_edge_store(&es);
assert!(store.has_read_snapshot());
let mut neighbors: Vec<u64> = store.get_neighbors(10);
neighbors.sort_unstable();
assert_eq!(neighbors, vec![20, 30]);
}