use crate::codec::{BitPackedInts, DeltaBitPacked};
use grafeo_common::types::{EdgeId, NodeId};
use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
use parking_lot::RwLock;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Default number of (destination, edge id) entries per hot chunk.
const DEFAULT_CHUNK_CAPACITY: usize = 64;
/// Buffered delta inserts that trigger compaction into hot chunks.
const DELTA_COMPACTION_THRESHOLD: usize = 64;
/// Hot chunks in excess of this count are compressed into cold storage.
const COLD_COMPRESSION_THRESHOLD: usize = 4;
/// A fixed-capacity, uncompressed ("hot") run of out-edges for one source node.
#[derive(Debug, Clone)]
struct AdjacencyChunk {
    // Parallel arrays: destinations[i] pairs with edge_ids[i].
    destinations: Vec<NodeId>,
    edge_ids: Vec<EdgeId>,
    // Maximum entry count; push() refuses additions once reached.
    capacity: usize,
}
impl AdjacencyChunk {
    /// Creates an empty chunk that will accept at most `capacity` entries.
    fn new(capacity: usize) -> Self {
        Self {
            destinations: Vec::with_capacity(capacity),
            edge_ids: Vec::with_capacity(capacity),
            capacity,
        }
    }

    /// Number of entries currently stored.
    fn len(&self) -> usize {
        self.destinations.len()
    }

    /// Whether the chunk has reached its configured capacity.
    fn is_full(&self) -> bool {
        self.len() >= self.capacity
    }

    /// Appends one entry; returns `false` (leaving the chunk untouched) when full.
    fn push(&mut self, dst: NodeId, edge_id: EdgeId) -> bool {
        if self.is_full() {
            false
        } else {
            self.destinations.push(dst);
            self.edge_ids.push(edge_id);
            true
        }
    }

    /// Iterates the stored `(destination, edge id)` pairs in insertion order.
    fn iter(&self) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
        self.destinations
            .iter()
            .zip(self.edge_ids.iter())
            .map(|(&dst, &edge)| (dst, edge))
    }

    /// Sorts the entries by destination and encodes them into a cold chunk.
    fn compress(&self) -> CompressedAdjacencyChunk {
        let mut pairs: Vec<(u64, u64)> = self
            .iter()
            .map(|(dst, edge)| (dst.as_u64(), edge.as_u64()))
            .collect();
        // Stable sort: edges sharing a destination keep their insertion order.
        pairs.sort_by_key(|&(dst, _)| dst);
        let dsts: Vec<u64> = pairs.iter().map(|&(d, _)| d).collect();
        let edges: Vec<u64> = pairs.iter().map(|&(_, e)| e).collect();
        CompressedAdjacencyChunk {
            destinations: DeltaBitPacked::encode(&dsts),
            edge_ids: BitPackedInts::pack(&edges),
            count: pairs.len(),
            // 0 is the documented sentinel for an empty chunk.
            max_destination: dsts.last().copied().unwrap_or(0),
        }
    }
}
/// A delta/bit-packed ("cold") chunk whose destinations are stored sorted.
#[derive(Debug, Clone)]
struct CompressedAdjacencyChunk {
    // Destinations, delta-encoded from their minimum; decode() yields sorted u64s.
    destinations: DeltaBitPacked,
    // Edge ids, bit-packed in the same order as the sorted destinations.
    edge_ids: BitPackedInts,
    // Number of (destination, edge id) entries.
    count: usize,
    // Largest destination in the chunk (0 when empty); used by the skip index.
    max_destination: u64,
}
impl CompressedAdjacencyChunk {
    /// Number of entries stored in this chunk.
    fn len(&self) -> usize {
        self.count
    }

