swh_graph_stdlib/io/mod.rs
1// Copyright (C) 2023-2026 The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6use std::io::{BufReader, Read};
7
8use anyhow::{Context, Result};
9use itertools::Itertools;
10use rayon::prelude::*;
11use swh_graph::graph::{NodeId, SwhGraphWithProperties};
12
13/// Reads CSV records from a file, and queues their SWHIDs and node ids to `tx`,
14/// preserving the order.
15///
16/// This is equivalent to:
17///
18/// ```no_compile
19/// std::thread::spawn(move || -> Result<()> {
20/// let mut reader = csv::ReaderBuilder::new()
21/// .has_headers(true)
22/// .from_reader(reader);
23///
24/// for record in reader.deserialize() {
25/// let InputRecord { swhid, .. } =
26/// record.with_context(|| format!("Could not deserialize record"))?;
27/// let node = graph
28/// .properties()
29/// .node_id_from_string_swhid(swhid)
30/// .with_context(|| format!("Unknown SWHID: {}", swhid))?;
31///
32/// tx.send((swhid, node))
33/// }
34/// });
35/// ```
36///
37/// but uses inner parallelism as `node_id()` could otherwise be a bottleneck on systems
38/// where accessing `graph.order` has high latency (network and/or compressed filesystem).
39/// This reduces the runtime from a couple of weeks to less than a day on the 2023-09-06
40/// graph on a ZSTD-compressed ZFS.
41///
42/// `reader` is buffered internally.
43pub fn queue_nodes_from_swhids_csv<G, R: Read + Send>(
44 graph: &G,
45 reader: R,
46 column_name: &str,
47 tx: std::sync::mpsc::SyncSender<Box<[(String, NodeId)]>>,
48 batch_size: usize,
49) -> Result<()>
50where
51 G: SwhGraphWithProperties + Sync,
52 <G as SwhGraphWithProperties>::Maps: swh_graph::properties::Maps,
53{
54 // Workers in this function block while pushing data to a queue.
55 // as that queue is read by other Rayon workers, using a shared thread pool
56 // risks deadlocks, as this function can block all the threads, leaving no thread
57 // to fill the queue.
58 let pool = rayon::ThreadPoolBuilder::new()
59 .build()
60 .context("Could not build thread pool")?;
61 pool.install(|| {
62 let mut reader = csv::ReaderBuilder::new()
63 .has_headers(true)
64 .from_reader(BufReader::new(reader));
65
66 // Makes sure the input at least has a header, even when there is no payload
67 let column_id = reader
68 .headers()
69 .context("Invalid header in input")?
70 .iter()
71 .position(|item| item == column_name)
72 .context("Input has no 'swhid' header")?;
73
74 reader
75 .records()
76 .chunks(batch_size)
77 .into_iter()
78 .try_for_each(|chunk| {
79 // Process entries of this chunk in parallel
80 let results: Result<Box<[_]>> = chunk
81 .collect::<Vec<Result<csv::StringRecord, _>>>()
82 .into_par_iter()
83 .map(|record| {
84 let record = record.context("Could not parse record")?;
85 let swhid = record.get(column_id).context("Missing cell")?;
86 let swhid = swhid.parse().context("Could not parse SWHID")?;
87
88 let node = graph.properties().node_id_from_string_swhid(&swhid)?;
89 Ok((swhid, node))
90 })
91 .collect();
92
93 let results = results?;
94
95 // Then collect them **IN ORDER** before pushing to 'tx'.
96 tx.send(results).expect("Could not send (swhid, node_id)");
97
98 Ok::<_, anyhow::Error>(())
99 })?;
100
101 Ok(())
102 })
103}