swh-graph-stdlib 13.0.0

Library of algorithms and data structures for swh-graph
Documentation
// Copyright (C) 2023-2026  The Software Heritage developers
// See the AUTHORS file at the top-level directory of this distribution
// License: GNU General Public License version 3, or any later version
// See top-level LICENSE file for more information

use std::collections::HashMap;
use std::fmt::Display;
use std::fs::File;
use std::io::BufWriter;
use std::num::NonZeroUsize;
use std::ops::Range;
use std::path::Path;

use anyhow::Result;
use anyhow::{Context, Error, anyhow};
use dsi_bitstream::traits::BigEndian;
use dsi_progress_logger::{ProgressLog, concurrent_progress_logger};
use epserde::ser::Serialize;
use lender::{IntoLender, Lender};
#[allow(unused_imports)] // to keep debug!() around
use log::{self, debug, info, warn};
use rayon::prelude::*;
use swh_graph::graph::SwhGraph;
use swh_graph::views::{Subgraph, Transposed};
use swh_graph::{graph::*, properties};
use webgraph::graphs::arc_list_graph::ArcListGraph;
use webgraph::labels::LeftIterator;
use webgraph::prelude::{BvComp, BvGraphSeq};
use webgraph::traits::SequentialLabeling;
use webgraph::utils::par_sort_pairs::ParSortPairs;

use super::Database;
use crate::iter_nodes;

/// Computes the data structures relating revisions to project ids and writes them to
/// `output_path`:
///
/// * `rev_to_roots`: BV graph from a revision to the root revisions that it refers to
/// * `project_id_to_roots`: BV graph from a project id to the set of root revisions that it
///   corresponds to
/// * `rev_to_project_id`: from a revision to its project id (serialized partial array)
///
/// `num_partitions` is the number of partitions used when sorting and compressing the
/// BV graphs; it must not be zero.
///
/// Returns statistics about root revisions, together with the map from each set of root
/// revisions to its project id.
///
/// # Errors
///
/// Fails if `output_path` cannot be created, if root revisions cannot be listed, or if any of
/// the outputs cannot be written or indexed.
pub fn compute_project_ids<G>(
    graph: &G,
    num_partitions: usize,
    output_path: &Path,
) -> Result<(InitialRevStats, HashMap<Vec<usize>, usize>)>
where
    G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties<Maps: properties::Maps> + Sync,
{
    std::fs::create_dir_all(output_path)
        .with_context(|| format!("Could not create directory {}", output_path.display()))?;

    let db = Database {
        base_path: output_path.to_path_buf(),
    };

    let rev_graph = Subgraph::with_node_constraint(&graph, "rev".parse().unwrap());
    // Counting revisions requires a full scan when the subgraph cannot tell it directly
    let num_revs = rev_graph.actual_num_nodes().unwrap_or_else(|_| {
        let pl = concurrent_progress_logger!(
            item_name = "rev",
            display_memory = false,
            expected_updates = graph.actual_num_nodes().ok(),
        );
        rev_graph.par_iter_nodes(pl).count()
    });

    let root_revs: Vec<_> =
        crate::vcs::list_root_revisions(graph).context("Could not list root revisions")?;

    write_rev_to_roots(&graph, &root_revs, num_revs, num_partitions, &db)
        .context("Could not write rev_to_roots")?;

    drop(root_revs); // free memory

    db.index_rev_to_roots()
        .context("Could not index rev_to_roots")?;

    let (stats, rev_set_to_project_id) = write_project_id_to_roots(num_partitions, &db)
        .context("Could not write project_id_to_roots")?;
    db.index_project_id_to_roots()
        .context("Could not index project_id_to_roots")?;

    write_rev_to_project_id(&graph, &rev_set_to_project_id, num_revs, &db)
        .context("Could not write rev_to_project_id")?;

    Ok((stats, rev_set_to_project_id))
}

