/// Identifier of a transaction (plain `u64` alias).
pub type TxnId = u64;

/// Timestamp used for snapshots and commits (plain `u64` alias).
pub type Timestamp = u64;

/// Read-side context handed to visibility checks.
///
/// Bundles the reading transaction, the snapshot timestamp it reads at, and a
/// set of transaction ids that are treated as still active.
#[derive(Debug, Clone)]
pub struct VisibilityContext {
    /// Transaction performing the read.
    pub reader_txn_id: TxnId,
    /// Snapshot timestamp; comparisons against it are strict (`<`).
    pub snapshot_ts: Timestamp,
    /// Transactions that [`VisibilityContext::is_committed_before`] always rejects.
    pub active_txn_ids: std::collections::HashSet<TxnId>,
}

impl VisibilityContext {
    /// Builds a context whose active-transaction set is empty.
    pub fn new(reader_txn_id: TxnId, snapshot_ts: Timestamp) -> Self {
        Self::with_active_txns(
            reader_txn_id,
            snapshot_ts,
            std::collections::HashSet::new(),
        )
    }

    /// Builds a context from an explicit set of active transaction ids.
    pub fn with_active_txns(
        reader_txn_id: TxnId,
        snapshot_ts: Timestamp,
        active_txn_ids: std::collections::HashSet<TxnId>,
    ) -> Self {
        Self {
            active_txn_ids,
            snapshot_ts,
            reader_txn_id,
        }
    }

    /// Reports whether `txn_id` counts as committed before this snapshot.
    ///
    /// Returns `true` only when `commit_ts` is `Some(ts)` with
    /// `ts < snapshot_ts` and `txn_id` is not in `active_txn_ids`.
    /// A missing commit timestamp always yields `false`.
    pub fn is_committed_before(&self, txn_id: TxnId, commit_ts: Option<Timestamp>) -> bool {
        let ts = match commit_ts {
            Some(ts) => ts,
            // No commit timestamp: the transaction never committed.
            None => return false,
        };
        if ts >= self.snapshot_ts {
            return false;
        }
        !self.active_txn_ids.contains(&txn_id)
    }
}
/// Bookkeeping attached to a single version: who created it, who deleted it,
/// and when it was committed.
///
/// `0` is used as a sentinel: `commit_ts == 0` means "not committed" and
/// `deleted_by == 0` means "not deleted".
#[derive(Debug, Clone)]
pub struct VersionMeta {
    /// Transaction that created the version.
    pub created_by: TxnId,
    /// Timestamp recorded at creation.
    pub created_ts: Timestamp,
    /// Transaction that deleted the version, or `0` when not deleted.
    pub deleted_by: TxnId,
    /// Timestamp recorded by [`VersionMeta::delete`]; `0` until then.
    pub deleted_ts: Timestamp,
    /// Commit timestamp, or `0` while uncommitted.
    pub commit_ts: Timestamp,
}

impl VersionMeta {
    /// Creates metadata for a fresh version that is neither committed nor deleted.
    pub fn new_uncommitted(created_by: TxnId, created_ts: Timestamp) -> Self {
        Self {
            commit_ts: 0,
            deleted_ts: 0,
            deleted_by: 0,
            created_ts,
            created_by,
        }
    }

    /// Decides whether this version is visible to the reader described by `ctx`.
    ///
    /// * Uncommitted versions are visible only to the transaction that created them.
    /// * Committed versions are visible when `commit_ts < snapshot_ts`, unless a
    ///   deletion with `deleted_ts < snapshot_ts` has been recorded.
    ///
    /// Only `reader_txn_id` and `snapshot_ts` are consulted; the context's
    /// `active_txn_ids` set plays no part in this check.
    pub fn is_visible(&self, ctx: &VisibilityContext) -> bool {
        if !self.is_committed() {
            return ctx.reader_txn_id == self.created_by;
        }
        let committed_in_snapshot = self.commit_ts < ctx.snapshot_ts;
        let deleted_in_snapshot = self.is_deleted() && self.deleted_ts < ctx.snapshot_ts;
        committed_in_snapshot && !deleted_in_snapshot
    }

    /// Records the commit timestamp.
    pub fn commit(&mut self, commit_ts: Timestamp) {
        self.commit_ts = commit_ts;
    }

