horloge 0.2.3

Clock and timer traits
Documentation
use crate::{impl_now, impl_sleep};
use std::collections::BTreeMap;
use ::std::time::{Duration, Instant};
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Waker};

/// Tokio clock with std types using virtual time.
///
/// This struct is a handle to the internal state of the clock. The [`Clone`]
/// implementation is shallow: cloning this type will create a new handle to
/// the same underlying shared state.
#[derive(Debug, Clone)]
pub struct VirtualTokio1StdClock {
  /// Reference-counted pointer to the clock state shared by all clones of
  /// this handle.
  state: Arc<VirtualTokio1StdClockState>,
}

impl VirtualTokio1StdClock {
  pub fn new(start: Instant) -> Self {
    Self {
      state: Arc::new(VirtualTokio1StdClockState::new(start)),
    }
  }

  pub fn advance_by(&self, d: Duration) -> Instant {
    self.state.advance_by(d)
  }

  /// Updates the clock to the supplied time, if it is more recent
  ///
  /// Returns the time after the update.
  pub fn advance_to(&self, t: Instant) -> Instant {
    self.state.advance_to(t)
  }
}

/// Shared state of a virtual clock.
///
/// Virtual time is stored as a nanosecond offset from a fixed `start`
/// instant. The offset is only ever increased, so the clock never goes
/// backwards.
#[derive(Debug)]
pub struct VirtualTokio1StdClockState {
  /// Instant corresponding to an offset of zero.
  start: Instant,
  /// Nanoseconds elapsed on the virtual clock since `start`.
  offset: AtomicU64,
  /// Counter handing out ids so timers sharing a deadline get distinct keys.
  timer_id: AtomicUsize,
  /// Wakers of pending timers, ordered by `(deadline, id)`.
  timers: RwLock<BTreeMap<(Instant, usize), Waker>>,
}

impl VirtualTokio1StdClockState {
  /// Creates a clock whose current time is `start` and which has no timers.
  pub fn new(start: Instant) -> Self {
    Self {
      start,
      offset: AtomicU64::new(0),
      timer_id: AtomicUsize::new(0),
      timers: RwLock::new(BTreeMap::new()),
    }
  }

  /// Moves the clock forward by `d`, relative to its current time.
  ///
  /// The internal nanosecond offset saturates at `u64::MAX` instead of
  /// wrapping. Timers whose deadline has been reached are woken.
  ///
  /// Returns the time after the update.
  pub fn advance_by(&self, d: Duration) -> Instant {
    let delta: u64 = Self::saturating_nanos(d);
    // The closure always returns `Some`, so `fetch_update` always succeeds;
    // both arms are matched to avoid a panic path.
    let prev: u64 = match self.offset.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |offset| {
      Some(offset.saturating_add(delta))
    }) {
      Ok(prev) | Err(prev) => prev,
    };
    self.publish(prev.saturating_add(delta))
  }

  /// Updates the clock to the supplied time, if it is more recent
  ///
  /// A `t` that is not later than the current virtual time leaves the clock
  /// untouched. Timers whose deadline has been reached are woken.
  ///
  /// Returns the time after the update.
  pub fn advance_to(&self, t: Instant) -> Instant {
    let wanted_offset: u64 = Self::saturating_nanos(t.saturating_duration_since(self.start));
    let prev: u64 = self.offset.fetch_max(wanted_offset, Ordering::SeqCst);
    self.publish(prev.max(wanted_offset))
  }

  /// Converts a duration to whole nanoseconds, clamping to `u64::MAX`.
  fn saturating_nanos(d: Duration) -> u64 {
    u64::try_from(d.as_nanos()).unwrap_or(u64::MAX)
  }

  /// Wakes the timers that are due at `offset` and returns the matching instant.
  fn publish(&self, offset: u64) -> Instant {
    let now = self.start + Duration::from_nanos(offset);
    self.signal_ready(now);
    now
  }

  /// Mark all timers with a deadline equal to or earlier than `now` as ready.
  fn signal_ready(&self, now: Instant) {
    let ready: Vec<Waker> = {
      let timers = self.timers.read().expect("failed to lock virtual clock timers");
      // Keys are ordered by deadline first, so the due timers form a prefix.
      timers
        .iter()
        .take_while(|(key, _)| key.0 <= now)
        .map(|(_, waker)| waker.clone())
        .collect()
    };
    // The read guard is released above, before any waker runs.
    for waker in ready {
      waker.wake()
    }
  }

  /// Returns the current virtual time: `start` plus the stored offset.
  fn now(&self) -> Instant {
    let offset: u64 = self.offset.load(Ordering::SeqCst);
    self.start + Duration::from_nanos(offset)
  }

  /// Create a new [`VirtualTokio1StdTimer`] key for the provided deadline.
  ///
  /// The deadline is the current virtual time plus `duration`; the id comes
  /// from a monotonically increasing counter. Nothing is registered in
  /// `timers` here: registration happens in `clear_expired`.
  fn sleep(&self, duration: Duration) -> (Instant, usize) {
    let deadline: Instant = self.now() + duration;
    let id = self.timer_id.fetch_add(1, Ordering::SeqCst);
    (deadline, id)
  }

  /// Removes the waker registered under `key`.
  ///
  /// Returns `true` if an entry was present.
  fn clear(&self, key: (Instant, usize)) -> bool {
    let mut timers = self.timers.write().expect("failed to lock virtual clock timers");
    timers.remove(&key).is_some()
  }

  /// Checks whether the timer identified by `key` has expired.
  ///
  /// If the deadline is still in the future, `waker` is registered (replacing
  /// a previously stored waker that would not wake the same task) and `false`
  /// is returned. Otherwise any registration for `key` is removed and `true`
  /// is returned.
  fn clear_expired(&self, key: (Instant, usize), waker: &Waker) -> bool {
    // Take the lock *before* reading the clock. An advance that happens after
    // this read can only scan the timers once the guard is dropped, at which
    // point the waker is registered, so no wakeup is lost.
    let mut timers = self.timers.write().expect("failed to lock virtual clock timers");
    if self.now() < key.0 {
      match timers.get_mut(&key) {
        Some(registered) => {
          // Only the waker from the most recent poll must be notified.
          if !registered.will_wake(waker) {
            *registered = waker.clone();
          }
        }
        None => {
          timers.insert(key, waker.clone());
        }
      }
      false
    } else {
      // Remove in place: calling `clear` here would re-lock `timers`.
      timers.remove(&key);
      true
    }
  }
}

