use super::chan;
use futures::{Poll, Sink, StartSend, Stream};
use std::fmt;
/// Sending half of the bounded channel returned by [`channel`].
///
/// Values are pushed either through the `Sink` implementation or through
/// the inherent `poll_ready` / `try_send` methods.
pub struct Sender<T> {
/// Transmit handle of the underlying channel, parameterised by the
/// `Semaphore` alias declared in this module.
chan: chan::Tx<T, Semaphore>,
}
impl<T> Clone for Sender<T> {
    /// Produces another `Sender` wrapping a clone of the inner transmit handle.
    fn clone(&self) -> Self {
        let chan = self.chan.clone();
        Sender { chan }
    }
}
impl<T> fmt::Debug for Sender<T> {
    /// Renders the sender as a `Sender { chan: .. }` debug struct.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut out = fmt.debug_struct("Sender");
        out.field("chan", &self.chan);
        out.finish()
    }
}
/// Receiving half of the bounded channel returned by [`channel`].
///
/// Values are obtained by polling the `Stream` implementation.
pub struct Receiver<T> {
/// Receive handle of the underlying channel, parameterised by the
/// `Semaphore` alias declared in this module.
chan: chan::Rx<T, Semaphore>,
}
impl<T> fmt::Debug for Receiver<T> {
    /// Renders the receiver as a `Receiver { chan: .. }` debug struct.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut out = fmt.debug_struct("Receiver");
        out.field("chan", &self.chan);
        out.finish()
    }
}
/// Error produced by `Sender::poll_ready` and by the `Sink` implementation
/// for `Sender`.
///
/// The payload is a private unit, so the type cannot be constructed outside
/// this module.
#[derive(Debug)]
pub struct SendError(());
/// Error returned by `Sender::try_send`.
///
/// It carries the message that could not be sent (recoverable with
/// `into_inner`) together with the reason for the failure (queryable with
/// `is_closed` / `is_full`).
#[derive(Debug)]
pub struct TrySendError<T> {
/// Why the send failed.
kind: ErrorKind,
/// The message that was not sent.
value: T,
}
/// Reason recorded inside a `TrySendError`.
#[derive(Debug)]
enum ErrorKind {
/// Mapped from `chan::TrySendError::Closed`; described as "channel closed".
Closed,
/// Mapped from `chan::TrySendError::NoPermits`; described as
/// "no available capacity".
NoCapacity,
}
/// Error type of the `Stream` implementation for `Receiver`.
///
/// The payload is a private unit, so the type cannot be constructed outside
/// this module.
#[derive(Debug)]
pub struct RecvError(());
/// Creates a bounded mpsc channel and returns its `Sender` / `Receiver` pair.
///
/// `buffer` is passed to `::semaphore::Semaphore::new` and is also stored
/// next to that semaphore in the tuple handed to `chan::channel`.
///
/// # Panics
///
/// Panics if `buffer` is `0`.
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
    assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");

    let permits = ::semaphore::Semaphore::new(buffer);
    let (raw_tx, raw_rx) = chan::channel((permits, buffer));

    (Sender::new(raw_tx), Receiver::new(raw_rx))
}
/// Semaphore parameter used for the bounded channel's `chan::Tx` / `chan::Rx`:
/// a `::semaphore::Semaphore` paired with a `usize` (the `buffer` value given
/// to `channel`).
type Semaphore = (::semaphore::Semaphore, usize);
impl<T> Receiver<T> {
    /// Wraps an already-created receive handle of the underlying channel.
    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
        Self { chan }
    }

    /// Closes the receiving half by delegating to the underlying channel's
    /// `close`.
    pub fn close(&mut self) {
        let inner = &mut self.chan;
        inner.close();
    }
}
impl<T> Stream for Receiver<T> {
    type Item = T;
    type Error = RecvError;

