use std::sync::atomic::{AtomicU64, Ordering};
/// A sequence number, stored as a plain `u64`.
///
/// `#[repr(transparent)]` gives the wrapper the same layout as its single
/// `u64` field. Equality, ordering and hashing are derived, so they follow
/// the wrapped integer directly (`SeqNum(1) < SeqNum(2)`).
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SeqNum(pub u64);
/// Kind of operation, with an explicit one-byte discriminant.
///
/// `#[repr(u8)]` together with the explicit values pins the numeric
/// encoding: `OpType::Delete as u8 == 0x00` and `OpType::Put as u8 == 0x01`,
/// so `Delete` is numerically smaller than `Put`.
///
/// NOTE(review): the fixed values suggest they are persisted or used as a
/// sort tag elsewhere — confirm before renumbering or reordering variants.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OpType {
/// Delete operation; encoded as `0x00`.
Delete = 0x00,
/// Put operation; encoded as `0x01`.
Put = 0x01,
}
/// Largest sequence number that fits in 56 bits: `2^56 - 1`.
///
/// NOTE(review): presumably the remaining 8 bits of a `u64` are reserved for
/// a one-byte tag such as [`OpType`] — confirm against the encoding code.
/// Nothing visible here enforces this bound on `SeqNum` values.
pub const SEQNUM_MAX: SeqNum = SeqNum((1u64 << 56) - 1);
impl SeqNum {
    /// Returns the sequence number immediately following `self`.
    ///
    /// The result is not checked against `SEQNUM_MAX`; only overflow of the
    /// underlying `u64` is detected.
    ///
    /// # Panics
    ///
    /// Panics if `self` wraps `u64::MAX`, in every build profile.
    #[inline]
    #[must_use]
    pub fn next(self) -> Self {
        // A plain `+ 1` would wrap to 0 in builds without overflow checks,
        // silently breaking monotonicity; `checked_add` makes the failure
        // loud and identical in debug and release.
        SeqNum(
            self.0
                .checked_add(1)
                .expect("SeqNum overflow: u64::MAX has no successor"),
        )
    }
}
impl std::fmt::Display for SeqNum {
    /// Formats the sequence number as the decimal value of its `u64`.
    ///
    /// The call is forwarded to `u64`'s `Display` implementation with the
    /// caller's formatter, so width, fill, alignment and zero-padding flags
    /// (for example `{:>8}` or `{:08}`) are honored. Going through
    /// `write!(f, "{}", ..)` instead would start a fresh format spec and
    /// drop those flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
/// Sequence-number counter backed by a single `AtomicU64`.
///
/// All operations take `&self`, so one instance can be shared between
/// threads (for example behind an `Arc`). `Debug` prints the counter's
/// current value.
#[derive(Debug)]
pub struct GlobalSeq(AtomicU64);
impl GlobalSeq {
    /// Creates a counter whose first allocated sequence number is `initial`.
    pub fn new(initial: u64) -> Self {
        Self(AtomicU64::new(initial))
    }

    /// Returns the counter's current value without modifying it.
    ///
    /// This is the value the next allocation would return if no other
    /// thread allocates in between.
    #[inline]
    pub fn current(&self) -> SeqNum {
        SeqNum(self.0.load(Ordering::Acquire))
    }

    /// Reserves a single sequence number and returns it.
    ///
    /// Equivalent to `allocate_n(1)`: the returned value is the counter
    /// before the increment.
    #[inline]
    pub fn allocate(&self) -> SeqNum {
        self.allocate_n(1)
    }

    /// Reserves `n` consecutive sequence numbers and returns the first one.
    ///
    /// The caller owns the range `[ret, ret + n)`. With `n == 0` nothing is
    /// reserved and the current value is returned.
    ///
    /// The underlying `fetch_add` wraps around on `u64` overflow, and the
    /// result is not checked against `SEQNUM_MAX`.
    #[inline]
    pub fn allocate_n(&self, n: u64) -> SeqNum {
        SeqNum(self.0.fetch_add(n, Ordering::AcqRel))
    }

