swh_graph/compress/
persons.rs

1// Copyright (C) 2024  The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6use std::fs::File;
7use std::io::{BufRead, BufReader};
8use std::path::{Path, PathBuf};
9
10use anyhow::{Context, Result};
11use dsi_progress_logger::{concurrent_progress_logger, ProgressLog};
12use pthash::{
13    BuildConfiguration, DictionaryDictionary, Hashable, Minimal, MurmurHash2_64, PartitionedPhf,
14    Phf,
15};
16
/// Newtype around any byte-like value (anything viewable as `&[u8]`) naming a person.
///
/// The wrapped value is public so callers can build and unwrap it directly.
pub struct Person<T>(pub T)
where
    T: AsRef<[u8]>;
18
19impl<T: AsRef<[u8]>> Hashable for Person<T> {
20    type Bytes<'a>
21        = &'a [u8]
22    where
23        T: 'a;
24    fn as_bytes(&self) -> Self::Bytes<'_> {
25        self.0.as_ref()
26    }
27}
28
29// pthash requires 128-bits hash when using over 2^30 keys, and the 2024-05-16 production
30// graph has just over 2^32 keys
31pub type PersonMphf = PartitionedPhf<Minimal, MurmurHash2_64, DictionaryDictionary>;
32
33fn iter_persons(path: &Path) -> Result<impl Iterator<Item = Person<Box<[u8]>>>> {
34    let persons_file =
35        File::open(path).with_context(|| format!("Could not open {}", path.display()))?;
36    Ok(BufReader::new(persons_file).lines().map(move |person| {
37        Person(
38            person
39                .expect("Could not decode persons as UTF-8")
40                .into_bytes()
41                .into_boxed_slice(),
42        )
43    }))
44}
45
46/// Reads base64-encoded persons from the path and return a MPH function for them.
47pub fn build_mphf(path: PathBuf, num_persons: usize) -> Result<PersonMphf> {
48    let mut pass_counter = 0;
49    let iter_persons = || {
50        pass_counter += 1;
51        let mut pl = concurrent_progress_logger!(
52            display_memory = true,
53            item_name = "person",
54            local_speed = true,
55            expected_updates = Some(num_persons),
56        );
57        pl.start(format!("Reading persons (pass #{pass_counter})"));
58        iter_persons(&path)
59            .expect("Could not read persons")
60            .inspect(move |_| pl.light_update())
61    };
62    let temp_dir = tempfile::tempdir().unwrap();
63
64    // TODO: tweak those for performance
65    let mut config = BuildConfiguration::new(temp_dir.path().to_owned());
66    config.num_threads = num_cpus::get() as u64;
67
68    log::info!("Building MPH with parameters: {:?}", config);
69
70    let mut f = PersonMphf::new();
71    f.build_in_internal_memory_from_bytes(iter_persons, &config)
72        .context("Failed to build MPH")?;
73    Ok(f)
74}
75
76#[derive(Clone, Copy)]
77pub struct PersonHasher<'a> {
78    mphf: &'a PersonMphf,
79}
80
81impl<'a> PersonHasher<'a> {
82    pub fn new(mphf: &'a PersonMphf) -> Self {
83        PersonHasher { mphf }
84    }
85
86    pub fn mphf(&self) -> &'a PersonMphf {
87        self.mphf
88    }
89
90    pub fn hash<T: AsRef<[u8]>>(&self, person_name: T) -> Result<u32> {
91        Ok(self
92            .mphf
93            .hash(Person(person_name))
94            .try_into()
95            .expect("person MPH overflowed"))
96    }
97}