spin_sync/
barrier.rs

// Copyright 2020 Shin Yoshida
//
// "LGPL-3.0-or-later OR Apache-2.0 OR BSD-2-Clause"
//
// This is part of spin-sync
//
//  spin-sync is free software: you can redistribute it and/or modify
//  it under the terms of the GNU Lesser General Public License as published by
//  the Free Software Foundation, either version 3 of the License, or
//  (at your option) any later version.
//
//  spin-sync is distributed in the hope that it will be useful,
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU Lesser General Public License for more details.
//
//  You should have received a copy of the GNU Lesser General Public License
//  along with spin-sync.  If not, see <http://www.gnu.org/licenses/>.
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
// Redistribution and use in source and binary forms, with or without modification, are permitted
// provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
//    conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice, this
//    list of conditions and the following disclaimer in the documentation and/or other
//    materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
// IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
// INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
// NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
53
54use crate::misc::{PhantomBarrier, PhantomBarrierWaitResult};
55use std::fmt;
56use std::sync::atomic::{AtomicUsize, Ordering};
57
58/// A barrier enables multiple threads to synchronize the beginning
59/// of some computation.
60///
61/// The behavior is same to `std::sync::Barrier` except for this uses spinlock.
62///
63/// Unlike to `std::sync::Barrier::new`, the constructor [`Barrier.new`] is a const function;
64/// i.e. static [`Barrier`] variable can be declared.
65///
66/// [`Barrier`]: struct.Barrier.html
67/// [`Barrier.new`]: #method.new
68///
69/// # Examples
70///
71/// ```
72/// use spin_sync::Barrier;
73/// use std::thread;
74///
75/// static NUM_THREADS: usize = 10;
76/// static BARRIER: Barrier = Barrier::new(NUM_THREADS);
77///
78/// let mut handles = Vec::with_capacity(10);
79/// for _ in 0..10 {
80///     // The same messages will be printed together.
81///     // You will NOT see any interleaving.
82///     handles.push(thread::spawn(move|| {
83///         println!("before wait");
84///         BARRIER.wait();
85///         println!("after wait");
86///     }));
87/// }
88/// // Wait for other threads to finish.
89/// for handle in handles {
90///     handle.join().unwrap();
91/// }
92/// ```
93///
94/// Once all the threads have finished to wait, `Barrier` is reinitialized.
95/// The same instance can be used again.
96///
97/// ```
98/// use spin_sync::Barrier;
99/// use std::thread;
100///
101/// static NUM_THREADS: usize = 10;
102/// static BARRIER: Barrier = Barrier::new(NUM_THREADS);
103///
104/// fn wait_and_reinitialize() {
105///     let mut handles = Vec::with_capacity(10);
106///     for _ in 0..10 {
107///         // The same messages will be printed together.
108///         // You will NOT see any interleaving.
109///         handles.push(thread::spawn(move|| {
110///             println!("before wait");
111///             BARRIER.wait();
112///             println!("after wait");
113///         }));
114///     }
115///     // Wait for other threads to finish.
116///     for handle in handles {
117///         handle.join().unwrap();
118///     }
119/// }
120///
121/// fn main() {
122///     // First use.
123///     wait_and_reinitialize();
124///     // Second use.
125///     wait_and_reinitialize();
126/// }
127/// ```
128///
129/// If 0 or 1 is passed to `Barrier::new`, the instance will never block.
130///
131/// ```
132/// use spin_sync::Barrier;
133/// use std::thread;
134///
135/// static BARRIER0: Barrier = Barrier::new(0);
136/// static BARRIER1: Barrier = Barrier::new(1);
137///
138/// BARRIER0.wait();
139/// BARRIER1.wait();
140/// ```
141pub struct Barrier {
142    num_threads: usize, // immutable
143    count: AtomicUsize,
144    generation_id: AtomicUsize, // MSB plays lock flag role.
145    _phantom: PhantomBarrier,
146}
147
148impl Barrier {
149    /// Creates a new barrier that can block a given number of threads.
150    ///
151    /// Unlike to `std::sync::Barrier::new`, this function is const; i.e.
152    /// static [`Barrier`] variable can be declared.
153    ///
154    /// A barrier will block `n`-1 threads which call [`wait`] and then wake up
155    /// all threads at once when the `n`th thread calls [`wait`].
156    ///
157    /// [`Barrier`]: struct.Barrier.html
158    /// [`wait`]: #method.wait
159    ///
160    /// # Examples
161    ///
162    /// Declaring [`Barrier`] instance as a local variable.
163    ///
164    /// ```
165    /// use spin_sync::Barrier;
166    ///
167    /// let barrier = Barrier::new(10);
168    /// ```
169    ///
170    /// Declaring static [`Barrier`] variable.
171    ///
172    /// ```
173    /// use spin_sync::Barrier;
174    ///
175    /// static BARRIER: Barrier = Barrier::new(5);
176    /// ```
177    pub const fn new(n: usize) -> Self {
178        Self {
179            num_threads: n,
180            count: AtomicUsize::new(0),
181            generation_id: AtomicUsize::new(0),
182            _phantom: PhantomBarrier {},
183        }
184    }
185
186    /// Blocks the current thread until all threads have rendezvoused here.
187    ///
188    /// Barriers are re-usable after all threads have rendezvoused once, and can
189    /// be used continuously.
190    ///
191    /// A single (arbitrary) thread will receive a [`BarrierWaitResult`] that
192    /// returns `true` from [`is_leader`] when returning from this function, and
193    /// all other threads will receive a result that will return `false` from
194    /// [`is_leader`].
195    ///
196    /// [`BarrierWaitResult`]: struct.BarrierWaitResult.html
197    /// [`is_leader`]: struct.BarrierWaitResult.html#method.is_leader
198    ///
199    /// # Examples
200    ///
201    /// ```
202    /// use spin_sync::Barrier;
203    /// use std::thread;
204    ///
205    /// static NUM_THREADS: usize = 10;
206    /// static BARRIER: Barrier = Barrier::new(NUM_THREADS);
207    ///
208    /// let mut handles = Vec::with_capacity(10);
209    /// for _ in 0..10 {
210    ///     // The same messages will be printed together.
211    ///     // You will NOT see any interleaving.
212    ///     handles.push(thread::spawn(move|| {
213    ///         println!("before wait");
214    ///         BARRIER.wait();
215    ///         println!("after wait");
216    ///     }));
217    /// }
218    /// // Wait for other threads to finish.
219    /// for handle in handles {
220    ///     handle.join().unwrap();
221    /// }
222    /// ```
223    pub fn wait(&self) -> BarrierWaitResult {
224        let (guard, generation_id) = self.lock();
225
226        let count = self.count.load(Ordering::Relaxed) + 1;
227        self.count.store(count, Ordering::Relaxed);
228
229        if count < self.num_threads {
230            // Unlock and waiting for the leader reinitialize self.
231            drop(guard);
232
233            loop {
234                let mut current_id = self.generation_id.load(Ordering::Relaxed);
235                if (current_id & BarrierLockGuard::MSB) != 0 {
236                    current_id = current_id - BarrierLockGuard::MSB;
237                }
238
239                if generation_id != current_id {
240                    return BarrierWaitResult(false, PhantomBarrierWaitResult {});
241                } else {
242                    std::thread::yield_now();
243                }
244            }
245        } else {
246            // This thread will be the leader.
247            // Reinitialize self and return immediately.
248            self.count.store(0, Ordering::Relaxed);
249
250            // The other waiting threads judge whether reinitialized or not from generation_id.
251            // After generation_id was updated, they stop to block and return.
252            // However, the next wait() won't be started because this thread still owns the lock.
253            let generation_id = (generation_id + 1) | BarrierLockGuard::MSB;
254            self.generation_id.store(generation_id, Ordering::Relaxed);
255
256            // Release the lock.
257            drop(guard);
258
259            BarrierWaitResult(true, PhantomBarrierWaitResult {})
260        }
261    }
262
263    fn lock(&self) -> (BarrierLockGuard, usize) {
264        // Acquire lock
265        let mut expected = 0;
266        loop {
267            let desired = expected + BarrierLockGuard::MSB;
268
269            let current = self
270                .generation_id
271                .compare_and_swap(expected, desired, Ordering::Acquire);
272
273            if current == expected {
274                // Succeeded to lock
275                break;
276            } else {
277                // Failed to lock.
278                // Retry.
279                if (current & BarrierLockGuard::MSB) != 0 {
280                    // Another thread is holding the lock.
281                    // Wait for a while and retry.
282                    expected = current - BarrierLockGuard::MSB;
283                    std::thread::yield_now();
284                } else {
285                    // Just the first assumption was wrong.
286                    // Retry immediately.
287                    expected = current;
288                }
289            }
290        }
291
292        (
293            BarrierLockGuard {
294                generation_id: &self.generation_id,
295            },
296            expected,
297        )
298    }
299}
300
301impl fmt::Debug for Barrier {
302    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303        f.pad("Barrier { .. }")
304    }
305}
306
307pub struct BarrierWaitResult(bool, PhantomBarrierWaitResult);
308
309impl BarrierWaitResult {
310    /// Returns `true` if this thread from [`wait`] is the "leader thread".
311    ///
312    /// Only one thread will have `true` returned from their result, all other
313    /// threads will have `false` returned.
314    ///
315    /// [`wait`]: struct.Barrier.html#method.wait
316    ///
317    /// # Examples
318    ///
319    /// ```
320    /// use std::sync::Barrier;
321    ///
322    /// let barrier = Barrier::new(1);
323    /// let barrier_wait_result = barrier.wait();
324    /// assert!(barrier_wait_result.is_leader());
325    /// ```
326    pub fn is_leader(&self) -> bool {
327        self.0
328    }
329}
330
331impl fmt::Debug for BarrierWaitResult {
332    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333        f.debug_struct("BarrierWaitResult")
334            .field("is_leader", &self.is_leader())
335            .finish()
336    }
337}
338
/// Guard for the spin lock stored in the most significant bit of a barrier's
/// `generation_id`.
///
/// A guard is created by `Barrier::lock` after it has set that bit; dropping
/// the guard clears the bit again and leaves the generation bits untouched.
struct BarrierLockGuard<'a> {
    /// The generation counter whose MSB acts as the lock flag.
    generation_id: &'a AtomicUsize,
}

impl BarrierLockGuard<'_> {
    /// A `usize` with only its most significant bit set; used as the "locked" flag.
    pub const MSB: usize = !(usize::MAX >> 1);
}

impl Drop for BarrierLockGuard<'_> {
    /// Releases the lock by clearing the MSB.
    ///
    /// The store uses `Release` ordering; it pairs with the `Acquire`
    /// compare-and-swap in `Barrier::lock`, so writes made while the lock was
    /// held are visible to the next thread that takes it.
    fn drop(&mut self) {
        let locked_value = self.generation_id.load(Ordering::Relaxed);
        // The lock bit must be set for as long as a guard exists.
        debug_assert_eq!(Self::MSB, locked_value & Self::MSB);

        let unlocked_value = locked_value - Self::MSB;
        self.generation_id.store(unlocked_value, Ordering::Release);
    }
}
356}