mpi_fork_fnsp/collective.rs
1//! Collective communication
2//!
//! This module is still under development; the operations listed below are not bound yet.
4//!
5//! # Unfinished features
6//!
7//! - **5.8**: All-to-all, `MPI_Alltoallw()`
8//! - **5.10**: Reduce-scatter, `MPI_Reduce_scatter()`
9//! - **5.12**: Nonblocking collective operations,
10//! `MPI_Ialltoallw()`, `MPI_Ireduce_scatter()`
11#![allow(
12 clippy::missing_panics_doc,
13 clippy::ptr_as_ptr,
14 clippy::used_underscore_binding
15)]
16
17#[cfg(feature = "user-operations")]
18use std::mem;
19use std::os::raw::{c_int, c_void};
20use std::{fmt, ptr};
21
22#[cfg(feature = "user-operations")]
23use libffi::middle::{Cif, Closure, Type};
24
25use crate::ffi;
26use crate::ffi::MPI_Op;
27
28use crate::datatype::traits::{
29 Buffer, BufferMut, Equivalence, PartitionedBuffer, PartitionedBufferMut,
30};
31#[cfg(feature = "user-operations")]
32use crate::datatype::{DatatypeRef, DynBuffer, DynBufferMut};
33use crate::raw::traits::{AsRaw, FromRaw};
34use crate::request::{Request, Scope, StaticScope};
35use crate::topology::traits::{AsCommunicator, Communicator};
36use crate::topology::{Process, Rank};
37use crate::with_uninitialized;
38
39/// Collective communication traits
40pub mod traits {
41 pub use super::{CommunicatorCollectives, Operation, Root};
42}
43
44/// Collective communication patterns defined on `Communicator`s
45pub trait CommunicatorCollectives: Communicator {
46 /// Barrier synchronization among all processes in a `Communicator`
47 ///
48 /// Partake in a barrier synchronization across all processes in the `Communicator` `&self`.
49 ///
50 /// Calling processes (or threads within the calling processes) will enter the barrier and block
51 /// execution until all processes in the `Communicator` `&self` have entered the barrier.
52 ///
53 /// # Examples
54 ///
55 /// See `examples/barrier.rs`
56 ///
57 /// # Standard section(s)
58 ///
59 /// 5.3
60 fn barrier(&self) {
61 unsafe {
62 ffi::MPI_Barrier(self.as_raw());
63 }
64 }
65
66 /// Gather contents of buffers on all participating processes.
67 ///
68 /// After the call completes, the contents of the send `Buffer`s on all processes will be
69 /// concatenated into the receive `Buffer`s on all ranks.
70 ///
71 /// All send `Buffer`s must contain the same count of elements.
72 ///
73 /// # Examples
74 ///
75 /// See `examples/all_gather.rs`
76 ///
77 /// # Standard section(s)
78 ///
79 /// 5.7
80 fn all_gather_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
81 where
82 S: Buffer,
83 R: BufferMut,
84 {
85 unsafe {
86 ffi::MPI_Allgather(
87 sendbuf.pointer(),
88 sendbuf.count(),
89 sendbuf.as_datatype().as_raw(),
90 recvbuf.pointer_mut(),
91 recvbuf.count() / self.size(),
92 recvbuf.as_datatype().as_raw(),
93 self.as_raw(),
94 );
95 }
96 }
97
98 /// Gather contents of buffers on all participating processes.
99 ///
100 /// After the call completes, the contents of the send `Buffer`s on all processes will be
101 /// concatenated into the receive `Buffer`s on all ranks.
102 ///
103 /// The send `Buffer`s may contain different counts of elements on different processes. The
104 /// distribution of elements in the receive `Buffer`s is specified via `Partitioned`.
105 ///
106 /// # Examples
107 ///
108 /// See `examples/all_gather_varcount.rs`
109 ///
110 /// # Standard section(s)
111 ///
112 /// 5.7
113 fn all_gather_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
114 where
115 S: Buffer,
116 R: PartitionedBufferMut,
117 {
118 unsafe {
119 ffi::MPI_Allgatherv(
120 sendbuf.pointer(),
121 sendbuf.count(),
122 sendbuf.as_datatype().as_raw(),
123 recvbuf.pointer_mut(),
124 recvbuf.counts().as_ptr(),
125 recvbuf.displs().as_ptr(),
126 recvbuf.as_datatype().as_raw(),
127 self.as_raw(),
128 );
129 }
130 }
131
132 /// Distribute the send `Buffer`s from all processes to the receive `Buffer`s on all processes.
133 ///
134 /// Each process sends and receives the same count of elements to and from each process.
135 ///
136 /// # Examples
137 ///
138 /// See `examples/all_to_all.rs`
139 ///
140 /// # Standard section(s)
141 ///
142 /// 5.8
143 fn all_to_all_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
144 where
145 S: Buffer,
146 R: BufferMut,
147 {
148 let c_size = self.size();
149 unsafe {
150 ffi::MPI_Alltoall(
151 sendbuf.pointer(),
152 sendbuf.count() / c_size,
153 sendbuf.as_datatype().as_raw(),
154 recvbuf.pointer_mut(),
155 recvbuf.count() / c_size,
156 recvbuf.as_datatype().as_raw(),
157 self.as_raw(),
158 );
159 }
160 }
161
162 /// Distribute the send `Buffer`s from all processes to the receive `Buffer`s on all processes.
163 ///
164 /// The count of elements to send and receive to and from each process can vary and is specified
165 /// using `Partitioned`.
166 ///
167 /// # Standard section(s)
168 ///
169 /// 5.8
170 fn all_to_all_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
171 where
172 S: PartitionedBuffer,
173 R: PartitionedBufferMut,
174 {
175 unsafe {
176 ffi::MPI_Alltoallv(
177 sendbuf.pointer(),
178 sendbuf.counts().as_ptr(),
179 sendbuf.displs().as_ptr(),
180 sendbuf.as_datatype().as_raw(),
181 recvbuf.pointer_mut(),
182 recvbuf.counts().as_ptr(),
183 recvbuf.displs().as_ptr(),
184 recvbuf.as_datatype().as_raw(),
185 self.as_raw(),
186 );
187 }
188 }
189
190 /// Performs a global reduction under the operation `op` of the input data in `sendbuf` and
191 /// stores the result in `recvbuf` on all processes.
192 ///
193 /// # Examples
194 ///
195 /// See `examples/reduce.rs`
196 ///
197 /// # Standard section(s)
198 ///
199 /// 5.9.6
200 fn all_reduce_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
201 where
202 S: Buffer,
203 R: BufferMut,
204 O: Operation,
205 {
206 unsafe {
207 ffi::MPI_Allreduce(
208 sendbuf.pointer(),
209 recvbuf.pointer_mut(),
210 sendbuf.count(),
211 sendbuf.as_datatype().as_raw(),
212 op.as_raw(),
213 self.as_raw(),
214 );
215 }
216 }
217
218 /// Performs an element-wise global reduction under the operation `op` of the input data in
219 /// `sendbuf` and scatters the result into equal sized blocks in the receive buffers on all
220 /// processes.
221 ///
222 /// # Examples
223 ///
224 /// See `examples/reduce.rs`
225 ///
226 /// # Standard section(s)
227 ///
228 /// 5.10.1
229 fn reduce_scatter_block_into<S: ?Sized, R: ?Sized, O>(
230 &self,
231 sendbuf: &S,
232 recvbuf: &mut R,
233 op: O,
234 ) where
235 S: Buffer,
236 R: BufferMut,
237 O: Operation,
238 {
239 assert_eq!(recvbuf.count() * self.size(), sendbuf.count());
240 unsafe {
241 ffi::MPI_Reduce_scatter_block(
242 sendbuf.pointer(),
243 recvbuf.pointer_mut(),
244 recvbuf.count(),
245 sendbuf.as_datatype().as_raw(),
246 op.as_raw(),
247 self.as_raw(),
248 );
249 }
250 }
251
252 /// Performs a global inclusive prefix reduction of the data in `sendbuf` into `recvbuf` under
253 /// operation `op`.
254 ///
255 /// # Examples
256 ///
257 /// See `examples/scan.rs`
258 ///
259 /// # Standard section(s)
260 ///
261 /// 5.11.1
262 fn scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
263 where
264 S: Buffer,
265 R: BufferMut,
266 O: Operation,
267 {
268 unsafe {
269 ffi::MPI_Scan(
270 sendbuf.pointer(),
271 recvbuf.pointer_mut(),
272 sendbuf.count(),
273 sendbuf.as_datatype().as_raw(),
274 op.as_raw(),
275 self.as_raw(),
276 );
277 }
278 }
279
280 /// Performs a global exclusive prefix reduction of the data in `sendbuf` into `recvbuf` under
281 /// operation `op`.
282 ///
283 /// # Examples
284 ///
285 /// See `examples/scan.rs`
286 ///
287 /// # Standard section(s)
288 ///
289 /// 5.11.2
290 fn exclusive_scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
291 where
292 S: Buffer,
293 R: BufferMut,
294 O: Operation,
295 {
296 unsafe {
297 ffi::MPI_Exscan(
298 sendbuf.pointer(),
299 recvbuf.pointer_mut(),
300 sendbuf.count(),
301 sendbuf.as_datatype().as_raw(),
302 op.as_raw(),
303 self.as_raw(),
304 );
305 }
306 }
307
308 /// Non-blocking barrier synchronization among all processes in a `Communicator`
309 ///
310 /// Calling processes (or threads within the calling processes) enter the barrier. Completion
311 /// methods on the associated request object will block until all processes have entered.
312 ///
313 /// # Examples
314 ///
315 /// See `examples/immediate_barrier.rs`
316 ///
317 /// # Standard section(s)
318 ///
319 /// 5.12.1
320 fn immediate_barrier(&self) -> Request<'static> {
321 unsafe {
322 Request::from_raw(
323 with_uninitialized(|request| ffi::MPI_Ibarrier(self.as_raw(), request)).1,
324 StaticScope,
325 )
326 }
327 }
328
329 /// Initiate non-blocking gather of the contents of all `sendbuf`s into all `rcevbuf`s on all
330 /// processes in the communicator.
331 ///
332 /// # Examples
333 ///
334 /// See `examples/immediate_all_gather.rs`
335 ///
336 /// # Standard section(s)
337 ///
338 /// 5.12.5
339 fn immediate_all_gather_into<'a, Sc, S: ?Sized, R: ?Sized>(
340 &self,
341 scope: Sc,
342 sendbuf: &'a S,
343 recvbuf: &'a mut R,
344 ) -> Request<'a, Sc>
345 where
346 S: 'a + Buffer,
347 R: 'a + BufferMut,
348 Sc: Scope<'a>,
349 {
350 unsafe {
351 let recvcount = recvbuf.count() / self.size();
352 Request::from_raw(
353 with_uninitialized(|request| {
354 ffi::MPI_Iallgather(
355 sendbuf.pointer(),
356 sendbuf.count(),
357 sendbuf.as_datatype().as_raw(),
358 recvbuf.pointer_mut(),
359 recvcount,
360 recvbuf.as_datatype().as_raw(),
361 self.as_raw(),
362 request,
363 )
364 })
365 .1,
366 scope,
367 )
368 }
369 }
370
371 /// Initiate non-blocking gather of the contents of all `sendbuf`s into all `rcevbuf`s on all
372 /// processes in the communicator.
373 ///
374 /// # Examples
375 ///
376 /// See `examples/immediate_all_gather_varcount.rs`
377 ///
378 /// # Standard section(s)
379 ///
380 /// 5.12.5
381 fn immediate_all_gather_varcount_into<'a, Sc, S: ?Sized, R: ?Sized>(
382 &self,
383 scope: Sc,
384 sendbuf: &'a S,
385 recvbuf: &'a mut R,
386 ) -> Request<'a, Sc>
387 where
388 S: 'a + Buffer,
389 R: 'a + PartitionedBufferMut,
390 Sc: Scope<'a>,
391 {
392 unsafe {
393 Request::from_raw(
394 with_uninitialized(|request| {
395 ffi::MPI_Iallgatherv(
396 sendbuf.pointer(),
397 sendbuf.count(),
398 sendbuf.as_datatype().as_raw(),
399 recvbuf.pointer_mut(),
400 recvbuf.counts().as_ptr(),
401 recvbuf.displs().as_ptr(),
402 recvbuf.as_datatype().as_raw(),
403 self.as_raw(),
404 request,
405 )
406 })
407 .1,
408 scope,
409 )
410 }
411 }
412
413 /// Initiate non-blocking all-to-all communication.
414 ///
415 /// # Examples
416 ///
417 /// See `examples/immediate_all_to_all.rs`
418 ///
419 /// # Standard section(s)
420 ///
421 /// 5.12.6
422 fn immediate_all_to_all_into<'a, Sc, S: ?Sized, R: ?Sized>(
423 &self,
424 scope: Sc,
425 sendbuf: &'a S,
426 recvbuf: &'a mut R,
427 ) -> Request<'a, Sc>
428 where
429 S: 'a + Buffer,
430 R: 'a + BufferMut,
431 Sc: Scope<'a>,
432 {
433 let c_size = self.size();
434 unsafe {
435 Request::from_raw(
436 with_uninitialized(|request| {
437 ffi::MPI_Ialltoall(
438 sendbuf.pointer(),
439 sendbuf.count() / c_size,
440 sendbuf.as_datatype().as_raw(),
441 recvbuf.pointer_mut(),
442 recvbuf.count() / c_size,
443 recvbuf.as_datatype().as_raw(),
444 self.as_raw(),
445 request,
446 )
447 })
448 .1,
449 scope,
450 )
451 }
452 }
453
454 /// Initiate non-blocking all-to-all communication.
455 ///
456 /// # Standard section(s)
457 ///
458 /// 5.12.6
459 fn immediate_all_to_all_varcount_into<'a, Sc, S: ?Sized, R: ?Sized>(
460 &self,
461 scope: Sc,
462 sendbuf: &'a S,
463 recvbuf: &'a mut R,
464 ) -> Request<'a, Sc>
465 where
466 S: 'a + PartitionedBuffer,
467 R: 'a + PartitionedBufferMut,
468 Sc: Scope<'a>,
469 {
470 unsafe {
471 Request::from_raw(
472 with_uninitialized(|request| {
473 ffi::MPI_Ialltoallv(
474 sendbuf.pointer(),
475 sendbuf.counts().as_ptr(),
476 sendbuf.displs().as_ptr(),
477 sendbuf.as_datatype().as_raw(),
478 recvbuf.pointer_mut(),
479 recvbuf.counts().as_ptr(),
480 recvbuf.displs().as_ptr(),
481 recvbuf.as_datatype().as_raw(),
482 self.as_raw(),
483 request,
484 )
485 })
486 .1,
487 scope,
488 )
489 }
490 }
491
492 /// Initiates a non-blocking global reduction under the operation `op` of the input data in
493 /// `sendbuf` and stores the result in `recvbuf` on all processes.
494 ///
495 /// # Examples
496 ///
497 /// See `examples/immediate_reduce.rs`
498 ///
499 /// # Standard section(s)
500 ///
501 /// 5.12.8
502 fn immediate_all_reduce_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
503 &self,
504 scope: Sc,
505 sendbuf: &'a S,
506 recvbuf: &'a mut R,
507 op: O,
508 ) -> Request<'a, Sc>
509 where
510 S: 'a + Buffer,
511 R: 'a + BufferMut,
512 O: 'a + Operation,
513 Sc: Scope<'a>,
514 {
515 unsafe {
516 Request::from_raw(
517 with_uninitialized(|request| {
518 ffi::MPI_Iallreduce(
519 sendbuf.pointer(),
520 recvbuf.pointer_mut(),
521 sendbuf.count(),
522 sendbuf.as_datatype().as_raw(),
523 op.as_raw(),
524 self.as_raw(),
525 request,
526 )
527 })
528 .1,
529 scope,
530 )
531 }
532 }
533
534 /// Initiates a non-blocking element-wise global reduction under the operation `op` of the
535 /// input data in `sendbuf` and scatters the result into equal sized blocks in the receive
536 /// buffers on all processes.
537 ///
538 /// # Examples
539 ///
540 /// See `examples/immediate_reduce.rs`
541 ///
542 /// # Standard section(s)
543 ///
544 /// 5.12.9
545 fn immediate_reduce_scatter_block_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
546 &self,
547 scope: Sc,
548 sendbuf: &'a S,
549 recvbuf: &'a mut R,
550 op: O,
551 ) -> Request<'a, Sc>
552 where
553 S: 'a + Buffer,
554 R: 'a + BufferMut,
555 O: 'a + Operation,
556 Sc: Scope<'a>,
557 {
558 assert_eq!(recvbuf.count() * self.size(), sendbuf.count());
559 unsafe {
560 Request::from_raw(
561 with_uninitialized(|request| {
562 ffi::MPI_Ireduce_scatter_block(
563 sendbuf.pointer(),
564 recvbuf.pointer_mut(),
565 recvbuf.count(),
566 sendbuf.as_datatype().as_raw(),
567 op.as_raw(),
568 self.as_raw(),
569 request,
570 )
571 })
572 .1,
573 scope,
574 )
575 }
576 }
577
578 /// Initiates a non-blocking global inclusive prefix reduction of the data in `sendbuf` into
579 /// `recvbuf` under operation `op`.
580 ///
581 /// # Examples
582 ///
583 /// See `examples/immediate_scan.rs`
584 ///
585 /// # Standard section(s)
586 ///
587 /// 5.12.11
588 fn immediate_scan_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
589 &self,
590 scope: Sc,
591 sendbuf: &'a S,
592 recvbuf: &'a mut R,
593 op: O,
594 ) -> Request<'a, Sc>
595 where
596 S: 'a + Buffer,
597 R: 'a + BufferMut,
598 O: 'a + Operation,
599 Sc: Scope<'a>,
600 {
601 unsafe {
602 Request::from_raw(
603 with_uninitialized(|request| {
604 ffi::MPI_Iscan(
605 sendbuf.pointer(),
606 recvbuf.pointer_mut(),
607 sendbuf.count(),
608 sendbuf.as_datatype().as_raw(),
609 op.as_raw(),
610 self.as_raw(),
611 request,
612 )
613 })
614 .1,
615 scope,
616 )
617 }
618 }
619
620 /// Initiates a non-blocking global exclusive prefix reduction of the data in `sendbuf` into
621 /// `recvbuf` under operation `op`.
622 ///
623 /// # Examples
624 ///
625 /// See `examples/immediate_scan.rs`
626 ///
627 /// # Standard section(s)
628 ///
629 /// 5.12.12
630 fn immediate_exclusive_scan_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
631 &self,
632 scope: Sc,
633 sendbuf: &'a S,
634 recvbuf: &'a mut R,
635 op: O,
636 ) -> Request<'a, Sc>
637 where
638 S: 'a + Buffer,
639 R: 'a + BufferMut,
640 O: 'a + Operation,
641 Sc: Scope<'a>,
642 {
643 unsafe {
644 Request::from_raw(
645 with_uninitialized(|request| {
646 ffi::MPI_Iexscan(
647 sendbuf.pointer(),
648 recvbuf.pointer_mut(),
649 sendbuf.count(),
650 sendbuf.as_datatype().as_raw(),
651 op.as_raw(),
652 self.as_raw(),
653 request,
654 )
655 })
656 .1,
657 scope,
658 )
659 }
660 }
661}
662
663impl<C: Communicator> CommunicatorCollectives for C {}
664
665/// Something that can take the role of 'root' in a collective operation.
666///
667/// Many collective operations define a 'root' process that takes a special role in the
668/// communication. These collective operations are implemented as default methods of this trait.
669pub trait Root: AsCommunicator {
    /// Rank of the root process
    ///
    /// The default methods of this trait pass this rank to MPI as the `root` argument. They also
    /// compare it with the caller's rank in `as_communicator()` to check that the root-only or
    /// non-root-only variant is being used.
    fn root_rank(&self) -> Rank;
672
673 /// Broadcast of the contents of a buffer
674 ///
675 /// After the call completes, the `Buffer` on all processes in the `Communicator` of the `Root`
676 /// `&self` will contain what it contains on the `Root`.
677 ///
678 /// # Examples
679 ///
680 /// See `examples/broadcast.rs`
681 ///
682 /// # Standard section(s)
683 ///
684 /// 5.4
685 fn broadcast_into<Buf: ?Sized>(&self, buffer: &mut Buf)
686 where
687 Buf: BufferMut,
688 {
689 unsafe {
690 ffi::MPI_Bcast(
691 buffer.pointer_mut(),
692 buffer.count(),
693 buffer.as_datatype().as_raw(),
694 self.root_rank(),
695 self.as_communicator().as_raw(),
696 );
697 }
698 }
699
700 /// Gather contents of buffers on `Root`.
701 ///
702 /// After the call completes, the contents of the `Buffer`s on all ranks will be
703 /// concatenated into the `Buffer` on `Root`.
704 ///
705 /// All send `Buffer`s must have the same count of elements.
706 ///
707 /// This function must be called on all non-root processes.
708 ///
709 /// # Examples
710 ///
711 /// See `examples/gather.rs`
712 ///
713 /// # Standard section(s)
714 ///
715 /// 5.5
716 fn gather_into<S: ?Sized>(&self, sendbuf: &S)
717 where
718 S: Buffer,
719 {
720 assert_ne!(self.as_communicator().rank(), self.root_rank());
721 unsafe {
722 ffi::MPI_Gather(
723 sendbuf.pointer(),
724 sendbuf.count(),
725 sendbuf.as_datatype().as_raw(),
726 ptr::null_mut(),
727 0,
728 u8::equivalent_datatype().as_raw(),
729 self.root_rank(),
730 self.as_communicator().as_raw(),
731 );
732 }
733 }
734
735 /// Gather contents of buffers on `Root`.
736 ///
737 /// After the call completes, the contents of the `Buffer`s on all ranks will be
738 /// concatenated into the `Buffer` on `Root`.
739 ///
740 /// All send `Buffer`s must have the same count of elements.
741 ///
742 /// This function must be called on the root process.
743 ///
744 /// # Examples
745 ///
746 /// See `examples/gather.rs`
747 ///
748 /// # Standard section(s)
749 ///
750 /// 5.5
751 fn gather_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
752 where
753 S: Buffer,
754 R: BufferMut,
755 {
756 assert_eq!(self.as_communicator().rank(), self.root_rank());
757 unsafe {
758 let recvcount = recvbuf.count() / self.as_communicator().size();
759 ffi::MPI_Gather(
760 sendbuf.pointer(),
761 sendbuf.count(),
762 sendbuf.as_datatype().as_raw(),
763 recvbuf.pointer_mut(),
764 recvcount,
765 recvbuf.as_datatype().as_raw(),
766 self.root_rank(),
767 self.as_communicator().as_raw(),
768 );
769 }
770 }
771
772 /// Gather contents of buffers on `Root`.
773 ///
774 /// After the call completes, the contents of the `Buffer`s on all ranks will be
775 /// concatenated into the `Buffer` on `Root`.
776 ///
777 /// The send `Buffer`s may contain different counts of elements on different processes. The
778 /// distribution of elements in the receive `Buffer` is specified via `Partitioned`.
779 ///
780 /// This function must be called on all non-root processes.
781 ///
782 /// # Examples
783 ///
784 /// See `examples/gather_varcount.rs`
785 ///
786 /// # Standard section(s)
787 ///
788 /// 5.5
789 fn gather_varcount_into<S: ?Sized>(&self, sendbuf: &S)
790 where
791 S: Buffer,
792 {
793 assert_ne!(self.as_communicator().rank(), self.root_rank());
794 unsafe {
795 ffi::MPI_Gatherv(
796 sendbuf.pointer(),
797 sendbuf.count(),
798 sendbuf.as_datatype().as_raw(),
799 ptr::null_mut(),
800 ptr::null(),
801 ptr::null(),
802 u8::equivalent_datatype().as_raw(),
803 self.root_rank(),
804 self.as_communicator().as_raw(),
805 );
806 }
807 }
808
809 /// Gather contents of buffers on `Root`.
810 ///
811 /// After the call completes, the contents of the `Buffer`s on all ranks will be
812 /// concatenated into the `Buffer` on `Root`.
813 ///
814 /// The send `Buffer`s may contain different counts of elements on different processes. The
815 /// distribution of elements in the receive `Buffer` is specified via `Partitioned`.
816 ///
817 /// This function must be called on the root process.
818 ///
819 /// # Examples
820 ///
821 /// See `examples/gather_varcount.rs`
822 ///
823 /// # Standard section(s)
824 ///
825 /// 5.5
826 fn gather_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
827 where
828 S: Buffer,
829 R: PartitionedBufferMut,
830 {
831 assert_eq!(self.as_communicator().rank(), self.root_rank());
832 unsafe {
833 ffi::MPI_Gatherv(
834 sendbuf.pointer(),
835 sendbuf.count(),
836 sendbuf.as_datatype().as_raw(),
837 recvbuf.pointer_mut(),
838 recvbuf.counts().as_ptr(),
839 recvbuf.displs().as_ptr(),
840 recvbuf.as_datatype().as_raw(),
841 self.root_rank(),
842 self.as_communicator().as_raw(),
843 );
844 }
845 }
846
847 /// Scatter contents of a buffer on the root process to all processes.
848 ///
849 /// After the call completes each participating process will have received a part of the send
850 /// `Buffer` on the root process.
851 ///
852 /// All send `Buffer`s must have the same count of elements.
853 ///
854 /// This function must be called on all non-root processes.
855 ///
856 /// # Examples
857 ///
858 /// See `examples/scatter.rs`
859 ///
860 /// # Standard section(s)
861 ///
862 /// 5.6
863 fn scatter_into<R: ?Sized>(&self, recvbuf: &mut R)
864 where
865 R: BufferMut,
866 {
867 assert_ne!(self.as_communicator().rank(), self.root_rank());
868 unsafe {
869 ffi::MPI_Scatter(
870 ptr::null(),
871 0,
872 u8::equivalent_datatype().as_raw(),
873 recvbuf.pointer_mut(),
874 recvbuf.count(),
875 recvbuf.as_datatype().as_raw(),
876 self.root_rank(),
877 self.as_communicator().as_raw(),
878 );
879 }
880 }
881
882 /// Scatter contents of a buffer on the root process to all processes.
883 ///
884 /// After the call completes each participating process will have received a part of the send
885 /// `Buffer` on the root process.
886 ///
887 /// All send `Buffer`s must have the same count of elements.
888 ///
889 /// This function must be called on the root process.
890 ///
891 /// # Examples
892 ///
893 /// See `examples/scatter.rs`
894 ///
895 /// # Standard section(s)
896 ///
897 /// 5.6
898 fn scatter_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
899 where
900 S: Buffer,
901 R: BufferMut,
902 {
903 assert_eq!(self.as_communicator().rank(), self.root_rank());
904 let sendcount = sendbuf.count() / self.as_communicator().size();
905 unsafe {
906 ffi::MPI_Scatter(
907 sendbuf.pointer(),
908 sendcount,
909 sendbuf.as_datatype().as_raw(),
910 recvbuf.pointer_mut(),
911 recvbuf.count(),
912 recvbuf.as_datatype().as_raw(),
913 self.root_rank(),
914 self.as_communicator().as_raw(),
915 );
916 }
917 }
918
919 /// Scatter contents of a buffer on the root process to all processes.
920 ///
921 /// After the call completes each participating process will have received a part of the send
922 /// `Buffer` on the root process.
923 ///
924 /// The send `Buffer` may contain different counts of elements for different processes. The
925 /// distribution of elements in the send `Buffer` is specified via `Partitioned`.
926 ///
927 /// This function must be called on all non-root processes.
928 ///
929 /// # Examples
930 ///
931 /// See `examples/scatter_varcount.rs`
932 ///
933 /// # Standard section(s)
934 ///
935 /// 5.6
936 fn scatter_varcount_into<R: ?Sized>(&self, recvbuf: &mut R)
937 where
938 R: BufferMut,
939 {
940 assert_ne!(self.as_communicator().rank(), self.root_rank());
941 unsafe {
942 ffi::MPI_Scatterv(
943 ptr::null(),
944 ptr::null(),
945 ptr::null(),
946 u8::equivalent_datatype().as_raw(),
947 recvbuf.pointer_mut(),
948 recvbuf.count(),
949 recvbuf.as_datatype().as_raw(),
950 self.root_rank(),
951 self.as_communicator().as_raw(),
952 );
953 }
954 }
955
956 /// Scatter contents of a buffer on the root process to all processes.
957 ///
958 /// After the call completes each participating process will have received a part of the send
959 /// `Buffer` on the root process.
960 ///
961 /// The send `Buffer` may contain different counts of elements for different processes. The
962 /// distribution of elements in the send `Buffer` is specified via `Partitioned`.
963 ///
964 /// This function must be called on the root process.
965 ///
966 /// # Examples
967 ///
968 /// See `examples/scatter_varcount.rs`
969 ///
970 /// # Standard section(s)
971 ///
972 /// 5.6
973 fn scatter_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
974 where
975 S: PartitionedBuffer,
976 R: BufferMut,
977 {
978 assert_eq!(self.as_communicator().rank(), self.root_rank());
979 unsafe {
980 ffi::MPI_Scatterv(
981 sendbuf.pointer(),
982 sendbuf.counts().as_ptr(),
983 sendbuf.displs().as_ptr(),
984 sendbuf.as_datatype().as_raw(),
985 recvbuf.pointer_mut(),
986 recvbuf.count(),
987 recvbuf.as_datatype().as_raw(),
988 self.root_rank(),
989 self.as_communicator().as_raw(),
990 );
991 }
992 }
993
994 /// Performs a global reduction under the operation `op` of the input data in `sendbuf` and
995 /// stores the result on the `Root` process.
996 ///
997 /// This function must be called on all non-root processes.
998 ///
999 /// # Examples
1000 ///
1001 /// See `examples/reduce.rs`
1002 ///
1003 /// # Standard section(s)
1004 ///
1005 /// 5.9.1
1006 fn reduce_into<S: ?Sized, O>(&self, sendbuf: &S, op: O)
1007 where
1008 S: Buffer,
1009 O: Operation,
1010 {
1011 assert_ne!(self.as_communicator().rank(), self.root_rank());
1012 unsafe {
1013 ffi::MPI_Reduce(
1014 sendbuf.pointer(),
1015 ptr::null_mut(),
1016 sendbuf.count(),
1017 sendbuf.as_datatype().as_raw(),
1018 op.as_raw(),
1019 self.root_rank(),
1020 self.as_communicator().as_raw(),
1021 );
1022 }
1023 }
1024
1025 /// Performs a global reduction under the operation `op` of the input data in `sendbuf` and
1026 /// stores the result on the `Root` process.
1027 ///
1028 /// This function must be called on the root process.
1029 ///
1030 /// # Examples
1031 ///
1032 /// See `examples/reduce.rs`
1033 ///
1034 /// # Standard section(s)
1035 ///
1036 /// 5.9.1
1037 fn reduce_into_root<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
1038 where
1039 S: Buffer,
1040 R: BufferMut,
1041 O: Operation,
1042 {
1043 assert_eq!(self.as_communicator().rank(), self.root_rank());
1044 unsafe {
1045 ffi::MPI_Reduce(
1046 sendbuf.pointer(),
1047 recvbuf.pointer_mut(),
1048 sendbuf.count(),
1049 sendbuf.as_datatype().as_raw(),
1050 op.as_raw(),
1051 self.root_rank(),
1052 self.as_communicator().as_raw(),
1053 );
1054 }
1055 }
1056
1057 /// Initiate broadcast of a value from the `Root` process to all other processes.
1058 ///
1059 /// # Examples
1060 ///
1061 /// See `examples/immediate_broadcast.rs`
1062 ///
1063 /// # Standard section(s)
1064 ///
1065 /// 5.12.2
1066 fn immediate_broadcast_into<'a, Sc, Buf: ?Sized>(
1067 &self,
1068 scope: Sc,
1069 buf: &'a mut Buf,
1070 ) -> Request<'a, Sc>
1071 where
1072 Buf: 'a + BufferMut,
1073 Sc: Scope<'a>,
1074 {
1075 unsafe {
1076 Request::from_raw(
1077 with_uninitialized(|request| {
1078 ffi::MPI_Ibcast(
1079 buf.pointer_mut(),
1080 buf.count(),
1081 buf.as_datatype().as_raw(),
1082 self.root_rank(),
1083 self.as_communicator().as_raw(),
1084 request,
1085 )
1086 })
1087 .1,
1088 scope,
1089 )
1090 }
1091 }
1092
1093 /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1094 ///
1095 /// This function must be called on all non-root processes.
1096 ///
1097 /// # Examples
1098 ///
1099 /// See `examples/immediate_gather.rs`
1100 ///
1101 /// # Standard section(s)
1102 ///
1103 /// 5.12.3
1104 fn immediate_gather_into<'a, Sc, S: ?Sized>(&self, scope: Sc, sendbuf: &'a S) -> Request<'a, Sc>
1105 where
1106 S: 'a + Buffer,
1107 Sc: Scope<'a>,
1108 {
1109 assert_ne!(self.as_communicator().rank(), self.root_rank());
1110 unsafe {
1111 Request::from_raw(
1112 with_uninitialized(|request| {
1113 ffi::MPI_Igather(
1114 sendbuf.pointer(),
1115 sendbuf.count(),
1116 sendbuf.as_datatype().as_raw(),
1117 ptr::null_mut(),
1118 0,
1119 u8::equivalent_datatype().as_raw(),
1120 self.root_rank(),
1121 self.as_communicator().as_raw(),
1122 request,
1123 )
1124 })
1125 .1,
1126 scope,
1127 )
1128 }
1129 }
1130
1131 /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1132 ///
1133 /// This function must be called on the root processes.
1134 ///
1135 /// # Examples
1136 ///
1137 /// See `examples/immediate_gather.rs`
1138 ///
1139 /// # Standard section(s)
1140 ///
1141 /// 5.12.3
1142 fn immediate_gather_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1143 &self,
1144 scope: Sc,
1145 sendbuf: &'a S,
1146 recvbuf: &'a mut R,
1147 ) -> Request<'a, Sc>
1148 where
1149 S: 'a + Buffer,
1150 R: 'a + BufferMut,
1151 Sc: Scope<'a>,
1152 {
1153 assert_eq!(self.as_communicator().rank(), self.root_rank());
1154 unsafe {
1155 let recvcount = recvbuf.count() / self.as_communicator().size();
1156 Request::from_raw(
1157 with_uninitialized(|request| {
1158 ffi::MPI_Igather(
1159 sendbuf.pointer(),
1160 sendbuf.count(),
1161 sendbuf.as_datatype().as_raw(),
1162 recvbuf.pointer_mut(),
1163 recvcount,
1164 recvbuf.as_datatype().as_raw(),
1165 self.root_rank(),
1166 self.as_communicator().as_raw(),
1167 request,
1168 )
1169 })
1170 .1,
1171 scope,
1172 )
1173 }
1174 }
1175
1176 /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1177 ///
1178 /// This function must be called on all non-root processes.
1179 ///
1180 /// # Examples
1181 ///
1182 /// See `examples/immediate_gather_varcount.rs`
1183 ///
1184 /// # Standard section(s)
1185 ///
1186 /// 5.12.3
1187 fn immediate_gather_varcount_into<'a, Sc, S: ?Sized>(
1188 &self,
1189 scope: Sc,
1190 sendbuf: &'a S,
1191 ) -> Request<'a, Sc>
1192 where
1193 S: 'a + Buffer,
1194 Sc: Scope<'a>,
1195 {
1196 assert_ne!(self.as_communicator().rank(), self.root_rank());
1197 unsafe {
1198 Request::from_raw(
1199 with_uninitialized(|request| {
1200 ffi::MPI_Igatherv(
1201 sendbuf.pointer(),
1202 sendbuf.count(),
1203 sendbuf.as_datatype().as_raw(),
1204 ptr::null_mut(),
1205 ptr::null(),
1206 ptr::null(),
1207 u8::equivalent_datatype().as_raw(),
1208 self.root_rank(),
1209 self.as_communicator().as_raw(),
1210 request,
1211 )
1212 })
1213 .1,
1214 scope,
1215 )
1216 }
1217 }
1218
1219 /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1220 ///
1221 /// This function must be called on the root processes.
1222 ///
1223 /// # Examples
1224 ///
1225 /// See `examples/immediate_gather_varcount.rs`
1226 ///
1227 /// # Standard section(s)
1228 ///
1229 /// 5.12.3
1230 fn immediate_gather_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1231 &self,
1232 scope: Sc,
1233 sendbuf: &'a S,
1234 recvbuf: &'a mut R,
1235 ) -> Request<'a, Sc>
1236 where
1237 S: 'a + Buffer,
1238 R: 'a + PartitionedBufferMut,
1239 Sc: Scope<'a>,
1240 {
1241 assert_eq!(self.as_communicator().rank(), self.root_rank());
1242 unsafe {
1243 Request::from_raw(
1244 with_uninitialized(|request| {
1245 ffi::MPI_Igatherv(
1246 sendbuf.pointer(),
1247 sendbuf.count(),
1248 sendbuf.as_datatype().as_raw(),
1249 recvbuf.pointer_mut(),
1250 recvbuf.counts().as_ptr(),
1251 recvbuf.displs().as_ptr(),
1252 recvbuf.as_datatype().as_raw(),
1253 self.root_rank(),
1254 self.as_communicator().as_raw(),
1255 request,
1256 )
1257 })
1258 .1,
1259 scope,
1260 )
1261 }
1262 }
1263
1264 /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1265 ///
1266 /// This function must be called on all non-root processes.
1267 ///
1268 /// # Examples
1269 ///
1270 /// See `examples/immediate_scatter.rs`
1271 ///
1272 /// # Standard section(s)
1273 ///
1274 /// 5.12.4
1275 fn immediate_scatter_into<'a, Sc, R: ?Sized>(
1276 &self,
1277 scope: Sc,
1278 recvbuf: &'a mut R,
1279 ) -> Request<'a, Sc>
1280 where
1281 R: 'a + BufferMut,
1282 Sc: Scope<'a>,
1283 {
1284 assert_ne!(self.as_communicator().rank(), self.root_rank());
1285 unsafe {
1286 Request::from_raw(
1287 with_uninitialized(|request| {
1288 ffi::MPI_Iscatter(
1289 ptr::null(),
1290 0,
1291 u8::equivalent_datatype().as_raw(),
1292 recvbuf.pointer_mut(),
1293 recvbuf.count(),
1294 recvbuf.as_datatype().as_raw(),
1295 self.root_rank(),
1296 self.as_communicator().as_raw(),
1297 request,
1298 )
1299 })
1300 .1,
1301 scope,
1302 )
1303 }
1304 }
1305
1306 /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1307 ///
1308 /// This function must be called on the root processes.
1309 ///
1310 /// # Examples
1311 ///
1312 /// See `examples/immediate_scatter.rs`
1313 ///
1314 /// # Standard section(s)
1315 ///
1316 /// 5.12.4
1317 fn immediate_scatter_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1318 &self,
1319 scope: Sc,
1320 sendbuf: &'a S,
1321 recvbuf: &'a mut R,
1322 ) -> Request<'a, Sc>
1323 where
1324 S: 'a + Buffer,
1325 R: 'a + BufferMut,
1326 Sc: Scope<'a>,
1327 {
1328 assert_eq!(self.as_communicator().rank(), self.root_rank());
1329 unsafe {
1330 let sendcount = sendbuf.count() / self.as_communicator().size();
1331 Request::from_raw(
1332 with_uninitialized(|request| {
1333 ffi::MPI_Iscatter(
1334 sendbuf.pointer(),
1335 sendcount,
1336 sendbuf.as_datatype().as_raw(),
1337 recvbuf.pointer_mut(),
1338 recvbuf.count(),
1339 recvbuf.as_datatype().as_raw(),
1340 self.root_rank(),
1341 self.as_communicator().as_raw(),
1342 request,
1343 )
1344 })
1345 .1,
1346 scope,
1347 )
1348 }
1349 }
1350
1351 /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1352 ///
1353 /// This function must be called on all non-root processes.
1354 ///
1355 /// # Examples
1356 ///
1357 /// See `examples/immediate_scatter_varcount.rs`
1358 ///
1359 /// # Standard section(s)
1360 ///
1361 /// 5.12.4
1362 fn immediate_scatter_varcount_into<'a, Sc, R: ?Sized>(
1363 &self,
1364 scope: Sc,
1365 recvbuf: &'a mut R,
1366 ) -> Request<'a, Sc>
1367 where
1368 R: 'a + BufferMut,
1369 Sc: Scope<'a>,
1370 {
1371 assert_ne!(self.as_communicator().rank(), self.root_rank());
1372 unsafe {
1373 Request::from_raw(
1374 with_uninitialized(|request| {
1375 ffi::MPI_Iscatterv(
1376 ptr::null(),
1377 ptr::null(),
1378 ptr::null(),
1379 u8::equivalent_datatype().as_raw(),
1380 recvbuf.pointer_mut(),
1381 recvbuf.count(),
1382 recvbuf.as_datatype().as_raw(),
1383 self.root_rank(),
1384 self.as_communicator().as_raw(),
1385 request,
1386 )
1387 })
1388 .1,
1389 scope,
1390 )
1391 }
1392 }
1393
1394 /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1395 ///
1396 /// This function must be called on the root processes.
1397 ///
1398 /// # Examples
1399 ///
1400 /// See `examples/immediate_scatter_varcount.rs`
1401 ///
1402 /// # Standard section(s)
1403 ///
1404 /// 5.12.4
1405 fn immediate_scatter_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1406 &self,
1407 scope: Sc,
1408 sendbuf: &'a S,
1409 recvbuf: &'a mut R,
1410 ) -> Request<'a, Sc>
1411 where
1412 S: 'a + PartitionedBuffer,
1413 R: 'a + BufferMut,
1414 Sc: Scope<'a>,
1415 {
1416 assert_eq!(self.as_communicator().rank(), self.root_rank());
1417 unsafe {
1418 Request::from_raw(
1419 with_uninitialized(|request| {
1420 ffi::MPI_Iscatterv(
1421 sendbuf.pointer(),
1422 sendbuf.counts().as_ptr(),
1423 sendbuf.displs().as_ptr(),
1424 sendbuf.as_datatype().as_raw(),
1425 recvbuf.pointer_mut(),
1426 recvbuf.count(),
1427 recvbuf.as_datatype().as_raw(),
1428 self.root_rank(),
1429 self.as_communicator().as_raw(),
1430 request,
1431 )
1432 })
1433 .1,
1434 scope,
1435 )
1436 }
1437 }
1438
1439 /// Initiates a non-blacking global reduction under the operation `op` of the input data in
1440 /// `sendbuf` and stores the result on the `Root` process.
1441 ///
1442 /// This function must be called on all non-root processes.
1443 ///
1444 /// # Examples
1445 ///
1446 /// See `examples/immediate_reduce.rs`
1447 ///
1448 /// # Standard section(s)
1449 ///
1450 /// 5.12.7
1451 fn immediate_reduce_into<'a, Sc, S: ?Sized, O>(
1452 &self,
1453 scope: Sc,
1454 sendbuf: &'a S,
1455 op: O,
1456 ) -> Request<'a, Sc>
1457 where
1458 S: 'a + Buffer,
1459 O: 'a + Operation,
1460 Sc: Scope<'a>,
1461 {
1462 assert_ne!(self.as_communicator().rank(), self.root_rank());
1463 unsafe {
1464 Request::from_raw(
1465 with_uninitialized(|request| {
1466 ffi::MPI_Ireduce(
1467 sendbuf.pointer(),
1468 ptr::null_mut(),
1469 sendbuf.count(),
1470 sendbuf.as_datatype().as_raw(),
1471 op.as_raw(),
1472 self.root_rank(),
1473 self.as_communicator().as_raw(),
1474 request,
1475 )
1476 })
1477 .1,
1478 scope,
1479 )
1480 }
1481 }
1482
1483 /// Initiates a non-blocking global reduction under the operation `op` of the input data in
1484 /// `sendbuf` and stores the result on the `Root` process.
1485 ///
1486 /// # Examples
1487 ///
1488 /// See `examples/immediate_reduce.rs`
1489 ///
1490 /// This function must be called on the root process.
1491 ///
1492 /// # Standard section(s)
1493 ///
1494 /// 5.12.7
1495 fn immediate_reduce_into_root<'a, Sc, S: ?Sized, R: ?Sized, O>(
1496 &self,
1497 scope: Sc,
1498 sendbuf: &'a S,
1499 recvbuf: &'a mut R,
1500 op: O,
1501 ) -> Request<'a, Sc>
1502 where
1503 S: 'a + Buffer,
1504 R: 'a + BufferMut,
1505 O: 'a + Operation,
1506 Sc: Scope<'a>,
1507 {
1508 assert_eq!(self.as_communicator().rank(), self.root_rank());
1509 unsafe {
1510 Request::from_raw(
1511 with_uninitialized(|request| {
1512 ffi::MPI_Ireduce(
1513 sendbuf.pointer(),
1514 recvbuf.pointer_mut(),
1515 sendbuf.count(),
1516 sendbuf.as_datatype().as_raw(),
1517 op.as_raw(),
1518 self.root_rank(),
1519 self.as_communicator().as_raw(),
1520 request,
1521 )
1522 })
1523 .1,
1524 scope,
1525 )
1526 }
1527 }
1528}
1529
1530impl<'a, C: 'a + Communicator> Root for Process<'a, C> {
1531 fn root_rank(&self) -> Rank {
1532 self.rank()
1533 }
1534}
1535
1536/// An operation to be used in a reduction or scan type operation, e.g. `MPI_SUM`
1537pub trait Operation: AsRaw<Raw = MPI_Op> {
1538 /// Returns whether the operation is commutative.
1539 ///
1540 /// # Standard section(s)
1541 ///
1542 /// 5.9.7
1543 fn is_commutative(&self) -> bool {
1544 unsafe {
1545 let mut commute = 0;
1546 ffi::MPI_Op_commutative(self.as_raw(), &mut commute);
1547 commute != 0
1548 }
1549 }
1550}
1551impl<'a, T: 'a + Operation> Operation for &'a T {}
1552
1553/// A built-in operation like `MPI_SUM`
1554///
1555/// # Examples
1556///
1557/// See `examples/reduce.rs`
1558///
1559/// # Standard section(s)
1560///
1561/// 5.9.2
1562#[derive(Copy, Clone)]
1563pub struct SystemOperation(MPI_Op);
1564
// Expands to one `SystemOperation` constructor per `name => handle` pair, where `handle` is the
// path of a predefined MPI operation constant.
macro_rules! system_operation_constructors {
    ($($ctor:ident => $val:path),*) => {
        $(
            /// A built-in operation
            pub fn $ctor() -> SystemOperation {
                let raw = unsafe { $val };
                SystemOperation(raw)
            }
        )*
    };
}
1573
1574impl SystemOperation {
1575 system_operation_constructors! {
1576 max => ffi::RSMPI_MAX,
1577 min => ffi::RSMPI_MIN,
1578 sum => ffi::RSMPI_SUM,
1579 product => ffi::RSMPI_PROD,
1580 logical_and => ffi::RSMPI_LAND,
1581 bitwise_and => ffi::RSMPI_BAND,
1582 logical_or => ffi::RSMPI_LOR,
1583 bitwise_or => ffi::RSMPI_BOR,
1584 logical_xor => ffi::RSMPI_LXOR,
1585 bitwise_xor => ffi::RSMPI_BXOR
1586 }
1587}
1588
1589unsafe impl AsRaw for SystemOperation {
1590 type Raw = MPI_Op;
1591 fn as_raw(&self) -> Self::Raw {
1592 self.0
1593 }
1594}
1595
1596impl Operation for SystemOperation {}
1597
/// Marker trait implemented by every sized type.
///
/// It lets a value of any type be stored as `Box<dyn Erased>` purely to keep it alive and drop it
/// later.
trait Erased {}

