use core::sync::atomic::{AtomicUsize, Ordering};
#[cfg(feature = "std")]
use std::sync::Arc;
#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, sync::Arc};
use async_channel::{unbounded, Receiver, Sender};
use event_listener::{Event, Listener};
use crate::AsyncSpawner;
/// Sending side of the cancellation channel.
///
/// Cancellation is expressed by closing the channel (see `cancel`); the
/// `Drop` implementation in this file does the same when the value goes away.
#[derive(Debug)]
struct Canceler {
/// Sender half of the channel. In this file it is only ever closed; no
/// message is sent through it.
tx: Sender<()>,
}
impl Canceler {
    /// Signals cancellation by closing the channel behind `tx`.
    ///
    /// Takes `&self`, so it can be invoked through a shared reference.
    #[inline]
    fn cancel(&self) {
        // The value returned by `close` is deliberately discarded: callers
        // only need the channel to end up closed.
        let _ = self.tx.close();
    }
}
impl Drop for Canceler {
    /// Runs `cancel` when the canceler is dropped, so the channel is closed
    /// even if nobody cancelled explicitly.
    fn drop(&mut self) {
        Self::cancel(self);
    }
}
/// Receiving side of the cancellation channel created by `CancelContext::new`.
#[derive(Debug)]
#[repr(transparent)]
struct CancelContext {
/// Receiver half of the channel; `done` hands out clones of it.
rx: Receiver<()>,
}
impl CancelContext {
fn new() -> (Self, Canceler) {
let (tx, rx) = unbounded();
(Self { rx }, Canceler { tx })
}
#[inline]
fn done(&self) -> Receiver<()> {
self.rx.clone()
}
}
/// Handle that pairs a counter of outstanding work with a cancellation
/// signal.
///
/// All state lives behind an `Arc`, so clones of a closer observe and mutate
/// the same counter and the same cancellation channel.
#[derive(Debug)]
pub struct AsyncCloser<S> {
/// Shared state (counter, wake-up event and cancellation channel).
inner: Arc<AsyncCloserInner>,
/// Marker for the spawner type `S`; no value of `S` is stored.
_spawner: core::marker::PhantomData<S>,
}
impl<S> Clone for AsyncCloser<S> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_spawner: core::marker::PhantomData,
}
}
}
/// State shared by every clone of an `AsyncCloser`.
#[derive(Debug)]
struct AsyncCloserInner {
/// Number of outstanding tasks; raised by `add_running`, lowered by `done`.
waitings: AtomicUsize,
/// Event that `done` notifies and that `wait` / `blocking_wait` listen on.
event: Event,
/// Receiving side of the cancellation channel, cloned out by `listen`.
ctx: CancelContext,
/// Sending side of the cancellation channel, closed by `signal`.
cancel: Canceler,
}
impl AsyncCloserInner {
    /// Builds the shared state with the counter starting at zero.
    #[inline]
    fn new() -> Self {
        Self::with(0)
    }

    /// Builds the shared state with the counter starting at `initial` and a
    /// freshly created cancellation channel.
    #[inline]
    fn with(initial: usize) -> Self {
        let (ctx, cancel) = CancelContext::new();
        let waitings = AtomicUsize::new(initial);
        let event = Event::new();
        Self {
            waitings,
            event,
            ctx,
            cancel,
        }
    }
}
impl<S> Default for AsyncCloser<S> {
fn default() -> Self {
Self {
inner: Arc::new(AsyncCloserInner::new()),
_spawner: core::marker::PhantomData,
}
}
}
impl<S> AsyncCloser<S> {
    /// Creates a closer whose outstanding-task counter starts at `initial`.
    #[inline]
    pub fn new(initial: usize) -> Self {
        Self {
            inner: Arc::new(AsyncCloserInner::with(initial)),
            _spawner: core::marker::PhantomData,
        }
    }

    /// Adds `running` to the outstanding-task counter.
    #[inline]
    pub fn add_running(&self, running: usize) {
        self.inner.waitings.fetch_add(running, Ordering::SeqCst);
    }

    /// Marks one outstanding task as finished.
    ///
    /// The counter never goes below zero: calling `done` when it is already
    /// zero is a no-op. Waiters are woken only when this call brings the
    /// counter from one to zero.
    #[inline]
    pub fn done(&self) {
        let previous = self.inner.waitings.fetch_update(
            Ordering::SeqCst,
            Ordering::SeqCst,
            // `None` (counter already zero) aborts the update.
            |count| count.checked_sub(1),
        );

        // `wait` / `blocking_wait` only return once the counter is zero, so
        // waking them on intermediate decrements would merely make every
        // waiter re-check the counter and register a new listener.
        if previous == Ok(1) {
            self.inner.event.notify(usize::MAX);
        }
    }

    /// Signals cancellation by closing the cancellation channel, which is
    /// what the `Notify` handles returned by [`listen`](Self::listen) wait on.
    #[inline]
    pub fn signal(&self) {
        self.inner.cancel.cancel();
    }

    /// Waits asynchronously until the outstanding-task counter is zero.
    ///
    /// Returns immediately if the counter is already zero.
    #[inline]
    pub async fn wait(&self) {
        while self.inner.waitings.load(Ordering::SeqCst) != 0 {
            // Register the listener *before* re-checking the counter so that a
            // `done` racing with this call cannot slip between the check and
            // the await and leave us waiting for a notification that was
            // already delivered.
            let ln = self.inner.event.listen();
            if self.inner.waitings.load(Ordering::SeqCst) == 0 {
                return;
            }
            ln.await;
        }
    }

    /// Calls [`signal`](Self::signal) and then [`wait`](Self::wait)s for the
    /// counter to reach zero.
    #[inline]
    pub async fn signal_and_wait(&self) {
        self.signal();
        self.wait().await;
    }

    /// Returns a [`Notify`] wrapping a clone of the cancellation receiver.
    #[inline]
    pub fn listen(&self) -> Notify {
        Notify(self.inner.ctx.done())
    }
}
impl<S: AsyncSpawner> AsyncCloser<S> {
    /// Blocks the current thread until the outstanding-task counter is zero.
    ///
    /// Returns immediately if the counter is already zero.
    #[inline]
    pub fn blocking_wait(&self) {
        let outstanding = || self.inner.waitings.load(Ordering::SeqCst);

        loop {
            if outstanding() == 0 {
                return;
            }

            // The listener is registered before the second check so that a
            // concurrent `done` cannot be missed between check and wait.
            let listener = self.inner.event.listen();
            if outstanding() == 0 {
                return;
            }
            listener.wait();
        }
    }

    /// Signals cancellation, then hands a clone of this closer to
    /// `S::spawn_detach` in a task that waits for the counter to reach zero.
    #[inline]
    pub fn signal_and_wait_detach(&self) {
        self.signal();

        let closer = Self::clone(self);
        S::spawn_detach(async move {
            closer.wait().await;
        })
    }
}
/// Handle for observing the cancellation signal of an `AsyncCloser`.
///
/// Wraps a clone of the cancellation channel's receiver; obtained from
/// `AsyncCloser::listen`.
#[derive(Debug)]
pub struct Notify(Receiver<()>);
impl Notify {
    /// Waits until a receive on the wrapped channel completes.
    ///
    /// The result of the receive is ignored: completing with a message and
    /// completing with an error are treated the same way.
    pub async fn wait(&self) {
        match self.0.recv().await {
            Ok(()) | Err(_) => {}
        }
    }
}