use std::sync::atomic::{AtomicU64, Ordering};
use parking_lot::RwLock;
use crate::index::hnsw::native::layer::Layer;
/// Compressed-sparse-row (CSR) copy of a graph layer's adjacency lists.
///
/// The neighbors of node `i` are stored in `neighbors[offsets[i]..offsets[i + 1]]`.
/// All fields are public, so the invariants described on them hold only for graphs
/// produced by `from_layer`; use `validate` to check a hand-built instance.
#[derive(Debug, Clone)]
pub struct CsrGraph {
/// Row offsets into `neighbors`. `from_layer` writes `num_nodes + 1` entries, starting at 0.
pub offsets: Vec<u32>,
/// Concatenated neighbor ids of all nodes, in node order.
pub neighbors: Vec<u32>,
/// Number of nodes (rows) represented.
pub num_nodes: u32,
/// Length of the longest adjacency list seen while building.
pub max_degree: u32,
/// Total number of stored neighbor entries; `from_layer` sets it to `neighbors.len()`.
pub total_edges: u32,
}
impl CsrGraph {
    /// Builds a CSR graph from the adjacency lists of `layer`.
    ///
    /// Only the first `min(num_nodes, layer.neighbors.len())` nodes are visited.
    /// Neighbor ids are copied as-is, so an adjacency list may mention a node at or
    /// beyond that count; [`Self::validate`] reports such a graph as invalid.
    ///
    /// Lengths and neighbor ids are narrowed to `u32` with `as`, which truncates
    /// values that do not fit.
    #[must_use]
    pub fn from_layer(layer: &Layer, num_nodes: usize) -> Self {
        let n = num_nodes.min(layer.neighbors.len());
        let mut offsets = Vec::with_capacity(n + 1);
        // Capacity hint only (16 entries per node); the vector grows if lists are longer.
        let mut neighbors = Vec::with_capacity(n * 16);
        let mut max_degree: u32 = 0;
        // Row 0 always starts at the beginning of `neighbors`.
        offsets.push(0u32);
        for node_id in 0..n {
            let nbrs = layer.get_neighbors(node_id);
            #[allow(clippy::cast_possible_truncation)]
            let degree = nbrs.len() as u32;
            max_degree = max_degree.max(degree);
            for &nbr in &nbrs {
                #[allow(clippy::cast_possible_truncation)]
                let nbr_u32 = nbr as u32;
                neighbors.push(nbr_u32);
            }
            // The end of this row doubles as the start of the next one.
            #[allow(clippy::cast_possible_truncation)]
            let offset = neighbors.len() as u32;
            offsets.push(offset);
        }
        #[allow(clippy::cast_possible_truncation)]
        let total_edges = neighbors.len() as u32;
        #[allow(clippy::cast_possible_truncation)]
        let num_nodes_u32 = n as u32;
        CsrGraph {
            offsets,
            neighbors,
            num_nodes: num_nodes_u32,
            max_degree,
            total_edges,
        }
    }

