use crate::mvcc::{HLCTimestamp, MVCCManager, TransactionSnapshot};
use crate::shard::ShardId;
use crate::storage::StorageBackend;
use crate::transaction::{IsolationLevel, TransactionId};
use anyhow::Result;
use async_trait::async_trait;
use dashmap::DashMap;
#[allow(unused_imports)]
use oxirs_core::model::{
BlankNode, Literal, NamedNode, Object, Predicate, QuotedTriple, Subject, Triple, Variable,
};
#[cfg(test)]
use oxirs_core::vocab::xsd;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{info, warn};
/// Secondary-index key covering the seven lookup permutations of a triple
/// pattern (S, P, O, SP, PO, SO, SPO). Stored terms are the canonical
/// string renderings produced by `subject_to_string` & friends.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexKey {
    /// All triples with this subject.
    Subject(String),
    /// All triples with this predicate.
    Predicate(String),
    /// All triples with this object.
    Object(String),
    /// Subject + predicate bound, object free.
    SubjectPredicate(String, String),
    /// Predicate + object bound, subject free.
    PredicateObject(String, String),
    /// Subject + object bound, predicate free.
    SubjectObject(String, String),
    /// Fully bound triple (exact-match lookup).
    Triple(String, String, String),
}
impl IndexKey {
pub fn from_triple_pattern(
subject: Option<&str>,
predicate: Option<&str>,
object: Option<&str>,
) -> Self {
match (subject, predicate, object) {
(Some(s), Some(p), Some(o)) => {
IndexKey::Triple(s.to_string(), p.to_string(), o.to_string())
}
(Some(s), Some(p), None) => IndexKey::SubjectPredicate(s.to_string(), p.to_string()),
(Some(s), None, Some(o)) => IndexKey::SubjectObject(s.to_string(), o.to_string()),
(None, Some(p), Some(o)) => IndexKey::PredicateObject(p.to_string(), o.to_string()),
(Some(s), None, None) => IndexKey::Subject(s.to_string()),
(None, Some(p), None) => IndexKey::Predicate(p.to_string()),
(None, None, Some(o)) => IndexKey::Object(o.to_string()),
(None, None, None) => IndexKey::Subject("*".to_string()), }
}
pub fn to_storage_key(&self) -> String {
match self {
IndexKey::Subject(s) => format!("s:{s}"),
IndexKey::Predicate(p) => format!("p:{p}"),
IndexKey::Object(o) => format!("o:{o}"),
IndexKey::SubjectPredicate(s, p) => format!("sp:{s}:{p}"),
IndexKey::PredicateObject(p, o) => format!("po:{p}:{o}"),
IndexKey::SubjectObject(s, o) => format!("so:{s}:{o}"),
IndexKey::Triple(s, p, o) => format!("spo:{s}:{p}:{o}"),
}
}
}
/// Multi-version secondary index over stored triples.
///
/// Lookups resolve an [`IndexKey`] to the set of triple storage keys that
/// were indexed at or before a given HLC timestamp.
pub struct MVCCIndex {
    // index-key permutation -> (timestamp -> triple keys recorded at that version)
    primary_index: Arc<DashMap<IndexKey, BTreeMap<HLCTimestamp, HashSet<String>>>>,
    // triple storage key -> the index permutations it was recorded under,
    // so all of its entries can be found again on removal
    reverse_index: Arc<DashMap<String, HashSet<IndexKey>>>,
}
impl Default for MVCCIndex {
    /// Equivalent to [`MVCCIndex::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl MVCCIndex {
    /// Creates an empty index.
    pub fn new() -> Self {
        Self {
            primary_index: Arc::new(DashMap::new()),
            reverse_index: Arc::new(DashMap::new()),
        }
    }

    /// Records `triple_key` under all seven index permutations
    /// (S, P, O, SP, PO, SO, SPO) at the given MVCC `timestamp`, and
    /// tracks the reverse mapping so the entries can later be removed.
    pub fn index_triple(&self, triple: &Triple, timestamp: HLCTimestamp, triple_key: &str) {
        let subject = subject_to_string(triple.subject());
        let predicate = predicate_to_string(triple.predicate());
        let object = object_to_string(triple.object());
        let index_keys = vec![
            IndexKey::Subject(subject.clone()),
            IndexKey::Predicate(predicate.clone()),
            IndexKey::Object(object.clone()),
            IndexKey::SubjectPredicate(subject.clone(), predicate.clone()),
            IndexKey::PredicateObject(predicate.clone(), object.clone()),
            IndexKey::SubjectObject(subject.clone(), object.clone()),
            IndexKey::Triple(subject, predicate, object),
        ];
        for key in &index_keys {
            self.primary_index
                .entry(key.clone())
                .or_default()
                .entry(timestamp)
                .or_default()
                .insert(triple_key.to_string());
        }
        self.reverse_index
            .entry(triple_key.to_string())
            .or_default()
            .extend(index_keys);
    }

