Skip to main content

txn_db/
db.rs

//! The database handle and the commit coordinator behind it.
//!
//! [`Db`] is the Tier-1 entry point: construct one, [`begin`](Db::begin)
//! transactions against it, [`commit`](crate::Transaction::commit) them. A `Db`
//! is a cheap, clonable handle to shared state — clone it freely and hand a
//! clone to every thread that needs to read or write.
//!
//! The shared state itself lives in [`Inner`], which owns the version store and
//! the small amount of coordination that snapshot isolation needs: a monotonic
//! timestamp counter and a commit serialization point. Keeping that logic in
//! one place is deliberate — commit ordering and conflict detection are the
//! crate's correctness core, and they are easier to reason about when they are
//! not scattered across the read and write handles.
14
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex, PoisonError};
17
18use crate::error::{Result, TxnError};
19use crate::store::{MemoryStore, VersionStore, WriteEntry};
20use crate::timestamp::Timestamp;
21use crate::txn::{Snapshot, Transaction};
22
23/// Shared, reference-counted state for one logical database.
24///
25/// A [`Db`] is a handle to an `Arc<Inner>`; every clone of the `Db`, every
26/// [`Transaction`], and every [`Snapshot`] holds a clone of the same `Inner`,
27/// so they all read and commit against one version store and one timestamp
28/// sequence.
29pub(crate) struct Inner<S: VersionStore> {
30    /// The backing version store. Reads go straight to it; commits apply to it.
31    pub(crate) store: S,
32    /// The next commit timestamp to hand out. Only ever advances.
33    next_ts: AtomicU64,
34    /// The highest timestamp whose writes are fully applied and visible. A new
35    /// transaction reads at this timestamp.
36    last_committed: AtomicU64,
37    /// Serializes the validate-then-apply commit critical section so two
38    /// commits cannot both pass conflict detection and then overwrite each
39    /// other. This single lock is the snapshot-isolation baseline; a sharded,
40    /// lock-free commit path is a later roadmap phase.
41    commit_lock: Mutex<()>,
42}
43
44impl<S: VersionStore> Inner<S> {
45    fn new(store: S) -> Self {
46        Inner {
47            store,
48            next_ts: AtomicU64::new(1),
49            last_committed: AtomicU64::new(Timestamp::ZERO.get()),
50            commit_lock: Mutex::new(()),
51        }
52    }
53
54    /// The timestamp a transaction beginning now should read at.
55    #[inline]
56    fn read_ts(&self) -> Timestamp {
57        Timestamp::from_raw(self.last_committed.load(Ordering::Acquire))
58    }
59
60    /// Validate and apply a transaction's writes under the commit lock.
61    ///
62    /// Holding `commit_lock` makes the sequence — check every written key for a
63    /// conflicting newer commit, allocate a commit timestamp, apply the batch,
64    /// publish the new high-water mark — atomic with respect to other commits.
65    pub(crate) fn commit_writes(
66        &self,
67        read_ts: Timestamp,
68        writes: std::collections::HashMap<Arc<[u8]>, Option<Arc<[u8]>>>,
69    ) -> Result<Timestamp> {
70        let _guard = self
71            .commit_lock
72            .lock()
73            .unwrap_or_else(PoisonError::into_inner);
74
75        // First-committer-wins: if any written key already has a version newer
76        // than this transaction's snapshot, another transaction beat it to the
77        // commit and this one must abort without applying anything.
78        for key in writes.keys() {
79            if let Some(latest) = self.store.latest_commit_ts(key)? {
80                if latest > read_ts {
81                    return Err(TxnError::conflict(key.len()));
82                }
83            }
84        }
85
86        let commit_ts = Timestamp::from_raw(self.next_ts.fetch_add(1, Ordering::Relaxed));
87        let batch: Vec<WriteEntry> = writes.into_iter().collect();
88        self.store.apply(commit_ts, batch)?;
89
90        // Publish only after the writes are applied, so any transaction that
91        // observes this timestamp also observes the data it stamps.
92        self.last_committed
93            .store(commit_ts.get(), Ordering::Release);
94        Ok(commit_ts)
95    }
96}
97
98/// A transactional, multi-version key-value database.
99///
100/// `Db` is the front door. [`Db::new`] gives you an in-memory database;
101/// [`Db::with_store`] builds one over any [`VersionStore`]. From there the whole
102/// common case is [`begin`](Db::begin) / [`get`](crate::Transaction::get) /
103/// [`put`](crate::Transaction::put) / [`commit`](crate::Transaction::commit),
104/// with [`snapshot`](Db::snapshot) for read-only point-in-time views.
105///
106/// A `Db` is a clonable handle over shared state, like an [`Arc`]. Cloning it
107/// is cheap and every clone refers to the same database, so the idiomatic way
108/// to use it across threads is to clone a handle per thread.
109///
110/// # Examples
111///
112/// The four-call common case:
113///
114/// ```
115/// use txn_db::Db;
116///
117/// let db = Db::new();
118///
119/// let mut tx = db.begin();
120/// tx.put(b"greeting".to_vec(), b"hei".to_vec());
121/// tx.commit()?;
122///
123/// let tx = db.begin();
124/// assert_eq!(tx.get(b"greeting")?.as_deref(), Some(&b"hei"[..]));
125/// # Ok::<(), txn_db::TxnError>(())
126/// ```
127///
128/// Sharing one database across threads:
129///
130/// ```
131/// use std::thread;
132/// use txn_db::Db;
133///
134/// let db = Db::new();
135/// let handles: Vec<_> = (0..4u8)
136///     .map(|i| {
137///         let db = db.clone();
138///         thread::spawn(move || {
139///             let mut tx = db.begin();
140///             tx.put(vec![i], vec![i]);
141///             // Independent keys never conflict.
142///             tx.commit().expect("commit");
143///         })
144///     })
145///     .collect();
146/// for h in handles {
147///     h.join().expect("thread");
148/// }
149/// # Ok::<(), txn_db::TxnError>(())
150/// ```
151pub struct Db<S: VersionStore = MemoryStore> {
152    inner: Arc<Inner<S>>,
153}
154
155impl Db<MemoryStore> {
156    /// Create an empty in-memory database.
157    ///
158    /// This is the default configuration: a [`MemoryStore`] backing store, ready
159    /// for [`begin`](Db::begin).
160    ///
161    /// # Examples
162    ///
163    /// ```
164    /// use txn_db::Db;
165    ///
166    /// let db = Db::new();
167    /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
168    /// ```
169    #[must_use]
170    pub fn new() -> Self {
171        Db::with_store(MemoryStore::new())
172    }
173}
174
175impl Default for Db<MemoryStore> {
176    fn default() -> Self {
177        Db::new()
178    }
179}
180
181impl<S: VersionStore> Db<S> {
182    /// Create a database over a custom [`VersionStore`].
183    ///
184    /// This is the Tier-3 seam: supply any backing store and the transaction
185    /// semantics — snapshot isolation, read-your-own-writes, write-write
186    /// conflict detection — compose on top of it unchanged.
187    ///
188    /// # Examples
189    ///
190    /// ```
191    /// use txn_db::{Db, MemoryStore};
192    ///
193    /// let db = Db::with_store(MemoryStore::new());
194    /// let mut tx = db.begin();
195    /// tx.put(b"k".to_vec(), b"v".to_vec());
196    /// tx.commit()?;
197    /// # Ok::<(), txn_db::TxnError>(())
198    /// ```
199    #[must_use]
200    pub fn with_store(store: S) -> Self {
201        Db {
202            inner: Arc::new(Inner::new(store)),
203        }
204    }
205
206    /// Begin a read-write transaction over the current state of the database.
207    ///
208    /// The transaction takes its snapshot at this moment: it reads as of the
209    /// most recent commit and is unaffected by commits that happen afterward.
210    ///
211    /// # Examples
212    ///
213    /// ```
214    /// use txn_db::Db;
215    ///
216    /// let db = Db::new();
217    /// let mut tx = db.begin();
218    /// tx.put(b"k".to_vec(), b"v".to_vec());
219    /// tx.commit()?;
220    /// # Ok::<(), txn_db::TxnError>(())
221    /// ```
222    pub fn begin(&self) -> Transaction<S> {
223        Transaction::new(Arc::clone(&self.inner), self.inner.read_ts())
224    }
225
226    /// Take a read-only snapshot of the current state of the database.
227    ///
228    /// The returned [`Snapshot`] reads as of this instant and never changes,
229    /// even as other transactions commit. Use it to read several keys at one
230    /// consistent point in time without the overhead of a transaction.
231    ///
232    /// # Examples
233    ///
234    /// ```
235    /// use txn_db::Db;
236    ///
237    /// let db = Db::new();
238    /// let snap = db.snapshot();
239    /// assert_eq!(snap.get(b"k")?, None);
240    /// # Ok::<(), txn_db::TxnError>(())
241    /// ```
242    #[must_use]
243    pub fn snapshot(&self) -> Snapshot<S> {
244        Snapshot::new(Arc::clone(&self.inner), self.inner.read_ts())
245    }
246
247    /// The timestamp of the most recent successful commit.
248    ///
249    /// Returns [`Timestamp::ZERO`] for a database that has never been written.
250    /// This is the timestamp a transaction beginning now would read at.
251    ///
252    /// # Examples
253    ///
254    /// ```
255    /// use txn_db::Db;
256    ///
257    /// let db = Db::new();
258    /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
259    ///
260    /// let mut tx = db.begin();
261    /// tx.put(b"k".to_vec(), b"v".to_vec());
262    /// let ts = tx.commit()?;
263    /// assert_eq!(db.last_committed(), ts);
264    /// # Ok::<(), txn_db::TxnError>(())
265    /// ```
266    #[must_use]
267    pub fn last_committed(&self) -> Timestamp {
268        self.inner.read_ts()
269    }
270}
271
272impl<S: VersionStore> Clone for Db<S> {
273    /// Clone the handle, not the data: the clone shares the same underlying
274    /// database.
275    fn clone(&self) -> Self {
276        Db {
277            inner: Arc::clone(&self.inner),
278        }
279    }
280}
281
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Write `key -> value` in a transaction of its own, commit it, and hand
    /// back the commit timestamp.
    fn commit_one(db: &Db, key: &[u8], value: &[u8]) -> Timestamp {
        let mut tx = db.begin();
        tx.put(key.to_vec(), value.to_vec());
        tx.commit().unwrap()
    }

    #[test]
    fn test_new_database_is_empty_at_zero() {
        let db = Db::new();
        assert_eq!(db.last_committed(), Timestamp::ZERO);

        let found = db.begin().get(b"k").unwrap();
        assert_eq!(found, None);
    }

    #[test]
    fn test_commit_makes_writes_visible_to_later_transactions() {
        let db = Db::new();
        let committed_at = commit_one(&db, b"k", b"v");
        assert!(committed_at > Timestamp::ZERO);

        let found = db.begin().get(b"k").unwrap();
        assert_eq!(found.as_deref(), Some(&b"v"[..]));
    }

    #[test]
    fn test_snapshot_is_isolated_from_later_commits() {
        let db = Db::new();
        let _first = commit_one(&db, b"k", b"v1");

        // The snapshot is pinned between the two commits.
        let pinned = db.snapshot();
        let _second = commit_one(&db, b"k", b"v2");

        let found = pinned.get(b"k").unwrap();
        assert_eq!(found.as_deref(), Some(&b"v1"[..]));
    }

    #[test]
    fn test_write_write_conflict_aborts_later_committer() {
        let db = Db::new();
        // Both transactions start from the same empty snapshot.
        let mut winner = db.begin();
        let mut loser = db.begin();
        winner.put(b"k".to_vec(), b"a".to_vec());
        loser.put(b"k".to_vec(), b"b".to_vec());

        assert!(winner.commit().is_ok());
        let failure = loser.commit().expect_err("second committer must lose");
        assert!(failure.is_retryable());

        // The value written by the first committer is the one that remains.
        let found = db.begin().get(b"k").unwrap();
        assert_eq!(found.as_deref(), Some(&b"a"[..]));
    }

    #[test]
    fn test_disjoint_keys_do_not_conflict() {
        let db = Db::new();
        let mut left = db.begin();
        let mut right = db.begin();
        left.put(b"a".to_vec(), b"1".to_vec());
        right.put(b"b".to_vec(), b"2".to_vec());

        assert!(left.commit().is_ok());
        assert!(right.commit().is_ok());
    }

    #[test]
    fn test_read_only_commit_returns_snapshot_timestamp() {
        let db = Db::new();
        let written_at = commit_one(&db, b"k", b"v");

        // A transaction with no writes commits at its own snapshot timestamp.
        let read_only = db.begin();
        let read_only_ts = read_only.commit().unwrap();
        assert_eq!(read_only_ts, written_at);
    }

    #[test]
    fn test_rollback_discards_writes() {
        let db = Db::new();
        let mut abandoned = db.begin();
        abandoned.put(b"k".to_vec(), b"v".to_vec());
        abandoned.rollback();

        let found = db.begin().get(b"k").unwrap();
        assert_eq!(found, None);
    }

    #[test]
    fn test_clone_shares_state() {
        let db = Db::new();
        let other_handle = db.clone();
        let _committed_at = commit_one(&db, b"k", b"v");

        let found = other_handle.begin().get(b"k").unwrap();
        assert_eq!(found.as_deref(), Some(&b"v"[..]));
    }
}