// indexedlog/index.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8//! Index support for `log`.
9//!
10//! See [`Index`] for the main structure.
11
12// File format:
13//
14// ```plain,ignore
15// INDEX       := HEADER + ENTRY_LIST
16// HEADER      := '\0'  (takes offset 0, so 0 is not a valid offset for ENTRY)
17// ENTRY_LIST  := RADIX | ENTRY_LIST + ENTRY
18// ENTRY       := RADIX | LEAF | LINK | KEY | ROOT + REVERSED(VLQ(ROOT_LEN)) |
19//                ROOT + CHECKSUM + REVERSED(VLQ(ROOT_LEN + CHECKSUM_LEN))
20// RADIX       := '\2' + RADIX_FLAG (1 byte) + BITMAP (2 bytes) +
21//                PTR2(RADIX | LEAF) * popcnt(BITMAP) + PTR2(LINK)
22// LEAF        := '\3' + PTR(KEY | EXT_KEY) + PTR(LINK)
23// LINK        := '\4' + VLQ(VALUE) + PTR(NEXT_LINK | NULL)
24// KEY         := '\5' + VLQ(KEY_LEN) + KEY_BYTES
25// EXT_KEY     := '\6' + VLQ(KEY_START) + VLQ(KEY_LEN)
26// INLINE_LEAF := '\7' + EXT_KEY + LINK
27// ROOT        := '\1' + PTR(RADIX) + VLQ(META_LEN) + META
28// CHECKSUM    := '\8' + PTR(PREVIOUS_CHECKSUM) + VLQ(CHUNK_SIZE_LOGARITHM) +
29//                VLQ(CHECKSUM_CHUNK_START) + XXHASH_LIST + CHECKSUM_XX32 (LE32)
30// XXHASH_LIST := A list of 64-bit xxhash in Little Endian.
31//
32// PTR(ENTRY)  := VLQ(the offset of ENTRY)
// PTR2(ENTRY) := the offset of ENTRY, in 0, 4, or 8 bytes depending on BITMAP and FLAGS
34//
35// RADIX_FLAG := USE_64_BIT (1 bit) + RESERVED (6 bits) + HAVE_LINK (1 bit)
36// ```
37//
38// Some notes about the format:
39//
40// - A "RADIX" entry has 16 children. This is mainly for source control hex hashes. The "N"
41//   in a radix entry could be less than 16 if some of the children are missing (ex. offset = 0).
42//   The corresponding jump table bytes of missing children are 0s. If child i exists, then
43//   `jumptable[i]` is the relative (to the beginning of radix entry) offset of PTR(child offset).
// - A "ROOT" entry has its length recorded as the last byte. Normally the root entry is written
45//   at the end. This makes it easier for the caller - it does not have to record the position
46//   of the root entry. The caller could optionally provide a root location.
47// - An entry has a 1 byte "type". This makes it possible to do a linear scan from the
48//   beginning of the file, instead of having to go through a root. Potentially useful for
49//   recovery purpose, or adding new entry types (ex. tree entries other than the 16-children
50//   radix entry, value entries that are not u64 linked list, key entries that refers external
51//   buffer).
52// - The "EXT_KEY" type has a logically similar function with "KEY". But it refers to an external
53//   buffer. This is useful to save spaces if the index is not a source of truth and keys are
54//   long.
55// - The "INLINE_LEAF" type is basically an inlined version of EXT_KEY and LINK, to save space.
56// - The "ROOT_LEN" is reversed so it can be read byte-by-byte from the end of a file.
57
58use std::borrow::Cow;
59use std::cmp::Ordering::Equal;
60use std::cmp::Ordering::Greater;
61use std::cmp::Ordering::Less;
62use std::fmt;
63use std::fmt::Debug;
64use std::fmt::Formatter;
65use std::fs;
66use std::fs::File;
67use std::hash::Hasher;
68use std::io;
69use std::io::Read;
70use std::io::Seek;
71use std::io::SeekFrom;
72use std::io::Write;
73use std::mem::size_of;
74use std::ops::Bound;
75use std::ops::Bound::Excluded;
76use std::ops::Bound::Included;
77use std::ops::Bound::Unbounded;
78use std::ops::Deref;
79use std::ops::RangeBounds;
80use std::path::Path;
81use std::path::PathBuf;
82use std::sync::atomic::AtomicU64;
83use std::sync::atomic::Ordering::AcqRel;
84use std::sync::atomic::Ordering::Acquire;
85use std::sync::atomic::Ordering::Relaxed;
86use std::sync::Arc;
87
88use byteorder::ByteOrder;
89use byteorder::LittleEndian;
90use byteorder::ReadBytesExt;
91use byteorder::WriteBytesExt;
92use fs2::FileExt;
93use minibytes::Bytes;
94use tracing::debug_span;
95use twox_hash::XxHash;
96use vlqencoding::VLQDecodeAt;
97use vlqencoding::VLQEncode;
98
99use crate::base16::base16_to_base256;
100use crate::base16::single_hex_to_base16;
101use crate::base16::Base16Iter;
102use crate::config;
103use crate::errors::IoResultExt;
104use crate::errors::ResultExt;
105use crate::lock::ScopedFileLock;
106use crate::utils;
107use crate::utils::mmap_bytes;
108use crate::utils::xxhash;
109use crate::utils::xxhash32;
110
111/// Structures and serialization
112
/// In-memory (mutable) representation of a RADIX entry: 16 children, one
/// per hex nibble, plus an optional link offset for a key that terminates
/// exactly at this node.
#[derive(Clone, PartialEq, Default)]
struct MemRadix {
    // Child offsets; `Offset::null()` (0) marks a missing child.
    pub offsets: [Offset; 16],
    pub link_offset: LinkOffset,
}
118
/// In-memory (mutable) representation of a LEAF entry: a pointer to the key
/// entry (KEY or EXT_KEY) and the head of the value linked list.
#[derive(Clone, PartialEq)]
struct MemLeaf {
    pub key_offset: Offset,
    pub link_offset: LinkOffset,
}
124
/// In-memory (mutable) representation of a KEY entry: the key bytes stored
/// inline in the index.
#[derive(Clone, PartialEq)]
struct MemKey {
    pub key: Box<[u8]>, // base256
}
129
/// In-memory (mutable) representation of an EXT_KEY entry: a (start, len)
/// reference into an external key buffer, avoiding a copy of long keys.
#[derive(Clone, PartialEq)]
struct MemExtKey {
    pub start: u64,
    pub len: u64,
}
135
/// In-memory (mutable) representation of a LINK entry: one node of the
/// singly linked list of values attached to a key.
#[derive(Clone, PartialEq)]
struct MemLink {
    pub value: u64,
    pub next_link_offset: LinkOffset,
    // NOTE(review): presumably an entry marked unused is skipped at
    // serialization time (see `mark_unused` on leaf/key entries) — confirm
    // against the writer code, which is outside this chunk.
    pub unused: bool,
}
142
/// In-memory (mutable) representation of a ROOT entry: the offset of the
/// root radix node plus application-provided metadata bytes.
#[derive(Clone, PartialEq)]
struct MemRoot {
    pub radix_offset: RadixOffset,
    pub meta: Box<[u8]>,
}
148
/// A Checksum entry specifies the checksums for all bytes before the checksum
/// entry.  To make CHECKSUM entry size bounded, a Checksum entry can refer to a
/// previous Checksum entry so it does not have to repeat byte range that is
/// already covered by the previous entry.  The xxhash list contains one xxhash
/// per chunk. A chunk has (1 << chunk_size_logarithm) bytes.  The last chunk
/// can be incomplete.
struct MemChecksum {
    /// Indicates the "start" offset of the bytes that should be written
    /// on serialization.
    ///
    /// This is also the offset of the previous Checksum entry.
    start: u64,

    /// Indicates the "end" offset of the bytes that can be checked.
    ///
    /// This is also the offset of the current Checksum entry.
    end: u64,

    /// Tracks the chain length. Used to detect whether `checksum_max_chain_len`
    /// is exceeded or not.
    chain_len: u32,

    /// Each chunk has (1 << chunk_size_logarithm) bytes. The last chunk
    /// can be shorter.
    chunk_size_logarithm: u32,

    /// Checksums per chunk.
    ///
    /// The start of a chunk always aligns with the chunk size, which might not
    /// match `start`. For example, given the `start` and `end` shown below:
    ///
    /// ```plain,ignore
    /// | chunk 1 (1MB) | chunk 2 (1MB) | chunk 3 (1MB) | chunk 4 (200K) |
    ///                     |<-start                                end->|
    /// ```
    ///
    /// The `xxhash_list` would be:
    ///
    /// ```plain,ignore
    /// [xxhash(chunk 2 (1MB)), xxhash(chunk 3 (1MB)), xxhash(chunk 4 (200K))]
    /// ```
    ///
    /// For the root checksum (`Index::checksum`), the `xxhash_list` should
    /// cover the entire buffer, as if `start` is 0 (but `start` is not 0).
    xxhash_list: Vec<u64>,

    /// Whether chunks are checked against `xxhash_list`.
    ///
    /// Stored in a bit vector. `checked.len() * 64` should be >=
    /// `xxhash_list.len()`.
    checked: Vec<AtomicU64>,
}
201
202/// Read reversed vlq at the given end offset (exclusive).
203/// Return the decoded integer and the bytes used by the VLQ integer.
204fn read_vlq_reverse(buf: &[u8], end_offset: usize) -> io::Result<(u64, usize)> {
205    let mut int_buf = Vec::new();
206    for i in (0..end_offset).rev() {
207        int_buf.push(buf[i]);
208        if buf[i] <= 127 {
209            break;
210        }
211    }
212    let (value, vlq_size) = int_buf.read_vlq_at(0)?;
213    assert_eq!(vlq_size, int_buf.len());
214    Ok((value, vlq_size))
215}
216
// Offsets that are >= DIRTY_OFFSET refer to in-memory entries that haven't been
// written to disk. Offsets < DIRTY_OFFSET are on-disk offsets.
const DIRTY_OFFSET: u64 = 1u64 << 63;

// Entry type bytes. These match the '\0' .. '\8' markers in the file-format
// description at the top of this file. Do not change.
const TYPE_HEAD: u8 = 0;
const TYPE_ROOT: u8 = 1;
const TYPE_RADIX: u8 = 2;
const TYPE_LEAF: u8 = 3;
const TYPE_LINK: u8 = 4;
const TYPE_KEY: u8 = 5;
const TYPE_EXT_KEY: u8 = 6;
const TYPE_INLINE_LEAF: u8 = 7;
const TYPE_CHECKSUM: u8 = 8;

// Bits needed to represent the above type integers.
// NOTE(review): TYPE_CHECKSUM (8) does not fit in 3 bits, so the dirty-offset
// encoding `(index << TYPE_BITS) | type_int` only works for types 0..=7.
// Presumably checksum entries are never given dirty offsets — confirm.
const TYPE_BITS: usize = 3;

// Size constants. Do not change.
const TYPE_BYTES: usize = 1;
const RADIX_FLAG_BYTES: usize = 1;
const RADIX_BITMAP_BYTES: usize = 2;

// Bit flags used by radix
const RADIX_FLAG_USE_64BIT: u8 = 1; // child/link offsets are 8 bytes instead of 4
const RADIX_FLAG_HAVE_LINK: u8 = 1 << 7; // a link offset follows the children
242
/// Offset to an entry. The type of the entry is yet to be resolved.
///
/// The wrapped value is either an on-disk file offset (< `DIRTY_OFFSET`) or
/// an encoded in-memory ("dirty") index (>= `DIRTY_OFFSET`). The value `0`
/// means null (the HEAD byte occupies offset 0, so no entry lives there).
#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
pub struct Offset(u64);
246
// Typed offsets. Constructed after verifying types.
// `LinkOffset` is public since it's exposed by some APIs.

/// Offset known to point at a RADIX entry.
#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
struct RadixOffset(Offset);
/// Offset known to point at a LEAF (or INLINE_LEAF) entry.
#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
struct LeafOffset(Offset);

/// Offset to a linked list entry.
///
/// The entry stores a [u64] integer and optionally, the next [`LinkOffset`].
#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
pub struct LinkOffset(Offset);
/// Offset known to point at a KEY entry (inline key bytes).
#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
struct KeyOffset(Offset);
/// Offset known to point at an EXT_KEY entry (key in an external buffer).
#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
struct ExtKeyOffset(Offset);
/// Offset known to point at a CHECKSUM entry.
#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
struct ChecksumOffset(Offset);
266
/// An `Offset` whose entry type has been resolved by reading its type byte
/// (see `Offset::to_typed`). INLINE_LEAF entries resolve to `Leaf`.
#[derive(Copy, Clone)]
enum TypedOffset {
    Radix(RadixOffset),
    Leaf(LeafOffset),
    Link(LinkOffset),
    Key(KeyOffset),
    ExtKey(ExtKeyOffset),
    Checksum(ChecksumOffset),
}
276
277impl Offset {
278    /// Convert an unverified `u64` read from disk to a non-dirty `Offset`.
279    /// Return [`errors::IndexError`] if the offset is dirty.
280    #[inline]
281    fn from_disk(index: impl IndexBuf, value: u64) -> crate::Result<Self> {
282        if value >= DIRTY_OFFSET {
283            Err(index.corruption(format!("illegal disk offset {}", value)))
284        } else {
285            Ok(Offset(value))
286        }
287    }
288
289    /// Convert a possibly "dirty" offset to a non-dirty offset.
290    /// Useful when writing offsets to disk.
291    #[inline]
292    fn to_disk(self, offset_map: &OffsetMap) -> u64 {
293        offset_map.get(self)
294    }
295
296    /// Convert to `TypedOffset`.
297    fn to_typed(self, index: impl IndexBuf) -> crate::Result<TypedOffset> {
298        let type_int = self.type_int(&index)?;
299        match type_int {
300            TYPE_RADIX => Ok(TypedOffset::Radix(RadixOffset(self))),
301            TYPE_LEAF => Ok(TypedOffset::Leaf(LeafOffset(self))),
302            TYPE_LINK => Ok(TypedOffset::Link(LinkOffset(self))),
303            TYPE_KEY => Ok(TypedOffset::Key(KeyOffset(self))),
304            TYPE_EXT_KEY => Ok(TypedOffset::ExtKey(ExtKeyOffset(self))),
305            // LeafOffset handles inline transparently.
306            TYPE_INLINE_LEAF => Ok(TypedOffset::Leaf(LeafOffset(self))),
307            TYPE_CHECKSUM => Ok(TypedOffset::Checksum(ChecksumOffset(self))),
308            _ => Err(index.corruption(format!("type {} is unsupported", type_int))),
309        }
310    }
311
312    /// Read the `type_int` value.
313    fn type_int(self, index: impl IndexBuf) -> crate::Result<u8> {
314        let buf = index.buf();
315        if self.is_null() {
316            Err(index.corruption("invalid read from null"))
317        } else if self.is_dirty() {
318            Ok(((self.0 - DIRTY_OFFSET) & ((1 << TYPE_BITS) - 1)) as u8)
319        } else {
320            index.verify_checksum(self.0, TYPE_BYTES as u64)?;
321            match buf.get(self.0 as usize) {
322                Some(x) => Ok(*x),
323                _ => Err(index.range_error(self.0 as usize, 1)),
324            }
325        }
326    }
327
328    /// Convert to `TypedOffset` without returning detailed error message.
329    ///
330    /// This is useful for cases where the error handling is optional.
331    fn to_optional_typed(self, buf: &[u8]) -> Option<TypedOffset> {
332        if self.is_null() {
333            return None;
334        }
335        let type_int = if self.is_dirty() {
336            Some(((self.0 - DIRTY_OFFSET) & ((1 << TYPE_BITS) - 1)) as u8)
337        } else {
338            buf.get(self.0 as usize).cloned()
339        };
340        match type_int {
341            Some(TYPE_RADIX) => Some(TypedOffset::Radix(RadixOffset(self))),
342            Some(TYPE_LEAF) => Some(TypedOffset::Leaf(LeafOffset(self))),
343            Some(TYPE_LINK) => Some(TypedOffset::Link(LinkOffset(self))),
344            Some(TYPE_KEY) => Some(TypedOffset::Key(KeyOffset(self))),
345            Some(TYPE_EXT_KEY) => Some(TypedOffset::ExtKey(ExtKeyOffset(self))),
346            // LeafOffset handles inline transparently.
347            Some(TYPE_INLINE_LEAF) => Some(TypedOffset::Leaf(LeafOffset(self))),
348            Some(TYPE_CHECKSUM) => Some(TypedOffset::Checksum(ChecksumOffset(self))),
349            _ => None,
350        }
351    }
352
353    /// Test whether the offset is null (0).
354    #[inline]
355    fn is_null(self) -> bool {
356        self.0 == 0
357    }
358
359    /// Test whether the offset points to an in-memory entry.
360    #[inline]
361    fn is_dirty(self) -> bool {
362        self.0 >= DIRTY_OFFSET
363    }
364
365    fn null() -> Self {
366        Self(0)
367    }
368}
369
// Common methods shared by typed offset structs.
//
// Dirty (in-memory) offsets use the encoding
// `((index << TYPE_BITS) | type_int) + DIRTY_OFFSET`, where `index` is a
// position into the corresponding `dirty_*` vector on `Index`.
trait TypedOffsetMethods: Sized {
    /// Decode the dirty-vector index from a dirty offset.
    /// Must only be called on dirty offsets (debug-asserted).
    #[inline]
    fn dirty_index(self) -> usize {
        debug_assert!(self.to_offset().is_dirty());
        ((self.to_offset().0 - DIRTY_OFFSET) >> TYPE_BITS) as usize
    }

    /// Convert an untyped `Offset` to this typed offset, verifying that its
    /// type byte matches `Self::type_int()`. Null passes through unchecked.
    #[inline]
    fn from_offset(offset: Offset, index: impl IndexBuf) -> crate::Result<Self> {
        if offset.is_null() {
            Ok(Self::from_offset_unchecked(offset))
        } else {
            let type_int = offset.type_int(&index)?;
            if type_int == Self::type_int() {
                Ok(Self::from_offset_unchecked(offset))
            } else {
                Err(index.corruption(format!("inconsistent type at {:?}", offset)))
            }
        }
    }

    /// Construct a dirty (in-memory) typed offset from a dirty-vector index.
    #[inline]
    fn from_dirty_index(index: usize) -> Self {
        Self::from_offset_unchecked(Offset(
            (((index as u64) << TYPE_BITS) | Self::type_int() as u64) + DIRTY_OFFSET,
        ))
    }

    /// The `TYPE_*` constant identifying this offset type.
    fn type_int() -> u8;

    /// Wrap an `Offset` without verifying its type byte.
    fn from_offset_unchecked(offset: Offset) -> Self;

    /// The underlying untyped `Offset`.
    fn to_offset(&self) -> Offset;
}
405
// Implement `TypedOffsetMethods` for each typed offset wrapper via the
// `impl_offset!` macro (defined elsewhere in this crate — not visible in
// this chunk). NOTE(review): the string argument is presumably used for
// Debug/display formatting; confirm in the macro definition.
impl_offset!(RadixOffset, TYPE_RADIX, "Radix");
impl_offset!(LeafOffset, TYPE_LEAF, "Leaf");
impl_offset!(LinkOffset, TYPE_LINK, "Link");
impl_offset!(KeyOffset, TYPE_KEY, "Key");
impl_offset!(ExtKeyOffset, TYPE_EXT_KEY, "ExtKey");
impl_offset!(ChecksumOffset, TYPE_CHECKSUM, "Checksum");
412
impl RadixOffset {
    /// Link offset of a radix entry.
    ///
    /// On-disk layout (see the file-format comment): TYPE + FLAG + BITMAP +
    /// one child integer per set bitmap bit + optional link integer. The
    /// link, when the flag bit is set, sits after all child integers.
    #[inline]
    fn link_offset(self, index: &Index) -> crate::Result<LinkOffset> {
        if self.is_dirty() {
            Ok(index.dirty_radixes[self.dirty_index()].link_offset)
        } else {
            let flag_start = TYPE_BYTES + usize::from(self);
            let flag = *index
                .buf
                .get(flag_start)
                .ok_or_else(|| index.range_error(flag_start, 1))?;
            // Verify flag + bitmap in one call before trusting either.
            index.verify_checksum(
                flag_start as u64,
                (RADIX_FLAG_BYTES + RADIX_BITMAP_BYTES) as u64,
            )?;

            if Self::parse_have_link_from_flag(flag) {
                let bitmap_start = flag_start + RADIX_FLAG_BYTES;
                let bitmap = Self::read_bitmap_unchecked(index, bitmap_start)?;
                let int_size = Self::parse_int_size_from_flag(flag);
                // Skip over one integer per present child to reach the link.
                let link_offset =
                    bitmap_start + RADIX_BITMAP_BYTES + bitmap.count_ones() as usize * int_size;
                index.verify_checksum(link_offset as u64, int_size as u64)?;
                let raw_offset = Self::read_raw_int_unchecked(index, int_size, link_offset)?;
                Ok(LinkOffset::from_offset(
                    Offset::from_disk(index, raw_offset)?,
                    index,
                )?)
            } else {
                // No HAVE_LINK flag bit: this node carries no link.
                Ok(LinkOffset::default())
            }
        }
    }

    /// Lookup the `i`-th child inside a radix entry.
    /// Return stored offset, or `Offset(0)` if that child does not exist.
    #[inline]
    fn child(self, index: &Index, i: u8) -> crate::Result<Offset> {
        // "i" is not derived from user input.
        assert!(i < 16);
        if self.is_dirty() {
            Ok(index.dirty_radixes[self.dirty_index()].offsets[i as usize])
        } else {
            let flag_start = TYPE_BYTES + usize::from(self);
            let bitmap_start = flag_start + RADIX_FLAG_BYTES;
            // Integrity of "bitmap" is checked below to reduce calls to verify_checksum, since
            // this is a hot path.
            let bitmap = Self::read_bitmap_unchecked(index, bitmap_start)?;
            let has_child = (1u16 << i) & bitmap != 0;
            if has_child {
                let flag = *index
                    .buf
                    .get(flag_start)
                    .ok_or_else(|| index.range_error(flag_start, 1))?;
                let int_size = Self::parse_int_size_from_flag(flag);
                // Children are stored densely: count set bits below `i` to
                // find this child's position among the stored integers.
                let skip_child_count = (((1u16 << i) - 1) & bitmap).count_ones() as usize;
                let child_offset = bitmap_start + RADIX_BITMAP_BYTES + skip_child_count * int_size;
                // One verification covering flag, bitmap and the child int.
                index.verify_checksum(
                    flag_start as u64,
                    (child_offset + int_size - flag_start) as u64,
                )?;
                let raw_offset = Self::read_raw_int_unchecked(index, int_size, child_offset)?;
                Ok(Offset::from_disk(index, raw_offset)?)
            } else {
                // Still verify the bitmap bytes we relied on for "missing".
                index.verify_checksum(bitmap_start as u64, RADIX_BITMAP_BYTES as u64)?;
                Ok(Offset::default())
            }
        }
    }

    /// Copy an on-disk entry to memory so it can be modified. Return new offset.
    /// If the offset is already in-memory, return it as-is.
    #[inline]
    fn copy(self, index: &mut Index) -> crate::Result<RadixOffset> {
        if self.is_dirty() {
            Ok(self)
        } else {
            let entry = MemRadix::read_from(index, u64::from(self))?;
            let len = index.dirty_radixes.len();
            index.dirty_radixes.push(entry);
            Ok(RadixOffset::from_dirty_index(len))
        }
    }

    /// Change a child of `MemRadix`. Panic if the offset points to an on-disk entry.
    #[inline]
    fn set_child(self, index: &mut Index, i: u8, value: Offset) {
        assert!(i < 16);
        if self.is_dirty() {
            index.dirty_radixes[self.dirty_index()].offsets[i as usize] = value;
        } else {
            // On-disk entries are immutable; callers must `copy` first.
            panic!("bug: set_child called on immutable radix entry");
        }
    }

    /// Change link offset of `MemRadix`. Panic if the offset points to an on-disk entry.
    #[inline]
    fn set_link(self, index: &mut Index, value: LinkOffset) {
        if self.is_dirty() {
            index.dirty_radixes[self.dirty_index()].link_offset = value;
        } else {
            panic!("bug: set_link called on immutable radix entry");
        }
    }

    /// Change all children and link offset to null.
    /// Panic if the offset points to an on-disk entry.
    fn set_all_to_null(self, index: &mut Index) {
        if self.is_dirty() {
            index.dirty_radixes[self.dirty_index()] = MemRadix::default();
        } else {
            panic!("bug: set_all_to_null called on immutable radix entry");
        }
    }

    /// Create a new in-memory radix entry.
    #[inline]
    fn create(index: &mut Index, radix: MemRadix) -> RadixOffset {
        let len = index.dirty_radixes.len();
        index.dirty_radixes.push(radix);
        RadixOffset::from_dirty_index(len)
    }

