use std::borrow::BorrowMut;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::thread::sleep;
use std::time::{Duration, SystemTime, SystemTimeError};
/// Error type returned by the fallible functions in this file.
///
/// It is a plain alias of [`SystemTimeError`], which is produced when the
/// system clock reads earlier than the epoch a timestamp is measured from.
pub type SequenceGeneratorSystemTimeError = SystemTimeError;
/// Returns the time elapsed since `custom_epoch`, expressed in units of
/// `10^micros_ten_power` microseconds (e.g. `3` yields milliseconds).
///
/// The scaling is carried out in 128-bit arithmetic, so a `micros_ten_power`
/// whose unit does not fit in a `u64` no longer overflows: a unit larger than
/// the elapsed time simply yields `0`. A result that does not fit in a `u64`
/// is clamped to `u64::MAX` instead of being silently truncated.
///
/// # Errors
///
/// Returns the underlying [`SystemTimeError`] when the system clock currently
/// reads earlier than `custom_epoch`.
fn timestamp_from_custom_epoch(
    custom_epoch: SystemTime,
    micros_ten_power: u8,
) -> Result<u64, SequenceGeneratorSystemTimeError> {
    let elapsed_micros: u128 = SystemTime::now().duration_since(custom_epoch)?.as_micros();
    // `checked_pow` returns `None` once 10^power exceeds `u128::MAX`; such a
    // unit is larger than any representable elapsed time, so the quotient is 0.
    let scaled = 10_u128
        .checked_pow(u32::from(micros_ten_power))
        .map_or(0, |unit_micros| elapsed_micros / unit_micros);
    // The cast is lossless after clamping to the `u64` range.
    Ok(scaled.min(u128::from(u64::MAX)) as u64)
}
/// Bit layout, configuration and mutable generator state for 64-bit IDs.
///
/// From the most significant bit down, an ID is laid out as
/// `[unused_bits][timestamp_bits][sequence_bits][node_id_bits]`.
#[derive(Debug)]
pub struct SequenceProperties {
    /// Number of high-order bits of the ID that are not assigned to any field (0..=7).
    pub unused_bits: u8,
    /// Bits left for the timestamp: `64 - sequence_bits - node_id_bits - unused_bits`.
    pub timestamp_bits: u8,
    /// Width of the node ID field, stored in the lowest bits of the ID (1..=16).
    pub node_id_bits: u8,
    /// Width of the sequence field, stored directly above the node ID (1..=16).
    pub sequence_bits: u8,
    /// Instant that timestamps are measured from.
    pub custom_epoch: SystemTime,
    /// Most recent clock reading, in units of `10^micros_ten_power` microseconds
    /// since `custom_epoch`; `None` until a reading has been taken.
    current_timestamp: Rc<RefCell<Option<u64>>>,
    /// Clock reading handed over through [`SequenceProperties::set_last_timestamp`].
    last_timestamp: Rc<RefCell<Option<u64>>>,
    /// Timestamp resolution: one timestamp unit is `10^micros_ten_power` microseconds.
    pub micros_ten_power: u8,
    /// Identifier of this generator, embedded in every ID.
    pub node_id: u16,
    /// Counter that distinguishes IDs generated within the same timestamp unit.
    pub sequence: Cell<u16>,
    /// Exclusive upper bound of `sequence`: `2^sequence_bits`, clamped to `u16::MAX`.
    pub max_sequence: u16,
    /// Sleep interval, in nanoseconds, that `generate_id` passes to the wait helpers.
    pub backoff_cooldown_start_ns: u64,
    /// Timestamp and node ID already combined into an ID without its sequence bits.
    partial_cached_id: Rc<RefCell<Option<u64>>>,
}

