lsm_tree/seqno.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::SeqNo;
6use std::sync::{
7 atomic::{
8 AtomicU64,
9 Ordering::{AcqRel, Acquire, Release},
10 },
11 Arc,
12};
13
14/// Thread-safe sequence number generator
15///
16/// # Examples
17///
18/// ```
19/// # use lsm_tree::{AbstractTree, Config, SequenceNumberCounter};
20/// #
21/// # let path = tempfile::tempdir()?;
22///
23/// let seqno = SequenceNumberCounter::default();
24///
25/// let tree = Config::new(path, seqno.clone()).open()?;
26///
27/// // Do some inserts...
28/// tree.insert("a".as_bytes(), "abc", seqno.next());
29/// tree.insert("b".as_bytes(), "abc", seqno.next());
30/// tree.insert("c".as_bytes(), "abc", seqno.next());
31///
32/// // Create a batch
33/// let batch_seqno = seqno.next();
34/// tree.remove("a".as_bytes(), batch_seqno);
35/// tree.remove("b".as_bytes(), batch_seqno);
36/// tree.remove("c".as_bytes(), batch_seqno);
37/// #
38/// # assert!(tree.is_empty(batch_seqno + 1, None)?);
39/// # Ok::<(), lsm_tree::Error>(())
40/// ```
41#[derive(Clone, Default, Debug)]
42pub struct SequenceNumberCounter(Arc<AtomicU64>);
43
44impl SequenceNumberCounter {
45 /// Creates a new counter, setting it to some previous value
46 #[must_use]
47 pub fn new(prev: SeqNo) -> Self {
48 Self(Arc::new(AtomicU64::new(prev)))
49 }
50
51 /// Gets the next sequence number, without incrementing the counter.
52 ///
53 /// This should only be used when creating a snapshot.
54 #[must_use]
55 pub fn get(&self) -> SeqNo {
56 self.0.load(Acquire)
57 }
58
59 /// Gets the next sequence number.
60 #[must_use]
61 #[allow(clippy::missing_panics_doc, reason = "we should never run out of u64s")]
62 pub fn next(&self) -> SeqNo {
63 let seqno = self.0.fetch_add(1, Release);
64
65 // The MSB is reserved for transactions.
66 //
67 // This gives us 63-bit sequence numbers technically.
68 assert!(seqno < 0x8000_0000_0000_0000, "Ran out of sequence numbers");
69
70 seqno
71 }
72
73 /// Sets the sequence number.
74 pub fn set(&self, seqno: SeqNo) {
75 self.0.store(seqno, Release);
76 }
77
78 /// Maximizes the sequence number.
79 pub fn fetch_max(&self, seqno: SeqNo) {
80 self.0.fetch_max(seqno, AcqRel);
81 }
82}
83
84#[cfg(test)]
85mod tests {
86 use test_log::test;
87
88 #[test]
89 fn not_max_seqno() {
90 let counter = super::SequenceNumberCounter::default();
91 counter.set(0x7FFF_FFFF_FFFF_FFFF);
92 let _ = counter.next();
93 }
94
95 #[test]
96 #[should_panic = "Ran out of sequence numbers"]
97 fn max_seqno() {
98 let counter = super::SequenceNumberCounter::default();
99 counter.set(0x8000_0000_0000_0000);
100 let _ = counter.next();
101 }
102}