#![allow(
clippy::missing_panics_doc,
clippy::ptr_as_ptr,
clippy::used_underscore_binding
)]
#[cfg(feature = "user-operations")]
use std::mem;
use std::os::raw::{c_int, c_void};
use std::{fmt, ptr};
#[cfg(feature = "user-operations")]
use libffi::middle::{Cif, Closure, Type};
use crate::ffi;
use crate::ffi::MPI_Op;
use crate::datatype::traits::{
Buffer, BufferMut, Equivalence, PartitionedBuffer, PartitionedBufferMut,
};
#[cfg(feature = "user-operations")]
use crate::datatype::{DatatypeRef, DynBuffer, DynBufferMut};
use crate::raw::traits::{AsRaw, FromRaw};
use crate::request::{Request, Scope, StaticScope};
use crate::topology::traits::{AsCommunicator, Communicator};
use crate::topology::{Process, Rank};
use crate::with_uninitialized;
pub mod traits {
pub use super::{CommunicatorCollectives, Operation, Root};
}
pub trait CommunicatorCollectives: Communicator {
fn barrier(&self) {
unsafe {
ffi::MPI_Barrier(self.as_raw());
}
}
fn all_gather_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: Buffer,
R: BufferMut,
{
unsafe {
ffi::MPI_Allgather(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count() / self.size(),
recvbuf.as_datatype().as_raw(),
self.as_raw(),
);
}
}
fn all_gather_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: Buffer,
R: PartitionedBufferMut,
{
unsafe {
ffi::MPI_Allgatherv(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.counts().as_ptr(),
recvbuf.displs().as_ptr(),
recvbuf.as_datatype().as_raw(),
self.as_raw(),
);
}
}
fn all_to_all_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: Buffer,
R: BufferMut,
{
let c_size = self.size();
unsafe {
ffi::MPI_Alltoall(
sendbuf.pointer(),
sendbuf.count() / c_size,
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count() / c_size,
recvbuf.as_datatype().as_raw(),
self.as_raw(),
);
}
}
fn all_to_all_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: PartitionedBuffer,
R: PartitionedBufferMut,
{
unsafe {
ffi::MPI_Alltoallv(
sendbuf.pointer(),
sendbuf.counts().as_ptr(),
sendbuf.displs().as_ptr(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.counts().as_ptr(),
recvbuf.displs().as_ptr(),
recvbuf.as_datatype().as_raw(),
self.as_raw(),
);
}
}
fn all_reduce_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where
S: Buffer,
R: BufferMut,
O: Operation,
{
unsafe {
ffi::MPI_Allreduce(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
);
}
}
fn reduce_scatter_block_into<S: ?Sized, R: ?Sized, O>(
&self,
sendbuf: &S,
recvbuf: &mut R,
op: O,
) where
S: Buffer,
R: BufferMut,
O: Operation,
{
assert_eq!(recvbuf.count() * self.size(), sendbuf.count());
unsafe {
ffi::MPI_Reduce_scatter_block(
sendbuf.pointer(),
recvbuf.pointer_mut(),
recvbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
);
}
}
fn scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where
S: Buffer,
R: BufferMut,
O: Operation,
{
unsafe {
ffi::MPI_Scan(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
);
}
}
fn exclusive_scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where
S: Buffer,
R: BufferMut,
O: Operation,
{
unsafe {
ffi::MPI_Exscan(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
);
}
}
fn immediate_barrier(&self) -> Request<'static> {
unsafe {
Request::from_raw(
with_uninitialized(|request| ffi::MPI_Ibarrier(self.as_raw(), request)).1,
StaticScope,
)
}
}
fn immediate_all_gather_into<'a, Sc, S: ?Sized, R: ?Sized>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
Sc: Scope<'a>,
{
unsafe {
let recvcount = recvbuf.count() / self.size();
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iallgather(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvcount,
recvbuf.as_datatype().as_raw(),
self.as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_all_gather_varcount_into<'a, Sc, S: ?Sized, R: ?Sized>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
S: 'a + Buffer,
R: 'a + PartitionedBufferMut,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iallgatherv(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.counts().as_ptr(),
recvbuf.displs().as_ptr(),
recvbuf.as_datatype().as_raw(),
self.as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_all_to_all_into<'a, Sc, S: ?Sized, R: ?Sized>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
Sc: Scope<'a>,
{
let c_size = self.size();
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Ialltoall(
sendbuf.pointer(),
sendbuf.count() / c_size,
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count() / c_size,
recvbuf.as_datatype().as_raw(),
self.as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_all_to_all_varcount_into<'a, Sc, S: ?Sized, R: ?Sized>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
S: 'a + PartitionedBuffer,
R: 'a + PartitionedBufferMut,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Ialltoallv(
sendbuf.pointer(),
sendbuf.counts().as_ptr(),
sendbuf.displs().as_ptr(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.counts().as_ptr(),
recvbuf.displs().as_ptr(),
recvbuf.as_datatype().as_raw(),
self.as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_all_reduce_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
O: 'a + Operation,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iallreduce(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_reduce_scatter_block_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
O: 'a + Operation,
Sc: Scope<'a>,
{
assert_eq!(recvbuf.count() * self.size(), sendbuf.count());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Ireduce_scatter_block(
sendbuf.pointer(),
recvbuf.pointer_mut(),
recvbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_scan_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
O: 'a + Operation,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iscan(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_exclusive_scan_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
O: 'a + Operation,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iexscan(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
request,
)
})
.1,
scope,
)
}
}
}
impl<C: Communicator> CommunicatorCollectives for C {}
pub trait Root: AsCommunicator {
fn root_rank(&self) -> Rank;
fn broadcast_into<Buf: ?Sized>(&self, buffer: &mut Buf)
where
Buf: BufferMut,
{
unsafe {
ffi::MPI_Bcast(
buffer.pointer_mut(),
buffer.count(),
buffer.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn gather_into<S: ?Sized>(&self, sendbuf: &S)
where
S: Buffer,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Gather(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
ptr::null_mut(),
0,
u8::equivalent_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn gather_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: Buffer,
R: BufferMut,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
let recvcount = recvbuf.count() / self.as_communicator().size();
ffi::MPI_Gather(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvcount,
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn gather_varcount_into<S: ?Sized>(&self, sendbuf: &S)
where
S: Buffer,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Gatherv(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
ptr::null_mut(),
ptr::null(),
ptr::null(),
u8::equivalent_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn gather_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: Buffer,
R: PartitionedBufferMut,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Gatherv(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.counts().as_ptr(),
recvbuf.displs().as_ptr(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn scatter_into<R: ?Sized>(&self, recvbuf: &mut R)
where
R: BufferMut,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Scatter(
ptr::null(),
0,
u8::equivalent_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn scatter_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: Buffer,
R: BufferMut,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
let sendcount = sendbuf.count() / self.as_communicator().size();
unsafe {
ffi::MPI_Scatter(
sendbuf.pointer(),
sendcount,
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn scatter_varcount_into<R: ?Sized>(&self, recvbuf: &mut R)
where
R: BufferMut,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Scatterv(
ptr::null(),
ptr::null(),
ptr::null(),
u8::equivalent_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn scatter_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: PartitionedBuffer,
R: BufferMut,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Scatterv(
sendbuf.pointer(),
sendbuf.counts().as_ptr(),
sendbuf.displs().as_ptr(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn reduce_into<S: ?Sized, O>(&self, sendbuf: &S, op: O)
where
S: Buffer,
O: Operation,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Reduce(
sendbuf.pointer(),
ptr::null_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn reduce_into_root<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where
S: Buffer,
R: BufferMut,
O: Operation,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Reduce(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn immediate_broadcast_into<'a, Sc, Buf: ?Sized>(
&self,
scope: Sc,
buf: &'a mut Buf,
) -> Request<'a, Sc>
where
Buf: 'a + BufferMut,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Ibcast(
buf.pointer_mut(),
buf.count(),
buf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_gather_into<'a, Sc, S: ?Sized>(&self, scope: Sc, sendbuf: &'a S) -> Request<'a, Sc>
where
S: 'a + Buffer,
Sc: Scope<'a>,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Igather(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
ptr::null_mut(),
0,
u8::equivalent_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_gather_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
Sc: Scope<'a>,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
let recvcount = recvbuf.count() / self.as_communicator().size();
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Igather(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvcount,
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_gather_varcount_into<'a, Sc, S: ?Sized>(
&self,
scope: Sc,
sendbuf: &'a S,
) -> Request<'a, Sc>
where
S: 'a + Buffer,
Sc: Scope<'a>,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Igatherv(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
ptr::null_mut(),
ptr::null(),
ptr::null(),
u8::equivalent_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_gather_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
S: 'a + Buffer,
R: 'a + PartitionedBufferMut,
Sc: Scope<'a>,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Igatherv(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.counts().as_ptr(),
recvbuf.displs().as_ptr(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_scatter_into<'a, Sc, R: ?Sized>(
&self,
scope: Sc,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
R: 'a + BufferMut,
Sc: Scope<'a>,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iscatter(
ptr::null(),
0,
u8::equivalent_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_scatter_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
Sc: Scope<'a>,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
let sendcount = sendbuf.count() / self.as_communicator().size();
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iscatter(
sendbuf.pointer(),
sendcount,
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_scatter_varcount_into<'a, Sc, R: ?Sized>(
&self,
scope: Sc,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
R: 'a + BufferMut,
Sc: Scope<'a>,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iscatterv(
ptr::null(),
ptr::null(),
ptr::null(),
u8::equivalent_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_scatter_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where
S: 'a + PartitionedBuffer,
R: 'a + BufferMut,
Sc: Scope<'a>,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iscatterv(
sendbuf.pointer(),
sendbuf.counts().as_ptr(),
sendbuf.displs().as_ptr(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_reduce_into<'a, Sc, S: ?Sized, O>(
&self,
scope: Sc,
sendbuf: &'a S,
op: O,
) -> Request<'a, Sc>
where
S: 'a + Buffer,
O: 'a + Operation,
Sc: Scope<'a>,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Ireduce(
sendbuf.pointer(),
ptr::null_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_reduce_into_root<'a, Sc, S: ?Sized, R: ?Sized, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
O: 'a + Operation,
Sc: Scope<'a>,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Ireduce(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
}
impl<'a, C: 'a + Communicator> Root for Process<'a, C> {
fn root_rank(&self) -> Rank {
self.rank()
}
}
pub trait Operation: AsRaw<Raw = MPI_Op> {
fn is_commutative(&self) -> bool {
unsafe {
let mut commute = 0;
ffi::MPI_Op_commutative(self.as_raw(), &mut commute);
commute != 0
}
}
}
impl<'a, T: 'a + Operation> Operation for &'a T {}
#[derive(Copy, Clone)]
pub struct SystemOperation(MPI_Op);
macro_rules! system_operation_constructors {
($($ctor:ident => $val:path),*) => (
$(pub fn $ctor() -> SystemOperation {
SystemOperation(unsafe { $val })
})*
)
}
impl SystemOperation {
system_operation_constructors! {
max => ffi::RSMPI_MAX,
min => ffi::RSMPI_MIN,
sum => ffi::RSMPI_SUM,
product => ffi::RSMPI_PROD,
logical_and => ffi::RSMPI_LAND,
bitwise_and => ffi::RSMPI_BAND,
logical_or => ffi::RSMPI_LOR,
bitwise_or => ffi::RSMPI_BOR,
logical_xor => ffi::RSMPI_LXOR,
bitwise_xor => ffi::RSMPI_BXOR
}
}
unsafe impl AsRaw for SystemOperation {
type Raw = MPI_Op;
fn as_raw(&self) -> Self::Raw {
self.0
}
}
impl Operation for SystemOperation {}
trait Erased {}
impl<T> Erased for T {}
#[cfg(feature = "user-operations")]
pub struct UserOperation<'a> {
op: MPI_Op,
_anchor: Box<dyn Erased + 'a>, }
#[cfg(feature = "user-operations")]
impl<'a> fmt::Debug for UserOperation<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("UserOperation").field(&self.op).finish()
}
}
#[cfg(feature = "user-operations")]
impl<'a> Drop for UserOperation<'a> {
fn drop(&mut self) {
unsafe {
ffi::MPI_Op_free(&mut self.op);
}
}
}
#[cfg(feature = "user-operations")]
unsafe impl<'a> AsRaw for UserOperation<'a> {
type Raw = MPI_Op;
fn as_raw(&self) -> Self::Raw {
self.op
}
}
#[cfg(feature = "user-operations")]
impl<'a, 'b> Operation for &'b UserOperation<'a> {}
#[cfg(feature = "user-operations")]
impl<'a> UserOperation<'a> {
pub fn associative<F>(function: F) -> Self
where
F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
{
Self::new(false, function)
}
pub fn commutative<F>(function: F) -> Self
where
F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
{
Self::new(true, function)
}
pub fn new<F>(commute: bool, function: F) -> Self
where
F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
{
struct ClosureAnchor<F> {
rust_closure: F,
_ffi_closure: Option<Closure<'static>>,
}
unsafe extern "C" fn trampoline<'a, F: Fn(DynBuffer, DynBufferMut) + Sync + 'a>(
cif: &libffi::low::ffi_cif,
_result: &mut c_void,
args: *const *const c_void,
user_function: &F,
) {
debug_assert_eq!(4, cif.nargs);
let (mut invec, mut inoutvec, len, datatype) = (
*(*args.offset(0) as *const *mut c_void),
*(*args.offset(1) as *const *mut c_void),
*(*args.offset(2) as *const *mut i32),
*(*args.offset(3) as *const *mut ffi::MPI_Datatype),
);
let len = *len;
let datatype = DatatypeRef::from_raw(*datatype);
if len == 0 {
invec = [].as_mut_ptr();
inoutvec = [].as_mut_ptr();
}
user_function(
DynBuffer::from_raw(invec, len, datatype),
DynBufferMut::from_raw(inoutvec, len, datatype),
)
}
let mut anchor = Box::new(ClosureAnchor {
rust_closure: function,
_ffi_closure: None,
});
let args = [
Type::pointer(), Type::pointer(), Type::pointer(), Type::pointer(), ];
#[allow(unused_mut)]
let mut cif = Cif::new(args.iter().cloned(), Type::void());
#[cfg(all(msmpi, target_arch = "x86"))]
cif.set_abi(libffi::raw::ffi_abi_FFI_STDCALL);
let op;
anchor._ffi_closure = Some(unsafe {
let ffi_closure = Closure::new(cif, trampoline, &anchor.rust_closure);
op = with_uninitialized(|op| {
ffi::MPI_Op_create(Some(*ffi_closure.instantiate_code_ptr()), commute as _, op)
})
.1;
mem::transmute(ffi_closure) });
UserOperation {
op,
_anchor: anchor,
}
}
pub unsafe fn from_raw<T: 'a>(op: MPI_Op, anchor: Box<T>) -> Self {
Self {
op,
_anchor: anchor,
}
}
}
pub struct UnsafeUserOperation {
op: MPI_Op,
}
impl fmt::Debug for UnsafeUserOperation {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("UnsafeUserOperation")
.field(&self.op)
.finish()
}
}
impl Drop for UnsafeUserOperation {
fn drop(&mut self) {
unsafe {
ffi::MPI_Op_free(&mut self.op);
}
}
}
unsafe impl AsRaw for UnsafeUserOperation {
type Raw = MPI_Op;
fn as_raw(&self) -> Self::Raw {
self.op
}
}
impl<'a> Operation for &'a UnsafeUserOperation {}
#[cfg(not(all(msmpi, target_arch = "x86")))]
pub type UnsafeUserFunction =
unsafe extern "C" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
#[cfg(all(msmpi, target_arch = "x86"))]
pub type UnsafeUserFunction =
unsafe extern "stdcall" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
impl UnsafeUserOperation {
pub unsafe fn associative(function: UnsafeUserFunction) -> Self {
Self::new(false, function)
}
pub unsafe fn commutative(function: UnsafeUserFunction) -> Self {
Self::new(true, function)
}
pub unsafe fn new(commute: bool, function: UnsafeUserFunction) -> Self {
UnsafeUserOperation {
op: with_uninitialized(|op| ffi::MPI_Op_create(Some(function), commute as _, op)).1,
}
}
}
#[allow(clippy::needless_pass_by_value)]
pub fn reduce_local_into<S: ?Sized, R: ?Sized, O>(inbuf: &S, inoutbuf: &mut R, op: O)
where
S: Buffer,
R: BufferMut,
O: Operation,
{
unsafe {
ffi::MPI_Reduce_local(
inbuf.pointer(),
inoutbuf.pointer_mut(),
inbuf.count(),
inbuf.as_datatype().as_raw(),
op.as_raw(),
);
}
}