Skip to main content

simd_r_drive/storage_engine/
data_store.rs

1use crate::storage_engine::constants::*;
2use crate::storage_engine::digest::{
3    Xxh3BuildHasher, compute_checksum, compute_hash, compute_hash_batch,
4};
5use crate::storage_engine::simd_copy;
6use crate::storage_engine::{EntryIterator, EntryStream, KeyIndexer};
7use crate::traits::{DataStoreReader, DataStoreWriter};
8use crate::utils::verify_file_existence;
9use memmap2::Mmap;
10use simd_r_drive_entry_handle::{EntryHandle, EntryMetadata};
11use std::collections::HashSet;
12use std::convert::From;
13use std::fs::{File, OpenOptions};
14use std::io::{BufWriter, Error, Read, Result, Seek, SeekFrom, Write};
15use std::path::{Path, PathBuf};
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
18use tracing::{debug, info, warn};
19
20#[cfg(any(test, debug_assertions))]
21use simd_r_drive_entry_handle::debug_assert_aligned_offset;
22
23#[cfg(feature = "parallel")]
24use rayon::prelude::*;
25
/// Append-Only Storage Engine
pub struct DataStore {
    // Buffered writer used for appends; the RwLock serializes writers.
    file: Arc<RwLock<BufWriter<File>>>,
    // Current read view of the file; swapped for a fresh map after each
    // write (see `reindex`). Outer Mutex guards the swap, inner Arc is
    // handed out to readers/iterators.
    mmap: Arc<Mutex<Arc<Mmap>>>,
    // Absolute file offset one past the last valid byte; the position
    // where the next write begins.
    tail_offset: AtomicU64,
    // In-memory index mapping key hashes to packed (tag, offset) values
    // (see `KeyIndexer::unpack`).
    key_indexer: Arc<RwLock<KeyIndexer>>,
    // Path of the backing storage file.
    path: PathBuf,
}
34
35/// Provides a **consuming sequential** iterator over the valid entries.
36///
37/// This allows a `DataStore` to be consumed to produce a sequential
38/// iterator. For non-consuming iteration, iterate over a reference
39/// (`&storage`).
40///
41/// The iterator produced is **sequential**. For parallel processing,
42/// enable the `parallel` feature and use the `.par_iter_entries()`
43/// method instead.
impl IntoIterator for DataStore {
    type Item = EntryHandle;
    type IntoIter = EntryIterator;

