1use bytes::Bytes;
48
49use crate::serde::DeserializeError;
50use crate::serde::seq_block::SeqBlock;
51use crate::{Record, Storage, StorageError};
52
/// Smallest number of sequence numbers reserved when a new block is
/// initialized; a request that needs more gets a block sized to fit it.
pub const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
55
56#[derive(Debug, Clone, PartialEq, Eq)]
58pub enum SequenceError {
59 Storage(StorageError),
61 Deserialize(DeserializeError),
63}
64
65impl std::error::Error for SequenceError {}
66
67impl std::fmt::Display for SequenceError {
68 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
69 match self {
70 SequenceError::Storage(e) => write!(f, "storage error: {}", e),
71 SequenceError::Deserialize(e) => write!(f, "deserialize error: {}", e),
72 }
73 }
74}
75
76impl From<StorageError> for SequenceError {
77 fn from(err: StorageError) -> Self {
78 SequenceError::Storage(err)
79 }
80}
81
82impl From<DeserializeError> for SequenceError {
83 fn from(err: DeserializeError) -> Self {
84 SequenceError::Deserialize(err)
85 }
86}
87
88pub type SequenceResult<T> = std::result::Result<T, SequenceError>;
90
91pub struct AllocatedSeqBlock {
92 current_block: Option<SeqBlock>,
93 next_sequence: u64,
94}
95
96impl AllocatedSeqBlock {
97 fn remaining(&self) -> u64 {
98 match &self.current_block {
99 None => 0,
100 Some(block) => block.next_base().saturating_sub(self.next_sequence),
101 }
102 }
103
104 async fn load(storage: &dyn Storage, key: &Bytes) -> SequenceResult<Self> {
105 let current_block = match storage.get(key.clone()).await? {
106 Some(record) => Some(SeqBlock::deserialize(&record.value)?),
107 None => None,
108 };
109 let next_sequence = current_block
110 .as_ref()
111 .map(|b| b.next_base())
114 .unwrap_or(0);
115 Ok(Self {
116 current_block,
117 next_sequence,
118 })
119 }
120}
121
122pub struct SequenceAllocator {
133 key: Bytes,
134 block: AllocatedSeqBlock,
135}
136
137impl SequenceAllocator {
138 pub async fn load(storage: &dyn Storage, key: Bytes) -> SequenceResult<Self> {
140 let block = AllocatedSeqBlock::load(storage, &key).await?;
141 Ok(Self { key, block })
142 }
143
144 pub fn peek_next_sequence(&self) -> u64 {
148 self.block.next_sequence
149 }
150
151 pub fn allocate_one(&mut self) -> (u64, Option<Record>) {
155 self.allocate(1)
156 }
157
158 pub fn allocate(&mut self, count: u64) -> (u64, Option<Record>) {
170 let remaining = self.block.remaining();
171
172 if remaining >= count {
174 let base_sequence = self.block.next_sequence;
175 self.block.next_sequence += count;
176 return (base_sequence, None);
177 }
178
179 let from_new_block = count - remaining;
181 let (new_block, record) = self.init_next_block(from_new_block);
182
183 let base_sequence = if remaining > 0 {
185 self.block.next_sequence
186 } else {
187 new_block.base_sequence
188 };
189
190 self.block.next_sequence = new_block.base_sequence + from_new_block;
191 self.block.current_block = Some(new_block);
192
193 (base_sequence, Some(record))
194 }
195
196 fn init_next_block(&self, min_count: u64) -> (SeqBlock, Record) {
197 let base_sequence = match &self.block.current_block {
198 Some(block) => block.next_base(),
199 None => 0,
200 };
201
202 let block_size = min_count.max(DEFAULT_BLOCK_SIZE);
203 let new_block = SeqBlock::new(base_sequence, block_size);
204
205 let value: Bytes = new_block.serialize();
206 let record = Record::new(self.key.clone(), value);
207 (new_block, record)
208 }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::in_memory::InMemoryStorage;
    use std::sync::Arc;

    /// Key under which every test keeps its block record.
    fn test_key() -> Bytes {
        Bytes::from_static(&[0x01, 0x02])
    }

    /// Loads an allocator for `test_key()` from the given storage.
    async fn load_allocator(storage: &dyn Storage) -> SequenceAllocator {
        SequenceAllocator::load(storage, test_key())
            .await
            .unwrap()
    }

    /// Loads an allocator backed by a brand-new, empty in-memory store.
    async fn fresh_allocator() -> SequenceAllocator {
        let storage = Arc::new(InMemoryStorage::new());
        load_allocator(storage.as_ref()).await
    }

    /// Decodes the block carried by a put record.
    fn decode(put: &Record) -> SeqBlock {
        SeqBlock::deserialize(&put.value).unwrap()
    }

    #[tokio::test]
    async fn should_load_none_when_no_block_allocated() {
        // given an empty store
        let storage = Arc::new(InMemoryStorage::new());

        // when
        let loaded = AllocatedSeqBlock::load(storage.as_ref(), &test_key())
            .await
            .unwrap();

        // then
        assert_eq!(loaded.next_sequence, 0);
        assert!(loaded.current_block.is_none());
    }

    #[tokio::test]
    async fn should_load_first_block() {
        let mut allocator = fresh_allocator().await;

        let (seq, put) = allocator.allocate(1);

        assert_eq!(seq, 0);
        assert!(put.is_some());
        let block = decode(&put.unwrap());
        assert_eq!(block.base_sequence, 0);
        assert_eq!(block.block_size, DEFAULT_BLOCK_SIZE);
    }

    #[tokio::test]
    async fn should_allocate_larger_block_when_requested() {
        let mut allocator = fresh_allocator().await;
        let large_count = 2 * DEFAULT_BLOCK_SIZE;

        let (seq, put) = allocator.allocate(large_count);

        assert_eq!(seq, 0);
        assert!(put.is_some());
        let block = decode(&put.unwrap());
        assert_eq!(block.base_sequence, seq);
        assert_eq!(block.block_size, 2 * DEFAULT_BLOCK_SIZE);

        let (following, _) = allocator.allocate(1);
        assert_eq!(following, large_count);
    }

    #[tokio::test]
    async fn should_allocate_sequential_blocks() {
        let mut allocator = fresh_allocator().await;
        let mut puts = Vec::new();

        // Drain three full blocks one sequence at a time, keeping every put.
        for _ in 0..(DEFAULT_BLOCK_SIZE * 3) {
            if let (_, Some(put)) = allocator.allocate(1) {
                puts.push(put);
            }
        }

        let blocks: Vec<SeqBlock> = puts.iter().map(decode).collect();
        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0].base_sequence, 0);
        assert_eq!(blocks[1].base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(blocks[2].base_sequence, DEFAULT_BLOCK_SIZE * 2);
    }

    #[tokio::test]
    async fn should_recover_from_storage_on_initialize() {
        // given: two blocks allocated, only the second one persisted
        let storage: Arc<dyn Storage> = Arc::new(InMemoryStorage::new());
        let mut original = load_allocator(storage.as_ref()).await;
        original.allocate(DEFAULT_BLOCK_SIZE);
        let (_, second_put) = original.allocate(DEFAULT_BLOCK_SIZE);
        storage.put(vec![second_put.unwrap()]).await.unwrap();

        // when: a new allocator is loaded from the same store
        let mut reloaded = load_allocator(storage.as_ref()).await;

        // then: it resumes after the persisted block
        assert_eq!(
            reloaded.block.current_block,
            Some(SeqBlock::new(DEFAULT_BLOCK_SIZE, DEFAULT_BLOCK_SIZE))
        );
        assert_eq!(reloaded.block.next_sequence, DEFAULT_BLOCK_SIZE * 2);

        let (seq, put) = reloaded.allocate(DEFAULT_BLOCK_SIZE);
        let block = decode(&put.unwrap());
        assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE * 2);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE * 2);
    }

    #[tokio::test]
    async fn should_resume_from_next_block_on_initialize() {
        // given: a persisted block that was only half consumed
        let storage: Arc<dyn Storage> = Arc::new(InMemoryStorage::new());
        let mut original = load_allocator(storage.as_ref()).await;
        let (_, first_put) = original.allocate(DEFAULT_BLOCK_SIZE / 2);
        storage.put(vec![first_put.unwrap()]).await.unwrap();

        // when
        let mut reloaded = load_allocator(storage.as_ref()).await;

        // then: the unused half is skipped and allocation resumes past the block
        assert_eq!(
            reloaded.block.current_block,
            Some(SeqBlock::new(0, DEFAULT_BLOCK_SIZE))
        );
        assert_eq!(reloaded.block.next_sequence, DEFAULT_BLOCK_SIZE);

        let (seq, put) = reloaded.allocate(DEFAULT_BLOCK_SIZE);
        let block = decode(&put.unwrap());
        assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
    }

    #[tokio::test]
    async fn should_allocate_sequential_sequence_numbers() {
        let mut allocator = fresh_allocator().await;

        let (first, _) = allocator.allocate_one();
        let (second, _) = allocator.allocate_one();
        let (third, _) = allocator.allocate_one();

        assert_eq!(first, 0);
        assert_eq!(second, 1);
        assert_eq!(third, 2);
    }

    #[tokio::test]
    async fn should_allocate_batch_of_sequences() {
        let mut allocator = fresh_allocator().await;

        let (first_batch, _) = allocator.allocate(10);
        let (second_batch, _) = allocator.allocate(5);

        assert_eq!(first_batch, 0);
        assert_eq!(second_batch, 10);
    }

    #[tokio::test]
    async fn should_span_blocks_when_batch_exceeds_remaining() {
        let mut allocator = fresh_allocator().await;
        // Leave exactly 10 sequences in the first block.
        allocator.allocate(DEFAULT_BLOCK_SIZE - 10);

        let (seq, put) = allocator.allocate(25);

        assert_eq!(seq, DEFAULT_BLOCK_SIZE - 10);
        assert!(put.is_some());
        let block = decode(&put.unwrap());
        assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE);
    }

    #[tokio::test]
    async fn should_allocate_new_block_when_exhausted() {
        let mut allocator = fresh_allocator().await;
        allocator.allocate(DEFAULT_BLOCK_SIZE);

        let (seq, put) = allocator.allocate_one();

        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
        assert!(put.is_some());
    }

    #[tokio::test]
    async fn should_allocate_exactly_remaining() {
        let mut allocator = fresh_allocator().await;
        allocator.allocate(100);
        let remaining = DEFAULT_BLOCK_SIZE - 100;

        // Consuming exactly what is left must not initialize a new block.
        let (seq, put) = allocator.allocate(remaining);
        assert_eq!(seq, 100);
        assert!(put.is_none());

        // The very next allocation does.
        let (next_seq, put) = allocator.allocate_one();
        assert_eq!(next_seq, DEFAULT_BLOCK_SIZE);
        assert!(put.is_some());
    }

    #[tokio::test]
    async fn should_handle_large_batch_spanning_from_partial_block() {
        let mut allocator = fresh_allocator().await;
        // Leave 100 sequences in the first block.
        allocator.allocate(DEFAULT_BLOCK_SIZE - 100);
        let large_request = DEFAULT_BLOCK_SIZE + 500;

        let (seq, _) = allocator.allocate(large_request);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE - 100);

        let (next_seq, _) = allocator.allocate_one();
        assert_eq!(next_seq, DEFAULT_BLOCK_SIZE - 100 + large_request);
    }

    #[tokio::test]
    async fn should_peek_next_sequence_without_consuming() {
        let mut allocator = fresh_allocator().await;
        allocator.allocate(10);

        let peeked = allocator.peek_next_sequence();
        let (allocated, _) = allocator.allocate_one();

        assert_eq!(peeked, allocated);
        assert_eq!(peeked, 10);
    }
}