Skip to main content

common/
sequence.rs

1//! Sequence number allocation and persistence.
2//!
//! This module provides components for allocating monotonically increasing
//! sequence numbers with crash recovery. It provides:
//!
//! **[`SequenceAllocator`]**: Allocates individual sequence numbers from sequence
//! blocks stored in storage. It tracks the currently reserved block and hands out
//! sequence numbers from it. When that block runs out, the allocator reserves a new
//! block and returns the corresponding [`Record`] from
//! [`SequenceAllocator::allocate`]. The allocator does not write that record itself;
//! the caller is responsible for persisting it to storage. This allows the allocator
//! to be used from implementations of `Delta` by including the sequence block writes
//! in persisted deltas.
14//!
15//! # Design
16//!
17//! Block-based allocation reduces write amplification by pre-allocating ranges
18//! of sequence numbers instead of persisting after every allocation. The allocator
19//! tracks allocations via [`SeqBlock`] records:
20//!
21//! - `base_sequence`: Starting sequence number of the allocated block
22//! - `block_size`: Number of sequence numbers in the block
23//!
24//! On crash recovery, the next block starts at `base_sequence + block_size`,
25//! ensuring monotonicity even if some allocated sequences were unused.
26//!
27//! # Usage
28//!
//! Each system provides its own key format for storing the [`SeqBlock`] record:
//!
//! ```ignore
//! use bytes::Bytes;
//! use common::sequence::SequenceAllocator;
//!
//! // Domain-specific key (e.g., Log uses [0x01, 0x02])
//! const MY_SEQ_BLOCK_KEY: &[u8] = &[0x01, 0x02];
//!
//! let key = Bytes::from_static(MY_SEQ_BLOCK_KEY);
//! let mut allocator = SequenceAllocator::load(storage.as_ref(), key).await?;
//!
//! let (seq, put) = allocator.allocate_one();
//! if let Some(put) = put {
//!     // Persist the newly reserved block; otherwise a restarted allocator
//!     // may hand out `seq` again.
//!     storage.put(vec![put.into()]).await?;
//! }
//! ```
46
47use bytes::Bytes;
48
49use crate::serde::DeserializeError;
50use crate::serde::seq_block::SeqBlock;
51use crate::{Record, Storage, StorageError};
52
/// Minimum number of sequence numbers reserved per block.
///
/// A single request larger than this gets a block sized to fit the request.
pub const DEFAULT_BLOCK_SIZE: u64 = 1 << 12;
55
56/// Error type for sequence allocation operations.
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub enum SequenceError {
59    /// Storage operation failed
60    Storage(StorageError),
61    /// Deserialization failed
62    Deserialize(DeserializeError),
63}
64
65impl std::error::Error for SequenceError {}
66
67impl std::fmt::Display for SequenceError {
68    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
69        match self {
70            SequenceError::Storage(e) => write!(f, "storage error: {}", e),
71            SequenceError::Deserialize(e) => write!(f, "deserialize error: {}", e),
72        }
73    }
74}
75
76impl From<StorageError> for SequenceError {
77    fn from(err: StorageError) -> Self {
78        SequenceError::Storage(err)
79    }
80}
81
82impl From<DeserializeError> for SequenceError {
83    fn from(err: DeserializeError) -> Self {
84        SequenceError::Deserialize(err)
85    }
86}
87
88/// Result type alias for sequence allocation operations.
89pub type SequenceResult<T> = std::result::Result<T, SequenceError>;
90
91pub struct AllocatedSeqBlock {
92    current_block: Option<SeqBlock>,
93    next_sequence: u64,
94}
95
96impl AllocatedSeqBlock {
97    fn remaining(&self) -> u64 {
98        match &self.current_block {
99            None => 0,
100            Some(block) => block.next_base().saturating_sub(self.next_sequence),
101        }
102    }
103
104    async fn load(storage: &dyn Storage, key: &Bytes) -> SequenceResult<Self> {
105        let current_block = match storage.get(key.clone()).await? {
106            Some(record) => Some(SeqBlock::deserialize(&record.value)?),
107            None => None,
108        };
109        let next_sequence = current_block
110            .as_ref()
111            // in the event that block exists, set the next sequence to the next base
112            // so that a new block is immediately allocated
113            .map(|b| b.next_base())
114            .unwrap_or(0);
115        Ok(Self {
116            current_block,
117            next_sequence,
118        })
119    }
120}
121
122/// Allocates sequence numbers from pre-allocated blocks.
123///
124/// This allocator manages the lifecycle of sequence number allocation:
125/// - Initialize from storage on startup
126/// - Allocate individual sequence numbers
127/// - Persist new blocks when the current block is exhausted
128///
129/// # Thread Safety
130///
131/// This struct is not inherently thread safe. It should be owned by a single writing task
132pub struct SequenceAllocator {
133    key: Bytes,
134    block: AllocatedSeqBlock,
135}
136
137impl SequenceAllocator {
138    /// Creates a new allocator using the given storage and key.
139    pub async fn load(storage: &dyn Storage, key: Bytes) -> SequenceResult<Self> {
140        let block = AllocatedSeqBlock::load(storage, &key).await?;
141        Ok(Self { key, block })
142    }
143
144    /// Returns the next sequence number that would be allocated.
145    ///
146    /// This does not consume any sequences; it just peeks at the current state.
147    pub fn peek_next_sequence(&self) -> u64 {
148        self.block.next_sequence
149    }
150
151    /// Allocates a single sequence number.
152    ///
153    /// Convenience method equivalent to `allocate(1)`.
154    pub fn allocate_one(&mut self) -> (u64, Option<Record>) {
155        self.allocate(1)
156    }
157
158    /// Allocates a contiguous range of sequence numbers.
159    ///
160    /// Returns a pair where the first element is the first sequence number in
161    /// the allocated range. The caller can use sequences `base..base+count`.
162    /// The second element is an `Option<Record>`. If the current block doesn't
163    /// have enough sequences remaining, the remaining sequences are used and a
164    /// new block is allocated for the rest. The returned record, if present,
165    /// must be put to storage to reserve the full sequence range. Note that since
166    /// blocks are contiguous (each starts where the previous ends), the returned
167    /// sequence range is always contiguous.
168    ///
169    pub fn allocate(&mut self, count: u64) -> (u64, Option<Record>) {
170        let remaining = self.block.remaining();
171
172        // If current block can satisfy the request, use it
173        if remaining >= count {
174            let base_sequence = self.block.next_sequence;
175            self.block.next_sequence += count;
176            return (base_sequence, None);
177        }
178
179        // Need a new block. Use remaining sequences from current block first.
180        let from_new_block = count - remaining;
181        let (new_block, record) = self.init_next_block(from_new_block);
182
183        // Base sequence is either from current block (if any remaining) or new block
184        let base_sequence = if remaining > 0 {
185            self.block.next_sequence
186        } else {
187            new_block.base_sequence
188        };
189
190        self.block.next_sequence = new_block.base_sequence + from_new_block;
191        self.block.current_block = Some(new_block);
192
193        (base_sequence, Some(record))
194    }
195
196    fn init_next_block(&self, min_count: u64) -> (SeqBlock, Record) {
197        let base_sequence = match &self.block.current_block {
198            Some(block) => block.next_base(),
199            None => 0,
200        };
201
202        let block_size = min_count.max(DEFAULT_BLOCK_SIZE);
203        let new_block = SeqBlock::new(base_sequence, block_size);
204
205        let value: Bytes = new_block.serialize();
206        let record = Record::new(self.key.clone(), value);
207        (new_block, record)
208    }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::Storage;
    use opendata_macros::storage_test;

    fn test_key() -> Bytes {
        Bytes::from_static(&[0x01, 0x02])
    }

    #[storage_test]
    async fn should_load_none_when_no_block_allocated(storage: Arc<dyn Storage>) {
        // when
        let block = AllocatedSeqBlock::load(storage.as_ref(), &test_key())
            .await
            .unwrap();

        // then
        assert_eq!(block.next_sequence, 0);
        assert_eq!(block.current_block, None);
    }

    #[storage_test]
    async fn should_load_first_block(storage: Arc<dyn Storage>) {
        // given:
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // when:
        let (seq, record) = allocator.allocate(1);

        // then:
        assert_eq!(seq, 0);
        assert!(record.is_some());
        let record = record.unwrap();
        let block = SeqBlock::deserialize(&record.value).unwrap();
        assert_eq!(block.base_sequence, 0);
        assert_eq!(block.block_size, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_larger_block_when_requested(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // when:
        let large_count = DEFAULT_BLOCK_SIZE * 2;
        let (seq, record) = allocator.allocate(large_count);

        // then:
        assert_eq!(seq, 0);
        assert!(record.is_some());
        let record = record.unwrap();
        let block = SeqBlock::deserialize(&record.value).unwrap();
        assert_eq!(block.base_sequence, seq);
        assert_eq!(block.block_size, DEFAULT_BLOCK_SIZE * 2);
        let (seq, _) = allocator.allocate(1);
        assert_eq!(seq, large_count);
    }

    #[storage_test]
    async fn should_allocate_sequential_blocks(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();
        let mut puts = vec![];

        // when
        for _ in 0..(DEFAULT_BLOCK_SIZE * 3) {
            let (_, maybe_put) = allocator.allocate(1);
            // Keep only the records produced when a new block was reserved.
            puts.extend(maybe_put);
        }

        // then
        let blocks: Vec<_> = puts
            .into_iter()
            .map(|r| SeqBlock::deserialize(&r.value).unwrap())
            .collect();
        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0].base_sequence, 0);
        assert_eq!(blocks[1].base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(blocks[2].base_sequence, DEFAULT_BLOCK_SIZE * 2);
    }

    #[storage_test]
    async fn should_recover_from_storage_on_initialize(storage: Arc<dyn Storage>) {
        // First instance allocates some blocks
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();
        allocator.allocate(DEFAULT_BLOCK_SIZE);
        let (_, put) = allocator.allocate(DEFAULT_BLOCK_SIZE);
        storage.put(vec![put.unwrap().into()]).await.unwrap();

        // when: Second instance should recover
        let mut allocator2 = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // then:
        assert_eq!(
            allocator2.block.current_block,
            Some(SeqBlock::new(DEFAULT_BLOCK_SIZE, DEFAULT_BLOCK_SIZE))
        );
        assert_eq!(allocator2.block.next_sequence, DEFAULT_BLOCK_SIZE * 2);
        let (seq, put) = allocator2.allocate(DEFAULT_BLOCK_SIZE);
        let block = SeqBlock::deserialize(&put.unwrap().value).unwrap();
        assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE * 2);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE * 2);
    }

    #[storage_test]
    async fn should_resume_from_next_block_on_initialize(storage: Arc<dyn Storage>) {
        // First instance allocates some blocks
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();
        let (_, put) = allocator.allocate(DEFAULT_BLOCK_SIZE / 2);
        storage.put(vec![put.unwrap().into()]).await.unwrap();

        // when: Second instance should recover
        let mut allocator2 = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // then:
        assert_eq!(
            allocator2.block.current_block,
            Some(SeqBlock::new(0, DEFAULT_BLOCK_SIZE))
        );
        assert_eq!(allocator2.block.next_sequence, DEFAULT_BLOCK_SIZE);
        let (seq, put) = allocator2.allocate(DEFAULT_BLOCK_SIZE);
        let block = SeqBlock::deserialize(&put.unwrap().value).unwrap();
        assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_sequential_sequence_numbers(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // when
        let (seq1, _) = allocator.allocate_one();
        let (seq2, _) = allocator.allocate_one();
        let (seq3, _) = allocator.allocate_one();

        // then
        assert_eq!(seq1, 0);
        assert_eq!(seq2, 1);
        assert_eq!(seq3, 2);
    }

    #[storage_test]
    async fn should_allocate_batch_of_sequences(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // when
        let (seq1, _) = allocator.allocate(10);
        let (seq2, _) = allocator.allocate(5);

        // then
        assert_eq!(seq1, 0);
        assert_eq!(seq2, 10);
    }

    #[storage_test]
    async fn should_allocate_zero_without_reserving_block(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // when
        let (seq, put) = allocator.allocate(0);

        // then - nothing is consumed and no block is reserved
        assert_eq!(seq, 0);
        assert!(put.is_none());
        assert_eq!(allocator.peek_next_sequence(), 0);
    }

    #[storage_test]
    async fn should_span_blocks_when_batch_exceeds_remaining(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // Allocate most of the first block
        allocator.allocate(DEFAULT_BLOCK_SIZE - 10);

        // when - allocate more than remaining (10 left, request 25)
        let (seq, put) = allocator.allocate(25);

        // then - should get contiguous sequences starting at remaining position
        assert_eq!(seq, DEFAULT_BLOCK_SIZE - 10);
        assert!(put.is_some());
        let block = SeqBlock::deserialize(&put.unwrap().value).unwrap();
        assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_new_block_when_exhausted(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // when - allocate entire first block plus one more
        allocator.allocate(DEFAULT_BLOCK_SIZE);
        let (seq, put) = allocator.allocate_one();

        // then - should be first sequence of second block
        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
        assert!(put.is_some());
    }

    #[storage_test]
    async fn should_allocate_exactly_remaining(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // Use some sequences
        allocator.allocate(100);
        let remaining = DEFAULT_BLOCK_SIZE - 100;

        // when - allocate exactly the remaining amount
        let (seq, put) = allocator.allocate(remaining);

        // then - should use up exactly the current block
        assert_eq!(seq, 100);
        assert!(put.is_none());

        // and next allocation should come from new block
        let (next_seq, put) = allocator.allocate_one();
        assert_eq!(next_seq, DEFAULT_BLOCK_SIZE);
        assert!(put.is_some());
    }

    #[storage_test]
    async fn should_handle_large_batch_spanning_from_partial_block(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // Use most of first block, leaving 100
        allocator.allocate(DEFAULT_BLOCK_SIZE - 100);

        // when - request much more than remaining (spans into new block)
        let large_request = DEFAULT_BLOCK_SIZE + 500;
        let (seq, _) = allocator.allocate(large_request);

        // then - should start at the remaining position
        assert_eq!(seq, DEFAULT_BLOCK_SIZE - 100);

        // and next allocation continues after the large batch
        let (next_seq, _) = allocator.allocate_one();
        assert_eq!(next_seq, DEFAULT_BLOCK_SIZE - 100 + large_request);
    }

    #[storage_test]
    async fn should_peek_next_sequence_without_consuming(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // Allocate some sequences
        allocator.allocate(10);

        // when
        let peeked = allocator.peek_next_sequence();
        let (allocated, _) = allocator.allocate_one();

        // then - peeked should match what was allocated
        assert_eq!(peeked, allocated);
        assert_eq!(peeked, 10);
    }
}