1#![allow(
3 clippy::cast_possible_truncation,
4 clippy::cast_sign_loss,
5 clippy::cast_possible_wrap
6)]
7use std::{cmp, future::Future, future::poll_fn, pin::Pin, task, task::Poll};
8
9mod types;
10mod wheel;
11
12pub use self::types::{Millis, Seconds};
13pub use self::wheel::{TimerHandle, now, query_system_time, system_time};
14
15#[inline]
22pub fn sleep<T: Into<Millis>>(dur: T) -> Sleep {
23 Sleep::new(dur.into())
24}
25
26#[inline]
31pub fn deadline<T: Into<Millis>>(dur: T) -> Deadline {
32 Deadline::new(dur.into())
33}
34
35#[inline]
40pub fn interval<T: Into<Millis>>(period: T) -> Interval {
41 Interval::new(period.into())
42}
43
44#[inline]
50pub fn timeout<T, U>(dur: U, future: T) -> Timeout<T>
51where
52 T: Future,
53 U: Into<Millis>,
54{
55 Timeout::new_with_delay(future, Sleep::new(dur.into()))
56}
57
58#[inline]
64pub fn timeout_checked<T, U>(dur: U, future: T) -> TimeoutChecked<T>
65where
66 T: Future,
67 U: Into<Millis>,
68{
69 TimeoutChecked::new_with_delay(future, dur.into())
70}
71
72#[derive(Debug)]
88#[must_use = "futures do nothing unless you `.await` or poll them"]
89pub struct Sleep {
90 hnd: TimerHandle,
92}
93
94impl Sleep {
95 #[inline]
97 pub fn new(duration: Millis) -> Sleep {
98 Sleep {
99 hnd: TimerHandle::new(u64::from(cmp::max(duration.0, 1))),
100 }
101 }
102
103 #[inline]
105 pub fn is_elapsed(&self) -> bool {
106 self.hnd.is_elapsed()
107 }
108
109 #[inline]
111 pub fn elapse(&self) {
112 self.hnd.elapse();
113 }
114
115 pub fn reset<T: Into<Millis>>(&self, millis: T) {
123 self.hnd.reset(u64::from(millis.into().0));
124 }
125
126 #[inline]
127 pub async fn wait(&self) {
129 poll_fn(|cx| self.hnd.poll_elapsed(cx)).await;
130 }
131
132 #[inline]
133 pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
134 self.hnd.poll_elapsed(cx)
135 }
136}
137
138impl Future for Sleep {
139 type Output = ();
140
141 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
142 self.hnd.poll_elapsed(cx)
143 }
144}
145
146#[derive(Debug)]
162#[must_use = "futures do nothing unless you `.await` or poll them"]
163pub struct Deadline {
164 hnd: Option<TimerHandle>,
165}
166
167impl Deadline {
168 #[inline]
170 pub fn new(duration: Millis) -> Deadline {
171 if duration.0 != 0 {
172 Deadline {
173 hnd: Some(TimerHandle::new(u64::from(duration.0))),
174 }
175 } else {
176 Deadline { hnd: None }
177 }
178 }
179
180 #[inline]
181 pub async fn wait(&self) {
183 poll_fn(|cx| self.poll_elapsed(cx)).await;
184 }
185
186 pub fn reset<T: Into<Millis>>(&mut self, millis: T) {
194 let millis = millis.into();
195 if millis.0 != 0 {
196 if let Some(ref mut hnd) = self.hnd {
197 hnd.reset(u64::from(millis.0));
198 } else {
199 self.hnd = Some(TimerHandle::new(u64::from(millis.0)));
200 }
201 } else {
202 let _ = self.hnd.take();
203 }
204 }
205
206 #[inline]
208 pub fn is_elapsed(&self) -> bool {
209 self.hnd.as_ref().is_none_or(TimerHandle::is_elapsed)
210 }
211
212 #[inline]
213 pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
214 self.hnd
215 .as_ref()
216 .map_or(Poll::Pending, |t| t.poll_elapsed(cx))
217 }
218}
219
220impl Future for Deadline {
221 type Output = ();
222
223 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
224 self.poll_elapsed(cx)
225 }
226}
227
228pin_project_lite::pin_project! {
229 #[must_use = "futures do nothing unless you `.await` or poll them"]
231 #[derive(Debug)]
232 pub struct Timeout<T> {
233 #[pin]
234 value: T,
235 delay: Sleep,
236 }
237}
238
239impl<T> Timeout<T> {
240 pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout<T> {
241 Timeout { value, delay }
242 }
243}
244
245impl<T> Future for Timeout<T>
246where
247 T: Future,
248{
249 type Output = Result<T::Output, ()>;
250
251 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
252 let this = self.project();
253
254 if let Poll::Ready(v) = this.value.poll(cx) {
256 return Poll::Ready(Ok(v));
257 }
258
259 match this.delay.poll_elapsed(cx) {
261 Poll::Ready(()) => Poll::Ready(Err(())),
262 Poll::Pending => Poll::Pending,
263 }
264 }
265}
266
267pin_project_lite::pin_project! {
268 #[must_use = "futures do nothing unless you `.await` or poll them"]
270 pub struct TimeoutChecked<T> {
271 #[pin]
272 state: TimeoutCheckedState<T>,
273 }
274}
275
276pin_project_lite::pin_project! {
277 #[project = TimeoutCheckedStateProject]
278 enum TimeoutCheckedState<T> {
279 Timeout{ #[pin] fut: Timeout<T> },
280 NoTimeout{ #[pin] fut: T },
281 }
282}
283
284impl<T> TimeoutChecked<T> {
285 pub(crate) fn new_with_delay(value: T, delay: Millis) -> TimeoutChecked<T> {
286 if delay.is_zero() {
287 TimeoutChecked {
288 state: TimeoutCheckedState::NoTimeout { fut: value },
289 }
290 } else {
291 TimeoutChecked {
292 state: TimeoutCheckedState::Timeout {
293 fut: Timeout::new_with_delay(value, sleep(delay)),
294 },
295 }
296 }
297 }
298}
299
300impl<T> Future for TimeoutChecked<T>
301where
302 T: Future,
303{
304 type Output = Result<T::Output, ()>;
305
306 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
307 match self.project().state.as_mut().project() {
308 TimeoutCheckedStateProject::Timeout { fut } => fut.poll(cx),
309 TimeoutCheckedStateProject::NoTimeout { fut } => fut.poll(cx).map(Result::Ok),
310 }
311 }
312}
313
314#[must_use = "futures do nothing unless you `.await` or poll them"]
319#[derive(Debug)]
320pub struct Interval {
321 hnd: TimerHandle,
322 period: u32,
323}
324
325impl Interval {
326 #[inline]
328 pub fn new(period: Millis) -> Interval {
329 Interval {
330 hnd: TimerHandle::new(u64::from(period.0)),
331 period: period.0,
332 }
333 }
334
335 #[inline]
336 pub async fn tick(&self) {
337 poll_fn(|cx| self.poll_tick(cx)).await;
338 }
339
340 #[inline]
341 pub fn poll_tick(&self, cx: &mut task::Context<'_>) -> Poll<()> {
342 if self.hnd.poll_elapsed(cx).is_ready() {
343 self.hnd.reset(u64::from(self.period));
344 Poll::Ready(())
345 } else {
346 Poll::Pending
347 }
348 }
349}
350
351impl crate::Stream for Interval {
352 type Item = ();
353
354 #[inline]
355 fn poll_next(
356 self: Pin<&mut Self>,
357 cx: &mut task::Context<'_>,
358 ) -> Poll<Option<Self::Item>> {
359 self.poll_tick(cx).map(|()| Some(()))
360 }
361}
362
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use std::{future::poll_fn, rc::Rc, time};

    use super::*;
    use crate::future::lazy;

    /// Two consecutive `now()` reads are expected to return the same value.
    #[ntex::test]
    async fn lowres_time_does_not_immediately_change() {
        sleep(Millis(25)).await;

        let (first, second) = (now(), now());
        assert_eq!(first, second);
    }

    /// `now()` must have advanced by at least the slept duration.
    #[ntex::test]
    async fn lowres_time_updates_after_resolution_interval() {
        sleep(Millis(50)).await;

        let before = now();

        sleep(Millis(25)).await;

        let after = now();
        let advanced = after - before;
        assert!(advanced >= time::Duration::from_millis(25));
    }

    /// Consecutive `system_time()` reads agree with each other and with
    /// `query_system_time()`.
    #[ntex::test]
    async fn system_time_service_time_does_not_immediately_change() {
        sleep(Seconds(1)).await;

        assert_eq!(system_time(), system_time());
        assert_eq!(system_time(), query_system_time());
    }

    /// `system_time()` must have advanced by at least the slept duration.
    #[ntex::test]
    async fn system_time_service_time_updates_after_resolution_interval() {
        sleep(Millis(100)).await;

        let pause_ms = 300;

        let before = system_time()
            .duration_since(time::SystemTime::UNIX_EPOCH)
            .unwrap();

        sleep(Millis(pause_ms)).await;

        let after = system_time()
            .duration_since(time::SystemTime::UNIX_EPOCH)
            .unwrap();

        let advanced = after.checked_sub(before).unwrap();
        assert!(advanced >= time::Duration::from_millis(u64::from(pause_ms)));
    }

    #[ntex::test]
    async fn test_sleep_0() {
        sleep(Seconds(1)).await;

        // A zero sleep still takes at least 1ms (`Sleep::new` raises 0 to 1).
        let before = now();
        sleep(Millis(0)).await;
        let after = now();
        assert!(after - before >= time::Duration::from_millis(1));

        let before = now();
        sleep(Millis(1)).await;
        let after = now();
        assert!(after - before >= time::Duration::from_millis(1));

        // Resetting to zero makes a long sleep complete right away.
        let before = now();
        let pending = sleep(Millis(10000));
        assert!(!pending.is_elapsed());
        pending.reset(Millis::ZERO);
        pending.await;
        let after = now();
        assert!(after - before < time::Duration::from_millis(1));

        // A handle created with zero directly is already elapsed.
        let before = now();
        let immediate = Sleep {
            hnd: TimerHandle::new(0),
        };
        assert!(immediate.is_elapsed());
        immediate.await;
        let after = now();
        assert!(after - before < time::Duration::from_millis(1));

        // `elapse()` called from another task wakes the waiting poller.
        let before = now();
        let shared = Rc::new(sleep(Millis(100_000)));
        let remote = shared.clone();
        ntex::rt::spawn(async move {
            remote.elapse();
        });
        poll_fn(|cx| shared.poll_elapsed(cx)).await;
        assert!(shared.is_elapsed());
        let after = now();
        assert!(after - before < time::Duration::from_millis(1));
    }

    #[ntex::test]
    async fn test_deadline() {
        sleep(Seconds(1)).await;

        let before = now();
        let short = deadline(Millis(1));
        short.await;
        let after = now();
        assert!(after - before >= time::Duration::from_millis(1));

        // A zero deadline never completes, so the surrounding timeout fires.
        let timed_out = timeout(Millis(100), deadline(Millis(0))).await;
        assert!(timed_out.is_err());

        // Resetting to zero disables the deadline.
        let mut disabled = deadline(Millis(1));
        disabled.reset(Millis::ZERO);
        assert!(lazy(|cx| disabled.poll_elapsed(cx)).await.is_pending());

        // Resetting an armed deadline extends it.
        let mut extended = deadline(Millis(1));
        extended.reset(Millis(100));
        let before = now();
        extended.await;
        let after = now();
        assert!(after - before >= time::Duration::from_millis(100));

        // A disabled deadline reports elapsed, and can be armed afterwards.
        let mut dl = deadline(Millis(0));
        assert!(dl.is_elapsed());
        dl.reset(Millis(1));
        assert!(lazy(|cx| dl.poll_elapsed(cx)).await.is_pending());

        let rendered = format!("{dl:?}");
        assert!(rendered.contains("Deadline"));
    }

    #[ntex::test]
    async fn test_interval() {
        let mut int = interval(Millis(250));

        // First tick, awaited through `tick()`.
        let started = time::Instant::now();
        int.tick().await;
        let elapsed = started.elapsed();
        assert!(
            elapsed > time::Duration::from_millis(200)
                && elapsed < time::Duration::from_millis(450),
            "elapsed: {elapsed:?}"
        );

        // Second tick, awaited through the `Stream` interface.
        let started = time::Instant::now();
        int.next().await;
        let elapsed = started.elapsed();
        assert!(
            elapsed > time::Duration::from_millis(200)
                && elapsed < time::Duration::from_millis(450),
            "elapsed: {elapsed:?}"
        );
    }

    #[ntex::test]
    async fn test_interval_one_sec() {
        let int = interval(Millis::ONE_SEC);

        for _round in 0..3 {
            let started = time::Instant::now();
            int.tick().await;
            let elapsed = started.elapsed();
            assert!(
                elapsed > time::Duration::from_secs(1)
                    && elapsed < time::Duration::from_millis(1300),
                "elapsed: {elapsed:?}"
            );
        }
    }

    #[ntex::test]
    async fn test_timeout_checked() {
        // Limit longer than the work: completes normally.
        let outcome = timeout_checked(Millis(200), sleep(Millis(100))).await;
        assert!(outcome.is_ok());

        // Limit shorter than the work: times out.
        let outcome = timeout_checked(Millis(5), sleep(Millis(100))).await;
        assert!(outcome.is_err());

        // Zero limit means no timeout at all.
        let outcome = timeout_checked(Millis(0), sleep(Millis(100))).await;
        assert!(outcome.is_ok());
    }
}