mpi/collective.rs
1//! Collective communication
2//!
3//! Developing...
4//!
5//! # Unfinished features
6//!
7//! - **5.8**: All-to-all, `MPI_Alltoallw()`
8//! - **5.10**: Reduce-scatter, `MPI_Reduce_scatter()`
9//! - **5.12**: Nonblocking collective operations,
10//! `MPI_Ialltoallw()`, `MPI_Ireduce_scatter()`
11
12use std::ffi::{CString, NulError};
13#[cfg(feature = "user-operations")]
14use std::mem;
15use std::os::raw::{c_char, c_int, c_void};
16use std::process::Command;
17use std::{fmt, ptr};
18
19use conv::ConvUtil;
20#[cfg(feature = "user-operations")]
21use libffi::middle::{Cif, Closure, Type};
22
23use crate::ffi::MPI_Op;
24use crate::{ffi, MpiError};
25
26use crate::datatype::traits::*;
27#[cfg(feature = "user-operations")]
28use crate::datatype::{DatatypeRef, DynBuffer, DynBufferMut};
29use crate::raw::traits::*;
30use crate::request::{Request, Scope, StaticScope};
31use crate::topology::{traits::*, InterCommunicator};
32use crate::topology::{Process, Rank};
33use crate::with_uninitialized;
34
35/// Collective communication traits
36pub mod traits {
37 pub use super::{CommunicatorCollectives, Operation, Root};
38}
39
40/// Collective communication patterns defined on `Communicator`s
41pub trait CommunicatorCollectives: Communicator {
42 /// Barrier synchronization among all processes in a `Communicator`
43 ///
44 /// Partake in a barrier synchronization across all processes in the `Communicator` `&self`.
45 ///
46 /// Calling processes (or threads within the calling processes) will enter the barrier and block
47 /// execution until all processes in the `Communicator` `&self` have entered the barrier.
48 ///
49 /// # Examples
50 ///
51 /// See `examples/barrier.rs`
52 ///
53 /// # Standard section(s)
54 ///
55 /// 5.3
56 fn barrier(&self) {
57 unsafe {
58 ffi::MPI_Barrier(self.as_raw());
59 }
60 }
61
62 /// Gather contents of buffers on all participating processes.
63 ///
64 /// After the call completes, the contents of the send `Buffer`s on all processes will be
65 /// concatenated into the receive `Buffer`s on all ranks.
66 ///
67 /// All send `Buffer`s must contain the same count of elements.
68 ///
69 /// # Examples
70 ///
71 /// See `examples/all_gather.rs`
72 ///
73 /// # Standard section(s)
74 ///
75 /// 5.7
76 fn all_gather_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
77 where
78 S: Buffer,
79 R: BufferMut,
80 {
81 unsafe {
82 ffi::MPI_Allgather(
83 sendbuf.pointer(),
84 sendbuf.count(),
85 sendbuf.as_datatype().as_raw(),
86 recvbuf.pointer_mut(),
87 recvbuf.count() / self.target_size(),
88 recvbuf.as_datatype().as_raw(),
89 self.as_raw(),
90 );
91 }
92 }
93
94 /// Gather contents of buffers on all participating processes.
95 ///
96 /// After the call completes, the contents of the send `Buffer`s on all processes will be
97 /// concatenated into the receive `Buffer`s on all ranks.
98 ///
99 /// The send `Buffer`s may contain different counts of elements on different processes. The
100 /// distribution of elements in the receive `Buffer`s is specified via `Partitioned`.
101 ///
102 /// # Examples
103 ///
104 /// See `examples/all_gather_varcount.rs`
105 ///
106 /// # Standard section(s)
107 ///
108 /// 5.7
109 fn all_gather_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
110 where
111 S: Buffer,
112 R: PartitionedBufferMut,
113 {
114 unsafe {
115 ffi::MPI_Allgatherv(
116 sendbuf.pointer(),
117 sendbuf.count(),
118 sendbuf.as_datatype().as_raw(),
119 recvbuf.pointer_mut(),
120 recvbuf.counts().as_ptr(),
121 recvbuf.displs().as_ptr(),
122 recvbuf.as_datatype().as_raw(),
123 self.as_raw(),
124 );
125 }
126 }
127
128 /// Distribute the send `Buffer`s from all processes to the receive `Buffer`s on all processes.
129 ///
130 /// Each process sends and receives the same count of elements to and from each process.
131 ///
132 /// # Examples
133 ///
134 /// See `examples/all_to_all.rs`
135 ///
136 /// # Standard section(s)
137 ///
138 /// 5.8
139 fn all_to_all_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
140 where
141 S: Buffer,
142 R: BufferMut,
143 {
144 let c_size = self.target_size();
145 unsafe {
146 ffi::MPI_Alltoall(
147 sendbuf.pointer(),
148 sendbuf.count() / c_size,
149 sendbuf.as_datatype().as_raw(),
150 recvbuf.pointer_mut(),
151 recvbuf.count() / c_size,
152 recvbuf.as_datatype().as_raw(),
153 self.as_raw(),
154 );
155 }
156 }
157
158 /// Distribute the send `Buffer`s from all processes to the receive `Buffer`s on all processes.
159 ///
160 /// The count of elements to send and receive to and from each process can vary and is specified
161 /// using `Partitioned`.
162 ///
163 /// # Standard section(s)
164 ///
165 /// 5.8
166 fn all_to_all_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
167 where
168 S: PartitionedBuffer,
169 R: PartitionedBufferMut,
170 {
171 unsafe {
172 ffi::MPI_Alltoallv(
173 sendbuf.pointer(),
174 sendbuf.counts().as_ptr(),
175 sendbuf.displs().as_ptr(),
176 sendbuf.as_datatype().as_raw(),
177 recvbuf.pointer_mut(),
178 recvbuf.counts().as_ptr(),
179 recvbuf.displs().as_ptr(),
180 recvbuf.as_datatype().as_raw(),
181 self.as_raw(),
182 );
183 }
184 }
185
186 /// Performs a global reduction under the operation `op` of the input data in `sendbuf` and
187 /// stores the result in `recvbuf` on all processes.
188 ///
189 /// # Examples
190 ///
191 /// See `examples/reduce.rs`
192 ///
193 /// # Standard section(s)
194 ///
195 /// 5.9.6
196 fn all_reduce_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
197 where
198 S: Buffer,
199 R: BufferMut,
200 O: Operation,
201 {
202 unsafe {
203 ffi::MPI_Allreduce(
204 sendbuf.pointer(),
205 recvbuf.pointer_mut(),
206 sendbuf.count(),
207 sendbuf.as_datatype().as_raw(),
208 op.as_raw(),
209 self.as_raw(),
210 );
211 }
212 }
213
214 /// Performs an element-wise global reduction under the operation `op` of the input data in
215 /// `sendbuf` and scatters the result into equal sized blocks in the receive buffers on all
216 /// processes.
217 ///
218 /// # Examples
219 ///
220 /// See `examples/reduce.rs`
221 ///
222 /// # Standard section(s)
223 ///
224 /// 5.10.1
225 fn reduce_scatter_block_into<S: ?Sized, R: ?Sized, O>(
226 &self,
227 sendbuf: &S,
228 recvbuf: &mut R,
229 op: O,
230 ) where
231 S: Buffer,
232 R: BufferMut,
233 O: Operation,
234 {
235 assert_eq!(recvbuf.count() * self.target_size(), sendbuf.count());
236 unsafe {
237 ffi::MPI_Reduce_scatter_block(
238 sendbuf.pointer(),
239 recvbuf.pointer_mut(),
240 recvbuf.count(),
241 sendbuf.as_datatype().as_raw(),
242 op.as_raw(),
243 self.as_raw(),
244 );
245 }
246 }
247
248 /// Performs a global inclusive prefix reduction of the data in `sendbuf` into `recvbuf` under
249 /// operation `op`.
250 ///
251 /// # Examples
252 ///
253 /// See `examples/scan.rs`
254 ///
255 /// # Standard section(s)
256 ///
257 /// 5.11.1
258 fn scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
259 where
260 S: Buffer,
261 R: BufferMut,
262 O: Operation,
263 {
264 unsafe {
265 ffi::MPI_Scan(
266 sendbuf.pointer(),
267 recvbuf.pointer_mut(),
268 sendbuf.count(),
269 sendbuf.as_datatype().as_raw(),
270 op.as_raw(),
271 self.as_raw(),
272 );
273 }
274 }
275
276 /// Performs a global exclusive prefix reduction of the data in `sendbuf` into `recvbuf` under
277 /// operation `op`.
278 ///
279 /// # Examples
280 ///
281 /// See `examples/scan.rs`
282 ///
283 /// # Standard section(s)
284 ///
285 /// 5.11.2
286 fn exclusive_scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
287 where
288 S: Buffer,
289 R: BufferMut,
290 O: Operation,
291 {
292 unsafe {
293 ffi::MPI_Exscan(
294 sendbuf.pointer(),
295 recvbuf.pointer_mut(),
296 sendbuf.count(),
297 sendbuf.as_datatype().as_raw(),
298 op.as_raw(),
299 self.as_raw(),
300 );
301 }
302 }
303
304 /// Non-blocking barrier synchronization among all processes in a `Communicator`
305 ///
306 /// Calling processes (or threads within the calling processes) enter the barrier. Completion
307 /// methods on the associated request object will block until all processes have entered.
308 ///
309 /// # Examples
310 ///
311 /// See `examples/immediate_barrier.rs`
312 ///
313 /// # Standard section(s)
314 ///
315 /// 5.12.1
316 fn immediate_barrier(&self) -> Request<'static, ()> {
317 unsafe {
318 Request::from_raw(
319 with_uninitialized(|request| ffi::MPI_Ibarrier(self.as_raw(), request)).1,
320 &(),
321 StaticScope,
322 )
323 }
324 }
325
326 /// Initiate non-blocking gather of the contents of all `sendbuf`s into all `recvbuf`s on all
327 /// processes in the communicator.
328 ///
329 /// # Examples
330 ///
331 /// See `examples/immediate_all_gather.rs`
332 ///
333 /// # Standard section(s)
334 ///
335 /// 5.12.5
336 fn immediate_all_gather_into<'a, S: ?Sized, R: ?Sized, Sc>(
337 &self,
338 scope: Sc,
339 sendbuf: &'a S,
340 recvbuf: &'a mut R,
341 ) -> Request<'a, R, Sc>
342 where
343 S: 'a + Buffer,
344 R: 'a + BufferMut,
345 Sc: Scope<'a>,
346 {
347 unsafe {
348 let recvcount = recvbuf.count() / self.target_size();
349 Request::from_raw(
350 with_uninitialized(|request| {
351 ffi::MPI_Iallgather(
352 sendbuf.pointer(),
353 sendbuf.count(),
354 sendbuf.as_datatype().as_raw(),
355 recvbuf.pointer_mut(),
356 recvcount,
357 recvbuf.as_datatype().as_raw(),
358 self.as_raw(),
359 request,
360 )
361 })
362 .1,
363 recvbuf,
364 scope,
365 )
366 }
367 }
368
369 /// Initiate non-blocking gather of the contents of all `sendbuf`s into all `recvbuf`s on all
370 /// processes in the communicator.
371 ///
372 /// # Examples
373 ///
374 /// See `examples/immediate_all_gather_varcount.rs`
375 ///
376 /// # Standard section(s)
377 ///
378 /// 5.12.5
379 fn immediate_all_gather_varcount_into<'a, S: ?Sized, R: ?Sized, Sc>(
380 &self,
381 scope: Sc,
382 sendbuf: &'a S,
383 recvbuf: &'a mut R,
384 ) -> Request<'a, R, Sc>
385 where
386 S: 'a + Buffer,
387 R: 'a + PartitionedBufferMut,
388 Sc: Scope<'a>,
389 {
390 unsafe {
391 Request::from_raw(
392 with_uninitialized(|request| {
393 ffi::MPI_Iallgatherv(
394 sendbuf.pointer(),
395 sendbuf.count(),
396 sendbuf.as_datatype().as_raw(),
397 recvbuf.pointer_mut(),
398 recvbuf.counts().as_ptr(),
399 recvbuf.displs().as_ptr(),
400 recvbuf.as_datatype().as_raw(),
401 self.as_raw(),
402 request,
403 )
404 })
405 .1,
406 recvbuf,
407 scope,
408 )
409 }
410 }
411
412 /// Initiate non-blocking all-to-all communication.
413 ///
414 /// # Examples
415 ///
416 /// See `examples/immediate_all_to_all.rs`
417 ///
418 /// # Standard section(s)
419 ///
420 /// 5.12.6
421 fn immediate_all_to_all_into<'a, S: ?Sized, R: ?Sized, Sc>(
422 &self,
423 scope: Sc,
424 sendbuf: &'a S,
425 recvbuf: &'a mut R,
426 ) -> Request<'a, R, Sc>
427 where
428 S: 'a + Buffer,
429 R: 'a + BufferMut,
430 Sc: Scope<'a>,
431 {
432 let c_size = self.target_size();
433 unsafe {
434 Request::from_raw(
435 with_uninitialized(|request| {
436 ffi::MPI_Ialltoall(
437 sendbuf.pointer(),
438 sendbuf.count() / c_size,
439 sendbuf.as_datatype().as_raw(),
440 recvbuf.pointer_mut(),
441 recvbuf.count() / c_size,
442 recvbuf.as_datatype().as_raw(),
443 self.as_raw(),
444 request,
445 )
446 })
447 .1,
448 recvbuf,
449 scope,
450 )
451 }
452 }
453
454 /// Initiate non-blocking all-to-all communication.
455 ///
456 /// # Standard section(s)
457 ///
458 /// 5.12.6
459 fn immediate_all_to_all_varcount_into<'a, S: ?Sized, R: ?Sized, Sc>(
460 &self,
461 scope: Sc,
462 sendbuf: &'a S,
463 recvbuf: &'a mut R,
464 ) -> Request<'a, R, Sc>
465 where
466 S: 'a + PartitionedBuffer,
467 R: 'a + PartitionedBufferMut,
468 Sc: Scope<'a>,
469 {
470 unsafe {
471 Request::from_raw(
472 with_uninitialized(|request| {
473 ffi::MPI_Ialltoallv(
474 sendbuf.pointer(),
475 sendbuf.counts().as_ptr(),
476 sendbuf.displs().as_ptr(),
477 sendbuf.as_datatype().as_raw(),
478 recvbuf.pointer_mut(),
479 recvbuf.counts().as_ptr(),
480 recvbuf.displs().as_ptr(),
481 recvbuf.as_datatype().as_raw(),
482 self.as_raw(),
483 request,
484 )
485 })
486 .1,
487 recvbuf,
488 scope,
489 )
490 }
491 }
492
493 /// Initiates a non-blocking global reduction under the operation `op` of the input data in
494 /// `sendbuf` and stores the result in `recvbuf` on all processes.
495 ///
496 /// # Examples
497 ///
498 /// See `examples/immediate_reduce.rs`
499 ///
500 /// # Standard section(s)
501 ///
502 /// 5.12.8
503 fn immediate_all_reduce_into<'a, S: ?Sized, R: ?Sized, O, Sc>(
504 &self,
505 scope: Sc,
506 sendbuf: &'a S,
507 recvbuf: &'a mut R,
508 op: O,
509 ) -> Request<'a, R, Sc>
510 where
511 S: 'a + Buffer,
512 R: 'a + BufferMut,
513 O: 'a + Operation,
514 Sc: Scope<'a>,
515 {
516 unsafe {
517 Request::from_raw(
518 with_uninitialized(|request| {
519 ffi::MPI_Iallreduce(
520 sendbuf.pointer(),
521 recvbuf.pointer_mut(),
522 sendbuf.count(),
523 sendbuf.as_datatype().as_raw(),
524 op.as_raw(),
525 self.as_raw(),
526 request,
527 )
528 })
529 .1,
530 recvbuf,
531 scope,
532 )
533 }
534 }
535
536 /// Initiates a non-blocking element-wise global reduction under the operation `op` of the
537 /// input data in `sendbuf` and scatters the result into equal sized blocks in the receive
538 /// buffers on all processes.
539 ///
540 /// # Examples
541 ///
542 /// See `examples/immediate_reduce.rs`
543 ///
544 /// # Standard section(s)
545 ///
546 /// 5.12.9
547 fn immediate_reduce_scatter_block_into<'a, S: ?Sized, R: ?Sized, O, Sc>(
548 &self,
549 scope: Sc,
550 sendbuf: &'a S,
551 recvbuf: &'a mut R,
552 op: O,
553 ) -> Request<'a, R, Sc>
554 where
555 S: 'a + Buffer,
556 R: 'a + BufferMut,
557 O: 'a + Operation,
558 Sc: Scope<'a>,
559 {
560 assert_eq!(recvbuf.count() * self.target_size(), sendbuf.count());
561 unsafe {
562 Request::from_raw(
563 with_uninitialized(|request| {
564 ffi::MPI_Ireduce_scatter_block(
565 sendbuf.pointer(),
566 recvbuf.pointer_mut(),
567 recvbuf.count(),
568 sendbuf.as_datatype().as_raw(),
569 op.as_raw(),
570 self.as_raw(),
571 request,
572 )
573 })
574 .1,
575 recvbuf,
576 scope,
577 )
578 }
579 }
580
581 /// Initiates a non-blocking global inclusive prefix reduction of the data in `sendbuf` into
582 /// `recvbuf` under operation `op`.
583 ///
584 /// # Examples
585 ///
586 /// See `examples/immediate_scan.rs`
587 ///
588 /// # Standard section(s)
589 ///
590 /// 5.12.11
591 fn immediate_scan_into<'a, S: ?Sized, R: ?Sized, O, Sc>(
592 &self,
593 scope: Sc,
594 sendbuf: &'a S,
595 recvbuf: &'a mut R,
596 op: O,
597 ) -> Request<'a, R, Sc>
598 where
599 S: 'a + Buffer,
600 R: 'a + BufferMut,
601 O: 'a + Operation,
602 Sc: Scope<'a>,
603 {
604 unsafe {
605 Request::from_raw(
606 with_uninitialized(|request| {
607 ffi::MPI_Iscan(
608 sendbuf.pointer(),
609 recvbuf.pointer_mut(),
610 sendbuf.count(),
611 sendbuf.as_datatype().as_raw(),
612 op.as_raw(),
613 self.as_raw(),
614 request,
615 )
616 })
617 .1,
618 recvbuf,
619 scope,
620 )
621 }
622 }
623
624 /// Initiates a non-blocking global exclusive prefix reduction of the data in `sendbuf` into
625 /// `recvbuf` under operation `op`.
626 ///
627 /// # Examples
628 ///
629 /// See `examples/immediate_scan.rs`
630 ///
631 /// # Standard section(s)
632 ///
633 /// 5.12.12
634 fn immediate_exclusive_scan_into<'a, S: ?Sized, R: ?Sized, O, Sc>(
635 &self,
636 scope: Sc,
637 sendbuf: &'a S,
638 recvbuf: &'a mut R,
639 op: O,
640 ) -> Request<'a, R, Sc>
641 where
642 S: 'a + Buffer,
643 R: 'a + BufferMut,
644 O: 'a + Operation,
645 Sc: Scope<'a>,
646 {
647 unsafe {
648 Request::from_raw(
649 with_uninitialized(|request| {
650 ffi::MPI_Iexscan(
651 sendbuf.pointer(),
652 recvbuf.pointer_mut(),
653 sendbuf.count(),
654 sendbuf.as_datatype().as_raw(),
655 op.as_raw(),
656 self.as_raw(),
657 request,
658 )
659 })
660 .1,
661 recvbuf,
662 scope,
663 )
664 }
665 }
666}
667
668impl<C: Communicator + ?Sized> CommunicatorCollectives for C {}
669
670/// Something that can take the role of 'root' in a collective operation.
671///
672/// Many collective operations define a 'root' process that takes a special role in the
673/// communication. These collective operations are implemented as default methods of this trait.
674pub trait Root: AsCommunicator {
675 /// Rank of the root process
/// Required method: the default methods of this trait pass the returned value as the
/// root argument of the underlying MPI calls.
676 fn root_rank(&self) -> Rank;
677
678 /// Broadcast of the contents of a buffer
679 ///
680 /// After the call completes, the `Buffer` on all processes in the `Communicator` of the `Root`
681 /// `&self` will contain what it contains on the `Root`.
682 ///
683 /// # Examples
684 ///
685 /// See `examples/broadcast.rs`
686 ///
687 /// # Standard section(s)
688 ///
689 /// 5.4
690 fn broadcast_into<Buf: ?Sized>(&self, buffer: &mut Buf)
691 where
692 Buf: BufferMut,
693 {
694 unsafe {
695 ffi::MPI_Bcast(
696 buffer.pointer_mut(),
697 buffer.count(),
698 buffer.as_datatype().as_raw(),
699 self.root_rank(),
700 self.as_communicator().as_raw(),
701 );
702 }
703 }
704
705 /// Gather contents of buffers on `Root`.
706 ///
707 /// After the call completes, the contents of the `Buffer`s on all ranks will be
708 /// concatenated into the `Buffer` on `Root`.
709 ///
710 /// All send `Buffer`s must have the same count of elements.
711 ///
712 /// This function must be called on all non-root processes.
713 ///
714 /// # Examples
715 ///
716 /// See `examples/gather.rs`
717 ///
718 /// # Standard section(s)
719 ///
720 /// 5.5
721 fn gather_into<S: ?Sized>(&self, sendbuf: &S)
722 where
723 S: Buffer,
724 {
725 assert_ne!(self.as_communicator().rank(), self.root_rank());
726 unsafe {
727 ffi::MPI_Gather(
728 sendbuf.pointer(),
729 sendbuf.count(),
730 sendbuf.as_datatype().as_raw(),
731 ptr::null_mut(),
732 0,
733 u8::equivalent_datatype().as_raw(),
734 self.root_rank(),
735 self.as_communicator().as_raw(),
736 );
737 }
738 }
739
740 /// Gather contents of buffers on `Root`.
741 ///
742 /// After the call completes, the contents of the `Buffer`s on all ranks will be
743 /// concatenated into the `Buffer` on `Root`.
744 ///
745 /// All send `Buffer`s must have the same count of elements.
746 ///
747 /// This function must be called on the root process.
748 ///
749 /// # Examples
750 ///
751 /// See `examples/gather.rs`
752 ///
753 /// # Standard section(s)
754 ///
755 /// 5.5
756 fn gather_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
757 where
758 S: Buffer,
759 R: BufferMut,
760 {
761 assert_eq!(self.as_communicator().rank(), self.root_rank());
762 unsafe {
763 let recvcount = recvbuf.count() / self.as_communicator().target_size();
764 ffi::MPI_Gather(
765 sendbuf.pointer(),
766 sendbuf.count(),
767 sendbuf.as_datatype().as_raw(),
768 recvbuf.pointer_mut(),
769 recvcount,
770 recvbuf.as_datatype().as_raw(),
771 self.root_rank(),
772 self.as_communicator().as_raw(),
773 );
774 }
775 }
776
777 /// Gather contents of buffers on `Root`.
778 ///
779 /// After the call completes, the contents of the `Buffer`s on all ranks will be
780 /// concatenated into the `Buffer` on `Root`.
781 ///
782 /// The send `Buffer`s may contain different counts of elements on different processes. The
783 /// distribution of elements in the receive `Buffer` is specified via `Partitioned`.
784 ///
785 /// This function must be called on all non-root processes.
786 ///
787 /// # Examples
788 ///
789 /// See `examples/gather_varcount.rs`
790 ///
791 /// # Standard section(s)
792 ///
793 /// 5.5
794 fn gather_varcount_into<S: ?Sized>(&self, sendbuf: &S)
795 where
796 S: Buffer,
797 {
798 assert_ne!(self.as_communicator().rank(), self.root_rank());
799 unsafe {
800 ffi::MPI_Gatherv(
801 sendbuf.pointer(),
802 sendbuf.count(),
803 sendbuf.as_datatype().as_raw(),
804 ptr::null_mut(),
805 ptr::null(),
806 ptr::null(),
807 u8::equivalent_datatype().as_raw(),
808 self.root_rank(),
809 self.as_communicator().as_raw(),
810 );
811 }
812 }
813
814 /// Gather contents of buffers on `Root`.
815 ///
816 /// After the call completes, the contents of the `Buffer`s on all ranks will be
817 /// concatenated into the `Buffer` on `Root`.
818 ///
819 /// The send `Buffer`s may contain different counts of elements on different processes. The
820 /// distribution of elements in the receive `Buffer` is specified via `Partitioned`.
821 ///
822 /// This function must be called on the root process.
823 ///
824 /// # Examples
825 ///
826 /// See `examples/gather_varcount.rs`
827 ///
828 /// # Standard section(s)
829 ///
830 /// 5.5
831 fn gather_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
832 where
833 S: Buffer,
834 R: PartitionedBufferMut,
835 {
836 assert_eq!(self.as_communicator().rank(), self.root_rank());
837 unsafe {
838 ffi::MPI_Gatherv(
839 sendbuf.pointer(),
840 sendbuf.count(),
841 sendbuf.as_datatype().as_raw(),
842 recvbuf.pointer_mut(),
843 recvbuf.counts().as_ptr(),
844 recvbuf.displs().as_ptr(),
845 recvbuf.as_datatype().as_raw(),
846 self.root_rank(),
847 self.as_communicator().as_raw(),
848 );
849 }
850 }
851
852 /// Scatter contents of a buffer on the root process to all processes.
853 ///
854 /// After the call completes each participating process will have received a part of the send
855 /// `Buffer` on the root process.
856 ///
857 /// All receive `Buffer`s must have the same count of elements.
858 ///
859 /// This function must be called on all non-root processes.
860 ///
861 /// # Examples
862 ///
863 /// See `examples/scatter.rs`
864 ///
865 /// # Standard section(s)
866 ///
867 /// 5.6
868 fn scatter_into<R: ?Sized>(&self, recvbuf: &mut R)
869 where
870 R: BufferMut,
871 {
872 assert_ne!(self.as_communicator().rank(), self.root_rank());
873 unsafe {
874 ffi::MPI_Scatter(
875 ptr::null(),
876 0,
877 u8::equivalent_datatype().as_raw(),
878 recvbuf.pointer_mut(),
879 recvbuf.count(),
880 recvbuf.as_datatype().as_raw(),
881 self.root_rank(),
882 self.as_communicator().as_raw(),
883 );
884 }
885 }
886
887 /// Scatter contents of a buffer on the root process to all processes.
888 ///
889 /// After the call completes each participating process will have received a part of the send
890 /// `Buffer` on the root process.
891 ///
892 /// All receive `Buffer`s must have the same count of elements.
893 ///
894 /// This function must be called on the root process.
895 ///
896 /// # Examples
897 ///
898 /// See `examples/scatter.rs`
899 ///
900 /// # Standard section(s)
901 ///
902 /// 5.6
903 fn scatter_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
904 where
905 S: Buffer,
906 R: BufferMut,
907 {
908 assert_eq!(self.as_communicator().rank(), self.root_rank());
909 let sendcount = sendbuf.count() / self.as_communicator().target_size();
910 unsafe {
911 ffi::MPI_Scatter(
912 sendbuf.pointer(),
913 sendcount,
914 sendbuf.as_datatype().as_raw(),
915 recvbuf.pointer_mut(),
916 recvbuf.count(),
917 recvbuf.as_datatype().as_raw(),
918 self.root_rank(),
919 self.as_communicator().as_raw(),
920 );
921 }
922 }
923
924 /// Scatter contents of a buffer on the root process to all processes.
925 ///
926 /// After the call completes each participating process will have received a part of the send
927 /// `Buffer` on the root process.
928 ///
929 /// The send `Buffer` may contain different counts of elements for different processes. The
930 /// distribution of elements in the send `Buffer` is specified via `Partitioned`.
931 ///
932 /// This function must be called on all non-root processes.
933 ///
934 /// # Examples
935 ///
936 /// See `examples/scatter_varcount.rs`
937 ///
938 /// # Standard section(s)
939 ///
940 /// 5.6
941 fn scatter_varcount_into<R: ?Sized>(&self, recvbuf: &mut R)
942 where
943 R: BufferMut,
944 {
945 assert_ne!(self.as_communicator().rank(), self.root_rank());
946 unsafe {
947 ffi::MPI_Scatterv(
948 ptr::null(),
949 ptr::null(),
950 ptr::null(),
951 u8::equivalent_datatype().as_raw(),
952 recvbuf.pointer_mut(),
953 recvbuf.count(),
954 recvbuf.as_datatype().as_raw(),
955 self.root_rank(),
956 self.as_communicator().as_raw(),
957 );
958 }
959 }
960
961 /// Scatter contents of a buffer on the root process to all processes.
962 ///
963 /// After the call completes each participating process will have received a part of the send
964 /// `Buffer` on the root process.
965 ///
966 /// The send `Buffer` may contain different counts of elements for different processes. The
967 /// distribution of elements in the send `Buffer` is specified via `Partitioned`.
968 ///
969 /// This function must be called on the root process.
970 ///
971 /// # Examples
972 ///
973 /// See `examples/scatter_varcount.rs`
974 ///
975 /// # Standard section(s)
976 ///
977 /// 5.6
978 fn scatter_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
979 where
980 S: PartitionedBuffer,
981 R: BufferMut,
982 {
983 assert_eq!(self.as_communicator().rank(), self.root_rank());
984 unsafe {
985 ffi::MPI_Scatterv(
986 sendbuf.pointer(),
987 sendbuf.counts().as_ptr(),
988 sendbuf.displs().as_ptr(),
989 sendbuf.as_datatype().as_raw(),
990 recvbuf.pointer_mut(),
991 recvbuf.count(),
992 recvbuf.as_datatype().as_raw(),
993 self.root_rank(),
994 self.as_communicator().as_raw(),
995 );
996 }
997 }
998
999 /// Performs a global reduction under the operation `op` of the input data in `sendbuf` and
1000 /// stores the result on the `Root` process.
1001 ///
1002 /// This function must be called on all non-root processes.
1003 ///
1004 /// # Examples
1005 ///
1006 /// See `examples/reduce.rs`
1007 ///
1008 /// # Standard section(s)
1009 ///
1010 /// 5.9.1
1011 fn reduce_into<S: ?Sized, O>(&self, sendbuf: &S, op: O)
1012 where
1013 S: Buffer,
1014 O: Operation,
1015 {
1016 assert_ne!(self.as_communicator().rank(), self.root_rank());
1017 unsafe {
1018 ffi::MPI_Reduce(
1019 sendbuf.pointer(),
1020 ptr::null_mut(),
1021 sendbuf.count(),
1022 sendbuf.as_datatype().as_raw(),
1023 op.as_raw(),
1024 self.root_rank(),
1025 self.as_communicator().as_raw(),
1026 );
1027 }
1028 }
1029
1030 /// Performs a global reduction under the operation `op` of the input data in `sendbuf` and
1031 /// stores the result on the `Root` process.
1032 ///
1033 /// This function must be called on the root process.
1034 ///
1035 /// # Examples
1036 ///
1037 /// See `examples/reduce.rs`
1038 ///
1039 /// # Standard section(s)
1040 ///
1041 /// 5.9.1
1042 fn reduce_into_root<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
1043 where
1044 S: Buffer,
1045 R: BufferMut,
1046 O: Operation,
1047 {
1048 assert_eq!(self.as_communicator().rank(), self.root_rank());
1049 unsafe {
1050 ffi::MPI_Reduce(
1051 sendbuf.pointer(),
1052 recvbuf.pointer_mut(),
1053 sendbuf.count(),
1054 sendbuf.as_datatype().as_raw(),
1055 op.as_raw(),
1056 self.root_rank(),
1057 self.as_communicator().as_raw(),
1058 );
1059 }
1060 }
1061
1062 /// Initiate broadcast of a value from the `Root` process to all other processes.
1063 ///
1064 /// # Examples
1065 ///
1066 /// See `examples/immediate_broadcast.rs`
1067 ///
1068 /// # Standard section(s)
1069 ///
1070 /// 5.12.2
1071 fn immediate_broadcast_into<'a, Buf: ?Sized, Sc>(
1072 &self,
1073 scope: Sc,
1074 buf: &'a mut Buf,
1075 ) -> Request<'a, Buf, Sc>
1076 where
1077 Buf: 'a + BufferMut,
1078 Sc: Scope<'a>,
1079 {
1080 unsafe {
1081 Request::from_raw(
1082 with_uninitialized(|request| {
1083 ffi::MPI_Ibcast(
1084 buf.pointer_mut(),
1085 buf.count(),
1086 buf.as_datatype().as_raw(),
1087 self.root_rank(),
1088 self.as_communicator().as_raw(),
1089 request,
1090 )
1091 })
1092 .1,
1093 buf,
1094 scope,
1095 )
1096 }
1097 }
1098
1099 /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1100 ///
1101 /// This function must be called on all non-root processes.
1102 ///
1103 /// # Examples
1104 ///
1105 /// See `examples/immediate_gather.rs`
1106 ///
1107 /// # Standard section(s)
1108 ///
1109 /// 5.12.3
1110 fn immediate_gather_into<'a, S: ?Sized, Sc>(
1111 &self,
1112 scope: Sc,
1113 sendbuf: &'a S,
1114 ) -> Request<'a, S, Sc>
1115 where
1116 S: 'a + Buffer,
1117 Sc: Scope<'a>,
1118 {
1119 assert_ne!(self.as_communicator().rank(), self.root_rank());
1120 unsafe {
1121 Request::from_raw(
1122 with_uninitialized(|request| {
1123 ffi::MPI_Igather(
1124 sendbuf.pointer(),
1125 sendbuf.count(),
1126 sendbuf.as_datatype().as_raw(),
1127 ptr::null_mut(),
1128 0,
1129 u8::equivalent_datatype().as_raw(),
1130 self.root_rank(),
1131 self.as_communicator().as_raw(),
1132 request,
1133 )
1134 })
1135 .1,
1136 sendbuf,
1137 scope,
1138 )
1139 }
1140 }
1141
1142 /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1143 ///
1144 /// This function must be called on the root process.
1145 ///
1146 /// # Examples
1147 ///
1148 /// See `examples/immediate_gather.rs`
1149 ///
1150 /// # Standard section(s)
1151 ///
1152 /// 5.12.3
1153 fn immediate_gather_into_root<'a, S: ?Sized, R: ?Sized, Sc>(
1154 &self,
1155 scope: Sc,
1156 sendbuf: &'a S,
1157 recvbuf: &'a mut R,
1158 ) -> Request<'a, R, Sc>
1159 where
1160 S: 'a + Buffer,
1161 R: 'a + BufferMut,
1162 Sc: Scope<'a>,
1163 {
1164 assert_eq!(self.as_communicator().rank(), self.root_rank());
1165 unsafe {
1166 let recvcount = recvbuf.count() / self.as_communicator().target_size();
1167 Request::from_raw(
1168 with_uninitialized(|request| {
1169 ffi::MPI_Igather(
1170 sendbuf.pointer(),
1171 sendbuf.count(),
1172 sendbuf.as_datatype().as_raw(),
1173 recvbuf.pointer_mut(),
1174 recvcount,
1175 recvbuf.as_datatype().as_raw(),
1176 self.root_rank(),
1177 self.as_communicator().as_raw(),
1178 request,
1179 )
1180 })
1181 .1,
1182 recvbuf,
1183 scope,
1184 )
1185 }
1186 }
1187
1188 /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1189 ///
1190 /// This function must be called on all non-root processes.
1191 ///
1192 /// # Examples
1193 ///
1194 /// See `examples/immediate_gather_varcount.rs`
1195 ///
1196 /// # Standard section(s)
1197 ///
1198 /// 5.12.3
1199 fn immediate_gather_varcount_into<'a, Sc, S: ?Sized>(
1200 &self,
1201 scope: Sc,
1202 sendbuf: &'a S,
1203 ) -> Request<'a, S, Sc>
1204 where
1205 S: 'a + Buffer,
1206 Sc: Scope<'a>,
1207 {
1208 assert_ne!(self.as_communicator().rank(), self.root_rank());
1209 unsafe {
1210 Request::from_raw(
1211 with_uninitialized(|request| {
1212 ffi::MPI_Igatherv(
1213 sendbuf.pointer(),
1214 sendbuf.count(),
1215 sendbuf.as_datatype().as_raw(),
1216 ptr::null_mut(),
1217 ptr::null(),
1218 ptr::null(),
1219 u8::equivalent_datatype().as_raw(),
1220 self.root_rank(),
1221 self.as_communicator().as_raw(),
1222 request,
1223 )
1224 })
1225 .1,
1226 sendbuf,
1227 scope,
1228 )
1229 }
1230 }
1231
1232 /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1233 ///
1234 /// This function must be called on the root process.
1235 ///
1236 /// # Examples
1237 ///
1238 /// See `examples/immediate_gather_varcount.rs`
1239 ///
1240 /// # Standard section(s)
1241 ///
1242 /// 5.12.3
1243 fn immediate_gather_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1244 &self,
1245 scope: Sc,
1246 sendbuf: &'a S,
1247 recvbuf: &'a mut R,
1248 ) -> Request<'a, R, Sc>
1249 where
1250 S: 'a + Buffer,
1251 R: 'a + PartitionedBufferMut,
1252 Sc: Scope<'a>,
1253 {
1254 assert_eq!(self.as_communicator().rank(), self.root_rank());
1255 unsafe {
1256 Request::from_raw(
1257 with_uninitialized(|request| {
1258 ffi::MPI_Igatherv(
1259 sendbuf.pointer(),
1260 sendbuf.count(),
1261 sendbuf.as_datatype().as_raw(),
1262 recvbuf.pointer_mut(),
1263 recvbuf.counts().as_ptr(),
1264 recvbuf.displs().as_ptr(),
1265 recvbuf.as_datatype().as_raw(),
1266 self.root_rank(),
1267 self.as_communicator().as_raw(),
1268 request,
1269 )
1270 })
1271 .1,
1272 recvbuf,
1273 scope,
1274 )
1275 }
1276 }
1277
1278 /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1279 ///
1280 /// This function must be called on all non-root processes.
1281 ///
1282 /// # Examples
1283 ///
1284 /// See `examples/immediate_scatter.rs`
1285 ///
1286 /// # Standard section(s)
1287 ///
1288 /// 5.12.4
1289 fn immediate_scatter_into<'a, Sc, R: ?Sized>(
1290 &self,
1291 scope: Sc,
1292 recvbuf: &'a mut R,
1293 ) -> Request<'a, R, Sc>
1294 where
1295 R: 'a + BufferMut,
1296 Sc: Scope<'a>,
1297 {
1298 assert_ne!(self.as_communicator().rank(), self.root_rank());
1299 unsafe {
1300 Request::from_raw(
1301 with_uninitialized(|request| {
1302 ffi::MPI_Iscatter(
1303 ptr::null(),
1304 0,
1305 u8::equivalent_datatype().as_raw(),
1306 recvbuf.pointer_mut(),
1307 recvbuf.count(),
1308 recvbuf.as_datatype().as_raw(),
1309 self.root_rank(),
1310 self.as_communicator().as_raw(),
1311 request,
1312 )
1313 })
1314 .1,
1315 recvbuf,
1316 scope,
1317 )
1318 }
1319 }
1320
1321 /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1322 ///
1323 /// This function must be called on the root processes.
1324 ///
1325 /// # Examples
1326 ///
1327 /// See `examples/immediate_scatter.rs`
1328 ///
1329 /// # Standard section(s)
1330 ///
1331 /// 5.12.4
1332 fn immediate_scatter_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1333 &self,
1334 scope: Sc,
1335 sendbuf: &'a S,
1336 recvbuf: &'a mut R,
1337 ) -> Request<'a, R, Sc>
1338 where
1339 S: 'a + Buffer,
1340 R: 'a + BufferMut,
1341 Sc: Scope<'a>,
1342 {
1343 assert_eq!(self.as_communicator().rank(), self.root_rank());
1344 unsafe {
1345 let sendcount = sendbuf.count() / self.as_communicator().target_size();
1346 Request::from_raw(
1347 with_uninitialized(|request| {
1348 ffi::MPI_Iscatter(
1349 sendbuf.pointer(),
1350 sendcount,
1351 sendbuf.as_datatype().as_raw(),
1352 recvbuf.pointer_mut(),
1353 recvbuf.count(),
1354 recvbuf.as_datatype().as_raw(),
1355 self.root_rank(),
1356 self.as_communicator().as_raw(),
1357 request,
1358 )
1359 })
1360 .1,
1361 recvbuf,
1362 scope,
1363 )
1364 }
1365 }
1366
1367 /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1368 ///
1369 /// This function must be called on all non-root processes.
1370 ///
1371 /// # Examples
1372 ///
1373 /// See `examples/immediate_scatter_varcount.rs`
1374 ///
1375 /// # Standard section(s)
1376 ///
1377 /// 5.12.4
1378 fn immediate_scatter_varcount_into<'a, Sc, R: ?Sized>(
1379 &self,
1380 scope: Sc,
1381 recvbuf: &'a mut R,
1382 ) -> Request<'a, R, Sc>
1383 where
1384 R: 'a + BufferMut,
1385 Sc: Scope<'a>,
1386 {
1387 assert_ne!(self.as_communicator().rank(), self.root_rank());
1388 unsafe {
1389 Request::from_raw(
1390 with_uninitialized(|request| {
1391 ffi::MPI_Iscatterv(
1392 ptr::null(),
1393 ptr::null(),
1394 ptr::null(),
1395 u8::equivalent_datatype().as_raw(),
1396 recvbuf.pointer_mut(),
1397 recvbuf.count(),
1398 recvbuf.as_datatype().as_raw(),
1399 self.root_rank(),
1400 self.as_communicator().as_raw(),
1401 request,
1402 )
1403 })
1404 .1,
1405 recvbuf,
1406 scope,
1407 )
1408 }
1409 }
1410
1411 /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1412 ///
1413 /// This function must be called on the root processes.
1414 ///
1415 /// # Examples
1416 ///
1417 /// See `examples/immediate_scatter_varcount.rs`
1418 ///
1419 /// # Standard section(s)
1420 ///
1421 /// 5.12.4
1422 fn immediate_scatter_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1423 &self,
1424 scope: Sc,
1425 sendbuf: &'a S,
1426 recvbuf: &'a mut R,
1427 ) -> Request<'a, R, Sc>
1428 where
1429 S: 'a + PartitionedBuffer,
1430 R: 'a + BufferMut,
1431 Sc: Scope<'a>,
1432 {
1433 assert_eq!(self.as_communicator().rank(), self.root_rank());
1434 unsafe {
1435 Request::from_raw(
1436 with_uninitialized(|request| {
1437 ffi::MPI_Iscatterv(
1438 sendbuf.pointer(),
1439 sendbuf.counts().as_ptr(),
1440 sendbuf.displs().as_ptr(),
1441 sendbuf.as_datatype().as_raw(),
1442 recvbuf.pointer_mut(),
1443 recvbuf.count(),
1444 recvbuf.as_datatype().as_raw(),
1445 self.root_rank(),
1446 self.as_communicator().as_raw(),
1447 request,
1448 )
1449 })
1450 .1,
1451 recvbuf,
1452 scope,
1453 )
1454 }
1455 }
1456
1457 /// Initiates a non-blacking global reduction under the operation `op` of the input data in
1458 /// `sendbuf` and stores the result on the `Root` process.
1459 ///
1460 /// This function must be called on all non-root processes.
1461 ///
1462 /// # Examples
1463 ///
1464 /// See `examples/immediate_reduce.rs`
1465 ///
1466 /// # Standard section(s)
1467 ///
1468 /// 5.12.7
1469 fn immediate_reduce_into<'a, Sc, S: ?Sized, O>(
1470 &self,
1471 scope: Sc,
1472 sendbuf: &'a S,
1473 op: O,
1474 ) -> Request<'a, S, Sc>
1475 where
1476 S: 'a + Buffer,
1477 O: 'a + Operation,
1478 Sc: Scope<'a>,
1479 {
1480 assert_ne!(self.as_communicator().rank(), self.root_rank());
1481 unsafe {
1482 Request::from_raw(
1483 with_uninitialized(|request| {
1484 ffi::MPI_Ireduce(
1485 sendbuf.pointer(),
1486 ptr::null_mut(),
1487 sendbuf.count(),
1488 sendbuf.as_datatype().as_raw(),
1489 op.as_raw(),
1490 self.root_rank(),
1491 self.as_communicator().as_raw(),
1492 request,
1493 )
1494 })
1495 .1,
1496 sendbuf,
1497 scope,
1498 )
1499 }
1500 }
1501
1502 /// Initiates a non-blocking global reduction under the operation `op` of the input data in
1503 /// `sendbuf` and stores the result on the `Root` process.
1504 ///
1505 /// # Examples
1506 ///
1507 /// See `examples/immediate_reduce.rs`
1508 ///
1509 /// This function must be called on the root process.
1510 ///
1511 /// # Standard section(s)
1512 ///
1513 /// 5.12.7
1514 fn immediate_reduce_into_root<'a, Sc, S: ?Sized, R: ?Sized, O>(
1515 &self,
1516 scope: Sc,
1517 sendbuf: &'a S,
1518 recvbuf: &'a mut R,
1519 op: O,
1520 ) -> Request<'a, R, Sc>
1521 where
1522 S: 'a + Buffer,
1523 R: 'a + BufferMut,
1524 O: 'a + Operation,
1525 Sc: Scope<'a>,
1526 {
1527 assert_eq!(self.as_communicator().rank(), self.root_rank());
1528 unsafe {
1529 Request::from_raw(
1530 with_uninitialized(|request| {
1531 ffi::MPI_Ireduce(
1532 sendbuf.pointer(),
1533 recvbuf.pointer_mut(),
1534 sendbuf.count(),
1535 sendbuf.as_datatype().as_raw(),
1536 op.as_raw(),
1537 self.root_rank(),
1538 self.as_communicator().as_raw(),
1539 request,
1540 )
1541 })
1542 .1,
1543 recvbuf,
1544 scope,
1545 )
1546 }
1547 }
1548
1549 /// Spawns child processes
1550 ///
1551 /// # Standard sections
1552 /// 10.3.2, see MPI_Comm_spawn
1553 fn spawn(&self, command: &Command, maxprocs: Rank) -> Result<InterCommunicator, MpiError> {
1554 // Environment variables can be handled using the info key
1555 assert_eq!(
1556 command.get_envs().len(),
1557 0,
1558 "Support for environment variables not yet implemented"
1559 );
1560
1561 // The Microsoft-MPI implementation treats the char* arguments as being
1562 // encoded in utf8, and are internally converted to Windows wide
1563 // characters. See ConvertArgs() using
1564 // [`MultiByteToWideChar`](https://learn.microsoft.com/en-us/windows/win32/api/stringapiset/nf-stringapiset-multibytetowidechar)
1565 // with `CP_UTF8` when called from MPIDI_Comm_spawn_multiple().
1566 // https://github.com/microsoft/Microsoft-MPI/blob/7ff6bdcdb1d5dc7b791e47457ee2686cd6b3d355/src/mpi/msmpi/mpid/dynamic.cpp#L2074
1567 //
1568 // Since Windows wide-char strings are allowed to contain invalid
1569 // UTF-16, such characters cannot be preserved. We'll choose lossy
1570 // conversion, but returning an error is an option to consider.
1571 let prog = CString::new(command.get_program().to_string_lossy().as_bytes())?;
1572 let mut args: Vec<CString> = command
1573 .get_args()
1574 .map(|os| CString::new(os.to_string_lossy().as_bytes()))
1575 .collect::<Result<Vec<CString>, NulError>>()?;
1576 // We must retain args above so that the strings are not dropped while
1577 // being used. An alternative that seems to be recommended any time the
1578 // function takes mutable C strings is to use CString::into_raw to give
1579 // ownership to the *mut c_char for the function call, then reclaim
1580 // using CString::from_raw.
1581 let mut argv: Vec<*mut c_char> = args
1582 .iter_mut()
1583 .map(|s| s.as_ptr() as *mut c_char)
1584 .chain(std::iter::once(ptr::null_mut()))
1585 .collect();
1586
1587 let mut result = unsafe { ffi::RSMPI_COMM_NULL };
1588 let mut errcodes: Vec<c_int> =
1589 vec![0; maxprocs.value_as().expect("maxprocs should be positive")];
1590
1591 unsafe {
1592 ffi::MPI_Comm_spawn(
1593 prog.as_ptr(),
1594 argv.as_mut_ptr(),
1595 maxprocs,
1596 ffi::RSMPI_INFO_NULL,
1597 self.root_rank(),
1598 self.as_communicator().as_raw(),
1599 &mut result,
1600 errcodes.as_mut_ptr(),
1601 );
1602 }
1603 let fails = errcodes
1604 .into_iter()
1605 .filter(|&c| c != ffi::MPI_SUCCESS as i32)
1606 .count();
1607 if fails > 0 {
1608 Err(MpiError::Spawn(Rank::try_from(fails).unwrap(), maxprocs))
1609 } else {
1610 Ok(unsafe { InterCommunicator::from_raw(result) })
1611 }
1612 }
1613
1614 /// Spawns child processes
1615 ///
1616 /// # Standard sections
1617 /// 10.3.3, see MPI_Comm_spawn_multiple
1618 fn spawn_multiple(
1619 &self,
1620 commands: &[Command],
1621 maxprocs: &[Rank],
1622 ) -> Result<InterCommunicator, MpiError> {
1623 assert_eq!(commands.len(), maxprocs.len());
1624
1625 let progs = commands
1626 .iter()
1627 .map(|c| CString::new(c.get_program().to_string_lossy().as_bytes()))
1628 .collect::<Result<Vec<CString>, NulError>>()?;
1629 let mut progp: Vec<*mut c_char> = progs.iter().map(|p| p.as_ptr() as *mut c_char).collect();
1630 let mut argss = commands
1631 .iter()
1632 .map(|c| {
1633 c.get_args()
1634 .map(|os| CString::new(os.to_string_lossy().as_bytes()))
1635 .collect::<Result<Vec<CString>, NulError>>()
1636 })
1637 .collect::<Result<Vec<Vec<CString>>, NulError>>()?;
1638 let mut argvs: Vec<Vec<*mut c_char>> = argss
1639 .iter_mut()
1640 .map(|args| {
1641 args.iter_mut()
1642 .map(|a| a.as_ptr() as *mut c_char)
1643 .chain(std::iter::once(ptr::null_mut()))
1644 .collect()
1645 })
1646 .collect();
1647
1648 let mut argvv: Vec<*mut *mut c_char> =
1649 argvs.iter_mut().map(|argv| argv.as_mut_ptr()).collect();
1650
1651 let infos: Vec<_> = (0..commands.len())
1652 .map(|_| unsafe { ffi::RSMPI_INFO_NULL })
1653 .collect();
1654
1655 let mut result = unsafe { ffi::RSMPI_COMM_NULL };
1656 let sum_maxprocs: Rank = maxprocs.iter().sum();
1657 let mut errcodes = vec![0; usize::try_from(sum_maxprocs).unwrap()];
1658
1659 unsafe {
1660 ffi::MPI_Comm_spawn_multiple(
1661 progs.len().value_as().unwrap(),
1662 progp.as_mut_ptr(),
1663 argvv.as_mut_ptr(),
1664 maxprocs.as_ptr(),
1665 infos.as_ptr(),
1666 self.root_rank(),
1667 self.as_communicator().as_raw(),
1668 &mut result,
1669 errcodes.as_mut_ptr(),
1670 );
1671 }
1672 let fails = errcodes
1673 .into_iter()
1674 .filter(|&c| c != ffi::MPI_SUCCESS as i32)
1675 .count();
1676 if fails > 0 {
1677 Err(MpiError::Spawn(
1678 Rank::try_from(fails).unwrap(),
1679 sum_maxprocs,
1680 ))
1681 } else {
1682 Ok(unsafe { InterCommunicator::from_raw(result) })
1683 }
1684 }
1685}
1686
/// A `Process` can serve as the `Root` of a rooted collective operation.
impl<'a> Root for Process<'a> {
    /// The root rank is the rank of this process, as returned by `rank()`.
    fn root_rank(&self) -> Rank {
        self.rank()
    }
}
1692
1693/// An operation to be used in a reduction or scan type operation, e.g. `MPI_SUM`
1694pub trait Operation: AsRaw<Raw = MPI_Op> {
1695 /// Returns whether the operation is commutative.
1696 ///
1697 /// # Standard section(s)
1698 ///
1699 /// 5.9.7
1700 fn is_commutative(&self) -> bool {
1701 unsafe {
1702 let mut commute = 0;
1703 ffi::MPI_Op_commutative(self.as_raw(), &mut commute);
1704 commute != 0
1705 }
1706 }
1707}
/// A shared reference to any `Operation` is itself an `Operation`.
impl<'a, T: 'a + Operation> Operation for &'a T {}
1709
1710/// A built-in operation like `MPI_SUM`
1711///
1712/// # Examples
1713///
1714/// See `examples/reduce.rs`
1715///
1716/// # Standard section(s)
1717///
1718/// 5.9.2
1719#[derive(Copy, Clone)]
1720pub struct SystemOperation(MPI_Op);
1721
// Generates one public constructor per `name => handle` pair. Each generated function wraps the
// given raw `MPI_Op` value in a `SystemOperation`; the value is read inside an `unsafe` block.
macro_rules! system_operation_constructors {
    ($($ctor:ident => $val:path),*) => (
        $(
            /// A built-in operation
            pub fn $ctor() -> SystemOperation {
                SystemOperation(unsafe { $val })
            }
        )*
    )
}
1730
impl SystemOperation {
    // Constructors for the built-in operations: each name on the left becomes a
    // `pub fn name() -> SystemOperation` wrapping the raw handle on the right.
    system_operation_constructors! {
        max => ffi::RSMPI_MAX,
        min => ffi::RSMPI_MIN,
        sum => ffi::RSMPI_SUM,
        product => ffi::RSMPI_PROD,
        logical_and => ffi::RSMPI_LAND,
        bitwise_and => ffi::RSMPI_BAND,
        logical_or => ffi::RSMPI_LOR,
        bitwise_or => ffi::RSMPI_BOR,
        logical_xor => ffi::RSMPI_LXOR,
        bitwise_xor => ffi::RSMPI_BXOR
    }
}
1745
/// Exposes the wrapped `MPI_Op` handle.
unsafe impl AsRaw for SystemOperation {
    type Raw = MPI_Op;
    /// Returns the raw `MPI_Op` stored in this `SystemOperation`.
    fn as_raw(&self) -> Self::Raw {
        self.0
    }
}
1752
/// Built-in operations can be used directly as an `Operation`.
impl Operation for SystemOperation {}
1754
// Method-less marker trait. `UserOperation` stores a `Box<dyn Erased>` so that it can own a value
// of arbitrary type without naming that type.
#[cfg(feature = "user-operations")]
trait Erased {}

