use std::os::unix::io::RawFd;
use std::collections::HashMap;
use std::slice;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::os::unix::io::AsRawFd;
use nix::sys::event::{ev_set, kqueue, kevent, KEvent, EventFilter, EventFlag, FilterFlag};
use libc::{uintptr_t, intptr_t};
use std::io::Result;
use event::Event;
use notification::Notification;
use user_event::UserEvent;
use nix_err_to_io_err;
type UserData = intptr_t;
static KQUEUE_EVENT_SIZE: usize = 1024;
pub struct KernelPoller {
kqueue: RawFd,
registrar: KernelRegistrar,
eventlist: Vec<KEvent>,
notifications: HashMap<RawFd, Notification>
}
impl KernelPoller {
pub fn new() -> Result<KernelPoller> {
let kq = kqueue().map_err(nix_err_to_io_err)?;
let registrations = Arc::new(AtomicUsize::new(1));
Ok(KernelPoller {
kqueue: kq,
registrar: KernelRegistrar::new(kq, registrations),
eventlist: Vec::with_capacity(KQUEUE_EVENT_SIZE),
notifications: HashMap::with_capacity(KQUEUE_EVENT_SIZE)
})
}
pub fn get_registrar(&self) -> KernelRegistrar {
self.registrar.clone()
}
pub fn wait(&mut self, timeout_ms: usize) -> Result<Vec<Notification>> {
let dst = unsafe {
slice::from_raw_parts_mut(self.eventlist.as_mut_ptr(), self.eventlist.capacity())
};
let count = kevent(self.kqueue, &[], dst, timeout_ms).map_err(nix_err_to_io_err)?;
unsafe { self.eventlist.set_len(count); }
self.coalesce_events();
Ok(self.notifications.drain().map(|(_, v)| v).collect())
}
fn coalesce_events(&mut self) {
for e in self.eventlist.drain(..) {
let event = event_from_filter(e.filter());
let new_notification = Notification {
id: e.udata() as usize,
event: event.clone()
};
let mut notification = self.notifications.entry(e.ident() as RawFd)
.or_insert(new_notification);
if notification.event != event {
notification.event = Event::Both
}
}
}
}
#[derive(Debug, Clone)]
pub struct KernelRegistrar {
kqueue: RawFd,
total_registrations: Arc<AtomicUsize>
}
impl KernelRegistrar {
fn new(kq: RawFd, registrations: Arc<AtomicUsize>) -> KernelRegistrar {
KernelRegistrar {
kqueue: kq,
total_registrations: registrations
}
}
pub fn register<T: AsRawFd>(&self, sock: &T, event: Event) -> Result<usize> {
let sock_fd = sock.as_raw_fd();
let id = self.total_registrations.fetch_add(1, Ordering::SeqCst);
let changes = make_changelist(sock_fd, event, id as UserData);
kevent(self.kqueue, &changes, &mut[], 0).map_err(nix_err_to_io_err)?;
Ok(id)
}
pub fn reregister<T: AsRawFd>(&self, id: usize, sock: &T, event: Event) -> Result<()> {
let sock_fd = sock.as_raw_fd();
let changes = make_changelist(sock_fd, event, id as UserData);
kevent(self.kqueue, &changes, &mut[], 0).map_err(nix_err_to_io_err)?;
Ok(())
}
pub fn deregister<T: AsRawFd>(&self, sock: &T) -> Result<()> {
let sock_fd = sock.as_raw_fd();
let mut changes = make_changelist(sock_fd, Event::Both, 0);
for e in changes.iter_mut() {
set_flags(e, EventFlag::EV_DELETE);
}
let _ = kevent(self.kqueue, &changes, &mut[], 0);
Ok(())
}
pub fn register_user_event(&mut self) -> Result<UserEvent> {
let id = self.total_registrations.fetch_add(1, Ordering::SeqCst);
let changes = vec![make_user_event(id)];
kevent(self.kqueue, &changes, &mut[], 0).map_err(nix_err_to_io_err)?;
Ok(UserEvent {id: id, registrar: self.clone()})
}
pub fn trigger_user_event(&self, event: &UserEvent) -> Result<()> {
let mut e = make_user_event(event.get_id());
set_flags(&mut e, EventFlag::EV_ENABLE);
set_fflags(&mut e, FilterFlag::NOTE_TRIGGER);
kevent(self.kqueue, &vec![e], &mut[], 0).map_err(nix_err_to_io_err)?;
Ok(())
}
pub fn clear_user_event(&self, event: &UserEvent) -> Result<()> {
let mut user_event = make_user_event(event.get_id());
set_flags(&mut user_event, EventFlag::EV_DISABLE);
kevent(self.kqueue, &vec![user_event], &mut[], 0).map_err(nix_err_to_io_err)?;
Ok(())
}
pub fn deregister_user_event(&self, event_id: usize) -> Result<()> {
let mut user_event = make_user_event(event_id);
set_flags(&mut user_event, EventFlag::EV_DELETE);
kevent(self.kqueue, &vec![user_event], &mut[], 0).map_err(nix_err_to_io_err)?;
Ok(())
}
pub fn set_timeout(&self, timeout: usize) -> Result<usize> {
self.set_timer(timeout, false)
}
pub fn set_interval(&self, timeout: usize) -> Result<usize> {
self.set_timer(timeout, true)
}
pub fn cancel_timeout(&self, timer_id: usize) -> Result<()> {
let mut e = make_timer(timer_id, 0, false);
set_flags(&mut e, EventFlag::EV_DELETE);
kevent(self.kqueue, &vec![e], &mut[], 0).map_err(nix_err_to_io_err)?;
Ok(())
}
fn set_timer(&self, timeout: usize, recurring: bool) -> Result<usize> {
let id = self.total_registrations.fetch_add(1, Ordering::SeqCst);
let changes = vec![make_timer(id, timeout, recurring)];
kevent(self.kqueue, &changes, &mut[], 0).map_err(nix_err_to_io_err)?;
Ok(id)
}
}
fn event_from_filter(filter: EventFilter) -> Event {
if filter == EventFilter::EVFILT_READ ||
filter == EventFilter::EVFILT_TIMER ||
filter == EventFilter::EVFILT_USER {
Event::Read
} else {
Event::Write
}
}
fn make_changelist(sock_fd: RawFd, event: Event, user_data: UserData) -> Vec<KEvent> {
let mut ev = KEvent::new(
sock_fd as uintptr_t,
EventFilter::EVFILT_READ,
EventFlag::EV_ADD | EventFlag::EV_CLEAR,
FilterFlag::empty(),
0,
user_data
);
match event {
Event::Read => vec![ev],
Event::Write => {
set_filter(&mut ev, EventFilter::EVFILT_WRITE);
vec![ev]
},
Event::Both => vec![ev, KEvent::new(ev.ident(), EventFilter::EVFILT_WRITE, ev.flags(), ev.fflags(), ev.data(), ev.udata())]
}
}
fn make_user_event(id: usize) -> KEvent {
KEvent::new(
id as uintptr_t,
EventFilter::EVFILT_USER,
EventFlag::EV_ADD | EventFlag::EV_CLEAR | EventFlag::EV_ENABLE,
FilterFlag::empty(),
0,
id as UserData
)
}
fn set_filter(e: &mut KEvent, filter: EventFilter) {
let ident = e.ident();
let flags = e.flags();
let fflags = e.fflags();
let udata = e.udata();
ev_set(e, ident, filter, flags, fflags, udata);
}
fn set_flags(e: &mut KEvent, flags: EventFlag) {
let ident = e.ident();
let filter = e.filter();
let fflags = e.fflags();
let udata = e.udata();
ev_set(e, ident, filter, flags, fflags, udata);
}
fn set_fflags(e: &mut KEvent, fflags: FilterFlag) {
let ident = e.ident();
let filter = e.filter();
let flags = e.flags();
let udata = e.udata();
ev_set(e, ident, filter, flags, fflags, udata);
}
fn make_timer(id: usize, timeout: usize, recurring: bool) -> KEvent {
let mut flags = EventFlag::EV_ADD;
if !recurring {
flags |= EventFlag::EV_ONESHOT;
}
let ev = KEvent::new(
id as uintptr_t,
EventFilter::EVFILT_TIMER,
flags,
FilterFlag::empty(), timeout as intptr_t,
id as UserData
);
ev
}