// corcovado 0.1.3
//
// Non-blocking IO library
// Documentation
use std::collections::hash_map;
use std::fmt;
use std::mem;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
use sys;
use sys::fuchsia::{
    assert_fuchsia_ready_repr, epoll_event_to_ready, poll_opts_to_wait_async, EventedFd,
    EventedFdInner, FuchsiaReady,
};
use zircon;
use zircon::AsHandleRef;
use zircon_sys::zx_handle_t;
use {io, Event, PollOpt, Ready, Token};

/// The kind of registration -- file descriptor or raw handle.
///
/// The kind is carried in the most-significant bit (bit 63) of the 64-bit port key
/// rather than in the token itself: `key_from_token_and_type` sets that bit for
/// `Handle` registrations and leaves it clear for `Fd`, and `token_and_type_from_key`
/// reads it back when a packet arrives.
#[derive(Copy, Clone, Eq, PartialEq)]
enum RegType {
    /// Registration whose key has bit 63 clear.
    Fd,
    /// Registration whose key has bit 63 set.
    Handle,
}

/// Builds the 64-bit port key for `token`, tagging it with the registration kind.
///
/// Bit 63 of the key is reserved as the kind tag: it stays clear for `RegType::Fd`
/// and is set for `RegType::Handle`.
///
/// # Errors
/// Returns `InvalidInput` when the token already has bit 63 set, because the tag
/// could then not be told apart from the token value.
fn key_from_token_and_type(token: Token, reg_type: RegType) -> io::Result<u64> {
    const TYPE_BIT: u64 = 1 << 63;

    let raw = token.0 as u64;
    if raw & TYPE_BIT != 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "Most-significant bit of token must remain unset.",
        ));
    }

    let tag = match reg_type {
        RegType::Fd => 0,
        RegType::Handle => TYPE_BIT,
    };
    Ok(raw | tag)
}

/// Splits a port key back into the token and the registration kind.
///
/// Bit 63 selects the kind (clear means `Fd`, set means `Handle`); the remaining
/// bits are the token value.
fn token_and_type_from_key(key: u64) -> (Token, RegType) {
    const TYPE_BIT: u64 = 1 << 63;

    let reg_type = match key & TYPE_BIT {
        0 => RegType::Fd,
        _ => RegType::Handle,
    };
    let token = Token((key & !TYPE_BIT) as usize);

    (token, reg_type)
}

/// Counter from which every `Selector` draws its globally unique(ish) ID.
///
/// The ID gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
/// registered with the `Selector`. If a type that is previously associated with
/// a `Selector` attempts to register itself with a different `Selector`, the
/// operation will return with an error. This matches Windows behavior.
///
/// The counter starts at zero; `Selector::new` adds one to the fetched value so
/// that no selector is ever handed the ID 0.
static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;

/// Event selector backed by a Zircon port.
///
/// Holds the port that registrations are made against, the bookkeeping needed to look
/// up file-descriptor registrations by token, and the list of level-triggered
/// registrations that must be re-armed before the next wait.
pub struct Selector {
    /// Identifier assigned in `Selector::new` from the `NEXT_ID` counter (never 0).
    id: usize,

    /// Zircon object on which the handles have been registered, and on which events occur
    port: Arc<zircon::Port>,

    /// Whether or not `tokens_to_rereg` contains any elements. This is a best-effort attempt
    /// used to prevent having to lock `tokens_to_rereg` when it is empty.
    has_tokens_to_rereg: AtomicBool,

    /// List of `Token`s corresponding to registrations that need to be reregistered before the
    /// next `port::wait`. This is necessary to provide level-triggered behavior for
    /// `Async::repeating` registrations.
    ///
    /// When a level-triggered `Async::repeating` event is seen, its token is added to this list so
    /// that it will be reregistered before the next `port::wait` call, making `port::wait` return
    /// immediately if the signal was high during the reregistration.
    ///
    /// Note: when used at the same time, the `tokens_to_rereg` lock should be taken out _before_
    /// `token_to_fd`.
    tokens_to_rereg: Mutex<Vec<Token>>,

    /// Map from tokens to weak references to `EventedFdInner`-- a structure describing a
    /// file handle, its associated `fdio` object, and its current registration.
    token_to_fd: Mutex<hash_map::HashMap<Token, Weak<EventedFdInner>>>,
}

impl Selector {
    /// Creates a selector with a fresh Zircon port and no registrations.
    ///
    /// # Errors
    /// Fails if the port cannot be created.
    pub fn new() -> io::Result<Selector> {
        // Defined in fuchsia/ready.rs: checks that FuchsiaReady's representation is
        // compatible with Ready.
        assert_fuchsia_ready_repr();

        let port = zircon::Port::create(zircon::PortOpts::Default)?;

        // The counter starts at zero, so add one to keep 0 from ever being a selector id.
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;

        Ok(Selector {
            id,
            port: Arc::new(port),
            has_tokens_to_rereg: AtomicBool::new(false),
            tokens_to_rereg: Mutex::new(Vec::new()),
            token_to_fd: Mutex::new(hash_map::HashMap::new()),
        })
    }

