use std::cell::{Cell, Ref, RefCell};
use std::cmp::min;
use std::fs::File;
use std::i32;
use std::i64;
use std::marker::PhantomData;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::ptr::null_mut;
use std::slice;
use std::thread;
use std::time::Duration;
use libc::{
c_int, epoll_create1, epoll_ctl, epoll_event, epoll_wait, EPOLLHUP, EPOLLIN, EPOLLOUT,
EPOLL_CLOEXEC, EPOLL_CTL_ADD, EPOLL_CTL_DEL, EPOLL_CTL_MOD,
};
use {errno_result, Result};
const POLL_CONTEXT_MAX_EVENTS: usize = 16;
pub struct EpollEvents(RefCell<[epoll_event; POLL_CONTEXT_MAX_EVENTS]>);
impl EpollEvents {
pub fn new() -> EpollEvents {
EpollEvents(RefCell::new(
[epoll_event { events: 0, u64: 0 }; POLL_CONTEXT_MAX_EVENTS],
))
}
}
pub trait PollToken {
fn as_raw_token(&self) -> u64;
fn from_raw_token(data: u64) -> Self;
}
impl PollToken for usize {
fn as_raw_token(&self) -> u64 {
*self as u64
}
fn from_raw_token(data: u64) -> Self {
data as Self
}
}
impl PollToken for u64 {
fn as_raw_token(&self) -> u64 {
*self as u64
}
fn from_raw_token(data: u64) -> Self {
data as Self
}
}
impl PollToken for u32 {
fn as_raw_token(&self) -> u64 {
*self as u64
}
fn from_raw_token(data: u64) -> Self {
data as Self
}
}
impl PollToken for u16 {
fn as_raw_token(&self) -> u64 {
*self as u64
}
fn from_raw_token(data: u64) -> Self {
data as Self
}
}
impl PollToken for u8 {
fn as_raw_token(&self) -> u64 {
*self as u64
}
fn from_raw_token(data: u64) -> Self {
data as Self
}
}
impl PollToken for () {
fn as_raw_token(&self) -> u64 {
0
}
fn from_raw_token(_data: u64) -> Self {}
}
pub struct PollEvent<'a, T> {
event: &'a epoll_event,
token: PhantomData<T>, }
impl<'a, T: PollToken> PollEvent<'a, T> {
pub fn token(&self) -> T {
T::from_raw_token(self.event.u64)
}
pub fn readable(&self) -> bool {
self.event.events & (EPOLLIN as u32) != 0
}
pub fn hungup(&self) -> bool {
self.event.events & (EPOLLHUP as u32) != 0
}
}
pub struct PollEventIter<'a, I, T>
where
I: Iterator<Item = &'a epoll_event>,
{
mask: u32,
iter: I,
tokens: PhantomData<[T]>, }
impl<'a, I, T> Iterator for PollEventIter<'a, I, T>
where
I: Iterator<Item = &'a epoll_event>,
T: PollToken,
{
type Item = PollEvent<'a, T>;
fn next(&mut self) -> Option<Self::Item> {
let mask = self.mask;
self.iter
.find(|event| (event.events & mask) != 0)
.map(|event| PollEvent {
event,
token: PhantomData,
})
}
}
pub struct PollEvents<'a, T> {
count: usize,
events: Ref<'a, [epoll_event; POLL_CONTEXT_MAX_EVENTS]>,
tokens: PhantomData<[T]>, }
impl<'a, T: PollToken> PollEvents<'a, T> {
pub fn to_owned(&self) -> PollEventsOwned<T> {
PollEventsOwned {
count: self.count,
events: RefCell::new(*self.events),
tokens: PhantomData,
}
}
pub fn iter(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
PollEventIter {
mask: 0xffffffff,
iter: self.events[..self.count].iter(),
tokens: PhantomData,
}
}
pub fn iter_readable(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
PollEventIter {
mask: EPOLLIN as u32,
iter: self.events[..self.count].iter(),
tokens: PhantomData,
}
}
pub fn iter_hungup(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
PollEventIter {
mask: EPOLLHUP as u32,
iter: self.events[..self.count].iter(),
tokens: PhantomData,
}
}
}
pub struct PollEventsOwned<T> {
count: usize,
events: RefCell<[epoll_event; POLL_CONTEXT_MAX_EVENTS]>,
tokens: PhantomData<T>, }
impl<T: PollToken> PollEventsOwned<T> {
pub fn as_ref(&self) -> PollEvents<T> {
PollEvents {
count: self.count,
events: self.events.borrow(),
tokens: PhantomData,
}
}
}
pub struct WatchingEvents(u32);
impl WatchingEvents {
#[inline(always)]
pub fn empty() -> WatchingEvents {
WatchingEvents(0)
}
#[inline(always)]
pub fn new(raw: u32) -> WatchingEvents {
WatchingEvents(raw)
}
#[inline(always)]
pub fn set_read(self) -> WatchingEvents {
WatchingEvents(self.0 | EPOLLIN as u32)
}
#[inline(always)]
pub fn set_write(self) -> WatchingEvents {
WatchingEvents(self.0 | EPOLLOUT as u32)
}
pub fn get_raw(&self) -> u32 {
self.0
}
}
pub struct EpollContext<T> {
epoll_ctx: File,
tokens: PhantomData<[T]>,
}
impl<T: PollToken> EpollContext<T> {
pub fn new() -> Result<EpollContext<T>> {
let epoll_fd = unsafe { epoll_create1(EPOLL_CLOEXEC) };
if epoll_fd < 0 {
return errno_result();
}
Ok(EpollContext {
epoll_ctx: unsafe { File::from_raw_fd(epoll_fd) },
tokens: PhantomData,
})
}
pub fn add(&self, fd: &AsRawFd, token: T) -> Result<()> {
self.add_fd_with_events(fd, WatchingEvents::empty().set_read(), token)
}
pub fn add_fd_with_events(&self, fd: &AsRawFd, events: WatchingEvents, token: T) -> Result<()> {
let mut evt = epoll_event {
events: events.get_raw(),
u64: token.as_raw_token(),
};
let ret = unsafe {
epoll_ctl(
self.epoll_ctx.as_raw_fd(),
EPOLL_CTL_ADD,
fd.as_raw_fd(),
&mut evt,
)
};
if ret < 0 {
return errno_result();
};
Ok(())
}
pub fn modify(&self, fd: &AsRawFd, events: WatchingEvents, token: T) -> Result<()> {
let mut evt = epoll_event {
events: events.0,
u64: token.as_raw_token(),
};
let ret = unsafe {
epoll_ctl(
self.epoll_ctx.as_raw_fd(),
EPOLL_CTL_MOD,
fd.as_raw_fd(),
&mut evt,
)
};
if ret < 0 {
return errno_result();
};
Ok(())
}
pub fn delete(&self, fd: &AsRawFd) -> Result<()> {
let ret = unsafe {
epoll_ctl(
self.epoll_ctx.as_raw_fd(),
EPOLL_CTL_DEL,
fd.as_raw_fd(),
null_mut(),
)
};
if ret < 0 {
return errno_result();
};
Ok(())
}
pub fn wait<'a>(&self, events: &'a EpollEvents) -> Result<PollEvents<'a, T>> {
self.wait_timeout(events, Duration::new(i64::MAX as u64, 0))
}
pub fn wait_timeout<'a>(
&self,
events: &'a EpollEvents,
timeout: Duration,
) -> Result<PollEvents<'a, T>> {
let timeout_millis = if timeout.as_secs() as i64 == i64::max_value() {
-1
} else {
let millis = timeout
.as_secs()
.checked_mul(1_000)
.and_then(|ms| ms.checked_add(timeout.subsec_nanos() as u64 / 1_000_000))
.unwrap_or(i32::max_value() as u64);
min(i32::max_value() as u64, millis) as i32
};
let ret = {
let mut epoll_events = events.0.borrow_mut();
let max_events = epoll_events.len() as c_int;
unsafe {
handle_eintr_errno!(epoll_wait(
self.epoll_ctx.as_raw_fd(),
&mut epoll_events[0],
max_events,
timeout_millis
))
}
};
if ret < 0 {
return errno_result();
}
let epoll_events = events.0.borrow();
let events = PollEvents {
count: ret as usize,
events: epoll_events,
tokens: PhantomData,
};
Ok(events)
}
}
impl<T: PollToken> AsRawFd for EpollContext<T> {
fn as_raw_fd(&self) -> RawFd {
self.epoll_ctx.as_raw_fd()
}
}
impl<T: PollToken> IntoRawFd for EpollContext<T> {
fn into_raw_fd(self) -> RawFd {
self.epoll_ctx.into_raw_fd()
}
}
pub struct PollContext<T> {
epoll_ctx: EpollContext<T>,
events: EpollEvents,
hangups: Cell<usize>,
max_hangups: Cell<usize>,
}
impl<T: PollToken> PollContext<T> {
pub fn new() -> Result<PollContext<T>> {
Ok(PollContext {
epoll_ctx: EpollContext::new()?,
events: EpollEvents::new(),
hangups: Cell::new(0),
max_hangups: Cell::new(0),
})
}
pub fn add(&self, fd: &AsRawFd, token: T) -> Result<()> {
self.add_fd_with_events(fd, WatchingEvents::empty().set_read(), token)
}
pub fn add_fd_with_events(&self, fd: &AsRawFd, events: WatchingEvents, token: T) -> Result<()> {
self.epoll_ctx.add_fd_with_events(fd, events, token)?;
self.hangups.set(0);
self.max_hangups.set(self.max_hangups.get() + 1);
Ok(())
}
pub fn modify(&self, fd: &AsRawFd, events: WatchingEvents, token: T) -> Result<()> {
self.epoll_ctx.modify(fd, events, token)
}
pub fn delete(&self, fd: &AsRawFd) -> Result<()> {
self.epoll_ctx.delete(fd)?;
self.hangups.set(0);
self.max_hangups.set(self.max_hangups.get() - 1);
Ok(())
}
fn check_for_hungup_busy_loop(&self, new_hangups: usize) {
let old_hangups = self.hangups.get();
let max_hangups = self.max_hangups.get();
if old_hangups <= max_hangups && old_hangups + new_hangups > max_hangups {
warn!(
"busy poll wait loop with hungup FDs detected on thread {}",
thread::current().name().unwrap_or("")
);
#[cfg(test)]
panic!("hungup busy loop detected");
}
self.hangups.set(old_hangups + new_hangups);
}
pub fn wait(&self) -> Result<PollEvents<T>> {
self.wait_timeout(Duration::new(i64::MAX as u64, 0))
}
pub fn wait_timeout(&self, timeout: Duration) -> Result<PollEvents<T>> {
let events = self.epoll_ctx.wait_timeout(&self.events, timeout)?;
let hangups = events.iter_hungup().count();
self.check_for_hungup_busy_loop(hangups);
Ok(events)
}
}
impl<T: PollToken> AsRawFd for PollContext<T> {
fn as_raw_fd(&self) -> RawFd {
self.epoll_ctx.as_raw_fd()
}
}
impl<T: PollToken> IntoRawFd for PollContext<T> {
fn into_raw_fd(self) -> RawFd {
self.epoll_ctx.into_raw_fd()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::os::unix::net::UnixStream;
use std::time::Instant;
use EventFd;
#[test]
fn poll_context() {
let evt1 = EventFd::new().unwrap();
let evt2 = EventFd::new().unwrap();
evt1.write(1).unwrap();
evt2.write(1).unwrap();
let ctx: PollContext<u32> = PollContext::new().unwrap();
ctx.add(&evt1, 1).unwrap();
ctx.add(&evt2, 2).unwrap();
let mut evt_count = 0;
while evt_count < 2 {
for event in ctx.wait().unwrap().iter_readable() {
evt_count += 1;
match event.token() {
1 => {
evt1.read().unwrap();
ctx.delete(&evt1).unwrap();
}
2 => {
evt2.read().unwrap();
ctx.delete(&evt2).unwrap();
}
_ => panic!("unexpected token"),
};
}
}
assert_eq!(evt_count, 2);
}
#[test]
fn poll_context_overflow() {
const EVT_COUNT: usize = POLL_CONTEXT_MAX_EVENTS * 2 + 1;
let ctx: PollContext<usize> = PollContext::new().unwrap();
let mut evts = Vec::with_capacity(EVT_COUNT);
for i in 0..EVT_COUNT {
let evt = EventFd::new().unwrap();
evt.write(1).unwrap();
ctx.add(&evt, i).unwrap();
evts.push(evt);
}
let mut evt_count = 0;
while evt_count < EVT_COUNT {
for event in ctx.wait().unwrap().iter_readable() {
evts[event.token()].read().unwrap();
evt_count += 1;
}
}
}
#[test]
#[should_panic]
fn poll_context_hungup() {
let (s1, s2) = UnixStream::pair().unwrap();
let ctx: PollContext<u32> = PollContext::new().unwrap();
ctx.add(&s1, 1).unwrap();
drop(s2);
for _ in 0..1000 {
ctx.wait().unwrap();
}
}
#[test]
fn poll_context_timeout() {
let ctx: PollContext<u32> = PollContext::new().unwrap();
let dur = Duration::from_millis(10);
let start_inst = Instant::now();
ctx.wait_timeout(dur).unwrap();
assert!(start_inst.elapsed() >= dur);
}
#[test]
#[allow(dead_code)]
fn poll_token_derive() {
#[derive(PollToken)]
enum EmptyToken {}
#[derive(PartialEq, Debug, PollToken)]
enum Token {
Alpha,
Beta,
Gamma(u32),
Delta { index: usize },
Omega,
}
assert_eq!(
Token::from_raw_token(Token::Alpha.as_raw_token()),
Token::Alpha
);
assert_eq!(
Token::from_raw_token(Token::Beta.as_raw_token()),
Token::Beta
);
assert_eq!(
Token::from_raw_token(Token::Gamma(55).as_raw_token()),
Token::Gamma(55)
);
assert_eq!(
Token::from_raw_token(Token::Delta { index: 100 }.as_raw_token()),
Token::Delta { index: 100 }
);
assert_eq!(
Token::from_raw_token(Token::Omega.as_raw_token()),
Token::Omega
);
}
}