// Blanket implementation: every (sized) type is `Erased`.
#[cfg(feature = "user-operations")]
impl<T> Erased for T {}
1760
/// A user-defined operation.
///
/// The lifetime `'a` of the operation is limited by the lifetime of the underlying closure.
///
/// For safety reasons, `UserOperation` is in and of itself not considered an `Operation`, but a
/// reference to it is. This limitation may be lifted in the future when `Request` objects can
/// store finalizers.
///
/// **Note:** When a `UserOperation` is passed to a non-blocking API call, it must outlive the
/// completion of the request. This is normally enforced by the safe API, so this is only a concern
/// if you use the unsafe API. Do not rely on MPI's internal reference-counting here, because once
/// `UserOperation` is destroyed, the closure object will be deallocated even if the `MPI_Op` handle
/// is still alive due to outstanding references.
///
/// # Examples
///
/// See `examples/reduce.rs` and `examples/immediate_reduce.rs`
#[cfg(feature = "user-operations")]
pub struct UserOperation<'a> {
    // Raw MPI operation handle; released with `MPI_Op_free` when the value is dropped.
    op: MPI_Op,
    // Type-erased owner of whatever data must live as long as `op` (for operations built with
    // `new`, the closure state).
    _anchor: Box<dyn Erased + 'a>, // keeps the internal data alive
}
1783
/// Formats as the tuple `UserOperation(<op>)`; the anchored data is not shown.
#[cfg(feature = "user-operations")]
impl<'a> fmt::Debug for UserOperation<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut repr = f.debug_tuple("UserOperation");
        repr.field(&self.op);
        repr.finish()
    }
}
1790
/// Releases the MPI operation handle when the `UserOperation` goes out of scope.
#[cfg(feature = "user-operations")]
impl<'a> Drop for UserOperation<'a> {
    fn drop(&mut self) {
        // Hand the handle back to MPI.
        unsafe {
            ffi::MPI_Op_free(&mut self.op);
        }
    }
}
1799
/// Exposes the underlying `MPI_Op` handle.
#[cfg(feature = "user-operations")]
unsafe impl<'a> AsRaw for UserOperation<'a> {
    type Raw = MPI_Op;
    /// Returns the raw `MPI_Op` stored in this `UserOperation`.
    fn as_raw(&self) -> Self::Raw {
        self.op
    }
}
1807
/// Only a reference to a `UserOperation` is an `Operation`; the owned value is not.
#[cfg(feature = "user-operations")]
impl<'a, 'b> Operation for &'b UserOperation<'a> {}
1810
#[cfg(feature = "user-operations")]
impl<'a> UserOperation<'a> {
    /// Define an operation using a closure. The operation must be associative.
    ///
    /// Equivalent to `new(false, function)`.
    ///
    /// This is a more readable shorthand for the `new` method. Refer to [`new`](#method.new) for
    /// more information.
    pub fn associative<F>(function: F) -> Self
    where
        F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
    {
        Self::new(false, function)
    }

