1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//! Barrier for synchronizing multiple tasks.
//!
//! This module provides barrier synchronization that allows multiple tasks
//! to wait until all tasks reach the barrier point before proceeding.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Notify;
/// Barrier for synchronizing multiple tasks.
///
/// The handle is cheap to clone: every clone holds an `Arc` to the same
/// shared state, so all clones rendezvous at the same barrier.
#[derive(Debug, Clone)]
pub struct Barrier {
// Shared barrier state; cloning the handle only clones this `Arc`.
inner: Arc<BarrierInner>,
}
/// State shared by every clone of a `Barrier`.
#[derive(Debug)]
struct BarrierInner {
// Arrival counter, advanced atomically by each call to `wait`.
count: AtomicUsize,
// Number of arrivals needed before the barrier releases.
total: usize,
// Release counter, bumped by the last arrival each time the barrier releases.
generation: AtomicUsize,
// Wakes the tasks parked in `wait` when the barrier releases.
notify: Notify,
}
impl Barrier {
    /// Creates a new barrier that releases once `n` tasks have called
    /// [`Barrier::wait`].
    ///
    /// A size of `0` is treated as `1`, so a lone call to `wait` returns
    /// immediately instead of blocking forever on a barrier that could never
    /// be satisfied.
    pub fn new(n: usize) -> Self {
        Self {
            inner: Arc::new(BarrierInner {
                count: AtomicUsize::new(0),
                // Clamp to 1: a zero-sized barrier would otherwise never
                // release, and `wait` divides by this value.
                total: n.max(1),
                generation: AtomicUsize::new(0),
                notify: Notify::new(),
            }),
        }
    }

    /// Waits for all tasks to reach the barrier.
    ///
    /// Each call takes one slot in the current cycle. The call that fills the
    /// last slot releases the other waiters of that cycle and returns
    /// immediately with `is_leader` set to `true`; every other call is
    /// suspended until its cycle has been released and then returns with
    /// `is_leader` set to `false`. The barrier is reusable: the next `total`
    /// calls form the next cycle.
    ///
    /// If a waiting future is dropped before the barrier releases, its
    /// arrival still counts towards the cycle it joined.
    pub async fn wait(&self) -> BarrierWaitResult {
        let inner = &*self.inner;

        // A single atomic increment hands out a unique, monotonically
        // increasing ticket. Deriving both the cycle and the position within
        // it from that one value means an arrival can never be attributed to
        // the wrong cycle, and no counter reset can erase a concurrent
        // arrival.
        // NOTE(review): assumes the lifetime number of arrivals fits in
        // `usize`; the counter wraps on overflow.
        let ticket = inner.count.fetch_add(1, Ordering::SeqCst);
        let my_generation = ticket / inner.total;
        let is_last_arrival = ticket % inner.total == inner.total - 1;

        if is_last_arrival {
            // Publish the release before waking anyone so that every woken
            // (or about-to-wait) task observes the bumped counter.
            inner.generation.fetch_add(1, Ordering::SeqCst);
            inner.notify.notify_waiters();
            return BarrierWaitResult { is_leader: true };
        }

        loop {
            // Create the `Notified` future *before* checking the release
            // counter: it receives `notify_waiters` wakeups from the moment
            // it is created, so a release that happens between the check and
            // the `.await` is not lost.
            let notified = inner.notify.notified();

            // `generation` counts completed releases. Once more than
            // `my_generation` releases have happened, at least one of them
            // belongs to this cycle or a later one, which implies every slot
            // of this cycle has been taken.
            if inner.generation.load(Ordering::SeqCst) > my_generation {
                break;
            }

            notified.await;
        }

        BarrierWaitResult { is_leader: false }
    }
}
/// Result of waiting on a barrier.
///
/// Exactly one task per barrier cycle receives a result whose `is_leader`
/// flag is `true`; this can be used to run follow-up work once per cycle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BarrierWaitResult {
    /// True if this task was the last to reach the barrier.
    pub is_leader: bool,
}

impl BarrierWaitResult {
    /// Returns `true` if this task was the last to reach the barrier.
    ///
    /// Accessor form of the `is_leader` field, matching the method-style API
    /// of the standard library's barrier wait result.
    pub const fn is_leader(&self) -> bool {
        self.is_leader
    }
}