Skip to main content

common/
sequence.rs

1//! Sequence number allocation and persistence.
2//!
//! This module provides components for allocating monotonically increasing
//! sequence numbers with crash recovery:
//!
//! **[`SequenceAllocator`]**: Allocates individual sequence numbers from sequence
//!  blocks stored in storage. It tracks the currently reserved block and hands
//!  out new sequence numbers from it. When the block runs out of sequence
//!  numbers, the allocator reserves a new block and returns the corresponding
//!  [`Record`] from [`SequenceAllocator::allocate`]. It is up to the caller to
//!  ensure that the record is persisted in storage. This allows the allocator to
//!  be used from implementations of `Delta` by including the sequence block
//!  writes in persisted deltas.
14//!
15//! # Design
16//!
17//! Block-based allocation reduces write amplification by pre-allocating ranges
18//! of sequence numbers instead of persisting after every allocation. The allocator
19//! tracks allocations via [`SeqBlock`] records:
20//!
21//! - `base_sequence`: Starting sequence number of the allocated block
22//! - `block_size`: Number of sequence numbers in the block
23//!
24//! On crash recovery, the next block starts at `base_sequence + block_size`,
25//! ensuring monotonicity even if some allocated sequences were unused.
26//!
27//! # Usage
28//!
29//! Each system provides its own key format for storing the SeqBlock record:
30//!
//! ```ignore
//! use bytes::Bytes;
//! use common::sequence::SequenceAllocator;
//!
//! // Domain-specific key (e.g., Log uses [0x01, 0x02])
//! const MY_SEQ_BLOCK_KEY: &[u8] = &[0x01, 0x02];
//!
//! let key = Bytes::from_static(MY_SEQ_BLOCK_KEY);
//! // `load` is async and fallible, and borrows the storage.
//! let mut allocator = SequenceAllocator::load(storage.as_ref(), key).await?;
//!
//! // Allocation itself is synchronous; a record is returned only when a new
//! // block was reserved, and the caller must persist it.
//! let (seq, put) = allocator.allocate_one();
//! if let Some(put) = put {
//!     storage.put(vec![put.into()]).await.unwrap();
//! }
//! ```
46
47use bytes::Bytes;
48
49use crate::serde::DeserializeError;
50use crate::serde::seq_block::SeqBlock;
51use crate::{Record, Storage, StorageError};
52
/// Default block size for sequence allocation.
///
/// This is the minimum number of sequence numbers reserved whenever a new
/// block is initialized; a request larger than this gets a block sized to
/// the request instead.
pub const DEFAULT_BLOCK_SIZE: u64 = 4096;
55
/// Error type for sequence allocation operations.
///
/// Wraps the two failure sources encountered when loading allocator state:
/// the storage read and the decoding of the stored block record. `From`
/// conversions exist for both wrapped types so `?` can be used directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SequenceError {
    /// Storage operation failed
    Storage(StorageError),
    /// Deserialization failed
    Deserialize(DeserializeError),
}
64
// Empty impl: uses the trait's default methods, so no `source()` override is
// provided; the wrapped error is only surfaced through `Display`/`Debug`.
impl std::error::Error for SequenceError {}
66
67impl std::fmt::Display for SequenceError {
68    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
69        match self {
70            SequenceError::Storage(e) => write!(f, "storage error: {}", e),
71            SequenceError::Deserialize(e) => write!(f, "deserialize error: {}", e),
72        }
73    }
74}
75
76impl From<StorageError> for SequenceError {
77    fn from(err: StorageError) -> Self {
78        SequenceError::Storage(err)
79    }
80}
81
82impl From<DeserializeError> for SequenceError {
83    fn from(err: DeserializeError) -> Self {
84        SequenceError::Deserialize(err)
85    }
86}
87
/// Result type alias for sequence allocation operations.
///
/// Shorthand for a `Result` whose error is [`SequenceError`].
pub type SequenceResult<T> = std::result::Result<T, SequenceError>;
90
/// In-memory allocation state: the most recently reserved block together
/// with a cursor marking the next sequence number to hand out.
#[derive(Clone, Debug)]
pub struct AllocatedSeqBlock {
    /// The block sequences are currently drawn from; `None` when no block
    /// record was found in storage and none has been initialized yet.
    current_block: Option<SeqBlock>,
    /// The next sequence number to hand out.
    next_sequence: u64,
}
96
97impl AllocatedSeqBlock {
98    fn remaining(&self) -> u64 {
99        match &self.current_block {
100            None => 0,
101            Some(block) => block.next_base().saturating_sub(self.next_sequence),
102        }
103    }
104
105    async fn load(storage: &dyn Storage, key: &Bytes) -> SequenceResult<Self> {
106        let current_block = match storage.get(key.clone()).await? {
107            Some(record) => Some(SeqBlock::deserialize(&record.value)?),
108            None => None,
109        };
110        let next_sequence = current_block
111            .as_ref()
112            // in the event that block exists, set the next sequence to the next base
113            // so that a new block is immediately allocated
114            .map(|b| b.next_base())
115            .unwrap_or(0);
116        Ok(Self {
117            current_block,
118            next_sequence,
119        })
120    }
121}
122
/// Allocates sequence numbers from pre-allocated blocks.
///
/// This allocator manages the lifecycle of sequence number allocation:
/// - Initialize from storage on startup
/// - Allocate individual sequence numbers
/// - Produce a new block record when the current block is exhausted; the
///   caller is responsible for persisting that record
///
/// # Thread Safety
///
/// This struct is not inherently thread safe. It should be owned by a single
/// writing task.
#[derive(Debug)]
pub struct SequenceAllocator {
    /// Storage key used for the block records this allocator produces.
    key: Bytes,
    /// Current block and allocation cursor.
    block: AllocatedSeqBlock,
}
138
139impl SequenceAllocator {
140    /// Creates a new allocator using the given storage and key.
141    pub async fn load(storage: &dyn Storage, key: Bytes) -> SequenceResult<Self> {
142        let block = AllocatedSeqBlock::load(storage, &key).await?;
143        Ok(Self { key, block })
144    }
145
146    pub fn new(key: Bytes, block: AllocatedSeqBlock) -> Self {
147        Self { key, block }
148    }
149
150    /// Returns the next sequence number that would be allocated.
151    ///
152    /// This does not consume any sequences; it just peeks at the current state.
153    pub fn peek_next_sequence(&self) -> u64 {
154        self.block.next_sequence
155    }
156
157    /// Allocates a single sequence number.
158    ///
159    /// Convenience method equivalent to `allocate(1)`.
160    pub fn allocate_one(&mut self) -> (u64, Option<Record>) {
161        self.allocate(1)
162    }
163
164    /// Allocates a contiguous range of sequence numbers.
165    ///
166    /// Returns a pair where the first element is the first sequence number in
167    /// the allocated range. The caller can use sequences `base..base+count`.
168    /// The second element is an `Option<Record>`. If the current block doesn't
169    /// have enough sequences remaining, the remaining sequences are used and a
170    /// new block is allocated for the rest. The returned record, if present,
171    /// must be put to storage to reserve the full sequence range. Note that since
172    /// blocks are contiguous (each starts where the previous ends), the returned
173    /// sequence range is always contiguous.
174    ///
175    pub fn allocate(&mut self, count: u64) -> (u64, Option<Record>) {
176        let remaining = self.block.remaining();
177
178        // If current block can satisfy the request, use it
179        if remaining >= count {
180            let base_sequence = self.block.next_sequence;
181            self.block.next_sequence += count;
182            return (base_sequence, None);
183        }
184
185        // Need a new block. Use remaining sequences from current block first.
186        let from_new_block = count - remaining;
187        let (new_block, record) = self.init_next_block(from_new_block);
188
189        // Base sequence is either from current block (if any remaining) or new block
190        let base_sequence = if remaining > 0 {
191            self.block.next_sequence
192        } else {
193            new_block.base_sequence
194        };
195
196        self.block.next_sequence = new_block.base_sequence + from_new_block;
197        self.block.current_block = Some(new_block);
198
199        (base_sequence, Some(record))
200    }
201
202    fn init_next_block(&self, min_count: u64) -> (SeqBlock, Record) {
203        let base_sequence = match &self.block.current_block {
204            Some(block) => block.next_base(),
205            None => 0,
206        };
207
208        let block_size = min_count.max(DEFAULT_BLOCK_SIZE);
209        let new_block = SeqBlock::new(base_sequence, block_size);
210
211        let value: Bytes = new_block.serialize();
212        let record = Record::new(self.key.clone(), value);
213        (new_block, record)
214    }
215
216    pub fn freeze(self) -> (Bytes, AllocatedSeqBlock) {
217        (self.key, self.block)
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::Storage;
    use opendata_macros::storage_test;

    /// Key under which every test stores its block record.
    fn test_key() -> Bytes {
        Bytes::from_static(&[0x01, 0x02])
    }

    /// Loads an allocator for `test_key()`, panicking if loading fails.
    async fn load_allocator(storage: &dyn Storage) -> SequenceAllocator {
        SequenceAllocator::load(storage, test_key()).await.unwrap()
    }

    /// Decodes the block carried by a record returned from `allocate`.
    fn decode(record: &Record) -> SeqBlock {
        SeqBlock::deserialize(&record.value).unwrap()
    }

    #[storage_test]
    async fn should_load_none_when_no_block_allocated(storage: Arc<dyn Storage>) {
        // when: state is loaded from empty storage
        let loaded = AllocatedSeqBlock::load(storage.as_ref(), &test_key())
            .await
            .unwrap();

        // then: there is no block and the cursor starts at zero
        assert_eq!(loaded.next_sequence, 0);
        assert_eq!(loaded.current_block, None);
    }

    #[storage_test]
    async fn should_load_first_block(storage: Arc<dyn Storage>) {
        // given: an allocator over empty storage
        let mut alloc = load_allocator(storage.as_ref()).await;

        // when: the very first sequence is requested
        let (first, put) = alloc.allocate(1);

        // then: a default-sized block starting at zero is handed back
        assert_eq!(first, 0);
        assert!(put.is_some());
        let stored = decode(&put.unwrap());
        assert_eq!(stored.base_sequence, 0);
        assert_eq!(stored.block_size, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_larger_block_when_requested(storage: Arc<dyn Storage>) {
        // given: an allocator over empty storage
        let mut alloc = load_allocator(storage.as_ref()).await;

        // when: more than a default block is requested at once
        let requested = DEFAULT_BLOCK_SIZE * 2;
        let (first, put) = alloc.allocate(requested);

        // then: the block is sized to the request
        assert_eq!(first, 0);
        assert!(put.is_some());
        let stored = decode(&put.unwrap());
        assert_eq!(stored.base_sequence, first);
        assert_eq!(stored.block_size, DEFAULT_BLOCK_SIZE * 2);

        // and: the next allocation continues right after it
        let (following, _) = alloc.allocate(1);
        assert_eq!(following, requested);
    }

    #[storage_test]
    async fn should_allocate_sequential_blocks(storage: Arc<dyn Storage>) {
        // given: an allocator over empty storage
        let mut alloc = load_allocator(storage.as_ref()).await;
        let mut puts = Vec::new();

        // when: three blocks' worth of single allocations are made
        for _ in 0..(DEFAULT_BLOCK_SIZE * 3) {
            if let (_, Some(record)) = alloc.allocate(1) {
                puts.push(record);
            }
        }

        // then: exactly three back-to-back blocks were produced
        let blocks: Vec<_> = puts.iter().map(decode).collect();
        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0].base_sequence, 0);
        assert_eq!(blocks[1].base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(blocks[2].base_sequence, DEFAULT_BLOCK_SIZE * 2);
    }

    #[storage_test]
    async fn should_recover_from_storage_on_initialize(storage: Arc<dyn Storage>) {
        // given: a first instance that persisted only its second block
        let mut original = load_allocator(storage.as_ref()).await;
        original.allocate(DEFAULT_BLOCK_SIZE);
        let (_, put) = original.allocate(DEFAULT_BLOCK_SIZE);
        storage.put(vec![put.unwrap().into()]).await.unwrap();

        // when: a second instance loads from the same storage
        let mut recovered = load_allocator(storage.as_ref()).await;

        // then: it sees the persisted block and resumes after its end
        assert_eq!(
            recovered.block.current_block,
            Some(SeqBlock::new(DEFAULT_BLOCK_SIZE, DEFAULT_BLOCK_SIZE))
        );
        assert_eq!(recovered.block.next_sequence, DEFAULT_BLOCK_SIZE * 2);
        let (seq, put) = recovered.allocate(DEFAULT_BLOCK_SIZE);
        let stored = decode(&put.unwrap());
        assert_eq!(stored.base_sequence, DEFAULT_BLOCK_SIZE * 2);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE * 2);
    }

    #[storage_test]
    async fn should_resume_from_next_block_on_initialize(storage: Arc<dyn Storage>) {
        // given: a first instance that used only half of its persisted block
        let mut original = load_allocator(storage.as_ref()).await;
        let (_, put) = original.allocate(DEFAULT_BLOCK_SIZE / 2);
        storage.put(vec![put.unwrap().into()]).await.unwrap();

        // when: a second instance loads from the same storage
        let mut recovered = load_allocator(storage.as_ref()).await;

        // then: the unused half is skipped and allocation resumes at the next block
        assert_eq!(
            recovered.block.current_block,
            Some(SeqBlock::new(0, DEFAULT_BLOCK_SIZE))
        );
        assert_eq!(recovered.block.next_sequence, DEFAULT_BLOCK_SIZE);
        let (seq, put) = recovered.allocate(DEFAULT_BLOCK_SIZE);
        let stored = decode(&put.unwrap());
        assert_eq!(stored.base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_sequential_sequence_numbers(storage: Arc<dyn Storage>) {
        // given: an allocator over empty storage
        let mut alloc = load_allocator(storage.as_ref()).await;

        // when: three single allocations are made in a row
        let seqs = [
            alloc.allocate_one().0,
            alloc.allocate_one().0,
            alloc.allocate_one().0,
        ];

        // then: they count up from zero
        assert_eq!(seqs, [0, 1, 2]);
    }

    #[storage_test]
    async fn should_allocate_batch_of_sequences(storage: Arc<dyn Storage>) {
        // given: an allocator over empty storage
        let mut alloc = load_allocator(storage.as_ref()).await;

        // when: two batches are allocated back to back
        let (first_batch, _) = alloc.allocate(10);
        let (second_batch, _) = alloc.allocate(5);

        // then: the second batch starts where the first one ended
        assert_eq!(first_batch, 0);
        assert_eq!(second_batch, 10);
    }

    #[storage_test]
    async fn should_span_blocks_when_batch_exceeds_remaining(storage: Arc<dyn Storage>) {
        // given: an allocator with only 10 sequences left in its first block
        let mut alloc = load_allocator(storage.as_ref()).await;
        alloc.allocate(DEFAULT_BLOCK_SIZE - 10);

        // when: 25 sequences are requested
        let (seq, put) = alloc.allocate(25);

        // then: the range starts in the old block and a new block is produced
        assert_eq!(seq, DEFAULT_BLOCK_SIZE - 10);
        assert!(put.is_some());
        let stored = decode(&put.unwrap());
        assert_eq!(stored.base_sequence, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_new_block_when_exhausted(storage: Arc<dyn Storage>) {
        // given: an allocator over empty storage
        let mut alloc = load_allocator(storage.as_ref()).await;

        // when: the whole first block is consumed, then one more is requested
        alloc.allocate(DEFAULT_BLOCK_SIZE);
        let (seq, put) = alloc.allocate_one();

        // then: it is the first sequence of the second block
        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
        assert!(put.is_some());
    }

    #[storage_test]
    async fn should_allocate_exactly_remaining(storage: Arc<dyn Storage>) {
        // given: an allocator that has already used 100 sequences
        let mut alloc = load_allocator(storage.as_ref()).await;
        alloc.allocate(100);
        let left_over = DEFAULT_BLOCK_SIZE - 100;

        // when: exactly the rest of the block is requested
        let (seq, put) = alloc.allocate(left_over);

        // then: the current block covers it and no new block is produced
        assert_eq!(seq, 100);
        assert!(put.is_none());

        // and: the following allocation opens the next block
        let (after, put) = alloc.allocate_one();
        assert_eq!(after, DEFAULT_BLOCK_SIZE);
        assert!(put.is_some());
    }

    #[storage_test]
    async fn should_handle_large_batch_spanning_from_partial_block(storage: Arc<dyn Storage>) {
        // given: an allocator with 100 sequences left in its first block
        let mut alloc = load_allocator(storage.as_ref()).await;
        alloc.allocate(DEFAULT_BLOCK_SIZE - 100);

        // when: far more than the remainder is requested
        let big = DEFAULT_BLOCK_SIZE + 500;
        let (seq, _) = alloc.allocate(big);

        // then: the range starts at the old cursor
        assert_eq!(seq, DEFAULT_BLOCK_SIZE - 100);

        // and: the next allocation continues after the large batch
        let (after, _) = alloc.allocate_one();
        assert_eq!(after, DEFAULT_BLOCK_SIZE - 100 + big);
    }

    #[storage_test]
    async fn should_peek_next_sequence_without_consuming(storage: Arc<dyn Storage>) {
        // given: an allocator that has already handed out 10 sequences
        let mut alloc = load_allocator(storage.as_ref()).await;
        alloc.allocate(10);

        // when: the cursor is peeked and then one sequence is allocated
        let peeked = alloc.peek_next_sequence();
        let (allocated, _) = alloc.allocate_one();

        // then: the peek predicted the allocation without consuming it
        assert_eq!(peeked, allocated);
        assert_eq!(peeked, 10);
    }
}