mpi_fork_fnsp/
collective.rs

1//! Collective communication
2//!
//! This module is still under development.
4//!
5//! # Unfinished features
6//!
7//! - **5.8**: All-to-all, `MPI_Alltoallw()`
8//! - **5.10**: Reduce-scatter, `MPI_Reduce_scatter()`
9//! - **5.12**: Nonblocking collective operations,
10//! `MPI_Ialltoallw()`, `MPI_Ireduce_scatter()`
11#![allow(
12    clippy::missing_panics_doc,
13    clippy::ptr_as_ptr,
14    clippy::used_underscore_binding
15)]
16
17#[cfg(feature = "user-operations")]
18use std::mem;
19use std::os::raw::{c_int, c_void};
20use std::{fmt, ptr};
21
22#[cfg(feature = "user-operations")]
23use libffi::middle::{Cif, Closure, Type};
24
25use crate::ffi;
26use crate::ffi::MPI_Op;
27
28use crate::datatype::traits::{
29    Buffer, BufferMut, Equivalence, PartitionedBuffer, PartitionedBufferMut,
30};
31#[cfg(feature = "user-operations")]
32use crate::datatype::{DatatypeRef, DynBuffer, DynBufferMut};
33use crate::raw::traits::{AsRaw, FromRaw};
34use crate::request::{Request, Scope, StaticScope};
35use crate::topology::traits::{AsCommunicator, Communicator};
36use crate::topology::{Process, Rank};
37use crate::with_uninitialized;
38
39/// Collective communication traits
40pub mod traits {
41    pub use super::{CommunicatorCollectives, Operation, Root};
42}
43
44/// Collective communication patterns defined on `Communicator`s
45pub trait CommunicatorCollectives: Communicator {
46    /// Barrier synchronization among all processes in a `Communicator`
47    ///
48    /// Partake in a barrier synchronization across all processes in the `Communicator` `&self`.
49    ///
50    /// Calling processes (or threads within the calling processes) will enter the barrier and block
51    /// execution until all processes in the `Communicator` `&self` have entered the barrier.
52    ///
53    /// # Examples
54    ///
55    /// See `examples/barrier.rs`
56    ///
57    /// # Standard section(s)
58    ///
59    /// 5.3
60    fn barrier(&self) {
61        unsafe {
62            ffi::MPI_Barrier(self.as_raw());
63        }
64    }
65
66    /// Gather contents of buffers on all participating processes.
67    ///
68    /// After the call completes, the contents of the send `Buffer`s on all processes will be
69    /// concatenated into the receive `Buffer`s on all ranks.
70    ///
71    /// All send `Buffer`s must contain the same count of elements.
72    ///
73    /// # Examples
74    ///
75    /// See `examples/all_gather.rs`
76    ///
77    /// # Standard section(s)
78    ///
79    /// 5.7
80    fn all_gather_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
81    where
82        S: Buffer,
83        R: BufferMut,
84    {
85        unsafe {
86            ffi::MPI_Allgather(
87                sendbuf.pointer(),
88                sendbuf.count(),
89                sendbuf.as_datatype().as_raw(),
90                recvbuf.pointer_mut(),
91                recvbuf.count() / self.size(),
92                recvbuf.as_datatype().as_raw(),
93                self.as_raw(),
94            );
95        }
96    }
97
98    /// Gather contents of buffers on all participating processes.
99    ///
100    /// After the call completes, the contents of the send `Buffer`s on all processes will be
101    /// concatenated into the receive `Buffer`s on all ranks.
102    ///
103    /// The send `Buffer`s may contain different counts of elements on different processes. The
104    /// distribution of elements in the receive `Buffer`s is specified via `Partitioned`.
105    ///
106    /// # Examples
107    ///
108    /// See `examples/all_gather_varcount.rs`
109    ///
110    /// # Standard section(s)
111    ///
112    /// 5.7
113    fn all_gather_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
114    where
115        S: Buffer,
116        R: PartitionedBufferMut,
117    {
118        unsafe {
119            ffi::MPI_Allgatherv(
120                sendbuf.pointer(),
121                sendbuf.count(),
122                sendbuf.as_datatype().as_raw(),
123                recvbuf.pointer_mut(),
124                recvbuf.counts().as_ptr(),
125                recvbuf.displs().as_ptr(),
126                recvbuf.as_datatype().as_raw(),
127                self.as_raw(),
128            );
129        }
130    }
131
132    /// Distribute the send `Buffer`s from all processes to the receive `Buffer`s on all processes.
133    ///
134    /// Each process sends and receives the same count of elements to and from each process.
135    ///
136    /// # Examples
137    ///
138    /// See `examples/all_to_all.rs`
139    ///
140    /// # Standard section(s)
141    ///
142    /// 5.8
143    fn all_to_all_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
144    where
145        S: Buffer,
146        R: BufferMut,
147    {
148        let c_size = self.size();
149        unsafe {
150            ffi::MPI_Alltoall(
151                sendbuf.pointer(),
152                sendbuf.count() / c_size,
153                sendbuf.as_datatype().as_raw(),
154                recvbuf.pointer_mut(),
155                recvbuf.count() / c_size,
156                recvbuf.as_datatype().as_raw(),
157                self.as_raw(),
158            );
159        }
160    }
161
162    /// Distribute the send `Buffer`s from all processes to the receive `Buffer`s on all processes.
163    ///
164    /// The count of elements to send and receive to and from each process can vary and is specified
165    /// using `Partitioned`.
166    ///
167    /// # Standard section(s)
168    ///
169    /// 5.8
170    fn all_to_all_varcount_into<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
171    where
172        S: PartitionedBuffer,
173        R: PartitionedBufferMut,
174    {
175        unsafe {
176            ffi::MPI_Alltoallv(
177                sendbuf.pointer(),
178                sendbuf.counts().as_ptr(),
179                sendbuf.displs().as_ptr(),
180                sendbuf.as_datatype().as_raw(),
181                recvbuf.pointer_mut(),
182                recvbuf.counts().as_ptr(),
183                recvbuf.displs().as_ptr(),
184                recvbuf.as_datatype().as_raw(),
185                self.as_raw(),
186            );
187        }
188    }
189
190    /// Performs a global reduction under the operation `op` of the input data in `sendbuf` and
191    /// stores the result in `recvbuf` on all processes.
192    ///
193    /// # Examples
194    ///
195    /// See `examples/reduce.rs`
196    ///
197    /// # Standard section(s)
198    ///
199    /// 5.9.6
200    fn all_reduce_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
201    where
202        S: Buffer,
203        R: BufferMut,
204        O: Operation,
205    {
206        unsafe {
207            ffi::MPI_Allreduce(
208                sendbuf.pointer(),
209                recvbuf.pointer_mut(),
210                sendbuf.count(),
211                sendbuf.as_datatype().as_raw(),
212                op.as_raw(),
213                self.as_raw(),
214            );
215        }
216    }
217
218    /// Performs an element-wise global reduction under the operation `op` of the input data in
219    /// `sendbuf` and scatters the result into equal sized blocks in the receive buffers on all
220    /// processes.
221    ///
222    /// # Examples
223    ///
224    /// See `examples/reduce.rs`
225    ///
226    /// # Standard section(s)
227    ///
228    /// 5.10.1
229    fn reduce_scatter_block_into<S: ?Sized, R: ?Sized, O>(
230        &self,
231        sendbuf: &S,
232        recvbuf: &mut R,
233        op: O,
234    ) where
235        S: Buffer,
236        R: BufferMut,
237        O: Operation,
238    {
239        assert_eq!(recvbuf.count() * self.size(), sendbuf.count());
240        unsafe {
241            ffi::MPI_Reduce_scatter_block(
242                sendbuf.pointer(),
243                recvbuf.pointer_mut(),
244                recvbuf.count(),
245                sendbuf.as_datatype().as_raw(),
246                op.as_raw(),
247                self.as_raw(),
248            );
249        }
250    }
251
252    /// Performs a global inclusive prefix reduction of the data in `sendbuf` into `recvbuf` under
253    /// operation `op`.
254    ///
255    /// # Examples
256    ///
257    /// See `examples/scan.rs`
258    ///
259    /// # Standard section(s)
260    ///
261    /// 5.11.1
262    fn scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
263    where
264        S: Buffer,
265        R: BufferMut,
266        O: Operation,
267    {
268        unsafe {
269            ffi::MPI_Scan(
270                sendbuf.pointer(),
271                recvbuf.pointer_mut(),
272                sendbuf.count(),
273                sendbuf.as_datatype().as_raw(),
274                op.as_raw(),
275                self.as_raw(),
276            );
277        }
278    }
279
280    /// Performs a global exclusive prefix reduction of the data in `sendbuf` into `recvbuf` under
281    /// operation `op`.
282    ///
283    /// # Examples
284    ///
285    /// See `examples/scan.rs`
286    ///
287    /// # Standard section(s)
288    ///
289    /// 5.11.2
290    fn exclusive_scan_into<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
291    where
292        S: Buffer,
293        R: BufferMut,
294        O: Operation,
295    {
296        unsafe {
297            ffi::MPI_Exscan(
298                sendbuf.pointer(),
299                recvbuf.pointer_mut(),
300                sendbuf.count(),
301                sendbuf.as_datatype().as_raw(),
302                op.as_raw(),
303                self.as_raw(),
304            );
305        }
306    }
307
308    /// Non-blocking barrier synchronization among all processes in a `Communicator`
309    ///
310    /// Calling processes (or threads within the calling processes) enter the barrier. Completion
311    /// methods on the associated request object will block until all processes have entered.
312    ///
313    /// # Examples
314    ///
315    /// See `examples/immediate_barrier.rs`
316    ///
317    /// # Standard section(s)
318    ///
319    /// 5.12.1
320    fn immediate_barrier(&self) -> Request<'static> {
321        unsafe {
322            Request::from_raw(
323                with_uninitialized(|request| ffi::MPI_Ibarrier(self.as_raw(), request)).1,
324                StaticScope,
325            )
326        }
327    }
328
    /// Initiate non-blocking gather of the contents of all `sendbuf`s into all `recvbuf`s on all