impl SequenceProperties {
    /// Validates the bit layout and builds a generator state with no clock
    /// reading, no cached ID and the sequence counter at zero.
    ///
    /// # Panics
    ///
    /// Panics when `unused_bits > 7`, when `sequence_bits` or `node_id_bits` is
    /// outside `1..=16`, or when `node_id` needs more than `node_id_bits` bits.
    pub fn new(
        custom_epoch: SystemTime,
        node_id_bits: u8,
        node_id: u16,
        sequence_bits: u8,
        micros_ten_power: u8,
        unused_bits: u8,
        backoff_cooldown_start_ns: u64,
    ) -> Self {
        if unused_bits > 7 {
            panic!(
                "ERROR: unused_bits '{}' is larger than the maximum value of 7.",
                unused_bits
            )
        }
        if sequence_bits > 16 {
            panic!(
                "ERROR: sequence_bits '{}' is larger than the maximum value of 16.",
                sequence_bits
            )
        }
        if sequence_bits == 0 {
            panic!(
                "ERROR: sequence_bits '{}' must be larger or equal than 1.",
                sequence_bits
            )
        }
        if node_id_bits > 16 {
            panic!(
                "ERROR: node_id_bits '{}' is larger than the maximum value of 16.",
                node_id_bits
            )
        }
        if node_id_bits == 0 {
            panic!(
                "ERROR: node_id_bits '{}' must be larger or equal than 1.",
                node_id_bits
            )
        }
        // A node ID wider than its field would be OR-ed over the sequence bits
        // and could make two different sequence numbers produce the same ID.
        if u32::from(node_id) >> node_id_bits != 0 {
            panic!(
                "ERROR: node_id '{}' does not fit in node_id_bits '{}'.",
                node_id, node_id_bits
            )
        }
        let timestamp_bits = 64_u8
            .checked_sub(sequence_bits)
            .unwrap_or_else(|| {
                panic!(
                    "ERROR: Sum of bits is too large, maximum value 64. Sequence bits '{}'",
                    sequence_bits
                )
            })
            .checked_sub(node_id_bits)
            .unwrap_or_else(|| {
                panic!(
                    "ERROR: Sum of bits is too large, maximum value 64. Node ID bits '{}', Sequence bits '{}'",
                    node_id_bits, sequence_bits
                )
            })
            .checked_sub(unused_bits)
            .unwrap_or_else(|| {
                panic!(
                    "ERROR: Sum of bits is too large, maximum value 64. Unused bits '{}', Sequence bits '{}', Node ID bits '{}'",
                    unused_bits, sequence_bits, node_id_bits
                )
            });
        // 2^16 does not fit in a `u16`. Clamping keeps the allowed maximum of
        // 16 sequence bits usable (sequence values 0..=65534) instead of
        // overflowing; the cast is lossless after the clamp.
        let max_sequence = (1_u32 << sequence_bits).min(u32::from(u16::MAX)) as u16;
        SequenceProperties {
            custom_epoch,
            timestamp_bits,
            node_id_bits,
            sequence_bits,
            micros_ten_power,
            node_id,
            unused_bits,
            sequence: Cell::new(0_u16),
            current_timestamp: Rc::new(RefCell::new(None)),
            last_timestamp: Rc::new(RefCell::new(None)),
            max_sequence,
            backoff_cooldown_start_ns,
            partial_cached_id: Rc::new(RefCell::new(None)),
        }
    }

    /// Moves the value out of `timestamp` (leaving `None` behind) and stores it
    /// as the last timestamp. A `None` input leaves the stored value untouched.
    pub fn set_last_timestamp(&self, timestamp: &mut Option<u64>) {
        if let Some(value) = timestamp.take() {
            self.last_timestamp.replace(Some(value));
        }
    }

    /// Reads the clock and stores the result as the current timestamp.
    ///
    /// # Panics
    ///
    /// Panics when the system clock reads earlier than `custom_epoch`.
    pub fn set_current_timestamp(&self) {
        let now = timestamp_from_custom_epoch(self.custom_epoch, self.micros_ten_power)
            .unwrap_or_else(|error| {
                panic!(
                    "ERROR: Could not calculate current timestamp from custom epoch {:?} and micros power of {:?}. Error: {}",
                    self.custom_epoch, self.micros_ten_power, error
                )
            });
        self.current_timestamp.replace(Some(now));
    }

