kernel 1.1.0

Abstract Reactive Streams
Documentation

use io::ready::Ready;
use io::options::PollOpt;
use io::token::Token;
use io::event::Event;
use io::event;
use std::io;
use nix::libc::timespec;
use nix::unistd::close;
use nix::sys::event::{EventFilter, EventFlag, FilterFlag, KEvent, kqueue, kevent, kevent_ts};
use nix::sys::event::{EV_ADD, EV_CLEAR, EV_DELETE, EV_DISABLE, EV_ENABLE, EV_EOF, EV_ERROR, EV_ONESHOT};
use libc::{self, time_t};
use std::{cmp, fmt, slice};
use std::cell::UnsafeCell;
use std::os::unix::io::RawFd;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::time::Duration;

static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;

pub fn from_nix_error(err: ::nix::Error) -> io::Error {
    io::Error::from_raw_os_error(err.errno() as i32)
}

pub struct Selector {
    id: usize,
    kq: RawFd,
    changes: UnsafeCell<Vec<KEvent>>,
}

#[cfg(not(target_os = "netbsd"))]
type UData = usize;

#[cfg(target_os = "netbsd")]
type UData = isize;

impl Selector {
    pub fn new() -> io::Result<Selector> {
        // offset by 1 to avoid choosing 0 as the id of a selector
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
        let kq = try!(kqueue().map_err(from_nix_error));

        unsafe {
            libc::ioctl(kq, libc::FIOCLEX);
        }

        Ok(Selector {
            id: id,
            kq: kq,
            changes: UnsafeCell::new(vec![]),
        })
    }

    pub fn id(&self) -> usize {
        self.id
    }

    pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> {
        let timeout = timeout.map(|to| {
            timespec {
                tv_sec: cmp::min(to.as_secs(), time_t::max_value() as u64) as time_t,
                tv_nsec: to.subsec_nanos() as i64,
            }
        });

        let cnt = try!(kevent_ts(self.kq, &[], evts.as_mut_slice(), timeout).map_err(from_nix_error));

        self.mut_changes().clear();

        unsafe {
            evts.sys_events.set_len(cnt);
        }

        Ok(evts.coalesce(awakener))
    }

    pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
        trace!("KEVENT registering; token={:?}; interests={:?}",
               token,
               interests);

        self.ev_register(fd,
                         usize::from(token),
                         EventFilter::EVFILT_READ,
                         interests.contains(Ready::readable()),
                         opts);
        self.ev_register(fd,
                         usize::from(token),
                         EventFilter::EVFILT_WRITE,
                         interests.contains(Ready::writable()),
                         opts);

        self.flush_changes()
    }

    pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
        // Just need to call register here since EV_ADD is a mod if already
        // registered
        self.register(fd, token, interests, opts)
    }

    pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
        self.ev_push(fd, 0, EventFilter::EVFILT_READ, EV_DELETE);
        self.ev_push(fd, 0, EventFilter::EVFILT_WRITE, EV_DELETE);

        self.flush_changes()
    }

    fn ev_register(&self, fd: RawFd, token: usize, filter: EventFilter, enable: bool, opts: PollOpt) {
        let mut flags = EV_ADD;

        if enable {
            flags = flags | EV_ENABLE;
        } else {
            flags = flags | EV_DISABLE;
        }

        if opts.contains(PollOpt::edge()) {
            flags = flags | EV_CLEAR;
        }

        if opts.contains(PollOpt::oneshot()) {
            flags = flags | EV_ONESHOT;
        }

        self.ev_push(fd, token, filter, flags);
    }

    fn ev_push(&self, fd: RawFd, token: usize, filter: EventFilter, flags: EventFlag) {
        self.mut_changes().push(KEvent {
            ident: fd as ::libc::uintptr_t,
            filter: filter,
            flags: flags,
            fflags: FilterFlag::empty(),
            data: 0,
            udata: token as UData,
        });
    }

    fn flush_changes(&self) -> io::Result<()> {
        let result = kevent(self.kq, self.changes(), &mut [], 0)
            .map(|_| ())
            .map_err(from_nix_error)
            .map(|_| ());

        self.mut_changes().clear();
        result
    }

    fn changes(&self) -> &[KEvent] {
        unsafe { &(*self.changes.get())[..] }
    }

    fn mut_changes(&self) -> &mut Vec<KEvent> {
        unsafe { &mut *self.changes.get() }
    }
}

impl fmt::Debug for Selector {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Selector")
            .field("id", &self.id)
            .field("kq", &self.kq)
            .field("changes", &self.changes().len())
            .finish()
    }
}

impl Drop for Selector {
    fn drop(&mut self) {
        let _ = close(self.kq);
    }
}

pub struct Events {
    sys_events: Vec<KEvent>,
    events: Vec<Event>,
    event_map: HashMap<Token, usize>,
}

impl Events {
    pub fn with_capacity(cap: usize) -> Events {
        Events {
            sys_events: Vec::with_capacity(cap),
            events: Vec::with_capacity(cap),
            event_map: HashMap::with_capacity(cap),
        }
    }

    #[inline]
    pub fn len(&self) -> usize {
        self.events.len()
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }

    pub fn get(&self, idx: usize) -> Option<Event> {
        self.events.get(idx).map(|e| *e)
    }

    fn coalesce(&mut self, awakener: Token) -> bool {
        let mut ret = false;
        self.events.clear();
        self.event_map.clear();

        for e in &self.sys_events {
            let token = Token(e.udata as usize);
            let len = self.events.len();

            if token == awakener {
                // TODO: Should this return an error if event is an error. It
                // is not critical as spurious wakeups are permitted.
                ret = true;
                continue;
            }

            let idx = *self.event_map
                .entry(token)
                .or_insert(len);

            if idx == len {
                // New entry, insert the default
                self.events.push(Event::new(Ready::none(), token));

            }

            if e.flags.contains(EV_ERROR) {
                event::kind_mut(&mut self.events[idx]).insert(Ready::error());
            }

            if e.filter == EventFilter::EVFILT_READ {
                event::kind_mut(&mut self.events[idx]).insert(Ready::readable());
            } else if e.filter == EventFilter::EVFILT_WRITE {
                event::kind_mut(&mut self.events[idx]).insert(Ready::writable());
            }

            if e.flags.contains(EV_EOF) {
                event::kind_mut(&mut self.events[idx]).insert(Ready::hup());

                // When the read end of the socket is closed, EV_EOF is set on
                // flags, and fflags contains the error if there is one.
                if !e.fflags.is_empty() {
                    event::kind_mut(&mut self.events[idx]).insert(Ready::error());
                }
            }
        }

        ret
    }

    pub fn push_event(&mut self, event: Event) {
        self.events.push(event);
    }

    fn as_mut_slice(&mut self) -> &mut [KEvent] {
        unsafe {
            let ptr = (&mut self.sys_events[..]).as_mut_ptr();
            slice::from_raw_parts_mut(ptr, self.sys_events.capacity())
        }
    }
}

impl fmt::Debug for Events {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(fmt, "Events {{ len: {} }}", self.sys_events.len())
    }
}

#[derive(Clone, Copy)]
#[repr(C, packed)]
pub struct Notify {
    pub fd: RawFd,
}

impl Notify {
    pub fn new() -> Self {
        Notify { fd: !0 as i32 }
    }
    pub fn send(&self) {}
    pub fn wait(&self) {}
}