use super::{ConcurrentEdgeStore, EdgeStore};
use parking_lot::RwLockWriteGuard;
use rustc_hash::{FxHashMap, FxHashSet};
use std::collections::BTreeSet;
impl ConcurrentEdgeStore {
/// Snapshots the edges attached to `node_id` as recorded in shard `node_shard`.
///
/// A read guard on `self.shards[node_shard]` is held while both lists are
/// copied out and is dropped when the function returns.
///
/// Returns `(outgoing, incoming)` where each outgoing entry is
/// `(edge id, target)` and each incoming entry is `(edge id, source)`.
#[allow(clippy::type_complexity)]
pub(super) fn collect_node_edges(
    &self,
    node_shard: usize,
    node_id: u64,
) -> (Vec<(u64, u64)>, Vec<(u64, u64)>) {
    let shard = self.shards[node_shard].read();

    // Each list is materialised in its own statement so nothing borrowed
    // from the guard outlives it.
    let outgoing = shard
        .get_outgoing(node_id)
        .iter()
        .map(|edge| (edge.id(), edge.target()))
        .collect::<Vec<_>>();
    let incoming = shard
        .get_incoming(node_id)
        .iter()
        .map(|edge| (edge.id(), edge.source()))
        .collect::<Vec<_>>();

    (outgoing, incoming)
}
/// Computes the set of shard indices involved in removing a node's edges.
///
/// The result always contains `node_shard`, plus `self.shard_index(..)` of
/// every target in `outgoing` and every source in `incoming`. Because the
/// result is a `BTreeSet`, iterating it yields the indices in ascending order
/// (presumably so callers can lock shards in a consistent order — confirm at
/// the call site).
pub(super) fn gather_affected_shards(
    &self,
    node_shard: usize,
    outgoing: &[(u64, u64)],
    incoming: &[(u64, u64)],
) -> BTreeSet<usize> {
    // The second tuple field is the far endpoint of the edge in both lists.
    let endpoint_shards = outgoing
        .iter()
        .chain(incoming)
        .map(|&(_, endpoint)| self.shard_index(endpoint));

    std::iter::once(node_shard).chain(endpoint_shards).collect()
}
/// Applies the edge removal to every locked shard in `guards`.
///
/// Each element of `guards` is `(shard index, write guard for that shard)`.
/// The shard whose index equals `node_shard` has `remove_node_edges(node_id)`
/// called on it; every other shard is handed to
/// `cleanup_cross_shard_edges` together with the `outgoing` / `incoming`
/// edge lists.
#[allow(clippy::too_many_arguments)]
pub(super) fn cleanup_shard_edges(
    &self,
    guards: &mut [(usize, RwLockWriteGuard<'_, EdgeStore>)],
    node_shard: usize,
    node_id: u64,
    outgoing: &[(u64, u64)],
    incoming: &[(u64, u64)],
) {
    for (shard_idx, guard) in guards.iter_mut() {
        let shard_idx = *shard_idx;

        // Foreign shards only hold the far half of a cross-shard edge.
        if shard_idx != node_shard {
            self.cleanup_cross_shard_edges(shard_idx, guard, outgoing, incoming);
            continue;
        }

        // The node's own shard: remove the node's edges from it.
        guard.remove_node_edges(node_id);
    }
}
/// Removes, from the shard behind `guard`, the halves of the given edges whose
/// far endpoint maps to `shard_idx`.
///
/// * For every `(edge id, target)` in `outgoing` whose target maps to
///   `shard_idx`, calls `remove_edge_incoming_only(edge id)`.
/// * For every `(edge id, source)` in `incoming` whose source maps to
///   `shard_idx`, calls `remove_edge_outgoing_only(edge id)`.
fn cleanup_cross_shard_edges(
    &self,
    shard_idx: usize,
    guard: &mut RwLockWriteGuard<'_, EdgeStore>,
    outgoing: &[(u64, u64)],
    incoming: &[(u64, u64)],
) {
    let in_this_shard = |node: u64| self.shard_index(node) == shard_idx;

    for &(edge_id, _) in outgoing
        .iter()
        .filter(|&&(_, target)| in_this_shard(target))
    {
        guard.remove_edge_incoming_only(edge_id);
    }

    for &(edge_id, _) in incoming
        .iter()
        .filter(|&&(_, source)| in_this_shard(source))
    {
        guard.remove_edge_outgoing_only(edge_id);
    }
}
/// Removes from `ids` every edge id listed in `outgoing` and `incoming`.
///
/// Only the first element of each tuple (the edge id) is used. An id that
/// occurs more than once across the two lists is removed from `ids` only the
/// first time it is seen; later occurrences are skipped.
#[allow(clippy::unused_self)]
pub(super) fn deregister_edge_ids(
    &self,
    ids: &mut FxHashMap<u64, u64>,
    outgoing: &[(u64, u64)],
    incoming: &[(u64, u64)],
) {
    let mut seen: FxHashSet<u64> = FxHashSet::default();

    // Outgoing ids are processed first, then incoming ones.
    for &(edge_id, _) in outgoing.iter().chain(incoming) {
        let first_time = seen.insert(edge_id);
        if first_time {
            ids.remove(&edge_id);
        }
    }
}
}