1use std::fs::{File, OpenOptions};
58use std::path::{Path, PathBuf};
59use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
60use std::time::{Duration, SystemTime, UNIX_EPOCH};
61
62use dashmap::DashMap;
63use parking_lot::RwLock;
64use sochdb_core::{Result, SochDBError};
65
/// Alias of [`VersionChain`] under a `Concurrent`-prefixed name.
///
/// NOTE(review): presumably kept so older call sites keep compiling — confirm.
pub type ConcurrentVersionChain = VersionChain;
/// Alias of [`VersionEntry`] under a `Concurrent`-prefixed name.
///
/// NOTE(review): presumably kept so older call sites keep compiling — confirm.
pub type ConcurrentVersionEntry = VersionEntry;
69
/// Magic number expected in `MvccHeader::magic`; its little-endian bytes spell
/// the ASCII string `SCHMV_CC`.
const MVCC_MAGIC: u64 = 0x43435F564D484353;
/// Metadata format version expected in `MvccHeader::version`.
const MVCC_VERSION: u32 = 1;

/// Number of reader slots; also written to `MvccHeader::num_readers`.
const MAX_READERS: usize = 1024;

/// Nominal size in bytes of one reader slot, used to compute `METADATA_SIZE`.
const READER_SLOT_SIZE: usize = 64;

/// Nominal size in bytes of the metadata header, used to compute `METADATA_SIZE`.
const HEADER_SIZE: usize = 64;

/// Nominal size in bytes of the header followed by all reader slots.
const METADATA_SIZE: usize = HEADER_SIZE + (MAX_READERS * READER_SLOT_SIZE);

/// Heartbeat age, in microseconds (60 seconds), beyond which
/// `ReaderSlot::is_stale` reports a claimed slot as stale.
const STALE_READER_TIMEOUT_US: u64 = 60_000_000;

/// Commit count threshold compared against `MvccHeader::commits_since_gc`
/// to decide when garbage collection is due.
const GC_COMMIT_INTERVAL: u64 = 1000;
97
/// Hybrid-logical-clock timestamp packed into a single `u64`.
///
/// The high 48 bits carry a physical component in milliseconds and the low
/// 16 bits carry a logical counter, so comparing raw values orders timestamps
/// by physical time first and by logical counter second.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct HlcTimestamp(pub u64);

impl HlcTimestamp {
    /// Number of low bits occupied by the logical counter.
    const LOGICAL_BITS: u32 = 16;
    /// Mask selecting the logical counter from a raw value.
    const LOGICAL_MASK: u64 = 0xFFFF;

    /// Packs `physical_ms` and `logical` into a timestamp.
    ///
    /// Bits of `physical_ms` above the low 48 are shifted out and lost.
    #[inline]
    pub fn new(physical_ms: u64, logical: u16) -> Self {
        Self((physical_ms << Self::LOGICAL_BITS) | u64::from(logical))
    }

    /// Returns the physical (millisecond) component.
    #[inline]
    pub fn physical_ms(&self) -> u64 {
        self.0 >> Self::LOGICAL_BITS
    }

    /// Returns the logical counter component.
    #[inline]
    pub fn logical(&self) -> u16 {
        (self.0 & Self::LOGICAL_MASK) as u16
    }

    /// Returns the packed `u64` representation.
    #[inline]
    pub fn raw(&self) -> u64 {
        self.0
    }

    /// Atomically allocates a timestamp strictly greater than the value
    /// currently stored in `last`, and publishes it back into `last`.
    ///
    /// If the wall clock (milliseconds since the Unix epoch) is ahead of the
    /// stored physical component, the result is `(now, 0)`. Otherwise the
    /// stored value is incremented by one: this bumps the logical counter and,
    /// when the counter is already `0xFFFF`, carries into the physical
    /// component. The carry is what keeps timestamps unique when more than
    /// 65 536 allocations happen before the clock advances (for example after
    /// the system clock steps backwards).
    ///
    /// A system clock set before the Unix epoch is treated as physical time 0,
    /// so allocation simply continues from the stored value.
    ///
    /// # Panics
    ///
    /// Panics if the stored value is `u64::MAX`, i.e. the timestamp space is
    /// exhausted.
    pub fn allocate_next(last: &AtomicU64) -> Self {
        // Sampled once; retries of the CAS loop reuse the same wall-clock reading.
        let physical_now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_millis() as u64)
            .unwrap_or(0);

