use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
cfg_time! {
use crate::sync::mpsc::error::SendTimeoutError;
use crate::time::Duration;
}
use std::fmt;
use std::task::{Context, Poll};
/// Sending half of the bounded channel returned by [`channel`].
///
/// Every send path in this file first takes one permit from the channel's
/// semaphore and only then pushes the value into the channel.
pub struct Sender<T> {
// Transmit handle of the underlying channel, parameterised with the
// bounded-channel `Semaphore` alias declared in this file.
chan: chan::Tx<T, Semaphore>,
}
/// One semaphore permit held on behalf of a borrowed [`Sender`].
///
/// Created by `Sender::reserve` / `Sender::try_reserve` after a single permit
/// has been acquired. The permit is either consumed by `Permit::send` or given
/// back to the semaphore when the `Permit` is dropped.
pub struct Permit<'a, T> {
// Borrowed transmit handle of the `Sender` that produced this permit.
chan: &'a chan::Tx<T, Semaphore>,
}
/// One semaphore permit bundled with the owned transmit handle of the
/// [`Sender`] that reserved it.
///
/// Created by `Sender::reserve_owned` / `Sender::try_reserve_owned`. `send` and
/// `release` hand the transmit handle back wrapped in a new `Sender`; dropping
/// the value gives the permit back to the semaphore.
pub struct OwnedPermit<T> {
// `Some` while the permit is still held. `send` / `release` `take()` the
// handle so that the `Drop` impl, which still runs afterwards, sees `None`
// and does not return the permit a second time.
chan: Option<chan::Tx<T, Semaphore>>,
}
/// Receiving half of the bounded channel returned by [`channel`].
pub struct Receiver<T> {
// Receive handle of the underlying channel, parameterised with the
// bounded-channel `Semaphore` alias declared in this file.
chan: chan::Rx<T, Semaphore>,
}
/// Creates a bounded channel and returns its `(Sender, Receiver)` pair.
///
/// The channel is backed by a semaphore initialised with `buffer` permits;
/// the same `buffer` value is stored next to the semaphore.
///
/// # Panics
///
/// Panics if `buffer` is `0`.
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
    assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");

    // The semaphore state is the pair (permit pool, configured bound).
    let (raw_tx, raw_rx) = chan::channel((semaphore::Semaphore::new(buffer), buffer));

    (Sender::new(raw_tx), Receiver::new(raw_rx))
}
/// Semaphore state used by the bounded channel: the batch semaphore itself
/// paired with the `buffer` value that was passed to [`channel`].
type Semaphore = (semaphore::Semaphore, usize);
impl<T> Receiver<T> {
pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
Receiver { chan }
}
pub async fn recv(&mut self) -> Option<T> {
use crate::future::poll_fn;
poll_fn(|cx| self.chan.recv(cx)).await
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.chan.try_recv()
}
#[cfg(feature = "sync")]
pub fn blocking_recv(&mut self) -> Option<T> {
crate::future::block_on(self.recv())
}
pub fn close(&mut self) {
self.chan.close();
}
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
}
impl<T> fmt::Debug for Receiver<T> {
    /// Formats as `Receiver { chan: .. }`, delegating to the channel handle's
    /// own `Debug` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Receiver");
        builder.field("chan", &self.chan);
        builder.finish()
    }
}
// `Receiver` is declared `Unpin` unconditionally, i.e. regardless of whether
// `T` itself is `Unpin`.
impl<T> Unpin for Receiver<T> {}
impl<T> Sender<T> {
    /// Wraps a raw channel transmit handle in a `Sender`.
    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
        Self { chan }
    }

    /// Waits for a permit and then sends `value`.
    ///
    /// # Errors
    ///
    /// If reserving a permit fails, `value` is handed back inside
    /// [`SendError`].
    pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
        let permit = match self.reserve().await {
            Ok(permit) => permit,
            Err(_) => return Err(SendError(value)),
        };
        permit.send(value);
        Ok(())
    }

    /// Completes when the underlying channel's `closed` future completes.
    pub async fn closed(&self) {
        self.chan.closed().await
    }

    /// Tries to send `message` without waiting for a permit.
    ///
    /// # Errors
    ///
    /// Hands `message` back as [`TrySendError::Closed`] when the semaphore is
    /// closed, or as [`TrySendError::Full`] when no permit is available.
    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
        if let Err(reason) = self.chan.semaphore().0.try_acquire(1) {
            return Err(match reason {
                TryAcquireError::Closed => TrySendError::Closed(message),
                TryAcquireError::NoPermits => TrySendError::Full(message),
            });
        }

        // A permit is held at this point; it is consumed by the send.
        self.chan.send(message);
        Ok(())
    }

    /// Like [`send`](Self::send), but gives up once `timeout` has elapsed
    /// while still waiting for a permit.
    ///
    /// # Errors
    ///
    /// Hands `value` back as [`SendTimeoutError::Timeout`] when the timer
    /// fires first, or as [`SendTimeoutError::Closed`] when the reservation
    /// itself fails.
    #[cfg(feature = "time")]
    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
    pub async fn send_timeout(
        &self,
        value: T,
        timeout: Duration,
    ) -> Result<(), SendTimeoutError<T>> {
        match crate::time::timeout(timeout, self.reserve()).await {
            Ok(Ok(permit)) => {
                permit.send(value);
                Ok(())
            }
            Ok(Err(_)) => Err(SendTimeoutError::Closed(value)),
            Err(_) => Err(SendTimeoutError::Timeout(value)),
        }
    }

    /// Synchronous counterpart of [`send`](Self::send): drives the `send`
    /// future to completion with the crate's `block_on`.
    #[cfg(feature = "sync")]
    pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
        let pending = self.send(value);
        crate::future::block_on(pending)
    }

    /// Reports whether the underlying channel handle considers the channel
    /// closed.
    pub fn is_closed(&self) -> bool {
        self.chan.is_closed()
    }

    /// Waits for one permit and returns a [`Permit`] borrowing this sender.
    ///
    /// # Errors
    ///
    /// Returns `SendError(())` when the permit cannot be acquired.
    pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
        self.reserve_inner()
            .await
            .map(|()| Permit { chan: &self.chan })
    }

    /// Waits for one permit and returns an [`OwnedPermit`] that takes
    /// ownership of this sender's channel handle.
    ///
    /// # Errors
    ///
    /// Returns `SendError(())` when the permit cannot be acquired; the sender
    /// is dropped in that case.
    pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
        match self.reserve_inner().await {
            Ok(()) => Ok(OwnedPermit {
                chan: Some(self.chan),
            }),
            Err(err) => Err(err),
        }
    }

    /// Shared implementation of the async reservations: acquires exactly one
    /// permit and maps any acquire failure to `SendError(())`.
    async fn reserve_inner(&self) -> Result<(), SendError<()>> {
        if self.chan.semaphore().0.acquire(1).await.is_ok() {
            Ok(())
        } else {
            Err(SendError(()))
        }
    }

    /// Non-waiting form of [`reserve`](Self::reserve).
    ///
    /// # Errors
    ///
    /// Returns [`TrySendError::Closed`] when the semaphore is closed, or
    /// [`TrySendError::Full`] when no permit is available.
    pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
        match self.chan.semaphore().0.try_acquire(1) {
            Ok(_) => Ok(Permit { chan: &self.chan }),
            Err(TryAcquireError::Closed) => Err(TrySendError::Closed(())),
            Err(TryAcquireError::NoPermits) => Err(TrySendError::Full(())),
        }
    }

    /// Non-waiting form of [`reserve_owned`](Self::reserve_owned).
    ///
    /// # Errors
    ///
    /// On failure the sender itself is handed back, wrapped in
    /// [`TrySendError::Closed`] or [`TrySendError::Full`].
    pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
        // Finish the acquire before matching so `self` can be moved freely
        // in every arm.
        let outcome = self.chan.semaphore().0.try_acquire(1);
        match outcome {
            Ok(_) => Ok(OwnedPermit {
                chan: Some(self.chan),
            }),
            Err(TryAcquireError::Closed) => Err(TrySendError::Closed(self)),
            Err(TryAcquireError::NoPermits) => Err(TrySendError::Full(self)),
        }
    }

    /// Returns `true` when the underlying channel handle reports that `self`
    /// and `other` belong to the same channel.
    pub fn same_channel(&self, other: &Self) -> bool {
        self.chan.same_channel(&other.chan)
    }

    /// Returns the number of permits currently available on the channel's
    /// semaphore.
    pub fn capacity(&self) -> usize {
        let (permits, _) = self.chan.semaphore();
        permits.available_permits()
    }
}
impl<T> Clone for Sender<T> {
    /// Produces another `Sender` by cloning the underlying channel handle.
    fn clone(&self) -> Self {
        let chan = self.chan.clone();
        Sender { chan }
    }
}
impl<T> fmt::Debug for Sender<T> {
    /// Formats as `Sender { chan: .. }`, delegating to the channel handle's
    /// own `Debug` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Sender");
        builder.field("chan", &self.chan);
        builder.finish()
    }
}
impl<T> Permit<'_, T> {
    /// Sends `value` using the permit held by `self`.
    pub fn send(self, value: T) {
        self.chan.send(value);

        // The permit has been spent on the send above. Skipping `Drop` keeps
        // it from being added back to the semaphore.
        std::mem::forget(self);
    }
}
impl<T> Drop for Permit<'_, T> {
    /// Gives the unused permit back to the semaphore and, if the semaphore
    /// is then both closed and idle, wakes the receiver.
    fn drop(&mut self) {
        use chan::Semaphore;

        let sem = self.chan.semaphore();
        sem.add_permit();

        let closed_and_idle = sem.is_closed() && sem.is_idle();
        if closed_and_idle {
            self.chan.wake_rx();
        }
    }
}
impl<T> fmt::Debug for Permit<'_, T> {
    /// Formats as `Permit { chan: .. }`, delegating to the channel handle's
    /// own `Debug` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Permit");
        builder.field("chan", &self.chan);
        builder.finish()
    }
}
impl<T> OwnedPermit<T> {
    /// Sends `value` using the held permit and returns the [`Sender`] that
    /// was consumed to create this permit.
    pub fn send(mut self, value: T) -> Sender<T> {
        // Taking the handle leaves `None` behind, so the `Drop` impl that
        // runs when `self` goes out of scope will not return the permit.
        let chan = self.chan.take().unwrap_or_else(|| {
            unreachable!("OwnedPermit channel is only taken when the permit is moved")
        });
        chan.send(value);
        Sender { chan }
    }

