safina_timer/
deadline_future.rs1#![forbid(unsafe_code)]
2
3use crate::schedule_wake;
4use core::fmt::{Debug, Display, Formatter};
5use core::future::Future;
6use core::pin::Pin;
7use core::task::{Context, Poll, Waker};
8use std::error::Error;
9use std::sync::{Arc, Mutex};
10use std::time::Instant;
11
/// Reasons a [`DeadlineFuture`] can complete without the inner future's output.
#[derive(Debug, PartialEq)]
pub enum DeadlineError {
    /// `schedule_wake` returned an error when the future tried to register
    /// its deadline wake-up.
    TimerThreadNotStarted,
    /// The deadline was already in the past when the future was polled.
    DeadlineExceeded,
}

/// Converts into an [`std::io::Error`] whose message is the variant name.
///
/// `TimerThreadNotStarted` maps to `ErrorKind::Other` and
/// `DeadlineExceeded` maps to `ErrorKind::TimedOut`.
impl From<DeadlineError> for std::io::Error {
    fn from(error: DeadlineError) -> Self {
        use std::io::ErrorKind;

        let (kind, message) = match error {
            DeadlineError::TimerThreadNotStarted => (ErrorKind::Other, "TimerThreadNotStarted"),
            DeadlineError::DeadlineExceeded => (ErrorKind::TimedOut, "DeadlineExceeded"),
        };
        Self::new(kind, message)
    }
}

/// Displays the error exactly as its `Debug` representation.
impl Display for DeadlineError {
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        Debug::fmt(self, f)
    }
}

impl Error for DeadlineError {}
37
/// Unit error meaning that a deadline passed before the awaited future
/// produced a value.
#[derive(Debug, PartialEq)]
pub struct DeadlineExceeded;

/// Converts into an [`std::io::Error`] of kind `TimedOut` with the message
/// `"DeadlineExceeded"`.
impl From<DeadlineExceeded> for std::io::Error {
    fn from(_error: DeadlineExceeded) -> Self {
        let kind = std::io::ErrorKind::TimedOut;
        Self::new(kind, "DeadlineExceeded")
    }
}

/// Displays the error exactly as its `Debug` representation.
impl Display for DeadlineExceeded {
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        Debug::fmt(self, f)
    }
}

impl Error for DeadlineExceeded {}
51
/// A future that wraps another future together with a deadline.
///
/// `inner` is the wrapped future, `deadline` is the instant after which the
/// wrapper stops waiting, and `waker` is a shared slot holding the most
/// recent task waker so that it can be handed to `schedule_wake`.
#[must_use = "futures stay idle unless you await them"]
pub struct DeadlineFuture<Fut>
where
    Fut: Future + Unpin,
{
    inner: Fut,
    deadline: Instant,
    waker: Arc<Mutex<Option<Waker>>>,
}

impl<Fut> DeadlineFuture<Fut>
where
    Fut: Future + Unpin,
{
    /// Wraps `inner` so that awaiting it gives up once `deadline` has passed.
    ///
    /// `inner` must be `Unpin`. The waker slot starts out empty, and nothing
    /// is scheduled until the returned future is polled.
    pub fn new(inner: Fut, deadline: Instant) -> Self {
        Self {
            inner,
            deadline,
            // `Default` for `Arc<Mutex<Option<_>>>` yields an empty (`None`) slot.
            waker: Arc::default(),
        }
    }
}
79impl<Fut: Future + Unpin> Future for DeadlineFuture<Fut> {
80 type Output = Result<Fut::Output, DeadlineError>;
81
82 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83 if self.deadline < std::time::Instant::now() {
87 return Poll::Ready(Err(DeadlineError::DeadlineExceeded));
88 }
89 match Pin::new(&mut self.inner).poll(cx) {
90 Poll::Ready(r) => return Poll::Ready(Ok(r)),
91 Poll::Pending => {}
92 }
93 let old_waker = self.waker.lock().unwrap().replace(cx.waker().clone());
94 if old_waker.is_none() {
95 schedule_wake(self.deadline, self.waker.clone())
96 .map_err(|_| DeadlineError::TimerThreadNotStarted)?;
97 }
98 Poll::Pending
99 }
100}
101
102pub async fn with_deadline<Fut: Future>(
119 inner: Fut,
120 deadline: std::time::Instant,
121) -> Result<Fut::Output, DeadlineExceeded> {
122 match DeadlineFuture::new(Box::pin(inner), deadline).await {
123 Ok(result) => Ok(result),
124 Err(DeadlineError::DeadlineExceeded) => Err(DeadlineExceeded),
125 Err(DeadlineError::TimerThreadNotStarted) => panic!("TimerThreadNotStarted"),
126 }
127}
128
129pub async fn with_timeout<Fut: Future>(
146 inner: Fut,
147 duration: std::time::Duration,
148) -> Result<Fut::Output, DeadlineExceeded> {
149 with_deadline(inner, Instant::now() + duration).await
150}