use std::{mem, fmt};
use std::os::raw::c_int;
use conv::ConvUtil;
use super::{Count, Tag};
use ffi;
use ffi::{MPI_Status, MPI_Message, MPI_Request};
use datatype::traits::*;
use raw::traits::*;
use request::{Request, Scope, StaticScope};
use topology::{Rank, Process, AnyProcess, CommunicatorRelation};
use topology::traits::*;
pub mod traits {
pub use super::{Source, Destination, MatchedReceiveVec};
}
/// Something that can be used as the source in a point-to-point receive operation.
///
/// Implementors only provide `source_rank()`. Every other method has a default implementation
/// that hands that rank, together with the raw handle of `as_communicator()`, to the
/// corresponding `MPI_*` routine. Methods ending in `_with_tag` forward the caller's tag; their
/// un-suffixed counterparts forward `ffi::RSMPI_ANY_TAG`.
///
/// The integer return codes of the underlying MPI calls are not inspected.
///
/// NOTE(review): the trait is `unsafe` to implement, presumably because the rank returned by
/// `source_rank()` is passed to MPI without validation -- confirm the exact contract.
pub unsafe trait Source: AsCommunicator {
    /// The rank handed to MPI as the source of every receive/probe issued through this trait.
    fn source_rank(&self) -> Rank;

    /// Probes for a message with the given `tag` via `MPI_Probe` and returns its `Status`.
    fn probe_with_tag(&self, tag: Tag) -> Status {
        unsafe {
            // Filled in by `MPI_Probe` before it is read.
            let mut raw_status: MPI_Status = mem::uninitialized();
            ffi::MPI_Probe(self.source_rank(),
                           tag,
                           self.as_communicator().as_raw(),
                           &mut raw_status);
            Status(raw_status)
        }
    }

    /// Like `probe_with_tag()`, using `ffi::RSMPI_ANY_TAG` as the tag.
    fn probe(&self) -> Status {
        let any_tag = unsafe_extern_static!(ffi::RSMPI_ANY_TAG);
        self.probe_with_tag(any_tag)
    }

    /// Probes for a message with the given `tag` via `MPI_Mprobe`.
    ///
    /// Returns the matched `Message` handle together with the `Status` of the probe. The
    /// `Message` must subsequently be received (its `Drop` implementation asserts this).
    fn matched_probe_with_tag(&self, tag: Tag) -> (Message, Status) {
        unsafe {
            // Both out-parameters are written by `MPI_Mprobe`.
            let mut raw_message: MPI_Message = mem::uninitialized();
            let mut raw_status: MPI_Status = mem::uninitialized();
            ffi::MPI_Mprobe(self.source_rank(),
                            tag,
                            self.as_communicator().as_raw(),
                            &mut raw_message,
                            &mut raw_status);
            (Message(raw_message), Status(raw_status))
        }
    }

    /// Like `matched_probe_with_tag()`, using `ffi::RSMPI_ANY_TAG` as the tag.
    fn matched_probe(&self) -> (Message, Status) {
        let any_tag = unsafe_extern_static!(ffi::RSMPI_ANY_TAG);
        self.matched_probe_with_tag(any_tag)
    }

    /// Receives a single value of type `Msg` from a message with the given `tag`.
    ///
    /// # Panics
    ///
    /// Panics if the status reports that zero elements of `Msg`'s datatype were received,
    /// because the returned value would then never have been written.
    fn receive_with_tag<Msg>(&self, tag: Tag) -> (Msg, Status)
        where Msg: Equivalence
    {
        // Storage for the incoming value; written by `receive_into_with_tag()`.
        let mut received: Msg = unsafe { mem::uninitialized() };
        let status = self.receive_into_with_tag(&mut received, tag);
        let element_count = status.count(Msg::equivalent_datatype());
        if element_count == 0 {
            panic!("Received an empty message.");
        }
        (received, status)
    }

    /// Like `receive_with_tag()`, using `ffi::RSMPI_ANY_TAG` as the tag.
    fn receive<Msg>(&self) -> (Msg, Status)
        where Msg: Equivalence
    {
        let any_tag = unsafe_extern_static!(ffi::RSMPI_ANY_TAG);
        self.receive_with_tag(any_tag)
    }