    /// Gives the permit back to the semaphore without sending anything and
    /// returns the [`Sender`] that was consumed to create this permit.
    ///
    /// Mirrors the `Drop` implementations in this file: after the permit is
    /// returned, the receiver is woken if the semaphore is closed and idle.
    pub fn release(mut self) -> Sender<T> {
        use chan::Semaphore;

        // Taking the handle leaves `None` behind, so the `Drop` impl that
        // runs when `self` goes out of scope will not return the permit a
        // second time.
        let chan = self.chan.take().unwrap_or_else(|| {
            unreachable!("OwnedPermit channel is only taken when the permit is moved")
        });

        let semaphore = chan.semaphore();
        semaphore.add_permit();

        // Dropping a permit performs this wake; an explicit release returns
        // the permit the same way, so it has to perform it too. Otherwise a
        // receiver parked until the outstanding permits are returned would
        // not be notified by this release.
        if semaphore.is_closed() && semaphore.is_idle() {
            chan.wake_rx();
        }

        Sender { chan }
    }
}
impl<T> Drop for OwnedPermit<T> {
    /// Gives the permit back to the semaphore if it is still held, waking
    /// the receiver when the semaphore is then both closed and idle.
    fn drop(&mut self) {
        use chan::Semaphore;

        // `send` / `release` already took the handle: nothing left to return.
        let chan = match self.chan.take() {
            Some(chan) => chan,
            None => return,
        };

        let sem = chan.semaphore();
        sem.add_permit();

        let closed_and_idle = sem.is_closed() && sem.is_idle();
        if closed_and_idle {
            chan.wake_rx();
        }
    }
}
impl<T> fmt::Debug for OwnedPermit<T> {
    /// Formats as `OwnedPermit { chan: .. }`, delegating to the `Debug`
    /// output of the optional channel handle.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("OwnedPermit");
        builder.field("chan", &self.chan);
        builder.finish()
    }
}