txn_db/store.rs
1//! The version store: where committed versions live.
2//!
3//! `txn-db` is the transaction layer, not the storage layer. It owns
4//! visibility, conflict detection, and commit ordering, but it delegates the
5//! actual keeping of versioned bytes to a [`VersionStore`]. That trait is the
6//! crate's Tier-3 seam: implement it over an LSM tree, a B-tree, a remote
7//! service — anything that can keep multiple timestamped versions of a key —
8//! and the transaction semantics compose on top unchanged.
9//!
10//! A [`MemoryStore`] ships for the common in-process case, for tests, and for
11//! examples. It is the default backing store of [`Db::new`](crate::Db::new).
12//!
13//! ## The contract a store must uphold
14//!
15//! A correct [`VersionStore`] keeps, for each key, the history of versions it
16//! has been asked to apply, each tagged with the commit timestamp it was applied
17//! at. Its two obligations are:
18//!
19//! - [`get`](VersionStore::get) returns the *newest* version whose commit
20//! timestamp is less than or equal to the caller's snapshot timestamp — the
21//! snapshot-read rule. A tombstone (a delete) at that position reads as
22//! "absent".
23//! - [`try_commit`](VersionStore::try_commit) validates a transaction's read and
24//! write sets against its snapshot and, if nothing conflicts, installs its
25//! writes at the commit timestamp — atomically with respect to other commits
26//! touching the same keys. This single method is what makes the store the
27//! serialization point for concurrent commits.
28//!
29//! ## Sharding
30//!
31//! [`MemoryStore`] partitions keys across independent shards, each with its own
32//! lock. Reads and commits that touch disjoint shards proceed without
33//! contending; a commit locks only the shards its keys fall in, in a fixed order
34//! so concurrent commits cannot deadlock. This is the sharded commit path the
35//! single global commit lock of the foundation release grew into.
36
37use std::collections::HashMap;
38use std::sync::Arc;
39
40use crate::error::{Result, TxnError};
41use crate::sync::{self, RwLock, RwLockWriteGuard};
42use crate::timestamp::Timestamp;
43
44/// One entry in a commit batch handed to [`VersionStore::try_commit`].
45///
46/// A key paired with the value to write at the commit timestamp (`Some`) or a
47/// tombstone marking a delete (`None`).
48pub type WriteEntry = (Arc<[u8]>, Option<Arc<[u8]>>);
49
50/// Default number of shards. A power of two so the shard index is a mask, not a
51/// division. Sixteen spreads contention well for in-process workloads without
52/// the per-commit cost of locking a long list of shards. Loom builds use far
53/// fewer to keep the interleaving search tractable.
54#[cfg(not(loom))]
55const DEFAULT_SHARDS: usize = 16;
56#[cfg(loom)]
57const DEFAULT_SHARDS: usize = 2;
58
59/// A keeper of timestamped versions, the backend a [`Db`](crate::Db) is built on.
60///
61/// This is the extension point for plugging `txn-db` onto a real storage
62/// engine. The transaction layer supplies the snapshot timestamps and the read
63/// and write sets; the store stores versions and enforces, atomically, that a
64/// commit only lands when nothing it depends on has changed. The two methods
65/// below state the precise contract.
66///
67/// Implementations must be `Send + Sync`: a [`Db`](crate::Db) shares one store
68/// across every thread that holds a clone of it.
69///
70/// # Examples
71///
72/// Driving the shipped [`MemoryStore`] directly through the trait:
73///
74/// ```
75/// use std::sync::Arc;
76/// use txn_db::{MemoryStore, Timestamp, VersionStore};
77///
78/// let store = MemoryStore::new();
79/// let key: Arc<[u8]> = Arc::from(&b"k"[..]);
80///
81/// // Commit one version at timestamp 1 (snapshot 0, no reads to validate).
82/// store.try_commit(
83/// Timestamp::ZERO,
84/// Timestamp::from_raw(1),
85/// vec![(key.clone(), Some(Arc::from(&b"v1"[..])))],
86/// &[],
87/// )?;
88///
89/// // A reader at timestamp 1 sees it; a reader at timestamp 0 does not.
90/// assert_eq!(store.get(b"k", Timestamp::from_raw(1))?.as_deref(), Some(&b"v1"[..]));
91/// assert_eq!(store.get(b"k", Timestamp::ZERO)?, None);
92/// # Ok::<(), txn_db::TxnError>(())
93/// ```
94pub trait VersionStore: Send + Sync {
95 /// Return the value of `key` visible at `read_ts`.
96 ///
97 /// The result is the value of the newest version of `key` whose commit
98 /// timestamp is `<= read_ts`, or `None` if there is no such version or the
99 /// newest visible version is a tombstone (the key was deleted as of
100 /// `read_ts`).
101 ///
102 /// # Errors
103 ///
104 /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backend fails
105 /// to service the read. [`MemoryStore`] never fails.
106 fn get(&self, key: &[u8], read_ts: Timestamp) -> Result<Option<Arc<[u8]>>>;
107
108 /// Validate a transaction and, if it does not conflict, apply its writes.
109 ///
110 /// The store must perform the following as one step, atomic with respect to
111 /// any other `try_commit` that touches an overlapping key:
112 ///
113 /// 1. **Validate.** For every key in `writes` and every key in `reads`,
114 /// check that the key has no version with a commit timestamp greater than
115 /// `read_ts` — that is, that nothing the transaction wrote or read has
116 /// changed since its snapshot. `reads` is empty for snapshot-isolation
117 /// transactions and carries the read set for serializable ones.
118 /// 2. **Apply.** If validation passes, install each write in `writes` as a
119 /// new version stamped `commit_ts` (`Some` is a value, `None` a
120 /// tombstone). The database guarantees `commit_ts` is unique and that
121 /// timestamps are handed out in increasing order.
122 ///
123 /// If any key fails validation, the store applies nothing and reports the
124 /// conflict.
125 ///
126 /// # Errors
127 ///
128 /// Returns [`TxnError::Conflict`](crate::TxnError::Conflict) if validation
129 /// fails; no writes are applied. Returns
130 /// [`TxnError::Store`](crate::TxnError::Store) if the backend fails to apply
131 /// the batch.
132 fn try_commit(
133 &self,
134 read_ts: Timestamp,
135 commit_ts: Timestamp,
136 writes: Vec<WriteEntry>,
137 reads: &[Arc<[u8]>],
138 ) -> Result<()>;
139}
140
141/// One stored version of a key: the timestamp it became visible and its value.
142///
143/// A `value` of `None` is a tombstone — the key was deleted at `commit_ts`.
144#[derive(Debug, Clone)]
145struct Version {
146 commit_ts: Timestamp,
147 value: Option<Arc<[u8]>>,
148}
149
150/// One shard's map from key to its version chain, kept in ascending
151/// commit-timestamp order.
152type Chains = HashMap<Arc<[u8]>, Vec<Version>>;
153
154/// One shard's slice of the keyspace.
155struct Shard {
156 chains: RwLock<Chains>,
157}
158
159/// An in-memory [`VersionStore`] that shards the keyspace for concurrency.
160///
161/// Each key is hashed to one of a fixed number of shards; each shard holds its
162/// keys' version chains behind its own reader-writer lock. Reads lock a single
163/// shard; a commit locks only the shards its keys fall in. Commits to disjoint
164/// shards therefore run in parallel, and the snapshot read of a key is a binary
165/// search within its shard for the newest version at or below the snapshot
166/// timestamp.
167///
168/// This is the default store of [`Db::new`](crate::Db::new) and suits caches,
169/// tests, and workloads that fit in memory. Versions accumulate until garbage
170/// collection lands (a later roadmap phase), so a long-lived store under heavy
171/// overwrite grows without bound for now.
172///
173/// # Examples
174///
175/// ```
176/// use txn_db::{Db, MemoryStore};
177///
178/// // `Db::new()` uses a `MemoryStore`; this is the explicit form.
179/// let db = Db::with_store(MemoryStore::new());
180/// let mut tx = db.begin();
181/// tx.put(b"hello".to_vec(), b"world".to_vec());
182/// tx.commit()?;
183/// # Ok::<(), txn_db::TxnError>(())
184/// ```
185pub struct MemoryStore {
186 shards: Box<[Shard]>,
187 /// `shard_count - 1`; ANDed with a key hash to pick a shard.
188 mask: usize,
189}
190
191impl Default for MemoryStore {
192 fn default() -> Self {
193 MemoryStore::new()
194 }
195}
196
197impl MemoryStore {
198 /// Create an empty in-memory store with the default shard count.
199 ///
200 /// # Examples
201 ///
202 /// ```
203 /// use txn_db::MemoryStore;
204 ///
205 /// let store = MemoryStore::new();
206 /// # let _ = store;
207 /// ```
208 #[must_use]
209 pub fn new() -> Self {
210 MemoryStore::with_shards(DEFAULT_SHARDS)
211 }
212
213 /// Create an empty store with a specific number of shards.
214 ///
215 /// `shards` is rounded up to a power of two (and at least one). More shards
216 /// reduce contention between commits that touch unrelated keys, at the cost
217 /// of a larger fixed footprint. The default of [`MemoryStore::new`] suits
218 /// most workloads; tune this only with a benchmark in hand.
219 ///
220 /// # Examples
221 ///
222 /// ```
223 /// use txn_db::MemoryStore;
224 ///
225 /// let store = MemoryStore::with_shards(64);
226 /// # let _ = store;
227 /// ```
228 #[must_use]
229 pub fn with_shards(shards: usize) -> Self {
230 let count = shards.max(1).next_power_of_two();
231 let shards = (0..count)
232 .map(|_| Shard {
233 chains: RwLock::new(HashMap::new()),
234 })
235 .collect::<Vec<_>>()
236 .into_boxed_slice();
237 MemoryStore {
238 shards,
239 mask: count - 1,
240 }
241 }
242
243 /// Number of distinct keys that have ever been written.
244 ///
245 /// Counts keys, not versions, and includes keys whose latest version is a
246 /// tombstone. Primarily useful in tests and diagnostics.
247 ///
248 /// # Examples
249 ///
250 /// ```
251 /// use txn_db::MemoryStore;
252 ///
253 /// let store = MemoryStore::new();
254 /// assert_eq!(store.key_count(), 0);
255 /// ```
256 #[must_use]
257 pub fn key_count(&self) -> usize {
258 self.shards
259 .iter()
260 .map(|shard| sync::read(&shard.chains).len())
261 .sum()
262 }
263
264 /// The shard a key belongs to.
265 #[inline]
266 fn shard_of(&self, key: &[u8]) -> usize {
267 (hash_key(key) as usize) & self.mask
268 }
269}
270
271impl VersionStore for MemoryStore {
272 fn get(&self, key: &[u8], read_ts: Timestamp) -> Result<Option<Arc<[u8]>>> {
273 let shard = &self.shards[self.shard_of(key)];
274 let chains = sync::read(&shard.chains);
275 Ok(visible_value(chains.get(key), read_ts))
276 }
277
278 fn try_commit(
279 &self,
280 read_ts: Timestamp,
281 commit_ts: Timestamp,
282 writes: Vec<WriteEntry>,
283 reads: &[Arc<[u8]>],
284 ) -> Result<()> {
285 // Shard of every touched key, computed once.
286 let write_shards: Vec<usize> = writes.iter().map(|(k, _)| self.shard_of(k)).collect();
287 let read_shards: Vec<usize> = reads.iter().map(|k| self.shard_of(k)).collect();
288
289 // The distinct shards to lock, in ascending order so concurrent commits
290 // acquire shared shards in the same sequence and cannot deadlock.
291 let mut to_lock: Vec<usize> = write_shards
292 .iter()
293 .copied()
294 .chain(read_shards.iter().copied())
295 .collect();
296 to_lock.sort_unstable();
297 to_lock.dedup();
298
299 let mut guards: Vec<RwLockWriteGuard<'_, Chains>> = Vec::with_capacity(to_lock.len());
300 for &shard in &to_lock {
301 guards.push(sync::write(&self.shards[shard].chains));
302 }
303
304 // Validate the write set, then the read set: abort if any touched key
305 // gained a version after the transaction's snapshot.
306 for (entry, &shard) in writes.iter().zip(&write_shards) {
307 if let Ok(pos) = to_lock.binary_search(&shard) {
308 if newer_than(guards[pos].get(entry.0.as_ref()), read_ts) {
309 return Err(TxnError::conflict(entry.0.len()));
310 }
311 }
312 }
313 for (key, &shard) in reads.iter().zip(&read_shards) {
314 if let Ok(pos) = to_lock.binary_search(&shard) {
315 if newer_than(guards[pos].get(key.as_ref()), read_ts) {
316 return Err(TxnError::conflict(key.len()));
317 }
318 }
319 }
320
321 // Apply: append a new version for each write under the held locks.
322 for ((key, value), &shard) in writes.into_iter().zip(&write_shards) {
323 if let Ok(pos) = to_lock.binary_search(&shard) {
324 guards[pos]
325 .entry(key)
326 .or_default()
327 .push(Version { commit_ts, value });
328 }
329 }
330 Ok(())
331 }
332}
333
334/// Whether `key`'s newest version (if any) was committed after `read_ts` — the
335/// condition that makes a commit conflict.
336#[inline]
337fn newer_than(versions: Option<&Vec<Version>>, read_ts: Timestamp) -> bool {
338 matches!(versions.and_then(|v| v.last()), Some(v) if v.commit_ts > read_ts)
339}
340
341/// The value of the newest version at or below `read_ts`, or `None` if there is
342/// none or it is a tombstone.
343#[inline]
344fn visible_value(versions: Option<&Vec<Version>>, read_ts: Timestamp) -> Option<Arc<[u8]>> {
345 let versions = versions?;
346 // Versions are ascending by commit timestamp; the newest visible one is the
347 // last entry whose timestamp is `<= read_ts`.
348 let visible = versions.partition_point(|v| v.commit_ts <= read_ts);
349 let idx = visible.checked_sub(1)?;
350 versions[idx].value.clone()
351}
352
353/// FNV-1a hash of a key, used only to pick a shard. A non-cryptographic spread
354/// is all the shard index needs.
355#[inline]
356fn hash_key(key: &[u8]) -> u64 {
357 let mut hash = 0xcbf2_9ce4_8422_2325;
358 for &byte in key {
359 hash ^= u64::from(byte);
360 hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
361 }
362 hash
363}
364
365#[cfg(all(test, not(loom)))]
366#[allow(clippy::unwrap_used, clippy::expect_used)]
367mod tests {
368 use super::*;
369
370 fn k(b: &[u8]) -> Arc<[u8]> {
371 Arc::from(b)
372 }
373
374 fn commit(store: &MemoryStore, ts: u64, writes: Vec<WriteEntry>) {
375 store
376 .try_commit(
377 Timestamp::from_raw(ts - 1),
378 Timestamp::from_raw(ts),
379 writes,
380 &[],
381 )
382 .expect("commit");
383 }
384
385 #[test]
386 fn test_get_on_missing_key_returns_none() {
387 let store = MemoryStore::new();
388 assert_eq!(store.get(b"absent", Timestamp::from_raw(10)).unwrap(), None);
389 }
390
391 #[test]
392 fn test_read_sees_only_versions_at_or_before_snapshot() {
393 let store = MemoryStore::new();
394 commit(&store, 2, vec![(k(b"x"), Some(k(b"a")))]);
395 commit(&store, 4, vec![(k(b"x"), Some(k(b"b")))]);
396
397 assert_eq!(store.get(b"x", Timestamp::from_raw(1)).unwrap(), None);
398 assert_eq!(
399 store.get(b"x", Timestamp::from_raw(2)).unwrap().as_deref(),
400 Some(&b"a"[..])
401 );
402 assert_eq!(
403 store.get(b"x", Timestamp::from_raw(3)).unwrap().as_deref(),
404 Some(&b"a"[..])
405 );
406 assert_eq!(
407 store.get(b"x", Timestamp::from_raw(4)).unwrap().as_deref(),
408 Some(&b"b"[..])
409 );
410 assert_eq!(
411 store.get(b"x", Timestamp::from_raw(99)).unwrap().as_deref(),
412 Some(&b"b"[..])
413 );
414 }
415
416 #[test]
417 fn test_tombstone_reads_as_absent() {
418 let store = MemoryStore::new();
419 commit(&store, 1, vec![(k(b"x"), Some(k(b"a")))]);
420 commit(&store, 2, vec![(k(b"x"), None)]);
421
422 assert_eq!(
423 store.get(b"x", Timestamp::from_raw(1)).unwrap().as_deref(),
424 Some(&b"a"[..])
425 );
426 assert_eq!(store.get(b"x", Timestamp::from_raw(2)).unwrap(), None);
427 }
428
429 #[test]
430 fn test_write_write_conflict_is_detected() {
431 let store = MemoryStore::new();
432 commit(&store, 5, vec![(k(b"x"), Some(k(b"a")))]);
433
434 // A transaction whose snapshot predates the existing version conflicts.
435 let err = store
436 .try_commit(
437 Timestamp::from_raw(4),
438 Timestamp::from_raw(6),
439 vec![(k(b"x"), Some(k(b"b")))],
440 &[],
441 )
442 .unwrap_err();
443 assert!(matches!(err, TxnError::Conflict { .. }));
444 // Nothing was applied.
445 assert_eq!(
446 store.get(b"x", Timestamp::from_raw(99)).unwrap().as_deref(),
447 Some(&b"a"[..])
448 );
449 }
450
451 #[test]
452 fn test_read_set_validation_detects_skew() {
453 let store = MemoryStore::new();
454 commit(&store, 5, vec![(k(b"y"), Some(k(b"1")))]);
455
456 // Snapshot 4, write x, but read y which changed at ts 5 -> conflict.
457 let err = store
458 .try_commit(
459 Timestamp::from_raw(4),
460 Timestamp::from_raw(6),
461 vec![(k(b"x"), Some(k(b"a")))],
462 &[k(b"y")],
463 )
464 .unwrap_err();
465 assert!(matches!(err, TxnError::Conflict { .. }));
466 }
467
468 #[test]
469 fn test_multi_shard_commit_applies_all_keys() {
470 let store = MemoryStore::with_shards(8);
471 let writes: Vec<WriteEntry> = (0u8..32).map(|i| (k(&[i]), Some(k(&[i])))).collect();
472 commit(&store, 1, writes);
473 for i in 0u8..32 {
474 assert_eq!(
475 store.get(&[i], Timestamp::from_raw(1)).unwrap().as_deref(),
476 Some(&[i][..])
477 );
478 }
479 assert_eq!(store.key_count(), 32);
480 }
481
482 #[test]
483 fn test_with_shards_rounds_up_to_power_of_two() {
484 let store = MemoryStore::with_shards(5);
485 assert_eq!(store.shards.len(), 8);
486 assert_eq!(store.mask, 7);
487 }
488}