use crate::loom::sync::Arc;
use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
cfg_time! {
use crate::sync::mpsc::error::SendTimeoutError;
use crate::time::Duration;
}
use std::fmt;
use std::task::{Context, Poll};
/// Sending half of the bounded mpsc channel returned by [`channel`].
///
/// Cloning a `Sender` clones the inner transmit handle, so several senders can
/// feed the same [`Receiver`].
pub struct Sender<T> {
/// Transmit handle onto the shared channel, parameterised with the bounded
/// [`Semaphore`] wrapper that tracks free slots.
chan: chan::Tx<T, Semaphore>,
}
/// Weak counterpart of [`Sender`], obtained from [`Sender::downgrade`].
///
/// It holds the shared channel state directly rather than a transmit handle and
/// must be turned back into a [`Sender`] with [`WeakSender::upgrade`] (which may
/// yield `None`) before anything can be sent.
pub struct WeakSender<T> {
/// Shared channel state; cloning and dropping this handle adjust its weak
/// sender count.
chan: Arc<chan::Chan<T, Semaphore>>,
}
/// A single reserved sending slot borrowed from a [`Sender`].
///
/// Produced by [`Sender::reserve`], [`Sender::try_reserve`] and by iterating a
/// [`PermitIterator`]. [`Permit::send`] consumes the slot; dropping the permit
/// without sending hands the slot back to the semaphore.
pub struct Permit<'a, T> {
/// Borrowed transmit handle of the sender this permit was reserved on.
chan: &'a chan::Tx<T, Semaphore>,
}
/// Iterator handing out the slots reserved by [`Sender::reserve_many`] or
/// [`Sender::try_reserve_many`], one [`Permit`] at a time.
///
/// Slots that have not been yielded when the iterator is dropped are returned
/// to the semaphore.
pub struct PermitIterator<'a, T> {
/// Borrowed transmit handle of the sender the slots were reserved on.
chan: &'a chan::Tx<T, Semaphore>,
/// Number of reserved slots that have not been yielded yet.
n: usize,
}
/// A reserved sending slot that owns its transmit handle instead of borrowing a
/// [`Sender`].
///
/// Produced by [`Sender::reserve_owned`] and [`Sender::try_reserve_owned`];
/// [`OwnedPermit::send`] and [`OwnedPermit::release`] give the [`Sender`] back.
pub struct OwnedPermit<T> {
/// The owned transmit handle. It is `Some` on construction and is taken out
/// (leaving `None`) by `send`, `release` and `drop`, so `drop` can tell whether
/// the slot still has to be returned.
chan: Option<chan::Tx<T, Semaphore>>,
}
/// Receiving half of the bounded mpsc channel returned by [`channel`].
pub struct Receiver<T> {
/// Receive handle onto the shared channel, parameterised with the bounded
/// [`Semaphore`] wrapper.
chan: chan::Rx<T, Semaphore>,
}
/// Creates a bounded mpsc channel and returns its two halves.
///
/// `buffer` is used both as the initial permit count of the semaphore and as
/// the `bound` later reported by `max_capacity`.
///
/// # Panics
///
/// Panics if `buffer` is zero. Thanks to `#[track_caller]` the panic location
/// is reported at the call site.
#[track_caller]
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
    assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");

    // One semaphore permit per free slot; `bound` keeps the configured size.
    let (raw_tx, raw_rx) = chan::channel(Semaphore {
        semaphore: semaphore::Semaphore::new(buffer),
        bound: buffer,
    });

    (Sender::new(raw_tx), Receiver::new(raw_rx))
}
/// Capacity bookkeeping for a bounded channel: the batch semaphore that tracks
/// free slots, paired with the fixed size the channel was created with.
#[derive(Debug)]
pub(crate) struct Semaphore {
/// Semaphore whose available permits are reported as the channel's `capacity`.
pub(crate) semaphore: semaphore::Semaphore,
/// Buffer size passed to [`channel`]; reported as `max_capacity`.
pub(crate) bound: usize,
}
impl<T> Receiver<T> {
    /// Wraps the receive handle of a channel in a `Receiver`.
    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
        Self { chan }
    }

    /// Waits for the next value on the channel.
    ///
    /// Polls the underlying channel's `recv` until it is ready and resolves to
    /// its output.
    pub async fn recv(&mut self) -> Option<T> {
        std::future::poll_fn(|cx| self.poll_recv(cx)).await
    }

    /// Waits for values and appends them to `buffer`.
    ///
    /// Polls the underlying channel's `recv_many` with `buffer` and `limit`
    /// until it is ready and resolves to the `usize` it reports (presumably the
    /// number of values appended, capped by `limit` — confirm in `chan`).
    pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
        std::future::poll_fn(|cx| self.poll_recv_many(cx, buffer, limit)).await
    }

    /// Attempts to take a value without waiting; delegates to the underlying
    /// channel's `try_recv`.
    ///
    /// # Errors
    ///
    /// Returns the [`TryRecvError`] reported by the channel when no value could
    /// be taken.
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.chan.try_recv()
    }

    /// Synchronous variant of [`recv`](Self::recv): drives the future to
    /// completion on the current thread with `crate::future::block_on`.
    ///
    /// NOTE(review): `#[track_caller]` suggests `block_on` can panic
    /// (presumably when used from an asynchronous context) — confirm there.
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
    pub fn blocking_recv(&mut self) -> Option<T> {
        let next_value = self.recv();
        crate::future::block_on(next_value)
    }

    /// Synchronous variant of [`recv_many`](Self::recv_many): drives the future
    /// to completion on the current thread with `crate::future::block_on`.
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
    pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
        let batch = self.recv_many(buffer, limit);
        crate::future::block_on(batch)
    }

    /// Closes the receive side by calling `close` on the underlying channel.
    pub fn close(&mut self) {
        self.chan.close();
    }

    /// Reports whether the underlying channel considers itself closed.
    pub fn is_closed(&self) -> bool {
        self.chan.is_closed()
    }

    /// Reports whether the underlying channel is empty.
    pub fn is_empty(&self) -> bool {
        self.chan.is_empty()
    }

    /// Returns the length reported by the underlying channel.
    pub fn len(&self) -> usize {
        self.chan.len()
    }

    /// Returns the number of permits currently available on the channel's
    /// semaphore.
    pub fn capacity(&self) -> usize {
        let limits = self.chan.semaphore();
        limits.semaphore.available_permits()
    }

    /// Returns the buffer size the channel was created with.
    pub fn max_capacity(&self) -> usize {
        let limits = self.chan.semaphore();
        limits.bound
    }

    /// Poll-based form of [`recv`](Self::recv); forwards `cx` to the underlying
    /// channel's `recv`.
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        self.chan.recv(cx)
    }

    /// Poll-based form of [`recv_many`](Self::recv_many); forwards all
    /// arguments to the underlying channel's `recv_many`.
    pub fn poll_recv_many(
        &mut self,
        cx: &mut Context<'_>,
        buffer: &mut Vec<T>,
        limit: usize,
    ) -> Poll<usize> {
        self.chan.recv_many(cx, buffer, limit)
    }

    /// Returns the channel's count of strong senders.
    pub fn sender_strong_count(&self) -> usize {
        self.chan.sender_strong_count()
    }

    /// Returns the channel's count of weak senders.
    pub fn sender_weak_count(&self) -> usize {
        self.chan.sender_weak_count()
    }
}
impl<T> fmt::Debug for Receiver<T> {
    /// Formats the receiver as a `Receiver` struct with its single `chan` field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Receiver");
        out.field("chan", &self.chan);
        out.finish()
    }
}
// `Receiver` is declared `Unpin` for every `T`, with no `T: Unpin` bound.
impl<T> Unpin for Receiver<T> {}
impl<T> Sender<T> {
pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
Sender { chan }
}
pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
match self.reserve().await {
Ok(permit) => {
permit.send(value);
Ok(())
}
Err(_) => Err(SendError(value)),
}
}
pub async fn closed(&self) {
self.chan.closed().await;
}
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
match self.chan.semaphore().semaphore.try_acquire(1) {
Ok(()) => {}
Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
}
self.chan.send(message);
Ok(())
}
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
pub async fn send_timeout(
&self,
value: T,
timeout: Duration,
) -> Result<(), SendTimeoutError<T>> {
let permit = match crate::time::timeout(timeout, self.reserve()).await {
Err(_) => {
return Err(SendTimeoutError::Timeout(value));
}
Ok(Err(_)) => {
return Err(SendTimeoutError::Closed(value));
}
Ok(Ok(permit)) => permit,
};
permit.send(value);
Ok(())
}
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "send_blocking"))]
pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
crate::future::block_on(self.send(value))
}
pub fn is_closed(&self) -> bool {
self.chan.is_closed()
}
pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
self.reserve_inner(1).await?;
Ok(Permit { chan: &self.chan })
}
pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>> {
self.reserve_inner(n).await?;
Ok(PermitIterator {
chan: &self.chan,
n,
})
}
pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
self.reserve_inner(1).await?;
Ok(OwnedPermit {
chan: Some(self.chan),
})
}
async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
crate::trace::async_trace_leaf().await;
if n > self.max_capacity() {
return Err(SendError(()));
}
match self.chan.semaphore().semaphore.acquire(n).await {
Ok(()) => Ok(()),
Err(_) => Err(SendError(())),
}
}
pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
match self.chan.semaphore().semaphore.try_acquire(1) {
Ok(()) => {}
Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
}
Ok(Permit { chan: &self.chan })
}
pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>> {
if n > self.max_capacity() {
return Err(TrySendError::Full(()));
}
match self.chan.semaphore().semaphore.try_acquire(n) {
Ok(()) => {}
Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
}
Ok(PermitIterator {
chan: &self.chan,
n,
})
}
pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
match self.chan.semaphore().semaphore.try_acquire(1) {
Ok(()) => {}
Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
}
Ok(OwnedPermit {
chan: Some(self.chan),
})
}
pub fn same_channel(&self, other: &Self) -> bool {
self.chan.same_channel(&other.chan)
}
pub fn capacity(&self) -> usize {
self.chan.semaphore().semaphore.available_permits()
}
#[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
pub fn downgrade(&self) -> WeakSender<T> {
WeakSender {
chan: self.chan.downgrade(),
}
}
pub fn max_capacity(&self) -> usize {
self.chan.semaphore().bound
}
pub fn strong_count(&self) -> usize {
self.chan.strong_count()
}
pub fn weak_count(&self) -> usize {
self.chan.weak_count()
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Sender {
chan: self.chan.clone(),
}
}
}
impl<T> fmt::Debug for Sender<T> {
    /// Formats the sender as a `Sender` struct with its single `chan` field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Sender");
        out.field("chan", &self.chan);
        out.finish()
    }
}
impl<T> Clone for WeakSender<T> {
fn clone(&self) -> Self {
self.chan.increment_weak_count();
WeakSender {
chan: self.chan.clone(),
}
}
}
impl<T> Drop for WeakSender<T> {
/// Decrements the channel's weak-sender count, mirroring the increment
/// performed in `Clone`.
fn drop(&mut self) {
self.chan.decrement_weak_count();
}
}
impl<T> WeakSender<T> {
    /// Tries to turn this weak handle back into a [`Sender`].
    ///
    /// Returns `None` when `chan::Tx::upgrade` declines (presumably because no
    /// strong sender is left — confirm in `chan`).
    pub fn upgrade(&self) -> Option<Sender<T>> {
        let shared = self.chan.clone();
        let tx = chan::Tx::upgrade(shared)?;
        Some(Sender::new(tx))
    }

