1use tokio_util::sync::CancellationToken;
2
3#[derive(Debug, Default, Clone)]
89pub struct Latch {
90 token: CancellationToken,
91}
92
93#[derive(Debug, Clone)]
99pub struct Gate {
100 token: CancellationToken,
101}
102
103impl Latch {
104 pub fn new() -> Self {
106 let token = CancellationToken::new();
107
108 Self { token }
109 }
110
111 pub fn gate(&self) -> Gate {
115 Gate {
116 token: self.token.clone(),
117 }
118 }
119
120 pub fn release(&self) {
123 self.token.cancel();
124 }
125}
126
127impl Gate {
128 pub async fn opened(&self) {
132 self.token.cancelled().await;
133 }
134
135 pub fn is_open(&self) -> bool {
137 self.token.is_cancelled()
138 }
139}
140
141#[cfg(test)]
142mod tests {
143 use super::*;
144 use std::sync::atomic::{AtomicBool, Ordering};
145 use std::sync::Arc;
146 use std::time::Duration;
147
148 #[tokio::test]
149 async fn test_work_and_complete() {
150 let (latch, gate, marker) = make_objects();
151
152 tokio::spawn(work_and_release(latch));
153 tokio::spawn(await_opened_and_flip_marker(gate, marker.clone()));
154
155 sleep_a_little().await;
156
157 assert!(marker.load(Ordering::Relaxed));
158 }
159
160 #[tokio::test]
161 async fn test_multi_work_and_complete() {
162 let (latch, gate, marker) = make_objects();
163
164 tokio::spawn(work_and_release(latch.clone()));
165 tokio::spawn(work_and_release(latch.clone()));
166 tokio::spawn(await_opened_and_flip_marker(gate, marker.clone()));
167
168 sleep_a_little().await;
169
170 assert!(marker.load(Ordering::Relaxed));
171 }
172
173 #[tokio::test]
174 async fn test_work_and_complete_reordered() {
175 let (latch, gate, marker) = make_objects();
176
177 tokio::spawn(await_opened_and_flip_marker(gate, marker.clone()));
178 tokio::spawn(work_and_release(latch));
179
180 sleep_a_little().await;
181
182 assert!(marker.load(Ordering::Relaxed));
183 }
184
185 #[tokio::test]
186 async fn test_multi_monitor_work_and_complete() {
187 let latch = Latch::new();
188 let gate_a = latch.gate();
189 let gate_b = gate_a.clone();
190 let marker_a = Arc::new(AtomicBool::new(false));
191 let marker_b = Arc::new(AtomicBool::new(false));
192
193 tokio::spawn(work_and_release(latch));
194 tokio::spawn(await_opened_and_flip_marker(gate_a, marker_a.clone()));
195 tokio::spawn(await_opened_and_flip_marker(gate_b, marker_b.clone()));
196
197 sleep_a_little().await;
198
199 assert!(marker_a.load(Ordering::Relaxed));
200 assert!(marker_b.load(Ordering::Relaxed));
201 }
202
203 #[tokio::test]
204 async fn test_multi_completion_work_and_complete() {
205 let latch = Latch::new();
206 let gate = latch.gate();
207 let marker_a = Arc::new(AtomicBool::new(false));
208 let marker_b = Arc::new(AtomicBool::new(false));
209
210 tokio::spawn(work_a_lot(latch.clone()));
211 tokio::spawn(work_and_release(latch.clone()));
212 tokio::spawn(await_opened_and_flip_marker(gate.clone(), marker_a.clone()));
213 tokio::spawn(await_opened_and_flip_marker(gate.clone(), marker_b.clone()));
214
215 sleep_a_little().await;
216
217 assert!(marker_a.load(Ordering::Relaxed));
218 assert!(marker_b.load(Ordering::Relaxed));
219 }
220
221 fn make_objects() -> (Latch, Gate, Arc<AtomicBool>) {
222 let latch = Latch::new();
223 let gate = latch.gate();
224
225 (latch, gate, Arc::new(AtomicBool::new(false)))
226 }
227
228 async fn work_and_release(latch: Latch) {
229 tokio::time::sleep(Duration::from_millis(2)).await;
230 latch.release();
231 tokio::time::sleep(Duration::from_secs(3600)).await;
232 }
233
234 async fn work_a_lot(_completion: Latch) {
235 tokio::time::sleep(Duration::from_secs(3600)).await;
236 }
237
238 async fn await_opened_and_flip_marker(gate: Gate, marker: Arc<AtomicBool>) {
239 gate.opened().await;
240 marker.store(true, Ordering::Relaxed);
241 }
242
243 async fn sleep_a_little() {
244 tokio::time::sleep(Duration::from_millis(5)).await;
245 }
246}