txn_db/db.rs
1//! The database handle and the commit coordinator behind it.
2//!
3//! [`Db`] is the Tier-1 entry point: construct one, [`begin`](Db::begin)
4//! transactions against it, [`commit`](crate::Transaction::commit) them. A `Db`
5//! is a cheap, clonable handle to shared state — clone it freely and hand a
6//! clone to every thread that needs to read or write.
7//!
8//! The shared state itself lives in [`Inner`], which owns the version store and
9//! the [`Oracle`](crate::oracle::Oracle) that allocates timestamps and tracks
10//! the read watermark. Commit coordination is split deliberately: the oracle
11//! hands out timestamps lock-free, and the version store is the serialization
12//! point that validates and applies each commit atomically. The single global
13//! commit lock of the foundation release is gone.
14
15use std::sync::Arc;
16
17use crate::error::Result;
18use crate::oracle::Oracle;
19use crate::store::{MemoryStore, VersionStore, WriteEntry};
20use crate::timestamp::Timestamp;
21use crate::txn::{Snapshot, Transaction};
22
23/// Shared, reference-counted state for one logical database.
24///
25/// A [`Db`] is a handle to an `Arc<Inner>`; every clone of the `Db`, every
26/// [`Transaction`], and every [`Snapshot`] holds a clone of the same `Inner`,
27/// so they all read and commit against one version store and one timestamp
28/// sequence.
29pub(crate) struct Inner<S: VersionStore> {
30 /// The backing version store. Reads go to it; commits validate and apply
31 /// through it.
32 pub(crate) store: S,
33 /// Allocates timestamps and tracks the consistent-read watermark.
34 oracle: Oracle,
35}
36
37impl<S: VersionStore> Inner<S> {
38 fn new(store: S) -> Self {
39 Inner {
40 store,
41 oracle: Oracle::new(),
42 }
43 }
44
45 /// The timestamp a transaction beginning now should read at.
46 #[inline]
47 fn read_ts(&self) -> Timestamp {
48 self.oracle.read_ts()
49 }
50
51 /// Allocate a commit timestamp, validate-and-apply through the store, then
52 /// release the timestamp into the watermark.
53 ///
54 /// The timestamp is reported to the oracle on both outcomes — a successful
55 /// commit and a rejected one — so a conflict never stalls the read watermark
56 /// behind the timestamp it consumed.
57 pub(crate) fn commit_writes(
58 &self,
59 read_ts: Timestamp,
60 writes: Vec<WriteEntry>,
61 reads: &[Arc<[u8]>],
62 ) -> Result<Timestamp> {
63 let commit_ts = self.oracle.alloc_commit_ts();
64 let outcome = self.store.try_commit(read_ts, commit_ts, writes, reads);
65 self.oracle.commit_done(commit_ts);
66 outcome.map(|()| commit_ts)
67 }
68}
69
70/// A transactional, multi-version key-value database.
71///
72/// `Db` is the front door. [`Db::new`] gives you an in-memory database;
73/// [`Db::with_store`] builds one over any [`VersionStore`]. From there the whole
74/// common case is [`begin`](Db::begin) / [`get`](crate::Transaction::get) /
75/// [`put`](crate::Transaction::put) / [`commit`](crate::Transaction::commit),
76/// with [`snapshot`](Db::snapshot) for read-only point-in-time views.
77///
78/// Transactions default to **snapshot isolation**. With the `serializable`
79/// feature enabled, [`begin_serializable`](Db::begin_serializable) starts a
80/// transaction whose read set is validated at commit, rejecting write skew and
81/// the other anomalies snapshot isolation permits.
82///
83/// A `Db` is a clonable handle over shared state, like an [`Arc`]. Cloning it
84/// is cheap and every clone refers to the same database, so the idiomatic way
85/// to use it across threads is to clone a handle per thread.
86///
87/// # Examples
88///
89/// The four-call common case:
90///
91/// ```
92/// use txn_db::Db;
93///
94/// let db = Db::new();
95///
96/// let mut tx = db.begin();
97/// tx.put(b"greeting".to_vec(), b"hei".to_vec());
98/// tx.commit()?;
99///
100/// let tx = db.begin();
101/// assert_eq!(tx.get(b"greeting")?.as_deref(), Some(&b"hei"[..]));
102/// # Ok::<(), txn_db::TxnError>(())
103/// ```
104///
105/// Sharing one database across threads:
106///
107/// ```
108/// use std::thread;
109/// use txn_db::Db;
110///
111/// let db = Db::new();
112/// let handles: Vec<_> = (0..4u8)
113/// .map(|i| {
114/// let db = db.clone();
115/// thread::spawn(move || {
116/// let mut tx = db.begin();
117/// tx.put(vec![i], vec![i]);
118/// // Independent keys never conflict.
119/// tx.commit().expect("commit");
120/// })
121/// })
122/// .collect();
123/// for h in handles {
124/// h.join().expect("thread");
125/// }
126/// # Ok::<(), txn_db::TxnError>(())
127/// ```
128pub struct Db<S: VersionStore = MemoryStore> {
129 inner: Arc<Inner<S>>,
130}
131
132impl Db<MemoryStore> {
133 /// Create an empty in-memory database.
134 ///
135 /// This is the default configuration: a [`MemoryStore`] backing store, ready
136 /// for [`begin`](Db::begin).
137 ///
138 /// # Examples
139 ///
140 /// ```
141 /// use txn_db::Db;
142 ///
143 /// let db = Db::new();
144 /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
145 /// ```
146 #[must_use]
147 pub fn new() -> Self {
148 Db::with_store(MemoryStore::new())
149 }
150}
151
152impl Default for Db<MemoryStore> {
153 fn default() -> Self {
154 Db::new()
155 }
156}
157
158impl<S: VersionStore> Db<S> {
159 /// Create a database over a custom [`VersionStore`].
160 ///
161 /// This is the Tier-3 seam: supply any backing store and the transaction
162 /// semantics — snapshot isolation, read-your-own-writes, conflict detection
163 /// — compose on top of it unchanged.
164 ///
165 /// # Examples
166 ///
167 /// ```
168 /// use txn_db::{Db, MemoryStore};
169 ///
170 /// let db = Db::with_store(MemoryStore::new());
171 /// let mut tx = db.begin();
172 /// tx.put(b"k".to_vec(), b"v".to_vec());
173 /// tx.commit()?;
174 /// # Ok::<(), txn_db::TxnError>(())
175 /// ```
176 #[must_use]
177 pub fn with_store(store: S) -> Self {
178 Db {
179 inner: Arc::new(Inner::new(store)),
180 }
181 }
182
183 /// Begin a snapshot-isolation transaction over the current state.
184 ///
185 /// The transaction takes its snapshot at this moment: it reads as of the
186 /// most recent commit and is unaffected by commits that happen afterward.
187 /// Its writes are checked for write-write conflicts at commit, but its reads
188 /// are not validated — use [`begin_serializable`](Db::begin_serializable)
189 /// when you need serializability.
190 ///
191 /// # Examples
192 ///
193 /// ```
194 /// use txn_db::Db;
195 ///
196 /// let db = Db::new();
197 /// let mut tx = db.begin();
198 /// tx.put(b"k".to_vec(), b"v".to_vec());
199 /// tx.commit()?;
200 /// # Ok::<(), txn_db::TxnError>(())
201 /// ```
202 pub fn begin(&self) -> Transaction<S> {
203 Transaction::new(Arc::clone(&self.inner), self.inner.read_ts(), false)
204 }
205
206 /// Begin a serializable transaction over the current state.
207 ///
208 /// A serializable transaction tracks every key it reads and, at commit,
209 /// validates that none of them changed since its snapshot — in addition to
210 /// the write-write check every transaction gets. That read-set validation is
211 /// what rejects write skew and the read-only anomaly that plain snapshot
212 /// isolation permits, giving serializable behavior for the transactions that
213 /// commit writes. A serializable transaction that writes nothing commits
214 /// trivially, exactly like a read-only snapshot.
215 ///
216 /// Available with the `serializable` feature. Snapshot isolation remains the
217 /// default and is unaffected.
218 ///
219 /// # Examples
220 ///
221 /// ```
222 /// # #[cfg(feature = "serializable")]
223 /// # {
224 /// use txn_db::Db;
225 ///
226 /// let db = Db::new();
227 /// // Seed two rows that an invariant ties together.
228 /// let mut tx = db.begin();
229 /// tx.put(b"on_call:alice".to_vec(), vec![1]);
230 /// tx.put(b"on_call:bob".to_vec(), vec![1]);
231 /// tx.commit()?;
232 ///
233 /// // A serializable transaction validates the rows it read at commit.
234 /// let mut tx = db.begin_serializable();
235 /// let _alice = tx.get(b"on_call:alice")?;
236 /// let _bob = tx.get(b"on_call:bob")?;
237 /// tx.put(b"on_call:alice".to_vec(), vec![0]);
238 /// tx.commit()?;
239 /// # }
240 /// # Ok::<(), txn_db::TxnError>(())
241 /// ```
242 #[cfg(feature = "serializable")]
243 #[cfg_attr(docsrs, doc(cfg(feature = "serializable")))]
244 pub fn begin_serializable(&self) -> Transaction<S> {
245 Transaction::new(Arc::clone(&self.inner), self.inner.read_ts(), true)
246 }
247
248 /// Take a read-only snapshot of the current state of the database.
249 ///
250 /// The returned [`Snapshot`] reads as of this instant and never changes,
251 /// even as other transactions commit. Use it to read several keys at one
252 /// consistent point in time without the overhead of a transaction.
253 ///
254 /// # Examples
255 ///
256 /// ```
257 /// use txn_db::Db;
258 ///
259 /// let db = Db::new();
260 /// let snap = db.snapshot();
261 /// assert_eq!(snap.get(b"k")?, None);
262 /// # Ok::<(), txn_db::TxnError>(())
263 /// ```
264 pub fn snapshot(&self) -> Snapshot<S> {
265 Snapshot::new(Arc::clone(&self.inner), self.inner.read_ts())
266 }
267
268 /// The timestamp of the most recent commit visible to a new transaction.
269 ///
270 /// Returns [`Timestamp::ZERO`] for a database that has never been written.
271 /// This is the read watermark: the timestamp a transaction beginning now
272 /// would read at.
273 ///
274 /// # Examples
275 ///
276 /// ```
277 /// use txn_db::Db;
278 ///
279 /// let db = Db::new();
280 /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
281 ///
282 /// let mut tx = db.begin();
283 /// tx.put(b"k".to_vec(), b"v".to_vec());
284 /// let ts = tx.commit()?;
285 /// assert_eq!(db.last_committed(), ts);
286 /// # Ok::<(), txn_db::TxnError>(())
287 /// ```
288 #[must_use]
289 pub fn last_committed(&self) -> Timestamp {
290 self.inner.read_ts()
291 }
292}
293
294impl<S: VersionStore> Clone for Db<S> {
295 /// Clone the handle, not the data: the clone shares the same underlying
296 /// database.
297 fn clone(&self) -> Self {
298 Db {
299 inner: Arc::clone(&self.inner),
300 }
301 }
302}
303
// Unit tests for the `Db` handle. Compiled out under `--cfg loom`.
#[cfg(all(test, not(loom)))]
// Tests may unwrap/expect freely: a panic is exactly how a test reports failure.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// A brand-new database reports timestamp zero and holds no keys.
    #[test]
    fn test_new_database_is_empty_at_zero() {
        let db = Db::new();
        assert_eq!(db.last_committed(), Timestamp::ZERO);
        assert_eq!(db.begin().get(b"k").unwrap(), None);
    }

    /// A committed write gets a non-zero timestamp and is seen by a
    /// transaction that begins afterward.
    #[test]
    fn test_commit_makes_writes_visible_to_later_transactions() {
        let db = Db::new();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v".to_vec());
        let ts = tx.commit().unwrap();
        assert!(ts > Timestamp::ZERO);
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    /// A snapshot keeps returning the value it saw even after the key is
    /// overwritten by a later commit.
    #[test]
    fn test_snapshot_is_isolated_from_later_commits() {
        let db = Db::new();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v1".to_vec());
        let _ = tx.commit().unwrap();

        let snap = db.snapshot();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v2".to_vec());
        let _ = tx.commit().unwrap();

        assert_eq!(snap.get(b"k").unwrap().as_deref(), Some(&b"v1"[..]));
    }

    /// Two transactions from the same snapshot write the same key: the first
    /// committer wins, the second gets a retryable error.
    #[test]
    fn test_write_write_conflict_aborts_later_committer() {
        let db = Db::new();
        let mut a = db.begin();
        let mut b = db.begin();
        a.put(b"k".to_vec(), b"a".to_vec());
        b.put(b"k".to_vec(), b"b".to_vec());

        assert!(a.commit().is_ok());
        let err = b.commit().expect_err("second committer must lose");
        assert!(err.is_retryable());
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"a"[..]));
    }

    /// Concurrent transactions that write different keys both commit.
    #[test]
    fn test_disjoint_keys_do_not_conflict() {
        let db = Db::new();
        let mut a = db.begin();
        let mut b = db.begin();
        a.put(b"a".to_vec(), b"1".to_vec());
        b.put(b"b".to_vec(), b"2".to_vec());
        assert!(a.commit().is_ok());
        assert!(b.commit().is_ok());
    }

    /// Committing a transaction that wrote nothing yields the timestamp it
    /// read at.
    #[test]
    fn test_read_only_commit_returns_snapshot_timestamp() {
        let db = Db::new();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v".to_vec());
        let ts = tx.commit().unwrap();

        let ro = db.begin();
        assert_eq!(ro.commit().unwrap(), ts);
    }

    /// Writes buffered in a rolled-back transaction never reach the store.
    #[test]
    fn test_rollback_discards_writes() {
        let db = Db::new();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v".to_vec());
        tx.rollback();
        assert_eq!(db.begin().get(b"k").unwrap(), None);
    }

    /// A cloned handle observes commits made through the original.
    #[test]
    fn test_clone_shares_state() {
        let db = Db::new();
        let db2 = db.clone();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v".to_vec());
        let _ = tx.commit().unwrap();
        assert_eq!(db2.begin().get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    /// Under serializable isolation the classic write-skew schedule is
    /// rejected: the second committer read a key the first one changed.
    #[cfg(feature = "serializable")]
    #[test]
    fn test_serializable_rejects_write_skew() {
        let db = Db::new();
        let mut seed = db.begin();
        seed.put(b"x".to_vec(), vec![1]);
        seed.put(b"y".to_vec(), vec![1]);
        let _ = seed.commit().unwrap();

        // Two serializable transactions from the same snapshot each read both
        // rows and write the one the other read.
        let mut t1 = db.begin_serializable();
        let mut t2 = db.begin_serializable();
        let _ = t1.get(b"x").unwrap();
        let _ = t1.get(b"y").unwrap();
        let _ = t2.get(b"x").unwrap();
        let _ = t2.get(b"y").unwrap();
        t1.put(b"x".to_vec(), vec![0]);
        t2.put(b"y".to_vec(), vec![0]);

        assert!(t1.commit().is_ok());
        // t2 read x, which t1 changed -> serializable validation aborts it.
        let err = t2.commit().expect_err("write skew must be rejected");
        assert!(err.is_retryable());
    }

    /// The same write-skew schedule under plain snapshot isolation: both
    /// transactions commit.
    ///
    /// Deliberately NOT gated on the `serializable` feature: it only uses
    /// `begin`/`get`/`put`/`commit`, so the snapshot-isolation baseline is
    /// checked in every build configuration.
    #[test]
    fn test_snapshot_txn_allows_write_skew() {
        let db = Db::new();
        let mut seed = db.begin();
        seed.put(b"x".to_vec(), vec![1]);
        seed.put(b"y".to_vec(), vec![1]);
        let _ = seed.commit().unwrap();

        // Both commit, because SI does not validate the read set and the two
        // write sets are disjoint.
        let mut t1 = db.begin();
        let mut t2 = db.begin();
        let _ = t1.get(b"x").unwrap();
        let _ = t1.get(b"y").unwrap();
        let _ = t2.get(b"x").unwrap();
        let _ = t2.get(b"y").unwrap();
        t1.put(b"x".to_vec(), vec![0]);
        t2.put(b"y".to_vec(), vec![0]);

        assert!(t1.commit().is_ok());
        assert!(t2.commit().is_ok());
    }
}
445}