webgraph_cli/from/
arcs.rs1use crate::create_parent_dir;
9use crate::*;
10use anyhow::Result;
11use clap::Parser;
12use dsi_bitstream::prelude::{Endianness, BE};
13use dsi_progress_logger::prelude::*;
14use itertools::Itertools;
15use rayon::prelude::ParallelSliceMut;
16use std::collections::HashMap;
17use std::io::{BufRead, Write};
18use std::path::PathBuf;
19use tempfile::Builder;
20use webgraph::graphs::arc_list_graph::ArcListGraph;
21use webgraph::prelude::*;
22
23#[derive(Parser, Debug)]
24#[command(
25 about = "Read from standard input a list of arcs and create a BvGraph. Each arc is specified by a pair of labels separated by a TAB (but the format is customizable), and numerical identifiers will be assigned to the labels in appearance order. The final list of node labels will be saved in a file with the same basename of the graph and extension .nodes. The option --exact can be used to use the labels directly as node identifiers. Note that in that case nodes are numbered starting from zero."
26)]
27pub struct CliArgs {
28 pub dst: PathBuf,
30
31 #[arg(long)]
32 pub num_nodes: Option<usize>,
35
36 #[arg(long)]
37 pub num_arcs: Option<usize>,
39
40 #[clap(flatten)]
41 pub arcs_args: ArcsArgs,
42
43 #[clap(flatten)]
44 pub num_threads: NumThreadsArg,
45
46 #[clap(flatten)]
47 pub batch_size: BatchSizeArg,
48
49 #[clap(flatten)]
50 pub ca: CompressArgs,
51}
52
53pub fn main(global_args: GlobalArgs, args: CliArgs) -> Result<()> {
54 log::info!("Reading arcs from stdin...");
55 let stdin = std::io::stdin().lock();
56 from_csv(global_args, args, stdin)
57}
58
59pub fn from_csv(global_args: GlobalArgs, args: CliArgs, file: impl BufRead) -> Result<()> {
60 let dir = Builder::new().prefix("from_arcs_sort_").tempdir()?;
61
62 let mut group_by = SortPairs::new(args.batch_size.batch_size, &dir)?;
63 let mut nodes = HashMap::new();
64
65 let mut pl = ProgressLogger::default();
67 pl.display_memory(true)
68 .item_name("lines")
69 .expected_updates(args.arcs_args.max_arcs.or(args.num_arcs));
70
71 if let Some(duration) = global_args.log_interval {
72 pl.log_interval(duration);
73 }
74 pl.start("Reading arcs CSV");
75
76 let mut iter = file.lines();
77 for _ in 0..args.arcs_args.lines_to_skip {
79 let _ = iter.next();
80 }
81 let biggest_idx = args
82 .arcs_args
83 .source_column
84 .max(args.arcs_args.target_column);
85 let mut num_nodes = 0;
86 let mut num_arcs = 0;
87 for (line_num, line) in iter.enumerate() {
88 if let Some(max_arcs) = args.arcs_args.max_arcs {
90 if num_arcs > max_arcs {
91 break;
92 }
93 }
94 let line = line.unwrap();
95 if line.trim().starts_with(args.arcs_args.line_comment_symbol) {
97 continue;
98 }
99
100 let vals = line.split(args.arcs_args.separator).collect::<Vec<_>>();
102
103 if vals.get(biggest_idx).is_none() {
104 log::warn!(
105 "Line {}: {:?} from stdin does not have enough columns: got {} columns but expected at least {} columns separated by {:?} (you can change the separator using the --separator option)",
106 line_num,
107 line,
108 vals.len(),
109 biggest_idx + 1,
110 args.arcs_args.separator,
111 );
112 continue;
113 }
114
115 let src = vals[args.arcs_args.source_column];
116 let dst = vals[args.arcs_args.target_column];
117
118 let src_id = if args.arcs_args.exact {
120 match src.parse::<usize>() {
121 Ok(src_id) => src_id,
122 Err(err) => {
123 log::error!(
124 "Error parsing as integer source column value {:?} at line {}: {:?}",
125 src,
126 line_num,
127 err
128 );
129 return Ok(());
130 }
131 }
132 } else {
133 let node_id = nodes.len();
134 *nodes.entry(src.to_string()).or_insert(node_id)
135 };
136 let dst_id = if args.arcs_args.exact {
137 match dst.parse::<usize>() {
138 Ok(dst_id) => dst_id,
139 Err(err) => {
140 log::error!(
141 "Error parsing as integer target column value {:?} at line {}: {:?}",
142 dst,
143 line_num,
144 err
145 );
146 return Ok(());
147 }
148 }
149 } else {
150 let node_id = nodes.len();
151 *nodes.entry(dst.to_string()).or_insert(node_id)
152 };
153
154 num_nodes = num_nodes.max(src_id.max(dst_id) + 1);
155 group_by.push(src_id, dst_id).unwrap();
156 pl.light_update();
157 num_arcs += 1;
158 }
159 pl.done();
160
161 if !args.arcs_args.exact {
162 debug_assert_eq!(
163 num_nodes,
164 nodes.len(),
165 "Consistency check of the algorithm. The number of nodes should be equal to the number of unique nodes found in the arcs."
166 );
167 }
168
169 if let Some(user_num_nodes) = args.num_nodes {
170 if user_num_nodes < num_nodes {
171 log::warn!(
172 "The number of nodes specified by --num-nodes={} is smaller than the number of nodes found in the arcs: {}",
173 user_num_nodes,
174 num_nodes
175 );
176 }
177 num_nodes = user_num_nodes;
178 }
179
180 log::info!("Arcs read: {} Nodes: {}", num_arcs, num_nodes);
181 if num_arcs == 0 {
182 log::error!(
183 "No arcs read from stdin! Check that the --separator={:?} value is correct and that the --source-column={:?} and --target-column={:?} values are correct.",
184 args.arcs_args.separator,
185 args.arcs_args.source_column,
186 args.arcs_args.target_column
187 );
188 return Ok(());
189 }
190
191 let g = Left(ArcListGraph::new(
193 num_nodes,
194 group_by
195 .iter()
196 .unwrap()
197 .map(|(src, dst, _)| (src, dst))
198 .dedup(),
199 ));
200
201 create_parent_dir(&args.dst)?;
202
203 let target_endianness = args.ca.endianness.clone();
205 let dir = Builder::new().prefix("from_arcs_compress_").tempdir()?;
206 let thread_pool = crate::get_thread_pool(args.num_threads.num_threads);
207 BvComp::parallel_endianness(
208 &args.dst,
209 &g,
210 num_nodes,
211 args.ca.into(),
212 &thread_pool,
213 dir,
214 &target_endianness.unwrap_or_else(|| BE::NAME.into()),
215 )
216 .unwrap();
217
218 if !args.arcs_args.exact {
220 let nodes_file = args.dst.with_extension("nodes");
221 let mut pl = ProgressLogger::default();
222 pl.display_memory(true)
223 .item_name("lines")
224 .expected_updates(args.arcs_args.max_arcs.or(args.num_arcs));
225 if let Some(duration) = global_args.log_interval {
226 pl.log_interval(duration);
227 }
228
229 let mut file = std::fs::File::create(&nodes_file).unwrap();
230 let mut buf = std::io::BufWriter::new(&mut file);
231 let mut nodes = nodes.into_iter().collect::<Vec<_>>();
232 nodes.par_sort_by(|(_, a), (_, b)| a.cmp(b));
234 pl.start(format!("Storing the nodes to {}", nodes_file.display()));
235 for (node, _) in nodes {
236 buf.write_all(node.as_bytes()).unwrap();
237 buf.write_all(b"\n").unwrap();
238 pl.light_update();
239 }
240 pl.done();
241 }
242 Ok(())
243}