use std::sync::Arc;
use crate::error::Result;
use crate::oracle::Oracle;
use crate::store::{MemoryStore, VersionStore, WriteEntry};
use crate::timestamp::Timestamp;
use crate::txn::{Snapshot, Transaction};
/// State shared by every [`Db`] handle: the version store together with the
/// timestamp [`Oracle`].
pub(crate) struct Inner<S>
where
    S: VersionStore,
{
    /// Backing store that commits are submitted to.
    pub(crate) store: S,
    /// Source of read timestamps and commit timestamps.
    oracle: Oracle,
}
impl<S> Inner<S>
where
    S: VersionStore,
{
    /// Pairs `store` with a freshly constructed [`Oracle`].
    fn new(store: S) -> Self {
        let oracle = Oracle::new();
        Self { store, oracle }
    }

    /// Returns the read timestamp currently reported by the oracle.
    #[inline]
    fn read_ts(&self) -> Timestamp {
        self.oracle.read_ts()
    }

    /// Attempts to commit `writes` on behalf of a transaction that started at
    /// `read_ts`.
    ///
    /// A commit timestamp is allocated from the oracle and handed to
    /// [`VersionStore::try_commit`] together with `read_ts`, `writes` and
    /// `reads`. The oracle is then told the timestamp is done, whether or not
    /// the store accepted the commit.
    ///
    /// # Errors
    ///
    /// Returns whatever error the store's `try_commit` produced.
    pub(crate) fn commit_writes(
        &self,
        read_ts: Timestamp,
        writes: Vec<WriteEntry>,
        reads: &[Arc<[u8]>],
    ) -> Result<Timestamp> {
        let ts = self.oracle.alloc_commit_ts();
        let committed = self.store.try_commit(read_ts, ts, writes, reads);
        // Report completion before inspecting the result, so the allocated
        // timestamp is released on the failure path as well.
        self.oracle.commit_done(ts);
        committed?;
        Ok(ts)
    }
}
/// Handle to a database backed by a [`VersionStore`] (a [`MemoryStore`] unless
/// another store type is named).
///
/// The handle only holds an [`Arc`] to the shared state, so clones refer to
/// the same underlying database.
pub struct Db<S = MemoryStore>
where
    S: VersionStore,
{
    /// Store and oracle shared by all clones of this handle.
    inner: Arc<Inner<S>>,
}
impl Db<MemoryStore> {
    /// Creates a database on top of a new [`MemoryStore`].
    #[must_use]
    pub fn new() -> Self {
        let store = MemoryStore::new();
        Self::with_store(store)
    }
}
impl Default for Db<MemoryStore> {
fn default() -> Self {
Db::new()
}
}
impl<S> Db<S>
where
    S: VersionStore,
{
    /// Creates a database that uses `store` as its backing [`VersionStore`].
    #[must_use]
    pub fn with_store(store: S) -> Self {
        let inner = Arc::new(Inner::new(store));
        Self { inner }
    }

    /// Starts a transaction whose reads are pinned to the oracle's current
    /// read timestamp.
    ///
    /// The transaction is created with its final flag set to `false`; compare
    /// `begin_serializable`, which passes `true`.
    pub fn begin(&self) -> Transaction<S> {
        let shared = Arc::clone(&self.inner);
        let read_ts = shared.read_ts();
        Transaction::new(shared, read_ts, false)
    }

    /// Starts a transaction like [`Db::begin`], but with the final flag of
    /// `Transaction::new` set to `true`.
    ///
    /// Only available when the `serializable` feature is enabled.
    #[cfg(feature = "serializable")]
    #[cfg_attr(docsrs, doc(cfg(feature = "serializable")))]
    pub fn begin_serializable(&self) -> Transaction<S> {
        let shared = Arc::clone(&self.inner);
        let read_ts = shared.read_ts();
        Transaction::new(shared, read_ts, true)
    }

    /// Creates a [`Snapshot`] pinned to the oracle's current read timestamp.
    pub fn snapshot(&self) -> Snapshot<S> {
        let shared = Arc::clone(&self.inner);
        let read_ts = shared.read_ts();
        Snapshot::new(shared, read_ts)
    }

    /// Returns the oracle's current read timestamp.
    #[must_use]
    pub fn last_committed(&self) -> Timestamp {
        self.inner.read_ts()
    }
}
impl<S> Clone for Db<S>
where
    S: VersionStore,
{
    /// Produces another handle to the same shared state; only the [`Arc`] is
    /// cloned, so `S` itself does not need to be `Clone`.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
#[cfg(all(test, not(loom)))]
// Tests may unwrap freely: a panic is the desired failure mode here.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// A brand-new database reports `Timestamp::ZERO` and holds no keys.
    #[test]
    fn test_new_database_is_empty_at_zero() {
        let db = Db::new();

        assert_eq!(db.last_committed(), Timestamp::ZERO);
        assert!(db.begin().get(b"k").unwrap().is_none());
    }

    /// A committed write is readable by a transaction started afterwards.
    #[test]
    fn test_commit_makes_writes_visible_to_later_transactions() {
        let db = Db::new();

        let mut writer = db.begin();
        writer.put(b"k".to_vec(), b"v".to_vec());
        let commit_ts = writer.commit().unwrap();

        assert!(commit_ts > Timestamp::ZERO);
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    /// A snapshot keeps returning the value it saw when it was taken.
    #[test]
    fn test_snapshot_is_isolated_from_later_commits() {
        let db = Db::new();

        let mut first = db.begin();
        first.put(b"k".to_vec(), b"v1".to_vec());
        let _first_ts = first.commit().unwrap();

        // Taken between the two commits, so it must only ever see "v1".
        let frozen = db.snapshot();

        let mut second = db.begin();
        second.put(b"k".to_vec(), b"v2".to_vec());
        let _second_ts = second.commit().unwrap();

        assert_eq!(frozen.get(b"k").unwrap().as_deref(), Some(&b"v1"[..]));
    }

    /// Two overlapping transactions writing the same key: the second commit
    /// fails with a retryable error and the first writer's value survives.
    #[test]
    fn test_write_write_conflict_aborts_later_committer() {
        let db = Db::new();
        let mut winner = db.begin();
        let mut loser = db.begin();

        winner.put(b"k".to_vec(), b"a".to_vec());
        loser.put(b"k".to_vec(), b"b".to_vec());

        assert!(winner.commit().is_ok());
        let conflict = loser.commit().expect_err("second committer must lose");
        assert!(conflict.is_retryable());

        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"a"[..]));
    }

    /// Overlapping transactions touching different keys both commit.
    #[test]
    fn test_disjoint_keys_do_not_conflict() {
        let db = Db::new();
        let mut left = db.begin();
        let mut right = db.begin();

        left.put(b"a".to_vec(), b"1".to_vec());
        right.put(b"b".to_vec(), b"2".to_vec());

        assert!(left.commit().is_ok());
        assert!(right.commit().is_ok());
    }

    /// Committing a transaction with no writes yields the timestamp of the
    /// preceding commit.
    #[test]
    fn test_read_only_commit_returns_snapshot_timestamp() {
        let db = Db::new();

        let mut writer = db.begin();
        writer.put(b"k".to_vec(), b"v".to_vec());
        let write_ts = writer.commit().unwrap();

        let reader = db.begin();
        assert_eq!(reader.commit().unwrap(), write_ts);
    }

    /// Writes buffered in a rolled-back transaction never become visible.
    #[test]
    fn test_rollback_discards_writes() {
        let db = Db::new();

        let mut abandoned = db.begin();
        abandoned.put(b"k".to_vec(), b"v".to_vec());
        abandoned.rollback();

        assert!(db.begin().get(b"k").unwrap().is_none());
    }

    /// A cloned handle observes commits made through the original handle.
    #[test]
    fn test_clone_shares_state() {
        let original = Db::new();
        let copy = original.clone();

        let mut writer = original.begin();
        writer.put(b"k".to_vec(), b"v".to_vec());
        let _commit_ts = writer.commit().unwrap();

        assert_eq!(copy.begin().get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    /// Serializable transactions that read both keys and each write one of
    /// them cannot both commit; the second fails with a retryable error.
    #[cfg(feature = "serializable")]
    #[test]
    fn test_serializable_rejects_write_skew() {
        let db = Db::new();

        let mut setup = db.begin();
        setup.put(b"x".to_vec(), vec![1]);
        setup.put(b"y".to_vec(), vec![1]);
        let _setup_ts = setup.commit().unwrap();

        let mut first = db.begin_serializable();
        let mut second = db.begin_serializable();

        // Both transactions read x and y before writing.
        let _ = first.get(b"x").unwrap();
        let _ = first.get(b"y").unwrap();
        let _ = second.get(b"x").unwrap();
        let _ = second.get(b"y").unwrap();

        // Each writes a key the other one only read.
        first.put(b"x".to_vec(), vec![0]);
        second.put(b"y".to_vec(), vec![0]);

        assert!(first.commit().is_ok());
        let skew = second.commit().expect_err("write skew must be rejected");
        assert!(skew.is_retryable());
    }

    /// The same interleaving under plain `begin` lets both transactions
    /// commit.
    #[cfg(feature = "serializable")]
    #[test]
    fn test_snapshot_txn_allows_write_skew() {
        let db = Db::new();

        let mut setup = db.begin();
        setup.put(b"x".to_vec(), vec![1]);
        setup.put(b"y".to_vec(), vec![1]);
        let _setup_ts = setup.commit().unwrap();

        let mut first = db.begin();
        let mut second = db.begin();

        // Both transactions read x and y before writing.
        let _ = first.get(b"x").unwrap();
        let _ = first.get(b"y").unwrap();
        let _ = second.get(b"x").unwrap();
        let _ = second.get(b"y").unwrap();

        // Each writes a key the other one only read.
        first.put(b"x".to_vec(), vec![0]);
        second.put(b"y".to_vec(), vec![0]);

        assert!(first.commit().is_ok());
        assert!(second.commit().is_ok());
    }
}