async_io/reactor.rs

use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::panic;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

use async_lock::OnceCell;
use concurrent_queue::ConcurrentQueue;
use futures_lite::ready;
use polling::{Event, Events, Poller};
use slab::Slab;

// Choose the proper implementation of `Registration` based on the target platform.
cfg_if::cfg_if! {
    if #[cfg(windows)] {
        mod windows;
        pub use windows::Registration;
    } else if #[cfg(any(
        target_vendor = "apple",
        target_os = "freebsd",
        target_os = "netbsd",
        target_os = "openbsd",
        target_os = "dragonfly",
    ))] {
        mod kqueue;
        pub use kqueue::Registration;
    } else if #[cfg(unix)] {
        mod unix;
        pub use unix::Registration;
    } else {
        compile_error!("unsupported platform");
    }
}
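
// Whichever branch is selected, the rest of this file only relies on a small `Registration`
// surface: `add(&Poller, key)`, `modify(&Poller, Event)`, and `delete(&Poller)`.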

#[cfg(not(target_os = "espidf"))]
const TIMER_QUEUE_SIZE: usize = 1000;

/// ESP-IDF, being an embedded OS, does not need as many timers, and the smaller queue
/// saves ~20 KB of RAM, which is a lot for an MCU with less than 400 KB of RAM.
#[cfg(target_os = "espidf")]
const TIMER_QUEUE_SIZE: usize = 100;

const READ: usize = 0;
const WRITE: usize = 1;

/// The reactor.
///
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
pub(crate) struct Reactor {
    /// Portable bindings to epoll/kqueue/event ports/IOCP.
    ///
    /// This is where I/O is polled, producing I/O events.
    pub(crate) poller: Poller,

    /// Ticker bumped before polling.
    ///
    /// This is useful for checking what the current "round" of `ReactorLock::react()` is when
    /// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those
    /// methods must make sure they don't receive stale I/O events - they only accept events from a
    /// fresh "round" of `ReactorLock::react()`.
    ticker: AtomicUsize,

    /// Registered sources.
    sources: Mutex<Slab<Arc<Source>>>,

    /// Temporary storage for I/O events when polling the reactor.
    ///
    /// Holding a lock on this event list implies the exclusive right to poll I/O.
    events: Mutex<Events>,

    /// An ordered map of registered timers.
    ///
    /// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to
    /// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the
    /// timer.
    timers: Mutex<BTreeMap<(Instant, usize), Waker>>,

    /// A queue of timer operations (insert and remove).
    ///
    /// When inserting or removing a timer, we don't process it immediately - we just push it into
    /// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
    timer_ops: ConcurrentQueue<TimerOp>,
}
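
// How the pieces fit together (informal sketch): `Async<T>` handles register their I/O objects
// through `insert_io()`, timer futures register through `insert_timer()`, and some thread (the
// crate's driver, or a caller driving the reactor via `block_on`) repeatedly acquires the
// reactor lock and calls `ReactorLock::react()` to poll I/O and fire due timers.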

impl Reactor {
    /// Returns a reference to the reactor.
    pub(crate) fn get() -> &'static Reactor {
        static REACTOR: OnceCell<Reactor> = OnceCell::new();

        REACTOR.get_or_init_blocking(|| {
            crate::driver::init();
            Reactor {
                poller: Poller::new().expect("cannot initialize I/O event notification"),
                ticker: AtomicUsize::new(0),
                sources: Mutex::new(Slab::new()),
                events: Mutex::new(Events::new()),
                timers: Mutex::new(BTreeMap::new()),
                timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE),
            }
        })
    }
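
    // Hypothetical call site, for orientation only: the rest of the crate reaches the reactor
    // exclusively through this accessor, e.g.
    //
    //     let reactor = Reactor::get();
    //     let id = reactor.insert_timer(when, cx.waker());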

    /// Returns the current ticker.
    pub(crate) fn ticker(&self) -> usize {
        self.ticker.load(Ordering::SeqCst)
    }

    /// Registers an I/O source in the reactor.
    pub(crate) fn insert_io(&self, raw: Registration) -> io::Result<Arc<Source>> {
        // Create an I/O source for this file descriptor.
        let source = {
            let mut sources = self.sources.lock().unwrap();
            let key = sources.vacant_entry().key();
            let source = Arc::new(Source {
                registration: raw,
                key,
                state: Default::default(),
            });
            sources.insert(source.clone());
            source
        };

        // Register the file descriptor.
        if let Err(err) = source.registration.add(&self.poller, source.key) {
            let mut sources = self.sources.lock().unwrap();
            sources.remove(source.key);
            return Err(err);
        }

        Ok(source)
    }
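
    // Sketch of the expected call pattern (hedged; the real caller is `Async::new`, which also
    // configures non-blocking mode, and the `Registration` constructor differs per platform
    // module):
    //
    //     let registration = /* platform-specific Registration for the fd/socket/handle */;
    //     let source = Reactor::get().insert_io(registration)?;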

    /// Deregisters an I/O source from the reactor.
    pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
        let mut sources = self.sources.lock().unwrap();
        sources.remove(source.key);
        source.registration.delete(&self.poller)
    }

    /// Registers a timer in the reactor.
    ///
    /// Returns the inserted timer's ID.
    pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
        // Generate a new timer ID.
        static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
        let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);

        // Push an insert operation.
        while self
            .timer_ops
            .push(TimerOp::Insert(when, id, waker.clone()))
            .is_err()
        {
            // If the queue is full, drain it and try again.
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }

        // Notify that a timer has been inserted.
        self.notify();

        id
    }

    /// Deregisters a timer from the reactor.
    pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
        // Push a remove operation.
        while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
            // If the queue is full, drain it and try again.
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }
    }
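
    // Rough usage pattern (hedged; the actual timer future lives in another module): a timer
    // calls `insert_timer(deadline, waker)` when first polled, keeps the returned ID, and calls
    // `remove_timer(deadline, id)` when dropped or rescheduled so the reactor does not hold on
    // to a stale waker.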

    /// Notifies the thread blocked on the reactor.
    pub(crate) fn notify(&self) {
        self.poller.notify().expect("failed to notify reactor");
    }

    /// Locks the reactor, potentially blocking if the lock is held by another thread.
    pub(crate) fn lock(&self) -> ReactorLock<'_> {
        let reactor = self;
        let events = self.events.lock().unwrap();
        ReactorLock { reactor, events }
    }

    /// Attempts to lock the reactor.
    pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> {
        self.events.try_lock().ok().map(|events| {
            let reactor = self;
            ReactorLock { reactor, events }
        })
    }

    /// Processes ready timers and extends the list of wakers to wake.
    ///
    /// Returns the duration until the next pending timer (zero if a timer was already due), or
    /// `None` if there are no registered timers.
    fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
        #[cfg(feature = "tracing")]
        let span = tracing::trace_span!("process_timers");
        #[cfg(feature = "tracing")]
        let _enter = span.enter();

        let mut timers = self.timers.lock().unwrap();
        self.process_timer_ops(&mut timers);

        let now = Instant::now();

        // Split timers into ready and pending timers.
        //
        // Careful to split just *after* `now`, so that a timer set for exactly `now` is considered
        // ready.
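        //
        // For example, if `timers` holds the keys `(now, 7)` and `(now + 5ms, 3)`, splitting at
        // `(now + 1ns, 0)` puts `(now, 7)` into `ready` and keeps `(now + 5ms, 3)` pending.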
        let pending = timers.split_off(&(now + Duration::from_nanos(1), 0));
        let ready = mem::replace(&mut *timers, pending);

        // Calculate the duration until the next event.
        let dur = if ready.is_empty() {
            // Duration until the next timer.
            timers
                .keys()
                .next()
                .map(|(when, _)| when.saturating_duration_since(now))
        } else {
            // Timers are about to fire right now.
            Some(Duration::from_secs(0))
        };

        // Drop the lock before waking.
        drop(timers);

        // Add wakers to the list.
        #[cfg(feature = "tracing")]
        tracing::trace!("{} ready wakers", ready.len());

        for (_, waker) in ready {
            wakers.push(waker);
        }

        dur
    }

    /// Processes queued timer operations.
    fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
        // Process only as many operations as fit into the queue, or else this loop could in
        // theory run forever.
        self.timer_ops
            .try_iter()
            .take(self.timer_ops.capacity().unwrap())
            .for_each(|op| match op {
                TimerOp::Insert(when, id, waker) => {
                    timers.insert((when, id), waker);
                }
                TimerOp::Remove(when, id) => {
                    timers.remove(&(when, id));
                }
            });
    }
}

/// A lock on the reactor.
pub(crate) struct ReactorLock<'a> {
    reactor: &'a Reactor,
    events: MutexGuard<'a, Events>,
}

impl ReactorLock<'_> {
    /// Processes new events, blocking until the first event or the timeout.
    pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        #[cfg(feature = "tracing")]
        let span = tracing::trace_span!("react");
        #[cfg(feature = "tracing")]
        let _enter = span.enter();

        let mut wakers = Vec::new();

        // Process ready timers.
        let next_timer = self.reactor.process_timers(&mut wakers);

        // Compute the timeout for blocking on I/O events.
        let timeout = match (next_timer, timeout) {
            (None, None) => None,
            (Some(t), None) | (None, Some(t)) => Some(t),
            (Some(a), Some(b)) => Some(a.min(b)),
        };

        // Bump the ticker before polling I/O.
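        //
        // `fetch_add` returns the previous value, so adding one (with wrapping arithmetic,
        // matching the atomic's own wrap-around) yields the tick of the round that is about to
        // run; sources that receive events below are stamped with this tick.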
        let tick = self
            .reactor
            .ticker
            .fetch_add(1, Ordering::SeqCst)
            .wrapping_add(1);

        self.events.clear();

        // Block on I/O events.
        let res = match self.reactor.poller.wait(&mut self.events, timeout) {
            // No I/O events occurred.
            Ok(0) => {
                if timeout != Some(Duration::from_secs(0)) {
                    // The non-zero timeout was hit, so fire ready timers.
                    self.reactor.process_timers(&mut wakers);
                }
                Ok(())
            }

            // At least one I/O event occurred.
            Ok(_) => {
                // Iterate over sources in the event list.
                let sources = self.reactor.sources.lock().unwrap();

                for ev in self.events.iter() {
                    // Check if there is a source in the table with this key.
                    if let Some(source) = sources.get(ev.key) {
                        let mut state = source.state.lock().unwrap();

                        // Collect wakers if any event was emitted.
                        for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
                            if emitted {
                                state[dir].tick = tick;
                                state[dir].drain_into(&mut wakers);
                            }
                        }

                        // Re-register if there are still writers or readers. This can happen if
                        // e.g. we were previously interested in both readability and writability,
                        // but only one of them was emitted.
                        if !state[READ].is_empty() || !state[WRITE].is_empty() {
                            // Create the event that we are interested in.
                            let event = {
                                let mut event = Event::none(source.key);
                                event.readable = !state[READ].is_empty();
                                event.writable = !state[WRITE].is_empty();
                                event
                            };

                            // Register interest in this event.
                            source.registration.modify(&self.reactor.poller, event)?;
                        }
                    }
                }

                Ok(())
            }

            // The syscall was interrupted.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),

            // An actual error occurred.
            Err(err) => Err(err),
        };

        // Wake up ready tasks.
        #[cfg(feature = "tracing")]
        tracing::trace!("{} ready wakers", wakers.len());
        for waker in wakers {
            // Don't let a panicking waker blow everything up.
            panic::catch_unwind(|| waker.wake()).ok();
        }

        res
    }
}
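
// Informal sketch of how the reactor is driven (the real loop lives in `crate::driver` and in
// `block_on`, and is more careful about parking and contention):
//
//     loop {
//         let mut lock = Reactor::get().lock();
//         let _ = lock.react(None);
//     }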

/// A single timer operation.
enum TimerOp {
    Insert(Instant, usize, Waker),
    Remove(Instant, usize),
}

/// A registered source of I/O events.
#[derive(Debug)]
pub(crate) struct Source {
    /// This source's registration into the reactor.
    registration: Registration,

    /// The key of this source obtained during registration.
    key: usize,

    /// Inner state with registered wakers, indexed by `READ` and `WRITE`.
    state: Mutex<[Direction; 2]>,
}

/// A read or write direction.
#[derive(Debug, Default)]
struct Direction {
    /// Last reactor tick that delivered an event.
    tick: usize,

    /// Ticks remembered by `Async::poll_readable()` or `Async::poll_writable()`.
    ticks: Option<(usize, usize)>,

    /// Waker stored by `Async::poll_readable()` or `Async::poll_writable()`.
    waker: Option<Waker>,

    /// Wakers of tasks waiting for the next event.
    ///
    /// Registered by `Async::readable()` and `Async::writable()`.
    wakers: Slab<Option<Waker>>,
}

impl Direction {
    /// Returns `true` if there are no wakers interested in this direction.
    fn is_empty(&self) -> bool {
        self.waker.is_none() && self.wakers.iter().all(|(_, opt)| opt.is_none())
    }

    /// Moves all wakers into a `Vec`.
    fn drain_into(&mut self, dst: &mut Vec<Waker>) {
        if let Some(w) = self.waker.take() {
            dst.push(w);
        }
        for (_, opt) in self.wakers.iter_mut() {
            if let Some(w) = opt.take() {
                dst.push(w);
            }
        }
    }
}

impl Source {
    /// Polls the I/O source for readability.
    pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(READ, cx)
    }

    /// Polls the I/O source for writability.
    pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(WRITE, cx)
    }

    /// Registers a waker from `poll_readable()` or `poll_writable()`.
    ///
    /// If a different waker is already registered, it gets replaced and woken.
    fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut state = self.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = state[dir].ticks {
            // If `state[dir].tick` has changed to a value other than either of the remembered
            // ticks, a newer round of `ReactorLock::react()` has delivered an event.
            if state[dir].tick != a && state[dir].tick != b {
                state[dir].ticks = None;
                return Poll::Ready(Ok(()));
            }
        }

        let was_empty = state[dir].is_empty();

        // Register the current task's waker.
        if let Some(w) = state[dir].waker.take() {
            if w.will_wake(cx.waker()) {
                state[dir].waker = Some(w);
                return Poll::Pending;
            }
            // Wake the previous waker because it's going to get replaced.
            panic::catch_unwind(|| w.wake()).ok();
        }
        state[dir].waker = Some(cx.waker().clone());
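        // Remember both the current reactor tick and the tick of the last delivered event;
        // only a delivery newer than both (see the check above) counts as fresh readiness.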
        state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));

        // Update interest in this I/O handle.
        if was_empty {
            // Create the event that we are interested in.
            let event = {
                let mut event = Event::none(self.key);
                event.readable = !state[READ].is_empty();
                event.writable = !state[WRITE].is_empty();
                event
            };

            // Register interest in it.
            self.registration.modify(&Reactor::get().poller, event)?;
        }

        Poll::Pending
    }

    /// Waits until the I/O source is readable.
    pub(crate) fn readable<T>(handle: &crate::Async<T>) -> Readable<'_, T> {
        Readable(Self::ready(handle, READ))
    }

    /// Waits until the I/O source is readable.
    pub(crate) fn readable_owned<T>(handle: Arc<crate::Async<T>>) -> ReadableOwned<T> {
        ReadableOwned(Self::ready(handle, READ))
    }

    /// Waits until the I/O source is writable.
    pub(crate) fn writable<T>(handle: &crate::Async<T>) -> Writable<'_, T> {
        Writable(Self::ready(handle, WRITE))
    }

    /// Waits until the I/O source is writable.
    pub(crate) fn writable_owned<T>(handle: Arc<crate::Async<T>>) -> WritableOwned<T> {
        WritableOwned(Self::ready(handle, WRITE))
    }

    /// Waits until the I/O source is readable or writable.
    fn ready<H: Borrow<crate::Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> {
        Ready {
            handle,
            dir,
            ticks: None,
            index: None,
            _capture: PhantomData,
        }
    }
}
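
// These futures surface publicly through `Async`; a minimal sketch of how they are awaited
// (using the crate's public API, error handling elided):
//
//     use async_io::Async;
//     use std::net::TcpListener;
//
//     async fn accept_one() -> std::io::Result<()> {
//         let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
//         listener.readable().await?; // resolves via `Source::readable` above
//         Ok(())
//     }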

/// Future for [`Async::readable`](crate::Async::readable).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Readable<'a, T>(Ready<&'a crate::Async<T>, T>);

impl<T> Future for Readable<'_, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        #[cfg(feature = "tracing")]
        tracing::trace!(fd = ?self.0.handle.source.registration, "readable");
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for Readable<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Readable").finish()
    }
}

/// Future for [`Async::readable_owned`](crate::Async::readable_owned).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadableOwned<T>(Ready<Arc<crate::Async<T>>, T>);

impl<T> Future for ReadableOwned<T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        #[cfg(feature = "tracing")]
        tracing::trace!(fd = ?self.0.handle.source.registration, "readable_owned");
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for ReadableOwned<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReadableOwned").finish()
    }
}

/// Future for [`Async::writable`](crate::Async::writable).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Writable<'a, T>(Ready<&'a crate::Async<T>, T>);

impl<T> Future for Writable<'_, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        #[cfg(feature = "tracing")]
        tracing::trace!(fd = ?self.0.handle.source.registration, "writable");
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for Writable<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Writable").finish()
    }
}

/// Future for [`Async::writable_owned`](crate::Async::writable_owned).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WritableOwned<T>(Ready<Arc<crate::Async<T>>, T>);

impl<T> Future for WritableOwned<T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        #[cfg(feature = "tracing")]
        tracing::trace!(fd = ?self.0.handle.source.registration, "writable_owned");
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for WritableOwned<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WritableOwned").finish()
    }
}

/// Future that waits for an I/O readiness event in one direction.
struct Ready<H: Borrow<crate::Async<T>>, T> {
    /// Handle to the `Async` object being waited on.
    handle: H,
    /// Direction of interest (`READ` or `WRITE`).
    dir: usize,
    /// Reactor ticks remembered when the waker was registered.
    ticks: Option<(usize, usize)>,
    /// Index of this future's entry in the source's waker slab.
    index: Option<usize>,
    _capture: PhantomData<fn() -> T>,
}

impl<H: Borrow<crate::Async<T>>, T> Unpin for Ready<H, T> {}

impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self {
            ref handle,
            dir,
            ticks,
            index,
            ..
        } = &mut *self;

        let mut state = handle.borrow().source.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = *ticks {
            // If `state[dir].tick` has changed to a value other than either of the remembered
            // ticks, a newer round of `ReactorLock::react()` has delivered an event.
            if state[*dir].tick != a && state[*dir].tick != b {
                return Poll::Ready(Ok(()));
            }
        }

        let was_empty = state[*dir].is_empty();

        // Register the current task's waker.
        let i = match *index {
            Some(i) => i,
            None => {
                let i = state[*dir].wakers.insert(None);
                *index = Some(i);
                *ticks = Some((Reactor::get().ticker(), state[*dir].tick));
                i
            }
        };
        state[*dir].wakers[i] = Some(cx.waker().clone());

        // Update interest in this I/O handle.
        if was_empty {
            // Create the event that we are interested in.
            let event = {
                let mut event = Event::none(handle.borrow().source.key);
                event.readable = !state[READ].is_empty();
                event.writable = !state[WRITE].is_empty();
                event
            };

            // Indicate that we are interested in this event.
            handle
                .borrow()
                .source
                .registration
                .modify(&Reactor::get().poller, event)?;
        }

        Poll::Pending
    }
}

impl<H: Borrow<crate::Async<T>>, T> Drop for Ready<H, T> {
    fn drop(&mut self) {
        // Remove our waker when dropped.
        if let Some(key) = self.index {
            let mut state = self.handle.borrow().source.state.lock().unwrap();
            let wakers = &mut state[self.dir].wakers;
            if wakers.contains(key) {
                wakers.remove(key);
            }
        }
    }
}