use {sys, Evented};
use util::BoundedQueue;
use std::{fmt, cmp, io};
use std::sync::Arc;
use std::sync::atomic::AtomicIsize;
use std::sync::atomic::Ordering::Relaxed;
use std::os::unix::io::{RawFd, AsRawFd};
const SLEEP: isize = -1;
pub struct Notify<M: Send> {
inner: Arc<NotifyInner<M>>
}
impl<M: Send> Notify<M> {
#[inline]
pub fn with_capacity(capacity: usize) -> io::Result<Notify<M>> {
Ok(Notify {
inner: Arc::new(try!(NotifyInner::with_capacity(capacity)))
})
}
#[inline]
pub fn check(&self, max: usize, will_sleep: bool) -> usize {
self.inner.check(max, will_sleep)
}
#[inline]
pub fn notify(&self, value: M) -> Result<(), NotifyError<M>> {
self.inner.notify(value)
}
#[inline]
pub fn poll(&self) -> Option<M> {
self.inner.poll()
}
#[inline]
pub fn cleanup(&self) {
self.inner.cleanup();
}
}
impl<M: Send> Clone for Notify<M> {
fn clone(&self) -> Notify<M> {
Notify {
inner: self.inner.clone()
}
}
}
impl<M> fmt::Debug for Notify<M> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "Notify<?>")
}
}
unsafe impl<M: Send> Sync for Notify<M> { }
unsafe impl<M: Send> Send for Notify<M> { }
struct NotifyInner<M> {
state: AtomicIsize,
queue: BoundedQueue<M>,
awaken: sys::Awakener
}
impl<M: Send> NotifyInner<M> {
fn with_capacity(capacity: usize) -> io::Result<NotifyInner<M>> {
Ok(NotifyInner {
state: AtomicIsize::new(0),
queue: BoundedQueue::with_capacity(capacity),
awaken: try!(sys::Awakener::new())
})
}
fn check(&self, max: usize, will_sleep: bool) -> usize {
let max = max as isize;
let mut cur = self.state.load(Relaxed);
let mut nxt;
let mut val;
loop {
if cur > 0 {
if max >= cur {
nxt = 0;
} else {
nxt = cur - max;
}
} else {
if will_sleep {
nxt = SLEEP;
} else {
nxt = 0;
}
}
val = self.state.compare_and_swap(cur, nxt, Relaxed);
if val == cur {
break;
}
cur = val;
}
if cur < 0 {
0
} else {
cmp::min(cur, max) as usize
}
}
fn poll(&self) -> Option<M> {
self.queue.pop()
}
fn notify(&self, value: M) -> Result<(), NotifyError<M>> {
if let Err(value) = self.queue.push(value) {
return Err(NotifyError::Full(value));
}
let mut cur = self.state.load(Relaxed);
let mut nxt;
let mut val;
loop {
nxt = if cur == SLEEP { 1 } else { cur + 1 };
val = self.state.compare_and_swap(cur, nxt, Relaxed);
if val == cur {
break;
}
cur = val;
}
if cur == SLEEP {
if let Err(e) = self.awaken.wakeup() {
return Err(NotifyError::Io(e));
}
}
Ok(())
}
fn cleanup(&self) {
self.awaken.cleanup();
}
}
impl<M: Send> AsRawFd for Notify<M> {
fn as_raw_fd(&self) -> RawFd {
self.inner.awaken.as_raw_fd()
}
}
impl<M: Send> Evented for Notify<M> {
}
pub enum NotifyError<T> {
Io(io::Error),
Full(T),
}
impl<M> fmt::Debug for NotifyError<M> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
NotifyError::Io(ref e) => {
write!(fmt, "NotifyError::Io({:?})", e)
}
NotifyError::Full(..) => {
write!(fmt, "NotifyError::Full(..)")
}
}
}
}