use std::task::{Context, Poll};
use crate::{sync::mpsc::error::TrySendError, *};
/// Creates a channel and returns its two halves wrapped as [`Sender`] and
/// [`Receiver`].
///
/// `_buffer` is accepted only so the signature looks like a bounded-channel
/// constructor: the value is never read, and the underlying channel is built
/// from `crate::channel::Builder::new()` without further configuration. A
/// diagnostic is logged on every call to make that limitation visible.
pub fn channel<T>(_buffer: usize) -> (Sender<T>, Receiver<T>)
where
    T: Clone + std::fmt::Debug + PartialEq + Message + 'static,
{
    info!("This is an incomplete implementation. We ignore the buffer size");
    let (tx, rx) = crate::channel::Builder::<T>::new().build();
    (Sender::new(tx), Receiver::new(rx))
}
/// Sending half returned by `channel`; a thin wrapper around
/// `crate::channel::Sender<T>`.
#[derive(Clone, Debug, PartialEq)]
pub struct Sender<T> {
/// The wrapped crate-level sender used to deliver messages.
sender: crate::channel::Sender<T>,
}
// SAFETY: soundness rests entirely on `crate::channel::Sender<T>`, which is
// not visible from this file — TODO confirm it may be moved/shared across
// threads under these bounds.
// NOTE(review): sending through a shared `&Sender` hands a `T` to the
// receiving side, so `Sync` would normally hinge on `T: Send` rather than
// `T: Sync` — confirm the bound below is intended.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Sync> Sync for Sender<T> {}
impl<T> Sender<T>
where
    T: Message + 'static,
{
    /// Wraps a crate-level sender.
    fn new(sender: crate::channel::Sender<T>) -> Self {
        Self { sender }
    }

    /// Hands `v` to the wrapped sender and reports success.
    ///
    /// This implementation has no failure path: the result is always `Ok(())`.
    pub async fn send(&self, v: T) -> Result<(), error::SendError<T>> {
        let inner = &self.sender;
        inner.send_msg(v);
        Ok(())
    }

    /// Attempts to send `v`, with the outcome picked by
    /// `named_nondet("mpsc::Sender::try_send")`.
    ///
    /// When the choice is `true` the message is sent and `Ok(())` is returned;
    /// otherwise the value comes back inside `TrySendError::Full`.
    pub fn try_send(&self, v: T) -> Result<(), TrySendError<T>> {
        let accepted = named_nondet("mpsc::Sender::try_send");
        if !accepted {
            return Err(TrySendError::Full(v));
        }
        self.sender.send_msg(v);
        Ok(())
    }

    /// Returns the value chosen by `named_nondet("mpsc::Sender::is_closed")`;
    /// the wrapped sender is not consulted.
    pub fn is_closed(&self) -> bool {
        let closed = named_nondet("mpsc::Sender::is_closed");
        closed
    }
}
/// Receiving half returned by `channel`; a thin wrapper around
/// `crate::channel::Receiver<T>`.
#[derive(Clone, Debug, PartialEq)]
pub struct Receiver<T> {
/// The wrapped crate-level receiver that messages are read from.
receiver: crate::channel::Receiver<T>,
}
// SAFETY: soundness rests entirely on `crate::channel::Receiver<T>`, which is
// not visible from this file — TODO confirm it may be moved/shared across
// threads under these bounds.
// NOTE(review): the receive methods take `&self` and yield owned `T` values,
// so sharing a `&Receiver` across threads can move a `T` between them; `Sync`
// would normally require `T: Send` for that — confirm the bound below.
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Sync> Sync for Receiver<T> {}
/// Operations on the receiving half of the modelled bounded channel.
///
/// Several methods are placeholders whose result comes from `named_nondet`
/// rather than from the state of the wrapped receiver.
impl<T: Message + Clone + 'static> Receiver<T> {
    /// Wraps a crate-level receiver.
    fn new(receiver: crate::channel::Receiver<T>) -> Self {
        Receiver { receiver }
    }

    /// Awaits the next message from the wrapped receiver.
    ///
    /// Always yields `Some`; there is no path here that returns `None`.
    pub async fn recv(&self) -> Option<T> {
        info!("This is an incomplete implementation. It never returns None");
        Some(self.receiver.async_recv_msg().await)
    }

    /// Fetches a message via `recv_msg_block` and returns it, unless its
    /// `Debug` rendering contains the marker `mpscClose`.
    ///
    /// # Errors
    ///
    /// Returns `TryRecvError::Empty` when the received message's `{:?}` output
    /// contains `mpscClose`. `TryRecvError::Disconnected` is never produced.
    pub fn try_recv(&self) -> Result<T, error::TryRecvError> {
        // The literal is joined with a trailing `\` so the logged text is a
        // single line; a raw line break inside the literal would embed a
        // newline (and any source indentation) in the message.
        info!(
            "This is an incomplete implementation. It returns Empty only when \
             sending some special message containing the string mpscClose"
        );
        let msg = self.receiver.recv_msg_block();
        if format!("{:?}", msg).contains("mpscClose") {
            Err(error::TryRecvError::Empty)
        } else {
            Ok(msg)
        }
    }

    /// Returns the value chosen by `named_nondet("mpsc::Receiver::is_empty")`;
    /// the wrapped receiver is not consulted.
    pub fn is_empty(&self) -> bool {
        named_nondet("mpsc::Receiver::is_empty")
    }

    /// Returns the `named_nondet("mpsc::Receiver::len")` choice cast to
    /// `usize`, i.e. `0` or `1`. It is chosen independently of `is_empty`.
    pub fn len(&self) -> usize {
        named_nondet("mpsc::Receiver::len") as usize
    }

    /// Does nothing; present so callers written against a closable receiver
    /// still compile.
    pub fn close(&mut self) {}

    /// Returns the value chosen by `named_nondet("mpsc::Receiver::is_closed")`.
    pub fn is_closed(&self) -> bool {
        named_nondet("mpsc::Receiver::is_closed")
    }

    /// Fetches a message via `recv_msg_block` and reports it as ready.
    ///
    /// The task context is ignored and the result is always
    /// `Poll::Ready(Some(_))`; `Poll::Pending` is never returned.
    pub fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<T>> {
        Poll::Ready(Some(self.receiver.recv_msg_block()))
    }
}
/// Builds a channel from `crate::channel::Builder::new()` and returns its
/// halves wrapped as [`UnboundedSender`] and [`UnboundedReceiver`].
pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)
where
    T: Clone + std::fmt::Debug + PartialEq + Message + 'static,
{
    let (raw_tx, raw_rx) = crate::channel::Builder::<T>::new().build();
    let sending_half = UnboundedSender::new(raw_tx);
    let receiving_half = UnboundedReceiver::new(raw_rx);
    (sending_half, receiving_half)
}
/// Sending half returned by `unbounded_channel`; a thin wrapper around
/// `crate::channel::Sender<T>`.
#[derive(Clone, Debug, PartialEq)]
pub struct UnboundedSender<T> {
/// The wrapped crate-level sender used to deliver messages.
sender: crate::channel::Sender<T>,
}
// SAFETY: soundness rests entirely on `crate::channel::Sender<T>`, which is
// not visible from this file — TODO confirm it may be moved/shared across
// threads under these bounds.
// NOTE(review): sending through a shared `&UnboundedSender` hands a `T` to the
// receiving side, so `Sync` would normally hinge on `T: Send` rather than
// `T: Sync` — confirm the bound below is intended.
unsafe impl<T: Send> Send for UnboundedSender<T> {}
unsafe impl<T: Sync> Sync for UnboundedSender<T> {}
impl<T> UnboundedSender<T>
where
    T: Message + 'static,
{
    /// Wraps a crate-level sender.
    fn new(sender: crate::channel::Sender<T>) -> Self {
        Self { sender }
    }

    /// Hands `v` to the wrapped sender; the result is always `Ok(())`.
    pub fn send(&self, v: T) -> Result<(), error::SendError<T>> {
        let inner = &self.sender;
        inner.send_msg(v);
        Ok(())
    }

    /// Hands `v` to the wrapped sender; unlike `Sender::try_send` there is no
    /// nondeterministic choice, and the result is always `Ok(())`.
    pub fn try_send(&self, v: T) -> Result<(), error::TrySendError<T>> {
        let inner = &self.sender;
        inner.send_msg(v);
        Ok(())
    }

    /// Returns the value chosen by
    /// `named_nondet("mpsc::UnboundedSender::is_closed")`.
    pub fn is_closed(&self) -> bool {
        let closed = named_nondet("mpsc::UnboundedSender::is_closed");
        closed
    }
}
/// Receiving half returned by `unbounded_channel`; a thin wrapper around
/// `crate::channel::Receiver<T>`.
#[derive(Clone, Debug, PartialEq)]
pub struct UnboundedReceiver<T> {
/// The wrapped crate-level receiver that messages are read from.
receiver: crate::channel::Receiver<T>,
}
// SAFETY: soundness rests entirely on `crate::channel::Receiver<T>`, which is
// not visible from this file — TODO confirm it may be moved/shared across
// threads under these bounds.
// NOTE(review): the receive methods take `&self` and yield owned `T` values,
// so sharing a `&UnboundedReceiver` across threads can move a `T` between
// them; `Sync` would normally require `T: Send` for that — confirm the bound.
unsafe impl<T: Send> Send for UnboundedReceiver<T> {}
unsafe impl<T: Sync> Sync for UnboundedReceiver<T> {}
impl<T> UnboundedReceiver<T>
where
    T: Message + Clone + 'static,
{
    /// Wraps a crate-level receiver.
    fn new(receiver: crate::channel::Receiver<T>) -> Self {
        Self { receiver }
    }

    /// Awaits the next message from the wrapped receiver; always yields `Some`.
    pub async fn recv(&self) -> Option<T> {
        info!("This is an incomplete implementation. It never returns None");
        let message = self.receiver.async_recv_msg().await;
        Some(message)
    }

    /// Fetches a message via `recv_msg_block`; the result is always `Ok`.
    pub fn try_recv(&self) -> Result<T, error::TryRecvError> {
        info!("This is an incomplete implementation. It never returns errors");
        let message = self.receiver.recv_msg_block();
        Ok(message)
    }

    /// Returns the value chosen by
    /// `named_nondet("mpsc::UnboundedReceiver::is_empty")`.
    pub fn is_empty(&self) -> bool {
        let empty = named_nondet("mpsc::UnboundedReceiver::is_empty");
        empty
    }

    /// Returns the `named_nondet("mpsc::UnboundedReceiver::len")` choice cast
    /// to `usize`.
    pub fn len(&self) -> usize {
        let choice = named_nondet("mpsc::UnboundedReceiver::len");
        choice as usize
    }

    /// Does nothing.
    pub fn close(&mut self) {}

    /// Returns the value chosen by
    /// `named_nondet("mpsc::UnboundedReceiver::is_closed")`.
    pub fn is_closed(&self) -> bool {
        let closed = named_nondet("mpsc::UnboundedReceiver::is_closed");
        closed
    }
}
/// Error types used in the return values of the channel halves.
pub mod error {
    use std::error::Error;
    use std::fmt;