    #[cfg(test)]
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Decodes and yields all `(destination, edge id)` pairs, sorted by destination.
    fn iter(&self) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
        let decoded_dsts = self.destinations.decode();
        let decoded_edges = self.edge_ids.unpack();
        decoded_dsts
            .into_iter()
            .zip(decoded_edges)
            .map(|(dst, edge)| (NodeId::new(dst), EdgeId::new(edge)))
    }

    /// Approximate heap footprint of the encoded payload, in bytes.
    fn memory_size(&self) -> usize {
        // 8 bytes of fixed overhead for the delta base, plus the packed payload.
        let dst_bytes = match self.destinations.to_bytes() {
            Some(bytes) => 8 + bytes.len(),
            None => 8,
        };
        dst_bytes + self.edge_ids.data().len() * 8
    }

    /// Smallest destination in the chunk (the delta-encoding base).
    #[must_use]
    fn min_destination(&self) -> u64 {
        self.destinations.base()
    }

    /// Returns the entries whose destination lies in `[min, max]` (inclusive).
    fn destinations_in_range(&self, min: u64, max: u64) -> Vec<(NodeId, EdgeId)> {
        // Cheap reject when the requested window misses this chunk entirely.
        if min > self.max_destination || max < self.min_destination() {
            return Vec::new();
        }
        let dsts = self.destinations.decode();
        let edges = self.edge_ids.unpack();
        // Destinations decode in sorted order, so binary-search the window edges.
        let lo = dsts.partition_point(|&d| d < min);
        let hi = dsts.partition_point(|&d| d <= max);
        dsts[lo..hi]
            .iter()
            .zip(&edges[lo..hi])
            .map(|(&d, &e)| (NodeId::new(d), EdgeId::new(e)))
            .collect()
    }

    #[cfg(test)]
    fn compression_ratio(&self) -> f64 {
        if self.count == 0 {
            return 1.0;
        }
        // 16 bytes = one u64 destination + one u64 edge id, uncompressed.
        let uncompressed = self.count * 16;
        let compressed = self.memory_size();
        if compressed == 0 {
            f64::INFINITY
        } else {
            uncompressed as f64 / compressed as f64
        }
    }
}
/// Skip-index entry recording the destination range covered by one cold
/// chunk, used to prune chunks during membership and range lookups.
#[derive(Debug, Clone, Copy)]
struct SkipIndexEntry {
    min_destination: u64,
    max_destination: u64,
    // Position of the chunk in `AdjacencyList::cold_chunks`.
    chunk_index: usize,
}
/// Out-edges of a single source node, stored in three tiers: compressed
/// cold chunks, uncompressed hot chunks, and an unsorted delta buffer
/// holding the newest inserts.
#[derive(Debug)]
struct AdjacencyList {
    hot_chunks: Vec<AdjacencyChunk>,
    cold_chunks: Vec<CompressedAdjacencyChunk>,
    // Newest inserts, drained into hot chunks by compact().
    delta_inserts: Vec<(NodeId, EdgeId)>,
    // Tombstoned edge ids; filtered out of every read path.
    deleted: FxHashSet<EdgeId>,
    // Destination ranges of cold chunks, kept sorted by min_destination.
    skip_index: Vec<SkipIndexEntry>,
}
impl AdjacencyList {
    /// Creates an empty adjacency list.
    fn new() -> Self {
        Self {
            hot_chunks: Vec::new(),
            cold_chunks: Vec::new(),
            delta_inserts: Vec::new(),
            deleted: FxHashSet::default(),
            skip_index: Vec::new(),
        }
    }

    /// Records one out-edge. Fast path appends to the newest hot chunk;
    /// otherwise the edge is buffered in `delta_inserts`, which is drained
    /// into hot chunks once it reaches `DELTA_COMPACTION_THRESHOLD`.
    fn add_edge(&mut self, dst: NodeId, edge_id: EdgeId, chunk_capacity: usize) {
        if let Some(last) = self.hot_chunks.last_mut()
            && last.push(dst, edge_id)
        {
            return;
        }
        self.delta_inserts.push((dst, edge_id));
        if self.delta_inserts.len() >= DELTA_COMPACTION_THRESHOLD {
            self.compact(chunk_capacity);
        }
    }

    /// Tombstones an edge id; all read paths filter it out. The edge is not
    /// verified to actually exist in this list.
    fn mark_deleted(&mut self, edge_id: EdgeId) {
        self.deleted.insert(edge_id);
    }

    /// Drains `delta_inserts` into hot chunks (topping up the last hot chunk
    /// if it still has room), then migrates surplus hot chunks to cold storage.
    fn compact(&mut self, chunk_capacity: usize) {
        if self.delta_inserts.is_empty() {
            return;
        }
        let last_has_room = self.hot_chunks.last().is_some_and(|c| !c.is_full());
        let mut current_chunk = if last_has_room {
            self.hot_chunks
                .pop()
                .expect("hot_chunks is non-empty: is_some_and() succeeded on previous line")
        } else {
            AdjacencyChunk::new(chunk_capacity)
        };
        for (dst, edge_id) in self.delta_inserts.drain(..) {
            if !current_chunk.push(dst, edge_id) {
                // Chunk full: seal it and start a fresh one for this entry.
                self.hot_chunks.push(current_chunk);
                current_chunk = AdjacencyChunk::new(chunk_capacity);
                current_chunk.push(dst, edge_id);
            }
        }
        if current_chunk.len() > 0 {
            self.hot_chunks.push(current_chunk);
        }
        self.maybe_compress_to_cold();
    }

    /// While more than `COLD_COMPRESSION_THRESHOLD` hot chunks exist,
    /// compresses the oldest into cold storage and records its destination
    /// range, then re-sorts the skip index by minimum destination.
    fn maybe_compress_to_cold(&mut self) {
        while self.hot_chunks.len() > COLD_COMPRESSION_THRESHOLD {
            let oldest = self.hot_chunks.remove(0);
            if oldest.len() == 0 {
                continue;
            }
            let compressed = oldest.compress();
            let chunk_index = self.cold_chunks.len();
            self.skip_index.push(SkipIndexEntry {
                min_destination: compressed.min_destination(),
                max_destination: compressed.max_destination,
                chunk_index,
            });
            self.cold_chunks.push(compressed);
        }
        self.skip_index.sort_unstable_by_key(|e| e.min_destination);
    }

