use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::task::AtomicWaker;
#[derive(Debug)]
pub struct AsyncReceiver<T> {
pub(crate) received: Arc<AtomicBool>,
pub(crate) waker: Arc<AtomicWaker>, pub(crate) receiver: oneshot::Receiver<T>,
}
impl<T> AsyncReceiver<T> {
pub fn try_recv(&mut self) -> Option<T> {
match self.receiver.try_recv() {
Ok(Some(t)) => {
self.receiver.close();
self.received.store(true, Ordering::Relaxed);
self.waker.wake(); Some(t)
}
Ok(None) | Err(_) => None,
}
}
}
#[derive(Debug)]
pub struct AsyncStreamReceiver<T> {
pub(crate) finished: Arc<AtomicBool>,
pub(crate) waker: Arc<AtomicWaker>,
pub(crate) receiver: mpsc::UnboundedReceiver<T>,
pub(crate) received: Arc<AtomicBool>,
}
impl<T> AsyncStreamReceiver<T> {
pub fn is_finished(&self) -> bool {
self.finished.load(Ordering::Relaxed)
}
pub fn try_recv(&mut self) -> Option<T> {
match self.receiver.try_next() {
Ok(Some(item)) => Some(item),
Err(_) => {
None
}
Ok(None) => {
if self.finished.load(Ordering::Relaxed) {
self.received.store(true, Ordering::Relaxed);
self.waker.wake();
}
None
}
}
}
}
impl<T> Drop for AsyncStreamReceiver<T> {
fn drop(&mut self) {
self.received.store(true, Ordering::Relaxed);
self.waker.wake();
}
}