use std::fs::{File, OpenOptions};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use dashmap::DashMap;
use parking_lot::RwLock;
use sochdb_core::{Result, SochDBError};
/// Alias of [`VersionChain`].
pub type ConcurrentVersionChain = VersionChain;
/// Alias of [`VersionEntry`].
pub type ConcurrentVersionEntry = VersionEntry;
/// Magic number stored as the first field of the persisted `MvccHeader`; checked by
/// `MvccHeader::validate`.
const MVCC_MAGIC: u64 = 0x43435F564D484353;
/// Metadata format version; `MvccHeader::validate` rejects any other value.
const MVCC_VERSION: u32 = 1;
/// Number of reader slots allocated by `ConcurrentMvcc::open` and recorded in the header.
const MAX_READERS: usize = 1024;
/// Intended size in bytes of one `ReaderSlot`.
/// NOTE(review): nothing here checks this against `size_of::<ReaderSlot>()` — verify they agree.
const READER_SLOT_SIZE: usize = 64;
/// Bytes reserved for the header when computing `METADATA_SIZE`.
const HEADER_SIZE: usize = 64;
/// Header plus all reader slots, in bytes.
const METADATA_SIZE: usize = HEADER_SIZE + (MAX_READERS * READER_SLOT_SIZE);
/// A reader whose last heartbeat is older than this many microseconds (60 s) is stale.
const STALE_READER_TIMEOUT_US: u64 = 60_000_000;
/// Number of commits between automatic GC passes triggered from `ConcurrentMvcc::on_commit`.
const GC_COMMIT_INTERVAL: u64 = 1000;
/// Hybrid logical clock timestamp packed into one `u64`: wall-clock milliseconds since the
/// Unix epoch in the upper 48 bits and a 16-bit logical counter in the lower 16 bits.
///
/// The derived ordering compares the packed `u64`, i.e. physical time first, then logical.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct HlcTimestamp(pub u64);

impl HlcTimestamp {
    /// Packs `physical_ms` and `logical` into a timestamp.
    ///
    /// Only the low 48 bits of `physical_ms` survive the packing.
    #[inline]
    pub fn new(physical_ms: u64, logical: u16) -> Self {
        Self((physical_ms << 16) | u64::from(logical))
    }

    /// Wall-clock component (milliseconds since the Unix epoch).
    #[inline]
    pub fn physical_ms(&self) -> u64 {
        self.0 >> 16
    }

    /// Logical counter component.
    #[inline]
    pub fn logical(&self) -> u16 {
        (self.0 & 0xFFFF) as u16
    }

    /// The packed `u64` representation.
    #[inline]
    pub fn raw(&self) -> u64 {
        self.0
    }

    /// Allocates a timestamp greater than the value currently stored in `last` and
    /// publishes it there through a CAS loop. Concurrent callers sharing `last` therefore
    /// never receive the same value.
    ///
    /// If wall-clock time is past `last`'s physical part, the result is `(now, 0)`.
    /// Otherwise the logical counter is incremented. When the counter is already
    /// `u16::MAX`, the physical part advances by one millisecond and the counter restarts
    /// at 0. Saturating instead would hand out the same timestamp again.
    ///
    /// # Panics
    /// Panics if the system clock reports a time before the Unix epoch.
    pub fn allocate_next(last: &AtomicU64) -> Self {
        let physical_now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;
        loop {
            let last_val = last.load(Ordering::Acquire);
            let prev = Self(last_val);
            let (new_phys, new_log) = if physical_now > prev.physical_ms() {
                // Wall clock moved forward: restart the logical counter.
                (physical_now, 0)
            } else if prev.logical() == u16::MAX {
                // Logical counter exhausted for this millisecond: borrow the next one.
                (prev.physical_ms() + 1, 0)
            } else {
                // Same (or lagging) millisecond: bump the logical counter.
                (prev.physical_ms(), prev.logical() + 1)
            };
            let next = Self::new(new_phys, new_log);
            if last
                .compare_exchange(last_val, next.0, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return next;
            }
            std::hint::spin_loop();
        }
    }

    /// Reads the most recently published timestamp from `ts` without allocating a new one.
    #[inline]
    pub fn read_current(ts: &AtomicU64) -> Self {
        Self(ts.load(Ordering::Acquire))
    }
}

impl From<u64> for HlcTimestamp {
    fn from(val: u64) -> Self {
        Self(val)
    }
}

