use crate::Database;
use crate::engine::crud::{MVCC_PREFIX_LEN, MVCC_TOMBSTONE_PREFIX, MVCC_VALUE_PREFIX};
use crate::error::DbxResult;
use dashmap::DashMap;
use std::sync::Arc;
/// Concurrent map from `(table, key)` to the result a snapshot resolved for that pair.
///
/// A stored `None` records that the lookup produced no value, so misses are
/// remembered as well as hits. Naming the type here keeps the struct field readable.
type VisibleCache = DashMap<(String, Vec<u8>), Option<Vec<u8>>>;

/// A view of a [`Database`] pinned to a single read timestamp.
///
/// Both the database handle and the lookup cache sit behind `Arc`, so cloning a
/// snapshot is cheap and every clone shares the same cache.
#[derive(Clone)]
pub struct Snapshot {
    /// Timestamp this snapshot reads at.
    pub read_ts: u64,
    /// Database the snapshot reads from.
    db: Arc<Database>,
    /// Results of earlier point lookups, shared by all clones of this snapshot.
    visible_cache: Arc<VisibleCache>,
}
impl Snapshot {
pub fn new(db: Arc<Database>, read_ts: u64) -> Self {
Self {
read_ts,
db,
visible_cache: Arc::new(DashMap::new()),
}
}
pub fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
let cache_key = (table.to_string(), key.to_vec());
if let Some(entry) = self.visible_cache.get(&cache_key) {
return Ok(entry.value().clone());
}
let result = match self.db.get_snapshot(table, key, self.read_ts)? {
Some(Some(value)) => Some(value), Some(None) => None, None => None, };
self.visible_cache.insert(cache_key, result.clone());
Ok(result)
}
pub fn scan(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
let delta_entries = self.db.scan_delta_versioned(table)?;
let wos_entries = self.db.scan_wos_versioned(table)?;
let visible_keys: DashMap<Vec<u8>, (Vec<u8>, u64)> = DashMap::new();
for (encoded_key, encoded_value) in wos_entries {
if let Ok(vk) = crate::transaction::mvcc::version::VersionedKey::decode(&encoded_key)
&& vk.commit_ts <= self.read_ts
{
let value = if encoded_value.is_empty() {
Vec::new()
} else if encoded_value.len() >= MVCC_PREFIX_LEN
&& encoded_value[..MVCC_PREFIX_LEN] == MVCC_VALUE_PREFIX
{
encoded_value[MVCC_PREFIX_LEN..].to_vec()
} else if encoded_value.len() >= MVCC_PREFIX_LEN
&& encoded_value[..MVCC_PREFIX_LEN] == MVCC_TOMBSTONE_PREFIX
{
Vec::new()
} else {
encoded_value.clone() };
visible_keys
.entry(vk.user_key.clone())
.and_modify(|(existing_val, existing_ts)| {
if vk.commit_ts > *existing_ts {
*existing_val = value.clone();
*existing_ts = vk.commit_ts;
}
})
.or_insert((value, vk.commit_ts));
}
}
for (encoded_key, encoded_value) in delta_entries {
if let Ok(vk) = crate::transaction::mvcc::version::VersionedKey::decode(&encoded_key)
&& vk.commit_ts <= self.read_ts
{
let value = if encoded_value.is_empty() {
Vec::new() } else if encoded_value.len() >= MVCC_PREFIX_LEN
&& encoded_value[..MVCC_PREFIX_LEN] == MVCC_VALUE_PREFIX
{
encoded_value[MVCC_PREFIX_LEN..].to_vec()
} else if encoded_value.len() >= MVCC_PREFIX_LEN
&& encoded_value[..MVCC_PREFIX_LEN] == MVCC_TOMBSTONE_PREFIX
{
Vec::new() } else {
encoded_value.clone()
};
visible_keys
.entry(vk.user_key.clone())
.and_modify(|(existing_val, existing_ts)| {
if vk.commit_ts > *existing_ts {
*existing_val = value.clone();
*existing_ts = vk.commit_ts;
}
})
.or_insert((value, vk.commit_ts));
}
}
let result: Vec<(Vec<u8>, Vec<u8>)> = visible_keys
.into_iter()
.filter(|(_, (v, _))| !v.is_empty())
.map(|(k, (v, _))| (k, v))
.collect();
Ok(result)
}
pub fn read_ts(&self) -> u64 {
self.read_ts
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Database;

    /// Table name used by every test in this module.
    const TABLE: &str = "test";

    /// Opens an empty in-memory database, wrapped so snapshots can share it.
    fn open_db() -> DbxResult<Arc<Database>> {
        Ok(Arc::new(Database::open_in_memory()?))
    }

    /// A snapshot sees only versions committed at or before its read timestamp,
    /// and keeps seeing them after newer versions are written.
    #[test]
    fn test_snapshot_isolation() -> DbxResult<()> {
        let db = open_db()?;
        db.insert_versioned(TABLE, b"key1", Some(b"v1"), 10)?;
        db.insert_versioned(TABLE, b"key2", Some(b"v2"), 20)?;

        let at_15 = Snapshot::new(Arc::clone(&db), 15);
        let first_read = at_15.get(TABLE, b"key1")?;
        assert_eq!(first_read, Some(b"v1".to_vec()));
        let too_new = at_15.get(TABLE, b"key2")?;
        assert_eq!(too_new, None);

        // A write after the snapshot's timestamp must not change what it returns.
        db.insert_versioned(TABLE, b"key1", Some(b"v1_new"), 30)?;
        let second_read = at_15.get(TABLE, b"key1")?;
        assert_eq!(second_read, Some(b"v1".to_vec()));
        Ok(())
    }

    /// A deletion is invisible to snapshots older than it and hides the key from
    /// snapshots newer than it.
    #[test]
    fn test_snapshot_tombstone() -> DbxResult<()> {
        let db = open_db()?;
        db.insert_versioned(TABLE, b"key1", Some(b"value"), 10)?;
        db.insert_versioned(TABLE, b"key1", None, 20)?;

        let before_delete = Snapshot::new(Arc::clone(&db), 15);
        assert_eq!(
            before_delete.get(TABLE, b"key1")?,
            Some(b"value".to_vec())
        );

        let after_delete = Snapshot::new(Arc::clone(&db), 25);
        assert_eq!(after_delete.get(TABLE, b"key1")?, None);
        Ok(())
    }

    /// Repeated lookups of the same key return the same value and leave exactly
    /// one cache entry behind.
    #[test]
    fn test_snapshot_cache() -> DbxResult<()> {
        let db = open_db()?;
        db.insert_versioned(TABLE, b"key1", Some(b"value"), 10)?;

        let snapshot = Snapshot::new(Arc::clone(&db), 15);
        let expected = Some(b"value".to_vec());

        let first = snapshot.get(TABLE, b"key1")?;
        assert_eq!(first, expected);
        let second = snapshot.get(TABLE, b"key1")?;
        assert_eq!(second, expected);

        assert_eq!(snapshot.visible_cache.len(), 1);
        Ok(())
    }
}