Skip to main content

simd_r_drive/storage_engine/
entry_iterator.rs

1use crate::storage_engine::constants::*;
2use crate::storage_engine::digest::*;
3use memmap2::Mmap;
4use simd_r_drive_entry_handle::{EntryHandle, EntryMetadata};
5use std::collections::HashSet;
6use std::sync::Arc;
7
8/// Iterator for traversing entries in the append-only storage.
9///
10/// This iterator scans entries stored in the memory-mapped file (`mmap`),
11/// reading each entry's metadata and returning unique key-value pairs.
12/// The iteration proceeds **backward**, following the chain of previous
13/// offsets stored in each entry.
14///
15/// ## Behavior:
16/// - **Starts at `tail_offset`** and moves backward using the
17///   `prev_offset` field.
18/// - **Ensures unique keys** by tracking seen hashes in a `HashSet`.
19/// - **Skips deleted entries**, which are represented by empty data.
20/// - **Stops when reaching an invalid or out-of-bounds offset.**
21pub struct EntryIterator {
22    mmap: Arc<Mmap>, // Borrow from Arc<Mmap> (zero-copy)
23    cursor: u64,
24    seen_keys: HashSet<u64, Xxh3BuildHasher>,
25}
26
27impl EntryIterator {
28    /// Creates a new iterator for scanning storage entries.
29    ///
30    /// Initializes an iterator starting at the provided `tail_offset`
31    /// and moving backward through the storage file. The iterator
32    /// ensures that only the most recent version of each key is
33    /// returned.
34    ///
35    /// # Parameters:
36    /// - `mmap`: A reference to the memory-mapped file.
37    /// - `tail_offset`: The file offset where iteration starts.
38    ///
39    /// # Returns:
40    /// - A new `EntryIterator` instance.
41    pub fn new(mmap: Arc<Mmap>, tail_offset: u64) -> Self {
42        Self {
43            mmap,
44            cursor: tail_offset,
45            seen_keys: HashSet::with_hasher(Xxh3BuildHasher),
46        }
47    }
48
49    #[inline]
50    fn prepad_len(offset: u64) -> usize {
51        let a = PAYLOAD_ALIGNMENT;
52        ((a - (offset % a)) & (a - 1)) as usize
53    }
54}
55
56impl Iterator for EntryIterator {
57    type Item = EntryHandle;
58
59    /// Advances the iterator to the next valid entry.
60    ///
61    /// Reads and parses the metadata for the current entry, determines
62    /// its boundaries, and extracts its data. If the key has already
63    /// been seen, the iterator skips it to ensure that only the latest
64    /// version is returned.
65    ///
66    /// # Returns:
67    /// - `Some(&[u8])` containing the entry data if valid.
68    /// - `None` when no more valid entries are available.
69    fn next(&mut self) -> Option<Self::Item> {
70        // Stop iteration if cursor is out of valid range
71        if self.cursor < METADATA_SIZE as u64 || self.mmap.is_empty() {
72            return None;
73        }
74
75        // Locate metadata at the current cursor position
76        let metadata_offset = (self.cursor - METADATA_SIZE as u64) as usize;
77        if metadata_offset + METADATA_SIZE > self.mmap.len() {
78            return None;
79        }
80        let metadata_bytes = &self.mmap[metadata_offset..metadata_offset + METADATA_SIZE];
81        let metadata = EntryMetadata::deserialize(metadata_bytes);
82
83        // Stored `prev_offset` is the **previous tail**. Derive the
84        // aligned payload start for regular values. For tombstones
85        // (single NULL byte), also support the no-prepad case.
86        let prev_tail = metadata.prev_offset;
87        let derived = prev_tail + Self::prepad_len(prev_tail) as u64;
88
89        let entry_end = metadata_offset;
90        let mut entry_start = derived as usize;
91
92        // Tombstone (legacy, no-prepad).
93        if entry_end > prev_tail as usize
94            && entry_end - prev_tail as usize == 1
95            && self.mmap[prev_tail as usize..entry_end] == NULL_BYTE
96        {
97            entry_start = prev_tail as usize;
98        }
99
100        // Ensure valid entry bounds before reading
101        if entry_start >= entry_end || entry_end > self.mmap.len() {
102            return None;
103        }
104
105        // Move cursor backward to follow the chain (by tails)
106        self.cursor = metadata.prev_offset;
107
108        // Skip duplicate keys (ensuring only the latest value is
109        // returned)
110        if !self.seen_keys.insert(metadata.key_hash) {
111            return self.next(); // Skip if already seen
112        }
113
114        let entry_data = &self.mmap[entry_start..entry_end];
115
116        // Skip deleted entries (denoted by single null byte)
117        if entry_end - entry_start == 1 && entry_data == NULL_BYTE {
118            return self.next();
119        }
120
121        Some(EntryHandle {
122            mmap_arc: Arc::clone(&self.mmap),
123            range: entry_start..entry_end,
124            metadata,
125        })
126    }
127}