// indexedlog/index.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8//! Index support for `log`.
9//!
10//! See [`Index`] for the main structure.
11
12// File format:
13//
14// ```plain,ignore
15// INDEX       := HEADER + ENTRY_LIST
16// HEADER      := '\0'  (takes offset 0, so 0 is not a valid offset for ENTRY)
17// ENTRY_LIST  := RADIX | ENTRY_LIST + ENTRY
18// ENTRY       := RADIX | LEAF | LINK | KEY | ROOT + REVERSED(VLQ(ROOT_LEN)) |
19//                ROOT + CHECKSUM + REVERSED(VLQ(ROOT_LEN + CHECKSUM_LEN))
20// RADIX       := '\2' + RADIX_FLAG (1 byte) + BITMAP (2 bytes) +
21//                PTR2(RADIX | LEAF) * popcnt(BITMAP) + PTR2(LINK)
22// LEAF        := '\3' + PTR(KEY | EXT_KEY) + PTR(LINK)
23// LINK        := '\4' + VLQ(VALUE) + PTR(NEXT_LINK | NULL)
24// KEY         := '\5' + VLQ(KEY_LEN) + KEY_BYTES
25// EXT_KEY     := '\6' + VLQ(KEY_START) + VLQ(KEY_LEN)
26// INLINE_LEAF := '\7' + EXT_KEY + LINK
27// ROOT        := '\1' + PTR(RADIX) + VLQ(META_LEN) + META
28// CHECKSUM    := '\8' + PTR(PREVIOUS_CHECKSUM) + VLQ(CHUNK_SIZE_LOGARITHM) +
29//                VLQ(CHECKSUM_CHUNK_START) + XXHASH_LIST + CHECKSUM_XX32 (LE32)
30// XXHASH_LIST := A list of 64-bit xxhash in Little Endian.
31//
32// PTR(ENTRY)  := VLQ(the offset of ENTRY)
// PTR2(ENTRY) := the offset of ENTRY, in 0, 4, or 8 bytes depending on BITMAP and FLAGS
34//
35// RADIX_FLAG := USE_64_BIT (1 bit) + RESERVED (6 bits) + HAVE_LINK (1 bit)
36// ```
37//
38// Some notes about the format:
39//
40// - A "RADIX" entry has 16 children. This is mainly for source control hex hashes. The "N"
41//   in a radix entry could be less than 16 if some of the children are missing (ex. offset = 0).
42//   The corresponding jump table bytes of missing children are 0s. If child i exists, then
43//   `jumptable[i]` is the relative (to the beginning of radix entry) offset of PTR(child offset).
// - A "ROOT" entry has its length recorded as the last byte. Normally the root entry is written
45//   at the end. This makes it easier for the caller - it does not have to record the position
46//   of the root entry. The caller could optionally provide a root location.
47// - An entry has a 1 byte "type". This makes it possible to do a linear scan from the
48//   beginning of the file, instead of having to go through a root. Potentially useful for
49//   recovery purpose, or adding new entry types (ex. tree entries other than the 16-children
50//   radix entry, value entries that are not u64 linked list, key entries that refers external
51//   buffer).
52// - The "EXT_KEY" type has a logically similar function with "KEY". But it refers to an external
53//   buffer. This is useful to save spaces if the index is not a source of truth and keys are
54//   long.
55// - The "INLINE_LEAF" type is basically an inlined version of EXT_KEY and LINK, to save space.
56// - The "ROOT_LEN" is reversed so it can be read byte-by-byte from the end of a file.
57
58use std::borrow::Cow;
59use std::cmp::Ordering::Equal;
60use std::cmp::Ordering::Greater;
61use std::cmp::Ordering::Less;
62use std::fmt;
63use std::fmt::Debug;
64use std::fmt::Formatter;
65use std::fs;
66use std::fs::File;
67use std::hash::Hasher;
68use std::io;
69use std::io::Read;
70use std::io::Seek;
71use std::io::SeekFrom;
72use std::io::Write;
73use std::mem::size_of;
74use std::ops::Bound;
75use std::ops::Bound::Excluded;
76use std::ops::Bound::Included;
77use std::ops::Bound::Unbounded;
78use std::ops::Deref;
79use std::ops::RangeBounds;
80use std::path::Path;
81use std::path::PathBuf;
82use std::sync::atomic::AtomicU64;
83use std::sync::atomic::Ordering::AcqRel;
84use std::sync::atomic::Ordering::Acquire;
85use std::sync::atomic::Ordering::Relaxed;
86use std::sync::Arc;
87
88use byteorder::ByteOrder;
89use byteorder::LittleEndian;
90use byteorder::ReadBytesExt;
91use byteorder::WriteBytesExt;
92use fs2::FileExt;
93use minibytes::Bytes;
94use tracing::debug_span;
95use twox_hash::XxHash;
96use vlqencoding::VLQDecodeAt;
97use vlqencoding::VLQEncode;
98
99use crate::base16::base16_to_base256;
100use crate::base16::single_hex_to_base16;
101use crate::base16::Base16Iter;
102use crate::config;
103use crate::errors::IoResultExt;
104use crate::errors::ResultExt;
105use crate::lock::ScopedFileLock;
106use crate::utils;
107use crate::utils::mmap_bytes;
108use crate::utils::xxhash;
109use crate::utils::xxhash32;
110
111//// Structures and serialization
112
/// In-memory radix entry: 16 children (one per base16 digit) plus an
/// optional link offset for a key ending exactly at this node.
#[derive(Clone, PartialEq, Default)]
struct MemRadix {
    // Child offsets, indexed by the next base16 digit. Offset(0) = absent.
    pub offsets: [Offset; 16],
    // Head of the value linked list for the key ending at this node.
    pub link_offset: LinkOffset,
}

/// In-memory leaf entry: points to the key bytes and its value list.
#[derive(Clone, PartialEq)]
struct MemLeaf {
    // Offset of a KEY or EXT_KEY entry holding the full key.
    pub key_offset: Offset,
    // Head of the linked list of values for this key.
    pub link_offset: LinkOffset,
}

/// In-memory KEY entry: owns the key bytes directly.
#[derive(Clone, PartialEq)]
struct MemKey {
    pub key: Box<[u8]>, // base256
}

/// In-memory EXT_KEY entry: refers to key bytes in an external buffer by
/// (start, len), saving space when keys are long (see file format notes).
#[derive(Clone, PartialEq)]
struct MemExtKey {
    pub start: u64,
    pub len: u64,
}

/// In-memory LINK entry: one node of a singly linked list of u64 values.
#[derive(Clone, PartialEq)]
struct MemLink {
    pub value: u64,
    pub next_link_offset: LinkOffset,
    // When true, the entry is skipped at serialization time.
    pub unused: bool,
}

/// In-memory ROOT entry: the root radix offset plus application metadata.
#[derive(Clone, PartialEq)]
struct MemRoot {
    pub radix_offset: RadixOffset,
    pub meta: Box<[u8]>,
}
148
/// A Checksum entry specifies the checksums for all bytes before the checksum
/// entry.  To make CHECKSUM entry size bounded, a Checksum entry can refer to a
/// previous Checksum entry so it does not have to repeat the byte range that is
/// already covered by the previous entry.  The xxhash list contains one xxhash
/// per chunk. A chunk has (1 << chunk_size_logarithm) bytes.  The last chunk
/// can be incomplete.
struct MemChecksum {
    /// Indicates the "start" offset of the bytes that should be written
    /// on serialization.
    ///
    /// This is also the offset of the previous Checksum entry.
    start: u64,

    /// Indicates the "end" offset of the bytes that can be checked.
    ///
    /// This is also the offset of the current Checksum entry.
    end: u64,

    /// Tracks the chain length. Used to detect whether `checksum_max_chain_len`
    /// is exceeded or not.
    chain_len: u32,

    /// Each chunk has (1 << chunk_size_logarithm) bytes. The last chunk
    /// can be shorter.
    chunk_size_logarithm: u32,

    /// Checksums per chunk.
    ///
    /// The start of a chunk always aligns with the chunk size, which might not
    /// match `start`. For example, given the `start` and `end` shown below:
    ///
    /// ```plain,ignore
    /// | chunk 1 (1MB) | chunk 2 (1MB) | chunk 3 (1MB) | chunk 4 (200K) |
    ///                     |<-start                                end->|
    /// ```
    ///
    /// The `xxhash_list` would be:
    ///
    /// ```plain,ignore
    /// [xxhash(chunk 2 (1MB)), xxhash(chunk 3 (1MB)), xxhash(chunk 4 (200K))]
    /// ```
    ///
    /// For the root checksum (`Index::checksum`), the `xxhash_list` should
    /// cover the entire buffer, as if `start` is 0 (but `start` is not 0).
    xxhash_list: Vec<u64>,

    /// Whether chunks are checked against `xxhash_list`.
    ///
    /// Stored in a bit vector. `checked.len() * 64` should be >=
    /// `xxhash_list.len()`.
    checked: Vec<AtomicU64>,
}
201
202/// Read reversed vlq at the given end offset (exclusive).
203/// Return the decoded integer and the bytes used by the VLQ integer.
204fn read_vlq_reverse(buf: &[u8], end_offset: usize) -> io::Result<(u64, usize)> {
205    let mut int_buf = Vec::new();
206    for i in (0..end_offset).rev() {
207        int_buf.push(buf[i]);
208        if buf[i] <= 127 {
209            break;
210        }
211    }
212    let (value, vlq_size) = int_buf.read_vlq_at(0)?;
213    assert_eq!(vlq_size, int_buf.len());
214    Ok((value, vlq_size))
215}
216
// Offsets that are >= DIRTY_OFFSET refer to in-memory entries that haven't been
// written to disk. Offsets < DIRTY_OFFSET are on-disk offsets.
const DIRTY_OFFSET: u64 = 1u64 << 63;

// Entry type integers, matching the type byte of each serialized entry
// (see the file format description at the top of this file).
const TYPE_HEAD: u8 = 0;
const TYPE_ROOT: u8 = 1;
const TYPE_RADIX: u8 = 2;
const TYPE_LEAF: u8 = 3;
const TYPE_LINK: u8 = 4;
const TYPE_KEY: u8 = 5;
const TYPE_EXT_KEY: u8 = 6;
const TYPE_INLINE_LEAF: u8 = 7;
const TYPE_CHECKSUM: u8 = 8;

// Bits needed to represent the above type integers. Dirty offsets encode the
// type integer in their lowest TYPE_BITS bits.
const TYPE_BITS: usize = 3;

// Size constants. Do not change.
const TYPE_BYTES: usize = 1;
const RADIX_FLAG_BYTES: usize = 1;
const RADIX_BITMAP_BYTES: usize = 2;

// Bit flags used by radix
const RADIX_FLAG_USE_64BIT: u8 = 1;
const RADIX_FLAG_HAVE_LINK: u8 = 1 << 7;
242
/// Offset to an entry. The type of the entry is yet to be resolved.
#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
pub struct Offset(u64);

// Typed offsets. Constructed after verifying types.
// `LinkOffset` is public since it's exposed by some APIs.

#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
struct RadixOffset(Offset);
#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
struct LeafOffset(Offset);

/// Offset to a linked list entry.
///
/// The entry stores a [u64] integer and optionally, the next [`LinkOffset`].
#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
pub struct LinkOffset(Offset);
#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
struct KeyOffset(Offset);
#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
struct ExtKeyOffset(Offset);
#[derive(Copy, Clone, PartialEq, PartialOrd, Default)]
struct ChecksumOffset(Offset);

/// An `Offset` whose entry type has been resolved (see `Offset::to_typed`).
#[derive(Copy, Clone)]
enum TypedOffset {
    Radix(RadixOffset),
    Leaf(LeafOffset),
    Link(LinkOffset),
    Key(KeyOffset),
    ExtKey(ExtKeyOffset),
    Checksum(ChecksumOffset),
}
276
277impl Offset {
278    /// Convert an unverified `u64` read from disk to a non-dirty `Offset`.
279    /// Return [`errors::IndexError`] if the offset is dirty.
280    #[inline]
281    fn from_disk(index: impl IndexBuf, value: u64) -> crate::Result<Self> {
282        if value >= DIRTY_OFFSET {
283            Err(index.corruption(format!("illegal disk offset {}", value)))
284        } else {
285            Ok(Offset(value))
286        }
287    }
288
289    /// Convert a possibly "dirty" offset to a non-dirty offset.
290    /// Useful when writing offsets to disk.
291    #[inline]
292    fn to_disk(self, offset_map: &OffsetMap) -> u64 {
293        offset_map.get(self)
294    }
295
296    /// Convert to `TypedOffset`.
297    fn to_typed(self, index: impl IndexBuf) -> crate::Result<TypedOffset> {
298        let type_int = self.type_int(&index)?;
299        match type_int {
300            TYPE_RADIX => Ok(TypedOffset::Radix(RadixOffset(self))),
301            TYPE_LEAF => Ok(TypedOffset::Leaf(LeafOffset(self))),
302            TYPE_LINK => Ok(TypedOffset::Link(LinkOffset(self))),
303            TYPE_KEY => Ok(TypedOffset::Key(KeyOffset(self))),
304            TYPE_EXT_KEY => Ok(TypedOffset::ExtKey(ExtKeyOffset(self))),
305            // LeafOffset handles inline transparently.
306            TYPE_INLINE_LEAF => Ok(TypedOffset::Leaf(LeafOffset(self))),
307            TYPE_CHECKSUM => Ok(TypedOffset::Checksum(ChecksumOffset(self))),
308            _ => Err(index.corruption(format!("type {} is unsupported", type_int))),
309        }
310    }
311
312    /// Read the `type_int` value.
313    fn type_int(self, index: impl IndexBuf) -> crate::Result<u8> {
314        let buf = index.buf();
315        if self.is_null() {
316            Err(index.corruption("invalid read from null"))
317        } else if self.is_dirty() {
318            Ok(((self.0 - DIRTY_OFFSET) & ((1 << TYPE_BITS) - 1)) as u8)
319        } else {
320            index.verify_checksum(self.0, TYPE_BYTES as u64)?;
321            match buf.get(self.0 as usize) {
322                Some(x) => Ok(*x as u8),
323                _ => Err(index.range_error(self.0 as usize, 1)),
324            }
325        }
326    }
327
328    /// Convert to `TypedOffset` without returning detailed error message.
329    ///
330    /// This is useful for cases where the error handling is optional.
331    fn to_optional_typed(self, buf: &[u8]) -> Option<TypedOffset> {
332        if self.is_null() {
333            return None;
334        }
335        let type_int = if self.is_dirty() {
336            Some(((self.0 - DIRTY_OFFSET) & ((1 << TYPE_BITS) - 1)) as u8)
337        } else {
338            buf.get(self.0 as usize).cloned()
339        };
340        match type_int {
341            Some(TYPE_RADIX) => Some(TypedOffset::Radix(RadixOffset(self))),
342            Some(TYPE_LEAF) => Some(TypedOffset::Leaf(LeafOffset(self))),
343            Some(TYPE_LINK) => Some(TypedOffset::Link(LinkOffset(self))),
344            Some(TYPE_KEY) => Some(TypedOffset::Key(KeyOffset(self))),
345            Some(TYPE_EXT_KEY) => Some(TypedOffset::ExtKey(ExtKeyOffset(self))),
346            // LeafOffset handles inline transparently.
347            Some(TYPE_INLINE_LEAF) => Some(TypedOffset::Leaf(LeafOffset(self))),
348            Some(TYPE_CHECKSUM) => Some(TypedOffset::Checksum(ChecksumOffset(self))),
349            _ => None,
350        }
351    }
352
353    /// Test whether the offset is null (0).
354    #[inline]
355    fn is_null(self) -> bool {
356        self.0 == 0
357    }
358
359    /// Test whether the offset points to an in-memory entry.
360    #[inline]
361    fn is_dirty(self) -> bool {
362        self.0 >= DIRTY_OFFSET
363    }
364
365    fn null() -> Self {
366        Self(0)
367    }
368}
369
// Common methods shared by typed offset structs.
trait TypedOffsetMethods: Sized {
    /// Index into the corresponding `dirty_*` vector for a dirty offset.
    /// Reverse of `from_dirty_index`.
    #[inline]
    fn dirty_index(self) -> usize {
        debug_assert!(self.to_offset().is_dirty());
        ((self.to_offset().0 - DIRTY_OFFSET) >> TYPE_BITS) as usize
    }

    /// Convert an untyped `Offset` into this typed offset, verifying the
    /// entry type. Null offsets are accepted without verification.
    #[inline]
    fn from_offset(offset: Offset, index: impl IndexBuf) -> crate::Result<Self> {
        if offset.is_null() {
            Ok(Self::from_offset_unchecked(offset))
        } else {
            let type_int = offset.type_int(&index)?;
            if type_int == Self::type_int() {
                Ok(Self::from_offset_unchecked(offset))
            } else {
                Err(index.corruption(format!("inconsistent type at {:?}", offset)))
            }
        }
    }

    /// Construct a dirty (in-memory) offset from an index into the
    /// corresponding `dirty_*` vector.
    #[inline]
    fn from_dirty_index(index: usize) -> Self {
        Self::from_offset_unchecked(Offset(
            (((index as u64) << TYPE_BITS) | Self::type_int() as u64) + DIRTY_OFFSET,
        ))
    }

    /// The TYPE_* integer for this offset type.
    fn type_int() -> u8;

    /// Wrap an `Offset` without checking the entry type.
    fn from_offset_unchecked(offset: Offset) -> Self;

    /// The underlying untyped `Offset`.
    fn to_offset(&self) -> Offset;
}
405
// Implement `TypedOffsetMethods` (plus related boilerplate; the macro is
// defined elsewhere in this crate) for each typed offset struct, binding
// each to its TYPE_* integer and a name string.
impl_offset!(RadixOffset, TYPE_RADIX, "Radix");
impl_offset!(LeafOffset, TYPE_LEAF, "Leaf");
impl_offset!(LinkOffset, TYPE_LINK, "Link");
impl_offset!(KeyOffset, TYPE_KEY, "Key");
impl_offset!(ExtKeyOffset, TYPE_EXT_KEY, "ExtKey");
impl_offset!(ChecksumOffset, TYPE_CHECKSUM, "Checksum");
412
impl RadixOffset {
    /// Link offset of a radix entry.
    ///
    /// On-disk layout (see the format notes at the top of this file):
    /// TYPE + FLAG + BITMAP + child pointers + optional link pointer.
    /// The link pointer exists only when the HAVE_LINK flag bit is set.
    #[inline]
    fn link_offset(self, index: &Index) -> crate::Result<LinkOffset> {
        if self.is_dirty() {
            Ok(index.dirty_radixes[self.dirty_index()].link_offset)
        } else {
            let flag_start = TYPE_BYTES + usize::from(self);
            let flag = *index
                .buf
                .get(flag_start)
                .ok_or_else(|| index.range_error(flag_start, 1))?;
            index.verify_checksum(
                flag_start as u64,
                (RADIX_FLAG_BYTES + RADIX_BITMAP_BYTES) as u64,
            )?;

            if Self::parse_have_link_from_flag(flag) {
                let bitmap_start = flag_start + RADIX_FLAG_BYTES;
                let bitmap = Self::read_bitmap_unchecked(index, bitmap_start)?;
                let int_size = Self::parse_int_size_from_flag(flag);
                // The link pointer is stored right after the child pointers;
                // popcnt(bitmap) is the number of child pointers present.
                let link_offset =
                    bitmap_start + RADIX_BITMAP_BYTES + bitmap.count_ones() as usize * int_size;
                index.verify_checksum(link_offset as u64, int_size as u64)?;
                let raw_offset = Self::read_raw_int_unchecked(index, int_size, link_offset)?;
                Ok(LinkOffset::from_offset(
                    Offset::from_disk(index, raw_offset)?,
                    index,
                )?)
            } else {
                // No HAVE_LINK bit: this radix entry has no link offset.
                Ok(LinkOffset::default())
            }
        }
    }

    /// Lookup the `i`-th child inside a radix entry.
    /// Return stored offset, or `Offset(0)` if that child does not exist.
    #[inline]
    fn child(self, index: &Index, i: u8) -> crate::Result<Offset> {
        // "i" is not derived from user input.
        assert!(i < 16);
        if self.is_dirty() {
            Ok(index.dirty_radixes[self.dirty_index()].offsets[i as usize])
        } else {
            let flag_start = TYPE_BYTES + usize::from(self);
            let bitmap_start = flag_start + RADIX_FLAG_BYTES;
            // Integrity of "bitmap" is checked below to reduce calls to verify_checksum, since
            // this is a hot path.
            let bitmap = Self::read_bitmap_unchecked(index, bitmap_start)?;
            let has_child = (1u16 << i) & bitmap != 0;
            if has_child {
                let flag = *index
                    .buf
                    .get(flag_start)
                    .ok_or_else(|| index.range_error(flag_start, 1))?;
                let int_size = Self::parse_int_size_from_flag(flag);
                // Only present children are stored; count the child pointers
                // that come before the i-th one to find its position.
                let skip_child_count = (((1u16 << i) - 1) & bitmap).count_ones() as usize;
                let child_offset = bitmap_start + RADIX_BITMAP_BYTES + skip_child_count * int_size;
                index.verify_checksum(
                    flag_start as u64,
                    (child_offset + int_size - flag_start) as u64,
                )?;
                let raw_offset = Self::read_raw_int_unchecked(index, int_size, child_offset)?;
                Ok(Offset::from_disk(index, raw_offset)?)
            } else {
                index.verify_checksum(bitmap_start as u64, RADIX_BITMAP_BYTES as u64)?;
                Ok(Offset::default())
            }
        }
    }

    /// Copy an on-disk entry to memory so it can be modified. Return new offset.
    /// If the offset is already in-memory, return it as-is.
    #[inline]
    fn copy(self, index: &mut Index) -> crate::Result<RadixOffset> {
        if self.is_dirty() {
            Ok(self)
        } else {
            let entry = MemRadix::read_from(&index, u64::from(self))?;
            let len = index.dirty_radixes.len();
            index.dirty_radixes.push(entry);
            Ok(RadixOffset::from_dirty_index(len))
        }
    }

    /// Change a child of `MemRadix`. Panic if the offset points to an on-disk entry.
    #[inline]
    fn set_child(self, index: &mut Index, i: u8, value: Offset) {
        assert!(i < 16);
        if self.is_dirty() {
            index.dirty_radixes[self.dirty_index()].offsets[i as usize] = value;
        } else {
            panic!("bug: set_child called on immutable radix entry");
        }
    }

    /// Change link offset of `MemRadix`. Panic if the offset points to an on-disk entry.
    #[inline]
    fn set_link(self, index: &mut Index, value: LinkOffset) {
        if self.is_dirty() {
            index.dirty_radixes[self.dirty_index()].link_offset = value;
        } else {
            panic!("bug: set_link called on immutable radix entry");
        }
    }

    /// Change all children and link offset to null.
    /// Panic if the offset points to an on-disk entry.
    fn set_all_to_null(self, index: &mut Index) {
        if self.is_dirty() {
            index.dirty_radixes[self.dirty_index()] = MemRadix::default();
        } else {
            panic!("bug: set_all_to_null called on immutable radix entry");
        }
    }

    /// Create a new in-memory radix entry.
    #[inline]
    fn create(index: &mut Index, radix: MemRadix) -> RadixOffset {
        let len = index.dirty_radixes.len();
        index.dirty_radixes.push(radix);
        RadixOffset::from_dirty_index(len)
    }

    /// Parse whether link offset exists from a flag.
    #[inline]
    fn parse_have_link_from_flag(flag: u8) -> bool {
        flag & RADIX_FLAG_HAVE_LINK != 0
    }

    /// Parse int size (in bytes) from a flag.
    #[inline]
    fn parse_int_size_from_flag(flag: u8) -> usize {
        if flag & RADIX_FLAG_USE_64BIT == 0 {
            size_of::<u32>()
        } else {
            size_of::<u64>()
        }
    }

    /// Read bitmap from the given offset without integrity check.
    #[inline]
    fn read_bitmap_unchecked(index: &Index, bitmap_offset: usize) -> crate::Result<u16> {
        debug_assert_eq!(RADIX_BITMAP_BYTES, size_of::<u16>());
        let buf = &index.buf;
        buf.get(bitmap_offset..bitmap_offset + RADIX_BITMAP_BYTES)
            .map(|buf| LittleEndian::read_u16(buf))
            .ok_or_else(|| {
                crate::Error::corruption(
                    &index.path,
                    format!("cannot read radix bitmap at {}", bitmap_offset),
                )
            })
    }