    /// Define an operation using a closure. The operation must be both associative and
    /// commutative.
    ///
    /// Equivalent to `new(true, function)`.
    ///
    /// This is a more readable shorthand for the `new` method. Refer to [`new`](#method.new) for
    /// more information.
    pub fn commutative<F>(function: F) -> Self
    where
        F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
    {
        Self::new(true, function)
    }

    /// Creates an associative and possibly commutative operation using a closure.
    ///
    /// The closure receives two arguments `invec` and `inoutvec` as dynamically typed buffers. It
    /// shall set `inoutvec` to the value of `f(invec, inoutvec)`, where `f` is a binary associative
    /// operation.
    ///
    /// If the operation is also commutative, setting `commute` to `true` may yield performance
    /// benefits.
    ///
    /// **Note:** If the closure panics, the entire program will abort.
    ///
    /// # Standard section(s)
    ///
    /// 5.9.5
    pub fn new<F>(commute: bool, function: F) -> Self
    where
        F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
    {
        // Bundles the user's closure with the libffi closure that refers to it, so that both can
        // be stored together in the returned `UserOperation`'s anchor.
        struct ClosureAnchor<F> {
            rust_closure: F,
            _ffi_closure: Option<Closure<'static>>,
        }

        // must box it to prevent moves
        // `_ffi_closure` starts out empty because the libffi closure is created from a reference
        // to `rust_closure`, which only exists once the box has been built.
        let mut anchor = Box::new(ClosureAnchor {
            rust_closure: function,
            _ffi_closure: None,
        });

        // Signature of the callback as seen by libffi: four pointer arguments, no return value.
        let args = [
            Type::pointer(), // void *
            Type::pointer(), // void *
            Type::pointer(), // int32_t *
            Type::pointer(), // MPI_Datatype *
        ];
        // `cif` is only mutated under the MS-MPI/x86 configuration below.
        #[allow(unused_mut)]
        let mut cif = Cif::new(args.iter().cloned(), Type::void());
        // MS-MPI uses "stdcall" calling convention on 32-bit x86
        #[cfg(all(msmpi, target_arch = "x86"))]
        cif.set_abi(libffi::raw::ffi_abi_FFI_STDCALL);

        // Callback handed to libffi: unpacks the four raw arguments and forwards them to the
        // user's closure as dynamically typed buffers.
        unsafe extern "C" fn trampoline<'a, F: Fn(DynBuffer, DynBufferMut) + Sync + 'a>(
            cif: &libffi::low::ffi_cif,
            _result: &mut c_void,
            args: *const *const c_void,
            user_function: &F,
        ) {
            debug_assert_eq!(4, cif.nargs);

            // Each entry of `args` points at one argument value; the arguments are themselves
            // pointers, hence the cast followed by a dereference.
            let (mut invec, mut inoutvec, len, datatype) = (
                *(*args.offset(0) as *const *mut c_void),
                *(*args.offset(1) as *const *mut c_void),
                *(*args.offset(2) as *const *mut i32),
                *(*args.offset(3) as *const *mut ffi::MPI_Datatype),
            );

            // `len` and `datatype` arrive by pointer; read the pointed-to values.
            let len = *len;
            let datatype = DatatypeRef::from_raw(*datatype);
            if len == 0 {
                // precautionary measure: ensure pointers are not null
                invec = [].as_mut_ptr();
                inoutvec = [].as_mut_ptr();
            }

            user_function(
                DynBuffer::from_raw(invec, len, datatype),
                DynBufferMut::from_raw(inoutvec, len, datatype),
            )
        }

        // `op` is assigned inside the unsafe block below, once the libffi closure exists.
        let op;
        anchor._ffi_closure = Some(unsafe {
            let ffi_closure = Closure::new(cif, trampoline, &anchor.rust_closure);
            // Register the libffi code pointer with MPI as the user function of a new `MPI_Op`.
            op = with_uninitialized(|op| {
                ffi::MPI_Op_create(Some(*ffi_closure.instantiate_code_ptr()), commute as _, op)
            })
            .1;
            mem::transmute(ffi_closure) // erase the lifetime
        });
        UserOperation {
            op,
            _anchor: anchor,
        }
    }

