Skip to main content

txn_db/
db.rs

//! The database handle and the commit coordinator behind it.
//!
//! [`Db`] is the Tier-1 entry point: construct one, [`begin`](Db::begin)
//! transactions against it, [`commit`](crate::Transaction::commit) them. A `Db`
//! is a cheap, clonable handle to shared state — clone it freely and hand a
//! clone to every thread that needs to read or write.
//!
//! The shared state itself lives in [`Inner`], which owns the version store and
//! the [`Oracle`](crate::oracle::Oracle) that allocates timestamps and tracks
//! the read watermark. Commit coordination is split deliberately: the oracle
//! hands out timestamps lock-free, and the version store is the serialization
//! point that validates and applies each commit atomically. The single global
//! commit lock of the foundation release is gone.
14
15use std::sync::Arc;
16
17use crate::error::Result;
18use crate::oracle::Oracle;
19use crate::store::{MemoryStore, VersionStore, WriteEntry};
20use crate::timestamp::Timestamp;
21use crate::txn::{Snapshot, Transaction};
22
23/// Shared, reference-counted state for one logical database.
24///
25/// A [`Db`] is a handle to an `Arc<Inner>`; every clone of the `Db`, every
26/// [`Transaction`], and every [`Snapshot`] holds a clone of the same `Inner`,
27/// so they all read and commit against one version store and one timestamp
28/// sequence.
29pub(crate) struct Inner<S: VersionStore> {
30    /// The backing version store. Reads go to it; commits validate and apply
31    /// through it.
32    pub(crate) store: S,
33    /// Allocates timestamps and tracks the consistent-read watermark.
34    oracle: Oracle,
35}
36
37impl<S: VersionStore> Inner<S> {
38    fn new(store: S) -> Self {
39        Inner {
40            store,
41            oracle: Oracle::new(),
42        }
43    }
44
45    /// The timestamp a transaction beginning now should read at.
46    #[inline]
47    fn read_ts(&self) -> Timestamp {
48        self.oracle.read_ts()
49    }
50
51    /// Allocate a commit timestamp, validate-and-apply through the store, then
52    /// release the timestamp into the watermark.
53    ///
54    /// The timestamp is reported to the oracle on both outcomes — a successful
55    /// commit and a rejected one — so a conflict never stalls the read watermark
56    /// behind the timestamp it consumed.
57    pub(crate) fn commit_writes(
58        &self,
59        read_ts: Timestamp,
60        writes: Vec<WriteEntry>,
61        reads: &[Arc<[u8]>],
62    ) -> Result<Timestamp> {
63        let commit_ts = self.oracle.alloc_commit_ts();
64        let outcome = self.store.try_commit(read_ts, commit_ts, writes, reads);
65        self.oracle.commit_done(commit_ts);
66        outcome.map(|()| commit_ts)
67    }
68}
69
70/// A transactional, multi-version key-value database.
71///
72/// `Db` is the front door. [`Db::new`] gives you an in-memory database;
73/// [`Db::with_store`] builds one over any [`VersionStore`]. From there the whole
74/// common case is [`begin`](Db::begin) / [`get`](crate::Transaction::get) /
75/// [`put`](crate::Transaction::put) / [`commit`](crate::Transaction::commit),
76/// with [`snapshot`](Db::snapshot) for read-only point-in-time views.
77///
78/// Transactions default to **snapshot isolation**. With the `serializable`
79/// feature enabled, [`begin_serializable`](Db::begin_serializable) starts a
80/// transaction whose read set is validated at commit, rejecting write skew and
81/// the other anomalies snapshot isolation permits.
82///
83/// A `Db` is a clonable handle over shared state, like an [`Arc`]. Cloning it
84/// is cheap and every clone refers to the same database, so the idiomatic way
85/// to use it across threads is to clone a handle per thread.
86///
87/// # Examples
88///
89/// The four-call common case:
90///
91/// ```
92/// use txn_db::Db;
93///
94/// let db = Db::new();
95///
96/// let mut tx = db.begin();
97/// tx.put(b"greeting".to_vec(), b"hei".to_vec());
98/// tx.commit()?;
99///
100/// let tx = db.begin();
101/// assert_eq!(tx.get(b"greeting")?.as_deref(), Some(&b"hei"[..]));
102/// # Ok::<(), txn_db::TxnError>(())
103/// ```
104///
105/// Sharing one database across threads:
106///
107/// ```
108/// use std::thread;
109/// use txn_db::Db;
110///
111/// let db = Db::new();
112/// let handles: Vec<_> = (0..4u8)
113///     .map(|i| {
114///         let db = db.clone();
115///         thread::spawn(move || {
116///             let mut tx = db.begin();
117///             tx.put(vec![i], vec![i]);
118///             // Independent keys never conflict.
119///             tx.commit().expect("commit");
120///         })
121///     })
122///     .collect();
123/// for h in handles {
124///     h.join().expect("thread");
125/// }
126/// # Ok::<(), txn_db::TxnError>(())
127/// ```
128pub struct Db<S: VersionStore = MemoryStore> {
129    inner: Arc<Inner<S>>,
130}
131
132impl Db<MemoryStore> {
133    /// Create an empty in-memory database.
134    ///
135    /// This is the default configuration: a [`MemoryStore`] backing store, ready
136    /// for [`begin`](Db::begin).
137    ///
138    /// # Examples
139    ///
140    /// ```
141    /// use txn_db::Db;
142    ///
143    /// let db = Db::new();
144    /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
145    /// ```
146    #[must_use]
147    pub fn new() -> Self {
148        Db::with_store(MemoryStore::new())
149    }
150}
151
152impl Default for Db<MemoryStore> {
153    fn default() -> Self {
154        Db::new()
155    }
156}
157
158impl<S: VersionStore> Db<S> {
159    /// Create a database over a custom [`VersionStore`].
160    ///
161    /// This is the Tier-3 seam: supply any backing store and the transaction
162    /// semantics — snapshot isolation, read-your-own-writes, conflict detection
163    /// — compose on top of it unchanged.
164    ///
165    /// # Examples
166    ///
167    /// ```
168    /// use txn_db::{Db, MemoryStore};
169    ///
170    /// let db = Db::with_store(MemoryStore::new());
171    /// let mut tx = db.begin();
172    /// tx.put(b"k".to_vec(), b"v".to_vec());
173    /// tx.commit()?;
174    /// # Ok::<(), txn_db::TxnError>(())
175    /// ```
176    #[must_use]
177    pub fn with_store(store: S) -> Self {
178        Db {
179            inner: Arc::new(Inner::new(store)),
180        }
181    }
182
183    /// Begin a snapshot-isolation transaction over the current state.
184    ///
185    /// The transaction takes its snapshot at this moment: it reads as of the
186    /// most recent commit and is unaffected by commits that happen afterward.
187    /// Its writes are checked for write-write conflicts at commit, but its reads
188    /// are not validated — use [`begin_serializable`](Db::begin_serializable)
189    /// when you need serializability.
190    ///
191    /// # Examples
192    ///
193    /// ```
194    /// use txn_db::Db;
195    ///
196    /// let db = Db::new();
197    /// let mut tx = db.begin();
198    /// tx.put(b"k".to_vec(), b"v".to_vec());
199    /// tx.commit()?;
200    /// # Ok::<(), txn_db::TxnError>(())
201    /// ```
202    pub fn begin(&self) -> Transaction<S> {
203        Transaction::new(Arc::clone(&self.inner), self.inner.read_ts(), false)
204    }
205
206    /// Begin a serializable transaction over the current state.
207    ///
208    /// A serializable transaction tracks every key it reads and, at commit,
209    /// validates that none of them changed since its snapshot — in addition to
210    /// the write-write check every transaction gets. That read-set validation is
211    /// what rejects write skew and the read-only anomaly that plain snapshot
212    /// isolation permits, giving serializable behavior for the transactions that
213    /// commit writes. A serializable transaction that writes nothing commits
214    /// trivially, exactly like a read-only snapshot.
215    ///
216    /// Available with the `serializable` feature. Snapshot isolation remains the
217    /// default and is unaffected.
218    ///
219    /// # Examples
220    ///
221    /// ```
222    /// # #[cfg(feature = "serializable")]
223    /// # {
224    /// use txn_db::Db;
225    ///
226    /// let db = Db::new();
227    /// // Seed two rows that an invariant ties together.
228    /// let mut tx = db.begin();
229    /// tx.put(b"on_call:alice".to_vec(), vec![1]);
230    /// tx.put(b"on_call:bob".to_vec(), vec![1]);
231    /// tx.commit()?;
232    ///
233    /// // A serializable transaction validates the rows it read at commit.
234    /// let mut tx = db.begin_serializable();
235    /// let _alice = tx.get(b"on_call:alice")?;
236    /// let _bob = tx.get(b"on_call:bob")?;
237    /// tx.put(b"on_call:alice".to_vec(), vec![0]);
238    /// tx.commit()?;
239    /// # }
240    /// # Ok::<(), txn_db::TxnError>(())
241    /// ```
242    #[cfg(feature = "serializable")]
243    #[cfg_attr(docsrs, doc(cfg(feature = "serializable")))]
244    pub fn begin_serializable(&self) -> Transaction<S> {
245        Transaction::new(Arc::clone(&self.inner), self.inner.read_ts(), true)
246    }
247
248    /// Take a read-only snapshot of the current state of the database.
249    ///
250    /// The returned [`Snapshot`] reads as of this instant and never changes,
251    /// even as other transactions commit. Use it to read several keys at one
252    /// consistent point in time without the overhead of a transaction.
253    ///
254    /// # Examples
255    ///
256    /// ```
257    /// use txn_db::Db;
258    ///
259    /// let db = Db::new();
260    /// let snap = db.snapshot();
261    /// assert_eq!(snap.get(b"k")?, None);
262    /// # Ok::<(), txn_db::TxnError>(())
263    /// ```
264    pub fn snapshot(&self) -> Snapshot<S> {
265        Snapshot::new(Arc::clone(&self.inner), self.inner.read_ts())
266    }
267
268    /// The timestamp of the most recent commit visible to a new transaction.
269    ///
270    /// Returns [`Timestamp::ZERO`] for a database that has never been written.
271    /// This is the read watermark: the timestamp a transaction beginning now
272    /// would read at.
273    ///
274    /// # Examples
275    ///
276    /// ```
277    /// use txn_db::Db;
278    ///
279    /// let db = Db::new();
280    /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
281    ///
282    /// let mut tx = db.begin();
283    /// tx.put(b"k".to_vec(), b"v".to_vec());
284    /// let ts = tx.commit()?;
285    /// assert_eq!(db.last_committed(), ts);
286    /// # Ok::<(), txn_db::TxnError>(())
287    /// ```
288    #[must_use]
289    pub fn last_committed(&self) -> Timestamp {
290        self.inner.read_ts()
291    }
292}
293
294impl<S: VersionStore> Clone for Db<S> {
295    /// Clone the handle, not the data: the clone shares the same underlying
296    /// database.
297    fn clone(&self) -> Self {
298        Db {
299            inner: Arc::clone(&self.inner),
300        }
301    }
302}
303
#[cfg(all(test, not(loom)))]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Commit a single `key -> value` write in a transaction of its own and
    /// return the commit timestamp.
    fn commit_put(db: &Db, key: &[u8], value: &[u8]) -> Timestamp {
        let mut tx = db.begin();
        tx.put(key.to_vec(), value.to_vec());
        tx.commit().unwrap()
    }

    /// Seed rows `x` and `y`, both holding `[1]`, in one transaction.
    #[cfg(feature = "serializable")]
    fn seed_x_and_y(db: &Db) {
        let mut seed = db.begin();
        seed.put(b"x".to_vec(), vec![1]);
        seed.put(b"y".to_vec(), vec![1]);
        let _ = seed.commit().unwrap();
    }

    #[test]
    fn test_new_database_is_empty_at_zero() {
        let db = Db::new();

        assert_eq!(db.last_committed(), Timestamp::ZERO);
        assert_eq!(db.begin().get(b"k").unwrap(), None);
    }

    #[test]
    fn test_commit_makes_writes_visible_to_later_transactions() {
        let db = Db::new();

        let committed_at = commit_put(&db, b"k", b"v");

        assert!(committed_at > Timestamp::ZERO);
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    #[test]
    fn test_snapshot_is_isolated_from_later_commits() {
        let db = Db::new();
        let _ = commit_put(&db, b"k", b"v1");

        // Freeze the view, then overwrite the key behind its back.
        let frozen = db.snapshot();
        let _ = commit_put(&db, b"k", b"v2");

        assert_eq!(frozen.get(b"k").unwrap().as_deref(), Some(&b"v1"[..]));
    }

    #[test]
    fn test_write_write_conflict_aborts_later_committer() {
        let db = Db::new();
        // Both transactions share a snapshot and target the same key.
        let mut first = db.begin();
        let mut second = db.begin();
        first.put(b"k".to_vec(), b"a".to_vec());
        second.put(b"k".to_vec(), b"b".to_vec());

        assert!(first.commit().is_ok());
        let err = second.commit().expect_err("second committer must lose");
        assert!(err.is_retryable());
        // The winner's value is the one that survives.
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"a"[..]));
    }

    #[test]
    fn test_disjoint_keys_do_not_conflict() {
        let db = Db::new();
        let mut left = db.begin();
        let mut right = db.begin();
        left.put(b"a".to_vec(), b"1".to_vec());
        right.put(b"b".to_vec(), b"2".to_vec());

        assert!(left.commit().is_ok());
        assert!(right.commit().is_ok());
    }

    #[test]
    fn test_read_only_commit_returns_snapshot_timestamp() {
        let db = Db::new();
        let write_ts = commit_put(&db, b"k", b"v");

        let read_only = db.begin();

        assert_eq!(read_only.commit().unwrap(), write_ts);
    }

    #[test]
    fn test_rollback_discards_writes() {
        let db = Db::new();

        let mut abandoned = db.begin();
        abandoned.put(b"k".to_vec(), b"v".to_vec());
        abandoned.rollback();

        assert_eq!(db.begin().get(b"k").unwrap(), None);
    }

    #[test]
    fn test_clone_shares_state() {
        let db = Db::new();
        let twin = db.clone();

        // Write through one handle, read through the other.
        let _ = commit_put(&db, b"k", b"v");

        assert_eq!(twin.begin().get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    #[cfg(feature = "serializable")]
    #[test]
    fn test_serializable_rejects_write_skew() {
        let db = Db::new();
        seed_x_and_y(&db);

        // Two serializable transactions from the same snapshot each read both
        // rows, then each writes the row the other one read.
        let mut t1 = db.begin_serializable();
        let mut t2 = db.begin_serializable();
        for tx in [&t1, &t2] {
            for key in [b"x", b"y"] {
                let _ = tx.get(key).unwrap();
            }
        }
        t1.put(b"x".to_vec(), vec![0]);
        t2.put(b"y".to_vec(), vec![0]);

        assert!(t1.commit().is_ok());
        // t2 read x, which t1 changed -> serializable validation aborts it.
        let err = t2.commit().expect_err("write skew must be rejected");
        assert!(err.is_retryable());
    }

    #[cfg(feature = "serializable")]
    #[test]
    fn test_snapshot_txn_allows_write_skew() {
        let db = Db::new();
        seed_x_and_y(&db);

        // Identical schedule, but under plain snapshot isolation: the read set
        // is not validated, so both transactions commit.
        let mut t1 = db.begin();
        let mut t2 = db.begin();
        for tx in [&t1, &t2] {
            for key in [b"x", b"y"] {
                let _ = tx.get(key).unwrap();
            }
        }
        t1.put(b"x".to_vec(), vec![0]);
        t2.put(b"y".to_vec(), vec![0]);

        assert!(t1.commit().is_ok());
        assert!(t2.commit().is_ok());
    }
}