1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use pin_project::pin_project;
use std::{
collections::HashMap,
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
// TODO(azw): support more executors
#[cfg(feature = "async-std-experimental")]
use async_std::task::{spawn_blocking, JoinHandle};
#[cfg(feature = "async-std-experimental")]
use futures::channel::oneshot::{channel, Receiver};
#[cfg(feature = "tokio")]
use tokio::{
sync::oneshot::{channel, Receiver},
task::{spawn_blocking, JoinHandle},
};
use super::token::{Token, TokenType};
static TIMINGS: Lazy<Mutex<HashMap<Token, AdaptiveState>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
#[derive(Clone, Copy)]
enum AdaptiveState {
Inline,
Spawn,
}
impl Default for AdaptiveState {
fn default() -> Self {
AdaptiveState::Inline
}
}
#[pin_project]
pub struct TimedBlockingFuture<O, F> {
fut: Option<F>,
token: Token,
cutoff: Duration,
inner: Option<JoinHandle<O>>,
wakeup: Option<Receiver<()>>,
}
impl<O, F: FnOnce() -> O> TimedBlockingFuture<O, F> {
pub fn new(token: Token, cutoff: Duration, future: F) -> Self {
TimedBlockingFuture {
fut: Some(future),
cutoff,
token,
inner: None,
wakeup: None,
}
}
}
fn track_and_run<O, F: FnOnce() -> O>(token: Token, cutoff: Duration, f: F) -> O {
let now = Instant::now();
let ret = f();
if now.elapsed() > cutoff {
TIMINGS.lock().insert(token, AdaptiveState::Spawn);
} else {
TIMINGS.lock().insert(token, AdaptiveState::Inline);
}
ret
}
impl<O: Send + 'static, F: FnOnce() -> O + Send + 'static> Future for TimedBlockingFuture<O, F> {
type Output = O;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
loop {
match this.fut.take() {
Some(f) => {
let state = match this.token.0 {
TokenType::AlwaysInline => AdaptiveState::Inline,
TokenType::AlwaysSpawn => AdaptiveState::Spawn,
// Need to drop the lock before entering the `track_and_run` section
_ => *(*TIMINGS.lock()).entry(*this.token).or_default(),
};
match state {
AdaptiveState::Inline => {
// Just run it inline
return Poll::Ready(track_and_run(*this.token, *this.cutoff, f));
}
AdaptiveState::Spawn => {
// Spawn the blocking task
let (tx, rx) = channel();
let jh = {
let token = *this.token;
let cutoff = *this.cutoff;
spawn_blocking(move || {
let ret = track_and_run(token, cutoff, f);
// Panic's cause tx to be dropped which will wake the
// Reciever
let _ = tx.send(());
ret
})
};
// Store the reciever to poll later
*this.wakeup = Some(rx);
// Store the handle to the spawned blocking task, to join later
*this.inner = Some(jh);
// TODO(guswynn): This is hacky, I should just make a waker myself,
// but making sure I wake when tx is dropped is non-trivial
//
// TODO(guswynn): figure out how to control this in cfg(test):
// std::thread::sleep(Duration::from_secs(1));
//
// Polling the Reciever registers the task's waker
// If the blocking thread is working fast, we may immediately
// see that we are ready. We aren't just wrapping around as an
// optimization: We must read the value off the JoinHandle
// as we no longer have any way to recieve wakeups.
match Pin::new(this.wakeup.as_mut().unwrap()).poll(cx) {
Poll::Ready(_) => {
*this.wakeup = None;
continue;
}
Poll::Pending => {}
}
return Poll::Pending;
}
}
}
None => {
let jh = this.inner.as_mut().expect("re-polled a Ready Future");
// Re-register the waker if its still possible
// If we got a ready, we need to busy poll so we can get join handle
// value signalled by this channel being ready. We MUST busy poll;
// its not an optimization: once we consume our last possible wakeup,
// we need to get the value out of the join handle
//
// TODO(guswynn): catch_unwind in the closure yourself and send the value
// from the JoinHandle?
let busy_poll = match this.wakeup.as_mut() {
Some(rx) => match Pin::new(rx).poll(cx) {
Poll::Ready(_) => {
// Like above, we don't care about the value,
// as joining the handle below is what we want.
*this.wakeup = None;
true
}
Poll::Pending => {
// Explicit false, as our waker it correctly registered in the
// Reciever
false
}
},
None => {
// an unset reciever means we need to continue to loop
// until the join handle gives us the value
true
}
};
match Pin::new(jh).poll(cx) {
#[cfg(feature = "async-std-experimental")]
Poll::Ready(val) => return Poll::Ready(val),
#[cfg(feature = "tokio")]
Poll::Ready(Ok(val)) => return Poll::Ready(val),
#[cfg(feature = "tokio")]
Poll::Ready(Err(e)) => match e.try_into_panic() {
Ok(panic) => {
std::panic::resume_unwind(panic);
}
Err(_) => {
// Task is shutdown so we just pend:
// We never abort the sub-task ourselves, so something
// else is shutting everything else down, and the task
// polling us will likely be shutting down as well
// TODO(guswynn): figure out a way to test this
return Poll::Pending;
}
},
Poll::Pending if busy_poll => continue,
Poll::Pending => return Poll::Pending,
}
}
}
}
}
}