1use bytes::Bytes;
48
49use crate::serde::DeserializeError;
50use crate::serde::seq_block::SeqBlock;
51use crate::{Record, Storage, StorageError};
52
/// Minimum number of sequence numbers reserved whenever a new block is
/// initialized; larger requests grow the block beyond this size.
pub const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
55
56#[derive(Debug, Clone, PartialEq, Eq)]
58pub enum SequenceError {
59 Storage(StorageError),
61 Deserialize(DeserializeError),
63}
64
65impl std::error::Error for SequenceError {}
66
67impl std::fmt::Display for SequenceError {
68 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
69 match self {
70 SequenceError::Storage(e) => write!(f, "storage error: {}", e),
71 SequenceError::Deserialize(e) => write!(f, "deserialize error: {}", e),
72 }
73 }
74}
75
76impl From<StorageError> for SequenceError {
77 fn from(err: StorageError) -> Self {
78 SequenceError::Storage(err)
79 }
80}
81
82impl From<DeserializeError> for SequenceError {
83 fn from(err: DeserializeError) -> Self {
84 SequenceError::Deserialize(err)
85 }
86}
87
88pub type SequenceResult<T> = std::result::Result<T, SequenceError>;
90
91#[derive(Clone)]
92pub struct AllocatedSeqBlock {
93 current_block: Option<SeqBlock>,
94 next_sequence: u64,
95}
96
97impl AllocatedSeqBlock {
98 fn remaining(&self) -> u64 {
99 match &self.current_block {
100 None => 0,
101 Some(block) => block.next_base().saturating_sub(self.next_sequence),
102 }
103 }
104
105 async fn load(storage: &dyn Storage, key: &Bytes) -> SequenceResult<Self> {
106 let current_block = match storage.get(key.clone()).await? {
107 Some(record) => Some(SeqBlock::deserialize(&record.value)?),
108 None => None,
109 };
110 let next_sequence = current_block
111 .as_ref()
112 .map(|b| b.next_base())
115 .unwrap_or(0);
116 Ok(Self {
117 current_block,
118 next_sequence,
119 })
120 }
121}
122
123pub struct SequenceAllocator {
134 key: Bytes,
135 block: AllocatedSeqBlock,
136}
137
138impl SequenceAllocator {
139 pub async fn load(storage: &dyn Storage, key: Bytes) -> SequenceResult<Self> {
141 let block = AllocatedSeqBlock::load(storage, &key).await?;
142 Ok(Self { key, block })
143 }
144
145 pub fn new(key: Bytes, block: AllocatedSeqBlock) -> Self {
146 Self { key, block }
147 }
148
149 pub fn peek_next_sequence(&self) -> u64 {
153 self.block.next_sequence
154 }
155
156 pub fn allocate_one(&mut self) -> (u64, Option<Record>) {
160 self.allocate(1)
161 }
162
163 pub fn allocate(&mut self, count: u64) -> (u64, Option<Record>) {
175 let remaining = self.block.remaining();
176
177 if remaining >= count {
179 let base_sequence = self.block.next_sequence;
180 self.block.next_sequence += count;
181 return (base_sequence, None);
182 }
183
184 let from_new_block = count - remaining;
186 let (new_block, record) = self.init_next_block(from_new_block);
187
188 let base_sequence = if remaining > 0 {
190 self.block.next_sequence
191 } else {
192 new_block.base_sequence
193 };
194
195 self.block.next_sequence = new_block.base_sequence + from_new_block;
196 self.block.current_block = Some(new_block);
197
198 (base_sequence, Some(record))
199 }
200
201 fn init_next_block(&self, min_count: u64) -> (SeqBlock, Record) {
202 let base_sequence = match &self.block.current_block {
203 Some(block) => block.next_base(),
204 None => 0,
205 };
206
207 let block_size = min_count.max(DEFAULT_BLOCK_SIZE);
208 let new_block = SeqBlock::new(base_sequence, block_size);
209
210 let value: Bytes = new_block.serialize();
211 let record = Record::new(self.key.clone(), value);
212 (new_block, record)
213 }
214
215 pub fn freeze(self) -> (Bytes, AllocatedSeqBlock) {
216 (self.key, self.block)
217 }
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::Storage;
    use opendata_macros::storage_test;

    /// Key shared by every test in this module.
    fn test_key() -> Bytes {
        Bytes::from_static(&[0x01, 0x02])
    }

    /// Loads an allocator for `test_key()` from `storage`, panicking on error.
    async fn load_allocator(storage: &dyn Storage) -> SequenceAllocator {
        SequenceAllocator::load(storage, test_key()).await.unwrap()
    }

    /// Decodes the block carried by an emitted record, panicking on error.
    fn decode_block(record: &Record) -> SeqBlock {
        SeqBlock::deserialize(&record.value).unwrap()
    }

    #[storage_test]
    async fn should_load_none_when_no_block_allocated(storage: Arc<dyn Storage>) {
        // given: empty storage
        let key = test_key();

        // when
        let loaded = AllocatedSeqBlock::load(storage.as_ref(), &key)
            .await
            .unwrap();

        // then
        assert_eq!(loaded.next_sequence, 0);
        assert_eq!(loaded.current_block, None);
    }

    #[storage_test]
    async fn should_load_first_block(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = load_allocator(storage.as_ref()).await;

        // when
        let (seq, record) = allocator.allocate(1);

        // then
        assert_eq!(seq, 0);
        let record = record.expect("first allocation should emit a block record");
        let block = decode_block(&record);
        assert_eq!(block.base_sequence, 0);
        assert_eq!(block.block_size, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_larger_block_when_requested(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = load_allocator(storage.as_ref()).await;
        let large_count = DEFAULT_BLOCK_SIZE * 2;

        // when
        let (seq, record) = allocator.allocate(large_count);

        // then
        assert_eq!(seq, 0);
        let record = record.expect("oversized allocation should emit a block record");
        let block = decode_block(&record);
        assert_eq!(block.base_sequence, seq);
        assert_eq!(block.block_size, DEFAULT_BLOCK_SIZE * 2);

        let (following, _) = allocator.allocate(1);
        assert_eq!(following, large_count);
    }

    #[storage_test]
    async fn should_allocate_sequential_blocks(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = load_allocator(storage.as_ref()).await;
        let mut records = Vec::new();

        // when: allocate three blocks' worth of sequences one at a time
        for _ in 0..(DEFAULT_BLOCK_SIZE * 3) {
            let (_, emitted) = allocator.allocate(1);
            records.extend(emitted);
        }

        // then
        let blocks: Vec<SeqBlock> = records.iter().map(decode_block).collect();
        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0].base_sequence, 0);
        assert_eq!(blocks[1].base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(blocks[2].base_sequence, DEFAULT_BLOCK_SIZE * 2);
    }

    #[storage_test]
    async fn should_recover_from_storage_on_initialize(storage: Arc<dyn Storage>) {
        // given: a second block has been allocated and persisted
        let mut first = load_allocator(storage.as_ref()).await;
        first.allocate(DEFAULT_BLOCK_SIZE);
        let (_, record) = first.allocate(DEFAULT_BLOCK_SIZE);
        let record = record.unwrap();
        storage.put(vec![record.into()]).await.unwrap();

        // when
        let mut second = load_allocator(storage.as_ref()).await;

        // then: state resumes after the persisted block
        let expected_block = SeqBlock::new(DEFAULT_BLOCK_SIZE, DEFAULT_BLOCK_SIZE);
        assert_eq!(second.block.current_block, Some(expected_block));
        assert_eq!(second.block.next_sequence, DEFAULT_BLOCK_SIZE * 2);

        let (seq, record) = second.allocate(DEFAULT_BLOCK_SIZE);
        let block = decode_block(&record.unwrap());
        assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE * 2);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE * 2);
    }

    #[storage_test]
    async fn should_resume_from_next_block_on_initialize(storage: Arc<dyn Storage>) {
        // given: only half of the first block was used before persisting it
        let mut first = load_allocator(storage.as_ref()).await;
        let (_, record) = first.allocate(DEFAULT_BLOCK_SIZE / 2);
        let record = record.unwrap();
        storage.put(vec![record.into()]).await.unwrap();

        // when
        let mut second = load_allocator(storage.as_ref()).await;

        // then: the unused half is skipped and allocation continues in a new block
        let expected_block = SeqBlock::new(0, DEFAULT_BLOCK_SIZE);
        assert_eq!(second.block.current_block, Some(expected_block));
        assert_eq!(second.block.next_sequence, DEFAULT_BLOCK_SIZE);

        let (seq, record) = second.allocate(DEFAULT_BLOCK_SIZE);
        let block = decode_block(&record.unwrap());
        assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_sequential_sequence_numbers(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = load_allocator(storage.as_ref()).await;

        // when
        let (first, _) = allocator.allocate_one();
        let (second, _) = allocator.allocate_one();
        let (third, _) = allocator.allocate_one();

        // then
        assert_eq!(first, 0);
        assert_eq!(second, 1);
        assert_eq!(third, 2);
    }

    #[storage_test]
    async fn should_allocate_batch_of_sequences(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = load_allocator(storage.as_ref()).await;

        // when
        let (first_batch, _) = allocator.allocate(10);
        let (second_batch, _) = allocator.allocate(5);

        // then
        assert_eq!(first_batch, 0);
        assert_eq!(second_batch, 10);
    }

    #[storage_test]
    async fn should_span_blocks_when_batch_exceeds_remaining(storage: Arc<dyn Storage>) {
        // given: only 10 sequences remain in the first block
        let mut allocator = load_allocator(storage.as_ref()).await;
        allocator.allocate(DEFAULT_BLOCK_SIZE - 10);

        // when
        let (seq, record) = allocator.allocate(25);

        // then
        assert_eq!(seq, DEFAULT_BLOCK_SIZE - 10);
        let record = record.expect("spanning allocation should emit a block record");
        let block = decode_block(&record);
        assert_eq!(block.base_sequence, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_new_block_when_exhausted(storage: Arc<dyn Storage>) {
        // given: the first block is fully consumed
        let mut allocator = load_allocator(storage.as_ref()).await;
        allocator.allocate(DEFAULT_BLOCK_SIZE);

        // when
        let (seq, record) = allocator.allocate_one();

        // then
        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
        assert!(record.is_some());
    }

    #[storage_test]
    async fn should_allocate_exactly_remaining(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = load_allocator(storage.as_ref()).await;
        allocator.allocate(100);
        let remaining = DEFAULT_BLOCK_SIZE - 100;

        // when
        let (seq, record) = allocator.allocate(remaining);

        // then: the request fits, so no new block is started
        assert_eq!(seq, 100);
        assert!(record.is_none());

        // and: the very next allocation starts a new block
        let (next_seq, record) = allocator.allocate_one();
        assert_eq!(next_seq, DEFAULT_BLOCK_SIZE);
        assert!(record.is_some());
    }

    #[storage_test]
    async fn should_handle_large_batch_spanning_from_partial_block(storage: Arc<dyn Storage>) {
        // given: 100 sequences remain in the first block
        let mut allocator = load_allocator(storage.as_ref()).await;
        let consumed = DEFAULT_BLOCK_SIZE - 100;
        allocator.allocate(consumed);

        // when
        let large_request = DEFAULT_BLOCK_SIZE + 500;
        let (seq, _) = allocator.allocate(large_request);

        // then
        assert_eq!(seq, consumed);

        let (next_seq, _) = allocator.allocate_one();
        assert_eq!(next_seq, consumed + large_request);
    }

    #[storage_test]
    async fn should_peek_next_sequence_without_consuming(storage: Arc<dyn Storage>) {
        // given
        let mut allocator = load_allocator(storage.as_ref()).await;
        allocator.allocate(10);

        // when
        let peeked = allocator.peek_next_sequence();
        let (allocated, _) = allocator.allocate_one();

        // then
        assert_eq!(peeked, allocated);
        assert_eq!(peeked, 10);
    }
}