txn_db/store.rs
1//! The version store: where committed versions live.
2//!
3//! `txn-db` is the transaction layer, not the storage layer. It owns
4//! visibility, conflict detection, and commit ordering, but it delegates the
5//! actual keeping of versioned bytes to a [`VersionStore`]. That trait is the
6//! crate's Tier-3 seam: implement it over an LSM tree, a B-tree, a remote
7//! service — anything that can keep multiple timestamped versions of a key —
8//! and the transaction semantics compose on top unchanged.
9//!
10//! A [`MemoryStore`] ships for the common in-process case, for tests, and for
11//! examples. It is the default backing store of [`Db::new`](crate::Db::new).
12//!
13//! ## The contract a store must uphold
14//!
15//! A correct [`VersionStore`] keeps, for each key, the history of versions it
16//! has been asked to apply, each tagged with the commit timestamp it was applied
17//! at. Its two obligations are:
18//!
19//! - [`get`](VersionStore::get) returns the *newest* version whose commit
20//! timestamp is less than or equal to the caller's snapshot timestamp — the
21//! snapshot-read rule. A tombstone (a delete) at that position reads as
22//! "absent".
23//! - [`try_commit`](VersionStore::try_commit) validates a transaction's read and
24//! write sets against its snapshot and, if nothing conflicts, installs its
25//! writes at the commit timestamp — atomically with respect to other commits
26//! touching the same keys. This single method is what makes the store the
27//! serialization point for concurrent commits.
28//!
29//! ## Sharding
30//!
31//! [`MemoryStore`] partitions keys across independent shards, each with its own
32//! lock. Reads and commits that touch disjoint shards proceed without
33//! contending; a commit locks only the shards its keys fall in, in a fixed order
34//! so concurrent commits cannot deadlock. This is the sharded commit path the
35//! single global commit lock of the foundation release grew into.
36
37use std::collections::HashMap;
38use std::sync::Arc;
39
40use crate::error::{Result, TxnError};
41use crate::sync::{self, RwLock, RwLockWriteGuard};
42use crate::timestamp::Timestamp;
43
/// One entry in a commit batch handed to [`VersionStore::try_commit`].
///
/// The first element is the key. The second is what to install for that key at
/// the commit timestamp: `Some(bytes)` writes a value, `None` writes a
/// tombstone marking a delete.
pub type WriteEntry = (Arc<[u8]>, Option<Arc<[u8]>>);
49
/// Default number of shards. A power of two so the shard index is a mask, not a
/// division. Sixteen spreads contention well for in-process workloads without
/// the per-commit cost of locking a long list of shards. Loom builds use far
/// fewer to keep the interleaving search tractable.
#[cfg(not(loom))]
const DEFAULT_SHARDS: usize = 16;
/// Default number of shards under loom: kept at two so the model checker's
/// interleaving search stays tractable.
#[cfg(loom)]
const DEFAULT_SHARDS: usize = 2;
58
/// A keeper of timestamped versions, the backend a [`Db`](crate::Db) is built on.
///
/// This is the extension point for plugging `txn-db` onto a real storage
/// engine. The transaction layer supplies the snapshot timestamps and the read
/// and write sets; the store stores versions and enforces, atomically, that a
/// commit only lands when nothing it depends on has changed. The two required
/// methods below ([`get`](VersionStore::get) and
/// [`try_commit`](VersionStore::try_commit)) state the precise contract;
/// [`collect_garbage`](VersionStore::collect_garbage) is optional.
///
/// Implementations must be `Send + Sync`: a [`Db`](crate::Db) shares one store
/// across every thread that holds a clone of it.
///
/// # Examples
///
/// Driving the shipped [`MemoryStore`] directly through the trait:
///
/// ```
/// use std::sync::Arc;
/// use txn_db::{MemoryStore, Timestamp, VersionStore};
///
/// let store = MemoryStore::new();
/// let key: Arc<[u8]> = Arc::from(&b"k"[..]);
///
/// // Commit one version at timestamp 1 (snapshot 0, no reads to validate).
/// store.try_commit(
///     Timestamp::ZERO,
///     Timestamp::from_raw(1),
///     vec![(key.clone(), Some(Arc::from(&b"v1"[..])))],
///     &[],
/// )?;
///
/// // A reader at timestamp 1 sees it; a reader at timestamp 0 does not.
/// assert_eq!(store.get(b"k", Timestamp::from_raw(1))?.as_deref(), Some(&b"v1"[..]));
/// assert_eq!(store.get(b"k", Timestamp::ZERO)?, None);
/// # Ok::<(), txn_db::TxnError>(())
/// ```
pub trait VersionStore: Send + Sync {
    /// Return the value of `key` visible at `read_ts`.
    ///
    /// The result is the value of the newest version of `key` whose commit
    /// timestamp is `<= read_ts`, or `None` if there is no such version or the
    /// newest visible version is a tombstone (the key was deleted as of
    /// `read_ts`).
    ///
    /// # Errors
    ///
    /// Returns [`TxnError::Store`](crate::TxnError::Store) if the backend fails
    /// to service the read. [`MemoryStore`] never fails.
    fn get(&self, key: &[u8], read_ts: Timestamp) -> Result<Option<Arc<[u8]>>>;

