1use std::fs::{File, OpenOptions};
58use std::path::{Path, PathBuf};
59use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
60use std::time::{Duration, SystemTime, UNIX_EPOCH};
61
62use dashmap::DashMap;
63use parking_lot::RwLock;
64use sochdb_core::{Result, SochDBError};
65use sochdb_core::version_chain::{
66 BinarySearchChain, ChainEntry,
67 MvccVersionChain, MvccVersionChainMut, WriteConflictDetection,
68 VisibilityContext, TxnId, Timestamp,
69};
70
71pub type ConcurrentVersionChain = VersionChain;
73pub type ConcurrentVersionEntry = VersionEntry;
74
/// Magic number stored at the start of the MVCC metadata header. Its bytes
/// spell `SCHMV_CC` in ASCII when the value is written little-endian.
const MVCC_MAGIC: u64 = 0x4343_5F56_4D48_4353;

/// Layout version written into, and expected from, the metadata header.
const MVCC_VERSION: u32 = 1;

/// Number of reader slots that follow the header in the metadata region.
const MAX_READERS: usize = 1024;

/// Bytes reserved for a single reader slot.
const READER_SLOT_SIZE: usize = 64;

/// Bytes reserved for the header that precedes the reader slots.
const HEADER_SIZE: usize = 64;

/// Total size of the metadata region: the header followed by every reader slot.
const METADATA_SIZE: usize = HEADER_SIZE + MAX_READERS * READER_SLOT_SIZE;

/// A reader whose last heartbeat is older than this many microseconds (60 s)
/// is treated as stale.
const STALE_READER_TIMEOUT_US: u64 = 60 * 1_000_000;

/// Number of commits after which a garbage-collection pass is due.
const GC_COMMIT_INTERVAL: u64 = 1_000;
102
/// Hybrid logical clock timestamp packed into a single `u64`.
///
/// The upper 48 bits hold a physical component (milliseconds since the Unix
/// epoch) and the lower 16 bits hold a logical counter that orders timestamps
/// allocated within the same millisecond. Because the physical part occupies
/// the high bits, comparing the raw integers compares the timestamps.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct HlcTimestamp(pub u64);

impl HlcTimestamp {
    /// Packs `physical_ms` (upper 48 bits) and `logical` (lower 16 bits).
    ///
    /// Bits of `physical_ms` above the 48th are shifted out and lost.
    #[inline]
    pub fn new(physical_ms: u64, logical: u16) -> Self {
        Self((physical_ms << 16) | u64::from(logical))
    }

    /// Returns the physical component in milliseconds.
    #[inline]
    pub fn physical_ms(&self) -> u64 {
        self.0 >> 16
    }

    /// Returns the logical counter.
    #[inline]
    pub fn logical(&self) -> u16 {
        (self.0 & 0xFFFF) as u16
    }

    /// Returns the packed 64-bit representation.
    #[inline]
    pub fn raw(&self) -> u64 {
        self.0
    }

    /// Allocates a timestamp strictly greater than the value stored in `last`
    /// and publishes it back into `last` with a compare-and-swap loop.
    ///
    /// If the wall clock has moved past the stored physical component, the new
    /// timestamp is `(now, 0)`. Otherwise the logical counter is incremented;
    /// when the counter is already at `u16::MAX` the increment carries into the
    /// physical component instead of saturating, so two calls can never return
    /// the same value.
    ///
    /// A system clock set before the Unix epoch is treated as physical time 0,
    /// which makes the allocation fall back to advancing the stored value
    /// rather than panicking.
    pub fn allocate_next(last: &AtomicU64) -> Self {
        let physical_now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_millis() as u64)
            .unwrap_or(0);

        loop {
            let last_val = last.load(Ordering::Acquire);
            let last_phys = last_val >> 16;
            let last_log = (last_val & 0xFFFF) as u16;

            let (new_phys, new_log) = if physical_now > last_phys {
                (physical_now, 0u16)
            } else {
                match last_log.checked_add(1) {
                    Some(next_log) => (last_phys, next_log),
                    // Logical counter exhausted for this millisecond: borrow
                    // the next physical tick so the result still increases.
                    None => (last_phys.wrapping_add(1), 0u16),
                }
            };

            let new_val = (new_phys << 16) | u64::from(new_log);

            if last
                .compare_exchange(last_val, new_val, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return Self(new_val);
            }
            // Another allocator won the race; retry against the fresh value.
            std::hint::spin_loop();
        }
    }

    /// Reads the timestamp currently stored in `ts` without advancing it.
    #[inline]
    pub fn read_current(ts: &AtomicU64) -> Self {
        Self(ts.load(Ordering::Acquire))
    }
}