    /// Removes `triple_key` from every index permutation at exactly
    /// `timestamp`; entries recorded at other timestamps are untouched.
    ///
    /// BUG FIX: the original text contained the mojibake token `×tamp`
    /// (an HTML-entity-mangled `&timestamp`) in the `get_mut`/`remove`
    /// calls, which is not valid Rust; restored to `&timestamp`.
    pub fn remove_triple(&self, triple_key: &str, timestamp: HLCTimestamp) {
        if let Some(index_keys) = self.reverse_index.get(triple_key) {
            for key in index_keys.value() {
                if let Some(mut entry) = self.primary_index.get_mut(key) {
                    if let Some(keys_at_timestamp) = entry.get_mut(&timestamp) {
                        keys_at_timestamp.remove(triple_key);
                        // Drop the now-empty version bucket to keep the
                        // BTreeMap from accumulating dead entries.
                        if keys_at_timestamp.is_empty() {
                            entry.remove(&timestamp);
                        }
                    }
                }
            }
        }
    }

    /// Returns every triple key indexed under `index_key` at or before
    /// `timestamp`.
    ///
    /// `include_uncommitted` is kept for API stability but currently has
    /// no effect: the `range(..=timestamp)` bound below already restricts
    /// iteration to `ts <= timestamp`, so the previous
    /// `include_uncommitted || ts <= timestamp` guard was always true.
    pub fn query(
        &self,
        index_key: &IndexKey,
        timestamp: &HLCTimestamp,
        include_uncommitted: bool,
    ) -> HashSet<String> {
        let _ = include_uncommitted;
        let mut results = HashSet::new();
        if let Some(versions) = self.primary_index.get(index_key) {
            for (_ts, keys) in versions.range(..=timestamp) {
                results.extend(keys.iter().cloned());
            }
        }
        results
    }

    /// Returns point-in-time size counters for the index.
    pub fn get_statistics(&self) -> IndexStatistics {
        // Largest number of timestamped versions under any single key.
        let max_versions_per_index = self
            .primary_index
            .iter()
            .map(|entry| entry.value().len())
            .max()
            .unwrap_or(0);
        IndexStatistics {
            total_index_entries: self.primary_index.len(),
            total_triple_keys: self.reverse_index.len(),
            max_versions_per_index,
        }
    }
}
/// Point-in-time counters describing index size, as returned by
/// [`MVCCIndex::get_statistics`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexStatistics {
    /// Number of distinct index-key permutations present.
    pub total_index_entries: usize,
    /// Number of distinct triple storage keys tracked in the reverse index.
    pub total_triple_keys: usize,
    /// Largest number of timestamped version buckets under any one key.
    pub max_versions_per_index: usize,
}
/// Policy controlling how historical MVCC versions are pruned by
/// `MVCCStorage::compact`. All variants except `None` are currently
/// stubs that only log a warning (see `compact`).
#[derive(Debug, Clone, Copy)]
pub enum CompactionStrategy {
    /// Never compact; retain every version.
    None,
    /// Keep only the newest N versions per key.
    KeepLatest(usize),
    /// Drop versions older than the given retention window.
    TimeBasedRetention(std::time::Duration),
    /// Apply both a per-key version cap and a retention window.
    Hybrid {
        max_versions: usize,
        retention_period: std::time::Duration,
    },
}
impl Default for CompactionStrategy {
fn default() -> Self {
CompactionStrategy::Hybrid {
max_versions: 100,
retention_period: std::time::Duration::from_secs(86400), }
}
}
/// Transactional triple store combining an MVCC version manager with a
/// multi-version secondary index ([`MVCCIndex`]).
pub struct MVCCStorage {
    // Version manager providing snapshots, reads/writes, commit/rollback.
    mvcc: Arc<MVCCManager>,
    // Secondary index over triple patterns, versioned by HLC timestamp.
    index: Arc<MVCCIndex>,
    // On-disk directory created by `start`.
    base_path: String,
    // Pruning policy consulted by `compact`.
    compaction_strategy: CompactionStrategy,
    // Operation counters; guarded by an async RwLock.
    stats: Arc<RwLock<StorageStatistics>>,
}
impl MVCCStorage {
    /// Creates a new MVCC storage engine.
    ///
    /// `node_id` seeds the hybrid-logical-clock of the underlying
    /// [`MVCCManager`]; `base_path` is the directory created by
    /// [`Self::start`]. No I/O happens until `start` is called.
    pub fn new(node_id: u64, base_path: String, compaction_strategy: CompactionStrategy) -> Self {
        let mvcc = Arc::new(MVCCManager::new(
            node_id,
            crate::mvcc::MVCCConfig::default(),
        ));
        Self {
            mvcc,
            index: Arc::new(MVCCIndex::new()),
            base_path,
            compaction_strategy,
            stats: Arc::new(RwLock::new(StorageStatistics::default())),
        }
    }

