txn_db/db.rs
//! The database handle and the commit coordinator behind it.
//!
//! [`Db`] is the Tier-1 entry point: construct one, [`begin`](Db::begin)
//! transactions against it, and [`commit`](crate::Transaction::commit) them. A
//! `Db` is a cheap, clonable handle to shared state — clone it freely and hand
//! a clone to every thread that needs to read or write.
//!
//! The shared state itself lives in [`Inner`], which owns the version store and
//! the [`Oracle`](crate::oracle::Oracle) that allocates timestamps and tracks
//! the read watermark. Commit coordination is deliberately split: the oracle
//! hands out timestamps lock-free, while the version store is the
//! serialization point that validates and applies each commit atomically.
//! Unlike the foundation release, there is no single global commit lock.
14
15use std::sync::Arc;
16
17use crate::error::Result;
18use crate::oracle::Oracle;
19use crate::store::{MemoryStore, VersionStore, WriteEntry};
20use crate::timestamp::Timestamp;
21use crate::txn::{Snapshot, Transaction};
22
23/// Shared, reference-counted state for one logical database.
24///
25/// A [`Db`] is a handle to an `Arc<Inner>`; every clone of the `Db`, every
26/// [`Transaction`], and every [`Snapshot`] holds a clone of the same `Inner`,
27/// so they all read and commit against one version store and one timestamp
28/// sequence.
29pub(crate) struct Inner<S: VersionStore> {
30 /// The backing version store. Reads go to it; commits validate and apply
31 /// through it.
32 pub(crate) store: S,
33 /// Allocates timestamps and tracks the consistent-read watermark.
34 oracle: Oracle,
35 /// The durable commit log, present only for a database opened with
36 /// [`Db::open`]. `None` for an in-memory database.
37 #[cfg(feature = "durability")]
38 log: Option<crate::durable::CommitLog>,
39}
40
41impl<S: VersionStore> Inner<S> {
42 fn new(store: S) -> Self {
43 Inner {
44 store,
45 oracle: Oracle::new(),
46 #[cfg(feature = "durability")]
47 log: None,
48 }
49 }
50
51 /// The timestamp a transaction beginning now should read at.
52 #[inline]
53 fn read_ts(&self) -> Timestamp {
54 self.oracle.read_ts()
55 }
56
57 /// Register a live reader and return its read timestamp.
58 #[inline]
59 pub(crate) fn begin_reader(&self) -> Timestamp {
60 self.oracle.begin_reader()
61 }
62
63 /// Unregister a reader that began at `read_ts`.
64 #[inline]
65 pub(crate) fn end_reader(&self, read_ts: Timestamp) {
66 self.oracle.end_reader(read_ts);
67 }
68
69 /// Reclaim versions no live reader can observe, returning the count removed.
70 fn collect_garbage(&self) -> usize {
71 self.store.collect_garbage(self.oracle.low_watermark())
72 }
73
74 /// Allocate a commit timestamp, validate-and-apply through the store, then
75 /// release the timestamp into the watermark.
76 ///
77 /// The timestamp is reported to the oracle on both outcomes — a successful
78 /// commit and a rejected one — so a conflict never stalls the read watermark
79 /// behind the timestamp it consumed.
80 pub(crate) fn commit_writes(
81 &self,
82 read_ts: Timestamp,
83 writes: Vec<WriteEntry>,
84 reads: &[Arc<[u8]>],
85 ) -> Result<Timestamp> {
86 let commit_ts = self.oracle.alloc_commit_ts();
87
88 // Encode the durable record before the write set is consumed by the
89 // store. No cost for an in-memory database (no log).
90 #[cfg(feature = "durability")]
91 let record = self
92 .log
93 .as_ref()
94 .map(|_| crate::durable::encode_for_log(commit_ts, &writes));
95
96 let outcome = self.store.try_commit(read_ts, commit_ts, writes, reads);
97
98 // Make the commit durable before it is acknowledged. The validate-and-
99 // apply has already happened in memory but is not yet visible — the
100 // watermark only advances at `commit_done` below — so a crash before the
101 // sync completes leaves a transaction that was never acknowledged and is
102 // recovered as absent.
103 #[cfg(feature = "durability")]
104 if outcome.is_ok() {
105 if let (Some(log), Some(record)) = (self.log.as_ref(), record) {
106 if let Err(err) = log.append_committed(&record) {
107 self.oracle.commit_done(commit_ts);
108 return Err(err);
109 }
110 }
111 }
112
113 self.oracle.commit_done(commit_ts);
114 outcome.map(|()| commit_ts)
115 }
116
117 /// Build the shared inner state for a database recovered from a durable log.
118 #[cfg(feature = "durability")]
119 fn recovered(store: S, oracle: Oracle, log: crate::durable::CommitLog) -> Self {
120 Inner {
121 store,
122 oracle,
123 log: Some(log),
124 }
125 }
126}
127
128/// A transactional, multi-version key-value database.
129///
130/// `Db` is the front door. [`Db::new`] gives you an in-memory database;
131/// [`Db::with_store`] builds one over any [`VersionStore`]. From there the whole
132/// common case is [`begin`](Db::begin) / [`get`](crate::Transaction::get) /
133/// [`put`](crate::Transaction::put) / [`commit`](crate::Transaction::commit),
134/// with [`snapshot`](Db::snapshot) for read-only point-in-time views.
135///
136/// Transactions default to **snapshot isolation**. With the `serializable`
137/// feature enabled, `begin_serializable` starts a transaction whose read set is
138/// validated at commit, rejecting write skew and the other anomalies snapshot
139/// isolation permits.
140///
141/// A `Db` is a clonable handle over shared state, like an [`Arc`]. Cloning it
142/// is cheap and every clone refers to the same database, so the idiomatic way
143/// to use it across threads is to clone a handle per thread.
144///
145/// # Examples
146///
147/// The four-call common case:
148///
149/// ```
150/// use txn_db::Db;
151///
152/// let db = Db::new();
153///
154/// let mut tx = db.begin();
155/// tx.put(b"greeting".to_vec(), b"hei".to_vec());
156/// tx.commit()?;
157///
158/// let tx = db.begin();
159/// assert_eq!(tx.get(b"greeting")?.as_deref(), Some(&b"hei"[..]));
160/// # Ok::<(), txn_db::TxnError>(())
161/// ```
162///
163/// Sharing one database across threads:
164///
165/// ```
166/// use std::thread;
167/// use txn_db::Db;
168///
169/// let db = Db::new();
170/// let handles: Vec<_> = (0..4u8)
171/// .map(|i| {
172/// let db = db.clone();
173/// thread::spawn(move || {
174/// let mut tx = db.begin();
175/// tx.put(vec![i], vec![i]);
176/// // Independent keys never conflict.
177/// tx.commit().expect("commit");
178/// })
179/// })
180/// .collect();
181/// for h in handles {
182/// h.join().expect("thread");
183/// }
184/// # Ok::<(), txn_db::TxnError>(())
185/// ```
186pub struct Db<S: VersionStore = MemoryStore> {
187 inner: Arc<Inner<S>>,
188}
189
190impl Db<MemoryStore> {
191 /// Create an empty in-memory database.
192 ///
193 /// This is the default configuration: a [`MemoryStore`] backing store, ready
194 /// for [`begin`](Db::begin).
195 ///
196 /// # Examples
197 ///
198 /// ```
199 /// use txn_db::Db;
200 ///
201 /// let db = Db::new();
202 /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
203 /// ```
204 #[must_use]
205 pub fn new() -> Self {
206 Db::with_store(MemoryStore::new())
207 }
208
209 /// Open a durable database backed by a write-ahead log at `path`, replaying
210 /// any committed transactions already in the log.
211 ///
212 /// Every transaction committed against the returned database appends its
213 /// record to the log and syncs it before [`commit`](crate::Transaction::commit)
214 /// returns, so an acknowledged commit survives a crash. On open, the log is
215 /// replayed: each committed transaction is reinstated, and a transaction that
216 /// never reached the log — because it aborted, or because the process crashed
217 /// before its record was made durable — is simply absent. The recovered data
218 /// lives in memory; the log is the durable record from which it is rebuilt.
219 ///
220 /// Available with the `durability` feature.
221 ///
222 /// # Errors
223 ///
224 /// Returns [`TxnError::Durability`](crate::TxnError::Durability) if the log
225 /// cannot be opened or a record read back from it does not decode.
226 ///
227 /// # Examples
228 ///
229 /// ```
230 /// # #[cfg(feature = "durability")]
231 /// # {
232 /// # let dir = tempfile::tempdir().expect("tempdir");
233 /// # let path = dir.path().join("txn.wal");
234 /// use txn_db::Db;
235 ///
236 /// // Commit, then drop the database.
237 /// {
238 /// let db = Db::open(&path)?;
239 /// let mut tx = db.begin();
240 /// tx.put(b"k".to_vec(), b"v".to_vec());
241 /// tx.commit()?;
242 /// }
243 ///
244 /// // Reopening replays the log: the committed write is still there.
245 /// let db = Db::open(&path)?;
246 /// assert_eq!(db.begin().get(b"k")?.as_deref(), Some(&b"v"[..]));
247 /// # }
248 /// # Ok::<(), txn_db::TxnError>(())
249 /// ```
250 #[cfg(feature = "durability")]
251 #[cfg_attr(docsrs, doc(cfg(feature = "durability")))]
252 pub fn open(path: impl AsRef<std::path::Path>) -> Result<Db<MemoryStore>> {
253 let (log, mut recovered) = crate::durable::CommitLog::open(path)?;
254
255 // Replay in ascending commit-timestamp order; records may sit in the log
256 // out of that order because commits append after applying, concurrently.
257 recovered.sort_by_key(|commit| commit.commit_ts);
258
259 let store = MemoryStore::new();
260 let mut highest = Timestamp::ZERO;
261 for commit in recovered {
262 highest = highest.max(commit.commit_ts);
263 store.install_recovered(commit.commit_ts, commit.writes);
264 }
265
266 Ok(Db {
267 inner: Arc::new(Inner::recovered(store, Oracle::recovered(highest), log)),
268 })
269 }
270}
271
272impl Default for Db<MemoryStore> {
273 fn default() -> Self {
274 Db::new()
275 }
276}
277
278impl<S: VersionStore> Db<S> {
279 /// Create a database over a custom [`VersionStore`].
280 ///
281 /// This is the Tier-3 seam: supply any backing store and the transaction
282 /// semantics — snapshot isolation, read-your-own-writes, conflict detection
283 /// — compose on top of it unchanged.
284 ///
285 /// # Examples
286 ///
287 /// ```
288 /// use txn_db::{Db, MemoryStore};
289 ///
290 /// let db = Db::with_store(MemoryStore::new());
291 /// let mut tx = db.begin();
292 /// tx.put(b"k".to_vec(), b"v".to_vec());
293 /// tx.commit()?;
294 /// # Ok::<(), txn_db::TxnError>(())
295 /// ```
296 #[must_use]
297 pub fn with_store(store: S) -> Self {
298 Db {
299 inner: Arc::new(Inner::new(store)),
300 }
301 }
302
303 /// Begin a snapshot-isolation transaction over the current state.
304 ///
305 /// The transaction takes its snapshot at this moment: it reads as of the
306 /// most recent commit and is unaffected by commits that happen afterward.
307 /// Its writes are checked for write-write conflicts at commit, but its reads
308 /// are not validated — use `begin_serializable` (with the `serializable`
309 /// feature) when you need serializability.
310 ///
311 /// # Examples
312 ///
313 /// ```
314 /// use txn_db::Db;
315 ///
316 /// let db = Db::new();
317 /// let mut tx = db.begin();
318 /// tx.put(b"k".to_vec(), b"v".to_vec());
319 /// tx.commit()?;
320 /// # Ok::<(), txn_db::TxnError>(())
321 /// ```
322 pub fn begin(&self) -> Transaction<S> {
323 Transaction::new(Arc::clone(&self.inner), false)
324 }
325
326 /// Begin a serializable transaction over the current state.
327 ///
328 /// A serializable transaction tracks every key it reads and, at commit,
329 /// validates that none of them changed since its snapshot — in addition to
330 /// the write-write check every transaction gets. That read-set validation is
331 /// what rejects write skew and the read-only anomaly that plain snapshot
332 /// isolation permits, giving serializable behavior for the transactions that
333 /// commit writes. A serializable transaction that writes nothing commits
334 /// trivially, exactly like a read-only snapshot.
335 ///
336 /// Available with the `serializable` feature. Snapshot isolation remains the
337 /// default and is unaffected.
338 ///
339 /// # Examples
340 ///
341 /// ```
342 /// # #[cfg(feature = "serializable")]
343 /// # {
344 /// use txn_db::Db;
345 ///
346 /// let db = Db::new();
347 /// // Seed two rows that an invariant ties together.
348 /// let mut tx = db.begin();
349 /// tx.put(b"on_call:alice".to_vec(), vec![1]);
350 /// tx.put(b"on_call:bob".to_vec(), vec![1]);
351 /// tx.commit()?;
352 ///
353 /// // A serializable transaction validates the rows it read at commit.
354 /// let mut tx = db.begin_serializable();
355 /// let _alice = tx.get(b"on_call:alice")?;
356 /// let _bob = tx.get(b"on_call:bob")?;
357 /// tx.put(b"on_call:alice".to_vec(), vec![0]);
358 /// tx.commit()?;
359 /// # }
360 /// # Ok::<(), txn_db::TxnError>(())
361 /// ```
362 #[cfg(feature = "serializable")]
363 #[cfg_attr(docsrs, doc(cfg(feature = "serializable")))]
364 pub fn begin_serializable(&self) -> Transaction<S> {
365 Transaction::new(Arc::clone(&self.inner), true)
366 }
367
368 /// Take a read-only snapshot of the current state of the database.
369 ///
370 /// The returned [`Snapshot`] reads as of this instant and never changes,
371 /// even as other transactions commit. Use it to read several keys at one
372 /// consistent point in time without the overhead of a transaction.
373 ///
374 /// # Examples
375 ///
376 /// ```
377 /// use txn_db::Db;
378 ///
379 /// let db = Db::new();
380 /// let snap = db.snapshot();
381 /// assert_eq!(snap.get(b"k")?, None);
382 /// # Ok::<(), txn_db::TxnError>(())
383 /// ```
384 pub fn snapshot(&self) -> Snapshot<S> {
385 Snapshot::new(Arc::clone(&self.inner))
386 }
387
388 /// The timestamp of the most recent commit visible to a new transaction.
389 ///
390 /// Returns [`Timestamp::ZERO`] for a database that has never been written.
391 /// This is the read watermark: the timestamp a transaction beginning now
392 /// would read at.
393 ///
394 /// # Examples
395 ///
396 /// ```
397 /// use txn_db::Db;
398 ///
399 /// let db = Db::new();
400 /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
401 ///
402 /// let mut tx = db.begin();
403 /// tx.put(b"k".to_vec(), b"v".to_vec());
404 /// let ts = tx.commit()?;
405 /// assert_eq!(db.last_committed(), ts);
406 /// # Ok::<(), txn_db::TxnError>(())
407 /// ```
408 #[must_use]
409 pub fn last_committed(&self) -> Timestamp {
410 self.inner.read_ts()
411 }
412
413 /// Reclaim versions that no live transaction or snapshot can observe,
414 /// returning how many were removed.
415 ///
416 /// `txn-db` keeps every version of a key so that an in-flight reader sees a
417 /// stable snapshot. Once no live reader can observe an old version — because
418 /// every active transaction and snapshot reads at a timestamp newer than a
419 /// later version of that key — the old one is unreachable and this reclaims
420 /// it. A key deleted before the oldest live reader's snapshot is dropped
421 /// entirely.
422 ///
423 /// Call it periodically, or after retiring long-running snapshots, to bound
424 /// memory. It is safe to call at any time and from any thread: a version a
425 /// live reader can still reach is never reclaimed. With the default
426 /// in-memory store this prunes the version chains; a custom
427 /// [`VersionStore`](crate::VersionStore) that keeps no history can leave the
428 /// default no-op in place.
429 ///
430 /// # Examples
431 ///
432 /// ```
433 /// use txn_db::Db;
434 ///
435 /// let db = Db::new();
436 /// // Overwrite the same key several times.
437 /// for v in 0..5u8 {
438 /// let mut tx = db.begin();
439 /// tx.put(b"k".to_vec(), vec![v]);
440 /// tx.commit()?;
441 /// }
442 ///
443 /// // No snapshot is held, so only the newest version need be kept.
444 /// let reclaimed = db.collect_garbage();
445 /// assert!(reclaimed > 0);
446 /// assert_eq!(db.begin().get(b"k")?.as_deref(), Some(&[4u8][..]));
447 /// # Ok::<(), txn_db::TxnError>(())
448 /// ```
449 pub fn collect_garbage(&self) -> usize {
450 self.inner.collect_garbage()
451 }
452}
453
454impl<S: VersionStore> Clone for Db<S> {
455 /// Clone the handle, not the data: the clone shares the same underlying
456 /// database.
457 fn clone(&self) -> Self {
458 Db {
459 inner: Arc::clone(&self.inner),
460 }
461 }
462}
463
#[cfg(all(test, not(loom)))]
// Unwrapping is the point in tests: an unexpected `Err` should fail loudly.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Run one transaction that writes `key = value`, commit it, and return
    /// the commit timestamp. Panics if the commit fails.
    fn commit_one(db: &Db, key: &[u8], value: &[u8]) -> Timestamp {
        let mut tx = db.begin();
        tx.put(key.to_vec(), value.to_vec());
        tx.commit().unwrap()
    }

    /// Commit `x = [1]` and `y = [1]` together in a single transaction.
    #[cfg(feature = "serializable")]
    fn seed_x_and_y(db: &Db) {
        let mut seed = db.begin();
        seed.put(b"x".to_vec(), vec![1]);
        seed.put(b"y".to_vec(), vec![1]);
        let _ = seed.commit().unwrap();
    }

    #[test]
    fn test_new_database_is_empty_at_zero() {
        let db = Db::new();
        assert_eq!(db.last_committed(), Timestamp::ZERO);
        assert_eq!(db.begin().get(b"k").unwrap(), None);
    }

    #[test]
    fn test_commit_makes_writes_visible_to_later_transactions() {
        let db = Db::new();
        let ts = commit_one(&db, b"k", b"v");
        assert!(ts > Timestamp::ZERO);
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    #[test]
    fn test_snapshot_is_isolated_from_later_commits() {
        let db = Db::new();
        let _ = commit_one(&db, b"k", b"v1");

        let snap = db.snapshot();
        let _ = commit_one(&db, b"k", b"v2");

        assert_eq!(snap.get(b"k").unwrap().as_deref(), Some(&b"v1"[..]));
    }

    #[test]
    fn test_write_write_conflict_aborts_later_committer() {
        let db = Db::new();
        let (mut first, mut second) = (db.begin(), db.begin());
        first.put(b"k".to_vec(), b"a".to_vec());
        second.put(b"k".to_vec(), b"b".to_vec());

        assert!(first.commit().is_ok());
        let err = second.commit().expect_err("second committer must lose");
        assert!(err.is_retryable());
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"a"[..]));
    }

    #[test]
    fn test_disjoint_keys_do_not_conflict() {
        let db = Db::new();
        let (mut first, mut second) = (db.begin(), db.begin());
        first.put(b"a".to_vec(), b"1".to_vec());
        second.put(b"b".to_vec(), b"2".to_vec());
        assert!(first.commit().is_ok());
        assert!(second.commit().is_ok());
    }

    #[test]
    fn test_read_only_commit_returns_snapshot_timestamp() {
        let db = Db::new();
        let ts = commit_one(&db, b"k", b"v");

        let read_only = db.begin();
        assert_eq!(read_only.commit().unwrap(), ts);
    }

    #[test]
    fn test_rollback_discards_writes() {
        let db = Db::new();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v".to_vec());
        tx.rollback();
        assert_eq!(db.begin().get(b"k").unwrap(), None);
    }

    #[test]
    fn test_gc_reclaims_when_no_reader_is_held() {
        let db = Db::new();
        for v in 0..5u8 {
            let _ = commit_one(&db, b"k", &[v]);
        }
        let reclaimed = db.collect_garbage();
        assert!(reclaimed > 0);
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&[4u8][..]));
    }

    #[test]
    fn test_held_snapshot_pins_gc() {
        let db = Db::new();
        let _ = commit_one(&db, b"k", &[1]);

        // Pin the current state with a snapshot, then overwrite the key.
        let snap = db.snapshot();
        let _ = commit_one(&db, b"k", &[2]);

        // GC must leave alone the version the pinned snapshot still reads.
        let _ = db.collect_garbage();
        assert_eq!(snap.get(b"k").unwrap().as_deref(), Some(&[1u8][..]));

        // Releasing the snapshot makes the old version reclaimable.
        drop(snap);
        let reclaimed = db.collect_garbage();
        assert!(reclaimed > 0);
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&[2u8][..]));
    }

    #[test]
    fn test_clone_shares_state() {
        let db = Db::new();
        let other_handle = db.clone();
        let _ = commit_one(&db, b"k", b"v");
        assert_eq!(
            other_handle.begin().get(b"k").unwrap().as_deref(),
            Some(&b"v"[..])
        );
    }

    #[cfg(feature = "serializable")]
    #[test]
    fn test_serializable_rejects_write_skew() {
        let db = Db::new();
        seed_x_and_y(&db);

        // Two serializable transactions from the same snapshot: each reads
        // both rows, then writes the row the other one read.
        let mut t1 = db.begin_serializable();
        let mut t2 = db.begin_serializable();
        for tx in [&t1, &t2] {
            let _ = tx.get(b"x").unwrap();
            let _ = tx.get(b"y").unwrap();
        }
        t1.put(b"x".to_vec(), vec![0]);
        t2.put(b"y".to_vec(), vec![0]);

        assert!(t1.commit().is_ok());
        // t2 read x, which t1 changed, so serializable validation aborts it.
        let err = t2.commit().expect_err("write skew must be rejected");
        assert!(err.is_retryable());
    }

    #[cfg(feature = "serializable")]
    #[test]
    fn test_snapshot_txn_allows_write_skew() {
        let db = Db::new();
        seed_x_and_y(&db);

        // Same schedule under plain snapshot isolation: both commit, because
        // SI never validates the read set.
        let mut t1 = db.begin();
        let mut t2 = db.begin();
        for tx in [&t1, &t2] {
            let _ = tx.get(b"x").unwrap();
            let _ = tx.get(b"y").unwrap();
        }
        t1.put(b"x".to_vec(), vec![0]);
        t2.put(b"y".to_vec(), vec![0]);

        assert!(t1.commit().is_ok());
        assert!(t2.commit().is_ok());
    }
}
642}