1use bytes::Bytes;
48
49use crate::serde::DeserializeError;
50use crate::serde::seq_block::SeqBlock;
51use crate::{Record, Storage, StorageError};
52
/// Minimum number of sequence values covered by a newly initialized block.
///
/// A larger block is only created when a single request needs more values
/// than this.
pub const DEFAULT_BLOCK_SIZE: u64 = 4_096;
55
56#[derive(Debug, Clone, PartialEq, Eq)]
58pub enum SequenceError {
59 Storage(StorageError),
61 Deserialize(DeserializeError),
63}
64
65impl std::error::Error for SequenceError {}
66
67impl std::fmt::Display for SequenceError {
68 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
69 match self {
70 SequenceError::Storage(e) => write!(f, "storage error: {}", e),
71 SequenceError::Deserialize(e) => write!(f, "deserialize error: {}", e),
72 }
73 }
74}
75
76impl From<StorageError> for SequenceError {
77 fn from(err: StorageError) -> Self {
78 SequenceError::Storage(err)
79 }
80}
81
82impl From<DeserializeError> for SequenceError {
83 fn from(err: DeserializeError) -> Self {
84 SequenceError::Deserialize(err)
85 }
86}
87
88pub type SequenceResult<T> = std::result::Result<T, SequenceError>;
90
91pub struct AllocatedSeqBlock {
92 current_block: Option<SeqBlock>,
93 next_sequence: u64,
94}
95
96impl AllocatedSeqBlock {
97 fn remaining(&self) -> u64 {
98 match &self.current_block {
99 None => 0,
100 Some(block) => block.next_base().saturating_sub(self.next_sequence),
101 }
102 }
103
104 async fn load(storage: &dyn Storage, key: &Bytes) -> SequenceResult<Self> {
105 let current_block = match storage.get(key.clone()).await? {
106 Some(record) => Some(SeqBlock::deserialize(&record.value)?),
107 None => None,
108 };
109 let next_sequence = current_block
110 .as_ref()
111 .map(|b| b.next_base())
114 .unwrap_or(0);
115 Ok(Self {
116 current_block,
117 next_sequence,
118 })
119 }
120}
121
122pub struct SequenceAllocator {
133 key: Bytes,
134 block: AllocatedSeqBlock,
135}
136
137impl SequenceAllocator {
138 pub async fn load(storage: &dyn Storage, key: Bytes) -> SequenceResult<Self> {
140 let block = AllocatedSeqBlock::load(storage, &key).await?;
141 Ok(Self { key, block })
142 }
143
144 pub fn peek_next_sequence(&self) -> u64 {
148 self.block.next_sequence
149 }
150
151 pub fn allocate_one(&mut self) -> (u64, Option<Record>) {
155 self.allocate(1)
156 }
157
158 pub fn allocate(&mut self, count: u64) -> (u64, Option<Record>) {
170 let remaining = self.block.remaining();
171
172 if remaining >= count {
174 let base_sequence = self.block.next_sequence;
175 self.block.next_sequence += count;
176 return (base_sequence, None);
177 }
178
179 let from_new_block = count - remaining;
181 let (new_block, record) = self.init_next_block(from_new_block);
182
183 let base_sequence = if remaining > 0 {
185 self.block.next_sequence
186 } else {
187 new_block.base_sequence
188 };
189
190 self.block.next_sequence = new_block.base_sequence + from_new_block;
191 self.block.current_block = Some(new_block);
192
193 (base_sequence, Some(record))
194 }
195
196 fn init_next_block(&self, min_count: u64) -> (SeqBlock, Record) {
197 let base_sequence = match &self.block.current_block {
198 Some(block) => block.next_base(),
199 None => 0,
200 };
201
202 let block_size = min_count.max(DEFAULT_BLOCK_SIZE);
203 let new_block = SeqBlock::new(base_sequence, block_size);
204
205 let value: Bytes = new_block.serialize();
206 let record = Record::new(self.key.clone(), value);
207 (new_block, record)
208 }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::Storage;
    use opendata_macros::storage_test;

    /// Storage key shared by every test in this module.
    fn test_key() -> Bytes {
        Bytes::from_static(&[0x01, 0x02])
    }

    /// Loads an allocator for [`test_key`], panicking if loading fails.
    async fn load_allocator(storage: &dyn Storage) -> SequenceAllocator {
        SequenceAllocator::load(storage, test_key()).await.unwrap()
    }

    /// Decodes the block carried by a record returned from `allocate`.
    fn decode_block(record: &Record) -> SeqBlock {
        SeqBlock::deserialize(&record.value).unwrap()
    }

    #[storage_test]
    async fn should_load_none_when_no_block_allocated(storage: Arc<dyn Storage>) {
        // when: state is loaded from storage that holds nothing for the key
        let loaded = AllocatedSeqBlock::load(storage.as_ref(), &test_key())
            .await
            .unwrap();

        // then: allocation starts from zero with no current block
        assert_eq!(loaded.next_sequence, 0);
        assert_eq!(loaded.current_block, None);
    }

    #[storage_test]
    async fn should_load_first_block(storage: Arc<dyn Storage>) {
        let mut allocator = load_allocator(storage.as_ref()).await;

        let (seq, record) = allocator.allocate(1);

        assert_eq!(seq, 0);
        assert!(record.is_some());
        let first_block = decode_block(&record.unwrap());
        assert_eq!(first_block.base_sequence, 0);
        assert_eq!(first_block.block_size, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_larger_block_when_requested(storage: Arc<dyn Storage>) {
        let mut allocator = load_allocator(storage.as_ref()).await;

        let large_count = DEFAULT_BLOCK_SIZE * 2;
        let (seq, record) = allocator.allocate(large_count);

        assert_eq!(seq, 0);
        assert!(record.is_some());
        let grown_block = decode_block(&record.unwrap());
        assert_eq!(grown_block.base_sequence, seq);
        assert_eq!(grown_block.block_size, DEFAULT_BLOCK_SIZE * 2);
        let (following, _) = allocator.allocate(1);
        assert_eq!(following, large_count);
    }

    #[storage_test]
    async fn should_allocate_sequential_blocks(storage: Arc<dyn Storage>) {
        let mut allocator = load_allocator(storage.as_ref()).await;
        let mut puts: Vec<Record> = Vec::new();

        // Allocate one at a time across three full blocks, keeping every
        // block record that is produced along the way.
        for _ in 0..(DEFAULT_BLOCK_SIZE * 3) {
            let (_, maybe_put) = allocator.allocate(1);
            puts.extend(maybe_put);
        }

        let blocks: Vec<_> = puts.iter().map(decode_block).collect();
        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0].base_sequence, 0);
        assert_eq!(blocks[1].base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(blocks[2].base_sequence, DEFAULT_BLOCK_SIZE * 2);
    }

    #[storage_test]
    async fn should_recover_from_storage_on_initialize(storage: Arc<dyn Storage>) {
        // given: a second block has been allocated and persisted
        let mut allocator = load_allocator(storage.as_ref()).await;
        allocator.allocate(DEFAULT_BLOCK_SIZE);
        let (_, put) = allocator.allocate(DEFAULT_BLOCK_SIZE);
        storage.put(vec![put.unwrap().into()]).await.unwrap();

        // when: a new allocator is loaded from the same storage
        let mut allocator2 = load_allocator(storage.as_ref()).await;

        // then: it resumes after the persisted block
        assert_eq!(
            allocator2.block.current_block,
            Some(SeqBlock::new(DEFAULT_BLOCK_SIZE, DEFAULT_BLOCK_SIZE))
        );
        assert_eq!(allocator2.block.next_sequence, DEFAULT_BLOCK_SIZE * 2);
        let (seq, put) = allocator2.allocate(DEFAULT_BLOCK_SIZE);
        let next_block = decode_block(&put.unwrap());
        assert_eq!(next_block.base_sequence, DEFAULT_BLOCK_SIZE * 2);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE * 2);
    }

    #[storage_test]
    async fn should_resume_from_next_block_on_initialize(storage: Arc<dyn Storage>) {
        // given: only half of the first block was used before it was persisted
        let mut allocator = load_allocator(storage.as_ref()).await;
        let (_, put) = allocator.allocate(DEFAULT_BLOCK_SIZE / 2);
        storage.put(vec![put.unwrap().into()]).await.unwrap();

        // when: a new allocator is loaded from the same storage
        let mut allocator2 = load_allocator(storage.as_ref()).await;

        // then: the rest of the persisted block is skipped
        assert_eq!(
            allocator2.block.current_block,
            Some(SeqBlock::new(0, DEFAULT_BLOCK_SIZE))
        );
        assert_eq!(allocator2.block.next_sequence, DEFAULT_BLOCK_SIZE);
        let (seq, put) = allocator2.allocate(DEFAULT_BLOCK_SIZE);
        let next_block = decode_block(&put.unwrap());
        assert_eq!(next_block.base_sequence, DEFAULT_BLOCK_SIZE);
        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_sequential_sequence_numbers(storage: Arc<dyn Storage>) {
        let mut allocator = load_allocator(storage.as_ref()).await;

        let (seq1, _) = allocator.allocate_one();
        let (seq2, _) = allocator.allocate_one();
        let (seq3, _) = allocator.allocate_one();

        assert_eq!(seq1, 0);
        assert_eq!(seq2, 1);
        assert_eq!(seq3, 2);
    }

    #[storage_test]
    async fn should_allocate_batch_of_sequences(storage: Arc<dyn Storage>) {
        let mut allocator = load_allocator(storage.as_ref()).await;

        let (seq1, _) = allocator.allocate(10);
        let (seq2, _) = allocator.allocate(5);

        assert_eq!(seq1, 0);
        assert_eq!(seq2, 10);
    }

    #[storage_test]
    async fn should_span_blocks_when_batch_exceeds_remaining(storage: Arc<dyn Storage>) {
        let mut allocator = load_allocator(storage.as_ref()).await;

        // Leave exactly 10 values in the first block.
        allocator.allocate(DEFAULT_BLOCK_SIZE - 10);

        let (seq, put) = allocator.allocate(25);

        assert_eq!(seq, DEFAULT_BLOCK_SIZE - 10);
        assert!(put.is_some());
        let second_block = decode_block(&put.unwrap());
        assert_eq!(second_block.base_sequence, DEFAULT_BLOCK_SIZE);
    }

    #[storage_test]
    async fn should_allocate_new_block_when_exhausted(storage: Arc<dyn Storage>) {
        let mut allocator = load_allocator(storage.as_ref()).await;

        allocator.allocate(DEFAULT_BLOCK_SIZE);
        let (seq, put) = allocator.allocate_one();

        assert_eq!(seq, DEFAULT_BLOCK_SIZE);
        assert!(put.is_some());
    }

    #[storage_test]
    async fn should_allocate_exactly_remaining(storage: Arc<dyn Storage>) {
        let mut allocator = load_allocator(storage.as_ref()).await;

        allocator.allocate(100);
        let remaining = DEFAULT_BLOCK_SIZE - 100;

        let (seq, put) = allocator.allocate(remaining);

        assert_eq!(seq, 100);
        assert!(put.is_none());

        let (next_seq, put) = allocator.allocate_one();
        assert_eq!(next_seq, DEFAULT_BLOCK_SIZE);
        assert!(put.is_some());
    }

    #[storage_test]
    async fn should_handle_large_batch_spanning_from_partial_block(storage: Arc<dyn Storage>) {
        let mut allocator = load_allocator(storage.as_ref()).await;

        // Leave exactly 100 values in the first block.
        allocator.allocate(DEFAULT_BLOCK_SIZE - 100);

        let large_request = DEFAULT_BLOCK_SIZE + 500;
        let (seq, _) = allocator.allocate(large_request);

        assert_eq!(seq, DEFAULT_BLOCK_SIZE - 100);

        let (next_seq, _) = allocator.allocate_one();
        assert_eq!(next_seq, DEFAULT_BLOCK_SIZE - 100 + large_request);
    }

    #[storage_test]
    async fn should_peek_next_sequence_without_consuming(storage: Arc<dyn Storage>) {
        let mut allocator = load_allocator(storage.as_ref()).await;

        allocator.allocate(10);

        let peeked = allocator.peek_next_sequence();
        let (allocated, _) = allocator.allocate_one();

        assert_eq!(peeked, allocated);
        assert_eq!(peeked, 10);
    }
}