    /// Starts the MVCC manager and ensures `base_path` exists on disk.
    pub async fn start(&self) -> Result<()> {
        self.mvcc.start().await?;
        tokio::fs::create_dir_all(&self.base_path).await?;
        info!("MVCC storage started at {}", self.base_path);
        Ok(())
    }

    /// Stops the MVCC manager. Does not remove `base_path`.
    pub async fn stop(&self) -> Result<()> {
        self.mvcc.stop().await?;
        info!("MVCC storage stopped");
        Ok(())
    }

    /// Opens a transaction with the requested isolation level and returns
    /// its snapshot.
    pub async fn begin_transaction(
        &self,
        transaction_id: TransactionId,
        isolation_level: IsolationLevel,
    ) -> Result<TransactionSnapshot> {
        self.mvcc
            .begin_transaction(transaction_id, isolation_level)
            .await
    }

    /// Writes `triple` under the given transaction and indexes it.
    ///
    /// NOTE(review): the triple is indexed immediately at write time, so
    /// it is visible to index lookups before the transaction commits and
    /// stays indexed even if it rolls back — confirm this is the intended
    /// visibility model.
    pub async fn insert_triple(
        &self,
        transaction_id: &TransactionId,
        triple: Triple,
    ) -> Result<()> {
        // Only shard 0 is used by the transactional API for now.
        let shard_id = 0;
        let key = self.shard_triple_to_key(shard_id, &triple);
        self.mvcc
            .write(transaction_id, &key, Some(triple.clone()))
            .await?;
        let timestamp = self.mvcc.current_timestamp();
        self.index.index_triple(&triple, timestamp, &key);
        self.stats.write().await.total_inserts += 1;
        Ok(())
    }

    /// Writes a deletion tombstone for `triple` under the given transaction.
    ///
    /// BUG FIX: previously the tombstone was keyed by the bare
    /// `subject:predicate:object` string while `insert_triple` writes under
    /// the shard-qualified `shard:0:...` key, so a delete could never match
    /// an inserted triple. Both paths now derive the same key.
    ///
    /// NOTE(review): the secondary index is not purged here
    /// (`MVCCIndex::remove_triple` is never called anywhere); stale index
    /// hits are expected to be filtered out when the MVCC read of the
    /// tombstoned key yields no triple — confirm against `MVCCManager`.
    pub async fn delete_triple(
        &self,
        transaction_id: &TransactionId,
        triple: Triple,
    ) -> Result<()> {
        let shard_id = 0;
        let key = self.shard_triple_to_key(shard_id, &triple);
        self.mvcc.write(transaction_id, &key, None).await?;
        self.stats.write().await.total_deletes += 1;
        Ok(())
    }

    /// Finds all visible triples matching the (optional) pattern.
    ///
    /// NOTE(review): this calls `begin_transaction` with the caller's
    /// existing `transaction_id` just to obtain a snapshot — confirm the
    /// MVCC manager tolerates re-beginning an already-open transaction.
    pub async fn query_triples(
        &self,
        transaction_id: &TransactionId,
        subject: Option<&str>,
        predicate: Option<&str>,
        object: Option<&str>,
    ) -> Result<Vec<Triple>> {
        let index_key = IndexKey::from_triple_pattern(subject, predicate, object);
        let snapshot = self
            .mvcc
            .begin_transaction(transaction_id.clone(), IsolationLevel::ReadCommitted)
            .await?;
        let triple_keys = self.index.query(
            &index_key,
            &snapshot.timestamp,
            snapshot.isolation_level == IsolationLevel::ReadUncommitted,
        );
        let mut results = Vec::new();
        for key in triple_keys {
            // Re-check the full pattern: the index may over-approximate the
            // match set, and tombstoned keys read back as `None`.
            if let Some(triple) = self.mvcc.read(transaction_id, &key).await? {
                if matches_pattern(&triple, subject, predicate, object) {
                    results.push(triple);
                }
            }
        }
        self.stats.write().await.total_queries += 1;
        Ok(results)
    }