    /// Read integer from the given offset without integrity check.
    #[inline]
    fn read_raw_int_unchecked(index: &Index, int_size: usize, offset: usize) -> crate::Result<u64> {
        let buf = &index.buf;
        let result = match int_size {
            4 => buf
                .get(offset..offset + 4)
                .map(|buf| LittleEndian::read_u32(buf) as u64),
            8 => buf
                .get(offset..offset + 8)
                .map(|buf| LittleEndian::read_u64(buf)),
            // int_size comes from parse_int_size_from_flag: 4 or 8 only.
            _ => unreachable!(),
        };
        result.ok_or_else(|| {
            crate::Error::corruption(
                &index.path,
                format!("cannot read {}-byte int at {}", int_size, offset),
            )
        })
    }
}
589
590/// Extract key_content from an untyped Offset. Internal use only.
591fn extract_key_content(index: &Index, key_offset: Offset) -> crate::Result<&[u8]> {
592    let typed_offset = key_offset.to_typed(index)?;
593    match typed_offset {
594        TypedOffset::Key(x) => Ok(x.key_content(index)?),
595        TypedOffset::ExtKey(x) => Ok(x.key_content(index)?),
596        _ => Err(index.corruption(format!("unexpected key type at {}", key_offset.0))),
597    }
598}
599
impl LeafOffset {
    /// Key content and link offsets of a leaf entry.
    ///
    /// Handles both the LEAF format (VLQ pointers to separate KEY/EXT_KEY
    /// and LINK entries) and the INLINE_LEAF format (EXT_KEY and LINK
    /// stored inline right after the type byte).
    #[inline]
    fn key_and_link_offset(self, index: &Index) -> crate::Result<(&[u8], LinkOffset)> {
        if self.is_dirty() {
            let e = &index.dirty_leafs[self.dirty_index()];
            let key_content = extract_key_content(index, e.key_offset)?;
            Ok((key_content, e.link_offset))
        } else {
            let (key_content, raw_link_offset) = match index.buf[usize::from(self)] {
                TYPE_INLINE_LEAF => {
                    // The EXT_KEY entry immediately follows the type byte.
                    let raw_key_offset = u64::from(self) + TYPE_BYTES as u64;
                    // PERF: Consider skip checksum for this read.
                    let key_offset = ExtKeyOffset::from_offset(
                        Offset::from_disk(index, raw_key_offset)?,
                        index,
                    )?;
                    // Avoid using key_content. Skip one checksum check.
                    let (key_content, key_entry_size) =
                        key_offset.key_content_and_entry_size_unchecked(index)?;
                    let key_entry_size = key_entry_size.unwrap();
                    // The LINK entry immediately follows the inline EXT_KEY.
                    let raw_link_offset = raw_key_offset + key_entry_size as u64;
                    index.verify_checksum(
                        u64::from(self),
                        raw_link_offset as u64 - u64::from(self),
                    )?;
                    (key_content, raw_link_offset)
                }
                TYPE_LEAF => {
                    // LEAF := TYPE + VLQ(key offset) + VLQ(link offset).
                    let (raw_key_offset, vlq_len): (u64, _) = index
                        .buf
                        .read_vlq_at(usize::from(self) + TYPE_BYTES)
                        .context(
                            index.path(),
                            "cannot read key_offset in LeafOffset::key_and_link_offset",
                        )
                        .corruption()?;
                    let key_offset = Offset::from_disk(index, raw_key_offset)?;
                    let key_content = extract_key_content(index, key_offset)?;
                    let (raw_link_offset, vlq_len2) = index
                        .buf
                        .read_vlq_at(usize::from(self) + TYPE_BYTES + vlq_len)
                        .context(
                            index.path(),
                            "cannot read link_offset in LeafOffset::key_and_link_offset",
                        )
                        .corruption()?;
                    index.verify_checksum(
                        u64::from(self),
                        (TYPE_BYTES + vlq_len + vlq_len2) as u64,
                    )?;
                    (key_content, raw_link_offset)
                }
                _ => unreachable!("bug: LeafOffset constructed with non-leaf types"),
            };
            let link_offset =
                LinkOffset::from_offset(Offset::from_disk(index, raw_link_offset as u64)?, index)?;
            Ok((key_content, link_offset))
        }
    }

    /// Create a new in-memory leaf entry. The key entry cannot be null.
    #[inline]
    fn create(index: &mut Index, link_offset: LinkOffset, key_offset: Offset) -> LeafOffset {
        debug_assert!(!key_offset.is_null());
        let len = index.dirty_leafs.len();
        index.dirty_leafs.push(MemLeaf {
            link_offset,
            key_offset,
        });
        LeafOffset::from_dirty_index(len)
    }

    /// Update link_offset of a leaf entry in-place. Copy on write. Return the new leaf_offset
    /// if it's copied from disk.
    ///
    /// Note: the old leaf is expected to be no longer needed. If that's not true, don't call
    /// this function.
    #[inline]
    fn set_link(self, index: &mut Index, link_offset: LinkOffset) -> crate::Result<LeafOffset> {
        if self.is_dirty() {
            index.dirty_leafs[self.dirty_index()].link_offset = link_offset;
            Ok(self)
        } else {
            let entry = MemLeaf::read_from(index, u64::from(self))?;
            Ok(Self::create(index, link_offset, entry.key_offset))
        }
    }

    /// Mark the entry as unused. An unused entry won't be written to disk.
    /// No effect on an on-disk entry.
    fn mark_unused(self, index: &mut Index) {
        if self.is_dirty() {
            // Mark the referenced in-memory key entry unused too; failures
            // to resolve the key type are deliberately ignored (best effort).
            let key_offset = index.dirty_leafs[self.dirty_index()].key_offset;
            match key_offset.to_typed(&*index) {
                Ok(TypedOffset::Key(x)) => x.mark_unused(index),
                Ok(TypedOffset::ExtKey(x)) => x.mark_unused(index),
                _ => {}
            };
            index.dirty_leafs[self.dirty_index()].mark_unused()
        }
    }
}
703
/// Iterator for values in the linked list
pub struct LeafValueIter<'a> {
    index: &'a Index,
    // Current position; a null offset means the list is exhausted.
    offset: LinkOffset,
    // Set after yielding an error; stops further iteration.
    errored: bool,
}
710
711impl<'a> Iterator for LeafValueIter<'a> {
712    type Item = crate::Result<u64>;
713
714    fn next(&mut self) -> Option<Self::Item> {
715        if self.offset.is_null() || self.errored {
716            None
717        } else {
718            match self.offset.value_and_next(self.index) {
719                Ok((value, next)) => {
720                    self.offset = next;
721                    Some(Ok(value))
722                }
723                Err(e) => {
724                    self.errored = true;
725                    Some(Err(e))
726                }
727            }
728        }
729    }
730}
731
/// Iterator returned by [`Index::range`].
/// Provide access to full keys and values (as [`LinkOffset`]), sorted by key.
pub struct RangeIter<'a> {
    index: &'a Index,

    // Stack about what is being visited, for `next`.
    front_stack: Vec<IterState>,

    // Stack about what is being visited, for `next_back`.
    back_stack: Vec<IterState>,

    // Completed. Either error out, or the iteration ends (which happens
    // when the front and back stacks meet at the same state).
    completed: bool,
}
746
impl<'a> RangeIter<'a> {
    /// Create an iterator from prepared front/back stacks. The iteration
    /// starts out completed if both stacks already end at the same state.
    fn new(index: &'a Index, front_stack: Vec<IterState>, back_stack: Vec<IterState>) -> Self {
        assert!(!front_stack.is_empty());
        assert!(!back_stack.is_empty());
        Self {
            completed: front_stack.last() == back_stack.last(),
            index,
            front_stack,
            back_stack,
        }
    }

    /// Reconstruct "key" from the stack.
    fn key(stack: &[IterState], index: &Index) -> crate::Result<Vec<u8>> {
        // Reconstruct key. Collect base16 child stack (prefix + visiting),
        // then convert to base256.
        let mut prefix = Vec::with_capacity(stack.len() - 1);
        for frame in stack.iter().take(stack.len() - 1).cloned() {
            prefix.push(match frame {
                // The frame contains the "current" child being visited.
                IterState::RadixChild(_, child) if child < 16 => child,
                _ => unreachable!("bug: malicious iterator state"),
            })
        }
        if prefix.len() & 1 == 1 {
            // Odd-length key
            Err(index.corruption("unexpected odd-length key"))
        } else {
            Ok(base16_to_base256(&prefix))
        }
    }

    /// Used by both `next` and `next_back`.
    ///
    /// Advance the depth-first walk on `stack` in the `towards` direction
    /// until a (key, link offset) pair is produced, `exclusive` (the state
    /// at the top of the opposite stack) is reached, or an error occurs.
    /// Returns `None` when iteration should stop.
    fn step(
        index: &'a Index,
        stack: &mut Vec<IterState>,
        towards: Side,
        exclusive: IterState,
    ) -> Option<crate::Result<(Cow<'a, [u8]>, LinkOffset)>> {
        loop {
            let state = match stack.pop().unwrap().step(towards) {
                // Pop. Visit next.
                None => continue,
                Some(state) => state,
            };

            if state == exclusive {
                // Stop iteration.
                return None;
            }

            // Write down what's being visited.
            stack.push(state);

            return match state {
                IterState::RadixChild(radix, child) => match radix.child(index, child) {
                    // Missing child: move on to the next sibling.
                    Ok(next_offset) if next_offset.is_null() => continue,
                    Ok(next_offset) => match next_offset.to_typed(index) {
                        Ok(TypedOffset::Radix(next_radix)) => {
                            // Descend: push the child's boundary state for
                            // the current direction, then keep stepping.
                            stack.push(match towards {
                                Front => IterState::RadixEnd(next_radix),
                                Back => IterState::RadixStart(next_radix),
                            });
                            continue;
                        }
                        Ok(TypedOffset::Leaf(next_leaf)) => {
                            stack.push(match towards {
                                Front => IterState::LeafEnd(next_leaf),
                                Back => IterState::LeafStart(next_leaf),
                            });
                            continue;
                        }
                        Ok(_) => Some(Err(index.corruption("unexpected type during iteration"))),
                        Err(err) => Some(Err(err)),
                    },
                    Err(err) => Some(Err(err)),
                },
                IterState::RadixLeaf(radix) => match radix.link_offset(index) {
                    // No key ends exactly at this radix node.
                    Ok(link_offset) if link_offset.is_null() => continue,
                    Ok(link_offset) => match Self::key(stack, index) {
                        Ok(key) => Some(Ok((Cow::Owned(key), link_offset))),
                        Err(err) => Some(Err(err)),
                    },
                    Err(err) => Some(Err(err)),
                },
                IterState::Leaf(leaf) => match leaf.key_and_link_offset(index) {
                    Ok((key, link_offset)) => Some(Ok((Cow::Borrowed(key), link_offset))),
                    Err(err) => Some(Err(err)),
                },
                IterState::RadixEnd(_)
                | IterState::RadixStart(_)
                | IterState::LeafStart(_)
                | IterState::LeafEnd(_) => {
                    // Boundary markers: nothing to yield, keep stepping.
                    continue;
                }
            };
        }
    }
}
846
847impl<'a> Iterator for RangeIter<'a> {
848    type Item = crate::Result<(Cow<'a, [u8]>, LinkOffset)>;
849
850    /// Return the next key and corresponding [`LinkOffset`].
851    fn next(&mut self) -> Option<Self::Item> {
852        if self.completed {
853            return None;
854        }
855        let exclusive = self.back_stack.last().cloned().unwrap();
856        let result = Self::step(self.index, &mut self.front_stack, Back, exclusive);
857        match result {
858            Some(Err(_)) | None => self.completed = true,
859            _ => {}
860        }
861        result
862    }
863}
864
865impl<'a> DoubleEndedIterator for RangeIter<'a> {
866    /// Return the next key and corresponding [`LinkOffset`], from the end of
867    /// the iterator.
868    fn next_back(&mut self) -> Option<Self::Item> {
869        if self.completed {
870            return None;
871        }
872        let exclusive = self.front_stack.last().cloned().unwrap();
873        let result = Self::step(self.index, &mut self.back_stack, Front, exclusive);
874        match result {
875            Some(Err(_)) | None => self.completed = true,
876            _ => {}
877        }
878        result
879    }
880}
881
impl LinkOffset {
    /// Iterating through values referred by this linked list.
    pub fn values(self, index: &Index) -> LeafValueIter<'_> {
        LeafValueIter {
            errored: false,
            index,
            offset: self,
        }
    }

    /// Get value, and the next link offset.
    ///
    /// Dirty (in-memory) links are read straight from `index.dirty_links`;
    /// on-disk links are decoded from the LINK entry format
    /// (`'\4' + VLQ(VALUE) + PTR(NEXT_LINK | NULL)`), verifying the checksum
    /// over exactly the bytes decoded.
    #[inline]
    fn value_and_next(self, index: &Index) -> crate::Result<(u64, LinkOffset)> {
        if self.is_dirty() {
            // In-memory entry: no decoding or checksum needed.
            let e = &index.dirty_links[self.dirty_index()];
            Ok((e.value, e.next_link_offset))
        } else {
            // VLQ-encoded value, right after the type byte.
            let (value, vlq_len) = index
                .buf
                .read_vlq_at(usize::from(self) + TYPE_BYTES)
                .context(
                    index.path(),
                    "cannot read link_value in LinkOffset::value_and_next",
                )
                .corruption()?;
            // VLQ-encoded pointer to the next link in the chain.
            let (next_link, vlq_len2) = index
                .buf
                .read_vlq_at(usize::from(self) + TYPE_BYTES + vlq_len)
                .context(
                    index.path(),
                    "cannot read next_link_offset in LinkOffset::value_and_next",
                )
                .corruption()?;
            index.verify_checksum(u64::from(self), (TYPE_BYTES + vlq_len + vlq_len2) as u64)?;
            let next_link = LinkOffset::from_offset(Offset::from_disk(index, next_link)?, index)?;
            Ok((value, next_link))
        }
    }

    /// Create a new link entry that chains this entry.
    /// Return new `LinkOffset`
    ///
    /// The new link is appended to `index.dirty_links` and points back at
    /// `self`, so the newest value sits at the head of the list.
    fn create(self, index: &mut Index, value: u64) -> LinkOffset {
        let new_link = MemLink {
            value,
            next_link_offset: self,
            unused: false,
        };
        let len = index.dirty_links.len();
        index.dirty_links.push(new_link);
        LinkOffset::from_dirty_index(len)
    }
}
934
impl KeyOffset {
    /// Key content of a key entry.
    ///
    /// Dirty (in-memory) keys are borrowed from `index.dirty_keys`; on-disk
    /// keys are decoded from the KEY entry format
    /// (`'\5' + VLQ(KEY_LEN) + KEY_BYTES`).
    #[inline]
    fn key_content(self, index: &Index) -> crate::Result<&[u8]> {
        if self.is_dirty() {
            Ok(&index.dirty_keys[self.dirty_index()].key[..])
        } else {
            // Key length, VLQ-encoded right after the type byte.
            let (key_len, vlq_len): (usize, _) = index
                .buf
                .read_vlq_at(usize::from(self) + TYPE_BYTES)
                .context(
                    index.path(),
                    "cannot read key_len in KeyOffset::key_content",
                )
                .corruption()?;
            let start = usize::from(self) + TYPE_BYTES + vlq_len;
            let end = start + key_len;
            // Checksum covers the whole entry: type byte, VLQ length, key bytes.
            index.verify_checksum(u64::from(self), end as u64 - u64::from(self))?;
            if end > index.buf.len() {
                Err(index.range_error(start, end - start))
            } else {
                Ok(&index.buf[start..end])
            }
        }
    }

    /// Create a new in-memory key entry. The key cannot be empty.
    #[inline]
    fn create(index: &mut Index, key: &[u8]) -> KeyOffset {
        debug_assert!(!key.is_empty());
        let len = index.dirty_keys.len();
        index.dirty_keys.push(MemKey {
            key: Vec::from(key).into_boxed_slice(),
        });
        KeyOffset::from_dirty_index(len)
    }

    /// Mark the entry as unused. An unused entry won't be written to disk.
    /// No effect on an on-disk entry.
    fn mark_unused(self, index: &mut Index) {
        if self.is_dirty() {
            index.dirty_keys[self.dirty_index()].mark_unused();
        }
    }
}
980
impl ExtKeyOffset {
    /// Key content of a key entry.
    ///
    /// Unlike `KeyOffset`, the key bytes live in the external key buffer
    /// (`index.key_buf`); the entry itself only stores a (start, len)
    /// reference into that buffer.
    #[inline]
    fn key_content(self, index: &Index) -> crate::Result<&[u8]> {
        let (key_content, entry_size) = self.key_content_and_entry_size_unchecked(index)?;
        // `entry_size` is `None` for dirty entries, which have no on-disk
        // bytes to checksum.
        if let Some(entry_size) = entry_size {
            index.verify_checksum(u64::from(self), entry_size as u64)?;
        }
        Ok(key_content)
    }

    /// Key content and key entry size. Used internally.
    ///
    /// Decodes `EXT_KEY := '\6' + VLQ(KEY_START) + VLQ(KEY_LEN)` for on-disk
    /// entries; no checksum verification happens here (hence "unchecked").
    #[inline]
    fn key_content_and_entry_size_unchecked(
        self,
        index: &Index,
    ) -> crate::Result<(&[u8], Option<usize>)> {
        let (start, len, entry_size) = if self.is_dirty() {
            let e = &index.dirty_ext_keys[self.dirty_index()];
            (e.start, e.len, None)
        } else {
            let (start, vlq_len1): (u64, _) = index
                .buf
                .read_vlq_at(usize::from(self) + TYPE_BYTES)
                .context(
                    index.path(),
                    "cannot read ext_key_start in ExtKeyOffset::key_content",
                )
                .corruption()?;
            let (len, vlq_len2): (u64, _) = index
                .buf
                .read_vlq_at(usize::from(self) + TYPE_BYTES + vlq_len1)
                .context(
                    index.path(),
                    "cannot read ext_key_len in ExtKeyOffset::key_content",
                )
                .corruption()?;
            (start, len, Some(TYPE_BYTES + vlq_len1 + vlq_len2))
        };
        // Resolve the (start, len) reference against the external key buffer.
        let key_buf = index.key_buf.as_ref();
        let key_content = match key_buf.slice(start, len) {
            Some(k) => k,
            None => {
                return Err(index.corruption(format!(
                    "key buffer is invalid when reading referred keys at {}",
                    start
                )));
            }
        };
        Ok((key_content, entry_size))
    }

    /// Create a new in-memory external key entry. The key cannot be empty.
    #[inline]
    fn create(index: &mut Index, start: u64, len: u64) -> ExtKeyOffset {
        debug_assert!(len > 0);
        let i = index.dirty_ext_keys.len();
        index.dirty_ext_keys.push(MemExtKey { start, len });
        ExtKeyOffset::from_dirty_index(i)
    }

    /// Mark the entry as unused. An unused entry won't be written to disk.
    /// No effect on an on-disk entry.
    fn mark_unused(self, index: &mut Index) {
        if self.is_dirty() {
            index.dirty_ext_keys[self.dirty_index()].mark_unused();
        }
    }
}
1050
1051/// Check type for an on-disk entry
1052fn check_type(index: impl IndexBuf, offset: usize, expected: u8) -> crate::Result<()> {
1053    let typeint = *(index
1054        .buf()
1055        .get(offset)
1056        .ok_or_else(|| index.range_error(offset, 1))?);
1057    if typeint != expected {
1058        Err(index.corruption(format!(
1059            "type mismatch at offset {} expected {} but got {}",
1060            offset, expected, typeint
1061        )))
1062    } else {
1063        Ok(())
1064    }
1065}
1066
impl MemRadix {
    /// Parse a RADIX entry
    /// (`'\2' + FLAG + BITMAP (2 bytes) + PTR2(child) * popcnt(BITMAP) + PTR2(LINK)`)
    /// at `offset`, verifying the checksum over all bytes read.
    fn read_from(index: &Index, offset: u64) -> crate::Result<Self> {
        let buf = &index.buf;
        let offset = offset as usize;
        let mut pos = 0;

        // Integrity check is done at the end to reduce overhead.
        check_type(index, offset, TYPE_RADIX)?;
        pos += TYPE_BYTES;

        // FLAG byte: encodes pointer width (32/64-bit) and link presence.
        let flag = *buf
            .get(offset + pos)
            .ok_or_else(|| index.range_error(offset + pos, 1))?;
        pos += RADIX_FLAG_BYTES;

        // BITMAP: bit i set means child i is present.
        let bitmap = RadixOffset::read_bitmap_unchecked(index, offset + pos)?;
        pos += RADIX_BITMAP_BYTES;

        let int_size = RadixOffset::parse_int_size_from_flag(flag);

        // Child pointers are stored densely: only present children occupy
        // `int_size` bytes each; absent children keep the null offset.
        let mut offsets = [Offset::default(); 16];
        for (i, o) in offsets.iter_mut().enumerate() {
            if (bitmap >> i) & 1 == 1 {
                *o = Offset::from_disk(
                    index,
                    RadixOffset::read_raw_int_unchecked(index, int_size, offset + pos)?,
                )?;
                pos += int_size;
            }
        }

        // Optional link pointer, present only when the flag says so.
        let link_offset = if RadixOffset::parse_have_link_from_flag(flag) {
            let raw_offset = RadixOffset::read_raw_int_unchecked(index, int_size, offset + pos)?;
            pos += int_size;
            LinkOffset::from_offset(Offset::from_disk(index, raw_offset)?, index)?
        } else {
            LinkOffset::default()
        };

        index.verify_checksum(offset as u64, pos as u64)?;

        Ok(MemRadix {
            offsets,
            link_offset,
        })
    }

