lsm_tree/seqno.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::SeqNo;
6use std::{
7 fmt::Debug,
8 sync::{
9 atomic::{
10 AtomicU64,
11 Ordering::{AcqRel, Acquire, Release},
12 },
13 Arc,
14 },
15};
16
17/// The maximum allowed sequence number value.
18///
19/// The MSB (`0x8000_0000_0000_0000`) is reserved for internal use by the
20/// transaction layer, so all sequence numbers must be strictly less than
21/// this boundary. Values returned by [`SequenceNumberGenerator::next`]
22/// must be at most `MAX_SEQNO - 1` (`0x7FFF_FFFF_FFFF_FFFE`), leaving
23/// room for `seqno + 1` operations used internally by the engine.
24pub const MAX_SEQNO: SeqNo = 0x7FFF_FFFF_FFFF_FFFF;
25
26/// Trait for custom sequence number generation.
27///
28/// Implementations must be thread-safe and provide atomic operations
29/// for sequence number management.
30///
31/// # Invariants
32///
33/// Implementors **must** respect the following invariants, which are
34/// relied upon by the storage engine (e.g., for MVCC and transactions):
35///
36/// - The most-significant bit (MSB) of a sequence number is reserved
37/// for internal use by the transaction layer.
38/// - All sequence numbers produced by this generator **must** therefore
39/// be strictly less than `0x8000_0000_0000_0000`.
40/// - Calls to [`SequenceNumberGenerator::next`] on a given generator
41/// instance must return monotonically increasing values (each `next`
42/// returns a value that is strictly greater than any value previously
43/// returned by `next` on the same instance) until the reserved MSB
44/// boundary is reached.
45/// - In practice, this means `next`-generated sequence numbers are
46/// unique for the lifetime of the generator (modulo wrap-around, which
47/// must not occur within the reserved MSB range).
48///
49/// Implementors are also responsible for ensuring that [`get`] does not
50/// return values that violate these invariants, and that [`set`] and
51/// [`fetch_max`] do not violate them (e.g., by setting the counter to a
52/// value at or above `0x8000_0000_0000_0000`, or by moving the counter
53/// backwards such that subsequent calls to [`next`] would observe
54/// non-monotonic sequence numbers).
55///
56/// The [`set`] method is a special-case escape hatch intended for use
57/// during initialization or recovery. It may move the counter forwards
58/// or backwards and therefore **may** cause subsequent calls to
59/// [`next`] to observe non-monotonic sequence numbers. This is
60/// permitted; callers are responsible for using [`fetch_max`] (or other
61/// application-level logic) after `set` if they need to reestablish any
62/// monotonicity guarantees.
63///
64/// # Supertraits
65///
66/// `UnwindSafe + RefUnwindSafe` are required because generators may be
67/// captured inside `catch_unwind` (e.g., during ingestion error recovery).
68///
69/// The default implementation is [`SequenceNumberCounter`], which uses
70/// an `Arc<AtomicU64>` for lock-free atomic operations. It enforces the
71/// MSB boundary in `new`, `set`, and `fetch_max` (via assert/clamp),
72/// and panics in `next` if the counter reaches the reserved range.
73pub trait SequenceNumberGenerator:
74 Send + Sync + Debug + std::panic::UnwindSafe + std::panic::RefUnwindSafe + 'static
75{
76 /// Gets the next sequence number, atomically incrementing the counter.
77 ///
78 /// This must:
79 /// - Return a value strictly greater than any value previously returned
80 /// by `next` on the same generator instance (monotonic increase).
81 /// - Never return a value greater than `MAX_SEQNO - 1`, leaving room
82 /// for internal `seqno + 1` operations that must remain strictly
83 /// less than [`MAX_SEQNO`].
84 #[must_use]
85 fn next(&self) -> SeqNo;
86
87 /// Gets the current sequence number without incrementing.
88 ///
89 /// This should be consistent with the value that will be used as the
90 /// basis for the next call to [`next`], and must not itself alter the
91 /// monotonicity guarantees.
92 #[must_use]
93 fn get(&self) -> SeqNo;
94
95 /// Sets the sequence number to the given value.
96 ///
97 /// Implementations must ensure that `value` is strictly less than
98 /// `0x8000_0000_0000_0000` (the MSB is reserved).
99 ///
100 /// This method is intended for direct overrides (e.g., during
101 /// initialization or recovery) and is **not** required to preserve
102 /// monotonicity with respect to values previously returned by
103 /// [`next`]. In particular, calling `set` with a value lower than a
104 /// previously returned `next` value may cause subsequent calls to
105 /// [`next`] to observe non-monotonic sequence numbers. This is
106 /// allowed; callers that need to advance the sequence number in a
107 /// monotonic fashion should prefer [`fetch_max`] instead of `set`.
108 fn set(&self, value: SeqNo);
109
110 /// Atomically updates the sequence number to the maximum of
111 /// the current value and the given value.
112 ///
113 /// The resulting stored value must:
114 /// - Remain strictly less than `0x8000_0000_0000_0000`.
115 /// - Preserve the monotonicity guarantees expected by [`next`].
116 fn fetch_max(&self, value: SeqNo);
117}
118
119/// A shared, cloneable sequence number generator.
120pub type SharedSequenceNumberGenerator = Arc<dyn SequenceNumberGenerator>;
121
122/// Thread-safe sequence number generator
123///
124/// # Examples
125///
126/// ```
127/// # use lsm_tree::{AbstractTree, Config, SequenceNumberCounter};
128/// #
129/// # let path = tempfile::tempdir()?;
130///
131/// let seqno = SequenceNumberCounter::default();
132/// let visible_seqno = SequenceNumberCounter::default();
133///
134/// let tree = Config::new(path, seqno.clone(), visible_seqno.clone()).open()?;
135///
136/// // Do some inserts...
137/// for k in [b"a", b"b", b"c"] {
138/// let batch_seqno = seqno.next();
139/// tree.insert(k, "abc", batch_seqno);
140/// visible_seqno.fetch_max(batch_seqno + 1);
141/// }
142///
143/// // Create a batch
144/// let batch_seqno = seqno.next();
145/// tree.remove("a".as_bytes(), batch_seqno);
146/// tree.remove("b".as_bytes(), batch_seqno);
147/// tree.remove("c".as_bytes(), batch_seqno);
148/// visible_seqno.fetch_max(batch_seqno + 1);
149/// #
150/// # assert!(tree.is_empty(visible_seqno.get(), None)?);
151/// # Ok::<(), lsm_tree::Error>(())
152/// ```
153#[derive(Clone, Default, Debug)]
154pub struct SequenceNumberCounter(Arc<AtomicU64>);
155
156impl SequenceNumberCounter {
157 /// Creates a new counter, setting it to some previous value.
158 ///
159 /// # Panics
160 ///
161 /// Panics if `prev > MAX_SEQNO` (reserved MSB range).
162 ///
163 /// Note: `prev == MAX_SEQNO` is allowed but means the counter is
164 /// exhausted — any subsequent call to [`next()`](Self::next) will panic.
165 #[must_use]
166 pub fn new(prev: SeqNo) -> Self {
167 assert!(
168 prev <= MAX_SEQNO,
169 "Sequence number must not use the reserved MSB"
170 );
171 Self(Arc::new(AtomicU64::new(prev)))
172 }
173
174 /// Allocates and returns the next sequence number, atomically
175 /// incrementing the internal counter.
176 ///
177 /// The returned value is the counter's value **before** the increment
178 /// (i.e., pre-increment / fetch-then-add semantics).
179 ///
180 /// # Panics
181 ///
182 /// Panics if the current value is already [`MAX_SEQNO`] (i.e., when
183 /// advancing past it would enter the reserved MSB range).
184 #[must_use]
185 pub fn next(&self) -> SeqNo {
186 <Self as SequenceNumberGenerator>::next(self)
187 }
188
189 /// Returns the current internal counter value without incrementing.
190 ///
191 /// This is the value that the next call to [`next()`](Self::next) will
192 /// return (and then advance past).
193 #[must_use]
194 pub fn get(&self) -> SeqNo {
195 <Self as SequenceNumberGenerator>::get(self)
196 }
197
198 /// Sets the sequence number to the given value.
199 ///
200 /// # Panics
201 ///
202 /// Panics if `value > MAX_SEQNO` (reserved MSB range).
203 pub fn set(&self, value: SeqNo) {
204 <Self as SequenceNumberGenerator>::set(self, value);
205 }
206
207 /// Atomically updates the sequence number to the maximum of
208 /// the current value and the provided value.
209 pub fn fetch_max(&self, value: SeqNo) {
210 <Self as SequenceNumberGenerator>::fetch_max(self, value);
211 }
212}
213
214impl SequenceNumberGenerator for SequenceNumberCounter {
215 fn next(&self) -> SeqNo {
216 // We use fetch_update (CAS loop) instead of fetch_add to ensure the
217 // internal counter never enters the reserved MSB range. With fetch_add,
218 // a caught panic (via catch_unwind, which the trait requires via
219 // UnwindSafe) would leave the counter permanently stuck at an invalid
220 // value. fetch_update only stores the new value on success.
221 match self.0.fetch_update(AcqRel, Acquire, |current| {
222 if current >= MAX_SEQNO {
223 None
224 } else {
225 Some(current + 1)
226 }
227 }) {
228 Ok(seqno) => seqno,
229 Err(current) => panic!("Ran out of sequence numbers (current: {current})"),
230 }
231 }
232
233 fn get(&self) -> SeqNo {
234 self.0.load(Acquire)
235 }
236
237 fn set(&self, value: SeqNo) {
238 assert!(
239 value <= MAX_SEQNO,
240 "Sequence number must not use the reserved MSB"
241 );
242 self.0.store(value, Release);
243 }
244
245 fn fetch_max(&self, value: SeqNo) {
246 // Clamp to the maximum allowed sequence number to avoid storing
247 // a value in the reserved MSB range.
248 let clamped = value.min(MAX_SEQNO);
249 self.0.fetch_max(clamped, AcqRel);
250 }
251}
252
253// Orphan rules satisfied: SequenceNumberGenerator is a local trait,
254// so Arc<dyn SequenceNumberGenerator> is covered by a local type parameter.
255impl From<SequenceNumberCounter> for SharedSequenceNumberGenerator {
256 fn from(counter: SequenceNumberCounter) -> Self {
257 Arc::new(counter)
258 }
259}
260
261#[cfg(test)]
262mod tests {
263 use super::MAX_SEQNO;
264 use test_log::test;
265
266 #[test]
267 fn next_below_max_returns_valid_seqno() {
268 let counter = super::SequenceNumberCounter::default();
269 counter.set(MAX_SEQNO - 1);
270 let _ = counter.next();
271 }
272
273 #[test]
274 #[should_panic(expected = "Ran out of sequence numbers")]
275 fn next_at_max_panics() {
276 let counter = super::SequenceNumberCounter::default();
277 counter.set(MAX_SEQNO);
278 let _ = counter.next();
279 }
280
281 #[test]
282 #[should_panic(expected = "Sequence number must not use the reserved MSB")]
283 fn set_reserved_range_panics() {
284 let counter = super::SequenceNumberCounter::default();
285 counter.set(MAX_SEQNO + 1);
286 }
287
288 #[test]
289 fn fetch_max_clamps_reserved_to_max() {
290 let counter = super::SequenceNumberCounter::default();
291 counter.set(100);
292 counter.fetch_max(MAX_SEQNO + 1);
293 assert_eq!(counter.get(), MAX_SEQNO);
294 }
295}