impl From<HlcTimestamp> for u64 {
    fn from(ts: HlcTimestamp) -> Self {
        ts.0
    }
}
/// Registration record for one active reader.
///
/// Fields are declared largest-first so the `repr(C)` layout has no interior padding:
/// 8 + 8 + 4 + 4 + 36 = 60 bytes, which `align(64)` rounds up to exactly one 64-byte slot.
/// Interleaving the 4-byte and 8-byte fields would insert padding and push the size past
/// 64, doubling each slot to 128 bytes.
#[repr(C, align(64))]
#[derive(Debug)]
pub struct ReaderSlot {
    /// Snapshot timestamp the reader reads at; 0 when not set.
    pub snapshot_ts: AtomicU64,
    /// Microsecond wall-clock time of the last claim/heartbeat.
    pub last_heartbeat: AtomicU64,
    /// Owning process id; 0 means the slot is free.
    pub pid: AtomicU32,
    /// Epoch recorded when the slot was claimed.
    pub epoch: AtomicU32,
    _reserved: [u8; 36],
}

// One slot must stay exactly 64 bytes (keep in sync with `READER_SLOT_SIZE`).
const _: () = assert!(std::mem::size_of::<ReaderSlot>() == 64);
impl ReaderSlot {
    /// A free slot: no owner, and every timestamp/counter zeroed.
    pub const fn empty() -> Self {
        Self {
            pid: AtomicU32::new(0),
            snapshot_ts: AtomicU64::new(0),
            epoch: AtomicU32::new(0),
            last_heartbeat: AtomicU64::new(0),
            _reserved: [0u8; 36],
        }
    }

    /// True when no process owns this slot (`pid == 0`).
    #[inline]
    pub fn is_free(&self) -> bool {
        matches!(self.pid.load(Ordering::Acquire), 0)
    }

    /// Tries to take ownership of the slot for `my_pid`.
    ///
    /// The slot may be claimed when it is free or already owned by `my_pid`. On success,
    /// records `snapshot_ts`, `epoch` and a fresh heartbeat and returns `true`. It returns
    /// `false` if another pid owns the slot or the ownership CAS loses a race.
    #[inline]
    pub fn try_claim(&self, my_pid: u32, snapshot_ts: u64, epoch: u32) -> bool {
        let observed = self.pid.load(Ordering::Acquire);
        let claimable = observed == 0 || observed == my_pid;
        if !claimable {
            return false;
        }
        let won = self
            .pid
            .compare_exchange(observed, my_pid, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        if won {
            self.snapshot_ts.store(snapshot_ts, Ordering::Release);
            self.epoch.store(epoch, Ordering::Release);
            self.last_heartbeat.store(current_time_us(), Ordering::Release);
        }
        won
    }

    /// Frees the slot, but only if it is currently owned by `my_pid`. The snapshot is
    /// cleared before ownership is dropped.
    #[inline]
    pub fn release(&self, my_pid: u32) {
        if self.pid.load(Ordering::Acquire) != my_pid {
            return;
        }
        self.snapshot_ts.store(0, Ordering::Release);
        self.pid.store(0, Ordering::Release);
    }

    /// Refreshes `last_heartbeat` with the current wall-clock time in microseconds.
    #[inline]
    pub fn heartbeat(&self) {
        let now = current_time_us();
        self.last_heartbeat.store(now, Ordering::Release);
    }

    /// Reports whether an owned slot should be reclaimed.
    ///
    /// This is the case when its heartbeat is older than `STALE_READER_TIMEOUT_US`, or when
    /// the owning process no longer exists. A free slot is never stale.
    #[inline]
    pub fn is_stale(&self, now_us: u64) -> bool {
        match self.pid.load(Ordering::Acquire) {
            0 => false,
            owner => {
                let silent_for =
                    now_us.saturating_sub(self.last_heartbeat.load(Ordering::Acquire));
                // Short-circuit: the (syscall-backed) liveness probe only runs when the
                // heartbeat is still fresh.
                silent_for > STALE_READER_TIMEOUT_US || !process_exists(owner)
            }
        }
    }
}
/// MVCC metadata header.
///
/// `ConcurrentMvcc::save_header` / `load_header` write and read this struct as its raw
/// `repr(C)` bytes in `.mvcc_metadata`. Field order, types and sizes therefore define the
/// on-disk format: do not reorder or resize fields without bumping `MVCC_VERSION`.
#[repr(C)]
#[derive(Debug)]
pub struct MvccHeader {
/// Must equal `MVCC_MAGIC` (checked by `validate`).
pub magic: u64,
/// Must equal `MVCC_VERSION` (checked by `validate`).
pub version: u32,
/// Initialised to 4096 by `new`.
pub page_size: u32,
/// Number of reader slots; initialised to `MAX_READERS`.
pub num_readers: u32,
/// Current epoch; starts at 1 and is incremented by `ConcurrentMvcc::advance_epoch`.
pub current_epoch: AtomicU64,
/// Most recently allocated packed `HlcTimestamp`.
pub current_ts: AtomicU64,
/// Pid of the process holding the writer lock, or 0 when free.
pub writer_lock: AtomicU32,
/// Commits counted towards the next automatic GC pass.
pub commits_since_gc: AtomicU64,
_reserved: [u8; 4],
}
impl MvccHeader {
pub fn new() -> Self {
Self {
magic: MVCC_MAGIC,
version: MVCC_VERSION,
page_size: 4096,
num_readers: MAX_READERS as u32,
current_epoch: AtomicU64::new(1),
current_ts: AtomicU64::new(HlcTimestamp::new(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
0,
).raw()),
writer_lock: AtomicU32::new(0),
commits_since_gc: AtomicU64::new(0),
_reserved: [0u8; 4],
}
}
pub fn validate(&self) -> Result<()> {
if self.magic != MVCC_MAGIC {
return Err(SochDBError::Corruption(
"Invalid MVCC metadata magic".into(),
));
}
if self.version != MVCC_VERSION {
return Err(SochDBError::Corruption(format!(
"Unsupported MVCC version: {} (expected {})",
self.version, MVCC_VERSION
)));
}
Ok(())
}
}
impl Default for MvccHeader {
fn default() -> Self {
Self::new()
}
}
/// One version of a key's value.
///
/// `commit_ts == 0` marks a version that has not been committed. `value` is stored as-is,
/// and `None` is a legal value.
#[derive(Debug, Clone)]
pub struct VersionEntry {
    pub commit_ts: u64,
    pub txn_id: u64,
    pub epoch: u32,
    pub value: Option<Vec<u8>>,
}

impl VersionEntry {
    pub fn new(commit_ts: u64, txn_id: u64, epoch: u32, value: Option<Vec<u8>>) -> Self {
        Self {
            commit_ts,
            txn_id,
            epoch,
            value,
        }
    }

