tor-rtmock 0.19.0

Testing mock support for tor-rtcompat
//! Simple provider of simulated time
//!
//! See [`SimpleMockTimeProvider`]

use std::cmp::Reverse;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant, SystemTime};

use derive_more::AsMut;
use priority_queue::priority_queue::PriorityQueue;
use slotmap::DenseSlotMap;

use tor_rtcompat::CoarseInstant;
use tor_rtcompat::CoarseTimeProvider;
use tor_rtcompat::SleepProvider;

use crate::time_core::MockTimeCore;

/// Simple provider of simulated time
///
/// Maintains a mocked view of the current [`Instant`] and [`SystemTime`].
///
/// The simulated time advances only when explicitly instructed,
/// by calling [`.advance()`](Provider::advance).
///
/// The wallclock time can be warped with
/// [`.jump_wallclock()`](Provider::jump_wallclock),
/// allowing simulation of wall clock non-monotonicity.
///
/// # Panics and aborts
///
/// Panics on time under/overflow.
///
/// May cause an abort if the [`SimpleMockTimeProvider`] implementation contains bugs.
#[derive(Clone, Debug)]
pub struct SimpleMockTimeProvider {
    /// The actual state
    ///
    /// Held behind an `Arc`, so every clone of this provider
    /// refers to (and locks) the same `State`.
    state: Arc<Mutex<State>>,

/// Convenience abbreviation
pub(crate) use SimpleMockTimeProvider as Provider;

/// Identifier of a [`SleepFuture`]
type Id = slotmap::DefaultKey;

/// Future for `sleep`
///
/// Iff this struct exists, there is an entry for `id` in `prov.futures`.
/// (It might contain `None`.)
pub struct SleepFuture {
    /// Reference to our state
    prov: Provider,

    /// Which `SleepFuture` are we
    ///
    /// This is the key used for this sleep in both the `futures` slotmap
    /// and the `unready` priority queue of the provider's `State`.
    id: Id,

/// Mutable state for a [`Provider`]
///
/// Each sleep ([`Id`], [`SleepFuture`]) is in one of the following states:
///
/// | state       | [`SleepFuture`]  | `futures`        | `unready`          |
/// |-------------|------------------|------------------|--------------------|
/// | UNPOLLED    | exists           | present, `None`  | present, `> now`   |
/// | WAITING     | exists           | present, `Some`  | present, `> now`   |
/// | READY       | exists           | present, `None`  | absent             |
/// | DROPPED     | dropped          | absent           | absent             |
///
/// Here `now` means the current mock monotonic time, `core.instant()`.
#[derive(Debug, AsMut)]
struct State {
    /// Current time (coarse)
    ///
    /// Constructed from the initial `Instant` and `SystemTime`;
    /// `core.instant()` is the `now` that the `unready` invariant refers to.
    core: MockTimeCore,

    /// Futures; record of every existing [`SleepFuture`], including any `Waker`
    ///
    /// Entry exists iff `SleepFuture` exists.
    ///
    /// Contains `None` if we haven't polled the future;
    /// `Some` if we have.
    /// (The waker is taken out again, leaving `None`, when the sleep becomes READY.)
    ///
    /// We could use a `Vec` or `TiVec`
    /// but using a slotmap is more robust against bugs here.
    futures: DenseSlotMap<Id, Option<Waker>>,

