mpi-fork-fnsp 0.6.0

Message Passing Interface bindings for Rust (FORK)
Documentation
//! Point to point communication
//!
//! Endpoints of communication are mostly described by types that implement the `Source` and
//! `Destination` trait. Communication operations are implemented as default methods on those
//! traits.
//!
//! # Unfinished features
//!
//! - **3.2.6**: `MPI_STATUS_IGNORE`
//! - **3.6**: Buffer usage, `MPI_Buffer_attach()`, `MPI_Buffer_detach()`
//! - **3.9**: Persistent requests, `MPI_Send_init()`, `MPI_Bsend_init()`, `MPI_Ssend_init()`,
//! `MPI_Rsend_init()`, `MPI_Recv_init()`, `MPI_Start()`, `MPI_Startall()`
#![allow(clippy::missing_panics_doc, clippy::ptr_as_ptr)]
use std::alloc::{self, Layout};
use std::mem::{transmute, MaybeUninit};
use std::{fmt, ptr};

use conv::ConvUtil;

use super::{Count, Tag};

use crate::ffi;
use crate::ffi::{MPI_Message, MPI_Status};

use crate::datatype::traits::{
    AsDatatype, Buffer, BufferMut, Collection, Datatype, Equivalence, Pointer,
};
use crate::raw::traits::{AsRaw, AsRawMut};
use crate::request::{Request, Scope, StaticScope};
use crate::topology::traits::{AsCommunicator, Communicator};
use crate::topology::{AnyProcess, CommunicatorRelation, Process, Rank};
use crate::{with_uninitialized, with_uninitialized2};

// TODO: rein in _with_tag ugliness, use optional tags or make tag part of Source and Destination

/// Point to point communication traits
pub mod traits {
    pub use super::{Destination, MatchedReceiveVec, Source};
}

/// Something that can be used as the source in a point to point receive operation
///
/// # Examples
///
/// - A `Process` used as a source for a receive operation will receive data only from the
/// identified process.
/// - A communicator can also be used as a source via the `AnyProcess` identifier.
///
/// # Standard section(s)
///
/// 3.2.3
pub unsafe trait Source: AsCommunicator {
    /// `Rank` that identifies the source
    fn source_rank(&self) -> Rank;

    /// Probe a source for incoming messages.
    ///
    /// Probe `Source` `&self` for incoming messages with a certain tag.
    ///
    /// An ordinary `probe()` returns a `Status` which allows inspection of the properties of the
    /// incoming message, but does not guarantee reception by a subsequent `receive()` (especially
    /// in a multi-threaded set-up). For a probe operation with stronger guarantees, see
    /// `matched_probe()`.
    ///
    /// # Standard section(s)
    ///
    /// 3.8.1
    fn probe_with_tag(&self, tag: Tag) -> Status {
        unsafe {
            Status(
                with_uninitialized(|status| {
                    ffi::MPI_Probe(
                        self.source_rank(),
                        tag,
                        self.as_communicator().as_raw(),
                        status,
                    )
                })
                .1,
            )
        }
    }

    /// Probe a source for incoming messages.
    ///
    /// Probe `Source` `&self` for incoming messages with any tag.
    ///
    /// An ordinary `probe()` returns a `Status` which allows inspection of the properties of the
    /// incoming message, but does not guarantee reception by a subsequent `receive()` (especially
    /// in a multi-threaded set-up). For a probe operation with stronger guarantees, see
    /// `matched_probe()`.
    ///
    /// # Standard section(s)
    ///
    /// 3.8.1
    fn probe(&self) -> Status {
        self.probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
    }

    /// Probe a source for incoming messages with guaranteed reception.
    ///
    /// Probe `Source` `&self` for incoming messages with a certain tag.
    ///
    /// A `matched_probe()` returns both a `Status` that describes the properties of a pending
    /// incoming message and a `Message` which can and *must* subsequently be used in a
    /// `matched_receive()` to receive the probed message.
    ///
    /// # Standard section(s)
    ///
    /// 3.8.2
    fn matched_probe_with_tag(&self, tag: Tag) -> (Message, Status) {
        let (_, message, status) = unsafe {
            with_uninitialized2(|message, status| {
                ffi::MPI_Mprobe(
                    self.source_rank(),
                    tag,
                    self.as_communicator().as_raw(),
                    message,
                    status,
                )
            })
        };
        (Message(message), Status(status))
    }

    /// Probe a source for incoming messages with guaranteed reception.
    ///
    /// Probe `Source` `&self` for incoming messages with any tag.
    ///
    /// A `matched_probe()` returns both a `Status` that describes the properties of a pending
    /// incoming message and a `Message` which can and *must* subsequently be used in a
    /// `matched_receive()` to receive the probed message.
    ///
    /// # Standard section(s)
    ///
    /// 3.8.2
    fn matched_probe(&self) -> (Message, Status) {
        self.matched_probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
    }

