1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use crate::cancel::{CancelToken, CancelWaker};
use futures_timer::Delay;
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll};
use std::time::Duration;
pub fn once<T, C>(delay: Duration, task: C) -> (CancelToken, DelayedTask<T>)
where
T: Future<Output = ()>,
C: FnOnce() -> T,
{
let done = AtomicBool::new(false);
let waker = CancelWaker::new(done);
(
CancelToken::new(waker.clone()),
DelayedTask {
waker,
delay: Delay::new(delay),
task: task(),
},
)
}
pin_project! {
pub struct DelayedTask<T: Future<Output = ()>> {
waker: CancelWaker,
#[pin]
delay: Delay,
#[pin]
task: T,
}
}
impl<T: Future<Output = ()>> DelayedTask<T> {
pub(crate) fn new(waker: CancelWaker, delay: Delay, task: T) -> DelayedTask<T> {
DelayedTask {
waker,
delay: delay,
task: task,
}
}
}
impl<T: Future<Output = ()>> Future for DelayedTask<T> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.waker.0.done.load(Ordering::Relaxed) {
true => Poll::Ready(()),
false => {
this.waker.0.waker.register(cx.waker());
if this.waker.0.done.load(Ordering::Relaxed) {
Poll::Ready(())
} else {
match this.delay.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => match this.task.poll(cx) {
Poll::Ready(v) => Poll::Ready(v),
Poll::Pending => Poll::Pending,
},
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::once;
use smol::{self, Task};
use std::time::Duration;
use futures::join;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
#[test]
fn once_test() {
let atomic = Arc::new(AtomicI32::new(0));
let atomic_c = atomic.clone();
smol::run(async {
let (_, task) = once(Duration::from_secs(1), || async {
atomic_c.store(1, Ordering::Relaxed);
});
task.await;
});
assert_eq!(1, atomic.load(Ordering::Relaxed));
}
#[test]
fn once_cancel_test() {
let atomic = Arc::new(AtomicI32::new(0));
let atomic_c = atomic.clone();
smol::run(async {
let (token, task) = once(Duration::from_secs(1), || async move {
atomic_c.store(1, Ordering::Relaxed);
});
let handle = Task::spawn(async move { task.await });
join!(token.cancel(), handle)
});
assert_eq!(0, atomic.load(Ordering::Relaxed));
}
}