use core::{
fmt,
task::{Context, Poll},
};
use crate::{
alloc::{collections::VecDeque, rc::Rc},
error::{SendError, TryRecvError},
mask::{COUNTED, UNCOUNTED},
queue::UnboundedQueue,
};
/// Creates a new unbounded channel backed by a freshly constructed queue.
///
/// Being a `const fn`, this can also be used to initialise a channel in a
/// constant context.
pub const fn channel<T>() -> UnboundedChannel<T> {
    UnboundedChannel::<T> { queue: UnboundedQueue::<T>::new() }
}
pub fn channel_from_iter<T>(iter: impl IntoIterator<Item = T>) -> UnboundedChannel<T> {
UnboundedChannel::from_iter(iter)
}
/// An unbounded channel.
///
/// The channel can be used directly (it offers both `send` and the `recv`
/// family of methods) or be split into separate sender and receiver halves
/// via `split` (borrowing) or `into_split` (owning, `Rc`-based).
pub struct UnboundedChannel<T> {
// The queue that stores the channel's elements and its state.
queue: UnboundedQueue<T>,
}
impl<T> FromIterator<T> for UnboundedChannel<T> {
    /// Builds a channel whose queue is constructed from `iter`.
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = T>,
    {
        let queue = UnboundedQueue::from_iter(iter);
        Self { queue }
    }
}
impl<T> UnboundedChannel<T> {
    /// Creates a new channel whose queue is constructed with a capacity of
    /// `initial`.
    pub fn with_initial_capacity(initial: usize) -> Self {
        let queue = UnboundedQueue::with_capacity(initial);
        Self { queue }
    }

    /// Splits the channel into a borrowing sender and receiver pair.
    ///
    /// Both halves refer to the same queue and borrow the channel for as long
    /// as they are alive. Before handing them out, the queue is switched to
    /// its "counted" mode via `set_counted`.
    pub fn split(&mut self) -> (UnboundedSenderRef<'_, T>, UnboundedReceiverRef<'_, T>) {
        // `&mut self` grants exclusive access, so the safe `get_mut` suffices.
        self.queue.0.get_mut().set_counted();

        let queue = &self.queue;
        (UnboundedSenderRef { queue }, UnboundedReceiverRef { queue })
    }

    /// Consumes the channel and splits it into an owning sender and receiver
    /// pair that share the queue through an `Rc`.
    ///
    /// Before the queue is moved into the `Rc`, it is switched to its
    /// "counted" mode via `set_counted`.
    pub fn into_split(mut self) -> (UnboundedSender<T>, UnboundedReceiver<T>) {
        self.queue.0.get_mut().set_counted();

        let shared = Rc::new(self.queue);
        let sender = UnboundedSender { queue: Rc::clone(&shared) };
        let receiver = UnboundedReceiver { queue: shared };
        (sender, receiver)
    }

    /// Converts the channel into a `VecDeque` by delegating to the queue's
    /// `into_deque`.
    pub fn into_deque(self) -> VecDeque<T> {
        self.queue.into_deque()
    }

    /// Returns the length reported by the underlying queue.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Closes the channel.
    ///
    /// Subsequent sends fail, while elements that are already queued can
    /// still be received.
    #[cold]
    pub fn close(&self) {
        self.queue.close::<UNCOUNTED>();
    }

    /// Returns `true` if the underlying queue reports the channel as closed.
    pub fn is_closed(&self) -> bool {
        self.queue.is_closed::<UNCOUNTED>()
    }

    /// Returns `true` if the underlying queue currently has length zero.
    pub fn is_empty(&self) -> bool {
        self.queue.len() == 0
    }

