use alloc::sync::Arc;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
pub fn deliver_channel<T>() -> (DeliverSender<T>, DeliverReceiver<T>) {
let common = Arc::new(Common {
queue: crossbeam_queue::SegQueue::new(),
num_senders_alive: AtomicUsize::new(1),
sender_did_something: event_listener::Event::new(),
receiver_is_dead: AtomicBool::new(false),
receiver_did_something: event_listener::Event::new(),
});
let tx = DeliverSender {
common: common.clone(),
};
let rx = DeliverReceiver { common };
(tx, rx)
}
pub struct DeliverSender<T> {
common: Arc<Common<T>>,
}
impl<T> DeliverSender<T> {
pub async fn deliver(&mut self, payload: T) -> Result<(), T> {
let message = Arc::new(atomic_take::AtomicTake::new(payload));
self.common.queue.push(message.clone());
self.common.sender_did_something.notify(1);
let mut waiter = None;
loop {
if message.is_taken() {
return Ok(());
}
if self.common.receiver_is_dead.load(Ordering::Acquire) {
return match message.take() {
Some(payload) => Err(payload),
None => Ok(()),
};
}
if let Some(waiter) = waiter.take() {
waiter.await;
} else {
waiter = Some(self.common.receiver_did_something.listen());
}
}
}
}
impl<T> Clone for DeliverSender<T> {
fn clone(&self) -> Self {
self.common
.num_senders_alive
.fetch_add(1, Ordering::Release);
DeliverSender {
common: self.common.clone(),
}
}
}
impl<T> Drop for DeliverSender<T> {
fn drop(&mut self) {
let _num_remain = self
.common
.num_senders_alive
.fetch_sub(1, Ordering::Release);
debug_assert!(_num_remain != usize::MAX); self.common.sender_did_something.notify(usize::MAX);
}
}
pub struct DeliverReceiver<T> {
common: Arc<Common<T>>,
}
impl<T> DeliverReceiver<T> {
pub async fn next(&mut self) -> Option<T> {
let message: Arc<atomic_take::AtomicTake<T>> = {
let mut waiter = None;
loop {
if let Some(item) = self.common.queue.pop() {
break item;
}
if self.common.num_senders_alive.load(Ordering::Acquire) == 0 {
return None;
}
if let Some(waiter) = waiter.take() {
waiter.await;
} else {
waiter = Some(self.common.sender_did_something.listen());
}
}
};
let payload = message.take().unwrap();
self.common.receiver_did_something.notify(usize::MAX);
Some(payload)
}
}
impl<T> Drop for DeliverReceiver<T> {
fn drop(&mut self) {
self.common.receiver_is_dead.store(true, Ordering::Release);
self.common.receiver_did_something.notify(usize::MAX);
}
}
struct Common<T> {
queue: crossbeam_queue::SegQueue<Arc<atomic_take::AtomicTake<T>>>,
num_senders_alive: AtomicUsize,
sender_did_something: event_listener::Event,
receiver_is_dead: AtomicBool,
receiver_did_something: event_listener::Event,
}