openmp_reducer/lib.rs
1/*
2 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
3 * SPDX-FileCopyrightText: 2025 Sebastiano Vigna
4 *
5 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6 */
7
8#![doc = include_str!("../README.md")]
9
10use std::fmt::{self, Debug};
11use std::marker::PhantomData;
12use std::sync::Mutex;
13
14/// An OpenMP-style reducer that wraps a global value into a [`Mutex`],
15/// providing [shareable, cloneable copies with a local value](#method.share);
16/// the copies will be reduced into the global value when dropped.
17///
18/// The global value can be observed with [`peek`](Reducer::peek) if the base
19/// type is [`Clone`], whereas [`get`](Reducer::get) consumes self and returns
20/// the global value.
21///
22/// For convenience, the global value and the local value have distinct type
23/// parameters `G` and `L`, respectively; the second type defaults to the first
24/// one. The reduction function has type parameter `F`, defaulting to
25/// `fn(&mut G, &L)`.
26///
27/// Each shared copy has a reference to the [`Reducer`] it was created from, so
28/// you cannot call [`get`](Reducer::get) if there are still shared copies
29/// around. For example, this code will not compile:
30/// ```compile_fail
31/// use openmp_reducer::Reducer;
32///
33/// let reducer = Reducer::<usize>::new(3, |global, local| *global += *local);
34/// let mut shared = reducer.share();
35/// // drop(shared); // uncommenting this line would make the code compile
36/// assert_eq!(reducer.get(), 3);
/// ```
38///
39/// # Thread Safety
40///
41/// [`Reducer`] is [`Send`] and [`Sync`] when `G: Send` and `F: Send + Sync`.
42/// [`SharedReducer`] is [`Send`] when additionally `L: Send`.
43///
44/// # Examples
45///
/// In this example, we manually spawn threads:
47///
48/// ```rust
49/// use openmp_reducer::Reducer;
50/// use std::thread;
51///
52/// let reducer = Reducer::<usize>::new(5, |global, local| *global += *local);
53/// std::thread::scope(|s| {
54/// for i in 0..3 {
55/// let mut shared = reducer.share();
56/// s.spawn(move || {
57/// *shared.as_mut() += 10;
58/// });
59/// }
60/// });
61///
62/// // Initial value plus additional values from shared copies
63/// assert_eq!(reducer.get(), 35);
64/// ```
65///
66/// You can obtain the same behavior with [Rayon](https://docs.rs/rayon) using
67/// methods such as
68/// [`for_each_with`](https://docs.rs/rayon/latest/rayon/iter/trait.ParallelIterator.html#method.for_each_with)
69/// and
70/// [`map_with`](https://docs.rs/rayon/latest/rayon/iter/trait.ParallelIterator.html#method.map_with):
71///
72/// ```rust
73/// use openmp_reducer::Reducer;
74/// use rayon::prelude::*;
75///
76/// let reducer = Reducer::<usize>::new(5, |global, local| *global += *local);
77/// (0..1000000).into_par_iter().
78/// with_min_len(1000). // optional, might reduce the amount of cloning
79/// for_each_with(reducer.share(), |shared, i| {
80/// *shared.as_mut() += 1;
81/// }
82/// );
83///
84/// // Initial value plus additional values from clones
85/// assert_eq!(reducer.get(), 1_000_005);
86/// ```
87///
88/// Note that you have to pass `reducer.share()`, which can be cloned. Also,
89/// since
90/// [`for_each_with`](https://docs.rs/rayon/latest/rayon/iter/trait.ParallelIterator.html#method.for_each_with)
91/// might perform excessive cloning if jobs are too short, you can use
92/// [`with_min_len`](https://docs.rs/rayon/latest/rayon/iter/trait.ParallelIterator.html#method.with_min_len)
93/// to reduce the amount of cloning.
pub struct Reducer<G, L = G, F = fn(&mut G, &L)> {
    /// The global value, guarded by a mutex. Shared copies reduce their local
    /// value into it when dropped; `peek` clones it and `get` moves it out.
    global: Mutex<G>,
    /// The reduction function, invoked with a mutable reference to the global
    /// value and a shared reference to a local value.
    reduce: F,
    /// Zero-sized marker that ties the otherwise unused type parameter `L` to
    /// the struct. Since no `L` is stored, `fn(L)` is used instead of `L`, so
    /// this field is `Send` and `Sync` whatever `L` is.
    _marker: PhantomData<fn(L)>,
}
99
100impl<G: Debug, L, F> Debug for Reducer<G, L, F> {
101 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102 f.debug_struct("Reducer")
103 .field("global", &self.global)
104 .finish_non_exhaustive()
105 }
106}
107
108impl<G, L, F: Fn(&mut G, &L)> Reducer<G, L, F> {
109 /// Creates a new reducer with a given reduction function.
110 ///
111 /// The function must reduce the local value (second argument) into the
112 /// global value (first argument). For the result to be deterministic, the
113 /// global value must be the same regardless of the order in which the local
114 /// values are reduced.
115 pub fn new(init: G, reduce: F) -> Self {
116 Reducer {
117 global: Mutex::new(init),
118 reduce,
119 _marker: PhantomData,
120 }
121 }
122}
123
124impl<G, L, F> Reducer<G, L, F> {
125 /// Consumes self and returns the global value.
126 ///
127 /// Note that you cannot call this method if there are still [shared
128 /// copies](#method.share) that have not been dropped.
129 ///
130 /// If you just need to access the global value without consuming self, and
131 /// the base type is [`Clone`], use [`peek`](Reducer::peek).
132 ///
133 /// # Panics
134 ///
135 /// This method will panic if the mutex is poisoned.
136 pub fn get(self) -> G {
137 self.global.into_inner().unwrap()
138 }
139}
140
141impl<G, L: Default, F: Fn(&mut G, &L)> Reducer<G, L, F> {
142 /// Returns a [`SharedReducer`] referencing this [`Reducer`].
143 ///
144 /// The [`SharedReducer`] will be initialized with the default value of the
145 /// local type.
146 pub fn share(&self) -> SharedReducer<'_, G, L, F> {
147 SharedReducer {
148 openmp_reducer: self,
149 local: L::default(),
150 }
151 }
152}
153
154impl<G: Clone, L, F> Reducer<G, L, F> {
155 /// Returns the current global value.
156 ///
157 /// Note that this method does not guarantee that all shared copies have
158 /// been dropped. If you need that guarantee, use [`get`](Reducer::get).
159 ///
160 /// # Panics
161 ///
162 /// This method will panic if the mutex is poisoned.
163 pub fn peek(&self) -> G {
164 self.global.lock().unwrap().clone()
165 }
166}
167
/// A shareable copy of a [`Reducer`] containing a local value and implementing
/// [`Clone`].
///
/// The local value can be accessed with [`AsRef`] and [`AsMut`]
/// implementations.
///
/// When a [`SharedReducer`] is dropped, the local value will be reduced into
/// the global value. If the mutex is poisoned (e.g., due to a panic in another
/// thread), the reduction will still be performed on the recovered value.
///
/// Cloning does not copy the local value: each clone starts from the default
/// value of the local type and refers to the same [`Reducer`].
// The `Fn` bound on `F` appears on the struct itself because the `Drop`
// implementation needs it to call the reduction function, and a `Drop`
// implementation cannot require bounds that the type definition lacks.
pub struct SharedReducer<'a, G, L = G, F: Fn(&mut G, &L) = fn(&mut G, &L)> {
    /// The reducer this copy was created from; it holds the global value and
    /// the reduction function used on drop.
    openmp_reducer: &'a Reducer<G, L, F>,
    /// The local value, exposed through `AsRef`/`AsMut` and reduced into the
    /// global value on drop.
    local: L,
}
181
182impl<G: Debug, L: Debug, F: Fn(&mut G, &L)> Debug for SharedReducer<'_, G, L, F> {
183 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
184 f.debug_struct("SharedReducer")
185 .field("openmp_reducer", &self.openmp_reducer)
186 .field("local", &self.local)
187 .finish()
188 }
189}
190
191impl<G, L: Default, F: Fn(&mut G, &L)> Clone for SharedReducer<'_, G, L, F> {
192 /// Returns a copy sharing the same global value and
193 /// with local value initialized to the default value.
194 fn clone(&self) -> Self {
195 SharedReducer {
196 openmp_reducer: self.openmp_reducer,
197 local: L::default(),
198 }
199 }
200}
201
202impl<G: Clone, L, F: Fn(&mut G, &L)> SharedReducer<'_, G, L, F> {
203 /// Returns the current global value.
204 ///
205 /// This method delegates to [`Reducer::peek`].
206 pub fn peek(&self) -> G {
207 self.openmp_reducer.peek()
208 }
209}
210
211impl<G, L, F: Fn(&mut G, &L)> Drop for SharedReducer<'_, G, L, F> {
212 /// Reduces the local value into the global value.
213 fn drop(&mut self) {
214 let mut lock = self.openmp_reducer.global.lock().unwrap_or_else(|e| e.into_inner());
215 (self.openmp_reducer.reduce)(&mut *lock, &self.local);
216 }
217}
218
219impl<G, L, F: Fn(&mut G, &L)> AsRef<L> for SharedReducer<'_, G, L, F> {
220 /// Returns a reference to the local value.
221 fn as_ref(&self) -> &L {
222 &self.local
223 }
224}
225
226impl<G, L, F: Fn(&mut G, &L)> AsMut<L> for SharedReducer<'_, G, L, F> {
227 /// Returns a mutable reference to the local value.
228 fn as_mut(&mut self) -> &mut L {
229 &mut self.local
230 }
231}