    /// Parse whether link offset exists from a flag.
    #[inline]
    fn parse_have_link_from_flag(flag: u8) -> bool {
        flag & RADIX_FLAG_HAVE_LINK != 0
    }

    /// Parse int size (in bytes) from a flag: 4 bytes normally, 8 bytes
    /// when the USE_64BIT bit is set.
    #[inline]
    fn parse_int_size_from_flag(flag: u8) -> usize {
        if flag & RADIX_FLAG_USE_64BIT == 0 {
            size_of::<u32>()
        } else {
            size_of::<u64>()
        }
    }

    /// Read bitmap from the given offset without integrity check.
    #[inline]
    fn read_bitmap_unchecked(index: &Index, bitmap_offset: usize) -> crate::Result<u16> {
        debug_assert_eq!(RADIX_BITMAP_BYTES, size_of::<u16>());
        let buf = &index.buf;
        buf.get(bitmap_offset..bitmap_offset + RADIX_BITMAP_BYTES)
            .map(LittleEndian::read_u16)
            .ok_or_else(|| {
                crate::Error::corruption(
                    &index.path,
                    format!("cannot read radix bitmap at {}", bitmap_offset),
                )
            })
    }

    /// Read a little-endian integer (4 or 8 bytes) from the given offset
    /// without integrity check.
    #[inline]
    fn read_raw_int_unchecked(index: &Index, int_size: usize, offset: usize) -> crate::Result<u64> {
        let buf = &index.buf;
        let result = match int_size {
            4 => buf
                .get(offset..offset + 4)
                .map(|buf| LittleEndian::read_u32(buf) as u64),
            8 => buf.get(offset..offset + 8).map(LittleEndian::read_u64),
            // int_size comes from parse_int_size_from_flag, which only
            // returns 4 or 8.
            _ => unreachable!(),
        };
        result.ok_or_else(|| {
            crate::Error::corruption(
                &index.path,
                format!("cannot read {}-byte int at {}", int_size, offset),
            )
        })
    }
}
587
588/// Extract key_content from an untyped Offset. Internal use only.
589fn extract_key_content(index: &Index, key_offset: Offset) -> crate::Result<&[u8]> {
590    let typed_offset = key_offset.to_typed(index)?;
591    match typed_offset {
592        TypedOffset::Key(x) => Ok(x.key_content(index)?),
593        TypedOffset::ExtKey(x) => Ok(x.key_content(index)?),
594        _ => Err(index.corruption(format!("unexpected key type at {}", key_offset.0))),
595    }
596}
597
impl LeafOffset {
    /// Key content and link offsets of a leaf entry.
    ///
    /// Handles both encodings transparently: a plain LEAF stores VLQ
    /// pointers to a separate key entry and a link entry; an INLINE_LEAF
    /// has the EXT_KEY and LINK bodies inlined right after the type byte.
    #[inline]
    fn key_and_link_offset(self, index: &Index) -> crate::Result<(&[u8], LinkOffset)> {
        if self.is_dirty() {
            let e = &index.dirty_leafs[self.dirty_index()];
            let key_content = extract_key_content(index, e.key_offset)?;
            Ok((key_content, e.link_offset))
        } else {
            let (key_content, raw_link_offset) = match index.buf[usize::from(self)] {
                TYPE_INLINE_LEAF => {
                    // The inlined EXT_KEY body starts right after the type byte.
                    let raw_key_offset = u64::from(self) + TYPE_BYTES as u64;
                    // PERF: Consider skip checksum for this read.
                    let key_offset = ExtKeyOffset::from_offset(
                        Offset::from_disk(index, raw_key_offset)?,
                        index,
                    )?;
                    // Avoid using key_content. Skip one checksum check.
                    let (key_content, key_entry_size) =
                        key_offset.key_content_and_entry_size_unchecked(index)?;
                    let key_entry_size = key_entry_size.unwrap();
                    // The inlined LINK body follows the EXT_KEY body.
                    let raw_link_offset = raw_key_offset + key_entry_size as u64;
                    index.verify_checksum(u64::from(self), raw_link_offset - u64::from(self))?;
                    (key_content, raw_link_offset)
                }
                TYPE_LEAF => {
                    // PTR(KEY | EXT_KEY) followed by PTR(LINK), both VLQ.
                    let (raw_key_offset, vlq_len): (u64, _) = index
                        .buf
                        .read_vlq_at(usize::from(self) + TYPE_BYTES)
                        .context(
                            index.path(),
                            "cannot read key_offset in LeafOffset::key_and_link_offset",
                        )
                        .corruption()?;
                    let key_offset = Offset::from_disk(index, raw_key_offset)?;
                    let key_content = extract_key_content(index, key_offset)?;
                    let (raw_link_offset, vlq_len2) = index
                        .buf
                        .read_vlq_at(usize::from(self) + TYPE_BYTES + vlq_len)
                        .context(
                            index.path(),
                            "cannot read link_offset in LeafOffset::key_and_link_offset",
                        )
                        .corruption()?;
                    // Verify the entire entry (type byte + both VLQs) at once.
                    index.verify_checksum(
                        u64::from(self),
                        (TYPE_BYTES + vlq_len + vlq_len2) as u64,
                    )?;
                    (key_content, raw_link_offset)
                }
                _ => unreachable!("bug: LeafOffset constructed with non-leaf types"),
            };
            let link_offset =
                LinkOffset::from_offset(Offset::from_disk(index, raw_link_offset)?, index)?;
            Ok((key_content, link_offset))
        }
    }

    /// Create a new in-memory leaf entry. The key entry cannot be null.
    #[inline]
    fn create(index: &mut Index, link_offset: LinkOffset, key_offset: Offset) -> LeafOffset {
        debug_assert!(!key_offset.is_null());
        let len = index.dirty_leafs.len();
        index.dirty_leafs.push(MemLeaf {
            link_offset,
            key_offset,
        });
        LeafOffset::from_dirty_index(len)
    }

    /// Update link_offset of a leaf entry in-place. Copy on write. Return the new leaf_offset
    /// if it's copied from disk.
    ///
    /// Note: the old leaf is expected to be no longer needed. If that's not true, don't call
    /// this function.
    #[inline]
    fn set_link(self, index: &mut Index, link_offset: LinkOffset) -> crate::Result<LeafOffset> {
        if self.is_dirty() {
            index.dirty_leafs[self.dirty_index()].link_offset = link_offset;
            Ok(self)
        } else {
            // On-disk entries are immutable: materialize a dirty copy with
            // the same key but the new link.
            let entry = MemLeaf::read_from(index, u64::from(self))?;
            Ok(Self::create(index, link_offset, entry.key_offset))
        }
    }

    /// Mark the entry as unused. An unused entry won't be written to disk.
    /// No effect on an on-disk entry.
    fn mark_unused(self, index: &mut Index) {
        if self.is_dirty() {
            // Also mark the dirty key entry unused: it is only referenced
            // by this leaf.
            let key_offset = index.dirty_leafs[self.dirty_index()].key_offset;
            match key_offset.to_typed(&*index) {
                Ok(TypedOffset::Key(x)) => x.mark_unused(index),
                Ok(TypedOffset::ExtKey(x)) => x.mark_unused(index),
                _ => {}
            };
            index.dirty_leafs[self.dirty_index()].mark_unused()
        }
    }
}
698
/// Iterator for values in the linked list
pub struct LeafValueIter<'a> {
    index: &'a Index,
    // The link entry to read next; null when the list is exhausted.
    offset: LinkOffset,
    // Set after an error has been yielded; stops further iteration.
    errored: bool,
}
705
706impl<'a> Iterator for LeafValueIter<'a> {
707    type Item = crate::Result<u64>;
708
709    fn next(&mut self) -> Option<Self::Item> {
710        if self.offset.is_null() || self.errored {
711            None
712        } else {
713            match self.offset.value_and_next(self.index) {
714                Ok((value, next)) => {
715                    self.offset = next;
716                    Some(Ok(value))
717                }
718                Err(e) => {
719                    self.errored = true;
720                    Some(Err(e))
721                }
722            }
723        }
724    }
725}
726
727impl<'a> LeafValueIter<'a> {
728    pub fn is_empty(&self) -> bool {
729        self.offset.is_null()
730    }
731}
732
/// Iterator returned by [`Index::range`].
/// Provide access to full keys and values (as [`LinkOffset`]), sorted by key.
///
/// Supports iteration from both ends (`next` and `next_back`); the two
/// stacks converge until they meet.
pub struct RangeIter<'a> {
    index: &'a Index,

    // Stack about what is being visited, for `next`.
    front_stack: Vec<IterState>,

    // Stack about what is being visited, for `next_back`.
    back_stack: Vec<IterState>,

    // Completed. Either error out, or the iteration ends.
    completed: bool,
}
747
impl<'a> RangeIter<'a> {
    /// Construct from prepared front/back visiting stacks.
    ///
    /// The iterator starts out completed when both stacks already point at
    /// the same state (an empty range).
    fn new(index: &'a Index, front_stack: Vec<IterState>, back_stack: Vec<IterState>) -> Self {
        assert!(!front_stack.is_empty());
        assert!(!back_stack.is_empty());
        Self {
            completed: front_stack.last() == back_stack.last(),
            index,
            front_stack,
            back_stack,
        }
    }

    /// Reconstruct "key" from the stack.
    fn key(stack: &[IterState], index: &Index) -> crate::Result<Vec<u8>> {
        // Reconstruct key. Collect base16 child stack (prefix + visiting),
        // then convert to base256.
        let mut prefix = Vec::with_capacity(stack.len() - 1);
        for frame in stack.iter().take(stack.len() - 1).cloned() {
            prefix.push(match frame {
                // The frame contains the "current" child being visited.
                IterState::RadixChild(_, child) if child < 16 => child,
                _ => unreachable!("bug: malicious iterator state"),
            })
        }
        if prefix.len() & 1 == 1 {
            // Odd-length key: half a byte cannot be converted to base256.
            Err(index.corruption("unexpected odd-length key"))
        } else {
            Ok(base16_to_base256(&prefix))
        }
    }

    /// Used by both `next` and `next_back`.
    ///
    /// Advance `stack` one step in direction `towards`, descending into
    /// radix children and leaves as needed, until something can be yielded.
    /// `exclusive` is the state at which iteration must stop (where the
    /// opposite stack currently points). Returns `None` when iteration is
    /// over.
    fn step(
        index: &'a Index,
        stack: &mut Vec<IterState>,
        towards: Side,
        exclusive: IterState,
    ) -> Option<crate::Result<(Cow<'a, [u8]>, LinkOffset)>> {
        loop {
            let state = match stack.pop().unwrap().step(towards) {
                // Pop. Visit next.
                None => continue,
                Some(state) => state,
            };

            if state == exclusive {
                // Stop iteration.
                return None;
            }

            // Write down what's being visited.
            stack.push(state);

            return match state {
                IterState::RadixChild(radix, child) => match radix.child(index, child) {
                    // Missing child: move on to the next sibling.
                    Ok(next_offset) if next_offset.is_null() => continue,
                    Ok(next_offset) => match next_offset.to_typed(index) {
                        Ok(TypedOffset::Radix(next_radix)) => {
                            // Descend: push the child's boundary marker for
                            // the chosen direction, then keep stepping.
                            stack.push(match towards {
                                Front => IterState::RadixEnd(next_radix),
                                Back => IterState::RadixStart(next_radix),
                            });
                            continue;
                        }
                        Ok(TypedOffset::Leaf(next_leaf)) => {
                            stack.push(match towards {
                                Front => IterState::LeafEnd(next_leaf),
                                Back => IterState::LeafStart(next_leaf),
                            });
                            continue;
                        }
                        Ok(_) => Some(Err(index.corruption("unexpected type during iteration"))),
                        Err(err) => Some(Err(err)),
                    },
                    Err(err) => Some(Err(err)),
                },
                // A key ends exactly at this radix node; rebuild it from
                // the stack of visited children.
                IterState::RadixLeaf(radix) => match radix.link_offset(index) {
                    Ok(link_offset) if link_offset.is_null() => continue,
                    Ok(link_offset) => match Self::key(stack, index) {
                        Ok(key) => Some(Ok((Cow::Owned(key), link_offset))),
                        Err(err) => Some(Err(err)),
                    },
                    Err(err) => Some(Err(err)),
                },
                // A full leaf: its key bytes are stored, borrow them.
                IterState::Leaf(leaf) => match leaf.key_and_link_offset(index) {
                    Ok((key, link_offset)) => Some(Ok((Cow::Borrowed(key), link_offset))),
                    Err(err) => Some(Err(err)),
                },
                // Boundary markers yield nothing; keep stepping.
                IterState::RadixEnd(_)
                | IterState::RadixStart(_)
                | IterState::LeafStart(_)
                | IterState::LeafEnd(_) => {
                    continue;
                }
            };
        }
    }
}
847
848impl<'a> Iterator for RangeIter<'a> {
849    type Item = crate::Result<(Cow<'a, [u8]>, LinkOffset)>;
850
851    /// Return the next key and corresponding [`LinkOffset`].
852    fn next(&mut self) -> Option<Self::Item> {
853        if self.completed {
854            return None;
855        }
856        let exclusive = self.back_stack.last().cloned().unwrap();
857        let result = Self::step(self.index, &mut self.front_stack, Back, exclusive);
858        match result {
859            Some(Err(_)) | None => self.completed = true,
860            _ => {}
861        }
862        result
863    }
864}
865
866impl<'a> DoubleEndedIterator for RangeIter<'a> {
867    /// Return the next key and corresponding [`LinkOffset`], from the end of
868    /// the iterator.
869    fn next_back(&mut self) -> Option<Self::Item> {
870        if self.completed {
871            return None;
872        }
873        let exclusive = self.front_stack.last().cloned().unwrap();
874        let result = Self::step(self.index, &mut self.back_stack, Front, exclusive);
875        match result {
876            Some(Err(_)) | None => self.completed = true,
877            _ => {}
878        }
879        result
880    }
881}
882
883impl LinkOffset {
884    /// Iterating through values referred by this linked list.
885    pub fn values(self, index: &Index) -> LeafValueIter<'_> {
886        LeafValueIter {
887            errored: false,
888            index,
889            offset: self,
890        }
891    }
892
893    /// Get value, and the next link offset.
894    #[inline]
895    fn value_and_next(self, index: &Index) -> crate::Result<(u64, LinkOffset)> {
896        if self.is_dirty() {
897            let e = &index.dirty_links[self.dirty_index()];
898            Ok((e.value, e.next_link_offset))
899        } else {
900            let (value, vlq_len) = index
901                .buf
902                .read_vlq_at(usize::from(self) + TYPE_BYTES)
903                .context(
904                    index.path(),
905                    "cannot read link_value in LinkOffset::value_and_next",
906                )
907                .corruption()?;
908            let (next_link, vlq_len2) = index
909                .buf
910                .read_vlq_at(usize::from(self) + TYPE_BYTES + vlq_len)
911                .context(
912                    index.path(),
913                    "cannot read next_link_offset in LinkOffset::value_and_next",
914                )
915                .corruption()?;
916            index.verify_checksum(u64::from(self), (TYPE_BYTES + vlq_len + vlq_len2) as u64)?;
917            let next_link = LinkOffset::from_offset(Offset::from_disk(index, next_link)?, index)?;
918            Ok((value, next_link))
919        }
920    }
921
922    /// Create a new link entry that chains this entry.
923    /// Return new `LinkOffset`
924    fn create(self, index: &mut Index, value: u64) -> LinkOffset {
925        let new_link = MemLink {
926            value,
927            next_link_offset: self,
928            unused: false,
929        };
930        let len = index.dirty_links.len();
931        index.dirty_links.push(new_link);
932        LinkOffset::from_dirty_index(len)
933    }
934}
935
936impl KeyOffset {
937    /// Key content of a key entry.
938    #[inline]
939    fn key_content(self, index: &Index) -> crate::Result<&[u8]> {
940        if self.is_dirty() {
941            Ok(&index.dirty_keys[self.dirty_index()].key[..])
942        } else {
943            let (key_len, vlq_len): (usize, _) = index
944                .buf
945                .read_vlq_at(usize::from(self) + TYPE_BYTES)
946                .context(
947                    index.path(),
948                    "cannot read key_len in KeyOffset::key_content",
949                )
950                .corruption()?;
951            let start = usize::from(self) + TYPE_BYTES + vlq_len;
952            let end = start + key_len;
953            index.verify_checksum(u64::from(self), end as u64 - u64::from(self))?;
954            if end > index.buf.len() {
955                Err(index.range_error(start, end - start))
956            } else {
957                Ok(&index.buf[start..end])
958            }
959        }
960    }
961
962    /// Create a new in-memory key entry. The key cannot be empty.
963    #[inline]
964    fn create(index: &mut Index, key: &[u8]) -> KeyOffset {
965        debug_assert!(!key.is_empty());
966        let len = index.dirty_keys.len();
967        index.dirty_keys.push(MemKey {
968            key: Vec::from(key).into_boxed_slice(),
969        });
970        KeyOffset::from_dirty_index(len)
971    }
972
973    /// Mark the entry as unused. An unused entry won't be written to disk.
974    /// No effect on an on-disk entry.
975    fn mark_unused(self, index: &mut Index) {
976        if self.is_dirty() {
977            index.dirty_keys[self.dirty_index()].mark_unused();
978        }
979    }
980}
981
982impl ExtKeyOffset {
983    /// Key content of a key entry.
984    #[inline]
985    fn key_content(self, index: &Index) -> crate::Result<&[u8]> {
986        let (key_content, entry_size) = self.key_content_and_entry_size_unchecked(index)?;
987        if let Some(entry_size) = entry_size {
988            index.verify_checksum(u64::from(self), entry_size as u64)?;
989        }
990        Ok(key_content)
991    }
992
993    /// Key content and key entry size. Used internally.
994    #[inline]
995    fn key_content_and_entry_size_unchecked(
996        self,
997        index: &Index,
998    ) -> crate::Result<(&[u8], Option<usize>)> {
999        let (start, len, entry_size) = if self.is_dirty() {
1000            let e = &index.dirty_ext_keys[self.dirty_index()];
1001            (e.start, e.len, None)
1002        } else {
1003            let (start, vlq_len1): (u64, _) = index
1004                .buf
1005                .read_vlq_at(usize::from(self) + TYPE_BYTES)
1006                .context(
1007                    index.path(),
1008                    "cannot read ext_key_start in ExtKeyOffset::key_content",
1009                )
1010                .corruption()?;
1011            let (len, vlq_len2): (u64, _) = index
1012                .buf
1013                .read_vlq_at(usize::from(self) + TYPE_BYTES + vlq_len1)
1014                .context(
1015                    index.path(),
1016                    "cannot read ext_key_len in ExtKeyOffset::key_content",
1017                )
1018                .corruption()?;
1019            (start, len, Some(TYPE_BYTES + vlq_len1 + vlq_len2))
1020        };
1021        let key_buf = index.key_buf.as_ref();
1022        let key_content = match key_buf.slice(start, len) {
1023            Some(k) => k,
1024            None => {
1025                return Err(index.corruption(format!(
1026                    "key buffer is invalid when reading referred keys at {}",
1027                    start
1028                )));
1029            }
1030        };
1031        Ok((key_content, entry_size))
1032    }
1033
1034    /// Create a new in-memory external key entry. The key cannot be empty.
1035    #[inline]
1036    fn create(index: &mut Index, start: u64, len: u64) -> ExtKeyOffset {
1037        debug_assert!(len > 0);
1038        let i = index.dirty_ext_keys.len();
1039        index.dirty_ext_keys.push(MemExtKey { start, len });
1040        ExtKeyOffset::from_dirty_index(i)
1041    }
1042
1043    /// Mark the entry as unused. An unused entry won't be written to disk.
1044    /// No effect on an on-disk entry.
1045    fn mark_unused(self, index: &mut Index) {
1046        if self.is_dirty() {
1047            index.dirty_ext_keys[self.dirty_index()].mark_unused();
1048        }
1049    }
1050}
1051
1052/// Check type for an on-disk entry
1053fn check_type(index: impl IndexBuf, offset: usize, expected: u8) -> crate::Result<()> {
1054    let typeint = *(index
1055        .buf()
1056        .get(offset)
1057        .ok_or_else(|| index.range_error(offset, 1))?);
1058    if typeint != expected {
1059        Err(index.corruption(format!(
1060            "type mismatch at offset {} expected {} but got {}",
1061            offset, expected, typeint
1062        )))
1063    } else {
1064        Ok(())
1065    }
1066}
1067
impl MemRadix {
    /// Parse an on-disk RADIX entry at `offset` into its in-memory form.
    ///
    /// Layout (see the file-format notes at the top of this file):
    /// TYPE + RADIX_FLAG (1 byte) + BITMAP (2 bytes) + one child offset per
    /// set bit in BITMAP + an optional link offset (when HAVE_LINK is set).
    /// Offsets are 4 or 8 bytes each depending on the USE_64BIT flag bit.
    fn read_from(index: &Index, offset: u64) -> crate::Result<Self> {
        let buf = &index.buf;
        let offset = offset as usize;
        // `pos` tracks how many bytes of the entry have been consumed, so
        // the final checksum call can cover the whole entry.
        let mut pos = 0;

        // Integrity check is done at the end to reduce overhead.
        check_type(index, offset, TYPE_RADIX)?;
        pos += TYPE_BYTES;

        let flag = *buf
            .get(offset + pos)
            .ok_or_else(|| index.range_error(offset + pos, 1))?;
        pos += RADIX_FLAG_BYTES;

        let bitmap = RadixOffset::read_bitmap_unchecked(index, offset + pos)?;
        pos += RADIX_BITMAP_BYTES;

        // 4 or 8 bytes per stored offset, depending on the USE_64BIT bit.
        let int_size = RadixOffset::parse_int_size_from_flag(flag);

        // Only children whose bitmap bit is set are stored on disk; absent
        // children keep the default (null) offset.
        let mut offsets = [Offset::default(); 16];
        for (i, o) in offsets.iter_mut().enumerate() {
            if (bitmap >> i) & 1 == 1 {
                *o = Offset::from_disk(
                    index,
                    RadixOffset::read_raw_int_unchecked(index, int_size, offset + pos)?,
                )?;
                pos += int_size;
            }
        }

        // The optional link offset is stored after all child offsets.
        let link_offset = if RadixOffset::parse_have_link_from_flag(flag) {
            let raw_offset = RadixOffset::read_raw_int_unchecked(index, int_size, offset + pos)?;
            pos += int_size;
            LinkOffset::from_offset(Offset::from_disk(index, raw_offset)?, index)?
        } else {
            LinkOffset::default()
        };

        // Verify the whole entry in one go now that its size is known.
        index.verify_checksum(offset as u64, pos as u64)?;

        Ok(MemRadix {
            offsets,
            link_offset,
        })
    }

