1use std::{cmp, future::poll_fn, future::Future, pin::Pin, task, task::Poll};
3
4mod types;
5mod wheel;
6
7pub use self::types::{Millis, Seconds};
8pub use self::wheel::{now, query_system_time, system_time, TimerHandle};
9
10#[inline]
17pub fn sleep<T: Into<Millis>>(dur: T) -> Sleep {
18 Sleep::new(dur.into())
19}
20
21#[inline]
26pub fn deadline<T: Into<Millis>>(dur: T) -> Deadline {
27 Deadline::new(dur.into())
28}
29
30#[inline]
35pub fn interval<T: Into<Millis>>(period: T) -> Interval {
36 Interval::new(period.into())
37}
38
39#[inline]
45pub fn timeout<T, U>(dur: U, future: T) -> Timeout<T>
46where
47 T: Future,
48 U: Into<Millis>,
49{
50 Timeout::new_with_delay(future, Sleep::new(dur.into()))
51}
52
53#[inline]
59pub fn timeout_checked<T, U>(dur: U, future: T) -> TimeoutChecked<T>
60where
61 T: Future,
62 U: Into<Millis>,
63{
64 TimeoutChecked::new_with_delay(future, dur.into())
65}
66
/// Future that completes once its timer has elapsed.
///
/// Created by [`sleep`] or [`Sleep::new`]. All operations are forwarded to
/// the wrapped [`TimerHandle`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Sleep {
    // Handle of the timer that backs this future.
    hnd: TimerHandle,
}
88
89impl Sleep {
90 #[inline]
92 pub fn new(duration: Millis) -> Sleep {
93 Sleep {
94 hnd: TimerHandle::new(cmp::max(duration.0, 1) as u64),
95 }
96 }
97
98 #[inline]
100 pub fn is_elapsed(&self) -> bool {
101 self.hnd.is_elapsed()
102 }
103
104 #[inline]
106 pub fn elapse(&self) {
107 self.hnd.elapse()
108 }
109
110 pub fn reset<T: Into<Millis>>(&self, millis: T) {
118 self.hnd.reset(millis.into().0 as u64);
119 }
120
121 #[inline]
122 pub async fn wait(&self) {
124 poll_fn(|cx| self.hnd.poll_elapsed(cx)).await
125 }
126
127 #[inline]
128 pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
129 self.hnd.poll_elapsed(cx)
130 }
131}
132
133impl Future for Sleep {
134 type Output = ();
135
136 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
137 self.hnd.poll_elapsed(cx)
138 }
139}
140
/// Future that completes once its timer has elapsed, with "zero" meaning
/// "no deadline".
///
/// Created by [`deadline`] or [`Deadline::new`]. When no timer is attached
/// the future never resolves, while `is_elapsed()` reports `true`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Deadline {
    // `None` when the deadline was created with, or reset to, a zero duration.
    hnd: Option<TimerHandle>,
}
161
162impl Deadline {
163 #[inline]
165 pub fn new(duration: Millis) -> Deadline {
166 if duration.0 != 0 {
167 Deadline {
168 hnd: Some(TimerHandle::new(duration.0 as u64)),
169 }
170 } else {
171 Deadline { hnd: None }
172 }
173 }
174
175 #[inline]
176 pub async fn wait(&self) {
178 poll_fn(|cx| self.poll_elapsed(cx)).await
179 }
180
181 pub fn reset<T: Into<Millis>>(&mut self, millis: T) {
189 let millis = millis.into();
190 if millis.0 != 0 {
191 if let Some(ref mut hnd) = self.hnd {
192 hnd.reset(millis.0 as u64);
193 } else {
194 self.hnd = Some(TimerHandle::new(millis.0 as u64));
195 }
196 } else {
197 let _ = self.hnd.take();
198 }
199 }
200
201 #[inline]
203 pub fn is_elapsed(&self) -> bool {
204 self.hnd.as_ref().map(|t| t.is_elapsed()).unwrap_or(true)
205 }
206
207 #[inline]
208 pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
209 self.hnd
210 .as_ref()
211 .map(|t| t.poll_elapsed(cx))
212 .unwrap_or(Poll::Pending)
213 }
214}
215
216impl Future for Deadline {
217 type Output = ();
218
219 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
220 self.poll_elapsed(cx)
221 }
222}
223
pin_project_lite::pin_project! {
    /// Future returned by [`timeout`].
    ///
    /// Resolves to `Ok(output)` if the wrapped future completes before the
    /// delay elapses, and to `Err(())` otherwise.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    #[derive(Debug)]
    pub struct Timeout<T> {
        // The wrapped future; structurally pinned so it can be polled in place.
        #[pin]
        value: T,
        // Timer that bounds how long `value` may take.
        delay: Sleep,
    }
}
234
235impl<T> Timeout<T> {
236 pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout<T> {
237 Timeout { value, delay }
238 }
239}
240
241impl<T> Future for Timeout<T>
242where
243 T: Future,
244{
245 type Output = Result<T::Output, ()>;
246
247 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
248 let this = self.project();
249
250 if let Poll::Ready(v) = this.value.poll(cx) {
252 return Poll::Ready(Ok(v));
253 }
254
255 match this.delay.poll_elapsed(cx) {
257 Poll::Ready(()) => Poll::Ready(Err(())),
258 Poll::Pending => Poll::Pending,
259 }
260 }
261}
262
pin_project_lite::pin_project! {
    /// Future returned by [`timeout_checked`].
    ///
    /// Either applies a timeout to the wrapped future or, for a zero delay,
    /// forwards the wrapped future's output unchanged as `Ok(output)`.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct TimeoutChecked<T> {
        // Selects between the timed and the untimed mode.
        #[pin]
        state: TimeoutCheckedState<T>,
    }
}
271
// Internal state of `TimeoutChecked`.
//
// `Timeout` wraps the future in a `Timeout` with an armed timer;
// `NoTimeout` holds the bare future and is used when the delay is zero.
pin_project_lite::pin_project! {
    #[project = TimeoutCheckedStateProject]
    enum TimeoutCheckedState<T> {
        Timeout{ #[pin] fut: Timeout<T> },
        NoTimeout{ #[pin] fut: T },
    }
}
279
280impl<T> TimeoutChecked<T> {
281 pub(crate) fn new_with_delay(value: T, delay: Millis) -> TimeoutChecked<T> {
282 if delay.is_zero() {
283 TimeoutChecked {
284 state: TimeoutCheckedState::NoTimeout { fut: value },
285 }
286 } else {
287 TimeoutChecked {
288 state: TimeoutCheckedState::Timeout {
289 fut: Timeout::new_with_delay(value, sleep(delay)),
290 },
291 }
292 }
293 }
294}
295
296impl<T> Future for TimeoutChecked<T>
297where
298 T: Future,
299{
300 type Output = Result<T::Output, ()>;
301
302 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
303 match self.project().state.as_mut().project() {
304 TimeoutCheckedStateProject::Timeout { fut } => fut.poll(cx),
305 TimeoutCheckedStateProject::NoTimeout { fut } => fut.poll(cx).map(Result::Ok),
306 }
307 }
308}
309
310#[must_use = "futures do nothing unless you `.await` or poll them"]
315#[derive(Debug)]
316pub struct Interval {
317 hnd: TimerHandle,
318 period: u32,
319}
320
321impl Interval {
322 #[inline]
324 pub fn new(period: Millis) -> Interval {
325 Interval {
326 hnd: TimerHandle::new(period.0 as u64),
327 period: period.0,
328 }
329 }
330
331 #[inline]
332 pub async fn tick(&self) {
333 poll_fn(|cx| self.poll_tick(cx)).await;
334 }
335
336 #[inline]
337 pub fn poll_tick(&self, cx: &mut task::Context<'_>) -> Poll<()> {
338 if self.hnd.poll_elapsed(cx).is_ready() {
339 self.hnd.reset(self.period as u64);
340 Poll::Ready(())
341 } else {
342 Poll::Pending
343 }
344 }
345}
346
347impl crate::Stream for Interval {
348 type Item = ();
349
350 #[inline]
351 fn poll_next(
352 self: Pin<&mut Self>,
353 cx: &mut task::Context<'_>,
354 ) -> Poll<Option<Self::Item>> {
355 self.poll_tick(cx).map(|_| Some(()))
356 }
357}
358
// Tests for the timer primitives in this module.
//
// Several tests create and immediately discard a `sleep(Seconds(1))` future,
// which is why `clippy::let_underscore_future` is allowed for the module.
// NOTE(review): that discarded sleep presumably makes sure the timer service
// is running before time is sampled — confirm against the `wheel` module.
#[cfg(test)]
#[allow(clippy::let_underscore_future)]
mod tests {
    use futures_util::StreamExt;
    use std::{future::poll_fn, rc::Rc, time};

    use super::*;
    use crate::future::lazy;

    /// Two back-to-back `now()` calls must return the same value.
    #[ntex_macros::rt_test2]
    async fn lowres_time_does_not_immediately_change() {
        let _ = sleep(Seconds(1));

        assert_eq!(now(), now())
    }

    /// `now()` must have advanced by at least the slept duration.
    #[ntex_macros::rt_test2]
    async fn lowres_time_updates_after_resolution_interval() {
        let _ = sleep(Seconds(1));

        let first_time = now();

        sleep(Millis(25)).await;

        let second_time = now();
        assert!(second_time - first_time >= time::Duration::from_millis(25));
    }

    /// Back-to-back `system_time()` calls agree with each other and with
    /// `query_system_time()`.
    #[ntex_macros::rt_test2]
    async fn system_time_service_time_does_not_immediately_change() {
        let _ = sleep(Seconds(1));

        assert_eq!(system_time(), system_time());
        assert_eq!(system_time(), query_system_time());
    }

    /// `system_time()` must have advanced by at least the slept duration.
    #[ntex_macros::rt_test2]
    async fn system_time_service_time_updates_after_resolution_interval() {
        let _ = sleep(Seconds(1));

        let wait_time = 300;

        let first_time = system_time()
            .duration_since(time::SystemTime::UNIX_EPOCH)
            .unwrap();

        sleep(Millis(wait_time)).await;

        let second_time = system_time()
            .duration_since(time::SystemTime::UNIX_EPOCH)
            .unwrap();

        assert!(second_time - first_time >= time::Duration::from_millis(wait_time as u64));
    }

    /// Covers zero-length sleeps, resets to zero, and forced completion.
    #[ntex_macros::rt_test2]
    async fn test_sleep_0() {
        let _ = sleep(Seconds(1));

        // `Sleep::new` clamps zero to 1 ms, so at least 1 ms must pass.
        let first_time = now();
        sleep(Millis(0)).await;
        let second_time = now();
        assert!(second_time - first_time >= time::Duration::from_millis(1));

        let first_time = now();
        sleep(Millis(1)).await;
        let second_time = now();
        assert!(second_time - first_time >= time::Duration::from_millis(1));

        // `reset` does not clamp: resetting to zero completes without delay.
        let first_time = now();
        let fut = sleep(Millis(10000));
        assert!(!fut.is_elapsed());
        fut.reset(Millis::ZERO);
        fut.await;
        let second_time = now();
        assert!(second_time - first_time < time::Duration::from_millis(1));

        // A handle built directly with 0 ms, bypassing the clamp, is already elapsed.
        let first_time = now();
        let fut = Sleep {
            hnd: TimerHandle::new(0),
        };
        assert!(fut.is_elapsed());
        fut.await;
        let second_time = now();
        assert!(second_time - first_time < time::Duration::from_millis(1));

        // `elapse()` called from another task completes a long pending sleep.
        let first_time = now();
        let fut = Rc::new(sleep(Millis(100000)));
        let s = fut.clone();
        ntex::rt::spawn(async move {
            s.elapse();
        });
        poll_fn(|cx| fut.poll_elapsed(cx)).await;
        assert!(fut.is_elapsed());
        let second_time = now();
        assert!(second_time - first_time < time::Duration::from_millis(1));
    }

    /// Covers normal completion, the zero ("no deadline") case, and `reset`.
    #[ntex_macros::rt_test2]
    async fn test_deadline() {
        let _ = sleep(Seconds(1));

        let first_time = now();
        let dl = deadline(Millis(1));
        dl.await;
        let second_time = now();
        assert!(second_time - first_time >= time::Duration::from_millis(1));
        // A zero deadline never resolves, so the surrounding timeout fires.
        assert!(timeout(Millis(100), deadline(Millis(0))).await.is_err());

        // Resetting to zero drops the timer; polling then stays pending.
        let mut dl = deadline(Millis(1));
        dl.reset(Millis::ZERO);
        assert!(lazy(|cx| dl.poll_elapsed(cx)).await.is_pending());

        // Resetting to a longer duration postpones completion accordingly.
        let mut dl = deadline(Millis(1));
        dl.reset(Millis(100));
        let first_time = now();
        dl.await;
        let second_time = now();
        assert!(second_time - first_time >= time::Duration::from_millis(100));

        // A zero deadline reports elapsed; resetting it attaches a fresh timer
        // that is still pending right after the reset.
        let mut dl = deadline(Millis(0));
        assert!(dl.is_elapsed());
        dl.reset(Millis(1));
        assert!(lazy(|cx| dl.poll_elapsed(cx)).await.is_pending());

        assert!(format!("{dl:?}").contains("Deadline"));
    }

    /// Both `tick()` and the `Stream` interface fire roughly once per period.
    #[ntex_macros::rt_test2]
    async fn test_interval() {
        let mut int = interval(Millis(250));

        let time = time::Instant::now();
        int.tick().await;
        let elapsed = time::Instant::now() - time;
        assert!(
            elapsed > time::Duration::from_millis(200)
                && elapsed < time::Duration::from_millis(450),
            "elapsed: {elapsed:?}"
        );

        // Second tick, this time through `StreamExt::next`.
        let time = time::Instant::now();
        int.next().await;
        let elapsed = time::Instant::now() - time;
        assert!(
            elapsed > time::Duration::from_millis(200)
                && elapsed < time::Duration::from_millis(450),
            "elapsed: {elapsed:?}"
        );
    }

    /// Three consecutive one-second ticks each land in the (1000, 1300) ms window.
    #[ntex_macros::rt_test2]
    async fn test_interval_one_sec() {
        let int = interval(Millis::ONE_SEC);

        for _i in 0..3 {
            let time = time::Instant::now();
            int.tick().await;
            let elapsed = time::Instant::now() - time;
            assert!(
                elapsed > time::Duration::from_millis(1000)
                    && elapsed < time::Duration::from_millis(1300),
                "elapsed: {elapsed:?}"
            );
        }
    }

    /// `timeout_checked` applies the timeout unless the duration is zero.
    #[ntex_macros::rt_test2]
    async fn test_timeout_checked() {
        // Timeout longer than the inner sleep: the inner future wins.
        let result = timeout_checked(Millis(200), sleep(Millis(100))).await;
        assert!(result.is_ok());

        // Timeout shorter than the inner sleep: the timeout wins.
        let result = timeout_checked(Millis(5), sleep(Millis(100))).await;
        assert!(result.is_err());

        // Zero disables the timeout, so the inner future always completes.
        let result = timeout_checked(Millis(0), sleep(Millis(100))).await;
        assert!(result.is_ok());
    }
}