safina_timer/
deadline_future.rs

1#![forbid(unsafe_code)]
2
3use crate::schedule_wake;
4use core::fmt::{Debug, Display, Formatter};
5use core::future::Future;
6use core::pin::Pin;
7use core::task::{Context, Poll, Waker};
8use std::error::Error;
9use std::sync::{Arc, Mutex};
10use std::time::Instant;
11
12/// - `DeadlineError::TimerThreadNotStarted`
13/// - `DeadlineError::DeadlineExceeded`
14#[derive(Debug, PartialEq)]
15pub enum DeadlineError {
16    TimerThreadNotStarted,
17    DeadlineExceeded,
18}
19impl From<DeadlineError> for std::io::Error {
20    fn from(error: DeadlineError) -> Self {
21        match error {
22            DeadlineError::TimerThreadNotStarted => {
23                std::io::Error::new(std::io::ErrorKind::Other, "TimerThreadNotStarted")
24            }
25            DeadlineError::DeadlineExceeded => {
26                std::io::Error::new(std::io::ErrorKind::TimedOut, "DeadlineExceeded")
27            }
28        }
29    }
30}
31impl Display for DeadlineError {
32    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
33        std::fmt::Debug::fmt(self, f)
34    }
35}
36impl Error for DeadlineError {}
37
38#[derive(Debug, PartialEq)]
39pub struct DeadlineExceeded;
40impl From<DeadlineExceeded> for std::io::Error {
41    fn from(_error: DeadlineExceeded) -> Self {
42        std::io::Error::new(std::io::ErrorKind::TimedOut, "DeadlineExceeded")
43    }
44}
45impl Display for DeadlineExceeded {
46    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
47        std::fmt::Debug::fmt(self, f)
48    }
49}
50impl Error for DeadlineExceeded {}
51
52/// A future wrapper that returns `DeadlineExceeded` at a specified deadline.
53///
54/// It is returned by [`with_deadline`] and [`with_timeout`].
55#[must_use = "futures stay idle unless you await them"]
56pub struct DeadlineFuture<Fut: Future + Unpin> {
57    inner: Fut,
58    deadline: std::time::Instant,
59    waker: Arc<Mutex<Option<Waker>>>,
60}
61impl<Fut: Future + Unpin> DeadlineFuture<Fut> {
62    /// Makes a future that awaits `inner`,
63    /// but returns [`DeadlineError`](enum.DeadlineError.html) after `deadline`.
64    ///
65    /// Note that `inner` must be
66    /// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
67    /// Use [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
68    /// to make it Unpin.
69    /// Or use [`pin_utils::pin_mut`](https://docs.rs/pin-utils/latest/pin_utils/macro.pin_mut.html)
70    /// to do it with unsafe code that does not allocate memory.
71    pub fn new(inner: Fut, deadline: Instant) -> Self {
72        Self {
73            inner,
74            deadline,
75            waker: Arc::new(Mutex::new(None)),
76        }
77    }
78}
79impl<Fut: Future + Unpin> Future for DeadlineFuture<Fut> {
80    type Output = Result<Fut::Output, DeadlineError>;
81
82    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83        // The primary purpose of deadlines is to shed load during overload.
84        // If the inner future completed and the deadline exceeded, the process
85        // is likely overloaded.  In this case, we return error to shed load.
86        if self.deadline < std::time::Instant::now() {
87            return Poll::Ready(Err(DeadlineError::DeadlineExceeded));
88        }
89        match Pin::new(&mut self.inner).poll(cx) {
90            Poll::Ready(r) => return Poll::Ready(Ok(r)),
91            Poll::Pending => {}
92        }
93        let old_waker = self.waker.lock().unwrap().replace(cx.waker().clone());
94        if old_waker.is_none() {
95            schedule_wake(self.deadline, self.waker.clone())
96                .map_err(|_| DeadlineError::TimerThreadNotStarted)?;
97        }
98        Poll::Pending
99    }
100}
101
102/// Awaits `inner`, but returns [`DeadlineExceeded`](struct.DeadlineExceeded.html)
103/// after `deadline`.
104///
105/// First moves `inner` to the heap, to make it
106/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
107/// Use
108/// [`DeadlineFuture::new`](https://docs.rs/safina-timer/latest/safina_timer/struct.DeadlineFuture.html)
109/// to avoid allocating on the heap.
110///
111/// # Errors
112/// Returns `Err(DeadlineExceeded)` if `deadline` passes before `inner` completes.
113///
114/// # Panics
115/// Panics if [`start_timer_thread()`](fn.start_timer_thread.html) has not been called.
116/// If you need to handle this error, use
117/// [`DeadlineFuture::new`](https://docs.rs/safina-timer/latest/safina_timer/struct.DeadlineFuture.html).
118pub async fn with_deadline<Fut: Future>(
119    inner: Fut,
120    deadline: std::time::Instant,
121) -> Result<Fut::Output, DeadlineExceeded> {
122    match DeadlineFuture::new(Box::pin(inner), deadline).await {
123        Ok(result) => Ok(result),
124        Err(DeadlineError::DeadlineExceeded) => Err(DeadlineExceeded),
125        Err(DeadlineError::TimerThreadNotStarted) => panic!("TimerThreadNotStarted"),
126    }
127}
128
129/// Awaits `inner`, but returns [`DeadlineExceeded`](struct.DeadlineExceeded.html)
130/// after `duration` time from now.
131///
132/// First moves `inner` to the heap, to make it
133/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
134/// Use
135/// [`DeadlineFuture::new`](https://docs.rs/safina-timer/latest/safina_timer/struct.DeadlineFuture.html)
136/// to avoid allocating on the heap.
137///
138/// # Errors
139/// Returns `Err(DeadlineExceeded)` if `duration` passes before `inner` completes.
140///
141/// # Panics
142/// Panics if [`start_timer_thread()`](fn.start_timer_thread.html) has not been called.
143/// If you need to handle this error, use
144/// [`DeadlineFuture::new`](https://docs.rs/safina-timer/latest/safina_timer/struct.DeadlineFuture.html).
145pub async fn with_timeout<Fut: Future>(
146    inner: Fut,
147    duration: std::time::Duration,
148) -> Result<Fut::Output, DeadlineExceeded> {
149    with_deadline(inner, Instant::now() + duration).await
150}