use std::borrow::Borrow;
use std::io::Error;
use std::iter::Enumerate;
use std::os::unix::io::AsRawFd;
use std::os::unix::net::UnixStream;
use std::slice::Iter;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use libc::{self, c_int};
use pipe;
use SigId;
/// Number of per-signal slots kept by this module.
///
/// It is the length of both the `pending` flag vector and the registration-id vector built in
/// `Signals::new`, and `Signals::add_signal` asserts that every signal number is below it.
const MAX_SIGNUM: usize = 128;
/// State shared (through an `Arc`) between all clones of a `Signals` and the actions registered
/// for its signals.
#[derive(Debug)]
struct Waker {
    /// One flag per signal number. The action installed by `Signals::add_signal` stores `true`
    /// here; the iterators swap it back to `false` when they report the signal.
    pending: Vec<AtomicBool>,
    /// Set by `Signals::close`; checked by `Signals::is_closed` and `Signals::flush`.
    closed: AtomicBool,
    /// The end of the socket pair that `Signals::flush` receives from.
    read: UnixStream,
    /// The end of the socket pair whose descriptor is handed to `pipe::wake`.
    write: UnixStream,
}

impl Waker {
    /// Calls `pipe::wake` on the descriptor of the `write` socket.
    ///
    /// NOTE(review): `pipe::wake` is defined elsewhere; presumably it writes to the socket so
    /// that a reader blocked on `read` returns. Confirm against that module.
    fn wake(&self) {
        let write_fd = self.write.as_raw_fd();
        pipe::wake(write_fd);
    }
}
/// Registration ids of the signals hooked on behalf of one group of cloned `Signals`.
///
/// The vector is indexed by signal number. A slot is `Some(id)` once `Signals::add_signal` has
/// successfully registered that signal and `None` otherwise. Dropping the value unregisters
/// every stored id.
#[derive(Debug)]
struct RegisteredSignals(Mutex<Vec<Option<SigId>>>);

impl Drop for RegisteredSignals {
    /// Unregisters every signal action recorded in the vector.
    ///
    /// This never panics because of a poisoned mutex. `Signals::add_signal` holds the lock while
    /// it calls `::register`, so a panic in there poisons the mutex; the drop that follows
    /// during unwinding must not panic a second time (that would abort the process and leave
    /// the already registered actions installed).
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so the data can be reached without locking.
        // A slot is only written after a successful registration, so the contents are still
        // consistent even when the mutex is poisoned; take them out of the error in that case.
        let ids = match self.0.get_mut() {
            Ok(ids) => ids,
            Err(poisoned) => poisoned.into_inner(),
        };
        for id in ids.iter().filter_map(|slot| *slot) {
            ::unregister(id);
        }
    }
}
/// Handle for receiving notifications about a set of signals.
///
/// Both fields are reference counted, so a clone refers to the same registrations and the same
/// pending flags as the instance it was cloned from.
#[derive(Clone, Debug)]
pub struct Signals {
// Ids of the registered actions; they are unregistered when the last clone is dropped.
ids: Arc<RegisteredSignals>,
// Pending flags, the closed flag and the socket pair used for wake-ups.
waker: Arc<Waker>,
}
impl Signals {
pub fn new<I, S>(signals: I) -> Result<Self, Error>
where
I: IntoIterator<Item = S>,
S: Borrow<c_int>,
{
let (read, write) = UnixStream::pair()?;
let pending = (0..MAX_SIGNUM).map(|_| AtomicBool::new(false)).collect();
let waker = Arc::new(Waker {
pending,
closed: AtomicBool::new(false),
read,
write,
});
let ids = (0..MAX_SIGNUM).map(|_| None).collect();
let me = Self {
ids: Arc::new(RegisteredSignals(Mutex::new(ids))),
waker,
};
for sig in signals {
me.add_signal(*sig.borrow())?;
}
Ok(me)
}
pub fn add_signal(&self, signal: c_int) -> Result<(), Error> {
assert!(signal >= 0);
assert!(
(signal as usize) < MAX_SIGNUM,
"Signal number {} too large. If your OS really supports such signal, file a bug",
signal,
);
let mut lock = self.ids.0.lock().unwrap();
if lock[signal as usize].is_some() {
return Ok(());
}
let waker = Arc::clone(&self.waker);
let action = move || {
waker.pending[signal as usize].store(true, Ordering::SeqCst);
waker.wake();
};
let id = unsafe { ::register(signal, action) }?;
lock[signal as usize] = Some(id);
Ok(())
}
fn flush(&self, wait: bool) -> bool {
if self.waker.closed.load(Ordering::SeqCst) {
return false;
}
const SIZE: usize = 1024;
let mut buff = [0u8; SIZE];
let res = unsafe {
libc::recv(
self.waker.read.as_raw_fd(),
buff.as_mut_ptr() as *mut libc::c_void,
SIZE,
if wait { 0 } else { libc::MSG_DONTWAIT },
)
};
if self.waker.closed.load(Ordering::SeqCst) {
self.waker.wake();
}
res > 0
}
pub fn pending(&self) -> Pending {
self.flush(false);
Pending(self.waker.pending.iter().enumerate())
}
pub fn wait(&self) -> Pending {
self.flush(true);
Pending(self.waker.pending.iter().enumerate())
}
pub fn forever(&self) -> Forever {
Forever {
signals: self,
iter: self.pending(),
}
}
pub fn is_closed(&self) -> bool {
self.waker.closed.load(Ordering::SeqCst)
}
pub fn close(&self) {
self.waker.closed.store(true, Ordering::SeqCst);
self.waker.wake();
}
}
impl<'a> IntoIterator for &'a Signals {
type Item = c_int;
type IntoIter = Forever<'a>;
fn into_iter(self) -> Forever<'a> {
self.forever()
}
}
/// Iterator over the signal numbers whose flag is currently set.
///
/// It walks the flags once, in increasing index order. Each flag found set is atomically
/// reset to `false` and its index is yielded as the signal number.
pub struct Pending<'a>(Enumerate<Iter<'a, AtomicBool>>);

