use std::fs::OpenOptions;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use dashmap::DashMap;
use parking_lot::RwLock;
use sochdb_core::version_chain::{
BinarySearchChain, ChainEntry, MvccVersionChain, MvccVersionChainMut, Timestamp, TxnId,
VisibilityContext, WriteConflictDetection,
};
use sochdb_core::{Result, SochDBError};
/// Alias exposing [`VersionChain`] under its `Concurrent*` name.
pub type ConcurrentVersionChain = VersionChain;
/// Alias exposing [`VersionEntry`] under its `Concurrent*` name.
pub type ConcurrentVersionEntry = VersionEntry;
/// Magic number stored in the first field of [`MvccHeader`] and checked by
/// `MvccHeader::validate`. Stored little-endian, the bytes read `SCHMV_CC`.
const MVCC_MAGIC: u64 = 0x43435F564D484353;
/// Metadata format version; `MvccHeader::validate` rejects any other value.
const MVCC_VERSION: u32 = 1;
/// Number of [`ReaderSlot`]s laid out after the header in the metadata file.
const MAX_READERS: usize = 1024;
/// Size in bytes of one [`ReaderSlot`] (asserted by `test_struct_sizes`).
const READER_SLOT_SIZE: usize = 64;
/// Size in bytes reserved for [`MvccHeader`] (asserted by `test_struct_sizes`).
const HEADER_SIZE: usize = 64;
/// Total size in bytes of the metadata file: header followed by all reader slots.
const METADATA_SIZE: usize = HEADER_SIZE + (MAX_READERS * READER_SLOT_SIZE);
/// A claimed reader slot whose last heartbeat is older than this many
/// microseconds (60 seconds) is reported as stale by `ReaderSlot::is_stale`.
const STALE_READER_TIMEOUT_US: u64 = 60_000_000;
/// Number of commits after which `ConcurrentMvcc` considers a GC pass due.
const GC_COMMIT_INTERVAL: u64 = 1000;
/// Hybrid logical clock timestamp packed into a single `u64`.
///
/// The upper 48 bits hold wall-clock milliseconds since the Unix epoch and the
/// lower 16 bits hold a logical counter that disambiguates timestamps issued
/// within the same millisecond. Ordering of the raw value is therefore the
/// ordering of `(physical_ms, logical)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct HlcTimestamp(pub u64);

impl HlcTimestamp {
    /// Packs `physical_ms` and `logical` into a timestamp.
    ///
    /// Bits of `physical_ms` above bit 47 are shifted out.
    #[inline]
    pub fn new(physical_ms: u64, logical: u16) -> Self {
        Self((physical_ms << 16) | u64::from(logical))
    }

    /// Returns the physical (millisecond) component.
    #[inline]
    pub fn physical_ms(&self) -> u64 {
        self.0 >> 16
    }

    /// Returns the logical counter component.
    #[inline]
    pub fn logical(&self) -> u16 {
        (self.0 & 0xFFFF) as u16
    }

    /// Returns the packed 64-bit representation.
    #[inline]
    pub fn raw(&self) -> u64 {
        self.0
    }

    /// Atomically advances `last` and returns the newly issued timestamp.
    ///
    /// If the wall clock is ahead of the physical part stored in `last`, the
    /// new timestamp is `(now, 0)`. Otherwise the stored value is incremented
    /// by one. Incrementing the *packed* value (rather than only the logical
    /// counter) lets an exhausted 16-bit counter carry into the physical part,
    /// so every call returns a value strictly greater than the previous one
    /// instead of repeating `(phys, 0xFFFF)` once the counter saturates.
    ///
    /// A wall clock set before the Unix epoch is treated as time `0`, which
    /// simply forces the increment path instead of panicking.
    pub fn allocate_next(last: &AtomicU64) -> Self {
        let physical_now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_millis() as u64)
            .unwrap_or(0);
        loop {
            let last_val = last.load(Ordering::Acquire);
            let new_val = if physical_now > (last_val >> 16) {
                physical_now << 16
            } else {
                // Only saturates at u64::MAX, i.e. after the 48-bit physical
                // range itself has been exhausted.
                last_val.saturating_add(1)
            };
            if last
                .compare_exchange(last_val, new_val, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return Self(new_val);
            }
            // Another allocator won the race; retry against its value.
            std::hint::spin_loop();
        }
    }

    /// Reads the timestamp currently stored in `ts` without advancing it.
    #[inline]
    pub fn read_current(ts: &AtomicU64) -> Self {
        Self(ts.load(Ordering::Acquire))
    }
}

impl From<u64> for HlcTimestamp {
    /// Wraps an already packed raw value.
    fn from(val: u64) -> Self {
        Self(val)
    }
}