    /// Serialize this RADIX entry, translating dirty offsets to their final
    /// on-disk values via `offset_map`. Uses 32-bit stored offsets when every
    /// offset fits in a `u32`, and 64-bit offsets otherwise (USE_64BIT flag).
    fn write_to<W: Write>(&self, writer: &mut W, offset_map: &OffsetMap) -> io::Result<()> {
        // Prepare data to write
        let mut flag = 0;
        let mut bitmap = 0;
        let u32_max = ::std::u32::MAX as u64;

        // Resolve the link offset first; it also decides flag bits.
        let link_offset = if !self.link_offset.is_null() {
            flag |= RADIX_FLAG_HAVE_LINK;
            let link_offset = self.link_offset.to_disk(offset_map);
            if link_offset > u32_max {
                flag |= RADIX_FLAG_USE_64BIT;
            }
            link_offset
        } else {
            0
        };

        // Collect non-null children; each sets its bit in the bitmap.
        let mut child_offsets = [0u64; 16];
        for (i, child_offset) in self.offsets.iter().enumerate() {
            if !child_offset.is_null() {
                bitmap |= 1u16 << i;
                let child_offset = child_offset.to_disk(offset_map);
                if child_offset > u32_max {
                    flag |= RADIX_FLAG_USE_64BIT;
                }
                child_offsets[i] = child_offset;
            }
        }

        // Write them
        writer.write_all(&[TYPE_RADIX, flag])?;
        writer.write_u16::<LittleEndian>(bitmap)?;

        // Present (non-zero) offsets are written contiguously; the bitmap
        // tells readers which child slots they belong to.
        if flag & RADIX_FLAG_USE_64BIT != 0 {
            for &child_offset in child_offsets.iter() {
                if child_offset > 0 {
                    writer.write_u64::<LittleEndian>(child_offset)?;
                }
            }
            if link_offset > 0 {
                writer.write_u64::<LittleEndian>(link_offset)?;
            }
        } else {
            for &child_offset in child_offsets.iter() {
                if child_offset > 0 {
                    writer.write_u32::<LittleEndian>(child_offset as u32)?;
                }
            }
            if link_offset > 0 {
                writer.write_u32::<LittleEndian>(link_offset as u32)?;
            }
        }
        Ok(())
    }
}
1170
1171impl MemLeaf {
1172    fn read_from(index: &Index, offset: u64) -> crate::Result<Self> {
1173        let buf = &index.buf;
1174        let offset = offset as usize;
1175        match buf.get(offset) {
1176            Some(&TYPE_INLINE_LEAF) => {
1177                let key_offset = offset + TYPE_BYTES;
1178                // Skip the key part
1179                let offset = key_offset + TYPE_BYTES;
1180                let (_key_start, vlq_len): (u64, _) = buf
1181                    .read_vlq_at(offset)
1182                    .context(index.path(), "cannot read key_start in MemLeaf::read_from")
1183                    .corruption()?;
1184                let offset = offset + vlq_len;
1185                let (_key_len, vlq_len): (u64, _) = buf
1186                    .read_vlq_at(offset)
1187                    .context(index.path(), "cannot read key_len in MemLeaf::read_from")
1188                    .corruption()?;
1189                let offset = offset + vlq_len;
1190                // Checksum will be verified by ExtKey and Leaf nodes
1191                let key_offset = Offset::from_disk(index, key_offset as u64)?;
1192                let link_offset =
1193                    LinkOffset::from_offset(Offset::from_disk(index, offset as u64)?, index)?;
1194                Ok(MemLeaf {
1195                    key_offset,
1196                    link_offset,
1197                })
1198            }
1199            Some(&TYPE_LEAF) => {
1200                let (key_offset, len1) = buf
1201                    .read_vlq_at(offset + TYPE_BYTES)
1202                    .context(index.path(), "cannot read key_offset in MemLeaf::read_from")
1203                    .corruption()?;
1204                let key_offset = Offset::from_disk(index, key_offset)?;
1205                let (link_offset, len2) = buf
1206                    .read_vlq_at(offset + TYPE_BYTES + len1)
1207                    .context(
1208                        index.path(),
1209                        "cannot read link_offset in MemLeaf::read_from",
1210                    )
1211                    .corruption()?;
1212                let link_offset =
1213                    LinkOffset::from_offset(Offset::from_disk(index, link_offset)?, index)?;
1214                index.verify_checksum(offset as u64, (TYPE_BYTES + len1 + len2) as u64)?;
1215                Ok(MemLeaf {
1216                    key_offset,
1217                    link_offset,
1218                })
1219            }
1220            _ => Err(index.range_error(offset, 1)),
1221        }
1222    }
1223
1224    /// If the entry is suitable for writing inline, write an inline entry, mark dependent
1225    /// entries as "unused", and return `true`. Otherwise do nothing and return `false`.
1226    ///
1227    /// The caller probably wants to set this entry to "unused" to prevent writing twice,
1228    /// if true is returned.
1229    fn maybe_write_inline_to(
1230        &self,
1231        writer: &mut Vec<u8>,
1232        buf: &[u8],
1233        buf_offset: u64,
1234        dirty_ext_keys: &mut Vec<MemExtKey>,
1235        dirty_links: &mut Vec<MemLink>,
1236        offset_map: &mut OffsetMap,
1237    ) -> crate::Result<bool> {
1238        debug_assert!(!self.is_unused());
1239
1240        // Conditions to be inlined:
1241        // - Both Key and Link are dirty (in-memory). Otherwise this might waste space.
1242        // - Key is ExtKey. This is just to make implementation easier. Owned key support might be
1243        // added in the future.
1244        // - Link does not refer to another in-memory link that hasn't been written yet (i.e.
1245        //   does not exist in offset_map). This is just to make implementation easier.
1246
1247        let are_dependencies_dirty = self.key_offset.is_dirty() && self.link_offset.is_dirty();
1248
1249        if are_dependencies_dirty {
1250            // Not being able to read the key offset is not a fatal error here - it
1251            // disables the inline optimization. But everything else works just fine.
1252            if let Some(TypedOffset::ExtKey(key_offset)) = self.key_offset.to_optional_typed(buf) {
1253                let ext_key_index = key_offset.dirty_index();
1254                let link_index = self.link_offset.dirty_index();
1255                let ext_key = dirty_ext_keys.get_mut(ext_key_index).unwrap();
1256                let link = dirty_links.get_mut(link_index).unwrap();
1257
1258                let next_link_offset = link.next_link_offset;
1259                if next_link_offset.is_dirty()
1260                    && offset_map.link_map[next_link_offset.dirty_index()] == 0
1261                {
1262                    // Dependent Link is not written yet.
1263                    return Ok(false);
1264                }
1265
1266                // Header
1267                writer.write_all(&[TYPE_INLINE_LEAF]).infallible()?;
1268
1269                // Inlined ExtKey
1270                let offset = buf.len() as u64 + buf_offset;
1271                offset_map.ext_key_map[ext_key_index] = offset;
1272                ext_key.write_to(writer, offset_map).infallible()?;
1273
1274                // Inlined Link
1275                let offset = buf.len() as u64 + buf_offset;
1276                offset_map.link_map[ext_key_index] = offset;
1277                link.write_to(writer, offset_map).infallible()?;
1278
1279                ext_key.mark_unused();
1280                link.mark_unused();
1281
1282                Ok(true)
1283            } else {
1284                // InlineLeaf only supports ExtKey, not embedded Key.
1285                Ok(false)
1286            }
1287        } else {
1288            Ok(false)
1289        }
1290    }
1291
1292    /// Write a Leaf entry.
1293    fn write_noninline_to<W: Write>(
1294        &self,
1295        writer: &mut W,
1296        offset_map: &OffsetMap,
1297    ) -> io::Result<()> {
1298        debug_assert!(!self.is_unused());
1299        writer.write_all(&[TYPE_LEAF])?;
1300        writer.write_vlq(self.key_offset.to_disk(offset_map))?;
1301        writer.write_vlq(self.link_offset.to_disk(offset_map))?;
1302        Ok(())
1303    }
1304
1305    /// Mark the entry as unused. An unused entry won't be written to disk.
1306    fn mark_unused(&mut self) {
1307        self.key_offset = Offset::default();
1308    }
1309
1310    #[inline]
1311    fn is_unused(&self) -> bool {
1312        self.key_offset.is_null()
1313    }
1314}
1315
1316impl MemLink {
1317    fn read_from(index: impl IndexBuf, offset: u64) -> crate::Result<Self> {
1318        let buf = index.buf();
1319        let offset = offset as usize;
1320        check_type(&index, offset, TYPE_LINK)?;
1321        let (value, len1) = buf
1322            .read_vlq_at(offset + 1)
1323            .context(index.path(), "cannot read link_value in MemLink::read_from")
1324            .corruption()?;
1325        let (next_link_offset, len2) = buf
1326            .read_vlq_at(offset + TYPE_BYTES + len1)
1327            .context(
1328                index.path(),
1329                "cannot read next_link_offset in MemLink::read_from",
1330            )
1331            .corruption()?;
1332        let next_link_offset =
1333            LinkOffset::from_offset(Offset::from_disk(&index, next_link_offset)?, &index)?;
1334        index.verify_checksum(offset as u64, (TYPE_BYTES + len1 + len2) as u64)?;
1335        Ok(MemLink {
1336            value,
1337            next_link_offset,
1338            unused: false,
1339        })
1340    }
1341
1342    fn write_to<W: Write>(&self, writer: &mut W, offset_map: &OffsetMap) -> io::Result<()> {
1343        writer.write_all(&[TYPE_LINK])?;
1344        writer.write_vlq(self.value)?;
1345        writer.write_vlq(self.next_link_offset.to_disk(offset_map))?;
1346        Ok(())
1347    }
1348
1349    /// Mark the entry as unused. An unused entry won't be written to disk.
1350    fn mark_unused(&mut self) {
1351        self.unused = true;
1352    }
1353
1354    #[inline]
1355    fn is_unused(&self) -> bool {
1356        self.unused
1357    }
1358}
1359
1360impl MemKey {
1361    fn read_from(index: impl IndexBuf, offset: u64) -> crate::Result<Self> {
1362        let buf = index.buf();
1363        let offset = offset as usize;
1364        check_type(&index, offset, TYPE_KEY)?;
1365        let (key_len, len): (usize, _) = buf
1366            .read_vlq_at(offset + 1)
1367            .context(index.path(), "cannot read key_len in MemKey::read_from")
1368            .corruption()?;
1369        let key = Vec::from(
1370            buf.get(offset + TYPE_BYTES + len..offset + TYPE_BYTES + len + key_len)
1371                .ok_or_else(|| index.range_error(offset + TYPE_BYTES + len, key_len))?,
1372        )
1373        .into_boxed_slice();
1374        index.verify_checksum(offset as u64, (TYPE_BYTES + len + key_len) as u64)?;
1375        Ok(MemKey { key })
1376    }
1377
1378    fn write_to<W: Write>(&self, writer: &mut W, _: &OffsetMap) -> io::Result<()> {
1379        writer.write_all(&[TYPE_KEY])?;
1380        writer.write_vlq(self.key.len())?;
1381        writer.write_all(&self.key)?;
1382        Ok(())
1383    }
1384
1385    /// Mark the entry as unused. An unused entry won't be written to disk.
1386    fn mark_unused(&mut self) {
1387        self.key = Vec::new().into_boxed_slice();
1388    }
1389
1390    #[inline]
1391    fn is_unused(&self) -> bool {
1392        self.key.len() == 0
1393    }
1394}
1395
1396impl MemExtKey {
1397    fn read_from(index: impl IndexBuf, offset: u64) -> crate::Result<Self> {
1398        let buf = index.buf();
1399        let offset = offset as usize;
1400        check_type(&index, offset, TYPE_EXT_KEY)?;
1401        let (start, vlq_len1) = buf
1402            .read_vlq_at(offset + TYPE_BYTES)
1403            .context(
1404                index.path(),
1405                "cannot read ext_key_start in MemExtKey::read_from",
1406            )
1407            .corruption()?;
1408        let (len, vlq_len2) = buf
1409            .read_vlq_at(offset + TYPE_BYTES + vlq_len1)
1410            .context(
1411                index.path(),
1412                "cannot read ext_key_len in MemExtKey::read_from",
1413            )
1414            .corruption()?;
1415        index.verify_checksum(offset as u64, (TYPE_BYTES + vlq_len1 + vlq_len2) as u64)?;
1416        Ok(MemExtKey { start, len })
1417    }
1418
1419    fn write_to<W: Write>(&self, writer: &mut W, _: &OffsetMap) -> io::Result<()> {
1420        writer.write_all(&[TYPE_EXT_KEY])?;
1421        writer.write_vlq(self.start)?;
1422        writer.write_vlq(self.len)?;
1423        Ok(())
1424    }
1425
1426    /// Mark the entry as unused. An unused entry won't be written to disk.
1427    fn mark_unused(&mut self) {
1428        self.len = 0;
1429    }
1430
1431    #[inline]
1432    fn is_unused(&self) -> bool {
1433        self.len == 0
1434    }
1435}
1436
1437impl MemRoot {
1438    fn read_from(index: impl IndexBuf, offset: u64) -> crate::Result<(Self, usize)> {
1439        let offset = offset as usize;
1440        let mut cur = offset;
1441        check_type(&index, offset, TYPE_ROOT)?;
1442        cur += TYPE_BYTES;
1443
1444        let (radix_offset, vlq_len) = index
1445            .buf()
1446            .read_vlq_at(cur)
1447            .context(index.path(), "cannot read radix_offset")
1448            .corruption()?;
1449        cur += vlq_len;
1450
1451        let radix_offset =
1452            RadixOffset::from_offset(Offset::from_disk(&index, radix_offset)?, &index)?;
1453
1454        let (meta_len, vlq_len): (usize, _) = index
1455            .buf()
1456            .read_vlq_at(cur)
1457            .context(index.path(), "cannot read meta_len")
1458            .corruption()?;
1459        cur += vlq_len;
1460
1461        let meta = index
1462            .buf()
1463            .get(cur..cur + meta_len)
1464            .ok_or_else(|| index.range_error(cur, meta_len))?;
1465        cur += meta_len;
1466
1467        index.verify_checksum(offset as u64, (cur - offset) as u64)?;
1468        Ok((
1469            MemRoot {
1470                radix_offset,
1471                meta: meta.to_vec().into_boxed_slice(),
1472            },
1473            cur - offset,
1474        ))
1475    }
1476
1477    fn write_to<W: Write>(&self, writer: &mut W, offset_map: &OffsetMap) -> io::Result<usize> {
1478        let mut buf = Vec::with_capacity(16);
1479        buf.write_all(&[TYPE_ROOT])?;
1480        buf.write_vlq(self.radix_offset.to_disk(offset_map))?;
1481        buf.write_vlq(self.meta.len())?;
1482        buf.write_all(&self.meta)?;
1483        let len = buf.len();
1484        writer.write_all(&buf)?;
1485        Ok(len)
1486    }
1487}
1488
1489impl MemChecksum {
    /// Parse the chain of CHECKSUM entries starting at `start_offset`,
    /// walking backwards through `previous_offset` links until offset 0.
    ///
    /// Returns the merged in-memory checksum state and the byte size of the
    /// *initial* (most recent) checksum entry. Each entry's own trailing
    /// xxhash32 is verified while reading.
    fn read_from(index: &impl IndexBuf, start_offset: u64) -> crate::Result<(Self, usize)> {
        let span = debug_span!("Checksum::read", offset = start_offset, chain_len = 0);
        let _entered = span.enter();
        let mut result = Self::default();
        let mut size: usize = 0;

        // Walk the chain: `offset` points at the current entry; 0 terminates
        // (offset 0 is the file header, never a valid entry offset).
        let mut offset = start_offset as usize;
        while offset > 0 {
            // `cur` is the read cursor within the current entry.
            let mut cur: usize = offset;

            check_type(index, cur, TYPE_CHECKSUM)?;
            cur += TYPE_BYTES;

            let (previous_offset, vlq_len): (u64, _) = index
                .buf()
                .read_vlq_at(cur)
                .context(index.path(), "cannot read previous_checksum_offset")
                .corruption()?;
            cur += vlq_len;

            let (chunk_size_logarithm, vlq_len): (u32, _) = index
                .buf()
                .read_vlq_at(cur)
                .context(index.path(), "cannot read chunk_size_logarithm")
                .corruption()?;

            // Reject logarithms that would overflow the 1usize << shift below.
            if chunk_size_logarithm > 31 {
                return Err(crate::Error::corruption(
                    index.path(),
                    format!(
                        "invalid chunk_size_logarithm {} at {}",
                        chunk_size_logarithm, cur
                    ),
                ));
            }
            cur += vlq_len;

            let chunk_size = 1usize << chunk_size_logarithm;
            // Number of chunks needed to cover bytes [0, offset), rounded up.
            let chunk_needed = (offset + chunk_size - 1) >> chunk_size_logarithm;

            let is_initial_checksum = start_offset == offset as u64;

            // Initialize our Self result in our first iteration.
            if is_initial_checksum {
                result.set_chunk_size_logarithm(index.buf(), chunk_size_logarithm)?;

                result.xxhash_list.resize(chunk_needed, 0);
                result.start = previous_offset;
                result.end = offset as u64;

                // One "checked" bit per chunk, packed 64 per word.
                let checked_needed = (result.xxhash_list.len() + 63) / 64;
                result.checked.resize_with(checked_needed, Default::default);
            }
            result.chain_len = result.chain_len.saturating_add(1);

            //    0     1     2     3     4     5
            // |chunk|chunk|chunk|chunk|chunk|chunk|
            // |--------------|-----------------|---
            // 0              ^ previous        ^ offset
            //
            // The previous Checksum entry covers chunk 0,1,2(incomplete).
            // This Checksum entry covers chunk 2(complete),3,4,5(incomplete).

            // Read the new checksums.
            let start_chunk_index = (previous_offset >> chunk_size_logarithm) as usize;
            for i in start_chunk_index..chunk_needed {
                let incomplete_chunk = offset % chunk_size > 0 && i == chunk_needed - 1;

                // Don't record incomplete chunk hashes for previous checksums
                // since the "next" checksum (i.e. previous loop iteration) will
                // have already written the complete chunk's hash.
                if is_initial_checksum || !incomplete_chunk {
                    result.xxhash_list[i] = (&index.buf()[cur..])
                        .read_u64::<LittleEndian>()
                        .context(index.path(), "cannot read xxhash for checksum")?;
                }
                // Advance past the 8-byte hash even when it was skipped.
                cur += 8;
            }

            // Check the checksum buffer itself.
            let xx32_read = (&index.buf()[cur..])
                .read_u32::<LittleEndian>()
                .context(index.path(), "cannot read xxhash32 for checksum")?;
            let xx32_self = xxhash32(&index.buf()[offset..cur]);
            if xx32_read != xx32_self {
                return Err(crate::Error::corruption(
                    index.path(),
                    format!(
                        "checksum at {} fails integrity check ({} != {})",
                        offset, xx32_read, xx32_self
                    ),
                ));
            }
            cur += 4;

            // Only the initial (most recent) entry's size is reported back.
            if is_initial_checksum {
                size = cur - offset;
            }

            // Follow the chain backwards.
            offset = previous_offset as usize;
        }
        span.record("chain_len", result.chain_len);

        Ok((result, size))
    }
1595
    /// Incrementally update the checksums so it covers the file content + `append_buf`.
    /// Assume the file content is append-only.
    /// Call this before calling `write_to`.
    ///
    /// Parameters:
    /// - `old_buf`: the index buffer read (mmap-ed) at open time.
    /// - `file`: the on-disk file, re-read here (caller holds the lock).
    /// - `file_len`: current on-disk length of `file`.
    /// - `append_buf`: bytes that are about to be appended to the file.
    fn update(
        &mut self,
        old_buf: &[u8],
        file: &mut File,
        file_len: u64,
        append_buf: &[u8],
    ) -> io::Result<()> {
        // First chunk that needs (re-)hashing: the one containing `self.end`,
        // the end of the currently checksummed range.
        let start_chunk_index = (self.end >> self.chunk_size_logarithm) as usize;
        let start_chunk_offset = (start_chunk_index as u64) << self.chunk_size_logarithm;

        // Check the range that is being rewritten.
        let old_end = (old_buf.len() as u64).min(self.end);
        if old_end > start_chunk_offset {
            self.check_range(old_buf, start_chunk_offset, old_end - start_chunk_offset)?;
        }

        let new_total_len = file_len + append_buf.len() as u64;
        let chunk_size = 1u64 << self.chunk_size_logarithm;
        // Round up so a trailing partial chunk also gets a hash slot.
        let chunk_needed = ((new_total_len + chunk_size - 1) >> self.chunk_size_logarithm) as usize;
        self.xxhash_list.resize(chunk_needed, 0);

        if self.end > file_len {
            // The file shrank below the checksummed range - violates the
            // append-only assumption.
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "unexpected truncation",
            ));
        }

        //        start_chunk_offset (start of chunk including self.end)
        //                   v
        //                   |(1)|
        // |chunk|chunk|chunk|chunk|chunk|chunk|chunk|chunk|chunk|
        //                   |---file_buf---|
        // |--------------file--------------|
        // |-------old_buf-------|
        // |-----(2)-----|--(3)--|---(4)----|---append_buf---|
        //               ^       ^          ^                ^
        //          self.start self.end   file_len     new_total_len
        //
        // (1): range being re-checksummed
        // (2): covered by the previous Checksum
        // (3): covered by the current Checksum
        // (4): range written on-disk after `Index::open` (ex. by another process)
        //
        // old_buf:    buffer read at open time.
        // file:       buffer read at flush time (right now, with a lock).
        // append_buf: buffer to append to the file (protected by the same lock).

        let file_buf = {
            let mut file_buf = vec![0; (file_len - start_chunk_offset) as usize];
            file.seek(SeekFrom::Start(start_chunk_offset))?;
            file.read_exact(&mut file_buf)?;
            file_buf
        };

        // Rehash every chunk from the first affected one to the end.
        for i in start_chunk_index..self.xxhash_list.len() {
            let start = (i as u64) << self.chunk_size_logarithm;
            let end = (start + chunk_size).min(new_total_len);
            let mut xx = XxHash::default();
            // Hash portions of file_buf that intersect with start..end.
            let file_buf_start = ((start - start_chunk_offset) as usize).min(file_buf.len());
            let file_buf_end = ((end - start_chunk_offset) as usize).min(file_buf.len());
            xx.write(&file_buf[file_buf_start..file_buf_end]);
            // Hash portions of append_buf that intersect with start..end.
            let append_buf_start = (start.max(file_len) - file_len) as usize;
            let append_buf_end = (end.max(file_len) - file_len) as usize;
            xx.write(&append_buf[append_buf_start..append_buf_end]);
            self.xxhash_list[i] = xx.finish();
        }

        // Update start and end:
        //
        //      chunk | chunk
        //      start   end (both are valid ChecksumOffset, in different chunks)
        //      v       v
        // old: |-------|
        // new:         |--------|----(1)----|
        //              ^        ^
        //              start    new_total_len
        //
        // (1): Place that this Checksum will be written to.
        //
        // Avoid updating start if start and end are in a same chunk:
        //
        //       -----chunk----|
        //       start     end
        //       v         v
        // old:  |---------|
        // new:  |--------------------|
        //       ^                    ^
        //       start                end
        //
        // This makes the length of chain of Checksums O(len(chunks)).
        if (self.start >> self.chunk_size_logarithm) != (self.end >> self.chunk_size_logarithm) {
            self.start = self.end;
        }
        self.end = new_total_len;

        Ok(())
    }
1699
1700    /// Extend the checksum range to cover the entire range of the index buffer
1701    /// so the next open would only read O(1) checksum entries.
1702    fn flatten(&mut self) {
1703        self.start = 0;
1704        self.chain_len = 1;
1705    }
1706
1707    fn write_to<W: Write>(&self, writer: &mut W, _offset_map: &OffsetMap) -> io::Result<usize> {
1708        let mut buf = Vec::with_capacity(16);
1709        buf.write_all(&[TYPE_CHECKSUM])?;
1710        buf.write_vlq(self.start)?;
1711        // self.end is implied by the write position.
1712        buf.write_vlq(self.chunk_size_logarithm)?;
1713        for &xx in self.xxhash_list_to_write() {
1714            buf.write_u64::<LittleEndian>(xx)?;
1715        }
1716        let xx32 = xxhash32(&buf);
1717        buf.write_u32::<LittleEndian>(xx32)?;
1718        writer.write_all(&buf)?;
1719        Ok(buf.len())
1720    }
1721
    /// Return a sub list of the xxhash list that is not covered by the
    /// previous Checksum entry. It's used for serialization.
    fn xxhash_list_to_write(&self) -> &[u64] {
        // See the comment in `read_from` for `start_chunk_index`.
        // It is the starting index for chunks covered by this Checksum entry;
        // chunks before it are covered by the previous Checksum entry and
        // do not need to be written again.
        let start_chunk_index = (self.start >> self.chunk_size_logarithm) as usize;
        &self.xxhash_list[start_chunk_index..]
    }
1730
1731    /// Reset the `chunk_size_logarithm`.
1732    fn set_chunk_size_logarithm(
1733        &mut self,
1734        _buf: &[u8],
1735        chunk_size_logarithm: u32,
1736    ) -> crate::Result<()> {
1737        // Change chunk_size_logarithm if the checksum list is empty.
1738        if self.xxhash_list.is_empty() {
1739            self.chunk_size_logarithm = chunk_size_logarithm;
1740        }
1741        // NOTE: Consider re-hashing and allow changing the chunk_size_logarithm.
1742        Ok(())
1743    }
1744
1745    /// Check a range of bytes.
1746    ///
1747    /// Depending on `chunk_size_logarithm`, bytes outside the specified range
1748    /// might also be checked.
1749    #[inline]
1750    fn check_range(&self, buf: &[u8], offset: u64, length: u64) -> io::Result<()> {
1751        if length == 0 || !self.is_enabled() {
1752            return Ok(());
1753        }
1754
1755        // Ranges not covered by checksums are treated as bad.
1756        if offset + length > self.end {
1757            return checksum_error(self, offset, length);
1758        }
1759
1760        // Otherwise, scan related chunks.
1761        let start = (offset >> self.chunk_size_logarithm) as usize;
1762        let end = ((offset + length - 1) >> self.chunk_size_logarithm) as usize;
1763        if !(start..=end).all(|i| self.check_chunk(buf, i)) {
1764            return checksum_error(self, offset, length);
1765        }
1766        Ok(())
1767    }
1768
    /// Check the i-th chunk. The callsite must make sure `index` is within range.
    #[inline]
    fn check_chunk(&self, buf: &[u8], index: usize) -> bool {
        debug_assert!(index < self.xxhash_list.len());
        // `checked` is a bitmask of chunks already verified; each AtomicU64
        // tracks 64 chunks so each chunk is only hashed once.
        let bit = 1 << (index % 64);
        let checked = &self.checked[index / 64];
        if (checked.load(Acquire) & bit) == bit {
            // Already verified by a previous call.
            true
        } else {
            let start = index << self.chunk_size_logarithm;
            // The last chunk may be partial; clamp to the checksummed end.
            let end = (self.end as usize).min((index + 1) << self.chunk_size_logarithm);
            if start == end {
                return true;
            }
            let hash = xxhash(&buf[start..end]);
            if hash == self.xxhash_list[index] {
                // Remember the success so future checks of this chunk are O(1).
                checked.fetch_or(bit, AcqRel);
                true
            } else {
                false
            }
        }
    }