    /// Receives a message with the given `tag` into the caller-provided buffer `buf` via
    /// `MPI_Recv` and returns the resulting `Status`.
    fn receive_into_with_tag<Buf: ?Sized>(&self, buf: &mut Buf, tag: Tag) -> Status
        where Buf: BufferMut
    {
        unsafe {
            // Filled in by `MPI_Recv` before it is read.
            let mut raw_status: MPI_Status = mem::uninitialized();
            ffi::MPI_Recv(buf.pointer_mut(),
                          buf.count(),
                          buf.as_datatype().as_raw(),
                          self.source_rank(),
                          tag,
                          self.as_communicator().as_raw(),
                          &mut raw_status);
            Status(raw_status)
        }
    }

    /// Like `receive_into_with_tag()`, using `ffi::RSMPI_ANY_TAG` as the tag.
    fn receive_into<Buf: ?Sized>(&self, buf: &mut Buf) -> Status
        where Buf: BufferMut
    {
        let any_tag = unsafe_extern_static!(ffi::RSMPI_ANY_TAG);
        self.receive_into_with_tag(buf, any_tag)
    }

    /// Receives a message with the given `tag` into a freshly allocated `Vec`.
    ///
    /// Implemented as a matched probe followed by `matched_receive_vec()`, which sizes the
    /// vector from the element count reported by the probe.
    fn receive_vec_with_tag<Msg>(&self, tag: Tag) -> (Vec<Msg>, Status)
        where Msg: Equivalence
    {
        let probed = self.matched_probe_with_tag(tag);
        probed.matched_receive_vec()
    }

    /// Like `receive_vec_with_tag()`, using `ffi::RSMPI_ANY_TAG` as the tag.
    fn receive_vec<Msg>(&self) -> (Vec<Msg>, Status)
        where Msg: Equivalence
    {
        let any_tag = unsafe_extern_static!(ffi::RSMPI_ANY_TAG);
        self.receive_vec_with_tag(any_tag)
    }