330    /// processes in the communicator.
331    ///
332    /// # Examples
333    ///
334    /// See `examples/immediate_all_gather.rs`
335    ///
336    /// # Standard section(s)
337    ///
338    /// 5.12.5
339    fn immediate_all_gather_into<'a, Sc, S: ?Sized, R: ?Sized>(
340        &self,
341        scope: Sc,
342        sendbuf: &'a S,
343        recvbuf: &'a mut R,
344    ) -> Request<'a, Sc>
345    where
346        S: 'a + Buffer,
347        R: 'a + BufferMut,
348        Sc: Scope<'a>,
349    {
350        unsafe {
351            let recvcount = recvbuf.count() / self.size();
352            Request::from_raw(
353                with_uninitialized(|request| {
354                    ffi::MPI_Iallgather(
355                        sendbuf.pointer(),
356                        sendbuf.count(),
357                        sendbuf.as_datatype().as_raw(),
358                        recvbuf.pointer_mut(),
359                        recvcount,
360                        recvbuf.as_datatype().as_raw(),
361                        self.as_raw(),
362                        request,
363                    )
364                })
365                .1,
366                scope,
367            )
368        }
369    }
370
    /// Initiate non-blocking gather of the contents of all `sendbuf`s into all `recvbuf`s on all
372    /// processes in the communicator.
373    ///
374    /// # Examples
375    ///
376    /// See `examples/immediate_all_gather_varcount.rs`
377    ///
378    /// # Standard section(s)
379    ///
380    /// 5.12.5
381    fn immediate_all_gather_varcount_into<'a, Sc, S: ?Sized, R: ?Sized>(
382        &self,
383        scope: Sc,
384        sendbuf: &'a S,
385        recvbuf: &'a mut R,
386    ) -> Request<'a, Sc>
387    where
388        S: 'a + Buffer,
389        R: 'a + PartitionedBufferMut,
390        Sc: Scope<'a>,
391    {
392        unsafe {
393            Request::from_raw(
394                with_uninitialized(|request| {
395                    ffi::MPI_Iallgatherv(
396                        sendbuf.pointer(),
397                        sendbuf.count(),
398                        sendbuf.as_datatype().as_raw(),
399                        recvbuf.pointer_mut(),
400                        recvbuf.counts().as_ptr(),
401                        recvbuf.displs().as_ptr(),
402                        recvbuf.as_datatype().as_raw(),
403                        self.as_raw(),
404                        request,
405                    )
406                })
407                .1,
408                scope,
409            )
410        }
411    }
412
413    /// Initiate non-blocking all-to-all communication.
414    ///
415    /// # Examples
416    ///
417    /// See `examples/immediate_all_to_all.rs`
418    ///
419    /// # Standard section(s)
420    ///
421    /// 5.12.6
422    fn immediate_all_to_all_into<'a, Sc, S: ?Sized, R: ?Sized>(
423        &self,
424        scope: Sc,
425        sendbuf: &'a S,
426        recvbuf: &'a mut R,
427    ) -> Request<'a, Sc>
428    where
429        S: 'a + Buffer,
430        R: 'a + BufferMut,
431        Sc: Scope<'a>,
432    {
433        let c_size = self.size();
434        unsafe {
435            Request::from_raw(
436                with_uninitialized(|request| {
437                    ffi::MPI_Ialltoall(
438                        sendbuf.pointer(),
439                        sendbuf.count() / c_size,
440                        sendbuf.as_datatype().as_raw(),
441                        recvbuf.pointer_mut(),
442                        recvbuf.count() / c_size,
443                        recvbuf.as_datatype().as_raw(),
444                        self.as_raw(),
445                        request,
446                    )
447                })
448                .1,
449                scope,
450            )
451        }
452    }
453
454    /// Initiate non-blocking all-to-all communication.
455    ///
456    /// # Standard section(s)
457    ///
458    /// 5.12.6
459    fn immediate_all_to_all_varcount_into<'a, Sc, S: ?Sized, R: ?Sized>(
460        &self,
461        scope: Sc,
462        sendbuf: &'a S,
463        recvbuf: &'a mut R,
464    ) -> Request<'a, Sc>
465    where
466        S: 'a + PartitionedBuffer,
467        R: 'a + PartitionedBufferMut,
468        Sc: Scope<'a>,
469    {
470        unsafe {
471            Request::from_raw(
472                with_uninitialized(|request| {
473                    ffi::MPI_Ialltoallv(
474                        sendbuf.pointer(),
475                        sendbuf.counts().as_ptr(),
476                        sendbuf.displs().as_ptr(),
477                        sendbuf.as_datatype().as_raw(),
478                        recvbuf.pointer_mut(),
479                        recvbuf.counts().as_ptr(),
480                        recvbuf.displs().as_ptr(),
481                        recvbuf.as_datatype().as_raw(),
482                        self.as_raw(),
483                        request,
484                    )
485                })
486                .1,
487                scope,
488            )
489        }
490    }
491
492    /// Initiates a non-blocking global reduction under the operation `op` of the input data in
493    /// `sendbuf` and stores the result in `recvbuf` on all processes.
494    ///
495    /// # Examples
496    ///
497    /// See `examples/immediate_reduce.rs`
498    ///
499    /// # Standard section(s)
500    ///
501    /// 5.12.8
502    fn immediate_all_reduce_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
503        &self,
504        scope: Sc,
505        sendbuf: &'a S,
506        recvbuf: &'a mut R,
507        op: O,
508    ) -> Request<'a, Sc>
509    where
510        S: 'a + Buffer,
511        R: 'a + BufferMut,
512        O: 'a + Operation,
513        Sc: Scope<'a>,
514    {
515        unsafe {
516            Request::from_raw(
517                with_uninitialized(|request| {
518                    ffi::MPI_Iallreduce(
519                        sendbuf.pointer(),
520                        recvbuf.pointer_mut(),
521                        sendbuf.count(),
522                        sendbuf.as_datatype().as_raw(),
523                        op.as_raw(),
524                        self.as_raw(),
525                        request,
526                    )
527                })
528                .1,
529                scope,
530            )
531        }
532    }
533
534    /// Initiates a non-blocking element-wise global reduction under the operation `op` of the
535    /// input data in `sendbuf` and scatters the result into equal sized blocks in the receive
536    /// buffers on all processes.
537    ///
538    /// # Examples
539    ///
540    /// See `examples/immediate_reduce.rs`
541    ///
542    /// # Standard section(s)
543    ///
544    /// 5.12.9
545    fn immediate_reduce_scatter_block_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
546        &self,
547        scope: Sc,
548        sendbuf: &'a S,
549        recvbuf: &'a mut R,
550        op: O,
551    ) -> Request<'a, Sc>
552    where
553        S: 'a + Buffer,
554        R: 'a + BufferMut,
555        O: 'a + Operation,
556        Sc: Scope<'a>,
557    {
558        assert_eq!(recvbuf.count() * self.size(), sendbuf.count());
559        unsafe {
560            Request::from_raw(
561                with_uninitialized(|request| {
562                    ffi::MPI_Ireduce_scatter_block(
563                        sendbuf.pointer(),
564                        recvbuf.pointer_mut(),
565                        recvbuf.count(),
566                        sendbuf.as_datatype().as_raw(),
567                        op.as_raw(),
568                        self.as_raw(),
569                        request,
570                    )
571                })
572                .1,
573                scope,
574            )
575        }
576    }
577
578    /// Initiates a non-blocking global inclusive prefix reduction of the data in `sendbuf` into
579    /// `recvbuf` under operation `op`.
580    ///
581    /// # Examples
582    ///
583    /// See `examples/immediate_scan.rs`
584    ///
585    /// # Standard section(s)
586    ///
587    /// 5.12.11
588    fn immediate_scan_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
589        &self,
590        scope: Sc,
591        sendbuf: &'a S,
592        recvbuf: &'a mut R,
593        op: O,
594    ) -> Request<'a, Sc>
595    where
596        S: 'a + Buffer,
597        R: 'a + BufferMut,
598        O: 'a + Operation,
599        Sc: Scope<'a>,
600    {
601        unsafe {
602            Request::from_raw(
603                with_uninitialized(|request| {
604                    ffi::MPI_Iscan(
605                        sendbuf.pointer(),
606                        recvbuf.pointer_mut(),
607                        sendbuf.count(),
608                        sendbuf.as_datatype().as_raw(),
609                        op.as_raw(),
610                        self.as_raw(),
611                        request,
612                    )
613                })
614                .1,
615                scope,
616            )
617        }
618    }
619
620    /// Initiates a non-blocking global exclusive prefix reduction of the data in `sendbuf` into
621    /// `recvbuf` under operation `op`.
622    ///
623    /// # Examples
624    ///
625    /// See `examples/immediate_scan.rs`
626    ///
627    /// # Standard section(s)
628    ///
629    /// 5.12.12
630    fn immediate_exclusive_scan_into<'a, Sc, S: ?Sized, R: ?Sized, O>(
631        &self,
632        scope: Sc,
633        sendbuf: &'a S,
634        recvbuf: &'a mut R,
635        op: O,
636    ) -> Request<'a, Sc>
637    where
638        S: 'a + Buffer,
639        R: 'a + BufferMut,
640        O: 'a + Operation,
641        Sc: Scope<'a>,
642    {
643        unsafe {
644            Request::from_raw(
645                with_uninitialized(|request| {
646                    ffi::MPI_Iexscan(
647                        sendbuf.pointer(),
648                        recvbuf.pointer_mut(),
649                        sendbuf.count(),
650                        sendbuf.as_datatype().as_raw(),
651                        op.as_raw(),
652                        self.as_raw(),
653                        request,
654                    )
655                })
656                .1,
657                scope,
658            )
659        }
660    }
661}
662
663impl<C: Communicator> CommunicatorCollectives for C {}
664
665/// Something that can take the role of 'root' in a collective operation.
666///
667/// Many collective operations define a 'root' process that takes a special role in the
668/// communication. These collective operations are implemented as default methods of this trait.
669pub trait Root: AsCommunicator {
670    /// Rank of the root process
    ///
    /// The default methods of this trait pass this value to MPI as the root argument and compare
    /// it with the rank of the calling process to assert that the matching root or non-root
    /// variant of an operation is being called.
671    fn root_rank(&self) -> Rank;
672
673    /// Broadcast of the contents of a buffer
674    ///
675    /// After the call completes, the `Buffer` on all processes in the `Communicator` of the `Root`
676    /// `&self` will contain what it contains on the `Root`.
677    ///
678    /// # Examples
679    ///
680    /// See `examples/broadcast.rs`
681    ///
682    /// # Standard section(s)
683    ///
684    /// 5.4
685    fn broadcast_into<Buf: ?Sized>(&self, buffer: &mut Buf)
686    where
687        Buf: BufferMut,
688    {
689        unsafe {
690            ffi::MPI_Bcast(
691                buffer.pointer_mut(),
692                buffer.count(),
693                buffer.as_datatype().as_raw(),
694                self.root_rank(),
695                self.as_communicator().as_raw(),
696            );
697        }
698    }
699
700    /// Gather contents of buffers on `Root`.
701    ///
702    /// After the call completes, the contents of the `Buffer`s on all ranks will be
703    /// concatenated into the `Buffer` on `Root`.
704    ///
705    /// All send `Buffer`s must have the same count of elements.
706    ///
707    /// This function must be called on all non-root processes.
708    ///
709    /// # Examples
710    ///
711    /// See `examples/gather.rs`
712    ///
713    /// # Standard section(s)
714    ///
715    /// 5.5
716    fn gather_into<S: ?Sized>(&self, sendbuf: &S)
717    where
718        S: Buffer,
719    {
720        assert_ne!(self.as_communicator().rank(), self.root_rank());
721        unsafe {
722            ffi::MPI_Gather(
723                sendbuf.pointer(),
724                sendbuf.count(),
725                sendbuf.as_datatype().as_raw(),
726                ptr::null_mut(),
727                0,
728                u8::equivalent_datatype().as_raw(),
729                self.root_rank(),
730                self.as_communicator().as_raw(),
731            );
732        }
733    }
734
735    /// Gather contents of buffers on `Root`.
736    ///
737    /// After the call completes, the contents of the `Buffer`s on all ranks will be
738    /// concatenated into the `Buffer` on `Root`.
739    ///
740    /// All send `Buffer`s must have the same count of elements.
741    ///
742    /// This function must be called on the root process.
743    ///
744    /// # Examples
745    ///
746    /// See `examples/gather.rs`
747    ///
748    /// # Standard section(s)
749    ///
750    /// 5.5
751    fn gather_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
752    where
753        S: Buffer,
754        R: BufferMut,
755    {
756        assert_eq!(self.as_communicator().rank(), self.root_rank());
757        unsafe {
758            let recvcount = recvbuf.count() / self.as_communicator().size();
759            ffi::MPI_Gather(
760                sendbuf.pointer(),
761                sendbuf.count(),
762                sendbuf.as_datatype().as_raw(),
763                recvbuf.pointer_mut(),
764                recvcount,
765                recvbuf.as_datatype().as_raw(),
766                self.root_rank(),
767                self.as_communicator().as_raw(),
768            );
769        }
770    }
771
772    /// Gather contents of buffers on `Root`.
773    ///
774    /// After the call completes, the contents of the `Buffer`s on all ranks will be
775    /// concatenated into the `Buffer` on `Root`.
776    ///
777    /// The send `Buffer`s may contain different counts of elements on different processes. The
778    /// distribution of elements in the receive `Buffer` is specified via `Partitioned`.
779    ///
780    /// This function must be called on all non-root processes.
781    ///
782    /// # Examples
783    ///
784    /// See `examples/gather_varcount.rs`
785    ///
786    /// # Standard section(s)
787    ///
788    /// 5.5
789    fn gather_varcount_into<S: ?Sized>(&self, sendbuf: &S)
790    where
791        S: Buffer,
792    {
793        assert_ne!(self.as_communicator().rank(), self.root_rank());
794        unsafe {
795            ffi::MPI_Gatherv(
796                sendbuf.pointer(),
797                sendbuf.count(),
798                sendbuf.as_datatype().as_raw(),
799                ptr::null_mut(),
800                ptr::null(),
801                ptr::null(),
802                u8::equivalent_datatype().as_raw(),
803                self.root_rank(),
804                self.as_communicator().as_raw(),
805            );
806        }
807    }
808
809    /// Gather contents of buffers on `Root`.
810    ///
811    /// After the call completes, the contents of the `Buffer`s on all ranks will be
812    /// concatenated into the `Buffer` on `Root`.
813    ///
814    /// The send `Buffer`s may contain different counts of elements on different processes. The
815    /// distribution of elements in the receive `Buffer` is specified via `Partitioned`.
816    ///
817    /// This function must be called on the root process.
818    ///
819    /// # Examples
820    ///
821    /// See `examples/gather_varcount.rs`
822    ///
823    /// # Standard section(s)
824    ///
825    /// 5.5
826    fn gather_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
827    where
828        S: Buffer,
829        R: PartitionedBufferMut,
830    {
831        assert_eq!(self.as_communicator().rank(), self.root_rank());
832        unsafe {
833            ffi::MPI_Gatherv(
834                sendbuf.pointer(),
835                sendbuf.count(),
836                sendbuf.as_datatype().as_raw(),
837                recvbuf.pointer_mut(),
838                recvbuf.counts().as_ptr(),
839                recvbuf.displs().as_ptr(),
840                recvbuf.as_datatype().as_raw(),
841                self.root_rank(),
842                self.as_communicator().as_raw(),
843            );
844        }
845    }
846
847    /// Scatter contents of a buffer on the root process to all processes.
848    ///
849    /// After the call completes each participating process will have received a part of the send
850    /// `Buffer` on the root process.
851    ///
    /// All receive `Buffer`s must have the same count of elements.
853    ///
854    /// This function must be called on all non-root processes.
855    ///
856    /// # Examples
857    ///
858    /// See `examples/scatter.rs`
859    ///
860    /// # Standard section(s)
861    ///
862    /// 5.6
863    fn scatter_into<R: ?Sized>(&self, recvbuf: &mut R)
864    where
865        R: BufferMut,
866    {
867        assert_ne!(self.as_communicator().rank(), self.root_rank());
868        unsafe {
869            ffi::MPI_Scatter(
870                ptr::null(),
871                0,
872                u8::equivalent_datatype().as_raw(),
873                recvbuf.pointer_mut(),
874                recvbuf.count(),
875                recvbuf.as_datatype().as_raw(),
876                self.root_rank(),
877                self.as_communicator().as_raw(),
878            );
879        }
880    }
881
882    /// Scatter contents of a buffer on the root process to all processes.
883    ///
884    /// After the call completes each participating process will have received a part of the send
885    /// `Buffer` on the root process.
886    ///
    /// All receive `Buffer`s must have the same count of elements.
888    ///
889    /// This function must be called on the root process.
890    ///
891    /// # Examples
892    ///
893    /// See `examples/scatter.rs`
894    ///
895    /// # Standard section(s)
896    ///
897    /// 5.6
898    fn scatter_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
899    where
900        S: Buffer,
901        R: BufferMut,
902    {
903        assert_eq!(self.as_communicator().rank(), self.root_rank());
904        let sendcount = sendbuf.count() / self.as_communicator().size();
905        unsafe {
906            ffi::MPI_Scatter(
907                sendbuf.pointer(),
908                sendcount,
909                sendbuf.as_datatype().as_raw(),
910                recvbuf.pointer_mut(),
911                recvbuf.count(),
912                recvbuf.as_datatype().as_raw(),
913                self.root_rank(),
914                self.as_communicator().as_raw(),
915            );
916        }
917    }
918
919    /// Scatter contents of a buffer on the root process to all processes.
920    ///
921    /// After the call completes each participating process will have received a part of the send
922    /// `Buffer` on the root process.
923    ///
924    /// The send `Buffer` may contain different counts of elements for different processes. The
925    /// distribution of elements in the send `Buffer` is specified via `Partitioned`.
926    ///
927    /// This function must be called on all non-root processes.
928    ///
929    /// # Examples
930    ///
931    /// See `examples/scatter_varcount.rs`
932    ///
933    /// # Standard section(s)
934    ///
935    /// 5.6
936    fn scatter_varcount_into<R: ?Sized>(&self, recvbuf: &mut R)
937    where
938        R: BufferMut,
939    {
940        assert_ne!(self.as_communicator().rank(), self.root_rank());
941        unsafe {
942            ffi::MPI_Scatterv(
943                ptr::null(),
944                ptr::null(),
945                ptr::null(),
946                u8::equivalent_datatype().as_raw(),
947                recvbuf.pointer_mut(),
948                recvbuf.count(),
949                recvbuf.as_datatype().as_raw(),
950                self.root_rank(),
951                self.as_communicator().as_raw(),
952            );
953        }
954    }
955
956    /// Scatter contents of a buffer on the root process to all processes.
957    ///
958    /// After the call completes each participating process will have received a part of the send
959    /// `Buffer` on the root process.
960    ///
961    /// The send `Buffer` may contain different counts of elements for different processes. The
962    /// distribution of elements in the send `Buffer` is specified via `Partitioned`.
963    ///
964    /// This function must be called on the root process.
965    ///
966    /// # Examples
967    ///
968    /// See `examples/scatter_varcount.rs`
969    ///
970    /// # Standard section(s)
971    ///
972    /// 5.6
973    fn scatter_varcount_into_root<S: ?Sized, R: ?Sized>(&self, sendbuf: &S, recvbuf: &mut R)
974    where
975        S: PartitionedBuffer,
976        R: BufferMut,
977    {
978        assert_eq!(self.as_communicator().rank(), self.root_rank());
979        unsafe {
980            ffi::MPI_Scatterv(
981                sendbuf.pointer(),
982                sendbuf.counts().as_ptr(),
983                sendbuf.displs().as_ptr(),
984                sendbuf.as_datatype().as_raw(),
985                recvbuf.pointer_mut(),
986                recvbuf.count(),
987                recvbuf.as_datatype().as_raw(),
988                self.root_rank(),
989                self.as_communicator().as_raw(),
990            );
991        }
992    }
993
994    /// Performs a global reduction under the operation `op` of the input data in `sendbuf` and
995    /// stores the result on the `Root` process.
996    ///
997    /// This function must be called on all non-root processes.
998    ///
999    /// # Examples
1000    ///
1001    /// See `examples/reduce.rs`
1002    ///
1003    /// # Standard section(s)
1004    ///
1005    /// 5.9.1
1006    fn reduce_into<S: ?Sized, O>(&self, sendbuf: &S, op: O)
1007    where
1008        S: Buffer,
1009        O: Operation,
1010    {
1011        assert_ne!(self.as_communicator().rank(), self.root_rank());
1012        unsafe {
1013            ffi::MPI_Reduce(
1014                sendbuf.pointer(),
1015                ptr::null_mut(),
1016                sendbuf.count(),
1017                sendbuf.as_datatype().as_raw(),
1018                op.as_raw(),
1019                self.root_rank(),
1020                self.as_communicator().as_raw(),
1021            );
1022        }
1023    }
1024
1025    /// Performs a global reduction under the operation `op` of the input data in `sendbuf` and
1026    /// stores the result on the `Root` process.
1027    ///
1028    /// This function must be called on the root process.
1029    ///
1030    /// # Examples
1031    ///
1032    /// See `examples/reduce.rs`
1033    ///
1034    /// # Standard section(s)
1035    ///
1036    /// 5.9.1
1037    fn reduce_into_root<S: ?Sized, R: ?Sized, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
1038    where
1039        S: Buffer,
1040        R: BufferMut,
1041        O: Operation,
1042    {
1043        assert_eq!(self.as_communicator().rank(), self.root_rank());
1044        unsafe {
1045            ffi::MPI_Reduce(
1046                sendbuf.pointer(),
1047                recvbuf.pointer_mut(),
1048                sendbuf.count(),
1049                sendbuf.as_datatype().as_raw(),
1050                op.as_raw(),
1051                self.root_rank(),
1052                self.as_communicator().as_raw(),
1053            );
1054        }
1055    }
1056
1057    /// Initiate broadcast of a value from the `Root` process to all other processes.
1058    ///
1059    /// # Examples
1060    ///
1061    /// See `examples/immediate_broadcast.rs`
1062    ///
1063    /// # Standard section(s)
1064    ///
1065    /// 5.12.2
1066    fn immediate_broadcast_into<'a, Sc, Buf: ?Sized>(
1067        &self,
1068        scope: Sc,
1069        buf: &'a mut Buf,
1070    ) -> Request<'a, Sc>
1071    where
1072        Buf: 'a + BufferMut,
1073        Sc: Scope<'a>,
1074    {
1075        unsafe {
1076            Request::from_raw(
1077                with_uninitialized(|request| {
1078                    ffi::MPI_Ibcast(
1079                        buf.pointer_mut(),
1080                        buf.count(),
1081                        buf.as_datatype().as_raw(),
1082                        self.root_rank(),
1083                        self.as_communicator().as_raw(),
1084                        request,
1085                    )
1086                })
1087                .1,
1088                scope,
1089            )
1090        }
1091    }
1092
1093    /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1094    ///
1095    /// This function must be called on all non-root processes.
1096    ///
1097    /// # Examples
1098    ///
1099    /// See `examples/immediate_gather.rs`
1100    ///
1101    /// # Standard section(s)
1102    ///
1103    /// 5.12.3
1104    fn immediate_gather_into<'a, Sc, S: ?Sized>(&self, scope: Sc, sendbuf: &'a S) -> Request<'a, Sc>
1105    where
1106        S: 'a + Buffer,
1107        Sc: Scope<'a>,
1108    {
1109        assert_ne!(self.as_communicator().rank(), self.root_rank());
1110        unsafe {
1111            Request::from_raw(
1112                with_uninitialized(|request| {
1113                    ffi::MPI_Igather(
1114                        sendbuf.pointer(),
1115                        sendbuf.count(),
1116                        sendbuf.as_datatype().as_raw(),
1117                        ptr::null_mut(),
1118                        0,
1119                        u8::equivalent_datatype().as_raw(),
1120                        self.root_rank(),
1121                        self.as_communicator().as_raw(),
1122                        request,
1123                    )
1124                })
1125                .1,
1126                scope,
1127            )
1128        }
1129    }
1130
1131    /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1132    ///
1133    /// This function must be called on the root processes.
1134    ///
1135    /// # Examples
1136    ///
1137    /// See `examples/immediate_gather.rs`
1138    ///
1139    /// # Standard section(s)
1140    ///
1141    /// 5.12.3
1142    fn immediate_gather_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1143        &self,
1144        scope: Sc,
1145        sendbuf: &'a S,
1146        recvbuf: &'a mut R,
1147    ) -> Request<'a, Sc>
1148    where
1149        S: 'a + Buffer,
1150        R: 'a + BufferMut,
1151        Sc: Scope<'a>,
1152    {
1153        assert_eq!(self.as_communicator().rank(), self.root_rank());
1154        unsafe {
1155            let recvcount = recvbuf.count() / self.as_communicator().size();
1156            Request::from_raw(
1157                with_uninitialized(|request| {
1158                    ffi::MPI_Igather(
1159                        sendbuf.pointer(),
1160                        sendbuf.count(),
1161                        sendbuf.as_datatype().as_raw(),
1162                        recvbuf.pointer_mut(),
1163                        recvcount,
1164                        recvbuf.as_datatype().as_raw(),
1165                        self.root_rank(),
1166                        self.as_communicator().as_raw(),
1167                        request,
1168                    )
1169                })
1170                .1,
1171                scope,
1172            )
1173        }
1174    }
1175
1176    /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1177    ///
1178    /// This function must be called on all non-root processes.
1179    ///
1180    /// # Examples
1181    ///
1182    /// See `examples/immediate_gather_varcount.rs`
1183    ///
1184    /// # Standard section(s)
1185    ///
1186    /// 5.12.3
1187    fn immediate_gather_varcount_into<'a, Sc, S: ?Sized>(
1188        &self,
1189        scope: Sc,
1190        sendbuf: &'a S,
1191    ) -> Request<'a, Sc>
1192    where
1193        S: 'a + Buffer,
1194        Sc: Scope<'a>,
1195    {
1196        assert_ne!(self.as_communicator().rank(), self.root_rank());
1197        unsafe {
1198            Request::from_raw(
1199                with_uninitialized(|request| {
1200                    ffi::MPI_Igatherv(
1201                        sendbuf.pointer(),
1202                        sendbuf.count(),
1203                        sendbuf.as_datatype().as_raw(),
1204                        ptr::null_mut(),
1205                        ptr::null(),
1206                        ptr::null(),
1207                        u8::equivalent_datatype().as_raw(),
1208                        self.root_rank(),
1209                        self.as_communicator().as_raw(),
1210                        request,
1211                    )
1212                })
1213                .1,
1214                scope,
1215            )
1216        }
1217    }
1218
1219    /// Initiate non-blocking gather of the contents of all `sendbuf`s on `Root` `&self`.
1220    ///
1221    /// This function must be called on the root processes.
1222    ///
1223    /// # Examples
1224    ///
1225    /// See `examples/immediate_gather_varcount.rs`
1226    ///
1227    /// # Standard section(s)
1228    ///
1229    /// 5.12.3
1230    fn immediate_gather_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1231        &self,
1232        scope: Sc,
1233        sendbuf: &'a S,
1234        recvbuf: &'a mut R,
1235    ) -> Request<'a, Sc>
1236    where
1237        S: 'a + Buffer,
1238        R: 'a + PartitionedBufferMut,
1239        Sc: Scope<'a>,
1240    {
1241        assert_eq!(self.as_communicator().rank(), self.root_rank());
1242        unsafe {
1243            Request::from_raw(
1244                with_uninitialized(|request| {
1245                    ffi::MPI_Igatherv(
1246                        sendbuf.pointer(),
1247                        sendbuf.count(),
1248                        sendbuf.as_datatype().as_raw(),
1249                        recvbuf.pointer_mut(),
1250                        recvbuf.counts().as_ptr(),
1251                        recvbuf.displs().as_ptr(),
1252                        recvbuf.as_datatype().as_raw(),
1253                        self.root_rank(),
1254                        self.as_communicator().as_raw(),
1255                        request,
1256                    )
1257                })
1258                .1,
1259                scope,
1260            )
1261        }
1262    }
1263
1264    /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1265    ///
1266    /// This function must be called on all non-root processes.
1267    ///
1268    /// # Examples
1269    ///
1270    /// See `examples/immediate_scatter.rs`
1271    ///
1272    /// # Standard section(s)
1273    ///
1274    /// 5.12.4
1275    fn immediate_scatter_into<'a, Sc, R: ?Sized>(
1276        &self,
1277        scope: Sc,
1278        recvbuf: &'a mut R,
1279    ) -> Request<'a, Sc>
1280    where
1281        R: 'a + BufferMut,
1282        Sc: Scope<'a>,
1283    {
1284        assert_ne!(self.as_communicator().rank(), self.root_rank());
1285        unsafe {
1286            Request::from_raw(
1287                with_uninitialized(|request| {
1288                    ffi::MPI_Iscatter(
1289                        ptr::null(),
1290                        0,
1291                        u8::equivalent_datatype().as_raw(),
1292                        recvbuf.pointer_mut(),
1293                        recvbuf.count(),
1294                        recvbuf.as_datatype().as_raw(),
1295                        self.root_rank(),
1296                        self.as_communicator().as_raw(),
1297                        request,
1298                    )
1299                })
1300                .1,
1301                scope,
1302            )
1303        }
1304    }
1305
1306    /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1307    ///
1308    /// This function must be called on the root processes.
1309    ///
1310    /// # Examples
1311    ///
1312    /// See `examples/immediate_scatter.rs`
1313    ///
1314    /// # Standard section(s)
1315    ///
1316    /// 5.12.4
1317    fn immediate_scatter_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1318        &self,
1319        scope: Sc,
1320        sendbuf: &'a S,
1321        recvbuf: &'a mut R,
1322    ) -> Request<'a, Sc>
1323    where
1324        S: 'a + Buffer,
1325        R: 'a + BufferMut,
1326        Sc: Scope<'a>,
1327    {
1328        assert_eq!(self.as_communicator().rank(), self.root_rank());
1329        unsafe {
1330            let sendcount = sendbuf.count() / self.as_communicator().size();
1331            Request::from_raw(
1332                with_uninitialized(|request| {
1333                    ffi::MPI_Iscatter(
1334                        sendbuf.pointer(),
1335                        sendcount,
1336                        sendbuf.as_datatype().as_raw(),
1337                        recvbuf.pointer_mut(),
1338                        recvbuf.count(),
1339                        recvbuf.as_datatype().as_raw(),
1340                        self.root_rank(),
1341                        self.as_communicator().as_raw(),
1342                        request,
1343                    )
1344                })
1345                .1,
1346                scope,
1347            )
1348        }
1349    }
1350
1351    /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1352    ///
1353    /// This function must be called on all non-root processes.
1354    ///
1355    /// # Examples
1356    ///
1357    /// See `examples/immediate_scatter_varcount.rs`
1358    ///
1359    /// # Standard section(s)
1360    ///
1361    /// 5.12.4
1362    fn immediate_scatter_varcount_into<'a, Sc, R: ?Sized>(
1363        &self,
1364        scope: Sc,
1365        recvbuf: &'a mut R,
1366    ) -> Request<'a, Sc>
1367    where
1368        R: 'a + BufferMut,
1369        Sc: Scope<'a>,
1370    {
1371        assert_ne!(self.as_communicator().rank(), self.root_rank());
1372        unsafe {
1373            Request::from_raw(
1374                with_uninitialized(|request| {
1375                    ffi::MPI_Iscatterv(
1376                        ptr::null(),
1377                        ptr::null(),
1378                        ptr::null(),
1379                        u8::equivalent_datatype().as_raw(),
1380                        recvbuf.pointer_mut(),
1381                        recvbuf.count(),
1382                        recvbuf.as_datatype().as_raw(),
1383                        self.root_rank(),
1384                        self.as_communicator().as_raw(),
1385                        request,
1386                    )
1387                })
1388                .1,
1389                scope,
1390            )
1391        }
1392    }
1393
1394    /// Initiate non-blocking scatter of the contents of `sendbuf` from `Root` `&self`.
1395    ///
1396    /// This function must be called on the root processes.
1397    ///
1398    /// # Examples
1399    ///
1400    /// See `examples/immediate_scatter_varcount.rs`
1401    ///
1402    /// # Standard section(s)
1403    ///
1404    /// 5.12.4
1405    fn immediate_scatter_varcount_into_root<'a, Sc, S: ?Sized, R: ?Sized>(
1406        &self,
1407        scope: Sc,
1408        sendbuf: &'a S,
1409        recvbuf: &'a mut R,
1410    ) -> Request<'a, Sc>
1411    where
1412        S: 'a + PartitionedBuffer,
1413        R: 'a + BufferMut,
1414        Sc: Scope<'a>,
1415    {
1416        assert_eq!(self.as_communicator().rank(), self.root_rank());
1417        unsafe {
1418            Request::from_raw(
1419                with_uninitialized(|request| {
1420                    ffi::MPI_Iscatterv(
1421                        sendbuf.pointer(),
1422                        sendbuf.counts().as_ptr(),
1423                        sendbuf.displs().as_ptr(),
1424                        sendbuf.as_datatype().as_raw(),
1425                        recvbuf.pointer_mut(),
1426                        recvbuf.count(),
1427                        recvbuf.as_datatype().as_raw(),
1428                        self.root_rank(),
1429                        self.as_communicator().as_raw(),
1430                        request,
1431                    )
1432                })
1433                .1,
1434                scope,
1435            )
1436        }
1437    }
1438
1439    /// Initiates a non-blacking global reduction under the operation `op` of the input data in
1440    /// `sendbuf` and stores the result on the `Root` process.
1441    ///
1442    /// This function must be called on all non-root processes.
1443    ///
1444    /// # Examples
1445    ///
1446    /// See `examples/immediate_reduce.rs`
1447    ///
1448    /// # Standard section(s)
1449    ///
1450    /// 5.12.7
1451    fn immediate_reduce_into<'a, Sc, S: ?Sized, O>(
1452        &self,
1453        scope: Sc,
1454        sendbuf: &'a S,
1455        op: O,
1456    ) -> Request<'a, Sc>
1457    where
1458        S: 'a + Buffer,
1459        O: 'a + Operation,
1460        Sc: Scope<'a>,
1461    {
1462        assert_ne!(self.as_communicator().rank(), self.root_rank());
1463        unsafe {
1464            Request::from_raw(
1465                with_uninitialized(|request| {
1466                    ffi::MPI_Ireduce(
1467                        sendbuf.pointer(),
1468                        ptr::null_mut(),
1469                        sendbuf.count(),
1470                        sendbuf.as_datatype().as_raw(),
1471                        op.as_raw(),
1472                        self.root_rank(),
1473                        self.as_communicator().as_raw(),
1474                        request,
1475                    )
1476                })
1477                .1,
1478                scope,
1479            )
1480        }
1481    }
1482
1483    /// Initiates a non-blocking global reduction under the operation `op` of the input data in
1484    /// `sendbuf` and stores the result on the `Root` process.
1485    ///
1486    /// # Examples
1487    ///
1488    /// See `examples/immediate_reduce.rs`
1489    ///
1490    /// This function must be called on the root process.
1491    ///
1492    /// # Standard section(s)
1493    ///
1494    /// 5.12.7
1495    fn immediate_reduce_into_root<'a, Sc, S: ?Sized, R: ?Sized, O>(
1496        &self,
1497        scope: Sc,
1498        sendbuf: &'a S,
1499        recvbuf: &'a mut R,
1500        op: O,
1501    ) -> Request<'a, Sc>
1502    where
1503        S: 'a + Buffer,
1504        R: 'a + BufferMut,
1505        O: 'a + Operation,
1506        Sc: Scope<'a>,
1507    {
1508        assert_eq!(self.as_communicator().rank(), self.root_rank());
1509        unsafe {
1510            Request::from_raw(
1511                with_uninitialized(|request| {
1512                    ffi::MPI_Ireduce(
1513                        sendbuf.pointer(),
1514                        recvbuf.pointer_mut(),
1515                        sendbuf.count(),
1516                        sendbuf.as_datatype().as_raw(),
1517                        op.as_raw(),
1518                        self.root_rank(),
1519                        self.as_communicator().as_raw(),
1520                        request,
1521                    )
1522                })
1523                .1,
1524                scope,
1525            )
1526        }
1527    }
1528}
1529
1530impl<'a, C: 'a + Communicator> Root for Process<'a, C> {
1531    fn root_rank(&self) -> Rank {
1532        self.rank()
1533    }
1534}
1535
1536/// An operation to be used in a reduction or scan type operation, e.g. `MPI_SUM`
1537pub trait Operation: AsRaw<Raw = MPI_Op> {
1538    /// Returns whether the operation is commutative.
1539    ///
1540    /// # Standard section(s)
1541    ///
1542    /// 5.9.7
1543    fn is_commutative(&self) -> bool {
1544        unsafe {
1545            let mut commute = 0;
1546            ffi::MPI_Op_commutative(self.as_raw(), &mut commute);
1547            commute != 0
1548        }
1549    }
1550}
1551impl<'a, T: 'a + Operation> Operation for &'a T {}
1552
1553/// A built-in operation like `MPI_SUM`
1554///
1555/// # Examples
1556///
1557/// See `examples/reduce.rs`
1558///
1559/// # Standard section(s)
1560///
1561/// 5.9.2
1562#[derive(Copy, Clone)]
1563pub struct SystemOperation(MPI_Op);
1564
1565macro_rules! system_operation_constructors {
1566    ($($ctor:ident => $val:path),*) => (
1567        $(pub fn $ctor() -> SystemOperation {
1568            //! A built-in operation
1569            SystemOperation(unsafe { $val })
1570        })*
1571    )
1572}
1573
1574impl SystemOperation {
1575    system_operation_constructors! {
1576        max => ffi::RSMPI_MAX,
1577        min => ffi::RSMPI_MIN,
1578        sum => ffi::RSMPI_SUM,
1579        product => ffi::RSMPI_PROD,
1580        logical_and => ffi::RSMPI_LAND,
1581        bitwise_and => ffi::RSMPI_BAND,
1582        logical_or => ffi::RSMPI_LOR,
1583        bitwise_or => ffi::RSMPI_BOR,
1584        logical_xor => ffi::RSMPI_LXOR,
1585        bitwise_xor => ffi::RSMPI_BXOR
1586    }
1587}
1588
1589unsafe impl AsRaw for SystemOperation {
1590    type Raw = MPI_Op;
1591    fn as_raw(&self) -> Self::Raw {
1592        self.0
1593    }
1594}
1595
1596impl Operation for SystemOperation {}
1597
/// Marker trait with no methods, implemented for every sized type.
///
/// It exists so that an arbitrary value can be stored as `Box<dyn Erased>` purely to keep it
/// alive (see the `_anchor` field of `UserOperation`).
trait Erased {}

