txn-db 1.0.0

MVCC transaction engine for Rust storage layers. Snapshot isolation and serializable transactions with multi-version concurrency control, conflict detection, and a durable transaction log on wal-db. The transaction layer for embedded databases and Hive DB.
Documentation
//! Plugging a custom backing store into the engine through the [`VersionStore`]
//! trait — the Tier-3 seam. Here the custom store is an instrumented wrapper
//! that counts reads and applied versions while delegating the actual keeping
//! of data to the shipped in-memory store. The same shape lets you back the
//! engine with an on-disk store, a remote service, or anything that can hold
//! timestamped versions of a key.
//!
//! Run with: `cargo run --example custom_store`

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use txn_db::{Db, MemoryStore, Timestamp, TxnError, VersionStore, WriteEntry};

/// Counters shared between the store and whoever wants to observe it.
///
/// Each field sits behind its own [`Arc`], so cloning this struct yields a
/// second handle onto the same atomics rather than a copy of their values.
#[derive(Clone, Default)]
struct Counters {
    /// Incremented by one on every `get` call made through the store.
    reads: Arc<AtomicU64>,
    /// Increased by the number of write entries in each commit that succeeds.
    versions_applied: Arc<AtomicU64>,
}

/// A [`VersionStore`] that records how often it is read and written, then
/// forwards every call to an inner [`MemoryStore`].
struct CountingStore {
    /// The store that actually keeps the data; every trait call is delegated
    /// to it.
    inner: MemoryStore,
    /// Tallies updated by the trait methods. A clone kept elsewhere reads the
    /// same totals, because the counters are `Arc`-backed.
    counters: Counters,
}

impl VersionStore for CountingStore {
    /// Tallies one read, then hands the lookup to the wrapped store.
    ///
    /// The counter is bumped before delegating, so a lookup is counted
    /// whatever the inner store goes on to return for it.
    fn get(&self, key: &[u8], read_ts: Timestamp) -> Result<Option<Arc<[u8]>>, TxnError> {
        let Self { inner, counters } = self;
        let _ = counters.reads.fetch_add(1, Ordering::Relaxed);
        inner.get(key, read_ts)
    }

    /// Delegates the commit to the wrapped store and, once it is accepted,
    /// adds the number of write entries to `versions_applied`.
    ///
    /// # Errors
    ///
    /// Any error from the inner store is returned unchanged, and in that case
    /// the counter is left untouched.
    fn try_commit(
        &self,
        read_ts: Timestamp,
        commit_ts: Timestamp,
        writes: Vec<WriteEntry>,
        reads: &[Arc<[u8]>],
    ) -> Result<(), TxnError> {
        // `writes` is moved into the inner call below, so measure it first.
        let pending = writes.len() as u64;

        // Leave early on a rejected commit: nothing was applied, so nothing
        // may be tallied.
        self.inner.try_commit(read_ts, commit_ts, writes, reads)?;

        let _ = self
            .counters
            .versions_applied
            .fetch_add(pending, Ordering::Relaxed);
        Ok(())
    }
}

fn main() -> Result<(), TxnError> {
    // Keep a handle to the counters, then move the store into the database.
    let counters = Counters::default();
    let db = Db::with_store(CountingStore {
        inner: MemoryStore::new(),
        counters: counters.clone(),
    });

    let mut tx = db.begin();
    tx.put(b"a".to_vec(), b"1".to_vec());
    tx.put(b"b".to_vec(), b"2".to_vec());
    tx.commit()?;

    let tx = db.begin();
    let _ = tx.get(b"a")?;
    let _ = tx.get(b"b")?;
    let _ = tx.get(b"c")?;

    // The wrapper observed everything the engine did, without changing any of
    // the transaction semantics.
    println!(
        "reads served:     {}",
        counters.reads.load(Ordering::Relaxed)
    );
    println!(
        "versions applied: {}",
        counters.versions_applied.load(Ordering::Relaxed)
    );

    Ok(())
}