pub use crate::storage::primitives::ids::{current_timestamp, next_timestamp, Timestamp, TxnId};
/// Result of evaluating a single [`Version`] against a reader's [`Snapshot`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VersionVisibility {
    /// The reader should observe this version.
    Visible,
    /// The version was written by another transaction that the snapshot does not
    /// treat as committed.
    Uncommitted,
    /// The version has been deleted as far as the reader is concerned.
    Deleted,
    /// The version was created after the snapshot's start timestamp.
    Future,
}
/// One entry in the version history of a value.
///
/// Versions are linked newest-to-older through `prev`, forming a singly linked list.
#[derive(Clone, Debug)]
pub struct Version<V: Clone> {
    /// Transaction that created this version.
    pub created_by: TxnId,
    /// Timestamp at which this version was created.
    pub created_at: Timestamp,
    /// Transaction that deleted this version; `TxnId::ZERO` while it is not deleted.
    pub deleted_by: TxnId,
    /// Timestamp of the deletion; `Timestamp::EPOCH` until the version is marked deleted.
    pub deleted_at: Timestamp,
    /// Payload of the version, or `None` when the version is a tombstone.
    pub value: Option<V>,
    /// The next older version, if any.
    pub prev: Option<Box<Version<V>>>,
}
impl<V: Clone> Version<V> {
    /// Creates a live version carrying `value`.
    ///
    /// The version is stamped with the creating transaction and timestamp, starts out
    /// undeleted (`deleted_by` is `TxnId::ZERO`, `deleted_at` is `Timestamp::EPOCH`) and
    /// is not linked to any older version.
    pub fn new(value: V, txn_id: TxnId, timestamp: Timestamp) -> Self {
        Self::unlinked(Some(value), txn_id, timestamp)
    }

    /// Creates a tombstone: a version without a value, otherwise initialised like [`Version::new`].
    pub fn tombstone(txn_id: TxnId, timestamp: Timestamp) -> Self {
        Self::unlinked(None, txn_id, timestamp)
    }

    /// Builds an undeleted version with no predecessor and the given payload.
    fn unlinked(value: Option<V>, txn_id: TxnId, timestamp: Timestamp) -> Self {
        Self {
            created_by: txn_id,
            created_at: timestamp,
            deleted_by: TxnId::ZERO,
            deleted_at: Timestamp::EPOCH,
            value,
            prev: None,
        }
    }

    /// Returns `true` when this version carries no value.
    pub fn is_tombstone(&self) -> bool {
        self.value.is_none()
    }

    /// Returns `true` once a deleting transaction has been recorded on this version.
    pub fn is_deleted(&self) -> bool {
        !self.deleted_by.is_zero()
    }

    /// Records `txn_id` / `timestamp` as the deleter of this version.
    ///
    /// Any previously recorded deleter is overwritten.
    pub fn mark_deleted(&mut self, txn_id: TxnId, timestamp: Timestamp) {
        self.deleted_by = txn_id;
        self.deleted_at = timestamp;
    }

