1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
//! Facility to wait for a dynamic set of tasks to complete, with a single
//! waiter and multiple waitees (things that are waited for). Notably, each
//! waitee can also start more work to be waited for.
//!
//! # Implementation Details
//!
//! The implementation of waiting in this module is just a wrapper around
//! [`tokio::sync::mpsc::channel`]. A [`WaitGroup`] holds the unique
//! [`tokio::sync::mpsc::Receiver`] and each [`WaitGuard`] holds a
//! [`tokio::sync::mpsc::Sender`]. Despite this simple implementation, the
//! [`WaitGroup`] and [`WaitGuard`] wrappers are useful because they make this
//! pattern discoverable.
//!
//! # Example
//!
//! This example demonstrates use of the [`WaitGroup`] and [`WaitGuard`] to
//! (very inefficiently) compute the Fibonacci number `F(n)` using recursive channels.
//!
//! The given `waiter` is used to detect when the work has finished, at which
//! point the channels are closed. Additionally, `waiter` can be omitted to show
//! that, without the [`WaitGroup`], the tasks would not terminate.
//!
//! ```rust
//! # use lychee_lib::waiter::{WaitGuard, WaitGroup};
//! # use futures::StreamExt;
//! # use tokio::sync::mpsc::{Receiver, Sender, channel};
//! # use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
//! # use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
//! #
//! # use std::time::Duration;
//! #
//! # fn timeout<F: IntoFuture>(fut: F) -> tokio::time::Timeout<F::IntoFuture> {
//! # tokio::time::timeout(Duration::from_millis(250), fut)
//! # }
//! #
//! # #[tokio::main]
//! # async fn main() {
//! async fn fibonacci_waiter_example(n: usize, waiter: Option<(WaitGroup, WaitGuard)>) -> usize {
//! let (send, recv) = unbounded_channel();
//! let (incr_count, recv_count) = channel(1);
//!
//! let (waiter, guard) = match waiter {
//! Some((waiter, guard)) => (Some(waiter), Some(guard)),
//! None => (None, None),
//! };
//!
//! let recursive_task = tokio::task::spawn({
//! let send = send.clone();
//! fibonacci_waiter_example_task(recv, send, incr_count, waiter)
//! });
//!
//! let count_task = tokio::task::spawn(async move {
//! ReceiverStream::new(recv_count).count().await
//! });
//!
//! send.send((guard, n)).expect("initial send"); // note `guard` must be moved!
//!
//! let ((), result) = futures::try_join!(recursive_task, count_task).expect("join");
//! result
//! }
//!
//! /// An inefficient Fibonacci implementation. This computes `F(n)` by sending
//! /// `n-1` and `n-2` back into the channel. This shows how one work item can
//! /// create multiple subsequent work items.
//! async fn fibonacci_waiter_example_task(
//! recv: UnboundedReceiver<(Option<WaitGuard>, usize)>,
//! send: UnboundedSender<(Option<WaitGuard>, usize)>,
//! incr_count: Sender<()>,
//! waiter: Option<WaitGroup>,
//! ) {
//! let stream = UnboundedReceiverStream::new(recv);
//! let stream = match waiter {
//! Some(waiter) => stream.take_until(waiter.wait()).left_stream(),
//! None => stream.right_stream(),
//! };
//!
//! stream
//! .for_each(async |(guard, n)| match n {
//! 0 => (),
//! 1 => incr_count.send(()).await.expect("send incr"),
//! n => {
//! send.send((guard.clone(), n - 1)).expect("send 1");
//! send.send((guard.clone(), n - 2)).expect("send 2");
//! }
//! })
//! .await;
//! }
//!
//! // basic termination works as expected and computes the right result.
//! assert_eq!(fibonacci_waiter_example(0, Some(WaitGroup::new())).await, 0);
//! assert_eq!(fibonacci_waiter_example(9, Some(WaitGroup::new())).await, 34);
//! assert_eq!(fibonacci_waiter_example(10, Some(WaitGroup::new())).await, 55);
//!
//! // task does not terminate if WaitGroup is not used, due to recursive channels.
//! assert!(timeout(fibonacci_waiter_example(9, None)).await.is_err());
//! // even a "trivial" case does not terminate.
//! assert!(timeout(fibonacci_waiter_example(0, None)).await.is_err());
//!
//! // in these tests, we do use a WaitGroup, but the task doesn't terminate
//! // because we *clone* the guard and the test function holds an extra guard,
//! // blocking the WaitGroup from returning. this is an example of something that
//! // can go wrong when using the waiter.
//! let (waiter, guard) = WaitGroup::new();
//! assert!(timeout(fibonacci_waiter_example(9, Some((waiter, guard.clone()))))
//! .await.is_err());
//!
//! let (waiter, guard) = WaitGroup::new();
//! assert!(timeout(fibonacci_waiter_example(0, Some((waiter, guard.clone()))))
//! .await.is_err());
//! # }
//! ```
use Never;
use ;
/// Manager for a particular wait group. This can spawn a number of [`WaitGuard`]s
/// and it can then wait for them to all complete.
///
/// Each [`WaitGroup`] is single-use—calling [`WaitGroup::wait`] to start
/// waiting consumes the [`WaitGroup`]. Additionally, once all [`WaitGuard`]s
/// have been dropped, it is not possible to create any more [`WaitGuard`]s.
/// RAII guard held by a task which is being waited for.
///
/// The existence of values of this type represents outstanding work for
/// its corresponding [`WaitGroup`].
///
/// A [`WaitGuard`] can be cloned using [`WaitGuard::clone`]. This allows
/// a task to spawn additional tasks, recursively.