impl From<HlcTimestamp> for u64 {
    /// Unwraps the packed raw value.
    fn from(ts: HlcTimestamp) -> Self {
        ts.0
    }
}
/// One reader registration record.
///
/// The layout is `repr(C)` and 64-byte aligned; `test_struct_sizes` asserts
/// that the struct occupies exactly `READER_SLOT_SIZE` bytes. A `pid` of `0`
/// marks the slot as free.
#[repr(C, align(64))]
#[derive(Debug)]
pub struct ReaderSlot {
    /// Process id of the owner, or `0` when the slot is free.
    pub pid: AtomicU32,
    /// Snapshot timestamp recorded when the slot was claimed.
    pub snapshot_ts: AtomicU64,
    /// Epoch recorded when the slot was claimed.
    pub epoch: AtomicU32,
    /// Time of the last claim or heartbeat, as returned by `current_time_us`.
    pub last_heartbeat: AtomicU64,
    _reserved: [u8; 32],
}

impl ReaderSlot {
    /// Returns a free slot with every field zeroed.
    pub const fn empty() -> Self {
        Self {
            pid: AtomicU32::new(0),
            snapshot_ts: AtomicU64::new(0),
            epoch: AtomicU32::new(0),
            last_heartbeat: AtomicU64::new(0),
            _reserved: [0u8; 32],
        }
    }

    /// Returns `true` when no process owns the slot.
    #[inline]
    pub fn is_free(&self) -> bool {
        0 == self.pid.load(Ordering::Acquire)
    }

    /// Tries to take the slot for `my_pid`.
    ///
    /// Succeeds when the slot is free or already owned by `my_pid`; in both
    /// cases the snapshot, epoch and heartbeat are overwritten. Returns `false`
    /// when another process owns the slot or wins the race for it.
    #[inline]
    pub fn try_claim(&self, my_pid: u32, snapshot_ts: u64, epoch: u32) -> bool {
        let observed = self.pid.load(Ordering::Acquire);
        let claimable = observed == 0 || observed == my_pid;
        if !claimable {
            return false;
        }

        let won = self
            .pid
            .compare_exchange(observed, my_pid, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        if !won {
            return false;
        }

        // Ownership is established; publish the reader's state.
        self.snapshot_ts.store(snapshot_ts, Ordering::Release);
        self.epoch.store(epoch, Ordering::Release);
        self.last_heartbeat
            .store(current_time_us(), Ordering::Release);
        true
    }

    /// Frees the slot if it is currently owned by `my_pid`; otherwise does nothing.
    #[inline]
    pub fn release(&self, my_pid: u32) {
        if self.pid.load(Ordering::Acquire) != my_pid {
            return;
        }
        // Clear the snapshot before the pid so the slot is never observed as
        // free while still carrying this reader's snapshot.
        self.snapshot_ts.store(0, Ordering::Release);
        self.pid.store(0, Ordering::Release);
    }

    /// Records the current time as the slot's last heartbeat.
    #[inline]
    pub fn heartbeat(&self) {
        let now = current_time_us();
        self.last_heartbeat.store(now, Ordering::Release);
    }

    /// Returns `true` when the slot is claimed but its owner looks gone.
    ///
    /// A claimed slot is stale when its heartbeat is more than
    /// `STALE_READER_TIMEOUT_US` older than `now_us`, or when `process_exists`
    /// reports that the owning pid no longer exists. Free slots are never stale.
    #[inline]
    pub fn is_stale(&self, now_us: u64) -> bool {
        let owner = self.pid.load(Ordering::Acquire);
        if owner == 0 {
            return false;
        }
        let idle_us = now_us.saturating_sub(self.last_heartbeat.load(Ordering::Acquire));
        // The process probe only runs when the heartbeat is still fresh.
        idle_us > STALE_READER_TIMEOUT_US || !process_exists(owner)
    }
}
/// Header stored at the start of the MVCC metadata mapping.
///
/// The layout is `repr(C)`; `test_struct_sizes` asserts that it occupies
/// exactly `HEADER_SIZE` bytes.
#[repr(C)]
#[derive(Debug)]
pub struct MvccHeader {
    /// Must equal `MVCC_MAGIC`.
    pub magic: u64,
    /// Must equal `MVCC_VERSION`.
    pub version: u32,
    /// Page size recorded at creation (`4096` for headers built by `new`).
    pub page_size: u32,
    /// Number of reader slots recorded at creation.
    pub num_readers: u32,
    /// Current epoch counter.
    pub current_epoch: AtomicU64,
    /// Last issued HLC timestamp, in packed form.
    pub current_ts: AtomicU64,
    /// Pid of the process holding the writer lock, or `0` when unlocked.
    pub writer_lock: AtomicU32,
    /// Commits counted since the last automatic GC.
    pub commits_since_gc: AtomicU64,
    _reserved: [u8; 4],
}

impl MvccHeader {
    /// Builds a fresh header: epoch `1`, current timestamp `(now, 0)`, writer
    /// lock free and the GC commit counter at zero.
    ///
    /// # Panics
    ///
    /// Panics if the system clock is set before the Unix epoch.
    pub fn new() -> Self {
        let now_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;
        let initial_ts = HlcTimestamp::new(now_ms, 0).raw();

        Self {
            magic: MVCC_MAGIC,
            version: MVCC_VERSION,
            page_size: 4096,
            num_readers: MAX_READERS as u32,
            current_epoch: AtomicU64::new(1),
            current_ts: AtomicU64::new(initial_ts),
            writer_lock: AtomicU32::new(0),
            commits_since_gc: AtomicU64::new(0),
            _reserved: [0u8; 4],
        }
    }