    /// Starts a receive of a message with the given `tag` into `buf` via `MPI_Irecv`.
    ///
    /// The returned `Request` is tied to `scope` and to the lifetime `'a` of the mutable
    /// borrow of `buf`.
    fn immediate_receive_into_with_tag<'a, Sc, Buf: ?Sized>(&self,
                                                            scope: Sc,
                                                            buf: &'a mut Buf,
                                                            tag: Tag)
                                                            -> Request<'a, Sc>
        where Buf: 'a + BufferMut,
              Sc: Scope<'a>
    {
        unsafe {
            // Written by `MPI_Irecv`.
            let mut raw_request: MPI_Request = mem::uninitialized();
            ffi::MPI_Irecv(buf.pointer_mut(),
                           buf.count(),
                           buf.as_datatype().as_raw(),
                           self.source_rank(),
                           tag,
                           self.as_communicator().as_raw(),
                           &mut raw_request);
            Request::from_raw(raw_request, scope)
        }
    }

    /// Like `immediate_receive_into_with_tag()`, using `ffi::RSMPI_ANY_TAG` as the tag.
    fn immediate_receive_into<'a, Sc, Buf: ?Sized>(&self,
                                                   scope: Sc,
                                                   buf: &'a mut Buf)
                                                   -> Request<'a, Sc>
        where Buf: 'a + BufferMut,
              Sc: Scope<'a>
    {
        let any_tag = unsafe_extern_static!(ffi::RSMPI_ANY_TAG);
        self.immediate_receive_into_with_tag(scope, buf, any_tag)
    }

    /// Starts a receive of a single `Msg` with the given `tag` via `MPI_Irecv`.
    ///
    /// The value is received into a heap allocation owned by the returned `ReceiveFuture`,
    /// whose request is registered with `StaticScope`.
    fn immediate_receive_with_tag<Msg>(&self, tag: Tag) -> ReceiveFuture<Msg>
        where Msg: Equivalence
    {
        unsafe {
            // Boxed so that the address handed to MPI stays stable while the future is moved.
            let mut boxed: Box<Msg> = Box::new(mem::uninitialized());
            let mut raw_request: MPI_Request = mem::uninitialized();
            ffi::MPI_Irecv((&mut *(boxed)).pointer_mut(),
                           boxed.count(),
                           Msg::equivalent_datatype().as_raw(),
                           self.source_rank(),
                           tag,
                           self.as_communicator().as_raw(),
                           &mut raw_request);
            ReceiveFuture {
                val: boxed,
                req: Request::from_raw(raw_request, StaticScope),
            }
        }
    }

    /// Like `immediate_receive_with_tag()`, using `ffi::RSMPI_ANY_TAG` as the tag.
    fn immediate_receive<Msg>(&self) -> ReceiveFuture<Msg>
        where Msg: Equivalence
    {
        let any_tag = unsafe_extern_static!(ffi::RSMPI_ANY_TAG);
        self.immediate_receive_with_tag(any_tag)
    }

    /// Checks via `MPI_Iprobe` whether a message with the given `tag` is available.
    ///
    /// Returns `Some(status)` when the flag set by MPI is non-zero and `None` otherwise.
    fn immediate_probe_with_tag(&self, tag: Tag) -> Option<Status> {
        let (flag, raw_status) = unsafe {
            let mut raw_status: MPI_Status = mem::uninitialized();
            let mut flag: c_int = mem::uninitialized();
            ffi::MPI_Iprobe(self.source_rank(),
                            tag,
                            self.as_communicator().as_raw(),
                            &mut flag,
                            &mut raw_status);
            (flag, raw_status)
        };
        match flag {
            0 => None,
            _ => Some(Status(raw_status)),
        }
    }

    /// Like `immediate_probe_with_tag()`, using `ffi::RSMPI_ANY_TAG` as the tag.
    fn immediate_probe(&self) -> Option<Status> {
        let any_tag = unsafe_extern_static!(ffi::RSMPI_ANY_TAG);
        self.immediate_probe_with_tag(any_tag)
    }

    /// Checks via `MPI_Improbe` whether a message with the given `tag` is available and, if
    /// so, matches it.
    ///
    /// Returns `Some((message, status))` when the flag set by MPI is non-zero and `None`
    /// otherwise. In the `None` case no `Message` wrapper is constructed.
    fn immediate_matched_probe_with_tag(&self, tag: Tag) -> Option<(Message, Status)> {
        let (flag, raw_message, raw_status) = unsafe {
            let mut raw_message: MPI_Message = mem::uninitialized();
            let mut raw_status: MPI_Status = mem::uninitialized();
            let mut flag: c_int = mem::uninitialized();
            ffi::MPI_Improbe(self.source_rank(),
                             tag,
                             self.as_communicator().as_raw(),
                             &mut flag,
                             &mut raw_message,
                             &mut raw_status);
            (flag, raw_message, raw_status)
        };
        match flag {
            0 => None,
            _ => Some((Message(raw_message), Status(raw_status))),
        }
    }

    /// Like `immediate_matched_probe_with_tag()`, using `ffi::RSMPI_ANY_TAG` as the tag.
    fn immediate_matched_probe(&self) -> Option<(Message, Status)> {
        let any_tag = unsafe_extern_static!(ffi::RSMPI_ANY_TAG);
        self.immediate_matched_probe_with_tag(any_tag)
    }
}
/// Receiving from `AnyProcess` uses the wildcard source rank `ffi::RSMPI_ANY_SOURCE`.
unsafe impl<'a, C: 'a + Communicator> Source for AnyProcess<'a, C> {
    fn source_rank(&self) -> Rank {
        unsafe_extern_static!(ffi::RSMPI_ANY_SOURCE)
    }
}
/// Receiving from a specific `Process` uses that process's own rank as the source rank.
unsafe impl<'a, C: 'a + Communicator> Source for Process<'a, C> {
    fn source_rank(&self) -> Rank {
        self.rank()
    }
}
/// Something that can be used as the destination in a point-to-point send operation.
///
/// Implementors only provide `destination_rank()`. Every other method has a default
/// implementation that hands that rank, together with the raw handle of `as_communicator()`,
/// to the corresponding `MPI_*` send routine. Methods ending in `_with_tag` forward the
/// caller's tag; their un-suffixed counterparts use `Tag::default()`.
///
/// The integer return codes of the underlying MPI calls are not inspected.
pub trait Destination: AsCommunicator {
    /// The rank handed to MPI as the destination of every send issued through this trait.
    fn destination_rank(&self) -> Rank;

    /// Sends the contents of `buf` with the given `tag` via `MPI_Send`.
    fn send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
        where Buf: Buffer
    {
        unsafe {
            let (data, len) = (buf.pointer(), buf.count());
            ffi::MPI_Send(data,
                          len,
                          buf.as_datatype().as_raw(),
                          self.destination_rank(),
                          tag,
                          self.as_communicator().as_raw());
        }
    }

    /// Like `send_with_tag()`, using `Tag::default()` as the tag.
    fn send<Buf: ?Sized>(&self, buf: &Buf)
        where Buf: Buffer
    {
        let default_tag = Tag::default();
        self.send_with_tag(buf, default_tag)
    }