    /// Serialize as a RADIX entry. The pointer width is chosen per entry:
    /// 64-bit integers are used iff any child or link disk offset exceeds
    /// `u32::MAX`.
    fn write_to<W: Write>(&self, writer: &mut W, offset_map: &OffsetMap) -> io::Result<()> {
        // Prepare data to write
        let mut flag = 0;
        let mut bitmap = 0;
        let u32_max = ::std::u32::MAX as u64;

        // Resolve the link offset (if any), and record whether it forces
        // 64-bit pointers.
        let link_offset = if !self.link_offset.is_null() {
            flag |= RADIX_FLAG_HAVE_LINK;
            let link_offset = self.link_offset.to_disk(offset_map);
            if link_offset > u32_max {
                flag |= RADIX_FLAG_USE_64BIT;
            }
            link_offset
        } else {
            0
        };

        // Resolve present children and build the presence bitmap.
        let mut child_offsets = [0u64; 16];
        for (i, child_offset) in self.offsets.iter().enumerate() {
            if !child_offset.is_null() {
                bitmap |= 1u16 << i;
                let child_offset = child_offset.to_disk(offset_map);
                if child_offset > u32_max {
                    flag |= RADIX_FLAG_USE_64BIT;
                }
                child_offsets[i] = child_offset;
            }
        }

        // Write them
        writer.write_all(&[TYPE_RADIX, flag])?;
        writer.write_u16::<LittleEndian>(bitmap)?;

        if flag & RADIX_FLAG_USE_64BIT != 0 {
            for &child_offset in child_offsets.iter() {
                if child_offset > 0 {
                    writer.write_u64::<LittleEndian>(child_offset)?;
                }
            }
            if link_offset > 0 {
                writer.write_u64::<LittleEndian>(link_offset)?;
            }
        } else {
            for &child_offset in child_offsets.iter() {
                if child_offset > 0 {
                    writer.write_u32::<LittleEndian>(child_offset as u32)?;
                }
            }
            if link_offset > 0 {
                writer.write_u32::<LittleEndian>(link_offset as u32)?;
            }
        }
        Ok(())
    }
}
1169
1170impl MemLeaf {
1171    fn read_from(index: &Index, offset: u64) -> crate::Result<Self> {
1172        let buf = &index.buf;
1173        let offset = offset as usize;
1174        match buf.get(offset) {
1175            Some(&TYPE_INLINE_LEAF) => {
1176                let key_offset = offset + TYPE_BYTES;
1177                // Skip the key part
1178                let offset = key_offset + TYPE_BYTES;
1179                let (_key_start, vlq_len): (u64, _) = buf
1180                    .read_vlq_at(offset)
1181                    .context(index.path(), "cannot read key_start in MemLeaf::read_from")
1182                    .corruption()?;
1183                let offset = offset + vlq_len;
1184                let (_key_len, vlq_len): (u64, _) = buf
1185                    .read_vlq_at(offset)
1186                    .context(index.path(), "cannot read key_len in MemLeaf::read_from")
1187                    .corruption()?;
1188                let offset = offset + vlq_len;
1189                // Checksum will be verified by ExtKey and Leaf nodes
1190                let key_offset = Offset::from_disk(index, key_offset as u64)?;
1191                let link_offset =
1192                    LinkOffset::from_offset(Offset::from_disk(index, offset as u64)?, index)?;
1193                Ok(MemLeaf {
1194                    key_offset,
1195                    link_offset,
1196                })
1197            }
1198            Some(&TYPE_LEAF) => {
1199                let (key_offset, len1) = buf
1200                    .read_vlq_at(offset + TYPE_BYTES)
1201                    .context(index.path(), "cannot read key_offset in MemLeaf::read_from")
1202                    .corruption()?;
1203                let key_offset = Offset::from_disk(index, key_offset)?;
1204                let (link_offset, len2) = buf
1205                    .read_vlq_at(offset + TYPE_BYTES + len1)
1206                    .context(
1207                        index.path(),
1208                        "cannot read link_offset in MemLeaf::read_from",
1209                    )
1210                    .corruption()?;
1211                let link_offset =
1212                    LinkOffset::from_offset(Offset::from_disk(index, link_offset)?, index)?;
1213                index.verify_checksum(offset as u64, (TYPE_BYTES + len1 + len2) as u64)?;
1214                Ok(MemLeaf {
1215                    key_offset,
1216                    link_offset,
1217                })
1218            }
1219            _ => Err(index.range_error(offset, 1)),
1220        }
1221    }
1222
1223    /// If the entry is suitable for writing inline, write an inline entry, mark dependent
1224    /// entries as "unused", and return `true`. Otherwise do nothing and return `false`.
1225    ///
1226    /// The caller probably wants to set this entry to "unused" to prevent writing twice,
1227    /// if true is returned.
1228    fn maybe_write_inline_to(
1229        &self,
1230        writer: &mut Vec<u8>,
1231        buf: &[u8],
1232        buf_offset: u64,
1233        dirty_ext_keys: &mut Vec<MemExtKey>,
1234        dirty_links: &mut Vec<MemLink>,
1235        offset_map: &mut OffsetMap,
1236    ) -> crate::Result<bool> {
1237        debug_assert!(!self.is_unused());
1238
1239        // Conditions to be inlined:
1240        // - Both Key and Link are dirty (in-memory). Otherwise this might waste space.
1241        // - Key is ExtKey. This is just to make implementation easier. Owned key support might be
1242        // added in the future.
1243        // - Link does not refer to another in-memory link that hasn't been written yet (i.e.
1244        //   does not exist in offset_map). This is just to make implementation easier.
1245
1246        let are_dependencies_dirty = self.key_offset.is_dirty() && self.link_offset.is_dirty();
1247
1248        if are_dependencies_dirty {
1249            // Not being able to read the key offset is not a fatal error here - it
1250            // disables the inline optimization. But everything else works just fine.
1251            if let Some(TypedOffset::ExtKey(key_offset)) = self.key_offset.to_optional_typed(buf) {
1252                let ext_key_index = key_offset.dirty_index();
1253                let link_index = self.link_offset.dirty_index();
1254                let ext_key = dirty_ext_keys.get_mut(ext_key_index).unwrap();
1255                let link = dirty_links.get_mut(link_index).unwrap();
1256
1257                let next_link_offset = link.next_link_offset;
1258                if next_link_offset.is_dirty()
1259                    && offset_map.link_map[next_link_offset.dirty_index()] == 0
1260                {
1261                    // Dependent Link is not written yet.
1262                    return Ok(false);
1263                }
1264
1265                // Header
1266                writer.write_all(&[TYPE_INLINE_LEAF]).infallible()?;
1267
1268                // Inlined ExtKey
1269                let offset = buf.len() as u64 + buf_offset;
1270                offset_map.ext_key_map[ext_key_index] = offset;
1271                ext_key.write_to(writer, offset_map).infallible()?;
1272
1273                // Inlined Link
1274                let offset = buf.len() as u64 + buf_offset;
1275                offset_map.link_map[ext_key_index] = offset;
1276                link.write_to(writer, offset_map).infallible()?;
1277
1278                ext_key.mark_unused();
1279                link.mark_unused();
1280
1281                Ok(true)
1282            } else {
1283                // InlineLeaf only supports ExtKey, not embedded Key.
1284                Ok(false)
1285            }
1286        } else {
1287            Ok(false)
1288        }
1289    }
1290
1291    /// Write a Leaf entry.
1292    fn write_noninline_to<W: Write>(
1293        &self,
1294        writer: &mut W,
1295        offset_map: &OffsetMap,
1296    ) -> io::Result<()> {
1297        debug_assert!(!self.is_unused());
1298        writer.write_all(&[TYPE_LEAF])?;
1299        writer.write_vlq(self.key_offset.to_disk(offset_map))?;
1300        writer.write_vlq(self.link_offset.to_disk(offset_map))?;
1301        Ok(())
1302    }
1303
1304    /// Mark the entry as unused. An unused entry won't be written to disk.
1305    fn mark_unused(&mut self) {
1306        self.key_offset = Offset::default();
1307    }
1308
1309    #[inline]
1310    fn is_unused(&self) -> bool {
1311        self.key_offset.is_null()
1312    }
1313}
1314
1315impl MemLink {
1316    fn read_from(index: impl IndexBuf, offset: u64) -> crate::Result<Self> {
1317        let buf = index.buf();
1318        let offset = offset as usize;
1319        check_type(&index, offset, TYPE_LINK)?;
1320        let (value, len1) = buf
1321            .read_vlq_at(offset + 1)
1322            .context(index.path(), "cannot read link_value in MemLink::read_from")
1323            .corruption()?;
1324        let (next_link_offset, len2) = buf
1325            .read_vlq_at(offset + TYPE_BYTES + len1)
1326            .context(
1327                index.path(),
1328                "cannot read next_link_offset in MemLink::read_from",
1329            )
1330            .corruption()?;
1331        let next_link_offset =
1332            LinkOffset::from_offset(Offset::from_disk(&index, next_link_offset)?, &index)?;
1333        index.verify_checksum(offset as u64, (TYPE_BYTES + len1 + len2) as u64)?;
1334        Ok(MemLink {
1335            value,
1336            next_link_offset,
1337            unused: false,
1338        })
1339    }
1340
1341    fn write_to<W: Write>(&self, writer: &mut W, offset_map: &OffsetMap) -> io::Result<()> {
1342        writer.write_all(&[TYPE_LINK])?;
1343        writer.write_vlq(self.value)?;
1344        writer.write_vlq(self.next_link_offset.to_disk(offset_map))?;
1345        Ok(())
1346    }
1347
1348    /// Mark the entry as unused. An unused entry won't be written to disk.
1349    fn mark_unused(&mut self) {
1350        self.unused = true;
1351    }
1352
1353    #[inline]
1354    fn is_unused(&self) -> bool {
1355        self.unused
1356    }
1357}
1358
1359impl MemKey {
1360    fn read_from(index: impl IndexBuf, offset: u64) -> crate::Result<Self> {
1361        let buf = index.buf();
1362        let offset = offset as usize;
1363        check_type(&index, offset, TYPE_KEY)?;
1364        let (key_len, len): (usize, _) = buf
1365            .read_vlq_at(offset + 1)
1366            .context(index.path(), "cannot read key_len in MemKey::read_from")
1367            .corruption()?;
1368        let key = Vec::from(
1369            buf.get(offset + TYPE_BYTES + len..offset + TYPE_BYTES + len + key_len)
1370                .ok_or_else(|| index.range_error(offset + TYPE_BYTES + len, key_len))?,
1371        )
1372        .into_boxed_slice();
1373        index.verify_checksum(offset as u64, (TYPE_BYTES + len + key_len) as u64)?;
1374        Ok(MemKey { key })
1375    }
1376
1377    fn write_to<W: Write>(&self, writer: &mut W, _: &OffsetMap) -> io::Result<()> {
1378        writer.write_all(&[TYPE_KEY])?;
1379        writer.write_vlq(self.key.len())?;
1380        writer.write_all(&self.key)?;
1381        Ok(())
1382    }
1383
1384    /// Mark the entry as unused. An unused entry won't be written to disk.
1385    fn mark_unused(&mut self) {
1386        self.key = Vec::new().into_boxed_slice();
1387    }
1388
1389    #[inline]
1390    fn is_unused(&self) -> bool {
1391        self.key.len() == 0
1392    }
1393}
1394
impl MemExtKey {
    /// Parse an EXT_KEY entry (`'\6' + VLQ(KEY_START) + VLQ(KEY_LEN)`) at
    /// `offset`, verifying the checksum over the bytes read. The key bytes
    /// themselves live in the external key buffer, not in this entry.
    fn read_from(index: impl IndexBuf, offset: u64) -> crate::Result<Self> {
        let buf = index.buf();
        let offset = offset as usize;
        check_type(&index, offset, TYPE_EXT_KEY)?;
        let (start, vlq_len1) = buf
            .read_vlq_at(offset + TYPE_BYTES)
            .context(
                index.path(),
                "cannot read ext_key_start in MemExtKey::read_from",
            )
            .corruption()?;
        let (len, vlq_len2) = buf
            .read_vlq_at(offset + TYPE_BYTES + vlq_len1)
            .context(
                index.path(),
                "cannot read ext_key_len in MemExtKey::read_from",
            )
            .corruption()?;
        index.verify_checksum(offset as u64, (TYPE_BYTES + vlq_len1 + vlq_len2) as u64)?;
        Ok(MemExtKey { start, len })
    }

    /// Serialize as an EXT_KEY entry. The offset map is unused because the
    /// (start, len) pair refers to the key buffer, not to index entries.
    fn write_to<W: Write>(&self, writer: &mut W, _: &OffsetMap) -> io::Result<()> {
        writer.write_all(&[TYPE_EXT_KEY])?;
        writer.write_vlq(self.start)?;
        writer.write_vlq(self.len)?;
        Ok(())
    }

    /// Mark the entry as unused. An unused entry won't be written to disk.
    /// A zero length is the "unused" sentinel (real keys are never empty;
    /// see `ExtKeyOffset::create`).
    fn mark_unused(&mut self) {
        self.len = 0;
    }

    #[inline]
    fn is_unused(&self) -> bool {
        self.len == 0
    }
}
1435
impl MemRoot {
    /// Parse a ROOT entry (`'\1' + PTR(RADIX) + VLQ(META_LEN) + META`) at
    /// `offset`, verifying the checksum over all bytes read.
    ///
    /// Returns the parsed root together with its encoded size in bytes.
    fn read_from(index: impl IndexBuf, offset: u64) -> crate::Result<(Self, usize)> {
        let offset = offset as usize;
        let mut cur = offset;
        check_type(&index, offset, TYPE_ROOT)?;
        cur += TYPE_BYTES;

        // Pointer to the root RADIX entry.
        let (radix_offset, vlq_len) = index
            .buf()
            .read_vlq_at(cur)
            .context(index.path(), "cannot read radix_offset")
            .corruption()?;
        cur += vlq_len;

        let radix_offset =
            RadixOffset::from_offset(Offset::from_disk(&index, radix_offset)?, &index)?;

        // Application metadata: length-prefixed opaque bytes.
        let (meta_len, vlq_len): (usize, _) = index
            .buf()
            .read_vlq_at(cur)
            .context(index.path(), "cannot read meta_len")
            .corruption()?;
        cur += vlq_len;

        let meta = index
            .buf()
            .get(cur..cur + meta_len)
            .ok_or_else(|| index.range_error(cur, meta_len))?;
        cur += meta_len;

        index.verify_checksum(offset as u64, (cur - offset) as u64)?;
        Ok((
            MemRoot {
                radix_offset,
                meta: meta.to_vec().into_boxed_slice(),
            },
            cur - offset,
        ))
    }

    /// Serialize as a ROOT entry, returning the number of bytes written.
    ///
    /// The entry is staged in a temporary buffer so its total length is
    /// known and the output happens as a single `write_all`.
    fn write_to<W: Write>(&self, writer: &mut W, offset_map: &OffsetMap) -> io::Result<usize> {
        let mut buf = Vec::with_capacity(16);
        buf.write_all(&[TYPE_ROOT])?;
        buf.write_vlq(self.radix_offset.to_disk(offset_map))?;
        buf.write_vlq(self.meta.len())?;
        buf.write_all(&self.meta)?;
        let len = buf.len();
        writer.write_all(&buf)?;
        Ok(len)
    }
}
1487
1488impl MemChecksum {
1489    fn read_from(index: &impl IndexBuf, start_offset: u64) -> crate::Result<(Self, usize)> {
1490        let span = debug_span!("MemRadix::read", offset = start_offset);
1491        let _entered = span.enter();
1492        let mut result = Self::default();
1493        let mut size: usize = 0;
1494
1495        let mut offset = start_offset as usize;
1496        while offset > 0 {
1497            let mut cur: usize = offset;
1498
1499            check_type(&index, cur, TYPE_CHECKSUM)?;
1500            cur += TYPE_BYTES;
1501
1502            let (previous_offset, vlq_len): (u64, _) = index
1503                .buf()
1504                .read_vlq_at(cur)
1505                .context(index.path(), "cannot read previous_checksum_offset")
1506                .corruption()?;
1507            cur += vlq_len;
1508
1509            let (chunk_size_logarithm, vlq_len): (u32, _) = index
1510                .buf()
1511                .read_vlq_at(cur)
1512                .context(index.path(), "cannot read chunk_size_logarithm")
1513                .corruption()?;
1514
1515            if chunk_size_logarithm > 31 {
1516                return Err(crate::Error::corruption(
1517                    index.path(),
1518                    format!(
1519                        "invalid chunk_size_logarithm {} at {}",
1520                        chunk_size_logarithm, cur
1521                    ),
1522                ));
1523            }
1524            cur += vlq_len;
1525
1526            let chunk_size = 1usize << chunk_size_logarithm;
1527            let chunk_needed = (offset + chunk_size - 1) >> chunk_size_logarithm;
1528
1529            let is_initial_checksum = start_offset == offset as u64;
1530
1531            // Initialize our Self result in our first iteration.
1532            if is_initial_checksum {
1533                result.set_chunk_size_logarithm(index.buf(), chunk_size_logarithm)?;
1534
1535                result.xxhash_list.resize(chunk_needed, 0);
1536                result.start = previous_offset;
1537                result.end = offset as u64;
1538
1539                let checked_needed = (result.xxhash_list.len() + 63) / 64;
1540                result.checked.resize_with(checked_needed, Default::default);
1541            }
1542            result.chain_len = result.chain_len.saturating_add(1);
1543
1544            //    0     1     2     3     4     5
1545            // |chunk|chunk|chunk|chunk|chunk|chunk|
1546            // |--------------|-----------------|---
1547            // 0              ^ previous        ^ offset
1548            //
1549            // The previous Checksum entry covers chunk 0,1,2(incomplete).
1550            // This Checksum entry covers chunk 2(complete),3,4,5(incomplete).
1551
1552            // Read the new checksums.
1553            let start_chunk_index = (previous_offset >> chunk_size_logarithm) as usize;
1554            for i in start_chunk_index..chunk_needed {
1555                let incomplete_chunk = offset % chunk_size > 0 && i == chunk_needed - 1;
1556
1557                // Don't record incomplete chunk hashes for previous checksums
1558                // since the "next" checksum (i.e. previous loop iteration) will
1559                // have already written the complete chunk's hash.
1560                if is_initial_checksum || !incomplete_chunk {
1561                    result.xxhash_list[i] = (&index.buf()[cur..])
1562                        .read_u64::<LittleEndian>()
1563                        .context(index.path(), "cannot read xxhash for checksum")?;
1564                }
1565                cur += 8;
1566            }
1567
1568            // Check the checksum buffer itself.
1569            let xx32_read = (&index.buf()[cur..])
1570                .read_u32::<LittleEndian>()
1571                .context(index.path(), "cannot read xxhash32 for checksum")?;
1572            let xx32_self = xxhash32(&index.buf()[offset as usize..cur as usize]);
1573            if xx32_read != xx32_self {
1574                return Err(crate::Error::corruption(
1575                    index.path(),
1576                    format!(
1577                        "checksum at {} fails integrity check ({} != {})",
1578                        offset, xx32_read, xx32_self
1579                    ),
1580                ));
1581            }
1582            cur += 4;
1583
1584            if is_initial_checksum {
1585                size = cur - offset;
1586            }
1587
1588            offset = previous_offset as usize;
1589        }
1590        span.record("chain_len", result.chain_len);
1591
1592        Ok((result, size))
1593    }
1594
    /// Incrementally update the checksums so it covers the file content + `append_buf`.
    /// Assume the file content is append-only.
    /// Call this before calling `write_to`.
    ///
    /// - `old_buf`: buffer read at open time (re-verified before reuse).
    /// - `file`: the file being flushed; the caller holds the lock.
    /// - `file_len`: current on-disk length of `file`.
    /// - `append_buf`: bytes about to be appended to the file.
    fn update(
        &mut self,
        old_buf: &[u8],
        file: &mut File,
        file_len: u64,
        append_buf: &[u8],
    ) -> io::Result<()> {
        // Index of the chunk containing `self.end`; everything from that
        // chunk onwards gets (re-)hashed below.
        let start_chunk_index = (self.end >> self.chunk_size_logarithm) as usize;
        let start_chunk_offset = (start_chunk_index as u64) << self.chunk_size_logarithm;

        // Check the range that is being rewritten.
        let old_end = (old_buf.len() as u64).min(self.end);
        if old_end > start_chunk_offset {
            self.check_range(old_buf, start_chunk_offset, old_end - start_chunk_offset)?;
        }

        // Number of chunks needed for the new total length (rounded up).
        let new_total_len = file_len + append_buf.len() as u64;
        let chunk_size = 1u64 << self.chunk_size_logarithm;
        let chunk_needed = ((new_total_len + chunk_size - 1) >> self.chunk_size_logarithm) as usize;
        self.xxhash_list.resize(chunk_needed, 0);

        // The file must not shrink below the range already covered by this
        // checksum - that would violate the append-only assumption.
        if self.end > file_len {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "unexpected truncation",
            ));
        }

        //        start_chunk_offset (start of chunk including self.end)
        //                   v
        //                   |(1)|
        // |chunk|chunk|chunk|chunk|chunk|chunk|chunk|chunk|chunk|
        //                   |---file_buf---|
        // |--------------file--------------|
        // |-------old_buf-------|
        // |-----(2)-----|--(3)--|---(4)----|---append_buf---|
        //               ^       ^          ^                ^
        //          self.start self.end   file_len     new_total_len
        //
        // (1): range being re-checksummed
        // (2): covered by the previous Checksum
        // (3): covered by the current Checksum
        // (4): range written on-disk after `Index::open` (ex. by another process)
        //
        // old_buf:    buffer read at open time.
        // file:       buffer read at flush time (right now, with a lock).
        // append_buf: buffer to append to the file (protected by the same lock).

        // Read the on-disk tail (from the chunk boundary to EOF) so chunks
        // spanning the boundary can be hashed in full.
        let file_buf = {
            let mut file_buf = vec![0; (file_len - start_chunk_offset) as usize];
            file.seek(SeekFrom::Start(start_chunk_offset))?;
            file.read_exact(&mut file_buf)?;
            file_buf
        };

        // Re-hash every chunk from `start_chunk_index` to the end. Each chunk
        // may draw bytes from `file_buf`, `append_buf`, or both.
        for i in start_chunk_index..self.xxhash_list.len() {
            let start = (i as u64) << self.chunk_size_logarithm;
            let end = (start + chunk_size).min(new_total_len);
            let mut xx = XxHash::default();
            // Hash portions of file_buf that intersect with start..end.
            let file_buf_start = ((start - start_chunk_offset) as usize).min(file_buf.len());
            let file_buf_end = ((end - start_chunk_offset) as usize).min(file_buf.len());
            xx.write(&file_buf[file_buf_start..file_buf_end]);
            // Hash portions of append_buf that intersect with start..end.
            let append_buf_start = (start.max(file_len) - file_len) as usize;
            let append_buf_end = (end.max(file_len) - file_len) as usize;
            xx.write(&append_buf[append_buf_start..append_buf_end]);
            self.xxhash_list[i] = xx.finish();
        }

        // Update start and end:
        //
        //      chunk | chunk
        //      start   end (both are valid ChecksumOffset, in different chunks)
        //      v       v
        // old: |-------|
        // new:         |--------|----(1)----|
        //              ^        ^
        //              start    new_total_len
        //
        // (1): Place that this Checksum will be written to.
        //
        // Avoid updating start if start and end are in a same chunk:
        //
        //       -----chunk----|
        //       start     end
        //       v         v
        // old:  |---------|
        // new:  |--------------------|
        //       ^                    ^
        //       start                end
        //
        // This makes the length of chain of Checksums O(len(chunks)).
        if (self.start >> self.chunk_size_logarithm) != (self.end >> self.chunk_size_logarithm) {
            self.start = self.end;
        }
        self.end = new_total_len;

        Ok(())
    }
1698
1699    /// Extend the checksum range to cover the entire range of the index buffer
1700    /// so the next open would only read O(1) checksum entries.
1701    fn flatten(&mut self) {
1702        self.start = 0;
1703        self.chain_len = 1;
1704    }
1705
1706    fn write_to<W: Write>(&self, writer: &mut W, _offset_map: &OffsetMap) -> io::Result<usize> {
1707        let mut buf = Vec::with_capacity(16);
1708        buf.write_all(&[TYPE_CHECKSUM])?;
1709        buf.write_vlq(self.start)?;
1710        // self.end is implied by the write position.
1711        buf.write_vlq(self.chunk_size_logarithm)?;
1712        for &xx in self.xxhash_list_to_write() {
1713            buf.write_u64::<LittleEndian>(xx)?;
1714        }
1715        let xx32 = xxhash32(&buf);
1716        buf.write_u32::<LittleEndian>(xx32)?;
1717        writer.write_all(&buf)?;
1718        Ok(buf.len())
1719    }
1720
    /// Return a sub list of the xxhash list that is not covered by the
    /// previous Checksum entry. It's used for serialization.
    fn xxhash_list_to_write(&self) -> &[u64] {
        // See the comment in `read_from` for `start_chunk_index`.
        // It is the index of the chunk containing `self.start`, i.e. the
        // first chunk whose hash the previous Checksum entry does not cover.
        let start_chunk_index = (self.start >> self.chunk_size_logarithm) as usize;
        &self.xxhash_list[start_chunk_index..]
    }
1729
1730    /// Reset the `chunk_size_logarithm`.
1731    fn set_chunk_size_logarithm(
1732        &mut self,
1733        _buf: &[u8],
1734        chunk_size_logarithm: u32,
1735    ) -> crate::Result<()> {
1736        // Change chunk_size_logarithm if the checksum list is empty.
1737        if self.xxhash_list.is_empty() {
1738            self.chunk_size_logarithm = chunk_size_logarithm;
1739        }
1740        // NOTE: Consider re-hashing and allow changing the chunk_size_logarithm.
1741        Ok(())
1742    }
1743
1744    /// Check a range of bytes.
1745    ///
1746    /// Depending on `chunk_size_logarithm`, bytes outside the specified range
1747    /// might also be checked.
1748    #[inline]
1749    fn check_range(&self, buf: &[u8], offset: u64, length: u64) -> io::Result<()> {
1750        if length == 0 || !self.is_enabled() {
1751            return Ok(());
1752        }
1753
1754        // Ranges not covered by checksums are treated as bad.
1755        if offset + length > self.end {
1756            return checksum_error(self, offset, length);
1757        }
1758
1759        // Otherwise, scan related chunks.
1760        let start = (offset >> self.chunk_size_logarithm) as usize;
1761        let end = ((offset + length - 1) >> self.chunk_size_logarithm) as usize;
1762        if !(start..=end).all(|i| self.check_chunk(buf, i)) {
1763            return checksum_error(self, offset, length);
1764        }
1765        Ok(())
1766    }
1767
    /// Check the i-th chunk. The callsite must make sure `index` is within range.
    ///
    /// Chunks that pass verification are recorded in the `checked` bitmap so
    /// each chunk is hashed at most once across calls.
    #[inline]
    fn check_chunk(&self, buf: &[u8], index: usize) -> bool {
        debug_assert!(index < self.xxhash_list.len());
        // One bit per chunk, packed into 64-bit atomic words.
        let bit = 1 << (index % 64);
        let checked = &self.checked[index / 64];
        if (checked.load(Acquire) & bit) == bit {
            // Already verified by a previous call.
            true
        } else {
            // The last chunk may be shorter than the chunk size; clamp to `end`.
            let start = index << self.chunk_size_logarithm;
            let end = (self.end as usize).min((index + 1) << self.chunk_size_logarithm);
            if start == end {
                // Empty chunk - nothing to verify.
                return true;
            }
            let hash = xxhash(&buf[start..end]);
            if hash == self.xxhash_list[index] {
                // Remember the success so future calls skip re-hashing.
                checked.fetch_or(bit, AcqRel);
                true
            } else {
                false
            }
        }
    }
