Skip to main content

common/
sequence.rs

//! Sequence number allocation and persistence.
//!
//! This module provides components for allocating monotonically increasing
//! sequence numbers with crash recovery. It provides:
//!
//! **[`SequenceAllocator`]**: Allocates individual sequence numbers from sequence
//! blocks reserved in storage. It tracks the currently reserved block and hands out
//! sequence numbers from it. When the block runs out of sequence numbers, the
//! allocator reserves a new block and includes the corresponding [`Record`] in the
//! return value of [`SequenceAllocator::allocate`]. It is up to the caller to ensure
//! that the record is persisted in storage. This allows the allocator to be used
//! from implementations of `Delta` by including the sequence block writes in
//! persisted deltas.
//!
//! # Design
//!
//! Block-based allocation reduces write amplification by pre-allocating ranges
//! of sequence numbers instead of persisting after every allocation. The allocator
//! tracks allocations via [`SeqBlock`] records:
//!
//! - `base_sequence`: Starting sequence number of the allocated block
//! - `block_size`: Number of sequence numbers in the block
//!
//! On crash recovery, the next block starts at `base_sequence + block_size`,
//! ensuring monotonicity even if some sequences in the recovered block were never
//! handed out.
//!
//! # Usage
//!
//! Each system provides its own key format for storing the `SeqBlock` record:
//!
//! ```ignore
//! use bytes::Bytes;
//! use common::sequence::SequenceAllocator;
//!
//! // Domain-specific key (e.g., Log uses [0x01, 0x02])
//! const MY_SEQ_BLOCK_KEY: &[u8] = &[0x01, 0x02];
//!
//! let key = Bytes::from_static(MY_SEQ_BLOCK_KEY);
//! let mut allocator = SequenceAllocator::load(storage.as_ref(), key).await?;
//!
//! let (seq, put) = allocator.allocate_one();
//! if let Some(put) = put {
//!     // A new block was reserved: persist it before relying on `seq`.
//!     storage.put(vec![put.into()]).await?;
//! }
//! ```
46
47use bytes::Bytes;
48
49use crate::serde::DeserializeError;
50use crate::serde::seq_block::SeqBlock;
51use crate::{Record, Storage, StorageError};
52
/// Minimum number of sequence numbers reserved whenever a new block is allocated.
///
/// Requests larger than this get a block sized to fit the whole request.
pub const DEFAULT_BLOCK_SIZE: u64 = 4_096;
55
56/// Error type for sequence allocation operations.
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub enum SequenceError {
59    /// Storage operation failed
60    Storage(StorageError),
61    /// Deserialization failed
62    Deserialize(DeserializeError),
63}
64
65impl std::error::Error for SequenceError {}
66
67impl std::fmt::Display for SequenceError {
68    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
69        match self {
70            SequenceError::Storage(e) => write!(f, "storage error: {}", e),
71            SequenceError::Deserialize(e) => write!(f, "deserialize error: {}", e),
72        }
73    }
74}
75
76impl From<StorageError> for SequenceError {
77    fn from(err: StorageError) -> Self {
78        SequenceError::Storage(err)
79    }
80}
81
82impl From<DeserializeError> for SequenceError {
83    fn from(err: DeserializeError) -> Self {
84        SequenceError::Deserialize(err)
85    }
86}
87
88/// Result type alias for sequence allocation operations.
89pub type SequenceResult<T> = std::result::Result<T, SequenceError>;
90
91#[derive(Clone)]
92pub struct AllocatedSeqBlock {
93    current_block: Option<SeqBlock>,
94    next_sequence: u64,
95}
96
97impl AllocatedSeqBlock {
98    fn remaining(&self) -> u64 {
99        match &self.current_block {
100            None => 0,
101            Some(block) => block.next_base().saturating_sub(self.next_sequence),
102        }
103    }
104
105    async fn load(storage: &dyn Storage, key: &Bytes) -> SequenceResult<Self> {
106        let current_block = match storage.get(key.clone()).await? {
107            Some(record) => Some(SeqBlock::deserialize(&record.value)?),
108            None => None,
109        };
110        let next_sequence = current_block
111            .as_ref()
112            // in the event that block exists, set the next sequence to the next base
113            // so that a new block is immediately allocated
114            .map(|b| b.next_base())
115            .unwrap_or(0);
116        Ok(Self {
117            current_block,
118            next_sequence,
119        })
120    }
121}
122
123/// Allocates sequence numbers from pre-allocated blocks.
124///
125/// This allocator manages the lifecycle of sequence number allocation:
126/// - Initialize from storage on startup
127/// - Allocate individual sequence numbers
128/// - Persist new blocks when the current block is exhausted
129///
130/// # Thread Safety
131///
132/// This struct is not inherently thread safe. It should be owned by a single writing task
133pub struct SequenceAllocator {
134    key: Bytes,
135    block: AllocatedSeqBlock,
136}
137
138impl SequenceAllocator {
139    /// Creates a new allocator using the given storage and key.
140    pub async fn load(storage: &dyn Storage, key: Bytes) -> SequenceResult<Self> {
141        let block = AllocatedSeqBlock::load(storage, &key).await?;
142        Ok(Self { key, block })
143    }
144
145    pub fn new(key: Bytes, block: AllocatedSeqBlock) -> Self {
146        Self { key, block }
147    }
148
149    /// Returns the next sequence number that would be allocated.
150    ///
151    /// This does not consume any sequences; it just peeks at the current state.
152    pub fn peek_next_sequence(&self) -> u64 {
153        self.block.next_sequence
154    }
155
156    /// Allocates a single sequence number.
157    ///
158    /// Convenience method equivalent to `allocate(1)`.
159    pub fn allocate_one(&mut self) -> (u64, Option<Record>) {
160        self.allocate(1)
161    }
162
163    /// Allocates a contiguous range of sequence numbers.
164    ///
165    /// Returns a pair where the first element is the first sequence number in
166    /// the allocated range. The caller can use sequences `base..base+count`.
167    /// The second element is an `Option<Record>`. If the current block doesn't
168    /// have enough sequences remaining, the remaining sequences are used and a
169    /// new block is allocated for the rest. The returned record, if present,
170    /// must be put to storage to reserve the full sequence range. Note that since
171    /// blocks are contiguous (each starts where the previous ends), the returned
172    /// sequence range is always contiguous.
173    ///
174    pub fn allocate(&mut self, count: u64) -> (u64, Option<Record>) {
175        let remaining = self.block.remaining();
176
177        // If current block can satisfy the request, use it
178        if remaining >= count {
179            let base_sequence = self.block.next_sequence;
180            self.block.next_sequence += count;
181            return (base_sequence, None);
182        }
183
184        // Need a new block. Use remaining sequences from current block first.
185        let from_new_block = count - remaining;
186        let (new_block, record) = self.init_next_block(from_new_block);
187
188        // Base sequence is either from current block (if any remaining) or new block
189        let base_sequence = if remaining > 0 {
190            self.block.next_sequence
191        } else {
192            new_block.base_sequence
193        };
194
195        self.block.next_sequence = new_block.base_sequence + from_new_block;
196        self.block.current_block = Some(new_block);
197
198        (base_sequence, Some(record))
199    }
200
201    fn init_next_block(&self, min_count: u64) -> (SeqBlock, Record) {
202        let base_sequence = match &self.block.current_block {
203            Some(block) => block.next_base(),
204            None => 0,
205        };
206
207        let block_size = min_count.max(DEFAULT_BLOCK_SIZE);
208        let new_block = SeqBlock::new(base_sequence, block_size);
209
210        let value: Bytes = new_block.serialize();
211        let record = Record::new(self.key.clone(), value);
212        (new_block, record)
213    }
214
215    pub fn freeze(self) -> (Bytes, AllocatedSeqBlock) {
216        (self.key, self.block)
217    }
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::Storage;
    use opendata_macros::storage_test;

    fn test_key() -> Bytes {
        Bytes::from_static(&[0x01, 0x02])
    }

    #[storage_test]
    async fn should_load_none_when_no_block_allocated(storage: Arc<dyn Storage>) {
        // when
        let block = AllocatedSeqBlock::load(storage.as_ref(), &test_key())
            .await
            .unwrap();

        // then
        assert_eq!(block.next_sequence, 0);
        assert_eq!(block.current_block, None);
    }

    #[storage_test]
    async fn should_load_first_block(storage: Arc<dyn Storage>) {
        // given:
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // when:
        let (seq, record) = allocator.allocate(1);

        // then:
        assert_eq!(seq, 0);
        assert!(record.is_some());
        let record = record.unwrap();
        let block = SeqBlock::deserialize(&record.value).unwrap();
        assert_eq!(block.base_sequence, 0);
        assert_eq!(block.block_size, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_larger_block_when_requested(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // when:
        let large_count = DEFAULT_BLOCK_SIZE * 2;
        let (seq, record) = allocator.allocate(large_count);

        // then:
        assert_eq!(seq, 0);
        assert!(record.is_some());
        let record = record.unwrap();
        let block = SeqBlock::deserialize(&record.value).unwrap();
        assert_eq!(block.base_sequence, seq);
        assert_eq!(block.block_size, DEFAULT_BLOCK_SIZE * 2);
        let (seq, _) = allocator.allocate(1);
        assert_eq!(seq, large_count);
    }

    #[storage_test]
    async fn should_allocate_sequential_blocks(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();
        let mut puts = vec![];

        // when
        for _ in 0..(DEFAULT_BLOCK_SIZE * 3) {
            let (_, maybe_put) = allocator.allocate(1);
            // Collect only the allocations that reserved a new block.
            puts.extend(maybe_put);
        }

        // then
        let blocks: Vec<_> = puts
            .into_iter()
            .map(|r| SeqBlock::deserialize(&r.value).unwrap())
            .collect();
        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0].base_sequence, 0);
        assert_eq!(blocks[1].base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(blocks[2].base_sequence, DEFAULT_BLOCK_SIZE * 2);
    }

    #[storage_test]
    async fn should_recover_from_storage_on_initialize(storage: Arc<dyn Storage>) {
        // First instance allocates some blocks
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();
        allocator.allocate(DEFAULT_BLOCK_SIZE);
        let (_, put) = allocator.allocate(DEFAULT_BLOCK_SIZE);
        storage.put(vec![put.unwrap().into()]).await.unwrap();

        // when: Second instance should recover
        let mut allocator2 = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // then:
        assert_eq!(
            allocator2.block.current_block,
            Some(SeqBlock::new(DEFAULT_BLOCK_SIZE, DEFAULT_BLOCK_SIZE))
        );
        assert_eq!(allocator2.block.next_sequence, DEFAULT_BLOCK_SIZE * 2);
        let (seq, put) = allocator2.allocate(DEFAULT_BLOCK_SIZE);
        let block = SeqBlock::deserialize(&put.unwrap().value).unwrap();
        assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE * 2);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE * 2);
    }

    #[storage_test]
    async fn should_resume_from_next_block_on_initialize(storage: Arc<dyn Storage>) {
        // First instance allocates some blocks
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();
        let (_, put) = allocator.allocate(DEFAULT_BLOCK_SIZE / 2);
        storage.put(vec![put.unwrap().into()]).await.unwrap();

        // when: Second instance should recover
        let mut allocator2 = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // then:
        assert_eq!(
            allocator2.block.current_block,
            Some(SeqBlock::new(0, DEFAULT_BLOCK_SIZE))
        );
        assert_eq!(allocator2.block.next_sequence, DEFAULT_BLOCK_SIZE);
        let (seq, put) = allocator2.allocate(DEFAULT_BLOCK_SIZE);
        let block = SeqBlock::deserialize(&put.unwrap().value).unwrap();
        assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_sequential_sequence_numbers(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // when
        let (seq1, _) = allocator.allocate_one();
        let (seq2, _) = allocator.allocate_one();
        let (seq3, _) = allocator.allocate_one();

        // then
        assert_eq!(seq1, 0);
        assert_eq!(seq2, 1);
        assert_eq!(seq3, 2);
    }

    #[storage_test]
    async fn should_allocate_batch_of_sequences(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // when
        let (seq1, _) = allocator.allocate(10);
        let (seq2, _) = allocator.allocate(5);

        // then
        assert_eq!(seq1, 0);
        assert_eq!(seq2, 10);
    }

    #[storage_test]
    async fn should_span_blocks_when_batch_exceeds_remaining(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // Allocate most of the first block
        allocator.allocate(DEFAULT_BLOCK_SIZE - 10);

        // when - allocate more than remaining (10 left, request 25)
        let (seq, put) = allocator.allocate(25);

        // then - should get contiguous sequences starting at remaining position
        assert_eq!(seq, DEFAULT_BLOCK_SIZE - 10);
        assert!(put.is_some());
        let block = SeqBlock::deserialize(&put.unwrap().value).unwrap();
        assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_new_block_when_exhausted(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // when - allocate entire first block plus one more
        allocator.allocate(DEFAULT_BLOCK_SIZE);
        let (seq, put) = allocator.allocate_one();

        // then - should be first sequence of second block
        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
        assert!(put.is_some());
    }

    #[storage_test]
    async fn should_allocate_exactly_remaining(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // Use some sequences
        allocator.allocate(100);
        let remaining = DEFAULT_BLOCK_SIZE - 100;

        // when - allocate exactly the remaining amount
        let (seq, put) = allocator.allocate(remaining);

        // then - should use up exactly the current block
        assert_eq!(seq, 100);
        assert!(put.is_none());

        // and next allocation should come from new block
        let (next_seq, put) = allocator.allocate_one();
        assert_eq!(next_seq, DEFAULT_BLOCK_SIZE);
        assert!(put.is_some());
    }

    #[storage_test]
    async fn should_handle_large_batch_spanning_from_partial_block(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // Use most of first block, leaving 100
        allocator.allocate(DEFAULT_BLOCK_SIZE - 100);

        // when - request much more than remaining (spans into new block)
        let large_request = DEFAULT_BLOCK_SIZE + 500;
        let (seq, _) = allocator.allocate(large_request);

        // then - should start at the remaining position
        assert_eq!(seq, DEFAULT_BLOCK_SIZE - 100);

        // and next allocation continues after the large batch
        let (next_seq, _) = allocator.allocate_one();
        assert_eq!(next_seq, DEFAULT_BLOCK_SIZE - 100 + large_request);
    }

    #[storage_test]
    async fn should_peek_next_sequence_without_consuming(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // Allocate some sequences
        allocator.allocate(10);

        // when
        let peeked = allocator.peek_next_sequence();
        let (allocated, _) = allocator.allocate_one();

        // then - peeked should match what was allocated
        assert_eq!(peeked, allocated);
        assert_eq!(peeked, 10);
    }

    #[storage_test]
    async fn should_not_reserve_block_for_zero_count(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
            .await
            .unwrap();

        // when - an empty range is requested
        let (seq, put) = allocator.allocate(0);

        // then - nothing is reserved and nothing is consumed
        assert_eq!(seq, 0);
        assert!(put.is_none());
        assert_eq!(allocator.peek_next_sequence(), 0);
    }
}