        loop {
            let last_val = last.load(Ordering::Acquire);
            let last_phys = last_val >> Self::LOGICAL_BITS;

            let new_val = if physical_now > last_phys {
                physical_now << Self::LOGICAL_BITS
            } else {
                // `+ 1` on the packed value increments the logical counter and
                // naturally carries into the physical part on overflow, so the
                // result is always strictly greater than `last_val`.
                last_val
                    .checked_add(1)
                    .expect("HLC timestamp space exhausted")
            };

            if last
                .compare_exchange(last_val, new_val, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return Self(new_val);
            }
            // Another thread won the race; retry against the fresh value.
            std::hint::spin_loop();
        }
    }

    /// Reads the timestamp currently stored in `ts` without advancing it.
    #[inline]
    pub fn read_current(ts: &AtomicU64) -> Self {
        Self(ts.load(Ordering::Acquire))
    }
}

impl From<u64> for HlcTimestamp {
    /// Wraps an already-packed raw value.
    fn from(val: u64) -> Self {
        Self(val)
    }
}

impl From<HlcTimestamp> for u64 {
    /// Unwraps the packed raw value.
    fn from(ts: HlcTimestamp) -> Self {
        ts.0
    }
}
194
195#[repr(C, align(64))]
204#[derive(Debug)]
205pub struct ReaderSlot {
206 pub pid: AtomicU32,
208 pub snapshot_ts: AtomicU64,
210 pub epoch: AtomicU32,
212 pub last_heartbeat: AtomicU64,
214 _reserved: [u8; 36],
216}
217
218impl ReaderSlot {
219 pub const fn empty() -> Self {
221 Self {
222 pid: AtomicU32::new(0),
223 snapshot_ts: AtomicU64::new(0),
224 epoch: AtomicU32::new(0),
225 last_heartbeat: AtomicU64::new(0),
226 _reserved: [0u8; 36],
227 }
228 }
229
230 #[inline]
232 pub fn is_free(&self) -> bool {
233 self.pid.load(Ordering::Acquire) == 0
234 }
235
236 #[inline]
240 pub fn try_claim(&self, my_pid: u32, snapshot_ts: u64, epoch: u32) -> bool {
241 let current_pid = self.pid.load(Ordering::Acquire);
242
243 if current_pid != 0 && current_pid != my_pid {
245 return false;
246 }
247
248 if self
249 .pid
250 .compare_exchange(current_pid, my_pid, Ordering::AcqRel, Ordering::Acquire)
251 .is_ok()
252 {
253 self.snapshot_ts.store(snapshot_ts, Ordering::Release);
255 self.epoch.store(epoch, Ordering::Release);
256 self.last_heartbeat
257 .store(current_time_us(), Ordering::Release);
258 true
259 } else {
260 false
261 }
262 }
263
264 #[inline]
266 pub fn release(&self, my_pid: u32) {
267 if self.pid.load(Ordering::Acquire) == my_pid {
269 self.snapshot_ts.store(0, Ordering::Release);
270 self.pid.store(0, Ordering::Release);
271 }
272 }
273
274 #[inline]
276 pub fn heartbeat(&self) {
277 self.last_heartbeat
278 .store(current_time_us(), Ordering::Release);
279 }
280
281 #[inline]
283 pub fn is_stale(&self, now_us: u64) -> bool {
284 let pid = self.pid.load(Ordering::Acquire);
285 if pid == 0 {
286 return false; }
288
289 let last_hb = self.last_heartbeat.load(Ordering::Acquire);
291 if now_us.saturating_sub(last_hb) > STALE_READER_TIMEOUT_US {
292 return true;
293 }
294
295 !process_exists(pid)
297 }
298}
299
/// MVCC metadata header with a C-compatible field layout.
///
/// `ConcurrentMvcc` writes this struct to, and reads it back from, the
/// `.mvcc_metadata` file as raw bytes, so the field order and types define the
/// file format.
#[repr(C)]
#[derive(Debug)]
pub struct MvccHeader {
    /// Format marker; `validate` requires it to equal `MVCC_MAGIC`.
    pub magic: u64,
    /// Format version; `validate` requires it to equal `MVCC_VERSION`.
    pub version: u32,
    /// Page size; `new` sets it to 4096.
    pub page_size: u32,
    /// Number of reader slots; `new` sets it to `MAX_READERS`.
    pub num_readers: u32,
    /// Current epoch counter; starts at 1.
    pub current_epoch: AtomicU64,
    /// Most recently allocated timestamp, as a packed `HlcTimestamp` value.
    pub current_ts: AtomicU64,
    /// Process id of the writer-lock holder, or 0 when unlocked.
    pub writer_lock: AtomicU32,
    /// Number of commits counted since the counter was last reset for GC.
    pub commits_since_gc: AtomicU64,
    /// Reserved bytes, zero-initialized by `new`.
    _reserved: [u8; 4],
}
327
328impl MvccHeader {
329 pub fn new() -> Self {
331 Self {
332 magic: MVCC_MAGIC,
333 version: MVCC_VERSION,
334 page_size: 4096,
335 num_readers: MAX_READERS as u32,
336 current_epoch: AtomicU64::new(1),
337 current_ts: AtomicU64::new(HlcTimestamp::new(
338 SystemTime::now()
339 .duration_since(UNIX_EPOCH)
340 .unwrap()
341 .as_millis() as u64,
342 0,
343 ).raw()),
344 writer_lock: AtomicU32::new(0),
345 commits_since_gc: AtomicU64::new(0),
346 _reserved: [0u8; 4],
347 }
348 }
349
350 pub fn validate(&self) -> Result<()> {
352 if self.magic != MVCC_MAGIC {
353 return Err(SochDBError::Corruption(
354 "Invalid MVCC metadata magic".into(),
355 ));
356 }
357 if self.version != MVCC_VERSION {
358 return Err(SochDBError::Corruption(format!(
359 "Unsupported MVCC version: {} (expected {})",
360 self.version, MVCC_VERSION
361 )));
362 }
363 Ok(())
364 }
365}
366
367impl Default for MvccHeader {
368 fn default() -> Self {
369 Self::new()
370 }
371}
372
/// One version of a key's value.
#[derive(Debug, Clone)]
pub struct VersionEntry {
    /// Commit timestamp; 0 while the version is still uncommitted.
    pub commit_ts: u64,
    /// Id of the transaction that wrote this version.
    pub txn_id: u64,
    /// Epoch recorded when the version was written.
    pub epoch: u32,
    /// Stored payload, if any.
    pub value: Option<Vec<u8>>,
}

