1use bytes::Bytes;
33use memmap2::MmapMut;
34use std::collections::BTreeMap;
35use std::fs::{File, OpenOptions};
36use std::io::Write;
37use std::ops::{Bound, Range};
38use std::path::{Path, PathBuf};
39
40use super::{prefix_successor, StateError, StateSnapshot, StateStore};
41
/// Number of bytes reserved at the start of a state file for its header.
/// Value offsets handed out by the storage layer are relative to the first
/// byte after this header.
const MMAP_HEADER_SIZE: usize = 32;

/// Magic number identifying state and index files.
///
/// Numerically `0x004C_414D_494E_4152`, i.e. the ASCII text `LAMINAR`
/// preceded by a NUL byte when read big-endian. It is written to disk with
/// `to_le_bytes`, so the on-disk byte order is reversed.
const MMAP_MAGIC: u64 = u64::from_be_bytes(*b"\0LAMINAR");

/// On-disk format version; files carrying any other version are rejected.
const MMAP_VERSION: u32 = 1;

/// Multiplier applied to the required size whenever the backing buffer or
/// file has to grow.
const GROWTH_FACTOR: f64 = 1.5;

/// Extension of the sidecar index file, derived from the state file path via
/// `Path::with_extension`.
const INDEX_EXTENSION: &str = "idx";
52
/// Location of a single stored value inside the backing storage.
///
/// The rkyv derives allow the key index (a map of keys to `ValueEntry`) to be
/// serialized into the sidecar index file.
#[derive(Debug, Clone, Copy, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
struct ValueEntry {
    /// Byte offset of the value, relative to the start of the data region
    /// (for the mmap backend the file header is added on access).
    offset: usize,
    /// Length of the value in bytes.
    len: usize,
    /// Sequence number taken from the store's `next_version` counter when the
    /// entry was written. Not read by the code visible in this file, hence
    /// the `dead_code` allowance.
    #[allow(dead_code)]
    version: u64,
}
65
/// Append-only byte storage behind the key index.
///
/// Both variants hand out offsets relative to the start of their data region
/// and track the next free position in `write_pos`.
enum Storage {
    /// Heap-backed storage; nothing is persisted.
    Arena {
        /// Backing buffer; grown (zero-filled) on demand by `write`.
        data: Vec<u8>,
        /// Offset at which the next value will be written.
        write_pos: usize,
    },
    /// File-backed storage accessed through a mutable memory map.
    Mmap {
        /// Current mapping of `file`; replaced whenever the file grows.
        mmap: MmapMut,
        /// Open handle to the state file, used to resize and remap it.
        file: File,
        /// Path of the state file; the index file path is derived from it.
        path: PathBuf,
        /// Offset (excluding the header) at which the next value is written.
        write_pos: usize,
        /// Size of the file/mapping in bytes, header included.
        capacity: usize,
    },
}
86
87impl Storage {
88 fn get(&self, offset: usize, len: usize) -> &[u8] {
90 match self {
91 Storage::Arena { data, .. } => &data[offset..offset + len],
92 Storage::Mmap { mmap, .. } => {
93 &mmap[MMAP_HEADER_SIZE + offset..MMAP_HEADER_SIZE + offset + len]
94 }
95 }
96 }
97
98 fn write(&mut self, data: &[u8]) -> Result<usize, StateError> {
100 match self {
101 Storage::Arena {
102 data: buffer,
103 write_pos,
104 } => {
105 let offset = *write_pos;
106 let end = offset + data.len();
107
108 if end > buffer.len() {
110 #[allow(
112 clippy::cast_possible_truncation,
113 clippy::cast_sign_loss,
114 clippy::cast_precision_loss
115 )]
116 let new_size = (end as f64 * GROWTH_FACTOR) as usize;
117 buffer.resize(new_size, 0);
118 }
119
120 buffer[offset..end].copy_from_slice(data);
121 *write_pos = end;
122 Ok(offset)
123 }
124 Storage::Mmap {
125 mmap,
126 file,
127 path: _,
128 write_pos,
129 capacity,
130 } => {
131 let offset = *write_pos;
132 let end = offset + data.len();
133 let required = MMAP_HEADER_SIZE + end;
134
135 if required > *capacity {
137 #[allow(
139 clippy::cast_possible_truncation,
140 clippy::cast_sign_loss,
141 clippy::cast_precision_loss
142 )]
143 let new_capacity = (required as f64 * GROWTH_FACTOR) as usize;
144 file.set_len(new_capacity as u64)?;
145
146 #[allow(unsafe_code)]
151 {
152 *mmap = unsafe { MmapMut::map_mut(&*file)? };
153 }
154 *capacity = new_capacity;
155 }
156
157 mmap[MMAP_HEADER_SIZE + offset..MMAP_HEADER_SIZE + end].copy_from_slice(data);
158 *write_pos = end;
159 Ok(offset)
160 }
161 }
162 }
163
164 fn used_bytes(&self) -> usize {
166 match self {
167 Storage::Arena { write_pos, .. } | Storage::Mmap { write_pos, .. } => *write_pos,
168 }
169 }
170
171 fn flush(&mut self) -> Result<(), StateError> {
173 match self {
174 Storage::Arena { .. } => Ok(()),
175 Storage::Mmap { mmap, .. } => {
176 mmap.flush()?;
177 Ok(())
178 }
179 }
180 }
181
182 fn reset(&mut self) {
184 match self {
185 Storage::Arena { write_pos, .. } | Storage::Mmap { write_pos, .. } => *write_pos = 0,
186 }
187 }
188
189 fn is_persistent(&self) -> bool {
191 matches!(self, Storage::Mmap { .. })
192 }
193}
194
/// Key-value state store that keeps an ordered in-memory key index and
/// stores values in an append-only arena or memory-mapped file.
pub struct MmapStateStore {
    /// Ordered map from key to the location of its current value.
    index: BTreeMap<Vec<u8>, ValueEntry>,
    /// Backing storage holding the value bytes.
    storage: Storage,
    /// Sum of key and value lengths over all live entries.
    size_bytes: usize,
    /// Version number assigned to the next entry that is written.
    next_version: u64,
}
221
222impl MmapStateStore {
223 #[must_use]
231 pub fn in_memory(capacity: usize) -> Self {
232 Self {
233 index: BTreeMap::new(),
234 storage: Storage::Arena {
235 data: vec![0u8; capacity],
236 write_pos: 0,
237 },
238 size_bytes: 0,
239 next_version: 1,
240 }
241 }
242
243 pub fn persistent(path: &Path, initial_capacity: usize) -> Result<Self, StateError> {
258 let file_exists = path.exists();
259
260 let file = OpenOptions::new()
261 .read(true)
262 .write(true)
263 .create(true)
264 .truncate(false)
265 .open(path)?;
266
267 let capacity = if file_exists {
268 let metadata = file.metadata()?;
269 #[allow(clippy::cast_possible_truncation)]
271 let cap = metadata.len() as usize;
272 cap
273 } else {
274 let capacity = initial_capacity.max(MMAP_HEADER_SIZE + 1024);
275 file.set_len(capacity as u64)?;
276 capacity
277 };
278
279 #[allow(unsafe_code)]
282 let mut mmap = unsafe { MmapMut::map_mut(&file)? };
283
284 let (index, write_pos, next_version) = if file_exists {
285 if capacity >= MMAP_HEADER_SIZE {
287 if let Ok(loaded) = Self::load_index(path) {
289 loaded
292 } else {
293 Self::load_from_mmap(&mmap)?
295 }
296 } else {
297 Self::init_mmap_header(&mut mmap);
299 (BTreeMap::new(), 0, 1)
300 }
301 } else {
302 Self::init_mmap_header(&mut mmap);
304 (BTreeMap::new(), 0, 1)
305 };
306
307 let size_bytes = index.iter().map(|(k, v)| k.len() + v.len).sum();
308
309 Ok(Self {
310 index,
311 storage: Storage::Mmap {
312 mmap,
313 file,
314 path: path.to_path_buf(),
315 write_pos,
316 capacity,
317 },
318 size_bytes,
319 next_version,
320 })
321 }
322
323 fn init_mmap_header(mmap: &mut MmapMut) {
325 mmap[0..8].copy_from_slice(&MMAP_MAGIC.to_le_bytes());
326 mmap[8..12].copy_from_slice(&MMAP_VERSION.to_le_bytes());
327 mmap[12..20].copy_from_slice(&0u64.to_le_bytes()); mmap[20..28].copy_from_slice(&0u64.to_le_bytes()); }
330
331 #[allow(clippy::type_complexity)]
333 fn load_from_mmap(
334 mmap: &MmapMut,
335 ) -> Result<(BTreeMap<Vec<u8>, ValueEntry>, usize, u64), StateError> {
336 let magic = u64::from_le_bytes(mmap[0..8].try_into().unwrap());
338 if magic != MMAP_MAGIC {
339 return Err(StateError::Corruption(
340 "Invalid magic number in state file".to_string(),
341 ));
342 }
343
344 let version = u32::from_le_bytes(mmap[8..12].try_into().unwrap());
346 if version != MMAP_VERSION {
347 return Err(StateError::Corruption(format!(
348 "Unsupported state file version: {version}"
349 )));
350 }
351
352 Ok((BTreeMap::new(), 0, 1))
356 }
357
358 #[must_use]
360 pub fn is_persistent(&self) -> bool {
361 self.storage.is_persistent()
362 }
363
364 #[must_use]
366 pub fn path(&self) -> Option<&Path> {
367 match &self.storage {
368 Storage::Arena { .. } => None,
369 Storage::Mmap { path, .. } => Some(path),
370 }
371 }
372
373 pub fn compact(&mut self) -> Result<(), StateError> {
381 let live_data: Vec<(Vec<u8>, Vec<u8>)> = self
383 .index
384 .iter()
385 .map(|(k, entry)| {
386 let value = self.storage.get(entry.offset, entry.len).to_vec();
387 (k.clone(), value)
388 })
389 .collect();
390
391 self.storage.reset();
393 self.index.clear();
394 self.size_bytes = 0;
395
396 for (key, value) in live_data {
398 let offset = self.storage.write(&value)?;
399 self.index.insert(
400 key.clone(),
401 ValueEntry {
402 offset,
403 len: value.len(),
404 version: self.next_version,
405 },
406 );
407 self.next_version += 1;
408 self.size_bytes += key.len() + value.len();
409 }
410
411 Ok(())
412 }
413
414 #[must_use]
416 #[allow(clippy::cast_precision_loss)]
417 pub fn fragmentation(&self) -> f64 {
418 let used = self.storage.used_bytes();
419 if used == 0 {
420 return 0.0;
421 }
422 let live: usize = self.index.values().map(|e| e.len).sum();
423 1.0 - (live as f64 / used as f64)
425 }
426
427 pub fn save_index(&self) -> Result<(), StateError> {
437 let path = match self.path() {
438 Some(p) => p.with_extension(INDEX_EXTENSION),
439 None => return Ok(()), };
441
442 let file = File::create(&path)?;
443 let mut writer = std::io::BufWriter::new(file);
444
445 let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&self.index)
447 .map_err(|e| StateError::Serialization(e.to_string()))?;
448
449 writer.write_all(&MMAP_MAGIC.to_le_bytes())?;
451 writer.write_all(&MMAP_VERSION.to_le_bytes())?;
452 writer.write_all(&[0u8; 4])?; let write_pos = self.storage.used_bytes() as u64;
455 writer.write_all(&write_pos.to_le_bytes())?;
456 writer.write_all(&self.next_version.to_le_bytes())?;
457
458 writer.write_all(&bytes)?;
460 writer.flush()?;
461
462 Ok(())
463 }
464
465 #[allow(clippy::type_complexity)]
467 fn load_index(
468 state_path: &Path,
469 ) -> Result<(BTreeMap<Vec<u8>, ValueEntry>, usize, u64), StateError> {
470 let path = state_path.with_extension(INDEX_EXTENSION);
471 if !path.exists() {
472 return Err(StateError::Io(std::io::Error::new(
473 std::io::ErrorKind::NotFound,
474 "Index file not found",
475 )));
476 }
477
478 let mut file = File::open(path)?;
479 let mut buffer = Vec::new();
480 std::io::Read::read_to_end(&mut file, &mut buffer)?;
481
482 if buffer.len() < 32 {
483 return Err(StateError::Corruption("Index file too short".to_string()));
485 }
486
487 let magic_bytes: [u8; 8] = buffer[0..8].try_into().unwrap();
489 if u64::from_le_bytes(magic_bytes) != MMAP_MAGIC {
490 return Err(StateError::Corruption("Invalid index magic".to_string()));
491 }
492
493 let version_bytes: [u8; 4] = buffer[8..12].try_into().unwrap();
495 if u32::from_le_bytes(version_bytes) != MMAP_VERSION {
496 return Err(StateError::Corruption("Invalid index version".to_string()));
497 }
498
499 let write_pos =
502 usize::try_from(u64::from_le_bytes(buffer[16..24].try_into().unwrap())).unwrap();
503 let next_version = u64::from_le_bytes(buffer[24..32].try_into().unwrap());
504
505 let index: BTreeMap<Vec<u8>, ValueEntry> =
506 rkyv::from_bytes::<BTreeMap<Vec<u8>, ValueEntry>, rkyv::rancor::Error>(&buffer[32..])
507 .map_err(|e| StateError::Deserialization(e.to_string()))?;
508
509 Ok((index, write_pos, next_version))
510 }
511}
512
513impl StateStore for MmapStateStore {
514 #[inline]
515 fn get(&self, key: &[u8]) -> Option<Bytes> {
516 self.index.get(key).map(|entry| {
517 let data = self.storage.get(entry.offset, entry.len);
518 Bytes::copy_from_slice(data)
519 })
520 }
521
522 #[inline]
523 fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), StateError> {
524 let offset = self.storage.write(value)?;
526
527 let entry = ValueEntry {
528 offset,
529 len: value.len(),
530 version: self.next_version,
531 };
532 self.next_version += 1;
533
534 match self.index.entry(key.to_vec()) {
536 std::collections::btree_map::Entry::Occupied(mut occupied) => {
537 self.size_bytes = self.size_bytes - occupied.get().len + value.len();
538 *occupied.get_mut() = entry;
539 }
540 std::collections::btree_map::Entry::Vacant(vacant) => {
541 self.size_bytes += key.len() + value.len();
542 vacant.insert(entry);
543 }
544 }
545 Ok(())
546 }
547
548 fn delete(&mut self, key: &[u8]) -> Result<(), StateError> {
549 if let Some(entry) = self.index.remove(key) {
550 self.size_bytes -= key.len() + entry.len;
551 }
554 Ok(())
555 }
556
557 fn prefix_scan<'a>(
558 &'a self,
559 prefix: &'a [u8],
560 ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
561 if prefix.is_empty() {
562 return Box::new(self.index.iter().map(|(k, entry)| {
563 let value = self.storage.get(entry.offset, entry.len);
564 (Bytes::copy_from_slice(k), Bytes::copy_from_slice(value))
565 }));
566 }
567 if let Some(end) = prefix_successor(prefix) {
568 Box::new(
569 self.index
570 .range::<[u8], _>((Bound::Included(prefix), Bound::Excluded(end.as_slice())))
571 .map(|(k, entry)| {
572 let value = self.storage.get(entry.offset, entry.len);
573 (Bytes::copy_from_slice(k), Bytes::copy_from_slice(value))
574 }),
575 )
576 } else {
577 Box::new(
578 self.index
579 .range::<[u8], _>((Bound::Included(prefix), Bound::Unbounded))
580 .map(|(k, entry)| {
581 let value = self.storage.get(entry.offset, entry.len);
582 (Bytes::copy_from_slice(k), Bytes::copy_from_slice(value))
583 }),
584 )
585 }
586 }
587
588 fn range_scan<'a>(
589 &'a self,
590 range: Range<&'a [u8]>,
591 ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
592 Box::new(
593 self.index
594 .range::<[u8], _>((Bound::Included(range.start), Bound::Excluded(range.end)))
595 .map(|(k, entry)| {
596 let value = self.storage.get(entry.offset, entry.len);
597 (Bytes::copy_from_slice(k), Bytes::copy_from_slice(value))
598 }),
599 )
600 }
601
602 #[inline]
603 fn contains(&self, key: &[u8]) -> bool {
604 self.index.contains_key(key)
605 }
606
607 fn size_bytes(&self) -> usize {
608 self.size_bytes
609 }
610
611 fn len(&self) -> usize {
612 self.index.len()
613 }
614
615 fn snapshot(&self) -> StateSnapshot {
616 let data: Vec<(Vec<u8>, Vec<u8>)> = self
617 .index
618 .iter()
619 .map(|(k, entry)| {
620 let value = self.storage.get(entry.offset, entry.len).to_vec();
621 (k.clone(), value)
622 })
623 .collect();
624 StateSnapshot::new(data)
625 }
626
627 fn restore(&mut self, snapshot: StateSnapshot) {
628 self.index.clear();
629 self.storage.reset();
630 self.size_bytes = 0;
631 self.next_version = 1;
632
633 for (key, value) in snapshot.data() {
634 if let Ok(offset) = self.storage.write(value) {
635 self.index.insert(
636 key.clone(),
637 ValueEntry {
638 offset,
639 len: value.len(),
640 version: self.next_version,
641 },
642 );
643 self.next_version += 1;
644 self.size_bytes += key.len() + value.len();
645 }
646 }
647 }
648
649 fn clear(&mut self) {
650 self.index.clear();
651 self.storage.reset();
652 self.size_bytes = 0;
653 }
654
655 fn flush(&mut self) -> Result<(), StateError> {
656 self.storage.flush()?;
657 if self.is_persistent() {
658 self.save_index()?;
659 }
660 Ok(())
661 }
662}
663
/// Unit tests for `MmapStateStore`, covering both the in-memory arena and the
/// file-backed variant.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Put, overwrite and delete round-trip on the in-memory store.
    #[test]
    fn test_in_memory_basic() {
        let mut store = MmapStateStore::in_memory(1024);

        store.put(b"key1", b"value1").unwrap();
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
        assert_eq!(store.len(), 1);

        // Overwriting keeps a single entry.
        store.put(b"key1", b"value2").unwrap();
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value2"));
        assert_eq!(store.len(), 1);

        store.delete(b"key1").unwrap();
        assert!(store.get(b"key1").is_none());
        assert_eq!(store.len(), 0);
    }

    /// Data flushed by one store instance is visible after reopening.
    #[test]
    fn test_persistent_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("state.db");

        {
            let mut store = MmapStateStore::persistent(&path, 4096).unwrap();
            store.put(b"key1", b"value1").unwrap();
            store.put(b"key2", b"value2").unwrap();
            store.flush().unwrap();
        }

        {
            let store = MmapStateStore::persistent(&path, 4096).unwrap();
            assert!(store.is_persistent());
            assert_eq!(store.path(), Some(path.as_path()));

            // The flushed entries must have survived the reopen.
            assert_eq!(store.len(), 2);
            assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
            assert_eq!(store.get(b"key2").unwrap(), Bytes::from("value2"));
        }
    }

    #[test]
    fn test_contains() {
        let mut store = MmapStateStore::in_memory(1024);
        assert!(!store.contains(b"key1"));

        store.put(b"key1", b"value1").unwrap();
        assert!(store.contains(b"key1"));

        store.delete(b"key1").unwrap();
        assert!(!store.contains(b"key1"));
    }

    #[test]
    fn test_prefix_scan() {
        let mut store = MmapStateStore::in_memory(4096);
        store.put(b"prefix:1", b"value1").unwrap();
        store.put(b"prefix:2", b"value2").unwrap();
        store.put(b"prefix:10", b"value10").unwrap();
        store.put(b"other:1", b"value3").unwrap();

        let results: Vec<_> = store.prefix_scan(b"prefix:").collect();
        assert_eq!(results.len(), 3);

        for (key, _) in &results {
            assert!(key.starts_with(b"prefix:"));
        }
    }

    #[test]
    fn test_range_scan() {
        let mut store = MmapStateStore::in_memory(4096);
        store.put(b"a", b"1").unwrap();
        store.put(b"b", b"2").unwrap();
        store.put(b"c", b"3").unwrap();
        store.put(b"d", b"4").unwrap();

        // The end bound is exclusive, so only "b" and "c" match.
        let results: Vec<_> = store.range_scan(b"b".as_slice()..b"d".as_slice()).collect();
        assert_eq!(results.len(), 2);

        let keys: Vec<_> = results.iter().map(|(k, _)| k.as_ref()).collect();
        assert!(keys.contains(&b"b".as_slice()));
        assert!(keys.contains(&b"c".as_slice()));
    }

    /// A range whose start equals its end contains no keys.
    #[test]
    fn test_range_scan_empty_range() {
        let mut store = MmapStateStore::in_memory(4096);
        store.put(b"a", b"1").unwrap();
        store.put(b"b", b"2").unwrap();

        let results: Vec<_> = store.range_scan(b"b".as_slice()..b"b".as_slice()).collect();
        assert!(results.is_empty());
    }

    #[test]
    fn test_snapshot_and_restore() {
        let mut store = MmapStateStore::in_memory(4096);
        store.put(b"key1", b"value1").unwrap();
        store.put(b"key2", b"value2").unwrap();

        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 2);

        // Diverge from the snapshot: modify, add and delete.
        store.put(b"key1", b"modified").unwrap();
        store.put(b"key3", b"value3").unwrap();
        store.delete(b"key2").unwrap();

        assert_eq!(store.len(), 2);
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("modified"));

        store.restore(snapshot);

        assert_eq!(store.len(), 2);
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
        assert_eq!(store.get(b"key2").unwrap(), Bytes::from("value2"));
        assert!(store.get(b"key3").is_none());
    }

    #[test]
    fn test_size_tracking() {
        let mut store = MmapStateStore::in_memory(4096);
        assert_eq!(store.size_bytes(), 0);

        // Size is key length plus value length.
        store.put(b"key1", b"value1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 6);

        store.put(b"key2", b"value2").unwrap();
        assert_eq!(store.size_bytes(), (4 + 6) * 2);

        // Overwriting with a shorter value shrinks the tracked size.
        store.put(b"key1", b"v1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 2 + 4 + 6);

        store.delete(b"key1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 6);

        store.clear();
        assert_eq!(store.size_bytes(), 0);
    }

    #[test]
    fn test_compact() {
        let mut store = MmapStateStore::in_memory(4096);

        store.put(b"key1", b"value1").unwrap();
        store.put(b"key2", b"value2").unwrap();
        store.put(b"key3", b"value3").unwrap();

        // A delete and an overwrite both leave dead bytes behind.
        store.delete(b"key2").unwrap();
        store.put(b"key1", b"new_value1").unwrap();

        let frag_before = store.fragmentation();
        assert!(frag_before > 0.0);

        store.compact().unwrap();

        let frag_after = store.fragmentation();
        assert!(frag_after < frag_before);
        assert!(frag_after.abs() < f64::EPSILON);

        // Live data is unchanged by compaction.
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("new_value1"));
        assert!(store.get(b"key2").is_none());
        assert_eq!(store.get(b"key3").unwrap(), Bytes::from("value3"));
    }

    /// The arena grows transparently beyond its initial capacity.
    #[test]
    fn test_growth() {
        let mut store = MmapStateStore::in_memory(32);

        for i in 0..100 {
            let key = format!("key{i:04}");
            let value = format!("value{i:04}");
            store.put(key.as_bytes(), value.as_bytes()).unwrap();
        }

        assert_eq!(store.len(), 100);

        for i in 0..100 {
            let key = format!("key{i:04}");
            let expected = format!("value{i:04}");
            assert_eq!(
                store.get(key.as_bytes()).unwrap().as_ref(),
                expected.as_bytes()
            );
        }
    }

    #[test]
    fn test_clear() {
        let mut store = MmapStateStore::in_memory(4096);
        store.put(b"key1", b"value1").unwrap();
        store.put(b"key2", b"value2").unwrap();

        assert_eq!(store.len(), 2);
        assert!(store.size_bytes() > 0);

        store.clear();

        assert_eq!(store.len(), 0);
        assert_eq!(store.size_bytes(), 0);
        assert!(store.get(b"key1").is_none());
    }

    #[test]
    fn test_empty_store() {
        let store = MmapStateStore::in_memory(1024);
        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
        assert_eq!(store.size_bytes(), 0);
        assert!(store.get(b"nonexistent").is_none());
        assert!(!store.contains(b"nonexistent"));
    }

    #[test]
    fn test_large_values() {
        let mut store = MmapStateStore::in_memory(1024 * 1024);

        let large_value = vec![0xABu8; 100 * 1024];
        store.put(b"large", &large_value).unwrap();

        let retrieved = store.get(b"large").unwrap();
        assert_eq!(retrieved.len(), large_value.len());
        assert_eq!(retrieved.as_ref(), &large_value[..]);
    }

    #[test]
    fn test_binary_keys_and_values() {
        let mut store = MmapStateStore::in_memory(4096);

        let key = [0x00, 0x01, 0x02, 0xFF, 0xFE];
        let value = [0xDE, 0xAD, 0xBE, 0xEF];

        store.put(&key, &value).unwrap();
        assert_eq!(store.get(&key).unwrap().as_ref(), &value);
    }

    /// Flushing writes a sidecar index that restores all entries on reopen.
    #[test]
    fn test_index_persistence() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test_index.db");

        {
            let mut store = MmapStateStore::persistent(&db_path, 1024 * 1024).unwrap();
            store.put(b"key1", b"value1").unwrap();
            store.put(b"key2", b"value2").unwrap();
            store.flush().unwrap();
        }

        let idx_path = db_path.with_extension(INDEX_EXTENSION);
        assert!(idx_path.exists());

        {
            let store = MmapStateStore::persistent(&db_path, 1024 * 1024).unwrap();
            assert_eq!(store.len(), 2);
            assert_eq!(store.get(b"key1").unwrap().as_ref(), b"value1");
            assert_eq!(store.get(b"key2").unwrap().as_ref(), b"value2");
        }
    }
}