1use std::fs::OpenOptions;
100use std::path::{Path, PathBuf};
101use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
102use std::time::{Duration, SystemTime, UNIX_EPOCH};
103
104use dashmap::DashMap;
105use parking_lot::RwLock;
106use sochdb_core::version_chain::{
107 BinarySearchChain, ChainEntry, MvccVersionChain, MvccVersionChainMut, Timestamp, TxnId,
108 VisibilityContext, WriteConflictDetection,
109};
110use sochdb_core::{Result, SochDBError};
111
/// Alias of [`VersionChain`] under its concurrent-MVCC name.
pub type ConcurrentVersionChain = VersionChain;
/// Alias of [`VersionEntry`] under its concurrent-MVCC name.
pub type ConcurrentVersionEntry = VersionEntry;
115
/// Magic number stored at the start of the shared metadata file.
const MVCC_MAGIC: u64 = 0x4343_5F56_4D48_4353;
/// Metadata format version checked by `MvccHeader::validate`.
const MVCC_VERSION: u32 = 1;

/// Number of reader slots laid out after the header.
const MAX_READERS: usize = 1024;

/// Size in bytes of a single reader slot.
const READER_SLOT_SIZE: usize = 64;

/// Size in bytes reserved for the header at offset 0.
const HEADER_SIZE: usize = 64;

/// Total size of the metadata region: header followed by every reader slot.
const METADATA_SIZE: usize = HEADER_SIZE + MAX_READERS * READER_SLOT_SIZE;

/// A reader whose heartbeat is older than this (60 s, in microseconds) is stale.
const STALE_READER_TIMEOUT_US: u64 = 60 * 1_000_000;

/// Number of commits between automatic GC passes.
const GC_COMMIT_INTERVAL: u64 = 1_000;
143
/// Hybrid logical clock timestamp packed into a single `u64`.
///
/// The upper 48 bits hold wall-clock milliseconds since the Unix epoch and the
/// lower 16 bits hold a logical counter that distinguishes timestamps issued
/// within the same millisecond. Because the physical part occupies the high
/// bits, comparing raw values orders timestamps correctly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct HlcTimestamp(pub u64);

impl HlcTimestamp {
    /// Packs a physical millisecond value and a logical counter.
    ///
    /// Bits of `physical_ms` above bit 47 are shifted out.
    #[inline]
    pub fn new(physical_ms: u64, logical: u16) -> Self {
        Self((physical_ms << 16) | u64::from(logical))
    }

    /// Physical (wall-clock) component in milliseconds.
    #[inline]
    pub fn physical_ms(&self) -> u64 {
        self.0 >> 16
    }

    /// Logical counter component.
    #[inline]
    pub fn logical(&self) -> u16 {
        (self.0 & 0xFFFF) as u16
    }

    /// Raw packed representation.
    #[inline]
    pub fn raw(&self) -> u64 {
        self.0
    }

    /// Atomically allocates a timestamp strictly greater than the value in `last`
    /// and stores it there.
    ///
    /// If the wall clock has moved past `last`'s physical part, the result is
    /// `(now, 0)`. Otherwise the packed value is incremented by one. When the
    /// logical counter is already `0xFFFF`, this carries into the physical part
    /// rather than saturating, so a duplicate timestamp is never handed out.
    /// A clock reading before the Unix epoch is treated as 0 and falls back to the
    /// logical increment.
    ///
    /// # Panics
    /// Panics if `last` already holds `u64::MAX` (timestamp space exhausted).
    pub fn allocate_next(last: &AtomicU64) -> Self {
        let physical_now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| d.as_millis() as u64);

        let mut last_val = last.load(Ordering::Acquire);
        loop {
            let new_val = if physical_now > (last_val >> 16) {
                physical_now << 16
            } else {
                // Same or earlier millisecond: bump the packed value. Adding 1 to the
                // whole word carries a full logical counter into the physical bits.
                last_val
                    .checked_add(1)
                    .expect("HLC timestamp space exhausted")
            };

            match last.compare_exchange_weak(last_val, new_val, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => return Self(new_val),
                Err(observed) => {
                    last_val = observed;
                    std::hint::spin_loop();
                }
            }
        }
    }

    /// Reads the most recently allocated timestamp without advancing it.
    #[inline]
    pub fn read_current(ts: &AtomicU64) -> Self {
        Self(ts.load(Ordering::Acquire))
    }
}

impl From<u64> for HlcTimestamp {
    fn from(val: u64) -> Self {
        Self(val)
    }
}

