Skip to main content

swh_graph_stdlib/io/
mod.rs

1// Copyright (C) 2023-2026  The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6use std::io::{BufReader, Read};
7
8use anyhow::{Context, Result};
9use itertools::Itertools;
10use rayon::prelude::*;
11use swh_graph::graph::{NodeId, SwhGraphWithProperties};
12
13/// Reads CSV records from a file, and queues their SWHIDs and node ids to `tx`,
14/// preserving the order.
15///
16/// This is equivalent to:
17///
18/// ```no_compile
19/// std::thread::spawn(move || -> Result<()> {
20///     let mut reader = csv::ReaderBuilder::new()
21///         .has_headers(true)
22///         .from_reader(reader);
23///
24///     for record in reader.deserialize() {
25///         let InputRecord { swhid, .. } =
26///             record.with_context(|| format!("Could not deserialize record"))?;
27///         let node = graph
28///             .properties()
29///             .node_id_from_string_swhid(swhid)
30///             .with_context(|| format!("Unknown SWHID: {}", swhid))?;
31///
32///         tx.send((swhid, node))
33///     }
34/// });
35/// ```
36///
37/// but uses inner parallelism as `node_id()` could otherwise be a bottleneck on systems
38/// where accessing `graph.order` has high latency (network and/or compressed filesystem).
39/// This reduces the runtime from a couple of weeks to less than a day on the 2023-09-06
40/// graph on a ZSTD-compressed ZFS.
41///
42/// `reader` is buffered internally.
43pub fn queue_nodes_from_swhids_csv<G, R: Read + Send>(
44    graph: &G,
45    reader: R,
46    column_name: &str,
47    tx: std::sync::mpsc::SyncSender<Box<[(String, NodeId)]>>,
48    batch_size: usize,
49) -> Result<()>
50where
51    G: SwhGraphWithProperties + Sync,
52    <G as SwhGraphWithProperties>::Maps: swh_graph::properties::Maps,
53{
54    // Workers in this function block while pushing data to a queue.
55    // as that queue is read by other Rayon workers, using a shared thread pool
56    // risks deadlocks, as this function can block all the threads, leaving no thread
57    // to fill the queue.
58    let pool = rayon::ThreadPoolBuilder::new()
59        .build()
60        .context("Could not build thread pool")?;
61    pool.install(|| {
62        let mut reader = csv::ReaderBuilder::new()
63            .has_headers(true)
64            .from_reader(BufReader::new(reader));
65
66        // Makes sure the input at least has a header, even when there is no payload
67        let column_id = reader
68            .headers()
69            .context("Invalid header in input")?
70            .iter()
71            .position(|item| item == column_name)
72            .context("Input has no 'swhid' header")?;
73
74        reader
75            .records()
76            .chunks(batch_size)
77            .into_iter()
78            .try_for_each(|chunk| {
79                // Process entries of this chunk in parallel
80                let results: Result<Box<[_]>> = chunk
81                    .collect::<Vec<Result<csv::StringRecord, _>>>()
82                    .into_par_iter()
83                    .map(|record| {
84                        let record = record.context("Could not parse record")?;
85                        let swhid = record.get(column_id).context("Missing cell")?;
86                        let swhid = swhid.parse().context("Could not parse SWHID")?;
87
88                        let node = graph.properties().node_id_from_string_swhid(&swhid)?;
89                        Ok((swhid, node))
90                    })
91                    .collect();
92
93                let results = results?;
94
95                // Then collect them **IN ORDER** before pushing to 'tx'.
96                tx.send(results).expect("Could not send (swhid, node_id)");
97
98                Ok::<_, anyhow::Error>(())
99            })?;
100
101        Ok(())
102    })
103}