1use crate::storage_engine::constants::*;
2use crate::storage_engine::digest::{
3 Xxh3BuildHasher, compute_checksum, compute_hash, compute_hash_batch,
4};
5use crate::storage_engine::simd_copy;
6use crate::storage_engine::{EntryIterator, EntryStream, KeyIndexer};
7use crate::traits::{DataStoreReader, DataStoreWriter};
8use crate::utils::verify_file_existence;
9use memmap2::Mmap;
10use simd_r_drive_entry_handle::{EntryHandle, EntryMetadata};
11use std::collections::HashSet;
12use std::convert::From;
13use std::fs::{File, OpenOptions};
14use std::io::{BufWriter, Error, Read, Result, Seek, SeekFrom, Write};
15use std::path::{Path, PathBuf};
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
18use tracing::{debug, info, warn};
19
20#[cfg(any(test, debug_assertions))]
21use simd_r_drive_entry_handle::debug_assert_aligned_offset;
22
23#[cfg(feature = "parallel")]
24use rayon::prelude::*;
25
/// Append-only, mmap-backed key/value store.
///
/// Writes go through a buffered file handle; reads are zero-copy slices of a
/// shared memory map that is remapped (swapped) after every committed write.
pub struct DataStore {
    // Buffered writer over the backing file; writes append at the tail.
    file: Arc<RwLock<BufWriter<File>>>,
    // Current read snapshot; the inner `Arc<Mmap>` is replaced by `reindex`.
    mmap: Arc<Mutex<Arc<Mmap>>>,
    // Offset one past the last valid entry (i.e. end of its metadata).
    tail_offset: AtomicU64,
    // Maps key hash -> packed (tag, metadata offset).
    key_indexer: Arc<RwLock<KeyIndexer>>,
    // Filesystem path of the backing file.
    path: PathBuf,
}
34
impl IntoIterator for DataStore {
    type Item = EntryHandle;
    type IntoIter = EntryIterator;

    /// Consumes the store and yields every entry via [`DataStore::iter_entries`].
    fn into_iter(self) -> Self::IntoIter {
        self.iter_entries()
    }
}
52
impl From<PathBuf> for DataStore {
    /// Convenience conversion.
    ///
    /// # Panics
    /// Panics if the storage file cannot be opened; `From` cannot return a
    /// `Result`, so use [`DataStore::open`] directly to handle the error.
    fn from(path: PathBuf) -> Self {
        DataStore::open(&path).expect("Failed to open storage file")
    }
}
65
66impl DataStore {
67 pub fn open(path: &Path) -> Result<Self> {
85 let file = Self::open_file_in_append_mode(path)?;
86 let file_len = file.get_ref().metadata()?.len();
87
88 let mmap = Self::init_mmap(&file)?;
89 let final_len = Self::recover_valid_chain(&mmap, file_len)?;
90
91 if final_len < file_len {
92 warn!(
93 "Truncating corrupted data in {} from offset {} to {}.",
94 path.display(),
95 final_len,
96 file_len
97 );
98 drop(mmap);
99 drop(file);
100 let file = OpenOptions::new().read(true).write(true).open(path)?;
101 file.set_len(final_len)?;
102 file.sync_all()?;
103 return Self::open(path);
104 }
105
106 let mmap_arc = Arc::new(mmap);
107 let mmap_for_indexer: &'static Mmap = unsafe { &*(Arc::as_ptr(&mmap_arc)) };
108 let key_indexer = KeyIndexer::build(mmap_for_indexer, final_len);
109
110 Ok(Self {
111 file: Arc::new(RwLock::new(file)),
112 mmap: Arc::new(Mutex::new(mmap_arc)),
113 tail_offset: final_len.into(),
114 key_indexer: Arc::new(RwLock::new(key_indexer)),
115 path: path.to_path_buf(),
116 })
117 }
118
    /// Like [`DataStore::open`], but fails if `path` does not already exist
    /// instead of creating a new file.
    pub fn open_existing(path: &Path) -> Result<Self> {
        verify_file_existence(path)?;
        Self::open(path)
    }
145
    /// Opens (creating if needed, never truncating) the backing file for
    /// read/write and seeks to the end so subsequent writes append.
    ///
    /// Note: this does not use O_APPEND; append semantics come solely from
    /// the initial seek.
    fn open_file_in_append_mode(path: &Path) -> Result<BufWriter<File>> {
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)?;
        file.seek(SeekFrom::End(0))?;
        Ok(BufWriter::new(file))
    }
