async_io/
reactor.rs

1use std::borrow::Borrow;
2use std::collections::BTreeMap;
3use std::fmt;
4use std::future::Future;
5use std::io;
6use std::marker::PhantomData;
7use std::mem;
8use std::panic;
9use std::pin::Pin;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
12use std::task::{ready, Context, Poll, Waker};
13use std::time::{Duration, Instant};
14
15use concurrent_queue::ConcurrentQueue;
16use polling::{Event, Events, Poller};
17use slab::Slab;
18
// Choose the proper implementation of `Registration` based on the target platform.
//
// Exactly one private submodule is compiled in and its `Registration` type is re-exported:
// `windows` on Windows, `kqueue` on Apple and the listed BSD targets, and `unix` on any other
// Unix target. The kqueue branch is tested before the generic `unix` branch, so those targets
// never fall through to the `unix` module. Every other platform fails to compile.
cfg_if::cfg_if! {
    if #[cfg(windows)] {
        mod windows;
        pub use windows::Registration;
    } else if #[cfg(any(
        target_vendor = "apple",
        target_os = "freebsd",
        target_os = "netbsd",
        target_os = "openbsd",
        target_os = "dragonfly",
    ))] {
        mod kqueue;
        pub use kqueue::Registration;
    } else if #[cfg(unix)] {
        mod unix;
        pub use unix::Registration;
    } else {
        compile_error!("unsupported platform");
    }
}
40
/// Capacity of the bounded queue that buffers pending timer operations.
#[cfg(not(target_os = "espidf"))]
const TIMER_QUEUE_SIZE: usize = 1000;

/// Capacity of the bounded queue that buffers pending timer operations on ESP-IDF.
///
/// ESP-IDF - being an embedded OS - does not need so many timers
/// and this saves ~ 20K RAM which is a lot for an MCU with RAM < 400K
#[cfg(target_os = "espidf")]
const TIMER_QUEUE_SIZE: usize = 100;

/// Index of the read direction in a source's `[Direction; 2]` state array.
const READ: usize = 0;
/// Index of the write direction in a source's `[Direction; 2]` state array.
const WRITE: usize = 1;
51
/// The reactor.
///
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
///
/// It bundles the OS poller, the table of registered I/O sources, and the timer bookkeeping.
/// Each piece of mutable state sits behind its own mutex (or is atomic / a concurrent queue),
/// so the reactor is used through a shared reference.
pub(crate) struct Reactor {
    /// Portable bindings to epoll/kqueue/event ports/IOCP.
    ///
    /// This is where I/O is polled, producing I/O events.
    pub(crate) poller: Poller,

    /// Ticker bumped before polling.
    ///
    /// This is useful for checking what is the current "round" of `ReactorLock::react()` when
    /// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those
    /// methods must make sure they don't receive stale I/O events - they only accept events from a
    /// fresh "round" of `ReactorLock::react()`.
    ticker: AtomicUsize,

    /// Registered sources.
    ///
    /// The slab key of each entry is the key the source is registered under in the poller.
    sources: Mutex<Slab<Arc<Source>>>,

    /// Temporary storage for I/O events when polling the reactor.
    ///
    /// Holding a lock on this event list implies the exclusive right to poll I/O.
    events: Mutex<Events>,

    /// An ordered map of registered timers.
    ///
    /// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to
    /// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the
    /// timer.
    timers: Mutex<BTreeMap<(Instant, usize), Waker>>,

