#![allow(clippy::missing_panics_doc, clippy::ptr_as_ptr)]
use std::alloc::{self, Layout};
use std::mem::{transmute, MaybeUninit};
use std::{fmt, ptr};
use conv::ConvUtil;
use super::{Count, Tag};
use crate::ffi;
use crate::ffi::{MPI_Message, MPI_Status};
use crate::datatype::traits::{
AsDatatype, Buffer, BufferMut, Collection, Datatype, Equivalence, Pointer,
};
use crate::raw::traits::{AsRaw, AsRawMut};
use crate::request::{Request, Scope, StaticScope};
use crate::topology::traits::{AsCommunicator, Communicator};
use crate::topology::{AnyProcess, CommunicatorRelation, Process, Rank};
use crate::{with_uninitialized, with_uninitialized2};
pub mod traits {
pub use super::{Destination, MatchedReceiveVec, Source};
}
pub unsafe trait Source: AsCommunicator {
fn source_rank(&self) -> Rank;
fn probe_with_tag(&self, tag: Tag) -> Status {
unsafe {
Status(
with_uninitialized(|status| {
ffi::MPI_Probe(
self.source_rank(),
tag,
self.as_communicator().as_raw(),
status,
)
})
.1,
)
}
}
fn probe(&self) -> Status {
self.probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
}
fn matched_probe_with_tag(&self, tag: Tag) -> (Message, Status) {
let (_, message, status) = unsafe {
with_uninitialized2(|message, status| {
ffi::MPI_Mprobe(
self.source_rank(),
tag,
self.as_communicator().as_raw(),
message,
status,
)
})
};
(Message(message), Status(status))
}
fn matched_probe(&self) -> (Message, Status) {
self.matched_probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
}
fn receive_with_tag<Msg>(&self, tag: Tag) -> (Msg, Status)
where
Msg: Equivalence,
{
unsafe {
let (_, msg, status) = with_uninitialized2(|msg, status| {
ffi::MPI_Recv(
msg as _,
1,
Msg::equivalent_datatype().as_raw(),
self.source_rank(),
tag,
self.as_communicator().as_raw(),
status,
)
});
let status = Status(status);
if status.count(Msg::equivalent_datatype()) == 0 {
panic!("Received an empty message.");
}
(msg, status)
}
}
fn receive<Msg>(&self) -> (Msg, Status)
where
Msg: Equivalence,
{
self.receive_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
}
fn receive_into_with_tag<Buf: ?Sized>(&self, buf: &mut Buf, tag: Tag) -> Status
where
Buf: BufferMut,
{
unsafe {
Status(
with_uninitialized(|status| {
ffi::MPI_Recv(
buf.pointer_mut(),
buf.count(),
buf.as_datatype().as_raw(),
self.source_rank(),
tag,
self.as_communicator().as_raw(),
status,
)
})
.1,
)
}
}
fn receive_into<Buf: ?Sized>(&self, buf: &mut Buf) -> Status
where
Buf: BufferMut,
{
self.receive_into_with_tag(buf, unsafe { ffi::RSMPI_ANY_TAG })
}
fn receive_vec_with_tag<Msg>(&self, tag: Tag) -> (Vec<Msg>, Status)
where
Msg: Equivalence,
{
self.matched_probe_with_tag(tag).matched_receive_vec()
}
fn receive_vec<Msg>(&self) -> (Vec<Msg>, Status)
where
Msg: Equivalence,
{
self.receive_vec_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
}
fn immediate_receive_into_with_tag<'a, Sc, Buf: ?Sized>(
&self,
scope: Sc,
buf: &'a mut Buf,
tag: Tag,
) -> Request<'a, Sc>
where
Buf: 'a + BufferMut,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Irecv(
buf.pointer_mut(),
buf.count(),
buf.as_datatype().as_raw(),
self.source_rank(),
tag,
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_receive_into<'a, Sc, Buf: ?Sized>(
&self,
scope: Sc,
buf: &'a mut Buf,
) -> Request<'a, Sc>
where
Buf: 'a + BufferMut,
Sc: Scope<'a>,
{
self.immediate_receive_into_with_tag(scope, buf, unsafe { ffi::RSMPI_ANY_TAG })
}
fn immediate_receive_with_tag<Msg>(&self, tag: Tag) -> ReceiveFuture<Msg>
where
Msg: Equivalence,
{
unsafe {
let val = alloc::alloc(Layout::new::<Msg>()) as *mut Msg;
let (_, request) = with_uninitialized(|request| {
ffi::MPI_Irecv(
val as _,
1,
Msg::equivalent_datatype().as_raw(),
self.source_rank(),
tag,
self.as_communicator().as_raw(),
request,
)
});
ReceiveFuture {
val,
req: Request::from_raw(request, StaticScope),
}
}
}
fn immediate_receive<Msg>(&self) -> ReceiveFuture<Msg>
where
Msg: Equivalence,
{
self.immediate_receive_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
}
fn immediate_probe_with_tag(&self, tag: Tag) -> Option<Status> {
unsafe {
let mut status = MaybeUninit::uninit();
let (_, flag) = with_uninitialized(|flag| {
ffi::MPI_Iprobe(
self.source_rank(),
tag,
self.as_communicator().as_raw(),
flag,
status.as_mut_ptr(),
)
});
if flag == 0 {
None
} else {
Some(Status(status.assume_init()))
}
}
}
fn immediate_probe(&self) -> Option<Status> {
self.immediate_probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
}
fn immediate_matched_probe_with_tag(&self, tag: Tag) -> Option<(Message, Status)> {
unsafe {
let mut message = MaybeUninit::uninit();
let mut status = MaybeUninit::uninit();
let (_, flag) = with_uninitialized(|flag| {
ffi::MPI_Improbe(
self.source_rank(),
tag,
self.as_communicator().as_raw(),
flag,
message.as_mut_ptr(),
status.as_mut_ptr(),
)
});
if flag == 0 {
None
} else {
Some((Message(message.assume_init()), Status(status.assume_init())))
}
}
}
fn immediate_matched_probe(&self) -> Option<(Message, Status)> {
self.immediate_matched_probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
}
}
/// `AnyProcess` acts as a wildcard source: it reports `RSMPI_ANY_SOURCE` as its rank.
unsafe impl<'a, C: 'a + Communicator> Source for AnyProcess<'a, C> {
    fn source_rank(&self) -> Rank {
        // Reading the extern static requires `unsafe`.
        unsafe { ffi::RSMPI_ANY_SOURCE }
    }
}
/// A specific `Process` is a source identified by its own rank.
unsafe impl<'a, C: 'a + Communicator> Source for Process<'a, C> {
    fn source_rank(&self) -> Rank {
        self.rank()
    }
}
pub trait Destination: AsCommunicator {
fn destination_rank(&self) -> Rank;
fn send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
where
Buf: Buffer,
{
unsafe {
ffi::MPI_Send(
buf.pointer(),
buf.count(),
buf.as_datatype().as_raw(),
self.destination_rank(),
tag,
self.as_communicator().as_raw(),
);
}
}
fn send<Buf: ?Sized>(&self, buf: &Buf)
where
Buf: Buffer,
{
self.send_with_tag(buf, Tag::default())
}
fn buffered_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
where
Buf: Buffer,
{
unsafe {
ffi::MPI_Bsend(
buf.pointer(),
buf.count(),
buf.as_datatype().as_raw(),
self.destination_rank(),
tag,
self.as_communicator().as_raw(),
);
}
}
fn buffered_send<Buf: ?Sized>(&self, buf: &Buf)
where
Buf: Buffer,
{
self.buffered_send_with_tag(buf, Tag::default())
}
fn synchronous_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
where
Buf: Buffer,
{
unsafe {
ffi::MPI_Ssend(
buf.pointer(),
buf.count(),
buf.as_datatype().as_raw(),
self.destination_rank(),
tag,
self.as_communicator().as_raw(),
);
}
}
fn synchronous_send<Buf: ?Sized>(&self, buf: &Buf)
where
Buf: Buffer,
{
self.synchronous_send_with_tag(buf, Tag::default())
}
fn ready_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
where
Buf: Buffer,
{
unsafe {
ffi::MPI_Rsend(
buf.pointer(),
buf.count(),
buf.as_datatype().as_raw(),
self.destination_rank(),
tag,
self.as_communicator().as_raw(),
);
}
}
fn ready_send<Buf: ?Sized>(&self, buf: &Buf)
where
Buf: Buffer,
{
self.ready_send_with_tag(buf, Tag::default())
}
fn immediate_send_with_tag<'a, Sc, Buf: ?Sized>(
&self,
scope: Sc,
buf: &'a Buf,
tag: Tag,
) -> Request<'a, Sc>
where
Buf: 'a + Buffer,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Isend(
buf.pointer(),
buf.count(),
buf.as_datatype().as_raw(),
self.destination_rank(),
tag,
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_send<'a, Sc, Buf: ?Sized>(&self, scope: Sc, buf: &'a Buf) -> Request<'a, Sc>
where
Buf: 'a + Buffer,
Sc: Scope<'a>,
{
self.immediate_send_with_tag(scope, buf, Tag::default())
}
fn immediate_buffered_send_with_tag<'a, Sc, Buf: ?Sized>(
&self,
scope: Sc,
buf: &'a Buf,
tag: Tag,
) -> Request<'a, Sc>
where
Buf: 'a + Buffer,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Ibsend(
buf.pointer(),
buf.count(),
buf.as_datatype().as_raw(),
self.destination_rank(),
tag,
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_buffered_send<'a, Sc, Buf: ?Sized>(
&self,
scope: Sc,
buf: &'a Buf,
) -> Request<'a, Sc>
where
Buf: 'a + Buffer,
Sc: Scope<'a>,
{
self.immediate_buffered_send_with_tag(scope, buf, Tag::default())
}
fn immediate_synchronous_send_with_tag<'a, Sc, Buf: ?Sized>(
&self,
scope: Sc,
buf: &'a Buf,
tag: Tag,
) -> Request<'a, Sc>
where
Buf: 'a + Buffer,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Issend(
buf.pointer(),
buf.count(),
buf.as_datatype().as_raw(),
self.destination_rank(),
tag,
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_synchronous_send<'a, Sc, Buf: ?Sized>(
&self,
scope: Sc,
buf: &'a Buf,
) -> Request<'a, Sc>
where
Buf: 'a + Buffer,
Sc: Scope<'a>,
{
self.immediate_synchronous_send_with_tag(scope, buf, Tag::default())
}
fn immediate_ready_send_with_tag<'a, Sc, Buf: ?Sized>(
&self,
scope: Sc,
buf: &'a Buf,
tag: Tag,
) -> Request<'a, Sc>
where
Buf: 'a + Buffer,
Sc: Scope<'a>,
{
unsafe {
Request::from_raw(
with_uninitialized(|request| {
ffi::MPI_Irsend(
buf.pointer(),
buf.count(),
buf.as_datatype().as_raw(),
self.destination_rank(),
tag,
self.as_communicator().as_raw(),
request,
)
})
.1,
scope,
)
}
}
fn immediate_ready_send<'a, Sc, Buf: ?Sized>(&self, scope: Sc, buf: &'a Buf) -> Request<'a, Sc>
where
Buf: 'a + Buffer,
Sc: Scope<'a>,
{
self.immediate_ready_send_with_tag(scope, buf, Tag::default())
}
}
/// A specific `Process` is a destination identified by its own rank.
impl<'a, C: 'a + Communicator> Destination for Process<'a, C> {
    fn destination_rank(&self) -> Rank {
        self.rank()
    }
}
/// Result information of a probe or receive operation; a thin wrapper around a raw
/// `MPI_Status`.
#[derive(Copy, Clone)]
pub struct Status(MPI_Status);
impl Status {
pub fn from_raw(status: MPI_Status) -> Status {
Status(status)
}
pub fn source_rank(&self) -> Rank {
self.0.MPI_SOURCE
}
pub fn tag(&self) -> Tag {
self.0.MPI_TAG
}
pub fn count<D: Datatype>(&self, d: D) -> Count {
unsafe { with_uninitialized(|count| ffi::MPI_Get_count(&self.0, d.as_raw(), count)).1 }
}
}
/// Formats a `Status` as `Status { source_rank: .., tag: .. }`.
impl fmt::Debug for Status {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let source_rank = self.source_rank();
        let tag = self.tag();
        f.write_fmt(format_args!(
            "Status {{ source_rank: {}, tag: {} }}",
            source_rank, tag
        ))
    }
}
/// Handle to a message obtained from a matched probe; a thin wrapper around a raw
/// `MPI_Message`.
///
/// The handle is expected to be consumed by one of the `matched_receive*` methods:
/// the `Drop` implementation asserts that the raw handle equals `RSMPI_MESSAGE_NULL`.
#[must_use]
pub struct Message(MPI_Message);
impl Message {
    /// Returns `true` if the raw handle equals `RSMPI_MESSAGE_NO_PROC`.
    pub fn is_no_proc(&self) -> bool {
        let no_proc = unsafe { ffi::RSMPI_MESSAGE_NO_PROC };
        self.as_raw() == no_proc
    }