171
    /// Maps the current contents of `file` into memory (read-only view).
    ///
    /// SAFETY: `Mmap::map` is unsafe because the mapping becomes undefined
    /// if the underlying file is truncated or modified externally while
    /// mapped — presumably this process is the only writer; verify.
    fn init_mmap(file: &BufWriter<File>) -> Result<Mmap> {
        unsafe { memmap2::MmapOptions::new().map(file.get_ref()) }
    }
175
176 fn reindex(
225 &self,
226 write_guard: &std::sync::RwLockWriteGuard<'_, BufWriter<File>>,
227 key_hash_offsets: &[(u64, u64)],
228 tail_offset: u64,
229 deleted_keys: Option<&HashSet<u64>>,
230 ) -> std::io::Result<()> {
231 let new_mmap = Self::init_mmap(write_guard)?;
232 let mut mmap_guard = self.mmap.lock().unwrap();
233 let mut key_indexer_guard = self
234 .key_indexer
235 .write()
236 .map_err(|_| std::io::Error::other("Failed to acquire index lock"))?;
237
238 for (key_hash, offset) in key_hash_offsets.iter() {
239 if deleted_keys
240 .as_ref()
241 .is_some_and(|set| set.contains(key_hash))
242 {
243 key_indexer_guard.remove(key_hash);
244 } else {
245 if let Err(e) = key_indexer_guard.insert(*key_hash, *offset) {
247 warn!("Write operation aborted due to hash collision: {}", e);
250 return Err(std::io::Error::other(e));
251 }
252 }
253 }
254
255 *mmap_guard = Arc::new(new_mmap);
256 self.tail_offset.store(tail_offset, Ordering::Release);
257
258 Ok(())
259 }
260
    /// Returns a clone of the backing file's path.
    pub fn get_path(&self) -> PathBuf {
        self.path.clone()
    }
268
    /// Sequential iterator over entries in the snapshot captured at call
    /// time (current mmap plus current tail offset).
    pub fn iter_entries(&self) -> EntryIterator {
        let mmap_clone = self.get_mmap_arc();
        let tail_offset = self.tail_offset.load(Ordering::Acquire);
        EntryIterator::new(mmap_clone, tail_offset)
    }
281
    /// Parallel iterator over the latest version of every indexed key.
    ///
    /// Walks the key index (not the raw log), so superseded versions are
    /// never visited; tombstoned entries are filtered out below.
    #[cfg(feature = "parallel")]
    pub fn par_iter_entries(&self) -> impl ParallelIterator<Item = EntryHandle> {
        // Snapshot the packed index values and release the lock before any
        // parallel work starts.
        let key_indexer_guard = self.key_indexer.read().unwrap();
        let packed_values: Vec<u64> = key_indexer_guard.values().cloned().collect();
        drop(key_indexer_guard);
        let mmap_arc = self.get_mmap_arc();

        packed_values.into_par_iter().filter_map(move |packed| {
            let (_tag, offset) = KeyIndexer::unpack(packed);
            let offset = offset as usize;

            // The snapshot mmap may be shorter than what the index claims;
            // skip rather than panic.
            if offset + METADATA_SIZE > mmap_arc.len() {
                return None;
            }

            let metadata_bytes = &mmap_arc[offset..offset + METADATA_SIZE];
            let metadata = EntryMetadata::deserialize(metadata_bytes);

            // Payload normally starts at prev_offset plus alignment pre-pad.
            let prev_tail = metadata.prev_offset;
            let derived = prev_tail + Self::prepad_len(prev_tail) as u64;
            let entry_end = offset;

            let mut entry_start = derived as usize;

            // Special case: a 1-byte NULL (tombstone) entry is stored
            // unpadded directly at prev_tail.
            if entry_end > prev_tail as usize
                && entry_end - prev_tail as usize == 1
                && mmap_arc[prev_tail as usize..entry_end] == NULL_BYTE
            {
                entry_start = prev_tail as usize;
            }

            if entry_start >= entry_end || entry_end > mmap_arc.len() {
                return None;
            }

            // Skip tombstones (deleted keys).
            if entry_end - entry_start == 1 && mmap_arc[entry_start..entry_end] == NULL_BYTE {
                return None;
            }

            #[cfg(any(test, debug_assertions))]
            {
                debug_assert_aligned_offset(entry_start as u64);
            }

            Some(EntryHandle {
                mmap_arc: mmap_arc.clone(),
                range: entry_start..entry_end,
                metadata,
            })
        })
    }
