#![cfg(all(feature = "_any_backend", feature = "macros", feature = "sync"))]
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
use switchy_async::sync::Barrier;
#[switchy_async::test]
async fn test_barrier_basic_functionality() {
let barrier = Arc::new(Barrier::new(3));
let mut handles = vec![];
for i in 0..3 {
let b = barrier.clone();
handles.push(switchy_async::task::spawn(async move {
let result = b.wait().await;
(i, result.is_leader())
}));
}
let results: Vec<_> = futures::future::join_all(handles)
.await
.into_iter()
.map(Result::unwrap)
.collect();
let leader_count = results.iter().filter(|(_, is_leader)| *is_leader).count();
assert_eq!(leader_count, 1);
assert_eq!(results.len(), 3);
}
#[switchy_async::test]
async fn test_barrier_reusability() {
let barrier = Arc::new(Barrier::new(2));
let b1 = barrier.clone();
let b2 = barrier.clone();
let (r1, r2) = switchy_async::join!(
async move { b1.wait().await },
async move { b2.wait().await }
);
assert_ne!(r1.is_leader(), r2.is_leader());
let b1 = barrier.clone();
let b2 = barrier.clone();
let (r3, r4) = switchy_async::join!(
async move { b1.wait().await },
async move { b2.wait().await }
);
assert_ne!(r3.is_leader(), r4.is_leader());
let b1 = barrier.clone();
let b2 = barrier.clone();
let (r5, r6) = switchy_async::join!(
async move { b1.wait().await },
async move { b2.wait().await }
);
assert_ne!(r5.is_leader(), r6.is_leader());
}
#[switchy_async::test]
async fn test_barrier_single_task() {
let barrier = Barrier::new(1);
let result = barrier.wait().await;
assert!(result.is_leader());
}
#[switchy_async::test]
async fn test_barrier_large_group() {
let n = 10;
let barrier = Arc::new(Barrier::new(n));
let mut handles = vec![];
for i in 0..n {
let b = barrier.clone();
handles.push(switchy_async::task::spawn(async move {
let result = b.wait().await;
(i, result.is_leader())
}));
}
let results: Vec<_> = futures::future::join_all(handles)
.await
.into_iter()
.map(Result::unwrap)
.collect();
let leader_count = results.iter().filter(|(_, is_leader)| *is_leader).count();
assert_eq!(leader_count, 1);
assert_eq!(results.len(), n);
}
#[switchy_async::test]
async fn test_barrier_synchronization() {
let barrier = Arc::new(Barrier::new(3));
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for i in 0..3 {
let b = barrier.clone();
let c = counter.clone();
handles.push(switchy_async::task::spawn(async move {
let before = c.fetch_add(1, Ordering::SeqCst);
let result = b.wait().await;
let after = c.fetch_add(1, Ordering::SeqCst);
(i, before, after, result.is_leader())
}));
}
let results: Vec<_> = futures::future::join_all(handles)
.await
.into_iter()
.map(Result::unwrap)
.collect();
let max_before = results
.iter()
.map(|(_, before, _, _)| *before)
.max()
.unwrap();
let min_after = results.iter().map(|(_, _, after, _)| *after).min().unwrap();
assert!(
max_before < min_after,
"Before work: max={}, After work: min={}",
max_before,
min_after
);
let leader_count = results
.iter()
.filter(|(_, _, _, is_leader)| *is_leader)
.count();
assert_eq!(leader_count, 1);
}
#[switchy_async::test]
async fn test_barrier_with_async_work() {
let barrier = Arc::new(Barrier::new(2));
let results = Arc::new(std::sync::Mutex::new(Vec::new()));
let b1 = barrier.clone();
let r1 = results.clone();
let task1 = switchy_async::task::spawn(async move {
switchy_async::task::yield_now().await;
r1.lock().unwrap().push("task1_before".to_string());
let result = b1.wait().await;
r1.lock().unwrap().push("task1_after".to_string());
result.is_leader()
});
let b2 = barrier.clone();
let r2 = results.clone();
let task2 = switchy_async::task::spawn(async move {
switchy_async::task::yield_now().await;
r2.lock().unwrap().push("task2_before".to_string());
let result = b2.wait().await;
r2.lock().unwrap().push("task2_after".to_string());
result.is_leader()
});
let (leader1, leader2) = futures::future::join(task1, task2).await;
let leader1 = leader1.unwrap();
let leader2 = leader2.unwrap();
assert_ne!(leader1, leader2);
let events = results.lock().unwrap().clone();
assert_eq!(events.len(), 4);
let before_count = events
.iter()
.take(2)
.filter(|s| s.contains("before"))
.count();
let after_count = events
.iter()
.skip(2)
.filter(|s| s.contains("after"))
.count();
assert_eq!(before_count, 2);
assert_eq!(after_count, 2);
}
#[switchy_async::test]
async fn test_barrier_multiple_cycles() {
let barrier = Arc::new(Barrier::new(3));
let cycle_results = Arc::new(std::sync::Mutex::new(Vec::new()));
for cycle in 0..3 {
let mut handles = vec![];
for task_id in 0..3 {
let b = barrier.clone();
let results = cycle_results.clone();
handles.push(switchy_async::task::spawn(async move {
let result = b.wait().await;
results
.lock()
.unwrap()
.push((cycle, task_id, result.is_leader()));
result.is_leader()
}));
}
let leaders: Vec<_> = futures::future::join_all(handles)
.await
.into_iter()
.map(Result::unwrap)
.collect();
let leader_count = leaders.iter().filter(|&&x| x).count();
assert_eq!(leader_count, 1);
}
let all_results = cycle_results.lock().unwrap().clone();
assert_eq!(all_results.len(), 9);
for cycle in 0..3 {
let cycle_leaders = all_results
.iter()
.filter(|(c, _, _)| *c == cycle)
.filter(|(_, _, is_leader)| *is_leader)
.count();
assert_eq!(cycle_leaders, 1);
}
}
#[switchy_async::test]
async fn test_barrier_stress() {
let n = 20;
let barrier = Arc::new(Barrier::new(n));
let mut handles = vec![];
for i in 0..n {
let b = barrier.clone();
handles.push(switchy_async::task::spawn(async move {
for _ in 0..i % 3 {
switchy_async::task::yield_now().await;
}
let result = b.wait().await;
(i, result.is_leader())
}));
}
let results: Vec<_> = futures::future::join_all(handles)
.await
.into_iter()
.map(Result::unwrap)
.collect();
let leader_count = results.iter().filter(|(_, is_leader)| *is_leader).count();
assert_eq!(leader_count, 1);
assert_eq!(results.len(), n);
}
#[test_log::test]
#[should_panic(expected = "barrier size must be positive")]
fn test_barrier_zero_size_panics() {
let _ = Barrier::new(0);
}