    /// Attempts to receive an element without waiting.
    ///
    /// # Errors
    ///
    /// Returns a [`TryRecvError`] if no element could be received, e.g.
    /// because the queue holds no element or the channel is closed and has
    /// been drained.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.queue.try_recv::<UNCOUNTED>()
    }

    /// Polls the channel for the next element.
    ///
    /// Returns `Poll::Pending` while no element is available on an open
    /// channel; the task context `cx` is handed to the queue.
    pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        self.queue.poll_recv::<UNCOUNTED>(cx)
    }

    /// Receives the next element.
    ///
    /// Resolves to `None` once the channel is closed and no queued element is
    /// left.
    pub async fn recv(&self) -> Option<T> {
        let next = self.queue.recv::<UNCOUNTED>();
        next.await
    }

    /// Sends `elem` through the channel.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] if the element could not be enqueued, e.g.
    /// because the channel has been closed.
    pub fn send(&self, elem: T) -> Result<(), SendError<T>> {
        self.queue.send::<UNCOUNTED>(elem)
    }
}
impl<T> fmt::Debug for UnboundedChannel<T> {
    /// Formats the channel as a struct showing its `len` and `is_closed`
    /// state; the queued elements themselves are not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (len, is_closed) = (self.len(), self.is_closed());

        let mut out = f.debug_struct("UnboundedChannel");
        out.field("len", &len);
        out.field("is_closed", &is_closed);
        out.finish()
    }
}
/// An owning handle for sending elements through an unbounded channel.
///
/// The handle shares the channel's queue through an `Rc`.
pub struct UnboundedSender<T> {
// The queue shared with the receiver and all other senders.
queue: Rc<UnboundedQueue<T>>,
}
impl<T> UnboundedSender<T> {
    /// Returns the length reported by the underlying queue.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Returns `true` if the underlying queue reports the channel as closed.
    pub fn is_closed(&self) -> bool {
        self.queue.is_closed::<COUNTED>()
    }

    /// Returns `true` if the underlying queue currently has length zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if `self` and `other` share the same queue allocation,
    /// i.e. are handles for the same channel.
    pub fn same_channel(&self, other: &Self) -> bool {
        Rc::ptr_eq(&self.queue, &other.queue)
    }

    /// Sends `elem` through the channel.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] if the element could not be enqueued.
    pub fn send(&self, elem: T) -> Result<(), SendError<T>> {
        self.queue.send::<COUNTED>(elem)
    }
}
impl<T> Clone for UnboundedSender<T> {
    /// Creates another sender for the same channel and registers it by
    /// increasing the sender count kept in the queue state.
    fn clone(&self) -> Self {
        // SAFETY: NOTE(review) - presumably sound because the handle is
        // `Rc`-based (single-threaded) and no other reference into the cell
        // is alive during this call; confirm against the queue type.
        unsafe {
            let state = self.queue.0.get();
            (*state).mask.increase_sender_count();
        }

        let queue = Rc::clone(&self.queue);
        Self { queue }
    }
}
impl<T> Drop for UnboundedSender<T> {
    /// Unregisters this sender by calling `decrease_sender_count` on the
    /// shared queue state.
    fn drop(&mut self) {
        // SAFETY: NOTE(review) - presumably sound because the handle is
        // `Rc`-based (single-threaded) and no other reference into the cell
        // is alive during this call; confirm against the queue type.
        unsafe {
            let state = self.queue.0.get();
            (*state).decrease_sender_count();
        }
    }
}
impl<T> fmt::Debug for UnboundedSender<T> {
    /// Formats the sender as a struct showing the channel's `len` and
    /// `is_closed` state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (len, is_closed) = (self.len(), self.is_closed());

