// txn_db/db.rs

//! The database handle and the commit coordinator behind it.
//!
//! [`Db`] is the Tier-1 entry point: construct one, [`begin`](Db::begin)
//! transactions against it, [`commit`](crate::Transaction::commit) them. A `Db`
//! is a cheap, clonable handle to shared state — clone it freely and hand a
//! clone to every thread that needs to read or write.
//!
//! The shared state itself lives in [`Inner`], which owns the version store and
//! the [`Oracle`](crate::oracle::Oracle) that allocates timestamps and tracks
//! the read watermark. Commit coordination is split deliberately: the oracle
//! hands out timestamps lock-free, and the version store is the serialization
//! point that validates and applies each commit atomically. The single global
//! commit lock of the foundation release is gone.
14
15use std::sync::Arc;
16
17use crate::error::Result;
18use crate::oracle::Oracle;
19use crate::store::{MemoryStore, VersionStore, WriteEntry};
20use crate::timestamp::Timestamp;
21use crate::txn::{Snapshot, Transaction};
22
23/// Shared, reference-counted state for one logical database.
24///
25/// A [`Db`] is a handle to an `Arc<Inner>`; every clone of the `Db`, every
26/// [`Transaction`], and every [`Snapshot`] holds a clone of the same `Inner`,
27/// so they all read and commit against one version store and one timestamp
28/// sequence.
29pub(crate) struct Inner<S: VersionStore> {
30    /// The backing version store. Reads go to it; commits validate and apply
31    /// through it.
32    pub(crate) store: S,
33    /// Allocates timestamps and tracks the consistent-read watermark.
34    oracle: Oracle,
35    /// The durable commit log, present only for a database opened with
36    /// [`Db::open`]. `None` for an in-memory database.
37    #[cfg(feature = "durability")]
38    log: Option<crate::durable::CommitLog>,
39}
40
41impl<S: VersionStore> Inner<S> {
42    fn new(store: S) -> Self {
43        Inner {
44            store,
45            oracle: Oracle::new(),
46            #[cfg(feature = "durability")]
47            log: None,
48        }
49    }
50
51    /// The timestamp a transaction beginning now should read at.
52    #[inline]
53    fn read_ts(&self) -> Timestamp {
54        self.oracle.read_ts()
55    }
56
57    /// Register a live reader and return its read timestamp.
58    #[inline]
59    pub(crate) fn begin_reader(&self) -> Timestamp {
60        self.oracle.begin_reader()
61    }
62
63    /// Unregister a reader that began at `read_ts`.
64    #[inline]
65    pub(crate) fn end_reader(&self, read_ts: Timestamp) {
66        self.oracle.end_reader(read_ts);
67    }
68
69    /// Reclaim versions no live reader can observe, returning the count removed.
70    fn collect_garbage(&self) -> usize {
71        self.store.collect_garbage(self.oracle.low_watermark())
72    }
73
74    /// Allocate a commit timestamp, validate-and-apply through the store, then
75    /// release the timestamp into the watermark.
76    ///
77    /// The timestamp is reported to the oracle on both outcomes — a successful
78    /// commit and a rejected one — so a conflict never stalls the read watermark
79    /// behind the timestamp it consumed.
80    pub(crate) fn commit_writes(
81        &self,
82        read_ts: Timestamp,
83        writes: Vec<WriteEntry>,
84        reads: &[Arc<[u8]>],
85    ) -> Result<Timestamp> {
86        let commit_ts = self.oracle.alloc_commit_ts();
87
88        // Encode the durable record before the write set is consumed by the
89        // store. No cost for an in-memory database (no log).
90        #[cfg(feature = "durability")]
91        let record = self
92            .log
93            .as_ref()
94            .map(|_| crate::durable::encode_for_log(commit_ts, &writes));
95
96        let outcome = self.store.try_commit(read_ts, commit_ts, writes, reads);
97
98        // Make the commit durable before it is acknowledged. The validate-and-
99        // apply has already happened in memory but is not yet visible — the
100        // watermark only advances at `commit_done` below — so a crash before the
101        // sync completes leaves a transaction that was never acknowledged and is
102        // recovered as absent.
103        #[cfg(feature = "durability")]
104        if outcome.is_ok() {
105            if let (Some(log), Some(record)) = (self.log.as_ref(), record) {
106                if let Err(err) = log.append_committed(&record) {
107                    self.oracle.commit_done(commit_ts);
108                    return Err(err);
109                }
110            }
111        }
112
113        self.oracle.commit_done(commit_ts);
114        outcome.map(|()| commit_ts)
115    }
116
117    /// Build the shared inner state for a database recovered from a durable log.
118    #[cfg(feature = "durability")]
119    fn recovered(store: S, oracle: Oracle, log: crate::durable::CommitLog) -> Self {
120        Inner {
121            store,
122            oracle,
123            log: Some(log),
124        }
125    }
126}
127
128/// A transactional, multi-version key-value database.
129///
130/// `Db` is the front door. [`Db::new`] gives you an in-memory database;
131/// [`Db::with_store`] builds one over any [`VersionStore`]. From there the whole
132/// common case is [`begin`](Db::begin) / [`get`](crate::Transaction::get) /
133/// [`put`](crate::Transaction::put) / [`commit`](crate::Transaction::commit),
134/// with [`snapshot`](Db::snapshot) for read-only point-in-time views.
135///
136/// Transactions default to **snapshot isolation**. With the `serializable`
137/// feature enabled, `begin_serializable` starts a transaction whose read set is
138/// validated at commit, rejecting write skew and the other anomalies snapshot
139/// isolation permits.
140///
141/// A `Db` is a clonable handle over shared state, like an [`Arc`]. Cloning it
142/// is cheap and every clone refers to the same database, so the idiomatic way
143/// to use it across threads is to clone a handle per thread.
144///
145/// # Examples
146///
147/// The four-call common case:
148///
149/// ```
150/// use txn_db::Db;
151///
152/// let db = Db::new();
153///
154/// let mut tx = db.begin();
155/// tx.put(b"greeting".to_vec(), b"hei".to_vec());
156/// tx.commit()?;
157///
158/// let tx = db.begin();
159/// assert_eq!(tx.get(b"greeting")?.as_deref(), Some(&b"hei"[..]));
160/// # Ok::<(), txn_db::TxnError>(())
161/// ```
162///
163/// Sharing one database across threads:
164///
165/// ```
166/// use std::thread;
167/// use txn_db::Db;
168///
169/// let db = Db::new();
170/// let handles: Vec<_> = (0..4u8)
171///     .map(|i| {
172///         let db = db.clone();
173///         thread::spawn(move || {
174///             let mut tx = db.begin();
175///             tx.put(vec![i], vec![i]);
176///             // Independent keys never conflict.
177///             tx.commit().expect("commit");
178///         })
179///     })
180///     .collect();
181/// for h in handles {
182///     h.join().expect("thread");
183/// }
184/// # Ok::<(), txn_db::TxnError>(())
185/// ```
186pub struct Db<S: VersionStore = MemoryStore> {
187    inner: Arc<Inner<S>>,
188}
189
190impl Db<MemoryStore> {
191    /// Create an empty in-memory database.
192    ///
193    /// This is the default configuration: a [`MemoryStore`] backing store, ready
194    /// for [`begin`](Db::begin).
195    ///
196    /// # Examples
197    ///
198    /// ```
199    /// use txn_db::Db;
200    ///
201    /// let db = Db::new();
202    /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
203    /// ```
204    #[must_use]
205    pub fn new() -> Self {
206        Db::with_store(MemoryStore::new())
207    }
208
209    /// Open a durable database backed by a write-ahead log at `path`, replaying
210    /// any committed transactions already in the log.
211    ///
212    /// Every transaction committed against the returned database appends its
213    /// record to the log and syncs it before [`commit`](crate::Transaction::commit)
214    /// returns, so an acknowledged commit survives a crash. On open, the log is
215    /// replayed: each committed transaction is reinstated, and a transaction that
216    /// never reached the log — because it aborted, or because the process crashed
217    /// before its record was made durable — is simply absent. The recovered data
218    /// lives in memory; the log is the durable record from which it is rebuilt.
219    ///
220    /// Available with the `durability` feature.
221    ///
222    /// # Errors
223    ///
224    /// Returns [`TxnError::Durability`](crate::TxnError::Durability) if the log
225    /// cannot be opened or a record read back from it does not decode.
226    ///
227    /// # Examples
228    ///
229    /// ```
230    /// # #[cfg(feature = "durability")]
231    /// # {
232    /// # let dir = tempfile::tempdir().expect("tempdir");
233    /// # let path = dir.path().join("txn.wal");
234    /// use txn_db::Db;
235    ///
236    /// // Commit, then drop the database.
237    /// {
238    ///     let db = Db::open(&path)?;
239    ///     let mut tx = db.begin();
240    ///     tx.put(b"k".to_vec(), b"v".to_vec());
241    ///     tx.commit()?;
242    /// }
243    ///
244    /// // Reopening replays the log: the committed write is still there.
245    /// let db = Db::open(&path)?;
246    /// assert_eq!(db.begin().get(b"k")?.as_deref(), Some(&b"v"[..]));
247    /// # }
248    /// # Ok::<(), txn_db::TxnError>(())
249    /// ```
250    #[cfg(feature = "durability")]
251    #[cfg_attr(docsrs, doc(cfg(feature = "durability")))]
252    pub fn open(path: impl AsRef<std::path::Path>) -> Result<Db<MemoryStore>> {
253        let (log, mut recovered) = crate::durable::CommitLog::open(path)?;
254
255        // Replay in ascending commit-timestamp order; records may sit in the log
256        // out of that order because commits append after applying, concurrently.
257        recovered.sort_by_key(|commit| commit.commit_ts);
258
259        let store = MemoryStore::new();
260        let mut highest = Timestamp::ZERO;
261        for commit in recovered {
262            highest = highest.max(commit.commit_ts);
263            store.install_recovered(commit.commit_ts, commit.writes);
264        }
265
266        Ok(Db {
267            inner: Arc::new(Inner::recovered(store, Oracle::recovered(highest), log)),
268        })
269    }
270}
271
272impl Default for Db<MemoryStore> {
273    fn default() -> Self {
274        Db::new()
275    }
276}
277
278impl<S: VersionStore> Db<S> {
279    /// Create a database over a custom [`VersionStore`].
280    ///
281    /// This is the Tier-3 seam: supply any backing store and the transaction
282    /// semantics — snapshot isolation, read-your-own-writes, conflict detection
283    /// — compose on top of it unchanged.
284    ///
285    /// # Examples
286    ///
287    /// ```
288    /// use txn_db::{Db, MemoryStore};
289    ///
290    /// let db = Db::with_store(MemoryStore::new());
291    /// let mut tx = db.begin();
292    /// tx.put(b"k".to_vec(), b"v".to_vec());
293    /// tx.commit()?;
294    /// # Ok::<(), txn_db::TxnError>(())
295    /// ```
296    #[must_use]
297    pub fn with_store(store: S) -> Self {
298        Db {
299            inner: Arc::new(Inner::new(store)),
300        }
301    }
302
303    /// Begin a snapshot-isolation transaction over the current state.
304    ///
305    /// The transaction takes its snapshot at this moment: it reads as of the
306    /// most recent commit and is unaffected by commits that happen afterward.
307    /// Its writes are checked for write-write conflicts at commit, but its reads
308    /// are not validated — use `begin_serializable` (with the `serializable`
309    /// feature) when you need serializability.
310    ///
311    /// # Examples
312    ///
313    /// ```
314    /// use txn_db::Db;
315    ///
316    /// let db = Db::new();
317    /// let mut tx = db.begin();
318    /// tx.put(b"k".to_vec(), b"v".to_vec());
319    /// tx.commit()?;
320    /// # Ok::<(), txn_db::TxnError>(())
321    /// ```
322    pub fn begin(&self) -> Transaction<S> {
323        Transaction::new(Arc::clone(&self.inner), false)
324    }
325
326    /// Begin a serializable transaction over the current state.
327    ///
328    /// A serializable transaction tracks every key it reads and, at commit,
329    /// validates that none of them changed since its snapshot — in addition to
330    /// the write-write check every transaction gets. That read-set validation is
331    /// what rejects write skew and the read-only anomaly that plain snapshot
332    /// isolation permits, giving serializable behavior for the transactions that
333    /// commit writes. A serializable transaction that writes nothing commits
334    /// trivially, exactly like a read-only snapshot.
335    ///
336    /// Available with the `serializable` feature. Snapshot isolation remains the
337    /// default and is unaffected.
338    ///
339    /// # Examples
340    ///
341    /// ```
342    /// # #[cfg(feature = "serializable")]
343    /// # {
344    /// use txn_db::Db;
345    ///
346    /// let db = Db::new();
347    /// // Seed two rows that an invariant ties together.
348    /// let mut tx = db.begin();
349    /// tx.put(b"on_call:alice".to_vec(), vec![1]);
350    /// tx.put(b"on_call:bob".to_vec(), vec![1]);
351    /// tx.commit()?;
352    ///
353    /// // A serializable transaction validates the rows it read at commit.
354    /// let mut tx = db.begin_serializable();
355    /// let _alice = tx.get(b"on_call:alice")?;
356    /// let _bob = tx.get(b"on_call:bob")?;
357    /// tx.put(b"on_call:alice".to_vec(), vec![0]);
358    /// tx.commit()?;
359    /// # }
360    /// # Ok::<(), txn_db::TxnError>(())
361    /// ```
362    #[cfg(feature = "serializable")]
363    #[cfg_attr(docsrs, doc(cfg(feature = "serializable")))]
364    pub fn begin_serializable(&self) -> Transaction<S> {
365        Transaction::new(Arc::clone(&self.inner), true)
366    }
367
368    /// Take a read-only snapshot of the current state of the database.
369    ///
370    /// The returned [`Snapshot`] reads as of this instant and never changes,
371    /// even as other transactions commit. Use it to read several keys at one
372    /// consistent point in time without the overhead of a transaction.
373    ///
374    /// # Examples
375    ///
376    /// ```
377    /// use txn_db::Db;
378    ///
379    /// let db = Db::new();
380    /// let snap = db.snapshot();
381    /// assert_eq!(snap.get(b"k")?, None);
382    /// # Ok::<(), txn_db::TxnError>(())
383    /// ```
384    pub fn snapshot(&self) -> Snapshot<S> {
385        Snapshot::new(Arc::clone(&self.inner))
386    }
387
388    /// Read one key without opening a transaction.
389    ///
390    /// A convenience for the common single-read case: it takes a snapshot of the
391    /// current state and reads `key` from it, returning the newest committed
392    /// value or `None` if the key is absent. For reading several keys at one
393    /// consistent instant, take a [`snapshot`](Db::snapshot) and reuse it.
394    ///
395    /// # Errors
396    ///
397    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing store
398    /// fails the read. The default in-memory store never fails.
399    ///
400    /// # Examples
401    ///
402    /// ```
403    /// use txn_db::Db;
404    ///
405    /// let db = Db::new();
406    /// db.put(b"k".to_vec(), b"v".to_vec())?;
407    /// assert_eq!(db.get(b"k")?.as_deref(), Some(&b"v"[..]));
408    /// assert_eq!(db.get(b"absent")?, None);
409    /// # Ok::<(), txn_db::TxnError>(())
410    /// ```
411    pub fn get(&self, key: &[u8]) -> Result<Option<Arc<[u8]>>> {
412        self.snapshot().get(key)
413    }
414
415    /// Write one key in its own transaction, retrying on conflict, and return the
416    /// commit timestamp.
417    ///
418    /// A convenience for the common single-write case: it begins a transaction,
419    /// buffers the write, and commits. If a concurrent transaction wins the
420    /// commit race it retries against a fresher snapshot, so this is
421    /// last-writer-wins and never surfaces a conflict — the value is always
422    /// installed. When you need to read-then-write atomically, or to control the
423    /// conflict outcome yourself, use [`begin`](Db::begin) instead.
424    ///
425    /// # Errors
426    ///
427    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing store
428    /// fails to apply the write, or
429    /// [`TxnError::Durability`](crate::TxnError::Durability) for a durable
430    /// database whose commit cannot be made durable. Conflicts are retried, not
431    /// returned.
432    ///
433    /// # Examples
434    ///
435    /// ```
436    /// use txn_db::Db;
437    ///
438    /// let db = Db::new();
439    /// let ts = db.put(b"k".to_vec(), b"v".to_vec())?;
440    /// assert!(ts > txn_db::Timestamp::ZERO);
441    /// # Ok::<(), txn_db::TxnError>(())
442    /// ```
443    pub fn put(&self, key: impl Into<Arc<[u8]>>, value: impl Into<Arc<[u8]>>) -> Result<Timestamp> {
444        let key = key.into();
445        let value = value.into();
446        loop {
447            let mut tx = self.begin();
448            tx.put(Arc::clone(&key), Arc::clone(&value));
449            match tx.commit() {
450                Ok(ts) => return Ok(ts),
451                Err(e) if e.is_retryable() => continue,
452                Err(e) => return Err(e),
453            }
454        }
455    }
456
457    /// Delete one key in its own transaction, retrying on conflict, and return
458    /// the commit timestamp.
459    ///
460    /// The delete counterpart of [`put`](Db::put): last-writer-wins, conflicts
461    /// retried. After it returns the key reads as absent until written again.
462    ///
463    /// # Errors
464    ///
465    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backing store
466    /// fails, or [`TxnError::Durability`](crate::TxnError::Durability) for a
467    /// durable database whose commit cannot be made durable. Conflicts are
468    /// retried, not returned.
469    ///
470    /// # Examples
471    ///
472    /// ```
473    /// use txn_db::Db;
474    ///
475    /// let db = Db::new();
476    /// db.put(b"k".to_vec(), b"v".to_vec())?;
477    /// db.delete(b"k".to_vec())?;
478    /// assert_eq!(db.get(b"k")?, None);
479    /// # Ok::<(), txn_db::TxnError>(())
480    /// ```
481    pub fn delete(&self, key: impl Into<Arc<[u8]>>) -> Result<Timestamp> {
482        let key = key.into();
483        loop {
484            let mut tx = self.begin();
485            tx.delete(Arc::clone(&key));
486            match tx.commit() {
487                Ok(ts) => return Ok(ts),
488                Err(e) if e.is_retryable() => continue,
489                Err(e) => return Err(e),
490            }
491        }
492    }
493
494    /// The timestamp of the most recent commit visible to a new transaction.
495    ///
496    /// Returns [`Timestamp::ZERO`] for a database that has never been written.
497    /// This is the read watermark: the timestamp a transaction beginning now
498    /// would read at.
499    ///
500    /// # Examples
501    ///
502    /// ```
503    /// use txn_db::Db;
504    ///
505    /// let db = Db::new();
506    /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
507    ///
508    /// let mut tx = db.begin();
509    /// tx.put(b"k".to_vec(), b"v".to_vec());
510    /// let ts = tx.commit()?;
511    /// assert_eq!(db.last_committed(), ts);
512    /// # Ok::<(), txn_db::TxnError>(())
513    /// ```
514    #[must_use]
515    pub fn last_committed(&self) -> Timestamp {
516        self.inner.read_ts()
517    }
518
519    /// Reclaim versions that no live transaction or snapshot can observe,
520    /// returning how many were removed.
521    ///
522    /// `txn-db` keeps every version of a key so that an in-flight reader sees a
523    /// stable snapshot. Once no live reader can observe an old version — because
524    /// every active transaction and snapshot reads at a timestamp newer than a
525    /// later version of that key — the old one is unreachable and this reclaims
526    /// it. A key deleted before the oldest live reader's snapshot is dropped
527    /// entirely.
528    ///
529    /// Call it periodically, or after retiring long-running snapshots, to bound
530    /// memory. It is safe to call at any time and from any thread: a version a
531    /// live reader can still reach is never reclaimed. With the default
532    /// in-memory store this prunes the version chains; a custom
533    /// [`VersionStore`](crate::VersionStore) that keeps no history can leave the
534    /// default no-op in place.
535    ///
536    /// # Examples
537    ///
538    /// ```
539    /// use txn_db::Db;
540    ///
541    /// let db = Db::new();
542    /// // Overwrite the same key several times.
543    /// for v in 0..5u8 {
544    ///     let mut tx = db.begin();
545    ///     tx.put(b"k".to_vec(), vec![v]);
546    ///     tx.commit()?;
547    /// }
548    ///
549    /// // No snapshot is held, so only the newest version need be kept.
550    /// let reclaimed = db.collect_garbage();
551    /// assert!(reclaimed > 0);
552    /// assert_eq!(db.begin().get(b"k")?.as_deref(), Some(&[4u8][..]));
553    /// # Ok::<(), txn_db::TxnError>(())
554    /// ```
555    pub fn collect_garbage(&self) -> usize {
556        self.inner.collect_garbage()
557    }
558}
559
560impl<S: VersionStore> Clone for Db<S> {
561    /// Clone the handle, not the data: the clone shares the same underlying
562    /// database.
563    fn clone(&self) -> Self {
564        Db {
565            inner: Arc::clone(&self.inner),
566        }
567    }
568}
569
#[cfg(all(test, not(loom)))]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Commit `key -> value` in a transaction of its own and return the commit
    /// timestamp, panicking if the commit is rejected.
    fn commit_one(db: &Db, key: &[u8], value: &[u8]) -> Timestamp {
        let mut tx = db.begin();
        tx.put(key.to_vec(), value.to_vec());
        tx.commit().unwrap()
    }

    /// Seed `x` and `y` with `[1]` in one committed transaction.
    #[cfg(feature = "serializable")]
    fn seed_x_and_y(db: &Db) {
        let mut seed = db.begin();
        seed.put(b"x".to_vec(), vec![1]);
        seed.put(b"y".to_vec(), vec![1]);
        let _ = seed.commit().unwrap();
    }

    #[test]
    fn test_new_database_is_empty_at_zero() {
        let db = Db::new();
        assert_eq!(db.last_committed(), Timestamp::ZERO);
        assert_eq!(db.begin().get(b"k").unwrap(), None);
    }

    #[test]
    fn test_commit_makes_writes_visible_to_later_transactions() {
        let db = Db::new();
        let ts = commit_one(&db, b"k", b"v");
        assert!(ts > Timestamp::ZERO);
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    #[test]
    fn test_snapshot_is_isolated_from_later_commits() {
        let db = Db::new();
        let _ = commit_one(&db, b"k", b"v1");

        // The snapshot is taken between the two commits.
        let snap = db.snapshot();
        let _ = commit_one(&db, b"k", b"v2");

        assert_eq!(snap.get(b"k").unwrap().as_deref(), Some(&b"v1"[..]));
    }

    #[test]
    fn test_write_write_conflict_aborts_later_committer() {
        let db = Db::new();
        let mut first = db.begin();
        let mut second = db.begin();
        first.put(b"k".to_vec(), b"a".to_vec());
        second.put(b"k".to_vec(), b"b".to_vec());

        assert!(first.commit().is_ok());
        let err = second.commit().expect_err("second committer must lose");
        assert!(err.is_retryable());
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&b"a"[..]));
    }

    #[test]
    fn test_disjoint_keys_do_not_conflict() {
        let db = Db::new();
        let mut left = db.begin();
        let mut right = db.begin();
        left.put(b"a".to_vec(), b"1".to_vec());
        right.put(b"b".to_vec(), b"2".to_vec());
        assert!(left.commit().is_ok());
        assert!(right.commit().is_ok());
    }

    #[test]
    fn test_read_only_commit_returns_snapshot_timestamp() {
        let db = Db::new();
        let ts = commit_one(&db, b"k", b"v");

        let read_only = db.begin();
        assert_eq!(read_only.commit().unwrap(), ts);
    }

    #[test]
    fn test_rollback_discards_writes() {
        let db = Db::new();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v".to_vec());
        tx.rollback();
        assert_eq!(db.begin().get(b"k").unwrap(), None);
    }

    #[test]
    fn test_autocommit_put_get_delete() {
        let db = Db::new();

        let put_ts = db.put(b"k".to_vec(), b"v".to_vec()).unwrap();
        assert!(put_ts > Timestamp::ZERO);
        assert_eq!(db.get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
        assert_eq!(db.get(b"absent").unwrap(), None);

        let delete_ts = db.delete(b"k".to_vec()).unwrap();
        assert!(delete_ts > put_ts);
        assert_eq!(db.get(b"k").unwrap(), None);
    }

    #[test]
    fn test_autocommit_put_is_last_writer_wins_under_contention() {
        use std::thread;

        let db = Db::new();
        let mut workers = Vec::new();
        for id in 0..8u8 {
            let db = db.clone();
            workers.push(thread::spawn(move || {
                for _ in 0..100 {
                    // Every thread hammers the same hot key. Autocommit retries
                    // internally, so none of these writes may fail.
                    let _ = db.put(b"hot".to_vec(), vec![id]).unwrap();
                }
            }));
        }
        for worker in workers {
            worker.join().unwrap();
        }

        // The key exists and holds one of the written values.
        let stored = db.get(b"hot").unwrap().unwrap();
        assert!(stored.len() == 1 && stored[0] < 8);
    }

    #[test]
    fn test_gc_reclaims_when_no_reader_is_held() {
        let db = Db::new();
        for v in 0..5u8 {
            let _ = commit_one(&db, b"k", &[v]);
        }
        let reclaimed = db.collect_garbage();
        assert!(reclaimed > 0);
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&[4u8][..]));
    }

    #[test]
    fn test_held_snapshot_pins_gc() {
        let db = Db::new();
        let _ = commit_one(&db, b"k", &[1]);

        // Hold a snapshot of this state, then overwrite the key.
        let snap = db.snapshot();
        let _ = commit_one(&db, b"k", &[2]);

        // GC must not reclaim the version the held snapshot still observes.
        let _ = db.collect_garbage();
        assert_eq!(snap.get(b"k").unwrap().as_deref(), Some(&[1u8][..]));

        // Once the snapshot is dropped, the old version becomes reclaimable.
        drop(snap);
        let reclaimed = db.collect_garbage();
        assert!(reclaimed > 0);
        assert_eq!(db.begin().get(b"k").unwrap().as_deref(), Some(&[2u8][..]));
    }

    #[test]
    fn test_clone_shares_state() {
        let db = Db::new();
        let twin = db.clone();
        let _ = commit_one(&db, b"k", b"v");
        assert_eq!(twin.begin().get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    #[cfg(feature = "serializable")]
    #[test]
    fn test_serializable_rejects_write_skew() {
        let db = Db::new();
        seed_x_and_y(&db);

        // Two serializable transactions from the same snapshot each read both
        // rows and write the one the other read.
        let mut t1 = db.begin_serializable();
        let mut t2 = db.begin_serializable();
        for txn in [&t1, &t2] {
            for key in [&b"x"[..], &b"y"[..]] {
                let _ = txn.get(key).unwrap();
            }
        }
        t1.put(b"x".to_vec(), vec![0]);
        t2.put(b"y".to_vec(), vec![0]);

        assert!(t1.commit().is_ok());
        // t2 read x, which t1 changed -> serializable validation aborts it.
        let err = t2.commit().expect_err("write skew must be rejected");
        assert!(err.is_retryable());
    }

    #[cfg(feature = "serializable")]
    #[test]
    fn test_snapshot_txn_allows_write_skew() {
        let db = Db::new();
        seed_x_and_y(&db);

        // The same schedule under plain snapshot isolation: both commit, because
        // SI does not validate the read set.
        let mut t1 = db.begin();
        let mut t2 = db.begin();
        for txn in [&t1, &t2] {
            for key in [&b"x"[..], &b"y"[..]] {
                let _ = txn.get(key).unwrap();
            }
        }
        t1.put(b"x".to_vec(), vec![0]);
        t2.put(b"y".to_vec(), vec![0]);

        assert!(t1.commit().is_ok());
        assert!(t2.commit().is_ok());
    }
}