impl From<u64> for HlcTimestamp {
    /// Wraps an already-packed raw value.
    fn from(val: u64) -> Self {
        Self(val)
    }
}

impl From<HlcTimestamp> for u64 {
    /// Unwraps the packed raw value.
    fn from(ts: HlcTimestamp) -> Self {
        ts.0
    }
}
199
200#[repr(C, align(64))]
219#[derive(Debug)]
220pub struct ReaderSlot {
221 pub pid: AtomicU32,
223 pub snapshot_ts: AtomicU64,
225 pub epoch: AtomicU32,
227 pub last_heartbeat: AtomicU64,
229 _reserved: [u8; 32],
231}
232
233impl ReaderSlot {
234 pub const fn empty() -> Self {
236 Self {
237 pid: AtomicU32::new(0),
238 snapshot_ts: AtomicU64::new(0),
239 epoch: AtomicU32::new(0),
240 last_heartbeat: AtomicU64::new(0),
241 _reserved: [0u8; 32],
242 }
243 }
244
245 #[inline]
247 pub fn is_free(&self) -> bool {
248 self.pid.load(Ordering::Acquire) == 0
249 }
250
251 #[inline]
255 pub fn try_claim(&self, my_pid: u32, snapshot_ts: u64, epoch: u32) -> bool {
256 let current_pid = self.pid.load(Ordering::Acquire);
257
258 if current_pid != 0 && current_pid != my_pid {
260 return false;
261 }
262
263 if self
264 .pid
265 .compare_exchange(current_pid, my_pid, Ordering::AcqRel, Ordering::Acquire)
266 .is_ok()
267 {
268 self.snapshot_ts.store(snapshot_ts, Ordering::Release);
270 self.epoch.store(epoch, Ordering::Release);
271 self.last_heartbeat
272 .store(current_time_us(), Ordering::Release);
273 true
274 } else {
275 false
276 }
277 }
278
279 #[inline]
281 pub fn release(&self, my_pid: u32) {
282 if self.pid.load(Ordering::Acquire) == my_pid {
284 self.snapshot_ts.store(0, Ordering::Release);
285 self.pid.store(0, Ordering::Release);
286 }
287 }
288
289 #[inline]
291 pub fn heartbeat(&self) {
292 self.last_heartbeat
293 .store(current_time_us(), Ordering::Release);
294 }
295
296 #[inline]
298 pub fn is_stale(&self, now_us: u64) -> bool {
299 let pid = self.pid.load(Ordering::Acquire);
300 if pid == 0 {
301 return false; }
303
304 let last_hb = self.last_heartbeat.load(Ordering::Acquire);
306 if now_us.saturating_sub(last_hb) > STALE_READER_TIMEOUT_US {
307 return true;
308 }
309
310 !process_exists(pid)
312 }
313}
314
315#[repr(C)]
321#[derive(Debug)]
322pub struct MvccHeader {
323 pub magic: u64,
325 pub version: u32,
327 pub page_size: u32,
329 pub num_readers: u32,
331 pub current_epoch: AtomicU64,
333 pub current_ts: AtomicU64,
335 pub writer_lock: AtomicU32,
337 pub commits_since_gc: AtomicU64,
339 _reserved: [u8; 4],
341}
342
343impl MvccHeader {
344 pub fn new() -> Self {
346 Self {
347 magic: MVCC_MAGIC,
348 version: MVCC_VERSION,
349 page_size: 4096,
350 num_readers: MAX_READERS as u32,
351 current_epoch: AtomicU64::new(1),
352 current_ts: AtomicU64::new(HlcTimestamp::new(
353 SystemTime::now()
354 .duration_since(UNIX_EPOCH)
355 .unwrap()
356 .as_millis() as u64,
357 0,
358 ).raw()),
359 writer_lock: AtomicU32::new(0),
360 commits_since_gc: AtomicU64::new(0),
361 _reserved: [0u8; 4],
362 }
363 }
364
365 pub fn validate(&self) -> Result<()> {
367 if self.magic != MVCC_MAGIC {
368 return Err(SochDBError::Corruption(
369 "Invalid MVCC metadata magic".into(),
370 ));
371 }
372 if self.version != MVCC_VERSION {
373 return Err(SochDBError::Corruption(format!(
374 "Unsupported MVCC version: {} (expected {})",
375 self.version, MVCC_VERSION
376 )));
377 }
378 Ok(())
379 }
380}
381
382impl Default for MvccHeader {
383 fn default() -> Self {
384 Self::new()
385 }
386}
387
/// One version of a key's value.
#[derive(Debug, Clone)]
pub struct VersionEntry {
    /// Commit timestamp; `0` means the version has not been committed.
    pub commit_ts: u64,
    /// Id of the transaction that wrote this version.
    pub txn_id: u64,
    /// Epoch recorded when the version was written.
    pub epoch: u32,
    /// The stored bytes, or `None` when the version carries no value.
    pub value: Option<Vec<u8>>,
}

