dag/idmap/
indexedlog_idmap.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::fmt;
9use std::fs;
10use std::fs::File;
11use std::io::Read;
12use std::path::Path;
13use std::path::PathBuf;
14
15use byteorder::BigEndian;
16use byteorder::ReadBytesExt;
17use fs2::FileExt;
18use indexedlog::log;
19use vlqencoding::VLQDecode;
20use vlqencoding::VLQEncode;
21
22use super::IdMapWrite;
23use crate::errors::bug;
24use crate::errors::programming;
25use crate::errors::NotFoundError;
26use crate::id::Group;
27use crate::id::Id;
28use crate::id::VertexName;
29use crate::ops::IdConvert;
30use crate::ops::Persist;
31use crate::ops::PrefixLookup;
32use crate::ops::TryClone;
33use crate::Result;
34use crate::VerLink;
35
/// Bi-directional mapping between an integer id and a name (`[u8]`).
///
/// Backed by the filesystem: entries live in an `indexedlog` `Log` that is
/// indexed both by id and by group + name (see `IdMap::log_open_options`).
pub struct IdMap {
    /// Log holding insertion, deletion and "clear non-master" entries.
    pub(crate) log: log::Log,
    /// On-disk directory of `log`; also hosts the `wlock` lock file.
    path: PathBuf,
    /// Identifier of this map, formatted as `"ilog:<path>"`.
    map_id: String,
    /// Version link: bumped on insert, re-created when entries are removed.
    map_version: VerLink,
}
45
46impl IdMap {
47    // Format:
48    //
49    // - Insertion:
50    //   id (8 bytes, BE) + group (1 byte) + name (n bytes)
51    // - Deletion:
52    //   u64::MAX (8 bytes, BE) + n (VLQ) + [id (VLQ) + len(name) (VLQ) + name ] * n
53    // - Clear non-master (only id->name mappings, being deprecated):
54    //   CLRNM
55
56    const INDEX_ID_TO_NAME: usize = 0;
57    const INDEX_GROUP_NAME_TO_ID: usize = 1;
58
59    /// Magic bytes in `Log` that indicates "remove all non-master id->name
60    /// mappings". A valid entry has at least 8 bytes so does not conflict
61    /// with this.
62    const MAGIC_CLEAR_NON_MASTER: &'static [u8] = b"CLRNM";
63
64    /// Magic prefix for deletion. It's u64::MAX id, which does not conflict
65    /// with a valid id because it's > `Id::MAX`.
66    const MAGIC_DELETION_PREFIX: &'static [u8] = &u64::MAX.to_be_bytes();
67
68    /// Start offset in an entry for "name".
69    const NAME_OFFSET: usize = 8 + Group::BYTES;
70
71    /// Create an [`IdMap`] backed by the given directory.
72    ///
73    /// By default, only read-only operations are allowed. For writing
74    /// access, call [`IdMap::make_writable`] to get a writable instance.
75    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
76        let path = path.as_ref();
77        let log = Self::log_open_options().open(path)?;
78        Self::open_from_log(log)
79    }
80}
81
82impl TryClone for IdMap {
83    fn try_clone(&self) -> Result<Self> {
84        let result = Self {
85            log: self.log.try_clone()?,
86            path: self.path.clone(),
87            map_id: self.map_id.clone(),
88            map_version: self.map_version.clone(),
89        };
90        Ok(result)
91    }
92}
93
94impl IdMap {
95    pub(crate) fn open_from_log(log: log::Log) -> Result<Self> {
96        let path = log.path().as_opt_path().unwrap().to_path_buf();
97        let map_id = format!("ilog:{}", path.display());
98        Ok(Self {
99            log,
100            path,
101            map_id,
102            map_version: VerLink::new(),
103        })
104    }
105
106    pub(crate) fn log_open_options() -> log::OpenOptions {
107        assert!(Self::MAGIC_DELETION_PREFIX > &Id::MAX.0.to_be_bytes()[..]);
108        log::OpenOptions::new()
109            .create(true)
110            .index("id", |data| {
111                assert!(Self::MAGIC_CLEAR_NON_MASTER.len() < 8);
112                assert!(Group::BITS == 8);
113                if data.starts_with(Self::MAGIC_DELETION_PREFIX) {
114                    let items =
115                        decode_deletion_entry(data).expect("deletion entry should be valid");
116                    items
117                        .into_iter()
118                        .map(|(id, _name)| log::IndexOutput::Remove(id.0.to_be_bytes().into()))
119                        .collect()
120                } else if data.len() < 8 {
121                    if data == Self::MAGIC_CLEAR_NON_MASTER {
122                        vec![log::IndexOutput::RemovePrefix(Box::new([
123                            Group::NON_MASTER.0 as u8,
124                        ]))]
125                    } else {
126                        panic!("bug: invalid segment {:?}", &data);
127                    }
128                } else {
129                    vec![log::IndexOutput::Reference(0..8)]
130                }
131            })
132            .index("group-name", |data| {
133                if data.starts_with(Self::MAGIC_DELETION_PREFIX) {
134                    let items =
135                        decode_deletion_entry(data).expect("deletion entry should be valid");
136                    items
137                        .into_iter()
138                        .map(|(id, name)| {
139                            let mut key = Vec::with_capacity(name.len() + 1);
140                            key.extend_from_slice(&id.group().bytes());
141                            key.extend_from_slice(name);
142                            log::IndexOutput::Remove(key.into())
143                        })
144                        .collect()
145                } else if data.len() >= 8 {
146                    vec![log::IndexOutput::Reference(8..(data.len() as u64))]
147                } else {
148                    if data == Self::MAGIC_CLEAR_NON_MASTER {
149                        vec![log::IndexOutput::RemovePrefix(Box::new([
150                            Group::NON_MASTER.0 as u8,
151                        ]))]
152                    } else {
153                        panic!("bug: invalid segment {:?}", &data);
154                    }
155                }
156            })
157            .flush_filter(Some(|_, _| {
158                panic!("programming error: idmap changed by other process")
159            }))
160    }
161
162    /// Find name by a specified integer id.
163    pub fn find_name_by_id(&self, id: Id) -> Result<Option<&[u8]>> {
164        let key = id.0.to_be_bytes();
165        let key = self.log.lookup(Self::INDEX_ID_TO_NAME, &key)?.nth(0);
166        match key {
167            Some(Ok(entry)) => {
168                if entry.len() < 8 {
169                    return bug("index key should have 8 bytes at least");
170                }
171                Ok(Some(&entry[Self::NAME_OFFSET..]))
172            }
173            None => Ok(None),
174            Some(Err(err)) => Err(err.into()),
175        }
176    }
177
178    /// Find VertexName by a specified integer id.
179    pub fn find_vertex_name_by_id(&self, id: Id) -> Result<Option<VertexName>> {
180        self.find_name_by_id(id)
181            .map(|v| v.map(|n| VertexName(self.log.slice_to_bytes(n))))
182    }
183
184    /// Find the integer id matching the given name.
185    pub fn find_id_by_name(&self, name: &[u8]) -> Result<Option<Id>> {
186        for group in Group::ALL.iter() {
187            let mut group_name = Vec::with_capacity(Group::BYTES + name.len());
188            group_name.extend_from_slice(&group.bytes());
189            group_name.extend_from_slice(name);
190            let key = self
191                .log
192                .lookup(Self::INDEX_GROUP_NAME_TO_ID, group_name)?
193                .nth(0);
194            match key {
195                Some(Ok(mut entry)) => {
196                    if entry.len() < 8 {
197                        return bug("index key should have 8 bytes at least");
198                    }
199                    let id = Id(entry.read_u64::<BigEndian>().unwrap());
200                    return Ok(Some(id));
201                }
202                None => {}
203                Some(Err(err)) => return Err(err.into()),
204            }
205        }
206        Ok(None)
207    }
208
209    /// Similar to `find_name_by_id`, but returns None if group > `max_group`.
210    pub fn find_id_by_name_with_max_group(
211        &self,
212        name: &[u8],
213        max_group: Group,
214    ) -> Result<Option<Id>> {
215        Ok(self.find_id_by_name(name)?.and_then(|id| {
216            if id.group() <= max_group {
217                Some(id)
218            } else {
219                None
220            }
221        }))
222    }
223
224    /// Insert a new entry mapping from a name to an id.
225    ///
226    /// Errors if the new entry conflicts with existing entries.
227    pub fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
228        let existing_name = self.find_name_by_id(id)?;
229        if let Some(existing_name) = existing_name {
230            if existing_name == name {
231                return Ok(());
232            } else {
233                return bug(format!(
234                    "new entry {} = {:?} conflicts with an existing entry {} = {:?}",
235                    id, name, id, existing_name
236                ));
237            }
238        }
239        let existing_id = self.find_id_by_name(name)?;
240        if let Some(existing_id) = existing_id {
241            // Allow re-assigning Ids from a higher group to a lower group.
242            // For example, when a non-master commit gets merged into the
243            // master branch, the id is re-assigned to master. But, the
244            // ids in the master group will never be re-assigned to
245            // non-master groups.
246            if existing_id == id {
247                return Ok(());
248            } else {
249                return bug(format!(
250                    "new entry {} = {:?} conflicts with an existing entry {} = {:?}",
251                    id, name, existing_id, name
252                ));
253            }
254        }
255
256        let mut data = Vec::with_capacity(8 + Group::BYTES + name.len());
257        data.extend_from_slice(&id.0.to_be_bytes());
258        data.extend_from_slice(&id.group().bytes());
259        data.extend_from_slice(&name);
260        self.log.append(data)?;
261        self.map_version.bump();
262        #[cfg(debug_assertions)]
263        {
264            let items = self.find_range(id, id).unwrap();
265            assert_eq!(items[0], (id, name));
266        }
267        Ok(())
268    }
269
270    /// Find all (id, name) pairs in the `low..=high` range.
271    fn find_range(&self, low: Id, high: Id) -> Result<Vec<(Id, &[u8])>> {
272        let low = low.0.to_be_bytes();
273        let high = high.0.to_be_bytes();
274        let range = &low[..]..=&high[..];
275        let mut items = Vec::new();
276        for entry in self.log.lookup_range(Self::INDEX_ID_TO_NAME, range)? {
277            let (key, values) = entry?;
278            let key: [u8; 8] = match key.as_ref().try_into() {
279                Ok(key) => key,
280                Err(_) => {
281                    return bug("find_range got non-u64 keys in INDEX_ID_TO_NAME");
282                }
283            };
284            let id = Id(u64::from_be_bytes(key));
285            for value in values {
286                let value = value?;
287                if value.len() < 8 {
288                    return bug(format!(
289                        "find_range got entry {:?} shorter than expected",
290                        &value
291                    ));
292                }
293                let name: &[u8] = &value[9..];
294                items.push((id, name));
295            }
296        }
297        Ok(items)
298    }
299
300    fn remove_range(&mut self, low: Id, high: Id) -> Result<Vec<VertexName>> {
301        // Step 1: Find (id, name) pairs in the range.
302        let items = self.find_range(low, high)?;
303        let names = items
304            .iter()
305            .map(|(_, bytes)| VertexName::copy_from(bytes))
306            .collect();
307        // Step 2: Write a "delete" entry to delete those indexes.
308        // The indexedlog index function (defined by log_open_options())
309        // will handle it.
310        let data = encode_deletion_entry(&items);
311        self.log.append(data)?;
312        // New map is not an "append-only" version of the previous map.
313        // Re-create the VerLink to mark it as incompatible.
314        self.map_version = VerLink::new();
315        Ok(names)
316    }
317
318    /// Lookup names by hex prefix.
319    fn find_names_by_hex_prefix(&self, hex_prefix: &[u8], limit: usize) -> Result<Vec<VertexName>> {
320        let mut result = Vec::with_capacity(limit);
321        for group in Group::ALL.iter().rev() {
322            let mut prefix = Vec::with_capacity(Group::BYTES * 2 + hex_prefix.len());
323            prefix.extend_from_slice(&group.hex_bytes());
324            prefix.extend_from_slice(hex_prefix);
325            for entry in self
326                .log
327                .lookup_prefix_hex(Self::INDEX_GROUP_NAME_TO_ID, prefix)?
328            {
329                let (k, _v) = entry?;
330                let vertex = VertexName(self.log.slice_to_bytes(&k[Group::BYTES..]));
331                if !result.contains(&vertex) {
332                    result.push(vertex);
333                }
334                if result.len() >= limit {
335                    return Ok(result);
336                }
337            }
338        }
339        Ok(result)
340    }
341}
342
343/// Encode a list of (id, name) pairs as an deletion entry.
344/// The deletion entry will be consumed by the index functions defined by
345/// `log_open_options()`.
346fn encode_deletion_entry(items: &[(Id, &[u8])]) -> Vec<u8> {
347    // Rough size for common 20-byte sha1 hashes.
348    let len = IdMap::MAGIC_DELETION_PREFIX.len() + 9 + items.len() * 30;
349    let mut data = Vec::with_capacity(len);
350    data.extend_from_slice(IdMap::MAGIC_DELETION_PREFIX);
351    data.write_vlq(items.len()).unwrap();
352    for (id, name) in items {
353        data.write_vlq(id.0).unwrap();
354        data.write_vlq(name.len()).unwrap();
355        data.extend_from_slice(name);
356    }
357    data
358}
359
360/// Decode `encode_deletion_entry` result.
361/// Used by index functions in `log_open_options()`.
362fn decode_deletion_entry(data: &[u8]) -> Result<Vec<(Id, &[u8])>> {
363    assert!(data.starts_with(IdMap::MAGIC_DELETION_PREFIX));
364    let mut data = &data[IdMap::MAGIC_DELETION_PREFIX.len()..];
365    let n = data.read_vlq()?;
366    let mut items = Vec::with_capacity(n);
367    for _ in 0..n {
368        let id: u64 = data.read_vlq()?;
369        let id = Id(id);
370        let name_len: usize = data.read_vlq()?;
371        if name_len > data.len() {
372            return bug("decode_deletion_id_names got incomplete input");
373        }
374        let (name, rest) = data.split_at(name_len);
375        data = rest;
376        items.push((id, name));
377    }
378    Ok(items)
379}
380
381impl fmt::Debug for IdMap {
382    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
383        write!(f, "IdMap {{\n")?;
384        for data in self.log.iter() {
385            if let Ok(mut data) = data {
386                let id = data.read_u64::<BigEndian>().unwrap();
387                let _group = data.read_u8().unwrap();
388                let mut name = Vec::with_capacity(20);
389                data.read_to_end(&mut name).unwrap();
390                let name = if name.len() >= 20 {
391                    VertexName::from(name).to_hex()
392                } else {
393                    String::from_utf8_lossy(&name).to_string()
394                };
395                let id = Id(id);
396                write!(f, "  {}: {},\n", name, id)?;
397            }
398        }
399        write!(f, "}}\n")?;
400        Ok(())
401    }
402}
403
404#[async_trait::async_trait]
405impl IdConvert for IdMap {
406    async fn vertex_id(&self, name: VertexName) -> Result<Id> {
407        self.find_id_by_name(name.as_ref())?
408            .ok_or_else(|| name.not_found_error())
409    }
410    async fn vertex_id_with_max_group(
411        &self,
412        name: &VertexName,
413        max_group: Group,
414    ) -> Result<Option<Id>> {
415        self.find_id_by_name_with_max_group(name.as_ref(), max_group)
416    }
417    async fn vertex_name(&self, id: Id) -> Result<VertexName> {
418        self.find_vertex_name_by_id(id)?
419            .ok_or_else(|| id.not_found_error())
420    }
421    async fn contains_vertex_name(&self, name: &VertexName) -> Result<bool> {
422        Ok(self.find_id_by_name(name.as_ref())?.is_some())
423    }
424    async fn contains_vertex_id_locally(&self, ids: &[Id]) -> Result<Vec<bool>> {
425        let mut list = Vec::with_capacity(ids.len());
426        for &id in ids {
427            list.push(self.find_name_by_id(id)?.is_some());
428        }
429        Ok(list)
430    }
431    async fn contains_vertex_name_locally(&self, names: &[VertexName]) -> Result<Vec<bool>> {
432        let mut list = Vec::with_capacity(names.len());
433        for name in names {
434            let contains = self.find_id_by_name(name.as_ref())?.is_some();
435            tracing::trace!("contains_vertex_name_locally({:?}) = {}", name, contains);
436            list.push(contains);
437        }
438        Ok(list)
439    }
440    fn map_id(&self) -> &str {
441        &self.map_id
442    }
443    fn map_version(&self) -> &VerLink {
444        &self.map_version
445    }
446}
447
#[async_trait::async_trait]
impl IdMapWrite for IdMap {
    /// Forwards to the inherent `IdMap::insert`. The fully-qualified path makes
    /// it explicit that the inherent method, not this same-named trait method,
    /// is called.
    async fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
        IdMap::insert(self, id, name)
    }
    /// Forwards to the inherent `IdMap::remove_range` and returns the removed names.
    async fn remove_range(&mut self, low: Id, high: Id) -> Result<Vec<VertexName>> {
        IdMap::remove_range(self, low, high)
    }
}
457
458impl Persist for IdMap {
459    type Lock = File;
460
461    fn lock(&mut self) -> Result<Self::Lock> {
462        if self.log.iter_dirty().next().is_some() {
463            return programming("lock() must be called without dirty in-memory entries");
464        }
465        let lock_file = {
466            let mut path = self.path.clone();
467            path.push("wlock");
468            File::open(&path).or_else(|_| {
469                fs::OpenOptions::new()
470                    .write(true)
471                    .create_new(true)
472                    .open(&path)
473            })?
474        };
475        lock_file.lock_exclusive()?;
476        Ok(lock_file)
477    }
478
479    fn reload(&mut self, _lock: &Self::Lock) -> Result<()> {
480        self.log.clear_dirty()?;
481        self.log.sync()?;
482        Ok(())
483    }
484
485    fn persist(&mut self, _lock: &Self::Lock) -> Result<()> {
486        self.log.sync()?;
487        Ok(())
488    }
489}
490
491#[async_trait::async_trait]
492impl PrefixLookup for IdMap {
493    async fn vertexes_by_hex_prefix(
494        &self,
495        hex_prefix: &[u8],
496        limit: usize,
497    ) -> Result<Vec<VertexName>> {
498        self.find_names_by_hex_prefix(hex_prefix, limit)
499    }
500}
501
#[cfg(test)]
mod tests {
    use super::*;