    /// Polls the underlying channel for the next value.
    ///
    /// A successful poll result is forwarded untouched; any error reported by
    /// the underlying channel is replaced with `RecvError`.
    fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
        match self.chan.recv() {
            Ok(state) => Ok(state),
            Err(_) => Err(RecvError(())),
        }
    }
}
impl<T> Sender<T> {
    /// Wraps an already-created transmit handle of the underlying channel.
    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
        Self { chan }
    }

    /// Polls the underlying channel for readiness to send.
    ///
    /// Any error reported by the underlying channel is replaced with
    /// `SendError`.
    pub fn poll_ready(&mut self) -> Poll<(), SendError> {
        match self.chan.poll_ready() {
            Ok(state) => Ok(state),
            Err(_) => Err(SendError(())),
        }
    }

    /// Attempts to hand `message` to the underlying channel immediately.
    ///
    /// On failure the channel's `(value, error)` pair is converted into a
    /// `TrySendError`, which gives the message back through `into_inner`.
    pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
        match self.chan.try_send(message) {
            Ok(_) => Ok(()),
            Err(failure) => Err(From::from(failure)),
        }
    }
}
impl<T> Sink for Sender<T> {
    type SinkItem = T;
    type SinkError = SendError;

    /// Starts sending `msg`.
    ///
    /// Readiness is checked first with `poll_ready`; when the sender is not
    /// ready the message is handed back inside `AsyncSink::NotReady`.
    /// Otherwise the message goes through `try_send`, and any failure there is
    /// reported as `SendError` (the message is dropped along with the
    /// discarded `TrySendError`).
    fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> {
        use futures::Async;
        use futures::AsyncSink;

        if let Async::NotReady = self.poll_ready()? {
            return Ok(AsyncSink::NotReady(msg));
        }

        match self.try_send(msg) {
            Ok(()) => Ok(AsyncSink::Ready),
            Err(_) => Err(SendError(())),
        }
    }

    /// Always reports completion; this implementation has nothing to flush.
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        use futures::Async;

        Ok(Async::Ready(()))
    }

    /// Always reports completion; no other action is taken here.
    fn close(&mut self) -> Poll<(), Self::SinkError> {
        use futures::Async;

        Ok(Async::Ready(()))
    }
}
impl fmt::Display for SendError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use std::error::Error;
write!(fmt, "{}", self.description())
}
}
/// `std::error::Error` support for `SendError`.
impl ::std::error::Error for SendError {
/// Returns the fixed message `"channel closed"`.
fn description(&self) -> &str {
"channel closed"
}
}
impl<T> TrySendError<T> {
pub fn into_inner(self) -> T {
self.value
}
pub fn is_closed(&self) -> bool {
if let ErrorKind::Closed = self.kind {
true
} else {
false
}
}
pub fn is_full(&self) -> bool {
if let ErrorKind::NoCapacity = self.kind {
true
} else {
false
}
}
}
impl<T: fmt::Debug> fmt::Display for TrySendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use std::error::Error;
write!(fmt, "{}", self.description())
}
}
impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {
fn description(&self) -> &str {
match self.kind {
ErrorKind::Closed => "channel closed",
ErrorKind::NoCapacity => "no available capacity",
}
}
}
impl<T> From<(T, chan::TrySendError)> for TrySendError<T> {
    /// Builds a `TrySendError` from the rejected value and the underlying
    /// channel's error, translating `Closed` to `ErrorKind::Closed` and
    /// `NoPermits` to `ErrorKind::NoCapacity`.
    fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> {
        let kind = match err {
            chan::TrySendError::NoPermits => ErrorKind::NoCapacity,
            chan::TrySendError::Closed => ErrorKind::Closed,
        };

        TrySendError { kind, value }
    }
}
impl fmt::Display for RecvError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use std::error::Error;
write!(fmt, "{}", self.description())
}
}
/// `std::error::Error` support for `RecvError`.
impl ::std::error::Error for RecvError {
/// Returns the fixed message `"channel closed"`.
fn description(&self) -> &str {
"channel closed"
}
}