webgraph_cli/from/
arcs.rs1use crate::create_parent_dir;
9use crate::*;
10use anyhow::{Context, Result};
11use clap::Parser;
12use dsi_bitstream::prelude::{BE, Endianness};
13use dsi_progress_logger::prelude::*;
14use itertools::Itertools;
15use rayon::prelude::ParallelSliceMut;
16use std::collections::HashMap;
17use std::io::{BufRead, Write};
18use std::path::PathBuf;
19use tempfile::Builder;
20use webgraph::graphs::arc_list_graph::ArcListGraph;
21use webgraph::prelude::*;
22
23#[derive(Parser, Debug)]
24#[command(
25 about = "Read from standard input a list of arcs and create a BvGraph. Each arc is specified by a pair of labels separated by a TAB (but the format is customizable), and numerical identifiers will be assigned to the labels in appearance order. The final list of node labels will be saved in a file with the same basename of the graph and extension .nodes. The option --exact can be used to use the labels directly as node identifiers. Note that in that case nodes are numbered starting from zero."
26)]
27pub struct CliArgs {
28 pub dst: PathBuf,
30
31 #[arg(long)]
32 pub num_nodes: Option<usize>,
35
36 #[arg(long)]
37 pub num_arcs: Option<usize>,
39
40 #[clap(flatten)]
41 pub arcs_args: ArcsArgs,
42
43 #[clap(flatten)]
44 pub num_threads: NumThreadsArg,
45
46 #[clap(flatten)]
47 pub memory_usage: MemoryUsageArg,
48
49 #[clap(flatten)]
50 pub ca: CompressArgs,
51}
52
53pub fn main(global_args: GlobalArgs, args: CliArgs) -> Result<()> {
54 log::info!("Reading arcs from stdin...");
55 let stdin = std::io::stdin().lock();
56 from_csv(global_args, args, stdin)
57}
58
59pub fn from_csv(global_args: GlobalArgs, args: CliArgs, file: impl BufRead) -> Result<()> {
60 let dir = Builder::new().prefix("from_arcs_sort_").tempdir()?;
61
62 let mut group_by = SortPairs::new(args.memory_usage.memory_usage, &dir)?;
63 let mut nodes = HashMap::new();
64
65 let mut pl = ProgressLogger::default();
67 pl.display_memory(true)
68 .item_name("lines")
69 .expected_updates(args.arcs_args.max_arcs.or(args.num_arcs));
70
71 if let Some(duration) = global_args.log_interval {
72 pl.log_interval(duration);
73 }
74 pl.start("Reading arcs CSV");
75
76 let mut iter = file.lines();
77 for _ in 0..args.arcs_args.lines_to_skip {
79 let _ = iter.next();
80 }
81 let biggest_idx = args
82 .arcs_args
83 .source_column
84 .max(args.arcs_args.target_column);
85 let mut num_nodes = 0;
86 let mut num_arcs = 0;
87 for (line_num, line) in iter.enumerate() {
88 if let Some(max_arcs) = args.arcs_args.max_arcs {
90 if num_arcs >= max_arcs {
91 break;
92 }
93 }
94 let line = line.unwrap();
95 if line.trim().starts_with(args.arcs_args.line_comment_symbol) {
97 continue;
98 }
99
100 let vals = line.split(args.arcs_args.separator).collect::<Vec<_>>();
102
103 if vals.get(biggest_idx).is_none() {
104 log::warn!(
105 "Line {}: {:?} from stdin does not have enough columns: got {} columns but expected at least {} columns separated by {:?} (you can change the separator using the --separator option)",
106 line_num,
107 line,
108 vals.len(),
109 biggest_idx + 1,
110 args.arcs_args.separator,
111 );
112 continue;
113 }
114
115 let src = vals[args.arcs_args.source_column];
116 let dst = vals[args.arcs_args.target_column];
117
118 let src_id = if args.arcs_args.labels {
120 let node_id = nodes.len();
121 *nodes.entry(src.to_string()).or_insert(node_id)
122 } else {
123 src.parse::<usize>().with_context(|| {
124 format!(
125 "Error parsing as integer source column value {:?} at line {}",
126 src, line_num,
127 )
128 })?
129 };
130 let dst_id = if args.arcs_args.labels {
131 let node_id = nodes.len();
132 *nodes.entry(dst.to_string()).or_insert(node_id)
133 } else {
134 dst.parse::<usize>().with_context(|| {
135 format!(
136 "Error parsing as integer target column value {:?} at line {}",
137 dst, line_num,
138 )
139 })?
140 };
141
142 num_nodes = num_nodes.max(src_id.max(dst_id) + 1);
143 group_by.push(src_id, dst_id).unwrap();
144 pl.light_update();
145 num_arcs += 1;
146 }
147 pl.done();
148
149 if args.arcs_args.labels {
150 debug_assert_eq!(
151 num_nodes,
152 nodes.len(),
153 "Consistency check of the algorithm. The number of nodes should be equal to the number of unique nodes found in the arcs."
154 );
155 }
156
157 if let Some(user_num_nodes) = args.num_nodes {
158 if user_num_nodes < num_nodes {
159 log::warn!(
160 "The number of nodes specified by --num-nodes={} is smaller than the number of nodes found in the arcs: {}",
161 user_num_nodes,
162 num_nodes
163 );
164 }
165 num_nodes = user_num_nodes;
166 }
167
168 log::info!("Arcs read: {} Nodes: {}", num_arcs, num_nodes);
169 if num_arcs == 0 {
170 log::error!(
171 "No arcs read from stdin! Check that the --separator={:?} value is correct and that the --source-column={:?} and --target-column={:?} values are correct.",
172 args.arcs_args.separator,
173 args.arcs_args.source_column,
174 args.arcs_args.target_column
175 );
176 return Ok(());
177 }
178
179 let g = ArcListGraph::new(
181 num_nodes,
182 group_by.iter().unwrap().map(|(pair, _)| pair).dedup(),
183 );
184
185 create_parent_dir(&args.dst)?;
186
187 let target_endianness = args.ca.endianness.clone();
189 let dir = Builder::new().prefix("from_arcs_compress_").tempdir()?;
190 let thread_pool = crate::get_thread_pool(args.num_threads.num_threads);
191 let chunk_size = args.ca.chunk_size;
192 let bvgraphz = args.ca.bvgraphz;
193 let mut builder = BvCompConfig::new(&args.dst)
194 .with_comp_flags(args.ca.into())
195 .with_tmp_dir(&dir);
196
197 if bvgraphz {
198 builder = builder.with_chunk_size(chunk_size);
199 }
200
201 thread_pool.install(|| {
202 builder.par_comp_lenders_endianness(
203 &g,
204 num_nodes,
205 &target_endianness.unwrap_or_else(|| BE::NAME.into()),
206 )
207 })?;
208
209 if args.arcs_args.labels {
211 let nodes_file = args.dst.with_extension("nodes");
212 let mut pl = ProgressLogger::default();
213 pl.display_memory(true)
214 .item_name("lines")
215 .expected_updates(args.arcs_args.max_arcs.or(args.num_arcs));
216 if let Some(duration) = global_args.log_interval {
217 pl.log_interval(duration);
218 }
219
220 let mut file = std::fs::File::create(&nodes_file).unwrap();
221 let mut buf = std::io::BufWriter::new(&mut file);
222 let mut nodes = nodes.into_iter().collect::<Vec<_>>();
223 nodes.par_sort_by(|(_, a), (_, b)| a.cmp(b));
225 pl.start(format!("Storing the nodes to {}", nodes_file.display()));
226 for (node, _) in nodes {
227 buf.write_all(node.as_bytes()).unwrap();
228 buf.write_all(b"\n").unwrap();
229 pl.light_update();
230 }
231 pl.done();
232 }
233 Ok(())
234}