    /// True if this version is committed and strictly older than `snapshot_ts`.
    #[inline]
    pub fn is_visible_at(&self, snapshot_ts: u64) -> bool {
        self.commit_ts > 0 && self.commit_ts < snapshot_ts
    }
}

/// All versions of one key.
///
/// Committed versions are kept sorted newest-first by `commit_ts`. The chain also holds at
/// most one pending (uncommitted) version.
#[derive(Debug, Default)]
pub struct VersionChain {
    versions: Vec<VersionEntry>,
    uncommitted: Option<VersionEntry>,
}

impl VersionChain {
    pub fn new() -> Self {
        Self {
            versions: Vec::new(),
            uncommitted: None,
        }
    }

    /// Sets the pending version to `value` for `txn_id`, replacing any existing pending
    /// version.
    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64, epoch: u32) {
        self.uncommitted = Some(VersionEntry {
            commit_ts: 0,
            txn_id,
            epoch,
            value,
        });
    }

    /// Commits the pending version if it belongs to `txn_id`.
    ///
    /// The version is inserted at its `commit_ts` position in the newest-first list. Returns
    /// `false`, leaving the chain unchanged, when there is no pending version from `txn_id`.
    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
        match self.uncommitted.take() {
            Some(mut pending) if pending.txn_id == txn_id => {
                pending.commit_ts = commit_ts;
                let pos = self
                    .versions
                    .partition_point(|existing| existing.commit_ts > commit_ts);
                self.versions.insert(pos, pending);
                true
            }
            other => {
                self.uncommitted = other;
                false
            }
        }
    }

    /// Discards the pending version if it belongs to `txn_id`; otherwise does nothing.
    pub fn abort(&mut self, txn_id: u64) {
        if self.uncommitted.as_ref().map_or(false, |v| v.txn_id == txn_id) {
            self.uncommitted = None;
        }
    }

    /// Returns the version a reader should see.
    ///
    /// If `current_txn_id` owns the pending version, that version is returned. Otherwise the
    /// result is the newest committed version with `commit_ts < snapshot_ts`, or `None`.
    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&VersionEntry> {
        if let (Some(txn_id), Some(pending)) = (current_txn_id, self.uncommitted.as_ref()) {
            if pending.txn_id == txn_id {
                return Some(pending);
            }
        }
        let idx = self
            .versions
            .partition_point(|v| v.commit_ts >= snapshot_ts);
        self.versions.get(idx)
    }

    /// True if a transaction other than `my_txn_id` holds the pending version.
    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
        self.uncommitted
            .as_ref()
            .map_or(false, |v| v.txn_id != my_txn_id)
    }

    /// Drops committed versions that no active reader can observe and returns how many
    /// were removed.
    ///
    /// The newest version is always kept. So is the leading run of versions that are
    /// "young" (`epoch >= min_epoch` or `commit_ts >= min_snapshot_ts`). The newest version
    /// with `commit_ts < min_snapshot_ts` is also kept, because it is exactly what a reader
    /// at the oldest active snapshot sees through `read_at`, even when it fails the "young"
    /// test.
    pub fn gc(&mut self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
        let original_len = self.versions.len();
        if original_len <= 1 {
            return 0;
        }
        let young_prefix = 1 + self.versions[1..]
            .iter()
            .take_while(|v| v.epoch >= min_epoch || v.commit_ts >= min_snapshot_ts)
            .count();
        // Index of the version visible at `min_snapshot_ts` (== len if none is).
        let oldest_reader_view = self
            .versions
            .partition_point(|v| v.commit_ts >= min_snapshot_ts);
        let keep = young_prefix.max((oldest_reader_view + 1).min(original_len));
        self.versions.truncate(keep);
        original_len - keep
    }

    /// Committed versions plus the pending version, if any.
    pub fn len(&self) -> usize {
        self.versions.len() + usize::from(self.uncommitted.is_some())
    }

    pub fn is_empty(&self) -> bool {
        self.versions.is_empty() && self.uncommitted.is_none()
    }
}
/// Concurrent multi-version key/value store: one [`VersionChain`] per key in a `DashMap`.
pub struct VersionStore {
    data: DashMap<Vec<u8>, VersionChain>,
    stats: VersionStoreStats,
}