    /// Records the deleting transaction and the deletion timestamp.
    pub fn delete(&mut self, deleted_by: TxnId, deleted_ts: Timestamp) {
        self.deleted_ts = deleted_ts;
        self.deleted_by = deleted_by;
    }

    /// `true` once a non-zero commit timestamp has been recorded.
    pub fn is_committed(&self) -> bool {
        self.commit_ts != 0
    }

    /// `true` once a non-zero deleting transaction has been recorded.
    pub fn is_deleted(&self) -> bool {
        self.deleted_by != 0
    }
}
/// Read-only interface over a chain of versions of one value.
pub trait MvccVersionChain {
/// Type of the values handed out by the chain.
type Value;
/// Looks up a value for the read context `ctx`, returning `None` when the
/// implementor has nothing to hand out.
/// NOTE(review): presumably applies the visibility rules of `ctx` — confirm against implementors.
fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value>;
/// Returns the latest value, if any.
/// NOTE(review): whether "latest" includes uncommitted data is up to the implementor — confirm.
fn get_latest(&self) -> Option<&Self::Value>;
/// Number of versions held by the chain.
fn version_count(&self) -> usize;
/// Provided method: `true` exactly when `version_count()` returns `0`.
fn is_empty(&self) -> bool {
self.version_count() == 0
}
}
/// Mutating operations on top of `MvccVersionChain`.
pub trait MvccVersionChainMut: MvccVersionChain {
/// Adds `value` as an uncommitted version written by `txn_id`.
fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId);
/// Commits the version belonging to `txn_id` at `commit_ts`.
/// NOTE(review): the `bool` presumably reports whether a matching version was found — confirm.
fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool;
/// Marks a version as deleted by `txn_id` at `delete_ts`.
/// NOTE(review): the `bool` presumably reports whether anything was deleted — confirm.
fn delete_version(&mut self, txn_id: TxnId, delete_ts: Timestamp) -> bool;
/// Garbage-collects versions relative to `min_visible_ts`.
/// NOTE(review): the meaning of the two counts in the returned tuple is not
/// established anywhere in this file — confirm against implementors.
fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize);
}
/// Capability to answer whether a transaction would hit a write conflict.
pub trait WriteConflictDetection {
/// Returns `true` when a write by `txn_id` conflicts.
/// NOTE(review): presumably "another transaction holds a pending write", as in
/// `BinarySearchChain::has_write_conflict` — confirm against implementors.
fn has_write_conflict(&self, txn_id: TxnId) -> bool;
}
/// Type-level tag naming a concurrency strategy.
///
/// Implementors must be `Send + Sync` and expose a static, human-readable `NAME`.
pub trait ConcurrencyPolicy: Send + Sync {
    /// Name of the policy.
    const NAME: &'static str;
}

// The three policy markers below are field-less structs, so the compiler
// derives `Send` and `Sync` for them automatically. No manual `unsafe impl`
// is needed (or wanted: it would only add unchecked `unsafe` surface).

/// Policy marker whose `NAME` is `"ExternalLock"`.
pub struct ExternalLock;

impl ConcurrencyPolicy for ExternalLock {
    const NAME: &'static str = "ExternalLock";
}

/// Policy marker whose `NAME` is `"InternalRwLock"`.
pub struct InternalRwLock;

impl ConcurrencyPolicy for InternalRwLock {
    const NAME: &'static str = "InternalRwLock";
}

/// Policy marker whose `NAME` is `"LockFreeAtomic"`.
pub struct LockFreeAtomic;

impl ConcurrencyPolicy for LockFreeAtomic {
    const NAME: &'static str = "LockFreeAtomic";
}
/// Contract an entry must fulfil to be stored in a [`BinarySearchChain`].
pub trait ChainEntry: Sized + std::fmt::Debug {
    /// Commit timestamp of the entry; the chain treats `0` as "not committed yet".
    fn commit_ts(&self) -> u64;
    /// Id of the transaction that wrote the entry.
    fn txn_id(&self) -> u64;
    /// Stamps the entry with its commit timestamp.
    fn set_commit_ts(&mut self, ts: u64);
}

