use crate::{BeakId, Error, states};
use std::sync::atomic::Ordering;
use std::{sync::atomic::AtomicU64, time::SystemTime};
// Bit layout of the packed 64-bit ID word, as used by `Generator`:
//   bits  0..=9   worker id (kept intact when the timestamp is refreshed)
//   bits 10..=28  per-tick counter, advanced in steps of `INCR`
//   bits 29..=63  timestamp in 100 ms ticks since the generator's epoch

/// Position of the lowest timestamp bit inside an ID.
pub(crate) const TIMESTAMP_SHIFT: u8 = 29;
/// Selects the timestamp bits (everything at or above `TIMESTAMP_SHIFT`).
const TIMESTAMP_MASK: u64 = u64::MAX << TIMESTAMP_SHIFT;
/// Number of low bits reserved for the worker id.
const WORKER_ID_SHIFT: u8 = 10;
/// Amount added to the ID word per generated ID: one unit just above the worker-id bits.
const INCR: u64 = 1 << WORKER_ID_SHIFT;
/// Selects the worker-id bits (the low `WORKER_ID_SHIFT` bits).
const WORKER_ID_MASK: u64 = INCR - 1;
/// ID generator whose whole mutable state lives in two atomics, so every
/// method works through `&self`.
pub struct Generator {
/// Packed ID word. Starts as the bare worker id; `generate` bumps it by
/// `INCR` for each ID handed out and `update_time` rewrites its timestamp bits.
id: AtomicU64,
/// Status word built from the `states` constants (`NOT_INITED`, `UPDATING`,
/// `BLOCKED`); `0` is stored once a clock refresh has completed normally.
state: AtomicU64,
/// Instant that timestamps are measured from.
epoch: SystemTime,
}
impl Generator {
pub fn try_new(worker_id: u64, epoch: SystemTime) -> Result<Self, Error> {
if worker_id >= 1024 {
return Err(Error::InvalidWorkerId);
}
Ok(Self {
id: AtomicU64::new(worker_id),
state: AtomicU64::new(states::NOT_INITED),
epoch,
})
}
/// Infallible-looking counterpart of `try_new`.
///
/// # Panics
///
/// Panics when `try_new` rejects `worker_id` (1024 or larger).
pub fn new(worker_id: u64, epoch: SystemTime) -> Self {
    let built = Self::try_new(worker_id, epoch);
    built.expect("worker_id out of range (>=1024)")
}
pub fn must_generate(&self) -> BeakId {
loop {
match self.generate() {
Ok(id) => return id,
Err(Error::Blocked) => {
self.update_time();
std::thread::yield_now();
}
Err(_) => unreachable!(),
}
}
}
/// Hands out the next ID, or reports that the generator is currently blocked.
///
/// The returned ID is the value of the packed ID word *before* it is advanced
/// by `INCR`.
///
/// # Errors
///
/// Returns `Error::Blocked` while the `BLOCKED` flag is set in the state word.
///
/// # Panics
///
/// Panics if the state word still equals `states::NOT_INITED`, i.e.
/// `update_time` has never completed for this generator.
pub fn generate(&self) -> Result<BeakId, Error> {
    // Spin until no clock refresh is in flight, keeping the last observed state.
    let mut observed = self.state.load(Ordering::Acquire);
    while (observed & states::UPDATING) != 0 {
        std::hint::spin_loop();
        observed = self.state.load(Ordering::Acquire);
    }

    assert!(
        observed != states::NOT_INITED,
        "Run worker with update_time with 100ms interval, or use macro for create generator"
    );

    if (observed & states::BLOCKED) != 0 {
        return Err(Error::Blocked);
    }

    let previous = self.id.fetch_add(INCR, Ordering::Relaxed);
    let bumped = previous + INCR;
    // A difference in the timestamp bits means the counter just carried into
    // the timestamp field, so re-check the ID word against the wall clock.
    let carried_into_timestamp = ((previous ^ bumped) & TIMESTAMP_MASK) != 0;
    if carried_into_timestamp {
        self.update_time();
    }
    Ok(BeakId::from(previous))
}
/// Re-synchronises the packed ID word with the wall clock.
///
/// Time is measured in 100 ms ticks since `epoch`. Depending on the state word:
///
/// * `UPDATING` set: another thread is already refreshing, so return immediately.
/// * `BLOCKED` set: clear the flag (store `0`) unless the ID word is still
///   8 or more ticks ahead of the clock.
/// * otherwise: claim the `UPDATING` flag and move the ID's timestamp bits
///   forward to the current tick (resetting the counter bits, keeping the
///   worker id). If the ID word is already ahead of the clock it is left alone;
///   if it is 10 or more ticks ahead the generator is put into `BLOCKED`.
///
/// The ID word is only ever rewritten when it is not ahead of the clock, so
/// this never moves an ID's timestamp backwards.
///
/// # Panics
///
/// Panics if the current system time is earlier than `epoch`. The clock is
/// read before the `UPDATING` flag is claimed, so such a panic cannot leave
/// the flag set behind it.
pub fn update_time(&self) {
    let mut state = self.state.load(Ordering::Acquire);
    if state & states::UPDATING != 0 {
        return;
    }

    // Read the clock once, up front. Doing this after the flag has been claimed
    // would mean a panic here leaves `UPDATING` set forever, and every thread
    // in `generate` spins on that flag.
    let timestamp = ((SystemTime::now()
        .duration_since(self.epoch)
        .expect("Current time before epoch")
        .as_millis() as u64)
        / 100)
        << TIMESTAMP_SHIFT;

    loop {
        if state & states::BLOCKED != 0 {
            let id = self.id.load(Ordering::Relaxed);
            // Unblocking uses a lower threshold (8 ticks) than blocking (10 ticks).
            let still_too_far_ahead =
                id > timestamp && ((id - timestamp) >> TIMESTAMP_SHIFT) >= 8;
            if !still_too_far_ahead {
                self.state.store(0, Ordering::Release);
            }
            return;
        }
        // Claim the refresh. `BLOCKED` is set alongside `UPDATING` for the
        // duration of the refresh. `AcqRel` on success: the acquire half pairs
        // with the `Release` stores that end a previous refresh, so the ID word
        // read below is at least as new as what that refresh published.
        match self.state.compare_exchange_weak(
            state,
            states::UPDATING | states::BLOCKED,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => break,
            Err(cur) if cur & states::UPDATING != 0 => return,
            Err(cur) => state = cur,
        }
    }

    // From here on this thread owns the `UPDATING` flag; nothing below can
    // panic, and every exit path stores a state without `UPDATING`.
    let mut id = self.id.load(Ordering::Relaxed);
    loop {
        if id > timestamp {
            // The ID word is at or ahead of the current tick: never move it back.
            let delta = (id - timestamp) >> TIMESTAMP_SHIFT;
            if delta >= 10 {
                self.state.store(states::BLOCKED, Ordering::Release);
                return;
            }
            break;
        }
        // Keep the worker id, zero the counter bits, install the current tick.
        let new_id = (id & WORKER_ID_MASK) | timestamp;
        match self
            .id
            .compare_exchange_weak(id, new_id, Ordering::Release, Ordering::Acquire)
        {
            Ok(_) => break,
            Err(cur) => id = cur,
        }
    }
    self.state.store(0, Ordering::Release);
}
/// Returns `id.timestamp()` shifted by this generator's epoch, where the
/// epoch is expressed as milliseconds since `SystemTime::UNIX_EPOCH`.
///
/// NOTE(review): this presumes `BeakId::timestamp` yields milliseconds
/// relative to the generator epoch; confirm against `BeakId`.
///
/// # Panics
///
/// Panics if `epoch` lies before the Unix epoch.
pub fn get_created_at(&self, id: BeakId) -> u64 {
    let relative = id.timestamp();
    let epoch_offset = self
        .epoch
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("epoch before unix epoch");
    relative + epoch_offset.as_millis() as u64
}
}