/// Counters maintained by [`VersionStore`].
///
/// All updates use `Relaxed` ordering, so values read while other threads are writing are
/// approximate.
#[derive(Debug, Default)]
pub struct VersionStoreStats {
    /// Keys inserted into the store; entries are never removed by the store.
    pub num_keys: AtomicU64,
    /// Versions currently held across all chains (committed plus pending).
    pub num_versions: AtomicU64,
    /// Number of `gc` passes run.
    pub gc_passes: AtomicU64,
    /// Total versions removed by `gc`.
    pub versions_reclaimed: AtomicU64,
}

impl VersionStore {
    pub fn new() -> Self {
        Self {
            data: DashMap::new(),
            stats: VersionStoreStats::default(),
        }
    }

    /// Records a pending write of `value` to `key` for `txn_id`.
    ///
    /// Any earlier pending write by the same transaction to the same key is replaced.
    ///
    /// # Errors
    /// `SochDBError::Internal` if another transaction already has a pending write on `key`.
    pub fn insert_uncommitted(
        &self,
        key: &[u8],
        value: Option<Vec<u8>>,
        txn_id: u64,
        epoch: u32,
    ) -> Result<()> {
        let mut chain = self.data.entry(key.to_vec()).or_insert_with(|| {
            self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
            VersionChain::new()
        });
        if chain.has_write_conflict(txn_id) {
            return Err(SochDBError::Internal(
                "Write conflict: another transaction has uncommitted write".into(),
            ));
        }
        let versions_before = chain.len();
        chain.add_uncommitted(value, txn_id, epoch);
        // A repeated write by the same txn replaces its pending version instead of adding one.
        if chain.len() > versions_before {
            self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
        }
        Ok(())
    }

    /// Commits `txn_id`'s pending write on `key` at `commit_ts`. Returns `false` if the key
    /// is unknown or has no pending write from `txn_id`.
    pub fn commit(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
        self.data
            .get_mut(key)
            .map_or(false, |mut chain| chain.commit(txn_id, commit_ts))
    }

    /// Discards `txn_id`'s pending write on `key`, if present.
    pub fn abort(&self, key: &[u8], txn_id: u64) {
        if let Some(mut chain) = self.data.get_mut(key) {
            let versions_before = chain.len();
            chain.abort(txn_id);
            // Only count a version as gone if the abort actually removed one; otherwise an
            // unmatched abort would drift (and could wrap) the counter.
            if chain.len() < versions_before {
                self.stats.num_versions.fetch_sub(1, Ordering::Relaxed);
            }
        }
    }

