simd_r_drive/storage_engine/entry_iterator.rs
1use crate::storage_engine::constants::*;
2use crate::storage_engine::digest::*;
3use memmap2::Mmap;
4use simd_r_drive_entry_handle::{EntryHandle, EntryMetadata};
5use std::collections::HashSet;
6use std::sync::Arc;
7
/// Iterator for traversing entries in the append-only storage.
///
/// This iterator scans entries stored in the memory-mapped file (`mmap`),
/// reading each entry's metadata and yielding one `EntryHandle` per unique
/// key hash. The iteration proceeds **backward**, following the chain of
/// previous offsets stored in each entry.
///
/// ## Behavior:
/// - **Starts at `tail_offset`** and moves backward using the
///   `prev_offset` field of each entry's metadata.
/// - **Ensures unique keys** by tracking seen key hashes in a `HashSet`,
///   so only the most recently written version of a key is yielded.
/// - **Skips deleted entries**, which are stored as a payload consisting
///   of a single `NULL_BYTE`. The key hash of a deleted entry is still
///   recorded, so older versions of that key are suppressed as well.
/// - **Stops when reaching an invalid or out-of-bounds offset.**
pub struct EntryIterator {
    /// Shared handle to the memory-mapped file. The iterator holds its own
    /// `Arc` reference and clones it into every yielded `EntryHandle`, so
    /// payload bytes are never copied.
    mmap: Arc<Mmap>,
    /// Tail offset of the next entry to visit, i.e. the position just past
    /// that entry's metadata.
    cursor: u64,
    /// Key hashes already encountered while walking backward.
    seen_keys: HashSet<u64, Xxh3BuildHasher>,
}
26
27impl EntryIterator {
28 /// Creates a new iterator for scanning storage entries.
29 ///
30 /// Initializes an iterator starting at the provided `tail_offset`
31 /// and moving backward through the storage file. The iterator
32 /// ensures that only the most recent version of each key is
33 /// returned.
34 ///
35 /// # Parameters:
36 /// - `mmap`: A reference to the memory-mapped file.
37 /// - `tail_offset`: The file offset where iteration starts.
38 ///
39 /// # Returns:
40 /// - A new `EntryIterator` instance.
41 pub fn new(mmap: Arc<Mmap>, tail_offset: u64) -> Self {
42 Self {
43 mmap,
44 cursor: tail_offset,
45 seen_keys: HashSet::with_hasher(Xxh3BuildHasher),
46 }
47 }
48
49 #[inline]
50 fn prepad_len(offset: u64) -> usize {
51 let a = PAYLOAD_ALIGNMENT;
52 ((a - (offset % a)) & (a - 1)) as usize
53 }
54}
55
56impl Iterator for EntryIterator {
57 type Item = EntryHandle;
58
59 /// Advances the iterator to the next valid entry.
60 ///
61 /// Reads and parses the metadata for the current entry, determines
62 /// its boundaries, and extracts its data. If the key has already
63 /// been seen, the iterator skips it to ensure that only the latest
64 /// version is returned.
65 ///
66 /// # Returns:
67 /// - `Some(&[u8])` containing the entry data if valid.
68 /// - `None` when no more valid entries are available.
69 fn next(&mut self) -> Option<Self::Item> {
70 // Stop iteration if cursor is out of valid range
71 if self.cursor < METADATA_SIZE as u64 || self.mmap.is_empty() {
72 return None;
73 }
74
75 // Locate metadata at the current cursor position
76 let metadata_offset = (self.cursor - METADATA_SIZE as u64) as usize;
77 if metadata_offset + METADATA_SIZE > self.mmap.len() {
78 return None;
79 }
80 let metadata_bytes = &self.mmap[metadata_offset..metadata_offset + METADATA_SIZE];
81 let metadata = EntryMetadata::deserialize(metadata_bytes);
82
83 // Stored `prev_offset` is the **previous tail**. Derive the
84 // aligned payload start for regular values. For tombstones
85 // (single NULL byte), also support the no-prepad case.
86 let prev_tail = metadata.prev_offset;
87 let derived = prev_tail + Self::prepad_len(prev_tail) as u64;
88
89 let entry_end = metadata_offset;
90 let mut entry_start = derived as usize;
91
92 // Tombstone (legacy, no-prepad).
93 if entry_end > prev_tail as usize
94 && entry_end - prev_tail as usize == 1
95 && self.mmap[prev_tail as usize..entry_end] == NULL_BYTE
96 {
97 entry_start = prev_tail as usize;
98 }
99
100 // Ensure valid entry bounds before reading
101 if entry_start >= entry_end || entry_end > self.mmap.len() {
102 return None;
103 }
104
105 // Move cursor backward to follow the chain (by tails)
106 self.cursor = metadata.prev_offset;
107
108 // Skip duplicate keys (ensuring only the latest value is
109 // returned)
110 if !self.seen_keys.insert(metadata.key_hash) {
111 return self.next(); // Skip if already seen
112 }
113
114 let entry_data = &self.mmap[entry_start..entry_end];
115
116 // Skip deleted entries (denoted by single null byte)
117 if entry_end - entry_start == 1 && entry_data == NULL_BYTE {
118 return self.next();
119 }
120
121 Some(EntryHandle {
122 mmap_arc: Arc::clone(&self.mmap),
123 range: entry_start..entry_end,
124 metadata,
125 })
126 }
127}