//! Collective communication
//!
//! This module is still under development.
//!
//! # Unfinished features
//!
//! - **5.8**: All-to-all, `MPI_Alltoallw()`
//! - **5.10**: Reduce-scatter, `MPI_Reduce_scatter()`
//! - **5.12**: Nonblocking collective operations,
//! `MPI_Ialltoallw()`, `MPI_Ireduce_scatter()`
#![allow(
clippy::missing_panics_doc,
clippy::ptr_as_ptr,
clippy::used_underscore_binding
)]
#[cfg(feature = "user-operations")]
use std::mem;
use std::os::raw::{c_int, c_void};
use std::{fmt, ptr};
#[cfg(feature = "user-operations")]
use libffi::middle::{Cif, Closure, Type};
use crate::ffi;
use crate::ffi::MPI_Op;
use crate::datatype::traits::{
Buffer, BufferMut, Equivalence, PartitionedBuffer, PartitionedBufferMut,
};
#[cfg(feature = "user-operations")]
use crate::datatype::{DatatypeRef, DynBuffer, DynBufferMut};
use crate::raw::traits::{AsRaw, FromRaw};
use crate::request::{Request, Scope, StaticScope};
use crate::topology::traits::{AsCommunicator, Communicator};
use crate::topology::{Process, Rank};
use crate::with_uninitialized;
/// Collective communication traits
pub mod traits {
pub use super::{CommunicatorCollectives, Operation, Root};
}
/// Collective communication patterns defined on `Communicator`s
pub trait CommunicatorCollectives: Communicator {
/// Barrier synchronization among all processes in a `Communicator`
///
/// Takes part in a barrier across all processes of the `Communicator` `&self` by calling
/// `MPI_Barrier`.
///
/// The calling process (or thread within it) blocks until every process in the `Communicator`
/// `&self` has entered the barrier.
///
/// # Examples
///
/// See `examples/barrier.rs`
///
/// # Standard section(s)
///
/// 5.3
fn barrier(&self) {
    unsafe {
        let comm = self.as_raw();
        ffi::MPI_Barrier(comm);
    }
}
/// Gather the contents of the send buffers of all participating processes on every process.
///
/// Once the call returns, the send `Buffer`s of all processes have been concatenated into the
/// receive `Buffer` on every rank.
///
/// All send `Buffer`s must contain the same count of elements. The per-process receive count
/// handed to `MPI_Allgather` is `recvbuf.count()` divided by the communicator size.
///
/// # Examples
///
/// See `examples/all_gather.rs`
///
/// # Standard section(s)
///
/// 5.7
fn all_gather_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
    S: Buffer,
    R: BufferMut,
{
    unsafe {
        let send_ptr = sendbuf.pointer();
        let send_count = sendbuf.count();
        // The datatype values are bound to locals so that they outlive the MPI call.
        let send_type = sendbuf.as_datatype();
        let send_type_raw = send_type.as_raw();
        let recv_ptr = recvbuf.pointer_mut();
        let recv_count = recvbuf.count() / self.size();
        let recv_type = recvbuf.as_datatype();
        let recv_type_raw = recv_type.as_raw();
        let comm = self.as_raw();
        ffi::MPI_Allgather(
            send_ptr,
            send_count,
            send_type_raw,
            recv_ptr,
            recv_count,
            recv_type_raw,
            comm,
        );
    }
}
/// Gather variably sized send buffers of all participating processes on every process.
///
/// Once the call returns, the send `Buffer`s of all processes have been concatenated into the
/// receive `Buffer` on every rank.
///
/// The send `Buffer`s may hold different counts of elements on different processes. Where each
/// contribution lands in the receive `Buffer` is given by the counts and displacements of the
/// `Partitioned` receive buffer, which are forwarded to `MPI_Allgatherv`.
///
/// # Examples
///
/// See `examples/all_gather_varcount.rs`
///
/// # Standard section(s)
///
/// 5.7
fn all_gather_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
    S: Buffer,
    R: PartitionedBufferMut,
{
    unsafe {
        let send_ptr = sendbuf.pointer();
        let send_count = sendbuf.count();
        // The datatype values are bound to locals so that they outlive the MPI call.
        let send_type = sendbuf.as_datatype();
        let send_type_raw = send_type.as_raw();
        let recv_ptr = recvbuf.pointer_mut();
        let recv_counts = recvbuf.counts();
        let recv_displs = recvbuf.displs();
        let recv_type = recvbuf.as_datatype();
        let recv_type_raw = recv_type.as_raw();
        let comm = self.as_raw();
        ffi::MPI_Allgatherv(
            send_ptr,
            send_count,
            send_type_raw,
            recv_ptr,
            recv_counts.as_ptr(),
            recv_displs.as_ptr(),
            recv_type_raw,
            comm,
        );
    }
}
/// Exchange equally sized blocks between all processes.
///
/// Distributes the send `Buffer`s from all processes to the receive `Buffer`s on all processes.
/// Each process sends and receives the same count of elements to and from each process; the
/// per-process counts handed to `MPI_Alltoall` are the buffer counts divided by the communicator
/// size.
///
/// # Examples
///
/// See `examples/all_to_all.rs`
///
/// # Standard section(s)
///
/// 5.8
fn all_to_all_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
    S: Buffer,
    R: BufferMut,
{
    let comm_size = self.size();
    unsafe {
        let send_ptr = sendbuf.pointer();
        let send_count = sendbuf.count() / comm_size;
        // The datatype values are bound to locals so that they outlive the MPI call.
        let send_type = sendbuf.as_datatype();
        let send_type_raw = send_type.as_raw();
        let recv_ptr = recvbuf.pointer_mut();
        let recv_count = recvbuf.count() / comm_size;
        let recv_type = recvbuf.as_datatype();
        let recv_type_raw = recv_type.as_raw();
        let comm = self.as_raw();
        ffi::MPI_Alltoall(
            send_ptr,
            send_count,
            send_type_raw,
            recv_ptr,
            recv_count,
            recv_type_raw,
            comm,
        );
    }
}
/// Exchange variably sized blocks between all processes.
///
/// Distributes the send `Buffer`s from all processes to the receive `Buffer`s on all processes.
/// The count of elements sent to and received from each process can vary; it is taken from the
/// counts and displacements of the two `Partitioned` buffers, which are forwarded to
/// `MPI_Alltoallv`.
///
/// # Standard section(s)
///
/// 5.8
fn all_to_all_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
    S: PartitionedBuffer,
    R: PartitionedBufferMut,
{
    unsafe {
        let send_ptr = sendbuf.pointer();
        let send_counts = sendbuf.counts();
        let send_displs = sendbuf.displs();
        // The datatype values are bound to locals so that they outlive the MPI call.
        let send_type = sendbuf.as_datatype();
        let send_type_raw = send_type.as_raw();
        let recv_ptr = recvbuf.pointer_mut();
        let recv_counts = recvbuf.counts();
        let recv_displs = recvbuf.displs();
        let recv_type = recvbuf.as_datatype();
        let recv_type_raw = recv_type.as_raw();
        let comm = self.as_raw();
        ffi::MPI_Alltoallv(
            send_ptr,
            send_counts.as_ptr(),
            send_displs.as_ptr(),
            send_type_raw,
            recv_ptr,
            recv_counts.as_ptr(),
            recv_displs.as_ptr(),
            recv_type_raw,
            comm,
        );
    }
}
/// Reduce the data in `sendbuf` across all processes under `op` and store the result in
/// `recvbuf` on every process.
///
/// The element count and datatype passed to `MPI_Allreduce` are both taken from `sendbuf`.
///
/// # Examples
///
/// See `examples/reduce.rs`
///
/// # Standard section(s)
///
/// 5.9.6
fn all_reduce_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where
    S: Buffer,
    R: BufferMut,
    O: Operation,
{
    unsafe {
        let send_ptr = sendbuf.pointer();
        let recv_ptr = recvbuf.pointer_mut();
        let count = sendbuf.count();
        // The datatype value is bound to a local so that it outlives the MPI call.
        let datatype = sendbuf.as_datatype();
        let datatype_raw = datatype.as_raw();
        let op_raw = op.as_raw();
        let comm = self.as_raw();
        ffi::MPI_Allreduce(send_ptr, recv_ptr, count, datatype_raw, op_raw, comm);
    }
}
/// Performs an element-wise global reduction under the operation `op` of the input data in
/// `sendbuf` and scatters the result into equal sized blocks in the receive buffers on all
/// processes.
///
/// The block size passed to `MPI_Reduce_scatter_block` is `recvbuf.count()`; the datatype is
/// taken from `sendbuf`.
///
/// # Panics
///
/// Panics unless `sendbuf.count()` equals `recvbuf.count()` multiplied by the communicator
/// size. A product that does not fit the count type is treated as a mismatch.
///
/// # Examples
///
/// See `examples/reduce.rs`
///
/// # Standard section(s)
///
/// 5.10.1
fn reduce_scatter_block_into<S: ?Sized, R: ?Sized, O>(
    &self,
    sendbuf: &S,
    recvbuf: &mut R,
    op: O,
) where
    S: Buffer,
    R: BufferMut,
    O: Operation,
{
    // A plain `*` wraps in release builds, which could let a wrapped product compare equal to
    // `sendbuf.count()` and allow a mismatched pair of buffers through to MPI. The checked
    // multiplication turns an overflow into `None`, which never equals `Some(..)`.
    let expected_send_count = recvbuf.count().checked_mul(self.size());
    assert_eq!(
        expected_send_count,
        Some(sendbuf.count()),
        "sendbuf must hold recvbuf.count() elements for every process in the communicator"
    );
    unsafe {
        ffi::MPI_Reduce_scatter_block(
            sendbuf.pointer(),
            recvbuf.pointer_mut(),
            recvbuf.count(),
            sendbuf.as_datatype().as_raw(),
            op.as_raw(),
            self.as_raw(),
        );
    }
}
/// Inclusive prefix reduction of the data in `sendbuf` into `recvbuf` under operation `op`,
/// performed globally over all processes.
///
/// The element count and datatype passed to `MPI_Scan` are both taken from `sendbuf`.
///
/// # Examples
///
/// See `examples/scan.rs`
///
/// # Standard section(s)
///
/// 5.11.1
fn scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where
    S: Buffer,
    R: BufferMut,
    O: Operation,
{
    unsafe {
        let send_ptr = sendbuf.pointer();
        let recv_ptr = recvbuf.pointer_mut();
        let count = sendbuf.count();
        // The datatype value is bound to a local so that it outlives the MPI call.
        let datatype = sendbuf.as_datatype();
        let datatype_raw = datatype.as_raw();
        let op_raw = op.as_raw();
        let comm = self.as_raw();
        ffi::MPI_Scan(send_ptr, recv_ptr, count, datatype_raw, op_raw, comm);
    }
}
/// Exclusive prefix reduction of the data in `sendbuf` into `recvbuf` under operation `op`,
/// performed globally over all processes.
///
/// The element count and datatype passed to `MPI_Exscan` are both taken from `sendbuf`.
///
/// # Examples
///
/// See `examples/scan.rs`
///
/// # Standard section(s)
///
/// 5.11.2
fn exclusive_scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where
    S: Buffer,
    R: BufferMut,
    O: Operation,
{
    unsafe {
        let send_ptr = sendbuf.pointer();
        let recv_ptr = recvbuf.pointer_mut();
        let count = sendbuf.count();
        // The datatype value is bound to a local so that it outlives the MPI call.
        let datatype = sendbuf.as_datatype();
        let datatype_raw = datatype.as_raw();
        let op_raw = op.as_raw();
        let comm = self.as_raw();
        ffi::MPI_Exscan(send_ptr, recv_ptr, count, datatype_raw, op_raw, comm);
    }
}
/// Non-blocking barrier synchronization among all processes in a `Communicator`
///
/// The calling process (or thread within it) enters the barrier via `MPI_Ibarrier` and gets a
/// `Request` back. Completion methods on that request object block until all processes have
/// entered.
///
/// # Examples
///
/// See `examples/immediate_barrier.rs`
///
/// # Standard section(s)
///
/// 5.12.1
fn immediate_barrier(&self) -> Request<'static> {
    unsafe {
        let raw_request =
            with_uninitialized(|request| ffi::MPI_Ibarrier(self.as_raw(), request)).1;
        // No buffers are involved, so the request is attached to the static scope.
        Request::from_raw(raw_request, StaticScope)
    }
}
/// Initiate non-blocking gather of the contents of all `sendbuf`s into all `recvbuf`s on all
/// processes in the communicator.
///
/// The per-process receive count handed to `MPI_Iallgather` is `recvbuf.count()` divided by the
/// communicator size. Both buffers are borrowed for `'a`, and the returned `Request` is
/// attached to `scope`.
///
/// # Examples
///
/// See `examples/immediate_all_gather.rs`
///
/// # Standard section(s)
///
/// 5.12.5
fn immediate_all_gather_into<'a, Sc, S: ?Sized, R: ?Sized>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    R: 'a + BufferMut,
    Sc: Scope<'a>,
{
    unsafe {
        let recv_count = recvbuf.count() / self.size();
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let send_count = sendbuf.count();
            // The datatype values are bound to locals so that they outlive the MPI call.
            let send_type = sendbuf.as_datatype();
            let send_type_raw = send_type.as_raw();
            let recv_ptr = recvbuf.pointer_mut();
            let recv_type = recvbuf.as_datatype();
            let recv_type_raw = recv_type.as_raw();
            let comm = self.as_raw();
            ffi::MPI_Iallgather(
                send_ptr,
                send_count,
                send_type_raw,
                recv_ptr,
                recv_count,
                recv_type_raw,
                comm,
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiate non-blocking gather of the contents of all `sendbuf`s into all `recvbuf`s on all
/// processes in the communicator, with per-process counts.
///
/// The counts and displacements of the partitioned `recvbuf` are forwarded to
/// `MPI_Iallgatherv`. Both buffers are borrowed for `'a`, and the returned `Request` is
/// attached to `scope`.
///
/// # Examples
///
/// See `examples/immediate_all_gather_varcount.rs`
///
/// # Standard section(s)
///
/// 5.12.5
fn immediate_all_gather_varcount_into<'a, Sc, S: ?Sized, R: ?Sized>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    R: 'a + PartitionedBufferMut,
    Sc: Scope<'a>,
{
    unsafe {
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let send_count = sendbuf.count();
            // The datatype values are bound to locals so that they outlive the MPI call.
            let send_type = sendbuf.as_datatype();
            let send_type_raw = send_type.as_raw();
            let recv_ptr = recvbuf.pointer_mut();
            let recv_counts = recvbuf.counts();
            let recv_displs = recvbuf.displs();
            let recv_type = recvbuf.as_datatype();
            let recv_type_raw = recv_type.as_raw();
            let comm = self.as_raw();
            ffi::MPI_Iallgatherv(
                send_ptr,
                send_count,
                send_type_raw,
                recv_ptr,
                recv_counts.as_ptr(),
                recv_displs.as_ptr(),
                recv_type_raw,
                comm,
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiate non-blocking all-to-all communication with equally sized blocks.
///
/// The per-process counts handed to `MPI_Ialltoall` are the buffer counts divided by the
/// communicator size. Both buffers are borrowed for `'a`, and the returned `Request` is
/// attached to `scope`.
///
/// # Examples
///
/// See `examples/immediate_all_to_all.rs`
///
/// # Standard section(s)
///
/// 5.12.6
fn immediate_all_to_all_into<'a, Sc, S: ?Sized, R: ?Sized>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    R: 'a + BufferMut,
    Sc: Scope<'a>,
{
    let comm_size = self.size();
    unsafe {
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let send_count = sendbuf.count() / comm_size;
            // The datatype values are bound to locals so that they outlive the MPI call.
            let send_type = sendbuf.as_datatype();
            let send_type_raw = send_type.as_raw();
            let recv_ptr = recvbuf.pointer_mut();
            let recv_count = recvbuf.count() / comm_size;
            let recv_type = recvbuf.as_datatype();
            let recv_type_raw = recv_type.as_raw();
            let comm = self.as_raw();
            ffi::MPI_Ialltoall(
                send_ptr,
                send_count,
                send_type_raw,
                recv_ptr,
                recv_count,
                recv_type_raw,
                comm,
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiate non-blocking all-to-all communication with per-process counts.
///
/// The counts and displacements of both partitioned buffers are forwarded to
/// `MPI_Ialltoallv`. Both buffers are borrowed for `'a`, and the returned `Request` is attached
/// to `scope`.
///
/// # Standard section(s)
///
/// 5.12.6
fn immediate_all_to_all_varcount_into<'a, Sc, S: ?Sized, R: ?Sized>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
    S: 'a + PartitionedBuffer,
    R: 'a + PartitionedBufferMut,
    Sc: Scope<'a>,
{
    unsafe {
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let send_counts = sendbuf.counts();
            let send_displs = sendbuf.displs();
            // The datatype values are bound to locals so that they outlive the MPI call.
            let send_type = sendbuf.as_datatype();
            let send_type_raw = send_type.as_raw();
            let recv_ptr = recvbuf.pointer_mut();
            let recv_counts = recvbuf.counts();
            let recv_displs = recvbuf.displs();
            let recv_type = recvbuf.as_datatype();
            let recv_type_raw = recv_type.as_raw();
            let comm = self.as_raw();
            ffi::MPI_Ialltoallv(
                send_ptr,
                send_counts.as_ptr(),
                send_displs.as_ptr(),
                send_type_raw,
                recv_ptr,
                recv_counts.as_ptr(),
                recv_displs.as_ptr(),
                recv_type_raw,
                comm,
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiates a non-blocking global reduction under the operation `op` of the input data in
/// `sendbuf`, storing the result in `recvbuf` on all processes.
///
/// The element count and datatype passed to `MPI_Iallreduce` are both taken from `sendbuf`.
/// Both buffers are borrowed for `'a`, and the returned `Request` is attached to `scope`.
///
/// # Examples
///
/// See `examples/immediate_reduce.rs`
///
/// # Standard section(s)
///
/// 5.12.8
fn immediate_all_reduce_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    recvbuf: &'a mut R,
    op: O,
) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    R: 'a + BufferMut,
    O: 'a + Operation,
    Sc: Scope<'a>,
{
    unsafe {
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let recv_ptr = recvbuf.pointer_mut();
            let count = sendbuf.count();
            // The datatype value is bound to a local so that it outlives the MPI call.
            let datatype = sendbuf.as_datatype();
            let datatype_raw = datatype.as_raw();
            let op_raw = op.as_raw();
            let comm = self.as_raw();
            ffi::MPI_Iallreduce(
                send_ptr,
                recv_ptr,
                count,
                datatype_raw,
                op_raw,
                comm,
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiates a non-blocking element-wise global reduction under the operation `op` of the
/// input data in `sendbuf` and scatters the result into equal sized blocks in the receive
/// buffers on all processes.
///
/// The block size passed to `MPI_Ireduce_scatter_block` is `recvbuf.count()`; the datatype is
/// taken from `sendbuf`. Both buffers are borrowed for `'a`, and the returned `Request` is
/// attached to `scope`.
///
/// # Panics
///
/// Panics unless `sendbuf.count()` equals `recvbuf.count()` multiplied by the communicator
/// size. A product that does not fit the count type is treated as a mismatch.
///
/// # Examples
///
/// See `examples/immediate_reduce.rs`
///
/// # Standard section(s)
///
/// 5.12.9
fn immediate_reduce_scatter_block_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    recvbuf: &'a mut R,
    op: O,
) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    R: 'a + BufferMut,
    O: 'a + Operation,
    Sc: Scope<'a>,
{
    // A plain `*` wraps in release builds, which could let a wrapped product compare equal to
    // `sendbuf.count()` and allow a mismatched pair of buffers through to MPI. The checked
    // multiplication turns an overflow into `None`, which never equals `Some(..)`.
    let expected_send_count = recvbuf.count().checked_mul(self.size());
    assert_eq!(
        expected_send_count,
        Some(sendbuf.count()),
        "sendbuf must hold recvbuf.count() elements for every process in the communicator"
    );
    unsafe {
        Request::from_raw(
            with_uninitialized(|request| {
                ffi::MPI_Ireduce_scatter_block(
                    sendbuf.pointer(),
                    recvbuf.pointer_mut(),
                    recvbuf.count(),
                    sendbuf.as_datatype().as_raw(),
                    op.as_raw(),
                    self.as_raw(),
                    request,
                )
            })
            .1,
            scope,
        )
    }
}
/// Initiates a non-blocking global inclusive prefix reduction of the data in `sendbuf` into
/// `recvbuf` under operation `op`.
///
/// The element count and datatype passed to `MPI_Iscan` are both taken from `sendbuf`. Both
/// buffers are borrowed for `'a`, and the returned `Request` is attached to `scope`.
///
/// # Examples
///
/// See `examples/immediate_scan.rs`
///
/// # Standard section(s)
///
/// 5.12.11
fn immediate_scan_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    recvbuf: &'a mut R,
    op: O,
) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    R: 'a + BufferMut,
    O: 'a + Operation,
    Sc: Scope<'a>,
{
    unsafe {
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let recv_ptr = recvbuf.pointer_mut();
            let count = sendbuf.count();
            // The datatype value is bound to a local so that it outlives the MPI call.
            let datatype = sendbuf.as_datatype();
            let datatype_raw = datatype.as_raw();
            let op_raw = op.as_raw();
            let comm = self.as_raw();
            ffi::MPI_Iscan(
                send_ptr,
                recv_ptr,
                count,
                datatype_raw,
                op_raw,
                comm,
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiates a non-blocking global exclusive prefix reduction of the data in `sendbuf` into
/// `recvbuf` under operation `op`.
///
/// The element count and datatype passed to `MPI_Iexscan` are both taken from `sendbuf`. Both
/// buffers are borrowed for `'a`, and the returned `Request` is attached to `scope`.
///
/// # Examples
///
/// See `examples/immediate_scan.rs`
///
/// # Standard section(s)
///
/// 5.12.12
fn immediate_exclusive_scan_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    recvbuf: &'a mut R,
    op: O,
) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    R: 'a + BufferMut,
    O: 'a + Operation,
    Sc: Scope<'a>,
{
    unsafe {
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let recv_ptr = recvbuf.pointer_mut();
            let count = sendbuf.count();
            // The datatype value is bound to a local so that it outlives the MPI call.
            let datatype = sendbuf.as_datatype();
            let datatype_raw = datatype.as_raw();
            let op_raw = op.as_raw();
            let comm = self.as_raw();
            ffi::MPI_Iexscan(
                send_ptr,
                recv_ptr,
                count,
                datatype_raw,
                op_raw,
                comm,
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
}
// Blanket implementation: every `Communicator` gets the default collective methods.
impl<C> CommunicatorCollectives for C where C: Communicator {}
/// Something that can take the role of 'root' in a collective operation.
///
/// Many collective operations define a 'root' process that takes a special role in the
/// communication. These collective operations are implemented as default methods of this trait.
pub trait Root: AsCommunicator {
/// Rank of the root process
///
/// The default methods of this trait pass this value to MPI as the `root` argument and compare
/// it with the rank of the calling process to decide whether a root-only or a non-root-only
/// method is being called from the right place.
fn root_rank(&self) -> Rank;
/// Broadcast the contents of a buffer from the `Root` to all processes
///
/// Once the call returns, the `Buffer` on every process in the `Communicator` of the `Root`
/// `&self` holds what it holds on the `Root`. Implemented with `MPI_Bcast`.
///
/// # Examples
///
/// See `examples/broadcast.rs`
///
/// # Standard section(s)
///
/// 5.4
fn broadcast_into<Buf: ?Sized>(&self, buffer: &mut Buf)
where
    Buf: BufferMut,
{
    unsafe {
        let buf_ptr = buffer.pointer_mut();
        let count = buffer.count();
        // The datatype value is bound to a local so that it outlives the MPI call.
        let datatype = buffer.as_datatype();
        let datatype_raw = datatype.as_raw();
        let root = self.root_rank();
        let comm = self.as_communicator();
        ffi::MPI_Bcast(buf_ptr, count, datatype_raw, root, comm.as_raw());
    }
}
/// Gather contents of buffers on `Root` (non-root side).
///
/// Once the call returns, the contents of the `Buffer`s on all ranks have been concatenated
/// into the `Buffer` on `Root`.
///
/// All send `Buffer`s must have the same count of elements.
///
/// This function must be called on all non-root processes.
///
/// # Panics
///
/// Panics if the calling process is the root process.
///
/// # Examples
///
/// See `examples/gather.rs`
///
/// # Standard section(s)
///
/// 5.5
fn gather_into<S: ?Sized>(&self, sendbuf: &S)
where
    S: Buffer,
{
    assert_ne!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let send_ptr = sendbuf.pointer();
        let send_count = sendbuf.count();
        // The datatype values are bound to locals so that they outlive the MPI call.
        let send_type = sendbuf.as_datatype();
        let send_type_raw = send_type.as_raw();
        // Placeholder receive arguments: null buffer, zero count, `u8` datatype.
        let recv_type = u8::equivalent_datatype();
        let recv_type_raw = recv_type.as_raw();
        let root = self.root_rank();
        let comm = self.as_communicator();
        ffi::MPI_Gather(
            send_ptr,
            send_count,
            send_type_raw,
            ptr::null_mut(),
            0,
            recv_type_raw,
            root,
            comm.as_raw(),
        );
    }
}
/// Gather contents of buffers on `Root` (root side).
///
/// Once the call returns, the contents of the `Buffer`s on all ranks have been concatenated
/// into the `Buffer` on `Root`.
///
/// All send `Buffer`s must have the same count of elements. The per-process receive count
/// handed to `MPI_Gather` is `recvbuf.count()` divided by the communicator size.
///
/// This function must be called on the root process.
///
/// # Panics
///
/// Panics if the calling process is not the root process.
///
/// # Examples
///
/// See `examples/gather.rs`
///
/// # Standard section(s)
///
/// 5.5
fn gather_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
    S: Buffer,
    R: BufferMut,
{
    assert_eq!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let recv_count = recvbuf.count() / self.as_communicator().size();
        let send_ptr = sendbuf.pointer();
        let send_count = sendbuf.count();
        // The datatype values are bound to locals so that they outlive the MPI call.
        let send_type = sendbuf.as_datatype();
        let send_type_raw = send_type.as_raw();
        let recv_ptr = recvbuf.pointer_mut();
        let recv_type = recvbuf.as_datatype();
        let recv_type_raw = recv_type.as_raw();
        let root = self.root_rank();
        let comm = self.as_communicator();
        ffi::MPI_Gather(
            send_ptr,
            send_count,
            send_type_raw,
            recv_ptr,
            recv_count,
            recv_type_raw,
            root,
            comm.as_raw(),
        );
    }
}
/// Gather variably sized buffers on `Root` (non-root side).
///
/// Once the call returns, the contents of the `Buffer`s on all ranks have been concatenated
/// into the `Buffer` on `Root`.
///
/// The send `Buffer`s may contain different counts of elements on different processes. The
/// distribution of elements in the receive `Buffer` is specified via `Partitioned` on the root.
///
/// This function must be called on all non-root processes.
///
/// # Panics
///
/// Panics if the calling process is the root process.
///
/// # Examples
///
/// See `examples/gather_varcount.rs`
///
/// # Standard section(s)
///
/// 5.5
fn gather_varcount_into<S: ?Sized>(&self, sendbuf: &S)
where
    S: Buffer,
{
    assert_ne!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let send_ptr = sendbuf.pointer();
        let send_count = sendbuf.count();
        // The datatype values are bound to locals so that they outlive the MPI call.
        let send_type = sendbuf.as_datatype();
        let send_type_raw = send_type.as_raw();
        // Placeholder receive arguments: null buffer, counts and displacements, `u8` datatype.
        let recv_type = u8::equivalent_datatype();
        let recv_type_raw = recv_type.as_raw();
        let root = self.root_rank();
        let comm = self.as_communicator();
        ffi::MPI_Gatherv(
            send_ptr,
            send_count,
            send_type_raw,
            ptr::null_mut(),
            ptr::null(),
            ptr::null(),
            recv_type_raw,
            root,
            comm.as_raw(),
        );
    }
}
/// Gather variably sized buffers on `Root` (root side).
///
/// Once the call returns, the contents of the `Buffer`s on all ranks have been concatenated
/// into the `Buffer` on `Root`.
///
/// The send `Buffer`s may contain different counts of elements on different processes. The
/// distribution of elements in the receive `Buffer` is specified via `Partitioned`; its counts
/// and displacements are forwarded to `MPI_Gatherv`.
///
/// This function must be called on the root process.
///
/// # Panics
///
/// Panics if the calling process is not the root process.
///
/// # Examples
///
/// See `examples/gather_varcount.rs`
///
/// # Standard section(s)
///
/// 5.5
fn gather_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
    S: Buffer,
    R: PartitionedBufferMut,
{
    assert_eq!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let send_ptr = sendbuf.pointer();
        let send_count = sendbuf.count();
        // The datatype values are bound to locals so that they outlive the MPI call.
        let send_type = sendbuf.as_datatype();
        let send_type_raw = send_type.as_raw();
        let recv_ptr = recvbuf.pointer_mut();
        let recv_counts = recvbuf.counts();
        let recv_displs = recvbuf.displs();
        let recv_type = recvbuf.as_datatype();
        let recv_type_raw = recv_type.as_raw();
        let root = self.root_rank();
        let comm = self.as_communicator();
        ffi::MPI_Gatherv(
            send_ptr,
            send_count,
            send_type_raw,
            recv_ptr,
            recv_counts.as_ptr(),
            recv_displs.as_ptr(),
            recv_type_raw,
            root,
            comm.as_raw(),
        );
    }
}
/// Scatter contents of a buffer on the root process to all processes (non-root side).
///
/// Once the call returns, each participating process has received a part of the send `Buffer`
/// on the root process.
///
/// All receive `Buffer`s must have the same count of elements.
///
/// This function must be called on all non-root processes.
///
/// # Panics
///
/// Panics if the calling process is the root process.
///
/// # Examples
///
/// See `examples/scatter.rs`
///
/// # Standard section(s)
///
/// 5.6
fn scatter_into<R: ?Sized>(&self, recvbuf: &mut R)
where
    R: BufferMut,
{
    assert_ne!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        // Placeholder send arguments: null buffer, zero count, `u8` datatype.
        // The datatype values are bound to locals so that they outlive the MPI call.
        let send_type = u8::equivalent_datatype();
        let send_type_raw = send_type.as_raw();
        let recv_ptr = recvbuf.pointer_mut();
        let recv_count = recvbuf.count();
        let recv_type = recvbuf.as_datatype();
        let recv_type_raw = recv_type.as_raw();
        let root = self.root_rank();
        let comm = self.as_communicator();
        ffi::MPI_Scatter(
            ptr::null(),
            0,
            send_type_raw,
            recv_ptr,
            recv_count,
            recv_type_raw,
            root,
            comm.as_raw(),
        );
    }
}
/// Scatter contents of a buffer on the root process to all processes (root side).
///
/// Once the call returns, each participating process has received a part of the send `Buffer`
/// on the root process.
///
/// Every process receives the same count of elements: the per-process send count handed to
/// `MPI_Scatter` is `sendbuf.count()` divided by the communicator size.
///
/// This function must be called on the root process.
///
/// # Panics
///
/// Panics if the calling process is not the root process.
///
/// # Examples
///
/// See `examples/scatter.rs`
///
/// # Standard section(s)
///
/// 5.6
fn scatter_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
    S: Buffer,
    R: BufferMut,
{
    assert_eq!(self.as_communicator().rank(), self.root_rank());
    let send_count = sendbuf.count() / self.as_communicator().size();
    unsafe {
        let send_ptr = sendbuf.pointer();
        // The datatype values are bound to locals so that they outlive the MPI call.
        let send_type = sendbuf.as_datatype();
        let send_type_raw = send_type.as_raw();
        let recv_ptr = recvbuf.pointer_mut();
        let recv_count = recvbuf.count();
        let recv_type = recvbuf.as_datatype();
        let recv_type_raw = recv_type.as_raw();
        let root = self.root_rank();
        let comm = self.as_communicator();
        ffi::MPI_Scatter(
            send_ptr,
            send_count,
            send_type_raw,
            recv_ptr,
            recv_count,
            recv_type_raw,
            root,
            comm.as_raw(),
        );
    }
}
/// Scatter variably sized parts of a buffer on the root process to all processes (non-root
/// side).
///
/// Once the call returns, each participating process has received a part of the send `Buffer`
/// on the root process.
///
/// The send `Buffer` may contain different counts of elements for different processes. The
/// distribution of elements in the send `Buffer` is specified via `Partitioned` on the root.
///
/// This function must be called on all non-root processes.
///
/// # Panics
///
/// Panics if the calling process is the root process.
///
/// # Examples
///
/// See `examples/scatter_varcount.rs`
///
/// # Standard section(s)
///
/// 5.6
fn scatter_varcount_into<R: ?Sized>(&self, recvbuf: &mut R)
where
    R: BufferMut,
{
    assert_ne!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        // Placeholder send arguments: null buffer, counts and displacements, `u8` datatype.
        // The datatype values are bound to locals so that they outlive the MPI call.
        let send_type = u8::equivalent_datatype();
        let send_type_raw = send_type.as_raw();
        let recv_ptr = recvbuf.pointer_mut();
        let recv_count = recvbuf.count();
        let recv_type = recvbuf.as_datatype();
        let recv_type_raw = recv_type.as_raw();
        let root = self.root_rank();
        let comm = self.as_communicator();
        ffi::MPI_Scatterv(
            ptr::null(),
            ptr::null(),
            ptr::null(),
            send_type_raw,
            recv_ptr,
            recv_count,
            recv_type_raw,
            root,
            comm.as_raw(),
        );
    }
}
/// Scatter variably sized parts of a buffer on the root process to all processes (root side).
///
/// Once the call returns, each participating process has received a part of the send `Buffer`
/// on the root process.
///
/// The send `Buffer` may contain different counts of elements for different processes. The
/// distribution of elements in the send `Buffer` is specified via `Partitioned`; its counts and
/// displacements are forwarded to `MPI_Scatterv`.
///
/// This function must be called on the root process.
///
/// # Panics
///
/// Panics if the calling process is not the root process.
///
/// # Examples
///
/// See `examples/scatter_varcount.rs`
///
/// # Standard section(s)
///
/// 5.6
fn scatter_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
    S: PartitionedBuffer,
    R: BufferMut,
{
    assert_eq!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let send_ptr = sendbuf.pointer();
        let send_counts = sendbuf.counts();
        let send_displs = sendbuf.displs();
        // The datatype values are bound to locals so that they outlive the MPI call.
        let send_type = sendbuf.as_datatype();
        let send_type_raw = send_type.as_raw();
        let recv_ptr = recvbuf.pointer_mut();
        let recv_count = recvbuf.count();
        let recv_type = recvbuf.as_datatype();
        let recv_type_raw = recv_type.as_raw();
        let root = self.root_rank();
        let comm = self.as_communicator();
        ffi::MPI_Scatterv(
            send_ptr,
            send_counts.as_ptr(),
            send_displs.as_ptr(),
            send_type_raw,
            recv_ptr,
            recv_count,
            recv_type_raw,
            root,
            comm.as_raw(),
        );
    }
}
/// Contribute `sendbuf` to a global reduction under the operation `op` whose result is stored
/// on the `Root` process (non-root side).
///
/// The receive buffer argument of `MPI_Reduce` is passed as a null pointer.
///
/// This function must be called on all non-root processes.
///
/// # Panics
///
/// Panics if the calling process is the root process.
///
/// # Examples
///
/// See `examples/reduce.rs`
///
/// # Standard section(s)
///
/// 5.9.1
fn reduce_into<S: ?Sized, O>(&self, sendbuf: &S, op: O)
where
    S: Buffer,
    O: Operation,
{
    assert_ne!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let send_ptr = sendbuf.pointer();
        let count = sendbuf.count();
        // The datatype value is bound to a local so that it outlives the MPI call.
        let datatype = sendbuf.as_datatype();
        let datatype_raw = datatype.as_raw();
        let op_raw = op.as_raw();
        let root = self.root_rank();
        let comm = self.as_communicator();
        ffi::MPI_Reduce(
            send_ptr,
            ptr::null_mut(),
            count,
            datatype_raw,
            op_raw,
            root,
            comm.as_raw(),
        );
    }
}
/// Performs a global reduction under the operation `op` of the input data in `sendbuf` and
/// stores the result in `recvbuf` on the `Root` process (root side).
///
/// The element count and datatype passed to `MPI_Reduce` are both taken from `sendbuf`.
///
/// This function must be called on the root process.
///
/// # Panics
///
/// Panics if the calling process is not the root process.
///
/// # Examples
///
/// See `examples/reduce.rs`
///
/// # Standard section(s)
///
/// 5.9.1
fn reduce_into_root<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where
    S: Buffer,
    R: BufferMut,
    O: Operation,
{
    assert_eq!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let send_ptr = sendbuf.pointer();
        let recv_ptr = recvbuf.pointer_mut();
        let count = sendbuf.count();
        // The datatype value is bound to a local so that it outlives the MPI call.
        let datatype = sendbuf.as_datatype();
        let datatype_raw = datatype.as_raw();
        let op_raw = op.as_raw();
        let root = self.root_rank();
        let comm = self.as_communicator();
        ffi::MPI_Reduce(
            send_ptr,
            recv_ptr,
            count,
            datatype_raw,
            op_raw,
            root,
            comm.as_raw(),
        );
    }
}
/// Initiate a non-blocking broadcast of a value from the `Root` process to all other processes.
///
/// Implemented with `MPI_Ibcast`. The buffer is borrowed for `'a`, and the returned `Request`
/// is attached to `scope`.
///
/// # Examples
///
/// See `examples/immediate_broadcast.rs`
///
/// # Standard section(s)
///
/// 5.12.2
fn immediate_broadcast_into<'a, Sc, Buf: ?Sized>(
    &self,
    scope: Sc,
    buf: &'a mut Buf,
) -> Request<'a, Sc>
where
    Buf: 'a + BufferMut,
    Sc: Scope<'a>,
{
    unsafe {
        let raw_request = with_uninitialized(|request| {
            let buf_ptr = buf.pointer_mut();
            let count = buf.count();
            // The datatype value is bound to a local so that it outlives the MPI call.
            let datatype = buf.as_datatype();
            let datatype_raw = datatype.as_raw();
            let root = self.root_rank();
            let comm = self.as_communicator();
            ffi::MPI_Ibcast(buf_ptr, count, datatype_raw, root, comm.as_raw(), request)
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self` (non-root
/// side).
///
/// Implemented with `MPI_Igather`. `sendbuf` is borrowed for `'a`, and the returned `Request`
/// is attached to `scope`.
///
/// This function must be called on all non-root processes.
///
/// # Panics
///
/// Panics if the calling process is the root process.
///
/// # Examples
///
/// See `examples/immediate_gather.rs`
///
/// # Standard section(s)
///
/// 5.12.3
fn immediate_gather_into<'a, Sc, S: ?Sized>(&self, scope: Sc, sendbuf: &'a S) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    Sc: Scope<'a>,
{
    assert_ne!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let send_count = sendbuf.count();
            // The datatype values are bound to locals so that they outlive the MPI call.
            let send_type = sendbuf.as_datatype();
            let send_type_raw = send_type.as_raw();
            // Placeholder receive arguments: null buffer, zero count, `u8` datatype.
            let recv_type = u8::equivalent_datatype();
            let recv_type_raw = recv_type.as_raw();
            let root = self.root_rank();
            let comm = self.as_communicator();
            ffi::MPI_Igather(
                send_ptr,
                send_count,
                send_type_raw,
                ptr::null_mut(),
                0,
                recv_type_raw,
                root,
                comm.as_raw(),
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self` (root
/// side).
///
/// The per-process receive count handed to `MPI_Igather` is `recvbuf.count()` divided by the
/// communicator size. Both buffers are borrowed for `'a`, and the returned `Request` is
/// attached to `scope`.
///
/// This function must be called on the root process.
///
/// # Panics
///
/// Panics if the calling process is not the root process.
///
/// # Examples
///
/// See `examples/immediate_gather.rs`
///
/// # Standard section(s)
///
/// 5.12.3
fn immediate_gather_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    R: 'a + BufferMut,
    Sc: Scope<'a>,
{
    assert_eq!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let recv_count = recvbuf.count() / self.as_communicator().size();
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let send_count = sendbuf.count();
            // The datatype values are bound to locals so that they outlive the MPI call.
            let send_type = sendbuf.as_datatype();
            let send_type_raw = send_type.as_raw();
            let recv_ptr = recvbuf.pointer_mut();
            let recv_type = recvbuf.as_datatype();
            let recv_type_raw = recv_type.as_raw();
            let root = self.root_rank();
            let comm = self.as_communicator();
            ffi::MPI_Igather(
                send_ptr,
                send_count,
                send_type_raw,
                recv_ptr,
                recv_count,
                recv_type_raw,
                root,
                comm.as_raw(),
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiate non-blocking gather of variably sized `sendbuf`s on `Root` `&self` (non-root side).
///
/// Implemented with `MPI_Igatherv`. `sendbuf` is borrowed for `'a`, and the returned `Request`
/// is attached to `scope`.
///
/// This function must be called on all non-root processes.
///
/// # Panics
///
/// Panics if the calling process is the root process.
///
/// # Examples
///
/// See `examples/immediate_gather_varcount.rs`
///
/// # Standard section(s)
///
/// 5.12.3
fn immediate_gather_varcount_into<'a, Sc, S: ?Sized>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    Sc: Scope<'a>,
{
    assert_ne!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let send_count = sendbuf.count();
            // The datatype values are bound to locals so that they outlive the MPI call.
            let send_type = sendbuf.as_datatype();
            let send_type_raw = send_type.as_raw();
            // Placeholder receive arguments: null buffer, counts and displacements, `u8`
            // datatype.
            let recv_type = u8::equivalent_datatype();
            let recv_type_raw = recv_type.as_raw();
            let root = self.root_rank();
            let comm = self.as_communicator();
            ffi::MPI_Igatherv(
                send_ptr,
                send_count,
                send_type_raw,
                ptr::null_mut(),
                ptr::null(),
                ptr::null(),
                recv_type_raw,
                root,
                comm.as_raw(),
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiate non-blocking gather of variably sized `sendbuf`s on `Root` `&self` (root side).
///
/// The counts and displacements of the partitioned `recvbuf` are forwarded to `MPI_Igatherv`.
/// Both buffers are borrowed for `'a`, and the returned `Request` is attached to `scope`.
///
/// This function must be called on the root process.
///
/// # Panics
///
/// Panics if the calling process is not the root process.
///
/// # Examples
///
/// See `examples/immediate_gather_varcount.rs`
///
/// # Standard section(s)
///
/// 5.12.3
fn immediate_gather_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    R: 'a + PartitionedBufferMut,
    Sc: Scope<'a>,
{
    assert_eq!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let send_count = sendbuf.count();
            // The datatype values are bound to locals so that they outlive the MPI call.
            let send_type = sendbuf.as_datatype();
            let send_type_raw = send_type.as_raw();
            let recv_ptr = recvbuf.pointer_mut();
            let recv_counts = recvbuf.counts();
            let recv_displs = recvbuf.displs();
            let recv_type = recvbuf.as_datatype();
            let recv_type_raw = recv_type.as_raw();
            let root = self.root_rank();
            let comm = self.as_communicator();
            ffi::MPI_Igatherv(
                send_ptr,
                send_count,
                send_type_raw,
                recv_ptr,
                recv_counts.as_ptr(),
                recv_displs.as_ptr(),
                recv_type_raw,
                root,
                comm.as_raw(),
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self` (non-root
/// side).
///
/// Implemented with `MPI_Iscatter`. `recvbuf` is borrowed for `'a`, and the returned `Request`
/// is attached to `scope`.
///
/// This function must be called on all non-root processes.
///
/// # Panics
///
/// Panics if the calling process is the root process.
///
/// # Examples
///
/// See `examples/immediate_scatter.rs`
///
/// # Standard section(s)
///
/// 5.12.4
fn immediate_scatter_into<'a, Sc, R: ?Sized>(
    &self,
    scope: Sc,
    recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
    R: 'a + BufferMut,
    Sc: Scope<'a>,
{
    assert_ne!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let raw_request = with_uninitialized(|request| {
            // Placeholder send arguments: null buffer, zero count, `u8` datatype.
            // The datatype values are bound to locals so that they outlive the MPI call.
            let send_type = u8::equivalent_datatype();
            let send_type_raw = send_type.as_raw();
            let recv_ptr = recvbuf.pointer_mut();
            let recv_count = recvbuf.count();
            let recv_type = recvbuf.as_datatype();
            let recv_type_raw = recv_type.as_raw();
            let root = self.root_rank();
            let comm = self.as_communicator();
            ffi::MPI_Iscatter(
                ptr::null(),
                0,
                send_type_raw,
                recv_ptr,
                recv_count,
                recv_type_raw,
                root,
                comm.as_raw(),
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self` (root side).
///
/// The per-process send count handed to `MPI_Iscatter` is `sendbuf.count()` divided by the
/// communicator size. Both buffers are borrowed for `'a`, and the returned `Request` is
/// attached to `scope`.
///
/// This function must be called on the root process.
///
/// # Panics
///
/// Panics if the calling process is not the root process.
///
/// # Examples
///
/// See `examples/immediate_scatter.rs`
///
/// # Standard section(s)
///
/// 5.12.4
fn immediate_scatter_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    R: 'a + BufferMut,
    Sc: Scope<'a>,
{
    assert_eq!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let send_count = sendbuf.count() / self.as_communicator().size();
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            // The datatype values are bound to locals so that they outlive the MPI call.
            let send_type = sendbuf.as_datatype();
            let send_type_raw = send_type.as_raw();
            let recv_ptr = recvbuf.pointer_mut();
            let recv_count = recvbuf.count();
            let recv_type = recvbuf.as_datatype();
            let recv_type_raw = recv_type.as_raw();
            let root = self.root_rank();
            let comm = self.as_communicator();
            ffi::MPI_Iscatter(
                send_ptr,
                send_count,
                send_type_raw,
                recv_ptr,
                recv_count,
                recv_type_raw,
                root,
                comm.as_raw(),
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiate non-blocking scatter of variably sized parts of `sendbuf` from `Root` `&self`
/// (non-root side).
///
/// Implemented with `MPI_Iscatterv`. `recvbuf` is borrowed for `'a`, and the returned `Request`
/// is attached to `scope`.
///
/// This function must be called on all non-root processes.
///
/// # Panics
///
/// Panics if the calling process is the root process.
///
/// # Examples
///
/// See `examples/immediate_scatter_varcount.rs`
///
/// # Standard section(s)
///
/// 5.12.4
fn immediate_scatter_varcount_into<'a, Sc, R: ?Sized>(
    &self,
    scope: Sc,
    recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
    R: 'a + BufferMut,
    Sc: Scope<'a>,
{
    assert_ne!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let raw_request = with_uninitialized(|request| {
            // Placeholder send arguments: null buffer, counts and displacements, `u8` datatype.
            // The datatype values are bound to locals so that they outlive the MPI call.
            let send_type = u8::equivalent_datatype();
            let send_type_raw = send_type.as_raw();
            let recv_ptr = recvbuf.pointer_mut();
            let recv_count = recvbuf.count();
            let recv_type = recvbuf.as_datatype();
            let recv_type_raw = recv_type.as_raw();
            let root = self.root_rank();
            let comm = self.as_communicator();
            ffi::MPI_Iscatterv(
                ptr::null(),
                ptr::null(),
                ptr::null(),
                send_type_raw,
                recv_ptr,
                recv_count,
                recv_type_raw,
                root,
                comm.as_raw(),
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiate non-blocking scatter of variably sized parts of `sendbuf` from `Root` `&self`
/// (root side).
///
/// The counts and displacements of the partitioned `sendbuf` are forwarded to `MPI_Iscatterv`.
/// Both buffers are borrowed for `'a`, and the returned `Request` is attached to `scope`.
///
/// This function must be called on the root process.
///
/// # Panics
///
/// Panics if the calling process is not the root process.
///
/// # Examples
///
/// See `examples/immediate_scatter_varcount.rs`
///
/// # Standard section(s)
///
/// 5.12.4
fn immediate_scatter_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
    S: 'a + PartitionedBuffer,
    R: 'a + BufferMut,
    Sc: Scope<'a>,
{
    assert_eq!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let send_counts = sendbuf.counts();
            let send_displs = sendbuf.displs();
            // The datatype values are bound to locals so that they outlive the MPI call.
            let send_type = sendbuf.as_datatype();
            let send_type_raw = send_type.as_raw();
            let recv_ptr = recvbuf.pointer_mut();
            let recv_count = recvbuf.count();
            let recv_type = recvbuf.as_datatype();
            let recv_type_raw = recv_type.as_raw();
            let root = self.root_rank();
            let comm = self.as_communicator();
            ffi::MPI_Iscatterv(
                send_ptr,
                send_counts.as_ptr(),
                send_displs.as_ptr(),
                send_type_raw,
                recv_ptr,
                recv_count,
                recv_type_raw,
                root,
                comm.as_raw(),
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiates a non-blocking global reduction under the operation `op` of the input data in
/// `sendbuf`, with the result stored on the `Root` process (non-root side).
///
/// The receive buffer argument of `MPI_Ireduce` is passed as a null pointer. `sendbuf` is
/// borrowed for `'a`, and the returned `Request` is attached to `scope`.
///
/// This function must be called on all non-root processes.
///
/// # Panics
///
/// Panics if the calling process is the root process.
///
/// # Examples
///
/// See `examples/immediate_reduce.rs`
///
/// # Standard section(s)
///
/// 5.12.7
fn immediate_reduce_into<'a, Sc, S: ?Sized, O>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    op: O,
) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    O: 'a + Operation,
    Sc: Scope<'a>,
{
    assert_ne!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let count = sendbuf.count();
            // The datatype value is bound to a local so that it outlives the MPI call.
            let datatype = sendbuf.as_datatype();
            let datatype_raw = datatype.as_raw();
            let op_raw = op.as_raw();
            let root = self.root_rank();
            let comm = self.as_communicator();
            ffi::MPI_Ireduce(
                send_ptr,
                ptr::null_mut(),
                count,
                datatype_raw,
                op_raw,
                root,
                comm.as_raw(),
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
/// Initiates a non-blocking global reduction under the operation `op` of the input data in
/// `sendbuf` and stores the result in `recvbuf` on the `Root` process (root side).
///
/// The element count and datatype passed to `MPI_Ireduce` are both taken from `sendbuf`. Both
/// buffers are borrowed for `'a`, and the returned `Request` is attached to `scope`.
///
/// This function must be called on the root process.
///
/// # Panics
///
/// Panics if the calling process is not the root process.
///
/// # Examples
///
/// See `examples/immediate_reduce.rs`
///
/// # Standard section(s)
///
/// 5.12.7
fn immediate_reduce_into_root<'a, Sc, S: ?Sized, R: ?Sized, O>(
    &self,
    scope: Sc,
    sendbuf: &'a S,
    recvbuf: &'a mut R,
    op: O,
) -> Request<'a, Sc>
where
    S: 'a + Buffer,
    R: 'a + BufferMut,
    O: 'a + Operation,
    Sc: Scope<'a>,
{
    assert_eq!(self.as_communicator().rank(), self.root_rank());
    unsafe {
        let raw_request = with_uninitialized(|request| {
            let send_ptr = sendbuf.pointer();
            let recv_ptr = recvbuf.pointer_mut();
            let count = sendbuf.count();
            // The datatype value is bound to a local so that it outlives the MPI call.
            let datatype = sendbuf.as_datatype();
            let datatype_raw = datatype.as_raw();
            let op_raw = op.as_raw();
            let root = self.root_rank();
            let comm = self.as_communicator();
            ffi::MPI_Ireduce(
                send_ptr,
                recv_ptr,
                count,
                datatype_raw,
                op_raw,
                root,
                comm.as_raw(),
                request,
            )
        })
        .1;
        Request::from_raw(raw_request, scope)
    }
}
}
// A `Process` acts as the root of a collective operation; its own rank is the root rank.
impl<'a, C> Root for Process<'a, C>
where
    C: 'a + Communicator,
{
    fn root_rank(&self) -> Rank {
        self.rank()
    }
}
/// An operation to be used in a reduction or scan type operation, e.g. `MPI_SUM`
pub trait Operation: AsRaw<Raw = MPI_Op> {
/// Returns whether the operation is commutative.
///
/// # Standard section(s)
///
/// 5.9.7
fn is_commutative(&self) -> bool {
unsafe {
let mut commute = 0;
ffi::MPI_Op_commutative(self.as_raw(), &mut commute);
commute != 0
}
}
}
// A shared reference to an operation can be used wherever an operation is expected.
impl<'a, T> Operation for &'a T where T: 'a + Operation {}
/// A built-in operation like `MPI_SUM`
///
/// A thin, copyable wrapper around an `MPI_Op` handle.
///
/// # Examples
///
/// See `examples/reduce.rs`
///
/// # Standard section(s)
///
/// 5.9.2
#[derive(Clone, Copy)]
pub struct SystemOperation(MPI_Op);
// Generates one public constructor per `name => constant` pair. Each constructor wraps the
// given constant in a `SystemOperation`.
macro_rules! system_operation_constructors {
    ($($ctor:ident => $val:path),*) => {
        $(
            /// A built-in operation
            pub fn $ctor() -> SystemOperation {
                SystemOperation(unsafe { $val })
            }
        )*
    };
}
impl SystemOperation {
system_operation_constructors! {
max => ffi::RSMPI_MAX,
min => ffi::RSMPI_MIN,
sum => ffi::RSMPI_SUM,
product => ffi::RSMPI_PROD,
logical_and => ffi::RSMPI_LAND,
bitwise_and => ffi::RSMPI_BAND,
logical_or => ffi::RSMPI_LOR,
bitwise_or => ffi::RSMPI_BOR,
logical_xor => ffi::RSMPI_LXOR,
bitwise_xor => ffi::RSMPI_BXOR
}
}
unsafe impl AsRaw for SystemOperation {
type Raw = MPI_Op;
fn as_raw(&self) -> Self::Raw {
self.0
}
}
// `SystemOperation` relies entirely on the default method implementations of `Operation`.
impl Operation for SystemOperation {}
// Marker trait without methods. The blanket implementation below covers every sized type, so
// any such value can be stored as a `Box<dyn Erased>`. `UserOperation` uses this for its
// `_anchor` field to keep a value alive without naming its concrete type.
trait Erased {}
impl<T> Erased for T {}
/// A user-defined operation.
///
/// The lifetime `'a` of the operation is limited by the lifetime of the underlying closure.
///
/// For safety reasons, `UserOperation` is in and of itself not considered an `Operation`, but a
/// reference to it is. This limitation may be lifted in the future when `Request` objects can
/// store finalizers.
///
/// **Note:** When a `UserOperation` is passed to a non-blocking API call, it must outlive the
/// completion of the request. This is normally enforced by the safe API, so this is only a concern
/// if you use the unsafe API. Do not rely on MPI's internal reference-counting here, because once
/// `UserOperation` is destroyed, the closure object will be deallocated even if the `MPI_Op` handle
/// is still alive due to outstanding references.
///
/// # Examples
///
/// See `examples/reduce.rs` and `examples/immediate_reduce.rs`
#[cfg(feature = "user-operations")]
pub struct UserOperation<'a> {
// Raw operation handle; it is passed to `MPI_Op_free` when the `UserOperation` is dropped.
op: MPI_Op,
_anchor: Box<dyn Erased + 'a>, // keeps the internal data alive
}
#[cfg(feature = "user-operations")]
impl<'a> fmt::Debug for UserOperation<'a> {
    /// Formats the value as a tuple struct named `UserOperation` holding the raw handle.
    ///
    /// The anchor is not part of the output.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut repr = f.debug_tuple("UserOperation");
        repr.field(&self.op);
        repr.finish()
    }
}
#[cfg(feature = "user-operations")]
impl<'a> Drop for UserOperation<'a> {
    /// Releases the operation handle through `MPI_Op_free`.
    ///
    /// The `_anchor` field is dropped only after this method has returned.
    fn drop(&mut self) {
        let handle = &mut self.op;
        unsafe {
            ffi::MPI_Op_free(handle);
        }
    }
}
#[cfg(feature = "user-operations")]
unsafe impl<'a> AsRaw for UserOperation<'a> {
    type Raw = MPI_Op;

    /// Returns a copy of the raw `MPI_Op` handle; ownership of the handle stays with `self`.
    fn as_raw(&self) -> MPI_Op {
        let UserOperation { op, .. } = self;
        *op
    }
}
// Only a reference to a `UserOperation` is an `Operation`, not the value itself; the reasoning
// is given in the type-level documentation of `UserOperation`.
#[cfg(feature = "user-operations")]
impl<'a, 'b> Operation for &'b UserOperation<'a> {}
#[cfg(feature = "user-operations")]
impl<'a> UserOperation<'a> {
/// Define an operation using a closure. The operation must be associative.
///
/// This is a more readable shorthand for the `new` method, called with `commute` set to
/// `false`. Refer to [`new`](#method.new) for more information.
pub fn associative<F>(function: F) -> Self
where
F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
{
Self::new(false, function)
}
/// Define an operation using a closure. The operation must be both associative and
/// commutative.
///
/// This is a more readable shorthand for the `new` method, called with `commute` set to
/// `true`. Refer to [`new`](#method.new) for more information.
pub fn commutative<F>(function: F) -> Self
where
F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
{
Self::new(true, function)
}
/// Creates an associative and possibly commutative operation using a closure.
///
/// The closure receives two arguments `invec` and `inoutvec` as dynamically typed buffers. It
/// shall set `inoutvec` to the value of `f(invec, inoutvec)`, where `f` is a binary associative
/// operation.
///
/// If the operation is also commutative, setting `commute` to `true` may yield performance
/// benefits.
///
/// The closure is moved into a heap allocation that is stored inside the returned
/// `UserOperation`, next to the libffi closure whose code pointer is registered with
/// `MPI_Op_create`.
///
/// **Note:** If the closure panics, the entire program will abort.
///
/// # Standard section(s)
///
/// 5.9.5
pub fn new<F>(commute: bool, function: F) -> Self
where
F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
{
// Owns the user's closure together with the libffi closure that refers to it, so that both
// stay alive for as long as the anchor does.
struct ClosureAnchor<F> {
rust_closure: F,
_ffi_closure: Option<Closure<'static>>,
}
// Callback behind the generated code pointer. `args` holds one pointer per argument of the
// user function (`invec`, `inoutvec`, `len`, `datatype`, in that order); each of them points
// to the storage of the corresponding argument value. `user_function` is the Rust closure
// the call is forwarded to.
unsafe extern "C" fn trampoline<'a, F: Fn(DynBuffer, DynBufferMut) + Sync + 'a>(
cif: &libffi::low::ffi_cif,
_result: &mut c_void,
args: *const *const c_void,
user_function: &F,
) {
debug_assert_eq!(4, cif.nargs);
let (mut invec, mut inoutvec, len, datatype) = (
*(*args.offset(0) as *const *mut c_void),
*(*args.offset(1) as *const *mut c_void),
*(*args.offset(2) as *const *mut i32),
*(*args.offset(3) as *const *mut ffi::MPI_Datatype),
);
// `len` and `datatype` are passed as pointers; read the values they point to.
let len = *len;
let datatype = DatatypeRef::from_raw(*datatype);
if len == 0 {
// precautionary measure: ensure pointers are not null
invec = [].as_mut_ptr();
inoutvec = [].as_mut_ptr();
}
user_function(
DynBuffer::from_raw(invec, len, datatype),
DynBufferMut::from_raw(inoutvec, len, datatype),
)
}
// must box it to prevent moves
let mut anchor = Box::new(ClosureAnchor {
rust_closure: function,
_ffi_closure: None,
});
// Signature of the user function as seen by libffi: four pointer arguments, no result.
let args = [
Type::pointer(), // void *
Type::pointer(), // void *
Type::pointer(), // int32_t *
Type::pointer(), // MPI_Datatype *
];
#[allow(unused_mut)]
let mut cif = Cif::new(args.iter().cloned(), Type::void());
// MS-MPI uses "stdcall" calling convention on 32-bit x86
#[cfg(all(msmpi, target_arch = "x86"))]
cif.set_abi(libffi::raw::ffi_abi_FFI_STDCALL);
let op;
anchor._ffi_closure = Some(unsafe {
// The libffi closure borrows `anchor.rust_closure`, which lives inside the box created
// above.
let ffi_closure = Closure::new(cif, trampoline, &anchor.rust_closure);
// Only the created handle (second tuple element) is kept; the other element is ignored.
op = with_uninitialized(|op| {
ffi::MPI_Op_create(Some(*ffi_closure.instantiate_code_ptr()), commute as _, op)
})
.1;
mem::transmute(ffi_closure) // erase the lifetime
});
UserOperation {
op,
_anchor: anchor,
}
}
/// Creates a `UserOperation` from raw parts.
///
/// Here, `anchor` is an arbitrary object that is stored alongside the `MPI_Op`.
/// This can be used to attach finalizers to the object.
///
/// The returned value takes over `op`: the handle is passed to `MPI_Op_free` when the
/// `UserOperation` is dropped.
///
/// # Safety
/// `MPI_Op` must not be `MPI_OP_NULL`
pub unsafe fn from_raw<T: 'a>(op: MPI_Op, anchor: Box<T>) -> Self {
Self {
op,
_anchor: anchor,
}
}
}
/// An unsafe user-defined operation.
///
/// Unsafe user-defined operations are created from pointers to functions that have the unsafe
/// signatures of user functions defined in the MPI C bindings, `UnsafeUserFunction`.
///
/// The recommended way to create user-defined operations is through the safer `UserOperation`
/// type. This type can be used as a work-around in situations where the `libffi` dependency is not
/// available.
pub struct UnsafeUserOperation {
// Raw operation handle; it is passed to `MPI_Op_free` when the value is dropped.
op: MPI_Op,
}
impl fmt::Debug for UnsafeUserOperation {
    /// Formats the value as a tuple struct named `UnsafeUserOperation` holding the raw handle.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut repr = f.debug_tuple("UnsafeUserOperation");
        repr.field(&self.op);
        repr.finish()
    }
}
impl Drop for UnsafeUserOperation {
    /// Releases the operation handle through `MPI_Op_free`.
    fn drop(&mut self) {
        let handle = &mut self.op;
        unsafe {
            ffi::MPI_Op_free(handle);
        }
    }
}
unsafe impl AsRaw for UnsafeUserOperation {
type Raw = MPI_Op;
fn as_raw(&self) -> Self::Raw {
self.op
}
}
// Only a reference to an `UnsafeUserOperation` is an `Operation`; the value itself is not.
impl<'a> Operation for &'a UnsafeUserOperation {}
/// A raw pointer to a function that can be used to define an `UnsafeUserOperation`.
///
/// The arguments are, in order: `invec`, `inoutvec`, a pointer to the number of elements `len`
/// and a pointer to an `MPI_Datatype`.
///
/// This definition is used for every configuration except MS-MPI on 32-bit x86.
#[cfg(not(all(msmpi, target_arch = "x86")))]
pub type UnsafeUserFunction =
unsafe extern "C" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
/// A raw pointer to a function that can be used to define an `UnsafeUserOperation`.
///
/// The arguments are, in order: `invec`, `inoutvec`, a pointer to the number of elements `len`
/// and a pointer to an `MPI_Datatype`.
///
/// MS-MPI uses "stdcall" rather than "C" calling convention. "stdcall" is ignored on x86_64
/// Windows and the default calling convention is used instead. This definition is therefore
/// only selected for MS-MPI on 32-bit x86.
#[cfg(all(msmpi, target_arch = "x86"))]
pub type UnsafeUserFunction =
unsafe extern "stdcall" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
impl UnsafeUserOperation {
    /// Defines an unsafe operation from a function pointer. The operation must be associative.
    ///
    /// Shorthand for [`new`](#method.new) with `commute` set to `false`; refer to that method
    /// for more information.
    ///
    /// # Safety
    /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
    /// in all reductions that this `UnsafeUserOperation` is used in.
    pub unsafe fn associative(function: UnsafeUserFunction) -> Self {
        let commute = false;
        Self::new(commute, function)
    }

    /// Defines an unsafe operation from a function pointer. The operation must be both
    /// associative and commutative.
    ///
    /// Shorthand for [`new`](#method.new) with `commute` set to `true`; refer to that method
    /// for more information.
    ///
    /// # Safety
    /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
    /// in all reductions that this `UnsafeUserOperation` is used in.
    pub unsafe fn commutative(function: UnsafeUserFunction) -> Self {
        let commute = true;
        Self::new(commute, function)
    }

    /// Creates an associative and possibly commutative unsafe operation from a function pointer.
    ///
    /// The function receives raw `*mut c_void` pointers as `invec` and `inoutvec`, and the
    /// number of elements of those two vectors as a `*mut c_int` `len`. It shall set `inoutvec`
    /// to the value of `f(invec, inoutvec)`, where `f` is a binary associative operation.
    ///
    /// If the operation is also commutative, setting `commute` to `true` may yield performance
    /// benefits.
    ///
    /// **Note:** The user function is not allowed to panic.
    ///
    /// # Standard section(s)
    ///
    /// 5.9.5
    ///
    /// # Safety
    /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
    /// in all reductions that this `UnsafeUserOperation` is used in.
    pub unsafe fn new(commute: bool, function: UnsafeUserFunction) -> Self {
        // Only the handle written by `MPI_Op_create` is kept; the other tuple element is ignored.
        let (_, op) = with_uninitialized(|handle| {
            ffi::MPI_Op_create(Some(function), commute as _, handle)
        });
        UnsafeUserOperation { op }
    }
}
/// Performs a local reduction of `inbuf` into `inoutbuf` under the operation `op`.
///
/// The element count and the datatype passed to `MPI_Reduce_local` are taken from `inbuf`
/// alone; this function does not compare them against `inoutbuf`.
///
/// # Examples
///
/// See `examples/reduce.rs`
///
/// # Standard section(s)
///
/// 5.9.7
#[allow(clippy::needless_pass_by_value)]
pub fn reduce_local_into<S: ?Sized, R: ?Sized, O>(inbuf: &S, inoutbuf: &mut R, op: O)
where
    S: Buffer,
    R: BufferMut,
    O: Operation,
{
    unsafe {
        let source = inbuf.pointer();
        let target = inoutbuf.pointer_mut();
        let count = inbuf.count();
        // Keep the datatype value alive until the call below has returned.
        let datatype = inbuf.as_datatype();
        let raw_datatype = datatype.as_raw();
        let raw_op = op.as_raw();
        ffi::MPI_Reduce_local(source, target, count, raw_datatype, raw_op);
    }
}