1use bytes::Bytes;
48
49use crate::serde::DeserializeError;
50use crate::serde::seq_block::SeqBlock;
51use crate::{Record, Storage, StorageError};
52
53pub const DEFAULT_BLOCK_SIZE: u64 = 4096;
55
56#[derive(Debug, Clone, PartialEq, Eq)]
58pub enum SequenceError {
59 Storage(StorageError),
61 Deserialize(DeserializeError),
63}
64
65impl std::error::Error for SequenceError {}
66
67impl std::fmt::Display for SequenceError {
68 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
69 match self {
70 SequenceError::Storage(e) => write!(f, "storage error: {}", e),
71 SequenceError::Deserialize(e) => write!(f, "deserialize error: {}", e),
72 }
73 }
74}
75
76impl From<StorageError> for SequenceError {
77 fn from(err: StorageError) -> Self {
78 SequenceError::Storage(err)
79 }
80}
81
82impl From<DeserializeError> for SequenceError {
83 fn from(err: DeserializeError) -> Self {
84 SequenceError::Deserialize(err)
85 }
86}
87
88pub type SequenceResult<T> = std::result::Result<T, SequenceError>;
90
91#[derive(Clone, Debug)]
92pub struct AllocatedSeqBlock {
93 current_block: Option<SeqBlock>,
94 next_sequence: u64,
95}
96
97impl AllocatedSeqBlock {
98 fn remaining(&self) -> u64 {
99 match &self.current_block {
100 None => 0,
101 Some(block) => block.next_base().saturating_sub(self.next_sequence),
102 }
103 }
104
105 async fn load(storage: &dyn Storage, key: &Bytes) -> SequenceResult<Self> {
106 let current_block = match storage.get(key.clone()).await? {
107 Some(record) => Some(SeqBlock::deserialize(&record.value)?),
108 None => None,
109 };
110 let next_sequence = current_block
111 .as_ref()
112 .map(|b| b.next_base())
115 .unwrap_or(0);
116 Ok(Self {
117 current_block,
118 next_sequence,
119 })
120 }
121}
122
123#[derive(Debug)]
134pub struct SequenceAllocator {
135 key: Bytes,
136 block: AllocatedSeqBlock,
137}
138
139impl SequenceAllocator {
140 pub async fn load(storage: &dyn Storage, key: Bytes) -> SequenceResult<Self> {
142 let block = AllocatedSeqBlock::load(storage, &key).await?;
143 Ok(Self { key, block })
144 }
145
146 pub fn new(key: Bytes, block: AllocatedSeqBlock) -> Self {
147 Self { key, block }
148 }
149
150 pub fn peek_next_sequence(&self) -> u64 {
154 self.block.next_sequence
155 }
156
157 pub fn allocate_one(&mut self) -> (u64, Option<Record>) {
161 self.allocate(1)
162 }
163
164 pub fn allocate(&mut self, count: u64) -> (u64, Option<Record>) {
176 let remaining = self.block.remaining();
177
178 if remaining >= count {
180 let base_sequence = self.block.next_sequence;
181 self.block.next_sequence += count;
182 return (base_sequence, None);
183 }
184
185 let from_new_block = count - remaining;
187 let (new_block, record) = self.init_next_block(from_new_block);
188
189 let base_sequence = if remaining > 0 {
191 self.block.next_sequence
192 } else {
193 new_block.base_sequence
194 };
195
196 self.block.next_sequence = new_block.base_sequence + from_new_block;
197 self.block.current_block = Some(new_block);
198
199 (base_sequence, Some(record))
200 }
201
202 fn init_next_block(&self, min_count: u64) -> (SeqBlock, Record) {
203 let base_sequence = match &self.block.current_block {
204 Some(block) => block.next_base(),
205 None => 0,
206 };
207
208 let block_size = min_count.max(DEFAULT_BLOCK_SIZE);
209 let new_block = SeqBlock::new(base_sequence, block_size);
210
211 let value: Bytes = new_block.serialize();
212 let record = Record::new(self.key.clone(), value);
213 (new_block, record)
214 }
215
216 pub fn freeze(self) -> (Bytes, AllocatedSeqBlock) {
217 (self.key, self.block)
218 }
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224 use crate::storage::Storage;
225 use opendata_macros::storage_test;
226
227 fn test_key() -> Bytes {
228 Bytes::from_static(&[0x01, 0x02])
229 }
230
231 #[storage_test]
232 async fn should_load_none_when_no_block_allocated(storage: Arc<dyn Storage>) {
233 let block = AllocatedSeqBlock::load(storage.as_ref(), &test_key())
235 .await
236 .unwrap();
237
238 assert_eq!(block.next_sequence, 0);
240 assert_eq!(block.current_block, None);
241 }
242
243 #[storage_test]
244 async fn should_load_first_block(storage: Arc<dyn Storage>) {
245 let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
247 .await
248 .unwrap();
249
250 let (seq, record) = allocator.allocate(1);
252
253 assert_eq!(seq, 0);
255 assert!(record.is_some());
256 let record = record.unwrap();
257 let block = SeqBlock::deserialize(&record.value).unwrap();
258 assert_eq!(block.base_sequence, 0);
259 assert_eq!(block.block_size, DEFAULT_BLOCK_SIZE);
260 }
261
262 #[storage_test]
263 async fn should_allocate_larger_block_when_requested(storage: Arc<dyn Storage>) {
264 let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
266 .await
267 .unwrap();
268
269 let large_count = DEFAULT_BLOCK_SIZE * 2;
271 let (seq, record) = allocator.allocate(large_count);
272
273 assert_eq!(seq, 0);
275 assert!(record.is_some());
276 let record = record.unwrap();
277 let block = SeqBlock::deserialize(&record.value).unwrap();
278 assert_eq!(block.base_sequence, seq);
279 assert_eq!(block.block_size, DEFAULT_BLOCK_SIZE * 2);
280 let (seq, _) = allocator.allocate(1);
281 assert_eq!(seq, large_count);
282 }
283
284 #[storage_test]
285 async fn should_allocate_sequential_blocks(storage: Arc<dyn Storage>) {
286 let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
288 .await
289 .unwrap();
290 let mut puts = vec![];
291
292 for _ in 0..(DEFAULT_BLOCK_SIZE * 3) {
294 let (_, maybe_put) = allocator.allocate(1);
295 maybe_put.inspect(|r| puts.push(r.clone()));
296 }
297
298 let blocks: Vec<_> = puts
300 .into_iter()
301 .map(|r| SeqBlock::deserialize(&r.value).unwrap())
302 .collect();
303 assert_eq!(blocks.len(), 3);
304 assert_eq!(blocks[0].base_sequence, 0);
305 assert_eq!(blocks[1].base_sequence, DEFAULT_BLOCK_SIZE);
306 assert_eq!(blocks[2].base_sequence, DEFAULT_BLOCK_SIZE * 2);
307 }
308
309 #[storage_test]
310 async fn should_recover_from_storage_on_initialize(storage: Arc<dyn Storage>) {
311 let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
313 .await
314 .unwrap();
315 allocator.allocate(DEFAULT_BLOCK_SIZE);
316 let (_, put) = allocator.allocate(DEFAULT_BLOCK_SIZE);
317 storage.put(vec![put.unwrap().into()]).await.unwrap();
318
319 let mut allocator2 = SequenceAllocator::load(storage.as_ref(), test_key())
321 .await
322 .unwrap();
323
324 assert_eq!(
326 allocator2.block.current_block,
327 Some(SeqBlock::new(DEFAULT_BLOCK_SIZE, DEFAULT_BLOCK_SIZE))
328 );
329 assert_eq!(allocator2.block.next_sequence, DEFAULT_BLOCK_SIZE * 2);
330 let (seq, put) = allocator2.allocate(DEFAULT_BLOCK_SIZE);
331 let block = SeqBlock::deserialize(&put.unwrap().value).unwrap();
332 assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE * 2);
333 assert_eq!(seq, DEFAULT_BLOCK_SIZE * 2);
334 }
335
336 #[storage_test]
337 async fn should_resume_from_next_block_on_initialize(storage: Arc<dyn Storage>) {
338 let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
340 .await
341 .unwrap();
342 let (_, put) = allocator.allocate(DEFAULT_BLOCK_SIZE / 2);
343 storage.put(vec![put.unwrap().into()]).await.unwrap();
344
345 let mut allocator2 = SequenceAllocator::load(storage.as_ref(), test_key())
347 .await
348 .unwrap();
349
350 assert_eq!(
352 allocator2.block.current_block,
353 Some(SeqBlock::new(0, DEFAULT_BLOCK_SIZE))
354 );
355 assert_eq!(allocator2.block.next_sequence, DEFAULT_BLOCK_SIZE);
356 let (seq, put) = allocator2.allocate(DEFAULT_BLOCK_SIZE);
357 let block = SeqBlock::deserialize(&put.unwrap().value).unwrap();
358 assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE);
359 assert_eq!(seq, DEFAULT_BLOCK_SIZE);
360 }
361
362 #[storage_test]
363 async fn should_allocate_sequential_sequence_numbers(storage: Arc<dyn Storage>) {
364 let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
366 .await
367 .unwrap();
368
369 let (seq1, _) = allocator.allocate_one();
371 let (seq2, _) = allocator.allocate_one();
372 let (seq3, _) = allocator.allocate_one();
373
374 assert_eq!(seq1, 0);
376 assert_eq!(seq2, 1);
377 assert_eq!(seq3, 2);
378 }
379
380 #[storage_test]
381 async fn should_allocate_batch_of_sequences(storage: Arc<dyn Storage>) {
382 let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
384 .await
385 .unwrap();
386
387 let (seq1, _) = allocator.allocate(10);
389 let (seq2, _) = allocator.allocate(5);
390
391 assert_eq!(seq1, 0);
393 assert_eq!(seq2, 10);
394 }
395
396 #[storage_test]
397 async fn should_span_blocks_when_batch_exceeds_remaining(storage: Arc<dyn Storage>) {
398 let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
400 .await
401 .unwrap();
402
403 allocator.allocate(DEFAULT_BLOCK_SIZE - 10);
405
406 let (seq, put) = allocator.allocate(25);
408
409 assert_eq!(seq, DEFAULT_BLOCK_SIZE - 10);
411 assert!(put.is_some());
412 let block = SeqBlock::deserialize(&put.unwrap().value).unwrap();
413 assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE);
414 }
415
416 #[storage_test]
417 async fn should_allocate_new_block_when_exhausted(storage: Arc<dyn Storage>) {
418 let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
420 .await
421 .unwrap();
422
423 allocator.allocate(DEFAULT_BLOCK_SIZE);
425 let (seq, put) = allocator.allocate_one();
426
427 assert_eq!(seq, DEFAULT_BLOCK_SIZE);
429 assert!(put.is_some());
430 }
431
432 #[storage_test]
433 async fn should_allocate_exactly_remaining(storage: Arc<dyn Storage>) {
434 let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
436 .await
437 .unwrap();
438
439 allocator.allocate(100);
441 let remaining = DEFAULT_BLOCK_SIZE - 100;
442
443 let (seq, put) = allocator.allocate(remaining);
445
446 assert_eq!(seq, 100);
448 assert!(put.is_none());
449
450 let (next_seq, put) = allocator.allocate_one();
452 assert_eq!(next_seq, DEFAULT_BLOCK_SIZE);
453 assert!(put.is_some());
454 }
455
456 #[storage_test]
457 async fn should_handle_large_batch_spanning_from_partial_block(storage: Arc<dyn Storage>) {
458 let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
460 .await
461 .unwrap();
462
463 allocator.allocate(DEFAULT_BLOCK_SIZE - 100);
465
466 let large_request = DEFAULT_BLOCK_SIZE + 500;
468 let (seq, _) = allocator.allocate(large_request);
469
470 assert_eq!(seq, DEFAULT_BLOCK_SIZE - 100);
472
473 let (next_seq, _) = allocator.allocate_one();
475 assert_eq!(next_seq, DEFAULT_BLOCK_SIZE - 100 + large_request);
476 }
477
478 #[storage_test]
479 async fn should_peek_next_sequence_without_consuming(storage: Arc<dyn Storage>) {
480 let mut allocator = SequenceAllocator::load(storage.as_ref(), test_key())
482 .await
483 .unwrap();
484
485 allocator.allocate(10);
487
488 let peeked = allocator.peek_next_sequence();
490 let (allocated, _) = allocator.allocate_one();
491
492 assert_eq!(peeked, allocated);
494 assert_eq!(peeked, 10);
495 }
496}