/// Version chain made of committed entries plus at most one pending
/// (uncommitted) entry.
///
/// [`BinarySearchChain::commit`] keeps `committed` ordered newest-first
/// (descending `commit_ts`), which lets reads and GC use binary search.
#[derive(Debug)]
pub struct BinarySearchChain<E: ChainEntry> {
    committed: Vec<E>,
    uncommitted: Option<E>,
}

impl<E: ChainEntry> Default for BinarySearchChain<E> {
    /// Same as [`BinarySearchChain::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<E: ChainEntry> BinarySearchChain<E> {
    /// Creates a chain with no committed and no pending entry.
    #[inline]
    pub fn new() -> Self {
        Self {
            uncommitted: None,
            committed: Vec::new(),
        }
    }

    /// Installs `entry` as the pending entry and returns the one it displaced, if any.
    ///
    /// No ownership check is performed: a pending entry of any transaction is replaced.
    #[inline]
    pub fn set_uncommitted(&mut self, entry: E) -> Option<E> {
        self.uncommitted.replace(entry)
    }

    /// Shared access to the pending entry.
    #[inline]
    pub fn uncommitted(&self) -> Option<&E> {
        self.uncommitted.as_ref()
    }

    /// Exclusive access to the pending entry.
    #[inline]
    pub fn uncommitted_mut(&mut self) -> Option<&mut E> {
        self.uncommitted.as_mut()
    }

    /// Promotes the pending entry to the committed list.
    ///
    /// Succeeds only when a pending entry exists, belongs to `txn_id`, and its
    /// `commit_ts()` is still `0`. The entry is stamped with `commit_ts` and
    /// inserted so that the newest-first order is kept. Returns whether the
    /// promotion happened.
    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
        let owns_pending = match &self.uncommitted {
            Some(pending) => pending.txn_id() == txn_id && pending.commit_ts() == 0,
            None => false,
        };
        if !owns_pending {
            return false;
        }
        match self.uncommitted.take() {
            Some(mut entry) => {
                entry.set_commit_ts(commit_ts);
                // First slot whose entry is not strictly newer than `commit_ts`.
                let slot = self
                    .committed
                    .partition_point(|existing| existing.commit_ts() > commit_ts);
                self.committed.insert(slot, entry);
                true
            }
            None => false,
        }
    }

    /// Drops the pending entry if it belongs to `txn_id`; otherwise does nothing.
    #[inline]
    pub fn abort(&mut self, txn_id: u64) {
        let owns_pending = match &self.uncommitted {
            Some(pending) => pending.txn_id() == txn_id,
            None => false,
        };
        if owns_pending {
            self.uncommitted = None;
        }
    }

    /// Returns the entry a reader at `snapshot_ts` should see.
    ///
    /// When `current_txn_id` matches the owner of the pending entry, that entry
    /// wins (read-your-own-write). Otherwise the newest committed entry with
    /// `commit_ts < snapshot_ts` is returned, or `None` if there is none.
    #[inline]
    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&E> {
        if let (Some(reader), Some(pending)) = (current_txn_id, self.uncommitted.as_ref()) {
            if pending.txn_id() == reader {
                return Some(pending);
            }
        }
        // Entries are newest-first, so everything before this index is too new.
        let first_visible = self
            .committed
            .partition_point(|entry| entry.commit_ts() >= snapshot_ts);
        self.committed.get(first_visible)
    }

    /// `true` when a pending entry exists and belongs to a transaction other than `my_txn_id`.
    #[inline]
    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
        match &self.uncommitted {
            Some(pending) => pending.txn_id() != my_txn_id,
            None => false,
        }
    }

    /// Discards committed entries no snapshot at or above `min_active_ts` can read.
    ///
    /// All entries with `commit_ts >= min_active_ts` are kept, plus the single
    /// newest entry below that bound (the one a reader at `min_active_ts`
    /// sees). Chains with at most one committed entry are left untouched.
    pub fn gc_by_ts(&mut self, min_active_ts: u64) {
        let total = self.committed.len();
        if total <= 1 {
            return;
        }
        let first_older = self
            .committed
            .partition_point(|entry| entry.commit_ts() >= min_active_ts);
        let keep = if first_older == total {
            // Nothing is older than the bound: keep everything.
            total
        } else {
            // Keep the newest entry below the bound as well.
            first_older + 1
        };
        self.committed.truncate(keep);
    }

    /// Number of committed entries plus one if a pending entry exists.
    #[inline]
    pub fn version_count(&self) -> usize {
        let pending = if self.uncommitted.is_some() { 1 } else { 0 };
        self.committed.len() + pending
    }

    /// Number of committed entries.
    #[inline]
    pub fn committed_count(&self) -> usize {
        self.committed.len()
    }

    /// `true` when the chain holds neither committed nor pending entries.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.uncommitted.is_none() && self.committed.is_empty()
    }

    /// Committed entries as a slice, newest first.
    #[inline]
    pub fn committed_versions(&self) -> &[E] {
        self.committed.as_slice()
    }

    /// Mutable access to the committed list.
    ///
    /// Callers that modify it are responsible for preserving the newest-first
    /// order that `read_at`, `commit` and `gc_by_ts` binary-search on.
    #[inline]
    pub fn committed_versions_mut(&mut self) -> &mut Vec<E> {
        &mut self.committed
    }

    /// The pending entry if present, otherwise the newest committed entry.
    #[inline]
    pub fn latest(&self) -> Option<&E> {
        match &self.uncommitted {
            Some(pending) => Some(pending),
            None => self.committed.first(),
        }
    }

    /// The newest committed entry, if any.
    #[inline]
    pub fn latest_committed(&self) -> Option<&E> {
        self.committed.first()
    }
}
/// Error type returned by `MvccStore::mvcc_put`.
#[derive(Debug)]
pub enum MvccStoreError {
    /// Rendered as "write-write conflict" by the `Display` impl.
    WriteConflict,
}

