use alloc::rc::Rc;
use core::pin::Pin;
use crossbeam_queue::{ArrayQueue, PushError};
use futures_util::stream::Stream;
use futures_util::task::{AtomicWaker, Context, Poll};
/// State shared, through an `Rc`, by every `Sender` and the `Receiver` of one
/// channel.
struct Queue<T> {
/// Bounded buffer of items that have been sent but not yet received.
queue: ArrayQueue<T>,
/// Waker registered by `Receiver::poll_next` and woken by `Sender::send`.
waker: AtomicWaker,
}
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
let queue = Rc::new(Queue {
queue: ArrayQueue::new(buffer),
waker: Default::default(),
});
(
Sender {
queue: queue.clone(),
},
Receiver { queue },
)
}
/// Sending half of a channel created by [`channel`].
///
/// A sender can be cloned; every clone pushes into the same shared queue.
/// The receiver treats the channel as closed once it holds the only remaining
/// reference to the shared state, i.e. once every sender has been dropped.
pub struct Sender<T> {
    queue: Rc<Queue<T>>,
}

// Written by hand because `#[derive(Clone)]` would require `T: Clone`, even
// though cloning a sender only copies the `Rc` handle and never clones a `T`.
impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Self {
            queue: Rc::clone(&self.queue),
        }
    }
}

impl<T> Drop for Sender<T> {
    /// Wakes the receiving task when the last sender goes away.
    ///
    /// Without this, a receiver that already returned `Poll::Pending` would
    /// never be polled again and so would never observe the closed channel.
    fn drop(&mut self) {
        // The count still includes `self` here: exactly 2 means this sender
        // plus one other handle. If that other handle is the receiver, it is
        // about to become the sole owner and must be re-polled. If it is
        // another sender, the wake is at worst spurious.
        //
        // NOTE(review): assumes `wake` only schedules the task and does not
        // poll it re-entrantly, since the count is still 2 during `drop`.
        if Rc::strong_count(&self.queue) == 2 {
            self.queue.waker.wake();
        }
    }
}
impl<T> Sender<T> {
    /// Pushes `data` onto the shared queue, then wakes the receiving task.
    ///
    /// The item is pushed *before* the wake-up so that a receiver polled as a
    /// result of the wake finds the item already in the queue.
    ///
    /// # Errors
    ///
    /// Returns the `PushError` produced by the underlying queue's `push`
    /// when the item could not be enqueued (presumably because the bounded
    /// queue is full; see the crossbeam-queue documentation).
    pub fn send(&self, data: T) -> Result<(), PushError<T>> {
        let result = self.queue.queue.push(data);
        // Woken regardless of the push result, as before; an unnecessary
        // wake-up only costs the receiver one extra poll.
        self.queue.waker.wake();
        result
    }

    /// Returns the capacity reported by the underlying queue.
    pub fn capacity(&self) -> usize {
        self.queue.queue.capacity()
    }

    /// Returns `true` if the underlying queue reports that it is empty.
    pub fn is_empty(&self) -> bool {
        self.queue.queue.is_empty()
    }

    /// Returns `true` if the underlying queue reports that it is full.
    pub fn is_full(&self) -> bool {
        self.queue.queue.is_full()
    }

    /// Returns the number of items the underlying queue reports as queued.
    pub fn len(&self) -> usize {
        self.queue.queue.len()
    }
}
/// Receiving half of a channel created by `channel`.
///
/// Items are obtained through its `Stream` implementation, which pops them
/// from the queue shared with the senders.
pub struct Receiver<T> {
/// Handle to the state shared with every `Sender`.
queue: Rc<Queue<T>>,
}
impl<T> Stream for Receiver<T> {
    type Item = T;

    /// Polls the channel for the next item.
    ///
    /// * `Poll::Ready(Some(item))`: an item was popped from the queue.
    /// * `Poll::Ready(None)`: the queue is empty and this receiver holds the
    ///   only remaining reference to the shared state (no sender is left).
    /// * `Poll::Pending`: the queue is empty but at least one other handle
    ///   to the shared state still exists.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let shared = &self.queue;

        // The waker is registered before the pop, so any `send` that happens
        // after this poll has a waker to wake.
        shared.waker.register(cx.waker());

        if let Ok(item) = shared.queue.pop() {
            return Poll::Ready(Some(item));
        }

        // Nothing queued: decide between "closed" and "wait for a sender".
        let senders_alive = Rc::strong_count(shared) >= 2;
        if senders_alive {
            Poll::Pending
        } else {
            Poll::Ready(None)
        }
    }
}