use crate::visits::{
Parallel,
breadth_first::{EventPred, FilterArgsPred},
};
use crate::{traits::RandomAccessGraph, utils::Granularity};
use parallel_frontier::Frontier;
use rayon::prelude::*;
use std::{
ops::ControlFlow::{self, Continue},
sync::atomic::Ordering,
};
use sux::{bits::AtomicBitVec, traits::AtomicBitVecOps};
pub struct ParLowMem<G: RandomAccessGraph> {
graph: G,
granularity: usize,
visited: AtomicBitVec,
}
impl<G: RandomAccessGraph> ParLowMem<G> {
pub fn new(graph: G) -> Self {
Self::with_granularity(graph, Granularity::Nodes(128))
}
pub fn with_granularity(graph: G, granularity: Granularity) -> Self {
let num_nodes = graph.num_nodes();
let num_arcs = graph.num_arcs();
Self {
graph,
granularity: granularity.node_granularity(num_nodes, Some(num_arcs)),
visited: AtomicBitVec::new(num_nodes),
}
}
}
impl<G: RandomAccessGraph + Sync> Parallel<EventPred> for ParLowMem<G> {
fn par_visit_filtered_with<
R: IntoIterator<Item = usize>,
T: Clone + Send + Sync,
E: Send,
C: Fn(&mut T, EventPred) -> ControlFlow<E, ()> + Sync,
F: Fn(&mut T, FilterArgsPred) -> bool + Sync,
>(
&mut self,
roots: R,
mut init: T,
callback: C,
filter: F,
) -> ControlFlow<E, ()> {
let mut filtered_roots = vec![];
for root in roots {
if self.visited.get(root, Ordering::Relaxed)
|| !filter(
&mut init,
FilterArgsPred {
node: root,
pred: root,
distance: 0,
},
)
{
continue;
}
if filtered_roots.is_empty() {
callback(&mut init, EventPred::Init {})?;
}
filtered_roots.push(root);
self.visited.set(root, true, Ordering::Relaxed);
callback(
&mut init,
EventPred::Visit {
node: root,
pred: root,
distance: 0,
},
)?;
}
if filtered_roots.is_empty() {
return Continue(());
}
let mut curr_frontier = Frontier::new();
curr_frontier.as_mut()[0] = filtered_roots;
let mut next_frontier = Frontier::new();
let mut distance = 1;
while !curr_frontier.is_empty() {
callback(
&mut init,
EventPred::FrontierSize {
distance: distance - 1,
size: curr_frontier.len(),
},
)?;
{
curr_frontier
.par_iter()
.chunks(self.granularity)
.try_for_each_with(init.clone(), |init, chunk| {
chunk.into_iter().try_for_each(|&node| {
self.graph
.successors(node)
.into_iter()
.try_for_each(|succ| {
let (node, pred) = (succ, node);
if filter(
init,
FilterArgsPred {
node,
pred,
distance,
},
) {
if !self.visited.swap(succ, true, Ordering::Relaxed) {
callback(
init,
EventPred::Visit {
node,
pred,
distance,
},
)?;
next_frontier.push(succ);
} else {
callback(init, EventPred::Revisit { node, pred })?;
}
}
Continue(())
})
})
})
}?;
distance += 1;
std::mem::swap(&mut curr_frontier, &mut next_frontier);
next_frontier.clear();
}
callback(&mut init, EventPred::Done {})
}
fn reset(&mut self) {
self.visited.fill(false, Ordering::Relaxed);
}
}