use std::sync::atomic::{AtomicU64, Ordering};
use super::LSN;
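/// Thread-safe allocator of monotonically increasing Log Sequence Numbers (LSNs).
///
/// Backed by a single `AtomicU64` that holds the *next* LSN to issue.
/// `u64::MAX` is reserved as an overflow sentinel and is never handed out.
///
/// A minimal usage sketch (marked `ignore` since the crate path is not shown
/// here):
///
/// ```ignore
/// let alloc = LsnAllocator::new();
/// assert_eq!(alloc.allocate(), LSN(1));
/// let (first, last) = alloc.allocate_batch(3); // reserves LSN(2)..=LSN(4)
/// ```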
pub struct LsnAllocator {
next_lsn: AtomicU64,
}
impl LsnAllocator {
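    /// Creates an allocator whose first issued LSN is `LSN(1)`.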
pub fn new() -> Self {
Self {
next_lsn: AtomicU64::new(1),
}
}
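    /// Creates an allocator that resumes issuing from `lsn`, e.g. when
    /// restoring a persisted position after recovery.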
pub fn starting_at(lsn: LSN) -> Self {
Self {
next_lsn: AtomicU64::new(lsn.0),
}
}
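    /// Atomically claims and returns the next LSN via a CAS loop.
    ///
    /// `Relaxed` ordering is sufficient here: the counter itself is the only
    /// shared state, and atomicity alone guarantees uniqueness. Callers that
    /// need happens-before edges with other data must synchronize separately.
    ///
    /// # Panics
    /// Panics if the counter has reached the `u64::MAX` sentinel.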
#[inline]
pub fn allocate(&self) -> LSN {
let lsn = self
.next_lsn
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
if current == u64::MAX {
                    None
                } else {
Some(current + 1)
}
})
.expect("LSN Allocator Overflow");
LSN(lsn)
}
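    /// Atomically reserves `count` consecutive LSNs and returns the inclusive
    /// `(first, last)` range. A single CAS advances the counter by `count`,
    /// so concurrent callers can never receive overlapping ranges.
    ///
    /// # Panics
    /// Panics if `count == 0` or if the reservation would run past the
    /// `u64::MAX` sentinel.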
#[inline]
pub fn allocate_batch(&self, count: u64) -> (LSN, LSN) {
assert!(count > 0, "Cannot allocate 0 LSNs");
let first = self
.next_lsn
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
if current > u64::MAX.saturating_sub(count) {
None
} else {
Some(current + count)
}
})
.expect("LSN Allocator Overflow");
(LSN(first), LSN(first + count - 1))
}
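    /// Returns the next LSN that *would* be issued, not the last one issued.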
#[inline]
pub fn current(&self) -> LSN {
LSN(self.next_lsn.load(Ordering::Relaxed))
}
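    /// Unconditionally overwrites the next LSN. A plain store can move the
    /// counter backwards and break uniqueness if it races with allocation,
    /// so this is intended for quiescent phases (e.g. startup or recovery).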
pub fn set_next(&self, lsn: LSN) {
self.next_lsn.store(lsn.0, Ordering::Relaxed);
}
}
impl Default for LsnAllocator {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;
use std::sync::Arc;
use std::thread;
#[test]
fn test_allocator_starts_at_one() {
let alloc = LsnAllocator::new();
assert_eq!(alloc.current(), LSN(1));
}
#[test]
fn test_allocator_starts_at_custom_lsn() {
let alloc = LsnAllocator::starting_at(LSN(100));
assert_eq!(alloc.current(), LSN(100));
}
#[test]
fn test_single_allocation() {
let alloc = LsnAllocator::new();
let lsn1 = alloc.allocate();
let lsn2 = alloc.allocate();
let lsn3 = alloc.allocate();
assert_eq!(lsn1, LSN(1));
assert_eq!(lsn2, LSN(2));
assert_eq!(lsn3, LSN(3));
assert_eq!(alloc.current(), LSN(4));
}
#[test]
fn test_batch_allocation() {
let alloc = LsnAllocator::new();
let (first, last) = alloc.allocate_batch(5);
assert_eq!(first, LSN(1));
assert_eq!(last, LSN(5));
assert_eq!(alloc.current(), LSN(6));
}
#[test]
fn test_batch_allocation_single() {
let alloc = LsnAllocator::new();
let (first, last) = alloc.allocate_batch(1);
assert_eq!(first, LSN(1));
assert_eq!(last, LSN(1));
}
#[test]
#[should_panic(expected = "Cannot allocate 0 LSNs")]
fn test_batch_allocation_zero_panics() {
let alloc = LsnAllocator::new();
let _ = alloc.allocate_batch(0);
}
#[test]
fn test_set_next_lsn() {
let alloc = LsnAllocator::new();
alloc.set_next(LSN(1000));
assert_eq!(alloc.current(), LSN(1000));
assert_eq!(alloc.allocate(), LSN(1000));
assert_eq!(alloc.allocate(), LSN(1001));
}
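    // Spawns several threads allocating concurrently and asserts that every
    // issued LSN is globally unique.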
#[test]
fn test_concurrent_allocation_unique() {
let alloc = Arc::new(LsnAllocator::new());
let num_threads = 8;
let allocations_per_thread = 1000;
let handles: Vec<_> = (0..num_threads)
.map(|_| {
let alloc = Arc::clone(&alloc);
thread::spawn(move || {
let mut lsns = Vec::with_capacity(allocations_per_thread);
for _ in 0..allocations_per_thread {
lsns.push(alloc.allocate());
}
lsns
})
})
.collect();
let mut all_lsns = HashSet::new();
for handle in handles {
let lsns = handle.join().unwrap();
for lsn in lsns {
assert!(all_lsns.insert(lsn.0), "Duplicate LSN detected: {:?}", lsn);
}
}
assert_eq!(all_lsns.len(), num_threads * allocations_per_thread);
assert_eq!(
alloc.current(),
LSN((num_threads * allocations_per_thread + 1) as u64)
);
}
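    // Concurrent batch reservations must never produce overlapping ranges.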
#[test]
fn test_concurrent_batch_allocation_no_overlap() {
let alloc = Arc::new(LsnAllocator::new());
let num_threads = 8;
let batches_per_thread = 100;
let batch_size = 10u64;
let handles: Vec<_> = (0..num_threads)
.map(|_| {
let alloc = Arc::clone(&alloc);
thread::spawn(move || {
let mut ranges = Vec::with_capacity(batches_per_thread);
for _ in 0..batches_per_thread {
ranges.push(alloc.allocate_batch(batch_size));
}
ranges
})
})
.collect();
let mut all_ranges: Vec<(LSN, LSN)> = Vec::new();
for handle in handles {
all_ranges.extend(handle.join().unwrap());
}
all_ranges.sort_by_key(|(first, _)| first.0);
for i in 1..all_ranges.len() {
let (_, prev_last) = all_ranges[i - 1];
let (curr_first, _) = all_ranges[i];
assert!(
curr_first.0 > prev_last.0,
"Overlapping ranges: prev_last={:?}, curr_first={:?}",
prev_last,
curr_first
);
}
}
#[test]
fn test_monotonically_increasing() {
let alloc = LsnAllocator::new();
let mut prev = LSN(0);
for _ in 0..1000 {
let curr = alloc.allocate();
assert!(curr.0 > prev.0, "LSN not monotonically increasing");
prev = curr;
}
}
#[test]
fn test_default_impl() {
let alloc = LsnAllocator::default();
assert_eq!(alloc.current(), LSN(1));
}
}
#[cfg(test)]
mod sentry_tests {
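    // Boundary tests around the u64::MAX overflow sentinel.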
use super::*;
#[test]
#[should_panic(expected = "LSN Allocator Overflow")]
fn test_allocator_overflow_boundary() {
let alloc = LsnAllocator::starting_at(LSN(u64::MAX - 1));
let lsn = alloc.allocate();
assert_eq!(lsn, LSN(u64::MAX - 1));
let _ = alloc.allocate();
}
#[test]
#[should_panic(expected = "LSN Allocator Overflow")]
fn test_batch_allocation_boundary() {
let alloc = LsnAllocator::starting_at(LSN(u64::MAX - 10));
let (start, end) = alloc.allocate_batch(10);
assert_eq!(start, LSN(u64::MAX - 10));
assert_eq!(end, LSN(u64::MAX - 1));
let _ = alloc.allocate();
}
#[test]
fn test_batch_allocation_exact_fit_succeeds() {
let alloc = LsnAllocator::starting_at(LSN(u64::MAX - 10));
let (start, end) = alloc.allocate_batch(10);
assert_eq!(start, LSN(u64::MAX - 10));
assert_eq!(end, LSN(u64::MAX - 1));
}
}