txn_db/store.rs
1//! The version store: where committed versions live.
2//!
3//! `txn-db` is the transaction layer, not the storage layer. It owns
4//! visibility, conflict detection, and commit ordering, but it delegates the
5//! actual keeping of versioned bytes to a [`VersionStore`]. That trait is the
6//! crate's Tier-3 seam: implement it over an LSM tree, a B-tree, a remote
7//! service — anything that can keep multiple timestamped versions of a key —
8//! and the transaction semantics compose on top unchanged.
9//!
10//! A [`MemoryStore`] ships for the common in-process case, for tests, and for
11//! examples. It is the default backing store of [`Db::new`](crate::Db::new).
12//!
13//! ## The contract a store must uphold
14//!
15//! A correct [`VersionStore`] keeps, for each key, the full history of versions
16//! it has been asked to apply, each tagged with the commit timestamp it was
17//! applied at. Its three obligations are:
18//!
19//! - [`get`](VersionStore::get) returns the *newest* version whose commit
20//! timestamp is less than or equal to the caller's snapshot timestamp — the
21//! snapshot-read rule. A tombstone (a delete) at that position reads as
22//! "absent".
23//! - [`latest_commit_ts`](VersionStore::latest_commit_ts) returns the timestamp
24//! of the most recent version of a key. The commit path uses it to detect
25//! write-write conflicts, so it must reflect every applied write.
26//! - [`apply`](VersionStore::apply) installs a batch of versions at one commit
27//! timestamp. The database calls it with strictly increasing timestamps and
28//! never concurrently with itself, so an implementation may assume applied
29//! versions arrive in commit order.
30
31use std::collections::HashMap;
32use std::sync::{Arc, PoisonError, RwLock};
33
34use crate::error::Result;
35use crate::timestamp::Timestamp;
36
/// One entry in a commit batch handed to [`VersionStore::apply`].
///
/// A key paired with the value to write at the commit timestamp (`Some`) or a
/// tombstone marking a delete (`None`).
///
/// The tuple order is `(key, value)`. Both sides are reference-counted byte
/// slices, so cloning an entry bumps reference counts instead of copying
/// bytes.
pub type WriteEntry = (Arc<[u8]>, Option<Arc<[u8]>>);
42
/// A keeper of timestamped versions, the backend a [`Db`](crate::Db) is built on.
///
/// This is the extension point for plugging `txn-db` onto a real storage
/// engine. The transaction layer calls these three methods and supplies all of
/// the isolation logic itself; an implementation only has to store versions and
/// answer the snapshot-read query honestly. The three methods below state the
/// precise contract.
///
/// Implementations must be `Send + Sync`: a [`Db`](crate::Db) shares one store
/// across every thread that holds a clone of it. Every method takes `&self`,
/// so an implementation that mutates state in [`apply`](VersionStore::apply)
/// has to do so through interior mutability (a lock, atomics, or similar).
///
/// # Examples
///
/// Driving the shipped [`MemoryStore`] directly through the trait:
///
/// ```
/// use std::sync::Arc;
/// use txn_db::{MemoryStore, Timestamp, VersionStore};
///
/// let store = MemoryStore::new();
/// let key: Arc<[u8]> = Arc::from(&b"k"[..]);
///
/// // Apply one version at commit timestamp 1.
/// store.apply(Timestamp::from_raw(1), vec![(key.clone(), Some(Arc::from(&b"v1"[..])))])?;
///
/// // A reader at timestamp 1 sees it; a reader at timestamp 0 does not.
/// assert_eq!(store.get(b"k", Timestamp::from_raw(1))?.as_deref(), Some(&b"v1"[..]));
/// assert_eq!(store.get(b"k", Timestamp::ZERO)?, None);
/// # Ok::<(), txn_db::TxnError>(())
/// ```
pub trait VersionStore: Send + Sync {
    /// Return the value of `key` visible at `read_ts`.
    ///
    /// The result is the value of the newest version of `key` whose commit
    /// timestamp is `<= read_ts`, or `None` if there is no such version or the
    /// newest visible version is a tombstone (the key was deleted as of
    /// `read_ts`).
    ///
    /// The value comes back as a shared `Arc<[u8]>`, so an implementation that
    /// already holds the bytes behind an `Arc` can answer without copying them.
    ///
    /// # Errors
    ///
    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backend fails
    /// to service the read. [`MemoryStore`] never fails.
    fn get(&self, key: &[u8], read_ts: Timestamp) -> Result<Option<Arc<[u8]>>>;

    /// Return the commit timestamp of the most recent version of `key`.
    ///
    /// Returns `None` if the key has never been written. The commit path uses
    /// this to decide whether a key was modified after a transaction's
    /// snapshot, so it must account for every version ever applied — including
    /// tombstones.
    ///
    /// Unlike [`get`](VersionStore::get), this query takes no snapshot
    /// timestamp: it always answers for the newest version the store holds.
    ///
    /// # Errors
    ///
    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backend fails.
    /// [`MemoryStore`] never fails.
    fn latest_commit_ts(&self, key: &[u8]) -> Result<Option<Timestamp>>;