    /// Sends the contents of `buf` with the given `tag` via `MPI_Bsend`.
    fn buffered_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
        where Buf: Buffer
    {
        unsafe {
            let (data, len) = (buf.pointer(), buf.count());
            ffi::MPI_Bsend(data,
                           len,
                           buf.as_datatype().as_raw(),
                           self.destination_rank(),
                           tag,
                           self.as_communicator().as_raw());
        }
    }

    /// Like `buffered_send_with_tag()`, using `Tag::default()` as the tag.
    fn buffered_send<Buf: ?Sized>(&self, buf: &Buf)
        where Buf: Buffer
    {
        let default_tag = Tag::default();
        self.buffered_send_with_tag(buf, default_tag)
    }

    /// Sends the contents of `buf` with the given `tag` via `MPI_Ssend`.
    fn synchronous_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
        where Buf: Buffer
    {
        unsafe {
            let (data, len) = (buf.pointer(), buf.count());
            ffi::MPI_Ssend(data,
                           len,
                           buf.as_datatype().as_raw(),
                           self.destination_rank(),
                           tag,
                           self.as_communicator().as_raw());
        }
    }

    /// Like `synchronous_send_with_tag()`, using `Tag::default()` as the tag.
    fn synchronous_send<Buf: ?Sized>(&self, buf: &Buf)
        where Buf: Buffer
    {
        let default_tag = Tag::default();
        self.synchronous_send_with_tag(buf, default_tag)
    }

    /// Sends the contents of `buf` with the given `tag` via `MPI_Rsend`.
    fn ready_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
        where Buf: Buffer
    {
        unsafe {
            let (data, len) = (buf.pointer(), buf.count());
            ffi::MPI_Rsend(data,
                           len,
                           buf.as_datatype().as_raw(),
                           self.destination_rank(),
                           tag,
                           self.as_communicator().as_raw());
        }
    }

    /// Like `ready_send_with_tag()`, using `Tag::default()` as the tag.
    fn ready_send<Buf: ?Sized>(&self, buf: &Buf)
        where Buf: Buffer
    {
        let default_tag = Tag::default();
        self.ready_send_with_tag(buf, default_tag)
    }

