async_io/driver.rs

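//! The reactor driver: the fallback "async-io" thread and `block_on()`.
//!
//! The "async-io" thread drives the reactor when nothing else is doing so, while
//! [`block_on()`] polls a future on the current thread and processes I/O events
//! whenever the future is pending.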
use std::cell::{Cell, RefCell};
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

use parking::Parker;

use crate::reactor::Reactor;

/// Number of currently active `block_on()` invocations.
static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Unparker for the "async-io" thread.
fn unparker() -> &'static parking::Unparker {
    static UNPARKER: OnceLock<parking::Unparker> = OnceLock::new();

    UNPARKER.get_or_init(|| {
        let (parker, unparker) = parking::pair();

        // Spawn a helper thread driving the reactor.
        //
        // Note that this thread is not strictly necessary; it only exists to help push
        // things forward when there are no `Parker`s around or when `Parker`s are just
        // idling and never parking.
        thread::Builder::new()
            .name("async-io".to_string())
            .spawn(move || main_loop(parker))
            .expect("cannot spawn async-io thread");

        unparker
    })
}

/// Initializes the "async-io" thread.
pub(crate) fn init() {
    let _ = unparker();
}

/// The main loop for the "async-io" thread.
fn main_loop(parker: parking::Parker) {
    #[cfg(feature = "tracing")]
    let span = tracing::trace_span!("async_io::main_loop");
    #[cfg(feature = "tracing")]
    let _enter = span.enter();

    // The last observed reactor tick.
    let mut last_tick = 0;
    // Number of sleeps since this thread has called `react()`.
    let mut sleeps = 0u64;

    loop {
        let tick = Reactor::get().ticker();

        if last_tick == tick {
            let reactor_lock = if sleeps >= 10 {
                // If no new ticks have occurred for a while, stop sleeping and spinning in
                // this loop and just block on the reactor lock.
                Some(Reactor::get().lock())
            } else {
                Reactor::get().try_lock()
            };

            if let Some(mut reactor_lock) = reactor_lock {
                #[cfg(feature = "tracing")]
                tracing::trace!("waiting on I/O");
                reactor_lock.react(None).ok();
                last_tick = Reactor::get().ticker();
                sleeps = 0;
            }
        } else {
            last_tick = tick;
        }

        if BLOCK_ON_COUNT.load(Ordering::SeqCst) > 0 {
            // At least one `block_on()` call is currently active, so that thread will
            // process I/O events itself; be less aggressive here and just park between
            // attempts with an increasing timeout.
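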
            // Exponential backoff from 50us to 10ms.
            let delay_us = [50, 75, 100, 250, 500, 750, 1000, 2500, 5000]
                .get(sleeps as usize)
                .unwrap_or(&10_000);

            #[cfg(feature = "tracing")]
            tracing::trace!("sleeping for {} us", delay_us);
            if parker.park_timeout(Duration::from_micros(*delay_us)) {
                #[cfg(feature = "tracing")]
                tracing::trace!("notified");

                // If notified before timeout, reset the last tick and the sleep counter.
                last_tick = Reactor::get().ticker();
                sleeps = 0;
            } else {
                sleeps += 1;
            }
        }
    }
}

/// Blocks the current thread on a future, processing I/O events when idle.
///
/// # Examples
///
/// ```
/// use async_io::Timer;
/// use std::time::Duration;
///
/// async_io::block_on(async {
///     // This timer will likely be processed by the current
///     // thread rather than the fallback "async-io" thread.
///     Timer::after(Duration::from_millis(1)).await;
/// });
/// ```
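///
/// `block_on()` calls can also be nested: a recursive call notices that the thread-local
/// cache below is already borrowed and falls back to a freshly created parker and waker.
///
/// ```
/// let sum = async_io::block_on(async {
///     // An inner `block_on()` on the same thread.
///     async_io::block_on(async { 1 + 2 })
/// });
/// assert_eq!(sum, 3);
/// ```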
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
    #[cfg(feature = "tracing")]
    let span = tracing::trace_span!("async_io::block_on");
    #[cfg(feature = "tracing")]
    let _enter = span.enter();

    // Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
    BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);

    // Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
    let _guard = CallOnDrop(|| {
        BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
        unparker().unpark();
    });

    // Creates a parker and an associated waker that unparks it.
    fn parker_and_waker() -> (Parker, Waker, Arc<AtomicBool>) {
        // Parker and unparker for notifying the current thread.
        let (p, u) = parking::pair();

        // This boolean is set to `true` when the current thread is blocked on I/O.
        let io_blocked = Arc::new(AtomicBool::new(false));

        // Prepare the waker.
        let waker = BlockOnWaker::create(io_blocked.clone(), u);

        (p, waker, io_blocked)
    }

    thread_local! {
        // Cached parker and waker for efficiency.
        static CACHE: RefCell<(Parker, Waker, Arc<AtomicBool>)> = RefCell::new(parker_and_waker());

        // Indicates that the current thread is polling I/O, but not necessarily blocked on it.
        static IO_POLLING: Cell<bool> = const { Cell::new(false) };
    }

    struct BlockOnWaker {
        io_blocked: Arc<AtomicBool>,
        unparker: parking::Unparker,
    }

    impl BlockOnWaker {
        fn create(io_blocked: Arc<AtomicBool>, unparker: parking::Unparker) -> Waker {
            Waker::from(Arc::new(BlockOnWaker {
                io_blocked,
                unparker,
            }))
        }
    }

    impl std::task::Wake for BlockOnWaker {
        fn wake_by_ref(self: &Arc<Self>) {
            if self.unparker.unpark() {
                // Check if waking from another thread and if currently blocked on I/O.
                if !IO_POLLING.with(Cell::get) && self.io_blocked.load(Ordering::SeqCst) {
                    Reactor::get().notify();
                }
            }
        }

        fn wake(self: Arc<Self>) {
            self.wake_by_ref()
        }
    }

    CACHE.with(|cache| {
        // Try grabbing the cached parker and waker.
        let tmp_cached;
        let tmp_fresh;
        let (p, waker, io_blocked) = match cache.try_borrow_mut() {
            Ok(cache) => {
                // Use the cached parker and waker.
                tmp_cached = cache;
                &*tmp_cached
            }
            Err(_) => {
                // Looks like this is a recursive `block_on()` call.
                // Create a fresh parker and waker.
                tmp_fresh = parker_and_waker();
                &tmp_fresh
            }
        };

        let mut future = pin!(future);

        let cx = &mut Context::from_waker(waker);

        loop {
            // Poll the future.
            if let Poll::Ready(t) = future.as_mut().poll(cx) {
                // Ensure the cached parker is reset to the unnotified state for future block_on calls,
                // in case this future called wake and then immediately returned Poll::Ready.
                p.park_timeout(Duration::from_secs(0));
                #[cfg(feature = "tracing")]
                tracing::trace!("completed");
                return t;
            }

            // Check if a notification was received.
            if p.park_timeout(Duration::from_secs(0)) {
                #[cfg(feature = "tracing")]
                tracing::trace!("notified");

                // Try grabbing a lock on the reactor to process I/O events.
                if let Some(mut reactor_lock) = Reactor::get().try_lock() {
                    // First let wakers know this parker is processing I/O events.
                    IO_POLLING.with(|io| io.set(true));
                    let _guard = CallOnDrop(|| {
                        IO_POLLING.with(|io| io.set(false));
                    });

                    // Process available I/O events.
                    reactor_lock.react(Some(Duration::from_secs(0))).ok();
                }
                continue;
            }

            // Try grabbing a lock on the reactor to wait on I/O.
            if let Some(mut reactor_lock) = Reactor::get().try_lock() {
                // Record the instant at which the lock was grabbed.
                let start = Instant::now();

                loop {
                    // First let wakers know this parker is blocked on I/O.
                    IO_POLLING.with(|io| io.set(true));
                    io_blocked.store(true, Ordering::SeqCst);
                    let _guard = CallOnDrop(|| {
                        IO_POLLING.with(|io| io.set(false));
                        io_blocked.store(false, Ordering::SeqCst);
                    });

                    // Check if a notification has been received before `io_blocked` was updated,
                    // because in that case the reactor won't receive a wakeup.
                    if p.park_timeout(Duration::from_secs(0)) {
                        #[cfg(feature = "tracing")]
                        tracing::trace!("notified");
                        break;
                    }

                    // Wait for I/O events.
                    #[cfg(feature = "tracing")]
                    tracing::trace!("waiting on I/O");
                    reactor_lock.react(None).ok();

                    // Check if a notification has been received.
                    if p.park_timeout(Duration::from_secs(0)) {
                        #[cfg(feature = "tracing")]
                        tracing::trace!("notified");
                        break;
                    }

                    // Check if this thread has been handling I/O events for a long time.
                    if start.elapsed() > Duration::from_micros(500) {
                        #[cfg(feature = "tracing")]
                        tracing::trace!("stops hogging the reactor");

                        // This thread is clearly processing I/O events for some other threads
                        // because it didn't get a notification yet. It's best to stop hogging the
                        // reactor and give other threads a chance to process I/O events for
                        // themselves.
                        drop(reactor_lock);

                        // Unpark the "async-io" thread in case no other thread is ready to start
                        // processing I/O events. This way we prevent a potential latency spike.
                        unparker().unpark();

                        // Wait for a notification.
                        p.park();
                        break;
                    }
                }
            } else {
                // Wait for an actual notification.
                #[cfg(feature = "tracing")]
                tracing::trace!("sleep until notification");
                p.park();
            }
        }
    })
}

/// Runs a closure when dropped.
struct CallOnDrop<F: Fn()>(F);

impl<F: Fn()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        (self.0)();
    }
}