362
    /// Scans backward from `file_len` for the most recent entry whose
    /// `prev_offset` chain walks cleanly all the way back to offset 0, and
    /// returns the file length covering that chain (0 if no valid chain is
    /// found). Used by `open` to discard a torn or corrupted tail.
    fn recover_valid_chain(mmap: &Mmap, file_len: u64) -> Result<u64> {
        if file_len < METADATA_SIZE as u64 {
            return Ok(0);
        }

        // Try each candidate metadata end position, newest first.
        let mut cursor = file_len;
        let mut best_valid_offset = None;
        while cursor >= METADATA_SIZE as u64 {
            let metadata_offset = cursor - METADATA_SIZE as u64;
            let metadata_bytes =
                &mmap[metadata_offset as usize..(metadata_offset as usize + METADATA_SIZE)];
            let metadata = EntryMetadata::deserialize(metadata_bytes);

            let prev_tail = metadata.prev_offset;

            // Payload normally starts at prev_tail plus alignment pre-pad.
            let derived_start = prev_tail + Self::prepad_len(prev_tail) as u64;
            let entry_end = metadata_offset;

            // 1-byte NULL tombstones are stored unpadded at prev_tail.
            let entry_start = if entry_end > prev_tail
                && entry_end - prev_tail == 1
                && mmap[prev_tail as usize..entry_end as usize] == NULL_BYTE
            {
                prev_tail
            } else {
                #[cfg(any(test, debug_assertions))]
                {
                    debug_assert_aligned_offset(derived_start);
                }

                derived_start
            };

            // Degenerate candidate; slide the scan window back one byte.
            if entry_start >= metadata_offset {
                cursor -= 1;
                continue;
            }

            // Walk the prev_offset chain back toward the start of the file,
            // accumulating the total bytes the chain claims to cover.
            let mut chain_valid = true;
            let mut back_cursor = prev_tail;
            let mut total_size = (metadata_offset - entry_start) + METADATA_SIZE as u64;

            while back_cursor != 0 {
                if back_cursor < METADATA_SIZE as u64 {
                    chain_valid = false;
                    break;
                }

                let prev_metadata_offset = back_cursor - METADATA_SIZE as u64;
                if prev_metadata_offset as usize + METADATA_SIZE > mmap.len() {
                    chain_valid = false;
                    break;
                }

                let prev_metadata_bytes = &mmap[prev_metadata_offset as usize
                    ..(prev_metadata_offset as usize + METADATA_SIZE)];
                let prev_metadata = EntryMetadata::deserialize(prev_metadata_bytes);

                let prev_prev_tail = prev_metadata.prev_offset;

                // Same unpadded-tombstone special case as above.
                let prev_entry_start = if prev_metadata_offset > prev_prev_tail
                    && prev_metadata_offset - prev_prev_tail == 1
                    && mmap[prev_prev_tail as usize..prev_metadata_offset as usize] == NULL_BYTE
                {
                    prev_prev_tail
                } else {
                    prev_prev_tail + Self::prepad_len(prev_prev_tail) as u64
                };

                if prev_entry_start >= prev_metadata_offset {
                    chain_valid = false;
                    break;
                }

                let entry_size = prev_metadata_offset.saturating_sub(prev_entry_start);

                total_size += entry_size + METADATA_SIZE as u64;

                // Offsets must strictly decrease or the chain is corrupt.
                if prev_prev_tail >= prev_metadata_offset {
                    chain_valid = false;
                    break;
                }

                back_cursor = prev_prev_tail;
            }

            // Accept the first (newest) chain that reaches offset 0 and
            // whose accumulated size fits within the file.
            if chain_valid && back_cursor == 0 && total_size <= file_len {
                best_valid_offset = Some(metadata_offset + METADATA_SIZE as u64);
                break;
            }

            cursor -= 1;
        }

        Ok(best_valid_offset.unwrap_or(0))
    }
