Skip to main content

txn_db/
store.rs

1//! The version store: where committed versions live.
2//!
3//! `txn-db` is the transaction layer, not the storage layer. It owns
4//! visibility, conflict detection, and commit ordering, but it delegates the
5//! actual keeping of versioned bytes to a [`VersionStore`]. That trait is the
6//! crate's Tier-3 seam: implement it over an LSM tree, a B-tree, a remote
7//! service — anything that can keep multiple timestamped versions of a key —
8//! and the transaction semantics compose on top unchanged.
9//!
10//! A [`MemoryStore`] ships for the common in-process case, for tests, and for
11//! examples. It is the default backing store of [`Db::new`](crate::Db::new).
12//!
13//! ## The contract a store must uphold
14//!
15//! A correct [`VersionStore`] keeps, for each key, the full history of versions
16//! it has been asked to apply, each tagged with the commit timestamp it was
17//! applied at. Its three obligations are:
18//!
19//! - [`get`](VersionStore::get) returns the *newest* version whose commit
20//!   timestamp is less than or equal to the caller's snapshot timestamp — the
21//!   snapshot-read rule. A tombstone (a delete) at that position reads as
22//!   "absent".
23//! - [`latest_commit_ts`](VersionStore::latest_commit_ts) returns the timestamp
24//!   of the most recent version of a key. The commit path uses it to detect
25//!   write-write conflicts, so it must reflect every applied write.
26//! - [`apply`](VersionStore::apply) installs a batch of versions at one commit
27//!   timestamp. The database calls it with strictly increasing timestamps and
28//!   never concurrently with itself, so an implementation may assume applied
29//!   versions arrive in commit order.
30
31use std::collections::HashMap;
32use std::sync::{Arc, PoisonError, RwLock};
33
34use crate::error::Result;
35use crate::timestamp::Timestamp;
36
/// One entry in a commit batch handed to [`VersionStore::apply`].
///
/// The first element is the key being written. The second is either the value
/// to install at the batch's commit timestamp (`Some`) or `None`, a tombstone
/// recording that the key was deleted at that timestamp. Both halves are
/// reference-counted byte slices, so an entry can be cloned or kept by a store
/// without copying the underlying bytes.
pub type WriteEntry = (Arc<[u8]>, Option<Arc<[u8]>>);
42
/// A keeper of timestamped versions, the backend a [`Db`](crate::Db) is built on.
///
/// This is the extension point for plugging `txn-db` onto a real storage
/// engine. The transaction layer calls the three methods below and supplies
/// all of the isolation logic itself. An implementation only has to store
/// versions and answer the snapshot-read query honestly. Each method's
/// documentation states its part of the contract.
///
/// Implementations must be `Send + Sync`: a [`Db`](crate::Db) shares one store
/// across every thread that holds a clone of it.
///
/// # Examples
///
/// Driving the shipped [`MemoryStore`] directly through the trait:
///
/// ```
/// use std::sync::Arc;
/// use txn_db::{MemoryStore, Timestamp, VersionStore};
///
/// let store = MemoryStore::new();
/// let key: Arc<[u8]> = Arc::from(&b"k"[..]);
///
/// // Apply one version at commit timestamp 1.
/// store.apply(Timestamp::from_raw(1), vec![(key.clone(), Some(Arc::from(&b"v1"[..])))])?;
///
/// // A reader at timestamp 1 sees it; a reader at timestamp 0 does not.
/// assert_eq!(store.get(b"k", Timestamp::from_raw(1))?.as_deref(), Some(&b"v1"[..]));
/// assert_eq!(store.get(b"k", Timestamp::ZERO)?, None);
/// # Ok::<(), txn_db::TxnError>(())
/// ```
pub trait VersionStore: Send + Sync {
    /// Return the value of `key` visible at `read_ts`.
    ///
    /// The result is the value of the newest version of `key` whose commit
    /// timestamp is `<= read_ts`. The bound is inclusive, so a version
    /// committed exactly at `read_ts` is visible.
    ///
    /// Returns `None` in two cases:
    ///
    /// - no such version exists;
    /// - the newest visible version is a tombstone, meaning the key was
    ///   deleted as of `read_ts`.
    ///
    /// # Errors
    ///
    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backend fails
    /// to service the read. [`MemoryStore`] never fails.
    fn get(&self, key: &[u8], read_ts: Timestamp) -> Result<Option<Arc<[u8]>>>;