1792
1793    #[inline]
1794    fn is_enabled(&self) -> bool {
1795        self.end > 0
1796    }
1797}
1798
1799fn write_reversed_vlq(mut writer: impl Write, value: usize) -> io::Result<()> {
1800    let mut reversed_vlq = Vec::new();
1801    reversed_vlq.write_vlq(value)?;
1802    reversed_vlq.reverse();
1803    writer.write_all(&reversed_vlq)?;
1804    Ok(())
1805}
1806
1807impl Clone for MemChecksum {
1808    fn clone(&self) -> Self {
1809        Self {
1810            start: self.start,
1811            end: self.end,
1812            chain_len: self.chain_len,
1813            chunk_size_logarithm: self.chunk_size_logarithm,
1814            xxhash_list: self.xxhash_list.clone(),
1815            checked: self
1816                .checked
1817                .iter()
1818                .map(|c| c.load(Relaxed))
1819                .map(AtomicU64::new)
1820                .collect(),
1821        }
1822    }
1823}
1824
1825impl Default for MemChecksum {
1826    fn default() -> Self {
1827        Self {
1828            start: 0,
1829            end: 0,
1830            chain_len: 0,
1831            chunk_size_logarithm: 20, // chunk_size: 1MB.
1832            xxhash_list: Vec::new(),
1833            checked: Vec::new(),
1834        }
1835    }
1836}
1837
/// Translation table used during serialization: maps dirty (in-memory)
/// entry offsets to their final on-disk offsets. A value of 0 means the
/// entry is unused (see the `debug_assert` in `OffsetMap::get`).
#[derive(Default)]
struct OffsetMap {
    // Count of dirty radix entries. Needed because dirty radix entries are
    // indexed in reversed order (see comment in `OffsetMap::get`).
    radix_len: usize,
    radix_map: Vec<u64>,
    leaf_map: Vec<u64>,
    link_map: Vec<u64>,
    key_map: Vec<u64>,
    ext_key_map: Vec<u64>,
}
1847
/// A simple structure that implements the IndexBuf interface.
/// It does not check checksum.
///
/// Fields: (the index buffer, the path used for error messages).
struct SimpleIndexBuf<'a>(&'a [u8], &'a Path);
1851
1852impl<'a> IndexBuf for SimpleIndexBuf<'a> {
1853    fn buf(&self) -> &[u8] {
1854        self.0
1855    }
1856    fn verify_checksum(&self, _start: u64, _length: u64) -> crate::Result<()> {
1857        Ok(())
1858    }
1859    fn path(&self) -> &Path {
1860        self.1
1861    }
1862}
1863
1864impl OffsetMap {
1865    fn empty_for_index(index: &Index) -> OffsetMap {
1866        let radix_len = index.dirty_radixes.len();
1867        OffsetMap {
1868            radix_len,
1869            radix_map: vec![0; radix_len],
1870            leaf_map: vec![0; index.dirty_leafs.len()],
1871            link_map: vec![0; index.dirty_links.len()],
1872            key_map: vec![0; index.dirty_keys.len()],
1873            ext_key_map: vec![0; index.dirty_ext_keys.len()],
1874        }
1875    }
1876
1877    #[inline]
1878    fn get(&self, offset: Offset) -> u64 {
1879        if offset.is_dirty() {
1880            let dummy = SimpleIndexBuf(b"", Path::new("<dummy>"));
1881            let result = match offset.to_typed(dummy).unwrap() {
1882                // Radix entries are pushed in the reversed order. So the index needs to be
1883                // reversed.
1884                TypedOffset::Radix(x) => self.radix_map[self.radix_len - 1 - x.dirty_index()],
1885                TypedOffset::Leaf(x) => self.leaf_map[x.dirty_index()],
1886                TypedOffset::Link(x) => self.link_map[x.dirty_index()],
1887                TypedOffset::Key(x) => self.key_map[x.dirty_index()],
1888                TypedOffset::ExtKey(x) => self.ext_key_map[x.dirty_index()],
1889                TypedOffset::Checksum(_) => {
1890                    panic!("bug: ChecksumOffset shouldn't be used in OffsetMap::get")
1891                }
1892            };
1893            // result == 0 means an entry marked "unused" is actually used. It's a logic error.
1894            debug_assert!(result > 0);
1895            result
1896        } else {
1897            // No need to translate.
1898            offset.0
1899        }
1900    }
1901}
1902
/// Choose between Front and Back. Used by [`RangeIter`] related logic.
#[derive(Clone, Copy)]
enum Side {
    /// Towards the beginning; stepping this way uses `IterState::prev`.
    Front,
    /// Towards the end; stepping this way uses `IterState::next`.
    Back,
}
1909use Side::Back;
1910use Side::Front;
1911
/// State used by [`RangeIter`].
///
/// Each variant records a position within one node ("frame"); `next` and
/// `prev` move between positions inside the same frame.
#[derive(Clone, Copy, PartialEq, Debug)]
enum IterState {
    /// Visiting the child of a radix node.
    /// child must be inside 0..16 range.
    RadixChild(RadixOffset, u8),

    /// Visiting the leaf of a radix node.
    RadixLeaf(RadixOffset),

    /// Visiting this leaf node.
    Leaf(LeafOffset),

    /// Dummy states to express "inclusive" bounds.
    /// RadixStart < RadixLeaf < RadixChild < RadixEnd.
    /// LeafStart < Leaf < LeafEnd.
    RadixStart(RadixOffset),
    RadixEnd(RadixOffset),
    LeafStart(LeafOffset),
    LeafEnd(LeafOffset),
}
1933
1934impl IterState {
1935    /// Get the next state on the same frame.
1936    /// Return `None` if the frame should be popped.
1937    fn next(self) -> Option<Self> {
1938        match self {
1939            IterState::RadixChild(radix, 15) => Some(IterState::RadixEnd(radix)),
1940            IterState::RadixChild(radix, i) => Some(IterState::RadixChild(radix, i + 1)),
1941            IterState::RadixStart(radix) => Some(IterState::RadixLeaf(radix)),
1942            IterState::RadixLeaf(radix) => Some(IterState::RadixChild(radix, 0)),
1943            IterState::LeafStart(leaf) => Some(IterState::Leaf(leaf)),
1944            IterState::Leaf(leaf) => Some(IterState::LeafEnd(leaf)),
1945            _ => None,
1946        }
1947    }
1948
1949    /// Get the previous state on the same frame.
1950    /// Return `None` if the frame should be popped.
1951    fn prev(self) -> Option<Self> {
1952        match self {
1953            IterState::RadixChild(radix, 0) => Some(IterState::RadixLeaf(radix)),
1954            IterState::RadixChild(radix, i) => Some(IterState::RadixChild(radix, i - 1)),
1955            IterState::RadixEnd(radix) => Some(IterState::RadixChild(radix, 15)),
1956            IterState::RadixLeaf(radix) => Some(IterState::RadixStart(radix)),
1957            IterState::LeafEnd(leaf) => Some(IterState::Leaf(leaf)),
1958            IterState::Leaf(leaf) => Some(IterState::LeafStart(leaf)),
1959            _ => None,
1960        }
1961    }
1962
1963    /// Move one step towards the given side.
1964    fn step(self, towards: Side) -> Option<Self> {
1965        match towards {
1966            Front => self.prev(),
1967            Back => self.next(),
1968        }
1969    }
1970}
1971
/// Main Index.
///
/// Insertion-only mapping from `bytes` to a list of [u64]s.
///
/// An [`Index`] is backed by an append-only file in the filesystem. Internally,
/// it uses base16 radix trees for keys and linked list for [u64] values. The
/// file format was designed to be able to support other types of indexes (ex.
/// non-radix-trees). Though none of them are implemented.
pub struct Index {
    // For locking and low-level access.
    // `None` for indexes created by `create_in_memory`.
    file: Option<File>,

    // For efficient and shared random reading.
    // Backed by mmap.
    pub(crate) buf: Bytes,

    // For error messages.
    // Log uses this field for error messages.
    pub(crate) path: PathBuf,

    // Options (copied from OpenOptions at open time).
    checksum_enabled: bool,
    checksum_max_chain_len: u32,
    fsync: bool,
    write: Option<bool>,

    // Used by `clear_dirty`.
    clean_root: MemRoot,

    // In-memory entries. The root entry is always in-memory.
    dirty_root: MemRoot,
    dirty_radixes: Vec<MemRadix>,
    dirty_leafs: Vec<MemLeaf>,
    dirty_links: Vec<MemLink>,
    dirty_keys: Vec<MemKey>,
    dirty_ext_keys: Vec<MemExtKey>,

    // Checksum state covering the on-disk buffer `buf`.
    checksum: MemChecksum,

    // Additional buffer for external keys.
    // Log::sync needs write access to this field.
    pub(crate) key_buf: Arc<dyn ReadonlyBuffer + Send + Sync>,
}
2015
/// Abstraction of the "external key buffer".
///
/// This makes it possible to use non-contiguous memory for a buffer,
/// and expose them as if it's contiguous.
pub trait ReadonlyBuffer {
    /// Get a slice using the given offset.
    ///
    /// Returns `None` if `[start, start + len)` is not available.
    fn slice(&self, start: u64, len: u64) -> Option<&[u8]>;
}
2024
2025impl<T: AsRef<[u8]>> ReadonlyBuffer for T {
2026    #[inline]
2027    fn slice(&self, start: u64, len: u64) -> Option<&[u8]> {
2028        self.as_ref().get(start as usize..(start + len) as usize)
2029    }
2030}
2031
/// Key to insert. Used by [Index::insert_advanced].
pub enum InsertKey<'a> {
    /// Embedded key, provided directly as bytes.
    Embed(&'a [u8]),

    /// Reference (`[start, end)`) to `key_buf`.
    Reference((u64, u64)),
}
2040
/// Options used to configure how an [`Index`] is opened.
///
/// Similar to [std::fs::OpenOptions], to use this, first call `new`, then
/// chain calls to methods to set each option, finally call `open` to get
/// an [`Index`] structure.
#[derive(Clone)]
pub struct OpenOptions {
    // See the `checksum_max_chain_len` method.
    checksum_max_chain_len: u32,
    // See the `checksum_chunk_size_logarithm` method.
    checksum_chunk_size_logarithm: u32,
    // See the `checksum_enabled` method.
    checksum_enabled: bool,
    // See the `fsync` method.
    fsync: bool,
    // Logical length override. See the `logical_len` method.
    len: Option<u64>,
    // Read-write requirement. See the `write` method.
    write: Option<bool>,
    // External key buffer. See the `key_buf` method.
    key_buf: Option<Arc<dyn ReadonlyBuffer + Send + Sync>>,
}
2056
impl OpenOptions {
    #[allow(clippy::new_without_default)]
    /// Create [`OpenOptions`] with default configuration:
    /// - checksum enabled, with 1MB chunk size
    /// - checksum max chain length is `config::INDEX_CHECKSUM_MAX_CHAIN_LEN` (default: 10)
    /// - no external key buffer
    /// - no fsync
    /// - read root entry from the end of the file
    /// - open as read-write but fallback to read-only
    pub fn new() -> OpenOptions {
        OpenOptions {
            checksum_max_chain_len: config::INDEX_CHECKSUM_MAX_CHAIN_LEN.load(Acquire),
            checksum_chunk_size_logarithm: 20,
            checksum_enabled: true,
            fsync: false,
            len: None,
            write: None,
            key_buf: None,
        }
    }

    /// Set the maximum checksum chain length.
    ///
    /// If it is non-zero, and the checksum chain (linked list of checksum
    /// entries needed to verify the entire index) exceeds the specified length,
    /// they will be collapsed into a single checksum entry to make `open` more
    /// efficient.
    pub fn checksum_max_chain_len(&mut self, len: u32) -> &mut Self {
        self.checksum_max_chain_len = len;
        self
    }

    /// Set checksum chunk size as `1 << checksum_chunk_size_logarithm`.
    pub fn checksum_chunk_size_logarithm(
        &mut self,
        checksum_chunk_size_logarithm: u32,
    ) -> &mut Self {
        self.checksum_chunk_size_logarithm = checksum_chunk_size_logarithm;
        self
    }

    /// Set whether to write checksum entries on `flush`.
    pub fn checksum_enabled(&mut self, checksum_enabled: bool) -> &mut Self {
        self.checksum_enabled = checksum_enabled;
        self
    }

    /// Set fsync behavior.
    ///
    /// If true, then [`Index::flush`] will use `fsync` to flush data to the
    /// physical device before returning.
    pub fn fsync(&mut self, fsync: bool) -> &mut Self {
        self.fsync = fsync;
        self
    }

    /// Set whether writing is required:
    ///
    /// - `None`: open as read-write but fallback to read-only. `flush()` may fail.
    /// - `Some(false)`: open as read-only. `flush()` will always fail.
    /// - `Some(true)`: open as read-write. `open()` fails if read-write is not
    ///   possible. `flush()` will not fail due to permission issues.
    ///
    /// Note:  The index is always mutable in-memory. Only `flush()` may fail.
    pub fn write(&mut self, value: Option<bool>) -> &mut Self {
        self.write = value;
        self
    }

    /// Specify the logical length of the file.
    ///
    /// If `len` is `None`, use the actual file length. Otherwise, use the
    /// length specified by `len`. Reading the file length requires locking.
    ///
    /// This is useful for lock-free reads, or accessing to multiple versions of
    /// the index at the same time.
    ///
    /// To get a valid logical length, check the return value of [`Index::flush`].
    pub fn logical_len(&mut self, len: Option<u64>) -> &mut Self {
        self.len = len;
        self
    }

    /// Specify the external key buffer.
    ///
    /// With an external key buffer, keys could be stored as references using
    /// `index.insert_advanced` to save space.
    pub fn key_buf(&mut self, buf: Option<Arc<dyn ReadonlyBuffer + Send + Sync>>) -> &mut Self {
        self.key_buf = buf;
        self
    }

    /// Open the index file with given options.
    ///
    /// Driven by the "immutable by default" idea, together with append-only
    /// properties, [`OpenOptions::open`] returns a "snapshotted" view of the
    /// index. Changes to the filesystem won't change instantiated [`Index`]es.
    pub fn open<P: AsRef<Path>>(&self, path: P) -> crate::Result<Index> {
        let path = path.as_ref();
        let result: crate::Result<_> = (|| {
            let span = debug_span!("Index::open", path = path.to_string_lossy().as_ref());
            let _guard = span.enter();

            let mut open_options = self.clone();
            // Try read-write first unless the caller explicitly asked for
            // read-only.
            let open_result = if self.write == Some(false) {
                fs::OpenOptions::new().read(true).open(path)
            } else {
                fs::OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true)
                    .append(true)
                    .open(path)
            };
            let mut file = match self.write {
                Some(write) => open_result.context(
                    path,
                    if write {
                        "cannot open Index with read-write mode"
                    } else {
                        "cannot open Index with read-only mode"
                    },
                )?,
                None => {
                    // Fall back to open the file as read-only, automatically.
                    match open_result {
                        Err(_) => {
                            open_options.write = Some(false);
                            fs::OpenOptions::new()
                                .read(true)
                                .open(path)
                                .context(path, "cannot open Index with read-only mode")?
                        }
                        Ok(file) => file,
                    }
                }
            };

            let bytes = {
                match self.len {
                    None => {
                        // Take the lock to read file length, since that decides root entry location.
                        let lock = ScopedFileLock::new(&mut file, false)
                            .context(path, "cannot lock Log to read file length")?;
                        mmap_bytes(lock.as_ref(), None).context(path, "cannot mmap")?
                    }
                    Some(len) => {
                        // No need to lock for getting file length.
                        mmap_bytes(&file, Some(len)).context(path, "cannot mmap")?
                    }
                }
            };

            let (dirty_radixes, clean_root, mut checksum) = if bytes.is_empty() {
                // Empty file. Create root radix entry as a dirty entry, and
                // rebuild checksum table (in case it's corrupted).
                let radix_offset = RadixOffset::from_dirty_index(0);
                let _ = utils::fix_perm_file(&file, false);
                let meta = Default::default();
                let root = MemRoot { radix_offset, meta };
                let checksum = MemChecksum::default();
                (vec![MemRadix::default()], root, checksum)
            } else {
                let end = bytes.len();
                let (root, mut checksum) = read_root_checksum_at_end(path, &bytes, end)?;
                if !self.checksum_enabled {
                    checksum = MemChecksum::default();
                }
                (vec![], root, checksum)
            };

            checksum.set_chunk_size_logarithm(&bytes, self.checksum_chunk_size_logarithm)?;
            let key_buf = self.key_buf.clone();
            let dirty_root = clean_root.clone();

            let index = Index {
                file: Some(file),
                buf: bytes,
                path: path.to_path_buf(),
                // Deconstruct open_options instead of storing it whole, since it contains a
                // permanent reference to the original key_buf mmap.
                checksum_enabled: open_options.checksum_enabled,
                checksum_max_chain_len: open_options.checksum_max_chain_len,
                fsync: open_options.fsync,
                write: open_options.write,
                clean_root,
                dirty_root,
                checksum,
                dirty_radixes,
                dirty_links: vec![],
                dirty_leafs: vec![],
                dirty_keys: vec![],
                dirty_ext_keys: vec![],
                key_buf: key_buf.unwrap_or_else(|| Arc::new(&b""[..])),
            };

            Ok(index)
        })();
        result.context(|| format!("in index::OpenOptions::open({:?})", path))
    }

    /// Create an in-memory [`Index`] that skips flushing to disk.
    ///
    /// NOTE(review): `set_chunk_size_logarithm` currently never returns an
    /// error, so (unlike what was previously documented here) any
    /// `checksum_chunk_size_logarithm` value is accepted.
    pub fn create_in_memory(&self) -> crate::Result<Index> {
        let result: crate::Result<_> = (|| {
            let buf = Bytes::new();
            let dirty_radixes = vec![MemRadix::default()];
            let clean_root = {
                let radix_offset = RadixOffset::from_dirty_index(0);
                let meta = Default::default();
                MemRoot { radix_offset, meta }
            };
            let key_buf = self.key_buf.clone();
            let dirty_root = clean_root.clone();
            let mut checksum = MemChecksum::default();
            checksum.set_chunk_size_logarithm(&buf, self.checksum_chunk_size_logarithm)?;

            Ok(Index {
                // No backing file: flush is skipped for in-memory indexes.
                file: None,
                buf,
                path: PathBuf::new(),
                checksum_enabled: self.checksum_enabled,
                checksum_max_chain_len: self.checksum_max_chain_len,
                fsync: self.fsync,
                write: self.write,
                clean_root,
                dirty_root,
                checksum,
                dirty_radixes,
                dirty_links: vec![],
                dirty_leafs: vec![],
                dirty_keys: vec![],
                dirty_ext_keys: vec![],
                key_buf: key_buf.unwrap_or_else(|| Arc::new(&b""[..])),
            })
        })();
        result.context("in index::OpenOptions::create_in_memory")
    }
}
2296
/// Load root and checksum from the logical end.
///
/// `end` is the logical file length. The last entries before it are the
/// Root, an optional Checksum, and a reversed VLQ holding their combined
/// size. Returns the parsed `MemRoot` and `MemChecksum` (a default,
/// disabled checksum when the file has none).
fn read_root_checksum_at_end(
    path: &Path,
    bytes: &[u8],
    end: usize,
) -> crate::Result<(MemRoot, MemChecksum)> {
    // root_offset      (root_offset + root_size)
    // v                v
    // |---root_entry---|---checksum_entry---|--reversed_vlq_size---|
    // |<--------- root_checksum_size ------>|
    // |<-- root_size ->|

    let buf = SimpleIndexBuf(bytes, path);
    // Be careful! SimpleIndexBuf does not do data verification.
    // Handle integer range overflows here.
    let (root_checksum_size, vlq_size) =
        read_vlq_reverse(bytes, end).context(path, "cannot read len(root+checksum)")?;

    // Verify the header byte.
    check_type(&buf, 0, TYPE_HEAD)?;

    // The declared size must fit within the file.
    if end < root_checksum_size as usize + vlq_size {
        return Err(crate::Error::corruption(
            path,
            format!(
                "data corrupted at {} (invalid size: {})",
                end, root_checksum_size
            ),
        ));
    }

    let root_offset = end - root_checksum_size as usize - vlq_size;
    let (root, root_size) = MemRoot::read_from(&buf, root_offset as u64)?;

    let checksum = if root_offset + root_size + vlq_size == end {
        // No checksum - checksum disabled.
        MemChecksum::default()
    } else {
        let (checksum, _checksum_len) =
            MemChecksum::read_from(&buf, (root_offset + root_size) as u64)?;

        // Not checking lengths here:
        //
        // root_offset + root_size + checksum_len + vlq_size == end
        // so we can add new kinds of data in the future without breaking
        // older clients, similar to how Checksum gets added while
        // maintaining compatibility.

        // Verify the Root entry, since SimpleIndexBuf skips checksum check.
        // Checksum is self-verified. So no need to check it.
        checksum
            .check_range(bytes, root_offset as u64, root_size as u64)
            .context(path, "failed to verify Root entry")?;
        checksum
    };

    Ok((root, checksum))
}
2355
2356impl fmt::Debug for OpenOptions {
2357    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2358        write!(f, "OpenOptions {{ ")?;
2359        write!(
2360            f,
2361            "checksum_chunk_size_logarithm: {}, ",
2362            self.checksum_chunk_size_logarithm
2363        )?;
2364        write!(f, "fsync: {}, ", self.fsync)?;
2365        write!(f, "len: {:?}, ", self.len)?;
2366        write!(f, "write: {:?}, ", self.write)?;
2367        let key_buf_desc = match self.key_buf {
2368            Some(ref _buf) => "Some(_)",
2369            None => "None",
2370        };
2371        write!(f, "key_buf: {} }}", key_buf_desc)?;
2372        Ok(())
2373    }
2374}
2375
/// A subset of Index features for read-only accesses.
/// - Provides the main buffer, immutable data serialized on-disk.
/// - Provides the optional checksum checker.
/// - Provides the path (for error message).
trait IndexBuf {
    fn buf(&self) -> &[u8];
    fn path(&self) -> &Path;

    /// Verify checksum for the given range. Internal API used by `*Offset` structs.
    fn verify_checksum(&self, start: u64, length: u64) -> crate::Result<()>;

    /// Build a corruption error for an out-of-range access `start..start+length`.
    #[inline(never)]
    fn range_error(&self, start: usize, length: usize) -> crate::Error {
        self.corruption(format!(
            "byte range {}..{} is unavailable",
            start,
            start + length
        ))
    }

