use std::collections::{BTreeMap, BTreeSet, HashMap};
use crate::types::{CommittedVersion, Intent, Key, Timestamp, TxnId, Value};
pub trait Backend {
fn get_committed_versions(&self, key: &[u8]) -> Result<Vec<CommittedVersion>, String>;
fn get_latest_committed(&self, key: &[u8]) -> Result<Option<CommittedVersion>, String>;
fn get_visible_committed(
&self,
key: &[u8],
read_ts: Timestamp,
) -> Result<Option<CommittedVersion>, String>;
fn get_latest_commit_ts(&self, key: &[u8]) -> Result<Option<Timestamp>, String>;
fn get_intent(&self, key: &[u8]) -> Result<Option<Intent>, String>;
fn put_intent(&mut self, intent: Intent) -> Result<(), String>;
fn remove_intent(
&mut self,
key: &[u8],
txn_id: TxnId,
start_ts: Timestamp,
) -> Result<bool, String>;
fn put_committed(&mut self, version: CommittedVersion) -> Result<(), String>;
fn put_committed_batch(&mut self, versions: Vec<CommittedVersion>) -> Result<(), String>;
fn remove_committed_version(&mut self, key: &[u8], commit_ts: Timestamp) -> Result<(), String>;
fn all_keys(&self) -> Result<Vec<Key>, String>;
fn put_intents_batch(&mut self, intents: Vec<Intent>) -> Result<(), String>;
fn commit_intents_batch(
&mut self,
commits: Vec<CommittedVersion>,
removed_intents: Vec<(Key, TxnId, Timestamp)>,
) -> Result<(), String>;
fn remove_intents_batch(&mut self, intents: Vec<(Key, TxnId, Timestamp)>)
-> Result<(), String>;
fn keys_from(&self, start: Option<&[u8]>, limit: usize) -> Result<Vec<Key>, String>;
fn keys_from_prefix(
&self,
prefix: &[u8],
start: Option<&[u8]>,
limit: usize,
) -> Result<Vec<Key>, String>;
fn get_committed_timestamps_before(
&self,
key: &[u8],
before_ts: Timestamp,
limit: usize,
) -> Result<Vec<Timestamp>, String>;
fn collapse_tombstone(
&mut self,
key: &[u8],
tombstone_ts: Timestamp,
older_ts: Vec<Timestamp>,
) -> Result<(), String>;
}
#[derive(Debug, Clone, Default)]
pub struct InMemoryBackend {
committed: BTreeMap<(Key, Timestamp), Option<Value>>,
intents: HashMap<Key, Intent>,
all_keys_set: BTreeSet<Key>,
}
impl InMemoryBackend {
pub fn new() -> Self {
Self::default()
}
}
impl Backend for InMemoryBackend {
fn get_committed_versions(&self, key: &[u8]) -> Result<Vec<CommittedVersion>, String> {
let mut result = Vec::new();
let start = (key.to_vec(), Timestamp(0));
for ((k, ts), value) in self.committed.range(start..) {
if k.as_slice() != key {
break;
}
result.push(CommittedVersion {
key: k.clone(),
commit_ts: *ts,
value: value.clone(),
});
}
Ok(result)
}
fn get_latest_committed(&self, key: &[u8]) -> Result<Option<CommittedVersion>, String> {
let range = (key.to_vec(), Timestamp(0))..=(key.to_vec(), Timestamp(u64::MAX));
if let Some(((k, ts), value)) = self.committed.range(range).next_back() {
if k.as_slice() == key {
return Ok(Some(CommittedVersion {
key: k.clone(),
commit_ts: *ts,
value: value.clone(),
}));
}
}
Ok(None)
}
fn get_visible_committed(
&self,
key: &[u8],
read_ts: Timestamp,
) -> Result<Option<CommittedVersion>, String> {
let range = (key.to_vec(), Timestamp(0))..=(key.to_vec(), read_ts);
if let Some(((k, ts), value)) = self.committed.range(range).next_back() {
if k.as_slice() == key {
return Ok(Some(CommittedVersion {
key: k.clone(),
commit_ts: *ts,
value: value.clone(),
}));
}
}
Ok(None)
}
fn get_latest_commit_ts(&self, key: &[u8]) -> Result<Option<Timestamp>, String> {
let range = (key.to_vec(), Timestamp(0))..=(key.to_vec(), Timestamp(u64::MAX));
if let Some(((k, ts), _)) = self.committed.range(range).next_back() {
if k.as_slice() == key {
return Ok(Some(*ts));
}
}
Ok(None)
}
fn get_intent(&self, key: &[u8]) -> Result<Option<Intent>, String> {
Ok(self.intents.get(key).cloned())
}
fn put_intent(&mut self, intent: Intent) -> Result<(), String> {
self.all_keys_set.insert(intent.key.clone());
self.intents.insert(intent.key.clone(), intent);
Ok(())
}
fn remove_intent(
&mut self,
key: &[u8],
txn_id: TxnId,
start_ts: Timestamp,
) -> Result<bool, String> {
if let Some(intent) = self.intents.get(key) {
if intent.txn_id == txn_id && intent.start_ts == start_ts {
self.intents.remove(key);
self.maybe_remove_from_keys(key);
return Ok(true);
}
}
Ok(false)
}
fn put_committed(&mut self, version: CommittedVersion) -> Result<(), String> {
self.all_keys_set.insert(version.key.clone());
self.committed.insert(
(version.key.clone(), version.commit_ts),
version.value.clone(),
);
Ok(())
}
fn put_committed_batch(&mut self, versions: Vec<CommittedVersion>) -> Result<(), String> {
for version in versions {
self.all_keys_set.insert(version.key.clone());
self.committed.insert(
(version.key.clone(), version.commit_ts),
version.value.clone(),
);
}
Ok(())
}
fn remove_committed_version(&mut self, key: &[u8], commit_ts: Timestamp) -> Result<(), String> {
self.committed.remove(&(key.to_vec(), commit_ts));
self.maybe_remove_from_keys(key);
Ok(())
}
fn all_keys(&self) -> Result<Vec<Key>, String> {
Ok(self.all_keys_set.iter().cloned().collect())
}
fn put_intents_batch(&mut self, intents: Vec<Intent>) -> Result<(), String> {
for intent in intents {
self.all_keys_set.insert(intent.key.clone());
self.intents.insert(intent.key.clone(), intent);
}
Ok(())
}
fn commit_intents_batch(
&mut self,
commits: Vec<CommittedVersion>,
removed_intents: Vec<(Key, TxnId, Timestamp)>,
) -> Result<(), String> {
for version in commits {
self.all_keys_set.insert(version.key.clone());
self.committed.insert(
(version.key.clone(), version.commit_ts),
version.value.clone(),
);
}
for (key, txn_id, start_ts) in removed_intents {
if let Some(intent) = self.intents.get(&key) {
if intent.txn_id == txn_id && intent.start_ts == start_ts {
self.intents.remove(&key);
self.maybe_remove_from_keys(&key);
}
}
}
Ok(())
}
fn remove_intents_batch(
&mut self,
intents: Vec<(Key, TxnId, Timestamp)>,
) -> Result<(), String> {
for (key, txn_id, start_ts) in intents {
if let Some(intent) = self.intents.get(&key) {
if intent.txn_id == txn_id && intent.start_ts == start_ts {
self.intents.remove(&key);
self.maybe_remove_from_keys(&key);
}
}
}
Ok(())
}
fn keys_from(&self, start: Option<&[u8]>, limit: usize) -> Result<Vec<Key>, String> {
if let Some(s) = start {
Ok(self
.all_keys_set
.range(s.to_vec()..)
.take(limit)
.cloned()
.collect())
} else {
Ok(self.all_keys_set.iter().take(limit).cloned().collect())
}
}
fn keys_from_prefix(
&self,
prefix: &[u8],
start: Option<&[u8]>,
limit: usize,
) -> Result<Vec<Key>, String> {
if limit == 0 {
return Ok(Vec::new());
}
if prefix.is_empty() {
return Err("Prefix cannot be empty".to_string());
}
if let Some(s) = start {
if !s.starts_with(prefix) {
return Err("Start cursor must start with the prefix".to_string());
}
}
let scan_start = start.unwrap_or(prefix);
let mut result = Vec::new();
let range_start = (scan_start.to_vec(), Timestamp(0));
for ((k, _), _) in self.committed.range(range_start..) {
if !k.starts_with(prefix) {
break;
}
if result.last() != Some(k) {
result.push(k.clone());
if result.len() == limit {
break;
}
}
}
Ok(result)
}
fn get_committed_timestamps_before(
&self,
key: &[u8],
before_ts: Timestamp,
limit: usize,
) -> Result<Vec<Timestamp>, String> {
let range = (key.to_vec(), Timestamp(0))..(key.to_vec(), before_ts);
let mut result = Vec::new();
for ((k, ts), _) in self.committed.range(range).rev().take(limit) {
if k.as_slice() == key {
result.push(*ts);
}
}
Ok(result)
}
fn collapse_tombstone(
&mut self,
key: &[u8],
tombstone_ts: Timestamp,
older_ts: Vec<Timestamp>,
) -> Result<(), String> {
self.committed.remove(&(key.to_vec(), tombstone_ts));
for ts in older_ts {
self.committed.remove(&(key.to_vec(), ts));
}
self.maybe_remove_from_keys(key);
Ok(())
}
}
impl InMemoryBackend {
fn maybe_remove_from_keys(&mut self, key: &[u8]) {
if !self.intents.contains_key(key) {
let range = (key.to_vec(), Timestamp(0))..=(key.to_vec(), Timestamp(u64::MAX));
if self.committed.range(range).next().is_none() {
self.all_keys_set.remove(key);
}
}
}
}