    /// Returns `true` when the graph has no nodes.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.num_nodes == 0
    }

    /// Size in bytes of the `offsets` array (`u32` entries).
    #[must_use]
    pub fn offsets_byte_size(&self) -> usize {
        self.offsets.len() * std::mem::size_of::<u32>()
    }

    /// Size in bytes of the `neighbors` array (`u32` entries).
    #[must_use]
    pub fn neighbors_byte_size(&self) -> usize {
        self.neighbors.len() * std::mem::size_of::<u32>()
    }

    /// Combined size in bytes of the `offsets` and `neighbors` arrays.
    #[must_use]
    pub fn total_gpu_bytes(&self) -> usize {
        self.offsets_byte_size() + self.neighbors_byte_size()
    }

    /// Ratio of stored edges to `n * (n - 1)`, the number of ordered node pairs.
    ///
    /// Returns `0.0` for graphs with fewer than two nodes.
    #[must_use]
    pub fn density(&self) -> f64 {
        if self.num_nodes <= 1 {
            return 0.0;
        }
        let n = f64::from(self.num_nodes);
        // n >= 2 here, so the denominator is at least 2 and never zero.
        f64::from(self.total_edges) / (n * (n - 1.0))
    }

    /// Mean number of stored edges per node, or `0.0` for an empty graph.
    #[must_use]
    pub fn avg_degree(&self) -> f64 {
        if self.num_nodes == 0 {
            return 0.0;
        }
        f64::from(self.total_edges) / f64::from(self.num_nodes)
    }

    /// Checks the structural invariants of the CSR arrays.
    ///
    /// Verified, in this order:
    /// 1. `offsets.len() == num_nodes + 1`
    /// 2. `offsets` is non-decreasing
    /// 3. the last offset equals `total_edges`
    /// 4. every neighbor id is `< num_nodes`
    /// 5. the first offset is `0`
    /// 6. `neighbors.len() == total_edges`
    ///
    /// Checks 5 and 6 guarantee that every row range
    /// `offsets[i]..offsets[i + 1]` lies inside `neighbors`.
    ///
    /// # Errors
    ///
    /// Returns a message describing the first violated invariant.
    pub fn validate(&self) -> Result<(), String> {
        let expected_len = self.num_nodes as usize + 1;
        if self.offsets.len() != expected_len {
            return Err(format!(
                "offsets.len()={} != num_nodes+1={}",
                self.offsets.len(),
                expected_len,
            ));
        }
        for (i, pair) in self.offsets.windows(2).enumerate() {
            if pair[1] < pair[0] {
                // Report the index of the offending (second) element of the pair.
                return Err(format!(
                    "offsets not monotonic at {}: {} < {}",
                    i + 1,
                    pair[1],
                    pair[0],
                ));
            }
        }
        if let Some(&last) = self.offsets.last() {
            if last != self.total_edges {
                return Err(format!(
                    "last offset {} != total_edges {}",
                    last, self.total_edges,
                ));
            }
        }
        for (idx, &nbr) in self.neighbors.iter().enumerate() {
            if nbr >= self.num_nodes {
                return Err(format!(
                    "neighbor[{}]={} >= num_nodes={}",
                    idx, nbr, self.num_nodes,
                ));
            }
        }
        // The remaining checks run last so that graphs rejected by the checks above
        // keep reporting the same message as before.
        if let Some(&first) = self.offsets.first() {
            if first != 0 {
                return Err(format!("first offset {} != 0", first));
            }
        }
        if self.neighbors.len() != self.total_edges as usize {
            return Err(format!(
                "neighbors.len()={} != total_edges={}",
                self.neighbors.len(),
                self.total_edges,
            ));
        }
        Ok(())
    }
}
impl std::fmt::Display for CsrGraph {
    /// Writes a one-line summary: node and edge counts, maximum and average degree,
    /// density, and the combined array size in KiB (labelled `vram`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let byte_total = self.total_gpu_bytes();
        #[allow(clippy::cast_precision_loss)]
        let vram_kb = byte_total as f64 / 1024.0;
        let mean_degree = self.avg_degree();
        let fill_ratio = self.density();
        f.write_fmt(format_args!(
            "CsrGraph(nodes={}, edges={}, max_deg={}, avg_deg={:.1}, density={:.6}, vram={:.1}KB)",
            self.num_nodes,
            self.total_edges,
            self.max_degree,
            mean_degree,
            fill_ratio,
            vram_kb,
        ))
    }
}
/// Cache holding the most recently stored [`CsrGraph`], invalidated through a
/// generation counter.
///
/// The cache counts as stale whenever `generation` differs from `built_generation`.
pub struct CsrCache {
/// Last stored graph; `None` until `get_or_rebuild` stores one.
csr: RwLock<Option<CsrGraph>>,
/// Bumped by `invalidate`; starts at 1.
generation: AtomicU64,
/// Value of `generation` recorded when `csr` was last stored; starts at 0, so a
/// new cache is stale.
built_generation: AtomicU64,
/// Incremented each time a graph is stored into `csr`; starts at 0.
version: AtomicU64,
}
impl CsrCache {
/// Creates an empty cache that is stale from the start (`generation` = 1,
/// `built_generation` = 0) with `version` 0.
#[must_use]
pub fn new() -> Self {
Self {
csr: RwLock::new(None),
generation: AtomicU64::new(1),
built_generation: AtomicU64::new(0),
version: AtomicU64::new(0),
}
}
/// Returns `true` when `generation` and `built_generation` differ.
///
/// The two counters are read with separate atomic loads, so the answer is a
/// point-in-time observation and may be outdated by the time the caller acts on it.
#[inline]
fn is_stale(&self) -> bool {
self.generation.load(Ordering::Acquire) != self.built_generation.load(Ordering::Acquire)
}
/// Marks the cached graph as outdated by incrementing `generation`.
///
/// The stored graph itself is left in place.
pub fn invalidate(&self) {
self.generation.fetch_add(1, Ordering::Release);
}
/// Returns the number of times a graph has been stored into the cache.
#[must_use]
pub fn version(&self) -> u64 {
self.version.load(Ordering::Acquire)
}
/// Returns a clone of the cached graph when it is current, otherwise builds a new
/// one from `layer` and returns that.
///
/// A freshly built graph is stored in the cache only if `generation` still has the
/// value it had before the build started; either way the fresh graph is returned
/// to the caller.
///
/// In debug builds this asserts that the caller holds the `Layers` lock rank.
///
/// NOTE(review): nothing here prevents several threads from building at the same
/// time; each thread whose generation check passes stores its graph and increments
/// `version` — confirm that repeated bumps for one generation are acceptable.
pub fn get_or_rebuild(&self, layer: &Layer, num_nodes: usize) -> CsrGraph {
debug_assert!(
crate::index::hnsw::native::hnsw_holds_lock(
crate::index::hnsw::native::HnswLockRank::Layers
),
"CsrCache::get_or_rebuild must be called while holding the layers read lock"
);
// Fast path: counters match and a graph is stored.
if !self.is_stale() {
let guard = self.csr.read();
if let Some(ref csr) = *guard {
return csr.clone();
}
}
// Record the generation before building so an invalidation that happens during
// the build can be detected afterwards.
let gen_before = self.generation.load(Ordering::Acquire);
let new_csr = CsrGraph::from_layer(layer, num_nodes);
{
let mut guard = self.csr.write();
// Re-read under the write lock; a changed value means the build may be outdated.
let gen_after = self.generation.load(Ordering::Acquire);
if gen_after == gen_before {
*guard = Some(new_csr.clone());
self.built_generation.store(gen_before, Ordering::Release);
self.version.fetch_add(1, Ordering::AcqRel);
}
}
new_csr
}
/// Returns a clone of the cached graph, or `None` when the cache is stale or
/// nothing has been stored yet. Never rebuilds.
#[must_use]
pub fn clean_snapshot(&self) -> Option<CsrGraph> {
if self.is_stale() {
return None;
}
self.csr.read().clone()
}
}
impl Default for CsrCache {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::index::hnsw::native::{
        hnsw_holds_lock, hnsw_record_lock_acquire, hnsw_record_lock_release, HnswLockRank, NodeId,
    };

