wmark/closer/
future.rs

1use core::sync::atomic::{AtomicUsize, Ordering};
2#[cfg(feature = "std")]
3use std::sync::Arc;
4
5#[cfg(not(feature = "std"))]
6use alloc::{boxed::Box, sync::Arc};
7
8use async_channel::{unbounded, Receiver, Sender};
9use event_listener::{Event, Listener};
10
11use crate::AsyncSpawner;
12
13#[derive(Debug)]
14struct Canceler {
15  tx: Sender<()>,
16}
17
18impl Canceler {
19  #[inline]
20  fn cancel(&self) {
21    self.tx.close();
22  }
23}
24
25impl Drop for Canceler {
26  fn drop(&mut self) {
27    self.cancel();
28  }
29}
30
31#[derive(Debug)]
32#[repr(transparent)]
33struct CancelContext {
34  rx: Receiver<()>,
35}
36
37impl CancelContext {
38  fn new() -> (Self, Canceler) {
39    let (tx, rx) = unbounded();
40    (Self { rx }, Canceler { tx })
41  }
42
43  #[inline]
44  fn done(&self) -> Receiver<()> {
45    self.rx.clone()
46  }
47}
48
49/// AsyncCloser holds the two things we need to close a thread and wait for it to
50/// finish: a chan to tell the thread to shut down, and a WaitGroup with
51/// which to wait for it to finish shutting down.
52#[derive(Debug)]
53pub struct AsyncCloser<S> {
54  inner: Arc<AsyncCloserInner>,
55  _spawner: core::marker::PhantomData<S>,
56}
57
58impl<S> Clone for AsyncCloser<S> {
59  fn clone(&self) -> Self {
60    Self {
61      inner: self.inner.clone(),
62      _spawner: core::marker::PhantomData,
63    }
64  }
65}
66
67#[derive(Debug)]
68struct AsyncCloserInner {
69  waitings: AtomicUsize,
70  event: Event,
71  ctx: CancelContext,
72  cancel: Canceler,
73}
74
75impl AsyncCloserInner {
76  #[inline]
77  fn new() -> Self {
78    let (ctx, cancel) = CancelContext::new();
79    Self {
80      waitings: AtomicUsize::new(0),
81      event: Event::new(),
82      ctx,
83      cancel,
84    }
85  }
86
87  #[inline]
88  fn with(initial: usize) -> Self {
89    let (ctx, cancel) = CancelContext::new();
90    Self {
91      waitings: AtomicUsize::new(initial),
92      event: Event::new(),
93      ctx,
94      cancel,
95    }
96  }
97}
98
99impl<S> Default for AsyncCloser<S> {
100  fn default() -> Self {
101    Self {
102      inner: Arc::new(AsyncCloserInner::new()),
103      _spawner: core::marker::PhantomData,
104    }
105  }
106}
107
108impl<S> AsyncCloser<S> {
109  /// Constructs a new [`AsyncCloser`], with an initial count on the [`WaitGroup`].
110  #[inline]
111  pub fn new(initial: usize) -> Self {
112    Self {
113      inner: Arc::new(AsyncCloserInner::with(initial)),
114      _spawner: core::marker::PhantomData,
115    }
116  }
117
118  /// Adds delta to the [`WaitGroup`].
119  #[inline]
120  pub fn add_running(&self, running: usize) {
121    self.inner.waitings.fetch_add(running, Ordering::SeqCst);
122  }
123
124  /// Calls [`WaitGroup::done`] on the [`WaitGroup`].
125  #[inline]
126  pub fn done(&self) {
127    if self
128      .inner
129      .waitings
130      .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
131        if v != 0 {
132          Some(v - 1)
133        } else {
134          None
135        }
136      })
137      .is_ok()
138    {
139      self.inner.event.notify(usize::MAX);
140    }
141  }
142
143  /// Signals the [`AsyncCloser::listen`] signal.
144  #[inline]
145  pub fn signal(&self) {
146    self.inner.cancel.cancel();
147  }
148
149  /// Waits on the [`WaitGroup`]. (It waits for the AsyncCloser's initial value, [`AsyncCloser::add_running`], and [`AsyncCloser::done`]
150  /// calls to balance out.)
151  #[inline]
152  pub async fn wait(&self) {
153    while self.inner.waitings.load(Ordering::SeqCst) != 0 {
154      let ln = self.inner.event.listen();
155      // Check the flag again after creating the listener.
156      if self.inner.waitings.load(Ordering::SeqCst) == 0 {
157        return;
158      }
159      ln.await;
160    }
161  }
162
163  /// Calls [`AsyncCloser::signal`], then [`AsyncCloser::wait`].
164  #[inline]
165  pub async fn signal_and_wait(&self) {
166    self.signal();
167    self.wait().await;
168  }
169
170  /// Gets signaled when [`AsyncCloser::signal`] is called.
171  #[inline]
172  pub fn listen(&self) -> Notify {
173    Notify(self.inner.ctx.done())
174  }
175}
176
177impl<S: AsyncSpawner> AsyncCloser<S> {
178  /// Waits on the [`WaitGroup`]. (It waits for the AsyncCloser's initial value, [`AsyncCloser::add_running`], and [`AsyncCloser::done`]
179  /// calls to balance out.)
180  #[inline]
181  pub fn blocking_wait(&self) {
182    while self.inner.waitings.load(Ordering::SeqCst) != 0 {
183      let ln = self.inner.event.listen();
184      // Check the flag again after creating the listener.
185      if self.inner.waitings.load(Ordering::SeqCst) == 0 {
186        return;
187      }
188      ln.wait();
189    }
190  }
191
192  /// Like [`AsyncCloser::signal_and_wait`], but spawns and detach the waiting in a new task.
193  #[inline]
194  pub fn signal_and_wait_detach(&self) {
195    self.signal();
196    let wg = self.clone();
197    S::spawn_detach(async move {
198      wg.wait().await;
199    })
200  }
201}
202
203/// Gets signaled when [`AsyncCloser::signal`] is called.
204pub struct Notify(Receiver<()>);
205
206impl Notify {
207  /// Waits for the signal.
208  pub async fn wait(&self) {
209    let _ = self.0.recv().await;
210  }
211}