    /// Returns the channel's count of strong senders.
    pub fn strong_count(&self) -> usize {
        self.chan.strong_count()
    }

    /// Returns the channel's count of weak senders.
    pub fn weak_count(&self) -> usize {
        self.chan.weak_count()
    }
}
impl<T> fmt::Debug for WeakSender<T> {
    /// Formats the handle as an empty `WeakSender` struct; the channel state is
    /// deliberately not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("WeakSender");
        out.finish()
    }
}
impl<T> Permit<'_, T> {
    /// Sends `value` using the slot held by this permit.
    pub fn send(self, value: T) {
        self.chan.send(value);

        // The slot is now occupied by `value`, so the `Drop` impl (which would
        // hand the permit back to the semaphore) must not run.
        std::mem::forget(self);
    }
}
impl<T> Drop for Permit<'_, T> {
    /// Returns the unused slot to the semaphore.
    fn drop(&mut self) {
        use chan::Semaphore;

        let tx = self.chan;
        let sem = tx.semaphore();
        sem.add_permit();

        // Once the semaphore is both closed and idle, the receiver is woken so
        // it can observe that state (presumably: channel closed and no permits
        // outstanding — see the `chan::Semaphore` impl).
        let receiver_should_look = sem.is_closed() && sem.is_idle();
        if receiver_should_look {
            tx.wake_rx();
        }
    }
}
impl<T> fmt::Debug for Permit<'_, T> {
    /// Formats the permit as a `Permit` struct with its single `chan` field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Permit");
        out.field("chan", &self.chan);
        out.finish()
    }
}
impl<'a, T> Iterator for PermitIterator<'a, T> {
    type Item = Permit<'a, T>;

