txn_db/db.rs
//! The database handle and the commit coordinator behind it.
//!
//! [`Db`] is the Tier-1 entry point: construct one, [`begin`](Db::begin)
//! transactions against it, and [`commit`](crate::Transaction::commit) them. A
//! `Db` is a cheap, clonable handle to shared state — clone it freely and hand
//! a clone to every thread that needs to read or write.
//!
//! The shared state itself lives in [`Inner`], which owns the version store and
//! the [`Oracle`](crate::oracle::Oracle) that allocates timestamps and tracks
//! the read watermark. Commit coordination is split deliberately: the oracle
//! hands out timestamps lock-free, and the version store is the serialization
//! point that validates and applies each commit atomically. Unlike the
//! foundation release, there is no single global commit lock.
14
15use std::sync::Arc;
16
17use crate::error::Result;
18use crate::oracle::Oracle;
19use crate::store::{MemoryStore, VersionStore, WriteEntry};
20use crate::timestamp::Timestamp;
21use crate::txn::{Snapshot, Transaction};
22
23/// Shared, reference-counted state for one logical database.
24///
25/// A [`Db`] is a handle to an `Arc<Inner>`; every clone of the `Db`, every
26/// [`Transaction`], and every [`Snapshot`] holds a clone of the same `Inner`,
27/// so they all read and commit against one version store and one timestamp
28/// sequence.
29pub(crate) struct Inner<S: VersionStore> {
30 /// The backing version store. Reads go to it; commits validate and apply
31 /// through it.
32 pub(crate) store: S,
33 /// Allocates timestamps and tracks the consistent-read watermark.
34 oracle: Oracle,
35 /// The durable commit log, present only for a database opened with
36 /// [`Db::open`]. `None` for an in-memory database.
37 #[cfg(feature = "durability")]
38 log: Option<crate::durable::CommitLog>,
39}
40
impl<S: VersionStore> Inner<S> {
    /// Build shared state for an in-memory database over `store`.
    ///
    /// The oracle starts fresh (`Oracle::new()`), and with the `durability`
    /// feature enabled the commit log is `None`, so commits are never logged.
    fn new(store: S) -> Self {
        Inner {
            store,
            oracle: Oracle::new(),
            #[cfg(feature = "durability")]
            log: None,
        }
    }

    /// The timestamp a transaction beginning now should read at.
    ///
    /// This is the oracle's read watermark, returned as-is.
    #[inline]
    fn read_ts(&self) -> Timestamp {
        self.oracle.read_ts()
    }

    /// Allocate a commit timestamp, validate-and-apply through the store, then
    /// release the timestamp into the watermark.
    ///
    /// The timestamp is reported to the oracle on both outcomes — a successful
    /// commit and a rejected one — so a conflict never stalls the read watermark
    /// behind the timestamp it consumed.
    ///
    /// * `read_ts` — the snapshot timestamp the committing transaction read at.
    /// * `writes` — the buffered write set; consumed by the store.
    /// * `reads` — keys passed through to `VersionStore::try_commit`;
    ///   presumably the read set checked for serializable transactions
    ///   (TODO confirm against the store implementation).
    ///
    /// Returns the allocated commit timestamp on success.
    ///
    /// # Errors
    ///
    /// Returns whatever error `try_commit` produces when the store rejects the
    /// commit. With the `durability` feature and a log present, also returns
    /// the error from `append_committed` if the durable append fails.
    ///
    /// NOTE(review): on the append-failure path the write set has already
    /// been applied by `try_commit`, and `commit_done` still runs before the
    /// error is returned. The durability comment in the body says the
    /// watermark advances at `commit_done`. If so, a commit that reports
    /// failure becomes visible to new readers in memory, yet it is absent from
    /// the log and so disappears on restart.
    ///
    /// Confirm whether the store should roll the apply back here, or whether
    /// the database should fail-stop instead.
    pub(crate) fn commit_writes(
        &self,
        read_ts: Timestamp,
        writes: Vec<WriteEntry>,
        reads: &[Arc<[u8]>],
    ) -> Result<Timestamp> {
        let commit_ts = self.oracle.alloc_commit_ts();

        // Encode the durable record before the write set is consumed by the
        // store. No cost for an in-memory database (no log).
        #[cfg(feature = "durability")]
        let record = self
            .log
            .as_ref()
            .map(|_| crate::durable::encode_for_log(commit_ts, &writes));

        // `writes` is moved into the store here; `record` above is the only
        // copy of it left for the log.
        let outcome = self.store.try_commit(read_ts, commit_ts, writes, reads);

        // Make the commit durable before it is acknowledged. The validate-and-
        // apply has already happened in memory but is not yet visible — the
        // watermark only advances at `commit_done` below — so a crash before the
        // sync completes leaves a transaction that was never acknowledged and is
        // recovered as absent.
        #[cfg(feature = "durability")]
        if outcome.is_ok() {
            if let (Some(log), Some(record)) = (self.log.as_ref(), record) {
                if let Err(err) = log.append_committed(&record) {
                    // Release the timestamp even on failure so the watermark is
                    // not stalled behind it (see the NOTE(review) on this fn).
                    self.oracle.commit_done(commit_ts);
                    return Err(err);
                }
            }
        }

        // Reached on success and on a store rejection alike: the timestamp is
        // always released.
        self.oracle.commit_done(commit_ts);
        outcome.map(|()| commit_ts)
    }