        let mut out = f.debug_struct("UnboundedSender");
        out.field("len", &len);
        out.field("is_closed", &is_closed);
        out.finish()
    }
}
/// A borrowing handle for sending elements through an unbounded channel.
///
/// The handle holds a plain reference to the channel's queue and is therefore
/// tied to the lifetime `'a` of that borrow.
pub struct UnboundedSenderRef<'a, T> {
// The borrowed queue of the channel this handle was split from.
queue: &'a UnboundedQueue<T>,
}
impl<T> UnboundedSenderRef<'_, T> {
    /// Returns the length reported by the underlying queue.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Returns `true` if the underlying queue reports the channel as closed.
    pub fn is_closed(&self) -> bool {
        self.queue.is_closed::<COUNTED>()
    }

    /// Returns `true` if the underlying queue currently has length zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if `self` and `other` are handles for the same channel
    /// instance, i.e. both refer to the very same queue.
    pub fn same_channel(&self, other: &Self) -> bool {
        // Compare the addresses of the *referenced queues*. Passing
        // `&self.queue` / `&other.queue` would compare the addresses of the
        // reference fields inside the two handles instead, which differ for
        // any two distinct handles even when they belong to the same channel.
        core::ptr::eq(self.queue, other.queue)
    }

    /// Sends `elem` through the channel.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] if the element could not be enqueued, e.g.
    /// because the receiver half has been closed or dropped.
    pub fn send(&self, elem: T) -> Result<(), SendError<T>> {
        self.queue.send::<COUNTED>(elem)
    }
}
impl<T> Clone for UnboundedSenderRef<'_, T> {
    /// Creates another sender for the same channel and registers it by
    /// increasing the sender count kept in the queue state.
    fn clone(&self) -> Self {
        // SAFETY: NOTE(review) - presumably sound because no other reference
        // into the cell is alive during this call and the queue is not shared
        // across threads; confirm against the queue type.
        unsafe {
            let state = self.queue.0.get();
            (*state).mask.increase_sender_count();
        }

        let queue = self.queue;
        Self { queue }
    }
}
impl<T> Drop for UnboundedSenderRef<'_, T> {
    /// Unregisters this sender by calling `decrease_sender_count` on the
    /// queue state.
    fn drop(&mut self) {
        // SAFETY: NOTE(review) - presumably sound because no other reference
        // into the cell is alive during this call and the queue is not shared
        // across threads; confirm against the queue type.
        unsafe {
            let state = self.queue.0.get();
            (*state).decrease_sender_count();
        }
    }
}
impl<T> fmt::Debug for UnboundedSenderRef<'_, T> {
    /// Formats the sender as a struct showing the channel's `len` and
    /// `is_closed` state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (len, is_closed) = (self.len(), self.is_closed());

        let mut out = f.debug_struct("UnboundedSenderRef");
        out.field("len", &len);
        out.field("is_closed", &is_closed);
        out.finish()
    }
}
/// An owning handle for receiving elements through an unbounded channel.
///
/// The handle shares the channel's queue through an `Rc`.
pub struct UnboundedReceiver<T> {
// The queue shared with all senders of the channel.
queue: Rc<UnboundedQueue<T>>,
}
impl<T> UnboundedReceiver<T> {
    /// Closes the channel by closing the shared queue.
    #[cold]
    pub fn close(&mut self) {
        self.queue.close::<COUNTED>();
    }

    /// Returns the length reported by the underlying queue.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Returns `true` if the underlying queue reports the channel as closed.
    pub fn is_closed(&self) -> bool {
        self.queue.is_closed::<COUNTED>()
    }

    /// Returns `true` if the underlying queue currently has length zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Attempts to receive an element without waiting.
    ///
    /// # Errors
    ///
    /// Returns a [`TryRecvError`] if no element could be received.
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.queue.try_recv::<COUNTED>()
    }

    /// Polls the channel for the next element, handing the task context `cx`
    /// to the queue.
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        self.queue.poll_recv::<COUNTED>(cx)
    }

    /// Receives the next element by awaiting the queue's `recv` future.
    pub async fn recv(&mut self) -> Option<T> {
        let next = self.queue.recv::<COUNTED>();
        next.await
    }
}
impl<T> Drop for UnboundedReceiver<T> {
fn drop(&mut self) {
self.queue.close::<COUNTED>();
}
}
impl<T> fmt::Debug for UnboundedReceiver<T> {
    /// Formats the receiver as a struct showing the channel's `len` and
    /// `is_closed` state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (len, is_closed) = (self.len(), self.is_closed());