    /// Creates a `UserOperation` from raw parts.
    ///
    /// Here, `anchor` is an arbitrary object that is stored alongside the `MPI_Op`.
    /// This can be used to attach finalizers to the object.
    ///
    /// # Safety
    /// MPI_Op must not be MPI_OP_NULL
    pub unsafe fn from_raw<T: 'a>(op: MPI_Op, anchor: Box<T>) -> Self {
        Self {
            op,
            _anchor: anchor,
        }
    }
}
1935
/// An unsafe user-defined operation.
///
/// Unsafe user-defined operations are created from pointers to functions that have the unsafe
/// signatures of user functions defined in the MPI C bindings, `UnsafeUserFunction`.
///
/// The recommended way to create user-defined operations is through the safer `UserOperation`
/// type. This type can be used as a work-around in situations where the `libffi` dependency is not
/// available.
pub struct UnsafeUserOperation {
    // Raw MPI operation handle; released with `MPI_Op_free` when the value is dropped.
    op: MPI_Op,
}
1947
1948impl fmt::Debug for UnsafeUserOperation {
1949 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1950 f.debug_tuple("UnsafeUserOperation")
1951 .field(&self.op)
1952 .finish()
1953 }
1954}
1955
/// Releases the MPI operation handle when the `UnsafeUserOperation` goes out of scope.
impl Drop for UnsafeUserOperation {
    fn drop(&mut self) {
        // Hand the handle back to MPI.
        unsafe {
            ffi::MPI_Op_free(&mut self.op);
        }
    }
}
1963
/// Exposes the underlying `MPI_Op` handle.
unsafe impl AsRaw for UnsafeUserOperation {
    type Raw = MPI_Op;
    /// Returns the raw `MPI_Op` stored in this `UnsafeUserOperation`.
    fn as_raw(&self) -> Self::Raw {
        self.op
    }
}
1970
/// Only a reference to an `UnsafeUserOperation` is an `Operation`; the owned value is not.
impl<'a> Operation for &'a UnsafeUserOperation {}
1972
/// A raw pointer to a function that can be used to define an `UnsafeUserOperation`.
///
/// The arguments are, in order: `invec`, `inoutvec`, a pointer to the element count `len`, and a
/// pointer to the `MPI_Datatype` of the elements.
///
/// This definition is used everywhere except MS-MPI on 32-bit x86.
#[cfg(not(all(msmpi, target_arch = "x86")))]
pub type UnsafeUserFunction =
    unsafe extern "C" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