    /// Install a batch of versions at `commit_ts`.
    ///
    /// Each entry is a key paired with either `Some(value)` (a write) or `None`
    /// (a tombstone marking a delete). The database guarantees that `apply` is
    /// called with strictly increasing `commit_ts` and is never run
    /// concurrently with another `apply` on the same store, so versions arrive
    /// in commit order.
    ///
    /// `writes` is passed by value, so an implementation may move the keys and
    /// values straight into its own storage.
    ///
    /// # Errors
    ///
    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backend fails
    /// to persist the batch. [`MemoryStore`] never fails.
    fn apply(&self, commit_ts: Timestamp, writes: Vec<WriteEntry>) -> Result<()>;
}
114
/// One stored version of a key: the timestamp it became visible and its value.
///
/// A `value` of `None` is a tombstone — the key was deleted at `commit_ts`.
#[derive(Debug, Clone)]
struct Version {
    /// Commit timestamp this version was applied at; snapshot reads compare it
    /// against their read timestamp to decide visibility.
    commit_ts: Timestamp,
    /// Bytes written at `commit_ts`, or `None` when this version is a tombstone.
    value: Option<Arc<[u8]>>,
}
123
/// An in-memory [`VersionStore`] backed by a hash map of version chains.
///
/// Each key maps to its versions in ascending commit-timestamp order, so a
/// snapshot read is a binary search for the newest version at or below the
/// snapshot timestamp. This is the default store of [`Db::new`](crate::Db::new)
/// and is well suited to caches, tests, and workloads that fit in memory.
///
/// `MemoryStore` is thread-safe and is meant to be shared: a [`Db`](crate::Db)
/// holds it behind an [`Arc`] and clones that handle to every thread. Versions
/// accumulate until garbage collection lands (a later roadmap phase), so a
/// long-lived store under heavy overwrite grows without bound for now.
///
/// # Examples
///
/// ```
/// use txn_db::{Db, MemoryStore};
///
/// // `Db::new()` uses a `MemoryStore`; this is the explicit form.
/// let db = Db::with_store(MemoryStore::new());
/// let mut tx = db.begin();
/// tx.put(b"hello".to_vec(), b"world".to_vec());
/// tx.commit()?;
/// # Ok::<(), txn_db::TxnError>(())
/// ```
#[derive(Debug, Default)]
pub struct MemoryStore {
    // Key -> version chain, oldest version first. Reads take the lock in
    // shared mode and `apply` takes it exclusively, so a batch becomes visible
    // to readers all at once.
    chains: RwLock<HashMap<Arc<[u8]>, Vec<Version>>>,
}
152
153impl MemoryStore {
154 /// Create an empty in-memory store.
155 ///
156 /// # Examples
157 ///
158 /// ```
159 /// use txn_db::MemoryStore;
160 ///
161 /// let store = MemoryStore::new();
162 /// # let _ = store;
163 /// ```
164 #[inline]
165 #[must_use]
166 pub fn new() -> Self {
167 MemoryStore {
168 chains: RwLock::new(HashMap::new()),
169 }
170 }
171
172 /// Number of distinct keys that have ever been written.
173 ///
174 /// Counts keys, not versions, and includes keys whose latest version is a
175 /// tombstone. Primarily useful in tests and diagnostics.
176 ///
177 /// # Examples
178 ///
179 /// ```
180 /// use txn_db::{Db, MemoryStore};
181 ///
182 /// let store = MemoryStore::new();
183 /// assert_eq!(store.key_count(), 0);
184 /// ```
185 #[must_use]
186 pub fn key_count(&self) -> usize {
187 read_guard(&self.chains).len()
188 }
189}
190
191impl VersionStore for MemoryStore {
192 fn get(&self, key: &[u8], read_ts: Timestamp) -> Result<Option<Arc<[u8]>>> {
193 let chains = read_guard(&self.chains);
194 let Some(versions) = chains.get(key) else {
195 return Ok(None);
196 };
197 // Versions are kept sorted ascending by commit timestamp, so the newest
198 // version visible at `read_ts` is the last one that is <= read_ts.
199 let visible = versions.partition_point(|v| v.commit_ts <= read_ts);
200 match visible.checked_sub(1).map(|i| &versions[i]) {
201 Some(version) => Ok(version.value.clone()),
202 None => Ok(None),
203 }
204 }
205
206 fn latest_commit_ts(&self, key: &[u8]) -> Result<Option<Timestamp>> {
207 let chains = read_guard(&self.chains);
208 Ok(chains.get(key).and_then(|v| v.last()).map(|v| v.commit_ts))
209 }
210
211 fn apply(&self, commit_ts: Timestamp, writes: Vec<WriteEntry>) -> Result<()> {
212 let mut chains = write_guard(&self.chains);
213 for (key, value) in writes {
214 chains
215 .entry(key)
216 .or_default()
217 .push(Version { commit_ts, value });
218 }
219 Ok(())
220 }
221}
222
/// Acquire shared (read) access to `lock`, treating a poisoned lock as usable.
///
/// A lock becomes poisoned when a thread panics while holding its guard. The
/// store's own critical sections never panic, so poisoning can only come from
/// a panic elsewhere while a guard was held, and the protected map is still
/// structurally valid in that case. Handing back the guard anyway keeps the
/// store usable instead of turning one panic into a permanent failure.
#[inline]
fn read_guard<T>(lock: &RwLock<T>) -> std::sync::RwLockReadGuard<'_, T> {
    match lock.read() {
        Ok(guard) => guard,
        // The poison error still carries the guard; unwrap it and carry on.
        Err(poisoned) => PoisonError::into_inner(poisoned),
    }
}
234
/// Acquire exclusive (write) access to `lock`, treating a poisoned lock as
/// usable.
///
/// Follows the same recovery policy as [`read_guard`]: if an earlier holder
/// panicked, the guard is taken out of the poison error and returned, so the
/// store stays writable.
#[inline]
fn write_guard<T>(lock: &RwLock<T>) -> std::sync::RwLockWriteGuard<'_, T> {
    match lock.write() {
        Ok(guard) => guard,
        // The poison error still carries the guard; unwrap it and carry on.
        Err(poisoned) => PoisonError::into_inner(poisoned),
    }
}
239
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Batch entry that writes `value` under `key`.
    fn put(key: &[u8], value: &[u8]) -> WriteEntry {
        (Arc::from(key), Some(Arc::from(value)))
    }

    /// Batch entry that deletes `key` (a tombstone).
    fn tombstone(key: &[u8]) -> WriteEntry {
        (Arc::from(key), None)
    }

    /// Snapshot-read `key` at `at`, copying the visible bytes out so they can
    /// be compared against a plain `Vec<u8>`.
    fn read(store: &MemoryStore, key: &[u8], at: Timestamp) -> Option<Vec<u8>> {
        store.get(key, at).unwrap().map(|bytes| bytes.to_vec())
    }

    #[test]
    fn test_get_on_missing_key_returns_none() {
        let store = MemoryStore::new();
        assert_eq!(read(&store, b"absent", Timestamp::from_raw(10)), None);
    }

    #[test]
    fn test_read_sees_only_versions_at_or_before_snapshot() {
        let store = MemoryStore::new();
        store
            .apply(Timestamp::from_raw(2), vec![put(b"x", b"a")])
            .unwrap();
        store
            .apply(Timestamp::from_raw(4), vec![put(b"x", b"b")])
            .unwrap();

        // Before the first commit nothing is visible.
        assert_eq!(read(&store, b"x", Timestamp::from_raw(1)), None);
        // From the first commit up to just before the second, "a" is visible.
        assert_eq!(
            read(&store, b"x", Timestamp::from_raw(2)),
            Some(b"a".to_vec())
        );
        assert_eq!(
            read(&store, b"x", Timestamp::from_raw(3)),
            Some(b"a".to_vec())
        );
        // From the second commit onwards, "b" is visible.
        assert_eq!(
            read(&store, b"x", Timestamp::from_raw(4)),
            Some(b"b".to_vec())
        );
        assert_eq!(
            read(&store, b"x", Timestamp::from_raw(99)),
            Some(b"b".to_vec())
        );
    }

    #[test]
    fn test_tombstone_reads_as_absent() {
        let store = MemoryStore::new();
        store
            .apply(Timestamp::from_raw(1), vec![put(b"x", b"a")])
            .unwrap();
        store
            .apply(Timestamp::from_raw(2), vec![tombstone(b"x")])
            .unwrap();

        // The delete only hides the key from snapshots at or after it.
        assert_eq!(
            read(&store, b"x", Timestamp::from_raw(1)),
            Some(b"a".to_vec())
        );
        assert_eq!(read(&store, b"x", Timestamp::from_raw(2)), None);
    }

    #[test]
    fn test_latest_commit_ts_tracks_newest_write() {
        let store = MemoryStore::new();
        assert_eq!(store.latest_commit_ts(b"x").unwrap(), None);

        store
            .apply(Timestamp::from_raw(3), vec![put(b"x", b"a")])
            .unwrap();
        store
            .apply(Timestamp::from_raw(7), vec![tombstone(b"x")])
            .unwrap();

        // A tombstone counts as a write for conflict detection.
        let newest = store.latest_commit_ts(b"x").unwrap();
        assert_eq!(newest, Some(Timestamp::from_raw(7)));
    }

    #[test]
    fn test_key_count_counts_distinct_keys() {
        let store = MemoryStore::new();
        let first_batch = vec![put(b"a", b"1"), put(b"b", b"2")];
        store.apply(Timestamp::from_raw(1), first_batch).unwrap();
        // Overwriting "a" adds a version, not a key.
        store
            .apply(Timestamp::from_raw(2), vec![put(b"a", b"3")])
            .unwrap();
        assert_eq!(store.key_count(), 2);
    }
}
335}