impl<T> Erased for T where T: Sized {}
1601
/// A user-defined operation.
///
/// The lifetime `'a` of the operation is bounded by the lifetime of the underlying closure.
///
/// For safety reasons, `UserOperation` is in and of itself not considered an `Operation`, but a
/// reference to it is.  This limitation may be lifted in the future when `Request` objects can
/// store finalizers.
///
/// **Note:** When a `UserOperation` is passed to a non-blocking API call, it must outlive the
/// completion of the request.  This is normally enforced by the safe API, so it is only a concern
/// if you use the unsafe API.  Do not rely on MPI's internal reference-counting here: once the
/// `UserOperation` is destroyed, the closure object is deallocated even if the `MPI_Op` handle
/// is still alive due to outstanding references.
///
/// # Examples
///
/// See `examples/reduce.rs` and `examples/immediate_reduce.rs`
#[cfg(feature = "user-operations")]
pub struct UserOperation<'a> {
    /// Raw handle; freed with `MPI_Op_free` when this value is dropped.
    op: MPI_Op,
    /// Never read; owns the data backing `op` so that it stays alive as long as the handle.
    _anchor: Box<dyn Erased + 'a>,
}
1624
/// Formats as `UserOperation(<raw handle>)`; the anchor is not shown.
#[cfg(feature = "user-operations")]
impl<'a> fmt::Debug for UserOperation<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut tuple = f.debug_tuple("UserOperation");
        tuple.field(&self.op);
        tuple.finish()
    }
}
1631
/// Releases the `MPI_Op` handle; the anchor field is dropped afterwards as usual.
#[cfg(feature = "user-operations")]
impl<'a> Drop for UserOperation<'a> {
    fn drop(&mut self) {
        let handle = &mut self.op;
        // SAFETY: `handle` points at the handle owned by this value, which is being destroyed
        // and therefore cannot be used again through `self`.
        unsafe {
            ffi::MPI_Op_free(handle);
        }
    }
}
1640
// Exposes the owned `MPI_Op` handle.
#[cfg(feature = "user-operations")]
unsafe impl<'a> AsRaw for UserOperation<'a> {
    type Raw = MPI_Op;

