Skip to main content

txn_db/
db.rs

//! The database handle and the commit coordinator behind it.
//!
//! [`Db`] is the Tier-1 entry point: construct one, [`begin`](Db::begin)
//! transactions against it, [`commit`](crate::Transaction::commit) them. A `Db`
//! is a cheap, clonable handle to shared state — clone it freely and hand a
//! clone to every thread that needs to read or write.
//!
//! The shared state itself lives in [`Inner`], which owns the version store and
//! the [`Oracle`](crate::oracle::Oracle) that allocates timestamps and tracks
//! the read watermark. Commit coordination is split deliberately: the oracle
//! hands out timestamps lock-free, and the version store is the serialization
//! point that validates and applies each commit atomically. The single global
//! commit lock of the foundation release is gone.
14
15use std::sync::Arc;
16
17use crate::error::Result;
18use crate::oracle::Oracle;
19use crate::store::{MemoryStore, VersionStore, WriteEntry};
20use crate::timestamp::Timestamp;
21use crate::txn::{Snapshot, Transaction};
22
23/// Shared, reference-counted state for one logical database.
24///
25/// A [`Db`] is a handle to an `Arc<Inner>`; every clone of the `Db`, every
26/// [`Transaction`], and every [`Snapshot`] holds a clone of the same `Inner`,
27/// so they all read and commit against one version store and one timestamp
28/// sequence.
29pub(crate) struct Inner<S: VersionStore> {
30    /// The backing version store. Reads go to it; commits validate and apply
31    /// through it.
32    pub(crate) store: S,
33    /// Allocates timestamps and tracks the consistent-read watermark.
34    oracle: Oracle,
35    /// The durable commit log, present only for a database opened with
36    /// [`Db::open`]. `None` for an in-memory database.
37    #[cfg(feature = "durability")]
38    log: Option<crate::durable::CommitLog>,
39}
40
41impl<S: VersionStore> Inner<S> {
42    fn new(store: S) -> Self {
43        Inner {
44            store,
45            oracle: Oracle::new(),
46            #[cfg(feature = "durability")]
47            log: None,
48        }
49    }
50
51    /// The timestamp a transaction beginning now should read at.
52    #[inline]
53    fn read_ts(&self) -> Timestamp {
54        self.oracle.read_ts()
55    }
56
57    /// Allocate a commit timestamp, validate-and-apply through the store, then
58    /// release the timestamp into the watermark.
59    ///
60    /// The timestamp is reported to the oracle on both outcomes — a successful
61    /// commit and a rejected one — so a conflict never stalls the read watermark
62    /// behind the timestamp it consumed.
63    pub(crate) fn commit_writes(
64        &self,
65        read_ts: Timestamp,
66        writes: Vec<WriteEntry>,
67        reads: &[Arc<[u8]>],
68    ) -> Result<Timestamp> {
69        let commit_ts = self.oracle.alloc_commit_ts();
70
71        // Encode the durable record before the write set is consumed by the
72        // store. No cost for an in-memory database (no log).
73        #[cfg(feature = "durability")]
74        let record = self
75            .log
76            .as_ref()
77            .map(|_| crate::durable::encode_for_log(commit_ts, &writes));
78
79        let outcome = self.store.try_commit(read_ts, commit_ts, writes, reads);
80
81        // Make the commit durable before it is acknowledged. The validate-and-
82        // apply has already happened in memory but is not yet visible — the
83        // watermark only advances at `commit_done` below — so a crash before the
84        // sync completes leaves a transaction that was never acknowledged and is
85        // recovered as absent.
86        #[cfg(feature = "durability")]
87        if outcome.is_ok() {
88            if let (Some(log), Some(record)) = (self.log.as_ref(), record) {
89                if let Err(err) = log.append_committed(&record) {
90                    self.oracle.commit_done(commit_ts);
91                    return Err(err);
92                }
93            }
94        }
95
96        self.oracle.commit_done(commit_ts);
97        outcome.map(|()| commit_ts)
98    }
99
100    /// Build the shared inner state for a database recovered from a durable log.
101    #[cfg(feature = "durability")]
102    fn recovered(store: S, oracle: Oracle, log: crate::durable::CommitLog) -> Self {
103        Inner {
104            store,
105            oracle,
106            log: Some(log),
107        }
108    }
109}
110
111/// A transactional, multi-version key-value database.
112///
113/// `Db` is the front door. [`Db::new`] gives you an in-memory database;
114/// [`Db::with_store`] builds one over any [`VersionStore`]. From there the whole
115/// common case is [`begin`](Db::begin) / [`get`](crate::Transaction::get) /
116/// [`put`](crate::Transaction::put) / [`commit`](crate::Transaction::commit),
117/// with [`snapshot`](Db::snapshot) for read-only point-in-time views.
118///
119/// Transactions default to **snapshot isolation**. With the `serializable`
120/// feature enabled, `begin_serializable` starts a transaction whose read set is
121/// validated at commit, rejecting write skew and the other anomalies snapshot
122/// isolation permits.
123///
124/// A `Db` is a clonable handle over shared state, like an [`Arc`]. Cloning it
125/// is cheap and every clone refers to the same database, so the idiomatic way
126/// to use it across threads is to clone a handle per thread.
127///
128/// # Examples
129///
130/// The four-call common case:
131///
132/// ```
133/// use txn_db::Db;
134///
135/// let db = Db::new();
136///
137/// let mut tx = db.begin();
138/// tx.put(b"greeting".to_vec(), b"hei".to_vec());
139/// tx.commit()?;
140///
141/// let tx = db.begin();
142/// assert_eq!(tx.get(b"greeting")?.as_deref(), Some(&b"hei"[..]));
143/// # Ok::<(), txn_db::TxnError>(())
144/// ```
145///
146/// Sharing one database across threads:
147///
148/// ```
149/// use std::thread;
150/// use txn_db::Db;
151///
152/// let db = Db::new();
153/// let handles: Vec<_> = (0..4u8)
154///     .map(|i| {
155///         let db = db.clone();
156///         thread::spawn(move || {
157///             let mut tx = db.begin();
158///             tx.put(vec![i], vec![i]);
159///             // Independent keys never conflict.
160///             tx.commit().expect("commit");
161///         })
162///     })
163///     .collect();
164/// for h in handles {
165///     h.join().expect("thread");
166/// }
167/// # Ok::<(), txn_db::TxnError>(())
168/// ```
169pub struct Db<S: VersionStore = MemoryStore> {
170    inner: Arc<Inner<S>>,
171}
172
173impl Db<MemoryStore> {
174    /// Create an empty in-memory database.
175    ///
176    /// This is the default configuration: a [`MemoryStore`] backing store, ready
177    /// for [`begin`](Db::begin).
178    ///
179    /// # Examples
180    ///
181    /// ```
182    /// use txn_db::Db;
183    ///
184    /// let db = Db::new();
185    /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
186    /// ```
187    #[must_use]
188    pub fn new() -> Self {
189        Db::with_store(MemoryStore::new())
190    }
191
192    /// Open a durable database backed by a write-ahead log at `path`, replaying
193    /// any committed transactions already in the log.
194    ///
195    /// Every transaction committed against the returned database appends its
196    /// record to the log and syncs it before [`commit`](crate::Transaction::commit)
197    /// returns, so an acknowledged commit survives a crash. On open, the log is
198    /// replayed: each committed transaction is reinstated, and a transaction that
199    /// never reached the log — because it aborted, or because the process crashed
200    /// before its record was made durable — is simply absent. The recovered data
201    /// lives in memory; the log is the durable record from which it is rebuilt.
202    ///
203    /// Available with the `durability` feature.
204    ///
205    /// # Errors
206    ///
207    /// Returns [`TxnError::Durability`](crate::TxnError::Durability) if the log
208    /// cannot be opened or a record read back from it does not decode.
209    ///
210    /// # Examples
211    ///
212    /// ```
213    /// # #[cfg(feature = "durability")]
214    /// # {
215    /// # let dir = tempfile::tempdir().expect("tempdir");
216    /// # let path = dir.path().join("txn.wal");
217    /// use txn_db::Db;
218    ///
219    /// // Commit, then drop the database.
220    /// {
221    ///     let db = Db::open(&path)?;
222    ///     let mut tx = db.begin();
223    ///     tx.put(b"k".to_vec(), b"v".to_vec());
224    ///     tx.commit()?;
225    /// }
226    ///
227    /// // Reopening replays the log: the committed write is still there.
228    /// let db = Db::open(&path)?;
229    /// assert_eq!(db.begin().get(b"k")?.as_deref(), Some(&b"v"[..]));
230    /// # }
231    /// # Ok::<(), txn_db::TxnError>(())
232    /// ```
233    #[cfg(feature = "durability")]
234    #[cfg_attr(docsrs, doc(cfg(feature = "durability")))]
235    pub fn open(path: impl AsRef<std::path::Path>) -> Result<Db<MemoryStore>> {
236        let (log, mut recovered) = crate::durable::CommitLog::open(path)?;
237
238        // Replay in ascending commit-timestamp order; records may sit in the log
239        // out of that order because commits append after applying, concurrently.
240        recovered.sort_by_key(|commit| commit.commit_ts);
241
242        let store = MemoryStore::new();
243        let mut highest = Timestamp::ZERO;
244        for commit in recovered {
245            highest = highest.max(commit.commit_ts);
246            store.install_recovered(commit.commit_ts, commit.writes);
247        }
248
249        Ok(Db {
250            inner: Arc::new(Inner::recovered(store, Oracle::recovered(highest), log)),
251        })
252    }
253}
254
255impl Default for Db<MemoryStore> {
256    fn default() -> Self {
257        Db::new()
258    }
259}
260
261impl<S: VersionStore> Db<S> {
262    /// Create a database over a custom [`VersionStore`].
263    ///
264    /// This is the Tier-3 seam: supply any backing store and the transaction
265    /// semantics — snapshot isolation, read-your-own-writes, conflict detection
266    /// — compose on top of it unchanged.
267    ///
268    /// # Examples
269    ///
270    /// ```
271    /// use txn_db::{Db, MemoryStore};
272    ///
273    /// let db = Db::with_store(MemoryStore::new());
274    /// let mut tx = db.begin();
275    /// tx.put(b"k".to_vec(), b"v".to_vec());
276    /// tx.commit()?;
277    /// # Ok::<(), txn_db::TxnError>(())
278    /// ```
279    #[must_use]
280    pub fn with_store(store: S) -> Self {
281        Db {
282            inner: Arc::new(Inner::new(store)),
283        }
284    }
285
286    /// Begin a snapshot-isolation transaction over the current state.
287    ///
288    /// The transaction takes its snapshot at this moment: it reads as of the
289    /// most recent commit and is unaffected by commits that happen afterward.
290    /// Its writes are checked for write-write conflicts at commit, but its reads
291    /// are not validated — use `begin_serializable` (with the `serializable`
292    /// feature) when you need serializability.
293    ///
294    /// # Examples
295    ///
296    /// ```
297    /// use txn_db::Db;
298    ///
299    /// let db = Db::new();
300    /// let mut tx = db.begin();
301    /// tx.put(b"k".to_vec(), b"v".to_vec());
302    /// tx.commit()?;
303    /// # Ok::<(), txn_db::TxnError>(())
304    /// ```
305    pub fn begin(&self) -> Transaction<S> {
306        Transaction::new(Arc::clone(&self.inner), self.inner.read_ts(), false)
307    }
308
309    /// Begin a serializable transaction over the current state.
310    ///
311    /// A serializable transaction tracks every key it reads and, at commit,
312    /// validates that none of them changed since its snapshot — in addition to
313    /// the write-write check every transaction gets. That read-set validation is
314    /// what rejects write skew and the read-only anomaly that plain snapshot
315    /// isolation permits, giving serializable behavior for the transactions that
316    /// commit writes. A serializable transaction that writes nothing commits
317    /// trivially, exactly like a read-only snapshot.
318    ///
319    /// Available with the `serializable` feature. Snapshot isolation remains the
320    /// default and is unaffected.
321    ///
322    /// # Examples
323    ///
324    /// ```
325    /// # #[cfg(feature = "serializable")]
326    /// # {
327    /// use txn_db::Db;
328    ///
329    /// let db = Db::new();
330    /// // Seed two rows that an invariant ties together.
331    /// let mut tx = db.begin();
332    /// tx.put(b"on_call:alice".to_vec(), vec![1]);
333    /// tx.put(b"on_call:bob".to_vec(), vec![1]);
334    /// tx.commit()?;
335    ///
336    /// // A serializable transaction validates the rows it read at commit.
337    /// let mut tx = db.begin_serializable();
338    /// let _alice = tx.get(b"on_call:alice")?;
339    /// let _bob = tx.get(b"on_call:bob")?;
340    /// tx.put(b"on_call:alice".to_vec(), vec![0]);
341    /// tx.commit()?;
342    /// # }
343    /// # Ok::<(), txn_db::TxnError>(())
344    /// ```
345    #[cfg(feature = "serializable")]
346    #[cfg_attr(docsrs, doc(cfg(feature = "serializable")))]
347    pub fn begin_serializable(&self) -> Transaction<S> {
348        Transaction::new(Arc::clone(&self.inner), self.inner.read_ts(), true)
349    }
350
351    /// Take a read-only snapshot of the current state of the database.
352    ///
353    /// The returned [`Snapshot`] reads as of this instant and never changes,
354    /// even as other transactions commit. Use it to read several keys at one
355    /// consistent point in time without the overhead of a transaction.
356    ///
357    /// # Examples
358    ///
359    /// ```
360    /// use txn_db::Db;
361    ///
362    /// let db = Db::new();
363    /// let snap = db.snapshot();
364    /// assert_eq!(snap.get(b"k")?, None);
365    /// # Ok::<(), txn_db::TxnError>(())
366    /// ```
367    pub fn snapshot(&self) -> Snapshot<S> {
368        Snapshot::new(Arc::clone(&self.inner), self.inner.read_ts())
369    }
370
371    /// The timestamp of the most recent commit visible to a new transaction.
372    ///
373    /// Returns [`Timestamp::ZERO`] for a database that has never been written.
374    /// This is the read watermark: the timestamp a transaction beginning now
375    /// would read at.
376    ///
377    /// # Examples
378    ///
379    /// ```
380    /// use txn_db::Db;
381    ///
382    /// let db = Db::new();
383    /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
384    ///
385    /// let mut tx = db.begin();
386    /// tx.put(b"k".to_vec(), b"v".to_vec());
387    /// let ts = tx.commit()?;
388    /// assert_eq!(db.last_committed(), ts);
389    /// # Ok::<(), txn_db::TxnError>(())
390    /// ```
391    #[must_use]
392    pub fn last_committed(&self) -> Timestamp {
393        self.inner.read_ts()
394    }
395}
396
397impl<S: VersionStore> Clone for Db<S> {
398    /// Clone the handle, not the data: the clone shares the same underlying
399    /// database.
400    fn clone(&self) -> Self {
401        Db {
402            inner: Arc::clone(&self.inner),
403        }
404    }
405}
406
#[cfg(all(test, not(loom)))]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Commit a single `key -> value` write in its own transaction and return
    /// the commit timestamp.
    fn put_one(db: &Db, key: &[u8], value: &[u8]) -> Timestamp {
        let mut tx = db.begin();
        tx.put(key.to_vec(), value.to_vec());
        tx.commit().unwrap()
    }

    /// Seed rows `x` and `y`, both set to `[1]`, in one transaction.
    #[cfg(feature = "serializable")]
    fn seed_x_and_y(db: &Db) {
        let mut seed = db.begin();
        seed.put(b"x".to_vec(), vec![1]);
        seed.put(b"y".to_vec(), vec![1]);
        let _ = seed.commit().unwrap();
    }

    #[test]
    fn test_new_database_is_empty_at_zero() {
        let db = Db::new();
        assert_eq!(db.last_committed(), Timestamp::ZERO);

        let tx = db.begin();
        assert_eq!(tx.get(b"k").unwrap(), None);
    }

    #[test]
    fn test_commit_makes_writes_visible_to_later_transactions() {
        let db = Db::new();
        let committed_at = put_one(&db, b"k", b"v");
        assert!(committed_at > Timestamp::ZERO);

        let reader = db.begin();
        assert_eq!(reader.get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    #[test]
    fn test_snapshot_is_isolated_from_later_commits() {
        let db = Db::new();
        let _ = put_one(&db, b"k", b"v1");

        // Pin the view before the second write lands.
        let pinned = db.snapshot();
        let _ = put_one(&db, b"k", b"v2");

        assert_eq!(pinned.get(b"k").unwrap().as_deref(), Some(&b"v1"[..]));
    }

    #[test]
    fn test_write_write_conflict_aborts_later_committer() {
        let db = Db::new();
        let mut first = db.begin();
        let mut second = db.begin();
        first.put(b"k".to_vec(), b"a".to_vec());
        second.put(b"k".to_vec(), b"b".to_vec());

        assert!(first.commit().is_ok());
        let err = second.commit().expect_err("second committer must lose");
        assert!(err.is_retryable());

        let reader = db.begin();
        assert_eq!(reader.get(b"k").unwrap().as_deref(), Some(&b"a"[..]));
    }

    #[test]
    fn test_disjoint_keys_do_not_conflict() {
        let db = Db::new();
        let mut left = db.begin();
        let mut right = db.begin();
        left.put(b"a".to_vec(), b"1".to_vec());
        right.put(b"b".to_vec(), b"2".to_vec());

        assert!(left.commit().is_ok());
        assert!(right.commit().is_ok());
    }

    #[test]
    fn test_read_only_commit_returns_snapshot_timestamp() {
        let db = Db::new();
        let written_at = put_one(&db, b"k", b"v");

        let read_only = db.begin();
        assert_eq!(read_only.commit().unwrap(), written_at);
    }

    #[test]
    fn test_rollback_discards_writes() {
        let db = Db::new();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v".to_vec());
        tx.rollback();

        let reader = db.begin();
        assert_eq!(reader.get(b"k").unwrap(), None);
    }

    #[test]
    fn test_clone_shares_state() {
        let db = Db::new();
        let other_handle = db.clone();
        let _ = put_one(&db, b"k", b"v");

        let reader = other_handle.begin();
        assert_eq!(reader.get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    #[cfg(feature = "serializable")]
    #[test]
    fn test_serializable_rejects_write_skew() {
        let db = Db::new();
        seed_x_and_y(&db);

        // Both serializable transactions start from the same snapshot, read
        // both rows, and each writes the row the other one read.
        let mut t1 = db.begin_serializable();
        let mut t2 = db.begin_serializable();
        let _ = t1.get(b"x").unwrap();
        let _ = t1.get(b"y").unwrap();
        let _ = t2.get(b"x").unwrap();
        let _ = t2.get(b"y").unwrap();
        t1.put(b"x".to_vec(), vec![0]);
        t2.put(b"y".to_vec(), vec![0]);

        assert!(t1.commit().is_ok());
        // t2 read x, which t1 has since changed, so validation aborts it.
        let err = t2.commit().expect_err("write skew must be rejected");
        assert!(err.is_retryable());
    }

    #[cfg(feature = "serializable")]
    #[test]
    fn test_snapshot_txn_allows_write_skew() {
        let db = Db::new();
        seed_x_and_y(&db);

        // Identical schedule under plain snapshot isolation: the read set is
        // not validated, so both transactions commit.
        let mut t1 = db.begin();
        let mut t2 = db.begin();
        let _ = t1.get(b"x").unwrap();
        let _ = t1.get(b"y").unwrap();
        let _ = t2.get(b"x").unwrap();
        let _ = t2.get(b"y").unwrap();
        t1.put(b"x".to_vec(), vec![0]);
        t2.put(b"y".to_vec(), vec![0]);

        assert!(t1.commit().is_ok());
        assert!(t2.commit().is_ok());
    }
}