1#![warn(clippy::pedantic)]
15#![allow(clippy::must_use_candidate)]
16#![allow(clippy::missing_panics_doc)]
17
18#[doc(no_inline)]
19pub use std::time::{Duration, Instant};
20
21use std::future::{Future, IntoFuture};
22use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use pin_project_lite::pin_project;
26
27mod utils;
28
// Select the platform-specific `Timer` implementation at compile time.
// Exactly one backend module is compiled in and its `Timer` type is imported
// under the same name, so the rest of this file is backend-agnostic.
cfg_if::cfg_if! {
    if #[cfg(any(target_os = "linux", target_os = "android"))] {
        // Linux and Android: backend from the `timerfd` module.
        mod timerfd;
        use timerfd::Timer;
    } else if #[cfg(any(target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", target_os = "dragonfly", target_vendor = "apple"))] {
        // BSD family and Apple targets: backend from the `kqueue` module.
        mod kqueue;
        use kqueue::Timer;
    } else if #[cfg(windows)] {
        // Windows: backend from the `waitable` module.
        mod waitable;
        use waitable::Timer;
    } else {
        // Any other target is rejected at build time.
        compile_error!("unsupported platform");
    }
}
43
44#[cfg(not(feature = "rt"))]
45fn poll_timer(timer: &mut Timer, cx: &mut Context) -> Poll<u64> {
46 timer.poll_expired(cx)
47}
48
/// Polls `timer` for expirations.
///
/// This is the variant compiled when the `rt` feature is enabled: the call to
/// `Timer::poll_expired` is wrapped in `tokio::task::unconstrained` before
/// being polled once with the caller's context.
#[cfg(feature = "rt")]
fn poll_timer(timer: &mut Timer, cx: &mut Context<'_>) -> Poll<u64> {
    // Adapt the poll-style method into a future so it can be wrapped.
    let expired = std::future::poll_fn(|inner_cx| timer.poll_expired(inner_cx));
    // The wrapper only lives for this call, so stack pinning is sufficient.
    let unconstrained = std::pin::pin!(tokio::task::unconstrained(expired));
    unconstrained.poll(cx)
}
56
/// Future that completes once its underlying timer reports an expiration.
///
/// Created by [`sleep`] and [`sleep_until`].
#[must_use]
pub struct Sleep {
    /// Platform timer backing this sleep.
    timer: Timer,
    /// Deadline reported by [`Sleep::deadline`].
    deadline: Instant,
    /// Latched to `true` once the timer has been observed as expired; while
    /// set, polling returns `Ready` without touching the timer.
    elapsed: bool,
}
63
64pub fn sleep_until(deadline: Instant) -> Sleep {
65 Sleep {
66 timer: Timer::new(Some(deadline), None),
67 deadline,
68 elapsed: false,
69 }
70}
71
72pub fn sleep(duration: Duration) -> Sleep {
73 sleep_until(Instant::now() + duration)
74}
75
76impl Future for Sleep {
77 type Output = ();
78
79 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80 if self.elapsed {
81 return Poll::Ready(());
82 }
83
84 self.elapsed = matches!(poll_timer(&mut self.timer, cx), Poll::Ready(_));
85
86 if self.elapsed {
87 Poll::Ready(())
88 } else {
89 Poll::Pending
90 }
91 }
92}
93
94impl Sleep {
95 pub fn deadline(&self) -> Instant {
96 self.deadline
97 }
98
99 pub fn is_elapsed(&self) -> bool {
100 self.elapsed
101 }
102
103 pub fn reset(&mut self, deadline: Instant) {
104 self.timer.reset(Some(deadline), None);
105 }
106}
107
/// Error types produced by the timing utilities in this crate.
pub mod error {
    use std::error::Error;
    use std::fmt;

    /// Error yielded by a timeout whose deadline passed before the wrapped
    /// future completed.
    ///
    /// The inner unit is crate-private, so the value can only be created
    /// inside this crate.
    #[derive(Debug, PartialEq, Eq)]
    pub struct Elapsed(pub(crate) ());