        let mut out = f.debug_struct("UnboundedReceiver");
        out.field("len", &len);
        out.field("is_closed", &is_closed);
        out.finish()
    }
}
/// A borrowing handle for receiving elements through an unbounded channel.
///
/// The handle holds a plain reference to the channel's queue and is therefore
/// tied to the lifetime `'a` of that borrow.
pub struct UnboundedReceiverRef<'a, T> {
// The borrowed queue of the channel this handle was split from.
queue: &'a UnboundedQueue<T>,
}
impl<T> UnboundedReceiverRef<'_, T> {
    /// Closes the channel.
    ///
    /// Subsequent sends fail, while elements that are already queued can
    /// still be received.
    #[cold]
    pub fn close(&mut self) {
        self.queue.close::<COUNTED>();
    }

    /// Returns the length reported by the underlying queue.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Returns `true` if the underlying queue reports the channel as closed.
    pub fn is_closed(&self) -> bool {
        self.queue.is_closed::<COUNTED>()
    }

    /// Returns `true` if the underlying queue currently has length zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Attempts to receive an element without waiting.
    ///
    /// # Errors
    ///
    /// Returns a [`TryRecvError`] if no element could be received, e.g.
    /// because the queue holds no element or all senders are gone and the
    /// queue has been drained.
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.queue.try_recv::<COUNTED>()
    }

    /// Polls the channel for the next element, handing the task context `cx`
    /// to the queue.
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        self.queue.poll_recv::<COUNTED>(cx)
    }

    /// Receives the next element.
    ///
    /// Resolves to `None` once the channel is closed and no queued element is
    /// left.
    pub async fn recv(&mut self) -> Option<T> {
        let next = self.queue.recv::<COUNTED>();
        next.await
    }
}
impl<T> Drop for UnboundedReceiverRef<'_, T> {
fn drop(&mut self) {
self.queue.close::<COUNTED>();
}
}
impl<T> fmt::Debug for UnboundedReceiverRef<'_, T> {
    /// Formats the receiver as a struct showing the channel's `len` and
    /// `is_closed` state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (len, is_closed) = (self.len(), self.is_closed());

        let mut out = f.debug_struct("UnboundedReceiverRef");
        out.field("len", &len);
        out.field("is_closed", &is_closed);
        out.finish()
    }
}
// Unit tests for the unbounded channel, covering both the unsplit channel and
// the borrowing sender/receiver halves returned by `split`.
#[cfg(test)]
mod tests {
use core::{future::Future as _, task::Poll};
// A fresh channel has nothing to receive; a sent element is received as-is.
#[test]
fn try_recv() {
let chan = super::channel::<i32>();
assert!(chan.try_recv().is_err());
chan.send(-1).unwrap();
assert_eq!(chan.try_recv(), Ok(-1));
}
// Same as `try_recv`, but going through the split halves.
#[test]
fn try_recv_split() {
let mut chan = super::channel::<i32>();
let (tx, mut rx) = chan.split();
assert!(rx.try_recv().is_err());
tx.send(-1).unwrap();
assert_eq!(rx.try_recv(), Ok(-1));
}
// Closing keeps already queued elements receivable; afterwards `try_recv`
// reports the channel as disconnected.
#[test]
fn try_recv_closed() {
let chan = super::channel::<i32>();
assert!(chan.try_recv().is_err());
chan.send(-1).unwrap();
chan.close();
assert_eq!(chan.try_recv(), Ok(-1));
assert!(chan.try_recv().unwrap_err().is_disconnected());
}
// Dropping the only sender still lets the receiver drain the queue before it
// observes the disconnect.
#[test]
fn try_recv_closed_split() {
let mut chan = super::channel::<i32>();
let (tx, mut rx) = chan.split();
assert!(rx.try_recv().is_err());
tx.send(-1).unwrap();
drop(tx);
assert_eq!(rx.try_recv(), Ok(-1));
assert!(rx.try_recv().unwrap_err().is_disconnected());
}
// Elements are received in the order in which they were sent.
#[test]
fn send_split() {
let mut chan = super::channel::<i32>();
let (tx, mut rx) = chan.split();
for i in 0..4 {
let _ = tx.send(i);
}
assert_eq!(rx.try_recv(), Ok(0));
assert_eq!(rx.try_recv(), Ok(1));
assert_eq!(rx.try_recv(), Ok(2));
assert_eq!(rx.try_recv(), Ok(3));
}
// Sending on a closed channel fails.
#[test]
fn send_closed() {
let chan = super::channel::<i32>();
chan.close();
assert!(chan.send(-1).is_err());
}
// The receiver half is matched by `_` and hence never bound: it is dropped at
// the end of the `let` statement, which closes the channel before the send.
#[test]
fn send_closed_split() {
let mut chan = super::channel::<i32>();
let (tx, _) = chan.split();
assert!(tx.send(-1).is_err());
}
// `recv` yields sent elements one by one; after `close`, sending fails.
#[test]
fn recv() {
futures_lite::future::block_on(async {
let chan = super::channel::<i32>();
chan.send(-1).unwrap();
assert_eq!(chan.recv().await, Some(-1));
chan.send(-2).unwrap();
assert_eq!(chan.recv().await, Some(-2));
chan.close();
chan.send(-3).unwrap_err();
});
}
// Same as `recv`, but the channel gets closed by dropping the receiver half.
#[test]
fn recv_split() {
futures_lite::future::block_on(async {
let mut chan = super::channel::<i32>();
let (tx, mut rx) = chan.split();
tx.send(-1).unwrap();
assert_eq!(rx.recv().await, Some(-1));
tx.send(-2).unwrap();
assert_eq!(rx.recv().await, Some(-2));
drop(rx);
tx.send(-3).unwrap_err();
});
}
// After the receiver closes the channel, sends fail immediately, but all
// previously queued elements are still delivered before `recv` yields `None`.
#[test]
fn recv_closed_split() {
futures_lite::future::block_on(async {
let mut chan = super::channel::<i32>();
let (tx, mut rx) = chan.split();
tx.send(-1).unwrap();
tx.send(-2).unwrap();
tx.send(-3).unwrap();
rx.close();
assert!(tx.send(-4).is_err());
assert_eq!(rx.recv().await, Some(-1));
assert_eq!(rx.recv().await, Some(-2));
assert_eq!(rx.recv().await, Some(-3));
assert_eq!(rx.recv().await, None);
assert!(tx.send(-4).is_err());
});
}
// Polling an empty, open channel is pending (also when repeated); once an
// element has been sent, the next poll is ready with that element.
#[test]
fn poll_recv() {
futures_lite::future::block_on(async {
let chan = super::channel::<i32>();
core::future::poll_fn(|cx| {
assert!(chan.poll_recv(cx).is_pending());
assert!(chan.poll_recv(cx).is_pending());
chan.send(1).unwrap();
assert_eq!(chan.poll_recv(cx), Poll::Ready(Some(1)));
Poll::Ready(())
})
.await;
});
}
// Two `recv` futures are pending at the same time; after a send, polling the
// second one yields the element, and the channel remains usable afterwards.
#[test]
fn multiple_recv() {
futures_lite::future::block_on(async {
let chan = super::channel::<i32>();
let mut recv1 = Box::pin(chan.recv());
let mut recv2 = Box::pin(chan.recv());
core::future::poll_fn(|cx| {
assert!(recv1.as_mut().poll(cx).is_pending());
assert!(recv2.as_mut().poll(cx).is_pending());
chan.send(1).unwrap();
assert_eq!(recv2.as_mut().poll(cx), Poll::Ready(Some(1)));
Poll::Ready(())
})
.await;
chan.send(2).unwrap();
assert_eq!(chan.recv().await, Some(2))
});
}
// Once the split halves have gone out of scope, the channel itself can be used
// again: it reports closed and still yields the remaining element before
// `None`.
#[test]
fn use_after_split() {
futures_lite::future::block_on(async {
let mut chan = super::channel::<i32>();
{
let (tx, mut rx) = chan.split();
tx.send(1).unwrap();
tx.send(2).unwrap();
assert_eq!(rx.recv().await, Some(1));
rx.close();
}
assert!(chan.is_closed());
assert_eq!(chan.recv().await, Some(2));
assert_eq!(chan.recv().await, None);
});
}
// Closing the channel before splitting is visible through both halves.
#[test]
fn split_after_close() {
let mut chan = super::channel::<i32>();
chan.close();
let (tx, rx) = chan.split();
assert!(tx.is_closed());
assert!(rx.is_closed());
}
}