impl From<HlcTimestamp> for u64 {
    fn from(ts: HlcTimestamp) -> Self {
        ts.0
    }
}
240
241#[repr(C, align(64))]
260#[derive(Debug)]
261pub struct ReaderSlot {
262 pub pid: AtomicU32,
264 pub snapshot_ts: AtomicU64,
266 pub epoch: AtomicU32,
268 pub last_heartbeat: AtomicU64,
270 _reserved: [u8; 32],
272}
273
274impl ReaderSlot {
275 pub const fn empty() -> Self {
277 Self {
278 pid: AtomicU32::new(0),
279 snapshot_ts: AtomicU64::new(0),
280 epoch: AtomicU32::new(0),
281 last_heartbeat: AtomicU64::new(0),
282 _reserved: [0u8; 32],
283 }
284 }
285
286 #[inline]
288 pub fn is_free(&self) -> bool {
289 self.pid.load(Ordering::Acquire) == 0
290 }
291
292 #[inline]
296 pub fn try_claim(&self, my_pid: u32, snapshot_ts: u64, epoch: u32) -> bool {
297 let current_pid = self.pid.load(Ordering::Acquire);
298
299 if current_pid != 0 && current_pid != my_pid {
301 return false;
302 }
303
304 if self
305 .pid
306 .compare_exchange(current_pid, my_pid, Ordering::AcqRel, Ordering::Acquire)
307 .is_ok()
308 {
309 self.snapshot_ts.store(snapshot_ts, Ordering::Release);
311 self.epoch.store(epoch, Ordering::Release);
312 self.last_heartbeat
313 .store(current_time_us(), Ordering::Release);
314 true
315 } else {
316 false
317 }
318 }
319
320 #[inline]
322 pub fn release(&self, my_pid: u32) {
323 if self.pid.load(Ordering::Acquire) == my_pid {
325 self.snapshot_ts.store(0, Ordering::Release);
326 self.pid.store(0, Ordering::Release);
327 }
328 }
329
330 #[inline]
332 pub fn heartbeat(&self) {
333 self.last_heartbeat
334 .store(current_time_us(), Ordering::Release);
335 }
336
337 #[inline]
339 pub fn is_stale(&self, now_us: u64) -> bool {
340 let pid = self.pid.load(Ordering::Acquire);
341 if pid == 0 {
342 return false; }
344
345 let last_hb = self.last_heartbeat.load(Ordering::Acquire);
347 if now_us.saturating_sub(last_hb) > STALE_READER_TIMEOUT_US {
348 return true;
349 }
350
351 !process_exists(pid)
353 }
354}
355
356#[repr(C)]
362#[derive(Debug)]
363pub struct MvccHeader {
364 pub magic: u64,
366 pub version: u32,
368 pub page_size: u32,
370 pub num_readers: u32,
372 pub current_epoch: AtomicU64,
374 pub current_ts: AtomicU64,
376 pub writer_lock: AtomicU32,
378 pub commits_since_gc: AtomicU64,
380 _reserved: [u8; 4],
382}
383
384impl MvccHeader {
385 pub fn new() -> Self {
387 Self {
388 magic: MVCC_MAGIC,
389 version: MVCC_VERSION,
390 page_size: 4096,
391 num_readers: MAX_READERS as u32,
392 current_epoch: AtomicU64::new(1),
393 current_ts: AtomicU64::new(
394 HlcTimestamp::new(
395 SystemTime::now()
396 .duration_since(UNIX_EPOCH)
397 .unwrap()
398 .as_millis() as u64,
399 0,
400 )
401 .raw(),
402 ),
403 writer_lock: AtomicU32::new(0),
404 commits_since_gc: AtomicU64::new(0),
405 _reserved: [0u8; 4],
406 }
407 }
408
409 pub fn validate(&self) -> Result<()> {
411 if self.magic != MVCC_MAGIC {
412 return Err(SochDBError::Corruption(
413 "Invalid MVCC metadata magic".into(),
414 ));
415 }
416 if self.version != MVCC_VERSION {
417 return Err(SochDBError::Corruption(format!(
418 "Unsupported MVCC version: {} (expected {})",
419 self.version, MVCC_VERSION
420 )));
421 }
422 Ok(())
423 }
424}
425
426impl Default for MvccHeader {
427 fn default() -> Self {
428 Self::new()
429 }
430}
431
/// A single version of a key's value.
///
/// `commit_ts == 0` marks a version that has not been committed yet, and
/// `value == None` marks a deletion.
#[derive(Debug, Clone)]
pub struct VersionEntry {
    /// Commit timestamp, or 0 while uncommitted.
    pub commit_ts: u64,
    /// Transaction that wrote this version.
    pub txn_id: u64,
    /// Epoch the version was written in.
    pub epoch: u32,
    /// Stored bytes, or `None` for a deletion.
    pub value: Option<Vec<u8>>,
}

