1use std::fs::{File, OpenOptions};
58use std::path::{Path, PathBuf};
59use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
60use std::time::{Duration, SystemTime, UNIX_EPOCH};
61
62use dashmap::DashMap;
63use parking_lot::RwLock;
64use sochdb_core::{Result, SochDBError};
65
/// Alias of [`VersionChain`] under a "concurrent" name (presumably kept so older
/// callers keep compiling — confirm before removing).
pub type ConcurrentVersionChain = VersionChain;
/// Alias of [`VersionEntry`] under a "concurrent" name (presumably kept for the same
/// compatibility reason — confirm before removing).
pub type ConcurrentVersionEntry = VersionEntry;
69
/// Magic number stored in the first 8 bytes of an initialized metadata header.
const MVCC_MAGIC: u64 = 0x4343_5F56_4D48_4353;
/// Layout version of the metadata header; a mismatch is rejected on open.
const MVCC_VERSION: u32 = 1;

/// Number of reader slots in the shared reader table.
const MAX_READERS: usize = 1024;

/// Size in bytes of one reader slot (one `ReaderSlot`).
const READER_SLOT_SIZE: usize = 64;

/// Size in bytes of the metadata header (one `MvccHeader`).
const HEADER_SIZE: usize = 64;

/// Total size of the metadata file: the header followed by the reader table.
const METADATA_SIZE: usize = HEADER_SIZE + MAX_READERS * READER_SLOT_SIZE;

/// Heartbeat age (60 seconds, expressed in microseconds) after which an occupied
/// reader slot is considered stale.
const STALE_READER_TIMEOUT_US: u64 = 60 * 1_000_000;

/// Number of commits between automatically triggered GC passes.
const GC_COMMIT_INTERVAL: u64 = 1_000;
97
/// Hybrid logical clock timestamp packed into a single `u64`.
///
/// The upper 48 bits hold wall-clock milliseconds since the Unix epoch and the lower
/// 16 bits hold a logical counter that orders events within one millisecond. Because
/// of this packing, comparing the raw values orders first by physical time, then by
/// the counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct HlcTimestamp(pub u64);

impl HlcTimestamp {
    /// Packs `physical_ms` and `logical` into a timestamp.
    ///
    /// `physical_ms` is expected to fit in 48 bits; any higher bits are shifted out.
    #[inline]
    pub fn new(physical_ms: u64, logical: u16) -> Self {
        Self((physical_ms << 16) | u64::from(logical))
    }

    /// Wall-clock milliseconds part (upper 48 bits).
    #[inline]
    pub fn physical_ms(&self) -> u64 {
        self.0 >> 16
    }

    /// Logical counter part (lower 16 bits).
    #[inline]
    pub fn logical(&self) -> u16 {
        (self.0 & 0xFFFF) as u16
    }

    /// The packed `u64` representation.
    #[inline]
    pub fn raw(&self) -> u64 {
        self.0
    }

    /// Allocates a timestamp strictly greater than the value currently in `last` and
    /// publishes it there with a compare-and-swap, so concurrent callers never receive
    /// the same timestamp.
    ///
    /// If the wall clock is ahead of `last`, the result is `(now, 0)`. Otherwise
    /// `last`'s physical part is kept and the logical counter is bumped. If the counter
    /// is already `u16::MAX`, the physical part advances by one millisecond and the
    /// counter restarts at 0. Saturating the counter instead would return the same
    /// value again and hand out duplicates.
    ///
    /// A system clock that reports a time before the Unix epoch is treated as 0.
    /// Allocation then keeps advancing logically instead of panicking.
    pub fn allocate_next(last: &AtomicU64) -> Self {
        let physical_now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

        loop {
            let last_val = last.load(Ordering::Acquire);
            let prev = Self(last_val);

            let (new_phys, new_log) = if physical_now > prev.physical_ms() {
                (physical_now, 0u16)
            } else if prev.logical() == u16::MAX {
                // Counter exhausted within this millisecond: borrow the next one.
                (prev.physical_ms() + 1, 0u16)
            } else {
                (prev.physical_ms(), prev.logical() + 1)
            };

            let next = Self::new(new_phys, new_log);
            if last
                .compare_exchange_weak(last_val, next.0, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return next;
            }
            std::hint::spin_loop();
        }
    }