    /// Starts a send of the contents of `buf` with the given `tag` via `MPI_Isend`.
    ///
    /// The returned `Request` is tied to `scope` and to the lifetime `'a` of the borrow of
    /// `buf`.
    fn immediate_send_with_tag<'a, Sc, Buf: ?Sized>(&self,
                                                    scope: Sc,
                                                    buf: &'a Buf,
                                                    tag: Tag)
                                                    -> Request<'a, Sc>
        where Buf: 'a + Buffer,
              Sc: Scope<'a>
    {
        unsafe {
            // Written by `MPI_Isend`.
            let mut raw_request: MPI_Request = mem::uninitialized();
            let (data, len) = (buf.pointer(), buf.count());
            ffi::MPI_Isend(data,
                           len,
                           buf.as_datatype().as_raw(),
                           self.destination_rank(),
                           tag,
                           self.as_communicator().as_raw(),
                           &mut raw_request);
            Request::from_raw(raw_request, scope)
        }
    }

    /// Like `immediate_send_with_tag()`, using `Tag::default()` as the tag.
    fn immediate_send<'a, Sc, Buf: ?Sized>(&self, scope: Sc, buf: &'a Buf) -> Request<'a, Sc>
        where Buf: 'a + Buffer,
              Sc: Scope<'a>
    {
        let default_tag = Tag::default();
        self.immediate_send_with_tag(scope, buf, default_tag)
    }

    /// Starts a send of the contents of `buf` with the given `tag` via `MPI_Ibsend`.
    ///
    /// The returned `Request` is tied to `scope` and to the lifetime `'a` of the borrow of
    /// `buf`.
    fn immediate_buffered_send_with_tag<'a, Sc, Buf: ?Sized>(&self,
                                                             scope: Sc,
                                                             buf: &'a Buf,
                                                             tag: Tag)
                                                             -> Request<'a, Sc>
        where Buf: 'a + Buffer,
              Sc: Scope<'a>
    {
        unsafe {
            // Written by `MPI_Ibsend`.
            let mut raw_request: MPI_Request = mem::uninitialized();
            let (data, len) = (buf.pointer(), buf.count());
            ffi::MPI_Ibsend(data,
                            len,
                            buf.as_datatype().as_raw(),
                            self.destination_rank(),
                            tag,
                            self.as_communicator().as_raw(),
                            &mut raw_request);
            Request::from_raw(raw_request, scope)
        }
    }

    /// Like `immediate_buffered_send_with_tag()`, using `Tag::default()` as the tag.
    fn immediate_buffered_send<'a, Sc, Buf: ?Sized>(&self, scope: Sc, buf: &'a Buf)
                                                    -> Request<'a, Sc>
        where Buf: 'a + Buffer,
              Sc: Scope<'a>
    {
        let default_tag = Tag::default();
        self.immediate_buffered_send_with_tag(scope, buf, default_tag)
    }

    /// Starts a send of the contents of `buf` with the given `tag` via `MPI_Issend`.
    ///
    /// The returned `Request` is tied to `scope` and to the lifetime `'a` of the borrow of
    /// `buf`.
    fn immediate_synchronous_send_with_tag<'a, Sc, Buf: ?Sized>(&self,
                                                                scope: Sc,
                                                                buf: &'a Buf,
                                                                tag: Tag)
                                                                -> Request<'a, Sc>
        where Buf: 'a + Buffer,
              Sc: Scope<'a>
    {
        unsafe {
            // Written by `MPI_Issend`.
            let mut raw_request: MPI_Request = mem::uninitialized();
            let (data, len) = (buf.pointer(), buf.count());
            ffi::MPI_Issend(data,
                            len,
                            buf.as_datatype().as_raw(),
                            self.destination_rank(),
                            tag,
                            self.as_communicator().as_raw(),
                            &mut raw_request);
            Request::from_raw(raw_request, scope)
        }
    }

    /// Like `immediate_synchronous_send_with_tag()`, using `Tag::default()` as the tag.
    fn immediate_synchronous_send<'a, Sc, Buf: ?Sized>(&self,
                                                       scope: Sc,
                                                       buf: &'a Buf)
                                                       -> Request<'a, Sc>
        where Buf: 'a + Buffer,
              Sc: Scope<'a>
    {
        let default_tag = Tag::default();
        self.immediate_synchronous_send_with_tag(scope, buf, default_tag)
    }

    /// Starts a send of the contents of `buf` with the given `tag` via `MPI_Irsend`.
    ///
    /// The returned `Request` is tied to `scope` and to the lifetime `'a` of the borrow of
    /// `buf`.
    fn immediate_ready_send_with_tag<'a, Sc, Buf: ?Sized>(&self,
                                                          scope: Sc,
                                                          buf: &'a Buf,
                                                          tag: Tag)
                                                          -> Request<'a, Sc>
        where Buf: 'a + Buffer,
              Sc: Scope<'a>
    {
        unsafe {
            // Written by `MPI_Irsend`.
            let mut raw_request: MPI_Request = mem::uninitialized();
            let (data, len) = (buf.pointer(), buf.count());
            ffi::MPI_Irsend(data,
                            len,
                            buf.as_datatype().as_raw(),
                            self.destination_rank(),
                            tag,
                            self.as_communicator().as_raw(),
                            &mut raw_request);
            Request::from_raw(raw_request, scope)
        }
    }

    /// Like `immediate_ready_send_with_tag()`, using `Tag::default()` as the tag.
    fn immediate_ready_send<'a, Sc, Buf: ?Sized>(&self, scope: Sc, buf: &'a Buf)
                                                 -> Request<'a, Sc>
        where Buf: 'a + Buffer,
              Sc: Scope<'a>
    {
        let default_tag = Tag::default();
        self.immediate_ready_send_with_tag(scope, buf, default_tag)
    }
}
/// Sending to a specific `Process` uses that process's own rank as the destination rank.
impl<'a, C: 'a + Communicator> Destination for Process<'a, C> {
    fn destination_rank(&self) -> Rank {
        self.rank()
    }
}
/// Thin wrapper around a raw `MPI_Status`, as filled in by the probe and receive operations
/// in this module.
#[derive(Clone, Copy)]
pub struct Status(MPI_Status);
impl Status {
    /// Wraps a raw `MPI_Status` without inspecting it.
    pub fn from_raw(status: MPI_Status) -> Status {
        Status(status)
    }

    /// The `MPI_SOURCE` field of the wrapped status.
    pub fn source_rank(&self) -> Rank {
        let raw = &self.0;
        raw.MPI_SOURCE
    }

    /// The `MPI_TAG` field of the wrapped status.
    pub fn tag(&self) -> Tag {
        let raw = &self.0;
        raw.MPI_TAG
    }

