mpi_fork_fnsp/point_to_point.rs
1//! Point to point communication
2//!
3//! Endpoints of communication are mostly described by types that implement the `Source` and
4//! `Destination` trait. Communication operations are implemented as default methods on those
5//! traits.
6//!
7//! # Unfinished features
8//!
9//! - **3.2.6**: `MPI_STATUS_IGNORE`
10//! - **3.6**: Buffer usage, `MPI_Buffer_attach()`, `MPI_Buffer_detach()`
11//! - **3.9**: Persistent requests, `MPI_Send_init()`, `MPI_Bsend_init()`, `MPI_Ssend_init()`,
12//! `MPI_Rsend_init()`, `MPI_Recv_init()`, `MPI_Start()`, `MPI_Startall()`
13#![allow(clippy::missing_panics_doc, clippy::ptr_as_ptr)]
14use std::alloc::{self, Layout};
15use std::mem::{transmute, MaybeUninit};
16use std::{fmt, ptr};
17
18use conv::ConvUtil;
19
20use super::{Count, Tag};
21
22use crate::ffi;
23use crate::ffi::{MPI_Message, MPI_Status};
24
25use crate::datatype::traits::{
26 AsDatatype, Buffer, BufferMut, Collection, Datatype, Equivalence, Pointer,
27};
28use crate::raw::traits::{AsRaw, AsRawMut};
29use crate::request::{Request, Scope, StaticScope};
30use crate::topology::traits::{AsCommunicator, Communicator};
31use crate::topology::{AnyProcess, CommunicatorRelation, Process, Rank};
32use crate::{with_uninitialized, with_uninitialized2};
33
34// TODO: rein in _with_tag ugliness, use optional tags or make tag part of Source and Destination
35
36/// Point to point communication traits
37pub mod traits {
38 pub use super::{Destination, MatchedReceiveVec, Source};
39}
40
41/// Something that can be used as the source in a point to point receive operation
42///
43/// # Examples
44///
45/// - A `Process` used as a source for a receive operation will receive data only from the
46/// identified process.
47/// - A communicator can also be used as a source via the `AnyProcess` identifier.
48///
49/// # Standard section(s)
50///
51/// 3.2.3
52pub unsafe trait Source: AsCommunicator {
53 /// `Rank` that identifies the source
54 fn source_rank(&self) -> Rank;
55
56 /// Probe a source for incoming messages.
57 ///
58 /// Probe `Source` `&self` for incoming messages with a certain tag.
59 ///
60 /// An ordinary `probe()` returns a `Status` which allows inspection of the properties of the
61 /// incoming message, but does not guarantee reception by a subsequent `receive()` (especially
62 /// in a multi-threaded set-up). For a probe operation with stronger guarantees, see
63 /// `matched_probe()`.
64 ///
65 /// # Standard section(s)
66 ///
67 /// 3.8.1
68 fn probe_with_tag(&self, tag: Tag) -> Status {
69 unsafe {
70 Status(
71 with_uninitialized(|status| {
72 ffi::MPI_Probe(
73 self.source_rank(),
74 tag,
75 self.as_communicator().as_raw(),
76 status,
77 )
78 })
79 .1,
80 )
81 }
82 }
83
84 /// Probe a source for incoming messages.
85 ///
86 /// Probe `Source` `&self` for incoming messages with any tag.
87 ///
88 /// An ordinary `probe()` returns a `Status` which allows inspection of the properties of the
89 /// incoming message, but does not guarantee reception by a subsequent `receive()` (especially
90 /// in a multi-threaded set-up). For a probe operation with stronger guarantees, see
91 /// `matched_probe()`.
92 ///
93 /// # Standard section(s)
94 ///
95 /// 3.8.1
96 fn probe(&self) -> Status {
97 self.probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
98 }
99
100 /// Probe a source for incoming messages with guaranteed reception.
101 ///
102 /// Probe `Source` `&self` for incoming messages with a certain tag.
103 ///
104 /// A `matched_probe()` returns both a `Status` that describes the properties of a pending
105 /// incoming message and a `Message` which can and *must* subsequently be used in a
106 /// `matched_receive()` to receive the probed message.
107 ///
108 /// # Standard section(s)
109 ///
110 /// 3.8.2
111 fn matched_probe_with_tag(&self, tag: Tag) -> (Message, Status) {
112 let (_, message, status) = unsafe {
113 with_uninitialized2(|message, status| {
114 ffi::MPI_Mprobe(
115 self.source_rank(),
116 tag,
117 self.as_communicator().as_raw(),
118 message,
119 status,
120 )
121 })
122 };
123 (Message(message), Status(status))
124 }
125
126 /// Probe a source for incoming messages with guaranteed reception.
127 ///
128 /// Probe `Source` `&self` for incoming messages with any tag.
129 ///
130 /// A `matched_probe()` returns both a `Status` that describes the properties of a pending
131 /// incoming message and a `Message` which can and *must* subsequently be used in a
132 /// `matched_receive()` to receive the probed message.
133 ///
134 /// # Standard section(s)
135 ///
136 /// 3.8.2
137 fn matched_probe(&self) -> (Message, Status) {
138 self.matched_probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
139 }
140
141 /// Receive a message containing a single instance of type `Msg`.
142 ///
143 /// Receive a message from `Source` `&self` tagged `tag` containing a single instance of type
144 /// `Msg`.
145 ///
146 /// # Standard section(s)
147 ///
148 /// 3.2.4
149 fn receive_with_tag<Msg>(&self, tag: Tag) -> (Msg, Status)
150 where
151 Msg: Equivalence,
152 {
153 unsafe {
154 let (_, msg, status) = with_uninitialized2(|msg, status| {
155 ffi::MPI_Recv(
156 msg as _,
157 1,
158 Msg::equivalent_datatype().as_raw(),
159 self.source_rank(),
160 tag,
161 self.as_communicator().as_raw(),
162 status,
163 )
164 });
165 let status = Status(status);
166 if status.count(Msg::equivalent_datatype()) == 0 {
167 panic!("Received an empty message.");
168 }
169 (msg, status)
170 }
171 }
172
173 /// Receive a message containing a single instance of type `Msg`.
174 ///
175 /// Receive a message from `Source` `&self` containing a single instance of type `Msg`.
176 ///
177 /// # Examples
178 ///
179 /// ```no_run
180 /// extern crate mpi_fork_fnsp as mpi;
181 /// use mpi::traits::*;
182 ///
183 /// let universe = mpi::initialize().unwrap();
184 /// let world = universe.world();
185 ///
186 /// let x = world.any_process().receive::<f64>();
187 /// ```
188 ///
189 /// # Standard section(s)
190 ///
191 /// 3.2.4
192 fn receive<Msg>(&self) -> (Msg, Status)
193 where
194 Msg: Equivalence,
195 {
196 self.receive_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
197 }
198
199 /// Receive a message into a `Buffer`.
200 ///
201 /// Receive a message from `Source` `&self` tagged `tag` into `Buffer` `buf`.
202 ///
203 /// # Standard section(s)
204 ///
205 /// 3.2.4
206 fn receive_into_with_tag<Buf: ?Sized>(&self, buf: &mut Buf, tag: Tag) -> Status
207 where
208 Buf: BufferMut,
209 {
210 unsafe {
211 Status(
212 with_uninitialized(|status| {
213 ffi::MPI_Recv(
214 buf.pointer_mut(),
215 buf.count(),
216 buf.as_datatype().as_raw(),
217 self.source_rank(),
218 tag,
219 self.as_communicator().as_raw(),
220 status,
221 )
222 })
223 .1,
224 )
225 }
226 }
227
228 /// Receive a message into a `Buffer`.
229 ///
230 /// Receive a message from `Source` `&self` into `Buffer` `buf`.
231 ///
232 /// # Standard section(s)
233 ///
234 /// 3.2.4
235 fn receive_into<Buf: ?Sized>(&self, buf: &mut Buf) -> Status
236 where
237 Buf: BufferMut,
238 {
239 self.receive_into_with_tag(buf, unsafe { ffi::RSMPI_ANY_TAG })
240 }
241
242 /// Receive a message containing multiple instances of type `Msg` into a `Vec`.
243 ///
244 /// Receive a message from `Source` `&self` tagged `tag` containing multiple instances of type
245 /// `Msg` into a `Vec`.
246 ///
247 /// # Standard section(s)
248 ///
249 /// 3.2.4
250 fn receive_vec_with_tag<Msg>(&self, tag: Tag) -> (Vec<Msg>, Status)
251 where
252 Msg: Equivalence,
253 {
254 self.matched_probe_with_tag(tag).matched_receive_vec()
255 }
256
257 /// Receive a message containing multiple instances of type `Msg` into a `Vec`.
258 ///
259 /// Receive a message from `Source` `&self` containing multiple instances of type `Msg` into a
260 /// `Vec`.
261 ///
262 /// # Examples
263 /// See `examples/send_receive.rs`
264 ///
265 /// # Standard section(s)
266 ///
267 /// 3.2.4
268 fn receive_vec<Msg>(&self) -> (Vec<Msg>, Status)
269 where
270 Msg: Equivalence,
271 {
272 self.receive_vec_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
273 }
274
275 /// Initiate an immediate (non-blocking) receive operation.
276 ///
277 /// Initiate receiving a message matching `tag` into `buf`.
278 ///
279 /// # Standard section(s)
280 ///
281 /// 3.7.2
282 fn immediate_receive_into_with_tag<'a, Sc, Buf: ?Sized>(
283 &self,
284 scope: Sc,
285 buf: &'a mut Buf,
286 tag: Tag,
287 ) -> Request<'a, Sc>
288 where
289 Buf: 'a + BufferMut,
290 Sc: Scope<'a>,
291 {
292 unsafe {
293 Request::from_raw(
294 with_uninitialized(|request| {
295 ffi::MPI_Irecv(
296 buf.pointer_mut(),
297 buf.count(),
298 buf.as_datatype().as_raw(),
299 self.source_rank(),
300 tag,
301 self.as_communicator().as_raw(),
302 request,
303 )
304 })
305 .1,
306 scope,
307 )
308 }
309 }
310
311 /// Initiate an immediate (non-blocking) receive operation.
312 ///
313 /// Initiate receiving a message into `buf`.
314 ///
315 /// # Examples
316 /// See `examples/immediate.rs`
317 ///
318 /// # Standard section(s)
319 ///
320 /// 3.7.2
321 fn immediate_receive_into<'a, Sc, Buf: ?Sized>(
322 &self,
323 scope: Sc,
324 buf: &'a mut Buf,
325 ) -> Request<'a, Sc>
326 where
327 Buf: 'a + BufferMut,
328 Sc: Scope<'a>,
329 {
330 self.immediate_receive_into_with_tag(scope, buf, unsafe { ffi::RSMPI_ANY_TAG })
331 }
332
333 /// Initiate a non-blocking receive operation for messages matching tag `tag`.
334 ///
335 /// # Standard section(s)
336 ///
337 /// 3.7.2
338 fn immediate_receive_with_tag<Msg>(&self, tag: Tag) -> ReceiveFuture<Msg>
339 where
340 Msg: Equivalence,
341 {
342 unsafe {
343 let val = alloc::alloc(Layout::new::<Msg>()) as *mut Msg;
344 let (_, request) = with_uninitialized(|request| {
345 ffi::MPI_Irecv(
346 val as _,
347 1,
348 Msg::equivalent_datatype().as_raw(),
349 self.source_rank(),
350 tag,
351 self.as_communicator().as_raw(),
352 request,
353 )
354 });
355 ReceiveFuture {
356 val,
357 req: Request::from_raw(request, StaticScope),
358 }
359 }
360 }
361
362 /// Initiate a non-blocking receive operation.
363 ///
364 /// # Examples
365 /// See `examples/immediate.rs`
366 ///
367 /// # Standard section(s)
368 ///
369 /// 3.7.2
370 fn immediate_receive<Msg>(&self) -> ReceiveFuture<Msg>
371 where
372 Msg: Equivalence,
373 {
374 self.immediate_receive_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
375 }
376
377 /// Asynchronously probe a source for incoming messages.
378 ///
379 /// Asynchronously probe `Source` `&self` for incoming messages with a certain tag.
380 ///
381 /// Like `Probe` but returns a `None` immediately if there is no incoming message to be probed.
382 ///
383 /// # Standard section(s)
384 ///
385 /// 3.8.1
386 fn immediate_probe_with_tag(&self, tag: Tag) -> Option<Status> {
387 unsafe {
388 let mut status = MaybeUninit::uninit();
389
390 let (_, flag) = with_uninitialized(|flag| {
391 ffi::MPI_Iprobe(
392 self.source_rank(),
393 tag,
394 self.as_communicator().as_raw(),
395 flag,
396 status.as_mut_ptr(),
397 )
398 });
399
400 if flag == 0 {
401 None
402 } else {
403 Some(Status(status.assume_init()))
404 }
405 }
406 }
407
408 /// Asynchronously probe a source for incoming messages.
409 ///
410 /// Asynchronously probe `Source` `&self` for incoming messages with any tag.
411 ///
412 /// Like `Probe` but returns a `None` immediately if there is no incoming message to be probed.
413 ///
414 /// # Standard section(s)
415 ///
416 /// 3.8.1
417 fn immediate_probe(&self) -> Option<Status> {
418 self.immediate_probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
419 }
420
421 /// Asynchronously probe a source for incoming messages with guaranteed reception.
422 ///
423 /// Asynchronously probe `Source` `&self` for incoming messages with a certain tag.
424 ///
425 /// Like `MatchedProbe` but returns a `None` immediately if there is no incoming message to be
426 /// probed.
427 ///
428 /// # Standard section(s)
429 ///
430 /// 3.8.2
431 fn immediate_matched_probe_with_tag(&self, tag: Tag) -> Option<(Message, Status)> {
432 unsafe {
433 let mut message = MaybeUninit::uninit();
434 let mut status = MaybeUninit::uninit();
435
436 let (_, flag) = with_uninitialized(|flag| {
437 ffi::MPI_Improbe(
438 self.source_rank(),
439 tag,
440 self.as_communicator().as_raw(),
441 flag,
442 message.as_mut_ptr(),
443 status.as_mut_ptr(),
444 )
445 });
446
447 if flag == 0 {
448 None
449 } else {
450 Some((Message(message.assume_init()), Status(status.assume_init())))
451 }
452 }
453 }
454
455 /// Asynchronously probe a source for incoming messages with guaranteed reception.
456 ///
457 /// Asynchronously probe `Source` `&self` for incoming messages with any tag.
458 ///
459 /// Like `MatchedProbe` but returns a `None` immediately if there is no incoming message to be
460 /// probed.
461 ///
462 /// # Standard section(s)
463 ///
464 /// 3.8.2
465 fn immediate_matched_probe(&self) -> Option<(Message, Status)> {
466 self.immediate_matched_probe_with_tag(unsafe { ffi::RSMPI_ANY_TAG })
467 }
468}
469
470unsafe impl<'a, C> Source for AnyProcess<'a, C>
471where
472 C: 'a + Communicator,
473{
474 fn source_rank(&self) -> Rank {
475 unsafe { ffi::RSMPI_ANY_SOURCE }
476 }
477}
478
479unsafe impl<'a, C> Source for Process<'a, C>
480where
481 C: 'a + Communicator,
482{
483 fn source_rank(&self) -> Rank {
484 self.rank()
485 }
486}
487
488/// Something that can be used as the destination in a point to point send operation
489///
490/// # Examples
491/// - Using a `Process` as the destination will send data to that specific process.
492///
493/// # Standard section(s)
494///
495/// 3.2.3
496pub trait Destination: AsCommunicator {
497 /// `Rank` that identifies the destination
498 fn destination_rank(&self) -> Rank;
499
500 /// Blocking standard mode send operation
501 ///
502 /// Send the contents of a `Buffer` to the `Destination` `&self` and tag it.
503 ///
504 /// # Standard section(s)
505 ///
506 /// 3.2.1
507 fn send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
508 where
509 Buf: Buffer,
510 {
511 unsafe {
512 ffi::MPI_Send(
513 buf.pointer(),
514 buf.count(),
515 buf.as_datatype().as_raw(),
516 self.destination_rank(),
517 tag,
518 self.as_communicator().as_raw(),
519 );
520 }
521 }
522
523 /// Blocking standard mode send operation
524 ///
525 /// Send the contents of a `Buffer` to the `Destination` `&self`.
526 ///
527 /// # Examples
528 ///
529 /// ```no_run
530 /// extern crate mpi_fork_fnsp as mpi;
531 /// use mpi::traits::*;
532 ///
533 /// let universe = mpi::initialize().unwrap();
534 /// let world = universe.world();
535 ///
536 /// let v = vec![ 1.0f64, 2.0, 3.0 ];
537 /// world.process_at_rank(1).send(&v[..]);
538 /// ```
539 ///
540 /// See also `examples/send_receive.rs`
541 ///
542 /// # Standard section(s)
543 ///
544 /// 3.2.1
545 fn send<Buf: ?Sized>(&self, buf: &Buf)
546 where
547 Buf: Buffer,
548 {
549 self.send_with_tag(buf, Tag::default())
550 }
551
552 /// Blocking buffered mode send operation
553 ///
554 /// Send the contents of a `Buffer` to the `Destination` `&self` and tag it.
555 ///
556 /// # Standard section(s)
557 ///
558 /// 3.4
559 fn buffered_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
560 where
561 Buf: Buffer,
562 {
563 unsafe {
564 ffi::MPI_Bsend(
565 buf.pointer(),
566 buf.count(),
567 buf.as_datatype().as_raw(),
568 self.destination_rank(),
569 tag,
570 self.as_communicator().as_raw(),
571 );
572 }
573 }
574
575 /// Blocking buffered mode send operation
576 ///
577 /// Send the contents of a `Buffer` to the `Destination` `&self`.
578 ///
579 /// # Standard section(s)
580 ///
581 /// 3.4
582 fn buffered_send<Buf: ?Sized>(&self, buf: &Buf)
583 where
584 Buf: Buffer,
585 {
586 self.buffered_send_with_tag(buf, Tag::default())
587 }
588
589 /// Blocking synchronous mode send operation
590 ///
591 /// Send the contents of a `Buffer` to the `Destination` `&self` and tag it.
592 ///
593 /// Completes only once the matching receive operation has started.
594 ///
595 /// # Standard section(s)
596 ///
597 /// 3.4
598 fn synchronous_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
599 where
600 Buf: Buffer,
601 {
602 unsafe {
603 ffi::MPI_Ssend(
604 buf.pointer(),
605 buf.count(),
606 buf.as_datatype().as_raw(),
607 self.destination_rank(),
608 tag,
609 self.as_communicator().as_raw(),
610 );
611 }
612 }
613
614 /// Blocking synchronous mode send operation
615 ///
616 /// Send the contents of a `Buffer` to the `Destination` `&self`.
617 ///
618 /// Completes only once the matching receive operation has started.
619 ///
620 /// # Standard section(s)
621 ///
622 /// 3.4
623 fn synchronous_send<Buf: ?Sized>(&self, buf: &Buf)
624 where
625 Buf: Buffer,
626 {
627 self.synchronous_send_with_tag(buf, Tag::default())
628 }
629
630 /// Blocking ready mode send operation
631 ///
632 /// Send the contents of a `Buffer` to the `Destination` `&self` and tag it.
633 ///
634 /// Fails if the matching receive operation has not been posted.
635 ///
636 /// # Standard section(s)
637 ///
638 /// 3.4
639 fn ready_send_with_tag<Buf: ?Sized>(&self, buf: &Buf, tag: Tag)
640 where
641 Buf: Buffer,
642 {
643 unsafe {
644 ffi::MPI_Rsend(
645 buf.pointer(),
646 buf.count(),
647 buf.as_datatype().as_raw(),
648 self.destination_rank(),
649 tag,
650 self.as_communicator().as_raw(),
651 );
652 }
653 }
654
655 /// Blocking ready mode send operation
656 ///
657 /// Send the contents of a `Buffer` to the `Destination` `&self`.
658 ///
659 /// Fails if the matching receive operation has not been posted.
660 ///
661 /// # Standard section(s)
662 ///
663 /// 3.4
664 fn ready_send<Buf: ?Sized>(&self, buf: &Buf)
665 where
666 Buf: Buffer,
667 {
668 self.ready_send_with_tag(buf, Tag::default())
669 }
670
671 /// Initiate an immediate (non-blocking) standard mode send operation.
672 ///
673 /// Initiate sending the data in `buf` in standard mode and tag it.
674 ///
675 /// # Standard section(s)
676 ///
677 /// 3.7.2
678 fn immediate_send_with_tag<'a, Sc, Buf: ?Sized>(
679 &self,
680 scope: Sc,
681 buf: &'a Buf,
682 tag: Tag,
683 ) -> Request<'a, Sc>
684 where
685 Buf: 'a + Buffer,
686 Sc: Scope<'a>,
687 {
688 unsafe {
689 Request::from_raw(
690 with_uninitialized(|request| {
691 ffi::MPI_Isend(
692 buf.pointer(),
693 buf.count(),
694 buf.as_datatype().as_raw(),
695 self.destination_rank(),
696 tag,
697 self.as_communicator().as_raw(),
698 request,
699 )
700 })
701 .1,
702 scope,
703 )
704 }
705 }
706
707 /// Initiate an immediate (non-blocking) standard mode send operation.
708 ///
709 /// Initiate sending the data in `buf` in standard mode.
710 ///
711 /// # Examples
712 /// See `examples/immediate.rs`
713 ///
714 /// # Standard section(s)
715 ///
716 /// 3.7.2
717 fn immediate_send<'a, Sc, Buf: ?Sized>(&self, scope: Sc, buf: &'a Buf) -> Request<'a, Sc>
718 where
719 Buf: 'a + Buffer,
720 Sc: Scope<'a>,
721 {
722 self.immediate_send_with_tag(scope, buf, Tag::default())
723 }
724
725 /// Initiate an immediate (non-blocking) buffered mode send operation.
726 ///
727 /// Initiate sending the data in `buf` in buffered mode and tag it.
728 ///
729 /// # Standard section(s)
730 ///
731 /// 3.7.2
732 fn immediate_buffered_send_with_tag<'a, Sc, Buf: ?Sized>(
733 &self,
734 scope: Sc,
735 buf: &'a Buf,
736 tag: Tag,
737 ) -> Request<'a, Sc>
738 where
739 Buf: 'a + Buffer,
740 Sc: Scope<'a>,
741 {
742 unsafe {
743 Request::from_raw(
744 with_uninitialized(|request| {
745 ffi::MPI_Ibsend(
746 buf.pointer(),
747 buf.count(),
748 buf.as_datatype().as_raw(),
749 self.destination_rank(),
750 tag,
751 self.as_communicator().as_raw(),
752 request,
753 )
754 })
755 .1,
756 scope,
757 )
758 }
759 }
760
761 /// Initiate an immediate (non-blocking) buffered mode send operation.
762 ///
763 /// Initiate sending the data in `buf` in buffered mode.
764 ///
765 /// # Standard section(s)
766 ///
767 /// 3.7.2
768 fn immediate_buffered_send<'a, Sc, Buf: ?Sized>(
769 &self,
770 scope: Sc,
771 buf: &'a Buf,
772 ) -> Request<'a, Sc>
773 where
774 Buf: 'a + Buffer,
775 Sc: Scope<'a>,
776 {
777 self.immediate_buffered_send_with_tag(scope, buf, Tag::default())
778 }
779
780 /// Initiate an immediate (non-blocking) synchronous mode send operation.
781 ///
782 /// Initiate sending the data in `buf` in synchronous mode and tag it.
783 ///
784 /// # Standard section(s)
785 ///
786 /// 3.7.2
787 fn immediate_synchronous_send_with_tag<'a, Sc, Buf: ?Sized>(
788 &self,
789 scope: Sc,
790 buf: &'a Buf,
791 tag: Tag,
792 ) -> Request<'a, Sc>
793 where
794 Buf: 'a + Buffer,
795 Sc: Scope<'a>,
796 {
797 unsafe {
798 Request::from_raw(
799 with_uninitialized(|request| {
800 ffi::MPI_Issend(
801 buf.pointer(),
802 buf.count(),
803 buf.as_datatype().as_raw(),
804 self.destination_rank(),
805 tag,
806 self.as_communicator().as_raw(),
807 request,
808 )
809 })
810 .1,
811 scope,
812 )
813 }
814 }
815
816 /// Initiate an immediate (non-blocking) synchronous mode send operation.
817 ///
818 /// Initiate sending the data in `buf` in synchronous mode.
819 ///
820 /// # Standard section(s)
821 ///
822 /// 3.7.2
823 fn immediate_synchronous_send<'a, Sc, Buf: ?Sized>(
824 &self,
825 scope: Sc,
826 buf: &'a Buf,
827 ) -> Request<'a, Sc>
828 where
829 Buf: 'a + Buffer,
830 Sc: Scope<'a>,
831 {
832 self.immediate_synchronous_send_with_tag(scope, buf, Tag::default())
833 }
834
835 /// Initiate an immediate (non-blocking) ready mode send operation.
836 ///
837 /// Initiate sending the data in `buf` in ready mode and tag it.
838 ///
839 /// # Standard section(s)
840 ///
841 /// 3.7.2
842 fn immediate_ready_send_with_tag<'a, Sc, Buf: ?Sized>(
843 &self,
844 scope: Sc,
845 buf: &'a Buf,
846 tag: Tag,
847 ) -> Request<'a, Sc>
848 where
849 Buf: 'a + Buffer,
850 Sc: Scope<'a>,
851 {
852 unsafe {
853 Request::from_raw(
854 with_uninitialized(|request| {
855 ffi::MPI_Irsend(
856 buf.pointer(),
857 buf.count(),
858 buf.as_datatype().as_raw(),
859 self.destination_rank(),
860 tag,
861 self.as_communicator().as_raw(),
862 request,
863 )
864 })
865 .1,
866 scope,
867 )
868 }
869 }
870
871 /// Initiate an immediate (non-blocking) ready mode send operation.
872 ///
873 /// Initiate sending the data in `buf` in ready mode.
874 ///
875 /// # Examples
876 ///
877 /// See `examples/immediate.rs`
878 ///
879 /// # Standard section(s)
880 ///
881 /// 3.7.2
882 fn immediate_ready_send<'a, Sc, Buf: ?Sized>(&self, scope: Sc, buf: &'a Buf) -> Request<'a, Sc>
883 where
884 Buf: 'a + Buffer,
885 Sc: Scope<'a>,
886 {
887 self.immediate_ready_send_with_tag(scope, buf, Tag::default())
888 }
889}
890
891impl<'a, C> Destination for Process<'a, C>
892where
893 C: 'a + Communicator,
894{
895 fn destination_rank(&self) -> Rank {
896 self.rank()
897 }
898}
899
900/// Describes the result of a point to point receive operation.
901///
902/// # Standard section(s)
903///
904/// 3.2.5
905#[derive(Copy, Clone)]
906pub struct Status(MPI_Status);
907
908impl Status {
909 /// Construct a `Status` value from the raw MPI type
910 pub fn from_raw(status: MPI_Status) -> Status {
911 Status(status)
912 }
913
914 /// The rank of the message source
915 pub fn source_rank(&self) -> Rank {
916 self.0.MPI_SOURCE
917 }
918
919 /// The message tag
920 pub fn tag(&self) -> Tag {
921 self.0.MPI_TAG
922 }
923
924 /// Number of instances of the type contained in the message
925 pub fn count<D: Datatype>(&self, d: D) -> Count {
926 unsafe { with_uninitialized(|count| ffi::MPI_Get_count(&self.0, d.as_raw(), count)).1 }
927 }
928}
929
930impl fmt::Debug for Status {
931 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
932 write!(
933 f,
934 "Status {{ source_rank: {}, tag: {} }}",
935 self.source_rank(),
936 self.tag()
937 )
938 }
939}
940
941/// Describes a pending incoming message, probed by a `matched_probe()`.
942///
943/// # Standard section(s)
944///
945/// 3.8.2
946#[must_use]
947pub struct Message(MPI_Message);
948
949impl Message {
950 /// True if the `Source` for the probe was the null process.
951 pub fn is_no_proc(&self) -> bool {
952 self.as_raw() == unsafe { ffi::RSMPI_MESSAGE_NO_PROC }
953 }
954
955 /// Receive a previously probed message containing a single instance of type `Msg`.
956 ///
957 /// Receives the message `&self` which contains a single instance of type `Msg`.
958 ///
959 /// # Panics
960 /// Received an empty message
961 ///
962 /// # Standard section(s)
963 ///
964 /// 3.8.3
965 pub fn matched_receive<Msg>(mut self) -> (Msg, Status)
966 where
967 Msg: Equivalence,
968 {
969 unsafe {
970 let (_, res, status) = with_uninitialized2(|res, status| {
971 ffi::MPI_Mrecv(
972 res as _,
973 1,
974 Msg::equivalent_datatype().as_raw(),
975 self.as_raw_mut(),
976 status,
977 )
978 });
979 let status = Status(status);
980 if status.count(Msg::equivalent_datatype()) == 0 {
981 panic!("Received an empty message.");
982 }
983 (res, status)
984 }
985 }
986
987 /// Receive a previously probed message into a `Buffer`.
988 ///
989 /// Receive the message `&self` with contents matching `buf`.
990 ///
991 /// # Standard section(s)
992 ///
993 /// 3.8.3
994 pub fn matched_receive_into<Buf: ?Sized>(mut self, buf: &mut Buf) -> Status
995 where
996 Buf: BufferMut,
997 {
998 let status;
999 unsafe {
1000 status = with_uninitialized(|status| {
1001 ffi::MPI_Mrecv(
1002 buf.pointer_mut(),
1003 buf.count(),
1004 buf.as_datatype().as_raw(),
1005 self.as_raw_mut(),
1006 status,
1007 )
1008 })
1009 .1;
1010 assert_eq!(self.as_raw(), ffi::RSMPI_MESSAGE_NULL);
1011 };
1012 Status(status)
1013 }
1014
1015 /// Asynchronously receive a previously probed message into a `Buffer`.
1016 ///
1017 /// Asynchronously receive the message `&self` with contents matching `buf`.
1018 ///
1019 /// # Panics
1020 /// ?
1021 ///
1022 /// # Standard section(s)
1023 ///
1024 /// 3.8.3
1025 pub fn immediate_matched_receive_into<'a, Sc, Buf: ?Sized + 'a>(
1026 mut self,
1027 scope: Sc,
1028 buf: &'a mut Buf,
1029 ) -> Request<'a, Sc>
1030 where
1031 Buf: BufferMut,
1032 Sc: Scope<'a>,
1033 {
1034 unsafe {
1035 let request = with_uninitialized(|request| {
1036 ffi::MPI_Imrecv(
1037 buf.pointer_mut(),
1038 buf.count(),
1039 buf.as_datatype().as_raw(),
1040 self.as_raw_mut(),
1041 request,
1042 )
1043 })
1044 .1;
1045 assert_eq!(self.as_raw(), ffi::RSMPI_MESSAGE_NULL);
1046 Request::from_raw(request, scope)
1047 }
1048 }
1049}
1050
1051unsafe impl AsRaw for Message {
1052 type Raw = MPI_Message;
1053 fn as_raw(&self) -> Self::Raw {
1054 self.0
1055 }
1056}
1057
1058unsafe impl AsRawMut for Message {
1059 fn as_raw_mut(&mut self) -> *mut <Self as AsRaw>::Raw {
1060 &mut self.0
1061 }
1062}
1063
1064/// # Panics
1065/// ?
1066impl Drop for Message {
1067 fn drop(&mut self) {
1068 assert_eq!(
1069 self.as_raw(),
1070 unsafe { ffi::RSMPI_MESSAGE_NULL },
1071 "matched message dropped without receiving."
1072 );
1073 }
1074}
1075
1076/// Receive a previously probed message containing multiple instances of type `Msg` into a `Vec`.
1077///
1078/// # Standard section(s)
1079///
1080/// 3.8.3
1081pub trait MatchedReceiveVec {
1082 /// Receives the message `&self` which contains multiple instances of type `Msg` into a `Vec`.
1083 fn matched_receive_vec<Msg>(self) -> (Vec<Msg>, Status)
1084 where
1085 Msg: Equivalence;
1086}
1087
1088impl MatchedReceiveVec for (Message, Status) {
1089 fn matched_receive_vec<Msg>(self) -> (Vec<Msg>, Status)
1090 where
1091 Msg: Equivalence,
1092 {
1093 #[repr(transparent)]
1094 struct UninitMsg<M>(MaybeUninit<M>);
1095
1096 unsafe impl<M: Equivalence> Equivalence for UninitMsg<M> {
1097 type Out = M::Out;
1098
1099 fn equivalent_datatype() -> Self::Out {
1100 M::equivalent_datatype()
1101 }
1102 }
1103
1104 let (message, status) = self;
1105 let count = status
1106 .count(Msg::equivalent_datatype())
1107 .value_as()
1108 .expect("Message element count cannot be expressed as a usize.");
1109
1110 let mut res = (0..count)
1111 .map(|_| UninitMsg::<Msg>(MaybeUninit::uninit()))
1112 .collect::<Vec<_>>();
1113
1114 let status = message.matched_receive_into(&mut res[..]);
1115
1116 let res = unsafe { transmute(res) };
1117
1118 (res, status)
1119 }
1120}
1121
1122/// Sends `msg` to `destination` tagging it `sendtag` and simultaneously receives an
1123/// instance of `R` tagged `receivetag` from `source`.
1124///
1125/// # Panics
1126/// ?
1127///
1128/// # Standard section(s)
1129///
1130/// 3.10
1131pub fn send_receive_with_tags<M, D, R, S>(
1132 msg: &M,
1133 destination: &D,
1134 sendtag: Tag,
1135 source: &S,
1136 receivetag: Tag,
1137) -> (R, Status)
1138where
1139 M: Equivalence,
1140 D: Destination,
1141 R: Equivalence,
1142 S: Source,
1143{
1144 assert_eq!(
1145 source
1146 .as_communicator()
1147 .compare(destination.as_communicator()),
1148 CommunicatorRelation::Identical
1149 );
1150 unsafe {
1151 let (_, res, status) = with_uninitialized2(|res, status| {
1152 ffi::MPI_Sendrecv(
1153 msg.pointer(),
1154 msg.count(),
1155 msg.as_datatype().as_raw(),
1156 destination.destination_rank(),
1157 sendtag,
1158 res as _,
1159 1,
1160 R::equivalent_datatype().as_raw(),
1161 source.source_rank(),
1162 receivetag,
1163 source.as_communicator().as_raw(),
1164 status,
1165 )
1166 });
1167 let status = Status(status);
1168 (res, status)
1169 }
1170}
1171
1172/// Sends `msg` to `destination` and simultaneously receives an instance of `R` from
1173/// `source`.
1174///
1175/// # Examples
1176/// See `examples/send_receive.rs`
1177///
1178/// # Standard section(s)
1179///
1180/// 3.10
1181pub fn send_receive<R, M, D, S>(msg: &M, destination: &D, source: &S) -> (R, Status)
1182where
1183 M: Equivalence,
1184 D: Destination,
1185 R: Equivalence,
1186 S: Source,
1187{
1188 send_receive_with_tags(msg, destination, Tag::default(), source, unsafe {
1189 ffi::RSMPI_ANY_TAG
1190 })
1191}
1192
1193/// Sends the contents of `msg` to `destination` tagging it `sendtag` and
1194/// simultaneously receives a message tagged `receivetag` from `source` into
1195/// `buf`.
1196///
1197/// # Panics
1198/// ?
1199///
1200/// # Standard section(s)
1201///
1202/// 3.10
1203pub fn send_receive_into_with_tags<M: ?Sized, D, B: ?Sized, S>(
1204 msg: &M,
1205 destination: &D,
1206 sendtag: Tag,
1207 buf: &mut B,
1208 source: &S,
1209 receivetag: Tag,
1210) -> Status
1211where
1212 M: Buffer,
1213 D: Destination,
1214 B: BufferMut,
1215 S: Source,
1216{
1217 assert_eq!(
1218 source
1219 .as_communicator()
1220 .compare(destination.as_communicator()),
1221 CommunicatorRelation::Identical
1222 );
1223 unsafe {
1224 Status(
1225 with_uninitialized(|status| {
1226 ffi::MPI_Sendrecv(
1227 msg.pointer(),
1228 msg.count(),
1229 msg.as_datatype().as_raw(),
1230 destination.destination_rank(),
1231 sendtag,
1232 buf.pointer_mut(),
1233 buf.count(),
1234 buf.as_datatype().as_raw(),
1235 source.source_rank(),
1236 receivetag,
1237 source.as_communicator().as_raw(),
1238 status,
1239 )
1240 })
1241 .1,
1242 )
1243 }
1244}
1245
1246/// Sends the contents of `msg` to `destination` and
1247/// simultaneously receives a message from `source` into
1248/// `buf`.
1249///
1250/// # Standard section(s)
1251///
1252/// 3.10
1253pub fn send_receive_into<M: ?Sized, D, B: ?Sized, S>(
1254 msg: &M,
1255 destination: &D,
1256 buf: &mut B,
1257 source: &S,
1258) -> Status
1259where
1260 M: Buffer,
1261 D: Destination,
1262 B: BufferMut,
1263 S: Source,
1264{
1265 send_receive_into_with_tags(msg, destination, Tag::default(), buf, source, unsafe {
1266 ffi::RSMPI_ANY_TAG
1267 })
1268}
1269
1270/// Sends the contents of `buf` to `destination` tagging it `sendtag` and
1271/// simultaneously receives a message tagged `receivetag` from `source` and replaces the
1272/// contents of `buf` with it.
1273///
1274/// # Panics
1275/// ?
1276///
1277/// # Standard section(s)
1278///
1279/// 3.10
1280pub fn send_receive_replace_into_with_tags<B: ?Sized, D, S>(
1281 buf: &mut B,
1282 destination: &D,
1283 sendtag: Tag,
1284 source: &S,
1285 receivetag: Tag,
1286) -> Status
1287where
1288 B: BufferMut,
1289 D: Destination,
1290 S: Source,
1291{
1292 assert_eq!(
1293 source
1294 .as_communicator()
1295 .compare(destination.as_communicator()),
1296 CommunicatorRelation::Identical
1297 );
1298 unsafe {
1299 Status(
1300 with_uninitialized(|status| {
1301 ffi::MPI_Sendrecv_replace(
1302 buf.pointer_mut(),
1303 buf.count(),
1304 buf.as_datatype().as_raw(),
1305 destination.destination_rank(),
1306 sendtag,
1307 source.source_rank(),
1308 receivetag,
1309 source.as_communicator().as_raw(),
1310 status,
1311 )
1312 })
1313 .1,
1314 )
1315 }
1316}
1317
1318/// Sends the contents of `buf` to `destination` and
1319/// simultaneously receives a message from `source` and replaces the contents of
1320/// `buf` with it.
1321///
1322/// # Standard section(s)
1323///
1324/// 3.10
1325pub fn send_receive_replace_into<B: ?Sized, D, S>(
1326 buf: &mut B,
1327 destination: &D,
1328 source: &S,
1329) -> Status
1330where
1331 B: BufferMut,
1332 D: Destination,
1333 S: Source,
1334{
1335 send_receive_replace_into_with_tags(buf, destination, Tag::default(), source, unsafe {
1336 ffi::RSMPI_ANY_TAG
1337 })
1338}
1339
1340/// Will contain a value of type `T` received via a non-blocking receive operation.
1341#[must_use]
1342pub struct ReceiveFuture<T> {
1343 val: *mut T,
1344 req: Request<'static>,
1345}
1346
1347impl<T> ReceiveFuture<T>
1348where
1349 T: Equivalence,
1350{
1351 /// Wait for the receive operation to finish and return the received data.
1352 ///
1353 /// # Panics
1354 /// Received an empty message
1355 pub fn get(self) -> (T, Status) {
1356 let status = self.req.wait();
1357 if status.count(T::equivalent_datatype()) == 0 {
1358 panic!("Received an empty message into a ReceiveFuture.");
1359 }
1360 unsafe { (ptr::read(self.val), status) }
1361 }
1362
1363 /// Check whether the receive operation has finished.
1364 ///
1365 /// If the operation has finished, the data received is returned. Otherwise the future itself
1366 /// is returned.
1367 ///
1368 /// # Panics
1369 /// Received an empty message
1370 ///
1371 /// # Errors
1372 /// Receive has not finished yet
1373 pub fn r#try(mut self) -> Result<(T, Status), Self> {
1374 match self.req.test() {
1375 Ok(status) => {
1376 if status.count(T::equivalent_datatype()) == 0 {
1377 panic!("Received an empty message into a ReceiveFuture.");
1378 }
1379 unsafe { Ok((ptr::read(self.val), status)) }
1380 }
1381 Err(request) => {
1382 self.req = request;
1383 Err(self)
1384 }
1385 }
1386 }
1387}