    /// Classifies this version from the point of view of `snapshot`.
    ///
    /// Rules, in order:
    /// 1. A version created by the reading transaction itself is always its own write:
    ///    `Deleted` if that same transaction also deleted it, `Visible` otherwise. This is
    ///    decided before consulting `snapshot.is_committed`, so a transaction sees its own
    ///    writes whether or not its id is listed as active in its snapshot.
    /// 2. A version created by another transaction that the snapshot does not consider
    ///    committed is `Uncommitted`.
    /// 3. A version created after `snapshot.start_ts` is `Future`.
    /// 4. A version deleted by the reading transaction itself is `Deleted`.
    /// 5. A version whose deleter is committed per the snapshot, with a deletion timestamp
    ///    at or before `snapshot.start_ts`, is `Deleted`.
    /// 6. Everything else is `Visible`.
    ///
    /// A `created_by` of `TxnId::ZERO` skips rules 1 and 2.
    pub fn check_visibility(&self, snapshot: &Snapshot) -> VersionVisibility {
        let reader = snapshot.txn_id;
        // `is_deleted()` guarantees `deleted_by` is non-zero, so a zero reader id never matches.
        let deleted_by_reader = self.is_deleted() && self.deleted_by == reader;

        if !self.created_by.is_zero() {
            if self.created_by == reader {
                // Own writes: must not depend on how the snapshot classifies the reader's id.
                return if deleted_by_reader {
                    VersionVisibility::Deleted
                } else {
                    VersionVisibility::Visible
                };
            }
            if !snapshot.is_committed(self.created_by) {
                return VersionVisibility::Uncommitted;
            }
        }

        if self.created_at > snapshot.start_ts {
            return VersionVisibility::Future;
        }

        // The reader's own delete hides the version even though the reader has not committed.
        if deleted_by_reader {
            return VersionVisibility::Deleted;
        }

        let deleted_before_snapshot = self.is_deleted()
            && snapshot.is_committed(self.deleted_by)
            && self.deleted_at <= snapshot.start_ts;
        if deleted_before_snapshot {
            VersionVisibility::Deleted
        } else {
            VersionVisibility::Visible
        }
    }
}
/// Newest-first linked list of the versions recorded for a value.
#[derive(Clone, Debug)]
pub struct VersionChain<V: Clone> {
    /// Most recently added version, or `None` when the chain is empty.
    head: Option<Box<Version<V>>>,
    /// Running count of versions in the chain.
    version_count: usize,
    /// Timestamp recorded for the oldest version; `Timestamp::EPOCH` for a new, empty chain.
    oldest_ts: Timestamp,
}
impl<V: Clone> VersionChain<V> {
    /// Creates a chain with no versions; `oldest_timestamp()` is `Timestamp::EPOCH`.
    pub fn new() -> Self {
        Self {
            head: None,
            version_count: 0,
            oldest_ts: Timestamp::EPOCH,
        }
    }

    /// Creates a chain holding a single live version of `value`.
    pub fn with_value(value: V, txn_id: TxnId, timestamp: Timestamp) -> Self {
        Self {
            head: Some(Box::new(Version::new(value, txn_id, timestamp))),
            version_count: 1,
            oldest_ts: timestamp,
        }
    }

    /// Returns `true` when the chain holds no versions at all.
    pub fn is_empty(&self) -> bool {
        self.head.is_none()
    }

    /// Number of versions currently linked in the chain (tombstones included).
    pub fn len(&self) -> usize {
        self.version_count
    }

    /// Returns the value `snapshot` should observe, if any.
    ///
    /// Walks from the newest version towards older ones, skipping versions that are
    /// `Uncommitted` or `Future` for the snapshot. The first `Visible` version decides the
    /// result (its value, which is `None` for a tombstone); a `Deleted` version ends the
    /// search with `None`.
    pub fn get(&self, snapshot: &Snapshot) -> Option<&V> {
        let mut current = self.head.as_deref();
        while let Some(version) = current {
            match version.check_visibility(snapshot) {
                VersionVisibility::Visible => return version.value.as_ref(),
                VersionVisibility::Deleted => return None,
                VersionVisibility::Uncommitted | VersionVisibility::Future => {
                    current = version.prev.as_deref();
                }
            }
        }
        None
    }

    /// Pushes a new live version of `value` in front of the current head.
    pub fn insert(&mut self, value: V, txn_id: TxnId, timestamp: Timestamp) {
        self.push_version(Version::new(value, txn_id, timestamp));
    }

    /// Records a new value for the chain; identical to [`VersionChain::insert`].
    pub fn update(&mut self, value: V, txn_id: TxnId, timestamp: Timestamp) {
        self.insert(value, txn_id, timestamp);
    }

    /// Pushes a tombstone in front of the current head.
    pub fn delete(&mut self, txn_id: TxnId, timestamp: Timestamp) {
        self.push_version(Version::tombstone(txn_id, timestamp));
    }

    /// Links `version` in as the new head and keeps the bookkeeping fields in sync.
    ///
    /// Shared by `insert` and `delete` so both initialise `oldest_ts` the same way: the
    /// first version pushed while `oldest_ts` is still `Timestamp::EPOCH` defines it.
    fn push_version(&mut self, version: Version<V>) {
        let created_at = version.created_at;
        let mut node = Box::new(version);
        node.prev = self.head.take();
        self.head = Some(node);
        self.version_count += 1;
        if self.oldest_ts.is_epoch() {
            self.oldest_ts = created_at;
        }
    }

