1use core::sync::atomic::{AtomicUsize, Ordering};
2#[cfg(feature = "std")]
3use std::sync::Arc;
4
5#[cfg(not(feature = "std"))]
6use alloc::{boxed::Box, sync::Arc};
7
8use async_channel::{unbounded, Receiver, Sender};
9use event_listener::{Event, Listener};
10
11use crate::AsyncSpawner;
12
13#[derive(Debug)]
14struct Canceler {
15 tx: Sender<()>,
16}
17
18impl Canceler {
19 #[inline]
20 fn cancel(&self) {
21 self.tx.close();
22 }
23}
24
25impl Drop for Canceler {
26 fn drop(&mut self) {
27 self.cancel();
28 }
29}
30
31#[derive(Debug)]
32#[repr(transparent)]
33struct CancelContext {
34 rx: Receiver<()>,
35}
36
37impl CancelContext {
38 fn new() -> (Self, Canceler) {
39 let (tx, rx) = unbounded();
40 (Self { rx }, Canceler { tx })
41 }
42
43 #[inline]
44 fn done(&self) -> Receiver<()> {
45 self.rx.clone()
46 }
47}
48
49#[derive(Debug)]
53pub struct AsyncCloser<S> {
54 inner: Arc<AsyncCloserInner>,
55 _spawner: core::marker::PhantomData<S>,
56}
57
58impl<S> Clone for AsyncCloser<S> {
59 fn clone(&self) -> Self {
60 Self {
61 inner: self.inner.clone(),
62 _spawner: core::marker::PhantomData,
63 }
64 }
65}
66
67#[derive(Debug)]
68struct AsyncCloserInner {
69 waitings: AtomicUsize,
70 event: Event,
71 ctx: CancelContext,
72 cancel: Canceler,
73}
74
75impl AsyncCloserInner {
76 #[inline]
77 fn new() -> Self {
78 let (ctx, cancel) = CancelContext::new();
79 Self {
80 waitings: AtomicUsize::new(0),
81 event: Event::new(),
82 ctx,
83 cancel,
84 }
85 }
86
87 #[inline]
88 fn with(initial: usize) -> Self {
89 let (ctx, cancel) = CancelContext::new();
90 Self {
91 waitings: AtomicUsize::new(initial),
92 event: Event::new(),
93 ctx,
94 cancel,
95 }
96 }
97}
98
99impl<S> Default for AsyncCloser<S> {
100 fn default() -> Self {
101 Self {
102 inner: Arc::new(AsyncCloserInner::new()),
103 _spawner: core::marker::PhantomData,
104 }
105 }
106}
107
108impl<S> AsyncCloser<S> {
109 #[inline]
111 pub fn new(initial: usize) -> Self {
112 Self {
113 inner: Arc::new(AsyncCloserInner::with(initial)),
114 _spawner: core::marker::PhantomData,
115 }
116 }
117
118 #[inline]
120 pub fn add_running(&self, running: usize) {
121 self.inner.waitings.fetch_add(running, Ordering::SeqCst);
122 }
123
124 #[inline]
126 pub fn done(&self) {
127 if self
128 .inner
129 .waitings
130 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
131 if v != 0 {
132 Some(v - 1)
133 } else {
134 None
135 }
136 })
137 .is_ok()
138 {
139 self.inner.event.notify(usize::MAX);
140 }
141 }
142
143 #[inline]
145 pub fn signal(&self) {
146 self.inner.cancel.cancel();
147 }
148
149 #[inline]
152 pub async fn wait(&self) {
153 while self.inner.waitings.load(Ordering::SeqCst) != 0 {
154 let ln = self.inner.event.listen();
155 if self.inner.waitings.load(Ordering::SeqCst) == 0 {
157 return;
158 }
159 ln.await;
160 }
161 }
162
163 #[inline]
165 pub async fn signal_and_wait(&self) {
166 self.signal();
167 self.wait().await;
168 }
169
170 #[inline]
172 pub fn listen(&self) -> Notify {
173 Notify(self.inner.ctx.done())
174 }
175}
176
177impl<S: AsyncSpawner> AsyncCloser<S> {
178 #[inline]
181 pub fn blocking_wait(&self) {
182 while self.inner.waitings.load(Ordering::SeqCst) != 0 {
183 let ln = self.inner.event.listen();
184 if self.inner.waitings.load(Ordering::SeqCst) == 0 {
186 return;
187 }
188 ln.wait();
189 }
190 }
191
192 #[inline]
194 pub fn signal_and_wait_detach(&self) {
195 self.signal();
196 let wg = self.clone();
197 S::spawn_detach(async move {
198 wg.wait().await;
199 })
200 }
201}
202
203pub struct Notify(Receiver<()>);
205
206impl Notify {
207 pub async fn wait(&self) {
209 let _ = self.0.recv().await;
210 }
211}