use crate::model::{Object, Predicate, Subject, Triple};
use anyhow::{anyhow, Result};
use dashmap::DashMap;
use parking_lot::{Mutex, RwLock};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Logical clock value issued by the store's timestamp counter; larger means later.
pub type Timestamp = u64;
/// Identifier handed out when a transaction begins.
pub type TransactionId = u64;
/// Identifier attached to each stored version.
pub type VersionId = u64;

/// Tuning options for an MVCC store.
#[derive(Clone, Debug)]
pub struct MvccConfig {
    /// Cap on the number of versions kept per triple when garbage collection runs.
    pub max_versions_per_triple: usize,
    /// Minimum time between automatic garbage-collection passes.
    pub gc_interval: Duration,
    /// NOTE(review): not read by the store code in this module; presumably the minimum age
    /// before a version may be collected — confirm.
    pub min_version_age: Duration,
    /// NOTE(review): not read by the store code in this module — confirm where it is used.
    pub enable_snapshot_isolation: bool,
    /// Whether a transaction's queries include its own uncommitted writes.
    pub enable_read_your_writes: bool,
    /// Strategy applied when a transaction commits.
    pub conflict_detection: ConflictDetection,
}

impl Default for MvccConfig {
    /// 100 versions per triple, a 60 s GC interval, a 300 s minimum version age, snapshot
    /// isolation and read-your-writes enabled, optimistic conflict detection.
    fn default() -> Self {
        let gc_interval = Duration::from_secs(60);
        let min_version_age = Duration::from_secs(5 * 60);
        MvccConfig {
            max_versions_per_triple: 100,
            gc_interval,
            min_version_age,
            enable_snapshot_isolation: true,
            enable_read_your_writes: true,
            conflict_detection: ConflictDetection::Optimistic,
        }
    }
}

/// How commit-time validation decides whether a transaction conflicts with others.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConflictDetection {
    /// Reject the commit if a key that was read, or written, gained a newer version since
    /// the transaction started (for written keys a newer version by the same transaction is
    /// ignored).
    Optimistic,
    /// The read-set check of `Optimistic`, then reject if another transaction committed any
    /// version of a written key after this transaction started.
    OptimisticTwoPhase,
    /// No commit-time validation; conflicts are meant to be caught at write time.
    /// NOTE(review): the write-time check is currently a stub that always succeeds.
    Pessimistic,
    /// Reject the commit if another transaction committed any version of a written key after
    /// this transaction started.
    TimestampOrdering,
}
/// Multi-version concurrency-control store for triples.
///
/// Committed writes are kept as per-key [`VersionChain`]s; transactions read as of their
/// start timestamp and buffer their writes until commit.
pub struct MvccStore {
    /// Settings supplied to `MvccStore::new`.
    config: MvccConfig,
    /// Committed version history for each triple key.
    versions: Arc<DashMap<TripleKey, VersionChain>>,
    /// State of every transaction begun on this store, keyed by id.
    /// NOTE(review): entries are never removed in this module.
    transactions: Arc<DashMap<TransactionId, TransactionState>>,
    /// Source of start/commit timestamps (also used for version ids).
    timestamp_counter: Arc<AtomicU64>,
    /// Source of transaction ids.
    transaction_counter: Arc<AtomicU64>,
    /// Registered snapshots keyed by timestamp; the oldest one bounds garbage collection.
    snapshots: Arc<RwLock<BTreeMap<Timestamp, SnapshotInfo>>>,
    /// Bookkeeping for automatic garbage collection.
    gc_state: Arc<Mutex<GarbageCollectionState>>,
    /// Secondary indexes from term strings to triple keys.
    indexes: Arc<MvccIndexes>,
}
/// Hashable identity of a triple, built from the string forms of its three terms.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct TripleKey {
    subject: String,
    predicate: String,
    object: String,
}
impl TripleKey {
    /// Builds the key by rendering each term of `triple` with `to_string()`.
    fn from_triple(triple: &Triple) -> Self {
        let subject = triple.subject().to_string();
        let predicate = triple.predicate().to_string();
        let object = triple.object().to_string();
        TripleKey {
            subject,
            predicate,
            object,
        }
    }
}
/// Committed history of a single triple key, kept sorted newest-first by `timestamp`.
#[derive(Debug, Clone)]
pub struct VersionChain {
    versions: Vec<Version>,
}
impl VersionChain {
    /// Creates an empty chain.
    fn new() -> Self {
        Self {
            versions: Vec::new(),
        }
    }