/// Writes the `rev_to_roots` BV graph.
///
/// Starting from each of `root_revs`, revisions are traversed following arcs backward within
/// the revision-only subgraph. Every reached node is paired with the root it was reached
/// from, and the resulting pairs are sorted and compressed by [`save_unsorted_as_bvgraph`].
///
/// `num_revs` is only used for logging.
fn write_rev_to_roots<G>(
    graph: &G,
    root_revs: &[NodeId],
    num_revs: usize,
    num_partitions: usize,
    db: &Database,
) -> Result<()>
where
    G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties<Maps: properties::Maps> + Sync,
{
    let num_nodes = graph.num_nodes();
    let graph = Subgraph::with_node_constraint(graph, "rev".parse().unwrap());
    info!(
        "Found {} root revisions out of {num_revs} revisions and {num_nodes} nodes",
        root_revs.len()
    );

    // Gather all the descendants and sort them on temporary files on disk
    let mut pl = concurrent_progress_logger!(
        item_name = "root rev",
        display_memory = false,
        expected_updates = Some(root_revs.len()),
    );
    pl.start("Processing root revs...");
    // we're transposing the graph because `iter_nodes` doesn't support configuring the direction
    let transposed_graph = Transposed(graph);
    let unsorted_pairs = root_revs
        .par_iter()
        // Each rayon job gets its own clone of the logger; clones report to the same
        // underlying logger as `pl`.
        .map_with(pl.clone(), |thread_pl, &root_rev| {
            thread_pl.light_update();
            root_rev
        })
        .flat_map_iter(|root_rev| {
            iter_nodes(&transposed_graph, [root_rev])
                .map(move |reached_node| -> Result<_> { Ok((reached_node, root_rev)) })
        });

    // The pairs are produced lazily, so all root revisions have been processed once this
    // returns.
    save_unsorted_as_bvgraph(
        unsorted_pairs,
        &db.rev_to_roots_path(),
        num_nodes,
        num_partitions,
    )?;
    pl.done();

    Ok(())
}

/// Assigns a project id to each distinct set of root revisions and writes the
/// `project_id_to_roots` BV graph.
///
/// Project ids are dense (`0..number_of_distinct_sets`). They are assigned in the order in
/// which each set of root revisions is first encountered while iterating `rev_to_roots`.
/// Nodes with no root revision are skipped.
///
/// Returns statistics about root revisions, and the map from each set of root revisions to
/// its project id.
fn write_project_id_to_roots(
    num_partitions: usize,
    db: &Database,
) -> Result<(InitialRevStats, HashMap<Vec<usize>, usize>)> {
    // Compute project ids and stats
    info!("Computing project ids and stats");
    let rev_to_roots = BvGraphSeq::with_basename(db.rev_to_roots_path()).load()?;
    let mut stats = InitialRevStats::default();
    let mut rev_set_to_project_id: HashMap<Vec<usize>, usize> = HashMap::new();
    // Inverse of `rev_set_to_project_id`, indexed by project id
    let mut project_id_to_rev_set: Vec<Vec<usize>> = Vec::new();
    rev_to_roots.into_lender().for_each(|(rev, root_revs)| {
        let root_rev_set: Vec<usize> = root_revs.collect();
        if root_rev_set.is_empty() {
            // not a revision
            return;
        }
        stats.increment(rev, &root_rev_set);

        // Probe by slice so that sets which already have a project id do not cause an
        // extra allocation; only new sets are copied (once for the map, once for the Vec).
        if !rev_set_to_project_id.contains_key(root_rev_set.as_slice()) {
            // first time we see this set
            let project_id = project_id_to_rev_set.len();
            rev_set_to_project_id.insert(root_rev_set.clone(), project_id);
            project_id_to_rev_set.push(root_rev_set);
        }
    });

    info!("Computing the BvGraph mapping project ids to root revisions");
    let get_partition = |range: Range<usize>| {
        project_id_to_rev_set[range.clone()]
            .iter()
            .enumerate()
            .flat_map(move |(relative_project_id, revs)| {
                revs.iter()
                    .map(move |rev| (range.start + relative_project_id, *rev))
            })
    };
    save_partitioned_sorted_as_bvgraph(
        get_partition,
        &db.project_id_to_roots_path(),
        project_id_to_rev_set.len(),
        num_partitions,
    )?;

    Ok((stats, rev_set_to_project_id))
}