    fn as_raw(&self) -> Self::Raw {
        let Self { op, .. } = self;
        *op
    }
}

/// Only a *reference* to a `UserOperation` is an `Operation`; the owned value is not.
#[cfg(feature = "user-operations")]
impl<'a, 'b> Operation for &'b UserOperation<'a> {}
1651
#[cfg(feature = "user-operations")]
impl<'a> UserOperation<'a> {
    /// Define an operation using a closure.  The operation must be associative.
    ///
    /// This is a more readable shorthand for the `new` method (it passes `commute = false`).
    /// Refer to [`new`](#method.new) for more information.
    pub fn associative<F>(function: F) -> Self
    where
        F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
    {
        Self::new(false, function)
    }

    /// Define an operation using a closure.  The operation must be both associative and
    /// commutative.
    ///
    /// This is a more readable shorthand for the `new` method (it passes `commute = true`).
    /// Refer to [`new`](#method.new) for more information.
    pub fn commutative<F>(function: F) -> Self
    where
        F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
    {
        Self::new(true, function)
    }

    /// Creates an associative and possibly commutative operation using a closure.
    ///
    /// The closure receives two arguments `invec` and `inoutvec` as dynamically typed buffers.  It
    /// shall set `inoutvec` to the value of `f(invec, inoutvec)`, where `f` is a binary associative
    /// operation.
    ///
    /// If the operation is also commutative, setting `commute` to `true` may yield performance
    /// benefits.
    ///
    /// **Note:** If the closure panics, the entire program will abort.
    ///
    /// # Standard section(s)
    ///
    /// 5.9.5
    pub fn new<F>(commute: bool, function: F) -> Self
    where
        F: Fn(DynBuffer, DynBufferMut) + Sync + 'a,
    {
        // Bundles the user's closure with the libffi closure that borrows it, so both can be
        // stored behind the single `_anchor` box of the resulting `UserOperation`.
        struct ClosureAnchor<F> {
            rust_closure: F,
            _ffi_closure: Option<Closure<'static>>,
        }

        // Callback invoked through libffi: unpacks the four MPI user-function arguments
        // (`invec`, `inoutvec`, `len`, `datatype`) and forwards them to the Rust closure.
        unsafe extern "C" fn trampoline<'a, F: Fn(DynBuffer, DynBufferMut) + Sync + 'a>(
            cif: &libffi::low::ffi_cif,
            _result: &mut c_void,
            args: *const *const c_void,
            user_function: &F,
        ) {
            debug_assert_eq!(4, cif.nargs);

            // Each entry of `args` points at one argument value; all four arguments are
            // themselves pointers. `len` is read as `c_int`, matching `UnsafeUserFunction`.
            let (mut invec, mut inoutvec, len, datatype) = (
                *(*args.offset(0) as *const *mut c_void),
                *(*args.offset(1) as *const *mut c_void),
                *(*args.offset(2) as *const *mut c_int),
                *(*args.offset(3) as *const *mut ffi::MPI_Datatype),
            );

            let len = *len;
            let datatype = DatatypeRef::from_raw(*datatype);
            if len == 0 {
                // precautionary measure: ensure pointers are not null
                invec = [].as_mut_ptr();
                inoutvec = [].as_mut_ptr();
            }

            user_function(
                DynBuffer::from_raw(invec, len, datatype),
                DynBufferMut::from_raw(inoutvec, len, datatype),
            )
        }

        // must box it to prevent moves
        let mut anchor = Box::new(ClosureAnchor {
            rust_closure: function,
            _ffi_closure: None,
        });

        // Call interface of the MPI user function: four pointer arguments, no return value.
        let args = [
            Type::pointer(), // void *
            Type::pointer(), // void *
            Type::pointer(), // int *
            Type::pointer(), // MPI_Datatype *
        ];
        #[allow(unused_mut)]
        let mut cif = Cif::new(args.iter().cloned(), Type::void());
        // MS-MPI uses "stdcall" calling convention on 32-bit x86
        #[cfg(all(msmpi, target_arch = "x86"))]
        cif.set_abi(libffi::raw::ffi_abi_FFI_STDCALL);

        let op;
        anchor._ffi_closure = Some(unsafe {
            // The libffi closure borrows the boxed Rust closure; it is stored in the same box
            // below so that both are kept together.
            let ffi_closure = Closure::new(cif, trampoline, &anchor.rust_closure);
            // Register the generated code pointer with MPI and keep the resulting handle.
            op = with_uninitialized(|op| {
                ffi::MPI_Op_create(Some(*ffi_closure.instantiate_code_ptr()), commute as _, op)
            })
            .1;
            mem::transmute(ffi_closure) // erase the lifetime
        });
        UserOperation {
            op,
            _anchor: anchor,
        }
    }