impl VersionEntry {
    /// Builds an entry from its parts.
    pub fn new(commit_ts: u64, txn_id: u64, epoch: u32, value: Option<Vec<u8>>) -> Self {
        Self {
            value,
            epoch,
            txn_id,
            commit_ts,
        }
    }

    /// True if the version is committed strictly before `snapshot_ts`.
    #[inline]
    pub fn is_visible_at(&self, snapshot_ts: u64) -> bool {
        let committed = self.commit_ts != 0;
        committed && snapshot_ts > self.commit_ts
    }
}
466
/// Exposes a `VersionEntry`'s commit timestamp and writer to the generic chain
/// implementation in `sochdb_core`.
impl ChainEntry for VersionEntry {
    /// Commit timestamp; 0 while the version is uncommitted.
    #[inline]
    fn commit_ts(&self) -> u64 {
        self.commit_ts
    }
    /// Transaction that wrote this version.
    #[inline]
    fn txn_id(&self) -> u64 {
        self.txn_id
    }
    /// Stamps the commit timestamp (used when the chain commits this entry).
    #[inline]
    fn set_commit_ts(&mut self, ts: u64) {
        self.commit_ts = ts;
    }
}
482
483#[derive(Debug, Default)]
499pub struct VersionChain {
500 inner: BinarySearchChain<VersionEntry>,
502}
503
504impl VersionChain {
505 pub fn new() -> Self {
507 Self {
508 inner: BinarySearchChain::new(),
509 }
510 }
511
512 pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64, epoch: u32) {
514 self.inner.set_uncommitted(VersionEntry {
515 commit_ts: 0,
516 txn_id,
517 epoch,
518 value,
519 });
520 }
521
522 #[inline]
524 pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
525 self.inner.commit(txn_id, commit_ts)
526 }
527
528 #[inline]
530 pub fn abort(&mut self, txn_id: u64) {
531 self.inner.abort(txn_id);
532 }
533
534 #[inline]
538 pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&VersionEntry> {
539 self.inner.read_at(snapshot_ts, current_txn_id)
540 }
541
542 #[inline]
544 pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
545 self.inner.has_write_conflict(my_txn_id)
546 }
547
548 pub fn gc(&mut self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
553 let versions = self.inner.committed_versions_mut();
554 if versions.len() <= 1 {
555 return 0;
556 }
557
558 let original_len = versions.len();
559
560 let mut keep_count = 1; for v in versions.iter().skip(1) {
562 if v.epoch >= min_epoch || v.commit_ts >= min_snapshot_ts {
563 keep_count += 1;
564 } else {
565 break; }
567 }
568
569 versions.truncate(keep_count);
570 original_len - versions.len()
571 }
572
573 #[inline]
575 pub fn len(&self) -> usize {
576 self.inner.version_count()
577 }
578
579 #[inline]
581 pub fn is_empty(&self) -> bool {
582 self.inner.is_empty()
583 }
584}
585
586impl MvccVersionChain for VersionChain {
591 type Value = Option<Vec<u8>>;
592
593 fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value> {
594 self.inner
595 .read_at(ctx.snapshot_ts, Some(ctx.reader_txn_id))
596 .map(|v| &v.value)
597 }
598
599 fn get_latest(&self) -> Option<&Self::Value> {
600 self.inner.latest().map(|v| &v.value)
601 }
602
603 fn version_count(&self) -> usize {
604 self.inner.version_count()
605 }
606}
607
608impl MvccVersionChainMut for VersionChain {
609 fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId) {
610 self.add_uncommitted(value, txn_id, 0);
611 }
612
613 fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool {
614 self.inner.commit(txn_id, commit_ts)
615 }
616
617 fn delete_version(&mut self, txn_id: TxnId, _delete_ts: Timestamp) -> bool {
618 self.add_uncommitted(None, txn_id, 0);
619 true
620 }
621
622 fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize) {
623 let removed = self.gc(0, min_visible_ts);
624 (removed, removed * std::mem::size_of::<VersionEntry>())
625 }
626}
627
/// Write-conflict detection delegated to the inner chain.
impl WriteConflictDetection for VersionChain {
    fn has_write_conflict(&self, txn_id: TxnId) -> bool {
        self.inner.has_write_conflict(txn_id)
    }
}
633
634pub struct VersionStore {
642 data: DashMap<Vec<u8>, VersionChain>,
644 stats: VersionStoreStats,
646}
647
648#[derive(Debug, Default)]
650pub struct VersionStoreStats {
651 pub num_keys: AtomicU64,
653 pub num_versions: AtomicU64,
655 pub gc_passes: AtomicU64,
657 pub versions_reclaimed: AtomicU64,
659}
660
661impl VersionStore {
662 pub fn new() -> Self {
664 Self {
665 data: DashMap::new(),
666 stats: VersionStoreStats::default(),
667 }
668 }
669
670 pub fn insert_uncommitted(
672 &self,
673 key: &[u8],
674 value: Option<Vec<u8>>,
675 txn_id: u64,
676 epoch: u32,
677 ) -> Result<()> {
678 let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
679 self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
680 VersionChain::new()
681 });
682
683 if entry.has_write_conflict(txn_id) {
685 return Err(SochDBError::Internal(
686 "Write conflict: another transaction has uncommitted write".into(),
687 ));
688 }
689
690 entry.add_uncommitted(value, txn_id, epoch);
691 self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
692 Ok(())
693 }
694
695 pub fn commit(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
697 if let Some(mut entry) = self.data.get_mut(key) {
698 return entry.commit(txn_id, commit_ts);
699 }
700 false
701 }
702
703 pub fn abort(&self, key: &[u8], txn_id: u64) {
705 if let Some(mut entry) = self.data.get_mut(key) {
706 entry.abort(txn_id);
707 self.stats.num_versions.fetch_sub(1, Ordering::Relaxed);
708 }
709 }
710
711 pub fn get(
713 &self,
714 key: &[u8],
715 snapshot_ts: u64,
716 current_txn_id: Option<u64>,
717 ) -> Option<Vec<u8>> {
718 self.data.get(key).and_then(|chain| {
719 chain
720 .read_at(snapshot_ts, current_txn_id)
721 .and_then(|v| v.value.clone())
722 })
723 }
724
725 pub fn contains(&self, key: &[u8], snapshot_ts: u64) -> bool {
727 self.data
728 .get(key)
729 .map(|chain| chain.read_at(snapshot_ts, None).is_some())
730 .unwrap_or(false)
731 }
732
733 pub fn gc(&self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
737 let mut total_reclaimed = 0;
738
739 for mut entry in self.data.iter_mut() {
740 let reclaimed = entry.gc(min_epoch, min_snapshot_ts);
741 total_reclaimed += reclaimed;
742 }
743
744 self.stats.gc_passes.fetch_add(1, Ordering::Relaxed);
745 self.stats
746 .versions_reclaimed
747 .fetch_add(total_reclaimed as u64, Ordering::Relaxed);
748
749 total_reclaimed
750 }
751
752 pub fn len(&self) -> usize {
754 self.data.len()
755 }
756
757 pub fn is_empty(&self) -> bool {
759 self.data.is_empty()
760 }
761
762 pub fn stats(&self) -> &VersionStoreStats {
764 &self.stats
765 }
766}
767
768impl Default for VersionStore {
769 fn default() -> Self {
770 Self::new()
771 }
772}
773
774impl sochdb_core::version_chain::MvccStore for VersionStore {
779 fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
780 self.get(key, snapshot_ts, txn_id)
781 }
782
783 fn mvcc_put(
784 &self,
785 key: &[u8],
786 value: Option<Vec<u8>>,
787 txn_id: u64,
788 ) -> std::result::Result<(), sochdb_core::version_chain::MvccStoreError> {
789 let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
790 self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
791 VersionChain::new()
792 });
793 if entry.has_write_conflict(txn_id) {
794 return Err(sochdb_core::version_chain::MvccStoreError::WriteConflict);
795 }
796 entry.add_uncommitted(value, txn_id, 0);
797 self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
798 Ok(())
799 }
800
801 fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
802 self.commit(key, txn_id, commit_ts)
803 }
804
805 fn mvcc_abort_key(&self, key: &[u8], txn_id: u64) {
806 self.abort(key, txn_id);
807 }
808
809 fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool {
810 self.data
811 .get(key)
812 .map(|chain| chain.has_write_conflict(txn_id))
813 .unwrap_or(false)
814 }
815
816 fn mvcc_gc(&self, min_ts: u64) -> sochdb_core::version_chain::MvccGcStats {
817 let mut stats = sochdb_core::version_chain::MvccGcStats::default();
818 for mut entry in self.data.iter_mut() {
819 stats.keys_scanned += 1;
820 let removed = entry.gc(0, min_ts);
821 stats.versions_removed += removed;
822 }
823 self.stats.gc_passes.fetch_add(1, Ordering::Relaxed);
824 self.stats
825 .versions_reclaimed
826 .fetch_add(stats.versions_removed as u64, Ordering::Relaxed);
827 stats
828 }
829
830 fn mvcc_key_count(&self) -> usize {
831 self.len()
832 }
833}
834
/// Process-shared MVCC coordinator backed by a memory-mapped metadata file.
///
/// The mapping holds an [`MvccHeader`] followed by `num_reader_slots`
/// [`ReaderSlot`]s; `header` and `reader_slots_ptr` point into it. Versioned
/// key/value data lives in the in-process [`VersionStore`].
pub struct ConcurrentMvcc {
    /// Directory passed to `open`; contains `.mvcc_metadata`.
    path: PathBuf,
    /// Owns the mapping that `header` and `reader_slots_ptr` point into. Keeping
    /// it in the same struct ensures it outlives those pointers.
    _mmap: memmap2::MmapMut,
    /// Header at offset 0 of the mapping.
    header: *const MvccHeader,
    /// First reader slot, at offset `HEADER_SIZE` of the mapping.
    reader_slots_ptr: *const ReaderSlot,
    /// Number of slots addressable through `reader_slots_ptr`.
    num_reader_slots: usize,
    /// In-process versioned data.
    version_store: VersionStore,
    /// This process's ID; used as the owner tag in reader slots and the writer lock.
    our_pid: u32,
    /// Reader slot most recently registered through this handle, if any.
    our_slot: RwLock<Option<usize>>,
}