impl VersionEntry {
    /// Builds an entry from its four components.
    pub fn new(commit_ts: u64, txn_id: u64, epoch: u32, value: Option<Vec<u8>>) -> Self {
        VersionEntry {
            value,
            epoch,
            txn_id,
            commit_ts,
        }
    }

    /// Returns `true` when this version is committed (`commit_ts` is non-zero)
    /// and was committed strictly before `snapshot_ts`.
    #[inline]
    pub fn is_visible_at(&self, snapshot_ts: u64) -> bool {
        let committed = self.commit_ts != 0;
        committed && snapshot_ts > self.commit_ts
    }
}
407
408#[derive(Debug, Default)]
419pub struct VersionChain {
420 versions: Vec<VersionEntry>,
422 uncommitted: Option<VersionEntry>,
424}
425
426impl VersionChain {
427 pub fn new() -> Self {
429 Self {
430 versions: Vec::new(),
431 uncommitted: None,
432 }
433 }
434
435 pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64, epoch: u32) {
437 self.uncommitted = Some(VersionEntry {
438 commit_ts: 0, txn_id,
440 epoch,
441 value,
442 });
443 }
444
445 pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
449 if let Some(ref mut v) = self.uncommitted {
450 if v.txn_id == txn_id {
451 v.commit_ts = commit_ts;
452 let committed = self.uncommitted.take().unwrap();
453
454 let pos = self
457 .versions
458 .partition_point(|existing| existing.commit_ts > commit_ts);
459 self.versions.insert(pos, committed);
460
461 return true;
462 }
463 }
464 false
465 }
466
467 pub fn abort(&mut self, txn_id: u64) {
469 if let Some(ref v) = self.uncommitted {
470 if v.txn_id == txn_id {
471 self.uncommitted = None;
472 }
473 }
474 }
475
476 pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&VersionEntry> {
483 if let Some(txn_id) = current_txn_id {
485 if let Some(ref v) = self.uncommitted {
486 if v.txn_id == txn_id {
487 return Some(v);
488 }
489 }
490 }
491
492 let idx = self
495 .versions
496 .partition_point(|v| v.commit_ts >= snapshot_ts);
497
498 self.versions.get(idx)
499 }
500
501 pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
503 if let Some(ref v) = self.uncommitted {
504 return v.txn_id != my_txn_id;
505 }
506 false
507 }
508
509 pub fn gc(&mut self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
518 if self.versions.len() <= 1 {
519 return 0; }
521
522 let original_len = self.versions.len();
523
524 let mut keep_count = 1; for v in self.versions.iter().skip(1) {
530 if v.epoch >= min_epoch || v.commit_ts >= min_snapshot_ts {
531 keep_count += 1;
532 } else {
533 break; }
535 }
536
537 self.versions.truncate(keep_count);
538 original_len - self.versions.len()
539 }
540
541 pub fn len(&self) -> usize {
543 self.versions.len() + if self.uncommitted.is_some() { 1 } else { 0 }
544 }
545
546 pub fn is_empty(&self) -> bool {
548 self.versions.is_empty() && self.uncommitted.is_none()
549 }
550}
551
/// Concurrent map from key bytes to that key's [`VersionChain`], together
/// with usage counters.
pub struct VersionStore {
    /// Version chains indexed by key.
    data: DashMap<Vec<u8>, VersionChain>,
    /// Counters updated by the store's operations.
    stats: VersionStoreStats,
}
565
/// Counters maintained by [`VersionStore`]; all start at zero.
#[derive(Debug, Default)]
pub struct VersionStoreStats {
    /// Incremented each time a chain is created for a new key.
    pub num_keys: AtomicU64,
    /// Version counter: incremented on insert, decremented on abort.
    pub num_versions: AtomicU64,
    /// Number of completed `VersionStore::gc` passes.
    pub gc_passes: AtomicU64,
    /// Total number of versions removed by `VersionStore::gc`.
    pub versions_reclaimed: AtomicU64,
}
578
579impl VersionStore {
580 pub fn new() -> Self {
582 Self {
583 data: DashMap::new(),
584 stats: VersionStoreStats::default(),
585 }
586 }
587
588 pub fn insert_uncommitted(
590 &self,
591 key: &[u8],
592 value: Option<Vec<u8>>,
593 txn_id: u64,
594 epoch: u32,
595 ) -> Result<()> {
596 let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
597 self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
598 VersionChain::new()
599 });
600
601 if entry.has_write_conflict(txn_id) {
603 return Err(SochDBError::Internal(
604 "Write conflict: another transaction has uncommitted write".into(),
605 ));
606 }
607
608 entry.add_uncommitted(value, txn_id, epoch);
609 self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
610 Ok(())
611 }
612
613 pub fn commit(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
615 if let Some(mut entry) = self.data.get_mut(key) {
616 return entry.commit(txn_id, commit_ts);
617 }
618 false
619 }
620
621 pub fn abort(&self, key: &[u8], txn_id: u64) {
623 if let Some(mut entry) = self.data.get_mut(key) {
624 entry.abort(txn_id);
625 self.stats.num_versions.fetch_sub(1, Ordering::Relaxed);
626 }
627 }
628
629 pub fn get(&self, key: &[u8], snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<Vec<u8>> {
631 self.data.get(key).and_then(|chain| {
632 chain
633 .read_at(snapshot_ts, current_txn_id)
634 .and_then(|v| v.value.clone())
635 })
636 }
637
638 pub fn contains(&self, key: &[u8], snapshot_ts: u64) -> bool {
640 self.data
641 .get(key)
642 .map(|chain| chain.read_at(snapshot_ts, None).is_some())
643 .unwrap_or(false)
644 }
645
646 pub fn gc(&self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
650 let mut total_reclaimed = 0;
651
652 for mut entry in self.data.iter_mut() {
653 let reclaimed = entry.gc(min_epoch, min_snapshot_ts);
654 total_reclaimed += reclaimed;
655 }
656
657 self.stats
658 .gc_passes
659 .fetch_add(1, Ordering::Relaxed);
660 self.stats
661 .versions_reclaimed
662 .fetch_add(total_reclaimed as u64, Ordering::Relaxed);
663
664 total_reclaimed
665 }
666
667 pub fn len(&self) -> usize {
669 self.data.len()
670 }
671
672 pub fn is_empty(&self) -> bool {
674 self.data.is_empty()
675 }
676
677 pub fn stats(&self) -> &VersionStoreStats {
679 &self.stats
680 }
681}
682
683impl Default for VersionStore {
684 fn default() -> Self {
685 Self::new()
686 }
687}
688
/// MVCC coordinator: owns the metadata header, the reader slot table and the
/// in-memory version store for one database directory.
pub struct ConcurrentMvcc {
    /// Directory passed to `open`.
    path: PathBuf,
    /// Header loaded from, or created for, `<path>/.mvcc_metadata`.
    header: MvccHeader,
    /// Reader registration table with `MAX_READERS` entries.
    reader_slots: Vec<ReaderSlot>,
    /// Versioned key/value data.
    version_store: VersionStore,
    /// Process id of this process, captured in `open`.
    our_pid: u32,
    /// Index of the reader slot most recently claimed through
    /// `register_reader`, if any.
    our_slot: RwLock<Option<usize>>,
}
714
715impl ConcurrentMvcc {
716 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
718 let path = path.as_ref().to_path_buf();
719 std::fs::create_dir_all(&path)?;
720
721 let metadata_path = path.join(".mvcc_metadata");
722 let header = if metadata_path.exists() {
723 Self::load_header(&metadata_path)?
725 } else {
726 let header = MvccHeader::new();
728 Self::save_header(&metadata_path, &header)?;
729 header
730 };
731
732 let mut reader_slots = Vec::with_capacity(MAX_READERS);
734 for _ in 0..MAX_READERS {
735 reader_slots.push(ReaderSlot::empty());
736 }
737
738 Ok(Self {
739 path,
740 header,
741 reader_slots,
742 version_store: VersionStore::new(),
743 our_pid: std::process::id(),
744 our_slot: RwLock::new(None),
745 })
746 }
747
748 fn load_header(path: &Path) -> Result<MvccHeader> {
750 use std::io::Read;
751 let mut file = File::open(path)?;
752 let mut buf = vec![0u8; std::mem::size_of::<MvccHeader>()];
753 file.read_exact(&mut buf)?;
754
755 let header: MvccHeader = unsafe { std::ptr::read(buf.as_ptr() as *const MvccHeader) };
757 header.validate()?;
758 Ok(header)
759 }
760
761 fn save_header(path: &Path, header: &MvccHeader) -> Result<()> {
763 use std::io::Write;
764 let mut file = OpenOptions::new()
765 .create(true)
766 .write(true)
767 .truncate(true)
768 .open(path)?;
769
770 let buf = unsafe {
772 std::slice::from_raw_parts(
773 header as *const MvccHeader as *const u8,
774 std::mem::size_of::<MvccHeader>(),
775 )
776 };
777 file.write_all(buf)?;
778 file.sync_all()?;
779 Ok(())
780 }
781
782 #[inline]
784 pub fn allocate_timestamp(&self) -> HlcTimestamp {
785 HlcTimestamp::allocate_next(&self.header.current_ts)
786 }
787
788 #[inline]
790 pub fn current_timestamp(&self) -> HlcTimestamp {
791 HlcTimestamp::read_current(&self.header.current_ts)
792 }
793
794 #[inline]
796 pub fn current_epoch(&self) -> u64 {
797 self.header.current_epoch.load(Ordering::Acquire)
798 }
799
800 pub fn register_reader(&self) -> Result<usize> {
805 let snapshot_ts = self.current_timestamp().raw();
806 let epoch = self.current_epoch() as u32;
807
808 for (i, slot) in self.reader_slots.iter().enumerate() {
810 if slot.try_claim(self.our_pid, snapshot_ts, epoch) {
811 *self.our_slot.write() = Some(i);
812 return Ok(i);
813 }
814 }
815
816 Err(SochDBError::ResourceExhausted(
817 "Too many concurrent readers".into(),
818 ))
819 }
820
821 pub fn unregister_reader(&self, slot_idx: usize) {
823 if slot_idx < self.reader_slots.len() {
824 self.reader_slots[slot_idx].release(self.our_pid);
825 *self.our_slot.write() = None;
826 }
827 }
828
829 pub fn try_acquire_writer(&self) -> Result<WriterGuard<'_>> {
833 let current = self.header.writer_lock.load(Ordering::Acquire);
834
835 if current == 0 {
836 if self
838 .header
839 .writer_lock
840 .compare_exchange(0, self.our_pid, Ordering::AcqRel, Ordering::Acquire)
841 .is_ok()
842 {
843 return Ok(WriterGuard { mvcc: self });
844 }
845 } else if current == self.our_pid {
846 return Ok(WriterGuard { mvcc: self });
848 }
849
850 Err(SochDBError::LockError(format!(
851 "Writer lock held by process {}",
852 current
853 )))
854 }
855
856 pub fn acquire_writer(&self, timeout: Duration) -> Result<WriterGuard<'_>> {
858 let deadline = std::time::Instant::now() + timeout;
859
860 loop {
861 match self.try_acquire_writer() {
862 Ok(guard) => return Ok(guard),
863 Err(_) if std::time::Instant::now() < deadline => {
864 std::thread::sleep(Duration::from_micros(100));
865 }
866 Err(e) => return Err(e),
867 }
868 }
869 }
870
871 fn release_writer(&self) {
873 let current = self.header.writer_lock.load(Ordering::Acquire);
874 if current == self.our_pid {
875 self.header.writer_lock.store(0, Ordering::Release);
876 }
877 }
878
879 pub fn version_store(&self) -> &VersionStore {
881 &self.version_store
882 }
883
884 pub fn min_active_snapshot(&self) -> u64 {
886 let mut min_ts = u64::MAX;
887
888 for slot in &self.reader_slots {
889 let pid = slot.pid.load(Ordering::Acquire);
890 if pid != 0 {
891 let ts = slot.snapshot_ts.load(Ordering::Acquire);
892 if ts > 0 && ts < min_ts {
893 min_ts = ts;
894 }
895 }
896 }
897
898 min_ts
899 }
900
901 pub fn min_active_epoch(&self) -> u32 {
903 let mut min_epoch = u32::MAX;
904
905 for slot in &self.reader_slots {
906 let pid = slot.pid.load(Ordering::Acquire);
907 if pid != 0 {
908 let epoch = slot.epoch.load(Ordering::Acquire);
909 if epoch < min_epoch {
910 min_epoch = epoch;
911 }
912 }
913 }
914
915 if min_epoch == u32::MAX {
916 self.current_epoch() as u32
918 } else {
919 min_epoch
920 }
921 }
922
923 pub fn run_gc(&self) -> usize {
927 let min_epoch = self.min_active_epoch();
928 let min_snapshot = self.min_active_snapshot();
929
930 self.version_store.gc(min_epoch, min_snapshot)
931 }
932
933 pub fn should_run_gc(&self) -> bool {
935 self.header
936 .commits_since_gc
937 .load(Ordering::Relaxed)
938 >= GC_COMMIT_INTERVAL
939 }
940
941 pub fn on_commit(&self) {
943 let count = self
944 .header
945 .commits_since_gc
946 .fetch_add(1, Ordering::Relaxed);
947
948 if count >= GC_COMMIT_INTERVAL {
949 self.header.commits_since_gc.store(0, Ordering::Relaxed);
950 let _ = self.run_gc();
951 }
952 }
953
954 pub fn cleanup_stale_readers(&self) -> usize {
956 let now = current_time_us();
957 let mut cleaned = 0;
958
959 for slot in &self.reader_slots {
960 if slot.is_stale(now) {
961 slot.pid.store(0, Ordering::Release);
962 cleaned += 1;
963 }
964 }
965
966 cleaned
967 }
968
969 pub fn advance_epoch(&self) -> u64 {
971 self.header.current_epoch.fetch_add(1, Ordering::AcqRel) + 1
972 }
973}
974
975impl Drop for ConcurrentMvcc {
976 fn drop(&mut self) {
977 if let Some(slot_idx) = *self.our_slot.read() {
979 self.unregister_reader(slot_idx);
980 }
981
982 self.release_writer();
984 }
985}
986
987pub struct WriterGuard<'a> {
993 mvcc: &'a ConcurrentMvcc,
994}
995
996impl<'a> Drop for WriterGuard<'a> {
997 fn drop(&mut self) {
998 self.mvcc.release_writer();
999 }
1000}
1001
/// Returns the wall-clock time as microseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock is set before the Unix epoch.
#[inline]
fn current_time_us() -> u64 {
    let now = SystemTime::now();
    let since_epoch = now.duration_since(UNIX_EPOCH).unwrap();
    let micros: u128 = since_epoch.as_micros();
    micros as u64
}
1014
1015#[cfg(unix)]
1017fn process_exists(pid: u32) -> bool {
1018 let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
1020 if result == 0 {
1021 true
1022 } else {
1023 std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
1025 }
1026}
1027
/// Reports whether a process with id `pid` can be opened (Windows).
///
/// Returns `true` when `OpenProcess` with `PROCESS_QUERY_LIMITED_INFORMATION`
/// yields a non-zero handle, which is closed again immediately.
#[cfg(windows)]
fn process_exists(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION};

    unsafe {
        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if handle == 0 {
            return false;
        }
        // The handle was only needed as an existence probe.
        CloseHandle(handle);
        true
    }
}
1043
/// Fallback for targets that are neither Unix nor Windows: no liveness probe
/// is performed and every pid is reported as existing.
#[cfg(not(any(unix, windows)))]
fn process_exists(_pid: u32) -> bool {
    true
}
1048
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Sequential allocations from one clock are strictly increasing.
    #[test]
    fn test_hlc_timestamp_ordering() {
        let clock = AtomicU64::new(0);

        let first = HlcTimestamp::allocate_next(&clock);
        let second = HlcTimestamp::allocate_next(&clock);
        let third = HlcTimestamp::allocate_next(&clock);

        assert!(first.raw() < second.raw());
        assert!(second.raw() < third.raw());
    }

    /// Eight threads allocating 1000 timestamps each never see a duplicate.
    #[test]
    fn test_hlc_timestamp_concurrent() {
        let clock = Arc::new(AtomicU64::new(0));

        // Collect eagerly so every worker is running before the first join.
        let workers: Vec<_> = (0..8)
            .map(|_| {
                let clock = Arc::clone(&clock);
                thread::spawn(move || {
                    (0..1000)
                        .map(|_| HlcTimestamp::allocate_next(&clock).raw())
                        .collect::<Vec<u64>>()
                })
            })
            .collect();

        let mut all_ts: Vec<u64> = workers
            .into_iter()
            .flat_map(|worker| worker.join().unwrap())
            .collect();

        all_ts.sort();
        let len_before = all_ts.len();
        all_ts.dedup();
        assert_eq!(all_ts.len(), len_before, "Duplicate timestamps found!");
    }

    /// A snapshot sees the newest version committed strictly before it.
    #[test]
    fn test_version_chain_read_at() {
        let mut chain = VersionChain::new();

        // Newest first, matching the chain's internal ordering.
        for &(commit_ts, txn_id, label) in &[(100u64, 1u64, "v100"), (90, 2, "v90"), (80, 3, "v80")] {
            chain.versions.push(VersionEntry::new(
                commit_ts,
                txn_id,
                1,
                Some(label.as_bytes().to_vec()),
            ));
        }

        for &(snapshot_ts, expected) in &[(105u64, "v100"), (95, "v90"), (85, "v80")] {
            let seen = chain.read_at(snapshot_ts, None).unwrap();
            assert_eq!(seen.value, Some(expected.as_bytes().to_vec()));
        }

        assert!(chain.read_at(75, None).is_none());
    }

    /// GC removes some old versions but never empties the chain.
    #[test]
    fn test_version_chain_gc() {
        let mut chain = VersionChain::new();

        // Ten versions: timestamps 100, 95, ..., 55 with epochs 10, 9, ..., 1.
        for step in 0..10u64 {
            let commit_ts = 100 - step * 5;
            let epoch = (10 - step) as u32;
            chain.versions.push(VersionEntry::new(
                commit_ts,
                step,
                epoch,
                Some(format!("v{}", commit_ts).into_bytes()),
            ));
        }

        assert_eq!(chain.len(), 10);

        let reclaimed = chain.gc(7, 75);

        assert!(reclaimed > 0, "Should have reclaimed some versions, got {}", reclaimed);
        assert!(chain.len() >= 1, "Should keep at least one version");
    }

    /// Insert, commit, then read before and after the commit timestamp.
    #[test]
    fn test_version_store_basic() {
        let store = VersionStore::new();
        let key = b"key1";

        store
            .insert_uncommitted(key, Some(b"value1".to_vec()), 1, 1)
            .unwrap();

        assert!(store.commit(key, 1, 100));

        let after_commit = store.get(key, 150, None);
        assert_eq!(after_commit, Some(b"value1".to_vec()));

        let before_commit = store.get(key, 50, None);
        assert!(before_commit.is_none());
    }

    /// A slot can be claimed, re-claimed by its owner, refused to others, and
    /// released.
    #[test]
    fn test_reader_slot_claim_release() {
        let slot = ReaderSlot::empty();
        let owner = 1234;
        let intruder = 5678;

        assert!(slot.is_free());

        assert!(slot.try_claim(owner, 100, 1));
        assert!(!slot.is_free());

        assert!(!slot.try_claim(intruder, 200, 2));

        assert!(slot.try_claim(owner, 300, 3));

        slot.release(owner);
        assert!(slot.is_free());
    }
}