use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
#[cfg(test)]
use std::time::Duration;
use crate::beakid::BeakId;
use crate::config::Config;
use crate::error::{BeakIdError, Result};
use tokistamp::Timestamp;
pub const TIMESTAMP_BITS: u32 = 35;
pub const SEQUENCE_BITS: u32 = 18;
pub const WORKER_BITS: u32 = 10;
pub const MAX_TIMESTAMP: u64 = (1_u64 << TIMESTAMP_BITS) - 1;
pub const MAX_SEQUENCE: u64 = (1_u64 << SEQUENCE_BITS) - 1;
pub const MAX_WORKER_ID: u16 = (1_u16 << WORKER_BITS) - 1;
pub const MAX_VIRTUAL_WINDOWS: u64 = 10;
const WORKER_MASK: u64 = (1_u64 << WORKER_BITS) - 1;
const SEQUENCE_MASK: u64 = (1_u64 << SEQUENCE_BITS) - 1;
const SEQUENCE_SHIFT: u32 = WORKER_BITS;
const TIMESTAMP_SHIFT: u32 = WORKER_BITS + SEQUENCE_BITS;
const TIMESTAMP_MASK: i64 = !((1_i64 << TIMESTAMP_SHIFT) - 1);
const SEQUENCE_INCREMENT: i64 = 1_i64 << WORKER_BITS;
const STATE_BLOCKED: u64 = 1;
const STATE_UPDATING: u64 = 1 << 1;
#[derive(Debug)]
pub struct Generator {
id: AtomicI64,
state: AtomicU64,
epoch: SystemTime,
}
impl Generator {
pub fn from_config(config: Config) -> Result<Self> {
Self::new(config.epoch(), config.worker_id())
}
pub fn new(epoch: SystemTime, worker_id: u16) -> Result<Self> {
if worker_id > MAX_WORKER_ID {
return Err(BeakIdError::WorkerIdOutOfRange(worker_id));
}
let window = current_window(epoch)?;
validate_window(window)?;
Ok(Self {
id: AtomicI64::new(raw_id(window, 0, worker_id)),
state: AtomicU64::new(0),
epoch,
})
}
pub fn next_id(&self) -> Result<BeakId> {
let state = loop {
let state = self.state.load(Ordering::Acquire);
if state & STATE_UPDATING == 0 {
break state;
}
std::hint::spin_loop();
};
if state & STATE_BLOCKED != 0 {
return Err(BeakIdError::Blocked);
}
let id = self.id.fetch_add(SEQUENCE_INCREMENT, Ordering::Relaxed);
let next_id = id.wrapping_add(SEQUENCE_INCREMENT);
if (id & TIMESTAMP_MASK) != (next_id & TIMESTAMP_MASK) {
self.refresh_hint()?;
}
Ok(BeakId::new(id))
}
pub fn refresh_hint(&self) -> Result<u64> {
let real_window = current_window(self.epoch)?;
validate_window(real_window)?;
let mut state = self.state.load(Ordering::Acquire);
loop {
if state & STATE_UPDATING != 0 {
return Ok(real_window);
}
if state & STATE_BLOCKED != 0 {
let timestamp = timestamp_part(real_window);
let id = self.id.load(Ordering::Relaxed);
if id > timestamp {
let delta = ((id - timestamp) as u64) >> TIMESTAMP_SHIFT;
if delta >= 8 {
return Ok(real_window);
}
}
self.state.store(0, Ordering::Release);
return Ok(real_window);
}
match self.state.compare_exchange_weak(
state,
STATE_UPDATING | STATE_BLOCKED,
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => break,
Err(current) => state = current,
}
}
self.reconcile_id_with_time(real_window);
Ok(real_window)
}
pub fn real_window_hint(&self) -> Result<u64> {
current_window(self.epoch)
}
#[must_use]
pub fn worker_id(&self) -> u16 {
(self.id.load(Ordering::Acquire) as u64 & WORKER_MASK) as u16
}
#[must_use]
pub const fn epoch(&self) -> SystemTime {
self.epoch
}
pub fn timestamp(&self, id: BeakId) -> Result<Timestamp> {
let window = id.timestamp_window();
let offset_millis = window
.checked_mul(100)
.ok_or(BeakIdError::TimestampOverflow(window))?;
let epoch_millis = self
.epoch
.duration_since(UNIX_EPOCH)
.map_err(|_| BeakIdError::ClockBeforeEpoch)?
.as_millis();
let millis = epoch_millis
.checked_add(offset_millis as u128)
.ok_or(BeakIdError::TimestampOverflow(window))?;
let millis = i64::try_from(millis).map_err(|_| BeakIdError::TimestampOverflow(window))?;
Ok(Timestamp::from_millis(millis))
}
fn reconcile_id_with_time(&self, real_window: u64) {
let timestamp = timestamp_part(real_window);
let mut id = self.id.load(Ordering::Relaxed);
loop {
if id > timestamp {
let delta = ((id - timestamp) as u64) >> TIMESTAMP_SHIFT;
if delta >= MAX_VIRTUAL_WINDOWS {
self.state.store(STATE_BLOCKED, Ordering::Release);
return;
}
break;
}
let reset_id = (id & WORKER_MASK as i64) | timestamp;
match self
.id
.compare_exchange_weak(id, reset_id, Ordering::Release, Ordering::Acquire)
{
Ok(_) => break,
Err(current) => id = current,
}
}
self.state.store(0, Ordering::Release);
}
}
#[must_use]
pub const fn construct_id(timestamp: u64, sequence: u64, worker_id: u16) -> BeakId {
BeakId::new(raw_id(timestamp, sequence, worker_id))
}
#[must_use]
pub const fn decompose_id(id: BeakId) -> (u64, u64, u16) {
id.parts()
}
const fn raw_id(timestamp: u64, sequence: u64, worker_id: u16) -> i64 {
((timestamp << TIMESTAMP_SHIFT)
| ((sequence & SEQUENCE_MASK) << SEQUENCE_SHIFT)
| ((worker_id as u64) & WORKER_MASK)) as i64
}
const fn timestamp_part(window: u64) -> i64 {
(window << TIMESTAMP_SHIFT) as i64
}
fn current_window(epoch: SystemTime) -> Result<u64> {
let elapsed = SystemTime::now()
.duration_since(epoch)
.map_err(|_| BeakIdError::ClockBeforeEpoch)?;
let window = elapsed.as_millis() / 100;
u64::try_from(window).map_err(|_| BeakIdError::TimestampOverflow(u64::MAX))
}
fn validate_window(window: u64) -> Result<()> {
if window > MAX_TIMESTAMP {
Err(BeakIdError::TimestampOverflow(window))
} else {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_generator(window: u64, sequence: u64, worker_id: u16) -> Generator {
test_generator_with_real_window(window, sequence, worker_id, window)
}
fn test_generator_with_real_window(
window: u64,
sequence: u64,
worker_id: u16,
real_window: u64,
) -> Generator {
Generator {
id: AtomicI64::new(raw_id(window, sequence, worker_id)),
state: AtomicU64::new(0),
epoch: SystemTime::now() - Duration::from_millis(real_window * 100),
}
}
#[test]
fn constructs_expected_layout() {
let id = construct_id(0b101, 0b11, 42);
assert_eq!(id.as_i64() >> 63, 0);
assert_eq!(decompose_id(id), (0b101, 0b11, 42));
}
#[test]
fn first_id_uses_sequence_zero() {
let generator = test_generator(100, 0, 7);
let id = generator.next_id().unwrap();
assert_eq!(decompose_id(id), (100, 0, 7));
}
#[test]
fn sequence_overflow_advances_virtual_window() {
let generator = test_generator(100, MAX_SEQUENCE, 9);
let last_in_window = generator.next_id().unwrap();
let first_virtual = generator.next_id().unwrap();
assert_eq!(decompose_id(last_in_window), (100, MAX_SEQUENCE, 9));
assert_eq!(decompose_id(first_virtual), (101, 0, 9));
}
#[test]
fn refresh_resets_to_real_window_when_time_advances() {
let mut generator = test_generator(100, 55, 3);
generator.epoch = SystemTime::now() - Duration::from_millis(101 * 100);
generator.refresh_hint().unwrap();
let id = generator.next_id().unwrap();
assert_eq!(decompose_id(id), (101, 0, 3));
}
#[test]
fn timestamp_uses_generator_epoch() {
let generator = Generator::new(UNIX_EPOCH, 1).unwrap();
let id = construct_id(123, 45, 1);
let timestamp = generator.timestamp(id).unwrap();
assert_eq!(timestamp.as_i64(), 12_300);
}
#[test]
fn refresh_blocks_when_virtual_time_is_exhausted() {
let generator = test_generator_with_real_window(110, 0, 3, 100);
generator.refresh_hint().unwrap();
assert_eq!(generator.next_id(), Err(BeakIdError::Blocked));
}
#[test]
fn blocked_generator_unblocks_after_real_time_catches_up() {
let mut generator = test_generator_with_real_window(110, 0, 3, 100);
generator.refresh_hint().unwrap();
assert_eq!(generator.next_id(), Err(BeakIdError::Blocked));
generator.epoch = SystemTime::now() - Duration::from_millis(102 * 100);
generator.refresh_hint().unwrap();
assert_eq!(generator.next_id(), Err(BeakIdError::Blocked));
generator.epoch = SystemTime::now() - Duration::from_millis(103 * 100);
generator.refresh_hint().unwrap();
assert!(generator.next_id().is_ok());
}
}