Skip to main content

txn_db/
txn.rs

1//! Transactions and snapshots — the read and write handles a [`Db`](crate::Db)
2//! hands out.
3//!
4//! A [`Transaction`] is the read-write unit of work. It takes a snapshot of the
5//! database when it begins, serves every read from that snapshot (plus its own
6//! uncommitted writes), buffers writes locally, and applies them atomically at
7//! [`commit`](Transaction::commit) — or discards them on
8//! [`rollback`](Transaction::rollback) or drop. Because reads come from a fixed
9//! snapshot, a transaction never blocks writers and is never blocked by them.
10//!
//! Transactions run under snapshot isolation by default. A serializable
//! transaction (from [`Db::begin_serializable`](crate::Db::begin_serializable),
//! behind the `serializable` feature) additionally records every key it reads
//! from its snapshot. When it commits writes, the commit is rejected with a
//! retryable conflict if any of those keys has changed since its snapshot.
//! That read-set tracking and validation is the only difference;
//! snapshot-isolation transactions keep no read set and pay none of its cost.
16//!
17//! A [`Snapshot`] is the read-only counterpart: a consistent, point-in-time
18//! view with no write buffer and nothing to commit.
19
20use std::cell::RefCell;
21use std::collections::{HashMap, HashSet};
22use std::sync::Arc;
23
24use crate::db::Inner;
25use crate::error::Result;
26use crate::store::{MemoryStore, VersionStore};
27use crate::timestamp::Timestamp;
28
29/// A read-write transaction over a consistent snapshot of the database.
30///
31/// A transaction is created by [`Db::begin`](crate::Db::begin) (snapshot
32/// isolation) or `Db::begin_serializable` (serializable, with the
33/// `serializable` feature). It reads as of the snapshot timestamp captured at
34/// that moment, so concurrent commits by other transactions are invisible to it.
35/// Writes are buffered in the transaction and become visible to others only when
36/// [`commit`](Transaction::commit) succeeds; within the transaction, a read of a
37/// key it has written returns that pending write (read-your-own-writes).
38///
39/// At commit the database checks every written key for a write-write conflict:
40/// if another transaction committed a change to any of those keys after this
41/// transaction's snapshot, the commit is rejected with a retryable
42/// [`TxnError::Conflict`](crate::TxnError::Conflict) and none of the writes are
43/// applied. A serializable transaction also validates its read set, rejecting
44/// commits whose reads are no longer current.
45///
46/// Dropping a transaction without committing discards its buffered writes; it
47/// is equivalent to [`rollback`](Transaction::rollback).
48///
49/// # Examples
50///
51/// ```
52/// use txn_db::Db;
53///
54/// let db = Db::new();
55///
56/// let mut tx = db.begin();
57/// tx.put(b"account:1".to_vec(), 100u64.to_le_bytes().to_vec());
58/// tx.put(b"account:2".to_vec(), 50u64.to_le_bytes().to_vec());
59/// let commit_ts = tx.commit()?;
60///
61/// // A fresh transaction sees the committed state.
62/// let tx = db.begin();
63/// assert!(tx.get(b"account:1")?.is_some());
64/// assert!(commit_ts > txn_db::Timestamp::ZERO);
65/// # Ok::<(), txn_db::TxnError>(())
66/// ```
67#[must_use = "a transaction buffers writes that are discarded unless it is committed"]
68pub struct Transaction<S: VersionStore = MemoryStore> {
69    inner: Arc<Inner<S>>,
70    read_ts: Timestamp,
71    writes: HashMap<Arc<[u8]>, Option<Arc<[u8]>>>,
72    /// The set of keys read from the snapshot, tracked only for serializable
73    /// transactions. `None` under snapshot isolation, where reads are not
74    /// validated. Interior mutability lets [`get`](Self::get) record reads
75    /// through a shared reference.
76    reads: Option<RefCell<HashSet<Arc<[u8]>>>>,
77}
78
79impl<S: VersionStore> Transaction<S> {
80    /// Construct a transaction over `inner` reading at `read_ts`. When
81    /// `serializable` is set the transaction records its read set for validation
82    /// at commit.
83    pub(crate) fn new(inner: Arc<Inner<S>>, read_ts: Timestamp, serializable: bool) -> Self {
84        Transaction {
85            inner,
86            read_ts,
87            writes: HashMap::new(),
88            reads: serializable.then(|| RefCell::new(HashSet::new())),
89        }
90    }
91
92    /// The snapshot timestamp this transaction reads at.
93    ///
94    /// Every read that is not served from the transaction's own write buffer
95    /// observes the database as of this timestamp.
96    ///
97    /// # Examples
98    ///
99    /// ```
100    /// use txn_db::Db;
101    ///
102    /// let db = Db::new();
103    /// let tx = db.begin();
104    /// // Nothing has committed yet, so the snapshot is the empty database.
105    /// assert_eq!(tx.read_timestamp(), txn_db::Timestamp::ZERO);
106    /// ```
107    #[inline]
108    #[must_use]
109    pub fn read_timestamp(&self) -> Timestamp {
110        self.read_ts
111    }
112
113    /// Read the value of `key` as this transaction sees it.
114    ///
115    /// If the transaction has written `key`, the pending write is returned
116    /// (read-your-own-writes), including `None` if it has deleted the key.
117    /// Otherwise the value is read from the transaction's snapshot: the newest
118    /// version committed at or before the snapshot timestamp, or `None` if the
119    /// key does not exist as of the snapshot. For a serializable transaction the
120    /// key is recorded in the read set for validation at commit.
121    ///
122    /// # Errors
123    ///
124    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing
125    /// [`VersionStore`](crate::VersionStore) fails the read. The default
126    /// in-memory store never fails.
127    ///
128    /// # Examples
129    ///
130    /// ```
131    /// use txn_db::Db;
132    ///
133    /// let db = Db::new();
134    /// let mut tx = db.begin();
135    ///
136    /// assert_eq!(tx.get(b"k")?, None);          // absent
137    /// tx.put(b"k".to_vec(), b"v".to_vec());
138    /// assert_eq!(tx.get(b"k")?.as_deref(), Some(&b"v"[..])); // its own write
139    /// tx.delete(b"k".to_vec());
140    /// assert_eq!(tx.get(b"k")?, None);          // its own delete
141    /// # Ok::<(), txn_db::TxnError>(())
142    /// ```
143    pub fn get(&self, key: &[u8]) -> Result<Option<Arc<[u8]>>> {
144        if let Some(pending) = self.writes.get(key) {
145            return Ok(pending.clone());
146        }
147        let value = self.inner.store.get(key, self.read_ts)?;
148        // A serializable transaction records the key — present or absent — so a
149        // later writer to it is caught at commit.
150        if let Some(reads) = &self.reads {
151            let _ = reads.borrow_mut().insert(Arc::from(key));
152        }
153        Ok(value)
154    }
155
156    /// Buffer a write of `value` to `key`, to be applied at commit.
157    ///
158    /// The write is local to this transaction until [`commit`](Self::commit)
159    /// succeeds; other transactions do not see it. Writing the same key twice
160    /// keeps the last value. Both arguments accept anything convertible into an
161    /// `Arc<[u8]>` — passing an owned `Vec<u8>` or `Arc<[u8]>` moves it in
162    /// without copying the bytes.
163    ///
164    /// # Examples
165    ///
166    /// ```
167    /// use txn_db::Db;
168    ///
169    /// let db = Db::new();
170    /// let mut tx = db.begin();
171    /// tx.put(b"city".to_vec(), b"oslo".to_vec());
172    /// tx.put(b"city".to_vec(), b"bergen".to_vec()); // overwrites within the txn
173    /// assert_eq!(tx.get(b"city")?.as_deref(), Some(&b"bergen"[..]));
174    /// # Ok::<(), txn_db::TxnError>(())
175    /// ```
176    pub fn put(&mut self, key: impl Into<Arc<[u8]>>, value: impl Into<Arc<[u8]>>) {
177        let _ = self.writes.insert(key.into(), Some(value.into()));
178    }
179
180    /// Buffer a delete of `key`, to be applied at commit.
181    ///
182    /// After this call the transaction reads `key` as absent. At commit a
183    /// tombstone is written so that snapshots taken after the commit also see
184    /// the key as absent. Deleting a key that does not exist is a no-op that
185    /// still participates in conflict detection, so a delete races other
186    /// writers the same way a `put` does.
187    ///
188    /// # Examples
189    ///
190    /// ```
191    /// use txn_db::Db;
192    ///
193    /// let db = Db::new();
194    /// let mut setup = db.begin();
195    /// setup.put(b"k".to_vec(), b"v".to_vec());
196    /// setup.commit()?;
197    ///
198    /// let mut tx = db.begin();
199    /// tx.delete(b"k".to_vec());
200    /// tx.commit()?;
201    ///
202    /// assert_eq!(db.begin().get(b"k")?, None);
203    /// # Ok::<(), txn_db::TxnError>(())
204    /// ```
205    pub fn delete(&mut self, key: impl Into<Arc<[u8]>>) {
206        let _ = self.writes.insert(key.into(), None);
207    }
208
209    /// Commit the transaction, applying all buffered writes atomically.
210    ///
211    /// On success every buffered write becomes visible to transactions that
212    /// begin afterward, and the commit timestamp is returned. A transaction
213    /// that buffered no writes commits trivially and returns its snapshot
214    /// timestamp without allocating a new one — including a serializable
215    /// read-only transaction, which has observed a consistent snapshot and needs
216    /// no validation.
217    ///
218    /// # Errors
219    ///
220    /// Returns [`TxnError::Conflict`](crate::TxnError::Conflict) — which is
221    /// retryable — if any written key was changed by another transaction that
222    /// committed after this one's snapshot, or, for a serializable transaction,
223    /// if any key it read has since changed. In either case no writes are
224    /// applied. Returns [`TxnError::Store`](crate::TxnError::Store) if the
225    /// backing store fails to apply the batch.
226    ///
227    /// # Examples
228    ///
229    /// ```
230    /// use txn_db::Db;
231    ///
232    /// let db = Db::new();
233    /// let mut tx = db.begin();
234    /// tx.put(b"k".to_vec(), b"v".to_vec());
235    /// let ts = tx.commit()?;
236    /// assert!(ts > txn_db::Timestamp::ZERO);
237    /// # Ok::<(), txn_db::TxnError>(())
238    /// ```
239    pub fn commit(self) -> Result<Timestamp> {
240        if self.writes.is_empty() {
241            return Ok(self.read_ts);
242        }
243        // The read set, minus keys also in the write set (those are covered by
244        // the write-write check). Empty for snapshot-isolation transactions.
245        let reads: Vec<Arc<[u8]>> = match self.reads {
246            Some(set) => set
247                .into_inner()
248                .into_iter()
249                .filter(|key| !self.writes.contains_key(key))
250                .collect(),
251            None => Vec::new(),
252        };
253        let batch = self.writes.into_iter().collect();
254        self.inner.commit_writes(self.read_ts, batch, &reads)
255    }
256
257    /// Discard the transaction and all of its buffered writes.
258    ///
259    /// This is explicit; simply dropping the transaction has the same effect.
260    /// Rolling back never fails and never touches the shared store.
261    ///
262    /// # Examples
263    ///
264    /// ```
265    /// use txn_db::Db;
266    ///
267    /// let db = Db::new();
268    /// let mut tx = db.begin();
269    /// tx.put(b"k".to_vec(), b"v".to_vec());
270    /// tx.rollback();
271    ///
272    /// // The write never reached the database.
273    /// assert_eq!(db.begin().get(b"k")?, None);
274    /// # Ok::<(), txn_db::TxnError>(())
275    /// ```
276    #[inline]
277    pub fn rollback(self) {
278        // Dropping `self` releases the buffered writes; this method documents
279        // the intent and consumes the transaction so it cannot be used again.
280    }
281}
282
283/// A read-only, point-in-time view of the database.
284///
285/// A snapshot is created by [`Db::snapshot`](crate::Db::snapshot) and reads as
286/// of the moment it was taken. It has no write buffer and nothing to commit, so
287/// it is cheaper than a transaction when all you need is to read several keys at
288/// one consistent instant. Multiple snapshots and transactions coexist without
289/// blocking each other.
290///
291/// # Examples
292///
293/// ```
294/// use txn_db::Db;
295///
296/// let db = Db::new();
297/// let mut tx = db.begin();
298/// tx.put(b"k".to_vec(), b"v1".to_vec());
299/// tx.commit()?;
300///
301/// // Capture a snapshot, then change the database.
302/// let snap = db.snapshot();
303/// let mut tx = db.begin();
304/// tx.put(b"k".to_vec(), b"v2".to_vec());
305/// tx.commit()?;
306///
307/// // The snapshot still sees the value as of when it was taken.
308/// assert_eq!(snap.get(b"k")?.as_deref(), Some(&b"v1"[..]));
309/// assert_eq!(db.snapshot().get(b"k")?.as_deref(), Some(&b"v2"[..]));
310/// # Ok::<(), txn_db::TxnError>(())
311/// ```
312pub struct Snapshot<S: VersionStore = MemoryStore> {
313    inner: Arc<Inner<S>>,
314    read_ts: Timestamp,
315}
316
317impl<S: VersionStore> Snapshot<S> {
318    /// Construct a snapshot over `inner` reading at `read_ts`.
319    pub(crate) fn new(inner: Arc<Inner<S>>, read_ts: Timestamp) -> Self {
320        Snapshot { inner, read_ts }
321    }
322
323    /// The timestamp this snapshot reads at.
324    ///
325    /// # Examples
326    ///
327    /// ```
328    /// use txn_db::Db;
329    ///
330    /// let db = Db::new();
331    /// assert_eq!(db.snapshot().read_timestamp(), txn_db::Timestamp::ZERO);
332    /// ```
333    #[inline]
334    #[must_use]
335    pub fn read_timestamp(&self) -> Timestamp {
336        self.read_ts
337    }
338
339    /// Read the value of `key` as of this snapshot.
340    ///
341    /// Returns the newest version committed at or before the snapshot
342    /// timestamp, or `None` if the key does not exist as of that instant.
343    ///
344    /// # Errors
345    ///
346    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing store
347    /// fails the read. The default in-memory store never fails.
348    ///
349    /// # Examples
350    ///
351    /// ```
352    /// use txn_db::Db;
353    ///
354    /// let db = Db::new();
355    /// assert_eq!(db.snapshot().get(b"missing")?, None);
356    /// # Ok::<(), txn_db::TxnError>(())
357    /// ```
358    pub fn get(&self, key: &[u8]) -> Result<Option<Arc<[u8]>>> {
359        self.inner.store.get(key, self.read_ts)
360    }
361}