    /// Compresses every remaining hot chunk into cold storage.
    /// Note: does not drain `delta_inserts`; call `compact` first.
    fn freeze_all(&mut self) {
        for chunk in self.hot_chunks.drain(..) {
            if chunk.len() > 0 {
                let compressed = chunk.compress();
                let chunk_index = self.cold_chunks.len();
                self.skip_index.push(SkipIndexEntry {
                    min_destination: compressed.min_destination(),
                    max_destination: compressed.max_destination,
                    chunk_index,
                });
                self.cold_chunks.push(compressed);
            }
        }
        self.skip_index.sort_unstable_by_key(|e| e.min_destination);
    }

    /// All live `(destination, edge id)` pairs: cold tier, then hot, then delta.
    fn iter(&self) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
        let deleted = &self.deleted;
        let cold_iter = self.cold_chunks.iter().flat_map(|c| c.iter());
        let hot_iter = self.hot_chunks.iter().flat_map(|c| c.iter());
        let delta_iter = self.delta_inserts.iter().copied();
        cold_iter
            .chain(hot_iter)
            .chain(delta_iter)
            .filter(move |(_, edge_id)| !deleted.contains(edge_id))
    }

    /// Whether a live edge to `destination` exists in any tier.
    fn contains(&self, destination: NodeId) -> bool {
        let dst_raw = destination.as_u64();
        let deleted = &self.deleted;
        // Cold tier: prune chunks via the skip index, then binary-search the
        // decoded (sorted) destinations.
        for entry in &self.skip_index {
            if dst_raw < entry.min_destination || dst_raw > entry.max_destination {
                continue;
            }
            let chunk = &self.cold_chunks[entry.chunk_index];
            let decoded_dsts = chunk.destinations.decode();
            let decoded_edges = chunk.edge_ids.unpack();
            if let Ok(pos) = decoded_dsts.binary_search(&dst_raw) {
                // binary_search may land on any duplicate; rewind to the first,
                // then scan the whole run looking for a non-deleted edge.
                let mut i = pos;
                while i > 0 && decoded_dsts[i - 1] == dst_raw {
                    i -= 1;
                }
                for j in i..decoded_dsts.len() {
                    if decoded_dsts[j] != dst_raw {
                        break;
                    }
                    if !deleted.contains(&EdgeId::new(decoded_edges[j])) {
                        return true;
                    }
                }
            }
        }
        // Hot and delta tiers: linear scan.
        for chunk in &self.hot_chunks {
            for (dst, edge_id) in chunk.iter() {
                if dst == destination && !deleted.contains(&edge_id) {
                    return true;
                }
            }
        }
        for &(dst, edge_id) in &self.delta_inserts {
            if dst == destination && !deleted.contains(&edge_id) {
                return true;
            }
        }
        false
    }

    /// Live edges whose destination lies in `[min, max]` (inclusive).
    fn destinations_in_range(&self, min: NodeId, max: NodeId) -> Vec<(NodeId, EdgeId)> {
        let min_raw = min.as_u64();
        let max_raw = max.as_u64();
        let deleted = &self.deleted;
        let mut results = Vec::new();
        // Cold tier first, pruning chunks whose range misses the window.
        for entry in &self.skip_index {
            if entry.max_destination < min_raw || entry.min_destination > max_raw {
                continue;
            }
            let chunk = &self.cold_chunks[entry.chunk_index];
            results.extend(
                chunk
                    .destinations_in_range(min_raw, max_raw)
                    .into_iter()
                    .filter(|(_, eid)| !deleted.contains(eid)),
            );
        }
        for chunk in &self.hot_chunks {
            for (dst, edge_id) in chunk.iter() {
                if dst.as_u64() >= min_raw && dst.as_u64() <= max_raw && !deleted.contains(&edge_id)
                {
                    results.push((dst, edge_id));
                }
            }
        }
        for &(dst, edge_id) in &self.delta_inserts {
            if dst.as_u64() >= min_raw && dst.as_u64() <= max_raw && !deleted.contains(&edge_id) {
                results.push((dst, edge_id));
            }
        }
        results
    }

    /// Live destination nodes (duplicates possible for parallel edges).
    fn neighbors(&self) -> impl Iterator<Item = NodeId> + '_ {
        self.iter().map(|(dst, _)| dst)
    }

    /// Number of live edges.
    fn degree(&self) -> usize {
        self.iter().count()
    }

    /// Entries in the uncompressed tiers (hot chunks + delta buffer).
    fn hot_count(&self) -> usize {
        self.hot_chunks.iter().map(|c| c.len()).sum::<usize>() + self.delta_inserts.len()
    }

    /// Entries in the compressed tier.
    fn cold_count(&self) -> usize {
        self.cold_chunks.iter().map(|c| c.len()).sum()
    }

    #[cfg(test)]
    fn memory_size(&self) -> usize {
        // 16 bytes per uncompressed entry or tombstone (two u64-sized values).
        let hot_size = self.hot_chunks.iter().map(|c| c.len() * 16).sum::<usize>();
        let cold_size = self
            .cold_chunks
            .iter()
            .map(|c| c.memory_size())
            .sum::<usize>();
        let delta_size = self.delta_inserts.len() * 16;
        let deleted_size = self.deleted.len() * 16;
        hot_size + cold_size + delta_size + deleted_size
    }
}
/// Concurrent chunked adjacency index: per-source edge lists behind one
/// `RwLock`, with global edge/tombstone counters kept in atomics.
pub struct ChunkedAdjacency {
    lists: RwLock<FxHashMap<NodeId, AdjacencyList>>,
    // Capacity used for newly allocated hot chunks.
    chunk_capacity: usize,
    // Total edges ever added (tombstoned edges included).
    edge_count: AtomicUsize,
    // Edges marked deleted; decremented only by unmark_deleted/clear.
    deleted_count: AtomicUsize,
}
impl ChunkedAdjacency {
    /// Creates an index using `DEFAULT_CHUNK_CAPACITY` for hot chunks.
    #[must_use]
    pub fn new() -> Self {
        Self::with_chunk_capacity(DEFAULT_CHUNK_CAPACITY)
    }

