use std::num::NonZeroUsize;
use std::path::PathBuf;
use anyhow::{Context, Result};
use dsi_bitstream::prelude::BE;
use dsi_progress_logger::{concurrent_progress_logger, ProgressLog};
use itertools::Itertools;
use lender::{IntoIteratorExt, IntoLender, Lender};
use rayon::prelude::*;
use webgraph::graphs::arc_list_graph::ArcListGraph;
use webgraph::prelude::*;
use webgraph::utils::ParSortPairs;
pub fn transform<F, G, Iter>(
partitions_per_thread: usize,
graph: G,
transformation: F,
target_path: PathBuf,
) -> Result<()>
where
F: Fn(usize, usize) -> Iter + Send + Sync,
Iter: IntoIterator<Item = (usize, usize), IntoIter: Send + Sync>,
G: SplitLabeling<Label=usize>,
for<'a> <<G as SplitLabeling>::IntoIterator<'a> as IntoIterator>::IntoIter: Send + Sync,
for<'a, 'b> <<<G as SplitLabeling>::SplitLender<'a> as NodeLabelsLender<'b>>::IntoIterator as IntoIterator>::IntoIter: Send + Sync,
{
let num_nodes = graph.num_nodes();
let temp_dir = tempfile::tempdir().context("Could not create temporary directory")?;
let num_threads = num_cpus::get();
let num_partitions = num_threads * partitions_per_thread;
let nodes_per_partition = num_nodes.div_ceil(num_partitions);
let num_partitions = num_nodes.div_ceil(nodes_per_partition);
log::info!(
"Transforming {} nodes with {} threads, {} partitions, {} nodes per partition",
num_nodes,
num_threads,
num_partitions,
nodes_per_partition,
);
let mut pl = concurrent_progress_logger!(
display_memory = true,
item_name = "node",
expected_updates = Some(num_nodes),
local_speed = true,
);
pl.start("Reading and sorting...");
let pair_sorter =
ParSortPairs::new(num_nodes)?.num_partitions(NonZeroUsize::new(num_partitions).unwrap());
let transformation = &transformation;
let sorted_arcs = {
let pl = pl.clone();
pair_sorter
.sort(
graph
.split_iter(num_partitions)
.into_iter()
.collect::<Vec<_>>()
.into_par_iter()
.flat_map_iter(move |partition| {
let mut pl = pl.clone();
partition
.flat_map(lender::covar_mut!(
#![with<'g, G: SplitLabeling<Label=usize>>]
for<'lend>
move |(src, succ): (usize, <<G as SplitLabeling>::SplitLender<'g> as NodeLabelsLender<'lend>>::IntoIterator)|
-> lender::FromIter<std::vec::IntoIter<(usize, usize)>> {
let transformed_succ: Vec<_> = succ
.into_iter()
.flat_map(move |dst: usize| transformation(src, dst).into_iter())
.collect();
pl.light_update();
transformed_succ.into_into_lender().into_lender()
}
))
.iter()
}),
)
.context("Could not sort arcs")?
};
pl.done();
let arc_list_graphs = Vec::from(sorted_arcs.iters).into_iter().enumerate().map(
|(partition_id, sorted_arcs_partition)| {
ArcListGraph::new(num_nodes, sorted_arcs_partition.into_iter().dedup())
.iter_from(sorted_arcs.boundaries[partition_id])
.take(
sorted_arcs.boundaries[partition_id + 1]
.checked_sub(sorted_arcs.boundaries[partition_id])
.expect("sorted_arcs.boundaries is not sorted"),
)
},
);
BvComp::with_basename(target_path)
.par_comp_lenders::<BE, _>(arc_list_graphs.into_iter(), num_nodes)
.context("Could not build BVGraph from arcs")?;
drop(temp_dir);
Ok(())
}