pub mod error;
use event_listener::Event;
use std::ops;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::{Arc, RwLock, RwLockReadGuard};
/// Flag stored in bit 0 of the shared state word; set when the `Sender` is dropped.
const CLOSED: usize = 1;
/// Version a freshly created channel (and its first `Receiver`) starts at.
const VERSION_0: usize = 0;
/// Amount added to the state word for every value sent. It is the bit directly
/// above `CLOSED`, so bumping the version never disturbs the closed flag.
const VERSION_1: usize = CLOSED << 1;
/// Receiving half of the channel created by [`channel`].
///
/// Each receiver remembers the last version it has observed, which is how
/// `changed` decides whether a newer value has been sent.
#[derive(Debug)]
pub struct Receiver<T> {
/// State shared with the `Sender` and with every other `Receiver` clone.
shared: Arc<Shared<T>>,
/// Version most recently observed by this receiver (closed flag masked out).
version: usize,
}
/// Sending half of the channel created by [`channel`].
///
/// Dropping the sender sets the closed flag in the shared state and wakes all
/// receivers waiting for a change.
#[derive(Debug)]
pub struct Sender<T> {
/// State shared with every `Receiver`.
shared: Arc<Shared<T>>,
}
/// Read-only handle to the value currently stored in the channel, as returned
/// by `Receiver::borrow`.
///
/// It wraps a read guard of the channel's `RwLock`, so the read lock stays held
/// for as long as this value is alive; keep it short-lived.
#[derive(Debug)]
pub struct Ref<'a, T> {
/// Read guard over the shared value.
inner: RwLockReadGuard<'a, T>,
}
/// State shared between the `Sender` and all `Receiver`s of one channel.
#[derive(Debug)]
struct Shared<T> {
/// The current value; written by `Sender::send`, read through `Receiver::borrow`.
value: RwLock<T>,
/// State word: the `CLOSED` flag in bit 0, the version counter in the bits above it.
version: AtomicUsize,
/// Number of live `Receiver` handles (starts at 1, +1 on clone, -1 on drop).
ref_count_rx: AtomicUsize,
/// Notified after every successful send and when the `Sender` is dropped.
event_value_changed: Event,
/// Notified when the last `Receiver` is dropped.
event_all_recv_dropped: Event,
}
/// Creates a new channel whose current value is `init`.
///
/// Returns the sending half together with a single receiving half. The
/// receiver starts at the channel's initial version, so `init` itself is not
/// reported as a change by `Receiver::changed`.
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
    let state = Shared {
        value: RwLock::new(init),
        version: AtomicUsize::new(VERSION_0),
        // The receiver handed out below is the only one that exists so far.
        ref_count_rx: AtomicUsize::new(1),
        event_value_changed: Event::new(),
        event_all_recv_dropped: Event::new(),
    };
    let rx_shared = Arc::new(state);
    let tx_shared = Arc::clone(&rx_shared);

    (
        Sender { shared: tx_shared },
        Receiver {
            shared: rx_shared,
            version: VERSION_0,
        },
    )
}
impl<T> Receiver<T> {
    /// Returns a read-only handle to the value currently stored in the channel.
    ///
    /// This does not mark the value as seen: the receiver's observed version is
    /// left untouched, so a pending change is still reported by [`changed`](Self::changed).
    ///
    /// # Panics
    ///
    /// Panics if the underlying `RwLock` is poisoned.
    pub fn borrow(&self) -> Ref<'_, T> {
        let inner = self.shared.value.read().unwrap();
        Ref { inner }
    }

    /// Waits until the channel holds a value this receiver has not observed yet.
    ///
    /// Returns `Ok(())` as soon as the shared version differs from the version
    /// last observed by this receiver, and records the new version as observed.
    ///
    /// # Errors
    ///
    /// Returns `RecvError` when the sender has been dropped and there is no
    /// unobserved value left.
    ///
    /// # Panics
    ///
    /// Panics if a notification arrives but neither a new version nor the
    /// closed flag can be observed afterwards; that indicates an internal bug.
    pub async fn changed(&mut self) -> Result<(), error::RecvError> {
        // Fast path: a change (or closure) is already visible.
        if let Some(ret) = self.maybe_changed() {
            return ret;
        }

        // Register the listener *before* checking again, so that a send racing
        // with the registration cannot slip through unnoticed.
        let listener = self.shared.event_value_changed.listen();
        if let Some(ret) = self.maybe_changed() {
            return ret;
        }

        listener.await;

        self.maybe_changed()
            .expect("[bug] failed to observe change after notification.")
    }

    /// Checks the shared state word once, without waiting.
    ///
    /// * `Some(Ok(()))` - a version not yet observed by this receiver exists; it
    ///   is recorded as observed.
    /// * `Some(Err(_))` - nothing new, and the sender has been dropped.
    /// * `None` - nothing new, and the sender is still alive.
    fn maybe_changed(&mut self) -> Option<Result<(), error::RecvError>> {
        let state = self.shared.version.load(SeqCst);

        // Strip the closed flag to obtain the bare version counter.
        let new_version = state & !CLOSED;
        if self.version != new_version {
            // An unseen value takes priority over the closed flag, so the last
            // value sent before the sender was dropped is still delivered.
            self.version = new_version;
            return Some(Ok(()));
        }

        if CLOSED == state & CLOSED {
            return Some(Err(error::RecvError {}));
        }

        None
    }
}
impl<T: Clone> Receiver<T> {
    /// Waits for the next unobserved value and returns a clone of it.
    ///
    /// # Errors
    ///
    /// Returns `RecvError` under the same conditions as `changed`: the sender
    /// has been dropped and no unobserved value remains.
    pub async fn recv(&mut self) -> Result<T, error::RecvError> {
        match self.changed().await {
            // The read guard only lives for the duration of the clone.
            Ok(()) => Ok(self.borrow().clone()),
            Err(closed) => Err(closed),
        }
    }
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
self.shared.ref_count_rx.fetch_add(1, Relaxed);
Receiver {
shared: self.shared.clone(),
version: self.version,
}
}
}
impl<T> Drop for Receiver<T> {
    /// Unregisters this receiver; if it was the last one, wakes everything
    /// waiting on the "all receivers dropped" event.
    fn drop(&mut self) {
        let shared = &self.shared;

        // `fetch_sub` yields the previous count, so 1 means the count is now 0.
        let was_last = shared.ref_count_rx.fetch_sub(1, Relaxed) == 1;
        if was_last {
            shared.event_all_recv_dropped.notify(usize::MAX);
        }
    }
}
impl<T> Sender<T> {
pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
if self.shared.ref_count_rx.load(Relaxed) == 0 {
return Err(error::SendError { inner: value });
}
*self.shared.value.write().unwrap() = value;
self.shared.version.fetch_add(VERSION_1, SeqCst);
self.shared.event_value_changed.notify(usize::MAX);
Ok(())
}
pub async fn closed(&self) {
if self.shared.ref_count_rx.load(Relaxed) == 0 {
return;
}
let listener = self.shared.event_all_recv_dropped.listen();
if self.shared.ref_count_rx.load(Relaxed) == 0 {
return;
}
listener.await;
debug_assert_eq!(self.shared.ref_count_rx.load(Relaxed), 0);
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.shared.version.fetch_or(CLOSED, SeqCst);
self.shared.event_value_changed.notify(usize::MAX);
}
}
impl<T> ops::Deref for Ref<'_, T> {
    type Target = T;

    /// Gives access to the borrowed channel value through the held read guard.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}