    impl fmt::Display for Elapsed {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // `pad` honours width, fill and precision exactly like the
            // `Display` implementation of `str`.
            f.pad("deadline has elapsed")
        }
    }

    impl Error for Elapsed {}
}
122
pin_project! {
    /// Future returned by [`timeout`] and [`timeout_at`].
    ///
    /// Pairs a wrapped future with a [`Sleep`] acting as its deadline.
    #[must_use]
    pub struct Timeout<F> {
        // The wrapped future; marked `#[pin]` so it is projected as a pinned
        // reference and can be polled in place.
        #[pin]
        future: F,
        // Deadline timer; projected as a plain `&mut Sleep`.
        sleep: Sleep,
    }
}
131
132pub fn timeout_at<F: IntoFuture>(deadline: Instant, future: F) -> Timeout<F::IntoFuture> {
133 Timeout {
134 future: future.into_future(),
135 sleep: sleep_until(deadline),
136 }
137}
138
139pub fn timeout<F: IntoFuture>(duration: Duration, future: F) -> Timeout<F::IntoFuture> {
140 timeout_at(Instant::now() + duration, future)
141}
142
143impl<F: Future> Future for Timeout<F> {
144 type Output = Result<F::Output, error::Elapsed>;
145
146 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
147 let mut this = self.project();
148 #[expect(clippy::same_functions_in_if_condition)]
149 if let Poll::Ready(()) = Pin::new(&mut this.sleep).poll(cx) {
150 Poll::Ready(Err(error::Elapsed(())))
151 } else if let Poll::Ready(output) = this.future.poll(cx) {
152 Poll::Ready(Ok(output))
153 } else if let Poll::Ready(()) = Pin::new(&mut this.sleep).poll(cx) {
154 Poll::Ready(Err(error::Elapsed(())))
155 } else {
156 Poll::Pending
157 }
158 }
159}
160
161impl<F> Timeout<F> {
162 pub fn get_ref(&self) -> &F {
163 &self.future
164 }
165
166 pub fn get_mut(&mut self) -> &mut F {
167 &mut self.future
168 }
169
170 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut F> {
171 self.project().future
172 }
173
174 pub fn into_inner(self) -> F {
175 self.future
176 }
177}
178
/// Strategy an [`Interval`] uses when more than one timer expiration has
/// accumulated before the next tick is requested.
///
/// The default is [`MissedTickBehavior::Burst`], the behavior both
/// [`interval`] and [`interval_at`] start with.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum MissedTickBehavior {
    /// Hand out one tick per accumulated expiration, so a backlog is delivered
    /// as back-to-back ticks.
    #[default]
    Burst,
    /// Not implemented yet: [`Interval::set_missed_tick_behavior`] panics when
    /// asked to select it.
    Delay,
    /// Collapse the whole backlog into a single tick.
    Skip,
}
185
/// Periodic timer created by [`interval`] and [`interval_at`].
#[must_use]
pub struct Interval {
    /// Platform timer that produces the expirations.
    timer: Timer,
    /// Period the timer is armed with; returned by [`Interval::period`].
    period: Duration,
    /// Expirations counted so far that have not yet been handed out as ticks.
    expirations: u64,
    /// How a backlog of expirations is converted into ticks.
    behavior: MissedTickBehavior,
}
193
194pub fn interval_at(start: Instant, period: Duration) -> Interval {
195 Interval {
196 timer: Timer::new(Some(start), Some(period)),
197 period,
198 expirations: 0,
199 behavior: MissedTickBehavior::Burst,
200 }
201}
202
203pub fn interval(period: Duration) -> Interval {
204 Interval {
205 timer: Timer::new(None, Some(period)),
206 period,
207 expirations: 1,
208 behavior: MissedTickBehavior::Burst,
209 }
210}
211
212struct Tick<'a>(&'a mut Interval);
213
214impl<'a> Future for Tick<'a> {
215 type Output = Instant;
216
217 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
218 self.0.poll_tick(cx)
219 }
220}
221
222impl Interval {
223 pub fn tick(&mut self) -> impl Future<Output = Instant> + '_ {
224 Tick(self)
225 }
226
227 pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
228 if let Poll::Ready(exp) = poll_timer(&mut self.timer, cx) {
229 self.expirations += exp;
230 };
231 if self.expirations != 0 {
232 self.expirations = match self.behavior {
233 MissedTickBehavior::Burst => self.expirations - 1,
234 MissedTickBehavior::Skip => 0,
235 MissedTickBehavior::Delay => unreachable!(),
236 };
237 Poll::Ready(Instant::now())
238 } else {
239 Poll::Pending
240 }
241 }
242
243 pub fn reset(&mut self) {
244 self.timer.reset(None, Some(self.period));
245 }
246
247 pub fn reset_immediately(&mut self) {
248 self.expirations = 1;
249 self.reset();
250 }
251
252 pub fn reset_after(&mut self, after: Duration) {
253 self.reset_at(Instant::now() + after);
254 }
255
256 pub fn reset_at(&mut self, deadline: Instant) {
257 let now = Instant::now();
258 if now < deadline {
259 self.expirations = 0;
260 self.timer.reset(Some(deadline), Some(self.period));
261 } else {
262 let past = u64::try_from((now - deadline).as_nanos()).unwrap();
263 let divider = u64::try_from(self.period.as_nanos()).unwrap();
264 self.expirations = past / divider + 1;
265 #[expect(clippy::unchecked_duration_subtraction)]
266 self.timer.reset(
267 Some(now - Duration::from_nanos(past % divider) + self.period),
268 Some(self.period),
269 );
270 }
271 }
272
273 pub fn missed_tick_behavior(&self) -> MissedTickBehavior {
274 self.behavior
275 }
276
277 pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) {
278 if behavior == MissedTickBehavior::Delay {
279 unimplemented!("MissedTickBehavior::Delay is not implemented yet");
280 }
281 self.behavior = behavior;
282 }
283
284 pub fn period(&self) -> Duration {
285 self.period
286 }
287}
288
/// Timing tests for `sleep`, `timeout` and `interval`.
///
/// Every test measures wall-clock time and accepts a deviation of up to
/// `TOLERANCE` from the expected value.
#[cfg(test)]
mod tests {
    use crate::*;
    use std::sync::LazyLock;