    /// Receives the matched message as a single `Msg` value using `MPI_Mrecv`.
    ///
    /// # Panics
    ///
    /// Panics if the status reports that zero elements were received.
    pub fn matched_receive<Msg>(mut self) -> (Msg, Status)
    where
        Msg: Equivalence,
    {
        let (_, value, raw_status) = unsafe {
            with_uninitialized2(|value, raw_status| {
                ffi::MPI_Mrecv(
                    value as _,
                    1,
                    Msg::equivalent_datatype().as_raw(),
                    self.as_raw_mut(),
                    raw_status,
                )
            })
        };
        let status = Status(raw_status);
        match status.count(Msg::equivalent_datatype()) {
            0 => panic!("Received an empty message."),
            _ => (value, status),
        }
    }

    /// Receives the matched message into `buf` using `MPI_Mrecv` and returns the
    /// resulting `Status`.
    ///
    /// # Panics
    ///
    /// Panics if the raw handle is not `RSMPI_MESSAGE_NULL` after the call.
    pub fn matched_receive_into<Buf: ?Sized>(mut self, buf: &mut Buf) -> Status
    where
        Buf: BufferMut,
    {
        let raw_status = unsafe {
            let (_, raw_status) = with_uninitialized(|out| {
                ffi::MPI_Mrecv(
                    buf.pointer_mut(),
                    buf.count(),
                    buf.as_datatype().as_raw(),
                    self.as_raw_mut(),
                    out,
                )
            });
            assert_eq!(self.as_raw(), ffi::RSMPI_MESSAGE_NULL);
            raw_status
        };
        Status(raw_status)
    }

