// webgraph_cli/from/arcs.rs

use crate::create_parent_dir;
use crate::*;
use anyhow::Result;
use clap::Parser;
use dsi_bitstream::prelude::{Endianness, BE};
use dsi_progress_logger::prelude::*;
use itertools::Itertools;
use rayon::prelude::ParallelSliceMut;
use std::collections::HashMap;
use std::io::{BufRead, Write};
use std::path::PathBuf;
use tempfile::Builder;
use webgraph::graphs::arc_list_graph::ArcListGraph;
use webgraph::prelude::*;
23#[derive(Parser, Debug)]
24#[command(
25 about = "Read from standard input a list of arcs and create a BvGraph. Each arc is specified by a pair of labels separated by a TAB (but the format is customizable), and numerical identifiers will be assigned to the labels in appearance order. The final list of node labels will be saved in a file with the same basename of the graph and extension .nodes. The option --exact can be used to use the labels directly as node identifiers. Note that in that case nodes are numbered starting from zero."
26)]
27pub struct CliArgs {
28 pub dst: PathBuf,
30
31 #[arg(long)]
32 pub num_nodes: Option<usize>,
35
36 #[arg(long)]
37 pub num_arcs: Option<usize>,
39
40 #[clap(flatten)]
41 pub arcs_args: ArcsArgs,
42
43 #[clap(flatten)]
44 pub num_threads: NumThreadsArg,
45
46 #[clap(flatten)]
47 pub memory_usage: MemoryUsageArg,
48
49 #[clap(flatten)]
50 pub ca: CompressArgs,
51}
52
53pub fn main(global_args: GlobalArgs, args: CliArgs) -> Result<()> {
54 log::info!("Reading arcs from stdin...");
55 let stdin = std::io::stdin().lock();
56 from_csv(global_args, args, stdin)
57}
58
59pub fn from_csv(global_args: GlobalArgs, args: CliArgs, file: impl BufRead) -> Result<()> {
60 let dir = Builder::new().prefix("from_arcs_sort_").tempdir()?;
61
62 let mut group_by = SortPairs::new(args.memory_usage.memory_usage, &dir)?;
63 let mut nodes = HashMap::new();
64
65 let mut pl = ProgressLogger::default();
67 pl.display_memory(true)
68 .item_name("lines")
69 .expected_updates(args.arcs_args.max_arcs.or(args.num_arcs));
70
71 if let Some(duration) = global_args.log_interval {
72 pl.log_interval(duration);
73 }
74 pl.start("Reading arcs CSV");
75
76 let mut iter = file.lines();
77 for _ in 0..args.arcs_args.lines_to_skip {
79 let _ = iter.next();
80 }
81 let biggest_idx = args
82 .arcs_args
83 .source_column
84 .max(args.arcs_args.target_column);
85 let mut num_nodes = 0;
86 let mut num_arcs = 0;
87 for (line_num, line) in iter.enumerate() {
88 if let Some(max_arcs) = args.arcs_args.max_arcs {
90 if num_arcs > max_arcs {
91 break;
92 }
93 }
94 let line = line.unwrap();
95 if line.trim().starts_with(args.arcs_args.line_comment_symbol) {
97 continue;
98 }
99
100 let vals = line.split(args.arcs_args.separator).collect::<Vec<_>>();
102
103 if vals.get(biggest_idx).is_none() {
104 log::warn!(
105 "Line {}: {:?} from stdin does not have enough columns: got {} columns but expected at least {} columns separated by {:?} (you can change the separator using the --separator option)",
106 line_num,
107 line,
108 vals.len(),
109 biggest_idx + 1,
110 args.arcs_args.separator,
111 );
112 continue;
113 }
114
115 let src = vals[args.arcs_args.source_column];
116 let dst = vals[args.arcs_args.target_column];
117
118 let src_id = if args.arcs_args.labels {
120 let node_id = nodes.len();
121 *nodes.entry(src.to_string()).or_insert(node_id)
122 } else {
123 match src.parse::<usize>() {
124 Ok(src_id) => src_id,
125 Err(err) => {
126 log::error!(
127 "Error parsing as integer source column value {:?} at line {}: {:?}",
128 src,
129 line_num,
130 err
131 );
132 return Ok(());
133 }
134 }
135 };
136 let dst_id = if args.arcs_args.labels {
137 let node_id = nodes.len();
138 *nodes.entry(dst.to_string()).or_insert(node_id)
139 } else {
140 match dst.parse::<usize>() {
141 Ok(dst_id) => dst_id,
142 Err(err) => {
143 log::error!(
144 "Error parsing as integer target column value {:?} at line {}: {:?}",
145 dst,
146 line_num,
147 err
148 );
149 return Ok(());
150 }
151 }
152 };
153
154 num_nodes = num_nodes.max(src_id.max(dst_id) + 1);
155 group_by.push(src_id, dst_id).unwrap();
156 pl.light_update();
157 num_arcs += 1;
158 }
159 pl.done();
160
161 if args.arcs_args.labels {
162 debug_assert_eq!(
163 num_nodes,
164 nodes.len(),
165 "Consistency check of the algorithm. The number of nodes should be equal to the number of unique nodes found in the arcs."
166 );
167 }
168
169 if let Some(user_num_nodes) = args.num_nodes {
170 if user_num_nodes < num_nodes {
171 log::warn!(
172 "The number of nodes specified by --num-nodes={} is smaller than the number of nodes found in the arcs: {}",
173 user_num_nodes,
174 num_nodes
175 );
176 }
177 num_nodes = user_num_nodes;
178 }
179
180 log::info!("Arcs read: {} Nodes: {}", num_arcs, num_nodes);
181 if num_arcs == 0 {
182 log::error!(
183 "No arcs read from stdin! Check that the --separator={:?} value is correct and that the --source-column={:?} and --target-column={:?} values are correct.",
184 args.arcs_args.separator,
185 args.arcs_args.source_column,
186 args.arcs_args.target_column
187 );
188 return Ok(());
189 }
190
191 let g = ArcListGraph::new(
193 num_nodes,
194 group_by.iter().unwrap().map(|(pair, _)| pair).dedup(),
195 );
196
197 create_parent_dir(&args.dst)?;
198
199 let target_endianness = args.ca.endianness.clone();
201 let dir = Builder::new().prefix("from_arcs_compress_").tempdir()?;
202 let thread_pool = crate::get_thread_pool(args.num_threads.num_threads);
203 BvComp::parallel_endianness(
204 &args.dst,
205 &g,
206 num_nodes,
207 args.ca.into(),
208 &thread_pool,
209 dir,
210 &target_endianness.unwrap_or_else(|| BE::NAME.into()),
211 )
212 .unwrap();
213
214 if args.arcs_args.labels {
216 let nodes_file = args.dst.with_extension("nodes");
217 let mut pl = ProgressLogger::default();
218 pl.display_memory(true)
219 .item_name("lines")
220 .expected_updates(args.arcs_args.max_arcs.or(args.num_arcs));
221 if let Some(duration) = global_args.log_interval {
222 pl.log_interval(duration);
223 }
224
225 let mut file = std::fs::File::create(&nodes_file).unwrap();
226 let mut buf = std::io::BufWriter::new(&mut file);
227 let mut nodes = nodes.into_iter().collect::<Vec<_>>();
228 nodes.par_sort_by(|(_, a), (_, b)| a.cmp(b));
230 pl.start(format!("Storing the nodes to {}", nodes_file.display()));
231 for (node, _) in nodes {
232 buf.write_all(node.as_bytes()).unwrap();
233 buf.write_all(b"\n").unwrap();
234 pl.light_update();
235 }
236 pl.done();
237 }
238 Ok(())
239}