use crate::queue::{NotificationError, Notifier};
use crate::NotificationId;
use crossbeam::channel as beamchannel;
use std::error;
use std::sync::{mpsc, Arc};
use std::{fmt, io};
/// Creates an unbounded `std::sync::mpsc` channel whose sending half is
/// wrapped in a notifying [`Sender`].
///
/// Every successful `Sender::send` is followed by a call to
/// `notifier.notify(id)`. The receiving half is the plain `mpsc::Receiver`.
pub fn channel<T>(
    notifier: Arc<dyn Notifier>,
    id: NotificationId,
) -> (Sender<T>, mpsc::Receiver<T>) {
    let (inner, receiver) = mpsc::channel();
    (
        Sender {
            tx: inner,
            notifier,
            id,
        },
        receiver,
    )
}
/// Creates a bounded `std::sync::mpsc` channel whose sending half is wrapped
/// in a notifying [`SyncSender`].
///
/// `bound_size` is forwarded unchanged to `mpsc::sync_channel`. Every
/// successful send through the returned sender is followed by a call to
/// `notifier.notify(id)`.
pub fn sync_channel<T>(
    notifier: Arc<dyn Notifier>,
    id: NotificationId,
    bound_size: usize,
) -> (SyncSender<T>, mpsc::Receiver<T>) {
    let (inner, receiver) = mpsc::sync_channel(bound_size);
    let sender = SyncSender {
        tx: inner,
        notifier,
        id,
    };
    (sender, receiver)
}
/// Creates an unbounded crossbeam channel whose sending half is wrapped in a
/// notifying [`CrossbeamSender`].
///
/// Every successful send through the returned sender is followed by a call
/// to `notifier.notify(id)`. The receiving half is the plain crossbeam
/// receiver.
pub fn crossbeam_channel_unbounded<T>(
    notifier: Arc<dyn Notifier>,
    id: NotificationId,
) -> (CrossbeamSender<T>, beamchannel::Receiver<T>) {
    let (inner, receiver) = beamchannel::unbounded();
    (
        CrossbeamSender {
            tx: inner,
            notifier,
            id,
        },
        receiver,
    )
}
/// Creates a bounded crossbeam channel whose sending half is wrapped in a
/// notifying [`CrossbeamSender`].
///
/// `size` is forwarded unchanged to the crossbeam `bounded` constructor.
/// Every successful send through the returned sender is followed by a call
/// to `notifier.notify(id)`.
pub fn crossbeam_channel_bounded<T>(
    notifier: Arc<dyn Notifier>,
    id: NotificationId,
    size: usize,
) -> (CrossbeamSender<T>, beamchannel::Receiver<T>) {
    let (inner, receiver) = beamchannel::bounded(size);
    let sender = CrossbeamSender {
        tx: inner,
        notifier,
        id,
    };
    (sender, receiver)
}
/// Sending half produced by `channel`: an `mpsc::Sender` paired with the
/// notifier and notification id used to announce each successful send.
pub struct Sender<T> {
/// Underlying standard-library sender that carries the messages.
tx: mpsc::Sender<T>,
/// Notifier whose `notify` is called after a message has been sent.
notifier: Arc<dyn Notifier>,
/// Identifier handed to `notifier.notify` on every send.
id: NotificationId,
}
impl<T> Sender<T> {
    /// Sends `t` on the underlying channel and then notifies with this
    /// sender's id.
    ///
    /// # Errors
    ///
    /// Returns the channel's send error converted into [`SendError`] when the
    /// message could not be sent; in that case no notification is attempted.
    /// Returns the notifier's error converted into [`SendError`] when the
    /// notification fails. Note that the message has already been handed to
    /// the channel at that point.
    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
        match self.tx.send(t) {
            Ok(()) => self.notifier.notify(self.id).map_err(SendError::from),
            Err(rejected) => Err(SendError::from(rejected)),
        }
    }
}
/// Sending half produced by `crossbeam_channel_unbounded` and
/// `crossbeam_channel_bounded`: a crossbeam sender paired with the notifier
/// and notification id used to announce each successful send.
pub struct CrossbeamSender<T> {
    /// Underlying crossbeam sender that carries the messages.
    tx: beamchannel::Sender<T>,
    /// Notifier whose `notify` is called after a message has been sent.
    notifier: Arc<dyn Notifier>,
    /// Identifier handed to `notifier.notify` on every send.
    id: NotificationId,
}