    /// Starts receiving the matched message into `buf` using `MPI_Imrecv`.
    ///
    /// The returned `Request` is registered with `scope` and borrows `buf` for `'a`.
    ///
    /// # Panics
    ///
    /// Panics if the raw handle is not `RSMPI_MESSAGE_NULL` after the call.
    pub fn immediate_matched_receive_into<'a, Sc, Buf: ?Sized + 'a>(
        mut self,
        scope: Sc,
        buf: &'a mut Buf,
    ) -> Request<'a, Sc>
    where
        Buf: BufferMut,
        Sc: Scope<'a>,
    {
        unsafe {
            let (_, raw_request) = with_uninitialized(|out| {
                ffi::MPI_Imrecv(
                    buf.pointer_mut(),
                    buf.count(),
                    buf.as_datatype().as_raw(),
                    self.as_raw_mut(),
                    out,
                )
            });
            assert_eq!(self.as_raw(), ffi::RSMPI_MESSAGE_NULL);
            Request::from_raw(raw_request, scope)
        }
    }
}
unsafe impl AsRaw for Message {
type Raw = MPI_Message;
fn as_raw(&self) -> Self::Raw {
self.0
}
}
unsafe impl AsRawMut for Message {
fn as_raw_mut(&mut self) -> *mut <Self as AsRaw>::Raw {
&mut self.0
}
}
/// Enforces that a matched message is received before it goes out of scope.
///
/// # Panics
///
/// Panics if the raw handle is not `RSMPI_MESSAGE_NULL` when the `Message` is dropped,
/// unless the thread is already unwinding from another panic.
impl Drop for Message {
    fn drop(&mut self) {
        // A second panic while unwinding would abort the process and hide the original
        // failure, so the check is skipped when a panic is already in flight.
        if std::thread::panicking() {
            return;
        }
        assert_eq!(
            self.as_raw(),
            unsafe { ffi::RSMPI_MESSAGE_NULL },
            "matched message dropped without receiving."
        );
    }
}
/// Receiving the outcome of a matched probe into a newly allocated `Vec`.
pub trait MatchedReceiveVec {
    /// Consumes `self` and returns the received elements together with the `Status` of
    /// the receive.
    fn matched_receive_vec<Msg: Equivalence>(self) -> (Vec<Msg>, Status);
}
/// Receives a matched message into a `Vec` sized from the element count in the probed
/// `Status`.
impl MatchedReceiveVec for (Message, Status) {
    /// # Panics
    ///
    /// Panics if the element count reported by the status cannot be expressed as a
    /// `usize`.
    fn matched_receive_vec<Msg>(self) -> (Vec<Msg>, Status)
    where
        Msg: Equivalence,
    {
        // Receive buffer element: same layout as `Msg`, but allowed to be uninitialized
        // until the receive has written it.
        #[repr(transparent)]
        struct UninitMsg<M>(MaybeUninit<M>);

        unsafe impl<M: Equivalence> Equivalence for UninitMsg<M> {
            type Out = M::Out;

            fn equivalent_datatype() -> Self::Out {
                M::equivalent_datatype()
            }
        }

        let (message, status) = self;

        // The conversion target is pinned to `usize` so the checked conversion behaves
        // as the panic message states, instead of leaving the type to integer inference.
        let count: usize = status
            .count(Msg::equivalent_datatype())
            .value_as()
            .expect("Message element count cannot be expressed as a usize.");

        let mut res = (0..count)
            .map(|_| UninitMsg::<Msg>(MaybeUninit::uninit()))
            .collect::<Vec<_>>();

        let status = message.matched_receive_into(&mut res[..]);

        // Rebuild the vector from its raw parts rather than transmuting it: the layout of
        // `Vec` itself is unspecified, whereas `UninitMsg<Msg>` and `Msg` are guaranteed
        // to share size and alignment, which is all `from_raw_parts` requires.
        let mut res = std::mem::ManuallyDrop::new(res);
        let res = unsafe {
            Vec::from_raw_parts(res.as_mut_ptr() as *mut Msg, res.len(), res.capacity())
        };

        (res, status)
    }
}
/// Sends `msg` to `destination` tagged `sendtag` and receives a single `R` value from
/// `source` tagged `receivetag`, in one `MPI_Sendrecv` call.
///
/// Returns the received value together with the `Status` of the receive.
///
/// # Panics
///
/// Panics if the communicators of `source` and `destination` do not compare as
/// `CommunicatorRelation::Identical`, or if the status reports that zero elements were
/// received (the returned value would otherwise be uninitialized).
pub fn send_receive_with_tags<M, D, R, S>(
    msg: &M,
    destination: &D,
    sendtag: Tag,
    source: &S,
    receivetag: Tag,
) -> (R, Status)
where
    M: Equivalence,
    D: Destination,
    R: Equivalence,
    S: Source,
{
    assert_eq!(
        source
            .as_communicator()
            .compare(destination.as_communicator()),
        CommunicatorRelation::Identical
    );
    unsafe {
        let (_, res, status) = with_uninitialized2(|res, status| {
            ffi::MPI_Sendrecv(
                msg.pointer(),
                msg.count(),
                msg.as_datatype().as_raw(),
                destination.destination_rank(),
                sendtag,
                res as _,
                1,
                R::equivalent_datatype().as_raw(),
                source.source_rank(),
                receivetag,
                source.as_communicator().as_raw(),
                status,
            )
        });
        let status = Status(status);
        // Same guard as the single-value receive methods: an empty message leaves `res`
        // unwritten, so it must not be handed back to the caller.
        if status.count(R::equivalent_datatype()) == 0 {
            panic!("Received an empty message.");
        }
        (res, status)
    }
}
/// Sends `msg` to `destination` with the default tag and receives a single `R` value
/// from `source` with any tag (`RSMPI_ANY_TAG`).
///
/// Delegates to `send_receive_with_tags`.
pub fn send_receive<R, M, D, S>(msg: &M, destination: &D, source: &S) -> (R, Status)
where
    M: Equivalence,
    D: Destination,
    R: Equivalence,
    S: Source,
{
    let sendtag = Tag::default();
    let receivetag = unsafe { ffi::RSMPI_ANY_TAG };
    send_receive_with_tags(msg, destination, sendtag, source, receivetag)
}
/// Sends the contents of `msg` to `destination` tagged `sendtag` and receives a message
/// from `source` tagged `receivetag` into `buf`, in one `MPI_Sendrecv` call.
///
/// Returns the `Status` of the receive.
///
/// # Panics
///
/// Panics if the communicators of `source` and `destination` do not compare as
/// `CommunicatorRelation::Identical`.
pub fn send_receive_into_with_tags<M: ?Sized, D, B: ?Sized, S>(
    msg: &M,
    destination: &D,
    sendtag: Tag,
    buf: &mut B,
    source: &S,
    receivetag: Tag,
) -> Status
where
    M: Buffer,
    D: Destination,
    B: BufferMut,
    S: Source,
{
    let relation = source
        .as_communicator()
        .compare(destination.as_communicator());
    assert_eq!(relation, CommunicatorRelation::Identical);

    unsafe {
        let send_data = msg.pointer();
        let send_count = msg.count();
        // Datatypes are bound to locals so they stay alive for the duration of the call.
        let send_type = msg.as_datatype();
        let dest_rank = destination.destination_rank();
        let recv_data = buf.pointer_mut();
        let recv_count = buf.count();
        let recv_type = buf.as_datatype();
        let src_rank = source.source_rank();
        let comm = source.as_communicator();
        let (_, raw_status) = with_uninitialized(|out| {
            ffi::MPI_Sendrecv(
                send_data,
                send_count,
                send_type.as_raw(),
                dest_rank,
                sendtag,
                recv_data,
                recv_count,
                recv_type.as_raw(),
                src_rank,
                receivetag,
                comm.as_raw(),
                out,
            )
        });
        Status(raw_status)
    }
}
/// Sends the contents of `msg` to `destination` with the default tag and receives a
/// message from `source` with any tag (`RSMPI_ANY_TAG`) into `buf`.
///
/// Delegates to `send_receive_into_with_tags`.
pub fn send_receive_into<M: ?Sized, D, B: ?Sized, S>(
    msg: &M,
    destination: &D,
    buf: &mut B,
    source: &S,
) -> Status
where
    M: Buffer,
    D: Destination,
    B: BufferMut,
    S: Source,
{
    let sendtag = Tag::default();
    let receivetag = unsafe { ffi::RSMPI_ANY_TAG };
    send_receive_into_with_tags(msg, destination, sendtag, buf, source, receivetag)
}
/// Sends the contents of `buf` to `destination` tagged `sendtag` and receives a message
/// from `source` tagged `receivetag` into the same buffer, using
/// `MPI_Sendrecv_replace`.
///
/// Returns the `Status` of the receive.
///
/// # Panics
///
/// Panics if the communicators of `source` and `destination` do not compare as
/// `CommunicatorRelation::Identical`.
pub fn send_receive_replace_into_with_tags<B: ?Sized, D, S>(
    buf: &mut B,
    destination: &D,
    sendtag: Tag,
    source: &S,
    receivetag: Tag,
) -> Status
where
    B: BufferMut,
    D: Destination,
    S: Source,
{
    let relation = source
        .as_communicator()
        .compare(destination.as_communicator());
    assert_eq!(relation, CommunicatorRelation::Identical);

    unsafe {
        let data = buf.pointer_mut();
        let count = buf.count();
        // Bound to a local so the datatype stays alive for the duration of the call.
        let datatype = buf.as_datatype();
        let dest_rank = destination.destination_rank();
        let src_rank = source.source_rank();
        let comm = source.as_communicator();
        let (_, raw_status) = with_uninitialized(|out| {
            ffi::MPI_Sendrecv_replace(
                data,
                count,
                datatype.as_raw(),
                dest_rank,
                sendtag,
                src_rank,
                receivetag,
                comm.as_raw(),
                out,
            )
        });
        Status(raw_status)
    }
}
/// Sends the contents of `buf` to `destination` with the default tag and receives a
/// message from `source` with any tag (`RSMPI_ANY_TAG`) into the same buffer.
///
/// Delegates to `send_receive_replace_into_with_tags`.
pub fn send_receive_replace_into<B: ?Sized, D, S>(
    buf: &mut B,
    destination: &D,
    source: &S,
) -> Status
where
    B: BufferMut,
    D: Destination,
    S: Source,
{
    let sendtag = Tag::default();
    let receivetag = unsafe { ffi::RSMPI_ANY_TAG };
    send_receive_replace_into_with_tags(buf, destination, sendtag, source, receivetag)
}
/// Handle to a single value of type `T` that is being received by a pending
/// non-blocking receive.
#[must_use]
pub struct ReceiveFuture<T> {
    // Heap slot that the pending receive writes the value into.
    val: *mut T,
    // The request tracking the pending receive.
    req: Request<'static>,
}
impl<T> ReceiveFuture<T>
where
T: Equivalence,
{
pub fn get(self) -> (T, Status) {
let status = self.req.wait();
if status.count(T::equivalent_datatype()) == 0 {
panic!("Received an empty message into a ReceiveFuture.");
}
unsafe { (ptr::read(self.val), status) }
}
pub fn r#try(mut self) -> Result<(T, Status), Self> {
match self.req.test() {
Ok(status) => {
if status.count(T::equivalent_datatype()) == 0 {
panic!("Received an empty message into a ReceiveFuture.");
}
unsafe { Ok((ptr::read(self.val), status)) }
}
Err(request) => {
self.req = request;
Err(self)
}
}
}
}