    // Allowed deviation between measured and expected elapsed time; tighter
    // when the `test-hires` feature is enabled.
    cfg_if::cfg_if! {
        if #[cfg(feature = "test-hires")] {
            const TOLERANCE: Duration = Duration::from_millis(1);
        } else {
            const TOLERANCE: Duration = Duration::from_millis(5);
        }
    }

    /// Builds a current-thread tokio runtime (with the I/O driver enabled on
    /// unix targets).
    fn new_current_thread_runtime() -> tokio::runtime::Runtime {
        let mut builder = tokio::runtime::Builder::new_current_thread();
        // NOTE(review): presumably the unix timer backends need the runtime's
        // I/O driver; confirm against the backend modules.
        #[cfg(unix)]
        builder.enable_io();
        builder.build().unwrap()
    }

    /// Builds a multi-thread tokio runtime with four workers (with the I/O
    /// driver enabled on unix targets).
    fn new_multi_thread_runtime() -> tokio::runtime::Runtime {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.worker_threads(4);
        #[cfg(unix)]
        builder.enable_io();
        builder.build().unwrap()
    }

    /// Current-thread runtime shared by all tests, created on first use.
    static CURRENT_THREAD_RUNTIME: LazyLock<tokio::runtime::Runtime> =
        LazyLock::new(new_current_thread_runtime);

    /// Multi-thread runtime shared by all tests, created on first use.
    static MULTI_THREAD_RUNTIME: LazyLock<tokio::runtime::Runtime> =
        LazyLock::new(new_multi_thread_runtime);

    // Expands each `async fn name() { body }` into a `#[test] fn name()` that
    // spawns the body on three runtimes (the shared current-thread runtime,
    // the shared multi-thread runtime and a fresh current-thread runtime) and
    // then blocks on each join handle in turn, unwrapping the result.
    macro_rules! mytest {
        ($(async fn $name:ident() $body:block)*) => {
            $(
                #[test]
                fn $name() {
                    let current_thread_runtime = &*CURRENT_THREAD_RUNTIME;
                    let multi_thread_runtime = &*MULTI_THREAD_RUNTIME;
                    let local_runtime = new_current_thread_runtime();
                    let f1 = current_thread_runtime.spawn(async $body);
                    let f2 = multi_thread_runtime.spawn(async $body);
                    let f3 = local_runtime.spawn(async $body);
                    current_thread_runtime.block_on(f1).unwrap();
                    multi_thread_runtime.block_on(f2).unwrap();
                    local_runtime.block_on(f3).unwrap();
                }
            )*
        }
    }

    mytest! {
        // `sleep` completes about 100ms after it is created.
        async fn test_sleep() {
            let start = Instant::now();
            let duration = Duration::from_millis(100);
            sleep(duration).await;
            let elapsed = start.elapsed();
            assert!(elapsed.abs_diff(duration) < TOLERANCE);
        }

        // `sleep_until` completes at the requested instant.
        async fn test_sleep_until() {
            let start = Instant::now();
            let duration = Duration::from_millis(100);
            sleep_until(start + duration).await;
            let elapsed = start.elapsed();
            assert!(elapsed.abs_diff(duration) < TOLERANCE);
        }

        // A 10ms inner sleep beats a 100ms timeout; a 1s inner sleep is cut
        // off after 100ms.
        async fn test_timeout() {
            let start = Instant::now();
            let duration = Duration::from_millis(100);
            let large_duration = Duration::from_secs(1);
            let small_duration = Duration::from_millis(10);
            assert!(timeout(duration, sleep(small_duration)).await.is_ok());
            let elapsed = start.elapsed();
            assert!(elapsed.abs_diff(small_duration) < TOLERANCE);

            let start = Instant::now();
            assert!(timeout(duration, sleep(large_duration)).await.is_err());
            let elapsed = start.elapsed();
            assert!(elapsed.abs_diff(duration) < TOLERANCE);
        }

        // Same scenarios as `test_timeout`, with an absolute deadline.
        async fn test_timeout_at() {
            let start = Instant::now();
            let duration = Duration::from_millis(100);
            let large_duration = Duration::from_secs(1);
            let small_duration = Duration::from_millis(10);
            assert!(timeout_at(start + duration, sleep(small_duration))
                .await
                .is_ok());
            let elapsed = start.elapsed();
            assert!(elapsed.abs_diff(small_duration) < TOLERANCE);

            let start = Instant::now();
            assert!(timeout_at(start + duration, sleep(large_duration))
                .await
                .is_err());
            let elapsed = start.elapsed();
            assert!(elapsed.abs_diff(duration) < TOLERANCE);
        }

        // `interval` ticks immediately, then once per period: tick `i` is
        // expected at `i * 100ms`.
        async fn test_interval() {
            let start = Instant::now();
            let duration = Duration::from_millis(100);
            let mut iv = interval(duration);

            for i in 0..10 {
                let _ = iv.tick().await;
                let elapsed = start.elapsed();
                assert!(elapsed.abs_diff(duration * i) < TOLERANCE);
            }
        }

        // `interval_at` has no immediate tick: tick `i` (from 1) is expected
        // at `i * 100ms`.
        async fn test_interval_at() {
            let start = Instant::now();
            let duration = Duration::from_millis(100);
            let mut iv = interval_at(start + duration, duration);

            for i in 1..=10 {
                let _ = iv.tick().await;
                let elapsed = start.elapsed();
                assert!(elapsed.abs_diff(duration * i) < TOLERANCE);
            }
        }

        // Default (burst) behavior: after sleeping three periods, the missed
        // ticks are delivered back-to-back, so the fifth tick is expected at
        // 400ms.
        async fn test_interval_burst() {
            let start = Instant::now();
            let duration = Duration::from_millis(100);
            let mut iv = interval(duration);

            sleep(duration * 3).await;
            let _ = iv.tick().await;
            let _ = iv.tick().await;
            let _ = iv.tick().await;
            let _ = iv.tick().await;
            let _ = iv.tick().await;
            let elapsed = start.elapsed();
            assert!(elapsed.abs_diff(duration * 4) < TOLERANCE);
        }

        // Skip behavior: after sleeping 350ms the backlog collapses into one
        // tick, so the second tick is expected at the next period boundary
        // (400ms; 500ms is also accepted).
        async fn test_interval_skip() {
            let start = Instant::now();
            let duration = Duration::from_millis(100);
            let mut iv = interval(duration);
            iv.set_missed_tick_behavior(MissedTickBehavior::Skip);

            sleep(Duration::from_millis(350)).await;
            let _ = iv.tick().await;
            let _ = iv.tick().await;
            let elapsed = start.elapsed();
            assert!(
                elapsed.abs_diff(duration * 4) < TOLERANCE
                    || elapsed.abs_diff(duration * 5) < TOLERANCE
            );
        }

        // Exercises `reset_immediately` and `reset_at` with a past deadline.
        async fn test_interval_reset() {
            let start = Instant::now();
            let duration = Duration::from_millis(100);
            let mut iv = interval(duration);

            // Ticks at 0, 100 and 200ms.
            let _ = iv.tick().await;
            let _ = iv.tick().await;
            let _ = iv.tick().await;
            let elapsed = start.elapsed();
            assert!(elapsed.abs_diff(duration * 2) < TOLERANCE);

            // Immediate tick at 200ms, then 300 and 400ms.
            iv.reset_immediately();
            let _ = iv.tick().await;
            let _ = iv.tick().await;
            let _ = iv.tick().await;
            let elapsed = start.elapsed();
            assert!(elapsed.abs_diff(duration * 4) < TOLERANCE);

            // Deadline 250ms is already 150ms in the past: two ticks are
            // buffered (250 and 350ms) and the third is expected at 450ms.
            iv.reset_at(start + Duration::from_millis(250));
            let _ = iv.tick().await;
            let _ = iv.tick().await;
            let _ = iv.tick().await;
            let elapsed = start.elapsed();
            assert!(elapsed.abs_diff(Duration::from_millis(450)) < TOLERANCE);
        }
    }
}