txn_db/txn.rs
//! Transactions and snapshots — the read and write handles a [`Db`](crate::Db)
//! hands out.
//!
//! A [`Transaction`] is the read-write unit of work. It takes a snapshot of the
//! database when it begins, serves every read from that snapshot (plus its own
//! uncommitted writes), buffers writes locally, and applies them atomically at
//! [`commit`](Transaction::commit) — or discards them on
//! [`rollback`](Transaction::rollback) or drop. Because reads come from a fixed
//! snapshot, a transaction's reads never block writers and are never blocked by
//! them.
//!
//! A [`Snapshot`] is the read-only counterpart: a consistent, point-in-time
//! view with no write buffer and nothing to commit. Use it when you only need
//! to read several keys as of one instant.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use crate::db::Inner;
19use crate::error::Result;
20use crate::store::{MemoryStore, VersionStore};
21use crate::timestamp::Timestamp;
22
23/// A read-write transaction over a consistent snapshot of the database.
24///
25/// A transaction is created by [`Db::begin`](crate::Db::begin). It reads as of
26/// the snapshot timestamp captured at that moment, so concurrent commits by
27/// other transactions are invisible to it — this is snapshot isolation. Writes
28/// are buffered in the transaction and become visible to others only when
29/// [`commit`](Transaction::commit) succeeds; within the transaction, a read of a
30/// key it has written returns that pending write (read-your-own-writes).
31///
32/// At commit the database checks every written key for a write-write conflict:
33/// if another transaction committed a change to any of those keys after this
34/// transaction's snapshot, the commit is rejected with a retryable
35/// [`TxnError::Conflict`](crate::TxnError::Conflict) and none of the writes are
36/// applied. This is what prevents lost updates.
37///
38/// Dropping a transaction without committing discards its buffered writes; it
39/// is equivalent to [`rollback`](Transaction::rollback).
40///
41/// # Examples
42///
43/// ```
44/// use txn_db::Db;
45///
46/// let db = Db::new();
47///
48/// let mut tx = db.begin();
49/// tx.put(b"account:1".to_vec(), 100u64.to_le_bytes().to_vec());
50/// tx.put(b"account:2".to_vec(), 50u64.to_le_bytes().to_vec());
51/// let commit_ts = tx.commit()?;
52///
53/// // A fresh transaction sees the committed state.
54/// let tx = db.begin();
55/// assert!(tx.get(b"account:1")?.is_some());
56/// assert!(commit_ts > txn_db::Timestamp::ZERO);
57/// # Ok::<(), txn_db::TxnError>(())
58/// ```
59#[must_use = "a transaction buffers writes that are discarded unless it is committed"]
60pub struct Transaction<S: VersionStore = MemoryStore> {
61 inner: Arc<Inner<S>>,
62 read_ts: Timestamp,
63 writes: HashMap<Arc<[u8]>, Option<Arc<[u8]>>>,
64}
65
66impl<S: VersionStore> Transaction<S> {
67 /// Construct a transaction over `inner` reading at `read_ts`.
68 pub(crate) fn new(inner: Arc<Inner<S>>, read_ts: Timestamp) -> Self {
69 Transaction {
70 inner,
71 read_ts,
72 writes: HashMap::new(),
73 }
74 }
75
76 /// The snapshot timestamp this transaction reads at.
77 ///
78 /// Every read that is not served from the transaction's own write buffer
79 /// observes the database as of this timestamp.
80 ///
81 /// # Examples
82 ///
83 /// ```
84 /// use txn_db::Db;
85 ///
86 /// let db = Db::new();
87 /// let tx = db.begin();
88 /// // Nothing has committed yet, so the snapshot is the empty database.
89 /// assert_eq!(tx.read_timestamp(), txn_db::Timestamp::ZERO);
90 /// ```
91 #[inline]
92 #[must_use]
93 pub fn read_timestamp(&self) -> Timestamp {
94 self.read_ts
95 }
96
97 /// Read the value of `key` as this transaction sees it.
98 ///
99 /// If the transaction has written `key`, the pending write is returned
100 /// (read-your-own-writes), including `None` if it has deleted the key.
101 /// Otherwise the value is read from the transaction's snapshot: the newest
102 /// version committed at or before the snapshot timestamp, or `None` if the
103 /// key does not exist as of the snapshot.
104 ///
105 /// # Errors
106 ///
107 /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing
108 /// [`VersionStore`](crate::VersionStore) fails the read. The default
109 /// in-memory store never fails.
110 ///
111 /// # Examples
112 ///
113 /// ```
114 /// use txn_db::Db;
115 ///
116 /// let db = Db::new();
117 /// let mut tx = db.begin();
118 ///
119 /// assert_eq!(tx.get(b"k")?, None); // absent
120 /// tx.put(b"k".to_vec(), b"v".to_vec());
121 /// assert_eq!(tx.get(b"k")?.as_deref(), Some(&b"v"[..])); // its own write
122 /// tx.delete(b"k".to_vec());
123 /// assert_eq!(tx.get(b"k")?, None); // its own delete
124 /// # Ok::<(), txn_db::TxnError>(())
125 /// ```
126 pub fn get(&self, key: &[u8]) -> Result<Option<Arc<[u8]>>> {
127 if let Some(pending) = self.writes.get(key) {
128 return Ok(pending.clone());
129 }
130 self.inner.store.get(key, self.read_ts)
131 }
132
133 /// Buffer a write of `value` to `key`, to be applied at commit.
134 ///
135 /// The write is local to this transaction until [`commit`](Self::commit)
136 /// succeeds; other transactions do not see it. Writing the same key twice
137 /// keeps the last value. Both arguments accept anything convertible into an
138 /// `Arc<[u8]>` — passing an owned `Vec<u8>` or `Arc<[u8]>` moves it in
139 /// without copying the bytes.
140 ///
141 /// # Examples
142 ///
143 /// ```
144 /// use txn_db::Db;
145 ///
146 /// let db = Db::new();
147 /// let mut tx = db.begin();
148 /// tx.put(b"city".to_vec(), b"oslo".to_vec());
149 /// tx.put(b"city".to_vec(), b"bergen".to_vec()); // overwrites within the txn
150 /// assert_eq!(tx.get(b"city")?.as_deref(), Some(&b"bergen"[..]));
151 /// # Ok::<(), txn_db::TxnError>(())
152 /// ```
153 pub fn put(&mut self, key: impl Into<Arc<[u8]>>, value: impl Into<Arc<[u8]>>) {
154 let _ = self.writes.insert(key.into(), Some(value.into()));
155 }
156
157 /// Buffer a delete of `key`, to be applied at commit.
158 ///
159 /// After this call the transaction reads `key` as absent. At commit a
160 /// tombstone is written so that snapshots taken after the commit also see
161 /// the key as absent. Deleting a key that does not exist is a no-op that
162 /// still participates in conflict detection, so a delete races other
163 /// writers the same way a `put` does.
164 ///
165 /// # Examples
166 ///
167 /// ```
168 /// use txn_db::Db;
169 ///
170 /// let db = Db::new();
171 /// let mut setup = db.begin();
172 /// setup.put(b"k".to_vec(), b"v".to_vec());
173 /// setup.commit()?;
174 ///
175 /// let mut tx = db.begin();
176 /// tx.delete(b"k".to_vec());
177 /// tx.commit()?;
178 ///
179 /// assert_eq!(db.begin().get(b"k")?, None);
180 /// # Ok::<(), txn_db::TxnError>(())
181 /// ```
182 pub fn delete(&mut self, key: impl Into<Arc<[u8]>>) {
183 let _ = self.writes.insert(key.into(), None);
184 }
185
186 /// Commit the transaction, applying all buffered writes atomically.
187 ///
188 /// On success every buffered write becomes visible to transactions that
189 /// begin afterward, and the commit timestamp is returned. A transaction
190 /// that buffered no writes commits trivially and returns its snapshot
191 /// timestamp without allocating a new one.
192 ///
193 /// # Errors
194 ///
195 /// Returns [`TxnError::Conflict`](crate::TxnError::Conflict) — which is
196 /// retryable — if any written key was changed by another transaction that
197 /// committed after this one's snapshot; in that case no writes are applied.
198 /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing store
199 /// fails to apply the batch.
200 ///
201 /// # Examples
202 ///
203 /// ```
204 /// use txn_db::Db;
205 ///
206 /// let db = Db::new();
207 /// let mut tx = db.begin();
208 /// tx.put(b"k".to_vec(), b"v".to_vec());
209 /// let ts = tx.commit()?;
210 /// assert!(ts > txn_db::Timestamp::ZERO);
211 /// # Ok::<(), txn_db::TxnError>(())
212 /// ```
213 pub fn commit(self) -> Result<Timestamp> {
214 if self.writes.is_empty() {
215 return Ok(self.read_ts);
216 }
217 self.inner.commit_writes(self.read_ts, self.writes)
218 }
219
220 /// Discard the transaction and all of its buffered writes.
221 ///
222 /// This is explicit; simply dropping the transaction has the same effect.
223 /// Rolling back never fails and never touches the shared store.
224 ///
225 /// # Examples
226 ///
227 /// ```
228 /// use txn_db::Db;
229 ///
230 /// let db = Db::new();
231 /// let mut tx = db.begin();
232 /// tx.put(b"k".to_vec(), b"v".to_vec());
233 /// tx.rollback();
234 ///
235 /// // The write never reached the database.
236 /// assert_eq!(db.begin().get(b"k")?, None);
237 /// # Ok::<(), txn_db::TxnError>(())
238 /// ```
239 #[inline]
240 pub fn rollback(self) {
241 // Dropping `self` releases the buffered writes; this method documents
242 // the intent and consumes the transaction so it cannot be used again.
243 }
244}
245
246/// A read-only, point-in-time view of the database.
247///
248/// A snapshot is created by [`Db::snapshot`](crate::Db::snapshot) and reads as
249/// of the moment it was taken. It has no write buffer and nothing to commit, so
250/// it is cheaper than a transaction when all you need is to read several keys at
251/// one consistent instant. Multiple snapshots and transactions coexist without
252/// blocking each other.
253///
254/// # Examples
255///
256/// ```
257/// use txn_db::Db;
258///
259/// let db = Db::new();
260/// let mut tx = db.begin();
261/// tx.put(b"k".to_vec(), b"v1".to_vec());
262/// tx.commit()?;
263///
264/// // Capture a snapshot, then change the database.
265/// let snap = db.snapshot();
266/// let mut tx = db.begin();
267/// tx.put(b"k".to_vec(), b"v2".to_vec());
268/// tx.commit()?;
269///
270/// // The snapshot still sees the value as of when it was taken.
271/// assert_eq!(snap.get(b"k")?.as_deref(), Some(&b"v1"[..]));
272/// assert_eq!(db.snapshot().get(b"k")?.as_deref(), Some(&b"v2"[..]));
273/// # Ok::<(), txn_db::TxnError>(())
274/// ```
275pub struct Snapshot<S: VersionStore = MemoryStore> {
276 inner: Arc<Inner<S>>,
277 read_ts: Timestamp,
278}
279
280impl<S: VersionStore> Snapshot<S> {
281 /// Construct a snapshot over `inner` reading at `read_ts`.
282 pub(crate) fn new(inner: Arc<Inner<S>>, read_ts: Timestamp) -> Self {
283 Snapshot { inner, read_ts }
284 }
285
286 /// The timestamp this snapshot reads at.
287 ///
288 /// # Examples
289 ///
290 /// ```
291 /// use txn_db::Db;
292 ///
293 /// let db = Db::new();
294 /// assert_eq!(db.snapshot().read_timestamp(), txn_db::Timestamp::ZERO);
295 /// ```
296 #[inline]
297 #[must_use]
298 pub fn read_timestamp(&self) -> Timestamp {
299 self.read_ts
300 }
301
302 /// Read the value of `key` as of this snapshot.
303 ///
304 /// Returns the newest version committed at or before the snapshot
305 /// timestamp, or `None` if the key does not exist as of that instant.
306 ///
307 /// # Errors
308 ///
309 /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing store
310 /// fails the read. The default in-memory store never fails.
311 ///
312 /// # Examples
313 ///
314 /// ```
315 /// use txn_db::Db;
316 ///
317 /// let db = Db::new();
318 /// assert_eq!(db.snapshot().get(b"missing")?, None);
319 /// # Ok::<(), txn_db::TxnError>(())
320 /// ```
321 pub fn get(&self, key: &[u8]) -> Result<Option<Arc<[u8]>>> {
322 self.inner.store.get(key, self.read_ts)
323 }
324}