mpi_fork_fnsp/
point_to_point.rs

1//! Point to point communication
2//!
3//! Endpoints of communication are mostly described by types that implement the `Source` and
4//! `Destination` trait. Communication operations are implemented as default methods on those
5//! traits.
6//!
7//! # Unfinished features
8//!
9//! - **3.2.6**: `MPI_STATUS_IGNORE`
10//! - **3.6**: Buffer usage, `MPI_Buffer_attach()`, `MPI_Buffer_detach()`
11//! - **3.9**: Persistent requests, `MPI_Send_init()`, `MPI_Bsend_init()`, `MPI_Ssend_init()`,
12//! `MPI_Rsend_init()`, `MPI_Recv_init()`, `MPI_Start()`, `MPI_Startall()`
13#![allow(clippy::missing_panics_doc, clippy::ptr_as_ptr)]
14use std::alloc::{self, Layout};
15use std::mem::{transmute, MaybeUninit};
16use std::{fmt, ptr};
17
18use conv::ConvUtil;
19
20use super::{Count, Tag};
21
22use crate::ffi;
23use crate::ffi::{MPI_Message, MPI_Status};
24
25use crate::datatype::traits::{
26    AsDatatype, Buffer, BufferMut, Collection, Datatype, Equivalence, Pointer,
27};
28use crate::raw::traits::{AsRaw, AsRawMut};
29use crate::request::{Request, Scope, StaticScope};
30use crate::topology::traits::{AsCommunicator, Communicator};
31use crate::topology::{AnyProcess, CommunicatorRelation, Process, Rank};
32use crate::{with_uninitialized, with_uninitialized2};
33
34// TODO: rein in _with_tag ugliness, use optional tags or make tag part of Source and Destination
35
36/// Point to point communication traits
37pub mod traits {
38    pub use super::{Destination, MatchedReceiveVec, Source};
39}
40
41/// Something that can be used as the source in a point to point receive operation
42///
43/// # Examples
44///
45/// - A `Process` used as a source for a receive operation will receive data only from the
46/// identified process.
47/// - A communicator can also be used as a source via the `AnyProcess` identifier.
48///
49/// # Standard section(s)
50///
51/// 3.2.3
52pub unsafe trait Source: AsCommunicator {
53    /// `Rank` that identifies the source
54    fn source_rank(&self) -> Rank;
55
56    /// Probe a source for incoming messages.
57    ///
58    /// Probe `Source` `&self` for incoming messages with a certain tag.
59    ///
60    /// An ordinary `probe()` returns a `Status` which allows inspection of the properties of the
61    /// incoming message, but does not guarantee reception by a subsequent `receive()` (especially
62    /// in a multi-threaded set-up). For a probe operation with stronger guarantees, see
63    /// `matched_probe()`.
64    ///
65    /// # Standard section(s)
66    ///
67    /// 3.8.1
68    fn probe_with_tag(&self, tag: Tag) -> Status {
69        unsafe {
70            Status(
71                with_uninitialized(|status| {
72                    ffi::MPI_Probe(
73                        self.source_rank(),
74                        tag,
75                        self.as_communicator().as_raw(),
76                        status,
77                    )
78                })
79                .1,
80            )
81        }
82    }
83
84    /// Probe a source for incoming messages.
85    ///
86    /// Probe `Source` `&self` for incoming messages with any tag.
87    ///
88    /// An ordinary `probe()` returns a `Status` which allows inspection of the properties of the
89    /// incoming message, but does not guarantee reception by a subsequent `receive()` (especially
90    /// in a multi-threaded set-up). For a probe operation with stronger guarantees, see
91    /// `matched_probe()`.
92    ///
93    /// # Standard section(s)
94    ///
95    /// 3.8.1
96    fn probe(&self) -> Status {
97        self.probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
98    }
99
100    /// Probe a source for incoming messages with guaranteed reception.
101    ///
102    /// Probe `Source` `&self` for incoming messages with a certain tag.
103    ///
104    /// A `matched_probe()` returns both a `Status` that describes the properties of a pending
105    /// incoming message and a `Message` which can and *must* subsequently be used in a
106    /// `matched_receive()` to receive the probed message.
107    ///
108    /// # Standard section(s)
109    ///
110    /// 3.8.2
111    fn matched_probe_with_tag(&self, tag: Tag) -> (Message, Status) {
112        let (_, message, status) = unsafe {
113            with_uninitialized2(|message, status| {
114                ffi::MPI_Mprobe(
115                    self.source_rank(),
116                    tag,
117                    self.as_communicator().as_raw(),
118                    message,
119                    status,
120                )
121            })
122        };
123        (Message(message), Status(status))
124    }
125
126    /// Probe a source for incoming messages with guaranteed reception.
127    ///
128    /// Probe `Source` `&self` for incoming messages with any tag.
129    ///
130    /// A `matched_probe()` returns both a `Status` that describes the properties of a pending
131    /// incoming message and a `Message` which can and *must* subsequently be used in a
132    /// `matched_receive()` to receive the probed message.
133    ///
134    /// # Standard section(s)
135    ///
136    /// 3.8.2
137    fn matched_probe(&self) -> (Message, Status) {
138        self.matched_probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
139    }
140
141    /// Receive a message containing a single instance of type `Msg`.
142    ///
143    /// Receive a message from `Source` `&self` tagged `tag` containing a single instance of type
144    /// `Msg`.
145    ///
146    /// # Standard section(s)
147    ///
148    /// 3.2.4
149    fn receive_with_tag<Msg>(&self, tag: Tag) -> (Msg, Status)
150    where
151        Msg: Equivalence,
152    {
153        unsafe {
154            let (_, msg, status) = with_uninitialized2(|msg, status| {
155                ffi::MPI_Recv(
156                    msg as _,
157                    1,
158                    Msg::equivalent_datatype().as_raw(),
159                    self.source_rank(),
160                    tag,
161                    self.as_communicator().as_raw(),
162                    status,
163                )
164            });
165            let status = Status(status);
166            if status.count(Msg::equivalent_datatype()) == 0 {
167                panic!("Received an empty message.");
168            }
169            (msg, status)
170        }
171    }
172
173    /// Receive a message containing a single instance of type `Msg`.
174    ///
175    /// Receive a message from `Source` `&self` containing a single instance of type `Msg`.
176    ///
177    /// # Examples
178    ///
179    /// ```no_run
180    /// extern crate mpi_fork_fnsp as mpi;
181    /// use mpi::traits::*;
182    ///
183    /// let universe = mpi::initialize().unwrap();
184    /// let world = universe.world();
185    ///
186    /// let x = world.any_process().receive::<f64>();
187    /// ```
188    ///
189    /// # Standard section(s)
190    ///
191    /// 3.2.4
192    fn receive<Msg>(&self) -> (Msg, Status)
193    where
194        Msg: Equivalence,
195    {
196        self.receive_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
197    }
198
199    /// Receive a message into a `Buffer`.
200    ///
201    /// Receive a message from `Source` `&self` tagged `tag` into `Buffer` `buf`.
202    ///
203    /// # Standard section(s)
204    ///
205    /// 3.2.4
206    fn receive_into_with_tag<Buf: ?Sized>(&self, buf: &mut Buf, tag: Tag) -> Status
207    where
208        Buf: BufferMut,
209    {
210        unsafe {
211            Status(
212                with_uninitialized(|status| {
213                    ffi::MPI_Recv(
214                        buf.pointer_mut(),
215                        buf.count(),
216                        buf.as_datatype().as_raw(),
217                        self.source_rank(),
218                        tag,
219                        self.as_communicator().as_raw(),
220                        status,
221                    )
222                })
223                .1,
224            )
225        }
226    }
227
228    /// Receive a message into a `Buffer`.
229    ///
230    /// Receive a message from `Source` `&self` into `Buffer` `buf`.
231    ///
232    /// # Standard section(s)
233    ///
234    /// 3.2.4
235    fn receive_into<Buf: ?Sized>(&self, buf: &mut Buf) -> Status
236    where
237        Buf: BufferMut,
238    {
239        self.receive_into_with_tag(buf, unsafe { ffi::RSMPI_ANY_TAG })
240    }
241
242    /// Receive a message containing multiple instances of type `Msg` into a `Vec`.
243    ///
244    /// Receive a message from `Source` `&self` tagged `tag` containing multiple instances of type
245    /// `Msg` into a `Vec`.
246    ///
247    /// # Standard section(s)
248    ///
249    /// 3.2.4
250    fn receive_vec_with_tag<Msg>(&self, tag: Tag) -> (Vec<Msg>, Status)
251    where
252        Msg: Equivalence,
253    {
254        self.matched_probe_with_tag(tag).matched_receive_vec()
255    }
256
257    /// Receive a message containing multiple instances of type `Msg` into a `Vec`.
258    ///
259    /// Receive a message from `Source` `&self` containing multiple instances of type `Msg` into a
260    /// `Vec`.
261    ///
262    /// # Examples
263    /// See `examples/send_receive.rs`
264    ///
265    /// # Standard section(s)
266    ///
267    /// 3.2.4
268    fn receive_vec<Msg>(&self) -> (Vec<Msg>, Status)
269    where
270        Msg: Equivalence,
271    {
272        self.receive_vec_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
273    }
274
275    /// Initiate an immediate (non-blocking) receive operation.
276    ///
277    /// Initiate receiving a message matching `tag` into `buf`.
278    ///
279    /// # Standard section(s)
280    ///
281    /// 3.7.2
282    fn immediate_receive_into_with_tag<'a, Sc, Buf: ?Sized>(
283        &self,
284        scope: Sc,
285        buf: &'a mut Buf,
286        tag: Tag,
287    ) -> Request<'a, Sc>
288    where
289        Buf: 'a + BufferMut,
290        Sc: Scope<'a>,
291    {
292        unsafe {
293            Request::from_raw(
294                with_uninitialized(|request| {
295                    ffi::MPI_Irecv(
296                        buf.pointer_mut(),
297                        buf.count(),
298                        buf.as_datatype().as_raw(),
299                        self.source_rank(),
300                        tag,
301                        self.as_communicator().as_raw(),
302                        request,
303                    )
304                })
305                .1,
306                scope,
307            )
308        }
309    }
310
311    /// Initiate an immediate (non-blocking) receive operation.
312    ///
313    /// Initiate receiving a message into `buf`.
314    ///
315    /// # Examples
316    /// See `examples/immediate.rs`
317    ///
318    /// # Standard section(s)
319    ///
320    /// 3.7.2
321    fn immediate_receive_into<'a, Sc, Buf: ?Sized>(
322        &self,
323        scope: Sc,
324        buf: &'a mut Buf,
325    ) -> Request<'a, Sc>
326    where
327        Buf: 'a + BufferMut,
328        Sc: Scope<'a>,
329    {
330        self.immediate_receive_into_with_tag(scope, buf, unsafe { ffi::RSMPI_ANY_TAG })
331    }
332
333    /// Initiate a non-blocking receive operation for messages matching tag `tag`.
334    ///
335    /// # Standard section(s)
336    ///
337    /// 3.7.2
338    fn immediate_receive_with_tag<Msg>(&self, tag: Tag) -> ReceiveFuture<Msg>
339    where
340        Msg: Equivalence,
341    {
342        unsafe {
343            let val = alloc::alloc(Layout::new::<Msg>()) as *mut Msg;
344            let (_, request) = with_uninitialized(|request| {
345                ffi::MPI_Irecv(
346                    val as _,
347                    1,
348                    Msg::equivalent_datatype().as_raw(),
349                    self.source_rank(),
350                    tag,
351                    self.as_communicator().as_raw(),
352                    request,
353                )
354            });
355            ReceiveFuture {
356                val,
357                req: Request::from_raw(request, StaticScope),
358            }
359        }
360    }
361
362    /// Initiate a non-blocking receive operation.
363    ///
364    /// # Examples
365    /// See `examples/immediate.rs`
366    ///
367    /// # Standard section(s)
368    ///
369    /// 3.7.2
370    fn immediate_receive<Msg>(&self) -> ReceiveFuture<Msg>
371    where
372        Msg: Equivalence,
373    {
374        self.immediate_receive_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
375    }
376
377    /// Asynchronously probe a source for incoming messages.
378    ///
379    /// Asynchronously probe `Source` `&self` for incoming messages with a certain tag.
380    ///
381    /// Like `Probe` but returns a `None` immediately if there is no incoming message to be probed.
382    ///
383    /// # Standard section(s)
384    ///
385    /// 3.8.1
386    fn immediate_probe_with_tag(&self, tag: Tag) -> Option<Status> {
387        unsafe {
388            let mut status = MaybeUninit::uninit();
389
390            let (_, flag) = with_uninitialized(|flag| {
391                ffi::MPI_Iprobe(
392                    self.source_rank(),
393                    tag,
394                    self.as_communicator().as_raw(),
395                    flag,
396                    status.as_mut_ptr(),
397                )
398            });
399
400            if flag == 0 {
401                None
402            } else {
403                Some(Status(status.assume_init()))
404            }
405        }
406    }
407
408    /// Asynchronously probe a source for incoming messages.
409    ///
410    /// Asynchronously probe `Source` `&self` for incoming messages with any tag.
411    ///
412    /// Like `Probe` but returns a `None` immediately if there is no incoming message to be probed.
413    ///
414    /// # Standard section(s)
415    ///
416    /// 3.8.1
417    fn immediate_probe(&self) -> Option<Status> {
418        self.immediate_probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
419    }
420
421    /// Asynchronously probe a source for incoming messages with guaranteed reception.
422    ///
423    /// Asynchronously probe `Source` `&self` for incoming messages with a certain tag.
424    ///
425    /// Like `MatchedProbe` but returns a `None` immediately if there is no incoming message to be
426    /// probed.
427    ///
428    /// # Standard section(s)
429    ///
430    /// 3.8.2
431    fn immediate_matched_probe_with_tag(&self, tag: Tag) -> Option<(Message, Status)> {
432        unsafe {
433            let mut message = MaybeUninit::uninit();
434            let mut status = MaybeUninit::uninit();
435
436            let (_, flag) = with_uninitialized(|flag| {
437                ffi::MPI_Improbe(
438                    self.source_rank(),
439                    tag,
440                    self.as_communicator().as_raw(),
441                    flag,
442                    message.as_mut_ptr(),
443                    status.as_mut_ptr(),
444                )
445            });
446
447            if flag == 0 {
448                None
449            } else {
450                Some((Message(message.assume_init()), Status(status.assume_init())))
451            }
452        }
453    }
454
455    /// Asynchronously probe a source for incoming messages with guaranteed reception.
456    ///
457    /// Asynchronously probe `Source` `&self` for incoming messages with any tag.
458    ///
459    /// Like `MatchedProbe` but returns a `None` immediately if there is no incoming message to be
460    /// probed.
461    ///
462    /// # Standard section(s)
463    ///
464    /// 3.8.2
465    fn immediate_matched_probe(&self) -> Option<(Message, Status)> {
466        self.immediate_matched_probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
467    }
468}
469
470unsafe impl<'a, C> Source for AnyProcess<'a, C>
471where
472    C: 'a + Communicator,
473{
474    fn source_rank(&self) -> Rank {
475        unsafe { ffi::RSMPI_ANY_SOURCE }
476    }
477}
478
479unsafe impl<'a, C> Source for Process<'a, C>
480where
481    C: 'a + Communicator,
482{
483    fn source_rank(&self) -> Rank {
484        self.rank()
485    }
486}
487
488/// Something that can be used as the destination in a point to point send operation
489///
490/// # Examples
491/// - Using a `Process` as the destination will send data to that specific process.
492///
493/// # Standard section(s)
494///
495/// 3.2.3
496pub trait Destination: AsCommunicator {
497    /// `Rank` that identifies the destination
498    fn destination_rank(&self) -> Rank;
499
500    /// Blocking standard mode send operation
501    ///
502    /// Send the contents of a `Buffer` to the `Destination` `&self` and tag it.
503    ///
504    /// # Standard section(s)
505    ///
506    /// 3.2.1
507    fn send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
508    where
509        Buf: Buffer,
510    {
511        unsafe {
512            ffi::MPI_Send(
513                buf.pointer(),
514                buf.count(),
515                buf.as_datatype().as_raw(),
516                self.destination_rank(),
517                tag,
518                self.as_communicator().as_raw(),
519            );
520        }
521    }
522
523    /// Blocking standard mode send operation
524    ///
525    /// Send the contents of a `Buffer` to the `Destination` `&self`.
526    ///
527    /// # Examples
528    ///
529    /// ```no_run
530    /// extern crate mpi_fork_fnsp as mpi;
531    /// use mpi::traits::*;
532    ///
533    /// let universe = mpi::initialize().unwrap();
534    /// let world = universe.world();
535    ///
536    /// let v = vec![ 1.0f64, 2.0, 3.0 ];
537    /// world.process_at_rank(1).send(&v[..]);
538    /// ```
539    ///
540    /// See also `examples/send_receive.rs`
541    ///
542    /// # Standard section(s)
543    ///
544    /// 3.2.1
545    fn send<Buf: ?Sized>(&self, buf: &Buf)
546    where
547        Buf: Buffer,
548    {
549        self.send_with_tag(buf, Tag::default())
550    }
551
552    /// Blocking buffered mode send operation
553    ///
554    /// Send the contents of a `Buffer` to the `Destination` `&self` and tag it.
555    ///
556    /// # Standard section(s)
557    ///
558    /// 3.4
559    fn buffered_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
560    where
561        Buf: Buffer,
562    {
563        unsafe {
564            ffi::MPI_Bsend(
565                buf.pointer(),
566                buf.count(),
567                buf.as_datatype().as_raw(),
568                self.destination_rank(),
569                tag,
570                self.as_communicator().as_raw(),
571            );
572        }
573    }
574
575    /// Blocking buffered mode send operation
576    ///
577    /// Send the contents of a `Buffer` to the `Destination` `&self`.
578    ///
579    /// # Standard section(s)
580    ///
581    /// 3.4
582    fn buffered_send<Buf: ?Sized>(&self, buf: &Buf)
583    where
584        Buf: Buffer,
585    {
586        self.buffered_send_with_tag(buf, Tag::default())
587    }
588
589    /// Blocking synchronous mode send operation
590    ///
591    /// Send the contents of a `Buffer` to the `Destination` `&self` and tag it.
592    ///
593    /// Completes only once the matching receive operation has started.
594    ///
595    /// # Standard section(s)
596    ///
597    /// 3.4
598    fn synchronous_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
599    where
600        Buf: Buffer,
601    {
602        unsafe {
603            ffi::MPI_Ssend(
604                buf.pointer(),
605                buf.count(),
606                buf.as_datatype().as_raw(),
607                self.destination_rank(),
608                tag,
609                self.as_communicator().as_raw(),
610            );
611        }
612    }
613
614    /// Blocking synchronous mode send operation
615    ///
616    /// Send the contents of a `Buffer` to the `Destination` `&self`.
617    ///
618    /// Completes only once the matching receive operation has started.
619    ///
620    /// # Standard section(s)
621    ///
622    /// 3.4
623    fn synchronous_send<Buf: ?Sized>(&self, buf: &Buf)
624    where
625        Buf: Buffer,
626    {
627        self.synchronous_send_with_tag(buf, Tag::default())
628    }
629
630    /// Blocking ready mode send operation
631    ///
632    /// Send the contents of a `Buffer` to the `Destination` `&self` and tag it.
633    ///
634    /// Fails if the matching receive operation has not been posted.
635    ///
636    /// # Standard section(s)
637    ///
638    /// 3.4
639    fn ready_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
640    where
641        Buf: Buffer,
642    {
643        unsafe {
644            ffi::MPI_Rsend(
645                buf.pointer(),
646                buf.count(),
647                buf.as_datatype().as_raw(),
648                self.destination_rank(),
649                tag,
650                self.as_communicator().as_raw(),
651            );
652        }
653    }
654
655    /// Blocking ready mode send operation
656    ///
657    /// Send the contents of a `Buffer` to the `Destination` `&self`.
658    ///
659    /// Fails if the matching receive operation has not been posted.
660    ///
661    /// # Standard section(s)
662    ///
663    /// 3.4
664    fn ready_send<Buf: ?Sized>(&self, buf: &Buf)
665    where
666        Buf: Buffer,
667    {
668        self.ready_send_with_tag(buf, Tag::default())
669    }
670
671    /// Initiate an immediate (non-blocking) standard mode send operation.
672    ///
673    /// Initiate sending the data in `buf` in standard mode and tag it.
674    ///
675    /// # Standard section(s)
676    ///
677    /// 3.7.2
678    fn immediate_send_with_tag<'a, Sc, Buf: ?Sized>(
679        &self,
680        scope: Sc,
681        buf: &'a Buf,
682        tag: Tag,
683    ) -> Request<'a, Sc>
684    where
685        Buf: 'a + Buffer,
686        Sc: Scope<'a>,
687    {
688        unsafe {
689            Request::from_raw(
690                with_uninitialized(|request| {
691                    ffi::MPI_Isend(
692                        buf.pointer(),
693                        buf.count(),
694                        buf.as_datatype().as_raw(),
695                        self.destination_rank(),
696                        tag,
697                        self.as_communicator().as_raw(),
698                        request,
699                    )
700                })
701                .1,
702                scope,
703            )
704        }
705    }
706
707    /// Initiate an immediate (non-blocking) standard mode send operation.
708    ///
709    /// Initiate sending the data in `buf` in standard mode.
710    ///
711    /// # Examples
712    /// See `examples/immediate.rs`
713    ///
714    /// # Standard section(s)
715    ///
716    /// 3.7.2
717    fn immediate_send<'a, Sc, Buf: ?Sized>(&self, scope: Sc, buf: &'a Buf) -> Request<'a, Sc>
718    where
719        Buf: 'a + Buffer,
720        Sc: Scope<'a>,
721    {
722        self.immediate_send_with_tag(scope, buf, Tag::default())
723    }
724
725    /// Initiate an immediate (non-blocking) buffered mode send operation.
726    ///
727    /// Initiate sending the data in `buf` in buffered mode and tag it.
728    ///
729    /// # Standard section(s)
730    ///
731    /// 3.7.2
732    fn immediate_buffered_send_with_tag<'a, Sc, Buf: ?Sized>(
733        &self,
734        scope: Sc,
735        buf: &'a Buf,
736        tag: Tag,
737    ) -> Request<'a, Sc>
738    where
739        Buf: 'a + Buffer,
740        Sc: Scope<'a>,
741    {
742        unsafe {
743            Request::from_raw(
744                with_uninitialized(|request| {
745                    ffi::MPI_Ibsend(
746                        buf.pointer(),
747                        buf.count(),
748                        buf.as_datatype().as_raw(),
749                        self.destination_rank(),
750                        tag,
751                        self.as_communicator().as_raw(),
752                        request,
753                    )
754                })
755                .1,
756                scope,
757            )
758        }
759    }
760
761    /// Initiate an immediate (non-blocking) buffered mode send operation.
762    ///
763    /// Initiate sending the data in `buf` in buffered mode.
764    ///
765    /// # Standard section(s)
766    ///
767    /// 3.7.2
768    fn immediate_buffered_send<'a, Sc, Buf: ?Sized>(
769        &self,
770        scope: Sc,
771        buf: &'a Buf,
772    ) -> Request<'a, Sc>
773    where
774        Buf: 'a + Buffer,
775        Sc: Scope<'a>,
776    {
777        self.immediate_buffered_send_with_tag(scope, buf, Tag::default())
778    }
779
780    /// Initiate an immediate (non-blocking) synchronous mode send operation.
781    ///
782    /// Initiate sending the data in `buf` in synchronous mode and tag it.
783    ///
784    /// # Standard section(s)
785    ///
786    /// 3.7.2
787    fn immediate_synchronous_send_with_tag<'a, Sc, Buf: ?Sized>(
788        &self,
789        scope: Sc,
790        buf: &'a Buf,
791        tag: Tag,
792    ) -> Request<'a, Sc>
793    where
794        Buf: 'a + Buffer,
795        Sc: Scope<'a>,
796    {
797        unsafe {
798            Request::from_raw(
799                with_uninitialized(|request| {
800                    ffi::MPI_Issend(
801                        buf.pointer(),
802                        buf.count(),
803                        buf.as_datatype().as_raw(),
804                        self.destination_rank(),
805                        tag,
806                        self.as_communicator().as_raw(),
807                        request,
808                    )
809                })
810                .1,
811                scope,
812            )
813        }
814    }
815
816    /// Initiate an immediate (non-blocking) synchronous mode send operation.
817    ///
818    /// Initiate sending the data in `buf` in synchronous mode.
819    ///
820    /// # Standard section(s)
821    ///
822    /// 3.7.2
823    fn immediate_synchronous_send<'a, Sc, Buf: ?Sized>(
824        &self,
825        scope: Sc,
826        buf: &'a Buf,
827    ) -> Request<'a, Sc>
828    where
829        Buf: 'a + Buffer,
830        Sc: Scope<'a>,
831    {
832        self.immediate_synchronous_send_with_tag(scope, buf, Tag::default())
833    }
834
835    /// Initiate an immediate (non-blocking) ready mode send operation.
836    ///
837    /// Initiate sending the data in `buf` in ready mode and tag it.
838    ///
839    /// # Standard section(s)
840    ///
841    /// 3.7.2
842    fn immediate_ready_send_with_tag<'a, Sc, Buf: ?Sized>(
843        &self,
844        scope: Sc,
845        buf: &'a Buf,
846        tag: Tag,
847    ) -> Request<'a, Sc>
848    where
849        Buf: 'a + Buffer,
850        Sc: Scope<'a>,
851    {
852        unsafe {
853            Request::from_raw(
854                with_uninitialized(|request| {
855                    ffi::MPI_Irsend(
856                        buf.pointer(),
857                        buf.count(),
858                        buf.as_datatype().as_raw(),
859                        self.destination_rank(),
860                        tag,
861                        self.as_communicator().as_raw(),
862                        request,
863                    )
864                })
865                .1,
866                scope,
867            )
868        }
869    }
870
871    /// Initiate an immediate (non-blocking) ready mode send operation.
872    ///
873    /// Initiate sending the data in `buf` in ready mode.
874    ///
875    /// # Examples
876    ///
877    /// See `examples/immediate.rs`
878    ///
879    /// # Standard section(s)
880    ///
881    /// 3.7.2
882    fn immediate_ready_send<'a, Sc, Buf: ?Sized>(&self, scope: Sc, buf: &'a Buf) -> Request<'a, Sc>
883    where
884        Buf: 'a + Buffer,
885        Sc: Scope<'a>,
886    {
887        self.immediate_ready_send_with_tag(scope, buf, Tag::default())
888    }
889}
890
891impl<'a, C> Destination for Process<'a, C>
892where
893    C: 'a + Communicator,
894{
895    fn destination_rank(&self) -> Rank {
896        self.rank()
897    }
898}
899
900/// Describes the result of a point to point receive operation.
901///
902/// # Standard section(s)
903///
904/// 3.2.5
905#[derive(Copy, Clone)]
906pub struct Status(MPI_Status);
907
908impl Status {
909    /// Construct a `Status` value from the raw MPI type
910    pub fn from_raw(status: MPI_Status) -> Status {
911        Status(status)
912    }
913
914    /// The rank of the message source
915    pub fn source_rank(&self) -> Rank {
916        self.0.MPI_SOURCE
917    }
918
919    /// The message tag
920    pub fn tag(&self) -> Tag {
921        self.0.MPI_TAG
922    }
923
924    /// Number of instances of the type contained in the message
925    pub fn count<D: Datatype>(&self, d: D) -> Count {
926        unsafe { with_uninitialized(|count| ffi::MPI_Get_count(&self.0, d.as_raw(), count)).1 }
927    }
928}
929
930impl fmt::Debug for Status {
931    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
932        write!(
933            f,
934            "Status {{ source_rank: {}, tag: {} }}",
935            self.source_rank(),
936            self.tag()
937        )
938    }
939}
940
941/// Describes a pending incoming message, probed by a `matched_probe()`.
942///
943/// # Standard section(s)
944///
945/// 3.8.2
946#[must_use]
947pub struct Message(MPI_Message);
948
949impl Message {
950    /// True if the `Source` for the probe was the null process.
951    pub fn is_no_proc(&self) -> bool {
952        self.as_raw() == unsafe { ffi::RSMPI_MESSAGE_NO_PROC }
953    }
954
955    /// Receive a previously probed message containing a single instance of type `Msg`.
956    ///
957    /// Receives the message `&self` which contains a single instance of type `Msg`.
958    ///
959    /// # Panics
960    /// Received an empty message
961    ///
962    /// # Standard section(s)
963    ///
964    /// 3.8.3
965    pub fn matched_receive<Msg>(mut self) -> (Msg, Status)
966    where
967        Msg: Equivalence,
968    {
969        unsafe {
970            let (_, res, status) = with_uninitialized2(|res, status| {
971                ffi::MPI_Mrecv(
972                    res as _,
973                    1,
974                    Msg::equivalent_datatype().as_raw(),
975                    self.as_raw_mut(),
976                    status,
977                )
978            });
979            let status = Status(status);
980            if status.count(Msg::equivalent_datatype()) == 0 {
981                panic!("Received an empty message.");
982            }
983            (res, status)
984        }
985    }
986
987    /// Receive a previously probed message into a `Buffer`.
988    ///
989    /// Receive the message `&self` with contents matching `buf`.
990    ///
991    /// # Standard section(s)
992    ///
993    /// 3.8.3
994    pub fn matched_receive_into<Buf: ?Sized>(mut self, buf: &mut Buf) -> Status
995    where
996        Buf: BufferMut,
997    {
998        let status;
999        unsafe {
1000            status = with_uninitialized(|status| {
1001                ffi::MPI_Mrecv(
1002                    buf.pointer_mut(),
1003                    buf.count(),
1004                    buf.as_datatype().as_raw(),
1005                    self.as_raw_mut(),
1006                    status,
1007                )
1008            })
1009            .1;
1010            assert_eq!(self.as_raw(), ffi::RSMPI_MESSAGE_NULL);
1011        };
1012        Status(status)
1013    }
1014
1015    /// Asynchronously receive a previously probed message into a `Buffer`.
1016    ///
1017    /// Asynchronously receive the message `&self` with contents matching `buf`.
1018    ///
1019    /// # Panics
1020    /// ?
1021    ///
1022    /// # Standard section(s)
1023    ///
1024    /// 3.8.3
1025    pub fn immediate_matched_receive_into<'a, Sc, Buf: ?Sized + 'a>(
1026        mut self,
1027        scope: Sc,
1028        buf: &'a mut Buf,
1029    ) -> Request<'a, Sc>
1030    where
1031        Buf: BufferMut,
1032        Sc: Scope<'a>,
1033    {
1034        unsafe {
1035            let request = with_uninitialized(|request| {
1036                ffi::MPI_Imrecv(
1037                    buf.pointer_mut(),
1038                    buf.count(),
1039                    buf.as_datatype().as_raw(),
1040                    self.as_raw_mut(),
1041                    request,
1042                )
1043            })
1044            .1;
1045            assert_eq!(self.as_raw(), ffi::RSMPI_MESSAGE_NULL);
1046            Request::from_raw(request, scope)
1047        }
1048    }
1049}
1050
1051unsafe impl AsRaw for Message {
1052    type Raw = MPI_Message;
1053    fn as_raw(&self) -> Self::Raw {
1054        self.0
1055    }
1056}
1057
1058unsafe impl AsRawMut for Message {
1059    fn as_raw_mut(&mut self) -> *mut <Self as AsRaw>::Raw {
1060        &mut self.0
1061    }
1062}
1063
1064/// # Panics
1065/// ?
1066impl Drop for Message {
1067    fn drop(&mut self) {
1068        assert_eq!(
1069            self.as_raw(),
1070            unsafe { ffi::RSMPI_MESSAGE_NULL },
1071            "matched message dropped without receiving."
1072        );
1073    }
1074}
1075
1076/// Receive a previously probed message containing multiple instances of type `Msg` into a `Vec`.
1077///
1078/// # Standard section(s)
1079///
1080/// 3.8.3
1081pub trait MatchedReceiveVec {
1082    /// Receives the message `&self` which contains multiple instances of type `Msg` into a `Vec`.
1083    fn matched_receive_vec<Msg>(self) -> (Vec<Msg>, Status)
1084    where
1085        Msg: Equivalence;
1086}
1087
1088impl MatchedReceiveVec for (Message, Status) {
1089    fn matched_receive_vec<Msg>(self) -> (Vec<Msg>, Status)
1090    where
1091        Msg: Equivalence,
1092    {
1093        #[repr(transparent)]
1094        struct UninitMsg<M>(MaybeUninit<M>);
1095
1096        unsafe impl<M: Equivalence> Equivalence for UninitMsg<M> {
1097            type Out = M::Out;
1098
1099            fn equivalent_datatype() -> Self::Out {
1100                M::equivalent_datatype()
1101            }
1102        }
1103
1104        let (message, status) = self;
1105        let count = status
1106            .count(Msg::equivalent_datatype())
1107            .value_as()
1108            .expect("Message element count cannot be expressed as a usize.");
1109
1110        let mut res = (0..count)
1111            .map(|_| UninitMsg::<Msg>(MaybeUninit::uninit()))
1112            .collect::<Vec<_>>();
1113
1114        let status = message.matched_receive_into(&mut res[..]);
1115
1116        let res = unsafe { transmute(res) };
1117
1118        (res, status)
1119    }
1120}
1121
1122/// Sends `msg` to `destination` tagging it `sendtag` and simultaneously receives an
1123/// instance of `R` tagged `receivetag` from `source`.
1124///
1125/// # Panics
1126/// ?
1127///
1128/// # Standard section(s)
1129///
1130/// 3.10
1131pub fn send_receive_with_tags<M, D, R, S>(
1132    msg: &M,
1133    destination: &D,
1134    sendtag: Tag,
1135    source: &S,
1136    receivetag: Tag,
1137) -> (R, Status)
1138where
1139    M: Equivalence,
1140    D: Destination,
1141    R: Equivalence,
1142    S: Source,
1143{
1144    assert_eq!(
1145        source
1146            .as_communicator()
1147            .compare(destination.as_communicator()),
1148        CommunicatorRelation::Identical
1149    );
1150    unsafe {
1151        let (_, res, status) = with_uninitialized2(|res, status| {
1152            ffi::MPI_Sendrecv(
1153                msg.pointer(),
1154                msg.count(),
1155                msg.as_datatype().as_raw(),
1156                destination.destination_rank(),
1157                sendtag,
1158                res as _,
1159                1,
1160                R::equivalent_datatype().as_raw(),
1161                source.source_rank(),
1162                receivetag,
1163                source.as_communicator().as_raw(),
1164                status,
1165            )
1166        });
1167        let status = Status(status);
1168        (res, status)
1169    }
1170}
1171
1172/// Sends `msg` to `destination` and simultaneously receives an instance of `R` from
1173/// `source`.
1174///
1175/// # Examples
1176/// See `examples/send_receive.rs`
1177///
1178/// # Standard section(s)
1179///
1180/// 3.10
1181pub fn send_receive<R, M, D, S>(msg: &M, destination: &D, source: &S) -> (R, Status)
1182where
1183    M: Equivalence,
1184    D: Destination,
1185    R: Equivalence,
1186    S: Source,
1187{
1188    send_receive_with_tags(msg, destination, Tag::default(), source, unsafe {
1189        ffi::RSMPI_ANY_TAG
1190    })
1191}
1192
1193/// Sends the contents of `msg` to `destination` tagging it `sendtag` and
1194/// simultaneously receives a message tagged `receivetag` from `source` into
1195/// `buf`.
1196///
1197/// # Panics
1198/// ?
1199///
1200/// # Standard section(s)
1201///
1202/// 3.10
1203pub fn send_receive_into_with_tags<M: ?Sized, D, B: ?Sized, S>(
1204    msg: &M,
1205    destination: &D,
1206    sendtag: Tag,
1207    buf: &mut B,
1208    source: &S,
1209    receivetag: Tag,
1210) -> Status
1211where
1212    M: Buffer,
1213    D: Destination,
1214    B: BufferMut,
1215    S: Source,
1216{
1217    assert_eq!(
1218        source
1219            .as_communicator()
1220            .compare(destination.as_communicator()),
1221        CommunicatorRelation::Identical
1222    );
1223    unsafe {
1224        Status(
1225            with_uninitialized(|status| {
1226                ffi::MPI_Sendrecv(
1227                    msg.pointer(),
1228                    msg.count(),
1229                    msg.as_datatype().as_raw(),
1230                    destination.destination_rank(),
1231                    sendtag,
1232                    buf.pointer_mut(),
1233                    buf.count(),
1234                    buf.as_datatype().as_raw(),
1235                    source.source_rank(),
1236                    receivetag,
1237                    source.as_communicator().as_raw(),
1238                    status,
1239                )
1240            })
1241            .1,
1242        )
1243    }
1244}
1245
1246/// Sends the contents of `msg` to `destination` and
1247/// simultaneously receives a message from `source` into
1248/// `buf`.
1249///
1250/// # Standard section(s)
1251///
1252/// 3.10
1253pub fn send_receive_into<M: ?Sized, D, B: ?Sized, S>(
1254    msg: &M,
1255    destination: &D,
1256    buf: &mut B,
1257    source: &S,
1258) -> Status
1259where
1260    M: Buffer,
1261    D: Destination,
1262    B: BufferMut,
1263    S: Source,
1264{
1265    send_receive_into_with_tags(msg, destination, Tag::default(), buf, source, unsafe {
1266        ffi::RSMPI_ANY_TAG
1267    })
1268}
1269
1270/// Sends the contents of `buf` to `destination` tagging it `sendtag` and
1271/// simultaneously receives a message tagged `receivetag` from `source` and replaces the
1272/// contents of `buf` with it.
1273///
1274/// # Panics
1275/// ?
1276///
1277/// # Standard section(s)
1278///
1279/// 3.10
1280pub fn send_receive_replace_into_with_tags<B: ?Sized, D, S>(
1281    buf: &mut B,
1282    destination: &D,
1283    sendtag: Tag,
1284    source: &S,
1285    receivetag: Tag,
1286) -> Status
1287where
1288    B: BufferMut,
1289    D: Destination,
1290    S: Source,
1291{
1292    assert_eq!(
1293        source
1294            .as_communicator()
1295            .compare(destination.as_communicator()),
1296        CommunicatorRelation::Identical
1297    );
1298    unsafe {
1299        Status(
1300            with_uninitialized(|status| {
1301                ffi::MPI_Sendrecv_replace(
1302                    buf.pointer_mut(),
1303                    buf.count(),
1304                    buf.as_datatype().as_raw(),
1305                    destination.destination_rank(),
1306                    sendtag,
1307                    source.source_rank(),
1308                    receivetag,
1309                    source.as_communicator().as_raw(),
1310                    status,
1311                )
1312            })
1313            .1,
1314        )
1315    }
1316}
1317
1318/// Sends the contents of `buf` to `destination` and
1319/// simultaneously receives a message from `source` and replaces the contents of
1320/// `buf` with it.
1321///
1322/// # Standard section(s)
1323///
1324/// 3.10
1325pub fn send_receive_replace_into<B: ?Sized, D, S>(
1326    buf: &mut B,
1327    destination: &D,
1328    source: &S,
1329) -> Status
1330where
1331    B: BufferMut,
1332    D: Destination,
1333    S: Source,
1334{
1335    send_receive_replace_into_with_tags(buf, destination, Tag::default(), source, unsafe {
1336        ffi::RSMPI_ANY_TAG
1337    })
1338}
1339
1340/// Will contain a value of type `T` received via a non-blocking receive operation.
1341#[must_use]
1342pub struct ReceiveFuture<T> {
1343    val: *mut T,
1344    req: Request<'static>,
1345}
1346
1347impl<T> ReceiveFuture<T>
1348where
1349    T: Equivalence,
1350{
1351    /// Wait for the receive operation to finish and return the received data.
1352    ///
1353    /// # Panics
1354    /// Received an empty message
1355    pub fn get(self) -> (T, Status) {
1356        let status = self.req.wait();
1357        if status.count(T::equivalent_datatype()) == 0 {
1358            panic!("Received an empty message into a ReceiveFuture.");
1359        }
1360        unsafe { (ptr::read(self.val), status) }
1361    }
1362
1363    /// Check whether the receive operation has finished.
1364    ///
1365    /// If the operation has finished, the data received is returned. Otherwise the future itself
1366    /// is returned.
1367    ///
1368    /// # Panics
1369    /// Received an empty message
1370    ///
1371    /// # Errors
1372    /// Receive has not finished yet
1373    pub fn r#try(mut self) -> Result<(T, Status), Self> {
1374        match self.req.test() {
1375            Ok(status) => {
1376                if status.count(T::equivalent_datatype()) == 0 {
1377                    panic!("Received an empty message into a ReceiveFuture.");
1378                }
1379                unsafe { Ok((ptr::read(self.val), status)) }
1380            }
1381            Err(request) => {
1382                self.req = request;
1383                Err(self)
1384            }
1385        }
1386    }
1387}