    /// Checks the magic number and format version.
    ///
    /// # Errors
    ///
    /// Returns `SochDBError::Corruption` when either field does not match the
    /// value this build expects.
    pub fn validate(&self) -> Result<()> {
        let found_version = self.version;
        if self.magic != MVCC_MAGIC {
            Err(SochDBError::Corruption(
                "Invalid MVCC metadata magic".into(),
            ))
        } else if found_version != MVCC_VERSION {
            Err(SochDBError::Corruption(format!(
                "Unsupported MVCC version: {} (expected {})",
                found_version, MVCC_VERSION
            )))
        } else {
            Ok(())
        }
    }
}

impl Default for MvccHeader {
    /// Equivalent to [`MvccHeader::new`].
    fn default() -> Self {
        MvccHeader::new()
    }
}
/// A single version of a key's value.
#[derive(Debug, Clone)]
pub struct VersionEntry {
    /// Commit timestamp; `0` while the version is still uncommitted.
    pub commit_ts: u64,
    /// Id of the transaction that wrote this version.
    pub txn_id: u64,
    /// Epoch in which the version was written.
    pub epoch: u32,
    /// Stored value, or `None` when no value is attached to this version.
    pub value: Option<Vec<u8>>,
}

impl VersionEntry {
    /// Assembles an entry from its four components.
    pub fn new(commit_ts: u64, txn_id: u64, epoch: u32, value: Option<Vec<u8>>) -> Self {
        VersionEntry {
            commit_ts,
            txn_id,
            epoch,
            value,
        }
    }

    /// Returns `true` when the version is committed (`commit_ts != 0`) and its
    /// commit timestamp is strictly older than `snapshot_ts`.
    #[inline]
    pub fn is_visible_at(&self, snapshot_ts: u64) -> bool {
        // Half-open range: 1 <= commit_ts < snapshot_ts.
        (1..snapshot_ts).contains(&self.commit_ts)
    }
}
impl ChainEntry for VersionEntry {
#[inline]
fn commit_ts(&self) -> u64 {
self.commit_ts
}
#[inline]
fn txn_id(&self) -> u64 {
self.txn_id
}
#[inline]
fn set_commit_ts(&mut self, ts: u64) {
self.commit_ts = ts;
}
}
/// Version history of a single key.
///
/// Wraps a [`BinarySearchChain`] of [`VersionEntry`] values and adds
/// epoch-aware garbage collection on top of it.
#[derive(Debug, Default)]
pub struct VersionChain {
    inner: BinarySearchChain<VersionEntry>,
}

impl VersionChain {
    /// Creates an empty chain.
    pub fn new() -> Self {
        Self {
            inner: BinarySearchChain::new(),
        }
    }

    /// Installs `value` as the pending (uncommitted, `commit_ts == 0`) write
    /// of `txn_id`, tagged with `epoch`.
    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64, epoch: u32) {
        self.inner.set_uncommitted(VersionEntry {
            commit_ts: 0,
            txn_id,
            epoch,
            value,
        });
    }

    /// Commits the pending write of `txn_id` at `commit_ts`, returning the
    /// underlying chain's result.
    #[inline]
    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
        self.inner.commit(txn_id, commit_ts)
    }

    /// Aborts the pending write of `txn_id`.
    #[inline]
    pub fn abort(&mut self, txn_id: u64) {
        self.inner.abort(txn_id);
    }

    /// Returns the version a reader at `snapshot_ts` resolves to, as decided
    /// by the underlying chain. `current_txn_id` identifies the reading
    /// transaction, if any.
    #[inline]
    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&VersionEntry> {
        self.inner.read_at(snapshot_ts, current_txn_id)
    }

    /// Returns whether `my_txn_id` conflicts with a pending write, as decided
    /// by the underlying chain.
    #[inline]
    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
        self.inner.has_write_conflict(my_txn_id)
    }

    /// Reclaims committed versions that no active reader can still need and
    /// returns how many were removed.
    ///
    /// * `min_epoch` - oldest epoch pinned by an active reader. Versions whose
    ///   epoch is at least this value are retained. `0` disables the epoch
    ///   criterion (every epoch satisfies `>= 0`, so honouring it literally
    ///   would make the call a no-op, which is what the timestamp-only callers
    ///   passing `0` used to get).
    /// * `min_snapshot_ts` - oldest snapshot timestamp of an active reader.
    ///
    /// Retention rules, applied from the front of the committed list:
    ///
    /// 1. the first entry is always kept;
    /// 2. following entries are kept while they are pinned by epoch or have
    ///    `commit_ts >= min_snapshot_ts`;
    /// 3. the first entry with `commit_ts < min_snapshot_ts` is kept as well,
    ///    because it is the version the oldest reader resolves to. Dropping it
    ///    would make the key vanish for that reader.
    ///
    /// NOTE(review): like the original implementation, this assumes
    /// `committed_versions_mut()` is ordered newest-first — confirm against
    /// `BinarySearchChain`.
    pub fn gc(&mut self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
        let versions = self.inner.committed_versions_mut();
        let original_len = versions.len();
        if original_len <= 1 {
            return 0;
        }

        // Rules 1 and 2: the newest entry plus the leading run of pinned ones.
        let kept_by_rule = 1 + versions
            .iter()
            .skip(1)
            .take_while(|v| {
                (min_epoch != 0 && v.epoch >= min_epoch) || v.commit_ts >= min_snapshot_ts
            })
            .count();

        // Rule 3: keep up to and including the newest entry that is older
        // than the oldest active snapshot. If there is none, keep everything.
        let kept_for_oldest_reader = versions
            .iter()
            .position(|v| v.commit_ts < min_snapshot_ts)
            .map_or(original_len, |idx| idx + 1);

        let keep_count = kept_by_rule.max(kept_for_oldest_reader);
        versions.truncate(keep_count);
        original_len - keep_count
    }

    /// Number of versions reported by the underlying chain.
    #[inline]
    pub fn len(&self) -> usize {
        self.inner.version_count()
    }

    /// Returns `true` when the underlying chain reports itself empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}
