lsm_tree/
seqno.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::SeqNo;
6use std::sync::{
7    atomic::{
8        AtomicU64,
9        Ordering::{AcqRel, Acquire, Release},
10    },
11    Arc,
12};
13
14/// Thread-safe sequence number generator
15///
16/// # Examples
17///
18/// ```
19/// # use lsm_tree::{AbstractTree, Config, SequenceNumberCounter};
20/// #
21/// # let path = tempfile::tempdir()?;
22/// let tree = Config::new(path).open()?;
23///
24/// let seqno = SequenceNumberCounter::default();
25///
26/// // Do some inserts...
27/// tree.insert("a".as_bytes(), "abc", seqno.next());
28/// tree.insert("b".as_bytes(), "abc", seqno.next());
29/// tree.insert("c".as_bytes(), "abc", seqno.next());
30///
31/// // Create a batch
32/// let batch_seqno = seqno.next();
33/// tree.remove("a".as_bytes(), batch_seqno);
34/// tree.remove("b".as_bytes(), batch_seqno);
35/// tree.remove("c".as_bytes(), batch_seqno);
36/// #
37/// # assert!(tree.is_empty(batch_seqno + 1, None)?);
38/// # Ok::<(), lsm_tree::Error>(())
39/// ```
40#[derive(Clone, Default, Debug)]
41pub struct SequenceNumberCounter(Arc<AtomicU64>);
42
43impl SequenceNumberCounter {
44    /// Creates a new counter, setting it to some previous value
45    #[must_use]
46    pub fn new(prev: SeqNo) -> Self {
47        Self(Arc::new(AtomicU64::new(prev)))
48    }
49
50    /// Gets the next sequence number, without incrementing the counter.
51    ///
52    /// This should only be used when creating a snapshot.
53    #[must_use]
54    pub fn get(&self) -> SeqNo {
55        self.0.load(Acquire)
56    }
57
58    /// Gets the next sequence number.
59    #[must_use]
60    pub fn next(&self) -> SeqNo {
61        let seqno = self.0.fetch_add(1, Release);
62
63        // The MSB is reserved for transactions.
64        //
65        // This gives us 63-bit sequence numbers technically.
66        assert!(seqno < 0x8000_0000_0000_0000, "Ran out of sequence numbers");
67
68        seqno
69    }
70
71    /// Sets the sequence number.
72    pub fn set(&self, seqno: SeqNo) {
73        self.0.store(seqno, Release);
74    }
75
76    /// Maximizes the sequence number.
77    pub fn fetch_max(&self, seqno: SeqNo) {
78        self.0.fetch_max(seqno, AcqRel);
79    }
80}
81
82#[cfg(test)]
83mod tests {
84    use test_log::test;
85
86    #[test]
87    fn not_max_seqno() {
88        let counter = super::SequenceNumberCounter::default();
89        counter.set(0x7FFF_FFFF_FFFF_FFFF);
90        let _ = counter.next();
91    }
92
93    #[test]
94    #[should_panic = "Ran out of sequence numbers"]
95    fn max_seqno() {
96        let counter = super::SequenceNumberCounter::default();
97        counter.set(0x8000_0000_0000_0000);
98        let _ = counter.next();
99    }
100}