    /// Build a corruption error tagged with this buffer's path.
    #[inline(never)]
    fn corruption(&self, message: impl ToString) -> crate::Error {
        crate::Error::corruption(self.path(), message)
    }
}
2401
2402impl<T: IndexBuf> IndexBuf for &T {
2403    fn buf(&self) -> &[u8] {
2404        T::buf(self)
2405    }
2406    fn verify_checksum(&self, start: u64, length: u64) -> crate::Result<()> {
2407        T::verify_checksum(self, start, length)
2408    }
2409    fn path(&self) -> &Path {
2410        T::path(self)
2411    }
2412}
2413
2414impl IndexBuf for Index {
2415    fn buf(&self) -> &[u8] {
2416        &self.buf
2417    }
2418    fn verify_checksum(&self, offset: u64, length: u64) -> crate::Result<()> {
2419        self.checksum
2420            .check_range(&self.buf, offset, length)
2421            .context(&self.path, || format!("Index path = {:?}", self.path))
2422    }
2423    fn path(&self) -> &Path {
2424        &self.path
2425    }
2426}
2427
2428// Intentionally not inlined. This affects the "index lookup (disk, verified)"
2429// benchmark. It takes 74ms with this function inlined, and 61ms without.
2430//
2431// Reduce instruction count in `Index::verify_checksum`.
2432#[inline(never)]
2433fn checksum_error(checksum: &MemChecksum, offset: u64, length: u64) -> io::Result<()> {
2434    Err(io::Error::new(
2435        io::ErrorKind::InvalidData,
2436        format!(
2437            "range {}..{} failed checksum check ({:?})",
2438            offset,
2439            offset + length,
2440            &checksum,
2441        ),
2442    ))
2443}
2444
2445impl Index {
2446    /// Return a cloned [`Index`] with pending in-memory changes.
2447    pub fn try_clone(&self) -> crate::Result<Self> {
2448        self.try_clone_internal(true)
2449            .context("in Index::try_clone")
2450            .context(|| format!("  Index.path = {:?}", self.path))
2451    }
2452
2453    /// Return a cloned [`Index`] without pending in-memory changes.
2454    ///
2455    /// This is logically equivalent to calling `clear_dirty` immediately
2456    /// on the result after `try_clone`, but potentially cheaper.
2457    pub fn try_clone_without_dirty(&self) -> crate::Result<Self> {
2458        self.try_clone_internal(false)
2459            .context("in Index::try_clone_without_dirty")
2460            .context(|| format!("  Index.path = {:?}", self.path))
2461    }
2462
2463    pub(crate) fn try_clone_internal(&self, copy_dirty: bool) -> crate::Result<Index> {
2464        let file = match &self.file {
2465            Some(f) => Some(f.duplicate().context(self.path(), "cannot duplicate")?),
2466            None => None,
2467        };
2468
2469        let index = if copy_dirty {
2470            Index {
2471                file,
2472                buf: self.buf.clone(),
2473                path: self.path.clone(),
2474                checksum_enabled: self.checksum_enabled,
2475                checksum_max_chain_len: self.checksum_max_chain_len,
2476                fsync: self.fsync,
2477                write: self.write,
2478                clean_root: self.clean_root.clone(),
2479                dirty_root: self.dirty_root.clone(),
2480                checksum: self.checksum.clone(),
2481                dirty_keys: self.dirty_keys.clone(),
2482                dirty_ext_keys: self.dirty_ext_keys.clone(),
2483                dirty_leafs: self.dirty_leafs.clone(),
2484                dirty_links: self.dirty_links.clone(),
2485                dirty_radixes: self.dirty_radixes.clone(),
2486                key_buf: self.key_buf.clone(),
2487            }
2488        } else {
2489            Index {
2490                file,
2491                buf: self.buf.clone(),
2492                path: self.path.clone(),
2493                checksum_enabled: self.checksum_enabled,
2494                checksum_max_chain_len: self.checksum_max_chain_len,
2495                fsync: self.fsync,
2496                write: self.write,
2497                clean_root: self.clean_root.clone(),
2498                dirty_root: self.clean_root.clone(),
2499                checksum: self.checksum.clone(),
2500                dirty_keys: Vec::new(),
2501                dirty_ext_keys: Vec::new(),
2502                dirty_leafs: Vec::new(),
2503                dirty_links: Vec::new(),
2504                dirty_radixes: if self.clean_root.radix_offset.is_dirty() {
2505                    // See `clear_dirty` for this special case.
2506                    vec![MemRadix::default()]
2507                } else {
2508                    Vec::new()
2509                },
2510                key_buf: self.key_buf.clone(),
2511            }
2512        };
2513
2514        Ok(index)
2515    }
2516
    /// Get metadata attached to the root node. This is what previously set by
    /// [Index::set_meta].
    pub fn get_meta(&self) -> &[u8] {
        // Read from the dirty (in-memory) root, so metadata set via
        // `set_meta` is visible here before `flush`.
        &self.dirty_root.meta
    }
2522
    /// Get metadata attached to the root node at file open time. This is what
    /// stored on the filesystem at the index open time, not affected by
    /// [`Index::set_meta`].
    pub fn get_original_meta(&self) -> &[u8] {
        // `clean_root` mirrors the on-disk state (it is reloaded by `flush`
        // and at open time), so pending `set_meta` calls do not affect it.
        &self.clean_root.meta
    }
2529
2530    /// Set metadata attached to the root node. Will be written at
2531    /// [`Index::flush`] time.
2532    pub fn set_meta<B: AsRef<[u8]>>(&mut self, meta: B) {
2533        self.dirty_root.meta = meta.as_ref().to_vec().into_boxed_slice()
2534    }
2535
2536    /// Remove dirty (in-memory) state. Restore the [`Index`] to the state as
2537    /// if it's just loaded from disk without modifications.
2538    pub fn clear_dirty(&mut self) {
2539        self.dirty_root = self.clean_root.clone();
2540        self.dirty_radixes.clear();
2541        if self.dirty_root.radix_offset.is_dirty() {
2542            // In case the disk buffer is empty, a "dirty radix" entry
2543            // is created automatically. Check OpenOptions::open for
2544            // details.
2545            assert_eq!(
2546                self.dirty_root.radix_offset,
2547                RadixOffset::from_dirty_index(0)
2548            );
2549            self.dirty_radixes.push(MemRadix::default());
2550        }
2551        self.dirty_leafs.clear();
2552        self.dirty_links.clear();
2553        self.dirty_keys.clear();
2554        self.dirty_ext_keys.clear();
2555    }
2556
    /// Flush changes to disk.
    ///
    /// Take the file lock when writing.
    ///
    /// Return 0 if nothing needs to be written. Otherwise return the new file
    /// length on success. Return [`io::ErrorKind::PermissionDenied`] if the file
    /// was marked read-only at open time.
    ///
    /// The new file length can be used to obtain the exact same view of the
    /// index as it currently is. That means, other changes to the indexes won't
    /// be "combined" during flush. For example, given the following events
    /// happened in order:
    /// - Open. Get Index X.
    /// - Open using the same arguments. Get Index Y.
    /// - Write key "p" to X.
    /// - Write key "q" to Y.
    /// - Flush X. Get new length LX.
    /// - Flush Y. Get new length LY.
    /// - Open using LY as `logical_len`. Get Index Z.
    ///
    /// Then key "p" does not exist in Z. This allows some advanced use cases.
    /// On the other hand, if "merging changes" is the desired behavior, the
    /// caller needs to take another lock, re-instantiate [`Index`] and re-insert
    /// keys.
    ///
    /// For in-memory-only indexes, this function does nothing and returns 0,
    /// unless read-only was set at open time.
    pub fn flush(&mut self) -> crate::Result<u64> {
        let result: crate::Result<_> = (|| {
            let span = debug_span!("Index::flush", path = self.path.to_string_lossy().as_ref());
            let _guard = span.enter();

            if self.write == Some(false) {
                // NOTE(review): the doc comment above mentions
                // `PermissionDenied`, but this constructs a path error —
                // confirm which description is authoritative.
                return Err(crate::Error::path(
                    self.path(),
                    "cannot flush: Index opened with read-only mode",
                ));
            }
            if self.file.is_none() {
                // Why is this Ok, not Err?
                //
                // An in-memory Index does not share data with anybody else,
                // therefore no need to flush. In other words, whether flush
                // happens or not does not change the result of other APIs on
                // this Index instance.
                //
                // Another way to think about it, an in-memory Index is similar
                // to a private anonymous mmap, and msync on that mmap would
                // succeed.
                return Ok(0);
            }

            let old_len = self.buf.len() as u64;
            let mut new_len = old_len;
            if self.dirty_root == self.clean_root && !self.dirty_root.radix_offset.is_dirty() {
                // Nothing changed
                return Ok(new_len);
            }

            // Critical section: need write lock
            {
                // `offset_map` translates in-memory (dirty) entry indexes to
                // their final on-disk offsets as each entry is serialized.
                let mut offset_map = OffsetMap::empty_for_index(self);
                // Rough pre-allocation to limit Vec growth while serializing.
                let estimated_dirty_bytes = self.dirty_links.len() * 50;
                let path = self.path.clone(); // for error messages; and make the borrowck happy.
                let mut lock = ScopedFileLock::new(self.file.as_mut().unwrap(), true)
                    .context(&path, "cannot lock")?;
                let len = lock
                    .as_mut()
                    .seek(SeekFrom::End(0))
                    .context(&path, "cannot seek to end")?;
                if len < old_len {
                    let message = format!(
                        "on-disk index is unexpectedly smaller ({} bytes) than its previous version ({} bytes)",
                        len, old_len
                    );
                    // This is not a "corruption" - something has truncated the
                    // file, potentially recreating it. We haven't checked the
                    // new content, so it's not considered as "data corruption".
                    // TODO: Review this decision.
                    let err = crate::Error::path(&path, message);
                    return Err(err);
                }

                let mut buf = Vec::with_capacity(estimated_dirty_bytes);

                // Write in the following order:
                // header, keys, links, leafs, radixes, root.
                // Latter entries depend on former entries.

                if len == 0 {
                    buf.write_all(&[TYPE_HEAD]).infallible()?;
                }

                // Keys. Record each key's final file offset so later entries
                // can reference it through `offset_map`.
                for (i, entry) in self.dirty_keys.iter().enumerate() {
                    if !entry.is_unused() {
                        let offset = buf.len() as u64 + len;
                        offset_map.key_map[i] = offset;
                        entry.write_to(&mut buf, &offset_map).infallible()?;
                    };
                }

                // Inlined leafs. They might affect ExtKeys and Links. Need to write first.
                for i in 0..self.dirty_leafs.len() {
                    let entry = self.dirty_leafs.get_mut(i).unwrap();
                    let offset = buf.len() as u64 + len;
                    if !entry.is_unused()
                        && entry.maybe_write_inline_to(
                            &mut buf,
                            &self.buf,
                            len,
                            &mut self.dirty_ext_keys,
                            &mut self.dirty_links,
                            &mut offset_map,
                        )?
                    {
                        offset_map.leaf_map[i] = offset;
                        // Mark written so the non-inlined pass below skips it.
                        entry.mark_unused();
                    }
                }

                for (i, entry) in self.dirty_ext_keys.iter().enumerate() {
                    if !entry.is_unused() {
                        let offset = buf.len() as u64 + len;
                        offset_map.ext_key_map[i] = offset;
                        entry.write_to(&mut buf, &offset_map).infallible()?;
                    }
                }

                for (i, entry) in self.dirty_links.iter().enumerate() {
                    if !entry.is_unused() {
                        let offset = buf.len() as u64 + len;
                        offset_map.link_map[i] = offset;
                        entry.write_to(&mut buf, &offset_map).infallible()?;
                    }
                }

                // Non-inlined leafs.
                for (i, entry) in self.dirty_leafs.iter().enumerate() {
                    if !entry.is_unused() {
                        let offset = buf.len() as u64 + len;
                        offset_map.leaf_map[i] = offset;
                        entry
                            .write_noninline_to(&mut buf, &offset_map)
                            .infallible()?;
                    }
                }

                // Write Radix entries in reversed order since former ones might refer to latter ones.
                for (i, entry) in self.dirty_radixes.iter().rev().enumerate() {
                    let offset = buf.len() as u64 + len;
                    entry.write_to(&mut buf, &offset_map).infallible()?;
                    offset_map.radix_map[i] = offset;
                }

                // Write Root.
                let root_len = self
                    .dirty_root
                    .write_to(&mut buf, &offset_map)
                    .infallible()?;

                // Update and write Checksum if it's enabled.
                let mut new_checksum = self.checksum.clone();
                let checksum_len = if self.checksum_enabled {
                    new_checksum
                        .update(&self.buf, lock.as_mut(), len, &buf)
                        .context(&path, "cannot read and update checksum")?;
                    // Optionally merge the checksum entry for optimization.
                    if self.checksum_max_chain_len > 0
                        && new_checksum.chain_len >= self.checksum_max_chain_len
                    {
                        new_checksum.flatten();
                    }
                    new_checksum.write_to(&mut buf, &offset_map).infallible()?
                } else {
                    assert!(!self.checksum.is_enabled());
                    0
                };
                // Trailing reversed VLQ lets readers locate the ROOT (and
                // optional CHECKSUM) entry from the end of the file; see the
                // file format notes at the top of this module.
                write_reversed_vlq(&mut buf, root_len + checksum_len).infallible()?;

                new_len = buf.len() as u64 + len;
                lock.as_mut()
                    .seek(SeekFrom::Start(len))
                    .context(&path, "cannot seek")?;
                lock.as_mut()
                    .write_all(&buf)
                    .context(&path, "cannot write new data to index")?;

                if self.fsync || config::get_global_fsync() {
                    lock.as_mut().sync_all().context(&path, "cannot sync")?;
                }

                // Remap and update root since length has changed
                let bytes = mmap_bytes(lock.as_ref(), None).context(&path, "cannot mmap")?;
                self.buf = bytes;

                // 'path' should not have changed.
                debug_assert_eq!(&self.path, &path);

                // This is to workaround the borrow checker.
                let this = SimpleIndexBuf(&self.buf, &path);

                // Sanity check - the length should be expected. Otherwise, the lock
                // is somehow ineffective.
                if self.buf.len() as u64 != new_len {
                    return Err(this.corruption("file changed unexpectedly"));
                }

                // Reload root and checksum.
                let (root, checksum) =
                    read_root_checksum_at_end(&path, &self.buf, new_len as usize)?;

                // The re-read state should match what was just written.
                debug_assert_eq!(checksum.end, new_checksum.end);
                debug_assert_eq!(&checksum.xxhash_list, &new_checksum.xxhash_list);
                self.checksum = checksum;
                self.clean_root = root;
            }

            // Outside critical section
            self.clear_dirty();

            if cfg!(all(debug_assertions, test)) {
                self.verify()
                    .expect("sync() should not break checksum check");
            }

            Ok(new_len)
        })();
        result
            .context("in Index::flush")
            .context(|| format!("  Index.path = {:?}", self.path))
    }
2788
2789    /// Lookup by `key`. Return [`LinkOffset`].
2790    ///
2791    /// To test if the key exists or not, use [Offset::is_null].
2792    /// To obtain all values, use [`LinkOffset::values`].
2793    pub fn get<K: AsRef<[u8]>>(&self, key: &K) -> crate::Result<LinkOffset> {
2794        let result: crate::Result<_> = (|| {
2795            let mut offset: Offset = self.dirty_root.radix_offset.into();
2796            let mut iter = Base16Iter::from_base256(key);
2797
2798            while !offset.is_null() {
2799                // Read the entry at "offset"
2800                match offset.to_typed(self)? {
2801                    TypedOffset::Radix(radix) => {
2802                        match iter.next() {
2803                            None => {
2804                                // The key ends at this Radix entry.
2805                                return radix.link_offset(self);
2806                            }
2807                            Some(x) => {
2808                                // Follow the `x`-th child in the Radix entry.
2809                                offset = radix.child(self, x)?;
2810                            }
2811                        }
2812                    }
2813                    TypedOffset::Leaf(leaf) => {
2814                        // Meet a leaf. If key matches, return the link offset.
2815                        let (stored_key, link_offset) = leaf.key_and_link_offset(self)?;
2816                        if stored_key == key.as_ref() {
2817                            return Ok(link_offset);
2818                        } else {
2819                            return Ok(LinkOffset::default());
2820                        }
2821                    }
2822                    _ => return Err(self.corruption("unexpected type during key lookup")),
2823                }
2824            }
2825
2826            // Not found
2827            Ok(LinkOffset::default())
2828        })();
2829
2830        result
2831            .context(|| format!("in Index::get({:?})", key.as_ref()))
2832            .context(|| format!("  Index.path = {:?}", self.path))
2833    }
2834
    /// Scan entries which match the given prefix in base16 form.
    /// Return [`RangeIter`] which allows accesses to keys and values.
    pub fn scan_prefix_base16(
        &self,
        mut base16: impl Iterator<Item = u8>,
    ) -> crate::Result<RangeIter> {
        // Walk down the radix tree following `base16` digits. `front_stack`
        // records the path taken; it becomes the iterator's front boundary.
        let mut offset: Offset = self.dirty_root.radix_offset.into();
        let mut front_stack = Vec::<IterState>::new();

        while !offset.is_null() {
            // Read the entry at "offset"
            match offset.to_typed(self)? {
                TypedOffset::Radix(radix) => {
                    match base16.next() {
                        None => {
                            // Prefix fully consumed at a radix node: every key
                            // under this node matches. The back stack shares
                            // the same path but ends with `RadixEnd` instead
                            // of `RadixStart`, bracketing the whole subtree.
                            let start = IterState::RadixStart(radix);
                            let end = IterState::RadixEnd(radix);
                            front_stack.push(start);
                            let mut back_stack = front_stack.clone();
                            *back_stack.last_mut().unwrap() = end;
                            return Ok(RangeIter::new(self, front_stack, back_stack));
                        }
                        Some(x) => {
                            // Follow the `x`-th child in the Radix entry.
                            front_stack.push(IterState::RadixChild(radix, x));
                            offset = radix.child(self, x)?;
                        }
                    }
                }
                TypedOffset::Leaf(leaf) => {
                    // Meet a leaf. If key matches, return the link offset.
                    let eq = {
                        let (stored_key, _link_offset) = leaf.key_and_link_offset(self)?;
                        // Remaining key matches?
                        // The first `front_stack.len()` digits were already
                        // matched while descending; only compare the rest.
                        let remaining: Vec<u8> = base16.collect();
                        Base16Iter::from_base256(&stored_key)
                            .skip(front_stack.len())
                            .take(remaining.len())
                            .eq(remaining.iter().cloned())
                    };
                    if eq {
                        // Single matching leaf: bracket just this leaf.
                        let start = IterState::LeafStart(leaf);
                        let end = IterState::LeafEnd(leaf);
                        front_stack.push(start);
                        let mut back_stack = front_stack.clone();
                        *back_stack.last_mut().unwrap() = end;
                        return Ok(RangeIter::new(self, front_stack, back_stack));
                    } else {
                        // Mismatch: front == back yields an empty iterator.
                        return Ok(RangeIter::new(self, front_stack.clone(), front_stack));
                    };
                }
                _ => return Err(self.corruption("unexpected type during prefix scan")),
            }
        }

        // Not found: identical front/back stacks yield an empty iterator.
        Ok(RangeIter::new(self, front_stack.clone(), front_stack))
    }
2893
2894    /// Scan entries which match the given prefix in base256 form.
2895    /// Return [`RangeIter`] which allows accesses to keys and values.
2896    pub fn scan_prefix<B: AsRef<[u8]>>(&self, prefix: B) -> crate::Result<RangeIter> {
2897        self.scan_prefix_base16(Base16Iter::from_base256(&prefix))
2898            .context(|| format!("in Index::scan_prefix({:?})", prefix.as_ref()))
2899            .context(|| format!("  Index.path = {:?}", self.path))
2900    }
2901
2902    /// Scan entries which match the given prefix in hex form.
2903    /// Return [`RangeIter`] which allows accesses to keys and values.
2904    pub fn scan_prefix_hex<B: AsRef<[u8]>>(&self, prefix: B) -> crate::Result<RangeIter> {
2905        // Invalid hex chars will be caught by `radix.child`
2906        let base16 = prefix.as_ref().iter().cloned().map(single_hex_to_base16);
2907        self.scan_prefix_base16(base16)
2908            .context(|| format!("in Index::scan_prefix_hex({:?})", prefix.as_ref()))
2909            .context(|| format!("  Index.path = {:?}", self.path))
2910    }
2911
2912    /// Scans entries whose keys are within the given range.
2913    ///
2914    /// Returns a double-ended iterator, which provides accesses to keys and
2915    /// values.
2916    pub fn range<'a>(&self, range: impl RangeBounds<&'a [u8]>) -> crate::Result<RangeIter> {
2917        let is_empty_range = match (range.start_bound(), range.end_bound()) {
2918            (Included(start), Included(end)) => start > end,
2919            (Included(start), Excluded(end)) => start > end,
2920            (Excluded(start), Included(end)) => start > end,
2921            (Excluded(start), Excluded(end)) => start >= end,
2922            (Unbounded, _) | (_, Unbounded) => false,
2923        };
2924
2925        if is_empty_range {
2926            // `BTreeSet::range` panics in this case. Match its behavior.
2927            panic!("range start is greater than range end");
2928        }
2929
2930        let result: crate::Result<_> = (|| {
2931            let front_stack = self.iter_stack_by_bound(range.start_bound(), Front)?;
2932            let back_stack = self.iter_stack_by_bound(range.end_bound(), Back)?;
2933            Ok(RangeIter::new(self, front_stack, back_stack))
2934        })();
2935
2936        result
2937            .context(|| {
2938                format!(
2939                    "in Index::range({:?} to {:?})",
2940                    range.start_bound(),
2941                    range.end_bound()
2942                )
2943            })
2944            .context(|| format!("  Index.path = {:?}", self.path))
2945    }
2946
2947    /// Insert a key-value pair. The value will be the head of the linked list.
2948    /// That is, `get(key).values().first()` will return the newly inserted
2949    /// value.
2950    pub fn insert<K: AsRef<[u8]>>(&mut self, key: &K, value: u64) -> crate::Result<()> {
2951        self.insert_advanced(InsertKey::Embed(key.as_ref()), InsertValue::Prepend(value))
2952            .context(|| format!("in Index::insert(key={:?}, value={})", key.as_ref(), value))
2953            .context(|| format!("  Index.path = {:?}", self.path))
2954    }
2955
2956    /// Remove all values associated with the given key.
2957    pub fn remove(&mut self, key: impl AsRef<[u8]>) -> crate::Result<()> {
2958        // NOTE: The implementation detail does not remove radix entries to
2959        // reclaim space or improve lookup performance. For example, inserting
2960        // "abcde" to an empty index, following by deleting it will still
2961        // require O(5) jumps looking up "abcde".
2962        let key = key.as_ref();
2963        self.insert_advanced(InsertKey::Embed(key), InsertValue::Tombstone)
2964            .context(|| format!("in Index::remove(key={:?})", key))
2965            .context(|| format!("  Index.path = {:?}", self.path))
2966    }
2967
2968    /// Remove all values associated with all keys with the given prefix.
2969    pub fn remove_prefix(&mut self, prefix: impl AsRef<[u8]>) -> crate::Result<()> {
2970        // NOTE: See "remove". The implementation detail does not optimize
2971        // for space or lookup performance.
2972        let prefix = prefix.as_ref();
2973        self.insert_advanced(InsertKey::Embed(prefix), InsertValue::TombstonePrefix)
2974            .context(|| format!("in Index::remove_prefix(prefix={:?})", prefix))
2975            .context(|| format!("  Index.path = {:?}", self.path))
2976    }
2977
2978    /// Update the linked list for a given key.
2979    ///
2980    /// If `link` is None, behave like `insert`. Otherwise, ignore the existing
2981    /// values `key` mapped to, create a new link entry that chains to the given
2982    /// [`LinkOffset`].
2983    ///
2984    /// `key` could be a reference, or an embedded value. See [`InsertKey`] for
2985    /// details.
2986    ///
2987    /// This is a low-level API.
2988    pub fn insert_advanced(&mut self, key: InsertKey, value: InsertValue) -> crate::Result<()> {
2989        let mut offset: Offset = self.dirty_root.radix_offset.into();
2990        let mut step = 0;
2991        let (key, key_buf_offset) = match key {
2992            InsertKey::Embed(k) => (k, None),
2993            InsertKey::Reference((start, len)) => {
2994                let key = match self.key_buf.as_ref().slice(start, len) {
2995                    Some(k) => k,
2996                    None => {
2997                        return Err(
2998                            self.corruption("key buffer is invalid when inserting referred keys")
2999                        );
3000                    }
3001                };
3002                // UNSAFE NOTICE: `key` is valid as long as `self.key_buf` is valid. `self.key_buf`
3003                // won't be changed. So `self` can still be mutable without a read-only
3004                // relationship with `key`.
3005                let detached_key = unsafe { &*(key as *const [u8]) };
3006                (detached_key, Some((start, len)))
3007            }
3008        };
3009        let mut iter = Base16Iter::from_base256(&key);
3010
3011        let mut last_radix = RadixOffset::default();
3012        let mut last_child = 0u8;
3013
3014        loop {
3015            match offset.to_typed(&*self)? {
3016                TypedOffset::Radix(radix) => {
3017                    // Copy radix entry since we must modify it.
3018                    let radix = radix.copy(self)?;
3019                    offset = radix.into();
3020
3021                    if step == 0 {
3022                        self.dirty_root.radix_offset = radix;
3023                    } else {
3024                        last_radix.set_child(self, last_child, offset);
3025                    }
3026
3027                    last_radix = radix;
3028
3029                    match iter.next() {
3030                        None => {
3031                            // "key" is shorter than existing ones. No need to create a new key.
3032                            // For example, insert "a", when root.radix is {'a': {'b': { ... }}}.
3033                            let old_link_offset = radix.link_offset(self)?;
3034                            let new_link_offset = match value {
3035                                InsertValue::Prepend(value) => old_link_offset.create(self, value),
3036                                InsertValue::PrependReplace(value, link_offset) => {
3037                                    link_offset.create(self, value)
3038                                }
3039                                InsertValue::Tombstone => LinkOffset::default(),
3040                                InsertValue::TombstonePrefix => {
3041                                    radix.set_all_to_null(self);
3042                                    return Ok(());
3043                                }
3044                            };
3045                            radix.set_link(self, new_link_offset);
3046                            return Ok(());
3047                        }
3048                        Some(x) => {
3049                            let next_offset = radix.child(self, x)?;
3050                            if next_offset.is_null() {
3051                                // "key" is longer than existing ones. Create key and leaf entries.
3052                                // For example, insert "abcd", when root.radix is {'a': {}}.
3053                                let new_link_offset = match value {
3054                                    InsertValue::Prepend(value) => {
3055                                        LinkOffset::default().create(self, value)
3056                                    }
3057                                    InsertValue::PrependReplace(value, link_offset) => {
3058                                        link_offset.create(self, value)
3059                                    }
3060                                    InsertValue::Tombstone | InsertValue::TombstonePrefix => {
3061                                        // No need to create a key.
3062                                        radix.set_child(self, x, Offset::null());
3063                                        return Ok(());
3064                                    }
3065                                };
3066                                let key_offset = self.create_key(key, key_buf_offset);
3067                                let leaf_offset =
3068                                    LeafOffset::create(self, new_link_offset, key_offset);
3069                                radix.set_child(self, x, leaf_offset.into());
3070                                return Ok(());
3071                            } else {
3072                                offset = next_offset;
3073                                last_child = x;
3074                            }
3075                        }
3076                    }
3077                }
3078                TypedOffset::Leaf(leaf) => {
3079                    let (old_key, old_link_offset) = {
3080                        let (old_key, link_offset) = leaf.key_and_link_offset(self)?;
3081                        // Detach "old_key" from "self".
3082                        // About safety: This is to avoid a memory copy / allocation.
3083                        // `old_key` are only valid before `dirty_*keys` being resized.
3084                        // `old_iter` (used by `split_leaf`) and `old_key` are not used
3085                        // after creating a key. So it's safe to not copy it.
3086                        let detached_key = unsafe { &*(old_key as *const [u8]) };
3087                        (detached_key, link_offset)
3088                    };
3089                    let matched = if let InsertValue::TombstonePrefix = value {
3090                        // Only test the prefix of old_key.
3091                        old_key.get(..key.len()) == Some(key)
3092                    } else {
3093                        old_key == key
3094                    };
3095                    if matched {
3096                        // Key matched. Need to copy leaf entry for modification, except for
3097                        // deletion.
3098                        let new_link_offset = match value {
3099                            InsertValue::Prepend(value) => old_link_offset.create(self, value),
3100                            InsertValue::PrependReplace(value, link_offset) => {
3101                                link_offset.create(self, value)
3102                            }
3103                            InsertValue::Tombstone | InsertValue::TombstonePrefix => {
3104                                // No need to copy the leaf entry.
3105                                last_radix.set_child(self, last_child, Offset::null());
3106                                return Ok(());
3107                            }
3108                        };
3109                        let new_leaf_offset = leaf.set_link(self, new_link_offset)?;
3110                        last_radix.set_child(self, last_child, new_leaf_offset.into());
3111                    } else {
3112                        // Key mismatch. Do a leaf split unless it's a deletion.
3113                        let new_link_offset = match value {
3114                            InsertValue::Prepend(value) => {
3115                                LinkOffset::default().create(self, value)
3116                            }
3117                            InsertValue::PrependReplace(value, link_offset) => {
3118                                link_offset.create(self, value)
3119                            }
3120                            InsertValue::Tombstone | InsertValue::TombstonePrefix => return Ok(()),
3121                        };
3122                        self.split_leaf(
3123                            leaf,
3124                            old_key,
3125                            key.as_ref(),
3126                            key_buf_offset,
3127                            step,
3128                            last_radix,
3129                            last_child,
3130                            old_link_offset,
3131                            new_link_offset,
3132                        )?;
3133                    }
3134                    return Ok(());
3135                }
3136                _ => return Err(self.corruption("unexpected type during insertion")),
3137            }
3138
3139            step += 1;
3140        }
3141    }
3142
3143    /// Convert a slice to [`Bytes`].
3144    /// Do not copy the slice if it's from the on-disk buffer.
3145    pub fn slice_to_bytes(&self, slice: &[u8]) -> Bytes {
3146        self.buf.slice_to_bytes(slice)
3147    }
3148
3149    /// Verify checksum for the entire on-disk buffer.
3150    pub fn verify(&self) -> crate::Result<()> {
3151        self.verify_checksum(0, self.checksum.end)
3152    }
3153
    // Internal function used by [`Index::range`].
    // Calculate the [`IterState`] stack used by [`RangeIter`].
    // `side` is the side of the `bound`, starting side of the iteration,
    // the opposite of "towards" side.
    //
    // The returned stack traces the radix-tree path from the root towards the
    // bound key, one base-16 digit (nibble) per level. The top of the stack
    // encodes where iteration should begin on this side.
    fn iter_stack_by_bound(
        &self,
        bound: Bound<&&[u8]>,
        side: Side,
    ) -> crate::Result<Vec<IterState>> {
        let root_radix = self.dirty_root.radix_offset;
        let (inclusive, mut base16iter) = match bound {
            Unbounded => {
                // No bound on this side: start at the very beginning (Front)
                // or very end (Back) of the root radix entry.
                return Ok(match side {
                    Front => vec![IterState::RadixStart(root_radix)],
                    Back => vec![IterState::RadixEnd(root_radix)],
                });
            }
            // Bounded: walk the tree following the bound key's nibbles.
            Included(ref key) => (true, Base16Iter::from_base256(key)),
            Excluded(ref key) => (false, Base16Iter::from_base256(key)),
        };

        let mut offset: Offset = root_radix.into();
        let mut stack = Vec::<IterState>::new();

        while !offset.is_null() {
            match offset.to_typed(self)? {
                TypedOffset::Radix(radix) => match base16iter.next() {
                    None => {
                        // The key ends at this Radix entry.
                        let state = IterState::RadixLeaf(radix);
                        // For an inclusive bound, step once so the bound key
                        // itself is covered by the iteration (see
                        // `IterState::step` for the exact stepping semantics).
                        let state = if inclusive {
                            state.step(side).unwrap()
                        } else {
                            state
                        };
                        stack.push(state);
                        return Ok(stack);
                    }
                    Some(x) => {
                        // Follow the `x`-th child in the Radix entry.
                        stack.push(IterState::RadixChild(radix, x));
                        offset = radix.child(self, x)?;
                    }
                },
                TypedOffset::Leaf(leaf) => {
                    // Reached a leaf before exhausting the bound key. Compare
                    // the stored key's remaining nibbles (skip the ones
                    // already matched, one per stack level) against the
                    // bound's remaining nibbles.
                    let stored_cmp_key = {
                        let (stored_key, _link_offset) = leaf.key_and_link_offset(self)?;
                        Base16Iter::from_base256(&stored_key)
                            .skip(stack.len())
                            .cmp(base16iter)
                    };
                    let state = IterState::Leaf(leaf);
                    let state = match (stored_cmp_key, side, inclusive) {
                        // Leaf falls inside the bound: step so it is visited.
                        (Equal, _, true) | (Less, Back, _) | (Greater, Front, _) => {
                            state.step(side).unwrap()
                        }
                        // Leaf falls outside the bound (or is excluded):
                        // leave the state untouched so iteration skips it.
                        (Equal, _, false) | (Greater, Back, _) | (Less, Front, _) => state,
                    };
                    stack.push(state);
                    return Ok(stack);
                }
                _ => return Err(self.corruption("unexpected type following prefix")),
            }
        }

        // Prefix does not exist. The stack ends with a RadixChild state that
        // points to nothing.
        Ok(stack)
    }