    /// Borrows the newest version, if any.
    pub fn head(&self) -> Option<&Version<V>> {
        self.head.as_deref()
    }

    /// Mutably borrows the newest version, if any.
    pub fn head_mut(&mut self) -> Option<&mut Version<V>> {
        self.head.as_deref_mut()
    }

    /// Discards versions that are older than what `watermark` still needs.
    ///
    /// Walking newest to oldest, the first two versions with `created_at <= watermark` are
    /// retained and everything behind the second one is unlinked and dropped. Versions newer
    /// than the watermark are never touched.
    ///
    /// Returns the number of versions actually removed; `len()` shrinks by exactly that
    /// amount and `oldest_timestamp()` moves to the oldest version that was kept.
    pub fn gc(&mut self, watermark: Timestamp) -> usize {
        let mut detached: Option<Box<Version<V>>> = None;
        let mut new_oldest: Option<Timestamp> = None;
        let mut found_visible = false;

        let mut current = &mut self.head;
        while let Some(version) = current {
            if version.created_at <= watermark {
                if found_visible {
                    // `version` is kept and becomes the tail; only its predecessors go.
                    detached = version.prev.take();
                    if detached.is_some() {
                        new_oldest = Some(version.created_at);
                    }
                    break;
                }
                found_visible = true;
            }
            current = &mut version.prev;
        }

        // `count_chain` already includes the node it is handed, so no extra `+ 1` here.
        let removed = match detached {
            Some(stale) => self.count_chain(&stale),
            None => 0,
        };
        if let Some(ts) = new_oldest {
            self.oldest_ts = ts;
        }
        // Saturating: `head_mut` lets callers relink nodes behind the counter's back, and a
        // drifted counter must not turn garbage collection into a panic.
        self.version_count = self.version_count.saturating_sub(removed);
        removed
    }

    /// Counts `version` plus every version reachable from it through `prev`.
    fn count_chain(&self, version: &Version<V>) -> usize {
        let mut count = 1;
        let mut current = version.prev.as_deref();
        while let Some(older) = current {
            count += 1;
            current = older.prev.as_deref();
        }
        count
    }

    /// Returns `true` when every version in the chain is a tombstone.
    ///
    /// An empty chain also yields `true`.
    pub fn is_all_deleted(&self) -> bool {
        let mut current = self.head.as_deref();
        while let Some(version) = current {
            if !version.is_tombstone() {
                return false;
            }
            current = version.prev.as_deref();
        }
        true
    }

    /// Timestamp recorded for the oldest version in the chain.
    pub fn oldest_timestamp(&self) -> Timestamp {
        self.oldest_ts
    }
}
impl<V: Clone> Default for VersionChain<V> {
fn default() -> Self {
Self::new()
}
}
/// A reader's view used to decide which versions it may observe.
#[derive(Clone, Debug)]
pub struct Snapshot {
    /// Id of the transaction that owns this snapshot.
    pub txn_id: TxnId,
    /// Timestamp at which the snapshot was taken; versions created later are in its future.
    pub start_ts: Timestamp,
    /// Transactions this snapshot treats as active (not committed).
    active_txns: Vec<TxnId>,
    /// Transactions explicitly recorded as committed on this snapshot.
    committed_txns: Vec<TxnId>,
}
impl Snapshot {
    /// Builds a snapshot for `txn_id` taken at `start_ts` with no active transactions recorded.
    pub fn new(txn_id: TxnId, start_ts: Timestamp) -> Self {
        Self::with_active(txn_id, start_ts, Vec::new())
    }

    /// Builds a snapshot for `txn_id` taken at `start_ts` that treats every id in `active`
    /// as still in progress.
    pub fn with_active(txn_id: TxnId, start_ts: Timestamp, active: Vec<TxnId>) -> Self {
        Self {
            txn_id,
            start_ts,
            active_txns: active,
            committed_txns: Vec::new(),
        }
    }

    /// Records `txn_id` in the committed list, ignoring ids that are already present.
    pub fn add_committed(&mut self, txn_id: TxnId) {
        let already_recorded = self.committed_txns.iter().any(|known| *known == txn_id);
        if !already_recorded {
            self.committed_txns.push(txn_id);
        }
    }