    /// Receive a message containing a single instance of type `Msg`.
    ///
    /// Receive a message from `Source` `&self` tagged `tag` containing a single instance of type
    /// `Msg`.
    ///
    /// # Standard section(s)
    ///
    /// 3.2.4
    fn receive_with_tag<Msg>(&self, tag: Tag) -> (Msg, Status)
    where
        Msg: Equivalence,
    {
        unsafe {
            let (_, msg, status) = with_uninitialized2(|msg, status| {
                ffi::MPI_Recv(
                    msg as _,
                    1,
                    Msg::equivalent_datatype().as_raw(),
                    self.source_rank(),
                    tag,
                    self.as_communicator().as_raw(),
                    status,
                )
            });
            let status = Status(status);
            if status.count(Msg::equivalent_datatype()) == 0 {
                panic!("Received an empty message.");
            }
            (msg, status)
        }
    }

    /// Receive a message containing a single instance of type `Msg`.
    ///
    /// Receive a message from `Source` `&self` containing a single instance of type `Msg`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// extern crate mpi_fork_fnsp as mpi;
    /// use mpi::traits::*;
    ///
    /// let universe = mpi::initialize().unwrap();
    /// let world = universe.world();
    ///
    /// let x = world.any_process().receive::<f64>();
    /// ```
    ///
    /// # Standard section(s)
    ///
    /// 3.2.4
    fn receive<Msg>(&self) -> (Msg, Status)
    where
        Msg: Equivalence,
    {
        self.receive_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
    }

    /// Receive a message into a `Buffer`.
    ///
    /// Receive a message from `Source` `&self` tagged `tag` into `Buffer` `buf`.
    ///
    /// # Standard section(s)
    ///
    /// 3.2.4
    fn receive_into_with_tag<Buf: ?Sized>(&self, buf: &mut Buf, tag: Tag) -> Status
    where
        Buf: BufferMut,
    {
        unsafe {
            Status(
                with_uninitialized(|status| {
                    ffi::MPI_Recv(
                        buf.pointer_mut(),
                        buf.count(),
                        buf.as_datatype().as_raw(),
                        self.source_rank(),
                        tag,
                        self.as_communicator().as_raw(),
                        status,
                    )
                })
                .1,
            )
        }
    }

    /// Receive a message into a `Buffer`.
    ///
    /// Receive a message from `Source` `&self` into `Buffer` `buf`.
    ///
    /// # Standard section(s)
    ///
    /// 3.2.4
    fn receive_into<Buf: ?Sized>(&self, buf: &mut Buf) -> Status
    where
        Buf: BufferMut,
    {
        self.receive_into_with_tag(buf, unsafe { ffi::RSMPI_ANY_TAG })
    }

    /// Receive a message containing multiple instances of type `Msg` into a `Vec`.
    ///
    /// Receive a message from `Source` `&self` tagged `tag` containing multiple instances of type
    /// `Msg` into a `Vec`.
    ///
    /// # Standard section(s)
    ///
    /// 3.2.4
    fn receive_vec_with_tag<Msg>(&self, tag: Tag) -> (Vec<Msg>, Status)
    where
        Msg: Equivalence,
    {
        self.matched_probe_with_tag(tag).matched_receive_vec()
    }

    /// Receive a message containing multiple instances of type `Msg` into a `Vec`.
    ///
    /// Receive a message from `Source` `&self` containing multiple instances of type `Msg` into a
    /// `Vec`.
    ///
    /// # Examples
    /// See `examples/send_receive.rs`
    ///
    /// # Standard section(s)
    ///
    /// 3.2.4
    fn receive_vec<Msg>(&self) -> (Vec<Msg>, Status)
    where
        Msg: Equivalence,
    {
        self.receive_vec_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
    }