1791
1792    #[inline]
1793    fn is_enabled(&self) -> bool {
1794        self.end > 0
1795    }
1796}
1797
1798fn write_reversed_vlq(mut writer: impl Write, value: usize) -> io::Result<()> {
1799    let mut reversed_vlq = Vec::new();
1800    reversed_vlq.write_vlq(value)?;
1801    reversed_vlq.reverse();
1802    writer.write_all(&reversed_vlq)?;
1803    Ok(())
1804}
1805
1806impl Clone for MemChecksum {
1807    fn clone(&self) -> Self {
1808        Self {
1809            start: self.start,
1810            end: self.end,
1811            chain_len: self.chain_len,
1812            chunk_size_logarithm: self.chunk_size_logarithm,
1813            xxhash_list: self.xxhash_list.clone(),
1814            checked: self
1815                .checked
1816                .iter()
1817                .map(|c| c.load(Relaxed))
1818                .map(AtomicU64::new)
1819                .collect(),
1820        }
1821    }
1822}
1823
1824impl Default for MemChecksum {
1825    fn default() -> Self {
1826        Self {
1827            start: 0,
1828            end: 0,
1829            chain_len: 0,
1830            chunk_size_logarithm: 20, // chunk_size: 1MB.
1831            xxhash_list: Vec::new(),
1832            checked: Vec::new(),
1833        }
1834    }
1835}
1836
// Translates dirty (in-memory) offsets to their final on-disk offsets.
// Slot value 0 means "unused" (0 is never a valid entry offset).
#[derive(Default)]
struct OffsetMap {
    // Number of dirty radix entries; radix_map is indexed in reverse (see `get`).
    radix_len: usize,
    radix_map: Vec<u64>,
    leaf_map: Vec<u64>,
    link_map: Vec<u64>,
    key_map: Vec<u64>,
    ext_key_map: Vec<u64>,
}
1846
/// A simple structure that implements the IndexBuf interface.
/// It does not check checksum.
/// Fields: (main buffer, path used for error messages).
struct SimpleIndexBuf<'a>(&'a [u8], &'a Path);
1850
1851impl<'a> IndexBuf for SimpleIndexBuf<'a> {
1852    fn buf(&self) -> &[u8] {
1853        self.0
1854    }
1855    fn verify_checksum(&self, _start: u64, _length: u64) -> crate::Result<()> {
1856        Ok(())
1857    }
1858    fn path(&self) -> &Path {
1859        &self.1
1860    }
1861}
1862
1863impl OffsetMap {
1864    fn empty_for_index(index: &Index) -> OffsetMap {
1865        let radix_len = index.dirty_radixes.len();
1866        OffsetMap {
1867            radix_len,
1868            radix_map: vec![0; radix_len],
1869            leaf_map: vec![0; index.dirty_leafs.len()],
1870            link_map: vec![0; index.dirty_links.len()],
1871            key_map: vec![0; index.dirty_keys.len()],
1872            ext_key_map: vec![0; index.dirty_ext_keys.len()],
1873        }
1874    }
1875
1876    #[inline]
1877    fn get(&self, offset: Offset) -> u64 {
1878        if offset.is_dirty() {
1879            let dummy = SimpleIndexBuf(b"", Path::new("<dummy>"));
1880            let result = match offset.to_typed(dummy).unwrap() {
1881                // Radix entries are pushed in the reversed order. So the index needs to be
1882                // reversed.
1883                TypedOffset::Radix(x) => self.radix_map[self.radix_len - 1 - x.dirty_index()],
1884                TypedOffset::Leaf(x) => self.leaf_map[x.dirty_index()],
1885                TypedOffset::Link(x) => self.link_map[x.dirty_index()],
1886                TypedOffset::Key(x) => self.key_map[x.dirty_index()],
1887                TypedOffset::ExtKey(x) => self.ext_key_map[x.dirty_index()],
1888                TypedOffset::Checksum(_) => {
1889                    panic!("bug: ChecksumOffset shouldn't be used in OffsetMap::get")
1890                }
1891            };
1892            // result == 0 means an entry marked "unused" is actually used. It's a logic error.
1893            debug_assert!(result > 0);
1894            result
1895        } else {
1896            // No need to translate.
1897            offset.0
1898        }
1899    }
1900}
1901
/// Choose between Front and Back. Used by [`RangeIter`] related logic.
#[derive(Clone, Copy)]
enum Side {
    /// Stepping towards this side uses `IterState::prev`.
    Front,
    /// Stepping towards this side uses `IterState::next`.
    Back,
}
use Side::Back;
use Side::Front;
1910
/// State used by [`RangeIter`].
///
/// States form a conceptual order within one stack frame (see the dummy
/// bound states below); `next`/`prev` walk that order.
#[derive(Clone, Copy, PartialEq, Debug)]
enum IterState {
    /// Visiting the child of a radix node.
    /// child must be inside 0..16 range.
    RadixChild(RadixOffset, u8),

    /// Visiting the leaf of a radix node.
    RadixLeaf(RadixOffset),

    /// Visiting this leaf node.
    Leaf(LeafOffset),

    /// Dummy states to express "inclusive" bounds.
    /// RadixStart < RadixLeaf < RadixChild < RadixEnd.
    /// LeafStart < Leaf < LeafEnd.
    RadixStart(RadixOffset),
    RadixEnd(RadixOffset),
    LeafStart(LeafOffset),
    LeafEnd(LeafOffset),
}
1932
1933impl IterState {
1934    /// Get the next state on the same frame.
1935    /// Return `None` if the frame should be popped.
1936    fn next(self) -> Option<Self> {
1937        match self {
1938            IterState::RadixChild(radix, 15) => Some(IterState::RadixEnd(radix)),
1939            IterState::RadixChild(radix, i) => Some(IterState::RadixChild(radix, i + 1)),
1940            IterState::RadixStart(radix) => Some(IterState::RadixLeaf(radix)),
1941            IterState::RadixLeaf(radix) => Some(IterState::RadixChild(radix, 0)),
1942            IterState::LeafStart(leaf) => Some(IterState::Leaf(leaf)),
1943            IterState::Leaf(leaf) => Some(IterState::LeafEnd(leaf)),
1944            _ => None,
1945        }
1946    }
1947
1948    /// Get the previous state on the same frame.
1949    /// Return `None` if the frame should be popped.
1950    fn prev(self) -> Option<Self> {
1951        match self {
1952            IterState::RadixChild(radix, 0) => Some(IterState::RadixLeaf(radix)),
1953            IterState::RadixChild(radix, i) => Some(IterState::RadixChild(radix, i - 1)),
1954            IterState::RadixEnd(radix) => Some(IterState::RadixChild(radix, 15)),
1955            IterState::RadixLeaf(radix) => Some(IterState::RadixStart(radix)),
1956            IterState::LeafEnd(leaf) => Some(IterState::Leaf(leaf)),
1957            IterState::Leaf(leaf) => Some(IterState::LeafStart(leaf)),
1958            _ => None,
1959        }
1960    }
1961
1962    /// Move one step towards the given side.
1963    fn step(self, towards: Side) -> Option<Self> {
1964        match towards {
1965            Front => self.prev(),
1966            Back => self.next(),
1967        }
1968    }
1969}
1970
// Main Index
1972
/// Insertion-only mapping from `bytes` to a list of [u64]s.
///
/// An [`Index`] is backed by an append-only file in the filesystem. Internally,
/// it uses base16 radix trees for keys and linked list for [u64] values. The
/// file format was designed to be able to support other types of indexes (ex.
/// non-radix-trees). Though none of them are implemented.
pub struct Index {
    // For locking and low-level access.
    // `None` for purely in-memory indexes (see `create_in_memory`).
    file: Option<File>,

    // For efficient and shared random reading.
    // Backed by mmap.
    pub(crate) buf: Bytes,

    // For error messages.
    // Log uses this field for error messages.
    pub(crate) path: PathBuf,

    // Options
    checksum_enabled: bool,
    checksum_max_chain_len: u32,
    fsync: bool,
    write: Option<bool>,

    // Used by `clear_dirty`.
    clean_root: MemRoot,

    // In-memory entries. The root entry is always in-memory.
    dirty_root: MemRoot,
    dirty_radixes: Vec<MemRadix>,
    dirty_leafs: Vec<MemLeaf>,
    dirty_links: Vec<MemLink>,
    dirty_keys: Vec<MemKey>,
    dirty_ext_keys: Vec<MemExtKey>,

    // Checksum state covering `buf`; consulted by `verify_checksum`.
    checksum: MemChecksum,

    // Additional buffer for external keys.
    // Log::sync needs write access to this field.
    pub(crate) key_buf: Arc<dyn ReadonlyBuffer + Send + Sync>,
}
2014
/// Abstraction of the "external key buffer".
///
/// This makes it possible to use non-contiguous memory for a buffer,
/// and expose them as if it's contiguous.
pub trait ReadonlyBuffer {
    /// Get a slice using the given offset.
    ///
    /// Returns `None` if the requested range is unavailable.
    fn slice(&self, start: u64, len: u64) -> Option<&[u8]>;
}
2023
2024impl<T: AsRef<[u8]>> ReadonlyBuffer for T {
2025    #[inline]
2026    fn slice(&self, start: u64, len: u64) -> Option<&[u8]> {
2027        self.as_ref().get(start as usize..(start + len) as usize)
2028    }
2029}
2030
/// Key to insert. Used by [Index::insert_advanced].
pub enum InsertKey<'a> {
    /// Embedded key.
    Embed(&'a [u8]),

    /// Reference (`[start, end)`) to `key_buf`.
    /// See [`OpenOptions::key_buf`] for supplying the external buffer.
    Reference((u64, u64)),
}
2039
/// Options used to configure how an [`Index`] is opened.
///
/// Similar to [std::fs::OpenOptions], to use this, first call `new`, then
/// chain calls to methods to set each option, finally call `open` to get
/// an [`Index`] structure.
#[derive(Clone)]
pub struct OpenOptions {
    // Collapse the checksum chain once it exceeds this length (0: never).
    checksum_max_chain_len: u32,
    // Checksum chunk size is `1 << checksum_chunk_size_logarithm` bytes.
    checksum_chunk_size_logarithm: u32,
    // Whether to write checksum entries on `flush`.
    checksum_enabled: bool,
    // Whether `flush` should fsync before returning.
    fsync: bool,
    // Logical length override. `None`: use the actual (locked) file length.
    len: Option<u64>,
    // `None`: read-write with read-only fallback; `Some(b)`: forced mode.
    write: Option<bool>,
    // Optional external key buffer for reference-based keys.
    key_buf: Option<Arc<dyn ReadonlyBuffer + Send + Sync>>,
}
2055
impl OpenOptions {
    #[allow(clippy::new_without_default)]
    /// Create [`OpenOptions`] with default configuration:
    /// - checksum enabled, with 1MB chunk size
    /// - checksum max chain length is `config::INDEX_CHECKSUM_MAX_CHAIN_LEN` (default: 10)
    /// - no external key buffer
    /// - no fsync
    /// - read root entry from the end of the file
    /// - open as read-write but fallback to read-only
    pub fn new() -> OpenOptions {
        OpenOptions {
            checksum_max_chain_len: config::INDEX_CHECKSUM_MAX_CHAIN_LEN.load(Acquire),
            checksum_chunk_size_logarithm: 20,
            checksum_enabled: true,
            fsync: false,
            len: None,
            write: None,
            key_buf: None,
        }
    }

    /// Set the maximum checksum chain length.
    ///
    /// If it is non-zero, and the checksum chain (linked list of checksum
    /// entries needed to verify the entire index) exceeds the specified length,
    /// they will be collapsed into a single checksum entry to make `open` more
    /// efficient.
    pub fn checksum_max_chain_len(&mut self, len: u32) -> &mut Self {
        self.checksum_max_chain_len = len;
        self
    }

    /// Set checksum chunk size as `1 << checksum_chunk_size_logarithm`.
    pub fn checksum_chunk_size_logarithm(
        &mut self,
        checksum_chunk_size_logarithm: u32,
    ) -> &mut Self {
        self.checksum_chunk_size_logarithm = checksum_chunk_size_logarithm;
        self
    }

    /// Set whether to write checksum entries on `flush`.
    pub fn checksum_enabled(&mut self, checksum_enabled: bool) -> &mut Self {
        self.checksum_enabled = checksum_enabled;
        self
    }

    /// Set fsync behavior.
    ///
    /// If true, then [`Index::flush`] will use `fsync` to flush data to the
    /// physical device before returning.
    pub fn fsync(&mut self, fsync: bool) -> &mut Self {
        self.fsync = fsync;
        self
    }

    /// Set whether writing is required:
    ///
    /// - `None`: open as read-write but fallback to read-only. `flush()` may fail.
    /// - `Some(false)`: open as read-only. `flush()` will always fail.
    /// - `Some(true)`: open as read-write. `open()` fails if read-write is not
    ///   possible. `flush()` will not fail due to permission issues.
    ///
    /// Note:  The index is always mutable in-memory. Only `flush()` may fail.
    pub fn write(&mut self, value: Option<bool>) -> &mut Self {
        self.write = value;
        self
    }

    /// Specify the logical length of the file.
    ///
    /// If `len` is `None`, use the actual file length. Otherwise, use the
    /// length specified by `len`. Reading the file length requires locking.
    ///
    /// This is useful for lock-free reads, or accessing to multiple versions of
    /// the index at the same time.
    ///
    /// To get a valid logical length, check the return value of [`Index::flush`].
    pub fn logical_len(&mut self, len: Option<u64>) -> &mut Self {
        self.len = len;
        self
    }

    /// Specify the external key buffer.
    ///
    /// With an external key buffer, keys could be stored as references using
    /// `index.insert_advanced` to save space.
    pub fn key_buf(&mut self, buf: Option<Arc<dyn ReadonlyBuffer + Send + Sync>>) -> &mut Self {
        self.key_buf = buf;
        self
    }