483
484 #[inline]
502 fn read_entry_with_context<'a>(
503 &self,
504 non_hashed_key: Option<&[u8]>,
505 key_hash: u64,
506 mmap_arc: &Arc<Mmap>,
507 key_indexer_guard: &RwLockReadGuard<'a, KeyIndexer>,
508 ) -> Option<EntryHandle> {
509 let packed = *key_indexer_guard.get_packed(&key_hash)?;
510 let (tag, offset) = KeyIndexer::unpack(packed);
511
512 if let Some(non_hashed_key) = non_hashed_key
514 && tag != KeyIndexer::tag_from_key(non_hashed_key)
515 {
516 warn!(
517 "Tag mismatch detected for `non_hashed_key`, likely a \
518 hash collision or index corruption."
519 );
520 return None;
521 }
522
523 let offset = offset as usize;
524 if offset + METADATA_SIZE > mmap_arc.len() {
525 return None;
526 }
527
528 let metadata_bytes = &mmap_arc[offset..offset + METADATA_SIZE];
529 let metadata = EntryMetadata::deserialize(metadata_bytes);
530
531 let prev_tail = metadata.prev_offset;
534 let derived = prev_tail + Self::prepad_len(prev_tail) as u64;
535 let entry_end = offset;
536
537 let mut entry_start = derived as usize;
538
539 if entry_end > prev_tail as usize
540 && entry_end - prev_tail as usize == 1
541 && mmap_arc[prev_tail as usize..entry_end] == NULL_BYTE
542 {
543 entry_start = prev_tail as usize;
544 }
545
546 if entry_start >= entry_end || entry_end > mmap_arc.len() {
547 return None;
548 }
549
550 if entry_end - entry_start == 1 && mmap_arc[entry_start..entry_end] == NULL_BYTE {
552 return None;
553 }
554
555 #[cfg(any(test, debug_assertions))]
556 {
557 debug_assert_aligned_offset(entry_start as u64);
558 }
559
560 Some(EntryHandle {
561 mmap_arc: mmap_arc.clone(),
562 range: entry_start..entry_end,
563 metadata,
564 })
565 }
566
    /// Streams `entry`'s payload into `target` under the same key hash and
    /// returns the target's new tail offset.
    fn copy_handle(&self, entry: &EntryHandle, target: &DataStore) -> Result<u64> {
        let mut entry_stream = EntryStream::from(entry.clone_arc());
        target.write_stream_with_key_hash(entry.key_hash(), &mut entry_stream)
    }
591
592 pub fn estimate_compaction_savings(&self) -> u64 {
606 let total_size = self.file_size().unwrap_or(0);
607 let mut unique_entry_size: u64 = 0;
608 let mut seen_keys = HashSet::with_hasher(Xxh3BuildHasher);
609
610 for entry in self.iter_entries() {
611 if seen_keys.insert(entry.key_hash()) {
612 unique_entry_size += entry.file_size() as u64;
613 }
614 }
615 total_size.saturating_sub(unique_entry_size)
616 }
617
    /// Test/debug-only accessor for the current mmap snapshot.
    #[cfg(any(test, debug_assertions))]
    pub fn get_mmap_arc_for_testing(&self) -> Arc<Mmap> {
        self.get_mmap_arc()
    }
634
    /// Test/debug-only: raw base pointer of the current mmap, useful for
    /// asserting whether a remap occurred.
    #[cfg(any(test, debug_assertions))]
    pub fn arc_ptr(&self) -> *const u8 {
        self.get_mmap_arc().as_ptr()
    }
