// lsm_tree/seqno.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)
4
5use crate::SeqNo;
6use std::sync::{
7    atomic::{
8        AtomicU64,
9        Ordering::{AcqRel, Acquire, Release},
10    },
11    Arc,
12};
13
/// A cloneable, thread-safe generator of sequence numbers.
///
/// The counter wraps an `Arc<AtomicU64>`, so every clone reads and updates
/// the same underlying value.
///
/// # Examples
///
/// ```
/// # use lsm_tree::{AbstractTree, Config, SequenceNumberCounter};
/// #
/// # let path = tempfile::tempdir()?;
///
/// let seqno = SequenceNumberCounter::default();
/// let visible_seqno = SequenceNumberCounter::default();
///
/// let tree = Config::new(path, seqno.clone(), visible_seqno.clone()).open()?;
///
/// // Insert a few items, each with its own sequence number...
/// for k in [b"a", b"b", b"c"] {
///     let batch_seqno = seqno.next();
///     tree.insert(k, "abc", batch_seqno);
///     visible_seqno.fetch_max(batch_seqno + 1);
/// }
///
/// // Apply several operations under one shared sequence number
/// let batch_seqno = seqno.next();
/// tree.remove("a".as_bytes(), batch_seqno);
/// tree.remove("b".as_bytes(), batch_seqno);
/// tree.remove("c".as_bytes(), batch_seqno);
/// visible_seqno.fetch_max(batch_seqno + 1);
/// #
/// # assert!(tree.is_empty(visible_seqno.get(), None)?);
/// # Ok::<(), lsm_tree::Error>(())
/// ```
#[derive(Clone, Debug, Default)]
pub struct SequenceNumberCounter(Arc<AtomicU64>);
47
48impl SequenceNumberCounter {
49    /// Creates a new counter, setting it to some previous value
50    #[must_use]
51    pub fn new(prev: SeqNo) -> Self {
52        Self(Arc::new(AtomicU64::new(prev)))
53    }
54
55    /// Gets the would-be-next sequence number, without incrementing the counter.
56    ///
57    /// This should only be used when creating a snapshot.
58    #[must_use]
59    pub fn get(&self) -> SeqNo {
60        self.0.load(Acquire)
61    }
62
63    /// Gets the next sequence number.
64    #[must_use]
65    #[allow(clippy::missing_panics_doc, reason = "we should never run out of u64s")]
66    pub fn next(&self) -> SeqNo {
67        let seqno = self.0.fetch_add(1, Release);
68
69        // The MSB is reserved for transactions.
70        //
71        // This gives us 63-bit sequence numbers technically.
72        assert!(seqno < 0x8000_0000_0000_0000, "Ran out of sequence numbers");
73
74        seqno
75    }
76
77    /// Sets the sequence number.
78    pub fn set(&self, seqno: SeqNo) {
79        self.0.store(seqno, Release);
80    }
81
82    /// Maximizes the sequence number.
83    pub fn fetch_max(&self, seqno: SeqNo) {
84        self.0.fetch_max(seqno, AcqRel);
85    }
86}
87
#[cfg(test)]
mod tests {
    use super::SequenceNumberCounter;
    use test_log::test;

    // The largest value without the reserved top bit can still be handed out.
    #[test]
    fn not_max_seqno() {
        let seqno_counter = SequenceNumberCounter::default();
        seqno_counter.set(0x7FFF_FFFF_FFFF_FFFF);

        let _ = seqno_counter.next();
    }

    // Once the reserved top bit is reached, handing out a number must panic.
    #[test]
    #[should_panic = "Ran out of sequence numbers"]
    fn max_seqno() {
        let seqno_counter = SequenceNumberCounter::default();
        seqno_counter.set(0x8000_0000_0000_0000);

        let _ = seqno_counter.next();
    }
}
106}