swh_graph/compress/
properties.rs

1// Copyright (C) 2023-2024  The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6use std::io::{BufWriter, Write};
7use std::path::PathBuf;
8use std::sync::atomic::Ordering;
9
10use anyhow::{Context, Result};
11use ar_row::deserialize::{ArRowDeserialize, ArRowStruct};
12use ar_row_derive::ArRowDeserialize;
13use common_traits::{Atomic, IntoAtomic};
14use rayon::prelude::*;
15
16use super::orc::get_dataset_readers;
17use super::orc::{iter_arrow, par_iter_arrow};
18use crate::map::{MappedPermutation, Permutation};
19use crate::mph::SwhidMphf;
20use crate::properties::suffixes;
21use crate::utils::suffix_path;
22use crate::NodeType;
23
/// Writes per-node property files for a compressed graph, reading the source
/// data from an ORC dataset and addressing nodes through a SWHID MPH + order
/// permutation.
pub struct PropertyWriter<'b, SWHIDMPHF: SwhidMphf> {
    /// Minimal perfect hash function from textual SWHIDs to MPH positions
    pub swhid_mph: SWHIDMPHF,
    /// Hasher for person (author/committer) fullnames; only required by
    /// `write_author_ids` / `write_committer_ids` (they panic if it is `None`)
    pub person_mph: Option<super::persons::PersonHasher<'b>>,
    /// Permutation from MPH positions to final node ids
    pub order: MappedPermutation,
    /// Total number of nodes; sizes every property vector
    pub num_nodes: usize,
    /// Root directory of the ORC dataset to read from
    pub dataset_dir: PathBuf,
    /// Node types to process; writers for other types return early
    pub allowed_node_types: Vec<NodeType>,
    /// Base path of the property files; suffixes are appended per property
    pub target: PathBuf,
}
33
34impl<SWHIDMPHF: SwhidMphf + Sync> PropertyWriter<'_, SWHIDMPHF> {
35    fn for_each_row<Row>(&self, subdirectory: &str, f: impl FnMut(Row) -> Result<()>) -> Result<()>
36    where
37        Row: ArRowDeserialize + ArRowStruct + Send + Sync,
38    {
39        get_dataset_readers(self.dataset_dir.clone(), subdirectory)?
40            .into_iter()
41            .flat_map(|reader_builder| iter_arrow(reader_builder, |row: Row| [row]))
42            .try_for_each(f)
43    }
44
45    fn par_for_each_row<Row>(
46        &self,
47        subdirectory: &str,
48        f: impl Fn(Row) + Send + Sync,
49    ) -> Result<impl ParallelIterator<Item = ()>>
50    where
51        Row: ArRowDeserialize + ArRowStruct + Clone + Send + Sync,
52    {
53        Ok(get_dataset_readers(self.dataset_dir.clone(), subdirectory)?
54            .into_par_iter()
55            .flat_map(|reader_builder| par_iter_arrow(reader_builder, |row: Row| [row]))
56            .map(f))
57    }
58
59    /// Equivalent to `vec![initial_value; self.num_nodes]`, but initializes values in the vector
60    /// in parallel
61    ///
62    /// This is 9 times faster on a NUMA machine with two Intel Xeon Gold 6342 CPUs.
63    fn init_vec<T: Copy + Default + Sync>(&self, initial_value: T) -> Vec<T>
64    where
65        for<'a> Vec<T>: IntoParallelRefMutIterator<'a, Item = &'a mut T>,
66    {
67        let mut vec = vec![T::default(); self.num_nodes];
68        vec.par_iter_mut().for_each(|v| *v = initial_value);
69        vec
70    }
    /// Same as [`Self::init_vec`] but returns a vector of atomic values
    fn init_atomic_vec<T: IntoAtomic + Copy + Default + Sync>(
        &self,
        initial_value: T,
    ) -> Vec<<T as IntoAtomic>::AtomicType>
    where
        for<'a> Vec<T>: IntoParallelRefMutIterator<'a, Item = &'a mut T>,
    {
        // Build each atomic cell from a parallel range so initialization is
        // spread across worker threads (same rationale as init_vec).
        (0..self.num_nodes)
            .into_par_iter()
            .map(|_| initial_value.to_atomic())
            .collect()
    }
