1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//! Utilities to wait for asynchronous primitives to resolve.
use futures::{channel::oneshot, future::join_all};
use or_poisoned::OrPoisoned;
use std::{
future::Future,
sync::{mpsc, OnceLock, RwLock},
};
static TRANSITION: OnceLock<RwLock<Option<TransitionInner>>> = OnceLock::new();
fn global_transition() -> &'static RwLock<Option<TransitionInner>> {
TRANSITION.get_or_init(|| RwLock::new(None))
}
#[derive(Debug, Clone)]
struct TransitionInner {
tx: mpsc::Sender<oneshot::Receiver<()>>,
}
/// Transitions allow you to wait for all asynchronous resources created during them to resolve.
#[derive(Debug)]
pub struct AsyncTransition;
impl AsyncTransition {
/// Calls the `action` function, and returns a `Future` that resolves when any
/// [`AsyncDerived`](crate::computed::AsyncDerived) or
/// or [`ArcAsyncDerived`](crate::computed::ArcAsyncDerived) that is read during the action
/// has resolved.
///
/// This allows for an inversion of control: the caller does not need to know when all the
/// resources created inside the `action` will resolve, but can wait for them to notify it.
pub async fn run<T, U>(action: impl FnOnce() -> T) -> U
where
T: Future<Output = U>,
{
let (tx, rx) = mpsc::channel();
let global_transition = global_transition();
let inner = TransitionInner { tx };
let prev = Option::replace(
&mut *global_transition.write().or_poisoned(),
inner.clone(),
);
let value = action().await;
_ = std::mem::replace(
&mut *global_transition.write().or_poisoned(),
prev,
);
let mut pending = Vec::new();
while let Ok(tx) = rx.try_recv() {
pending.push(tx);
}
join_all(pending).await;
value
}
pub(crate) fn register(rx: oneshot::Receiver<()>) {
if let Some(tx) = global_transition()
.read()
.or_poisoned()
.as_ref()
.map(|n| &n.tx)
{
// if it's an Err, that just means the Receiver was dropped
// i.e., the transition is no longer listening, in which case it doesn't matter if we
// successfully register with it or not
_ = tx.send(rx);
}
}
}