1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
//! Integration test for comm-table concurrency under MPI_THREAD_MULTIPLE.
//!
//! Spawns 4 std::thread workers on a single MPI rank; each thread calls
//! `world.duplicate()` 50 times and collects all 50 live Communicator objects
//! before appending their handles to a shared list. All 200 communicators are
//! kept alive until every thread has finished allocating, so the handles are
//! all simultaneously resident in the table when we check for duplicates.
//! Any duplicate handle means two threads claimed the same `comm_table` slot
//! via a data race in `alloc_comm`.
//!
//! With 4 threads × 50 duplications = 200 concurrent alloc_comm calls, the
//! test races reliably without the C11 CAS fix and confirms no slot is
//! double-claimed with it. After the assertion, the communicators are dropped,
//! which exercises concurrent ferrompi_comm_free as well.
//!
//! Run with: mpiexec -n 1 ./target/debug/examples/test_comm_table_concurrency
//!
//! TSan manual verification (pre-release step, not CI-gated because libmpi
//! internals trigger false positives):
//! RUSTFLAGS="-Zsanitizer=thread" CFLAGS="-fsanitize=thread" \
//! cargo +nightly build --target x86_64-unknown-linux-gnu --examples && \
//! mpiexec -n 1 \
//! ./target/x86_64-unknown-linux-gnu/debug/examples/test_comm_table_concurrency
//! Expected result: no TSan diagnostics from ferrompi C code; any reports
//! from libmpi internals are known benign.
use ferrompi::{Communicator, Mpi, ThreadLevel};
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
const NUM_THREADS: usize = 4;
const DUPS_PER_THREAD: usize = 50;
const TOTAL_HANDLES: usize = NUM_THREADS * DUPS_PER_THREAD;
fn main() {
let mpi = Mpi::init_thread(ThreadLevel::Multiple).expect("MPI init failed");
// If the MPI library cannot provide MPI_THREAD_MULTIPLE, skip gracefully.
// Some builds (e.g. certain Cray MPT configurations) deliberately refuse it.
if mpi.thread_level() < ThreadLevel::Multiple {
println!(
"SKIP: MPI provided {:?}, MPI_THREAD_MULTIPLE required; skipping test",
mpi.thread_level()
);
return;
}
let world = mpi.world();
// Shared collection: each thread appends its Communicators (still live) so
// all 200 slots are simultaneously occupied when we check for duplicates.
let comms: Arc<Mutex<Vec<Communicator>>> =
Arc::new(Mutex::new(Vec::with_capacity(TOTAL_HANDLES)));
// std::thread::scope borrows `world` and `comms` for all workers.
std::thread::scope(|s| {
for _thread_id in 0..NUM_THREADS {
let comms_ref = Arc::clone(&comms);
let world_ref = &world;
s.spawn(move || {
// Allocate all 50 communicators before touching the shared vec
// so that threads are racing in alloc_comm for as long as possible.
let mut local: Vec<Communicator> = Vec::with_capacity(DUPS_PER_THREAD);
for _ in 0..DUPS_PER_THREAD {
let dup = world_ref
.duplicate()
.expect("world.duplicate() must not fail");
local.push(dup);
}
// Transfer all live communicators to the shared collection.
// The Mutex acquisition happens only once per thread, after all
// allocations are done, keeping the hot alloc_comm loop lock-free.
comms_ref.lock().expect("mutex poisoned").extend(local);
});
}
// All threads join here (scope exit). All 200 Communicators are now
// alive inside `comms`.
});
// Check that all 200 simultaneously-live handles are distinct.
let collected = comms.lock().expect("mutex poisoned");
assert_eq!(
collected.len(),
TOTAL_HANDLES,
"expected {TOTAL_HANDLES} communicators, got {}",
collected.len()
);
let handles: Vec<i32> = collected.iter().map(|c| c.raw_handle()).collect();
let unique: HashSet<i32> = handles.iter().copied().collect();
assert_eq!(
unique.len(),
TOTAL_HANDLES,
"duplicate comm handles detected: {} unique out of {} total — slot collision in alloc_comm",
unique.len(),
TOTAL_HANDLES
);
// Drop collected here (end of scope) — exercises concurrent ferrompi_comm_free.
drop(collected);
println!("OK: comm table concurrency test passed ({TOTAL_HANDLES} distinct handles)");
}