/// Cloning mirrors the `Clone` impls of `Sender` and `SyncSender`: the clone
/// wraps a clone of the crossbeam sender, shares the same notifier through
/// the `Arc`, and uses the same notification id.
impl<T> Clone for CrossbeamSender<T> {
    fn clone(&self) -> Self {
        CrossbeamSender {
            tx: self.tx.clone(),
            // Only bumps the reference count; the notifier itself is shared.
            notifier: Arc::clone(&self.notifier),
            id: self.id,
        }
    }
}
impl<T> CrossbeamSender<T> {
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
self.tx.send(t).map_err(SendError::from)?;
self.notifier.notify(self.id).map_err(SendError::from)
}
pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
self.tx
.try_send(t)
.map_err(From::from)
.and_then(|_| self.notifier.notify(self.id).map_err(From::from))
}
}
/// Sending half produced by `sync_channel`: an `mpsc::SyncSender` paired with
/// the notifier and notification id used to announce each successful send.
pub struct SyncSender<T> {
/// Underlying standard-library bounded sender that carries the messages.
tx: mpsc::SyncSender<T>,
/// Notifier whose `notify` is called after a message has been sent.
notifier: Arc<dyn Notifier>,
/// Identifier handed to `notifier.notify` on every send.
id: NotificationId,
}
impl<T> SyncSender<T> {
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
self.tx
.send(t)
.map_err(From::from)
.and_then(|_| self.notifier.notify(self.id).map_err(From::from))
}
pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
self.tx
.try_send(t)
.map_err(From::from)
.and_then(|_| self.notifier.notify(self.id).map_err(From::from))
}
}
/// Error returned by the `send` methods of the notifying senders in this file.
pub enum SendError<T> {
/// An I/O error; produced by the `From<io::Error>` conversion.
Io(io::Error),
/// The underlying channel reported a send failure; carries back the value
/// taken from the channel's own send error.
Disconnected(T),
/// Produced from any `NotificationError<NotificationId>`, whatever its
/// contents.
NotificationQueueFull,
}
/// Error returned by the `try_send` methods of the notifying senders in this
/// file.
pub enum TrySendError<T> {
/// An I/O error; produced by the `From<io::Error>` conversion.
Io(io::Error),
/// The underlying channel reported `Full`; carries back the unsent value.
Full(T),
/// The underlying channel reported `Disconnected` (or a plain
/// `mpsc::SendError` was converted); carries back the unsent value.
Disconnected(T),
/// Produced from any `NotificationError<NotificationId>`, whatever its
/// contents.
NotificationQueueFull,
}
/// Cloning produces a sender that wraps a clone of the inner `mpsc::Sender`,
/// shares the same notifier through the `Arc`, and keeps the same id.
impl<T> Clone for Sender<T> {
    fn clone(&self) -> Sender<T> {
        let Sender { tx, notifier, id } = self;
        Self {
            tx: tx.clone(),
            // Only bumps the reference count; the notifier itself is shared.
            notifier: Arc::clone(notifier),
            id: *id,
        }
    }
}
/// Cloning produces a sender that wraps a clone of the inner
/// `mpsc::SyncSender`, shares the same notifier through the `Arc`, and keeps
/// the same id.
impl<T> Clone for SyncSender<T> {
    fn clone(&self) -> SyncSender<T> {
        let SyncSender { tx, notifier, id } = self;
        Self {
            tx: tx.clone(),
            // Only bumps the reference count; the notifier itself is shared.
            notifier: Arc::clone(notifier),
            id: *id,
        }
    }
}
impl<T> From<mpsc::SendError<T>> for SendError<T> {
fn from(src: mpsc::SendError<T>) -> Self {
SendError::Disconnected(src.0)
}
}
impl<T> From<io::Error> for SendError<T> {
fn from(src: io::Error) -> Self {
SendError::Io(src)
}
}
/// Converts a crossbeam send failure into `SendError::Disconnected`, moving
/// the unsent value across.
impl<T> From<beamchannel::SendError<T>> for SendError<T> {
    fn from(src: beamchannel::SendError<T>) -> Self {
        let beamchannel::SendError(unsent) = src;
        Self::Disconnected(unsent)
    }
}
impl<T> From<NotificationError<NotificationId>> for SendError<T> {
fn from(_: NotificationError<NotificationId>) -> Self {
SendError::NotificationQueueFull
}
}
impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
fn from(src: mpsc::TrySendError<T>) -> Self {
match src {
mpsc::TrySendError::Full(v) => TrySendError::Full(v),
mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
}
}
}
impl<T> From<NotificationError<NotificationId>> for TrySendError<T> {
fn from(_: NotificationError<NotificationId>) -> Self {
TrySendError::NotificationQueueFull
}
}
/// Translates a crossbeam try-send failure variant-for-variant, moving the
/// unsent value across.
impl<T> From<beamchannel::TrySendError<T>> for TrySendError<T> {
    fn from(src: beamchannel::TrySendError<T>) -> Self {
        match src {
            beamchannel::TrySendError::Disconnected(unsent) => Self::Disconnected(unsent),
            beamchannel::TrySendError::Full(unsent) => Self::Full(unsent),
        }
    }
}
impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
fn from(src: mpsc::SendError<T>) -> Self {
TrySendError::Disconnected(src.0)
}
}
impl<T> From<io::Error> for TrySendError<T> {
fn from(src: io::Error) -> Self {
TrySendError::Io(src)
}
}
// Marks `SendError` as a standard error type. The body is empty, so every
// trait method keeps its default implementation.
impl<T> error::Error for SendError<T> {}
// Marks `TrySendError` as a standard error type. The body is empty, so every
// trait method keeps its default implementation.
impl<T> error::Error for TrySendError<T> {}
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SendError::Io(io_err) => write!(f, "{:?}", io_err),
SendError::Disconnected(_) => write!(f, "Disconnected(..)"),
SendError::NotificationQueueFull => write!(f, "NotificationQueueFull"),
}
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SendError::Io(io_err) => write!(f, "{}", io_err),
SendError::Disconnected(_) => write!(f, "sending on a closed channel"),
SendError::NotificationQueueFull => write!(f, "sending on a full notification queue"),
}
}
}
impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
TrySendError::Io(io_err) => write!(f, "{:?}", io_err),
TrySendError::Full(..) => write!(f, "Full(..)"),
TrySendError::Disconnected(..) => write!(f, "Disconnected(..)"),
TrySendError::NotificationQueueFull => write!(f, "NotificationQueueFull"),
}
}
}
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
TrySendError::Io(io_err) => write!(f, "{}", io_err),
TrySendError::Full(..) => write!(f, "sending on a full channel"),
TrySendError::Disconnected(..) => write!(f, "sending on a closed channel"),
TrySendError::NotificationQueueFull => {
write!(f, "sending on a full notification queue")
}
}
}
}