/// Read-side `MvccVersionChain` view over a [`VersionChain`].
impl MvccVersionChain for VersionChain {
    /// `None` inside the value means the version carries no payload.
    type Value = Option<Vec<u8>>;

    /// Returns the value of the version that `ctx`'s snapshot and reader
    /// transaction resolve to, if any.
    fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value> {
        let reader = Some(ctx.reader_txn_id);
        let entry = self.inner.read_at(ctx.snapshot_ts, reader)?;
        Some(&entry.value)
    }

    /// Returns the value of the entry the underlying chain reports as latest.
    fn get_latest(&self) -> Option<&Self::Value> {
        let newest = self.inner.latest()?;
        Some(&newest.value)
    }

    /// Number of versions reported by the underlying chain.
    fn version_count(&self) -> usize {
        let count = self.inner.version_count();
        count
    }
}
impl MvccVersionChainMut for VersionChain {
fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId) {
self.add_uncommitted(value, txn_id, 0);
}
fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool {
self.inner.commit(txn_id, commit_ts)
}
fn delete_version(&mut self, txn_id: TxnId, _delete_ts: Timestamp) -> bool {
self.add_uncommitted(None, txn_id, 0);
true
}
fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize) {
let removed = self.gc(0, min_visible_ts);
(removed, removed * std::mem::size_of::<VersionEntry>())
}
}
/// Forwards write-conflict detection to the wrapped chain.
impl WriteConflictDetection for VersionChain {
/// Returns the wrapped chain's verdict on whether `txn_id` conflicts with a
/// pending write.
fn has_write_conflict(&self, txn_id: TxnId) -> bool {
self.inner.has_write_conflict(txn_id)
}
}
/// In-memory, concurrently accessible map from key to [`VersionChain`].
pub struct VersionStore {
// One version chain per key.
data: DashMap<Vec<u8>, VersionChain>,
// Counters updated by the store's operations (all with relaxed ordering).
stats: VersionStoreStats,
}
/// Counters maintained by [`VersionStore`].
#[derive(Debug, Default)]
pub struct VersionStoreStats {
/// Incremented each time a chain is created for a previously unseen key.
pub num_keys: AtomicU64,
/// Incremented per uncommitted insert and decremented per abort on an
/// existing key.
pub num_versions: AtomicU64,
/// Number of completed GC passes.
pub gc_passes: AtomicU64,
/// Total versions removed by GC passes.
pub versions_reclaimed: AtomicU64,
}
impl VersionStore {
    /// Creates an empty store with zeroed statistics.
    pub fn new() -> Self {
        Self {
            data: DashMap::new(),
            stats: VersionStoreStats::default(),
        }
    }