    /// Initiate an immediate (non-blocking) receive operation.
    ///
    /// Initiate receiving a message matching `tag` into `buf`.
    ///
    /// # Standard section(s)
    ///
    /// 3.7.2
    fn immediate_receive_into_with_tag<'a, Sc, Buf: ?Sized>(
        &self,
        scope: Sc,
        buf: &'a mut Buf,
        tag: Tag,
    ) -> Request<'a, Sc>
    where
        Buf: 'a + BufferMut,
        Sc: Scope<'a>,
    {
        unsafe {
            Request::from_raw(
                with_uninitialized(|request| {
                    ffi::MPI_Irecv(
                        buf.pointer_mut(),
                        buf.count(),
                        buf.as_datatype().as_raw(),
                        self.source_rank(),
                        tag,
                        self.as_communicator().as_raw(),
                        request,
                    )
                })
                .1,
                scope,
            )
        }
    }

    /// Initiate an immediate (non-blocking) receive operation.
    ///
    /// Initiate receiving a message into `buf`.
    ///
    /// # Examples
    /// See `examples/immediate.rs`
    ///
    /// # Standard section(s)
    ///
    /// 3.7.2
    fn immediate_receive_into<'a, Sc, Buf: ?Sized>(
        &self,
        scope: Sc,
        buf: &'a mut Buf,
    ) -> Request<'a, Sc>
    where
        Buf: 'a + BufferMut,
        Sc: Scope<'a>,
    {
        self.immediate_receive_into_with_tag(scope, buf, unsafe { ffi::RSMPI_ANY_TAG })
    }

    /// Initiate a non-blocking receive operation for messages matching tag `tag`.
    ///
    /// # Standard section(s)
    ///
    /// 3.7.2
    fn immediate_receive_with_tag<Msg>(&self, tag: Tag) -> ReceiveFuture<Msg>
    where
        Msg: Equivalence,
    {
        unsafe {
            let val = alloc::alloc(Layout::new::<Msg>()) as *mut Msg;
            let (_, request) = with_uninitialized(|request| {
                ffi::MPI_Irecv(
                    val as _,
                    1,
                    Msg::equivalent_datatype().as_raw(),
                    self.source_rank(),
                    tag,
                    self.as_communicator().as_raw(),
                    request,
                )
            });
            ReceiveFuture {
                val,
                req: Request::from_raw(request, StaticScope),
            }
        }
    }

    /// Initiate a non-blocking receive operation.
    ///
    /// # Examples
    /// See `examples/immediate.rs`
    ///
    /// # Standard section(s)
    ///
    /// 3.7.2
    fn immediate_receive<Msg>(&self) -> ReceiveFuture<Msg>
    where
        Msg: Equivalence,
    {
        self.immediate_receive_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
    }

    /// Asynchronously probe a source for incoming messages.
    ///
    /// Asynchronously probe `Source` `&self` for incoming messages with a certain tag.
    ///
    /// Like `Probe` but returns a `None` immediately if there is no incoming message to be probed.
    ///
    /// # Standard section(s)
    ///
    /// 3.8.1
    fn immediate_probe_with_tag(&self, tag: Tag) -> Option<Status> {
        unsafe {
            let mut status = MaybeUninit::uninit();

            let (_, flag) = with_uninitialized(|flag| {
                ffi::MPI_Iprobe(
                    self.source_rank(),
                    tag,
                    self.as_communicator().as_raw(),
                    flag,
                    status.as_mut_ptr(),
                )
            });

            if flag == 0 {
                None
            } else {
                Some(Status(status.assume_init()))
            }
        }
    }

    /// Asynchronously probe a source for incoming messages.
    ///
    /// Asynchronously probe `Source` `&self` for incoming messages with any tag.
    ///
    /// Like `Probe` but returns a `None` immediately if there is no incoming message to be probed.
    ///
    /// # Standard section(s)
    ///
    /// 3.8.1
    fn immediate_probe(&self) -> Option<Status> {
        self.immediate_probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
    }

    /// Asynchronously probe a source for incoming messages with guaranteed reception.
    ///
    /// Asynchronously probe `Source` `&self` for incoming messages with a certain tag.
    ///
    /// Like `MatchedProbe` but returns a `None` immediately if there is no incoming message to be
    /// probed.
    ///
    /// # Standard section(s)
    ///
    /// 3.8.2
    fn immediate_matched_probe_with_tag(&self, tag: Tag) -> Option<(Message, Status)> {
        unsafe {
            let mut message = MaybeUninit::uninit();
            let mut status = MaybeUninit::uninit();

            let (_, flag) = with_uninitialized(|flag| {
                ffi::MPI_Improbe(
                    self.source_rank(),
                    tag,
                    self.as_communicator().as_raw(),
                    flag,
                    message.as_mut_ptr(),
                    status.as_mut_ptr(),
                )
            });

            if flag == 0 {
                None
            } else {
                Some((Message(message.assume_init()), Status(status.assume_init())))
            }
        }
    }

    /// Asynchronously probe a source for incoming messages with guaranteed reception.
    ///
    /// Asynchronously probe `Source` `&self` for incoming messages with any tag.
    ///
    /// Like `MatchedProbe` but returns a `None` immediately if there is no incoming message to be
    /// probed.
    ///
    /// # Standard section(s)
    ///
    /// 3.8.2
    fn immediate_matched_probe(&self) -> Option<(Message, Status)> {
        self.immediate_matched_probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
    }
}