    /// Return the commit timestamp of the most recent version of `key`.
    ///
    /// Returns `None` if the key has never been written. The commit path uses
    /// this to decide whether a key was modified after a transaction's
    /// snapshot, so it must account for every version ever applied, including
    /// tombstones. A key whose newest version is a delete still reports that
    /// delete's timestamp here.
    ///
    /// # Errors
    ///
    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backend fails.
    /// [`MemoryStore`] never fails.
    fn latest_commit_ts(&self, key: &[u8]) -> Result<Option<Timestamp>>;

    /// Install a batch of versions at `commit_ts`.
    ///
    /// Each entry is a key paired with either `Some(value)` (a write) or `None`
    /// (a tombstone marking a delete). A tombstone must be kept as a version
    /// rather than erasing the key's history. Both [`get`](Self::get) and
    /// [`latest_commit_ts`](Self::latest_commit_ts) are defined in terms of it.
    ///
    /// The database guarantees two things about calls to `apply`:
    ///
    /// - `commit_ts` is strictly increasing from one call to the next;
    /// - `apply` never runs concurrently with another `apply` on the same
    ///   store.
    ///
    /// Together these mean versions arrive in commit order.
    ///
    /// # Errors
    ///
    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backend fails
    /// to persist the batch. [`MemoryStore`] never fails.
    fn apply(&self, commit_ts: Timestamp, writes: Vec<WriteEntry>) -> Result<()>;
}
114
115/// One stored version of a key: the timestamp it became visible and its value.
116///
117/// A `value` of `None` is a tombstone — the key was deleted at `commit_ts`.
118#[derive(Debug, Clone)]
119struct Version {
120    commit_ts: Timestamp,
121    value: Option<Arc<[u8]>>,
122}
123
124/// An in-memory [`VersionStore`] backed by a hash map of version chains.
125///
126/// Each key maps to its versions in ascending commit-timestamp order, so a
127/// snapshot read is a binary search for the newest version at or below the
128/// snapshot timestamp. This is the default store of [`Db::new`](crate::Db::new)
129/// and is well suited to caches, tests, and workloads that fit in memory.
130///
131/// `MemoryStore` is thread-safe and is meant to be shared: a [`Db`](crate::Db)
132/// holds it behind an [`Arc`] and clones that handle to every thread. Versions
133/// accumulate until garbage collection lands (a later roadmap phase), so a
134/// long-lived store under heavy overwrite grows without bound for now.
135///
136/// # Examples
137///
138/// ```
139/// use txn_db::{Db, MemoryStore};
140///
141/// // `Db::new()` uses a `MemoryStore`; this is the explicit form.
142/// let db = Db::with_store(MemoryStore::new());
143/// let mut tx = db.begin();
144/// tx.put(b"hello".to_vec(), b"world".to_vec());
145/// tx.commit()?;
146/// # Ok::<(), txn_db::TxnError>(())
147/// ```
148#[derive(Debug, Default)]
149pub struct MemoryStore {
150    chains: RwLock<HashMap<Arc<[u8]>, Vec<Version>>>,
151}
152
153impl MemoryStore {
154    /// Create an empty in-memory store.
155    ///
156    /// # Examples
157    ///
158    /// ```
159    /// use txn_db::MemoryStore;
160    ///
161    /// let store = MemoryStore::new();
162    /// # let _ = store;
163    /// ```
164    #[inline]
165    #[must_use]
166    pub fn new() -> Self {
167        MemoryStore {
168            chains: RwLock::new(HashMap::new()),
169        }
170    }
171
172    /// Number of distinct keys that have ever been written.
173    ///
174    /// Counts keys, not versions, and includes keys whose latest version is a
175    /// tombstone. Primarily useful in tests and diagnostics.
176    ///
177    /// # Examples
178    ///
179    /// ```
180    /// use txn_db::{Db, MemoryStore};
181    ///
182    /// let store = MemoryStore::new();
183    /// assert_eq!(store.key_count(), 0);
184    /// ```
185    #[must_use]
186    pub fn key_count(&self) -> usize {
187        read_guard(&self.chains).len()
188    }
189}
190
191impl VersionStore for MemoryStore {
192    fn get(&self, key: &[u8], read_ts: Timestamp) -> Result<Option<Arc<[u8]>>> {
193        let chains = read_guard(&self.chains);
194        let Some(versions) = chains.get(key) else {
195            return Ok(None);
196        };
197        // Versions are kept sorted ascending by commit timestamp, so the newest
198        // version visible at `read_ts` is the last one that is <= read_ts.
199        let visible = versions.partition_point(|v| v.commit_ts <= read_ts);
200        match visible.checked_sub(1).map(|i| &versions[i]) {
201            Some(version) => Ok(version.value.clone()),
202            None => Ok(None),
203        }
204    }
205
206    fn latest_commit_ts(&self, key: &[u8]) -> Result<Option<Timestamp>> {
207        let chains = read_guard(&self.chains);
208        Ok(chains.get(key).and_then(|v| v.last()).map(|v| v.commit_ts))
209    }
210
211    fn apply(&self, commit_ts: Timestamp, writes: Vec<WriteEntry>) -> Result<()> {
212        let mut chains = write_guard(&self.chains);
213        for (key, value) in writes {
214            chains
215                .entry(key)
216                .or_default()
217                .push(Version { commit_ts, value });
218        }
219        Ok(())
220    }
221}
222
/// Acquire shared (read) access to `lock`, even if it has been poisoned.
///
/// The store's own critical sections never panic. A poisoned lock therefore
/// means a panic elsewhere happened while a guard was held. The data behind
/// the lock is still structurally valid in that situation, so the guard is
/// recovered from the poison error instead of propagating it. That keeps the
/// store usable rather than turning one panic into a permanent failure.
#[inline]
fn read_guard<T>(lock: &RwLock<T>) -> std::sync::RwLockReadGuard<'_, T> {
    match lock.read() {
        Ok(guard) => guard,
        Err(poisoned) => PoisonError::into_inner(poisoned),
    }
}