    /// A queue of timer operations (insert and remove).
    ///
    /// When inserting or removing a timer, we don't process it immediately - we just push it into
    /// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
    timer_ops: ConcurrentQueue<TimerOp>,
}
90
91impl Reactor {
92    /// Returns a reference to the reactor.
93    pub(crate) fn get() -> &'static Reactor {
94        static REACTOR: OnceLock<Reactor> = OnceLock::new();
95
96        REACTOR.get_or_init(|| {
97            crate::driver::init();
98            Reactor {
99                poller: Poller::new().expect("cannot initialize I/O event notification"),
100                ticker: AtomicUsize::new(0),
101                sources: Mutex::new(Slab::new()),
102                events: Mutex::new(Events::new()),
103                timers: Mutex::new(BTreeMap::new()),
104                timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE),
105            }
106        })
107    }
108
109    /// Returns the current ticker.
110    pub(crate) fn ticker(&self) -> usize {
111        self.ticker.load(Ordering::SeqCst)
112    }
113
114    /// Registers an I/O source in the reactor.
115    pub(crate) fn insert_io(&self, raw: Registration) -> io::Result<Arc<Source>> {
116        // Create an I/O source for this file descriptor.
117        let source = {
118            let mut sources = self.sources.lock().unwrap();
119            let key = sources.vacant_entry().key();
120            let source = Arc::new(Source {
121                registration: raw,
122                key,
123                state: Default::default(),
124            });
125            sources.insert(source.clone());
126            source
127        };
128
129        // Register the file descriptor.
130        if let Err(err) = source.registration.add(&self.poller, source.key) {
131            let mut sources = self.sources.lock().unwrap();
132            sources.remove(source.key);
133            return Err(err);
134        }
135
136        Ok(source)
137    }
138
139    /// Deregisters an I/O source from the reactor.
140    pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
141        let mut sources = self.sources.lock().unwrap();
142        sources.remove(source.key);
143        source.registration.delete(&self.poller)
144    }
145
146    /// Registers a timer in the reactor.
147    ///
148    /// Returns the inserted timer's ID.
149    pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
150        // Generate a new timer ID.
151        static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
152        let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
153
154        // Push an insert operation.
155        while self
156            .timer_ops
157            .push(TimerOp::Insert(when, id, waker.clone()))
158            .is_err()
159        {
160            // If the queue is full, drain it and try again.
161            let mut timers = self.timers.lock().unwrap();
162            self.process_timer_ops(&mut timers);
163        }
164
165        // Notify that a timer has been inserted.
166        self.notify();
167
168        id
169    }
170
171    /// Deregisters a timer from the reactor.
172    pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
173        // Push a remove operation.
174        while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
175            // If the queue is full, drain it and try again.
176            let mut timers = self.timers.lock().unwrap();
177            self.process_timer_ops(&mut timers);
178        }
179    }
180
181    /// Notifies the thread blocked on the reactor.
182    pub(crate) fn notify(&self) {
183        self.poller.notify().expect("failed to notify reactor");
184    }
185
186    /// Locks the reactor, potentially blocking if the lock is held by another thread.
187    pub(crate) fn lock(&self) -> ReactorLock<'_> {
188        let reactor = self;
189        let events = self.events.lock().unwrap();
190        ReactorLock { reactor, events }
191    }
192
193    /// Attempts to lock the reactor.
194    pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> {
195        self.events.try_lock().ok().map(|events| {
196            let reactor = self;
197            ReactorLock { reactor, events }
198        })
199    }
200
201    /// Processes ready timers and extends the list of wakers to wake.
202    ///
203    /// Returns the duration until the next timer before this method was called.
204    fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
205        #[cfg(feature = "tracing")]
206        let span = tracing::trace_span!("process_timers");
207        #[cfg(feature = "tracing")]
208        let _enter = span.enter();
209
210        let mut timers = self.timers.lock().unwrap();
211        self.process_timer_ops(&mut timers);
212
213        let now = Instant::now();
214
215        // Split timers into ready and pending timers.
216        //
217        // Careful to split just *after* `now`, so that a timer set for exactly `now` is considered
218        // ready.
219        let pending = timers.split_off(&(now + Duration::from_nanos(1), 0));
220        let ready = mem::replace(&mut *timers, pending);
221
222        // Calculate the duration until the next event.
223        let dur = if ready.is_empty() {
224            // Duration until the next timer.
225            timers
226                .keys()
227                .next()
228                .map(|(when, _)| when.saturating_duration_since(now))
229        } else {
230            // Timers are about to fire right now.
231            Some(Duration::from_secs(0))
232        };
233
234        // Drop the lock before waking.
235        drop(timers);
236
237        // Add wakers to the list.
238        #[cfg(feature = "tracing")]
239        tracing::trace!("{} ready wakers", ready.len());
240
241        for (_, waker) in ready {
242            wakers.push(waker);
243        }
244
245        dur
246    }
247
248    /// Processes queued timer operations.
249    fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
250        // Process only as much as fits into the queue, or else this loop could in theory run
251        // forever.
252        self.timer_ops
253            .try_iter()
254            .take(self.timer_ops.capacity().unwrap())
255            .for_each(|op| match op {
256                TimerOp::Insert(when, id, waker) => {
257                    timers.insert((when, id), waker);
258                }
259                TimerOp::Remove(when, id) => {
260                    timers.remove(&(when, id));
261                }
262            });
263    }
264}
265
/// A lock on the reactor.
///
/// Wraps the guard of the reactor's event list; whoever holds it has the exclusive right to
/// poll I/O.
pub(crate) struct ReactorLock<'a> {
    /// The reactor this lock belongs to.
    reactor: &'a Reactor,
    /// Guard over the reactor's event buffer, filled on each poll.
    events: MutexGuard<'a, Events>,
}
271
272impl ReactorLock<'_> {
273    /// Processes new events, blocking until the first event or the timeout.
274    pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
275        #[cfg(feature = "tracing")]
276        let span = tracing::trace_span!("react");
277        #[cfg(feature = "tracing")]
278        let _enter = span.enter();
279
280        let mut wakers = Vec::new();
281
282        // Process ready timers.
283        let next_timer = self.reactor.process_timers(&mut wakers);
284
285        // compute the timeout for blocking on I/O events.
286        let timeout = match (next_timer, timeout) {
287            (None, None) => None,
288            (Some(t), None) | (None, Some(t)) => Some(t),
289            (Some(a), Some(b)) => Some(a.min(b)),
290        };
291
292        // Bump the ticker before polling I/O.
293        let tick = self
294            .reactor
295            .ticker
296            .fetch_add(1, Ordering::SeqCst)
297            .wrapping_add(1);
298
299        self.events.clear();
300
301        // Block on I/O events.
302        let res = match self.reactor.poller.wait(&mut self.events, timeout) {
303            // No I/O events occurred.
304            Ok(0) => {
305                if timeout != Some(Duration::from_secs(0)) {
306                    // The non-zero timeout was hit so fire ready timers.
307                    self.reactor.process_timers(&mut wakers);
308                }
309                Ok(())
310            }
311
312            // At least one I/O event occurred.
313            Ok(_) => {
314                // Iterate over sources in the event list.
315                let sources = self.reactor.sources.lock().unwrap();
316
317                for ev in self.events.iter() {
318                    // Check if there is a source in the table with this key.
319                    if let Some(source) = sources.get(ev.key) {
320                        let mut state = source.state.lock().unwrap();
321
322                        // Collect wakers if any event was emitted.
323                        for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
324                            if emitted {
325                                state[dir].tick = tick;
326                                state[dir].drain_into(&mut wakers);
327                            }
328                        }
329
330                        // Re-register if there are still writers or readers. This can happen if
331                        // e.g. we were previously interested in both readability and writability,
332                        // but only one of them was emitted.
333                        if !state[READ].is_empty() || !state[WRITE].is_empty() {
334                            // Create the event that we are interested in.
335                            let event = {
336                                let mut event = Event::none(source.key);
337                                event.readable = !state[READ].is_empty();
338                                event.writable = !state[WRITE].is_empty();
339                                event
340                            };
341
342                            // Register interest in this event.
343                            source.registration.modify(&self.reactor.poller, event)?;
344                        }
345                    }
346                }
347
348                Ok(())
349            }
350
351            // The syscall was interrupted.
352            Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
353
354            // An actual error occureed.
355            Err(err) => Err(err),
356        };
357
358        // Wake up ready tasks.
359        #[cfg(feature = "tracing")]
360        tracing::trace!("{} ready wakers", wakers.len());
361        for waker in wakers {
362            // Don't let a panicking waker blow everything up.
363            panic::catch_unwind(|| waker.wake()).ok();
364        }
365
366        res
367    }
368}
369
/// A single timer operation.
///
/// Operations are pushed into the reactor's `timer_ops` queue and applied to the timer map
/// later, in `Reactor::process_timer_ops()`.
enum TimerOp {
    /// Register a timer: deadline, timer ID, and the waker to wake when it fires.
    Insert(Instant, usize, Waker),
    /// Remove the timer with the given deadline and timer ID.
    Remove(Instant, usize),
}
375
/// A registered source of I/O events.
#[derive(Debug)]
pub(crate) struct Source {
    /// This source's registration into the reactor.
    ///
    /// Used to add, modify and delete the source's interest in the poller.
    registration: Registration,