    /// Reads the most recently published timestamp from `ts` without allocating one.
    #[inline]
    pub fn read_current(ts: &AtomicU64) -> Self {
        Self(ts.load(Ordering::Acquire))
    }
}

impl From<u64> for HlcTimestamp {
    /// Wraps an already-packed raw value.
    fn from(val: u64) -> Self {
        Self(val)
    }
}

impl From<HlcTimestamp> for u64 {
    /// Returns the packed raw value.
    fn from(ts: HlcTimestamp) -> Self {
        ts.0
    }
}
194
195#[repr(C, align(64))]
214#[derive(Debug)]
215pub struct ReaderSlot {
216 pub pid: AtomicU32,
218 pub snapshot_ts: AtomicU64,
220 pub epoch: AtomicU32,
222 pub last_heartbeat: AtomicU64,
224 _reserved: [u8; 32],
226}
227
228impl ReaderSlot {
229 pub const fn empty() -> Self {
231 Self {
232 pid: AtomicU32::new(0),
233 snapshot_ts: AtomicU64::new(0),
234 epoch: AtomicU32::new(0),
235 last_heartbeat: AtomicU64::new(0),
236 _reserved: [0u8; 32],
237 }
238 }
239
240 #[inline]
242 pub fn is_free(&self) -> bool {
243 self.pid.load(Ordering::Acquire) == 0
244 }
245
246 #[inline]
250 pub fn try_claim(&self, my_pid: u32, snapshot_ts: u64, epoch: u32) -> bool {
251 let current_pid = self.pid.load(Ordering::Acquire);
252
253 if current_pid != 0 && current_pid != my_pid {
255 return false;
256 }
257
258 if self
259 .pid
260 .compare_exchange(current_pid, my_pid, Ordering::AcqRel, Ordering::Acquire)
261 .is_ok()
262 {
263 self.snapshot_ts.store(snapshot_ts, Ordering::Release);
265 self.epoch.store(epoch, Ordering::Release);
266 self.last_heartbeat
267 .store(current_time_us(), Ordering::Release);
268 true
269 } else {
270 false
271 }
272 }
273
274 #[inline]
276 pub fn release(&self, my_pid: u32) {
277 if self.pid.load(Ordering::Acquire) == my_pid {
279 self.snapshot_ts.store(0, Ordering::Release);
280 self.pid.store(0, Ordering::Release);
281 }
282 }
283
284 #[inline]
286 pub fn heartbeat(&self) {
287 self.last_heartbeat
288 .store(current_time_us(), Ordering::Release);
289 }
290
291 #[inline]
293 pub fn is_stale(&self, now_us: u64) -> bool {
294 let pid = self.pid.load(Ordering::Acquire);
295 if pid == 0 {
296 return false; }
298
299 let last_hb = self.last_heartbeat.load(Ordering::Acquire);
301 if now_us.saturating_sub(last_hb) > STALE_READER_TIMEOUT_US {
302 return true;
303 }
304
305 !process_exists(pid)
307 }
308}
309
310#[repr(C)]
316#[derive(Debug)]
317pub struct MvccHeader {
318 pub magic: u64,
320 pub version: u32,
322 pub page_size: u32,
324 pub num_readers: u32,
326 pub current_epoch: AtomicU64,
328 pub current_ts: AtomicU64,
330 pub writer_lock: AtomicU32,
332 pub commits_since_gc: AtomicU64,
334 _reserved: [u8; 4],
336}
337
338impl MvccHeader {
339 pub fn new() -> Self {
341 Self {
342 magic: MVCC_MAGIC,
343 version: MVCC_VERSION,
344 page_size: 4096,
345 num_readers: MAX_READERS as u32,
346 current_epoch: AtomicU64::new(1),
347 current_ts: AtomicU64::new(HlcTimestamp::new(
348 SystemTime::now()
349 .duration_since(UNIX_EPOCH)
350 .unwrap()
351 .as_millis() as u64,
352 0,
353 ).raw()),
354 writer_lock: AtomicU32::new(0),
355 commits_since_gc: AtomicU64::new(0),
356 _reserved: [0u8; 4],
357 }
358 }
359
360 pub fn validate(&self) -> Result<()> {
362 if self.magic != MVCC_MAGIC {
363 return Err(SochDBError::Corruption(
364 "Invalid MVCC metadata magic".into(),
365 ));
366 }
367 if self.version != MVCC_VERSION {
368 return Err(SochDBError::Corruption(format!(
369 "Unsupported MVCC version: {} (expected {})",
370 self.version, MVCC_VERSION
371 )));
372 }
373 Ok(())
374 }
375}
376
377impl Default for MvccHeader {
378 fn default() -> Self {
379 Self::new()
380 }
381}
382
/// One version of a key's value.
#[derive(Debug, Clone)]
pub struct VersionEntry {
    /// Commit timestamp; 0 while the version is still uncommitted.
    pub commit_ts: u64,
    /// Transaction that wrote this version.
    pub txn_id: u64,
    /// Epoch at which the version was written.
    pub epoch: u32,
    /// The value. `None` is presumably a deletion marker, since
    /// `VersionStore::get` reports it as absent.
    pub value: Option<Vec<u8>>,
}