    /// Open the index file with given options.
    ///
    /// Driven by the "immutable by default" idea, together with append-only
    /// properties, [`OpenOptions::open`] returns a "snapshotted" view of the
    /// index. Changes to the filesystem won't change instantiated [`Index`]es.
    pub fn open<P: AsRef<Path>>(&self, path: P) -> crate::Result<Index> {
        let path = path.as_ref();
        let result: crate::Result<_> = (|| {
            let span = debug_span!("Index::open", path = path.to_string_lossy().as_ref());
            let _guard = span.enter();

            let mut open_options = self.clone();
            // Append mode: on-disk writes always go to the end of the file.
            let open_result = if self.write == Some(false) {
                fs::OpenOptions::new().read(true).open(path)
            } else {
                fs::OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true)
                    .append(true)
                    .open(path)
            };
            let mut file = match self.write {
                Some(write) => open_result.context(
                    path,
                    if write {
                        "cannot open Index with read-write mode"
                    } else {
                        "cannot open Index with read-only mode"
                    },
                )?,
                None => {
                    // Fall back to open the file as read-only, automatically.
                    match open_result {
                        Err(_) => {
                            open_options.write = Some(false);
                            fs::OpenOptions::new()
                                .read(true)
                                .open(path)
                                .context(path, "cannot open Index with read-only mode")?
                        }
                        Ok(file) => file,
                    }
                }
            };

            let bytes = {
                match self.len {
                    None => {
                        // Take the lock to read file length, since that decides root entry location.
                        let lock = ScopedFileLock::new(&mut file, false)
                            .context(path, "cannot lock Log to read file length")?;
                        mmap_bytes(lock.as_ref(), None).context(path, "cannot mmap")?
                    }
                    Some(len) => {
                        // No need to lock for getting file length.
                        mmap_bytes(&file, Some(len)).context(path, "cannot mmap")?
                    }
                }
            };

            let (dirty_radixes, clean_root, mut checksum) = if bytes.is_empty() {
                // Empty file. Create root radix entry as an dirty entry, and
                // rebuild checksum table (in case it's corrupted).
                let radix_offset = RadixOffset::from_dirty_index(0);
                let _ = utils::fix_perm_file(&file, false);
                let meta = Default::default();
                let root = MemRoot { radix_offset, meta };
                let checksum = MemChecksum::default();
                (vec![MemRadix::default()], root, checksum)
            } else {
                let end = bytes.len();
                let (root, mut checksum) = read_root_checksum_at_end(path, &bytes, end)?;
                if !self.checksum_enabled {
                    checksum = MemChecksum::default();
                }
                (vec![], root, checksum)
            };

            // Only effective while no chunk hashes exist yet (see
            // `MemChecksum::set_chunk_size_logarithm`).
            checksum.set_chunk_size_logarithm(&bytes, self.checksum_chunk_size_logarithm)?;
            let key_buf = self.key_buf.clone();
            let dirty_root = clean_root.clone();

            let index = Index {
                file: Some(file),
                buf: bytes,
                path: path.to_path_buf(),
                // Deconstruct open_options instead of storing it whole, since it contains a
                // permanent reference to the original key_buf mmap.
                checksum_enabled: open_options.checksum_enabled,
                checksum_max_chain_len: open_options.checksum_max_chain_len,
                fsync: open_options.fsync,
                write: open_options.write,
                clean_root,
                dirty_root,
                checksum,
                dirty_radixes,
                dirty_links: vec![],
                dirty_leafs: vec![],
                dirty_keys: vec![],
                dirty_ext_keys: vec![],
                key_buf: key_buf.unwrap_or_else(|| Arc::new(&b""[..])),
            };

            Ok(index)
        })();
        result.context(|| format!("in index::OpenOptions::open({:?})", path))
    }

    /// Create an in-memory [`Index`] that skips flushing to disk.
    ///
    /// `checksum_chunk_size_logarithm` is applied to the fresh checksum
    /// state; with an empty hash list this currently cannot fail.
    pub fn create_in_memory(&self) -> crate::Result<Index> {
        let result: crate::Result<_> = (|| {
            let buf = Bytes::new();
            let dirty_radixes = vec![MemRadix::default()];
            let clean_root = {
                let radix_offset = RadixOffset::from_dirty_index(0);
                let meta = Default::default();
                MemRoot { radix_offset, meta }
            };
            let key_buf = self.key_buf.clone();
            let dirty_root = clean_root.clone();
            let mut checksum = MemChecksum::default();
            checksum.set_chunk_size_logarithm(&buf, self.checksum_chunk_size_logarithm)?;

            Ok(Index {
                file: None,
                buf,
                path: PathBuf::new(),
                checksum_enabled: self.checksum_enabled,
                checksum_max_chain_len: self.checksum_max_chain_len,
                fsync: self.fsync,
                write: self.write,
                clean_root,
                dirty_root,
                checksum,
                dirty_radixes,
                dirty_links: vec![],
                dirty_leafs: vec![],
                dirty_keys: vec![],
                dirty_ext_keys: vec![],
                key_buf: key_buf.unwrap_or_else(|| Arc::new(&b""[..])),
            })
        })();
        result.context("in index::OpenOptions::create_in_memory")
    }
}
2295
/// Load root and checksum from the logical end.
///
/// `end` is the logical length of the index buffer; the reversed VLQ right
/// before it locates the Root (and optional Checksum) entries.
fn read_root_checksum_at_end(
    path: &Path,
    bytes: &[u8],
    end: usize,
) -> crate::Result<(MemRoot, MemChecksum)> {
    // root_offset      (root_offset + root_size)
    // v                v
    // |---root_entry---|---checksum_entry---|--reversed_vlq_size---|
    // |<--------- root_checksum_size ------>|
    // |<-- root_size ->|

    let buf = SimpleIndexBuf(&bytes, path);
    // Be careful! SimpleIndexBuf does not do data verification.
    // Handle integer range overflows here.
    let (root_checksum_size, vlq_size) =
        read_vlq_reverse(&bytes, end).context(path, "cannot read len(root+checksum)")?;

    // Verify the header byte.
    check_type(&buf, 0, TYPE_HEAD)?;

    // Reject sizes that would place root_offset before the start of the buffer.
    if end < root_checksum_size as usize + vlq_size {
        return Err(crate::Error::corruption(
            path,
            format!(
                "data corrupted at {} (invalid size: {})",
                end, root_checksum_size
            ),
        ));
    }

    let root_offset = end - root_checksum_size as usize - vlq_size;
    let (root, root_size) = MemRoot::read_from(&buf, root_offset as u64)?;

    let checksum = if root_offset + root_size + vlq_size == end {
        // No checksum - checksum disabled.
        MemChecksum::default()
    } else {
        let (checksum, _checksum_len) =
            MemChecksum::read_from(&buf, (root_offset + root_size) as u64)?;

        // Not checking lengths here:
        //
        // root_offset + root_size + checksum_len + vlq_size == end
        // so we can add new kinds of data in the future without breaking
        // older clients, similar to how Checksum gets added while
        // maintaining compatibility.

        // Verify the Root entry, since SimpleIndexBuf skips checksum check.
        // Checksum is self-verified. So no need to check it.
        checksum
            .check_range(bytes, root_offset as u64, root_size as u64)
            .context(path, "failed to verify Root entry")?;
        checksum
    };

    Ok((root, checksum))
}
2354
2355impl fmt::Debug for OpenOptions {
2356    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2357        write!(f, "OpenOptions {{ ")?;
2358        write!(
2359            f,
2360            "checksum_chunk_size_logarithm: {}, ",
2361            self.checksum_chunk_size_logarithm
2362        )?;
2363        write!(f, "fsync: {}, ", self.fsync)?;
2364        write!(f, "len: {:?}, ", self.len)?;
2365        write!(f, "write: {:?}, ", self.write)?;
2366        let key_buf_desc = match self.key_buf {
2367            Some(ref _buf) => "Some(_)",
2368            None => "None",
2369        };
2370        write!(f, "key_buf: {} }}", key_buf_desc)?;
2371        Ok(())
2372    }
2373}
2374
2375/// A subset of Index features for read-only accesses.
2376/// - Provides the main buffer, immutable data serialized on-disk.
2377/// - Provides the optional checksum checker.
2378/// - Provides the path (for error message).
2379trait IndexBuf {
2380    fn buf(&self) -> &[u8];
2381    fn path(&self) -> &Path;
2382
2383    /// Verify checksum for the given range. Internal API used by `*Offset` structs.
2384    fn verify_checksum(&self, start: u64, length: u64) -> crate::Result<()>;
2385
2386    #[inline(never)]
2387    fn range_error(&self, start: usize, length: usize) -> crate::Error {
2388        self.corruption(format!(
2389            "byte range {}..{} is unavailable",
2390            start,
2391            start + length
2392        ))
2393    }
2394
2395    #[inline(never)]
2396    fn corruption(&self, message: impl ToString) -> crate::Error {
2397        crate::Error::corruption(self.path(), message)
2398    }
2399}
2400
2401impl<T: IndexBuf> IndexBuf for &T {
2402    fn buf(&self) -> &[u8] {
2403        T::buf(self)
2404    }
2405    fn verify_checksum(&self, start: u64, length: u64) -> crate::Result<()> {
2406        T::verify_checksum(self, start, length)
2407    }
2408    fn path(&self) -> &Path {
2409        T::path(self)
2410    }
2411}
2412
2413impl IndexBuf for Index {
2414    fn buf(&self) -> &[u8] {
2415        &self.buf
2416    }
2417    fn verify_checksum(&self, offset: u64, length: u64) -> crate::Result<()> {
2418        self.checksum
2419            .check_range(&self.buf, offset, length)
2420            .context(&self.path, || format!("Index path = {:?}", self.path))
2421    }
2422    fn path(&self) -> &Path {
2423        &self.path
2424    }
2425}
2426
2427// Intentionally not inlined. This affects the "index lookup (disk, verified)"
2428// benchmark. It takes 74ms with this function inlined, and 61ms without.
2429//
2430// Reduce instruction count in `Index::verify_checksum`.
2431#[inline(never)]
2432fn checksum_error(checksum: &MemChecksum, offset: u64, length: u64) -> io::Result<()> {
2433    Err(io::Error::new(
2434        io::ErrorKind::InvalidData,
2435        format!(
2436            "range {}..{} failed checksum check ({:?})",
2437            offset,
2438            offset + length,
2439            &checksum,
2440        ),
2441    ))
2442}
2443
2444impl Index {
2445    /// Return a cloned [`Index`] with pending in-memory changes.
2446    pub fn try_clone(&self) -> crate::Result<Self> {
2447        self.try_clone_internal(true)
2448            .context("in Index::try_clone")
2449            .context(|| format!("  Index.path = {:?}", self.path))
2450    }
2451
2452    /// Return a cloned [`Index`] without pending in-memory changes.
2453    ///
2454    /// This is logically equivalent to calling `clear_dirty` immediately
2455    /// on the result after `try_clone`, but potentially cheaper.
2456    pub fn try_clone_without_dirty(&self) -> crate::Result<Self> {
2457        self.try_clone_internal(false)
2458            .context("in Index::try_clone_without_dirty")
2459            .context(|| format!("  Index.path = {:?}", self.path))
2460    }
2461
2462    pub(crate) fn try_clone_internal(&self, copy_dirty: bool) -> crate::Result<Index> {
2463        let file = match &self.file {
2464            Some(f) => Some(f.duplicate().context(self.path(), "cannot duplicate")?),
2465            None => None,
2466        };
2467
2468        let index = if copy_dirty {
2469            Index {
2470                file,
2471                buf: self.buf.clone(),
2472                path: self.path.clone(),
2473                checksum_enabled: self.checksum_enabled,
2474                checksum_max_chain_len: self.checksum_max_chain_len,
2475                fsync: self.fsync,
2476                write: self.write,
2477                clean_root: self.clean_root.clone(),
2478                dirty_root: self.dirty_root.clone(),
2479                checksum: self.checksum.clone(),
2480                dirty_keys: self.dirty_keys.clone(),
2481                dirty_ext_keys: self.dirty_ext_keys.clone(),
2482                dirty_leafs: self.dirty_leafs.clone(),
2483                dirty_links: self.dirty_links.clone(),
2484                dirty_radixes: self.dirty_radixes.clone(),
2485                key_buf: self.key_buf.clone(),
2486            }
2487        } else {
2488            Index {
2489                file,
2490                buf: self.buf.clone(),
2491                path: self.path.clone(),
2492                checksum_enabled: self.checksum_enabled,
2493                checksum_max_chain_len: self.checksum_max_chain_len,
2494                fsync: self.fsync,
2495                write: self.write,
2496                clean_root: self.clean_root.clone(),
2497                dirty_root: self.clean_root.clone(),
2498                checksum: self.checksum.clone(),
2499                dirty_keys: Vec::new(),
2500                dirty_ext_keys: Vec::new(),
2501                dirty_leafs: Vec::new(),
2502                dirty_links: Vec::new(),
2503                dirty_radixes: if self.clean_root.radix_offset.is_dirty() {
2504                    // See `clear_dirty` for this special case.
2505                    vec![MemRadix::default()]
2506                } else {
2507                    Vec::new()
2508                },
2509                key_buf: self.key_buf.clone(),
2510            }
2511        };
2512
2513        Ok(index)
2514    }
2515
2516    /// Get metadata attached to the root node. This is what previously set by
2517    /// [Index::set_meta].
2518    pub fn get_meta(&self) -> &[u8] {
2519        &self.dirty_root.meta
2520    }
2521
2522    /// Get metadata attached to the root node at file open time. This is what
2523    /// stored on the filesystem at the index open time, not affected by
2524    /// [`Index::set_meta`].
2525    pub fn get_original_meta(&self) -> &[u8] {
2526        &self.clean_root.meta
2527    }
2528
2529    /// Set metadata attached to the root node. Will be written at
2530    /// [`Index::flush`] time.
2531    pub fn set_meta<B: AsRef<[u8]>>(&mut self, meta: B) {
2532        self.dirty_root.meta = meta.as_ref().to_vec().into_boxed_slice()
2533    }
2534
2535    /// Remove dirty (in-memory) state. Restore the [`Index`] to the state as
2536    /// if it's just loaded from disk without modifications.
2537    pub fn clear_dirty(&mut self) {
2538        self.dirty_root = self.clean_root.clone();
2539        self.dirty_radixes.clear();
2540        if self.dirty_root.radix_offset.is_dirty() {
2541            // In case the disk buffer is empty, a "dirty radix" entry
2542            // is created automatically. Check OpenOptions::open for
2543            // details.
2544            assert_eq!(
2545                self.dirty_root.radix_offset,
2546                RadixOffset::from_dirty_index(0)
2547            );
2548            self.dirty_radixes.push(MemRadix::default());
2549        }
2550        self.dirty_leafs.clear();
2551        self.dirty_links.clear();
2552        self.dirty_keys.clear();
2553        self.dirty_ext_keys.clear();
2554    }
2555
    /// Flush changes to disk.
    ///
    /// Take the file lock when writing.
    ///
    /// Return 0 if nothing needs to be written. Otherwise return the new file
    /// length on success. Return [`io::ErrorKind::PermissionDenied`] if the file
    /// was marked read-only at open time.
    ///
    /// The new file length can be used to obtain the exact same view of the
    /// index as it currently is. That means, other changes to the indexes won't
    /// be "combined" during flush. For example, given the following events
    /// happened in order:
    /// - Open. Get Index X.
    /// - Open using the same arguments. Get Index Y.
    /// - Write key "p" to X.
    /// - Write key "q" to Y.
    /// - Flush X. Get new length LX.
    /// - Flush Y. Get new length LY.
    /// - Open using LY as `logical_len`. Get Index Z.
    ///
    /// Then key "p" does not exist in Z. This allows some advanced use cases.
    /// On the other hand, if "merging changes" is the desired behavior, the
    /// caller needs to take another lock, re-instantiate [`Index`] and re-insert
    /// keys.
    ///
    /// For in-memory-only indexes, this function does nothing and returns 0,
    /// unless read-only was set at open time.
    pub fn flush(&mut self) -> crate::Result<u64> {
        let result: crate::Result<_> = (|| {
            let span = debug_span!("Index::flush", path = self.path.to_string_lossy().as_ref());
            let _guard = span.enter();

            // Read-only indexes cannot be flushed.
            if self.write == Some(false) {
                return Err(crate::Error::path(
                    self.path(),
                    "cannot flush: Index opened with read-only mode",
                ));
            }
            if self.file.is_none() {
                // Why is this Ok, not Err?
                //
                // An in-memory Index does not share data with anybody else,
                // therefore no need to flush. In other words, whether flush
                // happens or not does not change the result of other APIs on
                // this Index instance.
                //
                // Another way to think about it, an in-memory Index is similar
                // to a private anonymous mmap, and msync on that mmap would
                // succeed.
                return Ok(0);
            }

            let old_len = self.buf.len() as u64;
            let mut new_len = old_len;
            // Fast path: no dirty state to write out.
            if self.dirty_root == self.clean_root && !self.dirty_root.radix_offset.is_dirty() {
                // Nothing changed
                return Ok(new_len);
            }

            // Critical section: need write lock
            {
                // Records the final on-disk offset assigned to each dirty
                // entry as it is serialized, so later entries can refer to
                // earlier ones by offset.
                let mut offset_map = OffsetMap::empty_for_index(self);
                // Heuristic capacity estimate (~50 bytes per dirty link).
                let estimated_dirty_bytes = self.dirty_links.len() * 50;
                let path = self.path.clone(); // for error messages; and make the borrowck happy.
                let mut lock = ScopedFileLock::new(self.file.as_mut().unwrap(), true)
                    .context(&path, "cannot lock")?;
                // `len` is the current on-disk size; new data is appended at it.
                let len = lock
                    .as_mut()
                    .seek(SeekFrom::End(0))
                    .context(&path, "cannot seek to end")?;
                if len < old_len {
                    let message = format!(
                        "on-disk index is unexpectedly smaller ({} bytes) than its previous version ({} bytes)",
                        len, old_len
                    );
                    // This is not a "corruption" - something has truncated the
                    // file, potentially recreating it. We haven't checked the
                    // new content, so it's not considered as "data corruption".
                    // TODO: Review this decision.
                    let err = crate::Error::path(&path, message);
                    return Err(err);
                }

                let mut buf = Vec::with_capacity(estimated_dirty_bytes);

                // Write in the following order:
                // header, keys, links, leafs, radixes, root.
                // Latter entries depend on former entries.

                if len == 0 {
                    buf.write_all(&[TYPE_HEAD]).infallible()?;
                }

                for (i, entry) in self.dirty_keys.iter().enumerate() {
                    if !entry.is_unused() {
                        let offset = buf.len() as u64 + len;
                        offset_map.key_map[i] = offset;
                        entry.write_to(&mut buf, &offset_map).infallible()?;
                    };
                }

                // Inlined leafs. They might affect ExtKeys and Links. Need to write first.
                for i in 0..self.dirty_leafs.len() {
                    let entry = self.dirty_leafs.get_mut(i).unwrap();
                    let offset = buf.len() as u64 + len;
                    if !entry.is_unused()
                        && entry.maybe_write_inline_to(
                            &mut buf,
                            &self.buf,
                            len,
                            &mut self.dirty_ext_keys,
                            &mut self.dirty_links,
                            &mut offset_map,
                        )?
                    {
                        // Inlined successfully: record the offset and make sure
                        // the later non-inlined pass skips this entry.
                        offset_map.leaf_map[i] = offset;
                        entry.mark_unused();
                    }
                }

                for (i, entry) in self.dirty_ext_keys.iter().enumerate() {
                    if !entry.is_unused() {
                        let offset = buf.len() as u64 + len;
                        offset_map.ext_key_map[i] = offset;
                        entry.write_to(&mut buf, &offset_map).infallible()?;
                    }
                }

                for (i, entry) in self.dirty_links.iter().enumerate() {
                    if !entry.is_unused() {
                        let offset = buf.len() as u64 + len;
                        offset_map.link_map[i] = offset;
                        entry.write_to(&mut buf, &offset_map).infallible()?;
                    }
                }

                // Non-inlined leafs.
                for (i, entry) in self.dirty_leafs.iter().enumerate() {
                    if !entry.is_unused() {
                        let offset = buf.len() as u64 + len;
                        offset_map.leaf_map[i] = offset;
                        entry
                            .write_noninline_to(&mut buf, &offset_map)
                            .infallible()?;
                    }
                }

                // Write Radix entries in reversed order since former ones might refer to latter ones.
                for (i, entry) in self.dirty_radixes.iter().rev().enumerate() {
                    let offset = buf.len() as u64 + len;
                    entry.write_to(&mut buf, &offset_map).infallible()?;
                    offset_map.radix_map[i] = offset;
                }

                // Write Root.
                let root_len = self
                    .dirty_root
                    .write_to(&mut buf, &offset_map)
                    .infallible()?;

                // Update and write Checksum if it's enabled.
                let mut new_checksum = self.checksum.clone();
                let checksum_len = if self.checksum_enabled {
                    new_checksum
                        .update(&self.buf, lock.as_mut(), len, &buf)
                        .context(&path, "cannot read and update checksum")?;
                    // Optionally merge the checksum entry for optimization.
                    if self.checksum_max_chain_len > 0
                        && new_checksum.chain_len >= self.checksum_max_chain_len
                    {
                        new_checksum.flatten();
                    }
                    new_checksum.write_to(&mut buf, &offset_map).infallible()?
                } else {
                    assert!(!self.checksum.is_enabled());
                    0
                };
                // The reversed VLQ at the very end lets a reader locate the
                // ROOT entry by scanning backwards from EOF (see the file
                // format description at the top of this file).
                write_reversed_vlq(&mut buf, root_len + checksum_len).infallible()?;

                new_len = buf.len() as u64 + len;
                lock.as_mut()
                    .seek(SeekFrom::Start(len))
                    .context(&path, "cannot seek")?;
                lock.as_mut()
                    .write_all(&buf)
                    .context(&path, "cannot write new data to index")?;

                if self.fsync || config::get_global_fsync() {
                    lock.as_mut().sync_all().context(&path, "cannot sync")?;
                }

                // Remap and update root since length has changed
                let bytes = mmap_bytes(lock.as_ref(), None).context(&path, "cannot mmap")?;
                self.buf = bytes;

                // 'path' should not have changed.
                debug_assert_eq!(&self.path, &path);

                // This is to workaround the borrow checker.
                let this = SimpleIndexBuf(&self.buf, &path);

                // Sanity check - the length should be expected. Otherwise, the lock
                // is somehow ineffective.
                if self.buf.len() as u64 != new_len {
                    return Err(this.corruption("file changed unexpectedly"));
                }

                // Reload root and checksum.
                let (root, checksum) =
                    read_root_checksum_at_end(&path, &self.buf, new_len as usize)?;

                // The re-read state should match what was just written.
                debug_assert_eq!(checksum.end, new_checksum.end);
                debug_assert_eq!(&checksum.xxhash_list, &new_checksum.xxhash_list);
                self.checksum = checksum;
                self.clean_root = root;
            }

            // Outside critical section
            self.clear_dirty();

            if cfg!(all(debug_assertions, test)) {
                self.verify()
                    .expect("sync() should not break checksum check");
            }

            Ok(new_len)
        })();
        result
            .context("in Index::flush")
            .context(|| format!("  Index.path = {:?}", self.path))
    }
2787
2788    /// Lookup by `key`. Return [`LinkOffset`].
2789    ///
2790    /// To test if the key exists or not, use [Offset::is_null].
2791    /// To obtain all values, use [`LinkOffset::values`].
2792    pub fn get<K: AsRef<[u8]>>(&self, key: &K) -> crate::Result<LinkOffset> {
2793        let result: crate::Result<_> = (|| {
2794            let mut offset: Offset = self.dirty_root.radix_offset.into();
2795            let mut iter = Base16Iter::from_base256(key);
2796
2797            while !offset.is_null() {
2798                // Read the entry at "offset"
2799                match offset.to_typed(self)? {
2800                    TypedOffset::Radix(radix) => {
2801                        match iter.next() {
2802                            None => {
2803                                // The key ends at this Radix entry.
2804                                return radix.link_offset(self);
2805                            }
2806                            Some(x) => {
2807                                // Follow the `x`-th child in the Radix entry.
2808                                offset = radix.child(self, x)?;
2809                            }
2810                        }
2811                    }
2812                    TypedOffset::Leaf(leaf) => {
2813                        // Meet a leaf. If key matches, return the link offset.
2814                        let (stored_key, link_offset) = leaf.key_and_link_offset(self)?;
2815                        if stored_key == key.as_ref() {
2816                            return Ok(link_offset);
2817                        } else {
2818                            return Ok(LinkOffset::default());
2819                        }
2820                    }
2821                    _ => return Err(self.corruption("unexpected type during key lookup")),
2822                }
2823            }
2824
2825            // Not found
2826            Ok(LinkOffset::default())
2827        })();
2828
2829        result
2830            .context(|| format!("in Index::get({:?})", key.as_ref()))
2831            .context(|| format!("  Index.path = {:?}", self.path))
2832    }
2833
    /// Scan entries which match the given prefix in base16 form.
    /// Return [`RangeIter`] which allows accesses to keys and values.
    pub fn scan_prefix_base16(
        &self,
        mut base16: impl Iterator<Item = u8>,
    ) -> crate::Result<RangeIter> {
        let mut offset: Offset = self.dirty_root.radix_offset.into();
        // Path of iterator states from the root down to the current entry.
        let mut front_stack = Vec::<IterState>::new();

        while !offset.is_null() {
            // Read the entry at "offset"
            match offset.to_typed(self)? {
                TypedOffset::Radix(radix) => {
                    match base16.next() {
                        None => {
                            // The prefix is fully consumed at a Radix entry:
                            // everything under this Radix matches. The range
                            // spans from the Radix's start to its end; the
                            // back stack shares the path but ends differently.
                            let start = IterState::RadixStart(radix);
                            let end = IterState::RadixEnd(radix);
                            front_stack.push(start);
                            let mut back_stack = front_stack.clone();
                            *back_stack.last_mut().unwrap() = end;
                            return Ok(RangeIter::new(self, front_stack, back_stack));
                        }
                        Some(x) => {
                            // Follow the `x`-th child in the Radix entry.
                            front_stack.push(IterState::RadixChild(radix, x));
                            offset = radix.child(self, x)?;
                        }
                    }
                }
                TypedOffset::Leaf(leaf) => {
                    // Meet a leaf. If key matches, return the link offset.
                    let eq = {
                        let (stored_key, _link_offset) = leaf.key_and_link_offset(self)?;
                        // Remaining key matches?
                        // `front_stack.len()` base16 digits were already
                        // matched by the Radix descent above, so compare only
                        // the remaining prefix digits against the stored key.
                        let remaining: Vec<u8> = base16.collect();
                        Base16Iter::from_base256(&stored_key)
                            .skip(front_stack.len())
                            .take(remaining.len())
                            .eq(remaining.iter().cloned())
                    };
                    if eq {
                        // Single match: a one-leaf range.
                        let start = IterState::LeafStart(leaf);
                        let end = IterState::LeafEnd(leaf);
                        front_stack.push(start);
                        let mut back_stack = front_stack.clone();
                        *back_stack.last_mut().unwrap() = end;
                        return Ok(RangeIter::new(self, front_stack, back_stack));
                    } else {
                        // Mismatch: an empty range (front == back).
                        return Ok(RangeIter::new(self, front_stack.clone(), front_stack));
                    };
                }
                _ => return Err(self.corruption("unexpected type during prefix scan")),
            }
        }

        // Not found: an empty range (front == back).
        Ok(RangeIter::new(self, front_stack.clone(), front_stack))
    }
2892
2893    /// Scan entries which match the given prefix in base256 form.
2894    /// Return [`RangeIter`] which allows accesses to keys and values.
2895    pub fn scan_prefix<B: AsRef<[u8]>>(&self, prefix: B) -> crate::Result<RangeIter> {
2896        self.scan_prefix_base16(Base16Iter::from_base256(&prefix))
2897            .context(|| format!("in Index::scan_prefix({:?})", prefix.as_ref()))
2898            .context(|| format!("  Index.path = {:?}", self.path))
2899    }
2900
2901    /// Scan entries which match the given prefix in hex form.
2902    /// Return [`RangeIter`] which allows accesses to keys and values.
2903    pub fn scan_prefix_hex<B: AsRef<[u8]>>(&self, prefix: B) -> crate::Result<RangeIter> {
2904        // Invalid hex chars will be caught by `radix.child`
2905        let base16 = prefix.as_ref().iter().cloned().map(single_hex_to_base16);
2906        self.scan_prefix_base16(base16)
2907            .context(|| format!("in Index::scan_prefix_hex({:?})", prefix.as_ref()))
2908            .context(|| format!("  Index.path = {:?}", self.path))
2909    }
2910
2911    /// Scans entries whose keys are within the given range.
2912    ///
2913    /// Returns a double-ended iterator, which provides accesses to keys and
2914    /// values.
2915    pub fn range<'a>(&self, range: impl RangeBounds<&'a [u8]>) -> crate::Result<RangeIter> {
2916        let is_empty_range = match (range.start_bound(), range.end_bound()) {
2917            (Included(start), Included(end)) => start > end,
2918            (Included(start), Excluded(end)) => start > end,
2919            (Excluded(start), Included(end)) => start > end,
2920            (Excluded(start), Excluded(end)) => start >= end,
2921            (Unbounded, _) | (_, Unbounded) => false,
2922        };
2923
2924        if is_empty_range {
2925            // `BTreeSet::range` panics in this case. Match its behavior.
2926            panic!("range start is greater than range end");
2927        }
2928
2929        let result: crate::Result<_> = (|| {
2930            let front_stack = self.iter_stack_by_bound(range.start_bound(), Front)?;
2931            let back_stack = self.iter_stack_by_bound(range.end_bound(), Back)?;
2932            Ok(RangeIter::new(self, front_stack, back_stack))
2933        })();
2934
2935        result
2936            .context(|| {
2937                format!(
2938                    "in Index::range({:?} to {:?})",
2939                    range.start_bound(),
2940                    range.end_bound()
2941                )
2942            })
2943            .context(|| format!("  Index.path = {:?}", self.path))
2944    }
2945
2946    /// Insert a key-value pair. The value will be the head of the linked list.
2947    /// That is, `get(key).values().first()` will return the newly inserted
2948    /// value.
2949    pub fn insert<K: AsRef<[u8]>>(&mut self, key: &K, value: u64) -> crate::Result<()> {
2950        self.insert_advanced(InsertKey::Embed(key.as_ref()), InsertValue::Prepend(value))
2951            .context(|| format!("in Index::insert(key={:?}, value={})", key.as_ref(), value))
2952            .context(|| format!("  Index.path = {:?}", self.path))
2953    }
2954
2955    /// Remove all values associated with the given key.
2956    pub fn remove(&mut self, key: impl AsRef<[u8]>) -> crate::Result<()> {
2957        // NOTE: The implementation detail does not remove radix entries to
2958        // reclaim space or improve lookup performance. For example, inserting
2959        // "abcde" to an empty index, following by deleting it will still
2960        // require O(5) jumps looking up "abcde".
2961        let key = key.as_ref();
2962        self.insert_advanced(InsertKey::Embed(key), InsertValue::Tombstone)
2963            .context(|| format!("in Index::remove(key={:?})", key))
2964            .context(|| format!("  Index.path = {:?}", self.path))
2965    }
2966
2967    /// Remove all values associated with all keys with the given prefix.
2968    pub fn remove_prefix(&mut self, prefix: impl AsRef<[u8]>) -> crate::Result<()> {
2969        // NOTE: See "remove". The implementation detail does not optimize
2970        // for space or lookup performance.
2971        let prefix = prefix.as_ref();
2972        self.insert_advanced(InsertKey::Embed(prefix), InsertValue::TombstonePrefix)
2973            .context(|| format!("in Index::remove_prefix(prefix={:?})", prefix))
2974            .context(|| format!("  Index.path = {:?}", self.path))
2975    }
2976
2977    /// Update the linked list for a given key.
2978    ///
2979    /// If `link` is None, behave like `insert`. Otherwise, ignore the existing
2980    /// values `key` mapped to, create a new link entry that chains to the given
2981    /// [`LinkOffset`].
2982    ///
2983    /// `key` could be a reference, or an embedded value. See [`InsertKey`] for
2984    /// details.
2985    ///
2986    /// This is a low-level API.
2987    pub fn insert_advanced(&mut self, key: InsertKey, value: InsertValue) -> crate::Result<()> {
2988        let mut offset: Offset = self.dirty_root.radix_offset.into();
2989        let mut step = 0;
2990        let (key, key_buf_offset) = match key {
2991            InsertKey::Embed(k) => (k, None),
2992            InsertKey::Reference((start, len)) => {
2993                let key = match self.key_buf.as_ref().slice(start, len) {
2994                    Some(k) => k,
2995                    None => {
2996                        return Err(
2997                            self.corruption("key buffer is invalid when inserting referred keys")
2998                        );
2999                    }
3000                };
3001                // UNSAFE NOTICE: `key` is valid as long as `self.key_buf` is valid. `self.key_buf`
3002                // won't be changed. So `self` can still be mutable without a read-only
3003                // relationship with `key`.
3004                let detached_key = unsafe { &*(key as *const [u8]) };
3005                (detached_key, Some((start, len)))
3006            }
3007        };
3008        let mut iter = Base16Iter::from_base256(&key);
3009
3010        let mut last_radix = RadixOffset::default();
3011        let mut last_child = 0u8;
3012
3013        loop {
3014            match offset.to_typed(&*self)? {
3015                TypedOffset::Radix(radix) => {
3016                    // Copy radix entry since we must modify it.
3017                    let radix = radix.copy(self)?;
3018                    offset = radix.into();
3019
3020                    if step == 0 {
3021                        self.dirty_root.radix_offset = radix;
3022                    } else {
3023                        last_radix.set_child(self, last_child, offset);
3024                    }
3025
3026                    last_radix = radix;
3027
3028                    match iter.next() {
3029                        None => {
3030                            // "key" is shorter than existing ones. No need to create a new key.
3031                            // For example, insert "a", when root.radix is {'a': {'b': { ... }}}.
3032                            let old_link_offset = radix.link_offset(self)?;
3033                            let new_link_offset = match value {
3034                                InsertValue::Prepend(value) => old_link_offset.create(self, value),
3035                                InsertValue::PrependReplace(value, link_offset) => {
3036                                    link_offset.create(self, value)
3037                                }
3038                                InsertValue::Tombstone => LinkOffset::default(),
3039                                InsertValue::TombstonePrefix => {
3040                                    radix.set_all_to_null(self);
3041                                    return Ok(());
3042                                }
3043                            };
3044                            radix.set_link(self, new_link_offset);
3045                            return Ok(());
3046                        }
3047                        Some(x) => {
3048                            let next_offset = radix.child(self, x)?;
3049                            if next_offset.is_null() {
3050                                // "key" is longer than existing ones. Create key and leaf entries.
3051                                // For example, insert "abcd", when root.radix is {'a': {}}.
3052                                let new_link_offset = match value {
3053                                    InsertValue::Prepend(value) => {
3054                                        LinkOffset::default().create(self, value)
3055                                    }
3056                                    InsertValue::PrependReplace(value, link_offset) => {
3057                                        link_offset.create(self, value)
3058                                    }
3059                                    InsertValue::Tombstone | InsertValue::TombstonePrefix => {
3060                                        // No need to create a key.
3061                                        radix.set_child(self, x, Offset::null());
3062                                        return Ok(());
3063                                    }
3064                                };
3065                                let key_offset = self.create_key(key, key_buf_offset);
3066                                let leaf_offset =
3067                                    LeafOffset::create(self, new_link_offset, key_offset);
3068                                radix.set_child(self, x, leaf_offset.into());
3069                                return Ok(());
3070                            } else {
3071                                offset = next_offset;
3072                                last_child = x;
3073                            }
3074                        }
3075                    }
3076                }
3077                TypedOffset::Leaf(leaf) => {
3078                    let (old_key, old_link_offset) = {
3079                        let (old_key, link_offset) = leaf.key_and_link_offset(self)?;
3080                        // Detach "old_key" from "self".
3081                        // About safety: This is to avoid a memory copy / allocation.
3082                        // `old_key` are only valid before `dirty_*keys` being resized.
3083                        // `old_iter` (used by `split_leaf`) and `old_key` are not used
3084                        // after creating a key. So it's safe to not copy it.
3085                        let detached_key = unsafe { &*(old_key as *const [u8]) };
3086                        (detached_key, link_offset)
3087                    };
3088                    let matched = if let InsertValue::TombstonePrefix = value {
3089                        // Only test the prefix of old_key.
3090                        old_key.get(..key.len()) == Some(key)
3091                    } else {
3092                        old_key == key
3093                    };
3094                    if matched {
3095                        // Key matched. Need to copy leaf entry for modification, except for
3096                        // deletion.
3097                        let new_link_offset = match value {
3098                            InsertValue::Prepend(value) => old_link_offset.create(self, value),
3099                            InsertValue::PrependReplace(value, link_offset) => {
3100                                link_offset.create(self, value)
3101                            }
3102                            InsertValue::Tombstone | InsertValue::TombstonePrefix => {
3103                                // No need to copy the leaf entry.
3104                                last_radix.set_child(self, last_child, Offset::null());
3105                                return Ok(());
3106                            }
3107                        };
3108                        let new_leaf_offset = leaf.set_link(self, new_link_offset)?;
3109                        last_radix.set_child(self, last_child, new_leaf_offset.into());
3110                    } else {
3111                        // Key mismatch. Do a leaf split unless it's a deletion.
3112                        let new_link_offset = match value {
3113                            InsertValue::Prepend(value) => {
3114                                LinkOffset::default().create(self, value)
3115                            }
3116                            InsertValue::PrependReplace(value, link_offset) => {
3117                                link_offset.create(self, value)
3118                            }
3119                            InsertValue::Tombstone | InsertValue::TombstonePrefix => return Ok(()),
3120                        };
3121                        self.split_leaf(
3122                            leaf,
3123                            old_key,
3124                            key.as_ref(),
3125                            key_buf_offset,
3126                            step,
3127                            last_radix,
3128                            last_child,
3129                            old_link_offset,
3130                            new_link_offset,
3131                        )?;
3132                    }
3133                    return Ok(());
3134                }
3135                _ => return Err(self.corruption("unexpected type during insertion")),
3136            }
3137
3138            step += 1;
3139        }
3140    }
3141
3142    /// Convert a slice to [`Bytes`].
3143    /// Do not copy the slice if it's from the on-disk buffer.
3144    pub fn slice_to_bytes(&self, slice: &[u8]) -> Bytes {
3145        self.buf.slice_to_bytes(slice)
3146    }
3147
3148    /// Verify checksum for the entire on-disk buffer.
3149    pub fn verify(&self) -> crate::Result<()> {
3150        self.verify_checksum(0, self.checksum.end)
3151    }
3152
    // Internal function used by [`Index::range`].
    // Calculate the [`IterState`] stack used by [`RangeIter`].
    // `side` is the side of the `bound`, starting side of the iteration,
    // the opposite of "towards" side.
    //
    // The returned stack records the path from the root radix down to the
    // entry (or empty slot) where iteration should start for `bound`.
    fn iter_stack_by_bound(
        &self,
        bound: Bound<&&[u8]>,
        side: Side,
    ) -> crate::Result<Vec<IterState>> {
        let root_radix = self.dirty_root.radix_offset;
        let (inclusive, mut base16iter) = match bound {
            Unbounded => {
                // No bound: start at the very beginning (Front) or the very
                // end (Back) of the root radix entry.
                return Ok(match side {
                    Front => vec![IterState::RadixStart(root_radix)],
                    Back => vec![IterState::RadixEnd(root_radix)],
                });
            }
            // Bounded: walk the bound key nibble by nibble (base16) below.
            Included(ref key) => (true, Base16Iter::from_base256(key)),
            Excluded(ref key) => (false, Base16Iter::from_base256(key)),
        };

        let mut offset: Offset = root_radix.into();
        let mut stack = Vec::<IterState>::new();

        // Follow the bound key down the radix tree, pushing one IterState
        // per visited entry.
        while !offset.is_null() {
            match offset.to_typed(self)? {
                TypedOffset::Radix(radix) => match base16iter.next() {
                    None => {
                        // The key ends at this Radix entry.
                        let state = IterState::RadixLeaf(radix);
                        // Inclusive bounds step once so values stored exactly
                        // at the bound key become part of the range;
                        // exclusive bounds leave the state as-is.
                        let state = if inclusive {
                            state.step(side).unwrap()
                        } else {
                            state
                        };
                        stack.push(state);
                        return Ok(stack);
                    }
                    Some(x) => {
                        // Follow the `x`-th child in the Radix entry.
                        stack.push(IterState::RadixChild(radix, x));
                        offset = radix.child(self, x)?;
                    }
                },
                TypedOffset::Leaf(leaf) => {
                    // Reached a leaf before exhausting the bound key.
                    // Compare the remainder of the stored key with the
                    // remainder of the bound key. `stack.len()` nibbles have
                    // already been consumed on the way down, one per pushed
                    // state.
                    let stored_cmp_key = {
                        let (stored_key, _link_offset) = leaf.key_and_link_offset(self)?;
                        Base16Iter::from_base256(&stored_key)
                            .skip(stack.len())
                            .cmp(base16iter)
                    };
                    let state = IterState::Leaf(leaf);
                    // Decide whether to step over the leaf based on how the
                    // stored key compares with the bound and which side the
                    // iteration starts from.
                    let state = match (stored_cmp_key, side, inclusive) {
                        (Equal, _, true) | (Less, Back, _) | (Greater, Front, _) => {
                            state.step(side).unwrap()
                        }
                        (Equal, _, false) | (Greater, Back, _) | (Less, Front, _) => state,
                    };
                    stack.push(state);
                    return Ok(stack);
                }
                _ => return Err(self.corruption("unexpected type following prefix")),
            }
        }

        // Prefix does not exist. The stack ends with a RadixChild state that
        // points to nothing.
        Ok(stack)
    }
