tokio_adjustable_timeout/
lib.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5};
6use tokio::{
7    sync::mpsc,
8    time::{sleep_until, Duration, Instant, Sleep},
9};
10
/// Error produced by an [`AdjustableTimeout`] when its deadline passes before
/// the wrapped future completes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Elapsed;

impl std::fmt::Display for Elapsed {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("deadline has elapsed")
    }
}

// Implementing `Error` lets callers propagate this with `?` into
// `Box<dyn Error>` and similar error containers.
impl std::error::Error for Elapsed {}
13
/// Error returned by [`Handle`] methods when the command could not be
/// delivered because the receiving side of the channel is gone.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Closed;

impl std::fmt::Display for Closed {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("adjustable timeout is closed")
    }
}

// Implementing `Error` lets callers propagate this with `?` into
// `Box<dyn Error>` and similar error containers.
impl std::error::Error for Closed {}
16
17pub struct Handle {
18    tx: mpsc::UnboundedSender<Command>,
19}
20
21impl Handle {
22    pub(crate) fn new(tx: mpsc::UnboundedSender<Command>) -> Self {
23        Self { tx }
24    }
25
26    pub fn increment(&self, value: Duration) -> Result<(), Closed> {
27        self.tx.send(Command::Increment(value)).map_err(|_| Closed)
28    }
29
30    pub fn decrement(&self, value: Duration) -> Result<(), Closed> {
31        self.tx.send(Command::Decrement(value)).map_err(|_| Closed)
32    }
33
34    pub fn update(&self, deadline: Instant) -> Result<(), Closed> {
35        self.tx.send(Command::Update(deadline)).map_err(|_| Closed)
36    }
37}
38
/// Message sent from a [`Handle`] to its timeout future describing how the
/// deadline should change.
#[derive(Debug, Clone, Copy)]
enum Command {
    /// Move the current deadline later by the given duration.
    Increment(Duration),
    /// Move the current deadline earlier by the given duration.
    Decrement(Duration),
    /// Replace the current deadline with the given instant.
    Update(Instant),
}
44
45pin_project_lite::pin_project! {
46    #[derive(Debug)]
47    pub struct AdjustableTimeout<T> {
48        #[pin]
49        future: T,
50        #[pin]
51        delay: Sleep,
52
53        tx: mpsc::UnboundedSender<Command>,
54        rx: mpsc::UnboundedReceiver<Command>,
55    }
56}
57
58impl<T> AdjustableTimeout<T> {
59    pub fn handle(&self) -> Handle {
60        Handle::new(self.tx.clone())
61    }
62}
63
64impl<T> Future for AdjustableTimeout<T>
65where
66    T: Future,
67{
68    type Output = Result<T::Output, Elapsed>;
69
70    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71        let mut this = self.project();
72
73        if let Poll::Ready(v) = this.future.poll(cx) {
74            return Poll::Ready(Ok(v));
75        }
76
77        if let Poll::Ready(cmd) = this.rx.poll_recv(cx) {
78            let deadline = match cmd.expect("shouldn't happen") {
79                Command::Increment(value) => this.delay.deadline() + value,
80                Command::Decrement(value) => this.delay.deadline() - value,
81                Command::Update(deadline) => deadline,
82            };
83
84            this.delay.as_mut().reset(deadline);
85        }
86
87        if let Poll::Ready(()) = this.delay.poll(cx) {
88            return Poll::Ready(Err(Elapsed));
89        }
90
91        Poll::Pending
92    }
93}
94
95pub fn adjustable_timeout<T>(duration: Duration, future: T) -> AdjustableTimeout<T>
96where
97    T: Future,
98{
99    let (tx, rx) = mpsc::unbounded_channel();
100    let deadline = Instant::now() + duration;
101    let delay = sleep_until(deadline);
102
103    AdjustableTimeout {
104        future,
105        delay,
106        tx,
107        rx,
108    }
109}