use parking_lot::{Condvar, Mutex};
use std::sync::{
Arc,
mpsc::{RecvError, SendError, TryRecvError},
};
/// Lock-protected channel state: `.0` is the single pending-message slot and
/// `.1` is a flag that is `true` while the `Notifier` has not been dropped.
type Channel<T> = Mutex<(Option<T>, bool)>;
/// Sending half of the single-slot channel.
///
/// Dropping it clears the "connected" flag in the shared state and wakes
/// every thread blocked on the condition variable.
pub(crate) struct Notifier<T> {
// Channel state plus the condition variable, shared with every `Observer`.
inner: Arc<(Channel<T>, Condvar)>,
}
/// Builds a linked `Notifier`/`Observer` pair over one shared slot.
///
/// `msg` becomes the initial content of the slot (`None` leaves it empty).
#[inline]
fn channel_inner<T>(msg: Option<T>) -> (Notifier<T>, Observer<T>) {
    // The flag starts out `true`: the notifier exists from the moment the pair is created.
    let shared = Arc::new((Mutex::new((msg, true)), Condvar::new()));
    (
        Notifier {
            inner: Arc::clone(&shared),
        },
        Observer { inner: shared },
    )
}
/// Creates a channel whose slot already holds `msg`, so the first receive
/// can return it without waiting for a `push`.
pub(crate) fn occupied_channel<T>(msg: T) -> (Notifier<T>, Observer<T>) {
    let initial = Some(msg);
    channel_inner(initial)
}
/// Creates a channel whose slot starts out empty.
pub(crate) fn channel<T>() -> (Notifier<T>, Observer<T>) {
    channel_inner::<T>(None)
}
impl<T> Notifier<T> {
    /// Stores `msg` in the shared slot and wakes one waiting observer.
    ///
    /// Any message still sitting in the slot is replaced.
    ///
    /// # Errors
    ///
    /// Returns `SendError(msg)`, handing the message back, when this notifier
    /// holds the only remaining reference to the shared state (no observer is left).
    pub fn push(&self, msg: T) -> Result<(), SendError<T>> {
        // A strong count of one means only this notifier still references the state.
        if Arc::strong_count(&self.inner) == 1 {
            return Err(SendError(msg));
        }

        let (slot, wakeup) = &*self.inner;
        let mut state = slot.lock();
        state.0 = Some(msg);
        // Notify while the guard is still held; it is released when `state` goes out of scope.
        wakeup.notify_one();
        Ok(())
    }
}
impl<T> Drop for Notifier<T> {
    /// Marks the channel as disconnected and wakes every waiting observer.
    fn drop(&mut self) {
        let shared = &*self.inner;
        let mut state = shared.0.lock();
        // Clearing the flag is what receivers use to tell "empty" from "disconnected".
        state.1 = false;
        // All waiters are woken (not just one) because none of them can ever get a message now.
        shared.1.notify_all();
    }
}
/// Receiving half of the single-slot channel.
///
/// Cloned observers share the same slot, so each message is taken by at most
/// one of them.
pub struct Observer<T> {
// Channel state plus the condition variable, shared with the `Notifier`.
inner: Arc<(Channel<T>, Condvar)>,
}
impl<T> Clone for Observer<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<T> Observer<T> {
    /// Blocks until a message is available and returns it.
    ///
    /// A message already sitting in the slot is returned immediately, even if
    /// the notifier has since been dropped.
    ///
    /// # Errors
    ///
    /// Returns `RecvError` once the slot is empty and the notifier has been
    /// dropped, since no further message can arrive.
    pub fn recv(&self) -> Result<T, RecvError> {
        let (lock, cvar) = &*self.inner;
        let mut channel = lock.lock();
        loop {
            if let Some(msg) = channel.0.take() {
                return Ok(msg);
            }
            if !channel.1 {
                return Err(RecvError);
            }
            // Re-check after every wake-up: another observer sharing this slot may
            // have taken the message between the notification and this thread
            // reacquiring the lock. Waking to an empty slot while the notifier is
            // still alive must not be reported as a disconnect.
            cvar.wait(&mut channel);
        }
    }

    /// Takes the pending message without blocking.
    ///
    /// # Errors
    ///
    /// Returns `TryRecvError::Empty` when the slot is empty but the notifier is
    /// still alive, and `TryRecvError::Disconnected` when the slot is empty and
    /// the notifier has been dropped.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        let (lock, _) = &*self.inner;
        let mut channel = lock.lock();
        match channel.0.take() {
            Some(msg) => Ok(msg),
            None if channel.1 => Err(TryRecvError::Empty),
            None => Err(TryRecvError::Disconnected),
        }
    }
}