    /// Hands out one of the remaining reserved slots as a [`Permit`], or `None`
    /// once all of them have been yielded.
    fn next(&mut self) -> Option<Self::Item> {
        // `checked_sub` yields `None` exactly when no slots are left.
        let left_after = self.n.checked_sub(1)?;
        self.n = left_after;
        Some(Permit { chan: self.chan })
    }

    /// The number of slots still to be yielded is known exactly.
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.n, Some(self.n))
    }
}
// `size_hint` reports `(n, Some(n))`, so the remaining length is exact.
impl<T> ExactSizeIterator for PermitIterator<'_, T> {}
// Once `n` reaches zero, `next` keeps returning `None`.
impl<T> std::iter::FusedIterator for PermitIterator<'_, T> {}
impl<T> Drop for PermitIterator<'_, T> {
    /// Returns every slot that was reserved but never yielded to the semaphore.
    fn drop(&mut self) {
        use chan::Semaphore;

        let unused = self.n;
        if unused > 0 {
            let sem = self.chan.semaphore();
            sem.add_permits(unused);

            // Once the semaphore is both closed and idle, the receiver is woken
            // so it can observe that state.
            if sem.is_closed() && sem.is_idle() {
                self.chan.wake_rx();
            }
        }
    }
}
impl<T> fmt::Debug for PermitIterator<'_, T> {
    /// Formats the iterator as a `PermitIterator` struct; the remaining slot
    /// count `n` is printed under the name `capacity`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("PermitIterator");
        out.field("chan", &self.chan);
        out.field("capacity", &self.n);
        out.finish()
    }
}
impl<T> OwnedPermit<T> {
    /// Sends `value` using the slot held by this permit and returns the
    /// [`Sender`] the permit was created from.
    ///
    /// Taking the handle out of `self.chan` leaves `None` behind, so the `Drop`
    /// impl that runs afterwards does not give the (now used) slot back.
    pub fn send(mut self, value: T) -> Sender<T> {
        let chan = self.chan.take().unwrap_or_else(|| {
            unreachable!("OwnedPermit channel is only taken when the permit is moved")
        });
        chan.send(value);
        Sender { chan }
    }

