use std::collections::HashMap;
use std::fmt::Display;
use std::fs::File;
use std::io::BufWriter;
use std::num::NonZeroUsize;
use std::ops::Range;
use std::path::Path;
use anyhow::Result;
use anyhow::{Context, Error, anyhow};
use dsi_bitstream::traits::BigEndian;
use dsi_progress_logger::{ProgressLog, concurrent_progress_logger};
use epserde::ser::Serialize;
use lender::{IntoLender, Lender};
#[allow(unused_imports)] use log::{self, debug, info, warn};
use rayon::prelude::*;
use swh_graph::graph::SwhGraph;
use swh_graph::views::{Subgraph, Transposed};
use swh_graph::{graph::*, properties};
use webgraph::graphs::arc_list_graph::ArcListGraph;
use webgraph::labels::LeftIterator;
use webgraph::prelude::{BvComp, BvGraphSeq};
use webgraph::traits::SequentialLabeling;
use webgraph::utils::par_sort_pairs::ParSortPairs;
use super::Database;
use crate::iter_nodes;
/// Computes a project id for every revision in `graph` and writes the
/// resulting mappings under `output_path`.
///
/// Revisions reached by the same set of root revisions share a project id
/// (see `write_project_id_to_roots`, which performs the assignment).
///
/// Returns statistics about root revisions together with the map from each
/// set of root revisions to its project id.
///
/// # Errors
///
/// Fails if the output directory cannot be created, or if any of the
/// intermediate listing/writing/indexing steps fails.
pub fn compute_project_ids<G>(
    graph: &G,
    num_partitions: usize,
    output_path: &Path,
) -> Result<(InitialRevStats, HashMap<Vec<usize>, usize>)>
where
    G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties<Maps: properties::Maps> + Sync,
{
    std::fs::create_dir_all(output_path)
        .with_context(|| format!("Could not create directory {}", output_path.display()))?;
    let db = Database {
        base_path: output_path.to_path_buf(),
    };
    let rev_graph = Subgraph::with_node_constraint(&graph, "rev".parse().unwrap());
    // Fall back to counting revision nodes one by one when the subgraph
    // cannot report its node count directly.
    let num_revs = rev_graph.actual_num_nodes().unwrap_or_else(|_| {
        let pl = concurrent_progress_logger!(
            item_name = "rev",
            display_memory = false,
            expected_updates = graph.actual_num_nodes().ok(),
        );
        rev_graph.par_iter_nodes(pl).count()
    });
    let root_revs: Vec<_> =
        crate::vcs::list_root_revisions(graph).context("Could not list root revisions")?;
    write_rev_to_roots(&graph, &root_revs, num_revs, num_partitions, &db)
        .context("Could not write rev_to_roots")?;
    // Free the root list before the memory-hungry steps below.
    drop(root_revs);
    db.index_rev_to_roots()?;
    let (stats, rev_set_to_project_id) = write_project_id_to_roots(num_partitions, &db)
        .context("Could not write project_id_to_roots")?;
    db.index_project_id_to_roots()?;
    write_rev_to_project_id(&graph, &rev_set_to_project_id, num_revs, &db)
        .context("Could not write rev_to_project_id")?;
    Ok((stats, rev_set_to_project_id))
}
/// Writes the "rev_to_roots" BvGraph: for each node visited by a traversal
/// starting from a root revision (on the transposed revision subgraph), one
/// `(reached_node, root_rev)` arc.
fn write_rev_to_roots<G>(
    graph: &G,
    root_revs: &[NodeId],
    num_revs: usize,
    num_partitions: usize,
    db: &Database,
) -> Result<()>
where
    G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties<Maps: properties::Maps> + Sync,
{
    let num_nodes = graph.num_nodes();
    // Restrict the traversal to revision nodes only.
    let graph = Subgraph::with_node_constraint(graph, "rev".parse().unwrap());
    info!(
        "Found {} root revisions out of {num_revs} revisions and {num_nodes} nodes",
        root_revs.len()
    );
    // NOTE(review): `pl` is started here but never updated (no light_update)
    // and never finished (no pl.done()) — confirm this is intentional.
    let mut pl = concurrent_progress_logger!(
        item_name = "root rev",
        display_memory = false,
        expected_updates = Some(root_revs.len()),
    );
    pl.start("Processing root revs...");
    // Reverse the edges so that traversing from a root revision visits the
    // revisions it reaches. NOTE(review): which revisions that means depends
    // on the orientation of rev→rev edges in the underlying graph — with
    // SWH's child→parent convention this visits descendants; confirm.
    let transposed_graph = Transposed(graph);
    // Lazily produce one (reached_node, root_rev) pair per visited node,
    // in parallel over the root revisions.
    let unsorted_pairs = root_revs.par_iter().flat_map_iter(|&root_rev| {
        iter_nodes(&transposed_graph, [root_rev])
            .map(move |reached_node| -> Result<_> { Ok((reached_node, root_rev)) })
    });
    save_unsorted_as_bvgraph(
        unsorted_pairs,
        &db.rev_to_roots_path(),
        num_nodes,
        num_partitions,
    )
}
/// Assigns a project id to every distinct set of root revisions found in the
/// "rev_to_roots" BvGraph, accumulates `InitialRevStats`, and writes the
/// inverse mapping (project id → root revisions) as a BvGraph.
///
/// Project ids are dense and assigned in order of first appearance.
///
/// Returns the stats and the map from each root-revision set to its id.
fn write_project_id_to_roots(
    num_partitions: usize,
    db: &Database,
) -> Result<(InitialRevStats, HashMap<Vec<usize>, usize>)> {
    info!("Computing project ids and stats");
    let rev_to_roots = BvGraphSeq::with_basename(db.rev_to_roots_path()).load()?;
    let mut stats = InitialRevStats::default();
    let mut rev_set_to_project_id = HashMap::new();
    // project_id_to_rev_set[id] is the set of roots assigned to `id`;
    // kept in insertion order so ids stay dense.
    let mut project_id_to_rev_set: Vec<Vec<usize>> = Vec::new();
    rev_to_roots.into_lender().for_each(|(rev, root_revs)| {
        let root_rev_set: Vec<usize> = root_revs.collect();
        if root_rev_set.is_empty() {
            // Not reached by any root revision: no project.
            return;
        }
        stats.increment(rev, &root_rev_set);
        // Assign a fresh id the first time this exact root set is seen.
        // Unlike entry(root_rev_set.clone()), this clones the (possibly
        // large) set only on a miss, not once per revision.
        if !rev_set_to_project_id.contains_key(&root_rev_set) {
            rev_set_to_project_id.insert(root_rev_set.clone(), project_id_to_rev_set.len());
            project_id_to_rev_set.push(root_rev_set);
        }
    });
    info!("Computing the BvGraph mapping project ids to root revisions");
    // For a range of project ids, yield the (project_id, root_rev) arcs of
    // every project in that range, in sorted order.
    let get_partition = |range: Range<usize>| {
        project_id_to_rev_set[range.clone()]
            .iter()
            .enumerate()
            .flat_map(move |(relative_project_id, revs)| {
                revs.iter()
                    .map(move |rev| (range.start + relative_project_id, *rev))
            })
    };
    save_partitioned_sorted_as_bvgraph(
        get_partition,
        &db.project_id_to_roots_path(),
        project_id_to_rev_set.len(),
        num_partitions,
    )?;
    Ok((stats, rev_set_to_project_id))
}
/// Writes the "rev_to_project_id" mapping: a sparse partial array over the
/// full node-id space, holding the project id of every revision reached by
/// at least one root revision, serialized with epserde.
fn write_rev_to_project_id<G>(
    graph: &G,
    rev_set_to_project_id: &HashMap<Vec<NodeId>, usize>,
    num_revs: usize,
    db: &Database,
) -> Result<()>
where
    G: SwhGraph + SwhGraphWithProperties<Maps: properties::Maps>,
{
    let rev_to_roots = BvGraphSeq::with_basename(db.rev_to_roots_path()).load()?;
    let mut pl = concurrent_progress_logger!(
        item_name = "rev",
        display_memory = false,
        expected_updates = Some(num_revs),
    );
    pl.start("Computing the BvGraph mapping revisions to project ids...");
    // Builder sized for the whole node-id space but expecting only
    // `num_revs` entries to actually be set.
    let mut pab = sux::array::partial_array::new_sparse(graph.num_nodes(), num_revs);
    rev_to_roots.into_lender().for_each(|(rev, root_revs)| {
        let root_revs: Vec<_> = root_revs.collect();
        if root_revs.is_empty() {
            // Not reached by any root revision: no project id to record.
            return;
        }
        // Every non-empty root set was assigned an id by
        // write_project_id_to_roots (which reads the same BvGraph), so a
        // miss here is a bug, not a recoverable error.
        let project_id = rev_set_to_project_id
            .get(&root_revs)
            .expect("Set of root revisions could not be mapped to project id");
        pab.set(rev, *project_id);
        pl.light_update();
    });
    pl.done();
    let pa = pab.build();
    let path = db.rev_to_project_id_path();
    let file =
        File::create(&path).with_context(|| format!("Could not create {}", path.display()))?;
    // NOTE(review): `serialize` is unsafe in this epserde version; confirm
    // the soundness requirements documented under its `# Safety` section.
    unsafe { pa.serialize(&mut BufWriter::new(file)) }
        .context("Could not serialize rev_to_project_id")?;
    Ok(())
}
/// Sorts an unsorted parallel stream of `(src, dst)` pairs and compresses the
/// result as a BvGraph at `path`.
///
/// # Errors
///
/// Fails if `num_partitions` is zero, if the sorter cannot be created, or if
/// sorting/compression fails; errors carried by `unsorted_pairs` items are
/// propagated by `try_sort`.
fn save_unsorted_as_bvgraph<I>(
    unsorted_pairs: I,
    path: &Path,
    num_nodes: usize,
    num_partitions: usize,
) -> Result<()>
where
    I: ParallelIterator<Item = Result<(usize, usize), Error>>,
{
    let mut pl = concurrent_progress_logger!(
        item_name = "rev",
        display_memory = false,
        expected_updates = Some(num_nodes),
    );
    pl.start("Sorting pairs...");
    // Propagate sorter-construction errors instead of panicking with
    // unwrap() — this function already returns Result.
    let pair_sorter = ParSortPairs::new(num_nodes)
        .context("Could not create pair sorter")?
        .expected_num_pairs(num_nodes)
        .num_partitions(
            // ok_or_else: avoid building the error value on the happy path.
            NonZeroUsize::new(num_partitions)
                .ok_or_else(|| anyhow!("num_partitions cannot be zero"))?,
        );
    let sorted_arcs: Vec<LeftIterator<_>> = pair_sorter.try_sort(unsorted_pairs)?.into();
    pl.done();
    BvComp::with_basename(path)
        .par_comp_lenders::<BigEndian, _>(sorted_arcs.into_iter(), num_nodes)?;
    Ok(())
}
/// Compresses already-sorted arcs as a BvGraph at `path`, splitting the node
/// id space into `num_partitions` contiguous ranges whose arcs are produced
/// lazily by `get_partition`.
fn save_partitioned_sorted_as_bvgraph<I>(
    get_partition: impl Fn(Range<usize>) -> I,
    path: &Path,
    num_nodes: usize,
    num_partitions: usize,
) -> Result<()>
where
    I: Iterator<Item = (usize, usize)> + Clone + Send,
{
    // Each partition covers `partition_size` node ids, except the last one,
    // which is truncated to `num_nodes`.
    let partition_size = num_nodes.div_ceil(num_partitions);
    let split_graphs = (0..num_partitions).filter_map(|partition_id| {
        let start = partition_id * partition_size;
        if start >= num_nodes {
            // Trailing partitions past the end of the id space are empty;
            // skip them entirely.
            return None;
        }
        let end = usize::min(start + partition_size, num_nodes);
        let arcs = get_partition(start..end);
        Some(ArcListGraph::new(end, arcs).iter_from(start))
    });
    BvComp::with_basename(path).par_comp_lenders::<BigEndian, _>(split_graphs, num_nodes)?;
    Ok(())
}
/// Statistics about how many root revisions reach each revision.
#[derive(Debug, Default)]
pub struct InitialRevStats {
    // Largest number of root revisions reaching any single counted revision.
    max_root_revs: usize,
    // A revision achieving `max_root_revs`, if any revision was counted.
    rev_with_most_root_revs: Option<usize>,
    // Number of revisions reached by at least one root revision.
    rev_count: usize,
    // Number of counted revisions that are themselves root revisions.
    root_rev_count: usize,
    // Revisions reached by exactly 1, 2, 3, or 4 root revisions; the 5+
    // bucket is derived by subtraction in the Display impl.
    reachable_from_one: usize,
    reachable_from_two: usize,
    reachable_from_three: usize,
    reachable_from_four: usize,
}
impl InitialRevStats {
    /// Records one revision `rev` together with the sorted set of root
    /// revisions reaching it. Empty sets are ignored.
    fn increment(&mut self, rev: usize, root_revs: &[usize]) {
        let Some(&first_root_rev) = root_revs.first() else {
            // Not reached by any root revision: nothing to count.
            return;
        };
        let total = root_revs.len();
        self.rev_count += 1;
        // A root revision has no ancestor revisions, so the only root that
        // reaches it is itself; since successor sets are sorted, checking
        // the first element suffices.
        if first_root_rev == rev {
            self.root_rev_count += 1;
        }
        match total {
            1 => self.reachable_from_one += 1,
            2 => self.reachable_from_two += 1,
            3 => self.reachable_from_three += 1,
            4 => self.reachable_from_four += 1,
            _ => {} // 5+: reported by subtraction in Display
        }
        // Fixed: the previous version skipped this update when `rev` was a
        // root revision, so `max_root_revs` stayed 0 (and the witness stayed
        // None) whenever every counted revision was a root.
        if total > self.max_root_revs {
            self.max_root_revs = total;
            self.rev_with_most_root_revs = Some(rev);
        }
    }
}
impl Display for InitialRevStats {
    /// Renders the statistics as a small human-readable report; each counter
    /// is shown together with its percentage of `rev_count`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Formats a counter as "count (xx.xx%)" relative to all counted revs.
        // NOTE(review): prints "NaN%" when rev_count == 0 (0.0 / 0.0).
        let percentage = |count| {
            format!(
                "{count} ({:.2}%)",
                100. * (count as f64) / (self.rev_count as f64)
            )
        };
        writeln!(f, "Statistics about root revisions")?;
        writeln!(f, "----------------------------------")?;
        writeln!(f, "Revisions: {}", self.rev_count)?;
        writeln!(f, "Initial revisions: {}", percentage(self.root_rev_count))?;
        writeln!(f, "Initial revisions per reached revision:")?;
        writeln!(f, "  1: {}", percentage(self.reachable_from_one))?;
        writeln!(f, "  2: {}", percentage(self.reachable_from_two))?;
        writeln!(f, "  3: {}", percentage(self.reachable_from_three))?;
        writeln!(f, "  4: {}", percentage(self.reachable_from_four))?;
        // Revisions reached by 5 or more roots are everything not counted
        // in the 1-4 buckets above.
        writeln!(
            f,
            " 5+: {}",
            percentage(
                self.rev_count
                    - self.reachable_from_one
                    - self.reachable_from_two
                    - self.reachable_from_three
                    - self.reachable_from_four
            )
        )?;
        writeln!(
            f,
            "Maximum number of root revisions reaching a revision: {}",
            self.max_root_revs
        )
    }
}