    /// Commits the transaction and bumps the commit counter.
    pub async fn commit_transaction(&self, transaction_id: &TransactionId) -> Result<()> {
        self.mvcc.commit_transaction(transaction_id).await?;
        self.stats.write().await.total_commits += 1;
        Ok(())
    }

    /// Rolls the transaction back and bumps the rollback counter.
    pub async fn rollback_transaction(&self, transaction_id: &TransactionId) -> Result<()> {
        self.mvcc.rollback_transaction(transaction_id).await?;
        self.stats.write().await.total_rollbacks += 1;
        Ok(())
    }

    /// Runs version compaction according to the configured strategy.
    ///
    /// Every strategy other than `None` is currently a stub that logs a
    /// warning and removes nothing, so the result counters are always zero.
    pub async fn compact(&self) -> Result<CompactionResult> {
        let start_time = std::time::Instant::now();
        match self.compaction_strategy {
            CompactionStrategy::None => {}
            CompactionStrategy::KeepLatest(_n) => {
                warn!("KeepLatest compaction not yet implemented");
            }
            CompactionStrategy::TimeBasedRetention(_retention) => {
                warn!("TimeBasedRetention compaction not yet implemented");
            }
            CompactionStrategy::Hybrid { .. } => {
                warn!("Hybrid compaction not yet implemented");
            }
        }
        Ok(CompactionResult {
            duration: start_time.elapsed(),
            versions_removed: 0,
            keys_processed: 0,
        })
    }

    /// Returns a snapshot of the operation counters.
    pub async fn get_statistics(&self) -> StorageStatistics {
        self.stats.read().await.clone()
    }

    /// Canonical (unsharded) storage key for a triple: `s:p:o`.
    fn triple_to_key(&self, triple: &Triple) -> String {
        format!(
            "{}:{}:{}",
            subject_to_string(triple.subject()),
            predicate_to_string(triple.predicate()),
            object_to_string(triple.object())
        )
    }

    /// Shard-qualified storage key: `shard:<id>:s:p:o`.
    fn shard_triple_to_key(&self, shard_id: ShardId, triple: &Triple) -> String {
        format!("shard:{}:{}", shard_id, self.triple_to_key(triple))
    }
}
/// Cumulative operation counters maintained by [`MVCCStorage`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StorageStatistics {
    /// Triples written (including shard-level and bulk single inserts).
    pub total_inserts: u64,
    /// Deletion tombstones written.
    pub total_deletes: u64,
    /// Pattern queries executed via `query_triples`.
    pub total_queries: u64,
    /// Transactions committed.
    pub total_commits: u64,
    /// Transactions rolled back.
    pub total_rollbacks: u64,
}
/// Outcome of a single `MVCCStorage::compact` run.
#[derive(Debug, Clone)]
pub struct CompactionResult {
    /// Wall-clock time the compaction pass took.
    pub duration: std::time::Duration,
    /// Number of versions pruned (currently always 0 — strategies are stubs).
    pub versions_removed: usize,
    /// Number of keys examined (currently always 0 — strategies are stubs).
    pub keys_processed: usize,
}
#[async_trait]
impl StorageBackend for MVCCStorage {
    /// Shards are implicit key prefixes, so creation is a no-op.
    async fn create_shard(&self, _shard_id: ShardId) -> Result<()> {
        Ok(())
    }

    /// NOTE(review): shard data is not actually removed here; deletion is
    /// only signalled via `mark_shard_for_deletion`.
    async fn delete_shard(&self, _shard_id: ShardId) -> Result<()> {
        Ok(())
    }