    /// Returns the value visible at `snapshot_ts` (or the caller's own pending write when
    /// `current_txn_id` matches it). A version whose value is `None` yields `None`.
    pub fn get(&self, key: &[u8], snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<Vec<u8>> {
        self.data.get(key).and_then(|chain| {
            chain
                .read_at(snapshot_ts, current_txn_id)
                .and_then(|v| v.value.clone())
        })
    }

    /// True if `get(key, snapshot_ts, None)` would return a value.
    ///
    /// A visible version whose value is `None` counts as absent.
    pub fn contains(&self, key: &[u8], snapshot_ts: u64) -> bool {
        self.data.get(key).map_or(false, |chain| {
            chain
                .read_at(snapshot_ts, None)
                .map_or(false, |v| v.value.is_some())
        })
    }

    /// Runs [`VersionChain::gc`] on every chain and returns the total number of versions
    /// removed, updating the GC and version counters.
    pub fn gc(&self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
        let total_reclaimed: usize = self
            .data
            .iter_mut()
            .map(|mut chain| chain.gc(min_epoch, min_snapshot_ts))
            .sum();
        let reclaimed = total_reclaimed as u64;
        self.stats.gc_passes.fetch_add(1, Ordering::Relaxed);
        self.stats
            .versions_reclaimed
            .fetch_add(reclaimed, Ordering::Relaxed);
        self.stats.num_versions.fetch_sub(reclaimed, Ordering::Relaxed);
        total_reclaimed
    }

    /// Number of keys in the store.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    pub fn stats(&self) -> &VersionStoreStats {
        &self.stats
    }
}