impl VersionEntry {
    /// Creates an entry with the given fields.
    pub fn new(commit_ts: u64, txn_id: u64, epoch: u32, value: Option<Vec<u8>>) -> Self {
        Self {
            commit_ts,
            txn_id,
            epoch,
            value,
        }
    }

    /// Returns `true` if the entry is committed and its commit timestamp is strictly
    /// older than `snapshot_ts`.
    #[inline]
    pub fn is_visible_at(&self, snapshot_ts: u64) -> bool {
        self.commit_ts > 0 && self.commit_ts < snapshot_ts
    }
}

/// All versions of a single key.
///
/// Committed versions are kept sorted by `commit_ts`, newest first. At most one
/// uncommitted version is held aside.
#[derive(Debug, Default)]
pub struct VersionChain {
    /// Committed versions, newest first.
    versions: Vec<VersionEntry>,
    /// The single pending (uncommitted) version, if any.
    uncommitted: Option<VersionEntry>,
}

impl VersionChain {
    /// Creates an empty chain.
    pub fn new() -> Self {
        Self {
            versions: Vec::new(),
            uncommitted: None,
        }
    }

    /// Stores `value` as the pending version for `txn_id`, replacing any existing
    /// pending version. Conflicts are not checked here; see
    /// [`VersionChain::has_write_conflict`].
    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64, epoch: u32) {
        self.uncommitted = Some(VersionEntry {
            commit_ts: 0,
            txn_id,
            epoch,
            value,
        });
    }

    /// Commits the pending version of `txn_id` at `commit_ts`, inserting it so the
    /// committed list stays newest-first.
    ///
    /// Returns `false`, leaving the chain unchanged, if there is no pending version
    /// or it belongs to another transaction.
    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
        match self.uncommitted.take() {
            Some(mut entry) if entry.txn_id == txn_id => {
                entry.commit_ts = commit_ts;
                let pos = self
                    .versions
                    .partition_point(|existing| existing.commit_ts > commit_ts);
                self.versions.insert(pos, entry);
                true
            }
            other => {
                self.uncommitted = other;
                false
            }
        }
    }

    /// Discards the pending version if it belongs to `txn_id`; otherwise does nothing.
    pub fn abort(&mut self, txn_id: u64) {
        if matches!(&self.uncommitted, Some(v) if v.txn_id == txn_id) {
            self.uncommitted = None;
        }
    }

    /// Returns the version visible to a reader at `snapshot_ts`.
    ///
    /// If `current_txn_id` owns the pending version, that version is returned (a
    /// transaction sees its own writes). Otherwise the result is the newest committed
    /// version with `commit_ts < snapshot_ts`, or `None` if there is none.
    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&VersionEntry> {
        if let (Some(txn_id), Some(pending)) = (current_txn_id, self.uncommitted.as_ref()) {
            if pending.txn_id == txn_id {
                return Some(pending);
            }
        }

        // Newest-first order: skip every version not older than the snapshot.
        let idx = self
            .versions
            .partition_point(|v| v.commit_ts >= snapshot_ts);
        self.versions.get(idx)
    }

    /// Returns `true` if another transaction (not `my_txn_id`) has a pending write.
    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
        matches!(&self.uncommitted, Some(v) if v.txn_id != my_txn_id)
    }

    /// Drops committed versions that no active reader can observe; returns how many
    /// were removed.
    ///
    /// Kept versions:
    /// - every version with `commit_ts >= min_snapshot_ts`, i.e. newer than the
    ///   oldest active snapshot;
    /// - the newest version with `commit_ts < min_snapshot_ts`, which is the one
    ///   that oldest snapshot reads;
    /// - always at least the newest version;
    /// - any directly following versions whose `epoch >= min_epoch`.
    ///
    /// Everything older than that is truncated. The pending version is never touched.
    pub fn gc(&mut self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
        let total = self.versions.len();
        if total <= 1 {
            return 0;
        }

        // Index of the version the oldest active snapshot reads (see `read_at`).
        // Dropping it would make that reader see the key as missing.
        let visible_to_oldest = self
            .versions
            .partition_point(|v| v.commit_ts >= min_snapshot_ts);
        let mut keep = (visible_to_oldest + 1).min(total);

        // Extend through a contiguous run still protected by its epoch.
        let epoch_protected = self.versions[keep..]
            .iter()
            .take_while(|v| v.epoch >= min_epoch)
            .count();
        keep += epoch_protected;

        self.versions.truncate(keep);
        total - keep
    }

    /// Number of versions, counting the pending one.
    pub fn len(&self) -> usize {
        self.versions.len() + usize::from(self.uncommitted.is_some())
    }

    /// Returns `true` if the chain holds neither committed nor pending versions.
    pub fn is_empty(&self) -> bool {
        self.versions.is_empty() && self.uncommitted.is_none()
    }
}
561
562pub struct VersionStore {
570 data: DashMap<Vec<u8>, VersionChain>,
572 stats: VersionStoreStats,
574}
575
576#[derive(Debug, Default)]
578pub struct VersionStoreStats {
579 pub num_keys: AtomicU64,
581 pub num_versions: AtomicU64,
583 pub gc_passes: AtomicU64,
585 pub versions_reclaimed: AtomicU64,
587}
588
589impl VersionStore {
590 pub fn new() -> Self {
592 Self {
593 data: DashMap::new(),
594 stats: VersionStoreStats::default(),
595 }
596 }
597
598 pub fn insert_uncommitted(
600 &self,
601 key: &[u8],
602 value: Option<Vec<u8>>,
603 txn_id: u64,
604 epoch: u32,
605 ) -> Result<()> {
606 let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
607 self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
608 VersionChain::new()
609 });
610
611 if entry.has_write_conflict(txn_id) {
613 return Err(SochDBError::Internal(
614 "Write conflict: another transaction has uncommitted write".into(),
615 ));
616 }
617
618 entry.add_uncommitted(value, txn_id, epoch);
619 self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
620 Ok(())
621 }
622
623 pub fn commit(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
625 if let Some(mut entry) = self.data.get_mut(key) {
626 return entry.commit(txn_id, commit_ts);
627 }
628 false
629 }
630
631 pub fn abort(&self, key: &[u8], txn_id: u64) {
633 if let Some(mut entry) = self.data.get_mut(key) {
634 entry.abort(txn_id);
635 self.stats.num_versions.fetch_sub(1, Ordering::Relaxed);
636 }
637 }
638
639 pub fn get(&self, key: &[u8], snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<Vec<u8>> {
641 self.data.get(key).and_then(|chain| {
642 chain
643 .read_at(snapshot_ts, current_txn_id)
644 .and_then(|v| v.value.clone())
645 })
646 }
647
648 pub fn contains(&self, key: &[u8], snapshot_ts: u64) -> bool {
650 self.data
651 .get(key)
652 .map(|chain| chain.read_at(snapshot_ts, None).is_some())
653 .unwrap_or(false)
654 }
655
656 pub fn gc(&self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
660 let mut total_reclaimed = 0;
661
662 for mut entry in self.data.iter_mut() {
663 let reclaimed = entry.gc(min_epoch, min_snapshot_ts);
664 total_reclaimed += reclaimed;
665 }
666
667 self.stats
668 .gc_passes
669 .fetch_add(1, Ordering::Relaxed);
670 self.stats
671 .versions_reclaimed
672 .fetch_add(total_reclaimed as u64, Ordering::Relaxed);
673
674 total_reclaimed
675 }
676
677 pub fn len(&self) -> usize {
679 self.data.len()
680 }
681
682 pub fn is_empty(&self) -> bool {
684 self.data.is_empty()
685 }
686
687 pub fn stats(&self) -> &VersionStoreStats {
689 &self.stats
690 }
691}
692
693impl Default for VersionStore {
694 fn default() -> Self {
695 Self::new()
696 }
697}
698
/// Handle to the MVCC metadata of one database directory.
///
/// The header and reader-slot table live in a memory-mapped file, so every process
/// that opens the same directory shares them. The version store, by contrast, is
/// created fresh by `open` and is private to this handle.
pub struct ConcurrentMvcc {
    /// Directory passed to `open`.
    path: PathBuf,
    /// Owns the mapping; `header` and `reader_slots_ptr` point into it.
    _mmap: memmap2::MmapMut,
    /// Start of the mapping, viewed as the shared header.
    header: *const MvccHeader,
    /// First reader slot, `HEADER_SIZE` bytes into the mapping.
    reader_slots_ptr: *const ReaderSlot,
    /// Number of slots reachable through `reader_slots_ptr`.
    num_reader_slots: usize,
    /// In-memory version chains for this handle.
    version_store: VersionStore,
    /// PID of this process, written into claimed slots and the writer lock.
    our_pid: u32,
    /// Reader slot most recently registered through this handle, if any.
    our_slot: RwLock<Option<usize>>,
}