    /// Creates an index whose hot chunks each hold `capacity` entries.
    #[must_use]
    pub fn with_chunk_capacity(capacity: usize) -> Self {
        Self {
            lists: RwLock::new(FxHashMap::default()),
            chunk_capacity: capacity,
            edge_count: AtomicUsize::new(0),
            deleted_count: AtomicUsize::new(0),
        }
    }

    /// Adds one edge `src -> dst` carrying `edge_id`.
    pub fn add_edge(&self, src: NodeId, dst: NodeId, edge_id: EdgeId) {
        let mut lists = self.lists.write();
        lists
            .entry(src)
            .or_insert_with(AdjacencyList::new)
            .add_edge(dst, edge_id, self.chunk_capacity);
        self.edge_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds many `(src, dst, edge_id)` edges under a single write lock.
    pub fn batch_add_edges(&self, edges: &[(NodeId, NodeId, EdgeId)]) {
        if edges.is_empty() {
            return;
        }
        let mut lists = self.lists.write();
        for &(src, dst, edge_id) in edges {
            lists
                .entry(src)
                .or_insert_with(AdjacencyList::new)
                .add_edge(dst, edge_id, self.chunk_capacity);
        }
        self.edge_count.fetch_add(edges.len(), Ordering::Relaxed);
    }

    /// Marks an edge (outgoing from `src`) as deleted.
    ///
    /// The deleted counter is bumped only when the edge was not already
    /// tombstoned, so repeated deletions of the same edge cannot inflate the
    /// counter and underflow [`Self::active_edge_count`]. The edge id is not
    /// verified to exist in `src`'s list.
    pub fn mark_deleted(&self, src: NodeId, edge_id: EdgeId) {
        let mut lists = self.lists.write();
        if let Some(list) = lists.get_mut(&src) {
            // Fix: previously this counted every call, double-counting
            // already-deleted edges.
            let newly_marked = !list.deleted.contains(&edge_id);
            list.mark_deleted(edge_id);
            if newly_marked {
                self.deleted_count.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Marks many `(src, edge_id)` edges deleted under a single write lock.
    /// Like [`Self::mark_deleted`], already-tombstoned edges are not
    /// double-counted.
    pub fn batch_mark_deleted(&self, edges: &[(NodeId, EdgeId)]) {
        let mut lists = self.lists.write();
        let mut count = 0usize;
        for &(src, edge_id) in edges {
            if let Some(list) = lists.get_mut(&src) {
                // Count only tombstones that are actually new.
                if !list.deleted.contains(&edge_id) {
                    count += 1;
                }
                list.mark_deleted(edge_id);
            }
        }
        if count > 0 {
            self.deleted_count.fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Removes a tombstone, restoring the edge; the counter is decremented
    /// only when the tombstone actually existed.
    pub fn unmark_deleted(&self, src: NodeId, edge_id: EdgeId) {
        let mut lists = self.lists.write();
        if let Some(list) = lists.get_mut(&src)
            && list.deleted.remove(&edge_id)
        {
            self.deleted_count.fetch_sub(1, Ordering::Relaxed);
        }
    }

    /// Destinations reachable from `src` (deleted edges filtered out).
    #[must_use]
    pub fn neighbors(&self, src: NodeId) -> Vec<NodeId> {
        let lists = self.lists.read();
        lists
            .get(&src)
            .map(|list| list.neighbors().collect())
            .unwrap_or_default()
    }

    /// Live `(destination, edge id)` pairs outgoing from `src`.
    #[must_use]
    pub fn edges_from(&self, src: NodeId) -> Vec<(NodeId, EdgeId)> {
        let lists = self.lists.read();
        lists
            .get(&src)
            .map(|list| list.iter().collect())
            .unwrap_or_default()
    }

    /// Number of live out-edges of `src`.
    pub fn out_degree(&self, src: NodeId) -> usize {
        let lists = self.lists.read();
        lists.get(&src).map_or(0, |list| list.degree())
    }

    /// Number of live edges stored under `node`.
    ///
    /// NOTE(review): identical to [`Self::out_degree`] on this structure; it
    /// is a true in-degree only when this instance stores reversed
    /// (incoming) adjacency, as the bidirectional tests do — confirm callers.
    pub fn in_degree(&self, node: NodeId) -> usize {
        let lists = self.lists.read();
        lists.get(&node).map_or(0, |list| list.degree())
    }

    /// Compacts every list's delta buffer into hot/cold chunks.
    pub fn compact(&self) {
        let mut lists = self.lists.write();
        for list in lists.values_mut() {
            list.compact(self.chunk_capacity);
        }
    }

    /// Compacts only the lists whose delta buffer reached the threshold.
    pub fn compact_if_needed(&self) {
        let mut lists = self.lists.write();
        for list in lists.values_mut() {
            if list.delta_inserts.len() >= DELTA_COMPACTION_THRESHOLD {
                list.compact(self.chunk_capacity);
            }
        }
    }

    /// Total edges ever added, including deleted ones.
    pub fn total_edge_count(&self) -> usize {
        self.edge_count.load(Ordering::Relaxed)
    }

    /// Edges added minus edges marked deleted. Saturates at zero so a
    /// counter race cannot cause an underflow panic in debug builds.
    pub fn active_edge_count(&self) -> usize {
        self.edge_count
            .load(Ordering::Relaxed)
            .saturating_sub(self.deleted_count.load(Ordering::Relaxed))
    }

    /// Number of source nodes that have an edge list.
    pub fn node_count(&self) -> usize {
        self.lists.read().len()
    }

    /// Whether a live edge `src -> dst` exists.
    #[must_use]
    pub fn contains_edge(&self, src: NodeId, dst: NodeId) -> bool {
        let lists = self.lists.read();
        lists.get(&src).is_some_and(|list| list.contains(dst))
    }

    /// Live edges from `src` whose destination lies in `[min_dst, max_dst]`
    /// (inclusive).
    #[must_use]
    pub fn edges_in_range(
        &self,
        src: NodeId,
        min_dst: NodeId,
        max_dst: NodeId,
    ) -> Vec<(NodeId, EdgeId)> {
        let lists = self.lists.read();
        lists
            .get(&src)
            .map(|list| list.destinations_in_range(min_dst, max_dst))
            .unwrap_or_default()
    }

    /// Drops all edges and resets both counters.
    pub fn clear(&self) {
        let mut lists = self.lists.write();
        lists.clear();
        self.edge_count.store(0, Ordering::Relaxed);
        self.deleted_count.store(0, Ordering::Relaxed);
    }

    /// Aggregates entry counts and approximate byte sizes across all lists.
    #[must_use]
    pub fn memory_stats(&self) -> AdjacencyMemoryStats {
        let lists = self.lists.read();
        let mut hot_entries = 0usize;
        let mut cold_entries = 0usize;
        let mut hot_bytes = 0usize;
        let mut cold_bytes = 0usize;
        for list in lists.values() {
            hot_entries += list.hot_count();
            cold_entries += list.cold_count();
            // 16 bytes per uncompressed entry: u64 destination + u64 edge id.
            hot_bytes += list.hot_count() * 16;
            for cold_chunk in &list.cold_chunks {
                cold_bytes += cold_chunk.memory_size();
            }
        }
        AdjacencyMemoryStats {
            hot_entries,
            cold_entries,
            hot_bytes,
            cold_bytes,
            node_count: lists.len(),
        }
    }

    /// Rough estimate of heap bytes held by the whole index. Uses capacities
    /// rather than lengths, so reserved-but-unused space is included.
    #[must_use]
    pub fn heap_memory_bytes(&self) -> usize {
        let lists = self.lists.read();
        let map_overhead = lists.capacity()
            * (std::mem::size_of::<NodeId>() + std::mem::size_of::<AdjacencyList>() + 1);
        let mut list_bytes = 0usize;
        for list in lists.values() {
            list_bytes += list.hot_chunks.capacity() * std::mem::size_of::<AdjacencyChunk>();
            for chunk in &list.hot_chunks {
                list_bytes += chunk.destinations.capacity() * std::mem::size_of::<NodeId>();
                list_bytes += chunk.edge_ids.capacity() * std::mem::size_of::<EdgeId>();
            }
            list_bytes +=
                list.cold_chunks.capacity() * std::mem::size_of::<CompressedAdjacencyChunk>();
            for cold in &list.cold_chunks {
                list_bytes += cold.memory_size();
            }
            list_bytes += list.delta_inserts.capacity() * 16;
            list_bytes += list.deleted.capacity() * (std::mem::size_of::<EdgeId>() + 1);
            list_bytes += list.skip_index.capacity() * std::mem::size_of::<SkipIndexEntry>();
        }
        map_overhead + list_bytes
    }

    /// Compresses every hot chunk of every list into cold storage.
    pub fn freeze_all(&self) {
        let mut lists = self.lists.write();
        for list in lists.values_mut() {
            list.freeze_all();
        }
    }
}
/// Aggregate memory statistics for a `ChunkedAdjacency`, split into the
/// uncompressed (hot) and compressed (cold) tiers.
#[derive(Debug, Clone)]
pub struct AdjacencyMemoryStats {
    pub hot_entries: usize,
    pub cold_entries: usize,
    pub hot_bytes: usize,
    pub cold_bytes: usize,
    pub node_count: usize,
}

impl AdjacencyMemoryStats {
    /// Entries across both tiers combined.
    #[must_use]
    pub fn total_entries(&self) -> usize {
        self.hot_entries + self.cold_entries
    }

    /// Bytes across both tiers combined.
    #[must_use]
    pub fn total_bytes(&self) -> usize {
        self.hot_bytes + self.cold_bytes
    }

    /// Uncompressed-size / compressed-size ratio of the cold tier
    /// (16 bytes per entry uncompressed); 1.0 when the tier is empty.
    #[must_use]
    pub fn cold_compression_ratio(&self) -> f64 {
        match (self.cold_entries, self.cold_bytes) {
            (0, _) | (_, 0) => 1.0,
            (entries, bytes) => (entries * 16) as f64 / bytes as f64,
        }
    }

    /// Compression ratio across both tiers; 1.0 when the index is empty.
    #[must_use]
    pub fn overall_compression_ratio(&self) -> f64 {
        let entries = self.total_entries();
        let bytes = self.total_bytes();
        if entries == 0 || bytes == 0 {
            1.0
        } else {
            (entries * 16) as f64 / bytes as f64
        }
    }
}
impl Default for ChunkedAdjacency {
    /// Equivalent to [`ChunkedAdjacency::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic_adjacency() {
let adj = ChunkedAdjacency::new();
adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));
adj.add_edge(NodeId::new(0), NodeId::new(3), EdgeId::new(2));
let neighbors = adj.neighbors(NodeId::new(0));
assert_eq!(neighbors.len(), 3);
assert!(neighbors.contains(&NodeId::new(1)));
assert!(neighbors.contains(&NodeId::new(2)));
assert!(neighbors.contains(&NodeId::new(3)));
}
#[test]
fn test_out_degree() {
let adj = ChunkedAdjacency::new();
adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));
assert_eq!(adj.out_degree(NodeId::new(0)), 2);
assert_eq!(adj.out_degree(NodeId::new(1)), 0);
}
#[test]
fn test_mark_deleted() {
let adj = ChunkedAdjacency::new();
adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));
adj.mark_deleted(NodeId::new(0), EdgeId::new(0));
let neighbors = adj.neighbors(NodeId::new(0));
assert_eq!(neighbors.len(), 1);
assert!(neighbors.contains(&NodeId::new(2)));
}
#[test]
fn test_edges_from() {
let adj = ChunkedAdjacency::new();
adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(10));
adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(20));
let edges = adj.edges_from(NodeId::new(0));
assert_eq!(edges.len(), 2);
assert!(edges.contains(&(NodeId::new(1), EdgeId::new(10))));
assert!(edges.contains(&(NodeId::new(2), EdgeId::new(20))));
}
#[test]
fn test_compaction() {
let adj = ChunkedAdjacency::with_chunk_capacity(4);
for i in 0..10 {
adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
}
adj.compact();
let neighbors = adj.neighbors(NodeId::new(0));
assert_eq!(neighbors.len(), 10);
}
#[test]
fn test_edge_counts() {
let adj = ChunkedAdjacency::new();
adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));
adj.add_edge(NodeId::new(1), NodeId::new(2), EdgeId::new(2));
assert_eq!(adj.total_edge_count(), 3);
assert_eq!(adj.active_edge_count(), 3);
adj.mark_deleted(NodeId::new(0), EdgeId::new(0));
assert_eq!(adj.total_edge_count(), 3);
assert_eq!(adj.active_edge_count(), 2);
}
#[test]
fn test_clear() {
let adj = ChunkedAdjacency::new();
adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));
adj.clear();
assert_eq!(adj.total_edge_count(), 0);
assert_eq!(adj.node_count(), 0);
}
#[test]
fn test_chunk_compression() {
let mut chunk = AdjacencyChunk::new(64);
for i in 0..20 {
chunk.push(NodeId::new(100 + i * 5), EdgeId::new(1000 + i));
}
let compressed = chunk.compress();
assert_eq!(compressed.len(), 20);
let entries: Vec<_> = compressed.iter().collect();
assert_eq!(entries.len(), 20);
for window in entries.windows(2) {
assert!(window[0].0.as_u64() <= window[1].0.as_u64());
}
let original_dsts: std::collections::HashSet<_> =
(0..20).map(|i| NodeId::new(100 + i * 5)).collect();
let compressed_dsts: std::collections::HashSet<_> =
entries.iter().map(|(d, _)| *d).collect();
assert_eq!(original_dsts, compressed_dsts);
let ratio = compressed.compression_ratio();
assert!(
ratio > 1.0,
"Expected compression ratio > 1.0, got {}",
ratio
);
}
#[test]
fn test_empty_chunk_compression() {
let chunk = AdjacencyChunk::new(64);
let compressed = chunk.compress();
assert_eq!(compressed.len(), 0);
assert!(compressed.is_empty());
assert_eq!(compressed.iter().count(), 0);
}
#[test]
fn test_hot_to_cold_migration() {
let adj = ChunkedAdjacency::with_chunk_capacity(8);
for i in 0..100 {
adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
}
adj.compact();
let neighbors = adj.neighbors(NodeId::new(0));
assert_eq!(neighbors.len(), 100);
let stats = adj.memory_stats();
assert_eq!(stats.total_entries(), 100);
assert!(
stats.cold_entries > 0,
"Expected some cold entries, got {}",
stats.cold_entries
);
}
#[test]
fn test_memory_stats() {
let adj = ChunkedAdjacency::with_chunk_capacity(8);
for i in 0..20 {
adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
}
adj.compact();
let stats = adj.memory_stats();
assert_eq!(stats.total_entries(), 20);
assert_eq!(stats.node_count, 1);
assert!(stats.total_bytes() > 0);
}
#[test]
fn test_freeze_all() {
let adj = ChunkedAdjacency::with_chunk_capacity(8);
for i in 0..30 {
adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
}
adj.compact();
let before = adj.memory_stats();
adj.freeze_all();
let after = adj.memory_stats();
assert_eq!(after.hot_entries, 0);
assert_eq!(after.cold_entries, before.total_entries());
let neighbors = adj.neighbors(NodeId::new(0));
assert_eq!(neighbors.len(), 30);
}
#[test]
fn test_cold_compression_ratio() {
let adj = ChunkedAdjacency::with_chunk_capacity(8);
for i in 0..200 {
adj.add_edge(NodeId::new(0), NodeId::new(100 + i), EdgeId::new(i));
}
adj.compact();
adj.freeze_all();
let stats = adj.memory_stats();
let ratio = stats.cold_compression_ratio();
assert!(
ratio > 1.5,
"Expected cold compression ratio > 1.5, got {}",
ratio
);
}
#[test]
fn test_deleted_edges_with_cold_storage() {
let adj = ChunkedAdjacency::with_chunk_capacity(8);
for i in 0..50 {
adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
}
adj.compact();
for i in (0..50).step_by(2) {
adj.mark_deleted(NodeId::new(0), EdgeId::new(i));
}
let neighbors = adj.neighbors(NodeId::new(0));
assert_eq!(neighbors.len(), 25);
for neighbor in neighbors {
assert!(neighbor.as_u64() % 2 == 0); }
}
#[test]
fn test_adjacency_list_memory_size() {
let mut list = AdjacencyList::new();
for i in 0..50 {
list.add_edge(NodeId::new(i + 1), EdgeId::new(i), 64);
}
list.compact(8);
let size = list.memory_size();
assert!(size > 0);
assert!(size <= 50 * 16 + 200); }
#[test]
fn test_cold_iteration_order() {
let adj = ChunkedAdjacency::with_chunk_capacity(8);
for i in 0..50 {
adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
}
adj.compact();
let edges = adj.edges_from(NodeId::new(0));
assert_eq!(edges.len(), 50);
let edge_ids: std::collections::HashSet<_> = edges.iter().map(|(_, e)| *e).collect();
for i in 0..50 {
assert!(edge_ids.contains(&EdgeId::new(i)));
}
}
#[test]
fn test_in_degree() {
let adj = ChunkedAdjacency::new();
adj.add_edge(NodeId::new(2), NodeId::new(1), EdgeId::new(0)); adj.add_edge(NodeId::new(2), NodeId::new(3), EdgeId::new(1));
assert_eq!(adj.in_degree(NodeId::new(2)), 2);
assert_eq!(adj.in_degree(NodeId::new(1)), 0);
}
#[test]
fn test_bidirectional_edges() {
let forward = ChunkedAdjacency::new();
let backward = ChunkedAdjacency::new();
let edge_id = EdgeId::new(100);
forward.add_edge(NodeId::new(1), NodeId::new(2), edge_id);
backward.add_edge(NodeId::new(2), NodeId::new(1), edge_id);
let forward_edges = forward.edges_from(NodeId::new(1));
assert_eq!(forward_edges.len(), 1);
assert_eq!(forward_edges[0], (NodeId::new(2), edge_id));
assert_eq!(forward.edges_from(NodeId::new(2)).len(), 0);
let backward_edges = backward.edges_from(NodeId::new(2));
assert_eq!(backward_edges.len(), 1);
assert_eq!(backward_edges[0], (NodeId::new(1), edge_id));
assert_eq!(backward.edges_from(NodeId::new(1)).len(), 0);
}
#[test]
fn test_bidirectional_chain() {
let forward = ChunkedAdjacency::new();
let backward = ChunkedAdjacency::new();
let a = NodeId::new(1);
let b = NodeId::new(2);
let c = NodeId::new(3);
let edge_ab = EdgeId::new(10);
forward.add_edge(a, b, edge_ab);
backward.add_edge(b, a, edge_ab);
let edge_bc = EdgeId::new(20);
forward.add_edge(b, c, edge_bc);
backward.add_edge(c, b, edge_bc);
let from_a = forward.edges_from(a);
assert_eq!(from_a.len(), 1);
assert_eq!(from_a[0].0, b);
let from_b = forward.edges_from(b);
assert_eq!(from_b.len(), 1);
assert_eq!(from_b[0].0, c);
let to_c = backward.edges_from(c);
assert_eq!(to_c.len(), 1);
assert_eq!(to_c[0].0, b);
let to_b = backward.edges_from(b);
assert_eq!(to_b.len(), 1);
assert_eq!(to_b[0].0, a);
assert_eq!(backward.edges_from(a).len(), 0);
assert_eq!(forward.edges_from(c).len(), 0);
}
#[test]
fn test_contains_edge_basic() {
let adj = ChunkedAdjacency::new();
adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));
adj.add_edge(NodeId::new(0), NodeId::new(3), EdgeId::new(2));
assert!(adj.contains_edge(NodeId::new(0), NodeId::new(1)));
assert!(adj.contains_edge(NodeId::new(0), NodeId::new(2)));
assert!(adj.contains_edge(NodeId::new(0), NodeId::new(3)));
assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(4)));
assert!(!adj.contains_edge(NodeId::new(1), NodeId::new(0)));
}
#[test]
fn test_contains_edge_after_delete() {
let adj = ChunkedAdjacency::new();
adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(10));
adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(20));
assert!(adj.contains_edge(NodeId::new(0), NodeId::new(1)));
adj.mark_deleted(NodeId::new(0), EdgeId::new(10));
assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(1)));
assert!(adj.contains_edge(NodeId::new(0), NodeId::new(2)));
}
#[test]
fn test_contains_edge_in_cold_storage() {
let adj = ChunkedAdjacency::with_chunk_capacity(8);
for i in 0..100 {
adj.add_edge(NodeId::new(0), NodeId::new(100 + i), EdgeId::new(i));
}
adj.compact();
adj.freeze_all();
for i in 0..100 {
assert!(
adj.contains_edge(NodeId::new(0), NodeId::new(100 + i)),
"Should find destination {} in cold storage",
100 + i
);
}
assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(0)));
assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(99)));
assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(200)));
}
#[test]
fn test_contains_edge_in_delta_only() {
let adj = ChunkedAdjacency::new();
adj.add_edge(NodeId::new(0), NodeId::new(5), EdgeId::new(0));
adj.add_edge(NodeId::new(0), NodeId::new(10), EdgeId::new(1));
assert!(adj.contains_edge(NodeId::new(0), NodeId::new(5)));
assert!(adj.contains_edge(NodeId::new(0), NodeId::new(10)));
assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(7)));
}
#[test]
fn test_edges_in_range() {
let adj = ChunkedAdjacency::with_chunk_capacity(8);
for i in 0..100 {
adj.add_edge(NodeId::new(0), NodeId::new(100 + i), EdgeId::new(i));
}
adj.compact();
let results = adj.edges_in_range(NodeId::new(0), NodeId::new(130), NodeId::new(140));
assert_eq!(
results.len(),
11,
"Expected 11 edges in range [130, 140], got {}",
results.len()
);
for (dst, _) in &results {
assert!(dst.as_u64() >= 130 && dst.as_u64() <= 140);
}
let empty = adj.edges_in_range(NodeId::new(0), NodeId::new(200), NodeId::new(300));
assert!(empty.is_empty());
}
#[test]
fn test_skip_index_prunes_chunks() {
let adj = ChunkedAdjacency::with_chunk_capacity(8);
for i in 0..8 {
adj.add_edge(NodeId::new(0), NodeId::new(100 + i), EdgeId::new(i));
}
adj.compact();
for i in 0..8 {
adj.add_edge(NodeId::new(0), NodeId::new(200 + i), EdgeId::new(100 + i));
}
adj.compact();
for i in 0..8 {
adj.add_edge(NodeId::new(0), NodeId::new(300 + i), EdgeId::new(200 + i));
}
adj.compact();
adj.freeze_all();
assert!(adj.contains_edge(NodeId::new(0), NodeId::new(103)));
assert!(adj.contains_edge(NodeId::new(0), NodeId::new(205)));
assert!(adj.contains_edge(NodeId::new(0), NodeId::new(307)));
assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(150)));
assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(250)));
let range_b = adj.edges_in_range(NodeId::new(0), NodeId::new(200), NodeId::new(207));
assert_eq!(range_b.len(), 8);
}
#[test]
fn test_contains_after_freeze_all() {
let adj = ChunkedAdjacency::with_chunk_capacity(8);
for i in 0..50 {
adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
}
adj.compact();
adj.freeze_all();
let stats = adj.memory_stats();
assert_eq!(stats.hot_entries, 0);
assert_eq!(stats.cold_entries, 50);
for i in 0..50 {
assert!(
adj.contains_edge(NodeId::new(0), NodeId::new(i + 1)),
"Should find destination {} after freeze_all",
i + 1
);
}
}
}