3223
    #[allow(clippy::too_many_arguments)]
    /// Split a leaf entry. Separated from `insert_advanced` to make `insert_advanced`
    /// shorter.  The parameters are internal states inside `insert_advanced`. Calling this
    /// from other functions makes less sense.
    ///
    /// Parameters (all pre-computed by `insert_advanced`):
    /// - `old_leaf_offset`: the leaf being split.
    /// - `old_key` / `new_key`: the existing key and the key being inserted.
    /// - `key_buf_offset`: if set, the new key is stored as an "ext key"
    ///   referencing an external buffer `(start, len)` instead of embedding
    ///   the bytes.
    /// - `step`: number of base-16 digits already consumed by the radix walk;
    ///   both keys share that prefix.
    /// - `radix_offset` / `child`: the parent radix entry and child index
    ///   currently pointing at the old leaf.
    /// - `old_link_offset` / `new_link_offset`: value lists for the old and
    ///   new keys respectively.
    #[inline]
    fn split_leaf(
        &mut self,
        old_leaf_offset: LeafOffset,
        old_key: &[u8],
        new_key: &[u8],
        key_buf_offset: Option<(u64, u64)>,
        step: usize,
        radix_offset: RadixOffset,
        child: u8,
        old_link_offset: LinkOffset,
        new_link_offset: LinkOffset,
    ) -> crate::Result<()> {
        // This is probably the most complex part. Here are some explanation about input parameters
        // and what this function is supposed to do for some cases:
        //
        // Input parameters are marked using `*`:
        //
        //      Offset            | Content
        //      root_radix        | Radix(child1: radix1, ...)         \
        //      radix1            | Radix(child2: radix2, ...)         |> steps
        //      ...               | ...                                | (for skipping check
        //      *radix_offset*    | Radix(*child*: *leaf_offset*, ...) /  of prefix in keys)
        //      *old_leaf_offset* | Leaf(link_offset: *old_link_offset*, ...)
        //      *new_link_offset* | Link(...)
        //
        //      old_* are redundant, but they are pre-calculated by the caller. So just reuse them.
        //
        // Here are 3 kinds of examples (Keys are embed in Leaf for simplicity):
        //
        // Example 1. old_key = "1234"; new_key = "1278".
        //
        //      Offset | Before                | After
        //           A | Radix(1: B)           | Radix(1: C)
        //           B | Leaf("1234", Link: X) | Leaf("1234", Link: X)
        //           C |                       | Radix(2: E)
        //           D |                       | Leaf("1278")
        //           E |                       | Radix(3: B, 7: D)
        //
        // Example 2. old_key = "1234", new_key = "12". No need for a new leaf entry:
        //
        //      Offset | Before                | After
        //           A | Radix(1: B)           | Radix(1: C)
        //           B | Leaf("1234", Link: X) | Leaf("1234", Link: X)
        //           C |                       | Radix(2: B, Link: Y)
        //
        // Example 3. old_key = "12", new_key = "1234". Need new leaf. Old leaf is not needed.
        //
        //      Offset | Before              | After
        //           A | Radix(1: B)         | Radix(1: C)
        //           B | Leaf("12", Link: X) | Leaf("12", Link: X) # not used
        //           C |                     | Radix(2: E, Link: X)
        //           D |                     | Leaf("1234", Link: Y)
        //           E |                     | Radix(3: D)

        // UNSAFE NOTICE: Read the "UNSAFE NOTICE" inside `insert_advanced` to learn more.
        // Basically, `old_iter` is only guaranteed available if there is no insertion to
        // `self.dirty_keys` or `self.dirty_ext_keys`. That's true here since we won't read
        // `old_iter` after creating new keys. But be aware of the constraint when modifying the
        // code.
        let mut old_iter = Base16Iter::from_base256(&old_key).skip(step);
        let mut new_iter = Base16Iter::from_base256(&new_key).skip(step);

        let mut last_radix_offset = radix_offset;
        let mut last_radix_child = child;

        // Set once the keys diverge (or one of them ends); the last radix
        // entry of the chain has then been fully populated.
        let mut completed = false;

        // Each iteration creates one new radix entry covering one shared
        // nibble of the two keys (or the divergence point), linking it into
        // the chain built so far.
        loop {
            let b1 = old_iter.next();
            let b2 = new_iter.next();

            let mut radix = MemRadix::default();

            if let Some(b1) = b1 {
                // Initial value for the b1-th child. Could be rewritten by
                // "set_radix_entry_child" in the next loop iteration.
                radix.offsets[b1 as usize] = old_leaf_offset.into();
            } else {
                // Example 3. old_key is a prefix of new_key. A leaf is still needed.
                // The new leaf will be created by the next "if" block.
                old_leaf_offset.mark_unused(self);
                radix.link_offset = old_link_offset;
            }

            match b2 {
                None => {
                    // Example 2. new_key is a prefix of old_key. A new leaf is not needed.
                    radix.link_offset = new_link_offset;
                    completed = true;
                }
                Some(b2v) if b1 != b2 => {
                    // Example 1 and Example 3. A new leaf is needed.
                    let new_key_offset = self.create_key(new_key, key_buf_offset);
                    let new_leaf_offset = LeafOffset::create(self, new_link_offset, new_key_offset);
                    radix.offsets[b2v as usize] = new_leaf_offset.into();
                    completed = true;
                }
                _ => {}
            }

            // Create the Radix entry, and connect it to the parent entry.
            let offset = RadixOffset::create(self, radix);
            last_radix_offset.set_child(self, last_radix_child, offset.into());

            if completed {
                break;
            }

            debug_assert!(b1 == b2);
            last_radix_offset = offset;
            last_radix_child = b2.unwrap();
        }

        Ok(())
    }
3344
3345    /// Create a key (if key_buf_offset is None) or ext key (if key_buf_offset is set) entry.
3346    #[inline]
3347    fn create_key(&mut self, key: &[u8], key_buf_offset: Option<(u64, u64)>) -> Offset {
3348        match key_buf_offset {
3349            None => KeyOffset::create(self, key).into(),
3350            Some((start, len)) => ExtKeyOffset::create(self, start, len).into(),
3351        }
3352    }
3353}
3354
/// Specify value to insert. Used by `insert_advanced`.
#[derive(Copy, Clone)]
pub enum InsertValue {
    /// Insert as a head of the existing linked list.
    Prepend(u64),

    /// Replace the linked list. Then insert as a head.
    ///
    /// The [`LinkOffset`] becomes the tail of the new list; it is typically
    /// obtained from a previous lookup (ex. `Index::get`).
    PrependReplace(u64, LinkOffset),

    /// Effectively delete associated values for the specified key.
    ///
    /// Implemented by unlinking the entry from its parent radix entry (the
    /// child pointer is set to null), so existing data becomes unreachable.
    Tombstone,

    /// Effectively delete associated values for all keys starting with the
    /// prefix.
    TombstonePrefix,
}
3371
// Debug formatters for the index data structures.
3373
3374impl Debug for Offset {
3375    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3376        if self.is_null() {
3377            write!(f, "None")
3378        } else if self.is_dirty() {
3379            let path = Path::new("<dummy>");
3380            let dummy = SimpleIndexBuf(b"", path);
3381            match self.to_typed(dummy).unwrap() {
3382                TypedOffset::Radix(x) => x.fmt(f),
3383                TypedOffset::Leaf(x) => x.fmt(f),
3384                TypedOffset::Link(x) => x.fmt(f),
3385                TypedOffset::Key(x) => x.fmt(f),
3386                TypedOffset::ExtKey(x) => x.fmt(f),
3387                TypedOffset::Checksum(x) => x.fmt(f),
3388            }
3389        } else {
3390            write!(f, "Disk[{}]", self.0)
3391        }
3392    }
3393}
3394
3395impl Debug for MemRadix {
3396    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3397        write!(f, "Radix {{ link: {:?}", self.link_offset)?;
3398        for (i, v) in self.offsets.iter().cloned().enumerate() {
3399            if !v.is_null() {
3400                write!(f, ", {}: {:?}", i, v)?;
3401            }
3402        }
3403        write!(f, " }}")
3404    }
3405}
3406
3407impl Debug for MemLeaf {
3408    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3409        if self.is_unused() {
3410            write!(f, "Leaf (unused)")
3411        } else {
3412            write!(
3413                f,
3414                "Leaf {{ key: {:?}, link: {:?} }}",
3415                self.key_offset, self.link_offset
3416            )
3417        }
3418    }
3419}
3420
3421impl Debug for MemLink {
3422    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3423        write!(
3424            f,
3425            "Link {{ value: {}, next: {:?} }}",
3426            self.value, self.next_link_offset
3427        )
3428    }
3429}
3430
3431impl Debug for MemKey {
3432    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3433        if self.is_unused() {
3434            write!(f, "Key (unused)")
3435        } else {
3436            write!(f, "Key {{ key:")?;
3437            for byte in self.key.iter() {
3438                write!(f, " {:X}", byte)?;
3439            }
3440            write!(f, " }}")
3441        }
3442    }
3443}
3444
3445impl Debug for MemExtKey {
3446    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3447        if self.is_unused() {
3448            write!(f, "ExtKey (unused)")
3449        } else {
3450            write!(f, "ExtKey {{ start: {}, len: {} }}", self.start, self.len)
3451        }
3452    }
3453}
3454
3455impl Debug for MemRoot {
3456    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3457        if self.meta.is_empty() {
3458            write!(f, "Root {{ radix: {:?} }}", self.radix_offset)
3459        } else {
3460            write!(
3461                f,
3462                "Root {{ radix: {:?}, meta: {:?} }}",
3463                self.radix_offset, self.meta
3464            )
3465        }
3466    }
3467}
3468impl Debug for MemChecksum {
3469    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3470        write!(
3471            f,
3472            "Checksum {{ start: {}, end: {}, chunk_size_logarithm: {}, checksums.len(): {} }}",
3473            self.start,
3474            self.end,
3475            self.chunk_size_logarithm,
3476            self.xxhash_list_to_write().len(),
3477        )
3478    }
3479}
3480
3481impl Debug for Index {
3482    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3483        writeln!(
3484            f,
3485            "Index {{ len: {}, root: {:?} }}",
3486            self.buf.len(),
3487            self.dirty_root.radix_offset
3488        )?;
3489
3490        // On-disk entries
3491        let offset_map = OffsetMap::default();
3492        let mut buf = Vec::with_capacity(self.buf.len());
3493        buf.push(TYPE_HEAD);
3494        let mut root_offset = 0;
3495        loop {
3496            let i = buf.len();
3497            if i >= self.buf.len() {
3498                break;
3499            }
3500            write!(f, "Disk[{}]: ", i)?;
3501            let type_int = self.buf[i];
3502            let i = i as u64;
3503            match type_int {
3504                TYPE_RADIX => {
3505                    let e = MemRadix::read_from(self, i).expect("read");
3506                    e.write_to(&mut buf, &offset_map).expect("write");
3507                    writeln!(f, "{:?}", e)?;
3508                }
3509                TYPE_LEAF => {
3510                    let e = MemLeaf::read_from(self, i).expect("read");
3511                    e.write_noninline_to(&mut buf, &offset_map).expect("write");
3512                    writeln!(f, "{:?}", e)?;
3513                }
3514                TYPE_INLINE_LEAF => {
3515                    let e = MemLeaf::read_from(self, i).expect("read");
3516                    writeln!(f, "Inline{:?}", e)?;
3517                    // Just skip the type int byte so we can parse inlined structures.
3518                    buf.push(TYPE_INLINE_LEAF);
3519                }
3520                TYPE_LINK => {
3521                    let e = MemLink::read_from(self, i).unwrap();
3522                    e.write_to(&mut buf, &offset_map).expect("write");
3523                    writeln!(f, "{:?}", e)?;
3524                }
3525                TYPE_KEY => {
3526                    let e = MemKey::read_from(self, i).expect("read");
3527                    e.write_to(&mut buf, &offset_map).expect("write");
3528                    writeln!(f, "{:?}", e)?;
3529                }
3530                TYPE_EXT_KEY => {
3531                    let e = MemExtKey::read_from(self, i).expect("read");
3532                    e.write_to(&mut buf, &offset_map).expect("write");
3533                    writeln!(f, "{:?}", e)?;
3534                }
3535                TYPE_ROOT => {
3536                    root_offset = i as usize;
3537                    let e = MemRoot::read_from(self, i).expect("read").0;
3538                    e.write_to(&mut buf, &offset_map).expect("write");
3539                    // Preview the next entry. If it's not Checksum, then it's in "legacy"
3540                    // format and we need to write "reversed_vlq" length of the Root entry
3541                    // immediately.
3542                    if self.buf.get(buf.len()) != Some(&TYPE_CHECKSUM)
3543                        || MemChecksum::read_from(&self, buf.len() as u64).is_err()
3544                    {
3545                        let root_len = buf.len() - root_offset;
3546                        write_reversed_vlq(&mut buf, root_len).expect("write");
3547                    }
3548                    writeln!(f, "{:?}", e)?;
3549                }
3550                TYPE_CHECKSUM => {
3551                    let e = MemChecksum::read_from(&self, i).expect("read").0;
3552                    e.write_to(&mut buf, &offset_map).expect("write");
3553                    let root_checksum_len = buf.len() - root_offset;
3554                    let vlq_start = buf.len();
3555                    write_reversed_vlq(&mut buf, root_checksum_len).expect("write");
3556                    let vlq_end = buf.len();
3557                    debug_assert_eq!(
3558                        &buf[vlq_start..vlq_end],
3559                        &self.buf[vlq_start..vlq_end],
3560                        "reversed vlq should match (root+checksum len: {})",
3561                        root_checksum_len
3562                    );
3563                    writeln!(f, "{:?}", e)?;
3564                }
3565                _ => {
3566                    writeln!(f, "Broken Data!")?;
3567                    break;
3568                }
3569            }
3570        }
3571
3572        if buf.len() > 1 && self.buf[..] != buf[..] {
3573            debug_assert_eq!(&self.buf[..], &buf[..]);
3574            return writeln!(f, "Inconsistent Data!");
3575        }
3576
3577        // In-memory entries
3578        for (i, e) in self.dirty_radixes.iter().enumerate() {
3579            write!(f, "Radix[{}]: ", i)?;
3580            writeln!(f, "{:?}", e)?;
3581        }
3582
3583        for (i, e) in self.dirty_leafs.iter().enumerate() {
3584            write!(f, "Leaf[{}]: ", i)?;
3585            writeln!(f, "{:?}", e)?;
3586        }
3587
3588        for (i, e) in self.dirty_links.iter().enumerate() {
3589            write!(f, "Link[{}]: ", i)?;
3590            writeln!(f, "{:?}", e)?;
3591        }
3592
3593        for (i, e) in self.dirty_keys.iter().enumerate() {
3594            write!(f, "Key[{}]: ", i)?;
3595            writeln!(f, "{:?}", e)?;
3596        }
3597
3598        for (i, e) in self.dirty_ext_keys.iter().enumerate() {
3599            writeln!(f, "ExtKey[{}]: {:?}", i, e)?;
3600        }
3601
3602        Ok(())
3603    }
3604}
3605
3606#[cfg(test)]
3607mod tests {
3608    use std::collections::BTreeSet;
3609    use std::collections::HashMap;
3610    use std::fs::File;
3611
3612    use quickcheck::quickcheck;
3613    use tempfile::tempdir;
3614
3615    use super::InsertValue::PrependReplace;
3616    use super::*;
3617
3618    fn open_opts() -> OpenOptions {
3619        let mut opts = OpenOptions::new();
3620        opts.checksum_chunk_size_logarithm(4);
3621        opts
3622    }
3623
3624    fn in_memory_index() -> Index {
3625        OpenOptions::new().create_in_memory().unwrap()
3626    }
3627
    #[test]
    fn test_scan_prefix() {
        // Keys chosen to cover prefix-scan edge cases: exact matches,
        // prefixes shared by multiple keys, and prefixes longer than any
        // stored key.
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).unwrap();
        let keys: Vec<&[u8]> = vec![b"01", b"02", b"03", b"031", b"0410", b"042", b"05000"];
        for (i, key) in keys.iter().enumerate() {
            index.insert(key, i as u64).unwrap();
        }

        // Return keys with the given prefix. Also verify LinkOffsets.
        let scan_keys = |prefix: &[u8]| -> Vec<Vec<u8>> {
            let iter = index.scan_prefix(prefix).unwrap();
            iter_to_keys(&index, &keys, &iter)
        };

        assert_eq!(scan_keys(b"01"), vec![b"01"]);
        assert!(scan_keys(b"010").is_empty());
        assert_eq!(scan_keys(b"02"), vec![b"02"]);
        assert!(scan_keys(b"020").is_empty());
        // "03" is both a key and a prefix of "031".
        assert_eq!(scan_keys(b"03"), vec![&b"03"[..], b"031"]);
        assert_eq!(scan_keys(b"031"), vec![b"031"]);
        assert!(scan_keys(b"032").is_empty());
        assert_eq!(scan_keys(b"04"), vec![&b"0410"[..], b"042"]);
        assert_eq!(scan_keys(b"041"), vec![b"0410"]);
        assert_eq!(scan_keys(b"0410"), vec![b"0410"]);
        assert!(scan_keys(b"04101").is_empty());
        assert!(scan_keys(b"0412").is_empty());
        assert_eq!(scan_keys(b"042"), vec![b"042"]);
        assert!(scan_keys(b"0421").is_empty());
        assert_eq!(scan_keys(b"05"), vec![b"05000"]);
        assert_eq!(scan_keys(b"0500"), vec![b"05000"]);
        assert_eq!(scan_keys(b"05000"), vec![b"05000"]);
        assert!(scan_keys(b"051").is_empty());
        // The empty prefix (and the shared "0" prefix) matches everything.
        assert_eq!(scan_keys(b"0"), keys);
        assert_eq!(scan_keys(b""), keys);
        assert!(scan_keys(b"1").is_empty());

        // 0x30 = b'0'
        // Hex-prefix scans can address half-byte (nibble) boundaries: "3" is
        // the high nibble of every key's first byte.
        assert_eq!(index.scan_prefix_hex(b"30").unwrap().count(), keys.len());
        assert_eq!(index.scan_prefix_hex(b"3").unwrap().count(), keys.len());
        assert_eq!(index.scan_prefix_hex(b"31").unwrap().count(), 0);
    }