impl std::fmt::Display for MvccStoreError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::WriteConflict => "write-write conflict",
        };
        f.write_str(message)
    }
}

impl std::error::Error for MvccStoreError {}
/// Counters returned by `MvccStore::mvcc_gc`; `Default` yields all zeros.
#[derive(Debug, Default, Clone)]
pub struct MvccGcStats {
/// Count of removed versions (per the field name; the code filling it is not shown here).
pub versions_removed: usize,
/// Count of scanned keys (per the field name; the code filling it is not shown here).
pub keys_scanned: usize,
}
/// Key/value store interface with MVCC operations addressed by byte-string keys.
///
/// Every method takes `&self`, and implementors must be `Send + Sync`.
pub trait MvccStore: Send + Sync {
/// Reads the value of `key` as seen at `snapshot_ts`, optionally on behalf of `txn_id`.
/// NOTE(review): presumably `txn_id` enables read-your-own-write, as in
/// `BinarySearchChain::read_at` — confirm against implementors.
fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>>;
/// Writes `value` for `key` on behalf of `txn_id`; may fail with `MvccStoreError`.
/// NOTE(review): `None` as value presumably encodes a delete/tombstone — confirm.
fn mvcc_put(
&self,
key: &[u8],
value: Option<Vec<u8>>,
txn_id: u64,
) -> Result<(), MvccStoreError>;
/// Commits the write of `txn_id` on `key` at `commit_ts`.
/// NOTE(review): the `bool` presumably reports whether a pending write was committed — confirm.
fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool;
/// Aborts the write of `txn_id` on `key`.
fn mvcc_abort_key(&self, key: &[u8], txn_id: u64);
/// Reports whether `txn_id` would conflict with another write on `key`.
fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool;
/// Runs garbage collection relative to `min_ts` and returns the resulting counters.
fn mvcc_gc(&self, min_ts: u64) -> MvccGcStats;
/// Number of keys in the store.
fn mvcc_key_count(&self) -> usize;
}
// Unit tests for the visibility types, the policy markers and `BinarySearchChain`.
#[cfg(test)]
mod tests {
use super::*;
// An uncommitted version is visible only to its creator; once committed at 150
// it is visible to a snapshot at 200 and hidden from a snapshot at 100.
#[test]
fn test_version_meta_visibility() {
let mut meta = VersionMeta::new_uncommitted(1, 100);
let ctx = VisibilityContext::new(1, 200);
assert!(meta.is_visible(&ctx));
let ctx2 = VisibilityContext::new(2, 200);
assert!(!meta.is_visible(&ctx2));
meta.commit(150);
assert!(meta.is_visible(&ctx2));
let ctx3 = VisibilityContext::new(3, 100);
assert!(!meta.is_visible(&ctx3));
}
// A deletion recorded at 200 hides the version from a snapshot at 250 but not
// from a snapshot at 180.
#[test]
fn test_version_meta_deletion() {
let mut meta = VersionMeta::new_uncommitted(1, 100);
meta.commit(150);
meta.delete(2, 200);
let ctx = VisibilityContext::new(3, 180);
assert!(meta.is_visible(&ctx));
let ctx2 = VisibilityContext::new(3, 250);
assert!(!meta.is_visible(&ctx2));
}
// `is_committed_before` requires a commit timestamp below the snapshot and a
// transaction id that is not in the active set (txn 5 is active here).
#[test]
fn test_visibility_context_committed_before() {
let mut active = std::collections::HashSet::new();
active.insert(5);
let ctx = VisibilityContext::with_active_txns(1, 200, active);
assert!(ctx.is_committed_before(2, Some(100)));
assert!(!ctx.is_committed_before(3, Some(250)));
assert!(!ctx.is_committed_before(5, Some(100)));
assert!(!ctx.is_committed_before(6, None));
}
// Each policy marker exposes its own type name as `NAME`.
#[test]
fn test_concurrency_policy_names() {
assert_eq!(ExternalLock::NAME, "ExternalLock");
assert_eq!(InternalRwLock::NAME, "InternalRwLock");
assert_eq!(LockFreeAtomic::NAME, "LockFreeAtomic");
}
// Minimal `ChainEntry` used by the chain tests; `val` identifies the entry.
#[derive(Debug, Clone)]
struct TestEntry {
commit_ts: u64,
txn_id: u64,
val: i32,
}
impl ChainEntry for TestEntry {
fn commit_ts(&self) -> u64 {
self.commit_ts
}
fn txn_id(&self) -> u64 {
self.txn_id
}
fn set_commit_ts(&mut self, ts: u64) {
self.commit_ts = ts;
}
}
// The pending entry is readable only by its owner; after commit at 50 it is
// readable at snapshot 51 but not at snapshot 50 (strict comparison).
#[test]
fn test_binary_search_chain_commit_and_read() {
let mut chain = BinarySearchChain::<TestEntry>::new();
assert!(chain.is_empty());
chain.set_uncommitted(TestEntry {
commit_ts: 0,
txn_id: 1,
val: 10,
});
assert_eq!(chain.version_count(), 1);
let v = chain.read_at(100, Some(1)).unwrap();
assert_eq!(v.val, 10);
assert!(chain.read_at(100, Some(2)).is_none());
assert!(chain.commit(1, 50));
assert_eq!(chain.committed_count(), 1);
let v = chain.read_at(51, None).unwrap();
assert_eq!(v.val, 10);
assert!(chain.read_at(50, None).is_none());
}
// `abort` removes the pending entry only when called by its owner.
#[test]
fn test_binary_search_chain_abort() {
let mut chain = BinarySearchChain::<TestEntry>::new();
chain.set_uncommitted(TestEntry {
commit_ts: 0,
txn_id: 1,
val: 10,
});
chain.abort(1);
assert!(chain.is_empty());
chain.set_uncommitted(TestEntry {
commit_ts: 0,
txn_id: 2,
val: 20,
});
chain.abort(1);
assert_eq!(chain.version_count(), 1);
}
// A pending entry conflicts with every transaction except its owner.
#[test]
fn test_binary_search_chain_write_conflict() {
let mut chain = BinarySearchChain::<TestEntry>::new();
assert!(!chain.has_write_conflict(1));
chain.set_uncommitted(TestEntry {
commit_ts: 0,
txn_id: 1,
val: 10,
});
assert!(!chain.has_write_conflict(1)); assert!(chain.has_write_conflict(2)); }
// With commits at 10..=50, GC at 25 keeps 50/40/30 plus the newest entry below
// the bound (20); GC at 45 then keeps 50 plus 40.
#[test]
fn test_binary_search_chain_gc() {
let mut chain = BinarySearchChain::<TestEntry>::new();
for i in 1..=5u64 {
chain.set_uncommitted(TestEntry {
commit_ts: 0,
txn_id: i,
val: i as i32,
});
chain.commit(i, i * 10);
}
assert_eq!(chain.committed_count(), 5);
chain.gc_by_ts(25);
assert_eq!(chain.committed_count(), 4);
chain.gc_by_ts(45);
assert_eq!(chain.committed_count(), 2); }
// Each snapshot reads the newest version committed strictly before it.
#[test]
fn test_binary_search_chain_multiple_versions() {
let mut chain = BinarySearchChain::<TestEntry>::new();
for (i, ts) in [100u64, 200, 300].iter().enumerate() {
let txn = (i + 1) as u64;
chain.set_uncommitted(TestEntry {
commit_ts: 0,
txn_id: txn,
val: *ts as i32,
});
chain.commit(txn, *ts);
}
assert_eq!(chain.read_at(150, None).unwrap().val, 100);
assert_eq!(chain.read_at(250, None).unwrap().val, 200);
assert_eq!(chain.read_at(350, None).unwrap().val, 300);
assert!(chain.read_at(50, None).is_none());
}
}
// Property-based tests (proptest) for `BinarySearchChain::read_at` and `gc_by_ts`.
#[cfg(test)]
mod version_chain_properties {
use super::*;
use proptest::prelude::*;
// Entry whose `val` mirrors the commit timestamp it is given in `build_chain`.
#[derive(Debug, Clone)]
struct PropEntry {
commit_ts: u64,
val: i32,
}
impl ChainEntry for PropEntry {
fn commit_ts(&self) -> u64 {
self.commit_ts
}
// Every entry reports transaction id 0, matching the id passed to `commit` below.
fn txn_id(&self) -> u64 {
0
}
fn set_commit_ts(&mut self, ts: u64) {
self.commit_ts = ts;
}
}
// Builds a chain by committing one entry per distinct timestamp, in ascending order.
fn build_chain(mut tss: Vec<u64>) -> BinarySearchChain<PropEntry> {
tss.sort_unstable();
tss.dedup();
let mut chain = BinarySearchChain::<PropEntry>::new();
for ts in tss {
chain.set_uncommitted(PropEntry {
commit_ts: 0,
val: ts as i32,
});
chain.commit(0, ts);
}
chain
}
// Reference model: the largest timestamp strictly below `snapshot`, as an `i32` value.
fn expected_visible(tss: &[u64], snapshot: u64) -> Option<i32> {
tss.iter()
.copied()
.filter(|&ts| ts < snapshot)
.max()
.map(|ts| ts as i32)
}
proptest! {
// `read_at` without a reader transaction must agree with the reference model.
#[test]
fn prop_read_at_matches_strict_visibility(
tss in prop::collection::vec(1u64..1000, 0..32),
snapshot in 0u64..1100,
) {
let mut sorted = tss.clone();
sorted.sort_unstable();
sorted.dedup();
let chain = build_chain(tss);
let got = chain.read_at(snapshot, None).map(|e| e.val);
prop_assert_eq!(got, expected_visible(&sorted, snapshot));
}
// GC at `min_active` must not change what any snapshot at or above `min_active` reads.
#[test]
fn prop_gc_preserves_reads_for_live_snapshots(
tss in prop::collection::vec(1u64..1000, 1..32),
min_active in 1u64..1000,
extra in prop::collection::vec(0u64..200, 1..6),
) {
let mut sorted = tss.clone();
sorted.sort_unstable();
sorted.dedup();
let mut chain = build_chain(tss);
// Snapshots are offsets added to `min_active`, so none lies below the GC bound.
let snapshots: Vec<u64> = extra.iter().map(|d| min_active + *d).collect();
let before: Vec<Option<i32>> = snapshots
.iter()
.map(|&s| chain.read_at(s, None).map(|e| e.val))
.collect();
chain.gc_by_ts(min_active);
for (i, &s) in snapshots.iter().enumerate() {
let after = chain.read_at(s, None).map(|e| e.val);
prop_assert_eq!(after, before[i],
"GC at {} changed read at snapshot {}", min_active, s);
prop_assert_eq!(after, expected_visible(&sorted, s));
}
}
}
}