    /// Moves the value out of `cached_id` (leaving `None` behind) and stores it
    /// as the cached partial ID.
    ///
    /// # Panics
    ///
    /// Panics when `cached_id` is `None`.
    pub fn set_partial_cached_id(&self, cached_id: &mut Option<u64>) {
        let partial_id = cached_id
            .take()
            .expect("ERROR: set_partial_cached_id requires a partial ID, got None.");
        self.partial_cached_id.replace(Some(partial_id));
    }
}
/// Generates the next ID for `properties`, advancing its sequence counter.
///
/// The previous clock reading is rotated into the "last" slot and a fresh
/// reading is taken. If the clock moved backwards the call blocks until it has
/// caught up; if it moved forwards the sequence counter restarts at zero. Once
/// the counter reaches `max_sequence` the call blocks until the next timestamp
/// unit so the following ID starts a new sequence.
///
/// # Errors
///
/// Propagates the error of the wait helpers when the clock reads earlier than
/// `custom_epoch` while waiting.
///
/// # Panics
///
/// Panics if refreshing the current timestamp fails (see
/// `SequenceProperties::set_current_timestamp`).
pub fn generate_id(
    properties: &SequenceProperties,
) -> Result<u64, SequenceGeneratorSystemTimeError> {
    // Rotate the readings: the old "current" value becomes "last" and the
    // current slot is emptied before it is refilled from the clock.
    let mut previous_reading = properties.current_timestamp.take();
    properties.set_last_timestamp(&mut previous_reading);
    properties.set_current_timestamp();

    if let Some(previous) = properties.last_timestamp.take() {
        let now = properties.current_timestamp.borrow().unwrap();
        if now < previous {
            println!("ERROR: System Clock moved backwards. Current timestamp '{}' is earlier than last registered '{}'.",
                now, previous);
            let sequence_exhausted = properties.sequence.get() == properties.max_sequence;
            if sequence_exhausted {
                wait_next_timestamp(
                    previous,
                    properties.custom_epoch,
                    properties.micros_ten_power,
                    properties.backoff_cooldown_start_ns,
                )?;
                properties.sequence.set(0);
            } else {
                wait_until_last_timestamp(
                    previous,
                    properties.custom_epoch,
                    properties.micros_ten_power,
                    properties.backoff_cooldown_start_ns,
                )?;
            }
            properties.set_current_timestamp();
        } else if now != previous {
            // A later timestamp unit has begun, so the sequence starts over.
            properties.sequence.set(0);
        }
    }

    let id = to_id(properties);
    let next_sequence = properties.sequence.get() + 1;
    properties.sequence.set(next_sequence);
    if next_sequence != properties.max_sequence {
        return Ok(id);
    }

    // Sequence space used up for this timestamp unit: wait for the next one.
    let exhausted_timestamp = properties.current_timestamp.borrow().unwrap();
    wait_next_timestamp(
        exhausted_timestamp,
        properties.custom_epoch,
        properties.micros_ten_power,
        properties.backoff_cooldown_start_ns,
    )?;
    properties.set_current_timestamp();
    properties.sequence.set(0);
    Ok(id)
}
/// Blocks the current thread until the clock reading (in units of
/// `10^micros_ten_power` microseconds since `custom_epoch`) is strictly later
/// than `last_timestamp`.
///
/// Sleeps start at `backoff_cooldown_start_ns` nanoseconds and double after
/// every unsuccessful check. The doubled value used to be computed and then
/// discarded, so the backoff never grew. Each sleep is additionally capped at
/// the time that can still be missing, so a large backoff never overshoots the
/// target timestamp. The doubling saturates at `u64::MAX` instead of panicking.
///
/// # Errors
///
/// Returns an error when the clock reads earlier than `custom_epoch`.
fn wait_next_timestamp(
    last_timestamp: u64,
    custom_epoch: SystemTime,
    micros_ten_power: u8,
    backoff_cooldown_start_ns: u64,
) -> Result<(), SequenceGeneratorSystemTimeError> {
    // Length of one timestamp unit in nanoseconds, saturating for huge powers.
    let unit_ns = 10_u64
        .checked_pow(u32::from(micros_ten_power))
        .and_then(|unit_micros| unit_micros.checked_mul(1_000))
        .unwrap_or(u64::MAX);
    let mut backoff_cooldown_ns = backoff_cooldown_start_ns;
    let mut current_timestamp = timestamp_from_custom_epoch(custom_epoch, micros_ten_power)?;
    while current_timestamp <= last_timestamp {
        // Upper bound on the time left until the reading exceeds `last_timestamp`.
        let remaining_ns = (last_timestamp - current_timestamp)
            .saturating_add(1)
            .saturating_mul(unit_ns);
        sleep(Duration::from_nanos(backoff_cooldown_ns.min(remaining_ns)));
        current_timestamp = timestamp_from_custom_epoch(custom_epoch, micros_ten_power)?;
        backoff_cooldown_ns = backoff_cooldown_ns.saturating_mul(2);
    }
    Ok(())
}
/// Blocks the current thread until the clock reading (in units of
/// `10^micros_ten_power` microseconds since `custom_epoch`) has caught up with
/// `last_timestamp`, i.e. is no longer earlier than it.
///
/// Sleeps start at `backoff_cooldown_start_ns` nanoseconds and double after
/// every unsuccessful check. The doubled value used to be computed and then
/// discarded, so the backoff never grew. Each sleep is additionally capped at
/// the time that can still be missing, so a large backoff never overshoots the
/// target timestamp. The doubling saturates at `u64::MAX` instead of panicking.
///
/// # Errors
///
/// Returns an error when the clock reads earlier than `custom_epoch`.
fn wait_until_last_timestamp(
    last_timestamp: u64,
    custom_epoch: SystemTime,
    micros_ten_power: u8,
    backoff_cooldown_start_ns: u64,
) -> Result<(), SequenceGeneratorSystemTimeError> {
    // Length of one timestamp unit in nanoseconds, saturating for huge powers.
    let unit_ns = 10_u64
        .checked_pow(u32::from(micros_ten_power))
        .and_then(|unit_micros| unit_micros.checked_mul(1_000))
        .unwrap_or(u64::MAX);
    let mut backoff_cooldown_ns = backoff_cooldown_start_ns;
    let mut current_timestamp = timestamp_from_custom_epoch(custom_epoch, micros_ten_power)?;
    while current_timestamp < last_timestamp {
        // Upper bound on the time left until the reading reaches `last_timestamp`.
        let remaining_ns = (last_timestamp - current_timestamp).saturating_mul(unit_ns);
        sleep(Duration::from_nanos(backoff_cooldown_ns.min(remaining_ns)));
        current_timestamp = timestamp_from_custom_epoch(custom_epoch, micros_ten_power)?;
        backoff_cooldown_ns = backoff_cooldown_ns.saturating_mul(2);
    }
    Ok(())
}
/// Builds an ID by OR-ing the current sequence number, shifted above the node
/// ID bits, into the cached partial ID.
///
/// Panics when no partial ID has been cached yet.
fn to_id_cached(properties: &SequenceProperties) -> u64 {
    let cached_prefix = properties.partial_cached_id.borrow().unwrap();
    let sequence_field = u64::from(properties.sequence.get()) << properties.node_id_bits;
    cached_prefix | sequence_field
}
/// Returns the ID for the current timestamp, node ID and sequence number.
///
/// The timestamp/node part is re-cached only when a sequence starts (counter
/// at zero); every other call reuses the cached part.
fn to_id(properties: &SequenceProperties) -> u64 {
    let starts_new_sequence = properties.sequence.get() == 0;
    if starts_new_sequence {
        cache_partial_id(properties);
    }
    to_id_cached(properties)
}
/// Combines the current timestamp (shifted above the sequence and node ID
/// bits) with the node ID and stores the result as the cached partial ID.
///
/// Panics when no current timestamp has been recorded.
fn cache_partial_id(properties: &SequenceProperties) {
    let shift = properties.node_id_bits + properties.sequence_bits;
    let timestamp = properties.current_timestamp.borrow().unwrap();
    let mut partial_id = Some((timestamp << shift) | u64::from(properties.node_id));
    properties.set_partial_cached_id(partial_id.borrow_mut());
}
/// Extracts the timestamp embedded in `id` and returns it in microseconds
/// elapsed since `properties.custom_epoch`.
///
/// The unused high bits are shifted out first, then the sequence and node ID
/// bits, and the remaining timestamp is scaled by `10^micros_ten_power`.
///
/// The earlier implementation added this value to the duration between
/// `custom_epoch` and itself, which is always zero, so its two panic branches
/// could never fire and their messages (mentioning the Unix epoch) were
/// misleading. The returned value is unchanged.
pub fn decode_timestamp_micros(id: u64, properties: &SequenceProperties) -> u64 {
    let id_timestamp_custom_epoch = (id << properties.unused_bits)
        >> (properties.node_id_bits + properties.sequence_bits + properties.unused_bits);
    id_timestamp_custom_epoch * 10_u64.pow(u32::from(properties.micros_ten_power))
}
/// Extracts the node ID stored in the lowest bits of `id` by shifting out
/// every bit above it (unused, timestamp and sequence bits) and shifting back.
pub fn decode_node_id(id: u64, properties: &SequenceProperties) -> u16 {
    let bits_above_node_id =
        properties.unused_bits + properties.timestamp_bits + properties.sequence_bits;
    let node_id = (id << bits_above_node_id) >> bits_above_node_id;
    node_id as u16
}
/// Extracts the sequence number from `id`: the unused and timestamp bits are
/// shifted out at the top, then the node ID bits are shifted out at the bottom.
pub fn decode_sequence_id(id: u64, properties: &SequenceProperties) -> u16 {
    let bits_above_sequence = properties.unused_bits + properties.timestamp_bits;
    let without_high_bits = id << bits_above_sequence;
    let sequence = without_high_bits >> (bits_above_sequence + properties.node_id_bits);
    sequence as u16
}
// Unit tests for the timestamp helpers and the ID generator.
// They read the real system clock and sleep, so the upper bounds asserted on
// elapsed time are tolerances rather than exact expectations.
#[cfg(test)]
mod tests {
// Checks `timestamp_from_custom_epoch` against independent clock readings,
// first relative to UNIX_EPOCH and then relative to an epoch set at test start.
#[test]
fn timestamp_from() {
use super::*;
use std::time::UNIX_EPOCH;
let time_now = SystemTime::now();
let millis_start = time_now
.duration_since(UNIX_EPOCH)
.expect("ERROR: Failed to get current time as duration from epoch.")
.as_millis();
sleep(Duration::from_millis(50));
// Power 3 means units of 10^3 microseconds, i.e. milliseconds.
let millis_after = timestamp_from_custom_epoch(UNIX_EPOCH, 3).unwrap_or_else(
|error| {
panic!(
"SequenceGeneratorSystemTimeError: Failed to get timestamp from custom epoch {:?}, difference {:?}",
UNIX_EPOCH, (error).duration()
)
});
let substracted_times = millis_after.checked_sub(millis_start as u64).unwrap();
// Printed unconditionally; the text describes the failure of the assertion below.
println!("Too small time difference between times calculated\nfrom UNIX_EPOCH using independent functions.\n\nEpoch System Time - Time Difference w/Epoch = {} ms,\nexpected greater or equals than sleep interval 50 ms.\n", substracted_times);
assert!(substracted_times >= 50);
assert!((millis_after.checked_sub(millis_start as u64).unwrap()) < 90);
// Custom epoch placed at the millisecond in which the test started.
let custom_epoch = UNIX_EPOCH
.checked_add(Duration::from_millis(millis_start as u64))
.expect("ERROR: Failed to create custom epoch.");
// Power 2 means units of 10^2 microseconds.
let tenths_millis_custom_epoch_time = timestamp_from_custom_epoch(custom_epoch, 2).unwrap_or_else(
|error| {
panic!(
"SequenceGeneratorSystemTimeError: Failed to get current timestamp from custom epoch {:?}, difference {:?}",
UNIX_EPOCH, (error).duration()
)
});
sleep(Duration::from_millis(2));
let power_two: u32 = 2;
// Elapsed time since test start, converted to the same 10^2 microsecond units.
let tenths_millis_elapsed_time = (time_now.elapsed().map_or_else(
|error| {
panic!(
"SequenceGeneratorSystemTimeError: Failed to get elapsed time, difference {:?}",
(error).duration()
)
},
|duration| duration.as_micros() as u64,
)) / (10_u64).pow(power_two);
let substracted_times = tenths_millis_elapsed_time
.checked_sub(tenths_millis_custom_epoch_time)
.unwrap();
// NOTE(review): the message speaks of "100 mcs" while the assertion below
// allows up to 200 units of 10^2 microseconds - confirm the intended bound.
println!("Too high time difference between calculated time from\nCustom Epoch set at test start and actual elapsed\ntime since the test started.\n\nElapsed Time - Calculated Time Custom Epoch = {} mcs,\nexpected under 100 mcs\n\nPlease note that Pareto distribution applies and it\nis impossible to ensure a high enough difference for\nthe test not to fail on ocassion.\n\nReview only after ensuring repeated failures.\n", substracted_times);
assert!(substracted_times < 200);
}
// Checks that `wait_until_last_timestamp` returns only after a timestamp
// 50 ms in the future has been reached, and without a large overshoot.
#[test]
fn wait_until() {
use super::*;
use std::time::UNIX_EPOCH;
// Target timestamp in milliseconds since UNIX_EPOCH, 50 ms from now.
let calculated_time_after_50ms: u64 = SystemTime::now()
.checked_add(Duration::from_millis(50))
.unwrap()
.duration_since(UNIX_EPOCH)
.expect("ERROR: Failed to get duration from epoch of timestamp 50ms into the future.")
.as_millis() as u64;
wait_until_last_timestamp(calculated_time_after_50ms, UNIX_EPOCH, 3, 1500).expect(
&format!(
"SequenceGeneratorSystemTimeError: Couldn't wait until timestamp '{}' with custom epoch '{:?}'",
calculated_time_after_50ms, UNIX_EPOCH
),
);
// The helper may return in the target millisecond itself; sleeping 1 ms
// makes the strict `> 0` assertion below hold.
sleep(Duration::from_millis(1));
let time_after_50ms: u64 = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("ERROR: Failed to get current time as duration from epoch.")
.as_millis() as u64;
let substracted_times = time_after_50ms
.checked_sub(calculated_time_after_50ms)
.unwrap();
assert!(substracted_times > 0);
println!("Too high time difference while waiting for last timestamp\nafter clock moved backwards\n\nTime Calculated - Actual Time = {} ms, expected under 35 ms\n\nPlease note that Pareto distribution applies and it\nis impossible to ensure a high enough difference for\nthe test not to fail on ocassion.\n\nReview only after ensuring repeated failures.\n", substracted_times);
assert!(substracted_times < 35);
}
// Checks that `wait_next_timestamp` returns only after the clock has moved
// past a timestamp 10 ms in the future, and without a large overshoot.
#[test]
fn wait_next() {
use super::*;
use std::time::UNIX_EPOCH;
// Target timestamp in milliseconds since UNIX_EPOCH, 10 ms from now.
let calculated_time_after_10ms: u64 = SystemTime::now()
.checked_add(Duration::from_millis(10))
.expect("ERROR: Failed to 10ms to current timestamp.")
.duration_since(UNIX_EPOCH)
.expect("ERROR: Failed to get duration from epoch of timestamp 10ms into the future.")
.as_millis() as u64;
wait_next_timestamp(calculated_time_after_10ms, UNIX_EPOCH, 3, 1500).unwrap_or_else(|_| {panic!(
"SequenceGeneratorSystemTimeError: Couldn't wait until timestamp '{}' with custom epoch '{:?}'",
calculated_time_after_10ms, UNIX_EPOCH
)});
let time_after_11ms: u64 = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("ERROR: Failed to get current time as duration from epoch.")
.as_millis() as u64;
let substracted_times = time_after_11ms
.checked_sub(calculated_time_after_10ms)
.unwrap();
// Strictly positive because the helper waits for a strictly later timestamp.
assert!(substracted_times > 0);
println!("Too high time difference while waiting for next timestamp\n\nNext timestamp - Last Timestamp = {} ms, expected under 35 ms\n\nPlease note that Pareto distribution applies and it\nis impossible to ensure a high enough difference for\nthe test not to fail on ocassion.\n\nReview only after ensuring repeated failures.\n", substracted_times);
assert!(substracted_times < 35);
}
// Generates five IDs and decodes timestamp, node ID and sequence from them.
// With 2 sequence bits `max_sequence` is 4, so the fifth ID starts a new
// sequence at 0.
#[test]
fn gen_id() {
use super::*;
use rand::Rng;
let custom_epoch = SystemTime::now();
let node_id_bits = 16;
let unused_bits = 7;
let sequence_bits = 2;
// Power 4 means timestamp units of 10^4 microseconds (10 ms).
let micros_ten_power = 4;
let mut rng = rand::thread_rng();
// Half-open range: 65535 itself is never drawn.
let node_id = rng.gen_range(0..65535);
let backoff_cooldown_start_ns = 1_000_000;
// Current time since the custom epoch in the same 10 ms units.
let last_timestamp = (SystemTime::now()
.duration_since(custom_epoch)
.expect("ERROR: Failed to get current time as duration from epoch.")
.as_millis()
/ 10) as u64;
// Start generating at the beginning of the next timestamp unit.
wait_next_timestamp(
last_timestamp,
custom_epoch,
micros_ten_power,
backoff_cooldown_start_ns,
)
.unwrap_or_else(|_| {panic!(
"SequenceGeneratorSystemTimeError: Couldn't wait until timestamp '{}' with custom epoch '{:?}'",
last_timestamp, custom_epoch
)});
let mut vector_ids: Vec<u64> = vec![0; 5];
let properties = SequenceProperties::new(
custom_epoch,
node_id_bits,
node_id,
sequence_bits,
micros_ten_power,
unused_bits,
backoff_cooldown_start_ns,
);
for element in vector_ids.iter_mut() {
*element = generate_id(&properties).unwrap_or_else(
|error| {
panic!(
"SequenceGeneratorSystemTimeError: Failed to get timestamp from custom epoch {:?}, difference {:?}",
custom_epoch, (error).duration()
)
});
}
// The decoded value is in microseconds since the custom epoch; dividing by
// 10_000 converts it back to the 10 ms units of `last_timestamp`.
let decoded_timestamp = decode_timestamp_micros(vector_ids[0], &properties);
assert!(((decoded_timestamp / 10_000) - (last_timestamp + 1)) < 15);
let mut decoded_node_id = decode_node_id(vector_ids[0], &properties);
assert_eq!(decoded_node_id, node_id);
let mut decoded_seq_id = decode_sequence_id(vector_ids[0], &properties);
assert_eq!(decoded_seq_id, 0);
for index in 1..5 {
decoded_seq_id = decode_sequence_id(vector_ids[index], &properties);
// Index 4 wraps around to sequence 0.
assert_eq!(decoded_seq_id, (index as u16) % 4);
decoded_node_id = decode_node_id(vector_ids[index], &properties);
assert_eq!(decoded_node_id, node_id);
}
assert!(properties.current_timestamp.borrow().unwrap() - last_timestamp < 15);
}
}