use super::{Direction, Ready, ReadyEvent, Tick};
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
use crate::util::slab::Entry;

use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::{Context, Poll, Waker};

cfg_io_readiness! {
    use crate::util::linked_list::{self, LinkedList};

    use std::cell::UnsafeCell;
    use std::future::Future;
    use std::marker::PhantomPinned;
    use std::pin::Pin;
    use std::ptr::NonNull;
}

/// Stored in the I/O driver resource slab.
#[derive(Debug)]
pub(crate) struct ScheduledIo {
    /// Packs the resource's readiness with the resource's generation.
    readiness: AtomicUsize,

    waiters: Mutex<Waiters>,
}

cfg_io_readiness! {
    type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
}

#[derive(Debug, Default)]
struct Waiters {
    /// List of all current waiters
    #[cfg(feature = "net")]
    list: WaitList,

    /// Waker used for AsyncRead
    reader: Option<Waker>,

    /// Waker used for AsyncWrite
    writer: Option<Waker>,
}

cfg_io_readiness! {
    #[derive(Debug)]
    struct Waiter {
        pointers: linked_list::Pointers<Waiter>,

        /// The waker for this task
        waker: Option<Waker>,

        /// The interest this waiter is waiting on
        interest: mio::Interest,

        is_ready: bool,

        /// Should never be `Unpin`
        _p: PhantomPinned,
    }

    /// Future returned by `readiness()`
    struct Readiness<'a> {
        scheduled_io: &'a ScheduledIo,

        state: State,

        /// Entry in the waiter `LinkedList`.
        waiter: UnsafeCell<Waiter>,
    }

    enum State {
        /// The future has not yet stored a waiter in the list.
        Init,
        /// A waiter for this future is stored in the waiter list.
        Waiting,
        /// Readiness was observed; the next poll completes.
        Done,
    }
}

// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | reserved | generation | driver tick | readiness |
// |----------|------------|-------------|-----------|
// |  1 bit   |   7 bits   |   8 bits    |  16 bits  |

const READINESS: bit::Pack = bit::Pack::least_significant(16);

const TICK: bit::Pack = READINESS.then(8);

const GENERATION: bit::Pack = TICK.then(7);

#[test]
fn test_generations_assert_same() {
    // The driver (`super`) packs the generation with its own constant; the
    // two packers must agree for token validation to work.
    assert_eq!(super::GENERATION, GENERATION);
}
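
// A hedged sanity check of the layout diagrammed above: `unpack` should
// recover a value written into each field's bit range. The shift amounts
// (0, 16, and 24) follow from the field widths; the test name and values are
// illustrative.
#[test]
fn test_packed_field_layout() {
    // Readiness occupies the low 16 bits.
    assert_eq!(READINESS.unpack(0xffff), 0xffff);

    // The driver tick occupies the next 8 bits.
    assert_eq!(TICK.unpack(0xab << 16), 0xab);

    // The generation occupies the 7 bits after that.
    assert_eq!(GENERATION.unpack(0x7f << 24), 0x7f);
}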

// ===== impl ScheduledIo =====

impl Entry for ScheduledIo {
    fn reset(&self) {
        let state = self.readiness.load(Acquire);

        let generation = GENERATION.unpack(state);
        // `pack_lossy` wraps the generation on overflow and, with a base of
        // zero, clears the tick and readiness bits.
        let next = GENERATION.pack_lossy(generation + 1, 0);

        self.readiness.store(next, Release);
    }
}

impl Default for ScheduledIo {
    fn default() -> ScheduledIo {
        ScheduledIo {
            readiness: AtomicUsize::new(0),
            waiters: Mutex::new(Default::default()),
        }
    }
}

impl ScheduledIo {
    pub(crate) fn generation(&self) -> usize {
        GENERATION.unpack(self.readiness.load(Acquire))
    }

    /// Sets the readiness on this `ScheduledIo` by invoking the given closure
    /// on the current readiness value.
    ///
    /// # Arguments
    /// - `token`: the token for this `ScheduledIo`; if `Some`, its generation
    ///   is validated against the current one.
    /// - `tick`: whether to set the tick, or to clear readiness only for a
    ///   specific tick.
    /// - `f`: a closure returning a new readiness value given the previous
    ///   readiness.
    ///
    /// # Returns
    ///
    /// If the given token's generation no longer matches the `ScheduledIo`'s
    /// generation, then the corresponding I/O resource has been removed and
    /// replaced with a new resource; in that case, this method returns `Err`.
    /// `Err` is also returned when `Tick::Clear` carries a stale tick.
    /// Otherwise, the readiness is updated and `Ok(())` is returned. (The
    /// generation check is sketched in `test_set_readiness_stale_generation`
    /// below.)
    pub(super) fn set_readiness(
        &self,
        token: Option<usize>,
        tick: Tick,
        f: impl Fn(Ready) -> Ready,
    ) -> Result<(), ()> {
        let mut current = self.readiness.load(Acquire);

        loop {
            let current_generation = GENERATION.unpack(current);

            if let Some(token) = token {
                // Check that the generation for this access is still the
                // current one.
                if GENERATION.unpack(token) != current_generation {
                    return Err(());
                }
            }

            // Mask out the tick/generation bits so that the modifying
            // function doesn't see them.
            let current_readiness = Ready::from_usize(current);
            let new = f(current_readiness);

            let packed = match tick {
                Tick::Set(t) => TICK.pack(t as usize, new.as_usize()),
                Tick::Clear(t) => {
                    if TICK.unpack(current) as u8 != t {
                        // Trying to clear readiness with an old event!
                        return Err(());
                    }

                    TICK.pack(t as usize, new.as_usize())
                }
            };

            let next = GENERATION.pack(current_generation, packed);

            match self
                .readiness
                .compare_exchange(current, next, AcqRel, Acquire)
            {
                Ok(_) => return Ok(()),
                // we lost the race, retry!
                Err(actual) => current = actual,
            }
        }
    }

    /// Notifies all pending waiters that have registered interest in `ready`.
    ///
    /// There may be many waiters to notify. Waking the pending tasks **must**
    /// be done from outside of the lock; otherwise there is a potential for a
    /// deadlock.
    ///
    /// A stack array of up to 32 wakers is filled while holding the lock,
    /// the lock is released, and the wakers are invoked. Because there may be
    /// more than 32 wakers to notify, if the array fills up, the collected
    /// wakers are invoked with the lock released and iteration continues once
    /// the lock is re-acquired. (The slot handling is sketched in
    /// `test_wake_takes_matching_slot` below.)
    pub(super) fn wake(&self, ready: Ready) {
        const NUM_WAKERS: usize = 32;

        let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
        let mut curr = 0;

        let mut waiters = self.waiters.lock();

        // check for AsyncRead slot
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers[curr] = Some(waker);
                curr += 1;
            }
        }

        // check for AsyncWrite slot
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers[curr] = Some(waker);
                curr += 1;
            }
        }

        #[cfg(feature = "net")]
        'outer: loop {
            let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

            while curr < NUM_WAKERS {
                match iter.next() {
                    Some(waiter) => {
                        let waiter = unsafe { &mut *waiter.as_ptr() };

                        if let Some(waker) = waiter.waker.take() {
                            waiter.is_ready = true;
                            wakers[curr] = Some(waker);
                            curr += 1;
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            drop(waiters);

            for waker in wakers.iter_mut().take(curr) {
                waker.take().unwrap().wake();
            }

            curr = 0;

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        for waker in wakers.iter_mut().take(curr) {
            waker.take().unwrap().wake();
        }
    }

    /// Poll version of checking readiness for a certain direction.
    ///
    /// This supports the `AsyncRead` and `AsyncWrite` polling methods, which
    /// cannot use the `async fn` version. It uses the reserved reader and
    /// writer slots rather than the waiter list. (The poll/clear cycle is
    /// sketched in `test_clear_readiness_preserves_closed_state` below.)
    pub(in crate::io) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<ReadyEvent> {
        let curr = self.readiness.load(Acquire);

        let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));

        if ready.is_empty() {
            // Update the task info
            let mut waiters = self.waiters.lock();
            let slot = match direction {
                Direction::Read => &mut waiters.reader,
                Direction::Write => &mut waiters.writer,
            };
            *slot = Some(cx.waker().clone());

            // Try again, in case the readiness was changed while we were
            // taking the waiters lock
            let curr = self.readiness.load(Acquire);
            let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
            if ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready,
                })
            }
        } else {
            Poll::Ready(ReadyEvent {
                tick: TICK.unpack(curr) as u8,
                ready,
            })
        }
    }

    pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
        // This consumes the current readiness state **except** for closed
        // states. Closed states are excluded because they are final states.
        let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;

        // The result isn't important: if the tick is stale, a newer event has
        // already updated the readiness and the clear must be skipped.
        let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed);
    }
}
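
// A minimal sketch of the generation check described in `set_readiness`'s
// docs. `Ready::READABLE` and the test name are illustrative; the packing
// helpers and `Entry::reset` are the ones defined above.
#[test]
fn test_set_readiness_stale_generation() {
    let io = ScheduledIo::default();

    // A token carrying the current generation is accepted.
    let token = GENERATION.pack(io.generation(), 0);
    assert!(io
        .set_readiness(Some(token), Tick::Set(0), |_| Ready::READABLE)
        .is_ok());

    // `reset` bumps the generation, so the same token is now stale and the
    // update is rejected.
    io.reset();
    assert!(io
        .set_readiness(Some(token), Tick::Set(0), |curr| curr)
        .is_err());
}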

impl Drop for ScheduledIo {
    fn drop(&mut self) {
        // Notify all remaining waiters so none stay parked on a resource that
        // is going away.
        self.wake(Ready::ALL);
    }
}
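
// A sketch of the reader-slot handling described in `wake`'s docs. Assumes
// the `futures` crate is available as a dev-dependency (for `noop_waker`);
// `Ready::WRITABLE` and the test name are illustrative.
#[test]
fn test_wake_takes_matching_slot() {
    use futures::task::noop_waker;

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    let io = ScheduledIo::default();

    // Polling while nothing is ready parks a waker in the reader slot.
    assert!(io.poll_readiness(&mut cx, Direction::Read).is_pending());
    assert!(io.waiters.lock().reader.is_some());

    // A writable event does not touch the reader slot...
    io.wake(Ready::WRITABLE);
    assert!(io.waiters.lock().reader.is_some());

    // ...while a readable event takes the waker and invokes it.
    io.wake(Ready::READABLE);
    assert!(io.waiters.lock().reader.is_none());
}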

// Safety: the raw waiter pointers stored in the waiter list make `ScheduledIo`
// not automatically `Send`/`Sync`. The list is only touched while holding the
// `waiters` lock, and the readiness word is atomic.
unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}
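
// A sketch of the poll/clear cycle: `poll_readiness` surfaces the current
// tick and readiness, and `clear_readiness` consumes everything except the
// final closed states. Assumes `futures` for `noop_waker` and that
// `Ready::is_read_closed` exists alongside `is_readable`.
#[test]
fn test_clear_readiness_preserves_closed_state() {
    use futures::task::noop_waker;

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    let io = ScheduledIo::default();

    // The driver publishes "readable + read closed" on tick 1.
    io.set_readiness(None, Tick::Set(1), |_| Ready::READABLE | Ready::READ_CLOSED)
        .unwrap();

    let event = match io.poll_readiness(&mut cx, Direction::Read) {
        Poll::Ready(event) => event,
        Poll::Pending => panic!("expected readiness"),
    };
    assert_eq!(event.tick, 1);

    // Clearing consumes `READABLE`, but the final `READ_CLOSED` state stays.
    io.clear_readiness(event);
    let curr = Ready::from_usize(READINESS.unpack(io.readiness.load(Acquire)));
    assert!(!curr.is_readable());
    assert!(curr.is_read_closed());
}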

cfg_io_readiness! {
    impl ScheduledIo {
        /// An async version of `poll_readiness` which uses a linked list of wakers
        pub(crate) async fn readiness(&self, interest: mio::Interest) -> ReadyEvent {
            self.readiness_fut(interest).await
        }

        // This is in a separate function so that the borrow checker doesn't think
        // we are borrowing the `UnsafeCell` possibly over await boundaries.
        //
        // Go figure.
        fn readiness_fut(&self, interest: mio::Interest) -> Readiness<'_> {
            Readiness {
                scheduled_io: self,
                state: State::Init,
                waiter: UnsafeCell::new(Waiter {
                    pointers: linked_list::Pointers::new(),
                    waker: None,
                    is_ready: false,
                    interest,
                    _p: PhantomPinned,
                }),
            }
        }
    }

    unsafe impl linked_list::Link for Waiter {
        type Handle = NonNull<Waiter>;
        type Target = Waiter;

        fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
            *handle
        }

        unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
            ptr
        }

        unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
            NonNull::from(&mut target.as_mut().pointers)
        }
    }

    // ===== impl Readiness =====

    impl Future for Readiness<'_> {
        type Output = ReadyEvent;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            use std::sync::atomic::Ordering::SeqCst;

            let (scheduled_io, state, waiter) = unsafe {
                let me = self.get_unchecked_mut();
                (&me.scheduled_io, &mut me.state, &me.waiter)
            };

            loop {
                match *state {
                    State::Init => {
                        // Optimistically check existing readiness
                        let curr = scheduled_io.readiness.load(SeqCst);
                        let ready = Ready::from_usize(READINESS.unpack(curr));

                        // Safety: `waiter.interest` never changes
                        let interest = unsafe { (*waiter.get()).interest };
                        let ready = ready.intersection(interest);

                        if !ready.is_empty() {
                            // Currently ready!
                            let tick = TICK.unpack(curr) as u8;
                            *state = State::Done;
                            return Poll::Ready(ReadyEvent { ready, tick });
                        }

                        // Wasn't ready, take the lock (and check again while locked).
                        let mut waiters = scheduled_io.waiters.lock();

                        let curr = scheduled_io.readiness.load(SeqCst);
                        let ready = Ready::from_usize(READINESS.unpack(curr));
                        let ready = ready.intersection(interest);

                        if !ready.is_empty() {
                            // Currently ready!
                            let tick = TICK.unpack(curr) as u8;
                            *state = State::Done;
                            return Poll::Ready(ReadyEvent { ready, tick });
                        }

                        // Not ready even after locked, insert into list...

                        // Safety: called while locked
                        unsafe {
                            (*waiter.get()).waker = Some(cx.waker().clone());
                        }

                        // Insert the waiter into the linked list
                        //
                        // Safety: pointers from `UnsafeCell` are never null.
                        waiters
                            .list
                            .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
                        *state = State::Waiting;
                    }
                    State::Waiting => {
                        // Currently in the "Waiting" state, implying the caller has
                        // a waiter stored in the waiter list (guarded by
                        // `scheduled_io.waiters`). In order to access the waker
                        // fields, we must hold the lock.

                        let waiters = scheduled_io.waiters.lock();

                        // Safety: called while locked
                        let w = unsafe { &mut *waiter.get() };

                        if w.is_ready {
                            // Our waker has been notified.
                            *state = State::Done;
                        } else {
                            // Update the waker, if necessary.
                            if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
                                w.waker = Some(cx.waker().clone());
                            }

                            return Poll::Pending;
                        }

                        // Explicit drop of the lock to indicate the scope that the
                        // lock is held. Because holding the lock is required to
                        // ensure safe access to fields not held within the lock, it
                        // is helpful to visualize the scope of the critical
                        // section.
                        drop(waiters);
                    }
                    State::Done => {
                        let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8;

                        // Safety: State::Done means it is no longer shared
                        let w = unsafe { &mut *waiter.get() };

                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready: Ready::from_interest(w.interest),
                        });
                    }
                }
            }
        }
    }

    impl Drop for Readiness<'_> {
        fn drop(&mut self) {
            let mut waiters = self.scheduled_io.waiters.lock();

            // Safety: `waiter` is only ever stored in `waiters`
            unsafe {
                waiters
                    .list
                    .remove(NonNull::new_unchecked(self.waiter.get()))
            };
        }
    }

    unsafe impl Send for Readiness<'_> {}
    unsafe impl Sync for Readiness<'_> {}
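
    // A sketch of the waiter state machine: the first poll parks a waiter in
    // the list (`Init` -> `Waiting`), the driver publishes readiness and wakes
    // it, and the next poll completes (`Waiting` -> `Done`). Assumes `futures`
    // is available as a dev-dependency (for `noop_waker`); the test name is
    // illustrative.
    #[test]
    fn test_readiness_future_state_machine() {
        use futures::task::noop_waker;

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let io = ScheduledIo::default();
        let mut fut = io.readiness_fut(mio::Interest::READABLE);
        // Safety: `fut` is a local that is not moved after being pinned.
        let mut fut = unsafe { Pin::new_unchecked(&mut fut) };

        // Init: nothing is ready, so a waiter is pushed onto the list.
        assert!(fut.as_mut().poll(&mut cx).is_pending());

        // Driver side: publish readiness, then notify matching waiters.
        io.set_readiness(None, Tick::Set(0), |_| Ready::READABLE).unwrap();
        io.wake(Ready::READABLE);

        // The flagged waiter completes on the next poll.
        assert!(fut.as_mut().poll(&mut cx).is_ready());
    }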
}