impl<'a> Iterator for Pending<'a> {
    type Item = c_int;

    fn next(&mut self) -> Option<c_int> {
        // `by_ref` keeps the position inside `self.0`, so the next call resumes after the
        // flag reported here.
        for (signum, flag) in self.0.by_ref() {
            // Only the caller that wins the true -> false exchange reports the signal.
            let was_set = flag
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Relaxed)
                .is_ok();
            if was_set {
                return Some(signum as c_int);
            }
        }
        None
    }
}
/// Iterator that yields signals until the underlying `Signals` is closed.
///
/// Whenever the current pass over the flags is exhausted, it calls `Signals::wait` to obtain
/// a fresh pass.
pub struct Forever<'a> {
    signals: &'a Signals,
    iter: Pending<'a>,
}

impl<'a> Iterator for Forever<'a> {
    type Item = c_int;

    fn next(&mut self) -> Option<c_int> {
        loop {
            // The closed flag is checked before every attempt, so nothing is yielded once the
            // instance is closed.
            if self.signals.is_closed() {
                return None;
            }
            match self.iter.next() {
                Some(signal) => return Some(signal),
                // Current pass exhausted: wait and start a new one.
                None => self.iter = self.signals.wait(),
            }
        }
    }
}
/// Integration with mio: `Signals` can be registered in a `mio::Poll` through the read end
/// of its wake-up socket.
#[cfg(feature = "mio-support")]
mod mio_support {
    use std::io::Error;
    use std::os::unix::io::AsRawFd;

    use mio::event::Evented;
    use mio::unix::EventedFd;
    use mio::{Poll, PollOpt, Ready, Token};

    use super::Signals;

    /// Every method delegates to `EventedFd` wrapping the descriptor of the read socket.
    impl Evented for Signals {
        fn register(
            &self,
            poll: &Poll,
            token: Token,
            interest: Ready,
            opts: PollOpt,
        ) -> Result<(), Error> {
            let fd = self.waker.read.as_raw_fd();
            EventedFd(&fd).register(poll, token, interest, opts)
        }

        fn reregister(
            &self,
            poll: &Poll,
            token: Token,
            interest: Ready,
            opts: PollOpt,
        ) -> Result<(), Error> {
            let fd = self.waker.read.as_raw_fd();
            EventedFd(&fd).reregister(poll, token, interest, opts)
        }