    /// Gives the reserved slot back without sending and returns the
    /// [`Sender`] the permit was created from.
    ///
    /// The slot is returned to the semaphore exactly as the `Drop` impls of
    /// `Permit` and `OwnedPermit` do, including the receiver wake-up when the
    /// semaphore turns out to be closed and idle afterwards.
    pub fn release(mut self) -> Sender<T> {
        use chan::Semaphore;

        let chan = self.chan.take().unwrap_or_else(|| {
            unreachable!("OwnedPermit channel is only taken when the permit is moved")
        });

        let semaphore = chan.semaphore();
        semaphore.add_permit();

        // The returned `Sender` keeps the channel alive, so nothing else would
        // wake a receiver that is waiting for this last outstanding slot to be
        // handed back. Wake it here, as the drop paths do; a spurious wake-up
        // is harmless.
        if semaphore.is_closed() && semaphore.is_idle() {
            chan.wake_rx();
        }

        Sender { chan }
    }

    /// Reports whether `self` and `other` hold handles onto the same channel.
    ///
    /// Returns `false` if either permit no longer holds its handle.
    pub fn same_channel(&self, other: &Self) -> bool {
        self.chan
            .as_ref()
            .zip(other.chan.as_ref())
            .is_some_and(|(a, b)| a.same_channel(b))
    }

    /// Reports whether this permit and `sender` are handles onto the same
    /// channel.
    ///
    /// Returns `false` if the permit no longer holds its handle.
    pub fn same_channel_as_sender(&self, sender: &Sender<T>) -> bool {
        self.chan
            .as_ref()
            .is_some_and(|chan| chan.same_channel(&sender.chan))
    }
}
impl<T> Drop for OwnedPermit<T> {
    /// Returns the slot to the semaphore if the permit still holds its
    /// transmit handle.
    fn drop(&mut self) {
        use chan::Semaphore;

        // `send` and `release` take the handle out first; in that case there is
        // nothing left to give back.
        let Some(tx) = self.chan.take() else {
            return;
        };

        let sem = tx.semaphore();
        sem.add_permit();

        // Once the semaphore is both closed and idle, the receiver is woken so
        // it can observe that state.
        if sem.is_closed() && sem.is_idle() {
            tx.wake_rx();
        }
    }
}
impl<T> fmt::Debug for OwnedPermit<T> {
    /// Formats the permit as an `OwnedPermit` struct with its single `chan`
    /// field (an `Option`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("OwnedPermit");
        out.field("chan", &self.chan);
        out.finish()
    }
}