/// Writes `rev_to_project_id`, mapping each revision to its project id.
///
/// For each node with at least one root revision in `rev_to_roots`, its set of root revisions
/// is looked up in `rev_set_to_project_id`. The result is stored in a sparse partial array
/// with `graph.num_nodes()` slots and `num_revs` entries, serialized with epserde.
///
/// # Panics
///
/// Panics if a set of root revisions is missing from `rev_set_to_project_id`, which would
/// mean it was not built from the same `rev_to_roots`.
///
/// # Errors
///
/// Fails if `rev_to_roots` cannot be loaded, or if the output cannot be created, serialized
/// or flushed.
fn write_rev_to_project_id<G>(
    graph: &G,
    rev_set_to_project_id: &HashMap<Vec<NodeId>, usize>,
    num_revs: usize,
    db: &Database,
) -> Result<()>
where
    G: SwhGraph + SwhGraphWithProperties<Maps: properties::Maps>,
{
    let rev_to_roots = BvGraphSeq::with_basename(db.rev_to_roots_path()).load()?;
    let mut pl = concurrent_progress_logger!(
        item_name = "rev",
        display_memory = false,
        expected_updates = Some(num_revs),
    );
    pl.start("Computing the BvGraph mapping revisions to project ids...");

    let mut pab = sux::array::partial_array::new_sparse(graph.num_nodes(), num_revs);
    rev_to_roots.into_lender().for_each(|(rev, root_revs)| {
        let root_revs: Vec<_> = root_revs.collect();
        if root_revs.is_empty() {
            // rev is not a revision
            return;
        }
        let project_id = rev_set_to_project_id
            .get(&root_revs)
            .expect("Set of root revisions could not be mapped to project id");
        pab.set(rev, *project_id);
        pl.light_update();
    });
    pl.done();
    let pa = pab.build();
    let path = db.rev_to_project_id_path();
    let file =
        File::create(&path).with_context(|| format!("Could not create {}", path.display()))?;
    let mut writer = BufWriter::new(file);
    // NOTE(review): `serialize` is `unsafe` in the epserde version used here; confirm its
    // safety contract is upheld for this partial array.
    unsafe { pa.serialize(&mut writer) }.context("Could not serialize rev_to_project_id")?;
    // Flush explicitly: dropping a BufWriter silently discards any final write error.
    std::io::Write::flush(&mut writer)
        .with_context(|| format!("Could not flush {}", path.display()))?;

    Ok(())
}

/// Sorts `(src, dst)` pairs in parallel and compresses them as a BV graph at `path`.
///
/// `num_nodes` is the number of nodes of the resulting graph. It is also passed to the sorter
/// as the expected number of pairs, which is only an approximation.
///
/// # Errors
///
/// Fails if `num_partitions` is zero, if the pair sorter cannot be created, or if sorting
/// (presumably including errors yielded by `unsorted_pairs`) or compression fails.
fn save_unsorted_as_bvgraph<I>(
    unsorted_pairs: I,
    path: &Path,
    num_nodes: usize,
    num_partitions: usize,
) -> Result<()>
where
    I: ParallelIterator<Item = Result<(usize, usize), Error>>,
{
    // Validate arguments before doing any work
    let num_partitions = NonZeroUsize::new(num_partitions)
        .ok_or_else(|| anyhow!("num_partitions cannot be zero"))?;

    let mut pl = concurrent_progress_logger!(
        item_name = "rev",
        display_memory = false,
        expected_updates = Some(num_nodes), // approximation
    );
    pl.start("Sorting pairs...");
    let pair_sorter = ParSortPairs::new(num_nodes)
        .context("Could not create pair sorter")?
        .expected_num_pairs(num_nodes)
        .num_partitions(num_partitions);

    let sorted_arcs: Vec<LeftIterator<_>> = pair_sorter.try_sort(unsorted_pairs)?.into();
    pl.done();

    // Build a BvGraph out of those temporary files
    BvComp::with_basename(path)
        .par_comp_lenders::<BigEndian, _>(sorted_arcs.into_iter(), num_nodes)?;

    Ok(())
}

