tokio_adjustable_timeout/
lib.rs1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5};
6use tokio::{
7 sync::mpsc,
8 time::{sleep_until, Duration, Instant, Sleep},
9};
10
11#[derive(Debug)]
12pub struct Elapsed;
13
14#[derive(Debug)]
15pub struct Closed;
16
17pub struct Handle {
18 tx: mpsc::UnboundedSender<Command>,
19}
20
21impl Handle {
22 pub(crate) fn new(tx: mpsc::UnboundedSender<Command>) -> Self {
23 Self { tx }
24 }
25
26 pub fn increment(&self, value: Duration) -> Result<(), Closed> {
27 self.tx.send(Command::Increment(value)).map_err(|_| Closed)
28 }
29
30 pub fn decrement(&self, value: Duration) -> Result<(), Closed> {
31 self.tx.send(Command::Decrement(value)).map_err(|_| Closed)
32 }
33
34 pub fn update(&self, deadline: Instant) -> Result<(), Closed> {
35 self.tx.send(Command::Update(deadline)).map_err(|_| Closed)
36 }
37}
38
39enum Command {
40 Increment(Duration),
41 Decrement(Duration),
42 Update(Instant),
43}
44
45pin_project_lite::pin_project! {
46 #[derive(Debug)]
47 pub struct AdjustableTimeout<T> {
48 #[pin]
49 future: T,
50 #[pin]
51 delay: Sleep,
52
53 tx: mpsc::UnboundedSender<Command>,
54 rx: mpsc::UnboundedReceiver<Command>,
55 }
56}
57
58impl<T> AdjustableTimeout<T> {
59 pub fn handle(&self) -> Handle {
60 Handle::new(self.tx.clone())
61 }
62}
63
64impl<T> Future for AdjustableTimeout<T>
65where
66 T: Future,
67{
68 type Output = Result<T::Output, Elapsed>;
69
70 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71 let mut this = self.project();
72
73 if let Poll::Ready(v) = this.future.poll(cx) {
74 return Poll::Ready(Ok(v));
75 }
76
77 if let Poll::Ready(cmd) = this.rx.poll_recv(cx) {
78 let deadline = match cmd.expect("shouldn't happen") {
79 Command::Increment(value) => this.delay.deadline() + value,
80 Command::Decrement(value) => this.delay.deadline() - value,
81 Command::Update(deadline) => deadline,
82 };
83
84 this.delay.as_mut().reset(deadline);
85 }
86
87 if let Poll::Ready(()) = this.delay.poll(cx) {
88 return Poll::Ready(Err(Elapsed));
89 }
90
91 Poll::Pending
92 }
93}
94
95pub fn adjustable_timeout<T>(duration: Duration, future: T) -> AdjustableTimeout<T>
96where
97 T: Future,
98{
99 let (tx, rx) = mpsc::unbounded_channel();
100 let deadline = Instant::now() + duration;
101 let delay = sleep_until(deadline);
102
103 AdjustableTimeout {
104 future,
105 delay,
106 tx,
107 rx,
108 }
109}