use crate::bloom::BloomFilter;
use crate::cursor::{Cursor, KeyIterator, ScanIterator};
use crate::direction::Direction;
use crate::err::Error;
use crate::inner::{Inner, COUNTER_TOMBSTONE};
use crate::iter::{for_each_in_range, MergeIterator, MergeQueueIter, TreeIterState};
use crate::kv::IntoBytes;
use crate::pool::Pool;
use crate::queue::{Commit, Merge};
use crate::version::Version;
use crate::versions::Versions;
use crate::LOG_TARGET_CONFLICTS;
use arc_swap::ArcSwap;
use bytes::Bytes;
use crossbeam_skiplist::SkipMap;
use papaya::HashSet;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::ops::Bound;
use std::ops::ControlFlow;
use std::ops::Range;
use std::sync::atomic::{fence, AtomicU64, Ordering};
use std::sync::Arc;
use tracing::debug;
/// The isolation level a transaction validates itself against at commit time.
///
/// Variants are declared from weakest to strongest, so the derived ordering
/// supports "at least this strict" checks such as
/// `mode >= IsolationLevel::SerializableSnapshotIsolation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum IsolationLevel {
    /// Commit checks for overlapping writes only.
    SnapshotIsolation,
    /// Commit additionally checks tracked reads and scanned ranges.
    SerializableSnapshotIsolation,
}
/// A handle to a database transaction.
///
/// Every operation is forwarded to the wrapped [`TransactionInner`]. When the
/// handle is dropped, the inner state is handed back to `pool`.
pub struct Transaction {
/// The pool that receives the inner transaction state on drop.
pub(crate) pool: Arc<Pool>,
/// The transaction state; `Drop` takes it out, leaving `None`.
pub(crate) inner: Option<TransactionInner>,
}
impl Drop for Transaction {
    /// Releases this transaction's hold on its commit and version counters,
    /// removes the counter entries from the database maps when this was the
    /// last holder, and returns the inner state to the pool.
    fn drop(&mut self) {
        // Nothing to release if the inner state has already been taken
        let Some(state) = self.inner.take() else {
            return;
        };
        let db = &state.database;
        // `release_counter` returns true when this was the last reference
        if release_counter(&state.counter_commit) {
            db.counter_by_commit.remove(&state.commit);
        }
        if release_counter(&state.counter_version) {
            db.counter_by_oracle.remove(&state.version);
        }
        // Hand the state back so that it can be reused
        self.pool.put(state);
    }
}
impl Transaction {
pub fn with_snapshot_isolation(mut self) -> Self {
self.inner.as_mut().unwrap().mode = IsolationLevel::SnapshotIsolation;
self
}
pub fn with_serializable_snapshot_isolation(mut self) -> Self {
self.inner.as_mut().unwrap().mode = IsolationLevel::SerializableSnapshotIsolation;
self
}
pub fn set_savepoint(&mut self) -> Result<(), Error> {
self.inner.as_mut().unwrap().set_savepoint()
}
pub fn rollback_to_savepoint(&mut self) -> Result<(), Error> {
self.inner.as_mut().unwrap().rollback_to_savepoint()
}
pub fn version(&self) -> u64 {
self.inner.as_ref().unwrap().version()
}
pub fn closed(&self) -> bool {
self.inner.as_ref().unwrap().closed()
}
pub fn cancel(&mut self) -> Result<(), Error> {
self.inner.as_mut().unwrap().cancel()
}
pub fn commit(&mut self) -> Result<(), Error> {
self.inner.as_mut().unwrap().commit()
}
pub fn exists<K>(&self, key: K) -> Result<bool, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().exists(key)
}
pub fn exists_at_version<K>(&self, key: K, version: u64) -> Result<bool, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().exists_at_version(key, version)
}
pub fn get<K>(&self, key: K) -> Result<Option<Bytes>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().get(key)
}
pub fn get_at_version<K>(&self, key: K, version: u64) -> Result<Option<Bytes>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().get_at_version(key, version)
}
pub fn getm<K>(&self, keys: Vec<K>) -> Result<Vec<Option<Bytes>>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().getm(keys)
}
pub fn getm_at_version<K>(
&self,
keys: Vec<K>,
version: u64,
) -> Result<Vec<Option<Bytes>>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().getm_at_version(keys, version)
}
pub fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
where
K: IntoBytes,
V: IntoBytes,
{
self.inner.as_mut().unwrap().set(key, val)
}
pub fn put<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
where
K: IntoBytes,
V: IntoBytes,
{
self.inner.as_mut().unwrap().put(key, val)
}
pub fn putc<K, V, C>(&mut self, key: K, val: V, chk: Option<C>) -> Result<(), Error>
where
K: IntoBytes,
V: IntoBytes,
C: IntoBytes,
{
self.inner.as_mut().unwrap().putc(key, val, chk)
}
pub fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: IntoBytes,
{
self.inner.as_mut().unwrap().del(key)
}
pub fn delc<K, C>(&mut self, key: K, chk: Option<C>) -> Result<(), Error>
where
K: IntoBytes,
C: IntoBytes,
{
self.inner.as_mut().unwrap().delc(key, chk)
}
pub fn total<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<usize, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().total(rng, skip, limit)
}
pub fn total_at_version<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
version: u64,
) -> Result<usize, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().total_at_version(rng, skip, limit, version)
}
pub fn keys<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<Vec<Bytes>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().keys(rng, skip, limit)
}
pub fn keys_reverse<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<Vec<Bytes>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().keys_reverse(rng, skip, limit)
}
pub fn keys_at_version<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
version: u64,
) -> Result<Vec<Bytes>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().keys_at_version(rng, skip, limit, version)
}
pub fn keys_at_version_reverse<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
version: u64,
) -> Result<Vec<Bytes>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().keys_at_version_reverse(rng, skip, limit, version)
}
pub fn scan<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<Vec<(Bytes, Bytes)>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().scan(rng, skip, limit)
}
pub fn scan_reverse<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<Vec<(Bytes, Bytes)>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().scan_reverse(rng, skip, limit)
}
pub fn scan_at_version<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
version: u64,
) -> Result<Vec<(Bytes, Bytes)>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().scan_at_version(rng, skip, limit, version)
}
pub fn scan_at_version_reverse<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
version: u64,
) -> Result<Vec<(Bytes, Bytes)>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().scan_at_version_reverse(rng, skip, limit, version)
}
pub fn scan_all_versions<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<Vec<(Bytes, u64, Option<Bytes>)>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().scan_all_versions(rng, skip, limit)
}
pub fn scan_for_each<K, F>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
f: F,
) -> Result<usize, Error>
where
K: IntoBytes,
F: FnMut(&Bytes, &Bytes) -> bool,
{
self.inner.as_ref().unwrap().scan_for_each(rng, skip, limit, f)
}
pub fn keys_for_each<K, F>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
f: F,
) -> Result<usize, Error>
where
K: IntoBytes,
F: FnMut(&Bytes) -> bool,
{
self.inner.as_ref().unwrap().keys_for_each(rng, skip, limit, f)
}
pub fn scan_into<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
buf: &mut Vec<(Bytes, Bytes)>,
) -> Result<(), Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().scan_into(rng, skip, limit, buf)
}
pub fn keys_into<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
buf: &mut Vec<Bytes>,
) -> Result<(), Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().keys_into(rng, skip, limit, buf)
}
pub fn cursor<K>(&self, rng: Range<K>) -> Result<Cursor<'_>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().cursor(rng)
}
pub fn cursor_at_version<K>(&self, rng: Range<K>, version: u64) -> Result<Cursor<'_>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().cursor_at_version(rng, version)
}
pub fn keys_iter<K>(&self, rng: Range<K>) -> Result<KeyIterator<'_>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().keys_iter(rng)
}
pub fn keys_iter_reverse<K>(&self, rng: Range<K>) -> Result<KeyIterator<'_>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().keys_iter_reverse(rng)
}
pub fn keys_iter_at_version<K>(
&self,
rng: Range<K>,
version: u64,
) -> Result<KeyIterator<'_>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().keys_iter_at_version(rng, version)
}
pub fn keys_iter_at_version_reverse<K>(
&self,
rng: Range<K>,
version: u64,
) -> Result<KeyIterator<'_>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().keys_iter_at_version_reverse(rng, version)
}
pub fn scan_iter<K>(&self, rng: Range<K>) -> Result<ScanIterator<'_>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().scan_iter(rng)
}
pub fn scan_iter_reverse<K>(&self, rng: Range<K>) -> Result<ScanIterator<'_>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().scan_iter_reverse(rng)
}
pub fn scan_iter_at_version<K>(
&self,
rng: Range<K>,
version: u64,
) -> Result<ScanIterator<'_>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().scan_iter_at_version(rng, version)
}
pub fn scan_iter_at_version_reverse<K>(
&self,
rng: Range<K>,
version: u64,
) -> Result<ScanIterator<'_>, Error>
where
K: IntoBytes,
{
self.inner.as_ref().unwrap().scan_iter_at_version_reverse(rng, version)
}
}
/// A copy of a transaction's tracked reads, scanned ranges and pending writes,
/// captured by `set_savepoint` and restored by `rollback_to_savepoint`.
struct SavepointState {
/// Copy of the keys in the readset at the time of the savepoint.
readset: HashSet<Bytes>,
/// Copy of the scanned ranges, keyed by range start with the range end as value.
scanset: SkipMap<Bytes, ArcSwap<Bytes>>,
/// Copy of the pending writes; a `None` value marks a deletion.
writeset: BTreeMap<Bytes, Option<Bytes>>,
}
/// The state of a single transaction.
pub(crate) struct TransactionInner {
/// The isolation level checked at commit time.
pub(crate) mode: IsolationLevel,
/// Set once the transaction has been committed or cancelled.
pub(crate) done: bool,
/// Whether the transaction accepts writes; write operations return
/// `Error::TxNotWritable` when this is `false`.
pub(crate) write: bool,
/// The commit id observed when the transaction started; `commit` uses it as
/// the lower bound of the commit-queue range it checks for conflicts.
pub(crate) commit: u64,
/// The oracle timestamp observed when the transaction started; reads are
/// served at this version.
pub(crate) version: u64,
/// Keys read by this transaction, recorded under serializable snapshot isolation.
pub(crate) readset: HashSet<Bytes>,
/// Bloom filter over the keys recorded in `readset`.
pub(crate) readset_bloom: RefCell<BloomFilter>,
/// Ranges scanned by this transaction, keyed by range start with the range
/// end as value.
pub(crate) scanset: SkipMap<Bytes, ArcSwap<Bytes>>,
/// Pending writes; a `None` value marks a deletion.
pub(crate) writeset: BTreeMap<Bytes, Option<Bytes>>,
/// The database this transaction belongs to.
pub(crate) database: Arc<Inner>,
/// Reference counter registered for `commit` in `counter_by_commit`.
pub(crate) counter_commit: Arc<AtomicU64>,
/// Reference counter registered for `version` in `counter_by_oracle`.
pub(crate) counter_version: Arc<AtomicU64>,
/// Size threshold consulted by `reset` when deciding whether to reallocate
/// rather than clear a collection.
reset_threshold: usize,
/// Savepoints recorded by `set_savepoint`, most recent last.
savepoint_stack: Vec<SavepointState>,
}
/// Reads the current value of `atomic` and takes a reference on the counter
/// stored for that value in `map`, creating the counter if it is missing.
///
/// Returns the observed value together with the counter that was incremented.
/// The attempt is retried when the counter has already been tombstoned, when
/// `atomic` changed while registering, or when `gc_floor` (if supplied) has
/// moved above the observed value.
#[inline]
fn register_counter(
map: &SkipMap<u64, Arc<AtomicU64>>,
atomic: &AtomicU64,
gc_floor: Option<&AtomicU64>,
) -> (u64, Arc<AtomicU64>) {
loop {
// Observe the current value and fetch or create its counter
let v = atomic.load(Ordering::SeqCst);
let counter = map.get_or_insert_with(v, || Arc::new(AtomicU64::new(0))).value().clone();
// Increment the counter unless it has been tombstoned
let acquired = loop {
let cur = counter.load(Ordering::Acquire);
if cur == COUNTER_TOMBSTONE {
break false;
}
if counter
.compare_exchange_weak(cur, cur + 1, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
break true;
}
};
// A tombstoned counter cannot be reused, so start again
if !acquired {
continue;
}
fence(Ordering::SeqCst);
// Re-validate the observation now that the reference is held
let oracle_stable = atomic.load(Ordering::SeqCst) == v;
let floor_ok = match gc_floor {
Some(f) => f.load(Ordering::SeqCst) <= v,
None => true,
};
if oracle_stable && floor_ok {
return (v, counter);
}
// Validation failed: drop the reference and, if it was the last one,
// remove the map entry provided it still points at this counter
if release_counter(&counter) {
if let Some(e) = map.get(&v) {
if Arc::ptr_eq(e.value(), &counter) {
e.remove();
}
}
}
}
}
/// Drops one reference from `counter`.
///
/// Returns `false` after decrementing a count greater than one. When the
/// count is exactly one, the counter is swapped to `COUNTER_TOMBSTONE` and
/// `true` is returned, telling the caller that it released the last reference.
#[inline]
fn release_counter(counter: &AtomicU64) -> bool {
loop {
let cur = counter.load(Ordering::Acquire);
if cur > 1 {
// Other holders remain, so simply decrement
if counter
.compare_exchange_weak(cur, cur - 1, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
return false;
}
continue;
}
// The caller must hold a reference, so the count is expected to be one here
debug_assert_eq!(cur, 1);
// Last holder: tombstone the counter; retry if the count changed in the meantime
if counter
.compare_exchange(1, COUNTER_TOMBSTONE, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
return true;
}
}
}
impl TransactionInner {
/// Creates the state for a new transaction on `db`.
///
/// Registers the transaction against the current oracle timestamp and the
/// current commit id, and starts with empty read, scan and write sets under
/// serializable snapshot isolation.
pub(crate) fn new(db: Arc<Inner>, write: bool) -> Self {
    // Register against the oracle first, then against the commit id
    let oracle =
        register_counter(&db.counter_by_oracle, &db.oracle.inner.timestamp, Some(&db.gc_floor));
    let commits = register_counter(&db.counter_by_commit, &db.transaction_commit_id, None);
    let reset_threshold = db.reset_threshold;
    Self {
        mode: IsolationLevel::SerializableSnapshotIsolation,
        done: false,
        write,
        commit: commits.0,
        version: oracle.0,
        readset: HashSet::new(),
        readset_bloom: RefCell::new(BloomFilter::new()),
        scanset: SkipMap::new(),
        writeset: BTreeMap::new(),
        database: db,
        counter_commit: commits.1,
        counter_version: oracle.1,
        reset_threshold,
        savepoint_stack: Vec::new(),
    }
}
/// Re-initialises this transaction state so that it can be reused.
///
/// The transaction is registered afresh against the current oracle timestamp
/// and commit id, the isolation level returns to serializable snapshot
/// isolation, and every tracked set is emptied. Collections which grew beyond
/// the database's `reset_threshold` are replaced rather than cleared, so that
/// a reused transaction does not hold on to an oversized allocation.
pub(crate) fn reset(&mut self, write: bool) {
    self.mode = IsolationLevel::SerializableSnapshotIsolation;
    self.reset_threshold = self.database.reset_threshold;
    // Register against the oracle first, then against the commit id
    let (version, counter_version) = register_counter(
        &self.database.counter_by_oracle,
        &self.database.oracle.inner.timestamp,
        Some(&self.database.gc_floor),
    );
    let (commit, counter_commit) = register_counter(
        &self.database.counter_by_commit,
        &self.database.transaction_commit_id,
        None,
    );
    let threshold = self.reset_threshold;
    self.scanset.clear();
    self.readset.pin().clear();
    self.readset_bloom.borrow_mut().clear();
    if self.writeset.len() > threshold {
        self.writeset = BTreeMap::new();
    } else {
        self.writeset.clear();
    }
    // Test the capacity, not the length: the length is zero once the stack
    // has been cleared, which would leave this check without any effect.
    if self.savepoint_stack.capacity() > threshold {
        self.savepoint_stack = Vec::new();
    } else {
        self.savepoint_stack.clear();
    }
    self.done = false;
    self.write = write;
    self.commit = commit;
    self.version = version;
    self.counter_commit = counter_commit;
    self.counter_version = counter_version;
}
/// Returns the version this transaction was registered at.
pub fn version(&self) -> u64 {
self.version
}
/// Returns `true` once the transaction has been committed or cancelled.
pub fn closed(&self) -> bool {
self.done
}
/// Cancels the transaction, discarding all tracked reads, scans, writes and
/// savepoints.
///
/// # Errors
///
/// Returns [`Error::TxClosed`] if the transaction is already closed.
pub fn cancel(&mut self) -> Result<(), Error> {
    if self.done {
        return Err(Error::TxClosed);
    }
    // Close the transaction before discarding its state
    self.done = true;
    self.readset.pin().clear();
    self.readset_bloom.borrow_mut().clear();
    self.scanset.clear();
    self.writeset.clear();
    self.savepoint_stack.clear();
    Ok(())
}
/// Pushes a savepoint holding a copy of the current readset, scanset and
/// writeset onto the savepoint stack.
///
/// # Errors
///
/// Returns [`Error::TxClosed`] if the transaction is closed, or
/// [`Error::TxNotWritable`] if it is read-only.
pub fn set_savepoint(&mut self) -> Result<(), Error> {
    if self.done {
        return Err(Error::TxClosed);
    }
    if !self.write {
        return Err(Error::TxNotWritable);
    }
    // Copy every tracked read key into a fresh set
    let readset = HashSet::new();
    {
        let snapshot = readset.pin();
        let current = self.readset.pin();
        for key in current.iter() {
            snapshot.insert(key.clone());
        }
    }
    // Copy every scanned range, sharing the range end allocation
    let scanset = SkipMap::new();
    for entry in self.scanset.iter() {
        let upper = Arc::clone(&entry.value().load());
        scanset.insert(entry.key().clone(), ArcSwap::new(upper));
    }
    let writeset = self.writeset.clone();
    self.savepoint_stack.push(SavepointState {
        readset,
        scanset,
        writeset,
    });
    Ok(())
}
pub fn rollback_to_savepoint(&mut self) -> Result<(), Error> {
if self.done == true {
return Err(Error::TxClosed);
}
if self.write == false {
return Err(Error::TxNotWritable);
}
let savepoint = self.savepoint_stack.pop().ok_or(Error::NoSavepoint)?;
let readset = self.readset.pin();
readset.clear();
for key in savepoint.readset.pin().iter() {
readset.insert(key.clone());
}
self.scanset.clear();
for entry in savepoint.scanset.iter() {
let value = Arc::clone(&entry.value().load());
self.scanset.insert(entry.key().clone(), ArcSwap::new(value));
}
self.writeset = savepoint.writeset;
Ok(())
}
/// Commits the transaction.
///
/// A transaction without pending writes is simply closed. Otherwise the
/// writeset is published to the commit queue, checked for conflicts against
/// the commit-queue entries between this transaction's starting commit id and
/// its own commit slot, published to the merge queue, applied to the
/// datastore, and finally appended to the persistence layer when one is
/// configured.
///
/// # Errors
///
/// - [`Error::TxClosed`] if the transaction is already closed.
/// - [`Error::KeyWriteConflict`] if a checked commit-queue entry is not
///   disjoint from this writeset.
/// - [`Error::KeyReadConflict`] if, under serializable snapshot isolation, a
///   checked entry is not disjoint from this readset or wrote a key inside a
///   scanned range.
/// - [`Error::TxCommitNotPersisted`] if appending to the persistence layer
///   fails; the writes have already been applied to the datastore by then.
pub fn commit(&mut self) -> Result<(), Error> {
if self.done == true {
return Err(Error::TxClosed);
}
// The transaction is closed from here on, whatever the outcome
self.done = true;
// Nothing to publish when there are no pending writes
if self.writeset.is_empty() {
self.scanset.clear();
self.readset.pin().clear();
self.readset_bloom.borrow_mut().clear();
self.savepoint_stack.clear();
return Ok(());
}
// Take ownership of the writeset, leaving an empty map behind
let writeset = Arc::new(std::mem::take(&mut self.writeset));
// Summarise the written keys with a bloom filter and the key bounds
let mut writeset_bloom = BloomFilter::new();
for key in writeset.keys() {
writeset_bloom.insert(key);
}
let min_key = writeset.keys().next().cloned().unwrap_or_default();
let max_key = writeset.keys().next_back().cloned().unwrap_or_default();
// Publish the commit entry; `atomic_commit` is defined outside this view
// and returns the slot `version` used as the upper bound of the check below
let (version, entry) = self.atomic_commit(Commit {
writeset: writeset.clone(),
id: self.database.transaction_queue_id.fetch_add(1, Ordering::AcqRel) + 1,
writeset_bloom,
min_key,
max_key,
});
// `SnapshotIsolation` is the lowest level, so this check always runs
if self.mode >= IsolationLevel::SnapshotIsolation {
// Inspect every commit entry after our starting commit id and before our own slot
for tx in self.database.transaction_commit_queue.range(self.commit + 1..version) {
// Write-write conflict check
if !tx.value().is_disjoint_writeset_bloom(&entry) {
self.database.transaction_commit_queue.remove(&version);
self.scanset.clear();
self.readset.pin().clear();
self.readset_bloom.borrow_mut().clear();
self.writeset.clear();
self.savepoint_stack.clear();
return Err(Error::KeyWriteConflict);
}
if self.mode >= IsolationLevel::SerializableSnapshotIsolation {
// Read-write conflict check against the keys this transaction read
if !tx
.value()
.is_disjoint_readset_bloom(&self.readset, &self.readset_bloom.borrow())
{
self.database.transaction_commit_queue.remove(&version);
self.scanset.clear();
self.readset.pin().clear();
self.readset_bloom.borrow_mut().clear();
self.writeset.clear();
self.savepoint_stack.clear();
return Err(Error::KeyReadConflict);
}
// Coarse pre-check: compare the other writeset against the span from the
// first scanned range's start to the last scanned range's end
let scan_overlap = if let Some(scan_front) = self.scanset.front() {
if let Some(scan_back) = self.scanset.back() {
let scan_max_end = Arc::clone(&scan_back.value().load());
tx.value().may_overlap_range(scan_front.key(), &scan_max_end)
} else {
false
}
} else {
false
};
if scan_overlap {
// Exact check: find the scanned range starting at or before each written
// key, and conflict if that range ends after the key
for k in tx.value().writeset.keys() {
if let Some(entry) = self.scanset.range::<Bytes, _>(..=k).next_back() {
if **entry.value().load() > *k {
self.database.transaction_commit_queue.remove(&version);
self.readset.pin().clear();
self.readset_bloom.borrow_mut().clear();
self.scanset.clear();
self.writeset.clear();
self.savepoint_stack.clear();
#[cfg(debug_assertions)]
debug!(target: LOG_TARGET_CONFLICTS, "KeyReadConflict involving {:?}", k);
return Err(Error::KeyReadConflict);
}
}
}
}
}
}
}
// Publish the merge entry; `atomic_merge` is defined outside this view and
// returns the version at which the writes are stored
let (version, entry) = self.atomic_merge(Merge {
writeset,
id: self.database.transaction_merge_id.fetch_add(1, Ordering::AcqRel) + 1,
});
// Apply every write to the datastore as a new version of its key
for (key, value) in entry.writeset.iter() {
let value = value.clone();
let mut iter = self.database.datastore.raw_iter_mut();
if iter.seek_exact(key) {
// The key already has versions, so append to them
let (_, versions) = iter.next().expect("seek_exact returned true");
versions.push(Version {
version,
value,
});
} else {
// First version for this key
iter.insert_here(
key.clone(),
Versions::from(Version {
version,
value,
}),
);
}
// Record the key in `gc_dirty_keys`
self.database.gc_dirty_keys.push(key.clone());
}
// Append the writeset to the persistence layer, if one is configured
#[cfg(not(target_arch = "wasm32"))]
if let Some(p) = self.database.persistence.read().clone() {
if let Err(e) = p.append(version, entry.writeset.as_ref()) {
self.database.transaction_merge_queue.remove(&version);
self.scanset.clear();
self.readset.pin().clear();
self.readset_bloom.borrow_mut().clear();
self.writeset.clear();
self.savepoint_stack.clear();
return Err(Error::TxCommitNotPersisted(e));
}
}
// The writes are now in the datastore, so the merge entry can be dropped
self.database.transaction_merge_queue.remove(&version);
self.scanset.clear();
self.readset.pin().clear();
self.readset_bloom.borrow_mut().clear();
self.writeset.clear();
self.savepoint_stack.clear();
Ok(())
}
/// Checks whether `key` exists, as seen by this transaction.
///
/// In a writable transaction the pending writes are consulted first: a
/// pending value means the key exists, and a pending deletion means it does
/// not, matching what [`TransactionInner::get`] reports. Keys not touched by
/// this transaction are looked up in the datastore at the transaction
/// version and, under serializable snapshot isolation, recorded in the
/// readset.
///
/// # Errors
///
/// Returns [`Error::TxClosed`] if the transaction is closed.
pub fn exists<K>(&self, key: K) -> Result<bool, Error>
where
    K: IntoBytes,
{
    let lookup = key.as_slice();
    if self.done {
        return Err(Error::TxClosed);
    }
    // Read-only transactions have no pending writes and track no reads
    if !self.write {
        return Ok(self.exists_in_datastore(lookup, self.version));
    }
    let res = match self.writeset.get(lookup) {
        // A pending `None` is a deletion, so the key no longer exists
        Some(pending) => pending.is_some(),
        None => {
            let res = self.exists_in_datastore(lookup, self.version);
            if self.mode >= IsolationLevel::SerializableSnapshotIsolation {
                self.readset_bloom.borrow_mut().insert(lookup);
                self.readset.pin().insert(key.into_bytes());
            }
            res
        }
    };
    Ok(res)
}
pub fn exists_at_version<K>(&self, key: K, version: u64) -> Result<bool, Error>
where
K: IntoBytes,
{
let lookup = key.as_slice();
if self.done == true {
return Err(Error::TxClosed);
}
if self.version <= version {
return Err(Error::VersionInFuture);
}
let res = self.exists_in_datastore(lookup, version);
Ok(res)
}
pub fn get<K>(&self, key: K) -> Result<Option<Bytes>, Error>
where
K: IntoBytes,
{
let lookup = key.as_slice();
if self.done == true {
return Err(Error::TxClosed);
}
let res = match self.write {
true => match self.writeset.get(lookup) {
Some(v) => v.clone(),
None => {
let res = self.fetch_in_datastore(lookup, self.version);
if self.mode >= IsolationLevel::SerializableSnapshotIsolation {
let guard = self.readset.pin();
if !guard.contains(lookup) {
self.readset_bloom.borrow_mut().insert(lookup);
guard.insert(key.into_bytes());
}
}
res
}
},
false => self.fetch_in_datastore(lookup, self.version),
};
Ok(res)
}
pub fn get_at_version<K>(&self, key: K, version: u64) -> Result<Option<Bytes>, Error>
where
K: IntoBytes,
{
let lookup = key.as_slice();
if self.done == true {
return Err(Error::TxClosed);
}
if self.version <= version {
return Err(Error::VersionInFuture);
}
let res = self.fetch_in_datastore(lookup, version);
Ok(res)
}
pub fn getm<K>(&self, keys: Vec<K>) -> Result<Vec<Option<Bytes>>, Error>
where
K: IntoBytes,
{
if self.done == true {
return Err(Error::TxClosed);
}
let mut results = Vec::with_capacity(keys.len());
match self.write {
true => {
for key in keys {
let lookup = key.as_slice();
let res = match self.writeset.get(lookup) {
Some(v) => v.clone(),
None => {
let res = self.fetch_in_datastore(lookup, self.version);
if self.mode >= IsolationLevel::SerializableSnapshotIsolation
&& !self.readset.pin().contains(lookup)
{
self.readset_bloom.borrow_mut().insert(lookup);
self.readset.pin().insert(key.into_bytes());
}
res
}
};
results.push(res);
}
}
false => {
for key in keys {
let lookup = key.as_slice();
let res = self.fetch_in_datastore(lookup, self.version);
results.push(res);
}
}
}
Ok(results)
}
pub fn getm_at_version<K>(
&self,
keys: Vec<K>,
version: u64,
) -> Result<Vec<Option<Bytes>>, Error>
where
K: IntoBytes,
{
if self.done == true {
return Err(Error::TxClosed);
}
if self.version <= version {
return Err(Error::VersionInFuture);
}
let mut results = Vec::with_capacity(keys.len());
for key in keys {
let lookup = key.as_slice();
let res = self.fetch_in_datastore(lookup, version);
results.push(res);
}
Ok(results)
}
pub fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
where
K: IntoBytes,
V: IntoBytes,
{
if self.done == true {
return Err(Error::TxClosed);
}
if self.write == false {
return Err(Error::TxNotWritable);
}
self.writeset.insert(key.into_bytes(), Some(val.into_bytes()));
Ok(())
}
pub fn put<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
where
K: IntoBytes,
V: IntoBytes,
{
let lookup = key.as_slice();
if self.done == true {
return Err(Error::TxClosed);
}
if self.write == false {
return Err(Error::TxNotWritable);
}
match self.writeset.get(lookup) {
Some(_) => return Err(Error::KeyAlreadyExists),
None => match self.exists_in_datastore(lookup, self.version) {
false => self.writeset.insert(key.into_bytes(), Some(val.into_bytes())),
true => return Err(Error::KeyAlreadyExists),
},
};
Ok(())
}
pub fn putc<K, V, C>(&mut self, key: K, val: V, chk: Option<C>) -> Result<(), Error>
where
K: IntoBytes,
V: IntoBytes,
C: IntoBytes,
{
let lookup = key.as_slice();
if self.done == true {
return Err(Error::TxClosed);
}
if self.write == false {
return Err(Error::TxNotWritable);
}
match (chk.as_ref(), self.writeset.get(lookup)) {
(Some(x), Some(Some(y))) if x.as_slice() == y.as_slice() => {
self.writeset.insert(key.into_bytes(), Some(val.into_bytes()));
}
(None, Some(None)) => {
self.writeset.insert(key.into_bytes(), Some(val.into_bytes()));
}
(_, Some(_)) => return Err(Error::ValNotExpectedValue),
_ => match self.equals_in_datastore(lookup, chk, self.version) {
true => {
self.writeset.insert(key.into_bytes(), Some(val.into_bytes()));
}
_ => return Err(Error::ValNotExpectedValue),
},
};
Ok(())
}
pub fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: IntoBytes,
{
if self.done == true {
return Err(Error::TxClosed);
}
if self.write == false {
return Err(Error::TxNotWritable);
}
self.writeset.insert(key.into_bytes(), None);
Ok(())
}
pub fn delc<K, C>(&mut self, key: K, chk: Option<C>) -> Result<(), Error>
where
K: IntoBytes,
C: IntoBytes,
{
let lookup = key.as_slice();
if self.done == true {
return Err(Error::TxClosed);
}
if self.write == false {
return Err(Error::TxNotWritable);
}
match (chk.as_ref(), self.writeset.get(lookup)) {
(Some(x), Some(Some(y))) if x.as_slice() == y.as_slice() => {
self.writeset.insert(key.into_bytes(), None);
}
(None, Some(None)) => {
self.writeset.insert(key.into_bytes(), None);
}
(_, Some(_)) => return Err(Error::ValNotExpectedValue),
_ => match self.equals_in_datastore(lookup, chk, self.version) {
true => {
self.writeset.insert(key.into_bytes(), None);
}
_ => return Err(Error::ValNotExpectedValue),
},
};
Ok(())
}
pub fn total<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<usize, Error>
where
K: IntoBytes,
{
self.total_any(rng, skip, limit, Direction::Forward, self.version)
}
pub fn total_at_version<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
version: u64,
) -> Result<usize, Error>
where
K: IntoBytes,
{
self.total_any(rng, skip, limit, Direction::Forward, version)
}
pub fn keys<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<Vec<Bytes>, Error>
where
K: IntoBytes,
{
self.keys_any(rng, skip, limit, Direction::Forward, self.version)
}
pub fn keys_reverse<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<Vec<Bytes>, Error>
where
K: IntoBytes,
{
self.keys_any(rng, skip, limit, Direction::Reverse, self.version)
}
pub fn keys_at_version<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
version: u64,
) -> Result<Vec<Bytes>, Error>
where
K: IntoBytes,
{
self.keys_any(rng, skip, limit, Direction::Forward, version)
}
pub fn keys_at_version_reverse<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
version: u64,
) -> Result<Vec<Bytes>, Error>
where
K: IntoBytes,
{
self.keys_any(rng, skip, limit, Direction::Reverse, version)
}
pub fn scan<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<Vec<(Bytes, Bytes)>, Error>
where
K: IntoBytes,
{
self.scan_any(rng, skip, limit, Direction::Forward, self.version)
}
pub fn scan_reverse<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<Vec<(Bytes, Bytes)>, Error>
where
K: IntoBytes,
{
self.scan_any(rng, skip, limit, Direction::Reverse, self.version)
}
pub fn scan_at_version<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
version: u64,
) -> Result<Vec<(Bytes, Bytes)>, Error>
where
K: IntoBytes,
{
self.scan_any(rng, skip, limit, Direction::Forward, version)
}
pub fn scan_at_version_reverse<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
version: u64,
) -> Result<Vec<(Bytes, Bytes)>, Error>
where
K: IntoBytes,
{
self.scan_any(rng, skip, limit, Direction::Reverse, version)
}
pub fn scan_all_versions<K>(
&self,
rng: Range<K>,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<Vec<(Bytes, u64, Option<Bytes>)>, Error>
where
K: IntoBytes,
{
self.scan_all_versions_any(rng, skip, limit, self.version)
}
/// Calls `f` with each key-value pair in `rng`, in forward order at the
/// transaction version, and returns the number of pairs passed to `f`.
///
/// The first `skip` visible entries are not passed to `f`. Iteration stops
/// when `f` returns `false` or once `limit` pairs have been passed to `f`; a
/// `limit` of zero passes none. In a writable transaction under serializable
/// snapshot isolation the range is recorded in the scanset.
///
/// # Errors
///
/// Returns [`Error::TxClosed`] if the transaction is closed.
pub fn scan_for_each<K, F>(
    &self,
    rng: Range<K>,
    skip: Option<usize>,
    limit: Option<usize>,
    mut f: F,
) -> Result<usize, Error>
where
    K: IntoBytes,
    F: FnMut(&Bytes, &Bytes) -> bool,
{
    if self.done {
        return Err(Error::TxClosed);
    }
    let beg = &rng.start.into_bytes();
    let end = &rng.end.into_bytes();
    if self.write && self.mode >= IsolationLevel::SerializableSnapshotIsolation {
        self.track_scan_range(beg, end);
    }
    // The limit is only checked after an entry is emitted, so a zero limit
    // has to be handled up front to avoid emitting one entry.
    if limit == Some(0) {
        return Ok(0);
    }
    let mut count = 0;
    let mut skip = skip.unwrap_or_default();
    // Fast path: with no merge-queue entries and no pending writes in the
    // range, the datastore can be walked directly.
    if self.database.transaction_merge_queue.is_empty()
        && self.writeset.range::<Bytes, _>(beg..end).next().is_none()
    {
        let version = self.version;
        for_each_in_range(&self.database.datastore, beg, end, |k, v| {
            // Ignore keys without a value at this version
            let Some(value) = v.fetch_version(version) else {
                return ControlFlow::Continue(());
            };
            if skip > 0 {
                skip -= 1;
                return ControlFlow::Continue(());
            }
            count += 1;
            if !f(k, &value) {
                return ControlFlow::Break(());
            }
            match limit {
                Some(l) if count >= l => ControlFlow::Break(()),
                _ => ControlFlow::Continue(()),
            }
        });
        return Ok(count);
    }
    // Slow path: merge the datastore, the merge queue and the pending writes.
    // `skip` is passed to the merge iterator.
    let join_iter = Box::new(MergeQueueIter::new(
        self.snapshot_merge_sources(self.version),
        beg.clone(),
        end.clone(),
        Direction::Forward,
    ));
    let iter = MergeIterator::new(
        TreeIterState::build(&self.database.datastore, beg, end, Direction::Forward),
        join_iter,
        self.writeset.range::<Bytes, _>(beg..end),
        Direction::Forward,
        self.version,
        skip,
    );
    for (key, val) in iter {
        // Entries without a value are not passed to the callback
        let Some(val) = &val else {
            continue;
        };
        count += 1;
        if !f(&key, val) {
            break;
        }
        if matches!(limit, Some(l) if count >= l) {
            break;
        }
    }
    Ok(count)
}
/// Calls `f` with each key in `rng`, in forward order at the transaction
/// version, and returns the number of keys passed to `f`.
///
/// The first `skip` visible keys are not passed to `f`. Iteration stops when
/// `f` returns `false` or once `limit` keys have been passed to `f`; a
/// `limit` of zero passes none. In a writable transaction under serializable
/// snapshot isolation the range is recorded in the scanset.
///
/// # Errors
///
/// Returns [`Error::TxClosed`] if the transaction is closed.
pub fn keys_for_each<K, F>(
    &self,
    rng: Range<K>,
    skip: Option<usize>,
    limit: Option<usize>,
    mut f: F,
) -> Result<usize, Error>
where
    K: IntoBytes,
    F: FnMut(&Bytes) -> bool,
{
    if self.done {
        return Err(Error::TxClosed);
    }
    let beg = &rng.start.into_bytes();
    let end = &rng.end.into_bytes();
    if self.write && self.mode >= IsolationLevel::SerializableSnapshotIsolation {
        self.track_scan_range(beg, end);
    }
    // The limit is only checked after a key is emitted, so a zero limit has
    // to be handled up front to avoid emitting one key.
    if limit == Some(0) {
        return Ok(0);
    }
    let mut count = 0;
    let mut skip = skip.unwrap_or_default();
    // Fast path: with no merge-queue entries and no pending writes in the
    // range, the datastore can be walked directly.
    if self.database.transaction_merge_queue.is_empty()
        && self.writeset.range::<Bytes, _>(beg..end).next().is_none()
    {
        let version = self.version;
        for_each_in_range(&self.database.datastore, beg, end, |k, v| {
            // Ignore keys which do not exist at this version
            if !v.exists_version(version) {
                return ControlFlow::Continue(());
            }
            if skip > 0 {
                skip -= 1;
                return ControlFlow::Continue(());
            }
            count += 1;
            if !f(k) {
                return ControlFlow::Break(());
            }
            match limit {
                Some(l) if count >= l => ControlFlow::Break(()),
                _ => ControlFlow::Continue(()),
            }
        });
        return Ok(count);
    }
    // Slow path: merge the datastore, the merge queue and the pending writes.
    // `skip` is passed to the merge iterator.
    let join_iter = Box::new(MergeQueueIter::new(
        self.snapshot_merge_sources(self.version),
        beg.clone(),
        end.clone(),
        Direction::Forward,
    ));
    let mut iter = MergeIterator::new(
        TreeIterState::build(&self.database.datastore, beg, end, Direction::Forward),
        join_iter,
        self.writeset.range::<Bytes, _>(beg..end),
        Direction::Forward,
        self.version,
        skip,
    );
    while let Some((key, exists)) = iter.next_key() {
        // Keys flagged as not existing are not passed to the callback
        if !exists {
            continue;
        }
        count += 1;
        if !f(&key) {
            break;
        }
        if matches!(limit, Some(l) if count >= l) {
            break;
        }
    }
    Ok(count)
}
/// Replaces the contents of `buf` with the key-value pairs in `rng`, in
/// forward order at the transaction version.
///
/// `buf` is cleared first, even when an error is returned. The first `skip`
/// visible entries are omitted and at most `limit` pairs are stored; a
/// `limit` of zero stores none. In a writable transaction under serializable
/// snapshot isolation the range is recorded in the scanset.
///
/// # Errors
///
/// Returns [`Error::TxClosed`] if the transaction is closed.
pub fn scan_into<K>(
    &self,
    rng: Range<K>,
    skip: Option<usize>,
    limit: Option<usize>,
    buf: &mut Vec<(Bytes, Bytes)>,
) -> Result<(), Error>
where
    K: IntoBytes,
{
    buf.clear();
    if self.done {
        return Err(Error::TxClosed);
    }
    let beg = &rng.start.into_bytes();
    let end = &rng.end.into_bytes();
    if self.write && self.mode >= IsolationLevel::SerializableSnapshotIsolation {
        self.track_scan_range(beg, end);
    }
    // The limit is only checked after an entry is stored, so a zero limit
    // has to be handled up front to avoid storing one entry.
    if limit == Some(0) {
        return Ok(());
    }
    let mut skip = skip.unwrap_or_default();
    // Fast path: with no merge-queue entries and no pending writes in the
    // range, the datastore can be walked directly.
    if self.database.transaction_merge_queue.is_empty()
        && self.writeset.range::<Bytes, _>(beg..end).next().is_none()
    {
        let version = self.version;
        for_each_in_range(&self.database.datastore, beg, end, |k, v| {
            // Ignore keys without a value at this version
            let Some(value) = v.fetch_version(version) else {
                return ControlFlow::Continue(());
            };
            if skip > 0 {
                skip -= 1;
                return ControlFlow::Continue(());
            }
            buf.push((k.clone(), value));
            match limit {
                Some(l) if buf.len() >= l => ControlFlow::Break(()),
                _ => ControlFlow::Continue(()),
            }
        });
        return Ok(());
    }
    // Slow path: merge the datastore, the merge queue and the pending writes.
    // `skip` is passed to the merge iterator.
    let join_iter = Box::new(MergeQueueIter::new(
        self.snapshot_merge_sources(self.version),
        beg.clone(),
        end.clone(),
        Direction::Forward,
    ));
    let iter = MergeIterator::new(
        TreeIterState::build(&self.database.datastore, beg, end, Direction::Forward),
        join_iter,
        self.writeset.range::<Bytes, _>(beg..end),
        Direction::Forward,
        self.version,
        skip,
    );
    for (key, val) in iter {
        // Entries without a value are not stored
        let Some(val) = val else {
            continue;
        };
        buf.push((key, val));
        if matches!(limit, Some(l) if buf.len() >= l) {
            break;
        }
    }
    Ok(())
}
/// Replaces the contents of `buf` with the keys in `rng`, in forward order
/// at the transaction version.
///
/// `buf` is cleared first, even when an error is returned. The first `skip`
/// visible keys are omitted and at most `limit` keys are stored; a `limit`
/// of zero stores none. In a writable transaction under serializable
/// snapshot isolation the range is recorded in the scanset.
///
/// # Errors
///
/// Returns [`Error::TxClosed`] if the transaction is closed.
pub fn keys_into<K>(
    &self,
    rng: Range<K>,
    skip: Option<usize>,
    limit: Option<usize>,
    buf: &mut Vec<Bytes>,
) -> Result<(), Error>
where
    K: IntoBytes,
{
    buf.clear();
    if self.done {
        return Err(Error::TxClosed);
    }
    let beg = &rng.start.into_bytes();
    let end = &rng.end.into_bytes();
    if self.write && self.mode >= IsolationLevel::SerializableSnapshotIsolation {
        self.track_scan_range(beg, end);
    }
    // The limit is only checked after a key is stored, so a zero limit has
    // to be handled up front to avoid storing one key.
    if limit == Some(0) {
        return Ok(());
    }
    let mut skip = skip.unwrap_or_default();
    // Fast path: with no merge-queue entries and no pending writes in the
    // range, the datastore can be walked directly.
    if self.database.transaction_merge_queue.is_empty()
        && self.writeset.range::<Bytes, _>(beg..end).next().is_none()
    {
        let version = self.version;
        for_each_in_range(&self.database.datastore, beg, end, |k, v| {
            // Ignore keys which do not exist at this version
            if !v.exists_version(version) {
                return ControlFlow::Continue(());
            }
            if skip > 0 {
                skip -= 1;
                return ControlFlow::Continue(());
            }
            buf.push(k.clone());
            match limit {
                Some(l) if buf.len() >= l => ControlFlow::Break(()),
                _ => ControlFlow::Continue(()),
            }
        });
        return Ok(());
    }
    // Slow path: merge the datastore, the merge queue and the pending writes.
    // `skip` is passed to the merge iterator.
    let join_iter = Box::new(MergeQueueIter::new(
        self.snapshot_merge_sources(self.version),
        beg.clone(),
        end.clone(),
        Direction::Forward,
    ));
    let mut iter = MergeIterator::new(
        TreeIterState::build(&self.database.datastore, beg, end, Direction::Forward),
        join_iter,
        self.writeset.range::<Bytes, _>(beg..end),
        Direction::Forward,
        self.version,
        skip,
    );
    while let Some((key, exists)) = iter.next_key() {
        // Keys flagged as not existing are not stored
        if !exists {
            continue;
        }
        buf.push(key);
        if matches!(limit, Some(l) if buf.len() >= l) {
            break;
        }
    }
    Ok(())
}
/// Records the scanned range `beg..end` in the scanset.
///
/// The scanset maps a range start to its range end. When the closest entry
/// starting at or before `beg` already reaches `beg`, that entry is widened
/// to `end` if it ends earlier, and left alone otherwise. In all other
/// cases a new entry is inserted for `beg`.
#[inline(always)]
fn track_scan_range(&self, beg: &Bytes, end: &Bytes) {
    if let Some(entry) = self.scanset.range::<Bytes, _>(..=beg).next_back() {
        // Compare the recorded range end with both bounds of the new range
        let (ends_before_beg, ends_before_end) = {
            let upper = entry.value().load();
            (**upper < *beg, **upper < *end)
        };
        if !ends_before_beg {
            // The existing range reaches `beg`: extend it only if it is shorter
            if ends_before_end {
                entry.value().store(Arc::new(end.clone()));
            }
            return;
        }
    }
    // No existing range reaches `beg`, so record a new one
    self.scanset.insert(beg.clone(), ArcSwap::from_pointee(end.clone()));
}
/// Collects the merge-queue entries with a key up to and including
/// `version`, ordered from the highest key to the lowest.
#[inline]
fn snapshot_merge_sources(&self, version: u64) -> Vec<Arc<Merge>> {
    let queue = &self.database.transaction_merge_queue;
    let mut sources = Vec::new();
    for entry in queue.range(..=version).rev() {
        sources.push(entry.value().clone());
    }
    sources
}
/// Counts the keys in `rng` (start inclusive, end exclusive) that exist at
/// `version`, walking in `direction`.
///
/// Counting stops as soon as the count reaches `limit` (checked after each
/// increment). `skip` is applied directly on the datastore-only path and is
/// handed to the `MergeIterator` otherwise. The scanned range is recorded
/// only for writable serializable transactions reading at their own version.
///
/// # Errors
///
/// Returns [`Error::TxClosed`] when the transaction is already done.
fn total_any<K>(
    &self,
    rng: Range<K>,
    skip: Option<usize>,
    limit: Option<usize>,
    direction: Direction,
    version: u64,
) -> Result<usize, Error>
where
    K: IntoBytes,
{
    if self.done {
        return Err(Error::TxClosed);
    }
    let mut count = 0;
    let beg = &rng.start.into_bytes();
    let end = &rng.end.into_bytes();
    let mut skip = skip.unwrap_or_default();
    let tracks_reads = self.write && self.mode >= IsolationLevel::SerializableSnapshotIsolation;
    if tracks_reads && version == self.version {
        self.track_scan_range(beg, end);
    }
    // Datastore-only path: nothing is queued for merging and this
    // transaction has no writes of its own inside the range.
    let nothing_queued = self.database.transaction_merge_queue.is_empty();
    if nothing_queued && self.writeset.range::<Bytes, _>(beg..end).next().is_none() {
        match direction {
            Direction::Forward => {
                for_each_in_range(&self.database.datastore, beg, end, |_, versions| {
                    if !versions.exists_version(version) {
                        return ControlFlow::Continue(());
                    }
                    if skip > 0 {
                        skip -= 1;
                        return ControlFlow::Continue(());
                    }
                    count += 1;
                    match limit {
                        Some(max) if count >= max => ControlFlow::Break(()),
                        _ => ControlFlow::Continue(()),
                    }
                });
            }
            Direction::Reverse => {
                let mut entries = self
                    .database
                    .datastore
                    .range_rev(Bound::Included(beg), Bound::Excluded(end));
                while let Some((_, versions)) = entries.next() {
                    if !versions.exists_version(version) {
                        continue;
                    }
                    if skip > 0 {
                        skip -= 1;
                        continue;
                    }
                    count += 1;
                    if matches!(limit, Some(max) if count >= max) {
                        break;
                    }
                }
            }
        }
        return Ok(count);
    }
    // Merged path: combine the datastore, the merge queue entries visible at
    // `version`, and this transaction's own writes.
    let join_iter = Box::new(MergeQueueIter::new(
        self.snapshot_merge_sources(version),
        beg.clone(),
        end.clone(),
        direction,
    ));
    let mut iter = MergeIterator::new(
        TreeIterState::build(&self.database.datastore, beg, end, direction),
        join_iter,
        self.writeset.range::<Bytes, _>(beg..end),
        direction,
        version,
        skip,
    );
    while let Some(exists) = iter.next_count() {
        if !exists {
            continue;
        }
        count += 1;
        if matches!(limit, Some(max) if count >= max) {
            break;
        }
    }
    Ok(count)
}
/// Returns the keys in `rng` (start inclusive, end exclusive) that exist at
/// `version`, in `direction` order.
///
/// Collection stops as soon as the result holds `limit` keys (checked after
/// each push). `skip` is applied directly on the datastore-only path and is
/// handed to the `MergeIterator` otherwise. The scanned range is recorded
/// only for writable serializable transactions reading at their own version.
///
/// # Errors
///
/// Returns [`Error::TxClosed`] when the transaction is already done.
fn keys_any<K>(
    &self,
    rng: Range<K>,
    skip: Option<usize>,
    limit: Option<usize>,
    direction: Direction,
    version: u64,
) -> Result<Vec<Bytes>, Error>
where
    K: IntoBytes,
{
    if self.done {
        return Err(Error::TxClosed);
    }
    // Pre-size from the limit, capped so a huge limit does not over-allocate.
    let mut keys = match limit {
        Some(max) => Vec::with_capacity(max.min(10_000)),
        None => Vec::new(),
    };
    let beg = &rng.start.into_bytes();
    let end = &rng.end.into_bytes();
    let mut skip = skip.unwrap_or_default();
    let tracks_reads = self.write && self.mode >= IsolationLevel::SerializableSnapshotIsolation;
    if tracks_reads && version == self.version {
        self.track_scan_range(beg, end);
    }
    // Datastore-only path: nothing is queued for merging and this
    // transaction has no writes of its own inside the range.
    let nothing_queued = self.database.transaction_merge_queue.is_empty();
    if nothing_queued && self.writeset.range::<Bytes, _>(beg..end).next().is_none() {
        match direction {
            Direction::Forward => {
                for_each_in_range(&self.database.datastore, beg, end, |key, versions| {
                    if !versions.exists_version(version) {
                        return ControlFlow::Continue(());
                    }
                    if skip > 0 {
                        skip -= 1;
                        return ControlFlow::Continue(());
                    }
                    keys.push(key.clone());
                    match limit {
                        Some(max) if keys.len() >= max => ControlFlow::Break(()),
                        _ => ControlFlow::Continue(()),
                    }
                });
            }
            Direction::Reverse => {
                let mut entries = self
                    .database
                    .datastore
                    .range_rev(Bound::Included(beg), Bound::Excluded(end));
                while let Some((key, versions)) = entries.next() {
                    if !versions.exists_version(version) {
                        continue;
                    }
                    if skip > 0 {
                        skip -= 1;
                        continue;
                    }
                    keys.push(key.clone());
                    if matches!(limit, Some(max) if keys.len() >= max) {
                        break;
                    }
                }
            }
        }
        return Ok(keys);
    }
    // Merged path: combine the datastore, the merge queue entries visible at
    // `version`, and this transaction's own writes.
    let join_iter = Box::new(MergeQueueIter::new(
        self.snapshot_merge_sources(version),
        beg.clone(),
        end.clone(),
        direction,
    ));
    let mut iter = MergeIterator::new(
        TreeIterState::build(&self.database.datastore, beg, end, direction),
        join_iter,
        self.writeset.range::<Bytes, _>(beg..end),
        direction,
        version,
        skip,
    );
    while let Some((key, exists)) = iter.next_key() {
        if !exists {
            continue;
        }
        keys.push(key);
        if matches!(limit, Some(max) if keys.len() >= max) {
            break;
        }
    }
    Ok(keys)
}
/// Returns the key/value pairs in `rng` (start inclusive, end exclusive)
/// that have a value at `version`, in `direction` order.
///
/// Collection stops as soon as the result holds `limit` pairs (checked after
/// each push). `skip` is applied directly on the datastore-only path and is
/// handed to the `MergeIterator` otherwise. The scanned range is recorded
/// only for writable serializable transactions reading at their own version.
///
/// # Errors
///
/// Returns [`Error::TxClosed`] when the transaction is already done.
fn scan_any<K>(
    &self,
    rng: Range<K>,
    skip: Option<usize>,
    limit: Option<usize>,
    direction: Direction,
    version: u64,
) -> Result<Vec<(Bytes, Bytes)>, Error>
where
    K: IntoBytes,
{
    if self.done {
        return Err(Error::TxClosed);
    }
    // Pre-size from the limit, capped so a huge limit does not over-allocate.
    let mut pairs = match limit {
        Some(max) => Vec::with_capacity(max.min(10_000)),
        None => Vec::new(),
    };
    let beg = &rng.start.into_bytes();
    let end = &rng.end.into_bytes();
    let mut skip = skip.unwrap_or_default();
    let tracks_reads = self.write && self.mode >= IsolationLevel::SerializableSnapshotIsolation;
    if tracks_reads && version == self.version {
        self.track_scan_range(beg, end);
    }
    // Datastore-only path: nothing is queued for merging and this
    // transaction has no writes of its own inside the range.
    let nothing_queued = self.database.transaction_merge_queue.is_empty();
    if nothing_queued && self.writeset.range::<Bytes, _>(beg..end).next().is_none() {
        match direction {
            Direction::Forward => {
                for_each_in_range(&self.database.datastore, beg, end, |key, versions| {
                    let Some(value) = versions.fetch_version(version) else {
                        return ControlFlow::Continue(());
                    };
                    if skip > 0 {
                        skip -= 1;
                        return ControlFlow::Continue(());
                    }
                    pairs.push((key.clone(), value));
                    match limit {
                        Some(max) if pairs.len() >= max => ControlFlow::Break(()),
                        _ => ControlFlow::Continue(()),
                    }
                });
            }
            Direction::Reverse => {
                let mut entries = self
                    .database
                    .datastore
                    .range_rev(Bound::Included(beg), Bound::Excluded(end));
                while let Some((key, versions)) = entries.next() {
                    let Some(value) = versions.fetch_version(version) else {
                        continue;
                    };
                    if skip > 0 {
                        skip -= 1;
                        continue;
                    }
                    pairs.push((key.clone(), value));
                    if matches!(limit, Some(max) if pairs.len() >= max) {
                        break;
                    }
                }
            }
        }
        return Ok(pairs);
    }
    // Merged path: combine the datastore, the merge queue entries visible at
    // `version`, and this transaction's own writes.
    let join_iter = Box::new(MergeQueueIter::new(
        self.snapshot_merge_sources(version),
        beg.clone(),
        end.clone(),
        direction,
    ));
    let iter = MergeIterator::new(
        TreeIterState::build(&self.database.datastore, beg, end, direction),
        join_iter,
        self.writeset.range::<Bytes, _>(beg..end),
        direction,
        version,
        skip,
    );
    for (key, val) in iter {
        // Entries without a value are not part of the result.
        let Some(val) = val else {
            continue;
        };
        pairs.push((key, val));
        if matches!(limit, Some(max) if pairs.len() >= max) {
            break;
        }
    }
    Ok(pairs)
}
/// Returns every stored version, up to and including `version`, of each key
/// yielded by a forward merged iteration over `rng`.
///
/// Each result row is `(key, version, value)`; rows for one key are ordered
/// by ascending version. For writable transactions, a pending write to the
/// key is reported under version `self.version + 1`. `limit` counts keys,
/// not rows, and is checked after each key has been processed. `skip` is
/// handed to the `MergeIterator`.
///
/// # Errors
///
/// Returns [`Error::TxClosed`] when the transaction is already done.
fn scan_all_versions_any<K>(
    &self,
    rng: Range<K>,
    skip: Option<usize>,
    limit: Option<usize>,
    version: u64,
) -> Result<Vec<(Bytes, u64, Option<Bytes>)>, Error>
where
    K: IntoBytes,
{
    if self.done {
        return Err(Error::TxClosed);
    }
    let mut res = Vec::new();
    let beg = &rng.start.into_bytes();
    let end = &rng.end.into_bytes();
    let skip = skip.unwrap_or_default();
    let tracks_reads = self.write && self.mode >= IsolationLevel::SerializableSnapshotIsolation;
    if tracks_reads && version == self.version {
        self.track_scan_range(beg, end);
    }
    let join_iter = Box::new(MergeQueueIter::new(
        self.snapshot_merge_sources(version),
        beg.clone(),
        end.clone(),
        Direction::Forward,
    ));
    let iter = MergeIterator::new(
        TreeIterState::build(&self.database.datastore, beg, end, Direction::Forward),
        join_iter,
        self.writeset.range::<Bytes, _>(beg..end),
        Direction::Forward,
        version,
        skip,
    );
    let mut keys_seen = 0;
    for (key, _) in iter {
        // Versions recorded in the datastore for this key, if it is present.
        let stored: Option<Vec<(u64, Option<Bytes>)>> =
            self.database.datastore.lookup(&key, |v| v.all_versions());
        // Keep only what is visible at `version`, sorted by version number.
        let mut history: BTreeMap<u64, Option<Bytes>> =
            stored.into_iter().flatten().filter(|(ver, _)| *ver <= version).collect();
        if self.write {
            if let Some(pending) = self.writeset.get(&key) {
                history.insert(self.version + 1, pending.clone());
            }
        }
        res.extend(history.into_iter().map(|(ver, val)| (key.clone(), ver, val)));
        keys_seen += 1;
        if matches!(limit, Some(max) if keys_seen >= max) {
            break;
        }
    }
    Ok(res)
}
/// Opens a cursor over `rng` at this transaction's own version.
///
/// See `cursor_at_version` for the error behaviour.
pub fn cursor<K>(&self, rng: Range<K>) -> Result<Cursor<'_>, Error>
where
    K: IntoBytes,
{
    let version = self.version;
    self.cursor_at_version(rng, version)
}
/// Opens a cursor over `rng` reading at `version`.
///
/// The scanned range is recorded only for writable serializable
/// transactions reading at their own version.
///
/// # Errors
///
/// Returns [`Error::TxClosed`] when the transaction is already done.
pub fn cursor_at_version<K>(&self, rng: Range<K>, version: u64) -> Result<Cursor<'_>, Error>
where
    K: IntoBytes,
{
    if self.done {
        return Err(Error::TxClosed);
    }
    let Range {
        start,
        end,
    } = rng;
    let beg = start.into_bytes();
    let end = end.into_bytes();
    let tracks_reads = self.write && self.mode >= IsolationLevel::SerializableSnapshotIsolation;
    if tracks_reads && version == self.version {
        self.track_scan_range(&beg, &end);
    }
    Ok(Cursor::new(&self.database, &self.writeset, beg, end, version))
}
/// Returns a key iterator over `rng` at this transaction's version, built
/// with `KeyIterator::new` on top of `cursor`.
pub fn keys_iter<K>(&self, rng: Range<K>) -> Result<KeyIterator<'_>, Error>
where
    K: IntoBytes,
{
    self.cursor(rng).map(KeyIterator::new)
}
/// Returns a key iterator over `rng` at this transaction's version, built
/// with `KeyIterator::new_reverse` on top of `cursor`.
pub fn keys_iter_reverse<K>(&self, rng: Range<K>) -> Result<KeyIterator<'_>, Error>
where
    K: IntoBytes,
{
    self.cursor(rng).map(KeyIterator::new_reverse)
}
/// Returns a key iterator over `rng` reading at `version`, built with
/// `KeyIterator::new` on top of `cursor_at_version`.
pub fn keys_iter_at_version<K>(
    &self,
    rng: Range<K>,
    version: u64,
) -> Result<KeyIterator<'_>, Error>
where
    K: IntoBytes,
{
    self.cursor_at_version(rng, version).map(KeyIterator::new)
}
/// Returns a key iterator over `rng` reading at `version`, built with
/// `KeyIterator::new_reverse` on top of `cursor_at_version`.
pub fn keys_iter_at_version_reverse<K>(
    &self,
    rng: Range<K>,
    version: u64,
) -> Result<KeyIterator<'_>, Error>
where
    K: IntoBytes,
{
    self.cursor_at_version(rng, version).map(KeyIterator::new_reverse)
}
/// Returns a scan iterator over `rng` at this transaction's version, built
/// with `ScanIterator::new` on top of `cursor`.
pub fn scan_iter<K>(&self, rng: Range<K>) -> Result<ScanIterator<'_>, Error>
where
    K: IntoBytes,
{
    self.cursor(rng).map(ScanIterator::new)
}
/// Returns a scan iterator over `rng` at this transaction's version, built
/// with `ScanIterator::new_reverse` on top of `cursor`.
pub fn scan_iter_reverse<K>(&self, rng: Range<K>) -> Result<ScanIterator<'_>, Error>
where
    K: IntoBytes,
{
    self.cursor(rng).map(ScanIterator::new_reverse)
}
/// Returns a scan iterator over `rng` reading at `version`, built with
/// `ScanIterator::new` on top of `cursor_at_version`.
pub fn scan_iter_at_version<K>(
    &self,
    rng: Range<K>,
    version: u64,
) -> Result<ScanIterator<'_>, Error>
where
    K: IntoBytes,
{
    self.cursor_at_version(rng, version).map(ScanIterator::new)
}
/// Returns a scan iterator over `rng` reading at `version`, built with
/// `ScanIterator::new_reverse` on top of `cursor_at_version`.
pub fn scan_iter_at_version_reverse<K>(
    &self,
    rng: Range<K>,
    version: u64,
) -> Result<ScanIterator<'_>, Error>
where
    K: IntoBytes,
{
    self.cursor_at_version(rng, version).map(ScanIterator::new_reverse)
}
/// Looks up the value of `key` as visible at `version`.
///
/// Merge queue entries at or below `version` are consulted first, from the
/// highest queue key downwards, ignoring entries already marked as removed.
/// The first one whose writeset mentions the key decides the answer (which
/// is `None` when that writeset holds no value for it). Only when no queued
/// entry mentions the key is the datastore consulted.
#[inline(always)]
fn fetch_in_datastore<K>(&self, key: K, version: u64) -> Option<Bytes>
where
    K: IntoBytes,
{
    let key = key.as_slice();
    let queued = self.database.transaction_merge_queue.range(..=version);
    for entry in queued.rev() {
        if entry.is_removed() {
            continue;
        }
        if let Some(v) = entry.value().writeset.get(key) {
            return v.clone();
        }
    }
    self.database.datastore.lookup(key, |v| v.fetch_version(version)).flatten()
}
/// Reports whether `key` has a value as visible at `version`.
///
/// Merge queue entries at or below `version` are consulted first, from the
/// highest queue key downwards, ignoring entries already marked as removed.
/// The first one whose writeset mentions the key decides the answer. Only
/// when no queued entry mentions the key is the datastore consulted; a key
/// missing from the datastore counts as not existing.
#[inline(always)]
fn exists_in_datastore<K>(&self, key: K, version: u64) -> bool
where
    K: IntoBytes,
{
    let key = key.as_slice();
    let queued = self.database.transaction_merge_queue.range(..=version);
    for entry in queued.rev() {
        if entry.is_removed() {
            continue;
        }
        if let Some(v) = entry.value().writeset.get(key) {
            return v.is_some();
        }
    }
    self.database.datastore.lookup(key, |v| v.exists_version(version)).unwrap_or(false)
}
/// Reports whether the value of `key` as visible at `version` equals `chk`.
///
/// `chk == None` matches only an absent value. Merge queue entries at or
/// below `version` are consulted first, from the highest queue key
/// downwards, ignoring entries already marked as removed; the first one
/// whose writeset mentions the key decides the answer. Only when no queued
/// entry mentions the key is the datastore consulted.
#[inline(always)]
fn equals_in_datastore<C, K>(&self, key: K, chk: Option<C>, version: u64) -> bool
where
    K: IntoBytes,
    C: IntoBytes,
{
    let key = key.as_slice();
    let expected = chk.as_ref();
    let queued = self.database.transaction_merge_queue.range(..=version);
    for entry in queued.rev() {
        if entry.is_removed() {
            continue;
        }
        if let Some(v) = entry.value().writeset.get(key) {
            return match (expected, v.as_ref()) {
                (Some(want), Some(have)) => want.as_slice() == have,
                (None, None) => true,
                _ => false,
            };
        }
    }
    let stored = self.database.datastore.lookup(key, |v| v.fetch_version(version)).flatten();
    match (expected, stored.as_ref()) {
        (Some(want), Some(have)) => want.as_slice() == have,
        (None, None) => true,
        _ => false,
    }
}
/// Places `updates` into the commit queue at the next commit id and returns
/// that id together with the queued entry.
///
/// The loop reads the current commit id, tries to insert `updates` under the
/// following id, and checks whether the entry now stored there carries this
/// attempt's `id`. If another entry occupies the slot, the call backs off
/// and retries.
#[inline(always)]
fn atomic_commit(&self, updates: Commit) -> (u64, Arc<Commit>) {
// Number of failed attempts so far; drives the backoff strength.
let mut spins = 0;
// Remember our id so we can tell our own entry from a competing one.
let id = updates.id;
let updates = Arc::new(updates);
let queue = &self.database.transaction_commit_queue;
loop {
// Candidate slot: one past the currently published commit id.
let version = self.database.transaction_commit_id.load(Ordering::Acquire) + 1;
// Returns the entry stored under `version`, inserting ours if the slot is free.
let entry = queue.get_or_insert_with(version, || Arc::clone(&updates));
if id == entry.value().id {
// The slot holds our entry: publish the new commit id.
self.database.transaction_commit_id.fetch_add(1, Ordering::SeqCst);
return (version, entry.value().clone());
}
// The slot holds a different entry: wait a little and try again.
backoff(spins);
spins += 1;
}
}
/// Places `updates` into the merge queue under a fresh version number and
/// returns that version together with the queued entry.
///
/// The version is the oracle's current time in nanoseconds, raised to one
/// past the oracle's last recorded timestamp when the clock value is not
/// greater than it. If the slot for that version holds a different entry,
/// the call backs off and retries.
#[inline(always)]
fn atomic_merge(&self, updates: Merge) -> (u64, Arc<Merge>) {
// Number of failed attempts so far; drives the backoff strength.
let mut spins = 0;
// Remember our id so we can tell our own entry from a competing one.
let id = updates.id;
let updates = Arc::new(updates);
let oracle = self.database.oracle.clone();
let queue = &self.database.transaction_merge_queue;
loop {
let mut version = oracle.current_time_ns();
let last_ts = oracle.inner.timestamp.load(Ordering::Acquire);
// Never reuse or go below the last recorded timestamp.
if version <= last_ts {
version = last_ts + 1;
}
// Returns the entry stored under `version`, inserting ours if the slot is free.
let entry = queue.get_or_insert_with(version, || Arc::clone(&updates));
if id == entry.value().id {
// The slot holds our entry: raise the recorded timestamp to at least `version`.
oracle.inner.timestamp.fetch_max(version, Ordering::SeqCst);
return (version, entry.value().clone());
}
// The slot holds a different entry: wait a little and try again.
backoff(spins);
spins += 1;
}
}
}
/// Pauses the caller for a moment that grows with the retry count `spins`.
///
/// - fewer than 10 spins: emit a spin-loop hint;
/// - 10 to 99 spins: yield the current thread;
/// - 100 spins or more: park the current thread with a 10 microsecond timeout.
///
/// On `wasm32` targets, 10 spins or more only emit a spin-loop hint.
#[inline(always)]
fn backoff(spins: usize) {
    match spins {
        0..=9 => std::hint::spin_loop(),
        _ => {
            #[cfg(not(target_arch = "wasm32"))]
            match spins {
                10..=99 => std::thread::yield_now(),
                _ => std::thread::park_timeout(std::time::Duration::from_micros(10)),
            }
            #[cfg(target_arch = "wasm32")]
            std::hint::spin_loop();
        }
    }
}
#[cfg(test)]
mod tests {
use crate::err::Error;
use crate::Database;
use std::sync::Arc;
use std::thread;
// Two overlapping transactions that read and write different keys must both commit.
#[test]
fn mvcc_non_conflicting_keys_should_succeed() {
let db = Database::new();
let mut tx1 = db.transaction(true);
let mut tx2 = db.transaction(true);
// tx1 only touches key1.
assert!(tx1.get("key1").unwrap().is_none());
tx1.set("key1", "value1").unwrap();
assert!(tx1.commit().is_ok());
// tx2 only touches key2, so tx1's commit must not get in its way.
assert!(tx2.get("key2").unwrap().is_none());
tx2.set("key2", "value2").unwrap();
assert!(tx2.commit().is_ok());
}
// Two overlapping transactions write the same key; the second one to commit must fail.
#[test]
fn mvcc_conflicting_blind_writes_should_error() {
let db = Database::new();
let mut tx1 = db.transaction(true);
let mut tx2 = db.transaction(true);
assert!(tx1.get("key1").unwrap().is_none());
tx1.set("key1", "value1").unwrap();
// tx2 writes key1 before tx1 has committed.
assert!(tx2.get("key1").unwrap().is_none());
tx2.set("key1", "value2").unwrap();
assert!(tx1.commit().is_ok());
assert!(tx2.commit().is_err());
}
// With snapshot isolation, reading a key that another transaction has since
// written does not prevent a commit that writes a different key.
#[test]
fn mvcc_si_conflicting_read_keys_should_succeed() {
let db = Database::new();
let mut tx1 = db.transaction(true).with_snapshot_isolation();
let mut tx2 = db.transaction(true).with_snapshot_isolation();
assert!(tx1.get("key1").unwrap().is_none());
tx1.set("key1", "value1").unwrap();
assert!(tx1.commit().is_ok());
// tx2 still sees no key1 (it started before tx1 committed) and writes key2 only.
assert!(tx2.get("key1").unwrap().is_none());
tx2.set("key2", "value2").unwrap();
assert!(tx2.commit().is_ok());
}
// With serializable snapshot isolation, reading a key that another
// transaction has since written makes the commit fail, even though the
// write goes to a different key.
#[test]
fn mvcc_ssi_conflicting_read_keys_should_error() {
let db = Database::new();
let mut tx1 = db.transaction(true).with_serializable_snapshot_isolation();
let mut tx2 = db.transaction(true).with_serializable_snapshot_isolation();
assert!(tx1.get("key1").unwrap().is_none());
tx1.set("key1", "value1").unwrap();
assert!(tx1.commit().is_ok());
// tx2 reads key1 (written by tx1 in the meantime) and writes key2.
assert!(tx2.get("key1").unwrap().is_none());
tx2.set("key2", "value2").unwrap();
assert!(tx2.commit().is_err());
}
// A transaction that writes a key already committed by an overlapping
// transaction must fail to commit.
#[test]
fn mvcc_conflicting_write_keys_should_error() {
let db = Database::new();
let mut tx1 = db.transaction(true);
let mut tx2 = db.transaction(true);
assert!(tx1.get("key1").unwrap().is_none());
tx1.set("key1", "value1").unwrap();
assert!(tx1.commit().is_ok());
// tx2 writes key1 after tx1 has already committed its own write to key1.
assert!(tx2.get("key1").unwrap().is_none());
tx2.set("key1", "value2").unwrap();
assert!(tx2.commit().is_err());
}
// A transaction that writes a key which an overlapping transaction has
// deleted and committed must fail to commit.
#[test]
fn mvcc_conflicting_read_deleted_keys_should_error() {
let db = Database::new();
// Seed the key.
let mut tx1 = db.transaction(true);
tx1.set("key", "value1").unwrap();
assert!(tx1.commit().is_ok());
let mut tx2 = db.transaction(true);
let mut tx3 = db.transaction(true);
// tx2 deletes the key and commits.
assert!(tx2.get("key").unwrap().is_some());
tx2.del("key").unwrap();
assert!(tx2.commit().is_ok());
// tx3 still sees the key (it started before the delete) and overwrites it.
assert!(tx3.get("key").unwrap().is_some());
tx3.set("key", "value2").unwrap();
assert!(tx3.commit().is_err());
}
// A transaction that scans a range and then writes keys that an overlapping
// transaction has committed inside that range must fail to commit.
#[test]
fn mvcc_scan_conflicting_write_keys_should_error() {
let db = Database::new();
// Seed key1.
let mut tx1 = db.transaction(true);
tx1.set("key1", "value1").unwrap();
assert!(tx1.commit().is_ok());
let mut tx2 = db.transaction(true);
let mut tx3 = db.transaction(true);
// tx2 rewrites key1 and adds key2 and key3, then commits.
tx2.set("key1", "value4").unwrap();
tx2.set("key2", "value2").unwrap();
tx2.set("key3", "value3").unwrap();
assert!(tx2.commit().is_ok());
// tx3 started before tx2 committed, so its scan only finds the seeded key1.
let res = tx3.scan("key1".."key9", None, Some(10)).unwrap();
assert_eq!(res.len(), 1);
tx3.set("key2", "value5").unwrap();
tx3.set("key3", "value6").unwrap();
// The second scan also includes tx3's own pending writes.
let res = tx3.scan("key1".."key9", None, Some(10)).unwrap();
assert_eq!(res.len(), 3);
assert!(tx3.commit().is_err());
}
// A transaction that scans a range and then writes a key that an
// overlapping transaction has deleted inside that range must fail to commit.
#[test]
fn mvcc_scan_conflicting_read_deleted_keys_should_error() {
let db = Database::new();
// Seed key1.
let mut tx1 = db.transaction(true);
tx1.set("key1", "value1").unwrap();
assert!(tx1.commit().is_ok());
let mut tx2 = db.transaction(true);
let mut tx3 = db.transaction(true);
// tx2 deletes key1 and commits.
tx2.del("key1").unwrap();
assert!(tx2.commit().is_ok());
// tx3 started before the delete, so its scan still finds key1.
let res = tx3.scan("key1".."key9", None, Some(10)).unwrap();
assert_eq!(res.len(), 1);
tx3.set("key1", "other").unwrap();
tx3.set("key2", "value2").unwrap();
tx3.set("key3", "value3").unwrap();
// The second scan also includes tx3's own pending writes.
let res = tx3.scan("key1".."key9", None, Some(10)).unwrap();
assert_eq!(res.len(), 3);
assert!(tx3.commit().is_err());
}
// Runs several rounds of commits against the same database and checks that
// write-write conflicts keep being detected after earlier transactions have
// been committed and dropped.
#[test]
fn mvcc_transaction_queue_correctness() {
    let db = Database::new();
    // Two independent commits, each dropped right away.
    let mut tx1 = db.transaction(true);
    tx1.set("key1", "value1").unwrap();
    assert!(tx1.commit().is_ok());
    std::mem::drop(tx1);
    let mut tx2 = db.transaction(true);
    tx2.set("key2", "value2").unwrap();
    assert!(tx2.commit().is_ok());
    std::mem::drop(tx2);
    // Round 1: two overlapping writers of the same key; the second must fail.
    let mut tx3 = db.transaction(true);
    tx3.set("key", "value").unwrap();
    let mut tx4 = db.transaction(true);
    tx4.set("key", "value").unwrap();
    assert!(tx3.commit().is_ok());
    assert!(tx4.commit().is_err());
    std::mem::drop(tx3);
    std::mem::drop(tx4);
    // Round 2: same pattern again.
    let mut tx5 = db.transaction(true);
    tx5.set("key", "other").unwrap();
    let mut tx6 = db.transaction(true);
    tx6.set("key", "other").unwrap();
    assert!(tx5.commit().is_ok());
    assert!(tx6.commit().is_err());
    std::mem::drop(tx5);
    std::mem::drop(tx6);
    // Round 3: same pattern again. The losing writer is tx8; committing tx7 a
    // second time would only show that a finished transaction cannot be
    // committed again, not that the conflict was detected.
    let mut tx7 = db.transaction(true);
    tx7.set("key", "change").unwrap();
    let mut tx8 = db.transaction(true);
    tx8.set("key", "change").unwrap();
    assert!(tx7.commit().is_ok());
    assert!(tx8.commit().is_err());
    std::mem::drop(tx7);
    std::mem::drop(tx8);
}
// Walks through five small two-writer scenarios on one database and checks
// which commits succeed and which are rejected.
#[test]
fn test_snapshot_isolation() {
let db = Database::new();
let key1 = "key1";
let key2 = "key2";
let value1 = "baz";
let value2 = "bar";
// Scenario 1: the transactions touch different keys; both commit.
{
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
txn1.set(key1, value1).unwrap();
txn1.commit().unwrap();
assert!(txn2.get(key2).unwrap().is_none());
txn2.set(key2, value2).unwrap();
txn2.commit().unwrap();
}
// Scenario 2: txn2 reads and writes a key txn1 has committed; txn2 fails.
{
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
txn1.set(key1, value1).unwrap();
txn1.commit().unwrap();
assert!(txn2.get(key1).is_ok());
txn2.set(key1, value2).unwrap();
assert!(txn2.commit().is_err());
}
// Scenario 3: both write the same key before either commits; the second commit fails.
{
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
txn1.set(key1, value1).unwrap();
txn2.set(key1, value2).unwrap();
txn1.commit().unwrap();
assert!(txn2.commit().is_err());
}
// Scenario 4: txn2 does not see the newly committed key, writes it anyway, and fails.
{
let key = "key3";
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
txn1.set(key, value1).unwrap();
txn1.commit().unwrap();
assert!(txn2.get(key).unwrap().is_none());
txn2.set(key, value1).unwrap();
assert!(txn2.commit().is_err());
}
// Scenario 5: txn3 writes a key that txn2 has deleted and committed; txn3 fails.
{
let key = "key4";
let mut txn1 = db.transaction(true);
txn1.set(key, value1).unwrap();
txn1.commit().unwrap();
let mut txn2 = db.transaction(true);
let mut txn3 = db.transaction(true);
txn2.del(key).unwrap();
assert!(txn2.commit().is_ok());
assert!(txn3.get(key).is_ok());
txn3.set(key, value2).unwrap();
assert!(txn3.commit().is_err());
}
}
// Checks that a transaction which scanned a range cannot commit writes to
// keys that an overlapping transaction changed inside that range.
#[test]
fn test_snapshot_isolation_scan() {
let db = Database::new();
let key1 = "key1";
let key2 = "key2";
let key3 = "key3";
let key4 = "key4";
let value1 = "value1";
let value2 = "value2";
let value3 = "value3";
let value4 = "value4";
let value5 = "value5";
let value6 = "value6";
// Scenario 1: the overlapping transaction writes keys inside the scanned range.
{
let mut txn1 = db.transaction(true);
txn1.set(key1, value1).unwrap();
txn1.commit().unwrap();
let mut txn2 = db.transaction(true);
let mut txn3 = db.transaction(true);
txn2.set(key1, value4).unwrap();
txn2.set(key2, value2).unwrap();
txn2.set(key3, value3).unwrap();
txn2.commit().unwrap();
// txn3 started before txn2 committed, so it only finds the seeded key1.
let range = "key1".."key4";
let results = txn3.scan(range, None, Some(10)).unwrap();
assert_eq!(results.len(), 1);
txn3.set(key2, value5).unwrap();
txn3.set(key3, value6).unwrap();
assert!(txn3.commit().is_err());
}
// Scenario 2: the overlapping transaction deletes a key inside the scanned range.
{
let mut txn1 = db.transaction(true);
txn1.set(key4, value1).unwrap();
txn1.commit().unwrap();
let mut txn2 = db.transaction(true);
let mut txn3 = db.transaction(true);
txn2.del(key4).unwrap();
txn2.commit().unwrap();
let range = "key1".."key5";
let _ = txn3.scan(range, None, Some(10)).unwrap();
txn3.set(key4, value2).unwrap();
assert!(txn3.commit().is_err());
}
}
// Test helper: returns a fresh database with `k1 = v1` and `k2 = v2` committed.
fn new_db() -> Database {
let db = Database::new();
let key1 = "k1";
let key2 = "k2";
let value1 = "v1";
let value2 = "v2";
let mut txn = db.transaction(true);
txn.set(key1, value1).unwrap();
txn.set(key2, value2).unwrap();
txn.commit().unwrap();
db
}
// Two transactions interleave writes to the same two keys. Only the first
// to commit may succeed, and afterwards both keys must carry its values.
#[test]
fn test_anomaly_g0() {
let db = new_db();
let key1 = "k1";
let key2 = "k2";
let value3 = "v3";
let value4 = "v4";
let value5 = "v5";
let value6 = "v6";
{
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
assert!(txn1.get(key1).is_ok());
assert!(txn1.get(key2).is_ok());
assert!(txn2.get(key1).is_ok());
assert!(txn2.get(key2).is_ok());
// Interleaved writes: both touch k1, then both touch k2.
txn1.set(key1, value3).unwrap();
txn2.set(key1, value4).unwrap();
txn1.set(key2, value5).unwrap();
txn1.commit().unwrap();
txn2.set(key2, value6).unwrap();
assert!(txn2.commit().is_err());
}
// A later transaction sees only txn1's values.
{
let txn3 = db.transaction(true);
let val1 = txn3.get(key1).unwrap().unwrap();
assert_eq!(val1, value3);
let val2 = txn3.get(key2).unwrap().unwrap();
assert_eq!(val2, value5);
}
}
// A write made by a transaction that is later dropped without committing
// must never become visible to other transactions.
#[test]
fn test_anomaly_g1a() {
let db = new_db();
let key1 = "k1";
let key2 = "k2";
let value1 = "v1";
let value2 = "v2";
let value3 = "v3";
{
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
assert!(txn1.get(key1).is_ok());
txn1.set(key1, value3).unwrap();
// txn2 does not see txn1's pending write.
let range = "k1".."k3";
let res = txn2.scan(range.clone(), None, None).expect("Scan should succeed");
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, value1);
// txn1 goes away without committing.
drop(txn1);
let res = txn2.scan(range, None, None).expect("Scan should succeed");
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, value1);
txn2.commit().unwrap();
}
// The seeded values are still in place.
{
let txn3 = db.transaction(true);
let val1 = txn3.get(key1).unwrap().unwrap();
assert_eq!(val1, value1);
let val2 = txn3.get(key2).unwrap().unwrap();
assert_eq!(val2, value2);
}
}
// A transaction must not observe another transaction's intermediate write,
// nor its final committed write once its own snapshot has been taken.
#[test]
fn test_anomaly_g1b() {
let db = new_db();
let key1 = "k1";
let key2 = "k2";
let value1 = "v1";
let value3 = "v3";
let value4 = "v4";
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
assert!(txn1.get(key1).is_ok());
assert!(txn1.get(key2).is_ok());
// First (intermediate) write by txn1.
txn1.set(key1, value3).unwrap();
let range = "k1".."k3";
let res = txn2.scan(range.clone(), None, None).expect("Scan should succeed");
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, value1);
// Final write by txn1, then commit.
txn1.set(key1, value4).unwrap();
txn1.commit().unwrap();
// txn2 still reads the value from before txn1's commit.
let res = txn2.scan(range, None, None).expect("Scan should succeed");
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, value1);
txn2.commit().unwrap();
}
// A transaction repeats a range scan after another transaction has inserted
// and committed a new key; both scans must return the same two rows.
//
// NOTE(review): the scanned range is `"k1".."k3"`, which excludes `k3`, the
// key txn2 inserts. The second scan therefore could not return the new key
// even if it were visible. Confirm whether the range was meant to cover `k3`.
#[test]
fn test_anomaly_pmp() {
let db = new_db();
let key3 = "k3";
let value1 = "v1";
let value2 = "v2";
let value3 = "v3";
let txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
let range = "k1".."k3";
let res = txn1.scan(range.clone(), None, None).expect("Scan should succeed");
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, value1);
assert_eq!(res[1].1, value2);
// txn2 inserts k3 and commits while txn1 is still open.
txn2.set(key3, value3).unwrap();
txn2.commit().unwrap();
let range = "k1".."k3";
let res = txn1.scan(range.clone(), None, None).expect("Scan should succeed");
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, value1);
assert_eq!(res[1].1, value2);
}
// Both transactions read k1 and then write it; the second commit must be
// rejected so that the first update is not silently overwritten.
#[test]
fn test_anomaly_p4() {
let db = new_db();
let key1 = "k1";
let value3 = "v3";
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
assert!(txn1.get(key1).is_ok());
assert!(txn2.get(key1).is_ok());
txn1.set(key1, value3).unwrap();
txn2.set(key1, value3).unwrap();
txn1.commit().unwrap();
assert!(txn2.commit().is_err());
}
// txn1 reads k1, txn2 then rewrites k1 and k2 and commits; txn1 must still
// read the old k2 and, having written nothing, must be able to commit.
#[test]
fn test_anomaly_g_single() {
let db = new_db();
let key1 = "k1";
let key2 = "k2";
let value1 = "v1";
let value2 = "v2";
let value3 = "v3";
let value4 = "v4";
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
assert_eq!(txn1.get(key1).unwrap().unwrap(), value1);
assert_eq!(txn2.get(key1).unwrap().unwrap(), value1);
assert_eq!(txn2.get(key2).unwrap().unwrap(), value2);
txn2.set(key1, value3).unwrap();
txn2.set(key2, value4).unwrap();
txn2.commit().unwrap();
// txn1 reads k2 after txn2's commit but still gets the old value.
assert_eq!(txn1.get(key2).unwrap().unwrap(), value2);
txn1.commit().unwrap();
}
// Like `test_anomaly_g_single`, but txn1 also deletes k2 after txn2 has
// committed a new value for it; txn1's commit must then be rejected.
#[test]
fn test_anomaly_g_single_write_1() {
let db = new_db();
let key1 = "k1";
let key2 = "k2";
let value1 = "v1";
let value2 = "v2";
let value3 = "v3";
let value4 = "v4";
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
assert_eq!(txn1.get(key1).unwrap().unwrap(), value1);
let range = "k1".."k3";
let res = txn2.scan(range.clone(), None, None).expect("Scan should succeed");
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, value1);
assert_eq!(res[1].1, value2);
txn2.set(key1, value3).unwrap();
txn2.set(key2, value4).unwrap();
txn2.commit().unwrap();
// txn1 deletes k2 and sees its own delete.
txn1.del(key2).unwrap();
assert!(txn1.get(key2).unwrap().is_none());
assert!(txn1.commit().is_err());
}
// txn1 deletes k2 but is dropped without committing; txn2, which writes
// both keys, must then be able to commit.
#[test]
fn test_anomaly_g_single_write_2() {
let db = new_db();
let key1 = "k1";
let key2 = "k2";
let value1 = "v1";
let value2 = "v2";
let value3 = "v3";
let value4 = "v4";
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
assert_eq!(txn1.get(key1).unwrap().unwrap(), value1);
let range = "k1".."k3";
let res = txn2.scan(range.clone(), None, None).expect("Scan should succeed");
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, value1);
assert_eq!(res[1].1, value2);
txn2.set(key1, value3).unwrap();
txn1.del(key2).unwrap();
txn2.set(key2, value4).unwrap();
// txn1 is abandoned, so its pending delete never takes effect.
drop(txn1);
txn2.commit().unwrap();
}
// Each transaction writes one key and then reads the key the other one
// wrote; neither sees the other's pending write, and only the first commit
// may succeed.
#[test]
fn test_anomaly_g1c() {
let db = new_db();
let key1 = "k1";
let key2 = "k2";
let value1 = "v1";
let value2 = "v2";
let value3 = "v3";
let value4 = "v4";
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
assert!(txn1.get(key1).is_ok());
assert!(txn2.get(key2).is_ok());
txn1.set(key1, value3).unwrap();
txn2.set(key2, value4).unwrap();
// Cross reads return the seeded values, not the pending writes.
assert_eq!(txn1.get(key2).unwrap().unwrap(), value2);
assert_eq!(txn2.get(key1).unwrap().unwrap(), value1);
txn1.commit().unwrap();
assert!(txn2.commit().is_err());
}
// txn2 scans a range and deletes k2 while txn1 rewrites k1 inside the same
// range and commits first; txn2's commit must then be rejected.
#[test]
fn test_pmp_write() {
let db = new_db();
let key1 = "k1";
let key2 = "k2";
let value1 = "v1";
let value2 = "v2";
let value3 = "v3";
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
assert!(txn1.get(key1).is_ok());
txn1.set(key1, value3).unwrap();
let range = "k1".."k3";
let res = txn2.scan(range.clone(), None, None).expect("Scan should succeed");
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, value1);
assert_eq!(res[1].1, value2);
txn2.del(key2).unwrap();
txn1.commit().unwrap();
// txn2 sees its own delete of k2 but still the old value of k1.
let range = "k1".."k3";
let res = txn2.scan(range.clone(), None, None).expect("Scan should succeed");
assert_eq!(res.len(), 1);
assert_eq!(res[0].1, value1);
assert!(txn2.commit().is_err());
}
// Both transactions scan the same range and then each writes a different
// key inside it; only the first commit may succeed.
#[test]
fn test_g2_item() {
let db = new_db();
let key1 = "k1";
let key2 = "k2";
let value1 = "v1";
let value2 = "v2";
let value3 = "v3";
let value4 = "v4";
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
let range = "k1".."k3";
let res = txn1.scan(range.clone(), None, None).expect("Scan should succeed");
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, value1);
assert_eq!(res[1].1, value2);
let res = txn2.scan(range.clone(), None, None).expect("Scan should succeed");
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, value1);
assert_eq!(res[1].1, value2);
// Disjoint writes, both inside the range each transaction scanned.
txn1.set(key1, value3).unwrap();
txn2.set(key2, value4).unwrap();
txn1.commit().unwrap();
assert!(txn2.commit().is_err());
}
// Checks, for three scan/insert combinations, whether inserting a key
// relative to a previously scanned range causes the later commit to fail.
#[test]
fn test_g2_item_predicate() {
let db = new_db();
let key3 = "k3";
let key4 = "k4";
let key5 = "k5";
let key6 = "k6";
let key7 = "k7";
let value3 = "v3";
let value4 = "v4";
// Scenario 1: both scan k1..k4; txn1 inserts k3 (inside the range), so txn2 fails.
{
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
let range = "k1".."k4";
txn1.scan(range.clone(), None, None).expect("Scan should succeed");
txn2.scan(range.clone(), None, None).expect("Scan should succeed");
txn1.set(key3, value3).unwrap();
txn2.set(key4, value4).unwrap();
txn1.commit().unwrap();
assert!(txn2.commit().is_err());
}
// Scenario 2: both scan k1..k3; the inserts (k4, k5) lie outside, so both commit.
{
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
let range = "k1".."k3";
txn1.scan(range.clone(), None, None).expect("Scan should succeed");
txn2.scan(range.clone(), None, None).expect("Scan should succeed");
txn1.set(key4, value3).unwrap();
txn2.set(key5, value4).unwrap();
txn1.commit().unwrap();
txn2.commit().unwrap();
}
// Scenario 3: txn2 scans k3..k7; txn1 inserts k6 (inside that range), so txn2 fails.
{
let mut txn1 = db.transaction(true);
let mut txn2 = db.transaction(true);
let range = "k1".."k7";
txn1.scan(range.clone(), None, None).expect("Scan should succeed");
let range = "k3".."k7";
txn2.scan(range.clone(), None, None).expect("Scan should succeed");
txn1.set(key6, value3).unwrap();
txn2.set(key7, value4).unwrap();
txn1.commit().unwrap();
assert!(txn2.commit().is_err());
}
}
// Exercises `scan` and `scan_reverse` over committed keys `a`..`e`:
// range bounds, skip, limit, reverse order, and an empty range.
#[test]
fn test_range_scan() {
let db = Database::new();
let mut txn1 = db.transaction(true);
txn1.set("a", "1").unwrap();
txn1.set("b", "2").unwrap();
txn1.set("c", "3").unwrap();
txn1.set("d", "4").unwrap();
txn1.set("e", "5").unwrap();
txn1.commit().unwrap();
let txn2 = db.transaction(false);
// The end bound is exclusive: "b".."d" yields b and c.
let res = txn2.scan("b".."d", None, None).unwrap();
assert_eq!(res.len(), 2);
assert_eq!(res[0].0.as_ref(), b"b");
assert_eq!(res[1].0.as_ref(), b"c");
// Skip the first entry.
let res = txn2.scan("a".."f", Some(1), None).unwrap();
assert_eq!(res.len(), 4);
assert_eq!(res[0].0.as_ref(), b"b");
// Limit the number of entries.
let res = txn2.scan("a".."f", None, Some(3)).unwrap();
assert_eq!(res.len(), 3);
assert_eq!(res[2].0.as_ref(), b"c");
// Skip and limit combined.
let res = txn2.scan("a".."f", Some(2), Some(2)).unwrap();
assert_eq!(res.len(), 2);
assert_eq!(res[0].0.as_ref(), b"c");
assert_eq!(res[1].0.as_ref(), b"d");
// Reverse order over the same half-open range semantics.
let res = txn2.scan_reverse("b".."e", None, None).unwrap();
assert_eq!(res.len(), 3);
assert_eq!(res[0].0.as_ref(), b"d");
assert_eq!(res[1].0.as_ref(), b"c");
assert_eq!(res[2].0.as_ref(), b"b");
// A range without any keys.
let res = txn2.scan("x".."z", None, None).unwrap();
assert_eq!(res.len(), 0);
// A range holding exactly one key.
let res = txn2.scan("c".."d", None, None).unwrap();
assert_eq!(res.len(), 1);
assert_eq!(res[0].0.as_ref(), b"c");
}
// Commits keys a, c and e with the background-threads flag switched off,
// then checks that a second transaction's scan returns those keys merged,
// in key order, with its own pending writes b and d.
#[test]
fn test_range_scan_with_merge_queue() {
use std::sync::atomic::Ordering;
let db = Database::new();
db.background_threads_enabled.store(false, Ordering::Relaxed);
let mut txn1 = db.transaction(true);
txn1.set("a", "1").unwrap();
txn1.set("c", "3").unwrap();
txn1.set("e", "5").unwrap();
txn1.commit().unwrap();
// NOTE(review): both assertions below contradict their own failure messages.
// The first asserts the merge queue IS empty while its message says data
// should be in it; the second asserts the datastore is NOT empty while its
// message says data should not be there yet. Either the conditions or the
// messages are stale. Confirm against the commit path which state is
// expected after `commit()` with background threads disabled, then align them.
assert!(db.transaction_merge_queue.is_empty(), "Data should be in merge queue");
assert!(!db.datastore.is_empty(), "Data should NOT be in datastore yet");
let mut txn2 = db.transaction(true);
txn2.set("b", "2").unwrap();
txn2.set("d", "4").unwrap();
// Committed keys and txn2's pending keys come back interleaved in key order.
let res = txn2.scan("a".."f", None, None).unwrap();
assert_eq!(res.len(), 5);
assert_eq!(res[0].0.as_ref(), b"a"); assert_eq!(res[1].0.as_ref(), b"b"); assert_eq!(res[2].0.as_ref(), b"c"); assert_eq!(res[3].0.as_ref(), b"d"); assert_eq!(res[4].0.as_ref(), b"e"); }
// Keys deleted by a later committed transaction must not show up in a scan
// started afterwards.
#[test]
fn test_range_scan_with_deletions_in_merge_queue() {
let db = Database::new();
let mut txn1 = db.transaction(true);
txn1.set("a", "1").unwrap();
txn1.set("b", "2").unwrap();
txn1.set("c", "3").unwrap();
txn1.set("d", "4").unwrap();
txn1.commit().unwrap();
// Delete b and d in a second committed transaction.
let mut txn2 = db.transaction(true);
txn2.del("b").unwrap();
txn2.del("d").unwrap();
txn2.commit().unwrap();
// Only a and c remain visible.
let txn3 = db.transaction(false);
let res = txn3.scan("a".."e", None, None).unwrap();
assert_eq!(res.len(), 2);
assert_eq!(res[0].0.as_ref(), b"a");
assert_eq!(res[1].0.as_ref(), b"c");
}
// Values overwritten by a later committed transaction must be the ones a
// scan started afterwards returns.
#[test]
fn test_range_scan_with_overwrites_in_merge_queue() {
let db = Database::new();
let mut txn1 = db.transaction(true);
txn1.set("a", "1").unwrap();
txn1.set("b", "2").unwrap();
txn1.set("c", "3").unwrap();
txn1.commit().unwrap();
// Overwrite a and b in a second committed transaction.
let mut txn2 = db.transaction(true);
txn2.set("a", "10").unwrap();
txn2.set("b", "20").unwrap();
txn2.commit().unwrap();
// a and b carry the new values; c is still listed.
let txn3 = db.transaction(false);
let res = txn3.scan("a".."d", None, None).unwrap();
assert_eq!(res.len(), 3);
assert_eq!(res[0].0.as_ref(), b"a"); assert_eq!(res[0].1.as_ref(), b"10");
assert_eq!(res[1].0.as_ref(), b"b"); assert_eq!(res[1].1.as_ref(), b"20");
assert_eq!(res[2].0.as_ref(), b"c"); }
// Checks range bounds against keys that share prefixes (a, aa, ab, b, ba):
// the start bound is inclusive and the end bound is exclusive.
#[test]
fn test_range_scan_boundary_conditions() {
let db = Database::new();
let mut txn1 = db.transaction(true);
txn1.set("a", "1").unwrap();
txn1.set("aa", "2").unwrap();
txn1.set("ab", "3").unwrap();
txn1.set("b", "4").unwrap();
txn1.set("ba", "5").unwrap();
txn1.commit().unwrap();
let txn2 = db.transaction(false);
// "aa" is included, "b" is excluded.
let res = txn2.scan("aa".."b", None, None).unwrap();
assert_eq!(res.len(), 2);
assert_eq!(res[0].0.as_ref(), b"aa");
assert_eq!(res[0].1.as_ref(), b"2");
assert_eq!(res[1].0.as_ref(), b"ab");
assert_eq!(res[1].1.as_ref(), b"3");
// "a" is included, "aa" is excluded.
let res = txn2.scan("a".."aa", None, None).unwrap();
assert_eq!(res.len(), 1);
assert_eq!(res[0].0.as_ref(), b"a");
// Neither bound is a stored key; only "ab" falls in between.
let res = txn2.scan("aaa".."az", None, None).unwrap();
assert_eq!(res.len(), 1);
assert_eq!(res[0].0.as_ref(), b"ab");
assert_eq!(res[0].1.as_ref(), b"3");
}
/// A scan issued inside a write transaction must combine previously
/// committed data with that transaction's own uncommitted inserts,
/// overwrites and deletes.
#[test]
fn test_range_scan_with_concurrent_transactions() {
    let db = Database::new();

    let mut seed = db.transaction(true);
    for (key, val) in [("a", "1"), ("c", "3"), ("e", "5")] {
        seed.set(key, val).unwrap();
    }
    seed.commit().unwrap();

    // Uncommitted changes: insert "b", overwrite "c", delete "e", insert "d".
    let mut writer = db.transaction(true);
    writer.set("b", "2").unwrap();
    writer.set("c", "30").unwrap();
    writer.del("e").unwrap();
    writer.set("d", "4").unwrap();

    let rows = writer.scan("a".."f", None, None).unwrap();
    assert_eq!(rows.len(), 4);
    assert_eq!(rows[0].0.as_ref(), b"a");
    assert_eq!(rows[1].0.as_ref(), b"b");
    assert_eq!(rows[2].0.as_ref(), b"c");
    assert_eq!(rows[2].1.as_ref(), b"30");
    assert_eq!(rows[3].0.as_ref(), b"d");
}
/// `keys` and `keys_reverse` must return the keys of a range in ascending
/// and descending order respectively.
#[test]
fn test_range_scan_keys_only() {
    let db = Database::new();

    let mut seed = db.transaction(true);
    for (key, val) in [("a", "1"), ("b", "2"), ("c", "3")] {
        seed.set(key, val).unwrap();
    }
    seed.commit().unwrap();

    let reader = db.transaction(false);

    let forward = reader.keys("a".."d", None, None).unwrap();
    assert_eq!(forward.len(), 3);
    assert_eq!(forward, vec!["a", "b", "c"]);

    let backward = reader.keys_reverse("a".."d", None, None).unwrap();
    assert_eq!(backward.len(), 3);
    assert_eq!(backward, vec!["c", "b", "a"]);
}
/// Checks the counts returned by `total` over ten keys, with and without
/// its two optional arguments.
#[test]
fn test_range_scan_total_count() {
    let db = Database::new();

    // Writes key00..key09 with values val0..val9.
    let mut seed = db.transaction(true);
    for i in 0..10 {
        seed.set(format!("key{:02}", i), format!("val{}", i)).unwrap();
    }
    seed.commit().unwrap();

    let reader = db.transaction(false);

    // No optional arguments: every key in the range is counted.
    assert_eq!(reader.total("key00".."key99", None, None).unwrap(), 10);
    // NOTE(review): the second argument appears to act as a skip count — confirm against `total`.
    assert_eq!(reader.total("key00".."key99", Some(3), None).unwrap(), 7);
    // NOTE(review): the third argument appears to act as a limit — confirm against `total`.
    assert_eq!(reader.total("key00".."key99", None, Some(5)).unwrap(), 5);
    // Narrower range: key03, key04, key05, key06.
    assert_eq!(reader.total("key03".."key07", None, None).unwrap(), 4);
}
/// Hammers the database with many threads committing at once, then checks
/// that some commits landed and that a final commit still succeeds.
#[test]
fn test_atomic_transaction_id_generation() {
    use std::sync::{Arc, Barrier};
    use std::thread;

    let db = Arc::new(Database::default());
    let num_threads = 100;
    let commits_per_thread = 50;
    // All workers are released together to maximise contention.
    let barrier = Arc::new(Barrier::new(num_threads));

    let workers: Vec<_> = (0..num_threads)
        .map(|_| {
            let db = Arc::clone(&db);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                barrier.wait();
                for i in 0..commits_per_thread {
                    // Every thread writes the same key names; commit results
                    // (success or failure) are deliberately ignored.
                    let mut first = db.transaction(true);
                    first.set(format!("key_{}", i), format!("value_{}", i)).unwrap();
                    let _ = first.commit();

                    let mut second = db.transaction(true);
                    second.set(format!("merge_key_{}", i), format!("merge_value_{}", i)).unwrap();
                    let _ = second.commit();
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }

    // At least one of the "key_*" writes must be visible.
    let reader = db.transaction(false);
    let count = reader
        .scan("key_0".to_string().."key_999".to_string(), None, None)
        .unwrap()
        .into_iter()
        .count();
    assert!(count > 0, "Should have at least some successful commits");

    // The database must still accept a commit afterwards.
    let mut final_tx = db.transaction(true);
    final_tx.set("final_key", "final_value".to_string()).unwrap();
    final_tx.commit().expect("Final commit should succeed, database should be healthy");
}
/// Many threads commit distinct keys with staggered sleeps; afterwards the
/// number of visible entries must be within bounds and a final commit must
/// succeed.
#[test]
fn test_atomic_commit_ordering() {
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::Duration;

    let db = Arc::new(Database::default());
    let num_threads = 50;
    let barrier = Arc::new(Barrier::new(num_threads));

    let workers: Vec<_> = (0..num_threads)
        .map(|thread_id| {
            let db = Arc::clone(&db);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                barrier.wait();
                for i in 0..10 {
                    let mut tx = db.transaction(true);
                    tx.set(
                        format!("thread_{}_key_{}", thread_id, i),
                        format!("thread_{}_value_{}", thread_id, i),
                    )
                    .unwrap();
                    // The commit outcome is intentionally not checked.
                    let _ = tx.commit();
                    // Each thread pauses for a different duration between commits.
                    thread::sleep(Duration::from_micros(thread_id as u64));
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }

    let reader = db.transaction(false);
    let count = reader
        .scan("".to_string().."~".to_string(), None, None)
        .unwrap()
        .into_iter()
        .count();
    assert!(count > 0, "Should have at least some successful commits");
    assert!(count <= (num_threads * 10), "Should not exceed maximum possible commits");

    let mut final_tx = db.transaction(true);
    final_tx.set("ordering_test_complete", "true".to_string()).unwrap();
    final_tx.commit().expect("Final transaction should succeed");
}
/// Walks through single and nested savepoints inside one transaction,
/// checking what is visible after each rollback and after the final commit.
#[test]
fn test_savepoints() {
    let db = Database::new();
    let mut txn = db.transaction(true);

    // --- Single savepoint -------------------------------------------------
    txn.set("initial_key", "initial_value").unwrap();
    txn.set_savepoint().unwrap();
    txn.set("savepoint_key", "savepoint_value").unwrap();
    txn.set("another_key", "another_value").unwrap();
    assert_eq!(txn.get("initial_key").unwrap().as_deref(), Some(&b"initial_value"[..]));
    assert_eq!(txn.get("savepoint_key").unwrap().as_deref(), Some(&b"savepoint_value"[..]));
    assert_eq!(txn.get("another_key").unwrap().as_deref(), Some(&b"another_value"[..]));

    // Rolling back discards only the writes made after the savepoint.
    txn.rollback_to_savepoint().unwrap();
    assert_eq!(txn.get("initial_key").unwrap().as_deref(), Some(&b"initial_value"[..]));
    assert_eq!(txn.get("savepoint_key").unwrap(), None);
    assert_eq!(txn.get("another_key").unwrap(), None);

    // The transaction stays usable after a rollback.
    txn.set("new_key", "new_value").unwrap();
    assert_eq!(txn.get("new_key").unwrap().as_deref(), Some(&b"new_value"[..]));

    // --- Three nested savepoints -------------------------------------------
    txn.set("base", "value").unwrap();
    txn.set_savepoint().unwrap();
    txn.set("level1_key1", "level1_value1").unwrap();
    txn.set("level1_key2", "level1_value2").unwrap();
    txn.set_savepoint().unwrap();
    txn.set("level2_key1", "level2_value1").unwrap();
    txn.set("level1_key1", "modified_at_level2").unwrap();
    txn.set_savepoint().unwrap();
    txn.set("level3_key1", "level3_value1").unwrap();
    assert_eq!(txn.get("base").unwrap().as_deref(), Some(&b"value"[..]));
    assert_eq!(txn.get("level1_key1").unwrap().as_deref(), Some(&b"modified_at_level2"[..]));
    assert_eq!(txn.get("level1_key2").unwrap().as_deref(), Some(&b"level1_value2"[..]));
    assert_eq!(txn.get("level2_key1").unwrap().as_deref(), Some(&b"level2_value1"[..]));
    assert_eq!(txn.get("level3_key1").unwrap().as_deref(), Some(&b"level3_value1"[..]));

    // First rollback: only the level-3 write disappears.
    txn.rollback_to_savepoint().unwrap();
    assert_eq!(txn.get("base").unwrap().as_deref(), Some(&b"value"[..]));
    assert_eq!(txn.get("level1_key1").unwrap().as_deref(), Some(&b"modified_at_level2"[..]));
    assert_eq!(txn.get("level1_key2").unwrap().as_deref(), Some(&b"level1_value2"[..]));
    assert_eq!(txn.get("level2_key1").unwrap().as_deref(), Some(&b"level2_value1"[..]));
    assert_eq!(txn.get("level3_key1").unwrap(), None);

    // Second rollback: level-2 writes (including one made after the first
    // rollback) disappear and the level-1 value is restored.
    txn.set("level2_new", "after_rollback").unwrap();
    txn.rollback_to_savepoint().unwrap();
    assert_eq!(txn.get("base").unwrap().as_deref(), Some(&b"value"[..]));
    assert_eq!(txn.get("level1_key1").unwrap().as_deref(), Some(&b"level1_value1"[..]));
    assert_eq!(txn.get("level1_key2").unwrap().as_deref(), Some(&b"level1_value2"[..]));
    assert_eq!(txn.get("level2_key1").unwrap(), None);
    assert_eq!(txn.get("level2_new").unwrap(), None);
    assert_eq!(txn.get("level3_key1").unwrap(), None);

    // --- Commit and verify from a fresh reader -----------------------------
    txn.set("final", "committed_data").unwrap();
    txn.commit().unwrap();

    let reader = db.transaction(false);
    assert_eq!(reader.get("initial_key").unwrap().as_deref(), Some(&b"initial_value"[..]));
    assert_eq!(reader.get("new_key").unwrap().as_deref(), Some(&b"new_value"[..]));
    assert_eq!(reader.get("savepoint_key").unwrap(), None);
    assert_eq!(reader.get("another_key").unwrap(), None);
    assert_eq!(reader.get("base").unwrap().as_deref(), Some(&b"value"[..]));
    assert_eq!(reader.get("level1_key1").unwrap().as_deref(), Some(&b"level1_value1"[..]));
    assert_eq!(reader.get("level1_key2").unwrap().as_deref(), Some(&b"level1_value2"[..]));
    assert_eq!(reader.get("final").unwrap().as_deref(), Some(&b"committed_data"[..]));
    assert_eq!(reader.get("level2_key1").unwrap(), None);
    assert_eq!(reader.get("level2_new").unwrap(), None);
    assert_eq!(reader.get("level3_key1").unwrap(), None);
}
/// `scan_all_versions` must return every stored version of each key
/// (including a delete, reported as `None`), and its two optional
/// arguments must select whole keys rather than individual versions.
#[test]
fn test_scan_all_versions() {
    let db = Database::new();

    // Three committed rounds: key1 gets v1/v2/v3, key2 gets v1/v2,
    // key3 gets v1 and is then deleted.
    let mut round1 = db.transaction(true);
    for key in ["key1", "key2", "key3"] {
        round1.set(key, "v1").unwrap();
    }
    round1.commit().unwrap();
    let mut round2 = db.transaction(true);
    for key in ["key1", "key2"] {
        round2.set(key, "v2").unwrap();
    }
    round2.commit().unwrap();
    let mut round3 = db.transaction(true);
    round3.set("key1", "v3").unwrap();
    round3.del("key3").unwrap();
    round3.commit().unwrap();

    // --- No optional arguments: full history, oldest version first ---------
    let unbounded = db.transaction(false);
    let results = unbounded.scan_all_versions("key0".."key9", None, None).unwrap();
    let key1_versions: Vec<_> =
        results.iter().filter(|(k, _, _)| k.as_ref() == b"key1").collect();
    let key2_versions: Vec<_> =
        results.iter().filter(|(k, _, _)| k.as_ref() == b"key2").collect();
    let key3_versions: Vec<_> =
        results.iter().filter(|(k, _, _)| k.as_ref() == b"key3").collect();
    assert_eq!(key1_versions.len(), 3, "key1 should have 3 versions");
    assert_eq!(key2_versions.len(), 2, "key2 should have 2 versions");
    assert_eq!(key3_versions.len(), 2, "key3 should have 2 versions (including delete)");
    assert_eq!(key1_versions[0].2.as_deref(), Some(&b"v1"[..]));
    assert_eq!(key1_versions[1].2.as_deref(), Some(&b"v2"[..]));
    assert_eq!(key1_versions[2].2.as_deref(), Some(&b"v3"[..]));
    assert_eq!(key2_versions[0].2.as_deref(), Some(&b"v1"[..]));
    assert_eq!(key2_versions[1].2.as_deref(), Some(&b"v2"[..]));
    assert_eq!(key3_versions[0].2.as_deref(), Some(&b"v1"[..]));
    assert_eq!(key3_versions[1].2, None);

    // --- Second argument Some(1): the first key is dropped entirely --------
    let skipping = db.transaction(false);
    let results = skipping.scan_all_versions("key0".."key9", Some(1), None).unwrap();
    let contains = |key: &[u8]| results.iter().any(|(k, _, _)| k.as_ref() == key);
    assert!(!contains(b"key1"), "key1 should be skipped");
    assert!(contains(b"key2"), "key2 should be included");
    assert!(contains(b"key3"), "key3 should be included");
    let key2_versions: Vec<_> =
        results.iter().filter(|(k, _, _)| k.as_ref() == b"key2").collect();
    assert_eq!(key2_versions.len(), 2, "key2 should still have all 2 versions");

    // --- Third argument Some(2): only the first two keys are returned ------
    let limited = db.transaction(false);
    let results = limited.scan_all_versions("key0".."key9", None, Some(2)).unwrap();
    let contains = |key: &[u8]| results.iter().any(|(k, _, _)| k.as_ref() == key);
    assert!(contains(b"key1"), "key1 should be included");
    assert!(contains(b"key2"), "key2 should be included");
    assert!(!contains(b"key3"), "key3 should not be included (limit)");
    let key1_versions: Vec<_> =
        results.iter().filter(|(k, _, _)| k.as_ref() == b"key1").collect();
    assert_eq!(key1_versions.len(), 3, "key1 should still have all 3 versions");

    // --- Both Some(1): exactly the middle key ------------------------------
    let windowed = db.transaction(false);
    let results = windowed.scan_all_versions("key0".."key9", Some(1), Some(1)).unwrap();
    let contains = |key: &[u8]| results.iter().any(|(k, _, _)| k.as_ref() == key);
    assert!(!contains(b"key1"), "key1 should be skipped");
    assert!(contains(b"key2"), "key2 should be included");
    assert!(!contains(b"key3"), "key3 should not be included (limit)");
    let key2_versions: Vec<_> =
        results.iter().filter(|(k, _, _)| k.as_ref() == b"key2").collect();
    assert_eq!(key2_versions.len(), 2, "key2 should have all 2 versions");
}
/// `scan_all_versions` run inside a write transaction must include that
/// transaction's pending writes, and those writes must vanish once the
/// transaction is cancelled.
#[test]
fn test_scan_all_versions_with_writeset() {
    let db = Database::new();

    let mut round1 = db.transaction(true);
    round1.set("key1", "v1").unwrap();
    round1.set("key2", "v1").unwrap();
    round1.commit().unwrap();
    let mut round2 = db.transaction(true);
    round2.set("key1", "v2").unwrap();
    round2.commit().unwrap();

    // Pending (uncommitted) writes: a new version of key1 and a brand-new key3.
    let mut pending = db.transaction(true);
    pending.set("key1", "v3").unwrap();
    pending.set("key3", "new").unwrap();

    let results = pending.scan_all_versions("key0".."key9", None, None).unwrap();
    let key1_versions: Vec<_> =
        results.iter().filter(|(k, _, _)| k.as_ref() == b"key1").collect();
    assert_eq!(key1_versions.len(), 3, "key1 should have 3 versions including writeset");
    assert_eq!(
        key1_versions[2].2.as_deref(),
        Some(&b"v3"[..]),
        "Latest version should be from writeset"
    );
    let key2_versions: Vec<_> =
        results.iter().filter(|(k, _, _)| k.as_ref() == b"key2").collect();
    assert_eq!(key2_versions.len(), 1, "key2 should have 1 version");
    assert_eq!(key2_versions[0].2.as_deref(), Some(&b"v1"[..]));
    let key3_versions: Vec<_> =
        results.iter().filter(|(k, _, _)| k.as_ref() == b"key3").collect();
    assert_eq!(key3_versions.len(), 1, "key3 should have 1 version from writeset");
    assert_eq!(key3_versions[0].2.as_deref(), Some(&b"new"[..]));

    // After cancelling, a fresh reader sees none of the pending writes.
    pending.cancel().unwrap();
    let reader = db.transaction(false);
    let results = reader.scan_all_versions("key0".."key9", None, None).unwrap();
    let key1_versions: Vec<_> =
        results.iter().filter(|(k, _, _)| k.as_ref() == b"key1").collect();
    assert_eq!(key1_versions.len(), 2, "key1 should only have 2 versions (tx3 was cancelled)");
    let key3_versions: Vec<_> =
        results.iter().filter(|(k, _, _)| k.as_ref() == b"key3").collect();
    assert_eq!(key3_versions.len(), 0, "key3 should not exist (tx3 was cancelled)");
}
/// Covers the error paths of the savepoint API: no savepoint set,
/// read-only transaction, closed transaction, and an exhausted stack.
#[test]
fn test_savepoint_errors() {
    let db = Database::new();

    // Rolling back without any savepoint.
    let mut fresh = db.transaction(true);
    assert!(matches!(fresh.rollback_to_savepoint(), Err(Error::NoSavepoint)));

    // Read-only transactions reject both operations.
    let mut read_only = db.transaction(false);
    assert!(matches!(read_only.set_savepoint(), Err(Error::TxNotWritable)));
    assert!(matches!(read_only.rollback_to_savepoint(), Err(Error::TxNotWritable)));

    // Cancelled transactions reject both operations.
    let mut closed = db.transaction(true);
    closed.cancel().unwrap();
    assert!(matches!(closed.set_savepoint(), Err(Error::TxClosed)));
    assert!(matches!(closed.rollback_to_savepoint(), Err(Error::TxClosed)));

    // Three savepoints allow exactly three rollbacks; the fourth fails.
    let db_stack = Database::new();
    let mut stacked = db_stack.transaction(true);
    for i in 0..3 {
        stacked.set(format!("key{}", i), "value").unwrap();
        stacked.set_savepoint().unwrap();
    }
    for _ in 0..3 {
        stacked.rollback_to_savepoint().unwrap();
    }
    assert!(matches!(stacked.rollback_to_savepoint(), Err(Error::NoSavepoint)));
}
/// Transactions obtained after earlier ones used savepoints must start with
/// an empty savepoint stack, and (after a cancel) an empty readset and
/// scanset as well.
#[test]
fn test_savepoint_stack_cleared_on_transaction_reuse() {
    let db = Database::new();

    // First transaction: two savepoints, then commit.
    let mut committed = db.transaction(true);
    committed.set("key1", "value1").unwrap();
    committed.set_savepoint().unwrap();
    committed.set("key2", "value2").unwrap();
    committed.set_savepoint().unwrap();
    committed.set("key3", "value3").unwrap();
    assert!(!committed.inner.as_ref().unwrap().savepoint_stack.is_empty());
    committed.commit().unwrap();

    // Next transaction must not see any leftover savepoints.
    let mut after_commit = db.transaction(true);
    assert!(
        after_commit.inner.as_ref().unwrap().savepoint_stack.is_empty(),
        "Savepoint stack should be cleared when transaction is reused from pool"
    );
    let result = after_commit.rollback_to_savepoint();
    assert!(
        matches!(result, Err(Error::NoSavepoint)),
        "Should get NoSavepoint error on fresh transaction, got: {:?}",
        result
    );
    after_commit.cancel().unwrap();

    // Third transaction: three savepoints, then cancel.
    let mut cancelled = db.transaction(true);
    cancelled.set("key4", "value4").unwrap();
    for _ in 0..3 {
        cancelled.set_savepoint().unwrap();
    }
    assert_eq!(cancelled.inner.as_ref().unwrap().savepoint_stack.len(), 3);
    cancelled.cancel().unwrap();

    // Fourth transaction: all tracked state must be empty.
    let mut after_cancel = db.transaction(true);
    assert!(
        after_cancel.inner.as_ref().unwrap().savepoint_stack.is_empty(),
        "Savepoint stack should be cleared after cancel"
    );
    assert!(
        after_cancel.inner.as_ref().unwrap().readset.pin().is_empty(),
        "Readset should be cleared after cancel"
    );
    assert!(
        after_cancel.inner.as_ref().unwrap().scanset.is_empty(),
        "Scanset should be cleared after cancel"
    );
    after_cancel.cancel().unwrap();
}
/// A scan performed before a savepoint must still cause a read conflict at
/// commit time when another transaction modifies a scanned key.
#[test]
fn test_savepoints_with_scans_and_writes() {
    let db = Database::new();

    let mut setup = db.transaction(true);
    setup.set("person:test", "initial").unwrap();
    setup.set("person:other", "other_value").unwrap();
    setup.commit().unwrap();

    // Scan the whole "person:" prefix, then take a savepoint.
    let mut scanner = db.transaction(true);
    let scanned = scanner.scan("person:".."person;", None, None).unwrap();
    assert_eq!(scanned.len(), 2);
    scanner.set_savepoint().unwrap();

    // A different transaction commits a change to one of the scanned keys.
    let mut intruder = db.transaction(true);
    intruder.set("person:other", "modified").unwrap();
    intruder.commit().unwrap();

    // The scanning transaction writes and tries to commit: must conflict.
    scanner.set("person:test", "updated").unwrap();
    let result = scanner.commit();
    assert!(
        matches!(result, Err(Error::KeyReadConflict)),
        "Should detect scan conflict even with savepoint, got: {:?}",
        result
    );
}
/// Rolling back to a savepoint must restore the scanset to its size at the
/// savepoint, while scans recorded before it still trigger conflicts.
#[test]
fn test_savepoint_rollback_preserves_earlier_scans() {
    let db = Database::new();

    let mut setup = db.transaction(true);
    setup.set("key1", "value1").unwrap();
    setup.set("key2", "value2").unwrap();
    setup.commit().unwrap();

    // Scan recorded before the savepoint.
    let mut txn = db.transaction(true);
    let _ = txn.scan("key0".."key5", None, None).unwrap();
    assert!(!txn.inner.as_ref().unwrap().scanset.is_empty());
    let scanset_len_at_savepoint = txn.inner.as_ref().unwrap().scanset.len();
    txn.set_savepoint().unwrap();

    // Scan recorded after the savepoint, then rolled back.
    let _ = txn.scan("key5".."key9", None, None).unwrap();
    assert!(txn.inner.as_ref().unwrap().scanset.len() >= scanset_len_at_savepoint);
    txn.rollback_to_savepoint().unwrap();
    assert_eq!(
        txn.inner.as_ref().unwrap().scanset.len(),
        scanset_len_at_savepoint,
        "Scanset should be restored to state before savepoint"
    );

    // A concurrent commit inside the first scanned range must still conflict.
    txn.set("key1", "updated").unwrap();
    let mut concurrent = db.transaction(true);
    concurrent.set("key2", "concurrent").unwrap();
    concurrent.commit().unwrap();
    let result = txn.commit();
    assert!(
        matches!(result, Err(Error::KeyReadConflict)),
        "Should detect conflict from scan before savepoint, got: {:?}",
        result
    );
}
/// Writes 10,000 keys in individual transactions and then checks that every
/// one of them is still readable with its original value.
#[test]
fn test_gc_does_not_remove_active_versions() {
    let db = Database::new();

    // One committed transaction per key.
    for i in 0..10_000 {
        let mut writer = db.transaction(true);
        writer
            .put(format!("key_{:08}", i).into_bytes(), format!("value_{:08}", i).into_bytes())
            .unwrap();
        writer.commit().unwrap();
    }

    // One read-only transaction per key.
    for i in 0..10_000 {
        let expected_value = format!("value_{:08}", i).into_bytes();
        let mut reader = db.transaction(false);
        let result = reader.get(format!("key_{:08}", i).into_bytes());
        assert!(result.is_ok(), "Failed to get key at index {i}: {result:?}");
        let value = result.unwrap();
        assert!(
            value.is_some(),
            "Key at index {} was not found (version was garbage collected too early)",
            i
        );
        assert_eq!(value.unwrap(), expected_value, "Value mismatch at index {i}");
        reader.cancel().unwrap();
    }
}
/// Four threads read a pre-populated key set while occasionally committing
/// unrelated writes; every pre-populated key must stay readable throughout.
#[test]
fn test_gc_concurrent_readers() {
    use std::sync::Arc;
    use std::thread;

    let db = Database::new();
    {
        // Populate 1000 keys in a single transaction.
        let mut seed = db.transaction(true);
        for i in 0..1000 {
            seed.put(format!("key_{:08}", i).into_bytes(), format!("value_{:08}", i).into_bytes())
                .unwrap();
        }
        seed.commit().unwrap();
    }
    let db = Arc::new(db);

    let workers: Vec<_> = (0..4)
        .map(|thread_id| {
            let db = Arc::clone(&db);
            thread::spawn(move || {
                for i in 0..1000 {
                    let mut reader = db.transaction(false);
                    let result = reader.get(format!("key_{:08}", i).into_bytes());
                    assert!(
                        result.is_ok(),
                        "Thread {thread_id} failed to get key at index {i}: {result:?}",
                    );
                    let value = result.unwrap();
                    assert!(value.is_some(), "Thread {thread_id} could not find key at index {i} (version was garbage collected)");
                    reader.cancel().unwrap();

                    // Every hundredth read is followed by a committed write
                    // to a thread-specific key.
                    if i % 100 == 0 {
                        let mut writer = db.transaction(true);
                        writer
                            .put(
                                format!("thread_{thread_id}_key_{i:08}").into_bytes(),
                                format!("updated_by_thread_{thread_id}").into_bytes(),
                            )
                            .unwrap();
                        writer.commit().unwrap();
                    }
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }
}
/// Races one writer thread against one reader thread on a fresh key, 100
/// times. The reader may or may not observe the value, but it must never
/// observe a wrong one, and the value must be visible once the writer has
/// been joined.
#[test]
fn test_concurrent_write_read_merge_queue_race() {
    let db = Arc::new(Database::new());
    for iteration in 0..100 {
        let key = format!("race_test_key_{}", iteration).into_bytes();
        let value = format!("race_test_value_{}", iteration).into_bytes();

        // Prepare everything both threads need before spawning either.
        let writer_ctx = (Arc::clone(&db), key.clone(), value.clone());
        let reader_ctx = (Arc::clone(&db), key.clone(), value.clone());

        let writer = thread::spawn(move || {
            let (db, key, value) = writer_ctx;
            let mut tx = db.transaction(true);
            tx.set(key, value).unwrap();
            tx.commit().unwrap();
        });
        let reader = thread::spawn(move || {
            let (db, key, expected) = reader_ctx;
            thread::yield_now();
            // Up to ten attempts; returns whether the value was ever seen.
            for _ in 0..10 {
                let mut tx = db.transaction(false);
                let result = tx.get(key.clone()).unwrap();
                tx.cancel().unwrap();
                if let Some(v) = result {
                    assert_eq!(v, expected, "Read wrong value during race");
                    return true;
                }
                thread::yield_now();
            }
            false
        });

        writer.join().unwrap();
        // Whether the reader saw the value is not asserted on.
        let _saw_value = reader.join().unwrap();

        // After the writer finished, the value must be visible.
        let mut verify = db.transaction(false);
        let result = verify.get(key).unwrap();
        verify.cancel().unwrap();
        assert!(
            result.is_some(),
            "Key must be visible after writer completes (iteration {})",
            iteration
        );
        assert_eq!(
            result.unwrap(),
            value,
            "Value mismatch after writer completes (iteration {})",
            iteration
        );
    }
}
/// Runs one writer and one reader thread per available core. Readers must
/// never observe a wrong value while writers are committing, and every
/// written key must be visible once all threads have been joined.
#[test]
fn test_high_concurrency_merge_queue_visibility() {
    let db = Arc::new(Database::new());
    let num_threads = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(8);
    let operations_per_thread = 50;

    // Writers: each commits its own set of keys, one transaction per key.
    let mut handles: Vec<_> = (0..num_threads)
        .map(|thread_id| {
            let db = Arc::clone(&db);
            thread::spawn(move || {
                for op_id in 0..operations_per_thread {
                    let mut tx = db.transaction(true);
                    tx.set(
                        format!("thread_{}_key_{}", thread_id, op_id).into_bytes(),
                        format!("thread_{}_value_{}", thread_id, op_id).into_bytes(),
                    )
                    .unwrap();
                    tx.commit().unwrap();
                }
            })
        })
        .collect();

    // Readers: each probes every writer's keys five times; a key may be
    // absent, but if present it must carry the expected value.
    handles.extend((0..num_threads).map(|reader_id| {
        let db = Arc::clone(&db);
        thread::spawn(move || {
            for writer_id in 0..num_threads {
                for op_id in 0..operations_per_thread {
                    let key = format!("thread_{}_key_{}", writer_id, op_id).into_bytes();
                    let expected_value =
                        format!("thread_{}_value_{}", writer_id, op_id).into_bytes();
                    for _ in 0..5 {
                        let mut tx = db.transaction(false);
                        if let Some(value) = tx.get(key.clone()).unwrap() {
                            assert_eq!(
                                value, expected_value,
                                "Reader {} saw wrong value for writer {} op {}",
                                reader_id, writer_id, op_id
                            );
                        }
                        tx.cancel().unwrap();
                        thread::yield_now();
                    }
                }
            }
        })
    }));

    for handle in handles {
        handle.join().unwrap();
    }

    // Final pass: everything written must now be readable.
    for thread_id in 0..num_threads {
        for op_id in 0..operations_per_thread {
            let expected_value = format!("thread_{}_value_{}", thread_id, op_id).into_bytes();
            let mut tx = db.transaction(false);
            let result = tx.get(format!("thread_{}_key_{}", thread_id, op_id).into_bytes()).unwrap();
            tx.cancel().unwrap();
            assert!(
                result.is_some(),
                "Key from thread {} op {} not found after all threads complete",
                thread_id,
                op_id
            );
            assert_eq!(
                result.unwrap(),
                expected_value,
                "Wrong value for thread {} op {} after all threads complete",
                thread_id,
                op_id
            );
        }
    }
}
/// The conditional operations `put`, `putc` and `delc` must evaluate their
/// condition against the transaction's own pending writes (and against
/// committed data where nothing is pending).
#[test]
fn test_writeset_checked_before_datastore() {
    let db = Database::new();

    // put() on a key already set in the same transaction.
    {
        let mut txn = db.transaction(true);
        txn.set("key1", "value1").unwrap();
        let outcome = txn.put("key1", "value2");
        assert!(
            matches!(outcome, Err(Error::KeyAlreadyExists)),
            "put() should check writeset and fail when key exists there"
        );
        txn.cancel().unwrap();
    }

    // put() on a key committed by an earlier transaction.
    {
        let mut first = db.transaction(true);
        first.put("key2", "initial").unwrap();
        first.commit().unwrap();
        let mut second = db.transaction(true);
        let outcome = second.put("key2", "should_fail");
        assert!(
            matches!(outcome, Err(Error::KeyAlreadyExists)),
            "put() should fail when key exists in datastore"
        );
    }

    // putc() with the matching pending value succeeds.
    {
        let mut txn = db.transaction(true);
        txn.set("key3", "value3").unwrap();
        txn.putc("key3", "new_value", Some("value3")).unwrap();
        assert_eq!(txn.get("key3").unwrap().as_deref(), Some(&b"new_value"[..]));
        txn.cancel().unwrap();
    }

    // putc() with a non-matching expected value fails.
    {
        let mut txn = db.transaction(true);
        txn.set("key4", "value4").unwrap();
        let outcome = txn.putc("key4", "new_value", Some("wrong_value"));
        assert!(
            matches!(outcome, Err(Error::ValNotExpectedValue)),
            "putc() should check writeset and fail when value doesn't match"
        );
        txn.cancel().unwrap();
    }

    // putc() expecting "absent" succeeds after a pending delete.
    {
        let mut txn = db.transaction(true);
        txn.del("key5").unwrap();
        txn.putc("key5", "resurrected", None::<&str>).unwrap();
        assert_eq!(txn.get("key5").unwrap().as_deref(), Some(&b"resurrected"[..]));
        txn.cancel().unwrap();
    }

    // delc() with the matching pending value succeeds.
    {
        let mut txn = db.transaction(true);
        txn.set("key6", "value6").unwrap();
        txn.delc("key6", Some("value6")).unwrap();
        assert_eq!(txn.get("key6").unwrap(), None);
        txn.cancel().unwrap();
    }

    // delc() with a non-matching expected value fails.
    {
        let mut txn = db.transaction(true);
        txn.set("key7", "value7").unwrap();
        let outcome = txn.delc("key7", Some("wrong_value"));
        assert!(
            matches!(outcome, Err(Error::ValNotExpectedValue)),
            "delc() should check writeset and fail when value doesn't match"
        );
        txn.cancel().unwrap();
    }

    // delc() expecting "absent" succeeds after a pending delete.
    {
        let mut txn = db.transaction(true);
        txn.del("key8").unwrap();
        txn.delc("key8", None::<&str>).unwrap();
        assert_eq!(txn.get("key8").unwrap(), None);
        txn.cancel().unwrap();
    }

    // A chain of operations on one key, each checked against the latest
    // pending state.
    {
        let mut txn = db.transaction(true);
        txn.set("complex", "v1").unwrap();
        txn.set("complex", "v2").unwrap();
        assert!(matches!(txn.put("complex", "v3"), Err(Error::KeyAlreadyExists)));
        txn.putc("complex", "v3", Some("v2")).unwrap();
        assert_eq!(txn.get("complex").unwrap().as_deref(), Some(&b"v3"[..]));
        txn.delc("complex", Some("v3")).unwrap();
        assert_eq!(txn.get("complex").unwrap(), None);
        txn.putc("complex", "v4", None::<&str>).unwrap();
        assert_eq!(txn.get("complex").unwrap().as_deref(), Some(&b"v4"[..]));
        txn.cancel().unwrap();
    }
}
/// `getm` must return one value per requested key for keys that all exist.
#[test]
fn test_getm_basic() {
    let db = Database::new();

    let mut writer = db.transaction(true);
    for (key, val) in [("key1", "value1"), ("key2", "value2"), ("key3", "value3")] {
        writer.set(key, val).unwrap();
    }
    writer.commit().unwrap();

    let reader = db.transaction(false);
    let values = reader.getm(vec!["key1", "key2", "key3"]).unwrap();
    assert_eq!(values.len(), 3);
    assert_eq!(values[0].as_deref(), Some(&b"value1"[..]));
    assert_eq!(values[1].as_deref(), Some(&b"value2"[..]));
    assert_eq!(values[2].as_deref(), Some(&b"value3"[..]));
}
/// `getm` must return `None` in the positions of keys that do not exist.
#[test]
fn test_getm_with_missing_keys() {
    let db = Database::new();

    let mut writer = db.transaction(true);
    writer.set("key1", "value1").unwrap();
    writer.set("key3", "value3").unwrap();
    writer.commit().unwrap();

    let reader = db.transaction(false);
    // key2 and key4 were never written.
    let values = reader.getm(vec!["key1", "key2", "key3", "key4"]).unwrap();
    assert_eq!(values.len(), 4);
    assert_eq!(values[0].as_deref(), Some(&b"value1"[..]));
    assert_eq!(values[1], None);
    assert_eq!(values[2].as_deref(), Some(&b"value3"[..]));
    assert_eq!(values[3], None);
}
/// `getm` with no keys must succeed and return an empty result.
#[test]
fn test_getm_empty_vector() {
    let db = Database::new();
    let reader = db.transaction(false);
    let values = reader.getm(Vec::<&str>::new()).unwrap();
    assert_eq!(values.len(), 0);
}
/// `getm` inside a write transaction must reflect that transaction's
/// pending overwrite, insert and delete.
#[test]
fn test_getm_with_writeset() {
    let db = Database::new();

    let mut seed = db.transaction(true);
    seed.set("key1", "value1").unwrap();
    seed.set("key2", "value2").unwrap();
    seed.commit().unwrap();

    // Pending changes: overwrite key2, insert key3, delete key1.
    let mut pending = db.transaction(true);
    pending.set("key2", "updated2").unwrap();
    pending.set("key3", "value3").unwrap();
    pending.del("key1").unwrap();

    let values = pending.getm(vec!["key1", "key2", "key3", "key4"]).unwrap();
    assert_eq!(values.len(), 4);
    assert_eq!(values[0], None);
    assert_eq!(values[1].as_deref(), Some(&b"updated2"[..]));
    assert_eq!(values[2].as_deref(), Some(&b"value3"[..]));
    assert_eq!(values[3], None);
}
/// `getm` results must follow the order of the requested keys, not the
/// sorted key order.
#[test]
fn test_getm_maintains_order() {
    let db = Database::new();

    let mut writer = db.transaction(true);
    for (key, val) in [("a", "1"), ("b", "2"), ("c", "3")] {
        writer.set(key, val).unwrap();
    }
    writer.commit().unwrap();

    let reader = db.transaction(false);
    let values = reader.getm(vec!["c", "a", "b"]).unwrap();
    assert_eq!(values[0].as_deref(), Some(&b"3"[..]));
    assert_eq!(values[1].as_deref(), Some(&b"1"[..]));
    assert_eq!(values[2].as_deref(), Some(&b"2"[..]));
}
/// Requesting the same key several times via `getm` must yield the value
/// once per occurrence.
#[test]
fn test_getm_duplicate_keys() {
    let db = Database::new();

    let mut writer = db.transaction(true);
    writer.set("key1", "value1").unwrap();
    writer.commit().unwrap();

    let reader = db.transaction(false);
    let values = reader.getm(vec!["key1", "key1", "key1"]).unwrap();
    assert_eq!(values.len(), 3);
    assert_eq!(values[0].as_deref(), Some(&b"value1"[..]));
    assert_eq!(values[1].as_deref(), Some(&b"value1"[..]));
    assert_eq!(values[2].as_deref(), Some(&b"value1"[..]));
}
/// `getm` on a cancelled transaction must fail with `Error::TxClosed`.
#[test]
fn test_getm_closed_transaction() {
    let db = Database::new();
    let mut closed = db.transaction(false);
    closed.cancel().unwrap();
    let outcome = closed.getm(vec!["key1"]);
    assert!(matches!(outcome, Err(Error::TxClosed)));
}
/// Two overlapping serializable-snapshot-isolation transactions: the first
/// reads two keys via `getm`, writes one and commits; the second reads the
/// key the first wrote, writes the other, and must fail to commit.
#[test]
fn test_getm_ssi_read_tracking() {
    let db = Database::new();
    // Both transactions are opened before either does any work.
    let mut first = db.transaction(true).with_serializable_snapshot_isolation();
    let mut second = db.transaction(true).with_serializable_snapshot_isolation();

    let values = first.getm(vec!["key1", "key2"]).unwrap();
    assert_eq!(values[0], None);
    assert_eq!(values[1], None);
    first.set("key1", "value1").unwrap();
    assert!(first.commit().is_ok());

    // The second transaction does not observe the first one's commit...
    assert!(second.get("key1").unwrap().is_none());
    second.set("key2", "value2").unwrap();
    // ...and its own commit is rejected.
    assert!(second.commit().is_err());
}
/// `getm_at_version` must return the values as of the requested version,
/// ignoring later overwrites and later-created keys.
#[test]
fn test_getm_at_version_basic() {
    let db = Database::new();

    let mut first = db.transaction(true);
    first.set("key1", "v1_value1").unwrap();
    first.set("key2", "v1_value2").unwrap();
    first.commit().unwrap();
    // Timestamp captured between the two commits.
    let version1 = db.oracle.current_timestamp();
    thread::sleep(std::time::Duration::from_millis(1));

    let mut second = db.transaction(true);
    second.set("key1", "v2_value1").unwrap();
    second.set("key2", "v2_value2").unwrap();
    second.set("key3", "v2_value3").unwrap();
    second.commit().unwrap();

    let reader = db.transaction(false);
    let values = reader.getm_at_version(vec!["key1", "key2", "key3"], version1).unwrap();
    assert_eq!(values.len(), 3);
    assert_eq!(values[0].as_deref(), Some(&b"v1_value1"[..]));
    assert_eq!(values[1].as_deref(), Some(&b"v1_value2"[..]));
    // key3 was only written by the second commit.
    assert_eq!(values[2], None);
}
/// A key deleted after a given version must still be returned by
/// `getm_at_version` for that version, while plain `getm` reports it gone.
#[test]
fn test_getm_at_version_with_deletes() {
    let db = Database::new();

    let mut writer = db.transaction(true);
    writer.set("key1", "value1").unwrap();
    writer.set("key2", "value2").unwrap();
    writer.commit().unwrap();
    // Timestamp captured before the delete is committed.
    let version1 = db.oracle.current_timestamp();
    thread::sleep(std::time::Duration::from_millis(1));

    let mut deleter = db.transaction(true);
    deleter.del("key1").unwrap();
    deleter.commit().unwrap();

    let reader = db.transaction(false);
    let keys = vec!["key1", "key2"];

    let historical = reader.getm_at_version(keys.clone(), version1).unwrap();
    assert_eq!(historical[0].as_deref(), Some(&b"value1"[..]));
    assert_eq!(historical[1].as_deref(), Some(&b"value2"[..]));

    let current = reader.getm(keys).unwrap();
    assert_eq!(current[0], None);
    assert_eq!(current[1].as_deref(), Some(&b"value2"[..]));
}
/// Asking `getm_at_version` for a version well past the transaction's own
/// must fail with `Error::VersionInFuture`.
#[test]
fn test_getm_at_version_future_version() {
    let db = Database::new();
    let reader = db.transaction(false);
    let future_version = reader.version() + 1000;
    let outcome = reader.getm_at_version(vec!["key1"], future_version);
    assert!(matches!(outcome, Err(Error::VersionInFuture)));
}
/// `getm_at_version` on a cancelled transaction must fail with
/// `Error::TxClosed`, even for a version that transaction itself reported.
#[test]
fn test_getm_at_version_closed_transaction() {
    let db = Database::new();
    let mut closed = db.transaction(false);
    // Read the version while the transaction is still open.
    let version = closed.version();
    closed.cancel().unwrap();
    let outcome = closed.getm_at_version(vec!["key1"], version);
    assert!(matches!(outcome, Err(Error::TxClosed)));
}
/// With three successive values committed for one key, `getm_at_version`
/// must return the value matching each captured timestamp and `getm` must
/// return the latest.
#[test]
fn test_getm_at_version_multiple_versions() {
    let db = Database::new();

    let mut first = db.transaction(true);
    first.set("key", "v1").unwrap();
    first.commit().unwrap();
    let version1 = db.oracle.current_timestamp();
    thread::sleep(std::time::Duration::from_millis(1));

    let mut second = db.transaction(true);
    second.set("key", "v2").unwrap();
    second.commit().unwrap();
    let version2 = db.oracle.current_timestamp();
    thread::sleep(std::time::Duration::from_millis(1));

    let mut third = db.transaction(true);
    third.set("key", "v3").unwrap();
    third.commit().unwrap();

    let reader = db.transaction(false);
    let keys = vec!["key"];

    let at_v1 = reader.getm_at_version(keys.clone(), version1).unwrap();
    assert_eq!(at_v1[0].as_deref(), Some(&b"v1"[..]));

    let at_v2 = reader.getm_at_version(keys.clone(), version2).unwrap();
    assert_eq!(at_v2[0].as_deref(), Some(&b"v2"[..]));

    let latest = reader.getm(keys).unwrap();
    assert_eq!(latest[0].as_deref(), Some(&b"v3"[..]));
}
/// Seeds 100 key/value pairs in one committed transaction, then has ten
/// threads each open a read transaction and fetch all keys with `getm`,
/// asserting that every slot holds the value written for its index.
#[test]
fn test_getm_concurrent_reads() {
    const KEY_COUNT: usize = 100;
    const READER_THREADS: usize = 10;

    let db = Arc::new(Database::new());

    // Populate the store with key{i} -> value{i}.
    let mut seeder = db.transaction(true);
    for i in 0..KEY_COUNT {
        seeder.set(format!("key{}", i), format!("value{}", i)).unwrap();
    }
    seeder.commit().unwrap();

    // Spawn every reader before joining any of them.
    let workers: Vec<_> = (0..READER_THREADS)
        .map(|thread_id| {
            let shared = Arc::clone(&db);
            thread::spawn(move || {
                let reader = shared.transaction(false);
                let wanted: Vec<String> = (0..KEY_COUNT).map(|i| format!("key{}", i)).collect();
                let fetched = reader.getm(wanted).unwrap();
                for (i, slot) in fetched.iter().enumerate() {
                    let expected = format!("value{}", i);
                    assert_eq!(
                        slot.as_deref(),
                        Some(expected.as_bytes()),
                        "Thread {} failed at index {}",
                        thread_id,
                        i
                    );
                }
            })
        })
        .collect();

    // A panic inside any reader surfaces here as a failed join.
    for worker in workers {
        worker.join().unwrap();
    }
}
/// Stress test: one writer thread keeps overwriting `"key"` while six reader
/// threads repeatedly open a read transaction and fetch it, for roughly
/// 500 ms. The key is seeded before any thread starts and is never deleted
/// by this test, so no read may return `None`.
#[test]
fn snapshot_isolation_reader_registration_race_does_not_lose_versions() {
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    let db = Arc::new(Database::new());

    // Seed the key; the seeding transaction is dropped at the end of this
    // scope, before any worker thread is started.
    {
        let mut seed = db.transaction(true);
        seed.set("key", "v0").unwrap();
        seed.commit().unwrap();
    }

    let stop = Arc::new(AtomicBool::new(false));
    let none_reads = Arc::new(AtomicUsize::new(0));
    let total_reads = Arc::new(AtomicUsize::new(0));

    // Writer: commit a fresh value for the key in a tight loop until told to
    // stop.
    let writer_db = Arc::clone(&db);
    let writer_stop = Arc::clone(&stop);
    let writer = thread::spawn(move || {
        let mut counter: u64 = 0;
        while !writer_stop.load(Ordering::Relaxed) {
            let mut tx = writer_db.transaction(true);
            tx.set("key", format!("v{counter}")).unwrap();
            tx.commit().unwrap();
            counter = counter.wrapping_add(1);
        }
    });

    // Readers: count every read and, separately, every read that came back
    // empty.
    let readers: Vec<_> = (0..6)
        .map(|_| {
            let reader_db = Arc::clone(&db);
            let reader_stop = Arc::clone(&stop);
            let misses = Arc::clone(&none_reads);
            let attempts = Arc::clone(&total_reads);
            thread::spawn(move || {
                while !reader_stop.load(Ordering::Relaxed) {
                    let tx = reader_db.transaction(false);
                    let observed = tx.get("key").unwrap();
                    attempts.fetch_add(1, Ordering::Relaxed);
                    if observed.is_none() {
                        misses.fetch_add(1, Ordering::Relaxed);
                    }
                    drop(tx);
                }
            })
        })
        .collect();

    // Let the workers race until the deadline, polling every 10 ms.
    let deadline = Instant::now() + Duration::from_millis(500);
    while Instant::now() < deadline {
        thread::sleep(Duration::from_millis(10));
    }

    stop.store(true, Ordering::Relaxed);
    writer.join().unwrap();
    for reader in readers {
        reader.join().unwrap();
    }

    let nones = none_reads.load(Ordering::Relaxed);
    let total = total_reads.load(Ordering::Relaxed);
    assert_eq!(
        nones, 0,
        "reader observed `None` for a key that was seeded with a value \
         and never deleted ({nones} of {total} reads returned `None`). \
         This is a snapshot-isolation registration race: \
         `TransactionInner::new` (and `::reset`) load \
         `oracle.current_timestamp()` before registering the \
         transaction's start version in `counter_by_oracle`, so a \
         concurrent committer's inline GC can advance `cleanup_ts` \
         past the reader's snapshot. The reader then sees no visible \
         version at its snapshot timestamp."
    );
}
}