    /// The element count reported by `MPI_Get_count` for this status, measured in units of the
    /// datatype `d`.
    pub fn count<D>(&self, d: D) -> Count
        where D: Datatype
    {
        unsafe {
            // Written by `MPI_Get_count`.
            let mut elements: Count = mem::uninitialized();
            ffi::MPI_Get_count(&self.0, d.as_raw(), &mut elements);
            elements
        }
    }
}
/// Formats a `Status` as `Status { source_rank: <rank>, tag: <tag> }`.
impl fmt::Debug for Status {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let (rank, tag) = (self.source_rank(), self.tag());
        write!(f, "Status {{ source_rank: {}, tag: {} }}", rank, tag)
    }
}
/// Wrapper around a raw `MPI_Message` handle, as produced by the matched probe operations.
///
/// The wrapper's `Drop` implementation asserts that the handle equals
/// `ffi::RSMPI_MESSAGE_NULL`, i.e. a `Message` is expected to be consumed by one of the
/// matched receive methods before it goes out of scope.
#[must_use]
pub struct Message(MPI_Message);
impl Message {
    /// Whether the wrapped handle equals `ffi::RSMPI_MESSAGE_NO_PROC`.
    pub fn is_no_proc(&self) -> bool {
        let handle = self.as_raw();
        handle == unsafe_extern_static!(ffi::RSMPI_MESSAGE_NO_PROC)
    }

    /// Receives this matched message as a single value of type `Msg`.
    ///
    /// # Panics
    ///
    /// Panics if the status reports that zero elements of `Msg`'s datatype were received,
    /// because the returned value would then never have been written.
    pub fn matched_receive<Msg>(self) -> (Msg, Status)
        where Msg: Equivalence
    {
        // Storage for the incoming value; written by `matched_receive_into()`.
        let mut received: Msg = unsafe { mem::uninitialized() };
        let status = self.matched_receive_into(&mut received);
        let element_count = status.count(Msg::equivalent_datatype());
        if element_count == 0 {
            panic!("Received an empty message.");
        }
        (received, status)
    }

    /// Receives this matched message into `buf` via `MPI_Mrecv` and returns the `Status`.
    ///
    /// # Panics
    ///
    /// Asserts that the handle equals `ffi::RSMPI_MESSAGE_NULL` after the call.
    pub fn matched_receive_into<Buf: ?Sized>(mut self, buf: &mut Buf) -> Status
        where Buf: BufferMut
    {
        unsafe {
            // Filled in by `MPI_Mrecv` before it is read.
            let mut raw_status: MPI_Status = mem::uninitialized();
            ffi::MPI_Mrecv(buf.pointer_mut(),
                           buf.count(),
                           buf.as_datatype().as_raw(),
                           self.as_raw_mut(),
                           &mut raw_status);
            // The handle is checked here so that `Drop` for `self` will not fire its assertion.
            assert_eq!(self.as_raw(), ffi::RSMPI_MESSAGE_NULL);
            Status(raw_status)
        }
    }