    /// Reports whether this snapshot treats `txn_id` as committed.
    ///
    /// The zero id is always committed; any other id is committed unless it appears in the
    /// active list.
    ///
    /// NOTE(review): ids stored through `add_committed` do not influence this result. An id
    /// that is also in the active list stays uncommitted, and every other id is already
    /// committed. Confirm whether the committed list was meant to take precedence.
    pub fn is_committed(&self, txn_id: TxnId) -> bool {
        txn_id.is_zero() || !self.is_active(txn_id)
    }

    /// Reports whether `txn_id` is in this snapshot's active list.
    pub fn is_active(&self, txn_id: TxnId) -> bool {
        self.active_txns.iter().any(|running| *running == txn_id)
    }
}
/// Lifecycle state of a transaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TxnState {
    /// The transaction is still in progress.
    Active,
    /// The transaction has been committed.
    Committed,
    /// The transaction has been aborted.
    Aborted,
}
/// Bookkeeping for one transaction: identity, state, snapshot and the keys it touched.
#[derive(Debug)]
pub struct ActiveTransaction {
    /// Id of this transaction.
    pub id: TxnId,
    /// Timestamp assigned when the transaction was created.
    pub start_ts: Timestamp,
    /// Current lifecycle state.
    pub state: TxnState,
    /// Snapshot the transaction reads through.
    pub snapshot: Snapshot,
    /// Distinct keys recorded as written, in first-recorded order.
    write_set: Vec<Vec<u8>>,
    /// Distinct keys recorded as read, in first-recorded order.
    read_set: Vec<Vec<u8>>,
}
impl ActiveTransaction {
    /// Starts a transaction with the given id.
    ///
    /// A fresh timestamp is drawn from `next_timestamp()` and used both as `start_ts` and
    /// as the start of the transaction's snapshot, which treats `active_txns` as in progress.
    pub fn new(id: TxnId, active_txns: Vec<TxnId>) -> Self {
        let start_ts = next_timestamp();
        let snapshot = Snapshot::with_active(id, start_ts, active_txns);
        Self {
            id,
            start_ts,
            state: TxnState::Active,
            snapshot,
            write_set: Vec::new(),
            read_set: Vec::new(),
        }
    }

    /// Appends a copy of `key` to `keys` unless an equal key is already present.
    fn remember(keys: &mut Vec<Vec<u8>>, key: &[u8]) {
        let already_tracked = keys.iter().any(|seen| seen.as_slice() == key);
        if !already_tracked {
            keys.push(key.to_vec());
        }
    }

    /// Adds `key` to the read set; recording the same key again has no effect.
    pub fn record_read(&mut self, key: &[u8]) {
        Self::remember(&mut self.read_set, key);
    }

    /// Adds `key` to the write set; recording the same key again has no effect.
    pub fn record_write(&mut self, key: &[u8]) {
        Self::remember(&mut self.write_set, key);
    }

    /// Keys recorded as written, in first-recorded order.
    pub fn write_set(&self) -> &[Vec<u8>] {
        self.write_set.as_slice()
    }

    /// Keys recorded as read, in first-recorded order.
    pub fn read_set(&self) -> &[Vec<u8>] {
        self.read_set.as_slice()
    }

    /// Sets the state to `Committed`; the previous state is not checked.
    pub fn commit(&mut self) {
        self.state = TxnState::Committed;
    }

    /// Sets the state to `Aborted`; the previous state is not checked.
    pub fn abort(&mut self) {
        self.state = TxnState::Aborted;
    }

