async_io/driver.rs

use std::cell::{Cell, RefCell};
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Waker;
use std::task::{Context, Poll};
use std::thread;
use std::time::{Duration, Instant};

use async_lock::OnceCell;
use futures_lite::pin;
use parking::Parker;

use crate::reactor::Reactor;

/// Number of currently active `block_on()` invocations.
static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Unparker for the "async-io" thread.
fn unparker() -> &'static parking::Unparker {
    static UNPARKER: OnceCell<parking::Unparker> = OnceCell::new();

    UNPARKER.get_or_init_blocking(|| {
        let (parker, unparker) = parking::pair();

        // Spawn a helper thread driving the reactor.
        //
        // Note that this thread is not strictly necessary; it's only here to help push things
        // forward if there are no `Parker`s around or if `Parker`s are just idling and never
        // parking.
        thread::Builder::new()
            .name("async-io".to_string())
            .spawn(move || main_loop(parker))
            .expect("cannot spawn async-io thread");

        unparker
    })
}

/// Initializes the "async-io" thread.
pub(crate) fn init() {
    let _ = unparker();
}

/// The main loop for the "async-io" thread.
fn main_loop(parker: parking::Parker) {
    #[cfg(feature = "tracing")]
    let span = tracing::trace_span!("async_io::main_loop");
    #[cfg(feature = "tracing")]
    let _enter = span.enter();

    // The last observed reactor tick.
    let mut last_tick = 0;
    // Number of sleeps since this thread has called `react()`.
    let mut sleeps = 0u64;

    loop {
        let tick = Reactor::get().ticker();

        if last_tick == tick {
            let reactor_lock = if sleeps >= 10 {
                // If no new ticks have occurred for a while, stop sleeping and spinning in
                // this loop and just block on the reactor lock.
                Some(Reactor::get().lock())
            } else {
                Reactor::get().try_lock()
            };

            if let Some(mut reactor_lock) = reactor_lock {
                #[cfg(feature = "tracing")]
                tracing::trace!("waiting on I/O");
                reactor_lock.react(None).ok();
                last_tick = Reactor::get().ticker();
                sleeps = 0;
            }
        } else {
            last_tick = tick;
        }

        if BLOCK_ON_COUNT.load(Ordering::SeqCst) > 0 {
            // Exponential backoff from 50us to 10ms.
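            // Consecutive idle sleeps wait 50us, 75us, ..., 5ms, then 10ms from there on;
            // once `sleeps` reaches 10, the branch above blocks on the reactor lock instead
            // of backing off further.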
            let delay_us = [50, 75, 100, 250, 500, 750, 1000, 2500, 5000]
                .get(sleeps as usize)
                .unwrap_or(&10_000);

            #[cfg(feature = "tracing")]
            tracing::trace!("sleeping for {} us", delay_us);
            if parker.park_timeout(Duration::from_micros(*delay_us)) {
                #[cfg(feature = "tracing")]
                tracing::trace!("notified");

                // If notified before timeout, reset the last tick and the sleep counter.
                last_tick = Reactor::get().ticker();
                sleeps = 0;
            } else {
                sleeps += 1;
            }
        }
    }
}

/// Blocks the current thread on a future, processing I/O events when idle.
///
/// # Examples
///
/// ```
/// use async_io::Timer;
/// use std::time::Duration;
///
/// async_io::block_on(async {
///     // This timer will likely be processed by the current
///     // thread rather than the fallback "async-io" thread.
///     Timer::after(Duration::from_millis(1)).await;
/// });
/// ```
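///
/// Nested calls are supported as well; an inner `block_on()` simply creates a fresh
/// parker and waker instead of reusing the thread-local cache, as sketched below:
///
/// ```
/// use async_io::Timer;
/// use std::time::Duration;
///
/// async_io::block_on(async {
///     // A `block_on()` call inside a future driven by an outer `block_on()` works too.
///     async_io::block_on(async {
///         Timer::after(Duration::from_millis(1)).await;
///     });
/// });
/// ```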
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
    #[cfg(feature = "tracing")]
    let span = tracing::trace_span!("async_io::block_on");
    #[cfg(feature = "tracing")]
    let _enter = span.enter();

    // Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
    BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);

    // Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
    let _guard = CallOnDrop(|| {
        BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
        unparker().unpark();
    });

    // Creates a parker and an associated waker that unparks it.
    fn parker_and_waker() -> (Parker, Waker, Arc<AtomicBool>) {
        // Parker and unparker for notifying the current thread.
        let (p, u) = parking::pair();

        // This boolean is set to `true` when the current thread is blocked on I/O.
        let io_blocked = Arc::new(AtomicBool::new(false));

        // Prepare the waker.
        let waker = BlockOnWaker::create(io_blocked.clone(), u);

        (p, waker, io_blocked)
    }

    thread_local! {
        // Cached parker and waker for efficiency.
        static CACHE: RefCell<(Parker, Waker, Arc<AtomicBool>)> = RefCell::new(parker_and_waker());

        // Indicates that the current thread is polling I/O, but not necessarily blocked on it.
        static IO_POLLING: Cell<bool> = const { Cell::new(false) };
    }

    struct BlockOnWaker {
        io_blocked: Arc<AtomicBool>,
        unparker: parking::Unparker,
    }

    impl BlockOnWaker {
        fn create(io_blocked: Arc<AtomicBool>, unparker: parking::Unparker) -> Waker {
            Waker::from(Arc::new(BlockOnWaker {
                io_blocked,
                unparker,
            }))
        }
    }

    impl std::task::Wake for BlockOnWaker {
        fn wake_by_ref(self: &Arc<Self>) {
            if self.unparker.unpark() {
                // Check if waking from another thread and if currently blocked on I/O.
                if !IO_POLLING.with(Cell::get) && self.io_blocked.load(Ordering::SeqCst) {
                    Reactor::get().notify();
                }
            }
        }

        fn wake(self: Arc<Self>) {
            self.wake_by_ref()
        }
    }

    CACHE.with(|cache| {
        // Try grabbing the cached parker and waker.
        let tmp_cached;
        let tmp_fresh;
        let (p, waker, io_blocked) = match cache.try_borrow_mut() {
            Ok(cache) => {
                // Use the cached parker and waker.
                tmp_cached = cache;
                &*tmp_cached
            }
            Err(_) => {
                // Looks like this is a recursive `block_on()` call.
                // Create a fresh parker and waker.
                tmp_fresh = parker_and_waker();
                &tmp_fresh
            }
        };

        pin!(future);

        let cx = &mut Context::from_waker(waker);

        loop {
            // Poll the future.
            if let Poll::Ready(t) = future.as_mut().poll(cx) {
                // Ensure the cached parker is reset to the unnotified state for future `block_on()`
                // calls, in case this future called wake and then immediately returned `Poll::Ready`.
                p.park_timeout(Duration::from_secs(0));
                #[cfg(feature = "tracing")]
                tracing::trace!("completed");
                return t;
            }

            // Check if a notification was received.
            if p.park_timeout(Duration::from_secs(0)) {
                #[cfg(feature = "tracing")]
                tracing::trace!("notified");

                // Try grabbing a lock on the reactor to process I/O events.
                if let Some(mut reactor_lock) = Reactor::get().try_lock() {
                    // First let wakers know this parker is processing I/O events.
                    IO_POLLING.with(|io| io.set(true));
                    let _guard = CallOnDrop(|| {
                        IO_POLLING.with(|io| io.set(false));
                    });

                    // Process available I/O events.
                    reactor_lock.react(Some(Duration::from_secs(0))).ok();
                }
                continue;
            }

            // Try grabbing a lock on the reactor to wait on I/O.
            if let Some(mut reactor_lock) = Reactor::get().try_lock() {
                // Record the instant at which the lock was grabbed.
                let start = Instant::now();

                loop {
                    // First let wakers know this parker is blocked on I/O.
                    IO_POLLING.with(|io| io.set(true));
                    io_blocked.store(true, Ordering::SeqCst);
                    let _guard = CallOnDrop(|| {
                        IO_POLLING.with(|io| io.set(false));
                        io_blocked.store(false, Ordering::SeqCst);
                    });

                    // Check if a notification was received before `io_blocked` was updated,
                    // because in that case the reactor won't receive a wakeup for it.
                    if p.park_timeout(Duration::from_secs(0)) {
                        #[cfg(feature = "tracing")]
                        tracing::trace!("notified");
                        break;
                    }

                    // Wait for I/O events.
                    #[cfg(feature = "tracing")]
                    tracing::trace!("waiting on I/O");
                    reactor_lock.react(None).ok();

                    // Check if a notification has been received.
                    if p.park_timeout(Duration::from_secs(0)) {
                        #[cfg(feature = "tracing")]
                        tracing::trace!("notified");
                        break;
                    }

                    // Check if this thread has been handling I/O events for a long time.
                    if start.elapsed() > Duration::from_micros(500) {
                        #[cfg(feature = "tracing")]
                        tracing::trace!("stops hogging the reactor");

                        // This thread is clearly processing I/O events for some other threads
                        // because it didn't get a notification yet. It's best to stop hogging the
                        // reactor and give other threads a chance to process I/O events for
                        // themselves.
                        drop(reactor_lock);

                        // Unpark the "async-io" thread in case no other thread is ready to start
                        // processing I/O events. This way we prevent a potential latency spike.
                        unparker().unpark();

                        // Wait for a notification.
                        p.park();
                        break;
                    }
                }
            } else {
                // Wait for an actual notification.
                #[cfg(feature = "tracing")]
                tracing::trace!("sleep until notification");
                p.park();
            }
        }
    })
}

/// Runs a closure when dropped.
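///
/// `block_on()` uses this guard to reset `IO_POLLING`, `io_blocked`, and `BLOCK_ON_COUNT`
/// even if the enclosing scope returns early or unwinds, e.g.
/// `let _guard = CallOnDrop(|| IO_POLLING.with(|io| io.set(false)));`.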
struct CallOnDrop<F: Fn()>(F);

impl<F: Fn()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        (self.0)();
    }
}