use core::{
fmt, mem,
task::{Context, Poll},
};
use crate::{
alloc::{collections::VecDeque, rc::Rc},
error::{SendError, TryRecvError, TrySendError},
mask::{COUNTED, UNCOUNTED},
queue::BoundedQueue,
};
pub const fn channel<T>(capacity: usize) -> Channel<T> {
assert!(capacity > 0, "channel capacity must be at least 1");
Channel { queue: BoundedQueue::new(capacity) }
}
pub fn channel_from_iter<T>(min_capacity: usize, iter: impl IntoIterator<Item = T>) -> Channel<T> {
Channel::from_iter(min_capacity, iter)
}
pub struct Channel<T> {
queue: BoundedQueue<T>,
}
impl<T> Channel<T> {
pub fn with_initial_capacity(capacity: usize, initial: usize) -> Self {
Self { queue: BoundedQueue::with_capacity(capacity, initial) }
}
pub fn from_iter(capacity: usize, iter: impl IntoIterator<Item = T>) -> Self {
Self { queue: BoundedQueue::from_iter(capacity, iter) }
}
pub fn split(&mut self) -> (SenderRef<'_, T>, ReceiverRef<'_, T>) {
self.queue.0.get_mut().set_counted();
(SenderRef { queue: &self.queue }, ReceiverRef { queue: &self.queue })
}
pub fn into_split(mut self) -> (Sender<T>, Receiver<T>) {
self.queue.0.get_mut().set_counted();
let queue = Rc::new(self.queue);
(Sender { queue: Rc::clone(&queue) }, Receiver { queue })
}
pub fn into_deque(self) -> VecDeque<T> {
self.queue.into_deque()
}
pub fn len(&self) -> usize {
self.queue.len()
}
pub fn max_capacity(&self) -> usize {
self.queue.max_capacity()
}
pub fn capacity(&self) -> usize {
self.queue.capacity()
}
pub fn close(&self) {
self.queue.close::<UNCOUNTED>();
}
pub fn is_closed(&self) -> bool {
self.queue.is_closed::<UNCOUNTED>()
}
pub fn is_empty(&self) -> bool {
self.queue.len() == 0
}
pub fn try_recv(&self) -> Result<T, TryRecvError> {
self.queue.try_recv::<UNCOUNTED>()
}
pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.queue.poll_recv::<UNCOUNTED>(cx)
}
pub async fn recv(&self) -> Option<T> {
self.queue.recv::<UNCOUNTED>().await
}
pub fn try_send(&self, elem: T) -> Result<(), TrySendError<T>> {
self.queue.try_send::<UNCOUNTED>(elem)
}
pub async fn send(&self, elem: T) -> Result<(), SendError<T>> {
self.queue.send::<UNCOUNTED>(elem).await
}
pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
self.queue.try_reserve::<COUNTED>()?;
Ok(Permit { queue: &self.queue })
}
pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
self.queue.reserve::<UNCOUNTED>().await?;
Ok(Permit { queue: &self.queue })
}
}
impl<T> fmt::Debug for Channel<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Channel")
.field("capacity", &self.capacity())
.field("max_capacity", &self.max_capacity())
.field("is_closed", &self.is_closed())
.finish()
}
}
pub struct Sender<T> {
queue: Rc<BoundedQueue<T>>,
}
impl<T> Sender<T> {
pub fn len(&self) -> usize {
self.queue.len()
}
pub fn max_capacity(&self) -> usize {
self.queue.max_capacity()
}
pub fn capacity(&self) -> usize {
self.queue.capacity()
}
pub fn is_closed(&self) -> bool {
self.queue.is_closed::<COUNTED>()
}
pub fn is_empty(&self) -> bool {
self.queue.len() == 0
}
pub fn same_channel(&self, other: &Self) -> bool {
core::ptr::eq(Rc::as_ptr(&self.queue), Rc::as_ptr(&other.queue))
}
pub fn try_send(&self, elem: T) -> Result<(), TrySendError<T>> {
self.queue.try_send::<COUNTED>(elem)
}
pub async fn send(&self, elem: T) -> Result<(), SendError<T>> {
self.queue.send::<COUNTED>(elem).await
}
pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
self.queue.try_reserve::<COUNTED>()?;
Ok(Permit { queue: &self.queue })
}
pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
if let Err(err) = self.queue.try_reserve::<COUNTED>() {
return Err(err.set(self));
}
Ok(OwnedPermit { sender: Some(self) })
}
pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
self.queue.reserve::<COUNTED>().await?;
Ok(Permit { queue: &self.queue })
}
pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<Self>> {
if self.queue.reserve::<COUNTED>().await.is_err() {
return Err(SendError(self));
}
Ok(OwnedPermit { sender: Some(self) })
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
unsafe { (*self.queue.0.get()).mask.increase_sender_count() };
Self { queue: Rc::clone(&self.queue) }
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
unsafe { (*self.queue.0.get()).decrease_sender_count() };
}
}
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Sender")
.field("capacity", &self.capacity())
.field("max_capacity", &self.max_capacity())
.field("is_closed", &self.is_closed())
.finish()
}
}
pub struct SenderRef<'a, T> {
queue: &'a BoundedQueue<T>,
}
impl<T> SenderRef<'_, T> {
pub fn len(&self) -> usize {
self.queue.len()
}
pub fn max_capacity(&self) -> usize {
self.queue.max_capacity()
}
pub fn capacity(&self) -> usize {
self.queue.capacity()
}
pub fn is_closed(&self) -> bool {
self.queue.is_closed::<COUNTED>()
}
pub fn is_empty(&self) -> bool {
self.queue.len() == 0
}
pub fn same_channel(&self, other: &Self) -> bool {
core::ptr::eq(&self.queue, &other.queue)
}
pub fn try_send(&self, elem: T) -> Result<(), TrySendError<T>> {
self.queue.try_send::<COUNTED>(elem)
}
pub async fn send(&self, elem: T) -> Result<(), SendError<T>> {
self.queue.send::<COUNTED>(elem).await
}
pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
self.queue.try_reserve::<COUNTED>()?;
Ok(Permit { queue: self.queue })
}
pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
self.queue.reserve::<COUNTED>().await?;
Ok(Permit { queue: self.queue })
}
}
impl<T> Clone for SenderRef<'_, T> {
fn clone(&self) -> Self {
unsafe { (*self.queue.0.get()).mask.increase_sender_count() };
Self { queue: self.queue }
}
}
impl<T> Drop for SenderRef<'_, T> {
fn drop(&mut self) {
unsafe { (*self.queue.0.get()).decrease_sender_count() };
}
}
impl<T> fmt::Debug for SenderRef<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SenderRef")
.field("capacity", &self.capacity())
.field("max_capacity", &self.max_capacity())
.field("is_closed", &self.is_closed())
.finish()
}
}
pub struct Receiver<T> {
queue: Rc<BoundedQueue<T>>,
}
impl<T> Receiver<T> {
pub fn close(&mut self) {
self.queue.close::<COUNTED>();
}
pub fn len(&self) -> usize {
self.queue.len()
}
pub fn max_capacity(&self) -> usize {
self.queue.max_capacity()
}
pub fn capacity(&self) -> usize {
self.queue.capacity()
}
pub fn is_closed(&self) -> bool {
self.queue.is_closed::<COUNTED>()
}
pub fn is_empty(&self) -> bool {
self.queue.len() == 0
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.queue.try_recv::<COUNTED>()
}
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.queue.poll_recv::<COUNTED>(cx)
}
pub async fn recv(&mut self) -> Option<T> {
self.queue.recv::<COUNTED>().await
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.queue.close::<COUNTED>();
}
}
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Receiver")
.field("capacity", &self.capacity())
.field("max_capacity", &self.max_capacity())
.field("is_closed", &self.is_closed())
.finish()
}
}
pub struct ReceiverRef<'a, T> {
queue: &'a BoundedQueue<T>,
}
impl<T> ReceiverRef<'_, T> {
pub fn close(&mut self) {
self.queue.close::<COUNTED>();
}
pub fn len(&self) -> usize {
self.queue.len()
}
pub fn max_capacity(&self) -> usize {
self.queue.max_capacity()
}
pub fn capacity(&self) -> usize {
self.queue.capacity()
}
pub fn is_closed(&self) -> bool {
self.queue.is_closed::<COUNTED>()
}
pub fn is_empty(&self) -> bool {
self.queue.len() == 0
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.queue.try_recv::<COUNTED>()
}
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.queue.poll_recv::<COUNTED>(cx)
}
pub async fn recv(&mut self) -> Option<T> {
self.queue.recv::<COUNTED>().await
}
}
impl<T> Drop for ReceiverRef<'_, T> {
fn drop(&mut self) {
self.queue.close::<COUNTED>();
}
}
impl<T> fmt::Debug for ReceiverRef<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReceiverRef")
.field("capacity", &self.capacity())
.field("max_capacity", &self.max_capacity())
.field("is_closed", &self.is_closed())
.finish()
}
}
pub struct Permit<'a, T> {
queue: &'a BoundedQueue<T>,
}
impl<T> Permit<'_, T> {
pub fn send(self, elem: T) {
self.queue.unbounded_send(elem);
self.queue.unreserve(true);
mem::forget(self);
}
}
impl<T> Drop for Permit<'_, T> {
fn drop(&mut self) {
self.queue.unreserve(false);
}
}
impl<T> fmt::Debug for Permit<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Permit").finish_non_exhaustive()
}
}
pub struct OwnedPermit<T> {
sender: Option<Sender<T>>,
}
impl<T> OwnedPermit<T> {
pub fn send(mut self, elem: T) -> Sender<T> {
let sender = self.sender.take().unwrap_or_else(|| unreachable!());
sender.queue.unbounded_send(elem);
sender.queue.unreserve(true);
sender
}
}
impl<T> Drop for OwnedPermit<T> {
fn drop(&mut self) {
if let Some(sender) = self.sender.take() {
sender.queue.unreserve(false);
}
}
}
impl<T> fmt::Debug for OwnedPermit<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("OwnedPermit").finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use core::{future::Future as _, task::Poll};
use futures_lite::future;
use crate::queue::RecvFuture;
#[test]
#[should_panic]
fn channel_panic() {
let _ = super::channel::<i32>(0);
}
#[test]
fn recv_split() {
future::block_on(async {
let mut chan = super::channel::<i32>(4);
let (tx, mut rx) = chan.split();
assert_eq!(tx.capacity(), 4);
for i in 0..4 {
assert!(tx.send(i).await.is_ok());
assert_eq!(tx.capacity(), 4 - i as usize - 1);
}
assert_eq!(rx.recv().await, Some(0));
assert_eq!(tx.capacity(), 1);
assert_eq!(rx.recv().await, Some(1));
assert_eq!(tx.capacity(), 2);
assert_eq!(rx.recv().await, Some(2));
assert_eq!(tx.capacity(), 3);
assert_eq!(rx.recv().await, Some(3));
assert_eq!(tx.capacity(), 4);
assert!(rx.try_recv().is_err());
drop(rx);
assert!(tx.send(0).await.is_err());
});
}
#[test]
fn poll_often() {
future::block_on(async {
let mut chan = super::channel::<i32>(4);
let (tx, rx) = chan.split();
for i in 0..4 {
assert!(tx.send(i).await.is_ok());
}
let queue = &rx.queue.0;
let fut = RecvFuture::<'_, _, _, true> { queue };
futures_lite::pin!(fut);
assert_eq!((&mut fut).await, Some(0));
assert_eq!((&mut fut).await, Some(1));
assert_eq!((&mut fut).await, Some(2));
assert_eq!((&mut fut).await, Some(3));
});
}
#[test]
fn cancel_recv() {
future::block_on(async {
let mut chan = super::channel::<i32>(1);
let (tx, mut rx) = chan.split();
assert_eq!(tx.capacity(), 1);
let mut r1 = Box::pin(rx.recv());
core::future::poll_fn(|cx| {
assert!(r1.as_mut().poll(cx).is_pending());
Poll::Ready(())
})
.await;
tx.send(0).await.unwrap();
assert_eq!(tx.capacity(), 0);
drop(r1);
assert_eq!(tx.capacity(), 0);
assert_eq!(rx.recv().await, Some(0));
});
}
#[test]
fn cancel_send() {
future::block_on(async {
let mut chan = super::channel::<i32>(1);
let (tx, mut rx) = chan.split();
assert_eq!(tx.capacity(), 1);
tx.send(0).await.unwrap();
assert_eq!(tx.capacity(), 0);
let mut s1 = Box::pin(tx.send(1));
let mut s2 = Box::pin(tx.send(2));
core::future::poll_fn(|cx| {
assert!(s1.as_mut().poll(cx).is_pending());
assert!(s2.as_mut().poll(cx).is_pending());
Poll::Ready(())
})
.await;
assert_eq!(rx.recv().await, Some(0));
drop(s1);
assert_eq!(tx.capacity(), 0);
core::future::poll_fn(|cx| {
assert!(s2.as_mut().poll(cx).is_ready());
assert_eq!(tx.capacity(), 0);
Poll::Ready(())
})
.await;
assert_eq!(rx.recv().await, Some(2));
assert_eq!(tx.capacity(), 1);
tx.send(1).await.unwrap();
assert_eq!(rx.recv().await, Some(1));
});
}
#[test]
fn poll_out_of_order() {
future::block_on(async {
let mut chan = super::channel::<i32>(1);
let (tx, mut rx) = chan.split();
assert_eq!(tx.capacity(), 1);
tx.send(0).await.unwrap();
assert_eq!(tx.capacity(), 0);
let s1 = tx.send(1);
let s2 = tx.send(2);
futures_lite::pin!(s1, s2);
core::future::poll_fn(|cx| {
assert!(s1.as_mut().poll(cx).is_pending());
assert!(s2.as_mut().poll(cx).is_pending());
assert_eq!(tx.capacity(), 0);
Poll::Ready(())
})
.await;
assert_eq!(rx.recv().await, Some(0));
assert_eq!(tx.capacity(), 0);
core::future::poll_fn(|cx| {
assert!(s2.as_mut().poll(cx).is_pending());
assert_eq!(s1.as_mut().poll(cx), Poll::Ready(Ok(())));
assert!(s2.as_mut().poll(cx).is_pending());
Poll::Ready(())
})
.await;
assert_eq!(rx.recv().await, Some(1));
assert!(s2.await.is_ok());
assert_eq!(rx.recv().await, Some(2));
});
}
#[test]
fn poll_out_of_order_drop() {
future::block_on(async {
let mut chan = super::channel::<i32>(1);
let (tx, mut rx) = chan.split();
tx.send(0).await.unwrap();
let mut s1 = Box::pin(tx.send(1));
let mut s2 = Box::pin(tx.send(2));
core::future::poll_fn(|cx| {
assert!(s1.as_mut().poll(cx).is_pending());
assert!(s2.as_mut().poll(cx).is_pending());
Poll::Ready(())
})
.await;
assert_eq!(rx.recv().await, Some(0));
drop(s1);
core::future::poll_fn(|cx| {
assert_eq!(s2.as_mut().poll(cx), Poll::Ready(Ok(())));
Poll::Ready(())
})
.await;
assert_eq!(rx.try_recv(), Ok(2));
});
}
#[test]
fn full() {
let chan = super::channel::<i32>(1);
assert!(chan.try_send(0).is_ok());
assert!(chan.try_send(1).is_err());
assert_eq!(chan.try_recv(), Ok(0));
assert!(chan.try_send(1).is_ok());
assert_eq!(chan.try_recv(), Ok(1));
}
#[test]
fn reserve_and_close() {
future::block_on(async {
let mut chan = super::channel::<i32>(1);
let (tx, mut rx) = chan.split();
let permit = tx.reserve().await.unwrap();
assert_eq!(tx.capacity(), 0);
assert_eq!(tx.max_capacity(), 1);
rx.close();
assert!(tx.reserve().await.is_err());
core::future::poll_fn(|cx| {
assert!(rx.poll_recv(cx).is_pending());
Poll::Ready(())
})
.await;
assert!(tx.send(1).await.is_err());
permit.send(1);
assert_eq!(tx.capacity(), 0);
assert_eq!(rx.recv().await, Some(1));
assert_eq!(tx.capacity(), 1);
assert_eq!(rx.recv().await, None);
});
}
#[test]
fn reserve_and_cancel() {
future::block_on(async {
let mut chan = super::channel::<i32>(1);
let (tx, mut rx) = chan.split();
let permit = tx.reserve().await.unwrap();
assert_eq!(tx.capacity(), 0);
assert_eq!(tx.max_capacity(), 1);
let mut fut = Box::pin(tx.reserve());
core::future::poll_fn(|cx| {
assert!(fut.as_mut().poll(cx).is_pending());
Poll::Ready(())
})
.await;
drop(permit);
let permit = fut.await.unwrap();
rx.close();
assert!(tx.reserve().await.is_err());
assert!(tx.send(1).await.is_err());
permit.send(1);
assert_eq!(rx.recv().await, Some(1));
assert_eq!(rx.recv().await, None);
});
}
#[test]
fn reserve_and_drop_permit() {
future::block_on(async {
let mut chan = super::channel::<i32>(1);
let (tx, mut rx) = chan.split();
let permit = tx.reserve().await.unwrap();
assert_eq!(tx.capacity(), 0);
assert_eq!(tx.max_capacity(), 1);
rx.close();
core::future::poll_fn(|cx| {
assert!(rx.poll_recv(cx).is_pending());
Poll::Ready(())
})
.await;
drop(permit);
assert_eq!(tx.capacity(), 1);
assert_eq!(rx.recv().await, None);
});
}
#[test]
fn diverting_len_and_capacity() {
future::block_on(async {
let mut chan = super::channel(5);
let (tx, mut rx) = chan.split();
tx.send(1).await.unwrap();
let permit1 = tx.reserve().await.unwrap();
assert_eq!(tx.len() + tx.capacity(), 4);
let permit2 = tx.reserve().await.unwrap();
assert_eq!(tx.len() + tx.capacity(), 3);
permit1.send(2);
permit2.send(3);
assert_eq!(tx.len() + tx.capacity(), 5);
assert_eq!(rx.recv().await, Some(1));
assert_eq!(rx.recv().await, Some(2));
assert_eq!(rx.recv().await, Some(3));
});
}
#[test]
fn split_after_close() {
let mut chan = super::channel::<i32>(1);
chan.close();
let (tx, rx) = chan.split();
assert!(tx.is_closed());
assert!(rx.is_closed());
}
#[test]
fn from_iter_less() {
let chan = super::channel_from_iter(0, &[0, 1, 2, 3]);
assert_eq!(chan.capacity(), 0);
}
#[test]
fn from_iter_more() {
future::block_on(async {
let chan = super::Channel::from_iter(5, [0, 1, 2, 3]);
assert_eq!(chan.recv().await, Some(0));
assert_eq!(chan.recv().await, Some(1));
assert_eq!(chan.recv().await, Some(2));
assert_eq!(chan.recv().await, Some(3));
assert_eq!(chan.capacity(), 5);
});
}
#[test]
fn send_vs_reserve() {
future::block_on(async {
let mut chan = super::channel::<i32>(1);
let (tx, mut rx) = chan.split();
assert!(tx.send(-1).await.is_ok());
let mut f1 = Box::pin(tx.send(-2));
let mut f2 = Box::pin(tx.send(-3));
let mut f3 = Box::pin(tx.reserve());
core::future::poll_fn(|cx| {
assert!(f1.as_mut().poll(cx).is_pending());
assert!(f2.as_mut().poll(cx).is_pending());
assert!(f3.as_mut().poll(cx).is_pending());
Poll::Ready(())
})
.await;
assert_eq!(rx.recv().await, Some(-1));
assert_eq!(tx.capacity(), 0, "capacity goes to f1");
assert!(f1.await.is_ok());
assert_eq!(tx.capacity(), 0);
assert_eq!(rx.recv().await, Some(-2));
assert_eq!(tx.capacity(), 0, "capacity goes to f3");
drop(f2);
assert_eq!(tx.capacity(), 0, "capacity goes to f3");
f3.await.unwrap().send(-4);
assert_eq!(tx.capacity(), 0);
assert_eq!(rx.recv().await, Some(-4));
assert_eq!(tx.capacity(), 1);
});
}
}