unsafe impl<'a, C> Source for AnyProcess<'a, C>
where
    C: 'a + Communicator,
{
    fn source_rank(&self) -> Rank {
        unsafe { ffi::RSMPI_ANY_SOURCE }
    }
}

unsafe impl<'a, C> Source for Process<'a, C>
where
    C: 'a + Communicator,
{
    fn source_rank(&self) -> Rank {
        self.rank()
    }
}

/// Something that can be used as the destination in a point to point send operation
///
/// # Examples
/// - Using a `Process` as the destination will send data to that specific process.
///
/// # Standard section(s)
///
/// 3.2.3
pub trait Destination: AsCommunicator {
    /// `Rank` that identifies the destination
    fn destination_rank(&self) -> Rank;

    /// Blocking standard mode send operation
    ///
    /// Send the contents of a `Buffer` to the `Destination` `&self` and tag it.
    ///
    /// # Standard section(s)
    ///
    /// 3.2.1
    fn send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
    where
        Buf: Buffer,
    {
        unsafe {
            ffi::MPI_Send(
                buf.pointer(),
                buf.count(),
                buf.as_datatype().as_raw(),
                self.destination_rank(),
                tag,
                self.as_communicator().as_raw(),
            );
        }
    }

    /// Blocking standard mode send operation
    ///
    /// Send the contents of a `Buffer` to the `Destination` `&self`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// extern crate mpi_fork_fnsp as mpi;
    /// use mpi::traits::*;
    ///
    /// let universe = mpi::initialize().unwrap();
    /// let world = universe.world();
    ///
    /// let v = vec![ 1.0f64, 2.0, 3.0 ];
    /// world.process_at_rank(1).send(&v[..]);
    /// ```
    ///
    /// See also `examples/send_receive.rs`
    ///
    /// # Standard section(s)
    ///
    /// 3.2.1
    fn send<Buf: ?Sized>(&self, buf: &Buf)
    where
        Buf: Buffer,
    {
        self.send_with_tag(buf, Tag::default())
    }

    /// Blocking buffered mode send operation
    ///
    /// Send the contents of a `Buffer` to the `Destination` `&self` and tag it.
    ///
    /// # Standard section(s)
    ///
    /// 3.4
    fn buffered_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
    where
        Buf: Buffer,
    {
        unsafe {
            ffi::MPI_Bsend(
                buf.pointer(),
                buf.count(),
                buf.as_datatype().as_raw(),
                self.destination_rank(),
                tag,
                self.as_communicator().as_raw(),
            );
        }
    }

    /// Blocking buffered mode send operation
    ///
    /// Send the contents of a `Buffer` to the `Destination` `&self`.
    ///
    /// # Standard section(s)
    ///
    /// 3.4
    fn buffered_send<Buf: ?Sized>(&self, buf: &Buf)
    where
        Buf: Buffer,
    {
        self.buffered_send_with_tag(buf, Tag::default())
    }

    /// Blocking synchronous mode send operation
    ///
    /// Send the contents of a `Buffer` to the `Destination` `&self` and tag it.
    ///
    /// Completes only once the matching receive operation has started.
    ///
    /// # Standard section(s)
    ///
    /// 3.4
    fn synchronous_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
    where
        Buf: Buffer,
    {
        unsafe {
            ffi::MPI_Ssend(
                buf.pointer(),
                buf.count(),
                buf.as_datatype().as_raw(),
                self.destination_rank(),
                tag,
                self.as_communicator().as_raw(),
            );
        }
    }

    /// Blocking synchronous mode send operation
    ///
    /// Send the contents of a `Buffer` to the `Destination` `&self`.
    ///
    /// Completes only once the matching receive operation has started.
    ///
    /// # Standard section(s)
    ///
    /// 3.4
    fn synchronous_send<Buf: ?Sized>(&self, buf: &Buf)
    where
        Buf: Buffer,
    {
        self.synchronous_send_with_tag(buf, Tag::default())
    }

    /// Blocking ready mode send operation
    ///
    /// Send the contents of a `Buffer` to the `Destination` `&self` and tag it.
    ///
    /// Fails if the matching receive operation has not been posted.
    ///
    /// # Standard section(s)
    ///
    /// 3.4
    fn ready_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
    where
        Buf: Buffer,
    {
        unsafe {
            ffi::MPI_Rsend(
                buf.pointer(),
                buf.count(),
                buf.as_datatype().as_raw(),
                self.destination_rank(),
                tag,
                self.as_communicator().as_raw(),
            );
        }
    }

    /// Blocking ready mode send operation
    ///
    /// Send the contents of a `Buffer` to the `Destination` `&self`.
    ///
    /// Fails if the matching receive operation has not been posted.
    ///
    /// # Standard section(s)
    ///
    /// 3.4
    fn ready_send<Buf: ?Sized>(&self, buf: &Buf)
    where
        Buf: Buffer,
    {
        self.ready_send_with_tag(buf, Tag::default())
    }