    /// Runs `f` between a recorded acquire and release of the `Layers` lock rank,
    /// which satisfies the debug assertion in `CsrCache::get_or_rebuild`.
    fn with_layers_rank<R>(f: impl FnOnce() -> R) -> R {
        hnsw_record_lock_acquire(HnswLockRank::Layers);
        debug_assert!(hnsw_holds_lock(HnswLockRank::Layers));
        let outcome = f();
        hnsw_record_lock_release(HnswLockRank::Layers);
        outcome
    }

    /// Neighbor slice of `node`, read through the CSR offsets.
    fn row(csr: &CsrGraph, node: usize) -> &[u32] {
        let start = csr.offsets[node] as usize;
        let end = csr.offsets[node + 1] as usize;
        &csr.neighbors[start..end]
    }

    #[test]
    fn test_csr_from_empty_layer() {
        let layer = Layer::new(0);
        let csr = CsrGraph::from_layer(&layer, 0);

        assert!(csr.is_empty());
        // An empty graph still carries the leading zero offset.
        assert_eq!(csr.offsets, vec![0]);
        assert!(csr.neighbors.is_empty());
        assert_eq!(csr.max_degree, 0);
        assert_eq!(csr.total_edges, 0);
    }

    #[test]
    fn test_csr_from_simple_layer() {
        let layer = Layer::new(4);
        layer.set_neighbors(0, vec![1, 2]);
        layer.set_neighbors(1, vec![0, 3]);
        layer.set_neighbors(2, vec![0, 1, 3]);
        layer.set_neighbors(3, vec![1, 2]);

        let csr = CsrGraph::from_layer(&layer, 4);

        assert_eq!(csr.num_nodes, 4);
        assert_eq!(csr.offsets, vec![0, 2, 4, 7, 9]);
        assert_eq!(csr.neighbors, vec![1, 2, 0, 3, 0, 1, 3, 1, 2]);
        assert_eq!(csr.max_degree, 3);
        assert_eq!(csr.total_edges, 9);
    }