3222
    #[allow(clippy::too_many_arguments)]
    /// Split a leaf entry. Separated from `insert_advanced` to make `insert_advanced`
    /// shorter.  The parameters are internal states inside `insert_advanced`. Calling this
    /// from other functions makes less sense.
    #[inline]
    fn split_leaf(
        &mut self,
        old_leaf_offset: LeafOffset,
        old_key: &[u8],
        new_key: &[u8],
        key_buf_offset: Option<(u64, u64)>,
        step: usize,
        radix_offset: RadixOffset,
        child: u8,
        old_link_offset: LinkOffset,
        new_link_offset: LinkOffset,
    ) -> crate::Result<()> {
        // This is probably the most complex part. Here are some explanation about input parameters
        // and what this function is supposed to do for some cases:
        //
        // Input parameters are marked using `*`:
        //
        //      Offset            | Content
        //      root_radix        | Radix(child1: radix1, ...)         \
        //      radix1            | Radix(child2: radix2, ...)         |> steps
        //      ...               | ...                                | (for skipping check
        //      *radix_offset*    | Radix(*child*: *leaf_offset*, ...) /  of prefix in keys)
        //      *old_leaf_offset* | Leaf(link_offset: *old_link_offset*, ...)
        //      *new_link_offset* | Link(...)
        //
        //      old_* are redundant, but they are pre-calculated by the caller. So just reuse them.
        //
        // Here are 3 kinds of examples (Keys are embed in Leaf for simplicity):
        //
        // Example 1. old_key = "1234"; new_key = "1278".
        //
        //      Offset | Before                | After
        //           A | Radix(1: B)           | Radix(1: C)
        //           B | Leaf("1234", Link: X) | Leaf("1234", Link: X)
        //           C |                       | Radix(2: E)
        //           D |                       | Leaf("1278")
        //           E |                       | Radix(3: B, 7: D)
        //
        // Example 2. old_key = "1234", new_key = "12". No need for a new leaf entry:
        //
        //      Offset | Before                | After
        //           A | Radix(1: B)           | Radix(1: C)
        //           B | Leaf("1234", Link: X) | Leaf("1234", Link: X)
        //           C |                       | Radix(2: B, Link: Y)
        //
        // Example 3. old_key = "12", new_key = "1234". Need new leaf. Old leaf is not needed.
        //
        //      Offset | Before              | After
        //           A | Radix(1: B)         | Radix(1: C)
        //           B | Leaf("12", Link: X) | Leaf("12", Link: X) # not used
        //           C |                     | Radix(2: E, Link: X)
        //           D |                     | Leaf("1234", Link: Y)
        //           E |                     | Radix(3: D)

        // UNSAFE NOTICE: Read the "UNSAFE NOTICE" inside `insert_advanced` to learn more.
        // Basically, `old_iter` is only guaranteed available if there is no insertion to
        // `self.dirty_keys` or `self.dirty_ext_keys`. That's true here since we won't read
        // `old_iter` after creating new keys. But be aware of the constraint when modifying the
        // code.
        //
        // Skip the first `step` nibbles: the caller already verified both
        // keys agree on that prefix while walking down the tree.
        let mut old_iter = Base16Iter::from_base256(&old_key).skip(step);
        let mut new_iter = Base16Iter::from_base256(&new_key).skip(step);

        let mut last_radix_offset = radix_offset;
        let mut last_radix_child = child;

        // Set once the two keys diverge or one of them ends; ends the loop.
        let mut completed = false;

        // Each iteration creates one new Radix entry: one per nibble still
        // shared by both keys, plus a final one where the keys diverge
        // (Example 1) or where one key ends (Examples 2 and 3).
        loop {
            let b1 = old_iter.next();
            let b2 = new_iter.next();

            let mut radix = MemRadix::default();

            if let Some(b1) = b1 {
                // Initial value for the b1-th child. Could be rewritten by
                // "set_radix_entry_child" in the next loop iteration.
                radix.offsets[b1 as usize] = old_leaf_offset.into();
            } else {
                // Example 3. old_key is a prefix of new_key. A leaf is still needed.
                // The new leaf will be created by the next "if" block.
                old_leaf_offset.mark_unused(self);
                radix.link_offset = old_link_offset;
            }

            match b2 {
                None => {
                    // Example 2. new_key is a prefix of old_key. A new leaf is not needed.
                    radix.link_offset = new_link_offset;
                    completed = true;
                }
                Some(b2v) if b1 != b2 => {
                    // Example 1 and Example 3. A new leaf is needed.
                    let new_key_offset = self.create_key(new_key, key_buf_offset);
                    let new_leaf_offset = LeafOffset::create(self, new_link_offset, new_key_offset);
                    radix.offsets[b2v as usize] = new_leaf_offset.into();
                    completed = true;
                }
                _ => {}
            }

            // Create the Radix entry, and connect it to the parent entry.
            let offset = RadixOffset::create(self, radix);
            last_radix_offset.set_child(self, last_radix_child, offset.into());

            if completed {
                break;
            }

            // Keys still share this nibble: descend into the radix entry just
            // created and continue with the next nibble.
            debug_assert!(b1 == b2);
            last_radix_offset = offset;
            last_radix_child = b2.unwrap();
        }

        Ok(())
    }
3343
3344    /// Create a key (if key_buf_offset is None) or ext key (if key_buf_offset is set) entry.
3345    #[inline]
3346    fn create_key(&mut self, key: &[u8], key_buf_offset: Option<(u64, u64)>) -> Offset {
3347        match key_buf_offset {
3348            None => KeyOffset::create(self, key).into(),
3349            Some((start, len)) => ExtKeyOffset::create(self, start, len).into(),
3350        }
3351    }
3352}
3353
/// Specify value to insert. Used by `insert_advanced`.
#[derive(Copy, Clone)]
pub enum InsertValue {
    /// Insert the value as the new head of the key's existing linked list of
    /// values (most recently inserted value first).
    Prepend(u64),

    /// Replace the key's linked list with the given one, then insert the
    /// value as the new head.
    PrependReplace(u64, LinkOffset),

    /// Effectively delete associated values for the specified key. The key's
    /// child pointer is set to null; existing entries are left in place.
    Tombstone,

    /// Effectively delete associated values for all keys starting with the
    /// prefix.
    TombstonePrefix,
}
3370
3371//// Debug Formatter
3372
3373impl Debug for Offset {
3374    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3375        if self.is_null() {
3376            write!(f, "None")
3377        } else if self.is_dirty() {
3378            let path = Path::new("<dummy>");
3379            let dummy = SimpleIndexBuf(b"", &path);
3380            match self.to_typed(dummy).unwrap() {
3381                TypedOffset::Radix(x) => x.fmt(f),
3382                TypedOffset::Leaf(x) => x.fmt(f),
3383                TypedOffset::Link(x) => x.fmt(f),
3384                TypedOffset::Key(x) => x.fmt(f),
3385                TypedOffset::ExtKey(x) => x.fmt(f),
3386                TypedOffset::Checksum(x) => x.fmt(f),
3387            }
3388        } else {
3389            write!(f, "Disk[{}]", self.0)
3390        }
3391    }
3392}
3393
3394impl Debug for MemRadix {
3395    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3396        write!(f, "Radix {{ link: {:?}", self.link_offset)?;
3397        for (i, v) in self.offsets.iter().cloned().enumerate() {
3398            if !v.is_null() {
3399                write!(f, ", {}: {:?}", i, v)?;
3400            }
3401        }
3402        write!(f, " }}")
3403    }
3404}
3405
3406impl Debug for MemLeaf {
3407    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3408        if self.is_unused() {
3409            write!(f, "Leaf (unused)")
3410        } else {
3411            write!(
3412                f,
3413                "Leaf {{ key: {:?}, link: {:?} }}",
3414                self.key_offset, self.link_offset
3415            )
3416        }
3417    }
3418}
3419
3420impl Debug for MemLink {
3421    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3422        write!(
3423            f,
3424            "Link {{ value: {}, next: {:?} }}",
3425            self.value, self.next_link_offset
3426        )
3427    }
3428}
3429
3430impl Debug for MemKey {
3431    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3432        if self.is_unused() {
3433            write!(f, "Key (unused)")
3434        } else {
3435            write!(f, "Key {{ key:")?;
3436            for byte in self.key.iter() {
3437                write!(f, " {:X}", byte)?;
3438            }
3439            write!(f, " }}")
3440        }
3441    }
3442}
3443
3444impl Debug for MemExtKey {
3445    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3446        if self.is_unused() {
3447            write!(f, "ExtKey (unused)")
3448        } else {
3449            write!(f, "ExtKey {{ start: {}, len: {} }}", self.start, self.len)
3450        }
3451    }
3452}
3453
3454impl Debug for MemRoot {
3455    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3456        if self.meta.is_empty() {
3457            write!(f, "Root {{ radix: {:?} }}", self.radix_offset)
3458        } else {
3459            write!(
3460                f,
3461                "Root {{ radix: {:?}, meta: {:?} }}",
3462                self.radix_offset, self.meta
3463            )
3464        }
3465    }
3466}
3467impl Debug for MemChecksum {
3468    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
3469        write!(
3470            f,
3471            "Checksum {{ start: {}, end: {}, chunk_size_logarithm: {}, checksums.len(): {} }}",
3472            self.start,
3473            self.end,
3474            self.chunk_size_logarithm,
3475            self.xxhash_list_to_write().len(),
3476        )
3477    }
3478}
3479
impl Debug for Index {
    // Dump the whole index: the on-disk entries first, then the dirty
    // in-memory entries. While dumping, every on-disk entry is also
    // re-serialized into `buf`; at the end `buf` must byte-for-byte equal the
    // on-disk content, acting as a parse/serialize round-trip self-check.
    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
        writeln!(
            f,
            "Index {{ len: {}, root: {:?} }}",
            self.buf.len(),
            self.dirty_root.radix_offset
        )?;

        // On-disk entries
        let offset_map = OffsetMap::default();
        let mut buf = Vec::with_capacity(self.buf.len());
        // Offset 0 is the one-byte header, so entries start at offset 1.
        buf.push(TYPE_HEAD);
        let mut root_offset = 0;
        loop {
            // `buf` tracks how far re-serialization has progressed; its
            // length is the offset of the next entry to parse.
            let i = buf.len();
            if i >= self.buf.len() {
                break;
            }
            write!(f, "Disk[{}]: ", i)?;
            let type_int = self.buf[i];
            let i = i as u64;
            match type_int {
                TYPE_RADIX => {
                    let e = MemRadix::read_from(self, i).expect("read");
                    e.write_to(&mut buf, &offset_map).expect("write");
                    writeln!(f, "{:?}", e)?;
                }
                TYPE_LEAF => {
                    let e = MemLeaf::read_from(self, i).expect("read");
                    e.write_noninline_to(&mut buf, &offset_map).expect("write");
                    writeln!(f, "{:?}", e)?;
                }
                TYPE_INLINE_LEAF => {
                    let e = MemLeaf::read_from(self, i).expect("read");
                    writeln!(f, "Inline{:?}", e)?;
                    // Just skip the type int byte so we can parse inlined structures.
                    buf.push(TYPE_INLINE_LEAF);
                }
                TYPE_LINK => {
                    let e = MemLink::read_from(self, i).unwrap();
                    e.write_to(&mut buf, &offset_map).expect("write");
                    writeln!(f, "{:?}", e)?;
                }
                TYPE_KEY => {
                    let e = MemKey::read_from(self, i).expect("read");
                    e.write_to(&mut buf, &offset_map).expect("write");
                    writeln!(f, "{:?}", e)?;
                }
                TYPE_EXT_KEY => {
                    let e = MemExtKey::read_from(self, i).expect("read");
                    e.write_to(&mut buf, &offset_map).expect("write");
                    writeln!(f, "{:?}", e)?;
                }
                TYPE_ROOT => {
                    // Remember where the Root starts; the reversed-vlq length
                    // written later covers everything from here.
                    root_offset = i as usize;
                    let e = MemRoot::read_from(self, i).expect("read").0;
                    e.write_to(&mut buf, &offset_map).expect("write");
                    // Preview the next entry. If it's not Checksum, then it's in "legacy"
                    // format and we need to write "reversed_vlq" length of the Root entry
                    // immediately.
                    if self.buf.get(buf.len()) != Some(&TYPE_CHECKSUM)
                        || MemChecksum::read_from(&self, buf.len() as u64).is_err()
                    {
                        let root_len = buf.len() - root_offset;
                        write_reversed_vlq(&mut buf, root_len).expect("write");
                    }
                    writeln!(f, "{:?}", e)?;
                }
                TYPE_CHECKSUM => {
                    let e = MemChecksum::read_from(&self, i).expect("read").0;
                    e.write_to(&mut buf, &offset_map).expect("write");
                    // The reversed vlq records the combined Root + Checksum
                    // length so readers can locate the Root from the file end.
                    let root_checksum_len = buf.len() - root_offset;
                    let vlq_start = buf.len();
                    write_reversed_vlq(&mut buf, root_checksum_len).expect("write");
                    let vlq_end = buf.len();
                    debug_assert_eq!(
                        &buf[vlq_start..vlq_end],
                        &self.buf[vlq_start..vlq_end],
                        "reversed vlq should match (root+checksum len: {})",
                        root_checksum_len
                    );
                    writeln!(f, "{:?}", e)?;
                }
                _ => {
                    writeln!(f, "Broken Data!")?;
                    break;
                }
            }
        }

        // Round-trip self-check: the re-serialized bytes must match the
        // on-disk bytes exactly (skipped when nothing was parsed).
        if buf.len() > 1 && self.buf[..] != buf[..] {
            debug_assert_eq!(&self.buf[..], &buf[..]);
            return writeln!(f, "Inconsistent Data!");
        }

        // In-memory entries
        for (i, e) in self.dirty_radixes.iter().enumerate() {
            write!(f, "Radix[{}]: ", i)?;
            writeln!(f, "{:?}", e)?;
        }

        for (i, e) in self.dirty_leafs.iter().enumerate() {
            write!(f, "Leaf[{}]: ", i)?;
            writeln!(f, "{:?}", e)?;
        }

        for (i, e) in self.dirty_links.iter().enumerate() {
            write!(f, "Link[{}]: ", i)?;
            writeln!(f, "{:?}", e)?;
        }

        for (i, e) in self.dirty_keys.iter().enumerate() {
            write!(f, "Key[{}]: ", i)?;
            writeln!(f, "{:?}", e)?;
        }

        for (i, e) in self.dirty_ext_keys.iter().enumerate() {
            writeln!(f, "ExtKey[{}]: {:?}", i, e)?;
        }

        Ok(())
    }
}
3604
3605#[cfg(test)]
3606mod tests {
3607    use std::collections::BTreeSet;
3608    use std::collections::HashMap;
3609    use std::fs::File;
3610
3611    use quickcheck::quickcheck;
3612    use tempfile::tempdir;
3613
3614    use super::InsertValue::PrependReplace;
3615    use super::*;
3616
3617    fn open_opts() -> OpenOptions {
3618        let mut opts = OpenOptions::new();
3619        opts.checksum_chunk_size_logarithm(4);
3620        opts
3621    }
3622
3623    fn in_memory_index() -> Index {
3624        OpenOptions::new().create_in_memory().unwrap()
3625    }
3626
    // `scan_prefix` should return exactly the inserted keys that start with
    // the given byte prefix; `scan_prefix_hex` matches on hex-nibble prefixes
    // (so b"3" matches every key whose first nibble is 3, i.e. b'0'..).
    #[test]
    fn test_scan_prefix() {
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).unwrap();
        let keys: Vec<&[u8]> = vec![b"01", b"02", b"03", b"031", b"0410", b"042", b"05000"];
        for (i, key) in keys.iter().enumerate() {
            index.insert(key, i as u64).unwrap();
        }

        // Return keys with the given prefix. Also verify LinkOffsets.
        let scan_keys = |prefix: &[u8]| -> Vec<Vec<u8>> {
            let iter = index.scan_prefix(prefix).unwrap();
            iter_to_keys(&index, &keys, &iter)
        };

        assert_eq!(scan_keys(b"01"), vec![b"01"]);
        assert!(scan_keys(b"010").is_empty());
        assert_eq!(scan_keys(b"02"), vec![b"02"]);
        assert!(scan_keys(b"020").is_empty());
        assert_eq!(scan_keys(b"03"), vec![&b"03"[..], b"031"]);
        assert_eq!(scan_keys(b"031"), vec![b"031"]);
        assert!(scan_keys(b"032").is_empty());
        assert_eq!(scan_keys(b"04"), vec![&b"0410"[..], b"042"]);
        assert_eq!(scan_keys(b"041"), vec![b"0410"]);
        assert_eq!(scan_keys(b"0410"), vec![b"0410"]);
        assert!(scan_keys(b"04101").is_empty());
        assert!(scan_keys(b"0412").is_empty());
        assert_eq!(scan_keys(b"042"), vec![b"042"]);
        assert!(scan_keys(b"0421").is_empty());
        assert_eq!(scan_keys(b"05"), vec![b"05000"]);
        assert_eq!(scan_keys(b"0500"), vec![b"05000"]);
        assert_eq!(scan_keys(b"05000"), vec![b"05000"]);
        assert!(scan_keys(b"051").is_empty());
        assert_eq!(scan_keys(b"0"), keys);
        assert_eq!(scan_keys(b""), keys);
        assert!(scan_keys(b"1").is_empty());

        // 0x30 = b'0'
        assert_eq!(index.scan_prefix_hex(b"30").unwrap().count(), keys.len());
        assert_eq!(index.scan_prefix_hex(b"3").unwrap().count(), keys.len());
        assert_eq!(index.scan_prefix_hex(b"31").unwrap().count(), 0);
    }
