Skip to main content

txn_db/
db.rs

1//! The database handle and the commit coordinator behind it.
2//!
3//! [`Db`] is the Tier-1 entry point: construct one, [`begin`](Db::begin)
4//! transactions against it, [`commit`](crate::Transaction::commit) them. A `Db`
5//! is a cheap, clonable handle to shared state — clone it freely and hand a
6//! clone to every thread that needs to read or write.
7//!
8//! The shared state itself lives in [`Inner`], which owns the version store and
9//! the [`Oracle`](crate::oracle::Oracle) that allocates timestamps and tracks
10//! the read watermark. Commit coordination is split deliberately: the oracle
11//! hands out timestamps lock-free, and the version store is the serialization
12//! point that validates and applies each commit atomically. The single global
13//! commit lock of the foundation release is gone.
14
15use std::sync::Arc;
16
17use crate::error::Result;
18use crate::oracle::Oracle;
19use crate::store::{MemoryStore, VersionStore, WriteEntry};
20use crate::timestamp::Timestamp;
21use crate::txn::{Snapshot, Transaction};
22
/// Shared, reference-counted state for one logical database.
///
/// A [`Db`] is a handle to an `Arc<Inner>`; every clone of the `Db`, every
/// [`Transaction`], and every [`Snapshot`] holds a clone of the same `Inner`,
/// so they all read and commit against one version store and one timestamp
/// sequence.
///
/// `Inner` is generic over the store type `S`, so the same coordination code
/// runs over any [`VersionStore`] implementation.
pub(crate) struct Inner<S: VersionStore> {
    /// The backing version store. Reads go to it; commits validate and apply
    /// through it. Visible crate-wide, unlike the other fields.
    pub(crate) store: S,
    /// Allocates timestamps and tracks the consistent-read watermark. Private
    /// to this module: code elsewhere in the crate reaches it only through the
    /// methods on `Inner`.
    oracle: Oracle,
    /// The durable commit log, present only for a database opened with
    /// [`Db::open`]. `None` for an in-memory database.
    ///
    /// The field itself exists only when the `durability` feature is compiled
    /// in.
    #[cfg(feature = "durability")]
    log: Option<crate::durable::CommitLog>,
}
40
41impl<S: VersionStore> Inner<S> {
42    fn new(store: S) -> Self {
43        Inner {
44            store,
45            oracle: Oracle::new(),
46            #[cfg(feature = "durability")]
47            log: None,
48        }
49    }
50
51    /// The timestamp a transaction beginning now should read at.
52    #[inline]
53    fn read_ts(&self) -> Timestamp {
54        self.oracle.read_ts()
55    }
56
57    /// Register a live reader and return its read timestamp.
58    #[inline]
59    pub(crate) fn begin_reader(&self) -> Timestamp {
60        self.oracle.begin_reader()
61    }
62
63    /// Unregister a reader that began at `read_ts`.
64    #[inline]
65    pub(crate) fn end_reader(&self, read_ts: Timestamp) {
66        self.oracle.end_reader(read_ts);
67    }
68
69    /// Reclaim versions no live reader can observe, returning the count removed.
70    fn collect_garbage(&self) -> usize {
71        self.store.collect_garbage(self.oracle.low_watermark())
72    }
73
    /// Allocate a commit timestamp, validate-and-apply through the store, then
    /// release the timestamp into the watermark.
    ///
    /// The timestamp is reported to the oracle on both outcomes — a successful
    /// commit and a rejected one — so a conflict never stalls the read watermark
    /// behind the timestamp it consumed.
    ///
    /// * `read_ts` — the timestamp the transaction read at; forwarded to the
    ///   store's `try_commit`.
    /// * `writes` — the write set; consumed by `try_commit`.
    /// * `reads` — keys forwarded to `try_commit` alongside the write set
    ///   (presumably the read set that serializable transactions validate —
    ///   confirm against the caller).
    ///
    /// Returns the allocated commit timestamp when the commit succeeds.
    ///
    /// # Errors
    ///
    /// Returns the store's error unchanged when `try_commit` rejects the
    /// commit. With the `durability` feature and a log present, also returns
    /// the error from `append_committed` when the record cannot be appended.
    pub(crate) fn commit_writes(
        &self,
        read_ts: Timestamp,
        writes: Vec<WriteEntry>,
        reads: &[Arc<[u8]>],
    ) -> Result<Timestamp> {
        let commit_ts = self.oracle.alloc_commit_ts();

        // Encode the durable record before the write set is consumed by the
        // store. No cost for an in-memory database (no log). Because encoding
        // happens ahead of validation, a commit the store goes on to reject
        // has still been encoded when a log is present.
        #[cfg(feature = "durability")]
        let record = self
            .log
            .as_ref()
            .map(|_| crate::durable::encode_for_log(commit_ts, &writes));

        let outcome = self.store.try_commit(read_ts, commit_ts, writes, reads);

        // Make the commit durable before it is acknowledged. The validate-and-
        // apply has already happened in memory but is not yet visible — the
        // watermark only advances at `commit_done` below — so a crash before the
        // sync completes leaves a transaction that was never acknowledged and is
        // recovered as absent.
        //
        // NOTE(review): when the append fails, the timestamp is still released
        // and nothing here undoes what `try_commit` applied. If releasing the
        // timestamp is what makes those writes readable, a commit reported to
        // the caller as failed could be observed until the next restart —
        // confirm whether the store needs an undo path for this case.
        #[cfg(feature = "durability")]
        if outcome.is_ok() {
            if let (Some(log), Some(record)) = (self.log.as_ref(), record) {
                if let Err(err) = log.append_committed(&record) {
                    // Release the timestamp on this early exit too, so the
                    // watermark is not left waiting on it.
                    self.oracle.commit_done(commit_ts);
                    return Err(err);
                }
            }
        }

        // Common exit for success and for a rejected commit alike.
        self.oracle.commit_done(commit_ts);
        outcome.map(|()| commit_ts)
    }
116
117    /// Build the shared inner state for a database recovered from a durable log.
118    #[cfg(feature = "durability")]
119    fn recovered(store: S, oracle: Oracle, log: crate::durable::CommitLog) -> Self {
120        Inner {
121            store,
122            oracle,
123            log: Some(log),
124        }
125    }
126}
127
128/// A transactional, multi-version key-value database.
129///
130/// `Db` is the front door. [`Db::new`] gives you an in-memory database;
131/// [`Db::with_store`] builds one over any [`VersionStore`]. From there the whole
132/// common case is [`begin`](Db::begin) / [`get`](crate::Transaction::get) /
133/// [`put`](crate::Transaction::put) / [`commit`](crate::Transaction::commit),
134/// with [`snapshot`](Db::snapshot) for read-only point-in-time views.
135///
136/// Transactions default to **snapshot isolation**. With the `serializable`
137/// feature enabled, `begin_serializable` starts a transaction whose read set is
138/// validated at commit, rejecting write skew and the other anomalies snapshot
139/// isolation permits.
140///
141/// A `Db` is a clonable handle over shared state, like an [`Arc`]. Cloning it
142/// is cheap and every clone refers to the same database, so the idiomatic way
143/// to use it across threads is to clone a handle per thread.
144///
145/// # Examples
146///
147/// The four-call common case:
148///
149/// ```
150/// use txn_db::Db;
151///
152/// let db = Db::new();
153///
154/// let mut tx = db.begin();
155/// tx.put(b"greeting".to_vec(), b"hei".to_vec());
156/// tx.commit()?;
157///
158/// let tx = db.begin();
159/// assert_eq!(tx.get(b"greeting")?.as_deref(), Some(&b"hei"[..]));
160/// # Ok::<(), txn_db::TxnError>(())
161/// ```
162///
163/// Sharing one database across threads:
164///
165/// ```
166/// use std::thread;
167/// use txn_db::Db;
168///
169/// let db = Db::new();
170/// let handles: Vec<_> = (0..4u8)
171///     .map(|i| {
172///         let db = db.clone();
173///         thread::spawn(move || {
174///             let mut tx = db.begin();
175///             tx.put(vec![i], vec![i]);
176///             // Independent keys never conflict.
177///             tx.commit().expect("commit");
178///         })
179///     })
180///     .collect();
181/// for h in handles {
182///     h.join().expect("thread");
183/// }
184/// # Ok::<(), txn_db::TxnError>(())
185/// ```
pub struct Db<S: VersionStore = MemoryStore> {
    /// Shared state behind this handle. Cloning a `Db` clones this `Arc`, so
    /// every clone points at the same `Inner`.
    inner: Arc<Inner<S>>,
}
189
190impl Db<MemoryStore> {
191    /// Create an empty in-memory database.
192    ///
193    /// This is the default configuration: a [`MemoryStore`] backing store, ready
194    /// for [`begin`](Db::begin).
195    ///
196    /// # Examples
197    ///
198    /// ```
199    /// use txn_db::Db;
200    ///
201    /// let db = Db::new();
202    /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
203    /// ```
204    #[must_use]
205    pub fn new() -> Self {
206        Db::with_store(MemoryStore::new())
207    }
208
209    /// Open a durable database backed by a write-ahead log at `path`, replaying
210    /// any committed transactions already in the log.
211    ///
212    /// Every transaction committed against the returned database appends its
213    /// record to the log and syncs it before [`commit`](crate::Transaction::commit)
214    /// returns, so an acknowledged commit survives a crash. On open, the log is
215    /// replayed: each committed transaction is reinstated, and a transaction that
216    /// never reached the log — because it aborted, or because the process crashed
217    /// before its record was made durable — is simply absent. The recovered data
218    /// lives in memory; the log is the durable record from which it is rebuilt.
219    ///
220    /// Available with the `durability` feature.
221    ///
222    /// # Errors
223    ///
224    /// Returns [`TxnError::Durability`](crate::TxnError::Durability) if the log
225    /// cannot be opened or a record read back from it does not decode.
226    ///
227    /// # Examples
228    ///
229    /// ```
230    /// # #[cfg(feature = "durability")]
231    /// # {
232    /// # let dir = tempfile::tempdir().expect("tempdir");
233    /// # let path = dir.path().join("txn.wal");
234    /// use txn_db::Db;
235    ///
236    /// // Commit, then drop the database.
237    /// {
238    ///     let db = Db::open(&path)?;
239    ///     let mut tx = db.begin();
240    ///     tx.put(b"k".to_vec(), b"v".to_vec());
241    ///     tx.commit()?;
242    /// }
243    ///
244    /// // Reopening replays the log: the committed write is still there.
245    /// let db = Db::open(&path)?;
246    /// assert_eq!(db.begin().get(b"k")?.as_deref(), Some(&b"v"[..]));
247    /// # }
248    /// # Ok::<(), txn_db::TxnError>(())
249    /// ```
250    #[cfg(feature = "durability")]
251    #[cfg_attr(docsrs, doc(cfg(feature = "durability")))]
252    pub fn open(path: impl AsRef<std::path::Path>) -> Result<Db<MemoryStore>> {
253        let (log, mut recovered) = crate::durable::CommitLog::open(path)?;
254
255        // Replay in ascending commit-timestamp order; records may sit in the log
256        // out of that order because commits append after applying, concurrently.
257        recovered.sort_by_key(|commit| commit.commit_ts);
258
259        let store = MemoryStore::new();
260        let mut highest = Timestamp::ZERO;
261        for commit in recovered {
262            highest = highest.max(commit.commit_ts);
263            store.install_recovered(commit.commit_ts, commit.writes);
264        }
265
266        Ok(Db {
267            inner: Arc::new(Inner::recovered(store, Oracle::recovered(highest), log)),
268        })
269    }
270}
271
272impl Default for Db<MemoryStore> {
273    fn default() -> Self {
274        Db::new()
275    }
276}
277
278impl<S: VersionStore> Db<S> {
279    /// Create a database over a custom [`VersionStore`].
280    ///
281    /// This is the Tier-3 seam: supply any backing store and the transaction
282    /// semantics — snapshot isolation, read-your-own-writes, conflict detection
283    /// — compose on top of it unchanged.
284    ///
285    /// # Examples
286    ///
287    /// ```
288    /// use txn_db::{Db, MemoryStore};
289    ///
290    /// let db = Db::with_store(MemoryStore::new());
291    /// let mut tx = db.begin();
292    /// tx.put(b"k".to_vec(), b"v".to_vec());
293    /// tx.commit()?;
294    /// # Ok::<(), txn_db::TxnError>(())
295    /// ```
296    #[must_use]
297    pub fn with_store(store: S) -> Self {
298        Db {
299            inner: Arc::new(Inner::new(store)),
300        }
301    }
302
303    /// Begin a snapshot-isolation transaction over the current state.
304    ///
305    /// The transaction takes its snapshot at this moment: it reads as of the
306    /// most recent commit and is unaffected by commits that happen afterward.
307    /// Its writes are checked for write-write conflicts at commit, but its reads
308    /// are not validated — use `begin_serializable` (with the `serializable`
309    /// feature) when you need serializability.
310    ///
311    /// # Examples
312    ///
313    /// ```
314    /// use txn_db::Db;
315    ///
316    /// let db = Db::new();
317    /// let mut tx = db.begin();
318    /// tx.put(b"k".to_vec(), b"v".to_vec());
319    /// tx.commit()?;
320    /// # Ok::<(), txn_db::TxnError>(())
321    /// ```
322    pub fn begin(&self) -> Transaction<S> {
323        Transaction::new(Arc::clone(&self.inner), false)
324    }
325
326    /// Begin a serializable transaction over the current state.
327    ///
328    /// A serializable transaction tracks every key it reads and, at commit,
329    /// validates that none of them changed since its snapshot — in addition to
330    /// the write-write check every transaction gets. That read-set validation is
331    /// what rejects write skew and the read-only anomaly that plain snapshot
332    /// isolation permits, giving serializable behavior for the transactions that
333    /// commit writes. A serializable transaction that writes nothing commits
334    /// trivially, exactly like a read-only snapshot.
335    ///
336    /// Available with the `serializable` feature. Snapshot isolation remains the
337    /// default and is unaffected.
338    ///
339    /// # Examples
340    ///
341    /// ```
342    /// # #[cfg(feature = "serializable")]
343    /// # {
344    /// use txn_db::Db;
345    ///
346    /// let db = Db::new();
347    /// // Seed two rows that an invariant ties together.
348    /// let mut tx = db.begin();
349    /// tx.put(b"on_call:alice".to_vec(), vec![1]);
350    /// tx.put(b"on_call:bob".to_vec(), vec![1]);
351    /// tx.commit()?;
352    ///
353    /// // A serializable transaction validates the rows it read at commit.
354    /// let mut tx = db.begin_serializable();
355    /// let _alice = tx.get(b"on_call:alice")?;
356    /// let _bob = tx.get(b"on_call:bob")?;
357    /// tx.put(b"on_call:alice".to_vec(), vec![0]);
358    /// tx.commit()?;
359    /// # }
360    /// # Ok::<(), txn_db::TxnError>(())
361    /// ```
362    #[cfg(feature = "serializable")]
363    #[cfg_attr(docsrs, doc(cfg(feature = "serializable")))]
364    pub fn begin_serializable(&self) -> Transaction<S> {
365        Transaction::new(Arc::clone(&self.inner), true)
366    }
367
368    /// Take a read-only snapshot of the current state of the database.
369    ///
370    /// The returned [`Snapshot`] reads as of this instant and never changes,
371    /// even as other transactions commit. Use it to read several keys at one
372    /// consistent point in time without the overhead of a transaction.
373    ///
374    /// # Examples
375    ///
376    /// ```
377    /// use txn_db::Db;
378    ///
379    /// let db = Db::new();
380    /// let snap = db.snapshot();
381    /// assert_eq!(snap.get(b"k")?, None);
382    /// # Ok::<(), txn_db::TxnError>(())
383    /// ```
384    pub fn snapshot(&self) -> Snapshot<S> {
385        Snapshot::new(Arc::clone(&self.inner))
386    }
387
388    /// The timestamp of the most recent commit visible to a new transaction.
389    ///
390    /// Returns [`Timestamp::ZERO`] for a database that has never been written.
391    /// This is the read watermark: the timestamp a transaction beginning now
392    /// would read at.
393    ///
394    /// # Examples
395    ///
396    /// ```
397    /// use txn_db::Db;
398    ///
399    /// let db = Db::new();
400    /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
401    ///
402    /// let mut tx = db.begin();
403    /// tx.put(b"k".to_vec(), b"v".to_vec());
404    /// let ts = tx.commit()?;
405    /// assert_eq!(db.last_committed(), ts);
406    /// # Ok::<(), txn_db::TxnError>(())
407    /// ```
408    #[must_use]
409    pub fn last_committed(&self) -> Timestamp {
410        self.inner.read_ts()
411    }
412
413    /// Reclaim versions that no live transaction or snapshot can observe,
414    /// returning how many were removed.
415    ///
416    /// `txn-db` keeps every version of a key so that an in-flight reader sees a
417    /// stable snapshot. Once no live reader can observe an old version — because
418    /// every active transaction and snapshot reads at a timestamp newer than a
419    /// later version of that key — the old one is unreachable and this reclaims
420    /// it. A key deleted before the oldest live reader's snapshot is dropped
421    /// entirely.
422    ///
423    /// Call it periodically, or after retiring long-running snapshots, to bound
424    /// memory. It is safe to call at any time and from any thread: a version a
425    /// live reader can still reach is never reclaimed. With the default
426    /// in-memory store this prunes the version chains; a custom
427    /// [`VersionStore`](crate::VersionStore) that keeps no history can leave the
428    /// default no-op in place.
429    ///
430    /// # Examples
431    ///
432    /// ```
433    /// use txn_db::Db;
434    ///
435    /// let db = Db::new();
436    /// // Overwrite the same key several times.
437    /// for v in 0..5u8 {
438    ///     let mut tx = db.begin();
439    ///     tx.put(b"k".to_vec(), vec![v]);
440    ///     tx.commit()?;
441    /// }
442    ///
443    /// // No snapshot is held, so only the newest version need be kept.
444    /// let reclaimed = db.collect_garbage();
445    /// assert!(reclaimed > 0);
446    /// assert_eq!(db.begin().get(b"k")?.as_deref(), Some(&[4u8][..]));
447    /// # Ok::<(), txn_db::TxnError>(())
448    /// ```
449    pub fn collect_garbage(&self) -> usize {
450        self.inner.collect_garbage()
451    }
452}
453
454impl<S: VersionStore> Clone for Db<S> {
455    /// Clone the handle, not the data: the clone shares the same underlying
456    /// database.
457    fn clone(&self) -> Self {
458        Db {
459            inner: Arc::clone(&self.inner),
460        }
461    }
462}
463
464#[cfg(all(test, not(loom)))]
465#[allow(clippy::unwrap_used, clippy::expect_used)]
466mod tests {
467    use super::*;
468
469    #[test]
470    fn test_new_database_is_empty_at_zero() {
471        let db = Db::new();
472        assert_eq!(db.last_committed(), Timestamp::ZERO);
473        assert_eq!(db.begin().get(b"k").unwrap(), None);
474    }
475
476    #[test]
477    fn test_commit_makes_writes_visible_to_later_transactions() {
478        let db = Db::new();
479        let mut tx = db.begin();
480        tx.put(b"k".to_vec(), b"v".to_vec());
481        let ts = tx.commit().unwrap();
482        assert!(ts > Timestamp::ZERO);
483        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
484    }
485
486    #[test]
487    fn test_snapshot_is_isolated_from_later_commits() {
488        let db = Db::new();
489        let mut tx = db.begin();
490        tx.put(b"k".to_vec(), b"v1".to_vec());
491        let _ = tx.commit().unwrap();
492
493        let snap = db.snapshot();
494        let mut tx = db.begin();
495        tx.put(b"k".to_vec(), b"v2".to_vec());
496        let _ = tx.commit().unwrap();
497
498        assert_eq!(snap.get(b"k").unwrap().as_deref(), Some(&b"v1"[..]));
499    }
500
501    #[test]
502    fn test_write_write_conflict_aborts_later_committer() {
503        let db = Db::new();
504        let mut a = db.begin();
505        let mut b = db.begin();
506        a.put(b"k".to_vec(), b"a".to_vec());
507        b.put(b"k".to_vec(), b"b".to_vec());
508
509        assert!(a.commit().is_ok());
510        let err = b.commit().expect_err("second committer must lose");
511        assert!(err.is_retryable());
512        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"a"[..]));
513    }
514
515    #[test]
516    fn test_disjoint_keys_do_not_conflict() {
517        let db = Db::new();
518        let mut a = db.begin();
519        let mut b = db.begin();
520        a.put(b"a".to_vec(), b"1".to_vec());
521        b.put(b"b".to_vec(), b"2".to_vec());
522        assert!(a.commit().is_ok());
523        assert!(b.commit().is_ok());
524    }
525
526    #[test]
527    fn test_read_only_commit_returns_snapshot_timestamp() {
528        let db = Db::new();
529        let mut tx = db.begin();
530        tx.put(b"k".to_vec(), b"v".to_vec());
531        let ts = tx.commit().unwrap();
532
533        let ro = db.begin();
534        assert_eq!(ro.commit().unwrap(), ts);
535    }
536
537    #[test]
538    fn test_rollback_discards_writes() {
539        let db = Db::new();
540        let mut tx = db.begin();
541        tx.put(b"k".to_vec(), b"v".to_vec());
542        tx.rollback();
543        assert_eq!(db.begin().get(b"k").unwrap(), None);
544    }
545
546    #[test]
547    fn test_gc_reclaims_when_no_reader_is_held() {
548        let db = Db::new();
549        for v in 0..5u8 {
550            let mut tx = db.begin();
551            tx.put(b"k".to_vec(), vec![v]);
552            let _ = tx.commit().unwrap();
553        }
554        let reclaimed = db.collect_garbage();
555        assert!(reclaimed > 0);
556        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&[4u8][..]));
557    }
558
559    #[test]
560    fn test_held_snapshot_pins_gc() {
561        let db = Db::new();
562        let mut tx = db.begin();
563        tx.put(b"k".to_vec(), vec![1]);
564        let _ = tx.commit().unwrap();
565
566        // Hold a snapshot of this state, then overwrite the key.
567        let snap = db.snapshot();
568        let mut tx = db.begin();
569        tx.put(b"k".to_vec(), vec![2]);
570        let _ = tx.commit().unwrap();
571
572        // GC must not reclaim the version the held snapshot still observes.
573        let _ = db.collect_garbage();
574        assert_eq!(snap.get(b"k").unwrap().as_deref(), Some(&[1u8][..]));
575
576        // Once the snapshot is dropped, the old version becomes reclaimable.
577        drop(snap);
578        let reclaimed = db.collect_garbage();
579        assert!(reclaimed > 0);
580        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&[2u8][..]));
581    }
582
583    #[test]
584    fn test_clone_shares_state() {
585        let db = Db::new();
586        let db2 = db.clone();
587        let mut tx = db.begin();
588        tx.put(b"k".to_vec(), b"v".to_vec());
589        let _ = tx.commit().unwrap();
590        assert_eq!(db2.begin().get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
591    }
592
593    #[cfg(feature = "serializable")]
594    #[test]
595    fn test_serializable_rejects_write_skew() {
596        let db = Db::new();
597        let mut seed = db.begin();
598        seed.put(b"x".to_vec(), vec![1]);
599        seed.put(b"y".to_vec(), vec![1]);
600        let _ = seed.commit().unwrap();
601
602        // Two serializable transactions from the same snapshot each read both
603        // rows and write the one the other read.
604        let mut t1 = db.begin_serializable();
605        let mut t2 = db.begin_serializable();
606        let _ = t1.get(b"x").unwrap();
607        let _ = t1.get(b"y").unwrap();
608        let _ = t2.get(b"x").unwrap();
609        let _ = t2.get(b"y").unwrap();
610        t1.put(b"x".to_vec(), vec![0]);
611        t2.put(b"y".to_vec(), vec![0]);
612
613        assert!(t1.commit().is_ok());
614        // t2 read x, which t1 changed -> serializable validation aborts it.
615        let err = t2.commit().expect_err("write skew must be rejected");
616        assert!(err.is_retryable());
617    }
618
619    #[cfg(feature = "serializable")]
620    #[test]
621    fn test_snapshot_txn_allows_write_skew() {
622        let db = Db::new();
623        let mut seed = db.begin();
624        seed.put(b"x".to_vec(), vec![1]);
625        seed.put(b"y".to_vec(), vec![1]);
626        let _ = seed.commit().unwrap();
627
628        // The same schedule under plain snapshot isolation: both commit, because
629        // SI does not validate the read set.
630        let mut t1 = db.begin();
631        let mut t2 = db.begin();
632        let _ = t1.get(b"x").unwrap();
633        let _ = t1.get(b"y").unwrap();
634        let _ = t2.get(b"x").unwrap();
635        let _ = t2.get(b"y").unwrap();
636        t1.put(b"x".to_vec(), vec![0]);
637        t2.put(b"y".to_vec(), vec![0]);
638
639        assert!(t1.commit().is_ok());
640        assert!(t2.commit().is_ok());
641    }
642}