    /// Inserts one triple in a fresh single-use transaction and indexes it.
    ///
    /// TODO(review): if a `?` fails between `begin_transaction` and
    /// `commit_transaction`, the transaction is never rolled back. The same
    /// pattern applies to the other single-use-transaction methods below.
    async fn insert_triple_to_shard(&self, shard_id: ShardId, triple: Triple) -> Result<()> {
        let tx_id = format!("shard_{}_insert_{}", shard_id, uuid::Uuid::new_v4());
        self.begin_transaction(tx_id.clone(), IsolationLevel::ReadCommitted)
            .await?;
        let key = format!("shard:{}:{}", shard_id, self.triple_to_key(&triple));
        self.mvcc.write(&tx_id, &key, Some(triple.clone())).await?;
        let timestamp = self.mvcc.current_timestamp();
        self.index.index_triple(&triple, timestamp, &key);
        self.stats.write().await.total_inserts += 1;
        self.commit_transaction(&tx_id).await
    }

    /// Writes a deletion tombstone for one triple in a fresh transaction.
    async fn delete_triple_from_shard(&self, shard_id: ShardId, triple: &Triple) -> Result<()> {
        let tx_id = format!("shard_{}_delete_{}", shard_id, uuid::Uuid::new_v4());
        self.begin_transaction(tx_id.clone(), IsolationLevel::ReadCommitted)
            .await?;
        let key = format!("shard:{}:{}", shard_id, self.triple_to_key(triple));
        self.mvcc.write(&tx_id, &key, None).await?;
        self.commit_transaction(&tx_id).await
    }

    /// Queries one shard for triples matching the (optional) pattern.
    ///
    /// TODO(review): the read transaction opened here is never committed or
    /// rolled back — confirm `MVCCManager` reclaims abandoned read
    /// transactions.
    async fn query_shard(
        &self,
        shard_id: ShardId,
        subject: Option<&str>,
        predicate: Option<&str>,
        object: Option<&str>,
    ) -> Result<Vec<Triple>> {
        let tx_id = format!("shard_{}_query_{}", shard_id, uuid::Uuid::new_v4());
        // Renamed from `_snapshot`: the binding is used below, so the
        // leading-underscore "intentionally unused" marker was misleading.
        let snapshot = self
            .mvcc
            .begin_transaction(tx_id.clone(), IsolationLevel::ReadCommitted)
            .await?;
        let mut results = Vec::new();
        let index_key = IndexKey::from_triple_pattern(subject, predicate, object);
        let triple_keys = self.index.query(
            &index_key,
            &snapshot.timestamp,
            snapshot.isolation_level == IsolationLevel::ReadUncommitted,
        );
        let shard_prefix = format!("shard:{shard_id}:");
        for key in triple_keys {
            // The index is global across shards; restrict to this shard's
            // key prefix before paying for an MVCC read.
            if key.starts_with(&shard_prefix) {
                if let Some(triple) = self.mvcc.read(&tx_id, &key).await? {
                    if matches_pattern(&triple, subject, predicate, object) {
                        results.push(triple);
                    }
                }
            }
        }
        Ok(results)
    }

    /// Rough size estimate: triple count times a fixed 100-bytes-per-triple
    /// heuristic. Not an exact byte count.
    async fn get_shard_size(&self, shard_id: ShardId) -> Result<u64> {
        let count = self.get_shard_triple_count(shard_id).await?;
        Ok((count * 100) as u64)
    }

    /// NOTE(review): returns a global heuristic (`total_keys / 10`), not a
    /// real per-shard count — the `_shard_id` argument is ignored.
    async fn get_shard_triple_count(&self, _shard_id: ShardId) -> Result<usize> {
        let stats = self.mvcc.get_statistics().await;
        Ok(stats.total_keys / 10)
    }

    /// Exports all of a shard's triples via an unfiltered query.
    async fn export_shard(&self, shard_id: ShardId) -> Result<Vec<Triple>> {
        self.query_shard(shard_id, None, None, None).await
    }

    /// Imports a batch of triples into one shard within a single
    /// transaction. Unlike single inserts, imported triples are NOT added
    /// to the secondary index here.
    async fn import_shard(&self, shard_id: ShardId, triples: Vec<Triple>) -> Result<()> {
        let tx_id = format!("shard_{}_import_{}", shard_id, uuid::Uuid::new_v4());
        self.begin_transaction(tx_id.clone(), IsolationLevel::ReadCommitted)
            .await?;
        for triple in triples {
            let key = format!("shard:{}:{}", shard_id, self.triple_to_key(&triple));
            self.mvcc.write(&tx_id, &key, Some(triple)).await?;
        }
        self.commit_transaction(&tx_id).await
    }