    /// Returns the identifier assigned to this selector when it was created.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Returns a reference to the `Arc` holding the Zircon port this selector waits on.
    ///
    /// The `Arc` is borrowed, not cloned; callers that need to hold on to the port
    /// can clone it themselves.
    pub fn port(&self) -> &Arc<zircon::Port> {
        &self.port
    }

    /// Re-arms every registration queued in `tokens_to_rereg`, then clears the queue.
    ///
    /// Does nothing (and takes no lock) when `has_tokens_to_rereg` is not set. Tokens whose
    /// fd entry is gone, or whose weak reference can no longer be upgraded, are skipped.
    fn reregister_handles(&self) -> io::Result<()> {
        // `Acquire` here pairs with the `Release` store made after a token is queued, so
        // the queued tokens are visible once the flag is observed as set.
        if !self.has_tokens_to_rereg.load(Ordering::Acquire) {
            return Ok(());
        }

        // Lock order matters: `tokens_to_rereg` first, then `token_to_fd`.
        let mut pending = self.tokens_to_rereg.lock().unwrap();
        let fds = self.token_to_fd.lock().unwrap();

        for token in pending.drain(..) {
            let live = fds.get(&token).and_then(|weak| weak.upgrade());
            if let Some(inner) = live {
                inner.rereg_for_level(&self.port);
            }
        }

        // Cleared while the queue lock is still held.
        self.has_tokens_to_rereg.store(false, Ordering::Release);
        Ok(())
    }

    pub fn select(
        &self,
        evts: &mut Events,
        _awakener: Token,
        timeout: Option<Duration>,
    ) -> io::Result<bool> {
        evts.clear();

        self.reregister_handles()?;

        let deadline = match timeout {
            Some(duration) => {
                let nanos = duration
                    .as_secs()
                    .saturating_mul(1_000_000_000)
                    .saturating_add(duration.subsec_nanos() as u64);

                zircon::deadline_after(nanos)
            }
            None => zircon::ZX_TIME_INFINITE,
        };

        let packet = match self.port.wait(deadline) {
            Ok(packet) => packet,
            Err(zircon::Status::ErrTimedOut) => return Ok(false),
            Err(e) => Err(e)?,
        };

        let observed_signals = match packet.contents() {
            zircon::PacketContents::SignalOne(signal_packet) => signal_packet.observed(),
            zircon::PacketContents::SignalRep(signal_packet) => signal_packet.observed(),
            zircon::PacketContents::User(_user_packet) => {
                // User packets are only ever sent by an Awakener
                return Ok(true);
            }
        };

        let key = packet.key();
        let (token, reg_type) = token_and_type_from_key(key);

        match reg_type {
            RegType::Handle => {
                // We can return immediately-- no lookup or registration necessary.
                evts.events
                    .push(Event::new(Ready::from(observed_signals), token));
                Ok(false)
            }
            RegType::Fd => {
                // Convert the signals to epoll events using __fdio_wait_end,
                // and add to reregistration list if necessary.
                let events: u32;
                {
                    let handle = if let Some(handle) = self
                        .token_to_fd
                        .lock()
                        .unwrap()
                        .get(&token)
                        .and_then(|h| h.upgrade())
                    {
                        handle
                    } else {
                        // This handle is apparently in the process of removal.
                        // It has been removed from the list, but port_cancel has not been called.
                        return Ok(false);
                    };

                    events = unsafe {
                        let mut events: u32 = mem::uninitialized();
                        sys::fuchsia::sys::__fdio_wait_end(
                            handle.fdio(),
                            observed_signals,
                            &mut events,
                        );
                        events
                    };

                    // If necessary, queue to be reregistered before next port_await
                    let needs_to_rereg = {
                        let registration_lock = handle.registration().lock().unwrap();

                        registration_lock
                            .as_ref()
                            .and_then(|r| r.rereg_signals())
                            .is_some()
                    };

                    if needs_to_rereg {
                        let mut tokens_to_rereg_lock =
                            self.tokens_to_rereg.lock().unwrap();
                        tokens_to_rereg_lock.push(token);
                        // We use `Ordering::Release` to make sure that we see all `tokens_to_rereg`
                        // written before the store.
                        self.has_tokens_to_rereg.store(true, Ordering::Release);
                    }
                }

                evts.events
                    .push(Event::new(epoll_event_to_ready(events), token));
                Ok(false)
            }
        }
    }

    /// Register event interests for the given IO handle with the OS
    pub fn register_fd(
        &self,
        handle: &zircon::Handle,
        fd: &EventedFd,
        token: Token,
        signals: zircon::Signals,
        poll_opts: PollOpt,
    ) -> io::Result<()> {
        {
            let mut token_to_fd = self.token_to_fd.lock().unwrap();
            match token_to_fd.entry(token) {
                hash_map::Entry::Occupied(_) => {
                    return Err(io::Error::new(
                        io::ErrorKind::AlreadyExists,
                        "Attempted to register a filedescriptor on an existing token.",
                    ))
                }
                hash_map::Entry::Vacant(slot) => slot.insert(Arc::downgrade(&fd.inner)),
            };
        }

        let wait_async_opts = poll_opts_to_wait_async(poll_opts);

        let wait_res = handle.wait_async_handle(
            &self.port,
            token.0 as u64,
            signals,
            wait_async_opts,
        );

        if wait_res.is_err() {
            self.token_to_fd.lock().unwrap().remove(&token);
        }

        Ok(wait_res?)
    }

