use event_listener::Event;
use std::mem;
use std::sync::{Arc, Mutex};
pub(crate) fn channel<T>() -> (Sender<T>, Receiver<T>) {
let channel = Arc::new(Channel {
val: Mutex::new(State::Empty),
waker: Event::new(),
});
(Sender(channel.clone()), Receiver(channel))
}
pub(crate) struct Sender<T>(Arc<Channel<T>>);
impl<T> Sender<T> {
pub(crate) fn send(&self, val: T) {
let mut lock = self.0.val.lock().unwrap();
*lock = State::Full(val);
self.0.waker.notify(1);
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let mut lock = self.0.val.lock().unwrap();
match &mut *lock {
State::Full(_) => {}
_ => {
*lock = State::Closed;
self.0.waker.notify(1);
}
}
}
}
pub(crate) struct Receiver<T>(Arc<Channel<T>>);
impl<T> Receiver<T> {
pub(crate) fn try_recv(&self) -> Option<T> {
let mut lock = self.0.val.lock().unwrap();
match &mut *lock {
State::Full(_) => {
if let State::Full(val) = mem::replace(&mut *lock, State::Empty) {
Some(val)
} else {
unreachable!()
}
}
State::Empty => None,
State::Closed => panic!("channel is closed"),
}
}
pub(crate) async fn recv(&self) -> T {
loop {
if let Some(value) = self.try_recv() {
return value;
}
let listener = self.0.waker.listen();
if let Some(value) = self.try_recv() {
return value;
}
listener.await;
}
}
pub(crate) fn recv_blocking(&self) -> T {
loop {
if let Some(value) = self.try_recv() {
return value;
}
let listener = self.0.waker.listen();
if let Some(value) = self.try_recv() {
return value;
}
listener.wait();
}
}
}
struct Channel<T> {
val: Mutex<State<T>>,
waker: Event,
}
enum State<T> {
Empty,
Full(T),
Closed,
}