        fn deregister(&self, poll: &Poll) -> Result<(), Error> {
            let fd = self.waker.read.as_raw_fd();
            EventedFd(&fd).deregister(poll)
        }
    }

    #[cfg(test)]
    mod tests {
        use std::time::Duration;

        use libc;
        use mio::Events;

        use super::*;

        /// Raising a registered signal makes the poll report readability under our token,
        /// after which the signal shows up as pending.
        #[test]
        fn mio_wakeup() {
            let signals = Signals::new(&[::SIGUSR1]).unwrap();
            let token = Token(0);
            let poll = Poll::new().unwrap();
            poll.register(&signals, token, Ready::readable(), PollOpt::level())
                .unwrap();
            let mut events = Events::with_capacity(10);

            unsafe { libc::raise(::SIGUSR1) };
            let timeout = Duration::from_secs(10);
            poll.poll(&mut events, Some(timeout)).unwrap();

            let event = events.iter().next().unwrap();
            assert!(event.readiness().is_readable());
            assert_eq!(token, event.token());

            let sig = signals.pending().next().unwrap();
            assert_eq!(::SIGUSR1, sig);
        }
    }
}
/// Integration with tokio: a `Stream` of signal numbers driven by a reactor registration.
#[cfg(feature = "tokio-support")]
mod tokio_support {
    use std::io::Error;
    use std::sync::atomic::Ordering;

    use futures::stream::Stream;
    use futures::{Async as AsyncResult, Poll};
    use libc::{self, c_int};
    use tokio_reactor::{Handle, Registration};

    use super::Signals;

    /// Asynchronous stream of signals built on top of a `Signals` instance.
    #[derive(Debug)]
    pub struct Async {
        registration: Registration,
        inner: Signals,
        // Index of the next pending flag to examine; one past the end means the current pass
        // is finished.
        position: usize,
    }

    impl Async {
        /// Registers `signals` with the reactor behind `handle` and wraps it into a stream.
        ///
        /// # Errors
        ///
        /// Propagates the error of `Registration::register_with`.
        pub fn new(signals: Signals, handle: &Handle) -> Result<Self, Error> {
            let registration = Registration::new();
            registration.register_with(&signals, handle)?;
            Ok(Self {
                registration,
                inner: signals,
                position: 0,
            })
        }
    }

    impl Stream for Async {
        type Item = libc::c_int;
        type Error = Error;

        /// Scans the pending flags from the remembered position and yields the first one set.
        ///
        /// When a pass is finished, the registration is asked for read readiness; if there is
        /// none the stream is not ready, otherwise the wake-up socket is drained and a new
        /// pass starts. The stream ends once the inner `Signals` is closed.
        fn poll(&mut self) -> Poll<Option<libc::c_int>, Self::Error> {
            loop {
                if self.inner.is_closed() {
                    return Ok(AsyncResult::Ready(None));
                }

                if self.position >= self.inner.waker.pending.len() {
                    if self.registration.poll_read_ready()?.is_not_ready() {
                        return Ok(AsyncResult::NotReady);
                    }
                    // Empty the socket completely before rescanning the flags.
                    while self.inner.flush(false) {}
                    self.position = 0;
                }
                assert!(self.position < self.inner.waker.pending.len());

                // Advance first, so the next call continues behind this flag.
                let current = self.position;
                self.position += 1;

                let was_pending = self.inner.waker.pending[current]
                    .compare_exchange(true, false, Ordering::SeqCst, Ordering::Relaxed)
                    .is_ok();
                if was_pending {
                    return Ok(AsyncResult::Ready(Some(current as c_int)));
                }
            }
        }
    }

    impl Signals {
        /// Turns the instance into an asynchronous stream using the default reactor handle.
        pub fn into_async(self) -> Result<Async, Error> {
            let handle = Handle::default();
            Async::new(self, &handle)
        }

        /// Turns the instance into an asynchronous stream bound to the given reactor handle.
        pub fn into_async_with_handle(self, handle: &Handle) -> Result<Async, Error> {
            Async::new(self, handle)
        }
    }
}
#[cfg(feature = "tokio-support")]
pub use self::tokio_support::Async;