use crate::loom::sync::{atomic::AtomicUsize, Arc};
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{SendError, TryRecvError};
use std::fmt;
use std::task::{Context, Poll};
pub struct UnboundedSender<T> {
chan: chan::Tx<T, Semaphore>,
}
pub struct WeakUnboundedSender<T> {
chan: Arc<chan::Chan<T, Semaphore>>,
}
impl<T> Clone for UnboundedSender<T> {
fn clone(&self) -> Self {
UnboundedSender {
chan: self.chan.clone(),
}
}
}
impl<T> fmt::Debug for UnboundedSender<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("UnboundedSender")
.field("chan", &self.chan)
.finish()
}
}
pub struct UnboundedReceiver<T> {
chan: chan::Rx<T, Semaphore>,
}
impl<T> fmt::Debug for UnboundedReceiver<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("UnboundedReceiver")
.field("chan", &self.chan)
.finish()
}
}
pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
let (tx, rx) = chan::channel(Semaphore(AtomicUsize::new(0)));
let tx = UnboundedSender::new(tx);
let rx = UnboundedReceiver::new(rx);
(tx, rx)
}
#[derive(Debug)]
pub(crate) struct Semaphore(pub(crate) AtomicUsize);
impl<T> UnboundedReceiver<T> {
pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
UnboundedReceiver { chan }
}
pub async fn recv(&mut self) -> Option<T> {
use std::future::poll_fn;
poll_fn(|cx| self.poll_recv(cx)).await
}
pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
use std::future::poll_fn;
poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.chan.try_recv()
}
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
pub fn blocking_recv(&mut self) -> Option<T> {
crate::future::block_on(self.recv())
}
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
crate::future::block_on(self.recv_many(buffer, limit))
}
pub fn close(&mut self) {
self.chan.close();
}
pub fn is_closed(&self) -> bool {
self.chan.is_closed()
}
pub fn is_empty(&self) -> bool {
self.chan.is_empty()
}
pub fn len(&self) -> usize {
self.chan.len()
}
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
pub fn poll_recv_many(
&mut self,
cx: &mut Context<'_>,
buffer: &mut Vec<T>,
limit: usize,
) -> Poll<usize> {
self.chan.recv_many(cx, buffer, limit)
}
pub fn sender_strong_count(&self) -> usize {
self.chan.sender_strong_count()
}
pub fn sender_weak_count(&self) -> usize {
self.chan.sender_weak_count()
}
}
impl<T> UnboundedSender<T> {
pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
UnboundedSender { chan }
}
pub fn send(&self, message: T) -> Result<(), SendError<T>> {
if !self.inc_num_messages() {
return Err(SendError(message));
}
self.chan.send(message);
Ok(())
}
fn inc_num_messages(&self) -> bool {
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
let mut curr = self.chan.semaphore().0.load(Acquire);
loop {
if curr & 1 == 1 {
return false;
}
if curr == usize::MAX ^ 1 {
process::abort()
}
match self
.chan
.semaphore()
.0
.compare_exchange(curr, curr + 2, AcqRel, Acquire)
{
Ok(_) => return true,
Err(actual) => {
curr = actual;
}
}
}
}
pub async fn closed(&self) {
self.chan.closed().await;
}
pub fn is_closed(&self) -> bool {
self.chan.is_closed()
}
pub fn same_channel(&self, other: &Self) -> bool {
self.chan.same_channel(&other.chan)
}
#[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
pub fn downgrade(&self) -> WeakUnboundedSender<T> {
WeakUnboundedSender {
chan: self.chan.downgrade(),
}
}
pub fn strong_count(&self) -> usize {
self.chan.strong_count()
}
pub fn weak_count(&self) -> usize {
self.chan.weak_count()
}
}
impl<T> Clone for WeakUnboundedSender<T> {
fn clone(&self) -> Self {
self.chan.increment_weak_count();
WeakUnboundedSender {
chan: self.chan.clone(),
}
}
}
impl<T> Drop for WeakUnboundedSender<T> {
fn drop(&mut self) {
self.chan.decrement_weak_count();
}
}
impl<T> WeakUnboundedSender<T> {
pub fn upgrade(&self) -> Option<UnboundedSender<T>> {
chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new)
}
pub fn strong_count(&self) -> usize {
self.chan.strong_count()
}
pub fn weak_count(&self) -> usize {
self.chan.weak_count()
}
}
impl<T> fmt::Debug for WeakUnboundedSender<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("WeakUnboundedSender").finish()
}
}