    /// Initiate an immediate (non-blocking) standard mode send operation.
    ///
    /// Initiate sending the data in `buf` in standard mode and tag it.
    ///
    /// # Standard section(s)
    ///
    /// 3.7.2
    fn immediate_send_with_tag<'a, Sc, Buf: ?Sized>(
        &self,
        scope: Sc,
        buf: &'a Buf,
        tag: Tag,
    ) -> Request<'a, Sc>
    where
        Buf: 'a + Buffer,
        Sc: Scope<'a>,
    {
        unsafe {
            Request::from_raw(
                with_uninitialized(|request| {
                    ffi::MPI_Isend(
                        buf.pointer(),
                        buf.count(),
                        buf.as_datatype().as_raw(),
                        self.destination_rank(),
                        tag,
                        self.as_communicator().as_raw(),
                        request,
                    )
                })
                .1,
                scope,
            )
        }
    }

    /// Initiate an immediate (non-blocking) standard mode send operation.
    ///
    /// Initiate sending the data in `buf` in standard mode.
    ///
    /// # Examples
    /// See `examples/immediate.rs`
    ///
    /// # Standard section(s)
    ///
    /// 3.7.2
    fn immediate_send<'a, Sc, Buf: ?Sized>(&self, scope: Sc, buf: &'a Buf) -> Request<'a, Sc>
    where
        Buf: 'a + Buffer,
        Sc: Scope<'a>,
    {
        self.immediate_send_with_tag(scope, buf, Tag::default())
    }

    /// Initiate an immediate (non-blocking) buffered mode send operation.
    ///
    /// Initiate sending the data in `buf` in buffered mode and tag it.
    ///
    /// # Standard section(s)
    ///
    /// 3.7.2
    fn immediate_buffered_send_with_tag<'a, Sc, Buf: ?Sized>(
        &self,
        scope: Sc,
        buf: &'a Buf,
        tag: Tag,
    ) -> Request<'a, Sc>
    where
        Buf: 'a + Buffer,
        Sc: Scope<'a>,
    {
        unsafe {
            Request::from_raw(
                with_uninitialized(|request| {
                    ffi::MPI_Ibsend(
                        buf.pointer(),
                        buf.count(),
                        buf.as_datatype().as_raw(),
                        self.destination_rank(),
                        tag,
                        self.as_communicator().as_raw(),
                        request,
                    )
                })
                .1,
                scope,
            )
        }
    }

    /// Initiate an immediate (non-blocking) buffered mode send operation.
    ///
    /// Initiate sending the data in `buf` in buffered mode.
    ///
    /// # Standard section(s)
    ///
    /// 3.7.2
    fn immediate_buffered_send<'a, Sc, Buf: ?Sized>(
        &self,
        scope: Sc,
        buf: &'a Buf,
    ) -> Request<'a, Sc>
    where
        Buf: 'a + Buffer,
        Sc: Scope<'a>,
    {
        self.immediate_buffered_send_with_tag(scope, buf, Tag::default())
    }

    /// Initiate an immediate (non-blocking) synchronous mode send operation.
    ///
    /// Initiate sending the data in `buf` in synchronous mode and tag it.
    ///
    /// # Standard section(s)
    ///
    /// 3.7.2
    fn immediate_synchronous_send_with_tag<'a, Sc, Buf: ?Sized>(
        &self,
        scope: Sc,
        buf: &'a Buf,
        tag: Tag,
    ) -> Request<'a, Sc>
    where
        Buf: 'a + Buffer,
        Sc: Scope<'a>,
    {
        unsafe {
            Request::from_raw(
                with_uninitialized(|request| {
                    ffi::MPI_Issend(
                        buf.pointer(),
                        buf.count(),
                        buf.as_datatype().as_raw(),
                        self.destination_rank(),
                        tag,
                        self.as_communicator().as_raw(),
                        request,
                    )
                })
                .1,
                scope,
            )
        }
    }

    /// Initiate an immediate (non-blocking) synchronous mode send operation.
    ///
    /// Initiate sending the data in `buf` in synchronous mode.
    ///
    /// # Standard section(s)
    ///
    /// 3.7.2
    fn immediate_synchronous_send<'a, Sc, Buf: ?Sized>(
        &self,
        scope: Sc,
        buf: &'a Buf,
    ) -> Request<'a, Sc>
    where
        Buf: 'a + Buffer,
        Sc: Scope<'a>,
    {
        self.immediate_synchronous_send_with_tag(scope, buf, Tag::default())
    }

    /// Initiate an immediate (non-blocking) ready mode send operation.
    ///
    /// Initiate sending the data in `buf` in ready mode and tag it.
    ///
    /// # Standard section(s)
    ///
    /// 3.7.2
    fn immediate_ready_send_with_tag<'a, Sc, Buf: ?Sized>(
        &self,
        scope: Sc,
        buf: &'a Buf,
        tag: Tag,
    ) -> Request<'a, Sc>
    where
        Buf: 'a + Buffer,
        Sc: Scope<'a>,
    {
        unsafe {
            Request::from_raw(
                with_uninitialized(|request| {
                    ffi::MPI_Irsend(
                        buf.pointer(),
                        buf.count(),
                        buf.as_datatype().as_raw(),
                        self.destination_rank(),
                        tag,
                        self.as_communicator().as_raw(),
                        request,
                    )
                })
                .1,
                scope,
            )
        }
    }

    /// Initiate an immediate (non-blocking) ready mode send operation.
    ///
    /// Initiate sending the data in `buf` in ready mode.
    ///
    /// # Examples
    ///
    /// See `examples/immediate.rs`
    ///
    /// # Standard section(s)
    ///
    /// 3.7.2
    fn immediate_ready_send<'a, Sc, Buf: ?Sized>(&self, scope: Sc, buf: &'a Buf) -> Request<'a, Sc>
    where
        Buf: 'a + Buffer,
        Sc: Scope<'a>,
    {
        self.immediate_ready_send_with_tag(scope, buf, Tag::default())
    }
}