    /// Consumes the store and yields its valid entries sequentially.
    fn into_iter(self) -> Self::IntoIter {
        self.iter_entries()
    }
}
52
impl From<PathBuf> for DataStore {
    /// Creates a `DataStore` instance from a `PathBuf`.
    ///
    /// This allows creating a storage instance **directly from a file
    /// path**.
    ///
    /// # Panics:
    /// - If the file cannot be opened or mapped into memory.
    fn from(path: PathBuf) -> Self {
        DataStore::open(&path).expect("Failed to open storage file")
    }
}
65
66impl DataStore {
67    /// Opens an **existing** or **new** append-only storage file.
68    ///
69    /// This function:
70    /// 1. **Opens the file** in read/write mode (creating it if
71    ///    necessary).
72    /// 2. **Maps the file** into memory using `mmap` for fast access.
73    /// 3. **Recovers the valid chain**, ensuring **data integrity**.
74    /// 4. **Re-maps** the file after recovery to reflect the correct
75    ///    state.
76    /// 5. **Builds an in-memory index** for **fast key lookups**.
77    ///
78    /// # Parameters:
79    /// - `path`: The **file path** where the storage is located.
80    ///
81    /// # Returns:
82    /// - `Ok(DataStore)`: A **new storage instance**.
83    /// - `Err(std::io::Error)`: If any file operation fails.
    pub fn open(path: &Path) -> Result<Self> {
        let file = Self::open_file_in_append_mode(path)?;
        let file_len = file.get_ref().metadata()?.len();

        // Map the file and find the longest verifiable entry chain.
        let mmap = Self::init_mmap(&file)?;
        let final_len = Self::recover_valid_chain(&mmap, file_len)?;

        if final_len < file_len {
            // Bytes in [final_len, file_len) are not part of a valid
            // chain; release both handles, shrink the file in place,
            // then re-open so the map and index are rebuilt against the
            // truncated file.
            warn!(
                "Truncating corrupted data in {} from offset {} to {}.",
                path.display(),
                final_len,
                file_len
            );
            drop(mmap);
            drop(file);
            let file = OpenOptions::new().read(true).write(true).open(path)?;
            file.set_len(final_len)?;
            file.sync_all()?;
            return Self::open(path);
        }

        let mmap_arc = Arc::new(mmap);
        // NOTE(review): the reference lifetime is forged to `'static` so
        // `KeyIndexer::build` can borrow the map. This is sound only if
        // the indexer does not retain the reference after `build`
        // returns — later remaps in `reindex` drop this allocation.
        // Confirm against `KeyIndexer`'s implementation.
        let mmap_for_indexer: &'static Mmap = unsafe { &*(Arc::as_ptr(&mmap_arc)) };
        let key_indexer = KeyIndexer::build(mmap_for_indexer, final_len);

        Ok(Self {
            file: Arc::new(RwLock::new(file)),
            mmap: Arc::new(Mutex::new(mmap_arc)),
            tail_offset: final_len.into(),
            key_indexer: Arc::new(RwLock::new(key_indexer)),
            path: path.to_path_buf(),
        })
    }
118
119    /// Opens an **existing** append-only storage file.
120    ///
121    /// This function verifies that the file exists before attempting to
122    /// open it. If the file does not exist or is not a valid file, an
123    /// error is returned.
124    ///
125    /// # Parameters:
126    /// - `path`: The **file path** of the storage file.
127    ///
128    /// # Returns:
129    /// - `Ok(DataStore)`: A **new storage instance** if the file exists
130    ///   and can be opened.
131    /// - `Err(std::io::Error)`: If the file does not exist or is
132    ///   invalid.
133    ///
134    /// # Notes:
135    /// - Unlike `open()`, this function **does not create** a new
136    ///   storage file if the specified file does not exist.
137    /// - If the file is **missing** or is not a regular file, an error
138    ///   is returned.
139    /// - This is useful in scenarios where the caller needs to **ensure**
140    ///   that they are working with an already existing storage file.
    pub fn open_existing(path: &Path) -> Result<Self> {
        // Fail fast when the path is missing or not a regular file;
        // unlike `open`, this never creates the file.
        verify_file_existence(path)?;
        Self::open(path)
    }
145
146    /// Workaround for directly opening in **append mode** causing
147    /// permissions issues on Windows
148    ///
149    /// The file is opened normally and the **cursor is moved to the
150    /// end.
151    ///
152    /// Unix family unaffected by this issue, but this standardizes
153    /// their handling.
154    ///
155    /// # Parameters:
156    /// - `path`: The **file path** of the storage file.
157    ///
158    /// # Returns:
159    /// - `Ok(BufWriter<File>)`: A buffered writer pointing to the file.
160    /// - `Err(std::io::Error)`: If the file could not be opened.
161    fn open_file_in_append_mode(path: &Path) -> Result<BufWriter<File>> {
162        let mut file = OpenOptions::new()
163            .read(true)
164            .write(true)
165            .create(true)
166            .truncate(false)
167            .open(path)?;
168        file.seek(SeekFrom::End(0))?;
169        Ok(BufWriter::new(file))
170    }
171
    /// Memory-maps the current contents of `file`.
    fn init_mmap(file: &BufWriter<File>) -> Result<Mmap> {
        // SAFETY: mapping a file is sound only while the underlying file
        // is not truncated or resized out from under the map; writers in
        // this module remap while holding the file write lock.
        // NOTE(review): modification of the file by another process
        // would still invalidate the map — confirm this is an accepted
        // operating constraint.
        unsafe { memmap2::MmapOptions::new().map(file.get_ref()) }
    }
175
176    /// Re-maps the storage file and updates the key index after a write
177    /// operation.
178    ///
179    /// This function performs two key tasks:
180    /// 1. **Re-maps the file (`mmap`)**: Ensures that newly written data
181    ///    is visible to readers by creating a fresh memory-mapped view of
182    ///    the storage file.
183    /// 2. **Updates the key index**: Inserts new key hash-to-offset
184    ///    mappings into the in-memory key index, ensuring efficient key
185    ///    lookups for future reads.
186    ///
187    /// # Parameters:
188    /// - `write_guard`: A locked reference to the `BufWriter<File>`,
189    ///   ensuring that writes are completed before remapping and
190    ///   indexing.
191    /// - `key_hash_offsets`: A slice of `(key_hash, tail_offset)`
192    ///   tuples containing the latest key mappings to be added to the
193    ///   index.
194    /// - `tail_offset`: The **new absolute file offset** after the most
195    ///   recent write. This represents the byte position where the next
196    ///   write operation should begin. It is updated to reflect the
197    ///   latest valid data in the storage.
198    ///
199    /// # Returns:
200    /// - `Ok(())` if the reindexing process completes successfully.
201    /// - `Err(std::io::Error)` if file metadata retrieval, memory
202    ///   mapping, or key index updates fail.
203    ///
204    /// # Important:
205    /// - **The write operation must be flushed before calling
206    ///   `reindex`** to ensure all pending writes are persisted and
207    ///   visible in the new memory-mapped file. This prevents potential
208    ///   inconsistencies where written data is not reflected in the
209    ///   remapped view.
210    ///
211    /// # Safety:
212    /// - This function should be called **immediately after a write
213    ///   operation** to ensure the file is in a consistent state before
214    ///   remapping.
215    /// - The function acquires locks on both the `mmap` and
216    ///   `key_indexer` to prevent race conditions while updating shared
217    ///   structures.
218    ///
219    /// # Locks Acquired:
220    /// - `mmap` (`Mutex<Arc<Mmap>>`) is locked to update the
221    ///   memory-mapped file.
222    /// - `key_indexer` (`RwLock<HashMap<u64, u64>>`) is locked to
223    ///   modify key mappings.
    fn reindex(
        &self,
        write_guard: &std::sync::RwLockWriteGuard<'_, BufWriter<File>>,
        key_hash_offsets: &[(u64, u64)],
        tail_offset: u64,
        deleted_keys: Option<&HashSet<u64>>,
    ) -> std::io::Result<()> {
        // Map the file again so the freshly flushed bytes become visible
        // to readers. The caller still holds the file write lock.
        let new_mmap = Self::init_mmap(write_guard)?;
        let mut mmap_guard = self.mmap.lock().unwrap();
        let mut key_indexer_guard = self
            .key_indexer
            .write()
            .map_err(|_| std::io::Error::other("Failed to acquire index lock"))?;

        for (key_hash, offset) in key_hash_offsets.iter() {
            if deleted_keys
                .as_ref()
                .is_some_and(|set| set.contains(key_hash))
            {
                // Deleted keys are removed from the index entirely rather
                // than left pointing at their tombstone.
                key_indexer_guard.remove(key_hash);
            } else {
                // Handle the Result from the new insert method
                if let Err(e) = key_indexer_guard.insert(*key_hash, *offset) {
                    // A collision was detected on write. The entire batch
                    // operation should fail to prevent an inconsistent state.
                    warn!("Write operation aborted due to hash collision: {}", e);
                    return Err(std::io::Error::other(e));
                }
            }
        }

        // Publish the new map and tail only after the index update
        // succeeded; readers then observe a consistent (map, index, tail)
        // triple.
        *mmap_guard = Arc::new(new_mmap);
        self.tail_offset.store(tail_offset, Ordering::Release);

        Ok(())
    }
260
261    /// Returns the storage file path.
262    ///
263    /// # Returns:
264    /// - A `PathBuf` containing the path to the storage file.
265    pub fn get_path(&self) -> PathBuf {
266        self.path.clone()
267    }
268
269    /// Retrieves an iterator over all valid entries in the storage.
270    ///
271    /// This iterator allows scanning the storage file and retrieving
272    /// **only the most recent** versions of each key.
273    ///
274    /// # Returns:
275    /// - An `EntryIterator` instance for iterating over valid entries.
276    pub fn iter_entries(&self) -> EntryIterator {
277        let mmap_clone = self.get_mmap_arc();
278        let tail_offset = self.tail_offset.load(Ordering::Acquire);
279        EntryIterator::new(mmap_clone, tail_offset)
280    }
281
282    /// Provides a parallel iterator over all valid entries in the
283    /// storage.
284    ///
285    /// This method is only available when the `parallel` feature is
286    /// enabled. It leverages the Rayon crate to process entries across
287    /// multiple threads, which can be significantly faster for bulk
288    /// operations on multi-core machines.
289    ///
290    /// The iterator is efficient, collecting only the necessary entry
291    /// offsets first and then constructing the `EntryHandle` objects in
292    /// parallel.
293    ///
294    /// # Returns
295    /// - A Rayon `ParallelIterator` that yields `EntryHandle` items.
    #[cfg(feature = "parallel")]
    pub fn par_iter_entries(&self) -> impl ParallelIterator<Item = EntryHandle> {
        // First, acquire a read lock and collect all the packed offset
        // values. This is a short, fast operation.
        let key_indexer_guard = self.key_indexer.read().unwrap();
        let packed_values: Vec<u64> = key_indexer_guard.values().cloned().collect();
        drop(key_indexer_guard); // Release the lock as soon as possible.

        // Clone the mmap Arc once to be moved into the parallel iterator.
        let mmap_arc = self.get_mmap_arc();

        // Create a parallel iterator over the collected offsets. The
        // `filter_map` operation is the part that will run in parallel
        // across threads.
        packed_values.into_par_iter().filter_map(move |packed| {
            let (_tag, offset) = KeyIndexer::unpack(packed);
            let offset = offset as usize;

            // This logic is a simplified, read-only version of
            // `read_entry_with_context`. We perform basic bounds checks
            // to ensure safety.
            if offset + METADATA_SIZE > mmap_arc.len() {
                return None;
            }

            let metadata_bytes = &mmap_arc[offset..offset + METADATA_SIZE];
            let metadata = EntryMetadata::deserialize(metadata_bytes);

            // Derive aligned start from previous tail. For tombstones
            // (single NULL byte without pre-pad), also support legacy
            // no-prepad case.
            let prev_tail = metadata.prev_offset;
            let derived = prev_tail + Self::prepad_len(prev_tail) as u64;
            let entry_end = offset;

            let mut entry_start = derived as usize;

            // Tombstone detection (no pre-pad case): exactly one byte
            // lies between the previous tail and this metadata record.
            if entry_end > prev_tail as usize
                && entry_end - prev_tail as usize == 1
                && mmap_arc[prev_tail as usize..entry_end] == NULL_BYTE
            {
                entry_start = prev_tail as usize;
            }

            // Reject empty/inverted payload ranges and ranges that run
            // past the end of the map.
            if entry_start >= entry_end || entry_end > mmap_arc.len() {
                return None;
            }

            // Filter out tombstones (single NULL byte region); deleted
            // keys produce no entry.
            if entry_end - entry_start == 1 && mmap_arc[entry_start..entry_end] == NULL_BYTE {
                return None;
            }

            #[cfg(any(test, debug_assertions))]
            {
                debug_assert_aligned_offset(entry_start as u64);
            }

            // Zero-copy handle: shares the mmap Arc and records only the
            // payload byte range plus its metadata.
            Some(EntryHandle {
                mmap_arc: mmap_arc.clone(),
                range: entry_start..entry_end,
                metadata,
            })
        })
    }
362
363    /// Recovers the **latest valid chain** of entries from the storage
364    /// file.
365    ///
366    /// This function **scans backward** through the file, verifying that
367    /// each entry correctly references the previous offset. It
368    /// determines the **last valid storage position** to ensure data
369    /// integrity.
370    ///
371    /// # How It Works:
372    /// - Scans from the last written offset **backward**.
373    /// - Ensures each entry correctly points to its **previous offset**.
374    /// - Stops at the **deepest valid chain** that reaches offset `0`.
375    ///
376    /// # Parameters:
377    /// - `mmap`: A reference to the **memory-mapped file**.
378    /// - `file_len`: The **current size** of the file in bytes.
379    ///
380    /// # Returns:
381    /// - `Ok(final_valid_offset)`: The last **valid** byte offset.
382    /// - `Err(std::io::Error)`: If a file read or integrity check fails
    fn recover_valid_chain(mmap: &Mmap, file_len: u64) -> Result<u64> {
        // A file smaller than one metadata record cannot hold any entry.
        if file_len < METADATA_SIZE as u64 {
            return Ok(0);
        }

        let mut cursor = file_len;
        let mut best_valid_offset = None;
        // Scan backward one byte at a time, treating each position as a
        // candidate end-of-chain, until a chain that links all the way
        // back to offset 0 is found.
        while cursor >= METADATA_SIZE as u64 {
            let metadata_offset = cursor - METADATA_SIZE as u64;
            let metadata_bytes =
                &mmap[metadata_offset as usize..(metadata_offset as usize + METADATA_SIZE)];
            let metadata = EntryMetadata::deserialize(metadata_bytes);

            // Stored `prev_offset` is the **previous tail**.
            let prev_tail = metadata.prev_offset;

            // Derive start; handle tombstone (no pre-pad) as a special
            // case so chain length math stays correct.
            let derived_start = prev_tail + Self::prepad_len(prev_tail) as u64;
            let entry_end = metadata_offset;

            // A tombstone is a single NULL byte written without pre-pad.
            let entry_start = if entry_end > prev_tail
                && entry_end - prev_tail == 1
                && mmap[prev_tail as usize..entry_end as usize] == NULL_BYTE
            {
                prev_tail
            } else {
                #[cfg(any(test, debug_assertions))]
                {
                    debug_assert_aligned_offset(derived_start);
                }

                derived_start
            };

            // Empty or inverted payload region: not a valid candidate.
            if entry_start >= metadata_offset {
                cursor -= 1;
                continue;
            }

            let mut chain_valid = true;
            let mut back_cursor = prev_tail; // walk by tails
            // size of current entry data region
            let mut total_size = (metadata_offset - entry_start) + METADATA_SIZE as u64;

            // Walk the `prev_offset` links back toward offset 0,
            // validating each hop and accumulating the chain's byte size.
            while back_cursor != 0 {
                // A non-zero tail must leave room for a metadata record.
                if back_cursor < METADATA_SIZE as u64 {
                    chain_valid = false;
                    break;
                }

                let prev_metadata_offset = back_cursor - METADATA_SIZE as u64;
                if prev_metadata_offset as usize + METADATA_SIZE > mmap.len() {
                    chain_valid = false;
                    break;
                }

                let prev_metadata_bytes = &mmap[prev_metadata_offset as usize
                    ..(prev_metadata_offset as usize + METADATA_SIZE)];
                let prev_metadata = EntryMetadata::deserialize(prev_metadata_bytes);

                let prev_prev_tail = prev_metadata.prev_offset;

                // Size of the previous entry’s data region
                let prev_entry_start = if prev_metadata_offset > prev_prev_tail
                    && prev_metadata_offset - prev_prev_tail == 1
                    && mmap[prev_prev_tail as usize..prev_metadata_offset as usize] == NULL_BYTE
                {
                    prev_prev_tail
                } else {
                    prev_prev_tail + Self::prepad_len(prev_prev_tail) as u64
                };

                if prev_entry_start >= prev_metadata_offset {
                    chain_valid = false;
                    break;
                }

                let entry_size = prev_metadata_offset.saturating_sub(prev_entry_start);

                total_size += entry_size + METADATA_SIZE as u64;

                // Links must strictly decrease, otherwise the chain loops
                // or points forward.
                if prev_prev_tail >= prev_metadata_offset {
                    chain_valid = false;
                    break;
                }

                back_cursor = prev_prev_tail;
            }

            // Accept the first (deepest-ending) chain that reaches offset
            // 0 and whose accumulated size fits inside the file.
            if chain_valid && back_cursor == 0 && total_size <= file_len {
                best_valid_offset = Some(metadata_offset + METADATA_SIZE as u64);
                break;
            }

            cursor -= 1;
        }

        // No valid chain found: the whole file is considered invalid.
        Ok(best_valid_offset.unwrap_or(0))
    }
483
484    /// Performs the core logic of reading an entry from the store.
485    ///
486    /// This private helper centralizes the logic for both `read` and
487    /// `batch_read`. It takes all necessary context to perform a safe
488    /// lookup, including the key, its hash, the memory map, and a read
489    /// guard for the key indexer.
490    ///
491    /// # Parameters
492    /// - `key`: The original key bytes used for tag verification.
493    /// - `key_hash`: The pre-computed hash of the key for index lookup.
494    /// - `mmap_arc`: A reference to the active memory map.
495    /// - `key_indexer_guard`: A read-lock guard for the key index.
496    ///
497    /// # Returns
498    /// - `Some(EntryHandle)` if the key is found and all checks pass.
499    /// - `None` if the key is not found, a tag mismatch occurs
500    ///   (collision/corruption), or the entry is a tombstone.
    #[inline]
    fn read_entry_with_context<'a>(
        &self,
        non_hashed_key: Option<&[u8]>,
        key_hash: u64,
        mmap_arc: &Arc<Mmap>,
        key_indexer_guard: &RwLockReadGuard<'a, KeyIndexer>,
    ) -> Option<EntryHandle> {
        // Index lookup: packed value carries a verification tag plus the
        // metadata offset.
        let packed = *key_indexer_guard.get_packed(&key_hash)?;
        let (tag, offset) = KeyIndexer::unpack(packed);

        // The crucial verification check, now centralized. Only possible
        // when the caller supplied the original (non-hashed) key bytes.
        if let Some(non_hashed_key) = non_hashed_key
            && tag != KeyIndexer::tag_from_key(non_hashed_key)
        {
            warn!(
                "Tag mismatch detected for `non_hashed_key`, likely a \
                 hash collision or index corruption."
            );
            return None;
        }

        // Bounds check: the metadata record must fit inside the map.
        let offset = offset as usize;
        if offset + METADATA_SIZE > mmap_arc.len() {
            return None;
        }

        let metadata_bytes = &mmap_arc[offset..offset + METADATA_SIZE];
        let metadata = EntryMetadata::deserialize(metadata_bytes);

        // Derive aligned payload start from stored previous tail. Support
        // tombstones (single NULL byte without pre-pad).
        let prev_tail = metadata.prev_offset;
        let derived = prev_tail + Self::prepad_len(prev_tail) as u64;
        let entry_end = offset;

        let mut entry_start = derived as usize;

        // Tombstone layout: exactly one byte between the previous tail
        // and this metadata record, with no pre-pad.
        if entry_end > prev_tail as usize
            && entry_end - prev_tail as usize == 1
            && mmap_arc[prev_tail as usize..entry_end] == NULL_BYTE
        {
            entry_start = prev_tail as usize;
        }

        // Reject empty/inverted ranges or ranges past the end of the map.
        if entry_start >= entry_end || entry_end > mmap_arc.len() {
            return None;
        }

        // Tombstone (single null byte): the key was deleted.
        if entry_end - entry_start == 1 && mmap_arc[entry_start..entry_end] == NULL_BYTE {
            return None;
        }

        #[cfg(any(test, debug_assertions))]
        {
            debug_assert_aligned_offset(entry_start as u64);
        }

        // Zero-copy handle sharing the mmap; no payload bytes are copied.
        Some(EntryHandle {
            mmap_arc: mmap_arc.clone(),
            range: entry_start..entry_end,
            metadata,
        })
    }