// SAFETY: the raw pointers point into `_mmap`, which this struct owns. Moving the
// struct moves only the `MmapMut` handle, not the mapped memory, so the pointers
// stay valid for the struct's lifetime.
unsafe impl Send for ConcurrentMvcc {}
// SAFETY: through `&self`, mapped memory that other threads or processes mutate is
// only accessed via atomics (header counters and locks, reader-slot fields). The
// non-atomic header fields are only read. NOTE(review): keep it that way when adding
// methods.
unsafe impl Sync for ConcurrentMvcc {}
747
748impl ConcurrentMvcc {
749 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
755 let path = path.as_ref().to_path_buf();
756 std::fs::create_dir_all(&path)?;
757
758 let metadata_path = path.join(".mvcc_metadata");
759 let is_new = !metadata_path.exists();
760
761 let file = OpenOptions::new()
763 .create(true)
764 .read(true)
765 .write(true)
766 .open(&metadata_path)?;
767
768 let required_size = METADATA_SIZE as u64;
770 if file.metadata()?.len() < required_size {
771 file.set_len(required_size)?;
772 }
773
774 let mut mmap = unsafe { memmap2::MmapMut::map_mut(&file)? };
780
781 if is_new || mmap.len() < METADATA_SIZE {
782 let header = MvccHeader::new();
784 let header_bytes = unsafe {
785 std::slice::from_raw_parts(
786 &header as *const MvccHeader as *const u8,
787 std::mem::size_of::<MvccHeader>(),
788 )
789 };
790 mmap[..header_bytes.len()].copy_from_slice(header_bytes);
791
792 for i in 0..MAX_READERS {
794 let offset = HEADER_SIZE + i * READER_SLOT_SIZE;
795 let end = offset + READER_SLOT_SIZE;
796 if end <= mmap.len() {
797 mmap[offset..end].fill(0);
798 }
799 }
800
801 mmap.flush()?;
802 } else {
803 let header_ref = unsafe { &*(mmap.as_ptr() as *const MvccHeader) };
805 header_ref.validate()?;
806 }
807
808 let header = mmap.as_ptr() as *const MvccHeader;
810 let reader_slots_ptr = unsafe {
811 mmap.as_ptr().add(HEADER_SIZE) as *const ReaderSlot
812 };
813
814 Ok(Self {
815 path,
816 _mmap: mmap,
817 header,
818 reader_slots_ptr,
819 num_reader_slots: MAX_READERS,
820 version_store: VersionStore::new(),
821 our_pid: std::process::id(),
822 our_slot: RwLock::new(None),
823 })
824 }
825
826 #[inline]
830 fn header(&self) -> &MvccHeader {
831 unsafe { &*self.header }
832 }
833
834 #[inline]
838 fn reader_slot(&self, idx: usize) -> &ReaderSlot {
839 assert!(idx < self.num_reader_slots);
840 unsafe { &*self.reader_slots_ptr.add(idx) }
841 }
842
843 #[inline]
845 pub fn allocate_timestamp(&self) -> HlcTimestamp {
846 HlcTimestamp::allocate_next(&self.header().current_ts)
847 }
848
849 #[inline]
851 pub fn current_timestamp(&self) -> HlcTimestamp {
852 HlcTimestamp::read_current(&self.header().current_ts)
853 }
854
855 #[inline]
857 pub fn current_epoch(&self) -> u64 {
858 self.header().current_epoch.load(Ordering::Acquire)
859 }
860
861 pub fn register_reader(&self) -> Result<usize> {
866 let snapshot_ts = self.current_timestamp().raw();
867 let epoch = self.current_epoch() as u32;
868
869 for i in 0..self.num_reader_slots {
871 let slot = self.reader_slot(i);
872 if slot.try_claim(self.our_pid, snapshot_ts, epoch) {
873 *self.our_slot.write() = Some(i);
874 return Ok(i);
875 }
876 }
877
878 Err(SochDBError::ResourceExhausted(
879 "Too many concurrent readers".into(),
880 ))
881 }
882
883 pub fn unregister_reader(&self, slot_idx: usize) {
885 if slot_idx < self.num_reader_slots {
886 self.reader_slot(slot_idx).release(self.our_pid);
887 *self.our_slot.write() = None;
888 }
889 }
890
891 pub fn try_acquire_writer(&self) -> Result<WriterGuard<'_>> {
895 let current = self.header().writer_lock.load(Ordering::Acquire);
896
897 if current == 0 {
898 if self
900 .header()
901 .writer_lock
902 .compare_exchange(0, self.our_pid, Ordering::AcqRel, Ordering::Acquire)
903 .is_ok()
904 {
905 return Ok(WriterGuard { mvcc: self });
906 }
907 } else if current == self.our_pid {
908 return Ok(WriterGuard { mvcc: self });
910 }
911
912 Err(SochDBError::LockError(format!(
913 "Writer lock held by process {}",
914 current
915 )))
916 }
917
918 pub fn acquire_writer(&self, timeout: Duration) -> Result<WriterGuard<'_>> {
920 let deadline = std::time::Instant::now() + timeout;
921
922 loop {
923 match self.try_acquire_writer() {
924 Ok(guard) => return Ok(guard),
925 Err(_) if std::time::Instant::now() < deadline => {
926 std::thread::sleep(Duration::from_micros(100));
927 }
928 Err(e) => return Err(e),
929 }
930 }
931 }
932
933 fn release_writer(&self) {
935 let current = self.header().writer_lock.load(Ordering::Acquire);
936 if current == self.our_pid {
937 self.header().writer_lock.store(0, Ordering::Release);
938 }
939 }
940
941 pub fn version_store(&self) -> &VersionStore {
943 &self.version_store
944 }
945
946 pub fn min_active_snapshot(&self) -> u64 {
948 let mut min_ts = u64::MAX;
949
950 for i in 0..self.num_reader_slots {
951 let slot = self.reader_slot(i);
952 let pid = slot.pid.load(Ordering::Acquire);
953 if pid != 0 {
954 let ts = slot.snapshot_ts.load(Ordering::Acquire);
955 if ts > 0 && ts < min_ts {
956 min_ts = ts;
957 }
958 }
959 }
960
961 min_ts
962 }
963
964 pub fn min_active_epoch(&self) -> u32 {
966 let mut min_epoch = u32::MAX;
967
968 for i in 0..self.num_reader_slots {
969 let slot = self.reader_slot(i);
970 let pid = slot.pid.load(Ordering::Acquire);
971 if pid != 0 {
972 let epoch = slot.epoch.load(Ordering::Acquire);
973 if epoch < min_epoch {
974 min_epoch = epoch;
975 }
976 }
977 }
978
979 if min_epoch == u32::MAX {
980 self.current_epoch() as u32
982 } else {
983 min_epoch
984 }
985 }
986
987 pub fn run_gc(&self) -> usize {
991 let min_epoch = self.min_active_epoch();
992 let min_snapshot = self.min_active_snapshot();
993
994 self.version_store.gc(min_epoch, min_snapshot)
995 }
996
997 pub fn should_run_gc(&self) -> bool {
999 self.header()
1000 .commits_since_gc
1001 .load(Ordering::Relaxed)
1002 >= GC_COMMIT_INTERVAL
1003 }
1004
1005 pub fn on_commit(&self) {
1007 let count = self
1008 .header()
1009 .commits_since_gc
1010 .fetch_add(1, Ordering::Relaxed);
1011
1012 if count >= GC_COMMIT_INTERVAL {
1013 self.header().commits_since_gc.store(0, Ordering::Relaxed);
1014 let _ = self.run_gc();
1015 }
1016 }
1017
1018 pub fn cleanup_stale_readers(&self) -> usize {
1020 let now = current_time_us();
1021 let mut cleaned = 0;
1022
1023 for i in 0..self.num_reader_slots {
1024 let slot = self.reader_slot(i);
1025 if slot.is_stale(now) {
1026 slot.pid.store(0, Ordering::Release);
1027 cleaned += 1;
1028 }
1029 }
1030
1031 cleaned
1032 }
1033
1034 pub fn advance_epoch(&self) -> u64 {
1036 self.header().current_epoch.fetch_add(1, Ordering::AcqRel) + 1
1037 }
1038}
1039
1040impl Drop for ConcurrentMvcc {
1041 fn drop(&mut self) {
1042 if let Some(slot_idx) = *self.our_slot.read() {
1044 self.unregister_reader(slot_idx);
1045 }
1046
1047 self.release_writer();
1049 }
1050}
1051
1052pub struct WriterGuard<'a> {
1058 mvcc: &'a ConcurrentMvcc,
1059}
1060
1061impl<'a> Drop for WriterGuard<'a> {
1062 fn drop(&mut self) {
1063 self.mvcc.release_writer();
1064 }
1065}
1066
/// Current wall-clock time in whole microseconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
#[inline]
fn current_time_us() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_micros() as u64
}
1079
1080#[cfg(unix)]
1082fn process_exists(pid: u32) -> bool {
1083 let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
1085 if result == 0 {
1086 true
1087 } else {
1088 std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
1090 }
1091}
1092
/// Reports whether a process with `pid` appears to exist (Windows).
///
/// Opens the process with `PROCESS_QUERY_LIMITED_INFORMATION`. If that fails with
/// `ERROR_ACCESS_DENIED`, the process exists but cannot be queried, so it counts as
/// alive. This mirrors how the Unix version treats `EPERM`. Any other failure means
/// "does not exist".
///
/// NOTE(review): a handle can still be opened to a process that has exited but is
/// kept alive by other open handles; checking `GetExitCodeProcess` for
/// `STILL_ACTIVE` would detect that.
#[cfg(windows)]
fn process_exists(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, GetLastError, ERROR_ACCESS_DENIED};
    use windows_sys::Win32::System::Threading::{OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION};

    // SAFETY: plain Win32 calls; a handle obtained here is closed before returning.
    unsafe {
        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if handle == 0 {
            GetLastError() == ERROR_ACCESS_DENIED
        } else {
            CloseHandle(handle);
            true
        }
    }
}
1108
/// Fallback for platforms without a process-existence check: always reports the
/// process as alive, so stale-reader detection relies only on the heartbeat timeout.
#[cfg(not(any(unix, windows)))]
fn process_exists(_pid: u32) -> bool {
    true
}
1113
#[cfg(test)]
mod tests {
    //! Unit tests for the metadata layout, HLC timestamps, version chains and store,
    //! and reader slots.

    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// The mmap layout relies on the Rust structs matching the byte-size constants.
    #[test]
    fn test_struct_sizes() {
        eprintln!("MvccHeader size: {}", std::mem::size_of::<MvccHeader>());
        eprintln!("MvccHeader align: {}", std::mem::align_of::<MvccHeader>());
        eprintln!("ReaderSlot size: {}", std::mem::size_of::<ReaderSlot>());
        eprintln!("ReaderSlot align: {}", std::mem::align_of::<ReaderSlot>());
        eprintln!("HEADER_SIZE constant: {}", HEADER_SIZE);
        eprintln!("READER_SLOT_SIZE constant: {}", READER_SLOT_SIZE);
        eprintln!("METADATA_SIZE constant: {}", METADATA_SIZE);

        assert_eq!(std::mem::size_of::<MvccHeader>(), HEADER_SIZE,
            "MvccHeader size mismatch! Actual: {}, Expected: {}",
            std::mem::size_of::<MvccHeader>(), HEADER_SIZE);
        assert_eq!(std::mem::size_of::<ReaderSlot>(), READER_SLOT_SIZE,
            "ReaderSlot size mismatch! Actual: {}, Expected: {}",
            std::mem::size_of::<ReaderSlot>(), READER_SLOT_SIZE);
    }