impl Default for VersionStore {
    fn default() -> Self {
        Self::new()
    }
}
/// MVCC coordinator: header and clock, reader-slot registry, writer lock and version store.
pub struct ConcurrentMvcc {
/// Directory passed to `open`.
path: PathBuf,
/// Header loaded from, or written to, `<path>/.mvcc_metadata` by `open`. `save_header` is
/// only called from `open`, so later updates (clock, epoch, writer lock) stay in memory.
header: MvccHeader,
/// `MAX_READERS` slots allocated by `open` in this process's memory.
/// NOTE(review): slots are owned by pid but not shared with other processes — confirm intent.
reader_slots: Vec<ReaderSlot>,
version_store: VersionStore,
/// This process's id, used as the owner tag for reader slots and the writer lock.
our_pid: u32,
/// Slot index most recently returned by `register_reader`; released on drop.
our_slot: RwLock<Option<usize>>,
}
impl ConcurrentMvcc {
    /// Opens MVCC state for directory `path`, creating the directory if needed.
    ///
    /// If `<path>/.mvcc_metadata` exists, its header is loaded and validated. Otherwise a new
    /// header is created and written there. Reader slots and the version store always start
    /// empty.
    ///
    /// # Errors
    /// Returns errors from directory creation or metadata I/O, and
    /// `SochDBError::Corruption` if an existing header fails validation.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&path)?;
        let metadata_path = path.join(".mvcc_metadata");
        let header = if metadata_path.exists() {
            Self::load_header(&metadata_path)?
        } else {
            let header = MvccHeader::new();
            Self::save_header(&metadata_path, &header)?;
            header
        };
        let reader_slots = (0..MAX_READERS).map(|_| ReaderSlot::empty()).collect();
        Ok(Self {
            path,
            header,
            reader_slots,
            version_store: VersionStore::new(),
            our_pid: std::process::id(),
            our_slot: RwLock::new(None),
        })
    }

    /// Reads a raw `MvccHeader` from the start of `path` and validates it.
    fn load_header(path: &Path) -> Result<MvccHeader> {
        use std::io::Read;
        let mut file = File::open(path)?;
        let mut buf = vec![0u8; std::mem::size_of::<MvccHeader>()];
        file.read_exact(&mut buf)?;
        // SAFETY: `buf` holds exactly `size_of::<MvccHeader>()` initialised bytes, and every
        // field of `MvccHeader` is an integer, integer atomic or byte array, so any bit
        // pattern is a valid value. A `Vec<u8>` only guarantees 1-byte alignment, so the
        // read must be unaligned (`ptr::read` would be undefined behaviour here).
        let header: MvccHeader =
            unsafe { std::ptr::read_unaligned(buf.as_ptr().cast::<MvccHeader>()) };
        header.validate()?;
        Ok(header)
    }

    /// Writes `header`'s raw in-memory bytes to `path` (truncating it) and fsyncs the file.
    fn save_header(path: &Path, header: &MvccHeader) -> Result<()> {
        use std::io::Write;
        let mut file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(path)?;
        // SAFETY: `header` is a valid reference, so the pointer is non-null, aligned and
        // valid for `size_of::<MvccHeader>()` bytes while `buf` is alive.
        // NOTE(review): `MvccHeader` is `repr(C)` with interior padding (e.g. between
        // `num_readers` and `current_epoch`), so this also exposes padding bytes as `u8`.
        // Consider serialising the fields explicitly.
        let buf = unsafe {
            std::slice::from_raw_parts(
                header as *const MvccHeader as *const u8,
                std::mem::size_of::<MvccHeader>(),
            )
        };
        file.write_all(buf)?;
        file.sync_all()?;
        Ok(())
    }

    /// Allocates a fresh, unique timestamp from the shared clock.
    #[inline]
    pub fn allocate_timestamp(&self) -> HlcTimestamp {
        HlcTimestamp::allocate_next(&self.header.current_ts)
    }

    /// Most recently allocated timestamp (does not allocate).
    #[inline]
    pub fn current_timestamp(&self) -> HlcTimestamp {
        HlcTimestamp::read_current(&self.header.current_ts)
    }

    #[inline]
    pub fn current_epoch(&self) -> u64 {
        self.header.current_epoch.load(Ordering::Acquire)
    }

    /// Claims a reader slot and returns its index.
    ///
    /// The slot records the current timestamp as the snapshot and the current epoch
    /// (truncated to `u32`).
    ///
    /// NOTE(review): `ReaderSlot::try_claim` also succeeds on a slot this pid already owns.
    /// Two threads of one process registering will therefore share, and overwrite, the
    /// first such slot. Confirm this is intended.
    ///
    /// # Errors
    /// `SochDBError::ResourceExhausted` if no slot is free or owned by this process.
    pub fn register_reader(&self) -> Result<usize> {
        let snapshot_ts = self.current_timestamp().raw();
        let epoch = self.current_epoch() as u32;
        let idx = self
            .reader_slots
            .iter()
            .position(|slot| slot.try_claim(self.our_pid, snapshot_ts, epoch))
            .ok_or_else(|| SochDBError::ResourceExhausted("Too many concurrent readers".into()))?;
        *self.our_slot.write() = Some(idx);
        Ok(idx)
    }

    /// Releases `slot_idx` if this process owns it and forgets the remembered slot.
    /// Out-of-range indices are ignored.
    pub fn unregister_reader(&self, slot_idx: usize) {
        if slot_idx < self.reader_slots.len() {
            self.reader_slots[slot_idx].release(self.our_pid);
            *self.our_slot.write() = None;
        }
    }

    /// Takes the writer lock without waiting.
    ///
    /// This succeeds if the lock is free or already held by this process (the lock is
    /// tagged by pid only).
    ///
    /// # Errors
    /// `SochDBError::LockError` naming the pid that actually holds the lock.
    pub fn try_acquire_writer(&self) -> Result<WriterGuard<'_>> {
        // Use the CAS result as the source of truth: if it fails, `holder` is the real
        // owner, not a value read before the race.
        let holder = match self.header.writer_lock.compare_exchange(
            0,
            self.our_pid,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return Ok(WriterGuard { mvcc: self }),
            Err(holder) => holder,
        };
        if holder == self.our_pid {
            return Ok(WriterGuard { mvcc: self });
        }
        Err(SochDBError::LockError(format!(
            "Writer lock held by process {}",
            holder
        )))
    }

    /// Retries [`Self::try_acquire_writer`] every 100 µs until it succeeds or `timeout`
    /// elapses, then returns the last error.
    pub fn acquire_writer(&self, timeout: Duration) -> Result<WriterGuard<'_>> {
        let deadline = std::time::Instant::now() + timeout;
        loop {
            match self.try_acquire_writer() {
                Ok(guard) => return Ok(guard),
                Err(_) if std::time::Instant::now() < deadline => {
                    std::thread::sleep(Duration::from_micros(100));
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Clears the writer lock if, and only if, this process holds it.
    fn release_writer(&self) {
        // A CAS makes check-and-clear atomic, so a lock owned by anyone else is never reset.
        let _ = self.header.writer_lock.compare_exchange(
            self.our_pid,
            0,
            Ordering::Release,
            Ordering::Relaxed,
        );
    }

    pub fn version_store(&self) -> &VersionStore {
        &self.version_store
    }

    /// Smallest non-zero snapshot among owned reader slots, or `u64::MAX` if there is none.
    pub fn min_active_snapshot(&self) -> u64 {
        let mut min_ts = u64::MAX;
        for slot in &self.reader_slots {
            let pid = slot.pid.load(Ordering::Acquire);
            if pid != 0 {
                let ts = slot.snapshot_ts.load(Ordering::Acquire);
                if ts > 0 && ts < min_ts {
                    min_ts = ts;
                }
            }
        }
        min_ts
    }

    /// Smallest epoch among owned reader slots, or the current epoch if there are none.
    pub fn min_active_epoch(&self) -> u32 {
        let mut min_epoch = u32::MAX;
        for slot in &self.reader_slots {
            let pid = slot.pid.load(Ordering::Acquire);
            if pid != 0 {
                let epoch = slot.epoch.load(Ordering::Acquire);
                if epoch < min_epoch {
                    min_epoch = epoch;
                }
            }
        }
        if min_epoch == u32::MAX {
            self.current_epoch() as u32
        } else {
            min_epoch
        }
    }

    /// Runs version GC bounded by the oldest active reader; returns versions reclaimed.
    pub fn run_gc(&self) -> usize {
        let min_epoch = self.min_active_epoch();
        let min_snapshot = self.min_active_snapshot();
        self.version_store.gc(min_epoch, min_snapshot)
    }

    /// True once `GC_COMMIT_INTERVAL` commits have been counted since the last GC.
    pub fn should_run_gc(&self) -> bool {
        self.header.commits_since_gc.load(Ordering::Relaxed) >= GC_COMMIT_INTERVAL
    }

    /// Counts a commit. When the count reaches `GC_COMMIT_INTERVAL`, the counter is reset and
    /// a GC pass runs.
    pub fn on_commit(&self) {
        let count = self.header.commits_since_gc.fetch_add(1, Ordering::Relaxed) + 1;
        // Only the committer whose `swap` observes the full count runs GC, so racing
        // committers past the threshold do not all trigger a pass.
        if count >= GC_COMMIT_INTERVAL
            && self.header.commits_since_gc.swap(0, Ordering::Relaxed) >= GC_COMMIT_INTERVAL
        {
            let _ = self.run_gc();
        }
    }

    /// Frees every reader slot that [`ReaderSlot::is_stale`] reports, and returns how many
    /// were freed.
    pub fn cleanup_stale_readers(&self) -> usize {
        let now = current_time_us();
        let mut cleaned = 0;
        for slot in &self.reader_slots {
            let owner = slot.pid.load(Ordering::Acquire);
            if owner == 0 || !slot.is_stale(now) {
                continue;
            }
            // Clear only if the slot still belongs to the owner we judged stale, so a slot
            // released and re-claimed in the meantime is left alone.
            if slot
                .pid
                .compare_exchange(owner, 0, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                cleaned += 1;
            }
        }
        cleaned
    }

    /// Increments the epoch and returns the new value.
    pub fn advance_epoch(&self) -> u64 {
        self.header.current_epoch.fetch_add(1, Ordering::AcqRel) + 1
    }
}
impl Drop for ConcurrentMvcc {
    /// Releases the remembered reader slot (if any) and the writer lock (if this process
    /// holds it).
    fn drop(&mut self) {
        // Read the slot through `get_mut` (no lock needed with `&mut self`) and keep no guard
        // alive. `unregister_reader` takes `our_slot.write()`, and parking_lot's RwLock is
        // not reentrant, so holding a read guard across that call would self-deadlock.
        let remembered = self.our_slot.get_mut().take();
        if let Some(slot_idx) = remembered {
            self.unregister_reader(slot_idx);
        }
        self.release_writer();
    }
}
/// Guard returned by `ConcurrentMvcc::try_acquire_writer` / `acquire_writer`.
///
/// Dropping the guard releases the writer lock if this process holds it.
///
/// NOTE(review): the lock is tagged by pid only. If several guards exist in one process,
/// dropping any one of them releases the lock for all.
#[must_use = "the writer lock is released as soon as the guard is dropped"]
pub struct WriterGuard<'a> {
    mvcc: &'a ConcurrentMvcc,
}

impl Drop for WriterGuard<'_> {
    fn drop(&mut self) {
        self.mvcc.release_writer();
    }
}
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
#[inline]
fn current_time_us() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_micros() as u64
}
/// Unix: probes whether `pid` refers to an existing process using `kill(pid, 0)`.
///
/// A permission error still counts as "exists"; only `ESRCH` counts as gone.
/// NOTE(review): a `pid` above `i32::MAX` becomes negative as `pid_t`, which `kill`
/// interprets as a process group — confirm callers never pass such values.
#[cfg(unix)]
fn process_exists(pid: u32) -> bool {
    // Signal 0 performs only the existence/permission check; nothing is delivered.
    // SAFETY: `kill` takes plain integers and has no memory-safety preconditions.
    let probe = unsafe { libc::kill(pid as libc::pid_t, 0) };
    if probe != 0 {
        let errno = std::io::Error::last_os_error().raw_os_error();
        return errno != Some(libc::ESRCH);
    }
    true
}
/// Windows: reports whether a process with `pid` can be opened with
/// `PROCESS_QUERY_LIMITED_INFORMATION`; the handle is closed immediately.
///
/// Any `OpenProcess` failure returns `false`. NOTE(review): that may include access-denied
/// for a live process, not only "no such process" — consider inspecting `GetLastError()`.
/// NOTE(review): `handle == 0` assumes a windows-sys version where `HANDLE` is an integer;
/// newer windows-sys releases define it as a pointer — confirm against the pinned version.
#[cfg(windows)]
fn process_exists(pid: u32) -> bool {
use windows_sys::Win32::Foundation::CloseHandle;
use windows_sys::Win32::System::Threading::{OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION};
// SAFETY: plain Win32 calls; a non-null handle returned by `OpenProcess` is closed exactly once.
unsafe {
let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
if handle == 0 {
false
} else {
CloseHandle(handle);
true
}
}
}
/// Fallback for targets that are neither Unix nor Windows.
///
/// No liveness probe is available here, so every process is reported as alive. Stale
/// readers can then only be detected through the heartbeat timeout.
#[cfg(not(any(unix, windows)))]
fn process_exists(_pid: u32) -> bool {
    true
}
// Unit tests for the timestamp allocator, version chains, version store and reader slots.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::thread;
// Successive allocations from one counter must be strictly increasing.
#[test]
fn test_hlc_timestamp_ordering() {
let ts = AtomicU64::new(0);
let t1 = HlcTimestamp::allocate_next(&ts);
let t2 = HlcTimestamp::allocate_next(&ts);
let t3 = HlcTimestamp::allocate_next(&ts);
assert!(t1.raw() < t2.raw());
assert!(t2.raw() < t3.raw());
}
// 8 threads x 1000 allocations on one shared counter must yield 8000 distinct values.
#[test]
fn test_hlc_timestamp_concurrent() {
let ts = Arc::new(AtomicU64::new(0));
let mut handles = vec![];
for _ in 0..8 {
let ts = ts.clone();
handles.push(thread::spawn(move || {
let mut timestamps = vec![];
for _ in 0..1000 {
timestamps.push(HlcTimestamp::allocate_next(&ts).raw());
}
timestamps
}));
}
let mut all_ts: Vec<u64> = handles
.into_iter()
.flat_map(|h| h.join().unwrap())
.collect();
all_ts.sort();
let len_before = all_ts.len();
all_ts.dedup();
assert_eq!(all_ts.len(), len_before, "Duplicate timestamps found!");
}
// read_at returns the newest version with commit_ts strictly below the snapshot.
// Versions are pushed newest-first to match the order `commit` maintains.
#[test]
fn test_version_chain_read_at() {
let mut chain = VersionChain::new();
chain.versions.push(VersionEntry::new(100, 1, 1, Some(b"v100".to_vec())));
chain.versions.push(VersionEntry::new(90, 2, 1, Some(b"v90".to_vec())));
chain.versions.push(VersionEntry::new(80, 3, 1, Some(b"v80".to_vec())));
let v = chain.read_at(105, None).unwrap();
assert_eq!(v.value, Some(b"v100".to_vec()));
let v = chain.read_at(95, None).unwrap();
assert_eq!(v.value, Some(b"v90".to_vec()));
let v = chain.read_at(85, None).unwrap();
assert_eq!(v.value, Some(b"v80".to_vec()));
assert!(chain.read_at(75, None).is_none());
}
// Ten versions (commit_ts 100, 95, ..., 55; epochs 10..1): GC at (min_epoch 7,
// min_snapshot 75) must remove some versions but never the last one.
#[test]
fn test_version_chain_gc() {
let mut chain = VersionChain::new();
for i in 0..10 {
chain.versions.push(VersionEntry::new(
100 - i * 5, i as u64, (10 - i) as u32, Some(format!("v{}", 100 - i * 5).into_bytes()),
));
}
assert_eq!(chain.len(), 10);
let reclaimed = chain.gc(7, 75);
assert!(reclaimed > 0, "Should have reclaimed some versions, got {}", reclaimed);
assert!(chain.len() >= 1, "Should keep at least one version");
}
// A write committed at ts 100 is visible at snapshot 150 but not at 50.
#[test]
fn test_version_store_basic() {
let store = VersionStore::new();
store
.insert_uncommitted(b"key1", Some(b"value1".to_vec()), 1, 1)
.unwrap();
assert!(store.commit(b"key1", 1, 100));
let value = store.get(b"key1", 150, None);
assert_eq!(value, Some(b"value1".to_vec()));
let value = store.get(b"key1", 50, None);
assert!(value.is_none());
}
// A foreign pid cannot take an owned slot; the owning pid can re-claim and release it.
#[test]
fn test_reader_slot_claim_release() {
let slot = ReaderSlot::empty();
assert!(slot.is_free());
assert!(slot.try_claim(1234, 100, 1));
assert!(!slot.is_free());
assert!(!slot.try_claim(5678, 200, 2));
assert!(slot.try_claim(1234, 300, 3));
slot.release(1234);
assert!(slot.is_free());
}
}