use bytes::Bytes;
use crate::serde::DeserializeError;
use crate::serde::seq_block::SeqBlock;
use crate::{Record, Storage, StorageError};
/// Default number of sequence numbers covered by a newly allocated block.
///
/// When a new block is created it is sized to the larger of this value and
/// the number of sequences the triggering request still needs.
pub const DEFAULT_BLOCK_SIZE: u64 = 4096;
/// Errors that can be produced while loading sequence allocation state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SequenceError {
/// The underlying storage operation failed.
Storage(StorageError),
/// A stored value could not be decoded.
Deserialize(DeserializeError),
}
// Marker impl relying on the trait's default methods; the wrapped error is
// reported through `Display` only and is not exposed via `source()`.
impl std::error::Error for SequenceError {}
impl std::fmt::Display for SequenceError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
SequenceError::Storage(e) => write!(f, "storage error: {}", e),
SequenceError::Deserialize(e) => write!(f, "deserialize error: {}", e),
}
}
}
impl From<StorageError> for SequenceError {
fn from(err: StorageError) -> Self {
SequenceError::Storage(err)
}
}
impl From<DeserializeError> for SequenceError {
fn from(err: DeserializeError) -> Self {
SequenceError::Deserialize(err)
}
}
/// Convenience alias for results whose error type is [`SequenceError`].
pub type SequenceResult<T> = std::result::Result<T, SequenceError>;
/// In-memory allocation state for one sequence: the block currently being
/// drawn from and the next sequence number to hand out.
///
/// `Debug` is derived so this public type can be logged and can appear in
/// assertion messages.
#[derive(Debug, Clone)]
pub struct AllocatedSeqBlock {
    /// The block sequences are currently drawn from, or `None` when no block
    /// has been loaded or allocated yet.
    current_block: Option<SeqBlock>,
    /// The next sequence number that will be returned by an allocation.
    next_sequence: u64,
}
impl AllocatedSeqBlock {
    /// Number of sequence numbers still available in the current block.
    ///
    /// Returns 0 when there is no current block. The subtraction saturates,
    /// so a `next_sequence` at or past the block's end also yields 0.
    fn remaining(&self) -> u64 {
        self.current_block
            .as_ref()
            .map_or(0, |block| block.next_base().saturating_sub(self.next_sequence))
    }

    /// Reads the block record stored under `key`, if any, and builds the
    /// in-memory state from it.
    ///
    /// When a block is found, `next_sequence` is positioned at that block's
    /// `next_base()`, so `remaining()` is 0 right after loading and the next
    /// allocation starts a new block. Without a stored block the state starts
    /// at sequence 0 with no current block.
    ///
    /// # Errors
    ///
    /// Returns a [`SequenceError`] if the storage read fails or the stored
    /// value cannot be deserialized.
    async fn load(storage: &dyn Storage, key: &Bytes) -> SequenceResult<Self> {
        let current_block = storage
            .get(key.clone())
            .await?
            .map(|record| SeqBlock::deserialize(&record.value))
            .transpose()?;
        let next_sequence = match &current_block {
            Some(block) => block.next_base(),
            None => 0,
        };
        Ok(Self {
            current_block,
            next_sequence,
        })
    }
}
/// Hands out sequence numbers from blocks, producing a record whenever a new
/// block is started.
pub struct SequenceAllocator {
/// Storage key the block record is read from and built for.
key: Bytes,
/// Current block and the position of the next sequence within it.
block: AllocatedSeqBlock,
}
impl SequenceAllocator {
    /// Builds an allocator for `key` from whatever block record `storage`
    /// currently holds under that key.
    ///
    /// # Errors
    ///
    /// Returns a [`SequenceError`] if the storage read fails or the stored
    /// block cannot be deserialized.
    pub async fn load(storage: &dyn Storage, key: Bytes) -> SequenceResult<Self> {
        let block = AllocatedSeqBlock::load(storage, &key).await?;
        Ok(Self::new(key, block))
    }

    /// Assembles an allocator from a key and previously obtained block state
    /// (for example the pair returned by [`SequenceAllocator::freeze`]).
    pub fn new(key: Bytes, block: AllocatedSeqBlock) -> Self {
        Self { key, block }
    }

    /// Returns the sequence number the next allocation would start at,
    /// without consuming it.
    pub fn peek_next_sequence(&self) -> u64 {
        self.block.next_sequence
    }

    /// Allocates a single sequence number; shorthand for `allocate(1)`.
    pub fn allocate_one(&mut self) -> (u64, Option<Record>) {
        self.allocate(1)
    }

