Skip to main content

webgraph_cli/from/
arcs.rs

1/*
2 * SPDX-FileCopyrightText: 2023 Inria
3 * SPDX-FileCopyrightText: 2023 Tommaso Fontana
4 *
5 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6 */
7
8use crate::create_parent_dir;
9use crate::*;
10use anyhow::{Context, Result};
11use clap::Parser;
12use dsi_bitstream::prelude::{BE, Endianness};
13use dsi_progress_logger::prelude::*;
14use itertools::Itertools;
15use rayon::prelude::ParallelSliceMut;
16use std::collections::HashMap;
17use std::io::{BufRead, Write};
18use std::path::PathBuf;
19use tempfile::Builder;
20use webgraph::graphs::arc_list_graph::ArcListGraph;
21use webgraph::prelude::*;
22
// Command-line arguments of the subcommand that reads a list of arcs and
// compresses it into a BvGraph (see `from_csv` in this file for how each
// field is consumed).
//
// Explanatory notes in this struct are plain `//` comments on purpose: with
// clap's derive, `///` doc comments become user-visible help text, so adding
// or editing them would change the CLI output.
//
// NOTE(review): the `about` text mentions an `--exact` option, while the code
// in this file branches on `arcs_args.labels` — confirm the option name that
// `ArcsArgs` actually exposes.
#[derive(Parser, Debug)]
#[command(
    about = "Read from standard input a list of arcs and create a BvGraph. Each arc is specified by a pair of labels separated by a TAB (but the format is customizable), and numerical identifiers will be assigned to the labels in appearance order. The final list of node labels will be saved in a file with the same basename of the graph and extension .nodes. The option --exact can be used to use the labels directly as node identifiers. Note that in that case nodes are numbered starting from zero."
)]
pub struct CliArgs {
    /// The basename of the graph.
    pub dst: PathBuf,

    // When present, this value replaces the node count inferred from the
    // arcs; a warning is logged if it is smaller than the inferred count.
    #[arg(long)]
    /// The number of nodes in the graph; if specified this will be used instead of the number inferred.
    /// This is useful if you want to add disconnected nodes at the end of the graph.
    pub num_nodes: Option<usize>,

    // Only used as a fallback for the progress logger's expected number of
    // updates when `arcs_args.max_arcs` is not set.
    #[arg(long)]
    /// The number of arcs in the graph; if specified, it will be used to estimate the progress.
    pub num_arcs: Option<usize>,

    // Input-format options. This file reads `separator`, `source_column`,
    // `target_column`, `lines_to_skip`, `line_comment_symbol`, `max_arcs`
    // and `labels` from it.
    #[clap(flatten)]
    pub arcs_args: ArcsArgs,

    // Its `num_threads` field is handed to `get_thread_pool` to build the
    // pool in which the compression runs.
    #[clap(flatten)]
    pub num_threads: NumThreadsArg,

    // Its `memory_usage` field is handed to `SortPairs::new` when sorting
    // the arcs.
    #[clap(flatten)]
    pub memory_usage: MemoryUsageArg,

