1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
//! Integration test for request-table concurrency under MPI_THREAD_MULTIPLE.
//!
//! Spawns 4 std::thread workers per MPI rank; each thread runs 100 iterations
//! of irecv + isend + wait, exercising concurrent alloc_request / free_request
//! calls against the C11-atomic request table implemented in csrc/ferrompi.c.
//!
//! With 2 MPI ranks and 4 threads each, every rank performs 800 alloc_request
//! calls (one for the irecv and one for the isend in each of 4 x 100 iterations),
//! with up to 8 requests outstanding at once. That is intended to be enough
//! contention to race without the C11 CAS fix and to confirm no slot is
//! double-claimed with it.
//!
//! Run with: mpiexec -n 2 ./target/debug/examples/test_request_table_concurrency
//!
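//! TSan manual verification (pre-release step, not CI-gated because libmpi
//! internals trigger false positives). `-Zbuild-std` rebuilds the standard
//! library with TSan instrumentation; without it, uninstrumented std code
//! produces spurious reports:
//! RUSTFLAGS="-Zsanitizer=thread" CFLAGS="-fsanitize=thread" \
//! cargo +nightly build -Zbuild-std --target x86_64-unknown-linux-gnu --examples && \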
//! mpiexec -n 2 \
//! ./target/x86_64-unknown-linux-gnu/debug/examples/test_request_table_concurrency
//! Expected result: no TSan diagnostics from ferrompi C code; any reports
//! from libmpi internals (allreduce, progress engine) are known benign.
use ferrompi::{Mpi, ThreadLevel};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// Number of worker threads spawned per MPI rank.
const NUM_THREADS: usize = 4;
/// Number of irecv/isend/wait rounds each worker thread performs.
const ITERATIONS: usize = 100;

/// Hammers the request table from `NUM_THREADS` threads per rank, then agrees
/// on pass/fail across all ranks with a single allreduce.
fn main() {
    let mpi = Mpi::init_thread(ThreadLevel::Multiple).expect("MPI init failed");
    // If the MPI library cannot provide MPI_THREAD_MULTIPLE, skip gracefully.
    // Some builds (e.g. certain Cray MPT configurations) deliberately refuse it.
    if mpi.thread_level() < ThreadLevel::Multiple {
        println!(
            "SKIP: MPI provided {:?}, MPI_THREAD_MULTIPLE required; skipping test",
            mpi.thread_level()
        );
        return;
    }
    let world = mpi.world();
    let rank = world.rank();
    let size = world.size();
    assert!(
        size >= 2,
        "test_request_table_concurrency requires at least 2 MPI ranks, got {size}"
    );
    // Pair ranks (0,1), (2,3), ... . In an odd-sized world the highest even
    // rank has no peer. It must not target another rank: that rank never posts
    // the matching receive/send, so the unpaired rank's wait() calls would block
    // forever. Instead it exchanges messages with itself, which MPI permits for
    // nonblocking operations, so every rank still drives its request table
    // from NUM_THREADS threads.
    let partner = if rank % 2 == 1 {
        rank - 1
    } else if rank + 1 < size {
        rank + 1
    } else {
        rank
    };
    // Shared error flag: set to true by any thread that encounters a failure.
    let any_error = Arc::new(AtomicBool::new(false));
    // std::thread::scope lets the workers borrow `world` and joins every worker
    // before returning, so `world` is still usable for the allreduce below.
    std::thread::scope(|s| {
        for thread_id in 0..NUM_THREADS {
            let any_error_ref = Arc::clone(&any_error);
            let world_ref = &world;
            s.spawn(move || {
                // One tag per thread index, in [1000, 1000 + NUM_THREADS), so
                // thread t on this rank only ever matches thread t on the
                // partner. MPI's non-overtaking rule keeps successive iterations
                // on the same (source, tag) pair in order.
                let tag = 1000 + thread_id as i32;
                // The partner fills its send buffer with its own rank (truncated
                // to u8 on both sides), so every receive must deliver exactly this.
                let expected: [u8; 4] = [partner as u8; 4];
                let report = |msg: String| {
                    eprintln!("rank {rank} thread {thread_id}: {msg}");
                    any_error_ref.store(true, Ordering::Relaxed);
                };
                // A failing MPI call aborts this thread's loop. Known limitation:
                // the matching partner thread may then block on its next wait.
                let exchange = || -> Result<(), String> {
                    for _ in 0..ITERATIONS {
                        let send_buf: [u8; 4] = [rank as u8; 4];
                        let mut recv_buf: [u8; 4] = [0u8; 4];
                        // Scope the requests so their use of `recv_buf` has
                        // ended before the payload is inspected.
                        {
                            // Post irecv before isend to avoid potential deadlock.
                            let recv_req = world_ref
                                .irecv(&mut recv_buf, partner, tag)
                                .map_err(|e| format!("irecv failed: {e}"))?;
                            let send_req = world_ref
                                .isend(&send_buf, partner, tag)
                                .map_err(|e| format!("isend failed: {e}"))?;
                            send_req
                                .wait()
                                .map_err(|e| format!("isend wait failed: {e}"))?;
                            recv_req
                                .wait()
                                .map_err(|e| format!("irecv wait failed: {e}"))?;
                        }
                        // A wrong payload would mean a request completed into the
                        // wrong buffer (e.g. a double-claimed slot). Report it but
                        // keep iterating so the partner thread is not left waiting
                        // for messages that would never be sent.
                        if recv_buf != expected {
                            report(format!(
                                "payload mismatch: got {recv_buf:?}, expected {expected:?}"
                            ));
                        }
                    }
                    Ok(())
                };
                if let Err(msg) = exchange() {
                    report(msg);
                }
            });
        }
        // All threads join here (scope exit).
    });
    // Aggregate error state across ranks BEFORE any process exits. If any
    // rank exited early, surviving ranks would deadlock at this allreduce —
    // so the exit-on-failure path runs after the collective, never before.
    // The scope join above already orders every worker's Relaxed store before
    // this load; Acquire is kept as a harmless belt-and-braces.
    let local_ok: i32 = if any_error.load(Ordering::Acquire) {
        0
    } else {
        1
    };
    let global_ok = world
        .allreduce_scalar(local_ok, ferrompi::ReduceOp::Min)
        .expect("allreduce_scalar failed");
    if global_ok == 0 {
        if rank == 0 {
            eprintln!("FAIL: at least one rank reported a thread error");
        }
        // Every rank observes the same global_ok, so all ranks exit together.
        // Note: process::exit does not run destructors.
        std::process::exit(1);
    }
    if rank == 0 {
        println!("OK: request table concurrency test passed");
    }
}