1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
//! Communicator management operations: duplicate, split, abort, topology.
use crate::comm::{Communicator, SplitType};
use crate::error::{Error, Result};
use crate::ffi;
use crate::group::Group;
impl Communicator {
/// Get the processor name for this process.
pub fn processor_name(&self) -> Result<String> {
let mut buf = [0u8; 256];
let mut len: i32 = 0;
// SAFETY: buf is a 256-byte stack array, exclusively writable; the C shim writes at most
// MPI_MAX_PROCESSOR_NAME bytes and sets len to the actual length.
let ret = unsafe {
ffi::ferrompi_get_processor_name(buf.as_mut_ptr().cast::<std::ffi::c_char>(), &mut len)
};
Error::check_with_op(ret, "get_processor_name")?;
let len = (len.max(0) as usize).min(buf.len());
let s = std::str::from_utf8(&buf[..len])
.map_err(|_| Error::Internal("Invalid UTF-8 in processor name".into()))?;
Ok(s.to_string())
}
/// Collect topology information for every rank of this communicator.
///
/// This is a **collective operation**: all ranks of the communicator have
/// to call it. Each rank gets back the complete [`TopologyInfo`], covering
/// the rank-to-host mapping, MPI version metadata, and (when the `numa`
/// feature is enabled) SLURM job information.
///
/// [`TopologyInfo`]: crate::TopologyInfo
///
/// # Errors
///
/// Propagates any error returned by `crate::topology::gather_topology`.
///
/// # Example
///
/// ```no_run
/// use ferrompi::Mpi;
///
/// let mpi = Mpi::init().unwrap();
/// let world = mpi.world();
/// let topo = world.topology(&mpi).unwrap();
/// if world.rank() == 0 {
///     println!("{topo}");
/// }
/// ```
pub fn topology(&self, mpi: &crate::Mpi) -> Result<crate::TopologyInfo> {
    // All of the work lives in the topology module; this method is the public entry point.
    let info = crate::topology::gather_topology(self, mpi)?;
    Ok(info)
}
/// Get the group associated with this communicator.
///
/// Returns a [`Group`] containing all processes in this communicator.
/// Use [`Group::include`] or [`Group::exclude`] on the returned group
/// to derive sub-groups, which can then be used with
/// `MPI_Comm_create_group` (a future epic).
///
/// # Errors
///
/// Returns an error if the underlying `MPI_Comm_group` call fails or if
/// the C-side group table is full.
///
/// # Example
///
/// ```no_run
/// use ferrompi::Mpi;
///
/// let mpi = Mpi::init().unwrap();
/// let world = mpi.world();
/// let g = world.group().unwrap();
/// assert_eq!(g.size().unwrap(), world.size());
/// ```
pub fn group(&self) -> Result<Group> {
let mut group_handle: i32 = 0;
// SAFETY: self.handle is owned by this Communicator; group_handle is an exclusive output.
let ret = unsafe { ffi::ferrompi_comm_group(self.handle, &mut group_handle) };
Error::check_with_op(ret, "comm_group")?;
Ok(Group {
handle: group_handle,
})
}
/// Duplicate this communicator.
pub fn duplicate(&self) -> Result<Self> {
let mut new_handle: i32 = 0;
// SAFETY: self.handle is owned by this Communicator; new_handle is an exclusive output.
let ret = unsafe { ffi::ferrompi_comm_dup(self.handle, &mut new_handle) };
Error::check_with_op(ret, "comm_dup")?;
Self::from_handle(new_handle)
}
/// Split this communicator into sub-communicators based on color and key.
///
/// Processes with the same `color` are placed in the same new communicator.
/// The `key` controls the rank ordering within the new communicator.
///
/// Returns `None` if this process used [`Communicator::UNDEFINED`] as color.
///
/// # Example
///
/// ```no_run
/// use ferrompi::Mpi;
///
/// let mpi = Mpi::init().unwrap();
/// let world = mpi.world();
/// let color = world.rank() % 2; // Even/odd split
/// if let Some(sub) = world.split(color, world.rank()).unwrap() {
/// println!("Rank {} in sub-communicator of size {}", sub.rank(), sub.size());
/// }
/// ```
pub fn split(&self, color: i32, key: i32) -> Result<Option<Communicator>> {
let mut new_handle: i32 = 0;
// SAFETY: self.handle is owned by this Communicator; remaining arguments are scalars.
let ret = unsafe { ffi::ferrompi_comm_split(self.handle, color, key, &mut new_handle) };
Error::check_with_op(ret, "comm_split")?;
if new_handle < 0 {
Ok(None)
} else {
Self::from_handle(new_handle).map(Some)
}
}
/// Split this communicator by type.
///
/// Processes that share the same resource (determined by `split_type`) are
/// placed in the same new communicator. The `key` controls the rank ordering
/// within the new communicator.
///
/// Returns `None` if MPI returns `MPI_COMM_NULL` for this process.
///
/// # Example
///
/// ```no_run
/// use ferrompi::{Mpi, SplitType};
///
/// let mpi = Mpi::init().unwrap();
/// let world = mpi.world();
/// if let Some(node) = world.split_type(SplitType::Shared, world.rank()).unwrap() {
/// println!("Node has {} processes", node.size());
/// }
/// ```
pub fn split_type(&self, split_type: SplitType, key: i32) -> Result<Option<Communicator>> {
let mut new_handle: i32 = 0;
// SAFETY: self.handle is owned by this Communicator; remaining arguments are scalars.
let ret = unsafe {
ffi::ferrompi_comm_split_type(self.handle, split_type as i32, key, &mut new_handle)
};
Error::check_with_op(ret, "comm_split_type")?;
if new_handle < 0 {
Ok(None)
} else {
Self::from_handle(new_handle).map(Some)
}
}
/// Create a communicator containing only processes that share memory.
///
/// This is equivalent to `split_type(SplitType::Shared, self.rank())`.
/// All processes on the same physical node will be in the same communicator.
///
/// # Errors
///
/// Returns [`Error::Internal`] if MPI unexpectedly returns a null communicator,
/// which should not happen for `MPI_COMM_TYPE_SHARED` under normal conditions.
///
/// # Example
///
/// ```no_run
/// use ferrompi::Mpi;
///
/// let mpi = Mpi::init().unwrap();
/// let world = mpi.world();
/// let node = world.split_shared().unwrap();
/// println!("Node has {} processes, I am local rank {}", node.size(), node.rank());
/// ```
pub fn split_shared(&self) -> Result<Communicator> {
self.split_type(SplitType::Shared, self.rank())?
.ok_or_else(|| Error::Internal("split_shared returned null communicator".into()))
}
/// Create a sub-communicator from a group.
///
/// `group` must be a subset of `self`'s group. This call is collective
/// over `self` — every rank in the parent communicator must invoke it,
/// even ranks not in `group`.
///
/// Returns `Ok(None)` for ranks that are not members of `group`
/// (MPI returns `MPI_COMM_NULL` for those ranks). Returns
/// `Ok(Some(comm))` for ranks that are members.
///
/// # Errors
///
/// Returns an error if `group` contains a rank not present in the parent
/// communicator (`MPI_ERR_GROUP`), or if the C-side communicator table is
/// full (`MPI_ERR_OTHER`).
///
/// # Example
///
/// ```no_run
/// use ferrompi::Mpi;
///
/// let mpi = Mpi::init().unwrap();
/// let world = mpi.world();
/// let g = world.group().unwrap().include(&[0, 2]).unwrap();
/// // Collective: every rank calls create_from_group
/// if let Some(sub) = world.create_from_group(&g).unwrap() {
/// assert_eq!(sub.size(), 2);
/// }
/// ```
pub fn create_from_group(&self, group: &Group) -> Result<Option<Communicator>> {
let mut new_handle: i32 = -1;
// SAFETY: self.handle is owned by this Communicator; group.handle is owned by the Group
// argument and remains valid for the duration of this call.
let ret = unsafe {
ffi::ferrompi_comm_create_from_group_parent(self.handle, group.handle, &mut new_handle)
};
Error::check_with_op(ret, "comm_create_from_group_parent")?;
if new_handle < 0 {
Ok(None)
} else {
Self::from_handle(new_handle).map(Some)
}
}
/// Abort MPI execution across all processes in this communicator.
///
/// Calls `ffi::ferrompi_abort` with this communicator's handle and the
/// given `errorcode`; per the safety note in the body, that shim
/// delegates to `MPI_Abort`. This function never returns: if the FFI call
/// comes back, the process is terminated with [`std::process::abort`]
/// (see "Fallback Behavior" below).
///
/// # Arguments
///
/// * `errorcode` - Error code to return to the invoking environment
///
/// # Example
///
/// ```no_run
/// # use ferrompi::Mpi;
/// # let mpi = Mpi::init().unwrap();
/// # let world = mpi.world();
/// if world.rank() == 0 {
///     // Fatal error detected, abort all processes
///     world.abort(1);
/// }
/// ```
///
/// # Fallback Behavior
///
/// In the standard case `MPI_Abort` terminates the entire process
/// group and this function never returns. If `MPI_Abort` itself
/// returns (non-standard but observed in some implementations during
/// internal failures), this function falls back to
/// [`std::process::abort`], which raises `SIGABRT` and terminates
/// the current process immediately. The original `errorcode` is lost
/// in that fallback path because `process::abort` does not accept an
/// exit code; the signal code (usually 134 = 128 + 6) is the best
/// evidence the caller has that the fallback triggered.
pub fn abort(&self, errorcode: i32) -> ! {
// SAFETY: ferrompi_abort delegates to MPI_Abort, which is
// defined to terminate all processes in the communicator and
// never return. This call is safe to make with any valid
// communicator handle; self.handle is always valid because
// Communicator is only constructed through safe methods that
// guarantee handle validity.
unsafe { ffi::ferrompi_abort(self.handle, errorcode) };
// Reaching this line means the FFI call returned.
// Defense in depth: the MPI standard says MPI_Abort "should"
// terminate all processes but does not strictly guarantee the
// calling process aborts before return. If MPI_Abort ever
// returns (non-standard implementation behavior), we must still
// honor the `-> !` contract. std::process::abort() raises
// SIGABRT and is guaranteed to diverge without running
// destructors — which is the right outcome, because any
// destructor that touches MPI state after a failed MPI_Abort
// has undefined behavior.
std::process::abort()
}
}
#[cfg(test)]
mod tests {
    use crate::{comm::Communicator, error::Result, group::Group};

    /// Compile-time witness for the signature of
    /// `Communicator::create_from_group`: it must accept `&Communicator`
    /// and `&Group` and return `Result<Option<Communicator>>`.
    ///
    /// The function is never called, which is why `dead_code` is allowed;
    /// it only has to type-check.
    #[allow(dead_code)]
    fn create_from_group_signature_compiles(
        parent: &Communicator,
        members: &Group,
    ) -> Result<Option<Communicator>> {
        Communicator::create_from_group(parent, members)
    }
}