use std::io::{BufReader, Read};
use anyhow::{Context, Result};
use itertools::Itertools;
use rayon::prelude::*;
use swh_graph::graph::{NodeId, SwhGraphWithProperties};
/// Reads SWHIDs from a CSV stream and sends them, with their node ids, to `tx` in batches.
///
/// The input must start with a header row. SWHIDs are taken from the column whose
/// header equals `column_name`. Records are pulled from the CSV reader `batch_size`
/// at a time; every record of a batch is resolved to a node id with
/// `graph.properties().node_id_from_string_swhid` on a rayon thread pool built for
/// this call, and the batch is then sent over `tx` as one boxed slice of
/// `(swhid, node_id)` pairs.
///
/// Processing stops at the first failing batch: if any record of a batch fails,
/// that batch is not sent and the error is returned.
///
/// # Errors
///
/// Returns an error if:
/// * `batch_size` is zero,
/// * the thread pool cannot be built,
/// * the header row is invalid or has no column named `column_name`,
/// * a record cannot be parsed or lacks the selected column,
/// * a SWHID cannot be resolved to a node id,
/// * the receiving end of `tx` has been dropped.
pub fn queue_nodes_from_swhids_csv<G, R: Read + Send>(
    graph: &G,
    reader: R,
    column_name: &str,
    tx: std::sync::mpsc::SyncSender<Box<[(String, NodeId)]>>,
    batch_size: usize,
) -> Result<()>
where
    G: SwhGraphWithProperties + Sync,
    <G as SwhGraphWithProperties>::Maps: swh_graph::properties::Maps,
{
    // `Itertools::chunks` panics on a zero chunk size; report it as an error instead.
    anyhow::ensure!(batch_size > 0, "batch_size must be greater than zero");

    // NOTE(review): a pool separate from rayon's global one is used here, presumably so
    // that blocking on the bounded channel does not stall unrelated rayon work — confirm.
    let pool = rayon::ThreadPoolBuilder::new()
        .build()
        .context("Could not build thread pool")?;

    pool.install(|| {
        let mut reader = csv::ReaderBuilder::new()
            .has_headers(true)
            .from_reader(BufReader::new(reader));

        // Index of the requested column within each record.
        let column_id = reader
            .headers()
            .context("Invalid header in input")?
            .iter()
            .position(|item| item == column_name)
            .with_context(|| format!("Input has no '{}' header", column_name))?;

        reader
            .records()
            .chunks(batch_size)
            .into_iter()
            .try_for_each(|chunk| {
                // Records are read sequentially from the CSV reader, then the batch is
                // resolved to node ids in parallel.
                let results: Result<Box<[_]>> = chunk
                    .collect::<Vec<Result<csv::StringRecord, _>>>()
                    .into_par_iter()
                    .map(|record| {
                        let record = record.context("Could not parse record")?;
                        // The cell is kept as an owned string; validation of the SWHID
                        // itself is left to the node id lookup below.
                        let swhid = record.get(column_id).context("Missing cell")?.to_owned();
                        let node = graph.properties().node_id_from_string_swhid(&swhid)?;
                        Ok((swhid, node))
                    })
                    .collect();

                // `send` only fails when the receiver was dropped; surface that as an
                // error rather than panicking inside the thread pool.
                tx.send(results?).map_err(|_| {
                    anyhow::anyhow!("Could not send (swhid, node_id): receiver disconnected")
                })?;
                Ok::<_, anyhow::Error>(())
            })?;
        Ok(())
    })
}