    /// Inserts `version` at the position that keeps the chain sorted by descending
    /// `timestamp`.
    fn add_version(&mut self, version: Version) {
        // Keying the search on `Reverse(timestamp)` makes the descending chain an ascending
        // sequence, which is what `binary_search_by_key` requires.
        let pos = self
            .versions
            .binary_search_by_key(&std::cmp::Reverse(version.timestamp), |v| {
                std::cmp::Reverse(v.timestamp)
            })
            .unwrap_or_else(|pos| pos);
        self.versions.insert(pos, version);
    }

    /// Returns the newest version with `timestamp <= timestamp` that was committed at or
    /// before `timestamp`, i.e. the state a reader at `timestamp` sees.
    fn get_visible_version(&self, timestamp: Timestamp) -> Option<&Version> {
        self.versions
            .iter()
            .find(|v| v.timestamp <= timestamp && v.is_visible_at(timestamp))
    }

    /// Drops versions that no reader at or after `min_timestamp` can observe, then caps the
    /// chain at `max_versions` entries (the newest are kept).
    ///
    /// The version visible at `min_timestamp` is what the oldest possible reader sees. That
    /// version and everything newer must stay, and everything older is unreachable.
    ///
    /// If the boundary version is a tombstone it can go as well, because "no version" and
    /// "deleted" read the same. It may only go together with all older versions; removing a
    /// tombstone alone would make an older insert visible again.
    ///
    /// NOTE(review): the `max_versions` cap is applied unconditionally and can discard
    /// versions that an old snapshot still needs.
    fn gc_versions(&mut self, min_timestamp: Timestamp, max_versions: usize) {
        if self.versions.len() <= 1 {
            return;
        }
        let boundary = self
            .versions
            .iter()
            .position(|v| v.timestamp <= min_timestamp && v.is_visible_at(min_timestamp));
        if let Some(index) = boundary {
            let keep = if self.versions[index].deleted {
                index
            } else {
                index + 1
            };
            self.versions.truncate(keep);
        }
        if self.versions.len() > max_versions {
            self.versions.truncate(max_versions);
        }
    }
}
/// One stored state of a triple key: either a live triple or a deletion tombstone.
#[derive(Debug, Clone)]
pub struct Version {
    /// Identifier of this version.
    pub id: VersionId,
    /// Ordering timestamp; the owning chain is sorted newest-first by this value.
    pub timestamp: Timestamp,
    /// Transaction that wrote this version.
    pub transaction_id: TransactionId,
    /// `true` for a tombstone.
    pub deleted: bool,
    /// The triple for a live version; `None` for tombstones.
    pub triple: Option<Triple>,
    /// When the writing transaction committed; `None` means not committed.
    pub commit_timestamp: Option<Timestamp>,
}
impl Version {
    /// Returns `true` if this version was committed at or before `timestamp`; versions
    /// without a commit timestamp are never visible.
    fn is_visible_at(&self, timestamp: Timestamp) -> bool {
        matches!(self.commit_timestamp, Some(committed) if committed <= timestamp)
    }
}
/// Bookkeeping for one transaction.
#[derive(Debug, Clone)]
pub struct TransactionState {
    /// Id returned by `begin_transaction`.
    pub id: TransactionId,
    /// Timestamp the transaction reads at; versions committed later are invisible to it.
    pub start_timestamp: Timestamp,
    /// Set once the transaction commits.
    pub commit_timestamp: Option<Timestamp>,
    /// Lifecycle state.
    pub status: TransactionStatus,
    /// Keys examined by queries; checked at commit by the optimistic strategies.
    pub read_set: HashSet<TripleKey>,
    /// Buffered writes, at most one per key (a later write to a key replaces the earlier one).
    pub write_set: HashMap<TripleKey, WriteOperation>,
    /// Level requested in `begin_transaction`.
    pub isolation_level: IsolationLevel,
}
/// Lifecycle state of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionStatus {
    /// The only state in which reads, writes and commit are accepted.
    Active,
    /// Set by `commit_transaction` while the transaction is validated and published.
    Preparing,
    /// Writes have been published as versions.
    Committed,
    /// Set by `abort_transaction`; the buffered writes are never published.
    Aborted,
}
/// A write buffered in a transaction until commit.
#[derive(Debug, Clone)]
pub enum WriteOperation {
    /// Publish this triple as a live version on commit.
    Insert(Triple),
    /// Publish a tombstone version on commit.
    Delete,
}
/// Isolation level requested when a transaction begins.
///
/// NOTE(review): in this module every level reads as of the transaction's start timestamp.
/// Only `Snapshot` additionally registers a snapshot record, which pins garbage collection.
/// `Snapshot` and `SnapshotIsolation` look like synonyms — confirm the intended difference.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
    Snapshot,
    SnapshotIsolation,
}
/// Snapshot record registered for a transaction started with `IsolationLevel::Snapshot`.
#[derive(Debug, Clone)]
pub struct SnapshotInfo {
    /// Start timestamp of the registering transaction; also its key in the snapshot map.
    pub timestamp: Timestamp,
    /// Ids of the transactions that were `Active` when the snapshot was registered.
    pub active_transactions: HashSet<TransactionId>,
    /// Reference count; initialised to 1 on registration.
    pub ref_count: usize,
}
/// Bookkeeping for automatic garbage collection.
#[derive(Debug)]
pub struct GarbageCollectionState {
    /// When the last automatic collection was started (initially: store creation time).
    last_gc: Instant,
    /// Reported by `MvccStore::get_stats` as `versions_collected`.
    versions_collected: u64,
    /// Number of automatic collections started; reported by `get_stats`.
    gc_runs: u64,
}
/// Secondary indexes mapping each term's string form to the keys containing that term in
/// the corresponding position.
///
/// Keys are added when an insert is buffered (before commit) and are never removed. An index
/// may therefore return keys with no live version, so queries re-check the version chains.
pub struct MvccIndexes {
    subject_index: DashMap<String, HashSet<TripleKey>>,
    predicate_index: DashMap<String, HashSet<TripleKey>>,
    object_index: DashMap<String, HashSet<TripleKey>>,
}
impl MvccStore {
pub fn new(config: MvccConfig) -> Self {
Self {
config,
versions: Arc::new(DashMap::new()),
transactions: Arc::new(DashMap::new()),
timestamp_counter: Arc::new(AtomicU64::new(1)),
transaction_counter: Arc::new(AtomicU64::new(1)),
snapshots: Arc::new(RwLock::new(BTreeMap::new())),
gc_state: Arc::new(Mutex::new(GarbageCollectionState {
last_gc: Instant::now(),
versions_collected: 0,
gc_runs: 0,
})),
indexes: Arc::new(MvccIndexes {
subject_index: DashMap::new(),
predicate_index: DashMap::new(),
object_index: DashMap::new(),
}),
}
}
pub fn begin_transaction(&self, isolation_level: IsolationLevel) -> Result<TransactionId> {
let tx_id = self.transaction_counter.fetch_add(1, Ordering::SeqCst);
let start_timestamp = self.get_next_timestamp();
let tx_state = TransactionState {
id: tx_id,
start_timestamp,
commit_timestamp: None,
status: TransactionStatus::Active,
read_set: HashSet::new(),
write_set: HashMap::new(),
isolation_level,
};
self.transactions.insert(tx_id, tx_state);
if isolation_level == IsolationLevel::Snapshot {
self.create_snapshot(start_timestamp)?;
}
Ok(tx_id)
}
pub fn insert(&self, tx_id: TransactionId, triple: Triple) -> Result<()> {
let mut tx = self.get_active_transaction(tx_id)?;
let key = TripleKey::from_triple(&triple);
if self.config.conflict_detection == ConflictDetection::Pessimistic {
self.check_write_conflict(&key, tx_id)?;
}
tx.write_set
.insert(key.clone(), WriteOperation::Insert(triple.clone()));
self.update_indexes_for_insert(&key);
Ok(())
}
pub fn delete(&self, tx_id: TransactionId, triple: &Triple) -> Result<()> {
let mut tx = self.get_active_transaction(tx_id)?;
let key = TripleKey::from_triple(triple);
if !self.exists_at_timestamp(&key, tx.start_timestamp)? {
return Err(anyhow!("Triple does not exist"));
}
tx.write_set.insert(key.clone(), WriteOperation::Delete);
self.update_indexes_for_delete(&key);
Ok(())
}
pub fn query(
&self,
tx_id: TransactionId,
subject: Option<&Subject>,
predicate: Option<&Predicate>,
object: Option<&Object>,
) -> Result<Vec<Triple>> {
let mut tx = self.get_active_transaction(tx_id)?;
let timestamp = tx.start_timestamp;
let candidates = self.get_candidate_keys(subject, predicate, object);
let mut results = Vec::new();
let mut processed_keys = HashSet::new();
for key in candidates {
processed_keys.insert(key.clone());
tx.read_set.insert(key.clone());
if let Some(version_chain) = self.versions.get(&key) {
if let Some(version) = version_chain.get_visible_version(timestamp) {
if !version.deleted {
if let Some(triple) = &version.triple {
if self.matches_pattern(triple, subject, predicate, object) {
results.push(triple.clone());
}
}
}
}
}
if self.config.enable_read_your_writes {
if let Some(write_op) = tx.write_set.get(&key) {
match write_op {
WriteOperation::Insert(triple) => {
if self.matches_pattern(triple, subject, predicate, object) {
results.push(triple.clone());
}
}
WriteOperation::Delete => {
results.retain(|t| TripleKey::from_triple(t) != key);
}
}
}
}
}
if self.config.enable_read_your_writes {
for (key, write_op) in &tx.write_set {
if !processed_keys.contains(key) {
match write_op {
WriteOperation::Insert(triple) => {
if self.matches_pattern(triple, subject, predicate, object) {
results.push(triple.clone());
}
}
WriteOperation::Delete => {
}
}
}
}
}
Ok(results)
}
pub fn commit_transaction(&self, tx_id: TransactionId) -> Result<()> {
let mut tx = self.get_active_transaction(tx_id)?;
tx.status = TransactionStatus::Preparing;
self.validate_transaction(&tx)?;
let commit_timestamp = self.get_next_timestamp();
for (key, operation) in &tx.write_set {
let version = match operation {
WriteOperation::Insert(triple) => Version {
id: self.get_next_timestamp(), timestamp: commit_timestamp,
transaction_id: tx_id,
deleted: false,
triple: Some(triple.clone()),
commit_timestamp: Some(commit_timestamp),
},
WriteOperation::Delete => Version {
id: self.get_next_timestamp(),
timestamp: commit_timestamp,
transaction_id: tx_id,
deleted: true,
triple: None,
commit_timestamp: Some(commit_timestamp),
},
};
self.versions
.entry(key.clone())
.or_insert_with(VersionChain::new)
.add_version(version);
}
tx.commit_timestamp = Some(commit_timestamp);
tx.status = TransactionStatus::Committed;
self.maybe_run_gc();
Ok(())
}
pub fn abort_transaction(&self, tx_id: TransactionId) -> Result<()> {
if let Some(mut tx) = self.transactions.get_mut(&tx_id) {
tx.status = TransactionStatus::Aborted;
}
Ok(())
}
fn validate_transaction(&self, tx: &TransactionState) -> Result<()> {
match self.config.conflict_detection {
ConflictDetection::Optimistic => {
for key in &tx.read_set {
if let Some(version_chain) = self.versions.get(key) {
if let Some(latest) = version_chain.versions.first() {
if latest.timestamp > tx.start_timestamp {
return Err(anyhow!("Read conflict detected"));
}
}
}
}
for key in tx.write_set.keys() {
if let Some(version_chain) = self.versions.get(key) {
if let Some(latest) = version_chain.versions.first() {
if latest.timestamp > tx.start_timestamp
&& latest.transaction_id != tx.id
{
return Err(anyhow!("Write conflict detected"));
}
}
}
}
}
ConflictDetection::Pessimistic => {
}
ConflictDetection::TimestampOrdering => {
for key in tx.write_set.keys() {
if let Some(version_chain) = self.versions.get(key) {
for version in &version_chain.versions {
if version.transaction_id != tx.id
&& version.timestamp > tx.start_timestamp
&& version.commit_timestamp.is_some()
{
return Err(anyhow!("Timestamp ordering violation"));
}
}
}
}
}
ConflictDetection::OptimisticTwoPhase => {
for key in &tx.read_set {
if let Some(version_chain) = self.versions.get(key) {
if let Some(latest) = version_chain.versions.first() {
if latest.timestamp > tx.start_timestamp {
return Err(anyhow!("Read conflict detected in phase 1"));
}
}
}
}
for key in tx.write_set.keys() {
if let Some(version_chain) = self.versions.get(key) {
for version in &version_chain.versions {
if version.transaction_id != tx.id
&& version.timestamp > tx.start_timestamp
&& version.commit_timestamp.is_some()
{
return Err(anyhow!("Write conflict detected in phase 2"));
}
}
}
}
}
}
Ok(())
}
fn get_active_transaction(
&self,
tx_id: TransactionId,
) -> Result<dashmap::mapref::one::RefMut<'_, TransactionId, TransactionState>> {
let tx = self
.transactions
.get_mut(&tx_id)
.ok_or_else(|| anyhow!("Transaction not found"))?;
if tx.status != TransactionStatus::Active {
return Err(anyhow!("Transaction is not active"));
}
Ok(tx)
}
fn exists_at_timestamp(&self, key: &TripleKey, timestamp: Timestamp) -> Result<bool> {
if let Some(version_chain) = self.versions.get(key) {
if let Some(version) = version_chain.get_visible_version(timestamp) {
return Ok(!version.deleted);
}
}
Ok(false)
}
fn get_candidate_keys(
&self,
subject: Option<&Subject>,
predicate: Option<&Predicate>,
object: Option<&Object>,
) -> HashSet<TripleKey> {
let mut candidates = HashSet::new();
if let Some(subj) = subject {
if let Some(keys) = self.indexes.subject_index.get(&subj.to_string()) {
candidates.extend(keys.iter().cloned());
}
} else if let Some(pred) = predicate {
if let Some(keys) = self.indexes.predicate_index.get(&pred.to_string()) {
candidates.extend(keys.iter().cloned());
}
} else if let Some(obj) = object {
if let Some(keys) = self.indexes.object_index.get(&obj.to_string()) {
candidates.extend(keys.iter().cloned());
}
} else {
for entry in self.versions.iter() {
candidates.insert(entry.key().clone());
}
}
candidates
}
fn matches_pattern(
&self,
triple: &Triple,
subject: Option<&Subject>,
predicate: Option<&Predicate>,
object: Option<&Object>,
) -> bool {
if let Some(s) = subject {
if triple.subject() != s {
return false;
}
}
if let Some(p) = predicate {
if triple.predicate() != p {
return false;
}
}
if let Some(o) = object {
if triple.object() != o {
return false;
}
}
true
}
fn update_indexes_for_insert(&self, key: &TripleKey) {
self.indexes
.subject_index
.entry(key.subject.clone())
.or_default()
.insert(key.clone());
self.indexes
.predicate_index
.entry(key.predicate.clone())
.or_default()
.insert(key.clone());
self.indexes
.object_index
.entry(key.object.clone())
.or_default()
.insert(key.clone());
}
fn update_indexes_for_delete(&self, _key: &TripleKey) {
}
fn check_write_conflict(&self, _key: &TripleKey, _tx_id: TransactionId) -> Result<()> {
Ok(())
}
fn create_snapshot(&self, timestamp: Timestamp) -> Result<()> {
let active_txs: HashSet<TransactionId> = self
.transactions
.iter()
.filter(|entry| entry.value().status == TransactionStatus::Active)
.map(|entry| *entry.key())
.collect();
let snapshot = SnapshotInfo {
timestamp,
active_transactions: active_txs,
ref_count: 1,
};
self.snapshots.write().insert(timestamp, snapshot);
Ok(())
}
fn get_current_timestamp(&self) -> Timestamp {
self.timestamp_counter.load(Ordering::SeqCst)
}
fn get_next_timestamp(&self) -> Timestamp {
self.timestamp_counter.fetch_add(1, Ordering::SeqCst)
}
fn maybe_run_gc(&self) {
let mut gc_state = self.gc_state.lock();
if gc_state.last_gc.elapsed() >= self.config.gc_interval {
let versions = self.versions.clone();
let config = self.config.clone();
let min_timestamp = self.calculate_min_timestamp();
std::thread::spawn(move || {
Self::run_gc_internal(versions, config, min_timestamp);
});
gc_state.last_gc = Instant::now();
gc_state.gc_runs += 1;
}
}
fn run_gc_internal(
versions: Arc<DashMap<TripleKey, VersionChain>>,
config: MvccConfig,
min_timestamp: Timestamp,
) {
for mut entry in versions.iter_mut() {
entry
.value_mut()
.gc_versions(min_timestamp, config.max_versions_per_triple);
}
}
fn calculate_min_timestamp(&self) -> Timestamp {
let min_active = self
.transactions
.iter()
.filter(|entry| entry.value().status == TransactionStatus::Active)
.map(|entry| entry.value().start_timestamp)
.min()
.unwrap_or(self.get_current_timestamp());
let min_snapshot = self
.snapshots
.read()
.keys()
.next()
.copied()
.unwrap_or(self.get_current_timestamp());
min_active.min(min_snapshot)
}
pub fn garbage_collect(&self) -> Result<()> {
let min_timestamp = self.calculate_min_timestamp();
Self::run_gc_internal(self.versions.clone(), self.config.clone(), min_timestamp);
Ok(())
}
pub fn get_stats(&self) -> MvccStats {
let total_versions = self
.versions
.iter()
.map(|entry| entry.value().versions.len())
.sum();
let gc_state = self.gc_state.lock();
MvccStats {
total_triples: self.versions.len(),
total_versions,
active_transactions: self
.transactions
.iter()
.filter(|entry| entry.value().status == TransactionStatus::Active)
.count(),
gc_runs: gc_state.gc_runs,
versions_collected: gc_state.versions_collected,
}
}
}
/// Summary counters returned by `MvccStore::get_stats`.
#[derive(Debug, Clone)]
pub struct MvccStats {
    /// Number of keys that have a version chain.
    pub total_triples: usize,
    /// Sum of all version-chain lengths.
    pub total_versions: usize,
    /// Transactions whose status is `Active`.
    pub active_transactions: usize,
    /// Copied from the garbage-collection state.
    pub gc_runs: u64,
    /// Copied from the garbage-collection state.
    pub versions_collected: u64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode};

    /// Fixture triple `<http://example.org/s> <http://example.org/p> "<value>"`.
    fn example_triple(value: &'static str) -> Triple {
        let subject = NamedNode::new("http://example.org/s").expect("valid IRI");
        let predicate = NamedNode::new("http://example.org/p").expect("valid IRI");
        Triple::new(subject, predicate, Literal::new(value))
    }

    /// Starts a `Snapshot` transaction, panicking on failure.
    fn begin_snapshot(store: &MvccStore) -> TransactionId {
        store
            .begin_transaction(IsolationLevel::Snapshot)
            .expect("store operation should succeed")
    }

    /// Runs a pattern-less query inside `tx`.
    fn query_all(store: &MvccStore, tx: TransactionId) -> Vec<Triple> {
        store
            .query(tx, None, None, None)
            .expect("store operation should succeed")
    }

    /// Asserts that `tx` sees exactly one triple, whose object renders as `expected_object`.
    fn assert_single_object(store: &MvccStore, tx: TransactionId, expected_object: &str) {
        let visible = query_all(store, tx);
        assert_eq!(visible.len(), 1);
        assert_eq!(visible[0].object().to_string(), expected_object);
    }

    #[test]
    fn test_basic_mvcc_operations() {
        let store = MvccStore::new(MvccConfig::default());

        let writer = begin_snapshot(&store);
        store
            .insert(writer, example_triple("value"))
            .expect("MVCC insert should succeed");
        // The writer sees its own uncommitted insert.
        assert_eq!(query_all(&store, writer).len(), 1);
        store
            .commit_transaction(writer)
            .expect("store operation should succeed");

        // A transaction begun after the commit sees it too.
        let reader = begin_snapshot(&store);
        assert_eq!(query_all(&store, reader).len(), 1);
    }

    #[test]
    fn test_concurrent_transactions() {
        let store = MvccStore::new(MvccConfig::default());
        let initial = example_triple("initial");

        let setup = begin_snapshot(&store);
        store
            .insert(setup, initial.clone())
            .expect("MVCC insert should succeed");
        store
            .commit_transaction(setup)
            .expect("store operation should succeed");

        let writer = begin_snapshot(&store);
        let reader = begin_snapshot(&store);
        assert_eq!(query_all(&store, writer).len(), 1);
        assert_eq!(query_all(&store, reader).len(), 1);

        // Replace "initial" with "modified" inside `writer`.
        store
            .delete(writer, &initial)
            .expect("store operation should succeed");
        store
            .insert(writer, example_triple("modified"))
            .expect("store operation should succeed");

        // Neither the pending nor the committed change reaches `reader`'s snapshot.
        assert_single_object(&store, reader, "\"initial\"");
        store
            .commit_transaction(writer)
            .expect("store operation should succeed");
        assert_single_object(&store, reader, "\"initial\"");

        let late_reader = begin_snapshot(&store);
        assert_single_object(&store, late_reader, "\"modified\"");
    }

    #[test]
    fn test_write_conflict_detection() {
        let store = MvccStore::new(MvccConfig {
            conflict_detection: ConflictDetection::Optimistic,
            ..Default::default()
        });
        let contested = example_triple("initial");

        let setup = begin_snapshot(&store);
        store
            .insert(setup, contested.clone())
            .expect("MVCC insert should succeed");
        store
            .commit_transaction(setup)
            .expect("store operation should succeed");

        let first = begin_snapshot(&store);
        let second = begin_snapshot(&store);
        store
            .delete(first, &contested)
            .expect("store operation should succeed");
        store
            .delete(second, &contested)
            .expect("store operation should succeed");

        // First committer wins; the second finds a newer version of a key it wrote.
        assert!(store.commit_transaction(first).is_ok());
        assert!(store.commit_transaction(second).is_err());
    }

    #[test]
    fn test_version_chain() {
        let mut chain = VersionChain::new();
        // Versions stamped 0, 10, ..., 40, each committed 5 ticks after its stamp.
        for n in 0..5 {
            chain.add_version(Version {
                id: n,
                timestamp: n * 10,
                transaction_id: n,
                deleted: false,
                triple: None,
                commit_timestamp: Some(n * 10 + 5),
            });
        }

        let visible_id = |at: Timestamp| {
            chain
                .get_visible_version(at)
                .expect("operation should succeed")
                .id
        };
        assert_eq!(visible_id(25), 2);
        assert_eq!(visible_id(45), 4);

        chain.gc_versions(20, 3);
        assert!(chain.versions.len() <= 3);
    }
}