mpi/
collective.rs

//! Collective communication
//!
//! This module is still under development; some collective operations are not wrapped yet.
//!
//! # Unfinished features
//!
//! - **5.8**: All-to-all, `MPI_Alltoallw()`
//! - **5.10**: Reduce-scatter, `MPI_Reduce_scatter()`
//! - **5.12**: Nonblocking collective operations,
//!   `MPI_Ialltoallw()`, `MPI_Ireduce_scatter()`

12use std::ffi::{CString, NulError};
13#[cfg(feature = "user-operations")]
14use std::mem;
15use std::os::raw::{c_char, c_int, c_void};
16use std::process::Command;
17use std::{fmt, ptr};
18
19use conv::ConvUtil;
20#[cfg(feature = "user-operations")]
21use libffi::middle::{Cif, Closure, Type};
22
23use crate::ffi::MPI_Op;
24use crate::{ffi, MpiError};
25
26use crate::datatype::traits::*;
27#[cfg(feature = "user-operations")]
28use crate::datatype::{DatatypeRef, DynBuffer, DynBufferMut};
29use crate::raw::traits::*;
30use crate::request::{Request, Scope, StaticScope};
31use crate::topology::{traits::*, InterCommunicator};
32use crate::topology::{Process, Rank};
33use crate::with_uninitialized;
34
35/// Collective communication traits
36pub mod traits {
37    pub use super::{CommunicatorCollectives, Operation, Root};
38}
39
40/// Collective communication patterns defined on `Communicator`s
41pub trait CommunicatorCollectives: Communicator {
42    /// Barrier synchronization among all processes in a `Communicator`
43    ///
44    /// Partake in a barrier synchronization across all processes in the `Communicator` `&self`.
45    ///
46    /// Calling processes (or threads within the calling processes) will enter the barrier and block
47    /// execution until all processes in the `Communicator` `&self` have entered the barrier.
48    ///
49    /// # Examples
50    ///
51    /// See `examples/barrier.rs`
52    ///
53    /// # Standard section(s)
54    ///
55    /// 5.3
56    fn barrier(&self) {
57        unsafe {
58            ffi::MPI_Barrier(self.as_raw());
59        }
60    }
61
62    /// Gather contents of buffers on all participating processes.
63    ///
64    /// After the call completes, the contents of the send `Buffer`s on all processes will be
65    /// concatenated into the receive `Buffer`s on all ranks.
66    ///
67    /// All send `Buffer`s must contain the same count of elements.
68    ///
69    /// # Examples
70    ///
71    /// See `examples/all_gather.rs`
72    ///
73    /// # Standard section(s)
74    ///
75    /// 5.7
76    fn all_gather_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
77    where
78        S: Buffer,
79        R: BufferMut,
80    {
81        unsafe {
82            ffi::MPI_Allgather(
83                sendbuf.pointer(),
84                sendbuf.count(),
85                sendbuf.as_datatype().as_raw(),
86                recvbuf.pointer_mut(),
87                recvbuf.count() / self.target_size(),
88                recvbuf.as_datatype().as_raw(),
89                self.as_raw(),
90            );
91        }
92    }
93
94    /// Gather contents of buffers on all participating processes.
95    ///
96    /// After the call completes, the contents of the send `Buffer`s on all processes will be
97    /// concatenated into the receive `Buffer`s on all ranks.
98    ///
99    /// The send `Buffer`s may contain different counts of elements on different processes. The
100    /// distribution of elements in the receive `Buffer`s is specified via `Partitioned`.
101    ///
102    /// # Examples
103    ///
104    /// See `examples/all_gather_varcount.rs`
105    ///
106    /// # Standard section(s)
107    ///
108    /// 5.7
109    fn all_gather_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
110    where
111        S: Buffer,
112        R: PartitionedBufferMut,
113    {
114        unsafe {
115            ffi::MPI_Allgatherv(
116                sendbuf.pointer(),
117                sendbuf.count(),
118                sendbuf.as_datatype().as_raw(),
119                recvbuf.pointer_mut(),
120                recvbuf.counts().as_ptr(),
121                recvbuf.displs().as_ptr(),
122                recvbuf.as_datatype().as_raw(),
123                self.as_raw(),
124            );
125        }
126    }
127
128    /// Distribute the send `Buffer`s from all processes to the receive `Buffer`s on all processes.
129    ///
130    /// Each process sends and receives the same count of elements to and from each process.
131    ///
132    /// # Examples
133    ///
134    /// See `examples/all_to_all.rs`
135    ///
136    /// # Standard section(s)
137    ///
138    /// 5.8
139    fn all_to_all_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
140    where
141        S: Buffer,
142        R: BufferMut,
143    {
144        let c_size = self.target_size();
145        unsafe {
146            ffi::MPI_Alltoall(
147                sendbuf.pointer(),
148                sendbuf.count() / c_size,
149                sendbuf.as_datatype().as_raw(),
150                recvbuf.pointer_mut(),
151                recvbuf.count() / c_size,
152                recvbuf.as_datatype().as_raw(),
153                self.as_raw(),
154            );
155        }
156    }
157
158    /// Distribute the send `Buffer`s from all processes to the receive `Buffer`s on all processes.
159    ///
160    /// The count of elements to send and receive to and from each process can vary and is specified
161    /// using `Partitioned`.
162    ///
163    /// # Standard section(s)
164    ///
165    /// 5.8
166    fn all_to_all_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
167    where
168        S: PartitionedBuffer,
169        R: PartitionedBufferMut,
170    {
171        unsafe {
172            ffi::MPI_Alltoallv(
173                sendbuf.pointer(),
174                sendbuf.counts().as_ptr(),
175                sendbuf.displs().as_ptr(),
176                sendbuf.as_datatype().as_raw(),
177                recvbuf.pointer_mut(),
178                recvbuf.counts().as_ptr(),
179                recvbuf.displs().as_ptr(),
180                recvbuf.as_datatype().as_raw(),
181                self.as_raw(),
182            );
183        }
184    }
185
186    /// Performs a global reduction under the operation `op` of the input data in `sendbuf` and
187    /// stores the result in `recvbuf` on all processes.
188    ///
189    /// # Examples
190    ///
191    /// See `examples/reduce.rs`
192    ///
193    /// # Standard section(s)
194    ///
195    /// 5.9.6
196    fn all_reduce_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
197    where
198        S: Buffer,
199        R: BufferMut,
200        O: Operation,
201    {
202        unsafe {
203            ffi::MPI_Allreduce(
204                sendbuf.pointer(),
205                recvbuf.pointer_mut(),
206                sendbuf.count(),
207                sendbuf.as_datatype().as_raw(),
208                op.as_raw(),
209                self.as_raw(),
210            );
211        }
212    }
213
214    /// Performs an element-wise global reduction under the operation `op` of the input data in
215    /// `sendbuf` and scatters the result into equal sized blocks in the receive buffers on all
216    /// processes.
217    ///
218    /// # Examples
219    ///
220    /// See `examples/reduce.rs`
221    ///
222    /// # Standard section(s)
223    ///
224    /// 5.10.1
225    fn reduce_scatter_block_into<S: ?Sized, R: ?Sized, O>(
226        &self,
227        sendbuf: &S,
228        recvbuf: &mut R,
229        op: O,
230    ) where
231        S: Buffer,
232        R: BufferMut,
233        O: Operation,
234    {
235        assert_eq!(recvbuf.count() * self.target_size(), sendbuf.count());
236        unsafe {
237            ffi::MPI_Reduce_scatter_block(
238                sendbuf.pointer(),
239                recvbuf.pointer_mut(),
240                recvbuf.count(),
241                sendbuf.as_datatype().as_raw(),
242                op.as_raw(),
243                self.as_raw(),
244            );
245        }
246    }
247
248    /// Performs a global inclusive prefix reduction of the data in `sendbuf` into `recvbuf` under
249    /// operation `op`.
250    ///
251    /// # Examples
252    ///
253    /// See `examples/scan.rs`
254    ///
255    /// # Standard section(s)
256    ///
257    /// 5.11.1
258    fn scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
259    where
260        S: Buffer,
261        R: BufferMut,
262        O: Operation,
263    {
264        unsafe {
265            ffi::MPI_Scan(
266                sendbuf.pointer(),
267                recvbuf.pointer_mut(),
268                sendbuf.count(),
269                sendbuf.as_datatype().as_raw(),
270                op.as_raw(),
271                self.as_raw(),
272            );
273        }
274    }
275
276    /// Performs a global exclusive prefix reduction of the data in `sendbuf` into `recvbuf` under
277    /// operation `op`.
278    ///
279    /// # Examples
280    ///
281    /// See `examples/scan.rs`
282    ///
283    /// # Standard section(s)
284    ///
285    /// 5.11.2
286    fn exclusive_scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
287    where
288        S: Buffer,
289        R: BufferMut,
290        O: Operation,
291    {
292        unsafe {
293            ffi::MPI_Exscan(
294                sendbuf.pointer(),
295                recvbuf.pointer_mut(),
296                sendbuf.count(),
297                sendbuf.as_datatype().as_raw(),
298                op.as_raw(),
299                self.as_raw(),
300            );
301        }
302    }
303
304    /// Non-blocking barrier synchronization among all processes in a `Communicator`
305    ///
306    /// Calling processes (or threads within the calling processes) enter the barrier. Completion
307    /// methods on the associated request object will block until all processes have entered.
308    ///
309    /// # Examples
310    ///
311    /// See `examples/immediate_barrier.rs`
312    ///
313    /// # Standard section(s)
314    ///
315    /// 5.12.1
316    fn immediate_barrier(&self) -> Request<'static, ()> {
317        unsafe {
318            Request::from_raw(
319                with_uninitialized(|request| ffi::MPI_Ibarrier(self.as_raw(), request)).1,
320                &(),
321                StaticScope,
322            )
323        }
324    }
325
326    /// Initiate non-blocking gather of the contents of all `sendbuf`s into all `rcevbuf`s on all
327    /// processes in the communicator.
328    ///
329    /// # Examples
330    ///
331    /// See `examples/immediate_all_gather.rs`
332    ///
333    /// # Standard section(s)
334    ///
335    /// 5.12.5
336    fn immediate_all_gather_into<'a, S: ?Sized, R: ?Sized, Sc>(
337        &self,
338        scope: Sc,
339        sendbuf: &'a S,
340        recvbuf: &'a mut R,
341    ) -> Request<'a, R, Sc>
342    where
343        S: 'a + Buffer,
344        R: 'a + BufferMut,
345        Sc: Scope<'a>,
346    {
347        unsafe {
348            let recvcount = recvbuf.count() / self.target_size();
349            Request::from_raw(
350                with_uninitialized(|request| {
351                    ffi::MPI_Iallgather(
352                        sendbuf.pointer(),
353                        sendbuf.count(),
354                        sendbuf.as_datatype().as_raw(),
355                        recvbuf.pointer_mut(),
356                        recvcount,
357                        recvbuf.as_datatype().as_raw(),
358                        self.as_raw(),
359                        request,
360                    )
361                })
362                .1,
363                recvbuf,
364                scope,
365            )
366        }
367    }
368
369    /// Initiate non-blocking gather of the contents of all `sendbuf`s into all `rcevbuf`s on all
370    /// processes in the communicator.
371    ///
372    /// # Examples
373    ///
374    /// See `examples/immediate_all_gather_varcount.rs`
375    ///
376    /// # Standard section(s)
377    ///
378    /// 5.12.5
379    fn immediate_all_gather_varcount_into<'a, S: ?Sized, R: ?Sized, Sc>(
380        &self,
381        scope: Sc,
382        sendbuf: &'a S,
383        recvbuf: &'a mut R,
384    ) -> Request<'a, R, Sc>
385    where
386        S: 'a + Buffer,
387        R: 'a + PartitionedBufferMut,
388        Sc: Scope<'a>,
389    {
390        unsafe {
391            Request::from_raw(
392                with_uninitialized(|request| {
393                    ffi::MPI_Iallgatherv(
394                        sendbuf.pointer(),
395                        sendbuf.count(),
396                        sendbuf.as_datatype().as_raw(),
397                        recvbuf.pointer_mut(),
398                        recvbuf.counts().as_ptr(),
399                        recvbuf.displs().as_ptr(),
400                        recvbuf.as_datatype().as_raw(),
401                        self.as_raw(),
402                        request,
403                    )
404                })
405                .1,
406                recvbuf,
407                scope,
408            )
409        }
410    }
411
412    /// Initiate non-blocking all-to-all communication.
413    ///
414    /// # Examples
415    ///
416    /// See `examples/immediate_all_to_all.rs`
417    ///
418    /// # Standard section(s)
419    ///
420    /// 5.12.6
421    fn immediate_all_to_all_into<'a, S: ?Sized, R: ?Sized, Sc>(
422        &self,
423        scope: Sc,
424        sendbuf: &'a S,
425        recvbuf: &'a mut R,
426    ) -> Request<'a, R, Sc>
427    where
428        S: 'a + Buffer,
429        R: 'a + BufferMut,
430        Sc: Scope<'a>,
431    {
432        let c_size = self.target_size();
433        unsafe {
434            Request::from_raw(
435                with_uninitialized(|request| {
436                    ffi::MPI_Ialltoall(
437                        sendbuf.pointer(),
438                        sendbuf.count() / c_size,
439                        sendbuf.as_datatype().as_raw(),
440                        recvbuf.pointer_mut(),
441                        recvbuf.count() / c_size,
442                        recvbuf.as_datatype().as_raw(),
443                        self.as_raw(),
444                        request,
445                    )
446                })
447                .1,
448                recvbuf,
449                scope,
450            )
451        }
452    }
453
454    /// Initiate non-blocking all-to-all communication.
455    ///
456    /// # Standard section(s)
457    ///
458    /// 5.12.6
459    fn immediate_all_to_all_varcount_into<'a, S: ?Sized, R: ?Sized, Sc>(
460        &self,
461        scope: Sc,
462        sendbuf: &'a S,
463        recvbuf: &'a mut R,
464    ) -> Request<'a, R, Sc>
465    where
466        S: 'a + PartitionedBuffer,
467        R: 'a + PartitionedBufferMut,
468        Sc: Scope<'a>,
469    {
470        unsafe {
471            Request::from_raw(
472                with_uninitialized(|request| {
473                    ffi::MPI_Ialltoallv(
474                        sendbuf.pointer(),
475                        sendbuf.counts().as_ptr(),
476                        sendbuf.displs().as_ptr(),
477                        sendbuf.as_datatype().as_raw(),
478                        recvbuf.pointer_mut(),
479                        recvbuf.counts().as_ptr(),
480                        recvbuf.displs().as_ptr(),
481                        recvbuf.as_datatype().as_raw(),
482                        self.as_raw(),
483                        request,
484                    )
485                })
486                .1,
487                recvbuf,
488                scope,
489            )
490        }
491    }
492
493    /// Initiates a non-blocking global reduction under the operation `op` of the input data in
494    /// `sendbuf` and stores the result in `recvbuf` on all processes.
495    ///
496    /// # Examples
497    ///
498    /// See `examples/immediate_reduce.rs`
499    ///
500    /// # Standard section(s)
501    ///
502    /// 5.12.8
503    fn immediate_all_reduce_into<'a, S: ?Sized, R: ?Sized, O, Sc>(
504        &self,
505        scope: Sc,
506        sendbuf: &'a S,
507        recvbuf: &'a mut R,
508        op: O,
509    ) -> Request<'a, R, Sc>
510    where
511        S: 'a + Buffer,
512        R: 'a + BufferMut,
513        O: 'a + Operation,
514        Sc: Scope<'a>,
515    {
516        unsafe {
517            Request::from_raw(
518                with_uninitialized(|request| {
519                    ffi::MPI_Iallreduce(
520                        sendbuf.pointer(),
521                        recvbuf.pointer_mut(),
522                        sendbuf.count(),
523                        sendbuf.as_datatype().as_raw(),
524                        op.as_raw(),
525                        self.as_raw(),
526                        request,
527                    )
528                })
529                .1,
530                recvbuf,
531                scope,
532            )
533        }
534    }
535
536    /// Initiates a non-blocking element-wise global reduction under the operation `op` of the
537    /// input data in `sendbuf` and scatters the result into equal sized blocks in the receive
538    /// buffers on all processes.
539    ///
540    /// # Examples
541    ///
542    /// See `examples/immediate_reduce.rs`
543    ///
544    /// # Standard section(s)
545    ///
546    /// 5.12.9
547    fn immediate_reduce_scatter_block_into<'a, S: ?Sized, R: ?Sized, O, Sc>(
548        &self,
549        scope: Sc,
550        sendbuf: &'a S,
551        recvbuf: &'a mut R,
552        op: O,
553    ) -> Request<'a, R, Sc>
554    where
555        S: 'a + Buffer,
556        R: 'a + BufferMut,
557        O: 'a + Operation,
558        Sc: Scope<'a>,
559    {
560        assert_eq!(recvbuf.count() * self.target_size(), sendbuf.count());
561        unsafe {
562            Request::from_raw(
563                with_uninitialized(|request| {
564                    ffi::MPI_Ireduce_scatter_block(
565                        sendbuf.pointer(),
566                        recvbuf.pointer_mut(),
567                        recvbuf.count(),
568                        sendbuf.as_datatype().as_raw(),
569                        op.as_raw(),
570                        self.as_raw(),
571                        request,
572                    )
573                })
574                .1,
575                recvbuf,
576                scope,
577            )
578        }
579    }
580
581    /// Initiates a non-blocking global inclusive prefix reduction of the data in `sendbuf` into
582    /// `recvbuf` under operation `op`.
583    ///
584    /// # Examples
585    ///
586    /// See `examples/immediate_scan.rs`
587    ///
588    /// # Standard section(s)
589    ///
590    /// 5.12.11
591    fn immediate_scan_into<'a, S: ?Sized, R: ?Sized, O, Sc>(
592        &self,
593        scope: Sc,
594        sendbuf: &'a S,
595        recvbuf: &'a mut R,
596        op: O,
597    ) -> Request<'a, R, Sc>
598    where
599        S: 'a + Buffer,
600        R: 'a + BufferMut,
601        O: 'a + Operation,
602        Sc: Scope<'a>,
603    {
604        unsafe {
605            Request::from_raw(
606                with_uninitialized(|request| {
607                    ffi::MPI_Iscan(
608                        sendbuf.pointer(),
609                        recvbuf.pointer_mut(),
610                        sendbuf.count(),
611                        sendbuf.as_datatype().as_raw(),
612                        op.as_raw(),
613                        self.as_raw(),
614                        request,
615                    )
616                })
617                .1,
618                recvbuf,
619                scope,
620            )
621        }
622    }
623
624    /// Initiates a non-blocking global exclusive prefix reduction of the data in `sendbuf` into
625    /// `recvbuf` under operation `op`.
626    ///
627    /// # Examples
628    ///
629    /// See `examples/immediate_scan.rs`
630    ///
631    /// # Standard section(s)
632    ///
633    /// 5.12.12
634    fn immediate_exclusive_scan_into<'a, S: ?Sized, R: ?Sized, O, Sc>(
635        &self,
636        scope: Sc,
637        sendbuf: &'a S,
638        recvbuf: &'a mut R,
639        op: O,
640    ) -> Request<'a, R, Sc>
641    where
642        S: 'a + Buffer,
643        R: 'a + BufferMut,
644        O: 'a + Operation,
645        Sc: Scope<'a>,
646    {
647        unsafe {
648            Request::from_raw(
649                with_uninitialized(|request| {
650                    ffi::MPI_Iexscan(
651                        sendbuf.pointer(),
652                        recvbuf.pointer_mut(),
653                        sendbuf.count(),
654                        sendbuf.as_datatype().as_raw(),
655                        op.as_raw(),
656                        self.as_raw(),
657                        request,
658                    )
659                })
660                .1,
661                recvbuf,
662                scope,
663            )
664        }
665    }
666}
667
668impl<C: Communicator + ?Sized> CommunicatorCollectives for C {}
669
670/// Something that can take the role of 'root' in a collective operation.
671///
672/// Many collective operations define a 'root' process that takes a special role in the
673/// communication. These collective operations are implemented as default methods of this trait.
674pub trait Root: AsCommunicator {
675    /// Rank of the root process
676    fn root_rank(&self) -> Rank;
677
678    /// Broadcast of the contents of a buffer
679    ///
680    /// After the call completes, the `Buffer` on all processes in the `Communicator` of the `Root`
681    /// `&self` will contain what it contains on the `Root`.
682    ///
683    /// # Examples
684    ///
685    /// See `examples/broadcast.rs`
686    ///
687    /// # Standard section(s)
688    ///
689    /// 5.4
690    fn broadcast_into<Buf: ?Sized>(&self, buffer: &mut Buf)
691    where
692        Buf: BufferMut,
693    {
694        unsafe {
695            ffi::MPI_Bcast(
696                buffer.pointer_mut(),
697                buffer.count(),
698                buffer.as_datatype().as_raw(),
699                self.root_rank(),
700                self.as_communicator().as_raw(),
701            );
702        }
703    }
704
705    /// Gather contents of buffers on `Root`.
706    ///
707    /// After the call completes, the contents of the `Buffer`s on all ranks will be
708    /// concatenated into the `Buffer` on `Root`.
709    ///
710    /// All send `Buffer`s must have the same count of elements.
711    ///
712    /// This function must be called on all non-root processes.
713    ///
714    /// # Examples
715    ///
716    /// See `examples/gather.rs`
717    ///
718    /// # Standard section(s)
719    ///
720    /// 5.5
721    fn gather_into<S: ?Sized>(&self, sendbuf: &S)
722    where
723        S: Buffer,
724    {
725        assert_ne!(self.as_communicator().rank(), self.root_rank());
726        unsafe {
727            ffi::MPI_Gather(
728                sendbuf.pointer(),
729                sendbuf.count(),
730                sendbuf.as_datatype().as_raw(),
731                ptr::null_mut(),
732                0,
733                u8::equivalent_datatype().as_raw(),
734                self.root_rank(),
735                self.as_communicator().as_raw(),
736            );
737        }
738    }
739
740    /// Gather contents of buffers on `Root`.
741    ///
742    /// After the call completes, the contents of the `Buffer`s on all ranks will be
743    /// concatenated into the `Buffer` on `Root`.
744    ///
745    /// All send `Buffer`s must have the same count of elements.
746    ///
747    /// This function must be called on the root process.
748    ///
749    /// # Examples
750    ///
751    /// See `examples/gather.rs`
752    ///
753    /// # Standard section(s)
754    ///
755    /// 5.5
756    fn gather_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
757    where
758        S: Buffer,
759        R: BufferMut,
760    {
761        assert_eq!(self.as_communicator().rank(), self.root_rank());
762        unsafe {
763            let recvcount = recvbuf.count() / self.as_communicator().target_size();
764            ffi::MPI_Gather(
765                sendbuf.pointer(),
766                sendbuf.count(),
767                sendbuf.as_datatype().as_raw(),
768                recvbuf.pointer_mut(),
769                recvcount,
770                recvbuf.as_datatype().as_raw(),
771                self.root_rank(),
772                self.as_communicator().as_raw(),
773            );
774        }
775    }
776
777    /// Gather contents of buffers on `Root`.
778    ///
779    /// After the call completes, the contents of the `Buffer`s on all ranks will be
780    /// concatenated into the `Buffer` on `Root`.
781    ///
782    /// The send `Buffer`s may contain different counts of elements on different processes. The
783    /// distribution of elements in the receive `Buffer` is specified via `Partitioned`.
784    ///
785    /// This function must be called on all non-root processes.
786    ///
787    /// # Examples
788    ///
789    /// See `examples/gather_varcount.rs`
790    ///
791    /// # Standard section(s)
792    ///
793    /// 5.5
794    fn gather_varcount_into<S: ?Sized>(&self, sendbuf: &S)
795    where
796        S: Buffer,
797    {
798        assert_ne!(self.as_communicator().rank(), self.root_rank());
799        unsafe {
800            ffi::MPI_Gatherv(
801                sendbuf.pointer(),
802                sendbuf.count(),
803                sendbuf.as_datatype().as_raw(),
804                ptr::null_mut(),
805                ptr::null(),
806                ptr::null(),
807                u8::equivalent_datatype().as_raw(),
808                self.root_rank(),
809                self.as_communicator().as_raw(),
810            );
811        }
812    }
813
814    /// Gather contents of buffers on `Root`.
815    ///
816    /// After the call completes, the contents of the `Buffer`s on all ranks will be
817    /// concatenated into the `Buffer` on `Root`.
818    ///
819    /// The send `Buffer`s may contain different counts of elements on different processes. The
820    /// distribution of elements in the receive `Buffer` is specified via `Partitioned`.
821    ///
822    /// This function must be called on the root process.
823    ///
824    /// # Examples
825    ///
826    /// See `examples/gather_varcount.rs`
827    ///
828    /// # Standard section(s)
829    ///
830    /// 5.5
831    fn gather_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
832    where
833        S: Buffer,
834        R: PartitionedBufferMut,
835    {
836        assert_eq!(self.as_communicator().rank(), self.root_rank());
837        unsafe {
838            ffi::MPI_Gatherv(
839                sendbuf.pointer(),
840                sendbuf.count(),
841                sendbuf.as_datatype().as_raw(),
842                recvbuf.pointer_mut(),
843                recvbuf.counts().as_ptr(),
844                recvbuf.displs().as_ptr(),
845                recvbuf.as_datatype().as_raw(),
846                self.root_rank(),
847                self.as_communicator().as_raw(),
848            );
849        }
850    }
851
852    /// Scatter contents of a buffer on the root process to all processes.
853    ///
854    /// After the call completes each participating process will have received a part of the send
855    /// `Buffer` on the root process.
856    ///
857    /// All send `Buffer`s must have the same count of elements.
858    ///
859    /// This function must be called on all non-root processes.
860    ///
861    /// # Examples
862    ///
863    /// See `examples/scatter.rs`
864    ///
865    /// # Standard section(s)
866    ///
867    /// 5.6
868    fn scatter_into<R: ?Sized>(&self, recvbuf: &mut R)
869    where
870        R: BufferMut,
871    {
872        assert_ne!(self.as_communicator().rank(), self.root_rank());
873        unsafe {
874            ffi::MPI_Scatter(
875                ptr::null(),
876                0,
877                u8::equivalent_datatype().as_raw(),
878                recvbuf.pointer_mut(),
879                recvbuf.count(),
880                recvbuf.as_datatype().as_raw(),
881                self.root_rank(),
882                self.as_communicator().as_raw(),
883            );
884        }
885    }
886
887    /// Scatter contents of a buffer on the root process to all processes.
888    ///
889    /// After the call completes each participating process will have received a part of the send
890    /// `Buffer` on the root process.
891    ///
892    /// All send `Buffer`s must have the same count of elements.
893    ///
894    /// This function must be called on the root process.
895    ///
896    /// # Examples
897    ///
898    /// See `examples/scatter.rs`
899    ///
900    /// # Standard section(s)
901    ///
902    /// 5.6
903    fn scatter_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
904    where
905        S: Buffer,
906        R: BufferMut,
907    {
908        assert_eq!(self.as_communicator().rank(), self.root_rank());
909        let sendcount = sendbuf.count() / self.as_communicator().target_size();
910        unsafe {
911            ffi::MPI_Scatter(
912                sendbuf.pointer(),
913                sendcount,
914                sendbuf.as_datatype().as_raw(),
915                recvbuf.pointer_mut(),
916                recvbuf.count(),
917                recvbuf.as_datatype().as_raw(),
918                self.root_rank(),
919                self.as_communicator().as_raw(),
920            );
921        }
922    }
923
924    /// Scatter contents of a buffer on the root process to all processes.
925    ///
926    /// After the call completes each participating process will have received a part of the send
927    /// `Buffer` on the root process.
928    ///
929    /// The send `Buffer` may contain different counts of elements for different processes. The
930    /// distribution of elements in the send `Buffer` is specified via `Partitioned`.
931    ///
932    /// This function must be called on all non-root processes.
933    ///
934    /// # Examples
935    ///
936    /// See `examples/scatter_varcount.rs`
937    ///
938    /// # Standard section(s)
939    ///
940    /// 5.6
941    fn scatter_varcount_into<R: ?Sized>(&self, recvbuf: &mut R)
942    where
943        R: BufferMut,
944    {
945        assert_ne!(self.as_communicator().rank(), self.root_rank());
946        unsafe {
947            ffi::MPI_Scatterv(
948                ptr::null(),
949                ptr::null(),
950                ptr::null(),
951                u8::equivalent_datatype().as_raw(),
952                recvbuf.pointer_mut(),
953                recvbuf.count(),
954                recvbuf.as_datatype().as_raw(),
955                self.root_rank(),
956                self.as_communicator().as_raw(),
957            );
958        }
959    }
960
961    /// Scatter contents of a buffer on the root process to all processes.
962    ///
963    /// After the call completes each participating process will have received a part of the send
964    /// `Buffer` on the root process.
965    ///
966    /// The send `Buffer` may contain different counts of elements for different processes. The
967    /// distribution of elements in the send `Buffer` is specified via `Partitioned`.
968    ///
969    /// This function must be called on the root process.
970    ///
971    /// # Examples
972    ///
973    /// See `examples/scatter_varcount.rs`
974    ///
975    /// # Standard section(s)
976    ///
977    /// 5.6
978    fn scatter_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
979    where
980        S: PartitionedBuffer,
981        R: BufferMut,
982    {
983        assert_eq!(self.as_communicator().rank(), self.root_rank());
984        unsafe {
985            ffi::MPI_Scatterv(
986                sendbuf.pointer(),
987                sendbuf.counts().as_ptr(),
988                sendbuf.displs().as_ptr(),
989                sendbuf.as_datatype().as_raw(),
990                recvbuf.pointer_mut(),
991                recvbuf.count(),
992                recvbuf.as_datatype().as_raw(),
993                self.root_rank(),
994                self.as_communicator().as_raw(),
995            );
996        }
997    }
998
999    /// Performs a global reduction under the operation `op` of the input data in `sendbuf` and
1000    /// stores the result on the `Root` process.
1001    ///
1002    /// This function must be called on all non-root processes.
1003    ///
1004    /// # Examples
1005    ///
1006    /// See `examples/reduce.rs`
1007    ///
1008    /// # Standard section(s)
1009    ///
1010    /// 5.9.1
1011    fn reduce_into<S: ?Sized, O>(&self, sendbuf: &S, op: O)
1012    where
1013        S: Buffer,
1014        O: Operation,
1015    {
1016        assert_ne!(self.as_communicator().rank(), self.root_rank());
1017        unsafe {
1018            ffi::MPI_Reduce(
1019                sendbuf.pointer(),
1020                ptr::null_mut(),
1021                sendbuf.count(),
1022                sendbuf.as_datatype().as_raw(),
1023                op.as_raw(),
1024                self.root_rank(),
1025                self.as_communicator().as_raw(),
1026            );
1027        }
1028    }
1029
1030    /// Performs a global reduction under the operation `op` of the input data in `sendbuf` and
1031    /// stores the result on the `Root` process.
1032    ///
1033    /// This function must be called on the root process.
1034    ///
1035    /// # Examples
1036    ///
1037    /// See `examples/reduce.rs`
1038    ///
1039    /// # Standard section(s)
1040    ///
1041    /// 5.9.1
1042    fn reduce_into_root<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
1043    where
1044        S: Buffer,
1045        R: BufferMut,
1046        O: Operation,
1047    {
1048        assert_eq!(self.as_communicator().rank(), self.root_rank());
1049        unsafe {
1050            ffi::MPI_Reduce(
1051                sendbuf.pointer(),
1052                recvbuf.pointer_mut(),
1053                sendbuf.count(),
1054                sendbuf.as_datatype().as_raw(),
1055                op.as_raw(),
1056                self.root_rank(),
1057                self.as_communicator().as_raw(),
1058            );
1059        }
1060    }
1061
1062    /// Initiate broadcast of a value from the `Root` process to all other processes.
1063    ///
1064    /// # Examples
1065    ///
1066    /// See `examples/immediate_broadcast.rs`
1067    ///
1068    /// # Standard section(s)
1069    ///
1070    /// 5.12.2
1071    fn immediate_broadcast_into<'a, Buf: ?Sized, Sc>(
1072        &self,
1073        scope: Sc,
1074        buf: &'a mut Buf,
1075    ) -> Request<'a, Buf, Sc>
1076    where
1077        Buf: 'a + BufferMut,
1078        Sc: Scope<'a>,
1079    {
1080        unsafe {
1081            Request::from_raw(
1082                with_uninitialized(|request| {
1083                    ffi::MPI_Ibcast(
1084                        buf.pointer_mut(),
1085                        buf.count(),
1086                        buf.as_datatype().as_raw(),
1087                        self.root_rank(),
1088                        self.as_communicator().as_raw(),
1089                        request,
1090                    )
1091                })
1092                .1,
1093                buf,
1094                scope,
1095            )
1096        }
1097    }
1098
1099    /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1100    ///
1101    /// This function must be called on all non-root processes.
1102    ///
1103    /// # Examples
1104    ///
1105    /// See `examples/immediate_gather.rs`
1106    ///
1107    /// # Standard section(s)
1108    ///
1109    /// 5.12.3
1110    fn immediate_gather_into<'a, S: ?Sized, Sc>(
1111        &self,
1112        scope: Sc,
1113        sendbuf: &'a S,
1114    ) -> Request<'a, S, Sc>
1115    where
1116        S: 'a + Buffer,
1117        Sc: Scope<'a>,
1118    {
1119        assert_ne!(self.as_communicator().rank(), self.root_rank());
1120        unsafe {
1121            Request::from_raw(
1122                with_uninitialized(|request| {
1123                    ffi::MPI_Igather(
1124                        sendbuf.pointer(),
1125                        sendbuf.count(),
1126                        sendbuf.as_datatype().as_raw(),
1127                        ptr::null_mut(),
1128                        0,
1129                        u8::equivalent_datatype().as_raw(),
1130                        self.root_rank(),
1131                        self.as_communicator().as_raw(),
1132                        request,
1133                    )
1134                })
1135                .1,
1136                sendbuf,
1137                scope,
1138            )
1139        }
1140    }
1141
1142    /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1143    ///
1144    /// This function must be called on the root processes.
1145    ///
1146    /// # Examples
1147    ///
1148    /// See `examples/immediate_gather.rs`
1149    ///
1150    /// # Standard section(s)
1151    ///
1152    /// 5.12.3
1153    fn immediate_gather_into_root<'a, S: ?Sized, R: ?Sized, Sc>(
1154        &self,
1155        scope: Sc,
1156        sendbuf: &'a S,
1157        recvbuf: &'a mut R,
1158    ) -> Request<'a, R, Sc>
1159    where
1160        S: 'a + Buffer,
1161        R: 'a + BufferMut,
1162        Sc: Scope<'a>,
1163    {
1164        assert_eq!(self.as_communicator().rank(), self.root_rank());
1165        unsafe {
1166            let recvcount = recvbuf.count() / self.as_communicator().target_size();
1167            Request::from_raw(
1168                with_uninitialized(|request| {
1169                    ffi::MPI_Igather(
1170                        sendbuf.pointer(),
1171                        sendbuf.count(),
1172                        sendbuf.as_datatype().as_raw(),
1173                        recvbuf.pointer_mut(),
1174                        recvcount,
1175                        recvbuf.as_datatype().as_raw(),
1176                        self.root_rank(),
1177                        self.as_communicator().as_raw(),
1178                        request,
1179                    )
1180                })
1181                .1,
1182                recvbuf,
1183                scope,
1184            )
1185        }
1186    }
1187
1188    /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1189    ///
1190    /// This function must be called on all non-root processes.
1191    ///
1192    /// # Examples
1193    ///
1194    /// See `examples/immediate_gather_varcount.rs`
1195    ///
1196    /// # Standard section(s)
1197    ///
1198    /// 5.12.3
1199    fn immediate_gather_varcount_into<'a, Sc, S: ?Sized>(
1200        &self,
1201        scope: Sc,
1202        sendbuf: &'a S,
1203    ) -> Request<'a, S, Sc>
1204    where
1205        S: 'a + Buffer,
1206        Sc: Scope<'a>,
1207    {
1208        assert_ne!(self.as_communicator().rank(), self.root_rank());
1209        unsafe {
1210            Request::from_raw(
1211                with_uninitialized(|request| {
1212                    ffi::MPI_Igatherv(
1213                        sendbuf.pointer(),
1214                        sendbuf.count(),
1215                        sendbuf.as_datatype().as_raw(),
1216                        ptr::null_mut(),
1217                        ptr::null(),
1218                        ptr::null(),
1219                        u8::equivalent_datatype().as_raw(),
1220                        self.root_rank(),
1221                        self.as_communicator().as_raw(),
1222                        request,
1223                    )
1224                })
1225                .1,
1226                sendbuf,
1227                scope,
1228            )
1229        }
1230    }
1231
1232    /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1233    ///
1234    /// This function must be called on the root processes.
1235    ///
1236    /// # Examples
1237    ///
1238    /// See `examples/immediate_gather_varcount.rs`
1239    ///
1240    /// # Standard section(s)
1241    ///
1242    /// 5.12.3
1243    fn immediate_gather_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1244        &self,
1245        scope: Sc,
1246        sendbuf: &'a S,
1247        recvbuf: &'a mut R,
1248    ) -> Request<'a, R, Sc>
1249    where
1250        S: 'a + Buffer,
1251        R: 'a + PartitionedBufferMut,
1252        Sc: Scope<'a>,
1253    {
1254        assert_eq!(self.as_communicator().rank(), self.root_rank());
1255        unsafe {
1256            Request::from_raw(
1257                with_uninitialized(|request| {
1258                    ffi::MPI_Igatherv(
1259                        sendbuf.pointer(),
1260                        sendbuf.count(),
1261                        sendbuf.as_datatype().as_raw(),
1262                        recvbuf.pointer_mut(),
1263                        recvbuf.counts().as_ptr(),
1264                        recvbuf.displs().as_ptr(),
1265                        recvbuf.as_datatype().as_raw(),
1266                        self.root_rank(),
1267                        self.as_communicator().as_raw(),
1268                        request,
1269                    )
1270                })
1271                .1,
1272                recvbuf,
1273                scope,
1274            )
1275        }
1276    }
1277
1278    /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1279    ///
1280    /// This function must be called on all non-root processes.
1281    ///
1282    /// # Examples
1283    ///
1284    /// See `examples/immediate_scatter.rs`
1285    ///
1286    /// # Standard section(s)
1287    ///
1288    /// 5.12.4
1289    fn immediate_scatter_into<'a, Sc, R: ?Sized>(
1290        &self,
1291        scope: Sc,
1292        recvbuf: &'a mut R,
1293    ) -> Request<'a, R, Sc>
1294    where
1295        R: 'a + BufferMut,
1296        Sc: Scope<'a>,
1297    {
1298        assert_ne!(self.as_communicator().rank(), self.root_rank());
1299        unsafe {
1300            Request::from_raw(
1301                with_uninitialized(|request| {
1302                    ffi::MPI_Iscatter(
1303                        ptr::null(),
1304                        0,
1305                        u8::equivalent_datatype().as_raw(),
1306                        recvbuf.pointer_mut(),
1307                        recvbuf.count(),
1308                        recvbuf.as_datatype().as_raw(),
1309                        self.root_rank(),
1310                        self.as_communicator().as_raw(),
1311                        request,
1312                    )
1313                })
1314                .1,
1315                recvbuf,
1316                scope,
1317            )
1318        }
1319    }
1320
1321    /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1322    ///
1323    /// This function must be called on the root processes.
1324    ///
1325    /// # Examples
1326    ///
1327    /// See `examples/immediate_scatter.rs`
1328    ///
1329    /// # Standard section(s)
1330    ///
1331    /// 5.12.4
1332    fn immediate_scatter_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1333        &self,
1334        scope: Sc,
1335        sendbuf: &'a S,
1336        recvbuf: &'a mut R,
1337    ) -> Request<'a, R, Sc>
1338    where
1339        S: 'a + Buffer,
1340        R: 'a + BufferMut,
1341        Sc: Scope<'a>,
1342    {
1343        assert_eq!(self.as_communicator().rank(), self.root_rank());
1344        unsafe {
1345            let sendcount = sendbuf.count() / self.as_communicator().target_size();
1346            Request::from_raw(
1347                with_uninitialized(|request| {
1348                    ffi::MPI_Iscatter(
1349                        sendbuf.pointer(),
1350                        sendcount,
1351                        sendbuf.as_datatype().as_raw(),
1352                        recvbuf.pointer_mut(),
1353                        recvbuf.count(),
1354                        recvbuf.as_datatype().as_raw(),
1355                        self.root_rank(),
1356                        self.as_communicator().as_raw(),
1357                        request,
1358                    )
1359                })
1360                .1,
1361                recvbuf,
1362                scope,
1363            )
1364        }
1365    }
1366
1367    /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1368    ///
1369    /// This function must be called on all non-root processes.
1370    ///
1371    /// # Examples
1372    ///
1373    /// See `examples/immediate_scatter_varcount.rs`
1374    ///
1375    /// # Standard section(s)
1376    ///
1377    /// 5.12.4
1378    fn immediate_scatter_varcount_into<'a, Sc, R: ?Sized>(
1379        &self,
1380        scope: Sc,
1381        recvbuf: &'a mut R,
1382    ) -> Request<'a, R, Sc>
1383    where
1384        R: 'a + BufferMut,
1385        Sc: Scope<'a>,
1386    {
1387        assert_ne!(self.as_communicator().rank(), self.root_rank());
1388        unsafe {
1389            Request::from_raw(
1390                with_uninitialized(|request| {
1391                    ffi::MPI_Iscatterv(
1392                        ptr::null(),
1393                        ptr::null(),
1394                        ptr::null(),
1395                        u8::equivalent_datatype().as_raw(),
1396                        recvbuf.pointer_mut(),
1397                        recvbuf.count(),
1398                        recvbuf.as_datatype().as_raw(),
1399                        self.root_rank(),
1400                        self.as_communicator().as_raw(),
1401                        request,
1402                    )
1403                })
1404                .1,
1405                recvbuf,
1406                scope,
1407            )
1408        }
1409    }
1410
1411    /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1412    ///
1413    /// This function must be called on the root processes.
1414    ///
1415    /// # Examples
1416    ///
1417    /// See `examples/immediate_scatter_varcount.rs`
1418    ///
1419    /// # Standard section(s)
1420    ///
1421    /// 5.12.4
1422    fn immediate_scatter_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1423        &self,
1424        scope: Sc,
1425        sendbuf: &'a S,
1426        recvbuf: &'a mut R,
1427    ) -> Request<'a, R, Sc>
1428    where
1429        S: 'a + PartitionedBuffer,
1430        R: 'a + BufferMut,
1431        Sc: Scope<'a>,
1432    {
1433        assert_eq!(self.as_communicator().rank(), self.root_rank());
1434        unsafe {
1435            Request::from_raw(
1436                with_uninitialized(|request| {
1437                    ffi::MPI_Iscatterv(
1438                        sendbuf.pointer(),
1439                        sendbuf.counts().as_ptr(),
1440                        sendbuf.displs().as_ptr(),
1441                        sendbuf.as_datatype().as_raw(),
1442                        recvbuf.pointer_mut(),
1443                        recvbuf.count(),
1444                        recvbuf.as_datatype().as_raw(),
1445                        self.root_rank(),
1446                        self.as_communicator().as_raw(),
1447                        request,
1448                    )
1449                })
1450                .1,
1451                recvbuf,
1452                scope,
1453            )
1454        }
1455    }
1456
1457    /// Initiates a non-blacking global reduction under the operation `op` of the input data in
1458    /// `sendbuf` and stores the result on the `Root` process.
1459    ///
1460    /// This function must be called on all non-root processes.
1461    ///
1462    /// # Examples
1463    ///
1464    /// See `examples/immediate_reduce.rs`
1465    ///
1466    /// # Standard section(s)
1467    ///
1468    /// 5.12.7
1469    fn immediate_reduce_into<'a, Sc, S: ?Sized, O>(
1470        &self,
1471        scope: Sc,
1472        sendbuf: &'a S,
1473        op: O,
1474    ) -> Request<'a, S, Sc>
1475    where
1476        S: 'a + Buffer,
1477        O: 'a + Operation,
1478        Sc: Scope<'a>,
1479    {
1480        assert_ne!(self.as_communicator().rank(), self.root_rank());
1481        unsafe {
1482            Request::from_raw(
1483                with_uninitialized(|request| {
1484                    ffi::MPI_Ireduce(
1485                        sendbuf.pointer(),
1486                        ptr::null_mut(),
1487                        sendbuf.count(),
1488                        sendbuf.as_datatype().as_raw(),
1489                        op.as_raw(),
1490                        self.root_rank(),
1491                        self.as_communicator().as_raw(),
1492                        request,
1493                    )
1494                })
1495                .1,
1496                sendbuf,
1497                scope,
1498            )
1499        }
1500    }
1501
1502    /// Initiates a non-blocking global reduction under the operation `op` of the input data in
1503    /// `sendbuf` and stores the result on the `Root` process.
1504    ///
1505    /// # Examples
1506    ///
1507    /// See `examples/immediate_reduce.rs`
1508    ///
1509    /// This function must be called on the root process.
1510    ///
1511    /// # Standard section(s)
1512    ///
1513    /// 5.12.7
1514    fn immediate_reduce_into_root<'a, Sc, S: ?Sized, R: ?Sized, O>(
1515        &self,
1516        scope: Sc,
1517        sendbuf: &'a S,
1518        recvbuf: &'a mut R,
1519        op: O,
1520    ) -> Request<'a, R, Sc>
1521    where
1522        S: 'a + Buffer,
1523        R: 'a + BufferMut,
1524        O: 'a + Operation,
1525        Sc: Scope<'a>,
1526    {
1527        assert_eq!(self.as_communicator().rank(), self.root_rank());
1528        unsafe {
1529            Request::from_raw(
1530                with_uninitialized(|request| {
1531                    ffi::MPI_Ireduce(
1532                        sendbuf.pointer(),
1533                        recvbuf.pointer_mut(),
1534                        sendbuf.count(),
1535                        sendbuf.as_datatype().as_raw(),
1536                        op.as_raw(),
1537                        self.root_rank(),
1538                        self.as_communicator().as_raw(),
1539                        request,
1540                    )
1541                })
1542                .1,
1543                recvbuf,
1544                scope,
1545            )
1546        }
1547    }
1548
1549    /// Spawns child processes
1550    ///
1551    /// # Standard sections
1552    /// 10.3.2, see MPI_Comm_spawn
1553    fn spawn(&self, command: &Command, maxprocs: Rank) -> Result<InterCommunicator, MpiError> {
1554        // Environment variables can be handled using the info key
1555        assert_eq!(
1556            command.get_envs().len(),
1557            0,
1558            "Support for environment variables not yet implemented"
1559        );
1560
1561        // The Microsoft-MPI implementation treats the char* arguments as being
1562        // encoded in utf8, and are internally converted to Windows wide
1563        // characters. See ConvertArgs() using
1564        // [`MultiByteToWideChar`](https://learn.microsoft.com/en-us/windows/win32/api/stringapiset/nf-stringapiset-multibytetowidechar)
1565        // with `CP_UTF8` when called from MPIDI_Comm_spawn_multiple().
1566        // https://github.com/microsoft/Microsoft-MPI/blob/7ff6bdcdb1d5dc7b791e47457ee2686cd6b3d355/src/mpi/msmpi/mpid/dynamic.cpp#L2074
1567        //
1568        // Since Windows wide-char strings are allowed to contain invalid
1569        // UTF-16, such characters cannot be preserved. We'll choose lossy
1570        // conversion, but returning an error is an option to consider.
1571        let prog = CString::new(command.get_program().to_string_lossy().as_bytes())?;
1572        let mut args: Vec<CString> = command
1573            .get_args()
1574            .map(|os| CString::new(os.to_string_lossy().as_bytes()))
1575            .collect::<Result<Vec<CString>, NulError>>()?;
1576        // We must retain args above so that the strings are not dropped while
1577        // being used. An alternative that seems to be recommended any time the
1578        // function takes mutable C strings is to use CString::into_raw to give
1579        // ownership to the *mut c_char for the function call, then reclaim
1580        // using CString::from_raw.
1581        let mut argv: Vec<*mut c_char> = args
1582            .iter_mut()
1583            .map(|s| s.as_ptr() as *mut c_char)
1584            .chain(std::iter::once(ptr::null_mut()))
1585            .collect();
1586
1587        let mut result = unsafe { ffi::RSMPI_COMM_NULL };
1588        let mut errcodes: Vec<c_int> =
1589            vec![0; maxprocs.value_as().expect("maxprocs should be positive")];
1590
1591        unsafe {
1592            ffi::MPI_Comm_spawn(
1593                prog.as_ptr(),
1594                argv.as_mut_ptr(),
1595                maxprocs,
1596                ffi::RSMPI_INFO_NULL,
1597                self.root_rank(),
1598                self.as_communicator().as_raw(),
1599                &mut result,
1600                errcodes.as_mut_ptr(),
1601            );
1602        }
1603        let fails = errcodes
1604            .into_iter()
1605            .filter(|&c| c != ffi::MPI_SUCCESS as i32)
1606            .count();
1607        if fails > 0 {
1608            Err(MpiError::Spawn(Rank::try_from(fails).unwrap(), maxprocs))
1609        } else {
1610            Ok(unsafe { InterCommunicator::from_raw(result) })
1611        }
1612    }
1613
1614    /// Spawns child processes
1615    ///
1616    /// # Standard sections
1617    /// 10.3.3, see MPI_Comm_spawn_multiple
1618    fn spawn_multiple(
1619        &self,
1620        commands: &[Command],
1621        maxprocs: &[Rank],
1622    ) -> Result<InterCommunicator, MpiError> {
1623        assert_eq!(commands.len(), maxprocs.len());
1624
1625        let progs = commands
1626            .iter()
1627            .map(|c| CString::new(c.get_program().to_string_lossy().as_bytes()))
1628            .collect::<Result<Vec<CString>, NulError>>()?;
1629        let mut progp: Vec<*mut c_char> = progs.iter().map(|p| p.as_ptr() as *mut c_char).collect();
1630        let mut argss = commands
1631            .iter()
1632            .map(|c| {
1633                c.get_args()
1634                    .map(|os| CString::new(os.to_string_lossy().as_bytes()))
1635                    .collect::<Result<Vec<CString>, NulError>>()
1636            })
1637            .collect::<Result<Vec<Vec<CString>>, NulError>>()?;
1638        let mut argvs: Vec<Vec<*mut c_char>> = argss
1639            .iter_mut()
1640            .map(|args| {
1641                args.iter_mut()
1642                    .map(|a| a.as_ptr() as *mut c_char)
1643                    .chain(std::iter::once(ptr::null_mut()))
1644                    .collect()
1645            })
1646            .collect();
1647
1648        let mut argvv: Vec<*mut *mut c_char> =
1649            argvs.iter_mut().map(|argv| argv.as_mut_ptr()).collect();
1650
1651        let infos: Vec<_> = (0..commands.len())
1652            .map(|_| unsafe { ffi::RSMPI_INFO_NULL })
1653            .collect();
1654
1655        let mut result = unsafe { ffi::RSMPI_COMM_NULL };
1656        let sum_maxprocs: Rank = maxprocs.iter().sum();
1657        let mut errcodes = vec![0; usize::try_from(sum_maxprocs).unwrap()];
1658
1659        unsafe {
1660            ffi::MPI_Comm_spawn_multiple(
1661                progs.len().value_as().unwrap(),
1662                progp.as_mut_ptr(),
1663                argvv.as_mut_ptr(),
1664                maxprocs.as_ptr(),
1665                infos.as_ptr(),
1666                self.root_rank(),
1667                self.as_communicator().as_raw(),
1668                &mut result,
1669                errcodes.as_mut_ptr(),
1670            );
1671        }
1672        let fails = errcodes
1673            .into_iter()
1674            .filter(|&c| c != ffi::MPI_SUCCESS as i32)
1675            .count();
1676        if fails > 0 {
1677            Err(MpiError::Spawn(
1678                Rank::try_from(fails).unwrap(),
1679                sum_maxprocs,
1680            ))
1681        } else {
1682            Ok(unsafe { InterCommunicator::from_raw(result) })
1683        }
1684    }
1685}
1686
1687impl<'a> Root for Process<'a> {
1688    fn root_rank(&self) -> Rank {
1689        self.rank()
1690    }
1691}
1692
1693/// An operation to be used in a reduction or scan type operation, e.g. `MPI_SUM`
1694pub trait Operation: AsRaw<Raw = MPI_Op> {
1695    /// Returns whether the operation is commutative.
1696    ///
1697    /// # Standard section(s)
1698    ///
1699    /// 5.9.7
1700    fn is_commutative(&self) -> bool {
1701        unsafe {
1702            let mut commute = 0;
1703            ffi::MPI_Op_commutative(self.as_raw(), &mut commute);
1704            commute != 0
1705        }
1706    }
1707}
1708impl<'a, T: 'a + Operation> Operation for &'a T {}
1709
1710/// A built-in operation like `MPI_SUM`
1711///
1712/// # Examples
1713///
1714/// See `examples/reduce.rs`
1715///
1716/// # Standard section(s)
1717///
1718/// 5.9.2
1719#[derive(Copy, Clone)]
1720pub struct SystemOperation(MPI_Op);
1721
/// Expands each `name => handle` pair into `pub fn name() -> SystemOperation` wrapping the given
/// predefined `MPI_Op` handle.
macro_rules! system_operation_constructors {
    ($($name:ident => $handle:path),*) => {
        $(
            pub fn $name() -> SystemOperation {
                //! A built-in operation
                SystemOperation(unsafe { $handle })
            }
        )*
    };
}
1730
1731impl SystemOperation {
1732    system_operation_constructors! {
1733        max => ffi::RSMPI_MAX,
1734        min => ffi::RSMPI_MIN,
1735        sum => ffi::RSMPI_SUM,
1736        product => ffi::RSMPI_PROD,
1737        logical_and => ffi::RSMPI_LAND,
1738        bitwise_and => ffi::RSMPI_BAND,
1739        logical_or => ffi::RSMPI_LOR,
1740        bitwise_or => ffi::RSMPI_BOR,
1741        logical_xor => ffi::RSMPI_LXOR,
1742        bitwise_xor => ffi::RSMPI_BXOR
1743    }
1744}
1745
1746unsafe impl AsRaw for SystemOperation {
1747    type Raw = MPI_Op;
1748    fn as_raw(&self) -> Self::Raw {
1749        self.0
1750    }
1751}
1752
1753impl Operation for SystemOperation {}
1754
/// Marker implemented by every sized type, so that any value can be stored as
/// `Box<dyn Erased>` solely to keep it alive until the owner is dropped.
#[cfg(feature = "user-operations")]
trait Erased {}