    /// Records `value` as the pending write of `txn_id` for `key`, creating
    /// the key's chain if needed. A `None` value records a deletion.
    ///
    /// # Errors
    ///
    /// Returns `SochDBError::Internal` when the chain reports a write conflict
    /// for `txn_id`.
    pub fn insert_uncommitted(
        &self,
        key: &[u8],
        value: Option<Vec<u8>>,
        txn_id: u64,
        epoch: u32,
    ) -> Result<()> {
        let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
            self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
            VersionChain::new()
        });
        if entry.has_write_conflict(txn_id) {
            return Err(SochDBError::Internal(
                "Write conflict: another transaction has uncommitted write".into(),
            ));
        }
        entry.add_uncommitted(value, txn_id, epoch);
        self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    /// Commits the pending write of `txn_id` on `key` at `commit_ts`.
    ///
    /// Returns `false` when the key is unknown; otherwise returns the chain's
    /// commit result.
    pub fn commit(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
        match self.data.get_mut(key) {
            Some(mut entry) => entry.commit(txn_id, commit_ts),
            None => false,
        }
    }

    /// Aborts the pending write of `txn_id` on `key`. Unknown keys are ignored.
    ///
    /// NOTE(review): `num_versions` is decremented whenever the key exists,
    /// whether or not a pending write was actually discarded.
    pub fn abort(&self, key: &[u8], txn_id: u64) {
        if let Some(mut entry) = self.data.get_mut(key) {
            entry.abort(txn_id);
            self.stats.num_versions.fetch_sub(1, Ordering::Relaxed);
        }
    }

    /// Returns a copy of the value of `key` as seen at `snapshot_ts` by
    /// `current_txn_id`.
    ///
    /// Returns `None` when the key is unknown, when no version is visible, or
    /// when the visible version carries no value (a deletion).
    pub fn get(
        &self,
        key: &[u8],
        snapshot_ts: u64,
        current_txn_id: Option<u64>,
    ) -> Option<Vec<u8>> {
        self.data.get(key).and_then(|chain| {
            chain
                .read_at(snapshot_ts, current_txn_id)
                .and_then(|v| v.value.clone())
        })
    }

    /// Returns `true` when `key` has a live value at `snapshot_ts`.
    ///
    /// A visible version without a value (a committed deletion) counts as
    /// absent, so `contains` agrees with `get(..).is_some()` for the same
    /// snapshot. Previously a deleted key was still reported as present.
    pub fn contains(&self, key: &[u8], snapshot_ts: u64) -> bool {
        self.data.get(key).map_or(false, |chain| {
            chain
                .read_at(snapshot_ts, None)
                .map_or(false, |version| version.value.is_some())
        })
    }

    /// Runs [`VersionChain::gc`] on every key and returns the total number of
    /// versions reclaimed. Also updates the `gc_passes` and
    /// `versions_reclaimed` counters.
    pub fn gc(&self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
        let mut total_reclaimed = 0;
        for mut entry in self.data.iter_mut() {
            total_reclaimed += entry.gc(min_epoch, min_snapshot_ts);
        }
        self.stats.gc_passes.fetch_add(1, Ordering::Relaxed);
        self.stats
            .versions_reclaimed
            .fetch_add(total_reclaimed as u64, Ordering::Relaxed);
        total_reclaimed
    }

    /// Number of keys that have a version chain.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns `true` when no key has a version chain.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Returns the store's counters.
    pub fn stats(&self) -> &VersionStoreStats {
        &self.stats
    }
}