566
567    /// Copies an entry handle to a **different storage container**.
568    ///
569    /// This function:
570    /// - Extracts metadata and content from the given `EntryHandle`.
571    /// - Writes the entry into the `target` storage.
572    ///
573    /// # Parameters:
574    /// - `entry`: The **entry handle** to be copied.
575    /// - `target`: The **destination storage** where the entry should be
576    ///   copied.
577    ///
578    /// # Returns:
579    /// - `Ok(target_offset)`: The file offset where the copied entry was
580    ///   written.
581    /// - `Err(std::io::Error)`: If a write operation fails.
582    ///
583    /// # Notes:
584    /// - This is a **low-level function** used by `copy` and related
585    ///   operations.
586    /// - The `entry` remains **unchanged** in the original storage.
587    fn copy_handle(&self, entry: &EntryHandle, target: &DataStore) -> Result<u64> {
588        let mut entry_stream = EntryStream::from(entry.clone_arc());
589        target.write_stream_with_key_hash(entry.key_hash(), &mut entry_stream)
590    }
591
592    /// Estimates the potential space savings from compaction.
593    ///
594    /// This method scans the storage file and calculates the difference
595    /// between the total file size and the size required to keep only
596    /// the latest versions of all keys.
597    ///
598    /// # How It Works:
599    /// - Iterates through the entries, tracking the **latest version** of
600    ///   each key.
601    /// - Ignores older versions of keys to estimate the **optimized**
602    ///   storage footprint.
603    /// - Returns the **difference** between the total file size and the
604    ///   estimated compacted size.
605    pub fn estimate_compaction_savings(&self) -> u64 {
606        let total_size = self.file_size().unwrap_or(0);
607        let mut unique_entry_size: u64 = 0;
608        let mut seen_keys = HashSet::with_hasher(Xxh3BuildHasher);
609
610        for entry in self.iter_entries() {
611            if seen_keys.insert(entry.key_hash()) {
612                unique_entry_size += entry.file_size() as u64;
613            }
614        }
615        total_size.saturating_sub(unique_entry_size)
616    }
617
618    /// Provides access to the shared memory-mapped file (`Arc<Mmap>`)
619    /// for testing.
620    ///
621    /// This method returns a cloned `Arc<Mmap>`, allowing test cases to
622    /// inspect the memory-mapped region while ensuring reference
623    /// counting remains intact.
624    ///
625    /// # Notes:
626    /// - The returned `Arc<Mmap>` ensures safe access without
627    ///   invalidating the mmap.
628    /// - This function is only available in **test** and **debug**
629    ///   builds.
    #[cfg(any(test, debug_assertions))]
    pub fn get_mmap_arc_for_testing(&self) -> Arc<Mmap> {
        // Same snapshot the read path uses; refcount keeps the map alive.
        self.get_mmap_arc()
    }
