async_io/driver.rs
1use std::cell::{Cell, RefCell};
2use std::future::Future;
3use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::task::Waker;
6use std::task::{Context, Poll};
7use std::thread;
8use std::time::{Duration, Instant};
9
10use async_lock::OnceCell;
11use futures_lite::pin;
12use parking::Parker;
13
14use crate::reactor::Reactor;
15
16/// Number of currently active `block_on()` invocations.
17static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);
18
19/// Unparker for the "async-io" thread.
20fn unparker() -> &'static parking::Unparker {
21 static UNPARKER: OnceCell<parking::Unparker> = OnceCell::new();
22
23 UNPARKER.get_or_init_blocking(|| {
24 let (parker, unparker) = parking::pair();
25
26 // Spawn a helper thread driving the reactor.
27 //
28 // Note that this thread is not exactly necessary, it's only here to help push things
29 // forward if there are no `Parker`s around or if `Parker`s are just idling and never
30 // parking.
31 thread::Builder::new()
32 .name("async-io".to_string())
33 .spawn(move || main_loop(parker))
34 .expect("cannot spawn async-io thread");
35
36 unparker
37 })
38}
39
40/// Initializes the "async-io" thread.
41pub(crate) fn init() {
42 let _ = unparker();
43}
44
45/// The main loop for the "async-io" thread.
46fn main_loop(parker: parking::Parker) {
47 #[cfg(feature = "tracing")]
48 let span = tracing::trace_span!("async_io::main_loop");
49 #[cfg(feature = "tracing")]
50 let _enter = span.enter();
51
52 // The last observed reactor tick.
53 let mut last_tick = 0;
54 // Number of sleeps since this thread has called `react()`.
55 let mut sleeps = 0u64;
56
57 loop {
58 let tick = Reactor::get().ticker();
59
60 if last_tick == tick {
61 let reactor_lock = if sleeps >= 10 {
62 // If no new ticks have occurred for a while, stop sleeping and spinning in
63 // this loop and just block on the reactor lock.
64 Some(Reactor::get().lock())
65 } else {
66 Reactor::get().try_lock()
67 };
68
69 if let Some(mut reactor_lock) = reactor_lock {
70 #[cfg(feature = "tracing")]
71 tracing::trace!("waiting on I/O");
72 reactor_lock.react(None).ok();
73 last_tick = Reactor::get().ticker();
74 sleeps = 0;
75 }
76 } else {
77 last_tick = tick;
78 }
79
80 if BLOCK_ON_COUNT.load(Ordering::SeqCst) > 0 {
81 // Exponential backoff from 50us to 10ms.
82 let delay_us = [50, 75, 100, 250, 500, 750, 1000, 2500, 5000]
83 .get(sleeps as usize)
84 .unwrap_or(&10_000);
85
86 #[cfg(feature = "tracing")]
87 tracing::trace!("sleeping for {} us", delay_us);
88 if parker.park_timeout(Duration::from_micros(*delay_us)) {
89 #[cfg(feature = "tracing")]
90 tracing::trace!("notified");
91
92 // If notified before timeout, reset the last tick and the sleep counter.
93 last_tick = Reactor::get().ticker();
94 sleeps = 0;
95 } else {
96 sleeps += 1;
97 }
98 }
99 }
100}
101
102/// Blocks the current thread on a future, processing I/O events when idle.
103///
104/// # Examples
105///
106/// ```
107/// use async_io::Timer;
108/// use std::time::Duration;
109///
110/// async_io::block_on(async {
111/// // This timer will likely be processed by the current
112/// // thread rather than the fallback "async-io" thread.
113/// Timer::after(Duration::from_millis(1)).await;
114/// });
115/// ```
116pub fn block_on<T>(future: impl Future<Output = T>) -> T {
117 #[cfg(feature = "tracing")]
118 let span = tracing::trace_span!("async_io::block_on");
119 #[cfg(feature = "tracing")]
120 let _enter = span.enter();
121
122 // Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
123 BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);
124
125 // Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
126 let _guard = CallOnDrop(|| {
127 BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
128 unparker().unpark();
129 });
130
131 // Creates a parker and an associated waker that unparks it.
132 fn parker_and_waker() -> (Parker, Waker, Arc<AtomicBool>) {
133 // Parker and unparker for notifying the current thread.
134 let (p, u) = parking::pair();
135
136 // This boolean is set to `true` when the current thread is blocked on I/O.
137 let io_blocked = Arc::new(AtomicBool::new(false));
138
139 // Prepare the waker.
140 let waker = BlockOnWaker::create(io_blocked.clone(), u);
141
142 (p, waker, io_blocked)
143 }
144
145 thread_local! {
146 // Cached parker and waker for efficiency.
147 static CACHE: RefCell<(Parker, Waker, Arc<AtomicBool>)> = RefCell::new(parker_and_waker());
148
149 // Indicates that the current thread is polling I/O, but not necessarily blocked on it.
150 static IO_POLLING: Cell<bool> = const { Cell::new(false) };
151 }
152
153 struct BlockOnWaker {
154 io_blocked: Arc<AtomicBool>,
155 unparker: parking::Unparker,
156 }
157
158 impl BlockOnWaker {
159 fn create(io_blocked: Arc<AtomicBool>, unparker: parking::Unparker) -> Waker {
160 Waker::from(Arc::new(BlockOnWaker {
161 io_blocked,
162 unparker,
163 }))
164 }
165 }
166
167 impl std::task::Wake for BlockOnWaker {
168 fn wake_by_ref(self: &Arc<Self>) {
169 if self.unparker.unpark() {
170 // Check if waking from another thread and if currently blocked on I/O.
171 if !IO_POLLING.with(Cell::get) && self.io_blocked.load(Ordering::SeqCst) {
172 Reactor::get().notify();
173 }
174 }
175 }
176
177 fn wake(self: Arc<Self>) {
178 self.wake_by_ref()
179 }
180 }
181
182 CACHE.with(|cache| {
183 // Try grabbing the cached parker and waker.
184 let tmp_cached;
185 let tmp_fresh;
186 let (p, waker, io_blocked) = match cache.try_borrow_mut() {
187 Ok(cache) => {
188 // Use the cached parker and waker.
189 tmp_cached = cache;
190 &*tmp_cached
191 }
192 Err(_) => {
193 // Looks like this is a recursive `block_on()` call.
194 // Create a fresh parker and waker.
195 tmp_fresh = parker_and_waker();
196 &tmp_fresh
197 }
198 };
199
200 pin!(future);
201
202 let cx = &mut Context::from_waker(waker);
203
204 loop {
205 // Poll the future.
206 if let Poll::Ready(t) = future.as_mut().poll(cx) {
207 // Ensure the cached parker is reset to the unnotified state for future block_on calls,
208 // in case this future called wake and then immediately returned Poll::Ready.
209 p.park_timeout(Duration::from_secs(0));
210 #[cfg(feature = "tracing")]
211 tracing::trace!("completed");
212 return t;
213 }
214
215 // Check if a notification was received.
216 if p.park_timeout(Duration::from_secs(0)) {
217 #[cfg(feature = "tracing")]
218 tracing::trace!("notified");
219
220 // Try grabbing a lock on the reactor to process I/O events.
221 if let Some(mut reactor_lock) = Reactor::get().try_lock() {
222 // First let wakers know this parker is processing I/O events.
223 IO_POLLING.with(|io| io.set(true));
224 let _guard = CallOnDrop(|| {
225 IO_POLLING.with(|io| io.set(false));
226 });
227
228 // Process available I/O events.
229 reactor_lock.react(Some(Duration::from_secs(0))).ok();
230 }
231 continue;
232 }
233
234 // Try grabbing a lock on the reactor to wait on I/O.
235 if let Some(mut reactor_lock) = Reactor::get().try_lock() {
236 // Record the instant at which the lock was grabbed.
237 let start = Instant::now();
238
239 loop {
240 // First let wakers know this parker is blocked on I/O.
241 IO_POLLING.with(|io| io.set(true));
242 io_blocked.store(true, Ordering::SeqCst);
243 let _guard = CallOnDrop(|| {
244 IO_POLLING.with(|io| io.set(false));
245 io_blocked.store(false, Ordering::SeqCst);
246 });
247
248 // Check if a notification has been received before `io_blocked` was updated
249 // because in that case the reactor won't receive a wakeup.
250 if p.park_timeout(Duration::from_secs(0)) {
251 #[cfg(feature = "tracing")]
252 tracing::trace!("notified");
253 break;
254 }
255
256 // Wait for I/O events.
257 #[cfg(feature = "tracing")]
258 tracing::trace!("waiting on I/O");
259 reactor_lock.react(None).ok();
260
261 // Check if a notification has been received.
262 if p.park_timeout(Duration::from_secs(0)) {
263 #[cfg(feature = "tracing")]
264 tracing::trace!("notified");
265 break;
266 }
267
268 // Check if this thread been handling I/O events for a long time.
269 if start.elapsed() > Duration::from_micros(500) {
270 #[cfg(feature = "tracing")]
271 tracing::trace!("stops hogging the reactor");
272
273 // This thread is clearly processing I/O events for some other threads
274 // because it didn't get a notification yet. It's best to stop hogging the
275 // reactor and give other threads a chance to process I/O events for
276 // themselves.
277 drop(reactor_lock);
278
279 // Unpark the "async-io" thread in case no other thread is ready to start
280 // processing I/O events. This way we prevent a potential latency spike.
281 unparker().unpark();
282
283 // Wait for a notification.
284 p.park();
285 break;
286 }
287 }
288 } else {
289 // Wait for an actual notification.
290 #[cfg(feature = "tracing")]
291 tracing::trace!("sleep until notification");
292 p.park();
293 }
294 }
295 })
296}
297
298/// Runs a closure when dropped.
299struct CallOnDrop<F: Fn()>(F);
300
301impl<F: Fn()> Drop for CallOnDrop<F> {
302 fn drop(&mut self) {
303 (self.0)();
304 }
305}