656
657 #[inline]
658 fn get_mmap_arc(&self) -> Arc<Mmap> {
659 let guard = self.mmap.lock().unwrap();
660 let mmap_clone = guard.clone();
661 drop(guard);
662 mmap_clone
663 }
664
665 #[inline]
670 fn prepad_len(offset: u64) -> usize {
671 let a = PAYLOAD_ALIGNMENT;
672 ((a - (offset % a)) & (a - 1)) as usize
673 }
674
    /// Rewrites only the latest version of each key into `<path>.bk`, then
    /// swaps it over the live file with `std::fs::rename`.
    ///
    /// Takes `&mut self` to exclude concurrent writers for the duration.
    /// NOTE(review): `index_pairs` is accumulated but never consumed —
    /// presumably intended for the "static index" mentioned below; confirm
    /// before removing. Also note this method does not remap `self`'s mmap
    /// or rebuild its index after the swap.
    pub fn compact(&mut self) -> Result<()> {
        let compacted_path = crate::utils::append_extension(&self.path, "bk");
        info!("Starting compaction. Writing to: {:?}", compacted_path);

        let compacted_storage = DataStore::open(&compacted_path)?;
        let mut index_pairs: Vec<(u64, u64)> = Vec::new();
        let mut compacted_data_size: u64 = 0;

        // Copy the newest version of every entry into the new store.
        for entry in self.iter_entries() {
            let new_tail_offset = self.copy_handle(&entry, &compacted_storage)?;
            let stored_metadata_offset = new_tail_offset - METADATA_SIZE as u64;
            index_pairs.push((entry.key_hash(), stored_metadata_offset));
            compacted_data_size += entry.file_size() as u64;
        }

        let size_before = self.file_size()?;

        if size_before > compacted_data_size {
            info!("Compaction will save space. Writing static index.");

            let mut file_guard = compacted_storage
                .file
                .write()
                .map_err(|e| std::io::Error::other(format!("Lock poisoned: {e}")))?;
            file_guard.flush()?;
        } else {
            info!(
                "Compaction would increase file size \
                (data w/ indexing: {compacted_data_size}). \
                Skipping static index generation.",
            );
        }

        drop(compacted_storage);

        debug!("Compaction successful. Swapping files...");
        std::fs::rename(&compacted_path, &self.path)?;
        info!("Compaction file swap complete.");
        Ok(())
    }