    /// Reads every triple stored under the shard's key prefix via an MVCC
    /// prefix scan (bypasses the secondary index).
    async fn get_shard_triples(&self, shard_id: ShardId) -> Result<Vec<Triple>> {
        let tx_id = format!("shard_{}_get_triples_{}", shard_id, uuid::Uuid::new_v4());
        self.begin_transaction(tx_id.clone(), IsolationLevel::ReadCommitted)
            .await?;
        let prefix = format!("shard:{shard_id}:");
        let results = self.mvcc.scan_prefix(&tx_id, &prefix).await?;
        let mut triples = Vec::new();
        for (_, triple) in results {
            triples.push(triple);
        }
        self.commit_transaction(&tx_id).await?;
        Ok(triples)
    }

    /// Bulk insert into one shard within a single transaction. Like
    /// `import_shard`, this does not update the secondary index.
    async fn insert_triples_to_shard(&self, shard_id: ShardId, triples: Vec<Triple>) -> Result<()> {
        let tx_id = format!("shard_{}_insert_bulk_{}", shard_id, uuid::Uuid::new_v4());
        self.begin_transaction(tx_id.clone(), IsolationLevel::ReadCommitted)
            .await?;
        for triple in triples {
            let key = format!("shard:{}:{}", shard_id, self.triple_to_key(&triple));
            self.mvcc.write(&tx_id, &key, Some(triple)).await?;
        }
        self.commit_transaction(&tx_id).await
    }

    /// Writes a sentinel triple under a well-known key to flag the shard
    /// for deferred deletion; the shard's data itself is left in place.
    async fn mark_shard_for_deletion(&self, shard_id: ShardId) -> Result<()> {
        let tx_id = format!("shard_{}_mark_delete_{}", shard_id, uuid::Uuid::new_v4());
        self.begin_transaction(tx_id.clone(), IsolationLevel::ReadCommitted)
            .await?;
        let deletion_marker_key = format!("shard:{shard_id}:__MARKED_FOR_DELETION__");
        let marker_triple = Triple::new(
            Subject::NamedNode(
                NamedNode::new("urn:oxirs:shard:deleted").expect("valid static URI"),
            ),
            Predicate::NamedNode(
                NamedNode::new("urn:oxirs:prop:deletionMarker").expect("valid static URI"),
            ),
            Object::Literal(Literal::new_simple_literal("true")),
        );
        self.mvcc
            .write(&tx_id, &deletion_marker_key, Some(marker_triple))
            .await?;
        self.commit_transaction(&tx_id).await
    }
}
/// Renders an RDF subject term in the canonical string form used by the
/// storage keys and indexes: raw IRI text, `_:` for blank nodes, `?` for
/// variables, and `<<s p o>>` for RDF-star quoted triples.
fn subject_to_string(subject: &Subject) -> String {
    match subject {
        Subject::NamedNode(node) => node.as_str().to_string(),
        Subject::BlankNode(bnode) => format!("_:{}", bnode.as_str()),
        Subject::Variable(var) => format!("?{}", var.as_str()),
        Subject::QuotedTriple(qt) => {
            let s = subject_to_string(qt.subject());
            let p = predicate_to_string(qt.predicate());
            let o = object_to_string(qt.object());
            format!("<<{s} {p} {o}>>")
        }
    }
}
/// Renders an RDF predicate term: raw IRI text, or `?` for variables.
fn predicate_to_string(predicate: &Predicate) -> String {
    match predicate {
        Predicate::NamedNode(node) => node.as_str().to_string(),
        Predicate::Variable(var) => format!("?{}", var.as_str()),
    }
}
/// Renders an RDF object term in the canonical string form used by the
/// storage keys and indexes. Literals with a language tag render as
/// `"value"@lang`; all other literals render as `"value"^^<datatype>`.
fn object_to_string(object: &Object) -> String {
    match object {
        Object::NamedNode(node) => node.as_str().to_string(),
        Object::BlankNode(bnode) => format!("_:{}", bnode.as_str()),
        Object::Literal(lit) => match lit.language() {
            Some(lang) => format!("\"{}\"@{}", lit.value(), lang),
            None => format!("\"{}\"^^<{}>", lit.value(), lit.datatype().as_str()),
        },
        Object::Variable(var) => format!("?{}", var.as_str()),
        Object::QuotedTriple(qt) => {
            let s = subject_to_string(qt.subject());
            let p = predicate_to_string(qt.predicate());
            let o = object_to_string(qt.object());
            format!("<<{s} {p} {o}>>")
        }
    }
}
/// Returns true when `triple` matches the pattern: each bound position
/// must equal the triple's canonical string rendering; `None` positions
/// match anything. Evaluation short-circuits on the first mismatch.
fn matches_pattern(
    triple: &Triple,
    subject: Option<&str>,
    predicate: Option<&str>,
    object: Option<&str>,
) -> bool {
    subject.map_or(true, |s| subject_to_string(triple.subject()) == s)
        && predicate.map_or(true, |p| predicate_to_string(triple.predicate()) == p)
        && object.map_or(true, |o| object_to_string(triple.object()) == o)
}
#[cfg(test)]
mod tests {
    use super::*;

