use crate::{Event, EventListener, IntoNotification};
use alloc::collections::VecDeque;
use alloc::rc::Rc;
use core::cell::{Cell, RefCell};
use core::fmt;
use core::pin::Pin;
struct Channel<T> {
data: RefCell<VecDeque<T>>,
closed: Cell<bool>,
senders: Cell<usize>,
receivers: Cell<usize>,
event: Event<Option<T>>,
}
pub struct Sender<T> {
channel: Rc<Channel<T>>,
}
pub struct Receiver<T> {
channel: Rc<Channel<T>>,
}
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let channel = Rc::new(Channel {
data: RefCell::new(VecDeque::new()),
senders: Cell::new(1),
receivers: Cell::new(1),
closed: Cell::new(false),
event: Event::new(),
});
(
Sender {
channel: channel.clone(),
},
Receiver { channel },
)
}
impl<T> Sender<T> {
pub fn send(&self, item: T) -> Result<(), ChannelClosed> {
if self.channel.closed.get() {
return Err(ChannelClosed { _private: () });
}
let mut item = Some(item);
self.channel.event.notify(1.tag_with(|| item.take()));
if let Some(item) = item {
self.channel.data.borrow_mut().push_back(item);
}
Ok(())
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
let new_senders = self.channel.senders.get() + 1;
self.channel.senders.set(new_senders);
Sender {
channel: self.channel.clone(),
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let new_senders = self.channel.senders.get() - 1;
self.channel.senders.set(new_senders);
if new_senders == 0 {
self.channel.closed.set(true);
self.channel
.event
.notify(core::usize::MAX.tag_with(|| None));
}
}
}
impl<T> Receiver<T> {
pub fn try_recv(&self) -> Result<T, TryRecvError> {
self.channel.data.borrow_mut().pop_front().ok_or_else(|| {
if self.channel.closed.get() {
TryRecvError::Closed
} else {
TryRecvError::Empty
}
})
}
pub async fn recv(&self) -> Result<T, ChannelClosed> {
let mut listener = EventListener::new(&self.channel.event);
{
let mut listener = unsafe { Pin::new_unchecked(&mut listener) };
loop {
if let Some(item) = self.channel.data.borrow_mut().pop_front() {
return Ok(item);
}
if self.channel.closed.get() {
return Err(ChannelClosed { _private: () });
}
if let Some(item) = listener.as_mut().await {
return Ok(item);
}
}
}
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Receiver<T> {
let new_receivers = self.channel.receivers.get() + 1;
self.channel.receivers.set(new_receivers);
Receiver {
channel: self.channel.clone(),
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let new_receivers = self.channel.receivers.get() - 1;
self.channel.receivers.set(new_receivers);
if new_receivers == 0 {
self.channel.closed.set(true);
self.channel
.event
.notify(core::usize::MAX.tag_with(|| None));
}
}
}
#[derive(Debug)]
pub struct ChannelClosed {
_private: (),
}
impl fmt::Display for ChannelClosed {
#[cfg_attr(coverage, no_coverage)]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "channel closed")
}
}
#[cfg(feature = "std")]
impl std::error::Error for ChannelClosed {}
#[derive(Debug)]
pub enum TryRecvError {
Closed,
Empty,
}
impl fmt::Display for TryRecvError {
#[cfg_attr(coverage, no_coverage)]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TryRecvError::Closed => write!(f, "channel closed"),
TryRecvError::Empty => write!(f, "channel empty"),
}
}
}
#[cfg(feature = "std")]
impl std::error::Error for TryRecvError {
#[cfg_attr(coverage, no_coverage)]
fn cause(&self) -> Option<&dyn std::error::Error> {
match self {
TryRecvError::Closed => Some(&ChannelClosed { _private: () }),
TryRecvError::Empty => None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_lite::future;
#[test]
fn test_channel() {
future::block_on(async {
let (sender, receiver) = channel();
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
assert_eq!(receiver.try_recv().unwrap(), 1);
assert_eq!(receiver.recv().await.unwrap(), 2);
assert_eq!(receiver.try_recv().unwrap(), 3);
assert!(receiver.try_recv().is_err());
drop(sender);
assert!(receiver.recv().await.is_err());
});
}
#[test]
fn test_channel_clone() {
future::block_on(async {
let (sender, receiver) = channel();
let sender2 = sender.clone();
sender.send(1).unwrap();
sender2.send(2).unwrap();
assert_eq!(receiver.try_recv().unwrap(), 1);
assert_eq!(receiver.try_recv().unwrap(), 2);
assert!(receiver.try_recv().is_err());
drop(sender);
drop(sender2);
assert!(receiver.recv().await.is_err());
});
}
#[test]
fn test_channel_recv_clone() {
future::block_on(async {
let (sender, receiver) = channel();
let receiver2 = receiver.clone();
sender.send(1).unwrap();
sender.send(2).unwrap();
assert_eq!(receiver.try_recv().unwrap(), 1);
assert_eq!(receiver2.try_recv().unwrap(), 2);
assert!(receiver.try_recv().is_err());
assert!(receiver2.try_recv().is_err());
drop((receiver, receiver2));
assert!(sender.send(3).is_err());
});
}
#[test]
fn test_send_direct() {
future::block_on(async {
let (sender, receiver) = channel();
let recv = receiver.recv();
futures_lite::pin!(recv);
assert!(future::poll_once(&mut recv).await.is_none());
sender.send(1).unwrap();
assert_eq!(future::poll_once(&mut recv).await.unwrap().ok(), Some(1));
});
}
#[test]
fn test_recv_and_drop() {
future::block_on(async {
let (sender, receiver) = channel::<i32>();
let recv = receiver.recv();
futures_lite::pin!(recv);
let receiver2 = receiver.clone();
let recv2 = receiver2.recv();
futures_lite::pin!(recv2);
assert!(future::poll_once(&mut recv).await.is_none());
assert!(future::poll_once(&mut recv2).await.is_none());
drop(sender);
assert!(recv.await.is_err());
assert!(recv2.await.is_err());
});
}
#[test]
fn test_channel_drop() {
future::block_on(async {
let (sender, receiver) = channel();
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
drop(sender);
assert_eq!(receiver.try_recv().unwrap(), 1);
assert_eq!(receiver.try_recv().unwrap(), 2);
assert_eq!(receiver.try_recv().unwrap(), 3);
assert!(receiver.try_recv().is_err());
assert!(receiver.recv().await.is_err());
});
}
}