/// Save an iterator of iterators of pairs as BVGraph, assuming it is already sorted.
///
/// each item of partitioned_sorted_pairs should be
fn save_partitioned_sorted_as_bvgraph<I>(
    get_partition: impl Fn(Range<usize>) -> I,
    path: &Path,
    num_nodes: usize,
    num_partitions: usize,
) -> Result<()>
where
    I: Iterator<Item = (usize, usize)> + Clone + Send,
{
    let partition_size = num_nodes.div_ceil(num_partitions);
    let split_graphs = (0..num_partitions).filter_map(|partition_id| {
        assert!(partition_id < num_partitions);
        let offset = partition_id * partition_size;
        if offset >= num_nodes {
            // happens on tiny graphs due to partition_size being rounded up
            return None;
        };
        let end = (offset + partition_size).min(num_nodes);

        Some(ArcListGraph::new(end, get_partition(offset..end)).iter_from(offset))
    });

    BvComp::with_basename(path).par_comp_lenders::<BigEndian, _>(split_graphs, num_nodes)?;

    Ok(())
}

/// Statistics about how many root revisions each revision is reachable from.
#[derive(Debug, Default)]
pub struct InitialRevStats {
    /// Largest number of root revisions a single revision is reachable from
    max_root_revs: usize,
    /// A revision reachable from `max_root_revs` root revisions
    rev_with_most_root_revs: Option<usize>,
    /// Number of revisions counted
    rev_count: usize,
    /// Number of revisions that are themselves root revisions
    root_rev_count: usize,
    reachable_from_one: usize,
    reachable_from_two: usize,
    reachable_from_three: usize,
    reachable_from_four: usize,
}

impl InitialRevStats {
    /// Accounts for revision `rev`, whose (sorted) set of root revisions is `root_revs`.
    ///
    /// An empty `root_revs` means `rev` is not a revision, and it is ignored. When the first
    /// root revision is `rev` itself, `rev` is counted as a root revision reachable from a
    /// single root revision.
    fn increment(&mut self, rev: usize, root_revs: &[usize]) {
        let Some(&first_root_rev) = root_revs.first() else {
            return;
        };
        self.rev_count += 1;
        let num_root_revs = if first_root_rev == rev {
            self.root_rev_count += 1;
            1
        } else {
            root_revs.len()
        };
        match num_root_revs {
            1 => self.reachable_from_one += 1,
            2 => self.reachable_from_two += 1,
            3 => self.reachable_from_three += 1,
            4 => self.reachable_from_four += 1,
            _ => {}
        }
        // Root revisions are included too, so that the maximum is at least 1 as soon as any
        // revision was counted
        if num_root_revs > self.max_root_revs {
            self.max_root_revs = num_root_revs;
            self.rev_with_most_root_revs = Some(rev);
        }
    }
}

impl Display for InitialRevStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let percentage = |count: usize| {
            // Avoid printing NaN when no revision was counted
            let ratio = if self.rev_count == 0 {
                0.
            } else {
                100. * (count as f64) / (self.rev_count as f64)
            };
            format!("{count} ({ratio:.2}%)")
        };
        writeln!(f, "Statistics about root revisions")?;
        writeln!(f, "----------------------------------")?;
        writeln!(f, "Revisions: {}", self.rev_count)?;
        writeln!(f, "Initial revisions: {}", percentage(self.root_rev_count))?;
        writeln!(f, "Initial revisions per reached revision:")?;
        writeln!(f, "  1: {}", percentage(self.reachable_from_one))?;
        writeln!(f, "  2: {}", percentage(self.reachable_from_two))?;
        writeln!(f, "  3: {}", percentage(self.reachable_from_three))?;
        writeln!(f, "  4: {}", percentage(self.reachable_from_four))?;
        writeln!(
            f,
            "  5+: {}",
            percentage(
                self.rev_count
                    - self.reachable_from_one
                    - self.reachable_from_two
                    - self.reachable_from_three
                    - self.reachable_from_four
            )
        )?;
        writeln!(
            f,
            "Maximum number of root revisions reaching a revision: {}",
            self.max_root_revs
        )
    }
}