impl Default for VersionStore {
    /// Equivalent to [`VersionStore::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl sochdb_core::version_chain::MvccStore for VersionStore {
fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
self.get(key, snapshot_ts, txn_id)
}
fn mvcc_put(
&self,
key: &[u8],
value: Option<Vec<u8>>,
txn_id: u64,
) -> std::result::Result<(), sochdb_core::version_chain::MvccStoreError> {
let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
VersionChain::new()
});
if entry.has_write_conflict(txn_id) {
return Err(sochdb_core::version_chain::MvccStoreError::WriteConflict);
}
entry.add_uncommitted(value, txn_id, 0);
self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
Ok(())
}
fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
self.commit(key, txn_id, commit_ts)
}
fn mvcc_abort_key(&self, key: &[u8], txn_id: u64) {
self.abort(key, txn_id);
}
fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool {
self.data
.get(key)
.map(|chain| chain.has_write_conflict(txn_id))
.unwrap_or(false)
}
fn mvcc_gc(&self, min_ts: u64) -> sochdb_core::version_chain::MvccGcStats {
let mut stats = sochdb_core::version_chain::MvccGcStats::default();
for mut entry in self.data.iter_mut() {
stats.keys_scanned += 1;
let removed = entry.gc(0, min_ts);
stats.versions_removed += removed;
}
self.stats.gc_passes.fetch_add(1, Ordering::Relaxed);
self.stats
.versions_reclaimed
.fetch_add(stats.versions_removed as u64, Ordering::Relaxed);
stats
}
fn mvcc_key_count(&self) -> usize {
self.len()
}
}
/// MVCC coordinator backed by a memory-mapped metadata file (header plus
/// reader slots) and an in-memory [`VersionStore`].
pub struct ConcurrentMvcc {
// Directory passed to `open`; the metadata file is created inside it.
path: PathBuf,
// Keeps the mapping alive; `header` and `reader_slots_ptr` point into it.
_mmap: memmap2::MmapMut,
// Start of the mapping, viewed as the header.
header: *const MvccHeader,
// First reader slot, `HEADER_SIZE` bytes into the mapping.
reader_slots_ptr: *const ReaderSlot,
// Number of reader slots addressable through `reader_slots_ptr`.
num_reader_slots: usize,
// Per-process version chains (created fresh by `open`).
version_store: VersionStore,
// Pid of this process, used to tag reader slots and the writer lock.
our_pid: u32,
// Index of the reader slot most recently registered through this handle.
our_slot: RwLock<Option<usize>>,
}
// SAFETY: NOTE(review) — the raw pointers target memory owned by `_mmap`,
// which lives as long as this struct, and the code in this file only reads
// and writes the mapped header/slots through their atomic fields after
// `open` returns. Confirm no non-atomic shared mutation is added elsewhere.
unsafe impl Send for ConcurrentMvcc {}
unsafe impl Sync for ConcurrentMvcc {}
impl ConcurrentMvcc {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref().to_path_buf();
std::fs::create_dir_all(&path)?;
let metadata_path = path.join(".mvcc_metadata");
let is_new = !metadata_path.exists();
let file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.open(&metadata_path)?;
let required_size = METADATA_SIZE as u64;
if file.metadata()?.len() < required_size {
file.set_len(required_size)?;
}
let mut mmap = unsafe { memmap2::MmapMut::map_mut(&file)? };
if is_new || mmap.len() < METADATA_SIZE {
let header = MvccHeader::new();
let header_bytes = unsafe {
std::slice::from_raw_parts(
&header as *const MvccHeader as *const u8,
std::mem::size_of::<MvccHeader>(),
)
};
mmap[..header_bytes.len()].copy_from_slice(header_bytes);
for i in 0..MAX_READERS {
let offset = HEADER_SIZE + i * READER_SLOT_SIZE;
let end = offset + READER_SLOT_SIZE;
if end <= mmap.len() {
mmap[offset..end].fill(0);
}
}
mmap.flush()?;
} else {
let header_ref = unsafe { &*(mmap.as_ptr() as *const MvccHeader) };
header_ref.validate()?;
}
let header = mmap.as_ptr() as *const MvccHeader;
let reader_slots_ptr = unsafe { mmap.as_ptr().add(HEADER_SIZE) as *const ReaderSlot };
Ok(Self {
path,
_mmap: mmap,
header,
reader_slots_ptr,
num_reader_slots: MAX_READERS,
version_store: VersionStore::new(),
our_pid: std::process::id(),
our_slot: RwLock::new(None),
})
}
#[inline]
fn header(&self) -> &MvccHeader {
unsafe { &*self.header }
}
#[inline]
fn reader_slot(&self, idx: usize) -> &ReaderSlot {
assert!(idx < self.num_reader_slots);
unsafe { &*self.reader_slots_ptr.add(idx) }
}
#[inline]
pub fn allocate_timestamp(&self) -> HlcTimestamp {
HlcTimestamp::allocate_next(&self.header().current_ts)
}
#[inline]
pub fn current_timestamp(&self) -> HlcTimestamp {
HlcTimestamp::read_current(&self.header().current_ts)
}
#[inline]
pub fn current_epoch(&self) -> u64 {
self.header().current_epoch.load(Ordering::Acquire)
}
pub fn register_reader(&self) -> Result<usize> {
let snapshot_ts = self.current_timestamp().raw();
let epoch = self.current_epoch() as u32;
for i in 0..self.num_reader_slots {
let slot = self.reader_slot(i);
if slot.try_claim(self.our_pid, snapshot_ts, epoch) {
*self.our_slot.write() = Some(i);
return Ok(i);
}
}
Err(SochDBError::ResourceExhausted(
"Too many concurrent readers".into(),
))
}
pub fn unregister_reader(&self, slot_idx: usize) {
if slot_idx < self.num_reader_slots {
self.reader_slot(slot_idx).release(self.our_pid);
*self.our_slot.write() = None;
}
}
pub fn try_acquire_writer(&self) -> Result<WriterGuard<'_>> {
let current = self.header().writer_lock.load(Ordering::Acquire);
if current == 0 {
if self
.header()
.writer_lock
.compare_exchange(0, self.our_pid, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
return Ok(WriterGuard { mvcc: self });
}
} else if current == self.our_pid {
return Ok(WriterGuard { mvcc: self });
}
Err(SochDBError::LockError(format!(
"Writer lock held by process {}",
current
)))
}
pub fn acquire_writer(&self, timeout: Duration) -> Result<WriterGuard<'_>> {
let deadline = std::time::Instant::now() + timeout;
loop {
match self.try_acquire_writer() {
Ok(guard) => return Ok(guard),
Err(_) if std::time::Instant::now() < deadline => {
std::thread::sleep(Duration::from_micros(100));
}
Err(e) => return Err(e),
}
}
}
fn release_writer(&self) {
let current = self.header().writer_lock.load(Ordering::Acquire);
if current == self.our_pid {
self.header().writer_lock.store(0, Ordering::Release);
}
}
pub fn version_store(&self) -> &VersionStore {
&self.version_store
}
pub fn min_active_snapshot(&self) -> u64 {
let mut min_ts = u64::MAX;
for i in 0..self.num_reader_slots {
let slot = self.reader_slot(i);
let pid = slot.pid.load(Ordering::Acquire);
if pid != 0 {
let ts = slot.snapshot_ts.load(Ordering::Acquire);
if ts > 0 && ts < min_ts {
min_ts = ts;
}
}
}
min_ts
}
pub fn min_active_epoch(&self) -> u32 {
let mut min_epoch = u32::MAX;
for i in 0..self.num_reader_slots {
let slot = self.reader_slot(i);
let pid = slot.pid.load(Ordering::Acquire);
if pid != 0 {
let epoch = slot.epoch.load(Ordering::Acquire);
if epoch < min_epoch {
min_epoch = epoch;
}
}
}
if min_epoch == u32::MAX {
self.current_epoch() as u32
} else {
min_epoch
}
}
pub fn run_gc(&self) -> usize {
let min_epoch = self.min_active_epoch();
let min_snapshot = self.min_active_snapshot();
self.version_store.gc(min_epoch, min_snapshot)
}
pub fn should_run_gc(&self) -> bool {
self.header().commits_since_gc.load(Ordering::Relaxed) >= GC_COMMIT_INTERVAL
}
pub fn on_commit(&self) {
let count = self
.header()
.commits_since_gc
.fetch_add(1, Ordering::Relaxed);
if count >= GC_COMMIT_INTERVAL {
self.header().commits_since_gc.store(0, Ordering::Relaxed);
let _ = self.run_gc();
}
}
pub fn cleanup_stale_readers(&self) -> usize {
let now = current_time_us();
let mut cleaned = 0;
for i in 0..self.num_reader_slots {
let slot = self.reader_slot(i);
if slot.is_stale(now) {
slot.pid.store(0, Ordering::Release);
cleaned += 1;
}
}
cleaned
}
pub fn advance_epoch(&self) -> u64 {
self.header().current_epoch.fetch_add(1, Ordering::AcqRel) + 1
}
}
impl Drop for ConcurrentMvcc {
    /// Releases the remembered reader slot (if any) and the writer lock (if
    /// this process holds it).
    fn drop(&mut self) {
        // Copy the slot index out in its own statement so the read guard is
        // dropped before `unregister_reader` takes the write lock. With the
        // guard living in an `if let` scrutinee, the write lock was requested
        // while this thread still held the read lock, which deadlocks on a
        // non-reentrant `parking_lot::RwLock`.
        let registered_slot = *self.our_slot.read();
        if let Some(slot_idx) = registered_slot {
            self.unregister_reader(slot_idx);
        }
        self.release_writer();
    }
}
pub struct WriterGuard<'a> {
mvcc: &'a ConcurrentMvcc,
}
impl<'a> Drop for WriterGuard<'a> {
fn drop(&mut self) {
self.mvcc.release_writer();
}
}
/// Returns the wall-clock time in microseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0` instead of panicking, and a value
/// that does not fit in `u64` saturates at `u64::MAX` instead of being
/// silently truncated.
#[inline]
fn current_time_us() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_micros()).unwrap_or(u64::MAX)
        })
}
/// Reports whether a process with id `pid` currently exists (Unix).
///
/// Uses `kill(pid, 0)`, which performs the existence/permission check without
/// delivering a signal. Any failure other than `ESRCH` (for example `EPERM`)
/// is treated as "exists".
///
/// Values that are not a valid positive `pid_t` return `false`. Passing them
/// to `kill` would address a process group (`0` or negative values) rather
/// than a single process, so a corrupted pid such as `u32::MAX` would
/// otherwise be reported as alive.
#[cfg(unix)]
fn process_exists(pid: u32) -> bool {
    let target = match libc::pid_t::try_from(pid) {
        Ok(valid) if valid > 0 => valid,
        _ => return false,
    };
    // SAFETY: `kill` with signal 0 has no side effects on the target; it only
    // reports whether the process can be signalled.
    if unsafe { libc::kill(target, 0) } == 0 {
        return true;
    }
    std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
}
/// Reports whether a process with id `pid` can currently be opened (Windows).
///
/// Returns `false` when `OpenProcess` yields a null handle; otherwise closes
/// the handle again and returns `true`.
#[cfg(windows)]
fn process_exists(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION};

    // SAFETY: plain Win32 call taking only scalar arguments.
    let handle = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) };
    if handle == 0 {
        return false;
    }
    // SAFETY: `handle` was just returned by a successful `OpenProcess` and is
    // closed exactly once.
    unsafe {
        CloseHandle(handle);
    }
    true
}
/// Fallback for platforms that are neither Unix nor Windows: no liveness probe
/// is available, so every pid is reported as existing.
#[cfg(not(any(unix, windows)))]
fn process_exists(_pid: u32) -> bool {
true }
// Unit tests for the timestamp, version chain, version store and reader slot.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::thread;
// The mapped layout relies on these sizes matching the layout constants.
#[test]
fn test_struct_sizes() {
eprintln!("MvccHeader size: {}", std::mem::size_of::<MvccHeader>());
eprintln!("MvccHeader align: {}", std::mem::align_of::<MvccHeader>());
eprintln!("ReaderSlot size: {}", std::mem::size_of::<ReaderSlot>());
eprintln!("ReaderSlot align: {}", std::mem::align_of::<ReaderSlot>());
eprintln!("HEADER_SIZE constant: {}", HEADER_SIZE);
eprintln!("READER_SLOT_SIZE constant: {}", READER_SLOT_SIZE);
eprintln!("METADATA_SIZE constant: {}", METADATA_SIZE);
assert_eq!(
std::mem::size_of::<MvccHeader>(),
HEADER_SIZE,
"MvccHeader size mismatch! Actual: {}, Expected: {}",
std::mem::size_of::<MvccHeader>(),
HEADER_SIZE
);
assert_eq!(
std::mem::size_of::<ReaderSlot>(),
READER_SLOT_SIZE,
"ReaderSlot size mismatch! Actual: {}, Expected: {}",
std::mem::size_of::<ReaderSlot>(),
READER_SLOT_SIZE
);
}
// Consecutive allocations from one clock must be strictly increasing.
#[test]
fn test_hlc_timestamp_ordering() {
let ts = AtomicU64::new(0);
let t1 = HlcTimestamp::allocate_next(&ts);
let t2 = HlcTimestamp::allocate_next(&ts);
let t3 = HlcTimestamp::allocate_next(&ts);
assert!(t1.raw() < t2.raw());
assert!(t2.raw() < t3.raw());
}
// Eight threads allocating 1000 timestamps each must never see a duplicate.
#[test]
fn test_hlc_timestamp_concurrent() {
let ts = Arc::new(AtomicU64::new(0));
let mut handles = vec![];
for _ in 0..8 {
let ts = ts.clone();
handles.push(thread::spawn(move || {
let mut timestamps = vec![];
for _ in 0..1000 {
timestamps.push(HlcTimestamp::allocate_next(&ts).raw());
}
timestamps
}));
}
let mut all_ts: Vec<u64> = handles
.into_iter()
.flat_map(|h| h.join().unwrap())
.collect();
all_ts.sort();
let len_before = all_ts.len();
all_ts.dedup();
assert_eq!(all_ts.len(), len_before, "Duplicate timestamps found!");
}
// Versions committed at 80, 90 and 100: each snapshot must resolve to the
// newest version committed before it, and to nothing before the first commit.
#[test]
fn test_version_chain_read_at() {
let mut chain = VersionChain::new();
chain.add_uncommitted(Some(b"v80".to_vec()), 3, 1);
chain.commit(3, 80);
chain.add_uncommitted(Some(b"v90".to_vec()), 2, 1);
chain.commit(2, 90);
chain.add_uncommitted(Some(b"v100".to_vec()), 1, 1);
chain.commit(1, 100);
let v = chain.read_at(105, None).unwrap();
assert_eq!(v.value, Some(b"v100".to_vec()));
let v = chain.read_at(95, None).unwrap();
assert_eq!(v.value, Some(b"v90".to_vec()));
let v = chain.read_at(85, None).unwrap();
assert_eq!(v.value, Some(b"v80".to_vec()));
assert!(chain.read_at(75, None).is_none());
}
// Ten versions with commit timestamps 55..=100 (step 5) and epochs 1..=10;
// GC with min_epoch 7 and min_snapshot 75 must reclaim something but keep
// at least one version.
#[test]
fn test_version_chain_gc() {
let mut chain = VersionChain::new();
for i in (0..10u64).rev() {
chain.add_uncommitted(
Some(format!("v{}", 100 - i * 5).into_bytes()),
i,
(10 - i) as u32,
);
chain.commit(i, 100 - i * 5);
}
assert_eq!(chain.len(), 10);
let reclaimed = chain.gc(7, 75);
assert!(
reclaimed > 0,
"Should have reclaimed some versions, got {}",
reclaimed
);
assert!(chain.len() >= 1, "Should keep at least one version");
}
// A value committed at 100 is readable at snapshot 150 but not at 50.
#[test]
fn test_version_store_basic() {
let store = VersionStore::new();
store
.insert_uncommitted(b"key1", Some(b"value1".to_vec()), 1, 1)
.unwrap();
assert!(store.commit(b"key1", 1, 100));
let value = store.get(b"key1", 150, None);
assert_eq!(value, Some(b"value1".to_vec()));
let value = store.get(b"key1", 50, None);
assert!(value.is_none());
}
// A slot can be claimed when free, re-claimed by its owner, not claimed by
// another pid, and becomes free again after release.
#[test]
fn test_reader_slot_claim_release() {
let slot = ReaderSlot::empty();
assert!(slot.is_free());
assert!(slot.try_claim(1234, 100, 1));
assert!(!slot.is_free());
assert!(!slot.try_claim(5678, 200, 2));
assert!(slot.try_claim(1234, 300, 3));
slot.release(1234);
assert!(slot.is_free());
}
}