Skip to main content

lsm_tree/
seqno.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::SeqNo;
6use std::{
7    fmt::Debug,
8    sync::{
9        atomic::{
10            AtomicU64,
11            Ordering::{AcqRel, Acquire, Release},
12        },
13        Arc,
14    },
15};
16
17/// The maximum allowed sequence number value.
18///
19/// The MSB (`0x8000_0000_0000_0000`) is reserved for internal use by the
20/// transaction layer, so all sequence numbers must be strictly less than
21/// this boundary. Values returned by [`SequenceNumberGenerator::next`]
22/// must be at most `MAX_SEQNO - 1` (`0x7FFF_FFFF_FFFF_FFFE`), leaving
23/// room for `seqno + 1` operations used internally by the engine.
24pub const MAX_SEQNO: SeqNo = 0x7FFF_FFFF_FFFF_FFFF;
25
26/// Trait for custom sequence number generation.
27///
28/// Implementations must be thread-safe and provide atomic operations
29/// for sequence number management.
30///
31/// # Invariants
32///
33/// Implementors **must** respect the following invariants, which are
34/// relied upon by the storage engine (e.g., for MVCC and transactions):
35///
36/// - The most-significant bit (MSB) of a sequence number is reserved
37///   for internal use by the transaction layer.
38/// - All sequence numbers produced by this generator **must** therefore
39///   be strictly less than `0x8000_0000_0000_0000`.
40/// - Calls to [`SequenceNumberGenerator::next`] on a given generator
41///   instance must return monotonically increasing values (each `next`
42///   returns a value that is strictly greater than any value previously
43///   returned by `next` on the same instance) until the reserved MSB
44///   boundary is reached.
45/// - In practice, this means `next`-generated sequence numbers are
46///   unique for the lifetime of the generator (modulo wrap-around, which
47///   must not occur within the reserved MSB range).
48///
49/// Implementors are also responsible for ensuring that [`get`] does not
50/// return values that violate these invariants, and that [`set`] and
51/// [`fetch_max`] do not violate them (e.g., by setting the counter to a
52/// value at or above `0x8000_0000_0000_0000`, or by moving the counter
53/// backwards such that subsequent calls to [`next`] would observe
54/// non-monotonic sequence numbers).
55///
56/// The [`set`] method is a special-case escape hatch intended for use
57/// during initialization or recovery. It may move the counter forwards
58/// or backwards and therefore **may** cause subsequent calls to
59/// [`next`] to observe non-monotonic sequence numbers. This is
60/// permitted; callers are responsible for using [`fetch_max`] (or other
61/// application-level logic) after `set` if they need to reestablish any
62/// monotonicity guarantees.
63///
64/// # Supertraits
65///
66/// `UnwindSafe + RefUnwindSafe` are required because generators may be
67/// captured inside `catch_unwind` (e.g., during ingestion error recovery).
68///
69/// The default implementation is [`SequenceNumberCounter`], which uses
70/// an `Arc<AtomicU64>` for lock-free atomic operations. It enforces the
71/// MSB boundary in `new`, `set`, and `fetch_max` (via assert/clamp),
72/// and panics in `next` if the counter reaches the reserved range.
73pub trait SequenceNumberGenerator:
74    Send + Sync + Debug + std::panic::UnwindSafe + std::panic::RefUnwindSafe + 'static
75{
76    /// Gets the next sequence number, atomically incrementing the counter.
77    ///
78    /// This must:
79    /// - Return a value strictly greater than any value previously returned
80    ///   by `next` on the same generator instance (monotonic increase).
81    /// - Never return a value greater than `MAX_SEQNO - 1`, leaving room
82    ///   for internal `seqno + 1` operations that must remain strictly
83    ///   less than [`MAX_SEQNO`].
84    #[must_use]
85    fn next(&self) -> SeqNo;
86
87    /// Gets the current sequence number without incrementing.
88    ///
89    /// This should be consistent with the value that will be used as the
90    /// basis for the next call to [`next`], and must not itself alter the
91    /// monotonicity guarantees.
92    #[must_use]
93    fn get(&self) -> SeqNo;
94
95    /// Sets the sequence number to the given value.
96    ///
97    /// Implementations must ensure that `value` is strictly less than
98    /// `0x8000_0000_0000_0000` (the MSB is reserved).
99    ///
100    /// This method is intended for direct overrides (e.g., during
101    /// initialization or recovery) and is **not** required to preserve
102    /// monotonicity with respect to values previously returned by
103    /// [`next`]. In particular, calling `set` with a value lower than a
104    /// previously returned `next` value may cause subsequent calls to
105    /// [`next`] to observe non-monotonic sequence numbers. This is
106    /// allowed; callers that need to advance the sequence number in a
107    /// monotonic fashion should prefer [`fetch_max`] instead of `set`.
108    fn set(&self, value: SeqNo);
109
110    /// Atomically updates the sequence number to the maximum of
111    /// the current value and the given value.
112    ///
113    /// The resulting stored value must:
114    /// - Remain strictly less than `0x8000_0000_0000_0000`.
115    /// - Preserve the monotonicity guarantees expected by [`next`].
116    fn fetch_max(&self, value: SeqNo);
117}
118
119/// A shared, cloneable sequence number generator.
120pub type SharedSequenceNumberGenerator = Arc<dyn SequenceNumberGenerator>;
121
122/// Thread-safe sequence number generator
123///
124/// # Examples
125///
126/// ```
127/// # use lsm_tree::{AbstractTree, Config, SequenceNumberCounter};
128/// #
129/// # let path = tempfile::tempdir()?;
130///
131/// let seqno = SequenceNumberCounter::default();
132/// let visible_seqno = SequenceNumberCounter::default();
133///
134/// let tree = Config::new(path, seqno.clone(), visible_seqno.clone()).open()?;
135///
136/// // Do some inserts...
137/// for k in [b"a", b"b", b"c"] {
138///     let batch_seqno = seqno.next();
139///     tree.insert(k, "abc", batch_seqno);
140///     visible_seqno.fetch_max(batch_seqno + 1);
141/// }
142///
143/// // Create a batch
144/// let batch_seqno = seqno.next();
145/// tree.remove("a".as_bytes(), batch_seqno);
146/// tree.remove("b".as_bytes(), batch_seqno);
147/// tree.remove("c".as_bytes(), batch_seqno);
148/// visible_seqno.fetch_max(batch_seqno + 1);
149/// #
150/// # assert!(tree.is_empty(visible_seqno.get(), None)?);
151/// # Ok::<(), lsm_tree::Error>(())
152/// ```
153#[derive(Clone, Default, Debug)]
154pub struct SequenceNumberCounter(Arc<AtomicU64>);
155
156impl SequenceNumberCounter {
157    /// Creates a new counter, setting it to some previous value.
158    ///
159    /// # Panics
160    ///
161    /// Panics if `prev > MAX_SEQNO` (reserved MSB range).
162    ///
163    /// Note: `prev == MAX_SEQNO` is allowed but means the counter is
164    /// exhausted — any subsequent call to [`next()`](Self::next) will panic.
165    #[must_use]
166    pub fn new(prev: SeqNo) -> Self {
167        assert!(
168            prev <= MAX_SEQNO,
169            "Sequence number must not use the reserved MSB"
170        );
171        Self(Arc::new(AtomicU64::new(prev)))
172    }
173
174    /// Allocates and returns the next sequence number, atomically
175    /// incrementing the internal counter.
176    ///
177    /// The returned value is the counter's value **before** the increment
178    /// (i.e., pre-increment / fetch-then-add semantics).
179    ///
180    /// # Panics
181    ///
182    /// Panics if the current value is already [`MAX_SEQNO`] (i.e., when
183    /// advancing past it would enter the reserved MSB range).
184    #[must_use]
185    pub fn next(&self) -> SeqNo {
186        <Self as SequenceNumberGenerator>::next(self)
187    }
188
189    /// Returns the current internal counter value without incrementing.
190    ///
191    /// This is the value that the next call to [`next()`](Self::next) will
192    /// return (and then advance past).
193    #[must_use]
194    pub fn get(&self) -> SeqNo {
195        <Self as SequenceNumberGenerator>::get(self)
196    }
197
198    /// Sets the sequence number to the given value.
199    ///
200    /// # Panics
201    ///
202    /// Panics if `value > MAX_SEQNO` (reserved MSB range).
203    pub fn set(&self, value: SeqNo) {
204        <Self as SequenceNumberGenerator>::set(self, value);
205    }
206
207    /// Atomically updates the sequence number to the maximum of
208    /// the current value and the provided value.
209    pub fn fetch_max(&self, value: SeqNo) {
210        <Self as SequenceNumberGenerator>::fetch_max(self, value);
211    }
212}
213
214impl SequenceNumberGenerator for SequenceNumberCounter {
215    fn next(&self) -> SeqNo {
216        // We use fetch_update (CAS loop) instead of fetch_add to ensure the
217        // internal counter never enters the reserved MSB range. With fetch_add,
218        // a caught panic (via catch_unwind, which the trait requires via
219        // UnwindSafe) would leave the counter permanently stuck at an invalid
220        // value. fetch_update only stores the new value on success.
221        match self.0.fetch_update(AcqRel, Acquire, |current| {
222            if current >= MAX_SEQNO {
223                None
224            } else {
225                Some(current + 1)
226            }
227        }) {
228            Ok(seqno) => seqno,
229            Err(current) => panic!("Ran out of sequence numbers (current: {current})"),
230        }
231    }
232
233    fn get(&self) -> SeqNo {
234        self.0.load(Acquire)
235    }
236
237    fn set(&self, value: SeqNo) {
238        assert!(
239            value <= MAX_SEQNO,
240            "Sequence number must not use the reserved MSB"
241        );
242        self.0.store(value, Release);
243    }
244
245    fn fetch_max(&self, value: SeqNo) {
246        // Clamp to the maximum allowed sequence number to avoid storing
247        // a value in the reserved MSB range.
248        let clamped = value.min(MAX_SEQNO);
249        self.0.fetch_max(clamped, AcqRel);
250    }
251}
252
253// Orphan rules satisfied: SequenceNumberGenerator is a local trait,
254// so Arc<dyn SequenceNumberGenerator> is covered by a local type parameter.
255impl From<SequenceNumberCounter> for SharedSequenceNumberGenerator {
256    fn from(counter: SequenceNumberCounter) -> Self {
257        Arc::new(counter)
258    }
259}
260
261#[cfg(test)]
262mod tests {
263    use super::MAX_SEQNO;
264    use test_log::test;
265
266    #[test]
267    fn next_below_max_returns_valid_seqno() {
268        let counter = super::SequenceNumberCounter::default();
269        counter.set(MAX_SEQNO - 1);
270        let _ = counter.next();
271    }
272
273    #[test]
274    #[should_panic(expected = "Ran out of sequence numbers")]
275    fn next_at_max_panics() {
276        let counter = super::SequenceNumberCounter::default();
277        counter.set(MAX_SEQNO);
278        let _ = counter.next();
279    }
280
281    #[test]
282    #[should_panic(expected = "Sequence number must not use the reserved MSB")]
283    fn set_reserved_range_panics() {
284        let counter = super::SequenceNumberCounter::default();
285        counter.set(MAX_SEQNO + 1);
286    }
287
288    #[test]
289    fn fetch_max_clamps_reserved_to_max() {
290        let counter = super::SequenceNumberCounter::default();
291        counter.set(100);
292        counter.fetch_max(MAX_SEQNO + 1);
293        assert_eq!(counter.get(), MAX_SEQNO);
294    }
295}