    /// Priority queue
    ///
    /// Subset of `futures`.
    ///
    /// An entry is present iff the `Instant` is *strictly* after the current
    /// mock time (`core.instant()`),
    /// in which case that's when the future should be woken.
    ///
    /// `PriorityQueue` is a max-heap but we want earliest times, hence `Reverse`
    unready: PriorityQueue<Id, Reverse<Instant>>,

/// `Default` makes a `Provider` which starts at whatever the current real time is
impl Default for Provider {
    fn default() -> Self {

impl Provider {
    /// Return a new mock time provider starting at a specified point in time
    ///
    /// `now` is the initial monotonic time and `wallclock` the initial wallclock time.
    /// The new provider starts with no sleeps registered.
    pub fn new(now: Instant, wallclock: SystemTime) -> Self {
        let state = State {
            core: MockTimeCore::new(now, wallclock),
            // Both collections start empty: no `SleepFuture` exists yet.
            futures: Default::default(),
            unready: Default::default(),
        Provider {
            state: Arc::new(Mutex::new(state)),

    /// Return a new mock time provider starting at the current actual (non-mock) time
    ///
    /// Like any [`SimpleMockTimeProvider`], the time is frozen and only changes
    /// due to calls to `advance`.
    pub fn from_real() -> Self {
    /// Return a new mock time provider starting at a specified wallclock time
    ///
    /// The monotonic time ([`Instant`]) starts at the current actual (non-mock) time.
    /// (Absolute values of the real monotonic time are not readily
    /// observable or distinguishable from Rust,
    /// nor can a fixed `Instant` be constructed,
    /// so this is usually sufficient for a reproducible test.)
    pub fn from_wallclock(wallclock: SystemTime) -> Self {
        // Only the monotonic start point comes from the real clock.
        Provider::new(Instant::now(), wallclock)

    /// Advance the simulated time by `d`
    ///
    /// This advances both the `Instant` (monotonic time)
    /// and `SystemTime` (wallclock time)
    /// by the same amount.
    ///
    /// Will wake sleeping [`SleepFuture`]s, as appropriate.
    ///
    /// Note that the tasks which were waiting on those now-expired `SleepFuture`s
    /// will only actually execute when they are next polled.
    /// `advance` does not yield to the executor or poll any futures.
    /// The executor will (presumably) poll those woken tasks, when it regains control.
    /// But the order in which the tasks run will depend on its scheduling policy,
    /// and might be different to the order implied by the futures' timeout values.
    ///
    /// To simulate normal time advancement, wakeups, and task activations,
    /// use [`MockExecutor::advance_*()`](crate::MockRuntime).
    pub fn advance(&self, d: Duration) {
        let mut state = self.lock();

    /// Warp the wallclock time
    ///
    /// This has no effect on any sleeping futures.
    /// It only affects the return value from [`.wallclock()`](Provider::wallclock).
    pub fn jump_wallclock(&self, new_wallclock: SystemTime) {
        // Really we ought to wake people up, here.
        // But absolutely every Rust API is wrong: none offer a way to sleep until a SystemTime.
        // (There might be some less-portable non-Rust APIs for that.)

    /// When will the next timeout occur?
    ///
    /// Returns the duration until the next [`SleepFuture`] should wake up,
    /// or `None` if no sleep is currently pending.
    ///
    /// Advancing time by at least this amount will wake up that future,
    /// and any others with the same wakeup time.
    ///
    /// Will never return `Some(ZERO)`:
    /// any future that is supposed to wake up now (or earlier) has indeed already been woken,
    /// so it is no longer sleeping and isn't included in the calculation.
    pub fn time_until_next_timeout(&self) -> Option<Duration> {
        let state = self.lock();
        // The head of `unready` is the earliest wakeup (priorities are `Reverse`d);
        // `?` returns `None` when the queue is empty.
        let Reverse(until) = state.unready.peek()?.1;
        // The invariant (see `State`) guarantees that entries in `unready` are always `> now`,
        // so we don't care whether duration_since would panic or saturate.
        let d = until.duration_since(state.core.instant());

    /// Convenience function to lock the state
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn lock(&self) -> MutexGuard<'_, State> {
        self.state.lock().expect("simple time state poisoned")

impl SleepProvider for Provider {
    type SleepFuture = SleepFuture;

    fn sleep(&self, d: Duration) -> SleepFuture {
        let mut state = self.lock();
        // Wakeup deadline, on the mock monotonic clock.
        let until = state.core.instant() + d;

        // Register the sleep: no waker yet (not polled), queued by its deadline.
        let id = state.futures.insert(None);
        state.unready.push(id, Reverse(until));

        let fut = SleepFuture {
            prov: self.clone(),

        // This sleep is now UNPOLLED, except that its time might be `<= now`:

        // Possibly, `until` isn't *strictly* after `now`, since d might be 0.
        // If so, .wake_any() will restore the invariant by immediately waking.

        // This sleep is now UNPOLLED or READY, according to whether duration was 0.


    fn now(&self) -> Instant {
    fn wallclock(&self) -> SystemTime {

impl CoarseTimeProvider for Provider {
    fn now_coarse(&self) -> CoarseInstant {

impl Future for SleepFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.prov.lock();
        // Whether this sleep still has an entry in `unready` tells us which state it is in.
        if let Some((_, Reverse(scheduled))) = state.unready.get(& {
            // Presence of this entry implies scheduled > now: we are UNPOLLED or WAITING
            // (The assertion checks that the `unready` invariant really holds.)
            assert!(*scheduled > state.core.instant());
            let waker = Some(cx.waker().clone());
            // Make this be WAITING.  (If we're re-polled, we simply drop any previous waker.)
                .expect("polling futures entry") = waker;
        } else {
            // Absence implies scheduled (no longer stored) <= now: we are READY

impl State {
    /// Restore the invariant for `unready` after `now` has been increased
    ///
    /// Ie, ensures that any sleeps which are
    /// WAITING/UNPOLLED except that they are `<= now`,
    /// are moved to state READY.
    fn wake_any(&mut self) {
        loop {
            match self.unready.peek() {
                // Keep picking off entries with scheduled <= now
                Some((_, Reverse(scheduled))) if *scheduled <= self.core.instant() => {
                    let (id, _) = self.unready.pop().expect("vanished");
                    // We can .take() the waker since this can only ever run once
                    // per sleep future (since it happens when we pop it from unready).
                    let futures_entry = self.futures.get_mut(id).expect("stale unready entry");
                    // `None` here means the sleep was never polled, so there is no waker to take.
                    if let Some(waker) = futures_entry.take() {
                // Queue is empty, or its earliest entry is still `> now`: invariant holds.
                _ => break,

/// Dropping a [`SleepFuture`] removes its bookkeeping from the provider's state:
/// its `futures` entry and any `unready` entry.
impl Drop for SleepFuture {
    fn drop(&mut self) {
        let mut state = self.prov.lock();
        // The type annotations document what each removal yields; both results are discarded.
        let _: Option<Waker> = state.futures.remove("entry vanished");
        let _: Option<(Id, Reverse<Instant>)> = state.unready.remove(&;
        // Now it is DROPPED.

mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::task::MockExecutor;
    use futures::poll;
    use humantime::parse_rfc3339;
    use tor_rtcompat::BlockOn as _;
    use Poll::*;

    fn ms(ms: u64) -> Duration {

    /// Test harness: builds a fresh `Provider` and `MockExecutor`,
    /// then runs the future returned by `f` to completion on that executor.
    ///
    /// `f` receives the provider and a clone of the executor.
    fn run_test<FUT>(f: impl FnOnce(Provider, MockExecutor) -> FUT)
        FUT: Future<Output = ()>,
        let sp = Provider::new(
            Instant::now(), // it would have been nice to make this fixed for the test
        let exec = MockExecutor::new();
        exec.block_on(f(sp, exec.clone()));

    /// Polls sleep futures by hand and checks the provider's monotonic and
    /// wallclock readings, including after a wallclock jump and for a zero-length sleep.
    fn simple() {
        run_test(|sp, _exec| async move {
            let n1 =;
            let w1 = sp.wallclock();
            // Two sleeps with different deadlines.
            let mut f1 = sp.sleep(ms(500));
            let mut f2 = sp.sleep(ms(1500));
            assert_eq!(poll!(&mut f1), Pending);
            assert_eq!(n1 + ms(200),;
            assert_eq!(w1 + ms(200), sp.wallclock());
            assert_eq!(poll!(&mut f1), Pending);
            assert_eq!(poll!(&mut f2), Pending);
            // Warp only the wallclock.
            sp.jump_wallclock(w1 + ms(10_000));
            assert_eq!(n1 + ms(500),;
            assert_eq!(w1 + ms(10_300), sp.wallclock());
            assert_eq!(poll!(&mut f1), Ready(()));
            // A zero-length sleep is ready on its first poll.
            let mut f0 = sp.sleep(ms(0));
            assert_eq!(poll!(&mut f0), Ready(()));

    /// Spawns a task that records its progress in a shared counter (`st`),
    /// and checks that progress against `time_until_next_timeout`.
    fn task() {
        run_test(|sp, exec| async move {
            // Progress marker, written by the spawned task and read by the test body.
            let st = Arc::new(Mutex::new(0_i8));

            exec.spawn_identified("test task", {
                let st = st.clone();
                let sp = sp.clone();
                async move {
                    *st.lock().unwrap() = 1;
                    *st.lock().unwrap() = 2;
                    *st.lock().unwrap() = 3;

            // Shadow `st` with a closure that reads the current marker value.
            let st = move || *st.lock().unwrap();

            assert_eq!(st(), 0);
            assert_eq!(st(), 1);
            assert_eq!(sp.time_until_next_timeout(), Some(ms(500)));


            assert_eq!(st(), 1);
            assert_eq!(sp.time_until_next_timeout(), None);
            assert_eq!(st(), 2);
            assert_eq!(sp.time_until_next_timeout(), Some(ms(300)));

            assert_eq!(st(), 2);
            assert_eq!(sp.time_until_next_timeout(), None);
            assert_eq!(sp.time_until_next_timeout(), None);
            assert_eq!(st(), 3);