    /// Sequential allocations from one clock are strictly increasing.
    #[test]
    fn test_hlc_timestamp_ordering() {
        let ts = AtomicU64::new(0);

        let t1 = HlcTimestamp::allocate_next(&ts);
        let t2 = HlcTimestamp::allocate_next(&ts);
        let t3 = HlcTimestamp::allocate_next(&ts);

        assert!(t1.raw() < t2.raw());
        assert!(t2.raw() < t3.raw());
    }

    /// 8 threads x 1000 allocations from a shared clock never yield a duplicate.
    #[test]
    fn test_hlc_timestamp_concurrent() {
        let ts = Arc::new(AtomicU64::new(0));
        let mut handles = vec![];

        for _ in 0..8 {
            let ts = ts.clone();
            handles.push(thread::spawn(move || {
                let mut timestamps = vec![];
                for _ in 0..1000 {
                    timestamps.push(HlcTimestamp::allocate_next(&ts).raw());
                }
                timestamps
            }));
        }

        let mut all_ts: Vec<u64> = handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect();

        // After sorting, dedup removes only duplicates; the length must not change.
        all_ts.sort();
        let len_before = all_ts.len();
        all_ts.dedup();
        assert_eq!(all_ts.len(), len_before, "Duplicate timestamps found!");
    }

