use std::time::{
Duration,
SystemTime,
UNIX_EPOCH
};
use std::sync::atomic::Ordering;
#[cfg(target_has_atomic = "64")]
use std::sync::atomic::AtomicU64;
#[cfg(not(target_has_atomic = "64"))]
use portable_atomic::AtomicU64;
use liter::Value;
use serde::{Serialize, Deserialize};
use fixed::types::U32F32;
use fixed::traits::FromFixed;
use super::NodeID;
/// Mask with only bit 15 set, i.e. the 16th least-significant bit.
///
/// This is the highest bit that is cleared by [`MS_48_BITS_SET`]; the
/// timestamp code adds it back onto the value before masking so that the
/// truncation rounds instead of always rounding down.
const ONLY_LS_BIT_16: u64 = 0x8000;
/// Mask with the 48 most-significant bits set and the low 16 bits clear.
///
/// `value & MS_48_BITS_SET` keeps the timestamp part of a raw clock value,
/// `value & !MS_48_BITS_SET` keeps the 16-bit counter part.
const MS_48_BITS_SET: u64 = !0xFFFF;
/// Status value stored in [`PeerState::state`].
///
/// NOTE(review): the precise meaning of each variant is decided by code
/// outside this file; only `Active` is given special treatment here (see
/// [`PeerState::is_active`]).
///
/// NOTE(review): the derived `Serialize`/`Deserialize`/`Value` encodings may
/// depend on the declaration order of the variants - confirm before
/// reordering or inserting variants.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Value)]
pub enum Status {
/// The only status for which [`PeerState::is_active`] returns `true`.
Active,
Suspicious,
Unreachable,
Indirect,
Quit
}
/// A node identifier paired with a clock value and a [`Status`].
///
/// `Debug` is implemented by hand for this type rather than derived.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PeerState {
/// Identifier of the node this entry describes; compared by `has_id`.
pub id: NodeID,
/// Clock value attached to this entry.
/// NOTE(review): presumably the clock at which `state` was recorded - confirm.
pub clk: Clock,
/// Status recorded for the node; inspected by `is_active`.
pub state: Status
}
impl PeerState {
    /// Returns `true` when this entry's `id` equals the given `id`.
    pub fn has_id(&self, id: NodeID) -> bool {
        id == self.id
    }

    /// Returns `true` only when the recorded status is [`Status::Active`].
    pub fn is_active(&self) -> bool {
        match self.state {
            Status::Active => true,
            _ => false,
        }
    }
}
/// Compact debug output of the form `<id>@<clock>: <status>`.
///
/// The id and clock are rendered with their `Display` implementations, the
/// status with its `Debug` implementation.
impl std::fmt::Debug for PeerState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { id, clk, state } = self;
        write!(f, "{}@{}: {:?}", id, clk, state)
    }
}
pub fn elapsed(time: Option<SystemTime>) -> Option<Duration> {
time?
.elapsed()
.map_err(|e| error!("time is {:?} in the future!", e.duration()))
.ok()
}
pub fn remaining(from: Option<SystemTime>, until: Duration)
-> Option<Duration>
{
let elapsed_time = from?
.elapsed()
.map_err(|e| error!(time_error = %e, "elapsed() from the past"))
.ok()?;
until
.checked_sub(elapsed_time)
.filter(|time| !time.is_zero())
}
/// A clock value packed into a single `u64`.
///
/// The 48 most-significant bits hold a timestamp: they are the upper bits of
/// a `U32F32` fixed-point number of seconds since the UNIX epoch (see
/// `get_time`). The 16 least-significant bits hold a counter (see the
/// `Display` implementation, which prints both parts).
///
/// The derived ordering compares the raw `u64`, so the timestamp part
/// dominates and the counter only breaks ties between equal timestamps.
#[derive(
Clone,
Copy,
Debug,
PartialEq,
Eq,
PartialOrd,
Ord,
Serialize,
Deserialize,
Value
)]
pub struct Clock(u64);
impl Clock {
    /// Returns the all-zero clock, which compares less than or equal to
    /// every other clock value.
    ///
    /// This is a `const fn` (like `AtomicClock::null_clock`) so it can be
    /// used to initialise constants and statics.
    pub const fn null_clock() -> Self {
        Self(0)
    }

    /// Converts the timestamp part of this clock into a [`SystemTime`].
    ///
    /// The counter in the low 16 bits is masked off; the remaining bits are
    /// read as a `U32F32` number of seconds and added to [`UNIX_EPOCH`].
    ///
    /// # Panics
    ///
    /// Panics if `UNIX_EPOCH + duration` is not representable as a
    /// `SystemTime` on the current platform.
    pub fn get_time(self) -> SystemTime {
        let timestamp_bits = self.0 & MS_48_BITS_SET;
        let seconds = f64::unwrapped_from_fixed(U32F32::from_bits(timestamp_bits));
        UNIX_EPOCH + Duration::from_secs_f64(seconds)
    }
}
impl std::fmt::Display for Clock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let unix_time = f64::unwrapped_from_fixed(
U32F32::from_bits(self.0 & MS_48_BITS_SET)
);
let counter: u16 = (self.0 & !MS_48_BITS_SET)
.try_into()
.unwrap();
write!(f, "{unix_time}/{counter}")
}
}
/// Shared generator of [`Clock`] values backed by a single atomic `u64`.
///
/// The atomic holds the raw bits of the most recently issued clock. `next`
/// only returns once it has stored a value greater than the one that was
/// stored before, so the stored value never decreases.
#[derive(Debug)]
pub struct AtomicClock(AtomicU64);
impl AtomicClock {
    /// Creates a clock whose stored value is zero.
    pub const fn null_clock() -> Self {
        Self(AtomicU64::new(0))
    }