#[cfg(feature = "user-operations")]
impl<T> Erased for T where T: Sized {}
1760
/// A user-defined operation.
///
/// The lifetime `'a` of the operation is limited by the lifetime of the underlying closure.
///
/// For safety reasons, `UserOperation` is not in and of itself considered an `Operation`, but a
/// reference to it is. This limitation may be lifted in the future when `Request` objects can
/// store finalizers.
///
/// **Note:** When a `UserOperation` is passed to a non-blocking API call, it must outlive the
/// completion of the request.  This is normally enforced by the safe API, so this is only a concern
/// if you use the unsafe API.  Do not rely on MPI's internal reference-counting here, because once
/// `UserOperation` is destroyed, the closure object will be deallocated even if the `MPI_Op` handle
/// is still alive due to outstanding references.
///
/// # Examples
///
/// See `examples/reduce.rs` and `examples/immediate_reduce.rs`
#[cfg(feature = "user-operations")]
pub struct UserOperation<'a> {
    /// Raw operation handle; released with `MPI_Op_free` when the value is dropped.
    op: MPI_Op,
    _anchor: Box<dyn Erased + 'a>, // keeps the internal data alive
}
1783
#[cfg(feature = "user-operations")]
impl<'a> fmt::Debug for UserOperation<'a> {
    /// Shows only the raw `MPI_Op` handle; the anchored closure data is opaque.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut tuple = f.debug_tuple("UserOperation");
        tuple.field(&self.op);
        tuple.finish()
    }
}
1790
#[cfg(feature = "user-operations")]
impl<'a> Drop for UserOperation<'a> {
    fn drop(&mut self) {
        // Release the MPI handle here; the anchor field (closure data) is dropped afterwards,
        // together with the other fields.
        let handle: *mut MPI_Op = &mut self.op;
        unsafe {
            ffi::MPI_Op_free(handle);
        }
    }
}
1799
#[cfg(feature = "user-operations")]
unsafe impl<'a> AsRaw for UserOperation<'a> {
    type Raw = MPI_Op;
    fn as_raw(&self) -> Self::Raw {
        let UserOperation { op, .. } = self;
        *op
    }
}
1807
/// Only borrowed `UserOperation`s count as `Operation`s (see the type's documentation for the
/// safety rationale).
#[cfg(feature = "user-operations")]
impl<'op, 'r> Operation for &'r UserOperation<'op> {}
1810
#[cfg(feature = "user-operations")]
impl<'a> UserOperation<'a> {
    /// Define an operation using a closure.  The operation must be associative.
    ///
    /// This is a more readable shorthand for the `new` method.  Refer to [`new`](#method.new) for
    /// more information.
    ///
    /// Equivalent to `UserOperation::new(false, function)`: the operation is registered as
    /// non-commutative.
    pub fn associative<F>(function: F) -> Self
    where
        F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
    {
        Self::new(false, function)
    }

