use crate::loom::sync::atomic::AtomicUsize;
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{SendError, TryRecvError};
use std::fmt;
use std::task::{Context, Poll};
/// Sending half of the unbounded channel returned by `unbounded_channel`.
///
/// This is a thin wrapper around the shared channel implementation's `Tx`
/// handle, parameterised with this module's `Semaphore` alias.
pub struct UnboundedSender<T> {
// Handle into the shared channel implementation; every operation on this
// type is forwarded to it.
chan: chan::Tx<T, Semaphore>,
}
impl<T> Clone for UnboundedSender<T> {
fn clone(&self) -> Self {
UnboundedSender {
chan: self.chan.clone(),
}
}
}
impl<T> fmt::Debug for UnboundedSender<T> {
    /// Formats the sender as a struct named `UnboundedSender` with a single
    /// `chan` field rendered through the inner handle's own `Debug` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("UnboundedSender");
        out.field("chan", &self.chan);
        out.finish()
    }
}
/// Receiving half of the unbounded channel returned by `unbounded_channel`.
///
/// This is a thin wrapper around the shared channel implementation's `Rx`
/// handle, parameterised with this module's `Semaphore` alias.
pub struct UnboundedReceiver<T> {
// Handle into the shared channel implementation; every operation on this
// type is forwarded to it.
chan: chan::Rx<T, Semaphore>,
}
impl<T> fmt::Debug for UnboundedReceiver<T> {
    /// Formats the receiver as a struct named `UnboundedReceiver` with a
    /// single `chan` field rendered through the inner handle's own `Debug`
    /// output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("UnboundedReceiver");
        out.field("chan", &self.chan);
        out.finish()
    }
}
/// Creates an unbounded channel and returns its two halves.
///
/// The shared channel implementation is constructed with an `AtomicUsize`
/// starting at zero as its semaphore, and each resulting handle is wrapped in
/// the corresponding public type.
///
/// # Returns
///
/// A `(sender, receiver)` pair connected to the same underlying channel.
pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
    let (raw_tx, raw_rx) = chan::channel(AtomicUsize::new(0));

    (UnboundedSender::new(raw_tx), UnboundedReceiver::new(raw_rx))
}
// Semaphore type plugged into the shared channel implementation as the second
// type parameter of `chan::Tx` / `chan::Rx`; `unbounded_channel` initialises
// it to zero.
// NOTE(review): presumably the `chan` module uses this counter for message
// accounting rather than for limiting capacity — confirm against `chan`.
type Semaphore = AtomicUsize;
impl<T> UnboundedReceiver<T> {
pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
UnboundedReceiver { chan }
}
#[doc(hidden)] pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
pub async fn recv(&mut self) -> Option<T> {
use crate::future::poll_fn;
poll_fn(|cx| self.poll_recv(cx)).await
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.chan.try_recv()
}
pub fn close(&mut self) {
self.chan.close();
}
}
// Only compiled when the `stream` feature is enabled.
#[cfg(feature = "stream")]
impl<T> crate::stream::Stream for UnboundedReceiver<T> {
    type Item = T;

    /// Yields the next value by delegating to `poll_recv`, so the stream
    /// produces exactly what the receiver itself would.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Unwrap the pin to reach the `&mut self` method.
        self.get_mut().poll_recv(cx)
    }
}
impl<T> UnboundedSender<T> {
    /// Wraps the sending handle of the shared channel implementation.
    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Self {
        Self { chan }
    }

    /// Sends `message` on the channel without awaiting.
    ///
    /// The value is handed to the inner channel's `send_unbounded`.
    ///
    /// # Errors
    ///
    /// Any failure reported by the inner channel is converted into a
    /// [`SendError`].
    ///
    /// NOTE(review): presumably the error carries the unsent message back to
    /// the caller and occurs when the receiving half is closed or dropped —
    /// confirm against `chan::Tx::send_unbounded`.
    pub fn send(&self, message: T) -> Result<(), SendError<T>> {
        match self.chan.send_unbounded(message) {
            Ok(_) => Ok(()),
            // Same `From` conversion the `?` operator would apply.
            Err(err) => Err(err.into()),
        }
    }
}