3669
    // `remove` deletes a single key, both for in-memory ("dirty") entries and
    // after flushing to disk. Removing non-existent keys must be a no-op —
    // the Debug output comparison verifies that no new entries are created.
    #[test]
    fn test_remove() {
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).expect("open");

        // Removing keys on an empty index should not create new entries.
        let text = format!("{:?}", &index);
        index.remove("").unwrap();
        index.remove("a").unwrap();
        assert_eq!(text, format!("{:?}", &index));

        index.insert(b"abc", 42).unwrap();
        index.insert(b"abc", 43).unwrap();
        index.insert(b"abxyz", 44).unwrap();
        index.flush().unwrap();

        // Remove known keys.
        assert_eq!(index.range(..).unwrap().count(), 2);
        index.remove(b"abxyz").unwrap();
        assert_eq!(index.range(..).unwrap().count(), 1);
        index.remove(b"abc").unwrap();
        assert_eq!(index.range(..).unwrap().count(), 0);

        // Since all entries are "dirty" in memory, removing keys should not create new entries.
        let text = format!("{:?}", &index);
        index.remove("").unwrap();
        index.remove("a").unwrap();
        index.remove("ab").unwrap();
        index.remove("abc").unwrap();
        index.remove("abcd").unwrap();
        index.remove("abcde").unwrap();
        index.remove("abcx").unwrap();
        index.remove("abcxyz").unwrap();
        index.remove("abcxyzz").unwrap();
        assert_eq!(text, format!("{:?}", &index));

        // Removal state can be saved to disk.
        index.flush().unwrap();
        let index = open_opts().open(dir.path().join("a")).expect("open");
        assert_eq!(index.range(..).unwrap().count(), 0);
    }
3711
    // `remove_prefix` deletes every key starting with the given prefix:
    // exact matches, prefix matches, and (with an empty prefix) everything.
    // A prefix longer than any stored key removes nothing.
    #[test]
    fn test_remove_recursive() {
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        index.insert(b"abc", 42).unwrap();
        index.insert(b"abc", 42).unwrap();
        index.insert(b"abxyz1", 42).unwrap();
        index.insert(b"abxyz2", 42).unwrap();
        index.insert(b"abxyz33333", 42).unwrap();
        index.insert(b"abxyz44444", 42).unwrap();
        index.insert(b"aby", 42).unwrap();
        index.flush().unwrap();

        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        let mut n = index.range(..).unwrap().count();
        index.remove_prefix(b"abxyz33333333333").unwrap(); // nothing removed
        assert_eq!(index.range(..).unwrap().count(), n);

        index.remove_prefix(b"abxyz33333").unwrap(); // exact match
        n -= 1; // abxyz33333 removed
        assert_eq!(index.range(..).unwrap().count(), n);

        index.remove_prefix(b"abxyz4").unwrap(); // prefix exact match
        n -= 1; // abxyz44444 removed
        assert_eq!(index.range(..).unwrap().count(), n);

        index.remove_prefix(b"ab").unwrap(); // prefix match
        n -= 4; // abc, aby, abxyz1, abxyz2 removed
        assert_eq!(index.range(..).unwrap().count(), n);

        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        index.remove_prefix(b"").unwrap(); // remove everything
        assert_eq!(index.range(..).unwrap().count(), 0);
    }
3746
    // Snapshot test: inserting the empty key and distinct one-byte keys
    // produces the expected in-memory entry layout (checked via the Debug
    // output, which dumps every dirty Radix/Leaf/Link/Key entry).
    #[test]
    fn test_distinct_one_byte_keys() {
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        assert_eq!(
            format!("{:?}", index),
            "Index { len: 0, root: Radix[0] }\n\
             Radix[0]: Radix { link: None }\n"
        );

        index.insert(&[], 55).expect("update");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: Link[0] }
Link[0]: Link { value: 55, next: None }
"#
        );

        index.insert(&[0x12], 77).expect("update");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: Link[0], 1: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[1] }
Link[0]: Link { value: 55, next: None }
Link[1]: Link { value: 77, next: None }
Key[0]: Key { key: 12 }
"#
        );

        let link = index.get(&[0x12]).expect("get");
        index
            .insert_advanced(InsertKey::Embed(&[0x34]), PrependReplace(99, link))
            .expect("update");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: Link[0], 1: Leaf[0], 3: Leaf[1] }
Leaf[0]: Leaf { key: Key[0], link: Link[1] }
Leaf[1]: Leaf { key: Key[1], link: Link[2] }
Link[0]: Link { value: 55, next: None }
Link[1]: Link { value: 77, next: None }
Link[2]: Link { value: 99, next: Link[1] }
Key[0]: Key { key: 12 }
Key[1]: Key { key: 34 }
"#
        );
    }
3796
    // Same scenario as test_distinct_one_byte_keys, but with flushes in
    // between: verifies the on-disk entry layout (Disk[N] offsets), the mixed
    // on-disk/in-memory state, and that a second flush appends a second
    // Root + Checksum rather than rewriting the old one.
    #[test]
    fn test_distinct_one_byte_keys_flush() {
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).expect("open");

        // 1st flush.
        assert_eq!(index.flush().expect("flush"), 24);
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 24, root: Disk[1] }
Disk[1]: Radix { link: None }
Disk[5]: Root { radix: Disk[1] }
Disk[8]: Checksum { start: 0, end: 8, chunk_size_logarithm: 4, checksums.len(): 1 }
"#
        );

        // Mixed on-disk and in-memory state.
        index.insert(&[], 55).expect("update");
        index.insert(&[0x12], 77).expect("update");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 24, root: Radix[0] }
Disk[1]: Radix { link: None }
Disk[5]: Root { radix: Disk[1] }
Disk[8]: Checksum { start: 0, end: 8, chunk_size_logarithm: 4, checksums.len(): 1 }
Radix[0]: Radix { link: Link[0], 1: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[1] }
Link[0]: Link { value: 55, next: None }
Link[1]: Link { value: 77, next: None }
Key[0]: Key { key: 12 }
"#
        );

        // After 2nd flush. There are 2 roots.
        let link = index.get(&[0x12]).expect("get");
        index
            .insert_advanced(InsertKey::Embed(&[0x34]), PrependReplace(99, link))
            .expect("update");
        index.flush().expect("flush");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 104, root: Disk[45] }
Disk[1]: Radix { link: None }
Disk[5]: Root { radix: Disk[1] }
Disk[8]: Checksum { start: 0, end: 8, chunk_size_logarithm: 4, checksums.len(): 1 }
Disk[24]: Key { key: 12 }
Disk[27]: Key { key: 34 }
Disk[30]: Link { value: 55, next: None }
Disk[33]: Link { value: 77, next: None }
Disk[36]: Link { value: 99, next: Disk[33] }
Disk[39]: Leaf { key: Disk[24], link: Disk[33] }
Disk[42]: Leaf { key: Disk[27], link: Disk[36] }
Disk[45]: Radix { link: Disk[30], 1: Disk[39], 3: Disk[42] }
Disk[61]: Root { radix: Disk[45] }
Disk[64]: Checksum { start: 0, end: 64, chunk_size_logarithm: 4, checksums.len(): 4 }
"#
        );
    }
3855
    /// Leaf splitting when a second key is inserted, covering every prefix
    /// relation between the old and the new key, all in-memory (see
    /// `test_leaf_split_flush` for the flushed variants).
    #[test]
    fn test_leaf_split() {
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).expect("open");

        // Example 1: two keys are not prefixes of each other
        index.insert(&[0x12, 0x34], 5).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: None, 1: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[0] }
Link[0]: Link { value: 5, next: None }
Key[0]: Key { key: 12 34 }
"#
        );
        index.insert(&[0x12, 0x78], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: None, 1: Radix[1] }
Radix[1]: Radix { link: None, 2: Radix[2] }
Radix[2]: Radix { link: None, 3: Leaf[0], 7: Leaf[1] }
Leaf[0]: Leaf { key: Key[0], link: Link[0] }
Leaf[1]: Leaf { key: Key[1], link: Link[1] }
Link[0]: Link { value: 5, next: None }
Link[1]: Link { value: 7, next: None }
Key[0]: Key { key: 12 34 }
Key[1]: Key { key: 12 78 }
"#
        );

        // Example 2: new key is a prefix of the old key
        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        index.insert(&[0x12, 0x34], 5).expect("insert");
        index.insert(&[0x12], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: None, 1: Radix[1] }
Radix[1]: Radix { link: None, 2: Radix[2] }
Radix[2]: Radix { link: Link[1], 3: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[0] }
Link[0]: Link { value: 5, next: None }
Link[1]: Link { value: 7, next: None }
Key[0]: Key { key: 12 34 }
"#
        );

        // Example 3: old key is a prefix of the new key.
        // Note the dump shows the original in-memory Leaf[0]/Key[0] entries
        // become "(unused)" after the split.
        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        index.insert(&[0x12], 5).expect("insert");
        index.insert(&[0x12, 0x78], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: None, 1: Radix[1] }
Radix[1]: Radix { link: None, 2: Radix[2] }
Radix[2]: Radix { link: Link[0], 7: Leaf[1] }
Leaf[0]: Leaf (unused)
Leaf[1]: Leaf { key: Key[1], link: Link[1] }
Link[0]: Link { value: 5, next: None }
Link[1]: Link { value: 7, next: None }
Key[0]: Key (unused)
Key[1]: Key { key: 12 78 }
"#
        );

        // Same key. Multiple values.
        let mut index = open_opts().open(dir.path().join("a")).expect("open");
        index.insert(&[0x12], 5).expect("insert");
        index.insert(&[0x12], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 0, root: Radix[0] }
Radix[0]: Radix { link: None, 1: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[1] }
Link[0]: Link { value: 5, next: None }
Link[1]: Link { value: 7, next: Link[0] }
Key[0]: Key { key: 12 }
"#
        );
    }
3939
    /// Leaf splitting where the first key is flushed to disk before the
    /// second insert, exercising the code paths that reference on-disk
    /// (`Disk[...]`) entries from dirty in-memory entries.
    #[test]
    fn test_leaf_split_flush() {
        // Similar with test_leaf_split, but flush the first key before inserting the second.
        // This triggers some new code paths.
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("1")).expect("open");

        // Example 1: two keys are not prefixes of each other
        index.insert(&[0x12, 0x34], 5).expect("insert");
        index.flush().expect("flush");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 46, root: Disk[11] }
Disk[1]: Key { key: 12 34 }
Disk[5]: Link { value: 5, next: None }
Disk[8]: Leaf { key: Disk[1], link: Disk[5] }
Disk[11]: Radix { link: None, 1: Disk[8] }
Disk[19]: Root { radix: Disk[11] }
Disk[22]: Checksum { start: 0, end: 22, chunk_size_logarithm: 4, checksums.len(): 2 }
"#
        );
        index.insert(&[0x12, 0x78], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 46, root: Radix[0] }
Disk[1]: Key { key: 12 34 }
Disk[5]: Link { value: 5, next: None }
Disk[8]: Leaf { key: Disk[1], link: Disk[5] }
Disk[11]: Radix { link: None, 1: Disk[8] }
Disk[19]: Root { radix: Disk[11] }
Disk[22]: Checksum { start: 0, end: 22, chunk_size_logarithm: 4, checksums.len(): 2 }
Radix[0]: Radix { link: None, 1: Radix[1] }
Radix[1]: Radix { link: None, 2: Radix[2] }
Radix[2]: Radix { link: None, 3: Disk[8], 7: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[0] }
Link[0]: Link { value: 7, next: None }
Key[0]: Key { key: 12 78 }
"#
        );

        // Example 2: new key is a prefix of the old key
        let mut index = open_opts().open(dir.path().join("2")).expect("open");
        index.insert(&[0x12, 0x34], 5).expect("insert");
        index.flush().expect("flush");
        index.insert(&[0x12], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 46, root: Radix[0] }
Disk[1]: Key { key: 12 34 }
Disk[5]: Link { value: 5, next: None }
Disk[8]: Leaf { key: Disk[1], link: Disk[5] }
Disk[11]: Radix { link: None, 1: Disk[8] }
Disk[19]: Root { radix: Disk[11] }
Disk[22]: Checksum { start: 0, end: 22, chunk_size_logarithm: 4, checksums.len(): 2 }
Radix[0]: Radix { link: None, 1: Radix[1] }
Radix[1]: Radix { link: None, 2: Radix[2] }
Radix[2]: Radix { link: Link[0], 3: Disk[8] }
Link[0]: Link { value: 7, next: None }
"#
        );

        // Example 3: old key is a prefix of the new key
        // Only one flush - only one key is written.
        let mut index = open_opts().open(dir.path().join("3a")).expect("open");
        index.insert(&[0x12], 5).expect("insert");
        index.insert(&[0x12, 0x78], 7).expect("insert");
        index.flush().expect("flush");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 77, root: Disk[34] }
Disk[1]: Key { key: 12 78 }
Disk[5]: Link { value: 5, next: None }
Disk[8]: Link { value: 7, next: None }
Disk[11]: Leaf { key: Disk[1], link: Disk[8] }
Disk[14]: Radix { link: Disk[5], 7: Disk[11] }
Disk[26]: Radix { link: None, 2: Disk[14] }
Disk[34]: Radix { link: None, 1: Disk[26] }
Disk[42]: Root { radix: Disk[34] }
Disk[45]: Checksum { start: 0, end: 45, chunk_size_logarithm: 4, checksums.len(): 3 }
"#
        );

        // With two flushes - the old key cannot be removed since it was written.
        let mut index = open_opts().open(dir.path().join("3b")).expect("open");
        index.insert(&[0x12], 5).expect("insert");
        index.flush().expect("flush");
        index.insert(&[0x12, 0x78], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 45, root: Radix[0] }
Disk[1]: Key { key: 12 }
Disk[4]: Link { value: 5, next: None }
Disk[7]: Leaf { key: Disk[1], link: Disk[4] }
Disk[10]: Radix { link: None, 1: Disk[7] }
Disk[18]: Root { radix: Disk[10] }
Disk[21]: Checksum { start: 0, end: 21, chunk_size_logarithm: 4, checksums.len(): 2 }
Radix[0]: Radix { link: None, 1: Radix[1] }
Radix[1]: Radix { link: None, 2: Radix[2] }
Radix[2]: Radix { link: Disk[4], 7: Leaf[0] }
Leaf[0]: Leaf { key: Key[0], link: Link[0] }
Link[0]: Link { value: 7, next: None }
Key[0]: Key { key: 12 78 }
"#
        );

        // Same key. Multiple values.
        let mut index = open_opts().open(dir.path().join("4")).expect("open");
        index.insert(&[0x12], 5).expect("insert");
        index.flush().expect("flush");
        index.insert(&[0x12], 7).expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 45, root: Radix[0] }
Disk[1]: Key { key: 12 }
Disk[4]: Link { value: 5, next: None }
Disk[7]: Leaf { key: Disk[1], link: Disk[4] }
Disk[10]: Radix { link: None, 1: Disk[7] }
Disk[18]: Root { radix: Disk[10] }
Disk[21]: Checksum { start: 0, end: 21, chunk_size_logarithm: 4, checksums.len(): 2 }
Radix[0]: Radix { link: None, 1: Leaf[0] }
Leaf[0]: Leaf { key: Disk[1], link: Link[0] }
Link[0]: Link { value: 7, next: Disk[4] }
"#
        );
    }
4065
    /// Keys referencing an external buffer (`InsertKey::Reference`) are stored
    /// as `ExtKey { start, len }` entries instead of embedded key bytes; the
    /// key content is resolved through the `key_buf` supplied at open time.
    #[test]
    fn test_external_keys() {
        let buf = Arc::new(vec![0x12u8, 0x34, 0x56, 0x78, 0x9a, 0xbc]);
        let dir = tempdir().unwrap();
        let mut index = open_opts()
            .key_buf(Some(buf.clone()))
            .open(dir.path().join("a"))
            .expect("open");
        // Key = buf[1..3] = 34 56. Flushed, so it becomes an InlineLeaf on disk.
        index
            .insert_advanced(InsertKey::Reference((1, 2)), InsertValue::Prepend(55))
            .expect("insert");
        index.flush().expect("flush");
        // Key = buf[1..4] = 34 56 78, kept in memory as a dirty ExtKey leaf.
        index
            .insert_advanced(InsertKey::Reference((1, 3)), InsertValue::Prepend(77))
            .expect("insert");
        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 43, root: Radix[0] }
Disk[1]: InlineLeaf { key: Disk[2], link: Disk[5] }
Disk[2]: ExtKey { start: 1, len: 2 }
Disk[5]: Link { value: 55, next: None }
Disk[8]: Radix { link: None, 3: Disk[1] }
Disk[16]: Root { radix: Disk[8] }
Disk[19]: Checksum { start: 0, end: 19, chunk_size_logarithm: 4, checksums.len(): 2 }
Radix[0]: Radix { link: None, 3: Radix[1] }
Radix[1]: Radix { link: None, 4: Radix[2] }
Radix[2]: Radix { link: None, 5: Radix[3] }
Radix[3]: Radix { link: None, 6: Radix[4] }
Radix[4]: Radix { link: Disk[5], 7: Leaf[0] }
Leaf[0]: Leaf { key: ExtKey[0], link: Link[0] }
Link[0]: Link { value: 77, next: None }
ExtKey[0]: ExtKey { start: 1, len: 3 }
"#
        );
    }
4101
    /// Conditions under which a flushed leaf is written as the compact
    /// `InlineLeaf` form (ExtKey + Link stored adjacent to the leaf) versus a
    /// regular `Leaf` with pointers.
    #[test]
    fn test_inline_leafs() {
        let buf = Arc::new(vec![0x12u8, 0x34, 0x56, 0x78, 0x9a, 0xbc]);
        let dir = tempdir().unwrap();
        let mut index = open_opts()
            .key_buf(Some(buf.clone()))
            .open(dir.path().join("a"))
            .expect("open");

        // New entry. Should be inlined.
        index
            .insert_advanced(InsertKey::Reference((1, 1)), InsertValue::Prepend(55))
            .unwrap();
        index.flush().expect("flush");

        // Independent leaf. Should also be inlined.
        index
            .insert_advanced(InsertKey::Reference((2, 1)), InsertValue::Prepend(77))
            .unwrap();
        index.flush().expect("flush");

        // The link with 88 should refer to the inlined leaf 77.
        index
            .insert_advanced(InsertKey::Reference((2, 1)), InsertValue::Prepend(88))
            .unwrap();
        index.flush().expect("flush");

        // Not inlined because dependent link was not written first.
        // (could be optimized in the future)
        index
            .insert_advanced(InsertKey::Reference((3, 1)), InsertValue::Prepend(99))
            .unwrap();
        index
            .insert_advanced(InsertKey::Reference((3, 1)), InsertValue::Prepend(100))
            .unwrap();
        index.flush().expect("flush");

        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 257, root: Disk[181] }
Disk[1]: InlineLeaf { key: Disk[2], link: Disk[5] }
Disk[2]: ExtKey { start: 1, len: 1 }
Disk[5]: Link { value: 55, next: None }
Disk[8]: Radix { link: None, 3: Disk[1] }
Disk[16]: Root { radix: Disk[8] }
Disk[19]: Checksum { start: 0, end: 19, chunk_size_logarithm: 4, checksums.len(): 2 }
Disk[43]: InlineLeaf { key: Disk[44], link: Disk[47] }
Disk[44]: ExtKey { start: 2, len: 1 }
Disk[47]: Link { value: 77, next: None }
Disk[50]: Radix { link: None, 3: Disk[1], 5: Disk[43] }
Disk[62]: Root { radix: Disk[50] }
Disk[65]: Checksum { start: 19, end: 65, chunk_size_logarithm: 4, checksums.len(): 4 }
Disk[105]: Link { value: 88, next: Disk[47] }
Disk[108]: Leaf { key: Disk[44], link: Disk[105] }
Disk[111]: Radix { link: None, 3: Disk[1], 5: Disk[108] }
Disk[123]: Root { radix: Disk[111] }
Disk[126]: Checksum { start: 65, end: 126, chunk_size_logarithm: 4, checksums.len(): 4 }
Disk[166]: ExtKey { start: 3, len: 1 }
Disk[169]: Link { value: 99, next: None }
Disk[172]: Link { value: 100, next: Disk[169] }
Disk[176]: Leaf { key: Disk[166], link: Disk[172] }
Disk[181]: Radix { link: None, 3: Disk[1], 5: Disk[108], 7: Disk[176] }
Disk[197]: Root { radix: Disk[181] }
Disk[201]: Checksum { start: 126, end: 201, chunk_size_logarithm: 4, checksums.len(): 6 }
"#
        )
    }
4169
4170    #[test]
4171    fn test_clone() {
4172        let dir = tempdir().unwrap();
4173        let mut index = open_opts().open(dir.path().join("a")).expect("open");
4174
4175        // Test clone empty index
4176        assert_eq!(
4177            format!("{:?}", index.try_clone().unwrap()),
4178            format!("{:?}", index)
4179        );
4180        assert_eq!(
4181            format!("{:?}", index.try_clone_without_dirty().unwrap()),
4182            format!("{:?}", index)
4183        );
4184
4185        // Test on-disk Index
4186        index.insert(&[], 55).expect("insert");
4187        index.insert(&[0x12], 77).expect("insert");
4188        index.flush().expect("flush");
4189        index.insert(&[0x15], 99).expect("insert");
4190
4191        let mut index2 = index.try_clone().expect("clone");
4192        assert_eq!(format!("{:?}", index), format!("{:?}", index2));
4193
4194        // Test clone without in-memory part
4195        let index2clean = index.try_clone_without_dirty().unwrap();
4196        index2.clear_dirty();
4197        assert_eq!(format!("{:?}", index2), format!("{:?}", index2clean));
4198
4199        // Test in-memory Index
4200        let mut index3 = open_opts()
4201            .checksum_chunk_size_logarithm(0)
4202            .create_in_memory()
4203            .unwrap();
4204        let index4 = index3.try_clone().unwrap();
4205        assert_eq!(format!("{:?}", index3), format!("{:?}", index4));
4206
4207        index3.insert(&[0x15], 99).expect("insert");
4208        let index4 = index3.try_clone().unwrap();
4209        assert_eq!(format!("{:?}", index3), format!("{:?}", index4));
4210    }
4211
4212    #[test]
4213    fn test_open_options_write() {
4214        let dir = tempdir().unwrap();
4215        let mut index = OpenOptions::new().open(dir.path().join("a")).expect("open");
4216        index.insert(&[0x12], 77).expect("insert");
4217        index.flush().expect("flush");
4218
4219        OpenOptions::new()
4220            .write(Some(false))
4221            .open(dir.path().join("b"))
4222            .expect_err("open"); // file does not exist
4223
4224        let mut index = OpenOptions::new()
4225            .write(Some(false))
4226            .open(dir.path().join("a"))
4227            .expect("open");
4228        index.flush().expect_err("cannot flush read-only index");
4229    }
4230
4231    #[test]
4232    fn test_linked_list_values() {
4233        let dir = tempdir().unwrap();
4234        let mut index = OpenOptions::new().open(dir.path().join("a")).expect("open");
4235        let list = vec![11u64, 17, 19, 31];
4236        for i in list.iter().rev() {
4237            index.insert(&[], *i).expect("insert");
4238        }
4239
4240        let list1: Vec<u64> = index
4241            .get(&[])
4242            .unwrap()
4243            .values(&index)
4244            .map(|v| v.unwrap())
4245            .collect();
4246        assert_eq!(list, list1);
4247
4248        index.flush().expect("flush");
4249        let list2: Vec<u64> = index
4250            .get(&[])
4251            .unwrap()
4252            .values(&index)
4253            .map(|v| v.unwrap())
4254            .collect();
4255        assert_eq!(list, list2);
4256
4257        // Empty linked list
4258        assert_eq!(index.get(&[1]).unwrap().values(&index).count(), 0);
4259
4260        // In case error happens, the iteration still stops.
4261        index.insert(&[], 5).expect("insert");
4262        index.dirty_links[0].next_link_offset = LinkOffset(Offset(1000));
4263        // Note: `collect` can return `crate::Result<Vec<u64>>`. But that does not exercises the
4264        // infinite loop avoidance logic since `collect` stops iteration at the first error.
4265        let list_errored: Vec<crate::Result<u64>> =
4266            index.get(&[]).unwrap().values(&index).collect();
4267        assert!(list_errored[list_errored.len() - 1].is_err());
4268    }
4269
    /// Bit-flip detection with 1-byte checksum chunks (logarithm 0).
    #[test]
    fn test_checksum_bitflip_1b() {
        test_checksum_bitflip_with_size(0);
    }
4274
    /// Bit-flip detection with 2-byte checksum chunks (logarithm 1).
    #[test]
    fn test_checksum_bitflip_2b() {
        test_checksum_bitflip_with_size(1);
    }
4279
    /// Bit-flip detection with 16-byte checksum chunks (logarithm 4).
    #[test]
    fn test_checksum_bitflip_16b() {
        test_checksum_bitflip_with_size(4);
    }
4284
    /// Bit-flip detection with 1 MB checksum chunks (logarithm 20).
    #[test]
    fn test_checksum_bitflip_1mb() {
        test_checksum_bitflip_with_size(20);
    }
