use crate::NotificationId;
use crossbeam_queue::{ArrayQueue, SegQueue};
use mio::Waker;
use std::sync::Arc;
use std::{error, fmt, io};
/// Result type returned when submitting a notification: `Ok(())` on success,
/// otherwise a [`NotificationError`] parameterised over [`NotificationId`].
///
/// This alias shadows the prelude `Result` inside this module, which is why
/// other result types here are spelled with their full path (`io::Result`,
/// `fmt::Result`, `std::result::Result`).
pub type Result = std::result::Result<(), NotificationError<NotificationId>>;
/// Something a notification can be submitted to.
///
/// Implementors must be `Send + Sync` and implement `Debug`, as required by
/// the supertrait bounds.
pub trait Notifier: Send + Sync + fmt::Debug {
/// Submits the notification `id`.
///
/// Returns a [`NotificationError`] when the submission fails; the queues in
/// this module report a failed wake-up as `Io` and a full bounded queue as
/// `Full`.
fn notify(&self, id: NotificationId) -> Result;
}
/// The consuming side of a notification source.
///
/// Implementors must be `Send + Sync`. The queues in this module implement it
/// by delegating to their underlying queue.
pub trait NotificationReceiver: Send + Sync {
/// Takes one pending notification, or returns `None` when there is none.
fn receive(&self) -> Option<NotificationId>;
/// Returns the number of notifications currently pending.
fn len(&self) -> usize;
/// Returns `true` when no notification is pending.
fn is_empty(&self) -> bool;
}
/// Notification queue backed by a `SegQueue` and paired with a shared
/// [`Waker`].
///
/// Pushing an id enqueues it and then calls `wake()` on the waker; see
/// [`NotificationQueue::push`].
#[derive(Debug)]
pub struct NotificationQueue {
/// Waker signalled after every push.
waker: Arc<Waker>,
/// Pending notification ids.
queue: SegQueue<NotificationId>,
}
impl NotificationQueue {
    /// Creates an empty queue that signals `waker` whenever an id is pushed.
    pub fn new(waker: Arc<Waker>) -> NotificationQueue {
        let queue = SegQueue::new();
        Self { waker, queue }
    }

    /// Enqueues `id` and then wakes the associated waker.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` reported by `Waker::wake`. The id has already
    /// been enqueued at that point and is not removed again, so it remains
    /// available to a later [`pop`](Self::pop).
    pub fn push(&self, id: NotificationId) -> io::Result<()> {
        // Enqueue first so the id is already visible when the wake-up fires.
        self.queue.push(id);
        self.waker.wake()?;
        Ok(())
    }

    /// Removes and returns one pending id, or `None` if the queue is empty.
    pub fn pop(&self) -> Option<NotificationId> {
        let next = self.queue.pop();
        next
    }

    /// Returns `true` if no ids are currently queued.
    pub fn is_empty(&self) -> bool {
        let queue = &self.queue;
        queue.is_empty()
    }

    /// Returns the number of ids currently queued.
    pub fn len(&self) -> usize {
        let queue = &self.queue;
        queue.len()
    }
}
impl Notifier for NotificationQueue {
    /// Pushes `id` onto the queue, converting a wake-up `io::Error` into the
    /// module's notification error type.
    fn notify(&self, id: NotificationId) -> Result {
        // `?` performs the same `From<io::Error>` conversion as `map_err(From::from)`.
        self.push(id)?;
        Ok(())
    }
}
impl NotificationReceiver for NotificationQueue {
fn receive(&self) -> Option<NotificationId> {
self.pop()
}
fn len(&self) -> usize {
self.len()
}
fn is_empty(&self) -> bool {
self.is_empty()
}
}
/// Fixed-capacity notification queue backed by an `ArrayQueue` and paired
/// with a shared [`Waker`].
///
/// A push that does not fit is rejected rather than blocking; see
/// [`BoundedNotificationQueue::push`].
#[derive(Debug)]
pub struct BoundedNotificationQueue {
/// Waker signalled after every successful push.
waker: Arc<Waker>,
/// Pending notification ids, limited to the capacity given at construction.
queue: ArrayQueue<NotificationId>,
}
impl BoundedNotificationQueue {
    /// Creates an empty queue holding at most `size` ids that signals `waker`
    /// after each successful push.
    ///
    /// # Panics
    ///
    /// `size` is passed straight to `ArrayQueue::new`, which is documented to
    /// panic when the capacity is zero.
    pub fn new(size: usize, waker: Arc<Waker>) -> BoundedNotificationQueue {
        let queue = ArrayQueue::new(size);
        Self { waker, queue }
    }

    /// Enqueues `id` and, if that succeeded, wakes the associated waker.
    ///
    /// # Errors
    ///
    /// * `NotificationError::Full(id)` - the queue rejected the id; it is
    ///   handed back to the caller and the waker is **not** signalled.
    /// * `NotificationError::Io(_)` - the id was enqueued but `Waker::wake`
    ///   failed. The id is not removed again.
    pub fn push(&self, id: NotificationId) -> Result {
        match self.queue.push(id) {
            Ok(()) => self.waker.wake().map_err(NotificationError::Io),
            Err(rejected) => Err(NotificationError::Full(rejected)),
        }
    }

    /// Removes and returns one pending id, or `None` if the queue is empty.
    pub fn pop(&self) -> Option<NotificationId> {
        let next = self.queue.pop();
        next
    }

    /// Returns `true` if no ids are currently queued.
    pub fn is_empty(&self) -> bool {
        let queue = &self.queue;
        queue.is_empty()
    }

    /// Returns the number of ids currently queued.
    pub fn len(&self) -> usize {
        let queue = &self.queue;
        queue.len()
    }
}
impl Notifier for BoundedNotificationQueue {
fn notify(&self, id: NotificationId) -> Result {
self.push(id)
}
}
impl NotificationReceiver for BoundedNotificationQueue {
fn receive(&self) -> Option<NotificationId> {
self.pop()
}
fn len(&self) -> usize {
self.len()
}
fn is_empty(&self) -> bool {
self.queue.is_empty()
}
}
/// Reasons a notification could not be delivered.
pub enum NotificationError<T> {
    /// An I/O operation failed; carries the underlying error.
    Io(io::Error),
    /// The target queue was full; carries the value that was rejected.
    Full(T),
}

/// Lets `?` and `From::from` lift an `io::Error` into the `Io` variant.
impl<T> From<io::Error> for NotificationError<T> {
    fn from(err: io::Error) -> Self {
        Self::Io(err)
    }
}

/// Uses the default `Error` methods; the I/O error is surfaced through
/// `Display`/`Debug` rather than through `source()`.
impl<T> error::Error for NotificationError<T> {}

/// Hand-written so that `T` does not have to implement `Debug`; the payload
/// of `Full` is elided from the output.
impl<T> fmt::Debug for NotificationError<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        if let Self::Io(inner) = self {
            // `write!` (not `Debug::fmt(inner, f)`) so that flags such as `#`
            // on the outer format call are not forwarded to the inner error.
            return write!(f, "{:?}", inner);
        }
        f.write_str("Full(..)")
    }
}

/// Human-readable message; `T` does not have to implement `Display`.
impl<T> fmt::Display for NotificationError<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        if let Self::Io(inner) = self {
            // See the `Debug` impl: outer formatter flags are deliberately not forwarded.
            return write!(f, "{}", inner);
        }
        f.write_str("sending on a full notification queue")
    }
}