    /// Define an operation using a closure.  The operation must be both associative and
    /// commutative.
    ///
    /// This is a more readable shorthand for the `new` method.  Refer to [`new`](#method.new) for
    /// more information.
    ///
    /// Equivalent to `UserOperation::new(true, function)`.
    pub fn commutative<F>(function: F) -> Self
    where
        F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
    {
        Self::new(true, function)
    }

    /// Creates an associative and possibly commutative operation using a closure.
    ///
    /// The closure receives two arguments `invec` and `inoutvec` as dynamically typed buffers.  It
    /// shall set `inoutvec` to the value of `f(invec, inoutvec)`, where `f` is a binary associative
    /// operation.
    ///
    /// If the operation is also commutative, setting `commute` to `true` may yield performance
    /// benefits.
    ///
    /// **Note:** If the closure panics, the entire program will abort.
    ///
    /// Internally a libffi closure is generated whose C entry point forwards MPI's four raw
    /// user-function arguments to `function`. That entry point is registered with
    /// `MPI_Op_create`, and both closures are kept alive in the returned value's anchor.
    ///
    /// # Standard section(s)
    ///
    /// 5.9.5
    pub fn new<F>(commute: bool, function: F) -> Self
    where
        F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
    {
        // Owns both the user's closure and the libffi closure that points at it, so the two are
        // kept alive (and dropped) together.
        struct ClosureAnchor<F> {
            rust_closure: F,
            _ffi_closure: Option<Closure<'static>>,
        }

        // must box it to prevent moves
        // (the libffi closure below stores a pointer to `rust_closure`, so that address must stay
        // fixed when `anchor` is later moved into the returned `UserOperation`)
        let mut anchor = Box::new(ClosureAnchor {
            rust_closure: function,
            _ffi_closure: None,
        });

        // C signature of the MPI user function: (invec, inoutvec, len, datatype) -> void.
        let args = [
            Type::pointer(), // void *
            Type::pointer(), // void *
            Type::pointer(), // int32_t *
            Type::pointer(), // MPI_Datatype *
        ];
        #[allow(unused_mut)]
        let mut cif = Cif::new(args.iter().cloned(), Type::void());
        // MS-MPI uses "stdcall" calling convention on 32-bit x86
        #[cfg(all(msmpi, target_arch = "x86"))]
        cif.set_abi(libffi::raw::ffi_abi_FFI_STDCALL);

        // Entry point invoked through libffi when MPI calls the user function. Each element of
        // `args` points at one C argument; they are unpacked and handed to the Rust closure as
        // dynamically typed buffers.
        unsafe extern "C" fn trampoline<'a, F: Fn(DynBuffer, DynBufferMut) + Sync + 'a>(
            cif: &libffi::low::ffi_cif,
            _result: &mut c_void,
            args: *const *const c_void,
            user_function: &F,
        ) {
            debug_assert_eq!(4, cif.nargs);

            let (mut invec, mut inoutvec, len, datatype) = (
                *(*args.offset(0) as *const *mut c_void),
                *(*args.offset(1) as *const *mut c_void),
                *(*args.offset(2) as *const *mut i32),
                *(*args.offset(3) as *const *mut ffi::MPI_Datatype),
            );

            let len = *len;
            let datatype = DatatypeRef::from_raw(*datatype);
            if len == 0 {
                // precautionary measure: ensure pointers are not null
                // (NOTE(review): presumably because the DynBuffer constructors expect non-null
                // pointers even for empty buffers — confirm)
                invec = [].as_mut_ptr();
                inoutvec = [].as_mut_ptr();
            }

            user_function(
                DynBuffer::from_raw(invec, len, datatype),
                DynBufferMut::from_raw(inoutvec, len, datatype),
            )
        }

        // Build the libffi closure, register its code pointer with MPI, and store the closure in
        // the anchor so it lives exactly as long as the returned `UserOperation`.
        let op;
        anchor._ffi_closure = Some(unsafe {
            let ffi_closure = Closure::new(cif, trampoline, &anchor.rust_closure);
            op = with_uninitialized(|op| {
                ffi::MPI_Op_create(Some(*ffi_closure.instantiate_code_ptr()), commute as _, op)
            })
            .1;
            mem::transmute(ffi_closure) // erase the lifetime
        });
        UserOperation {
            op,
            _anchor: anchor,
        }
    }