84
85    fn node_id(&self, swhid: &str) -> usize {
86        self.order
87            .get(
88                self.swhid_mph
89                    .hash_str(swhid)
90                    .unwrap_or_else(|| panic!("unknown SWHID {swhid}")),
91            )
92            .unwrap()
93    }
94
95    fn set_atomic<Value: Atomic>(
96        &self,
97        vector: &[Value],
98        swhid: &str,
99        value: Value::NonAtomicType,
100    ) {
101        vector
102            .get(self.node_id(swhid))
103            .expect("node_id is larger than the array")
104            .store(value, Ordering::Relaxed)
105    }
106
107    fn set<Value>(&self, vector: &mut [Value], swhid: &str, value: Value) {
108        *vector
109            .get_mut(self.node_id(swhid))
110            .expect("node_id is larger than the array") = value;
111    }
112
113    fn write<Value: bytemuck::Pod>(&self, suffix: &str, values: impl AsRef<[Value]>) -> Result<()> {
114        let path = suffix_path(&self.target, suffix);
115        let mut file = std::fs::File::create(&path)
116            .with_context(|| format!("Could not create {}", path.display()))?;
117        file.write_all(bytemuck::cast_slice(values.as_ref()))
118            .with_context(|| format!("Could not write to {}", path.display()))?;
119
120        Ok(())
121    }
122
123    fn write_atomic<Value: Atomic>(&self, suffix: &str, values: Vec<Value>) -> Result<()>
124    where
125        <Value as Atomic>::NonAtomicType: bytemuck::Pod,
126    {
127        // In release mode, this is compiled into a no-op
128        let values: Vec<_> = values.into_iter().map(Value::into_inner).collect();
129        self.write(suffix, values)
130    }
131
132    pub fn write_author_timestamps(&self) -> Result<()> {
133        #[derive(ArRowDeserialize, Default, Clone)]
134        struct Revrel {
135            id: String,
136            date: Option<ar_row::Timestamp>,
137            date_offset: Option<i16>,
138        }
139
140        let read_rev = self.allowed_node_types.contains(&NodeType::Revision);
141        let read_rel = self.allowed_node_types.contains(&NodeType::Release);
142
143        if !read_rev && !read_rel {
144            log::info!("Excluded");
145            return Ok(());
146        }
147
148        log::info!("Initializing...");
149        let timestamps = self.init_atomic_vec(i64::MIN.to_be());
150        let timestamp_offsets = self.init_atomic_vec(i16::MIN.to_be());
151
152        log::info!("Reading...");
153        let f = |type_: &str, r: Revrel| {
154            if let Some(date) = r.date {
155                let swhid = format!("swh:1:{}:{}", type_, r.id);
156                self.set_atomic(&timestamps, &swhid, date.seconds.to_be());
157                if let Some(date_offset) = r.date_offset {
158                    self.set_atomic(&timestamp_offsets, &swhid, date_offset.to_be());
159                }
160            }
161        };
162
163        if read_rev && read_rel {
164            [].into_par_iter()
165                .chain(self.par_for_each_row("revision", |rev: Revrel| f("rev", rev))?)
166                .chain(self.par_for_each_row("release", |rel: Revrel| f("rel", rel))?)
167                .for_each(|()| ());
168        } else if read_rev {
169            self.par_for_each_row("revision", |rev: Revrel| f("rev", rev))?
170                .for_each(|()| ());
171        } else if read_rel {
172            self.par_for_each_row("release", |rel: Revrel| f("rel", rel))?
173                .for_each(|()| ());
174        } else {
175            unreachable!("!read_rev && !read_rel");
176        }
177
178        log::info!("Writing...");
179        self.write_atomic(suffixes::AUTHOR_TIMESTAMP, timestamps)?;
180        self.write_atomic(suffixes::AUTHOR_TIMESTAMP_OFFSET, timestamp_offsets)?;
181
182        Ok(())
183    }
184    pub fn write_committer_timestamps(&self) -> Result<()> {
185        #[derive(ArRowDeserialize, Default, Clone)]
186        struct Revision {
187            id: String,
188            committer_date: Option<ar_row::Timestamp>,
189            committer_offset: Option<i16>,
190        }
191
192        if !self.allowed_node_types.contains(&NodeType::Revision) {
193            log::info!("Excluded");
194            return Ok(());
195        }
196
197        log::info!("Initializing...");
198        let timestamps = self.init_atomic_vec(i64::MIN.to_be());
199        let timestamp_offsets = self.init_atomic_vec(i16::MIN.to_be());
200
201        log::info!("Reading...");
202        self.par_for_each_row("revision", |rev: Revision| {
203            if let Some(date) = rev.committer_date {
204                let swhid = format!("swh:1:rev:{}", rev.id);
205                self.set_atomic(&timestamps, &swhid, date.seconds.to_be());
206                if let Some(date_offset) = rev.committer_offset {
207                    self.set_atomic(&timestamp_offsets, &swhid, date_offset.to_be());
208                }
209            }
210        })?
211        .for_each(|()| ());
212
213        log::info!("Writing...");
214        self.write_atomic(suffixes::COMMITTER_TIMESTAMP, timestamps)?;
215        self.write_atomic(suffixes::COMMITTER_TIMESTAMP_OFFSET, timestamp_offsets)?;
216
217        Ok(())
218    }
219    pub fn write_content_lengths(&self) -> Result<()> {
220        #[derive(ArRowDeserialize, Default, Clone)]
221        struct Content {
222            sha1_git: Option<String>,
223            length: Option<i64>,
224        }
225
226        if !self.allowed_node_types.contains(&NodeType::Content) {
227            log::info!("Excluded");
228            return Ok(());
229        }
230
231        log::info!("Initializing...");
232        let lengths = self.init_atomic_vec(u64::MAX.to_be());
233
234        log::info!("Reading...");
235        let f = |cnt: Content| {
236            if let Some(id) = cnt.sha1_git {
237                if let Some(length) = cnt.length {
238                    let swhid = format!("swh:1:cnt:{id}");
239                    self.set_atomic(&lengths, &swhid, (length as u64).to_be());
240                }
241            }
242        };
243        [].into_par_iter()
244            .chain(self.par_for_each_row("content", f)?)
245            .chain(self.par_for_each_row("skipped_content", f)?)
246            .for_each(|()| ());
247
248        log::info!("Writing...");
249        self.write_atomic(suffixes::CONTENT_LENGTH, lengths)?;
250
251        Ok(())
252    }
253    pub fn write_content_is_skipped(&self) -> Result<()> {
254        #[derive(ArRowDeserialize, Default, Clone)]
255        struct SkippedContent {
256            sha1_git: Option<String>,
257        }
258
259        if !self.allowed_node_types.contains(&NodeType::Content) {
260            log::info!("Excluded");
261            return Ok(());
262        }
263
264        log::info!("Initializing...");
265        let is_skipped = sux::bits::bit_vec::AtomicBitVec::new(self.num_nodes);
266
267        log::info!("Reading...");
268        self.par_for_each_row("skipped_content", |cnt: SkippedContent| {
269            if let Some(id) = cnt.sha1_git {
270                let swhid = format!("swh:1:cnt:{id}");
271                is_skipped.set(
272                    self.node_id(&swhid),
273                    true,
274                    std::sync::atomic::Ordering::Relaxed,
275                );
276            }
277        })?
278        .for_each(|()| ());
279
280        log::info!("Converting...");
281        let (bitvec, len) = is_skipped.into_raw_parts();
282        assert_eq!(len, self.num_nodes);
283        // Make its values big-endian
284        let bitvec_be: Vec<u8> = bitvec
285            .into_par_iter()
286            .flat_map(|cell| cell.into_inner().to_be_bytes())
287            .collect();
288
289        log::info!("Writing...");
290        self.write(suffixes::CONTENT_IS_SKIPPED, bitvec_be)?;
291
292        Ok(())
293    }
294    pub fn write_author_ids(&self) -> Result<()> {
295        #[derive(ArRowDeserialize, Default, Clone)]
296        struct Revrel {
297            id: String,
298            author: Option<Box<[u8]>>,
299        }
300
301        let read_rev = self.allowed_node_types.contains(&NodeType::Revision);
302        let read_rel = self.allowed_node_types.contains(&NodeType::Release);
303
304        if !read_rev && !read_rel {
305            log::info!("Excluded");
306            return Ok(());
307        }
308
309        let Some(person_mph) = self.person_mph.as_ref() else {
310            panic!(
311                "write_author_ids is missing person MPH but allowed_node_types = {:?}",
312                self.allowed_node_types
313            );
314        };
315
316        log::info!("Initializing...");
317        let authors = self.init_atomic_vec(u32::MAX.to_be());
318
319        log::info!("Reading...");
320        let f = |type_: &str, r: Revrel| {
321            if let Some(person) = r.author {
322                let swhid = format!("swh:1:{}:{}", type_, r.id);
323                let base64 = base64_simd::STANDARD;
324                let person = base64.encode_to_string(person).into_bytes();
325                let person_id: u32 = person_mph.hash(person).expect("Unknown person");
326                self.set_atomic(&authors, &swhid, person_id.to_be());
327            }
328        };
329
330        if read_rev && read_rel {
331            [].into_par_iter()
332                .chain(self.par_for_each_row("revision", |rev: Revrel| f("rev", rev))?)
333                .chain(self.par_for_each_row("release", |rel: Revrel| f("rel", rel))?)
334                .for_each(|()| ());
335        } else if read_rev {
336            self.par_for_each_row("revision", |rev: Revrel| f("rev", rev))?
337                .for_each(|()| ());
338        } else if read_rel {
339            self.par_for_each_row("release", |rel: Revrel| f("rel", rel))?
340                .for_each(|()| ());
341        } else {
342            unreachable!("!read_rev && !read_rel");
343        }
344
345        log::info!("Writing...");
346        self.write_atomic(suffixes::AUTHOR_ID, authors)?;
347        Ok(())
348    }
349    pub fn write_committer_ids(&self) -> Result<()> {
350        #[derive(ArRowDeserialize, Default, Clone)]
351        struct Revision {
352            id: String,
353            committer: Option<Box<[u8]>>,
354        }
355
356        if !self.allowed_node_types.contains(&NodeType::Revision) {
357            log::info!("Excluded");
358            return Ok(());
359        }
360
361        let Some(person_mph) = self.person_mph.as_ref() else {
362            panic!(
363                "write_committer_ids is missing person MPH but allowed_node_types = {:?}",
364                self.allowed_node_types
365            );
366        };
367
368        log::info!("Initializing...");
369        let committers = self.init_atomic_vec(u32::MAX.to_be());
370
371        log::info!("Reading...");
372        self.par_for_each_row("revision", |rev: Revision| {
373            if let Some(person) = rev.committer {
374                let swhid = format!("swh:1:rev:{}", rev.id);
375                let base64 = base64_simd::STANDARD;
376                let person = base64.encode_to_string(person).into_bytes();
377                let person_id: u32 = person_mph.hash(person).expect("Unknown person");
378                self.set_atomic(&committers, &swhid, person_id.to_be());
379            }
380        })?
381        .for_each(|()| ());
382
383        log::info!("Writing...");
384        self.write_atomic(suffixes::COMMITTER_ID, committers)?;
385        Ok(())
386    }
387    pub fn write_messages(&self) -> Result<()> {
388        #[derive(ArRowDeserialize, Default, Clone)]
389        struct Revrel {
390            id: String,
391            message: Option<Box<[u8]>>,
392        }
393        #[derive(ArRowDeserialize, Default, Clone)]
394        struct Origin {
395            id: String,
396            url: String,
397        }
398
399        let read_rev = self.allowed_node_types.contains(&NodeType::Revision);
400        let read_rel = self.allowed_node_types.contains(&NodeType::Release);
401        let read_ori = self.allowed_node_types.contains(&NodeType::Origin);
402
403        if !read_rev && !read_rel && !read_ori {
404            log::info!("Excluded");
405            return Ok(());
406        }
407
408        log::info!("Initializing...");
409        let mut offsets = self.init_vec(u64::MAX.to_be());
410        let path = suffix_path(&self.target, suffixes::MESSAGE);
411        let file = std::fs::File::create(&path)
412            .with_context(|| format!("Could not create {}", path.display()))?;
413        let mut writer = BufWriter::new(file);
414
415        let base64 = base64_simd::STANDARD;
416        let mut offset = 0u64;
417
418        let mut f = |type_: &str, id: String, message: Option<Box<[u8]>>| {
419            if let Some(message) = message {
420                let swhid = format!("swh:1:{type_}:{id}");
421                let mut encoded_message = base64.encode_to_string(message);
422                encoded_message.push('\n');
423                let encoded_message = encoded_message.as_bytes();
424                writer.write_all(encoded_message)?;
425                self.set(&mut offsets, &swhid, offset.to_be());
426                offset += encoded_message.len() as u64;
427            }
428            Ok(())
429        };
430
431        // Can't do it in parallel because we are writing to a single file
432        if read_rel {
433            log::info!("Reading and writing release messages...");
434            self.for_each_row("release", |rel: Revrel| f("rel", rel.id, rel.message))?;
435        }
436        if read_rev {
437            log::info!("Reading and writing revision messages...");
438            self.for_each_row("revision", |rev: Revrel| f("rev", rev.id, rev.message))?;
439        }
440        if read_ori {
441            log::info!("Reading and writing origin URLs...");
442            self.for_each_row("origin", |ori: Origin| {
443                f("ori", ori.id, Some(ori.url.as_bytes().into()))
444            })?;
445        }
446
447        log::info!("Writing offsets...");
448        self.write(suffixes::MESSAGE_OFFSET, offsets)?;
449        Ok(())
450    }
451    pub fn write_tag_names(&self) -> Result<()> {
452        #[derive(ArRowDeserialize, Default, Clone)]
453        struct Release {
454            id: String,
455            name: Box<[u8]>,
456        }
457
458        if !self.allowed_node_types.contains(&NodeType::Release) {
459            log::info!("Excluded");
460            return Ok(());
461        }
462
463        log::info!("Initializing...");
464        let mut offsets = self.init_vec(u64::MAX.to_be());
465        let path = suffix_path(&self.target, suffixes::TAG_NAME);
466        let file = std::fs::File::create(&path)
467            .with_context(|| format!("Could not create {}", path.display()))?;
468        let mut writer = BufWriter::new(file);
469
470        log::info!("Reading and writing...");
471        let base64 = base64_simd::STANDARD;
472        let mut offset = 0u64;
473
474        // Can't do it in parallel because we are writing to a single file
475        self.for_each_row("release", |rel: Release| {
476            let swhid = format!("swh:1:rel:{}", rel.id);
477            let mut encoded_name = base64.encode_to_string(rel.name);
478            encoded_name.push('\n');
479            let encoded_name = encoded_name.as_bytes();
480            writer.write_all(encoded_name)?;
481            self.set(&mut offsets, &swhid, offset.to_be());
482            offset += encoded_name.len() as u64;
483
484            Ok(())
485        })?;
486
487        log::info!("Writing offsets...");
488        self.write(suffixes::TAG_NAME_OFFSET, offsets)?;
489        Ok(())
490    }
491}