    /// Returns `true` while the state is `Active`.
    pub fn is_active(&self) -> bool {
        matches!(self.state, TxnState::Active)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Inserting and updating each add one version to the chain.
    #[test]
    fn test_version_chain_basic() {
        let mut chain: VersionChain<String> = VersionChain::new();
        assert!(chain.is_empty());
        assert_eq!(chain.len(), 0);
        chain.insert("v1".to_string(), TxnId(1), Timestamp(1));
        assert!(!chain.is_empty());
        assert_eq!(chain.len(), 1);
        chain.update("v2".to_string(), TxnId(2), Timestamp(2));
        assert_eq!(chain.len(), 2);
    }

    /// Each snapshot reads the newest version created at or before its start timestamp.
    #[test]
    fn test_version_visibility() {
        let mut chain: VersionChain<String> = VersionChain::new();
        chain.insert("v1".to_string(), TxnId(1), Timestamp(1));
        chain.update("v2".to_string(), TxnId(2), Timestamp(2));
        let snap1 = Snapshot::new(TxnId(3), Timestamp(1));
        let snap2 = Snapshot::new(TxnId(3), Timestamp(2));
        // The older snapshot must skip the newer version and fall back to v1.
        assert_eq!(chain.get(&snap1), Some(&"v1".to_string()));
        assert_eq!(chain.get(&snap2), Some(&"v2".to_string()));
    }

    /// A tombstone hides the value from snapshots taken at or after the delete only.
    #[test]
    fn test_version_delete() {
        let mut chain: VersionChain<String> = VersionChain::new();
        chain.insert("v1".to_string(), TxnId(1), Timestamp(1));
        chain.delete(TxnId(2), Timestamp(2));
        assert_eq!(chain.len(), 2);
        let snap = Snapshot::new(TxnId(3), Timestamp(2));
        assert!(chain.get(&snap).is_none());
        let before_delete = Snapshot::new(TxnId(3), Timestamp(1));
        assert_eq!(chain.get(&before_delete), Some(&"v1".to_string()));
    }

    /// Garbage collection shrinks the chain but keeps what snapshots at or after the
    /// watermark read.
    #[test]
    fn test_version_gc() {
        let mut chain: VersionChain<String> = VersionChain::new();
        chain.insert("v1".to_string(), TxnId(1), Timestamp(1));
        chain.update("v2".to_string(), TxnId(2), Timestamp(2));
        chain.update("v3".to_string(), TxnId(3), Timestamp(3));
        chain.update("v4".to_string(), TxnId(4), Timestamp(4));
        assert_eq!(chain.len(), 4);
        let removed = chain.gc(Timestamp(3));
        assert!(removed > 0);
        assert!(chain.len() < 4);
        let at_watermark = Snapshot::new(TxnId(9), Timestamp(3));
        assert_eq!(chain.get(&at_watermark), Some(&"v3".to_string()));
        let newest = Snapshot::new(TxnId(9), Timestamp(4));
        assert_eq!(chain.get(&newest), Some(&"v4".to_string()));
    }

    /// With no active transactions recorded, every id counts as committed.
    #[test]
    fn test_snapshot() {
        let snap = Snapshot::new(TxnId(5), Timestamp(10));
        assert!(snap.is_committed(TxnId::ZERO));
        assert!(snap.is_committed(TxnId(3)));
        assert!(!snap.is_active(TxnId(3)));
    }

    /// Ids in the active list are not committed; all others are.
    #[test]
    fn test_snapshot_with_active() {
        let snap = Snapshot::with_active(TxnId(5), Timestamp(10), vec![TxnId(3), TxnId(4)]);
        assert!(!snap.is_committed(TxnId(3)));
        assert!(!snap.is_committed(TxnId(4)));
        assert!(snap.is_active(TxnId(3)));
        assert!(snap.is_active(TxnId(4)));
        assert!(snap.is_committed(TxnId(1)));
        assert!(snap.is_committed(TxnId(2)));
        assert!(!snap.is_active(TxnId(1)));
    }

    /// Read/write sets deduplicate keys and committing ends the active state.
    #[test]
    fn test_active_transaction() {
        let mut txn = ActiveTransaction::new(TxnId(1), vec![]);
        assert!(txn.is_active());
        txn.record_read(b"key1");
        txn.record_read(b"key1");
        txn.record_write(b"key2");
        txn.record_write(b"key2");
        assert_eq!(txn.read_set().len(), 1);
        assert_eq!(txn.write_set().len(), 1);
        txn.commit();
        assert!(!txn.is_active());
        assert_eq!(txn.state, TxnState::Committed);
    }

    /// Aborting also ends the active state.
    #[test]
    fn test_active_transaction_abort() {
        let mut txn = ActiveTransaction::new(TxnId(2), vec![]);
        txn.abort();
        assert!(!txn.is_active());
        assert_eq!(txn.state, TxnState::Aborted);
    }

    /// Successive timestamps are strictly increasing.
    #[test]
    fn test_timestamp_generation() {
        let ts1 = next_timestamp();
        let ts2 = next_timestamp();
        let ts3 = next_timestamp();
        assert!(ts2 > ts1);
        assert!(ts3 > ts2);
    }
}