use crate::error::{DbxError, DbxResult};
use crate::transaction::mvcc::manager::TimestampOracle;
use crate::transaction::mvcc::versionable::Versionable;
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
/// Shared, lock-protected map from a key to that key's `(commit_ts, value)` version list.
type VersionStorage<T> = Arc<RwLock<BTreeMap<Vec<u8>, Vec<(u64, T)>>>>;
/// Multi-version store that keeps, for every key, the values written at different
/// commit timestamps so that readers can look a key up as of a given timestamp.
///
/// The storage sits behind an `Arc`, so a clone of the manager refers to the same
/// underlying map as the original.
#[derive(Clone)]
pub struct VersionManager<T: Versionable> {
/// Per-key version lists; `add_version` keeps each list ordered newest-first.
versions: VersionStorage<T>,
/// Timestamp oracle supplied to `new`; `None` when built with `new_without_oracle`.
// The field is only stored, never read, by the methods visible in this file,
// which is why the dead-code lint is silenced here.
#[allow(dead_code)]
oracle: Option<Arc<TimestampOracle>>,
}
impl<T: Versionable> VersionManager<T> {
pub fn new(oracle: Arc<TimestampOracle>) -> Self {
Self {
versions: Arc::new(RwLock::new(BTreeMap::new())),
oracle: Some(oracle),
}
}
pub fn new_without_oracle() -> Self {
Self {
versions: Arc::new(RwLock::new(BTreeMap::new())),
oracle: None,
}
}
pub fn add_version(&self, key: Vec<u8>, value: T, commit_ts: u64) -> DbxResult<()> {
let mut versions = self
.versions
.write()
.map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
let version_list = versions.entry(key).or_insert_with(Vec::new);
let insert_pos = version_list
.binary_search_by(|(ts, _)| commit_ts.cmp(ts))
.unwrap_or_else(|pos| pos);
version_list.insert(insert_pos, (commit_ts, value));
Ok(())
}
pub fn get_at_snapshot(&self, key: &[u8], read_ts: u64) -> DbxResult<Option<T>> {
let versions = self
.versions
.read()
.map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
if let Some(version_list) = versions.get(key) {
for (commit_ts, value) in version_list {
if *commit_ts <= read_ts {
return Ok(Some(value.clone()));
}
}
}
Ok(None)
}
pub fn collect_garbage(&self, min_active_ts: u64) -> DbxResult<usize> {
let mut versions = self
.versions
.write()
.map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
let mut deleted_count = 0;
for (_key, version_list) in versions.iter_mut() {
if version_list.len() <= 1 {
continue;
}
let mut to_remove = Vec::new();
for (i, (commit_ts, _)) in version_list.iter().enumerate() {
if i == 0 {
continue;
}
if *commit_ts < min_active_ts {
to_remove.push(i);
}
}
for &idx in to_remove.iter().rev() {
version_list.remove(idx);
deleted_count += 1;
}
}
Ok(deleted_count)
}
pub fn version_count(&self, key: &[u8]) -> DbxResult<usize> {
let versions = self
.versions
.read()
.map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
Ok(versions.get(key).map(|v| v.len()).unwrap_or(0))
}
pub fn key_count(&self) -> DbxResult<usize> {
let versions = self
.versions
.read()
.map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
Ok(versions.len())
}
pub fn total_version_count(&self) -> DbxResult<usize> {
let versions = self
.versions
.read()
.map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
Ok(versions.values().map(|v| v.len()).sum())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_version_manager_add_and_get() -> DbxResult<()> {
let oracle = Arc::new(TimestampOracle::default());
let manager = VersionManager::<String>::new(Arc::clone(&oracle));
manager.add_version(b"user:1".to_vec(), "Alice v1".to_string(), 10)?;
manager.add_version(b"user:1".to_vec(), "Alice v2".to_string(), 20)?;
manager.add_version(b"user:1".to_vec(), "Alice v3".to_string(), 30)?;
assert_eq!(
manager.get_at_snapshot(b"user:1", 5)?,
None );
assert_eq!(
manager.get_at_snapshot(b"user:1", 15)?,
Some("Alice v1".to_string()) );
assert_eq!(
manager.get_at_snapshot(b"user:1", 25)?,
Some("Alice v2".to_string()) );
assert_eq!(
manager.get_at_snapshot(b"user:1", 35)?,
Some("Alice v3".to_string()) );
Ok(())
}
#[test]
fn test_version_manager_snapshot_isolation() -> DbxResult<()> {
let oracle = Arc::new(TimestampOracle::default());
let manager = VersionManager::<Vec<u8>>::new(Arc::clone(&oracle));
manager.add_version(b"key1".to_vec(), b"value1".to_vec(), 10)?;
let snapshot_ts = 15;
let value_at_15 = manager.get_at_snapshot(b"key1", snapshot_ts)?;
assert_eq!(value_at_15, Some(b"value1".to_vec()));
manager.add_version(b"key1".to_vec(), b"value2".to_vec(), 20)?;
let value_at_15_again = manager.get_at_snapshot(b"key1", snapshot_ts)?;
assert_eq!(value_at_15_again, Some(b"value1".to_vec()));
let value_at_25 = manager.get_at_snapshot(b"key1", 25)?;
assert_eq!(value_at_25, Some(b"value2".to_vec()));
Ok(())
}
#[test]
fn test_version_manager_garbage_collection() -> DbxResult<()> {
let oracle = Arc::new(TimestampOracle::default());
let manager = VersionManager::<String>::new(Arc::clone(&oracle));
manager.add_version(b"key1".to_vec(), "v1".to_string(), 10)?;
manager.add_version(b"key1".to_vec(), "v2".to_string(), 20)?;
manager.add_version(b"key1".to_vec(), "v3".to_string(), 30)?;
manager.add_version(b"key1".to_vec(), "v4".to_string(), 40)?;
assert_eq!(manager.version_count(b"key1")?, 4);
let deleted = manager.collect_garbage(25)?;
assert_eq!(deleted, 2);
assert_eq!(manager.version_count(b"key1")?, 2);
assert_eq!(
manager.get_at_snapshot(b"key1", 15)?,
None );
assert_eq!(
manager.get_at_snapshot(b"key1", 35)?,
Some("v3".to_string()) );
assert_eq!(
manager.get_at_snapshot(b"key1", 45)?,
Some("v4".to_string()) );
Ok(())
}
#[test]
fn test_version_manager_multiple_keys() -> DbxResult<()> {
let manager = VersionManager::<String>::new_without_oracle();
manager.add_version(b"user:1".to_vec(), "Alice".to_string(), 10)?;
manager.add_version(b"user:2".to_vec(), "Bob".to_string(), 15)?;
manager.add_version(b"user:1".to_vec(), "Alice Updated".to_string(), 20)?;
assert_eq!(manager.key_count()?, 2);
assert_eq!(manager.total_version_count()?, 3);
assert_eq!(
manager.get_at_snapshot(b"user:1", 12)?,
Some("Alice".to_string())
);
assert_eq!(
manager.get_at_snapshot(b"user:2", 18)?,
Some("Bob".to_string())
);
assert_eq!(
manager.get_at_snapshot(b"user:1", 25)?,
Some("Alice Updated".to_string())
);
Ok(())
}
#[test]
fn test_version_manager_gc_preserves_latest() -> DbxResult<()> {
let manager = VersionManager::<Vec<u8>>::new_without_oracle();
manager.add_version(b"key1".to_vec(), b"value1".to_vec(), 10)?;
let deleted = manager.collect_garbage(100)?;
assert_eq!(deleted, 0);
assert_eq!(manager.version_count(b"key1")?, 1);
Ok(())
}
}