use std::borrow::Borrow;
use anyhow::{Result, bail, ensure};
use dsi_progress_logger::{ProgressLog, progress_logger};
use rapidhash::RapidHashMap;
use smallvec::SmallVec;
use swh_graph::graph::*;
use swh_graph::graph::{NodeId, SwhForwardGraph};
use super::MapReducer;
use super::labels::{
DenseLabels, Labels, SparseLabels, StridableLabel, StriddenLabels, StriddenLabelsConfig,
};
/// Collects the settings needed to create a [`MapReduce`].
///
/// Builders are obtained from `new_sparse`, `new_dense` or `new_stridden`
/// (depending on the label store `L`), optionally tuned with the setter
/// methods, and finally turned into a [`MapReduce`] with `build`.
pub struct MapReduceBuilder<G, MR, L>
where
    G: SwhForwardGraph + SwhBackwardGraph,
    MR: MapReducer,
    L: Labels,
{
    /// Graph that will be traversed; moved into the [`MapReduce`] by `build`.
    graph: G,
    /// Number of nodes the traversal is expected to visit.
    ///
    /// Only used as the progress logger's expected number of updates.
    num_nodes: usize,
    /// Whether the caller declared labels to be cheap to clone.
    cheap_clones: bool,
    /// Whether labels must still be available once the traversal is over.
    keep_labels: bool,
    /// User-provided implementation of the map and reduce steps.
    pub map_reducer: MR,
    /// Configuration passed to `L::new` when the label store is allocated.
    labels_config: L::Config,
}
impl<G, MR> MapReduceBuilder<G, MR, SparseLabels<MR::Label>>
where
    G: SwhForwardGraph + SwhBackwardGraph,
    MR: MapReducer<Label: Sized>,
{
    /// Creates a builder whose labels are stored in a [`SparseLabels`].
    ///
    /// The expected number of nodes defaults to `graph.actual_num_nodes()`,
    /// falling back to `graph.num_nodes()` when the former is unavailable.
    /// `cheap_clones` and `keep_labels` both start as `false`.
    pub fn new_sparse(graph: G, map_reducer: MR) -> Self {
        let num_nodes = graph.actual_num_nodes().unwrap_or(graph.num_nodes());
        Self {
            graph,
            num_nodes,
            map_reducer,
            labels_config: (),
            cheap_clones: false,
            keep_labels: false,
        }
    }
}
impl<G, MR> MapReduceBuilder<G, MR, DenseLabels<MR::Label>>
where
    G: SwhForwardGraph + SwhBackwardGraph,
    MR: MapReducer<Label: Default + Clone + Sized>,
{
    /// Creates a builder whose labels are stored in a [`DenseLabels`].
    ///
    /// The expected number of nodes defaults to `graph.actual_num_nodes()`,
    /// falling back to `graph.num_nodes()` when the former is unavailable.
    /// `cheap_clones` and `keep_labels` both start as `false`.
    pub fn new_dense(graph: G, map_reducer: MR) -> Self {
        let num_nodes = graph.actual_num_nodes().unwrap_or(graph.num_nodes());
        Self {
            graph,
            num_nodes,
            map_reducer,
            labels_config: (),
            cheap_clones: false,
            keep_labels: false,
        }
    }
}
impl<G, MR> MapReduceBuilder<G, MR, StriddenLabels<MR::Label>>
where
    G: SwhForwardGraph + SwhBackwardGraph,
    MR: MapReducer<Label: StridableLabel>,
{
    /// Creates a builder whose labels are stored in a [`StriddenLabels`].
    ///
    /// `num_words` is forwarded verbatim as [`StriddenLabelsConfig::num_words`]
    /// (presumably the size of each label, in words; confirm against
    /// `StriddenLabels`).
    ///
    /// The expected number of nodes defaults to `graph.actual_num_nodes()`,
    /// falling back to `graph.num_nodes()` when the former is unavailable.
    /// `cheap_clones` and `keep_labels` both start as `false`.
    pub fn new_stridden(graph: G, map_reducer: MR, num_words: usize) -> Self
    where
        MR::Label: StridableLabel + ToOwned,
    {
        let num_nodes = graph.actual_num_nodes().unwrap_or(graph.num_nodes());
        let labels_config = StriddenLabelsConfig { num_words };
        Self {
            graph,
            num_nodes,
            map_reducer,
            labels_config,
            cheap_clones: false,
            keep_labels: false,
        }
    }
}
impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels<Label = MR::Label>>
MapReduceBuilder<G, MR, L>
{
pub fn num_nodes(mut self, num_nodes: usize) -> Self {
self.num_nodes = num_nodes;
self
}
pub fn cheap_clones(mut self, cheap_clones: bool) -> Self {
self.cheap_clones = cheap_clones;
self
}
pub fn keep_labels(mut self, keep_labels: bool) -> Self {
self.keep_labels = keep_labels;
self
}
pub fn build(self) -> Result<MapReduce<G, MR, L>> {
let Self {
graph,
num_nodes,
cheap_clones,
keep_labels,
map_reducer,
labels_config,
} = self;
let pop_labels = match (cheap_clones, keep_labels) {
(true, true) => false, (true, false) => true, (false, true) => bail!("MapReduce cannot both keep labels and avoid expensive clones"),
(false, false) => true, };
log::info!("Allocating labels...");
let labels = L::new(graph.num_nodes(), labels_config);
Ok(MapReduce {
graph,
num_nodes,
pop_labels,
map_reducer,
labels,
pending_dependents: Default::default(),
})
}
}
/// Computes a label for every pushed node from the node itself and from the
/// labels of its dependencies (its successors in `graph`).
///
/// Nodes must be pushed in an order where every dependency comes before its
/// dependents.
pub struct MapReduce<G, MR, L>
where
    G: SwhForwardGraph + SwhBackwardGraph,
    MR: MapReducer,
{
    /// Graph whose successors are the dependencies of a node, and whose
    /// indegree is the number of dependents of a node.
    graph: G,
    /// Expected number of pushed nodes, for progress reporting only.
    num_nodes: usize,
    /// Selects the `*_with_popped_labels` code paths (`true`) or the
    /// `*_with_kept_labels` ones (`false`).
    pop_labels: bool,
    /// User-provided implementation of the map and reduce steps.
    pub map_reducer: MR,
    /// Labels computed so far.
    labels: L,
    /// For already-pushed nodes, how many of their dependents were not pushed
    /// yet. Only maintained by the popped-labels code path.
    pending_dependents: RapidHashMap<NodeId, usize>,
}
impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels<Label = MR::Label>>
MapReduce<G, MR, L>
{
/// Pushes every node yielded by `nodes`, in order, while logging progress.
///
/// `nodes` must yield each node after all of its dependencies (its
/// successors in the graph).
///
/// # Errors
///
/// Stops at, and returns, the first error raised by the map-reducer.
pub fn run_in_topological_order(
    &mut self,
    nodes: impl Iterator<Item = NodeId>,
) -> Result<(), MR::Error> {
    match self.pop_labels {
        true => self.run_in_topological_order_with_popped_labels(nodes),
        false => self.run_in_topological_order_with_kept_labels(nodes),
    }
}
/// Computes the label of a single node.
///
/// All dependencies of `node` (its successors in the graph) must have been
/// pushed before.
///
/// # Errors
///
/// Returns the error raised by the map-reducer, if any.
pub fn push_node(&mut self, node: NodeId) -> Result<(), MR::Error> {
    match self.pop_labels {
        true => self.push_node_with_popped_labels(node),
        false => self.push_node_with_kept_labels(node),
    }
}
/// Returns a reference to the labels computed so far.
///
/// # Errors
///
/// Fails when this `MapReduce` pops labels during the traversal, which is
/// the case unless `MapReduceBuilder::keep_labels(true)` was used.
pub fn labels(&self) -> Result<&L> {
    ensure!(
        !self.pop_labels,
        "MapReduce::labels() is not available as MapReduceBuilder::keep_labels() was not set to true"
    );
    Ok(&self.labels)
}
/// Consumes `self` and returns the labels computed so far.
///
/// # Errors
///
/// Fails when this `MapReduce` pops labels during the traversal, which is
/// the case unless `MapReduceBuilder::keep_labels(true)` was used.
pub fn take_labels(self) -> Result<L> {
    ensure!(
        !self.pop_labels,
        "MapReduce::take_labels() is not available as MapReduceBuilder::keep_labels() was not set to true"
    );
    Ok(self.labels)
}
/// Pushes every node of `nodes` through [`Self::push_node_with_kept_labels`],
/// reporting progress along the way.
///
/// On error the traversal stops immediately and the error is returned; the
/// progress logger is then not marked as done.
fn run_in_topological_order_with_kept_labels(
    &mut self,
    nodes: impl Iterator<Item = NodeId>,
) -> Result<(), MR::Error> {
    let mut progress = progress_logger!(
        display_memory = true,
        item_name = "node",
        local_speed = true,
        expected_updates = Some(self.num_nodes),
    );
    progress.start("Traversing graph in topological order...");

    let mut nodes = nodes;
    nodes.try_for_each(|node| {
        // Counted before the node is processed, so a failing node is
        // included in the count.
        progress.light_update();
        self.push_node_with_kept_labels(node)
    })?;

    progress.done();
    Ok(())
}
/// Computes and stores the label of `node`, leaving the labels of its
/// dependencies in place.
///
/// The first labeled dependency is cloned to seed the reduction; labels of
/// the remaining dependencies are passed to the reducer by reference.
///
/// # Panics
///
/// Panics if `node` already has a label.
#[inline]
fn push_node_with_kept_labels(&mut self, node: NodeId) -> Result<(), MR::Error> {
    let mut successors = self.graph.successors(node).into_iter();

    // Advance through the dependencies until one of them has a label, and
    // take an owned copy of that label.
    let seed = successors
        .by_ref()
        .find_map(|succ| self.labels.get(succ).map(ToOwned::to_owned));

    let reduced: Option<<MR::Label as ToOwned>::Owned> = if let Some(seed) = seed {
        // Unlabeled dependencies are skipped by the `flat_map`.
        self.map_reducer
            .reduce(seed, successors.flat_map(|succ| self.labels.get(succ)))?
    } else {
        // No seed means the search above went through every dependency.
        assert!(
            successors.next().is_none(),
            "first_dependency_label is None, but not all dependencies were consumed"
        );
        None
    };

    let node_label = if let Some(reduced) = reduced {
        self.map_reducer.map_reduce(node, reduced)?
    } else {
        self.map_reducer.map(node)?
    };

    self.map_reducer
        .on_node_traversed(node, node_label.as_ref().map(Borrow::borrow))?;

    if let Some(node_label) = node_label {
        let replaced = self.labels.insert(node, node_label);
        assert!(replaced.is_none(), "{node} was labeled twice");
    }
    Ok(())
}
/// Pushes every node of `nodes` through
/// [`Self::push_node_with_popped_labels`], reporting progress along the way.
///
/// On error the traversal stops immediately and the error is returned; the
/// progress logger is then not marked as done.
///
/// In debug builds, checks that the label store is empty once all nodes were
/// pushed.
fn run_in_topological_order_with_popped_labels(
    &mut self,
    nodes: impl Iterator<Item = NodeId>,
) -> Result<(), MR::Error> {
    let mut progress = progress_logger!(
        display_memory = true,
        item_name = "node",
        local_speed = true,
        expected_updates = Some(self.num_nodes),
    );
    progress.start("Traversing graph in topological order...");

    let mut nodes = nodes;
    nodes.try_for_each(|node| {
        // Counted before the node is processed, so a failing node is
        // included in the count.
        progress.light_update();
        self.push_node_with_popped_labels(node)
    })?;

    progress.done();
    debug_assert!(
        self.labels.is_empty(),
        "run_in_topological_order_with_popped_labels ended without clearing its labels store"
    );
    Ok(())
}
/// Computes the label of `node`, and drops the label of every dependency
/// for which `node` was the last dependent left to visit.
///
/// `pending_dependents` tracks, for each pushed node, how many of its
/// dependents (its indegree in the graph) were not pushed yet. Each visit of
/// a dependency decrements its counter; when the counter reaches zero, the
/// entry and the dependency's label are removed. The label of `node` itself
/// is stored only if `node` has at least one dependent.
///
/// # Panics
///
/// Panics if a dependency of `node` has no pending-dependents entry (e.g.
/// it was not pushed before `node`), or if `node` already has a label.
#[inline]
fn push_node_with_popped_labels(&mut self, node: NodeId) -> Result<(), MR::Error> {
    let num_dependents = self.graph.indegree(node);
    if num_dependents > 0 {
        self.pending_dependents.insert(node, num_dependents);
    }

    let mut dependencies = self.graph.successors(node).into_iter();
    let mut merged_label: Option<<MR::Label as ToOwned>::Owned> = None;

    // Look for the first dependency that has a label, to seed the reduction.
    //
    // This must stay a `while let`: `dependencies` is moved into `reduce`
    // below, which is only accepted because that path always leaves the loop.
    while let Some(first_dependency) = dependencies.next() {
        let pending_dependants = self
            .pending_dependents
            .get_mut(&first_dependency)
            .unwrap_or_else(|| panic!("Node {node} depends on node {first_dependency} but the latter's label was not computed (yet?). Check the topological order is complete."));
        let first_dependency_label = if *pending_dependants == 1 {
            // `node` is the last dependent: take ownership of the label
            // instead of cloning it.
            self.pending_dependents.remove(&first_dependency);
            self.labels.remove(first_dependency)
        } else {
            // Other dependents still need this label, so work on a copy.
            *pending_dependants -= 1;
            self.labels.get(first_dependency).map(
                |l: &MR::Label| -> <MR::Label as std::borrow::ToOwned>::Owned { l.to_owned() },
            )
        };

        if let Some(first_dependency_label) = first_dependency_label {
            // Labels cannot be removed while the reducer may still borrow
            // them, so removals are deferred until `reduce` returns.
            let mut dependencies_to_remove = SmallVec::<[_; 1]>::new();
            merged_label = self.map_reducer.reduce(
                first_dependency_label,
                // Counters are updated lazily, as the reducer pulls items.
                // NOTE(review): a reducer that does not drain this iterator
                // leaves the counters of the remaining dependencies
                // untouched; confirm the `MapReducer::reduce` contract.
                dependencies.flat_map(|dep| {
                    let pending_dependants = self
                        .pending_dependents
                        .get_mut(&dep)
                        .unwrap_or_else(|| panic!("Node {node} depends on node {dep} but the latter's label was not computed (yet?). Check the topological order is complete."));
                    *pending_dependants -= 1;
                    if *pending_dependants == 0 {
                        dependencies_to_remove.push(dep);
                    }
                    self.labels.get(dep)
                }),
            )?;
            for dep in dependencies_to_remove {
                self.pending_dependents.remove(&dep);
                self.labels.remove(dep);
            }
            break;
        }
    }

    let label = match merged_label {
        Some(merged_label) => self.map_reducer.map_reduce(node, merged_label)?,
        None => self.map_reducer.map(node)?,
    };
    self.map_reducer
        .on_node_traversed(node, label.as_ref().map(Borrow::borrow))?;

    if num_dependents > 0 {
        if let Some(label) = label {
            let previous_label = self.labels.insert(node, label);
            assert!(previous_label.is_none(), "{node} was labeled twice");
        }
    } else {
        // Nothing will ever read this label, so it is not stored.
        assert!(
            !self.labels.contains_key(node),
            "{node} was already labeled"
        );
    }
    Ok(())
}
}