    #[test]
    fn test_csr_neighbor_lookup() {
        let layer = Layer::new(3);
        layer.set_neighbors(0, vec![1, 2]);
        layer.set_neighbors(1, vec![]);
        layer.set_neighbors(2, vec![0]);

        let csr = CsrGraph::from_layer(&layer, 3);

        assert_eq!(row(&csr, 0), &[1, 2]);
        assert_eq!(row(&csr, 1), &[] as &[u32]);
        assert_eq!(row(&csr, 2), &[0]);
    }

    #[test]
    fn test_csr_cache_dirty_flag() {
        let cache = CsrCache::new();
        assert_eq!(cache.version(), 0);

        let layer = Layer::new(2);
        layer.set_neighbors(0, vec![1]);
        layer.set_neighbors(1, vec![0]);
        let rebuild = || with_layers_rank(|| cache.get_or_rebuild(&layer, 2));

        // First call builds and stores the graph.
        let csr = rebuild();
        assert_eq!(csr.num_nodes, 2);
        assert_eq!(cache.version(), 1);

        // Second call is served from the cache, so the version stays put.
        let csr2 = rebuild();
        assert_eq!(csr2.num_nodes, 2);
        assert_eq!(cache.version(), 1);

        // After invalidation the next call rebuilds and bumps the version.
        cache.invalidate();
        let csr3 = rebuild();
        assert_eq!(csr3.num_nodes, 2);
        assert_eq!(cache.version(), 2);
    }

    #[test]
    fn test_csr_byte_sizes() {
        let layer = Layer::new(100);
        for i in 0..100 {
            let neighbors: Vec<NodeId> = (1..=16).map(|step| (i + step) % 100).collect();
            layer.set_neighbors(i, neighbors);
        }

        let csr = CsrGraph::from_layer(&layer, 100);

        assert_eq!(csr.offsets_byte_size(), 101 * 4);
        assert_eq!(csr.neighbors_byte_size(), 1600 * 4);
        assert_eq!(csr.total_gpu_bytes(), 101 * 4 + 1600 * 4);
    }

    #[test]
    fn test_csr_partial_capacity() {
        let layer = Layer::new(100);
        layer.set_neighbors(0, vec![1, 2]);
        layer.set_neighbors(1, vec![0]);

        let csr = CsrGraph::from_layer(&layer, 5);

        assert_eq!(csr.num_nodes, 5);
        // Nodes 2, 3 and 4 have no neighbors, so their rows are empty.
        for node in 2..5 {
            assert_eq!(csr.offsets[node], csr.offsets[node + 1]);
        }
    }

    #[test]
    fn test_clean_snapshot_returns_none_when_dirty() {
        let cache = CsrCache::new();
        assert!(cache.clean_snapshot().is_none());

        let layer = Layer::new(1);
        let _ = with_layers_rank(|| cache.get_or_rebuild(&layer, 1));
        assert!(cache.clean_snapshot().is_some());

        cache.invalidate();
        assert!(cache.clean_snapshot().is_none());
    }
}