    /// Reserves `count` consecutive sequence numbers.
    ///
    /// Returns the first sequence of the reserved range and, when the request
    /// did not fit in the current block, the record describing the newly
    /// started block. This method performs no storage I/O itself; the record
    /// is only returned to the caller.
    pub fn allocate(&mut self, count: u64) -> (u64, Option<Record>) {
        let available = self.block.remaining();

        // Fast path: the current block can satisfy the whole request.
        if count <= available {
            let first = self.block.next_sequence;
            self.block.next_sequence = first + count;
            return (first, None);
        }

        // The request spills past the current block; only the part that does
        // not fit has to be covered by the new block.
        let shortfall = count - available;
        let (fresh_block, record) = self.init_next_block(shortfall);

        // With nothing left in the old block the range begins at the new
        // block's base; otherwise it begins at the current position.
        let first = match available {
            0 => fresh_block.base_sequence,
            _ => self.block.next_sequence,
        };

        self.block.next_sequence = fresh_block.base_sequence + shortfall;
        self.block.current_block = Some(fresh_block);
        (first, Some(record))
    }

    /// Creates the block that follows the current one, together with the
    /// record holding its serialized form under this allocator's key.
    ///
    /// The new block starts at the current block's `next_base()` (or 0 when
    /// there is no current block) and spans at least `min_count` sequences,
    /// never fewer than `DEFAULT_BLOCK_SIZE`.
    fn init_next_block(&self, min_count: u64) -> (SeqBlock, Record) {
        let base_sequence = self
            .block
            .current_block
            .as_ref()
            .map_or(0, |block| block.next_base());
        let block_size = DEFAULT_BLOCK_SIZE.max(min_count);
        let next_block = SeqBlock::new(base_sequence, block_size);
        let value: Bytes = next_block.serialize();
        (next_block, Record::new(self.key.clone(), value))
    }