    /// Deregister event interests for the given IO handle with the OS
    pub fn deregister_fd(&self, handle: &zircon::Handle, token: Token) -> io::Result<()> {
        self.token_to_fd.lock().unwrap().remove(&token);

        // We ignore NotFound errors since oneshots are automatically deregistered,
        // but mio will attempt to deregister them manually.
        self.port
            .cancel(&*handle, token.0 as u64)
            .map_err(io::Error::from)
            .or_else(|e| {
                if e.kind() == io::ErrorKind::NotFound {
                    Ok(())
                } else {
                    Err(e)
                }
            })
    }

    /// Registers interest in `interests` on a raw Zircon handle, delivering packets to
    /// this selector's port under a key derived from `token`.
    ///
    /// The raw handle is only borrowed: it is wrapped temporarily to make the call and
    /// the wrapper is forgotten afterwards so it is never dropped here.
    ///
    /// # Errors
    /// * `InvalidInput` for level-triggered options without oneshot.
    /// * `InvalidInput` if `token` has its most-significant bit set.
    /// * Whatever error the asynchronous wait request reports.
    pub fn register_handle(
        &self,
        handle: zx_handle_t,
        token: Token,
        interests: Ready,
        poll_opts: PollOpt,
    ) -> io::Result<()> {
        if poll_opts.is_level() && !poll_opts.is_oneshot() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Repeated level-triggered events are not supported on Fuchsia handles.",
            ));
        }

        // Do all fallible work BEFORE wrapping the raw handle. An early return between
        // `from_raw` and `mem::forget` would drop the wrapper; the `mem::forget` exists
        // to stop that drop, which presumably closes the caller's handle.
        let key = key_from_token_and_type(token, RegType::Handle)?;
        let signals = FuchsiaReady::from(interests).into_zx_signals();
        let wait_async_opts = poll_opts_to_wait_async(poll_opts);

        let temp_handle = unsafe { zircon::Handle::from_raw(handle) };

        let res = temp_handle.wait_async_handle(&self.port, key, signals, wait_async_opts);

        // The handle still belongs to the caller; never run the wrapper's destructor.
        mem::forget(temp_handle);

        Ok(res?)
    }

    /// Cancels the wait previously registered for a raw Zircon handle under `token`.
    ///
    /// The raw handle is only borrowed: it is wrapped temporarily to make the call and
    /// the wrapper is forgotten afterwards so it is never dropped here.
    ///
    /// # Errors
    /// * `InvalidInput` if `token` has its most-significant bit set.
    /// * Whatever error the port reports for the cancellation.
    pub fn deregister_handle(&self, handle: zx_handle_t, token: Token) -> io::Result<()> {
        // Validate the token BEFORE wrapping the raw handle. An early return between
        // `from_raw` and `mem::forget` would drop the wrapper; the `mem::forget` exists
        // to stop that drop, which presumably closes the caller's handle.
        let key = key_from_token_and_type(token, RegType::Handle)?;

        let temp_handle = unsafe { zircon::Handle::from_raw(handle) };
        let res = self.port.cancel(&temp_handle, key);

        // The handle still belongs to the caller; never run the wrapper's destructor.
        mem::forget(temp_handle);

        Ok(res?)
    }
}

/// Buffer that `Selector::select` fills with the events it produces.
pub struct Events {
    events: Vec<Event>,
}

impl Events {
    /// Creates an empty buffer.
    ///
    /// The requested capacity is ignored: the Fuchsia selector only handles one event
    /// at a time, so room for a single event is reserved instead.
    pub fn with_capacity(_u: usize) -> Events {
        Events {
            events: Vec::with_capacity(1),
        }
    }

    /// Number of events currently stored.
    pub fn len(&self) -> usize {
        self.events.len()
    }

    /// Number of events the buffer can hold without reallocating.
    pub fn capacity(&self) -> usize {
        self.events.capacity()
    }

    /// Returns `true` when no events are stored.
    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }

    /// Returns a copy of the event at `idx`, or `None` if `idx` is out of range.
    pub fn get(&self, idx: usize) -> Option<Event> {
        self.events.get(idx).cloned()
    }

    /// Appends `event` to the buffer.
    pub fn push_event(&mut self, event: Event) {
        self.events.push(event)
    }

    /// Removes every stored event, keeping the allocation for reuse.
    pub fn clear(&mut self) {
        // `self.events` is the `Vec` itself; it has no nested `events` field.
        self.events.clear();
    }
}

/// Debug output shows only how many events are stored, not the events themselves.
impl fmt::Debug for Events {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let stored = self.len();
        let mut out = f.debug_struct("Events");
        out.field("len", &stored);
        out.finish()
    }
}