    /// Creates a `UserOperation` from raw parts.
    ///
    /// Here, `anchor` is an arbitrary object that is stored alongside the `MPI_Op`.
    /// This can be used to attach finalizers to the object.
    ///
    /// The resulting value frees `op` with `MPI_Op_free` when it is dropped.
    ///
    /// # Safety
    /// `MPI_Op` must not be `MPI_OP_NULL`
    pub unsafe fn from_raw<T: 'a>(op: MPI_Op, anchor: Box<T>) -> Self {
        Self {
            op,
            _anchor: anchor,
        }
    }
}
1776
1777/// An unsafe user-defined operation.
1778///
1779/// Unsafe user-defined operations are created from pointers to functions that have the unsafe
1780/// signatures of user functions defined in the MPI C bindings, `UnsafeUserFunction`.
1781///
1782/// The recommended way to create user-defined operations is through the safer `UserOperation`
1783/// type. This type can be used as a work-around in situations where the `libffi` dependency is not
1784/// available.
1785pub struct UnsafeUserOperation {
1786    op: MPI_Op,
1787}
1788
1789impl fmt::Debug for UnsafeUserOperation {
1790    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1791        f.debug_tuple("UnsafeUserOperation")
1792            .field(&self.op)
1793            .finish()
1794    }
1795}
1796
1797impl Drop for UnsafeUserOperation {
1798    fn drop(&mut self) {
1799        unsafe {
1800            ffi::MPI_Op_free(&mut self.op);
1801        }
1802    }
1803}
1804
1805unsafe impl AsRaw for UnsafeUserOperation {
1806    type Raw = MPI_Op;
1807    fn as_raw(&self) -> Self::Raw {
1808        self.op
1809    }
1810}
1811
1812impl<'a> Operation for &'a UnsafeUserOperation {}
1813
1814/// A raw pointer to a function that can be used to define an `UnsafeUserOperation`.
1815#[cfg(not(all(msmpi, target_arch = "x86")))]
1816pub type UnsafeUserFunction =
1817    unsafe extern "C" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
1818
1819/// A raw pointer to a function that can be used to define an `UnsafeUserOperation`.
1820///
1821/// MS-MPI uses "stdcall" rather than "C" calling convention. "stdcall" is ignored on x86_64
1822/// Windows and the default calling convention is used instead.
1823#[cfg(all(msmpi, target_arch = "x86"))]
1824pub type UnsafeUserFunction =
1825    unsafe extern "stdcall" fn(*mut c_void, *mut c_void, *mut c_int, *mut ffi::MPI_Datatype);
1826
1827impl UnsafeUserOperation {
1828    /// Define an unsafe operation using a function pointer. The operation must be associative.
1829    ///
1830    /// This is a more readable shorthand for the `new` method.  Refer to [`new`](#method.new) for
1831    /// more information.
1832    ///
1833    /// # Safety
1834    /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
1835    /// in all reductions that this `UnsafeUserOperation` is used in.
1836    pub unsafe fn associative(function: UnsafeUserFunction) -> Self {
1837        Self::new(false, function)
1838    }
1839
1840    /// Define an unsafe operation using a function pointer.  The operation must be both associative
1841    /// and commutative.
1842    ///
1843    /// This is a more readable shorthand for the `new` method.  Refer to [`new`](#method.new) for
1844    /// more information.
1845    ///
1846    /// # Safety
1847    /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
1848    /// in all reductions that this `UnsafeUserOperation` is used in.
1849    pub unsafe fn commutative(function: UnsafeUserFunction) -> Self {
1850        Self::new(true, function)
1851    }
1852
1853    /// Creates an associative and possibly commutative unsafe operation using a function pointer.
1854    ///
1855    /// The function receives raw `*mut c_void` as `invec` and `inoutvec` and the number of elemnts
1856    /// of those two vectors as a `*mut c_int` `len`. It shall set `inoutvec`
1857    /// to the value of `f(invec, inoutvec)`, where `f` is a binary associative operation.
1858    ///
1859    /// If the operation is also commutative, setting `commute` to `true` may yield performance
1860    /// benefits.
1861    ///
1862    /// **Note:** The user function is not allowed to panic.
1863    ///
1864    /// # Standard section(s)
1865    ///
1866    /// 5.9.5
1867    ///
1868    /// # Safety
1869    /// The construction of an `UnsafeUserOperation` asserts that `function` is safe to be called
1870    /// in all reductions that this `UnsafeUserOperation` is used in.
1871    pub unsafe fn new(commute: bool, function: UnsafeUserFunction) -> Self {
1872        UnsafeUserOperation {
1873            op: with_uninitialized(|op| ffi::MPI_Op_create(Some(function), commute as _, op)).1,
1874        }
1875    }
1876}
1877
1878/// Perform a local reduction.
1879///
1880/// # Examples
1881///
1882/// See `examples/reduce.rs`
1883///
1884/// # Standard section(s)
1885///
1886/// 5.9.7
1887#[allow(clippy::needless_pass_by_value)]
1888pub fn reduce_local_into<S: ?Sized, R: ?Sized, O>(inbuf: &S, inoutbuf: &mut R, op: O)
1889where
1890    S: Buffer,
1891    R: BufferMut,
1892    O: Operation,
1893{
1894    unsafe {
1895        ffi::MPI_Reduce_local(
1896            inbuf.pointer(),
1897            inoutbuf.pointer_mut(),
1898            inbuf.count(),
1899            inbuf.as_datatype().as_raw(),
1900            op.as_raw(),
1901        );
1902    }
1903}