impl<'a, C> Destination for Process<'a, C>
where
    C: 'a + Communicator,
{
    fn destination_rank(&self) -> Rank {
        self.rank()
    }
}

/// Describes the result of a point to point receive operation.
///
/// # Standard section(s)
///
/// 3.2.5
#[derive(Copy, Clone)]
pub struct Status(MPI_Status);

impl Status {
    /// Construct a `Status` value from the raw MPI type
    pub fn from_raw(status: MPI_Status) -> Status {
        Status(status)
    }

    /// The rank of the message source
    pub fn source_rank(&self) -> Rank {
        self.0.MPI_SOURCE
    }

    /// The message tag
    pub fn tag(&self) -> Tag {
        self.0.MPI_TAG
    }

    /// Number of instances of the type contained in the message
    pub fn count<D: Datatype>(&self, d: D) -> Count {
        unsafe { with_uninitialized(|count| ffi::MPI_Get_count(&self.0, d.as_raw(), count)).1 }
    }
}

impl fmt::Debug for Status {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        write!(
            f,
            "Status {{ source_rank: {}, tag: {} }}",
            self.source_rank(),
            self.tag()
        )
    }
}

/// Describes a pending incoming message, probed by a `matched_probe()`.
///
/// # Standard section(s)
///
/// 3.8.2
#[must_use]
pub struct Message(MPI_Message);

impl Message {
    /// True if the `Source` for the probe was the null process.
    pub fn is_no_proc(&self) -> bool {
        self.as_raw() == unsafe { ffi::RSMPI_MESSAGE_NO_PROC }
    }

    /// Receive a previously probed message containing a single instance of type `Msg`.
    ///
    /// Receives the message `&self` which contains a single instance of type `Msg`.
    ///
    /// # Panics
    /// Received an empty message
    ///
    /// # Standard section(s)
    ///
    /// 3.8.3
    pub fn matched_receive<Msg>(mut self) -> (Msg, Status)
    where
        Msg: Equivalence,
    {
        unsafe {
            let (_, res, status) = with_uninitialized2(|res, status| {
                ffi::MPI_Mrecv(
                    res as _,
                    1,
                    Msg::equivalent_datatype().as_raw(),
                    self.as_raw_mut(),
                    status,
                )
            });
            let status = Status(status);
            if status.count(Msg::equivalent_datatype()) == 0 {
                panic!("Received an empty message.");
            }
            (res, status)
        }
    }

    /// Receive a previously probed message into a `Buffer`.
    ///
    /// Receive the message `&self` with contents matching `buf`.
    ///
    /// # Standard section(s)
    ///
    /// 3.8.3
    pub fn matched_receive_into<Buf: ?Sized>(mut self, buf: &mut Buf) -> Status
    where
        Buf: BufferMut,
    {
        let status;
        unsafe {
            status = with_uninitialized(|status| {
                ffi::MPI_Mrecv(
                    buf.pointer_mut(),
                    buf.count(),
                    buf.as_datatype().as_raw(),
                    self.as_raw_mut(),
                    status,
                )
            })
            .1;
            assert_eq!(self.as_raw(), ffi::RSMPI_MESSAGE_NULL);
        };
        Status(status)
    }