    /// Raises the counter to `val` if it is currently lower.
    ///
    /// The counter is never lowered: a `val` less than or equal to the
    /// current value leaves it unchanged.
    pub fn set_at_least(&self, val: u64) {
        // `fetch_max` is the atomic "store the larger of the two" primitive,
        // replacing a hand-written compare-exchange retry loop.
        self.0.fetch_max(val, Ordering::AcqRel);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    // `next` yields the immediate successor.
    #[test]
    fn seqnum_next() {
        let start = SeqNum(5);
        let expected = SeqNum(6);
        assert_eq!(start.next(), expected);
    }

    // Derived ordering and equality follow the wrapped integer.
    #[test]
    fn seqnum_ordering() {
        let (one, two) = (SeqNum(1), SeqNum(2));
        assert!(one < two);
        assert!(SeqNum(100) > SeqNum(99));
        assert_eq!(SeqNum(42), SeqNum(42));
    }

    // `Display` prints the bare decimal value.
    #[test]
    fn seqnum_display() {
        assert_eq!(SeqNum(42).to_string(), "42");
    }

    // The maximum is the all-ones 56-bit value.
    #[test]
    fn seqnum_max_is_56_bits() {
        let all_ones_56 = (1u64 << 56) - 1;
        assert_eq!(SEQNUM_MAX.0, all_ones_56);
    }

    // Successive single allocations return consecutive values.
    #[test]
    fn global_seq_allocate_monotone() {
        let counter = GlobalSeq::new(1);
        let handed_out = [counter.allocate(), counter.allocate(), counter.allocate()];
        assert_eq!(handed_out, [SeqNum(1), SeqNum(2), SeqNum(3)]);
        assert_eq!(counter.current(), SeqNum(4));
    }

    // A batch allocation returns its base and advances the counter by `n`.
    #[test]
    fn global_seq_allocate_n_reserves_contiguous_range() {
        let counter = GlobalSeq::new(10);

        let first_of_batch = counter.allocate_n(4);
        assert_eq!(first_of_batch, SeqNum(10));

        let single = counter.allocate();
        assert_eq!(single, SeqNum(14));

        let second_batch = counter.allocate_n(2);
        assert_eq!(second_batch, SeqNum(15));

        assert_eq!(counter.current(), SeqNum(17));
    }

    // Allocating zero numbers leaves the counter untouched.
    #[test]
    fn global_seq_allocate_n_zero_is_noop() {
        let counter = GlobalSeq::new(5);
        assert_eq!(counter.allocate_n(0), SeqNum(5));
        assert_eq!(counter.current(), SeqNum(5));
    }

    // `set_at_least` raises the counter but never lowers it.
    #[test]
    fn global_seq_set_at_least() {
        let counter = GlobalSeq::new(5);

        counter.set_at_least(10);
        assert_eq!(counter.current(), SeqNum(10));

        counter.set_at_least(3);
        assert_eq!(counter.current(), SeqNum(10));
    }

    // Eight threads allocating 1000 numbers each must see 8000 distinct
    // values covering exactly 1..=8000.
    #[test]
    fn global_seq_concurrent_allocate() {
        let shared = Arc::new(GlobalSeq::new(1));

        // Collect the handles first so every thread is running before any
        // of them is joined.
        let workers: Vec<_> = (0..8)
            .map(|_| {
                let counter = Arc::clone(&shared);
                thread::spawn(move || {
                    (0..1000)
                        .map(|_| counter.allocate().0)
                        .collect::<Vec<u64>>()
                })
            })
            .collect();

        let mut seen: Vec<u64> = workers
            .into_iter()
            .flat_map(|worker| worker.join().unwrap())
            .collect();
        seen.sort();
        seen.dedup();

        assert_eq!(seen.len(), 8000);
        assert_eq!(seen.first().copied(), Some(1));
        assert_eq!(seen.last().copied(), Some(8000));
    }

    // Discriminants are pinned, and `Delete` sorts before `Put`.
    #[test]
    fn optype_values() {
        let delete = OpType::Delete as u8;
        let put = OpType::Put as u8;
        assert_eq!(delete, 0x00);
        assert_eq!(put, 0x01);
        assert!(delete < put);
    }
}