750}
751
752impl DataStoreWriter for DataStore {
    /// Hashes `key` and streams the payload via `write_stream_with_key_hash`.
    fn write_stream<R: Read>(&self, key: &[u8], reader: &mut R) -> Result<u64> {
        let key_hash = compute_hash(key);
        self.write_stream_with_key_hash(key_hash, reader)
    }
757
758 fn write_stream_with_key_hash<R: Read>(&self, key_hash: u64, reader: &mut R) -> Result<u64> {
759 let mut file = self
760 .file
761 .write()
762 .map_err(|_| std::io::Error::other("Failed to acquire file lock"))?;
763 let prev_tail = self.tail_offset.load(Ordering::Acquire);
764
765 let prepad = Self::prepad_len(prev_tail);
767 if prepad > 0 {
768 let pad = [0u8; 64];
770 file.write_all(&pad[..prepad])?;
771 }
772
773 let mut buffer = vec![0; WRITE_STREAM_BUFFER_SIZE];
774 let mut total_written = 0;
775 let mut checksum_state = crc32fast::Hasher::new();
776 let mut is_null_only = true;
777
778 while let Ok(bytes_read) = reader.read(&mut buffer) {
779 if bytes_read == 0 {
780 break;
781 }
782
783 if buffer[..bytes_read].iter().any(|&b| b != NULL_BYTE[0]) {
784 is_null_only = false;
785 }
786
787 file.write_all(&buffer[..bytes_read])?;
788 checksum_state.update(&buffer[..bytes_read]);
789 total_written += bytes_read;
790 }
791
792 if total_written > 0 && is_null_only {
793 return Err(std::io::Error::new(
794 std::io::ErrorKind::InvalidInput,
795 "NULL-byte-only streams cannot be written directly.",
796 ));
797 }
798
799 if total_written == 0 {
800 return Err(std::io::Error::new(
801 std::io::ErrorKind::InvalidInput,
802 "Payload cannot be empty.",
803 ));
804 }
805
806 let checksum = checksum_state.finalize().to_le_bytes();
807 let metadata = EntryMetadata {
809 key_hash,
810 prev_offset: prev_tail,
811 checksum,
812 };
813 file.write_all(&metadata.serialize())?;
814 file.flush()?;
815
816 let tail_offset = prev_tail + prepad as u64 + total_written as u64 + METADATA_SIZE as u64;
817
818 self.reindex(
819 &file,
820 &[(key_hash, tail_offset - METADATA_SIZE as u64)],
821 tail_offset,
822 None,
823 )?;
824 Ok(tail_offset)
825 }
826
    /// Hashes `key` and writes the payload via `write_with_key_hash`.
    fn write(&self, key: &[u8], payload: &[u8]) -> Result<u64> {
        let key_hash = compute_hash(key);
        self.write_with_key_hash(key_hash, payload)
    }
831
    /// Single-entry write, implemented as a one-element batch (tombstone
    /// payloads disallowed).
    fn write_with_key_hash(&self, key_hash: u64, payload: &[u8]) -> Result<u64> {
        self.batch_write_with_key_hashes(vec![(key_hash, payload)], false)
    }
835
836 fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result<u64> {
839 let (keys, payloads): (Vec<_>, Vec<_>) = entries.iter().cloned().unzip();
840 let hashes = compute_hash_batch(&keys);
841 let hashed_entries = hashes.into_iter().zip(payloads).collect::<Vec<_>>();
842 self.batch_write_with_key_hashes(hashed_entries, false)
843 }
844
845 fn batch_write_with_key_hashes(
848 &self,
849 prehashed_keys: Vec<(u64, &[u8])>,
850 allow_null_bytes: bool,
851 ) -> Result<u64> {
852 let mut file = self
853 .file
854 .write()
855 .map_err(|_| std::io::Error::other("Failed to acquire file lock"))?;
856
857 let mut buffer = Vec::new();
858 let mut tail_offset = self.tail_offset.load(Ordering::Acquire);
859
860 let mut key_hash_offsets: Vec<(u64, u64)> = Vec::with_capacity(prehashed_keys.len());
861 let mut deleted_keys: HashSet<u64> = HashSet::new();
862
863 for (key_hash, payload) in prehashed_keys {
864 if payload == NULL_BYTE {
865 if !allow_null_bytes {
866 return Err(std::io::Error::new(
867 std::io::ErrorKind::InvalidInput,
868 "NULL-byte payloads cannot be written directly.",
869 ));
870 }
871 let link_to_prev_tail = tail_offset;
873
874 if payload.is_empty() {
875 return Err(std::io::Error::new(
876 std::io::ErrorKind::InvalidInput,
877 "Payload cannot be empty.",
878 ));
879 }
880
881 let checksum = compute_checksum(payload);
882 let metadata = EntryMetadata {
883 key_hash,
884 prev_offset: link_to_prev_tail,
885 checksum,
886 };
887
888 let mut entry: Vec<u8> = vec![0u8; 1 + METADATA_SIZE];
889 entry[0] = NULL_BYTE[0];
890 entry[1..].copy_from_slice(&metadata.serialize());
891
892 buffer.extend_from_slice(&entry);
893
894 tail_offset += entry.len() as u64;
895 key_hash_offsets.push((key_hash, tail_offset - METADATA_SIZE as u64));
896 deleted_keys.insert(key_hash);
897 continue;
898 }
899
900 if payload.is_empty() {
901 return Err(std::io::Error::new(
902 std::io::ErrorKind::InvalidInput,
903 "Payload cannot be empty.",
904 ));
905 }
906
907 let link_to_prev_tail = tail_offset;
909 let prepad = Self::prepad_len(tail_offset);
910 if prepad > 0 {
911 let old_len = buffer.len();
912 buffer.resize(old_len + prepad, 0u8);
913 tail_offset += prepad as u64;
914 }
915
916 let checksum = compute_checksum(payload);
917 let metadata = EntryMetadata {
918 key_hash,
919 prev_offset: link_to_prev_tail,
920 checksum,
921 };
922 let payload_len = payload.len();
923
924 let mut entry: Vec<u8> = vec![0u8; payload_len + METADATA_SIZE];
925 simd_copy(&mut entry[..payload.len()], payload);
926 entry[payload.len()..].copy_from_slice(&metadata.serialize());
927 buffer.extend_from_slice(&entry);
928
929 tail_offset += entry.len() as u64;
930 key_hash_offsets.push((key_hash, tail_offset - METADATA_SIZE as u64));
931 }
932
933 file.write_all(&buffer)?;
934 file.flush()?;
935
936 self.reindex(&file, &key_hash_offsets, tail_offset, Some(&deleted_keys))?;
937
938 Ok(self.tail_offset.load(Ordering::Acquire))
939 }
940
    /// Re-keys an entry: copies `old_key`'s payload under `new_key`, then
    /// deletes `old_key`.
    ///
    /// Returns the tail offset produced by the delete, not the offset of the
    /// newly written entry. Not atomic: if the delete fails, the value
    /// exists under both keys.
    fn rename(&self, old_key: &[u8], new_key: &[u8]) -> Result<u64> {
        if old_key == new_key {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "Cannot rename a key to itself",
            ));
        }

        let old_entry = self.read(old_key)?.ok_or_else(|| {
            std::io::Error::new(std::io::ErrorKind::NotFound, "Old key not found")
        })?;
        let mut old_entry_stream = EntryStream::from(old_entry);

        self.write_stream(new_key, &mut old_entry_stream)?;

        let new_offset = self.delete(old_key)?;
        Ok(new_offset)
    }