    /// Asynchronously receive a previously probed message into a `Buffer`.
    ///
    /// Asynchronously receive the message `&self` with contents matching `buf`.
    ///
    /// # Panics
    /// ?
    ///
    /// # Standard section(s)
    ///
    /// 3.8.3
    pub fn immediate_matched_receive_into<'a, Sc, Buf: ?Sized + 'a>(
        mut self,
        scope: Sc,
        buf: &'a mut Buf,
    ) -> Request<'a, Sc>
    where
        Buf: BufferMut,
        Sc: Scope<'a>,
    {
        unsafe {
            let request = with_uninitialized(|request| {
                ffi::MPI_Imrecv(
                    buf.pointer_mut(),
                    buf.count(),
                    buf.as_datatype().as_raw(),
                    self.as_raw_mut(),
                    request,
                )
            })
            .1;
            assert_eq!(self.as_raw(), ffi::RSMPI_MESSAGE_NULL);
            Request::from_raw(request, scope)
        }
    }
}

unsafe impl AsRaw for Message {
    type Raw = MPI_Message;
    fn as_raw(&self) -> Self::Raw {
        self.0
    }
}

unsafe impl AsRawMut for Message {
    fn as_raw_mut(&mut self) -> *mut <Self as AsRaw>::Raw {
        &mut self.0
    }
}

/// # Panics
/// ?
impl Drop for Message {
    fn drop(&mut self) {
        assert_eq!(
            self.as_raw(),
            unsafe { ffi::RSMPI_MESSAGE_NULL },
            "matched message dropped without receiving."
        );
    }
}

/// Receive a previously probed message containing multiple instances of type `Msg` into a `Vec`.
///
/// # Standard section(s)
///
/// 3.8.3
pub trait MatchedReceiveVec {
    /// Receives the message `&self` which contains multiple instances of type `Msg` into a `Vec`.
    fn matched_receive_vec<Msg>(self) -> (Vec<Msg>, Status)
    where
        Msg: Equivalence;
}

impl MatchedReceiveVec for (Message, Status) {
    fn matched_receive_vec<Msg>(self) -> (Vec<Msg>, Status)
    where
        Msg: Equivalence,
    {
        #[repr(transparent)]
        struct UninitMsg<M>(MaybeUninit<M>);

        unsafe impl<M: Equivalence> Equivalence for UninitMsg<M> {
            type Out = M::Out;

            fn equivalent_datatype() -> Self::Out {
                M::equivalent_datatype()
            }
        }

        let (message, status) = self;
        let count = status
            .count(Msg::equivalent_datatype())
            .value_as()
            .expect("Message element count cannot be expressed as a usize.");

        let mut res = (0..count)
            .map(|_| UninitMsg::<Msg>(MaybeUninit::uninit()))
            .collect::<Vec<_>>();

        let status = message.matched_receive_into(&mut res[..]);

        let res = unsafe { transmute(res) };

        (res, status)
    }
}

/// Sends `msg` to `destination` tagging it `sendtag` and simultaneously receives an
/// instance of `R` tagged `receivetag` from `source`.
///
/// # Panics
/// ?
///
/// # Standard section(s)
///
/// 3.10
pub fn send_receive_with_tags<M, D, R, S>(
    msg: &M,
    destination: &D,
    sendtag: Tag,
    source: &S,
    receivetag: Tag,
) -> (R, Status)
where
    M: Equivalence,
    D: Destination,
    R: Equivalence,
    S: Source,
{
    assert_eq!(
        source
            .as_communicator()
            .compare(destination.as_communicator()),
        CommunicatorRelation::Identical
    );
    unsafe {
        let (_, res, status) = with_uninitialized2(|res, status| {
            ffi::MPI_Sendrecv(
                msg.pointer(),
                msg.count(),
                msg.as_datatype().as_raw(),
                destination.destination_rank(),
                sendtag,
                res as _,
                1,
                R::equivalent_datatype().as_raw(),
                source.source_rank(),
                receivetag,
                source.as_communicator().as_raw(),
                status,
            )
        });
        let status = Status(status);
        (res, status)
    }
}

/// Sends `msg` to `destination` and simultaneously receives an instance of `R` from
/// `source`.
///
/// # Examples
/// See `examples/send_receive.rs`
///
/// # Standard section(s)
///
/// 3.10
pub fn send_receive<R, M, D, S>(msg: &M, destination: &D, source: &S) -> (R, Status)
where
    M: Equivalence,
    D: Destination,
    R: Equivalence,
    S: Source,
{
    send_receive_with_tags(msg, destination, Tag::default(), source, unsafe {
        ffi::RSMPI_ANY_TAG
    })
}

