Skip to main content

dag/idmap/
indexedlog_idmap.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::fmt;
9use std::fs;
10use std::fs::File;
11use std::path::Path;
12use std::path::PathBuf;
13
14use byteorder::BigEndian;
15use byteorder::ReadBytesExt;
16use fs2::FileExt;
17use indexedlog::log;
18use vlqencoding::VLQDecode;
19use vlqencoding::VLQEncode;
20
21use super::mem_idmap::CoreMemIdMap;
22use super::IdMapWrite;
23use crate::errors::bug;
24use crate::errors::programming;
25use crate::errors::NotFoundError;
26use crate::id::Group;
27use crate::id::Id;
28use crate::id::Vertex;
29use crate::ops::IdConvert;
30use crate::ops::Persist;
31use crate::ops::PrefixLookup;
32use crate::ops::TryClone;
33use crate::Result;
34use crate::VerLink;
35
36/// Bi-directional mapping between an integer id and a name (`[u8]`).
37///
38/// Backed by the filesystem.
39pub struct IdMap {
40    pub(crate) log: log::Log,
41    path: PathBuf,
42    map_id: String,
43    map_version: VerLink,
44    // Extra state for ids in the VIRTUAL group. Never write to disk.
45    virtual_map: CoreMemIdMap,
46}
47
48impl IdMap {
49    // Format:
50    //
51    // - Insertion:
52    //   id (8 bytes, BE) + group (1 byte) + name (n bytes)
53    // - Deletion:
54    //   u64::MAX (8 bytes, BE) + n (VLQ) + [id (VLQ) + len(name) (VLQ) + name ] * n
55    // - Clear non-master (only id->name mappings, being deprecated):
56    //   CLRNM
57
58    const INDEX_ID_TO_NAME: usize = 0;
59    const INDEX_GROUP_NAME_TO_ID: usize = 1;
60
61    /// Magic bytes in `Log` that indicates "remove all non-master id->name
62    /// mappings". A valid entry has at least 8 bytes so does not conflict
63    /// with this.
64    const MAGIC_CLEAR_NON_MASTER: &'static [u8] = b"CLRNM";
65
66    /// Magic prefix for deletion. It's u64::MAX id, which does not conflict
67    /// with a valid id because it's > `Id::MAX`.
68    const MAGIC_DELETION_PREFIX: &'static [u8] = &u64::MAX.to_be_bytes();
69
70    /// Start offset in an entry for "name".
71    const NAME_OFFSET: usize = 8 + Group::BYTES;
72
73    /// Create an [`IdMap`] backed by the given directory.
74    ///
75    /// By default, only read-only operations are allowed. For writing
76    /// access, call [`IdMap::make_writable`] to get a writable instance.
77    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
78        let path = path.as_ref();
79        let log = Self::log_open_options().open(path)?;
80        Self::open_from_log(log)
81    }
82}
83
84impl TryClone for IdMap {
85    fn try_clone(&self) -> Result<Self> {
86        let result = Self {
87            log: self.log.try_clone()?,
88            path: self.path.clone(),
89            map_id: self.map_id.clone(),
90            map_version: self.map_version.clone(),
91            virtual_map: self.virtual_map.clone(),
92        };
93        Ok(result)
94    }
95}
96
97impl IdMap {
98    pub(crate) fn open_from_log(log: log::Log) -> Result<Self> {
99        let path = log.path().as_opt_path().unwrap().to_path_buf();
100        let map_id = format!("ilog:{}", path.display());
101        let map_version = VerLink::from_storage_version_or_new(&map_id, log.version());
102        Ok(Self {
103            log,
104            path,
105            map_id,
106            map_version,
107            virtual_map: Default::default(),
108        })
109    }
110
111    pub(crate) fn log_open_options() -> log::OpenOptions {
112        assert!(Self::MAGIC_DELETION_PREFIX > &Id::MAX.0.to_be_bytes()[..]);
113        log::OpenOptions::new()
114            .create(true)
115            .index("id", |data| {
116                assert!(Self::MAGIC_CLEAR_NON_MASTER.len() < 8);
117                assert!(Group::BITS == 8);
118                if data.starts_with(Self::MAGIC_DELETION_PREFIX) {
119                    let items =
120                        decode_deletion_entry(data).expect("deletion entry should be valid");
121                    items
122                        .into_iter()
123                        .map(|(id, _name)| log::IndexOutput::Remove(id.0.to_be_bytes().into()))
124                        .collect()
125                } else if data.len() < 8 {
126                    if data == Self::MAGIC_CLEAR_NON_MASTER {
127                        vec![log::IndexOutput::RemovePrefix(Box::new([
128                            Group::NON_MASTER.0 as u8,
129                        ]))]
130                    } else {
131                        panic!("bug: invalid segment {:?}", &data);
132                    }
133                } else {
134                    let slice = &data[..8];
135                    let slice: [u8; 8] = slice.try_into().unwrap(); // length checked
136                    let id = Id(u64::from_be_bytes(slice));
137                    assert!(
138                        !id.is_virtual(),
139                        "bug: VIRTUAL group should never be written to disk"
140                    );
141                    vec![log::IndexOutput::Reference(0..8)]
142                }
143            })
144            .index("group-name", |data| {
145                if data.starts_with(Self::MAGIC_DELETION_PREFIX) {
146                    let items =
147                        decode_deletion_entry(data).expect("deletion entry should be valid");
148                    items
149                        .into_iter()
150                        .map(|(id, name)| {
151                            let mut key = Vec::with_capacity(name.len() + 1);
152                            key.extend_from_slice(&id.group().bytes());
153                            key.extend_from_slice(name);
154                            log::IndexOutput::Remove(key.into())
155                        })
156                        .collect()
157                } else if data.len() >= 8 {
158                    vec![log::IndexOutput::Reference(8..(data.len() as u64))]
159                } else if data == Self::MAGIC_CLEAR_NON_MASTER {
160                    vec![log::IndexOutput::RemovePrefix(Box::new([
161                        Group::NON_MASTER.0 as u8,
162                    ]))]
163                } else {
164                    panic!("bug: invalid segment {:?}", &data);
165                }
166            })
167            .flush_filter(Some(|_, _| {
168                panic!("programming error: idmap changed by other process")
169            }))
170    }
171
172    /// Find name by a specified integer id.
173    pub fn find_name_by_id(&self, id: Id) -> Result<Option<&[u8]>> {
174        if id.is_virtual() {
175            return Ok(self.virtual_map.lookup_vertex_name(id).map(|v| v.as_ref()));
176        }
177        let key = id.0.to_be_bytes();
178        let key = self.log.lookup(Self::INDEX_ID_TO_NAME, key)?.nth(0);
179        match key {
180            Some(Ok(entry)) => {
181                if entry.len() < 8 {
182                    return bug("index key should have 8 bytes at least");
183                }
184                Ok(Some(&entry[Self::NAME_OFFSET..]))
185            }
186            None => Ok(None),
187            Some(Err(err)) => Err(err.into()),
188        }
189    }
190
191    /// Find Vertex by a specified integer id.
192    pub fn find_vertex_name_by_id(&self, id: Id) -> Result<Option<Vertex>> {
193        if !id.is_valid() {
194            return Ok(None);
195        }
196        if id.is_virtual() {
197            return Ok(self.virtual_map.lookup_vertex_name(id).cloned());
198        }
199        self.find_name_by_id(id)
200            .map(|v| v.map(|n| Vertex(self.log.slice_to_bytes(n))))
201    }
202
203    /// Find the integer id matching the given name.
204    pub fn find_id_by_name(&self, name: &[u8]) -> Result<Option<Id>> {
205        for group in Group::ALL.iter() {
206            if *group == Group::VIRTUAL {
207                if let Some(found_id) = self.virtual_map.lookup_vertex_id(&Vertex::copy_from(name))
208                {
209                    return Ok(Some(found_id));
210                }
211            } else {
212                let mut group_name = Vec::with_capacity(Group::BYTES + name.len());
213                group_name.extend_from_slice(&group.bytes());
214                group_name.extend_from_slice(name);
215                let key = self
216                    .log
217                    .lookup(Self::INDEX_GROUP_NAME_TO_ID, group_name)?
218                    .nth(0);
219                match key {
220                    Some(Ok(mut entry)) => {
221                        if entry.len() < 8 {
222                            return bug("index key should have 8 bytes at least");
223                        }
224                        let id = Id(entry.read_u64::<BigEndian>().unwrap());
225                        return Ok(Some(id));
226                    }
227                    None => {}
228                    Some(Err(err)) => return Err(err.into()),
229                }
230            }
231        }
232        Ok(None)
233    }
234
235    /// Similar to `find_name_by_id`, but returns None if group > `max_group`.
236    pub fn find_id_by_name_with_max_group(
237        &self,
238        name: &[u8],
239        max_group: Group,
240    ) -> Result<Option<Id>> {
241        Ok(self.find_id_by_name(name)?.and_then(|id| {
242            if id.group() <= max_group {
243                Some(id)
244            } else {
245                None
246            }
247        }))
248    }
249
250    /// Insert a new entry mapping from a name to an id.
251    ///
252    /// Errors if the new entry conflicts with existing entries.
253    pub fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
254        let existing_name = self.find_name_by_id(id)?;
255        if let Some(existing_name) = existing_name {
256            if existing_name == name {
257                return Ok(());
258            } else {
259                return bug(format!(
260                    "new entry {} = {:?} conflicts with an existing entry {} = {:?}",
261                    id, name, id, existing_name
262                ));
263            }
264        }
265        let existing_id = self.find_id_by_name(name)?;
266        if let Some(existing_id) = existing_id {
267            // Allow re-assigning Ids from a higher group to a lower group.
268            // For example, when a non-master commit gets merged into the
269            // master branch, the id is re-assigned to master. But, the
270            // ids in the master group will never be re-assigned to
271            // non-master groups.
272            if existing_id == id {
273                return Ok(());
274            } else {
275                return bug(format!(
276                    "new entry {} = {:?} conflicts with an existing entry {} = {:?}",
277                    id, name, existing_id, name
278                ));
279            }
280        }
281
282        if id.is_virtual() {
283            self.virtual_map
284                .insert_vertex_id_name(id, Vertex::copy_from(name));
285            return Ok(());
286        }
287
288        let mut data = Vec::with_capacity(8 + Group::BYTES + name.len());
289        data.extend_from_slice(&id.0.to_be_bytes());
290        data.extend_from_slice(&id.group().bytes());
291        data.extend_from_slice(name);
292        self.log.append(data)?;
293        self.map_version.bump();
294        #[cfg(debug_assertions)]
295        {
296            let items = self.find_range(id, id).unwrap();
297            assert_eq!(items[0], (id, name));
298        }
299        Ok(())
300    }
301
302    /// Find all (id, name) pairs in the `low..=high` range.
303    fn find_range(&self, low: Id, high: Id) -> Result<Vec<(Id, &[u8])>> {
304        let low_bytes = low.0.to_be_bytes();
305        let high_bytes = high.0.to_be_bytes();
306        let range = &low_bytes[..]..=&high_bytes[..];
307        let mut items = Vec::new();
308        if !low.is_virtual() {
309            for entry in self.log.lookup_range(Self::INDEX_ID_TO_NAME, range)? {
310                let (key, values) = entry?;
311                let key: [u8; 8] = match key.as_ref().try_into() {
312                    Ok(key) => key,
313                    Err(_) => {
314                        return bug("find_range got non-u64 keys in INDEX_ID_TO_NAME");
315                    }
316                };
317                let id = Id(u64::from_be_bytes(key));
318                for value in values {
319                    let value = value?;
320                    if value.len() < 8 {
321                        return bug(format!(
322                            "find_range got entry {:?} shorter than expected",
323                            &value
324                        ));
325                    }
326                    let name: &[u8] = &value[9..];
327                    items.push((id, name));
328                }
329            }
330        }
331        if high.is_virtual() {
332            for (id, name) in self.virtual_map.lookup_range(low, high) {
333                items.push((*id, name.as_ref()))
334            }
335        }
336        Ok(items)
337    }
338
339    fn remove_range(&mut self, low: Id, high: Id) -> Result<Vec<Vertex>> {
340        let mut names = Vec::new();
341        if !low.is_virtual() {
342            // Step 1: Find (id, name) pairs in the range except for virtual items.
343            debug_assert!(Group::ALL.contains(&(Group::VIRTUAL - 1)));
344            let items = self.find_range(low, (Group::VIRTUAL - 1).max_id().min(high))?;
345            names.extend(items.iter().map(|(_, bytes)| Vertex::copy_from(bytes)));
346            // Step 2: Write a "delete" entry to delete those indexes.
347            // The indexedlog index function (defined by log_open_options())
348            // will handle it.
349            let data = encode_deletion_entry(&items);
350            self.log.append(data)?;
351            // New map is not an "append-only" version of the previous map.
352            // Re-create the VerLink to mark it as incompatible.
353            self.map_version = VerLink::new();
354        }
355        // Step 3: Remove entries in the virutal_map.
356        if high.is_virtual() {
357            names.extend(self.virtual_map.remove_range(low, high)?);
358        }
359        Ok(names)
360    }
361
362    /// Lookup names by hex prefix.
363    fn find_names_by_hex_prefix(&self, hex_prefix: &[u8], limit: usize) -> Result<Vec<Vertex>> {
364        let mut result = Vec::with_capacity(limit);
365        for group in Group::ALL.iter().rev() {
366            if result.len() >= limit {
367                break;
368            }
369            if *group == Group::VIRTUAL {
370                result.extend(self.virtual_map.lookup_vertexes_by_hex_prefix(
371                    hex_prefix,
372                    limit.saturating_sub(result.len()),
373                )?);
374            } else {
375                let mut prefix = Vec::with_capacity(Group::BYTES * 2 + hex_prefix.len());
376                prefix.extend_from_slice(&group.hex_bytes());
377                prefix.extend_from_slice(hex_prefix);
378                for entry in self
379                    .log
380                    .lookup_prefix_hex(Self::INDEX_GROUP_NAME_TO_ID, prefix)?
381                {
382                    if result.len() >= limit {
383                        break;
384                    }
385                    let (k, _v) = entry?;
386                    let vertex = Vertex(self.log.slice_to_bytes(&k[Group::BYTES..]));
387                    if !result.contains(&vertex) {
388                        result.push(vertex);
389                    }
390                }
391            }
392        }
393        Ok(result)
394    }
395}
396
397/// Encode a list of (id, name) pairs as an deletion entry.
398/// The deletion entry will be consumed by the index functions defined by
399/// `log_open_options()`.
400fn encode_deletion_entry(items: &[(Id, &[u8])]) -> Vec<u8> {
401    // Rough size for common 20-byte sha1 hashes.
402    let len = IdMap::MAGIC_DELETION_PREFIX.len() + 9 + items.len() * 30;
403    let mut data = Vec::with_capacity(len);
404    data.extend_from_slice(IdMap::MAGIC_DELETION_PREFIX);
405    data.write_vlq(items.len()).unwrap();
406    for (id, name) in items {
407        data.write_vlq(id.0).unwrap();
408        data.write_vlq(name.len()).unwrap();
409        data.extend_from_slice(name);
410    }
411    data
412}
413
414/// Decode `encode_deletion_entry` result.
415/// Used by index functions in `log_open_options()`.
416fn decode_deletion_entry(data: &[u8]) -> Result<Vec<(Id, &[u8])>> {
417    assert!(data.starts_with(IdMap::MAGIC_DELETION_PREFIX));
418    let mut data = &data[IdMap::MAGIC_DELETION_PREFIX.len()..];
419    let n = data.read_vlq()?;
420    let mut items = Vec::with_capacity(n);
421    for _ in 0..n {
422        let id: u64 = data.read_vlq()?;
423        let id = Id(id);
424        let name_len: usize = data.read_vlq()?;
425        if name_len > data.len() {
426            return bug("decode_deletion_id_names got incomplete input");
427        }
428        let (name, rest) = data.split_at(name_len);
429        data = rest;
430        items.push((id, name));
431    }
432    Ok(items)
433}
434
435impl fmt::Debug for IdMap {
436    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
437        write!(f, "IdMap {{\n")?;
438        let vec = self.find_range(Id::MIN, Id::MAX).map_err(|_| fmt::Error)?;
439        for (id, name) in vec {
440            let name = if name.len() >= 20 {
441                Vertex::copy_from(name).to_hex()
442            } else {
443                String::from_utf8_lossy(&name).to_string()
444            };
445            write!(f, "  {}: {},\n", name, id)?;
446        }
447        write!(f, "}}\n")?;
448        Ok(())
449    }
450}
451
452#[async_trait::async_trait]
453impl IdConvert for IdMap {
454    async fn vertex_id(&self, name: Vertex) -> Result<Id> {
455        self.find_id_by_name(name.as_ref())?
456            .ok_or_else(|| name.not_found_error())
457    }
458    async fn vertex_id_with_max_group(
459        &self,
460        name: &Vertex,
461        max_group: Group,
462    ) -> Result<Option<Id>> {
463        self.find_id_by_name_with_max_group(name.as_ref(), max_group)
464    }
465    async fn vertex_name(&self, id: Id) -> Result<Vertex> {
466        self.find_vertex_name_by_id(id)?
467            .ok_or_else(|| id.not_found_error())
468    }
469    async fn contains_vertex_name(&self, name: &Vertex) -> Result<bool> {
470        Ok(self.find_id_by_name(name.as_ref())?.is_some())
471    }
472    async fn contains_vertex_id_locally(&self, ids: &[Id]) -> Result<Vec<bool>> {
473        let mut list = Vec::with_capacity(ids.len());
474        for &id in ids {
475            list.push(self.find_name_by_id(id)?.is_some());
476        }
477        Ok(list)
478    }
479    async fn contains_vertex_name_locally(&self, names: &[Vertex]) -> Result<Vec<bool>> {
480        let mut list = Vec::with_capacity(names.len());
481        for name in names {
482            let contains = self.find_id_by_name(name.as_ref())?.is_some();
483            tracing::trace!("contains_vertex_name_locally({:?}) = {}", name, contains);
484            list.push(contains);
485        }
486        Ok(list)
487    }
488    fn map_id(&self) -> &str {
489        &self.map_id
490    }
491    fn map_version(&self) -> &VerLink {
492        &self.map_version
493    }
494}
495
496#[async_trait::async_trait]
497impl IdMapWrite for IdMap {
498    async fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
499        IdMap::insert(self, id, name)
500    }
501    async fn remove_range(&mut self, low: Id, high: Id) -> Result<Vec<Vertex>> {
502        IdMap::remove_range(self, low, high)
503    }
504}
505
506impl Persist for IdMap {
507    type Lock = File;
508
509    fn lock(&mut self) -> Result<Self::Lock> {
510        if self.log.iter_dirty().next().is_some() {
511            return programming("lock() must be called without dirty in-memory entries");
512        }
513        let lock_file = {
514            let path = self.path.join("wlock");
515            // Some NFS implementation reports `EBADF` for `flock()` unless the file is opened
516            // with `O_RDWR`.
517            fs::OpenOptions::new()
518                .read(true)
519                .write(true)
520                .create(true)
521                .open(&path)?
522        };
523        lock_file.lock_exclusive()?;
524        Ok(lock_file)
525    }
526
527    fn reload(&mut self, _lock: &Self::Lock) -> Result<()> {
528        self.log.clear_dirty()?;
529        self.log.sync()?;
530        Ok(())
531    }
532
533    fn persist(&mut self, _lock: &Self::Lock) -> Result<()> {
534        self.log.sync()?;
535        self.map_version
536            .associate_storage_version(self.map_id.clone(), self.log.version());
537        Ok(())
538    }
539}
540
541#[async_trait::async_trait]
542impl PrefixLookup for IdMap {
543    async fn vertexes_by_hex_prefix(&self, hex_prefix: &[u8], limit: usize) -> Result<Vec<Vertex>> {
544        self.find_names_by_hex_prefix(hex_prefix, limit)
545    }
546}
547
#[cfg(test)]
mod tests {
    use super::*;

    /// Encoding a list of pairs and decoding it again yields the same pairs.
    #[test]
    fn test_encode_decode_deletion_entry() {
        let expected: &[(Id, &[u8])] = &[
            (Id(0), b"a"),
            (Id(1), b"bb"),
            (Id(10), b"ccc"),
            (Id(20), b"dd"),
        ];
        let encoded = encode_deletion_entry(expected);
        let round_tripped = decode_deletion_entry(&encoded).unwrap();
        assert_eq!(round_tripped.as_slice(), expected);
    }
}
564}