    // Verifies that `from_triple_pattern` picks the most specific key
    // permutation for each bound/unbound combination.
    #[test]
    fn test_index_key_creation() {
        let key1 = IndexKey::from_triple_pattern(Some("s"), Some("p"), Some("o"));
        assert!(matches!(key1, IndexKey::Triple(_, _, _)));
        let key2 = IndexKey::from_triple_pattern(Some("s"), Some("p"), None);
        assert!(matches!(key2, IndexKey::SubjectPredicate(_, _)));
        let key3 = IndexKey::from_triple_pattern(Some("s"), None, None);
        assert!(matches!(key3, IndexKey::Subject(_)));
    }

    // Verifies the flat storage-key encoding for full and subject-only keys.
    #[test]
    fn test_index_key_to_storage_key() {
        let key = IndexKey::Triple("s".to_string(), "p".to_string(), "o".to_string());
        assert_eq!(key.to_storage_key(), "spo:s:p:o");
        let key = IndexKey::Subject("s".to_string());
        assert_eq!(key.to_storage_key(), "s:s");
    }

    // End-to-end shard-level path: insert via the StorageBackend trait,
    // query back by subject, and check the operation counters.
    // NOTE(review): uses a fixed /tmp path, so state may persist between
    // runs; the assertions only rely on per-instance in-memory state.
    #[tokio::test]
    async fn test_mvcc_storage_basic() {
        let storage = MVCCStorage::new(1, "/tmp/mvcc_test".to_string(), CompactionStrategy::None);
        storage.start().await.unwrap();
        let triple = Triple::new(
            NamedNode::new("http://example.org/s").unwrap(),
            NamedNode::new("http://example.org/p").unwrap(),
            Literal::new_typed_literal("value", xsd::STRING.clone()),
        );
        storage
            .insert_triple_to_shard(0, triple.clone())
            .await
            .unwrap();
        let results = storage
            .query_shard(0, Some("http://example.org/s"), None, None)
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        let stats = storage.get_statistics().await;
        assert_eq!(stats.total_inserts, 1);
        assert_eq!(stats.total_commits, 1);
        storage.stop().await.unwrap();
    }

    // Explicit-transaction path: begin, insert, commit, then read the
    // committed triple back through a shard query.
    #[tokio::test]
    async fn test_mvcc_storage_transaction() {
        let storage =
            MVCCStorage::new(1, "/tmp/mvcc_test_tx".to_string(), CompactionStrategy::None);
        storage.start().await.unwrap();
        let tx_id = "test_tx".to_string();
        storage
            .begin_transaction(tx_id.clone(), IsolationLevel::ReadCommitted)
            .await
            .unwrap();
        let triple = Triple::new(
            NamedNode::new("http://example.org/s").unwrap(),
            NamedNode::new("http://example.org/p").unwrap(),
            Literal::new_typed_literal("value", xsd::STRING.clone()),
        );
        storage.insert_triple(&tx_id, triple.clone()).await.unwrap();
        storage.commit_transaction(&tx_id).await.unwrap();
        let results = storage
            .query_shard(0, Some("http://example.org/s"), None, None)
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        storage.stop().await.unwrap();
    }

    // Pins the documented default compaction policy (Hybrid, 100 versions,
    // 24 h retention).
    #[test]
    fn test_compaction_strategy() {
        let strategy = CompactionStrategy::default();
        match strategy {
            CompactionStrategy::Hybrid {
                max_versions,
                retention_period,
            } => {
                assert_eq!(max_versions, 100);
                assert_eq!(retention_period.as_secs(), 86400);
            }
            _ => panic!("Expected hybrid strategy"),
        }
    }
}