impl<Anything> Erased for Anything {}
1601
/// A user-defined reduction operation backed by a Rust closure.
///
/// The lifetime `'a` is bounded by the lifetime of the closure the operation was built from.
///
/// For safety reasons, `UserOperation` itself does not implement `Operation`; only references to
/// it do. This restriction may be lifted once `Request` objects are able to carry finalizers.
///
/// **Note:** When a `UserOperation` is passed to a non-blocking API call, it must outlive the
/// completion of the request. The safe API normally enforces this, so it only matters when using
/// the unsafe API. Do not rely on MPI's internal reference counting here: once the
/// `UserOperation` is destroyed, the closure object is deallocated even if the `MPI_Op` handle
/// is still alive because of outstanding references.
///
/// # Examples
///
/// See `examples/reduce.rs` and `examples/immediate_reduce.rs`
#[cfg(feature = "user-operations")]
pub struct UserOperation<'a> {
    op: MPI_Op,
    _anchor: Box<dyn Erased + 'a>, // owns whatever data `op` depends on
}

#[cfg(feature = "user-operations")]
impl<'a> fmt::Debug for UserOperation<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut tuple = f.debug_tuple("UserOperation");
        tuple.field(&self.op);
        tuple.finish()
    }
}

#[cfg(feature = "user-operations")]
impl<'a> Drop for UserOperation<'a> {
    // Releases the MPI handle; the anchor field is dropped afterwards.
    fn drop(&mut self) {
        let handle: *mut MPI_Op = &mut self.op;
        unsafe {
            ffi::MPI_Op_free(handle);
        }
    }
}

