use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::atomic::Ordering;

use anyhow::{Context, Result};
use ar_row::deserialize::{ArRowDeserialize, ArRowStruct};
use ar_row_derive::ArRowDeserialize;
use common_traits::{Atomic, IntoAtomic};
use rayon::prelude::*;

use super::orc::get_dataset_readers;
use super::orc::{iter_arrow, par_iter_arrow};
use crate::map::{MappedPermutation, Permutation};
use crate::mph::SwhidMphf;
use crate::properties::suffixes;
use crate::utils::suffix_path;
use crate::NodeType;

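/// Writes node property files to `target`, reading the relevant columns from the
/// ORC dataset in `dataset_dir` and indexing values by node id (SWHID MPH composed
/// with the `order` permutation).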
pub struct PropertyWriter<'b, SWHIDMPHF: SwhidMphf> {
    pub swhid_mph: SWHIDMPHF,
    pub person_mph: Option<super::persons::PersonHasher<'b>>,
    pub order: MappedPermutation,
    pub num_nodes: usize,
    pub dataset_dir: PathBuf,
    pub allowed_node_types: Vec<NodeType>,
    pub target: PathBuf,
}

impl<SWHIDMPHF: SwhidMphf + Sync> PropertyWriter<'_, SWHIDMPHF> {
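    /// Sequentially calls `f` on every row of every ORC file in the given dataset
    /// subdirectory, stopping at the first error.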
    fn for_each_row<Row>(&self, subdirectory: &str, f: impl FnMut(Row) -> Result<()>) -> Result<()>
    where
        Row: ArRowDeserialize + ArRowStruct + Send + Sync,
    {
        get_dataset_readers(self.dataset_dir.clone(), subdirectory)?
            .into_iter()
            .flat_map(|reader_builder| iter_arrow(reader_builder, |row: Row| [row]))
            .try_for_each(f)
    }

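    /// Calls `f` on every row of every ORC file in the given dataset subdirectory,
    /// in parallel. The returned `ParallelIterator` is lazy and must be consumed
    /// (e.g. with `.for_each(|()| ())`) for `f` to actually run.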
    fn par_for_each_row<Row>(
        &self,
        subdirectory: &str,
        f: impl Fn(Row) + Send + Sync,
    ) -> Result<impl ParallelIterator<Item = ()>>
    where
        Row: ArRowDeserialize + ArRowStruct + Clone + Send + Sync,
    {
        Ok(get_dataset_readers(self.dataset_dir.clone(), subdirectory)?
            .into_par_iter()
            .flat_map(|reader_builder| par_iter_arrow(reader_builder, |row: Row| [row]))
            .map(f))
    }

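    /// Returns a `Vec` of length `num_nodes` with every element set to
    /// `initial_value`, initialized in parallel.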
    fn init_vec<T: Copy + Default + Sync>(&self, initial_value: T) -> Vec<T>
    where
        for<'a> Vec<T>: IntoParallelRefMutIterator<'a, Item = &'a mut T>,
    {
        let mut vec = vec![T::default(); self.num_nodes];
        vec.par_iter_mut().for_each(|v| *v = initial_value);
        vec
    }
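
    /// Same as [`Self::init_vec`], but produces a vector of the atomic counterpart
    /// of `T`, so cells can be written concurrently from parallel row iterators.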
    fn init_atomic_vec<T: IntoAtomic + Copy + Default + Sync>(
        &self,
        initial_value: T,
    ) -> Vec<<T as IntoAtomic>::AtomicType>
    where
        for<'a> Vec<T>: IntoParallelRefMutIterator<'a, Item = &'a mut T>,
    {
        (0..self.num_nodes)
            .into_par_iter()
            .map(|_| initial_value.to_atomic())
            .collect()
    }

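    /// Returns the node id of a SWHID, by hashing it with the SWHID MPH and mapping
    /// the hash through the `order` permutation.
    ///
    /// # Panics
    ///
    /// If the SWHID is not in the MPH.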
    fn node_id(&self, swhid: &str) -> usize {
        self.order
            .get(
                self.swhid_mph
                    .hash_str(swhid)
                    .unwrap_or_else(|| panic!("unknown SWHID {swhid}")),
            )
            .unwrap()
    }

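    /// Stores `value` in the cell of `vector` corresponding to the given SWHID,
    /// using a relaxed atomic store so it can be called from parallel iterators.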
    fn set_atomic<Value: Atomic>(
        &self,
        vector: &[Value],
        swhid: &str,
        value: Value::NonAtomicType,
    ) {
        vector
            .get(self.node_id(swhid))
            .expect("node_id is larger than the array")
            .store(value, Ordering::Relaxed)
    }

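    /// Stores `value` in the cell of `vector` corresponding to the given SWHID
    /// (non-atomic variant of [`Self::set_atomic`], for sequential writers).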
    fn set<Value>(&self, vector: &mut [Value], swhid: &str, value: Value) {
        *vector
            .get_mut(self.node_id(swhid))
            .expect("node_id is larger than the array") = value;
    }

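    /// Writes `values` as raw bytes to the file obtained by appending `suffix` to
    /// the target path.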
    fn write<Value: bytemuck::Pod>(&self, suffix: &str, values: impl AsRef<[Value]>) -> Result<()> {
        let path = suffix_path(&self.target, suffix);
        let mut file = std::fs::File::create(&path)
            .with_context(|| format!("Could not create {}", path.display()))?;
        file.write_all(bytemuck::cast_slice(values.as_ref()))
            .with_context(|| format!("Could not write to {}", path.display()))?;

        Ok(())
    }

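    /// Same as [`Self::write`], but first unwraps each atomic cell into its plain
    /// (non-atomic) value.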
    fn write_atomic<Value: Atomic>(&self, suffix: &str, values: Vec<Value>) -> Result<()>
    where
        <Value as Atomic>::NonAtomicType: bytemuck::Pod,
    {
        let values: Vec<_> = values.into_iter().map(Value::into_inner).collect();
        self.write(suffix, values)
    }

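    /// Writes the author timestamp and author timestamp offset properties, read
    /// from the `revision` and `release` tables. Values are stored big-endian;
    /// nodes without a date keep the initial `i64::MIN` / `i16::MIN` values.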
    pub fn write_author_timestamps(&self) -> Result<()> {
        #[derive(ArRowDeserialize, Default, Clone)]
        struct Revrel {
            id: String,
            date: Option<ar_row::Timestamp>,
            date_offset: Option<i16>,
        }

        let read_rev = self.allowed_node_types.contains(&NodeType::Revision);
        let read_rel = self.allowed_node_types.contains(&NodeType::Release);

        if !read_rev && !read_rel {
            log::info!("Excluded");
            return Ok(());
        }

        log::info!("Initializing...");
        let timestamps = self.init_atomic_vec(i64::MIN.to_be());
        let timestamp_offsets = self.init_atomic_vec(i16::MIN.to_be());

        log::info!("Reading...");
        let f = |type_: &str, r: Revrel| {
            if let Some(date) = r.date {
                let swhid = format!("swh:1:{}:{}", type_, r.id);
                self.set_atomic(&timestamps, &swhid, date.seconds.to_be());
                if let Some(date_offset) = r.date_offset {
                    self.set_atomic(&timestamp_offsets, &swhid, date_offset.to_be());
                }
            }
        };

        if read_rev && read_rel {
            [].into_par_iter()
                .chain(self.par_for_each_row("revision", |rev: Revrel| f("rev", rev))?)
                .chain(self.par_for_each_row("release", |rel: Revrel| f("rel", rel))?)
                .for_each(|()| ());
        } else if read_rev {
            self.par_for_each_row("revision", |rev: Revrel| f("rev", rev))?
                .for_each(|()| ());
        } else if read_rel {
            self.par_for_each_row("release", |rel: Revrel| f("rel", rel))?
                .for_each(|()| ());
        } else {
            unreachable!("!read_rev && !read_rel");
        }

        log::info!("Writing...");
        self.write_atomic(suffixes::AUTHOR_TIMESTAMP, timestamps)?;
        self.write_atomic(suffixes::AUTHOR_TIMESTAMP_OFFSET, timestamp_offsets)?;

        Ok(())
    }
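
    /// Same as [`Self::write_author_timestamps`], but for committer dates, which
    /// only exist on revisions.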
    pub fn write_committer_timestamps(&self) -> Result<()> {
        #[derive(ArRowDeserialize, Default, Clone)]
        struct Revision {
            id: String,
            committer_date: Option<ar_row::Timestamp>,
            committer_offset: Option<i16>,
        }

        if !self.allowed_node_types.contains(&NodeType::Revision) {
            log::info!("Excluded");
            return Ok(());
        }

        log::info!("Initializing...");
        let timestamps = self.init_atomic_vec(i64::MIN.to_be());
        let timestamp_offsets = self.init_atomic_vec(i16::MIN.to_be());

        log::info!("Reading...");
        self.par_for_each_row("revision", |rev: Revision| {
            if let Some(date) = rev.committer_date {
                let swhid = format!("swh:1:rev:{}", rev.id);
                self.set_atomic(&timestamps, &swhid, date.seconds.to_be());
                if let Some(date_offset) = rev.committer_offset {
                    self.set_atomic(&timestamp_offsets, &swhid, date_offset.to_be());
                }
            }
        })?
        .for_each(|()| ());

        log::info!("Writing...");
        self.write_atomic(suffixes::COMMITTER_TIMESTAMP, timestamps)?;
        self.write_atomic(suffixes::COMMITTER_TIMESTAMP_OFFSET, timestamp_offsets)?;

        Ok(())
    }
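
    /// Writes the content length property, read from the `content` and
    /// `skipped_content` tables. Lengths are stored as big-endian `u64`; entries
    /// without a known length keep the initial `u64::MAX`.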
    pub fn write_content_lengths(&self) -> Result<()> {
        #[derive(ArRowDeserialize, Default, Clone)]
        struct Content {
            sha1_git: Option<String>,
            length: Option<i64>,
        }

        if !self.allowed_node_types.contains(&NodeType::Content) {
            log::info!("Excluded");
            return Ok(());
        }

        log::info!("Initializing...");
        let lengths = self.init_atomic_vec(u64::MAX.to_be());

        log::info!("Reading...");
        let f = |cnt: Content| {
            if let Some(id) = cnt.sha1_git {
                if let Some(length) = cnt.length {
                    let swhid = format!("swh:1:cnt:{id}");
                    self.set_atomic(&lengths, &swhid, (length as u64).to_be());
                }
            }
        };
        [].into_par_iter()
            .chain(self.par_for_each_row("content", f)?)
            .chain(self.par_for_each_row("skipped_content", f)?)
            .for_each(|()| ());

        log::info!("Writing...");
        self.write_atomic(suffixes::CONTENT_LENGTH, lengths)?;

        Ok(())
    }
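
    /// Writes the "content is skipped" bit vector: one bit per node, set for every
    /// content listed in the `skipped_content` table. The underlying words are
    /// serialized big-endian.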
    pub fn write_content_is_skipped(&self) -> Result<()> {
        #[derive(ArRowDeserialize, Default, Clone)]
        struct SkippedContent {
            sha1_git: Option<String>,
        }

        if !self.allowed_node_types.contains(&NodeType::Content) {
            log::info!("Excluded");
            return Ok(());
        }

        log::info!("Initializing...");
        let is_skipped = sux::bits::bit_vec::AtomicBitVec::new(self.num_nodes);

        log::info!("Reading...");
        self.par_for_each_row("skipped_content", |cnt: SkippedContent| {
            if let Some(id) = cnt.sha1_git {
                let swhid = format!("swh:1:cnt:{id}");
                is_skipped.set(
                    self.node_id(&swhid),
                    true,
                    std::sync::atomic::Ordering::Relaxed,
                );
            }
        })?
        .for_each(|()| ());

        log::info!("Converting...");
        let (bitvec, len) = is_skipped.into_raw_parts();
        assert_eq!(len, self.num_nodes);
        let bitvec_be: Vec<u8> = bitvec
            .into_par_iter()
            .flat_map(|cell| cell.into_inner().to_be_bytes())
            .collect();

        log::info!("Writing...");
        self.write(suffixes::CONTENT_IS_SKIPPED, bitvec_be)?;

        Ok(())
    }
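
    /// Writes the author id property for revisions and releases: each author's
    /// (base64-encoded) bytes are hashed with the persons MPH, and the resulting
    /// `u32` id is stored big-endian; nodes with no author keep `u32::MAX`.
    ///
    /// # Panics
    ///
    /// If `person_mph` is `None` while revisions or releases are in
    /// `allowed_node_types`.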
    pub fn write_author_ids(&self) -> Result<()> {
        #[derive(ArRowDeserialize, Default, Clone)]
        struct Revrel {
            id: String,
            author: Option<Box<[u8]>>,
        }

        let read_rev = self.allowed_node_types.contains(&NodeType::Revision);
        let read_rel = self.allowed_node_types.contains(&NodeType::Release);

        if !read_rev && !read_rel {
            log::info!("Excluded");
            return Ok(());
        }

        let Some(person_mph) = self.person_mph.as_ref() else {
            panic!(
                "write_author_ids is missing person MPH but allowed_node_types = {:?}",
                self.allowed_node_types
            );
        };

        log::info!("Initializing...");
        let authors = self.init_atomic_vec(u32::MAX.to_be());

        log::info!("Reading...");
        let f = |type_: &str, r: Revrel| {
            if let Some(person) = r.author {
                let swhid = format!("swh:1:{}:{}", type_, r.id);
                let base64 = base64_simd::STANDARD;
                let person = base64.encode_to_string(person).into_bytes();
                let person_id: u32 = person_mph.hash(person).expect("Unknown person");
                self.set_atomic(&authors, &swhid, person_id.to_be());
            }
        };

        if read_rev && read_rel {
            [].into_par_iter()
                .chain(self.par_for_each_row("revision", |rev: Revrel| f("rev", rev))?)
                .chain(self.par_for_each_row("release", |rel: Revrel| f("rel", rel))?)
                .for_each(|()| ());
        } else if read_rev {
            self.par_for_each_row("revision", |rev: Revrel| f("rev", rev))?
                .for_each(|()| ());
        } else if read_rel {
            self.par_for_each_row("release", |rel: Revrel| f("rel", rel))?
                .for_each(|()| ());
        } else {
            unreachable!("!read_rev && !read_rel");
        }

        log::info!("Writing...");
        self.write_atomic(suffixes::AUTHOR_ID, authors)?;
        Ok(())
    }
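
    /// Same as [`Self::write_author_ids`], but for committers, which only exist on
    /// revisions.
    ///
    /// # Panics
    ///
    /// If `person_mph` is `None` while revisions are in `allowed_node_types`.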
    pub fn write_committer_ids(&self) -> Result<()> {
        #[derive(ArRowDeserialize, Default, Clone)]
        struct Revision {
            id: String,
            committer: Option<Box<[u8]>>,
        }

        if !self.allowed_node_types.contains(&NodeType::Revision) {
            log::info!("Excluded");
            return Ok(());
        }

        let Some(person_mph) = self.person_mph.as_ref() else {
            panic!(
                "write_committer_ids is missing person MPH but allowed_node_types = {:?}",
                self.allowed_node_types
            );
        };

        log::info!("Initializing...");
        let committers = self.init_atomic_vec(u32::MAX.to_be());

        log::info!("Reading...");
        self.par_for_each_row("revision", |rev: Revision| {
            if let Some(person) = rev.committer {
                let swhid = format!("swh:1:rev:{}", rev.id);
                let base64 = base64_simd::STANDARD;
                let person = base64.encode_to_string(person).into_bytes();
                let person_id: u32 = person_mph.hash(person).expect("Unknown person");
                self.set_atomic(&committers, &swhid, person_id.to_be());
            }
        })?
        .for_each(|()| ());

        log::info!("Writing...");
        self.write_atomic(suffixes::COMMITTER_ID, committers)?;
        Ok(())
    }
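
    /// Writes the message property: revision and release messages (and origin URLs)
    /// are base64-encoded and written newline-separated to a single file, and each
    /// node's big-endian `u64` offset into that file is written to the offsets file
    /// (`u64::MAX` for nodes with no message).
    ///
    /// Rows are read sequentially, as offsets depend on the write order.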
    pub fn write_messages(&self) -> Result<()> {
        #[derive(ArRowDeserialize, Default, Clone)]
        struct Revrel {
            id: String,
            message: Option<Box<[u8]>>,
        }
        #[derive(ArRowDeserialize, Default, Clone)]
        struct Origin {
            id: String,
            url: String,
        }

        let read_rev = self.allowed_node_types.contains(&NodeType::Revision);
        let read_rel = self.allowed_node_types.contains(&NodeType::Release);
        let read_ori = self.allowed_node_types.contains(&NodeType::Origin);

        if !read_rev && !read_rel && !read_ori {
            log::info!("Excluded");
            return Ok(());
        }

        log::info!("Initializing...");
        let mut offsets = self.init_vec(u64::MAX.to_be());
        let path = suffix_path(&self.target, suffixes::MESSAGE);
        let file = std::fs::File::create(&path)
            .with_context(|| format!("Could not create {}", path.display()))?;
        let mut writer = BufWriter::new(file);

        let base64 = base64_simd::STANDARD;
        let mut offset = 0u64;

        let mut f = |type_: &str, id: String, message: Option<Box<[u8]>>| {
            if let Some(message) = message {
                let swhid = format!("swh:1:{type_}:{id}");
                let mut encoded_message = base64.encode_to_string(message);
                encoded_message.push('\n');
                let encoded_message = encoded_message.as_bytes();
                writer.write_all(encoded_message)?;
                self.set(&mut offsets, &swhid, offset.to_be());
                offset += encoded_message.len() as u64;
            }
            Ok(())
        };

        if read_rel {
            log::info!("Reading and writing release messages...");
            self.for_each_row("release", |rel: Revrel| f("rel", rel.id, rel.message))?;
        }
        if read_rev {
            log::info!("Reading and writing revision messages...");
            self.for_each_row("revision", |rev: Revrel| f("rev", rev.id, rev.message))?;
        }
        if read_ori {
            log::info!("Reading and writing origin URLs...");
            self.for_each_row("origin", |ori: Origin| {
                f("ori", ori.id, Some(ori.url.as_bytes().into()))
            })?;
        }

        log::info!("Writing offsets...");
        self.write(suffixes::MESSAGE_OFFSET, offsets)?;
        Ok(())
    }
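
    /// Writes the release (tag) name property, using the same layout as
    /// [`Self::write_messages`]: base64-encoded names, newline-separated, plus a
    /// big-endian `u64` offsets file.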
    pub fn write_tag_names(&self) -> Result<()> {
        #[derive(ArRowDeserialize, Default, Clone)]
        struct Release {
            id: String,
            name: Box<[u8]>,
        }

        if !self.allowed_node_types.contains(&NodeType::Release) {
            log::info!("Excluded");
            return Ok(());
        }

        log::info!("Initializing...");
        let mut offsets = self.init_vec(u64::MAX.to_be());
        let path = suffix_path(&self.target, suffixes::TAG_NAME);
        let file = std::fs::File::create(&path)
            .with_context(|| format!("Could not create {}", path.display()))?;
        let mut writer = BufWriter::new(file);

        log::info!("Reading and writing...");
        let base64 = base64_simd::STANDARD;
        let mut offset = 0u64;

        self.for_each_row("release", |rel: Release| {
            let swhid = format!("swh:1:rel:{}", rel.id);
            let mut encoded_name = base64.encode_to_string(rel.name);
            encoded_name.push('\n');
            let encoded_name = encoded_name.as_bytes();
            writer.write_all(encoded_name)?;
            self.set(&mut offsets, &swhid, offset.to_be());
            offset += encoded_name.len() as u64;

            Ok(())
        })?;

        log::info!("Writing offsets...");
        self.write(suffixes::TAG_NAME_OFFSET, offsets)?;
        Ok(())
    }
}