strut_sync/
latch.rs

1use tokio_util::sync::CancellationToken;
2
3/// A synchronization primitive that can be released exactly once, notifying all
4/// associated [`Gate`]s. This is intended for one-shot notifications or
5/// barriers in asynchronous contexts.
6///
7/// ## Simple example
8///
9/// ```
10/// use strut_sync::{Gate, Latch};
11///
12/// # tokio_test::block_on(async {
13///
14/// // Make a latch
15/// let latch = Latch::new();
16///
17/// // Derive a gate from it
18/// let gate = latch.gate();
19///
20/// // Spawn an asynchronous task
21/// tokio::spawn(async move {
22///     // Perform some asynchronous work
23///     println!("This will print first");
24///
25///     // Signal completion
26///     latch.release();
27/// });
28///
29/// // Wait for the completion signal
30/// gate.opened().await;
31///
32/// println!("Asynchronous task completed!")
33/// # })
34/// ```
35///
36/// ## Full example
37///
38/// ```
39/// use std::sync::Arc;
40/// use std::sync::atomic::{AtomicU8, Ordering};
41/// use strut_sync::{Gate, Latch};
42/// use pretty_assertions::assert_eq;
43///
44/// # tokio_test::block_on(async {
45///
46/// // Make a latch
47/// let latch = Latch::new();
48///
49/// // Derive any number of gates from it
50/// let gate_a = latch.gate();
51/// let gate_b = latch.gate();
52/// let gate_c = gate_b.clone();
53///
54/// // Create a marker
55/// let marker = Arc::new(AtomicU8::new(0));
56///
57/// // Spawn tasks that increment the marker
58/// tokio::spawn(increment_marker(gate_a, marker.clone()));
59/// tokio::spawn(increment_marker(gate_b, marker.clone()));
60/// tokio::spawn(increment_marker(gate_c, marker.clone()));
61///
62/// // Give the tasks a chance to start waiting
63/// tokio::task::yield_now().await;
64///
65/// // Nothing should have happened yet
66/// assert_eq!(marker.load(Ordering::Relaxed), 0);
67///
68/// // Release the latch
69/// latch.release();
70///
71/// // Give the tasks a chance to wake up
72/// tokio::task::yield_now().await;
73///
74/// // Marker should have been increased three times by now
75/// assert_eq!(marker.load(Ordering::Relaxed), 3);
76///
77/// # });
78///
79/// // Helper function
80/// async fn increment_marker(gate: Gate, marker: Arc<AtomicU8>) {
81///     // Wait for the gate to open
82///     gate.opened().await;
83///
84///     // Increment the marker
85///     marker.fetch_add(1, Ordering::Relaxed);
86/// }
87/// ```
88#[derive(Debug, Default, Clone)]
89pub struct Latch {
90    token: CancellationToken,
91}
92
93/// A single-release barrier that is [opened](Gate::opened) when the associated
94/// [`Latch`] is [released](Latch::release).
95///
96/// This gate can be cheaply cloned and awaited on by any number of asynchronous
97/// tasks at any time.
98#[derive(Debug, Clone)]
99pub struct Gate {
100    token: CancellationToken,
101}
102
103impl Latch {
104    /// Returns a brand new, unreleased [`Latch`].
105    pub fn new() -> Self {
106        let token = CancellationToken::new();
107
108        Self { token }
109    }
110
111    /// Returns a new [`Gate`] handle associated with this [`Latch`]. Multiple
112    /// gates can be created and awaited independently, all linked to the same
113    /// single-release latch.
114    pub fn gate(&self) -> Gate {
115        Gate {
116            token: self.token.clone(),
117        }
118    }
119
120    /// Permanently releases this [`Latch`], notifying all associated [`Gate`]s.
121    /// Subsequent calls have no additional effect.
122    pub fn release(&self) {
123        self.token.cancel();
124    }
125}
126
127impl Gate {
128    /// Waits asynchronously until the associated [`Latch`] is
129    /// [released](Latch::release). Resolves immediately if the latch has
130    /// already been released.
131    pub async fn opened(&self) {
132        self.token.cancelled().await;
133    }
134
135    /// Reports whether the associated [`Latch`] has been released.
136    pub fn is_open(&self) -> bool {
137        self.token.is_cancelled()
138    }
139}
140
141#[cfg(test)]
142mod tests {
143    use super::*;
144    use std::sync::atomic::{AtomicBool, Ordering};
145    use std::sync::Arc;
146    use std::time::Duration;
147
148    #[tokio::test]
149    async fn test_work_and_complete() {
150        let (latch, gate, marker) = make_objects();
151
152        tokio::spawn(work_and_release(latch));
153        tokio::spawn(await_opened_and_flip_marker(gate, marker.clone()));
154
155        sleep_a_little().await;
156
157        assert!(marker.load(Ordering::Relaxed));
158    }
159
160    #[tokio::test]
161    async fn test_multi_work_and_complete() {
162        let (latch, gate, marker) = make_objects();
163
164        tokio::spawn(work_and_release(latch.clone()));
165        tokio::spawn(work_and_release(latch.clone()));
166        tokio::spawn(await_opened_and_flip_marker(gate, marker.clone()));
167
168        sleep_a_little().await;
169
170        assert!(marker.load(Ordering::Relaxed));
171    }
172
173    #[tokio::test]
174    async fn test_work_and_complete_reordered() {
175        let (latch, gate, marker) = make_objects();
176
177        tokio::spawn(await_opened_and_flip_marker(gate, marker.clone()));
178        tokio::spawn(work_and_release(latch));
179
180        sleep_a_little().await;
181
182        assert!(marker.load(Ordering::Relaxed));
183    }
184
185    #[tokio::test]
186    async fn test_multi_monitor_work_and_complete() {
187        let latch = Latch::new();
188        let gate_a = latch.gate();
189        let gate_b = gate_a.clone();
190        let marker_a = Arc::new(AtomicBool::new(false));
191        let marker_b = Arc::new(AtomicBool::new(false));
192
193        tokio::spawn(work_and_release(latch));
194        tokio::spawn(await_opened_and_flip_marker(gate_a, marker_a.clone()));
195        tokio::spawn(await_opened_and_flip_marker(gate_b, marker_b.clone()));
196
197        sleep_a_little().await;
198
199        assert!(marker_a.load(Ordering::Relaxed));
200        assert!(marker_b.load(Ordering::Relaxed));
201    }
202
203    #[tokio::test]
204    async fn test_multi_completion_work_and_complete() {
205        let latch = Latch::new();
206        let gate = latch.gate();
207        let marker_a = Arc::new(AtomicBool::new(false));
208        let marker_b = Arc::new(AtomicBool::new(false));
209
210        tokio::spawn(work_a_lot(latch.clone()));
211        tokio::spawn(work_and_release(latch.clone()));
212        tokio::spawn(await_opened_and_flip_marker(gate.clone(), marker_a.clone()));
213        tokio::spawn(await_opened_and_flip_marker(gate.clone(), marker_b.clone()));
214
215        sleep_a_little().await;
216
217        assert!(marker_a.load(Ordering::Relaxed));
218        assert!(marker_b.load(Ordering::Relaxed));
219    }
220
221    fn make_objects() -> (Latch, Gate, Arc<AtomicBool>) {
222        let latch = Latch::new();
223        let gate = latch.gate();
224
225        (latch, gate, Arc::new(AtomicBool::new(false)))
226    }
227
228    async fn work_and_release(latch: Latch) {
229        tokio::time::sleep(Duration::from_millis(2)).await;
230        latch.release();
231        tokio::time::sleep(Duration::from_secs(3600)).await;
232    }
233
234    async fn work_a_lot(_completion: Latch) {
235        tokio::time::sleep(Duration::from_secs(3600)).await;
236    }
237
238    async fn await_opened_and_flip_marker(gate: Gate, marker: Arc<AtomicBool>) {
239        gate.opened().await;
240        marker.store(true, Ordering::Relaxed);
241    }
242
243    async fn sleep_a_little() {
244        tokio::time::sleep(Duration::from_millis(5)).await;
245    }
246}