use alloc::sync::Arc;
use core::sync::atomic::{AtomicUsize, Ordering::*};
use super::q::Queue;
use crate::sync::{AtomicWaker, Closure, Flags, Signal, Waiters};
pub enum RecvError {
Closed,
Empty,
}
pub enum SendError<V> {
Closed(V),
Full(V),
}
struct Channel<T> {
queue: Queue<T>,
senders: Waiters,
receivers: Waiters,
is_closed: Flags<Closure>,
capacity: Option<usize>,
}
impl<T: Send> Channel<T> {
fn init(capacity: Option<usize>) -> Arc<Channel<T>> {
Arc::new(Channel {
queue: Queue::new(),
senders: Waiters::new(),
receivers: Waiters::new(),
is_closed: Flags::default(),
capacity,
})
}
}
#[derive()]
pub struct Sender<T>(Arc<Channel<T>>);
pub struct Receiver<T>(Arc<Channel<T>>);
impl<T: Send> Sender<T> {
pub fn try_send(&self, value: T) -> Result<(), SendError<T>> {
let Channel {
queue,
senders,
receivers,
is_closed,
capacity,
} = &*self.0;
if is_closed.is_set(&Closure::Closed) {
return Err(SendError::Closed(value));
}
if let Some(cap) = capacity
&& queue.len() >= cap - 1
{
return Err(SendError::Full(value));
}
let Some(waiter) = (unsafe {
queue.enqueue(value);
receivers.dequeue()
}) else {
unreachable!();
};
waiter.wake_by_ref();
Ok(())
}
}
impl<T: Send> Receiver<T> {
pub fn try_recv(&self) -> Result<T, RecvError> {
let Channel {
queue,
senders,
receivers,
is_closed,
capacity,
} = &*self.0;
if let Some(value) = unsafe { queue.dequeue() } {
if capacity.is_some()
&& let Some(waiter) = unsafe { senders.dequeue() }
{
waiter.wake_by_ref();
}
return Ok(value);
}
if is_closed.is_set(&Closure::Closed) {
Err(RecvError::Closed)?
}
Err(RecvError::Empty)
}
}
pub fn unbounded<T: Send>() -> (Sender<T>, Receiver<T>) {
let channel = Channel::init(None);
(Sender(channel.clone()), Receiver(channel))
}
pub fn bounded<T: Send>(capacity: usize) -> (Sender<T>, Receiver<T>) {
let channel = Channel::init(Some(capacity));
(Sender(channel.clone()), Receiver(channel))
}