    /// Encoding then decoding a deletion entry round-trips the pairs.
    #[test]
    fn test_encode_decode_deletion_entry() {
        let items: &[(Id, &[u8])] = &[
            (Id(0), b"a"),
            (Id(1), b"bb"),
            (Id(10), b"ccc"),
            (Id(20), b"dd"),
        ];
        let data = encode_deletion_entry(items);
        let decoded = decode_deletion_entry(&data).unwrap();
        assert_eq!(&decoded, items);
    }

    /// An empty deletion entry still carries the magic prefix and decodes
    /// to an empty list.
    #[test]
    fn test_encode_decode_empty_deletion_entry() {
        let items: &[(Id, &[u8])] = &[];
        let data = encode_deletion_entry(items);
        assert!(data.starts_with(IdMap::MAGIC_DELETION_PREFIX));
        let decoded = decode_deletion_entry(&data).unwrap();
        assert!(decoded.is_empty());
    }

    /// A deletion entry whose last name is cut short is rejected with an
    /// error instead of returning a partial name.
    #[test]
    fn test_decode_truncated_deletion_entry() {
        let items: &[(Id, &[u8])] = &[(Id(1), b"bb"), (Id(2), b"ccc")];
        let mut data = encode_deletion_entry(items);
        data.pop();
        assert!(decode_deletion_entry(&data).is_err());
    }
}