swh-graph-stdlib 13.0.0

A library of algorithms and data structures for swh-graph. See the crate documentation for details.
// Copyright (C) 2023-2026  The Software Heritage developers
// See the AUTHORS file at the top-level directory of this distribution
// License: GNU General Public License version 3, or any later version
// See top-level LICENSE file for more information

use std::io::{BufReader, Read};

use anyhow::{Context, Result};
use itertools::Itertools;
use rayon::prelude::*;
use swh_graph::graph::{NodeId, SwhGraphWithProperties};

/// Reads CSV records from a file, and queues their SWHIDs and node ids to `tx`,
/// preserving the order.
///
/// This is equivalent to:
///
/// ```no_compile
/// std::thread::spawn(move || -> Result<()> {
///     let mut reader = csv::ReaderBuilder::new()
///         .has_headers(true)
///         .from_reader(reader);
///
///     for record in reader.deserialize() {
///         let InputRecord { swhid, .. } =
///             record.context("Could not deserialize record")?;
///         let node = graph
///             .properties()
///             .node_id_from_string_swhid(swhid)
///             .with_context(|| format!("Unknown SWHID: {swhid}"))?;
///
///         tx.send((swhid, node))
///     }
/// });
/// ```
///
/// but uses inner parallelism as `node_id_from_string_swhid()` could otherwise be a
/// bottleneck on systems where accessing `graph.order` has high latency (network
/// and/or compressed filesystem). This reduces the runtime from a couple of weeks to
/// less than a day on the 2023-09-06 graph on a ZSTD-compressed ZFS.
///
/// `reader` is buffered internally.
///
/// # Errors
///
/// Returns an error if the CSV input is malformed, lacks a `column_name` header,
/// contains a cell that is not a valid SWHID, or contains a SWHID absent from the
/// graph.
pub fn queue_nodes_from_swhids_csv<G, R: Read + Send>(
    graph: &G,
    reader: R,
    column_name: &str,
    tx: std::sync::mpsc::SyncSender<Box<[(String, NodeId)]>>,
    batch_size: usize,
) -> Result<()>
where
    G: SwhGraphWithProperties + Sync,
    <G as SwhGraphWithProperties>::Maps: swh_graph::properties::Maps,
{
    // Workers in this function block while pushing data to a queue.
    // As that queue is read by other Rayon workers, using a shared thread pool
    // risks deadlocks: this function could block every thread of the pool,
    // leaving no thread to drain the queue. So use a dedicated pool.
    let pool = rayon::ThreadPoolBuilder::new()
        .build()
        .context("Could not build thread pool")?;
    pool.install(|| {
        let mut reader = csv::ReaderBuilder::new()
            .has_headers(true)
            .from_reader(BufReader::new(reader));

        // Makes sure the input at least has a header, even when there is no payload
        let column_id = reader
            .headers()
            .context("Invalid header in input")?
            .iter()
            .position(|item| item == column_name)
            // Report the actual configured column name, not a hardcoded 'swhid'.
            .with_context(|| format!("Input has no '{}' header", column_name))?;

        reader
            .records()
            .chunks(batch_size)
            .into_iter()
            .try_for_each(|chunk| {
                // Resolve this chunk's SWHIDs to node ids in parallel;
                // the par_iter collect() below reassembles results **IN ORDER**.
                let results: Result<Box<[_]>> = chunk
                    .collect::<Vec<Result<csv::StringRecord, _>>>()
                    .into_par_iter()
                    .map(|record| {
                        let record = record.context("Could not parse record")?;
                        let swhid = record.get(column_id).context("Missing cell")?;
                        let swhid = swhid.parse().context("Could not parse SWHID")?;

                        let node = graph
                            .properties()
                            .node_id_from_string_swhid(&swhid)
                            // Include the offending SWHID, as promised by the doc
                            // example above; without it lookup failures are opaque.
                            .with_context(|| format!("Unknown SWHID: {}", swhid))?;
                        Ok((swhid, node))
                    })
                    .collect();

                let results = results?;

                // Push the whole (already ordered) batch to the consumer.
                tx.send(results).expect("Could not send (swhid, node_id)");

                Ok::<_, anyhow::Error>(())
            })?;

        Ok(())
    })
}