    /// Validate a transaction and, if it does not conflict, apply its writes.
    ///
    /// `read_ts` is the transaction's snapshot timestamp, `commit_ts` the
    /// timestamp its writes become visible at, `writes` the batch to install,
    /// and `reads` the keys whose freshness must additionally be validated.
    ///
    /// The store must perform the following as one step, atomic with respect to
    /// any other `try_commit` that touches an overlapping key:
    ///
    /// 1. **Validate.** For every key in `writes` and every key in `reads`,
    ///    check that the key has no version with a commit timestamp greater than
    ///    `read_ts` — that is, that nothing the transaction wrote or read has
    ///    changed since its snapshot. `reads` is empty for snapshot-isolation
    ///    transactions and carries the read set for serializable ones.
    /// 2. **Apply.** If validation passes, install each write in `writes` as a
    ///    new version stamped `commit_ts` (`Some` is a value, `None` a
    ///    tombstone). The database guarantees `commit_ts` is unique and that
    ///    timestamps are handed out in increasing order.
    ///
    /// If any key fails validation, the store applies nothing and reports the
    /// conflict.
    ///
    /// # Errors
    ///
    /// Returns [`TxnError::Conflict`](crate::TxnError::Conflict) if validation
    /// fails; no writes are applied. Returns
    /// [`TxnError::Store`](crate::TxnError::Store) if the backend fails to apply
    /// the batch.
    fn try_commit(
        &self,
        read_ts: Timestamp,
        commit_ts: Timestamp,
        writes: Vec<WriteEntry>,
        reads: &[Arc<[u8]>],
    ) -> Result<()>;