/// Sends the contents of `msg` to `destination` tagging it `sendtag` and
/// simultaneously receives a message tagged `receivetag` from `source` into
/// `buf`.
///
/// # Panics
/// ?
///
/// # Standard section(s)
///
/// 3.10
pub fn send_receive_into_with_tags<M: ?Sized, D, B: ?Sized, S>(
    msg: &M,
    destination: &D,
    sendtag: Tag,
    buf: &mut B,
    source: &S,
    receivetag: Tag,
) -> Status
where
    M: Buffer,
    D: Destination,
    B: BufferMut,
    S: Source,
{
    assert_eq!(
        source
            .as_communicator()
            .compare(destination.as_communicator()),
        CommunicatorRelation::Identical
    );
    unsafe {
        Status(
            with_uninitialized(|status| {
                ffi::MPI_Sendrecv(
                    msg.pointer(),
                    msg.count(),
                    msg.as_datatype().as_raw(),
                    destination.destination_rank(),
                    sendtag,
                    buf.pointer_mut(),
                    buf.count(),
                    buf.as_datatype().as_raw(),
                    source.source_rank(),
                    receivetag,
                    source.as_communicator().as_raw(),
                    status,
                )
            })
            .1,
        )
    }
}

/// Sends the contents of `msg` to `destination` and
/// simultaneously receives a message from `source` into
/// `buf`.
///
/// # Standard section(s)
///
/// 3.10
pub fn send_receive_into<M: ?Sized, D, B: ?Sized, S>(
    msg: &M,
    destination: &D,
    buf: &mut B,
    source: &S,
) -> Status
where
    M: Buffer,
    D: Destination,
    B: BufferMut,
    S: Source,
{
    send_receive_into_with_tags(msg, destination, Tag::default(), buf, source, unsafe {
        ffi::RSMPI_ANY_TAG
    })
}

/// Sends the contents of `buf` to `destination` tagging it `sendtag` and
/// simultaneously receives a message tagged `receivetag` from `source` and replaces the
/// contents of `buf` with it.
///
/// # Panics
/// ?
///
/// # Standard section(s)
///
/// 3.10
pub fn send_receive_replace_into_with_tags<B: ?Sized, D, S>(
    buf: &mut B,
    destination: &D,
    sendtag: Tag,
    source: &S,
    receivetag: Tag,
) -> Status
where
    B: BufferMut,
    D: Destination,
    S: Source,
{
    assert_eq!(
        source
            .as_communicator()
            .compare(destination.as_communicator()),
        CommunicatorRelation::Identical
    );
    unsafe {
        Status(
            with_uninitialized(|status| {
                ffi::MPI_Sendrecv_replace(
                    buf.pointer_mut(),
                    buf.count(),
                    buf.as_datatype().as_raw(),
                    destination.destination_rank(),
                    sendtag,
                    source.source_rank(),
                    receivetag,
                    source.as_communicator().as_raw(),
                    status,
                )
            })
            .1,
        )
    }
}

/// Sends the contents of `buf` to `destination` and
/// simultaneously receives a message from `source` and replaces the contents of
/// `buf` with it.
///
/// # Standard section(s)
///
/// 3.10
pub fn send_receive_replace_into<B: ?Sized, D, S>(
    buf: &mut B,
    destination: &D,
    source: &S,
) -> Status
where
    B: BufferMut,
    D: Destination,
    S: Source,
{
    send_receive_replace_into_with_tags(buf, destination, Tag::default(), source, unsafe {
        ffi::RSMPI_ANY_TAG
    })
}

/// Will contain a value of type `T` received via a non-blocking receive operation.
#[must_use]
pub struct ReceiveFuture<T> {
    val: *mut T,
    req: Request<'static>,
}

impl<T> ReceiveFuture<T>
where
    T: Equivalence,
{
    /// Wait for the receive operation to finish and return the received data.
    ///
    /// # Panics
    /// Received an empty message
    pub fn get(self) -> (T, Status) {
        let status = self.req.wait();
        if status.count(T::equivalent_datatype()) == 0 {
            panic!("Received an empty message into a ReceiveFuture.");
        }
        unsafe { (ptr::read(self.val), status) }
    }

    /// Check whether the receive operation has finished.
    ///
    /// If the operation has finished, the data received is returned. Otherwise the future itself
    /// is returned.
    ///
    /// # Panics
    /// Received an empty message
    ///
    /// # Errors
    /// Receive has not finished yet
    pub fn r#try(mut self) -> Result<(T, Status), Self> {
        match self.req.test() {
            Ok(status) => {
                if status.count(T::equivalent_datatype()) == 0 {
                    panic!("Received an empty message into a ReceiveFuture.");
                }
                unsafe { Ok((ptr::read(self.val), status)) }
            }
            Err(request) => {
                self.req = request;
                Err(self)
            }
        }
    }
}