959
    /// Copies `key`'s entry into another store; rejects copies into the same
    /// file path (use `rename` for that).
    fn copy(&self, key: &[u8], target: &DataStore) -> Result<u64> {
        if self.path == target.path {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!(
                    "Cannot copy entry to the same storage ({:?}). \
                Use `rename` instead.",
                    self.path
                ),
            ));
        }

        let entry_handle = self.read(key)?.ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::NotFound,
                format!("Key not found: {:?}", String::from_utf8_lossy(key)),
            )
        })?;
        self.copy_handle(&entry_handle, target)
    }
980
    /// Moves an entry: copy to `target`, then delete locally. Not atomic —
    /// a failed delete leaves the entry present in both stores.
    fn transfer(&self, key: &[u8], target: &DataStore) -> Result<u64> {
        self.copy(key, target)?;
        self.delete(key)
    }
985
    /// Deletes one key (appends a tombstone) via the batch path.
    fn delete(&self, key: &[u8]) -> Result<u64> {
        self.batch_delete(&[key])
    }
989
    /// Hashes the keys and deletes them via `batch_delete_key_hashes`.
    fn batch_delete(&self, keys: &[&[u8]]) -> Result<u64> {
        let key_hashes = compute_hash_batch(keys);
        self.batch_delete_key_hashes(&key_hashes)
    }