// Implements `Now` for `VirtualTokio1StdClock` through the crate's
// `impl_now!` macro. The reported instant is whatever the shared state's
// `now()` returns, so every clone of a handle observes the same time.
impl_now! {
  impl Now for VirtualTokio1StdClock {
    type Instant = Instant;

    fn now(&this)-> Self::Instant {
      this.state.now()
    }
  }
}

/// Timer future produced by sleeping on a [`VirtualTokio1StdClock`].
///
/// It resolves once the clock state reports that the deadline stored in
/// `key` has been reached.
pub struct VirtualTokio1StdTimer {
  /// `(deadline, id)` pair identifying this timer in the clock state.
  key: (Instant, usize),
  /// Clock state this timer is checked against.
  state: Arc<VirtualTokio1StdClockState>,
}

impl Drop for VirtualTokio1StdTimer {
  /// Asks the clock state to forget this timer's key when the timer goes away.
  fn drop(&mut self) {
    let key = self.key;
    // Whether an entry was actually present does not matter here.
    let _was_registered: bool = self.state.clear(key);
  }
}

impl Future for VirtualTokio1StdTimer {
  type Output = ();

  /// Completes once the clock state reports this timer's key as expired.
  ///
  /// The waker from `cx` is passed to the state's `clear_expired` together
  /// with the key; its boolean result decides between `Ready` and `Pending`.
  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let expired: bool = self.state.clear_expired(self.key, cx.waker());
    match expired {
      true => Poll::Ready(()),
      false => Poll::Pending,
    }
  }
}

// Implements `Sleep<Duration>` for `VirtualTokio1StdClock` through the
// crate's `impl_sleep!` macro. Each call clones the `Arc` to the shared state,
// asks the state for a fresh `(deadline, id)` key and wraps both in a
// `VirtualTokio1StdTimer`.
impl_sleep! {
  impl Sleep<Duration> for VirtualTokio1StdClock {
    type Timer = VirtualTokio1StdTimer;

    fn sleep(&this, duration: Duration) -> Self::Timer {
      let state = Arc::clone(&this.state);
      let key = state.sleep(duration);
      VirtualTokio1StdTimer { key, state }
    }
  }
}

#[cfg(test)]
mod tests {
  use super::*;
  use crate::{StdNow, Now};
  use std::sync::LazyLock;

  /// Start instant shared by the tests.
  ///
  /// This is a `static`, not a `const`: a `const` is re-evaluated at every
  /// use site, which would build a fresh `LazyLock` each time.
  static START: LazyLock<Instant> = LazyLock::new(Instant::now);
  const ONE_YEAR: Duration = Duration::new(365 * 24 * 60 * 60, 0);

  /// Returns the real instant one year before now.
  ///
  /// `Instant - Duration` may panic when the result cannot be represented, so
  /// the checked form is used and `None` is returned in that case.
  fn one_year_ago() -> Option<Instant> {
    Instant::now().checked_sub(ONE_YEAR)
  }

  /// The virtual clock is usable through both `Now` and `StdNow`.
  #[test]
  fn test_now() {
    let start = *START;
    let clock = VirtualTokio1StdClock::new(start);
    use_now(&clock);
    use_std_now(&clock);
  }

  /// Reads the time through the `Now` trait and sanity-checks it.
  fn use_now<TyNow>(clock: &TyNow)
  where
    TyNow: Now<Instant = Instant>,
  {
    let now: Instant = clock.now();
    // If "one year ago" is not representable, `now` is necessarily later.
    if let Some(one_year_ago) = one_year_ago() {
      assert!(now > one_year_ago);
    }
    use_std_now(clock);
  }

  /// Reads the time through the `StdNow` trait and sanity-checks it.
  fn use_std_now<TyNow>(clock: &TyNow)
  where
    TyNow: StdNow,
  {
    let now: Instant = clock.now_std();
    // If "one year ago" is not representable, `now` is necessarily later.
    if let Some(one_year_ago) = one_year_ago() {
      assert!(now > one_year_ago);
    }
  }
}