txn_db/db.rs
1//! The database handle and the commit coordinator behind it.
2//!
3//! [`Db`] is the Tier-1 entry point: construct one, [`begin`](Db::begin)
4//! transactions against it, [`commit`](crate::Transaction::commit) them. A `Db`
5//! is a cheap, clonable handle to shared state — clone it freely and hand a
6//! clone to every thread that needs to read or write.
7//!
8//! The shared state itself lives in [`Inner`], which owns the version store and
9//! the [`Oracle`](crate::oracle::Oracle) that allocates timestamps and tracks
10//! the read watermark. Commit coordination is split deliberately: the oracle
11//! hands out timestamps lock-free, and the version store is the serialization
12//! point that validates and applies each commit atomically. The single global
13//! commit lock of the foundation release is gone.
14
15use std::sync::Arc;
16
17use crate::error::Result;
18use crate::oracle::Oracle;
19use crate::store::{MemoryStore, VersionStore, WriteEntry};
20use crate::timestamp::Timestamp;
21use crate::txn::{Snapshot, Transaction};
22
/// Shared, reference-counted state for one logical database.
///
/// A [`Db`] is a handle to an `Arc<Inner>`; every clone of the `Db`, every
/// [`Transaction`], and every [`Snapshot`] holds a clone of the same `Inner`,
/// so they all read and commit against one version store and one timestamp
/// sequence.
///
/// Only `store` is exposed to the rest of the crate directly. The oracle (and,
/// with the `durability` feature, the commit log) is private to this module and
/// is reached through the `pub(crate)` methods on `Inner` instead.
pub(crate) struct Inner<S: VersionStore> {
    /// The backing version store. Reads go to it; commits validate and apply
    /// through it.
    pub(crate) store: S,
    /// Allocates timestamps and tracks the consistent-read watermark.
    oracle: Oracle,
    /// The durable commit log, present only for a database opened with
    /// [`Db::open`]. `None` for an in-memory database.
    ///
    /// The field exists only when the crate is built with the `durability`
    /// feature.
    #[cfg(feature = "durability")]
    log: Option<crate::durable::CommitLog>,
}
40
41impl<S: VersionStore> Inner<S> {
42 fn new(store: S) -> Self {
43 Inner {
44 store,
45 oracle: Oracle::new(),
46 #[cfg(feature = "durability")]
47 log: None,
48 }
49 }
50
51 /// The timestamp a transaction beginning now should read at.
52 #[inline]
53 fn read_ts(&self) -> Timestamp {
54 self.oracle.read_ts()
55 }
56
57 /// Register a live reader and return its read timestamp.
58 #[inline]
59 pub(crate) fn begin_reader(&self) -> Timestamp {
60 self.oracle.begin_reader()
61 }
62
63 /// Unregister a reader that began at `read_ts`.
64 #[inline]
65 pub(crate) fn end_reader(&self, read_ts: Timestamp) {
66 self.oracle.end_reader(read_ts);
67 }
68
69 /// Reclaim versions no live reader can observe, returning the count removed.
70 fn collect_garbage(&self) -> usize {
71 self.store.collect_garbage(self.oracle.low_watermark())
72 }
73
74 /// Allocate a commit timestamp, validate-and-apply through the store, then
75 /// release the timestamp into the watermark.
76 ///
77 /// The timestamp is reported to the oracle on both outcomes — a successful
78 /// commit and a rejected one — so a conflict never stalls the read watermark
79 /// behind the timestamp it consumed.
80 pub(crate) fn commit_writes(
81 &self,
82 read_ts: Timestamp,
83 writes: Vec<WriteEntry>,
84 reads: &[Arc<[u8]>],
85 ) -> Result<Timestamp> {
86 let commit_ts = self.oracle.alloc_commit_ts();
87
88 // Encode the durable record before the write set is consumed by the
89 // store. No cost for an in-memory database (no log).
90 #[cfg(feature = "durability")]
91 let record = self
92 .log
93 .as_ref()
94 .map(|_| crate::durable::encode_for_log(commit_ts, &writes));
95
96 let outcome = self.store.try_commit(read_ts, commit_ts, writes, reads);
97
98 // Make the commit durable before it is acknowledged. The validate-and-
99 // apply has already happened in memory but is not yet visible — the
100 // watermark only advances at `commit_done` below — so a crash before the
101 // sync completes leaves a transaction that was never acknowledged and is
102 // recovered as absent.
103 #[cfg(feature = "durability")]
104 if outcome.is_ok() {
105 if let (Some(log), Some(record)) = (self.log.as_ref(), record) {
106 if let Err(err) = log.append_committed(&record) {
107 self.oracle.commit_done(commit_ts);
108 return Err(err);
109 }
110 }
111 }
112
113 self.oracle.commit_done(commit_ts);
114 outcome.map(|()| commit_ts)
115 }
116
117 /// Build the shared inner state for a database recovered from a durable log.
118 #[cfg(feature = "durability")]
119 fn recovered(store: S, oracle: Oracle, log: crate::durable::CommitLog) -> Self {
120 Inner {
121 store,
122 oracle,
123 log: Some(log),
124 }
125 }
126}
127
/// A transactional, multi-version key-value database.
///
/// `Db` is the front door. [`Db::new`] gives you an in-memory database;
/// [`Db::with_store`] builds one over any [`VersionStore`]. From there the whole
/// common case is [`begin`](Db::begin) / [`get`](crate::Transaction::get) /
/// [`put`](crate::Transaction::put) / [`commit`](crate::Transaction::commit),
/// with [`snapshot`](Db::snapshot) for read-only point-in-time views.
///
/// The store type parameter defaults to [`MemoryStore`], so a bare `Db` names
/// the in-memory database.
///
/// Transactions default to **snapshot isolation**. With the `serializable`
/// feature enabled, `begin_serializable` starts a transaction whose read set is
/// validated at commit, rejecting write skew and the other anomalies snapshot
/// isolation permits.
///
/// A `Db` is a clonable handle over shared state, like an [`Arc`]. Cloning it
/// is cheap and every clone refers to the same database, so the idiomatic way
/// to use it across threads is to clone a handle per thread.
///
/// # Examples
///
/// The four-call common case:
///
/// ```
/// use txn_db::Db;
///
/// let db = Db::new();
///
/// let mut tx = db.begin();
/// tx.put(b"greeting".to_vec(), b"hei".to_vec());
/// tx.commit()?;
///
/// let tx = db.begin();
/// assert_eq!(tx.get(b"greeting")?.as_deref(), Some(&b"hei"[..]));
/// # Ok::<(), txn_db::TxnError>(())
/// ```
///
/// Sharing one database across threads:
///
/// ```
/// use std::thread;
/// use txn_db::Db;
///
/// let db = Db::new();
/// let handles: Vec<_> = (0..4u8)
///     .map(|i| {
///         let db = db.clone();
///         thread::spawn(move || {
///             let mut tx = db.begin();
///             tx.put(vec![i], vec![i]);
///             // Independent keys never conflict.
///             tx.commit().expect("commit");
///         })
///     })
///     .collect();
/// for h in handles {
///     h.join().expect("thread");
/// }
/// # Ok::<(), txn_db::TxnError>(())
/// ```
pub struct Db<S: VersionStore = MemoryStore> {
    /// The shared state; every clone of this handle points at the same `Inner`.
    inner: Arc<Inner<S>>,
}
189
190impl Db<MemoryStore> {
191 /// Create an empty in-memory database.
192 ///
193 /// This is the default configuration: a [`MemoryStore`] backing store, ready
194 /// for [`begin`](Db::begin).
195 ///
196 /// # Examples
197 ///
198 /// ```
199 /// use txn_db::Db;
200 ///
201 /// let db = Db::new();
202 /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
203 /// ```
204 #[must_use]
205 pub fn new() -> Self {
206 Db::with_store(MemoryStore::new())
207 }
208
209 /// Open a durable database backed by a write-ahead log at `path`, replaying
210 /// any committed transactions already in the log.
211 ///
212 /// Every transaction committed against the returned database appends its
213 /// record to the log and syncs it before [`commit`](crate::Transaction::commit)
214 /// returns, so an acknowledged commit survives a crash. On open, the log is
215 /// replayed: each committed transaction is reinstated, and a transaction that
216 /// never reached the log — because it aborted, or because the process crashed
217 /// before its record was made durable — is simply absent. The recovered data
218 /// lives in memory; the log is the durable record from which it is rebuilt.
219 ///
220 /// Available with the `durability` feature.
221 ///
222 /// # Errors
223 ///
224 /// Returns [`TxnError::Durability`](crate::TxnError::Durability) if the log
225 /// cannot be opened or a record read back from it does not decode.
226 ///
227 /// # Examples
228 ///
229 /// ```
230 /// # #[cfg(feature = "durability")]
231 /// # {
232 /// # let dir = tempfile::tempdir().expect("tempdir");
233 /// # let path = dir.path().join("txn.wal");
234 /// use txn_db::Db;
235 ///
236 /// // Commit, then drop the database.
237 /// {
238 /// let db = Db::open(&path)?;
239 /// let mut tx = db.begin();
240 /// tx.put(b"k".to_vec(), b"v".to_vec());
241 /// tx.commit()?;
242 /// }
243 ///
244 /// // Reopening replays the log: the committed write is still there.
245 /// let db = Db::open(&path)?;
246 /// assert_eq!(db.begin().get(b"k")?.as_deref(), Some(&b"v"[..]));
247 /// # }
248 /// # Ok::<(), txn_db::TxnError>(())
249 /// ```
250 #[cfg(feature = "durability")]
251 #[cfg_attr(docsrs, doc(cfg(feature = "durability")))]
252 pub fn open(path: impl AsRef<std::path::Path>) -> Result<Db<MemoryStore>> {
253 let (log, mut recovered) = crate::durable::CommitLog::open(path)?;
254
255 // Replay in ascending commit-timestamp order; records may sit in the log
256 // out of that order because commits append after applying, concurrently.
257 recovered.sort_by_key(|commit| commit.commit_ts);
258
259 let store = MemoryStore::new();
260 let mut highest = Timestamp::ZERO;
261 for commit in recovered {
262 highest = highest.max(commit.commit_ts);
263 store.install_recovered(commit.commit_ts, commit.writes);
264 }
265
266 Ok(Db {
267 inner: Arc::new(Inner::recovered(store, Oracle::recovered(highest), log)),
268 })
269 }
270}
271
272impl Default for Db<MemoryStore> {
273 fn default() -> Self {
274 Db::new()
275 }
276}
277
278impl<S: VersionStore> Db<S> {
279 /// Create a database over a custom [`VersionStore`].
280 ///
281 /// This is the Tier-3 seam: supply any backing store and the transaction
282 /// semantics — snapshot isolation, read-your-own-writes, conflict detection
283 /// — compose on top of it unchanged.
284 ///
285 /// # Examples
286 ///
287 /// ```
288 /// use txn_db::{Db, MemoryStore};
289 ///
290 /// let db = Db::with_store(MemoryStore::new());
291 /// let mut tx = db.begin();
292 /// tx.put(b"k".to_vec(), b"v".to_vec());
293 /// tx.commit()?;
294 /// # Ok::<(), txn_db::TxnError>(())
295 /// ```
296 #[must_use]
297 pub fn with_store(store: S) -> Self {
298 Db {
299 inner: Arc::new(Inner::new(store)),
300 }
301 }
302
303 /// Begin a snapshot-isolation transaction over the current state.
304 ///
305 /// The transaction takes its snapshot at this moment: it reads as of the
306 /// most recent commit and is unaffected by commits that happen afterward.
307 /// Its writes are checked for write-write conflicts at commit, but its reads
308 /// are not validated — use `begin_serializable` (with the `serializable`
309 /// feature) when you need serializability.
310 ///
311 /// # Examples
312 ///
313 /// ```
314 /// use txn_db::Db;
315 ///
316 /// let db = Db::new();
317 /// let mut tx = db.begin();
318 /// tx.put(b"k".to_vec(), b"v".to_vec());
319 /// tx.commit()?;
320 /// # Ok::<(), txn_db::TxnError>(())
321 /// ```
322 pub fn begin(&self) -> Transaction<S> {
323 Transaction::new(Arc::clone(&self.inner), false)
324 }
325
326 /// Begin a serializable transaction over the current state.
327 ///
328 /// A serializable transaction tracks every key it reads and, at commit,
329 /// validates that none of them changed since its snapshot — in addition to
330 /// the write-write check every transaction gets. That read-set validation is
331 /// what rejects write skew and the read-only anomaly that plain snapshot
332 /// isolation permits, giving serializable behavior for the transactions that
333 /// commit writes. A serializable transaction that writes nothing commits
334 /// trivially, exactly like a read-only snapshot.
335 ///
336 /// Available with the `serializable` feature. Snapshot isolation remains the
337 /// default and is unaffected.
338 ///
339 /// # Examples
340 ///
341 /// ```
342 /// # #[cfg(feature = "serializable")]
343 /// # {
344 /// use txn_db::Db;
345 ///
346 /// let db = Db::new();
347 /// // Seed two rows that an invariant ties together.
348 /// let mut tx = db.begin();
349 /// tx.put(b"on_call:alice".to_vec(), vec![1]);
350 /// tx.put(b"on_call:bob".to_vec(), vec![1]);
351 /// tx.commit()?;
352 ///
353 /// // A serializable transaction validates the rows it read at commit.
354 /// let mut tx = db.begin_serializable();
355 /// let _alice = tx.get(b"on_call:alice")?;
356 /// let _bob = tx.get(b"on_call:bob")?;
357 /// tx.put(b"on_call:alice".to_vec(), vec![0]);
358 /// tx.commit()?;
359 /// # }
360 /// # Ok::<(), txn_db::TxnError>(())
361 /// ```
362 #[cfg(feature = "serializable")]
363 #[cfg_attr(docsrs, doc(cfg(feature = "serializable")))]
364 pub fn begin_serializable(&self) -> Transaction<S> {
365 Transaction::new(Arc::clone(&self.inner), true)
366 }
367
368 /// Take a read-only snapshot of the current state of the database.
369 ///
370 /// The returned [`Snapshot`] reads as of this instant and never changes,
371 /// even as other transactions commit. Use it to read several keys at one
372 /// consistent point in time without the overhead of a transaction.
373 ///
374 /// # Examples
375 ///
376 /// ```
377 /// use txn_db::Db;
378 ///
379 /// let db = Db::new();
380 /// let snap = db.snapshot();
381 /// assert_eq!(snap.get(b"k")?, None);
382 /// # Ok::<(), txn_db::TxnError>(())
383 /// ```
384 pub fn snapshot(&self) -> Snapshot<S> {
385 Snapshot::new(Arc::clone(&self.inner))
386 }
387
388 /// Read one key without opening a transaction.
389 ///
390 /// A convenience for the common single-read case: it takes a snapshot of the
391 /// current state and reads `key` from it, returning the newest committed
392 /// value or `None` if the key is absent. For reading several keys at one
393 /// consistent instant, take a [`snapshot`](Db::snapshot) and reuse it.
394 ///
395 /// # Errors
396 ///
397 /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing store
398 /// fails the read. The default in-memory store never fails.
399 ///
400 /// # Examples
401 ///
402 /// ```
403 /// use txn_db::Db;
404 ///
405 /// let db = Db::new();
406 /// db.put(b"k".to_vec(), b"v".to_vec())?;
407 /// assert_eq!(db.get(b"k")?.as_deref(), Some(&b"v"[..]));
408 /// assert_eq!(db.get(b"absent")?, None);
409 /// # Ok::<(), txn_db::TxnError>(())
410 /// ```
411 pub fn get(&self, key: &[u8]) -> Result<Option<Arc<[u8]>>> {
412 self.snapshot().get(key)
413 }
414
415 /// Write one key in its own transaction, retrying on conflict, and return the
416 /// commit timestamp.
417 ///
418 /// A convenience for the common single-write case: it begins a transaction,
419 /// buffers the write, and commits. If a concurrent transaction wins the
420 /// commit race it retries against a fresher snapshot, so this is
421 /// last-writer-wins and never surfaces a conflict — the value is always
422 /// installed. When you need to read-then-write atomically, or to control the
423 /// conflict outcome yourself, use [`begin`](Db::begin) instead.
424 ///
425 /// # Errors
426 ///
427 /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing store
428 /// fails to apply the write, or
429 /// [`TxnError::Durability`](crate::TxnError::Durability) for a durable
430 /// database whose commit cannot be made durable. Conflicts are retried, not
431 /// returned.
432 ///
433 /// # Examples
434 ///
435 /// ```
436 /// use txn_db::Db;
437 ///
438 /// let db = Db::new();
439 /// let ts = db.put(b"k".to_vec(), b"v".to_vec())?;
440 /// assert!(ts > txn_db::Timestamp::ZERO);
441 /// # Ok::<(), txn_db::TxnError>(())
442 /// ```
443 pub fn put(&self, key: impl Into<Arc<[u8]>>, value: impl Into<Arc<[u8]>>) -> Result<Timestamp> {
444 let key = key.into();
445 let value = value.into();
446 loop {
447 let mut tx = self.begin();
448 tx.put(Arc::clone(&key), Arc::clone(&value));
449 match tx.commit() {
450 Ok(ts) => return Ok(ts),
451 Err(e) if e.is_retryable() => continue,
452 Err(e) => return Err(e),
453 }
454 }
455 }
456
457 /// Delete one key in its own transaction, retrying on conflict, and return
458 /// the commit timestamp.
459 ///
460 /// The delete counterpart of [`put`](Db::put): last-writer-wins, conflicts
461 /// retried. After it returns the key reads as absent until written again.
462 ///
463 /// # Errors
464 ///
465 /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing store
466 /// fails, or [`TxnError::Durability`](crate::TxnError::Durability) for a
467 /// durable database whose commit cannot be made durable. Conflicts are
468 /// retried, not returned.
469 ///
470 /// # Examples
471 ///
472 /// ```
473 /// use txn_db::Db;
474 ///
475 /// let db = Db::new();
476 /// db.put(b"k".to_vec(), b"v".to_vec())?;
477 /// db.delete(b"k".to_vec())?;
478 /// assert_eq!(db.get(b"k")?, None);
479 /// # Ok::<(), txn_db::TxnError>(())
480 /// ```
481 pub fn delete(&self, key: impl Into<Arc<[u8]>>) -> Result<Timestamp> {
482 let key = key.into();
483 loop {
484 let mut tx = self.begin();
485 tx.delete(Arc::clone(&key));
486 match tx.commit() {
487 Ok(ts) => return Ok(ts),
488 Err(e) if e.is_retryable() => continue,
489 Err(e) => return Err(e),
490 }
491 }
492 }
493
494 /// The timestamp of the most recent commit visible to a new transaction.
495 ///
496 /// Returns [`Timestamp::ZERO`] for a database that has never been written.
497 /// This is the read watermark: the timestamp a transaction beginning now
498 /// would read at.
499 ///
500 /// # Examples
501 ///
502 /// ```
503 /// use txn_db::Db;
504 ///
505 /// let db = Db::new();
506 /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
507 ///
508 /// let mut tx = db.begin();
509 /// tx.put(b"k".to_vec(), b"v".to_vec());
510 /// let ts = tx.commit()?;
511 /// assert_eq!(db.last_committed(), ts);
512 /// # Ok::<(), txn_db::TxnError>(())
513 /// ```
514 #[must_use]
515 pub fn last_committed(&self) -> Timestamp {
516 self.inner.read_ts()
517 }
518
519 /// Reclaim versions that no live transaction or snapshot can observe,
520 /// returning how many were removed.
521 ///
522 /// `txn-db` keeps every version of a key so that an in-flight reader sees a
523 /// stable snapshot. Once no live reader can observe an old version — because
524 /// every active transaction and snapshot reads at a timestamp newer than a
525 /// later version of that key — the old one is unreachable and this reclaims
526 /// it. A key deleted before the oldest live reader's snapshot is dropped
527 /// entirely.
528 ///
529 /// Call it periodically, or after retiring long-running snapshots, to bound
530 /// memory. It is safe to call at any time and from any thread: a version a
531 /// live reader can still reach is never reclaimed. With the default
532 /// in-memory store this prunes the version chains; a custom
533 /// [`VersionStore`](crate::VersionStore) that keeps no history can leave the
534 /// default no-op in place.
535 ///
536 /// # Examples
537 ///
538 /// ```
539 /// use txn_db::Db;
540 ///
541 /// let db = Db::new();
542 /// // Overwrite the same key several times.
543 /// for v in 0..5u8 {
544 /// let mut tx = db.begin();
545 /// tx.put(b"k".to_vec(), vec![v]);
546 /// tx.commit()?;
547 /// }
548 ///
549 /// // No snapshot is held, so only the newest version need be kept.
550 /// let reclaimed = db.collect_garbage();
551 /// assert!(reclaimed > 0);
552 /// assert_eq!(db.begin().get(b"k")?.as_deref(), Some(&[4u8][..]));
553 /// # Ok::<(), txn_db::TxnError>(())
554 /// ```
555 pub fn collect_garbage(&self) -> usize {
556 self.inner.collect_garbage()
557 }
558}
559
560impl<S: VersionStore> Clone for Db<S> {
561 /// Clone the handle, not the data: the clone shares the same underlying
562 /// database.
563 fn clone(&self) -> Self {
564 Db {
565 inner: Arc::clone(&self.inner),
566 }
567 }
568}
569
#[cfg(all(test, not(loom)))]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Commit one `key -> value` write in its own transaction and return the
    /// commit timestamp.
    fn commit_put(db: &Db, key: &[u8], value: &[u8]) -> Timestamp {
        let mut tx = db.begin();
        tx.put(key.to_vec(), value.to_vec());
        tx.commit().unwrap()
    }

    /// Seed the rows `x = [1]` and `y = [1]` in a single transaction.
    #[cfg(feature = "serializable")]
    fn seed_xy(db: &Db) {
        let mut seed = db.begin();
        seed.put(b"x".to_vec(), vec![1]);
        seed.put(b"y".to_vec(), vec![1]);
        let _ = seed.commit().unwrap();
    }

    /// Drive the classic write-skew schedule: both transactions read `x` and
    /// `y`, `t1` writes `x`, `t2` writes `y`, then `t1` commits before `t2`.
    /// Returns both commit outcomes in that order.
    #[cfg(feature = "serializable")]
    fn run_skew_schedule(
        mut t1: Transaction<MemoryStore>,
        mut t2: Transaction<MemoryStore>,
    ) -> (Result<Timestamp>, Result<Timestamp>) {
        let _ = t1.get(b"x").unwrap();
        let _ = t1.get(b"y").unwrap();
        let _ = t2.get(b"x").unwrap();
        let _ = t2.get(b"y").unwrap();
        t1.put(b"x".to_vec(), vec![0]);
        t2.put(b"y".to_vec(), vec![0]);

        let first = t1.commit();
        let second = t2.commit();
        (first, second)
    }

    #[test]
    fn test_new_database_is_empty_at_zero() {
        let db = Db::new();
        assert_eq!(db.last_committed(), Timestamp::ZERO);

        let found = db.begin().get(b"k").unwrap();
        assert_eq!(found, None);
    }

    #[test]
    fn test_commit_makes_writes_visible_to_later_transactions() {
        let db = Db::new();
        let ts = commit_put(&db, b"k", b"v");
        assert!(ts > Timestamp::ZERO);

        let found = db.begin().get(b"k").unwrap();
        assert_eq!(found.as_deref(), Some(&b"v"[..]));
    }

    #[test]
    fn test_snapshot_is_isolated_from_later_commits() {
        let db = Db::new();
        let _ = commit_put(&db, b"k", b"v1");

        // The snapshot is taken between the two commits.
        let snap = db.snapshot();
        let _ = commit_put(&db, b"k", b"v2");

        assert_eq!(snap.get(b"k").unwrap().as_deref(), Some(&b"v1"[..]));
    }

    #[test]
    fn test_write_write_conflict_aborts_later_committer() {
        let db = Db::new();
        let mut first = db.begin();
        let mut second = db.begin();
        first.put(b"k".to_vec(), b"a".to_vec());
        second.put(b"k".to_vec(), b"b".to_vec());

        assert!(first.commit().is_ok());
        let err = second.commit().expect_err("second committer must lose");
        assert!(err.is_retryable());

        let found = db.begin().get(b"k").unwrap();
        assert_eq!(found.as_deref(), Some(&b"a"[..]));
    }

    #[test]
    fn test_disjoint_keys_do_not_conflict() {
        let db = Db::new();
        let mut left = db.begin();
        let mut right = db.begin();
        left.put(b"a".to_vec(), b"1".to_vec());
        right.put(b"b".to_vec(), b"2".to_vec());

        assert!(left.commit().is_ok());
        assert!(right.commit().is_ok());
    }

    #[test]
    fn test_read_only_commit_returns_snapshot_timestamp() {
        let db = Db::new();
        let ts = commit_put(&db, b"k", b"v");

        let ro = db.begin();
        assert_eq!(ro.commit().unwrap(), ts);
    }

    #[test]
    fn test_rollback_discards_writes() {
        let db = Db::new();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v".to_vec());
        tx.rollback();

        let found = db.begin().get(b"k").unwrap();
        assert_eq!(found, None);
    }

    #[test]
    fn test_autocommit_put_get_delete() {
        let db = Db::new();

        let put_ts = db.put(b"k".to_vec(), b"v".to_vec()).unwrap();
        assert!(put_ts > Timestamp::ZERO);
        assert_eq!(db.get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
        assert_eq!(db.get(b"absent").unwrap(), None);

        let delete_ts = db.delete(b"k".to_vec()).unwrap();
        assert!(delete_ts > put_ts);
        assert_eq!(db.get(b"k").unwrap(), None);
    }

    #[test]
    fn test_autocommit_put_is_last_writer_wins_under_contention() {
        use std::thread;

        let db = Db::new();
        let mut workers = Vec::with_capacity(8);
        for t in 0..8u8 {
            let db = db.clone();
            workers.push(thread::spawn(move || {
                for _ in 0..100 {
                    // All threads write the same hot key; autocommit retries
                    // internally, so none of these ever fail.
                    let _ = db.put(b"hot".to_vec(), vec![t]).unwrap();
                }
            }));
        }
        for worker in workers {
            worker.join().unwrap();
        }

        // The key exists and holds one of the written values.
        let v = db.get(b"hot").unwrap().unwrap();
        assert!(matches!(&v[..], [t] if *t < 8));
    }

    #[test]
    fn test_gc_reclaims_when_no_reader_is_held() {
        let db = Db::new();
        for v in 0..5u8 {
            let _ = commit_put(&db, b"k", &[v]);
        }

        let reclaimed = db.collect_garbage();
        assert!(reclaimed > 0);

        let found = db.begin().get(b"k").unwrap();
        assert_eq!(found.as_deref(), Some(&[4u8][..]));
    }

    #[test]
    fn test_held_snapshot_pins_gc() {
        let db = Db::new();
        let _ = commit_put(&db, b"k", &[1]);

        // Hold a snapshot of this state, then overwrite the key.
        let snap = db.snapshot();
        let _ = commit_put(&db, b"k", &[2]);

        // GC must not reclaim the version the held snapshot still observes.
        let _ = db.collect_garbage();
        assert_eq!(snap.get(b"k").unwrap().as_deref(), Some(&[1u8][..]));

        // Once the snapshot is dropped, the old version becomes reclaimable.
        drop(snap);
        let reclaimed = db.collect_garbage();
        assert!(reclaimed > 0);

        let found = db.begin().get(b"k").unwrap();
        assert_eq!(found.as_deref(), Some(&[2u8][..]));
    }

    #[test]
    fn test_clone_shares_state() {
        let db = Db::new();
        let other = db.clone();
        let _ = commit_put(&db, b"k", b"v");

        let found = other.begin().get(b"k").unwrap();
        assert_eq!(found.as_deref(), Some(&b"v"[..]));
    }

    #[cfg(feature = "serializable")]
    #[test]
    fn test_serializable_rejects_write_skew() {
        let db = Db::new();
        seed_xy(&db);

        // Two serializable transactions from the same snapshot each read both
        // rows and write the one the other read.
        let t1 = db.begin_serializable();
        let t2 = db.begin_serializable();
        let (first, second) = run_skew_schedule(t1, t2);

        assert!(first.is_ok());
        // t2 read x, which t1 changed -> serializable validation aborts it.
        let err = second.expect_err("write skew must be rejected");
        assert!(err.is_retryable());
    }

    #[cfg(feature = "serializable")]
    #[test]
    fn test_snapshot_txn_allows_write_skew() {
        let db = Db::new();
        seed_xy(&db);

        // The same schedule under plain snapshot isolation: both commit, because
        // SI does not validate the read set.
        let t1 = db.begin();
        let t2 = db.begin();
        let (first, second) = run_skew_schedule(t1, t2);

        assert!(first.is_ok());
        assert!(second.is_ok());
    }
}
785}