use crate::datatype::MpiDatatype;
use crate::error::{Error, Result};
use crate::ffi;
use crate::persistent::PersistentRequest;
use crate::request::Request;
use crate::status::Status;
use crate::ReduceOp;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum SplitType {
Shared = 0,
}
pub struct Communicator {
handle: i32,
}
unsafe impl Send for Communicator {}
unsafe impl Sync for Communicator {}
impl Communicator {
pub const UNDEFINED: i32 = -1;
pub(crate) fn world() -> Self {
Communicator {
handle: unsafe { ffi::ferrompi_comm_world() },
}
}
pub fn raw_handle(&self) -> i32 {
self.handle
}
pub fn rank(&self) -> i32 {
let mut rank: i32 = 0;
let ret = unsafe { ffi::ferrompi_comm_rank(self.handle, &mut rank) };
debug_assert_eq!(ret, 0, "MPI_Comm_rank failed with code {ret}");
rank
}
pub fn size(&self) -> i32 {
let mut size: i32 = 0;
let ret = unsafe { ffi::ferrompi_comm_size(self.handle, &mut size) };
debug_assert_eq!(ret, 0, "MPI_Comm_size failed with code {ret}");
size
}
pub fn processor_name(&self) -> Result<String> {
let mut buf = [0u8; 256];
let mut len: i32 = 0;
let ret = unsafe {
ffi::ferrompi_get_processor_name(buf.as_mut_ptr().cast::<std::ffi::c_char>(), &mut len)
};
Error::check(ret)?;
let len = (len.max(0) as usize).min(buf.len());
let s = std::str::from_utf8(&buf[..len])
.map_err(|_| Error::Internal("Invalid UTF-8 in processor name".into()))?;
Ok(s.to_string())
}
pub fn topology(&self, mpi: &crate::Mpi) -> Result<crate::TopologyInfo> {
crate::topology::gather_topology(self, mpi)
}
pub fn duplicate(&self) -> Result<Self> {
let mut new_handle: i32 = 0;
let ret = unsafe { ffi::ferrompi_comm_dup(self.handle, &mut new_handle) };
Error::check(ret)?;
Ok(Communicator { handle: new_handle })
}
pub fn split(&self, color: i32, key: i32) -> Result<Option<Communicator>> {
let mut new_handle: i32 = 0;
let ret = unsafe { ffi::ferrompi_comm_split(self.handle, color, key, &mut new_handle) };
Error::check(ret)?;
if new_handle < 0 {
Ok(None)
} else {
Ok(Some(Communicator { handle: new_handle }))
}
}
pub fn split_type(&self, split_type: SplitType, key: i32) -> Result<Option<Communicator>> {
let mut new_handle: i32 = 0;
let ret = unsafe {
ffi::ferrompi_comm_split_type(self.handle, split_type as i32, key, &mut new_handle)
};
Error::check(ret)?;
if new_handle < 0 {
Ok(None)
} else {
Ok(Some(Communicator { handle: new_handle }))
}
}
pub fn split_shared(&self) -> Result<Communicator> {
self.split_type(SplitType::Shared, self.rank())?
.ok_or_else(|| Error::Internal("split_shared returned null communicator".into()))
}
pub fn barrier(&self) -> Result<()> {
let ret = unsafe { ffi::ferrompi_barrier(self.handle) };
Error::check(ret)
}
pub fn send<T: MpiDatatype>(&self, data: &[T], dest: i32, tag: i32) -> Result<()> {
let ret = unsafe {
ffi::ferrompi_send(
data.as_ptr().cast::<std::ffi::c_void>(),
data.len() as i64,
T::TAG as i32,
dest,
tag,
self.handle,
)
};
Error::check(ret)
}
pub fn recv<T: MpiDatatype>(
&self,
data: &mut [T],
source: i32,
tag: i32,
) -> Result<(i32, i32, i64)> {
let mut actual_source: i32 = 0;
let mut actual_tag: i32 = 0;
let mut actual_count: i64 = 0;
let ret = unsafe {
ffi::ferrompi_recv(
data.as_mut_ptr().cast::<std::ffi::c_void>(),
data.len() as i64,
T::TAG as i32,
source,
tag,
self.handle,
&mut actual_source,
&mut actual_tag,
&mut actual_count,
)
};
Error::check(ret)?;
Ok((actual_source, actual_tag, actual_count))
}
pub fn isend<T: MpiDatatype>(&self, data: &[T], dest: i32, tag: i32) -> Result<Request> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_isend(
data.as_ptr().cast::<std::ffi::c_void>(),
data.len() as i64,
T::TAG as i32,
dest,
tag,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn irecv<T: MpiDatatype>(&self, data: &mut [T], source: i32, tag: i32) -> Result<Request> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_irecv(
data.as_mut_ptr().cast::<std::ffi::c_void>(),
data.len() as i64,
T::TAG as i32,
source,
tag,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn sendrecv<T: MpiDatatype>(
&self,
send: &[T],
dest: i32,
sendtag: i32,
recv: &mut [T],
source: i32,
recvtag: i32,
) -> Result<(i32, i32, i64)> {
let mut actual_source: i32 = 0;
let mut actual_tag: i32 = 0;
let mut actual_count: i64 = 0;
let ret = unsafe {
ffi::ferrompi_sendrecv(
send.as_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
dest,
sendtag,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recv.len() as i64,
T::TAG as i32,
source,
recvtag,
self.handle,
&mut actual_source,
&mut actual_tag,
&mut actual_count,
)
};
Error::check(ret)?;
Ok((actual_source, actual_tag, actual_count))
}
pub fn probe<T: MpiDatatype>(&self, source: i32, tag: i32) -> Result<Status> {
let mut actual_source: i32 = 0;
let mut actual_tag: i32 = 0;
let mut count: i64 = 0;
let ret = unsafe {
ffi::ferrompi_probe(
source,
tag,
self.handle,
&mut actual_source,
&mut actual_tag,
&mut count,
T::TAG as i32,
)
};
Error::check(ret)?;
Ok(Status {
source: actual_source,
tag: actual_tag,
count,
})
}
pub fn iprobe<T: MpiDatatype>(&self, source: i32, tag: i32) -> Result<Option<Status>> {
let mut flag: i32 = 0;
let mut actual_source: i32 = 0;
let mut actual_tag: i32 = 0;
let mut count: i64 = 0;
let ret = unsafe {
ffi::ferrompi_iprobe(
source,
tag,
self.handle,
&mut flag,
&mut actual_source,
&mut actual_tag,
&mut count,
T::TAG as i32,
)
};
Error::check(ret)?;
if flag != 0 {
Ok(Some(Status {
source: actual_source,
tag: actual_tag,
count,
}))
} else {
Ok(None)
}
}
pub fn broadcast<T: MpiDatatype>(&self, data: &mut [T], root: i32) -> Result<()> {
let ret = unsafe {
ffi::ferrompi_bcast(
data.as_mut_ptr().cast::<std::ffi::c_void>(),
data.len() as i64,
T::TAG as i32,
root,
self.handle,
)
};
Error::check(ret)
}
pub fn reduce<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
root: i32,
) -> Result<()> {
if send.len() != recv.len() {
return Err(Error::InvalidBuffer);
}
let ret = unsafe {
ffi::ferrompi_reduce(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
op as i32,
root,
self.handle,
)
};
Error::check(ret)
}
pub fn reduce_scalar<T: MpiDatatype>(&self, value: T, op: ReduceOp, root: i32) -> Result<T> {
let send = [value];
let mut recv = [value]; self.reduce(&send, &mut recv, op, root)?;
Ok(recv[0])
}
pub fn reduce_inplace<T: MpiDatatype>(
&self,
data: &mut [T],
op: ReduceOp,
root: i32,
) -> Result<()> {
let is_root = if self.rank() == root { 1i32 } else { 0i32 };
let ret = unsafe {
ffi::ferrompi_reduce_inplace(
data.as_mut_ptr().cast::<std::ffi::c_void>(),
data.len() as i64,
T::TAG as i32,
op as i32,
root,
is_root,
self.handle,
)
};
Error::check(ret)
}
pub fn allreduce<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
) -> Result<()> {
if send.len() != recv.len() {
return Err(Error::InvalidBuffer);
}
let ret = unsafe {
ffi::ferrompi_allreduce(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
)
};
Error::check(ret)
}
pub fn allreduce_inplace<T: MpiDatatype>(&self, data: &mut [T], op: ReduceOp) -> Result<()> {
let ret = unsafe {
ffi::ferrompi_allreduce_inplace(
data.as_mut_ptr().cast::<std::ffi::c_void>(),
data.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
)
};
Error::check(ret)
}
pub fn allreduce_scalar<T: MpiDatatype>(&self, value: T, op: ReduceOp) -> Result<T> {
let send = [value];
let mut recv = [value]; self.allreduce(&send, &mut recv, op)?;
Ok(recv[0])
}
pub fn scan<T: MpiDatatype>(&self, send: &[T], recv: &mut [T], op: ReduceOp) -> Result<()> {
if send.len() != recv.len() {
return Err(Error::InvalidBuffer);
}
let ret = unsafe {
ffi::ferrompi_scan(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
)
};
Error::check(ret)
}
pub fn exscan<T: MpiDatatype>(&self, send: &[T], recv: &mut [T], op: ReduceOp) -> Result<()> {
if send.len() != recv.len() {
return Err(Error::InvalidBuffer);
}
let ret = unsafe {
ffi::ferrompi_exscan(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
)
};
Error::check(ret)
}
pub fn scan_scalar<T: MpiDatatype>(&self, value: T, op: ReduceOp) -> Result<T> {
let send = [value];
let mut recv = [value]; self.scan(&send, &mut recv, op)?;
Ok(recv[0])
}
pub fn exscan_scalar<T: MpiDatatype>(&self, value: T, op: ReduceOp) -> Result<T> {
let send = [value];
let mut recv = [value]; self.exscan(&send, &mut recv, op)?;
Ok(recv[0])
}
pub fn gather<T: MpiDatatype>(&self, send: &[T], recv: &mut [T], root: i32) -> Result<()> {
let ret = unsafe {
ffi::ferrompi_gather(
send.as_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
root,
self.handle,
)
};
Error::check(ret)
}
pub fn allgather<T: MpiDatatype>(&self, send: &[T], recv: &mut [T]) -> Result<()> {
let ret = unsafe {
ffi::ferrompi_allgather(
send.as_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
self.handle,
)
};
Error::check(ret)
}
pub fn scatter<T: MpiDatatype>(&self, send: &[T], recv: &mut [T], root: i32) -> Result<()> {
let ret = unsafe {
ffi::ferrompi_scatter(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recv.len() as i64,
T::TAG as i32,
root,
self.handle,
)
};
Error::check(ret)
}
pub fn alltoall<T: MpiDatatype>(&self, send: &[T], recv: &mut [T]) -> Result<()> {
let size = self.size() as usize;
if send.len() != recv.len() || send.len() % size != 0 {
return Err(Error::InvalidBuffer);
}
let count = (send.len() / size) as i64;
let ret = unsafe {
ffi::ferrompi_alltoall(
send.as_ptr().cast::<std::ffi::c_void>(),
count,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
count,
T::TAG as i32,
self.handle,
)
};
Error::check(ret)
}
pub fn reduce_scatter_block<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
) -> Result<()> {
let size = self.size() as usize;
if send.len() != recv.len() * size {
return Err(Error::InvalidBuffer);
}
let ret = unsafe {
ffi::ferrompi_reduce_scatter_block(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recv.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
)
};
Error::check(ret)
}
pub fn gatherv<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
recvcounts: &[i32],
displs: &[i32],
root: i32,
) -> Result<()> {
let ret = unsafe {
ffi::ferrompi_gatherv(
send.as_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recvcounts.as_ptr(),
displs.as_ptr(),
T::TAG as i32,
root,
self.handle,
)
};
Error::check(ret)
}
pub fn scatterv<T: MpiDatatype>(
&self,
send: &[T],
sendcounts: &[i32],
displs: &[i32],
recv: &mut [T],
root: i32,
) -> Result<()> {
let ret = unsafe {
ffi::ferrompi_scatterv(
send.as_ptr().cast::<std::ffi::c_void>(),
sendcounts.as_ptr(),
displs.as_ptr(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recv.len() as i64,
T::TAG as i32,
root,
self.handle,
)
};
Error::check(ret)
}
pub fn allgatherv<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
recvcounts: &[i32],
displs: &[i32],
) -> Result<()> {
let ret = unsafe {
ffi::ferrompi_allgatherv(
send.as_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recvcounts.as_ptr(),
displs.as_ptr(),
T::TAG as i32,
self.handle,
)
};
Error::check(ret)
}
pub fn alltoallv<T: MpiDatatype>(
&self,
send: &[T],
sendcounts: &[i32],
sdispls: &[i32],
recv: &mut [T],
recvcounts: &[i32],
rdispls: &[i32],
) -> Result<()> {
let ret = unsafe {
ffi::ferrompi_alltoallv(
send.as_ptr().cast::<std::ffi::c_void>(),
sendcounts.as_ptr(),
sdispls.as_ptr(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recvcounts.as_ptr(),
rdispls.as_ptr(),
T::TAG as i32,
self.handle,
)
};
Error::check(ret)
}
pub fn ibroadcast<T: MpiDatatype>(&self, data: &mut [T], root: i32) -> Result<Request> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_ibcast(
data.as_mut_ptr().cast::<std::ffi::c_void>(),
data.len() as i64,
T::TAG as i32,
root,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn iallreduce<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
) -> Result<Request> {
if send.len() != recv.len() {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_iallreduce(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn ireduce<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
root: i32,
) -> Result<Request> {
if send.len() != recv.len() {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_ireduce(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
op as i32,
root,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn igather<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
root: i32,
) -> Result<Request> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_igather(
send.as_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
root,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn iallgather<T: MpiDatatype>(&self, send: &[T], recv: &mut [T]) -> Result<Request> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_iallgather(
send.as_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn iscatter<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
root: i32,
) -> Result<Request> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_iscatter(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recv.len() as i64,
T::TAG as i32,
root,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn ibarrier(&self) -> Result<Request> {
let mut request_handle: i64 = 0;
let ret = unsafe { ffi::ferrompi_ibarrier(self.handle, &mut request_handle) };
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn iscan<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
) -> Result<Request> {
if send.len() != recv.len() {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_iscan(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn iexscan<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
) -> Result<Request> {
if send.len() != recv.len() {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_iexscan(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn ialltoall<T: MpiDatatype>(&self, send: &[T], recv: &mut [T]) -> Result<Request> {
let size = self.size() as usize;
if send.len() != recv.len() || send.len() % size != 0 {
return Err(Error::InvalidBuffer);
}
let count = (send.len() / size) as i64;
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_ialltoall(
send.as_ptr().cast::<std::ffi::c_void>(),
count,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
count,
T::TAG as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn igatherv<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
recvcounts: &[i32],
displs: &[i32],
root: i32,
) -> Result<Request> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_igatherv(
send.as_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recvcounts.as_ptr(),
displs.as_ptr(),
T::TAG as i32,
root,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn iscatterv<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
sendcounts: &[i32],
displs: &[i32],
root: i32,
) -> Result<Request> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_iscatterv(
send.as_ptr().cast::<std::ffi::c_void>(),
sendcounts.as_ptr(),
displs.as_ptr(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recv.len() as i64,
T::TAG as i32,
root,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn iallgatherv<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
recvcounts: &[i32],
displs: &[i32],
) -> Result<Request> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_iallgatherv(
send.as_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recvcounts.as_ptr(),
displs.as_ptr(),
T::TAG as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn ialltoallv<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
sendcounts: &[i32],
sdispls: &[i32],
recvcounts: &[i32],
rdispls: &[i32],
) -> Result<Request> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_ialltoallv(
send.as_ptr().cast::<std::ffi::c_void>(),
sendcounts.as_ptr(),
sdispls.as_ptr(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recvcounts.as_ptr(),
rdispls.as_ptr(),
T::TAG as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn ireduce_scatter_block<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
) -> Result<Request> {
let size = self.size() as usize;
if send.len() != recv.len() * size {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_ireduce_scatter_block(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recv.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(Request::new(request_handle))
}
pub fn bcast_init<T: MpiDatatype>(
&self,
data: &mut [T],
root: i32,
) -> Result<PersistentRequest> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_bcast_init(
data.as_mut_ptr().cast::<std::ffi::c_void>(),
data.len() as i64,
T::TAG as i32,
root,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn allreduce_init<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
) -> Result<PersistentRequest> {
if send.len() != recv.len() {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_allreduce_init(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn allreduce_init_inplace<T: MpiDatatype>(
&self,
data: &mut [T],
op: ReduceOp,
) -> Result<PersistentRequest> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_allreduce_init_inplace(
data.as_mut_ptr().cast::<std::ffi::c_void>(),
data.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn gather_init<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
root: i32,
) -> Result<PersistentRequest> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_gather_init(
send.as_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
root,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn reduce_init<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
root: i32,
) -> Result<PersistentRequest> {
if send.len() != recv.len() {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_reduce_init(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
op as i32,
root,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn scatter_init<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
root: i32,
) -> Result<PersistentRequest> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_scatter_init(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recv.len() as i64,
T::TAG as i32,
root,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn allgather_init<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
) -> Result<PersistentRequest> {
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_allgather_init(
send.as_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn scan_init<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
) -> Result<PersistentRequest> {
if send.len() != recv.len() {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_scan_init(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn exscan_init<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
) -> Result<PersistentRequest> {
if send.len() != recv.len() {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_exscan_init(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn alltoall_init<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
) -> Result<PersistentRequest> {
if send.len() != recv.len() {
return Err(Error::InvalidBuffer);
}
let size = self.size() as usize;
if size == 0 || send.len() % size != 0 {
return Err(Error::InvalidBuffer);
}
let count_per_rank = send.len() / size;
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_alltoall_init(
send.as_ptr().cast::<std::ffi::c_void>(),
count_per_rank as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
count_per_rank as i64,
T::TAG as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn gatherv_init<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
recvcounts: &[i32],
displs: &[i32],
root: i32,
) -> Result<PersistentRequest> {
if recvcounts.len() != displs.len() {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_gatherv_init(
send.as_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recvcounts.as_ptr(),
displs.as_ptr(),
T::TAG as i32,
root,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn scatterv_init<T: MpiDatatype>(
&self,
send: &[T],
sendcounts: &[i32],
displs: &[i32],
recv: &mut [T],
root: i32,
) -> Result<PersistentRequest> {
if sendcounts.len() != displs.len() {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_scatterv_init(
send.as_ptr().cast::<std::ffi::c_void>(),
sendcounts.as_ptr(),
displs.as_ptr(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recv.len() as i64,
T::TAG as i32,
root,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn allgatherv_init<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
recvcounts: &[i32],
displs: &[i32],
) -> Result<PersistentRequest> {
if recvcounts.len() != displs.len() {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_allgatherv_init(
send.as_ptr().cast::<std::ffi::c_void>(),
send.len() as i64,
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recvcounts.as_ptr(),
displs.as_ptr(),
T::TAG as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn alltoallv_init<T: MpiDatatype>(
&self,
send: &[T],
sendcounts: &[i32],
sdispls: &[i32],
recv: &mut [T],
recvcounts: &[i32],
rdispls: &[i32],
) -> Result<PersistentRequest> {
if sendcounts.len() != sdispls.len() || recvcounts.len() != rdispls.len() {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_alltoallv_init(
send.as_ptr().cast::<std::ffi::c_void>(),
sendcounts.as_ptr(),
sdispls.as_ptr(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recvcounts.as_ptr(),
rdispls.as_ptr(),
T::TAG as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn reduce_scatter_block_init<T: MpiDatatype>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
) -> Result<PersistentRequest> {
let size = self.size() as usize;
if size == 0 || send.len() != recv.len() * size {
return Err(Error::InvalidBuffer);
}
let mut request_handle: i64 = 0;
let ret = unsafe {
ffi::ferrompi_reduce_scatter_block_init(
send.as_ptr().cast::<std::ffi::c_void>(),
recv.as_mut_ptr().cast::<std::ffi::c_void>(),
recv.len() as i64,
T::TAG as i32,
op as i32,
self.handle,
&mut request_handle,
)
};
Error::check(ret)?;
Ok(PersistentRequest::new(request_handle))
}
pub fn abort(&self, errorcode: i32) -> ! {
unsafe { ffi::ferrompi_abort(self.handle, errorcode) };
std::process::exit(errorcode)
}
}
impl Drop for Communicator {
fn drop(&mut self) {
if self.handle != 0 {
unsafe { ffi::ferrompi_comm_free(self.handle) };
}
}
}
const _: () = {
#[allow(dead_code)]
fn assert_send_sync<T: Send + Sync>() {}
#[allow(dead_code)]
fn check() {
assert_send_sync::<Communicator>();
}
};
#[cfg(test)]
mod tests {
use super::*;
fn dummy_comm() -> Communicator {
Communicator { handle: 0 }
}
#[test]
fn reduce_mismatched_buffers_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 5]; let result = comm.reduce(&send, &mut recv, ReduceOp::Sum, 0);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn allreduce_mismatched_buffers_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 5];
let result = comm.allreduce(&send, &mut recv, ReduceOp::Sum);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn scan_mismatched_buffers_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 5];
let result = comm.scan(&send, &mut recv, ReduceOp::Sum);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn exscan_mismatched_buffers_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 5];
let result = comm.exscan(&send, &mut recv, ReduceOp::Sum);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn split_type_shared_repr_value_and_traits() {
assert_eq!(SplitType::Shared as i32, 0);
let st = SplitType::Shared;
let cloned = st;
assert_eq!(cloned, SplitType::Shared);
assert_eq!(format!("{:?}", st), "Shared");
}
#[test]
fn communicator_undefined_is_negative_one() {
assert_eq!(Communicator::UNDEFINED, -1);
}
#[test]
fn communicator_raw_handle_returns_correct_value() {
let comm = dummy_comm();
assert_eq!(comm.raw_handle(), 0);
}
#[test]
fn iallreduce_mismatched_buffers_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 5];
let result = comm.iallreduce(&send, &mut recv, ReduceOp::Sum);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn ireduce_mismatched_buffers_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 5];
let result = comm.ireduce(&send, &mut recv, ReduceOp::Sum, 0);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn iscan_mismatched_buffers_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 5];
let result = comm.iscan(&send, &mut recv, ReduceOp::Sum);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn iexscan_mismatched_buffers_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 5];
let result = comm.iexscan(&send, &mut recv, ReduceOp::Sum);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn allreduce_init_mismatched_buffers_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 5];
let result = comm.allreduce_init(&send, &mut recv, ReduceOp::Sum);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn reduce_init_mismatched_buffers_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 5];
let result = comm.reduce_init(&send, &mut recv, ReduceOp::Sum, 0);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn scan_init_mismatched_buffers_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 5];
let result = comm.scan_init(&send, &mut recv, ReduceOp::Sum);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn exscan_init_mismatched_buffers_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 5];
let result = comm.exscan_init(&send, &mut recv, ReduceOp::Sum);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn alltoall_init_mismatched_buffers_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 5]; let result = comm.alltoall_init(&send, &mut recv);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn gatherv_init_mismatched_counts_displs_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 40];
let recvcounts = vec![10i32; 4];
let displs = vec![0i32, 10, 20]; let result = comm.gatherv_init(&send, &mut recv, &recvcounts, &displs, 0);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn scatterv_init_mismatched_counts_displs_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 40];
let sendcounts = vec![10i32; 4];
let displs = vec![0i32, 10, 20]; let mut recv = vec![0.0f64; 10];
let result = comm.scatterv_init(&send, &sendcounts, &displs, &mut recv, 0);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn allgatherv_init_mismatched_counts_displs_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 10];
let mut recv = vec![0.0f64; 40];
let recvcounts = vec![10i32; 4];
let displs = vec![0i32, 10, 20]; let result = comm.allgatherv_init(&send, &mut recv, &recvcounts, &displs);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn alltoallv_init_mismatched_send_counts_displs_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 40];
let sendcounts = vec![10i32; 4];
let sdispls = vec![0i32, 10, 20]; let mut recv = vec![0.0f64; 40];
let recvcounts = vec![10i32; 4];
let rdispls = vec![0i32, 10, 20, 30];
let result = comm.alltoallv_init(
&send,
&sendcounts,
&sdispls,
&mut recv,
&recvcounts,
&rdispls,
);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
#[test]
fn alltoallv_init_mismatched_recv_counts_displs_returns_invalid_buffer() {
let comm = dummy_comm();
let send = vec![1.0f64; 40];
let sendcounts = vec![10i32; 4];
let sdispls = vec![0i32, 10, 20, 30];
let mut recv = vec![0.0f64; 40];
let recvcounts = vec![10i32; 4];
let rdispls = vec![0i32, 10, 20]; let result = comm.alltoallv_init(
&send,
&sendcounts,
&sdispls,
&mut recv,
&recvcounts,
&rdispls,
);
assert!(matches!(result, Err(Error::InvalidBuffer)));
}
}