    /// Creates a clock that starts out holding the raw bits of `prev`.
    pub const fn load(prev: Clock) -> Self {
        Self(AtomicU64::new(prev.0))
    }

    /// Issues a new clock value, using `now` as the proposed raw value.
    ///
    /// If `now` is greater than the stored value it is stored and returned
    /// unchanged. Otherwise the stored value's counter is incremented by one
    /// and that value is proposed instead, repeating until a proposal wins.
    ///
    /// # Panics
    ///
    /// Panics if the 16-bit counter would overflow. In builds with debug
    /// assertions it also panics after more than 100 retries.
    fn next_with(&self, mut now: u64) -> Clock {
        #[cfg(debug_assertions)]
        let mut retries = 0;
        // `fetch_max` stores `max(stored, now)` and returns what was stored
        // before, so `seen < now` means our proposal has been written.
        let mut seen = self.0.fetch_max(now, Ordering::AcqRel);
        loop {
            if seen < now {
                break Clock(now);
            }
            // The stored clock is at or ahead of the proposal: keep its
            // timestamp and move one step past its counter.
            let (time, counter) = Self::split(seen);
            let bumped = counter
                .checked_add(1)
                .expect("too much clock drift / too many HLCs created");
            now = time | u64::from(bumped);
            seen = self.0.fetch_max(now, Ordering::AcqRel);
            #[cfg(debug_assertions)] {
                retries += 1;
                assert!(retries <= 100);
            }
        }
    }

    /// Issues a new clock value based on the current system time.
    ///
    /// # Panics
    ///
    /// Panics if the system time cannot be expressed relative to the UNIX
    /// epoch, and under the same conditions as the internal retry loop
    /// (counter overflow).
    pub fn next(&self) -> Clock {
        let now = Self::get_timestamp_now();
        self.next_with(now)
    }

    /// Returns the currently stored clock value without advancing it.
    pub fn prev(&self) -> Clock {
        let bits = self.0.load(Ordering::Relaxed);
        Clock(bits)
    }

    /// Reads the system time as `U32F32` bits with the low 16 bits cleared.
    ///
    /// Bit 15 is the highest bit that the final mask discards. Adding it
    /// back onto the value carries into bit 16 exactly when it is set, so
    /// the result is rounded rather than always truncated downwards.
    ///
    /// # Panics
    ///
    /// Panics if the system time is before the UNIX epoch.
    fn get_timestamp_now() -> u64 {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Unable to get local time!");
        let stamp = U32F32::from_num(since_epoch.as_secs_f64());
        let rounding = U32F32::from_bits(stamp.to_bits() & ONLY_LS_BIT_16);
        (stamp + rounding).to_bits() & MS_48_BITS_SET
    }

    /// Splits a raw clock value into `(timestamp bits, counter)`.
    ///
    /// The timestamp keeps its position in the upper 48 bits; the counter is
    /// the low 16 bits, which always fit in a `u16`.
    fn split(clock: u64) -> (u64, u16) {
        let time = clock & MS_48_BITS_SET;
        let counter: u16 = (clock & !MS_48_BITS_SET)
            .try_into()
            .unwrap();
        (time, counter)
    }
}
/// Checks that every value issued by an `AtomicClock` is strictly greater
/// than the one before it, whether the proposed time stays the same, moves
/// forwards, or jumps backwards.
#[test]
fn atomic_clock() {
    let clk = AtomicClock::null_clock();
    // Array elements are evaluated in order, so each call observes the state
    // left behind by the previous one.
    let tests = [
        (clk.next_with(0), "clock is the same"),
        (clk.next_with(1), "clock increased"),
        (clk.next_with(0), "clock went back"),
        (clk.next_with(0), "clock is the same"),
        (clk.next(), "clock increased a lot"),
        (clk.next_with(0), "clock went back a lot"),
        (clk.next_with(0), "clock is the same"),
        (clk.next(), "clock increased a lot again"),
        (clk.next_with(0), "clock went back a lot again")
    ];
    // Thread the previously issued value through the fold, starting from the
    // null clock, and require each new value to exceed it.
    let _last = tests
        .into_iter()
        .enumerate()
        .fold(Clock::null_clock(), |prev, (i, (val, msg))| {
            assert!(prev < val, "{prev} IS NOT < {val} @ {msg} (test {i})");
            val
        });
}