    /// The key of this source obtained during registration.
    ///
    /// It is the source's slot in the reactor's source table and the key used for its events.
    key: usize,

    /// Inner state with registered wakers.
    ///
    /// The array is indexed by `READ` and `WRITE`.
    state: Mutex<[Direction; 2]>,
}
388
/// A read or write direction.
///
/// Holds the bookkeeping for one direction of a [`Source`]: which reactor tick last delivered
/// an event, and which wakers are waiting for the next one.
#[derive(Debug, Default)]
struct Direction {
    /// Last reactor tick that delivered an event.
    tick: usize,

    /// Ticks remembered by `Async::poll_readable()` or `Async::poll_writable()`.
    ///
    /// Stored as (reactor ticker, this direction's `tick`) at the time the waker was registered.
    ticks: Option<(usize, usize)>,

    /// Waker stored by `Async::poll_readable()` or `Async::poll_writable()`.
    waker: Option<Waker>,

    /// Wakers of tasks waiting for the next event.
    ///
    /// Registered by `Async::readable()` and `Async::writable()`.
    wakers: Slab<Option<Waker>>,
}
406
407impl Direction {
408    /// Returns `true` if there are no wakers interested in this direction.
409    fn is_empty(&self) -> bool {
410        self.waker.is_none() && self.wakers.iter().all(|(_, opt)| opt.is_none())
411    }
412
413    /// Moves all wakers into a `Vec`.
414    fn drain_into(&mut self, dst: &mut Vec<Waker>) {
415        if let Some(w) = self.waker.take() {
416            dst.push(w);
417        }
418        for (_, opt) in self.wakers.iter_mut() {
419            if let Some(w) = opt.take() {
420                dst.push(w);
421            }
422        }
423    }
424}
425
426impl Source {
427    /// Polls the I/O source for readability.
428    pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
429        self.poll_ready(READ, cx)
430    }
431
432    /// Polls the I/O source for writability.
433    pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
434        self.poll_ready(WRITE, cx)
435    }
436
437    /// Registers a waker from `poll_readable()` or `poll_writable()`.
438    ///
439    /// If a different waker is already registered, it gets replaced and woken.
440    fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
441        let mut state = self.state.lock().unwrap();
442
443        // Check if the reactor has delivered an event.
444        if let Some((a, b)) = state[dir].ticks {
445            // If `state[dir].tick` has changed to a value other than the old reactor tick,
446            // that means a newer reactor tick has delivered an event.
447            if state[dir].tick != a && state[dir].tick != b {
448                state[dir].ticks = None;
449                return Poll::Ready(Ok(()));
450            }
451        }
452
453        let was_empty = state[dir].is_empty();
454
455        // Register the current task's waker.
456        if let Some(w) = state[dir].waker.take() {
457            if w.will_wake(cx.waker()) {
458                state[dir].waker = Some(w);
459                return Poll::Pending;
460            }
461            // Wake the previous waker because it's going to get replaced.
462            panic::catch_unwind(|| w.wake()).ok();
463        }
464        state[dir].waker = Some(cx.waker().clone());
465        state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));
466
467        // Update interest in this I/O handle.
468        if was_empty {
469            // Create the event that we are interested in.
470            let event = {
471                let mut event = Event::none(self.key);
472                event.readable = !state[READ].is_empty();
473                event.writable = !state[WRITE].is_empty();
474                event
475            };
476
477            // Register interest in it.
478            self.registration.modify(&Reactor::get().poller, event)?;
479        }
480
481        Poll::Pending
482    }
483
484    /// Waits until the I/O source is readable.
485    pub(crate) fn readable<T>(handle: &crate::Async<T>) -> Readable<'_, T> {
486        Readable(Self::ready(handle, READ))
487    }
488
489    /// Waits until the I/O source is readable.
490    pub(crate) fn readable_owned<T>(handle: Arc<crate::Async<T>>) -> ReadableOwned<T> {
491        ReadableOwned(Self::ready(handle, READ))
492    }
493
494    /// Waits until the I/O source is writable.
495    pub(crate) fn writable<T>(handle: &crate::Async<T>) -> Writable<'_, T> {
496        Writable(Self::ready(handle, WRITE))
497    }
498
499    /// Waits until the I/O source is writable.
500    pub(crate) fn writable_owned<T>(handle: Arc<crate::Async<T>>) -> WritableOwned<T> {
501        WritableOwned(Self::ready(handle, WRITE))
502    }
503
504    /// Waits until the I/O source is readable or writable.
505    fn ready<H: Borrow<crate::Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> {
506        Ready {
507            handle,
508            dir,
509            ticks: None,
510            index: None,
511            _capture: PhantomData,
512        }
513    }
514}
515
516/// Future for [`Async::readable`](crate::Async::readable).
517#[must_use = "futures do nothing unless you `.await` or poll them"]
518pub struct Readable<'a, T>(Ready<&'a crate::Async<T>, T>);
519
520impl<T> Future for Readable<'_, T> {
521    type Output = io::Result<()>;
522
523    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
524        ready!(Pin::new(&mut self.0).poll(cx))?;
525        #[cfg(feature = "tracing")]
526        tracing::trace!(fd = ?self.0.handle.source.registration, "readable");
527        Poll::Ready(Ok(()))
528    }
529}
530
531impl<T> fmt::Debug for Readable<'_, T> {
532    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
533        f.debug_struct("Readable").finish()
534    }
535}
536
537/// Future for [`Async::readable_owned`](crate::Async::readable_owned).
538#[must_use = "futures do nothing unless you `.await` or poll them"]
539pub struct ReadableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
540
541impl<T> Future for ReadableOwned<T> {
542    type Output = io::Result<()>;
543
544    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
545        ready!(Pin::new(&mut self.0).poll(cx))?;
546        #[cfg(feature = "tracing")]
547        tracing::trace!(fd = ?self.0.handle.source.registration, "readable_owned");
548        Poll::Ready(Ok(()))
549    }
550}
551
552impl<T> fmt::Debug for ReadableOwned<T> {
553    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
554        f.debug_struct("ReadableOwned").finish()
555    }
556}
557
558/// Future for [`Async::writable`](crate::Async::writable).
559#[must_use = "futures do nothing unless you `.await` or poll them"]
560pub struct Writable<'a, T>(Ready<&'a crate::Async<T>, T>);
561
562impl<T> Future for Writable<'_, T> {
563    type Output = io::Result<()>;
564
565    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
566        ready!(Pin::new(&mut self.0).poll(cx))?;
567        #[cfg(feature = "tracing")]
568        tracing::trace!(fd = ?self.0.handle.source.registration, "writable");
569        Poll::Ready(Ok(()))
570    }
571}
572
573impl<T> fmt::Debug for Writable<'_, T> {
574    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
575        f.debug_struct("Writable").finish()
576    }
577}
578
579/// Future for [`Async::writable_owned`](crate::Async::writable_owned).
580#[must_use = "futures do nothing unless you `.await` or poll them"]
581pub struct WritableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
582
583impl<T> Future for WritableOwned<T> {
584    type Output = io::Result<()>;
585
586    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
587        ready!(Pin::new(&mut self.0).poll(cx))?;
588        #[cfg(feature = "tracing")]
589        tracing::trace!(fd = ?self.0.handle.source.registration, "writable_owned");
590        Poll::Ready(Ok(()))
591    }
592}
593
594impl<T> fmt::Debug for WritableOwned<T> {
595    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
596        f.debug_struct("WritableOwned").finish()
597    }
598}
599
/// Readiness future shared by `Readable`, `ReadableOwned`, `Writable` and `WritableOwned`.
///
/// `H` is the handle type (a reference or an `Arc`) that borrows as `Async<T>`.
struct Ready<H: Borrow<crate::Async<T>>, T> {
    /// Handle to the `Async` whose source is being waited on.
    handle: H,
    /// Direction being waited on: `READ` or `WRITE`.
    dir: usize,
    /// (reactor ticker, direction tick) captured when the waker slot was first allocated.
    ticks: Option<(usize, usize)>,
    /// Slot of this future's waker in the direction's waker slab, once allocated.
    index: Option<usize>,
    /// Ties the otherwise unused type parameter `T` to the struct.
    _capture: PhantomData<fn() -> T>,
}

