#[cfg(feature = "user-operations")]
use std::mem;
use std::{
ffi::{CString, NulError},
fmt,
os::raw::{c_char, c_int, c_void},
process::Command,
ptr,
};
use conv::ConvUtil;
#[cfg(feature = "user-operations")]
use libffi::middle::{Cif, Closure, Type};
#[cfg(feature = "user-operations")]
use crate::datatype::{DatatypeRef, DynBuffer, DynBufferMut};
use crate::{
datatype::traits::*,
ffi,
ffi::MPI_Op,
raw::traits::*,
request::{Request, Scope, StaticScope},
topology::{traits::*, InterCommunicator, Process, Rank},
with_uninitialized, MpiError,
};
pub mod traits {
pub use super::{CommunicatorCollectives, Operation, Root};
}
pub trait CommunicatorCollectives: Communicator {
fn barrier(&self) {
unsafe {
ffi::MPI_Barrier(self.as_raw());
}
}
fn all_gather_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: Buffer,
R: BufferMut,
{
unsafe {
ffi::MPI_Allgather(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count() / self.target_size(),
recvbuf.as_datatype().as_raw(),
self.as_raw(),
);
}
}
fn all_gather_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: Buffer,
R: PartitionedBufferMut,
{
unsafe {
ffi::MPI_Allgatherv(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.counts().as_ptr(),
recvbuf.displs().as_ptr(),
recvbuf.as_datatype().as_raw(),
self.as_raw(),
);
}
}
fn all_to_all_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: Buffer,
R: BufferMut,
{
let c_size = self.target_size();
unsafe {
ffi::MPI_Alltoall(
sendbuf.pointer(),
sendbuf.count() / c_size,
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count() / c_size,
recvbuf.as_datatype().as_raw(),
self.as_raw(),
);
}
}
fn all_to_all_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: PartitionedBuffer,
R: PartitionedBufferMut,
{
unsafe {
ffi::MPI_Alltoallv(
sendbuf.pointer(),
sendbuf.counts().as_ptr(),
sendbuf.displs().as_ptr(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.counts().as_ptr(),
recvbuf.displs().as_ptr(),
recvbuf.as_datatype().as_raw(),
self.as_raw(),
);
}
}
fn all_reduce_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where
S: Buffer,
R: BufferMut,
O: Operation,
{
unsafe {
ffi::MPI_Allreduce(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
);
}
}
fn reduce_scatter_block_into<S: ?Sized, R: ?Sized, O>(
&self,
sendbuf: &S,
recvbuf: &mut R,
op: O,
) where
S: Buffer,
R: BufferMut,
O: Operation,
{
assert_eq!(recvbuf.count() * self.target_size(), sendbuf.count());
unsafe {
ffi::MPI_Reduce_scatter_block(
sendbuf.pointer(),
recvbuf.pointer_mut(),
recvbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
);
}
}
fn scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where
S: Buffer,
R: BufferMut,
O: Operation,
{
unsafe {
ffi::MPI_Scan(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
);
}
}
fn exclusive_scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where
S: Buffer,
R: BufferMut,
O: Operation,
{
unsafe {
ffi::MPI_Exscan(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
);
}
}
fn immediate_barrier(&self) -> Request<'static, ()> {
unsafe {
Request::from_raw(
with_uninitialized(|request| ffi::MPI_Ibarrier(self.as_raw(), request)).1,
&(),
StaticScope,
)
}
}
fn immediate_all_gather_into<'a, S: ?Sized, R: ?Sized, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
Sc: Scope<'a>,
{
unsafe {
let recvcount = recvbuf.count() / self.target_size();
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iallgather(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvcount,
recvbuf.as_datatype().as_raw(),
self.as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn immediate_all_gather_varcount_into<'a, S: ?Sized, R: ?Sized, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where
S: 'a + Buffer,
R: 'a + PartitionedBufferMut,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iallgatherv(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.counts().as_ptr(),
recvbuf.displs().as_ptr(),
recvbuf.as_datatype().as_raw(),
self.as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn immediate_all_to_all_into<'a, S: ?Sized, R: ?Sized, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
Sc: Scope<'a>,
{
let c_size = self.target_size();
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Ialltoall(
sendbuf.pointer(),
sendbuf.count() / c_size,
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count() / c_size,
recvbuf.as_datatype().as_raw(),
self.as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn immediate_all_to_all_varcount_into<'a, S: ?Sized, R: ?Sized, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where
S: 'a + PartitionedBuffer,
R: 'a + PartitionedBufferMut,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Ialltoallv(
sendbuf.pointer(),
sendbuf.counts().as_ptr(),
sendbuf.displs().as_ptr(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.counts().as_ptr(),
recvbuf.displs().as_ptr(),
recvbuf.as_datatype().as_raw(),
self.as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn immediate_all_reduce_into<'a, S: ?Sized, R: ?Sized, O, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, R, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
O: 'a + Operation,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iallreduce(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn immediate_reduce_scatter_block_into<'a, S: ?Sized, R: ?Sized, O, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, R, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
O: 'a + Operation,
Sc: Scope<'a>,
{
assert_eq!(recvbuf.count() * self.target_size(), sendbuf.count());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Ireduce_scatter_block(
sendbuf.pointer(),
recvbuf.pointer_mut(),
recvbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn immediate_scan_into<'a, S: ?Sized, R: ?Sized, O, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, R, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
O: 'a + Operation,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iscan(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn immediate_exclusive_scan_into<'a, S: ?Sized, R: ?Sized, O, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, R, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
O: 'a + Operation,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iexscan(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
}
impl<C: Communicator + ?Sized> CommunicatorCollectives for C {}
pub trait Root: AsCommunicator {
fn root_rank(&self) -> Rank;
fn broadcast_into<Buf: ?Sized>(&self, buffer: &mut Buf)
where
Buf: BufferMut,
{
unsafe {
ffi::MPI_Bcast(
buffer.pointer_mut(),
buffer.count(),
buffer.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn gather_into<S: ?Sized>(&self, sendbuf: &S)
where
S: Buffer,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Gather(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
ptr::null_mut(),
0,
u8::equivalent_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn gather_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: Buffer,
R: BufferMut,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
let recvcount = recvbuf.count() / self.as_communicator().target_size();
ffi::MPI_Gather(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvcount,
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn gather_varcount_into<S: ?Sized>(&self, sendbuf: &S)
where
S: Buffer,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Gatherv(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
ptr::null_mut(),
ptr::null(),
ptr::null(),
u8::equivalent_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn gather_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: Buffer,
R: PartitionedBufferMut,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Gatherv(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.counts().as_ptr(),
recvbuf.displs().as_ptr(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn scatter_into<R: ?Sized>(&self, recvbuf: &mut R)
where
R: BufferMut,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Scatter(
ptr::null(),
0,
u8::equivalent_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn scatter_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: Buffer,
R: BufferMut,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
let sendcount = sendbuf.count() / self.as_communicator().target_size();
unsafe {
ffi::MPI_Scatter(
sendbuf.pointer(),
sendcount,
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn scatter_varcount_into<R: ?Sized>(&self, recvbuf: &mut R)
where
R: BufferMut,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Scatterv(
ptr::null(),
ptr::null(),
ptr::null(),
u8::equivalent_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn scatter_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
where
S: PartitionedBuffer,
R: BufferMut,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Scatterv(
sendbuf.pointer(),
sendbuf.counts().as_ptr(),
sendbuf.displs().as_ptr(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn reduce_into<S: ?Sized, O>(&self, sendbuf: &S, op: O)
where
S: Buffer,
O: Operation,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Reduce(
sendbuf.pointer(),
ptr::null_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn reduce_into_root<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where
S: Buffer,
R: BufferMut,
O: Operation,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
ffi::MPI_Reduce(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
);
}
}
fn immediate_broadcast_into<'a, Buf: ?Sized, Sc>(
&self,
scope: Sc,
buf: &'a mut Buf,
) -> Request<'a, Buf, Sc>
where
Buf: 'a + BufferMut,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Ibcast(
buf.pointer_mut(),
buf.count(),
buf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
buf,
scope,
)
}
}
fn immediate_gather_into<'a, S: ?Sized, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
) -> Request<'a, S, Sc>
where
S: 'a + Buffer,
Sc: Scope<'a>,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Igather(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
ptr::null_mut(),
0,
u8::equivalent_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
sendbuf,
scope,
)
}
}
fn immediate_gather_into_root<'a, S: ?Sized, R: ?Sized, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
Sc: Scope<'a>,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
let recvcount = recvbuf.count() / self.as_communicator().target_size();
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Igather(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvcount,
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn immediate_gather_varcount_into<'a, Sc, S: ?Sized>(
&self,
scope: Sc,
sendbuf: &'a S,
) -> Request<'a, S, Sc>
where
S: 'a + Buffer,
Sc: Scope<'a>,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Igatherv(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
ptr::null_mut(),
ptr::null(),
ptr::null(),
u8::equivalent_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
sendbuf,
scope,
)
}
}
fn immediate_gather_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where
S: 'a + Buffer,
R: 'a + PartitionedBufferMut,
Sc: Scope<'a>,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Igatherv(
sendbuf.pointer(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.counts().as_ptr(),
recvbuf.displs().as_ptr(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn immediate_scatter_into<'a, Sc, R: ?Sized>(
&self,
scope: Sc,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where
R: 'a + BufferMut,
Sc: Scope<'a>,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iscatter(
ptr::null(),
0,
u8::equivalent_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn immediate_scatter_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
Sc: Scope<'a>,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
let sendcount = sendbuf.count() / self.as_communicator().target_size();
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iscatter(
sendbuf.pointer(),
sendcount,
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn immediate_scatter_varcount_into<'a, Sc, R: ?Sized>(
&self,
scope: Sc,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where
R: 'a + BufferMut,
Sc: Scope<'a>,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iscatterv(
ptr::null(),
ptr::null(),
ptr::null(),
u8::equivalent_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn immediate_scatter_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where
S: 'a + PartitionedBuffer,
R: 'a + BufferMut,
Sc: Scope<'a>,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Iscatterv(
sendbuf.pointer(),
sendbuf.counts().as_ptr(),
sendbuf.displs().as_ptr(),
sendbuf.as_datatype().as_raw(),
recvbuf.pointer_mut(),
recvbuf.count(),
recvbuf.as_datatype().as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn immediate_reduce_into<'a, Sc, S: ?Sized, O>(
&self,
scope: Sc,
sendbuf: &'a S,
op: O,
) -> Request<'a, S, Sc>
where
S: 'a + Buffer,
O: 'a + Operation,
Sc: Scope<'a>,
{
assert_ne!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Ireduce(
sendbuf.pointer(),
ptr::null_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
sendbuf,
scope,
)
}
}
fn immediate_reduce_into_root<'a, Sc, S: ?Sized, R: ?Sized, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, R, Sc>
where
S: 'a + Buffer,
R: 'a + BufferMut,
O: 'a + Operation,
Sc: Scope<'a>,
{
assert_eq!(self.as_communicator().rank(), self.root_rank());
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Ireduce(
sendbuf.pointer(),
recvbuf.pointer_mut(),
sendbuf.count(),
sendbuf.as_datatype().as_raw(),
op.as_raw(),
self.root_rank(),
self.as_communicator().as_raw(),
request,
)
})
.1,
recvbuf,
scope,
)
}
}
fn spawn(&self, command: &Command, maxprocs: Rank) -> Result<InterCommunicator, MpiError> {
assert_eq!(
command.get_envs().len(),
0,
"Support for environment variables not yet implemented"
);
let prog = CString::new(command.get_program().to_string_lossy().as_bytes())?;
let mut args: Vec<CString> = command
.get_args()
.map(|os| CString::new(os.to_string_lossy().as_bytes()))
.collect::<Result<Vec<CString>, NulError>>()?;
let mut argv: Vec<*mut c_char> = args
.iter_mut()
.map(|s| s.as_ptr() as *mut c_char)
.chain(std::iter::once(ptr::null_mut()))
.collect();
let mut result = unsafe { ffi::RSMPI_COMM_NULL };
let mut errcodes: Vec<c_int> =
vec![0; maxprocs.value_as().expect("maxprocs should be positive")];
unsafe {
ffi::MPI_Comm_spawn(
prog.as_ptr(),
argv.as_mut_ptr(),
maxprocs,
ffi::RSMPI_INFO_NULL,
self.root_rank(),
self.as_communicator().as_raw(),
&mut result,
errcodes.as_mut_ptr(),
);
}
let fails = errcodes
.into_iter()
.filter(|&c| c != ffi::MPI_SUCCESS as i32)
.count();
if fails > 0 {
Err(MpiError::Spawn(Rank::try_from(fails).unwrap(), maxprocs))
} else {
Ok(unsafe { InterCommunicator::from_raw(result) })
}
}
fn spawn_multiple(
&self,
commands: &[Command],
maxprocs: &[Rank],
) -> Result<InterCommunicator, MpiError> {
assert_eq!(commands.len(), maxprocs.len());
let progs = commands
.iter()
.map(|c| CString::new(c.get_program().to_string_lossy().as_bytes()))
.collect::<Result<Vec<CString>, NulError>>()?;
let mut progp: Vec<*mut c_char> = progs.iter().map(|p| p.as_ptr() as *mut c_char).collect();
let mut argss = commands
.iter()
.map(|c| {
c.get_args()
.map(|os| CString::new(os.to_string_lossy().as_bytes()))
.collect::<Result<Vec<CString>, NulError>>()
})
.collect::<Result<Vec<Vec<CString>>, NulError>>()?;
let mut argvs: Vec<Vec<*mut c_char>> = argss
.iter_mut()
.map(|args| {
args.iter_mut()
.map(|a| a.as_ptr() as *mut c_char)
.chain(std::iter::once(ptr::null_mut()))
.collect()
})
.collect();
let mut argvv: Vec<*mut *mut c_char> =
argvs.iter_mut().map(|argv| argv.as_mut_ptr()).collect();
let infos: Vec<_> = (0..commands.len())
.map(|_| unsafe { ffi::RSMPI_INFO_NULL })
.collect();
let mut result = unsafe { ffi::RSMPI_COMM_NULL };
let sum_maxprocs: Rank = maxprocs.iter().sum();
let mut errcodes = vec![0; usize::try_from(sum_maxprocs).unwrap()];
unsafe {
ffi::MPI_Comm_spawn_multiple(
progs.len().value_as().unwrap(),
progp.as_mut_ptr(),
argvv.as_mut_ptr(),
maxprocs.as_ptr(),
infos.as_ptr(),
self.root_rank(),
self.as_communicator().as_raw(),
&mut result,
errcodes.as_mut_ptr(),
);
}
let fails = errcodes
.into_iter()
.filter(|&c| c != ffi::MPI_SUCCESS as i32)
.count();
if fails > 0 {
Err(MpiError::Spawn(
Rank::try_from(fails).unwrap(),
sum_maxprocs,
))
} else {
Ok(unsafe { InterCommunicator::from_raw(result) })
}
}
}
impl<'a> Root for Process<'a> {
fn root_rank(&self) -> Rank {
self.rank()
}
}
pub trait Operation: AsRaw<Raw = MPI_Op> {
fn is_commutative(&self) -> bool {
unsafe {
let mut commute = 0;
ffi::MPI_Op_commutative(self.as_raw(), &mut commute);
commute != 0
}
}
}
impl<'a, T: 'a + Operation> Operation for &'a T {}
#[derive(Copy, Clone)]
pub struct SystemOperation(MPI_Op);
macro_rules! system_operation_constructors {
($($ctor:ident => $val:path),*) => (
$(pub fn $ctor() -> SystemOperation {
SystemOperation(unsafe { $val })
})*
)
}
impl SystemOperation {
system_operation_constructors! {
max => ffi::RSMPI_MAX,
min => ffi::RSMPI_MIN,
sum => ffi::RSMPI_SUM,
product => ffi::RSMPI_PROD,
logical_and => ffi::RSMPI_LAND,
bitwise_and => ffi::RSMPI_BAND,
logical_or => ffi::RSMPI_LOR,
bitwise_or => ffi::RSMPI_BOR,
logical_xor => ffi::RSMPI_LXOR,
bitwise_xor => ffi::RSMPI_BXOR
}
}
unsafe impl AsRaw for SystemOperation {
type Raw = MPI_Op;
fn as_raw(&self) -> Self::Raw {
self.0
}
}
impl Operation for SystemOperation {}
#[cfg(feature = "user-operations")]
trait Erased {}
#[cfg(feature = "user-operations")]
impl<T> Erased for T {}
#[cfg(feature = "user-operations")]
pub struct UserOperation<'a> {
op: MPI_Op,
_anchor: Box<dyn Erased + 'a>, }
#[cfg(feature = "user-operations")]
impl<'a> fmt::Debug for UserOperation<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("UserOperation").field(&self.op).finish()
}
}
#[cfg(feature = "user-operations")]
impl<'a> Drop for UserOperation<'a> {
fn drop(&mut self) {
unsafe {
ffi::MPI_Op_free(&mut self.op);
}
}
}
#[cfg(feature = "user-operations")]
unsafe impl<'a> AsRaw for UserOperation<'a> {
type Raw = MPI_Op;
fn as_raw(&self) -> Self::Raw {
self.op
}
}
#[cfg(feature = "user-operations")]
impl<'a, 'b> Operation for &'b UserOperation<'a> {}
#[cfg(feature = "user-operations")]
impl<'a> UserOperation<'a> {
pub fn associative<F>(function: F) -> Self
where
F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
{
Self::new(false, function)
}
pub fn commutative<F>(function: F) -> Self
where
F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
{
Self::new(true, function)
}
pub fn new<F>(commute: bool, function: F) -> Self
where
F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
{
struct ClosureAnchor<F> {
rust_closure: F,
_ffi_closure: Option<Closure<'static>>,
}
let mut anchor = Box::new(ClosureAnchor {
rust_closure: function,
_ffi_closure: None,
});
let args = [
Type::pointer(), Type::pointer(), Type::pointer(), Type::pointer(), ];
#[allow(unused_mut)]
let mut cif = Cif::new(args.iter().cloned(), Type::void());
#[cfg(all(msmpi, target_arch = "x86"))]
cif.set_abi(libffi::raw::ffi_abi_FFI_STDCALL);
unsafe extern "C" fn trampoline<'a, F: Fn(DynBuffer, DynBufferMut) + Sync + 'a>(
cif: &libffi::low::ffi_cif,
_result: &mut c_void,
args: *const *const c_void,
user_function: &F,
) {
debug_assert_eq!(4, cif.nargs);
let (mut invec, mut inoutvec, len, datatype) = (
*(*args.offset(0) as *const *mut c_void),
*(*args.offset(1) as *const *mut c_void),
*(*args.offset(2) as *const *mut i32),
*(*args.offset(3) as *const *mut ffi::MPI_Datatype),
);
let len = *len;
let datatype = DatatypeRef::from_raw(*datatype);
if len == 0 {
invec = [].as_mut_ptr();
inoutvec = [].as_mut_ptr();
}
user_function(
DynBuffer::from_raw(invec, len, datatype),
DynBufferMut::from_raw(inoutvec, len, datatype),
)
}
let op;
anchor._ffi_closure = Some(unsafe {
let ffi_closure = Closure::new(cif, trampoline, &anchor.rust_closure);
op = with_uninitialized(|op| {
ffi::MPI_Op_create(Some(*ffi_closure.instantiate_code_ptr()), commute as _, op)
})
.1;
mem::transmute(ffi_closure) });
UserOperation {
op,
_anchor: anchor,
}
}
pub unsafe fn from_raw<T: 'a>(op: MPI_Op, anchor: Box<T>) -> Self {
Self {
op,
_anchor: anchor,
}
}
}
pub struct UnsafeUserOperation {
op: MPI_Op,
}
impl fmt::Debug for UnsafeUserOperation {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("UnsafeUserOperation")
.field(&self.op)
.finish()
}
}
impl Drop for UnsafeUserOperation {
fn drop(&mut self) {
unsafe {
ffi::MPI_Op_free(&mut self.op);
}
}
}
unsafe impl AsRaw for UnsafeUserOperation {
type Raw = MPI_Op;
fn as_raw(&self) -> Self::Raw {
self.op
}
}
impl<'a> Operation for &'a UnsafeUserOperation {}
#[cfg(not(all(msmpi, target_arch = "x86")))]
pub type UnsafeUserFunction =
unsafe extern "C" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
#[cfg(all(msmpi, target_arch = "x86"))]
pub type UnsafeUserFunction =
unsafe extern "stdcall" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
impl UnsafeUserOperation {
pub unsafe fn associative(function: UnsafeUserFunction) -> Self {
Self::new(false, function)
}
pub unsafe fn commutative(function: UnsafeUserFunction) -> Self {
Self::new(true, function)
}
pub unsafe fn new(commute: bool, function: UnsafeUserFunction) -> Self {
UnsafeUserOperation {
op: with_uninitialized(|op| ffi::MPI_Op_create(Some(function), commute as _, op)).1,
}
}
}
#[allow(clippy::needless_pass_by_value)]
pub fn reduce_local_into<S: ?Sized, R: ?Sized, O>(inbuf: &S, inoutbuf: &mut R, op: O)
where
S: Buffer,
R: BufferMut,
O: Operation,
{
unsafe {
ffi::MPI_Reduce_local(
inbuf.pointer(),
inoutbuf.pointer_mut(),
inbuf.count(),
inbuf.as_datatype().as_raw(),
op.as_raw(),
);
}
}