Skip to main content

txn_db/
txn.rs

//! Transactions and snapshots — the read and write handles a [`Db`](crate::Db)
//! hands out.
//!
//! A [`Transaction`] is the read-write unit of work. It takes a snapshot of the
//! database when it begins, serves every read from that snapshot (plus its own
//! uncommitted writes), buffers writes locally, and applies them atomically at
//! [`commit`](Transaction::commit) — or discards them on
//! [`rollback`](Transaction::rollback) or drop. Because reads come from a fixed
//! snapshot, a transaction never blocks writers and is never blocked by them.
//!
//! A [`Snapshot`] is the read-only counterpart: a consistent, point-in-time
//! view with no write buffer and nothing to commit. Use it when you only need
//! to read several keys as of one instant.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use crate::db::Inner;
19use crate::error::Result;
20use crate::store::{MemoryStore, VersionStore};
21use crate::timestamp::Timestamp;
22
23/// A read-write transaction over a consistent snapshot of the database.
24///
25/// A transaction is created by [`Db::begin`](crate::Db::begin). It reads as of
26/// the snapshot timestamp captured at that moment, so concurrent commits by
27/// other transactions are invisible to it — this is snapshot isolation. Writes
28/// are buffered in the transaction and become visible to others only when
29/// [`commit`](Transaction::commit) succeeds; within the transaction, a read of a
30/// key it has written returns that pending write (read-your-own-writes).
31///
32/// At commit the database checks every written key for a write-write conflict:
33/// if another transaction committed a change to any of those keys after this
34/// transaction's snapshot, the commit is rejected with a retryable
35/// [`TxnError::Conflict`](crate::TxnError::Conflict) and none of the writes are
36/// applied. This is what prevents lost updates.
37///
38/// Dropping a transaction without committing discards its buffered writes; it
39/// is equivalent to [`rollback`](Transaction::rollback).
40///
41/// # Examples
42///
43/// ```
44/// use txn_db::Db;
45///
46/// let db = Db::new();
47///
48/// let mut tx = db.begin();
49/// tx.put(b"account:1".to_vec(), 100u64.to_le_bytes().to_vec());
50/// tx.put(b"account:2".to_vec(), 50u64.to_le_bytes().to_vec());
51/// let commit_ts = tx.commit()?;
52///
53/// // A fresh transaction sees the committed state.
54/// let tx = db.begin();
55/// assert!(tx.get(b"account:1")?.is_some());
56/// assert!(commit_ts > txn_db::Timestamp::ZERO);
57/// # Ok::<(), txn_db::TxnError>(())
58/// ```
59#[must_use = "a transaction buffers writes that are discarded unless it is committed"]
60pub struct Transaction<S: VersionStore = MemoryStore> {
61    inner: Arc<Inner<S>>,
62    read_ts: Timestamp,
63    writes: HashMap<Arc<[u8]>, Option<Arc<[u8]>>>,
64}
65
66impl<S: VersionStore> Transaction<S> {
67    /// Construct a transaction over `inner` reading at `read_ts`.
68    pub(crate) fn new(inner: Arc<Inner<S>>, read_ts: Timestamp) -> Self {
69        Transaction {
70            inner,
71            read_ts,
72            writes: HashMap::new(),
73        }
74    }
75
76    /// The snapshot timestamp this transaction reads at.
77    ///
78    /// Every read that is not served from the transaction's own write buffer
79    /// observes the database as of this timestamp.
80    ///
81    /// # Examples
82    ///
83    /// ```
84    /// use txn_db::Db;
85    ///
86    /// let db = Db::new();
87    /// let tx = db.begin();
88    /// // Nothing has committed yet, so the snapshot is the empty database.
89    /// assert_eq!(tx.read_timestamp(), txn_db::Timestamp::ZERO);
90    /// ```
91    #[inline]
92    #[must_use]
93    pub fn read_timestamp(&self) -> Timestamp {
94        self.read_ts
95    }
96
97    /// Read the value of `key` as this transaction sees it.
98    ///
99    /// If the transaction has written `key`, the pending write is returned
100    /// (read-your-own-writes), including `None` if it has deleted the key.
101    /// Otherwise the value is read from the transaction's snapshot: the newest
102    /// version committed at or before the snapshot timestamp, or `None` if the
103    /// key does not exist as of the snapshot.
104    ///
105    /// # Errors
106    ///
107    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing
108    /// [`VersionStore`](crate::VersionStore) fails the read. The default
109    /// in-memory store never fails.
110    ///
111    /// # Examples
112    ///
113    /// ```
114    /// use txn_db::Db;
115    ///
116    /// let db = Db::new();
117    /// let mut tx = db.begin();
118    ///
119    /// assert_eq!(tx.get(b"k")?, None);          // absent
120    /// tx.put(b"k".to_vec(), b"v".to_vec());
121    /// assert_eq!(tx.get(b"k")?.as_deref(), Some(&b"v"[..])); // its own write
122    /// tx.delete(b"k".to_vec());
123    /// assert_eq!(tx.get(b"k")?, None);          // its own delete
124    /// # Ok::<(), txn_db::TxnError>(())
125    /// ```
126    pub fn get(&self, key: &[u8]) -> Result<Option<Arc<[u8]>>> {
127        if let Some(pending) = self.writes.get(key) {
128            return Ok(pending.clone());
129        }
130        self.inner.store.get(key, self.read_ts)
131    }
132
133    /// Buffer a write of `value` to `key`, to be applied at commit.
134    ///
135    /// The write is local to this transaction until [`commit`](Self::commit)
136    /// succeeds; other transactions do not see it. Writing the same key twice
137    /// keeps the last value. Both arguments accept anything convertible into an
138    /// `Arc<[u8]>` — passing an owned `Vec<u8>` or `Arc<[u8]>` moves it in
139    /// without copying the bytes.
140    ///
141    /// # Examples
142    ///
143    /// ```
144    /// use txn_db::Db;
145    ///
146    /// let db = Db::new();
147    /// let mut tx = db.begin();
148    /// tx.put(b"city".to_vec(), b"oslo".to_vec());
149    /// tx.put(b"city".to_vec(), b"bergen".to_vec()); // overwrites within the txn
150    /// assert_eq!(tx.get(b"city")?.as_deref(), Some(&b"bergen"[..]));
151    /// # Ok::<(), txn_db::TxnError>(())
152    /// ```
153    pub fn put(&mut self, key: impl Into<Arc<[u8]>>, value: impl Into<Arc<[u8]>>) {
154        let _ = self.writes.insert(key.into(), Some(value.into()));
155    }
156
157    /// Buffer a delete of `key`, to be applied at commit.
158    ///
159    /// After this call the transaction reads `key` as absent. At commit a
160    /// tombstone is written so that snapshots taken after the commit also see
161    /// the key as absent. Deleting a key that does not exist is a no-op that
162    /// still participates in conflict detection, so a delete races other
163    /// writers the same way a `put` does.
164    ///
165    /// # Examples
166    ///
167    /// ```
168    /// use txn_db::Db;
169    ///
170    /// let db = Db::new();
171    /// let mut setup = db.begin();
172    /// setup.put(b"k".to_vec(), b"v".to_vec());
173    /// setup.commit()?;
174    ///
175    /// let mut tx = db.begin();
176    /// tx.delete(b"k".to_vec());
177    /// tx.commit()?;
178    ///
179    /// assert_eq!(db.begin().get(b"k")?, None);
180    /// # Ok::<(), txn_db::TxnError>(())
181    /// ```
182    pub fn delete(&mut self, key: impl Into<Arc<[u8]>>) {
183        let _ = self.writes.insert(key.into(), None);
184    }
185
186    /// Commit the transaction, applying all buffered writes atomically.
187    ///
188    /// On success every buffered write becomes visible to transactions that
189    /// begin afterward, and the commit timestamp is returned. A transaction
190    /// that buffered no writes commits trivially and returns its snapshot
191    /// timestamp without allocating a new one.
192    ///
193    /// # Errors
194    ///
195    /// Returns [`TxnError::Conflict`](crate::TxnError::Conflict) — which is
196    /// retryable — if any written key was changed by another transaction that
197    /// committed after this one's snapshot; in that case no writes are applied.
198    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing store
199    /// fails to apply the batch.
200    ///
201    /// # Examples
202    ///
203    /// ```
204    /// use txn_db::Db;
205    ///
206    /// let db = Db::new();
207    /// let mut tx = db.begin();
208    /// tx.put(b"k".to_vec(), b"v".to_vec());
209    /// let ts = tx.commit()?;
210    /// assert!(ts > txn_db::Timestamp::ZERO);
211    /// # Ok::<(), txn_db::TxnError>(())
212    /// ```
213    pub fn commit(self) -> Result<Timestamp> {
214        if self.writes.is_empty() {
215            return Ok(self.read_ts);
216        }
217        self.inner.commit_writes(self.read_ts, self.writes)
218    }
219
220    /// Discard the transaction and all of its buffered writes.
221    ///
222    /// This is explicit; simply dropping the transaction has the same effect.
223    /// Rolling back never fails and never touches the shared store.
224    ///
225    /// # Examples
226    ///
227    /// ```
228    /// use txn_db::Db;
229    ///
230    /// let db = Db::new();
231    /// let mut tx = db.begin();
232    /// tx.put(b"k".to_vec(), b"v".to_vec());
233    /// tx.rollback();
234    ///
235    /// // The write never reached the database.
236    /// assert_eq!(db.begin().get(b"k")?, None);
237    /// # Ok::<(), txn_db::TxnError>(())
238    /// ```
239    #[inline]
240    pub fn rollback(self) {
241        // Dropping `self` releases the buffered writes; this method documents
242        // the intent and consumes the transaction so it cannot be used again.
243    }
244}
245
246/// A read-only, point-in-time view of the database.
247///
248/// A snapshot is created by [`Db::snapshot`](crate::Db::snapshot) and reads as
249/// of the moment it was taken. It has no write buffer and nothing to commit, so
250/// it is cheaper than a transaction when all you need is to read several keys at
251/// one consistent instant. Multiple snapshots and transactions coexist without
252/// blocking each other.
253///
254/// # Examples
255///
256/// ```
257/// use txn_db::Db;
258///
259/// let db = Db::new();
260/// let mut tx = db.begin();
261/// tx.put(b"k".to_vec(), b"v1".to_vec());
262/// tx.commit()?;
263///
264/// // Capture a snapshot, then change the database.
265/// let snap = db.snapshot();
266/// let mut tx = db.begin();
267/// tx.put(b"k".to_vec(), b"v2".to_vec());
268/// tx.commit()?;
269///
270/// // The snapshot still sees the value as of when it was taken.
271/// assert_eq!(snap.get(b"k")?.as_deref(), Some(&b"v1"[..]));
272/// assert_eq!(db.snapshot().get(b"k")?.as_deref(), Some(&b"v2"[..]));
273/// # Ok::<(), txn_db::TxnError>(())
274/// ```
275pub struct Snapshot<S: VersionStore = MemoryStore> {
276    inner: Arc<Inner<S>>,
277    read_ts: Timestamp,
278}
279
280impl<S: VersionStore> Snapshot<S> {
281    /// Construct a snapshot over `inner` reading at `read_ts`.
282    pub(crate) fn new(inner: Arc<Inner<S>>, read_ts: Timestamp) -> Self {
283        Snapshot { inner, read_ts }
284    }
285
286    /// The timestamp this snapshot reads at.
287    ///
288    /// # Examples
289    ///
290    /// ```
291    /// use txn_db::Db;
292    ///
293    /// let db = Db::new();
294    /// assert_eq!(db.snapshot().read_timestamp(), txn_db::Timestamp::ZERO);
295    /// ```
296    #[inline]
297    #[must_use]
298    pub fn read_timestamp(&self) -> Timestamp {
299        self.read_ts
300    }
301
302    /// Read the value of `key` as of this snapshot.
303    ///
304    /// Returns the newest version committed at or before the snapshot
305    /// timestamp, or `None` if the key does not exist as of that instant.
306    ///
307    /// # Errors
308    ///
309    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing store
310    /// fails the read. The default in-memory store never fails.
311    ///
312    /// # Examples
313    ///
314    /// ```
315    /// use txn_db::Db;
316    ///
317    /// let db = Db::new();
318    /// assert_eq!(db.snapshot().get(b"missing")?, None);
319    /// # Ok::<(), txn_db::TxnError>(())
320    /// ```
321    pub fn get(&self, key: &[u8]) -> Result<Option<Arc<[u8]>>> {
322        self.inner.store.get(key, self.read_ts)
323    }
324}