txn_db/txn.rs
1//! Transactions and snapshots — the read and write handles a [`Db`](crate::Db)
2//! hands out.
3//!
4//! A [`Transaction`] is the read-write unit of work. It takes a snapshot of the
5//! database when it begins, serves every read from that snapshot (plus its own
6//! uncommitted writes), buffers writes locally, and applies them atomically at
7//! [`commit`](Transaction::commit) — or discards them on
8//! [`rollback`](Transaction::rollback) or drop. Because reads come from a fixed
9//! snapshot, a transaction never blocks writers and is never blocked by them.
10//!
11//! Transactions run under snapshot isolation by default. A serializable
12//! transaction (from [`Db::begin_serializable`](crate::Db::begin_serializable),
13//! behind the `serializable` feature) additionally records every key it reads so
14//! that the read set can be validated at commit.
15//!
16//! Both a transaction and a [`Snapshot`] hold an [`ActiveReader`], which
17//! registers the read timestamp with the database while the handle is alive and
18//! unregisters it on drop. That registry is what lets garbage collection know
19//! the oldest snapshot still in use, so it never reclaims a version a live reader
20//! can still observe.
21
22use std::cell::RefCell;
23use std::collections::{HashMap, HashSet};
24use std::sync::Arc;
25
26use crate::db::Inner;
27use crate::error::Result;
28use crate::store::{MemoryStore, VersionStore};
29use crate::timestamp::Timestamp;
30
31/// A handle's registration in the database's live-reader set.
32///
33/// Constructing one registers a read timestamp; dropping it unregisters that
34/// timestamp. Holding it keeps the database from garbage-collecting versions the
35/// reader can still see. It is a field of both [`Transaction`] and [`Snapshot`]
36/// rather than a `Drop` on those types directly, so a transaction can still move
37/// its write buffer out at commit.
38struct ActiveReader<S: VersionStore> {
39 inner: Arc<Inner<S>>,
40 read_ts: Timestamp,
41}
42
43impl<S: VersionStore> ActiveReader<S> {
44 /// Register a reader against `inner`, reading at the current watermark.
45 fn new(inner: Arc<Inner<S>>) -> Self {
46 let read_ts = inner.begin_reader();
47 ActiveReader { inner, read_ts }
48 }
49}
50
51impl<S: VersionStore> Drop for ActiveReader<S> {
52 fn drop(&mut self) {
53 self.inner.end_reader(self.read_ts);
54 }
55}
56
57/// A read-write transaction over a consistent snapshot of the database.
58///
59/// A transaction is created by [`Db::begin`](crate::Db::begin) (snapshot
60/// isolation) or `Db::begin_serializable` (serializable, with the `serializable`
61/// feature). It reads as of the snapshot timestamp captured at that moment, so
62/// concurrent commits by other transactions are invisible to it. Writes are
63/// buffered in the transaction and become visible to others only when
64/// [`commit`](Transaction::commit) succeeds; within the transaction, a read of a
65/// key it has written returns that pending write (read-your-own-writes).
66///
67/// At commit the database checks every written key for a write-write conflict:
68/// if another transaction committed a change to any of those keys after this
69/// transaction's snapshot, the commit is rejected with a retryable
70/// [`TxnError::Conflict`](crate::TxnError::Conflict) and none of the writes are
71/// applied. A serializable transaction also validates its read set, rejecting
72/// commits whose reads are no longer current.
73///
74/// Dropping a transaction without committing discards its buffered writes; it
75/// is equivalent to [`rollback`](Transaction::rollback).
76///
77/// # Examples
78///
79/// ```
80/// use txn_db::Db;
81///
82/// let db = Db::new();
83///
84/// let mut tx = db.begin();
85/// tx.put(b"account:1".to_vec(), 100u64.to_le_bytes().to_vec());
86/// tx.put(b"account:2".to_vec(), 50u64.to_le_bytes().to_vec());
87/// let commit_ts = tx.commit()?;
88///
89/// // A fresh transaction sees the committed state.
90/// let tx = db.begin();
91/// assert!(tx.get(b"account:1")?.is_some());
92/// assert!(commit_ts > txn_db::Timestamp::ZERO);
93/// # Ok::<(), txn_db::TxnError>(())
94/// ```
95#[must_use = "a transaction buffers writes that are discarded unless it is committed"]
96pub struct Transaction<S: VersionStore = MemoryStore> {
97 active: ActiveReader<S>,
98 writes: HashMap<Arc<[u8]>, Option<Arc<[u8]>>>,
99 /// The set of keys read from the snapshot, tracked only for serializable
100 /// transactions. `None` under snapshot isolation, where reads are not
101 /// validated. Interior mutability lets [`get`](Self::get) record reads
102 /// through a shared reference.
103 reads: Option<RefCell<HashSet<Arc<[u8]>>>>,
104}
105
106impl<S: VersionStore> Transaction<S> {
107 /// Construct a transaction over `inner`. When `serializable` is set the
108 /// transaction records its read set for validation at commit.
109 pub(crate) fn new(inner: Arc<Inner<S>>, serializable: bool) -> Self {
110 Transaction {
111 active: ActiveReader::new(inner),
112 writes: HashMap::new(),
113 reads: serializable.then(|| RefCell::new(HashSet::new())),
114 }
115 }
116
117 /// The snapshot timestamp this transaction reads at.
118 ///
119 /// Every read that is not served from the transaction's own write buffer
120 /// observes the database as of this timestamp.
121 ///
122 /// # Examples
123 ///
124 /// ```
125 /// use txn_db::Db;
126 ///
127 /// let db = Db::new();
128 /// let tx = db.begin();
129 /// // Nothing has committed yet, so the snapshot is the empty database.
130 /// assert_eq!(tx.read_timestamp(), txn_db::Timestamp::ZERO);
131 /// ```
132 #[inline]
133 #[must_use]
134 pub fn read_timestamp(&self) -> Timestamp {
135 self.active.read_ts
136 }
137
138 /// Read the value of `key` as this transaction sees it.
139 ///
140 /// If the transaction has written `key`, the pending write is returned
141 /// (read-your-own-writes), including `None` if it has deleted the key.
142 /// Otherwise the value is read from the transaction's snapshot: the newest
143 /// version committed at or before the snapshot timestamp, or `None` if the
144 /// key does not exist as of the snapshot. For a serializable transaction the
145 /// key is recorded in the read set for validation at commit.
146 ///
147 /// # Errors
148 ///
149 /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing
150 /// [`VersionStore`](crate::VersionStore) fails the read. The default
151 /// in-memory store never fails.
152 ///
153 /// # Examples
154 ///
155 /// ```
156 /// use txn_db::Db;
157 ///
158 /// let db = Db::new();
159 /// let mut tx = db.begin();
160 ///
161 /// assert_eq!(tx.get(b"k")?, None); // absent
162 /// tx.put(b"k".to_vec(), b"v".to_vec());
163 /// assert_eq!(tx.get(b"k")?.as_deref(), Some(&b"v"[..])); // its own write
164 /// tx.delete(b"k".to_vec());
165 /// assert_eq!(tx.get(b"k")?, None); // its own delete
166 /// # Ok::<(), txn_db::TxnError>(())
167 /// ```
168 pub fn get(&self, key: &[u8]) -> Result<Option<Arc<[u8]>>> {
169 if let Some(pending) = self.writes.get(key) {
170 return Ok(pending.clone());
171 }
172 let value = self.active.inner.store.get(key, self.active.read_ts)?;
173 // A serializable transaction records the key — present or absent — so a
174 // later writer to it is caught at commit.
175 if let Some(reads) = &self.reads {
176 let _ = reads.borrow_mut().insert(Arc::from(key));
177 }
178 Ok(value)
179 }
180
181 /// Buffer a write of `value` to `key`, to be applied at commit.
182 ///
183 /// The write is local to this transaction until [`commit`](Self::commit)
184 /// succeeds; other transactions do not see it. Writing the same key twice
185 /// keeps the last value. Both arguments accept anything convertible into an
186 /// `Arc<[u8]>` — passing an owned `Vec<u8>` or `Arc<[u8]>` moves it in
187 /// without copying the bytes.
188 ///
189 /// # Examples
190 ///
191 /// ```
192 /// use txn_db::Db;
193 ///
194 /// let db = Db::new();
195 /// let mut tx = db.begin();
196 /// tx.put(b"city".to_vec(), b"oslo".to_vec());
197 /// tx.put(b"city".to_vec(), b"bergen".to_vec()); // overwrites within the txn
198 /// assert_eq!(tx.get(b"city")?.as_deref(), Some(&b"bergen"[..]));
199 /// # Ok::<(), txn_db::TxnError>(())
200 /// ```
201 pub fn put(&mut self, key: impl Into<Arc<[u8]>>, value: impl Into<Arc<[u8]>>) {
202 let _ = self.writes.insert(key.into(), Some(value.into()));
203 }
204
205 /// Buffer a delete of `key`, to be applied at commit.
206 ///
207 /// After this call the transaction reads `key` as absent. At commit a
208 /// tombstone is written so that snapshots taken after the commit also see
209 /// the key as absent. Deleting a key that does not exist is a no-op that
210 /// still participates in conflict detection, so a delete races other
211 /// writers the same way a `put` does.
212 ///
213 /// # Examples
214 ///
215 /// ```
216 /// use txn_db::Db;
217 ///
218 /// let db = Db::new();
219 /// let mut setup = db.begin();
220 /// setup.put(b"k".to_vec(), b"v".to_vec());
221 /// setup.commit()?;
222 ///
223 /// let mut tx = db.begin();
224 /// tx.delete(b"k".to_vec());
225 /// tx.commit()?;
226 ///
227 /// assert_eq!(db.begin().get(b"k")?, None);
228 /// # Ok::<(), txn_db::TxnError>(())
229 /// ```
230 pub fn delete(&mut self, key: impl Into<Arc<[u8]>>) {
231 let _ = self.writes.insert(key.into(), None);
232 }
233
234 /// Commit the transaction, applying all buffered writes atomically.
235 ///
236 /// On success every buffered write becomes visible to transactions that
237 /// begin afterward, and the commit timestamp is returned. A transaction
238 /// that buffered no writes commits trivially and returns its snapshot
239 /// timestamp without allocating a new one — including a serializable
240 /// read-only transaction, which has observed a consistent snapshot and needs
241 /// no validation.
242 ///
243 /// # Errors
244 ///
245 /// Returns [`TxnError::Conflict`](crate::TxnError::Conflict) — which is
246 /// retryable — if any written key was changed by another transaction that
247 /// committed after this one's snapshot, or, for a serializable transaction,
248 /// if any key it read has since changed. In either case no writes are
249 /// applied. Returns [`TxnError::Store`](crate::TxnError::Store) if the
250 /// backing store fails to apply the batch, or
251 /// [`TxnError::Durability`](crate::TxnError::Durability) if a durable commit
252 /// cannot be made durable.
253 ///
254 /// # Examples
255 ///
256 /// ```
257 /// use txn_db::Db;
258 ///
259 /// let db = Db::new();
260 /// let mut tx = db.begin();
261 /// tx.put(b"k".to_vec(), b"v".to_vec());
262 /// let ts = tx.commit()?;
263 /// assert!(ts > txn_db::Timestamp::ZERO);
264 /// # Ok::<(), txn_db::TxnError>(())
265 /// ```
266 pub fn commit(self) -> Result<Timestamp> {
267 let read_ts = self.active.read_ts;
268 if self.writes.is_empty() {
269 return Ok(read_ts);
270 }
271 // The read set, minus keys also in the write set (those are covered by
272 // the write-write check). Empty for snapshot-isolation transactions.
273 let reads: Vec<Arc<[u8]>> = match self.reads {
274 Some(set) => set
275 .into_inner()
276 .into_iter()
277 .filter(|key| !self.writes.contains_key(key))
278 .collect(),
279 None => Vec::new(),
280 };
281 let batch = self.writes.into_iter().collect();
282 self.active.inner.commit_writes(read_ts, batch, &reads)
283 }
284
285 /// Discard the transaction and all of its buffered writes.
286 ///
287 /// This is explicit; simply dropping the transaction has the same effect.
288 /// Rolling back never fails and never touches the shared store.
289 ///
290 /// # Examples
291 ///
292 /// ```
293 /// use txn_db::Db;
294 ///
295 /// let db = Db::new();
296 /// let mut tx = db.begin();
297 /// tx.put(b"k".to_vec(), b"v".to_vec());
298 /// tx.rollback();
299 ///
300 /// // The write never reached the database.
301 /// assert_eq!(db.begin().get(b"k")?, None);
302 /// # Ok::<(), txn_db::TxnError>(())
303 /// ```
304 #[inline]
305 pub fn rollback(self) {
306 // Dropping `self` releases the buffered writes and unregisters the
307 // reader; this method documents the intent and consumes the transaction
308 // so it cannot be used again.
309 }
310}
311
312/// A read-only, point-in-time view of the database.
313///
314/// A snapshot is created by [`Db::snapshot`](crate::Db::snapshot) and reads as
315/// of the moment it was taken. It has no write buffer and nothing to commit, so
316/// it is cheaper than a transaction when all you need is to read several keys at
317/// one consistent instant. While it is alive it pins that instant: garbage
318/// collection will not reclaim a version the snapshot can still observe.
319///
320/// # Examples
321///
322/// ```
323/// use txn_db::Db;
324///
325/// let db = Db::new();
326/// let mut tx = db.begin();
327/// tx.put(b"k".to_vec(), b"v1".to_vec());
328/// tx.commit()?;
329///
330/// // Capture a snapshot, then change the database.
331/// let snap = db.snapshot();
332/// let mut tx = db.begin();
333/// tx.put(b"k".to_vec(), b"v2".to_vec());
334/// tx.commit()?;
335///
336/// // The snapshot still sees the value as of when it was taken.
337/// assert_eq!(snap.get(b"k")?.as_deref(), Some(&b"v1"[..]));
338/// assert_eq!(db.snapshot().get(b"k")?.as_deref(), Some(&b"v2"[..]));
339/// # Ok::<(), txn_db::TxnError>(())
340/// ```
341pub struct Snapshot<S: VersionStore = MemoryStore> {
342 active: ActiveReader<S>,
343}
344
345impl<S: VersionStore> Snapshot<S> {
346 /// Construct a snapshot over `inner`, reading at the current watermark.
347 pub(crate) fn new(inner: Arc<Inner<S>>) -> Self {
348 Snapshot {
349 active: ActiveReader::new(inner),
350 }
351 }
352
353 /// The timestamp this snapshot reads at.
354 ///
355 /// # Examples
356 ///
357 /// ```
358 /// use txn_db::Db;
359 ///
360 /// let db = Db::new();
361 /// assert_eq!(db.snapshot().read_timestamp(), txn_db::Timestamp::ZERO);
362 /// ```
363 #[inline]
364 #[must_use]
365 pub fn read_timestamp(&self) -> Timestamp {
366 self.active.read_ts
367 }
368
369 /// Read the value of `key` as of this snapshot.
370 ///
371 /// Returns the newest version committed at or before the snapshot
372 /// timestamp, or `None` if the key does not exist as of that instant.
373 ///
374 /// # Errors
375 ///
376 /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing store
377 /// fails the read. The default in-memory store never fails.
378 ///
379 /// # Examples
380 ///
381 /// ```
382 /// use txn_db::Db;
383 ///
384 /// let db = Db::new();
385 /// assert_eq!(db.snapshot().get(b"missing")?, None);
386 /// # Ok::<(), txn_db::TxnError>(())
387 /// ```
388 pub fn get(&self, key: &[u8]) -> Result<Option<Arc<[u8]>>> {
389 self.active.inner.store.get(key, self.active.read_ts)
390 }
391}