use std::thread::sleep;
use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};
pub type SequenceGeneratorSystemTimeError = SystemTimeError;
fn timestamp_from_custom_epoch(
custom_epoch: SystemTime,
micros_ten_power: u8,
) -> Result<u64, SequenceGeneratorSystemTimeError> {
let timestamp;
let mut micros_ten_power = micros_ten_power;
if micros_ten_power >= 3 {
timestamp = SystemTime::now().duration_since(custom_epoch)?.as_millis();
micros_ten_power -= 3;
} else {
timestamp = SystemTime::now().duration_since(custom_epoch)?.as_micros();
}
match micros_ten_power {
0 => Ok(timestamp as u64),
_ => Ok((timestamp as u64) / (10 as u64).pow(micros_ten_power.into())),
}
}
#[derive(Debug)]
pub struct SequenceProperties {
pub unused_bits: u8,
pub timestamp_bits: u8,
pub node_id_bits: u8,
pub sequence_bits: u8,
pub custom_epoch: SystemTime,
current_timestamp: Option<u64>,
last_timestamp: Option<u64>,
pub micros_ten_power: u8,
pub node_id: u16,
pub sequence: u16,
pub max_sequence: u16,
pub backoff_cooldown_start_ns: u64,
partial_cached_id: Option<u64>,
}
impl SequenceProperties {
pub fn new(
custom_epoch: SystemTime,
node_id_bits: u8,
node_id: u16,
sequence_bits: u8,
micros_ten_power: u8,
unused_bits: u8,
backoff_cooldown_start_ns: u64,
) -> Self {
let timestamp_bits = (64 as u8)
.checked_sub(sequence_bits)
.expect(&format!(
"Error: Sequence bits is too large '{}'", sequence_bits))
.checked_sub(node_id_bits)
.expect(&format!(
"Error: Sum of bits is too large, maximum value 64. Node ID bits '{}', Sequence bits '{}'",
node_id_bits, sequence_bits
))
.checked_sub(unused_bits)
.expect(&format!(
"Error: Sum of bits is too large, maximum value 64. Unused bits '{}', Sequence bits '{}', Node ID bits '{}'",
unused_bits, sequence_bits, node_id_bits
));
SequenceProperties {
custom_epoch,
timestamp_bits,
node_id_bits,
sequence_bits,
current_timestamp: None,
last_timestamp: None,
micros_ten_power,
node_id,
unused_bits,
sequence: 0,
max_sequence: (2 as u16).pow(sequence_bits.into()),
backoff_cooldown_start_ns,
partial_cached_id: None,
}
}
}
pub fn generate_id(
properties: &mut SequenceProperties,
) -> Result<u64, SequenceGeneratorSystemTimeError> {
properties.last_timestamp = properties.current_timestamp;
properties.current_timestamp = Some(timestamp_from_custom_epoch(
properties.custom_epoch,
properties.micros_ten_power,
)?);
if let Some(last_timestamp) = properties.last_timestamp {
let current_timestamp = properties.current_timestamp.unwrap();
if current_timestamp < last_timestamp {
println!("Error: System Clock moved backwards. Current timestamp '{}' is earlier than last registered '{}'.",
current_timestamp, last_timestamp);
if properties.sequence == properties.max_sequence {
wait_next_timestamp(
last_timestamp,
properties.custom_epoch,
properties.micros_ten_power,
properties.backoff_cooldown_start_ns,
)?;
properties.sequence = 0;
} else {
wait_until_last_timestamp(
last_timestamp,
properties.custom_epoch,
properties.micros_ten_power,
properties.backoff_cooldown_start_ns,
)?;
}
properties.current_timestamp = Some(timestamp_from_custom_epoch(
properties.custom_epoch,
properties.micros_ten_power,
)?);
} else if properties.current_timestamp.unwrap() != last_timestamp {
properties.sequence = 0;
}
}
let new_id = to_id(properties);
properties.sequence += 1;
if properties.sequence == properties.max_sequence {
wait_next_timestamp(
properties.last_timestamp.unwrap(),
properties.custom_epoch,
properties.micros_ten_power,
properties.backoff_cooldown_start_ns,
)?;
properties.current_timestamp = Some(timestamp_from_custom_epoch(
properties.custom_epoch,
properties.micros_ten_power,
)?);
properties.sequence = 0;
}
Ok(new_id)
}
fn wait_next_timestamp(
last_timestamp: u64,
custom_epoch: SystemTime,
micros_ten_power: u8,
backoff_cooldown_start_ns: u64,
) -> Result<(), SequenceGeneratorSystemTimeError> {
let mut current_timestamp = timestamp_from_custom_epoch(custom_epoch, micros_ten_power)?;
let backoff_cooldown_ns: u64 = backoff_cooldown_start_ns;
while current_timestamp <= last_timestamp {
sleep(Duration::from_nanos(backoff_cooldown_ns));
current_timestamp = timestamp_from_custom_epoch(custom_epoch, micros_ten_power)?;
backoff_cooldown_ns
.checked_add(backoff_cooldown_ns)
.expect(&format!(
"Error: Cannot double backoff cooldown, maximum value reached '{}'",
backoff_cooldown_ns
));
}
Ok(())
}
fn wait_until_last_timestamp(
last_timestamp: u64,
custom_epoch: SystemTime,
micros_ten_power: u8,
backoff_cooldown_start_ns: u64,
) -> Result<(), SequenceGeneratorSystemTimeError> {
let mut current_timestamp = timestamp_from_custom_epoch(custom_epoch, micros_ten_power)?;
let backoff_cooldown_ns: u64 = backoff_cooldown_start_ns;
while current_timestamp < last_timestamp {
sleep(Duration::from_nanos(backoff_cooldown_ns));
current_timestamp = timestamp_from_custom_epoch(custom_epoch, micros_ten_power)?;
backoff_cooldown_ns
.checked_add(backoff_cooldown_ns)
.expect(&format!(
"Error: Cannot double backoff cooldown, maximum value reached '{}'",
backoff_cooldown_ns
));
}
Ok(())
}
fn to_id_cached(properties: &mut SequenceProperties) -> u64 {
let mut id = properties.partial_cached_id.unwrap();
id |= ((properties.sequence as u64) << properties.node_id_bits) as u64;
id
}
fn to_id(properties: &mut SequenceProperties) -> u64 {
if properties.sequence == 0 {
cache_partial_id(properties);
}
to_id_cached(properties)
}
fn cache_partial_id(properties: &mut SequenceProperties) {
let timestamp_shift_bits = properties.node_id_bits + properties.sequence_bits;
let mut id = properties.current_timestamp.unwrap() << timestamp_shift_bits;
id |= properties.node_id as u64;
properties.partial_cached_id = Some(id);
}
pub fn decode_id_unix_epoch_micros(id: u64, properties: &SequenceProperties) -> u64 {
let id_timestamp_custom_epoch = (id << (properties.unused_bits))
>> (properties.node_id_bits + properties.sequence_bits + properties.unused_bits);
let timestamp_micros =
id_timestamp_custom_epoch * (10 as u64).pow(properties.micros_ten_power as u32);
properties
.custom_epoch
.duration_since(UNIX_EPOCH)
.expect(&format!(
"Error: Could not calculate difference between timestamp decoded from ID and Unix epoch."
))
.checked_add(Duration::from_micros(timestamp_micros))
.expect(&format!(
"Error: Could not add the timestamp decoded from ID to the provided custom epoch."
))
.as_micros() as u64
}
pub fn decode_node_id(id: u64, properties: &SequenceProperties) -> u16 {
((id << (properties.unused_bits + properties.timestamp_bits + properties.sequence_bits))
>> (properties.sequence_bits + properties.timestamp_bits + properties.unused_bits))
as u16
}
pub fn decode_sequence_id(id: u64, properties: &SequenceProperties) -> u16 {
((id << (properties.unused_bits + properties.timestamp_bits))
>> (properties.unused_bits + properties.timestamp_bits + properties.node_id_bits))
as u16
}
#[cfg(test)]
mod tests {
#[test]
fn timestamp_from() {
use super::*;
let time_now = SystemTime::now();
let millis_start = time_now
.duration_since(UNIX_EPOCH)
.expect("Error: Failed to get current time as duration from epoch.")
.as_millis();
sleep(Duration::from_millis(50));
let millis_after = timestamp_from_custom_epoch(UNIX_EPOCH, 3).unwrap_or_else(
|error| {
panic!(format!(
"SequenceGeneratorSystemTimeError: Failed to get timestamp from custom epoch {:?}, difference {:?}",
UNIX_EPOCH, (error).duration()
))
});
let substracted_times = millis_after.checked_sub(millis_start as u64).unwrap();
println!("Too small time difference between times calculated\nfrom UNIX_EPOCH using independent functions.\n\nEpoch System Time - Time Difference w/Epoch = {} ms,\nexpected greater or equals than sleep interval 50 ms.\n", substracted_times);
assert!(substracted_times >= 50);
assert!((millis_after.checked_sub(millis_start as u64).unwrap()) < 90);
let custom_epoch = UNIX_EPOCH
.checked_add(Duration::from_millis(millis_start as u64))
.expect("Error: Failed to create custom epoch.");
let tenths_millis_custom_epoch_time = timestamp_from_custom_epoch(custom_epoch, 2).unwrap_or_else(
|error| {
panic!(format!(
"SequenceGeneratorSystemTimeError: Failed to get current timestamp from custom epoch {:?}, difference {:?}",
UNIX_EPOCH, (error).duration()
))
});
sleep(Duration::from_millis(2));
let power_two: u32 = 2;
let tenths_millis_elapsed_time = (time_now.elapsed().map_or_else(
|error| {
panic!(format!(
"SequenceGeneratorSystemTimeError: Failed to get elapsed time, difference {:?}",
(error).duration()
))
},
|duration| duration.as_micros() as u64,
)) / (10 as u64).pow(power_two);
let substracted_times = tenths_millis_elapsed_time
.checked_sub(tenths_millis_custom_epoch_time)
.unwrap();
println!("Too high time difference between calculated time from\nCustom Epoch set at test start and actual elapsed\ntime since the test started.\n\nElapsed Time - Calculated Time Custom Epoch = {} mcs,\nexpected under 100 mcs\n\nPlease note that Pareto distribution applies and it\nis impossible to ensure a high enough difference for\nthe test not to fail on ocassion.\n\nReview only after ensuring repeated failures.\n", substracted_times);
assert!(substracted_times < 200);
}
#[test]
fn wait_until() {
use super::*;
let calculated_time_after_50ms: u64 = SystemTime::now()
.checked_add(Duration::from_millis(50))
.unwrap()
.duration_since(UNIX_EPOCH)
.expect("Error: Failed to get duration from epoch of timestamp 50ms into the future.")
.as_millis() as u64;
wait_until_last_timestamp(calculated_time_after_50ms, UNIX_EPOCH, 3, 1500).expect(
&format!(
"SequenceGeneratorSystemTimeError: Couldn't wait until timestamp '{}' with custom epoch '{:?}'",
calculated_time_after_50ms, UNIX_EPOCH
),
);
sleep(Duration::from_millis(1));
let time_after_50ms: u64 = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Error: Failed to get current time as duration from epoch.")
.as_millis() as u64;
let substracted_times = time_after_50ms
.checked_sub(calculated_time_after_50ms)
.unwrap();
assert!(substracted_times > 0);
println!("Too high time difference while waiting for last timestamp\nafter clock moved backwards\n\nTime Calculated - Actual Time = {} ms, expected under 35 ms\n\nPlease note that Pareto distribution applies and it\nis impossible to ensure a high enough difference for\nthe test not to fail on ocassion.\n\nReview only after ensuring repeated failures.\n", substracted_times);
assert!(substracted_times < 35);
}
#[test]
fn wait_next() {
use super::*;
let calculated_time_after_10ms: u64 = SystemTime::now()
.checked_add(Duration::from_millis(10))
.expect("Error: Failed to 10ms to current timestamp.")
.duration_since(UNIX_EPOCH)
.expect("Error: Failed to get duration from epoch of timestamp 10ms into the future.")
.as_millis() as u64;
wait_next_timestamp(calculated_time_after_10ms, UNIX_EPOCH, 3, 1500).expect(&format!(
"SequenceGeneratorSystemTimeError: Couldn't wait until timestamp '{}' with custom epoch '{:?}'",
calculated_time_after_10ms, UNIX_EPOCH
));
let time_after_11ms: u64 = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Error: Failed to get current time as duration from epoch.")
.as_millis() as u64;
let substracted_times = time_after_11ms
.checked_sub(calculated_time_after_10ms)
.unwrap();
assert!(substracted_times > 0);
println!("Too high time difference while waiting for next timestamp\n\nNext timestamp - Last Timestamp = {} ms, expected under 35 ms\n\nPlease note that Pareto distribution applies and it\nis impossible to ensure a high enough difference for\nthe test not to fail on ocassion.\n\nReview only after ensuring repeated failures.\n", substracted_times);
assert!(substracted_times < 35);
}
#[test]
fn gen_id() {
use super::*;
use rand::Rng;
let custom_epoch = UNIX_EPOCH;
let node_id_bits = 16;
let unused_bits = 7;
let sequence_bits = 2;
let micros_ten_power = 4;
let mut rng = rand::thread_rng();
let node_id = rng.gen_range(0..65535);
let backoff_cooldown_start_ns = 1_000_000;
let mut properties = SequenceProperties::new(
custom_epoch,
node_id_bits,
node_id,
sequence_bits,
micros_ten_power,
unused_bits,
backoff_cooldown_start_ns,
);
let last_timestamp = (SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Error: Failed to get current time as duration from epoch.")
.as_millis()
/ 10) as u64;
let mut vector_ids: Vec<u64> = vec![0; 5];
wait_next_timestamp(
last_timestamp,
UNIX_EPOCH,
micros_ten_power,
backoff_cooldown_start_ns,
)
.expect(&format!(
"SequenceGeneratorSystemTimeError: Couldn't wait until timestamp '{}' with custom epoch '{:?}'",
last_timestamp, UNIX_EPOCH
));
for element in vector_ids.iter_mut() {
*element = generate_id(&mut properties).unwrap_or_else(
|error| {
panic!(format!(
"SequenceGeneratorSystemTimeError: Failed to get timestamp from custom epoch {:?}, difference {:?}",
UNIX_EPOCH, (error).duration()
))
});
}
let decoded_timestamp = decode_id_unix_epoch_micros(vector_ids[0], &properties);
assert!(((decoded_timestamp / 10_000) - (last_timestamp + 1)) < 15);
let mut decoded_node_id = decode_node_id(vector_ids[0], &properties);
assert_eq!(decoded_node_id, node_id);
let mut decoded_seq_id = decode_sequence_id(vector_ids[0], &properties);
assert_eq!(decoded_seq_id, 0);
for index in 1..5 {
decoded_seq_id = decode_sequence_id(vector_ids[index], &properties);
assert_eq!(decoded_seq_id, (index as u16) % 4);
decoded_node_id = decode_node_id(vector_ids[index], &properties);
assert_eq!(decoded_node_id, node_id);
}
assert!(properties.current_timestamp.unwrap() - last_timestamp < 15);
}
}