// SAFETY: the raw pointers target the mapped memory owned by `_mmap`, which does
// not move when the struct moves. Code in this file mutates that memory only
// through atomic fields, so moving the handle to another thread is sound.
unsafe impl Send for ConcurrentMvcc {}
// SAFETY: `&self` methods touch mapped state only via atomics (non-atomic header
// fields are read-only after `open`), `our_slot` is behind an `RwLock`, and
// `version_store` uses a concurrent map.
unsafe impl Sync for ConcurrentMvcc {}
883
884impl ConcurrentMvcc {
885 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
891 let path = path.as_ref().to_path_buf();
892 std::fs::create_dir_all(&path)?;
893
894 let metadata_path = path.join(".mvcc_metadata");
895 let is_new = !metadata_path.exists();
896
897 let file = OpenOptions::new()
899 .create(true)
900 .read(true)
901 .write(true)
902 .open(&metadata_path)?;
903
904 let required_size = METADATA_SIZE as u64;
906 if file.metadata()?.len() < required_size {
907 file.set_len(required_size)?;
908 }
909
910 let mut mmap = unsafe { memmap2::MmapMut::map_mut(&file)? };
916
917 if is_new || mmap.len() < METADATA_SIZE {
918 let header = MvccHeader::new();
920 let header_bytes = unsafe {
921 std::slice::from_raw_parts(
922 &header as *const MvccHeader as *const u8,
923 std::mem::size_of::<MvccHeader>(),
924 )
925 };
926 mmap[..header_bytes.len()].copy_from_slice(header_bytes);
927
928 for i in 0..MAX_READERS {
930 let offset = HEADER_SIZE + i * READER_SLOT_SIZE;
931 let end = offset + READER_SLOT_SIZE;
932 if end <= mmap.len() {
933 mmap[offset..end].fill(0);
934 }
935 }
936
937 mmap.flush()?;
938 } else {
939 let header_ref = unsafe { &*(mmap.as_ptr() as *const MvccHeader) };
941 header_ref.validate()?;
942 }
943
944 let header = mmap.as_ptr() as *const MvccHeader;
946 let reader_slots_ptr = unsafe { mmap.as_ptr().add(HEADER_SIZE) as *const ReaderSlot };
947
948 Ok(Self {
949 path,
950 _mmap: mmap,
951 header,
952 reader_slots_ptr,
953 num_reader_slots: MAX_READERS,
954 version_store: VersionStore::new(),
955 our_pid: std::process::id(),
956 our_slot: RwLock::new(None),
957 })
958 }
959
960 #[inline]
964 fn header(&self) -> &MvccHeader {
965 unsafe { &*self.header }
966 }
967
968 #[inline]
972 fn reader_slot(&self, idx: usize) -> &ReaderSlot {
973 assert!(idx < self.num_reader_slots);
974 unsafe { &*self.reader_slots_ptr.add(idx) }
975 }
976
977 #[inline]
979 pub fn allocate_timestamp(&self) -> HlcTimestamp {
980 HlcTimestamp::allocate_next(&self.header().current_ts)
981 }
982
983 #[inline]
985 pub fn current_timestamp(&self) -> HlcTimestamp {
986 HlcTimestamp::read_current(&self.header().current_ts)
987 }
988
989 #[inline]
991 pub fn current_epoch(&self) -> u64 {
992 self.header().current_epoch.load(Ordering::Acquire)
993 }
994
995 pub fn register_reader(&self) -> Result<usize> {
1000 let snapshot_ts = self.current_timestamp().raw();
1001 let epoch = self.current_epoch() as u32;
1002
1003 for i in 0..self.num_reader_slots {
1005 let slot = self.reader_slot(i);
1006 if slot.try_claim(self.our_pid, snapshot_ts, epoch) {
1007 *self.our_slot.write() = Some(i);
1008 return Ok(i);
1009 }
1010 }
1011
1012 Err(SochDBError::ResourceExhausted(
1013 "Too many concurrent readers".into(),
1014 ))
1015 }
1016
1017 pub fn unregister_reader(&self, slot_idx: usize) {
1019 if slot_idx < self.num_reader_slots {
1020 self.reader_slot(slot_idx).release(self.our_pid);
1021 *self.our_slot.write() = None;
1022 }
1023 }
1024
1025 pub fn try_acquire_writer(&self) -> Result<WriterGuard<'_>> {
1029 let current = self.header().writer_lock.load(Ordering::Acquire);
1030
1031 if current == 0 {
1032 if self
1034 .header()
1035 .writer_lock
1036 .compare_exchange(0, self.our_pid, Ordering::AcqRel, Ordering::Acquire)
1037 .is_ok()
1038 {
1039 return Ok(WriterGuard { mvcc: self });
1040 }
1041 } else if current == self.our_pid {
1042 return Ok(WriterGuard { mvcc: self });
1044 }
1045
1046 Err(SochDBError::LockError(format!(
1047 "Writer lock held by process {}",
1048 current
1049 )))
1050 }
1051
1052 pub fn acquire_writer(&self, timeout: Duration) -> Result<WriterGuard<'_>> {
1054 let deadline = std::time::Instant::now() + timeout;
1055
1056 loop {
1057 match self.try_acquire_writer() {
1058 Ok(guard) => return Ok(guard),
1059 Err(_) if std::time::Instant::now() < deadline => {
1060 std::thread::sleep(Duration::from_micros(100));
1061 }
1062 Err(e) => return Err(e),
1063 }
1064 }
1065 }
1066
1067 fn release_writer(&self) {
1069 let current = self.header().writer_lock.load(Ordering::Acquire);
1070 if current == self.our_pid {
1071 self.header().writer_lock.store(0, Ordering::Release);
1072 }
1073 }
1074
1075 pub fn version_store(&self) -> &VersionStore {
1077 &self.version_store
1078 }
1079
1080 pub fn min_active_snapshot(&self) -> u64 {
1082 let mut min_ts = u64::MAX;
1083
1084 for i in 0..self.num_reader_slots {
1085 let slot = self.reader_slot(i);
1086 let pid = slot.pid.load(Ordering::Acquire);
1087 if pid != 0 {
1088 let ts = slot.snapshot_ts.load(Ordering::Acquire);
1089 if ts > 0 && ts < min_ts {
1090 min_ts = ts;
1091 }
1092 }
1093 }
1094
1095 min_ts
1096 }
1097
1098 pub fn min_active_epoch(&self) -> u32 {
1100 let mut min_epoch = u32::MAX;
1101
1102 for i in 0..self.num_reader_slots {
1103 let slot = self.reader_slot(i);
1104 let pid = slot.pid.load(Ordering::Acquire);
1105 if pid != 0 {
1106 let epoch = slot.epoch.load(Ordering::Acquire);
1107 if epoch < min_epoch {
1108 min_epoch = epoch;
1109 }
1110 }
1111 }
1112
1113 if min_epoch == u32::MAX {
1114 self.current_epoch() as u32
1116 } else {
1117 min_epoch
1118 }
1119 }
1120
1121 pub fn run_gc(&self) -> usize {
1125 let min_epoch = self.min_active_epoch();
1126 let min_snapshot = self.min_active_snapshot();
1127
1128 self.version_store.gc(min_epoch, min_snapshot)
1129 }
1130
1131 pub fn should_run_gc(&self) -> bool {
1133 self.header().commits_since_gc.load(Ordering::Relaxed) >= GC_COMMIT_INTERVAL
1134 }
1135
1136 pub fn on_commit(&self) {
1138 let count = self
1139 .header()
1140 .commits_since_gc
1141 .fetch_add(1, Ordering::Relaxed);
1142
1143 if count >= GC_COMMIT_INTERVAL {
1144 self.header().commits_since_gc.store(0, Ordering::Relaxed);
1145 let _ = self.run_gc();
1146 }
1147 }
1148
1149 pub fn cleanup_stale_readers(&self) -> usize {
1151 let now = current_time_us();
1152 let mut cleaned = 0;
1153
1154 for i in 0..self.num_reader_slots {
1155 let slot = self.reader_slot(i);
1156 if slot.is_stale(now) {
1157 slot.pid.store(0, Ordering::Release);
1158 cleaned += 1;
1159 }
1160 }
1161
1162 cleaned
1163 }
1164
1165 pub fn advance_epoch(&self) -> u64 {
1167 self.header().current_epoch.fetch_add(1, Ordering::AcqRel) + 1
1168 }
1169}
1170
1171impl Drop for ConcurrentMvcc {
1172 fn drop(&mut self) {
1173 if let Some(slot_idx) = *self.our_slot.read() {
1175 self.unregister_reader(slot_idx);
1176 }
1177
1178 self.release_writer();
1180 }
1181}
1182
1183pub struct WriterGuard<'a> {
1189 mvcc: &'a ConcurrentMvcc,
1190}
1191
1192impl<'a> Drop for WriterGuard<'a> {
1193 fn drop(&mut self) {
1194 self.mvcc.release_writer();
1195 }
1196}
1197
/// Current wall-clock time in whole microseconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
#[inline]
fn current_time_us() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_micros() as u64
}
1210
1211#[cfg(unix)]
1213fn process_exists(pid: u32) -> bool {
1214 let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
1216 if result == 0 {
1217 true
1218 } else {
1219 std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
1221 }
1222}
1223
/// Best-effort check whether a process with `pid` exists (Windows).
///
/// Opens the process with `PROCESS_QUERY_LIMITED_INFORMATION`. If the open fails
/// with `ERROR_ACCESS_DENIED`, the process exists but may not be inspected, so it
/// is reported as alive rather than letting its reader slot or writer lock be
/// reclaimed. Any other failure is treated as "no such process".
#[cfg(windows)]
fn process_exists(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, GetLastError, ERROR_ACCESS_DENIED};
    use windows_sys::Win32::System::Threading::{OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION};

    // SAFETY: plain Win32 calls; the handle is closed if it was opened.
    unsafe {
        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if handle == 0 {
            GetLastError() == ERROR_ACCESS_DENIED
        } else {
            CloseHandle(handle);
            true
        }
    }
}
1239
/// Fallback for platforms without a liveness check: always reports the process
/// as alive, so reader slots can only become stale through heartbeat timeout.
#[cfg(not(any(unix, windows)))]
fn process_exists(_pid: u32) -> bool {
    true
}
1244
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// The shared-memory layout constants must match the real struct sizes.
    #[test]
    fn test_struct_sizes() {
        eprintln!("MvccHeader size: {}", std::mem::size_of::<MvccHeader>());
        eprintln!("MvccHeader align: {}", std::mem::align_of::<MvccHeader>());
        eprintln!("ReaderSlot size: {}", std::mem::size_of::<ReaderSlot>());
        eprintln!("ReaderSlot align: {}", std::mem::align_of::<ReaderSlot>());
        eprintln!("HEADER_SIZE constant: {}", HEADER_SIZE);
        eprintln!("READER_SLOT_SIZE constant: {}", READER_SLOT_SIZE);
        eprintln!("METADATA_SIZE constant: {}", METADATA_SIZE);

        assert_eq!(
            std::mem::size_of::<MvccHeader>(),
            HEADER_SIZE,
            "MvccHeader size mismatch! Actual: {}, Expected: {}",
            std::mem::size_of::<MvccHeader>(),
            HEADER_SIZE
        );
        assert_eq!(
            std::mem::size_of::<ReaderSlot>(),
            READER_SLOT_SIZE,
            "ReaderSlot size mismatch! Actual: {}, Expected: {}",
            std::mem::size_of::<ReaderSlot>(),
            READER_SLOT_SIZE
        );
    }

    /// Sequential allocations from one clock are strictly increasing.
    #[test]
    fn test_hlc_timestamp_ordering() {
        let ts = AtomicU64::new(0);

        let t1 = HlcTimestamp::allocate_next(&ts);
        let t2 = HlcTimestamp::allocate_next(&ts);
        let t3 = HlcTimestamp::allocate_next(&ts);

        assert!(t1.raw() < t2.raw());
        assert!(t2.raw() < t3.raw());
    }

    /// 8 threads x 1000 allocations from a shared clock produce no duplicates.
    #[test]
    fn test_hlc_timestamp_concurrent() {
        let ts = Arc::new(AtomicU64::new(0));
        let mut handles = vec![];

        for _ in 0..8 {
            let ts = ts.clone();
            handles.push(thread::spawn(move || {
                let mut timestamps = vec![];
                for _ in 0..1000 {
                    timestamps.push(HlcTimestamp::allocate_next(&ts).raw());
                }
                timestamps
            }));
        }

        let mut all_ts: Vec<u64> = handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect();

        all_ts.sort();
        let len_before = all_ts.len();
        all_ts.dedup();
        assert_eq!(all_ts.len(), len_before, "Duplicate timestamps found!");
    }

    /// Snapshot reads return the newest version committed strictly before the snapshot.
    #[test]
    fn test_version_chain_read_at() {
        let mut chain = VersionChain::new();

        chain.add_uncommitted(Some(b"v80".to_vec()), 3, 1);
        chain.commit(3, 80);
        chain.add_uncommitted(Some(b"v90".to_vec()), 2, 1);
        chain.commit(2, 90);
        chain.add_uncommitted(Some(b"v100".to_vec()), 1, 1);
        chain.commit(1, 100);

        let v = chain.read_at(105, None).unwrap();
        assert_eq!(v.value, Some(b"v100".to_vec()));

        let v = chain.read_at(95, None).unwrap();
        assert_eq!(v.value, Some(b"v90".to_vec()));

        let v = chain.read_at(85, None).unwrap();
        assert_eq!(v.value, Some(b"v80".to_vec()));

        assert!(chain.read_at(75, None).is_none());
    }

    /// Ten versions (commit ts 55..=100, epochs 1..=10); GC with min epoch 7 and
    /// min snapshot 75 must reclaim something and keep at least one version.
    #[test]
    fn test_version_chain_gc() {
        let mut chain = VersionChain::new();

        for i in (0..10u64).rev() {
            chain.add_uncommitted(
                Some(format!("v{}", 100 - i * 5).into_bytes()),
                i,
                (10 - i) as u32,
            );
            chain.commit(i, 100 - i * 5);
        }

        assert_eq!(chain.len(), 10);

        let reclaimed = chain.gc(7, 75);

        assert!(
            reclaimed > 0,
            "Should have reclaimed some versions, got {}",
            reclaimed
        );
        assert!(chain.len() >= 1, "Should keep at least one version");
    }

    /// A committed write is visible after its commit timestamp and not before.
    #[test]
    fn test_version_store_basic() {
        let store = VersionStore::new();

        store
            .insert_uncommitted(b"key1", Some(b"value1".to_vec()), 1, 1)
            .unwrap();

        assert!(store.commit(b"key1", 1, 100));

        let value = store.get(b"key1", 150, None);
        assert_eq!(value, Some(b"value1".to_vec()));

        let value = store.get(b"key1", 50, None);
        assert!(value.is_none());
    }

    /// A slot can be claimed, re-claimed by the same PID, refused to another
    /// PID, and released.
    #[test]
    fn test_reader_slot_claim_release() {
        let slot = ReaderSlot::empty();

        assert!(slot.is_free());

        assert!(slot.try_claim(1234, 100, 1));
        assert!(!slot.is_free());

        assert!(!slot.try_claim(5678, 200, 2));

        assert!(slot.try_claim(1234, 300, 3));

        slot.release(1234);
        assert!(slot.is_free());
    }
}