3670
    #[test]
    fn test_remove() {
        // Exercise key removal: on an empty index, on dirty in-memory
        // entries, and persistence of removals across flush/reopen.
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).expect("open");

        // Removing keys on an empty index should not create new entries.
        let text = format!("{:?}", &index);
        index.remove("").unwrap();
        index.remove("a").unwrap();
        // Debug output dumps every entry, so equal output means no new
        // entries were created.
        assert_eq!(text, format!("{:?}", &index));

        index.insert(b"abc", 42).unwrap();
        index.insert(b"abc", 43).unwrap();
        index.insert(b"abxyz", 44).unwrap();
        index.flush().unwrap();

        // Remove known keys.
        assert_eq!(index.range(..).unwrap().count(), 2);
        index.remove(b"abxyz").unwrap();
        assert_eq!(index.range(..).unwrap().count(), 1);
        index.remove(b"abc").unwrap();
        assert_eq!(index.range(..).unwrap().count(), 0);

        // Since all entries are "dirty" in memory, removing keys should not create new entries.
        let text = format!("{:?}", &index);
        index.remove("").unwrap();
        index.remove("a").unwrap();
        index.remove("ab").unwrap();
        index.remove("abc").unwrap();
        index.remove("abcd").unwrap();
        index.remove("abcde").unwrap();
        index.remove("abcx").unwrap();
        index.remove("abcxyz").unwrap();
        index.remove("abcxyzz").unwrap();
        assert_eq!(text, format!("{:?}", &index));

        // Removal state can be saved to disk.
        index.flush().unwrap();
        let index = open_opts().open(dir.path().join("a")).expect("open");
        assert_eq!(index.range(..).unwrap().count(), 0);
    }
3712
    #[test]
    fn test_remove_recursive() {
        // Exercise prefix-based removal (`remove_prefix`): no match, exact
        // key match, partial prefix match, and removing everything.
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        index.insert(b"abc", 42).unwrap();
        index.insert(b"abc", 42).unwrap();
        index.insert(b"abxyz1", 42).unwrap();
        index.insert(b"abxyz2", 42).unwrap();
        index.insert(b"abxyz33333", 42).unwrap();
        index.insert(b"abxyz44444", 42).unwrap();
        index.insert(b"aby", 42).unwrap();
        index.flush().unwrap();

        // Reopen so removals run against on-disk (clean) entries.
        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        let mut n = index.range(..).unwrap().count();
        index.remove_prefix(b"abxyz33333333333").unwrap(); // nothing removed
        assert_eq!(index.range(..).unwrap().count(), n);

        index.remove_prefix(b"abxyz33333").unwrap(); // exact match
        n -= 1; // abxyz33333 removed
        assert_eq!(index.range(..).unwrap().count(), n);

        index.remove_prefix(b"abxyz4").unwrap(); // prefix exact match
        n -= 1; // abxyz44444 removed
        assert_eq!(index.range(..).unwrap().count(), n);

        index.remove_prefix(b"ab").unwrap(); // prefix match
        n -= 4; // abc, aby, abxyz1, abxyz2 removed
        assert_eq!(index.range(..).unwrap().count(), n);

        // Reopen again (previous removals were not flushed) and wipe all.
        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        index.remove_prefix(b"").unwrap(); // remove everything
        assert_eq!(index.range(..).unwrap().count(), 0);
    }
3747
    #[test]
    fn test_distinct_one_byte_keys() {
        // Verify the exact in-memory entry layout (via the Debug dump) as
        // short, distinct keys are inserted one by one.
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        // A fresh index has only an empty dirty root radix.
        assert_eq!(
            format!("{:?}", index),
            "Index { len: 0, root: Radix[0] }\n\
             Radix[0]: Radix { link: None }\n"
        );

        // The empty key attaches its value directly to the root radix's link.
        index.insert(&[], 55).expect("update");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: Link[0] }
Link[0]: Link { value: 55, next: None }
"#
        );

        // A one-byte key creates a leaf under child 1 (high nibble of 0x12).
        index.insert(&[0x12], 77).expect("update");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: Link[0], 1: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[1] }
Link[0]: Link { value: 55, next: None }
Link[1]: Link { value: 77, next: None }
Key[0]: Key { key: 12 }
"#
        );

        // PrependReplace chains the new link in front of an existing one.
        let link = index.get(&[0x12]).expect("get");
        index
            .insert_advanced(InsertKey::Embed(&[0x34]), PrependReplace(99, link))
            .expect("update");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: Link[0], 1: Leaf[0], 3: Leaf[1] }
Leaf[0]: Leaf { key: Key[0], link: Link[1] }
Leaf[1]: Leaf { key: Key[1], link: Link[2] }
Link[0]: Link { value: 55, next: None }
Link[1]: Link { value: 77, next: None }
Link[2]: Link { value: 99, next: Link[1] }
Key[0]: Key { key: 12 }
Key[1]: Key { key: 34 }
"#
        );
    }
3797
    #[test]
    fn test_distinct_one_byte_keys_flush() {
        // Same scenario as test_distinct_one_byte_keys, but with flushes in
        // between, verifying the exact on-disk layout (Disk[..] offsets).
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).expect("open");

        // 1st flush.
        // An empty flush still writes header + radix + root + checksum.
        assert_eq!(index.flush().expect("flush"), 24);
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 24, root: Disk[1] }
Disk[1]: Radix { link: None }
Disk[5]: Root { radix: Disk[1] }
Disk[8]: Checksum { start: 0, end: 8, chunk_size_logarithm: 4, checksums.len(): 1 }
"#
        );

        // Mixed on-disk and in-memory state.
        index.insert(&[], 55).expect("update");
        index.insert(&[0x12], 77).expect("update");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 24, root: Radix[0] }
Disk[1]: Radix { link: None }
Disk[5]: Root { radix: Disk[1] }
Disk[8]: Checksum { start: 0, end: 8, chunk_size_logarithm: 4, checksums.len(): 1 }
Radix[0]: Radix { link: Link[0], 1: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[1] }
Link[0]: Link { value: 55, next: None }
Link[1]: Link { value: 77, next: None }
Key[0]: Key { key: 12 }
"#
        );

        // After 2nd flush. There are 2 roots.
        // On-disk data is append-only: the old root stays; the new root at
        // Disk[61] references the new entries appended after offset 24.
        let link = index.get(&[0x12]).expect("get");
        index
            .insert_advanced(InsertKey::Embed(&[0x34]), PrependReplace(99, link))
            .expect("update");
        index.flush().expect("flush");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 104, root: Disk[45] }
Disk[1]: Radix { link: None }
Disk[5]: Root { radix: Disk[1] }
Disk[8]: Checksum { start: 0, end: 8, chunk_size_logarithm: 4, checksums.len(): 1 }
Disk[24]: Key { key: 12 }
Disk[27]: Key { key: 34 }
Disk[30]: Link { value: 55, next: None }
Disk[33]: Link { value: 77, next: None }
Disk[36]: Link { value: 99, next: Disk[33] }
Disk[39]: Leaf { key: Disk[24], link: Disk[33] }
Disk[42]: Leaf { key: Disk[27], link: Disk[36] }
Disk[45]: Radix { link: Disk[30], 1: Disk[39], 3: Disk[42] }
Disk[61]: Root { radix: Disk[45] }
Disk[64]: Checksum { start: 0, end: 64, chunk_size_logarithm: 4, checksums.len(): 4 }
"#
        );
    }
3856
    /// Exercise leaf splitting for the purely in-memory (unflushed) index:
    /// keys diverging mid-way, key-is-prefix in both directions, and the
    /// same key inserted twice (values chained on one leaf).
    #[test]
    fn test_leaf_split() {
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).expect("open");

        // Example 1: two keys are not prefixes of each other
        index.insert(&[0x12, 0x34], 5).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: None, 1: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[0] }
Link[0]: Link { value: 5, next: None }
Key[0]: Key { key: 12 34 }
"#
        );
        index.insert(&[0x12, 0x78], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: None, 1: Radix[1] }
Radix[1]: Radix { link: None, 2: Radix[2] }
Radix[2]: Radix { link: None, 3: Leaf[0], 7: Leaf[1] }
Leaf[0]: Leaf { key: Key[0], link: Link[0] }
Leaf[1]: Leaf { key: Key[1], link: Link[1] }
Link[0]: Link { value: 5, next: None }
Link[1]: Link { value: 7, next: None }
Key[0]: Key { key: 12 34 }
Key[1]: Key { key: 12 78 }
"#
        );

        // Example 2: new key is a prefix of the old key
        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        index.insert(&[0x12, 0x34], 5).expect("insert");
        index.insert(&[0x12], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: None, 1: Radix[1] }
Radix[1]: Radix { link: None, 2: Radix[2] }
Radix[2]: Radix { link: Link[1], 3: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[0] }
Link[0]: Link { value: 5, next: None }
Link[1]: Link { value: 7, next: None }
Key[0]: Key { key: 12 34 }
"#
        );

        // Example 3: old key is a prefix of the new key
        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        index.insert(&[0x12], 5).expect("insert");
        index.insert(&[0x12, 0x78], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: None, 1: Radix[1] }
Radix[1]: Radix { link: None, 2: Radix[2] }
Radix[2]: Radix { link: Link[0], 7: Leaf[1] }
Leaf[0]: Leaf (unused)
Leaf[1]: Leaf { key: Key[1], link: Link[1] }
Link[0]: Link { value: 5, next: None }
Link[1]: Link { value: 7, next: None }
Key[0]: Key (unused)
Key[1]: Key { key: 12 78 }
"#
        );

        // Same key. Multiple values.
        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        index.insert(&[0x12], 5).expect("insert");
        index.insert(&[0x12], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: None, 1: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[1] }
Link[0]: Link { value: 5, next: None }
Link[1]: Link { value: 7, next: Link[0] }
Key[0]: Key { key: 12 }
"#
        );
    }
3940
    /// Like `test_leaf_split`, but the first key is flushed to disk before the
    /// second insert, so splitting must graft in-memory nodes onto existing
    /// `Disk[..]` entries.
    #[test]
    fn test_leaf_split_flush() {
        // Similar to test_leaf_split, but flushes the first key before inserting the second.
        // This triggers some new code paths.
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("1")).expect("open");

        // Example 1: two keys are not prefixes of each other
        index.insert(&[0x12, 0x34], 5).expect("insert");
        index.flush().expect("flush");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 46, root: Disk[11] }
Disk[1]: Key { key: 12 34 }
Disk[5]: Link { value: 5, next: None }
Disk[8]: Leaf { key: Disk[1], link: Disk[5] }
Disk[11]: Radix { link: None, 1: Disk[8] }
Disk[19]: Root { radix: Disk[11] }
Disk[22]: Checksum { start: 0, end: 22, chunk_size_logarithm: 4, checksums.len(): 2 }
"#
        );
        index.insert(&[0x12, 0x78], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 46, root: Radix[0] }
Disk[1]: Key { key: 12 34 }
Disk[5]: Link { value: 5, next: None }
Disk[8]: Leaf { key: Disk[1], link: Disk[5] }
Disk[11]: Radix { link: None, 1: Disk[8] }
Disk[19]: Root { radix: Disk[11] }
Disk[22]: Checksum { start: 0, end: 22, chunk_size_logarithm: 4, checksums.len(): 2 }
Radix[0]: Radix { link: None, 1: Radix[1] }
Radix[1]: Radix { link: None, 2: Radix[2] }
Radix[2]: Radix { link: None, 3: Disk[8], 7: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[0] }
Link[0]: Link { value: 7, next: None }
Key[0]: Key { key: 12 78 }
"#
        );

        // Example 2: new key is a prefix of the old key
        let mut index = open_opts().open(dir.path().join("2")).expect("open");
        index.insert(&[0x12, 0x34], 5).expect("insert");
        index.flush().expect("flush");
        index.insert(&[0x12], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 46, root: Radix[0] }
Disk[1]: Key { key: 12 34 }
Disk[5]: Link { value: 5, next: None }
Disk[8]: Leaf { key: Disk[1], link: Disk[5] }
Disk[11]: Radix { link: None, 1: Disk[8] }
Disk[19]: Root { radix: Disk[11] }
Disk[22]: Checksum { start: 0, end: 22, chunk_size_logarithm: 4, checksums.len(): 2 }
Radix[0]: Radix { link: None, 1: Radix[1] }
Radix[1]: Radix { link: None, 2: Radix[2] }
Radix[2]: Radix { link: Link[0], 3: Disk[8] }
Link[0]: Link { value: 7, next: None }
"#
        );

        // Example 3: old key is a prefix of the new key
        // Only one flush - only one key is written.
        let mut index = open_opts().open(dir.path().join("3a")).expect("open");
        index.insert(&[0x12], 5).expect("insert");
        index.insert(&[0x12, 0x78], 7).expect("insert");
        index.flush().expect("flush");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 77, root: Disk[34] }
Disk[1]: Key { key: 12 78 }
Disk[5]: Link { value: 5, next: None }
Disk[8]: Link { value: 7, next: None }
Disk[11]: Leaf { key: Disk[1], link: Disk[8] }
Disk[14]: Radix { link: Disk[5], 7: Disk[11] }
Disk[26]: Radix { link: None, 2: Disk[14] }
Disk[34]: Radix { link: None, 1: Disk[26] }
Disk[42]: Root { radix: Disk[34] }
Disk[45]: Checksum { start: 0, end: 45, chunk_size_logarithm: 4, checksums.len(): 3 }
"#
        );

        // With two flushes - the old key cannot be removed since it was written.
        let mut index = open_opts().open(dir.path().join("3b")).expect("open");
        index.insert(&[0x12], 5).expect("insert");
        index.flush().expect("flush");
        index.insert(&[0x12, 0x78], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 45, root: Radix[0] }
Disk[1]: Key { key: 12 }
Disk[4]: Link { value: 5, next: None }
Disk[7]: Leaf { key: Disk[1], link: Disk[4] }
Disk[10]: Radix { link: None, 1: Disk[7] }
Disk[18]: Root { radix: Disk[10] }
Disk[21]: Checksum { start: 0, end: 21, chunk_size_logarithm: 4, checksums.len(): 2 }
Radix[0]: Radix { link: None, 1: Radix[1] }
Radix[1]: Radix { link: None, 2: Radix[2] }
Radix[2]: Radix { link: Disk[4], 7: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[0] }
Link[0]: Link { value: 7, next: None }
Key[0]: Key { key: 12 78 }
"#
        );

        // Same key. Multiple values.
        let mut index = open_opts().open(dir.path().join("4")).expect("open");
        index.insert(&[0x12], 5).expect("insert");
        index.flush().expect("flush");
        index.insert(&[0x12], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 45, root: Radix[0] }
Disk[1]: Key { key: 12 }
Disk[4]: Link { value: 5, next: None }
Disk[7]: Leaf { key: Disk[1], link: Disk[4] }
Disk[10]: Radix { link: None, 1: Disk[7] }
Disk[18]: Root { radix: Disk[10] }
Disk[21]: Checksum { start: 0, end: 21, chunk_size_logarithm: 4, checksums.len(): 2 }
Radix[0]: Radix { link: None, 1: Leaf[0] }
Leaf[0]: Leaf { key: Disk[1], link: Link[0] }
Link[0]: Link { value: 7, next: Disk[4] }
"#
        );
    }
4066
    /// Keys referencing an external buffer (`InsertKey::Reference`) are stored
    /// as `ExtKey { start, len }` entries instead of embedded key bytes;
    /// verify both the flushed and in-memory representations.
    #[test]
    fn test_external_keys() {
        let buf = Arc::new(vec![0x12u8, 0x34, 0x56, 0x78, 0x9a, 0xbc]);
        let dir = tempdir().unwrap();
        let mut index = open_opts()
            .key_buf(Some(buf))
            .open(dir.path().join("a"))
            .expect("open");
        index
            .insert_advanced(InsertKey::Reference((1, 2)), InsertValue::Prepend(55))
            .expect("insert");
        index.flush().expect("flush");
        index
            .insert_advanced(InsertKey::Reference((1, 3)), InsertValue::Prepend(77))
            .expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 43, root: Radix[0] }
Disk[1]: InlineLeaf { key: Disk[2], link: Disk[5] }
Disk[2]: ExtKey { start: 1, len: 2 }
Disk[5]: Link { value: 55, next: None }
Disk[8]: Radix { link: None, 3: Disk[1] }
Disk[16]: Root { radix: Disk[8] }
Disk[19]: Checksum { start: 0, end: 19, chunk_size_logarithm: 4, checksums.len(): 2 }
Radix[0]: Radix { link: None, 3: Radix[1] }
Radix[1]: Radix { link: None, 4: Radix[2] }
Radix[2]: Radix { link: None, 5: Radix[3] }
Radix[3]: Radix { link: None, 6: Radix[4] }
Radix[4]: Radix { link: Disk[5], 7: Leaf[0] }
Leaf[0]: Leaf { key: ExtKey[0], link: Link[0] }
Link[0]: Link { value: 77, next: None }
ExtKey[0]: ExtKey { start: 1, len: 3 }
"#
        );
    }
4102
    /// Verify when a flushed leaf is written as `InlineLeaf` (key + link
    /// adjacent to the leaf) versus a plain `Leaf` referring to entries
    /// written elsewhere.
    #[test]
    fn test_inline_leafs() {
        let buf = Arc::new(vec![0x12u8, 0x34, 0x56, 0x78, 0x9a, 0xbc]);
        let dir = tempdir().unwrap();
        let mut index = open_opts()
            .key_buf(Some(buf))
            .open(dir.path().join("a"))
            .expect("open");

        // New entry. Should be inlined.
        index
            .insert_advanced(InsertKey::Reference((1, 1)), InsertValue::Prepend(55))
            .unwrap();
        index.flush().expect("flush");

        // Independent leaf. Should also be inlined.
        index
            .insert_advanced(InsertKey::Reference((2, 1)), InsertValue::Prepend(77))
            .unwrap();
        index.flush().expect("flush");

        // The link with 88 should refer to the inlined leaf 77.
        index
            .insert_advanced(InsertKey::Reference((2, 1)), InsertValue::Prepend(88))
            .unwrap();
        index.flush().expect("flush");

        // Not inlined because dependent link was not written first.
        // (could be optimized in the future)
        index
            .insert_advanced(InsertKey::Reference((3, 1)), InsertValue::Prepend(99))
            .unwrap();
        index
            .insert_advanced(InsertKey::Reference((3, 1)), InsertValue::Prepend(100))
            .unwrap();
        index.flush().expect("flush");

        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 257, root: Disk[181] }
Disk[1]: InlineLeaf { key: Disk[2], link: Disk[5] }
Disk[2]: ExtKey { start: 1, len: 1 }
Disk[5]: Link { value: 55, next: None }
Disk[8]: Radix { link: None, 3: Disk[1] }
Disk[16]: Root { radix: Disk[8] }
Disk[19]: Checksum { start: 0, end: 19, chunk_size_logarithm: 4, checksums.len(): 2 }
Disk[43]: InlineLeaf { key: Disk[44], link: Disk[47] }
Disk[44]: ExtKey { start: 2, len: 1 }
Disk[47]: Link { value: 77, next: None }
Disk[50]: Radix { link: None, 3: Disk[1], 5: Disk[43] }
Disk[62]: Root { radix: Disk[50] }
Disk[65]: Checksum { start: 19, end: 65, chunk_size_logarithm: 4, checksums.len(): 4 }
Disk[105]: Link { value: 88, next: Disk[47] }
Disk[108]: Leaf { key: Disk[44], link: Disk[105] }
Disk[111]: Radix { link: None, 3: Disk[1], 5: Disk[108] }
Disk[123]: Root { radix: Disk[111] }
Disk[126]: Checksum { start: 65, end: 126, chunk_size_logarithm: 4, checksums.len(): 4 }
Disk[166]: ExtKey { start: 3, len: 1 }
Disk[169]: Link { value: 99, next: None }
Disk[172]: Link { value: 100, next: Disk[169] }
Disk[176]: Leaf { key: Disk[166], link: Disk[172] }
Disk[181]: Radix { link: None, 3: Disk[1], 5: Disk[108], 7: Disk[176] }
Disk[197]: Root { radix: Disk[181] }
Disk[201]: Checksum { start: 126, end: 201, chunk_size_logarithm: 4, checksums.len(): 6 }
"#
        )
    }
4170
4171    #[test]
4172    fn test_clone() {
4173        let dir = tempdir().unwrap();
4174        let mut index = open_opts().open(dir.path().join("a")).expect("open");
4175
4176        // Test clone empty index
4177        assert_eq!(
4178            format!("{:?}", index.try_clone().unwrap()),
4179            format!("{:?}", index)
4180        );
4181        assert_eq!(
4182            format!("{:?}", index.try_clone_without_dirty().unwrap()),
4183            format!("{:?}", index)
4184        );
4185
4186        // Test on-disk Index
4187        index.insert(&[], 55).expect("insert");
4188        index.insert(&[0x12], 77).expect("insert");
4189        index.flush().expect("flush");
4190        index.insert(&[0x15], 99).expect("insert");
4191
4192        let mut index2 = index.try_clone().expect("clone");
4193        assert_eq!(format!("{:?}", index), format!("{:?}", index2));
4194
4195        // Test clone without in-memory part
4196        let index2clean = index.try_clone_without_dirty().unwrap();
4197        index2.clear_dirty();
4198        assert_eq!(format!("{:?}", index2), format!("{:?}", index2clean));
4199
4200        // Test in-memory Index
4201        let mut index3 = open_opts()
4202            .checksum_chunk_size_logarithm(0)
4203            .create_in_memory()
4204            .unwrap();
4205        let index4 = index3.try_clone().unwrap();
4206        assert_eq!(format!("{:?}", index3), format!("{:?}", index4));
4207
4208        index3.insert(&[0x15], 99).expect("insert");
4209        let index4 = index3.try_clone().unwrap();
4210        assert_eq!(format!("{:?}", index3), format!("{:?}", index4));
4211    }
4212
4213    #[test]
4214    fn test_open_options_write() {
4215        let dir = tempdir().unwrap();
4216        let mut index = OpenOptions::new().open(dir.path().join("a")).expect("open");
4217        index.insert(&[0x12], 77).expect("insert");
4218        index.flush().expect("flush");
4219
4220        OpenOptions::new()
4221            .write(Some(false))
4222            .open(dir.path().join("b"))
4223            .expect_err("open"); // file does not exist
4224
4225        let mut index = OpenOptions::new()
4226            .write(Some(false))
4227            .open(dir.path().join("a"))
4228            .expect("open");
4229        index.flush().expect_err("cannot flush read-only index");
4230    }
4231
4232    #[test]
4233    fn test_linked_list_values() {
4234        let dir = tempdir().unwrap();
4235        let mut index = OpenOptions::new().open(dir.path().join("a")).expect("open");
4236        let list = vec![11u64, 17, 19, 31];
4237        for i in list.iter().rev() {
4238            index.insert(&[], *i).expect("insert");
4239        }
4240
4241        let list1: Vec<u64> = index
4242            .get(&[])
4243            .unwrap()
4244            .values(&index)
4245            .map(|v| v.unwrap())
4246            .collect();
4247        assert_eq!(list, list1);
4248
4249        index.flush().expect("flush");
4250        let list2: Vec<u64> = index
4251            .get(&[])
4252            .unwrap()
4253            .values(&index)
4254            .map(|v| v.unwrap())
4255            .collect();
4256        assert_eq!(list, list2);
4257
4258        // Empty linked list
4259        assert_eq!(index.get(&[1]).unwrap().values(&index).count(), 0);
4260
4261        // In case error happens, the iteration still stops.
4262        index.insert(&[], 5).expect("insert");
4263        index.dirty_links[0].next_link_offset = LinkOffset(Offset(1000));
4264        // Note: `collect` can return `crate::Result<Vec<u64>>`. But that does not exercises the
4265        // infinite loop avoidance logic since `collect` stops iteration at the first error.
4266        let list_errored: Vec<crate::Result<u64>> =
4267            index.get(&[]).unwrap().values(&index).collect();
4268        assert!(list_errored[list_errored.len() - 1].is_err());
4269    }
4270
    /// Bit-flip detection with 1-byte checksum chunks (2^0).
    #[test]
    fn test_checksum_bitflip_1b() {
        test_checksum_bitflip_with_size(0);
    }
4275
    /// Bit-flip detection with 2-byte checksum chunks (2^1).
    #[test]
    fn test_checksum_bitflip_2b() {
        test_checksum_bitflip_with_size(1);
    }
4280
    /// Bit-flip detection with 16-byte checksum chunks (2^4).
    #[test]
    fn test_checksum_bitflip_16b() {
        test_checksum_bitflip_with_size(4);
    }
4285
    /// Bit-flip detection with 1 MiB checksum chunks (2^20).
    #[test]
    fn test_checksum_bitflip_1mb() {
        test_checksum_bitflip_with_size(20);
    }
