async_barrier/lib.rs
1//! DO NOT USE!
2//!
//! This crate was merged into [async-lock], which now provides the API this crate used to provide.
4//!
5//! [async-lock]: https://crates.io/crates/async-lock
6
7#![forbid(unsafe_code)]
8#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
9
10use async_mutex::Mutex;
11use event_listener::Event;
12
13/// A counter to synchronize multiple tasks at the same time.
14#[derive(Debug)]
15pub struct Barrier {
16 n: usize,
17 state: Mutex<State>,
18 event: Event,
19}
20
/// Mutable bookkeeping of a [`Barrier`], kept behind its mutex.
#[derive(Debug)]
struct State {
    /// How many tasks have arrived in the current generation.
    count: usize,
    /// Incremented (with wrap-around) every time the barrier opens, so a waiting
    /// task can tell that the generation it arrived in has completed.
    generation_id: u64,
}
26
27impl Barrier {
28 /// Creates a barrier that can block the given number of tasks.
29 ///
30 /// A barrier will block `n`-1 tasks which call [`wait()`] and then wake up all tasks
31 /// at once when the `n`th task calls [`wait()`].
32 ///
33 /// [`wait()`]: `Barrier::wait()`
34 ///
35 /// # Examples
36 ///
37 /// ```
38 /// use async_barrier::Barrier;
39 ///
40 /// let barrier = Barrier::new(5);
41 /// ```
42 pub const fn new(n: usize) -> Barrier {
43 Barrier {
44 n,
45 state: Mutex::new(State {
46 count: 0,
47 generation_id: 0,
48 }),
49 event: Event::new(),
50 }
51 }
52
53 /// Blocks the current task until all tasks reach this point.
54 ///
55 /// Barriers are reusable after all tasks have synchronized, and can be used continuously.
56 ///
57 /// Returns a [`BarrierWaitResult`] indicating whether this task is the "leader", meaning the
58 /// last task to call this method.
59 ///
60 /// # Examples
61 ///
62 /// ```
63 /// use async_barrier::Barrier;
64 /// use futures_lite::future;
65 /// use std::sync::Arc;
66 /// use std::thread;
67 ///
68 /// let barrier = Arc::new(Barrier::new(5));
69 ///
70 /// for _ in 0..5 {
71 /// let b = barrier.clone();
72 /// thread::spawn(move || {
73 /// future::block_on(async {
74 /// // The same messages will be printed together.
75 /// // There will NOT be interleaving of "before" and "after".
76 /// println!("before wait");
77 /// b.wait().await;
78 /// println!("after wait");
79 /// });
80 /// });
81 /// }
82 /// ```
83 pub async fn wait(&self) -> BarrierWaitResult {
84 let mut state = self.state.lock().await;
85 let local_gen = state.generation_id;
86 state.count += 1;
87
88 if state.count < self.n {
89 while local_gen == state.generation_id && state.count < self.n {
90 let listener = self.event.listen();
91 drop(state);
92 listener.await;
93 state = self.state.lock().await;
94 }
95 BarrierWaitResult { is_leader: false }
96 } else {
97 state.count = 0;
98 state.generation_id = state.generation_id.wrapping_add(1);
99 self.event.notify(std::usize::MAX);
100 BarrierWaitResult { is_leader: true }
101 }
102 }
103}
104
/// The value produced by [`Barrier::wait()`] once the barrier has opened.
///
/// It records whether the task that received it was the leader of its group.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_barrier::Barrier;
///
/// let barrier = Barrier::new(1);
/// let barrier_wait_result = barrier.wait().await;
/// # });
/// ```
#[derive(Clone, Debug)]
pub struct BarrierWaitResult {
    /// `true` only for the task whose arrival released the barrier.
    is_leader: bool,
}
121
122impl BarrierWaitResult {
123 /// Returns `true` if this task was the last to call to [`Barrier::wait()`].
124 ///
125 /// # Examples
126 ///
127 /// ```
128 /// # futures_lite::future::block_on(async {
129 /// use async_barrier::Barrier;
130 /// use futures_lite::future;
131 ///
132 /// let barrier = Barrier::new(2);
133 /// let (a, b) = future::zip(barrier.wait(), barrier.wait()).await;
134 /// assert_eq!(a.is_leader(), false);
135 /// assert_eq!(b.is_leader(), true);
136 /// # });
137 /// ```
138 pub fn is_leader(&self) -> bool {
139 self.is_leader
140 }
141}