webgraph_cli/from/
arcs.rs

1/*
2 * SPDX-FileCopyrightText: 2023 Inria
3 * SPDX-FileCopyrightText: 2023 Tommaso Fontana
4 *
5 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6 */
7
8use crate::create_parent_dir;
9use crate::*;
10use anyhow::Result;
11use clap::Parser;
12use dsi_bitstream::prelude::{Endianness, BE};
13use dsi_progress_logger::prelude::*;
14use itertools::Itertools;
15use rayon::prelude::ParallelSliceMut;
16use std::collections::HashMap;
17use std::io::{BufRead, Write};
18use std::path::PathBuf;
19use tempfile::Builder;
20use webgraph::graphs::arc_list_graph::ArcListGraph;
21use webgraph::prelude::*;
22
23#[derive(Parser, Debug)]
24#[command(
25    about = "Read from standard input a list of arcs and create a BvGraph. Each arc is specified by a pair of labels separated by a TAB (but the format is customizable), and numerical identifiers will be assigned to the labels in appearance order. The final list of node labels will be saved in a file with the same basename of the graph and extension .nodes. The option --exact can be used to use the labels directly as node identifiers. Note that in that case nodes are numbered starting from zero."
26)]
27pub struct CliArgs {
28    /// The basename of the graph.
29    pub dst: PathBuf,
30
31    #[arg(long)]
32    /// The number of nodes in the graph; if specified this will be used instead of the number inferred.
33    /// This is useful if you want to add disconnected nodes at the end of the graph.
34    pub num_nodes: Option<usize>,
35
36    #[arg(long)]
37    /// The number of arcs in the graph; if specified, it will be used to estimate the progress.
38    pub num_arcs: Option<usize>,
39
40    #[clap(flatten)]
41    pub arcs_args: ArcsArgs,
42
43    #[clap(flatten)]
44    pub num_threads: NumThreadsArg,
45
46    #[clap(flatten)]
47    pub batch_size: BatchSizeArg,
48
49    #[clap(flatten)]
50    pub ca: CompressArgs,
51}
52
53pub fn main(global_args: GlobalArgs, args: CliArgs) -> Result<()> {
54    log::info!("Reading arcs from stdin...");
55    let stdin = std::io::stdin().lock();
56    from_csv(global_args, args, stdin)
57}
58
59pub fn from_csv(global_args: GlobalArgs, args: CliArgs, file: impl BufRead) -> Result<()> {
60    let dir = Builder::new().prefix("from_arcs_sort_").tempdir()?;
61
62    let mut group_by = SortPairs::new(args.batch_size.batch_size, &dir)?;
63    let mut nodes = HashMap::new();
64
65    // read the csv and put it inside the sort pairs
66    let mut pl = ProgressLogger::default();
67    pl.display_memory(true)
68        .item_name("lines")
69        .expected_updates(args.arcs_args.max_arcs.or(args.num_arcs));
70
71    if let Some(duration) = global_args.log_interval {
72        pl.log_interval(duration);
73    }
74    pl.start("Reading arcs CSV");
75
76    let mut iter = file.lines();
77    // skip the first few lines
78    for _ in 0..args.arcs_args.lines_to_skip {
79        let _ = iter.next();
80    }
81    let biggest_idx = args
82        .arcs_args
83        .source_column
84        .max(args.arcs_args.target_column);
85    let mut num_nodes = 0;
86    let mut num_arcs = 0;
87    for (line_num, line) in iter.enumerate() {
88        // break if we reached the end
89        if let Some(max_arcs) = args.arcs_args.max_arcs {
90            if num_arcs > max_arcs {
91                break;
92            }
93        }
94        let line = line.unwrap();
95        // skip comment
96        if line.trim().starts_with(args.arcs_args.line_comment_symbol) {
97            continue;
98        }
99
100        // split the csv line into the args
101        let vals = line.split(args.arcs_args.separator).collect::<Vec<_>>();
102
103        if vals.get(biggest_idx).is_none() {
104            log::warn!(
105                "Line {}: {:?} from stdin does not have enough columns: got {} columns but expected at least {} columns separated by {:?} (you can change the separator using the --separator option)",
106                line_num,
107                line,
108                vals.len(),
109                biggest_idx + 1,
110                args.arcs_args.separator,
111            );
112            continue;
113        }
114
115        let src = vals[args.arcs_args.source_column];
116        let dst = vals[args.arcs_args.target_column];
117
118        // parse if exact, or build a node list
119        let src_id = if args.arcs_args.exact {
120            match src.parse::<usize>() {
121                Ok(src_id) => src_id,
122                Err(err) => {
123                    log::error!(
124                        "Error parsing as integer source column value {:?} at line {}: {:?}",
125                        src,
126                        line_num,
127                        err
128                    );
129                    return Ok(());
130                }
131            }
132        } else {
133            let node_id = nodes.len();
134            *nodes.entry(src.to_string()).or_insert(node_id)
135        };
136        let dst_id = if args.arcs_args.exact {
137            match dst.parse::<usize>() {
138                Ok(dst_id) => dst_id,
139                Err(err) => {
140                    log::error!(
141                        "Error parsing as integer target column value {:?} at line {}: {:?}",
142                        dst,
143                        line_num,
144                        err
145                    );
146                    return Ok(());
147                }
148            }
149        } else {
150            let node_id = nodes.len();
151            *nodes.entry(dst.to_string()).or_insert(node_id)
152        };
153
154        num_nodes = num_nodes.max(src_id.max(dst_id) + 1);
155        group_by.push(src_id, dst_id).unwrap();
156        pl.light_update();
157        num_arcs += 1;
158    }
159    pl.done();
160
161    if !args.arcs_args.exact {
162        debug_assert_eq!(
163            num_nodes,
164            nodes.len(),
165            "Consistency check of the algorithm. The number of nodes should be equal to the number of unique nodes found in the arcs."
166        );
167    }
168
169    if let Some(user_num_nodes) = args.num_nodes {
170        if user_num_nodes < num_nodes {
171            log::warn!(
172                "The number of nodes specified by --num-nodes={} is smaller than the number of nodes found in the arcs: {}",
173                user_num_nodes,
174                num_nodes
175            );
176        }
177        num_nodes = user_num_nodes;
178    }
179
180    log::info!("Arcs read: {} Nodes: {}", num_arcs, num_nodes);
181    if num_arcs == 0 {
182        log::error!(
183            "No arcs read from stdin! Check that the --separator={:?} value is correct and that the --source-column={:?} and --target-column={:?} values are correct.",
184            args.arcs_args.separator,
185            args.arcs_args.source_column,
186            args.arcs_args.target_column
187        );
188        return Ok(());
189    }
190
191    // convert the iter to a graph
192    let g = Left(ArcListGraph::new(
193        num_nodes,
194        group_by
195            .iter()
196            .unwrap()
197            .map(|(src, dst, _)| (src, dst))
198            .dedup(),
199    ));
200
201    create_parent_dir(&args.dst)?;
202
203    // compress it
204    let target_endianness = args.ca.endianness.clone();
205    let dir = Builder::new().prefix("from_arcs_compress_").tempdir()?;
206    let thread_pool = crate::get_thread_pool(args.num_threads.num_threads);
207    BvComp::parallel_endianness(
208        &args.dst,
209        &g,
210        num_nodes,
211        args.ca.into(),
212        &thread_pool,
213        dir,
214        &target_endianness.unwrap_or_else(|| BE::NAME.into()),
215    )
216    .unwrap();
217
218    // save the nodes
219    if !args.arcs_args.exact {
220        let nodes_file = args.dst.with_extension("nodes");
221        let mut pl = ProgressLogger::default();
222        pl.display_memory(true)
223            .item_name("lines")
224            .expected_updates(args.arcs_args.max_arcs.or(args.num_arcs));
225        if let Some(duration) = global_args.log_interval {
226            pl.log_interval(duration);
227        }
228
229        let mut file = std::fs::File::create(&nodes_file).unwrap();
230        let mut buf = std::io::BufWriter::new(&mut file);
231        let mut nodes = nodes.into_iter().collect::<Vec<_>>();
232        // sort based on the idx
233        nodes.par_sort_by(|(_, a), (_, b)| a.cmp(b));
234        pl.start(format!("Storing the nodes to {}", nodes_file.display()));
235        for (node, _) in nodes {
236            buf.write_all(node.as_bytes()).unwrap();
237            buf.write_all(b"\n").unwrap();
238            pl.light_update();
239        }
240        pl.done();
241    }
242    Ok(())
243}