/// Acquire exclusive (write) access to `lock`, recovering the guard from a
/// poisoned lock for the same reason as [`read_guard`].
#[inline]
fn write_guard<T>(lock: &RwLock<T>) -> std::sync::RwLockWriteGuard<'_, T> {
    match lock.write() {
        Ok(guard) => guard,
        Err(poisoned) => PoisonError::into_inner(poisoned),
    }
}
239
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    // Unit tests for `MemoryStore`'s implementation of the `VersionStore`
    // contract: snapshot visibility, tombstones, conflict timestamps, and
    // key accounting.
    use super::*;

    /// Build a key from a byte literal.
    fn k(b: &[u8]) -> Arc<[u8]> {
        Arc::from(b)
    }

    /// Build a live (non-tombstone) value from a byte literal.
    fn v(b: &[u8]) -> Option<Arc<[u8]>> {
        Some(Arc::from(b))
    }

    #[test]
    fn test_get_on_missing_key_returns_none() {
        let store = MemoryStore::new();
        assert_eq!(store.get(b"absent", Timestamp::from_raw(10)).unwrap(), None);
    }

    #[test]
    fn test_read_sees_only_versions_at_or_before_snapshot() {
        let store = MemoryStore::new();
        store
            .apply(Timestamp::from_raw(2), vec![(k(b"x"), v(b"a"))])
            .unwrap();
        store
            .apply(Timestamp::from_raw(4), vec![(k(b"x"), v(b"b"))])
            .unwrap();

        assert_eq!(store.get(b"x", Timestamp::from_raw(1)).unwrap(), None);
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(2)).unwrap().as_deref(),
            Some(&b"a"[..])
        );
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(3)).unwrap().as_deref(),
            Some(&b"a"[..])
        );
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(4)).unwrap().as_deref(),
            Some(&b"b"[..])
        );
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(99)).unwrap().as_deref(),
            Some(&b"b"[..])
        );
    }

    #[test]
    fn test_tombstone_reads_as_absent() {
        let store = MemoryStore::new();
        store
            .apply(Timestamp::from_raw(1), vec![(k(b"x"), v(b"a"))])
            .unwrap();
        store
            .apply(Timestamp::from_raw(2), vec![(k(b"x"), None)])
            .unwrap();

        assert_eq!(
            store.get(b"x", Timestamp::from_raw(1)).unwrap().as_deref(),
            Some(&b"a"[..])
        );
        assert_eq!(store.get(b"x", Timestamp::from_raw(2)).unwrap(), None);
    }

    #[test]
    fn test_rewrite_after_delete_is_visible_again() {
        let store = MemoryStore::new();
        store
            .apply(Timestamp::from_raw(1), vec![(k(b"x"), v(b"a"))])
            .unwrap();
        store
            .apply(Timestamp::from_raw(2), vec![(k(b"x"), None)])
            .unwrap();
        store
            .apply(Timestamp::from_raw(3), vec![(k(b"x"), v(b"b"))])
            .unwrap();

        assert_eq!(store.get(b"x", Timestamp::from_raw(2)).unwrap(), None);
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(3)).unwrap().as_deref(),
            Some(&b"b"[..])
        );
        assert_eq!(
            store.latest_commit_ts(b"x").unwrap(),
            Some(Timestamp::from_raw(3))
        );
    }

    #[test]
    fn test_latest_commit_ts_tracks_newest_write() {
        let store = MemoryStore::new();
        assert_eq!(store.latest_commit_ts(b"x").unwrap(), None);
        store
            .apply(Timestamp::from_raw(3), vec![(k(b"x"), v(b"a"))])
            .unwrap();
        store
            .apply(Timestamp::from_raw(7), vec![(k(b"x"), None)])
            .unwrap();
        assert_eq!(
            store.latest_commit_ts(b"x").unwrap(),
            Some(Timestamp::from_raw(7))
        );
    }

    #[test]
    fn test_batch_installs_every_entry_at_its_commit_ts() {
        let store = MemoryStore::new();
        store
            .apply(
                Timestamp::from_raw(5),
                vec![(k(b"a"), v(b"1")), (k(b"b"), None)],
            )
            .unwrap();

        assert_eq!(store.get(b"a", Timestamp::from_raw(4)).unwrap(), None);
        assert_eq!(
            store.get(b"a", Timestamp::from_raw(5)).unwrap().as_deref(),
            Some(&b"1"[..])
        );
        // A tombstone for a never-written key still records a version. Reads
        // see nothing, but conflict detection sees the delete's timestamp.
        assert_eq!(store.get(b"b", Timestamp::from_raw(5)).unwrap(), None);
        assert_eq!(
            store.latest_commit_ts(b"b").unwrap(),
            Some(Timestamp::from_raw(5))
        );
    }

    #[test]
    fn test_key_count_counts_distinct_keys() {
        let store = MemoryStore::new();
        store
            .apply(
                Timestamp::from_raw(1),
                vec![(k(b"a"), v(b"1")), (k(b"b"), v(b"2"))],
            )
            .unwrap();
        store
            .apply(Timestamp::from_raw(2), vec![(k(b"a"), v(b"3"))])
            .unwrap();
        assert_eq!(store.key_count(), 2);
    }

    #[test]
    fn test_key_count_includes_tombstoned_keys() {
        let store = MemoryStore::new();
        store
            .apply(Timestamp::from_raw(1), vec![(k(b"x"), v(b"a"))])
            .unwrap();
        store
            .apply(Timestamp::from_raw(2), vec![(k(b"x"), None)])
            .unwrap();
        assert_eq!(store.key_count(), 1);
    }
}