use super::super::clustered_index::ClusteredIndex;
use super::super::csr_snapshot::SnapshotBuilder;
use super::super::edge::EdgeStore;
use super::ConcurrentEdgeStore;
use crate::error::Result;
use std::sync::atomic::Ordering;
use std::sync::Arc;
impl ConcurrentEdgeStore {
    /// Clears the clustered read snapshot if one is currently installed.
    ///
    /// The slot is first inspected under the read lock; the write lock is
    /// only acquired when there is actually a snapshot to clear.
    #[inline]
    pub(super) fn invalidate_snapshot(&self) {
        // The read guard is a temporary and is released at the end of this
        // statement, before the write lock below is requested.
        let has_snapshot = self.clustered_snapshot.read().is_some();
        if !has_snapshot {
            return;
        }
        let mut slot = self.clustered_snapshot.write();
        *slot = None;
    }

    /// Flags the CSR snapshot as dirty by setting `csr_dirty` to `true`
    /// (with `Release` ordering).
    ///
    /// No rebuild happens here.
    /// NOTE(review): presumably another code path observes the flag and
    /// rebuilds later; confirm against the readers of `csr_dirty`.
    #[inline]
    pub(super) fn rebuild_snapshot_best_effort(&self) {
        let dirty_flag = &self.csr_dirty;
        dirty_flag.store(true, Ordering::Release);
    }

    /// Rebuilds the CSR snapshot from the edges currently held by every shard.
    ///
    /// Each shard is read-locked while its edges are cloned into a temporary
    /// [`EdgeStore`]; the merged store is then turned into a snapshot with the
    /// label table and stored in `csr_snapshot`.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok(())`: the results of the
    /// individual `add_edge` calls are deliberately discarded.
    #[allow(clippy::unnecessary_wraps)]
    pub(crate) fn rebuild_snapshot(&self) -> Result<()> {
        let merged = {
            let mut store = EdgeStore::new();
            for shard in &self.shards {
                let shard_guard = shard.read();
                for edge in shard_guard.all_edges() {
                    // The insertion result is intentionally ignored.
                    let _ = store.add_edge(edge.clone());
                }
            }
            store
        };

        let labels = self.label_table.read();
        let snapshot = Arc::new(SnapshotBuilder::build(&merged, &labels));
        self.csr_snapshot.store(snapshot);
        Ok(())
    }

    /// Builds the clustered read snapshot, installs it, and then rebuilds the
    /// CSR snapshot.
    ///
    /// For every `(edge id, source id)` entry in `edge_ids`, the edge is looked
    /// up in the shard selected by `shard_index(source_id)` and its
    /// `(source, target)` pair is inserted into a fresh [`ClusteredIndex`].
    /// Entries whose edge is not found in that shard are skipped.
    ///
    /// The `edge_ids` read guard is held for the whole call. The outcome of
    /// the trailing `rebuild_snapshot` call is ignored.
    pub fn build_read_snapshot(&self) {
        let edge_ids = self.edge_ids.read();
        let capacity = edge_ids.len();
        let mut index = ClusteredIndex::with_capacity(capacity, capacity);

        for (edge_id, source_id) in edge_ids.iter() {
            let (edge_id, source_id) = (*edge_id, *source_id);
            let shard_guard = self.shards[self.shard_index(source_id)].read();
            if let Some(edge) = shard_guard.get_edge(edge_id) {
                index.insert(source_id, edge.target());
            }
        }
        index.compact();

        // Keep the write lock only for the duration of the installation.
        {
            let mut slot = self.clustered_snapshot.write();
            *slot = Some(index);
        }

        let _ = self.rebuild_snapshot();
    }

    /// Returns `true` when a clustered read snapshot is currently installed.
    #[must_use]
    pub fn has_read_snapshot(&self) -> bool {
        let slot = self.clustered_snapshot.read();
        slot.is_some()
    }
}