994
995 fn batch_delete_key_hashes(&self, prehashed_keys: &[u64]) -> Result<u64> {
996 let keys_to_delete: Vec<u64> = {
999 let key_indexer_guard = self
1000 .key_indexer
1001 .read()
1002 .map_err(|_| Error::other("Key-index lock poisoned during batch_delete check"))?;
1003
1004 prehashed_keys
1005 .iter()
1006 .filter(|&&key_hash| key_indexer_guard.get_packed(&key_hash).is_some())
1007 .cloned()
1008 .collect()
1009 };
1010
1011 if keys_to_delete.is_empty() {
1013 return Ok(self.tail_offset.load(Ordering::Acquire));
1014 }
1015
1016 let delete_ops: Vec<(u64, &[u8])> = keys_to_delete
1018 .iter()
1019 .map(|&key_hash| (key_hash, &NULL_BYTE as &[u8]))
1020 .collect();
1021
1022 self.batch_write_with_key_hashes(delete_ops, true)
1024 }
1025}
1026
1027impl DataStoreReader for DataStore {
1028 type EntryHandleType = EntryHandle;
1029
    /// True when `key` resolves to a live (non-tombstoned) entry.
    fn exists(&self, key: &[u8]) -> Result<bool> {
        Ok(self.read(key)?.is_some())
    }
1033
    /// Like `exists`, but takes a pre-computed key hash (no tag check).
    fn exists_with_key_hash(&self, prehashed_key: u64) -> Result<bool> {
        Ok(self.read_with_key_hash(prehashed_key)?.is_some())
    }
1039
    /// Looks up `key` and returns a zero-copy handle into the mmap, or
    /// `None` when absent or deleted. The raw key is forwarded so the index
    /// tag can be verified against hash collisions.
    fn read(&self, key: &[u8]) -> Result<Option<EntryHandle>> {
        let key_hash = compute_hash(key);
        let key_indexer_guard = self
            .key_indexer
            .read()
            .map_err(|_| Error::other("key-index lock poisoned"))?;
        let mmap_arc = self.get_mmap_arc();

        Ok(self.read_entry_with_context(Some(key), key_hash, &mmap_arc, &key_indexer_guard))
    }
1050
    /// Like `read`, but from a pre-computed key hash; the collision tag
    /// check is skipped because no raw key is available.
    fn read_with_key_hash(&self, prehashed_key: u64) -> Result<Option<EntryHandle>> {
        let key_indexer_guard = self
            .key_indexer
            .read()
            .map_err(|_| Error::other("key-index lock poisoned"))?;
        let mmap_arc = self.get_mmap_arc();

        Ok(self.read_entry_with_context(None, prehashed_key, &mmap_arc, &key_indexer_guard))
    }
1060
    /// Returns a handle to the most recently appended entry, or `None` when
    /// the store is empty. Unlike the keyed read paths, this does not filter
    /// tombstones (there is no NULL-payload skip below).
    fn read_last_entry(&self) -> Result<Option<EntryHandle>> {
        let mmap_arc = self.get_mmap_arc();
        let tail_offset = self.tail_offset.load(std::sync::atomic::Ordering::Acquire);
        if tail_offset < METADATA_SIZE as u64 || mmap_arc.is_empty() {
            return Ok(None);
        }

        // The last entry's metadata ends exactly at the tail.
        let metadata_offset = (tail_offset - METADATA_SIZE as u64) as usize;
        if metadata_offset + METADATA_SIZE > mmap_arc.len() {
            return Ok(None);
        }

        let metadata_bytes = &mmap_arc[metadata_offset..metadata_offset + METADATA_SIZE];
        let metadata = EntryMetadata::deserialize(metadata_bytes);

        // Payload normally starts at prev_offset plus alignment pre-pad.
        let prev_tail = metadata.prev_offset;
        let derived = prev_tail + Self::prepad_len(prev_tail) as u64;
        let entry_end = metadata_offset;

        let mut entry_start = derived as usize;
        // 1-byte NULL tombstones are stored unpadded at prev_tail.
        if entry_end > prev_tail as usize
            && entry_end - prev_tail as usize == 1
            && mmap_arc[prev_tail as usize..entry_end] == NULL_BYTE
        {
            entry_start = prev_tail as usize;
        }

        if entry_start >= entry_end || entry_end > mmap_arc.len() {
            return Ok(None);
        }

        #[cfg(any(test, debug_assertions))]
        {
            debug_assert_aligned_offset(entry_start as u64);
        }

        Ok(Some(EntryHandle {
            mmap_arc,
            range: entry_start..entry_end,
            metadata,
        }))
    }
1104
    /// Hashes all keys and reads them against one snapshot; the raw keys
    /// are forwarded so index tags can be verified.
    fn batch_read(&self, keys: &[&[u8]]) -> Result<Vec<Option<EntryHandle>>> {
        let hashed_keys = compute_hash_batch(keys);

        self.batch_read_hashed_keys(&hashed_keys, Some(keys))
    }
1110
1111 fn batch_read_hashed_keys(
1112 &self,
1113 prehashed_keys: &[u64],
1114 non_hashed_keys: Option<&[&[u8]]>,
1115 ) -> Result<Vec<Option<EntryHandle>>> {
1116 let mmap_arc = self.get_mmap_arc();
1117 let key_indexer_guard = self
1118 .key_indexer
1119 .read()
1120 .map_err(|_| Error::other("Key-index lock poisoned during `batch_read`"))?;
1121
1122 let results = match non_hashed_keys {
1124 Some(keys) => {
1126 if keys.len() != prehashed_keys.len() {
1128 return Err(std::io::Error::new(
1129 std::io::ErrorKind::InvalidInput,
1130 "Mismatched lengths for hashed and non-hashed keys.",
1131 ));
1132 }
1133
1134 prehashed_keys
1135 .iter()
1136 .zip(keys.iter())
1137 .map(|(key_hash, &key)| {
1138 self.read_entry_with_context(
1140 Some(key),
1141 *key_hash,
1142 &mmap_arc,
1143 &key_indexer_guard,
1144 )
1145 })
1146 .collect()
1147 }
1148 None => prehashed_keys
1150 .iter()
1151 .map(|key_hash| {
1152 self.read_entry_with_context(None, *key_hash, &mmap_arc, &key_indexer_guard)
1153 })
1154 .collect(),
1155 };
1156
1157 Ok(results)
1158 }
1159
    /// Returns a clone of the entry's metadata without exposing the payload.
    fn read_metadata(&self, key: &[u8]) -> Result<Option<EntryMetadata>> {
        Ok(self.read(key)?.map(|entry| entry.metadata().clone()))
    }
1163
    /// Number of live keys currently tracked by the index (not the number
    /// of raw log entries).
    fn len(&self) -> Result<usize> {
        let read_guard = self
            .key_indexer
            .read()
            .map_err(|_| Error::other("Key-index lock poisoned during `len`"))?;

        Ok(read_guard.len())
    }
1172
1173 fn is_empty(&self) -> Result<bool> {
1174 let len = self.len()?;
1175
1176 Ok(len == 0)
1177 }
1178
    /// Size in bytes of the backing file as reported by the filesystem
    /// (includes tombstones and superseded entries).
    fn file_size(&self) -> Result<u64> {
        std::fs::metadata(&self.path).map(|meta| meta.len())
    }
1182}