    /// Send failure that carries the unsent value in its public field.
    #[derive(PartialEq, Eq, Clone, Copy)]
    pub struct SendError<T>(pub T);

    impl<T> fmt::Debug for SendError<T> {
        /// Renders as a field-less, non-exhaustive struct so that `T` does not
        /// need to implement `Debug`.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let mut rendering = f.debug_struct("SendError");
            rendering.finish_non_exhaustive()
        }
    }

    impl<T> fmt::Display for SendError<T> {
        /// Writes a fixed message; width/padding flags are not applied.
        fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
            out.write_str("channel closed")
        }
    }

    impl<T> Error for SendError<T> {}

    /// Failure of a non-waiting send; both variants hand the value back.
    #[derive(PartialEq, Eq, Clone, Copy)]
    pub enum TrySendError<T> {
        /// No capacity was available for the value.
        Full(T),
        /// The channel was closed.
        Closed(T),
    }

    impl<T> TrySendError<T> {
        /// Consumes the error and returns the value it carried.
        pub fn into_inner(self) -> T {
            match self {
                TrySendError::Full(value) | TrySendError::Closed(value) => value,
            }
        }
    }

    impl<T> fmt::Debug for TrySendError<T> {
        /// Formats the variant label through `str`'s own `Debug` impl, so the
        /// rendered text is the quoted label (for example `"Full(..)"`).
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let label = match self {
                Self::Full(_) => "Full(..)",
                Self::Closed(_) => "Closed(..)",
            };
            fmt::Debug::fmt(label, f)
        }
    }

    impl<T> fmt::Display for TrySendError<T> {
        /// Writes a fixed per-variant message; width/padding flags are not
        /// applied.
        fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
            let description = match self {
                Self::Full(_) => "no available capacity",
                Self::Closed(_) => "channel closed",
            };
            out.write_str(description)
        }
    }

    impl<T> Error for TrySendError<T> {}

    impl<T> From<SendError<T>> for TrySendError<T> {
        /// Maps a `SendError` to the `Closed` variant, keeping the value.
        fn from(src: SendError<T>) -> TrySendError<T> {
            let SendError(value) = src;
            TrySendError::Closed(value)
        }
    }

    /// Failure of a non-waiting receive.
    #[derive(PartialEq, Eq, Clone, Copy, Debug)]
    pub enum TryRecvError {
        /// Displayed as "receiving on an empty channel".
        Empty,
        /// Displayed as "receiving on a closed channel".
        Disconnected,
    }

    impl fmt::Display for TryRecvError {
        /// Formats the message through `str`'s `Display` impl, so formatter
        /// flags such as width and alignment are honoured.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let text = match self {
                Self::Empty => "receiving on an empty channel",
                Self::Disconnected => "receiving on a closed channel",
            };
            fmt::Display::fmt(text, f)
        }
    }

    impl Error for TryRecvError {}

    /// Unused placeholder error; it cannot be constructed outside this module.
    #[derive(Debug, Clone)]
    #[doc(hidden)]
    #[deprecated(note = "This type is unused because recv returns an Option.")]
    pub struct RecvError(());

    #[allow(deprecated)]
    impl fmt::Display for RecvError {
        fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
            out.write_str("channel closed")
        }
    }

    #[allow(deprecated)]
    impl Error for RecvError {}
}