txn_db/db.rs
1//! The database handle and the commit coordinator behind it.
2//!
3//! [`Db`] is the Tier-1 entry point: construct one, [`begin`](Db::begin)
4//! transactions against it, [`commit`](crate::Transaction::commit) them. A `Db`
5//! is a cheap, clonable handle to shared state — clone it freely and hand a
6//! clone to every thread that needs to read or write.
7//!
8//! The shared state itself lives in [`Inner`], which owns the version store and
9//! the small amount of coordination that snapshot isolation needs: a monotonic
10//! timestamp counter and a commit serialization point. Keeping that logic in
11//! one place is deliberate — commit ordering and conflict detection are the
12//! crate's correctness core, and they are easier to reason about when they are
13//! not scattered across the read and write handles.
14
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex, PoisonError};
17
18use crate::error::{Result, TxnError};
19use crate::store::{MemoryStore, VersionStore, WriteEntry};
20use crate::timestamp::Timestamp;
21use crate::txn::{Snapshot, Transaction};
22
23/// Shared, reference-counted state for one logical database.
24///
25/// A [`Db`] is a handle to an `Arc<Inner>`; every clone of the `Db`, every
26/// [`Transaction`], and every [`Snapshot`] holds a clone of the same `Inner`,
27/// so they all read and commit against one version store and one timestamp
28/// sequence.
29pub(crate) struct Inner<S: VersionStore> {
30 /// The backing version store. Reads go straight to it; commits apply to it.
31 pub(crate) store: S,
32 /// The next commit timestamp to hand out. Only ever advances.
33 next_ts: AtomicU64,
34 /// The highest timestamp whose writes are fully applied and visible. A new
35 /// transaction reads at this timestamp.
36 last_committed: AtomicU64,
37 /// Serializes the validate-then-apply commit critical section so two
38 /// commits cannot both pass conflict detection and then overwrite each
39 /// other. This single lock is the snapshot-isolation baseline; a sharded,
40 /// lock-free commit path is a later roadmap phase.
41 commit_lock: Mutex<()>,
42}
43
44impl<S: VersionStore> Inner<S> {
45 fn new(store: S) -> Self {
46 Inner {
47 store,
48 next_ts: AtomicU64::new(1),
49 last_committed: AtomicU64::new(Timestamp::ZERO.get()),
50 commit_lock: Mutex::new(()),
51 }
52 }
53
54 /// The timestamp a transaction beginning now should read at.
55 #[inline]
56 fn read_ts(&self) -> Timestamp {
57 Timestamp::from_raw(self.last_committed.load(Ordering::Acquire))
58 }
59
60 /// Validate and apply a transaction's writes under the commit lock.
61 ///
62 /// Holding `commit_lock` makes the sequence — check every written key for a
63 /// conflicting newer commit, allocate a commit timestamp, apply the batch,
64 /// publish the new high-water mark — atomic with respect to other commits.
65 pub(crate) fn commit_writes(
66 &self,
67 read_ts: Timestamp,
68 writes: std::collections::HashMap<Arc<[u8]>, Option<Arc<[u8]>>>,
69 ) -> Result<Timestamp> {
70 let _guard = self
71 .commit_lock
72 .lock()
73 .unwrap_or_else(PoisonError::into_inner);
74
75 // First-committer-wins: if any written key already has a version newer
76 // than this transaction's snapshot, another transaction beat it to the
77 // commit and this one must abort without applying anything.
78 for key in writes.keys() {
79 if let Some(latest) = self.store.latest_commit_ts(key)? {
80 if latest > read_ts {
81 return Err(TxnError::conflict(key.len()));
82 }
83 }
84 }
85
86 let commit_ts = Timestamp::from_raw(self.next_ts.fetch_add(1, Ordering::Relaxed));
87 let batch: Vec<WriteEntry> = writes.into_iter().collect();
88 self.store.apply(commit_ts, batch)?;
89
90 // Publish only after the writes are applied, so any transaction that
91 // observes this timestamp also observes the data it stamps.
92 self.last_committed
93 .store(commit_ts.get(), Ordering::Release);
94 Ok(commit_ts)
95 }
96}
97
98/// A transactional, multi-version key-value database.
99///
100/// `Db` is the front door. [`Db::new`] gives you an in-memory database;
101/// [`Db::with_store`] builds one over any [`VersionStore`]. From there the whole
102/// common case is [`begin`](Db::begin) / [`get`](crate::Transaction::get) /
103/// [`put`](crate::Transaction::put) / [`commit`](crate::Transaction::commit),
104/// with [`snapshot`](Db::snapshot) for read-only point-in-time views.
105///
106/// A `Db` is a clonable handle over shared state, like an [`Arc`]. Cloning it
107/// is cheap and every clone refers to the same database, so the idiomatic way
108/// to use it across threads is to clone a handle per thread.
109///
110/// # Examples
111///
112/// The four-call common case:
113///
114/// ```
115/// use txn_db::Db;
116///
117/// let db = Db::new();
118///
119/// let mut tx = db.begin();
120/// tx.put(b"greeting".to_vec(), b"hei".to_vec());
121/// tx.commit()?;
122///
123/// let tx = db.begin();
124/// assert_eq!(tx.get(b"greeting")?.as_deref(), Some(&b"hei"[..]));
125/// # Ok::<(), txn_db::TxnError>(())
126/// ```
127///
128/// Sharing one database across threads:
129///
130/// ```
131/// use std::thread;
132/// use txn_db::Db;
133///
134/// let db = Db::new();
135/// let handles: Vec<_> = (0..4u8)
136/// .map(|i| {
137/// let db = db.clone();
138/// thread::spawn(move || {
139/// let mut tx = db.begin();
140/// tx.put(vec![i], vec![i]);
141/// // Independent keys never conflict.
142/// tx.commit().expect("commit");
143/// })
144/// })
145/// .collect();
146/// for h in handles {
147/// h.join().expect("thread");
148/// }
149/// # Ok::<(), txn_db::TxnError>(())
150/// ```
151pub struct Db<S: VersionStore = MemoryStore> {
152 inner: Arc<Inner<S>>,
153}
154
155impl Db<MemoryStore> {
156 /// Create an empty in-memory database.
157 ///
158 /// This is the default configuration: a [`MemoryStore`] backing store, ready
159 /// for [`begin`](Db::begin).
160 ///
161 /// # Examples
162 ///
163 /// ```
164 /// use txn_db::Db;
165 ///
166 /// let db = Db::new();
167 /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
168 /// ```
169 #[must_use]
170 pub fn new() -> Self {
171 Db::with_store(MemoryStore::new())
172 }
173}
174
175impl Default for Db<MemoryStore> {
176 fn default() -> Self {
177 Db::new()
178 }
179}
180
181impl<S: VersionStore> Db<S> {
182 /// Create a database over a custom [`VersionStore`].
183 ///
184 /// This is the Tier-3 seam: supply any backing store and the transaction
185 /// semantics — snapshot isolation, read-your-own-writes, write-write
186 /// conflict detection — compose on top of it unchanged.
187 ///
188 /// # Examples
189 ///
190 /// ```
191 /// use txn_db::{Db, MemoryStore};
192 ///
193 /// let db = Db::with_store(MemoryStore::new());
194 /// let mut tx = db.begin();
195 /// tx.put(b"k".to_vec(), b"v".to_vec());
196 /// tx.commit()?;
197 /// # Ok::<(), txn_db::TxnError>(())
198 /// ```
199 #[must_use]
200 pub fn with_store(store: S) -> Self {
201 Db {
202 inner: Arc::new(Inner::new(store)),
203 }
204 }
205
206 /// Begin a read-write transaction over the current state of the database.
207 ///
208 /// The transaction takes its snapshot at this moment: it reads as of the
209 /// most recent commit and is unaffected by commits that happen afterward.
210 ///
211 /// # Examples
212 ///
213 /// ```
214 /// use txn_db::Db;
215 ///
216 /// let db = Db::new();
217 /// let mut tx = db.begin();
218 /// tx.put(b"k".to_vec(), b"v".to_vec());
219 /// tx.commit()?;
220 /// # Ok::<(), txn_db::TxnError>(())
221 /// ```
222 pub fn begin(&self) -> Transaction<S> {
223 Transaction::new(Arc::clone(&self.inner), self.inner.read_ts())
224 }
225
226 /// Take a read-only snapshot of the current state of the database.
227 ///
228 /// The returned [`Snapshot`] reads as of this instant and never changes,
229 /// even as other transactions commit. Use it to read several keys at one
230 /// consistent point in time without the overhead of a transaction.
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// use txn_db::Db;
236 ///
237 /// let db = Db::new();
238 /// let snap = db.snapshot();
239 /// assert_eq!(snap.get(b"k")?, None);
240 /// # Ok::<(), txn_db::TxnError>(())
241 /// ```
242 #[must_use]
243 pub fn snapshot(&self) -> Snapshot<S> {
244 Snapshot::new(Arc::clone(&self.inner), self.inner.read_ts())
245 }
246
247 /// The timestamp of the most recent successful commit.
248 ///
249 /// Returns [`Timestamp::ZERO`] for a database that has never been written.
250 /// This is the timestamp a transaction beginning now would read at.
251 ///
252 /// # Examples
253 ///
254 /// ```
255 /// use txn_db::Db;
256 ///
257 /// let db = Db::new();
258 /// assert_eq!(db.last_committed(), txn_db::Timestamp::ZERO);
259 ///
260 /// let mut tx = db.begin();
261 /// tx.put(b"k".to_vec(), b"v".to_vec());
262 /// let ts = tx.commit()?;
263 /// assert_eq!(db.last_committed(), ts);
264 /// # Ok::<(), txn_db::TxnError>(())
265 /// ```
266 #[must_use]
267 pub fn last_committed(&self) -> Timestamp {
268 self.inner.read_ts()
269 }
270}
271
272impl<S: VersionStore> Clone for Db<S> {
273 /// Clone the handle, not the data: the clone shares the same underlying
274 /// database.
275 fn clone(&self) -> Self {
276 Db {
277 inner: Arc::clone(&self.inner),
278 }
279 }
280}
281
#[cfg(test)]
// Tests signal failure by panicking, so `unwrap`/`expect` are intentional here.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Write `key -> value` in a fresh transaction, commit it, and return the
    /// commit timestamp.
    fn commit_put(db: &Db, key: &[u8], value: &[u8]) -> Timestamp {
        let mut tx = db.begin();
        tx.put(key.to_vec(), value.to_vec());
        tx.commit().unwrap()
    }

    #[test]
    fn test_new_database_is_empty_at_zero() {
        let db = Db::new();
        assert_eq!(db.last_committed(), Timestamp::ZERO);

        let reader = db.begin();
        assert_eq!(reader.get(b"k").unwrap(), None);
    }

    #[test]
    fn test_commit_makes_writes_visible_to_later_transactions() {
        let db = Db::new();
        let committed_at = commit_put(&db, b"k", b"v");
        assert!(committed_at > Timestamp::ZERO);

        let reader = db.begin();
        assert_eq!(reader.get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }

    #[test]
    fn test_snapshot_is_isolated_from_later_commits() {
        let db = Db::new();
        let _ = commit_put(&db, b"k", b"v1");

        // Pin the view before the second write lands.
        let pinned = db.snapshot();
        let _ = commit_put(&db, b"k", b"v2");

        assert_eq!(pinned.get(b"k").unwrap().as_deref(), Some(&b"v1"[..]));
    }

    #[test]
    fn test_write_write_conflict_aborts_later_committer() {
        let db = Db::new();
        // Both transactions start from the same empty snapshot.
        let mut first = db.begin();
        let mut second = db.begin();
        first.put(b"k".to_vec(), b"a".to_vec());
        second.put(b"k".to_vec(), b"b".to_vec());

        assert!(first.commit().is_ok());
        let err = second.commit().expect_err("second committer must lose");
        assert!(err.is_retryable());

        // The value written by the first committer is the one that survives.
        let reader = db.begin();
        assert_eq!(reader.get(b"k").unwrap().as_deref(), Some(&b"a"[..]));
    }

    #[test]
    fn test_disjoint_keys_do_not_conflict() {
        let db = Db::new();
        let mut left = db.begin();
        let mut right = db.begin();
        left.put(b"a".to_vec(), b"1".to_vec());
        right.put(b"b".to_vec(), b"2".to_vec());

        assert!(left.commit().is_ok());
        assert!(right.commit().is_ok());
    }

    #[test]
    fn test_read_only_commit_returns_snapshot_timestamp() {
        let db = Db::new();
        let written_at = commit_put(&db, b"k", b"v");

        let read_only = db.begin();
        assert_eq!(read_only.commit().unwrap(), written_at);
    }

    #[test]
    fn test_rollback_discards_writes() {
        let db = Db::new();
        let mut tx = db.begin();
        tx.put(b"k".to_vec(), b"v".to_vec());
        tx.rollback();

        let reader = db.begin();
        assert_eq!(reader.get(b"k").unwrap(), None);
    }

    #[test]
    fn test_clone_shares_state() {
        let db = Db::new();
        let other_handle = db.clone();
        let _ = commit_put(&db, b"k", b"v");

        let reader = other_handle.begin();
        assert_eq!(reader.get(b"k").unwrap().as_deref(), Some(&b"v"[..]));
    }
}