    /// Creates a `UserOperation` from raw parts.
    ///
    /// Here, `anchor` is an arbitrary object that is stored alongside the `MPI_Op`.
    /// This can be used to attach finalizers to the object.
    ///
    /// # Safety
    /// MPI_Op must not be MPI_OP_NULL
    ///
    /// The returned value takes ownership of `op`: it is released with `MPI_Op_free` when the
    /// `UserOperation` is dropped, so the caller must not free it separately.
    pub unsafe fn from_raw<T: 'a>(op: MPI_Op, anchor: Box<T>) -> Self {
        Self {
            op,
            _anchor: anchor,
        }
    }
}
1935
1936/// An unsafe user-defined operation.
1937///
1938/// Unsafe user-defined operations are created from pointers to functions that have the unsafe
1939/// signatures of user functions defined in the MPI C bindings, `UnsafeUserFunction`.
1940///
1941/// The recommended way to create user-defined operations is through the safer `UserOperation`
1942/// type. This type can be used as a work-around in situations where the `libffi` dependency is not
1943/// available.
1944pub struct UnsafeUserOperation {
1945    op: MPI_Op,
1946}
1947
1948impl fmt::Debug for UnsafeUserOperation {
1949    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1950        f.debug_tuple("UnsafeUserOperation")
1951            .field(&self.op)
1952            .finish()
1953    }
1954}
1955
1956impl Drop for UnsafeUserOperation {
1957    fn drop(&mut self) {
1958        unsafe {
1959            ffi::MPI_Op_free(&mut self.op);
1960        }
1961    }
1962}
1963
1964unsafe impl AsRaw for UnsafeUserOperation {
1965    type Raw = MPI_Op;
1966    fn as_raw(&self) -> Self::Raw {
1967        self.op
1968    }
1969}
1970
1971impl<'a> Operation for &'a UnsafeUserOperation {}
1972
1973/// A raw pointer to a function that can be used to define an `UnsafeUserOperation`.
1974#[cfg(not(all(msmpi, target_arch = "x86")))]
1975pub type UnsafeUserFunction =
1976    unsafe extern "C" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
1977
1978/// A raw pointer to a function that can be used to define an `UnsafeUserOperation`.
1979///
1980/// MS-MPI uses "stdcall" rather than "C" calling convention. "stdcall" is ignored on x86_64
1981/// Windows and the default calling convention is used instead.
1982#[cfg(all(msmpi, target_arch = "x86"))]
1983pub type UnsafeUserFunction =
1984    unsafe extern "stdcall" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
1985
1986impl UnsafeUserOperation {
1987    /// Define an unsafe operation using a function pointer. The operation must be associative.
1988    ///
1989    /// This is a more readable shorthand for the `new` method.  Refer to [`new`](#method.new) for
1990    /// more information.
1991    ///
1992    /// # Safety
1993    /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
1994    /// in all reductions that this `UnsafeUserOperation` is used in.
1995    pub unsafe fn associative(function: UnsafeUserFunction) -> Self {
1996        Self::new(false, function)
1997    }
1998
1999    /// Define an unsafe operation using a function pointer.  The operation must be both associative
2000    /// and commutative.
2001    ///
2002    /// This is a more readable shorthand for the `new` method.  Refer to [`new`](#method.new) for
2003    /// more information.
2004    ///
2005    /// # Safety
2006    /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
2007    /// in all reductions that this `UnsafeUserOperation` is used in.
2008    pub unsafe fn commutative(function: UnsafeUserFunction) -> Self {
2009        Self::new(true, function)
2010    }
2011
2012    /// Creates an associative and possibly commutative unsafe operation using a function pointer.
2013    ///
2014    /// The function receives raw `*mut c_void` as `invec` and `inoutvec` and the number of elemnts
2015    /// of those two vectors as a `*mut c_int` `len`. It shall set `inoutvec`
2016    /// to the value of `f(invec, inoutvec)`, where `f` is a binary associative operation.
2017    ///
2018    /// If the operation is also commutative, setting `commute` to `true` may yield performance
2019    /// benefits.
2020    ///
2021    /// **Note:** The user function is not allowed to panic.
2022    ///
2023    /// # Standard section(s)
2024    ///
2025    /// 5.9.5
2026    ///
2027    /// # Safety
2028    /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
2029    /// in all reductions that this `UnsafeUserOperation` is used in.
2030    pub unsafe fn new(commute: bool, function: UnsafeUserFunction) -> Self {
2031        UnsafeUserOperation {
2032            op: with_uninitialized(|op| ffi::MPI_Op_create(Some(function), commute as _, op)).1,
2033        }
2034    }
2035}
2036
2037/// Perform a local reduction.
2038///
2039/// # Examples
2040///
2041/// See `examples/reduce.rs`
2042///
2043/// # Standard section(s)
2044///
2045/// 5.9.7
2046#[allow(clippy::needless_pass_by_value)]
2047pub fn reduce_local_into<S: ?Sized, R: ?Sized, O>(inbuf: &S, inoutbuf: &mut R, op: O)
2048where
2049    S: Buffer,
2050    R: BufferMut,
2051    O: Operation,
2052{
2053    unsafe {
2054        ffi::MPI_Reduce_local(
2055            inbuf.pointer(),
2056            inoutbuf.pointer_mut(),
2057            inbuf.count(),
2058            inbuf.as_datatype().as_raw(),
2059            op.as_raw(),
2060        );
2061    }
2062}