use std::fs::File;
use std::io::BufWriter;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{anyhow, Context, Result};
use dsi_bitstream::codes::GammaWrite;
use dsi_bitstream::prelude::{BitRead, BitWrite, BufBitWriter, WordAdapter, BE, NE};
use dsi_progress_logger::{concurrent_progress_logger, progress_logger, ProgressLog};
use itertools::Itertools;
use lender::{for_, Lender};
use nonmax::NonMaxU64;
use pthash::Phf;
use rayon::prelude::*;
use webgraph::graphs::arc_list_graph::ArcListGraph;
use webgraph::prelude::sort_pairs::{BitReader, BitWriter};
use webgraph::prelude::*;

use super::iter_arcs::iter_arcs;
use super::iter_labeled_arcs::iter_labeled_arcs;
use super::label_names::{LabelNameHasher, LabelNameMphf};
use super::stats::estimate_edge_count;
use crate::map::{MappedPermutation, Permutation};
use crate::mph::LoadableSwhidMphf;
use crate::utils::sort::par_sort_arcs;
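
/// Builds a BVGraph from the dataset: arcs are read from `dataset_dir`, their
/// SWHIDs are mapped to node ids through the MPHF, and the arcs are sorted into
/// partitions that are then compressed in parallel into `target_dir`.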
pub fn bv<MPHF: LoadableSwhidMphf + Sync>(
    sort_batch_size: usize,
    partitions_per_thread: usize,
    mph_basepath: PathBuf,
    num_nodes: usize,
    dataset_dir: PathBuf,
    allowed_node_types: &[crate::NodeType],
    target_dir: PathBuf,
) -> Result<()> {
    log::info!("Reading MPH");
    let mph = MPHF::load(mph_basepath).context("Could not load MPHF")?;
    log::info!("MPH loaded, sorting arcs");

    let num_threads = num_cpus::get();
    let num_partitions = num_threads * partitions_per_thread;
    let nodes_per_partition = num_nodes.div_ceil(num_partitions);
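
    // `nodes_per_partition` was rounded up, so recompute the number of
    // partitions to avoid trailing empty ones.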
    let num_partitions = num_nodes.div_ceil(nodes_per_partition);

    let mut pl = concurrent_progress_logger!(
        display_memory = true,
        item_name = "arc",
        local_speed = true,
        expected_updates = Some(
            estimate_edge_count(&dataset_dir, allowed_node_types)
                .context("Could not estimate edge count")? as usize,
        ),
    );
    pl.start("Reading arcs");

    let temp_dir = tempfile::tempdir().context("Could not create temporary directory")?;
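    // Scratch space where sorted batches of arcs are spilled to disk before
    // being merged.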
    let sorted_arcs_path = temp_dir.path().join("sorted_arcs");
    std::fs::create_dir(&sorted_arcs_path)
        .with_context(|| format!("Could not create {}", sorted_arcs_path.display()))?;
    let sorted_arcs = par_sort_arcs(
        &sorted_arcs_path,
        sort_batch_size,
        iter_arcs(&dataset_dir, allowed_node_types)
            .context("Could not open input files to read arcs")?
            .map_with(pl.clone(), |thread_pl, (src, dst)| {
                thread_pl.light_update();
                (src, dst)
            }),
        num_partitions,
        (),
        (),
        |buffer, (src, dst)| {
            // Map textual SWHIDs to node ids through the MPHF.
            let src = mph
                .hash_str_array(&src)
                .ok_or_else(|| anyhow!("Unknown SWHID {:?}", String::from_utf8_lossy(&src)))?;
            let dst = mph
                .hash_str_array(&dst)
                .ok_or_else(|| anyhow!("Unknown SWHID {:?}", String::from_utf8_lossy(&dst)))?;
            assert!(src < num_nodes, "src node id {src} is not lower than the number of nodes ({num_nodes})");
            assert!(dst < num_nodes, "dst node id {dst} is not lower than the number of nodes ({num_nodes})");
            let partition_id = src / nodes_per_partition;
            buffer.insert(partition_id, src, dst)?;
            Ok(())
        },
    )?;
    pl.done();

    let arc_list_graphs =
        sorted_arcs
            .into_iter()
            .enumerate()
            .map(|(partition_id, sorted_arcs_partition)| {
                // dedup() removes duplicate arcs: the input may contain the
                // same arc more than once.
                webgraph::prelude::Left(ArcListGraph::new_labeled(
                    num_nodes,
                    sorted_arcs_partition.dedup(),
                ))
                .iter_from(partition_id * nodes_per_partition)
                .take(nodes_per_partition)
            });
    let comp_flags = Default::default();

    let temp_bv_dir = temp_dir.path().join("bv");
    std::fs::create_dir(&temp_bv_dir)
        .with_context(|| format!("Could not create {}", temp_bv_dir.display()))?;
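    // Compress each partition in parallel, then assemble the results into a
    // single BVGraph under `target_dir`.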
    BvComp::parallel_iter::<BE, _>(
        target_dir,
        arc_list_graphs,
        num_nodes,
        comp_flags,
        &rayon::ThreadPoolBuilder::default()
            .build()
            .expect("Could not create BvComp thread pool"),
        &temp_bv_dir,
    )
    .context("Could not build BVGraph from arcs")?;

    // Explicitly keep the temporary directory alive until compression is done.
    drop(temp_dir);

    Ok(())
}
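
/// Writes the arc labels of the graph to `{target_dir}-labelled.labels`, and
/// the per-node label offsets to `{target_dir}-labelled.labeloffsets`.
///
/// Returns the width, in bits, of each label.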
#[allow(clippy::too_many_arguments)]
pub fn edge_labels<MPHF: LoadableSwhidMphf + Sync>(
    sort_batch_size: usize,
    partitions_per_thread: usize,
    mph_basepath: PathBuf,
    order: MappedPermutation,
    label_name_hasher: LabelNameHasher,
    num_nodes: usize,
    dataset_dir: PathBuf,
    allowed_node_types: &[crate::NodeType],
    transposed: bool,
    target_dir: &Path,
) -> Result<usize> {
    log::info!("Reading MPH");
    let mph = MPHF::load(mph_basepath).context("Could not load MPHF")?;
    log::info!("MPH loaded, sorting arcs");

    let num_threads = num_cpus::get();
    let num_partitions = num_threads * partitions_per_thread;
    let nodes_per_partition = num_nodes.div_ceil(num_partitions);
    let label_width = label_width(label_name_hasher.mphf());
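
    // `nodes_per_partition` was rounded up, so recompute the number of
    // partitions to avoid trailing empty ones.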
    let num_partitions = num_nodes.div_ceil(nodes_per_partition);

    let mut pl = concurrent_progress_logger!(
        display_memory = true,
        item_name = "arc",
        local_speed = true,
        expected_updates = Some(
            estimate_edge_count(&dataset_dir, allowed_node_types)
                .context("Could not estimate edge count")? as usize,
        ),
    );
    pl.start("Reading and sorting arcs");

    let temp_dir = tempfile::tempdir().context("Could not create temporary directory")?;
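    // Scratch space where sorted batches of labeled arcs are spilled to disk
    // before being merged.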
    let sorted_arcs_path = temp_dir.path().join("sorted_arcs");
    std::fs::create_dir(&sorted_arcs_path)
        .with_context(|| format!("Could not create {}", sorted_arcs_path.display()))?;
    let sorted_arcs = par_sort_arcs(
        &sorted_arcs_path,
        sort_batch_size,
        iter_labeled_arcs(&dataset_dir, allowed_node_types, label_name_hasher)
            .context("Could not open input files to read arcs")?
            .map_with(pl.clone(), |thread_pl, (src, dst, label)| {
                thread_pl.light_update();
                (src, dst, label)
            }),
        num_partitions,
        LabelSerializer { label_width },
        LabelDeserializer { label_width },
        |buffer, (src, dst, label)| {
            let mut src = mph
                .hash_str_array(&src)
                .ok_or_else(|| anyhow!("Unknown SWHID {:?}", String::from_utf8_lossy(&src)))?;
            let mut dst = mph
                .hash_str_array(&dst)
                .ok_or_else(|| anyhow!("Unknown SWHID {:?}", String::from_utf8_lossy(&dst)))?;
            if transposed {
                (src, dst) = (dst, src);
            }
            assert!(src < num_nodes, "src node id {src} is not lower than the number of nodes ({num_nodes})");
            assert!(dst < num_nodes, "dst node id {dst} is not lower than the number of nodes ({num_nodes})");
            // Apply the node permutation.
            let src = order.get(src).expect("Could not permute src");
            let dst = order.get(dst).expect("Could not permute dst");
            let partition_id = src / nodes_per_partition;
            buffer.insert_labeled(partition_id, src, dst, label)?;
            Ok(())
        },
    )?;
    let total_labeled_arcs = pl.count();
    pl.done();

    let arc_list_graphs =
        sorted_arcs
            .into_iter()
            .enumerate()
            .map(|(partition_id, sorted_arcs_partition)| {
                // Unlike in `bv`, duplicate arcs are kept: the same arc may
                // occur several times with different labels.
                ArcListGraph::new_labeled(num_nodes, sorted_arcs_partition)
                    .iter_from(partition_id * nodes_per_partition)
                    .take(nodes_per_partition)
            });

    let mut labels_path = target_dir.to_owned();
    labels_path.as_mut_os_string().push("-labelled.labels");
    let mut labels_writer =
        BufBitWriter::<BE, _, _>::new(WordAdapter::<u8, _>::new(BufWriter::new(
            File::create(&labels_path)
                .with_context(|| format!("Could not create {}", labels_path.display()))?,
        )));

    let mut offsets_path = target_dir.to_owned();
    offsets_path
        .as_mut_os_string()
        .push("-labelled.labeloffsets");
    let mut offsets_writer =
        BufBitWriter::<BE, _, _>::new(WordAdapter::<u8, _>::new(BufWriter::new(
            File::create(&offsets_path)
                .with_context(|| format!("Could not create {}", offsets_path.display()))?,
        )));

    let mut pl = progress_logger!(
        display_memory = true,
        item_name = "arc",
        local_speed = true,
        expected_updates = Some(total_labeled_arcs),
    );
    pl.start("Writing arc labels");
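
    // Labels file format: for each node in order, and for each of its
    // successors, a gamma-coded label count followed by that many fixed-width
    // labels. The offsets file stores one gamma-coded value per node (the
    // length in bits of that node's labels), preceded by an initial zero, so
    // absolute offsets can be recovered by prefix-summing.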
    // The first label offset is always zero.
    offsets_writer
        .write_gamma(0)
        .context("Could not write initial offset")?;

    for partition_graph in arc_list_graphs {
        for_!( (_src, successors) in partition_graph {
            let mut offset_bits = 0u64;
            // Arcs are sorted, so all labels of the same (src, dst) arc are
            // adjacent in `successors`.
            for (_dst, labels) in &successors.group_by(|(dst, _label)| *dst) {
                let mut labels: Vec<u64> = labels
                    .flat_map(|(_dst, label)| label)
                    .map(|label: NonMaxU64| u64::from(label))
                    .collect();
                labels.par_sort_unstable();
                pl.update_with_count(labels.len());

                // Accumulate the number of bits written for this node's labels.
                offset_bits = offset_bits
                    .checked_add(
                        labels_writer
                            .write_gamma(labels.len() as u64)
                            .context("Could not write number of labels")?
                            as u64,
                    )
                    .context("offset overflowed u64")?;
                for label in labels {
                    offset_bits = offset_bits
                        .checked_add(
                            labels_writer
                                .write_bits(label, label_width)
                                .context("Could not write label")?
                                as u64,
                        )
                        .context("offset overflowed u64")?;
                }
            }

            // One gamma-coded value per node: the bit length of its labels.
            offsets_writer
                .write_gamma(offset_bits)
                .context("Could not write offset")?;
        });
    }

    // Flush and close the writers, surfacing any I/O error.
    drop(
        labels_writer
            .into_inner()
            .context("Could not flush labels writer")?
            .into_inner()
            .into_inner()
            .context("Could not flush labels bufwriter")?,
    );
    drop(
        offsets_writer
            .into_inner()
            .context("Could not close label offsets writer")?
            .into_inner()
            .into_inner()
            .context("Could not flush label offsets bufwriter")?,
    );

    pl.done();

    // Explicitly keep the temporary directory alive until all writes are done.
    drop(temp_dir);

    Ok(label_width)
}
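
/// Returns the number of bits needed to encode any edge label of this dataset,
/// including one extra sentinel value meaning "no label". The bound is derived
/// from the number of label names and from the current time, which caps visit
/// timestamps.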
fn label_width(mphf: &LabelNameMphf) -> usize {
    use crate::labels::{
        Branch, DirEntry, EdgeLabel, LabelNameId, Permission, UntypedEdgeLabel, Visit, VisitStatus,
    };
    let num_label_names = mphf.num_keys();

    // No visit can have a timestamp later than the time this function runs.
    let max_visit_timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Could not get current time")
        .as_secs();

    // Build the largest possible label of each type, and keep the largest
    // untyped representation among them.
    let max_label = [
        EdgeLabel::Branch(Branch::new(LabelNameId(num_label_names)).unwrap()),
        EdgeLabel::DirEntry(DirEntry::new(Permission::None, LabelNameId(num_label_names)).unwrap()),
        EdgeLabel::Visit(Visit::new(VisitStatus::Full, max_visit_timestamp).unwrap()),
    ]
    .into_iter()
    .map(|label| UntypedEdgeLabel::from(label).0)
    .max()
    .unwrap();
    width_for_max_label_value(max_label)
}
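
/// Returns the number of bits needed to represent any value in `0..=max_label`,
/// plus one extra sentinel value.
///
/// For example, `max_label = 2` needs the values {0, 1, 2} plus the sentinel,
/// i.e. 4 distinct values, hence 2 bits.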
fn width_for_max_label_value(max_label: u64) -> usize {
    let num_label_values = max_label + 1;
    // One extra value to use as the "no label" sentinel.
    let num_values = num_label_values + 1;
    num_values
        .next_power_of_two()
        .checked_ilog2()
        .unwrap() as usize
}
#[test]
fn test_width_for_max_label_value() {
    assert_eq!(width_for_max_label_value(0), 1);
    assert_eq!(width_for_max_label_value(1), 2);
    assert_eq!(width_for_max_label_value(2), 2);
    for i in 3..=6 {
        assert_eq!(width_for_max_label_value(i), 3);
    }
    for i in 7..=14 {
        assert_eq!(width_for_max_label_value(i), 4);
    }
    assert_eq!(width_for_max_label_value(15), 5);
}
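
// Labels are (de)serialized as fixed-width integers. The all-ones value
// (2^label_width - 1) is reserved as a sentinel meaning "no label", which is
// why label values are `Option<NonMaxU64>` rather than plain `u64`.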
#[derive(Clone, Copy)]
struct LabelDeserializer {
    label_width: usize,
}

#[derive(Clone, Copy)]
struct LabelSerializer {
    label_width: usize,
}

impl BitDeserializer<NE, BitReader> for LabelDeserializer {
    type DeserType = Option<NonMaxU64>;
    fn deserialize(
        &self,
        bitstream: &mut BitReader,
    ) -> Result<Self::DeserType, <BitReader as BitRead<NE>>::Error> {
        assert_ne!(self.label_width, 64, "label_width = 64 is not implemented");
        // The all-ones value is the "no label" sentinel.
        let max = (1u64 << self.label_width) - 1;
        let value = bitstream.read_bits(self.label_width)?;
        assert!(value <= max, "Read unexpectedly large value");
        if value == max {
            Ok(None)
        } else {
            Ok(Some(NonMaxU64::try_from(value).unwrap()))
        }
    }
}

impl BitSerializer<NE, BitWriter> for LabelSerializer {
    type SerType = Option<NonMaxU64>;
    fn serialize(
        &self,
        value: &Self::SerType,
        bitstream: &mut BitWriter,
    ) -> Result<usize, <BitWriter as BitWrite<NE>>::Error> {
        assert_ne!(self.label_width, 64, "label_width = 64 is not implemented");
        let max = (1u64 << self.label_width) - 1;
        match *value {
            // Actual values must be strictly below the sentinel.
            Some(value) => {
                assert!(u64::from(value) < max, "value does not fit in label width");
                bitstream.write_bits(u64::from(value), self.label_width)
            }
            // `None` is encoded as the all-ones sentinel.
            None => bitstream.write_bits(max, self.label_width),
        }
    }
}