    /// Starts receiving this matched message into `buf` via `MPI_Imrecv`.
    ///
    /// The returned `Request` is tied to `scope` and to the lifetime `'a` of the mutable
    /// borrow of `buf`.
    ///
    /// # Panics
    ///
    /// Asserts that the handle equals `ffi::RSMPI_MESSAGE_NULL` after the call.
    pub fn immediate_matched_receive_into<'a, Sc, Buf: ?Sized + 'a>(mut self,
                                                                    scope: Sc,
                                                                    buf: &'a mut Buf)
                                                                    -> Request<'a, Sc>
        where Buf: BufferMut,
              Sc: Scope<'a>
    {
        unsafe {
            // Written by `MPI_Imrecv`.
            let mut raw_request: MPI_Request = mem::uninitialized();
            ffi::MPI_Imrecv(buf.pointer_mut(),
                            buf.count(),
                            buf.as_datatype().as_raw(),
                            self.as_raw_mut(),
                            &mut raw_request);
            assert_eq!(self.as_raw(), ffi::RSMPI_MESSAGE_NULL);
            Request::from_raw(raw_request, scope)
        }
    }
}
unsafe impl AsRaw for Message {
type Raw = MPI_Message;
fn as_raw(&self) -> Self::Raw {
self.0
}
}
unsafe impl AsRawMut for Message {
fn as_raw_mut(&mut self) -> *mut <Self as AsRaw>::Raw {
&mut self.0
}
}
/// Enforces that a matched message is received before it goes out of scope.
///
/// # Panics
///
/// Panics with "matched message dropped without receiving." if the wrapped handle is not
/// `ffi::RSMPI_MESSAGE_NULL` when the value is dropped. The check is skipped while the thread
/// is already unwinding from another panic.
impl Drop for Message {
    fn drop(&mut self) {
        // Panicking inside `drop` during unwinding is a double panic, which aborts the whole
        // process and hides the original failure. Let the first panic propagate instead.
        if ::std::thread::panicking() {
            return;
        }
        assert_eq!(self.as_raw(), unsafe_extern_static!(ffi::RSMPI_MESSAGE_NULL),
                   "matched message dropped without receiving.");
    }
}
/// Receiving the outcome of a matched probe into a freshly allocated `Vec`.
pub trait MatchedReceiveVec {
    /// Consumes `self` and returns the received elements together with the `Status` of the
    /// receive operation.
    fn matched_receive_vec<Msg>(self) -> (Vec<Msg>, Status)
        where Msg: Equivalence;
}
/// Receives the `Message` of a `(Message, Status)` pair, using the `Status` of the probe to
/// determine how many elements the vector must hold.
impl MatchedReceiveVec for (Message, Status) {
    /// # Panics
    ///
    /// Panics if the element count reported by the probe status cannot be converted to `usize`.
    fn matched_receive_vec<Msg>(self) -> (Vec<Msg>, Status)
        where Msg: Equivalence
    {
        let (message, probe_status) = self;
        let len: usize = probe_status.count(Msg::equivalent_datatype())
            .value_as()
            .expect("Message element count cannot be expressed as a usize.");

        // The elements are left uninitialized here and are written by the receive below.
        let mut elements: Vec<Msg> = Vec::with_capacity(len);
        unsafe {
            elements.set_len(len);
        }

        let receive_status = message.matched_receive_into(&mut elements[..]);
        (elements, receive_status)
    }
}
/// Sends `msg` to `destination` tagged `sendtag` and simultaneously receives a single value of
/// type `R` tagged `receivetag` from `source`.
///
/// Returns the received value together with the `Status` of the receive. The actual exchange
/// is delegated to `send_receive_into_with_tags()`.
///
/// # Panics
///
/// Panics if the status reports that zero elements of `R`'s datatype were received: the
/// result would otherwise be returned without ever having been written. This mirrors the
/// empty-message check of the single-value receive operations in this module.
///
/// Also panics under the conditions documented for `send_receive_into_with_tags()`.
pub fn send_receive_with_tags<M, D, R, S>(msg: &M,
                                          destination: &D,
                                          sendtag: Tag,
                                          source: &S,
                                          receivetag: Tag)
                                          -> (R, Status)
    where M: Equivalence,
          D: Destination,
          R: Equivalence,
          S: Source
{
    // Storage for the incoming value; written by the receive half of the exchange.
    let mut res: R = unsafe { mem::uninitialized() };
    let status = send_receive_into_with_tags(msg,
                                             destination,
                                             sendtag,
                                             &mut res,
                                             source,
                                             receivetag);
    // `res` only holds a meaningful value if at least one element was actually received.
    if status.count(R::equivalent_datatype()) == 0 {
        panic!("Received an empty message.");
    }
    (res, status)
}
/// Sends `msg` to `destination` and simultaneously receives a single value of type `R` from
/// `source`.
///
/// Shorthand for `send_receive_with_tags()` with `Tag::default()` as the send tag and
/// `ffi::RSMPI_ANY_TAG` as the receive tag.
pub fn send_receive<R, M, D, S>(msg: &M, destination: &D, source: &S) -> (R, Status)
    where M: Equivalence,
          D: Destination,
          R: Equivalence,
          S: Source
{
    let sendtag = Tag::default();
    let receivetag = unsafe_extern_static!(ffi::RSMPI_ANY_TAG);
    send_receive_with_tags(msg, destination, sendtag, source, receivetag)
}
/// Sends the contents of `msg` to `destination` tagged `sendtag` and simultaneously receives a
/// message tagged `receivetag` from `source` into `buf`, via `MPI_Sendrecv`.
///
/// The communicator of `source` is the one handed to MPI. Returns the `Status` of the receive.
///
/// # Panics
///
/// Panics unless comparing the communicators of `source` and `destination` yields
/// `CommunicatorRelation::Identical`.
pub fn send_receive_into_with_tags<M: ?Sized, D, B: ?Sized, S>(msg: &M,
                                                               destination: &D,
                                                               sendtag: Tag,
                                                               buf: &mut B,
                                                               source: &S,
                                                               receivetag: Tag)
                                                               -> Status
    where M: Buffer,
          D: Destination,
          B: BufferMut,
          S: Source
{
    // Only a single communicator is passed to MPI, so both ends have to share it.
    let relation = source.as_communicator().compare(destination.as_communicator());
    assert_eq!(relation, CommunicatorRelation::Identical);

    unsafe {
        // Filled in by `MPI_Sendrecv` before it is read.
        let mut raw_status: MPI_Status = mem::uninitialized();
        ffi::MPI_Sendrecv(msg.pointer(),
                          msg.count(),
                          msg.as_datatype().as_raw(),
                          destination.destination_rank(),
                          sendtag,
                          buf.pointer_mut(),
                          buf.count(),
                          buf.as_datatype().as_raw(),
                          source.source_rank(),
                          receivetag,
                          source.as_communicator().as_raw(),
                          &mut raw_status);
        Status(raw_status)
    }
}
/// Sends the contents of `msg` to `destination` and simultaneously receives a message from
/// `source` into `buf`.
///
/// Shorthand for `send_receive_into_with_tags()` with `Tag::default()` as the send tag and
/// `ffi::RSMPI_ANY_TAG` as the receive tag.
pub fn send_receive_into<M: ?Sized, D, B: ?Sized, S>(msg: &M,
                                                     destination: &D,
                                                     buf: &mut B,
                                                     source: &S)
                                                     -> Status
    where M: Buffer,
          D: Destination,
          B: BufferMut,
          S: Source
{
    let sendtag = Tag::default();
    let receivetag = unsafe_extern_static!(ffi::RSMPI_ANY_TAG);
    send_receive_into_with_tags(msg, destination, sendtag, buf, source, receivetag)
}
/// Sends the contents of `buf` to `destination` tagged `sendtag` and then receives a message
/// tagged `receivetag` from `source` into the same buffer, via `MPI_Sendrecv_replace`.
///
/// The communicator of `source` is the one handed to MPI. Returns the `Status` of the receive.
///
/// # Panics
///
/// Panics unless comparing the communicators of `source` and `destination` yields
/// `CommunicatorRelation::Identical`.
pub fn send_receive_replace_into_with_tags<B: ?Sized, D, S>(buf: &mut B,
                                                            destination: &D,
                                                            sendtag: Tag,
                                                            source: &S,
                                                            receivetag: Tag)
                                                            -> Status
    where B: BufferMut,
          D: Destination,
          S: Source
{
    // Only a single communicator is passed to MPI, so both ends have to share it.
    let relation = source.as_communicator().compare(destination.as_communicator());
    assert_eq!(relation, CommunicatorRelation::Identical);

    unsafe {
        // Filled in by `MPI_Sendrecv_replace` before it is read.
        let mut raw_status: MPI_Status = mem::uninitialized();
        ffi::MPI_Sendrecv_replace(buf.pointer_mut(),
                                  buf.count(),
                                  buf.as_datatype().as_raw(),
                                  destination.destination_rank(),
                                  sendtag,
                                  source.source_rank(),
                                  receivetag,
                                  source.as_communicator().as_raw(),
                                  &mut raw_status);
        Status(raw_status)
    }
}
/// Sends the contents of `buf` to `destination` and then receives a message from `source` into
/// the same buffer.
///
/// Shorthand for `send_receive_replace_into_with_tags()` with `Tag::default()` as the send tag
/// and `ffi::RSMPI_ANY_TAG` as the receive tag.
pub fn send_receive_replace_into<B: ?Sized, D, S>(buf: &mut B,
                                                  destination: &D,
                                                  source: &S)
                                                  -> Status
    where B: BufferMut,
          D: Destination,
          S: Source
{
    let sendtag = Tag::default();
    let receivetag = unsafe_extern_static!(ffi::RSMPI_ANY_TAG);
    send_receive_replace_into_with_tags(buf, destination, sendtag, source, receivetag)
}
/// The pending result of an immediate receive of a single value of type `T`.
///
/// Pairs the heap allocation that the value is received into with the request that tracks the
/// receive operation.
#[must_use]
pub struct ReceiveFuture<T> {
    // Destination of the receive; boxed so its address does not change when the future moves.
    val: Box<T>,
    // Request for the receive operation writing into `val`.
    req: Request<'static>,
}
impl<T> ReceiveFuture<T> where T: Equivalence {
pub fn get(self) -> (T, Status) {
let status = self.req.wait();
if status.count(T::equivalent_datatype()) == 0 {
panic!("Received an empty message into a ReceiveFuture.");
}
(*self.val, status)
}
pub fn try(mut self) -> Result<(T, Status), Self> {
match self.req.test() {
Ok(status) => {
if status.count(T::equivalent_datatype()) == 0 {
panic!("Received an empty message into a ReceiveFuture.");
}
Ok((*self.val, status))
},
Err(request) => {
self.req = request;
Err(self)
}
}
}
}