futures_net/driver/
mod.rs

1//! futures reactor,  event loop.
2
3pub(crate) mod background;
4mod poll_evented;
5pub(crate) mod registration;
6mod sharded_rwlock;
7pub mod sys;
8
9pub use self::poll_evented::PollEvented;
10
11use futures_util::task::AtomicWaker;
12use log::{debug, log_enabled, trace, Level};
13use slab::Slab;
14use std::cell::RefCell;
15use std::io;
16use std::mem;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::{Relaxed, SeqCst};
19use std::sync::{Arc, Weak};
20use std::task::Context;
21use std::time::{Duration, Instant};
22use std::{fmt, usize};
23
24use self::background::Background;
25use self::sharded_rwlock::RwLock;
26use self::sys::event::Evented;
27
28/// The futures reactor,  event loop.
29///
30/// The event loop is the main source of blocking in an application which drives
31/// all other I/O events and notifications happening. Each event loop can have
32/// multiple handles pointing to it, each of which can then be used to create
33/// various I/O objects to interact with the event loop in interesting ways.
34struct Reactor {
35    /// Reuse the `sys::event::Events` value across calls to poll.
36    events: sys::event::Events,
37
38    /// State shared between the reactor and the handles.
39    inner: Arc<Inner>,
40
41    _wakeup_registration: sys::Registration,
42}
43
44/// A reference to a reactor.
45///
46/// A `Handle` is used for associating I/O objects with an event loop
47/// explicitly. Typically though you won't end up using a `Handle` that often
48/// and will instead use the default reactor for the execution context.
49///
50/// By default, most components bind lazily to reactors.
51/// To get this behavior when manually passing a `Handle`, use `default()`.
52#[derive(Clone)]
53struct Handle {
54    inner: Option<HandlePriv>,
55}
56
57/// Like `Handle`, but never `None`.
58#[derive(Clone)]
59struct HandlePriv {
60    inner: Weak<Inner>,
61}
62
63/// Return value from the `turn` method on `Reactor`.
64///
65/// Currently this value doesn't actually provide any functionality, but it may
66/// in the future give insight into what happened during `turn`.
67#[derive(Debug)]
68struct Turn {
69    _priv: (),
70}
71
72#[test]
73fn test_handle_size() {
74    use std::mem;
75    assert_eq!(mem::size_of::<Handle>(), mem::size_of::<HandlePriv>());
76}
77
78struct Inner {
79    /// The underlying system event queue.
80    io: sys::Poll,
81
82    /// ABA guard counter
83    next_aba_guard: AtomicUsize,
84
85    /// Dispatch slabs for I/O and futures events
86    io_dispatch: RwLock<Slab<ScheduledIo>>,
87
88    /// Used to wake up the reactor from a call to `turn`
89    wakeup: sys::SetReadiness,
90}
91
92struct ScheduledIo {
93    aba_guard: usize,
94    readiness: AtomicUsize,
95    reader: AtomicWaker,
96    writer: AtomicWaker,
97}
98
99#[derive(Debug, Eq, PartialEq, Clone, Copy)]
100pub(crate) enum Direction {
101    Read,
102    Write,
103}
104
105/// The global fallback reactor.
106static HANDLE_FALLBACK: AtomicUsize = AtomicUsize::new(0);
107
108// Tracks the reactor for the current execution context.
109thread_local!(static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None));
110
111const TOKEN_SHIFT: usize = 22;
112
113// Kind of arbitrary, but this reserves some token space for later usage.
114const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1;
115const TOKEN_WAKEUP: sys::Token = sys::Token(MAX_SOURCES);
116
117fn _assert_kinds() {
118    fn _assert<T: Send + Sync>() {}
119
120    _assert::<Handle>();
121}
122
123// ===== impl Reactor =====
124
125impl Reactor {
126    /// Creates a new event loop, returning any error that happened during the
127    /// creation.
128    fn new() -> io::Result<Reactor> {
129        let io = sys::Poll::new()?;
130        let wakeup_pair = sys::Registration::new2();
131
132        io.register(
133            &wakeup_pair.0,
134            TOKEN_WAKEUP,
135            sys::event::Ready::readable(),
136            sys::event::PollOpt::level(),
137        )?;
138
139        Ok(Reactor {
140            events: sys::event::Events::with_capacity(1024),
141            _wakeup_registration: wakeup_pair.0,
142            inner: Arc::new(Inner {
143                io: io,
144                next_aba_guard: AtomicUsize::new(0),
145                io_dispatch: RwLock::new(Slab::with_capacity(1)),
146                wakeup: wakeup_pair.1,
147            }),
148        })
149    }
150
151    /// Returns a handle to this event loop which can be sent across threads
152    /// and can be used as a proxy to the event loop itself.
153    ///
154    /// Handles are cloneable and clones always refer to the same event loop.
155    /// This handle is typically passed into functions that create I/O objects
156    /// to bind them to this event loop.
157    fn handle(&self) -> Handle {
158        Handle {
159            inner: Some(HandlePriv {
160                inner: Arc::downgrade(&self.inner),
161            }),
162        }
163    }
164
165    /// Performs one iteration of the event loop, blocking on waiting for events
166    /// for at most `max_wait` (forever if `None`).
167    ///
168    /// This method is the primary method of running this reactor and processing
169    /// I/O events that occur. This method executes one iteration of an event
170    /// loop, blocking at most once waiting for events to happen.
171    ///
172    /// If a `max_wait` is specified then the method should block no longer than
173    /// the duration specified, but this shouldn't be used as a super-precise
174    /// timer but rather a "ballpark approximation"
175    ///
176    /// # Return value
177    ///
178    /// This function returns an instance of `Turn`
179    ///
180    /// `Turn` as of today has no extra information with it and can be safely
181    /// discarded.  In the future `Turn` may contain information about what
182    /// happened while this reactor blocked.
183    ///
184    /// # Errors
185    ///
186    /// This function may also return any I/O error which occurs when polling
187    /// for readiness of I/O objects with the OS. This is quite unlikely to
188    /// arise and typically mean that things have gone horribly wrong at that
189    /// point.
190    fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> {
191        self.poll(max_wait)?;
192        Ok(Turn { _priv: () })
193    }
194
195    /// Returns true if the reactor is currently idle.
196    ///
197    /// Idle is defined as all tasks that have been spawned have completed,
198    /// either successfully or with an error.
199    fn is_idle(&self) -> bool {
200        self.inner.io_dispatch.read().is_empty()
201    }
202
203    /// Run this reactor on a background thread.
204    ///
205    /// This function takes ownership, spawns a new thread, and moves the
206    /// reactor to this new thread. It then runs the reactor, driving all
207    /// associated I/O resources, until the `Background` handle is dropped or
208    /// explicitly shutdown.
209    fn background(self) -> io::Result<Background> {
210        Background::new(self)
211    }
212
213    fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
214        // Block waiting for an event to happen, peeling out how many events
215        // happened.
216        match self.inner.io.poll(&mut self.events, max_wait) {
217            Ok(_) => {}
218            Err(e) => return Err(e),
219        }
220
221        let start = if log_enabled!(Level::Debug) {
222            Some(Instant::now())
223        } else {
224            None
225        };
226
227        // Process all the events that came in, dispatching appropriately
228        let mut events = 0;
229        for event in self.events.iter() {
230            events += 1;
231            let token = event.token();
232            trace!("event {:?} {:?}", event.readiness(), event.token());
233
234            if token == TOKEN_WAKEUP {
235                self.inner
236                    .wakeup
237                    .set_readiness(sys::event::Ready::empty())
238                    .unwrap();
239            } else {
240                self.dispatch(token, event.readiness());
241            }
242        }
243
244        if let Some(start) = start {
245            let dur = start.elapsed();
246            trace!(
247                "loop process - {} events, {}.{:03}s",
248                events,
249                dur.as_secs(),
250                dur.subsec_nanos() / 1_000_000
251            );
252        }
253
254        Ok(())
255    }
256
257    fn dispatch(&self, token: sys::Token, ready: sys::event::Ready) {
258        let aba_guard = token.0 & !MAX_SOURCES;
259        let token = token.0 & MAX_SOURCES;
260
261        let mut rd = None;
262        let mut wr = None;
263
264        // Create a scope to ensure that notifying the tasks stays out of the
265        // lock's critical section.
266        {
267            let io_dispatch = self.inner.io_dispatch.read();
268
269            let io = match io_dispatch.get(token) {
270                Some(io) => io,
271                None => return,
272            };
273
274            if aba_guard != io.aba_guard {
275                return;
276            }
277
278            io.readiness.fetch_or(ready.as_usize(), Relaxed);
279
280            if ready.is_writable() || platform::is_hup(&ready) {
281                wr = io.writer.take();
282            }
283
284            if !(ready & (!sys::event::Ready::writable())).is_empty() {
285                rd = io.reader.take();
286            }
287        }
288
289        if let Some(task) = rd {
290            task.wake();
291        }
292
293        if let Some(task) = wr {
294            task.wake();
295        }
296    }
297}
298
299impl fmt::Debug for Reactor {
300    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
301        write!(f, "Reactor")
302    }
303}
304
305// ===== impl Handle =====
306
307impl Handle {
308    fn as_priv(&self) -> Option<&HandlePriv> {
309        self.inner.as_ref()
310    }
311
312    fn into_priv(self) -> Option<HandlePriv> {
313        self.inner
314    }
315
316    fn wakeup(&self) {
317        if let Some(handle) = self.as_priv() {
318            handle.wakeup();
319        }
320    }
321}
322
323impl Default for Handle {
324    /// Returns a "default" handle, i.e., a handle that lazily binds to a reactor.
325    fn default() -> Handle {
326        Handle { inner: None }
327    }
328}
329
330impl fmt::Debug for Handle {
331    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332        write!(f, "Handle")
333    }
334}
335
336fn set_fallback(handle: HandlePriv) -> Result<(), ()> {
337    unsafe {
338        let val = handle.into_usize();
339        match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) {
340            Ok(_) => Ok(()),
341            Err(_) => {
342                drop(HandlePriv::from_usize(val));
343                Err(())
344            }
345        }
346    }
347}
348
349// ===== impl HandlePriv =====
350
351impl HandlePriv {
352    /// Try to get a handle to the current reactor.
353    ///
354    /// Returns `Err` if no handle is found.
355    pub(crate) fn try_current() -> io::Result<HandlePriv> {
356        CURRENT_REACTOR.with(|current| match *current.borrow() {
357            Some(ref handle) => Ok(handle.clone()),
358            None => HandlePriv::fallback(),
359        })
360    }
361
362    /// Returns a handle to the fallback reactor.
363    fn fallback() -> io::Result<HandlePriv> {
364        let mut fallback = HANDLE_FALLBACK.load(SeqCst);
365
366        // If the fallback hasn't been previously initialized then let's spin
367        // up a helper thread and try to initialize with that. If we can't
368        // actually create a helper thread then we'll just return a "defunct"
369        // handle which will return errors when I/O objects are attempted to be
370        // associated.
371        if fallback == 0 {
372            let reactor = match Reactor::new() {
373                Ok(reactor) => reactor,
374                Err(_) => {
375                    return Err(io::Error::new(
376                        io::ErrorKind::Other,
377                        "failed to create reactor",
378                    ))
379                }
380            };
381
382            // If we successfully set ourselves as the actual fallback then we
383            // want to `forget` the helper thread to ensure that it persists
384            // globally. If we fail to set ourselves as the fallback that means
385            // that someone was racing with this call to `Handle::default`.
386            // They ended up winning so we'll destroy our helper thread (which
387            // shuts down the thread) and reload the fallback.
388            if set_fallback(reactor.handle().into_priv().unwrap()).is_ok() {
389                let ret = reactor.handle().into_priv().unwrap();
390
391                match reactor.background() {
392                    Ok(bg) => bg.forget(),
393                    // The global handle is fubar, but y'all probably got bigger
394                    // problems if a thread can't spawn.
395                    Err(_) => {}
396                }
397
398                return Ok(ret);
399            }
400
401            fallback = HANDLE_FALLBACK.load(SeqCst);
402        }
403
404        // At this point our fallback handle global was configured so we use
405        // its value to reify a handle, clone it, and then forget our reified
406        // handle as we don't actually have an owning reference to it.
407        assert!(fallback != 0);
408
409        let ret = unsafe {
410            let handle = HandlePriv::from_usize(fallback);
411            let ret = handle.clone();
412
413            // This prevents `handle` from being dropped and having the ref
414            // count decremented.
415            drop(handle.into_usize());
416
417            ret
418        };
419
420        Ok(ret)
421    }
422
423    /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
424    /// makes the next call to `turn` return immediately.
425    ///
426    /// This method is intended to be used in situations where a notification
427    /// needs to otherwise be sent to the main reactor. If the reactor is
428    /// currently blocked inside of `turn` then it will wake up and soon return
429    /// after this method has been called. If the reactor is not currently
430    /// blocked in `turn`, then the next call to `turn` will not block and
431    /// return immediately.
432    fn wakeup(&self) {
433        if let Some(inner) = self.inner() {
434            inner
435                .wakeup
436                .set_readiness(sys::event::Ready::readable())
437                .unwrap();
438        }
439    }
440
441    fn into_usize(self) -> usize {
442        unsafe { mem::transmute::<Weak<Inner>, usize>(self.inner) }
443    }
444
445    unsafe fn from_usize(val: usize) -> HandlePriv {
446        let inner = mem::transmute::<usize, Weak<Inner>>(val);
447        HandlePriv { inner }
448    }
449
450    fn inner(&self) -> Option<Arc<Inner>> {
451        self.inner.upgrade()
452    }
453}
454
455impl fmt::Debug for HandlePriv {
456    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
457        write!(f, "HandlePriv")
458    }
459}
460
461// ===== impl Inner =====
462
463impl Inner {
464    /// Register an I/O resource with the reactor.
465    ///
466    /// The registration token is returned.
467    fn add_source(&self, source: &dyn Evented) -> io::Result<usize> {
468        // Get an ABA guard value
469        let aba_guard = self.next_aba_guard.fetch_add(1 << TOKEN_SHIFT, Relaxed);
470
471        let mut io_dispatch = self.io_dispatch.write();
472
473        if io_dispatch.len() == MAX_SOURCES {
474            return Err(io::Error::new(
475                io::ErrorKind::Other,
476                "reactor at max \
477                 registered I/O resources",
478            ));
479        }
480
481        // Acquire a write lock
482        let key = io_dispatch.insert(ScheduledIo {
483            aba_guard,
484            readiness: AtomicUsize::new(0),
485            reader: AtomicWaker::new(),
486            writer: AtomicWaker::new(),
487        });
488
489        self.io.register(
490            source,
491            sys::Token(aba_guard | key),
492            sys::event::Ready::all(),
493            sys::event::PollOpt::edge(),
494        )?;
495
496        Ok(key)
497    }
498
499    /// Deregisters an I/O resource from the reactor.
500    fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
501        self.io.deregister(source)
502    }
503
504    fn drop_source(&self, token: usize) {
505        debug!("dropping I/O source: {}", token);
506        self.io_dispatch.write().remove(token);
507    }
508
509    /// Registers interest in the I/O resource associated with `token`.
510    fn register(&self, cx: &mut Context<'_>, token: usize, dir: Direction) {
511        debug!("scheduling direction for: {}", token);
512        let io_dispatch = self.io_dispatch.read();
513        let sched = io_dispatch.get(token).unwrap();
514
515        let (atomic_waker, ready) = match dir {
516            Direction::Read => (&sched.reader, !sys::event::Ready::writable()),
517            Direction::Write => (&sched.writer, sys::event::Ready::writable()),
518        };
519
520        atomic_waker.register(&cx.waker());
521
522        if sched.readiness.load(SeqCst) & ready.as_usize() != 0 {
523            atomic_waker.wake();
524        }
525    }
526}
527
528impl Drop for Inner {
529    fn drop(&mut self) {
530        // When a reactor is dropped it needs to wake up all blocked tasks as
531        // they'll never receive a notification, and all connected I/O objects
532        // will start returning errors pretty quickly.
533        let io = self.io_dispatch.read();
534        for (_, io) in io.iter() {
535            io.writer.wake();
536            io.reader.wake();
537        }
538    }
539}
540
541impl Direction {
542    fn mask(&self) -> sys::event::Ready {
543        match *self {
544            Direction::Read => {
545                // Everything except writable is signaled through read.
546                sys::event::Ready::all() - sys::event::Ready::writable()
547            }
548            Direction::Write => sys::event::Ready::writable() | platform::hup(),
549        }
550    }
551}
552
553pub(crate) mod platform {
554    use super::sys::event::Ready;
555    use super::sys::UnixReady;
556
557    pub fn hup() -> Ready {
558        UnixReady::hup().into()
559    }
560
561    pub fn is_hup(ready: &Ready) -> bool {
562        UnixReady::from(*ready).is_hup()
563    }
564}