1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use core::{future::Future, task::Poll, time::Duration};
use std::time::Instant;

use crate::time::{AsyncLocalSleep, AsyncLocalSleepExt};

pin_project_lite::pin_project! {
  /// The [`AsyncSleep`] implementation for tokio runtime
  #[cfg_attr(docsrs, doc(cfg(all(feature = "std", feature = "tokio"))))]
  #[repr(transparent)]
  pub struct TokioSleep {
    #[pin]
    inner: ::tokio::time::Sleep,
  }
}

impl From<::tokio::time::Sleep> for TokioSleep {
  fn from(sleep: ::tokio::time::Sleep) -> Self {
    Self { inner: sleep }
  }
}

impl From<TokioSleep> for ::tokio::time::Sleep {
  fn from(sleep: TokioSleep) -> Self {
    sleep.inner
  }
}

impl Future for TokioSleep {
  type Output = Instant;

  fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
    let this = self.project();
    let ddl = this.inner.deadline().into();
    match this.inner.poll(cx) {
      Poll::Ready(_) => Poll::Ready(ddl),
      Poll::Pending => Poll::Pending,
    }
  }
}

impl AsyncLocalSleep for TokioSleep {
  fn reset(self: std::pin::Pin<&mut Self>, deadline: Instant) {
    self.project().inner.as_mut().reset(deadline.into())
  }
}

impl AsyncLocalSleepExt for TokioSleep {
  fn sleep_local(after: Duration) -> Self
  where
    Self: Sized,
  {
    Self {
      inner: tokio::time::sleep(after),
    }
  }

  fn sleep_local_until(deadline: Instant) -> Self
  where
    Self: Sized,
  {
    Self {
      inner: tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)),
    }
  }
}

#[cfg(test)]
mod tests {
  use super::TokioSleep;
  use crate::time::{AsyncSleep, AsyncSleepExt};
  use std::time::{Duration, Instant};

  const ORIGINAL: Duration = Duration::from_secs(1);
  const RESET: Duration = Duration::from_secs(2);
  const BOUND: Duration = Duration::from_millis(10);

  #[tokio::test]
  async fn test_object_safe() {
    let _a: Box<dyn AsyncSleep> = Box::new(TokioSleep::sleep(ORIGINAL));
  }

  #[tokio::test]
  async fn test_tokio_sleep() {
    let start = Instant::now();
    let sleep = TokioSleep::sleep(ORIGINAL);
    let ins = sleep.await;
    assert!(ins >= start + ORIGINAL);
    let elapsed = start.elapsed();
    assert!(elapsed >= ORIGINAL && elapsed < ORIGINAL + BOUND);
  }

  #[tokio::test]
  async fn test_tokio_sleep_until() {
    let start = Instant::now();
    let sleep = TokioSleep::sleep_until(start + ORIGINAL);
    let ins = sleep.await;
    assert!(ins >= start + ORIGINAL);
    let elapsed = start.elapsed();
    assert!(elapsed >= ORIGINAL && elapsed < ORIGINAL + BOUND);
  }

  #[tokio::test]
  async fn test_tokio_sleep_reset() {
    let start = Instant::now();
    let sleep = TokioSleep::sleep(ORIGINAL);
    tokio::pin!(sleep);
    sleep.as_mut().reset(Instant::now() + RESET);
    let ins = sleep.await;
    assert!(ins >= start + RESET);
    let elapsed = start.elapsed();
    assert!(elapsed >= RESET && elapsed < RESET + BOUND);
  }

  #[tokio::test]
  async fn test_tokio_sleep_reset2() {
    let start = Instant::now();
    let sleep = TokioSleep::sleep_until(start + ORIGINAL);
    tokio::pin!(sleep);
    sleep.as_mut().reset(Instant::now() + RESET);
    let ins = sleep.await;
    assert!(ins >= start + RESET);
    let elapsed = start.elapsed();
    assert!(elapsed >= RESET && elapsed < RESET + BOUND);
  }
}