4290
4291    fn test_checksum_bitflip_with_size(checksum_log_size: u32) {
4292        let dir = tempdir().unwrap();
4293
4294        let keys = if cfg!(debug_assertions) {
4295            // Debug build is much slower than release build. Limit the key length to 1-byte.
4296            vec![vec![0x13], vec![0x17], vec![]]
4297        } else {
4298            // Release build can afford 2-byte key test.
4299            vec![
4300                vec![0x12, 0x34],
4301                vec![0x12, 0x78],
4302                vec![0x34, 0x56],
4303                vec![0x34],
4304                vec![0x78],
4305                vec![0x78, 0x9a],
4306            ]
4307        };
4308
4309        let opts = open_opts()
4310            .checksum_chunk_size_logarithm(checksum_log_size)
4311            .clone();
4312
4313        let bytes = {
4314            let mut index = opts.open(dir.path().join("a")).expect("open");
4315
4316            for (i, key) in keys.iter().enumerate() {
4317                index.insert(key, i as u64).expect("insert");
4318                index.insert(key, (i as u64) << 50).expect("insert");
4319            }
4320            index.flush().expect("flush");
4321
4322            // Read the raw bytes of the index content
4323            let mut f = File::open(dir.path().join("a")).expect("open");
4324            let mut buf = vec![];
4325            f.read_to_end(&mut buf).expect("read");
4326
4327            // Drop `index` here. This would unmap files so File::create below
4328            // can work on Windows.
4329            if std::env::var("DEBUG").is_ok() {
4330                eprintln!("{:?}", &index);
4331            }
4332
4333            buf
4334        };
4335
4336        fn is_corrupted(index: &Index, key: &[u8]) -> bool {
4337            let link = index.get(&key);
4338            match link {
4339                Err(_) => true,
4340                Ok(link) => link.values(index).any(|v| v.is_err()),
4341            }
4342        }
4343
4344        // Every bit change should trigger errors when reading all contents
4345        for i in 0..(bytes.len() * 8) {
4346            let mut bytes = bytes.clone();
4347            bytes[i / 8] ^= 1u8 << (i % 8);
4348            let mut f = File::create(dir.path().join("a")).expect("create");
4349            f.write_all(&bytes).expect("write");
4350
4351            let index = opts.clone().open(dir.path().join("a"));
4352            let detected = match index {
4353                Err(_) => true,
4354                Ok(index) => {
4355                    let range = if cfg!(debug_assertions) { 0 } else { 0x10000 };
4356                    (0..range).any(|key_int| {
4357                        let key = [(key_int >> 8) as u8, (key_int & 0xff) as u8];
4358                        is_corrupted(&index, &key)
4359                    }) || (0..0x100).any(|key_int| {
4360                        let key = [key_int as u8];
4361                        is_corrupted(&index, &key)
4362                    }) || is_corrupted(&index, &[])
4363                }
4364            };
4365            assert!(
4366                detected,
4367                "bit flip at byte {} , bit {} is not detected (set DEBUG=1 to see Index dump)",
4368                i / 8,
4369                i % 8,
4370            );
4371        }
4372    }
4373
    /// Toggle `checksum_enabled` off -> on -> off across reopen/flush cycles;
    /// `verify()` must pass each time and the final layout must contain
    /// Checksum entries only for the spans written while enabled.
    #[test]
    fn test_checksum_toggle() {
        let dir = tempdir().unwrap();
        let open = |enabled: bool| {
            open_opts()
                .checksum_enabled(enabled)
                .open(dir.path().join("a"))
                .expect("open")
        };

        // Starting with checksum off.
        let mut index = open(false);
        index.verify().unwrap();
        index.insert(b"abcdefg", 0x1234).unwrap();
        index.flush().unwrap();
        index.insert(b"bcdefgh", 0x2345).unwrap();
        index.flush().unwrap();

        // Turn on.
        let mut index = open(true);
        index.verify().unwrap();
        index.insert(b"cdefghi", 0x3456).unwrap();
        index.flush().unwrap();
        index.insert(b"defghij", 0x4567).unwrap();
        index.flush().unwrap();

        // Turn off.
        let mut index = open(false);
        index.verify().unwrap();
        index.insert(b"efghijh", 0x5678).unwrap();
        index.flush().unwrap();
        index.insert(b"fghijkl", 0x7890).unwrap();
        index.flush().unwrap();

        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 415, root: Disk[402] }
Disk[1]: Key { key: 61 62 63 64 65 66 67 }
Disk[10]: Link { value: 4660, next: None }
Disk[14]: Leaf { key: Disk[1], link: Disk[10] }
Disk[17]: Radix { link: None, 6: Disk[14] }
Disk[25]: Root { radix: Disk[17] }
Disk[29]: Key { key: 62 63 64 65 66 67 68 }
Disk[38]: Link { value: 9029, next: None }
Disk[42]: Leaf { key: Disk[29], link: Disk[38] }
Disk[45]: Radix { link: None, 1: Disk[14], 2: Disk[42] }
Disk[57]: Radix { link: None, 6: Disk[45] }
Disk[65]: Root { radix: Disk[57] }
Disk[69]: Key { key: 63 64 65 66 67 68 69 }
Disk[78]: Link { value: 13398, next: None }
Disk[82]: Leaf { key: Disk[69], link: Disk[78] }
Disk[85]: Radix { link: None, 1: Disk[14], 2: Disk[42], 3: Disk[82] }
Disk[101]: Radix { link: None, 6: Disk[85] }
Disk[109]: Root { radix: Disk[101] }
Disk[112]: Checksum { start: 0, end: 112, chunk_size_logarithm: 4, checksums.len(): 7 }
Disk[176]: Key { key: 64 65 66 67 68 69 6A }
Disk[185]: Link { value: 17767, next: None }
Disk[190]: Leaf { key: Disk[176], link: Disk[185] }
Disk[195]: Radix { link: None, 1: Disk[14], 2: Disk[42], 3: Disk[82], 4: Disk[190] }
Disk[215]: Radix { link: None, 6: Disk[195] }
Disk[223]: Root { radix: Disk[215] }
Disk[227]: Checksum { start: 112, end: 227, chunk_size_logarithm: 4, checksums.len(): 8 }
Disk[299]: Key { key: 65 66 67 68 69 6A 68 }
Disk[308]: Link { value: 22136, next: None }
Disk[313]: Leaf { key: Disk[299], link: Disk[308] }
Disk[318]: Radix { link: None, 1: Disk[14], 2: Disk[42], 3: Disk[82], 4: Disk[190], 5: Disk[313] }
Disk[342]: Radix { link: None, 6: Disk[318] }
Disk[350]: Root { radix: Disk[342] }
Disk[355]: Key { key: 66 67 68 69 6A 6B 6C }
Disk[364]: Link { value: 30864, next: None }
Disk[369]: Leaf { key: Disk[355], link: Disk[364] }
Disk[374]: Radix { link: None, 1: Disk[14], 2: Disk[42], 3: Disk[82], 4: Disk[190], 5: Disk[313], 6: Disk[369] }
Disk[402]: Radix { link: None, 6: Disk[374] }
Disk[410]: Root { radix: Disk[402] }
"#
        );
    }
4451
4452    fn show_checksums(index: &Index) -> String {
4453        let debug_str = format!("{:?}", index);
4454        debug_str
4455            .lines()
4456            .filter_map(|l| {
4457                if l.contains("Checksum") {
4458                    Some(format!("\n                {}", l))
4459                } else {
4460                    None
4461                }
4462            })
4463            .collect::<Vec<_>>()
4464            .concat()
4465    }
4466
    /// Exercise `checksum_max_chain_len`: with 0 checksum entries chain
    /// without limit; with a limit N the expected `start` fields show chains
    /// restarting from offset 0 once they would grow past N.
    #[test]
    fn test_checksum_max_chain_len() {
        // Test with the given max_chain_len config.
        let t = |max_chain_len: u32| {
            let dir = tempdir().unwrap();
            let path = dir.path().join("i");
            let mut index = open_opts()
                .checksum_chunk_size_logarithm(7 /* chunk size: 2^7 = 128 */)
                .checksum_max_chain_len(max_chain_len)
                .open(&path)
                .unwrap();
            for i in 0..10u8 {
                // Alternate short and long keys to vary the flushed sizes.
                let data: Vec<u8> = if i % 2 == 0 { vec![i] } else { vec![i; 100] };
                index.insert(&data, 1).unwrap();
                index.flush().unwrap();
                index.verify().unwrap();
                // If reloaded from disk, it passes verification too.
                let mut index2 = open_opts()
                    .checksum_max_chain_len(max_chain_len)
                    .open(&path)
                    .unwrap();
                index2.verify().unwrap();
                // Create "racy" writes by flushing from another index.
                if i % 3 == 0 {
                    index2.insert(&data, 2).unwrap();
                    index2.flush().unwrap();
                }
            }
            show_checksums(&index)
        };

        // Unlimited chain. Chain: 1358 -> 1167 -> 1071 -> 818 -> 738 -> ...
        assert_eq!(
            t(0),
            r#"
                Disk[21]: Checksum { start: 0, end: 21, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[54]: Checksum { start: 0, end: 54, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[203]: Checksum { start: 0, end: 203, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[266]: Checksum { start: 203, end: 266, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[433]: Checksum { start: 266, end: 433, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[499]: Checksum { start: 433, end: 499, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[563]: Checksum { start: 433, end: 563, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[738]: Checksum { start: 563, end: 738, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[818]: Checksum { start: 738, end: 818, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[896]: Checksum { start: 818, end: 896, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[1071]: Checksum { start: 818, end: 1071, chunk_size_logarithm: 7, checksums.len(): 3 }
                Disk[1167]: Checksum { start: 1071, end: 1167, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[1358]: Checksum { start: 1167, end: 1358, chunk_size_logarithm: 7, checksums.len(): 2 }"#
        );

        // Max chain len = 2. Chain: 1331 -> 1180 -> 0; 872 -> 761 -> 0; ...
        assert_eq!(
            t(2),
            r#"
                Disk[21]: Checksum { start: 0, end: 21, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[54]: Checksum { start: 0, end: 54, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[203]: Checksum { start: 0, end: 203, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[266]: Checksum { start: 203, end: 266, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[433]: Checksum { start: 0, end: 433, chunk_size_logarithm: 7, checksums.len(): 4 }
                Disk[514]: Checksum { start: 433, end: 514, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[586]: Checksum { start: 433, end: 586, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[761]: Checksum { start: 0, end: 761, chunk_size_logarithm: 7, checksums.len(): 6 }
                Disk[872]: Checksum { start: 761, end: 872, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[950]: Checksum { start: 0, end: 950, chunk_size_logarithm: 7, checksums.len(): 8 }
                Disk[1180]: Checksum { start: 0, end: 1180, chunk_size_logarithm: 7, checksums.len(): 10 }
                Disk[1331]: Checksum { start: 1180, end: 1331, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[1522]: Checksum { start: 0, end: 1522, chunk_size_logarithm: 7, checksums.len(): 12 }"#
        );

        // Max chain len = 1. All have start: 0.
        assert_eq!(
            t(1),
            r#"
                Disk[21]: Checksum { start: 0, end: 21, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[54]: Checksum { start: 0, end: 54, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[203]: Checksum { start: 0, end: 203, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[266]: Checksum { start: 0, end: 266, chunk_size_logarithm: 7, checksums.len(): 3 }
                Disk[440]: Checksum { start: 0, end: 440, chunk_size_logarithm: 7, checksums.len(): 4 }
                Disk[521]: Checksum { start: 0, end: 521, chunk_size_logarithm: 7, checksums.len(): 5 }
                Disk[616]: Checksum { start: 0, end: 616, chunk_size_logarithm: 7, checksums.len(): 5 }
                Disk[814]: Checksum { start: 0, end: 814, chunk_size_logarithm: 7, checksums.len(): 7 }
                Disk[933]: Checksum { start: 0, end: 933, chunk_size_logarithm: 7, checksums.len(): 8 }
                Disk[1058]: Checksum { start: 0, end: 1058, chunk_size_logarithm: 7, checksums.len(): 9 }
                Disk[1296]: Checksum { start: 0, end: 1296, chunk_size_logarithm: 7, checksums.len(): 11 }
                Disk[1455]: Checksum { start: 0, end: 1455, chunk_size_logarithm: 7, checksums.len(): 12 }
                Disk[1725]: Checksum { start: 0, end: 1725, chunk_size_logarithm: 7, checksums.len(): 14 }"#
        );
    }
4555
4556    #[test]
4557    fn test_root_meta() {
4558        let dir = tempdir().unwrap();
4559        let mut index = open_opts().open(dir.path().join("a")).expect("open");
4560        assert!(index.get_meta().is_empty());
4561        let meta = vec![200; 4000];
4562        index.set_meta(&meta);
4563        assert_eq!(index.get_meta(), &meta[..]);
4564        index.flush().expect("flush");
4565        let index = open_opts().open(dir.path().join("a")).expect("open");
4566        assert_eq!(index.get_meta(), &meta[..]);
4567    }
4568
4569    impl<'a> RangeIter<'a> {
4570        fn clone_with_index(&self, index: &'a Index) -> Self {
4571            Self {
4572                completed: self.completed,
4573                index,
4574                front_stack: self.front_stack.clone(),
4575                back_stack: self.back_stack.clone(),
4576            }
4577        }
4578    }
4579
    /// Extract keys from the [`RangeIter`]. Verify different iteration
    /// directions, and returned link offsets. A `key` has link offset `i`
    /// if that key matches `keys[i]`.
    fn iter_to_keys(index: &Index, keys: &Vec<&[u8]>, iter: &RangeIter) -> Vec<Vec<u8>> {
        // Three independent copies of the same iterator state: one walked
        // forwards, one walked backwards, and one consumed from both ends.
        let it_forward = iter.clone_with_index(index);
        let it_backward = iter.clone_with_index(index);
        let mut it_both_ends = iter.clone_with_index(index);

        // Unwrap one iterator item, check its link offset resolves to exactly
        // one value that points back at the matching entry in `keys` (the
        // caller inserted each key with its position as the value), then
        // return the key bytes.
        let extract = |v: crate::Result<(Cow<'_, [u8]>, LinkOffset)>| -> Vec<u8> {
            let (key, link_offset) = v.unwrap();
            let key = key.as_ref();
            // Verify link_offset is correct
            let ids: Vec<u64> = link_offset
                .values(index)
                .collect::<crate::Result<Vec<u64>>>()
                .unwrap();
            assert!(ids.len() == 1);
            assert_eq!(keys[ids[0] as usize], key);
            key.to_vec()
        };

        // A backward walk, reversed, must equal the forward walk.
        let keys_forward: Vec<_> = it_forward.map(extract).collect();
        let mut keys_backward: Vec<_> = it_backward.rev().map(extract).collect();
        keys_backward.reverse();
        assert_eq!(keys_forward, keys_backward);

        // Forward and backward iterators should not overlap
        //
        // At round `i` the list holds `i` front items followed by the back
        // items collected so far, so the new front item belongs at position
        // `i` and the new back item right after it at `i + 1`. The `+ 2`
        // extra rounds confirm an exhausted iterator keeps yielding `None`.
        let mut keys_both_ends = Vec::new();
        for i in 0..(keys_forward.len() + 2) {
            if let Some(v) = it_both_ends.next() {
                keys_both_ends.insert(i, extract(v));
            }
            if let Some(v) = it_both_ends.next_back() {
                keys_both_ends.insert(i + 1, extract(v));
            }
        }
        assert_eq!(keys_forward, keys_both_ends);

        keys_forward
    }
4620
    /// Test `Index::range` against `BTreeSet::range`. `tree` specifies keys.
    fn test_range_against_btreeset(tree: BTreeSet<&[u8]>) {
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).unwrap();
        // Insert each key with its sorted position as the value, so
        // `iter_to_keys` can verify link offsets against `keys`.
        let keys: Vec<&[u8]> = tree.iter().cloned().collect();
        for (i, key) in keys.iter().enumerate() {
            index.insert(key, i as u64).unwrap();
        }

        // Run one range query against both the index and the BTreeSet and
        // require identical key lists (verified in all iteration orders).
        let range_test = |start: Bound<&[u8]>, end: Bound<&[u8]>| {
            let range = (start, end);
            let iter = index.range(range).unwrap();
            let expected_keys: Vec<Vec<u8>> = tree
                .range::<&[u8], _>((start, end))
                .map(|v| v.to_vec())
                .collect();
            let selected_keys: Vec<Vec<u8>> = iter_to_keys(&index, &keys, &iter);
            assert_eq!(selected_keys, expected_keys);
        };

        // Generate key variants based on existing keys. Generated keys do not
        // exist in the index, so range bounds are also exercised at positions
        // that fall between stored keys. Therefore the test is more interesting.
        let mut variant_keys = Vec::new();
        for base_key in keys.iter() {
            // One byte appended
            for b in [0x00, 0x77, 0xff] {
                let mut key = base_key.to_vec();
                key.push(b);
                variant_keys.push(key);
            }

            // Last byte mutated, or removed
            if !base_key.is_empty() {
                let mut key = base_key.to_vec();
                let last = *key.last().unwrap();
                *key.last_mut().unwrap() = last.wrapping_add(1);
                variant_keys.push(key.clone());
                *key.last_mut().unwrap() = last.wrapping_sub(1);
                variant_keys.push(key.clone());
                key.pop();
                variant_keys.push(key);
            }
        }

        // Remove duplicated entries. Variants that happen to collide with a
        // real key are dropped too.
        let variant_keys = variant_keys
            .iter()
            .map(|v| v.as_ref())
            .filter(|k| !tree.contains(k))
            .collect::<BTreeSet<&[u8]>>()
            .iter()
            .cloned()
            .collect::<Vec<&[u8]>>();

        range_test(Unbounded, Unbounded);

        // Exercise every bound kind with every real or variant key, and every
        // valid ordered pair of keys for two-sided ranges.
        for key1 in keys.iter().chain(variant_keys.iter()) {
            range_test(Unbounded, Included(key1));
            range_test(Unbounded, Excluded(key1));
            range_test(Included(key1), Unbounded);
            range_test(Excluded(key1), Unbounded);

            for key2 in keys.iter().chain(variant_keys.iter()) {
                if key1 < key2 {
                    range_test(Excluded(key1), Excluded(key2));
                }
                if key1 <= key2 {
                    range_test(Excluded(key1), Included(key2));
                    range_test(Included(key1), Excluded(key2));
                    range_test(Included(key1), Included(key2));
                }
            }
        }
    }
4695
4696    #[test]
4697    fn test_range_example1() {
4698        test_range_against_btreeset(
4699            [
4700                &[0x00, 0x00, 0x00][..],
4701                &[0x10, 0x0d, 0x01],
4702                &[0x10, 0x0e],
4703                &[0x10, 0x0f],
4704                &[0x10, 0x0f, 0xff],
4705                &[0x10, 0x10, 0x01],
4706                &[0x10, 0x11],
4707                &[0xff],
4708            ]
4709            .iter()
4710            .cloned()
4711            .collect(),
4712        );
4713    }
4714
4715    #[test]
4716    fn test_clear_dirty() {
4717        let dir = tempdir().unwrap();
4718        let mut index = open_opts().open(dir.path().join("a")).unwrap();
4719
4720        index.insert(&"foo", 2).unwrap();
4721        assert!(!index.get(&"foo").unwrap().is_null());
4722
4723        index.clear_dirty();
4724        assert!(index.get(&"foo").unwrap().is_null());
4725
4726        index.set_meta(&vec![42]);
4727        index.insert(&"foo", 1).unwrap();
4728        index.flush().unwrap();
4729
4730        index.set_meta(&vec![43]);
4731        index.insert(&"bar", 2).unwrap();
4732
4733        assert_eq!(index.get_original_meta(), [42]);
4734        index.clear_dirty();
4735
4736        assert_eq!(index.get_meta(), [42]);
4737        assert!(index.get(&"bar").unwrap().is_null());
4738    }
4739
4740    #[test]
4741    fn test_meta_only_flush() {
4742        let dir = tempdir().unwrap();
4743        let mut index = open_opts().open(dir.path().join("a")).unwrap();
4744
4745        index.set_meta(b"foo");
4746        index.flush().unwrap();
4747
4748        let mut index = open_opts().open(dir.path().join("a")).unwrap();
4749        assert_eq!(index.get_meta(), b"foo");
4750
4751        index.set_meta(b"bar");
4752        let len1 = index.flush().unwrap();
4753
4754        let mut index = open_opts().open(dir.path().join("a")).unwrap();
4755        assert_eq!(index.get_meta(), b"bar");
4756        index.set_meta(b"bar");
4757        let len2 = index.flush().unwrap();
4758        assert_eq!(len1, len2);
4759    }
4760
    quickcheck! {
        // Each key maps to a single value. Optionally flush and reopen so
        // lookups are served from the on-disk representation.
        fn test_single_value(map: HashMap<Vec<u8>, u64>, flush: bool) -> bool {
            let dir = tempdir().unwrap();
            let mut index = open_opts().open(dir.path().join("a")).expect("open");

            for (key, value) in &map {
                index.insert(key, *value).expect("insert");
            }

            if flush {
                // Reopen pinned to the flushed logical length.
                let len = index.flush().expect("flush");
                index = open_opts().logical_len(len.into()).open(dir.path().join("a")).unwrap();
            }

            map.iter().all(|(key, value)| {
                let link_offset = index.get(key).expect("lookup");
                assert!(!link_offset.is_null());
                link_offset.value_and_next(&index).unwrap().0 == *value
            })
        }

        // Keys map to value lists. A file-backed index (flushed at arbitrary
        // points) and a purely in-memory index must both agree with the map.
        fn test_multiple_values(map: HashMap<Vec<u8>, Vec<u64>>) -> bool {
            let dir = tempdir().unwrap();
            let mut index = open_opts().open(dir.path().join("a")).expect("open");
            let mut index_mem = open_opts().checksum_chunk_size_logarithm(20).create_in_memory().unwrap();

            for (key, values) in &map {
                // Insert in reverse: the final assertion expects `values(..)`
                // to yield most-recently-inserted first, i.e. original order.
                for value in values.iter().rev() {
                    index.insert(key, *value).expect("insert");
                    index_mem.insert(key, *value).expect("insert");
                }
                if values.is_empty() {
                    // Flush sometimes.
                    index.flush().expect("flush");
                }
            }

            map.iter().all(|(key, values)| {
                let v: Vec<u64> =
                    index.get(key).unwrap().values(&index).map(|v| v.unwrap()).collect();
                let v_mem: Vec<u64> =
                    index_mem.get(key).unwrap().values(&index_mem).map(|v| v.unwrap()).collect();
                v == *values && v_mem == *values
            })
        }

        fn test_range_quickcheck(keys: Vec<Vec<u8>>) -> bool {
            // test_range_against_btreeset runs range queries over every pair
            // of keys and variants, so keep the key set small — smaller still
            // on slow debug builds.
            let size_limit = if cfg!(debug_assertions) {
                4
            } else {
                16
            };
            let size = keys.len() % size_limit + 1;
            let tree: BTreeSet<&[u8]> = keys.iter().take(size).map(|v| v.as_ref()).collect();
            test_range_against_btreeset(tree);
            true
        }

        // Interleave inserts and exact-key removals; after every step the
        // index's full key range must match a reference BTreeSet.
        fn test_deletion(keys_deleted: Vec<(Vec<u8>, bool)>) -> bool {
            // Compare Index with BTreeSet
            let mut set = BTreeSet::<Vec<u8>>::new();
            let mut index = in_memory_index();
            keys_deleted.into_iter().all(|(key, deleted)| {
                if deleted {
                    set.remove(&key);
                    index.remove(&key).unwrap();
                } else {
                    set.insert(key.clone());
                    index.insert(&key, 1).unwrap();
                }
                index
                    .range(..)
                    .unwrap()
                    .map(|s| s.unwrap().0.as_ref().to_vec())
                    .collect::<Vec<_>>()
                    == set.iter().cloned().collect::<Vec<_>>()
            })
        }

        // Like test_deletion, but removes by key prefix instead of exact key.
        fn test_deletion_prefix(keys_deleted: Vec<(Vec<u8>, bool)>) -> bool {
            let mut set = BTreeSet::<Vec<u8>>::new();
            let mut index = in_memory_index();
            keys_deleted.into_iter().all(|(key, deleted)| {
                if deleted {
                    // BTreeSet does not have remove_prefix. Emulate it.
                    let to_delete = set
                        .iter()
                        .filter(|k| k.starts_with(&key))
                        .cloned()
                        .collect::<Vec<_>>();
                    for key in to_delete {
                        set.remove(&key);
                    }
                    index.remove_prefix(&key).unwrap();
                } else {
                    set.insert(key.clone());
                    index.insert(&key, 1).unwrap();
                }
                index
                    .range(..)
                    .unwrap()
                    .map(|s| s.unwrap().0.as_ref().to_vec())
                    .collect::<Vec<_>>()
                    == set.iter().cloned().collect::<Vec<_>>()
            })
        }
    }
4868}