    /// Build the shared inner state for a database recovered from a durable log.
    ///
    /// The caller supplies an already-populated `store` and an `oracle` seeded
    /// from the recovered commits; the log is kept so later commits append to
    /// it.
    #[cfg(feature = "durability")]
    fn recovered(store: S, oracle: Oracle, log: crate::durable::CommitLog) -> Self {
        Inner {
            store,
            oracle,
            log: Some(log),
        }
    }
}
110
111/// A transactional, multi-version key-value database.
112///
113/// `Db` is the front door. [`Db::new`] gives you an in-memory database;
114/// [`Db::with_store`] builds one over any [`VersionStore`]. From there the whole
115/// common case is [`begin`](Db::begin) / [`get`](crate::Transaction::get) /
116/// [`put`](crate::Transaction::put) / [`commit`](crate::Transaction::commit),
117/// with [`snapshot`](Db::snapshot) for read-only point-in-time views.
118///
119/// Transactions default to **snapshot isolation**. With the `serializable`
120/// feature enabled, `begin_serializable` starts a transaction whose read set is
121/// validated at commit, rejecting write skew and the other anomalies snapshot
122/// isolation permits.
123///
124/// A `Db` is a clonable handle over shared state, like an [`Arc`]. Cloning it
125/// is cheap and every clone refers to the same database, so the idiomatic way
126/// to use it across threads is to clone a handle per thread.
127///
128/// # Examples
129///
130/// The four-call common case:
131///
132/// ```
133/// use txn_db::Db;
134///
135/// let db = Db::new();
136///
137/// let mut tx = db.begin();
138/// tx.put(b"greeting".to_vec(), b"hei".to_vec());
139/// tx.commit()?;
140///
141/// let tx = db.begin();
142/// assert_eq!(tx.get(b"greeting")?.as_deref(), Some(&b"hei"[..]));
143/// # Ok::<(), txn_db::TxnError>(())
144/// ```
145///
146/// Sharing one database across threads:
147///
148/// ```
149/// use std::thread;
150/// use txn_db::Db;
151///
152/// let db = Db::new();
153/// let handles: Vec<_> = (0..4u8)
154/// .map(|i| {
155/// let db = db.clone();
156/// thread::spawn(move || {
157/// let mut tx = db.begin();
158/// tx.put(vec![i], vec![i]);
159/// // Independent keys never conflict.
160/// tx.commit().expect("commit");
161/// })
162/// })
163/// .collect();
164/// for h in handles {
165/// h.join().expect("thread");
166/// }
167/// # Ok::<(), txn_db::TxnError>(())
168/// ```
169pub struct Db<S: VersionStore = MemoryStore> {
170 inner: Arc<Inner<S>>,
171}
172
173impl Db<MemoryStore> {
174 /// Create an empty in-memory database.
175 ///
176 /// This is the default configuration: a [`MemoryStore`] backing store, ready
177 /// for [`begin`](Db::begin).
178 ///
179 /// # Examples
180 ///
181 /// ```
182 /// use txn_db::Db;
183 ///
184 /// let db = Db::new();
185 /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
186 /// ```
187 #[must_use]
188 pub fn new() -> Self {
189 Db::with_store(MemoryStore::new())
190 }
191
192 /// Open a durable database backed by a write-ahead log at `path`, replaying
193 /// any committed transactions already in the log.
194 ///
195 /// Every transaction committed against the returned database appends its
196 /// record to the log and syncs it before [`commit`](crate::Transaction::commit)
197 /// returns, so an acknowledged commit survives a crash. On open, the log is
198 /// replayed: each committed transaction is reinstated, and a transaction that
199 /// never reached the log — because it aborted, or because the process crashed
200 /// before its record was made durable — is simply absent. The recovered data
201 /// lives in memory; the log is the durable record from which it is rebuilt.
202 ///
203 /// Available with the `durability` feature.
204 ///
205 /// # Errors
206 ///
207 /// Returns [`TxnError::Durability`](crate::TxnError::Durability) if the log
208 /// cannot be opened or a record read back from it does not decode.
209 ///
210 /// # Examples
211 ///
212 /// ```
213 /// # #[cfg(feature = "durability")]
214 /// # {
215 /// # let dir = tempfile::tempdir().expect("tempdir");
216 /// # let path = dir.path().join("txn.wal");
217 /// use txn_db::Db;
218 ///
219 /// // Commit, then drop the database.
220 /// {
221 /// let db = Db::open(&path)?;
222 /// let mut tx = db.begin();
223 /// tx.put(b"k".to_vec(), b"v".to_vec());
224 /// tx.commit()?;
225 /// }
226 ///
227 /// // Reopening replays the log: the committed write is still there.
228 /// let db = Db::open(&path)?;
229 /// assert_eq!(db.begin().get(b"k")?.as_deref(), Some(&b"v"[..]));
230 /// # }
231 /// # Ok::<(), txn_db::TxnError>(())
232 /// ```
233 #[cfg(feature = "durability")]
234 #[cfg_attr(docsrs, doc(cfg(feature = "durability")))]
235 pub fn open(path: impl AsRef<std::path::Path>) -> Result<Db<MemoryStore>> {
236 let (log, mut recovered) = crate::durable::CommitLog::open(path)?;
237
238 // Replay in ascending commit-timestamp order; records may sit in the log
239 // out of that order because commits append after applying, concurrently.
240 recovered.sort_by_key(|commit| commit.commit_ts);
241
242 let store = MemoryStore::new();
243 let mut highest = Timestamp::ZERO;
244 for commit in recovered {
245 highest = highest.max(commit.commit_ts);
246 store.install_recovered(commit.commit_ts, commit.writes);
247 }
248
249 Ok(Db {
250 inner: Arc::new(Inner::recovered(store, Oracle::recovered(highest), log)),
251 })
252 }
253}
254
255impl Default for Db<MemoryStore> {
256 fn default() -> Self {
257 Db::new()
258 }
259}
260
261impl<S: VersionStore> Db<S> {
262 /// Create a database over a custom [`VersionStore`].
263 ///
264 /// This is the Tier-3 seam: supply any backing store and the transaction
265 /// semantics — snapshot isolation, read-your-own-writes, conflict detection
266 /// — compose on top of it unchanged.
267 ///
268 /// # Examples
269 ///
270 /// ```
271 /// use txn_db::{Db, MemoryStore};
272 ///
273 /// let db = Db::with_store(MemoryStore::new());
274 /// let mut tx = db.begin();
275 /// tx.put(b"k".to_vec(), b"v".to_vec());
276 /// tx.commit()?;
277 /// # Ok::<(), txn_db::TxnError>(())
278 /// ```
279 #[must_use]
280 pub fn with_store(store: S) -> Self {
281 Db {
282 inner: Arc::new(Inner::new(store)),
283 }
284 }
285
286 /// Begin a snapshot-isolation transaction over the current state.
287 ///
288 /// The transaction takes its snapshot at this moment: it reads as of the
289 /// most recent commit and is unaffected by commits that happen afterward.
290 /// Its writes are checked for write-write conflicts at commit, but its reads
291 /// are not validated — use `begin_serializable` (with the `serializable`
292 /// feature) when you need serializability.
293 ///
294 /// # Examples
295 ///
296 /// ```
297 /// use txn_db::Db;
298 ///
299 /// let db = Db::new();
300 /// let mut tx = db.begin();
301 /// tx.put(b"k".to_vec(), b"v".to_vec());
302 /// tx.commit()?;
303 /// # Ok::<(), txn_db::TxnError>(())
304 /// ```
305 pub fn begin(&self) -> Transaction<S> {
306 Transaction::new(Arc::clone(&self.inner), self.inner.read_ts(), false)
307 }
308
309 /// Begin a serializable transaction over the current state.
310 ///
311 /// A serializable transaction tracks every key it reads and, at commit,
312 /// validates that none of them changed since its snapshot — in addition to
313 /// the write-write check every transaction gets. That read-set validation is
314 /// what rejects write skew and the read-only anomaly that plain snapshot
315 /// isolation permits, giving serializable behavior for the transactions that
316 /// commit writes. A serializable transaction that writes nothing commits
317 /// trivially, exactly like a read-only snapshot.
318 ///
319 /// Available with the `serializable` feature. Snapshot isolation remains the
320 /// default and is unaffected.
321 ///
322 /// # Examples
323 ///
324 /// ```
325 /// # #[cfg(feature = "serializable")]
326 /// # {
327 /// use txn_db::Db;
328 ///
329 /// let db = Db::new();
330 /// // Seed two rows that an invariant ties together.
331 /// let mut tx = db.begin();
332 /// tx.put(b"on_call:alice".to_vec(), vec![1]);
333 /// tx.put(b"on_call:bob".to_vec(), vec![1]);
334 /// tx.commit()?;
335 ///
336 /// // A serializable transaction validates the rows it read at commit.
337 /// let mut tx = db.begin_serializable();
338 /// let _alice = tx.get(b"on_call:alice")?;
339 /// let _bob = tx.get(b"on_call:bob")?;
340 /// tx.put(b"on_call:alice".to_vec(), vec![0]);
341 /// tx.commit()?;
342 /// # }
343 /// # Ok::<(), txn_db::TxnError>(())
344 /// ```
345 #[cfg(feature = "serializable")]
346 #[cfg_attr(docsrs, doc(cfg(feature = "serializable")))]
347 pub fn begin_serializable(&self) -> Transaction<S> {
348 Transaction::new(Arc::clone(&self.inner), self.inner.read_ts(), true)
349 }
350
351 /// Take a read-only snapshot of the current state of the database.
352 ///
353 /// The returned [`Snapshot`] reads as of this instant and never changes,
354 /// even as other transactions commit. Use it to read several keys at one
355 /// consistent point in time without the overhead of a transaction.
356 ///
357 /// # Examples
358 ///
359 /// ```
360 /// use txn_db::Db;
361 ///
362 /// let db = Db::new();
363 /// let snap = db.snapshot();
364 /// assert_eq!(snap.get(b"k")?, None);
365 /// # Ok::<(), txn_db::TxnError>(())
366 /// ```
367 pub fn snapshot(&self) -> Snapshot<S> {
368 Snapshot::new(Arc::clone(&self.inner), self.inner.read_ts())
369 }
370
371 /// The timestamp of the most recent commit visible to a new transaction.
372 ///
373 /// Returns [`Timestamp::ZERO`] for a database that has never been written.
374 /// This is the read watermark: the timestamp a transaction beginning now
375 /// would read at.
376 ///
377 /// # Examples
378 ///
379 /// ```
380 /// use txn_db::Db;
381 ///
382 /// let db = Db::new();
383 /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
384 ///
385 /// let mut tx = db.begin();
386 /// tx.put(b"k".to_vec(), b"v".to_vec());
387 /// let ts = tx.commit()?;
388 /// assert_eq!(db.last_committed(), ts);
389 /// # Ok::<(), txn_db::TxnError>(())
390 /// ```
391 #[must_use]
392 pub fn last_committed(&self) -> Timestamp {
393 self.inner.read_ts()
394 }
395}
396
397impl<S: VersionStore> Clone for Db<S> {
398 /// Clone the handle, not the data: the clone shares the same underlying
399 /// database.
400 fn clone(&self) -> Self {
401 Db {
402 inner: Arc::clone(&self.inner),
403 }
404 }
405}
406
#[cfg(all(test, not(loom)))]
// Tests are expected to panic on failure, so unwrap/expect are fine here.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    #[test]
    fn test_new_database_is_empty_at_zero() {
        let db = Db::new();
        assert_eq!(db.last_committed(), Timestamp::ZERO);
        assert_eq!(db.begin().get(b"k").unwrap(), None);
    }

    #[test]
    fn test_default_matches_new() {
        let db: Db = Db::default();
        assert_eq!(db.last_committed(), Timestamp::ZERO);
        assert_eq!(db.begin().get(b"k").unwrap(), None);
    }

    #[test]
    fn test_commit_makes_writes_visible_to_later_transactions() {
        let db = Db::new();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v".to_vec());
        let ts = tx.commit().unwrap();
        assert!(ts > Timestamp::ZERO);
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    #[test]
    fn test_snapshot_is_isolated_from_later_commits() {
        let db = Db::new();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v1".to_vec());
        let _ = tx.commit().unwrap();

        let snap = db.snapshot();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v2".to_vec());
        let _ = tx.commit().unwrap();

        assert_eq!(snap.get(b"k").unwrap().as_deref(), Some(&b"v1"[..]));
    }

    #[test]
    fn test_write_write_conflict_aborts_later_committer() {
        let db = Db::new();
        let mut a = db.begin();
        let mut b = db.begin();
        a.put(b"k".to_vec(), b"a".to_vec());
        b.put(b"k".to_vec(), b"b".to_vec());

        assert!(a.commit().is_ok());
        let err = b.commit().expect_err("second committer must lose");
        assert!(err.is_retryable());
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"a"[..]));
    }

    #[test]
    fn test_disjoint_keys_do_not_conflict() {
        let db = Db::new();
        let mut a = db.begin();
        let mut b = db.begin();
        a.put(b"a".to_vec(), b"1".to_vec());
        b.put(b"b".to_vec(), b"2".to_vec());
        assert!(a.commit().is_ok());
        assert!(b.commit().is_ok());
    }

    #[test]
    fn test_read_only_commit_returns_snapshot_timestamp() {
        let db = Db::new();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v".to_vec());
        let ts = tx.commit().unwrap();

        let ro = db.begin();
        assert_eq!(ro.commit().unwrap(), ts);
    }

    #[test]
    fn test_rollback_discards_writes() {
        let db = Db::new();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v".to_vec());
        tx.rollback();
        assert_eq!(db.begin().get(b"k").unwrap(), None);
    }

    #[test]
    fn test_clone_shares_state() {
        let db = Db::new();
        let db2 = db.clone();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v".to_vec());
        let _ = tx.commit().unwrap();
        assert_eq!(db2.begin().get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    #[cfg(feature = "serializable")]
    #[test]
    fn test_serializable_rejects_write_skew() {
        let db = Db::new();
        let mut seed = db.begin();
        seed.put(b"x".to_vec(), vec![1]);
        seed.put(b"y".to_vec(), vec![1]);
        let _ = seed.commit().unwrap();

        // Two serializable transactions from the same snapshot each read both
        // rows and write the one the other read.
        let mut t1 = db.begin_serializable();
        let mut t2 = db.begin_serializable();
        let _ = t1.get(b"x").unwrap();
        let _ = t1.get(b"y").unwrap();
        let _ = t2.get(b"x").unwrap();
        let _ = t2.get(b"y").unwrap();
        t1.put(b"x".to_vec(), vec![0]);
        t2.put(b"y".to_vec(), vec![0]);

        assert!(t1.commit().is_ok());
        // t2 read x, which t1 changed -> serializable validation aborts it.
        let err = t2.commit().expect_err("write skew must be rejected");
        assert!(err.is_retryable());
    }

    // Exercises plain snapshot isolation only (`db.begin()`), so it runs in
    // every build, not just with the `serializable` feature. This pins down
    // the SI baseline that the serializable test above is contrasted with.
    #[test]
    fn test_snapshot_txn_allows_write_skew() {
        let db = Db::new();
        let mut seed = db.begin();
        seed.put(b"x".to_vec(), vec![1]);
        seed.put(b"y".to_vec(), vec![1]);
        let _ = seed.commit().unwrap();

        // The same schedule under plain snapshot isolation: both commit, because
        // SI does not validate the read set.
        let mut t1 = db.begin();
        let mut t2 = db.begin();
        let _ = t1.get(b"x").unwrap();
        let _ = t1.get(b"y").unwrap();
        let _ = t2.get(b"x").unwrap();
        let _ = t2.get(b"y").unwrap();
        t1.put(b"x".to_vec(), vec![0]);
        t2.put(b"y".to_vec(), vec![0]);

        assert!(t1.commit().is_ok());
        assert!(t2.commit().is_ok());
    }
}
548}