634
635    /// Provides direct access to the raw pointer of the underlying
636    /// memory map for testing.
637    ///
638    /// This method retrieves a raw pointer (`*const u8`) to the start of
639    /// the memory-mapped file. It is useful for validating zero-copy
640    /// behavior and memory alignment in test cases.
641    ///
642    /// # Safety Considerations:
643    /// - The pointer remains valid **as long as** the mmap is not
644    ///   remapped or dropped.
645    /// - Dereferencing this pointer outside of controlled test
646    ///   environments **is unsafe** and may result in undefined
647    ///   behavior.
648    ///
649    /// # Notes:
650    /// - This function is only available in **test** and **debug**
651    ///   builds.
    #[cfg(any(test, debug_assertions))]
    pub fn arc_ptr(&self) -> *const u8 {
        // Pointer to byte 0 of the current map; valid only until the next
        // remap (see the safety notes in the doc comment above).
        self.get_mmap_arc().as_ptr()
    }
656
657    #[inline]
658    fn get_mmap_arc(&self) -> Arc<Mmap> {
659        let guard = self.mmap.lock().unwrap();
660        let mmap_clone = guard.clone();
661        drop(guard);
662        mmap_clone
663    }
664
665    // --- Alignment helper -------------------------------------------------
666
667    /// Compute the number of pre-pad bytes required to align `offset` to
668    /// `PAYLOAD_ALIGNMENT`. `PAYLOAD_ALIGNMENT` is a power of two.
669    #[inline]
670    fn prepad_len(offset: u64) -> usize {
671        let a = PAYLOAD_ALIGNMENT;
672        ((a - (offset % a)) & (a - 1)) as usize
673    }
674
675    // ---------------------------------------------------------------------
676
677    // TODO: Determine thread count *before* running this OR [somehow]
678    // make it thread safe.
679    /// Compacts the storage by keeping only the latest version of each
680    /// key.
681    ///
682    /// # ⚠️ WARNING:
683    /// - **This function should only be used when a single thread is
684    ///   accessing the storage.**
685    /// - While `&mut self` prevents concurrent **mutations**, it **does
686    ///   not** prevent other threads from holding shared references
687    ///   (`&DataStore`) and performing reads.
688    /// - If the `DataStore` instance is wrapped in `Arc<DataStore>`,
689    ///   multiple threads may still hold **read** references while
690    ///   compaction is running, potentially leading to inconsistent
691    ///   reads.
692    /// - If stricter concurrency control is required, **manual
693    ///   synchronization should be enforced externally.**
694    ///
695    /// # Behavior:
696    /// - Creates a **temporary compacted file** containing only the
697    ///   latest versions of stored keys.
698    /// - Swaps the original file with the compacted version upon
699    ///   success.
700    /// - Does **not** remove tombstone (deleted) entries due to the
701    ///   append-only model.
702    ///
703    /// # Returns:
704    /// - `Ok(())` if compaction completes successfully.
705    /// - `Err(std::io::Error)` if an I/O operation fails.
    pub fn compact(&mut self) -> Result<()> {
        // The compacted copy is written next to the original with a
        // ".bk" extension appended, then swapped in via rename.
        let compacted_path = crate::utils::append_extension(&self.path, "bk");
        info!("Starting compaction. Writing to: {:?}", compacted_path);

        let compacted_storage = DataStore::open(&compacted_path)?;
        let mut index_pairs: Vec<(u64, u64)> = Vec::new();
        let mut compacted_data_size: u64 = 0;

        // Copy the latest version of each key (iter_entries yields only
        // the most recent versions) into the new store.
        for entry in self.iter_entries() {
            let new_tail_offset = self.copy_handle(&entry, &compacted_storage)?;
            let stored_metadata_offset = new_tail_offset - METADATA_SIZE as u64;
            index_pairs.push((entry.key_hash(), stored_metadata_offset));
            compacted_data_size += entry.file_size() as u64;
        }

        let size_before = self.file_size()?;

        // Note: The current implementation should never increase space,
        // but if an additional indexer is ever used, this may change.
        //
        // Only write the static index if it actually saves space
        //
        // NOTE(review): this branch currently only flushes the writer; no
        // static index is actually emitted, and `index_pairs` is built
        // but never consumed. Confirm whether static-index generation was
        // removed intentionally.
        if size_before > compacted_data_size {
            info!("Compaction will save space. Writing static index.");

            let mut file_guard = compacted_storage
                .file
                .write()
                .map_err(|e| std::io::Error::other(format!("Lock poisoned: {e}")))?;
            file_guard.flush()?;
        } else {
            info!(
                "Compaction would increase file size \
                 (data w/ indexing: {compacted_data_size}). \
                 Skipping static index generation.",
            );
        }

        drop(compacted_storage);

        debug!("Compaction successful. Swapping files...");
        std::fs::rename(&compacted_path, &self.path)?;
        info!("Compaction file swap complete.");
        // NOTE(review): the in-memory state of `self` (mmap, key_indexer,
        // tail_offset) still reflects the pre-compaction file after the
        // rename — confirm callers are expected to reopen the store.
        Ok(())
    }