#[cfg(feature = "user-operations")]
unsafe impl<'a> AsRaw for UserOperation<'a> {
    type Raw = MPI_Op;

    fn as_raw(&self) -> Self::Raw {
        self.op
    }
}

#[cfg(feature = "user-operations")]
impl<'op, 'r> Operation for &'r UserOperation<'op> {}
1651
#[cfg(feature = "user-operations")]
impl<'a> UserOperation<'a> {
    /// Define an operation using a closure. The operation must be associative.
    ///
    /// This is a more readable shorthand for `new(false, function)`. Refer to
    /// [`new`](#method.new) for more information.
    pub fn associative<F>(function: F) -> Self
    where
        F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
    {
        Self::new(false, function)
    }

    /// Define an operation using a closure. The operation must be both associative and
    /// commutative.
    ///
    /// This is a more readable shorthand for `new(true, function)`. Refer to
    /// [`new`](#method.new) for more information.
    pub fn commutative<F>(function: F) -> Self
    where
        F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
    {
        Self::new(true, function)
    }

    /// Creates an associative and possibly commutative operation using a closure.
    ///
    /// The closure receives two arguments `invec` and `inoutvec` as dynamically typed buffers. It
    /// shall set `inoutvec` to the value of `f(invec, inoutvec)`, where `f` is a binary associative
    /// operation.
    ///
    /// If the operation is also commutative, setting `commute` to `true` may yield performance
    /// benefits.
    ///
    /// **Note:** If the closure panics, the entire program will abort.
    ///
    /// # Implementation notes
    ///
    /// A libffi closure is generated and its code pointer is registered with `MPI_Op_create`.
    /// When MPI calls that pointer, the `trampoline` below unpacks MPI's four raw arguments and
    /// forwards them to `function`. The Rust closure and the libffi closure are both stored in a
    /// boxed anchor owned by the returned `UserOperation`.
    ///
    /// # Standard section(s)
    ///
    /// 5.9.5
    pub fn new<F>(commute: bool, function: F) -> Self
    where
        F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
    {
        // Keeps the user's closure and the libffi closure that borrows it together in one
        // allocation, so they are freed together.
        struct ClosureAnchor<F> {
            rust_closure: F,
            _ffi_closure: Option<Closure<'static>>,
        }

        // C-ABI entry point invoked (through libffi) by MPI. `user_function` is the userdata
        // reference handed to `Closure::new` below, i.e. `anchor.rust_closure`.
        unsafe extern "C" fn trampoline<'a, F: Fn(DynBuffer, DynBufferMut) + Sync + 'a>(
            cif: &libffi::low::ffi_cif,
            _result: &mut c_void,
            args: *const *const c_void,
            user_function: &F,
        ) {
            // The cif built in `new` declares exactly four pointer arguments.
            debug_assert_eq!(4, cif.nargs);

            // libffi passes each argument by reference: `args[i]` points at the i-th argument
            // value, and every argument of MPI's user function is itself a pointer.
            let (mut invec, mut inoutvec, len, datatype) = (
                *(*args.offset(0) as *const *mut c_void),
                *(*args.offset(1) as *const *mut c_void),
                *(*args.offset(2) as *const *mut i32),
                *(*args.offset(3) as *const *mut ffi::MPI_Datatype),
            );

            // MPI passes the element count and the datatype by pointer.
            let len = *len;
            let datatype = DatatypeRef::from_raw(*datatype);
            if len == 0 {
                // precautionary measure: ensure pointers are not null
                // (NOTE(review): presumably because the buffer wrappers build slices, which
                // require non-null pointers even when empty — confirm in DynBuffer::from_raw)
                invec = [].as_mut_ptr();
                inoutvec = [].as_mut_ptr();
            }

            user_function(
                DynBuffer::from_raw(invec, len, datatype),
                DynBufferMut::from_raw(inoutvec, len, datatype),
            )
        }

        // must box it to prevent moves
        // (the libffi closure below holds a reference to `anchor.rust_closure`, so its address
        // must stay fixed for as long as the closure can be called)
        let mut anchor = Box::new(ClosureAnchor {
            rust_closure: function,
            _ffi_closure: None,
        });

        // Argument types of MPI's C user function:
        // (invec, inoutvec, len, datatype), all passed as pointers.
        let args = [
            Type::pointer(), // void *
            Type::pointer(), // void *
            Type::pointer(), // int32_t *
            Type::pointer(), // MPI_Datatype *
        ];
        #[allow(unused_mut)]
        let mut cif = Cif::new(args.iter().cloned(), Type::void());
        // MS-MPI uses "stdcall" calling convention on 32-bit x86
        #[cfg(all(msmpi, target_arch = "x86"))]
        cif.set_abi(libffi::raw::ffi_abi_FFI_STDCALL);

        let op;
        anchor._ffi_closure = Some(unsafe {
            let ffi_closure = Closure::new(cif, trampoline, &anchor.rust_closure);
            // Register the generated code pointer as the MPI user function.
            op = with_uninitialized(|op| {
                ffi::MPI_Op_create(Some(*ffi_closure.instantiate_code_ptr()), commute as _, op)
            })
            .1;
            // The closure borrows `anchor.rust_closure`; its lifetime is erased so that it can
            // be stored inside the same box. This relies on the box neither moving nor being
            // dropped before the closure stops being used.
            mem::transmute(ffi_closure) // erase the lifetime
        });
        UserOperation {
            op,
            _anchor: anchor,
        }
    }