impl VersionEntry {
    /// Bundles the given fields into an entry.
    pub fn new(commit_ts: u64, txn_id: u64, epoch: u32, value: Option<Vec<u8>>) -> Self {
        VersionEntry {
            commit_ts,
            txn_id,
            epoch,
            value,
        }
    }

    /// Reports whether a reader at `snapshot_ts` sees this version: it must be
    /// committed (non-zero `commit_ts`) strictly before the snapshot.
    #[inline]
    pub fn is_visible_at(&self, snapshot_ts: u64) -> bool {
        (1..snapshot_ts).contains(&self.commit_ts)
    }
}
422
423impl ChainEntry for VersionEntry {
425 #[inline] fn commit_ts(&self) -> u64 { self.commit_ts }
426 #[inline] fn txn_id(&self) -> u64 { self.txn_id }
427 #[inline] fn set_commit_ts(&mut self, ts: u64) { self.commit_ts = ts; }
428}
429
430#[derive(Debug, Default)]
446pub struct VersionChain {
447 inner: BinarySearchChain<VersionEntry>,
449}
450
451impl VersionChain {
452 pub fn new() -> Self {
454 Self { inner: BinarySearchChain::new() }
455 }
456
457 pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64, epoch: u32) {
459 self.inner.set_uncommitted(VersionEntry {
460 commit_ts: 0,
461 txn_id,
462 epoch,
463 value,
464 });
465 }
466
467 #[inline]
469 pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
470 self.inner.commit(txn_id, commit_ts)
471 }
472
473 #[inline]
475 pub fn abort(&mut self, txn_id: u64) {
476 self.inner.abort(txn_id);
477 }
478
479 #[inline]
483 pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&VersionEntry> {
484 self.inner.read_at(snapshot_ts, current_txn_id)
485 }
486
487 #[inline]
489 pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
490 self.inner.has_write_conflict(my_txn_id)
491 }
492
493 pub fn gc(&mut self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
498 let versions = self.inner.committed_versions_mut();
499 if versions.len() <= 1 {
500 return 0;
501 }
502
503 let original_len = versions.len();
504
505 let mut keep_count = 1; for v in versions.iter().skip(1) {
507 if v.epoch >= min_epoch || v.commit_ts >= min_snapshot_ts {
508 keep_count += 1;
509 } else {
510 break; }
512 }
513
514 versions.truncate(keep_count);
515 original_len - versions.len()
516 }
517
518 #[inline]
520 pub fn len(&self) -> usize {
521 self.inner.version_count()
522 }
523
524 #[inline]
526 pub fn is_empty(&self) -> bool {
527 self.inner.is_empty()
528 }
529}
530
531impl MvccVersionChain for VersionChain {
536 type Value = Option<Vec<u8>>;
537
538 fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value> {
539 self.inner.read_at(ctx.snapshot_ts, Some(ctx.reader_txn_id))
540 .map(|v| &v.value)
541 }
542
543 fn get_latest(&self) -> Option<&Self::Value> {
544 self.inner.latest().map(|v| &v.value)
545 }
546
547 fn version_count(&self) -> usize {
548 self.inner.version_count()
549 }
550}
551
552impl MvccVersionChainMut for VersionChain {
553 fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId) {
554 self.add_uncommitted(value, txn_id, 0);
555 }
556
557 fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool {
558 self.inner.commit(txn_id, commit_ts)
559 }
560
561 fn delete_version(&mut self, txn_id: TxnId, _delete_ts: Timestamp) -> bool {
562 self.add_uncommitted(None, txn_id, 0);
563 true
564 }
565
566 fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize) {
567 let removed = self.gc(0, min_visible_ts);
568 (removed, removed * std::mem::size_of::<VersionEntry>())
569 }
570}
571
572impl WriteConflictDetection for VersionChain {
573 fn has_write_conflict(&self, txn_id: TxnId) -> bool {
574 self.inner.has_write_conflict(txn_id)
575 }
576}
577
578pub struct VersionStore {
586 data: DashMap<Vec<u8>, VersionChain>,
588 stats: VersionStoreStats,
590}
591
592#[derive(Debug, Default)]
594pub struct VersionStoreStats {
595 pub num_keys: AtomicU64,
597 pub num_versions: AtomicU64,
599 pub gc_passes: AtomicU64,
601 pub versions_reclaimed: AtomicU64,
603}
604
605impl VersionStore {
606 pub fn new() -> Self {
608 Self {
609 data: DashMap::new(),
610 stats: VersionStoreStats::default(),
611 }
612 }
613
614 pub fn insert_uncommitted(
616 &self,
617 key: &[u8],
618 value: Option<Vec<u8>>,
619 txn_id: u64,
620 epoch: u32,
621 ) -> Result<()> {
622 let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
623 self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
624 VersionChain::new()
625 });
626
627 if entry.has_write_conflict(txn_id) {
629 return Err(SochDBError::Internal(
630 "Write conflict: another transaction has uncommitted write".into(),
631 ));
632 }
633
634 entry.add_uncommitted(value, txn_id, epoch);
635 self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
636 Ok(())
637 }
638
639 pub fn commit(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
641 if let Some(mut entry) = self.data.get_mut(key) {
642 return entry.commit(txn_id, commit_ts);
643 }
644 false
645 }
646
647 pub fn abort(&self, key: &[u8], txn_id: u64) {
649 if let Some(mut entry) = self.data.get_mut(key) {
650 entry.abort(txn_id);
651 self.stats.num_versions.fetch_sub(1, Ordering::Relaxed);
652 }
653 }
654
655 pub fn get(&self, key: &[u8], snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<Vec<u8>> {
657 self.data.get(key).and_then(|chain| {
658 chain
659 .read_at(snapshot_ts, current_txn_id)
660 .and_then(|v| v.value.clone())
661 })
662 }
663
664 pub fn contains(&self, key: &[u8], snapshot_ts: u64) -> bool {
666 self.data
667 .get(key)
668 .map(|chain| chain.read_at(snapshot_ts, None).is_some())
669 .unwrap_or(false)
670 }
671
672 pub fn gc(&self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
676 let mut total_reclaimed = 0;
677
678 for mut entry in self.data.iter_mut() {
679 let reclaimed = entry.gc(min_epoch, min_snapshot_ts);
680 total_reclaimed += reclaimed;
681 }
682
683 self.stats
684 .gc_passes
685 .fetch_add(1, Ordering::Relaxed);
686 self.stats
687 .versions_reclaimed
688 .fetch_add(total_reclaimed as u64, Ordering::Relaxed);
689
690 total_reclaimed
691 }
692
693 pub fn len(&self) -> usize {
695 self.data.len()
696 }
697
698 pub fn is_empty(&self) -> bool {
700 self.data.is_empty()
701 }
702
703 pub fn stats(&self) -> &VersionStoreStats {
705 &self.stats
706 }
707}
708
709impl Default for VersionStore {
710 fn default() -> Self {
711 Self::new()
712 }
713}
714
715impl sochdb_core::version_chain::MvccStore for VersionStore {
720 fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
721 self.get(key, snapshot_ts, txn_id)
722 }
723
724 fn mvcc_put(
725 &self,
726 key: &[u8],
727 value: Option<Vec<u8>>,
728 txn_id: u64,
729 ) -> std::result::Result<(), sochdb_core::version_chain::MvccStoreError> {
730 let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
731 self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
732 VersionChain::new()
733 });
734 if entry.has_write_conflict(txn_id) {
735 return Err(sochdb_core::version_chain::MvccStoreError::WriteConflict);
736 }
737 entry.add_uncommitted(value, txn_id, 0);
738 self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
739 Ok(())
740 }
741
742 fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
743 self.commit(key, txn_id, commit_ts)
744 }
745
746 fn mvcc_abort_key(&self, key: &[u8], txn_id: u64) {
747 self.abort(key, txn_id);
748 }
749
750 fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool {
751 self.data
752 .get(key)
753 .map(|chain| chain.has_write_conflict(txn_id))
754 .unwrap_or(false)
755 }
756
757 fn mvcc_gc(&self, min_ts: u64) -> sochdb_core::version_chain::MvccGcStats {
758 let mut stats = sochdb_core::version_chain::MvccGcStats::default();
759 for mut entry in self.data.iter_mut() {
760 stats.keys_scanned += 1;
761 let removed = entry.gc(0, min_ts);
762 stats.versions_removed += removed;
763 }
764 self.stats.gc_passes.fetch_add(1, Ordering::Relaxed);
765 self.stats.versions_reclaimed.fetch_add(stats.versions_removed as u64, Ordering::Relaxed);
766 stats
767 }
768
769 fn mvcc_key_count(&self) -> usize {
770 self.len()
771 }
772}
773
774pub struct ConcurrentMvcc {
792 path: PathBuf,
794 _mmap: memmap2::MmapMut,
799 header: *const MvccHeader,
803 reader_slots_ptr: *const ReaderSlot,
807 num_reader_slots: usize,
809 version_store: VersionStore,
811 our_pid: u32,
813 our_slot: RwLock<Option<usize>>,
815}
816
817unsafe impl Send for ConcurrentMvcc {}
821unsafe impl Sync for ConcurrentMvcc {}
822
823impl ConcurrentMvcc {
824 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
830 let path = path.as_ref().to_path_buf();
831 std::fs::create_dir_all(&path)?;
832
833 let metadata_path = path.join(".mvcc_metadata");
834 let is_new = !metadata_path.exists();
835
836 let file = OpenOptions::new()
838 .create(true)
839 .read(true)
840 .write(true)
841 .open(&metadata_path)?;
842
843 let required_size = METADATA_SIZE as u64;
845 if file.metadata()?.len() < required_size {
846 file.set_len(required_size)?;
847 }
848
849 let mut mmap = unsafe { memmap2::MmapMut::map_mut(&file)? };
855
856 if is_new || mmap.len() < METADATA_SIZE {
857 let header = MvccHeader::new();
859 let header_bytes = unsafe {
860 std::slice::from_raw_parts(
861 &header as *const MvccHeader as *const u8,
862 std::mem::size_of::<MvccHeader>(),
863 )
864 };
865 mmap[..header_bytes.len()].copy_from_slice(header_bytes);
866
867 for i in 0..MAX_READERS {
869 let offset = HEADER_SIZE + i * READER_SLOT_SIZE;
870 let end = offset + READER_SLOT_SIZE;
871 if end <= mmap.len() {
872 mmap[offset..end].fill(0);
873 }
874 }
875
876 mmap.flush()?;
877 } else {
878 let header_ref = unsafe { &*(mmap.as_ptr() as *const MvccHeader) };
880 header_ref.validate()?;
881 }
882
883 let header = mmap.as_ptr() as *const MvccHeader;
885 let reader_slots_ptr = unsafe {
886 mmap.as_ptr().add(HEADER_SIZE) as *const ReaderSlot
887 };
888
889 Ok(Self {
890 path,
891 _mmap: mmap,
892 header,
893 reader_slots_ptr,
894 num_reader_slots: MAX_READERS,
895 version_store: VersionStore::new(),
896 our_pid: std::process::id(),
897 our_slot: RwLock::new(None),
898 })
899 }
900
901 #[inline]
905 fn header(&self) -> &MvccHeader {
906 unsafe { &*self.header }
907 }
908
909 #[inline]
913 fn reader_slot(&self, idx: usize) -> &ReaderSlot {
914 assert!(idx < self.num_reader_slots);
915 unsafe { &*self.reader_slots_ptr.add(idx) }
916 }
917
918 #[inline]
920 pub fn allocate_timestamp(&self) -> HlcTimestamp {
921 HlcTimestamp::allocate_next(&self.header().current_ts)
922 }
923
924 #[inline]
926 pub fn current_timestamp(&self) -> HlcTimestamp {
927 HlcTimestamp::read_current(&self.header().current_ts)
928 }
929
930 #[inline]
932 pub fn current_epoch(&self) -> u64 {
933 self.header().current_epoch.load(Ordering::Acquire)
934 }
935
936 pub fn register_reader(&self) -> Result<usize> {
941 let snapshot_ts = self.current_timestamp().raw();
942 let epoch = self.current_epoch() as u32;
943
944 for i in 0..self.num_reader_slots {
946 let slot = self.reader_slot(i);
947 if slot.try_claim(self.our_pid, snapshot_ts, epoch) {
948 *self.our_slot.write() = Some(i);
949 return Ok(i);
950 }
951 }
952
953 Err(SochDBError::ResourceExhausted(
954 "Too many concurrent readers".into(),
955 ))
956 }
957
958 pub fn unregister_reader(&self, slot_idx: usize) {
960 if slot_idx < self.num_reader_slots {
961 self.reader_slot(slot_idx).release(self.our_pid);
962 *self.our_slot.write() = None;
963 }
964 }
965
966 pub fn try_acquire_writer(&self) -> Result<WriterGuard<'_>> {
970 let current = self.header().writer_lock.load(Ordering::Acquire);
971
972 if current == 0 {
973 if self
975 .header()
976 .writer_lock
977 .compare_exchange(0, self.our_pid, Ordering::AcqRel, Ordering::Acquire)
978 .is_ok()
979 {
980 return Ok(WriterGuard { mvcc: self });
981 }
982 } else if current == self.our_pid {
983 return Ok(WriterGuard { mvcc: self });
985 }
986
987 Err(SochDBError::LockError(format!(
988 "Writer lock held by process {}",
989 current
990 )))
991 }
992
993 pub fn acquire_writer(&self, timeout: Duration) -> Result<WriterGuard<'_>> {
995 let deadline = std::time::Instant::now() + timeout;
996
997 loop {
998 match self.try_acquire_writer() {
999 Ok(guard) => return Ok(guard),
1000 Err(_) if std::time::Instant::now() < deadline => {
1001 std::thread::sleep(Duration::from_micros(100));
1002 }
1003 Err(e) => return Err(e),
1004 }
1005 }
1006 }
1007
1008 fn release_writer(&self) {
1010 let current = self.header().writer_lock.load(Ordering::Acquire);
1011 if current == self.our_pid {
1012 self.header().writer_lock.store(0, Ordering::Release);
1013 }
1014 }
1015
1016 pub fn version_store(&self) -> &VersionStore {
1018 &self.version_store
1019 }
1020
1021 pub fn min_active_snapshot(&self) -> u64 {
1023 let mut min_ts = u64::MAX;
1024
1025 for i in 0..self.num_reader_slots {
1026 let slot = self.reader_slot(i);
1027 let pid = slot.pid.load(Ordering::Acquire);
1028 if pid != 0 {
1029 let ts = slot.snapshot_ts.load(Ordering::Acquire);
1030 if ts > 0 && ts < min_ts {
1031 min_ts = ts;
1032 }
1033 }
1034 }
1035
1036 min_ts
1037 }
1038
1039 pub fn min_active_epoch(&self) -> u32 {
1041 let mut min_epoch = u32::MAX;
1042
1043 for i in 0..self.num_reader_slots {
1044 let slot = self.reader_slot(i);
1045 let pid = slot.pid.load(Ordering::Acquire);
1046 if pid != 0 {
1047 let epoch = slot.epoch.load(Ordering::Acquire);
1048 if epoch < min_epoch {
1049 min_epoch = epoch;
1050 }
1051 }
1052 }
1053
1054 if min_epoch == u32::MAX {
1055 self.current_epoch() as u32
1057 } else {
1058 min_epoch
1059 }
1060 }
1061
1062 pub fn run_gc(&self) -> usize {
1066 let min_epoch = self.min_active_epoch();
1067 let min_snapshot = self.min_active_snapshot();
1068
1069 self.version_store.gc(min_epoch, min_snapshot)
1070 }
1071
1072 pub fn should_run_gc(&self) -> bool {
1074 self.header()
1075 .commits_since_gc
1076 .load(Ordering::Relaxed)
1077 >= GC_COMMIT_INTERVAL
1078 }
1079
1080 pub fn on_commit(&self) {
1082 let count = self
1083 .header()
1084 .commits_since_gc
1085 .fetch_add(1, Ordering::Relaxed);
1086
1087 if count >= GC_COMMIT_INTERVAL {
1088 self.header().commits_since_gc.store(0, Ordering::Relaxed);
1089 let _ = self.run_gc();
1090 }
1091 }
1092
1093 pub fn cleanup_stale_readers(&self) -> usize {
1095 let now = current_time_us();
1096 let mut cleaned = 0;
1097
1098 for i in 0..self.num_reader_slots {
1099 let slot = self.reader_slot(i);
1100 if slot.is_stale(now) {
1101 slot.pid.store(0, Ordering::Release);
1102 cleaned += 1;
1103 }
1104 }
1105
1106 cleaned
1107 }
1108
1109 pub fn advance_epoch(&self) -> u64 {
1111 self.header().current_epoch.fetch_add(1, Ordering::AcqRel) + 1
1112 }
1113}
1114
1115impl Drop for ConcurrentMvcc {
1116 fn drop(&mut self) {
1117 if let Some(slot_idx) = *self.our_slot.read() {
1119 self.unregister_reader(slot_idx);
1120 }
1121
1122 self.release_writer();
1124 }
1125}
1126
1127pub struct WriterGuard<'a> {
1133 mvcc: &'a ConcurrentMvcc,
1134}
1135
1136impl<'a> Drop for WriterGuard<'a> {
1137 fn drop(&mut self) {
1138 self.mvcc.release_writer();
1139 }
1140}
1141
/// Returns the wall-clock time in microseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
#[inline]
fn current_time_us() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_micros() as u64
}
1154
1155#[cfg(unix)]
1157fn process_exists(pid: u32) -> bool {
1158 let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
1160 if result == 0 {
1161 true
1162 } else {
1163 std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
1165 }
1166}
1167
1168#[cfg(windows)]
1169fn process_exists(pid: u32) -> bool {
1170 use windows_sys::Win32::Foundation::CloseHandle;
1171 use windows_sys::Win32::System::Threading::{OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION};
1172
1173 unsafe {
1174 let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
1175 if handle == 0 {
1176 false
1177 } else {
1178 CloseHandle(handle);
1179 true
1180 }
1181 }
1182}
1183
1184#[cfg(not(any(unix, windows)))]
1185fn process_exists(_pid: u32) -> bool {
1186 true }
1188
/// Unit tests for the layout constants, HLC allocation, version chains, the
/// version store and reader slots.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// The mapped structs must match the byte sizes the layout constants assume.
    #[test]
    fn test_struct_sizes() {
        let header_size = std::mem::size_of::<MvccHeader>();
        let slot_size = std::mem::size_of::<ReaderSlot>();

        eprintln!("MvccHeader size: {}", header_size);
        eprintln!("MvccHeader align: {}", std::mem::align_of::<MvccHeader>());
        eprintln!("ReaderSlot size: {}", slot_size);
        eprintln!("ReaderSlot align: {}", std::mem::align_of::<ReaderSlot>());
        eprintln!("HEADER_SIZE constant: {}", HEADER_SIZE);
        eprintln!("READER_SLOT_SIZE constant: {}", READER_SLOT_SIZE);
        eprintln!("METADATA_SIZE constant: {}", METADATA_SIZE);

        assert_eq!(
            header_size, HEADER_SIZE,
            "MvccHeader size mismatch! Actual: {}, Expected: {}",
            header_size, HEADER_SIZE
        );
        assert_eq!(
            slot_size, READER_SLOT_SIZE,
            "ReaderSlot size mismatch! Actual: {}, Expected: {}",
            slot_size, READER_SLOT_SIZE
        );
    }

    /// Back-to-back allocations on one clock strictly increase.
    #[test]
    fn test_hlc_timestamp_ordering() {
        let clock = AtomicU64::new(0);

        let first = HlcTimestamp::allocate_next(&clock);
        let second = HlcTimestamp::allocate_next(&clock);
        let third = HlcTimestamp::allocate_next(&clock);

        assert!(first.raw() < second.raw());
        assert!(second.raw() < third.raw());
    }

    /// Eight threads allocating from one clock never receive the same value.
    #[test]
    fn test_hlc_timestamp_concurrent() {
        let clock = Arc::new(AtomicU64::new(0));

        let workers: Vec<_> = (0..8)
            .map(|_| {
                let clock = Arc::clone(&clock);
                thread::spawn(move || {
                    (0..1000)
                        .map(|_| HlcTimestamp::allocate_next(&clock).raw())
                        .collect::<Vec<u64>>()
                })
            })
            .collect();

        let mut all_ts: Vec<u64> = workers
            .into_iter()
            .flat_map(|worker| worker.join().unwrap())
            .collect();

        all_ts.sort();
        let len_before = all_ts.len();
        all_ts.dedup();
        assert_eq!(all_ts.len(), len_before, "Duplicate timestamps found!");
    }

    /// Reads resolve to the newest version committed before the snapshot.
    #[test]
    fn test_version_chain_read_at() {
        let mut chain = VersionChain::new();

        chain.add_uncommitted(Some(b"v80".to_vec()), 3, 1);
        chain.commit(3, 80);
        chain.add_uncommitted(Some(b"v90".to_vec()), 2, 1);
        chain.commit(2, 90);
        chain.add_uncommitted(Some(b"v100".to_vec()), 1, 1);
        chain.commit(1, 100);

        let at_105 = chain.read_at(105, None).unwrap();
        assert_eq!(at_105.value, Some(b"v100".to_vec()));

        let at_95 = chain.read_at(95, None).unwrap();
        assert_eq!(at_95.value, Some(b"v90".to_vec()));

        let at_85 = chain.read_at(85, None).unwrap();
        assert_eq!(at_85.value, Some(b"v80".to_vec()));

        assert!(chain.read_at(75, None).is_none());
    }

    /// GC removes some old versions but never empties the chain.
    #[test]
    fn test_version_chain_gc() {
        let mut chain = VersionChain::new();

        // Ten versions with commit timestamps 55, 60, ..., 100 and epochs 1..=10.
        for i in (0..10u64).rev() {
            let commit_ts = 100 - i * 5;
            chain.add_uncommitted(
                Some(format!("v{}", commit_ts).into_bytes()),
                i,
                (10 - i) as u32,
            );
            chain.commit(i, commit_ts);
        }

        assert_eq!(chain.len(), 10);

        let reclaimed = chain.gc(7, 75);

        assert!(reclaimed > 0, "Should have reclaimed some versions, got {}", reclaimed);
        assert!(chain.len() >= 1, "Should keep at least one version");
    }

    /// Insert, commit, then read at snapshots after and before the commit.
    #[test]
    fn test_version_store_basic() {
        let store = VersionStore::new();

        store
            .insert_uncommitted(b"key1", Some(b"value1".to_vec()), 1, 1)
            .unwrap();

        assert!(store.commit(b"key1", 1, 100));

        let after_commit = store.get(b"key1", 150, None);
        assert_eq!(after_commit, Some(b"value1".to_vec()));

        let before_commit = store.get(b"key1", 50, None);
        assert!(before_commit.is_none());
    }

    /// A slot can be claimed, re-claimed by its owner, refused to others, and released.
    #[test]
    fn test_reader_slot_claim_release() {
        let slot = ReaderSlot::empty();
        assert!(slot.is_free());

        assert!(slot.try_claim(1234, 100, 1));
        assert!(!slot.is_free());

        // A different pid cannot take an owned slot.
        assert!(!slot.try_claim(5678, 200, 2));

        // The owner may claim its own slot again.
        assert!(slot.try_claim(1234, 300, 3));

        slot.release(1234);
        assert!(slot.is_free());
    }
}