    /// Consumes the allocator and returns its key and block state.
    pub fn freeze(self) -> (Bytes, AllocatedSeqBlock) {
        let Self { key, block } = self;
        (key, block)
    }
}
// Unit tests for block loading and sequence allocation. Each test receives a
// storage instance through the `#[storage_test]` attribute.
// NOTE(review): `Arc` is not imported in this module; presumably
// `#[storage_test]` brings it into scope or rewrites the signature — confirm.
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::Storage;
use opendata_macros::storage_test;
// Fixed two-byte key shared by every test.
fn test_key() -> Bytes {
Bytes::from_static(&[0x01, 0x02])
}
// Loading when nothing is stored under the key yields no current block and
// a next sequence of 0.
#[storage_test]
async fn should_load_none_when_no_block_allocated(storage: Arc<dyn Storage>) {
let block = AllocatedSeqBlock::load(storage.as_ref(), &test_key())
.await
.unwrap();
assert_eq!(block.next_sequence, 0);
assert_eq!(block.current_block, None);
}
// The very first allocation starts at 0 and returns a record for a block
// with base 0 and the default size.
#[storage_test]
async fn should_load_first_block(storage: Arc<dyn Storage>) {
let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
let (seq, record) = allocator.allocate(1);
assert_eq!(seq, 0);
assert!(record.is_some());
let record = record.unwrap();
let block = SeqBlock::deserialize(&record.value).unwrap();
assert_eq!(block.base_sequence, 0);
assert_eq!(block.block_size, DEFAULT_BLOCK_SIZE);
}
// A request larger than the default block size produces a block sized to
// the request; the following allocation starts right after it.
#[storage_test]
async fn should_allocate_larger_block_when_requested(storage: Arc<dyn Storage>) {
let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
let large_count = DEFAULT_BLOCK_SIZE * 2;
let (seq, record) = allocator.allocate(large_count);
assert_eq!(seq, 0);
assert!(record.is_some());
let record = record.unwrap();
let block = SeqBlock::deserialize(&record.value).unwrap();
assert_eq!(block.base_sequence, seq);
assert_eq!(block.block_size, DEFAULT_BLOCK_SIZE * 2);
let (seq, _) = allocator.allocate(1);
assert_eq!(seq, large_count);
}
// Allocating one sequence at a time across three blocks' worth of numbers
// yields exactly three block records with back-to-back bases.
#[storage_test]
async fn should_allocate_sequential_blocks(storage: Arc<dyn Storage>) {
let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
let mut puts = vec![];
for _ in 0..(DEFAULT_BLOCK_SIZE * 3) {
let (_, maybe_put) = allocator.allocate(1);
// Collect only the allocations that started a new block.
maybe_put.inspect(|r| puts.push(r.clone()));
}
let blocks: Vec<_> = puts
.into_iter()
.map(|r| SeqBlock::deserialize(&r.value).unwrap())
.collect();
assert_eq!(blocks.len(), 3);
assert_eq!(blocks[0].base_sequence, 0);
assert_eq!(blocks[1].base_sequence, DEFAULT_BLOCK_SIZE);
assert_eq!(blocks[2].base_sequence, DEFAULT_BLOCK_SIZE * 2);
}
// After the second block's record is written to storage, a freshly loaded
// allocator sees that block and continues from its end.
#[storage_test]
async fn should_recover_from_storage_on_initialize(storage: Arc<dyn Storage>) {
let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
// First block is consumed entirely; its record is intentionally not stored.
allocator.allocate(DEFAULT_BLOCK_SIZE);
let (_, put) = allocator.allocate(DEFAULT_BLOCK_SIZE);
storage.put(vec![put.unwrap().into()]).await.unwrap();
let mut allocator2 = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
assert_eq!(
allocator2.block.current_block,
Some(SeqBlock::new(DEFAULT_BLOCK_SIZE, DEFAULT_BLOCK_SIZE))
);
assert_eq!(allocator2.block.next_sequence, DEFAULT_BLOCK_SIZE * 2);
let (seq, put) = allocator2.allocate(DEFAULT_BLOCK_SIZE);
let block = SeqBlock::deserialize(&put.unwrap().value).unwrap();
assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE * 2);
assert_eq!(seq, DEFAULT_BLOCK_SIZE * 2);
}
// A reloaded allocator does not resume inside a partially used block: it
// positions itself at the stored block's end and allocates a new block.
#[storage_test]
async fn should_resume_from_next_block_on_initialize(storage: Arc<dyn Storage>) {
let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
// Only half of the first block is used before persisting its record.
let (_, put) = allocator.allocate(DEFAULT_BLOCK_SIZE / 2);
storage.put(vec![put.unwrap().into()]).await.unwrap();
let mut allocator2 = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
assert_eq!(
allocator2.block.current_block,
Some(SeqBlock::new(0, DEFAULT_BLOCK_SIZE))
);
assert_eq!(allocator2.block.next_sequence, DEFAULT_BLOCK_SIZE);
let (seq, put) = allocator2.allocate(DEFAULT_BLOCK_SIZE);
let block = SeqBlock::deserialize(&put.unwrap().value).unwrap();
assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE);
assert_eq!(seq, DEFAULT_BLOCK_SIZE);
}
// Single allocations return consecutive numbers starting at 0.
#[storage_test]
async fn should_allocate_sequential_sequence_numbers(storage: Arc<dyn Storage>) {
let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
let (seq1, _) = allocator.allocate_one();
let (seq2, _) = allocator.allocate_one();
let (seq3, _) = allocator.allocate_one();
assert_eq!(seq1, 0);
assert_eq!(seq2, 1);
assert_eq!(seq3, 2);
}
// A batch allocation advances the next sequence by the batch size.
#[storage_test]
async fn should_allocate_batch_of_sequences(storage: Arc<dyn Storage>) {
let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
let (seq1, _) = allocator.allocate(10);
let (seq2, _) = allocator.allocate(5);
assert_eq!(seq1, 0);
assert_eq!(seq2, 10);
}
// A batch that does not fit in the remainder of the current block starts at
// the current position and triggers a new block right after the old one.
#[storage_test]
async fn should_span_blocks_when_batch_exceeds_remaining(storage: Arc<dyn Storage>) {
let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
// Leave 10 sequences in the first block.
allocator.allocate(DEFAULT_BLOCK_SIZE - 10);
let (seq, put) = allocator.allocate(25);
assert_eq!(seq, DEFAULT_BLOCK_SIZE - 10);
assert!(put.is_some());
let block = SeqBlock::deserialize(&put.unwrap().value).unwrap();
assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE);
}
// Once a block is fully consumed, the next allocation starts a new block.
#[storage_test]
async fn should_allocate_new_block_when_exhausted(storage: Arc<dyn Storage>) {
let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
allocator.allocate(DEFAULT_BLOCK_SIZE);
let (seq, put) = allocator.allocate_one();
assert_eq!(seq, DEFAULT_BLOCK_SIZE);
assert!(put.is_some());
}
// Requesting exactly what is left in the block does not create a new block;
// the allocation after that does.
#[storage_test]
async fn should_allocate_exactly_remaining(storage: Arc<dyn Storage>) {
let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
allocator.allocate(100);
let remaining = DEFAULT_BLOCK_SIZE - 100;
let (seq, put) = allocator.allocate(remaining);
assert_eq!(seq, 100);
assert!(put.is_none());
let (next_seq, put) = allocator.allocate_one();
assert_eq!(next_seq, DEFAULT_BLOCK_SIZE);
assert!(put.is_some());
}
// A request larger than a whole block, issued from a partially used block,
// still yields one contiguous range starting at the current position.
#[storage_test]
async fn should_handle_large_batch_spanning_from_partial_block(storage: Arc<dyn Storage>) {
let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
// Leave 100 sequences in the first block.
allocator.allocate(DEFAULT_BLOCK_SIZE - 100);
let large_request = DEFAULT_BLOCK_SIZE + 500;
let (seq, _) = allocator.allocate(large_request);
assert_eq!(seq, DEFAULT_BLOCK_SIZE - 100);
let (next_seq, _) = allocator.allocate_one();
assert_eq!(next_seq, DEFAULT_BLOCK_SIZE - 100 + large_request);
}
// Peeking reports the value the next allocation returns and does not
// advance the allocator.
#[storage_test]
async fn should_peek_next_sequence_without_consuming(storage: Arc<dyn Storage>) {
let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
.await
.unwrap();
allocator.allocate(10);
let peeked = allocator.peek_next_sequence();
let (allocated, _) = allocator.allocate_one();
assert_eq!(peeked, allocated);
assert_eq!(peeked, 10);
}
}