    /// Creates a `UserOperation` from raw parts.
    ///
    /// Here, `anchor` is an arbitrary object that is stored alongside the `MPI_Op`.
    /// This can be used to attach finalizers to the object.
    ///
    /// The returned value takes ownership of `op`: when the `UserOperation` is dropped, `op` is
    /// released with `MPI_Op_free` and `anchor` is dropped afterwards.
    ///
    /// # Safety
    /// `MPI_Op` must not be `MPI_OP_NULL`
    pub unsafe fn from_raw<T: 'a>(op: MPI_Op, anchor: Box<T>) -> Self {
        Self {
            op,
            _anchor: anchor,
        }
    }
}
1776
1777/// An unsafe user-defined operation.
1778///
1779/// Unsafe user-defined operations are created from pointers to functions that have the unsafe
1780/// signatures of user functions defined in the MPI C bindings, `UnsafeUserFunction`.
1781///
1782/// The recommended way to create user-defined operations is through the safer `UserOperation`
1783/// type. This type can be used as a work-around in situations where the `libffi` dependency is not
1784/// available.
1785pub struct UnsafeUserOperation {
1786 op: MPI_Op,
1787}
1788
1789impl fmt::Debug for UnsafeUserOperation {
1790 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1791 f.debug_tuple("UnsafeUserOperation")
1792 .field(&self.op)
1793 .finish()
1794 }
1795}
1796
1797impl Drop for UnsafeUserOperation {
1798 fn drop(&mut self) {
1799 unsafe {
1800 ffi::MPI_Op_free(&mut self.op);
1801 }
1802 }
1803}
1804
1805unsafe impl AsRaw for UnsafeUserOperation {
1806 type Raw = MPI_Op;
1807 fn as_raw(&self) -> Self::Raw {
1808 self.op
1809 }
1810}
1811
1812impl<'a> Operation for &'a UnsafeUserOperation {}
1813
1814/// A raw pointer to a function that can be used to define an `UnsafeUserOperation`.
1815#[cfg(not(all(msmpi, target_arch = "x86")))]
1816pub type UnsafeUserFunction =
1817 unsafe extern "C" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
1818
1819/// A raw pointer to a function that can be used to define an `UnsafeUserOperation`.
1820///
1821/// MS-MPI uses "stdcall" rather than "C" calling convention. "stdcall" is ignored on x86_64
1822/// Windows and the default calling convention is used instead.
1823#[cfg(all(msmpi, target_arch = "x86"))]
1824pub type UnsafeUserFunction =
1825 unsafe extern "stdcall" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
1826
1827impl UnsafeUserOperation {
1828 /// Define an unsafe operation using a function pointer. The operation must be associative.
1829 ///
1830 /// This is a more readable shorthand for the `new` method. Refer to [`new`](#method.new) for
1831 /// more information.
1832 ///
1833 /// # Safety
1834 /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
1835 /// in all reductions that this `UnsafeUserOperation` is used in.
1836 pub unsafe fn associative(function: UnsafeUserFunction) -> Self {
1837 Self::new(false, function)
1838 }
1839
1840 /// Define an unsafe operation using a function pointer. The operation must be both associative
1841 /// and commutative.
1842 ///
1843 /// This is a more readable shorthand for the `new` method. Refer to [`new`](#method.new) for
1844 /// more information.
1845 ///
1846 /// # Safety
1847 /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
1848 /// in all reductions that this `UnsafeUserOperation` is used in.
1849 pub unsafe fn commutative(function: UnsafeUserFunction) -> Self {
1850 Self::new(true, function)
1851 }
1852
1853 /// Creates an associative and possibly commutative unsafe operation using a function pointer.
1854 ///
1855 /// The function receives raw `*mut c_void` as `invec` and `inoutvec` and the number of elemnts
1856 /// of those two vectors as a `*mut c_int` `len`. It shall set `inoutvec`
1857 /// to the value of `f(invec, inoutvec)`, where `f` is a binary associative operation.
1858 ///
1859 /// If the operation is also commutative, setting `commute` to `true` may yield performance
1860 /// benefits.
1861 ///
1862 /// **Note:** The user function is not allowed to panic.
1863 ///
1864 /// # Standard section(s)
1865 ///
1866 /// 5.9.5
1867 ///
1868 /// # Safety
1869 /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
1870 /// in all reductions that this `UnsafeUserOperation` is used in.
1871 pub unsafe fn new(commute: bool, function: UnsafeUserFunction) -> Self {
1872 UnsafeUserOperation {
1873 op: with_uninitialized(|op| ffi::MPI_Op_create(Some(function), commute as _, op)).1,
1874 }
1875 }
1876}
1877
1878/// Perform a local reduction.
1879///
1880/// # Examples
1881///
1882/// See `examples/reduce.rs`
1883///
1884/// # Standard section(s)
1885///
1886/// 5.9.7
1887#[allow(clippy::needless_pass_by_value)]
1888pub fn reduce_local_into<S: ?Sized, R: ?Sized, O>(inbuf: &S, inoutbuf: &mut R, op: O)
1889where
1890 S: Buffer,
1891 R: BufferMut,
1892 O: Operation,
1893{
1894 unsafe {
1895 ffi::MPI_Reduce_local(
1896 inbuf.pointer(),
1897 inoutbuf.pointer_mut(),
1898 inbuf.count(),
1899 inbuf.as_datatype().as_raw(),
1900 op.as_raw(),
1901 );
1902 }
1903}