4289
    /// Exhaustively flip every single bit of a flushed index file and assert
    /// that the corruption is detected when the file is read back — either at
    /// open time or while looking up any key / iterating its values.
    ///
    /// `checksum_log_size` is the checksum chunk size logarithm to test with.
    fn test_checksum_bitflip_with_size(checksum_log_size: u32) {
        let dir = tempdir().unwrap();

        let keys = if cfg!(debug_assertions) {
            // Debug build is much slower than release build. Limit the key length to 1-byte.
            vec![vec![0x13], vec![0x17], vec![]]
        } else {
            // Release build can afford 2-byte key test.
            vec![
                vec![0x12, 0x34],
                vec![0x12, 0x78],
                vec![0x34, 0x56],
                vec![0x34],
                vec![0x78],
                vec![0x78, 0x9a],
            ]
        };

        let opts = open_opts()
            .checksum_chunk_size_logarithm(checksum_log_size)
            .clone();

        let bytes = {
            let mut index = opts.clone().open(dir.path().join("a")).expect("open");

            // Two values per key; the shifted value exercises large VLQ values.
            for (i, key) in keys.iter().enumerate() {
                index.insert(key, i as u64).expect("insert");
                index.insert(key, (i as u64) << 50).expect("insert");
            }
            index.flush().expect("flush");

            // Read the raw bytes of the index content
            let mut f = File::open(dir.path().join("a")).expect("open");
            let mut buf = vec![];
            f.read_to_end(&mut buf).expect("read");

            // Drop `index` here. This would unmap files so File::create below
            // can work on Windows.
            if std::env::var("DEBUG").is_ok() {
                eprintln!("{:?}", &index);
            }

            buf
        };

        // A key counts as "corrupted" if looking it up errors, or if iterating
        // its linked-list of values yields any error.
        fn is_corrupted(index: &Index, key: &[u8]) -> bool {
            let link = index.get(&key);
            match link {
                Err(_) => true,
                Ok(link) => link.values(&index).any(|v| v.is_err()),
            }
        }

        // Every bit change should trigger errors when reading all contents
        for i in 0..(bytes.len() * 8) {
            // Rewrite the file with exactly one bit flipped.
            let mut bytes = bytes.clone();
            bytes[i / 8] ^= 1u8 << (i % 8);
            let mut f = File::create(dir.path().join("a")).expect("create");
            f.write_all(&bytes).expect("write");

            // Corruption must surface either when opening, or when probing
            // every possible 2-byte key (release), 1-byte key, and the empty key.
            let index = opts.clone().open(dir.path().join("a"));
            let detected = match index {
                Err(_) => true,
                Ok(index) => {
                    let range = if cfg!(debug_assertions) { 0 } else { 0x10000 };
                    (0..range).any(|key_int| {
                        let key = [(key_int >> 8) as u8, (key_int & 0xff) as u8];
                        is_corrupted(&index, &key)
                    }) || (0..0x100).any(|key_int| {
                        let key = [key_int as u8];
                        is_corrupted(&index, &key)
                    }) || is_corrupted(&index, &[])
                }
            };
            assert!(
                detected,
                "bit flip at byte {} , bit {} is not detected (set DEBUG=1 to see Index dump)",
                i / 8,
                i % 8,
            );
        }
    }
4372
    /// The checksum feature can be toggled between opens of the same file.
    /// Data written in either mode stays readable and passes `verify`, and
    /// Checksum entries only appear for the ranges written while the feature
    /// was enabled (Disk[112] and Disk[227] in the final dump below).
    #[test]
    fn test_checksum_toggle() {
        let dir = tempdir().unwrap();
        let open = |enabled: bool| {
            open_opts()
                .checksum_enabled(enabled)
                .open(dir.path().join("a"))
                .expect("open")
        };

        // Starting with checksum off.
        let mut index = open(false);
        index.verify().unwrap();
        index.insert(b"abcdefg", 0x1234).unwrap();
        index.flush().unwrap();
        index.insert(b"bcdefgh", 0x2345).unwrap();
        index.flush().unwrap();

        // Turn on.
        let mut index = open(true);
        index.verify().unwrap();
        index.insert(b"cdefghi", 0x3456).unwrap();
        index.flush().unwrap();
        index.insert(b"defghij", 0x4567).unwrap();
        index.flush().unwrap();

        // Turn off.
        let mut index = open(false);
        index.verify().unwrap();
        index.insert(b"efghijh", 0x5678).unwrap();
        index.flush().unwrap();
        index.insert(b"fghijkl", 0x7890).unwrap();
        index.flush().unwrap();

        assert_eq!(
            format!("{:?}", index),
            r#"Index { len: 415, root: Disk[402] }
Disk[1]: Key { key: 61 62 63 64 65 66 67 }
Disk[10]: Link { value: 4660, next: None }
Disk[14]: Leaf { key: Disk[1], link: Disk[10] }
Disk[17]: Radix { link: None, 6: Disk[14] }
Disk[25]: Root { radix: Disk[17] }
Disk[29]: Key { key: 62 63 64 65 66 67 68 }
Disk[38]: Link { value: 9029, next: None }
Disk[42]: Leaf { key: Disk[29], link: Disk[38] }
Disk[45]: Radix { link: None, 1: Disk[14], 2: Disk[42] }
Disk[57]: Radix { link: None, 6: Disk[45] }
Disk[65]: Root { radix: Disk[57] }
Disk[69]: Key { key: 63 64 65 66 67 68 69 }
Disk[78]: Link { value: 13398, next: None }
Disk[82]: Leaf { key: Disk[69], link: Disk[78] }
Disk[85]: Radix { link: None, 1: Disk[14], 2: Disk[42], 3: Disk[82] }
Disk[101]: Radix { link: None, 6: Disk[85] }
Disk[109]: Root { radix: Disk[101] }
Disk[112]: Checksum { start: 0, end: 112, chunk_size_logarithm: 4, checksums.len(): 7 }
Disk[176]: Key { key: 64 65 66 67 68 69 6A }
Disk[185]: Link { value: 17767, next: None }
Disk[190]: Leaf { key: Disk[176], link: Disk[185] }
Disk[195]: Radix { link: None, 1: Disk[14], 2: Disk[42], 3: Disk[82], 4: Disk[190] }
Disk[215]: Radix { link: None, 6: Disk[195] }
Disk[223]: Root { radix: Disk[215] }
Disk[227]: Checksum { start: 112, end: 227, chunk_size_logarithm: 4, checksums.len(): 8 }
Disk[299]: Key { key: 65 66 67 68 69 6A 68 }
Disk[308]: Link { value: 22136, next: None }
Disk[313]: Leaf { key: Disk[299], link: Disk[308] }
Disk[318]: Radix { link: None, 1: Disk[14], 2: Disk[42], 3: Disk[82], 4: Disk[190], 5: Disk[313] }
Disk[342]: Radix { link: None, 6: Disk[318] }
Disk[350]: Root { radix: Disk[342] }
Disk[355]: Key { key: 66 67 68 69 6A 6B 6C }
Disk[364]: Link { value: 30864, next: None }
Disk[369]: Leaf { key: Disk[355], link: Disk[364] }
Disk[374]: Radix { link: None, 1: Disk[14], 2: Disk[42], 3: Disk[82], 4: Disk[190], 5: Disk[313], 6: Disk[369] }
Disk[402]: Radix { link: None, 6: Disk[374] }
Disk[410]: Root { radix: Disk[402] }
"#
        );
    }
4450
4451    fn show_checksums(index: &Index) -> String {
4452        let debug_str = format!("{:?}", index);
4453        debug_str
4454            .lines()
4455            .filter_map(|l| {
4456                if l.contains("Checksum") {
4457                    Some(format!("\n                {}", l))
4458                } else {
4459                    None
4460                }
4461            })
4462            .collect::<Vec<_>>()
4463            .concat()
4464    }
4465
    /// `checksum_max_chain_len` caps how long the chain of Checksum entries
    /// may grow before a flush writes a checksum covering from offset 0:
    /// 0 means unlimited chaining, 2 allows chains of length two, and 1
    /// forces every Checksum entry to start at 0.
    #[test]
    fn test_checksum_max_chain_len() {
        // Test with the given max_chain_len config.
        let t = |max_chain_len: u32| {
            let dir = tempdir().unwrap();
            let path = dir.path().join("i");
            let mut index = open_opts()
                .checksum_chunk_size_logarithm(7 /* chunk size: 127 */)
                .checksum_max_chain_len(max_chain_len)
                .open(&path)
                .unwrap();
            for i in 0..10u8 {
                // Alternate small and large keys to vary flush sizes.
                let data: Vec<u8> = if i % 2 == 0 { vec![i] } else { vec![i; 100] };
                index.insert(&data, 1).unwrap();
                index.flush().unwrap();
                index.verify().unwrap();
                // If reloaded from disk, it passes verification too.
                let mut index2 = open_opts()
                    .checksum_max_chain_len(max_chain_len)
                    .open(&path)
                    .unwrap();
                index2.verify().unwrap();
                // Create "racy" writes by flushing from another index.
                if i % 3 == 0 {
                    index2.insert(&data, 2).unwrap();
                    index2.flush().unwrap();
                }
            }
            show_checksums(&index)
        };

        // Unlimited chain. Chain: 1358 -> 1167 -> 1071 -> 818 -> 738 -> ...
        assert_eq!(
            t(0),
            r#"
                Disk[21]: Checksum { start: 0, end: 21, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[54]: Checksum { start: 0, end: 54, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[203]: Checksum { start: 0, end: 203, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[266]: Checksum { start: 203, end: 266, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[433]: Checksum { start: 266, end: 433, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[499]: Checksum { start: 433, end: 499, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[563]: Checksum { start: 433, end: 563, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[738]: Checksum { start: 563, end: 738, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[818]: Checksum { start: 738, end: 818, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[896]: Checksum { start: 818, end: 896, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[1071]: Checksum { start: 818, end: 1071, chunk_size_logarithm: 7, checksums.len(): 3 }
                Disk[1167]: Checksum { start: 1071, end: 1167, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[1358]: Checksum { start: 1167, end: 1358, chunk_size_logarithm: 7, checksums.len(): 2 }"#
        );

        // Max chain len = 2. Chain: 1331 -> 1180 -> 0; 872 -> 761 -> 0; ...
        assert_eq!(
            t(2),
            r#"
                Disk[21]: Checksum { start: 0, end: 21, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[54]: Checksum { start: 0, end: 54, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[203]: Checksum { start: 0, end: 203, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[266]: Checksum { start: 203, end: 266, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[433]: Checksum { start: 0, end: 433, chunk_size_logarithm: 7, checksums.len(): 4 }
                Disk[514]: Checksum { start: 433, end: 514, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[586]: Checksum { start: 433, end: 586, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[761]: Checksum { start: 0, end: 761, chunk_size_logarithm: 7, checksums.len(): 6 }
                Disk[872]: Checksum { start: 761, end: 872, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[950]: Checksum { start: 0, end: 950, chunk_size_logarithm: 7, checksums.len(): 8 }
                Disk[1180]: Checksum { start: 0, end: 1180, chunk_size_logarithm: 7, checksums.len(): 10 }
                Disk[1331]: Checksum { start: 1180, end: 1331, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[1522]: Checksum { start: 0, end: 1522, chunk_size_logarithm: 7, checksums.len(): 12 }"#
        );

        // Max chain len = 1. All have start: 0.
        assert_eq!(
            t(1),
            r#"
                Disk[21]: Checksum { start: 0, end: 21, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[54]: Checksum { start: 0, end: 54, chunk_size_logarithm: 7, checksums.len(): 1 }
                Disk[203]: Checksum { start: 0, end: 203, chunk_size_logarithm: 7, checksums.len(): 2 }
                Disk[266]: Checksum { start: 0, end: 266, chunk_size_logarithm: 7, checksums.len(): 3 }
                Disk[440]: Checksum { start: 0, end: 440, chunk_size_logarithm: 7, checksums.len(): 4 }
                Disk[521]: Checksum { start: 0, end: 521, chunk_size_logarithm: 7, checksums.len(): 5 }
                Disk[616]: Checksum { start: 0, end: 616, chunk_size_logarithm: 7, checksums.len(): 5 }
                Disk[814]: Checksum { start: 0, end: 814, chunk_size_logarithm: 7, checksums.len(): 7 }
                Disk[933]: Checksum { start: 0, end: 933, chunk_size_logarithm: 7, checksums.len(): 8 }
                Disk[1058]: Checksum { start: 0, end: 1058, chunk_size_logarithm: 7, checksums.len(): 9 }
                Disk[1296]: Checksum { start: 0, end: 1296, chunk_size_logarithm: 7, checksums.len(): 11 }
                Disk[1455]: Checksum { start: 0, end: 1455, chunk_size_logarithm: 7, checksums.len(): 12 }
                Disk[1725]: Checksum { start: 0, end: 1725, chunk_size_logarithm: 7, checksums.len(): 14 }"#
        );
    }
4554
4555    #[test]
4556    fn test_root_meta() {
4557        let dir = tempdir().unwrap();
4558        let mut index = open_opts().open(dir.path().join("a")).expect("open");
4559        assert!(index.get_meta().is_empty());
4560        let meta = vec![200; 4000];
4561        index.set_meta(&meta);
4562        assert_eq!(index.get_meta(), &meta[..]);
4563        index.flush().expect("flush");
4564        let index = open_opts().open(dir.path().join("a")).expect("open");
4565        assert_eq!(index.get_meta(), &meta[..]);
4566    }
4567
4568    impl<'a> RangeIter<'a> {
4569        fn clone_with_index(&self, index: &'a Index) -> Self {
4570            Self {
4571                completed: self.completed,
4572                index,
4573                front_stack: self.front_stack.clone(),
4574                back_stack: self.back_stack.clone(),
4575            }
4576        }
4577    }
4578
4579    /// Extract keys from the [`RangeIter`]. Verify different iteration
4580    /// directions, and returned link offsets. A `key` has link offset `i`
4581    /// if that key matches `keys[i]`.
4582    fn iter_to_keys(index: &Index, keys: &Vec<&[u8]>, iter: &RangeIter) -> Vec<Vec<u8>> {
4583        let it_forward = iter.clone_with_index(index);
4584        let it_backward = iter.clone_with_index(index);
4585        let mut it_both_ends = iter.clone_with_index(index);
4586
4587        let extract = |v: crate::Result<(Cow<'_, [u8]>, LinkOffset)>| -> Vec<u8> {
4588            let (key, link_offset) = v.unwrap();
4589            let key = key.as_ref();
4590            // Verify link_offset is correct
4591            let ids: Vec<u64> = link_offset
4592                .values(&index)
4593                .collect::<crate::Result<Vec<u64>>>()
4594                .unwrap();
4595            assert!(ids.len() == 1);
4596            assert_eq!(keys[ids[0] as usize], key);
4597            key.to_vec()
4598        };
4599
4600        let keys_forward: Vec<_> = it_forward.map(extract).collect();
4601        let mut keys_backward: Vec<_> = it_backward.rev().map(extract).collect();
4602        keys_backward.reverse();
4603        assert_eq!(keys_forward, keys_backward);
4604
4605        // Forward and backward iterators should not overlap
4606        let mut keys_both_ends = Vec::new();
4607        for i in 0..(keys_forward.len() + 2) {
4608            if let Some(v) = it_both_ends.next() {
4609                keys_both_ends.insert(i, extract(v));
4610            }
4611            if let Some(v) = it_both_ends.next_back() {
4612                keys_both_ends.insert(i + 1, extract(v));
4613            }
4614        }
4615        assert_eq!(keys_forward, keys_both_ends);
4616
4617        keys_forward
4618    }
4619
    /// Test `Index::range` against `BTreeSet::range`. `tree` specifies keys.
    ///
    /// Inserts every key with its position as the value, then exercises every
    /// bound combination (`Unbounded` / `Included` / `Excluded`) over both the
    /// real keys and generated near-miss variants, checking the index result
    /// matches the `BTreeSet` result for each query.
    fn test_range_against_btreeset(tree: BTreeSet<&[u8]>) {
        let dir = tempdir().unwrap();
        let mut index = open_opts().open(dir.path().join("a")).unwrap();
        let keys: Vec<&[u8]> = tree.iter().cloned().collect();
        for (i, key) in keys.iter().enumerate() {
            // The value `i` records the key's position; iter_to_keys uses it
            // to verify link offsets.
            index.insert(key, i as u64).unwrap();
        }

        // Run one range query on both containers and compare the key lists.
        let range_test = |start: Bound<&[u8]>, end: Bound<&[u8]>| {
            let range = (start, end);
            let iter = index.range(range).unwrap();
            let expected_keys: Vec<Vec<u8>> = tree
                .range::<&[u8], _>((start, end))
                .map(|v| v.to_vec())
                .collect();
            let selected_keys: Vec<Vec<u8>> = iter_to_keys(&index, &keys, &iter);
            assert_eq!(selected_keys, expected_keys);
        };

        // Generate key variants based on existing keys. Generated keys do not
        // exist in the index. Therefore the test is more interesting.
        let mut variant_keys = Vec::new();
        for base_key in keys.iter() {
            // One byte appended
            for b in [0x00, 0x77, 0xff] {
                let mut key = base_key.to_vec();
                key.push(b);
                variant_keys.push(key);
            }

            // Last byte mutated, or removed
            if !base_key.is_empty() {
                let mut key = base_key.to_vec();
                let last = *key.last().unwrap();
                *key.last_mut().unwrap() = last.wrapping_add(1);
                variant_keys.push(key.clone());
                *key.last_mut().unwrap() = last.wrapping_sub(1);
                variant_keys.push(key.clone());
                key.pop();
                variant_keys.push(key);
            }
        }

        // Remove duplicated entries, and variants that happen to collide with
        // real keys (those are already covered by the `keys` loop below).
        let variant_keys = variant_keys
            .iter()
            .map(|v| v.as_ref())
            .filter(|k| !tree.contains(k))
            .collect::<BTreeSet<&[u8]>>()
            .iter()
            .cloned()
            .collect::<Vec<&[u8]>>();

        range_test(Unbounded, Unbounded);

        // Exercise every bound kind for single keys, and every bound pair for
        // ordered key pairs.
        for key1 in keys.iter().chain(variant_keys.iter()) {
            range_test(Unbounded, Included(key1));
            range_test(Unbounded, Excluded(key1));
            range_test(Included(key1), Unbounded);
            range_test(Excluded(key1), Unbounded);

            for key2 in keys.iter().chain(variant_keys.iter()) {
                if key1 < key2 {
                    range_test(Excluded(key1), Excluded(key2));
                }
                if key1 <= key2 {
                    range_test(Excluded(key1), Included(key2));
                    range_test(Included(key1), Excluded(key2));
                    range_test(Included(key1), Included(key2));
                }
            }
        }
    }
4694
4695    #[test]
4696    fn test_range_example1() {
4697        test_range_against_btreeset(
4698            vec![
4699                &[0x00, 0x00, 0x00][..],
4700                &[0x10, 0x0d, 0x01],
4701                &[0x10, 0x0e],
4702                &[0x10, 0x0f],
4703                &[0x10, 0x0f, 0xff],
4704                &[0x10, 0x10, 0x01],
4705                &[0x10, 0x11],
4706                &[0xff],
4707            ]
4708            .iter()
4709            .cloned()
4710            .collect(),
4711        );
4712    }
4713
4714    #[test]
4715    fn test_clear_dirty() {
4716        let dir = tempdir().unwrap();
4717        let mut index = open_opts().open(dir.path().join("a")).unwrap();
4718
4719        index.insert(&"foo", 2).unwrap();
4720        assert!(!index.get(&"foo").unwrap().is_null());
4721
4722        index.clear_dirty();
4723        assert!(index.get(&"foo").unwrap().is_null());
4724
4725        index.set_meta(&vec![42]);
4726        index.insert(&"foo", 1).unwrap();
4727        index.flush().unwrap();
4728
4729        index.set_meta(&vec![43]);
4730        index.insert(&"bar", 2).unwrap();
4731
4732        assert_eq!(index.get_original_meta(), [42]);
4733        index.clear_dirty();
4734
4735        assert_eq!(index.get_meta(), [42]);
4736        assert!(index.get(&"bar").unwrap().is_null());
4737    }
4738
4739    #[test]
4740    fn test_meta_only_flush() {
4741        let dir = tempdir().unwrap();
4742        let mut index = open_opts().open(dir.path().join("a")).unwrap();
4743
4744        index.set_meta(b"foo");
4745        index.flush().unwrap();
4746
4747        let mut index = open_opts().open(dir.path().join("a")).unwrap();
4748        assert_eq!(index.get_meta(), b"foo");
4749
4750        index.set_meta(b"bar");
4751        let len1 = index.flush().unwrap();
4752
4753        let mut index = open_opts().open(dir.path().join("a")).unwrap();
4754        assert_eq!(index.get_meta(), b"bar");
4755        index.set_meta(b"bar");
4756        let len2 = index.flush().unwrap();
4757        assert_eq!(len1, len2);
4758    }
4759
4760    quickcheck! {
4761        fn test_single_value(map: HashMap<Vec<u8>, u64>, flush: bool) -> bool {
4762            let dir = tempdir().unwrap();
4763            let mut index = open_opts().open(dir.path().join("a")).expect("open");
4764
4765            for (key, value) in &map {
4766                index.insert(key, *value).expect("insert");
4767            }
4768
4769            if flush {
4770                let len = index.flush().expect("flush");
4771                index = open_opts().logical_len(len.into()).open(dir.path().join("a")).unwrap();
4772            }
4773
4774            map.iter().all(|(key, value)| {
4775                let link_offset = index.get(key).expect("lookup");
4776                assert!(!link_offset.is_null());
4777                link_offset.value_and_next(&index).unwrap().0 == *value
4778            })
4779        }
4780
4781        fn test_multiple_values(map: HashMap<Vec<u8>, Vec<u64>>) -> bool {
4782            let dir = tempdir().unwrap();
4783            let mut index = open_opts().open(dir.path().join("a")).expect("open");
4784            let mut index_mem = open_opts().checksum_chunk_size_logarithm(20).create_in_memory().unwrap();
4785
4786            for (key, values) in &map {
4787                for value in values.iter().rev() {
4788                    index.insert(key, *value).expect("insert");
4789                    index_mem.insert(key, *value).expect("insert");
4790                }
4791                if values.len() == 0 {
4792                    // Flush sometimes.
4793                    index.flush().expect("flush");
4794                }
4795            }
4796
4797            map.iter().all(|(key, values)| {
4798                let v: Vec<u64> =
4799                    index.get(key).unwrap().values(&index).map(|v| v.unwrap()).collect();
4800                let v_mem: Vec<u64> =
4801                    index_mem.get(key).unwrap().values(&index_mem).map(|v| v.unwrap()).collect();
4802                v == *values && v_mem == *values
4803            })
4804        }
4805
4806        fn test_range_quickcheck(keys: Vec<Vec<u8>>) -> bool {
4807            let size_limit = if cfg!(debug_assertions) {
4808                4
4809            } else {
4810                16
4811            };
4812            let size = keys.len() % size_limit + 1;
4813            let tree: BTreeSet<&[u8]> = keys.iter().take(size).map(|v| v.as_ref()).collect();
4814            test_range_against_btreeset(tree);
4815            true
4816        }
4817
4818        fn test_deletion(keys_deleted: Vec<(Vec<u8>, bool)>) -> bool {
4819            // Compare Index with BTreeSet
4820            let mut set = BTreeSet::<Vec<u8>>::new();
4821            let mut index = in_memory_index();
4822            keys_deleted.into_iter().all(|(key, deleted)| {
4823                if deleted {
4824                    set.remove(&key);
4825                    index.remove(&key).unwrap();
4826                } else {
4827                    set.insert(key.clone());
4828                    index.insert(&key, 1).unwrap();
4829                }
4830                index
4831                    .range(..)
4832                    .unwrap()
4833                    .map(|s| s.unwrap().0.as_ref().to_vec())
4834                    .collect::<Vec<_>>()
4835                    == set.iter().cloned().collect::<Vec<_>>()
4836            })
4837        }
4838
4839        fn test_deletion_prefix(keys_deleted: Vec<(Vec<u8>, bool)>) -> bool {
4840            let mut set = BTreeSet::<Vec<u8>>::new();
4841            let mut index = in_memory_index();
4842            keys_deleted.into_iter().all(|(key, deleted)| {
4843                if deleted {
4844                    // BTreeSet does not have remove_prefix. Emulate it.
4845                    let to_delete = set
4846                        .iter()
4847                        .filter(|k| k.starts_with(&key))
4848                        .cloned()
4849                        .collect::<Vec<_>>();
4850                    for key in to_delete {
4851                        set.remove(&key);
4852                    }
4853                    index.remove_prefix(&key).unwrap();
4854                } else {
4855                    set.insert(key.clone());
4856                    index.insert(&key, 1).unwrap();
4857                }
4858                index
4859                    .range(..)
4860                    .unwrap()
4861                    .map(|s| s.unwrap().0.as_ref().to_vec())
4862                    .collect::<Vec<_>>()
4863                    == set.iter().cloned().collect::<Vec<_>>()
4864            })
4865        }
4866    }
4867}