use bytes::Bytes;
use common::{DEFAULT_BLOCK_SIZE, Record, SeqBlock};
use crate::error::Result;
use crate::serde::SEQ_BLOCK_KEY;
use crate::storage::LogStorageRead;
fn seq_block_key() -> Bytes {
Bytes::from_static(&SEQ_BLOCK_KEY)
}
#[derive(Debug, Clone)]
pub(crate) struct SequenceDelta {
base_sequence: u64,
count: u64,
new_block: Option<SeqBlock>,
}
impl SequenceDelta {
pub(crate) fn base_sequence(&self) -> u64 {
self.base_sequence
}
#[cfg(test)]
pub(crate) fn count(&self) -> u64 {
self.count
}
}
pub(crate) struct SequenceAllocator {
current_block: Option<SeqBlock>,
next_sequence: u64,
}
impl SequenceAllocator {
pub(crate) async fn open(storage: &LogStorageRead) -> Result<Self> {
let current_block = storage.get_seq_block().await?;
let next_sequence = current_block.as_ref().map(|b| b.next_base()).unwrap_or(0);
Ok(Self {
current_block,
next_sequence,
})
}
#[cfg(test)]
pub(crate) fn peek_next_sequence(&self) -> u64 {
self.next_sequence
}
pub(crate) fn build_delta(&self, count: u64, records: &mut Vec<Record>) -> SequenceDelta {
let remaining = self.remaining();
let base_sequence = self.next_sequence;
if remaining >= count {
SequenceDelta {
base_sequence,
count,
new_block: None,
}
} else {
let from_new_block = count - remaining;
let new_base = self
.current_block
.as_ref()
.map(|b| b.next_base())
.unwrap_or(0);
let block_size = from_new_block.max(DEFAULT_BLOCK_SIZE);
let new_block = SeqBlock::new(new_base, block_size);
let key = seq_block_key();
let value = new_block.serialize();
records.push(Record::new(key, value));
let actual_base = if remaining > 0 {
base_sequence
} else {
new_block.base_sequence
};
SequenceDelta {
base_sequence: actual_base,
count,
new_block: Some(new_block),
}
}
}
pub(crate) fn apply_delta(&mut self, delta: SequenceDelta) {
self.next_sequence = delta.base_sequence + delta.count;
if delta.new_block.is_some() {
self.current_block = delta.new_block;
}
}
fn remaining(&self) -> u64 {
match &self.current_block {
None => 0,
Some(block) => block.next_base().saturating_sub(self.next_sequence),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::LogStorage;
#[tokio::test]
async fn should_allocate_from_new_block_on_first_allocation() {
let storage = LogStorage::in_memory();
let allocator = SequenceAllocator::open(&storage.as_read()).await.unwrap();
let mut records = Vec::new();
let delta = allocator.build_delta(10, &mut records);
assert_eq!(delta.base_sequence(), 0);
assert_eq!(delta.count(), 10);
assert!(delta.new_block.is_some());
assert_eq!(records.len(), 1); }
#[tokio::test]
async fn should_allocate_from_current_block_when_sufficient() {
let storage = LogStorage::in_memory();
let mut allocator = SequenceAllocator::open(&storage.as_read()).await.unwrap();
let mut records1 = Vec::new();
let delta1 = allocator.build_delta(10, &mut records1);
allocator.apply_delta(delta1);
let mut records2 = Vec::new();
let delta2 = allocator.build_delta(20, &mut records2);
assert_eq!(delta2.base_sequence(), 10);
assert_eq!(delta2.count(), 20);
assert!(delta2.new_block.is_none()); assert_eq!(records2.len(), 0); }
#[tokio::test]
async fn should_allocate_new_block_when_exhausted() {
let storage = LogStorage::in_memory();
let mut allocator = SequenceAllocator::open(&storage.as_read()).await.unwrap();
let mut records1 = Vec::new();
let delta1 = allocator.build_delta(DEFAULT_BLOCK_SIZE, &mut records1);
allocator.apply_delta(delta1);
let mut records2 = Vec::new();
let delta2 = allocator.build_delta(10, &mut records2);
assert_eq!(delta2.base_sequence(), DEFAULT_BLOCK_SIZE);
assert!(delta2.new_block.is_some());
assert_eq!(records2.len(), 1);
}
#[tokio::test]
async fn should_span_blocks_when_allocation_exceeds_remaining() {
let storage = LogStorage::in_memory();
let mut allocator = SequenceAllocator::open(&storage.as_read()).await.unwrap();
let mut records1 = Vec::new();
let delta1 = allocator.build_delta(DEFAULT_BLOCK_SIZE - 10, &mut records1);
allocator.apply_delta(delta1);
let mut records2 = Vec::new();
let delta2 = allocator.build_delta(25, &mut records2);
assert_eq!(delta2.base_sequence(), DEFAULT_BLOCK_SIZE - 10);
assert_eq!(delta2.count(), 25);
assert!(delta2.new_block.is_some());
}
#[tokio::test]
async fn should_update_state_on_apply_delta() {
let storage = LogStorage::in_memory();
let mut allocator = SequenceAllocator::open(&storage.as_read()).await.unwrap();
let mut records = Vec::new();
let delta = allocator.build_delta(10, &mut records);
allocator.apply_delta(delta);
assert_eq!(allocator.peek_next_sequence(), 10);
}
#[tokio::test]
async fn should_recover_from_storage() {
let storage = LogStorage::in_memory();
let block = SeqBlock::new(1000, 500);
storage.write_seq_block(&block).await.unwrap();
let allocator = SequenceAllocator::open(&storage.as_read()).await.unwrap();
assert_eq!(allocator.peek_next_sequence(), 1500);
}
#[tokio::test]
async fn should_allocate_larger_block_when_needed() {
let storage = LogStorage::in_memory();
let allocator = SequenceAllocator::open(&storage.as_read()).await.unwrap();
let mut records = Vec::new();
let large_count = DEFAULT_BLOCK_SIZE * 2;
let delta = allocator.build_delta(large_count, &mut records);
assert!(delta.new_block.is_some());
assert_eq!(delta.new_block.as_ref().unwrap().block_size, large_count);
}
}