1977
/// A raw pointer to a function that can be used to define an `UnsafeUserOperation`.
///
/// The arguments are, in order: `invec`, `inoutvec`, a pointer to the element count `len`, and a
/// pointer to the `MPI_Datatype` of the elements.
///
/// MS-MPI uses "stdcall" rather than "C" calling convention. "stdcall" is ignored on x86_64
/// Windows and the default calling convention is used instead.
#[cfg(all(msmpi, target_arch = "x86"))]
pub type UnsafeUserFunction =
    unsafe extern "stdcall" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
1985
1986impl UnsafeUserOperation {
1987 /// Define an unsafe operation using a function pointer. The operation must be associative.
1988 ///
1989 /// This is a more readable shorthand for the `new` method. Refer to [`new`](#method.new) for
1990 /// more information.
1991 ///
1992 /// # Safety
1993 /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
1994 /// in all reductions that this `UnsafeUserOperation` is used in.
1995 pub unsafe fn associative(function: UnsafeUserFunction) -> Self {
1996 Self::new(false, function)
1997 }
1998
1999 /// Define an unsafe operation using a function pointer. The operation must be both associative
2000 /// and commutative.
2001 ///
2002 /// This is a more readable shorthand for the `new` method. Refer to [`new`](#method.new) for
2003 /// more information.
2004 ///
2005 /// # Safety
2006 /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
2007 /// in all reductions that this `UnsafeUserOperation` is used in.
2008 pub unsafe fn commutative(function: UnsafeUserFunction) -> Self {
2009 Self::new(true, function)
2010 }
2011
2012 /// Creates an associative and possibly commutative unsafe operation using a function pointer.
2013 ///
2014 /// The function receives raw `*mut c_void` as `invec` and `inoutvec` and the number of elemnts
2015 /// of those two vectors as a `*mut c_int` `len`. It shall set `inoutvec`
2016 /// to the value of `f(invec, inoutvec)`, where `f` is a binary associative operation.
2017 ///
2018 /// If the operation is also commutative, setting `commute` to `true` may yield performance
2019 /// benefits.
2020 ///
2021 /// **Note:** The user function is not allowed to panic.
2022 ///
2023 /// # Standard section(s)
2024 ///
2025 /// 5.9.5
2026 ///
2027 /// # Safety
2028 /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
2029 /// in all reductions that this `UnsafeUserOperation` is used in.
2030 pub unsafe fn new(commute: bool, function: UnsafeUserFunction) -> Self {
2031 UnsafeUserOperation {
2032 op: with_uninitialized(|op| ffi::MPI_Op_create(Some(function), commute as _, op)).1,
2033 }
2034 }
2035}
2036
2037/// Perform a local reduction.
2038///
2039/// # Examples
2040///
2041/// See `examples/reduce.rs`
2042///
2043/// # Standard section(s)
2044///
2045/// 5.9.7
2046#[allow(clippy::needless_pass_by_value)]
2047pub fn reduce_local_into<S: ?Sized, R: ?Sized, O>(inbuf: &S, inoutbuf: &mut R, op: O)
2048where
2049 S: Buffer,
2050 R: BufferMut,
2051 O: Operation,
2052{
2053 unsafe {
2054 ffi::MPI_Reduce_local(
2055 inbuf.pointer(),
2056 inoutbuf.pointer_mut(),
2057 inbuf.count(),
2058 inbuf.as_datatype().as_raw(),
2059 op.as_raw(),
2060 );
2061 }
2062}