Skip to main content

common/
sequence.rs

//! Sequence number allocation and persistence.
//!
//! This module provides components for allocating monotonically increasing
//! sequence numbers with crash recovery. It provides:
//!
//! - **[`SequenceAllocator`]**: Allocates individual sequence numbers from
//!   sequence blocks stored in storage. It tracks the currently reserved block
//!   and allocates new sequence numbers from it. When the block is out of
//!   sequence numbers, the allocator allocates a new block and returns the
//!   corresponding [`Record`] from [`SequenceAllocator::allocate`]. It is up to
//!   the caller to ensure that the record is persisted in storage. This allows
//!   the allocator to be used from implementations of `Delta` by including the
//!   sequence block writes in persisted deltas.
//!
//! # Design
//!
//! Block-based allocation reduces write amplification by pre-allocating ranges
//! of sequence numbers instead of persisting after every allocation. The allocator
//! tracks allocations via [`SeqBlock`] records:
//!
//! - `base_sequence`: Starting sequence number of the allocated block
//! - `block_size`: Number of sequence numbers in the block
//!
//! On crash recovery, the next block starts at `base_sequence + block_size`,
//! ensuring monotonicity even if some allocated sequences were unused.
//!
//! # Usage
//!
//! Each system provides its own key format for storing the SeqBlock record:
//!
//! ```ignore
//! use bytes::Bytes;
//! use common::sequence::SequenceAllocator;
//!
//! // Domain-specific key (e.g., Log uses [0x01, 0x02])
//! const MY_SEQ_BLOCK_KEY: &[u8] = &[0x01, 0x02];
//!
//! let key = Bytes::from_static(MY_SEQ_BLOCK_KEY);
//! // `load` is async and borrows the storage only while reading the stored block.
//! let mut allocator = SequenceAllocator::load(storage.as_ref(), key).await?;
//!
//! // Allocation itself is synchronous; a record is returned only when a new
//! // block had to be reserved, and the caller must persist it.
//! let (seq, put) = allocator.allocate_one();
//! if let Some(put) = put {
//!     storage.put(vec![put]).await.unwrap();
//! }
//! ```
46
47use bytes::Bytes;
48
49use crate::serde::DeserializeError;
50use crate::serde::seq_block::SeqBlock;
51use crate::{Record, Storage, StorageError};
52
/// Number of sequence numbers reserved per block unless a single request
/// needs more (2^12 = 4096).
pub const DEFAULT_BLOCK_SIZE: u64 = 1 << 12;
55
56/// Error type for sequence allocation operations.
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub enum SequenceError {
59    /// Storage operation failed
60    Storage(StorageError),
61    /// Deserialization failed
62    Deserialize(DeserializeError),
63}
64
65impl std::error::Error for SequenceError {}
66
67impl std::fmt::Display for SequenceError {
68    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
69        match self {
70            SequenceError::Storage(e) => write!(f, "storage error: {}", e),
71            SequenceError::Deserialize(e) => write!(f, "deserialize error: {}", e),
72        }
73    }
74}
75
76impl From<StorageError> for SequenceError {
77    fn from(err: StorageError) -> Self {
78        SequenceError::Storage(err)
79    }
80}
81
82impl From<DeserializeError> for SequenceError {
83    fn from(err: DeserializeError) -> Self {
84        SequenceError::Deserialize(err)
85    }
86}
87
88/// Result type alias for sequence allocation operations.
89pub type SequenceResult<T> = std::result::Result<T, SequenceError>;
90
91pub struct AllocatedSeqBlock {
92    current_block: Option<SeqBlock>,
93    next_sequence: u64,
94}
95
96impl AllocatedSeqBlock {
97    fn remaining(&self) -> u64 {
98        match &self.current_block {
99            None => 0,
100            Some(block) => block.next_base().saturating_sub(self.next_sequence),
101        }
102    }
103
104    async fn load(storage: &dyn Storage, key: &Bytes) -> SequenceResult<Self> {
105        let current_block = match storage.get(key.clone()).await? {
106            Some(record) => Some(SeqBlock::deserialize(&record.value)?),
107            None => None,
108        };
109        let next_sequence = current_block
110            .as_ref()
111            // in the event that block exists, set the next sequence to the next base
112            // so that a new block is immediately allocated
113            .map(|b| b.next_base())
114            .unwrap_or(0);
115        Ok(Self {
116            current_block,
117            next_sequence,
118        })
119    }
120}
121
122/// Allocates sequence numbers from pre-allocated blocks.
123///
124/// This allocator manages the lifecycle of sequence number allocation:
125/// - Initialize from storage on startup
126/// - Allocate individual sequence numbers
127/// - Persist new blocks when the current block is exhausted
128///
129/// # Thread Safety
130///
131/// This struct is not inherently thread safe. It should be owned by a single writing task
132pub struct SequenceAllocator {
133    key: Bytes,
134    block: AllocatedSeqBlock,
135}
136
137impl SequenceAllocator {
138    /// Creates a new allocator using the given storage and key.
139    pub async fn load(storage: &dyn Storage, key: Bytes) -> SequenceResult<Self> {
140        let block = AllocatedSeqBlock::load(storage, &key).await?;
141        Ok(Self { key, block })
142    }
143
144    /// Returns the next sequence number that would be allocated.
145    ///
146    /// This does not consume any sequences; it just peeks at the current state.
147    pub fn peek_next_sequence(&self) -> u64 {
148        self.block.next_sequence
149    }
150
151    /// Allocates a single sequence number.
152    ///
153    /// Convenience method equivalent to `allocate(1)`.
154    pub fn allocate_one(&mut self) -> (u64, Option<Record>) {
155        self.allocate(1)
156    }
157
158    /// Allocates a contiguous range of sequence numbers.
159    ///
160    /// Returns a pair where the first element is the first sequence number in
161    /// the allocated range. The caller can use sequences `base..base+count`.
162    /// The second element is an `Option<Record>`. If the current block doesn't
163    /// have enough sequences remaining, the remaining sequences are used and a
164    /// new block is allocated for the rest. The returned record, if present,
165    /// must be put to storage to reserve the full sequence range. Note that since
166    /// blocks are contiguous (each starts where the previous ends), the returned
167    /// sequence range is always contiguous.
168    ///
169    pub fn allocate(&mut self, count: u64) -> (u64, Option<Record>) {
170        let remaining = self.block.remaining();
171
172        // If current block can satisfy the request, use it
173        if remaining >= count {
174            let base_sequence = self.block.next_sequence;
175            self.block.next_sequence += count;
176            return (base_sequence, None);
177        }
178
179        // Need a new block. Use remaining sequences from current block first.
180        let from_new_block = count - remaining;
181        let (new_block, record) = self.init_next_block(from_new_block);
182
183        // Base sequence is either from current block (if any remaining) or new block
184        let base_sequence = if remaining > 0 {
185            self.block.next_sequence
186        } else {
187            new_block.base_sequence
188        };
189
190        self.block.next_sequence = new_block.base_sequence + from_new_block;
191        self.block.current_block = Some(new_block);
192
193        (base_sequence, Some(record))
194    }
195
196    fn init_next_block(&self, min_count: u64) -> (SeqBlock, Record) {
197        let base_sequence = match &self.block.current_block {
198            Some(block) => block.next_base(),
199            None => 0,
200        };
201
202        let block_size = min_count.max(DEFAULT_BLOCK_SIZE);
203        let new_block = SeqBlock::new(base_sequence, block_size);
204
205        let value: Bytes = new_block.serialize();
206        let record = Record::new(self.key.clone(), value);
207        (new_block, record)
208    }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::in_memory::InMemoryStorage;
    use std::sync::Arc;

    /// Key under which these tests store the sequence block.
    fn test_key() -> Bytes {
        Bytes::from_static(&[0x01, 0x02])
    }

    /// Loads an allocator from a brand-new, empty in-memory store.
    async fn fresh_allocator() -> SequenceAllocator {
        let storage = InMemoryStorage::new();
        SequenceAllocator::load(&storage, test_key())
            .await
            .unwrap()
    }

    /// Decodes the block carried by a record returned from the allocator.
    fn block_in(record: &Record) -> SeqBlock {
        SeqBlock::deserialize(&record.value).unwrap()
    }

    #[tokio::test]
    async fn should_load_none_when_no_block_allocated() {
        // given an empty store
        let storage = InMemoryStorage::new();

        // when the block state is loaded
        let loaded = AllocatedSeqBlock::load(&storage, &test_key())
            .await
            .unwrap();

        // then there is no block and the cursor sits at zero
        assert_eq!(loaded.next_sequence, 0);
        assert_eq!(loaded.current_block, None);
    }

    #[tokio::test]
    async fn should_load_first_block() {
        // given
        let mut allocator = fresh_allocator().await;

        // when
        let (seq, record) = allocator.allocate(1);

        // then the very first allocation reserves a default-sized block at 0
        assert_eq!(seq, 0);
        assert!(record.is_some());
        let reserved = block_in(&record.unwrap());
        assert_eq!(reserved.base_sequence, 0);
        assert_eq!(reserved.block_size, DEFAULT_BLOCK_SIZE);
    }

    #[tokio::test]
    async fn should_allocate_larger_block_when_requested() {
        // given
        let mut allocator = fresh_allocator().await;
        let large_count = DEFAULT_BLOCK_SIZE * 2;

        // when
        let (seq, record) = allocator.allocate(large_count);

        // then the block grows to fit the request
        assert_eq!(seq, 0);
        assert!(record.is_some());
        let reserved = block_in(&record.unwrap());
        assert_eq!(reserved.base_sequence, seq);
        assert_eq!(reserved.block_size, DEFAULT_BLOCK_SIZE * 2);

        // and the following allocation starts right after it
        let (following, _) = allocator.allocate(1);
        assert_eq!(following, large_count);
    }

    #[tokio::test]
    async fn should_allocate_sequential_blocks() {
        // given
        let mut allocator = fresh_allocator().await;

        // when three blocks' worth of single allocations are made
        let blocks: Vec<SeqBlock> = (0..DEFAULT_BLOCK_SIZE * 3)
            .filter_map(|_| allocator.allocate(1).1)
            .map(|record| block_in(&record))
            .collect();

        // then exactly three back-to-back blocks were reserved
        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0].base_sequence, 0);
        assert_eq!(blocks[1].base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(blocks[2].base_sequence, DEFAULT_BLOCK_SIZE * 2);
    }

    #[tokio::test]
    async fn should_recover_from_storage_on_initialize() {
        // given a first instance that reserved two blocks and persisted the second
        let storage: Arc<dyn Storage> = Arc::new(InMemoryStorage::new());
        let mut first = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();
        let _ = first.allocate(DEFAULT_BLOCK_SIZE);
        let (_, second_block) = first.allocate(DEFAULT_BLOCK_SIZE);
        storage.put(vec![second_block.unwrap()]).await.unwrap();

        // when a second instance loads from the same store
        let mut recovered = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // then it sees the persisted block and resumes after its end
        assert_eq!(
            recovered.block.current_block,
            Some(SeqBlock::new(DEFAULT_BLOCK_SIZE, DEFAULT_BLOCK_SIZE))
        );
        assert_eq!(recovered.block.next_sequence, DEFAULT_BLOCK_SIZE * 2);

        let (seq, put) = recovered.allocate(DEFAULT_BLOCK_SIZE);
        let reserved = block_in(&put.unwrap());
        assert_eq!(reserved.base_sequence, DEFAULT_BLOCK_SIZE * 2);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE * 2);
    }

    #[tokio::test]
    async fn should_resume_from_next_block_on_initialize() {
        // given a first instance that used only half of its persisted block
        let storage: Arc<dyn Storage> = Arc::new(InMemoryStorage::new());
        let mut first = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();
        let (_, first_block) = first.allocate(DEFAULT_BLOCK_SIZE / 2);
        storage.put(vec![first_block.unwrap()]).await.unwrap();

        // when a second instance loads from the same store
        let mut recovered = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // then the unused half is skipped and allocation resumes at the next block
        assert_eq!(
            recovered.block.current_block,
            Some(SeqBlock::new(0, DEFAULT_BLOCK_SIZE))
        );
        assert_eq!(recovered.block.next_sequence, DEFAULT_BLOCK_SIZE);

        let (seq, put) = recovered.allocate(DEFAULT_BLOCK_SIZE);
        let reserved = block_in(&put.unwrap());
        assert_eq!(reserved.base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
    }

    #[tokio::test]
    async fn should_allocate_sequential_sequence_numbers() {
        // given
        let mut allocator = fresh_allocator().await;

        // when
        let (first, _) = allocator.allocate_one();
        let (second, _) = allocator.allocate_one();
        let (third, _) = allocator.allocate_one();

        // then
        assert_eq!(first, 0);
        assert_eq!(second, 1);
        assert_eq!(third, 2);
    }

    #[tokio::test]
    async fn should_allocate_batch_of_sequences() {
        // given
        let mut allocator = fresh_allocator().await;

        // when
        let (first_batch, _) = allocator.allocate(10);
        let (second_batch, _) = allocator.allocate(5);

        // then the second batch starts where the first one ended
        assert_eq!(first_batch, 0);
        assert_eq!(second_batch, 10);
    }

    #[tokio::test]
    async fn should_span_blocks_when_batch_exceeds_remaining() {
        // given an allocator with only 10 sequences left in its first block
        let mut allocator = fresh_allocator().await;
        let _ = allocator.allocate(DEFAULT_BLOCK_SIZE - 10);

        // when more than the remainder is requested (10 left, 25 asked for)
        let (seq, put) = allocator.allocate(25);

        // then the range starts in the old block and a new block is reserved
        assert_eq!(seq, DEFAULT_BLOCK_SIZE - 10);
        assert!(put.is_some());
        let reserved = block_in(&put.unwrap());
        assert_eq!(reserved.base_sequence, DEFAULT_BLOCK_SIZE);
    }

    #[tokio::test]
    async fn should_allocate_new_block_when_exhausted() {
        // given an allocator whose first block is fully consumed
        let mut allocator = fresh_allocator().await;
        let _ = allocator.allocate(DEFAULT_BLOCK_SIZE);

        // when one more sequence is requested
        let (seq, put) = allocator.allocate_one();

        // then it is the first sequence of a second block
        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
        assert!(put.is_some());
    }

    #[tokio::test]
    async fn should_allocate_exactly_remaining() {
        // given an allocator that has used 100 sequences of its first block
        let mut allocator = fresh_allocator().await;
        let _ = allocator.allocate(100);
        let remaining = DEFAULT_BLOCK_SIZE - 100;

        // when exactly the remainder is requested
        let (seq, put) = allocator.allocate(remaining);

        // then the current block covers it without reserving a new one
        assert_eq!(seq, 100);
        assert!(put.is_none());

        // and the next allocation comes from a new block
        let (next_seq, put) = allocator.allocate_one();
        assert_eq!(next_seq, DEFAULT_BLOCK_SIZE);
        assert!(put.is_some());
    }

    #[tokio::test]
    async fn should_handle_large_batch_spanning_from_partial_block() {
        // given an allocator with 100 sequences left in its first block
        let mut allocator = fresh_allocator().await;
        let _ = allocator.allocate(DEFAULT_BLOCK_SIZE - 100);

        // when far more than the remainder is requested
        let large_request = DEFAULT_BLOCK_SIZE + 500;
        let (seq, _) = allocator.allocate(large_request);

        // then the range starts at the old cursor
        assert_eq!(seq, DEFAULT_BLOCK_SIZE - 100);

        // and the next allocation continues right after the large batch
        let (next_seq, _) = allocator.allocate_one();
        assert_eq!(next_seq, DEFAULT_BLOCK_SIZE - 100 + large_request);
    }

    #[tokio::test]
    async fn should_peek_next_sequence_without_consuming() {
        // given an allocator that has already handed out 10 sequences
        let mut allocator = fresh_allocator().await;
        let _ = allocator.allocate(10);

        // when
        let peeked = allocator.peek_next_sequence();
        let (allocated, _) = allocator.allocate_one();

        // then the peeked value is exactly what gets allocated next
        assert_eq!(peeked, allocated);
        assert_eq!(peeked, 10);
    }
}