750}
751
752impl DataStoreWriter for DataStore {
    fn write_stream<R: Read>(&self, key: &[u8], reader: &mut R) -> Result<u64> {
        // Hash the key once, then defer to the hash-addressed variant.
        let key_hash = compute_hash(key);
        self.write_stream_with_key_hash(key_hash, reader)
    }
757
758    fn write_stream_with_key_hash<R: Read>(&self, key_hash: u64, reader: &mut R) -> Result<u64> {
759        let mut file = self
760            .file
761            .write()
762            .map_err(|_| std::io::Error::other("Failed to acquire file lock"))?;
763        let prev_tail = self.tail_offset.load(Ordering::Acquire);
764
765        // Pre-align payload start so typed views can be zero-copy.
766        let prepad = Self::prepad_len(prev_tail);
767        if prepad > 0 {
768            // Pad with zeros; not part of logical payload.
769            let pad = [0u8; 64];
770            file.write_all(&pad[..prepad])?;
771        }
772
773        let mut buffer = vec![0; WRITE_STREAM_BUFFER_SIZE];
774        let mut total_written = 0;
775        let mut checksum_state = crc32fast::Hasher::new();
776        let mut is_null_only = true;
777
778        while let Ok(bytes_read) = reader.read(&mut buffer) {
779            if bytes_read == 0 {
780                break;
781            }
782
783            if buffer[..bytes_read].iter().any(|&b| b != NULL_BYTE[0]) {
784                is_null_only = false;
785            }
786
787            file.write_all(&buffer[..bytes_read])?;
788            checksum_state.update(&buffer[..bytes_read]);
789            total_written += bytes_read;
790        }
791
792        if total_written > 0 && is_null_only {
793            return Err(std::io::Error::new(
794                std::io::ErrorKind::InvalidInput,
795                "NULL-byte-only streams cannot be written directly.",
796            ));
797        }
798
799        if total_written == 0 {
800            return Err(std::io::Error::new(
801                std::io::ErrorKind::InvalidInput,
802                "Payload cannot be empty.",
803            ));
804        }
805
806        let checksum = checksum_state.finalize().to_le_bytes();
807        // Link to previous tail; readers derive aligned start.
808        let metadata = EntryMetadata {
809            key_hash,
810            prev_offset: prev_tail,
811            checksum,
812        };
813        file.write_all(&metadata.serialize())?;
814        file.flush()?;
815
816        let tail_offset = prev_tail + prepad as u64 + total_written as u64 + METADATA_SIZE as u64;
817
818        self.reindex(
819            &file,
820            &[(key_hash, tail_offset - METADATA_SIZE as u64)],
821            tail_offset,
822            None,
823        )?;
824        Ok(tail_offset)
825    }
826
827    fn write(&self, key: &[u8], payload: &[u8]) -> Result<u64> {
828        let key_hash = compute_hash(key);
829        self.write_with_key_hash(key_hash, payload)
830    }
831
832    fn write_with_key_hash(&self, key_hash: u64, payload: &[u8]) -> Result<u64> {
833        self.batch_write_with_key_hashes(vec![(key_hash, payload)], false)
834    }
835
836    // TODO: Consider change signature to:
837    // fn batch_write(&self, entries: Vec<(Vec<u8>, Vec<u8>)>) -> Result<u64>
838    fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result<u64> {
839        let (keys, payloads): (Vec<_>, Vec<_>) = entries.iter().cloned().unzip();
840        let hashes = compute_hash_batch(&keys);
841        let hashed_entries = hashes.into_iter().zip(payloads).collect::<Vec<_>>();
842        self.batch_write_with_key_hashes(hashed_entries, false)
843    }
844
845    // TODO: Consider change `prehashed_keys: Vec<(u64, &[u8])>` to
846    // `prehashed_keys: Vec<(u64, Vec<u8>)>`
847    fn batch_write_with_key_hashes(
848        &self,
849        prehashed_keys: Vec<(u64, &[u8])>,
850        allow_null_bytes: bool,
851    ) -> Result<u64> {
852        let mut file = self
853            .file
854            .write()
855            .map_err(|_| std::io::Error::other("Failed to acquire file lock"))?;
856
857        let mut buffer = Vec::new();
858        let mut tail_offset = self.tail_offset.load(Ordering::Acquire);
859
860        let mut key_hash_offsets: Vec<(u64, u64)> = Vec::with_capacity(prehashed_keys.len());
861        let mut deleted_keys: HashSet<u64> = HashSet::new();
862
863        for (key_hash, payload) in prehashed_keys {
864            if payload == NULL_BYTE {
865                if !allow_null_bytes {
866                    return Err(std::io::Error::new(
867                        std::io::ErrorKind::InvalidInput,
868                        "NULL-byte payloads cannot be written directly.",
869                    ));
870                }
871                // Tombstone: write 1 byte, no pre-pad.
872                let link_to_prev_tail = tail_offset;
873
874                if payload.is_empty() {
875                    return Err(std::io::Error::new(
876                        std::io::ErrorKind::InvalidInput,
877                        "Payload cannot be empty.",
878                    ));
879                }
880
881                let checksum = compute_checksum(payload);
882                let metadata = EntryMetadata {
883                    key_hash,
884                    prev_offset: link_to_prev_tail,
885                    checksum,
886                };
887
888                let mut entry: Vec<u8> = vec![0u8; 1 + METADATA_SIZE];
889                entry[0] = NULL_BYTE[0];
890                entry[1..].copy_from_slice(&metadata.serialize());
891
892                buffer.extend_from_slice(&entry);
893
894                tail_offset += entry.len() as u64;
895                key_hash_offsets.push((key_hash, tail_offset - METADATA_SIZE as u64));
896                deleted_keys.insert(key_hash);
897                continue;
898            }
899
900            if payload.is_empty() {
901                return Err(std::io::Error::new(
902                    std::io::ErrorKind::InvalidInput,
903                    "Payload cannot be empty.",
904                ));
905            }
906
907            // Non-tombstone: pre-align current payload.
908            let link_to_prev_tail = tail_offset;
909            let prepad = Self::prepad_len(tail_offset);
910            if prepad > 0 {
911                let old_len = buffer.len();
912                buffer.resize(old_len + prepad, 0u8);
913                tail_offset += prepad as u64;
914            }
915
916            let checksum = compute_checksum(payload);
917            let metadata = EntryMetadata {
918                key_hash,
919                prev_offset: link_to_prev_tail,
920                checksum,
921            };
922            let payload_len = payload.len();
923
924            let mut entry: Vec<u8> = vec![0u8; payload_len + METADATA_SIZE];
925            simd_copy(&mut entry[..payload.len()], payload);
926            entry[payload.len()..].copy_from_slice(&metadata.serialize());
927            buffer.extend_from_slice(&entry);
928
929            tail_offset += entry.len() as u64;
930            key_hash_offsets.push((key_hash, tail_offset - METADATA_SIZE as u64));
931        }
932
933        file.write_all(&buffer)?;
934        file.flush()?;
935
936        self.reindex(&file, &key_hash_offsets, tail_offset, Some(&deleted_keys))?;
937
938        Ok(self.tail_offset.load(Ordering::Acquire))
939    }
940
941    fn rename(&self, old_key: &[u8], new_key: &[u8]) -> Result<u64> {
942        if old_key == new_key {
943            return Err(std::io::Error::new(
944                std::io::ErrorKind::InvalidInput,
945                "Cannot rename a key to itself",
946            ));
947        }
948
949        let old_entry = self.read(old_key)?.ok_or_else(|| {
950            std::io::Error::new(std::io::ErrorKind::NotFound, "Old key not found")
951        })?;
952        let mut old_entry_stream = EntryStream::from(old_entry);
953
954        self.write_stream(new_key, &mut old_entry_stream)?;
955
956        let new_offset = self.delete(old_key)?;
957        Ok(new_offset)
958    }
959
960    fn copy(&self, key: &[u8], target: &DataStore) -> Result<u64> {
961        if self.path == target.path {
962            return Err(std::io::Error::new(
963                std::io::ErrorKind::InvalidInput,
964                format!(
965                    "Cannot copy entry to the same storage ({:?}). \
966                     Use `rename` instead.",
967                    self.path
968                ),
969            ));
970        }
971
972        let entry_handle = self.read(key)?.ok_or_else(|| {
973            std::io::Error::new(
974                std::io::ErrorKind::NotFound,
975                format!("Key not found: {:?}", String::from_utf8_lossy(key)),
976            )
977        })?;
978        self.copy_handle(&entry_handle, target)
979    }
980
981    fn transfer(&self, key: &[u8], target: &DataStore) -> Result<u64> {
982        self.copy(key, target)?;
983        self.delete(key)
984    }
985
986    fn delete(&self, key: &[u8]) -> Result<u64> {
987        self.batch_delete(&[key])
988    }
989
990    fn batch_delete(&self, keys: &[&[u8]]) -> Result<u64> {
991        let key_hashes = compute_hash_batch(keys);
992        self.batch_delete_key_hashes(&key_hashes)
993    }
994
995    fn batch_delete_key_hashes(&self, prehashed_keys: &[u64]) -> Result<u64> {
996        // First, check which keys actually exist to avoid writing
997        // useless tombstones.
998        let keys_to_delete: Vec<u64> = {
999            let key_indexer_guard = self
1000                .key_indexer
1001                .read()
1002                .map_err(|_| Error::other("Key-index lock poisoned during batch_delete check"))?;
1003
1004            prehashed_keys
1005                .iter()
1006                .filter(|&&key_hash| key_indexer_guard.get_packed(&key_hash).is_some())
1007                .cloned()
1008                .collect()
1009        };
1010
1011        // If no keys were found to delete, exit early without any I/O.
1012        if keys_to_delete.is_empty() {
1013            return Ok(self.tail_offset.load(Ordering::Acquire));
1014        }
1015
1016        // Prepare the delete operations (a key hash + a null byte).
1017        let delete_ops: Vec<(u64, &[u8])> = keys_to_delete
1018            .iter()
1019            .map(|&key_hash| (key_hash, &NULL_BYTE as &[u8]))
1020            .collect();
1021
1022        // Use the underlying batch write method, allowing null bytes.
1023        self.batch_write_with_key_hashes(delete_ops, true)
1024    }
1025}
1026
1027impl DataStoreReader for DataStore {
1028    type EntryHandleType = EntryHandle;
1029
1030    fn exists(&self, key: &[u8]) -> Result<bool> {
1031        Ok(self.read(key)?.is_some())
1032    }
1033
1034    fn exists_with_key_hash(&self, prehashed_key: u64) -> Result<bool> {
1035        // This is a lightweight wrapper around the read method, just
1036        // like exists().
1037        Ok(self.read_with_key_hash(prehashed_key)?.is_some())
1038    }
1039
1040    fn read(&self, key: &[u8]) -> Result<Option<EntryHandle>> {
1041        let key_hash = compute_hash(key);
1042        let key_indexer_guard = self
1043            .key_indexer
1044            .read()
1045            .map_err(|_| Error::other("key-index lock poisoned"))?;
1046        let mmap_arc = self.get_mmap_arc();
1047
1048        Ok(self.read_entry_with_context(Some(key), key_hash, &mmap_arc, &key_indexer_guard))
1049    }
1050
1051    fn read_with_key_hash(&self, prehashed_key: u64) -> Result<Option<EntryHandle>> {
1052        let key_indexer_guard = self
1053            .key_indexer
1054            .read()
1055            .map_err(|_| Error::other("key-index lock poisoned"))?;
1056        let mmap_arc = self.get_mmap_arc();
1057
1058        Ok(self.read_entry_with_context(None, prehashed_key, &mmap_arc, &key_indexer_guard))
1059    }
1060
1061    fn read_last_entry(&self) -> Result<Option<EntryHandle>> {
1062        let mmap_arc = self.get_mmap_arc();
1063        let tail_offset = self.tail_offset.load(std::sync::atomic::Ordering::Acquire);
1064        if tail_offset < METADATA_SIZE as u64 || mmap_arc.is_empty() {
1065            return Ok(None);
1066        }
1067
1068        let metadata_offset = (tail_offset - METADATA_SIZE as u64) as usize;
1069        if metadata_offset + METADATA_SIZE > mmap_arc.len() {
1070            return Ok(None);
1071        }
1072
1073        let metadata_bytes = &mmap_arc[metadata_offset..metadata_offset + METADATA_SIZE];
1074        let metadata = EntryMetadata::deserialize(metadata_bytes);
1075
1076        // Derive aligned start; support tombstone no-prepad case.
1077        let prev_tail = metadata.prev_offset;
1078        let derived = prev_tail + Self::prepad_len(prev_tail) as u64;
1079        let entry_end = metadata_offset;
1080
1081        let mut entry_start = derived as usize;
1082        if entry_end > prev_tail as usize
1083            && entry_end - prev_tail as usize == 1
1084            && mmap_arc[prev_tail as usize..entry_end] == NULL_BYTE
1085        {
1086            entry_start = prev_tail as usize;
1087        }
1088
1089        if entry_start >= entry_end || entry_end > mmap_arc.len() {
1090            return Ok(None);
1091        }
1092
1093        #[cfg(any(test, debug_assertions))]
1094        {
1095            debug_assert_aligned_offset(entry_start as u64);
1096        }
1097
1098        Ok(Some(EntryHandle {
1099            mmap_arc,
1100            range: entry_start..entry_end,
1101            metadata,
1102        }))
1103    }
1104
1105    fn batch_read(&self, keys: &[&[u8]]) -> Result<Vec<Option<EntryHandle>>> {
1106        let hashed_keys = compute_hash_batch(keys);
1107
1108        self.batch_read_hashed_keys(&hashed_keys, Some(keys))
1109    }
1110
1111    fn batch_read_hashed_keys(
1112        &self,
1113        prehashed_keys: &[u64],
1114        non_hashed_keys: Option<&[&[u8]]>,
1115    ) -> Result<Vec<Option<EntryHandle>>> {
1116        let mmap_arc = self.get_mmap_arc();
1117        let key_indexer_guard = self
1118            .key_indexer
1119            .read()
1120            .map_err(|_| Error::other("Key-index lock poisoned during `batch_read`"))?;
1121
1122        // Use a match to handle the two possible scenarios
1123        let results = match non_hashed_keys {
1124            // Case 1: We have the original keys for tag verification.
1125            Some(keys) => {
1126                // Good practice to ensure lengths match.
1127                if keys.len() != prehashed_keys.len() {
1128                    return Err(std::io::Error::new(
1129                        std::io::ErrorKind::InvalidInput,
1130                        "Mismatched lengths for hashed and non-hashed keys.",
1131                    ));
1132                }
1133
1134                prehashed_keys
1135                    .iter()
1136                    .zip(keys.iter())
1137                    .map(|(key_hash, &key)| {
1138                        // Correctly pass `Some(key)` for verification
1139                        self.read_entry_with_context(
1140                            Some(key),
1141                            *key_hash,
1142                            &mmap_arc,
1143                            &key_indexer_guard,
1144                        )
1145                    })
1146                    .collect()
1147            }
1148            // Case 2: We only have the hashes and must skip tag check.
1149            None => prehashed_keys
1150                .iter()
1151                .map(|key_hash| {
1152                    self.read_entry_with_context(None, *key_hash, &mmap_arc, &key_indexer_guard)
1153                })
1154                .collect(),
1155        };
1156
1157        Ok(results)
1158    }
1159
1160    fn read_metadata(&self, key: &[u8]) -> Result<Option<EntryMetadata>> {
1161        Ok(self.read(key)?.map(|entry| entry.metadata().clone()))
1162    }
1163
1164    fn len(&self) -> Result<usize> {
1165        let read_guard = self
1166            .key_indexer
1167            .read()
1168            .map_err(|_| Error::other("Key-index lock poisoned during `len`"))?;
1169
1170        Ok(read_guard.len())
1171    }
1172
1173    fn is_empty(&self) -> Result<bool> {
1174        let len = self.len()?;
1175
1176        Ok(len == 0)
1177    }
1178
1179    fn file_size(&self) -> Result<u64> {
1180        std::fs::metadata(&self.path).map(|meta| meta.len())
1181    }
1182}