    /// A snapshot sees the newest version committed strictly before it.
    #[test]
    fn test_version_chain_read_at() {
        let mut chain = VersionChain::new();

        // Pushed newest-first, matching the order `VersionChain::commit` maintains.
        chain.versions.push(VersionEntry::new(100, 1, 1, Some(b"v100".to_vec())));
        chain.versions.push(VersionEntry::new(90, 2, 1, Some(b"v90".to_vec())));
        chain.versions.push(VersionEntry::new(80, 3, 1, Some(b"v80".to_vec())));

        let v = chain.read_at(105, None).unwrap();
        assert_eq!(v.value, Some(b"v100".to_vec()));

        let v = chain.read_at(95, None).unwrap();
        assert_eq!(v.value, Some(b"v90".to_vec()));

        let v = chain.read_at(85, None).unwrap();
        assert_eq!(v.value, Some(b"v80".to_vec()));

        // Older than every committed version: nothing visible.
        assert!(chain.read_at(75, None).is_none());
    }

    /// GC removes something from a long chain and never empties it.
    #[test]
    fn test_version_chain_gc() {
        let mut chain = VersionChain::new();

        // Commit timestamps 100, 95, ..., 55 (newest first) with epochs 10 down to 1.
        for i in 0..10 {
            chain.versions.push(VersionEntry::new(
                100 - i * 5, i as u64, (10 - i) as u32, Some(format!("v{}", 100 - i * 5).into_bytes()),
            ));
        }

        assert_eq!(chain.len(), 10);

        let reclaimed = chain.gc(7, 75);

        assert!(reclaimed > 0, "Should have reclaimed some versions, got {}", reclaimed);
        assert!(chain.len() >= 1, "Should keep at least one version");
    }

    /// Insert, commit, then read before and after the commit timestamp.
    #[test]
    fn test_version_store_basic() {
        let store = VersionStore::new();

        store
            .insert_uncommitted(b"key1", Some(b"value1".to_vec()), 1, 1)
            .unwrap();

        assert!(store.commit(b"key1", 1, 100));

        let value = store.get(b"key1", 150, None);
        assert_eq!(value, Some(b"value1".to_vec()));

        // A snapshot older than the commit does not see it.
        let value = store.get(b"key1", 50, None);
        assert!(value.is_none());
    }

    /// Claiming is exclusive across pids, re-claimable by the owner, and undone by release.
    #[test]
    fn test_reader_slot_claim_release() {
        let slot = ReaderSlot::empty();

        assert!(slot.is_free());

        assert!(slot.try_claim(1234, 100, 1));
        assert!(!slot.is_free());

        // Another pid cannot take an occupied slot.
        assert!(!slot.try_claim(5678, 200, 2));

        // The owning pid may re-claim (refresh) its own slot.
        assert!(slot.try_claim(1234, 300, 3));

        slot.release(1234);
        assert!(slot.is_free());
    }
}