    // Compression options: `endianness`, `chunk_size` and `bvgraphz` are
    // read directly, and the whole value is converted (via `into()`) into
    // the compression flags passed to the compressor.
    #[clap(flatten)]
    pub ca: CompressArgs,
}
52
53pub fn main(global_args: GlobalArgs, args: CliArgs) -> Result<()> {
54    log::info!("Reading arcs from stdin...");
55    let stdin = std::io::stdin().lock();
56    from_csv(global_args, args, stdin)
57}
58
59pub fn from_csv(global_args: GlobalArgs, args: CliArgs, file: impl BufRead) -> Result<()> {
60    let dir = Builder::new().prefix("from_arcs_sort_").tempdir()?;
61
62    let mut group_by = SortPairs::new(args.memory_usage.memory_usage, &dir)?;
63    let mut nodes = HashMap::new();
64
65    // read the csv and put it inside the sort pairs
66    let mut pl = ProgressLogger::default();
67    pl.display_memory(true)
68        .item_name("lines")
69        .expected_updates(args.arcs_args.max_arcs.or(args.num_arcs));
70
71    if let Some(duration) = global_args.log_interval {
72        pl.log_interval(duration);
73    }
74    pl.start("Reading arcs CSV");
75
76    let mut iter = file.lines();
77    // skip the first few lines
78    for _ in 0..args.arcs_args.lines_to_skip {
79        let _ = iter.next();
80    }
81    let biggest_idx = args
82        .arcs_args
83        .source_column
84        .max(args.arcs_args.target_column);
85    let mut num_nodes = 0;
86    let mut num_arcs = 0;
87    for (line_num, line) in iter.enumerate() {
88        // break if we reached the end
89        if let Some(max_arcs) = args.arcs_args.max_arcs {
90            if num_arcs >= max_arcs {
91                break;
92            }
93        }
94        let line = line.unwrap();
95        // skip comment
96        if line.trim().starts_with(args.arcs_args.line_comment_symbol) {
97            continue;
98        }
99
100        // split the csv line into the args
101        let vals = line.split(args.arcs_args.separator).collect::<Vec<_>>();
102
103        if vals.get(biggest_idx).is_none() {
104            log::warn!(
105                "Line {}: {:?} from stdin does not have enough columns: got {} columns but expected at least {} columns separated by {:?} (you can change the separator using the --separator option)",
106                line_num,
107                line,
108                vals.len(),
109                biggest_idx + 1,
110                args.arcs_args.separator,
111            );
112            continue;
113        }
114
115        let src = vals[args.arcs_args.source_column];
116        let dst = vals[args.arcs_args.target_column];
117
118        // parse if exact, or build a node list
119        let src_id = if args.arcs_args.labels {
120            let node_id = nodes.len();
121            *nodes.entry(src.to_string()).or_insert(node_id)
122        } else {
123            src.parse::<usize>().with_context(|| {
124                format!(
125                    "Error parsing as integer source column value {:?} at line {}",
126                    src, line_num,
127                )
128            })?
129        };
130        let dst_id = if args.arcs_args.labels {
131            let node_id = nodes.len();
132            *nodes.entry(dst.to_string()).or_insert(node_id)
133        } else {
134            dst.parse::<usize>().with_context(|| {
135                format!(
136                    "Error parsing as integer target column value {:?} at line {}",
137                    dst, line_num,
138                )
139            })?
140        };
141
142        num_nodes = num_nodes.max(src_id.max(dst_id) + 1);
143        group_by.push(src_id, dst_id).unwrap();
144        pl.light_update();
145        num_arcs += 1;
146    }
147    pl.done();
148
149    if args.arcs_args.labels {
150        debug_assert_eq!(
151            num_nodes,
152            nodes.len(),
153            "Consistency check of the algorithm. The number of nodes should be equal to the number of unique nodes found in the arcs."
154        );
155    }
156
157    if let Some(user_num_nodes) = args.num_nodes {
158        if user_num_nodes < num_nodes {
159            log::warn!(
160                "The number of nodes specified by --num-nodes={} is smaller than the number of nodes found in the arcs: {}",
161                user_num_nodes,
162                num_nodes
163            );
164        }
165        num_nodes = user_num_nodes;
166    }
167
168    log::info!("Arcs read: {} Nodes: {}", num_arcs, num_nodes);
169    if num_arcs == 0 {
170        log::error!(
171            "No arcs read from stdin! Check that the --separator={:?} value is correct and that the --source-column={:?} and --target-column={:?} values are correct.",
172            args.arcs_args.separator,
173            args.arcs_args.source_column,
174            args.arcs_args.target_column
175        );
176        return Ok(());
177    }
178
179    // convert the iter to a graph
180    let g = ArcListGraph::new(
181        num_nodes,
182        group_by.iter().unwrap().map(|(pair, _)| pair).dedup(),
183    );
184
185    create_parent_dir(&args.dst)?;
186
187    // compress it
188    let target_endianness = args.ca.endianness.clone();
189    let dir = Builder::new().prefix("from_arcs_compress_").tempdir()?;
190    let thread_pool = crate::get_thread_pool(args.num_threads.num_threads);
191    let chunk_size = args.ca.chunk_size;
192    let bvgraphz = args.ca.bvgraphz;
193    let mut builder = BvCompConfig::new(&args.dst)
194        .with_comp_flags(args.ca.into())
195        .with_tmp_dir(&dir);
196
197    if bvgraphz {
198        builder = builder.with_chunk_size(chunk_size);
199    }
200
201    thread_pool.install(|| {
202        builder.par_comp_lenders_endianness(
203            &g,
204            num_nodes,
205            &target_endianness.unwrap_or_else(|| BE::NAME.into()),
206        )
207    })?;
208
209    // save the nodes
210    if args.arcs_args.labels {
211        let nodes_file = args.dst.with_extension("nodes");
212        let mut pl = ProgressLogger::default();
213        pl.display_memory(true)
214            .item_name("lines")
215            .expected_updates(args.arcs_args.max_arcs.or(args.num_arcs));
216        if let Some(duration) = global_args.log_interval {
217            pl.log_interval(duration);
218        }
219
220        let mut file = std::fs::File::create(&nodes_file).unwrap();
221        let mut buf = std::io::BufWriter::new(&mut file);
222        let mut nodes = nodes.into_iter().collect::<Vec<_>>();
223        // sort based on the idx
224        nodes.par_sort_by(|(_, a), (_, b)| a.cmp(b));
225        pl.start(format!("Storing the nodes to {}", nodes_file.display()));
226        for (node, _) in nodes {
227            buf.write_all(node.as_bytes()).unwrap();
228            buf.write_all(b"\n").unwrap();
229            pl.light_update();
230        }
231        pl.done();
232    }
233    Ok(())
234}