swh_graph/compress/
transform.rs1use std::num::NonZeroUsize;
7use std::path::PathBuf;
8
9use anyhow::{Context, Result};
10use dsi_bitstream::prelude::BE;
11use dsi_progress_logger::{concurrent_progress_logger, ProgressLog};
12use itertools::Itertools;
13use lender::{IntoIteratorExt, IntoLender, Lender};
14use rayon::prelude::*;
15use webgraph::graphs::arc_list_graph::ArcListGraph;
16use webgraph::prelude::*;
17use webgraph::utils::ParSortPairs;
18
19pub fn transform<F, G, Iter>(
22 partitions_per_thread: usize,
23 graph: G,
24 transformation: F,
25 target_path: PathBuf,
26) -> Result<()>
27where
28 F: Fn(usize, usize) -> Iter + Send + Sync,
29 Iter: IntoIterator<Item = (usize, usize), IntoIter: Send + Sync>,
30 G: SplitLabeling<Label=usize>,
31 for<'a> <<G as SplitLabeling>::IntoIterator<'a> as IntoIterator>::IntoIter: Send + Sync,
32 for<'a, 'b> <<<G as SplitLabeling>::SplitLender<'a> as NodeLabelsLender<'b>>::IntoIterator as IntoIterator>::IntoIter: Send + Sync,
33{
34 let num_nodes = graph.num_nodes();
36
37 let temp_dir = tempfile::tempdir().context("Could not create temporary directory")?;
38
39 let num_threads = num_cpus::get();
40 let num_partitions = num_threads * partitions_per_thread;
41 let nodes_per_partition = num_nodes.div_ceil(num_partitions);
42
43 let num_partitions = num_nodes.div_ceil(nodes_per_partition);
45
46 log::info!(
47 "Transforming {} nodes with {} threads, {} partitions, {} nodes per partition",
48 num_nodes,
49 num_threads,
50 num_partitions,
51 nodes_per_partition,
52 );
53
54 let mut pl = concurrent_progress_logger!(
55 display_memory = true,
56 item_name = "node",
57 expected_updates = Some(num_nodes),
58 local_speed = true,
59 );
60 pl.start("Reading and sorting...");
61
62 let pair_sorter =
64 ParSortPairs::new(num_nodes)?.num_partitions(NonZeroUsize::new(num_partitions).unwrap());
65 let transformation = &transformation;
66 let sorted_arcs = {
67 let pl = pl.clone();
68 pair_sorter
69 .sort(
70 graph
71 .split_iter(num_partitions)
72 .into_iter()
73 .collect::<Vec<_>>()
74 .into_par_iter()
75 .flat_map_iter(move |partition| {
76 let mut pl = pl.clone();
77 partition
78 .flat_map(lender::covar_mut!(
79 #![with<'g, G: SplitLabeling<Label=usize>>]
80 for<'lend>
81 move |(src, succ): (usize, <<G as SplitLabeling>::SplitLender<'g> as NodeLabelsLender<'lend>>::IntoIterator)|
82 -> lender::FromIter<std::vec::IntoIter<(usize, usize)>> {
83 let transformed_succ: Vec<_> = succ
84 .into_iter()
85 .flat_map(move |dst: usize| transformation(src, dst).into_iter())
86 .collect();
87 pl.light_update();
88 transformed_succ.into_into_lender().into_lender()
89 }
90 ))
91 .iter()
92 }),
93 )
94 .context("Could not sort arcs")?
95 };
96 pl.done();
97
98 let arc_list_graphs = Vec::from(sorted_arcs.iters).into_iter().enumerate().map(
99 |(partition_id, sorted_arcs_partition)| {
100 ArcListGraph::new(num_nodes, sorted_arcs_partition.into_iter().dedup())
101 .iter_from(sorted_arcs.boundaries[partition_id])
102 .take(
103 sorted_arcs.boundaries[partition_id + 1]
104 .checked_sub(sorted_arcs.boundaries[partition_id])
105 .expect("sorted_arcs.boundaries is not sorted"),
106 )
107 },
108 );
109
110 BvComp::with_basename(target_path)
111 .par_comp_lenders::<BE, _>(arc_list_graphs, num_nodes)
112 .context("Could not build BVGraph from arcs")?;
113
114 drop(temp_dir); Ok(())
117}