    /// Reclaim versions that no reader at or after `low_watermark` can observe,
    /// returning how many were removed.
    ///
    /// For each key, the newest version with a commit timestamp at or below
    /// `low_watermark` is the oldest one any live snapshot can still see;
    /// versions older than it are unreachable and may be dropped. A key whose
    /// only surviving version is a tombstone at or below the watermark may be
    /// removed entirely.
    ///
    /// The default implementation does nothing and returns `0`, so a store that
    /// does not retain history — or chooses not to collect — need not implement
    /// it. [`MemoryStore`] overrides it.
    fn collect_garbage(&self, low_watermark: Timestamp) -> usize {
        // Explicitly discard the parameter: the default has nothing to reclaim.
        let _ = low_watermark;
        0
    }
}
157
/// One stored version of a key: the timestamp it became visible and its value.
///
/// A `value` of `None` is a tombstone — the key was deleted at `commit_ts`.
#[derive(Debug, Clone)]
struct Version {
    /// Commit timestamp this version was installed at.
    commit_ts: Timestamp,
    /// The stored bytes, or `None` for a tombstone.
    value: Option<Arc<[u8]>>,
}
166
/// One shard's map from key to its version chain, kept in ascending
/// commit-timestamp order (new versions are appended at the end).
type Chains = HashMap<Arc<[u8]>, Vec<Version>>;
170
/// One shard's slice of the keyspace.
struct Shard {
    /// The version chains of every key hashed to this shard, behind the
    /// shard's own reader-writer lock.
    chains: RwLock<Chains>,
}
175
/// An in-memory [`VersionStore`] that shards the keyspace for concurrency.
///
/// Each key is hashed to one of a fixed number of shards; each shard holds its
/// keys' version chains behind its own reader-writer lock. Reads lock a single
/// shard; a commit locks only the shards its keys fall in. Commits to disjoint
/// shards therefore run in parallel, and the snapshot read of a key is a binary
/// search within its shard for the newest version at or below the snapshot
/// timestamp.
///
/// This is the default store of [`Db::new`](crate::Db::new) and suits caches,
/// tests, and workloads that fit in memory. Versions accumulate until
/// [`collect_garbage`](VersionStore::collect_garbage) is called with a
/// watermark that has passed them, so a long-lived store under heavy overwrite
/// that is never collected grows without bound.
///
/// # Examples
///
/// ```
/// use txn_db::{Db, MemoryStore};
///
/// // `Db::new()` uses a `MemoryStore`; this is the explicit form.
/// let db = Db::with_store(MemoryStore::new());
/// let mut tx = db.begin();
/// tx.put(b"hello".to_vec(), b"world".to_vec());
/// tx.commit()?;
/// # Ok::<(), txn_db::TxnError>(())
/// ```
pub struct MemoryStore {
    /// The shards; the length is always a power of two.
    shards: Box<[Shard]>,
    /// `shard_count - 1`; ANDed with a key hash to pick a shard.
    mask: usize,
}
207
208impl Default for MemoryStore {
209 fn default() -> Self {
210 MemoryStore::new()
211 }
212}
213
214impl MemoryStore {
215 /// Create an empty in-memory store with the default shard count.
216 ///
217 /// # Examples
218 ///
219 /// ```
220 /// use txn_db::MemoryStore;
221 ///
222 /// let store = MemoryStore::new();
223 /// # let _ = store;
224 /// ```
225 #[must_use]
226 pub fn new() -> Self {
227 MemoryStore::with_shards(DEFAULT_SHARDS)
228 }
229
230 /// Create an empty store with a specific number of shards.
231 ///
232 /// `shards` is rounded up to a power of two (and at least one). More shards
233 /// reduce contention between commits that touch unrelated keys, at the cost
234 /// of a larger fixed footprint. The default of [`MemoryStore::new`] suits
235 /// most workloads; tune this only with a benchmark in hand.
236 ///
237 /// # Examples
238 ///
239 /// ```
240 /// use txn_db::MemoryStore;
241 ///
242 /// let store = MemoryStore::with_shards(64);
243 /// # let _ = store;
244 /// ```
245 #[must_use]
246 pub fn with_shards(shards: usize) -> Self {
247 let count = shards.max(1).next_power_of_two();
248 let shards = (0..count)
249 .map(|_| Shard {
250 chains: RwLock::new(HashMap::new()),
251 })
252 .collect::<Vec<_>>()
253 .into_boxed_slice();
254 MemoryStore {
255 shards,
256 mask: count - 1,
257 }
258 }
259
260 /// Number of distinct keys that have ever been written.
261 ///
262 /// Counts keys, not versions, and includes keys whose latest version is a
263 /// tombstone. Primarily useful in tests and diagnostics.
264 ///
265 /// # Examples
266 ///
267 /// ```
268 /// use txn_db::MemoryStore;
269 ///
270 /// let store = MemoryStore::new();
271 /// assert_eq!(store.key_count(), 0);
272 /// ```
273 #[must_use]
274 pub fn key_count(&self) -> usize {
275 self.shards
276 .iter()
277 .map(|shard| sync::read(&shard.chains).len())
278 .sum()
279 }
280
281 /// The shard a key belongs to.
282 #[inline]
283 fn shard_of(&self, key: &[u8]) -> usize {
284 (hash_key(key) as usize) & self.mask
285 }
286
287 /// Install a recovered version directly, without conflict validation.
288 ///
289 /// Used only during durability recovery, replaying a committed transaction
290 /// from the log. The caller installs recovered commits in ascending
291 /// commit-timestamp order, so each version is appended to the end of its
292 /// chain and the ascending invariant is preserved.
293 #[cfg(feature = "durability")]
294 pub(crate) fn install_recovered(&self, commit_ts: Timestamp, writes: Vec<WriteEntry>) {
295 for (key, value) in writes {
296 let shard = self.shard_of(&key);
297 sync::write(&self.shards[shard].chains)
298 .entry(key)
299 .or_default()
300 .push(Version { commit_ts, value });
301 }
302 }
303}
304
305impl VersionStore for MemoryStore {
306 fn get(&self, key: &[u8], read_ts: Timestamp) -> Result<Option<Arc<[u8]>>> {
307 let shard = &self.shards[self.shard_of(key)];
308 let chains = sync::read(&shard.chains);
309 Ok(visible_value(chains.get(key), read_ts))
310 }
311
312 fn try_commit(
313 &self,
314 read_ts: Timestamp,
315 commit_ts: Timestamp,
316 writes: Vec<WriteEntry>,
317 reads: &[Arc<[u8]>],
318 ) -> Result<()> {
319 // Fast path for the dominant shape — a single write with no read set to
320 // validate. It locks one shard and skips the per-shard bookkeeping
321 // (shard-index vectors, sort, dedup, guard vector, and the binary
322 // searches that map keys back to guards) the general path needs for
323 // multi-key, multi-shard commits.
324 if writes.len() == 1 && reads.is_empty() {
325 let shard = self.shard_of(&writes[0].0);
326 let mut chains = sync::write(&self.shards[shard].chains);
327 if newer_than(chains.get(writes[0].0.as_ref()), read_ts) {
328 return Err(TxnError::conflict(writes[0].0.len()));
329 }
330 for (key, value) in writes {
331 chains
332 .entry(key)
333 .or_default()
334 .push(Version { commit_ts, value });
335 }
336 return Ok(());
337 }
338
339 // Shard of every touched key, computed once.
340 let write_shards: Vec<usize> = writes.iter().map(|(k, _)| self.shard_of(k)).collect();
341 let read_shards: Vec<usize> = reads.iter().map(|k| self.shard_of(k)).collect();
342
343 // The distinct shards to lock, in ascending order so concurrent commits
344 // acquire shared shards in the same sequence and cannot deadlock.
345 let mut to_lock: Vec<usize> = write_shards
346 .iter()
347 .copied()
348 .chain(read_shards.iter().copied())
349 .collect();
350 to_lock.sort_unstable();
351 to_lock.dedup();
352
353 let mut guards: Vec<RwLockWriteGuard<'_, Chains>> = Vec::with_capacity(to_lock.len());
354 for &shard in &to_lock {
355 guards.push(sync::write(&self.shards[shard].chains));
356 }
357
358 // Validate the write set, then the read set: abort if any touched key
359 // gained a version after the transaction's snapshot.
360 for (entry, &shard) in writes.iter().zip(&write_shards) {
361 if let Ok(pos) = to_lock.binary_search(&shard) {
362 if newer_than(guards[pos].get(entry.0.as_ref()), read_ts) {
363 return Err(TxnError::conflict(entry.0.len()));
364 }
365 }
366 }
367 for (key, &shard) in reads.iter().zip(&read_shards) {
368 if let Ok(pos) = to_lock.binary_search(&shard) {
369 if newer_than(guards[pos].get(key.as_ref()), read_ts) {
370 return Err(TxnError::conflict(key.len()));
371 }
372 }
373 }
374
375 // Apply: append a new version for each write under the held locks.
376 for ((key, value), &shard) in writes.into_iter().zip(&write_shards) {
377 if let Ok(pos) = to_lock.binary_search(&shard) {
378 guards[pos]
379 .entry(key)
380 .or_default()
381 .push(Version { commit_ts, value });
382 }
383 }
384 Ok(())
385 }
386
387 fn collect_garbage(&self, low_watermark: Timestamp) -> usize {
388 let mut reclaimed = 0;
389 for shard in &self.shards {
390 let mut chains = sync::write(&shard.chains);
391 chains.retain(|_key, chain| {
392 // Versions at or below the watermark; the last of them is the
393 // oldest any live snapshot can still observe.
394 let visible = chain.partition_point(|v| v.commit_ts <= low_watermark);
395 if visible > 1 {
396 // Drop everything before that oldest-observable version.
397 reclaimed += visible - 1;
398 let _ = chain.drain(0..visible - 1);
399 }
400 // A key whose only surviving version is a tombstone the watermark
401 // has passed is absent for every live reader: drop it entirely.
402 if chain.len() == 1
403 && chain[0].commit_ts <= low_watermark
404 && chain[0].value.is_none()
405 {
406 reclaimed += 1;
407 false
408 } else {
409 true
410 }
411 });
412 }
413 reclaimed
414 }
415}
416
417/// Whether `key`'s newest version (if any) was committed after `read_ts` — the
418/// condition that makes a commit conflict.
419#[inline]
420fn newer_than(versions: Option<&Vec<Version>>, read_ts: Timestamp) -> bool {
421 matches!(versions.and_then(|v| v.last()), Some(v) if v.commit_ts > read_ts)
422}
423
424/// The value of the newest version at or below `read_ts`, or `None` if there is
425/// none or it is a tombstone.
426#[inline]
427fn visible_value(versions: Option<&Vec<Version>>, read_ts: Timestamp) -> Option<Arc<[u8]>> {
428 let versions = versions?;
429 // Versions are ascending by commit timestamp; the newest visible one is the
430 // last entry whose timestamp is `<= read_ts`.
431 let visible = versions.partition_point(|v| v.commit_ts <= read_ts);
432 let idx = visible.checked_sub(1)?;
433 versions[idx].value.clone()
434}
435
/// 64-bit FNV-1a over the key bytes. Its only job is choosing a shard, so a
/// fast non-cryptographic spread is sufficient.
#[inline]
fn hash_key(key: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    // FNV-1a step: xor the byte in first, then multiply by the prime.
    key.iter().fold(OFFSET_BASIS, |hash, &byte| {
        (hash ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
447
#[cfg(all(test, not(loom)))]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Build an `Arc<[u8]>` key or value from a byte slice.
    fn k(b: &[u8]) -> Arc<[u8]> {
        Arc::from(b)
    }

    /// Commit `writes` at timestamp `ts` from a snapshot taken at `ts - 1`,
    /// with no read set, panicking if the commit is rejected.
    ///
    /// `ts` must be at least 1 so the snapshot timestamp does not underflow.
    fn commit(store: &MemoryStore, ts: u64, writes: Vec<WriteEntry>) {
        store
            .try_commit(
                Timestamp::from_raw(ts - 1),
                Timestamp::from_raw(ts),
                writes,
                &[],
            )
            .expect("commit");
    }

    #[test]
    fn test_get_on_missing_key_returns_none() {
        let store = MemoryStore::new();
        assert_eq!(store.get(b"absent", Timestamp::from_raw(10)).unwrap(), None);
    }

    #[test]
    fn test_read_sees_only_versions_at_or_before_snapshot() {
        let store = MemoryStore::new();
        commit(&store, 2, vec![(k(b"x"), Some(k(b"a")))]);
        commit(&store, 4, vec![(k(b"x"), Some(k(b"b")))]);

        assert_eq!(store.get(b"x", Timestamp::from_raw(1)).unwrap(), None);
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(2)).unwrap().as_deref(),
            Some(&b"a"[..])
        );
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(3)).unwrap().as_deref(),
            Some(&b"a"[..])
        );
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(4)).unwrap().as_deref(),
            Some(&b"b"[..])
        );
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(99)).unwrap().as_deref(),
            Some(&b"b"[..])
        );
    }

    #[test]
    fn test_tombstone_reads_as_absent() {
        let store = MemoryStore::new();
        commit(&store, 1, vec![(k(b"x"), Some(k(b"a")))]);
        commit(&store, 2, vec![(k(b"x"), None)]);

        assert_eq!(
            store.get(b"x", Timestamp::from_raw(1)).unwrap().as_deref(),
            Some(&b"a"[..])
        );
        assert_eq!(store.get(b"x", Timestamp::from_raw(2)).unwrap(), None);
    }

    #[test]
    fn test_write_write_conflict_is_detected() {
        let store = MemoryStore::new();
        commit(&store, 5, vec![(k(b"x"), Some(k(b"a")))]);

        // A transaction whose snapshot predates the existing version conflicts.
        let err = store
            .try_commit(
                Timestamp::from_raw(4),
                Timestamp::from_raw(6),
                vec![(k(b"x"), Some(k(b"b")))],
                &[],
            )
            .unwrap_err();
        assert!(matches!(err, TxnError::Conflict { .. }));
        // Nothing was applied.
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(99)).unwrap().as_deref(),
            Some(&b"a"[..])
        );
    }

    #[test]
    fn test_read_set_validation_detects_skew() {
        let store = MemoryStore::new();
        commit(&store, 5, vec![(k(b"y"), Some(k(b"1")))]);

        // Snapshot 4, write x, but read y which changed at ts 5 -> conflict.
        let err = store
            .try_commit(
                Timestamp::from_raw(4),
                Timestamp::from_raw(6),
                vec![(k(b"x"), Some(k(b"a")))],
                &[k(b"y")],
            )
            .unwrap_err();
        assert!(matches!(err, TxnError::Conflict { .. }));
        // The rejected commit must not have installed its write.
        assert_eq!(store.get(b"x", Timestamp::from_raw(99)).unwrap(), None);
        assert_eq!(store.key_count(), 1);
    }

    #[test]
    fn test_conflicting_multi_key_commit_applies_nothing() {
        let store = MemoryStore::new();
        commit(&store, 5, vec![(k(b"x"), Some(k(b"a")))]);

        // Snapshot 4 writes a clean key `y` and a conflicting key `x` in one
        // batch. The clean key comes first, so an implementation that applied
        // writes while validating would leave `y` behind.
        let err = store
            .try_commit(
                Timestamp::from_raw(4),
                Timestamp::from_raw(6),
                vec![(k(b"y"), Some(k(b"1"))), (k(b"x"), Some(k(b"b")))],
                &[],
            )
            .unwrap_err();
        assert!(matches!(err, TxnError::Conflict { .. }));
        assert_eq!(store.get(b"y", Timestamp::from_raw(99)).unwrap(), None);
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(99)).unwrap().as_deref(),
            Some(&b"a"[..])
        );
        assert_eq!(store.key_count(), 1);
    }

    #[test]
    fn test_multi_shard_commit_applies_all_keys() {
        let store = MemoryStore::with_shards(8);
        let writes: Vec<WriteEntry> = (0u8..32).map(|i| (k(&[i]), Some(k(&[i])))).collect();
        commit(&store, 1, writes);
        for i in 0u8..32 {
            assert_eq!(
                store.get(&[i], Timestamp::from_raw(1)).unwrap().as_deref(),
                Some(&[i][..])
            );
        }
        assert_eq!(store.key_count(), 32);
    }

    #[test]
    fn test_with_shards_rounds_up_to_power_of_two() {
        let store = MemoryStore::with_shards(5);
        assert_eq!(store.shards.len(), 8);
        assert_eq!(store.mask, 7);
    }

    #[test]
    fn test_gc_prunes_versions_below_watermark_but_keeps_newest_visible() {
        let store = MemoryStore::new();
        commit(&store, 1, vec![(k(b"x"), Some(k(b"a")))]);
        commit(&store, 2, vec![(k(b"x"), Some(k(b"b")))]);
        commit(&store, 3, vec![(k(b"x"), Some(k(b"c")))]);

        // A reader at timestamp 2 must still see "b", so GC at watermark 2 keeps
        // the version at 2 and everything newer, dropping only the version at 1.
        let reclaimed = store.collect_garbage(Timestamp::from_raw(2));
        assert_eq!(reclaimed, 1);
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(2)).unwrap().as_deref(),
            Some(&b"b"[..])
        );
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(3)).unwrap().as_deref(),
            Some(&b"c"[..])
        );
    }

    #[test]
    fn test_gc_drops_key_whose_only_survivor_is_a_passed_tombstone() {
        let store = MemoryStore::new();
        commit(&store, 1, vec![(k(b"x"), Some(k(b"a")))]);
        commit(&store, 2, vec![(k(b"x"), None)]); // delete

        // At watermark 5 the key is absent for everyone; it is dropped whole.
        let reclaimed = store.collect_garbage(Timestamp::from_raw(5));
        assert_eq!(reclaimed, 2);
        assert_eq!(store.key_count(), 0);
    }

    #[test]
    fn test_gc_keeps_everything_above_watermark() {
        let store = MemoryStore::new();
        commit(&store, 5, vec![(k(b"x"), Some(k(b"a")))]);
        commit(&store, 6, vec![(k(b"x"), Some(k(b"b")))]);

        // A watermark below all versions reclaims nothing.
        assert_eq!(store.collect_garbage(Timestamp::from_raw(4)), 0);
        assert_eq!(
            store.get(b"x", Timestamp::from_raw(5)).unwrap().as_deref(),
            Some(&b"a"[..])
        );
    }

    #[test]
    fn test_extreme_timestamps_order_correctly() {
        // Versions at the top of the u64 range must still be ordered and read
        // back correctly — the version chain uses no arithmetic that could wrap.
        let store = MemoryStore::new();
        let near_max = Timestamp::from_raw(u64::MAX - 1);
        let max = Timestamp::from_raw(u64::MAX);
        store
            .try_commit(
                Timestamp::ZERO,
                near_max,
                vec![(k(b"x"), Some(k(b"a")))],
                &[],
            )
            .unwrap();
        store
            .try_commit(near_max, max, vec![(k(b"x"), Some(k(b"b")))], &[])
            .unwrap();

        assert_eq!(
            store.get(b"x", Timestamp::from_raw(u64::MAX - 2)).unwrap(),
            None
        );
        assert_eq!(
            store.get(b"x", near_max).unwrap().as_deref(),
            Some(&b"a"[..])
        );
        assert_eq!(store.get(b"x", max).unwrap().as_deref(), Some(&b"b"[..]));
        // A conflict check at the very top of the range behaves normally.
        let err = store
            .try_commit(near_max, max, vec![(k(b"x"), Some(k(b"c")))], &[])
            .unwrap_err();
        assert!(matches!(err, TxnError::Conflict { .. }));
    }

    #[test]
    fn test_gc_at_extreme_watermark() {
        let store = MemoryStore::new();
        store
            .try_commit(
                Timestamp::ZERO,
                Timestamp::from_raw(u64::MAX - 1),
                vec![(k(b"x"), Some(k(b"a")))],
                &[],
            )
            .unwrap();
        store
            .try_commit(
                Timestamp::from_raw(u64::MAX - 1),
                Timestamp::from_raw(u64::MAX),
                vec![(k(b"x"), Some(k(b"b")))],
                &[],
            )
            .unwrap();
        // Watermark at the max keeps only the newest visible version.
        let reclaimed = store.collect_garbage(Timestamp::from_raw(u64::MAX));
        assert_eq!(reclaimed, 1);
        assert_eq!(
            store
                .get(b"x", Timestamp::from_raw(u64::MAX))
                .unwrap()
                .as_deref(),
            Some(&b"b"[..])
        );
    }

    #[test]
    fn test_default_trait_gc_is_noop() {
        // A bare trait object using the default never reclaims.
        struct NoHistory;
        impl VersionStore for NoHistory {
            fn get(&self, _: &[u8], _: Timestamp) -> Result<Option<Arc<[u8]>>> {
                Ok(None)
            }
            fn try_commit(
                &self,
                _: Timestamp,
                _: Timestamp,
                _: Vec<WriteEntry>,
                _: &[Arc<[u8]>],
            ) -> Result<()> {
                Ok(())
            }
        }
        assert_eq!(NoHistory.collect_garbage(Timestamp::from_raw(100)), 0);
    }
}
705}