// `Ready` is `Unpin` for every `H` and `T`; the wrapper futures rely on this to poll it through
// `Pin::new`.
impl<H: Borrow<crate::Async<T>>, T> Unpin for Ready<H, T> {}
609
610impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {
611    type Output = io::Result<()>;
612
613    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
614        let Self {
615            ref handle,
616            dir,
617            ticks,
618            index,
619            ..
620        } = &mut *self;
621
622        let mut state = handle.borrow().source.state.lock().unwrap();
623
624        // Check if the reactor has delivered an event.
625        if let Some((a, b)) = *ticks {
626            // If `state[dir].tick` has changed to a value other than the old reactor tick,
627            // that means a newer reactor tick has delivered an event.
628            if state[*dir].tick != a && state[*dir].tick != b {
629                return Poll::Ready(Ok(()));
630            }
631        }
632
633        let was_empty = state[*dir].is_empty();
634
635        // Register the current task's waker.
636        let i = match *index {
637            Some(i) => i,
638            None => {
639                let i = state[*dir].wakers.insert(None);
640                *index = Some(i);
641                *ticks = Some((Reactor::get().ticker(), state[*dir].tick));
642                i
643            }
644        };
645        state[*dir].wakers[i] = Some(cx.waker().clone());
646
647        // Update interest in this I/O handle.
648        if was_empty {
649            // Create the event that we are interested in.
650            let event = {
651                let mut event = Event::none(handle.borrow().source.key);
652                event.readable = !state[READ].is_empty();
653                event.writable = !state[WRITE].is_empty();
654                event
655            };
656
657            // Indicate that we are interested in this event.
658            handle
659                .borrow()
660                .source
661                .registration
662                .modify(&Reactor::get().poller, event)?;
663        }
664
665        Poll::Pending
666    }
667}
668
669impl<H: Borrow<crate::Async<T>>, T> Drop for Ready<H, T> {
670    fn drop(&mut self) {
671        // Remove our waker when dropped.
672        if let Some(key) = self.index {
673            let mut state = self.handle.borrow().source.state.lock().unwrap();
674            let wakers = &mut state[self.dir].wakers;
675            if wakers.contains(key) {
676                wakers.remove(key);
677            }
678        }
679    }
680}